fluxus_transformers/operator/
window_reduce.rs1use async_trait::async_trait;
2use fluxus_utils::models::{Record, StreamResult};
3use fluxus_utils::time::current_time;
4use fluxus_utils::window::{WindowConfig, WindowType};
5use std::collections::HashMap;
6use std::marker::PhantomData;
7
8pub struct WindowReduceOperator<T, F>
10where
11 T: Clone,
12 F: Fn(T, T) -> T + Send + Sync,
13{
14 func: F,
15 window: WindowConfig,
16 buffer: HashMap<i64, Vec<Record<T>>>,
17 _phantom: PhantomData<T>,
18}
19
20impl<T, F> WindowReduceOperator<T, F>
21where
22 T: Clone,
23 F: Fn(T, T) -> T + Send + Sync,
24{
25 pub fn new(func: F, window: WindowConfig) -> Self {
26 Self {
27 func,
28 window,
29 buffer: HashMap::new(),
30 _phantom: PhantomData,
31 }
32 }
33
34 fn get_affected_windows(&self, timestamp: i64) -> Vec<i64> {
35 self.window.window_type.get_affected_windows(timestamp)
36 }
37
38 fn process_window(&self, records: &[Record<T>]) -> Option<Record<T>> {
39 records.first().map(|first| {
40 let result = records[1..].iter().fold(first.data.clone(), |acc, record| {
41 (self.func)(acc, record.data.clone())
42 });
43 Record {
44 data: result,
45 timestamp: first.timestamp,
46 }
47 })
48 }
49}
50
51#[async_trait]
52impl<T, F> super::Operator<T, T> for WindowReduceOperator<T, F>
53where
54 T: Clone + Send,
55 F: Fn(T, T) -> T + Send + Sync,
56{
57 async fn process(&mut self, record: Record<T>) -> StreamResult<Vec<Record<T>>> {
58 let mut results = Vec::new();
59
60 let window_keys = self.get_affected_windows(record.timestamp);
62
63 for window_key in window_keys {
65 let records = self.buffer.entry(window_key).or_default();
66 records.push(record.clone());
67
68 let window_records = records.clone();
70 if let Some(result) = self.process_window(&window_records) {
71 results.push(result);
72 }
73 }
74
75 Ok(results)
76 }
77
78 async fn on_window_trigger(&mut self) -> StreamResult<Vec<Record<T>>> {
79 let mut results = Vec::new();
80 let now = current_time() as i64;
81
82 let expired_keys: Vec<_> = self
84 .buffer
85 .keys()
86 .filter(|&&key| match &self.window.window_type {
87 WindowType::Tumbling(duration) => {
88 key + duration.as_millis() as i64
89 + self.window.allow_lateness.as_millis() as i64
90 <= now
91 }
92 WindowType::Sliding(size, _) => {
93 key + size.as_millis() as i64 + self.window.allow_lateness.as_millis() as i64
94 <= now
95 }
96 WindowType::Session(gap) => {
97 key + gap.as_millis() as i64 + self.window.allow_lateness.as_millis() as i64
98 <= now
99 }
100 WindowType::Global => {
101 false
103 }
104 })
105 .cloned()
106 .collect();
107
108 for key in expired_keys {
109 if let Some(records) = self.buffer.remove(&key) {
110 if let Some(result) = self.process_window(&records) {
111 results.push(result);
112 }
113 }
114 }
115
116 Ok(results)
117 }
118}