pub struct PartitionedWatermarkTracker { /* private fields */ }
Expand description
Tracks watermarks across partitions within sources.
Extends WatermarkTracker to support partition-level granularity.
The combined watermark is the minimum across all active partitions.
§Thread Safety
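This tracker performs no internal synchronization: every update takes &mut self,
so a single instance cannot be mutated from several threads at once. For
thread-per-core architectures, use CoreWatermarkState for per-core tracking and
coordinate via the runtime.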
§Example
use laminar_core::time::{PartitionedWatermarkTracker, PartitionId, Watermark};
let mut tracker = PartitionedWatermarkTracker::new();
// Register Kafka source with 4 partitions
tracker.register_source(0, 4);
// Update individual partitions
tracker.update_partition(PartitionId::new(0, 0), 5000);
tracker.update_partition(PartitionId::new(0, 1), 3000);
tracker.update_partition(PartitionId::new(0, 2), 4000);
tracker.update_partition(PartitionId::new(0, 3), 4500);
// Combined watermark is the minimum across partitions (3000)
assert_eq!(tracker.current_watermark(), Some(Watermark::new(3000)));
// Mark partition 1 as idle
tracker.mark_partition_idle(PartitionId::new(0, 1));
// Now combined watermark advances to 4000 (min of active partitions)
assert_eq!(tracker.current_watermark(), Some(Watermark::new(4000)));
Implementations§
Source§impl PartitionedWatermarkTracker
impl PartitionedWatermarkTracker
Sourcepub const DEFAULT_IDLE_TIMEOUT: Duration
pub const DEFAULT_IDLE_TIMEOUT: Duration
Default idle timeout (30 seconds).
Sourcepub fn with_idle_timeout(idle_timeout: Duration) -> Self
pub fn with_idle_timeout(idle_timeout: Duration) -> Self
Creates a new tracker with a custom idle timeout.
Sourcepub fn register_source(&mut self, source_id: usize, num_partitions: usize)
pub fn register_source(&mut self, source_id: usize, num_partitions: usize)
Registers a source with the specified number of partitions.
Creates partition state for each partition in the source. Call this when a source is created or when Kafka partitions are assigned.
Sourcepub fn add_partition(
&mut self,
partition: PartitionId,
) -> Result<(), WatermarkError>
pub fn add_partition( &mut self, partition: PartitionId, ) -> Result<(), WatermarkError>
Adds a partition to a source (for Kafka rebalancing).
§Errors
Returns an error if the partition already exists.
Sourcepub fn remove_partition(
&mut self,
partition: PartitionId,
) -> Option<PartitionWatermarkState>
pub fn remove_partition( &mut self, partition: PartitionId, ) -> Option<PartitionWatermarkState>
Removes a partition from tracking (for Kafka rebalancing).
Returns the partition state if it existed.
Sourcepub fn update_partition(
&mut self,
partition: PartitionId,
watermark: i64,
) -> Option<Watermark>
pub fn update_partition( &mut self, partition: PartitionId, watermark: i64, ) -> Option<Watermark>
Updates the watermark for a specific partition.
This also marks the partition as active and updates its last activity time.
§Returns
Some(Watermark) if the combined watermark advances; None otherwise.
Sourcepub fn update_partition_from_event(
&mut self,
partition: PartitionId,
event_time: i64,
max_lateness: i64,
) -> Option<Watermark>
pub fn update_partition_from_event( &mut self, partition: PartitionId, event_time: i64, max_lateness: i64, ) -> Option<Watermark>
Updates watermark from an event timestamp (applies bounded lateness).
Convenience method that subtracts the given max_lateness from event_time to derive the partition's watermark.
Sourcepub fn mark_partition_idle(
&mut self,
partition: PartitionId,
) -> Option<Watermark>
pub fn mark_partition_idle( &mut self, partition: PartitionId, ) -> Option<Watermark>
Marks a partition as idle, excluding it from watermark calculation.
§Returns
Some(Watermark) if marking idle causes the combined watermark to advance.
Sourcepub fn mark_partition_active(&mut self, partition: PartitionId)
pub fn mark_partition_active(&mut self, partition: PartitionId)
Marks a partition as active again.
Called automatically when update_partition is called, but can be
called explicitly to reactivate a partition before receiving events.
Sourcepub fn check_idle_partitions(&mut self) -> Option<Watermark>
pub fn check_idle_partitions(&mut self) -> Option<Watermark>
Checks for partitions that have been idle longer than the timeout.
Should be called periodically from Ring 1.
§Returns
Some(Watermark) if marking idle partitions causes the combined watermark to advance.
Sourcepub fn current_watermark(&self) -> Option<Watermark>
pub fn current_watermark(&self) -> Option<Watermark>
Returns the current combined watermark.
Sourcepub fn partition_watermark(&self, partition: PartitionId) -> Option<i64>
pub fn partition_watermark(&self, partition: PartitionId) -> Option<i64>
Returns the watermark for a specific partition.
Sourcepub fn source_watermark(&self, source_id: usize) -> Option<i64>
pub fn source_watermark(&self, source_id: usize) -> Option<i64>
Returns the watermark for a source (minimum across its partitions).
Sourcepub fn is_partition_idle(&self, partition: PartitionId) -> bool
pub fn is_partition_idle(&self, partition: PartitionId) -> bool
Returns whether a partition is idle.
Sourcepub fn active_partition_count(&self, source_id: usize) -> usize
pub fn active_partition_count(&self, source_id: usize) -> usize
Returns the number of active partitions for a source.
Sourcepub fn partition_count(&self, source_id: usize) -> usize
pub fn partition_count(&self, source_id: usize) -> usize
Returns the total number of partitions for a source.
Sourcepub fn metrics(&self) -> &PartitionedWatermarkMetrics
pub fn metrics(&self) -> &PartitionedWatermarkMetrics
Returns a reference to the tracker's metrics (PartitionedWatermarkMetrics).
Sourcepub fn num_sources(&self) -> usize
pub fn num_sources(&self) -> usize
Returns the number of sources registered.
Sourcepub fn assign_partition_to_core(
&mut self,
partition: PartitionId,
core_id: usize,
)
pub fn assign_partition_to_core( &mut self, partition: PartitionId, core_id: usize, )
Assigns a partition to a core (for thread-per-core routing).
Sourcepub fn partition_core(&self, partition: PartitionId) -> Option<usize>
pub fn partition_core(&self, partition: PartitionId) -> Option<usize>
Returns the core assignment for a partition.
Sourcepub fn partitions_for_core(&self, core_id: usize) -> Vec<PartitionId>
pub fn partitions_for_core(&self, core_id: usize) -> Vec<PartitionId>
Returns all partitions assigned to a specific core.
Sourcepub fn idle_timeout(&self) -> Duration
pub fn idle_timeout(&self) -> Duration
Returns the idle timeout.
Sourcepub fn set_idle_timeout(&mut self, timeout: Duration)
pub fn set_idle_timeout(&mut self, timeout: Duration)
Sets the idle timeout.
Sourcepub fn partition_state(
&self,
partition: PartitionId,
) -> Option<&PartitionWatermarkState>
pub fn partition_state( &self, partition: PartitionId, ) -> Option<&PartitionWatermarkState>
Returns partition state for inspection.
Trait Implementations§
Source§impl Debug for PartitionedWatermarkTracker
impl Debug for PartitionedWatermarkTracker
Auto Trait Implementations§
impl Freeze for PartitionedWatermarkTracker
impl RefUnwindSafe for PartitionedWatermarkTracker
impl Send for PartitionedWatermarkTracker
impl Sync for PartitionedWatermarkTracker
impl Unpin for PartitionedWatermarkTracker
impl UnwindSafe for PartitionedWatermarkTracker
Blanket Implementations§
Source§impl<T> ArchivePointee for T
impl<T> ArchivePointee for T
Source§type ArchivedMetadata = ()
type ArchivedMetadata = ()
Source§fn pointer_metadata(
_: &<T as ArchivePointee>::ArchivedMetadata,
) -> <T as Pointee>::Metadata
fn pointer_metadata( _: &<T as ArchivePointee>::ArchivedMetadata, ) -> <T as Pointee>::Metadata
Source§impl<T> BorrowMut<T> for T
where
T: ?Sized,
impl<T> BorrowMut<T> for T
where
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Source§impl<T> Instrument for T
impl<T> Instrument for T
Source§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
Source§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> LayoutRaw for T
impl<T> LayoutRaw for T
Source§fn layout_raw(_: <T as Pointee>::Metadata) -> Result<Layout, LayoutError>
fn layout_raw(_: <T as Pointee>::Metadata) -> Result<Layout, LayoutError>
Source§impl<T, N1, N2> Niching<NichedOption<T, N1>> for N2
impl<T, N1, N2> Niching<NichedOption<T, N1>> for N2
Source§unsafe fn is_niched(niched: *const NichedOption<T, N1>) -> bool
unsafe fn is_niched(niched: *const NichedOption<T, N1>) -> bool
Source§fn resolve_niched(out: Place<NichedOption<T, N1>>)
fn resolve_niched(out: Place<NichedOption<T, N1>>)
Writes data to out indicating that a T is niched.