1use crate::{
2 dynamic::{DowncastTrait, DynData},
3 operator::dynamic::{
4 aggregate::{IncAggregateLinearFactories, StreamLinearAggregateFactories},
5 count::{DistinctCountFactories, StreamDistinctCountFactories},
6 },
7 storage::file::Deserializable,
8 typed_batch::{DynOrdIndexedZSet, IndexedZSet, OrdIndexedZSet},
9 Circuit, DynZWeight, Stream, ZWeight,
10};
11
12impl<C, Z> Stream<C, Z>
13where
14 C: Circuit,
15 Z: IndexedZSet<DynK = DynData>,
16 Z::InnerBatch: Send,
17 <Z::Key as Deserializable>::ArchivedDeser: Ord,
18{
19 #[allow(clippy::type_complexity)]
23 #[track_caller]
24 pub fn weighted_count(&self) -> Stream<C, OrdIndexedZSet<Z::Key, ZWeight>> {
25 let factories: IncAggregateLinearFactories<
26 Z::Inner,
27 DynZWeight,
28 DynOrdIndexedZSet<DynData, DynData>,
29 C::Time,
30 > = IncAggregateLinearFactories::new::<Z::Key, ZWeight, ZWeight>();
31
32 self.inner()
33 .dyn_weighted_count_generic(
34 None,
35 &factories,
36 Box::new(|w, out| *unsafe { out.downcast_mut() } = **w),
37 )
38 .typed()
39 }
40
41 #[track_caller]
43 pub fn weighted_count_generic<O>(&self) -> Stream<C, O>
44 where
45 O: IndexedZSet<Key = Z::Key, DynK = Z::DynK, DynV = DynData>,
46 {
47 let factories: IncAggregateLinearFactories<Z::Inner, DynZWeight, O::Inner, C::Time> =
48 IncAggregateLinearFactories::new::<Z::Key, ZWeight, O::Val>();
49
50 self.inner()
51 .dyn_weighted_count_generic(
52 None,
53 &factories,
54 Box::new(|w, out| *unsafe { out.downcast_mut() } = **w),
55 )
56 .typed()
57 }
58
59 #[allow(clippy::type_complexity)]
64 #[track_caller]
65 pub fn distinct_count(&self) -> Stream<C, OrdIndexedZSet<Z::Key, ZWeight>> {
66 let factories: DistinctCountFactories<
67 Z::Inner,
68 DynOrdIndexedZSet<DynData, DynData>,
69 C::Time,
70 > = DistinctCountFactories::new::<Z::Key, Z::Val, ZWeight>();
71
72 self.inner()
73 .dyn_distinct_count_generic(
74 None,
75 &factories,
76 Box::new(|w, out| *unsafe { out.downcast_mut() } = **w),
77 )
78 .typed()
79 }
80
81 #[track_caller]
83 pub fn distinct_count_generic<O>(&self) -> Stream<C, O>
84 where
85 O: IndexedZSet<Key = Z::Key, DynK = DynData>,
86 {
87 let factories: DistinctCountFactories<Z::Inner, O::Inner, C::Time> =
88 DistinctCountFactories::new::<Z::Key, Z::Val, O::Val>();
89
90 self.inner()
91 .dyn_distinct_count_generic(
92 None,
93 &factories,
94 Box::new(|w, out| *unsafe { out.downcast_mut() } = **w),
95 )
96 .typed()
97 }
98
99 #[allow(clippy::type_complexity)]
103 #[track_caller]
104 pub fn stream_weighted_count(&self) -> Stream<C, OrdIndexedZSet<Z::Key, ZWeight>> {
105 let factories: StreamLinearAggregateFactories<
106 Z::Inner,
107 Z::DynR,
108 DynOrdIndexedZSet<DynData, DynData>,
109 > = StreamLinearAggregateFactories::new::<Z::Key, Z::Val, ZWeight, ZWeight>();
110
111 self.inner()
112 .dyn_stream_weighted_count_generic(
113 &factories,
114 Box::new(|w, out| *unsafe { out.downcast_mut() } = **w),
115 )
116 .typed()
117 }
118
119 #[track_caller]
121 pub fn stream_weighted_count_generic<O>(&self) -> Stream<C, O>
122 where
123 O: IndexedZSet<Key = Z::Key, DynK = Z::DynK, Val = ZWeight, DynV = DynData>,
124 {
125 let factories: StreamLinearAggregateFactories<Z::Inner, Z::DynR, O::Inner> =
126 StreamLinearAggregateFactories::new::<Z::Key, Z::Val, ZWeight, ZWeight>();
127
128 self.inner()
129 .dyn_stream_weighted_count_generic(
130 &factories,
131 Box::new(|w, out| *unsafe { out.downcast_mut() } = **w),
132 )
133 .typed()
134 }
135
136 #[allow(clippy::type_complexity)]
141 #[track_caller]
142 pub fn stream_distinct_count(&self) -> Stream<C, OrdIndexedZSet<Z::Key, ZWeight>> {
143 let factories: StreamDistinctCountFactories<Z::Inner, DynOrdIndexedZSet<DynData, DynData>> =
144 StreamDistinctCountFactories::new::<Z::Key, Z::Val, ZWeight>();
145
146 self.inner()
147 .dyn_stream_distinct_count_generic(
148 &factories,
149 Box::new(|w, out| *unsafe { out.downcast_mut() } = **w),
150 )
151 .typed()
152 }
153
154 #[track_caller]
156 pub fn stream_distinct_count_generic<O>(&self) -> Stream<C, O>
157 where
158 O: IndexedZSet<Key = Z::Key, DynK = Z::DynK, Val = ZWeight, DynV = DynData>,
159 {
160 let factories: StreamDistinctCountFactories<Z::Inner, O::Inner> =
161 StreamDistinctCountFactories::new::<Z::Key, Z::Val, ZWeight>();
162
163 self.inner()
164 .dyn_stream_distinct_count_generic(
165 &factories,
166 Box::new(|w, out| *unsafe { out.downcast_mut() } = **w),
167 )
168 .typed()
169 }
170}