use crate::record::update_record::UpdateRecordBuilder;
use crate::record::write_batched_records_v1::WriteBatchType;
use crate::{Bucket, WriteBatchBuilder, WriteRecordBatchBuilder};
use std::sync::Arc;
impl Bucket {
    /// Create a builder for updating a single record of `entry` in this bucket.
    ///
    /// # Arguments
    ///
    /// * `entry` - Name of the entry that holds the record to update.
    ///
    /// # Returns
    ///
    /// An [`UpdateRecordBuilder`] constructed with this bucket's name, the entry
    /// name and a shared handle to this bucket's HTTP client.
    pub fn update_record(&self, entry: &str) -> UpdateRecordBuilder {
        let bucket_name = self.name.clone();
        let entry_name = entry.to_owned();
        // Only the reference count is bumped; the client itself is shared.
        let client = Arc::clone(&self.http_client);

        UpdateRecordBuilder::new(bucket_name, entry_name, client)
    }

    /// Create a builder for updating several records of one `entry` in a batch.
    ///
    /// # Arguments
    ///
    /// * `entry` - Name of the entry whose records are updated.
    ///
    /// # Returns
    ///
    /// A [`WriteBatchBuilder`] configured with [`WriteBatchType::Update`].
    pub fn update_batch(&self, entry: &str) -> WriteBatchBuilder {
        let bucket_name = self.name.clone();
        let entry_name = entry.to_owned();
        let client = Arc::clone(&self.http_client);

        WriteBatchBuilder::new(bucket_name, entry_name, client, WriteBatchType::Update)
    }

    /// Create a builder for updating records in a batch without fixing the entry up front.
    ///
    /// Unlike [`Bucket::update_batch`], no entry name is passed here; the builder is
    /// created from the bucket name and HTTP client only.
    ///
    /// # Returns
    ///
    /// A [`WriteRecordBatchBuilder`] configured with [`WriteBatchType::Update`].
    pub fn update_record_batch(&self) -> WriteRecordBatchBuilder {
        let bucket_name = self.name.clone();
        let client = Arc::clone(&self.http_client);

        WriteRecordBatchBuilder::new(bucket_name, client, WriteBatchType::Update)
    }
}
#[cfg(test)]
mod tests {
    use crate::bucket::tests::bucket;
    use crate::{Bucket, RecordBuilder};
    use bytes::Bytes;
    use reduct_base::error::ErrorCode;
    use rstest::rstest;

    /// Updating a single record changes one label and removes another.
    #[rstest]
    #[tokio::test]
    async fn test_update_record(#[future] bucket: Bucket) {
        let bucket: Bucket = bucket.await;

        // Seed a record carrying both labels that the update will touch.
        bucket
            .write_record("test")
            .timestamp_us(1000)
            .data(Bytes::from("Hey"))
            .add_label("test", "1")
            .add_label("x", "y")
            .send()
            .await
            .unwrap();

        bucket
            .update_record("test")
            .timestamp_us(1000)
            .update_label("test", "2")
            .remove_label("x")
            .send()
            .await
            .unwrap();

        let record = bucket
            .read_record("test")
            .timestamp_us(1000)
            .send()
            .await
            .unwrap();

        assert_eq!(record.labels().get("test"), Some(&"2".to_string()));
        // The removal must be verified as well, otherwise a broken
        // `remove_label` would go unnoticed by this test.
        assert_eq!(record.labels().get("x"), None);
    }

    /// A single-entry batch update applies to the written record and reports
    /// an error for the timestamp that this test never wrote.
    #[rstest]
    #[tokio::test]
    async fn test_update_record_batched(#[future] bucket: Bucket) {
        let bucket: Bucket = bucket.await;

        bucket
            .write_record("test")
            .timestamp_us(1000)
            .data(Bytes::from("Hey"))
            .add_label("test", "1")
            .add_label("x", "y")
            .send()
            .await
            .unwrap();

        let batch = bucket.update_batch("test");

        // An empty label value is sent for "x"; the assertions below expect
        // the label to be gone afterwards.
        let record1 = RecordBuilder::new()
            .timestamp_us(1000)
            .add_label("test".to_string(), "2".to_string())
            .add_label("x".to_string(), "".to_string())
            .build();
        // No record is written at this timestamp in this test.
        let record2 = RecordBuilder::new()
            .timestamp_us(10000)
            .add_label("test".to_string(), "3".to_string())
            .build();

        let error_map = batch
            .add_record(record1)
            .add_record(record2)
            .send()
            .await
            .unwrap();

        // Errors are looked up by timestamp for a single-entry batch.
        assert_eq!(error_map.len(), 1);
        assert_eq!(error_map.get(&10000).unwrap().status, ErrorCode::NotFound);

        let record = bucket
            .read_record("test")
            .timestamp_us(1000)
            .send()
            .await
            .unwrap();

        assert_eq!(record.labels().get("test"), Some(&"2".to_string()));
        assert_eq!(record.labels().get("x"), None);
    }

    /// A multi-entry batch update applies each record's labels to its own entry.
    #[rstest]
    #[tokio::test]
    async fn test_update_record_batched_multi_entry(#[future] bucket: Bucket) {
        let bucket: Bucket = bucket.await;

        bucket
            .write_record("entry-3")
            .timestamp_us(1000)
            .data(Bytes::from("A"))
            .add_label("test", "1")
            .add_label("x", "y")
            .send()
            .await
            .unwrap();
        bucket
            .write_record("entry-4")
            .timestamp_us(1000)
            .data(Bytes::from("B"))
            .add_label("test", "1")
            .add_label("x", "y")
            .send()
            .await
            .unwrap();

        let batch = bucket.update_record_batch();

        // Each record names its own entry, so one batch spans both entries.
        let record1 = RecordBuilder::new()
            .entry("entry-3")
            .timestamp_us(1000)
            .add_label("test".to_string(), "2".to_string())
            .add_label("x".to_string(), "".to_string())
            .build();
        let record2 = RecordBuilder::new()
            .entry("entry-4")
            .timestamp_us(1000)
            .add_label("test".to_string(), "3".to_string())
            .build();

        let error_map = batch
            .add_record(record1)
            .add_record(record2)
            .send()
            .await
            .unwrap();

        assert!(error_map.is_empty());

        // "entry-3": "test" updated, "x" sent with an empty value and expected gone.
        let record = bucket
            .read_record("entry-3")
            .timestamp_us(1000)
            .send()
            .await
            .unwrap();
        assert_eq!(record.labels().get("test"), Some(&"2".to_string()));
        assert_eq!(record.labels().get("x"), None);

        // "entry-4": only "test" was part of the update, so "x" keeps its value.
        let record = bucket
            .read_record("entry-4")
            .timestamp_us(1000)
            .send()
            .await
            .unwrap();
        assert_eq!(record.labels().get("test"), Some(&"3".to_string()));
        assert_eq!(record.labels().get("x"), Some(&"y".to_string()));
    }

    /// A multi-entry batch update reports an error for the entry this test
    /// never wrote while still updating the record that exists.
    #[rstest]
    #[tokio::test]
    async fn test_update_record_batched_multi_entry_with_error(#[future] bucket: Bucket) {
        let bucket: Bucket = bucket.await;

        // Only "entry-3" is written; "entry-4" is not written in this test.
        bucket
            .write_record("entry-3")
            .timestamp_us(1000)
            .data(Bytes::from("A"))
            .add_label("test", "1")
            .send()
            .await
            .unwrap();

        let batch = bucket.update_record_batch();

        let record1 = RecordBuilder::new()
            .entry("entry-3")
            .timestamp_us(1000)
            .add_label("test".to_string(), "2".to_string())
            .build();
        let record2 = RecordBuilder::new()
            .entry("entry-4")
            .timestamp_us(1000)
            .add_label("test".to_string(), "3".to_string())
            .build();

        let error_map = batch
            .add_record(record1)
            .add_record(record2)
            .send()
            .await
            .unwrap();

        // Errors are looked up by (entry name, timestamp) for a multi-entry batch.
        assert_eq!(error_map.len(), 1);
        let error = error_map.get(&(String::from("entry-4"), 1000)).unwrap();
        assert_eq!(error.status, ErrorCode::NotFound);

        let record = bucket
            .read_record("entry-3")
            .timestamp_us(1000)
            .send()
            .await
            .unwrap();
        assert_eq!(record.labels().get("test"), Some(&"2".to_string()));
    }
}