fluxus_api/stream/windowed_stream.rs

1use std::cmp::{Ordering, Reverse};
2use std::collections::{BinaryHeap, HashMap, HashSet, VecDeque};
3use std::hash::Hash;
4
5use fluxus_transformers::operator::{WindowAllOperator, WindowAnyOperator};
6use fluxus_utils::window::WindowConfig;
7
8use crate::operators::{
9    SortOrder, WindowAggregator, WindowSkipper, WindowSorter, WindowTimestampSorter,
10};
11use crate::stream::datastream::DataStream;
12
13/// Represents a windowed stream for aggregation operations
14pub struct WindowedStream<T> {
15    pub(crate) stream: DataStream<T>,
16    pub(crate) window_config: WindowConfig,
17}
18
19impl<T> WindowedStream<T>
20where
21    T: Clone + Send + Sync + 'static,
22{
23    /// Aggregate values in the window
24    pub fn aggregate<A, F>(self, init: A, f: F) -> DataStream<A>
25    where
26        A: Clone + Send + Sync + 'static,
27        F: Fn(A, T) -> A + Send + Sync + 'static,
28    {
29        let aggregator = WindowAggregator::new(self.window_config, init, f);
30        self.stream.transform(aggregator)
31    }
32
33    pub fn any<F>(self, f: F) -> DataStream<bool>
34    where
35        F: Fn(&T) -> bool + Send + Sync + 'static,
36    {
37        let anyer = WindowAnyOperator::new(f, self.window_config);
38        self.stream.transform(anyer)
39    }
40
41    pub fn all<F>(self, f: F) -> DataStream<bool>
42    where
43        F: Fn(&T) -> bool + Send + Sync + 'static,
44    {
45        let aller = WindowAllOperator::new(f, self.window_config);
46        self.stream.transform(aller)
47    }
48
49    /// Limit the number of values in the window
50    pub fn limit(self, n: usize) -> DataStream<Vec<T>> {
51        let limiter = WindowAggregator::new(self.window_config, vec![], move |mut acc, value| {
52            if acc.len() < n {
53                acc.push(value);
54            }
55            acc
56        });
57        self.stream.transform(limiter)
58    }
59
60    /// Retain last n values in the window
61    pub fn tail(self, n: usize) -> DataStream<Vec<T>> {
62        let init = VecDeque::with_capacity(n);
63        let limiter = WindowAggregator::new(self.window_config, init, move |mut acc, value| {
64            acc.push_back(value);
65            if acc.len() > n {
66                acc.pop_front();
67            }
68            acc
69        });
70        self.stream
71            .transform(limiter)
72            .map(|d| d.into_iter().collect())
73    }
74
75    /// Sort values in the window
76    pub fn sort_by<F>(self, f: F) -> DataStream<Vec<T>>
77    where
78        F: FnMut(&T, &T) -> Ordering + Send + Sync + 'static,
79    {
80        let sorter = WindowSorter::new(self.window_config, f);
81        self.stream.transform(sorter)
82    }
83
84    /// Sort values in the window by timestamp
85    pub fn sort_by_ts(self, order: SortOrder) -> DataStream<Vec<T>> {
86        let sorter = WindowTimestampSorter::new(self.window_config, order);
87        self.stream.transform(sorter)
88    }
89
90    /// Sort values in the window by timestamp in ascending order
91    pub fn sort_by_ts_asc(self) -> DataStream<Vec<T>> {
92        let sorter = WindowTimestampSorter::new(self.window_config, SortOrder::Asc);
93        self.stream.transform(sorter)
94    }
95
96    /// Sort values in the window by timestamp in descending order
97    pub fn sort_by_ts_desc(self) -> DataStream<Vec<T>> {
98        let sorter = WindowTimestampSorter::new(self.window_config, SortOrder::Desc);
99        self.stream.transform(sorter)
100    }
101
102    /// Skip
103    pub fn skip(self, n: usize) -> DataStream<Vec<T>> {
104        let skipper = WindowSkipper::new(self.window_config, n);
105        self.stream.transform(skipper)
106    }
107}
108
109impl<T> WindowedStream<T>
110where
111    T: Ord + Clone + Send + Sync + 'static,
112{
113    /// Sort values in specified order
114    pub fn sort(self, ord: SortOrder) -> DataStream<Vec<T>> {
115        self.sort_by(move |v1, v2| match ord {
116            SortOrder::Asc => v1.cmp(v2),
117            SortOrder::Desc => v2.cmp(v1),
118        })
119    }
120
121    /// Get the top k values in the window, the values are sorted in descending order
122    pub fn top_k(self, k: usize) -> DataStream<Vec<T>> {
123        let init = BinaryHeap::<Reverse<T>>::new();
124        let res = self.aggregate(init, move |mut heap, v| {
125            heap.push(Reverse(v));
126            if heap.len() > k {
127                heap.pop();
128            }
129            heap
130        });
131        res.map(|heap| {
132            heap.into_sorted_vec()
133                .into_iter()
134                .map(|Reverse(v)| v)
135                .collect()
136        })
137    }
138}
139
140impl<T> WindowedStream<T>
141where
142    T: Eq + Hash + Clone + Send + Sync + 'static,
143{
144    /// Distinct values
145    pub fn distinct(self) -> DataStream<HashSet<T>> {
146        self.aggregate(HashSet::new(), |mut set, value| {
147            set.insert(value);
148            set
149        })
150    }
151}
152
153impl<T> WindowedStream<T>
154where
155    T: Clone + Send + Sync + 'static,
156{
157    /// Distinct values by key. When the same key is encountered, the first occurrence of the value is retained
158    pub fn distinct_by_key<F, K>(self, f: F) -> DataStream<Vec<T>>
159    where
160        F: Fn(&T) -> K + Sync + Send + 'static,
161        K: Eq + Hash + Clone + Sync + Send + 'static,
162    {
163        let keys = HashSet::new();
164        let data = vec![];
165        self.aggregate((keys, data), move |(mut keys, mut data), value| {
166            let k = f(&value);
167            if !keys.contains(&k) {
168                keys.insert(k);
169                data.push(value);
170            }
171            (keys, data)
172        })
173        .map(|(_, data)| data)
174    }
175
176    /// Get top k values by key. The values are sorted by key in descending order
177    pub fn top_k_by_key<F, K>(self, n: usize, f: F) -> DataStream<Vec<T>>
178    where
179        F: Fn(&T) -> K + Sync + Send + 'static,
180        K: Ord + Eq + Hash + Clone + Sync + Send + 'static,
181    {
182        // Store the top k keys
183        let keys = BinaryHeap::<Reverse<K>>::new();
184        // Store the values by key
185        let kvs: HashMap<K, Vec<T>> = HashMap::new();
186        self.aggregate((keys, kvs), move |(mut keys, mut kvs), value| {
187            let k = f(&value);
188
189            keys.push(Reverse(k.clone()));
190            kvs.entry(k).or_default().push(value);
191
192            if keys.len() > n {
193                if let Some(Reverse(min_k)) = keys.pop() {
194                    kvs.get_mut(&min_k).map(|v| v.pop());
195                }
196            }
197            (keys, kvs)
198        })
199        .map(|(top_keys, mut kvs)| {
200            top_keys
201                .into_sorted_vec()
202                .into_iter()
203                .fold(vec![], move |mut acc, Reverse(k)| {
204                    let values = kvs.remove(&k).unwrap_or_default();
205                    acc.extend(values);
206                    acc
207                })
208        })
209    }
210}