Skip to main content

oxirs_stream/watermark/
mod.rs

1//! # Stream Watermarking
2//!
3//! Event-time watermarks for out-of-order stream processing.
4//!
5//! ## Key Types
6//!
7//! - [`StreamWatermark`]: An immutable watermark value with source attribution
8//! - [`WatermarkGenerator`]: Advances a watermark as events are observed
9//! - [`WatermarkAligner`]: Computes the global watermark across multiple sources (minimum)
10//! - [`LateDataHandler`]: Decides what to do with events that arrive after the watermark
11//!
12//! ## Submodules
13//!
14//! - [`propagation`]: Operator-graph watermark propagation with monotonicity
15//!   enforcement.
//! - [`late_handler`]: Allowed-lateness budget tracker and side-output router
//!   ([`AllowedLatenessTracker`] and [`SideOutputRouter`] are re-exported from
//!   there for convenience; [`LateDataHandler`], [`LateDataPolicy`] and
//!   [`LateDataDecision`] are defined directly in this module).
19
20pub mod late_handler;
21pub mod propagation;
22
23pub use late_handler::{AllowedLatenessTracker, SideOutputRouter};
24pub use propagation::{OperatorId, OperatorWatermarkAggregator, WatermarkPropagator};
25
26use std::collections::HashMap;
27
28// ─── StreamWatermark ──────────────────────────────────────────────────────────
29
/// A single source's event-time watermark.
///
/// A watermark is a promise made on behalf of `source_id`: every event that
/// source produces from now on carries a timestamp `>= timestamp`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct StreamWatermark {
    /// Watermark position, in milliseconds since the Unix epoch.
    pub timestamp: i64,
    /// Name of the source this watermark was produced by.
    pub source_id: String,
    /// How many times the producing generator had advanced its watermark
    /// when this value was created.
    pub advance_count: u64,
}

impl StreamWatermark {
    /// Assemble a watermark from its three components.
    ///
    /// `source_id` accepts anything convertible into a `String`, so both
    /// `&str` and `String` arguments work.
    pub fn new(timestamp: i64, source_id: impl Into<String>, advance_count: u64) -> Self {
        let source_id: String = source_id.into();
        StreamWatermark {
            timestamp,
            source_id,
            advance_count,
        }
    }
}
53
54// ─── WatermarkGenerator ───────────────────────────────────────────────────────
55
56/// Generates watermarks by tracking the maximum observed event timestamp.
57///
58/// A watermark is emitted at `max_event_ts - max_out_of_order_ms` after every
59/// `advance_threshold` events. This provides a balance between latency and
60/// tolerance for out-of-order events.
61pub struct WatermarkGenerator {
62    max_out_of_order_ms: i64,
63    current_max_ts: i64,
64    advance_threshold: usize,
65    event_count: usize,
66    advance_count: u64,
67    source_id: String,
68}
69
70impl WatermarkGenerator {
71    /// Create a generator that tolerates up to `max_out_of_order_ms` of lateness.
72    ///
73    /// Watermarks are emitted every 100 events by default.
74    pub fn new(max_out_of_order_ms: i64) -> Self {
75        Self {
76            max_out_of_order_ms,
77            current_max_ts: i64::MIN,
78            advance_threshold: 100,
79            event_count: 0,
80            advance_count: 0,
81            source_id: "default".to_string(),
82        }
83    }
84
85    /// Set the event count after which a watermark is emitted.
86    pub fn with_advance_threshold(mut self, threshold: usize) -> Self {
87        self.advance_threshold = threshold;
88        self
89    }
90
91    /// Set the source identifier embedded in emitted watermarks.
92    pub fn with_source_id(mut self, source_id: impl Into<String>) -> Self {
93        self.source_id = source_id.into();
94        self
95    }
96
97    /// Observe an event with `event_timestamp_ms`.
98    ///
99    /// Returns `Some(watermark)` when the threshold is reached, otherwise `None`.
100    pub fn observe(&mut self, event_timestamp_ms: i64) -> Option<StreamWatermark> {
101        if event_timestamp_ms > self.current_max_ts {
102            self.current_max_ts = event_timestamp_ms;
103        }
104        self.event_count += 1;
105
106        if self.event_count >= self.advance_threshold {
107            self.event_count = 0;
108            self.advance_count += 1;
109            Some(StreamWatermark::new(
110                self.current_watermark(),
111                self.source_id.clone(),
112                self.advance_count,
113            ))
114        } else {
115            None
116        }
117    }
118
119    /// Current watermark timestamp: `max_event_ts - max_out_of_order_ms`.
120    ///
121    /// Returns `i64::MIN` before any event has been observed.
122    pub fn current_watermark(&self) -> i64 {
123        if self.current_max_ts == i64::MIN {
124            i64::MIN
125        } else {
126            self.current_max_ts - self.max_out_of_order_ms
127        }
128    }
129
130    /// Return `true` if `event_timestamp_ms` is before the current watermark.
131    pub fn is_late(&self, event_timestamp_ms: i64) -> bool {
132        event_timestamp_ms < self.current_watermark()
133    }
134}
135
136// ─── WatermarkAligner ─────────────────────────────────────────────────────────
137
/// Tracks per-source watermarks and computes a global minimum watermark.
///
/// A pipeline must wait for *all* sources to advance before it can safely
/// process events up to a given timestamp.
///
/// The aligner stores whatever value was reported last for each source; it
/// does not itself enforce that a source's watermark only moves forward.
#[derive(Debug, Default)]
pub struct WatermarkAligner {
    /// Most recently reported watermark for each source id.
    sources: HashMap<String, i64>,
}

impl WatermarkAligner {
    /// Create an empty aligner with no tracked sources.
    pub fn new() -> Self {
        Self::default()
    }

    /// Update (or register) the watermark for `source_id`.
    ///
    /// The stored value is replaced unconditionally, even when `watermark_ms`
    /// is lower than the previous value for that source.
    pub fn update(&mut self, source_id: &str, watermark_ms: i64) {
        // Fast path: an already-tracked source is updated in place, so the key
        // is only allocated the first time a source is seen.
        if let Some(slot) = self.sources.get_mut(source_id) {
            *slot = watermark_ms;
        } else {
            self.sources.insert(source_id.to_owned(), watermark_ms);
        }
    }

    /// The global watermark: the minimum across all tracked sources.
    ///
    /// Returns `i64::MIN` when no sources are tracked.
    pub fn global_watermark(&self) -> i64 {
        self.sources.values().copied().min().unwrap_or(i64::MIN)
    }

    /// Number of sources currently tracked.
    pub fn source_count(&self) -> usize {
        self.sources.len()
    }

    /// Return `true` iff *all* tracked sources have watermarks strictly
    /// greater than `timestamp_ms`.
    ///
    /// An aligner with no sources returns `false`: with nothing tracked there
    /// is no evidence that any source has passed `timestamp_ms`.
    pub fn all_beyond(&self, timestamp_ms: i64) -> bool {
        !self.sources.is_empty() && self.sources.values().all(|&wm| wm > timestamp_ms)
    }
}
186
187// ─── LateDataPolicy / LateDataDecision ────────────────────────────────────────
188
/// What the pipeline should do with events that arrive after the watermark.
#[derive(Debug, Clone)]
pub enum LateDataPolicy {
    /// Discard the event.
    Drop,
    /// Re-assign the event to the nearest open window that can still accept it,
    /// provided the event is at most `max_lateness_ms` behind the watermark.
    Reassign { max_lateness_ms: i64 },
    /// Route the event to a named side-output channel for separate handling.
    SideOutput { channel: String },
}

/// Result returned by [`LateDataHandler::handle`].
#[derive(Debug, Clone, PartialEq)]
pub enum LateDataDecision {
    /// Process the event in the main stream.
    Process,
    /// Drop the event.
    Drop,
    /// Re-assign the event to the given timestamp.
    Reassign(i64),
    /// Route the event to a side-output channel.
    ///
    /// The channel name is not carried here; it is available from the
    /// handler's `policy` field.
    SideOutput,
}

/// Applies the configured [`LateDataPolicy`] to late events.
#[derive(Debug)]
pub struct LateDataHandler {
    /// Policy applied to every late event.
    pub policy: LateDataPolicy,
    /// Number of late events seen so far, regardless of the decision taken.
    pub late_event_count: u64,
}

impl LateDataHandler {
    /// Create a handler with the given policy and a late-event count of zero.
    pub fn new(policy: LateDataPolicy) -> Self {
        Self {
            policy,
            late_event_count: 0,
        }
    }

    /// Decide what to do with an event at `event_ts_ms` given that the
    /// current watermark is `watermark_ms`.
    ///
    /// If `event_ts_ms >= watermark_ms` the event is not late and `Process` is
    /// returned without touching the counter. Otherwise `late_event_count` is
    /// incremented and the policy decides:
    ///
    /// - `Drop` yields [`LateDataDecision::Drop`].
    /// - `Reassign` yields `Reassign(watermark_ms)` when the event is at most
    ///   `max_lateness_ms` behind the watermark, and `Drop` otherwise.
    /// - `SideOutput` yields [`LateDataDecision::SideOutput`].
    pub fn handle(&mut self, event_ts_ms: i64, watermark_ms: i64) -> LateDataDecision {
        if event_ts_ms >= watermark_ms {
            return LateDataDecision::Process;
        }
        self.late_event_count += 1;
        match &self.policy {
            LateDataPolicy::Drop => LateDataDecision::Drop,
            LateDataPolicy::Reassign { max_lateness_ms } => {
                // Widen before subtracting: the gap between two i64 timestamps
                // can exceed i64::MAX (e.g. watermark i64::MAX, event i64::MIN),
                // which would panic in debug builds and wrap in release builds.
                let lateness = i128::from(watermark_ms) - i128::from(event_ts_ms);
                if lateness <= i128::from(*max_lateness_ms) {
                    LateDataDecision::Reassign(watermark_ms)
                } else {
                    LateDataDecision::Drop
                }
            }
            LateDataPolicy::SideOutput { .. } => LateDataDecision::SideOutput,
        }
    }
}
252
253// ─── Tests ────────────────────────────────────────────────────────────────────
254
#[cfg(test)]
mod tests {
    use super::*;

    // NOTE: locals are named `generator` rather than `gen` because `gen` is a
    // reserved keyword starting with the Rust 2024 edition.

    // ── WatermarkGenerator ─────────────────────────────────────────────────────

    /// Before any event is observed the watermark is the `i64::MIN` sentinel.
    #[test]
    fn test_generator_initial_watermark_is_min() {
        let generator = WatermarkGenerator::new(500);
        assert_eq!(generator.current_watermark(), i64::MIN);
    }

    /// Nine events with a threshold of ten must not emit anything.
    #[test]
    fn test_generator_no_watermark_before_threshold() {
        let mut generator = WatermarkGenerator::new(100).with_advance_threshold(10);
        for ts in 0..9 {
            let wm = generator.observe(ts * 100);
            assert!(wm.is_none(), "should not emit before threshold");
        }
    }

    /// The fifth event with a threshold of five emits, and the counter then
    /// restarts so the very next event does not emit.
    #[test]
    fn test_generator_emits_at_threshold() {
        let mut generator = WatermarkGenerator::new(100).with_advance_threshold(5);
        let mut last_wm = None;
        for i in 0..5 {
            last_wm = generator.observe(i * 1000);
        }
        let w = last_wm.expect("watermark must be emitted at threshold");
        // max ts = 4000, watermark = 4000 - 100 = 3900
        assert_eq!(w.timestamp, 3900);
        assert_eq!(w.advance_count, 1);
        // The event counter was reset by the emission above.
        assert!(
            generator.observe(5000).is_none(),
            "counter must restart after an emission"
        );
    }

    /// Six events with a threshold of three produce exactly two advances.
    #[test]
    fn test_generator_advance_count_increments() {
        let mut generator = WatermarkGenerator::new(0).with_advance_threshold(3);
        for i in 0..6 {
            generator.observe(i * 1000);
        }
        assert_eq!(generator.advance_count, 2);
    }

    #[test]
    fn test_generator_current_watermark_formula() {
        let mut generator = WatermarkGenerator::new(200).with_advance_threshold(100);
        generator.observe(5000);
        assert_eq!(generator.current_watermark(), 4800); // 5000 - 200
    }

    #[test]
    fn test_generator_is_late_true() {
        let mut generator = WatermarkGenerator::new(100).with_advance_threshold(100);
        generator.observe(10_000);
        // watermark = 9900; event at 9000 is late
        assert!(generator.is_late(9000));
    }

    #[test]
    fn test_generator_is_late_false() {
        let mut generator = WatermarkGenerator::new(100).with_advance_threshold(100);
        generator.observe(10_000);
        // watermark = 9900; event at 9900 is not late (== watermark)
        assert!(!generator.is_late(9900));
        // event at 10_000 is future
        assert!(!generator.is_late(10_000));
    }

    #[test]
    fn test_generator_source_id() {
        let mut generator = WatermarkGenerator::new(0)
            .with_advance_threshold(1)
            .with_source_id("sensor-42");
        let wm = generator.observe(1000).expect("should emit");
        assert_eq!(wm.source_id, "sensor-42");
    }

    #[test]
    fn test_generator_max_ts_tracks_maximum() {
        let mut generator = WatermarkGenerator::new(0).with_advance_threshold(100);
        // Observe out-of-order events
        generator.observe(500);
        generator.observe(1000);
        generator.observe(300);
        assert_eq!(generator.current_max_ts, 1000);
        assert_eq!(generator.current_watermark(), 1000);
    }

    // ── WatermarkAligner ───────────────────────────────────────────────────────

    #[test]
    fn test_aligner_empty_returns_min() {
        let aligner = WatermarkAligner::new();
        assert_eq!(aligner.global_watermark(), i64::MIN);
        assert_eq!(aligner.source_count(), 0);
    }

    #[test]
    fn test_aligner_single_source() {
        let mut aligner = WatermarkAligner::new();
        aligner.update("src-A", 5000);
        assert_eq!(aligner.global_watermark(), 5000);
        assert_eq!(aligner.source_count(), 1);
    }

    #[test]
    fn test_aligner_global_is_minimum() {
        let mut aligner = WatermarkAligner::new();
        aligner.update("src-A", 5000);
        aligner.update("src-B", 3000);
        aligner.update("src-C", 7000);
        assert_eq!(aligner.global_watermark(), 3000);
    }

    #[test]
    fn test_aligner_all_beyond_true() {
        let mut aligner = WatermarkAligner::new();
        aligner.update("A", 10_000);
        aligner.update("B", 12_000);
        assert!(aligner.all_beyond(9_999));
    }

    #[test]
    fn test_aligner_all_beyond_false_one_lagging() {
        let mut aligner = WatermarkAligner::new();
        aligner.update("A", 10_000);
        aligner.update("B", 5_000);
        assert!(!aligner.all_beyond(6_000));
    }

    #[test]
    fn test_aligner_all_beyond_empty_is_false() {
        let aligner = WatermarkAligner::new();
        assert!(!aligner.all_beyond(0));
    }

    #[test]
    fn test_aligner_update_overwrites() {
        let mut aligner = WatermarkAligner::new();
        aligner.update("src", 1000);
        aligner.update("src", 9000);
        assert_eq!(aligner.global_watermark(), 9000);
        assert_eq!(aligner.source_count(), 1);
    }

    // ── LateDataHandler ────────────────────────────────────────────────────────

    #[test]
    fn test_late_handler_on_time_event_is_process() {
        let mut handler = LateDataHandler::new(LateDataPolicy::Drop);
        let decision = handler.handle(5000, 4000);
        assert_eq!(decision, LateDataDecision::Process);
        assert_eq!(handler.late_event_count, 0);
    }

    #[test]
    fn test_late_handler_drop_policy() {
        let mut handler = LateDataHandler::new(LateDataPolicy::Drop);
        let decision = handler.handle(1000, 5000);
        assert_eq!(decision, LateDataDecision::Drop);
        assert_eq!(handler.late_event_count, 1);
    }

    #[test]
    fn test_late_handler_reassign_within_max_lateness() {
        let mut handler = LateDataHandler::new(LateDataPolicy::Reassign {
            max_lateness_ms: 2000,
        });
        // event at 4000, watermark at 5000 → lateness = 1000 ≤ 2000 → Reassign
        let decision = handler.handle(4000, 5000);
        assert_eq!(decision, LateDataDecision::Reassign(5000));
    }

    #[test]
    fn test_late_handler_reassign_exceeds_max_lateness_drops() {
        let mut handler = LateDataHandler::new(LateDataPolicy::Reassign {
            max_lateness_ms: 500,
        });
        // event at 3000, watermark at 5000 → lateness = 2000 > 500 → Drop
        let decision = handler.handle(3000, 5000);
        assert_eq!(decision, LateDataDecision::Drop);
        assert_eq!(handler.late_event_count, 1);
    }

    #[test]
    fn test_late_handler_side_output_policy() {
        let mut handler = LateDataHandler::new(LateDataPolicy::SideOutput {
            channel: "late-events".to_string(),
        });
        let decision = handler.handle(1000, 5000);
        assert_eq!(decision, LateDataDecision::SideOutput);
        assert_eq!(handler.late_event_count, 1);
    }

    #[test]
    fn test_late_handler_counts_accumulate() {
        let mut handler = LateDataHandler::new(LateDataPolicy::Drop);
        handler.handle(100, 1000);
        handler.handle(200, 1000);
        handler.handle(5000, 1000); // on-time: watermark = 1000, event at 5000
        assert_eq!(handler.late_event_count, 2);
    }

    // ── StreamWatermark ────────────────────────────────────────────────────────

    #[test]
    fn test_stream_watermark_equality() {
        let w1 = StreamWatermark::new(1000, "s1", 1);
        let w2 = StreamWatermark::new(1000, "s1", 1);
        let w3 = StreamWatermark::new(2000, "s1", 1);
        assert_eq!(w1, w2);
        assert_ne!(w1, w3);
    }
}