// fluxus_api/stream/windowed_stream.rs
use fluxus_utils::window::WindowConfig;

use crate::operators::WindowAggregator;
use crate::stream::datastream::DataStream;

6pub struct WindowedStream<T> {
8 pub(crate) stream: DataStream<T>,
9 pub(crate) window_config: WindowConfig,
10}
11
12impl<T> WindowedStream<T>
13where
14 T: Clone + Send + Sync + 'static,
15{
16 pub fn aggregate<A, F>(self, init: A, f: F) -> DataStream<A>
18 where
19 A: Clone + Send + Sync + 'static,
20 F: Fn(A, T) -> A + Send + Sync + 'static,
21 {
22 let aggregator = WindowAggregator::new(self.window_config, init, f);
23 self.stream.transform(aggregator)
24 }
25}