use std::convert::Infallible;
use std::fmt::Display;
use std::hash::BuildHasher;
use std::sync::Arc;
use quick_cache::sync::Cache;
use quick_cache::UnitWeighter;
use crate::block::{BlockStructure, GroupHasherBuilder, OperatorStructure};
use crate::operator::{Data, Operator, StreamElement};
use crate::scheduler::ExecutionMetadata;
use super::DataKey;
#[derive(Clone, Derivative)]
#[derivative(Debug)]
pub struct MapMemo<
O: Data + Sync,
K: DataKey + Sync,
F,
Fk,
Op,
H: BuildHasher + Clone = GroupHasherBuilder,
> where
F: Fn(Op::Out) -> O + Send + Clone,
Fk: Fn(&Op::Out) -> K + Send + Clone,
Op: Operator,
O: Data + Sync,
K: DataKey + Sync,
{
prev: Op,
#[derivative(Debug = "ignore")]
f: F,
#[derivative(Debug = "ignore")]
fk: Fk,
cache: Arc<Cache<K, O, UnitWeighter, H>>,
}
impl<O, K, F, Fk, Op> Display for MapMemo<O, K, F, Fk, Op>
where
F: Fn(Op::Out) -> O + Send + Clone,
Fk: Fn(&Op::Out) -> K + Send + Clone,
Op: Operator,
O: Data + Sync,
K: DataKey + Sync,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{} -> MapMemo<{} -> {}>",
self.prev,
std::any::type_name::<Op::Out>(),
std::any::type_name::<O>()
)
}
}
impl<O, K, F, Fk, Op> MapMemo<O, K, F, Fk, Op>
where
F: Fn(Op::Out) -> O + Send + Clone,
Fk: Fn(&Op::Out) -> K + Send + Clone,
Op: Operator,
O: Data + Sync,
K: DataKey + Sync,
{
pub(super) fn new(prev: Op, f: F, fk: Fk, capacity: usize) -> Self {
Self {
prev,
f,
fk,
cache: Arc::new(Cache::with(
capacity,
capacity as u64,
UnitWeighter,
Default::default(),
Default::default(),
)),
}
}
}
impl<O, K, F, Fk, Op> Operator for MapMemo<O, K, F, Fk, Op>
where
F: Fn(Op::Out) -> O + Send + Clone,
Fk: Fn(&Op::Out) -> K + Send + Clone,
Op: Operator,
O: Data + Sync,
K: DataKey + Sync,
{
type Out = O;
fn setup(&mut self, metadata: &mut ExecutionMetadata) {
self.prev.setup(metadata);
}
#[inline]
fn next(&mut self) -> StreamElement<O> {
self.prev.next().map(|v| {
let k = (self.fk)(&v);
self.cache
.get_or_insert_with(&k, || Ok::<O, Infallible>((self.f)(v)))
.unwrap()
})
}
fn structure(&self) -> BlockStructure {
self.prev
.structure()
.add_operator(OperatorStructure::new::<O, _>("MapMemo"))
}
}