Skip to main content

laminar_core/time/
partitioned_watermark.rs

1//! # Per-Partition Watermark Tracking
2//!
3//! Extends the watermark system to track watermarks per source partition rather
4//! than per source. This is critical for Kafka integration where each partition
5//! may have different event-time progress.
6//!
7//! ## Problem
8//!
9//! With per-source watermarks, a single slow partition blocks the entire source:
10//!
11//! ```text
12//! Source "orders" (Kafka topic with 4 partitions):
13//!   Partition 0: ████████████████ (active, ts: 10:05)
14//!   Partition 1: ████░░░░░░░░░░░░ (idle since 10:01)  ← BLOCKS ENTIRE SOURCE
15//!   Partition 2: ████████████████ (active, ts: 10:06)
16//!   Partition 3: ████████████████ (active, ts: 10:04)
17//!
18//! Source Watermark: stuck at 10:01 (because of Partition 1)
19//! ```
20//!
21//! ## Solution
22//!
23//! Track watermarks at partition granularity with per-partition idle detection:
24//!
25//! ```rust
26//! use laminar_core::time::{PartitionedWatermarkTracker, PartitionId, Watermark};
27//!
28//! let mut tracker = PartitionedWatermarkTracker::new();
29//!
30//! // Register Kafka source with 4 partitions
31//! tracker.register_source(0, 4);
32//!
33//! // Update individual partitions
34//! tracker.update_partition(PartitionId::new(0, 0), 5000);
35//! tracker.update_partition(PartitionId::new(0, 1), 3000);
36//! tracker.update_partition(PartitionId::new(0, 2), 4000);
37//! tracker.update_partition(PartitionId::new(0, 3), 4500);
38//!
39//! // Combined watermark is minimum (3000)
40//! assert_eq!(tracker.current_watermark(), Some(Watermark::new(3000)));
41//!
42//! // Mark partition 1 as idle
43//! tracker.mark_partition_idle(PartitionId::new(0, 1));
44//!
45//! // Now combined watermark advances to 4000 (min of active partitions)
46//! assert_eq!(tracker.current_watermark(), Some(Watermark::new(4000)));
47//! ```
48
49use std::collections::HashMap;
50use std::time::{Duration, Instant};
51
52use super::Watermark;
53
/// Identifies one partition of one source.
///
/// The pair `(source_id, partition)` is the key under which per-partition
/// watermark state is stored. For Kafka sources, `partition` is the Kafka
/// partition number.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct PartitionId {
    /// Index of the owning source in the source registry.
    pub source_id: usize,
    /// Partition number inside that source.
    pub partition: u32,
}

impl PartitionId {
    /// Builds a partition identifier from a source index and a partition number.
    #[inline]
    #[must_use]
    pub const fn new(source_id: usize, partition: u32) -> Self {
        PartitionId {
            source_id,
            partition,
        }
    }
}

impl std::fmt::Display for PartitionId {
    /// Renders the identifier as `source_id:partition`, e.g. `2:5`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            source_id,
            partition,
        } = *self;
        write!(f, "{source_id}:{partition}")
    }
}
83
/// Watermark bookkeeping for a single partition.
///
/// Holds the partition's watermark together with the activity information
/// used to decide whether the partition is idle.
#[derive(Debug, Clone)]
pub struct PartitionWatermarkState {
    /// Watermark reported for this partition (`i64::MIN` until first update).
    pub watermark: i64,
    /// Last event time recorded for this partition (`i64::MIN` until first update).
    pub last_event_time: i64,
    /// Wall-clock instant of the most recent activity.
    pub last_activity: Instant,
    /// `true` while the partition is flagged as idle.
    pub is_idle: bool,
    /// Core this partition is routed to, if any (thread-per-core routing).
    pub assigned_core: Option<usize>,
}

impl PartitionWatermarkState {
    /// Creates a fresh, active state with no watermark and no core assignment.
    ///
    /// `last_activity` is initialised to the current instant.
    #[must_use]
    pub fn new() -> Self {
        let unset = i64::MIN;
        Self {
            watermark: unset,
            last_event_time: unset,
            last_activity: Instant::now(),
            is_idle: false,
            assigned_core: None,
        }
    }

    /// Creates a fresh, active state that is already assigned to `core_id`.
    #[must_use]
    pub fn with_core(core_id: usize) -> Self {
        Self {
            assigned_core: Some(core_id),
            ..Self::new()
        }
    }
}

impl Default for PartitionWatermarkState {
    /// Same as [`PartitionWatermarkState::new`].
    fn default() -> Self {
        PartitionWatermarkState::new()
    }
}
132
/// Counters describing the state of a partitioned watermark tracker.
#[derive(Debug, Clone, Default)]
pub struct PartitionedWatermarkMetrics {
    /// Number of partitions currently tracked.
    pub total_partitions: usize,
    /// Number of tracked partitions that are not idle.
    pub active_partitions: usize,
    /// Number of tracked partitions flagged as idle.
    pub idle_partitions: usize,
    /// How many times the combined watermark has advanced.
    pub watermark_advances: u64,
    /// How many partitions were added or removed after registration.
    pub rebalances: u64,
}

impl PartitionedWatermarkMetrics {
    /// Creates a metrics record with every counter at zero.
    #[must_use]
    pub fn new() -> Self {
        Self {
            total_partitions: 0,
            active_partitions: 0,
            idle_partitions: 0,
            watermark_advances: 0,
            rebalances: 0,
        }
    }
}
155
156/// Errors that can occur in partitioned watermark operations.
157#[derive(Debug, Clone, thiserror::Error)]
158pub enum WatermarkError {
159    /// Partition not registered
160    #[error("Unknown partition: {0}")]
161    UnknownPartition(PartitionId),
162
163    /// Source not found
164    #[error("Source not found: {0}")]
165    SourceNotFound(usize),
166
167    /// Invalid partition number
168    #[error("Invalid partition {partition} for source {source_id} (max: {max_partition})")]
169    InvalidPartition {
170        /// Source ID
171        source_id: usize,
172        /// Requested partition
173        partition: u32,
174        /// Maximum valid partition
175        max_partition: u32,
176    },
177
178    /// Partition already exists
179    #[error("Partition already exists: {0}")]
180    PartitionExists(PartitionId),
181}
182
183/// Tracks watermarks across partitions within sources.
184///
185/// Extends `WatermarkTracker` to support partition-level granularity.
186/// The combined watermark is the minimum across all active partitions.
187///
188/// # Thread Safety
189///
190/// This tracker is NOT thread-safe. For thread-per-core architectures,
191/// use [`CoreWatermarkState`] for per-core tracking and coordinate
192/// via the runtime.
193///
194/// # Example
195///
196/// ```rust
197/// use laminar_core::time::{PartitionedWatermarkTracker, PartitionId, Watermark};
198///
199/// let mut tracker = PartitionedWatermarkTracker::new();
200///
201/// // Register Kafka source with 4 partitions
202/// tracker.register_source(0, 4);
203///
204/// // Update individual partitions
205/// tracker.update_partition(PartitionId::new(0, 0), 5000);
206/// tracker.update_partition(PartitionId::new(0, 1), 3000);
207/// tracker.update_partition(PartitionId::new(0, 2), 4000);
208/// tracker.update_partition(PartitionId::new(0, 3), 4500);
209///
210/// // Combined watermark is minimum (3000)
211/// assert_eq!(tracker.current_watermark(), Some(Watermark::new(3000)));
212///
213/// // Mark partition 1 as idle
214/// tracker.mark_partition_idle(PartitionId::new(0, 1));
215///
216/// // Now combined watermark advances to 4000 (min of active partitions)
217/// assert_eq!(tracker.current_watermark(), Some(Watermark::new(4000)));
218/// ```
219#[derive(Debug)]
220pub struct PartitionedWatermarkTracker {
221    /// Per-partition state, keyed by `PartitionId`
222    partitions: HashMap<PartitionId, PartitionWatermarkState>,
223
224    /// Number of partitions per source (for bounds checking)
225    source_partition_counts: Vec<usize>,
226
227    /// Combined watermark across all active partitions
228    combined_watermark: i64,
229
230    /// Idle timeout for automatic idle detection
231    idle_timeout: Duration,
232
233    /// Metrics
234    metrics: PartitionedWatermarkMetrics,
235}
236
237impl PartitionedWatermarkTracker {
238    /// Default idle timeout (30 seconds).
239    pub const DEFAULT_IDLE_TIMEOUT: Duration = Duration::from_secs(30);
240
241    /// Creates a new partitioned watermark tracker.
242    #[must_use]
243    pub fn new() -> Self {
244        Self {
245            partitions: HashMap::new(),
246            source_partition_counts: Vec::new(),
247            combined_watermark: i64::MIN,
248            idle_timeout: Self::DEFAULT_IDLE_TIMEOUT,
249            metrics: PartitionedWatermarkMetrics::new(),
250        }
251    }
252
253    /// Creates a new tracker with custom idle timeout.
254    #[must_use]
255    pub fn with_idle_timeout(idle_timeout: Duration) -> Self {
256        Self {
257            partitions: HashMap::new(),
258            source_partition_counts: Vec::new(),
259            combined_watermark: i64::MIN,
260            idle_timeout,
261            metrics: PartitionedWatermarkMetrics::new(),
262        }
263    }
264
265    /// Registers a source with the specified number of partitions.
266    ///
267    /// Creates partition state for each partition in the source.
268    /// Call this when a source is created or when Kafka partitions are assigned.
269    pub fn register_source(&mut self, source_id: usize, num_partitions: usize) {
270        // Expand source_partition_counts if needed
271        while self.source_partition_counts.len() <= source_id {
272            self.source_partition_counts.push(0);
273        }
274
275        self.source_partition_counts[source_id] = num_partitions;
276
277        // Create partition state for each partition
278        #[allow(clippy::cast_possible_truncation)]
279        // Partition count bounded by Kafka max (< u32::MAX)
280        for partition in 0..num_partitions {
281            let pid = PartitionId::new(source_id, partition as u32);
282            self.partitions.entry(pid).or_default();
283        }
284
285        self.update_metrics();
286    }
287
288    /// Adds a partition to a source (for Kafka rebalancing).
289    ///
290    /// # Errors
291    ///
292    /// Returns an error if the partition already exists.
293    pub fn add_partition(&mut self, partition: PartitionId) -> Result<(), WatermarkError> {
294        if self.partitions.contains_key(&partition) {
295            return Err(WatermarkError::PartitionExists(partition));
296        }
297
298        // Expand source_partition_counts if needed
299        while self.source_partition_counts.len() <= partition.source_id {
300            self.source_partition_counts.push(0);
301        }
302
303        // Update partition count
304        let current_count = self.source_partition_counts[partition.source_id];
305        if partition.partition as usize >= current_count {
306            self.source_partition_counts[partition.source_id] = partition.partition as usize + 1;
307        }
308
309        self.partitions
310            .insert(partition, PartitionWatermarkState::new());
311        self.metrics.rebalances += 1;
312        self.update_metrics();
313
314        Ok(())
315    }
316
317    /// Removes a partition from tracking (for Kafka rebalancing).
318    ///
319    /// Returns the partition state if it existed.
320    pub fn remove_partition(&mut self, partition: PartitionId) -> Option<PartitionWatermarkState> {
321        let state = self.partitions.remove(&partition);
322        if state.is_some() {
323            self.metrics.rebalances += 1;
324            self.update_metrics();
325            // Recalculate combined watermark
326            self.recalculate_combined();
327        }
328        state
329    }
330
331    /// Updates the watermark for a specific partition.
332    ///
333    /// This also marks the partition as active and updates its last activity time.
334    ///
335    /// # Returns
336    ///
337    /// `Some(Watermark)` if the combined watermark advances.
338    #[inline]
339    pub fn update_partition(
340        &mut self,
341        partition: PartitionId,
342        watermark: i64,
343    ) -> Option<Watermark> {
344        if let Some(state) = self.partitions.get_mut(&partition) {
345            // Mark as active
346            if state.is_idle {
347                state.is_idle = false;
348                self.metrics.active_partitions += 1;
349                self.metrics.idle_partitions = self.metrics.idle_partitions.saturating_sub(1);
350            }
351            state.last_activity = Instant::now();
352
353            // Update watermark if it advances
354            if watermark > state.watermark {
355                state.watermark = watermark;
356                state.last_event_time = watermark;
357                return self.try_advance_combined();
358            }
359        }
360        None
361    }
362
363    /// Updates watermark from an event timestamp (applies bounded lateness).
364    ///
365    /// Convenience method that subtracts the configured lateness.
366    #[inline]
367    pub fn update_partition_from_event(
368        &mut self,
369        partition: PartitionId,
370        event_time: i64,
371        max_lateness: i64,
372    ) -> Option<Watermark> {
373        let watermark = event_time.saturating_sub(max_lateness);
374        self.update_partition(partition, watermark)
375    }
376
377    /// Marks a partition as idle, excluding it from watermark calculation.
378    ///
379    /// # Returns
380    ///
381    /// `Some(Watermark)` if marking idle causes the combined watermark to advance.
382    pub fn mark_partition_idle(&mut self, partition: PartitionId) -> Option<Watermark> {
383        if let Some(state) = self.partitions.get_mut(&partition) {
384            if !state.is_idle {
385                state.is_idle = true;
386                self.metrics.idle_partitions += 1;
387                self.metrics.active_partitions = self.metrics.active_partitions.saturating_sub(1);
388                return self.try_advance_combined();
389            }
390        }
391        None
392    }
393
394    /// Marks a partition as active again.
395    ///
396    /// Called automatically when `update_partition` is called, but can be
397    /// called explicitly to reactivate a partition before receiving events.
398    pub fn mark_partition_active(&mut self, partition: PartitionId) {
399        if let Some(state) = self.partitions.get_mut(&partition) {
400            if state.is_idle {
401                state.is_idle = false;
402                state.last_activity = Instant::now();
403                self.metrics.active_partitions += 1;
404                self.metrics.idle_partitions = self.metrics.idle_partitions.saturating_sub(1);
405            }
406        }
407    }
408
409    /// Checks for partitions that have been idle longer than the timeout.
410    ///
411    /// Should be called periodically from Ring 1.
412    ///
413    /// # Returns
414    ///
415    /// `Some(Watermark)` if marking idle partitions causes the combined watermark to advance.
416    pub fn check_idle_partitions(&mut self) -> Option<Watermark> {
417        let mut any_marked = false;
418
419        for state in self.partitions.values_mut() {
420            if !state.is_idle && state.last_activity.elapsed() >= self.idle_timeout {
421                state.is_idle = true;
422                any_marked = true;
423            }
424        }
425
426        if any_marked {
427            self.update_metrics();
428            self.try_advance_combined()
429        } else {
430            None
431        }
432    }
433
434    /// Returns the current combined watermark.
435    #[inline]
436    #[must_use]
437    pub fn current_watermark(&self) -> Option<Watermark> {
438        if self.combined_watermark == i64::MIN {
439            None
440        } else {
441            Some(Watermark::new(self.combined_watermark))
442        }
443    }
444
445    /// Returns the watermark for a specific partition.
446    #[must_use]
447    pub fn partition_watermark(&self, partition: PartitionId) -> Option<i64> {
448        self.partitions.get(&partition).map(|s| s.watermark)
449    }
450
451    /// Returns the watermark for a source (minimum across its partitions).
452    #[must_use]
453    pub fn source_watermark(&self, source_id: usize) -> Option<i64> {
454        let mut min_watermark = i64::MAX;
455        let mut found = false;
456
457        for (pid, state) in &self.partitions {
458            if pid.source_id == source_id && !state.is_idle {
459                found = true;
460                min_watermark = min_watermark.min(state.watermark);
461            }
462        }
463
464        if found && min_watermark != i64::MAX {
465            Some(min_watermark)
466        } else {
467            None
468        }
469    }
470
471    /// Returns whether a partition is idle.
472    #[must_use]
473    pub fn is_partition_idle(&self, partition: PartitionId) -> bool {
474        self.partitions.get(&partition).is_some_and(|s| s.is_idle)
475    }
476
477    /// Returns the number of active partitions for a source.
478    #[must_use]
479    pub fn active_partition_count(&self, source_id: usize) -> usize {
480        self.partitions
481            .iter()
482            .filter(|(pid, state)| pid.source_id == source_id && !state.is_idle)
483            .count()
484    }
485
486    /// Returns the total number of partitions for a source.
487    #[must_use]
488    pub fn partition_count(&self, source_id: usize) -> usize {
489        self.source_partition_counts
490            .get(source_id)
491            .copied()
492            .unwrap_or(0)
493    }
494
495    /// Returns metrics.
496    #[must_use]
497    pub fn metrics(&self) -> &PartitionedWatermarkMetrics {
498        &self.metrics
499    }
500
501    /// Returns the number of sources registered.
502    #[must_use]
503    pub fn num_sources(&self) -> usize {
504        self.source_partition_counts.len()
505    }
506
507    /// Assigns a partition to a core (for thread-per-core routing).
508    pub fn assign_partition_to_core(&mut self, partition: PartitionId, core_id: usize) {
509        if let Some(state) = self.partitions.get_mut(&partition) {
510            state.assigned_core = Some(core_id);
511        }
512    }
513
514    /// Returns the core assignment for a partition.
515    #[must_use]
516    pub fn partition_core(&self, partition: PartitionId) -> Option<usize> {
517        self.partitions
518            .get(&partition)
519            .and_then(|s| s.assigned_core)
520    }
521
522    /// Returns all partitions assigned to a specific core.
523    #[must_use]
524    pub fn partitions_for_core(&self, core_id: usize) -> Vec<PartitionId> {
525        self.partitions
526            .iter()
527            .filter_map(|(pid, state)| {
528                if state.assigned_core == Some(core_id) {
529                    Some(*pid)
530                } else {
531                    None
532                }
533            })
534            .collect()
535    }
536
537    /// Returns the idle timeout.
538    #[must_use]
539    pub fn idle_timeout(&self) -> Duration {
540        self.idle_timeout
541    }
542
543    /// Sets the idle timeout.
544    pub fn set_idle_timeout(&mut self, timeout: Duration) {
545        self.idle_timeout = timeout;
546    }
547
548    /// Returns partition state for inspection.
549    #[must_use]
550    pub fn partition_state(&self, partition: PartitionId) -> Option<&PartitionWatermarkState> {
551        self.partitions.get(&partition)
552    }
553
554    /// Tries to advance the combined watermark.
555    fn try_advance_combined(&mut self) -> Option<Watermark> {
556        let new_combined = self.calculate_combined();
557
558        if new_combined > self.combined_watermark && new_combined != i64::MAX {
559            self.combined_watermark = new_combined;
560            self.metrics.watermark_advances += 1;
561            Some(Watermark::new(new_combined))
562        } else {
563            None
564        }
565    }
566
567    /// Recalculates the combined watermark from scratch.
568    fn recalculate_combined(&mut self) {
569        let new_combined = self.calculate_combined();
570        if new_combined != i64::MAX {
571            self.combined_watermark = new_combined;
572        }
573    }
574
575    /// Calculates the combined watermark.
576    fn calculate_combined(&self) -> i64 {
577        let mut min_watermark = i64::MAX;
578        let mut has_active = false;
579
580        for state in self.partitions.values() {
581            if !state.is_idle {
582                has_active = true;
583                min_watermark = min_watermark.min(state.watermark);
584            }
585        }
586
587        // If all partitions are idle, use the max watermark to allow progress
588        if !has_active {
589            min_watermark = self
590                .partitions
591                .values()
592                .map(|s| s.watermark)
593                .max()
594                .unwrap_or(i64::MIN);
595        }
596
597        min_watermark
598    }
599
600    /// Updates metrics counts.
601    fn update_metrics(&mut self) {
602        self.metrics.total_partitions = self.partitions.len();
603        self.metrics.idle_partitions = self.partitions.values().filter(|s| s.is_idle).count();
604        self.metrics.active_partitions =
605            self.metrics.total_partitions - self.metrics.idle_partitions;
606    }
607}
608
609impl Default for PartitionedWatermarkTracker {
610    fn default() -> Self {
611        Self::new()
612    }
613}
614
615/// Per-core partition watermark aggregator.
616///
617/// Each core tracks watermarks for its assigned partitions.
618/// The global tracker aggregates across cores.
619///
620/// This is used in thread-per-core architectures where each core
621/// processes a subset of partitions.
622#[derive(Debug)]
623pub struct CoreWatermarkState {
624    /// Partitions assigned to this core
625    assigned_partitions: Vec<PartitionId>,
626
627    /// Per-partition watermarks (parallel to `assigned_partitions`)
628    partition_watermarks: Vec<i64>,
629
630    /// Local watermark (minimum across assigned partitions)
631    local_watermark: i64,
632
633    /// Idle status for each partition (parallel to `assigned_partitions`)
634    idle_status: Vec<bool>,
635
636    /// Core ID
637    core_id: usize,
638}
639
640impl CoreWatermarkState {
641    /// Creates a new per-core watermark state.
642    #[must_use]
643    pub fn new(core_id: usize) -> Self {
644        Self {
645            assigned_partitions: Vec::new(),
646            partition_watermarks: Vec::new(),
647            local_watermark: i64::MIN,
648            idle_status: Vec::new(),
649            core_id,
650        }
651    }
652
653    /// Creates with pre-assigned partitions.
654    #[must_use]
655    pub fn with_partitions(core_id: usize, partitions: Vec<PartitionId>) -> Self {
656        let count = partitions.len();
657        Self {
658            assigned_partitions: partitions,
659            partition_watermarks: vec![i64::MIN; count],
660            local_watermark: i64::MIN,
661            idle_status: vec![false; count],
662            core_id,
663        }
664    }
665
666    /// Assigns a partition to this core.
667    pub fn assign_partition(&mut self, partition: PartitionId) {
668        if !self.assigned_partitions.contains(&partition) {
669            self.assigned_partitions.push(partition);
670            self.partition_watermarks.push(i64::MIN);
671            self.idle_status.push(false);
672        }
673    }
674
675    /// Removes a partition from this core.
676    pub fn remove_partition(&mut self, partition: PartitionId) -> bool {
677        if let Some(idx) = self
678            .assigned_partitions
679            .iter()
680            .position(|p| *p == partition)
681        {
682            self.assigned_partitions.swap_remove(idx);
683            self.partition_watermarks.swap_remove(idx);
684            self.idle_status.swap_remove(idx);
685            self.recalculate_local();
686            true
687        } else {
688            false
689        }
690    }
691
692    /// Updates a partition watermark on this core.
693    ///
694    /// # Returns
695    ///
696    /// `Some(i64)` with the new local watermark if it advances.
697    #[inline]
698    pub fn update_partition(&mut self, partition: PartitionId, watermark: i64) -> Option<i64> {
699        if let Some(idx) = self
700            .assigned_partitions
701            .iter()
702            .position(|p| *p == partition)
703        {
704            if watermark > self.partition_watermarks[idx] {
705                self.partition_watermarks[idx] = watermark;
706                self.idle_status[idx] = false;
707
708                // Check if local watermark advances
709                let new_local = self.calculate_local();
710                if new_local > self.local_watermark {
711                    self.local_watermark = new_local;
712                    return Some(new_local);
713                }
714            }
715        }
716        None
717    }
718
719    /// Marks a partition as idle on this core.
720    pub fn mark_idle(&mut self, partition: PartitionId) -> Option<i64> {
721        if let Some(idx) = self
722            .assigned_partitions
723            .iter()
724            .position(|p| *p == partition)
725        {
726            if !self.idle_status[idx] {
727                self.idle_status[idx] = true;
728
729                // Recalculate local watermark
730                let new_local = self.calculate_local();
731                if new_local > self.local_watermark {
732                    self.local_watermark = new_local;
733                    return Some(new_local);
734                }
735            }
736        }
737        None
738    }
739
740    /// Returns the local (per-core) watermark.
741    #[inline]
742    #[must_use]
743    pub fn local_watermark(&self) -> i64 {
744        self.local_watermark
745    }
746
747    /// Returns the core ID.
748    #[must_use]
749    pub fn core_id(&self) -> usize {
750        self.core_id
751    }
752
753    /// Returns the assigned partitions.
754    #[must_use]
755    pub fn assigned_partitions(&self) -> &[PartitionId] {
756        &self.assigned_partitions
757    }
758
759    /// Returns the number of assigned partitions.
760    #[must_use]
761    pub fn partition_count(&self) -> usize {
762        self.assigned_partitions.len()
763    }
764
765    /// Calculates the local watermark.
766    fn calculate_local(&self) -> i64 {
767        let mut min = i64::MAX;
768        let mut has_active = false;
769
770        for (idx, &wm) in self.partition_watermarks.iter().enumerate() {
771            if !self.idle_status[idx] {
772                has_active = true;
773                min = min.min(wm);
774            }
775        }
776
777        if !has_active {
778            // All idle - use max
779            self.partition_watermarks
780                .iter()
781                .copied()
782                .max()
783                .unwrap_or(i64::MIN)
784        } else if min == i64::MAX {
785            i64::MIN
786        } else {
787            min
788        }
789    }
790
791    /// Recalculates the local watermark from scratch.
792    fn recalculate_local(&mut self) {
793        self.local_watermark = self.calculate_local();
794    }
795}
796
/// Aggregates per-core watermarks into one global watermark.
///
/// Intended for the thread-per-core runtime coordinator, which feeds in the
/// watermark of each core and reads back the minimum across all of them.
#[derive(Debug)]
pub struct GlobalWatermarkCollector {
    /// Latest watermark reported by each core, indexed by core ID.
    core_watermarks: Vec<i64>,

    /// Last published global watermark (minimum across all cores).
    global_watermark: i64,
}
809
810impl GlobalWatermarkCollector {
811    /// Creates a new collector for the given number of cores.
812    #[must_use]
813    pub fn new(num_cores: usize) -> Self {
814        Self {
815            core_watermarks: vec![i64::MIN; num_cores],
816            global_watermark: i64::MIN,
817        }
818    }
819
820    /// Updates the watermark for a specific core.
821    ///
822    /// # Returns
823    ///
824    /// `Some(Watermark)` if the global watermark advances.
825    #[inline]
826    pub fn update_core(&mut self, core_id: usize, watermark: i64) -> Option<Watermark> {
827        if core_id < self.core_watermarks.len() {
828            self.core_watermarks[core_id] = watermark;
829
830            // Calculate new global minimum
831            let new_global = self
832                .core_watermarks
833                .iter()
834                .copied()
835                .min()
836                .unwrap_or(i64::MIN);
837
838            if new_global > self.global_watermark && new_global != i64::MIN {
839                self.global_watermark = new_global;
840                return Some(Watermark::new(new_global));
841            }
842        }
843        None
844    }
845
846    /// Returns the current global watermark.
847    #[must_use]
848    pub fn global_watermark(&self) -> Option<Watermark> {
849        if self.global_watermark == i64::MIN {
850            None
851        } else {
852            Some(Watermark::new(self.global_watermark))
853        }
854    }
855
856    /// Returns the watermark for a specific core.
857    #[must_use]
858    pub fn core_watermark(&self, core_id: usize) -> Option<i64> {
859        self.core_watermarks.get(core_id).copied()
860    }
861
862    /// Returns the number of cores.
863    #[must_use]
864    pub fn num_cores(&self) -> usize {
865        self.core_watermarks.len()
866    }
867}
868
869#[cfg(test)]
870mod tests {
871    use super::*;
872
873    #[test]
874    fn test_partition_id_creation() {
875        let pid = PartitionId::new(1, 3);
876        assert_eq!(pid.source_id, 1);
877        assert_eq!(pid.partition, 3);
878    }
879
880    #[test]
881    fn test_partition_id_equality() {
882        let p1 = PartitionId::new(1, 2);
883        let p2 = PartitionId::new(1, 2);
884        let p3 = PartitionId::new(1, 3);
885
886        assert_eq!(p1, p2);
887        assert_ne!(p1, p3);
888    }
889
890    #[test]
891    fn test_partition_id_display() {
892        let pid = PartitionId::new(2, 5);
893        assert_eq!(format!("{pid}"), "2:5");
894    }
895
896    #[test]
897    fn test_partitioned_tracker_single_partition_updates_watermark() {
898        let mut tracker = PartitionedWatermarkTracker::new();
899        tracker.register_source(0, 1);
900
901        let wm = tracker.update_partition(PartitionId::new(0, 0), 1000);
902        assert_eq!(wm, Some(Watermark::new(1000)));
903        assert_eq!(tracker.current_watermark(), Some(Watermark::new(1000)));
904    }
905
906    #[test]
907    fn test_partitioned_tracker_multiple_partitions_uses_minimum() {
908        let mut tracker = PartitionedWatermarkTracker::new();
909        tracker.register_source(0, 4);
910
911        tracker.update_partition(PartitionId::new(0, 0), 5000);
912        tracker.update_partition(PartitionId::new(0, 1), 3000);
913        tracker.update_partition(PartitionId::new(0, 2), 4000);
914        tracker.update_partition(PartitionId::new(0, 3), 4500);
915
916        assert_eq!(tracker.current_watermark(), Some(Watermark::new(3000)));
917    }
918
919    #[test]
920    fn test_partitioned_tracker_idle_partition_excluded_from_min() {
921        let mut tracker = PartitionedWatermarkTracker::new();
922        tracker.register_source(0, 4);
923
924        tracker.update_partition(PartitionId::new(0, 0), 5000);
925        tracker.update_partition(PartitionId::new(0, 1), 1000); // Slow partition
926        tracker.update_partition(PartitionId::new(0, 2), 4000);
927        tracker.update_partition(PartitionId::new(0, 3), 4500);
928
929        assert_eq!(tracker.current_watermark(), Some(Watermark::new(1000)));
930
931        // Mark slow partition as idle
932        let wm = tracker.mark_partition_idle(PartitionId::new(0, 1));
933        assert_eq!(wm, Some(Watermark::new(4000)));
934        assert_eq!(tracker.current_watermark(), Some(Watermark::new(4000)));
935    }
936
937    #[test]
938    fn test_partitioned_tracker_all_idle_uses_max() {
939        let mut tracker = PartitionedWatermarkTracker::new();
940        tracker.register_source(0, 2);
941
942        tracker.update_partition(PartitionId::new(0, 0), 5000);
943        tracker.update_partition(PartitionId::new(0, 1), 3000);
944
945        tracker.mark_partition_idle(PartitionId::new(0, 0));
946        let wm = tracker.mark_partition_idle(PartitionId::new(0, 1));
947
948        // When all idle, use max to allow progress
949        assert_eq!(wm, Some(Watermark::new(5000)));
950    }
951
952    #[test]
953    fn test_partitioned_tracker_partition_reactivated_on_update() {
954        let mut tracker = PartitionedWatermarkTracker::new();
955        tracker.register_source(0, 2);
956
957        tracker.update_partition(PartitionId::new(0, 0), 5000);
958        tracker.update_partition(PartitionId::new(0, 1), 3000);
959
960        // Mark partition 1 idle
961        tracker.mark_partition_idle(PartitionId::new(0, 1));
962        assert!(tracker.is_partition_idle(PartitionId::new(0, 1)));
963        assert_eq!(tracker.current_watermark(), Some(Watermark::new(5000)));
964
965        // Update partition 1 - should reactivate it
966        tracker.update_partition(PartitionId::new(0, 1), 4000);
967        assert!(!tracker.is_partition_idle(PartitionId::new(0, 1)));
968
969        // Watermark should now be min of both (4000)
970        assert_eq!(tracker.current_watermark(), Some(Watermark::new(5000))); // Still 5000 because combined can't regress
971    }
972
973    #[test]
974    fn test_partitioned_tracker_add_partition_during_operation() {
975        let mut tracker = PartitionedWatermarkTracker::new();
976        tracker.register_source(0, 2);
977
978        tracker.update_partition(PartitionId::new(0, 0), 5000);
979        tracker.update_partition(PartitionId::new(0, 1), 4000);
980
981        // Add a new partition (Kafka rebalance)
982        tracker.add_partition(PartitionId::new(0, 2)).unwrap();
983
984        // New partition starts at MIN, so watermark shouldn't advance yet
985        assert_eq!(tracker.partition_count(0), 3);
986
987        // Update new partition
988        tracker.update_partition(PartitionId::new(0, 2), 3000);
989        // Watermark is now 3000 (min of all)
990        assert_eq!(tracker.current_watermark(), Some(Watermark::new(4000))); // Can't regress
991    }
992
993    #[test]
994    fn test_partitioned_tracker_remove_partition_recalculates_watermark() {
995        let mut tracker = PartitionedWatermarkTracker::new();
996        tracker.register_source(0, 3);
997
998        tracker.update_partition(PartitionId::new(0, 0), 5000);
999        tracker.update_partition(PartitionId::new(0, 1), 2000); // Slowest
1000        tracker.update_partition(PartitionId::new(0, 2), 4000);
1001
1002        assert_eq!(tracker.current_watermark(), Some(Watermark::new(2000)));
1003
1004        // Remove slowest partition (e.g., Kafka rebalance)
1005        let state = tracker.remove_partition(PartitionId::new(0, 1));
1006        assert!(state.is_some());
1007        assert_eq!(state.unwrap().watermark, 2000);
1008
1009        // Watermark advances to 4000 (min of remaining: 5000, 4000)
1010        // This is correct - removing a slow partition allows progress
1011        assert_eq!(tracker.current_watermark(), Some(Watermark::new(4000)));
1012    }
1013
1014    #[test]
1015    fn test_partitioned_tracker_check_idle_marks_stale_partitions() {
1016        let mut tracker = PartitionedWatermarkTracker::with_idle_timeout(Duration::from_millis(10));
1017        tracker.register_source(0, 2);
1018
1019        tracker.update_partition(PartitionId::new(0, 0), 5000);
1020        tracker.update_partition(PartitionId::new(0, 1), 3000);
1021
1022        // Wait for timeout
1023        std::thread::sleep(Duration::from_millis(20));
1024
1025        // Only update partition 0
1026        tracker.update_partition(PartitionId::new(0, 0), 6000);
1027
1028        // Check for idle - partition 1 should be marked idle
1029        let wm = tracker.check_idle_partitions();
1030
1031        assert!(tracker.is_partition_idle(PartitionId::new(0, 1)));
1032        // Watermark should advance since idle partition is excluded
1033        assert!(wm.is_some() || tracker.current_watermark() == Some(Watermark::new(6000)));
1034    }
1035
1036    #[test]
1037    fn test_partitioned_tracker_source_watermark_aggregates_partitions() {
1038        let mut tracker = PartitionedWatermarkTracker::new();
1039        tracker.register_source(0, 2);
1040        tracker.register_source(1, 2);
1041
1042        tracker.update_partition(PartitionId::new(0, 0), 5000);
1043        tracker.update_partition(PartitionId::new(0, 1), 3000);
1044        tracker.update_partition(PartitionId::new(1, 0), 4000);
1045        tracker.update_partition(PartitionId::new(1, 1), 6000);
1046
1047        assert_eq!(tracker.source_watermark(0), Some(3000));
1048        assert_eq!(tracker.source_watermark(1), Some(4000));
1049    }
1050
1051    #[test]
1052    fn test_partitioned_tracker_metrics_accurate() {
1053        let mut tracker = PartitionedWatermarkTracker::new();
1054        tracker.register_source(0, 4);
1055
1056        tracker.update_partition(PartitionId::new(0, 0), 1000);
1057        tracker.update_partition(PartitionId::new(0, 1), 2000);
1058
1059        let metrics = tracker.metrics();
1060        assert_eq!(metrics.total_partitions, 4);
1061        assert_eq!(metrics.active_partitions, 4);
1062        assert_eq!(metrics.idle_partitions, 0);
1063
1064        tracker.mark_partition_idle(PartitionId::new(0, 2));
1065
1066        let metrics = tracker.metrics();
1067        assert_eq!(metrics.idle_partitions, 1);
1068        assert_eq!(metrics.active_partitions, 3);
1069    }
1070
1071    #[test]
1072    fn test_partitioned_tracker_core_assignment() {
1073        let mut tracker = PartitionedWatermarkTracker::new();
1074        tracker.register_source(0, 4);
1075
1076        tracker.assign_partition_to_core(PartitionId::new(0, 0), 0);
1077        tracker.assign_partition_to_core(PartitionId::new(0, 1), 0);
1078        tracker.assign_partition_to_core(PartitionId::new(0, 2), 1);
1079        tracker.assign_partition_to_core(PartitionId::new(0, 3), 1);
1080
1081        assert_eq!(tracker.partition_core(PartitionId::new(0, 0)), Some(0));
1082        assert_eq!(tracker.partition_core(PartitionId::new(0, 2)), Some(1));
1083
1084        let core0_partitions = tracker.partitions_for_core(0);
1085        assert_eq!(core0_partitions.len(), 2);
1086    }
1087
1088    #[test]
1089    fn test_partitioned_tracker_multiple_sources() {
1090        let mut tracker = PartitionedWatermarkTracker::new();
1091        tracker.register_source(0, 2);
1092        tracker.register_source(1, 3);
1093
1094        assert_eq!(tracker.num_sources(), 2);
1095        assert_eq!(tracker.partition_count(0), 2);
1096        assert_eq!(tracker.partition_count(1), 3);
1097    }
1098
1099    #[test]
1100    fn test_partitioned_tracker_update_from_event() {
1101        let mut tracker = PartitionedWatermarkTracker::new();
1102        tracker.register_source(0, 1);
1103
1104        // Event time 5000, max lateness 1000 -> watermark 4000
1105        let wm = tracker.update_partition_from_event(PartitionId::new(0, 0), 5000, 1000);
1106        assert_eq!(wm, Some(Watermark::new(4000)));
1107    }
1108
1109    #[test]
1110    fn test_partitioned_tracker_add_partition_error() {
1111        let mut tracker = PartitionedWatermarkTracker::new();
1112        tracker.register_source(0, 2);
1113
1114        // Adding existing partition should fail
1115        let result = tracker.add_partition(PartitionId::new(0, 0));
1116        assert!(matches!(result, Err(WatermarkError::PartitionExists(_))));
1117    }
1118
1119    #[test]
1120    fn test_core_watermark_state_creation() {
1121        let state = CoreWatermarkState::new(0);
1122        assert_eq!(state.core_id(), 0);
1123        assert_eq!(state.partition_count(), 0);
1124        assert_eq!(state.local_watermark(), i64::MIN);
1125    }
1126
1127    #[test]
1128    fn test_core_watermark_state_with_partitions() {
1129        let partitions = vec![PartitionId::new(0, 0), PartitionId::new(0, 1)];
1130        let state = CoreWatermarkState::with_partitions(1, partitions);
1131
1132        assert_eq!(state.core_id(), 1);
1133        assert_eq!(state.partition_count(), 2);
1134    }
1135
1136    #[test]
1137    fn test_core_watermark_state_update() {
1138        let mut state = CoreWatermarkState::with_partitions(
1139            0,
1140            vec![PartitionId::new(0, 0), PartitionId::new(0, 1)],
1141        );
1142
1143        // First update - local watermark is still MIN because partition 1 is at MIN
1144        let wm = state.update_partition(PartitionId::new(0, 0), 5000);
1145        assert!(wm.is_none()); // Can't advance - other partition still at MIN
1146        assert_eq!(state.local_watermark(), i64::MIN);
1147
1148        // Second update - now both partitions have values
1149        let wm = state.update_partition(PartitionId::new(0, 1), 3000);
1150        assert_eq!(wm, Some(3000)); // Local watermark advances to min of both
1151        assert_eq!(state.local_watermark(), 3000);
1152
1153        // Update partition 0 again - no change to local (still 3000)
1154        let wm = state.update_partition(PartitionId::new(0, 0), 6000);
1155        assert!(wm.is_none());
1156        assert_eq!(state.local_watermark(), 3000);
1157
1158        // Update partition 1 - local advances
1159        let wm = state.update_partition(PartitionId::new(0, 1), 4000);
1160        assert_eq!(wm, Some(4000));
1161        assert_eq!(state.local_watermark(), 4000);
1162    }
1163
1164    #[test]
1165    fn test_core_watermark_state_idle() {
1166        let mut state = CoreWatermarkState::with_partitions(
1167            0,
1168            vec![PartitionId::new(0, 0), PartitionId::new(0, 1)],
1169        );
1170
1171        state.update_partition(PartitionId::new(0, 0), 5000);
1172        state.update_partition(PartitionId::new(0, 1), 2000);
1173
1174        assert_eq!(state.local_watermark(), 2000);
1175
1176        // Mark slow partition idle
1177        let wm = state.mark_idle(PartitionId::new(0, 1));
1178        assert_eq!(wm, Some(5000));
1179        assert_eq!(state.local_watermark(), 5000);
1180    }
1181
1182    #[test]
1183    fn test_core_watermark_state_assign_remove() {
1184        let mut state = CoreWatermarkState::new(0);
1185
1186        state.assign_partition(PartitionId::new(0, 0));
1187        state.assign_partition(PartitionId::new(0, 1));
1188        assert_eq!(state.partition_count(), 2);
1189
1190        state.update_partition(PartitionId::new(0, 0), 5000);
1191        state.update_partition(PartitionId::new(0, 1), 3000);
1192        assert_eq!(state.local_watermark(), 3000);
1193
1194        // Remove slow partition
1195        let removed = state.remove_partition(PartitionId::new(0, 1));
1196        assert!(removed);
1197        assert_eq!(state.partition_count(), 1);
1198        assert_eq!(state.local_watermark(), 5000);
1199    }
1200
1201    #[test]
1202    fn test_global_collector_creation() {
1203        let collector = GlobalWatermarkCollector::new(4);
1204        assert_eq!(collector.num_cores(), 4);
1205        assert_eq!(collector.global_watermark(), None);
1206    }
1207
1208    #[test]
1209    fn test_global_collector_update() {
1210        let mut collector = GlobalWatermarkCollector::new(3);
1211
1212        collector.update_core(0, 5000);
1213        collector.update_core(1, 3000);
1214        let wm = collector.update_core(2, 4000);
1215
1216        // Global is min of all (3000)
1217        assert_eq!(wm, Some(Watermark::new(3000)));
1218        assert_eq!(collector.global_watermark(), Some(Watermark::new(3000)));
1219    }
1220
1221    #[test]
1222    fn test_global_collector_advancement() {
1223        let mut collector = GlobalWatermarkCollector::new(2);
1224
1225        collector.update_core(0, 5000);
1226        collector.update_core(1, 3000);
1227
1228        assert_eq!(collector.global_watermark(), Some(Watermark::new(3000)));
1229
1230        // Advance the slower core
1231        let wm = collector.update_core(1, 4000);
1232        assert_eq!(wm, Some(Watermark::new(4000)));
1233    }
1234
1235    #[test]
1236    fn test_global_collector_no_regression() {
1237        let mut collector = GlobalWatermarkCollector::new(2);
1238
1239        collector.update_core(0, 5000);
1240        collector.update_core(1, 4000);
1241
1242        // Try to go backwards (should not regress)
1243        let wm = collector.update_core(1, 3000);
1244        assert!(wm.is_none());
1245        // The core watermark is updated, but global doesn't regress
1246        assert_eq!(collector.core_watermark(1), Some(3000));
1247    }
1248}