streamweave_window/
window.rs

1//! Windowing operations for stream processing.
2//!
3//! This module provides abstractions for grouping stream elements into
4//! bounded windows for aggregation and processing.
5//!
6//! # Overview
7//!
8//! Windowing is fundamental to stream processing, allowing bounded computations
9//! over unbounded streams. Windows group elements based on:
10//!
11//! - **Time**: Group by event time or processing time
12//! - **Count**: Group by number of elements
13//! - **Session**: Group by activity with gaps
14//!
15//! # Core Concepts
16//!
17//! - `TimeWindow`: Represents a window boundary (start/end time)
18//! - `WindowAssigner`: Assigns elements to windows
19//! - `WindowTrigger`: Determines when to emit window results
20//! - `LateDataPolicy`: Handles elements arriving after window closes
21//!
22//! # Window Types
23//!
24//! - `TumblingWindowAssigner`: Fixed-size, non-overlapping windows
25//! - `SlidingWindowAssigner`: Fixed-size, overlapping windows
26//! - `SessionWindow`: Gap-based dynamic windows
27//!
28//! # Example
29//!
30//! ```rust
31//! use streamweave::window::{TimeWindow, TumblingWindowAssigner, WindowConfig};
32//! use std::time::Duration;
33//!
34//! // Create a tumbling window of 5 seconds
35//! let assigner = TumblingWindowAssigner::new(Duration::from_secs(5));
36//! ```
37
38use chrono::{DateTime, Duration as ChronoDuration, Utc};
39use std::cmp::Ordering;
40use std::collections::HashMap;
41use std::fmt::{self, Debug, Display, Formatter};
42use std::hash::{Hash, Hasher};
43use std::sync::Arc;
44use std::time::Duration;
45
/// Outcome of a trigger callback, telling the caller what to do with a window.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TriggerResult {
  /// Do nothing yet; keep accumulating elements.
  Continue,
  /// Emit the window's results while keeping its state.
  Fire,
  /// Emit the window's results and then clear its state.
  FireAndPurge,
  /// Discard the window's state without emitting anything.
  Purge,
}
58
/// How to treat late data, i.e. elements that arrive after their window has closed.
///
/// The default policy is [`LateDataPolicy::Drop`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum LateDataPolicy {
  /// Silently discard late elements.
  #[default]
  Drop,
  /// Route late elements to a side output.
  SideOutput,
  /// Still include late elements in the window (refiring it if it already fired);
  /// the payload is the lateness duration.
  AllowLateness(Duration),
}
70
/// Errors produced by window operations; every variant carries a detail message.
#[derive(Debug)]
pub enum WindowError {
  /// The window configuration is invalid.
  InvalidConfig(String),
  /// The requested window does not exist.
  NotFound(String),
  /// The window has already been closed.
  WindowClosed(String),
  /// A state-related failure.
  StateError(String),
}

impl fmt::Display for WindowError {
  /// Writes a fixed, variant-specific prefix followed by the detail message.
  fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
    let (prefix, detail) = match self {
      Self::InvalidConfig(detail) => ("Invalid window config: ", detail),
      Self::NotFound(detail) => ("Window not found: ", detail),
      Self::WindowClosed(detail) => ("Window closed: ", detail),
      Self::StateError(detail) => ("State error: ", detail),
    };
    f.write_str(prefix)?;
    f.write_str(detail)
  }
}

impl std::error::Error for WindowError {}

/// Convenience alias: a `Result` whose error type is [`WindowError`].
pub type WindowResult<T> = Result<T, WindowError>;
99
100/// Trait for window types that can be used with `WindowState`.
101///
102/// This trait provides a common interface for different window types
103/// (e.g., `TimeWindow`, `GlobalWindow`) to be used generically.
104pub trait Window: Clone + Debug + Send + Sync + 'static {
105  /// Returns the end time of the window, if applicable.
106  /// Returns `None` for windows without a defined end (e.g., global windows).
107  fn end_time(&self) -> Option<DateTime<Utc>>;
108
109  /// Returns the start time of the window, if applicable.
110  fn start_time(&self) -> Option<DateTime<Utc>>;
111
112  /// Returns true if this window contains the given timestamp.
113  fn contains(&self, timestamp: DateTime<Utc>) -> bool;
114}
115
116/// A time-based window with start and end timestamps.
117///
118/// This is the fundamental window type for time-based windowing strategies.
119#[derive(Debug, Clone)]
120pub struct TimeWindow {
121  /// Start time of the window (inclusive).
122  start: DateTime<Utc>,
123  /// End time of the window (exclusive).
124  end: DateTime<Utc>,
125}
126
127impl TimeWindow {
128  /// Creates a new time window with the given start and end.
129  pub fn new(start: DateTime<Utc>, end: DateTime<Utc>) -> Self {
130    Self { start, end }
131  }
132
133  /// Returns the start time of the window.
134  pub fn start(&self) -> DateTime<Utc> {
135    self.start
136  }
137
138  /// Returns the end time of the window.
139  pub fn end(&self) -> DateTime<Utc> {
140    self.end
141  }
142
143  /// Returns the duration of the window.
144  pub fn duration(&self) -> ChronoDuration {
145    self.end - self.start
146  }
147
148  /// Returns the maximum timestamp for elements in this window.
149  pub fn max_timestamp(&self) -> DateTime<Utc> {
150    self.end - ChronoDuration::milliseconds(1)
151  }
152
153  /// Returns true if the given timestamp falls within this window.
154  pub fn contains(&self, timestamp: DateTime<Utc>) -> bool {
155    timestamp >= self.start && timestamp < self.end
156  }
157
158  /// Returns true if this window intersects with another.
159  pub fn intersects(&self, other: &TimeWindow) -> bool {
160    self.start < other.end && other.start < self.end
161  }
162
163  /// Merges this window with another overlapping window.
164  pub fn merge(&self, other: &TimeWindow) -> Option<TimeWindow> {
165    if self.intersects(other) || self.end == other.start || other.end == self.start {
166      Some(TimeWindow::new(
167        self.start.min(other.start),
168        self.end.max(other.end),
169      ))
170    } else {
171      None
172    }
173  }
174}
175
176impl PartialEq for TimeWindow {
177  fn eq(&self, other: &Self) -> bool {
178    self.start == other.start && self.end == other.end
179  }
180}
181
182impl Eq for TimeWindow {}
183
184impl Hash for TimeWindow {
185  fn hash<H: Hasher>(&self, state: &mut H) {
186    self.start.hash(state);
187    self.end.hash(state);
188  }
189}
190
191impl PartialOrd for TimeWindow {
192  fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
193    Some(self.cmp(other))
194  }
195}
196
197impl Ord for TimeWindow {
198  fn cmp(&self, other: &Self) -> Ordering {
199    self
200      .start
201      .cmp(&other.start)
202      .then_with(|| self.end.cmp(&other.end))
203  }
204}
205
206impl fmt::Display for TimeWindow {
207  fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
208    write!(
209      f,
210      "[{}, {})",
211      self.start.format("%H:%M:%S"),
212      self.end.format("%H:%M:%S")
213    )
214  }
215}
216
217impl Window for TimeWindow {
218  fn end_time(&self) -> Option<DateTime<Utc>> {
219    Some(self.end)
220  }
221
222  fn start_time(&self) -> Option<DateTime<Utc>> {
223    Some(self.start)
224  }
225
226  fn contains(&self, timestamp: DateTime<Utc>) -> bool {
227    timestamp >= self.start && timestamp < self.end
228  }
229}
230
/// Window identified by a numeric ID and bounded by a maximum element count.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct CountWindow {
  /// Identifier of this window.
  id: u64,
  /// Maximum number of elements for this window.
  max_count: usize,
}

impl CountWindow {
  /// Builds a count window from its ID and maximum element count.
  pub fn new(id: u64, max_count: usize) -> Self {
    CountWindow { id, max_count }
  }

  /// Identifier of this window.
  pub fn id(&self) -> u64 {
    self.id
  }

  /// Maximum number of elements for this window.
  pub fn max_count(&self) -> usize {
    self.max_count
  }
}

impl fmt::Display for CountWindow {
  /// Renders as `CountWindow(id=<id>, max=<max_count>)`.
  fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
    let CountWindow { id, max_count } = self;
    write!(f, "CountWindow(id={}, max={})", id, max_count)
  }
}
262
263/// A session window with dynamic boundaries based on activity gaps.
264#[derive(Debug, Clone)]
265pub struct SessionWindow {
266  /// Start time of the session.
267  start: DateTime<Utc>,
268  /// End time of the session.
269  end: DateTime<Utc>,
270  /// Session gap timeout.
271  gap: Duration,
272}
273
274impl SessionWindow {
275  /// Creates a new session window.
276  pub fn new(start: DateTime<Utc>, end: DateTime<Utc>, gap: Duration) -> Self {
277    Self { start, end, gap }
278  }
279
280  /// Creates a session window from a single element timestamp.
281  pub fn from_element(timestamp: DateTime<Utc>, gap: Duration) -> Self {
282    let gap_chrono = ChronoDuration::from_std(gap).unwrap_or(ChronoDuration::seconds(0));
283    Self {
284      start: timestamp,
285      end: timestamp + gap_chrono,
286      gap,
287    }
288  }
289
290  /// Returns the start time.
291  pub fn start(&self) -> DateTime<Utc> {
292    self.start
293  }
294
295  /// Returns the end time.
296  pub fn end(&self) -> DateTime<Utc> {
297    self.end
298  }
299
300  /// Returns the gap duration.
301  pub fn gap(&self) -> Duration {
302    self.gap
303  }
304
305  /// Returns true if this session contains the timestamp.
306  pub fn contains(&self, timestamp: DateTime<Utc>) -> bool {
307    timestamp >= self.start && timestamp < self.end
308  }
309
310  /// Returns true if this session should merge with another.
311  pub fn should_merge(&self, other: &SessionWindow) -> bool {
312    // Sessions merge if they overlap or are adjacent (within gap)
313    self.start < other.end && other.start < self.end
314  }
315
316  /// Merges this session with another.
317  pub fn merge(&self, other: &SessionWindow) -> Option<SessionWindow> {
318    if self.should_merge(other) {
319      Some(SessionWindow::new(
320        self.start.min(other.start),
321        self.end.max(other.end),
322        self.gap.max(other.gap),
323      ))
324    } else {
325      None
326    }
327  }
328
329  /// Extends the session to include a new timestamp.
330  pub fn extend(&mut self, timestamp: DateTime<Utc>) {
331    let gap_chrono = ChronoDuration::from_std(self.gap).unwrap_or(ChronoDuration::seconds(0));
332    if timestamp < self.start {
333      self.start = timestamp;
334    }
335    let new_end = timestamp + gap_chrono;
336    if new_end > self.end {
337      self.end = new_end;
338    }
339  }
340}
341
342impl PartialEq for SessionWindow {
343  fn eq(&self, other: &Self) -> bool {
344    self.start == other.start && self.end == other.end
345  }
346}
347
348impl Eq for SessionWindow {}
349
350impl Hash for SessionWindow {
351  fn hash<H: Hasher>(&self, state: &mut H) {
352    self.start.hash(state);
353    self.end.hash(state);
354  }
355}
356
357impl fmt::Display for SessionWindow {
358  fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
359    write!(
360      f,
361      "Session[{}, {}) gap={:?}",
362      self.start.format("%H:%M:%S"),
363      self.end.format("%H:%M:%S"),
364      self.gap
365    )
366  }
367}
368
369/// Trait for window assigners that assign elements to windows.
370///
371/// Window assigners determine which windows an element belongs to based on
372/// its timestamp or other properties.
373pub trait WindowAssigner: Send + Sync {
374  /// The window type produced by this assigner.
375  type W: Clone + Debug + PartialEq + Eq + Hash + Send + Sync;
376
377  /// Assign an element to zero or more windows.
378  ///
379  /// # Arguments
380  ///
381  /// * `timestamp` - The timestamp of the element
382  ///
383  /// # Returns
384  ///
385  /// A vector of windows the element belongs to.
386  fn assign_windows(&self, timestamp: DateTime<Utc>) -> Vec<Self::W>;
387
388  /// Returns the default trigger for this assigner.
389  fn default_trigger(&self) -> Box<dyn WindowTrigger<Self::W>>;
390
391  /// Returns true if this assigner produces event-time windows.
392  fn is_event_time(&self) -> bool {
393    true
394  }
395}
396
397/// Trait for window triggers that determine when to emit results.
398///
399/// Triggers are fired based on:
400/// - Element arrival
401/// - Processing time advance
402/// - Event time (watermark) advance
403pub trait WindowTrigger<W>: Send + Sync
404where
405  W: Clone + Debug + PartialEq + Eq + Hash,
406{
407  /// Called when an element is added to a window.
408  fn on_element(&mut self, timestamp: DateTime<Utc>, window: &W) -> TriggerResult;
409
410  /// Called when processing time advances.
411  fn on_processing_time(&mut self, time: DateTime<Utc>, window: &W) -> TriggerResult;
412
413  /// Called when the watermark (event time) advances.
414  fn on_event_time(&mut self, watermark: DateTime<Utc>, window: &W) -> TriggerResult;
415
416  /// Called when the trigger is cleared.
417  fn clear(&mut self, window: &W);
418
419  /// Creates a clone of this trigger.
420  fn clone_trigger(&self) -> Box<dyn WindowTrigger<W>>;
421}
422
/// Stateless trigger that fires once the watermark reaches the end of the window.
#[derive(Debug, Clone, Default)]
pub struct EventTimeTrigger;

impl EventTimeTrigger {
  /// Builds an event-time trigger.
  pub fn new() -> Self {
    EventTimeTrigger
  }
}
433
434impl WindowTrigger<TimeWindow> for EventTimeTrigger {
435  fn on_element(&mut self, _timestamp: DateTime<Utc>, _window: &TimeWindow) -> TriggerResult {
436    TriggerResult::Continue
437  }
438
439  fn on_processing_time(&mut self, _time: DateTime<Utc>, _window: &TimeWindow) -> TriggerResult {
440    TriggerResult::Continue
441  }
442
443  fn on_event_time(&mut self, watermark: DateTime<Utc>, window: &TimeWindow) -> TriggerResult {
444    if watermark >= window.end() {
445      TriggerResult::FireAndPurge
446    } else {
447      TriggerResult::Continue
448    }
449  }
450
451  fn clear(&mut self, _window: &TimeWindow) {}
452
453  fn clone_trigger(&self) -> Box<dyn WindowTrigger<TimeWindow>> {
454    Box::new(self.clone())
455  }
456}
457
458/// Trigger that fires after a specified count of elements.
459#[derive(Debug, Clone)]
460pub struct CountTrigger {
461  /// Count at which to fire.
462  count: usize,
463  /// Current counts per window.
464  window_counts: HashMap<CountWindow, usize>,
465}
466
467impl CountTrigger {
468  /// Creates a new count trigger that fires after `count` elements.
469  pub fn new(count: usize) -> Self {
470    Self {
471      count,
472      window_counts: HashMap::new(),
473    }
474  }
475
476  /// Returns the target count.
477  pub fn count(&self) -> usize {
478    self.count
479  }
480}
481
482impl WindowTrigger<CountWindow> for CountTrigger {
483  fn on_element(&mut self, _timestamp: DateTime<Utc>, window: &CountWindow) -> TriggerResult {
484    let count = self.window_counts.entry(window.clone()).or_insert(0);
485    *count += 1;
486
487    if *count >= self.count {
488      TriggerResult::FireAndPurge
489    } else {
490      TriggerResult::Continue
491    }
492  }
493
494  fn on_processing_time(&mut self, _time: DateTime<Utc>, _window: &CountWindow) -> TriggerResult {
495    TriggerResult::Continue
496  }
497
498  fn on_event_time(&mut self, _watermark: DateTime<Utc>, _window: &CountWindow) -> TriggerResult {
499    TriggerResult::Continue
500  }
501
502  fn clear(&mut self, window: &CountWindow) {
503    self.window_counts.remove(window);
504  }
505
506  fn clone_trigger(&self) -> Box<dyn WindowTrigger<CountWindow>> {
507    Box::new(self.clone())
508  }
509}
510
511/// Trigger that fires when the session gap timeout expires.
512#[derive(Debug, Clone)]
513pub struct SessionTrigger {
514  /// Last event time per window.
515  last_event_times: HashMap<SessionWindow, DateTime<Utc>>,
516}
517
518impl SessionTrigger {
519  /// Creates a new session trigger.
520  pub fn new() -> Self {
521    Self {
522      last_event_times: HashMap::new(),
523    }
524  }
525}
526
527impl Default for SessionTrigger {
528  fn default() -> Self {
529    Self::new()
530  }
531}
532
533impl WindowTrigger<SessionWindow> for SessionTrigger {
534  fn on_element(&mut self, timestamp: DateTime<Utc>, window: &SessionWindow) -> TriggerResult {
535    self.last_event_times.insert(window.clone(), timestamp);
536    TriggerResult::Continue
537  }
538
539  fn on_processing_time(&mut self, _time: DateTime<Utc>, _window: &SessionWindow) -> TriggerResult {
540    TriggerResult::Continue
541  }
542
543  fn on_event_time(&mut self, watermark: DateTime<Utc>, window: &SessionWindow) -> TriggerResult {
544    // Fire when watermark passes the session end
545    if watermark >= window.end() {
546      TriggerResult::FireAndPurge
547    } else {
548      TriggerResult::Continue
549    }
550  }
551
552  fn clear(&mut self, window: &SessionWindow) {
553    self.last_event_times.remove(window);
554  }
555
556  fn clone_trigger(&self) -> Box<dyn WindowTrigger<SessionWindow>> {
557    Box::new(self.clone())
558  }
559}
560
561/// Processing time trigger that fires at regular intervals.
562#[derive(Debug, Clone)]
563pub struct ProcessingTimeTrigger {
564  /// Interval at which to fire.
565  interval: Duration,
566  /// Last fire time per window.
567  last_fire_times: HashMap<TimeWindow, DateTime<Utc>>,
568}
569
570impl ProcessingTimeTrigger {
571  /// Creates a new processing time trigger.
572  pub fn new(interval: Duration) -> Self {
573    Self {
574      interval,
575      last_fire_times: HashMap::new(),
576    }
577  }
578
579  /// Returns the trigger interval.
580  pub fn interval(&self) -> Duration {
581    self.interval
582  }
583}
584
585impl WindowTrigger<TimeWindow> for ProcessingTimeTrigger {
586  fn on_element(&mut self, _timestamp: DateTime<Utc>, _window: &TimeWindow) -> TriggerResult {
587    TriggerResult::Continue
588  }
589
590  fn on_processing_time(&mut self, time: DateTime<Utc>, window: &TimeWindow) -> TriggerResult {
591    let interval_chrono =
592      ChronoDuration::from_std(self.interval).unwrap_or(ChronoDuration::seconds(1));
593
594    let last_fire = self
595      .last_fire_times
596      .get(window)
597      .copied()
598      .unwrap_or_else(|| time - interval_chrono);
599
600    if time >= last_fire + interval_chrono {
601      self.last_fire_times.insert(window.clone(), time);
602      TriggerResult::Fire
603    } else {
604      TriggerResult::Continue
605    }
606  }
607
608  fn on_event_time(&mut self, _watermark: DateTime<Utc>, _window: &TimeWindow) -> TriggerResult {
609    TriggerResult::Continue
610  }
611
612  fn clear(&mut self, window: &TimeWindow) {
613    self.last_fire_times.remove(window);
614  }
615
616  fn clone_trigger(&self) -> Box<dyn WindowTrigger<TimeWindow>> {
617    Box::new(self.clone())
618  }
619}
620
/// Assigner for fixed-size, non-overlapping (tumbling) windows.
///
/// Every element lands in exactly one window.
#[derive(Debug, Clone)]
pub struct TumblingWindowAssigner {
  /// Length of each window.
  size: Duration,
  /// Alignment offset of window boundaries relative to the epoch.
  offset: Duration,
}
631
632impl TumblingWindowAssigner {
633  /// Creates a new tumbling window assigner with the given size.
634  pub fn new(size: Duration) -> Self {
635    Self {
636      size,
637      offset: Duration::ZERO,
638    }
639  }
640
641  /// Sets the offset for window alignment.
642  pub fn with_offset(mut self, offset: Duration) -> Self {
643    self.offset = offset;
644    self
645  }
646
647  /// Returns the window size.
648  pub fn size(&self) -> Duration {
649    self.size
650  }
651
652  /// Returns the offset.
653  pub fn offset(&self) -> Duration {
654    self.offset
655  }
656
657  /// Calculates the window start for a given timestamp.
658  fn window_start(&self, timestamp: DateTime<Utc>) -> DateTime<Utc> {
659    let ts_millis = timestamp.timestamp_millis();
660    let size_millis = self.size.as_millis() as i64;
661    let offset_millis = self.offset.as_millis() as i64;
662
663    let window_start_millis =
664      ((ts_millis - offset_millis) / size_millis) * size_millis + offset_millis;
665
666    DateTime::from_timestamp_millis(window_start_millis).unwrap_or(timestamp)
667  }
668}
669
670impl WindowAssigner for TumblingWindowAssigner {
671  type W = TimeWindow;
672
673  fn assign_windows(&self, timestamp: DateTime<Utc>) -> Vec<Self::W> {
674    let start = self.window_start(timestamp);
675    let size_chrono = ChronoDuration::from_std(self.size).unwrap_or(ChronoDuration::seconds(1));
676    let end = start + size_chrono;
677
678    vec![TimeWindow::new(start, end)]
679  }
680
681  fn default_trigger(&self) -> Box<dyn WindowTrigger<Self::W>> {
682    Box::new(EventTimeTrigger::new())
683  }
684}
685
/// Assigner for fixed-size windows that start every `slide` and may overlap.
///
/// An element can therefore belong to several windows.
#[derive(Debug, Clone)]
pub struct SlidingWindowAssigner {
  /// Length of each window.
  size: Duration,
  /// Distance between the starts of consecutive windows.
  slide: Duration,
  /// Alignment offset of window boundaries relative to the epoch.
  offset: Duration,
}

impl SlidingWindowAssigner {
  /// Builds a sliding window assigner with the given size and slide and no offset.
  pub fn new(size: Duration, slide: Duration) -> Self {
    let offset = Duration::ZERO;
    SlidingWindowAssigner {
      size,
      slide,
      offset,
    }
  }

  /// Returns the assigner with its alignment offset replaced by `offset`.
  pub fn with_offset(self, offset: Duration) -> Self {
    SlidingWindowAssigner { offset, ..self }
  }

  /// Length of each window.
  pub fn size(&self) -> Duration {
    self.size
  }

  /// Distance between the starts of consecutive windows.
  pub fn slide(&self) -> Duration {
    self.slide
  }

  /// Alignment offset.
  pub fn offset(&self) -> Duration {
    self.offset
  }
}
730
731impl WindowAssigner for SlidingWindowAssigner {
732  type W = TimeWindow;
733
734  fn assign_windows(&self, timestamp: DateTime<Utc>) -> Vec<Self::W> {
735    let ts_millis = timestamp.timestamp_millis();
736    let size_millis = self.size.as_millis() as i64;
737    let slide_millis = self.slide.as_millis() as i64;
738    let offset_millis = self.offset.as_millis() as i64;
739
740    let size_chrono = ChronoDuration::from_std(self.size).unwrap_or(ChronoDuration::seconds(1));
741
742    // Find the last window that could contain this timestamp
743    let last_start = ((ts_millis - offset_millis) / slide_millis) * slide_millis + offset_millis;
744
745    // Number of windows this element belongs to
746    let num_windows = (size_millis / slide_millis) as usize;
747
748    (0..num_windows)
749      .map(|i| {
750        let start_millis = last_start - (i as i64) * slide_millis;
751        let start = DateTime::from_timestamp_millis(start_millis).unwrap_or(timestamp);
752        let end = start + size_chrono;
753        TimeWindow::new(start, end)
754      })
755      .filter(|w| w.contains(timestamp))
756      .collect()
757  }
758
759  fn default_trigger(&self) -> Box<dyn WindowTrigger<Self::W>> {
760    Box::new(EventTimeTrigger::new())
761  }
762}
763
/// Assigner for gap-based session windows.
///
/// Elements are grouped into sessions according to gaps in activity.
#[derive(Debug, Clone)]
pub struct SessionWindowAssigner {
  /// Gap duration used when creating each element's session window.
  gap: Duration,
}

impl SessionWindowAssigner {
  /// Builds a session window assigner with the given gap.
  pub fn new(gap: Duration) -> Self {
    SessionWindowAssigner { gap }
  }

  /// Gap duration.
  pub fn gap(&self) -> Duration {
    self.gap
  }
}
784
785impl WindowAssigner for SessionWindowAssigner {
786  type W = SessionWindow;
787
788  fn assign_windows(&self, timestamp: DateTime<Utc>) -> Vec<Self::W> {
789    vec![SessionWindow::from_element(timestamp, self.gap)]
790  }
791
792  fn default_trigger(&self) -> Box<dyn WindowTrigger<Self::W>> {
793    Box::new(SessionTrigger::new())
794  }
795}
796
/// Assigner for count-based tumbling windows.
///
/// Windows are delimited by how many elements were seen, not by time.
#[derive(Debug)]
pub struct CountWindowAssigner {
  /// Number of elements that make up one window.
  count: usize,
  /// ID of the window currently being filled.
  current_id: std::sync::atomic::AtomicU64,
  /// Number of elements counted for the current window.
  current_count: std::sync::atomic::AtomicUsize,
}

impl Clone for CountWindowAssigner {
  /// Copies the configuration and a snapshot of both counters into new,
  /// independent atomics (relaxed loads, ID first, then count).
  fn clone(&self) -> Self {
    use std::sync::atomic::Ordering::Relaxed;

    let id_snapshot = self.current_id.load(Relaxed);
    let count_snapshot = self.current_count.load(Relaxed);

    CountWindowAssigner {
      count: self.count,
      current_id: std::sync::atomic::AtomicU64::new(id_snapshot),
      current_count: std::sync::atomic::AtomicUsize::new(count_snapshot),
    }
  }
}

impl CountWindowAssigner {
  /// Builds an assigner producing windows of `count` elements, starting at
  /// window ID 0 with nothing counted yet.
  pub fn new(count: usize) -> Self {
    CountWindowAssigner {
      count,
      current_id: std::sync::atomic::AtomicU64::new(0),
      current_count: std::sync::atomic::AtomicUsize::new(0),
    }
  }

  /// Number of elements per window.
  pub fn count(&self) -> usize {
    self.count
  }
}
841
842impl WindowAssigner for CountWindowAssigner {
843  type W = CountWindow;
844
845  fn assign_windows(&self, _timestamp: DateTime<Utc>) -> Vec<Self::W> {
846    // Increment count
847    let prev_count = self
848      .current_count
849      .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
850
851    // Check if we need a new window
852    if prev_count > 0 && prev_count.is_multiple_of(self.count) {
853      self
854        .current_id
855        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
856      self
857        .current_count
858        .store(1, std::sync::atomic::Ordering::Relaxed);
859    }
860
861    let id = self.current_id.load(std::sync::atomic::Ordering::Relaxed);
862
863    vec![CountWindow::new(id, self.count)]
864  }
865
866  fn default_trigger(&self) -> Box<dyn WindowTrigger<Self::W>> {
867    Box::new(CountTrigger::new(self.count))
868  }
869
870  fn is_event_time(&self) -> bool {
871    false
872  }
873}
874
/// Stateless assigner that places every element into the one global window.
#[derive(Debug, Clone, Default)]
pub struct GlobalWindowAssigner;
878
/// The single window that holds all elements.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct GlobalWindow;

impl fmt::Display for GlobalWindow {
  /// Renders as the literal text `GlobalWindow`.
  fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
    f.write_str("GlobalWindow")
  }
}
888
889impl Window for GlobalWindow {
890  fn end_time(&self) -> Option<DateTime<Utc>> {
891    None // Global window has no end
892  }
893
894  fn start_time(&self) -> Option<DateTime<Utc>> {
895    None // Global window has no start
896  }
897
898  fn contains(&self, _timestamp: DateTime<Utc>) -> bool {
899    true // Global window contains all timestamps
900  }
901}
902
903impl GlobalWindowAssigner {
904  /// Creates a new global window assigner.
905  pub fn new() -> Self {
906    Self
907  }
908}
909
910impl WindowAssigner for GlobalWindowAssigner {
911  type W = GlobalWindow;
912
913  fn assign_windows(&self, _timestamp: DateTime<Utc>) -> Vec<Self::W> {
914    vec![GlobalWindow]
915  }
916
917  fn default_trigger(&self) -> Box<dyn WindowTrigger<Self::W>> {
918    Box::new(NeverTrigger)
919  }
920}
921
/// Stateless trigger for global windows that never asks for a fire.
#[derive(Debug, Clone, Default)]
pub struct NeverTrigger;
925
926impl WindowTrigger<GlobalWindow> for NeverTrigger {
927  fn on_element(&mut self, _timestamp: DateTime<Utc>, _window: &GlobalWindow) -> TriggerResult {
928    TriggerResult::Continue
929  }
930
931  fn on_processing_time(&mut self, _time: DateTime<Utc>, _window: &GlobalWindow) -> TriggerResult {
932    TriggerResult::Continue
933  }
934
935  fn on_event_time(&mut self, _watermark: DateTime<Utc>, _window: &GlobalWindow) -> TriggerResult {
936    TriggerResult::Continue
937  }
938
939  fn clear(&mut self, _window: &GlobalWindow) {}
940
941  fn clone_trigger(&self) -> Box<dyn WindowTrigger<GlobalWindow>> {
942    Box::new(self.clone())
943  }
944}
945
946/// Configuration for window operations.
947#[derive(Debug, Clone)]
948pub struct WindowConfig {
949  /// Policy for handling late data.
950  pub late_data_policy: LateDataPolicy,
951  /// Allowed lateness for elements.
952  pub allowed_lateness: Duration,
953}
954
955impl Default for WindowConfig {
956  fn default() -> Self {
957    Self {
958      late_data_policy: LateDataPolicy::Drop,
959      allowed_lateness: Duration::ZERO,
960    }
961  }
962}
963
964impl WindowConfig {
965  /// Creates a new window configuration.
966  pub fn new() -> Self {
967    Self::default()
968  }
969
970  /// Sets the late data policy.
971  pub fn with_late_data_policy(mut self, policy: LateDataPolicy) -> Self {
972    self.late_data_policy = policy;
973    self
974  }
975
976  /// Sets the allowed lateness.
977  pub fn with_allowed_lateness(mut self, lateness: Duration) -> Self {
978    self.allowed_lateness = lateness;
979    self
980  }
981}
982
983// =============================================================================
984// Watermark Support
985// =============================================================================
986
987/// Represents a watermark in event-time processing.
988///
989/// A watermark is a statement that all events with timestamps less than or equal
990/// to the watermark have arrived. It's used to track event-time progress and
991/// determine when windows can be triggered.
992#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
993pub struct Watermark {
994  /// The timestamp represented by this watermark.
995  pub timestamp: DateTime<Utc>,
996}
997
998impl Watermark {
999  /// Creates a new watermark at the given timestamp.
1000  #[must_use]
1001  pub fn new(timestamp: DateTime<Utc>) -> Self {
1002    Self { timestamp }
1003  }
1004
1005  /// Creates an initial watermark at the minimum possible time.
1006  #[must_use]
1007  pub fn min() -> Self {
1008    Self {
1009      timestamp: DateTime::<Utc>::MIN_UTC,
1010    }
1011  }
1012
1013  /// Creates a watermark at the maximum possible time (end of stream).
1014  #[must_use]
1015  pub fn max() -> Self {
1016    Self {
1017      timestamp: DateTime::<Utc>::MAX_UTC,
1018    }
1019  }
1020
1021  /// Returns true if this is the end-of-stream watermark.
1022  #[must_use]
1023  pub fn is_end_of_stream(&self) -> bool {
1024    self.timestamp == DateTime::<Utc>::MAX_UTC
1025  }
1026
1027  /// Advances this watermark to at least the given timestamp.
1028  /// Returns true if the watermark was actually advanced.
1029  pub fn advance(&mut self, timestamp: DateTime<Utc>) -> bool {
1030    if timestamp > self.timestamp {
1031      self.timestamp = timestamp;
1032      true
1033    } else {
1034      false
1035    }
1036  }
1037}
1038
1039impl Display for Watermark {
1040  fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
1041    if self.is_end_of_stream() {
1042      write!(f, "Watermark(END)")
1043    } else {
1044      write!(f, "Watermark({})", self.timestamp)
1045    }
1046  }
1047}
1048
1049/// Trait for generating watermarks from observed event timestamps.
1050///
1051/// Different strategies exist for watermark generation:
1052/// - Monotonic: watermark follows the maximum observed timestamp exactly
1053/// - Bounded out-of-orderness: allows for some delay in event arrival
1054/// - Periodic: generates watermarks at fixed intervals
1055pub trait WatermarkGenerator: Send + Sync + std::fmt::Debug + 'static {
1056  /// Called when a new element arrives with the given event timestamp.
1057  /// Returns the new watermark if it should be emitted.
1058  fn on_event(&mut self, timestamp: DateTime<Utc>) -> Option<Watermark>;
1059
1060  /// Called periodically to generate watermarks even without new events.
1061  /// Returns the new watermark if it should be emitted.
1062  fn on_periodic_emit(&mut self) -> Option<Watermark>;
1063
1064  /// Returns the current watermark without advancing it.
1065  fn current_watermark(&self) -> Watermark;
1066
1067  /// Signals end of stream, generating the final watermark.
1068  fn on_end_of_stream(&mut self) -> Watermark {
1069    Watermark::max()
1070  }
1071}
1072
1073/// Generates watermarks that follow the maximum observed timestamp exactly.
1074///
1075/// This is suitable for streams where events arrive in order.
1076#[derive(Debug, Clone)]
1077pub struct MonotonicWatermarkGenerator {
1078  current: Watermark,
1079}
1080
1081impl MonotonicWatermarkGenerator {
1082  /// Creates a new monotonic watermark generator.
1083  #[must_use]
1084  pub fn new() -> Self {
1085    Self {
1086      current: Watermark::min(),
1087    }
1088  }
1089}
1090
1091impl Default for MonotonicWatermarkGenerator {
1092  fn default() -> Self {
1093    Self::new()
1094  }
1095}
1096
1097impl WatermarkGenerator for MonotonicWatermarkGenerator {
1098  fn on_event(&mut self, timestamp: DateTime<Utc>) -> Option<Watermark> {
1099    if self.current.advance(timestamp) {
1100      Some(self.current.clone())
1101    } else {
1102      None
1103    }
1104  }
1105
1106  fn on_periodic_emit(&mut self) -> Option<Watermark> {
1107    // Monotonic generator doesn't emit on periodic basis
1108    None
1109  }
1110
1111  fn current_watermark(&self) -> Watermark {
1112    self.current.clone()
1113  }
1114}
1115
1116/// Generates watermarks allowing for bounded out-of-orderness.
1117///
1118/// The watermark is set to `max_observed_timestamp - max_out_of_orderness`.
1119/// This allows events to arrive slightly out of order while still making progress.
1120#[derive(Debug, Clone)]
1121pub struct BoundedOutOfOrdernessGenerator {
1122  /// Maximum allowed out-of-orderness.
1123  max_out_of_orderness: Duration,
1124  /// Maximum timestamp seen so far.
1125  max_timestamp: Option<DateTime<Utc>>,
1126  /// Current watermark.
1127  current: Watermark,
1128}
1129
1130impl BoundedOutOfOrdernessGenerator {
1131  /// Creates a new bounded out-of-orderness watermark generator.
1132  #[must_use]
1133  pub fn new(max_out_of_orderness: Duration) -> Self {
1134    Self {
1135      max_out_of_orderness,
1136      max_timestamp: None,
1137      current: Watermark::min(),
1138    }
1139  }
1140
1141  fn calculate_watermark(&self) -> Watermark {
1142    match self.max_timestamp {
1143      Some(max_ts) => Watermark::new(max_ts - self.max_out_of_orderness),
1144      None => Watermark::min(),
1145    }
1146  }
1147}
1148
1149impl WatermarkGenerator for BoundedOutOfOrdernessGenerator {
1150  fn on_event(&mut self, timestamp: DateTime<Utc>) -> Option<Watermark> {
1151    // Update max timestamp
1152    self.max_timestamp = Some(
1153      self
1154        .max_timestamp
1155        .map(|current| current.max(timestamp))
1156        .unwrap_or(timestamp),
1157    );
1158
1159    // Calculate new watermark
1160    let new_watermark = self.calculate_watermark();
1161
1162    // Only emit if watermark advanced
1163    if new_watermark.timestamp > self.current.timestamp {
1164      self.current = new_watermark.clone();
1165      Some(new_watermark)
1166    } else {
1167      None
1168    }
1169  }
1170
1171  fn on_periodic_emit(&mut self) -> Option<Watermark> {
1172    // Emit current watermark on periodic basis
1173    Some(self.current.clone())
1174  }
1175
1176  fn current_watermark(&self) -> Watermark {
1177    self.current.clone()
1178  }
1179}
1180
1181/// Generates watermarks at fixed intervals regardless of event arrival.
1182#[derive(Debug, Clone)]
1183pub struct PeriodicWatermarkGenerator {
1184  /// The inner generator for calculating watermark values.
1185  inner: BoundedOutOfOrdernessGenerator,
1186  /// Interval between watermark emissions.
1187  interval: Duration,
1188  /// Last emission time.
1189  last_emit: Option<DateTime<Utc>>,
1190}
1191
1192impl PeriodicWatermarkGenerator {
1193  /// Creates a new periodic watermark generator.
1194  #[must_use]
1195  pub fn new(max_out_of_orderness: Duration, interval: Duration) -> Self {
1196    Self {
1197      inner: BoundedOutOfOrdernessGenerator::new(max_out_of_orderness),
1198      interval,
1199      last_emit: None,
1200    }
1201  }
1202}
1203
1204impl WatermarkGenerator for PeriodicWatermarkGenerator {
1205  fn on_event(&mut self, timestamp: DateTime<Utc>) -> Option<Watermark> {
1206    // Update the inner generator but don't emit watermark per-event
1207    self.inner.on_event(timestamp);
1208    None
1209  }
1210
1211  fn on_periodic_emit(&mut self) -> Option<Watermark> {
1212    let now = Utc::now();
1213    let interval_chrono = ChronoDuration::from_std(self.interval).unwrap_or(ChronoDuration::zero());
1214    let should_emit = self
1215      .last_emit
1216      .map(|last| now - last >= interval_chrono)
1217      .unwrap_or(true);
1218
1219    if should_emit {
1220      self.last_emit = Some(now);
1221      Some(self.inner.current_watermark())
1222    } else {
1223      None
1224    }
1225  }
1226
1227  fn current_watermark(&self) -> Watermark {
1228    self.inner.current_watermark()
1229  }
1230}
1231
1232// =============================================================================
1233// Late Data Handling
1234// =============================================================================
1235
/// Outcome of checking an element for lateness.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LateDataResult<T> {
  /// The element is not late and should be processed normally.
  OnTime(T),
  /// The element is late but still accepted for processing.
  WithinLateness(T),
  /// The element is late and should be dropped; the element itself is not carried.
  Drop,
  /// The element is late and should be redirected to the side output.
  SideOutput(T),
}

impl<T> LateDataResult<T> {
  /// Returns `true` for `OnTime` and `WithinLateness`, `false` otherwise.
  #[must_use]
  pub fn should_process(&self) -> bool {
    !matches!(self, Self::Drop | Self::SideOutput(_))
  }

  /// Consumes the result, yielding the element when it should be processed
  /// (`OnTime` or `WithinLateness`) and `None` for every other variant.
  #[must_use]
  pub fn into_processable(self) -> Option<T> {
    match self {
      Self::OnTime(value) | Self::WithinLateness(value) => Some(value),
      Self::Drop | Self::SideOutput(_) => None,
    }
  }

  /// Returns `true` only for the `SideOutput` variant.
  #[must_use]
  pub fn is_side_output(&self) -> bool {
    matches!(self, Self::SideOutput(_))
  }

  /// Consumes the result, yielding the element only for `SideOutput`.
  #[must_use]
  pub fn into_side_output(self) -> Option<T> {
    if let Self::SideOutput(value) = self {
      Some(value)
    } else {
      None
    }
  }
}
1283
1284/// Handles late data according to the configured policy.
1285#[derive(Debug, Clone)]
1286pub struct LateDataHandler {
1287  /// Policy for handling late data.
1288  policy: LateDataPolicy,
1289  /// Statistics about late data.
1290  stats: LateDataStats,
1291}
1292
1293/// Statistics about late data processing.
1294#[derive(Debug, Clone, Default)]
1295pub struct LateDataStats {
1296  /// Number of on-time elements.
1297  pub on_time: u64,
1298  /// Number of elements within allowed lateness.
1299  pub within_lateness: u64,
1300  /// Number of dropped late elements.
1301  pub dropped: u64,
1302  /// Number of elements sent to side output.
1303  pub side_output: u64,
1304}
1305
1306impl LateDataHandler {
1307  /// Creates a new late data handler with the given policy.
1308  #[must_use]
1309  pub fn new(policy: LateDataPolicy) -> Self {
1310    Self {
1311      policy,
1312      stats: LateDataStats::default(),
1313    }
1314  }
1315
1316  /// Creates a handler that drops all late data.
1317  #[must_use]
1318  pub fn drop_late() -> Self {
1319    Self::new(LateDataPolicy::Drop)
1320  }
1321
1322  /// Creates a handler that allows late data within the given lateness window.
1323  #[must_use]
1324  pub fn with_allowed_lateness(lateness: Duration) -> Self {
1325    Self::new(LateDataPolicy::AllowLateness(lateness))
1326  }
1327
1328  /// Creates a handler that redirects late data to a side output.
1329  #[must_use]
1330  pub fn redirect_to_side_output() -> Self {
1331    Self::new(LateDataPolicy::SideOutput)
1332  }
1333
1334  /// Returns the allowed lateness based on the policy.
1335  fn allowed_lateness(&self) -> Duration {
1336    match &self.policy {
1337      LateDataPolicy::AllowLateness(d) => *d,
1338      _ => Duration::ZERO,
1339    }
1340  }
1341
1342  /// Evaluates whether an element is late and returns the appropriate action.
1343  ///
1344  /// # Arguments
1345  /// - `element_timestamp`: The event time of the element.
1346  /// - `current_watermark`: The current watermark.
1347  /// - `element`: The element to evaluate.
1348  pub fn evaluate<T>(
1349    &mut self,
1350    element_timestamp: DateTime<Utc>,
1351    current_watermark: &Watermark,
1352    element: T,
1353  ) -> LateDataResult<T> {
1354    // Element is on-time if its timestamp >= watermark
1355    if element_timestamp >= current_watermark.timestamp {
1356      self.stats.on_time += 1;
1357      return LateDataResult::OnTime(element);
1358    }
1359
1360    // Element is late - check if within allowed lateness
1361    let lateness_duration = current_watermark.timestamp - element_timestamp;
1362    let allowed = self.allowed_lateness();
1363
1364    // Convert chrono Duration to std Duration for comparison
1365    if let Ok(lateness_std) = lateness_duration.to_std()
1366      && lateness_std <= allowed
1367    {
1368      self.stats.within_lateness += 1;
1369      return LateDataResult::WithinLateness(element);
1370    }
1371
1372    // Element is truly late - apply policy
1373    match &self.policy {
1374      LateDataPolicy::Drop => {
1375        self.stats.dropped += 1;
1376        LateDataResult::Drop
1377      }
1378      LateDataPolicy::AllowLateness(_) => {
1379        // Even beyond allowed lateness, process it (policy says to allow)
1380        self.stats.within_lateness += 1;
1381        LateDataResult::WithinLateness(element)
1382      }
1383      LateDataPolicy::SideOutput => {
1384        self.stats.side_output += 1;
1385        LateDataResult::SideOutput(element)
1386      }
1387    }
1388  }
1389
1390  /// Returns statistics about late data handling.
1391  #[must_use]
1392  pub fn stats(&self) -> &LateDataStats {
1393    &self.stats
1394  }
1395
1396  /// Resets statistics.
1397  pub fn reset_stats(&mut self) {
1398    self.stats = LateDataStats::default();
1399  }
1400
1401  /// Returns the configured policy.
1402  #[must_use]
1403  pub fn policy(&self) -> &LateDataPolicy {
1404    &self.policy
1405  }
1406
1407  /// Returns the allowed lateness from the policy.
1408  #[must_use]
1409  pub fn get_allowed_lateness(&self) -> Duration {
1410    match &self.policy {
1411      LateDataPolicy::AllowLateness(d) => *d,
1412      _ => Duration::ZERO,
1413    }
1414  }
1415}
1416
1417// =============================================================================
1418// Windowed Stream Processing
1419// =============================================================================
1420
1421/// Tracks the state of a window including its watermark context.
1422#[derive(Debug)]
1423pub struct WindowState<W: Window, T> {
1424  /// The window this state belongs to.
1425  pub window: W,
1426  /// Elements currently in the window.
1427  pub elements: Vec<(DateTime<Utc>, T)>,
1428  /// Count of elements.
1429  pub count: usize,
1430  /// Whether the window has been triggered.
1431  pub triggered: bool,
1432  /// Last watermark that affected this window.
1433  pub last_watermark: Option<Watermark>,
1434}
1435
1436impl<W: Window, T: Clone> WindowState<W, T> {
1437  /// Creates a new window state.
1438  #[must_use]
1439  pub fn new(window: W) -> Self {
1440    Self {
1441      window,
1442      elements: Vec::new(),
1443      count: 0,
1444      triggered: false,
1445      last_watermark: None,
1446    }
1447  }
1448
1449  /// Adds an element to the window.
1450  pub fn add(&mut self, timestamp: DateTime<Utc>, element: T) {
1451    self.elements.push((timestamp, element));
1452    self.count += 1;
1453  }
1454
1455  /// Updates the watermark for this window.
1456  pub fn update_watermark(&mut self, watermark: Watermark) {
1457    self.last_watermark = Some(watermark);
1458  }
1459
1460  /// Returns true if the window should be garbage collected.
1461  ///
1462  /// A window can be GC'd if it has been triggered and the watermark
1463  /// has passed the window's end time plus allowed lateness.
1464  #[must_use]
1465  pub fn can_be_gc(&self, current_watermark: &Watermark, allowed_lateness: ChronoDuration) -> bool {
1466    if !self.triggered {
1467      return false;
1468    }
1469    match self.window.end_time() {
1470      Some(end_time) => current_watermark.timestamp > end_time + allowed_lateness,
1471      None => false, // Global windows are never GC'd
1472    }
1473  }
1474
1475  /// Clears all elements from the window.
1476  pub fn clear(&mut self) {
1477    self.elements.clear();
1478    self.count = 0;
1479  }
1480
1481  /// Marks the window as triggered.
1482  pub fn mark_triggered(&mut self) {
1483    self.triggered = true;
1484  }
1485
1486  /// Returns the elements as a slice.
1487  #[must_use]
1488  pub fn elements(&self) -> &[(DateTime<Utc>, T)] {
1489    &self.elements
1490  }
1491
1492  /// Extracts the element values without timestamps.
1493  #[must_use]
1494  pub fn values(&self) -> Vec<T> {
1495    self.elements.iter().map(|(_, v)| v.clone()).collect()
1496  }
1497}
1498
1499impl<W: Window, T: Clone> Clone for WindowState<W, T> {
1500  fn clone(&self) -> Self {
1501    Self {
1502      window: self.window.clone(),
1503      elements: self.elements.clone(),
1504      count: self.count,
1505      triggered: self.triggered,
1506      last_watermark: self.last_watermark.clone(),
1507    }
1508  }
1509}
1510
/// Reference-counted (`Arc`) handle to a dynamically dispatched window
/// assigner that produces windows of type `W`.
pub type SharedWindowAssigner<W> = Arc<dyn WindowAssigner<W = W>>;
1513
1514#[cfg(test)]
1515mod tests {
1516  use super::*;
1517
1518  fn timestamp(hour: u32, minute: u32, second: u32) -> DateTime<Utc> {
1519    use chrono::TimeZone;
1520    Utc
1521      .with_ymd_and_hms(2024, 1, 1, hour, minute, second)
1522      .unwrap()
1523  }
1524
1525  // TimeWindow tests
1526  #[test]
1527  fn test_time_window_basic() {
1528    let start = timestamp(10, 0, 0);
1529    let end = timestamp(10, 5, 0);
1530    let window = TimeWindow::new(start, end);
1531
1532    assert_eq!(window.start(), start);
1533    assert_eq!(window.end(), end);
1534    assert_eq!(window.duration(), ChronoDuration::minutes(5));
1535  }
1536
1537  #[test]
1538  fn test_time_window_contains() {
1539    let window = TimeWindow::new(timestamp(10, 0, 0), timestamp(10, 5, 0));
1540
1541    assert!(window.contains(timestamp(10, 0, 0))); // Start is inclusive
1542    assert!(window.contains(timestamp(10, 2, 30)));
1543    assert!(!window.contains(timestamp(10, 5, 0))); // End is exclusive
1544    assert!(!window.contains(timestamp(9, 59, 59)));
1545  }
1546
1547  #[test]
1548  fn test_time_window_intersects() {
1549    let w1 = TimeWindow::new(timestamp(10, 0, 0), timestamp(10, 5, 0));
1550    let w2 = TimeWindow::new(timestamp(10, 3, 0), timestamp(10, 8, 0));
1551    let w3 = TimeWindow::new(timestamp(10, 5, 0), timestamp(10, 10, 0));
1552    let w4 = TimeWindow::new(timestamp(10, 10, 0), timestamp(10, 15, 0));
1553
1554    assert!(w1.intersects(&w2)); // Overlap
1555    assert!(!w1.intersects(&w3)); // Adjacent
1556    assert!(!w1.intersects(&w4)); // Disjoint
1557  }
1558
1559  #[test]
1560  fn test_time_window_merge() {
1561    let w1 = TimeWindow::new(timestamp(10, 0, 0), timestamp(10, 5, 0));
1562    let w2 = TimeWindow::new(timestamp(10, 3, 0), timestamp(10, 8, 0));
1563    let w3 = TimeWindow::new(timestamp(10, 10, 0), timestamp(10, 15, 0));
1564
1565    let merged = w1.merge(&w2).unwrap();
1566    assert_eq!(merged.start(), timestamp(10, 0, 0));
1567    assert_eq!(merged.end(), timestamp(10, 8, 0));
1568
1569    assert!(w1.merge(&w3).is_none()); // Can't merge disjoint
1570  }
1571
1572  #[test]
1573  fn test_time_window_display() {
1574    let window = TimeWindow::new(timestamp(10, 0, 0), timestamp(10, 5, 0));
1575    let s = format!("{}", window);
1576    assert!(s.contains("10:00:00"));
1577    assert!(s.contains("10:05:00"));
1578  }
1579
1580  // CountWindow tests
1581  #[test]
1582  fn test_count_window() {
1583    let window = CountWindow::new(42, 100);
1584    assert_eq!(window.id(), 42);
1585    assert_eq!(window.max_count(), 100);
1586  }
1587
1588  // SessionWindow tests
1589  #[test]
1590  fn test_session_window_from_element() {
1591    let ts = timestamp(10, 0, 0);
1592    let gap = Duration::from_secs(300); // 5 minutes
1593    let session = SessionWindow::from_element(ts, gap);
1594
1595    assert_eq!(session.start(), ts);
1596    assert_eq!(session.gap(), gap);
1597    assert!(session.contains(ts));
1598  }
1599
1600  #[test]
1601  fn test_session_window_extend() {
1602    let mut session = SessionWindow::from_element(timestamp(10, 0, 0), Duration::from_secs(300));
1603
1604    session.extend(timestamp(10, 2, 0));
1605    assert_eq!(session.start(), timestamp(10, 0, 0));
1606    // End should be extended
1607
1608    session.extend(timestamp(9, 58, 0)); // Earlier than start
1609    assert_eq!(session.start(), timestamp(9, 58, 0));
1610  }
1611
1612  #[test]
1613  fn test_session_window_merge() {
1614    let s1 = SessionWindow::from_element(timestamp(10, 0, 0), Duration::from_secs(300));
1615    let s2 = SessionWindow::from_element(timestamp(10, 2, 0), Duration::from_secs(300));
1616    let s3 = SessionWindow::from_element(timestamp(10, 30, 0), Duration::from_secs(300));
1617
1618    assert!(s1.should_merge(&s2)); // Overlapping
1619    assert!(!s1.should_merge(&s3)); // Too far apart
1620
1621    let merged = s1.merge(&s2).unwrap();
1622    assert_eq!(merged.start(), timestamp(10, 0, 0));
1623  }
1624
1625  // TumblingWindowAssigner tests
1626  #[test]
1627  fn test_tumbling_assigner() {
1628    let assigner = TumblingWindowAssigner::new(Duration::from_secs(300)); // 5 minute windows
1629
1630    let windows = assigner.assign_windows(timestamp(10, 2, 30));
1631    assert_eq!(windows.len(), 1);
1632
1633    // Window should start at 10:00:00
1634    let window = &windows[0];
1635    assert_eq!(window.start(), timestamp(10, 0, 0));
1636    assert_eq!(window.end(), timestamp(10, 5, 0));
1637  }
1638
1639  #[test]
1640  fn test_tumbling_assigner_with_offset() {
1641    let assigner =
1642      TumblingWindowAssigner::new(Duration::from_secs(300)).with_offset(Duration::from_secs(60)); // 1 minute offset
1643
1644    let windows = assigner.assign_windows(timestamp(10, 2, 30));
1645    assert_eq!(windows.len(), 1);
1646
1647    // Window should start at 10:01:00 with offset
1648    let window = &windows[0];
1649    assert_eq!(window.start(), timestamp(10, 1, 0));
1650    assert_eq!(window.end(), timestamp(10, 6, 0));
1651  }
1652
1653  // SlidingWindowAssigner tests
1654  #[test]
1655  fn test_sliding_assigner() {
1656    let assigner = SlidingWindowAssigner::new(
1657      Duration::from_secs(300), // 5 minute window
1658      Duration::from_secs(60),  // 1 minute slide
1659    );
1660
1661    let windows = assigner.assign_windows(timestamp(10, 2, 30));
1662
1663    // Element should be in multiple windows
1664    assert!(windows.len() > 1);
1665
1666    // All windows should contain the timestamp
1667    for window in &windows {
1668      assert!(window.contains(timestamp(10, 2, 30)));
1669    }
1670  }
1671
1672  // SessionWindowAssigner tests
1673  #[test]
1674  fn test_session_assigner() {
1675    let assigner = SessionWindowAssigner::new(Duration::from_secs(300)); // 5 minute gap
1676
1677    let windows = assigner.assign_windows(timestamp(10, 0, 0));
1678    assert_eq!(windows.len(), 1);
1679    assert_eq!(windows[0].gap(), Duration::from_secs(300));
1680  }
1681
1682  // CountWindowAssigner tests
1683  #[test]
1684  fn test_count_assigner() {
1685    let assigner = CountWindowAssigner::new(3);
1686
1687    // First three elements in window 0
1688    for _ in 0..3 {
1689      let windows = assigner.assign_windows(Utc::now());
1690      assert_eq!(windows[0].id(), 0);
1691    }
1692
1693    // Fourth element starts window 1
1694    let windows = assigner.assign_windows(Utc::now());
1695    assert_eq!(windows[0].id(), 1);
1696  }
1697
1698  // GlobalWindowAssigner tests
1699  #[test]
1700  fn test_global_assigner() {
1701    let assigner = GlobalWindowAssigner::new();
1702
1703    let windows = assigner.assign_windows(Utc::now());
1704    assert_eq!(windows.len(), 1);
1705    assert_eq!(windows[0], GlobalWindow);
1706  }
1707
1708  // Trigger tests
1709  #[test]
1710  fn test_event_time_trigger() {
1711    let mut trigger = EventTimeTrigger::new();
1712    let window = TimeWindow::new(timestamp(10, 0, 0), timestamp(10, 5, 0));
1713
1714    // Before window end
1715    let result = trigger.on_event_time(timestamp(10, 3, 0), &window);
1716    assert_eq!(result, TriggerResult::Continue);
1717
1718    // At window end
1719    let result = trigger.on_event_time(timestamp(10, 5, 0), &window);
1720    assert_eq!(result, TriggerResult::FireAndPurge);
1721  }
1722
1723  #[test]
1724  fn test_count_trigger() {
1725    let mut trigger = CountTrigger::new(3);
1726    let window = CountWindow::new(0, 3);
1727
1728    // First two elements
1729    assert_eq!(
1730      trigger.on_element(Utc::now(), &window),
1731      TriggerResult::Continue
1732    );
1733    assert_eq!(
1734      trigger.on_element(Utc::now(), &window),
1735      TriggerResult::Continue
1736    );
1737
1738    // Third element triggers
1739    assert_eq!(
1740      trigger.on_element(Utc::now(), &window),
1741      TriggerResult::FireAndPurge
1742    );
1743  }
1744
1745  #[test]
1746  fn test_processing_time_trigger() {
1747    let mut trigger = ProcessingTimeTrigger::new(Duration::from_secs(60));
1748    let window = TimeWindow::new(timestamp(10, 0, 0), timestamp(10, 5, 0));
1749
1750    // First call at 10:00:00
1751    let result = trigger.on_processing_time(timestamp(10, 0, 0), &window);
1752    assert_eq!(result, TriggerResult::Fire);
1753
1754    // Call at 10:00:30 - not yet 1 minute
1755    let result = trigger.on_processing_time(timestamp(10, 0, 30), &window);
1756    assert_eq!(result, TriggerResult::Continue);
1757
1758    // Call at 10:01:00 - 1 minute elapsed
1759    let result = trigger.on_processing_time(timestamp(10, 1, 0), &window);
1760    assert_eq!(result, TriggerResult::Fire);
1761  }
1762
1763  // WindowConfig tests
1764  #[test]
1765  fn test_window_config() {
1766    let config = WindowConfig::new()
1767      .with_late_data_policy(LateDataPolicy::SideOutput)
1768      .with_allowed_lateness(Duration::from_secs(60));
1769
1770    assert_eq!(config.late_data_policy, LateDataPolicy::SideOutput);
1771    assert_eq!(config.allowed_lateness, Duration::from_secs(60));
1772  }
1773
1774  // Error tests
1775  #[test]
1776  fn test_window_error_display() {
1777    let err = WindowError::InvalidConfig("bad config".to_string());
1778    assert!(err.to_string().contains("Invalid window config"));
1779
1780    let err = WindowError::NotFound("window1".to_string());
1781    assert!(err.to_string().contains("not found"));
1782
1783    let err = WindowError::WindowClosed("window1".to_string());
1784    assert!(err.to_string().contains("closed"));
1785
1786    let err = WindowError::StateError("state issue".to_string());
1787    assert!(err.to_string().contains("State error"));
1788  }
1789
1790  // TriggerResult tests
1791  #[test]
1792  fn test_trigger_result_equality() {
1793    assert_eq!(TriggerResult::Continue, TriggerResult::Continue);
1794    assert_eq!(TriggerResult::Fire, TriggerResult::Fire);
1795    assert_ne!(TriggerResult::Fire, TriggerResult::FireAndPurge);
1796  }
1797
1798  // LateDataPolicy tests
1799  #[test]
1800  fn test_late_data_policy_default() {
1801    let policy = LateDataPolicy::default();
1802    assert_eq!(policy, LateDataPolicy::Drop);
1803  }
1804
1805  // Clone trigger tests
1806  #[test]
1807  fn test_trigger_clone() {
1808    let trigger = EventTimeTrigger::new();
1809    let cloned = trigger.clone_trigger();
1810    // Verify the cloned trigger works
1811    let window = TimeWindow::new(timestamp(10, 0, 0), timestamp(10, 5, 0));
1812    assert_eq!(
1813      cloned
1814        .clone_trigger()
1815        .on_event_time(timestamp(10, 5, 0), &window),
1816      TriggerResult::FireAndPurge
1817    );
1818  }
1819
1820  // ==========================================================================
1821  // Watermark Tests
1822  // ==========================================================================
1823
1824  #[test]
1825  fn test_watermark_basic() {
1826    let ts = timestamp(10, 30, 0);
1827    let wm = Watermark::new(ts);
1828    assert_eq!(wm.timestamp, ts);
1829    assert!(!wm.is_end_of_stream());
1830    assert!(format!("{}", wm).contains("Watermark"));
1831  }
1832
1833  #[test]
1834  fn test_watermark_min_max() {
1835    let min_wm = Watermark::min();
1836    let max_wm = Watermark::max();
1837
1838    assert!(min_wm.timestamp < max_wm.timestamp);
1839    assert!(!min_wm.is_end_of_stream());
1840    assert!(max_wm.is_end_of_stream());
1841    assert!(format!("{}", max_wm).contains("END"));
1842  }
1843
1844  #[test]
1845  fn test_watermark_advance() {
1846    let mut wm = Watermark::new(timestamp(10, 0, 0));
1847
1848    // Advancing to a later time should succeed
1849    assert!(wm.advance(timestamp(10, 5, 0)));
1850    assert_eq!(wm.timestamp, timestamp(10, 5, 0));
1851
1852    // Advancing to an earlier time should not change it
1853    assert!(!wm.advance(timestamp(10, 3, 0)));
1854    assert_eq!(wm.timestamp, timestamp(10, 5, 0));
1855
1856    // Advancing to the same time should not change it
1857    assert!(!wm.advance(timestamp(10, 5, 0)));
1858    assert_eq!(wm.timestamp, timestamp(10, 5, 0));
1859  }
1860
1861  #[test]
1862  fn test_watermark_ordering() {
1863    let wm1 = Watermark::new(timestamp(10, 0, 0));
1864    let wm2 = Watermark::new(timestamp(10, 5, 0));
1865    let wm3 = Watermark::new(timestamp(10, 0, 0));
1866
1867    assert!(wm1 < wm2);
1868    assert!(wm2 > wm1);
1869    assert_eq!(wm1, wm3);
1870  }
1871
1872  // ==========================================================================
1873  // Watermark Generator Tests
1874  // ==========================================================================
1875
1876  #[test]
1877  fn test_monotonic_generator() {
1878    let mut generator = MonotonicWatermarkGenerator::new();
1879    assert_eq!(generator.current_watermark(), Watermark::min());
1880
1881    // First event should emit watermark
1882    let wm = generator.on_event(timestamp(10, 0, 0));
1883    assert!(wm.is_some());
1884    assert_eq!(wm.unwrap().timestamp, timestamp(10, 0, 0));
1885
1886    // Later event should emit watermark
1887    let wm = generator.on_event(timestamp(10, 5, 0));
1888    assert!(wm.is_some());
1889    assert_eq!(wm.unwrap().timestamp, timestamp(10, 5, 0));
1890
1891    // Earlier event should not emit watermark
1892    let wm = generator.on_event(timestamp(10, 3, 0));
1893    assert!(wm.is_none());
1894    assert_eq!(generator.current_watermark().timestamp, timestamp(10, 5, 0));
1895
1896    // Periodic emit should return None
1897    assert!(generator.on_periodic_emit().is_none());
1898  }
1899
1900  #[test]
1901  fn test_bounded_out_of_orderness_generator() {
1902    let max_delay = Duration::from_secs(5);
1903    let mut generator = BoundedOutOfOrdernessGenerator::new(max_delay);
1904    assert_eq!(generator.current_watermark(), Watermark::min());
1905
1906    // First event at 10:00:00 -> watermark at 09:59:55
1907    let wm = generator.on_event(timestamp(10, 0, 0));
1908    assert!(wm.is_some());
1909    assert_eq!(wm.unwrap().timestamp, timestamp(9, 59, 55));
1910
1911    // Event at 10:00:10 -> watermark at 10:00:05
1912    let wm = generator.on_event(timestamp(10, 0, 10));
1913    assert!(wm.is_some());
1914    assert_eq!(wm.unwrap().timestamp, timestamp(10, 0, 5));
1915
1916    // Event at 10:00:05 (late but within bounds) -> no new watermark
1917    let wm = generator.on_event(timestamp(10, 0, 5));
1918    assert!(wm.is_none());
1919    assert_eq!(generator.current_watermark().timestamp, timestamp(10, 0, 5));
1920
1921    // Periodic emit should return current watermark
1922    let wm = generator.on_periodic_emit();
1923    assert!(wm.is_some());
1924    assert_eq!(wm.unwrap().timestamp, timestamp(10, 0, 5));
1925  }
1926
1927  #[test]
1928  fn test_periodic_watermark_generator() {
1929    let max_delay = Duration::from_secs(5);
1930    let interval = Duration::from_secs(10);
1931    let mut generator = PeriodicWatermarkGenerator::new(max_delay, interval);
1932
1933    // Events should not emit watermarks directly
1934    let wm = generator.on_event(timestamp(10, 0, 0));
1935    assert!(wm.is_none());
1936
1937    generator.on_event(timestamp(10, 0, 10));
1938    assert!(generator.on_event(timestamp(10, 0, 15)).is_none());
1939
1940    // First periodic emit should work (no previous emit)
1941    let wm = generator.on_periodic_emit();
1942    assert!(wm.is_some());
1943    // Watermark should be 10:00:10 (max) - 5s (delay) = 10:00:05
1944    assert_eq!(wm.unwrap().timestamp, timestamp(10, 0, 10));
1945  }
1946
1947  #[test]
1948  fn test_watermark_generator_end_of_stream() {
1949    let mut generator = MonotonicWatermarkGenerator::new();
1950    generator.on_event(timestamp(10, 0, 0));
1951
1952    let eos = generator.on_end_of_stream();
1953    assert!(eos.is_end_of_stream());
1954  }
1955
1956  // ==========================================================================
1957  // Late Data Handler Tests
1958  // ==========================================================================
1959
1960  #[test]
1961  fn test_late_data_result() {
1962    let on_time: LateDataResult<i32> = LateDataResult::OnTime(42);
1963    assert!(on_time.should_process());
1964    assert_eq!(on_time.into_processable(), Some(42));
1965
1966    let within: LateDataResult<i32> = LateDataResult::WithinLateness(42);
1967    assert!(within.should_process());
1968
1969    let drop: LateDataResult<i32> = LateDataResult::Drop;
1970    assert!(!drop.should_process());
1971    assert_eq!(drop.into_processable(), None);
1972
1973    let side: LateDataResult<i32> = LateDataResult::SideOutput(42);
1974    assert!(!side.should_process());
1975    assert!(side.is_side_output());
1976    assert_eq!(LateDataResult::SideOutput(42).into_side_output(), Some(42));
1977  }
1978
1979  #[test]
1980  fn test_late_data_handler_on_time() {
1981    let mut handler = LateDataHandler::drop_late();
1982    let watermark = Watermark::new(timestamp(10, 0, 0));
1983
1984    // Element at same time as watermark is on-time
1985    let result = handler.evaluate(timestamp(10, 0, 0), &watermark, "data");
1986    assert!(matches!(result, LateDataResult::OnTime("data")));
1987
1988    // Element after watermark is on-time
1989    let result = handler.evaluate(timestamp(10, 5, 0), &watermark, "data");
1990    assert!(matches!(result, LateDataResult::OnTime("data")));
1991
1992    assert_eq!(handler.stats().on_time, 2);
1993  }
1994
1995  #[test]
1996  fn test_late_data_handler_within_lateness() {
1997    let mut handler = LateDataHandler::with_allowed_lateness(Duration::from_secs(30));
1998    let watermark = Watermark::new(timestamp(10, 0, 0));
1999
2000    // Element 10 seconds before watermark (within 30s lateness)
2001    let result = handler.evaluate(timestamp(9, 59, 50), &watermark, "data");
2002    assert!(matches!(result, LateDataResult::WithinLateness("data")));
2003
2004    assert_eq!(handler.stats().within_lateness, 1);
2005  }
2006
2007  #[test]
2008  fn test_late_data_handler_drop() {
2009    let mut handler = LateDataHandler::drop_late();
2010    let watermark = Watermark::new(timestamp(10, 0, 0));
2011
2012    // Element 30 seconds before watermark (late, no allowed lateness)
2013    let result = handler.evaluate(timestamp(9, 59, 30), &watermark, "data");
2014    assert!(matches!(result, LateDataResult::Drop));
2015
2016    assert_eq!(handler.stats().dropped, 1);
2017  }
2018
2019  #[test]
2020  fn test_late_data_handler_side_output() {
2021    let mut handler = LateDataHandler::redirect_to_side_output();
2022    let watermark = Watermark::new(timestamp(10, 0, 0));
2023
2024    // Element 30 seconds before watermark (late, goes to side output)
2025    let result = handler.evaluate(timestamp(9, 59, 30), &watermark, "data");
2026    assert!(matches!(result, LateDataResult::SideOutput("data")));
2027
2028    assert_eq!(handler.stats().side_output, 1);
2029  }
2030
2031  #[test]
2032  fn test_late_data_handler_stats_reset() {
2033    let mut handler = LateDataHandler::drop_late();
2034    let watermark = Watermark::new(timestamp(10, 0, 0));
2035
2036    handler.evaluate(timestamp(10, 0, 0), &watermark, "data");
2037    handler.evaluate(timestamp(9, 59, 0), &watermark, "data");
2038
2039    assert_eq!(handler.stats().on_time, 1);
2040    assert_eq!(handler.stats().dropped, 1);
2041
2042    handler.reset_stats();
2043    assert_eq!(handler.stats().on_time, 0);
2044    assert_eq!(handler.stats().dropped, 0);
2045  }
2046
2047  #[test]
2048  fn test_late_data_handler_accessors() {
2049    let handler = LateDataHandler::with_allowed_lateness(Duration::from_secs(30));
2050    assert!(matches!(handler.policy(), LateDataPolicy::AllowLateness(_)));
2051    assert_eq!(handler.get_allowed_lateness(), Duration::from_secs(30));
2052  }
2053
2054  // ==========================================================================
2055  // Window State Tests
2056  // ==========================================================================
2057
2058  #[test]
2059  fn test_window_state_basic() {
2060    let window = TimeWindow::new(timestamp(10, 0, 0), timestamp(10, 5, 0));
2061    let mut state: WindowState<TimeWindow, i32> = WindowState::new(window.clone());
2062
2063    assert_eq!(state.count, 0);
2064    assert!(!state.triggered);
2065    assert!(state.last_watermark.is_none());
2066
2067    state.add(timestamp(10, 1, 0), 42);
2068    state.add(timestamp(10, 2, 0), 43);
2069
2070    assert_eq!(state.count, 2);
2071    assert_eq!(state.elements().len(), 2);
2072    assert_eq!(state.values(), vec![42, 43]);
2073  }
2074
2075  #[test]
2076  fn test_window_state_watermark() {
2077    let window = TimeWindow::new(timestamp(10, 0, 0), timestamp(10, 5, 0));
2078    let mut state: WindowState<TimeWindow, i32> = WindowState::new(window);
2079
2080    let wm = Watermark::new(timestamp(10, 3, 0));
2081    state.update_watermark(wm.clone());
2082    assert_eq!(state.last_watermark, Some(wm));
2083  }
2084
2085  #[test]
2086  fn test_window_state_trigger_and_clear() {
2087    let window = TimeWindow::new(timestamp(10, 0, 0), timestamp(10, 5, 0));
2088    let mut state: WindowState<TimeWindow, i32> = WindowState::new(window);
2089
2090    state.add(timestamp(10, 1, 0), 42);
2091    state.mark_triggered();
2092    assert!(state.triggered);
2093
2094    state.clear();
2095    assert_eq!(state.count, 0);
2096    assert!(state.elements().is_empty());
2097  }
2098
2099  #[test]
2100  fn test_window_state_gc() {
2101    let window = TimeWindow::new(timestamp(10, 0, 0), timestamp(10, 5, 0));
2102    let mut state: WindowState<TimeWindow, i32> = WindowState::new(window);
2103
2104    // Non-triggered windows cannot be GC'd
2105    let wm = Watermark::new(timestamp(11, 0, 0));
2106    assert!(!state.can_be_gc(&wm, ChronoDuration::zero()));
2107
2108    // Triggered window can be GC'd after watermark passes end + lateness
2109    state.mark_triggered();
2110    assert!(state.can_be_gc(&wm, ChronoDuration::zero()));
2111
2112    // With allowed lateness, needs more time
2113    let wm2 = Watermark::new(timestamp(10, 5, 0));
2114    assert!(!state.can_be_gc(&wm2, ChronoDuration::minutes(1)));
2115  }
2116
2117  #[test]
2118  fn test_window_state_clone() {
2119    let window = TimeWindow::new(timestamp(10, 0, 0), timestamp(10, 5, 0));
2120    let mut state: WindowState<TimeWindow, i32> = WindowState::new(window);
2121    state.add(timestamp(10, 1, 0), 42);
2122    state.mark_triggered();
2123
2124    let cloned = state.clone();
2125    assert_eq!(cloned.count, 1);
2126    assert!(cloned.triggered);
2127    assert_eq!(cloned.values(), vec![42]);
2128  }
2129
2130  #[test]
2131  fn test_count_window_display() {
2132    let window = CountWindow::new(42, 100);
2133    let s = format!("{}", window);
2134    assert!(s.contains("id=42"));
2135    assert!(s.contains("max=100"));
2136  }
2137
2138  #[test]
2139  fn test_session_window_new() {
2140    let start = timestamp(10, 0, 0);
2141    let end = timestamp(10, 5, 0);
2142    let gap = Duration::from_secs(300);
2143    let session = SessionWindow::new(start, end, gap);
2144
2145    assert_eq!(session.start(), start);
2146    assert_eq!(session.end(), end);
2147    assert_eq!(session.gap(), gap);
2148  }
2149
2150  #[test]
2151  fn test_session_window_display() {
2152    let session = SessionWindow::from_element(timestamp(10, 0, 0), Duration::from_secs(300));
2153    let s = format!("{}", session);
2154    assert!(s.contains("Session"));
2155    assert!(s.contains("gap="));
2156  }
2157
2158  #[test]
2159  fn test_session_window_hash() {
2160    use std::collections::hash_map::DefaultHasher;
2161    use std::hash::{Hash, Hasher};
2162
2163    let s1 = SessionWindow::from_element(timestamp(10, 0, 0), Duration::from_secs(300));
2164    let s2 = SessionWindow::from_element(timestamp(10, 0, 0), Duration::from_secs(300));
2165    let s3 = SessionWindow::from_element(timestamp(10, 1, 0), Duration::from_secs(300));
2166
2167    let mut h1 = DefaultHasher::new();
2168    let mut h2 = DefaultHasher::new();
2169    let mut h3 = DefaultHasher::new();
2170
2171    s1.hash(&mut h1);
2172    s2.hash(&mut h2);
2173    s3.hash(&mut h3);
2174
2175    assert_eq!(h1.finish(), h2.finish()); // Same windows have same hash
2176    assert_ne!(h1.finish(), h3.finish()); // Different windows have different hash
2177  }
2178
2179  #[test]
2180  fn test_time_window_ord() {
2181    let w1 = TimeWindow::new(timestamp(10, 0, 0), timestamp(10, 5, 0));
2182    let w2 = TimeWindow::new(timestamp(10, 5, 0), timestamp(10, 10, 0));
2183    let w3 = TimeWindow::new(timestamp(10, 0, 0), timestamp(10, 5, 0));
2184
2185    assert!(w1 < w2);
2186    assert_eq!(w1, w3);
2187    assert!(w2 > w1);
2188  }
2189
2190  #[test]
2191  fn test_late_data_handler_new() {
2192    let handler = LateDataHandler::new(LateDataPolicy::Drop);
2193    assert!(matches!(handler.policy(), LateDataPolicy::Drop));
2194    assert_eq!(handler.get_allowed_lateness(), Duration::ZERO);
2195  }
2196
2197  #[test]
2198  fn test_late_data_handler_allow_lateness() {
2199    let handler = LateDataHandler::new(LateDataPolicy::AllowLateness(Duration::from_secs(10)));
2200    assert_eq!(handler.get_allowed_lateness(), Duration::from_secs(10));
2201    assert!(matches!(handler.policy(), LateDataPolicy::AllowLateness(_)));
2202  }
2203
2204  #[test]
2205  fn test_late_data_handler_stats() {
2206    let handler = LateDataHandler::new(LateDataPolicy::Drop);
2207    let stats = handler.stats();
2208    assert_eq!(stats.dropped, 0);
2209    assert_eq!(stats.within_lateness, 0);
2210    assert_eq!(stats.side_output, 0);
2211  }
2212
2213  #[test]
2214  fn test_late_data_handler_reset_stats() {
2215    let mut handler = LateDataHandler::new(LateDataPolicy::Drop);
2216    // Note: We can't easily test handle_late_data without creating actual late elements
2217    // But we can test reset_stats
2218    handler.reset_stats();
2219    let stats = handler.stats();
2220    assert_eq!(stats.dropped, 0);
2221  }
2222
2223  #[test]
2224  fn test_window_state_new() {
2225    let window = TimeWindow::new(timestamp(10, 0, 0), timestamp(10, 5, 0));
2226    let state: WindowState<TimeWindow, i32> = WindowState::new(window);
2227
2228    assert_eq!(state.count, 0);
2229    assert!(!state.triggered);
2230    assert!(state.elements().is_empty());
2231    assert_eq!(state.last_watermark, None);
2232  }
2233
2234  #[test]
2235  fn test_window_state_add_multiple() {
2236    let window = TimeWindow::new(timestamp(10, 0, 0), timestamp(10, 5, 0));
2237    let mut state: WindowState<TimeWindow, i32> = WindowState::new(window);
2238
2239    state.add(timestamp(10, 1, 0), 42);
2240    state.add(timestamp(10, 2, 0), 43);
2241    state.add(timestamp(10, 3, 0), 44);
2242
2243    assert_eq!(state.count, 3);
2244    assert_eq!(state.elements().len(), 3);
2245    assert_eq!(state.values(), vec![42, 43, 44]);
2246  }
2247
2248  #[test]
2249  fn test_window_state_can_be_gc_global_window() {
2250    use crate::window::GlobalWindow;
2251    let window = GlobalWindow;
2252    let mut state: WindowState<GlobalWindow, i32> = WindowState::new(window);
2253    state.mark_triggered();
2254
2255    let wm = Watermark::new(timestamp(11, 0, 0));
2256    // Global windows are never GC'd
2257    assert!(!state.can_be_gc(&wm, ChronoDuration::zero()));
2258  }
2259
2260  #[test]
2261  fn test_window_state_can_be_gc_with_lateness() {
2262    let window = TimeWindow::new(timestamp(10, 0, 0), timestamp(10, 5, 0));
2263    let mut state: WindowState<TimeWindow, i32> = WindowState::new(window);
2264    state.mark_triggered();
2265
2266    // Watermark at end time, but with 1 minute lateness, should not be GC'd yet
2267    let wm = Watermark::new(timestamp(10, 5, 0));
2268    assert!(!state.can_be_gc(&wm, ChronoDuration::minutes(1)));
2269
2270    // Watermark past end + lateness, should be GC'd
2271    let wm2 = Watermark::new(timestamp(10, 6, 1));
2272    assert!(state.can_be_gc(&wm2, ChronoDuration::minutes(1)));
2273  }
2274}