fluxus_api/operators/
window_aggregator.rs1use async_trait::async_trait;
2use fluxus_core::{Operator, Record, StreamResult, WindowConfig};
3use fluxus_runtime::state::KeyedStateBackend;
4use std::marker::PhantomData;
5
6pub struct WindowAggregator<T, A, F> {
7 window_config: WindowConfig,
8 init: A,
9 f: F,
10 state: KeyedStateBackend<u64, A>,
11 _phantom: PhantomData<T>,
12}
13
14impl<T, A, F> WindowAggregator<T, A, F>
15where
16 A: Clone,
17 F: Fn(A, T) -> A,
18{
19 pub fn new(window_config: WindowConfig, init: A, f: F) -> Self {
20 Self {
21 window_config,
22 init,
23 f,
24 state: KeyedStateBackend::new(),
25 _phantom: PhantomData,
26 }
27 }
28
29 fn get_window_keys(&self, timestamp: i64) -> Vec<u64> {
30 self.window_config.window_type.get_window_keys(timestamp)
31 }
32}
33
34#[async_trait]
35impl<T, A, F> Operator<T, A> for WindowAggregator<T, A, F>
36where
37 T: Clone + Send + Sync + 'static,
38 A: Clone + Send + Sync + 'static,
39 F: Fn(A, T) -> A + Send + Sync,
40{
41 async fn init(&mut self) -> StreamResult<()> {
42 Ok(())
43 }
44
45 async fn process(&mut self, record: Record<T>) -> StreamResult<Vec<Record<A>>> {
46 let mut results = Vec::new();
47
48 for window_key in self.get_window_keys(record.timestamp) {
49 let current = self
50 .state
51 .get(&window_key)
52 .unwrap_or_else(|| self.init.clone());
53 let new_value = (self.f)(current, record.data.clone());
54 self.state.set(window_key, new_value.clone());
55
56 results.push(Record {
57 data: new_value,
58 timestamp: record.timestamp,
59 });
60 }
61
62 Ok(results)
63 }
64
65 async fn close(&mut self) -> StreamResult<()> {
66 Ok(())
67 }
68}