rust_rule_engine/streaming/
window.rs1use crate::streaming::event::StreamEvent;
6use std::collections::VecDeque;
7use std::time::Duration;
8
9#[derive(Debug, Clone, PartialEq)]
11pub enum WindowType {
12 Sliding,
14 Tumbling,
16 Session { timeout: Duration },
18}
19
20#[derive(Debug)]
22pub struct TimeWindow {
23 pub window_type: WindowType,
25 pub duration: Duration,
27 events: VecDeque<StreamEvent>,
29 pub start_time: u64,
31 pub end_time: u64,
33 max_events: usize,
35}
36
37impl TimeWindow {
38 pub fn new(
40 window_type: WindowType,
41 duration: Duration,
42 start_time: u64,
43 max_events: usize,
44 ) -> Self {
45 let end_time = start_time + duration.as_millis() as u64;
46
47 Self {
48 window_type,
49 duration,
50 events: VecDeque::new(),
51 start_time,
52 end_time,
53 max_events,
54 }
55 }
56
57 pub fn add_event(&mut self, event: StreamEvent) -> bool {
59 if self.contains_timestamp(event.metadata.timestamp) {
60 self.events.push_back(event);
61
62 while self.events.len() > self.max_events {
64 self.events.pop_front();
65 }
66
67 true
68 } else {
69 false
70 }
71 }
72
73 pub fn contains_timestamp(&self, timestamp: u64) -> bool {
75 timestamp >= self.start_time && timestamp < self.end_time
76 }
77
78 pub fn events(&self) -> &VecDeque<StreamEvent> {
80 &self.events
81 }
82
83 pub fn count(&self) -> usize {
85 self.events.len()
86 }
87
88 pub fn is_expired(&self, current_time: u64) -> bool {
90 current_time >= self.end_time
91 }
92
93 pub fn duration_ms(&self) -> u64 {
95 self.duration.as_millis() as u64
96 }
97
98 pub fn clear(&mut self) {
100 self.events.clear();
101 }
102
103 pub fn events_by_type(&self, event_type: &str) -> Vec<&StreamEvent> {
105 self.events
106 .iter()
107 .filter(|e| e.event_type == event_type)
108 .collect()
109 }
110
111 pub fn sum(&self, field: &str) -> f64 {
113 self.events
114 .iter()
115 .filter_map(|e| e.get_numeric(field))
116 .sum()
117 }
118
119 pub fn average(&self, field: &str) -> Option<f64> {
121 let values: Vec<f64> = self
122 .events
123 .iter()
124 .filter_map(|e| e.get_numeric(field))
125 .collect();
126
127 if values.is_empty() {
128 None
129 } else {
130 Some(values.iter().sum::<f64>() / values.len() as f64)
131 }
132 }
133
134 pub fn min(&self, field: &str) -> Option<f64> {
136 self.events
137 .iter()
138 .filter_map(|e| e.get_numeric(field))
139 .fold(None, |acc, x| match acc {
140 None => Some(x),
141 Some(min) => Some(min.min(x)),
142 })
143 }
144
145 pub fn max(&self, field: &str) -> Option<f64> {
147 self.events
148 .iter()
149 .filter_map(|e| e.get_numeric(field))
150 .fold(None, |acc, x| match acc {
151 None => Some(x),
152 Some(max) => Some(max.max(x)),
153 })
154 }
155
156 pub fn latest_timestamp(&self) -> Option<u64> {
158 self.events.iter().map(|e| e.metadata.timestamp).max()
159 }
160
161 pub fn events_in_range(&self, start: u64, end: u64) -> Vec<&StreamEvent> {
163 self.events
164 .iter()
165 .filter(|e| e.metadata.timestamp >= start && e.metadata.timestamp < end)
166 .collect()
167 }
168}
169
170#[derive(Debug)]
172pub struct WindowManager {
173 windows: Vec<TimeWindow>,
175 window_type: WindowType,
177 duration: Duration,
179 max_events_per_window: usize,
181 max_windows: usize,
183}
184
185impl WindowManager {
186 pub fn new(
188 window_type: WindowType,
189 duration: Duration,
190 max_events_per_window: usize,
191 max_windows: usize,
192 ) -> Self {
193 Self {
194 windows: Vec::new(),
195 window_type,
196 duration,
197 max_events_per_window,
198 max_windows,
199 }
200 }
201
202 pub fn process_event(&mut self, event: StreamEvent) {
204 let event_time = event.metadata.timestamp;
205
206 let mut added = false;
208
209 for window in &mut self.windows {
210 if window.add_event(event.clone()) {
211 added = true;
212 break;
213 }
214 }
215
216 if !added {
217 let window_start = self.calculate_window_start(event_time);
219 let mut new_window = TimeWindow::new(
220 self.window_type.clone(),
221 self.duration,
222 window_start,
223 self.max_events_per_window,
224 );
225
226 new_window.add_event(event);
227 self.windows.push(new_window);
228 }
229
230 self.cleanup_expired_windows(event_time);
232
233 while self.windows.len() > self.max_windows {
235 self.windows.remove(0);
236 }
237
238 self.windows.sort_by_key(|w| w.start_time);
240 }
241
242 fn calculate_window_start(&self, event_time: u64) -> u64 {
244 match self.window_type {
245 WindowType::Tumbling => {
246 let window_ms = self.duration.as_millis() as u64;
247 (event_time / window_ms) * window_ms
248 }
249 WindowType::Sliding | WindowType::Session { .. } => event_time,
250 }
251 }
252
253 fn cleanup_expired_windows(&mut self, current_time: u64) {
255 self.windows
256 .retain(|window| !window.is_expired(current_time));
257 }
258
259 pub fn active_windows(&self) -> &[TimeWindow] {
261 &self.windows
262 }
263
264 pub fn latest_window(&self) -> Option<&TimeWindow> {
266 self.windows.last()
267 }
268
269 pub fn total_event_count(&self) -> usize {
271 self.windows.iter().map(|w| w.count()).sum()
272 }
273
274 pub fn windows_with_event_type(&self, event_type: &str) -> Vec<&TimeWindow> {
276 self.windows
277 .iter()
278 .filter(|w| w.events().iter().any(|e| e.event_type == event_type))
279 .collect()
280 }
281
282 pub fn aggregate_across_windows<F>(&self, aggregator: F) -> f64
284 where
285 F: Fn(&TimeWindow) -> f64,
286 {
287 self.windows.iter().map(aggregator).sum()
288 }
289
290 pub fn get_statistics(&self) -> WindowStatistics {
292 WindowStatistics {
293 total_windows: self.windows.len(),
294 total_events: self.total_event_count(),
295 oldest_window_start: self.windows.first().map(|w| w.start_time),
296 newest_window_start: self.windows.last().map(|w| w.start_time),
297 average_events_per_window: if self.windows.is_empty() {
298 0.0
299 } else {
300 self.total_event_count() as f64 / self.windows.len() as f64
301 },
302 }
303 }
304}
305
306#[derive(Debug, Clone)]
308pub struct WindowStatistics {
309 pub total_windows: usize,
311 pub total_events: usize,
313 pub oldest_window_start: Option<u64>,
315 pub newest_window_start: Option<u64>,
317 pub average_events_per_window: f64,
319}
320
321#[cfg(test)]
322mod tests {
323 use super::*;
324 use crate::types::Value;
325 use std::collections::HashMap;
326
327 #[test]
328 fn test_time_window_creation() {
329 let window = TimeWindow::new(WindowType::Sliding, Duration::from_secs(60), 1000, 100);
330
331 assert_eq!(window.start_time, 1000);
332 assert_eq!(window.end_time, 61000);
333 assert_eq!(window.count(), 0);
334 }
335
336 #[test]
337 fn test_window_event_addition() {
338 let mut window = TimeWindow::new(WindowType::Sliding, Duration::from_secs(60), 1000, 100);
339
340 let mut data = HashMap::new();
341 data.insert("value".to_string(), Value::Number(10.0));
342
343 let event = StreamEvent::with_timestamp("TestEvent", data, "test", 30000);
344
345 assert!(window.add_event(event));
346 assert_eq!(window.count(), 1);
347 }
348
349 #[test]
350 fn test_window_aggregations() {
351 let mut window = TimeWindow::new(WindowType::Sliding, Duration::from_secs(60), 1000, 100);
352
353 for i in 0..5 {
355 let mut data = HashMap::new();
356 data.insert("value".to_string(), Value::Number(i as f64));
357
358 let event = StreamEvent::with_timestamp("TestEvent", data, "test", 30000 + i);
359 window.add_event(event);
360 }
361
362 assert_eq!(window.sum("value"), 10.0); assert_eq!(window.average("value"), Some(2.0));
364 assert_eq!(window.min("value"), Some(0.0));
365 assert_eq!(window.max("value"), Some(4.0));
366 }
367
368 #[test]
369 fn test_window_manager() {
370 let mut manager = WindowManager::new(WindowType::Sliding, Duration::from_secs(60), 100, 10);
371
372 let mut data = HashMap::new();
373 data.insert("value".to_string(), Value::Number(1.0));
374
375 let event = StreamEvent::with_timestamp("TestEvent", data, "test", 30000);
376 manager.process_event(event);
377
378 assert_eq!(manager.active_windows().len(), 1);
379 assert_eq!(manager.total_event_count(), 1);
380 }
381}