1use std::collections::HashMap;
54use std::time::{Duration, Instant};
55
/// Identifier of a watermark alignment group.
///
/// A newtype over `String`; it derives `Hash` and `Eq` so it can be used as a
/// map key.
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub struct AlignmentGroupId(pub String);

impl AlignmentGroupId {
    /// Builds an identifier from anything convertible into a `String`.
    #[must_use]
    pub fn new(id: impl Into<String>) -> Self {
        let name: String = id.into();
        Self(name)
    }

    /// Borrows the identifier as a string slice.
    #[must_use]
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }
}

impl std::fmt::Display for AlignmentGroupId {
    /// Writes the raw identifier text with no decoration.
    ///
    /// Width/alignment flags of the outer formatter are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.0)
    }
}
79
80#[derive(Debug, Clone)]
82pub struct AlignmentGroupConfig {
83 pub group_id: AlignmentGroupId,
85 pub max_drift: Duration,
87 pub update_interval: Duration,
89 pub enforcement_mode: EnforcementMode,
91}
92
93impl AlignmentGroupConfig {
94 #[must_use]
96 pub fn new(group_id: impl Into<String>) -> Self {
97 Self {
98 group_id: AlignmentGroupId::new(group_id),
99 max_drift: Duration::from_secs(300), update_interval: Duration::from_secs(1),
101 enforcement_mode: EnforcementMode::Pause,
102 }
103 }
104
105 #[must_use]
107 pub fn with_max_drift(mut self, max_drift: Duration) -> Self {
108 self.max_drift = max_drift;
109 self
110 }
111
112 #[must_use]
114 pub fn with_update_interval(mut self, interval: Duration) -> Self {
115 self.update_interval = interval;
116 self
117 }
118
119 #[must_use]
121 pub fn with_enforcement_mode(mut self, mode: EnforcementMode) -> Self {
122 self.enforcement_mode = mode;
123 self
124 }
125}
126
/// Strategy applied to a source whose watermark runs further ahead of the
/// group than the configured maximum drift.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum EnforcementMode {
    /// Pause the offending source until the drift is back within bounds
    /// (the default).
    #[default]
    Pause,
    /// Leave the source running and only report a warning.
    WarnOnly,
    /// Keep the source running but signal that its events should be dropped.
    DropExcess,
}
138
/// Bookkeeping kept for one source inside an alignment group.
#[derive(Debug, Clone)]
pub struct AlignmentSourceState {
    /// Identifier of the source this state describes.
    pub source_id: usize,
    /// Last recorded watermark; `i64::MIN` until one has been recorded.
    pub watermark: i64,
    /// Whether the source is currently flagged as paused.
    pub is_paused: bool,
    /// When the current pause began, if it was started through `pause()`.
    pub pause_start: Option<Instant>,
    /// Sum of all completed pause durations.
    pub total_pause_time: Duration,
    /// Number of events dropped while the source was flagged as paused.
    pub events_dropped_while_paused: u64,
    /// Instant of creation or of the most recent recorded watermark.
    pub last_activity: Instant,
}

impl AlignmentSourceState {
    /// Creates the initial state: not paused, no watermark, zeroed counters.
    fn new(source_id: usize) -> Self {
        let created_at = Instant::now();
        Self {
            source_id,
            watermark: i64::MIN,
            is_paused: false,
            pause_start: None,
            total_pause_time: Duration::ZERO,
            events_dropped_while_paused: 0,
            last_activity: created_at,
        }
    }

    /// Marks the source as paused and records the start time.
    ///
    /// Does nothing when the source is already paused, so the original
    /// start time is preserved.
    fn pause(&mut self) {
        if self.is_paused {
            return;
        }
        self.is_paused = true;
        self.pause_start = Some(Instant::now());
    }

    /// Clears the paused flag and adds the finished pause to
    /// `total_pause_time`.
    ///
    /// Does nothing when the source is not paused. If the flag was set
    /// without a recorded start time, only the flag is cleared.
    fn resume(&mut self) {
        if !self.is_paused {
            return;
        }
        self.is_paused = false;
        if let Some(began) = self.pause_start.take() {
            self.total_pause_time += began.elapsed();
        }
    }
}
190
/// Counters and drift measurements collected for one alignment group.
#[derive(Debug, Clone, Default)]
pub struct AlignmentGroupMetrics {
    /// Number of times a source was paused.
    pub pause_events: u64,
    /// Number of times a source was resumed.
    pub resume_events: u64,
    /// Group-wide pause duration (per-source totals live in
    /// `AlignmentSourceState`).
    pub total_pause_time: Duration,
    /// Largest value `current_drift` has taken.
    pub max_observed_drift: Duration,
    /// Spread between the highest and lowest active watermark as of the
    /// most recent measurement.
    pub current_drift: Duration,
    /// Number of events signalled as dropped.
    pub events_dropped: u64,
    /// Number of drift warnings emitted.
    pub warnings_emitted: u64,
}

impl AlignmentGroupMetrics {
    /// Creates a metrics record with every counter and duration at zero.
    #[must_use]
    pub fn new() -> Self {
        Default::default()
    }

    /// Puts every counter and duration back to zero.
    pub fn reset(&mut self) {
        *self = Self::new();
    }
}
222
/// Instruction returned to the caller after a watermark report or a
/// periodic alignment check.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AlignmentAction {
    /// No enforcement is needed; keep processing.
    Continue,
    /// The source is too far ahead and should pause (or stay paused).
    Pause,
    /// A previously paused source may resume.
    Resume,
    /// The reported event should be dropped.
    Drop,
    /// The drift limit was exceeded but the source is left running.
    Warn {
        /// Observed drift in milliseconds.
        drift_ms: i64,
    },
}
240
241#[derive(Debug)]
285pub struct WatermarkAlignmentGroup {
286 config: AlignmentGroupConfig,
288 sources: HashMap<usize, AlignmentSourceState>,
290 min_watermark: i64,
292 max_watermark: i64,
294 last_check: Instant,
296 metrics: AlignmentGroupMetrics,
298}
299
300impl WatermarkAlignmentGroup {
301 #[must_use]
303 pub fn new(config: AlignmentGroupConfig) -> Self {
304 Self {
305 config,
306 sources: HashMap::new(),
307 min_watermark: i64::MIN,
308 max_watermark: i64::MIN,
309 last_check: Instant::now(),
310 metrics: AlignmentGroupMetrics::new(),
311 }
312 }
313
314 #[must_use]
316 pub fn group_id(&self) -> &AlignmentGroupId {
317 &self.config.group_id
318 }
319
320 #[must_use]
322 pub fn config(&self) -> &AlignmentGroupConfig {
323 &self.config
324 }
325
326 pub fn register_source(&mut self, source_id: usize) {
328 self.sources
329 .entry(source_id)
330 .or_insert_with(|| AlignmentSourceState::new(source_id));
331 }
332
333 pub fn unregister_source(&mut self, source_id: usize) {
335 self.sources.remove(&source_id);
336 self.recalculate_bounds();
337 }
338
339 pub fn report_watermark(&mut self, source_id: usize, watermark: i64) -> AlignmentAction {
343 self.sources
345 .entry(source_id)
346 .or_insert_with(|| AlignmentSourceState::new(source_id));
347
348 let is_paused = self.sources.get(&source_id).is_some_and(|s| s.is_paused);
350
351 if is_paused {
352 match self.config.enforcement_mode {
353 EnforcementMode::Pause => {
354 let can_resume = self.can_resume_with_watermark(source_id, watermark);
356 if can_resume {
357 if let Some(source) = self.sources.get_mut(&source_id) {
358 source.watermark = watermark;
359 source.last_activity = Instant::now();
360 source.resume();
361 }
362 self.metrics.resume_events += 1;
363 self.recalculate_bounds();
364 return AlignmentAction::Resume;
365 }
366 return AlignmentAction::Pause;
367 }
368 EnforcementMode::DropExcess => {
369 if let Some(source) = self.sources.get_mut(&source_id) {
370 source.events_dropped_while_paused += 1;
371 }
372 self.metrics.events_dropped += 1;
373 return AlignmentAction::Drop;
374 }
375 EnforcementMode::WarnOnly => {
376 }
378 }
379 }
380
381 if let Some(source) = self.sources.get_mut(&source_id) {
383 source.watermark = watermark;
384 source.last_activity = Instant::now();
385 }
386
387 self.recalculate_bounds();
389
390 let current_drift = if self.min_watermark == i64::MIN || self.max_watermark == i64::MIN {
392 Duration::ZERO
393 } else {
394 let drift_ms = self.max_watermark.saturating_sub(self.min_watermark).max(0);
395 #[allow(clippy::cast_sign_loss)]
397 Duration::from_millis(drift_ms as u64)
398 };
399
400 self.metrics.current_drift = current_drift;
401 if current_drift > self.metrics.max_observed_drift {
402 self.metrics.max_observed_drift = current_drift;
403 }
404
405 if watermark == self.max_watermark && current_drift > self.config.max_drift {
407 match self.config.enforcement_mode {
408 EnforcementMode::Pause => {
409 if let Some(source) = self.sources.get_mut(&source_id) {
410 source.pause();
411 }
412 self.metrics.pause_events += 1;
413 AlignmentAction::Pause
414 }
415 EnforcementMode::WarnOnly => {
416 self.metrics.warnings_emitted += 1;
417 #[allow(clippy::cast_possible_truncation)]
419 let drift_ms = current_drift.as_millis().min(i64::MAX as u128) as i64;
420 AlignmentAction::Warn { drift_ms }
421 }
422 EnforcementMode::DropExcess => {
423 if let Some(source) = self.sources.get_mut(&source_id) {
424 source.is_paused = true; source.events_dropped_while_paused += 1;
426 }
427 self.metrics.events_dropped += 1;
428 AlignmentAction::Drop
429 }
430 }
431 } else {
432 AlignmentAction::Continue
433 }
434 }
435
436 #[must_use]
438 pub fn should_resume(&self, source_id: usize) -> bool {
439 let Some(source) = self.sources.get(&source_id) else {
440 return false;
441 };
442
443 if !source.is_paused {
444 return false;
445 }
446
447 self.can_resume_with_watermark(source_id, source.watermark)
448 }
449
450 fn can_resume_with_watermark(&self, source_id: usize, watermark: i64) -> bool {
452 let min_without_source = self
454 .sources
455 .iter()
456 .filter(|(&id, s)| id != source_id && !s.is_paused && s.watermark != i64::MIN)
457 .map(|(_, s)| s.watermark)
458 .min()
459 .unwrap_or(i64::MIN);
460
461 if min_without_source == i64::MIN {
462 return true; }
464
465 let drift_if_resumed = watermark.saturating_sub(min_without_source).max(0);
466 #[allow(clippy::cast_sign_loss)]
468 let drift_duration = Duration::from_millis(drift_if_resumed as u64);
469
470 drift_duration <= self.config.max_drift
471 }
472
473 #[must_use]
475 pub fn current_drift(&self) -> Duration {
476 self.metrics.current_drift
477 }
478
479 #[must_use]
481 pub fn is_paused(&self, source_id: usize) -> bool {
482 self.sources.get(&source_id).is_some_and(|s| s.is_paused)
483 }
484
485 #[must_use]
487 pub fn min_watermark(&self) -> i64 {
488 self.min_watermark
489 }
490
491 #[must_use]
493 pub fn max_watermark(&self) -> i64 {
494 self.max_watermark
495 }
496
497 #[must_use]
499 pub fn metrics(&self) -> &AlignmentGroupMetrics {
500 &self.metrics
501 }
502
503 #[must_use]
505 pub fn source_count(&self) -> usize {
506 self.sources.len()
507 }
508
509 #[must_use]
511 pub fn paused_source_count(&self) -> usize {
512 self.sources.values().filter(|s| s.is_paused).count()
513 }
514
515 #[must_use]
517 pub fn active_source_count(&self) -> usize {
518 self.sources.values().filter(|s| !s.is_paused).count()
519 }
520
521 pub fn check_alignment(&mut self) -> Vec<(usize, AlignmentAction)> {
526 if self.last_check.elapsed() < self.config.update_interval {
527 return Vec::new();
528 }
529
530 self.last_check = Instant::now();
531 let mut actions = Vec::new();
532
533 let paused_sources: Vec<usize> = self
535 .sources
536 .iter()
537 .filter(|(_, s)| s.is_paused)
538 .map(|(&id, _)| id)
539 .collect();
540
541 for source_id in paused_sources {
542 if self.should_resume(source_id) {
543 if let Some(source) = self.sources.get_mut(&source_id) {
544 source.resume();
545 self.metrics.resume_events += 1;
546 actions.push((source_id, AlignmentAction::Resume));
547 }
548 }
549 }
550
551 if !actions.is_empty() {
553 self.recalculate_bounds();
554 }
555
556 actions
557 }
558
559 #[must_use]
561 pub fn source_state(&self, source_id: usize) -> Option<&AlignmentSourceState> {
562 self.sources.get(&source_id)
563 }
564
565 fn recalculate_bounds(&mut self) {
567 let active_watermarks: Vec<i64> = self
568 .sources
569 .values()
570 .filter(|s| !s.is_paused && s.watermark != i64::MIN)
571 .map(|s| s.watermark)
572 .collect();
573
574 if active_watermarks.is_empty() {
575 self.min_watermark = i64::MIN;
576 self.max_watermark = i64::MIN;
577 } else {
578 self.min_watermark = *active_watermarks.iter().min().unwrap();
579 self.max_watermark = *active_watermarks.iter().max().unwrap();
580 }
581 }
582}
583
/// Errors reported by alignment group management.
#[derive(Debug, thiserror::Error)]
pub enum AlignmentError {
    /// The source (by id) is not assigned to any group.
    #[error("source {0} not in any group")]
    SourceNotInGroup(usize),

    /// No group with the given identifier exists.
    #[error("group '{0}' not found")]
    GroupNotFound(String),

    /// The source is already assigned to a different group.
    #[error("source {source_id} already in group '{group_id}'")]
    SourceAlreadyInGroup {
        /// Source that was being assigned.
        source_id: usize,
        /// Group the source currently belongs to.
        group_id: String,
    },
}
604
605#[derive(Debug, Default)]
635pub struct AlignmentGroupCoordinator {
636 groups: HashMap<AlignmentGroupId, WatermarkAlignmentGroup>,
638 source_groups: HashMap<usize, AlignmentGroupId>,
640}
641
642impl AlignmentGroupCoordinator {
643 #[must_use]
645 pub fn new() -> Self {
646 Self::default()
647 }
648
649 pub fn add_group(&mut self, config: AlignmentGroupConfig) {
651 let group_id = config.group_id.clone();
652 self.groups
653 .insert(group_id, WatermarkAlignmentGroup::new(config));
654 }
655
656 pub fn remove_group(&mut self, group_id: &AlignmentGroupId) -> Option<WatermarkAlignmentGroup> {
658 self.source_groups.retain(|_, gid| gid != group_id);
660 self.groups.remove(group_id)
661 }
662
663 pub fn assign_source_to_group(
670 &mut self,
671 source_id: usize,
672 group_id: &AlignmentGroupId,
673 ) -> Result<(), AlignmentError> {
674 if let Some(existing_group) = self.source_groups.get(&source_id) {
676 if existing_group != group_id {
677 return Err(AlignmentError::SourceAlreadyInGroup {
678 source_id,
679 group_id: existing_group.0.clone(),
680 });
681 }
682 return Ok(());
684 }
685
686 let group = self
688 .groups
689 .get_mut(group_id)
690 .ok_or_else(|| AlignmentError::GroupNotFound(group_id.0.clone()))?;
691
692 group.register_source(source_id);
694 self.source_groups.insert(source_id, group_id.clone());
695
696 Ok(())
697 }
698
699 pub fn unassign_source(&mut self, source_id: usize) {
701 if let Some(group_id) = self.source_groups.remove(&source_id) {
702 if let Some(group) = self.groups.get_mut(&group_id) {
703 group.unregister_source(source_id);
704 }
705 }
706 }
707
708 pub fn report_watermark(
712 &mut self,
713 source_id: usize,
714 watermark: i64,
715 ) -> Option<AlignmentAction> {
716 let group_id = self.source_groups.get(&source_id)?;
717 let group = self.groups.get_mut(group_id)?;
718 Some(group.report_watermark(source_id, watermark))
719 }
720
721 pub fn check_all_alignments(&mut self) -> Vec<(usize, AlignmentAction)> {
725 let mut all_actions = Vec::new();
726 for group in self.groups.values_mut() {
727 all_actions.extend(group.check_alignment());
728 }
729 all_actions
730 }
731
732 #[must_use]
734 pub fn all_metrics(&self) -> HashMap<AlignmentGroupId, AlignmentGroupMetrics> {
735 self.groups
736 .iter()
737 .map(|(id, group)| (id.clone(), group.metrics().clone()))
738 .collect()
739 }
740
741 #[must_use]
743 pub fn group(&self, group_id: &AlignmentGroupId) -> Option<&WatermarkAlignmentGroup> {
744 self.groups.get(group_id)
745 }
746
747 pub fn group_mut(
749 &mut self,
750 group_id: &AlignmentGroupId,
751 ) -> Option<&mut WatermarkAlignmentGroup> {
752 self.groups.get_mut(group_id)
753 }
754
755 #[must_use]
757 pub fn source_group(&self, source_id: usize) -> Option<&AlignmentGroupId> {
758 self.source_groups.get(&source_id)
759 }
760
761 #[must_use]
763 pub fn group_count(&self) -> usize {
764 self.groups.len()
765 }
766
767 #[must_use]
769 pub fn total_source_count(&self) -> usize {
770 self.source_groups.len()
771 }
772
773 #[must_use]
775 pub fn should_resume(&self, source_id: usize) -> bool {
776 let Some(group_id) = self.source_groups.get(&source_id) else {
777 return false;
778 };
779 let Some(group) = self.groups.get(group_id) else {
780 return false;
781 };
782 group.should_resume(source_id)
783 }
784
785 #[must_use]
787 pub fn is_paused(&self, source_id: usize) -> bool {
788 let Some(group_id) = self.source_groups.get(&source_id) else {
789 return false;
790 };
791 let Some(group) = self.groups.get(group_id) else {
792 return false;
793 };
794 group.is_paused(source_id)
795 }
796}
797
/// Unit tests for the alignment group types and the coordinator.
///
/// Watermark values are in milliseconds, so `100_000` is 100 seconds.
#[cfg(test)]
mod tests {
    use super::*;

    /// The id exposes its text through `as_str` and `Display`.
    #[test]
    fn test_alignment_group_id() {
        let id = AlignmentGroupId::new("test-group");
        assert_eq!(id.as_str(), "test-group");
        assert_eq!(format!("{id}"), "test-group");
    }

    /// Every builder method overrides exactly its own field.
    #[test]
    fn test_alignment_group_config_builder() {
        let config = AlignmentGroupConfig::new("test")
            .with_max_drift(Duration::from_secs(120))
            .with_update_interval(Duration::from_millis(500))
            .with_enforcement_mode(EnforcementMode::WarnOnly);

        assert_eq!(config.group_id.as_str(), "test");
        assert_eq!(config.max_drift, Duration::from_secs(120));
        assert_eq!(config.update_interval, Duration::from_millis(500));
        assert_eq!(config.enforcement_mode, EnforcementMode::WarnOnly);
    }

    /// A lone source has no peer to drift from and is never paused.
    #[test]
    fn test_alignment_group_single_source_no_pause() {
        let config = AlignmentGroupConfig::new("test").with_max_drift(Duration::from_secs(60));
        let mut group = WatermarkAlignmentGroup::new(config);

        group.register_source(0);

        let action = group.report_watermark(0, 100_000);
        assert_eq!(action, AlignmentAction::Continue);
        assert!(!group.is_paused(0));
    }

    /// The faster of two sources is paused once it is more than 60 s ahead.
    #[test]
    fn test_alignment_group_two_sources_fast_paused() {
        // 60 second drift limit.
        let config = AlignmentGroupConfig::new("test").with_max_drift(Duration::from_secs(60));
        let mut group = WatermarkAlignmentGroup::new(config);

        group.register_source(0);
        group.register_source(1);

        // Both sources start at 0.
        group.report_watermark(0, 0);
        group.report_watermark(1, 0);

        // 50 s ahead: still within the limit.
        let action = group.report_watermark(0, 50_000);
        assert_eq!(action, AlignmentAction::Continue);
        assert!(!group.is_paused(0));

        // 70 s ahead: exceeds the limit.
        let action = group.report_watermark(0, 70_000);
        assert_eq!(action, AlignmentAction::Pause);
        assert!(group.is_paused(0));
    }

    /// A paused source becomes resumable once the slow source closes the gap.
    #[test]
    fn test_alignment_group_resume_when_slow_catches_up() {
        let config = AlignmentGroupConfig::new("test").with_max_drift(Duration::from_secs(60));
        let mut group = WatermarkAlignmentGroup::new(config);

        group.register_source(0);
        group.register_source(1);

        group.report_watermark(0, 0);
        group.report_watermark(1, 0);

        // Source 0 jumps 100 s ahead and is paused.
        group.report_watermark(0, 100_000);
        assert!(group.is_paused(0));

        // Source 1 advances to 50 s: the gap is now 50 s, within the limit.
        group.report_watermark(1, 50_000);
        assert!(group.should_resume(0));
    }

    /// `WarnOnly` reports the drift but leaves the source running.
    #[test]
    fn test_alignment_group_warn_only_mode() {
        let config = AlignmentGroupConfig::new("test")
            .with_max_drift(Duration::from_secs(60))
            .with_enforcement_mode(EnforcementMode::WarnOnly);
        let mut group = WatermarkAlignmentGroup::new(config);

        group.register_source(0);
        group.register_source(1);

        group.report_watermark(0, 0);
        group.report_watermark(1, 0);

        let action = group.report_watermark(0, 100_000);
        match action {
            AlignmentAction::Warn { drift_ms } => {
                // Drift is reported in milliseconds.
                assert_eq!(drift_ms, 100_000);
            }
            _ => panic!("Expected Warn action"),
        }
        // Not paused in this mode.
        assert!(!group.is_paused(0));
        assert_eq!(group.metrics().warnings_emitted, 1);
    }

    /// `DropExcess` answers `Drop` for the offending report and for later
    /// reports from the same source while it is still flagged.
    #[test]
    fn test_alignment_group_drop_excess_mode() {
        let config = AlignmentGroupConfig::new("test")
            .with_max_drift(Duration::from_secs(60))
            .with_enforcement_mode(EnforcementMode::DropExcess);
        let mut group = WatermarkAlignmentGroup::new(config);

        group.register_source(0);
        group.register_source(1);

        group.report_watermark(0, 0);
        group.report_watermark(1, 0);

        // First report over the limit is dropped.
        let action = group.report_watermark(0, 100_000);
        assert_eq!(action, AlignmentAction::Drop);
        assert_eq!(group.metrics().events_dropped, 1);

        // Subsequent reports are dropped as well.
        let action = group.report_watermark(0, 110_000);
        assert_eq!(action, AlignmentAction::Drop);
        assert_eq!(group.metrics().events_dropped, 2);
    }

    /// Drift is the distance between the highest and lowest watermark.
    #[test]
    fn test_alignment_group_drift_calculation() {
        // Limit high enough that nothing is paused.
        let config = AlignmentGroupConfig::new("test").with_max_drift(Duration::from_secs(300));
        let mut group = WatermarkAlignmentGroup::new(config);

        group.register_source(0);
        group.register_source(1);
        group.register_source(2);

        group.report_watermark(0, 100_000);
        group.report_watermark(1, 200_000);
        group.report_watermark(2, 150_000);

        // 200_000 - 100_000 = 100 s.
        assert_eq!(group.current_drift(), Duration::from_secs(100));
        assert_eq!(group.min_watermark(), 100_000);
        assert_eq!(group.max_watermark(), 200_000);
    }

    /// Checks the pause counter and that the paused source becomes
    /// resumable.
    ///
    /// Note: with the default 1 s update interval, the `check_alignment`
    /// call here is expected to be throttled, which is why its result is
    /// ignored and `should_resume` is asserted instead.
    #[test]
    fn test_alignment_group_metrics_accurate() {
        let config = AlignmentGroupConfig::new("test").with_max_drift(Duration::from_secs(60));
        let mut group = WatermarkAlignmentGroup::new(config);

        group.register_source(0);
        group.register_source(1);

        group.report_watermark(0, 0);
        group.report_watermark(1, 0);

        // Triggers one pause.
        group.report_watermark(0, 100_000);
        assert_eq!(group.metrics().pause_events, 1);

        // Slow source catches up to within the limit.
        group.report_watermark(1, 50_000);

        let _actions = group.check_alignment();
        assert!(group.should_resume(0));
    }

    /// Removing a source drops it from the count and from the bounds.
    #[test]
    fn test_alignment_group_unregister_source() {
        let config = AlignmentGroupConfig::new("test").with_max_drift(Duration::from_secs(60));
        let mut group = WatermarkAlignmentGroup::new(config);

        group.register_source(0);
        group.register_source(1);

        group.report_watermark(0, 100_000);
        group.report_watermark(1, 50_000);

        assert_eq!(group.source_count(), 2);

        group.unregister_source(1);
        assert_eq!(group.source_count(), 1);

        // Only source 0 remains, so both bounds equal its watermark.
        assert_eq!(group.min_watermark(), 100_000);
        assert_eq!(group.max_watermark(), 100_000);
    }

    /// `source_state` exposes the recorded watermark and pause flag.
    #[test]
    fn test_alignment_group_source_state() {
        let config = AlignmentGroupConfig::new("test").with_max_drift(Duration::from_secs(60));
        let mut group = WatermarkAlignmentGroup::new(config);

        group.register_source(0);
        group.report_watermark(0, 50_000);

        let state = group.source_state(0).expect("source exists");
        assert_eq!(state.source_id, 0);
        assert_eq!(state.watermark, 50_000);
        assert!(!state.is_paused);
    }

    /// The coordinator can hold several groups at once.
    #[test]
    fn test_coordinator_multiple_groups() {
        let mut coordinator = AlignmentGroupCoordinator::new();

        let config1 = AlignmentGroupConfig::new("group1").with_max_drift(Duration::from_secs(60));
        let config2 = AlignmentGroupConfig::new("group2").with_max_drift(Duration::from_secs(120));

        coordinator.add_group(config1);
        coordinator.add_group(config2);

        assert_eq!(coordinator.group_count(), 2);
    }

    /// Assigned sources are counted and mapped to their group.
    #[test]
    fn test_coordinator_source_assignment() {
        let mut coordinator = AlignmentGroupCoordinator::new();

        let config =
            AlignmentGroupConfig::new("test-group").with_max_drift(Duration::from_secs(60));
        coordinator.add_group(config);

        let group_id = AlignmentGroupId::new("test-group");

        coordinator
            .assign_source_to_group(0, &group_id)
            .expect("should succeed");
        coordinator
            .assign_source_to_group(1, &group_id)
            .expect("should succeed");

        assert_eq!(coordinator.total_source_count(), 2);
        assert_eq!(coordinator.source_group(0), Some(&group_id));
    }

    /// A source cannot join a second group, but re-joining its own group
    /// is accepted.
    #[test]
    fn test_coordinator_source_already_in_group() {
        let mut coordinator = AlignmentGroupCoordinator::new();

        let config1 = AlignmentGroupConfig::new("group1");
        let config2 = AlignmentGroupConfig::new("group2");
        coordinator.add_group(config1);
        coordinator.add_group(config2);

        let group1 = AlignmentGroupId::new("group1");
        let group2 = AlignmentGroupId::new("group2");

        coordinator
            .assign_source_to_group(0, &group1)
            .expect("should succeed");

        // Different group: rejected.
        let result = coordinator.assign_source_to_group(0, &group2);
        assert!(matches!(
            result,
            Err(AlignmentError::SourceAlreadyInGroup { .. })
        ));

        // Same group again: idempotent.
        let result = coordinator.assign_source_to_group(0, &group1);
        assert!(result.is_ok());
    }

    /// Assigning to an unknown group fails with `GroupNotFound`.
    #[test]
    fn test_coordinator_group_not_found() {
        let mut coordinator = AlignmentGroupCoordinator::new();

        let result = coordinator.assign_source_to_group(0, &AlignmentGroupId::new("nonexistent"));
        assert!(matches!(result, Err(AlignmentError::GroupNotFound(_))));
    }

    /// Reports are routed to the owning group; unknown sources yield `None`.
    #[test]
    fn test_coordinator_report_watermark() {
        let mut coordinator = AlignmentGroupCoordinator::new();

        let config =
            AlignmentGroupConfig::new("test-group").with_max_drift(Duration::from_secs(60));
        coordinator.add_group(config);

        let group_id = AlignmentGroupId::new("test-group");
        coordinator.assign_source_to_group(0, &group_id).unwrap();
        coordinator.assign_source_to_group(1, &group_id).unwrap();

        let action = coordinator.report_watermark(0, 0);
        assert_eq!(action, Some(AlignmentAction::Continue));

        let action = coordinator.report_watermark(1, 0);
        assert_eq!(action, Some(AlignmentAction::Continue));

        // Source 99 was never assigned.
        let action = coordinator.report_watermark(99, 0);
        assert_eq!(action, None);
    }

    /// Unassigning removes the source from the assignment table.
    #[test]
    fn test_coordinator_unassign_source() {
        let mut coordinator = AlignmentGroupCoordinator::new();

        let config = AlignmentGroupConfig::new("test-group");
        coordinator.add_group(config);

        let group_id = AlignmentGroupId::new("test-group");
        coordinator.assign_source_to_group(0, &group_id).unwrap();

        assert_eq!(coordinator.total_source_count(), 1);

        coordinator.unassign_source(0);
        assert_eq!(coordinator.total_source_count(), 0);
        assert_eq!(coordinator.source_group(0), None);
    }

    /// Removing a group also removes the assignments of its sources.
    #[test]
    fn test_coordinator_remove_group() {
        let mut coordinator = AlignmentGroupCoordinator::new();

        let config = AlignmentGroupConfig::new("test-group");
        coordinator.add_group(config);

        let group_id = AlignmentGroupId::new("test-group");
        coordinator.assign_source_to_group(0, &group_id).unwrap();
        coordinator.assign_source_to_group(1, &group_id).unwrap();

        assert_eq!(coordinator.group_count(), 1);
        assert_eq!(coordinator.total_source_count(), 2);

        coordinator.remove_group(&group_id);

        assert_eq!(coordinator.group_count(), 0);
        assert_eq!(coordinator.total_source_count(), 0);
    }

    /// The coordinator reflects the pause state kept by the owning group.
    #[test]
    fn test_coordinator_is_paused() {
        let mut coordinator = AlignmentGroupCoordinator::new();

        let config =
            AlignmentGroupConfig::new("test-group").with_max_drift(Duration::from_secs(60));
        coordinator.add_group(config);

        let group_id = AlignmentGroupId::new("test-group");
        coordinator.assign_source_to_group(0, &group_id).unwrap();
        coordinator.assign_source_to_group(1, &group_id).unwrap();

        coordinator.report_watermark(0, 0);
        coordinator.report_watermark(1, 0);

        // Source 0 runs 100 s ahead and is paused; source 1 is unaffected.
        coordinator.report_watermark(0, 100_000);
        assert!(coordinator.is_paused(0));
        assert!(!coordinator.is_paused(1));
    }

    /// `all_metrics` returns one entry per group.
    #[test]
    fn test_coordinator_all_metrics() {
        let mut coordinator = AlignmentGroupCoordinator::new();

        let config1 = AlignmentGroupConfig::new("group1");
        let config2 = AlignmentGroupConfig::new("group2");
        coordinator.add_group(config1);
        coordinator.add_group(config2);

        let metrics = coordinator.all_metrics();
        assert_eq!(metrics.len(), 2);
        assert!(metrics.contains_key(&AlignmentGroupId::new("group1")));
        assert!(metrics.contains_key(&AlignmentGroupId::new("group2")));
    }

    /// A fresh group has no sources and sentinel bounds.
    #[test]
    fn test_alignment_group_empty() {
        let config = AlignmentGroupConfig::new("test");
        let group = WatermarkAlignmentGroup::new(config);

        assert_eq!(group.source_count(), 0);
        assert_eq!(group.min_watermark(), i64::MIN);
        assert_eq!(group.max_watermark(), i64::MIN);
    }

    /// Only the source that runs ahead is paused; the slow one keeps going.
    ///
    /// Note: despite the name, this does not drive every source into the
    /// paused state.
    #[test]
    fn test_alignment_group_all_paused() {
        let config = AlignmentGroupConfig::new("test").with_max_drift(Duration::from_secs(10));
        let mut group = WatermarkAlignmentGroup::new(config);

        group.register_source(0);
        group.register_source(1);

        group.report_watermark(0, 0);
        group.report_watermark(1, 0);

        // Source 0 exceeds the 10 s limit.
        group.report_watermark(0, 100_000);
        assert!(group.is_paused(0));

        // Source 1 holds the minimum and is left running.
        assert!(!group.is_paused(1));
    }

    /// Negative watermarks produce correct bounds and drift.
    #[test]
    fn test_alignment_group_negative_watermarks() {
        let config = AlignmentGroupConfig::new("test").with_max_drift(Duration::from_secs(60));
        let mut group = WatermarkAlignmentGroup::new(config);

        group.register_source(0);
        group.register_source(1);

        group.report_watermark(0, -100_000);
        group.report_watermark(1, -50_000);

        assert_eq!(group.min_watermark(), -100_000);
        assert_eq!(group.max_watermark(), -50_000);
        assert_eq!(group.current_drift(), Duration::from_secs(50));
    }

    /// `pause`/`resume` toggle the flag, manage `pause_start`, and
    /// accumulate the paused time.
    #[test]
    fn test_alignment_source_state_pause_resume_tracking() {
        let mut state = AlignmentSourceState::new(0);

        assert!(!state.is_paused);
        assert!(state.pause_start.is_none());

        state.pause();
        assert!(state.is_paused);
        assert!(state.pause_start.is_some());

        // Let a measurable amount of time pass while paused.
        std::thread::sleep(Duration::from_millis(1));

        state.resume();
        assert!(!state.is_paused);
        assert!(state.pause_start.is_none());
        assert!(state.total_pause_time > Duration::ZERO);
    }

    /// `check_alignment` is throttled by `update_interval` and resumes the
    /// paused source once the interval has elapsed.
    #[test]
    fn test_alignment_group_check_alignment_interval() {
        let config = AlignmentGroupConfig::new("test")
            .with_max_drift(Duration::from_secs(60))
            .with_update_interval(Duration::from_millis(100));
        let mut group = WatermarkAlignmentGroup::new(config);

        group.register_source(0);
        group.register_source(1);

        group.report_watermark(0, 0);
        group.report_watermark(1, 0);
        // Source 0 is paused.
        group.report_watermark(0, 100_000);

        // Called before the 100 ms interval has elapsed: nothing happens.
        let immediate_actions = group.check_alignment();
        assert!(immediate_actions.is_empty());

        // Wait out the interval.
        std::thread::sleep(Duration::from_millis(110));

        // Slow source catches up, so the check can resume source 0.
        group.report_watermark(1, 50_000);
        let actions = group.check_alignment();
        assert!(actions
            .iter()
            .any(|(id, action)| *id == 0 && *action == AlignmentAction::Resume));
    }
}