fluxus_api/operators/
window_aggregator.rs

1use async_trait::async_trait;
2use fluxus_core::{Operator, Record, StreamResult, WindowConfig};
3use fluxus_runtime::state::KeyedStateBackend;
4use std::marker::PhantomData;
5
6pub struct WindowAggregator<T, A, F> {
7    window_config: WindowConfig,
8    init: A,
9    f: F,
10    state: KeyedStateBackend<u64, A>,
11    _phantom: PhantomData<T>,
12}
13
14impl<T, A, F> WindowAggregator<T, A, F>
15where
16    A: Clone,
17    F: Fn(A, T) -> A,
18{
19    pub fn new(window_config: WindowConfig, init: A, f: F) -> Self {
20        Self {
21            window_config,
22            init,
23            f,
24            state: KeyedStateBackend::new(),
25            _phantom: PhantomData,
26        }
27    }
28
29    fn get_window_keys(&self, timestamp: i64) -> Vec<u64> {
30        self.window_config.window_type.get_window_keys(timestamp)
31    }
32}
33
34#[async_trait]
35impl<T, A, F> Operator<T, A> for WindowAggregator<T, A, F>
36where
37    T: Clone + Send + Sync + 'static,
38    A: Clone + Send + Sync + 'static,
39    F: Fn(A, T) -> A + Send + Sync,
40{
41    async fn init(&mut self) -> StreamResult<()> {
42        Ok(())
43    }
44
45    async fn process(&mut self, record: Record<T>) -> StreamResult<Vec<Record<A>>> {
46        let mut results = Vec::new();
47
48        for window_key in self.get_window_keys(record.timestamp) {
49            let current = self
50                .state
51                .get(&window_key)
52                .unwrap_or_else(|| self.init.clone());
53            let new_value = (self.f)(current, record.data.clone());
54            self.state.set(window_key, new_value.clone());
55
56            results.push(Record {
57                data: new_value,
58                timestamp: record.timestamp,
59            });
60        }
61
62        Ok(results)
63    }
64
65    async fn close(&mut self) -> StreamResult<()> {
66        Ok(())
67    }
68}