Skip to main content

laminar_core/time/
alignment_group.rs

1//! # Watermark Alignment Groups
2//!
3//! Coordinates watermark progress across multiple sources to prevent unbounded state growth
4//! when sources have different processing speeds.
5//!
6//! ## Problem
7//!
8//! When sources have different processing speeds, fast sources can cause:
9//! - Excessive state growth (buffering data waiting for slow sources)
10//! - Memory pressure on downstream operators
11//! - Uneven resource utilization
12//!
13//! ## Solution
14//!
15//! Alignment groups enforce a maximum watermark drift between sources. When a source
16//! gets too far ahead, it is paused until slower sources catch up.
17//!
18//! ## Example
19//!
20//! ```rust
21//! use laminar_core::time::{
22//!     WatermarkAlignmentGroup, AlignmentGroupConfig, AlignmentGroupId,
23//!     EnforcementMode, AlignmentAction,
24//! };
25//! use std::time::Duration;
26//!
27//! let config = AlignmentGroupConfig {
28//!     group_id: AlignmentGroupId("orders-payments".to_string()),
29//!     max_drift: Duration::from_secs(300), // 5 minutes
30//!     update_interval: Duration::from_secs(1),
31//!     enforcement_mode: EnforcementMode::Pause,
32//! };
33//!
34//! let mut group = WatermarkAlignmentGroup::new(config);
35//!
36//! // Register sources
37//! group.register_source(0); // orders stream
38//! group.register_source(1); // payments stream
39//!
40//! // Initialize both sources with watermarks
41//! group.report_watermark(0, 0);
42//! group.report_watermark(1, 0);
43//!
44//! // Report watermarks
//! let action = group.report_watermark(0, 10_000); // orders watermark at 10 s
46//! assert_eq!(action, AlignmentAction::Continue);
47//!
48//! // Source 0 jumps far ahead (>300s drift from source 1 at 0)
//! let action = group.report_watermark(0, 310_000); // orders watermark at 310 s (5 min 10 s)
50//! assert_eq!(action, AlignmentAction::Pause); // Too far ahead of source 1!
51//! ```
52
53use std::collections::HashMap;
54use std::time::{Duration, Instant};
55
/// Name that uniquely identifies a watermark alignment group.
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub struct AlignmentGroupId(pub String);

impl AlignmentGroupId {
    /// Builds a group ID from anything convertible into a `String`.
    #[must_use]
    pub fn new(id: impl Into<String>) -> Self {
        let name: String = id.into();
        Self(name)
    }

    /// Borrows the identifier text.
    #[must_use]
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }
}

impl std::fmt::Display for AlignmentGroupId {
    /// Writes the raw identifier text, without any decoration.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.as_str())
    }
}
79
80/// Configuration for a watermark alignment group.
81#[derive(Debug, Clone)]
82pub struct AlignmentGroupConfig {
83    /// Group identifier.
84    pub group_id: AlignmentGroupId,
85    /// Maximum allowed drift between fastest and slowest source.
86    pub max_drift: Duration,
87    /// How often to check alignment (wall clock).
88    pub update_interval: Duration,
89    /// Whether to pause sources or just emit warnings.
90    pub enforcement_mode: EnforcementMode,
91}
92
93impl AlignmentGroupConfig {
94    /// Creates a new configuration with defaults.
95    #[must_use]
96    pub fn new(group_id: impl Into<String>) -> Self {
97        Self {
98            group_id: AlignmentGroupId::new(group_id),
99            max_drift: Duration::from_secs(300), // 5 minutes default
100            update_interval: Duration::from_secs(1),
101            enforcement_mode: EnforcementMode::Pause,
102        }
103    }
104
105    /// Sets the maximum allowed drift.
106    #[must_use]
107    pub fn with_max_drift(mut self, max_drift: Duration) -> Self {
108        self.max_drift = max_drift;
109        self
110    }
111
112    /// Sets the update interval.
113    #[must_use]
114    pub fn with_update_interval(mut self, interval: Duration) -> Self {
115        self.update_interval = interval;
116        self
117    }
118
119    /// Sets the enforcement mode.
120    #[must_use]
121    pub fn with_enforcement_mode(mut self, mode: EnforcementMode) -> Self {
122        self.enforcement_mode = mode;
123        self
124    }
125}
126
/// Enforcement mode for alignment groups.
///
/// Selects how a group reacts when a source's watermark runs further ahead
/// than the configured maximum drift.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum EnforcementMode {
    /// Pause fast sources (recommended for production). This is the default.
    #[default]
    Pause,
    /// Emit warnings but don't pause (for monitoring).
    WarnOnly,
    /// Drop events from fast sources that exceed drift.
    DropExcess,
}
138
/// Per-source bookkeeping kept by an alignment group.
#[derive(Debug, Clone)]
pub struct AlignmentSourceState {
    /// Identifier of the source this record describes.
    pub source_id: usize,
    /// Latest watermark in milliseconds since epoch; starts at `i64::MIN`.
    pub watermark: i64,
    /// `true` while the source is flagged as paused.
    pub is_paused: bool,
    /// Instant at which the current pause began, if one is being timed.
    pub pause_start: Option<Instant>,
    /// Accumulated duration of all completed, timed pauses.
    pub total_pause_time: Duration,
    /// Number of events dropped while the source was flagged as paused
    /// (`DropExcess` mode).
    pub events_dropped_while_paused: u64,
    /// Instant of the most recent recorded activity for this source.
    pub last_activity: Instant,
}

impl AlignmentSourceState {
    /// Builds the initial, unpaused state for `source_id` with no watermark yet.
    fn new(source_id: usize) -> Self {
        let created_at = Instant::now();
        Self {
            source_id,
            watermark: i64::MIN,
            is_paused: false,
            pause_start: None,
            total_pause_time: Duration::ZERO,
            events_dropped_while_paused: 0,
            last_activity: created_at,
        }
    }

    /// Flags the source as paused and starts timing the pause.
    ///
    /// Does nothing when the source is already paused, so the original start
    /// time is kept.
    fn pause(&mut self) {
        if self.is_paused {
            return;
        }
        self.pause_start = Some(Instant::now());
        self.is_paused = true;
    }

    /// Clears the paused flag and adds the elapsed pause to the running total.
    ///
    /// Does nothing when the source is not paused. A pause without a recorded
    /// start time contributes no duration.
    fn resume(&mut self) {
        if !self.is_paused {
            return;
        }
        self.is_paused = false;
        let Some(started) = self.pause_start.take() else {
            return;
        };
        self.total_pause_time += started.elapsed();
    }
}
190
/// Counters and gauges describing the behaviour of one alignment group.
#[derive(Debug, Clone, Default)]
pub struct AlignmentGroupMetrics {
    /// How many times a source was paused.
    pub pause_events: u64,
    /// How many times a source was resumed.
    pub resume_events: u64,
    /// Pause time summed over all sources.
    pub total_pause_time: Duration,
    /// Largest drift seen so far.
    pub max_observed_drift: Duration,
    /// Most recently computed drift.
    pub current_drift: Duration,
    /// Number of dropped events (`DropExcess` mode).
    pub events_dropped: u64,
    /// Number of warnings emitted (`WarnOnly` mode).
    pub warnings_emitted: u64,
}

impl AlignmentGroupMetrics {
    /// Builds a metrics record with every counter and duration at zero.
    #[must_use]
    pub fn new() -> Self {
        <Self as Default>::default()
    }

    /// Puts every counter and duration back to zero.
    pub fn reset(&mut self) {
        // Swapping in the default value and discarding the old one zeroes everything.
        let _previous = std::mem::take(self);
    }
}
222
/// Action to take for a source based on alignment.
///
/// Produced by watermark reports and by the periodic alignment check.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AlignmentAction {
    /// Continue processing normally.
    Continue,
    /// Pause this source (too far ahead).
    Pause,
    /// Resume this source (caught up).
    Resume,
    /// Drop this event (`DropExcess` mode).
    Drop,
    /// Warning only (`WarnOnly` mode).
    Warn {
        /// Current drift in milliseconds.
        drift_ms: i64,
    },
}
240
/// Manages watermark alignment across sources in a group.
///
/// Tracks watermarks from multiple sources and enforces that no source
/// gets too far ahead of others. When a source exceeds the maximum drift,
/// it is paused (or warned/dropped depending on enforcement mode) until
/// slower sources catch up.
///
/// Watermarks are `i64` milliseconds; `i64::MIN` is used as the
/// "no watermark yet" sentinel for both sources and the group bounds.
///
/// # Example
///
/// ```rust
/// use laminar_core::time::{
///     WatermarkAlignmentGroup, AlignmentGroupConfig, AlignmentGroupId,
///     EnforcementMode, AlignmentAction,
/// };
/// use std::time::Duration;
///
/// let config = AlignmentGroupConfig {
///     group_id: AlignmentGroupId("join-group".to_string()),
///     max_drift: Duration::from_secs(60), // 1 minute
///     update_interval: Duration::from_millis(100),
///     enforcement_mode: EnforcementMode::Pause,
/// };
///
/// let mut group = WatermarkAlignmentGroup::new(config);
/// group.register_source(0);
/// group.register_source(1);
///
/// // Both sources start at 0
/// group.report_watermark(0, 0);
/// group.report_watermark(1, 0);
///
/// // Source 0 advances within limit
/// let action = group.report_watermark(0, 50_000); // 50 seconds
/// assert_eq!(action, AlignmentAction::Continue);
///
/// // Source 0 advances beyond limit (>60s drift)
/// let action = group.report_watermark(0, 70_000); // 70 seconds
/// assert_eq!(action, AlignmentAction::Pause);
///
/// // Source 1 catches up
/// group.report_watermark(1, 30_000); // 30 seconds
/// assert!(group.should_resume(0)); // Drift now 40s < 60s
/// ```
#[derive(Debug)]
pub struct WatermarkAlignmentGroup {
    /// Configuration.
    config: AlignmentGroupConfig,
    /// Per-source state, keyed by source ID.
    sources: HashMap<usize, AlignmentSourceState>,
    /// Current minimum watermark in the group (`i64::MIN` when there is none).
    min_watermark: i64,
    /// Current maximum watermark in the group (`i64::MIN` when there is none).
    max_watermark: i64,
    /// Last alignment check time (wall clock).
    last_check: Instant,
    /// Metrics.
    metrics: AlignmentGroupMetrics,
}
299
300impl WatermarkAlignmentGroup {
301    /// Creates a new alignment group.
302    #[must_use]
303    pub fn new(config: AlignmentGroupConfig) -> Self {
304        Self {
305            config,
306            sources: HashMap::new(),
307            min_watermark: i64::MIN,
308            max_watermark: i64::MIN,
309            last_check: Instant::now(),
310            metrics: AlignmentGroupMetrics::new(),
311        }
312    }
313
314    /// Returns the group ID.
315    #[must_use]
316    pub fn group_id(&self) -> &AlignmentGroupId {
317        &self.config.group_id
318    }
319
320    /// Returns the configuration.
321    #[must_use]
322    pub fn config(&self) -> &AlignmentGroupConfig {
323        &self.config
324    }
325
326    /// Registers a source with this alignment group.
327    pub fn register_source(&mut self, source_id: usize) {
328        self.sources
329            .entry(source_id)
330            .or_insert_with(|| AlignmentSourceState::new(source_id));
331    }
332
333    /// Removes a source from this alignment group.
334    pub fn unregister_source(&mut self, source_id: usize) {
335        self.sources.remove(&source_id);
336        self.recalculate_bounds();
337    }
338
339    /// Reports a watermark update from a source.
340    ///
341    /// Returns the action the source should take.
342    pub fn report_watermark(&mut self, source_id: usize, watermark: i64) -> AlignmentAction {
343        // Ensure source exists
344        self.sources
345            .entry(source_id)
346            .or_insert_with(|| AlignmentSourceState::new(source_id));
347
348        // Check if source is paused and handle accordingly
349        let is_paused = self.sources.get(&source_id).is_some_and(|s| s.is_paused);
350
351        if is_paused {
352            match self.config.enforcement_mode {
353                EnforcementMode::Pause => {
354                    // Check if we can resume (need to check before mutating)
355                    let can_resume = self.can_resume_with_watermark(source_id, watermark);
356                    if can_resume {
357                        if let Some(source) = self.sources.get_mut(&source_id) {
358                            source.watermark = watermark;
359                            source.last_activity = Instant::now();
360                            source.resume();
361                        }
362                        self.metrics.resume_events += 1;
363                        self.recalculate_bounds();
364                        return AlignmentAction::Resume;
365                    }
366                    return AlignmentAction::Pause;
367                }
368                EnforcementMode::DropExcess => {
369                    if let Some(source) = self.sources.get_mut(&source_id) {
370                        source.events_dropped_while_paused += 1;
371                    }
372                    self.metrics.events_dropped += 1;
373                    return AlignmentAction::Drop;
374                }
375                EnforcementMode::WarnOnly => {
376                    // WarnOnly doesn't actually pause, so continue to update watermark
377                }
378            }
379        }
380
381        // Update source watermark
382        if let Some(source) = self.sources.get_mut(&source_id) {
383            source.watermark = watermark;
384            source.last_activity = Instant::now();
385        }
386
387        // Recalculate min/max
388        self.recalculate_bounds();
389
390        // Calculate current drift
391        let current_drift = if self.min_watermark == i64::MIN || self.max_watermark == i64::MIN {
392            Duration::ZERO
393        } else {
394            let drift_ms = self.max_watermark.saturating_sub(self.min_watermark).max(0);
395            // SAFETY: drift_ms is guaranteed to be >= 0 due to max(0)
396            #[allow(clippy::cast_sign_loss)]
397            Duration::from_millis(drift_ms as u64)
398        };
399
400        self.metrics.current_drift = current_drift;
401        if current_drift > self.metrics.max_observed_drift {
402            self.metrics.max_observed_drift = current_drift;
403        }
404
405        // Check if this source is the one causing excessive drift
406        if watermark == self.max_watermark && current_drift > self.config.max_drift {
407            match self.config.enforcement_mode {
408                EnforcementMode::Pause => {
409                    if let Some(source) = self.sources.get_mut(&source_id) {
410                        source.pause();
411                    }
412                    self.metrics.pause_events += 1;
413                    AlignmentAction::Pause
414                }
415                EnforcementMode::WarnOnly => {
416                    self.metrics.warnings_emitted += 1;
417                    // Cap drift_ms to i64::MAX for extremely large drifts
418                    #[allow(clippy::cast_possible_truncation)]
419                    let drift_ms = current_drift.as_millis().min(i64::MAX as u128) as i64;
420                    AlignmentAction::Warn { drift_ms }
421                }
422                EnforcementMode::DropExcess => {
423                    if let Some(source) = self.sources.get_mut(&source_id) {
424                        source.is_paused = true; // Mark as paused for tracking
425                        source.events_dropped_while_paused += 1;
426                    }
427                    self.metrics.events_dropped += 1;
428                    AlignmentAction::Drop
429                }
430            }
431        } else {
432            AlignmentAction::Continue
433        }
434    }
435
436    /// Checks if a paused source should resume.
437    #[must_use]
438    pub fn should_resume(&self, source_id: usize) -> bool {
439        let Some(source) = self.sources.get(&source_id) else {
440            return false;
441        };
442
443        if !source.is_paused {
444            return false;
445        }
446
447        self.can_resume_with_watermark(source_id, source.watermark)
448    }
449
450    /// Checks if a source can resume with a given watermark.
451    fn can_resume_with_watermark(&self, source_id: usize, watermark: i64) -> bool {
452        // Calculate what the min would be without this source
453        let min_without_source = self
454            .sources
455            .iter()
456            .filter(|(&id, s)| id != source_id && !s.is_paused && s.watermark != i64::MIN)
457            .map(|(_, s)| s.watermark)
458            .min()
459            .unwrap_or(i64::MIN);
460
461        if min_without_source == i64::MIN {
462            return true; // No other active sources
463        }
464
465        let drift_if_resumed = watermark.saturating_sub(min_without_source).max(0);
466        // SAFETY: drift_if_resumed is guaranteed to be >= 0 due to max(0)
467        #[allow(clippy::cast_sign_loss)]
468        let drift_duration = Duration::from_millis(drift_if_resumed as u64);
469
470        drift_duration <= self.config.max_drift
471    }
472
473    /// Returns the current drift (max - min watermark).
474    #[must_use]
475    pub fn current_drift(&self) -> Duration {
476        self.metrics.current_drift
477    }
478
479    /// Returns whether a specific source is paused.
480    #[must_use]
481    pub fn is_paused(&self, source_id: usize) -> bool {
482        self.sources.get(&source_id).is_some_and(|s| s.is_paused)
483    }
484
485    /// Returns the minimum watermark in the group.
486    #[must_use]
487    pub fn min_watermark(&self) -> i64 {
488        self.min_watermark
489    }
490
491    /// Returns the maximum watermark in the group.
492    #[must_use]
493    pub fn max_watermark(&self) -> i64 {
494        self.max_watermark
495    }
496
497    /// Returns metrics for this group.
498    #[must_use]
499    pub fn metrics(&self) -> &AlignmentGroupMetrics {
500        &self.metrics
501    }
502
503    /// Returns the number of registered sources.
504    #[must_use]
505    pub fn source_count(&self) -> usize {
506        self.sources.len()
507    }
508
509    /// Returns the number of paused sources.
510    #[must_use]
511    pub fn paused_source_count(&self) -> usize {
512        self.sources.values().filter(|s| s.is_paused).count()
513    }
514
515    /// Returns the number of active (non-paused) sources.
516    #[must_use]
517    pub fn active_source_count(&self) -> usize {
518        self.sources.values().filter(|s| !s.is_paused).count()
519    }
520
521    /// Performs periodic alignment check.
522    ///
523    /// Should be called from Ring 1 at the configured `update_interval`.
524    /// Returns list of (`source_id`, action) pairs.
525    pub fn check_alignment(&mut self) -> Vec<(usize, AlignmentAction)> {
526        if self.last_check.elapsed() < self.config.update_interval {
527            return Vec::new();
528        }
529
530        self.last_check = Instant::now();
531        let mut actions = Vec::new();
532
533        // Check each paused source to see if it can resume
534        let paused_sources: Vec<usize> = self
535            .sources
536            .iter()
537            .filter(|(_, s)| s.is_paused)
538            .map(|(&id, _)| id)
539            .collect();
540
541        for source_id in paused_sources {
542            if self.should_resume(source_id) {
543                if let Some(source) = self.sources.get_mut(&source_id) {
544                    source.resume();
545                    self.metrics.resume_events += 1;
546                    actions.push((source_id, AlignmentAction::Resume));
547                }
548            }
549        }
550
551        // Recalculate bounds after resumptions
552        if !actions.is_empty() {
553            self.recalculate_bounds();
554        }
555
556        actions
557    }
558
559    /// Returns the state of a specific source.
560    #[must_use]
561    pub fn source_state(&self, source_id: usize) -> Option<&AlignmentSourceState> {
562        self.sources.get(&source_id)
563    }
564
565    /// Recalculates min/max watermarks from active sources.
566    fn recalculate_bounds(&mut self) {
567        let active_watermarks: Vec<i64> = self
568            .sources
569            .values()
570            .filter(|s| !s.is_paused && s.watermark != i64::MIN)
571            .map(|s| s.watermark)
572            .collect();
573
574        if active_watermarks.is_empty() {
575            self.min_watermark = i64::MIN;
576            self.max_watermark = i64::MIN;
577        } else {
578            self.min_watermark = *active_watermarks.iter().min().unwrap();
579            self.max_watermark = *active_watermarks.iter().max().unwrap();
580        }
581    }
582}
583
/// Error type for alignment group operations.
///
/// The `Display` text of each variant comes from its `#[error(...)]` attribute.
#[derive(Debug, thiserror::Error)]
pub enum AlignmentError {
    /// Source not registered in any group.
    #[error("source {0} not in any group")]
    SourceNotInGroup(usize),

    /// Group not found. Carries the requested group ID.
    #[error("group '{0}' not found")]
    GroupNotFound(String),

    /// Source already assigned to another group.
    #[error("source {source_id} already in group '{group_id}'")]
    SourceAlreadyInGroup {
        /// The source ID.
        source_id: usize,
        /// The ID of the group the source already belongs to.
        group_id: String,
    },
}
604
/// Manages multiple alignment groups.
///
/// Provides a single coordination point for all alignment groups in the system.
/// Routes watermark updates to the appropriate group based on source assignment.
/// A source belongs to at most one group at a time.
///
/// # Example
///
/// ```rust
/// use laminar_core::time::{
///     AlignmentGroupCoordinator, AlignmentGroupConfig, AlignmentGroupId,
///     EnforcementMode, AlignmentAction,
/// };
/// use std::time::Duration;
///
/// let mut coordinator = AlignmentGroupCoordinator::new();
///
/// // Create a group for orders-payments join
/// let config = AlignmentGroupConfig::new("orders-payments")
///     .with_max_drift(Duration::from_secs(300));
/// coordinator.add_group(config);
///
/// // Assign sources to the group
/// coordinator.assign_source_to_group(0, &AlignmentGroupId::new("orders-payments")).unwrap();
/// coordinator.assign_source_to_group(1, &AlignmentGroupId::new("orders-payments")).unwrap();
///
/// // Report watermarks
/// let action = coordinator.report_watermark(0, 10_000);
/// assert_eq!(action, Some(AlignmentAction::Continue));
/// ```
#[derive(Debug, Default)]
pub struct AlignmentGroupCoordinator {
    /// Groups by ID.
    groups: HashMap<AlignmentGroupId, WatermarkAlignmentGroup>,
    /// Source to group mapping (a source can be in one group).
    source_groups: HashMap<usize, AlignmentGroupId>,
}
641
642impl AlignmentGroupCoordinator {
643    /// Creates a new coordinator.
644    #[must_use]
645    pub fn new() -> Self {
646        Self::default()
647    }
648
649    /// Adds an alignment group.
650    pub fn add_group(&mut self, config: AlignmentGroupConfig) {
651        let group_id = config.group_id.clone();
652        self.groups
653            .insert(group_id, WatermarkAlignmentGroup::new(config));
654    }
655
656    /// Removes an alignment group.
657    pub fn remove_group(&mut self, group_id: &AlignmentGroupId) -> Option<WatermarkAlignmentGroup> {
658        // Remove source mappings for this group
659        self.source_groups.retain(|_, gid| gid != group_id);
660        self.groups.remove(group_id)
661    }
662
663    /// Assigns a source to a group.
664    ///
665    /// # Errors
666    ///
667    /// Returns an error if the group doesn't exist or the source is already
668    /// assigned to another group.
669    pub fn assign_source_to_group(
670        &mut self,
671        source_id: usize,
672        group_id: &AlignmentGroupId,
673    ) -> Result<(), AlignmentError> {
674        // Check if source is already in a group
675        if let Some(existing_group) = self.source_groups.get(&source_id) {
676            if existing_group != group_id {
677                return Err(AlignmentError::SourceAlreadyInGroup {
678                    source_id,
679                    group_id: existing_group.0.clone(),
680                });
681            }
682            // Already in this group, nothing to do
683            return Ok(());
684        }
685
686        // Check if group exists
687        let group = self
688            .groups
689            .get_mut(group_id)
690            .ok_or_else(|| AlignmentError::GroupNotFound(group_id.0.clone()))?;
691
692        // Register source with group
693        group.register_source(source_id);
694        self.source_groups.insert(source_id, group_id.clone());
695
696        Ok(())
697    }
698
699    /// Removes a source from its group.
700    pub fn unassign_source(&mut self, source_id: usize) {
701        if let Some(group_id) = self.source_groups.remove(&source_id) {
702            if let Some(group) = self.groups.get_mut(&group_id) {
703                group.unregister_source(source_id);
704            }
705        }
706    }
707
708    /// Reports a watermark update.
709    ///
710    /// Returns the action for the source, or `None` if source not in any group.
711    pub fn report_watermark(
712        &mut self,
713        source_id: usize,
714        watermark: i64,
715    ) -> Option<AlignmentAction> {
716        let group_id = self.source_groups.get(&source_id)?;
717        let group = self.groups.get_mut(group_id)?;
718        Some(group.report_watermark(source_id, watermark))
719    }
720
721    /// Checks alignment for all groups.
722    ///
723    /// Returns all resume/pause actions across all groups.
724    pub fn check_all_alignments(&mut self) -> Vec<(usize, AlignmentAction)> {
725        let mut all_actions = Vec::new();
726        for group in self.groups.values_mut() {
727            all_actions.extend(group.check_alignment());
728        }
729        all_actions
730    }
731
732    /// Returns metrics for all groups.
733    #[must_use]
734    pub fn all_metrics(&self) -> HashMap<AlignmentGroupId, AlignmentGroupMetrics> {
735        self.groups
736            .iter()
737            .map(|(id, group)| (id.clone(), group.metrics().clone()))
738            .collect()
739    }
740
741    /// Returns a reference to a specific group.
742    #[must_use]
743    pub fn group(&self, group_id: &AlignmentGroupId) -> Option<&WatermarkAlignmentGroup> {
744        self.groups.get(group_id)
745    }
746
747    /// Returns a mutable reference to a specific group.
748    pub fn group_mut(
749        &mut self,
750        group_id: &AlignmentGroupId,
751    ) -> Option<&mut WatermarkAlignmentGroup> {
752        self.groups.get_mut(group_id)
753    }
754
755    /// Returns the group ID for a source.
756    #[must_use]
757    pub fn source_group(&self, source_id: usize) -> Option<&AlignmentGroupId> {
758        self.source_groups.get(&source_id)
759    }
760
761    /// Returns the number of groups.
762    #[must_use]
763    pub fn group_count(&self) -> usize {
764        self.groups.len()
765    }
766
767    /// Returns the total number of sources across all groups.
768    #[must_use]
769    pub fn total_source_count(&self) -> usize {
770        self.source_groups.len()
771    }
772
773    /// Checks if a source should resume.
774    #[must_use]
775    pub fn should_resume(&self, source_id: usize) -> bool {
776        let Some(group_id) = self.source_groups.get(&source_id) else {
777            return false;
778        };
779        let Some(group) = self.groups.get(group_id) else {
780            return false;
781        };
782        group.should_resume(source_id)
783    }
784
785    /// Checks if a source is paused.
786    #[must_use]
787    pub fn is_paused(&self, source_id: usize) -> bool {
788        let Some(group_id) = self.source_groups.get(&source_id) else {
789            return false;
790        };
791        let Some(group) = self.groups.get(group_id) else {
792            return false;
793        };
794        group.is_paused(source_id)
795    }
796}
797
798#[cfg(test)]
799mod tests {
800    use super::*;
801
802    #[test]
803    fn test_alignment_group_id() {
804        let id = AlignmentGroupId::new("test-group");
805        assert_eq!(id.as_str(), "test-group");
806        assert_eq!(format!("{id}"), "test-group");
807    }
808
809    #[test]
810    fn test_alignment_group_config_builder() {
811        let config = AlignmentGroupConfig::new("test")
812            .with_max_drift(Duration::from_secs(120))
813            .with_update_interval(Duration::from_millis(500))
814            .with_enforcement_mode(EnforcementMode::WarnOnly);
815
816        assert_eq!(config.group_id.as_str(), "test");
817        assert_eq!(config.max_drift, Duration::from_secs(120));
818        assert_eq!(config.update_interval, Duration::from_millis(500));
819        assert_eq!(config.enforcement_mode, EnforcementMode::WarnOnly);
820    }
821
822    #[test]
823    fn test_alignment_group_single_source_no_pause() {
824        let config = AlignmentGroupConfig::new("test").with_max_drift(Duration::from_secs(60));
825        let mut group = WatermarkAlignmentGroup::new(config);
826
827        group.register_source(0);
828
829        // Single source should never be paused
830        let action = group.report_watermark(0, 100_000);
831        assert_eq!(action, AlignmentAction::Continue);
832        assert!(!group.is_paused(0));
833    }
834
835    #[test]
836    fn test_alignment_group_two_sources_fast_paused() {
837        let config = AlignmentGroupConfig::new("test").with_max_drift(Duration::from_secs(60)); // 60 second max drift
838        let mut group = WatermarkAlignmentGroup::new(config);
839
840        group.register_source(0);
841        group.register_source(1);
842
843        // Both start at 0
844        group.report_watermark(0, 0);
845        group.report_watermark(1, 0);
846
847        // Source 0 advances within limit
848        let action = group.report_watermark(0, 50_000); // 50 seconds
849        assert_eq!(action, AlignmentAction::Continue);
850        assert!(!group.is_paused(0));
851
852        // Source 0 advances beyond limit
853        let action = group.report_watermark(0, 70_000); // 70 seconds, drift > 60
854        assert_eq!(action, AlignmentAction::Pause);
855        assert!(group.is_paused(0));
856    }
857
858    #[test]
859    fn test_alignment_group_resume_when_slow_catches_up() {
860        let config = AlignmentGroupConfig::new("test").with_max_drift(Duration::from_secs(60));
861        let mut group = WatermarkAlignmentGroup::new(config);
862
863        group.register_source(0);
864        group.register_source(1);
865
866        // Source 0 at 0, source 1 at 0
867        group.report_watermark(0, 0);
868        group.report_watermark(1, 0);
869
870        // Source 0 jumps ahead and gets paused
871        group.report_watermark(0, 100_000); // 100 seconds
872        assert!(group.is_paused(0));
873
874        // Source 1 catches up
875        group.report_watermark(1, 50_000); // 50 seconds
876                                           // Drift is now 100 - 50 = 50 seconds, within limit
877        assert!(group.should_resume(0));
878    }
879
880    #[test]
881    fn test_alignment_group_warn_only_mode() {
882        let config = AlignmentGroupConfig::new("test")
883            .with_max_drift(Duration::from_secs(60))
884            .with_enforcement_mode(EnforcementMode::WarnOnly);
885        let mut group = WatermarkAlignmentGroup::new(config);
886
887        group.register_source(0);
888        group.register_source(1);
889
890        group.report_watermark(0, 0);
891        group.report_watermark(1, 0);
892
893        // Source 0 exceeds drift but only warns
894        let action = group.report_watermark(0, 100_000);
895        match action {
896            AlignmentAction::Warn { drift_ms } => {
897                assert_eq!(drift_ms, 100_000); // 100 seconds drift
898            }
899            _ => panic!("Expected Warn action"),
900        }
901        assert!(!group.is_paused(0)); // Not actually paused
902        assert_eq!(group.metrics().warnings_emitted, 1);
903    }
904
905    #[test]
906    fn test_alignment_group_drop_excess_mode() {
907        let config = AlignmentGroupConfig::new("test")
908            .with_max_drift(Duration::from_secs(60))
909            .with_enforcement_mode(EnforcementMode::DropExcess);
910        let mut group = WatermarkAlignmentGroup::new(config);
911
912        group.register_source(0);
913        group.register_source(1);
914
915        group.report_watermark(0, 0);
916        group.report_watermark(1, 0);
917
918        // Source 0 exceeds drift and gets dropped
919        let action = group.report_watermark(0, 100_000);
920        assert_eq!(action, AlignmentAction::Drop);
921        assert_eq!(group.metrics().events_dropped, 1);
922
923        // Subsequent events from source 0 are also dropped
924        let action = group.report_watermark(0, 110_000);
925        assert_eq!(action, AlignmentAction::Drop);
926        assert_eq!(group.metrics().events_dropped, 2);
927    }
928
929    #[test]
930    fn test_alignment_group_drift_calculation() {
931        let config = AlignmentGroupConfig::new("test").with_max_drift(Duration::from_secs(300)); // 5 minutes
932        let mut group = WatermarkAlignmentGroup::new(config);
933
934        group.register_source(0);
935        group.register_source(1);
936        group.register_source(2);
937
938        group.report_watermark(0, 100_000); // 100s
939        group.report_watermark(1, 200_000); // 200s
940        group.report_watermark(2, 150_000); // 150s
941
942        // Drift should be max - min = 200 - 100 = 100 seconds
943        assert_eq!(group.current_drift(), Duration::from_secs(100));
944        assert_eq!(group.min_watermark(), 100_000);
945        assert_eq!(group.max_watermark(), 200_000);
946    }
947
948    #[test]
949    fn test_alignment_group_metrics_accurate() {
950        let config = AlignmentGroupConfig::new("test").with_max_drift(Duration::from_secs(60));
951        let mut group = WatermarkAlignmentGroup::new(config);
952
953        group.register_source(0);
954        group.register_source(1);
955
956        group.report_watermark(0, 0);
957        group.report_watermark(1, 0);
958
959        // Pause source 0
960        group.report_watermark(0, 100_000);
961        assert_eq!(group.metrics().pause_events, 1);
962
963        // Source 1 catches up
964        group.report_watermark(1, 50_000);
965
966        // Check alignment should resume source 0
967        let _actions = group.check_alignment();
968        // Note: check_alignment only runs if update_interval has passed
969        // For this test, we check should_resume directly
970        assert!(group.should_resume(0));
971    }
972
973    #[test]
974    fn test_alignment_group_unregister_source() {
975        let config = AlignmentGroupConfig::new("test").with_max_drift(Duration::from_secs(60));
976        let mut group = WatermarkAlignmentGroup::new(config);
977
978        group.register_source(0);
979        group.register_source(1);
980
981        group.report_watermark(0, 100_000);
982        group.report_watermark(1, 50_000);
983
984        assert_eq!(group.source_count(), 2);
985
986        group.unregister_source(1);
987        assert_eq!(group.source_count(), 1);
988
989        // After removing source 1, only source 0 remains
990        // so drift should be 0 (single source)
991        assert_eq!(group.min_watermark(), 100_000);
992        assert_eq!(group.max_watermark(), 100_000);
993    }
994
995    #[test]
996    fn test_alignment_group_source_state() {
997        let config = AlignmentGroupConfig::new("test").with_max_drift(Duration::from_secs(60));
998        let mut group = WatermarkAlignmentGroup::new(config);
999
1000        group.register_source(0);
1001        group.report_watermark(0, 50_000);
1002
1003        let state = group.source_state(0).expect("source exists");
1004        assert_eq!(state.source_id, 0);
1005        assert_eq!(state.watermark, 50_000);
1006        assert!(!state.is_paused);
1007    }
1008
1009    #[test]
1010    fn test_coordinator_multiple_groups() {
1011        let mut coordinator = AlignmentGroupCoordinator::new();
1012
1013        let config1 = AlignmentGroupConfig::new("group1").with_max_drift(Duration::from_secs(60));
1014        let config2 = AlignmentGroupConfig::new("group2").with_max_drift(Duration::from_secs(120));
1015
1016        coordinator.add_group(config1);
1017        coordinator.add_group(config2);
1018
1019        assert_eq!(coordinator.group_count(), 2);
1020    }
1021
1022    #[test]
1023    fn test_coordinator_source_assignment() {
1024        let mut coordinator = AlignmentGroupCoordinator::new();
1025
1026        let config =
1027            AlignmentGroupConfig::new("test-group").with_max_drift(Duration::from_secs(60));
1028        coordinator.add_group(config);
1029
1030        let group_id = AlignmentGroupId::new("test-group");
1031
1032        // Assign sources
1033        coordinator
1034            .assign_source_to_group(0, &group_id)
1035            .expect("should succeed");
1036        coordinator
1037            .assign_source_to_group(1, &group_id)
1038            .expect("should succeed");
1039
1040        assert_eq!(coordinator.total_source_count(), 2);
1041        assert_eq!(coordinator.source_group(0), Some(&group_id));
1042    }
1043
1044    #[test]
1045    fn test_coordinator_source_already_in_group() {
1046        let mut coordinator = AlignmentGroupCoordinator::new();
1047
1048        let config1 = AlignmentGroupConfig::new("group1");
1049        let config2 = AlignmentGroupConfig::new("group2");
1050        coordinator.add_group(config1);
1051        coordinator.add_group(config2);
1052
1053        let group1 = AlignmentGroupId::new("group1");
1054        let group2 = AlignmentGroupId::new("group2");
1055
1056        coordinator
1057            .assign_source_to_group(0, &group1)
1058            .expect("should succeed");
1059
1060        // Try to assign to different group
1061        let result = coordinator.assign_source_to_group(0, &group2);
1062        assert!(matches!(
1063            result,
1064            Err(AlignmentError::SourceAlreadyInGroup { .. })
1065        ));
1066
1067        // Assigning to same group should be fine
1068        let result = coordinator.assign_source_to_group(0, &group1);
1069        assert!(result.is_ok());
1070    }
1071
1072    #[test]
1073    fn test_coordinator_group_not_found() {
1074        let mut coordinator = AlignmentGroupCoordinator::new();
1075
1076        let result = coordinator.assign_source_to_group(0, &AlignmentGroupId::new("nonexistent"));
1077        assert!(matches!(result, Err(AlignmentError::GroupNotFound(_))));
1078    }
1079
1080    #[test]
1081    fn test_coordinator_report_watermark() {
1082        let mut coordinator = AlignmentGroupCoordinator::new();
1083
1084        let config =
1085            AlignmentGroupConfig::new("test-group").with_max_drift(Duration::from_secs(60));
1086        coordinator.add_group(config);
1087
1088        let group_id = AlignmentGroupId::new("test-group");
1089        coordinator.assign_source_to_group(0, &group_id).unwrap();
1090        coordinator.assign_source_to_group(1, &group_id).unwrap();
1091
1092        // Report watermarks
1093        let action = coordinator.report_watermark(0, 0);
1094        assert_eq!(action, Some(AlignmentAction::Continue));
1095
1096        let action = coordinator.report_watermark(1, 0);
1097        assert_eq!(action, Some(AlignmentAction::Continue));
1098
1099        // Source not in any group
1100        let action = coordinator.report_watermark(99, 0);
1101        assert_eq!(action, None);
1102    }
1103
1104    #[test]
1105    fn test_coordinator_unassign_source() {
1106        let mut coordinator = AlignmentGroupCoordinator::new();
1107
1108        let config = AlignmentGroupConfig::new("test-group");
1109        coordinator.add_group(config);
1110
1111        let group_id = AlignmentGroupId::new("test-group");
1112        coordinator.assign_source_to_group(0, &group_id).unwrap();
1113
1114        assert_eq!(coordinator.total_source_count(), 1);
1115
1116        coordinator.unassign_source(0);
1117        assert_eq!(coordinator.total_source_count(), 0);
1118        assert_eq!(coordinator.source_group(0), None);
1119    }
1120
1121    #[test]
1122    fn test_coordinator_remove_group() {
1123        let mut coordinator = AlignmentGroupCoordinator::new();
1124
1125        let config = AlignmentGroupConfig::new("test-group");
1126        coordinator.add_group(config);
1127
1128        let group_id = AlignmentGroupId::new("test-group");
1129        coordinator.assign_source_to_group(0, &group_id).unwrap();
1130        coordinator.assign_source_to_group(1, &group_id).unwrap();
1131
1132        assert_eq!(coordinator.group_count(), 1);
1133        assert_eq!(coordinator.total_source_count(), 2);
1134
1135        coordinator.remove_group(&group_id);
1136
1137        assert_eq!(coordinator.group_count(), 0);
1138        assert_eq!(coordinator.total_source_count(), 0);
1139    }
1140
1141    #[test]
1142    fn test_coordinator_is_paused() {
1143        let mut coordinator = AlignmentGroupCoordinator::new();
1144
1145        let config =
1146            AlignmentGroupConfig::new("test-group").with_max_drift(Duration::from_secs(60));
1147        coordinator.add_group(config);
1148
1149        let group_id = AlignmentGroupId::new("test-group");
1150        coordinator.assign_source_to_group(0, &group_id).unwrap();
1151        coordinator.assign_source_to_group(1, &group_id).unwrap();
1152
1153        coordinator.report_watermark(0, 0);
1154        coordinator.report_watermark(1, 0);
1155
1156        // Source 0 exceeds drift
1157        coordinator.report_watermark(0, 100_000);
1158        assert!(coordinator.is_paused(0));
1159        assert!(!coordinator.is_paused(1));
1160    }
1161
1162    #[test]
1163    fn test_coordinator_all_metrics() {
1164        let mut coordinator = AlignmentGroupCoordinator::new();
1165
1166        let config1 = AlignmentGroupConfig::new("group1");
1167        let config2 = AlignmentGroupConfig::new("group2");
1168        coordinator.add_group(config1);
1169        coordinator.add_group(config2);
1170
1171        let metrics = coordinator.all_metrics();
1172        assert_eq!(metrics.len(), 2);
1173        assert!(metrics.contains_key(&AlignmentGroupId::new("group1")));
1174        assert!(metrics.contains_key(&AlignmentGroupId::new("group2")));
1175    }
1176
1177    #[test]
1178    fn test_alignment_group_empty() {
1179        let config = AlignmentGroupConfig::new("test");
1180        let group = WatermarkAlignmentGroup::new(config);
1181
1182        assert_eq!(group.source_count(), 0);
1183        assert_eq!(group.min_watermark(), i64::MIN);
1184        assert_eq!(group.max_watermark(), i64::MIN);
1185    }
1186
1187    #[test]
1188    fn test_alignment_group_all_paused() {
1189        let config = AlignmentGroupConfig::new("test").with_max_drift(Duration::from_secs(10));
1190        let mut group = WatermarkAlignmentGroup::new(config);
1191
1192        group.register_source(0);
1193        group.register_source(1);
1194
1195        group.report_watermark(0, 0);
1196        group.report_watermark(1, 0);
1197
1198        // Pause both sources by having them exceed drift relative to each other
1199        // Source 0 jumps far ahead
1200        group.report_watermark(0, 100_000);
1201        assert!(group.is_paused(0));
1202
1203        // Source 1 is now the min, so drift calculation only includes it
1204        // Source 1 can't get paused because it's the slowest
1205        assert!(!group.is_paused(1));
1206    }
1207
1208    #[test]
1209    fn test_alignment_group_negative_watermarks() {
1210        let config = AlignmentGroupConfig::new("test").with_max_drift(Duration::from_secs(60));
1211        let mut group = WatermarkAlignmentGroup::new(config);
1212
1213        group.register_source(0);
1214        group.register_source(1);
1215
1216        // Use negative timestamps (before epoch)
1217        group.report_watermark(0, -100_000);
1218        group.report_watermark(1, -50_000);
1219
1220        assert_eq!(group.min_watermark(), -100_000);
1221        assert_eq!(group.max_watermark(), -50_000);
1222        assert_eq!(group.current_drift(), Duration::from_secs(50));
1223    }
1224
1225    #[test]
1226    fn test_alignment_source_state_pause_resume_tracking() {
1227        let mut state = AlignmentSourceState::new(0);
1228
1229        assert!(!state.is_paused);
1230        assert!(state.pause_start.is_none());
1231
1232        state.pause();
1233        assert!(state.is_paused);
1234        assert!(state.pause_start.is_some());
1235
1236        // Wait a tiny bit
1237        std::thread::sleep(Duration::from_millis(1));
1238
1239        state.resume();
1240        assert!(!state.is_paused);
1241        assert!(state.pause_start.is_none());
1242        assert!(state.total_pause_time > Duration::ZERO);
1243    }
1244
1245    #[test]
1246    fn test_alignment_group_check_alignment_interval() {
1247        let config = AlignmentGroupConfig::new("test")
1248            .with_max_drift(Duration::from_secs(60))
1249            .with_update_interval(Duration::from_millis(100));
1250        let mut group = WatermarkAlignmentGroup::new(config);
1251
1252        group.register_source(0);
1253        group.register_source(1);
1254
1255        group.report_watermark(0, 0);
1256        group.report_watermark(1, 0);
1257        group.report_watermark(0, 100_000); // Paused
1258
1259        // Immediate check should return empty (interval not elapsed)
1260        let immediate_actions = group.check_alignment();
1261        assert!(immediate_actions.is_empty());
1262
1263        // Wait for interval
1264        std::thread::sleep(Duration::from_millis(110));
1265
1266        // Now check should potentially return actions
1267        group.report_watermark(1, 50_000); // Slow source catches up
1268        let actions = group.check_alignment();
1269        // Should have a Resume action for source 0
1270        assert!(actions
1271            .iter()
1272            .any(|(id, action)| *id == 0 && *action == AlignmentAction::Resume));
1273    }
1274}