Skip to main content

shape_runtime/
window_manager.rs

1//! Generic window manager for time-series aggregations
2//!
3//! Provides windowing operations for streaming data:
4//! - Tumbling (fixed non-overlapping)
5//! - Sliding (overlapping)
6//! - Session (gap-based)
7//! - Count-based
8//! - Cumulative
9//!
10//! This module is industry-agnostic and works with any timestamped data.
11
12use chrono::{DateTime, Duration, Utc};
13use shape_value::KindedSlot;
14use std::collections::HashMap;
15
16use shape_ast::error::Result;
17/// Window type for aggregations
18#[derive(Debug, Clone)]
19pub enum WindowType {
20    /// Fixed non-overlapping windows
21    Tumbling { size: Duration },
22    /// Overlapping windows with a slide interval
23    Sliding { size: Duration, slide: Duration },
24    /// Windows based on inactivity gaps
25    Session { gap: Duration },
26    /// Count-based windows (every N records)
27    Count { size: usize },
28    /// Cumulative from start
29    Cumulative,
30}
31
32impl WindowType {
33    /// Create a tumbling window
34    pub fn tumbling(size: Duration) -> Self {
35        WindowType::Tumbling { size }
36    }
37
38    /// Create a sliding window
39    pub fn sliding(size: Duration, slide: Duration) -> Self {
40        WindowType::Sliding { size, slide }
41    }
42
43    /// Create a session window
44    pub fn session(gap: Duration) -> Self {
45        WindowType::Session { gap }
46    }
47
48    /// Create a count-based window
49    pub fn count(size: usize) -> Self {
50        WindowType::Count { size }
51    }
52
53    /// Create a cumulative window
54    pub fn cumulative() -> Self {
55        WindowType::Cumulative
56    }
57}
58
/// A single timestamped record held inside a window.
///
/// Records are cloned into every window they belong to, so a point may be shared by several
/// windows (e.g. overlapping sliding windows) at once.
#[derive(Debug, Clone)]
pub struct WindowDataPoint {
    /// Event time of the record; this is what window assignment and boundary checks use.
    pub timestamp: DateTime<Utc>,
    /// Raw field values. `KindedSlot`'s explicit `Drop` / `Clone` dispatch
    /// on `NativeKind` to release / bump heap refcounts, so
    /// `HashMap<String, KindedSlot>` preserves the WB2.4 retain discipline
    /// across snapshot/aggregator clones (per ADR-006 §2.7).
    pub fields: HashMap<String, KindedSlot>,
}
69
70/// A completed window with aggregated data
71#[derive(Debug, Clone)]
72pub struct WindowResult {
73    /// Window start time
74    pub start: DateTime<Utc>,
75    /// Window end time
76    pub end: DateTime<Utc>,
77    /// Number of data points in window
78    pub count: usize,
79    /// Aggregated values
80    pub aggregates: HashMap<String, f64>,
81}
82
/// Statistic computed over the numeric values of one field within a window.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum AggregateFunction {
    /// Sum of the values.
    Sum,
    /// Arithmetic mean of the values.
    Avg,
    /// Smallest value.
    Min,
    /// Largest value.
    Max,
    /// Number of records in the window that carry the field.
    Count,
    /// Value from the earliest-arriving record that carries the field.
    First,
    /// Value from the latest-arriving record that carries the field.
    Last,
    /// Population standard deviation (divides by `n`, not `n - 1`).
    StdDev,
    /// Population variance (divides by `n`, not `n - 1`).
    Variance,
}
96
97/// Aggregation specification
98#[derive(Debug, Clone)]
99pub struct AggregateSpec {
100    pub field: String,
101    pub function: AggregateFunction,
102    pub output_name: String,
103}
104
/// Mutable state of one open (not yet emitted) window.
#[derive(Debug)]
struct WindowState {
    /// Window start: an aligned boundary for tumbling/sliding windows, or the first record's
    /// timestamp for session and count-based windows.
    start: DateTime<Utc>,
    /// Records assigned to this window, in arrival order.
    data: Vec<WindowDataPoint>,
    /// Timestamp of the most recently appended record (not necessarily the maximum if
    /// records arrive out of order); `None` while the window is still empty. Reported as
    /// the result's `end` and used for session gap detection.
    last_timestamp: Option<DateTime<Utc>>,
}
112
113/// Generic window manager for streaming aggregations
114pub struct WindowManager {
115    /// Window type configuration
116    window_type: WindowType,
117    /// Aggregation specifications
118    aggregates: Vec<AggregateSpec>,
119    /// Active windows (for sliding/session)
120    active_windows: Vec<WindowState>,
121    /// Current tumbling window
122    current_window: Option<WindowState>,
123    /// Count for count-based windows
124    current_count: usize,
125    /// Cumulative data (for cumulative windows)
126    cumulative_data: Vec<WindowDataPoint>,
127    /// Completed windows waiting to be emitted
128    completed_windows: Vec<WindowResult>,
129}
130
131impl WindowManager {
132    /// Create a new window manager
133    pub fn new(window_type: WindowType) -> Self {
134        Self {
135            window_type,
136            aggregates: Vec::new(),
137            active_windows: Vec::new(),
138            current_window: None,
139            current_count: 0,
140            cumulative_data: Vec::new(),
141            completed_windows: Vec::new(),
142        }
143    }
144
145    /// Add an aggregation specification
146    pub fn aggregate(
147        &mut self,
148        field: &str,
149        function: AggregateFunction,
150        output_name: &str,
151    ) -> &mut Self {
152        self.aggregates.push(AggregateSpec {
153            field: field.to_string(),
154            function,
155            output_name: output_name.to_string(),
156        });
157        self
158    }
159
160    /// Process a data point
161    pub fn process(
162        &mut self,
163        timestamp: DateTime<Utc>,
164        fields: HashMap<String, KindedSlot>,
165    ) -> Result<()> {
166        let data_point = WindowDataPoint { timestamp, fields };
167
168        match &self.window_type {
169            WindowType::Tumbling { size } => {
170                self.process_tumbling(&data_point, *size)?;
171            }
172            WindowType::Sliding { size, slide } => {
173                self.process_sliding(&data_point, *size, *slide)?;
174            }
175            WindowType::Session { gap } => {
176                self.process_session(&data_point, *gap)?;
177            }
178            WindowType::Count { size } => {
179                self.process_count(&data_point, *size)?;
180            }
181            WindowType::Cumulative => {
182                self.process_cumulative(&data_point)?;
183            }
184        }
185
186        Ok(())
187    }
188
189    /// Process a tumbling window data point
190    fn process_tumbling(&mut self, data_point: &WindowDataPoint, size: Duration) -> Result<()> {
191        let window_start = self.align_to_window(data_point.timestamp, size);
192
193        // Check if we need to close the current window
194        let should_close = self
195            .current_window
196            .as_ref()
197            .map(|w| data_point.timestamp >= w.start + size)
198            .unwrap_or(false);
199
200        if should_close {
201            // Take the window out to compute result
202            if let Some(window) = self.current_window.take() {
203                let result = self.compute_window_result(&window)?;
204                self.completed_windows.push(result);
205            }
206        }
207
208        // Add to current window or create new one
209        match &mut self.current_window {
210            Some(window) => {
211                window.data.push(data_point.clone());
212                window.last_timestamp = Some(data_point.timestamp);
213            }
214            None => {
215                self.current_window = Some(WindowState {
216                    start: window_start,
217                    data: vec![data_point.clone()],
218                    last_timestamp: Some(data_point.timestamp),
219                });
220            }
221        }
222
223        Ok(())
224    }
225
226    /// Process a sliding window data point
227    fn process_sliding(
228        &mut self,
229        data_point: &WindowDataPoint,
230        size: Duration,
231        slide: Duration,
232    ) -> Result<()> {
233        // Add point to all applicable windows
234        let ts = data_point.timestamp;
235
236        // Create new windows as needed
237        let window_start = self.align_to_window(ts, slide);
238
239        // Check if we need to create a new window
240        let needs_new_window = self.active_windows.is_empty()
241            || self
242                .active_windows
243                .last()
244                .map(|w| ts >= w.start + slide)
245                .unwrap_or(true);
246
247        if needs_new_window {
248            self.active_windows.push(WindowState {
249                start: window_start,
250                data: Vec::new(),
251                last_timestamp: None,
252            });
253        }
254
255        // Add point to all windows that contain this timestamp
256        for window in &mut self.active_windows {
257            if ts >= window.start && ts < window.start + size {
258                window.data.push(data_point.clone());
259                window.last_timestamp = Some(ts);
260            }
261        }
262
263        // Close windows that have ended
264        let mut closed_indices = Vec::new();
265        for (i, window) in self.active_windows.iter().enumerate() {
266            if ts >= window.start + size {
267                let result = self.compute_window_result(window)?;
268                self.completed_windows.push(result);
269                closed_indices.push(i);
270            }
271        }
272
273        // Remove closed windows (in reverse to maintain indices)
274        for i in closed_indices.into_iter().rev() {
275            self.active_windows.remove(i);
276        }
277
278        Ok(())
279    }
280
281    /// Process a session window data point
282    fn process_session(&mut self, data_point: &WindowDataPoint, gap: Duration) -> Result<()> {
283        // Check if we need to close the current session due to gap
284        let should_close = self
285            .current_window
286            .as_ref()
287            .and_then(|w| w.last_timestamp)
288            .map(|last_ts| data_point.timestamp - last_ts > gap)
289            .unwrap_or(false);
290
291        if should_close {
292            if let Some(window) = self.current_window.take() {
293                let result = self.compute_window_result(&window)?;
294                self.completed_windows.push(result);
295            }
296        }
297
298        // Add to current session or start new one
299        match &mut self.current_window {
300            Some(window) => {
301                window.data.push(data_point.clone());
302                window.last_timestamp = Some(data_point.timestamp);
303            }
304            None => {
305                self.current_window = Some(WindowState {
306                    start: data_point.timestamp,
307                    data: vec![data_point.clone()],
308                    last_timestamp: Some(data_point.timestamp),
309                });
310            }
311        }
312
313        Ok(())
314    }
315
316    /// Process a count-based window data point
317    fn process_count(&mut self, data_point: &WindowDataPoint, size: usize) -> Result<()> {
318        if self.current_window.is_none() {
319            self.current_window = Some(WindowState {
320                start: data_point.timestamp,
321                data: Vec::new(),
322                last_timestamp: None,
323            });
324        }
325
326        // Add to current window
327        if let Some(window) = &mut self.current_window {
328            window.data.push(data_point.clone());
329            window.last_timestamp = Some(data_point.timestamp);
330        }
331        self.current_count += 1;
332
333        // Check if window is complete
334        if self.current_count >= size {
335            if let Some(window) = self.current_window.take() {
336                let result = self.compute_window_result(&window)?;
337                self.completed_windows.push(result);
338            }
339            self.current_count = 0;
340        }
341
342        Ok(())
343    }
344
345    /// Process a cumulative window data point
346    fn process_cumulative(&mut self, data_point: &WindowDataPoint) -> Result<()> {
347        self.cumulative_data.push(data_point.clone());
348
349        // Create a window result for the cumulative state
350        let start = self
351            .cumulative_data
352            .first()
353            .map(|d| d.timestamp)
354            .unwrap_or(data_point.timestamp);
355        let end = data_point.timestamp;
356
357        let window = WindowState {
358            start,
359            data: self.cumulative_data.clone(),
360            last_timestamp: Some(end),
361        };
362
363        let result = self.compute_window_result(&window)?;
364        self.completed_windows.push(result);
365
366        Ok(())
367    }
368
369    /// Align timestamp to window boundary
370    fn align_to_window(&self, ts: DateTime<Utc>, size: Duration) -> DateTime<Utc> {
371        let epoch = DateTime::UNIX_EPOCH;
372        let since_epoch = ts - epoch;
373        let size_millis = size.num_milliseconds();
374
375        if size_millis == 0 {
376            return ts;
377        }
378
379        let aligned_millis = (since_epoch.num_milliseconds() / size_millis) * size_millis;
380        epoch + Duration::milliseconds(aligned_millis)
381    }
382
383    /// Compute aggregations for a window
384    fn compute_window_result(&self, window: &WindowState) -> Result<WindowResult> {
385        let mut aggregates = HashMap::new();
386
387        for spec in &self.aggregates {
388            let values: Vec<f64> = window
389                .data
390                .iter()
391                .filter_map(|d| d.fields.get(&spec.field).map(|v| v.slot().as_f64()))
392                .collect();
393
394            let result = self.compute_aggregate(&values, spec.function)?;
395            aggregates.insert(spec.output_name.clone(), result);
396        }
397
398        let end = window.last_timestamp.unwrap_or(window.start);
399
400        Ok(WindowResult {
401            start: window.start,
402            end,
403            count: window.data.len(),
404            aggregates,
405        })
406    }
407
408    /// Compute a single aggregate value
409    fn compute_aggregate(&self, values: &[f64], function: AggregateFunction) -> Result<f64> {
410        if values.is_empty() {
411            return Ok(f64::NAN);
412        }
413
414        Ok(match function {
415            AggregateFunction::Sum => values.iter().sum(),
416            AggregateFunction::Avg => values.iter().sum::<f64>() / values.len() as f64,
417            AggregateFunction::Min => values.iter().cloned().fold(f64::INFINITY, f64::min),
418            AggregateFunction::Max => values.iter().cloned().fold(f64::NEG_INFINITY, f64::max),
419            AggregateFunction::Count => values.len() as f64,
420            AggregateFunction::First => values.first().copied().unwrap_or(f64::NAN),
421            AggregateFunction::Last => values.last().copied().unwrap_or(f64::NAN),
422            AggregateFunction::StdDev => {
423                let mean = values.iter().sum::<f64>() / values.len() as f64;
424                let variance =
425                    values.iter().map(|v| (v - mean).powi(2)).sum::<f64>() / values.len() as f64;
426                variance.sqrt()
427            }
428            AggregateFunction::Variance => {
429                let mean = values.iter().sum::<f64>() / values.len() as f64;
430                values.iter().map(|v| (v - mean).powi(2)).sum::<f64>() / values.len() as f64
431            }
432        })
433    }
434
435    /// Take completed windows
436    pub fn take_completed(&mut self) -> Vec<WindowResult> {
437        std::mem::take(&mut self.completed_windows)
438    }
439
440    /// Flush any remaining windows (call at end of stream)
441    pub fn flush(&mut self) -> Result<Vec<WindowResult>> {
442        // Close any active windows
443        if let Some(ref window) = self.current_window {
444            let result = self.compute_window_result(window)?;
445            self.completed_windows.push(result);
446        }
447
448        for window in &self.active_windows {
449            let result = self.compute_window_result(window)?;
450            self.completed_windows.push(result);
451        }
452
453        self.current_window = None;
454        self.active_windows.clear();
455
456        Ok(self.take_completed())
457    }
458}
459
#[cfg(test)]
mod tests {
    use super::*;

    /// Shared base instant; 1_000_000_000 s is a multiple of 10 s, so it sits on a
    /// tumbling-window boundary.
    fn base_time() -> DateTime<Utc> {
        DateTime::from_timestamp(1_000_000_000, 0).unwrap()
    }

    /// Build a record whose only field is `"value"`.
    fn make_data_point(
        timestamp: DateTime<Utc>,
        value: f64,
    ) -> (DateTime<Utc>, HashMap<String, KindedSlot>) {
        let fields = HashMap::from([("value".to_string(), KindedSlot::from_number(value))]);
        (timestamp, fields)
    }

    /// Push a single `"value"` record into `manager`, failing the test on error.
    fn feed(manager: &mut WindowManager, timestamp: DateTime<Utc>, value: f64) {
        let (ts, fields) = make_data_point(timestamp, value);
        manager.process(ts, fields).unwrap();
    }

    #[test]
    fn test_tumbling_window() {
        let mut manager = WindowManager::new(WindowType::tumbling(Duration::seconds(10)));
        manager
            .aggregate("value", AggregateFunction::Sum, "sum")
            .aggregate("value", AggregateFunction::Avg, "avg");

        let base = base_time();

        // Seconds 0..=4 all fall inside the first 10-second window.
        for offset in 0..5 {
            feed(&mut manager, base + Duration::seconds(offset), 10.0);
        }
        assert!(
            manager.take_completed().is_empty(),
            "Expected no completed windows within first window"
        );

        // Second 15 belongs to the next window and closes the first one.
        feed(&mut manager, base + Duration::seconds(15), 20.0);

        let completed = manager.take_completed();
        assert_eq!(completed.len(), 1, "Expected exactly 1 completed window");
        let window = &completed[0];
        assert_eq!(window.count, 5, "Expected 5 data points in window");
        assert_eq!(window.aggregates.get("sum"), Some(&50.0));
        assert_eq!(window.aggregates.get("avg"), Some(&10.0));
    }

    #[test]
    fn test_count_window() {
        let mut manager = WindowManager::new(WindowType::count(3));
        manager.aggregate("value", AggregateFunction::Sum, "sum");

        let base = base_time();
        for n in 1..=3i64 {
            feed(&mut manager, base + Duration::seconds(n - 1), n as f64);
        }

        let completed = manager.take_completed();
        assert_eq!(completed.len(), 1);
        let window = &completed[0];
        assert_eq!(window.count, 3);
        assert_eq!(window.aggregates.get("sum"), Some(&6.0)); // 1 + 2 + 3
    }

    #[test]
    fn test_session_window() {
        let mut manager = WindowManager::new(WindowType::session(Duration::seconds(5)));
        manager.aggregate("value", AggregateFunction::Count, "count");

        let base = base_time();

        // Three records one second apart form the first session.
        for offset in 0..3 {
            feed(&mut manager, base + Duration::seconds(offset), 1.0);
        }

        // An 8-second silence (> 5 s gap) starts a new session and closes the first.
        feed(&mut manager, base + Duration::seconds(10), 1.0);

        let completed = manager.take_completed();
        assert_eq!(completed.len(), 1); // First session closed
        assert_eq!(completed[0].count, 3);
    }

    #[test]
    fn test_aggregate_functions() {
        let mut manager = WindowManager::new(WindowType::count(5));
        manager
            .aggregate("value", AggregateFunction::Min, "min")
            .aggregate("value", AggregateFunction::Max, "max")
            .aggregate("value", AggregateFunction::StdDev, "std");

        let base = base_time();
        let values = [1.0, 2.0, 3.0, 4.0, 5.0];
        for (offset, &value) in (0i64..).zip(values.iter()) {
            feed(&mut manager, base + Duration::seconds(offset), value);
        }

        let completed = manager.take_completed();
        assert_eq!(completed.len(), 1);
        let window = &completed[0];
        assert_eq!(window.aggregates.get("min"), Some(&1.0));
        assert_eq!(window.aggregates.get("max"), Some(&5.0));
        // Population standard deviation of [1,2,3,4,5] is sqrt(2) ≈ 1.414
        let std = window.aggregates.get("std").unwrap();
        assert!((std - 1.414).abs() < 0.01);
    }

    #[test]
    fn test_flush() {
        let mut manager = WindowManager::new(WindowType::tumbling(Duration::seconds(10)));
        manager.aggregate("value", AggregateFunction::Sum, "sum");

        feed(&mut manager, base_time(), 42.0);

        // The partial (still-open) window must be emitted by flush.
        let results = manager.flush().unwrap();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0].aggregates.get("sum"), Some(&42.0));
    }
}
579        let results = manager.flush().unwrap();
580        assert_eq!(results.len(), 1);
581        assert_eq!(results[0].aggregates.get("sum"), Some(&42.0));
582    }
583}