1use std::collections::HashMap;
77use std::hash::Hash;
78use std::time::{Duration, Instant};
79
80use super::Watermark;
81
/// Watermark bookkeeping for a single key.
#[derive(Debug, Clone)]
pub struct KeyWatermarkState {
    /// Largest event time observed so far (`i64::MIN` until the first event).
    pub max_event_time: i64,
    /// Current watermark, derived from the maximum event time minus the
    /// bounded delay (`i64::MIN` until it first advances).
    pub watermark: i64,
    /// Instant of the most recent activity recorded for this key; set on
    /// construction and refreshed by every [`update`](Self::update).
    pub last_activity: Instant,
    /// Whether the key is currently flagged as idle.
    pub is_idle: bool,
}

impl KeyWatermarkState {
    /// Creates a fresh, non-idle state with no events observed yet.
    #[must_use]
    pub fn new() -> Self {
        let created_at = Instant::now();
        Self {
            is_idle: false,
            last_activity: created_at,
            watermark: i64::MIN,
            max_event_time: i64::MIN,
        }
    }

    /// Records an event at `event_time` and reports whether the watermark
    /// advanced.
    ///
    /// Every call refreshes `last_activity` and clears the idle flag, even
    /// when the event does not move the maximum event time. The watermark is
    /// only reconsidered when `event_time` is a new maximum; the candidate is
    /// `event_time - bounded_delay` (saturating) and is adopted only if it is
    /// strictly greater than the current watermark.
    #[inline]
    pub fn update(&mut self, event_time: i64, bounded_delay: i64) -> bool {
        self.is_idle = false;
        self.last_activity = Instant::now();

        // Not a new maximum: nothing about the watermark can change.
        if event_time <= self.max_event_time {
            return false;
        }
        self.max_event_time = event_time;

        let candidate = event_time.saturating_sub(bounded_delay);
        let advanced = candidate > self.watermark;
        if advanced {
            self.watermark = candidate;
        }
        advanced
    }

    /// Returns `true` when `event_time` lies strictly before the watermark.
    #[inline]
    #[must_use]
    pub fn is_late(&self, event_time: i64) -> bool {
        self.watermark > event_time
    }
}

impl Default for KeyWatermarkState {
    /// Equivalent to [`KeyWatermarkState::new`].
    fn default() -> Self {
        KeyWatermarkState::new()
    }
}
142
143#[derive(Debug, Clone)]
145pub struct KeyedWatermarkConfig {
146 pub bounded_delay: Duration,
148 pub idle_timeout: Duration,
150 pub max_keys: Option<usize>,
152 pub eviction_policy: KeyEvictionPolicy,
154}
155
156impl Default for KeyedWatermarkConfig {
157 fn default() -> Self {
158 Self {
159 bounded_delay: Duration::from_secs(5),
160 idle_timeout: Duration::from_secs(60),
161 max_keys: None,
162 eviction_policy: KeyEvictionPolicy::LeastRecentlyActive,
163 }
164 }
165}
166
167impl KeyedWatermarkConfig {
168 #[must_use]
170 pub fn with_bounded_delay(bounded_delay: Duration) -> Self {
171 Self {
172 bounded_delay,
173 ..Default::default()
174 }
175 }
176
177 #[must_use]
179 pub fn with_idle_timeout(mut self, timeout: Duration) -> Self {
180 self.idle_timeout = timeout;
181 self
182 }
183
184 #[must_use]
186 pub fn with_max_keys(mut self, max_keys: usize) -> Self {
187 self.max_keys = Some(max_keys);
188 self
189 }
190
191 #[must_use]
193 pub fn with_eviction_policy(mut self, policy: KeyEvictionPolicy) -> Self {
194 self.eviction_policy = policy;
195 self
196 }
197}
198
/// Strategy applied when a new key arrives while the tracker already holds
/// the configured maximum number of keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum KeyEvictionPolicy {
    /// Evict the key with the oldest `last_activity` (the default).
    #[default]
    LeastRecentlyActive,
    /// Evict the key with the smallest per-key watermark.
    LowestWatermark,
    /// Keep all existing keys and reject the new one with
    /// `KeyedWatermarkError::MaxKeysReached`.
    RejectNew,
}
210
/// Errors reported by the keyed watermark tracker.
#[derive(Debug, Clone, thiserror::Error)]
pub enum KeyedWatermarkError {
    /// A new key could not be admitted because the key limit is reached.
    #[error("Maximum keys reached ({max_keys}), cannot add new key")]
    MaxKeysReached {
        /// The configured key limit that was hit.
        max_keys: usize,
    },
}
221
/// Gauges and counters describing a keyed watermark tracker.
#[derive(Debug, Clone, Default)]
pub struct KeyedWatermarkMetrics {
    /// Number of keys currently tracked.
    pub total_keys: usize,
    /// Number of tracked keys that are not flagged idle.
    pub active_keys: usize,
    /// Number of tracked keys that are flagged idle.
    pub idle_keys: usize,
    /// Number of keys evicted to make room for new ones.
    pub evicted_keys: u64,
    /// Number of times the global watermark moved forward.
    pub global_advances: u64,
    /// Number of times a per-key watermark moved forward.
    pub key_advances: u64,
}

impl KeyedWatermarkMetrics {
    /// Creates a metrics record with every gauge and counter at zero.
    #[must_use]
    pub fn new() -> Self {
        Self {
            total_keys: 0,
            active_keys: 0,
            idle_keys: 0,
            evicted_keys: 0,
            global_advances: 0,
            key_advances: 0,
        }
    }
}
246
247#[derive(Debug)]
281pub struct KeyedWatermarkTracker<K: Hash + Eq + Clone> {
282 key_states: HashMap<K, KeyWatermarkState>,
284
285 global_watermark: i64,
287
288 config: KeyedWatermarkConfig,
290
291 bounded_delay_ms: i64,
293
294 metrics: KeyedWatermarkMetrics,
296}
297
298impl<K: Hash + Eq + Clone> KeyedWatermarkTracker<K> {
299 #[must_use]
301 #[allow(clippy::cast_possible_truncation)] pub fn new(config: KeyedWatermarkConfig) -> Self {
303 let bounded_delay_ms = config.bounded_delay.as_millis() as i64;
304 Self {
305 key_states: HashMap::new(),
306 global_watermark: i64::MIN,
307 config,
308 bounded_delay_ms,
309 metrics: KeyedWatermarkMetrics::new(),
310 }
311 }
312
313 #[must_use]
315 pub fn with_defaults() -> Self {
316 Self::new(KeyedWatermarkConfig::default())
317 }
318
319 #[allow(clippy::missing_panics_doc)] #[allow(clippy::needless_pass_by_value)] pub fn update(
347 &mut self,
348 key: K,
349 event_time: i64,
350 ) -> Result<Option<Watermark>, KeyedWatermarkError> {
351 if !self.key_states.contains_key(&key) {
353 if let Some(max_keys) = self.config.max_keys {
355 if self.key_states.len() >= max_keys {
356 match self.config.eviction_policy {
357 KeyEvictionPolicy::RejectNew => {
358 return Err(KeyedWatermarkError::MaxKeysReached { max_keys });
359 }
360 KeyEvictionPolicy::LeastRecentlyActive => {
361 self.evict_least_recently_active();
362 }
363 KeyEvictionPolicy::LowestWatermark => {
364 self.evict_lowest_watermark();
365 }
366 }
367 }
368 }
369 self.key_states
370 .insert(key.clone(), KeyWatermarkState::new());
371 self.metrics.total_keys = self.key_states.len();
372 }
373
374 let state = self.key_states.get_mut(&key).expect("key just inserted");
376 let watermark_advanced = state.update(event_time, self.bounded_delay_ms);
377
378 if watermark_advanced {
379 self.metrics.key_advances += 1;
380 }
381
382 self.update_metrics_counts();
384
385 Ok(self.try_advance_global())
387 }
388
389 #[allow(clippy::missing_panics_doc)] pub fn update_batch(
399 &mut self,
400 events: &[(K, i64)],
401 ) -> Result<Option<Watermark>, KeyedWatermarkError> {
402 for (key, event_time) in events {
403 if !self.key_states.contains_key(key) {
405 if let Some(max_keys) = self.config.max_keys {
406 if self.key_states.len() >= max_keys {
407 match self.config.eviction_policy {
408 KeyEvictionPolicy::RejectNew => {
409 return Err(KeyedWatermarkError::MaxKeysReached { max_keys });
410 }
411 KeyEvictionPolicy::LeastRecentlyActive => {
412 self.evict_least_recently_active();
413 }
414 KeyEvictionPolicy::LowestWatermark => {
415 self.evict_lowest_watermark();
416 }
417 }
418 }
419 }
420 self.key_states
421 .insert(key.clone(), KeyWatermarkState::new());
422 }
423
424 let state = self.key_states.get_mut(key).expect("key just inserted");
425 if state.update(*event_time, self.bounded_delay_ms) {
426 self.metrics.key_advances += 1;
427 }
428 }
429
430 self.metrics.total_keys = self.key_states.len();
431 self.update_metrics_counts();
432
433 Ok(self.try_advance_global())
434 }
435
436 #[must_use]
438 pub fn watermark_for_key(&self, key: &K) -> Option<i64> {
439 self.key_states.get(key).map(|s| s.watermark)
440 }
441
442 #[must_use]
444 pub fn global_watermark(&self) -> Option<Watermark> {
445 if self.global_watermark == i64::MIN {
446 None
447 } else {
448 Some(Watermark::new(self.global_watermark))
449 }
450 }
451
452 #[must_use]
457 pub fn is_late(&self, key: &K, event_time: i64) -> bool {
458 self.key_states
459 .get(key)
460 .is_some_and(|s| s.is_late(event_time))
461 }
462
463 #[must_use]
467 pub fn is_late_global(&self, event_time: i64) -> bool {
468 event_time < self.global_watermark
469 }
470
471 pub fn mark_idle(&mut self, key: &K) -> Option<Watermark> {
475 if let Some(state) = self.key_states.get_mut(key) {
476 if !state.is_idle {
477 state.is_idle = true;
478 self.update_metrics_counts();
479 return self.try_advance_global();
480 }
481 }
482 None
483 }
484
485 pub fn mark_active(&mut self, key: &K) {
487 if let Some(state) = self.key_states.get_mut(key) {
488 if state.is_idle {
489 state.is_idle = false;
490 state.last_activity = Instant::now();
491 self.update_metrics_counts();
492 }
493 }
494 }
495
496 pub fn check_idle_keys(&mut self) -> Option<Watermark> {
502 let idle_timeout = self.config.idle_timeout;
503 let mut any_marked = false;
504
505 for state in self.key_states.values_mut() {
506 if !state.is_idle && state.last_activity.elapsed() >= idle_timeout {
507 state.is_idle = true;
508 any_marked = true;
509 }
510 }
511
512 if any_marked {
513 self.update_metrics_counts();
514 self.try_advance_global()
515 } else {
516 None
517 }
518 }
519
520 #[must_use]
522 pub fn active_key_count(&self) -> usize {
523 self.key_states.values().filter(|s| !s.is_idle).count()
524 }
525
526 #[must_use]
528 pub fn total_key_count(&self) -> usize {
529 self.key_states.len()
530 }
531
532 #[must_use]
534 pub fn metrics(&self) -> &KeyedWatermarkMetrics {
535 &self.metrics
536 }
537
538 pub fn recalculate_global(&mut self) -> Option<Watermark> {
542 let new_global = self.calculate_global();
543 if new_global != i64::MAX && new_global != i64::MIN {
544 self.global_watermark = new_global;
545 Some(Watermark::new(new_global))
546 } else {
547 None
548 }
549 }
550
551 pub fn remove_key(&mut self, key: &K) -> Option<KeyWatermarkState> {
555 let state = self.key_states.remove(key);
556 if state.is_some() {
557 self.metrics.total_keys = self.key_states.len();
558 self.update_metrics_counts();
559 let new_global = self.calculate_global();
561 if new_global > self.global_watermark && new_global != i64::MAX {
562 self.global_watermark = new_global;
563 }
564 }
565 state
566 }
567
568 pub fn clear(&mut self) {
570 self.key_states.clear();
571 self.global_watermark = i64::MIN;
572 self.metrics = KeyedWatermarkMetrics::new();
573 }
574
575 #[must_use]
577 pub fn key_state(&self, key: &K) -> Option<&KeyWatermarkState> {
578 self.key_states.get(key)
579 }
580
581 #[must_use]
583 pub fn config(&self) -> &KeyedWatermarkConfig {
584 &self.config
585 }
586
587 #[must_use]
589 pub fn contains_key(&self, key: &K) -> bool {
590 self.key_states.contains_key(key)
591 }
592
593 pub fn keys(&self) -> impl Iterator<Item = &K> {
595 self.key_states.keys()
596 }
597
598 pub fn iter(&self) -> impl Iterator<Item = (&K, &KeyWatermarkState)> {
600 self.key_states.iter()
601 }
602
603 #[must_use]
605 pub fn bounded_delay_ms(&self) -> i64 {
606 self.bounded_delay_ms
607 }
608
609 fn try_advance_global(&mut self) -> Option<Watermark> {
616 let new_global = self.calculate_global();
617
618 if new_global != i64::MAX && new_global != i64::MIN && new_global != self.global_watermark {
619 let old_global = self.global_watermark;
620 self.global_watermark = new_global;
621 if new_global > old_global {
622 self.metrics.global_advances += 1;
623 }
624 Some(Watermark::new(new_global))
625 } else {
626 None
627 }
628 }
629
630 fn calculate_global(&self) -> i64 {
632 let mut min_watermark = i64::MAX;
633 let mut has_active = false;
634
635 for state in self.key_states.values() {
636 if !state.is_idle {
637 has_active = true;
638 min_watermark = min_watermark.min(state.watermark);
639 }
640 }
641
642 if !has_active {
644 min_watermark = self
645 .key_states
646 .values()
647 .map(|s| s.watermark)
648 .max()
649 .unwrap_or(i64::MIN);
650 }
651
652 min_watermark
653 }
654
655 fn update_metrics_counts(&mut self) {
657 self.metrics.idle_keys = self.key_states.values().filter(|s| s.is_idle).count();
658 self.metrics.active_keys = self.metrics.total_keys - self.metrics.idle_keys;
659 }
660
661 fn evict_least_recently_active(&mut self) {
663 if let Some(key_to_evict) = self
664 .key_states
665 .iter()
666 .min_by_key(|(_, state)| state.last_activity)
667 .map(|(k, _)| k.clone())
668 {
669 self.key_states.remove(&key_to_evict);
670 self.metrics.evicted_keys += 1;
671 }
672 }
673
674 fn evict_lowest_watermark(&mut self) {
676 if let Some(key_to_evict) = self
677 .key_states
678 .iter()
679 .min_by_key(|(_, state)| state.watermark)
680 .map(|(k, _)| k.clone())
681 {
682 self.key_states.remove(&key_to_evict);
683 self.metrics.evicted_keys += 1;
684 }
685 }
686}
687
688#[derive(Debug)]
693pub struct KeyedWatermarkTrackerWithLateHandling<K: Hash + Eq + Clone> {
694 tracker: KeyedWatermarkTracker<K>,
696 late_events_per_key: HashMap<K, u64>,
698 total_late_events: u64,
700}
701
702impl<K: Hash + Eq + Clone> KeyedWatermarkTrackerWithLateHandling<K> {
703 #[must_use]
705 pub fn new(config: KeyedWatermarkConfig) -> Self {
706 Self {
707 tracker: KeyedWatermarkTracker::new(config),
708 late_events_per_key: HashMap::new(),
709 total_late_events: 0,
710 }
711 }
712
713 pub fn update_with_late_check(
722 &mut self,
723 key: K,
724 event_time: i64,
725 ) -> Result<(Option<Watermark>, bool), KeyedWatermarkError> {
726 let is_late = self.tracker.is_late(&key, event_time);
727
728 if is_late {
729 *self.late_events_per_key.entry(key.clone()).or_insert(0) += 1;
730 self.total_late_events += 1;
731 }
732
733 let wm = self.tracker.update(key, event_time)?;
734 Ok((wm, is_late))
735 }
736
737 #[must_use]
739 pub fn late_events_for_key(&self, key: &K) -> u64 {
740 self.late_events_per_key.get(key).copied().unwrap_or(0)
741 }
742
743 #[must_use]
745 pub fn total_late_events(&self) -> u64 {
746 self.total_late_events
747 }
748
749 #[must_use]
751 pub fn inner(&self) -> &KeyedWatermarkTracker<K> {
752 &self.tracker
753 }
754
755 pub fn inner_mut(&mut self) -> &mut KeyedWatermarkTracker<K> {
757 &mut self.tracker
758 }
759}
760
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an owned `String` key.
    fn k(name: &str) -> String {
        name.to_string()
    }

    /// Builds the expected `Option<Watermark>` for a timestamp.
    fn wm(timestamp: i64) -> Option<Watermark> {
        Some(Watermark::new(timestamp))
    }

    /// Builds a config whose bounded delay is `delay_ms` milliseconds.
    fn config_with_delay(delay_ms: u64) -> KeyedWatermarkConfig {
        KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(delay_ms))
    }

    /// Builds a `String`-keyed tracker with the given bounded delay.
    fn tracker_with_delay(delay_ms: u64) -> KeyedWatermarkTracker<String> {
        KeyedWatermarkTracker::new(config_with_delay(delay_ms))
    }

    #[test]
    fn test_key_watermark_state_creation() {
        let fresh = KeyWatermarkState::new();
        assert_eq!(fresh.max_event_time, i64::MIN);
        assert_eq!(fresh.watermark, i64::MIN);
        assert!(!fresh.is_idle);
    }

    #[test]
    fn test_key_watermark_state_update() {
        let mut state = KeyWatermarkState::new();

        // First event: watermark = 1000 - 100.
        assert!(state.update(1000, 100));
        assert_eq!(state.max_event_time, 1000);
        assert_eq!(state.watermark, 900);

        // Out-of-order event leaves everything in place.
        assert!(!state.update(800, 100));
        assert_eq!(state.max_event_time, 1000);

        // A new maximum advances the watermark.
        assert!(state.update(1500, 100));
        assert_eq!(state.watermark, 1400);
    }

    #[test]
    fn test_key_watermark_state_is_late() {
        let mut state = KeyWatermarkState::new();
        state.update(1000, 100);

        // Watermark is 900: strictly below is late, at or above is not.
        assert!(state.is_late(800));
        assert!(state.is_late(899));
        assert!(!state.is_late(900));
        assert!(!state.is_late(1000));
    }

    #[test]
    fn test_config_defaults() {
        let defaults = KeyedWatermarkConfig::default();
        assert_eq!(defaults.bounded_delay, Duration::from_secs(5));
        assert_eq!(defaults.idle_timeout, Duration::from_secs(60));
        assert!(defaults.max_keys.is_none());
        assert_eq!(
            defaults.eviction_policy,
            KeyEvictionPolicy::LeastRecentlyActive
        );
    }

    #[test]
    fn test_config_builder() {
        let built = KeyedWatermarkConfig::with_bounded_delay(Duration::from_secs(10))
            .with_idle_timeout(Duration::from_secs(30))
            .with_max_keys(1000)
            .with_eviction_policy(KeyEvictionPolicy::LowestWatermark);

        assert_eq!(built.bounded_delay, Duration::from_secs(10));
        assert_eq!(built.idle_timeout, Duration::from_secs(30));
        assert_eq!(built.max_keys, Some(1000));
        assert_eq!(built.eviction_policy, KeyEvictionPolicy::LowestWatermark);
    }

    #[test]
    fn test_keyed_tracker_single_key_updates_watermark() {
        let mut tracker = tracker_with_delay(100);

        let emitted = tracker.update(k("key1"), 1000).unwrap();
        assert_eq!(emitted, wm(900));
        assert_eq!(tracker.watermark_for_key(&k("key1")), Some(900));
        assert_eq!(tracker.global_watermark(), wm(900));
    }

    #[test]
    fn test_keyed_tracker_multiple_keys_independent_watermarks() {
        let mut tracker = tracker_with_delay(100);

        tracker.update(k("fast"), 5000).unwrap();
        tracker.update(k("slow"), 1000).unwrap();

        // Each key keeps its own watermark.
        assert_eq!(tracker.watermark_for_key(&k("fast")), Some(4900));
        assert_eq!(tracker.watermark_for_key(&k("slow")), Some(900));

        // The global watermark follows the slowest key.
        assert_eq!(tracker.global_watermark(), wm(900));
    }

    #[test]
    fn test_keyed_tracker_global_is_minimum_of_active_keys() {
        let mut tracker = tracker_with_delay(0);

        for (name, event_time) in [("a", 5000), ("b", 3000), ("c", 7000)] {
            tracker.update(k(name), event_time).unwrap();
        }

        assert_eq!(tracker.global_watermark(), wm(3000));
    }

    #[test]
    fn test_keyed_tracker_fast_key_does_not_affect_slow_key() {
        let mut tracker = tracker_with_delay(100);

        tracker.update(k("slow"), 1000).unwrap();

        tracker.update(k("fast"), 5000).unwrap();
        tracker.update(k("fast"), 10000).unwrap();

        // The slow key's watermark is untouched by the fast key's progress.
        assert_eq!(tracker.watermark_for_key(&k("slow")), Some(900));

        // 950 is on time for the slow key but late for the fast key.
        assert!(!tracker.is_late(&k("slow"), 950));
        assert!(tracker.is_late(&k("fast"), 950));
    }

    #[test]
    fn test_keyed_tracker_is_late_uses_key_watermark_not_global() {
        let mut tracker = tracker_with_delay(100);

        tracker.update(k("fast"), 10000).unwrap();
        tracker.update(k("slow"), 1000).unwrap();
        assert_eq!(tracker.global_watermark(), wm(900));

        // Lateness is judged per key, not against the global watermark.
        assert!(!tracker.is_late(&k("slow"), 5000));
        assert!(tracker.is_late(&k("fast"), 5000));
    }

    #[test]
    fn test_keyed_tracker_idle_key_excluded_from_global() {
        let mut tracker = tracker_with_delay(0);

        tracker.update(k("fast"), 5000).unwrap();
        tracker.update(k("slow"), 1000).unwrap();

        assert_eq!(tracker.global_watermark(), wm(1000));

        // Once the slow key is idle, only the fast key counts.
        let emitted = tracker.mark_idle(&k("slow"));
        assert_eq!(emitted, wm(5000));
        assert_eq!(tracker.global_watermark(), wm(5000));
    }

    #[test]
    fn test_keyed_tracker_all_idle_uses_max() {
        let mut tracker = tracker_with_delay(0);

        tracker.update(k("a"), 5000).unwrap();
        tracker.update(k("b"), 3000).unwrap();

        tracker.mark_idle(&k("a"));
        let emitted = tracker.mark_idle(&k("b"));

        // With every key idle the global watermark is the maximum.
        assert_eq!(emitted, wm(5000));
    }

    #[test]
    fn test_keyed_tracker_key_eviction_lru() {
        let config = config_with_delay(0)
            .with_max_keys(2)
            .with_eviction_policy(KeyEvictionPolicy::LeastRecentlyActive);
        let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);

        tracker.update(k("a"), 1000).unwrap();
        std::thread::sleep(Duration::from_millis(10));
        tracker.update(k("b"), 2000).unwrap();

        assert_eq!(tracker.total_key_count(), 2);

        // A third key pushes out the least recently active one ("a").
        tracker.update(k("c"), 3000).unwrap();

        assert_eq!(tracker.total_key_count(), 2);
        assert!(!tracker.contains_key(&k("a")));
        assert!(tracker.contains_key(&k("b")));
        assert!(tracker.contains_key(&k("c")));
        assert_eq!(tracker.metrics().evicted_keys, 1);
    }

    #[test]
    fn test_keyed_tracker_key_eviction_lowest_watermark() {
        let config = config_with_delay(0)
            .with_max_keys(2)
            .with_eviction_policy(KeyEvictionPolicy::LowestWatermark);
        let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);

        tracker.update(k("high"), 5000).unwrap();
        tracker.update(k("low"), 1000).unwrap();

        // A third key pushes out the key with the lowest watermark ("low").
        tracker.update(k("mid"), 3000).unwrap();

        assert!(!tracker.contains_key(&k("low")));
        assert!(tracker.contains_key(&k("high")));
        assert!(tracker.contains_key(&k("mid")));
    }

    #[test]
    fn test_keyed_tracker_key_eviction_reject_new() {
        let config = config_with_delay(0)
            .with_max_keys(2)
            .with_eviction_policy(KeyEvictionPolicy::RejectNew);
        let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);

        tracker.update(k("a"), 1000).unwrap();
        tracker.update(k("b"), 2000).unwrap();

        // A third key is refused.
        let rejected = tracker.update(k("c"), 3000);
        assert!(matches!(
            rejected,
            Err(KeyedWatermarkError::MaxKeysReached { max_keys: 2 })
        ));

        // Existing keys can still be updated.
        assert!(tracker.update(k("a"), 1500).is_ok());
    }

    #[test]
    fn test_keyed_tracker_batch_update_efficient() {
        let mut tracker = tracker_with_delay(100);

        let events = vec![
            (k("a"), 1000),
            (k("b"), 2000),
            (k("a"), 1500),
            (k("c"), 3000),
        ];

        let emitted = tracker.update_batch(&events).unwrap();
        assert!(emitted.is_some());

        assert_eq!(tracker.total_key_count(), 3);
        // "a" reflects its later event: 1500 - 100.
        assert_eq!(tracker.watermark_for_key(&k("a")), Some(1400));
        assert_eq!(tracker.watermark_for_key(&k("b")), Some(1900));
        assert_eq!(tracker.watermark_for_key(&k("c")), Some(2900));
    }

    #[test]
    fn test_keyed_tracker_remove_key_recalculates_global() {
        let mut tracker = tracker_with_delay(0);

        tracker.update(k("fast"), 5000).unwrap();
        tracker.update(k("slow"), 1000).unwrap();

        assert_eq!(tracker.global_watermark(), wm(1000));

        let removed = tracker.remove_key(&k("slow"));
        assert!(removed.is_some());
        assert_eq!(removed.unwrap().watermark, 1000);

        // With the slow key gone the global watermark moves up.
        assert_eq!(tracker.global_watermark(), wm(5000));
    }

    #[test]
    fn test_keyed_tracker_check_idle_keys() {
        let config = config_with_delay(0).with_idle_timeout(Duration::from_millis(10));
        let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);

        tracker.update(k("fast"), 5000).unwrap();
        tracker.update(k("slow"), 1000).unwrap();

        // Let both keys exceed the idle timeout...
        std::thread::sleep(Duration::from_millis(20));

        // ...then refresh only the fast key.
        tracker.update(k("fast"), 6000).unwrap();

        let emitted = tracker.check_idle_keys();

        assert!(tracker.key_state(&k("slow")).unwrap().is_idle);

        assert!(emitted.is_some() || tracker.global_watermark() == wm(6000));
    }

    #[test]
    fn test_keyed_tracker_metrics() {
        let mut tracker = tracker_with_delay(100);

        tracker.update(k("a"), 1000).unwrap();
        tracker.update(k("b"), 2000).unwrap();
        tracker.update(k("a"), 1500).unwrap();

        let before = tracker.metrics();
        assert_eq!(before.total_keys, 2);
        assert_eq!(before.active_keys, 2);
        assert_eq!(before.idle_keys, 0);
        assert!(before.key_advances >= 3);
        assert!(before.global_advances >= 1);

        tracker.mark_idle(&k("b"));

        let after = tracker.metrics();
        assert_eq!(after.active_keys, 1);
        assert_eq!(after.idle_keys, 1);
    }

    #[test]
    fn test_keyed_tracker_clear() {
        let mut tracker = tracker_with_delay(0);

        tracker.update(k("a"), 1000).unwrap();
        tracker.update(k("b"), 2000).unwrap();

        tracker.clear();

        assert_eq!(tracker.total_key_count(), 0);
        assert_eq!(tracker.global_watermark(), None);
    }

    #[test]
    fn test_keyed_tracker_is_late_global() {
        let mut tracker = tracker_with_delay(0);

        tracker.update(k("fast"), 5000).unwrap();
        tracker.update(k("slow"), 1000).unwrap();

        // Global watermark is 1000: equal is on time, below is late.
        assert!(!tracker.is_late_global(1000));
        assert!(tracker.is_late_global(999));
    }

    #[test]
    fn test_keyed_tracker_iteration() {
        let mut tracker = tracker_with_delay(0);

        tracker.update(k("a"), 1000).unwrap();
        tracker.update(k("b"), 2000).unwrap();

        let keys: Vec<_> = tracker.keys().collect();
        assert_eq!(keys.len(), 2);

        let pairs: Vec<_> = tracker.iter().collect();
        assert_eq!(pairs.len(), 2);
    }

    #[test]
    fn test_late_handling_tracker() {
        let mut tracker: KeyedWatermarkTrackerWithLateHandling<String> =
            KeyedWatermarkTrackerWithLateHandling::new(config_with_delay(100));

        // First event establishes a watermark of 900.
        let (emitted, is_late) = tracker.update_with_late_check(k("key1"), 1000).unwrap();
        assert!(emitted.is_some());
        assert!(!is_late);

        // 950 is out of order but still at or above the watermark.
        let (_, is_late) = tracker.update_with_late_check(k("key1"), 950).unwrap();
        assert!(!is_late);

        // 800 is below the watermark and therefore late.
        let (_, is_late) = tracker.update_with_late_check(k("key1"), 800).unwrap();
        assert!(is_late);

        assert_eq!(tracker.late_events_for_key(&k("key1")), 1);
        assert_eq!(tracker.total_late_events(), 1);
    }
}