use std::mem;
use crate::{
DBData, OrdIndexedZSet, RootCircuit, Stream, ZWeight,
dynamic::{DowncastTrait, DynData},
trace::BatchReaderFactories,
};
impl<K, V> Stream<RootCircuit, OrdIndexedZSet<K, V>>
where
K: DBData,
V: DBData,
{
#[track_caller]
pub fn chain_aggregate<A, FInit, FUpdate>(
&self,
finit: FInit,
fupdate: FUpdate,
) -> Stream<RootCircuit, OrdIndexedZSet<K, A>>
where
A: DBData,
FInit: Fn(&V, ZWeight) -> A + 'static,
FUpdate: Fn(A, &V, ZWeight) -> A + 'static,
{
self.chain_aggregate_persistent::<A, FInit, FUpdate>(None, finit, fupdate)
}
#[track_caller]
pub fn chain_aggregate_persistent<A, FInit, FUpdate>(
&self,
persistent_id: Option<&str>,
finit: FInit,
fupdate: FUpdate,
) -> Stream<RootCircuit, OrdIndexedZSet<K, A>>
where
A: DBData,
FInit: Fn(&V, ZWeight) -> A + 'static,
FUpdate: Fn(A, &V, ZWeight) -> A + 'static,
{
let input_factories = BatchReaderFactories::new::<K, V, ZWeight>();
let output_factories = BatchReaderFactories::new::<K, A, ZWeight>();
self.inner()
.dyn_chain_aggregate_mono(
persistent_id,
&input_factories,
&output_factories,
Box::new(move |acc: &mut DynData, v: &DynData, w: ZWeight| unsafe {
*acc.downcast_mut() = finit(v.downcast(), w)
}),
Box::new(move |acc: &mut DynData, v: &DynData, w: ZWeight| unsafe {
*acc.downcast_mut() =
fupdate(mem::take(acc.downcast_mut::<A>()), v.downcast(), w)
}),
)
.typed()
}
}