fluxus_transformers/operator/window_reduce.rs

1use async_trait::async_trait;
2use fluxus_utils::models::{Record, StreamResult};
3use fluxus_utils::time::current_time;
4use fluxus_utils::window::{WindowConfig, WindowType};
5use std::collections::HashMap;
6use std::marker::PhantomData;
7
8/// Built-in window reduce operator
9pub struct WindowReduceOperator<T, F>
10where
11    T: Clone,
12    F: Fn(T, T) -> T + Send + Sync,
13{
14    func: F,
15    window: WindowConfig,
16    buffer: HashMap<i64, Vec<Record<T>>>,
17    _phantom: PhantomData<T>,
18}
19
20impl<T, F> WindowReduceOperator<T, F>
21where
22    T: Clone,
23    F: Fn(T, T) -> T + Send + Sync,
24{
25    pub fn new(func: F, window: WindowConfig) -> Self {
26        Self {
27            func,
28            window,
29            buffer: HashMap::new(),
30            _phantom: PhantomData,
31        }
32    }
33
34    fn get_affected_windows(&self, timestamp: i64) -> Vec<i64> {
35        self.window.window_type.get_affected_windows(timestamp)
36    }
37
38    fn process_window(&self, records: &[Record<T>]) -> Option<Record<T>> {
39        records.first().map(|first| {
40            let result = records[1..].iter().fold(first.data.clone(), |acc, record| {
41                (self.func)(acc, record.data.clone())
42            });
43            Record {
44                data: result,
45                timestamp: first.timestamp,
46            }
47        })
48    }
49}
50
51#[async_trait]
52impl<T, F> super::Operator<T, T> for WindowReduceOperator<T, F>
53where
54    T: Clone + Send,
55    F: Fn(T, T) -> T + Send + Sync,
56{
57    async fn process(&mut self, record: Record<T>) -> StreamResult<Vec<Record<T>>> {
58        let mut results = Vec::new();
59
60        // Get all windows that this record belongs to
61        let window_keys = self.get_affected_windows(record.timestamp);
62
63        // Add the record to all relevant windows
64        for window_key in window_keys {
65            let records = self.buffer.entry(window_key).or_default();
66            records.push(record.clone());
67
68            // Process each affected window
69            let window_records = records.clone();
70            if let Some(result) = self.process_window(&window_records) {
71                results.push(result);
72            }
73        }
74
75        Ok(results)
76    }
77
78    async fn on_window_trigger(&mut self) -> StreamResult<Vec<Record<T>>> {
79        let mut results = Vec::new();
80        let now = current_time() as i64;
81
82        // Process and remove expired windows
83        let expired_keys: Vec<_> = self
84            .buffer
85            .keys()
86            .filter(|&&key| match &self.window.window_type {
87                WindowType::Tumbling(duration) => {
88                    key + duration.as_millis() as i64
89                        + self.window.allow_lateness.as_millis() as i64
90                        <= now
91                }
92                WindowType::Sliding(size, _) => {
93                    key + size.as_millis() as i64 + self.window.allow_lateness.as_millis() as i64
94                        <= now
95                }
96                WindowType::Session(gap) => {
97                    key + gap.as_millis() as i64 + self.window.allow_lateness.as_millis() as i64
98                        <= now
99                }
100                WindowType::Global => {
101                    // Global window doesn't expire based on time, so it's never considered expired here
102                    false
103                }
104            })
105            .cloned()
106            .collect();
107
108        for key in expired_keys {
109            if let Some(records) = self.buffer.remove(&key) {
110                if let Some(result) = self.process_window(&records) {
111                    results.push(result);
112                }
113            }
114        }
115
116        Ok(results)
117    }
118}