Skip to main content

KeyedWatermarkTracker

Struct KeyedWatermarkTracker 

Source
pub struct KeyedWatermarkTracker<K: Hash + Eq + Clone> { /* private fields */ }
Expand description

Tracks watermarks per logical key.

Provides fine-grained watermark tracking for multi-tenant workloads and scenarios with significant event-time skew between keys.

§Research Background

Research (March 2025) reports that keyed watermarks achieve 99%+ accuracy, compared to 63-67% with global watermarks.

§Example

use laminar_core::time::{KeyedWatermarkTracker, KeyedWatermarkConfig, Watermark};
use std::time::Duration;

let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_secs(5));
let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);

// Fast tenant advances quickly
tracker.update("tenant_a".to_string(), 10_000);
tracker.update("tenant_a".to_string(), 15_000);

// Slow tenant at earlier time
tracker.update("tenant_b".to_string(), 5_000);

// Per-key watermarks differ
assert_eq!(tracker.watermark_for_key(&"tenant_a".to_string()), Some(10_000));
assert_eq!(tracker.watermark_for_key(&"tenant_b".to_string()), Some(0));

// Global watermark is minimum
assert_eq!(tracker.global_watermark(), Some(Watermark::new(0)));

Implementations§

Source§

impl<K: Hash + Eq + Clone> KeyedWatermarkTracker<K>

Source

pub fn new(config: KeyedWatermarkConfig) -> Self

Creates a new keyed watermark tracker with the given configuration.

Source

pub fn with_defaults() -> Self

Creates a tracker with default configuration.

Source

pub fn update( &mut self, key: K, event_time: i64, ) -> Result<Option<Watermark>, KeyedWatermarkError>

Updates the watermark for a specific key.

§Returns
  • Ok(Some(Watermark)) if the global watermark changes
  • Ok(None) if no global change
§Errors

Returns KeyedWatermarkError::MaxKeysReached if max_keys is reached and the RejectNew eviction policy is configured.

§Example
use laminar_core::time::{KeyedWatermarkTracker, KeyedWatermarkConfig};
use std::time::Duration;

let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_millis(100));
let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);

// First update creates the key
let wm = tracker.update("key1".to_string(), 1000).unwrap();
assert!(wm.is_some()); // Global watermark advances
Source

pub fn update_batch( &mut self, events: &[(K, i64)], ) -> Result<Option<Watermark>, KeyedWatermarkError>

Updates the watermarks for a batch of events (more efficient than calling update once per event).

Returns the new global watermark if it changed.

§Errors

Returns KeyedWatermarkError::MaxKeysReached if max_keys is reached and the RejectNew eviction policy is configured.

Source

pub fn watermark_for_key(&self, key: &K) -> Option<i64>

Returns the watermark for a specific key.

Source

pub fn global_watermark(&self) -> Option<Watermark>

Returns the global watermark (minimum across active keys).

Source

pub fn is_late(&self, key: &K, event_time: i64) -> bool

Checks if an event is late for its key.

Uses the key’s individual watermark, not the global watermark. If the key doesn’t exist, returns false (not late).

Source

pub fn is_late_global(&self, event_time: i64) -> bool

Checks if an event is late using the global watermark.

Use this for cross-key ordering guarantees.

Source

pub fn mark_idle(&mut self, key: &K) -> Option<Watermark>

Marks a key as idle, excluding it from global watermark calculation.

Returns Some(Watermark) if the global watermark advances.

Source

pub fn mark_active(&mut self, key: &K)

Marks a key as active again.

Source

pub fn check_idle_keys(&mut self) -> Option<Watermark>

Checks for keys that have been idle longer than the timeout.

Should be called periodically from Ring 1.

Returns Some(Watermark) if marking idle keys causes the global watermark to advance.

Source

pub fn active_key_count(&self) -> usize

Returns the number of active (non-idle) keys.

Source

pub fn total_key_count(&self) -> usize

Returns the total number of tracked keys.

Source

pub fn metrics(&self) -> &KeyedWatermarkMetrics

Returns the metrics collected by this tracker.

Source

pub fn recalculate_global(&mut self) -> Option<Watermark>

Forces recalculation of global watermark.

Useful after bulk operations or recovery.

Source

pub fn remove_key(&mut self, key: &K) -> Option<KeyWatermarkState>

Removes a key from tracking.

Returns the key’s watermark state if it existed.

Source

pub fn clear(&mut self)

Clears all tracked keys.

Source

pub fn key_state(&self, key: &K) -> Option<&KeyWatermarkState>

Returns the state for a specific key.

Source

pub fn config(&self) -> &KeyedWatermarkConfig

Returns the configuration.

Source

pub fn contains_key(&self, key: &K) -> bool

Checks if a key exists in the tracker.

Source

pub fn keys(&self) -> impl Iterator<Item = &K>

Returns an iterator over all keys.

Source

pub fn iter(&self) -> impl Iterator<Item = (&K, &KeyWatermarkState)>

Returns an iterator over all key-state pairs.

Source

pub fn bounded_delay_ms(&self) -> i64

Returns the bounded delay in milliseconds.

Trait Implementations§

Source§

impl<K: Debug + Hash + Eq + Clone> Debug for KeyedWatermarkTracker<K>

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more

Auto Trait Implementations§

§

impl<K> Freeze for KeyedWatermarkTracker<K>

§

impl<K> RefUnwindSafe for KeyedWatermarkTracker<K>
where K: RefUnwindSafe,

§

impl<K> Send for KeyedWatermarkTracker<K>
where K: Send,

§

impl<K> Sync for KeyedWatermarkTracker<K>
where K: Sync,

§

impl<K> Unpin for KeyedWatermarkTracker<K>
where K: Unpin,

§

impl<K> UnwindSafe for KeyedWatermarkTracker<K>
where K: UnwindSafe,

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> ArchivePointee for T

Source§

type ArchivedMetadata = ()

The archived version of the pointer metadata for this type.
Source§

fn pointer_metadata( _: &<T as ArchivePointee>::ArchivedMetadata, ) -> <T as Pointee>::Metadata

Converts some archived metadata to the pointer metadata for itself.
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> LayoutRaw for T

Source§

fn layout_raw(_: <T as Pointee>::Metadata) -> Result<Layout, LayoutError>

Returns the layout of the type.
Source§

impl<T, N1, N2> Niching<NichedOption<T, N1>> for N2
where T: SharedNiching<N1, N2>, N1: Niching<T>, N2: Niching<T>,

Source§

unsafe fn is_niched(niched: *const NichedOption<T, N1>) -> bool

Returns whether the given value has been niched. Read more
Source§

fn resolve_niched(out: Place<NichedOption<T, N1>>)

Writes data to out indicating that a T is niched.
Source§

impl<T> Pointee for T

Source§

type Metadata = ()

The metadata type for pointers and references to this type.
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

impl<T> Allocation for T
where T: RefUnwindSafe + Send + Sync,