fluxus_api/stream/
windowed_stream.rs

1use crate::operators::WindowAggregator;
2use crate::stream::datastream::DataStream;
3use fluxus_core::WindowConfig;
4
5/// Represents a windowed stream for aggregation operations
6pub struct WindowedStream<T> {
7    pub(crate) stream: DataStream<T>,
8    pub(crate) window_config: WindowConfig,
9}
10
11impl<T> WindowedStream<T>
12where
13    T: Clone + Send + Sync + 'static,
14{
15    /// Aggregate values in the window
16    pub fn aggregate<A, F>(self, init: A, f: F) -> DataStream<A>
17    where
18        A: Clone + Send + Sync + 'static,
19        F: Fn(A, T) -> A + Send + Sync + 'static,
20    {
21        let aggregator = WindowAggregator::new(self.window_config, init, f);
22        self.stream.transform(aggregator)
23    }
24}