use crate::internal::mr::reduce::*;
use crate::internal::mr::rvec::RVec;
use crate::traits::record::Record;
use crate::traits::valid_key::{BorrowedKey, ValidKey};
use crate::types::storage::Storage;
use std::collections::HashMap;
/// A two-level reduction over the contents of a `Storage`.
///
/// Each chunk of the parent `Storage` gets its own `Reduce<Element, Summary>`;
/// the resulting per-chunk summaries are collected into an `RVec<Summary>`, which
/// is in turn folded by one final `Reduce<Summary, Summary>`.
///
/// A `Reduction` remembers the id of the `Storage` it was created from and the
/// `reduce`/`reduce_chunk` methods assert that they are called with that same
/// `Storage`.
pub struct Reduction<ChunkKey, Element, Summary>
where
    ChunkKey: BorrowedKey + ?Sized,
    ChunkKey::Owned: ValidKey,
{
    /// Id of the `Storage` this reduction was created for (taken from `storage.id()`).
    parent_id: u64,
    /// Group size handed to every `Reduce` this reduction constructs.
    group_size: usize,
    /// Chunk key per chunk index, maintained by the parent's `gc`. After a gc pass
    /// every slot that is visited is expected to be `Some`.
    gc_chunk_list: RVec<Option<ChunkKey::Owned>>,
    /// Element-to-summary rules; cloned into each per-chunk `Reduce`.
    rules: ReduceRules<Element, Summary>,
    /// One `Reduce` per chunk, keyed by the owned chunk key.
    chunkwise_reductions:
        HashMap<ChunkKey::Owned, Reduce<Element, Summary>, crate::internal::hasher::HasherImpl>,
    /// Latest summary of each chunk; this is the input of `reduction`.
    chunkwise_summaries: RVec<Summary>,
    /// Final reduction that folds `chunkwise_summaries` into a single summary.
    reduction: Reduce<Summary, Summary>,
}
impl<ChunkKey, Element, Summary> Reduction<ChunkKey, Element, Summary>
where
    ChunkKey: BorrowedKey + ?Sized,
    ChunkKey::Owned: ValidKey,
    Summary: Default + Clone,
{
    /// Creates a `Reduction` bound to `storage`.
    ///
    /// * `storage` - the parent `Storage`; only its id is recorded here.
    /// * `group_size` - forwarded to every `Reduce` built by this reduction.
    /// * `map` - turns one element (plus the previous summary) into a summary.
    /// * `fold` - combines a slice of summaries (plus the previous summary) into one.
    ///
    /// `fold` is used both inside each chunk and across the per-chunk summaries.
    pub fn new<ItemKey, Map, Fold>(
        storage: &Storage<ChunkKey, ItemKey, Element>,
        group_size: usize,
        map: Map,
        fold: Fold,
    ) -> Self
    where
        ItemKey: BorrowedKey + ?Sized,
        ItemKey::Owned: ValidKey,
        Element: Record<ChunkKey, ItemKey>,
        Map: Fn(&Element, &Summary) -> Option<Summary> + Clone + Send + Sync + 'static,
        Fold: Fn(&[Summary], &Summary) -> Option<Summary> + Clone + Send + Sync + 'static,
    {
        let chunkwise_summaries = RVec::default();
        let reduction = Reduce::new(
            &chunkwise_summaries,
            group_size,
            Self::reduction_rules(map.clone(), fold.clone()),
        );

        Reduction {
            parent_id: storage.id(),
            group_size,
            gc_chunk_list: RVec::default(),
            // Last use of `map` and `fold`: move them instead of cloning again.
            rules: Self::chunkwise_rules(map, fold),
            chunkwise_reductions: HashMap::with_hasher(
                crate::internal::hasher::HasherImpl::default(),
            ),
            chunkwise_summaries,
            reduction,
        }
    }

    /// Builds the rules for the final summary-over-summaries reduction.
    ///
    /// The "map" step of that reduction is `fold` applied to a one-element slice,
    /// so `_map` is accepted only to keep the signature parallel to
    /// `chunkwise_rules` and is otherwise unused.
    fn reduction_rules<Map, Fold>(_map: Map, reduce: Fold) -> ReduceRules<Summary, Summary>
    where
        Map: Fn(&Element, &Summary) -> Option<Summary> + Clone + Send + Sync + 'static,
        Fold: Fn(&[Summary], &Summary) -> Option<Summary> + Clone + Send + Sync + 'static,
    {
        let fold_single = reduce.clone();
        ReduceRules::new(
            move |summary, previous, _| fold_single(std::slice::from_ref(summary), previous),
            reduce,
        )
    }

    /// Builds the rules for a per-chunk reduction: `map` for single elements
    /// (the third closure argument is ignored) and `reduce` for groups of summaries.
    fn chunkwise_rules<Map, Fold>(map: Map, reduce: Fold) -> ReduceRules<Element, Summary>
    where
        Map: Fn(&Element, &Summary) -> Option<Summary> + Clone + Send + Sync + 'static,
        Fold: Fn(&[Summary], &Summary) -> Option<Summary> + Clone + Send + Sync + 'static,
    {
        ReduceRules::new(move |element, previous, _| map(element, previous), reduce)
    }

    /// Lets the parent `Storage` garbage-collect this reduction's chunk list and
    /// per-chunk reductions.
    fn gc<ItemKey>(&mut self, parent: &Storage<ChunkKey, ItemKey, Element>)
    where
        ItemKey: BorrowedKey + ?Sized,
        ItemKey::Owned: ValidKey,
        Element: Record<ChunkKey, ItemKey>,
    {
        parent.gc(&mut self.gc_chunk_list, &mut self.chunkwise_reductions);
    }

    /// Panics unless `storage` is the `Storage` this reduction was created for.
    fn assert_parent<ItemKey>(&self, storage: &Storage<ChunkKey, ItemKey, Element>)
    where
        ItemKey: BorrowedKey + ?Sized,
        ItemKey::Owned: ValidKey,
        Element: Record<ChunkKey, ItemKey>,
    {
        assert_eq!(
            self.parent_id,
            storage.id(),
            "Id mismatch: a Reduction may only be used with it's parent Storage, never any other Storage"
        );
    }

    /// Reduces the whole `storage` down to a single summary.
    ///
    /// Every chunk's `Reduce` is updated first, the resulting per-chunk summaries
    /// are stored in `chunkwise_summaries`, and the final reduction is then
    /// updated from those summaries. The return value is whatever that final
    /// `Reduce::update` call yields.
    ///
    /// # Panics
    ///
    /// Panics if `storage` is not the `Storage` passed to [`Reduction::new`], or
    /// if an internal invariant is violated (more than one chunk key handed to
    /// the per-chunk step, or a chunk key still undefined after gc).
    pub fn reduce<ItemKey>(
        &mut self,
        storage: &Storage<ChunkKey, ItemKey, Element>,
    ) -> Option<&Summary>
    where
        Element: Record<ChunkKey, ItemKey>,
        ItemKey: BorrowedKey + ?Sized,
        ItemKey::Owned: ValidKey,
    {
        self.assert_parent(storage);
        self.gc(storage);

        // Borrow the fields individually so the closure below can mutate the
        // per-chunk map while `chunkwise_summaries` is mutably borrowed too.
        let chunkwise_reductions = &mut self.chunkwise_reductions;
        let chunkwise_summaries = &mut self.chunkwise_summaries;
        let group_size = self.group_size;
        let rules = &self.rules;

        chunkwise_summaries.reduce(&self.gc_chunk_list, 1, |chunk_key, _old_summary, idx| {
            // A group size of 1 means at most one chunk key per call.
            assert!(chunk_key.len() <= 1);
            if chunk_key.is_empty() {
                return None;
            }
            let chunk_key = chunk_key[0]
                .as_ref()
                .cloned()
                .expect("retriever bug: chunk keys should be defined for all indices after gc");
            let internal_storage = storage.internal_rvec()[idx].internal_rvec();
            chunkwise_reductions
                .entry(chunk_key)
                .or_insert_with(|| Reduce::new(internal_storage, group_size, rules.clone()))
                .update(internal_storage)
                .cloned()
        });

        self.reduction.update(&self.chunkwise_summaries)
    }

    /// Reduces only the chunk identified by `chunk_key`.
    ///
    /// Returns `None` when `storage.internal_idx_of(chunk_key)` finds no such
    /// chunk; otherwise returns the result of updating that chunk's `Reduce`
    /// (creating it on first use).
    ///
    /// # Panics
    ///
    /// Panics if `storage` is not the `Storage` passed to [`Reduction::new`].
    pub fn reduce_chunk<ItemKey>(
        &mut self,
        storage: &Storage<ChunkKey, ItemKey, Element>,
        chunk_key: &ChunkKey,
    ) -> Option<&Summary>
    where
        Element: Record<ChunkKey, ItemKey>,
        ItemKey: BorrowedKey + ?Sized,
        ItemKey::Owned: ValidKey,
    {
        self.assert_parent(storage);
        self.gc(storage);

        let idx = storage.internal_idx_of(chunk_key)?;
        let internal_storage = storage.internal_rvec()[idx].internal_rvec();

        // Local copies/borrows keep the closure from capturing all of `self`.
        let group_size = self.group_size;
        let rules = &self.rules;
        self.chunkwise_reductions
            .entry(chunk_key.to_owned())
            .or_insert_with(|| Reduce::new(internal_storage, group_size, rules.clone()))
            .update(internal_storage)
    }
}