Skip to main content

oxirs_stream/
window_function.rs

1//! Stream windowing functions: tumbling, sliding, and session windows.
2//!
3//! Provides stateless window computation over ordered time-series data points.
4
5use std::collections::HashMap;
6
/// One timestamped observation flowing through a stream.
#[derive(Clone, Debug, PartialEq)]
pub struct DataPoint {
    /// Time of the observation, in epoch milliseconds.
    pub timestamp_ms: i64,
    /// The numeric payload of the observation.
    pub value: f64,
    /// Grouping key consulted by `by_key` operations; `None` when the point is unkeyed.
    pub key: Option<String>,
}
17
/// The strategy used to cut a stream into windows.
#[derive(Clone, Debug, PartialEq)]
pub enum WindowType {
    /// Fixed-size windows of `size_ms` that never overlap.
    Tumbling(i64),
    /// Windows spanning `size_ms` each, with consecutive windows starting
    /// `step_ms` apart (so they overlap whenever `step_ms < size_ms`).
    Sliding { size_ms: i64, step_ms: i64 },
    /// Activity-driven windows that close once `gap_ms` passes without a point.
    Session { gap_ms: i64 },
}
28
29/// A window containing zero or more data points.
30#[derive(Debug, Clone, PartialEq)]
31pub struct Window {
32    /// Inclusive start timestamp (ms).
33    pub start_ms: i64,
34    /// Exclusive end timestamp (ms).
35    pub end_ms: i64,
36    /// Data points that fall inside this window.
37    pub points: Vec<DataPoint>,
38}
39
40impl Window {
41    /// Returns `true` if this window contains no data points.
42    pub fn is_empty(&self) -> bool {
43        self.points.is_empty()
44    }
45
46    /// Window span in milliseconds (`end_ms - start_ms`).
47    pub fn duration_ms(&self) -> i64 {
48        self.end_ms - self.start_ms
49    }
50
51    /// Number of data points in this window.
52    pub fn point_count(&self) -> usize {
53        self.points.len()
54    }
55}
56
57/// Result of applying a window function to a data stream.
58#[derive(Debug, Clone)]
59pub struct WindowResult {
60    /// All produced windows (may include empty windows for tumbling/sliding).
61    pub windows: Vec<Window>,
62    /// Total number of data points processed across all windows.
63    pub total_points: usize,
64}
65
66/// Aggregated statistics for a single window.
67#[derive(Debug, Clone)]
68pub struct WindowAggregation {
69    /// The window this aggregation describes.
70    pub window: Window,
71    /// Number of data points.
72    pub count: usize,
73    /// Sum of all values.
74    pub sum: f64,
75    /// Minimum value (f64::MAX if empty).
76    pub min: f64,
77    /// Maximum value (f64::MIN if empty).
78    pub max: f64,
79    /// Mean of all values (0.0 if empty).
80    pub mean: f64,
81}
82
/// Namespace for the windowing routines.
///
/// The type carries no data; all of its operations are associated functions
/// that work purely on their arguments.
pub struct WindowFunction;
85
86impl WindowFunction {
87    /// Apply the specified `WindowType` to `data` and return the result.
88    pub fn apply(data: &[DataPoint], window_type: &WindowType) -> WindowResult {
89        let windows = match window_type {
90            WindowType::Tumbling(size_ms) => Self::tumbling(data, *size_ms),
91            WindowType::Sliding { size_ms, step_ms } => Self::sliding(data, *size_ms, *step_ms),
92            WindowType::Session { gap_ms } => Self::session(data, *gap_ms),
93        };
94        let total_points = windows.iter().map(|w| w.points.len()).sum();
95        WindowResult {
96            windows,
97            total_points,
98        }
99    }
100
101    /// Compute non-overlapping (tumbling) windows of `size_ms` milliseconds.
102    ///
103    /// Windows are aligned to multiples of `size_ms` from epoch 0.
104    /// Empty windows between the first and last occupied window are included.
105    pub fn tumbling(data: &[DataPoint], size_ms: i64) -> Vec<Window> {
106        if data.is_empty() || size_ms <= 0 {
107            return Vec::new();
108        }
109
110        // Determine the range of windows needed.
111        let min_ts = data.iter().map(|p| p.timestamp_ms).min().unwrap_or(0);
112        let max_ts = data.iter().map(|p| p.timestamp_ms).max().unwrap_or(0);
113
114        let first_window_start = floor_div(min_ts, size_ms) * size_ms;
115        let last_window_start = floor_div(max_ts, size_ms) * size_ms;
116
117        let mut windows = Vec::new();
118        let mut current = first_window_start;
119        while current <= last_window_start {
120            let start = current;
121            let end = current + size_ms;
122            let points: Vec<DataPoint> = data
123                .iter()
124                .filter(|p| p.timestamp_ms >= start && p.timestamp_ms < end)
125                .cloned()
126                .collect();
127            windows.push(Window {
128                start_ms: start,
129                end_ms: end,
130                points,
131            });
132            current += size_ms;
133        }
134        windows
135    }
136
137    /// Compute overlapping (sliding) windows of `size_ms`, advanced by `step_ms`.
138    ///
139    /// A window `[t, t+size_ms)` is created for every `t = floor(first/step)*step, ...`
140    /// up to the last timestamp.  If `step_ms >= size_ms` the windows do not overlap.
141    pub fn sliding(data: &[DataPoint], size_ms: i64, step_ms: i64) -> Vec<Window> {
142        if data.is_empty() || size_ms <= 0 || step_ms <= 0 {
143            return Vec::new();
144        }
145
146        let min_ts = data.iter().map(|p| p.timestamp_ms).min().unwrap_or(0);
147        let max_ts = data.iter().map(|p| p.timestamp_ms).max().unwrap_or(0);
148
149        let first_window_start = floor_div(min_ts, step_ms) * step_ms;
150
151        let mut windows = Vec::new();
152        let mut current = first_window_start;
153        loop {
154            let start = current;
155            let end = current + size_ms;
156            // Keep windows that could contain at least the latest point.
157            if start > max_ts {
158                break;
159            }
160            let points: Vec<DataPoint> = data
161                .iter()
162                .filter(|p| p.timestamp_ms >= start && p.timestamp_ms < end)
163                .cloned()
164                .collect();
165            windows.push(Window {
166                start_ms: start,
167                end_ms: end,
168                points,
169            });
170            current += step_ms;
171        }
172        windows
173    }
174
175    /// Compute session windows: a new window opens on each point and closes
176    /// when the gap to the next point exceeds `gap_ms`.
177    pub fn session(data: &[DataPoint], gap_ms: i64) -> Vec<Window> {
178        if data.is_empty() || gap_ms <= 0 {
179            return Vec::new();
180        }
181
182        // Sort by timestamp (clone to avoid mutating caller's slice).
183        let mut sorted: Vec<DataPoint> = data.to_vec();
184        sorted.sort_by_key(|p| p.timestamp_ms);
185
186        let mut windows = Vec::new();
187        let mut session_points: Vec<DataPoint> = Vec::new();
188
189        for point in sorted {
190            if let Some(last) = session_points.last() {
191                if point.timestamp_ms - last.timestamp_ms > gap_ms {
192                    // Close current session.
193                    let start = session_points.first().map(|p| p.timestamp_ms).unwrap_or(0);
194                    let end = session_points
195                        .last()
196                        .map(|p| p.timestamp_ms + 1)
197                        .unwrap_or(1);
198                    windows.push(Window {
199                        start_ms: start,
200                        end_ms: end,
201                        points: std::mem::take(&mut session_points),
202                    });
203                }
204            }
205            session_points.push(point);
206        }
207
208        // Flush the remaining session.
209        if !session_points.is_empty() {
210            let start = session_points.first().map(|p| p.timestamp_ms).unwrap_or(0);
211            let end = session_points
212                .last()
213                .map(|p| p.timestamp_ms + 1)
214                .unwrap_or(1);
215            windows.push(Window {
216                start_ms: start,
217                end_ms: end,
218                points: session_points,
219            });
220        }
221
222        windows
223    }
224
225    /// Compute aggregated statistics for a single `window`.
226    pub fn aggregate(window: &Window) -> WindowAggregation {
227        let count = window.points.len();
228        if count == 0 {
229            return WindowAggregation {
230                window: window.clone(),
231                count: 0,
232                sum: 0.0,
233                min: f64::MAX,
234                max: f64::MIN,
235                mean: 0.0,
236            };
237        }
238        let sum: f64 = window.points.iter().map(|p| p.value).sum();
239        let min = window
240            .points
241            .iter()
242            .map(|p| p.value)
243            .fold(f64::MAX, f64::min);
244        let max = window
245            .points
246            .iter()
247            .map(|p| p.value)
248            .fold(f64::MIN, f64::max);
249        let mean = sum / count as f64;
250        WindowAggregation {
251            window: window.clone(),
252            count,
253            sum,
254            min,
255            max,
256            mean,
257        }
258    }
259
260    /// Aggregate every window in a `WindowResult`.
261    pub fn aggregate_all(result: &WindowResult) -> Vec<WindowAggregation> {
262        result.windows.iter().map(Self::aggregate).collect()
263    }
264
265    /// Group `data` by `DataPoint::key`, apply the window type to each group,
266    /// and return a map of key → `WindowResult`.
267    ///
268    /// Points with `key = None` are grouped under the empty string `""`.
269    pub fn by_key(data: &[DataPoint], window_type: &WindowType) -> HashMap<String, WindowResult> {
270        let mut groups: HashMap<String, Vec<DataPoint>> = HashMap::new();
271        for point in data {
272            let k = point.key.clone().unwrap_or_default();
273            groups.entry(k).or_default().push(point.clone());
274        }
275        groups
276            .into_iter()
277            .map(|(k, pts)| (k, Self::apply(&pts, window_type)))
278            .collect()
279    }
280}
281
/// Divides `a` by `b`, rounding the quotient toward negative infinity.
///
/// Rust's `/` truncates toward zero, which is one too high whenever the
/// division is inexact and the operands have opposite signs (e.g. for
/// negative timestamps with a positive window size).
fn floor_div(a: i64, b: i64) -> i64 {
    let quotient = a / b;
    let remainder = a % b;
    // A non-zero remainder takes the sign of `a`; if that differs from the
    // sign of `b`, truncation rounded up and must be corrected.
    let truncated_upward = remainder != 0 && (remainder < 0) != (b < 0);
    if truncated_upward {
        quotient - 1
    } else {
        quotient
    }
}
292
293// ---------------------------------------------------------------------------
294// Tests
295// ---------------------------------------------------------------------------
296
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds unkeyed data points from `(timestamp_ms, value)` pairs.
    fn pts(timestamps: &[(i64, f64)]) -> Vec<DataPoint> {
        timestamps
            .iter()
            .map(|&(ts, v)| DataPoint {
                timestamp_ms: ts,
                value: v,
                key: None,
            })
            .collect()
    }

    /// Builds a single data point carrying `key`.
    fn keyed(ts: i64, value: f64, key: &str) -> DataPoint {
        DataPoint {
            timestamp_ms: ts,
            value,
            key: Some(key.to_string()),
        }
    }

    /// Wraps `points` in a `[0, 1000)` window for the aggregation/helper tests.
    fn window_of(points: Vec<DataPoint>) -> Window {
        Window {
            start_ms: 0,
            end_ms: 1000,
            points,
        }
    }

    /// Timestamps of the points held by `window`, in stored order.
    fn timestamps(window: &Window) -> Vec<i64> {
        window.points.iter().map(|p| p.timestamp_ms).collect()
    }

    /// Start timestamps of `windows`, in order.
    fn starts(windows: &[Window]) -> Vec<i64> {
        windows.iter().map(|w| w.start_ms).collect()
    }

    // --- Tumbling window tests ---

    #[test]
    fn test_tumbling_basic() {
        let data = pts(&[(0, 1.0), (500, 2.0), (1000, 3.0), (1500, 4.0)]);
        let windows = WindowFunction::tumbling(&data, 1000);
        assert_eq!(windows.len(), 2);
        assert_eq!(timestamps(&windows[0]), vec![0, 500]);
        assert_eq!(timestamps(&windows[1]), vec![1000, 1500]);
    }

    #[test]
    fn test_tumbling_alignment() {
        // All points fall into the same window.
        let data = pts(&[(100, 1.0), (200, 2.0), (300, 3.0)]);
        let windows = WindowFunction::tumbling(&data, 1000);
        assert_eq!(windows.len(), 1);
        assert_eq!(windows[0].start_ms, 0);
        assert_eq!(windows[0].end_ms, 1000);
    }

    #[test]
    fn test_tumbling_empty_data() {
        let windows = WindowFunction::tumbling(&[], 1000);
        assert!(windows.is_empty());
    }

    #[test]
    fn test_tumbling_single_point() {
        let data = pts(&[(5000, 42.0)]);
        let windows = WindowFunction::tumbling(&data, 1000);
        assert_eq!(windows.len(), 1);
        assert_eq!(windows[0].points.len(), 1);
        assert_eq!(windows[0].start_ms, 5000);
        assert_eq!(windows[0].end_ms, 6000);
    }

    #[test]
    fn test_tumbling_with_empty_intermediate_window() {
        // Points at ts=0 and ts=3500 → windows [0,1000), [1000,2000),
        // [2000,3000), [3000,4000); the two in the middle hold no points.
        let data = pts(&[(0, 1.0), (3500, 2.0)]);
        let windows = WindowFunction::tumbling(&data, 1000);
        assert_eq!(starts(&windows), vec![0, 1000, 2000, 3000]);
        assert_eq!(timestamps(&windows[0]), vec![0]);
        assert!(windows[1].is_empty());
        assert!(windows[2].is_empty());
        assert_eq!(timestamps(&windows[3]), vec![3500]);
    }

    #[test]
    fn test_tumbling_exact_boundary() {
        // ts=1000 is exactly on the boundary → belongs to [1000,2000).
        let data = pts(&[(999, 1.0), (1000, 2.0)]);
        let windows = WindowFunction::tumbling(&data, 1000);
        assert_eq!(windows.len(), 2);
        assert_eq!(timestamps(&windows[0]), vec![999]);
        assert_eq!(timestamps(&windows[1]), vec![1000]);
    }

    #[test]
    fn test_tumbling_negative_timestamps() {
        // Negative timestamps must align downward: -1 belongs to [-1000, 0).
        let data = pts(&[(-1, 1.0), (0, 2.0)]);
        let windows = WindowFunction::tumbling(&data, 1000);
        assert_eq!(starts(&windows), vec![-1000, 0]);
        assert_eq!(timestamps(&windows[0]), vec![-1]);
        assert_eq!(timestamps(&windows[1]), vec![0]);
    }

    #[test]
    fn test_tumbling_zero_size_returns_empty() {
        let data = pts(&[(0, 1.0)]);
        let windows = WindowFunction::tumbling(&data, 0);
        assert!(windows.is_empty());
    }

    #[test]
    fn test_tumbling_many_points() {
        let data: Vec<DataPoint> = (0..100)
            .map(|i| DataPoint {
                timestamp_ms: i * 100,
                value: i as f64,
                key: None,
            })
            .collect();
        let windows = WindowFunction::tumbling(&data, 1000);
        // 100 points × 100ms = 10 seconds → 10 windows of 1000ms each.
        assert_eq!(windows.len(), 10);
        for w in &windows {
            assert_eq!(w.point_count(), 10);
        }
    }

    // --- Sliding window tests ---

    #[test]
    fn test_sliding_basic() {
        let data = pts(&[(0, 1.0), (200, 2.0), (400, 3.0), (600, 4.0)]);
        // Each window is 500ms wide and starts 200ms after the previous one.
        let windows = WindowFunction::sliding(&data, 500, 200);
        assert_eq!(starts(&windows), vec![0, 200, 400, 600]);
        // Overlap: the point at ts=200 sits in [0,500) and [200,700).
        let containing_200 = windows
            .iter()
            .filter(|w| w.points.iter().any(|p| p.timestamp_ms == 200))
            .count();
        assert_eq!(containing_200, 2, "sliding windows must overlap");
    }

    #[test]
    fn test_sliding_no_overlap_when_step_ge_size() {
        let data = pts(&[(0, 1.0), (1000, 2.0), (2000, 3.0)]);
        // step >= size → effectively tumbling.
        let windows = WindowFunction::sliding(&data, 1000, 1000);
        assert_eq!(windows.len(), 3);
        for p in &data {
            let holders = windows
                .iter()
                .filter(|w| w.points.iter().any(|q| q.timestamp_ms == p.timestamp_ms))
                .count();
            assert_eq!(
                holders, 1,
                "point should not appear in multiple windows when step >= size"
            );
        }
    }

    #[test]
    fn test_sliding_empty_data() {
        let windows = WindowFunction::sliding(&[], 500, 100);
        assert!(windows.is_empty());
    }

    #[test]
    fn test_sliding_single_point() {
        let data = pts(&[(500, 7.0)]);
        let windows = WindowFunction::sliding(&data, 1000, 500);
        // The first (and only) window starts at the step-aligned timestamp.
        assert_eq!(windows.len(), 1);
        assert_eq!(windows[0].start_ms, 500);
        assert_eq!(windows[0].end_ms, 1500);
        assert_eq!(timestamps(&windows[0]), vec![500]);
    }

    #[test]
    fn test_sliding_step_greater_than_size() {
        let data = pts(&[(0, 1.0), (100, 2.0), (200, 3.0)]);
        // step > size → gaps between windows; ts=100 falls into such a gap.
        let windows = WindowFunction::sliding(&data, 50, 200);
        assert_eq!(starts(&windows), vec![0, 200]);
        assert_eq!(timestamps(&windows[0]), vec![0]);
        assert_eq!(timestamps(&windows[1]), vec![200]);
    }

    #[test]
    fn test_sliding_zero_step_returns_empty() {
        let data = pts(&[(0, 1.0)]);
        let windows = WindowFunction::sliding(&data, 1000, 0);
        assert!(windows.is_empty());
    }

    // --- Session window tests ---

    #[test]
    fn test_session_basic_gap_splitting() {
        // Two clusters separated by more than gap_ms.
        let data = pts(&[(0, 1.0), (100, 2.0), (200, 3.0), (5000, 4.0), (5100, 5.0)]);
        let windows = WindowFunction::session(&data, 500);
        assert_eq!(windows.len(), 2);
        assert_eq!(windows[0].points.len(), 3);
        assert_eq!(windows[1].points.len(), 2);
    }

    #[test]
    fn test_session_window_bounds() {
        // A session spans from its first timestamp to one past its last.
        let data = pts(&[(0, 1.0), (100, 2.0), (200, 3.0), (5000, 4.0), (5100, 5.0)]);
        let windows = WindowFunction::session(&data, 500);
        assert_eq!(windows.len(), 2);
        assert_eq!((windows[0].start_ms, windows[0].end_ms), (0, 201));
        assert_eq!((windows[1].start_ms, windows[1].end_ms), (5000, 5101));
    }

    #[test]
    fn test_session_no_split_within_gap() {
        let data = pts(&[(0, 1.0), (100, 2.0), (200, 3.0)]);
        let windows = WindowFunction::session(&data, 200);
        // Neighbouring points are 100ms apart, well within the 200ms gap.
        assert_eq!(windows.len(), 1);
        assert_eq!(windows[0].points.len(), 3);
    }

    #[test]
    fn test_session_single_point() {
        let data = pts(&[(1000, 9.9)]);
        let windows = WindowFunction::session(&data, 500);
        assert_eq!(windows.len(), 1);
        assert_eq!(windows[0].points.len(), 1);
    }

    #[test]
    fn test_session_empty_data() {
        let windows = WindowFunction::session(&[], 500);
        assert!(windows.is_empty());
    }

    #[test]
    fn test_session_non_positive_gap_returns_empty() {
        let data = pts(&[(0, 1.0), (100, 2.0)]);
        assert!(WindowFunction::session(&data, 0).is_empty());
        assert!(WindowFunction::session(&data, -5).is_empty());
    }

    #[test]
    fn test_session_multiple_gaps() {
        let data = pts(&[(0, 1.0), (2000, 2.0), (4000, 3.0), (4100, 4.0)]);
        let windows = WindowFunction::session(&data, 500);
        assert_eq!(windows.len(), 3);
        assert_eq!(timestamps(&windows[2]), vec![4000, 4100]);
    }

    #[test]
    fn test_session_unsorted_input() {
        // session() must sort by timestamp internally.
        let data = pts(&[(5000, 3.0), (0, 1.0), (100, 2.0)]);
        let windows = WindowFunction::session(&data, 500);
        assert_eq!(windows.len(), 2); // [0,100] and [5000]
        assert_eq!(timestamps(&windows[0]), vec![0, 100]);
        assert_eq!(timestamps(&windows[1]), vec![5000]);
    }

    // --- Aggregate tests ---

    #[test]
    fn test_aggregate_count() {
        let win = window_of(pts(&[(0, 1.0), (100, 2.0), (200, 3.0)]));
        let agg = WindowFunction::aggregate(&win);
        assert_eq!(agg.count, 3);
    }

    #[test]
    fn test_aggregate_sum() {
        let win = window_of(pts(&[(0, 10.0), (100, 20.0), (200, 30.0)]));
        let agg = WindowFunction::aggregate(&win);
        assert!((agg.sum - 60.0).abs() < 1e-9);
    }

    #[test]
    fn test_aggregate_min() {
        let win = window_of(pts(&[(0, 5.0), (100, 1.0), (200, 3.0)]));
        let agg = WindowFunction::aggregate(&win);
        assert!((agg.min - 1.0).abs() < 1e-9);
    }

    #[test]
    fn test_aggregate_max() {
        let win = window_of(pts(&[(0, 5.0), (100, 1.0), (200, 3.0)]));
        let agg = WindowFunction::aggregate(&win);
        assert!((agg.max - 5.0).abs() < 1e-9);
    }

    #[test]
    fn test_aggregate_mean() {
        let win = window_of(pts(&[(0, 2.0), (100, 4.0), (200, 6.0)]));
        let agg = WindowFunction::aggregate(&win);
        assert!((agg.mean - 4.0).abs() < 1e-9);
    }

    #[test]
    fn test_aggregate_empty_window() {
        let win = window_of(Vec::new());
        let agg = WindowFunction::aggregate(&win);
        assert_eq!(agg.count, 0);
        assert!((agg.sum - 0.0).abs() < 1e-9);
        assert!((agg.mean - 0.0).abs() < 1e-9);
        // Documented sentinels for "no data".
        assert_eq!(agg.min, f64::MAX);
        assert_eq!(agg.max, f64::MIN);
    }

    #[test]
    fn test_aggregate_single_point() {
        let win = window_of(pts(&[(500, 7.7)]));
        let agg = WindowFunction::aggregate(&win);
        assert_eq!(agg.count, 1);
        assert!((agg.min - 7.7).abs() < 1e-6);
        assert!((agg.max - 7.7).abs() < 1e-6);
        assert!((agg.mean - 7.7).abs() < 1e-6);
    }

    #[test]
    fn test_aggregate_all() {
        let data = pts(&[(0, 1.0), (500, 2.0), (1000, 3.0)]);
        let result = WindowFunction::apply(&data, &WindowType::Tumbling(1000));
        let aggs = WindowFunction::aggregate_all(&result);
        assert_eq!(aggs.len(), result.windows.len());
        assert_eq!(aggs[0].count, 2);
        assert_eq!(aggs[1].count, 1);
    }

    // --- by_key tests ---

    #[test]
    fn test_by_key_groups_correctly() {
        let data = vec![
            keyed(0, 1.0, "a"),
            keyed(100, 2.0, "b"),
            keyed(200, 3.0, "a"),
            keyed(300, 4.0, "b"),
        ];
        let groups = WindowFunction::by_key(&data, &WindowType::Tumbling(1000));
        assert_eq!(groups.len(), 2);
        let a_result = groups.get("a").expect("key a must exist");
        assert_eq!(a_result.total_points, 2);
        let b_result = groups.get("b").expect("key b must exist");
        assert_eq!(b_result.total_points, 2);
    }

    #[test]
    fn test_by_key_none_mapped_to_empty_string() {
        let data = pts(&[(0, 1.0), (100, 2.0)]);
        let groups = WindowFunction::by_key(&data, &WindowType::Tumbling(1000));
        assert_eq!(groups.len(), 1);
        let unkeyed = groups.get("").expect("unkeyed points must be grouped under \"\"");
        assert_eq!(unkeyed.total_points, 2);
    }

    #[test]
    fn test_by_key_multiple_keys() {
        let data = vec![keyed(0, 1.0, "x"), keyed(0, 2.0, "y"), keyed(0, 3.0, "z")];
        let groups = WindowFunction::by_key(&data, &WindowType::Tumbling(1000));
        assert_eq!(groups.len(), 3);
    }

    #[test]
    fn test_by_key_empty_data() {
        let groups = WindowFunction::by_key(&[], &WindowType::Tumbling(1000));
        assert!(groups.is_empty());
    }

    // --- WindowResult ---

    #[test]
    fn test_window_result_total_points() {
        let data = pts(&[(0, 1.0), (500, 2.0), (1000, 3.0), (1500, 4.0)]);
        let result = WindowFunction::apply(&data, &WindowType::Tumbling(1000));
        // Each point appears in exactly one tumbling window.
        assert_eq!(result.total_points, 4);
    }

    #[test]
    fn test_window_result_total_points_sliding_overlap() {
        let data = pts(&[(0, 1.0), (200, 2.0)]);
        let result = WindowFunction::apply(
            &data,
            &WindowType::Sliding {
                size_ms: 400,
                step_ms: 100,
            },
        );
        // Windows [0,400), [100,500), [200,600): ts=0 is in the first only,
        // ts=200 is in all three → counted once per window.
        assert_eq!(result.windows.len(), 3);
        assert_eq!(result.total_points, 4);
    }

    #[test]
    fn test_window_result_windows_count() {
        let data = pts(&[(0, 1.0), (1000, 2.0), (2000, 3.0)]);
        let result = WindowFunction::apply(&data, &WindowType::Tumbling(1000));
        assert_eq!(result.windows.len(), 3);
    }

    // --- Window helper methods ---

    #[test]
    fn test_window_is_empty_true() {
        assert!(window_of(Vec::new()).is_empty());
    }

    #[test]
    fn test_window_is_empty_false() {
        assert!(!window_of(pts(&[(0, 1.0)])).is_empty());
    }

    #[test]
    fn test_window_duration_ms() {
        let w = Window {
            start_ms: 1000,
            end_ms: 3000,
            points: Vec::new(),
        };
        assert_eq!(w.duration_ms(), 2000);
    }

    #[test]
    fn test_window_point_count() {
        let w = window_of(pts(&[(0, 1.0), (100, 2.0)]));
        assert_eq!(w.point_count(), 2);
    }

    // --- apply() dispatch ---

    #[test]
    fn test_apply_tumbling() {
        let data = pts(&[(0, 1.0)]);
        let result = WindowFunction::apply(&data, &WindowType::Tumbling(1000));
        assert_eq!(result.windows.len(), 1);
    }

    #[test]
    fn test_apply_sliding() {
        let data = pts(&[(0, 1.0)]);
        let result = WindowFunction::apply(
            &data,
            &WindowType::Sliding {
                size_ms: 1000,
                step_ms: 500,
            },
        );
        assert_eq!(result.windows.len(), 1);
        assert_eq!(result.total_points, 1);
    }

    #[test]
    fn test_apply_session() {
        let data = pts(&[(0, 1.0)]);
        let result = WindowFunction::apply(&data, &WindowType::Session { gap_ms: 500 });
        assert_eq!(result.windows.len(), 1);
    }
}