pub struct WatermarkAlignmentGroup { /* private fields */ }
Expand description
Manages watermark alignment across sources in a group.
Tracks watermarks from multiple sources and enforces that no source gets too far ahead of others. When a source exceeds the maximum drift, it is paused (or warned/dropped depending on enforcement mode) until slower sources catch up.
§Example
use laminar_core::time::{
WatermarkAlignmentGroup, AlignmentGroupConfig, AlignmentGroupId,
EnforcementMode, AlignmentAction,
};
use std::time::Duration;
let config = AlignmentGroupConfig {
group_id: AlignmentGroupId("join-group".to_string()),
max_drift: Duration::from_secs(60), // 1 minute
update_interval: Duration::from_millis(100),
enforcement_mode: EnforcementMode::Pause,
};
let mut group = WatermarkAlignmentGroup::new(config);
group.register_source(0);
group.register_source(1);
// Both sources start at 0
group.report_watermark(0, 0);
group.report_watermark(1, 0);
// Source 0 advances within limit
let action = group.report_watermark(0, 50_000); // 50 seconds
assert_eq!(action, AlignmentAction::Continue);
// Source 0 advances beyond limit (>60s drift)
let action = group.report_watermark(0, 70_000); // 70 seconds
assert_eq!(action, AlignmentAction::Pause);
// Source 1 catches up
group.report_watermark(1, 30_000); // 30 seconds
assert!(group.should_resume(0)); // Drift now 40s < 60s
Implementations§
Source§impl WatermarkAlignmentGroup
impl WatermarkAlignmentGroup
Sourcepub fn new(config: AlignmentGroupConfig) -> Self
pub fn new(config: AlignmentGroupConfig) -> Self
Creates a new alignment group.
Sourcepub fn group_id(&self) -> &AlignmentGroupId
pub fn group_id(&self) -> &AlignmentGroupId
Returns the group ID.
Sourcepub fn config(&self) -> &AlignmentGroupConfig
pub fn config(&self) -> &AlignmentGroupConfig
Returns the configuration.
Sourcepub fn register_source(&mut self, source_id: usize)
pub fn register_source(&mut self, source_id: usize)
Registers a source with this alignment group.
Sourcepub fn unregister_source(&mut self, source_id: usize)
pub fn unregister_source(&mut self, source_id: usize)
Removes a source from this alignment group.
Sourcepub fn report_watermark(
&mut self,
source_id: usize,
watermark: i64,
) -> AlignmentAction
pub fn report_watermark( &mut self, source_id: usize, watermark: i64, ) -> AlignmentAction
Reports a watermark update from a source.
Returns the action the source should take.
Sourcepub fn should_resume(&self, source_id: usize) -> bool
pub fn should_resume(&self, source_id: usize) -> bool
Checks if a paused source should resume.
Sourcepub fn current_drift(&self) -> Duration
pub fn current_drift(&self) -> Duration
Returns the current drift (max - min watermark).
Sourcepub fn min_watermark(&self) -> i64
pub fn min_watermark(&self) -> i64
Returns the minimum watermark in the group.
Sourcepub fn max_watermark(&self) -> i64
pub fn max_watermark(&self) -> i64
Returns the maximum watermark in the group.
Sourcepub fn metrics(&self) -> &AlignmentGroupMetrics
pub fn metrics(&self) -> &AlignmentGroupMetrics
Returns metrics for this group.
Sourcepub fn source_count(&self) -> usize
pub fn source_count(&self) -> usize
Returns the number of registered sources.
Sourcepub fn paused_source_count(&self) -> usize
pub fn paused_source_count(&self) -> usize
Returns the number of paused sources.
Sourcepub fn active_source_count(&self) -> usize
pub fn active_source_count(&self) -> usize
Returns the number of active (non-paused) sources.
Sourcepub fn check_alignment(&mut self) -> Vec<(usize, AlignmentAction)>
pub fn check_alignment(&mut self) -> Vec<(usize, AlignmentAction)>
Performs periodic alignment check.
Should be called from Ring 1 at the configured update_interval.
Returns list of (source_id, action) pairs.
Sourcepub fn source_state(&self, source_id: usize) -> Option<&AlignmentSourceState>
pub fn source_state(&self, source_id: usize) -> Option<&AlignmentSourceState>
Returns the state of a specific source.
Trait Implementations§
Auto Trait Implementations§
impl Freeze for WatermarkAlignmentGroup
impl RefUnwindSafe for WatermarkAlignmentGroup
impl Send for WatermarkAlignmentGroup
impl Sync for WatermarkAlignmentGroup
impl Unpin for WatermarkAlignmentGroup
impl UnwindSafe for WatermarkAlignmentGroup
Blanket Implementations§
Source§impl<T> ArchivePointee for T
impl<T> ArchivePointee for T
Source§type ArchivedMetadata = ()
type ArchivedMetadata = ()
Source§fn pointer_metadata(
_: &<T as ArchivePointee>::ArchivedMetadata,
) -> <T as Pointee>::Metadata
fn pointer_metadata( _: &<T as ArchivePointee>::ArchivedMetadata, ) -> <T as Pointee>::Metadata
Source§impl<T> BorrowMut<T> for T where
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> LayoutRaw for T
impl<T> LayoutRaw for T
Source§fn layout_raw(_: <T as Pointee>::Metadata) -> Result<Layout, LayoutError>
fn layout_raw(_: <T as Pointee>::Metadata) -> Result<Layout, LayoutError>
Source§impl<T, N1, N2> Niching<NichedOption<T, N1>> for N2
impl<T, N1, N2> Niching<NichedOption<T, N1>> for N2
Source§unsafe fn is_niched(niched: *const NichedOption<T, N1>) -> bool
unsafe fn is_niched(niched: *const NichedOption<T, N1>) -> bool
Source§fn resolve_niched(out: Place<NichedOption<T, N1>>)
fn resolve_niched(out: Place<NichedOption<T, N1>>)
Writes data to out indicating that a T is niched.