pub struct KeyedWatermarkTracker<K: Hash + Eq + Clone> { /* private fields */ }
Expand description
Tracks watermarks per logical key.
Provides fine-grained watermark tracking for multi-tenant workloads and scenarios with significant event-time skew between keys.
§Research Background
Based on research (March 2025), keyed watermarks achieve 99%+ accuracy compared to 63-67% with global watermarks.
§Example
use laminar_core::time::{KeyedWatermarkTracker, KeyedWatermarkConfig, Watermark};
use std::time::Duration;
let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_secs(5));
let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);
// Fast tenant advances quickly
tracker.update("tenant_a".to_string(), 10_000);
tracker.update("tenant_a".to_string(), 15_000);
// Slow tenant at earlier time
tracker.update("tenant_b".to_string(), 5_000);
// Per-key watermarks differ
assert_eq!(tracker.watermark_for_key(&"tenant_a".to_string()), Some(10_000));
assert_eq!(tracker.watermark_for_key(&"tenant_b".to_string()), Some(0));
// Global watermark is minimum
assert_eq!(tracker.global_watermark(), Some(Watermark::new(0)));
Implementations§
Source§impl<K: Hash + Eq + Clone> KeyedWatermarkTracker<K>
impl<K: Hash + Eq + Clone> KeyedWatermarkTracker<K>
Sourcepub fn new(config: KeyedWatermarkConfig) -> Self
pub fn new(config: KeyedWatermarkConfig) -> Self
Creates a new keyed watermark tracker with the given configuration.
Sourcepub fn with_defaults() -> Self
pub fn with_defaults() -> Self
Creates a tracker with default configuration.
Sourcepub fn update(
&mut self,
key: K,
event_time: i64,
) -> Result<Option<Watermark>, KeyedWatermarkError>
pub fn update( &mut self, key: K, event_time: i64, ) -> Result<Option<Watermark>, KeyedWatermarkError>
Updates the watermark for a specific key.
§Returns
- Ok(Some(Watermark)) if the global watermark changes
- Ok(None) if the global watermark does not change
§Errors
Returns KeyedWatermarkError::MaxKeysReached if max_keys is reached
and the RejectNew eviction policy is configured.
§Example
use laminar_core::time::{KeyedWatermarkTracker, KeyedWatermarkConfig};
use std::time::Duration;
let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(100));
let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);
// First update creates the key
let wm = tracker.update("key1".to_string(), 1000).unwrap();
assert!(wm.is_some()); // Global watermark advances
Sourcepub fn update_batch(
&mut self,
events: &[(K, i64)],
) -> Result<Option<Watermark>, KeyedWatermarkError>
pub fn update_batch( &mut self, events: &[(K, i64)], ) -> Result<Option<Watermark>, KeyedWatermarkError>
Applies a batch of updates for multiple events (more efficient than calling update once per event).
Returns the new global watermark if it changed.
§Errors
Returns KeyedWatermarkError::MaxKeysReached if max_keys is reached
and the RejectNew eviction policy is configured.
Sourcepub fn watermark_for_key(&self, key: &K) -> Option<i64>
pub fn watermark_for_key(&self, key: &K) -> Option<i64>
Returns the watermark for a specific key.
Sourcepub fn global_watermark(&self) -> Option<Watermark>
pub fn global_watermark(&self) -> Option<Watermark>
Returns the global watermark (minimum across active keys).
Sourcepub fn is_late(&self, key: &K, event_time: i64) -> bool
pub fn is_late(&self, key: &K, event_time: i64) -> bool
Checks if an event is late for its key.
Uses the key’s individual watermark, not the global watermark.
If the key doesn’t exist, returns false (not late).
Sourcepub fn is_late_global(&self, event_time: i64) -> bool
pub fn is_late_global(&self, event_time: i64) -> bool
Checks if an event is late using the global watermark.
Use this for cross-key ordering guarantees.
Sourcepub fn mark_idle(&mut self, key: &K) -> Option<Watermark>
pub fn mark_idle(&mut self, key: &K) -> Option<Watermark>
Marks a key as idle, excluding it from global watermark calculation.
Returns Some(Watermark) if the global watermark advances.
Sourcepub fn mark_active(&mut self, key: &K)
pub fn mark_active(&mut self, key: &K)
Marks a key as active again.
Sourcepub fn check_idle_keys(&mut self) -> Option<Watermark>
pub fn check_idle_keys(&mut self) -> Option<Watermark>
Checks for keys that have been idle longer than the timeout.
Should be called periodically from Ring 1.
Returns Some(Watermark) if marking idle keys causes the global watermark to advance.
Sourcepub fn active_key_count(&self) -> usize
pub fn active_key_count(&self) -> usize
Returns the number of active (non-idle) keys.
Sourcepub fn total_key_count(&self) -> usize
pub fn total_key_count(&self) -> usize
Returns the total number of tracked keys.
Sourcepub fn metrics(&self) -> &KeyedWatermarkMetrics
pub fn metrics(&self) -> &KeyedWatermarkMetrics
Returns the metrics collected by this tracker.
Sourcepub fn recalculate_global(&mut self) -> Option<Watermark>
pub fn recalculate_global(&mut self) -> Option<Watermark>
Forces recalculation of global watermark.
Useful after bulk operations or recovery.
Sourcepub fn remove_key(&mut self, key: &K) -> Option<KeyWatermarkState>
pub fn remove_key(&mut self, key: &K) -> Option<KeyWatermarkState>
Removes a key from tracking.
Returns the key’s watermark state if it existed.
Sourcepub fn key_state(&self, key: &K) -> Option<&KeyWatermarkState>
pub fn key_state(&self, key: &K) -> Option<&KeyWatermarkState>
Returns the state for a specific key.
Sourcepub fn config(&self) -> &KeyedWatermarkConfig
pub fn config(&self) -> &KeyedWatermarkConfig
Returns the configuration.
Sourcepub fn contains_key(&self, key: &K) -> bool
pub fn contains_key(&self, key: &K) -> bool
Checks if a key exists in the tracker.
Sourcepub fn iter(&self) -> impl Iterator<Item = (&K, &KeyWatermarkState)>
pub fn iter(&self) -> impl Iterator<Item = (&K, &KeyWatermarkState)>
Returns an iterator over all key-state pairs.
Sourcepub fn bounded_delay_ms(&self) -> i64
pub fn bounded_delay_ms(&self) -> i64
Returns the bounded delay in milliseconds.
Trait Implementations§
Auto Trait Implementations§
impl<K> Freeze for KeyedWatermarkTracker<K>
impl<K> RefUnwindSafe for KeyedWatermarkTracker<K> where
K: RefUnwindSafe,
impl<K> Send for KeyedWatermarkTracker<K> where
K: Send,
impl<K> Sync for KeyedWatermarkTracker<K> where
K: Sync,
impl<K> Unpin for KeyedWatermarkTracker<K> where
K: Unpin,
impl<K> UnwindSafe for KeyedWatermarkTracker<K> where
K: UnwindSafe,
Blanket Implementations§
Source§impl<T> ArchivePointee for T
impl<T> ArchivePointee for T
Source§type ArchivedMetadata = ()
type ArchivedMetadata = ()
Source§fn pointer_metadata(
_: &<T as ArchivePointee>::ArchivedMetadata,
) -> <T as Pointee>::Metadata
fn pointer_metadata( _: &<T as ArchivePointee>::ArchivedMetadata, ) -> <T as Pointee>::Metadata
Source§impl<T> BorrowMut<T> for T where
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> LayoutRaw for T
impl<T> LayoutRaw for T
Source§fn layout_raw(_: <T as Pointee>::Metadata) -> Result<Layout, LayoutError>
fn layout_raw(_: <T as Pointee>::Metadata) -> Result<Layout, LayoutError>
Source§impl<T, N1, N2> Niching<NichedOption<T, N1>> for N2
impl<T, N1, N2> Niching<NichedOption<T, N1>> for N2
Source§unsafe fn is_niched(niched: *const NichedOption<T, N1>) -> bool
unsafe fn is_niched(niched: *const NichedOption<T, N1>) -> bool
Source§fn resolve_niched(out: Place<NichedOption<T, N1>>)
fn resolve_niched(out: Place<NichedOption<T, N1>>)
Writes data to out indicating that a T is niched.