use crate::storage::bucket::Bucket;
use crate::storage::entry::update_labels::UpdateLabels;
use reduct_base::error::ReductError;
use reduct_base::Labels;
use std::collections::{BTreeMap, HashSet};
/// A label change request for a single record, addressed by entry name and
/// record timestamp so that one batch can span several entries of a bucket.
#[derive(Clone)]
pub(crate) struct UpdateLabelsMulti {
    /// Name of the entry that holds the target record.
    pub entry_name: String,
    /// Timestamp that identifies the target record within the entry.
    pub time: u64,
    /// Labels passed on as the `update` field of the per-entry request.
    pub update: Labels,
    /// Label keys passed on as the `remove` field of the per-entry request.
    pub remove: HashSet<String>,
}
/// Outcome of a multi-entry label update: entry name -> record timestamp ->
/// either the labels reported for that record or the error that prevented
/// the update.
type UpdateResult = BTreeMap<String, BTreeMap<u64, Result<Labels, ReductError>>>;
impl Bucket {
pub async fn update_labels(
&self,
updates: Vec<UpdateLabelsMulti>,
) -> Result<UpdateResult, ReductError> {
let mut result: UpdateResult = BTreeMap::new();
let mut updates_per_entry: BTreeMap<String, Vec<(u64, Labels, HashSet<String>)>> =
BTreeMap::new();
for update in updates {
updates_per_entry
.entry(update.entry_name.clone())
.or_default()
.push((update.time, update.update, update.remove));
}
for (entry_name, entry_updates) in updates_per_entry {
match self.get_entry(&entry_name).await {
Ok(entry) => {
let entry = entry.upgrade()?;
let formatted_updates = entry_updates
.into_iter()
.map(|(time, update, remove)| UpdateLabels {
time,
update,
remove,
})
.collect();
let entry_results = entry.update_labels(formatted_updates).await?;
result.insert(entry_name, entry_results);
}
Err(e) => {
let mut entry_result = BTreeMap::new();
for (time, _, _) in entry_updates {
entry_result.insert(time, Err(e.clone()));
}
result.insert(entry_name, entry_result);
}
}
}
Ok(result)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::bucket::tests::{bucket, write};
    use crate::storage::entry::Entry;
    use bytes::Bytes;
    use reduct_base::io::ReadRecord;
    use reduct_base::not_found;
    use rstest::rstest;
    use std::sync::Arc;

    /// Writes a one-byte `text/plain` record (`b"x"`) with the given labels
    /// at `time`, then sends `Ok(None)` to finish the write.
    async fn write_with_labels(entry: &Arc<Entry>, time: u64, labels: Labels) {
        let mut sender = entry
            .clone()
            .begin_write(time, 1, "text/plain".to_string(), labels)
            .await
            .unwrap();
        sender
            .send(Ok(Some(Bytes::from_static(b"x"))))
            .await
            .unwrap();
        sender.send(Ok(None)).await.unwrap();
    }

    /// One batch that targets two entries must update both records, and the
    /// returned labels must match what is stored afterwards.
    #[rstest]
    #[tokio::test]
    async fn updates_labels_across_entries(#[future] bucket: Arc<Bucket>) {
        let bucket = bucket.await;
        let entry1 = bucket
            .get_or_create_entry("entry-1")
            .await
            .unwrap()
            .upgrade()
            .unwrap();
        let entry2 = bucket
            .get_or_create_entry("entry-2")
            .await
            .unwrap()
            .upgrade()
            .unwrap();

        write_with_labels(
            &entry1,
            1,
            Labels::from_iter(vec![
                ("keep".to_string(), "v1".to_string()),
                ("drop".to_string(), "tmp".to_string()),
            ]),
        )
        .await;
        write_with_labels(
            &entry2,
            2,
            Labels::from_iter(vec![("foo".to_string(), "bar".to_string())]),
        )
        .await;

        let result = bucket
            .update_labels(vec![
                UpdateLabelsMulti {
                    entry_name: "entry-1".into(),
                    time: 1,
                    update: Labels::from_iter(vec![("keep".into(), "v2".into())]),
                    remove: HashSet::from_iter(vec!["drop".to_string()]),
                },
                UpdateLabelsMulti {
                    entry_name: "entry-2".into(),
                    time: 2,
                    update: Labels::from_iter(vec![("new".into(), "v".into())]),
                    remove: HashSet::new(),
                },
            ])
            .await
            .unwrap();

        // entry-1: "keep" is overwritten and "drop" is removed.
        let entry1_labels = result
            .get("entry-1")
            .unwrap()
            .get(&1)
            .unwrap()
            .as_ref()
            .unwrap();
        assert_eq!(
            entry1_labels,
            &Labels::from_iter(vec![("keep".into(), "v2".into())])
        );

        // entry-2: the existing label is kept and the new one is added.
        let entry2_labels = result
            .get("entry-2")
            .unwrap()
            .get(&2)
            .unwrap()
            .as_ref()
            .unwrap();
        assert_eq!(
            entry2_labels,
            &Labels::from_iter(vec![
                ("foo".into(), "bar".into()),
                ("new".into(), "v".into())
            ])
        );

        // The stored records must agree with the reported labels.
        let stored1 = entry1.begin_read(1).await.unwrap();
        assert_eq!(stored1.meta().labels(), entry1_labels);
        let stored2 = entry2.begin_read(2).await.unwrap();
        assert_eq!(stored2.meta().labels(), entry2_labels);
    }

    /// A missing entry must be reported per record without failing the whole
    /// batch; updates for existing entries are still applied.
    #[rstest]
    #[tokio::test]
    async fn returns_error_for_missing_entry(#[future] bucket: Arc<Bucket>) {
        let bucket = bucket.await;
        write(&bucket, "present", 1, b"a").await.unwrap();

        let result = bucket
            .update_labels(vec![
                UpdateLabelsMulti {
                    entry_name: "present".into(),
                    time: 1,
                    update: Labels::from_iter(vec![("a".into(), "b".into())]),
                    remove: HashSet::new(),
                },
                UpdateLabelsMulti {
                    entry_name: "missing".into(),
                    time: 2,
                    update: Labels::new(),
                    remove: HashSet::new(),
                },
            ])
            .await
            .unwrap();

        assert!(result
            .get("present")
            .unwrap()
            .get(&1)
            .unwrap()
            .as_ref()
            .unwrap()
            .contains_key("a"));

        let err = result
            .get("missing")
            .unwrap()
            .get(&2)
            .unwrap()
            .as_ref()
            .err()
            .unwrap();
        assert_eq!(
            err,
            &not_found!("Entry 'missing' not found in bucket 'test'")
        );
    }

    /// Setting the `remove` label to `"true"` on a record of a `$meta` entry
    /// must be stored on that record like any other label update.
    #[rstest]
    #[tokio::test]
    async fn remove_true_updates_meta_record_with_tombstone(#[future] bucket: Arc<Bucket>) {
        let bucket = bucket.await;
        let meta_entry = bucket
            .get_or_create_entry("entry-1/$meta")
            .await
            .unwrap()
            .upgrade()
            .unwrap();

        write_with_labels(
            &meta_entry,
            1000,
            Labels::from_iter(vec![("key".to_string(), "meta-1".to_string())]),
        )
        .await;

        bucket
            .update_labels(vec![UpdateLabelsMulti {
                entry_name: "entry-1/$meta".into(),
                time: 1000,
                update: Labels::from_iter(vec![("remove".into(), "true".into())]),
                remove: HashSet::new(),
            }])
            .await
            .unwrap();

        let reader = meta_entry.begin_read(1000).await.unwrap();
        assert_eq!(
            reader.meta().labels().get("remove"),
            Some(&"true".to_string())
        );
    }
}