Skip to main content

laminar_core/time/
mod.rs

1//! # Time Module
2//!
3//! Event time processing, watermarks, and timer management.
4//!
5//! ## Concepts
6//!
7//! - **Event Time**: Timestamp when the event actually occurred
8//! - **Processing Time**: Timestamp when the event is processed
9//! - **Watermark**: Assertion that no events with timestamp < watermark will arrive
10//! - **Timer**: Scheduled callback for window triggers or timeouts
11//!
12//! ## Event Time Extraction
13//!
14//! Use [`EventTimeExtractor`] to extract timestamps from Arrow `RecordBatch` columns:
15//!
16//! ```ignore
17//! use laminar_core::time::{EventTimeExtractor, TimestampFormat, ExtractionMode};
18//!
19//! // Extract millisecond timestamps from a column
20//! let mut extractor = EventTimeExtractor::from_column("event_time", TimestampFormat::UnixMillis);
21//!
22//! // Use Max mode for multi-row batches
23//! let extractor = extractor.with_mode(ExtractionMode::Max);
24//!
25//! let timestamp = extractor.extract(&batch)?;
26//! ```
27//!
28//! ## Watermark Generation
29//!
30//! Use watermark generators to track event-time progress:
31//!
32//! ```rust
33//! use laminar_core::time::{BoundedOutOfOrdernessGenerator, WatermarkGenerator, Watermark};
34//!
35//! // Allow events to be up to 1 second late
36//! let mut generator = BoundedOutOfOrdernessGenerator::new(1000);
37//!
38//! // Process events
39//! let wm = generator.on_event(5000);
40//! assert_eq!(wm, Some(Watermark::new(4000))); // 5000 - 1000
41//! ```
42//!
43//! ## Multi-Source Watermark Tracking
44//!
45//! For operators with multiple inputs, use [`WatermarkTracker`]:
46//!
47//! ```rust
48//! use laminar_core::time::{WatermarkTracker, Watermark};
49//!
50//! let mut tracker = WatermarkTracker::new(2);
51//! tracker.update_source(0, 5000);
52//! tracker.update_source(1, 3000);
53//!
54//! // Combined watermark is the minimum
55//! assert_eq!(tracker.current_watermark(), Some(Watermark::new(3000)));
56//! ```
57//!
58//! ## Per-Partition Watermark Tracking
59//!
60//! For Kafka sources with multiple partitions, use [`PartitionedWatermarkTracker`]:
61//!
62//! ```rust
63//! use laminar_core::time::{PartitionedWatermarkTracker, PartitionId, Watermark};
64//!
65//! let mut tracker = PartitionedWatermarkTracker::new();
66//!
67//! // Register a Kafka source with 4 partitions
68//! tracker.register_source(0, 4);
69//!
70//! // Update ALL partitions (all must have valid watermarks)
71//! tracker.update_partition(PartitionId::new(0, 0), 5000);
72//! tracker.update_partition(PartitionId::new(0, 1), 3000);
73//! tracker.update_partition(PartitionId::new(0, 2), 4000);
74//! tracker.update_partition(PartitionId::new(0, 3), 4500);
75//!
76//! // Combined watermark is minimum across active partitions
77//! assert_eq!(tracker.current_watermark(), Some(Watermark::new(3000)));
78//!
79//! // Mark slow partition as idle to allow progress
80//! tracker.mark_partition_idle(PartitionId::new(0, 1));
81//! assert_eq!(tracker.current_watermark(), Some(Watermark::new(4000)));
82//! ```
83//!
84//! ## Per-Key Watermark Tracking (F065)
85//!
//! For multi-tenant workloads or scenarios with significant event-time skew between
//! keys, use [`KeyedWatermarkTracker`] to achieve 99%+ accuracy vs 63-67% with a single global watermark:
88//!
89//! ```rust
90//! use laminar_core::time::{KeyedWatermarkTracker, KeyedWatermarkConfig, Watermark};
91//! use std::time::Duration;
92//!
93//! let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_secs(5));
94//! let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);
95//!
96//! // Fast tenant advances quickly
97//! tracker.update("tenant_a".to_string(), 15_000);
98//!
99//! // Slow tenant at earlier time
100//! tracker.update("tenant_b".to_string(), 5_000);
101//!
102//! // Per-key watermarks differ - each key has independent tracking
103//! assert_eq!(tracker.watermark_for_key(&"tenant_a".to_string()), Some(10_000));
104//! assert_eq!(tracker.watermark_for_key(&"tenant_b".to_string()), Some(0));
105//!
106//! // Events for tenant_b at 3000 are NOT late (their key watermark is 0)
107//! assert!(!tracker.is_late(&"tenant_b".to_string(), 3000));
108//!
109//! // But events for tenant_a at 3000 ARE late (their key watermark is 10000)
110//! assert!(tracker.is_late(&"tenant_a".to_string(), 3000));
111//! ```
112//!
113//! ## Watermark Alignment Groups (F066)
114//!
115//! For stream-stream joins and multi-source operators, use [`WatermarkAlignmentGroup`]
116//! to prevent unbounded state growth when sources have different processing speeds:
117//!
118//! ```rust
119//! use laminar_core::time::{
120//!     WatermarkAlignmentGroup, AlignmentGroupConfig, AlignmentGroupId,
121//!     EnforcementMode, AlignmentAction,
122//! };
123//! use std::time::Duration;
124//!
125//! let config = AlignmentGroupConfig::new("orders-payments")
126//!     .with_max_drift(Duration::from_secs(300)); // 5 minute max drift
127//!
128//! let mut group = WatermarkAlignmentGroup::new(config);
129//! group.register_source(0); // orders
130//! group.register_source(1); // payments
131//!
132//! // Both start at 0
133//! group.report_watermark(0, 0);
134//! group.report_watermark(1, 0);
135//!
136//! // Orders advances within limit - OK
137//! let action = group.report_watermark(0, 200_000); // 200 seconds
138//! assert_eq!(action, AlignmentAction::Continue);
139//!
140//! // Orders advances beyond limit - PAUSED
//! let action = group.report_watermark(0, 400_000); // 400 seconds: drift of 400 s exceeds the 300 s limit
142//! assert_eq!(action, AlignmentAction::Pause);
143//! ```
144
145mod alignment_group;
146mod event_time;
147mod keyed_watermark;
148mod partitioned_watermark;
149mod watermark;
150
151pub use alignment_group::{
152    AlignmentAction, AlignmentError, AlignmentGroupConfig, AlignmentGroupCoordinator,
153    AlignmentGroupId, AlignmentGroupMetrics, AlignmentSourceState, EnforcementMode,
154    WatermarkAlignmentGroup,
155};
156
157pub use event_time::{
158    EventTimeError, EventTimeExtractor, ExtractionMode, TimestampField, TimestampFormat,
159};
160
161pub use keyed_watermark::{
162    KeyEvictionPolicy, KeyWatermarkState, KeyedWatermarkConfig, KeyedWatermarkError,
163    KeyedWatermarkMetrics, KeyedWatermarkTracker, KeyedWatermarkTrackerWithLateHandling,
164};
165
166pub use partitioned_watermark::{
167    CoreWatermarkState, GlobalWatermarkCollector, PartitionId, PartitionWatermarkState,
168    PartitionedWatermarkMetrics, PartitionedWatermarkTracker, WatermarkError,
169};
170
171pub use watermark::{
172    AscendingTimestampsGenerator, BoundedOutOfOrdernessGenerator, MeteredGenerator,
173    PeriodicGenerator, PunctuatedGenerator, SourceProvidedGenerator, WatermarkGenerator,
174    WatermarkMetrics, WatermarkTracker,
175};
176
177use smallvec::SmallVec;
178use std::cmp::Ordering;
179use std::collections::BinaryHeap;
180
/// Timer key type optimized for window IDs (16 bytes).
///
/// Backed by a `SmallVec` with 16 bytes of inline storage, so keys of up to
/// 16 bytes are kept without a heap allocation. That covers the common case
/// of `WindowId` keys.
pub type TimerKey = SmallVec<[u8; 16]>;
186
/// Collection type for fired timers, as returned by [`TimerService::poll_timers`].
///
/// Backed by a `SmallVec` with inline room for 8 [`TimerRegistration`]s to
/// avoid heap allocation when few timers fire per poll. Size 8 covers most
/// practical cases where timers fire in small batches.
pub type FiredTimersVec = SmallVec<[TimerRegistration; 8]>;
192
/// Marks how far event time has progressed.
///
/// A watermark is a monotonically increasing assertion that no events with
/// timestamps earlier than the watermark will arrive. Watermarks are used to:
///
/// - Trigger window emissions
/// - Detect late events
/// - Coordinate time progress across operators
///
/// # Example
///
/// ```rust
/// use laminar_core::time::Watermark;
///
/// let watermark = Watermark::new(1000);
///
/// // Check if an event is late
/// assert!(watermark.is_late(999));  // Before watermark
/// assert!(!watermark.is_late(1000)); // At watermark
/// assert!(!watermark.is_late(1001)); // After watermark
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Watermark(pub i64);

impl Watermark {
    /// Wraps `timestamp` in a watermark.
    #[inline]
    #[must_use]
    pub fn new(timestamp: i64) -> Self {
        Watermark(timestamp)
    }

    /// The timestamp carried by this watermark, in milliseconds.
    #[inline]
    #[must_use]
    pub fn timestamp(&self) -> i64 {
        let Watermark(ts) = *self;
        ts
    }

    /// Reports whether an event with `event_time` is late for this watermark.
    ///
    /// Only events strictly before the watermark are late; an event whose
    /// timestamp equals the watermark is still on time.
    #[inline]
    #[must_use]
    pub fn is_late(&self, event_time: i64) -> bool {
        self.timestamp() > event_time
    }

    /// Picks the earlier (smaller) of the two watermarks.
    #[must_use]
    pub fn min(self, other: Self) -> Self {
        Watermark(std::cmp::min(self.0, other.0))
    }

    /// Picks the later (larger) of the two watermarks.
    #[must_use]
    pub fn max(self, other: Self) -> Self {
        Watermark(std::cmp::max(self.0, other.0))
    }
}

impl Default for Watermark {
    /// Starts at `i64::MIN`, the lowest possible watermark: no event time can
    /// be strictly below it, so nothing is late relative to the default.
    fn default() -> Self {
        Watermark::new(i64::MIN)
    }
}

impl From<i64> for Watermark {
    /// Treats a raw timestamp as a watermark.
    fn from(timestamp: i64) -> Self {
        Watermark::new(timestamp)
    }
}

impl From<Watermark> for i64 {
    /// Unwraps the watermark back into its raw timestamp.
    fn from(watermark: Watermark) -> Self {
        watermark.timestamp()
    }
}
272
273/// A timer registration for delayed processing.
274///
275/// Timers are used by operators to schedule future actions, typically for
276/// window triggering or timeouts.
277#[derive(Debug, Clone, PartialEq, Eq)]
278pub struct TimerRegistration {
279    /// Unique timer ID
280    pub id: u64,
281    /// Scheduled timestamp (event time, in milliseconds)
282    pub timestamp: i64,
283    /// Timer key (for keyed operators).
284    /// Uses `TimerKey` (`SmallVec`) to avoid heap allocation for keys up to 16 bytes.
285    pub key: Option<TimerKey>,
286    /// The index of the operator that registered this timer
287    pub operator_index: Option<usize>,
288}
289
290impl Ord for TimerRegistration {
291    fn cmp(&self, other: &Self) -> Ordering {
292        // Reverse ordering for min-heap behavior (earliest first)
293        match other.timestamp.cmp(&self.timestamp) {
294            Ordering::Equal => other.id.cmp(&self.id),
295            ord => ord,
296        }
297    }
298}
299
300impl PartialOrd for TimerRegistration {
301    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
302        Some(self.cmp(other))
303    }
304}
305
306/// Timer service for scheduling and managing timers.
307///
308/// The timer service maintains a priority queue of timer registrations,
309/// ordered by timestamp. Operators can register timers to be fired at
310/// specific event times.
311///
312/// # Example
313///
314/// ```rust
315/// use laminar_core::time::{TimerService, TimerKey};
316///
317/// let mut service = TimerService::new();
318///
319/// // Register timers at different times
320/// let id1 = service.register_timer(100, None, None);
321/// let id2 = service.register_timer(50, Some(TimerKey::from_slice(&[1, 2, 3])), None);
322///
323/// // Poll for timers that should fire at time 75
324/// let fired = service.poll_timers(75);
325/// assert_eq!(fired.len(), 1);
326/// assert_eq!(fired[0].id, id2); // Timer at t=50 fires first
327/// ```
328pub struct TimerService {
329    timers: BinaryHeap<TimerRegistration>,
330    next_timer_id: u64,
331}
332
333impl TimerService {
334    /// Creates a new timer service.
335    #[must_use]
336    pub fn new() -> Self {
337        Self {
338            timers: BinaryHeap::new(),
339            next_timer_id: 0,
340        }
341    }
342
343    /// Registers a new timer.
344    ///
345    /// Returns the unique timer ID that can be used to cancel the timer.
346    ///
347    /// # Arguments
348    ///
349    /// * `timestamp` - The event time at which the timer should fire
350    /// * `key` - Optional key for keyed operators
351    /// * `operator_index` - Optional index of the operator registering the timer(must match the index in the reactor)
352    pub fn register_timer(
353        &mut self,
354        timestamp: i64,
355        key: Option<TimerKey>,
356        operator_index: Option<usize>,
357    ) -> u64 {
358        let id = self.next_timer_id;
359        self.next_timer_id += 1;
360
361        self.timers.push(TimerRegistration {
362            id,
363            timestamp,
364            key,
365            operator_index,
366        });
367
368        id
369    }
370
371    /// Polls for timers that should fire at or before the given timestamp.
372    ///
373    /// Returns all timers with timestamps <= `current_time`, in order.
374    /// Uses `FiredTimersVec` (`SmallVec`) to avoid heap allocation when few timers fire.
375    ///
376    /// # Panics
377    ///
378    /// This function should not panic under normal circumstances. The internal
379    /// `expect` is only called after verifying the heap is not empty via `peek`.
380    #[inline]
381    pub fn poll_timers(&mut self, current_time: i64) -> FiredTimersVec {
382        let mut fired = FiredTimersVec::new();
383
384        while let Some(timer) = self.timers.peek() {
385            if timer.timestamp <= current_time {
386                // SAFETY: We just peeked and confirmed the heap is not empty
387                fired.push(self.timers.pop().expect("heap should not be empty"));
388            } else {
389                break;
390            }
391        }
392
393        fired
394    }
395
396    /// Cancels a timer by ID.
397    ///
398    /// Returns `true` if the timer was found and cancelled.
399    pub fn cancel_timer(&mut self, id: u64) -> bool {
400        let count_before = self.timers.len();
401        self.timers.retain(|t| t.id != id);
402        self.timers.len() < count_before
403    }
404
405    /// Returns the number of pending timers.
406    #[must_use]
407    pub fn pending_count(&self) -> usize {
408        self.timers.len()
409    }
410
411    /// Returns the timestamp of the next timer to fire, if any.
412    #[must_use]
413    pub fn next_timer_timestamp(&self) -> Option<i64> {
414        self.timers.peek().map(|t| t.timestamp)
415    }
416
417    /// Clears all pending timers.
418    pub fn clear(&mut self) {
419        self.timers.clear();
420    }
421}
422
423impl Default for TimerService {
424    fn default() -> Self {
425        Self::new()
426    }
427}
428
/// Errors that can occur in time operations.
///
/// `Display` is generated by `thiserror` from the `#[error(...)]` attribute
/// on each variant.
// NOTE(review): nothing in this module's visible code constructs these
// variants; presumably they are produced by callers or sibling modules.
#[derive(Debug, thiserror::Error)]
pub enum TimeError {
    /// Invalid timestamp value; the payload is shown in the error message.
    #[error("Invalid timestamp: {0}")]
    InvalidTimestamp(i64),

    /// Timer not found; the payload is shown in the error message
    /// (presumably the timer ID that was looked up).
    #[error("Timer not found: {0}")]
    TimerNotFound(u64),

    /// Watermark regression (going backwards)
    #[error("Watermark regression: current={current}, new={new}")]
    WatermarkRegression {
        /// Current watermark value
        current: i64,
        /// Attempted new watermark value
        new: i64,
    },
}
449
/// Unit tests for `Watermark`, `TimerService`, and smoke tests for the
/// re-exported watermark generators and tracker.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_watermark_creation() {
        let watermark = Watermark::new(1000);
        assert_eq!(watermark.timestamp(), 1000);
    }

    #[test]
    fn test_watermark_late_detection() {
        let watermark = Watermark::new(1000);
        assert!(watermark.is_late(999));
        assert!(!watermark.is_late(1000));
        assert!(!watermark.is_late(1001));
    }

    #[test]
    fn test_watermark_min_max() {
        let w1 = Watermark::new(1000);
        let w2 = Watermark::new(2000);

        assert_eq!(w1.min(w2), Watermark::new(1000));
        assert_eq!(w1.max(w2), Watermark::new(2000));
    }

    #[test]
    fn test_watermark_ordering() {
        let w1 = Watermark::new(1000);
        let w2 = Watermark::new(2000);

        assert!(w1 < w2);
        assert!(w2 > w1);
        assert_eq!(w1, Watermark::new(1000));
    }

    #[test]
    fn test_watermark_conversions() {
        let wm = Watermark::from(1000i64);
        assert_eq!(wm.timestamp(), 1000);

        let ts: i64 = wm.into();
        assert_eq!(ts, 1000);
    }

    #[test]
    fn test_watermark_default() {
        let wm = Watermark::default();
        assert_eq!(wm.timestamp(), i64::MIN);
    }

    #[test]
    fn test_timer_service_creation() {
        let service = TimerService::new();
        assert_eq!(service.pending_count(), 0);
        assert_eq!(service.next_timer_timestamp(), None);
    }

    #[test]
    fn test_timer_registration() {
        let mut service = TimerService::new();

        let id1 = service.register_timer(100, None, None);
        let id2 = service.register_timer(50, Some(TimerKey::from_slice(&[1, 2, 3])), Some(1));

        assert_eq!(service.pending_count(), 2);
        assert_ne!(id1, id2);
    }

    #[test]
    fn test_timer_poll_order() {
        let mut service = TimerService::new();

        let id1 = service.register_timer(100, None, None);
        let id2 = service.register_timer(50, Some(TimerKey::from_slice(&[1, 2, 3])), Some(0));
        let id3 = service.register_timer(150, None, None);

        // Poll at time 75 - should get timer at t=50
        let fired = service.poll_timers(75);
        assert_eq!(fired.len(), 1);
        assert_eq!(fired[0].id, id2);
        assert_eq!(fired[0].key, Some(TimerKey::from_slice(&[1, 2, 3])));
        assert_eq!(fired[0].operator_index, Some(0));

        // Poll at time 125 - should get timer at t=100
        let fired = service.poll_timers(125);
        assert_eq!(fired.len(), 1);
        assert_eq!(fired[0].id, id1);

        // Poll at time 200 - should get timer at t=150
        let fired = service.poll_timers(200);
        assert_eq!(fired.len(), 1);
        assert_eq!(fired[0].id, id3);

        assert_eq!(service.pending_count(), 0);
    }

    #[test]
    fn test_timer_poll_multiple() {
        let mut service = TimerService::new();

        service.register_timer(50, None, None);
        service.register_timer(75, None, None);
        service.register_timer(100, None, None);

        // Poll at time 80 - should get timers at t=50 and t=75
        let fired = service.poll_timers(80);
        assert_eq!(fired.len(), 2);
        // Should be in timestamp order
        assert_eq!(fired[0].timestamp, 50);
        assert_eq!(fired[1].timestamp, 75);

        // The timer at t=100 must still be pending
        assert_eq!(service.pending_count(), 1);
        assert_eq!(service.next_timer_timestamp(), Some(100));
    }

    #[test]
    fn test_timer_poll_at_exact_timestamp() {
        let mut service = TimerService::new();

        let id = service.register_timer(100, None, None);

        // One tick early - nothing fires
        assert!(service.poll_timers(99).is_empty());
        assert_eq!(service.pending_count(), 1);

        // The boundary is inclusive: timestamp == current_time fires
        let fired = service.poll_timers(100);
        assert_eq!(fired.len(), 1);
        assert_eq!(fired[0].id, id);
        assert_eq!(service.pending_count(), 0);
    }

    #[test]
    fn test_timer_poll_same_timestamp_fires_in_registration_order() {
        let mut service = TimerService::new();

        let first = service.register_timer(100, None, None);
        let second = service.register_timer(100, None, None);
        let third = service.register_timer(100, None, None);

        // Equal timestamps are tie-broken by ascending timer ID
        let fired = service.poll_timers(100);
        assert_eq!(fired.len(), 3);
        assert_eq!(fired[0].id, first);
        assert_eq!(fired[1].id, second);
        assert_eq!(fired[2].id, third);
    }

    #[test]
    fn test_timer_cancel() {
        let mut service = TimerService::new();

        let id1 = service.register_timer(100, None, None);
        let id2 = service.register_timer(200, None, None);

        assert!(service.cancel_timer(id1));
        assert_eq!(service.pending_count(), 1);

        // Should not be able to cancel again
        assert!(!service.cancel_timer(id1));

        // Cancelling an ID that was never issued is a no-op
        assert!(!service.cancel_timer(u64::MAX));
        assert_eq!(service.pending_count(), 1);

        // Cancel the remaining timer
        assert!(service.cancel_timer(id2));
        assert_eq!(service.pending_count(), 0);
    }

    #[test]
    fn test_timer_next_timestamp() {
        let mut service = TimerService::new();

        assert_eq!(service.next_timer_timestamp(), None);

        service.register_timer(100, None, None);
        assert_eq!(service.next_timer_timestamp(), Some(100));

        service.register_timer(50, None, None);
        assert_eq!(service.next_timer_timestamp(), Some(50));
    }

    #[test]
    fn test_timer_clear() {
        let mut service = TimerService::new();

        service.register_timer(100, None, None);
        service.register_timer(200, None, None);
        service.register_timer(300, None, None);

        service.clear();
        assert_eq!(service.pending_count(), 0);
        assert_eq!(service.next_timer_timestamp(), None);
    }

    #[test]
    fn test_bounded_watermark_generator() {
        let mut generator = BoundedOutOfOrdernessGenerator::new(100);

        // First event
        let wm1 = generator.on_event(1000);
        assert_eq!(wm1, Some(Watermark::new(900)));

        // Out of order event - no new watermark
        let wm2 = generator.on_event(800);
        assert!(wm2.is_none());

        // New max timestamp
        let wm3 = generator.on_event(1200);
        assert_eq!(wm3, Some(Watermark::new(1100)));
    }

    #[test]
    fn test_ascending_watermark_generator() {
        let mut generator = AscendingTimestampsGenerator::new();

        let wm1 = generator.on_event(1000);
        assert_eq!(wm1, Some(Watermark::new(1000)));

        let wm2 = generator.on_event(2000);
        assert_eq!(wm2, Some(Watermark::new(2000)));

        // Out of order - no watermark
        let wm3 = generator.on_event(1500);
        assert_eq!(wm3, None);
    }

    #[test]
    fn test_watermark_tracker_basic() {
        let mut tracker = WatermarkTracker::new(2);

        tracker.update_source(0, 1000);
        let wm = tracker.update_source(1, 500);

        assert_eq!(wm, Some(Watermark::new(500)));
    }

    #[test]
    fn test_watermark_tracker_idle() {
        let mut tracker = WatermarkTracker::new(2);

        tracker.update_source(0, 5000);
        tracker.update_source(1, 1000);

        // Mark slow source as idle
        let wm = tracker.mark_idle(1);
        assert_eq!(wm, Some(Watermark::new(5000)));

        assert!(tracker.is_idle(1));
        assert!(!tracker.is_idle(0));
    }
}
662}