use crate::storage::entry::Entry;
use crate::storage::proto::record::Label;
use crate::storage::proto::Record;
use reduct_base::error::ReductError;
use reduct_base::{not_found, Labels};
use std::collections::{BTreeMap, HashSet};
use std::sync::Arc;
use tokio::task::JoinHandle;
pub(crate) struct UpdateLabels {
pub time: u64,
pub update: Labels,
pub remove: HashSet<String>,
}
type UpdateResult = BTreeMap<u64, Result<Labels, ReductError>>;
impl Entry {
pub async fn update_labels(
self: Arc<Self>,
updates: Vec<UpdateLabels>,
) -> Result<UpdateResult, ReductError> {
let mut result = UpdateResult::new();
let mut records_per_block = BTreeMap::new();
for UpdateLabels {
time,
update,
remove,
} in updates
{
let located = {
let mut bm = self.block_manager.write().await?;
match bm.find_block(time).await {
Ok(block_ref) => {
let block = block_ref.read().await?;
block
.get_record(time)
.cloned()
.map(|record| (block.block_id(), record))
}
Err(err) => {
result.insert(time, Err(err));
continue;
}
}
};
let Some((block_id, record)) = located else {
result.insert(
time,
Err(not_found!(
"Record {} not found in entry {}/{}",
time,
self.bucket_name,
self.name
)),
);
continue;
};
let record = Entry::update_single_label(record, update, remove);
records_per_block
.entry(block_id)
.or_insert_with(Vec::new)
.push(record.clone());
result.insert(
time,
Ok(record
.labels
.iter()
.map(|label| (label.name.clone(), label.value.clone()))
.collect()),
);
}
let mut handlers = Vec::new();
for (block_id, records) in records_per_block.into_iter() {
let local_block_manager = self.block_manager.clone();
let handler: JoinHandle<Result<_, ReductError>> = tokio::spawn(async move {
let mut bm = local_block_manager.write().await?;
bm.update_records(block_id, records).await?;
Ok(())
});
handlers.push(handler);
}
for handler in handlers {
handler.await.unwrap()?;
}
Ok(result)
}
pub(in crate::storage::entry) fn update_single_label(
mut record: Record,
mut update: Labels,
remove: HashSet<String>,
) -> Record {
let mut new_labels = Vec::new();
for label in &record.labels {
if remove.contains(label.name.as_str()) {
continue;
}
match update.remove(label.name.as_str()) {
Some(value) => {
new_labels.push(Label {
name: label.name.clone(),
value,
});
}
None => {
new_labels.push(label.clone());
}
}
}
for (name, value) in update {
new_labels.push(Label { name, value });
}
record.labels = new_labels;
record
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::storage::entry::tests::{entry, write_record_with_labels};
use crate::storage::entry::EntrySettings;
use reduct_base::io::ReadRecord;
use rstest::rstest;
use std::sync::Arc;
#[rstest]
#[tokio::test]
async fn test_update_labels(#[future] entry: Arc<Entry>) {
let entry = entry.await;
entry
.set_settings(EntrySettings {
max_block_records: 2,
..entry.settings().await.unwrap()
})
.await
.unwrap();
write_stub_record(&entry, 1).await;
write_stub_record(&entry, 2).await;
write_stub_record(&entry, 3).await;
let result = entry
.clone()
.update_labels(vec![
make_update(0),
make_update(1),
make_update(2),
make_update(3),
make_update(5),
])
.await
.unwrap();
assert_eq!(result.len(), 5, "result contains entry for each update");
assert_eq!(
result[&0].as_ref().err().unwrap(),
¬_found!("Record 0 not found in entry bucket/entry")
);
assert_eq!(
result[&5].as_ref().err().unwrap(),
¬_found!("Record 5 not found in entry bucket/entry")
);
let updated_labels = result.get(&1).unwrap().as_ref().unwrap();
let expected_labels_1 = make_expected_labels(1);
assert_eq!(updated_labels, &expected_labels_1);
let updated_labels = result.get(&2).unwrap().as_ref().unwrap();
let expected_labels_2 = make_expected_labels(2);
assert_eq!(updated_labels, &expected_labels_2);
let updated_labels = result.get(&3).unwrap().as_ref().unwrap();
let expected_labels_3 = make_expected_labels(3);
assert_eq!(updated_labels, &expected_labels_3);
let labels = entry.begin_read(1).await.unwrap().meta().labels().clone();
assert_eq!(labels, expected_labels_1);
let labels = entry.begin_read(2).await.unwrap().meta().labels().clone();
assert_eq!(labels, expected_labels_2);
let labels = entry.begin_read(3).await.unwrap().meta().labels().clone();
assert_eq!(labels, expected_labels_3);
}
#[rstest]
#[tokio::test]
async fn test_update_nothing(#[future] entry: Arc<Entry>) {
let entry = entry.await;
write_stub_record(&entry, 1).await;
let result = entry
.clone()
.update_labels(vec![UpdateLabels {
time: 1,
update: Labels::new(),
remove: HashSet::new(),
}])
.await
.unwrap();
assert_eq!(result.len(), 1);
let updated_labels = result.get(&1).unwrap().as_ref().unwrap();
let expected_labels = vec![
Label {
name: "a-1".to_string(),
value: "x-1".to_string(),
},
Label {
name: "c-1".to_string(),
value: "z-1".to_string(),
},
];
assert_eq!(
updated_labels,
&expected_labels
.iter()
.map(|l| (l.name.clone(), l.value.clone()))
.collect::<Labels>()
);
let block = entry
.block_manager
.write()
.await
.unwrap()
.load_block(1)
.await
.unwrap();
let mut record = block.read().await.unwrap().get_record(1).unwrap().clone();
record.labels.sort_by(|a, b| a.name.cmp(&b.name));
assert_eq!(record.labels, expected_labels);
}
fn make_update(time: u64) -> UpdateLabels {
UpdateLabels {
time: time,
update: Labels::from_iter(vec![
(format!("a-{}", time), format!("y-{}", time)),
(format!("b-{}", time), format!("f-{}", time)),
]),
remove: HashSet::from_iter(vec![format!("c-{}", time), "".to_string()]),
}
}
fn make_expected_labels(time: u64) -> Labels {
Labels::from_iter(vec![
(format!("a-{}", time), format!("y-{}", time)),
(format!("b-{}", time), format!("f-{}", time)),
])
}
async fn write_stub_record(entry: &Arc<Entry>, time: u64) {
write_record_with_labels(
entry,
time,
vec![],
Labels::from_iter(vec![
(format!("a-{}", time), format!("x-{}", time)),
(format!("c-{}", time), format!("z-{}", time)),
]),
)
.await;
}
}