fluxus_api/stream/
windowed_stream.rs

1use fluxus_utils::window::WindowConfig;
2
3use crate::operators::WindowAggregator;
4use crate::stream::datastream::DataStream;
5
6/// Represents a windowed stream for aggregation operations
7pub struct WindowedStream<T> {
8    pub(crate) stream: DataStream<T>,
9    pub(crate) window_config: WindowConfig,
10}
11
12impl<T> WindowedStream<T>
13where
14    T: Clone + Send + Sync + 'static,
15{
16    /// Aggregate values in the window
17    pub fn aggregate<A, F>(self, init: A, f: F) -> DataStream<A>
18    where
19        A: Clone + Send + Sync + 'static,
20        F: Fn(A, T) -> A + Send + Sync + 'static,
21    {
22        let aggregator = WindowAggregator::new(self.window_config, init, f);
23        self.stream.transform(aggregator)
24    }
25}