// fluxus_api/stream/windowed_stream.rs
use crate::operators::WindowAggregator;
use crate::stream::datastream::DataStream;
use fluxus_core::WindowConfig;

5pub struct WindowedStream<T> {
7 pub(crate) stream: DataStream<T>,
8 pub(crate) window_config: WindowConfig,
9}
10
11impl<T> WindowedStream<T>
12where
13 T: Clone + Send + Sync + 'static,
14{
15 pub fn aggregate<A, F>(self, init: A, f: F) -> DataStream<A>
17 where
18 A: Clone + Send + Sync + 'static,
19 F: Fn(A, T) -> A + Send + Sync + 'static,
20 {
21 let aggregator = WindowAggregator::new(self.window_config, init, f);
22 self.stream.transform(aggregator)
23 }
24}