Skip to main content

laminar_core/time/
keyed_watermark.rs

1//! # Keyed Watermark Tracking
2//!
3//! Per-key watermark tracking to achieve 99%+ data accuracy compared to 63-67%
4//! with traditional global watermarks. This addresses the fundamental problem of
5//! fast-moving keys causing late data drops for slower keys.
6//!
7//! ## Problem with Global Watermarks
8//!
9//! ```text
10//! Global Watermark Scenario (Traditional):
11//! ┌─────────────────────────────────────────────────────────────────┐
//! │              Global Watermark = 10:07 (from Key C)              │
13//! │                                                                 │
14//! │  Key A: ████████████░░░░░░ events at 10:03 → DROPPED (late!)   │
15//! │  Key B: ██████░░░░░░░░░░░░ events at 10:01 → DROPPED (late!)   │
16//! │  Key C: █████████████████ events at 10:08 → OK                 │
17//! │                                                                 │
18//! │  Result: Fast-moving Key C advances watermark, slow keys       │
19//! │          have their valid events dropped as "late"             │
20//! └─────────────────────────────────────────────────────────────────┘
21//! ```
22//!
23//! ## Keyed Watermarks Solution
24//!
25//! ```text
26//! Keyed Watermark Scenario (This Feature):
27//! ┌─────────────────────────────────────────────────────────────────┐
28//! │             Per-Key Watermarks                                  │
29//! │                                                                 │
30//! │  Key A: watermark = 10:02 → events at 10:03 → OK               │
31//! │  Key B: watermark = 10:00 → events at 10:01 → OK               │
32//! │  Key C: watermark = 10:07 → events at 10:08 → OK               │
33//! │                                                                 │
34//! │  Global (for ordering): min(A,B,C) = 10:00                     │
35//! │                                                                 │
36//! │  Result: Each key tracks its own progress independently        │
37//! │          99%+ accuracy vs 63-67% with global                   │
38//! └─────────────────────────────────────────────────────────────────┘
39//! ```
40//!
41//! ## Example
42//!
43//! ```rust
44//! use laminar_core::time::{KeyedWatermarkTracker, KeyedWatermarkConfig, Watermark};
45//! use std::time::Duration;
46//!
47//! let config = KeyedWatermarkConfig {
48//!     bounded_delay: Duration::from_secs(5),
49//!     idle_timeout: Duration::from_secs(60),
50//!     ..Default::default()
51//! };
52//!
53//! let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);
54//!
55//! // Fast tenant advances quickly
56//! tracker.update("tenant_a".to_string(), 10_000);
57//! tracker.update("tenant_a".to_string(), 15_000);
58//!
59//! // Slow tenant at earlier time
60//! tracker.update("tenant_b".to_string(), 5_000);
61//!
62//! // Per-key watermarks
63//! assert_eq!(tracker.watermark_for_key(&"tenant_a".to_string()), Some(10_000)); // 15000 - 5000
64//! assert_eq!(tracker.watermark_for_key(&"tenant_b".to_string()), Some(0));      // 5000 - 5000
65//!
66//! // Global watermark is minimum across active keys
67//! assert_eq!(tracker.global_watermark(), Some(Watermark::new(0))); // min of 10000, 0
68//!
69//! // Events for tenant_b at 3000 are NOT late (their key watermark allows it)
70//! assert!(!tracker.is_late(&"tenant_b".to_string(), 3000));
71//!
72//! // But events for tenant_a at 3000 ARE late (their key watermark is 10000)
73//! assert!(tracker.is_late(&"tenant_a".to_string(), 3000));
74//! ```
75
76use std::collections::HashMap;
77use std::hash::Hash;
78use std::time::{Duration, Instant};
79
80use super::Watermark;
81
/// Watermark bookkeeping for a single key.
///
/// Remembers the largest event time observed for the key and derives the
/// key's watermark from it by subtracting the bounded out-of-orderness delay.
#[derive(Debug, Clone)]
pub struct KeyWatermarkState {
    /// Largest event time observed so far (`i64::MIN` until the first event)
    pub max_event_time: i64,
    /// Watermark derived as `max_event_time - bounded_delay` (`i64::MIN` until the first event)
    pub watermark: i64,
    /// Wall-clock instant of the most recent update
    pub last_activity: Instant,
    /// `true` while the key is flagged as idle
    pub is_idle: bool,
}

impl KeyWatermarkState {
    /// Builds an empty state: no events seen yet, not idle, activity stamped "now".
    #[must_use]
    pub fn new() -> Self {
        let unset = i64::MIN;
        Self {
            max_event_time: unset,
            watermark: unset,
            last_activity: Instant::now(),
            is_idle: false,
        }
    }

    /// Records an event timestamp for this key.
    ///
    /// Every call refreshes `last_activity` and clears the idle flag, even when
    /// the event is out of order. The watermark candidate is
    /// `event_time - bounded_delay` (saturating) and is only adopted when the
    /// event raises `max_event_time` and the candidate exceeds the current watermark.
    ///
    /// Returns `true` only when the watermark moved forward.
    #[inline]
    pub fn update(&mut self, event_time: i64, bounded_delay: i64) -> bool {
        self.is_idle = false;
        self.last_activity = Instant::now();

        // Events at or below the current maximum cannot move the watermark.
        if event_time <= self.max_event_time {
            return false;
        }
        self.max_event_time = event_time;

        let candidate = event_time.saturating_sub(bounded_delay);
        let advanced = candidate > self.watermark;
        if advanced {
            self.watermark = candidate;
        }
        advanced
    }

    /// Reports whether `event_time` lies strictly before this key's watermark.
    ///
    /// An event exactly at the watermark is not late.
    #[inline]
    #[must_use]
    pub fn is_late(&self, event_time: i64) -> bool {
        self.watermark > event_time
    }
}

impl Default for KeyWatermarkState {
    /// Same as [`KeyWatermarkState::new`].
    fn default() -> Self {
        KeyWatermarkState::new()
    }
}
142
/// Settings that control a keyed watermark tracker.
#[derive(Debug, Clone)]
pub struct KeyedWatermarkConfig {
    /// Maximum tolerated out-of-orderness; subtracted from a key's maximum
    /// event time to obtain its watermark
    pub bounded_delay: Duration,
    /// How long a key may go without updates before it is treated as idle
    pub idle_timeout: Duration,
    /// Upper bound on the number of tracked keys (`None` means unbounded)
    pub max_keys: Option<usize>,
    /// What happens to a new key once `max_keys` has been reached
    pub eviction_policy: KeyEvictionPolicy,
}

impl Default for KeyedWatermarkConfig {
    /// 5 s bounded delay, 60 s idle timeout, no key limit and the default
    /// eviction policy.
    fn default() -> Self {
        Self {
            bounded_delay: Duration::from_secs(5),
            idle_timeout: Duration::from_secs(60),
            max_keys: None,
            eviction_policy: KeyEvictionPolicy::default(),
        }
    }
}

impl KeyedWatermarkConfig {
    /// Starts from the defaults and overrides only the bounded delay.
    #[must_use]
    pub fn with_bounded_delay(bounded_delay: Duration) -> Self {
        Self {
            bounded_delay,
            ..Self::default()
        }
    }

    /// Returns the configuration with `idle_timeout` replaced by `timeout`.
    #[must_use]
    pub fn with_idle_timeout(self, timeout: Duration) -> Self {
        Self {
            idle_timeout: timeout,
            ..self
        }
    }

    /// Returns the configuration with the key limit set to `max_keys`.
    #[must_use]
    pub fn with_max_keys(self, max_keys: usize) -> Self {
        Self {
            max_keys: Some(max_keys),
            ..self
        }
    }

    /// Returns the configuration with the eviction policy replaced by `policy`.
    #[must_use]
    pub fn with_eviction_policy(self, policy: KeyEvictionPolicy) -> Self {
        Self {
            eviction_policy: policy,
            ..self
        }
    }
}

/// Strategy applied when a new key arrives and `max_keys` is already reached.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum KeyEvictionPolicy {
    /// Drop the key whose `last_activity` is the oldest (the default)
    #[default]
    LeastRecentlyActive,
    /// Drop the key whose watermark is the lowest
    LowestWatermark,
    /// Keep all existing keys and refuse the new one with an error
    RejectNew,
}
210
211/// Errors that can occur in keyed watermark operations.
212#[derive(Debug, Clone, thiserror::Error)]
213pub enum KeyedWatermarkError {
214    /// Maximum number of keys reached with `RejectNew` policy
215    #[error("Maximum keys reached ({max_keys}), cannot add new key")]
216    MaxKeysReached {
217        /// Maximum keys configured
218        max_keys: usize,
219    },
220}
221
/// Counters describing the activity of a keyed watermark tracker.
#[derive(Debug, Clone, Default)]
pub struct KeyedWatermarkMetrics {
    /// Number of keys currently tracked
    pub total_keys: usize,
    /// Tracked keys that are not flagged idle
    pub active_keys: usize,
    /// Tracked keys that are flagged idle
    pub idle_keys: usize,
    /// Keys dropped because the `max_keys` limit was hit
    pub evicted_keys: u64,
    /// Number of times the global watermark moved forward
    pub global_advances: u64,
    /// Number of times any per-key watermark moved forward
    pub key_advances: u64,
}

impl KeyedWatermarkMetrics {
    /// Creates a metrics record with every counter at zero.
    #[must_use]
    pub fn new() -> Self {
        Self {
            total_keys: 0,
            active_keys: 0,
            idle_keys: 0,
            evicted_keys: 0,
            global_advances: 0,
            key_advances: 0,
        }
    }
}
246
247/// Tracks watermarks per logical key.
248///
249/// Provides fine-grained watermark tracking for multi-tenant workloads
250/// and scenarios with significant event-time skew between keys.
251///
252/// # Research Background
253///
254/// Based on research (March 2025), keyed watermarks achieve
255/// **99%+ accuracy** compared to **63-67%** with global watermarks.
256///
257/// # Example
258///
259/// ```rust
260/// use laminar_core::time::{KeyedWatermarkTracker, KeyedWatermarkConfig, Watermark};
261/// use std::time::Duration;
262///
263/// let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_secs(5));
264/// let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);
265///
266/// // Fast tenant advances quickly
267/// tracker.update("tenant_a".to_string(), 10_000);
268/// tracker.update("tenant_a".to_string(), 15_000);
269///
270/// // Slow tenant at earlier time
271/// tracker.update("tenant_b".to_string(), 5_000);
272///
273/// // Per-key watermarks differ
274/// assert_eq!(tracker.watermark_for_key(&"tenant_a".to_string()), Some(10_000));
275/// assert_eq!(tracker.watermark_for_key(&"tenant_b".to_string()), Some(0));
276///
277/// // Global watermark is minimum
278/// assert_eq!(tracker.global_watermark(), Some(Watermark::new(0)));
279/// ```
280#[derive(Debug)]
281pub struct KeyedWatermarkTracker<K: Hash + Eq + Clone> {
282    /// Per-key watermark state
283    key_states: HashMap<K, KeyWatermarkState>,
284
285    /// Global watermark (minimum across all active keys)
286    global_watermark: i64,
287
288    /// Configuration
289    config: KeyedWatermarkConfig,
290
291    /// Bounded delay in milliseconds (cached from config)
292    bounded_delay_ms: i64,
293
294    /// Metrics
295    metrics: KeyedWatermarkMetrics,
296}
297
298impl<K: Hash + Eq + Clone> KeyedWatermarkTracker<K> {
299    /// Creates a new keyed watermark tracker with the given configuration.
300    #[must_use]
301    #[allow(clippy::cast_possible_truncation)] // Duration.as_millis() fits i64 for practical values
302    pub fn new(config: KeyedWatermarkConfig) -> Self {
303        let bounded_delay_ms = config.bounded_delay.as_millis() as i64;
304        Self {
305            key_states: HashMap::new(),
306            global_watermark: i64::MIN,
307            config,
308            bounded_delay_ms,
309            metrics: KeyedWatermarkMetrics::new(),
310        }
311    }
312
313    /// Creates a tracker with default configuration.
314    #[must_use]
315    pub fn with_defaults() -> Self {
316        Self::new(KeyedWatermarkConfig::default())
317    }
318
319    /// Updates the watermark for a specific key.
320    ///
321    /// # Returns
322    ///
323    /// - `Ok(Some(Watermark))` if the global watermark changes
324    /// - `Ok(None)` if no global change
325    ///
326    /// # Errors
327    ///
328    /// Returns `KeyedWatermarkError::MaxKeysReached` if `max_keys` is reached
329    /// and the `RejectNew` eviction policy is configured.
330    ///
331    /// # Example
332    ///
333    /// ```rust
334    /// use laminar_core::time::{KeyedWatermarkTracker, KeyedWatermarkConfig};
335    /// use std::time::Duration;
336    ///
337    /// let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(100));
338    /// let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);
339    ///
340    /// // First update creates the key
341    /// let wm = tracker.update("key1".to_string(), 1000).unwrap();
342    /// assert!(wm.is_some()); // Global watermark advances
343    /// ```
344    #[allow(clippy::missing_panics_doc)] // Internal invariant: key is always present after insert
345    #[allow(clippy::needless_pass_by_value)] // Key must be owned for HashMap insertion
346    pub fn update(
347        &mut self,
348        key: K,
349        event_time: i64,
350    ) -> Result<Option<Watermark>, KeyedWatermarkError> {
351        // Check if we need to create a new key
352        if !self.key_states.contains_key(&key) {
353            // Check max_keys limit
354            if let Some(max_keys) = self.config.max_keys {
355                if self.key_states.len() >= max_keys {
356                    match self.config.eviction_policy {
357                        KeyEvictionPolicy::RejectNew => {
358                            return Err(KeyedWatermarkError::MaxKeysReached { max_keys });
359                        }
360                        KeyEvictionPolicy::LeastRecentlyActive => {
361                            self.evict_least_recently_active();
362                        }
363                        KeyEvictionPolicy::LowestWatermark => {
364                            self.evict_lowest_watermark();
365                        }
366                    }
367                }
368            }
369            self.key_states
370                .insert(key.clone(), KeyWatermarkState::new());
371            self.metrics.total_keys = self.key_states.len();
372        }
373
374        // Update the key's watermark
375        let state = self.key_states.get_mut(&key).expect("key just inserted");
376        let watermark_advanced = state.update(event_time, self.bounded_delay_ms);
377
378        if watermark_advanced {
379            self.metrics.key_advances += 1;
380        }
381
382        // Update metrics
383        self.update_metrics_counts();
384
385        // Try to advance global watermark
386        Ok(self.try_advance_global())
387    }
388
389    /// Batch update for multiple events (more efficient).
390    ///
391    /// Returns the new global watermark if it changed.
392    ///
393    /// # Errors
394    ///
395    /// Returns `KeyedWatermarkError::MaxKeysReached` if `max_keys` is reached
396    /// and the `RejectNew` eviction policy is configured.
397    #[allow(clippy::missing_panics_doc)] // Internal invariant: key is always present after insert
398    pub fn update_batch(
399        &mut self,
400        events: &[(K, i64)],
401    ) -> Result<Option<Watermark>, KeyedWatermarkError> {
402        for (key, event_time) in events {
403            // Check if we need to create a new key
404            if !self.key_states.contains_key(key) {
405                if let Some(max_keys) = self.config.max_keys {
406                    if self.key_states.len() >= max_keys {
407                        match self.config.eviction_policy {
408                            KeyEvictionPolicy::RejectNew => {
409                                return Err(KeyedWatermarkError::MaxKeysReached { max_keys });
410                            }
411                            KeyEvictionPolicy::LeastRecentlyActive => {
412                                self.evict_least_recently_active();
413                            }
414                            KeyEvictionPolicy::LowestWatermark => {
415                                self.evict_lowest_watermark();
416                            }
417                        }
418                    }
419                }
420                self.key_states
421                    .insert(key.clone(), KeyWatermarkState::new());
422            }
423
424            let state = self.key_states.get_mut(key).expect("key just inserted");
425            if state.update(*event_time, self.bounded_delay_ms) {
426                self.metrics.key_advances += 1;
427            }
428        }
429
430        self.metrics.total_keys = self.key_states.len();
431        self.update_metrics_counts();
432
433        Ok(self.try_advance_global())
434    }
435
436    /// Returns the watermark for a specific key.
437    #[must_use]
438    pub fn watermark_for_key(&self, key: &K) -> Option<i64> {
439        self.key_states.get(key).map(|s| s.watermark)
440    }
441
442    /// Returns the global watermark (minimum across active keys).
443    #[must_use]
444    pub fn global_watermark(&self) -> Option<Watermark> {
445        if self.global_watermark == i64::MIN {
446            None
447        } else {
448            Some(Watermark::new(self.global_watermark))
449        }
450    }
451
452    /// Checks if an event is late for its key.
453    ///
454    /// Uses the key's individual watermark, not the global watermark.
455    /// If the key doesn't exist, returns `false` (not late).
456    #[must_use]
457    pub fn is_late(&self, key: &K, event_time: i64) -> bool {
458        self.key_states
459            .get(key)
460            .is_some_and(|s| s.is_late(event_time))
461    }
462
463    /// Checks if an event is late using the global watermark.
464    ///
465    /// Use this for cross-key ordering guarantees.
466    #[must_use]
467    pub fn is_late_global(&self, event_time: i64) -> bool {
468        event_time < self.global_watermark
469    }
470
471    /// Marks a key as idle, excluding it from global watermark calculation.
472    ///
473    /// Returns `Some(Watermark)` if the global watermark advances.
474    pub fn mark_idle(&mut self, key: &K) -> Option<Watermark> {
475        if let Some(state) = self.key_states.get_mut(key) {
476            if !state.is_idle {
477                state.is_idle = true;
478                self.update_metrics_counts();
479                return self.try_advance_global();
480            }
481        }
482        None
483    }
484
485    /// Marks a key as active again.
486    pub fn mark_active(&mut self, key: &K) {
487        if let Some(state) = self.key_states.get_mut(key) {
488            if state.is_idle {
489                state.is_idle = false;
490                state.last_activity = Instant::now();
491                self.update_metrics_counts();
492            }
493        }
494    }
495
496    /// Checks for keys that have been idle longer than the timeout.
497    ///
498    /// Should be called periodically from Ring 1.
499    ///
500    /// Returns `Some(Watermark)` if marking idle keys causes the global watermark to advance.
501    pub fn check_idle_keys(&mut self) -> Option<Watermark> {
502        let idle_timeout = self.config.idle_timeout;
503        let mut any_marked = false;
504
505        for state in self.key_states.values_mut() {
506            if !state.is_idle && state.last_activity.elapsed() >= idle_timeout {
507                state.is_idle = true;
508                any_marked = true;
509            }
510        }
511
512        if any_marked {
513            self.update_metrics_counts();
514            self.try_advance_global()
515        } else {
516            None
517        }
518    }
519
520    /// Returns the number of active (non-idle) keys.
521    #[must_use]
522    pub fn active_key_count(&self) -> usize {
523        self.key_states.values().filter(|s| !s.is_idle).count()
524    }
525
526    /// Returns the total number of tracked keys.
527    #[must_use]
528    pub fn total_key_count(&self) -> usize {
529        self.key_states.len()
530    }
531
532    /// Returns metrics.
533    #[must_use]
534    pub fn metrics(&self) -> &KeyedWatermarkMetrics {
535        &self.metrics
536    }
537
538    /// Forces recalculation of global watermark.
539    ///
540    /// Useful after bulk operations or recovery.
541    pub fn recalculate_global(&mut self) -> Option<Watermark> {
542        let new_global = self.calculate_global();
543        if new_global != i64::MAX && new_global != i64::MIN {
544            self.global_watermark = new_global;
545            Some(Watermark::new(new_global))
546        } else {
547            None
548        }
549    }
550
551    /// Removes a key from tracking.
552    ///
553    /// Returns the key's watermark state if it existed.
554    pub fn remove_key(&mut self, key: &K) -> Option<KeyWatermarkState> {
555        let state = self.key_states.remove(key);
556        if state.is_some() {
557            self.metrics.total_keys = self.key_states.len();
558            self.update_metrics_counts();
559            // Recalculate global since we removed a key
560            let new_global = self.calculate_global();
561            if new_global > self.global_watermark && new_global != i64::MAX {
562                self.global_watermark = new_global;
563            }
564        }
565        state
566    }
567
568    /// Clears all tracked keys.
569    pub fn clear(&mut self) {
570        self.key_states.clear();
571        self.global_watermark = i64::MIN;
572        self.metrics = KeyedWatermarkMetrics::new();
573    }
574
575    /// Returns the state for a specific key.
576    #[must_use]
577    pub fn key_state(&self, key: &K) -> Option<&KeyWatermarkState> {
578        self.key_states.get(key)
579    }
580
581    /// Returns the configuration.
582    #[must_use]
583    pub fn config(&self) -> &KeyedWatermarkConfig {
584        &self.config
585    }
586
587    /// Checks if a key exists in the tracker.
588    #[must_use]
589    pub fn contains_key(&self, key: &K) -> bool {
590        self.key_states.contains_key(key)
591    }
592
593    /// Returns an iterator over all keys.
594    pub fn keys(&self) -> impl Iterator<Item = &K> {
595        self.key_states.keys()
596    }
597
598    /// Returns an iterator over all key-state pairs.
599    pub fn iter(&self) -> impl Iterator<Item = (&K, &KeyWatermarkState)> {
600        self.key_states.iter()
601    }
602
603    /// Returns the bounded delay in milliseconds.
604    #[must_use]
605    pub fn bounded_delay_ms(&self) -> i64 {
606        self.bounded_delay_ms
607    }
608
609    /// Tries to advance the global watermark.
610    ///
611    /// Note: Unlike partition-based tracking where global only advances,
612    /// keyed watermarks can regress when new keys are added (since a new
613    /// key starts with MIN watermark). This is intentional - the global
614    /// watermark always reflects the minimum across active keys.
615    fn try_advance_global(&mut self) -> Option<Watermark> {
616        let new_global = self.calculate_global();
617
618        if new_global != i64::MAX && new_global != i64::MIN && new_global != self.global_watermark {
619            let old_global = self.global_watermark;
620            self.global_watermark = new_global;
621            if new_global > old_global {
622                self.metrics.global_advances += 1;
623            }
624            Some(Watermark::new(new_global))
625        } else {
626            None
627        }
628    }
629
630    /// Calculates the global watermark.
631    fn calculate_global(&self) -> i64 {
632        let mut min_watermark = i64::MAX;
633        let mut has_active = false;
634
635        for state in self.key_states.values() {
636            if !state.is_idle {
637                has_active = true;
638                min_watermark = min_watermark.min(state.watermark);
639            }
640        }
641
642        // If all keys are idle, use the max watermark to allow progress
643        if !has_active {
644            min_watermark = self
645                .key_states
646                .values()
647                .map(|s| s.watermark)
648                .max()
649                .unwrap_or(i64::MIN);
650        }
651
652        min_watermark
653    }
654
655    /// Updates the metrics counts.
656    fn update_metrics_counts(&mut self) {
657        self.metrics.idle_keys = self.key_states.values().filter(|s| s.is_idle).count();
658        self.metrics.active_keys = self.metrics.total_keys - self.metrics.idle_keys;
659    }
660
661    /// Evicts the least recently active key.
662    fn evict_least_recently_active(&mut self) {
663        if let Some(key_to_evict) = self
664            .key_states
665            .iter()
666            .min_by_key(|(_, state)| state.last_activity)
667            .map(|(k, _)| k.clone())
668        {
669            self.key_states.remove(&key_to_evict);
670            self.metrics.evicted_keys += 1;
671        }
672    }
673
674    /// Evicts the key with the lowest watermark.
675    fn evict_lowest_watermark(&mut self) {
676        if let Some(key_to_evict) = self
677            .key_states
678            .iter()
679            .min_by_key(|(_, state)| state.watermark)
680            .map(|(k, _)| k.clone())
681        {
682            self.key_states.remove(&key_to_evict);
683            self.metrics.evicted_keys += 1;
684        }
685    }
686}
687
688/// Keyed watermark tracker with late event handling.
689///
690/// Wraps `KeyedWatermarkTracker` and provides utilities for handling late events,
691/// including counting and optional side-output.
692#[derive(Debug)]
693pub struct KeyedWatermarkTrackerWithLateHandling<K: Hash + Eq + Clone> {
694    /// Inner tracker
695    tracker: KeyedWatermarkTracker<K>,
696    /// Count of late events per key
697    late_events_per_key: HashMap<K, u64>,
698    /// Total late events
699    total_late_events: u64,
700}
701
702impl<K: Hash + Eq + Clone> KeyedWatermarkTrackerWithLateHandling<K> {
703    /// Creates a new tracker with late event handling.
704    #[must_use]
705    pub fn new(config: KeyedWatermarkConfig) -> Self {
706        Self {
707            tracker: KeyedWatermarkTracker::new(config),
708            late_events_per_key: HashMap::new(),
709            total_late_events: 0,
710        }
711    }
712
713    /// Updates the watermark and checks for late events.
714    ///
715    /// Returns `(watermark_result, is_late)`.
716    ///
717    /// # Errors
718    ///
719    /// Returns `KeyedWatermarkError::MaxKeysReached` if the maximum number of keys
720    /// is reached and the eviction policy is `RejectNew`.
721    pub fn update_with_late_check(
722        &mut self,
723        key: K,
724        event_time: i64,
725    ) -> Result<(Option<Watermark>, bool), KeyedWatermarkError> {
726        let is_late = self.tracker.is_late(&key, event_time);
727
728        if is_late {
729            *self.late_events_per_key.entry(key.clone()).or_insert(0) += 1;
730            self.total_late_events += 1;
731        }
732
733        let wm = self.tracker.update(key, event_time)?;
734        Ok((wm, is_late))
735    }
736
737    /// Returns late event count for a key.
738    #[must_use]
739    pub fn late_events_for_key(&self, key: &K) -> u64 {
740        self.late_events_per_key.get(key).copied().unwrap_or(0)
741    }
742
743    /// Returns total late event count.
744    #[must_use]
745    pub fn total_late_events(&self) -> u64 {
746        self.total_late_events
747    }
748
749    /// Returns a reference to the inner tracker.
750    #[must_use]
751    pub fn inner(&self) -> &KeyedWatermarkTracker<K> {
752        &self.tracker
753    }
754
755    /// Returns a mutable reference to the inner tracker.
756    pub fn inner_mut(&mut self) -> &mut KeyedWatermarkTracker<K> {
757        &mut self.tracker
758    }
759}
760
761#[cfg(test)]
762mod tests {
763    use super::*;
764
765    #[test]
766    fn test_key_watermark_state_creation() {
767        let state = KeyWatermarkState::new();
768        assert_eq!(state.max_event_time, i64::MIN);
769        assert_eq!(state.watermark, i64::MIN);
770        assert!(!state.is_idle);
771    }
772
773    #[test]
774    fn test_key_watermark_state_update() {
775        let mut state = KeyWatermarkState::new();
776
777        // First update
778        let advanced = state.update(1000, 100);
779        assert!(advanced);
780        assert_eq!(state.max_event_time, 1000);
781        assert_eq!(state.watermark, 900); // 1000 - 100
782
783        // Out-of-order event
784        let advanced = state.update(800, 100);
785        assert!(!advanced);
786        assert_eq!(state.max_event_time, 1000); // Unchanged
787
788        // New max
789        let advanced = state.update(1500, 100);
790        assert!(advanced);
791        assert_eq!(state.watermark, 1400);
792    }
793
794    #[test]
795    fn test_key_watermark_state_is_late() {
796        let mut state = KeyWatermarkState::new();
797        state.update(1000, 100); // watermark = 900
798
799        assert!(state.is_late(800)); // Before watermark
800        assert!(state.is_late(899)); // Just before watermark
801        assert!(!state.is_late(900)); // At watermark
802        assert!(!state.is_late(1000)); // After watermark
803    }
804
805    #[test]
806    fn test_config_defaults() {
807        let config = KeyedWatermarkConfig::default();
808        assert_eq!(config.bounded_delay, Duration::from_secs(5));
809        assert_eq!(config.idle_timeout, Duration::from_secs(60));
810        assert!(config.max_keys.is_none());
811        assert_eq!(
812            config.eviction_policy,
813            KeyEvictionPolicy::LeastRecentlyActive
814        );
815    }
816
817    #[test]
818    fn test_config_builder() {
819        let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_secs(10))
820            .with_idle_timeout(Duration::from_secs(30))
821            .with_max_keys(1000)
822            .with_eviction_policy(KeyEvictionPolicy::LowestWatermark);
823
824        assert_eq!(config.bounded_delay, Duration::from_secs(10));
825        assert_eq!(config.idle_timeout, Duration::from_secs(30));
826        assert_eq!(config.max_keys, Some(1000));
827        assert_eq!(config.eviction_policy, KeyEvictionPolicy::LowestWatermark);
828    }
829
830    #[test]
831    fn test_keyed_tracker_single_key_updates_watermark() {
832        let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(100));
833        let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);
834
835        let wm = tracker.update("key1".to_string(), 1000).unwrap();
836        assert_eq!(wm, Some(Watermark::new(900)));
837        assert_eq!(tracker.watermark_for_key(&"key1".to_string()), Some(900));
838        assert_eq!(tracker.global_watermark(), Some(Watermark::new(900)));
839    }
840
841    #[test]
842    fn test_keyed_tracker_multiple_keys_independent_watermarks() {
843        let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(100));
844        let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);
845
846        tracker.update("fast".to_string(), 5000).unwrap();
847        tracker.update("slow".to_string(), 1000).unwrap();
848
849        // Each key has its own watermark
850        assert_eq!(tracker.watermark_for_key(&"fast".to_string()), Some(4900));
851        assert_eq!(tracker.watermark_for_key(&"slow".to_string()), Some(900));
852
853        // Global is minimum
854        assert_eq!(tracker.global_watermark(), Some(Watermark::new(900)));
855    }
856
857    #[test]
858    fn test_keyed_tracker_global_is_minimum_of_active_keys() {
859        let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(0));
860        let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);
861
862        tracker.update("a".to_string(), 5000).unwrap();
863        tracker.update("b".to_string(), 3000).unwrap();
864        tracker.update("c".to_string(), 7000).unwrap();
865
866        assert_eq!(tracker.global_watermark(), Some(Watermark::new(3000)));
867    }
868
869    #[test]
870    fn test_keyed_tracker_fast_key_does_not_affect_slow_key() {
871        let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(100));
872        let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);
873
874        // Slow key starts first
875        tracker.update("slow".to_string(), 1000).unwrap();
876
877        // Fast key advances rapidly
878        tracker.update("fast".to_string(), 5000).unwrap();
879        tracker.update("fast".to_string(), 10000).unwrap();
880
881        // Slow key's watermark is independent
882        assert_eq!(tracker.watermark_for_key(&"slow".to_string()), Some(900));
883
884        // Event at 950 is not late for slow key
885        assert!(!tracker.is_late(&"slow".to_string(), 950));
886
887        // But it would be very late for fast key
888        assert!(tracker.is_late(&"fast".to_string(), 950));
889    }
890
891    #[test]
892    fn test_keyed_tracker_is_late_uses_key_watermark_not_global() {
893        let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(100));
894        let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);
895
896        tracker.update("fast".to_string(), 10000).unwrap(); // watermark = 9900
897        tracker.update("slow".to_string(), 1000).unwrap(); // watermark = 900
898
899        // Global is 900, but we use per-key watermarks
900        assert_eq!(tracker.global_watermark(), Some(Watermark::new(900)));
901
902        // Event at 5000 is NOT late for slow key (5000 >= 900)
903        assert!(!tracker.is_late(&"slow".to_string(), 5000));
904
905        // Event at 5000 IS late for fast key (5000 < 9900)
906        assert!(tracker.is_late(&"fast".to_string(), 5000));
907    }
908
909    #[test]
910    fn test_keyed_tracker_idle_key_excluded_from_global() {
911        let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(0));
912        let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);
913
914        tracker.update("fast".to_string(), 5000).unwrap();
915        tracker.update("slow".to_string(), 1000).unwrap();
916
917        assert_eq!(tracker.global_watermark(), Some(Watermark::new(1000)));
918
919        // Mark slow key as idle
920        let wm = tracker.mark_idle(&"slow".to_string());
921        assert_eq!(wm, Some(Watermark::new(5000)));
922        assert_eq!(tracker.global_watermark(), Some(Watermark::new(5000)));
923    }
924
925    #[test]
926    fn test_keyed_tracker_all_idle_uses_max() {
927        let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(0));
928        let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);
929
930        tracker.update("a".to_string(), 5000).unwrap();
931        tracker.update("b".to_string(), 3000).unwrap();
932
933        tracker.mark_idle(&"a".to_string());
934        let wm = tracker.mark_idle(&"b".to_string());
935
936        // When all idle, use max to allow progress
937        assert_eq!(wm, Some(Watermark::new(5000)));
938    }
939
940    #[test]
941    fn test_keyed_tracker_key_eviction_lru() {
942        let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(0))
943            .with_max_keys(2)
944            .with_eviction_policy(KeyEvictionPolicy::LeastRecentlyActive);
945
946        let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);
947
948        tracker.update("a".to_string(), 1000).unwrap();
949        std::thread::sleep(Duration::from_millis(10));
950        tracker.update("b".to_string(), 2000).unwrap();
951
952        assert_eq!(tracker.total_key_count(), 2);
953
954        // Adding third key should evict "a" (least recently active)
955        tracker.update("c".to_string(), 3000).unwrap();
956
957        assert_eq!(tracker.total_key_count(), 2);
958        assert!(!tracker.contains_key(&"a".to_string())); // Evicted
959        assert!(tracker.contains_key(&"b".to_string()));
960        assert!(tracker.contains_key(&"c".to_string()));
961        assert_eq!(tracker.metrics().evicted_keys, 1);
962    }
963
964    #[test]
965    fn test_keyed_tracker_key_eviction_lowest_watermark() {
966        let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(0))
967            .with_max_keys(2)
968            .with_eviction_policy(KeyEvictionPolicy::LowestWatermark);
969
970        let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);
971
972        tracker.update("high".to_string(), 5000).unwrap();
973        tracker.update("low".to_string(), 1000).unwrap();
974
975        // Adding third key should evict "low" (lowest watermark)
976        tracker.update("mid".to_string(), 3000).unwrap();
977
978        assert!(!tracker.contains_key(&"low".to_string())); // Evicted
979        assert!(tracker.contains_key(&"high".to_string()));
980        assert!(tracker.contains_key(&"mid".to_string()));
981    }
982
983    #[test]
984    fn test_keyed_tracker_key_eviction_reject_new() {
985        let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(0))
986            .with_max_keys(2)
987            .with_eviction_policy(KeyEvictionPolicy::RejectNew);
988
989        let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);
990
991        tracker.update("a".to_string(), 1000).unwrap();
992        tracker.update("b".to_string(), 2000).unwrap();
993
994        // Third key should be rejected
995        let result = tracker.update("c".to_string(), 3000);
996        assert!(matches!(
997            result,
998            Err(KeyedWatermarkError::MaxKeysReached { max_keys: 2 })
999        ));
1000
1001        // Existing keys still work
1002        assert!(tracker.update("a".to_string(), 1500).is_ok());
1003    }
1004
1005    #[test]
1006    fn test_keyed_tracker_batch_update_efficient() {
1007        let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(100));
1008        let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);
1009
1010        let events = vec![
1011            ("a".to_string(), 1000),
1012            ("b".to_string(), 2000),
1013            ("a".to_string(), 1500),
1014            ("c".to_string(), 3000),
1015        ];
1016
1017        let wm = tracker.update_batch(&events).unwrap();
1018        assert!(wm.is_some());
1019
1020        assert_eq!(tracker.total_key_count(), 3);
1021        assert_eq!(tracker.watermark_for_key(&"a".to_string()), Some(1400)); // 1500 - 100
1022        assert_eq!(tracker.watermark_for_key(&"b".to_string()), Some(1900));
1023        assert_eq!(tracker.watermark_for_key(&"c".to_string()), Some(2900));
1024    }
1025
1026    #[test]
1027    fn test_keyed_tracker_remove_key_recalculates_global() {
1028        let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(0));
1029        let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);
1030
1031        tracker.update("fast".to_string(), 5000).unwrap();
1032        tracker.update("slow".to_string(), 1000).unwrap();
1033
1034        assert_eq!(tracker.global_watermark(), Some(Watermark::new(1000)));
1035
1036        // Remove slow key
1037        let state = tracker.remove_key(&"slow".to_string());
1038        assert!(state.is_some());
1039        assert_eq!(state.unwrap().watermark, 1000);
1040
1041        // Global should now be 5000
1042        assert_eq!(tracker.global_watermark(), Some(Watermark::new(5000)));
1043    }
1044
1045    #[test]
1046    fn test_keyed_tracker_check_idle_keys() {
1047        let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(0))
1048            .with_idle_timeout(Duration::from_millis(10));
1049
1050        let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);
1051
1052        tracker.update("fast".to_string(), 5000).unwrap();
1053        tracker.update("slow".to_string(), 1000).unwrap();
1054
1055        // Wait for timeout
1056        std::thread::sleep(Duration::from_millis(20));
1057
1058        // Update only fast key
1059        tracker.update("fast".to_string(), 6000).unwrap();
1060
1061        // Check for idle keys
1062        let wm = tracker.check_idle_keys();
1063
1064        // slow should be marked idle
1065        assert!(tracker.key_state(&"slow".to_string()).unwrap().is_idle);
1066
1067        // Global should advance
1068        assert!(wm.is_some() || tracker.global_watermark() == Some(Watermark::new(6000)));
1069    }
1070
1071    #[test]
1072    fn test_keyed_tracker_metrics() {
1073        let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(100));
1074        let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);
1075
1076        tracker.update("a".to_string(), 1000).unwrap();
1077        tracker.update("b".to_string(), 2000).unwrap();
1078        tracker.update("a".to_string(), 1500).unwrap(); // Advances a's watermark
1079
1080        let metrics = tracker.metrics();
1081        assert_eq!(metrics.total_keys, 2);
1082        assert_eq!(metrics.active_keys, 2);
1083        assert_eq!(metrics.idle_keys, 0);
1084        assert!(metrics.key_advances >= 3); // At least 3 watermark advances
1085        assert!(metrics.global_advances >= 1);
1086
1087        tracker.mark_idle(&"b".to_string());
1088
1089        let metrics = tracker.metrics();
1090        assert_eq!(metrics.active_keys, 1);
1091        assert_eq!(metrics.idle_keys, 1);
1092    }
1093
1094    #[test]
1095    fn test_keyed_tracker_clear() {
1096        let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(0));
1097        let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);
1098
1099        tracker.update("a".to_string(), 1000).unwrap();
1100        tracker.update("b".to_string(), 2000).unwrap();
1101
1102        tracker.clear();
1103
1104        assert_eq!(tracker.total_key_count(), 0);
1105        assert_eq!(tracker.global_watermark(), None);
1106    }
1107
1108    #[test]
1109    fn test_keyed_tracker_is_late_global() {
1110        let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(0));
1111        let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);
1112
1113        tracker.update("fast".to_string(), 5000).unwrap();
1114        tracker.update("slow".to_string(), 1000).unwrap();
1115
1116        // Global watermark is 1000
1117        assert!(!tracker.is_late_global(1000));
1118        assert!(tracker.is_late_global(999));
1119    }
1120
1121    #[test]
1122    fn test_keyed_tracker_iteration() {
1123        let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(0));
1124        let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);
1125
1126        tracker.update("a".to_string(), 1000).unwrap();
1127        tracker.update("b".to_string(), 2000).unwrap();
1128
1129        let keys: Vec<_> = tracker.keys().collect();
1130        assert_eq!(keys.len(), 2);
1131
1132        let pairs: Vec<_> = tracker.iter().collect();
1133        assert_eq!(pairs.len(), 2);
1134    }
1135
1136    #[test]
1137    fn test_late_handling_tracker() {
1138        let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(100));
1139        let mut tracker: KeyedWatermarkTrackerWithLateHandling<String> =
1140            KeyedWatermarkTrackerWithLateHandling::new(config);
1141
1142        // First event
1143        let (wm, is_late) = tracker
1144            .update_with_late_check("key1".to_string(), 1000)
1145            .unwrap();
1146        assert!(wm.is_some());
1147        assert!(!is_late);
1148
1149        // On-time event
1150        let (_, is_late) = tracker
1151            .update_with_late_check("key1".to_string(), 950)
1152            .unwrap();
1153        assert!(!is_late); // 950 >= 900 (watermark)
1154
1155        // Late event
1156        let (_, is_late) = tracker
1157            .update_with_late_check("key1".to_string(), 800)
1158            .unwrap();
1159        assert!(is_late); // 800 < 900
1160
1161        assert_eq!(tracker.late_events_for_key(&"key1".to_string()), 1);
1162        assert_eq!(tracker.total_late_events(), 1);
1163    }
1164}