1use std::collections::HashMap;
6
/// One timestamped measurement fed into the windowing functions.
#[derive(Debug, Clone, PartialEq)]
pub struct DataPoint {
    /// Timestamp in milliseconds; determines which window(s) the point joins.
    pub timestamp_ms: i64,
    /// Measured value summarised by `WindowFunction::aggregate`.
    pub value: f64,
    /// Optional grouping key; `WindowFunction::by_key` groups `None` under `""`.
    pub key: Option<String>,
}
17
/// How `WindowFunction::apply` partitions data. All durations are milliseconds.
///
/// The payloads are plain integers, so the type is `Copy`, `Eq` and `Hash`
/// (usable as a map/set key and passable by value).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum WindowType {
    /// Fixed-size, non-overlapping windows of the given size.
    Tumbling(i64),
    /// Windows of `size_ms` whose start advances by `step_ms`; consecutive
    /// windows overlap when `step_ms < size_ms`.
    Sliding { size_ms: i64, step_ms: i64 },
    /// Runs of consecutive points no more than `gap_ms` apart.
    Session { gap_ms: i64 },
}
28
29#[derive(Debug, Clone, PartialEq)]
31pub struct Window {
32 pub start_ms: i64,
34 pub end_ms: i64,
36 pub points: Vec<DataPoint>,
38}
39
40impl Window {
41 pub fn is_empty(&self) -> bool {
43 self.points.is_empty()
44 }
45
46 pub fn duration_ms(&self) -> i64 {
48 self.end_ms - self.start_ms
49 }
50
51 pub fn point_count(&self) -> usize {
53 self.points.len()
54 }
55}
56
57#[derive(Debug, Clone)]
59pub struct WindowResult {
60 pub windows: Vec<Window>,
62 pub total_points: usize,
64}
65
66#[derive(Debug, Clone)]
68pub struct WindowAggregation {
69 pub window: Window,
71 pub count: usize,
73 pub sum: f64,
75 pub min: f64,
77 pub max: f64,
79 pub mean: f64,
81}
82
/// Namespace for the windowing and aggregation functions; carries no state.
#[derive(Debug, Clone, Copy, Default)]
pub struct WindowFunction;
85
86impl WindowFunction {
87 pub fn apply(data: &[DataPoint], window_type: &WindowType) -> WindowResult {
89 let windows = match window_type {
90 WindowType::Tumbling(size_ms) => Self::tumbling(data, *size_ms),
91 WindowType::Sliding { size_ms, step_ms } => Self::sliding(data, *size_ms, *step_ms),
92 WindowType::Session { gap_ms } => Self::session(data, *gap_ms),
93 };
94 let total_points = windows.iter().map(|w| w.points.len()).sum();
95 WindowResult {
96 windows,
97 total_points,
98 }
99 }
100
101 pub fn tumbling(data: &[DataPoint], size_ms: i64) -> Vec<Window> {
106 if data.is_empty() || size_ms <= 0 {
107 return Vec::new();
108 }
109
110 let min_ts = data.iter().map(|p| p.timestamp_ms).min().unwrap_or(0);
112 let max_ts = data.iter().map(|p| p.timestamp_ms).max().unwrap_or(0);
113
114 let first_window_start = floor_div(min_ts, size_ms) * size_ms;
115 let last_window_start = floor_div(max_ts, size_ms) * size_ms;
116
117 let mut windows = Vec::new();
118 let mut current = first_window_start;
119 while current <= last_window_start {
120 let start = current;
121 let end = current + size_ms;
122 let points: Vec<DataPoint> = data
123 .iter()
124 .filter(|p| p.timestamp_ms >= start && p.timestamp_ms < end)
125 .cloned()
126 .collect();
127 windows.push(Window {
128 start_ms: start,
129 end_ms: end,
130 points,
131 });
132 current += size_ms;
133 }
134 windows
135 }
136
137 pub fn sliding(data: &[DataPoint], size_ms: i64, step_ms: i64) -> Vec<Window> {
142 if data.is_empty() || size_ms <= 0 || step_ms <= 0 {
143 return Vec::new();
144 }
145
146 let min_ts = data.iter().map(|p| p.timestamp_ms).min().unwrap_or(0);
147 let max_ts = data.iter().map(|p| p.timestamp_ms).max().unwrap_or(0);
148
149 let first_window_start = floor_div(min_ts, step_ms) * step_ms;
150
151 let mut windows = Vec::new();
152 let mut current = first_window_start;
153 loop {
154 let start = current;
155 let end = current + size_ms;
156 if start > max_ts {
158 break;
159 }
160 let points: Vec<DataPoint> = data
161 .iter()
162 .filter(|p| p.timestamp_ms >= start && p.timestamp_ms < end)
163 .cloned()
164 .collect();
165 windows.push(Window {
166 start_ms: start,
167 end_ms: end,
168 points,
169 });
170 current += step_ms;
171 }
172 windows
173 }
174
175 pub fn session(data: &[DataPoint], gap_ms: i64) -> Vec<Window> {
178 if data.is_empty() || gap_ms <= 0 {
179 return Vec::new();
180 }
181
182 let mut sorted: Vec<DataPoint> = data.to_vec();
184 sorted.sort_by_key(|p| p.timestamp_ms);
185
186 let mut windows = Vec::new();
187 let mut session_points: Vec<DataPoint> = Vec::new();
188
189 for point in sorted {
190 if let Some(last) = session_points.last() {
191 if point.timestamp_ms - last.timestamp_ms > gap_ms {
192 let start = session_points.first().map(|p| p.timestamp_ms).unwrap_or(0);
194 let end = session_points
195 .last()
196 .map(|p| p.timestamp_ms + 1)
197 .unwrap_or(1);
198 windows.push(Window {
199 start_ms: start,
200 end_ms: end,
201 points: std::mem::take(&mut session_points),
202 });
203 }
204 }
205 session_points.push(point);
206 }
207
208 if !session_points.is_empty() {
210 let start = session_points.first().map(|p| p.timestamp_ms).unwrap_or(0);
211 let end = session_points
212 .last()
213 .map(|p| p.timestamp_ms + 1)
214 .unwrap_or(1);
215 windows.push(Window {
216 start_ms: start,
217 end_ms: end,
218 points: session_points,
219 });
220 }
221
222 windows
223 }
224
225 pub fn aggregate(window: &Window) -> WindowAggregation {
227 let count = window.points.len();
228 if count == 0 {
229 return WindowAggregation {
230 window: window.clone(),
231 count: 0,
232 sum: 0.0,
233 min: f64::MAX,
234 max: f64::MIN,
235 mean: 0.0,
236 };
237 }
238 let sum: f64 = window.points.iter().map(|p| p.value).sum();
239 let min = window
240 .points
241 .iter()
242 .map(|p| p.value)
243 .fold(f64::MAX, f64::min);
244 let max = window
245 .points
246 .iter()
247 .map(|p| p.value)
248 .fold(f64::MIN, f64::max);
249 let mean = sum / count as f64;
250 WindowAggregation {
251 window: window.clone(),
252 count,
253 sum,
254 min,
255 max,
256 mean,
257 }
258 }
259
260 pub fn aggregate_all(result: &WindowResult) -> Vec<WindowAggregation> {
262 result.windows.iter().map(Self::aggregate).collect()
263 }
264
265 pub fn by_key(data: &[DataPoint], window_type: &WindowType) -> HashMap<String, WindowResult> {
270 let mut groups: HashMap<String, Vec<DataPoint>> = HashMap::new();
271 for point in data {
272 let k = point.key.clone().unwrap_or_default();
273 groups.entry(k).or_default().push(point.clone());
274 }
275 groups
276 .into_iter()
277 .map(|(k, pts)| (k, Self::apply(&pts, window_type)))
278 .collect()
279 }
280}
281
/// Integer division rounding toward negative infinity (`⌊a / b⌋`).
///
/// Rust's `/` truncates toward zero, so the truncated quotient is stepped
/// down by one whenever the remainder is non-zero and its sign disagrees with
/// the divisor's, i.e. when the exact quotient is negative and non-integral.
///
/// # Panics
/// Panics when `b == 0`, or on overflow (`i64::MIN / -1`), exactly like `/`.
fn floor_div(a: i64, b: i64) -> i64 {
    let quotient = a / b;
    let remainder = a % b;
    if remainder != 0 && (remainder < 0) != (b < 0) {
        quotient - 1
    } else {
        quotient
    }
}
292
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds unkeyed points from `(timestamp_ms, value)` pairs.
    fn pts(samples: &[(i64, f64)]) -> Vec<DataPoint> {
        let mut out = Vec::with_capacity(samples.len());
        for &(timestamp_ms, value) in samples {
            out.push(DataPoint {
                timestamp_ms,
                value,
                key: None,
            });
        }
        out
    }

    /// Builds a single point tagged with `key`.
    fn keyed(ts: i64, value: f64, key: &str) -> DataPoint {
        DataPoint {
            timestamp_ms: ts,
            value,
            key: Some(String::from(key)),
        }
    }

    /// Wraps `points` in a window spanning `[start_ms, end_ms)`.
    fn window(start_ms: i64, end_ms: i64, points: Vec<DataPoint>) -> Window {
        Window {
            start_ms,
            end_ms,
            points,
        }
    }

    /// Aggregates the given samples placed in a `[0, 1000)` window.
    fn aggregate_of(samples: &[(i64, f64)]) -> WindowAggregation {
        WindowFunction::aggregate(&window(0, 1000, pts(samples)))
    }

    /// Point counts of each window, in order.
    fn sizes(windows: &[Window]) -> Vec<usize> {
        windows.iter().map(|w| w.points.len()).collect()
    }

    /// Asserts `actual` lies strictly within `tol` of `expected`.
    fn assert_close(actual: f64, expected: f64, tol: f64) {
        assert!(
            (actual - expected).abs() < tol,
            "expected {} +/- {}, got {}",
            expected,
            tol,
            actual
        );
    }

    // ---- tumbling windows ----

    #[test]
    fn test_tumbling_basic() {
        let data = pts(&[(0, 1.0), (500, 2.0), (1000, 3.0), (1500, 4.0)]);
        assert_eq!(sizes(&WindowFunction::tumbling(&data, 1000)), vec![2, 2]);
    }

    #[test]
    fn test_tumbling_alignment() {
        let data = pts(&[(100, 1.0), (200, 2.0), (300, 3.0)]);
        let windows = WindowFunction::tumbling(&data, 1000);
        assert_eq!(windows.len(), 1);
        assert_eq!((windows[0].start_ms, windows[0].end_ms), (0, 1000));
    }

    #[test]
    fn test_tumbling_empty_data() {
        assert!(WindowFunction::tumbling(&[], 1000).is_empty());
    }

    #[test]
    fn test_tumbling_single_point() {
        let windows = WindowFunction::tumbling(&pts(&[(5000, 42.0)]), 1000);
        assert_eq!(sizes(&windows), vec![1]);
        assert_eq!(windows[0].start_ms, 5000);
    }

    #[test]
    fn test_tumbling_with_empty_intermediate_window() {
        let windows = WindowFunction::tumbling(&pts(&[(0, 1.0), (3500, 2.0)]), 1000);
        assert_eq!(windows.len(), 4);
        assert!(windows[1..3].iter().all(Window::is_empty));
    }

    #[test]
    fn test_tumbling_exact_boundary() {
        let windows = WindowFunction::tumbling(&pts(&[(999, 1.0), (1000, 2.0)]), 1000);
        assert_eq!(sizes(&windows), vec![1, 1]);
    }

    // ---- sliding windows ----

    #[test]
    fn test_sliding_basic() {
        let data = pts(&[(0, 1.0), (200, 2.0), (400, 3.0), (600, 4.0)]);
        let windows = WindowFunction::sliding(&data, 500, 200);
        assert!(!windows.is_empty());
        let containing_200 = windows
            .iter()
            .filter(|w| w.points.iter().any(|p| p.timestamp_ms == 200))
            .count();
        assert!(containing_200 > 1, "sliding windows must overlap");
    }

    #[test]
    fn test_sliding_no_overlap_when_step_ge_size() {
        let data = pts(&[(0, 1.0), (1000, 2.0), (2000, 3.0)]);
        let windows = WindowFunction::sliding(&data, 1000, 1000);
        for (i, w) in windows.iter().enumerate() {
            for p in &w.points {
                let seen_elsewhere = windows.iter().enumerate().any(|(j, other)| {
                    j != i
                        && other
                            .points
                            .iter()
                            .any(|q| q.timestamp_ms == p.timestamp_ms)
                });
                assert!(
                    !seen_elsewhere,
                    "point should not appear in multiple windows when step >= size"
                );
            }
        }
    }

    #[test]
    fn test_sliding_empty_data() {
        assert!(WindowFunction::sliding(&[], 500, 100).is_empty());
    }

    #[test]
    fn test_sliding_single_point() {
        let windows = WindowFunction::sliding(&pts(&[(500, 7.0)]), 1000, 500);
        assert!(windows.iter().any(|w| !w.is_empty()));
    }

    #[test]
    fn test_sliding_step_greater_than_size() {
        let data = pts(&[(0, 1.0), (100, 2.0), (200, 3.0)]);
        let windows = WindowFunction::sliding(&data, 50, 200);
        assert!(windows.iter().all(|w| w.points.len() <= 1));
    }

    // ---- session windows ----

    #[test]
    fn test_session_basic_gap_splitting() {
        let data = pts(&[(0, 1.0), (100, 2.0), (200, 3.0), (5000, 4.0), (5100, 5.0)]);
        assert_eq!(sizes(&WindowFunction::session(&data, 500)), vec![3, 2]);
    }

    #[test]
    fn test_session_no_split_within_gap() {
        let data = pts(&[(0, 1.0), (100, 2.0), (200, 3.0)]);
        assert_eq!(WindowFunction::session(&data, 200).len(), 1);
    }

    #[test]
    fn test_session_single_point() {
        let windows = WindowFunction::session(&pts(&[(1000, 9.9)]), 500);
        assert_eq!(sizes(&windows), vec![1]);
    }

    #[test]
    fn test_session_empty_data() {
        assert!(WindowFunction::session(&[], 500).is_empty());
    }

    #[test]
    fn test_session_multiple_gaps() {
        let data = pts(&[(0, 1.0), (2000, 2.0), (4000, 3.0), (4100, 4.0)]);
        assert_eq!(WindowFunction::session(&data, 500).len(), 3);
    }

    // ---- aggregation ----

    #[test]
    fn test_aggregate_count() {
        assert_eq!(aggregate_of(&[(0, 1.0), (100, 2.0), (200, 3.0)]).count, 3);
    }

    #[test]
    fn test_aggregate_sum() {
        let agg = aggregate_of(&[(0, 10.0), (100, 20.0), (200, 30.0)]);
        assert_close(agg.sum, 60.0, 1e-9);
    }

    #[test]
    fn test_aggregate_min() {
        let agg = aggregate_of(&[(0, 5.0), (100, 1.0), (200, 3.0)]);
        assert_close(agg.min, 1.0, 1e-9);
    }

    #[test]
    fn test_aggregate_max() {
        let agg = aggregate_of(&[(0, 5.0), (100, 1.0), (200, 3.0)]);
        assert_close(agg.max, 5.0, 1e-9);
    }

    #[test]
    fn test_aggregate_mean() {
        let agg = aggregate_of(&[(0, 2.0), (100, 4.0), (200, 6.0)]);
        assert_close(agg.mean, 4.0, 1e-9);
    }

    #[test]
    fn test_aggregate_empty_window() {
        let agg = aggregate_of(&[]);
        assert_eq!(agg.count, 0);
        assert_close(agg.sum, 0.0, 1e-9);
        assert_close(agg.mean, 0.0, 1e-9);
    }

    #[test]
    fn test_aggregate_all() {
        let data = pts(&[(0, 1.0), (500, 2.0), (1000, 3.0)]);
        let result = WindowFunction::apply(&data, &WindowType::Tumbling(1000));
        let aggs = WindowFunction::aggregate_all(&result);
        assert_eq!(aggs.len(), result.windows.len());
    }

    // ---- keyed grouping ----

    #[test]
    fn test_by_key_groups_correctly() {
        let data = vec![
            keyed(0, 1.0, "a"),
            keyed(100, 2.0, "b"),
            keyed(200, 3.0, "a"),
            keyed(300, 4.0, "b"),
        ];
        let groups = WindowFunction::by_key(&data, &WindowType::Tumbling(1000));
        assert_eq!(groups.len(), 2);
        assert_eq!(groups.get("a").expect("key a must exist").total_points, 2);
        assert_eq!(groups.get("b").expect("key b must exist").total_points, 2);
    }

    #[test]
    fn test_by_key_none_mapped_to_empty_string() {
        let data = pts(&[(0, 1.0), (100, 2.0)]);
        let groups = WindowFunction::by_key(&data, &WindowType::Tumbling(1000));
        assert!(groups.contains_key(""));
    }

    #[test]
    fn test_by_key_multiple_keys() {
        let data = vec![keyed(0, 1.0, "x"), keyed(0, 2.0, "y"), keyed(0, 3.0, "z")];
        let groups = WindowFunction::by_key(&data, &WindowType::Tumbling(1000));
        assert_eq!(groups.len(), 3);
    }

    // ---- WindowResult ----

    #[test]
    fn test_window_result_total_points() {
        let data = pts(&[(0, 1.0), (500, 2.0), (1000, 3.0), (1500, 4.0)]);
        let result = WindowFunction::apply(&data, &WindowType::Tumbling(1000));
        assert_eq!(result.total_points, 4);
    }

    #[test]
    fn test_window_result_total_points_sliding_overlap() {
        let data = pts(&[(0, 1.0), (200, 2.0)]);
        let sliding = WindowType::Sliding {
            size_ms: 400,
            step_ms: 100,
        };
        assert!(WindowFunction::apply(&data, &sliding).total_points >= 2);
    }

    // ---- Window helpers ----

    #[test]
    fn test_window_is_empty_true() {
        assert!(window(0, 1000, Vec::new()).is_empty());
    }

    #[test]
    fn test_window_is_empty_false() {
        assert!(!window(0, 1000, pts(&[(0, 1.0)])).is_empty());
    }

    #[test]
    fn test_window_duration_ms() {
        assert_eq!(window(1000, 3000, Vec::new()).duration_ms(), 2000);
    }

    #[test]
    fn test_window_point_count() {
        assert_eq!(window(0, 1000, pts(&[(0, 1.0), (100, 2.0)])).point_count(), 2);
    }

    // ---- apply dispatch ----

    #[test]
    fn test_apply_tumbling() {
        let result = WindowFunction::apply(&pts(&[(0, 1.0)]), &WindowType::Tumbling(1000));
        assert_eq!(result.windows.len(), 1);
    }

    #[test]
    fn test_apply_sliding() {
        let sliding = WindowType::Sliding {
            size_ms: 1000,
            step_ms: 500,
        };
        let result = WindowFunction::apply(&pts(&[(0, 1.0)]), &sliding);
        assert!(!result.windows.is_empty());
    }

    #[test]
    fn test_apply_session() {
        let session = WindowType::Session { gap_ms: 500 };
        let result = WindowFunction::apply(&pts(&[(0, 1.0)]), &session);
        assert_eq!(result.windows.len(), 1);
    }

    // ---- degenerate parameters and misc ----

    #[test]
    fn test_tumbling_zero_size_returns_empty() {
        assert!(WindowFunction::tumbling(&pts(&[(0, 1.0)]), 0).is_empty());
    }

    #[test]
    fn test_sliding_zero_step_returns_empty() {
        assert!(WindowFunction::sliding(&pts(&[(0, 1.0)]), 1000, 0).is_empty());
    }

    #[test]
    fn test_aggregate_single_point() {
        let agg = WindowFunction::aggregate(&window(0, 1000, pts(&[(500, 7.7)])));
        assert_eq!(agg.count, 1);
        assert_close(agg.min, 7.7, 1e-6);
        assert_close(agg.max, 7.7, 1e-6);
        assert_close(agg.mean, 7.7, 1e-6);
    }

    #[test]
    fn test_tumbling_many_points() {
        let samples: Vec<(i64, f64)> = (0..100i64).map(|i| (i * 100, i as f64)).collect();
        let windows = WindowFunction::tumbling(&pts(&samples), 1000);
        assert_eq!(windows.len(), 10);
        assert!(windows.iter().all(|w| w.point_count() == 10));
    }

    #[test]
    fn test_session_unsorted_input() {
        let data = pts(&[(5000, 3.0), (0, 1.0), (100, 2.0)]);
        assert_eq!(WindowFunction::session(&data, 500).len(), 2);
    }

    #[test]
    fn test_by_key_empty_data() {
        assert!(WindowFunction::by_key(&[], &WindowType::Tumbling(1000)).is_empty());
    }

    #[test]
    fn test_window_result_windows_count() {
        let data = pts(&[(0, 1.0), (1000, 2.0), (2000, 3.0)]);
        let result = WindowFunction::apply(&data, &WindowType::Tumbling(1000));
        assert_eq!(result.windows.len(), 3);
    }
}