fluxus_api/operators/
window_aggregator.rs1use async_trait::async_trait;
2use fluxus_runtime::state::KeyedStateBackend;
3use fluxus_transformers::Operator;
4use fluxus_utils::{
5 models::{Record, StreamResult},
6 window::WindowConfig,
7};
8use std::marker::PhantomData;
9
10pub struct WindowAggregator<T, A, F> {
11 window_config: WindowConfig,
12 init: A,
13 f: F,
14 state: KeyedStateBackend<u64, A>,
15 _phantom: PhantomData<T>,
16}
17
18impl<T, A, F> WindowAggregator<T, A, F>
19where
20 A: Clone,
21 F: Fn(A, T) -> A,
22{
23 pub fn new(window_config: WindowConfig, init: A, f: F) -> Self {
24 Self {
25 window_config,
26 init,
27 f,
28 state: KeyedStateBackend::new(),
29 _phantom: PhantomData,
30 }
31 }
32
33 fn get_window_keys(&self, timestamp: i64) -> Vec<u64> {
34 self.window_config.window_type.get_window_keys(timestamp)
35 }
36}
37
38#[async_trait]
39impl<T, A, F> Operator<T, A> for WindowAggregator<T, A, F>
40where
41 T: Clone + Send + Sync + 'static,
42 A: Clone + Send + Sync + 'static,
43 F: Fn(A, T) -> A + Send + Sync,
44{
45 async fn init(&mut self) -> StreamResult<()> {
46 Ok(())
47 }
48
49 async fn process(&mut self, record: Record<T>) -> StreamResult<Vec<Record<A>>> {
50 let mut results = Vec::new();
51
52 for window_key in self.get_window_keys(record.timestamp) {
53 let current = self
54 .state
55 .get(&window_key)
56 .unwrap_or_else(|| self.init.clone());
57 let new_value = (self.f)(current, record.data.clone());
58 self.state.set(window_key, new_value.clone());
59
60 results.push(Record {
61 data: new_value,
62 timestamp: record.timestamp,
63 });
64 }
65
66 Ok(results)
67 }
68
69 async fn close(&mut self) -> StreamResult<()> {
70 Ok(())
71 }
72}