use crate::{
DBData, DBWeight, Timestamp,
algebra::{HasZero, MonoidValue, Semigroup},
dynamic::{DataTrait, DynUnit, Erase, WeightTrait},
operator::Aggregator,
trace::Cursor,
};
use std::{convert::identity, marker::PhantomData};
/// An aggregator assembled from three pieces: an initial accumulator value,
/// a step function that folds one value into the accumulator, and an output
/// function that turns the final accumulator into the result.
///
/// Type parameters:
/// - `V`: the value type handed to the step function.
/// - `A`: the accumulator type.
/// - `S`: the type reported as the aggregator's `Semigroup` over `A`; it is
///   never stored, only named through `PhantomData`.
/// - `SF`: the step function type.
/// - `OF`: the output function type.
#[derive(Clone)]
pub struct Fold<V, A, S, SF, OF> {
/// Starting accumulator; cloned at the beginning of every aggregation.
init: A,
/// Invoked as `step(&mut acc, &value, weight)` for each value whose summed
/// weight is non-zero.
step: SF,
/// Applied to the accumulator when the aggregate is finalized.
output: OF,
/// Ties the otherwise unused `V` and `S` parameters to the struct.
phantom: PhantomData<(V, S)>,
}
impl<V, A, S, SF> Fold<V, A, S, SF, fn(A) -> A> {
    /// Builds a `Fold` whose output is the accumulator itself.
    ///
    /// `init` is the starting accumulator and `step` is the function that
    /// folds values into it. The output function is fixed to
    /// [`std::convert::identity`], so finalizing returns the accumulator
    /// unchanged.
    pub fn new(init: A, step: SF) -> Self {
        // Spell out the fn-pointer type so the `identity` function item is
        // coerced to the `fn(A) -> A` named in this impl's header.
        let output: fn(A) -> A = identity;

        Self {
            init,
            step,
            output,
            phantom: PhantomData,
        }
    }
}
impl<V, A, S, SF, OF> Fold<V, A, S, SF, OF> {
    /// Builds a `Fold` with a caller-supplied output function.
    ///
    /// `init` is the starting accumulator, `step` folds values into it, and
    /// `output` is stored to be applied to the accumulator on finalization.
    pub fn with_output(init: A, step: SF, output: OF) -> Self {
        // Zero-sized marker for the `V` and `S` type parameters.
        let phantom = PhantomData;

        Self {
            init,
            step,
            output,
            phantom,
        }
    }
}
impl<V, T, R, A, S, O, SF, OF> Aggregator<V, T, R> for Fold<V, A, S, SF, OF>
where
T: Timestamp,
V: DBData,
R: DBWeight + MonoidValue,
A: DBData,
SF: Fn(&mut A, &V, R) + Clone + 'static,
OF: Fn(A) -> O + Clone + 'static,
S: Semigroup<A> + Clone + 'static,
O: DBData,
{
type Accumulator = A;
type Output = O;
type Semigroup = S;
fn aggregate<VTrait, RTrait>(
&self,
cursor: &mut dyn Cursor<VTrait, DynUnit, T, RTrait>,
) -> Option<Self::Accumulator>
where
VTrait: DataTrait + ?Sized,
RTrait: WeightTrait + ?Sized,
V: Erase<VTrait>,
R: Erase<RTrait>,
{
let mut acc = self.init.clone();
let mut non_empty = false;
while cursor.key_valid() {
let mut weight: R = HasZero::zero();
cursor.map_times(&mut |_t, w| weight.add_assign_by_ref(unsafe { w.downcast() }));
if !weight.is_zero() {
non_empty = true;
(self.step)(&mut acc, unsafe { cursor.key().downcast() }, weight);
}
cursor.step_key();
}
non_empty.then_some(acc)
}
fn finalize(&self, acc: Self::Accumulator) -> Self::Output {
(self.output)(acc)
}
}