laminar_core/time/mod.rs
1//! # Time Module
2//!
3//! Event time processing, watermarks, and timer management.
4//!
5//! ## Concepts
6//!
7//! - **Event Time**: Timestamp when the event actually occurred
8//! - **Processing Time**: Timestamp when the event is processed
9//! - **Watermark**: Assertion that no events with timestamp < watermark will arrive
10//! - **Timer**: Scheduled callback for window triggers or timeouts
11//!
12//! ## Event Time Extraction
13//!
14//! Use [`EventTimeExtractor`] to extract timestamps from Arrow `RecordBatch` columns:
15//!
16//! ```ignore
17//! use laminar_core::time::{EventTimeExtractor, TimestampFormat, ExtractionMode};
18//!
19//! // Extract millisecond timestamps from a column
20//! let mut extractor = EventTimeExtractor::from_column("event_time", TimestampFormat::UnixMillis);
21//!
22//! // Use Max mode for multi-row batches
23//! let extractor = extractor.with_mode(ExtractionMode::Max);
24//!
25//! let timestamp = extractor.extract(&batch)?;
26//! ```
27//!
28//! ## Watermark Generation
29//!
30//! Use watermark generators to track event-time progress:
31//!
32//! ```rust
33//! use laminar_core::time::{BoundedOutOfOrdernessGenerator, WatermarkGenerator, Watermark};
34//!
35//! // Allow events to be up to 1 second late
36//! let mut generator = BoundedOutOfOrdernessGenerator::new(1000);
37//!
38//! // Process events
39//! let wm = generator.on_event(5000);
40//! assert_eq!(wm, Some(Watermark::new(4000))); // 5000 - 1000
41//! ```
42//!
43//! ## Multi-Source Watermark Tracking
44//!
45//! For operators with multiple inputs, use [`WatermarkTracker`]:
46//!
47//! ```rust
48//! use laminar_core::time::{WatermarkTracker, Watermark};
49//!
50//! let mut tracker = WatermarkTracker::new(2);
51//! tracker.update_source(0, 5000);
52//! tracker.update_source(1, 3000);
53//!
54//! // Combined watermark is the minimum
55//! assert_eq!(tracker.current_watermark(), Some(Watermark::new(3000)));
56//! ```
57//!
58//! ## Per-Partition Watermark Tracking
59//!
60//! For Kafka sources with multiple partitions, use [`PartitionedWatermarkTracker`]:
61//!
62//! ```rust
63//! use laminar_core::time::{PartitionedWatermarkTracker, PartitionId, Watermark};
64//!
65//! let mut tracker = PartitionedWatermarkTracker::new();
66//!
67//! // Register a Kafka source with 4 partitions
68//! tracker.register_source(0, 4);
69//!
70//! // Update ALL partitions (all must have valid watermarks)
71//! tracker.update_partition(PartitionId::new(0, 0), 5000);
72//! tracker.update_partition(PartitionId::new(0, 1), 3000);
73//! tracker.update_partition(PartitionId::new(0, 2), 4000);
74//! tracker.update_partition(PartitionId::new(0, 3), 4500);
75//!
76//! // Combined watermark is minimum across active partitions
77//! assert_eq!(tracker.current_watermark(), Some(Watermark::new(3000)));
78//!
79//! // Mark slow partition as idle to allow progress
80//! tracker.mark_partition_idle(PartitionId::new(0, 1));
81//! assert_eq!(tracker.current_watermark(), Some(Watermark::new(4000)));
82//! ```
83//!
84//! ## Per-Key Watermark Tracking (F065)
85//!
//! For multi-tenant workloads or scenarios with significant event-time skew between
//! keys, use [`KeyedWatermarkTracker`] to achieve 99%+ accuracy vs 63-67% with a single
//! global watermark:
88//!
89//! ```rust
90//! use laminar_core::time::{KeyedWatermarkTracker, KeyedWatermarkConfig, Watermark};
91//! use std::time::Duration;
92//!
93//! let config = KeyedWatermarkConfig::with_bounded_delay(Duration::from_secs(5));
94//! let mut tracker: KeyedWatermarkTracker<String> = KeyedWatermarkTracker::new(config);
95//!
96//! // Fast tenant advances quickly
97//! tracker.update("tenant_a".to_string(), 15_000);
98//!
99//! // Slow tenant at earlier time
100//! tracker.update("tenant_b".to_string(), 5_000);
101//!
102//! // Per-key watermarks differ - each key has independent tracking
103//! assert_eq!(tracker.watermark_for_key(&"tenant_a".to_string()), Some(10_000));
104//! assert_eq!(tracker.watermark_for_key(&"tenant_b".to_string()), Some(0));
105//!
106//! // Events for tenant_b at 3000 are NOT late (their key watermark is 0)
107//! assert!(!tracker.is_late(&"tenant_b".to_string(), 3000));
108//!
109//! // But events for tenant_a at 3000 ARE late (their key watermark is 10000)
110//! assert!(tracker.is_late(&"tenant_a".to_string(), 3000));
111//! ```
112//!
113//! ## Watermark Alignment Groups (F066)
114//!
115//! For stream-stream joins and multi-source operators, use [`WatermarkAlignmentGroup`]
116//! to prevent unbounded state growth when sources have different processing speeds:
117//!
118//! ```rust
119//! use laminar_core::time::{
120//! WatermarkAlignmentGroup, AlignmentGroupConfig, AlignmentGroupId,
121//! EnforcementMode, AlignmentAction,
122//! };
123//! use std::time::Duration;
124//!
125//! let config = AlignmentGroupConfig::new("orders-payments")
126//! .with_max_drift(Duration::from_secs(300)); // 5 minute max drift
127//!
128//! let mut group = WatermarkAlignmentGroup::new(config);
129//! group.register_source(0); // orders
130//! group.register_source(1); // payments
131//!
132//! // Both start at 0
133//! group.report_watermark(0, 0);
134//! group.report_watermark(1, 0);
135//!
136//! // Orders advances within limit - OK
137//! let action = group.report_watermark(0, 200_000); // 200 seconds
138//! assert_eq!(action, AlignmentAction::Continue);
139//!
140//! // Orders advances beyond limit - PAUSED
141//! let action = group.report_watermark(0, 400_000); // 400 seconds (drift > 300)
142//! assert_eq!(action, AlignmentAction::Pause);
143//! ```
144
145mod alignment_group;
146mod event_time;
147mod keyed_watermark;
148mod partitioned_watermark;
149mod watermark;
150
151pub use alignment_group::{
152 AlignmentAction, AlignmentError, AlignmentGroupConfig, AlignmentGroupCoordinator,
153 AlignmentGroupId, AlignmentGroupMetrics, AlignmentSourceState, EnforcementMode,
154 WatermarkAlignmentGroup,
155};
156
157pub use event_time::{
158 EventTimeError, EventTimeExtractor, ExtractionMode, TimestampField, TimestampFormat,
159};
160
161pub use keyed_watermark::{
162 KeyEvictionPolicy, KeyWatermarkState, KeyedWatermarkConfig, KeyedWatermarkError,
163 KeyedWatermarkMetrics, KeyedWatermarkTracker, KeyedWatermarkTrackerWithLateHandling,
164};
165
166pub use partitioned_watermark::{
167 CoreWatermarkState, GlobalWatermarkCollector, PartitionId, PartitionWatermarkState,
168 PartitionedWatermarkMetrics, PartitionedWatermarkTracker, WatermarkError,
169};
170
171pub use watermark::{
172 AscendingTimestampsGenerator, BoundedOutOfOrdernessGenerator, MeteredGenerator,
173 PeriodicGenerator, PunctuatedGenerator, SourceProvidedGenerator, WatermarkGenerator,
174 WatermarkMetrics, WatermarkTracker,
175};
176
177use smallvec::SmallVec;
178use std::cmp::Ordering;
179use std::collections::BinaryHeap;
180
/// Byte key attached to a timer by keyed operators, sized for window IDs.
///
/// Backed by a `SmallVec` with 16 bytes of inline storage, so keys of up to
/// 16 bytes (the common case of `WindowId` keys) are stored without a heap
/// allocation.
pub type TimerKey = SmallVec<[u8; 16]>;
186
/// Collection type for the timers returned by a single poll.
///
/// Backed by a `SmallVec` with inline room for 8 [`TimerRegistration`]s, so the
/// collection itself needs no heap allocation when at most 8 timers fire.
/// The inline size of 8 is intended to cover the common case of timers
/// firing in small batches.
pub type FiredTimersVec = SmallVec<[TimerRegistration; 8]>;
192
/// Marker of event-time progress.
///
/// A watermark is an assertion that no events with a timestamp earlier than
/// its value will arrive. Watermarks are meant to be monotonically increasing
/// and are used to:
///
/// - Trigger window emissions
/// - Detect late events
/// - Coordinate time progress across operators
///
/// # Example
///
/// ```rust
/// use laminar_core::time::Watermark;
///
/// let watermark = Watermark::new(1000);
///
/// // Only timestamps strictly below the watermark count as late
/// assert!(watermark.is_late(999)); // Before watermark
/// assert!(!watermark.is_late(1000)); // At watermark
/// assert!(!watermark.is_late(1001)); // After watermark
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct Watermark(pub i64);

impl Watermark {
    /// Wraps `timestamp` in a watermark.
    #[inline]
    #[must_use]
    pub fn new(timestamp: i64) -> Self {
        Watermark(timestamp)
    }

    /// Returns the timestamp carried by this watermark, in milliseconds.
    #[inline]
    #[must_use]
    pub fn timestamp(&self) -> i64 {
        let Watermark(value) = *self;
        value
    }

    /// Reports whether `event_time` is late with respect to this watermark.
    ///
    /// Lateness is strict: an event stamped exactly at the watermark is
    /// still on time; only timestamps below it are late.
    #[inline]
    #[must_use]
    pub fn is_late(&self, event_time: i64) -> bool {
        self.timestamp() > event_time
    }

    /// Returns whichever of the two watermarks is earlier.
    #[must_use]
    pub fn min(self, other: Self) -> Self {
        if other.0 < self.0 {
            other
        } else {
            self
        }
    }

    /// Returns whichever of the two watermarks is later.
    #[must_use]
    pub fn max(self, other: Self) -> Self {
        if other.0 > self.0 {
            other
        } else {
            self
        }
    }
}

impl Default for Watermark {
    /// The default watermark sits at `i64::MIN`, so no event time is late
    /// relative to it.
    fn default() -> Self {
        Watermark(i64::MIN)
    }
}

impl From<i64> for Watermark {
    /// Converts a raw timestamp into a watermark.
    fn from(timestamp: i64) -> Self {
        Watermark::new(timestamp)
    }
}

impl From<Watermark> for i64 {
    /// Unwraps a watermark into its raw timestamp.
    fn from(watermark: Watermark) -> Self {
        watermark.timestamp()
    }
}
272
273/// A timer registration for delayed processing.
274///
275/// Timers are used by operators to schedule future actions, typically for
276/// window triggering or timeouts.
277#[derive(Debug, Clone, PartialEq, Eq)]
278pub struct TimerRegistration {
279 /// Unique timer ID
280 pub id: u64,
281 /// Scheduled timestamp (event time, in milliseconds)
282 pub timestamp: i64,
283 /// Timer key (for keyed operators).
284 /// Uses `TimerKey` (`SmallVec`) to avoid heap allocation for keys up to 16 bytes.
285 pub key: Option<TimerKey>,
286 /// The index of the operator that registered this timer
287 pub operator_index: Option<usize>,
288}
289
290impl Ord for TimerRegistration {
291 fn cmp(&self, other: &Self) -> Ordering {
292 // Reverse ordering for min-heap behavior (earliest first)
293 match other.timestamp.cmp(&self.timestamp) {
294 Ordering::Equal => other.id.cmp(&self.id),
295 ord => ord,
296 }
297 }
298}
299
300impl PartialOrd for TimerRegistration {
301 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
302 Some(self.cmp(other))
303 }
304}
305
306/// Timer service for scheduling and managing timers.
307///
308/// The timer service maintains a priority queue of timer registrations,
309/// ordered by timestamp. Operators can register timers to be fired at
310/// specific event times.
311///
312/// # Example
313///
314/// ```rust
315/// use laminar_core::time::{TimerService, TimerKey};
316///
317/// let mut service = TimerService::new();
318///
319/// // Register timers at different times
320/// let id1 = service.register_timer(100, None, None);
321/// let id2 = service.register_timer(50, Some(TimerKey::from_slice(&[1, 2, 3])), None);
322///
323/// // Poll for timers that should fire at time 75
324/// let fired = service.poll_timers(75);
325/// assert_eq!(fired.len(), 1);
326/// assert_eq!(fired[0].id, id2); // Timer at t=50 fires first
327/// ```
328pub struct TimerService {
329 timers: BinaryHeap<TimerRegistration>,
330 next_timer_id: u64,
331}
332
333impl TimerService {
334 /// Creates a new timer service.
335 #[must_use]
336 pub fn new() -> Self {
337 Self {
338 timers: BinaryHeap::new(),
339 next_timer_id: 0,
340 }
341 }
342
343 /// Registers a new timer.
344 ///
345 /// Returns the unique timer ID that can be used to cancel the timer.
346 ///
347 /// # Arguments
348 ///
349 /// * `timestamp` - The event time at which the timer should fire
350 /// * `key` - Optional key for keyed operators
351 /// * `operator_index` - Optional index of the operator registering the timer(must match the index in the reactor)
352 pub fn register_timer(
353 &mut self,
354 timestamp: i64,
355 key: Option<TimerKey>,
356 operator_index: Option<usize>,
357 ) -> u64 {
358 let id = self.next_timer_id;
359 self.next_timer_id += 1;
360
361 self.timers.push(TimerRegistration {
362 id,
363 timestamp,
364 key,
365 operator_index,
366 });
367
368 id
369 }
370
371 /// Polls for timers that should fire at or before the given timestamp.
372 ///
373 /// Returns all timers with timestamps <= `current_time`, in order.
374 /// Uses `FiredTimersVec` (`SmallVec`) to avoid heap allocation when few timers fire.
375 ///
376 /// # Panics
377 ///
378 /// This function should not panic under normal circumstances. The internal
379 /// `expect` is only called after verifying the heap is not empty via `peek`.
380 #[inline]
381 pub fn poll_timers(&mut self, current_time: i64) -> FiredTimersVec {
382 let mut fired = FiredTimersVec::new();
383
384 while let Some(timer) = self.timers.peek() {
385 if timer.timestamp <= current_time {
386 // SAFETY: We just peeked and confirmed the heap is not empty
387 fired.push(self.timers.pop().expect("heap should not be empty"));
388 } else {
389 break;
390 }
391 }
392
393 fired
394 }
395
396 /// Cancels a timer by ID.
397 ///
398 /// Returns `true` if the timer was found and cancelled.
399 pub fn cancel_timer(&mut self, id: u64) -> bool {
400 let count_before = self.timers.len();
401 self.timers.retain(|t| t.id != id);
402 self.timers.len() < count_before
403 }
404
405 /// Returns the number of pending timers.
406 #[must_use]
407 pub fn pending_count(&self) -> usize {
408 self.timers.len()
409 }
410
411 /// Returns the timestamp of the next timer to fire, if any.
412 #[must_use]
413 pub fn next_timer_timestamp(&self) -> Option<i64> {
414 self.timers.peek().map(|t| t.timestamp)
415 }
416
417 /// Clears all pending timers.
418 pub fn clear(&mut self) {
419 self.timers.clear();
420 }
421}
422
423impl Default for TimerService {
424 fn default() -> Self {
425 Self::new()
426 }
427}
428
/// Errors that can occur in time operations.
///
/// The `Display` message of each variant comes from its `#[error(...)]`
/// attribute (generated by `thiserror`).
///
/// NOTE(review): nothing in this module file constructs these variants;
/// presumably they are produced by submodules or callers — confirm.
#[derive(Debug, thiserror::Error)]
pub enum TimeError {
    /// Invalid timestamp value; carries the offending timestamp.
    #[error("Invalid timestamp: {0}")]
    InvalidTimestamp(i64),

    /// Timer not found; carries the timer ID that was looked up.
    #[error("Timer not found: {0}")]
    TimerNotFound(u64),

    /// Watermark regression (going backwards)
    #[error("Watermark regression: current={current}, new={new}")]
    WatermarkRegression {
        /// Current watermark value
        current: i64,
        /// Attempted new watermark value
        new: i64,
    },
}
449
#[cfg(test)]
mod tests {
    use super::*;

    /// Key used by the tests that register a keyed timer.
    fn sample_key() -> TimerKey {
        TimerKey::from_slice(&[1, 2, 3])
    }

    #[test]
    fn test_watermark_creation() {
        assert_eq!(Watermark::new(1000).timestamp(), 1000);
    }

    #[test]
    fn test_watermark_late_detection() {
        let wm = Watermark::new(1000);

        // (event time, expected lateness): only strictly-earlier events are late.
        for (event_time, expected) in [(999, true), (1000, false), (1001, false)] {
            assert_eq!(wm.is_late(event_time), expected);
        }
    }

    #[test]
    fn test_watermark_min_max() {
        let earlier = Watermark::new(1000);
        let later = Watermark::new(2000);

        assert_eq!(earlier.min(later), Watermark::new(1000));
        assert_eq!(earlier.max(later), Watermark::new(2000));
    }

    #[test]
    fn test_watermark_ordering() {
        let earlier = Watermark::new(1000);
        let later = Watermark::new(2000);

        assert!(earlier < later);
        assert!(later > earlier);
        assert_eq!(earlier, Watermark::new(1000));
    }

    #[test]
    fn test_watermark_conversions() {
        let wm: Watermark = 1000i64.into();
        assert_eq!(wm.timestamp(), 1000);

        let raw = i64::from(wm);
        assert_eq!(raw, 1000);
    }

    #[test]
    fn test_watermark_default() {
        assert_eq!(Watermark::default().timestamp(), i64::MIN);
    }

    #[test]
    fn test_timer_service_creation() {
        let timers = TimerService::new();

        assert_eq!(timers.pending_count(), 0);
        assert!(timers.next_timer_timestamp().is_none());
    }

    #[test]
    fn test_timer_registration() {
        let mut timers = TimerService::new();

        let unkeyed = timers.register_timer(100, None, None);
        let keyed = timers.register_timer(50, Some(sample_key()), Some(1));

        assert_eq!(timers.pending_count(), 2);
        assert_ne!(unkeyed, keyed);
    }

    #[test]
    fn test_timer_poll_order() {
        let mut timers = TimerService::new();

        let at_100 = timers.register_timer(100, None, None);
        let at_50 = timers.register_timer(50, Some(sample_key()), Some(0));
        let _at_150 = timers.register_timer(150, None, None);

        // t=75: only the timer scheduled for 50 is due.
        let due = timers.poll_timers(75);
        assert_eq!(due.len(), 1);
        assert_eq!(due[0].id, at_50);
        assert_eq!(due[0].key, Some(sample_key()));

        // t=125: the timer scheduled for 100 becomes due.
        let due = timers.poll_timers(125);
        assert_eq!(due.len(), 1);
        assert_eq!(due[0].id, at_100);

        // t=200: the last timer (scheduled for 150) fires.
        let due = timers.poll_timers(200);
        assert_eq!(due.len(), 1);

        assert_eq!(timers.pending_count(), 0);
    }

    #[test]
    fn test_timer_poll_multiple() {
        let mut timers = TimerService::new();
        for at in [50, 75, 100] {
            timers.register_timer(at, None, None);
        }

        // t=80: exactly the timers for 50 and 75 fire, in timestamp order.
        let due = timers.poll_timers(80);
        let fired_at: Vec<i64> = due.iter().map(|timer| timer.timestamp).collect();
        assert_eq!(fired_at, vec![50, 75]);
    }

    #[test]
    fn test_timer_cancel() {
        let mut timers = TimerService::new();

        let first = timers.register_timer(100, None, None);
        let second = timers.register_timer(200, None, None);

        assert!(timers.cancel_timer(first));
        assert_eq!(timers.pending_count(), 1);

        // A second cancel of the same ID finds nothing.
        assert!(!timers.cancel_timer(first));

        // Cancelling the other timer empties the service.
        assert!(timers.cancel_timer(second));
        assert_eq!(timers.pending_count(), 0);
    }

    #[test]
    fn test_timer_next_timestamp() {
        let mut timers = TimerService::new();
        assert_eq!(timers.next_timer_timestamp(), None);

        timers.register_timer(100, None, None);
        assert_eq!(timers.next_timer_timestamp(), Some(100));

        // An earlier timer takes over as the next one to fire.
        timers.register_timer(50, None, None);
        assert_eq!(timers.next_timer_timestamp(), Some(50));
    }

    #[test]
    fn test_timer_clear() {
        let mut timers = TimerService::new();
        for at in [100, 200, 300] {
            timers.register_timer(at, None, None);
        }

        timers.clear();

        assert_eq!(timers.pending_count(), 0);
        assert_eq!(timers.next_timer_timestamp(), None);
    }

    #[test]
    fn test_bounded_watermark_generator() {
        let mut bounded = BoundedOutOfOrdernessGenerator::new(100);

        // First event: watermark trails the event by the allowed lateness.
        assert_eq!(bounded.on_event(1000), Some(Watermark::new(900)));

        // Out-of-order event does not produce a new watermark.
        assert_eq!(bounded.on_event(800), None);

        // A new maximum timestamp advances the watermark.
        assert_eq!(bounded.on_event(1200), Some(Watermark::new(1100)));
    }

    #[test]
    fn test_ascending_watermark_generator() {
        let mut ascending = AscendingTimestampsGenerator::new();

        assert_eq!(ascending.on_event(1000), Some(Watermark::new(1000)));
        assert_eq!(ascending.on_event(2000), Some(Watermark::new(2000)));

        // Out-of-order event does not produce a watermark.
        assert!(ascending.on_event(1500).is_none());
    }

    #[test]
    fn test_watermark_tracker_basic() {
        let mut sources = WatermarkTracker::new(2);

        sources.update_source(0, 1000);
        let combined = sources.update_source(1, 500);

        assert_eq!(combined, Some(Watermark::new(500)));
    }

    #[test]
    fn test_watermark_tracker_idle() {
        let mut sources = WatermarkTracker::new(2);

        sources.update_source(0, 5000);
        sources.update_source(1, 1000);

        // Marking the slow source idle lets the watermark jump to the fast source.
        let combined = sources.mark_idle(1);
        assert_eq!(combined, Some(Watermark::new(5000)));

        assert!(sources.is_idle(1));
        assert!(!sources.is_idle(0));
    }
}