fluxus_api/operators/
window_aggregator.rs1use async_trait::async_trait;
2use fluxus_runtime::state::KeyedStateBackend;
3use fluxus_transformers::Operator;
4use fluxus_utils::{
5 models::{Record, StreamResult},
6 window::WindowConfig,
7};
8use std::marker::PhantomData;
9
10pub struct WindowAggregator<T, A, F> {
11 window_config: WindowConfig,
12 init: A,
13 f: F,
14 state: KeyedStateBackend<u64, A>,
15 _phantom: PhantomData<T>,
16}
17
18impl<T, A, F> WindowAggregator<T, A, F>
19where
20 A: Clone,
21 F: Fn(A, T) -> A,
22{
23 pub fn new(window_config: WindowConfig, init: A, f: F) -> Self {
24 Self {
25 window_config,
26 init,
27 f,
28 state: KeyedStateBackend::new(),
29 _phantom: PhantomData,
30 }
31 }
32
33 fn get_window_keys(&self, timestamp: i64) -> Vec<u64> {
34 self.window_config.window_type.get_window_keys(timestamp)
35 }
36}
37
38#[async_trait]
39impl<T, A, F> Operator<T, A> for WindowAggregator<T, A, F>
40where
41 T: Clone + Send + Sync + 'static,
42 A: Clone + Send + Sync + 'static,
43 F: Fn(A, T) -> A + Send + Sync,
44{
45 async fn process(&mut self, record: Record<T>) -> StreamResult<Vec<Record<A>>> {
46 let mut results = Vec::new();
47
48 for window_key in self.get_window_keys(record.timestamp) {
49 let current = self
50 .state
51 .get(&window_key)
52 .unwrap_or_else(|| self.init.clone());
53 let new_value = (self.f)(current, record.data.clone());
54 self.state.set(window_key, new_value.clone());
55
56 results.push(Record {
57 data: new_value,
58 timestamp: record.timestamp,
59 });
60 }
61
62 Ok(results)
63 }
64}