1use chrono::{DateTime, Utc};
18use serde::{Deserialize, Serialize};
19use std::collections::{BTreeMap, HashMap, VecDeque};
20use std::time::Duration;
21
22#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
28pub enum WindowKind {
29 Tumbling {
31 size: Duration,
33 },
34 Sliding {
36 size: Duration,
38 slide: Duration,
40 },
41 Session {
43 gap: Duration,
45 },
46 Count {
48 count: usize,
50 },
51}
52
53#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
55pub enum LatePolicy {
56 #[default]
58 Drop,
59 SideOutput,
61 AllowedLateness {
63 lateness: Duration,
65 },
66}
67
68#[derive(Debug, Clone, Serialize, Deserialize)]
70pub struct WindowAlgebraConfig {
71 pub kind: WindowKind,
73 pub late_policy: LatePolicy,
75 pub max_open_windows: usize,
77 pub emit_on_evict: bool,
79}
80
81impl Default for WindowAlgebraConfig {
82 fn default() -> Self {
83 Self {
84 kind: WindowKind::Tumbling {
85 size: Duration::from_secs(60),
86 },
87 late_policy: LatePolicy::default(),
88 max_open_windows: 10_000,
89 emit_on_evict: true,
90 }
91 }
92}
93
94#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)]
100pub struct WindowId {
101 pub start: i64,
103 pub end: i64,
105 pub key: Option<String>,
107}
108
109impl WindowId {
110 pub fn time_range(start_ms: i64, end_ms: i64) -> Self {
112 Self {
113 start: start_ms,
114 end: end_ms,
115 key: None,
116 }
117 }
118
119 pub fn keyed(start_ms: i64, end_ms: i64, key: impl Into<String>) -> Self {
121 Self {
122 start: start_ms,
123 end: end_ms,
124 key: Some(key.into()),
125 }
126 }
127
128 pub fn duration_ms(&self) -> i64 {
130 self.end - self.start
131 }
132
133 pub fn contains(&self, ts: i64) -> bool {
135 ts >= self.start && ts < self.end
136 }
137}
138
139#[derive(Debug, Clone, Serialize, Deserialize)]
141pub struct WindowEvent<T: Clone> {
142 pub value: T,
144 pub timestamp_ms: i64,
146 pub ingestion_time: DateTime<Utc>,
148}
149
150impl<T: Clone> WindowEvent<T> {
151 pub fn new(value: T, timestamp_ms: i64) -> Self {
152 Self {
153 value,
154 timestamp_ms,
155 ingestion_time: Utc::now(),
156 }
157 }
158}
159
160#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
162pub enum WindowState {
163 Open,
165 Closing,
167 Closed,
169}
170
171#[derive(Debug, Clone)]
173pub struct WindowPane<T: Clone> {
174 pub id: WindowId,
176 pub events: Vec<WindowEvent<T>>,
178 pub state: WindowState,
180 pub created_at: DateTime<Utc>,
182 pub lateness_deadline_ms: Option<i64>,
184}
185
186impl<T: Clone> WindowPane<T> {
187 fn new(id: WindowId) -> Self {
188 Self {
189 id,
190 events: Vec::new(),
191 state: WindowState::Open,
192 created_at: Utc::now(),
193 lateness_deadline_ms: None,
194 }
195 }
196
197 fn new_with_lateness(id: WindowId, lateness_ms: i64) -> Self {
198 let deadline = id.end + lateness_ms;
199 Self {
200 id,
201 events: Vec::new(),
202 state: WindowState::Open,
203 created_at: Utc::now(),
204 lateness_deadline_ms: Some(deadline),
205 }
206 }
207
208 pub fn len(&self) -> usize {
210 self.events.len()
211 }
212
213 pub fn is_empty(&self) -> bool {
215 self.events.is_empty()
216 }
217
218 pub fn min_timestamp(&self) -> Option<i64> {
220 self.events.iter().map(|e| e.timestamp_ms).min()
221 }
222
223 pub fn max_timestamp(&self) -> Option<i64> {
225 self.events.iter().map(|e| e.timestamp_ms).max()
226 }
227}
228
229#[derive(Debug, Clone)]
231pub struct WindowOutput<T: Clone> {
232 pub window_id: WindowId,
234 pub events: Vec<WindowEvent<T>>,
236 pub is_partial: bool,
238 pub late_dropped: usize,
240 pub late_side_output: usize,
242}
243
244#[derive(Debug, Clone, Default, Serialize, Deserialize)]
246pub struct WindowAlgebraStats {
247 pub total_events: u64,
249 pub windows_opened: u64,
251 pub windows_closed: u64,
253 pub windows_evicted: u64,
255 pub late_events_dropped: u64,
257 pub late_events_side_output: u64,
259 pub late_events_accepted: u64,
261 pub open_windows: u64,
263}
264
265pub struct WindowAlgebra<T: Clone> {
273 config: WindowAlgebraConfig,
274 panes: BTreeMap<WindowId, WindowPane<T>>,
276 side_output: VecDeque<WindowEvent<T>>,
278 watermark_ms: i64,
280 stats: WindowAlgebraStats,
282 count_buffers: HashMap<Option<String>, Vec<WindowEvent<T>>>,
284 count_seq: i64,
286 session_last_event: HashMap<Option<String>, i64>,
288}
289
290impl<T: Clone> WindowAlgebra<T> {
291 pub fn new(config: WindowAlgebraConfig) -> Self {
293 Self {
294 config,
295 panes: BTreeMap::new(),
296 side_output: VecDeque::new(),
297 watermark_ms: i64::MIN,
298 stats: WindowAlgebraStats::default(),
299 count_buffers: HashMap::new(),
300 count_seq: 0,
301 session_last_event: HashMap::new(),
302 }
303 }
304
305 pub fn tumbling(size: Duration) -> Self {
307 Self::new(WindowAlgebraConfig {
308 kind: WindowKind::Tumbling { size },
309 ..Default::default()
310 })
311 }
312
313 pub fn sliding(size: Duration, slide: Duration) -> Self {
315 Self::new(WindowAlgebraConfig {
316 kind: WindowKind::Sliding { size, slide },
317 ..Default::default()
318 })
319 }
320
321 pub fn session(gap: Duration) -> Self {
323 Self::new(WindowAlgebraConfig {
324 kind: WindowKind::Session { gap },
325 ..Default::default()
326 })
327 }
328
329 pub fn count(count: usize) -> Self {
331 Self::new(WindowAlgebraConfig {
332 kind: WindowKind::Count { count },
333 ..Default::default()
334 })
335 }
336
337 pub fn with_late_policy(mut self, policy: LatePolicy) -> Self {
339 self.config.late_policy = policy;
340 self
341 }
342
343 pub fn with_max_open_windows(mut self, max: usize) -> Self {
345 self.config.max_open_windows = max;
346 self
347 }
348
349 pub fn stats(&self) -> &WindowAlgebraStats {
351 &self.stats
352 }
353
354 pub fn watermark_ms(&self) -> i64 {
356 self.watermark_ms
357 }
358
359 pub fn open_pane_count(&self) -> usize {
361 self.panes
362 .values()
363 .filter(|p| p.state == WindowState::Open || p.state == WindowState::Closing)
364 .count()
365 }
366
367 pub fn drain_side_output(&mut self) -> Vec<WindowEvent<T>> {
369 self.side_output.drain(..).collect()
370 }
371
372 pub fn advance_watermark(&mut self, watermark_ms: i64) -> Vec<WindowOutput<T>> {
374 if watermark_ms <= self.watermark_ms {
375 return Vec::new();
376 }
377 self.watermark_ms = watermark_ms;
378 self.close_expired_windows()
379 }
380
381 pub fn ingest(&mut self, event: WindowEvent<T>) -> Vec<WindowOutput<T>> {
383 self.stats.total_events += 1;
384
385 match &self.config.kind {
386 WindowKind::Tumbling { size } => self.ingest_tumbling(event, *size),
387 WindowKind::Sliding { size, slide } => self.ingest_sliding(event, *size, *slide),
388 WindowKind::Session { gap } => self.ingest_session(event, *gap),
389 WindowKind::Count { count } => self.ingest_count(event, *count),
390 }
391 }
392
393 pub fn ingest_batch(&mut self, events: Vec<WindowEvent<T>>) -> Vec<WindowOutput<T>> {
395 let mut outputs = Vec::new();
396 for event in events {
397 outputs.extend(self.ingest(event));
398 }
399 outputs
400 }
401
402 fn ingest_tumbling(&mut self, event: WindowEvent<T>, size: Duration) -> Vec<WindowOutput<T>> {
405 let size_ms = size.as_millis() as i64;
406 if size_ms == 0 {
407 return Vec::new();
408 }
409 let window_start = (event.timestamp_ms / size_ms) * size_ms;
410 let window_end = window_start + size_ms;
411 let wid = WindowId::time_range(window_start, window_end);
412
413 self.assign_to_window(event, wid)
414 }
415
416 fn ingest_sliding(
419 &mut self,
420 event: WindowEvent<T>,
421 size: Duration,
422 slide: Duration,
423 ) -> Vec<WindowOutput<T>> {
424 let size_ms = size.as_millis() as i64;
425 let slide_ms = slide.as_millis() as i64;
426 if slide_ms == 0 || size_ms == 0 {
427 return Vec::new();
428 }
429
430 let ts = event.timestamp_ms;
432 let latest_start = (ts / slide_ms) * slide_ms;
434 let earliest_start = latest_start - size_ms + slide_ms;
435
436 let mut outputs = Vec::new();
437 let mut start = earliest_start;
438 while start <= latest_start {
439 let end = start + size_ms;
440 if ts >= start && ts < end {
441 let wid = WindowId::time_range(start, end);
442 outputs.extend(self.assign_to_window(event.clone(), wid));
443 }
444 start += slide_ms;
445 }
446 outputs
447 }
448
449 fn ingest_session(&mut self, event: WindowEvent<T>, gap: Duration) -> Vec<WindowOutput<T>> {
452 let gap_ms = gap.as_millis() as i64;
453 let ts = event.timestamp_ms;
454 let key: Option<String> = None; let mut outputs = Vec::new();
457
458 if let Some(&last_ts) = self.session_last_event.get(&key) {
459 if ts - last_ts > gap_ms {
460 outputs.extend(self.close_session_windows(&key));
462 }
463 }
464
465 let active_wid = self.find_or_extend_active_session(&key, ts, gap_ms);
468 self.session_last_event.insert(key.clone(), ts);
469
470 outputs.extend(self.assign_to_window(event, active_wid));
471 outputs
472 }
473
474 fn find_or_extend_active_session(
475 &mut self,
476 key: &Option<String>,
477 ts: i64,
478 gap_ms: i64,
479 ) -> WindowId {
480 let existing_wid = self
482 .panes
483 .iter()
484 .find(|(wid, pane)| {
485 wid.key == *key && pane.state == WindowState::Open && ts >= wid.start
486 })
487 .map(|(wid, _)| wid.clone());
488
489 if let Some(old_wid) = existing_wid {
490 let new_end = ts + gap_ms;
491 if new_end == old_wid.end {
492 return old_wid;
494 }
495 let new_wid = WindowId {
497 start: old_wid.start,
498 end: new_end,
499 key: key.clone(),
500 };
501 if let Some(mut pane) = self.panes.remove(&old_wid) {
504 pane.id = new_wid.clone();
505 self.panes.insert(new_wid.clone(), pane);
506 }
507 new_wid
508 } else {
509 WindowId {
511 start: ts,
512 end: ts + gap_ms,
513 key: key.clone(),
514 }
515 }
516 }
517
518 fn close_session_windows(&mut self, key: &Option<String>) -> Vec<WindowOutput<T>> {
519 let mut outputs = Vec::new();
520 let wids_to_close: Vec<WindowId> = self
521 .panes
522 .keys()
523 .filter(|wid| wid.key == *key)
524 .cloned()
525 .collect();
526
527 for wid in wids_to_close {
528 if let Some(mut pane) = self.panes.remove(&wid) {
529 pane.state = WindowState::Closed;
530 self.stats.windows_closed += 1;
531 self.stats.open_windows = self.stats.open_windows.saturating_sub(1);
532 outputs.push(WindowOutput {
533 window_id: wid,
534 events: pane.events,
535 is_partial: false,
536 late_dropped: 0,
537 late_side_output: 0,
538 });
539 }
540 }
541 outputs
542 }
543
544 fn ingest_count(&mut self, event: WindowEvent<T>, count: usize) -> Vec<WindowOutput<T>> {
547 let key: Option<String> = None;
548 let buf = self.count_buffers.entry(key.clone()).or_default();
549 buf.push(event);
550
551 let mut outputs = Vec::new();
552 while buf.len() >= count {
553 let window_events: Vec<_> = buf.drain(..count).collect();
554 let seq = self.count_seq;
555 self.count_seq += 1;
556 let wid = WindowId {
557 start: seq * count as i64,
558 end: (seq + 1) * count as i64,
559 key: key.clone(),
560 };
561 self.stats.windows_opened += 1;
562 self.stats.windows_closed += 1;
563 outputs.push(WindowOutput {
564 window_id: wid,
565 events: window_events,
566 is_partial: false,
567 late_dropped: 0,
568 late_side_output: 0,
569 });
570 }
571 outputs
572 }
573
574 fn assign_to_window(&mut self, event: WindowEvent<T>, wid: WindowId) -> Vec<WindowOutput<T>> {
577 let mut outputs = Vec::new();
578
579 if let Some(pane) = self.panes.get(&wid) {
581 match pane.state {
582 WindowState::Closed => {
583 return self.handle_late_event(event);
584 }
585 WindowState::Closing => {
586 if let Some(deadline) = pane.lateness_deadline_ms {
588 if event.timestamp_ms > deadline {
589 return self.handle_late_event(event);
590 }
591 }
592 }
594 WindowState::Open => {
595 }
597 }
598 }
599
600 if event.timestamp_ms < self.watermark_ms && !self.panes.contains_key(&wid) {
602 return self.handle_late_event(event);
603 }
604
605 if !self.panes.contains_key(&wid) {
607 let pane = match self.config.late_policy {
608 LatePolicy::AllowedLateness { lateness } => {
609 WindowPane::new_with_lateness(wid.clone(), lateness.as_millis() as i64)
610 }
611 _ => WindowPane::new(wid.clone()),
612 };
613 self.panes.insert(wid.clone(), pane);
614 self.stats.windows_opened += 1;
615 self.stats.open_windows += 1;
616 }
617
618 if let Some(pane) = self.panes.get_mut(&wid) {
619 pane.events.push(event);
620 }
621
622 outputs.extend(self.enforce_max_open_windows());
624
625 outputs
626 }
627
628 fn handle_late_event(&mut self, event: WindowEvent<T>) -> Vec<WindowOutput<T>> {
629 match self.config.late_policy {
630 LatePolicy::Drop => {
631 self.stats.late_events_dropped += 1;
632 }
633 LatePolicy::SideOutput => {
634 self.stats.late_events_side_output += 1;
635 self.side_output.push_back(event);
636 }
637 LatePolicy::AllowedLateness { .. } => {
638 self.stats.late_events_dropped += 1;
640 }
641 }
642 Vec::new()
643 }
644
645 fn close_expired_windows(&mut self) -> Vec<WindowOutput<T>> {
646 let mut outputs = Vec::new();
647 let wm = self.watermark_ms;
648
649 let expired: Vec<WindowId> = self
650 .panes
651 .iter()
652 .filter(|(wid, pane)| {
653 if pane.state == WindowState::Closed {
654 return false;
655 }
656 match pane.lateness_deadline_ms {
657 Some(deadline) => wm >= deadline,
658 None => wm >= wid.end,
659 }
660 })
661 .map(|(wid, _)| wid.clone())
662 .collect();
663
664 for wid in expired {
665 if let Some(mut pane) = self.panes.remove(&wid) {
666 pane.state = WindowState::Closed;
667 self.stats.windows_closed += 1;
668 self.stats.open_windows = self.stats.open_windows.saturating_sub(1);
669 outputs.push(WindowOutput {
670 window_id: wid,
671 events: pane.events,
672 is_partial: false,
673 late_dropped: 0,
674 late_side_output: 0,
675 });
676 }
677 }
678
679 outputs
680 }
681
682 fn enforce_max_open_windows(&mut self) -> Vec<WindowOutput<T>> {
683 let mut outputs = Vec::new();
684 while self.panes.len() > self.config.max_open_windows {
685 if let Some(wid) = self.panes.keys().next().cloned() {
687 if let Some(mut pane) = self.panes.remove(&wid) {
688 pane.state = WindowState::Closed;
689 self.stats.windows_evicted += 1;
690 self.stats.open_windows = self.stats.open_windows.saturating_sub(1);
691 if self.config.emit_on_evict {
692 outputs.push(WindowOutput {
693 window_id: wid,
694 events: pane.events,
695 is_partial: true,
696 late_dropped: 0,
697 late_side_output: 0,
698 });
699 }
700 }
701 } else {
702 break;
703 }
704 }
705 outputs
706 }
707
708 pub fn flush(&mut self) -> Vec<WindowOutput<T>> {
710 let mut outputs = Vec::new();
711
712 for (key, buf) in self.count_buffers.drain() {
714 if !buf.is_empty() {
715 let seq = self.count_seq;
716 self.count_seq += 1;
717 let wid = WindowId {
718 start: seq * 1000,
719 end: (seq + 1) * 1000,
720 key,
721 };
722 outputs.push(WindowOutput {
723 window_id: wid,
724 events: buf,
725 is_partial: true,
726 late_dropped: 0,
727 late_side_output: 0,
728 });
729 }
730 }
731
732 let wids: Vec<_> = self.panes.keys().cloned().collect();
733 for wid in wids {
734 if let Some(mut pane) = self.panes.remove(&wid) {
735 pane.state = WindowState::Closed;
736 self.stats.windows_closed += 1;
737 self.stats.open_windows = self.stats.open_windows.saturating_sub(1);
738 outputs.push(WindowOutput {
739 window_id: wid,
740 events: pane.events,
741 is_partial: true,
742 late_dropped: 0,
743 late_side_output: 0,
744 });
745 }
746 }
747 outputs
748 }
749
750 pub fn aggregate<A, F>(&self, window_id: &WindowId, init: A, fold: F) -> Option<A>
752 where
753 F: Fn(A, &T) -> A,
754 {
755 self.panes.get(window_id).map(|pane| {
756 pane.events
757 .iter()
758 .fold(init, |acc, evt| fold(acc, &evt.value))
759 })
760 }
761}
762
763pub fn tumbling_window_for(ts_ms: i64, size: Duration) -> WindowId {
769 let size_ms = size.as_millis() as i64;
770 if size_ms == 0 {
771 return WindowId::time_range(ts_ms, ts_ms);
772 }
773 let start = (ts_ms / size_ms) * size_ms;
774 WindowId::time_range(start, start + size_ms)
775}
776
777pub fn sliding_windows_for(ts_ms: i64, size: Duration, slide: Duration) -> Vec<WindowId> {
779 let size_ms = size.as_millis() as i64;
780 let slide_ms = slide.as_millis() as i64;
781 if slide_ms == 0 || size_ms == 0 {
782 return Vec::new();
783 }
784 let latest_start = (ts_ms / slide_ms) * slide_ms;
785 let earliest_start = latest_start - size_ms + slide_ms;
786
787 let mut windows = Vec::new();
788 let mut start = earliest_start;
789 while start <= latest_start {
790 let end = start + size_ms;
791 if ts_ms >= start && ts_ms < end {
792 windows.push(WindowId::time_range(start, end));
793 }
794 start += slide_ms;
795 }
796 windows
797}
798
799#[cfg(test)]
804mod tests {
805 use super::*;
806
807 fn events(timestamps: &[i64]) -> Vec<WindowEvent<i64>> {
809 timestamps
810 .iter()
811 .map(|&ts| WindowEvent::new(ts, ts))
812 .collect()
813 }
814
815 fn event_at(ts: i64) -> WindowEvent<i64> {
816 WindowEvent::new(ts, ts)
817 }
818
819 #[test]
822 fn test_window_id_time_range() {
823 let wid = WindowId::time_range(1000, 2000);
824 assert_eq!(wid.start, 1000);
825 assert_eq!(wid.end, 2000);
826 assert!(wid.key.is_none());
827 }
828
829 #[test]
830 fn test_window_id_keyed() {
831 let wid = WindowId::keyed(0, 100, "sensor-1");
832 assert_eq!(wid.key, Some("sensor-1".to_string()));
833 }
834
835 #[test]
836 fn test_window_id_duration() {
837 let wid = WindowId::time_range(1000, 2000);
838 assert_eq!(wid.duration_ms(), 1000);
839 }
840
841 #[test]
842 fn test_window_id_contains() {
843 let wid = WindowId::time_range(1000, 2000);
844 assert!(wid.contains(1000));
845 assert!(wid.contains(1500));
846 assert!(wid.contains(1999));
847 assert!(!wid.contains(2000)); assert!(!wid.contains(999));
849 }
850
851 #[test]
854 fn test_window_event_creation() {
855 let evt = WindowEvent::new(42, 1000);
856 assert_eq!(evt.value, 42);
857 assert_eq!(evt.timestamp_ms, 1000);
858 }
859
860 #[test]
863 fn test_window_pane_empty() {
864 let pane = WindowPane::<i64>::new(WindowId::time_range(0, 1000));
865 assert!(pane.is_empty());
866 assert_eq!(pane.len(), 0);
867 assert_eq!(pane.state, WindowState::Open);
868 }
869
870 #[test]
871 fn test_window_pane_min_max_timestamp() {
872 let mut pane = WindowPane::<i64>::new(WindowId::time_range(0, 1000));
873 pane.events.push(WindowEvent::new(1, 500));
874 pane.events.push(WindowEvent::new(2, 100));
875 pane.events.push(WindowEvent::new(3, 900));
876 assert_eq!(pane.min_timestamp(), Some(100));
877 assert_eq!(pane.max_timestamp(), Some(900));
878 }
879
880 #[test]
881 fn test_window_pane_no_timestamps() {
882 let pane = WindowPane::<i64>::new(WindowId::time_range(0, 1000));
883 assert_eq!(pane.min_timestamp(), None);
884 assert_eq!(pane.max_timestamp(), None);
885 }
886
887 #[test]
890 fn test_tumbling_basic() {
891 let mut wa = WindowAlgebra::<i64>::tumbling(Duration::from_secs(10));
892 for ts in [0, 5000, 9999] {
894 wa.ingest(event_at(ts));
895 }
896 assert_eq!(wa.stats().total_events, 3);
897 assert_eq!(wa.open_pane_count(), 1);
898 }
899
900 #[test]
901 fn test_tumbling_multiple_windows() {
902 let mut wa = WindowAlgebra::<i64>::tumbling(Duration::from_secs(10));
903 wa.ingest(event_at(0));
905 wa.ingest(event_at(5000));
906 wa.ingest(event_at(10000));
908 wa.ingest(event_at(15000));
909 assert_eq!(wa.open_pane_count(), 2);
910 }
911
912 #[test]
913 fn test_tumbling_watermark_closes_window() {
914 let mut wa = WindowAlgebra::<i64>::tumbling(Duration::from_secs(10));
915 wa.ingest(event_at(0));
916 wa.ingest(event_at(5000));
917
918 let outputs = wa.advance_watermark(10000);
919 assert_eq!(outputs.len(), 1);
920 assert_eq!(outputs[0].window_id.start, 0);
921 assert_eq!(outputs[0].window_id.end, 10000);
922 assert_eq!(outputs[0].events.len(), 2);
923 assert!(!outputs[0].is_partial);
924 }
925
926 #[test]
927 fn test_tumbling_watermark_no_close_if_below() {
928 let mut wa = WindowAlgebra::<i64>::tumbling(Duration::from_secs(10));
929 wa.ingest(event_at(0));
930 let outputs = wa.advance_watermark(5000);
931 assert!(outputs.is_empty());
932 }
933
934 #[test]
935 fn test_tumbling_late_event_dropped() {
936 let mut wa = WindowAlgebra::<i64>::tumbling(Duration::from_secs(10));
937 wa.advance_watermark(20000);
938 wa.ingest(event_at(5000)); assert_eq!(wa.stats().late_events_dropped, 1);
940 }
941
942 #[test]
943 fn test_tumbling_late_event_side_output() {
944 let mut wa = WindowAlgebra::<i64>::tumbling(Duration::from_secs(10))
945 .with_late_policy(LatePolicy::SideOutput);
946 wa.advance_watermark(20000);
947 wa.ingest(event_at(5000)); assert_eq!(wa.stats().late_events_side_output, 1);
949 let side = wa.drain_side_output();
950 assert_eq!(side.len(), 1);
951 assert_eq!(side[0].timestamp_ms, 5000);
952 }
953
954 #[test]
955 fn test_tumbling_flush() {
956 let mut wa = WindowAlgebra::<i64>::tumbling(Duration::from_secs(10));
957 wa.ingest(event_at(0));
958 wa.ingest(event_at(5000));
959 wa.ingest(event_at(15000));
960
961 let outputs = wa.flush();
962 assert_eq!(outputs.len(), 2);
963 assert!(outputs.iter().all(|o| o.is_partial));
964 }
965
966 #[test]
969 fn test_sliding_basic() {
970 let mut wa = WindowAlgebra::<i64>::sliding(Duration::from_secs(10), Duration::from_secs(5));
971 wa.ingest(event_at(7500));
972 assert!(wa.open_pane_count() >= 1);
974 }
975
976 #[test]
977 fn test_sliding_event_in_multiple_windows() {
978 let mut wa = WindowAlgebra::<i64>::sliding(Duration::from_secs(10), Duration::from_secs(5));
979 wa.ingest(event_at(6000));
980 assert_eq!(wa.open_pane_count(), 2);
983 }
984
985 #[test]
986 fn test_sliding_watermark_closes_old_windows() {
987 let mut wa = WindowAlgebra::<i64>::sliding(Duration::from_secs(10), Duration::from_secs(5));
988 wa.ingest(event_at(3000));
989 wa.ingest(event_at(6000));
990 let outputs = wa.advance_watermark(10000);
991 assert!(!outputs.is_empty());
993 }
994
995 #[test]
996 fn test_sliding_window_helper() {
997 let windows = sliding_windows_for(7500, Duration::from_secs(10), Duration::from_secs(5));
998 assert!(!windows.is_empty());
999 assert!(windows.iter().all(|w| w.contains(7500)));
1000 }
1001
1002 #[test]
1005 fn test_session_basic() {
1006 let mut wa = WindowAlgebra::<i64>::session(Duration::from_secs(5));
1007 wa.ingest(event_at(1000));
1008 wa.ingest(event_at(3000));
1009 wa.ingest(event_at(4000));
1010 assert_eq!(wa.open_pane_count(), 1);
1012 }
1013
1014 #[test]
1015 fn test_session_gap_closes_window() {
1016 let mut wa = WindowAlgebra::<i64>::session(Duration::from_secs(5));
1017 wa.ingest(event_at(1000));
1018 wa.ingest(event_at(3000));
1019 let outputs = wa.ingest(event_at(10000));
1021 assert!(!outputs.is_empty());
1022 }
1023
1024 #[test]
1025 fn test_session_multiple_sessions() {
1026 let mut wa = WindowAlgebra::<i64>::session(Duration::from_secs(5));
1027 wa.ingest(event_at(1000));
1028 wa.ingest(event_at(3000));
1029 let out1 = wa.ingest(event_at(20000)); assert!(!out1.is_empty());
1032 wa.ingest(event_at(22000));
1033 let out2 = wa.ingest(event_at(40000)); assert!(!out2.is_empty());
1035 }
1036
1037 #[test]
1040 fn test_count_basic() {
1041 let mut wa = WindowAlgebra::<i64>::count(3);
1042 let out1 = wa.ingest(event_at(1));
1043 assert!(out1.is_empty());
1044 let out2 = wa.ingest(event_at(2));
1045 assert!(out2.is_empty());
1046 let out3 = wa.ingest(event_at(3));
1047 assert_eq!(out3.len(), 1);
1048 assert_eq!(out3[0].events.len(), 3);
1049 }
1050
1051 #[test]
1052 fn test_count_multiple_triggers() {
1053 let mut wa = WindowAlgebra::<i64>::count(2);
1054 let evts = events(&[1, 2, 3, 4, 5, 6]);
1055 let outputs = wa.ingest_batch(evts);
1056 assert_eq!(outputs.len(), 3);
1057 }
1058
1059 #[test]
1060 fn test_count_partial_flush() {
1061 let mut wa = WindowAlgebra::<i64>::count(3);
1062 wa.ingest(event_at(1));
1063 wa.ingest(event_at(2));
1064 let outputs = wa.flush();
1065 assert_eq!(outputs.len(), 1);
1066 assert!(outputs[0].is_partial);
1067 assert_eq!(outputs[0].events.len(), 2);
1068 }
1069
1070 #[test]
1073 fn test_late_policy_drop() {
1074 let mut wa = WindowAlgebra::<i64>::tumbling(Duration::from_secs(10))
1075 .with_late_policy(LatePolicy::Drop);
1076 wa.advance_watermark(30000);
1077 wa.ingest(event_at(5000));
1078 assert_eq!(wa.stats().late_events_dropped, 1);
1079 assert_eq!(wa.drain_side_output().len(), 0);
1080 }
1081
1082 #[test]
1083 fn test_late_policy_side_output() {
1084 let mut wa = WindowAlgebra::<i64>::tumbling(Duration::from_secs(10))
1085 .with_late_policy(LatePolicy::SideOutput);
1086 wa.advance_watermark(30000);
1087 wa.ingest(event_at(5000));
1088 wa.ingest(event_at(8000));
1089 assert_eq!(wa.stats().late_events_side_output, 2);
1090 let side = wa.drain_side_output();
1091 assert_eq!(side.len(), 2);
1092 }
1093
1094 #[test]
1095 fn test_late_policy_allowed_lateness() {
1096 let mut wa = WindowAlgebra::<i64>::tumbling(Duration::from_secs(10)).with_late_policy(
1097 LatePolicy::AllowedLateness {
1098 lateness: Duration::from_secs(5),
1099 },
1100 );
1101 wa.ingest(event_at(5000));
1103 wa.advance_watermark(30000);
1106 wa.ingest(event_at(2000)); assert_eq!(wa.stats().late_events_dropped, 1);
1108 }
1109
1110 #[test]
1113 fn test_max_open_windows_eviction() {
1114 let mut wa =
1115 WindowAlgebra::<i64>::tumbling(Duration::from_secs(1)).with_max_open_windows(3);
1116 wa.ingest(event_at(0));
1118 wa.ingest(event_at(1000));
1119 wa.ingest(event_at(2000));
1120 let outputs = wa.ingest(event_at(3000));
1121 assert!(wa.stats().windows_evicted >= 1 || !outputs.is_empty());
1123 }
1124
1125 #[test]
1126 fn test_evicted_window_emits_partial() {
1127 let mut wa =
1128 WindowAlgebra::<i64>::tumbling(Duration::from_secs(1)).with_max_open_windows(2);
1129 wa.ingest(event_at(0));
1130 wa.ingest(event_at(1000));
1131 let outputs = wa.ingest(event_at(2000));
1132 let partial = outputs.iter().filter(|o| o.is_partial).count();
1133 assert!(partial >= 1);
1134 }
1135
1136 #[test]
1139 fn test_stats_total_events() {
1140 let mut wa = WindowAlgebra::<i64>::tumbling(Duration::from_secs(10));
1141 for i in 0..10 {
1142 wa.ingest(event_at(i * 100));
1143 }
1144 assert_eq!(wa.stats().total_events, 10);
1145 }
1146
1147 #[test]
1148 fn test_stats_windows_opened() {
1149 let mut wa = WindowAlgebra::<i64>::tumbling(Duration::from_secs(10));
1150 wa.ingest(event_at(0));
1151 wa.ingest(event_at(10000));
1152 wa.ingest(event_at(20000));
1153 assert_eq!(wa.stats().windows_opened, 3);
1154 }
1155
1156 #[test]
1157 fn test_stats_windows_closed() {
1158 let mut wa = WindowAlgebra::<i64>::tumbling(Duration::from_secs(10));
1159 wa.ingest(event_at(0));
1160 wa.ingest(event_at(10000));
1161 wa.advance_watermark(20000);
1162 assert_eq!(wa.stats().windows_closed, 2);
1163 }
1164
1165 #[test]
1168 fn test_aggregate_sum() {
1169 let mut wa = WindowAlgebra::<i64>::tumbling(Duration::from_secs(10));
1170 wa.ingest(event_at(100));
1171 wa.ingest(event_at(200));
1172 wa.ingest(event_at(300));
1173
1174 let wid = WindowId::time_range(0, 10000);
1175 let sum = wa.aggregate(&wid, 0i64, |acc, &val| acc + val);
1176 assert_eq!(sum, Some(600));
1177 }
1178
1179 #[test]
1180 fn test_aggregate_count() {
1181 let mut wa = WindowAlgebra::<i64>::tumbling(Duration::from_secs(10));
1182 wa.ingest(event_at(100));
1183 wa.ingest(event_at(200));
1184
1185 let wid = WindowId::time_range(0, 10000);
1186 let count = wa.aggregate(&wid, 0usize, |acc, _| acc + 1);
1187 assert_eq!(count, Some(2));
1188 }
1189
1190 #[test]
1191 fn test_aggregate_nonexistent_window() {
1192 let wa = WindowAlgebra::<i64>::tumbling(Duration::from_secs(10));
1193 let wid = WindowId::time_range(0, 10000);
1194 let result = wa.aggregate(&wid, 0, |acc, _: &i64| acc + 1);
1195 assert!(result.is_none());
1196 }
1197
1198 #[test]
1201 fn test_tumbling_window_for_helper() {
1202 let wid = tumbling_window_for(7500, Duration::from_secs(10));
1203 assert_eq!(wid.start, 0);
1204 assert_eq!(wid.end, 10000);
1205 }
1206
1207 #[test]
1208 fn test_tumbling_window_for_exact_boundary() {
1209 let wid = tumbling_window_for(10000, Duration::from_secs(10));
1210 assert_eq!(wid.start, 10000);
1211 assert_eq!(wid.end, 20000);
1212 }
1213
1214 #[test]
1215 fn test_sliding_windows_for_helper() {
1216 let windows = sliding_windows_for(12000, Duration::from_secs(10), Duration::from_secs(5));
1217 assert!(!windows.is_empty());
1219 for w in &windows {
1220 assert!(w.contains(12000));
1221 }
1222 }
1223
1224 #[test]
1227 fn test_batch_ingest() {
1228 let mut wa = WindowAlgebra::<i64>::count(3);
1229 let evts = events(&[1, 2, 3, 4, 5, 6, 7, 8, 9]);
1230 let outputs = wa.ingest_batch(evts);
1231 assert_eq!(outputs.len(), 3);
1232 assert!(outputs.iter().all(|o| o.events.len() == 3));
1233 }
1234
1235 #[test]
1238 fn test_default_config() {
1239 let config = WindowAlgebraConfig::default();
1240 assert_eq!(
1241 config.kind,
1242 WindowKind::Tumbling {
1243 size: Duration::from_secs(60)
1244 }
1245 );
1246 assert_eq!(config.late_policy, LatePolicy::Drop);
1247 assert_eq!(config.max_open_windows, 10_000);
1248 assert!(config.emit_on_evict);
1249 }
1250
1251 #[test]
1252 fn test_custom_config() {
1253 let config = WindowAlgebraConfig {
1254 kind: WindowKind::Sliding {
1255 size: Duration::from_secs(30),
1256 slide: Duration::from_secs(10),
1257 },
1258 late_policy: LatePolicy::SideOutput,
1259 max_open_windows: 500,
1260 emit_on_evict: false,
1261 };
1262 assert_eq!(config.max_open_windows, 500);
1263 assert!(!config.emit_on_evict);
1264 }
1265
1266 #[test]
1269 fn test_window_state_variants() {
1270 assert_eq!(WindowState::Open, WindowState::Open);
1271 assert_ne!(WindowState::Open, WindowState::Closed);
1272 assert_ne!(WindowState::Closing, WindowState::Closed);
1273 }
1274
1275 #[test]
1278 fn test_watermark_no_regression() {
1279 let mut wa = WindowAlgebra::<i64>::tumbling(Duration::from_secs(10));
1280 wa.advance_watermark(10000);
1281 let outputs = wa.advance_watermark(5000); assert!(outputs.is_empty());
1283 assert_eq!(wa.watermark_ms(), 10000);
1284 }
1285
1286 #[test]
1287 fn test_empty_flush() {
1288 let mut wa = WindowAlgebra::<i64>::tumbling(Duration::from_secs(10));
1289 let outputs = wa.flush();
1290 assert!(outputs.is_empty());
1291 }
1292
1293 #[test]
1294 fn test_double_watermark_advance() {
1295 let mut wa = WindowAlgebra::<i64>::tumbling(Duration::from_secs(10));
1296 wa.ingest(event_at(5000));
1297 let out1 = wa.advance_watermark(10000);
1298 assert_eq!(out1.len(), 1);
1299 let out2 = wa.advance_watermark(10000);
1300 assert!(out2.is_empty()); }
1302}