Struct PartitionedWatermarkTracker 

pub struct PartitionedWatermarkTracker { /* private fields */ }

Tracks watermarks across partitions within sources.

Extends WatermarkTracker to support partition-level granularity. The combined watermark is the minimum across all active partitions.

§Thread Safety

This tracker is NOT thread-safe. For thread-per-core architectures, use CoreWatermarkState for per-core tracking and coordinate via the runtime.
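
The per-core arrangement described above can be pictured with a small standalone model. This is a sketch under stated assumptions, not the crate's `CoreWatermarkState` API: `CoreLocal` and `global_watermark` are hypothetical names, and the coordination step is assumed to be a plain minimum over per-core minima.

```rust
/// Hypothetical per-core state: each core owns the watermarks of its own
/// partitions and never touches another core's data.
#[derive(Debug, Default)]
struct CoreLocal {
    partition_watermarks: Vec<i64>,
}

impl CoreLocal {
    /// Minimum over the partitions this core owns; `None` if it owns none.
    fn local_watermark(&self) -> Option<i64> {
        self.partition_watermarks.iter().copied().min()
    }
}

/// Coordination step (assumed): the global watermark is the minimum of the
/// per-core minima. Cores that own no partitions are skipped.
fn global_watermark(cores: &[CoreLocal]) -> Option<i64> {
    cores.iter().filter_map(CoreLocal::local_watermark).min()
}
```

Because each core only reads and writes its own `CoreLocal`, no locking is needed on the hot path; only the final reduction crosses cores.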

§Example

use laminar_core::time::{PartitionedWatermarkTracker, PartitionId, Watermark};

let mut tracker = PartitionedWatermarkTracker::new();

// Register Kafka source with 4 partitions
tracker.register_source(0, 4);

// Update individual partitions
tracker.update_partition(PartitionId::new(0, 0), 5000);
tracker.update_partition(PartitionId::new(0, 1), 3000);
tracker.update_partition(PartitionId::new(0, 2), 4000);
tracker.update_partition(PartitionId::new(0, 3), 4500);

// Combined watermark is minimum (3000)
assert_eq!(tracker.current_watermark(), Some(Watermark::new(3000)));

// Mark partition 1 as idle
tracker.mark_partition_idle(PartitionId::new(0, 1));

// Now combined watermark advances to 4000 (min of active partitions)
assert_eq!(tracker.current_watermark(), Some(Watermark::new(4000)));

Implementations§

impl PartitionedWatermarkTracker

pub const DEFAULT_IDLE_TIMEOUT: Duration

Default idle timeout (30 seconds).

pub fn new() -> Self

Creates a new partitioned watermark tracker.

pub fn with_idle_timeout(idle_timeout: Duration) -> Self

Creates a new tracker with a custom idle timeout.

pub fn register_source(&mut self, source_id: usize, num_partitions: usize)

Registers a source with the specified number of partitions.

Creates partition state for each partition in the source. Call this when a source is created or when Kafka partitions are assigned.

pub fn add_partition( &mut self, partition: PartitionId, ) -> Result<(), WatermarkError>

Adds a partition to a source (for Kafka rebalancing).

§Errors

Returns an error if the partition already exists.

pub fn remove_partition( &mut self, partition: PartitionId, ) -> Option<PartitionWatermarkState>

Removes a partition from tracking (for Kafka rebalancing).

Returns the partition state if it existed.
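
The add/remove contract used during rebalancing can be illustrated with a standalone model. This is a minimal sketch, not the crate's implementation: `Pid`, `PartState`, `AddError`, and `RebalanceModel` are hypothetical stand-ins for `PartitionId`, `PartitionWatermarkState`, `WatermarkError`, and the tracker.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for `PartitionId`: (source id, partition number).
type Pid = (usize, u32);

/// Hypothetical per-partition state; `None` until the first update arrives.
#[derive(Debug, Default, PartialEq)]
struct PartState {
    watermark: Option<i64>,
}

#[derive(Debug, PartialEq)]
enum AddError {
    AlreadyExists(Pid),
}

#[derive(Default)]
struct RebalanceModel {
    partitions: HashMap<Pid, PartState>,
}

impl RebalanceModel {
    /// Adding a partition twice is an error, mirroring the documented contract.
    fn add_partition(&mut self, pid: Pid) -> Result<(), AddError> {
        if self.partitions.contains_key(&pid) {
            return Err(AddError::AlreadyExists(pid));
        }
        self.partitions.insert(pid, PartState::default());
        Ok(())
    }

    /// Removing hands back the state if the partition was tracked.
    fn remove_partition(&mut self, pid: Pid) -> Option<PartState> {
        self.partitions.remove(&pid)
    }
}
```

Returning the removed state lets the caller hand a revoked partition's progress to whichever consumer picks it up next, if that is useful in the deployment.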

pub fn update_partition( &mut self, partition: PartitionId, watermark: i64, ) -> Option<Watermark>

Updates the watermark for a specific partition.

This also marks the partition as active and updates its last activity time.

§Returns

Some(Watermark) if the combined watermark advances.
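
The "returns `Some` only on advance" behavior can be modeled in a few lines. This is a standalone sketch with hypothetical names (`Pid`, `MinTracker`), and it makes one assumption the documentation does not state: the combined watermark stays undefined until every registered partition has reported at least once.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for `PartitionId`: (source id, partition number).
type Pid = (usize, u32);

struct MinTracker {
    /// `None` means the partition has not reported a watermark yet.
    partitions: HashMap<Pid, Option<i64>>,
    combined: Option<i64>,
}

impl MinTracker {
    fn new(pids: &[Pid]) -> Self {
        Self {
            partitions: pids.iter().map(|&p| (p, None)).collect(),
            combined: None,
        }
    }

    /// Records a partition watermark and returns `Some(new_min)` only when
    /// the minimum across all partitions moved forward.
    fn update(&mut self, pid: Pid, watermark: i64) -> Option<i64> {
        // Unknown partitions are ignored in this model.
        let slot = self.partitions.get_mut(&pid)?;
        // Per-partition watermarks never move backwards here.
        *slot = Some((*slot).map_or(watermark, |old| old.max(watermark)));
        // `None < Some(_)`, so the minimum is `None` until all have reported.
        let new_min = self.partitions.values().copied().min().flatten()?;
        if self.combined.map_or(true, |old| new_min > old) {
            self.combined = Some(new_min);
            Some(new_min)
        } else {
            None
        }
    }
}
```

A caller can use the `Option` as an edge trigger: emit a downstream watermark only when `update` returns `Some`.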

pub fn update_partition_from_event( &mut self, partition: PartitionId, event_time: i64, max_lateness: i64, ) -> Option<Watermark>

Updates the partition watermark from an event timestamp, applying bounded lateness.

Convenience method that subtracts max_lateness from event_time and uses the result as the partition's watermark.
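
The bounded-lateness arithmetic is a single subtraction. The helper below is a hypothetical standalone function, not part of the crate; the use of saturating arithmetic is a choice of this sketch.

```rust
/// Watermark implied by an event under bounded lateness: events up to
/// `max_lateness` older than `event_time` are still expected to arrive,
/// so the watermark trails the event time by that amount.
fn watermark_from_event(event_time: i64, max_lateness: i64) -> i64 {
    // Saturate instead of overflowing near `i64::MIN` (an assumption of
    // this sketch, not documented crate behavior).
    event_time.saturating_sub(max_lateness)
}
```

For example, an event stamped 10_000 with a lateness bound of 2_000 yields a watermark of 8_000.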

pub fn mark_partition_idle( &mut self, partition: PartitionId, ) -> Option<Watermark>

Marks a partition as idle, excluding it from watermark calculation.

§Returns

Some(Watermark) if marking idle causes the combined watermark to advance.

pub fn mark_partition_active(&mut self, partition: PartitionId)

Marks a partition as active again.

update_partition does this automatically; call this method explicitly to reactivate a partition before any events arrive.

pub fn check_idle_partitions(&mut self) -> Option<Watermark>

Checks for partitions that have been idle longer than the timeout.

Should be called periodically from Ring 1.

§Returns

Some(Watermark) if marking idle partitions causes the combined watermark to advance.
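
Timeout-based idle detection can be sketched as follows. This is a standalone model with hypothetical names (`Pid`, `Part`, `IdleModel`); it takes `now` as a parameter for testability, and it assumes that when every partition is idle no watermark is reported. The crate may handle both points differently.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Hypothetical stand-in for `PartitionId`: (source id, partition number).
type Pid = (usize, u32);

struct Part {
    watermark: i64,
    last_activity: Instant,
    idle: bool,
}

struct IdleModel {
    idle_timeout: Duration,
    parts: HashMap<Pid, Part>,
}

impl IdleModel {
    /// Minimum watermark over partitions that are not idle.
    fn combined(&self) -> Option<i64> {
        self.parts.values().filter(|p| !p.idle).map(|p| p.watermark).min()
    }

    /// Flags partitions with no activity for at least `idle_timeout` and
    /// returns the new combined watermark if excluding them moved it forward.
    fn check_idle(&mut self, now: Instant) -> Option<i64> {
        let before = self.combined();
        for p in self.parts.values_mut() {
            if !p.idle && now.duration_since(p.last_activity) >= self.idle_timeout {
                p.idle = true;
            }
        }
        match (before, self.combined()) {
            (Some(b), Some(a)) if a > b => Some(a),
            _ => None,
        }
    }
}
```

Calling `check_idle` on a timer keeps one stalled partition from holding back the combined watermark indefinitely.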

pub fn current_watermark(&self) -> Option<Watermark>

Returns the current combined watermark.

pub fn partition_watermark(&self, partition: PartitionId) -> Option<i64>

Returns the watermark for a specific partition.

pub fn source_watermark(&self, source_id: usize) -> Option<i64>

Returns the watermark for a source (minimum across its partitions).
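
The per-source minimum can be sketched as a filter over a partition map. This is a standalone illustration: `Pid` and the free function are hypothetical, and the sketch does not model idle partitions, whose treatment in the per-source value is not stated in the documentation.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for `PartitionId`: (source id, partition number).
type Pid = (usize, u32);

/// Minimum watermark across the partitions that belong to `source_id`;
/// `None` if the source has no tracked partitions.
fn source_watermark(parts: &HashMap<Pid, i64>, source_id: usize) -> Option<i64> {
    parts
        .iter()
        .filter(|(pid, _)| pid.0 == source_id)
        .map(|(_, wm)| *wm)
        .min()
}
```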

pub fn is_partition_idle(&self, partition: PartitionId) -> bool

Returns whether a partition is idle.

pub fn active_partition_count(&self, source_id: usize) -> usize

Returns the number of active partitions for a source.

pub fn partition_count(&self, source_id: usize) -> usize

Returns the total number of partitions for a source.

pub fn metrics(&self) -> &PartitionedWatermarkMetrics

Returns the tracker's metrics.

pub fn num_sources(&self) -> usize

Returns the number of sources registered.

pub fn assign_partition_to_core( &mut self, partition: PartitionId, core_id: usize, )

Assigns a partition to a core (for thread-per-core routing).

pub fn partition_core(&self, partition: PartitionId) -> Option<usize>

Returns the core assignment for a partition.

pub fn partitions_for_core(&self, core_id: usize) -> Vec<PartitionId>

Returns all partitions assigned to a specific core.
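
The three core-routing methods amount to a map from partition to core plus a reverse lookup. The model below is a sketch with hypothetical names (`Pid`, `CoreMap`); sorting the result is a choice of this sketch, and the crate's ordering is not documented here.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for `PartitionId`: (source id, partition number).
type Pid = (usize, u32);

#[derive(Default)]
struct CoreMap {
    assignment: HashMap<Pid, usize>,
}

impl CoreMap {
    /// Assigns (or reassigns) a partition to a core.
    fn assign(&mut self, pid: Pid, core_id: usize) {
        self.assignment.insert(pid, core_id);
    }

    /// Core currently responsible for the partition, if any.
    fn core_of(&self, pid: Pid) -> Option<usize> {
        self.assignment.get(&pid).copied()
    }

    /// All partitions routed to `core_id`, sorted for deterministic output.
    fn partitions_for(&self, core_id: usize) -> Vec<Pid> {
        let mut out: Vec<Pid> = self
            .assignment
            .iter()
            .filter(|(_, core)| **core == core_id)
            .map(|(pid, _)| *pid)
            .collect();
        out.sort_unstable();
        out
    }
}
```

The reverse lookup here is a linear scan; if it runs often, a second map from core to partitions would trade memory for speed.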

pub fn idle_timeout(&self) -> Duration

Returns the idle timeout.

pub fn set_idle_timeout(&mut self, timeout: Duration)

Sets the idle timeout.

pub fn partition_state( &self, partition: PartitionId, ) -> Option<&PartitionWatermarkState>

Returns partition state for inspection.

Trait Implementations§

impl Debug for PartitionedWatermarkTracker

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

impl Default for PartitionedWatermarkTracker

fn default() -> Self

Returns the “default value” for a type.

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> ArchivePointee for T

type ArchivedMetadata = ()

The archived version of the pointer metadata for this type.

fn pointer_metadata( _: &<T as ArchivePointee>::ArchivedMetadata, ) -> <T as Pointee>::Metadata

Converts some archived metadata to the pointer metadata for itself.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> LayoutRaw for T

fn layout_raw(_: <T as Pointee>::Metadata) -> Result<Layout, LayoutError>

Returns the layout of the type.

impl<T, N1, N2> Niching<NichedOption<T, N1>> for N2
where T: SharedNiching<N1, N2>, N1: Niching<T>, N2: Niching<T>,

unsafe fn is_niched(niched: *const NichedOption<T, N1>) -> bool

Returns whether the given value has been niched.

fn resolve_niched(out: Place<NichedOption<T, N1>>)

Writes data to out indicating that a T is niched.

impl<T> Pointee for T

type Metadata = ()

The metadata type for pointers and references to this type.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.

impl<T> Allocation for T
where T: RefUnwindSafe + Send + Sync,