fluxus_api/operators/
window_aggregator.rs

1use async_trait::async_trait;
2use fluxus_runtime::state::KeyedStateBackend;
3use fluxus_transformers::Operator;
4use fluxus_utils::{
5    models::{Record, StreamResult},
6    window::WindowConfig,
7};
8use std::marker::PhantomData;
9
10pub struct WindowAggregator<T, A, F> {
11    window_config: WindowConfig,
12    init: A,
13    f: F,
14    state: KeyedStateBackend<u64, A>,
15    _phantom: PhantomData<T>,
16}
17
18impl<T, A, F> WindowAggregator<T, A, F>
19where
20    A: Clone,
21    F: Fn(A, T) -> A,
22{
23    pub fn new(window_config: WindowConfig, init: A, f: F) -> Self {
24        Self {
25            window_config,
26            init,
27            f,
28            state: KeyedStateBackend::new(),
29            _phantom: PhantomData,
30        }
31    }
32
33    fn get_window_keys(&self, timestamp: i64) -> Vec<u64> {
34        self.window_config.window_type.get_window_keys(timestamp)
35    }
36}
37
38#[async_trait]
39impl<T, A, F> Operator<T, A> for WindowAggregator<T, A, F>
40where
41    T: Clone + Send + Sync + 'static,
42    A: Clone + Send + Sync + 'static,
43    F: Fn(A, T) -> A + Send + Sync,
44{
45    async fn process(&mut self, record: Record<T>) -> StreamResult<Vec<Record<A>>> {
46        let mut results = Vec::new();
47
48        for window_key in self.get_window_keys(record.timestamp) {
49            let current = self
50                .state
51                .get(&window_key)
52                .unwrap_or_else(|| self.init.clone());
53            let new_value = (self.f)(current, record.data.clone());
54            self.state.set(window_key, new_value.clone());
55
56            results.push(Record {
57                data: new_value,
58                timestamp: record.timestamp,
59            });
60        }
61
62        Ok(results)
63    }
64}