1use std::time::{Duration, Instant};
44
45use super::Watermark;
46
/// Source of event-time watermarks for a stream of event timestamps.
///
/// Implementors return `Some(watermark)` when they decide a new watermark
/// should be emitted and `None` otherwise. The `Send` supertrait lets a
/// generator be moved to another thread.
pub trait WatermarkGenerator: Send {
    /// Observes the timestamp of one event and returns a watermark to emit,
    /// if this event caused one.
    ///
    /// NOTE(review): timestamps look like milliseconds elsewhere in this module
    /// (durations are converted with `as_millis`) — confirm the unit contract.
    fn on_event(&mut self, timestamp: i64) -> Option<Watermark>;

    /// Hook for time-driven emission; returns a watermark to emit, if any.
    ///
    /// Presumably invoked on a timer by the operator driving the generator —
    /// confirm against callers.
    fn on_periodic(&mut self) -> Option<Watermark>;

    /// Returns the generator's current watermark value without emitting it.
    fn current_watermark(&self) -> i64;
}
67
68pub struct BoundedOutOfOrdernessGenerator {
90 max_out_of_orderness: i64,
91 current_max_timestamp: i64,
92 current_watermark: i64,
93}
94
95impl BoundedOutOfOrdernessGenerator {
96 #[must_use]
102 pub fn new(max_out_of_orderness: i64) -> Self {
103 Self {
104 max_out_of_orderness,
105 current_max_timestamp: i64::MIN,
106 current_watermark: i64::MIN,
107 }
108 }
109
110 #[must_use]
112 #[allow(clippy::cast_possible_truncation)] pub fn from_duration(max_out_of_orderness: Duration) -> Self {
114 Self::new(max_out_of_orderness.as_millis() as i64)
115 }
116
117 #[must_use]
119 pub fn max_out_of_orderness(&self) -> i64 {
120 self.max_out_of_orderness
121 }
122}
123
124impl WatermarkGenerator for BoundedOutOfOrdernessGenerator {
125 #[inline]
126 fn on_event(&mut self, timestamp: i64) -> Option<Watermark> {
127 if timestamp > self.current_max_timestamp {
128 self.current_max_timestamp = timestamp;
129 let new_watermark = timestamp.saturating_sub(self.max_out_of_orderness);
130 if new_watermark > self.current_watermark {
131 self.current_watermark = new_watermark;
132 return Some(Watermark::new(new_watermark));
133 }
134 }
135 None
136 }
137
138 #[inline]
139 fn on_periodic(&mut self) -> Option<Watermark> {
140 None
142 }
143
144 #[inline]
145 fn current_watermark(&self) -> i64 {
146 self.current_watermark
147 }
148}
149
150#[derive(Debug, Default)]
165pub struct AscendingTimestampsGenerator {
166 current_watermark: i64,
167}
168
169impl AscendingTimestampsGenerator {
170 #[must_use]
172 pub fn new() -> Self {
173 Self {
174 current_watermark: i64::MIN,
175 }
176 }
177}
178
179impl WatermarkGenerator for AscendingTimestampsGenerator {
180 #[inline]
181 fn on_event(&mut self, timestamp: i64) -> Option<Watermark> {
182 if timestamp > self.current_watermark {
183 self.current_watermark = timestamp;
184 Some(Watermark::new(timestamp))
185 } else {
186 None
187 }
188 }
189
190 #[inline]
191 fn on_periodic(&mut self) -> Option<Watermark> {
192 None
193 }
194
195 #[inline]
196 fn current_watermark(&self) -> i64 {
197 self.current_watermark
198 }
199}
200
201pub struct PeriodicGenerator<G: WatermarkGenerator> {
226 inner: G,
227 period: Duration,
228 last_emit_time: Instant,
229 last_emitted_watermark: i64,
230}
231
232impl<G: WatermarkGenerator> PeriodicGenerator<G> {
233 #[must_use]
240 pub fn new(inner: G, period: Duration) -> Self {
241 Self {
242 inner,
243 period,
244 last_emit_time: Instant::now(),
245 last_emitted_watermark: i64::MIN,
246 }
247 }
248
249 #[must_use]
251 pub fn inner(&self) -> &G {
252 &self.inner
253 }
254
255 pub fn inner_mut(&mut self) -> &mut G {
257 &mut self.inner
258 }
259}
260
261impl<G: WatermarkGenerator> WatermarkGenerator for PeriodicGenerator<G> {
262 fn on_event(&mut self, timestamp: i64) -> Option<Watermark> {
263 let wm = self.inner.on_event(timestamp);
264 if let Some(ref w) = wm {
265 self.last_emitted_watermark = w.timestamp();
266 self.last_emit_time = Instant::now();
267 }
268 wm
269 }
270
271 fn on_periodic(&mut self) -> Option<Watermark> {
272 if self.last_emit_time.elapsed() >= self.period {
274 let current = self.inner.current_watermark();
275 if current > self.last_emitted_watermark {
276 self.last_emitted_watermark = current;
277 self.last_emit_time = Instant::now();
278 return Some(Watermark::new(current));
279 }
280 self.last_emit_time = Instant::now();
281 }
282 None
283 }
284
285 fn current_watermark(&self) -> i64 {
286 self.inner.current_watermark()
287 }
288}
289
290pub struct PunctuatedGenerator<F>
313where
314 F: Fn(i64) -> Option<Watermark> + Send,
315{
316 predicate: F,
317 current_watermark: i64,
318}
319
320impl<F> PunctuatedGenerator<F>
321where
322 F: Fn(i64) -> Option<Watermark> + Send,
323{
324 #[must_use]
330 pub fn new(predicate: F) -> Self {
331 Self {
332 predicate,
333 current_watermark: i64::MIN,
334 }
335 }
336}
337
338impl<F> WatermarkGenerator for PunctuatedGenerator<F>
339where
340 F: Fn(i64) -> Option<Watermark> + Send,
341{
342 fn on_event(&mut self, timestamp: i64) -> Option<Watermark> {
343 if let Some(wm) = (self.predicate)(timestamp) {
344 if wm.timestamp() > self.current_watermark {
345 self.current_watermark = wm.timestamp();
346 return Some(wm);
347 }
348 }
349 None
350 }
351
352 fn on_periodic(&mut self) -> Option<Watermark> {
353 None
354 }
355
356 fn current_watermark(&self) -> i64 {
357 self.current_watermark
358 }
359}
360
361#[derive(Debug)]
386pub struct WatermarkTracker {
387 source_watermarks: Vec<i64>,
389 combined_watermark: i64,
391 idle_sources: Vec<bool>,
393 last_activity: Vec<Instant>,
395 idle_timeout: Duration,
397}
398
399impl WatermarkTracker {
400 #[must_use]
402 pub fn new(num_sources: usize) -> Self {
403 Self {
404 source_watermarks: vec![i64::MIN; num_sources],
405 combined_watermark: i64::MIN,
406 idle_sources: vec![false; num_sources],
407 last_activity: vec![Instant::now(); num_sources],
408 idle_timeout: Duration::from_secs(30), }
410 }
411
412 #[must_use]
414 pub fn with_idle_timeout(num_sources: usize, idle_timeout: Duration) -> Self {
415 Self {
416 source_watermarks: vec![i64::MIN; num_sources],
417 combined_watermark: i64::MIN,
418 idle_sources: vec![false; num_sources],
419 last_activity: vec![Instant::now(); num_sources],
420 idle_timeout,
421 }
422 }
423
424 pub fn update_source(&mut self, source_id: usize, watermark: i64) -> Option<Watermark> {
428 if source_id >= self.source_watermarks.len() {
429 return None;
430 }
431
432 self.idle_sources[source_id] = false;
434 self.last_activity[source_id] = Instant::now();
435
436 if watermark > self.source_watermarks[source_id] {
438 self.source_watermarks[source_id] = watermark;
439 self.update_combined()
440 } else {
441 None
442 }
443 }
444
445 pub fn mark_idle(&mut self, source_id: usize) -> Option<Watermark> {
449 if source_id >= self.idle_sources.len() {
450 return None;
451 }
452
453 self.idle_sources[source_id] = true;
454 self.update_combined()
455 }
456
457 pub fn check_idle_sources(&mut self) -> Option<Watermark> {
461 let mut any_marked = false;
462 for i in 0..self.idle_sources.len() {
463 if !self.idle_sources[i] && self.last_activity[i].elapsed() >= self.idle_timeout {
464 self.idle_sources[i] = true;
465 any_marked = true;
466 }
467 }
468 if any_marked {
469 self.update_combined()
470 } else {
471 None
472 }
473 }
474
475 #[must_use]
477 pub fn current_watermark(&self) -> Option<Watermark> {
478 if self.combined_watermark == i64::MIN {
479 None
480 } else {
481 Some(Watermark::new(self.combined_watermark))
482 }
483 }
484
485 #[must_use]
487 pub fn source_watermark(&self, source_id: usize) -> Option<i64> {
488 self.source_watermarks.get(source_id).copied()
489 }
490
491 #[must_use]
493 pub fn is_idle(&self, source_id: usize) -> bool {
494 self.idle_sources.get(source_id).copied().unwrap_or(false)
495 }
496
497 #[must_use]
499 pub fn num_sources(&self) -> usize {
500 self.source_watermarks.len()
501 }
502
503 #[must_use]
505 pub fn active_source_count(&self) -> usize {
506 self.idle_sources.iter().filter(|&&idle| !idle).count()
507 }
508
509 fn update_combined(&mut self) -> Option<Watermark> {
511 let mut min_watermark = i64::MAX;
513 let mut has_active = false;
514
515 for (i, &wm) in self.source_watermarks.iter().enumerate() {
516 if !self.idle_sources[i] {
517 has_active = true;
518 min_watermark = min_watermark.min(wm);
519 }
520 }
521
522 if !has_active {
524 min_watermark = self
525 .source_watermarks
526 .iter()
527 .copied()
528 .max()
529 .unwrap_or(i64::MIN);
530 }
531
532 if min_watermark > self.combined_watermark && min_watermark != i64::MAX {
533 self.combined_watermark = min_watermark;
534 Some(Watermark::new(min_watermark))
535 } else {
536 None
537 }
538 }
539}
540
541pub struct SourceProvidedGenerator {
546 source_watermark: i64,
548 fallback: BoundedOutOfOrdernessGenerator,
550 prefer_source: bool,
552}
553
554impl SourceProvidedGenerator {
555 #[must_use]
562 pub fn new(fallback_lateness: i64, prefer_source: bool) -> Self {
563 Self {
564 source_watermark: i64::MIN,
565 fallback: BoundedOutOfOrdernessGenerator::new(fallback_lateness),
566 prefer_source,
567 }
568 }
569
570 pub fn on_source_watermark(&mut self, watermark: i64) -> Option<Watermark> {
574 if watermark > self.source_watermark {
575 self.source_watermark = watermark;
576 if self.prefer_source || watermark > self.fallback.current_watermark() {
577 return Some(Watermark::new(watermark));
578 }
579 }
580 None
581 }
582}
583
584impl WatermarkGenerator for SourceProvidedGenerator {
585 fn on_event(&mut self, timestamp: i64) -> Option<Watermark> {
586 let fallback_wm = self.fallback.on_event(timestamp);
587
588 if self.prefer_source {
589 if self.source_watermark > i64::MIN {
591 return None; }
593 }
594
595 fallback_wm
596 }
597
598 fn on_periodic(&mut self) -> Option<Watermark> {
599 None
600 }
601
602 fn current_watermark(&self) -> i64 {
603 if self.prefer_source && self.source_watermark > i64::MIN {
604 self.source_watermark
605 } else {
606 self.fallback.current_watermark().max(self.source_watermark)
607 }
608 }
609}
610
/// Counters and gauges describing a generator's watermark progress.
///
/// Every field starts at zero (via the derived `Default`).
#[derive(Debug, Clone, Default)]
pub struct WatermarkMetrics {
    /// Most recently recorded watermark value.
    pub current_watermark: i64,
    /// Largest event timestamp recorded.
    pub max_event_timestamp: i64,
    /// Number of watermarks recorded as emitted.
    pub watermarks_emitted: u64,
    /// Number of events recorded as late.
    pub late_events: u64,
}

impl WatermarkMetrics {
    /// Returns an all-zero metrics record; identical to `Default::default()`.
    #[must_use]
    pub fn new() -> Self {
        Default::default()
    }

    /// Distance from the current watermark up to the newest event timestamp,
    /// saturating at the `i64` bounds instead of overflowing.
    #[must_use]
    pub fn lag(&self) -> i64 {
        i64::saturating_sub(self.max_event_timestamp, self.current_watermark)
    }
}
638
639pub struct MeteredGenerator<G: WatermarkGenerator> {
641 inner: G,
642 metrics: WatermarkMetrics,
643}
644
645impl<G: WatermarkGenerator> MeteredGenerator<G> {
646 #[must_use]
648 pub fn new(inner: G) -> Self {
649 Self {
650 inner,
651 metrics: WatermarkMetrics::new(),
652 }
653 }
654
655 #[must_use]
657 pub fn metrics(&self) -> &WatermarkMetrics {
658 &self.metrics
659 }
660
661 pub fn inner_mut(&mut self) -> &mut G {
663 &mut self.inner
664 }
665
666 pub fn record_late_event(&mut self) {
668 self.metrics.late_events += 1;
669 }
670}
671
672impl<G: WatermarkGenerator> WatermarkGenerator for MeteredGenerator<G> {
673 fn on_event(&mut self, timestamp: i64) -> Option<Watermark> {
674 if timestamp > self.metrics.max_event_timestamp {
676 self.metrics.max_event_timestamp = timestamp;
677 }
678
679 let wm = self.inner.on_event(timestamp);
680 if let Some(ref w) = wm {
681 self.metrics.current_watermark = w.timestamp();
682 self.metrics.watermarks_emitted += 1;
683 }
684 wm
685 }
686
687 fn on_periodic(&mut self) -> Option<Watermark> {
688 let wm = self.inner.on_periodic();
689 if let Some(ref w) = wm {
690 self.metrics.current_watermark = w.timestamp();
691 self.metrics.watermarks_emitted += 1;
692 }
693 wm
694 }
695
696 fn current_watermark(&self) -> i64 {
697 self.inner.current_watermark()
698 }
699}
700
#[cfg(test)]
mod tests {
    use super::*;

    // Locals are named `generator` rather than `gen`: `gen` is a reserved
    // keyword from the Rust 2024 edition onward.

    #[test]
    fn test_bounded_generator_first_event() {
        let mut generator = BoundedOutOfOrdernessGenerator::new(100);
        let wm = generator.on_event(1000);
        assert_eq!(wm, Some(Watermark::new(900)));
        assert_eq!(generator.current_watermark(), 900);
    }

    #[test]
    fn test_bounded_generator_out_of_order() {
        let mut generator = BoundedOutOfOrdernessGenerator::new(100);

        generator.on_event(1000);

        // Behind the max timestamp: the watermark must not move.
        let wm = generator.on_event(800);
        assert_eq!(wm, None);
        assert_eq!(generator.current_watermark(), 900);
    }

    #[test]
    fn test_bounded_generator_advancement() {
        let mut generator = BoundedOutOfOrdernessGenerator::new(100);

        generator.on_event(1000);
        let wm = generator.on_event(1200);

        assert_eq!(wm, Some(Watermark::new(1100)));
    }

    #[test]
    fn test_bounded_generator_from_duration() {
        let generator = BoundedOutOfOrdernessGenerator::from_duration(Duration::from_secs(5));
        assert_eq!(generator.max_out_of_orderness(), 5000);
    }

    #[test]
    fn test_bounded_generator_no_periodic() {
        let mut generator = BoundedOutOfOrdernessGenerator::new(100);
        assert_eq!(generator.on_periodic(), None);
    }

    #[test]
    fn test_ascending_generator_advances_on_each_event() {
        let mut generator = AscendingTimestampsGenerator::new();

        let wm1 = generator.on_event(1000);
        assert_eq!(wm1, Some(Watermark::new(1000)));

        let wm2 = generator.on_event(2000);
        assert_eq!(wm2, Some(Watermark::new(2000)));
    }

    #[test]
    fn test_ascending_generator_ignores_backwards() {
        let mut generator = AscendingTimestampsGenerator::new();

        generator.on_event(2000);
        let wm = generator.on_event(1000);
        assert_eq!(wm, None);
        assert_eq!(generator.current_watermark(), 2000);
    }

    #[test]
    fn test_periodic_generator_passes_through() {
        let inner = BoundedOutOfOrdernessGenerator::new(100);
        let mut generator = PeriodicGenerator::new(inner, Duration::from_millis(100));

        let wm = generator.on_event(1000);
        assert_eq!(wm, Some(Watermark::new(900)));
    }

    #[test]
    fn test_periodic_generator_inner_access() {
        let inner = BoundedOutOfOrdernessGenerator::new(100);
        let generator = PeriodicGenerator::new(inner, Duration::from_millis(100));

        assert_eq!(generator.inner().max_out_of_orderness(), 100);
    }

    #[test]
    fn test_punctuated_generator_predicate() {
        let mut generator = PunctuatedGenerator::new(|ts| {
            if ts % 1000 == 0 {
                Some(Watermark::new(ts))
            } else {
                None
            }
        });

        assert_eq!(generator.on_event(500), None);
        assert_eq!(generator.on_event(999), None);
        assert_eq!(generator.on_event(1000), Some(Watermark::new(1000)));
        assert_eq!(generator.on_event(1500), None);
        assert_eq!(generator.on_event(2000), Some(Watermark::new(2000)));
    }

    #[test]
    fn test_punctuated_generator_no_regression() {
        let mut generator = PunctuatedGenerator::new(|ts| Some(Watermark::new(ts)));

        generator.on_event(2000);
        // A smaller predicate result must not move the watermark backwards.
        let wm = generator.on_event(1000);
        assert_eq!(wm, None);
        assert_eq!(generator.current_watermark(), 2000);
    }

    #[test]
    fn test_tracker_single_source() {
        let mut tracker = WatermarkTracker::new(1);

        let wm = tracker.update_source(0, 1000);
        assert_eq!(wm, Some(Watermark::new(1000)));
        assert_eq!(tracker.current_watermark(), Some(Watermark::new(1000)));
    }

    #[test]
    fn test_tracker_multiple_sources() {
        let mut tracker = WatermarkTracker::new(3);

        tracker.update_source(0, 1000);
        tracker.update_source(1, 2000);
        let wm = tracker.update_source(2, 500);

        // The combined watermark is the minimum across sources.
        assert_eq!(wm, Some(Watermark::new(500)));
    }

    #[test]
    fn test_tracker_min_watermark() {
        let mut tracker = WatermarkTracker::new(2);

        tracker.update_source(0, 5000);
        tracker.update_source(1, 3000);

        assert_eq!(tracker.current_watermark(), Some(Watermark::new(3000)));

        tracker.update_source(1, 4000);
        assert_eq!(tracker.current_watermark(), Some(Watermark::new(4000)));
    }

    #[test]
    fn test_tracker_idle_source() {
        let mut tracker = WatermarkTracker::new(2);

        tracker.update_source(0, 5000);
        tracker.update_source(1, 1000);

        // Excluding the slow idle source lets the watermark jump ahead.
        let wm = tracker.mark_idle(1);

        assert_eq!(wm, Some(Watermark::new(5000)));
    }

    #[test]
    fn test_tracker_all_idle() {
        let mut tracker = WatermarkTracker::new(2);

        tracker.update_source(0, 5000);
        tracker.update_source(1, 3000);

        tracker.mark_idle(0);
        let wm = tracker.mark_idle(1);

        // With every source idle the combined watermark follows the maximum.
        assert_eq!(wm, Some(Watermark::new(5000)));
    }

    #[test]
    fn test_tracker_source_watermark() {
        let mut tracker = WatermarkTracker::new(2);

        tracker.update_source(0, 1000);
        tracker.update_source(1, 2000);

        assert_eq!(tracker.source_watermark(0), Some(1000));
        assert_eq!(tracker.source_watermark(1), Some(2000));
        assert_eq!(tracker.source_watermark(5), None);
    }

    #[test]
    fn test_tracker_active_source_count() {
        let mut tracker = WatermarkTracker::new(3);

        assert_eq!(tracker.active_source_count(), 3);

        tracker.mark_idle(0);
        assert_eq!(tracker.active_source_count(), 2);

        tracker.mark_idle(2);
        assert_eq!(tracker.active_source_count(), 1);

        // Updating an idle source reactivates it.
        tracker.update_source(0, 1000);
        assert_eq!(tracker.active_source_count(), 2);
    }

    #[test]
    fn test_tracker_invalid_source() {
        let mut tracker = WatermarkTracker::new(2);

        let wm = tracker.update_source(5, 1000);
        assert_eq!(wm, None);

        let wm = tracker.mark_idle(5);
        assert_eq!(wm, None);
    }

    #[test]
    fn test_source_provided_fallback() {
        let mut generator = SourceProvidedGenerator::new(100, false);

        let wm = generator.on_event(1000);
        assert_eq!(wm, Some(Watermark::new(900)));
    }

    #[test]
    fn test_source_provided_explicit_watermark() {
        let mut generator = SourceProvidedGenerator::new(100, true);

        let wm = generator.on_source_watermark(500);
        assert_eq!(wm, Some(Watermark::new(500)));
        assert_eq!(generator.current_watermark(), 500);
    }

    #[test]
    fn test_metered_generator_tracks_metrics() {
        let inner = BoundedOutOfOrdernessGenerator::new(100);
        let mut generator = MeteredGenerator::new(inner);

        generator.on_event(1000);
        generator.on_event(2000);
        // Out-of-order event: no new watermark is emitted.
        generator.on_event(1500);

        let metrics = generator.metrics();
        assert_eq!(metrics.max_event_timestamp, 2000);
        assert_eq!(metrics.watermarks_emitted, 2);
    }

    #[test]
    fn test_metered_generator_lag() {
        let inner = BoundedOutOfOrdernessGenerator::new(100);
        let mut generator = MeteredGenerator::new(inner);

        generator.on_event(1000);

        let metrics = generator.metrics();
        assert_eq!(metrics.lag(), 100);
    }

    #[test]
    fn test_metered_generator_late_events() {
        let inner = BoundedOutOfOrdernessGenerator::new(100);
        let mut generator = MeteredGenerator::new(inner);

        generator.record_late_event();
        generator.record_late_event();

        assert_eq!(generator.metrics().late_events, 2);
    }

    #[test]
    fn test_watermark_metrics_default() {
        let metrics = WatermarkMetrics::new();
        assert_eq!(metrics.current_watermark, 0);
        assert_eq!(metrics.max_event_timestamp, 0);
        assert_eq!(metrics.watermarks_emitted, 0);
        assert_eq!(metrics.late_events, 0);
    }
}