dbsp/operator/count.rs

1use crate::{
2    dynamic::{DowncastTrait, DynData},
3    operator::dynamic::{
4        aggregate::{IncAggregateLinearFactories, StreamLinearAggregateFactories},
5        count::{DistinctCountFactories, StreamDistinctCountFactories},
6    },
7    storage::file::Deserializable,
8    typed_batch::{DynOrdIndexedZSet, IndexedZSet, OrdIndexedZSet},
9    Circuit, DynZWeight, Stream, ZWeight,
10};
11
12impl<C, Z> Stream<C, Z>
13where
14    C: Circuit,
15    Z: IndexedZSet<DynK = DynData>,
16    Z::InnerBatch: Send,
17    <Z::Key as Deserializable>::ArchivedDeser: Ord,
18{
19    /// Incrementally sums the weights for each key `self` into an indexed Z-set
20    /// that maps from the original keys to the weights.  Both the input and
21    /// output are streams of updates.
22    #[allow(clippy::type_complexity)]
23    #[track_caller]
24    pub fn weighted_count(&self) -> Stream<C, OrdIndexedZSet<Z::Key, ZWeight>> {
25        let factories: IncAggregateLinearFactories<
26            Z::Inner,
27            DynZWeight,
28            DynOrdIndexedZSet<DynData, DynData>,
29            C::Time,
30        > = IncAggregateLinearFactories::new::<Z::Key, ZWeight, ZWeight>();
31
32        self.inner()
33            .dyn_weighted_count_generic(
34                None,
35                &factories,
36                Box::new(|w, out| *unsafe { out.downcast_mut() } = **w),
37            )
38            .typed()
39    }
40
41    /// Like [`Self::dyn_weighted_count`], but can return any batch type.
42    #[track_caller]
43    pub fn weighted_count_generic<O>(&self) -> Stream<C, O>
44    where
45        O: IndexedZSet<Key = Z::Key, DynK = Z::DynK, DynV = DynData>,
46    {
47        let factories: IncAggregateLinearFactories<Z::Inner, DynZWeight, O::Inner, C::Time> =
48            IncAggregateLinearFactories::new::<Z::Key, ZWeight, O::Val>();
49
50        self.inner()
51            .dyn_weighted_count_generic(
52                None,
53                &factories,
54                Box::new(|w, out| *unsafe { out.downcast_mut() } = **w),
55            )
56            .typed()
57    }
58
59    /// Incrementally, for each key in `self`, counts the number of unique
60    /// values having positive weights, and outputs it as an indexed Z-set
61    /// that maps from the original keys to the unique value counts.  Both
62    /// the input and output are streams of updates.
63    #[allow(clippy::type_complexity)]
64    #[track_caller]
65    pub fn distinct_count(&self) -> Stream<C, OrdIndexedZSet<Z::Key, ZWeight>> {
66        let factories: DistinctCountFactories<
67            Z::Inner,
68            DynOrdIndexedZSet<DynData, DynData>,
69            C::Time,
70        > = DistinctCountFactories::new::<Z::Key, Z::Val, ZWeight>();
71
72        self.inner()
73            .dyn_distinct_count_generic(
74                None,
75                &factories,
76                Box::new(|w, out| *unsafe { out.downcast_mut() } = **w),
77            )
78            .typed()
79    }
80
81    /// Like [`Self::dyn_distinct_count`], but can return any batch type.
82    #[track_caller]
83    pub fn distinct_count_generic<O>(&self) -> Stream<C, O>
84    where
85        O: IndexedZSet<Key = Z::Key, DynK = DynData>,
86    {
87        let factories: DistinctCountFactories<Z::Inner, O::Inner, C::Time> =
88            DistinctCountFactories::new::<Z::Key, Z::Val, O::Val>();
89
90        self.inner()
91            .dyn_distinct_count_generic(
92                None,
93                &factories,
94                Box::new(|w, out| *unsafe { out.downcast_mut() } = **w),
95            )
96            .typed()
97    }
98
99    /// Non-incrementally sums the weights for each key `self` into an indexed
100    /// Z-set that maps from the original keys to the weights.  Both the
101    /// input and output are streams of data (not updates).
102    #[allow(clippy::type_complexity)]
103    #[track_caller]
104    pub fn stream_weighted_count(&self) -> Stream<C, OrdIndexedZSet<Z::Key, ZWeight>> {
105        let factories: StreamLinearAggregateFactories<
106            Z::Inner,
107            Z::DynR,
108            DynOrdIndexedZSet<DynData, DynData>,
109        > = StreamLinearAggregateFactories::new::<Z::Key, Z::Val, ZWeight, ZWeight>();
110
111        self.inner()
112            .dyn_stream_weighted_count_generic(
113                &factories,
114                Box::new(|w, out| *unsafe { out.downcast_mut() } = **w),
115            )
116            .typed()
117    }
118
119    /// Like [`Self::dyn_stream_weighted_count`], but can return any batch type.
120    #[track_caller]
121    pub fn stream_weighted_count_generic<O>(&self) -> Stream<C, O>
122    where
123        O: IndexedZSet<Key = Z::Key, DynK = Z::DynK, Val = ZWeight, DynV = DynData>,
124    {
125        let factories: StreamLinearAggregateFactories<Z::Inner, Z::DynR, O::Inner> =
126            StreamLinearAggregateFactories::new::<Z::Key, Z::Val, ZWeight, ZWeight>();
127
128        self.inner()
129            .dyn_stream_weighted_count_generic(
130                &factories,
131                Box::new(|w, out| *unsafe { out.downcast_mut() } = **w),
132            )
133            .typed()
134    }
135
136    /// Incrementally, for each key in `self`, counts the number of unique
137    /// values having positive weights, and outputs it as an indexed Z-set
138    /// that maps from the original keys to the unique value counts.  Both
139    /// the input and output are streams of data (not updates).
140    #[allow(clippy::type_complexity)]
141    #[track_caller]
142    pub fn stream_distinct_count(&self) -> Stream<C, OrdIndexedZSet<Z::Key, ZWeight>> {
143        let factories: StreamDistinctCountFactories<Z::Inner, DynOrdIndexedZSet<DynData, DynData>> =
144            StreamDistinctCountFactories::new::<Z::Key, Z::Val, ZWeight>();
145
146        self.inner()
147            .dyn_stream_distinct_count_generic(
148                &factories,
149                Box::new(|w, out| *unsafe { out.downcast_mut() } = **w),
150            )
151            .typed()
152    }
153
154    /// Like [`Self::dyn_distinct_count`], but can return any batch type.
155    #[track_caller]
156    pub fn stream_distinct_count_generic<O>(&self) -> Stream<C, O>
157    where
158        O: IndexedZSet<Key = Z::Key, DynK = Z::DynK, Val = ZWeight, DynV = DynData>,
159    {
160        let factories: StreamDistinctCountFactories<Z::Inner, O::Inner> =
161            StreamDistinctCountFactories::new::<Z::Key, Z::Val, ZWeight>();
162
163        self.inner()
164            .dyn_stream_distinct_count_generic(
165                &factories,
166                Box::new(|w, out| *unsafe { out.downcast_mut() } = **w),
167            )
168            .typed()
169    }
170}