1use std::collections::HashMap;
50use std::time::{Duration, Instant};
51
52use super::Watermark;
53
54#[derive(Debug, Clone, Copy, Hash, Eq, PartialEq)]
59pub struct PartitionId {
60 pub source_id: usize,
62 pub partition: u32,
64}
65
66impl PartitionId {
67 #[inline]
69 #[must_use]
70 pub const fn new(source_id: usize, partition: u32) -> Self {
71 Self {
72 source_id,
73 partition,
74 }
75 }
76}
77
78impl std::fmt::Display for PartitionId {
79 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80 write!(f, "{}:{}", self.source_id, self.partition)
81 }
82}
83
84#[derive(Debug, Clone)]
88pub struct PartitionWatermarkState {
89 pub watermark: i64,
91 pub last_event_time: i64,
93 pub last_activity: Instant,
95 pub is_idle: bool,
97 pub assigned_core: Option<usize>,
99}
100
101impl PartitionWatermarkState {
102 #[must_use]
104 pub fn new() -> Self {
105 Self {
106 watermark: i64::MIN,
107 last_event_time: i64::MIN,
108 last_activity: Instant::now(),
109 is_idle: false,
110 assigned_core: None,
111 }
112 }
113
114 #[must_use]
116 pub fn with_core(core_id: usize) -> Self {
117 Self {
118 watermark: i64::MIN,
119 last_event_time: i64::MIN,
120 last_activity: Instant::now(),
121 is_idle: false,
122 assigned_core: Some(core_id),
123 }
124 }
125}
126
127impl Default for PartitionWatermarkState {
128 fn default() -> Self {
129 Self::new()
130 }
131}
132
133#[derive(Debug, Clone, Default)]
135pub struct PartitionedWatermarkMetrics {
136 pub total_partitions: usize,
138 pub active_partitions: usize,
140 pub idle_partitions: usize,
142 pub watermark_advances: u64,
144 pub rebalances: u64,
146}
147
148impl PartitionedWatermarkMetrics {
149 #[must_use]
151 pub fn new() -> Self {
152 Self::default()
153 }
154}
155
156#[derive(Debug, Clone, thiserror::Error)]
158pub enum WatermarkError {
159 #[error("Unknown partition: {0}")]
161 UnknownPartition(PartitionId),
162
163 #[error("Source not found: {0}")]
165 SourceNotFound(usize),
166
167 #[error("Invalid partition {partition} for source {source_id} (max: {max_partition})")]
169 InvalidPartition {
170 source_id: usize,
172 partition: u32,
174 max_partition: u32,
176 },
177
178 #[error("Partition already exists: {0}")]
180 PartitionExists(PartitionId),
181}
182
183#[derive(Debug)]
220pub struct PartitionedWatermarkTracker {
221 partitions: HashMap<PartitionId, PartitionWatermarkState>,
223
224 source_partition_counts: Vec<usize>,
226
227 combined_watermark: i64,
229
230 idle_timeout: Duration,
232
233 metrics: PartitionedWatermarkMetrics,
235}
236
237impl PartitionedWatermarkTracker {
238 pub const DEFAULT_IDLE_TIMEOUT: Duration = Duration::from_secs(30);
240
241 #[must_use]
243 pub fn new() -> Self {
244 Self {
245 partitions: HashMap::new(),
246 source_partition_counts: Vec::new(),
247 combined_watermark: i64::MIN,
248 idle_timeout: Self::DEFAULT_IDLE_TIMEOUT,
249 metrics: PartitionedWatermarkMetrics::new(),
250 }
251 }
252
253 #[must_use]
255 pub fn with_idle_timeout(idle_timeout: Duration) -> Self {
256 Self {
257 partitions: HashMap::new(),
258 source_partition_counts: Vec::new(),
259 combined_watermark: i64::MIN,
260 idle_timeout,
261 metrics: PartitionedWatermarkMetrics::new(),
262 }
263 }
264
265 pub fn register_source(&mut self, source_id: usize, num_partitions: usize) {
270 while self.source_partition_counts.len() <= source_id {
272 self.source_partition_counts.push(0);
273 }
274
275 self.source_partition_counts[source_id] = num_partitions;
276
277 #[allow(clippy::cast_possible_truncation)]
279 for partition in 0..num_partitions {
281 let pid = PartitionId::new(source_id, partition as u32);
282 self.partitions.entry(pid).or_default();
283 }
284
285 self.update_metrics();
286 }
287
288 pub fn add_partition(&mut self, partition: PartitionId) -> Result<(), WatermarkError> {
294 if self.partitions.contains_key(&partition) {
295 return Err(WatermarkError::PartitionExists(partition));
296 }
297
298 while self.source_partition_counts.len() <= partition.source_id {
300 self.source_partition_counts.push(0);
301 }
302
303 let current_count = self.source_partition_counts[partition.source_id];
305 if partition.partition as usize >= current_count {
306 self.source_partition_counts[partition.source_id] = partition.partition as usize + 1;
307 }
308
309 self.partitions
310 .insert(partition, PartitionWatermarkState::new());
311 self.metrics.rebalances += 1;
312 self.update_metrics();
313
314 Ok(())
315 }
316
317 pub fn remove_partition(&mut self, partition: PartitionId) -> Option<PartitionWatermarkState> {
321 let state = self.partitions.remove(&partition);
322 if state.is_some() {
323 self.metrics.rebalances += 1;
324 self.update_metrics();
325 self.recalculate_combined();
327 }
328 state
329 }
330
331 #[inline]
339 pub fn update_partition(
340 &mut self,
341 partition: PartitionId,
342 watermark: i64,
343 ) -> Option<Watermark> {
344 if let Some(state) = self.partitions.get_mut(&partition) {
345 if state.is_idle {
347 state.is_idle = false;
348 self.metrics.active_partitions += 1;
349 self.metrics.idle_partitions = self.metrics.idle_partitions.saturating_sub(1);
350 }
351 state.last_activity = Instant::now();
352
353 if watermark > state.watermark {
355 state.watermark = watermark;
356 state.last_event_time = watermark;
357 return self.try_advance_combined();
358 }
359 }
360 None
361 }
362
363 #[inline]
367 pub fn update_partition_from_event(
368 &mut self,
369 partition: PartitionId,
370 event_time: i64,
371 max_lateness: i64,
372 ) -> Option<Watermark> {
373 let watermark = event_time.saturating_sub(max_lateness);
374 self.update_partition(partition, watermark)
375 }
376
377 pub fn mark_partition_idle(&mut self, partition: PartitionId) -> Option<Watermark> {
383 if let Some(state) = self.partitions.get_mut(&partition) {
384 if !state.is_idle {
385 state.is_idle = true;
386 self.metrics.idle_partitions += 1;
387 self.metrics.active_partitions = self.metrics.active_partitions.saturating_sub(1);
388 return self.try_advance_combined();
389 }
390 }
391 None
392 }
393
394 pub fn mark_partition_active(&mut self, partition: PartitionId) {
399 if let Some(state) = self.partitions.get_mut(&partition) {
400 if state.is_idle {
401 state.is_idle = false;
402 state.last_activity = Instant::now();
403 self.metrics.active_partitions += 1;
404 self.metrics.idle_partitions = self.metrics.idle_partitions.saturating_sub(1);
405 }
406 }
407 }
408
409 pub fn check_idle_partitions(&mut self) -> Option<Watermark> {
417 let mut any_marked = false;
418
419 for state in self.partitions.values_mut() {
420 if !state.is_idle && state.last_activity.elapsed() >= self.idle_timeout {
421 state.is_idle = true;
422 any_marked = true;
423 }
424 }
425
426 if any_marked {
427 self.update_metrics();
428 self.try_advance_combined()
429 } else {
430 None
431 }
432 }
433
434 #[inline]
436 #[must_use]
437 pub fn current_watermark(&self) -> Option<Watermark> {
438 if self.combined_watermark == i64::MIN {
439 None
440 } else {
441 Some(Watermark::new(self.combined_watermark))
442 }
443 }
444
445 #[must_use]
447 pub fn partition_watermark(&self, partition: PartitionId) -> Option<i64> {
448 self.partitions.get(&partition).map(|s| s.watermark)
449 }
450
451 #[must_use]
453 pub fn source_watermark(&self, source_id: usize) -> Option<i64> {
454 let mut min_watermark = i64::MAX;
455 let mut found = false;
456
457 for (pid, state) in &self.partitions {
458 if pid.source_id == source_id && !state.is_idle {
459 found = true;
460 min_watermark = min_watermark.min(state.watermark);
461 }
462 }
463
464 if found && min_watermark != i64::MAX {
465 Some(min_watermark)
466 } else {
467 None
468 }
469 }
470
471 #[must_use]
473 pub fn is_partition_idle(&self, partition: PartitionId) -> bool {
474 self.partitions.get(&partition).is_some_and(|s| s.is_idle)
475 }
476
477 #[must_use]
479 pub fn active_partition_count(&self, source_id: usize) -> usize {
480 self.partitions
481 .iter()
482 .filter(|(pid, state)| pid.source_id == source_id && !state.is_idle)
483 .count()
484 }
485
486 #[must_use]
488 pub fn partition_count(&self, source_id: usize) -> usize {
489 self.source_partition_counts
490 .get(source_id)
491 .copied()
492 .unwrap_or(0)
493 }
494
495 #[must_use]
497 pub fn metrics(&self) -> &PartitionedWatermarkMetrics {
498 &self.metrics
499 }
500
501 #[must_use]
503 pub fn num_sources(&self) -> usize {
504 self.source_partition_counts.len()
505 }
506
507 pub fn assign_partition_to_core(&mut self, partition: PartitionId, core_id: usize) {
509 if let Some(state) = self.partitions.get_mut(&partition) {
510 state.assigned_core = Some(core_id);
511 }
512 }
513
514 #[must_use]
516 pub fn partition_core(&self, partition: PartitionId) -> Option<usize> {
517 self.partitions
518 .get(&partition)
519 .and_then(|s| s.assigned_core)
520 }
521
522 #[must_use]
524 pub fn partitions_for_core(&self, core_id: usize) -> Vec<PartitionId> {
525 self.partitions
526 .iter()
527 .filter_map(|(pid, state)| {
528 if state.assigned_core == Some(core_id) {
529 Some(*pid)
530 } else {
531 None
532 }
533 })
534 .collect()
535 }
536
537 #[must_use]
539 pub fn idle_timeout(&self) -> Duration {
540 self.idle_timeout
541 }
542
543 pub fn set_idle_timeout(&mut self, timeout: Duration) {
545 self.idle_timeout = timeout;
546 }
547
548 #[must_use]
550 pub fn partition_state(&self, partition: PartitionId) -> Option<&PartitionWatermarkState> {
551 self.partitions.get(&partition)
552 }
553
554 fn try_advance_combined(&mut self) -> Option<Watermark> {
556 let new_combined = self.calculate_combined();
557
558 if new_combined > self.combined_watermark && new_combined != i64::MAX {
559 self.combined_watermark = new_combined;
560 self.metrics.watermark_advances += 1;
561 Some(Watermark::new(new_combined))
562 } else {
563 None
564 }
565 }
566
567 fn recalculate_combined(&mut self) {
569 let new_combined = self.calculate_combined();
570 if new_combined != i64::MAX {
571 self.combined_watermark = new_combined;
572 }
573 }
574
575 fn calculate_combined(&self) -> i64 {
577 let mut min_watermark = i64::MAX;
578 let mut has_active = false;
579
580 for state in self.partitions.values() {
581 if !state.is_idle {
582 has_active = true;
583 min_watermark = min_watermark.min(state.watermark);
584 }
585 }
586
587 if !has_active {
589 min_watermark = self
590 .partitions
591 .values()
592 .map(|s| s.watermark)
593 .max()
594 .unwrap_or(i64::MIN);
595 }
596
597 min_watermark
598 }
599
600 fn update_metrics(&mut self) {
602 self.metrics.total_partitions = self.partitions.len();
603 self.metrics.idle_partitions = self.partitions.values().filter(|s| s.is_idle).count();
604 self.metrics.active_partitions =
605 self.metrics.total_partitions - self.metrics.idle_partitions;
606 }
607}
608
609impl Default for PartitionedWatermarkTracker {
610 fn default() -> Self {
611 Self::new()
612 }
613}
614
615#[derive(Debug)]
623pub struct CoreWatermarkState {
624 assigned_partitions: Vec<PartitionId>,
626
627 partition_watermarks: Vec<i64>,
629
630 local_watermark: i64,
632
633 idle_status: Vec<bool>,
635
636 core_id: usize,
638}
639
640impl CoreWatermarkState {
641 #[must_use]
643 pub fn new(core_id: usize) -> Self {
644 Self {
645 assigned_partitions: Vec::new(),
646 partition_watermarks: Vec::new(),
647 local_watermark: i64::MIN,
648 idle_status: Vec::new(),
649 core_id,
650 }
651 }
652
653 #[must_use]
655 pub fn with_partitions(core_id: usize, partitions: Vec<PartitionId>) -> Self {
656 let count = partitions.len();
657 Self {
658 assigned_partitions: partitions,
659 partition_watermarks: vec![i64::MIN; count],
660 local_watermark: i64::MIN,
661 idle_status: vec![false; count],
662 core_id,
663 }
664 }
665
666 pub fn assign_partition(&mut self, partition: PartitionId) {
668 if !self.assigned_partitions.contains(&partition) {
669 self.assigned_partitions.push(partition);
670 self.partition_watermarks.push(i64::MIN);
671 self.idle_status.push(false);
672 }
673 }
674
675 pub fn remove_partition(&mut self, partition: PartitionId) -> bool {
677 if let Some(idx) = self
678 .assigned_partitions
679 .iter()
680 .position(|p| *p == partition)
681 {
682 self.assigned_partitions.swap_remove(idx);
683 self.partition_watermarks.swap_remove(idx);
684 self.idle_status.swap_remove(idx);
685 self.recalculate_local();
686 true
687 } else {
688 false
689 }
690 }
691
692 #[inline]
698 pub fn update_partition(&mut self, partition: PartitionId, watermark: i64) -> Option<i64> {
699 if let Some(idx) = self
700 .assigned_partitions
701 .iter()
702 .position(|p| *p == partition)
703 {
704 if watermark > self.partition_watermarks[idx] {
705 self.partition_watermarks[idx] = watermark;
706 self.idle_status[idx] = false;
707
708 let new_local = self.calculate_local();
710 if new_local > self.local_watermark {
711 self.local_watermark = new_local;
712 return Some(new_local);
713 }
714 }
715 }
716 None
717 }
718
719 pub fn mark_idle(&mut self, partition: PartitionId) -> Option<i64> {
721 if let Some(idx) = self
722 .assigned_partitions
723 .iter()
724 .position(|p| *p == partition)
725 {
726 if !self.idle_status[idx] {
727 self.idle_status[idx] = true;
728
729 let new_local = self.calculate_local();
731 if new_local > self.local_watermark {
732 self.local_watermark = new_local;
733 return Some(new_local);
734 }
735 }
736 }
737 None
738 }
739
740 #[inline]
742 #[must_use]
743 pub fn local_watermark(&self) -> i64 {
744 self.local_watermark
745 }
746
747 #[must_use]
749 pub fn core_id(&self) -> usize {
750 self.core_id
751 }
752
753 #[must_use]
755 pub fn assigned_partitions(&self) -> &[PartitionId] {
756 &self.assigned_partitions
757 }
758
759 #[must_use]
761 pub fn partition_count(&self) -> usize {
762 self.assigned_partitions.len()
763 }
764
765 fn calculate_local(&self) -> i64 {
767 let mut min = i64::MAX;
768 let mut has_active = false;
769
770 for (idx, &wm) in self.partition_watermarks.iter().enumerate() {
771 if !self.idle_status[idx] {
772 has_active = true;
773 min = min.min(wm);
774 }
775 }
776
777 if !has_active {
778 self.partition_watermarks
780 .iter()
781 .copied()
782 .max()
783 .unwrap_or(i64::MIN)
784 } else if min == i64::MAX {
785 i64::MIN
786 } else {
787 min
788 }
789 }
790
791 fn recalculate_local(&mut self) {
793 self.local_watermark = self.calculate_local();
794 }
795}
796
797#[derive(Debug)]
802pub struct GlobalWatermarkCollector {
803 core_watermarks: Vec<i64>,
805
806 global_watermark: i64,
808}
809
810impl GlobalWatermarkCollector {
811 #[must_use]
813 pub fn new(num_cores: usize) -> Self {
814 Self {
815 core_watermarks: vec![i64::MIN; num_cores],
816 global_watermark: i64::MIN,
817 }
818 }
819
820 #[inline]
826 pub fn update_core(&mut self, core_id: usize, watermark: i64) -> Option<Watermark> {
827 if core_id < self.core_watermarks.len() {
828 self.core_watermarks[core_id] = watermark;
829
830 let new_global = self
832 .core_watermarks
833 .iter()
834 .copied()
835 .min()
836 .unwrap_or(i64::MIN);
837
838 if new_global > self.global_watermark && new_global != i64::MIN {
839 self.global_watermark = new_global;
840 return Some(Watermark::new(new_global));
841 }
842 }
843 None
844 }
845
846 #[must_use]
848 pub fn global_watermark(&self) -> Option<Watermark> {
849 if self.global_watermark == i64::MIN {
850 None
851 } else {
852 Some(Watermark::new(self.global_watermark))
853 }
854 }
855
856 #[must_use]
858 pub fn core_watermark(&self, core_id: usize) -> Option<i64> {
859 self.core_watermarks.get(core_id).copied()
860 }
861
862 #[must_use]
864 pub fn num_cores(&self) -> usize {
865 self.core_watermarks.len()
866 }
867}
868
869#[cfg(test)]
870mod tests {
871 use super::*;
872
873 #[test]
874 fn test_partition_id_creation() {
875 let pid = PartitionId::new(1, 3);
876 assert_eq!(pid.source_id, 1);
877 assert_eq!(pid.partition, 3);
878 }
879
880 #[test]
881 fn test_partition_id_equality() {
882 let p1 = PartitionId::new(1, 2);
883 let p2 = PartitionId::new(1, 2);
884 let p3 = PartitionId::new(1, 3);
885
886 assert_eq!(p1, p2);
887 assert_ne!(p1, p3);
888 }
889
890 #[test]
891 fn test_partition_id_display() {
892 let pid = PartitionId::new(2, 5);
893 assert_eq!(format!("{pid}"), "2:5");
894 }
895
896 #[test]
897 fn test_partitioned_tracker_single_partition_updates_watermark() {
898 let mut tracker = PartitionedWatermarkTracker::new();
899 tracker.register_source(0, 1);
900
901 let wm = tracker.update_partition(PartitionId::new(0, 0), 1000);
902 assert_eq!(wm, Some(Watermark::new(1000)));
903 assert_eq!(tracker.current_watermark(), Some(Watermark::new(1000)));
904 }
905
906 #[test]
907 fn test_partitioned_tracker_multiple_partitions_uses_minimum() {
908 let mut tracker = PartitionedWatermarkTracker::new();
909 tracker.register_source(0, 4);
910
911 tracker.update_partition(PartitionId::new(0, 0), 5000);
912 tracker.update_partition(PartitionId::new(0, 1), 3000);
913 tracker.update_partition(PartitionId::new(0, 2), 4000);
914 tracker.update_partition(PartitionId::new(0, 3), 4500);
915
916 assert_eq!(tracker.current_watermark(), Some(Watermark::new(3000)));
917 }
918
919 #[test]
920 fn test_partitioned_tracker_idle_partition_excluded_from_min() {
921 let mut tracker = PartitionedWatermarkTracker::new();
922 tracker.register_source(0, 4);
923
924 tracker.update_partition(PartitionId::new(0, 0), 5000);
925 tracker.update_partition(PartitionId::new(0, 1), 1000); tracker.update_partition(PartitionId::new(0, 2), 4000);
927 tracker.update_partition(PartitionId::new(0, 3), 4500);
928
929 assert_eq!(tracker.current_watermark(), Some(Watermark::new(1000)));
930
931 let wm = tracker.mark_partition_idle(PartitionId::new(0, 1));
933 assert_eq!(wm, Some(Watermark::new(4000)));
934 assert_eq!(tracker.current_watermark(), Some(Watermark::new(4000)));
935 }
936
937 #[test]
938 fn test_partitioned_tracker_all_idle_uses_max() {
939 let mut tracker = PartitionedWatermarkTracker::new();
940 tracker.register_source(0, 2);
941
942 tracker.update_partition(PartitionId::new(0, 0), 5000);
943 tracker.update_partition(PartitionId::new(0, 1), 3000);
944
945 tracker.mark_partition_idle(PartitionId::new(0, 0));
946 let wm = tracker.mark_partition_idle(PartitionId::new(0, 1));
947
948 assert_eq!(wm, Some(Watermark::new(5000)));
950 }
951
952 #[test]
953 fn test_partitioned_tracker_partition_reactivated_on_update() {
954 let mut tracker = PartitionedWatermarkTracker::new();
955 tracker.register_source(0, 2);
956
957 tracker.update_partition(PartitionId::new(0, 0), 5000);
958 tracker.update_partition(PartitionId::new(0, 1), 3000);
959
960 tracker.mark_partition_idle(PartitionId::new(0, 1));
962 assert!(tracker.is_partition_idle(PartitionId::new(0, 1)));
963 assert_eq!(tracker.current_watermark(), Some(Watermark::new(5000)));
964
965 tracker.update_partition(PartitionId::new(0, 1), 4000);
967 assert!(!tracker.is_partition_idle(PartitionId::new(0, 1)));
968
969 assert_eq!(tracker.current_watermark(), Some(Watermark::new(5000))); }
972
973 #[test]
974 fn test_partitioned_tracker_add_partition_during_operation() {
975 let mut tracker = PartitionedWatermarkTracker::new();
976 tracker.register_source(0, 2);
977
978 tracker.update_partition(PartitionId::new(0, 0), 5000);
979 tracker.update_partition(PartitionId::new(0, 1), 4000);
980
981 tracker.add_partition(PartitionId::new(0, 2)).unwrap();
983
984 assert_eq!(tracker.partition_count(0), 3);
986
987 tracker.update_partition(PartitionId::new(0, 2), 3000);
989 assert_eq!(tracker.current_watermark(), Some(Watermark::new(4000))); }
992
993 #[test]
994 fn test_partitioned_tracker_remove_partition_recalculates_watermark() {
995 let mut tracker = PartitionedWatermarkTracker::new();
996 tracker.register_source(0, 3);
997
998 tracker.update_partition(PartitionId::new(0, 0), 5000);
999 tracker.update_partition(PartitionId::new(0, 1), 2000); tracker.update_partition(PartitionId::new(0, 2), 4000);
1001
1002 assert_eq!(tracker.current_watermark(), Some(Watermark::new(2000)));
1003
1004 let state = tracker.remove_partition(PartitionId::new(0, 1));
1006 assert!(state.is_some());
1007 assert_eq!(state.unwrap().watermark, 2000);
1008
1009 assert_eq!(tracker.current_watermark(), Some(Watermark::new(4000)));
1012 }
1013
1014 #[test]
1015 fn test_partitioned_tracker_check_idle_marks_stale_partitions() {
1016 let mut tracker = PartitionedWatermarkTracker::with_idle_timeout(Duration::from_millis(10));
1017 tracker.register_source(0, 2);
1018
1019 tracker.update_partition(PartitionId::new(0, 0), 5000);
1020 tracker.update_partition(PartitionId::new(0, 1), 3000);
1021
1022 std::thread::sleep(Duration::from_millis(20));
1024
1025 tracker.update_partition(PartitionId::new(0, 0), 6000);
1027
1028 let wm = tracker.check_idle_partitions();
1030
1031 assert!(tracker.is_partition_idle(PartitionId::new(0, 1)));
1032 assert!(wm.is_some() || tracker.current_watermark() == Some(Watermark::new(6000)));
1034 }
1035
1036 #[test]
1037 fn test_partitioned_tracker_source_watermark_aggregates_partitions() {
1038 let mut tracker = PartitionedWatermarkTracker::new();
1039 tracker.register_source(0, 2);
1040 tracker.register_source(1, 2);
1041
1042 tracker.update_partition(PartitionId::new(0, 0), 5000);
1043 tracker.update_partition(PartitionId::new(0, 1), 3000);
1044 tracker.update_partition(PartitionId::new(1, 0), 4000);
1045 tracker.update_partition(PartitionId::new(1, 1), 6000);
1046
1047 assert_eq!(tracker.source_watermark(0), Some(3000));
1048 assert_eq!(tracker.source_watermark(1), Some(4000));
1049 }
1050
1051 #[test]
1052 fn test_partitioned_tracker_metrics_accurate() {
1053 let mut tracker = PartitionedWatermarkTracker::new();
1054 tracker.register_source(0, 4);
1055
1056 tracker.update_partition(PartitionId::new(0, 0), 1000);
1057 tracker.update_partition(PartitionId::new(0, 1), 2000);
1058
1059 let metrics = tracker.metrics();
1060 assert_eq!(metrics.total_partitions, 4);
1061 assert_eq!(metrics.active_partitions, 4);
1062 assert_eq!(metrics.idle_partitions, 0);
1063
1064 tracker.mark_partition_idle(PartitionId::new(0, 2));
1065
1066 let metrics = tracker.metrics();
1067 assert_eq!(metrics.idle_partitions, 1);
1068 assert_eq!(metrics.active_partitions, 3);
1069 }
1070
1071 #[test]
1072 fn test_partitioned_tracker_core_assignment() {
1073 let mut tracker = PartitionedWatermarkTracker::new();
1074 tracker.register_source(0, 4);
1075
1076 tracker.assign_partition_to_core(PartitionId::new(0, 0), 0);
1077 tracker.assign_partition_to_core(PartitionId::new(0, 1), 0);
1078 tracker.assign_partition_to_core(PartitionId::new(0, 2), 1);
1079 tracker.assign_partition_to_core(PartitionId::new(0, 3), 1);
1080
1081 assert_eq!(tracker.partition_core(PartitionId::new(0, 0)), Some(0));
1082 assert_eq!(tracker.partition_core(PartitionId::new(0, 2)), Some(1));
1083
1084 let core0_partitions = tracker.partitions_for_core(0);
1085 assert_eq!(core0_partitions.len(), 2);
1086 }
1087
1088 #[test]
1089 fn test_partitioned_tracker_multiple_sources() {
1090 let mut tracker = PartitionedWatermarkTracker::new();
1091 tracker.register_source(0, 2);
1092 tracker.register_source(1, 3);
1093
1094 assert_eq!(tracker.num_sources(), 2);
1095 assert_eq!(tracker.partition_count(0), 2);
1096 assert_eq!(tracker.partition_count(1), 3);
1097 }
1098
1099 #[test]
1100 fn test_partitioned_tracker_update_from_event() {
1101 let mut tracker = PartitionedWatermarkTracker::new();
1102 tracker.register_source(0, 1);
1103
1104 let wm = tracker.update_partition_from_event(PartitionId::new(0, 0), 5000, 1000);
1106 assert_eq!(wm, Some(Watermark::new(4000)));
1107 }
1108
1109 #[test]
1110 fn test_partitioned_tracker_add_partition_error() {
1111 let mut tracker = PartitionedWatermarkTracker::new();
1112 tracker.register_source(0, 2);
1113
1114 let result = tracker.add_partition(PartitionId::new(0, 0));
1116 assert!(matches!(result, Err(WatermarkError::PartitionExists(_))));
1117 }
1118
1119 #[test]
1120 fn test_core_watermark_state_creation() {
1121 let state = CoreWatermarkState::new(0);
1122 assert_eq!(state.core_id(), 0);
1123 assert_eq!(state.partition_count(), 0);
1124 assert_eq!(state.local_watermark(), i64::MIN);
1125 }
1126
1127 #[test]
1128 fn test_core_watermark_state_with_partitions() {
1129 let partitions = vec![PartitionId::new(0, 0), PartitionId::new(0, 1)];
1130 let state = CoreWatermarkState::with_partitions(1, partitions);
1131
1132 assert_eq!(state.core_id(), 1);
1133 assert_eq!(state.partition_count(), 2);
1134 }
1135
1136 #[test]
1137 fn test_core_watermark_state_update() {
1138 let mut state = CoreWatermarkState::with_partitions(
1139 0,
1140 vec![PartitionId::new(0, 0), PartitionId::new(0, 1)],
1141 );
1142
1143 let wm = state.update_partition(PartitionId::new(0, 0), 5000);
1145 assert!(wm.is_none()); assert_eq!(state.local_watermark(), i64::MIN);
1147
1148 let wm = state.update_partition(PartitionId::new(0, 1), 3000);
1150 assert_eq!(wm, Some(3000)); assert_eq!(state.local_watermark(), 3000);
1152
1153 let wm = state.update_partition(PartitionId::new(0, 0), 6000);
1155 assert!(wm.is_none());
1156 assert_eq!(state.local_watermark(), 3000);
1157
1158 let wm = state.update_partition(PartitionId::new(0, 1), 4000);
1160 assert_eq!(wm, Some(4000));
1161 assert_eq!(state.local_watermark(), 4000);
1162 }
1163
1164 #[test]
1165 fn test_core_watermark_state_idle() {
1166 let mut state = CoreWatermarkState::with_partitions(
1167 0,
1168 vec![PartitionId::new(0, 0), PartitionId::new(0, 1)],
1169 );
1170
1171 state.update_partition(PartitionId::new(0, 0), 5000);
1172 state.update_partition(PartitionId::new(0, 1), 2000);
1173
1174 assert_eq!(state.local_watermark(), 2000);
1175
1176 let wm = state.mark_idle(PartitionId::new(0, 1));
1178 assert_eq!(wm, Some(5000));
1179 assert_eq!(state.local_watermark(), 5000);
1180 }
1181
1182 #[test]
1183 fn test_core_watermark_state_assign_remove() {
1184 let mut state = CoreWatermarkState::new(0);
1185
1186 state.assign_partition(PartitionId::new(0, 0));
1187 state.assign_partition(PartitionId::new(0, 1));
1188 assert_eq!(state.partition_count(), 2);
1189
1190 state.update_partition(PartitionId::new(0, 0), 5000);
1191 state.update_partition(PartitionId::new(0, 1), 3000);
1192 assert_eq!(state.local_watermark(), 3000);
1193
1194 let removed = state.remove_partition(PartitionId::new(0, 1));
1196 assert!(removed);
1197 assert_eq!(state.partition_count(), 1);
1198 assert_eq!(state.local_watermark(), 5000);
1199 }
1200
1201 #[test]
1202 fn test_global_collector_creation() {
1203 let collector = GlobalWatermarkCollector::new(4);
1204 assert_eq!(collector.num_cores(), 4);
1205 assert_eq!(collector.global_watermark(), None);
1206 }
1207
1208 #[test]
1209 fn test_global_collector_update() {
1210 let mut collector = GlobalWatermarkCollector::new(3);
1211
1212 collector.update_core(0, 5000);
1213 collector.update_core(1, 3000);
1214 let wm = collector.update_core(2, 4000);
1215
1216 assert_eq!(wm, Some(Watermark::new(3000)));
1218 assert_eq!(collector.global_watermark(), Some(Watermark::new(3000)));
1219 }
1220
1221 #[test]
1222 fn test_global_collector_advancement() {
1223 let mut collector = GlobalWatermarkCollector::new(2);
1224
1225 collector.update_core(0, 5000);
1226 collector.update_core(1, 3000);
1227
1228 assert_eq!(collector.global_watermark(), Some(Watermark::new(3000)));
1229
1230 let wm = collector.update_core(1, 4000);
1232 assert_eq!(wm, Some(Watermark::new(4000)));
1233 }
1234
1235 #[test]
1236 fn test_global_collector_no_regression() {
1237 let mut collector = GlobalWatermarkCollector::new(2);
1238
1239 collector.update_core(0, 5000);
1240 collector.update_core(1, 4000);
1241
1242 let wm = collector.update_core(1, 3000);
1244 assert!(wm.is_none());
1245 assert_eq!(collector.core_watermark(1), Some(3000));
1247 }
1248}