Skip to main content

laminar_core/time/
watermark.rs

1//! # Watermark Generation and Tracking
2//!
3//! Watermarks indicate event-time progress through a streaming system. They are assertions
4//! that no events with timestamps earlier than the watermark are expected to arrive.
5//!
6//! ## Watermark Generation Strategies
7//!
8//! - [`BoundedOutOfOrdernessGenerator`]: Allows events to be late by a fixed duration
9//! - [`AscendingTimestampsGenerator`]: Assumes strictly increasing timestamps (no lateness)
10//! - [`PeriodicGenerator`]: Emits watermarks at fixed wall-clock intervals
11//! - [`PunctuatedGenerator`]: Emits watermarks based on special marker events
12//!
13//! ## Multi-Source Alignment
14//!
15//! When processing multiple input streams (e.g., joins), use [`WatermarkTracker`] to
16//! track the minimum watermark across all sources.
17//!
18//! ## Idle Source Handling
19//!
20//! Sources that stop producing events can block watermark progress. The
21//! [`IdleSourceDetector`] marks sources as idle after a configurable timeout.
22//!
23//! # Example
24//!
25//! ```rust
26//! use laminar_core::time::{
27//!     WatermarkGenerator, BoundedOutOfOrdernessGenerator,
28//!     WatermarkTracker, Watermark,
29//! };
30//!
31//! // Single source with bounded out-of-orderness
32//! let mut generator = BoundedOutOfOrdernessGenerator::new(1000); // 1 second
33//! let wm = generator.on_event(5000);
34//! assert_eq!(wm, Some(Watermark::new(4000)));
35//!
36//! // Multi-source tracking
37//! let mut tracker = WatermarkTracker::new(2);
38//! tracker.update_source(0, 5000);
39//! tracker.update_source(1, 3000);
40//! assert_eq!(tracker.current_watermark(), Some(Watermark::new(3000)));
41//! ```
42
43use std::time::{Duration, Instant};
44
45use super::Watermark;
46
47/// Trait for generating watermarks from event timestamps.
48///
49/// Implementations track observed timestamps and produce watermarks that indicate
50/// event-time progress. The watermark is an assertion that no events with timestamps
51/// earlier than the watermark are expected.
52pub trait WatermarkGenerator: Send {
53    /// Process an event timestamp and potentially emit a new watermark.
54    ///
55    /// Called for each event processed. Returns `Some(watermark)` if the watermark
56    /// should advance, `None` otherwise.
57    fn on_event(&mut self, timestamp: i64) -> Option<Watermark>;
58
59    /// Called periodically to emit watermarks based on wall-clock time.
60    ///
61    /// Useful for generating watermarks even when no events are arriving.
62    fn on_periodic(&mut self) -> Option<Watermark>;
63
64    /// Returns the current watermark value without advancing it.
65    fn current_watermark(&self) -> i64;
66}
67
/// Watermark generator that tolerates a bounded amount of out-of-orderness.
///
/// Events may arrive up to `max_out_of_orderness` milliseconds behind the
/// largest timestamp seen so far. Each emitted watermark is
/// `max_timestamp_seen - max_out_of_orderness`.
///
/// # Example
///
/// ```rust
/// use laminar_core::time::{BoundedOutOfOrdernessGenerator, WatermarkGenerator, Watermark};
///
/// let mut generator = BoundedOutOfOrdernessGenerator::new(100); // 100ms lateness allowed
///
/// // First event at t=1000
/// assert_eq!(generator.on_event(1000), Some(Watermark::new(900)));
///
/// // Out-of-order event at t=800 - no watermark advance
/// assert_eq!(generator.on_event(800), None);
///
/// // New max at t=1200
/// assert_eq!(generator.on_event(1200), Some(Watermark::new(1100)));
/// ```
#[derive(Debug)]
pub struct BoundedOutOfOrdernessGenerator {
    /// Allowed lateness in milliseconds.
    max_out_of_orderness: i64,
    /// Largest event timestamp observed so far (`i64::MIN` before any event).
    current_max_timestamp: i64,
    /// Most recently emitted watermark (`i64::MIN` before any emission).
    current_watermark: i64,
}

impl BoundedOutOfOrdernessGenerator {
    /// Creates a new generator with the specified maximum out-of-orderness.
    ///
    /// # Arguments
    ///
    /// * `max_out_of_orderness` - Maximum allowed lateness in milliseconds
    #[must_use]
    pub fn new(max_out_of_orderness: i64) -> Self {
        Self {
            max_out_of_orderness,
            current_max_timestamp: i64::MIN,
            current_watermark: i64::MIN,
        }
    }

    /// Creates a new generator from a [`Duration`].
    ///
    /// The duration is converted to whole milliseconds. A duration too large
    /// to be expressed as `i64` milliseconds is clamped to `i64::MAX` rather
    /// than wrapping around to an arbitrary (possibly negative) lateness.
    #[must_use]
    // The value is clamped to `i64::MAX` before the cast, so nothing is truncated.
    #[allow(clippy::cast_possible_truncation)]
    pub fn from_duration(max_out_of_orderness: Duration) -> Self {
        let millis = max_out_of_orderness.as_millis().min(i64::MAX as u128);
        Self::new(millis as i64)
    }

    /// Returns the maximum out-of-orderness in milliseconds.
    #[must_use]
    pub fn max_out_of_orderness(&self) -> i64 {
        self.max_out_of_orderness
    }
}
123
124impl WatermarkGenerator for BoundedOutOfOrdernessGenerator {
125    #[inline]
126    fn on_event(&mut self, timestamp: i64) -> Option<Watermark> {
127        if timestamp > self.current_max_timestamp {
128            self.current_max_timestamp = timestamp;
129            let new_watermark = timestamp.saturating_sub(self.max_out_of_orderness);
130            if new_watermark > self.current_watermark {
131                self.current_watermark = new_watermark;
132                return Some(Watermark::new(new_watermark));
133            }
134        }
135        None
136    }
137
138    #[inline]
139    fn on_periodic(&mut self) -> Option<Watermark> {
140        // Bounded out-of-orderness doesn't emit periodic watermarks
141        None
142    }
143
144    #[inline]
145    fn current_watermark(&self) -> i64 {
146        self.current_watermark
147    }
148}
149
/// Watermark generator for strictly ascending timestamps.
///
/// Assumes events arrive in timestamp order with no lateness, so the watermark
/// simply equals the latest timestamp. Use this for sources that guarantee
/// ordering.
///
/// [`Default::default`] is equivalent to [`AscendingTimestampsGenerator::new`]:
/// both start with the watermark at `i64::MIN`, so the first event always
/// advances it regardless of sign.
///
/// # Example
///
/// ```rust
/// use laminar_core::time::{AscendingTimestampsGenerator, WatermarkGenerator, Watermark};
///
/// let mut generator = AscendingTimestampsGenerator::new();
/// assert_eq!(generator.on_event(1000), Some(Watermark::new(1000)));
/// assert_eq!(generator.on_event(2000), Some(Watermark::new(2000)));
/// ```
#[derive(Debug)]
pub struct AscendingTimestampsGenerator {
    /// Latest emitted watermark (`i64::MIN` before any event).
    current_watermark: i64,
}

impl AscendingTimestampsGenerator {
    /// Creates a new ascending timestamps generator with no watermark yet.
    #[must_use]
    pub fn new() -> Self {
        Self {
            current_watermark: i64::MIN,
        }
    }
}

// Implemented by hand: a derived `Default` would start the watermark at 0 and
// silently ignore every event with a timestamp <= 0.
impl Default for AscendingTimestampsGenerator {
    fn default() -> Self {
        Self::new()
    }
}
178
179impl WatermarkGenerator for AscendingTimestampsGenerator {
180    #[inline]
181    fn on_event(&mut self, timestamp: i64) -> Option<Watermark> {
182        if timestamp > self.current_watermark {
183            self.current_watermark = timestamp;
184            Some(Watermark::new(timestamp))
185        } else {
186            None
187        }
188    }
189
190    #[inline]
191    fn on_periodic(&mut self) -> Option<Watermark> {
192        None
193    }
194
195    #[inline]
196    fn current_watermark(&self) -> i64 {
197        self.current_watermark
198    }
199}
200
201/// Periodic watermark generator that emits at fixed wall-clock intervals.
202///
203/// Wraps another generator and emits watermarks periodically even when no
204/// events are arriving. Useful for handling idle sources and ensuring
205/// time-based windows eventually trigger.
206///
207/// # Example
208///
209/// ```rust,no_run
210/// use laminar_core::time::{
211///     PeriodicGenerator, BoundedOutOfOrdernessGenerator, WatermarkGenerator,
212/// };
213/// use std::time::Duration;
214///
215/// let inner = BoundedOutOfOrdernessGenerator::new(100);
216/// let mut gen = PeriodicGenerator::new(inner, Duration::from_millis(500));
217///
218/// // First event
219/// gen.on_event(1000);
220///
221/// // Later, periodic check may emit watermark
222/// // (depends on wall-clock time elapsed)
223/// let wm = gen.on_periodic();
224/// ```
225pub struct PeriodicGenerator<G: WatermarkGenerator> {
226    inner: G,
227    period: Duration,
228    last_emit_time: Instant,
229    last_emitted_watermark: i64,
230}
231
232impl<G: WatermarkGenerator> PeriodicGenerator<G> {
233    /// Creates a new periodic generator wrapping another generator.
234    ///
235    /// # Arguments
236    ///
237    /// * `inner` - The underlying watermark generator
238    /// * `period` - How often to emit watermarks (wall-clock time)
239    #[must_use]
240    pub fn new(inner: G, period: Duration) -> Self {
241        Self {
242            inner,
243            period,
244            last_emit_time: Instant::now(),
245            last_emitted_watermark: i64::MIN,
246        }
247    }
248
249    /// Returns a reference to the inner generator.
250    #[must_use]
251    pub fn inner(&self) -> &G {
252        &self.inner
253    }
254
255    /// Returns a mutable reference to the inner generator.
256    pub fn inner_mut(&mut self) -> &mut G {
257        &mut self.inner
258    }
259}
260
261impl<G: WatermarkGenerator> WatermarkGenerator for PeriodicGenerator<G> {
262    fn on_event(&mut self, timestamp: i64) -> Option<Watermark> {
263        let wm = self.inner.on_event(timestamp);
264        if let Some(ref w) = wm {
265            self.last_emitted_watermark = w.timestamp();
266            self.last_emit_time = Instant::now();
267        }
268        wm
269    }
270
271    fn on_periodic(&mut self) -> Option<Watermark> {
272        // Check if enough wall-clock time has passed
273        if self.last_emit_time.elapsed() >= self.period {
274            let current = self.inner.current_watermark();
275            if current > self.last_emitted_watermark {
276                self.last_emitted_watermark = current;
277                self.last_emit_time = Instant::now();
278                return Some(Watermark::new(current));
279            }
280            self.last_emit_time = Instant::now();
281        }
282        None
283    }
284
285    fn current_watermark(&self) -> i64 {
286        self.inner.current_watermark()
287    }
288}
289
290/// Punctuated watermark generator that emits based on special events.
291///
292/// Uses a predicate function to identify watermark-carrying events. When the
293/// predicate returns `Some(watermark)`, that watermark is emitted.
294///
295/// # Example
296///
297/// ```rust
298/// use laminar_core::time::{PunctuatedGenerator, WatermarkGenerator, Watermark};
299///
300/// // Emit watermark on every 1000ms boundary
301/// let mut gen = PunctuatedGenerator::new(|ts| {
302///     if ts % 1000 == 0 {
303///         Some(Watermark::new(ts))
304///     } else {
305///         None
306///     }
307/// });
308///
309/// assert_eq!(gen.on_event(999), None);
310/// assert_eq!(gen.on_event(1000), Some(Watermark::new(1000)));
311/// ```
312pub struct PunctuatedGenerator<F>
313where
314    F: Fn(i64) -> Option<Watermark> + Send,
315{
316    predicate: F,
317    current_watermark: i64,
318}
319
320impl<F> PunctuatedGenerator<F>
321where
322    F: Fn(i64) -> Option<Watermark> + Send,
323{
324    /// Creates a new punctuated generator with the given predicate.
325    ///
326    /// # Arguments
327    ///
328    /// * `predicate` - Function that returns `Some(Watermark)` for watermark events
329    #[must_use]
330    pub fn new(predicate: F) -> Self {
331        Self {
332            predicate,
333            current_watermark: i64::MIN,
334        }
335    }
336}
337
338impl<F> WatermarkGenerator for PunctuatedGenerator<F>
339where
340    F: Fn(i64) -> Option<Watermark> + Send,
341{
342    fn on_event(&mut self, timestamp: i64) -> Option<Watermark> {
343        if let Some(wm) = (self.predicate)(timestamp) {
344            if wm.timestamp() > self.current_watermark {
345                self.current_watermark = wm.timestamp();
346                return Some(wm);
347            }
348        }
349        None
350    }
351
352    fn on_periodic(&mut self) -> Option<Watermark> {
353        None
354    }
355
356    fn current_watermark(&self) -> i64 {
357        self.current_watermark
358    }
359}
360
361/// Tracks watermarks across multiple input sources.
362///
363/// For operators with multiple inputs (e.g., joins, unions), the combined
364/// watermark is the minimum across all sources. This ensures no late events
365/// from any source are missed.
366///
367/// # Example
368///
369/// ```rust
370/// use laminar_core::time::{WatermarkTracker, Watermark};
371///
372/// let mut tracker = WatermarkTracker::new(3); // 3 sources
373///
374/// // Source 0 advances to 1000
375/// let wm = tracker.update_source(0, 1000);
376/// assert_eq!(wm, None); // Other sources still at MIN
377///
378/// // Source 1 advances to 2000
379/// tracker.update_source(1, 2000);
380///
381/// // Source 2 advances to 500
382/// let wm = tracker.update_source(2, 500);
383/// assert_eq!(wm, Some(Watermark::new(500))); // Min of all sources
384/// ```
385#[derive(Debug)]
386pub struct WatermarkTracker {
387    /// Watermark for each source
388    source_watermarks: Vec<i64>,
389    /// Combined minimum watermark
390    combined_watermark: i64,
391    /// Idle status for each source
392    idle_sources: Vec<bool>,
393    /// Last activity time for each source
394    last_activity: Vec<Instant>,
395    /// Idle timeout duration
396    idle_timeout: Duration,
397}
398
399impl WatermarkTracker {
400    /// Creates a new tracker for the specified number of sources.
401    #[must_use]
402    pub fn new(num_sources: usize) -> Self {
403        Self {
404            source_watermarks: vec![i64::MIN; num_sources],
405            combined_watermark: i64::MIN,
406            idle_sources: vec![false; num_sources],
407            last_activity: vec![Instant::now(); num_sources],
408            idle_timeout: Duration::from_secs(30), // Default 30 second idle timeout
409        }
410    }
411
412    /// Creates a new tracker with a custom idle timeout.
413    #[must_use]
414    pub fn with_idle_timeout(num_sources: usize, idle_timeout: Duration) -> Self {
415        Self {
416            source_watermarks: vec![i64::MIN; num_sources],
417            combined_watermark: i64::MIN,
418            idle_sources: vec![false; num_sources],
419            last_activity: vec![Instant::now(); num_sources],
420            idle_timeout,
421        }
422    }
423
424    /// Updates the watermark for a specific source.
425    ///
426    /// Returns `Some(Watermark)` if the combined watermark advances.
427    pub fn update_source(&mut self, source_id: usize, watermark: i64) -> Option<Watermark> {
428        if source_id >= self.source_watermarks.len() {
429            return None;
430        }
431
432        // Mark source as active
433        self.idle_sources[source_id] = false;
434        self.last_activity[source_id] = Instant::now();
435
436        // Update source watermark
437        if watermark > self.source_watermarks[source_id] {
438            self.source_watermarks[source_id] = watermark;
439            self.update_combined()
440        } else {
441            None
442        }
443    }
444
445    /// Marks a source as idle, excluding it from watermark calculation.
446    ///
447    /// Idle sources don't hold back the combined watermark.
448    pub fn mark_idle(&mut self, source_id: usize) -> Option<Watermark> {
449        if source_id >= self.idle_sources.len() {
450            return None;
451        }
452
453        self.idle_sources[source_id] = true;
454        self.update_combined()
455    }
456
457    /// Checks for sources that have been idle longer than the timeout.
458    ///
459    /// Should be called periodically to detect stalled sources.
460    pub fn check_idle_sources(&mut self) -> Option<Watermark> {
461        let mut any_marked = false;
462        for i in 0..self.idle_sources.len() {
463            if !self.idle_sources[i] && self.last_activity[i].elapsed() >= self.idle_timeout {
464                self.idle_sources[i] = true;
465                any_marked = true;
466            }
467        }
468        if any_marked {
469            self.update_combined()
470        } else {
471            None
472        }
473    }
474
475    /// Returns the current combined watermark.
476    #[must_use]
477    pub fn current_watermark(&self) -> Option<Watermark> {
478        if self.combined_watermark == i64::MIN {
479            None
480        } else {
481            Some(Watermark::new(self.combined_watermark))
482        }
483    }
484
485    /// Returns the watermark for a specific source.
486    #[must_use]
487    pub fn source_watermark(&self, source_id: usize) -> Option<i64> {
488        self.source_watermarks.get(source_id).copied()
489    }
490
491    /// Returns whether a source is marked as idle.
492    #[must_use]
493    pub fn is_idle(&self, source_id: usize) -> bool {
494        self.idle_sources.get(source_id).copied().unwrap_or(false)
495    }
496
497    /// Returns the number of sources being tracked.
498    #[must_use]
499    pub fn num_sources(&self) -> usize {
500        self.source_watermarks.len()
501    }
502
503    /// Returns the number of active (non-idle) sources.
504    #[must_use]
505    pub fn active_source_count(&self) -> usize {
506        self.idle_sources.iter().filter(|&&idle| !idle).count()
507    }
508
509    /// Updates the combined watermark based on all active sources.
510    fn update_combined(&mut self) -> Option<Watermark> {
511        // Calculate minimum across active sources only
512        let mut min_watermark = i64::MAX;
513        let mut has_active = false;
514
515        for (i, &wm) in self.source_watermarks.iter().enumerate() {
516            if !self.idle_sources[i] {
517                has_active = true;
518                min_watermark = min_watermark.min(wm);
519            }
520        }
521
522        // If all sources are idle, use the max watermark
523        if !has_active {
524            min_watermark = self
525                .source_watermarks
526                .iter()
527                .copied()
528                .max()
529                .unwrap_or(i64::MIN);
530        }
531
532        if min_watermark > self.combined_watermark && min_watermark != i64::MAX {
533            self.combined_watermark = min_watermark;
534            Some(Watermark::new(min_watermark))
535        } else {
536            None
537        }
538    }
539}
540
541/// Watermark generator for sources with embedded watermarks.
542///
543/// Some sources (like Kafka with EOS) may provide watermarks directly.
544/// This generator tracks both event timestamps and explicit watermarks.
545pub struct SourceProvidedGenerator {
546    /// Last watermark from the source
547    source_watermark: i64,
548    /// Fallback generator for when source doesn't provide watermarks
549    fallback: BoundedOutOfOrdernessGenerator,
550    /// Whether to use source watermarks when available
551    prefer_source: bool,
552}
553
554impl SourceProvidedGenerator {
555    /// Creates a new source-provided generator.
556    ///
557    /// # Arguments
558    ///
559    /// * `fallback_lateness` - Lateness for fallback bounded generator
560    /// * `prefer_source` - If true, source watermarks take precedence
561    #[must_use]
562    pub fn new(fallback_lateness: i64, prefer_source: bool) -> Self {
563        Self {
564            source_watermark: i64::MIN,
565            fallback: BoundedOutOfOrdernessGenerator::new(fallback_lateness),
566            prefer_source,
567        }
568    }
569
570    /// Updates the watermark from the source.
571    ///
572    /// Call this when the source provides an explicit watermark.
573    pub fn on_source_watermark(&mut self, watermark: i64) -> Option<Watermark> {
574        if watermark > self.source_watermark {
575            self.source_watermark = watermark;
576            if self.prefer_source || watermark > self.fallback.current_watermark() {
577                return Some(Watermark::new(watermark));
578            }
579        }
580        None
581    }
582}
583
584impl WatermarkGenerator for SourceProvidedGenerator {
585    fn on_event(&mut self, timestamp: i64) -> Option<Watermark> {
586        let fallback_wm = self.fallback.on_event(timestamp);
587
588        if self.prefer_source {
589            // Only emit if source watermark allows and it's an advancement
590            if self.source_watermark > i64::MIN {
591                return None; // Wait for source watermark
592            }
593        }
594
595        fallback_wm
596    }
597
598    fn on_periodic(&mut self) -> Option<Watermark> {
599        None
600    }
601
602    fn current_watermark(&self) -> i64 {
603        if self.prefer_source && self.source_watermark > i64::MIN {
604            self.source_watermark
605        } else {
606            self.fallback.current_watermark().max(self.source_watermark)
607        }
608    }
609}
610
/// Counters and gauges describing a watermark generator's progress.
#[derive(Debug, Clone, Default)]
pub struct WatermarkMetrics {
    /// Timestamp of the most recently emitted watermark
    pub current_watermark: i64,
    /// Largest event timestamp observed
    pub max_event_timestamp: i64,
    /// How many watermarks have been emitted
    pub watermarks_emitted: u64,
    /// How many late events have been recorded
    pub late_events: u64,
}

impl WatermarkMetrics {
    /// Creates metrics with every field at zero.
    #[must_use]
    pub fn new() -> Self {
        Self {
            current_watermark: 0,
            max_event_timestamp: 0,
            watermarks_emitted: 0,
            late_events: 0,
        }
    }

    /// Returns the watermark lag: how far the watermark trails the largest
    /// event timestamp. The subtraction saturates instead of overflowing.
    #[must_use]
    pub fn lag(&self) -> i64 {
        let newest_event = self.max_event_timestamp;
        let watermark = self.current_watermark;
        newest_event.saturating_sub(watermark)
    }
}
638
639/// Watermark generator wrapper that collects metrics.
640pub struct MeteredGenerator<G: WatermarkGenerator> {
641    inner: G,
642    metrics: WatermarkMetrics,
643}
644
645impl<G: WatermarkGenerator> MeteredGenerator<G> {
646    /// Creates a new metered generator.
647    #[must_use]
648    pub fn new(inner: G) -> Self {
649        Self {
650            inner,
651            metrics: WatermarkMetrics::new(),
652        }
653    }
654
655    /// Returns the current metrics.
656    #[must_use]
657    pub fn metrics(&self) -> &WatermarkMetrics {
658        &self.metrics
659    }
660
661    /// Returns a mutable reference to the inner generator.
662    pub fn inner_mut(&mut self) -> &mut G {
663        &mut self.inner
664    }
665
666    /// Records a late event.
667    pub fn record_late_event(&mut self) {
668        self.metrics.late_events += 1;
669    }
670}
671
672impl<G: WatermarkGenerator> WatermarkGenerator for MeteredGenerator<G> {
673    fn on_event(&mut self, timestamp: i64) -> Option<Watermark> {
674        // Track max event timestamp
675        if timestamp > self.metrics.max_event_timestamp {
676            self.metrics.max_event_timestamp = timestamp;
677        }
678
679        let wm = self.inner.on_event(timestamp);
680        if let Some(ref w) = wm {
681            self.metrics.current_watermark = w.timestamp();
682            self.metrics.watermarks_emitted += 1;
683        }
684        wm
685    }
686
687    fn on_periodic(&mut self) -> Option<Watermark> {
688        let wm = self.inner.on_periodic();
689        if let Some(ref w) = wm {
690            self.metrics.current_watermark = w.timestamp();
691            self.metrics.watermarks_emitted += 1;
692        }
693        wm
694    }
695
696    fn current_watermark(&self) -> i64 {
697        self.inner.current_watermark()
698    }
699}
700
#[cfg(test)]
mod tests {
    use super::*;

    /// Bounded generator with the 100 ms lateness used throughout these tests.
    fn bounded_100() -> BoundedOutOfOrdernessGenerator {
        BoundedOutOfOrdernessGenerator::new(100)
    }

    /// Shorthand for an expected emission.
    fn some_wm(ts: i64) -> Option<Watermark> {
        Some(Watermark::new(ts))
    }

    // ---- BoundedOutOfOrdernessGenerator ----

    #[test]
    fn test_bounded_generator_first_event() {
        let mut generator = bounded_100();
        assert_eq!(generator.on_event(1000), some_wm(900));
        assert_eq!(generator.current_watermark(), 900);
    }

    #[test]
    fn test_bounded_generator_out_of_order() {
        let mut generator = bounded_100();
        generator.on_event(1000);

        // A late event must not emit, and the watermark stays at 1000 - 100.
        assert_eq!(generator.on_event(800), None);
        assert_eq!(generator.current_watermark(), 900);
    }

    #[test]
    fn test_bounded_generator_advancement() {
        let mut generator = bounded_100();
        generator.on_event(1000);
        assert_eq!(generator.on_event(1200), some_wm(1100));
    }

    #[test]
    fn test_bounded_generator_from_duration() {
        let five_seconds = Duration::from_secs(5);
        let generator = BoundedOutOfOrdernessGenerator::from_duration(five_seconds);
        assert_eq!(generator.max_out_of_orderness(), 5000);
    }

    #[test]
    fn test_bounded_generator_no_periodic() {
        assert_eq!(bounded_100().on_periodic(), None);
    }

    // ---- AscendingTimestampsGenerator ----

    #[test]
    fn test_ascending_generator_advances_on_each_event() {
        let mut generator = AscendingTimestampsGenerator::new();
        assert_eq!(generator.on_event(1000), some_wm(1000));
        assert_eq!(generator.on_event(2000), some_wm(2000));
    }

    #[test]
    fn test_ascending_generator_ignores_backwards() {
        let mut generator = AscendingTimestampsGenerator::new();
        generator.on_event(2000);

        // An earlier timestamp neither emits nor moves the watermark.
        assert_eq!(generator.on_event(1000), None);
        assert_eq!(generator.current_watermark(), 2000);
    }

    // ---- PeriodicGenerator ----

    #[test]
    fn test_periodic_generator_passes_through() {
        let mut generator = PeriodicGenerator::new(bounded_100(), Duration::from_millis(100));
        assert_eq!(generator.on_event(1000), some_wm(900));
    }

    #[test]
    fn test_periodic_generator_inner_access() {
        let generator = PeriodicGenerator::new(bounded_100(), Duration::from_millis(100));
        assert_eq!(generator.inner().max_out_of_orderness(), 100);
    }

    // ---- PunctuatedGenerator ----

    #[test]
    fn test_punctuated_generator_predicate() {
        let mut generator = PunctuatedGenerator::new(|ts| {
            if ts % 1000 == 0 {
                Some(Watermark::new(ts))
            } else {
                None
            }
        });

        let cases = vec![
            (500, None),
            (999, None),
            (1000, some_wm(1000)),
            (1500, None),
            (2000, some_wm(2000)),
        ];
        for (timestamp, expected) in cases {
            assert_eq!(generator.on_event(timestamp), expected);
        }
    }

    #[test]
    fn test_punctuated_generator_no_regression() {
        let mut generator = PunctuatedGenerator::new(|ts| Some(Watermark::new(ts)));
        generator.on_event(2000);

        // A lower watermark from the predicate is dropped.
        assert_eq!(generator.on_event(1000), None);
        assert_eq!(generator.current_watermark(), 2000);
    }

    // ---- WatermarkTracker ----

    #[test]
    fn test_tracker_single_source() {
        let mut tracker = WatermarkTracker::new(1);
        assert_eq!(tracker.update_source(0, 1000), some_wm(1000));
        assert_eq!(tracker.current_watermark(), some_wm(1000));
    }

    #[test]
    fn test_tracker_multiple_sources() {
        let mut tracker = WatermarkTracker::new(3);

        // Nothing is emitted until every source has reported.
        tracker.update_source(0, 1000);
        tracker.update_source(1, 2000);
        let combined = tracker.update_source(2, 500);

        // The slowest source decides.
        assert_eq!(combined, some_wm(500));
    }

    #[test]
    fn test_tracker_min_watermark() {
        let mut tracker = WatermarkTracker::new(2);
        tracker.update_source(0, 5000);
        tracker.update_source(1, 3000);
        assert_eq!(tracker.current_watermark(), some_wm(3000));

        // Advancing the slower source advances the combined watermark.
        tracker.update_source(1, 4000);
        assert_eq!(tracker.current_watermark(), some_wm(4000));
    }

    #[test]
    fn test_tracker_idle_source() {
        let mut tracker = WatermarkTracker::new(2);
        tracker.update_source(0, 5000);
        tracker.update_source(1, 1000);

        // With the slow source idle, only source 0 counts.
        assert_eq!(tracker.mark_idle(1), some_wm(5000));
    }

    #[test]
    fn test_tracker_all_idle() {
        let mut tracker = WatermarkTracker::new(2);
        tracker.update_source(0, 5000);
        tracker.update_source(1, 3000);

        tracker.mark_idle(0);
        let combined = tracker.mark_idle(1);

        // With every source idle the maximum is used.
        assert_eq!(combined, some_wm(5000));
    }

    #[test]
    fn test_tracker_source_watermark() {
        let mut tracker = WatermarkTracker::new(2);
        tracker.update_source(0, 1000);
        tracker.update_source(1, 2000);

        assert_eq!(tracker.source_watermark(0), Some(1000));
        assert_eq!(tracker.source_watermark(1), Some(2000));
        // Out-of-bounds id.
        assert_eq!(tracker.source_watermark(5), None);
    }

    #[test]
    fn test_tracker_active_source_count() {
        let mut tracker = WatermarkTracker::new(3);
        assert_eq!(tracker.active_source_count(), 3);

        tracker.mark_idle(0);
        assert_eq!(tracker.active_source_count(), 2);

        tracker.mark_idle(2);
        assert_eq!(tracker.active_source_count(), 1);

        // An update reactivates the source.
        tracker.update_source(0, 1000);
        assert_eq!(tracker.active_source_count(), 2);
    }

    #[test]
    fn test_tracker_invalid_source() {
        let mut tracker = WatermarkTracker::new(2);

        // Source id 5 does not exist.
        assert_eq!(tracker.update_source(5, 1000), None);
        assert_eq!(tracker.mark_idle(5), None);
    }

    // ---- SourceProvidedGenerator ----

    #[test]
    fn test_source_provided_fallback() {
        let mut generator = SourceProvidedGenerator::new(100, false);
        // No source watermark yet, so the bounded fallback decides.
        assert_eq!(generator.on_event(1000), some_wm(900));
    }

    #[test]
    fn test_source_provided_explicit_watermark() {
        let mut generator = SourceProvidedGenerator::new(100, true);
        assert_eq!(generator.on_source_watermark(500), some_wm(500));
        assert_eq!(generator.current_watermark(), 500);
    }

    // ---- MeteredGenerator / WatermarkMetrics ----

    #[test]
    fn test_metered_generator_tracks_metrics() {
        let mut generator = MeteredGenerator::new(bounded_100());
        generator.on_event(1000);
        generator.on_event(2000);
        generator.on_event(1500); // Out of order

        let snapshot = generator.metrics();
        assert_eq!(snapshot.max_event_timestamp, 2000);
        // Only the events at 1000 and 2000 advanced the watermark.
        assert_eq!(snapshot.watermarks_emitted, 2);
    }

    #[test]
    fn test_metered_generator_lag() {
        let mut generator = MeteredGenerator::new(bounded_100());
        generator.on_event(1000);

        // max_event (1000) - watermark (900)
        assert_eq!(generator.metrics().lag(), 100);
    }

    #[test]
    fn test_metered_generator_late_events() {
        let mut generator = MeteredGenerator::new(bounded_100());
        for _ in 0..2 {
            generator.record_late_event();
        }
        assert_eq!(generator.metrics().late_events, 2);
    }

    #[test]
    fn test_watermark_metrics_default() {
        let fresh = WatermarkMetrics::new();
        assert_eq!(fresh.current_watermark, 0);
        assert_eq!(fresh.max_event_timestamp, 0);
        assert_eq!(fresh.watermarks_emitted, 0);
        assert_eq!(fresh.late_events, 0);
    }
}