// fluxus_api/stream/windowed_stream.rs
use std::cmp::{Ordering, Reverse};
use std::collections::hash_map::Entry;
use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque};
use std::hash::Hash;

use fluxus_transformers::operator::{WindowAllOperator, WindowAnyOperator};
use fluxus_utils::window::WindowConfig;

use crate::operators::{
    SortOrder, WindowAggregator, WindowSkipper, WindowSorter, WindowTimestampSorter,
};
use crate::stream::datastream::DataStream;

13pub struct WindowedStream<T> {
15 pub(crate) stream: DataStream<T>,
16 pub(crate) window_config: WindowConfig,
17}
18
19impl<T> WindowedStream<T>
20where
21 T: Clone + Send + Sync + 'static,
22{
23 pub fn aggregate<A, F>(self, init: A, f: F) -> DataStream<A>
25 where
26 A: Clone + Send + Sync + 'static,
27 F: Fn(A, T) -> A + Send + Sync + 'static,
28 {
29 let aggregator = WindowAggregator::new(self.window_config, init, f);
30 self.stream.transform(aggregator)
31 }
32
33 pub fn any<F>(self, f: F) -> DataStream<bool>
34 where
35 F: Fn(&T) -> bool + Send + Sync + 'static,
36 {
37 let anyer = WindowAnyOperator::new(f, self.window_config);
38 self.stream.transform(anyer)
39 }
40
41 pub fn all<F>(self, f: F) -> DataStream<bool>
42 where
43 F: Fn(&T) -> bool + Send + Sync + 'static,
44 {
45 let aller = WindowAllOperator::new(f, self.window_config);
46 self.stream.transform(aller)
47 }
48
49 pub fn limit(self, n: usize) -> DataStream<Vec<T>> {
51 let limiter = WindowAggregator::new(self.window_config, vec![], move |mut acc, value| {
52 if acc.len() < n {
53 acc.push(value);
54 }
55 acc
56 });
57 self.stream.transform(limiter)
58 }
59
60 pub fn tail(self, n: usize) -> DataStream<Vec<T>> {
62 let init = VecDeque::with_capacity(n);
63 let limiter = WindowAggregator::new(self.window_config, init, move |mut acc, value| {
64 acc.push_back(value);
65 if acc.len() > n {
66 acc.pop_front();
67 }
68 acc
69 });
70 self.stream
71 .transform(limiter)
72 .map(|d| d.into_iter().collect())
73 }
74
75 pub fn sort_by<F>(self, f: F) -> DataStream<Vec<T>>
77 where
78 F: FnMut(&T, &T) -> Ordering + Send + Sync + 'static,
79 {
80 let sorter = WindowSorter::new(self.window_config, f);
81 self.stream.transform(sorter)
82 }
83
84 pub fn sort_by_ts(self, order: SortOrder) -> DataStream<Vec<T>> {
86 let sorter = WindowTimestampSorter::new(self.window_config, order);
87 self.stream.transform(sorter)
88 }
89
90 pub fn sort_by_ts_asc(self) -> DataStream<Vec<T>> {
92 let sorter = WindowTimestampSorter::new(self.window_config, SortOrder::Asc);
93 self.stream.transform(sorter)
94 }
95
96 pub fn sort_by_ts_desc(self) -> DataStream<Vec<T>> {
98 let sorter = WindowTimestampSorter::new(self.window_config, SortOrder::Desc);
99 self.stream.transform(sorter)
100 }
101
102 pub fn skip(self, n: usize) -> DataStream<Vec<T>> {
104 let skipper = WindowSkipper::new(self.window_config, n);
105 self.stream.transform(skipper)
106 }
107}
108
109impl<T> WindowedStream<T>
110where
111 T: Ord + Clone + Send + Sync + 'static,
112{
113 pub fn sort(self, ord: SortOrder) -> DataStream<Vec<T>> {
115 self.sort_by(move |v1, v2| match ord {
116 SortOrder::Asc => v1.cmp(v2),
117 SortOrder::Desc => v2.cmp(v1),
118 })
119 }
120
121 pub fn top_k(self, k: usize) -> DataStream<Vec<T>> {
123 let init = BinaryHeap::<Reverse<T>>::new();
124 let res = self.aggregate(init, move |mut heap, v| {
125 heap.push(Reverse(v));
126 if heap.len() > k {
127 heap.pop();
128 }
129 heap
130 });
131 res.map(|heap| {
132 heap.into_sorted_vec()
133 .into_iter()
134 .map(|Reverse(v)| v)
135 .collect()
136 })
137 }
138}
139
140impl<T> WindowedStream<T>
141where
142 T: Eq + Hash + Clone + Send + Sync + 'static,
143{
144 pub fn distinct(self) -> DataStream<HashSet<T>> {
146 self.aggregate(HashSet::new(), |mut set, value| {
147 set.insert(value);
148 set
149 })
150 }
151}
152
153impl<T> WindowedStream<T>
154where
155 T: Clone + Send + Sync + 'static,
156{
157 pub fn distinct_by_key<F, K>(self, f: F) -> DataStream<Vec<T>>
159 where
160 F: Fn(&T) -> K + Sync + Send + 'static,
161 K: Eq + Hash + Clone + Sync + Send + 'static,
162 {
163 let keys = HashSet::new();
164 let data = vec![];
165 self.aggregate((keys, data), move |(mut keys, mut data), value| {
166 let k = f(&value);
167 if !keys.contains(&k) {
168 keys.insert(k);
169 data.push(value);
170 }
171 (keys, data)
172 })
173 .map(|(_, data)| data)
174 }
175
176 pub fn top_k_by_key<F, K>(self, n: usize, f: F) -> DataStream<Vec<T>>
178 where
179 F: Fn(&T) -> K + Sync + Send + 'static,
180 K: Ord + Eq + Hash + Clone + Sync + Send + 'static,
181 {
182 let keys = BinaryHeap::<Reverse<K>>::new();
184 let kvs: HashMap<K, Vec<T>> = HashMap::new();
186 self.aggregate((keys, kvs), move |(mut keys, mut kvs), value| {
187 let k = f(&value);
188
189 keys.push(Reverse(k.clone()));
190 kvs.entry(k).or_default().push(value);
191
192 if keys.len() > n {
193 if let Some(Reverse(min_k)) = keys.pop() {
194 kvs.get_mut(&min_k).map(|v| v.pop());
195 }
196 }
197 (keys, kvs)
198 })
199 .map(|(top_keys, mut kvs)| {
200 top_keys
201 .into_sorted_vec()
202 .into_iter()
203 .fold(vec![], move |mut acc, Reverse(k)| {
204 let values = kvs.remove(&k).unwrap_or_default();
205 acc.extend(values);
206 acc
207 })
208 })
209 }
210}