Skip to main content

tensorlogic_infer/
windowed_aggregation.rs

1//! Windowed aggregation over tensor data streams.
2//!
3//! Supports tumbling windows (non-overlapping), sliding windows (overlapping),
4//! session windows (gap-based), and count-based windows.
5
/// Type of window to apply.
///
/// All variants hold plain integers, so the type is `Copy` and fully comparable (`Eq`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum WindowType {
    /// Non-overlapping windows of fixed duration `[k * size_ms, (k + 1) * size_ms)`.
    Tumbling { size_ms: u64 },
    /// Windows spanning `size_ms` whose start advances by `step_ms`; consecutive
    /// windows overlap when `step_ms < size_ms`.
    Sliding { size_ms: u64, step_ms: u64 },
    /// Session windows: a session ends once two consecutive events are more than
    /// `gap_ms` apart.
    Session { gap_ms: u64 },
    /// Count-based window of `size` elements that advances by `step` elements.
    Count { size: usize, step: usize },
}
18
/// Reduction applied to the values that fall inside one window.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum WindowAggregation {
    /// Total of the values.
    Sum,
    /// Arithmetic average of the values.
    Mean,
    /// Largest value.
    Max,
    /// Smallest value.
    Min,
    /// How many values the window holds, reported as an `f64`.
    Count,
    /// The value that appears last in the window.
    LastValue,
    /// The value that appears first in the window.
    FirstValue,
}
37
38/// Configuration for windowed aggregation.
39#[derive(Debug, Clone)]
40pub struct WindowConfig {
41    /// Window type (Tumbling, Sliding, Session, or Count).
42    pub window_type: WindowType,
43    /// Aggregation function to apply within each window.
44    pub aggregation: WindowAggregation,
45    /// Emit a partial result for windows that do not span a complete interval.
46    pub emit_partial: bool,
47    /// Minimum number of elements required before a window result is emitted.
48    pub min_elements: usize,
49}
50
51impl WindowConfig {
52    /// Create a tumbling window configuration.
53    pub fn tumbling(size_ms: u64, aggregation: WindowAggregation) -> Self {
54        WindowConfig {
55            window_type: WindowType::Tumbling { size_ms },
56            aggregation,
57            emit_partial: false,
58            min_elements: 1,
59        }
60    }
61
62    /// Create a sliding window configuration.
63    pub fn sliding(size_ms: u64, step_ms: u64, aggregation: WindowAggregation) -> Self {
64        WindowConfig {
65            window_type: WindowType::Sliding { size_ms, step_ms },
66            aggregation,
67            emit_partial: false,
68            min_elements: 1,
69        }
70    }
71
72    /// Create a count-based window configuration.
73    pub fn count(size: usize, step: usize, aggregation: WindowAggregation) -> Self {
74        WindowConfig {
75            window_type: WindowType::Count { size, step },
76            aggregation,
77            emit_partial: false,
78            min_elements: 1,
79        }
80    }
81
82    /// Set whether partial (incomplete) windows should be emitted.
83    pub fn with_emit_partial(mut self, emit: bool) -> Self {
84        self.emit_partial = emit;
85        self
86    }
87
88    /// Set the minimum number of elements required to emit a window.
89    pub fn with_min_elements(mut self, min: usize) -> Self {
90        self.min_elements = min;
91        self
92    }
93}
94
/// The result of processing a single window.
///
/// The meaning of `start_ms` / `end_ms` depends on the window kind:
/// - tumbling / sliding: the half-open time range `[start_ms, end_ms)` in milliseconds;
/// - session: the timestamps of the session's first and last event (both inclusive);
/// - count: element offsets `[start_ms, end_ms)` into the input values; for a trailing
///   partial window `end_ms` is the nominal end (`start + size`) and may exceed the
///   number of input values.
#[derive(Debug, Clone, PartialEq)]
pub struct WindowResult {
    /// Window start: a timestamp in milliseconds, or an element offset for count windows.
    pub start_ms: u64,
    /// Window end: a timestamp in milliseconds, or an element offset for count windows
    /// (see the type-level docs for inclusive/exclusive semantics per window kind).
    pub end_ms: u64,
    /// Number of data elements contained in this window.
    pub element_count: usize,
    /// Aggregated value for this window.
    pub value: f64,
    /// `false` only when `emit_partial` is enabled and the input did not fully cover the
    /// window (a trailing window); session windows are always reported as complete.
    pub is_complete: bool,
}
109
110/// Main windowed aggregation processor.
111pub struct WindowedAggregation {
112    config: WindowConfig,
113}
114
115impl WindowedAggregation {
116    /// Create a new processor with the given configuration.
117    pub fn new(config: WindowConfig) -> Self {
118        WindowedAggregation { config }
119    }
120
121    /// Process a sequence of `(timestamp_ms, value)` pairs using tumbling windows.
122    ///
123    /// Each event is assigned to exactly one window `[start, start + size_ms)`.
124    pub fn process_tumbling(&self, events: &[(u64, f64)]) -> Vec<WindowResult> {
125        if events.is_empty() {
126            return Vec::new();
127        }
128
129        let size_ms = match self.config.window_type {
130            WindowType::Tumbling { size_ms } => size_ms,
131            _ => return Vec::new(),
132        };
133
134        let first_ts = events[0].0;
135        // Align window start to a multiple of size_ms relative to the first event.
136        let window_start_base = (first_ts / size_ms) * size_ms;
137
138        let last_ts = events.iter().map(|(t, _)| *t).max().unwrap_or(first_ts);
139        // Number of complete windows needed to cover all events.
140        let num_windows = ((last_ts.saturating_sub(window_start_base)) / size_ms + 1) as usize;
141
142        let mut results = Vec::new();
143
144        for i in 0..num_windows {
145            let start = window_start_base + i as u64 * size_ms;
146            let end = start + size_ms;
147
148            let window_values: Vec<f64> = events
149                .iter()
150                .filter(|(t, _)| *t >= start && *t < end)
151                .map(|(_, v)| *v)
152                .collect();
153
154            if window_values.len() < self.config.min_elements {
155                continue;
156            }
157
158            // A tumbling window is "partial" only when emit_partial is enabled AND
159            // the stream ends before the window's upper boundary. When emit_partial is
160            // false we only emit windows that have at least min_elements events;
161            // in that scenario every emitted window is considered complete.
162            // When emit_partial is true, the last window may be incomplete if the stream
163            // ended before filling the window boundary.
164            let is_complete = if self.config.emit_partial {
165                // Under emit_partial semantics, mark the window as partial when the
166                // stream does not reach the window end.
167                last_ts >= end
168            } else {
169                // Without emit_partial, we only emit "full-enough" windows and
170                // consider all of them complete.
171                true
172            };
173
174            if !is_complete && !self.config.emit_partial {
175                // This branch is unreachable since is_complete=true when !emit_partial,
176                // but kept for clarity.
177                continue;
178            }
179
180            let value = WindowedAggregation::aggregate(&window_values, self.config.aggregation);
181            results.push(WindowResult {
182                start_ms: start,
183                end_ms: end,
184                element_count: window_values.len(),
185                value,
186                is_complete,
187            });
188        }
189
190        results
191    }
192
193    /// Process a sequence of `(timestamp_ms, value)` pairs using sliding windows.
194    ///
195    /// Windows overlap: each window advances by `step_ms` and spans `size_ms`.
196    pub fn process_sliding(&self, events: &[(u64, f64)]) -> Vec<WindowResult> {
197        if events.is_empty() {
198            return Vec::new();
199        }
200
201        let (size_ms, step_ms) = match self.config.window_type {
202            WindowType::Sliding { size_ms, step_ms } => (size_ms, step_ms),
203            _ => return Vec::new(),
204        };
205
206        let first_ts = events[0].0;
207        let last_ts = events.iter().map(|(t, _)| *t).max().unwrap_or(first_ts);
208
209        // Align the first window to a step_ms boundary.
210        let start_base = (first_ts / step_ms) * step_ms;
211
212        let mut results = Vec::new();
213        let mut window_start = start_base;
214
215        loop {
216            if window_start > last_ts {
217                break;
218            }
219            let window_end = window_start + size_ms;
220
221            let window_values: Vec<f64> = events
222                .iter()
223                .filter(|(t, _)| *t >= window_start && *t < window_end)
224                .map(|(_, v)| *v)
225                .collect();
226
227            if window_values.len() >= self.config.min_elements {
228                // When emit_partial is false, every non-empty window is considered
229                // complete (batch semantics). Under emit_partial mode, mark the
230                // window as partial when the stream has not advanced past the window end.
231                let is_complete = if self.config.emit_partial {
232                    last_ts >= window_end
233                } else {
234                    true
235                };
236                if is_complete || self.config.emit_partial {
237                    let value =
238                        WindowedAggregation::aggregate(&window_values, self.config.aggregation);
239                    results.push(WindowResult {
240                        start_ms: window_start,
241                        end_ms: window_end,
242                        element_count: window_values.len(),
243                        value,
244                        is_complete,
245                    });
246                }
247            }
248
249            window_start += step_ms;
250        }
251
252        results
253    }
254
255    /// Process a flat slice of values using count-based windows.
256    ///
257    /// Window `i` covers `values[i*step .. i*step+size]`.
258    pub fn process_count(&self, values: &[f64]) -> Vec<WindowResult> {
259        if values.is_empty() {
260            return Vec::new();
261        }
262
263        let (size, step) = match self.config.window_type {
264            WindowType::Count { size, step } => (size, step),
265            _ => return Vec::new(),
266        };
267
268        if step == 0 || size == 0 {
269            return Vec::new();
270        }
271
272        let mut results = Vec::new();
273        let mut offset = 0usize;
274
275        loop {
276            if offset >= values.len() {
277                break;
278            }
279            let end = (offset + size).min(values.len());
280            let window_values = &values[offset..end];
281
282            let is_complete = offset + size <= values.len();
283
284            if window_values.len() < self.config.min_elements {
285                break;
286            }
287
288            if !is_complete && !self.config.emit_partial {
289                break;
290            }
291
292            let value = WindowedAggregation::aggregate(window_values, self.config.aggregation);
293            results.push(WindowResult {
294                start_ms: offset as u64,
295                end_ms: (offset + size) as u64,
296                element_count: window_values.len(),
297                value,
298                is_complete,
299            });
300
301            offset += step;
302        }
303
304        results
305    }
306
307    /// Process session windows: start a new session whenever the gap between
308    /// consecutive events exceeds `gap_ms`.
309    pub fn process_session(&self, events: &[(u64, f64)]) -> Vec<WindowResult> {
310        if events.is_empty() {
311            return Vec::new();
312        }
313
314        let gap_ms = match self.config.window_type {
315            WindowType::Session { gap_ms } => gap_ms,
316            _ => return Vec::new(),
317        };
318
319        let mut results = Vec::new();
320        let mut session_start = events[0].0;
321        let mut session_values: Vec<f64> = Vec::new();
322
323        for (idx, (ts, val)) in events.iter().enumerate() {
324            if idx > 0 {
325                let prev_ts = events[idx - 1].0;
326                let gap = ts.saturating_sub(prev_ts);
327                if gap > gap_ms {
328                    // Close the current session.
329                    if session_values.len() >= self.config.min_elements {
330                        let value = WindowedAggregation::aggregate(
331                            &session_values,
332                            self.config.aggregation,
333                        );
334                        results.push(WindowResult {
335                            start_ms: session_start,
336                            end_ms: events[idx - 1].0,
337                            element_count: session_values.len(),
338                            value,
339                            is_complete: true,
340                        });
341                    }
342                    // Begin new session.
343                    session_start = *ts;
344                    session_values.clear();
345                }
346            }
347            session_values.push(*val);
348        }
349
350        // Emit the final session.
351        if session_values.len() >= self.config.min_elements {
352            let value = WindowedAggregation::aggregate(&session_values, self.config.aggregation);
353            results.push(WindowResult {
354                start_ms: session_start,
355                end_ms: events.last().map(|(t, _)| *t).unwrap_or(session_start),
356                element_count: session_values.len(),
357                value,
358                is_complete: true,
359            });
360        }
361
362        results
363    }
364
365    /// Apply an aggregation function to a slice of values.
366    pub fn aggregate(values: &[f64], agg: WindowAggregation) -> f64 {
367        match agg {
368            WindowAggregation::Sum => values.iter().copied().fold(0.0_f64, |acc, v| acc + v),
369            WindowAggregation::Mean => {
370                if values.is_empty() {
371                    0.0
372                } else {
373                    let sum: f64 = values.iter().copied().fold(0.0_f64, |acc, v| acc + v);
374                    sum / values.len() as f64
375                }
376            }
377            WindowAggregation::Max => values.iter().copied().fold(f64::NEG_INFINITY, f64::max),
378            WindowAggregation::Min => values.iter().copied().fold(f64::INFINITY, f64::min),
379            WindowAggregation::Count => values.len() as f64,
380            WindowAggregation::LastValue => values.last().copied().unwrap_or(0.0),
381            WindowAggregation::FirstValue => values.first().copied().unwrap_or(0.0),
382        }
383    }
384
385    /// Process events using the window type specified in the configuration.
386    pub fn process(&self, events: &[(u64, f64)]) -> Vec<WindowResult> {
387        match self.config.window_type {
388            WindowType::Tumbling { .. } => self.process_tumbling(events),
389            WindowType::Sliding { .. } => self.process_sliding(events),
390            WindowType::Session { .. } => self.process_session(events),
391            WindowType::Count { .. } => {
392                // For count windows, extract only the values (timestamps are ignored).
393                let values: Vec<f64> = events.iter().map(|(_, v)| *v).collect();
394                self.process_count(&values)
395            }
396        }
397    }
398}
399
400/// Error type for windowed aggregation operations.
401#[derive(Debug, thiserror::Error)]
402pub enum WindowError {
403    /// The sliding step size exceeds the window size.
404    #[error("Step size {step} must be <= window size {size}")]
405    StepExceedsSize { step: u64, size: u64 },
406    /// The event stream was empty.
407    #[error("Empty event stream")]
408    EmptyStream,
409    /// The window configuration is invalid.
410    #[error("Invalid window configuration: {0}")]
411    InvalidConfig(String),
412}
413
414// ============================================================
415// Tests
416// ============================================================
417
#[cfg(test)]
mod tests {
    use super::*;

    // ---- Tumbling window tests ----

    #[test]
    fn test_tumbling_sum_basic() {
        // 10 events at 0, 100, 200, ..., 900 ms, all value 1.0
        let events: Vec<(u64, f64)> = (0..10u64).map(|i| (i * 100, 1.0)).collect();
        let cfg = WindowConfig::tumbling(500, WindowAggregation::Sum);
        let wa = WindowedAggregation::new(cfg);
        let results = wa.process_tumbling(&events);

        assert_eq!(results.len(), 2, "Expected 2 tumbling windows");
        assert_eq!(results[0].element_count, 5);
        assert!((results[0].value - 5.0).abs() < 1e-9);
        assert_eq!(results[1].element_count, 5);
        assert!((results[1].value - 5.0).abs() < 1e-9);
        assert!(results[0].is_complete);
        assert!(results[1].is_complete);
    }

    #[test]
    fn test_tumbling_non_overlapping() {
        // Each event should be counted in exactly one window.
        let events: Vec<(u64, f64)> = (0..10u64).map(|i| (i * 100, 1.0)).collect();
        let cfg = WindowConfig::tumbling(500, WindowAggregation::Count);
        let wa = WindowedAggregation::new(cfg);
        let results = wa.process_tumbling(&events);

        let total_counted: usize = results.iter().map(|r| r.element_count).sum();
        assert_eq!(
            total_counted, 10,
            "Each event must appear in exactly one window"
        );
    }

    #[test]
    fn test_tumbling_empty_returns_empty() {
        let cfg = WindowConfig::tumbling(500, WindowAggregation::Sum);
        let wa = WindowedAggregation::new(cfg);
        let results = wa.process_tumbling(&[]);
        assert!(results.is_empty());
    }

    #[test]
    fn test_tumbling_with_partial_window() {
        // 7 events at 0..=600 ms (step 100), window size 500 ms, emit_partial=true
        let events: Vec<(u64, f64)> = (0..7u64).map(|i| (i * 100, 1.0)).collect();
        let cfg = WindowConfig::tumbling(500, WindowAggregation::Sum).with_emit_partial(true);
        let wa = WindowedAggregation::new(cfg);
        let results = wa.process_tumbling(&events);

        assert_eq!(results.len(), 2, "Expected complete + partial window");
        // First window: events at 0,100,200,300,400 → sum=5
        assert_eq!(results[0].element_count, 5);
        assert!((results[0].value - 5.0).abs() < 1e-9);
        assert!(results[0].is_complete);
        // Second window: events at 500,600 → sum=2, incomplete
        assert_eq!(results[1].element_count, 2);
        assert!((results[1].value - 2.0).abs() < 1e-9);
        assert!(!results[1].is_complete);
    }

    #[test]
    fn test_tumbling_min_elements_skips_sparse_windows() {
        // Window [0,500) holds 3 events, [500,1000) only 1 → only the first is emitted.
        let events = vec![(0u64, 1.0_f64), (100, 1.0), (200, 1.0), (600, 1.0)];
        let cfg = WindowConfig::tumbling(500, WindowAggregation::Count).with_min_elements(3);
        let results = WindowedAggregation::new(cfg).process_tumbling(&events);

        assert_eq!(results.len(), 1);
        assert_eq!(results[0].start_ms, 0);
        assert_eq!(results[0].end_ms, 500);
        assert!((results[0].value - 3.0).abs() < 1e-9);
    }

    // ---- Sliding window tests ----

    #[test]
    fn test_sliding_overlapping_windows() {
        // 5 events at 0,100,200,300,400; sliding(size=300, step=100, Sum)
        let events: Vec<(u64, f64)> = (0..5u64).map(|i| (i * 100, 1.0)).collect();
        let cfg = WindowConfig::sliding(300, 100, WindowAggregation::Sum);
        let wa = WindowedAggregation::new(cfg);
        let results = wa.process_sliding(&events);

        // Windows overlap, so one window starts at every 100 ms step up to the last event:
        // [0,300)→3, [100,400)→3, [200,500)→3, [300,600)→2, [400,700)→1
        let expected = [(0u64, 3.0_f64), (100, 3.0), (200, 3.0), (300, 2.0), (400, 1.0)];
        assert_eq!(results.len(), expected.len(), "Expected 5 overlapping windows");
        for (r, &(start, sum)) in results.iter().zip(expected.iter()) {
            assert_eq!(r.start_ms, start);
            assert_eq!(r.end_ms, start + 300);
            assert!(
                (r.value - sum).abs() < 1e-9,
                "Window starting at {} should sum to {}, got {}",
                start,
                sum,
                r.value
            );
            assert!(r.is_complete);
        }
    }

    #[test]
    fn test_sliding_step_equals_size_is_tumbling() {
        // Sliding with step == size behaves like tumbling.
        let events: Vec<(u64, f64)> = (0..10u64).map(|i| (i * 100, 1.0)).collect();
        let cfg_sliding = WindowConfig::sliding(500, 500, WindowAggregation::Sum);
        let cfg_tumbling = WindowConfig::tumbling(500, WindowAggregation::Sum);
        let wa_s = WindowedAggregation::new(cfg_sliding);
        let wa_t = WindowedAggregation::new(cfg_tumbling);
        let sliding_results = wa_s.process_sliding(&events);
        let tumbling_results = wa_t.process_tumbling(&events);
        assert_eq!(
            sliding_results.len(),
            tumbling_results.len(),
            "Sliding with step==size should match tumbling"
        );
        for (s, t) in sliding_results.iter().zip(tumbling_results.iter()) {
            assert_eq!(s.start_ms, t.start_ms);
            assert!((s.value - t.value).abs() < 1e-9);
        }
    }

    #[test]
    fn test_sliding_mean() {
        // 3 events at 0,1,2 ms, values 1.0,2.0,3.0
        // Sliding(size=3ms, step=1ms, Mean), emit_partial=true
        let events = vec![(0u64, 1.0_f64), (1, 2.0), (2, 3.0)];
        let cfg = WindowConfig::sliding(3, 1, WindowAggregation::Mean).with_emit_partial(true);
        let wa = WindowedAggregation::new(cfg);
        let results = wa.process_sliding(&events);

        // [0,3): mean(1,2,3)=2.0; [1,4): mean(2,3)=2.5; [2,5): mean(3)=3.0
        let expected = [2.0_f64, 2.5, 3.0];
        assert_eq!(results.len(), expected.len());
        for (r, &mean) in results.iter().zip(expected.iter()) {
            assert!(
                (r.value - mean).abs() < 1e-9,
                "Mean should be {}, got {}",
                mean,
                r.value
            );
            // The stream ends at 2 ms, before every window's end → all partial.
            assert!(!r.is_complete);
        }
    }

    #[test]
    fn test_mismatched_window_type_returns_empty() {
        let events = vec![(0u64, 1.0_f64), (100, 2.0)];
        let wa = WindowedAggregation::new(WindowConfig::sliding(300, 100, WindowAggregation::Sum));
        assert!(wa.process_tumbling(&events).is_empty());
        assert!(wa.process_session(&events).is_empty());
        assert!(wa.process_count(&[1.0, 2.0]).is_empty());
    }

    // ---- Count window tests ----

    #[test]
    fn test_count_window_basic() {
        // 9 values [1..=9], count(3, 3, Sum) → 3 non-overlapping windows
        let values: Vec<f64> = (1..=9u32).map(|v| v as f64).collect();
        let cfg = WindowConfig::count(3, 3, WindowAggregation::Sum);
        let wa = WindowedAggregation::new(cfg);
        let results = wa.process_count(&values);

        assert_eq!(results.len(), 3);
        assert!((results[0].value - 6.0).abs() < 1e-9); // 1+2+3
        assert!((results[1].value - 15.0).abs() < 1e-9); // 4+5+6
        assert!((results[2].value - 24.0).abs() < 1e-9); // 7+8+9
    }

    #[test]
    fn test_count_window_sliding() {
        // 8 values [1..=8], count(4, 2, Sum)
        // Windows: [1,2,3,4]→10, [3,4,5,6]→18, [5,6,7,8]→26
        let values: Vec<f64> = (1..=8u32).map(|v| v as f64).collect();
        let cfg = WindowConfig::count(4, 2, WindowAggregation::Sum);
        let wa = WindowedAggregation::new(cfg);
        let results = wa.process_count(&values);

        assert_eq!(results.len(), 3);
        assert!((results[0].value - 10.0).abs() < 1e-9);
        assert!((results[1].value - 18.0).abs() < 1e-9);
        assert!((results[2].value - 26.0).abs() < 1e-9);
    }

    #[test]
    fn test_count_window_min() {
        // 6 values [3,1,4,1,5,9], count(3,3,Min)
        let values = vec![3.0_f64, 1.0, 4.0, 1.0, 5.0, 9.0];
        let cfg = WindowConfig::count(3, 3, WindowAggregation::Min);
        let wa = WindowedAggregation::new(cfg);
        let results = wa.process_count(&values);

        assert_eq!(results.len(), 2);
        assert!((results[0].value - 1.0).abs() < 1e-9); // min(3,1,4)=1
        assert!((results[1].value - 1.0).abs() < 1e-9); // min(1,5,9)=1
    }

    #[test]
    fn test_count_window_partial_emitted() {
        // 5 values, count(2, 2, Sum): [1,2]→3, [3,4]→7, trailing partial [5]→5
        let values = vec![1.0_f64, 2.0, 3.0, 4.0, 5.0];
        let cfg = WindowConfig::count(2, 2, WindowAggregation::Sum).with_emit_partial(true);
        let results = WindowedAggregation::new(cfg).process_count(&values);

        assert_eq!(results.len(), 3);
        assert!(results[0].is_complete && results[1].is_complete);
        assert!(!results[2].is_complete);
        assert_eq!(results[2].start_ms, 4);
        assert_eq!(results[2].element_count, 1);
        assert!((results[2].value - 5.0).abs() < 1e-9);

        // Without emit_partial the trailing window is dropped.
        let cfg = WindowConfig::count(2, 2, WindowAggregation::Sum);
        assert_eq!(WindowedAggregation::new(cfg).process_count(&values).len(), 2);
    }

    #[test]
    fn test_process_dispatches_count_ignoring_timestamps() {
        // Timestamps are out of order on purpose: count windows only see input order.
        let events = vec![(900u64, 1.0_f64), (5, 2.0), (42, 3.0)];
        let cfg = WindowConfig::count(3, 3, WindowAggregation::FirstValue);
        let results = WindowedAggregation::new(cfg).process(&events);

        assert_eq!(results.len(), 1);
        assert!((results[0].value - 1.0).abs() < 1e-9);
    }

    // ---- Aggregate + session tests ----

    #[test]
    fn test_aggregate_all_strategies() {
        let values = vec![1.0_f64, 2.0, 3.0, 4.0, 5.0];
        assert!(
            (WindowedAggregation::aggregate(&values, WindowAggregation::Sum) - 15.0).abs() < 1e-9
        );
        assert!(
            (WindowedAggregation::aggregate(&values, WindowAggregation::Mean) - 3.0).abs() < 1e-9
        );
        assert!(
            (WindowedAggregation::aggregate(&values, WindowAggregation::Max) - 5.0).abs() < 1e-9
        );
        assert!(
            (WindowedAggregation::aggregate(&values, WindowAggregation::Min) - 1.0).abs() < 1e-9
        );
        assert!(
            (WindowedAggregation::aggregate(&values, WindowAggregation::Count) - 5.0).abs() < 1e-9
        );
        assert!(
            (WindowedAggregation::aggregate(&values, WindowAggregation::FirstValue) - 1.0).abs()
                < 1e-9
        );
        assert!(
            (WindowedAggregation::aggregate(&values, WindowAggregation::LastValue) - 5.0).abs()
                < 1e-9
        );
    }

    #[test]
    fn test_aggregate_empty_slice() {
        let empty: [f64; 0] = [];
        assert_eq!(WindowedAggregation::aggregate(&empty, WindowAggregation::Sum), 0.0);
        assert_eq!(WindowedAggregation::aggregate(&empty, WindowAggregation::Mean), 0.0);
        assert_eq!(WindowedAggregation::aggregate(&empty, WindowAggregation::Count), 0.0);
        assert_eq!(WindowedAggregation::aggregate(&empty, WindowAggregation::FirstValue), 0.0);
        assert_eq!(WindowedAggregation::aggregate(&empty, WindowAggregation::LastValue), 0.0);
        assert_eq!(
            WindowedAggregation::aggregate(&empty, WindowAggregation::Max),
            f64::NEG_INFINITY
        );
        assert_eq!(
            WindowedAggregation::aggregate(&empty, WindowAggregation::Min),
            f64::INFINITY
        );
    }

    #[test]
    fn test_session_window_gap_detection() {
        // Events: (0,1),(100,2),(200,3) — gap < 500ms, then (1500,4),(1600,5) — gap > 500ms
        let events = vec![
            (0u64, 1.0_f64),
            (100, 2.0),
            (200, 3.0),
            (1500, 4.0),
            (1600, 5.0),
        ];
        let cfg = WindowConfig {
            window_type: WindowType::Session { gap_ms: 500 },
            aggregation: WindowAggregation::Sum,
            emit_partial: false,
            min_elements: 1,
        };
        let wa = WindowedAggregation::new(cfg);
        let results = wa.process_session(&events);

        assert_eq!(results.len(), 2, "Expected 2 sessions");
        assert_eq!(
            results[0].element_count, 3,
            "First session should have 3 elements"
        );
        assert_eq!(
            results[1].element_count, 2,
            "Second session should have 2 elements"
        );
        // Session bounds are the first and last event timestamps.
        assert_eq!((results[0].start_ms, results[0].end_ms), (0, 200));
        assert_eq!((results[1].start_ms, results[1].end_ms), (1500, 1600));
        assert!(results[0].is_complete);
        assert!(results[1].is_complete);
    }
}