use std::mem::take;
use dyn_clone::clone_box;
use crate::{
Circuit, DBData, DBWeight, DynZWeight, RootCircuit, Stream, TypedBox, ZWeight,
algebra::MulByRef,
circuit::{WithClock, metadata::MetaItem},
dynamic::{ClonableTrait, DowncastTrait, DynData, DynUnit, DynWeight, Erase},
operator::dynamic::aggregate::{
Aggregator, DynAggregatorImpl, IncAggregateFactories, IncAggregateLinearFactories,
StreamAggregateFactories, StreamLinearAggregateFactories,
},
storage::file::Deserializable,
trace::{BatchReaderFactories, Filter},
typed_batch::{Batch, BatchReader, DynOrdIndexedZSet, IndexedZSet, OrdIndexedZSet, OrdWSet},
};
impl<C, K, V> Stream<C, OrdIndexedZSet<K, V>>
where
    C: Circuit,
    K: DBData,
    <K as Deserializable>::ArchivedDeser: Ord,
    V: DBData,
{
    /// Aggregates the values stored under each key of this indexed Z-set with
    /// `aggregator`, yielding an indexed Z-set from keys to `A::Output`.
    ///
    /// This is a statically typed front end: it builds `IncAggregateFactories`
    /// for `(K, V, ZWeight, A::Output)`, wraps `aggregator` in a
    /// [`DynAggregatorImpl`] and forwards both to `dyn_aggregate` on the inner
    /// (dynamically typed) stream. The result is converted back with `typed()`.
    ///
    /// The first argument of `dyn_aggregate` is passed as `None`
    /// (presumably "no persistent id" -- confirm against `dyn_aggregate`).
    ///
    /// Only compiled when the `backend-mode` feature is disabled.
    #[cfg(not(feature = "backend-mode"))]
    #[allow(clippy::type_complexity)]
    pub fn aggregate<A>(&self, aggregator: A) -> Stream<C, OrdIndexedZSet<K, A::Output>>
    where
        A: Aggregator<V, <C as WithClock>::Time, ZWeight>,
    {
        let factories = IncAggregateFactories::new::<K, V, ZWeight, A::Output>();

        self.inner()
            .dyn_aggregate(
                None,
                &factories,
                // Type-erased adapter around the user-supplied aggregator.
                &DynAggregatorImpl::<
                    DynData,
                    V,
                    C::Time,
                    DynZWeight,
                    ZWeight,
                    A,
                    DynData,
                    DynData,
                >::new(aggregator),
            )
            .typed()
    }
}
impl<C, Z> Stream<C, Z>
where
C: Circuit,
{
#[allow(clippy::type_complexity)]
pub fn stream_aggregate<A>(&self, aggregator: A) -> Stream<C, OrdIndexedZSet<Z::Key, A::Output>>
where
Z: IndexedZSet<DynK = DynData>,
Z::InnerBatch: Send,
A: Aggregator<Z::Val, (), ZWeight>,
{
let factories: StreamAggregateFactories<
<Z as BatchReader>::Inner,
<OrdIndexedZSet<Z::Key, A::Output> as BatchReader>::Inner,
> = StreamAggregateFactories::new::<Z::Key, Z::Val, ZWeight, A::Output>();
self.inner()
.dyn_stream_aggregate(
&factories,
&DynAggregatorImpl::<Z::DynV, Z::Val, (), DynZWeight, ZWeight, A, DynData, DynData>::new(aggregator),
)
.typed()
}
pub fn stream_aggregate_generic<A, O>(&self, aggregator: A) -> Stream<C, O>
where
Z: Batch<Time = ()>,
Z::InnerBatch: Send,
A: Aggregator<Z::Val, (), Z::R>,
A::Output: Erase<O::DynV>,
O: IndexedZSet<Key = Z::Key, DynK = Z::DynK, Val = A::Output>,
{
let factories: StreamAggregateFactories<
<Z as BatchReader>::Inner,
<O as BatchReader>::Inner,
> = StreamAggregateFactories::new::<Z::Key, Z::Val, Z::R, A::Output>();
self.inner()
.dyn_stream_aggregate_generic(
&factories,
&DynAggregatorImpl::<Z::DynV, Z::Val, (), Z::DynR, Z::R, A, DynData, O::DynV>::new(
aggregator,
),
)
.typed()
}
/// Aggregates each key's values with the function `f`, whose result is
/// scaled by the record's weight (`f(v).mul_by_ref(weight)`).
///
/// Returns an `OrdIndexedZSet` from `Z::Key` to the accumulated value `A`.
/// The work is delegated to `dyn_stream_aggregate_linear_generic` on the
/// inner stream; the output callback moves the accumulated weight into the
/// output value unchanged.
pub fn stream_aggregate_linear<F, A>(&self, f: F) -> Stream<C, OrdIndexedZSet<Z::Key, A>>
where
    Z: IndexedZSet<DynK = DynData>,
    A: DBWeight + MulByRef<ZWeight, Output = A>,
    F: Fn(&Z::Val) -> A + Clone + 'static,
{
    let factories = StreamLinearAggregateFactories::<
        Z::Inner,
        DynWeight,
        DynOrdIndexedZSet<DynData, DynData>,
    >::new::<Z::Key, Z::Val, A, A>();

    self.inner()
        .dyn_stream_aggregate_linear_generic(
            &factories,
            // SAFETY: `factories` is instantiated for `Z::Val` inputs and `A`
            // accumulators; this assumes the dynamic operator only passes
            // values of those concrete types to the callback.
            Box::new(move |_key, val, weight, acc| unsafe {
                *acc.downcast_mut::<A>() = f(val.downcast::<Z::Val>()).mul_by_ref(&**weight)
            }),
            // No post-processing: the accumulated weight becomes the output value.
            Box::new(|agg, out| agg.as_data_mut().move_to(out)),
        )
        .typed()
}
/// Variant of `stream_aggregate_linear` in which the caller selects the
/// output collection type `O`.
///
/// `O` must share its key type and dynamic key type with `Z`, store values of
/// type `A`, and use `DynData` as its dynamic value type. As in
/// `stream_aggregate_linear`, each record contributes `f(v)` multiplied by
/// its weight and the accumulated result is moved to the output as is.
pub fn stream_aggregate_linear_generic<F, A, O>(&self, f: F) -> Stream<C, O>
where
    Z: IndexedZSet,
    O: IndexedZSet<Key = Z::Key, DynK = Z::DynK, Val = A, DynV = DynData>,
    A: DBWeight + MulByRef<ZWeight, Output = A>,
    F: Fn(&Z::Val) -> A + Clone + 'static,
{
    let factories = StreamLinearAggregateFactories::<Z::Inner, DynWeight, O::Inner>::new::<
        Z::Key,
        Z::Val,
        A,
        A,
    >();

    self.inner()
        .dyn_stream_aggregate_linear_generic(
            &factories,
            // SAFETY: `factories` is instantiated for `Z::Val` inputs and `A`
            // accumulators; this assumes the dynamic operator only passes
            // values of those concrete types to the callback.
            Box::new(move |_key, val, weight, acc| unsafe {
                *acc.downcast_mut::<A>() = f(val.downcast::<Z::Val>()).mul_by_ref(&**weight)
            }),
            // No post-processing: the accumulated weight becomes the output value.
            Box::new(|agg, out| agg.as_data_mut().move_to(out)),
        )
        .typed()
}
/// Variant of `aggregate` in which the caller selects the output collection
/// type `O`.
///
/// `O` must share its key type and dynamic key type with the input `Z`, and
/// its value type must be the aggregator output `A::Output` (which must be
/// erasable to `O::DynV`). This matches `stream_aggregate_generic` and the
/// body below: both the factories and the erased aggregator are instantiated
/// for `A::Output`, so the typed view of the result has to use that same
/// value type.
///
/// Forwards to `dyn_aggregate_generic` on the inner stream, passing `None`
/// as its first argument (presumably "no persistent id" -- confirm against
/// `dyn_aggregate_generic`), and converts the result back with `typed()`.
pub fn aggregate_generic<A, O>(&self, aggregator: A) -> Stream<C, O>
where
    Z: Batch<Time = ()> + std::fmt::Debug,
    Z::InnerBatch: Send,
    A: Aggregator<Z::Val, <C as WithClock>::Time, Z::R>,
    // The output value type is the aggregator's output, not a dynamic
    // (unsized) trait-object type.
    O: IndexedZSet<Key = Z::Key, DynK = Z::DynK, Val = A::Output>,
    A::Output: Erase<O::DynV>,
    <Z::Key as Deserializable>::ArchivedDeser: Ord,
{
    let factories: IncAggregateFactories<Z::Inner, O::Inner, C::Time> =
        IncAggregateFactories::new::<Z::Key, Z::Val, Z::R, A::Output>();

    // Type-erased adapter around the user-supplied aggregator. The timestamp
    // and aggregator type parameters are left to inference, as before.
    let dyn_aggregator =
        DynAggregatorImpl::<Z::DynV, Z::Val, _, Z::DynR, Z::R, _, DynData, O::DynV>::new(
            aggregator,
        );

    self.inner()
        .dyn_aggregate_generic(None, &factories, &dyn_aggregator)
        .typed()
}
/// Aggregates each key's values with the function `f`, whose result is
/// scaled by the record's weight (`f(v).mul_by_ref(weight)`).
///
/// Returns an `OrdIndexedZSet` from `Z::Key` to the accumulated value `A`.
/// The work is delegated to `dyn_aggregate_linear_generic` on the inner
/// stream with `None` as its first argument (presumably "no persistent id"
/// -- confirm); the output callback moves the accumulated weight into the
/// output value unchanged.
#[track_caller]
pub fn aggregate_linear<F, A>(&self, f: F) -> Stream<C, OrdIndexedZSet<Z::Key, A>>
where
    Z: IndexedZSet<DynK = DynData>,
    A: DBWeight + MulByRef<ZWeight, Output = A>,
    F: Fn(&Z::Val) -> A + Clone + 'static,
    <Z::Key as Deserializable>::ArchivedDeser: Ord,
{
    let factories = IncAggregateLinearFactories::<
        Z::Inner,
        DynWeight,
        DynOrdIndexedZSet<DynData, DynData>,
        C::Time,
    >::new::<Z::Key, A, A>();

    self.inner()
        .dyn_aggregate_linear_generic(
            None,
            &factories,
            // SAFETY: `factories` is instantiated with `A` as the accumulator
            // type and `Z::Val` is the value type of the input; this assumes
            // the dynamic operator only passes values of those concrete types.
            Box::new(move |_key, val, weight, acc| unsafe {
                *acc.downcast_mut::<A>() = f(val.downcast::<Z::Val>()).mul_by_ref(&**weight)
            }),
            // No post-processing: the accumulated weight becomes the output value.
            Box::new(|agg, out| agg.as_data_mut().move_to(out)),
        )
        .typed()
}
/// Like `aggregate_linear`, but applies `of` to each accumulated value
/// before it is written to the output.
///
/// Each record contributes `f(v)` multiplied by its weight to an accumulator
/// of type `A`; the output callback takes the accumulator (leaving
/// `A::default()` behind via `std::mem::take`) and stores `of(accumulator)`
/// as the output value of type `OV`.
///
/// Only compiled when the `backend-mode` feature is disabled.
#[cfg(not(feature = "backend-mode"))]
#[track_caller]
pub fn aggregate_linear_postprocess<F, A, OF, OV>(
    &self,
    f: F,
    of: OF,
) -> Stream<C, OrdIndexedZSet<Z::Key, OV>>
where
    Z: IndexedZSet<DynK = DynData>,
    A: DBWeight + MulByRef<ZWeight, Output = A>,
    OV: DBData,
    F: Fn(&Z::Val) -> A + Clone + 'static,
    OF: Fn(A) -> OV + Clone + 'static,
    <Z::Key as Deserializable>::ArchivedDeser: Ord,
{
    let factories = IncAggregateLinearFactories::<
        Z::Inner,
        DynWeight,
        DynOrdIndexedZSet<DynData, DynData>,
        C::Time,
    >::new::<Z::Key, A, OV>();

    self.inner()
        .dyn_aggregate_linear_generic(
            None,
            &factories,
            // SAFETY: `factories` is instantiated with `A` as the accumulator
            // type and `Z::Val` is the value type of the input; this assumes
            // the dynamic operator only passes values of those concrete types.
            Box::new(move |_key, val, weight, acc| unsafe {
                *acc.downcast_mut::<A>() = f(val.downcast::<Z::Val>()).mul_by_ref(&**weight)
            }),
            // SAFETY: `factories` is instantiated with `A` accumulators and
            // `OV` outputs (same assumption as above).
            Box::new(move |agg, out| unsafe {
                let accumulated = take(agg.downcast_mut::<A>());
                *out.downcast_mut::<OV>() = of(accumulated);
            }),
        )
        .typed()
}
/// Converts an indexed Z-set into a weighted set keyed by `Z::Key` whose
/// weights have type `T`.
///
/// For every `(key, value, weight)` the callback stores
/// `f(key, value).mul_by_ref(weight)` into the accumulator supplied by
/// `dyn_weigh`; how contributions for the same key are combined is up to
/// `dyn_weigh`.
pub fn weigh<F, T>(&self, f: F) -> Stream<C, OrdWSet<Z::Key, T, DynWeight>>
where
    Z: IndexedZSet<DynK = DynData>,
    F: Fn(&Z::Key, &Z::Val) -> T + 'static,
    T: DBWeight + MulByRef<ZWeight, Output = T>,
{
    let output_factories = BatchReaderFactories::new::<Z::Key, (), T>();

    self.inner()
        .dyn_weigh(
            &output_factories,
            // SAFETY: the output factories use `T` as the weight type, and the
            // key/value/weight arguments are assumed to be the erased
            // `Z::Key`, `Z::Val` and `ZWeight` of the input indexed Z-set.
            Box::new(move |key, val, weight, acc: &mut DynWeight| unsafe {
                *acc.downcast_mut::<T>() = f(key.downcast::<Z::Key>(), val.downcast::<Z::Val>())
                    .mul_by_ref(weight.downcast::<ZWeight>())
            }),
        )
        .typed()
}
/// Variant of `weigh` in which the caller selects the output batch type `O`.
///
/// `O` must share its key type and dynamic key type with `Z`, have unit
/// values, unit time and `DynWeight` dynamic weights, and -- crucially --
/// its weight type `O::R` must be `T`, the type returned by `f`.
///
/// The `R = T` requirement is what makes the callback sound: the factories
/// are built for `O::R`, while the callback writes a `T` through an
/// unchecked downcast of the accumulator. Without the equality those two
/// types could differ.
///
/// For every `(key, value, weight)` the callback stores
/// `f(key, value).mul_by_ref(weight)` into the accumulator supplied by
/// `dyn_weigh_generic`.
pub fn weigh_generic<F, T, O>(&self, f: F) -> Stream<C, O>
where
    Z: IndexedZSet,
    F: Fn(&Z::Key, &Z::Val) -> T + 'static,
    O: Batch<
        Key = Z::Key,
        DynK = Z::DynK,
        Val = (),
        DynV = DynUnit,
        Time = (),
        R = T,
        DynR = DynWeight,
    >,
    T: DBWeight + MulByRef<ZWeight, Output = T>,
{
    self.inner()
        .dyn_weigh_generic(
            &BatchReaderFactories::new::<Z::Key, (), O::R>(),
            // SAFETY: `O::R == T` by the bound above, so the accumulator
            // created through the factories holds a `T`. The key/value/weight
            // arguments are assumed to be the erased `Z::Key`, `Z::Val` and
            // `ZWeight` of the input indexed Z-set.
            Box::new(move |key, val, weight, acc: &mut DynWeight| unsafe {
                *acc.downcast_mut::<T>() = f(key.downcast::<Z::Key>(), val.downcast::<Z::Val>())
                    .mul_by_ref(weight.downcast::<ZWeight>())
            }),
        )
        .typed()
}
}
impl<K, V> Stream<RootCircuit, OrdIndexedZSet<K, V>>
where
K: DBData,
V: DBData,
{
/// Convenience wrapper around
/// `aggregate_linear_postprocess_retain_keys_persistent` that supplies
/// `None` as the persistent id and forwards every other argument unchanged.
#[track_caller]
pub fn aggregate_linear_postprocess_retain_keys<F, A, OF, OV, TS, RK>(
    &self,
    waterline: &Stream<RootCircuit, TypedBox<TS, DynData>>,
    retain_key_func: RK,
    f: F,
    of: OF,
) -> Stream<RootCircuit, OrdIndexedZSet<K, OV>>
where
    A: DBWeight + MulByRef<ZWeight, Output = A>,
    OV: DBData,
    TS: DBData,
    RK: Fn(&K, &TS) -> bool + Clone + Send + Sync + 'static,
    F: Fn(&V) -> A + Clone + 'static,
    OF: Fn(A) -> OV + Clone + 'static,
{
    // This entry point never assigns a persistent id to the operator.
    let persistent_id: Option<&str> = None;

    self.aggregate_linear_postprocess_retain_keys_persistent::<F, A, OF, OV, TS, RK>(
        persistent_id,
        waterline,
        retain_key_func,
        f,
        of,
    )
}
/// Linear aggregation with post-processing and a key-retention filter
/// driven by a `waterline` stream.
///
/// * `persistent_id` is forwarded unchanged to
///   `dyn_aggregate_linear_retain_keys_mono` (presumably it identifies the
///   operator's state across restarts -- confirm against that operator).
/// * `waterline` supplies values of type `TS`; for each such value a
///   `Filter` over keys is built from `retain_key_func(key, ts)`, tagged
///   with the `Debug` rendering of the waterline value as metadata.
///   Presumably keys for which the predicate returns `true` are the ones
///   retained -- confirm against `Filter`.
/// * `f` maps each value to an `A`, which is multiplied by the record's
///   weight and stored in the accumulator.
/// * `of` converts the accumulated `A` into the output value `OV`; the
///   accumulator is taken with `std::mem::take`, leaving `A::default()`.
#[track_caller]
pub fn aggregate_linear_postprocess_retain_keys_persistent<F, A, OF, OV, TS, RK>(
    &self,
    persistent_id: Option<&str>,
    waterline: &Stream<RootCircuit, TypedBox<TS, DynData>>,
    retain_key_func: RK,
    f: F,
    of: OF,
) -> Stream<RootCircuit, OrdIndexedZSet<K, OV>>
where
    A: DBWeight + MulByRef<ZWeight, Output = A>,
    OV: DBData,
    TS: DBData,
    RK: Fn(&K, &TS) -> bool + Clone + Send + Sync + 'static,
    F: Fn(&V) -> A + Clone + 'static,
    OF: Fn(A) -> OV + Clone + 'static,
{
    let factories = IncAggregateLinearFactories::<
        DynOrdIndexedZSet<DynData, DynData>,
        DynWeight,
        DynOrdIndexedZSet<DynData, DynData>,
        (),
    >::new::<K, A, OV>();

    self.inner()
        .dyn_aggregate_linear_retain_keys_mono(
            persistent_id,
            &factories,
            &waterline.inner_data(),
            // Builds a key filter for one waterline value.
            Box::new(move |ts| {
                let description = MetaItem::String(format!("{ts:?}"));
                // Own a copy of the waterline value so the filter can outlive `ts`.
                let ts = clone_box(ts);
                let retain = retain_key_func.clone();
                Filter::new(Box::new(move |k: &DynData| {
                    // SAFETY: keys of this stream have concrete type `K`, and
                    // `waterline` is a `TypedBox<TS, _>`, so the boxed value
                    // is assumed to hold a `TS`.
                    let key = unsafe { k.downcast::<K>() };
                    let bound = unsafe { ts.as_ref().downcast::<TS>() };
                    retain(key, bound)
                }))
                .with_metadata(description)
            }),
            // SAFETY: `factories` is instantiated with `A` accumulators and the
            // stream's values have concrete type `V`; assumes the dynamic
            // operator only passes values of those types.
            Box::new(move |_key, val, weight, acc| unsafe {
                *acc.downcast_mut::<A>() = f(val.downcast::<V>()).mul_by_ref(&**weight)
            }),
            // SAFETY: `factories` is instantiated with `A` accumulators and
            // `OV` outputs (same assumption as above).
            Box::new(move |agg, out| unsafe {
                let accumulated = take(agg.downcast_mut::<A>());
                *out.downcast_mut::<OV>() = of(accumulated);
            }),
        )
        .typed()
}
#[track_caller]
pub fn aggregate_linear_retain_keys<F, A, TS, RK>(
&self,
waterline: &Stream<RootCircuit, TypedBox<TS, DynData>>,
retain_key_func: RK,
f: F,
) -> Stream<RootCircuit, OrdIndexedZSet<K, A>>
where
A: DBWeight + MulByRef<ZWeight, Output = A>,
TS: DBData,
RK: Fn(&K, &TS) -> bool + Clone + Send + Sync + 'static,
F: Fn(&V) -> A + Clone + 'static,
{
let factories: IncAggregateLinearFactories<
DynOrdIndexedZSet<DynData, DynData>,
DynWeight,
DynOrdIndexedZSet<DynData, DynData>,
(),
> = IncAggregateLinearFactories::new::<K, A, A>();
self.inner()
.dyn_aggregate_linear_retain_keys_mono(
None,
&factories,
&waterline.inner_data(),
Box::new(move |ts| {
let metadata = MetaItem::String(format!("{ts:?}"));
let ts = clone_box(ts);
let retain_key_func = retain_key_func.clone();
Filter::new(Box::new(move |k: &DynData| {
retain_key_func(unsafe { k.downcast::<K>() }, unsafe {
ts.as_ref().downcast::<TS>()
})
}))
.with_metadata(metadata)
}),
Box::new(move |_k, v, r, acc| unsafe {
*acc.downcast_mut::<A>() = f(v.downcast::<V>()).mul_by_ref(&**r)
}),
Box::new(|w, out| w.as_data_mut().move_to(out)),
)
.typed()
}
}