rust_rule_engine/streaming/
window.rs

1//! Time Window Management for Stream Processing
2//!
3//! Provides time-based windows for event aggregation and analysis.
4
5use crate::streaming::event::StreamEvent;
6use std::collections::VecDeque;
7use std::time::Duration;
8
/// Strategy used to group stream events into time windows.
#[derive(Debug, Clone, PartialEq)]
pub enum WindowType {
    /// Overlapping window that moves forward with the stream.
    Sliding,
    /// Fixed-length, non-overlapping window covering consecutive intervals.
    Tumbling,
    /// Window delimited by periods of inactivity.
    ///
    /// NOTE(review): the `timeout` value is not read by the window-start logic
    /// in this module, which anchors session windows like sliding ones.
    Session {
        /// Inactivity gap intended to close a session.
        timeout: Duration,
    },
}
19
20/// Time-based window for event processing
21#[derive(Debug)]
22pub struct TimeWindow {
23    /// Window type
24    pub window_type: WindowType,
25    /// Window duration
26    pub duration: Duration,
27    /// Events in this window
28    events: VecDeque<StreamEvent>,
29    /// Window start time (milliseconds since epoch)
30    pub start_time: u64,
31    /// Window end time (milliseconds since epoch)
32    pub end_time: u64,
33    /// Maximum number of events to retain
34    max_events: usize,
35}
36
37impl TimeWindow {
38    /// Create a new time window
39    pub fn new(
40        window_type: WindowType,
41        duration: Duration,
42        start_time: u64,
43        max_events: usize,
44    ) -> Self {
45        let end_time = start_time + duration.as_millis() as u64;
46
47        Self {
48            window_type,
49            duration,
50            events: VecDeque::new(),
51            start_time,
52            end_time,
53            max_events,
54        }
55    }
56
57    /// Add event to window if it fits
58    pub fn add_event(&mut self, event: StreamEvent) -> bool {
59        if self.contains_timestamp(event.metadata.timestamp) {
60            self.events.push_back(event);
61
62            // Keep window size under limit
63            while self.events.len() > self.max_events {
64                self.events.pop_front();
65            }
66
67            true
68        } else {
69            false
70        }
71    }
72
73    /// Check if timestamp falls within this window
74    pub fn contains_timestamp(&self, timestamp: u64) -> bool {
75        timestamp >= self.start_time && timestamp < self.end_time
76    }
77
78    /// Get all events in window
79    pub fn events(&self) -> &VecDeque<StreamEvent> {
80        &self.events
81    }
82
83    /// Get event count
84    pub fn count(&self) -> usize {
85        self.events.len()
86    }
87
88    /// Check if window is expired
89    pub fn is_expired(&self, current_time: u64) -> bool {
90        current_time >= self.end_time
91    }
92
93    /// Get window duration in milliseconds
94    pub fn duration_ms(&self) -> u64 {
95        self.duration.as_millis() as u64
96    }
97
98    /// Clear all events from window
99    pub fn clear(&mut self) {
100        self.events.clear();
101    }
102
103    /// Get events filtered by type
104    pub fn events_by_type(&self, event_type: &str) -> Vec<&StreamEvent> {
105        self.events
106            .iter()
107            .filter(|e| e.event_type == event_type)
108            .collect()
109    }
110
111    /// Calculate sum of numeric field across events
112    pub fn sum(&self, field: &str) -> f64 {
113        self.events
114            .iter()
115            .filter_map(|e| e.get_numeric(field))
116            .sum()
117    }
118
119    /// Calculate average of numeric field across events
120    pub fn average(&self, field: &str) -> Option<f64> {
121        let values: Vec<f64> = self
122            .events
123            .iter()
124            .filter_map(|e| e.get_numeric(field))
125            .collect();
126
127        if values.is_empty() {
128            None
129        } else {
130            Some(values.iter().sum::<f64>() / values.len() as f64)
131        }
132    }
133
134    /// Find minimum value of numeric field
135    pub fn min(&self, field: &str) -> Option<f64> {
136        self.events
137            .iter()
138            .filter_map(|e| e.get_numeric(field))
139            .fold(None, |acc, x| match acc {
140                None => Some(x),
141                Some(min) => Some(min.min(x)),
142            })
143    }
144
145    /// Find maximum value of numeric field
146    pub fn max(&self, field: &str) -> Option<f64> {
147        self.events
148            .iter()
149            .filter_map(|e| e.get_numeric(field))
150            .fold(None, |acc, x| match acc {
151                None => Some(x),
152                Some(max) => Some(max.max(x)),
153            })
154    }
155
156    /// Get the latest event timestamp
157    pub fn latest_timestamp(&self) -> Option<u64> {
158        self.events.iter().map(|e| e.metadata.timestamp).max()
159    }
160
161    /// Get events within a sub-window
162    pub fn events_in_range(&self, start: u64, end: u64) -> Vec<&StreamEvent> {
163        self.events
164            .iter()
165            .filter(|e| e.metadata.timestamp >= start && e.metadata.timestamp < end)
166            .collect()
167    }
168}
169
170/// Manages multiple time windows for stream processing
171#[derive(Debug)]
172pub struct WindowManager {
173    /// Active windows
174    windows: Vec<TimeWindow>,
175    /// Window configuration
176    window_type: WindowType,
177    /// Window duration
178    duration: Duration,
179    /// Maximum events per window
180    max_events_per_window: usize,
181    /// Maximum number of windows to keep
182    max_windows: usize,
183}
184
185impl WindowManager {
186    /// Create a new window manager
187    pub fn new(
188        window_type: WindowType,
189        duration: Duration,
190        max_events_per_window: usize,
191        max_windows: usize,
192    ) -> Self {
193        Self {
194            windows: Vec::new(),
195            window_type,
196            duration,
197            max_events_per_window,
198            max_windows,
199        }
200    }
201
202    /// Process a new event through the window system
203    pub fn process_event(&mut self, event: StreamEvent) {
204        let event_time = event.metadata.timestamp;
205
206        // Find or create appropriate window
207        let mut added = false;
208
209        for window in &mut self.windows {
210            if window.add_event(event.clone()) {
211                added = true;
212                break;
213            }
214        }
215
216        if !added {
217            // Create new window for this event
218            let window_start = self.calculate_window_start(event_time);
219            let mut new_window = TimeWindow::new(
220                self.window_type.clone(),
221                self.duration,
222                window_start,
223                self.max_events_per_window,
224            );
225
226            new_window.add_event(event);
227            self.windows.push(new_window);
228        }
229
230        // Clean up expired windows
231        self.cleanup_expired_windows(event_time);
232
233        // Limit total number of windows
234        while self.windows.len() > self.max_windows {
235            self.windows.remove(0);
236        }
237
238        // Sort windows by start time
239        self.windows.sort_by_key(|w| w.start_time);
240    }
241
242    /// Calculate window start time based on window type
243    fn calculate_window_start(&self, event_time: u64) -> u64 {
244        match self.window_type {
245            WindowType::Tumbling => {
246                let window_ms = self.duration.as_millis() as u64;
247                (event_time / window_ms) * window_ms
248            }
249            WindowType::Sliding | WindowType::Session { .. } => event_time,
250        }
251    }
252
253    /// Remove expired windows
254    fn cleanup_expired_windows(&mut self, current_time: u64) {
255        self.windows
256            .retain(|window| !window.is_expired(current_time));
257    }
258
259    /// Get all active windows
260    pub fn active_windows(&self) -> &[TimeWindow] {
261        &self.windows
262    }
263
264    /// Get the latest window
265    pub fn latest_window(&self) -> Option<&TimeWindow> {
266        self.windows.last()
267    }
268
269    /// Get total event count across all windows
270    pub fn total_event_count(&self) -> usize {
271        self.windows.iter().map(|w| w.count()).sum()
272    }
273
274    /// Get windows that contain events of a specific type
275    pub fn windows_with_event_type(&self, event_type: &str) -> Vec<&TimeWindow> {
276        self.windows
277            .iter()
278            .filter(|w| w.events().iter().any(|e| e.event_type == event_type))
279            .collect()
280    }
281
282    /// Calculate aggregate across all windows
283    pub fn aggregate_across_windows<F>(&self, aggregator: F) -> f64
284    where
285        F: Fn(&TimeWindow) -> f64,
286    {
287        self.windows.iter().map(aggregator).sum()
288    }
289
290    /// Get window statistics
291    pub fn get_statistics(&self) -> WindowStatistics {
292        WindowStatistics {
293            total_windows: self.windows.len(),
294            total_events: self.total_event_count(),
295            oldest_window_start: self.windows.first().map(|w| w.start_time),
296            newest_window_start: self.windows.last().map(|w| w.start_time),
297            average_events_per_window: if self.windows.is_empty() {
298                0.0
299            } else {
300                self.total_event_count() as f64 / self.windows.len() as f64
301            },
302        }
303    }
304}
305
/// Point-in-time summary of a window manager's state.
#[derive(Debug, Clone)]
pub struct WindowStatistics {
    /// How many windows are currently active
    pub total_windows: usize,
    /// Number of events held across every active window
    pub total_events: usize,
    /// `start_time` of the first (oldest) window, if any
    pub oldest_window_start: Option<u64>,
    /// `start_time` of the last (newest) window, if any
    pub newest_window_start: Option<u64>,
    /// Mean number of events per active window (`0.0` with no windows)
    pub average_events_per_window: f64,
}
320
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::Value;
    use std::collections::HashMap;

    /// Event payload containing a single numeric `value` entry.
    fn payload(value: f64) -> HashMap<String, Value> {
        let mut fields = HashMap::new();
        fields.insert("value".to_string(), Value::Number(value));
        fields
    }

    /// One-minute sliding window opening at t = 1000 ms, capped at 100 events.
    fn minute_window() -> TimeWindow {
        TimeWindow::new(WindowType::Sliding, Duration::from_secs(60), 1000, 100)
    }

    #[test]
    fn test_time_window_creation() {
        let fresh = minute_window();

        assert_eq!(fresh.start_time, 1000);
        assert_eq!(fresh.end_time, 61000);
        assert_eq!(fresh.count(), 0);
    }

    #[test]
    fn test_window_event_addition() {
        let mut target = minute_window();
        let accepted =
            target.add_event(StreamEvent::with_timestamp("TestEvent", payload(10.0), "test", 30000));

        assert!(accepted);
        assert_eq!(target.count(), 1);
    }

    #[test]
    fn test_window_aggregations() {
        let mut target = minute_window();

        // Values 0..=4 at consecutive millisecond offsets inside the window.
        for offset in 0..5 {
            target.add_event(StreamEvent::with_timestamp(
                "TestEvent",
                payload(offset as f64),
                "test",
                30000 + offset,
            ));
        }

        assert_eq!(target.sum("value"), 10.0); // 0+1+2+3+4
        assert_eq!(target.average("value"), Some(2.0));
        assert_eq!(target.min("value"), Some(0.0));
        assert_eq!(target.max("value"), Some(4.0));
    }

    #[test]
    fn test_window_manager() {
        let mut manager = WindowManager::new(WindowType::Sliding, Duration::from_secs(60), 100, 10);

        manager.process_event(StreamEvent::with_timestamp("TestEvent", payload(1.0), "test", 30000));

        assert_eq!(manager.active_windows().len(), 1);
        assert_eq!(manager.total_event_count(), 1);
    }
}