Skip to main content

oxirs_stream/watermark/
mod.rs

//! # Stream Watermarking
//!
//! Event-time watermarks for out-of-order stream processing.
//!
//! ## Key Types
//!
//! - [`StreamWatermark`]: An immutable watermark value with source attribution
//! - [`WatermarkGenerator`]: Advances a watermark as events are observed
//! - [`WatermarkAligner`]: Computes the global watermark (the minimum) across multiple sources
//! - [`LateDataHandler`]: Decides what to do with events that arrive after the watermark

12use std::collections::HashMap;
13
14// ─── StreamWatermark ──────────────────────────────────────────────────────────
15
/// Immutable event-time watermark produced by a single source.
///
/// Carrying a watermark with `timestamp` asserts that every later event from
/// `source_id` has an event time `>= timestamp`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamWatermark {
    /// Event-time bound in milliseconds since the Unix epoch.
    pub timestamp: i64,
    /// Identifier of the source the watermark is attributed to.
    pub source_id: String,
    /// Number of times the producing generator had advanced its watermark.
    pub advance_count: u64,
}

impl StreamWatermark {
    /// Assemble a watermark from its parts.
    ///
    /// `source_id` accepts anything convertible into a `String` (e.g. `&str`).
    pub fn new(timestamp: i64, source_id: impl Into<String>, advance_count: u64) -> Self {
        let owned_source: String = source_id.into();
        StreamWatermark {
            timestamp,
            source_id: owned_source,
            advance_count,
        }
    }
}
39
40// ─── WatermarkGenerator ───────────────────────────────────────────────────────
41
42/// Generates watermarks by tracking the maximum observed event timestamp.
43///
44/// A watermark is emitted at `max_event_ts - max_out_of_order_ms` after every
45/// `advance_threshold` events. This provides a balance between latency and
46/// tolerance for out-of-order events.
47pub struct WatermarkGenerator {
48    max_out_of_order_ms: i64,
49    current_max_ts: i64,
50    advance_threshold: usize,
51    event_count: usize,
52    advance_count: u64,
53    source_id: String,
54}
55
56impl WatermarkGenerator {
57    /// Create a generator that tolerates up to `max_out_of_order_ms` of lateness.
58    ///
59    /// Watermarks are emitted every 100 events by default.
60    pub fn new(max_out_of_order_ms: i64) -> Self {
61        Self {
62            max_out_of_order_ms,
63            current_max_ts: i64::MIN,
64            advance_threshold: 100,
65            event_count: 0,
66            advance_count: 0,
67            source_id: "default".to_string(),
68        }
69    }
70
71    /// Set the event count after which a watermark is emitted.
72    pub fn with_advance_threshold(mut self, threshold: usize) -> Self {
73        self.advance_threshold = threshold;
74        self
75    }
76
77    /// Set the source identifier embedded in emitted watermarks.
78    pub fn with_source_id(mut self, source_id: impl Into<String>) -> Self {
79        self.source_id = source_id.into();
80        self
81    }
82
83    /// Observe an event with `event_timestamp_ms`.
84    ///
85    /// Returns `Some(watermark)` when the threshold is reached, otherwise `None`.
86    pub fn observe(&mut self, event_timestamp_ms: i64) -> Option<StreamWatermark> {
87        if event_timestamp_ms > self.current_max_ts {
88            self.current_max_ts = event_timestamp_ms;
89        }
90        self.event_count += 1;
91
92        if self.event_count >= self.advance_threshold {
93            self.event_count = 0;
94            self.advance_count += 1;
95            Some(StreamWatermark::new(
96                self.current_watermark(),
97                self.source_id.clone(),
98                self.advance_count,
99            ))
100        } else {
101            None
102        }
103    }
104
105    /// Current watermark timestamp: `max_event_ts - max_out_of_order_ms`.
106    ///
107    /// Returns `i64::MIN` before any event has been observed.
108    pub fn current_watermark(&self) -> i64 {
109        if self.current_max_ts == i64::MIN {
110            i64::MIN
111        } else {
112            self.current_max_ts - self.max_out_of_order_ms
113        }
114    }
115
116    /// Return `true` if `event_timestamp_ms` is before the current watermark.
117    pub fn is_late(&self, event_timestamp_ms: i64) -> bool {
118        event_timestamp_ms < self.current_watermark()
119    }
120}
121
122// ─── WatermarkAligner ─────────────────────────────────────────────────────────
123
/// Tracks per-source watermarks and computes a global minimum watermark.
///
/// A pipeline must wait for *all* sources to advance before it can safely
/// process events up to a given timestamp.
#[derive(Debug, Clone)]
pub struct WatermarkAligner {
    /// Latest watermark reported by each source, keyed by source id.
    sources: HashMap<String, i64>,
}

impl WatermarkAligner {
    /// Create an empty aligner with no tracked sources.
    pub fn new() -> Self {
        Self {
            sources: HashMap::new(),
        }
    }

    /// Update (or register) the watermark for `source_id`.
    ///
    /// The stored value is overwritten unconditionally, so a source's recorded
    /// watermark can move backwards if a smaller value is reported.
    pub fn update(&mut self, source_id: &str, watermark_ms: i64) {
        // Existing sources (the common case) are updated in place so no new
        // `String` key is allocated per watermark; only new sources allocate.
        if let Some(slot) = self.sources.get_mut(source_id) {
            *slot = watermark_ms;
        } else {
            self.sources.insert(source_id.to_owned(), watermark_ms);
        }
    }

    /// The global watermark: the minimum across all tracked sources.
    ///
    /// Returns `i64::MIN` when no sources are tracked.
    pub fn global_watermark(&self) -> i64 {
        self.sources.values().copied().min().unwrap_or(i64::MIN)
    }

    /// Number of sources currently tracked.
    pub fn source_count(&self) -> usize {
        self.sources.len()
    }

    /// Return `true` iff *all* tracked sources have watermarks strictly
    /// greater than `timestamp_ms`.
    ///
    /// Returns `false` when no sources are tracked.
    pub fn all_beyond(&self, timestamp_ms: i64) -> bool {
        if self.sources.is_empty() {
            return false;
        }
        self.sources.values().all(|&wm| wm > timestamp_ms)
    }
}

impl Default for WatermarkAligner {
    fn default() -> Self {
        Self::new()
    }
}
172
173// ─── LateDataPolicy / LateDataDecision ────────────────────────────────────────
174
/// What the pipeline should do with events that arrive after the watermark.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LateDataPolicy {
    /// Discard the event.
    Drop,
    /// Re-assign the event to the nearest open window that can still accept it,
    /// provided the event is at most `max_lateness_ms` behind the watermark.
    Reassign { max_lateness_ms: i64 },
    /// Route the event to a named side-output channel for separate handling.
    SideOutput { channel: String },
}

/// Result returned by [`LateDataHandler::handle`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LateDataDecision {
    /// Process the event in the main stream.
    Process,
    /// Drop the event.
    Drop,
    /// Re-assign the event to the given timestamp (the current watermark).
    Reassign(i64),
    /// Route the event to a side-output channel.
    ///
    /// The channel name is not carried here; it is the `channel` of the
    /// handler's [`LateDataPolicy::SideOutput`] policy.
    SideOutput,
}

/// Applies the configured [`LateDataPolicy`] to late events.
#[derive(Debug, Clone)]
pub struct LateDataHandler {
    /// Policy applied to every late event.
    pub policy: LateDataPolicy,
    /// Number of late events seen so far (on-time events are not counted).
    pub late_event_count: u64,
}

impl LateDataHandler {
    /// Create a handler with the given policy.
    pub fn new(policy: LateDataPolicy) -> Self {
        Self {
            policy,
            late_event_count: 0,
        }
    }

    /// Decide what to do with an event at `event_ts_ms` given that the
    /// current watermark is `watermark_ms`.
    ///
    /// If `event_ts_ms >= watermark_ms` the event is not late and `Process` is
    /// returned. Otherwise `late_event_count` is incremented and the policy decides.
    pub fn handle(&mut self, event_ts_ms: i64, watermark_ms: i64) -> LateDataDecision {
        if event_ts_ms >= watermark_ms {
            return LateDataDecision::Process;
        }
        self.late_event_count += 1;
        match &self.policy {
            LateDataPolicy::Drop => LateDataDecision::Drop,
            LateDataPolicy::Reassign { max_lateness_ms } => {
                // `watermark_ms > event_ts_ms` here, yet the difference can still
                // exceed `i64::MAX` (watermark near MAX, event far negative). Plain
                // `-` would panic in debug or wrap negative in release and wrongly
                // accept the event; an overflow means the event is later than any
                // representable bound, so it is dropped.
                match watermark_ms.checked_sub(event_ts_ms) {
                    Some(lateness) if lateness <= *max_lateness_ms => {
                        LateDataDecision::Reassign(watermark_ms)
                    }
                    _ => LateDataDecision::Drop,
                }
            }
            LateDataPolicy::SideOutput { .. } => LateDataDecision::SideOutput,
        }
    }
}
238
239// ─── Tests ────────────────────────────────────────────────────────────────────
240
#[cfg(test)]
mod tests {
    use super::*;

    // NOTE: generator locals are named `generator`, not `gen`: `gen` is a reserved
    // keyword from the Rust 2024 edition onwards and would break this module on an
    // edition bump.

    // ── WatermarkGenerator ─────────────────────────────────────────────────────

    #[test]
    fn test_generator_initial_watermark_is_min() {
        let generator = WatermarkGenerator::new(500);
        assert_eq!(generator.current_watermark(), i64::MIN);
    }

    #[test]
    fn test_generator_no_watermark_before_threshold() {
        let mut generator = WatermarkGenerator::new(100).with_advance_threshold(10);
        for ts in 0..9 {
            let wm = generator.observe(ts * 100);
            assert!(wm.is_none(), "should not emit before threshold");
        }
    }

    #[test]
    fn test_generator_emits_at_threshold() {
        let mut generator = WatermarkGenerator::new(100).with_advance_threshold(5);
        // Observations 1..=4 stay below the threshold of 5 ...
        for i in 0..4 {
            assert!(generator.observe(i * 1000).is_none());
        }
        // ... and the 5th observation reaches it.
        let w = generator
            .observe(4000)
            .expect("watermark must be emitted at threshold");
        // max ts = 4000, watermark = 4000 - 100 = 3900
        assert_eq!(w.timestamp, 3900);
        assert_eq!(w.advance_count, 1);
        // The per-watermark counter restarts after emitting.
        assert!(generator.observe(5000).is_none());
    }

    #[test]
    fn test_generator_advance_count_increments() {
        let mut generator = WatermarkGenerator::new(0).with_advance_threshold(3);
        for i in 0..6 {
            generator.observe(i * 1000);
        }
        assert_eq!(generator.advance_count, 2);
    }

    #[test]
    fn test_generator_current_watermark_formula() {
        let mut generator = WatermarkGenerator::new(200).with_advance_threshold(100);
        generator.observe(5000);
        assert_eq!(generator.current_watermark(), 4800); // 5000 - 200
    }

    #[test]
    fn test_generator_is_late_true() {
        let mut generator = WatermarkGenerator::new(100).with_advance_threshold(100);
        generator.observe(10_000);
        // watermark = 9900; event at 9000 is late
        assert!(generator.is_late(9000));
    }

    #[test]
    fn test_generator_is_late_false() {
        let mut generator = WatermarkGenerator::new(100).with_advance_threshold(100);
        generator.observe(10_000);
        // watermark = 9900; event at 9900 is not late (== watermark)
        assert!(!generator.is_late(9900));
        // event at 10_000 is ahead of the watermark
        assert!(!generator.is_late(10_000));
    }

    #[test]
    fn test_generator_source_id() {
        let mut generator = WatermarkGenerator::new(0)
            .with_advance_threshold(1)
            .with_source_id("sensor-42");
        let wm = generator.observe(1000).expect("should emit");
        assert_eq!(wm.source_id, "sensor-42");
    }

    #[test]
    fn test_generator_max_ts_tracks_maximum() {
        let mut generator = WatermarkGenerator::new(0).with_advance_threshold(100);
        // Observe out-of-order events
        generator.observe(500);
        generator.observe(1000);
        generator.observe(300);
        assert_eq!(generator.current_max_ts, 1000);
        assert_eq!(generator.current_watermark(), 1000);
    }

    // ── WatermarkAligner ───────────────────────────────────────────────────────

    #[test]
    fn test_aligner_empty_returns_min() {
        let aligner = WatermarkAligner::new();
        assert_eq!(aligner.global_watermark(), i64::MIN);
        assert_eq!(aligner.source_count(), 0);
    }

    #[test]
    fn test_aligner_single_source() {
        let mut aligner = WatermarkAligner::new();
        aligner.update("src-A", 5000);
        assert_eq!(aligner.global_watermark(), 5000);
        assert_eq!(aligner.source_count(), 1);
    }

    #[test]
    fn test_aligner_global_is_minimum() {
        let mut aligner = WatermarkAligner::new();
        aligner.update("src-A", 5000);
        aligner.update("src-B", 3000);
        aligner.update("src-C", 7000);
        assert_eq!(aligner.global_watermark(), 3000);
    }

    #[test]
    fn test_aligner_all_beyond_true() {
        let mut aligner = WatermarkAligner::new();
        aligner.update("A", 10_000);
        aligner.update("B", 12_000);
        assert!(aligner.all_beyond(9_999));
    }

    #[test]
    fn test_aligner_all_beyond_false_one_lagging() {
        let mut aligner = WatermarkAligner::new();
        aligner.update("A", 10_000);
        aligner.update("B", 5_000);
        assert!(!aligner.all_beyond(6_000));
    }

    #[test]
    fn test_aligner_all_beyond_empty_is_false() {
        let aligner = WatermarkAligner::new();
        assert!(!aligner.all_beyond(0));
    }

    #[test]
    fn test_aligner_update_overwrites() {
        let mut aligner = WatermarkAligner::new();
        aligner.update("src", 1000);
        aligner.update("src", 9000);
        assert_eq!(aligner.global_watermark(), 9000);
        assert_eq!(aligner.source_count(), 1);
    }

    // ── LateDataHandler ────────────────────────────────────────────────────────

    #[test]
    fn test_late_handler_on_time_event_is_process() {
        let mut handler = LateDataHandler::new(LateDataPolicy::Drop);
        let decision = handler.handle(5000, 4000);
        assert_eq!(decision, LateDataDecision::Process);
        assert_eq!(handler.late_event_count, 0);
    }

    #[test]
    fn test_late_handler_drop_policy() {
        let mut handler = LateDataHandler::new(LateDataPolicy::Drop);
        let decision = handler.handle(1000, 5000);
        assert_eq!(decision, LateDataDecision::Drop);
        assert_eq!(handler.late_event_count, 1);
    }

    #[test]
    fn test_late_handler_reassign_within_max_lateness() {
        let mut handler = LateDataHandler::new(LateDataPolicy::Reassign {
            max_lateness_ms: 2000,
        });
        // event at 4000, watermark at 5000 → lateness = 1000 ≤ 2000 → Reassign
        let decision = handler.handle(4000, 5000);
        assert_eq!(decision, LateDataDecision::Reassign(5000));
    }

    #[test]
    fn test_late_handler_reassign_exceeds_max_lateness_drops() {
        let mut handler = LateDataHandler::new(LateDataPolicy::Reassign {
            max_lateness_ms: 500,
        });
        // event at 3000, watermark at 5000 → lateness = 2000 > 500 → Drop
        let decision = handler.handle(3000, 5000);
        assert_eq!(decision, LateDataDecision::Drop);
        assert_eq!(handler.late_event_count, 1);
    }

    #[test]
    fn test_late_handler_side_output_policy() {
        let mut handler = LateDataHandler::new(LateDataPolicy::SideOutput {
            channel: "late-events".to_string(),
        });
        let decision = handler.handle(1000, 5000);
        assert_eq!(decision, LateDataDecision::SideOutput);
        assert_eq!(handler.late_event_count, 1);
    }

    #[test]
    fn test_late_handler_counts_accumulate() {
        let mut handler = LateDataHandler::new(LateDataPolicy::Drop);
        handler.handle(100, 1000);
        handler.handle(200, 1000);
        handler.handle(5000, 1000); // on-time: watermark = 1000, event at 5000
        assert_eq!(handler.late_event_count, 2);
    }

    #[test]
    fn test_stream_watermark_equality() {
        let w1 = StreamWatermark::new(1000, "s1", 1);
        let w2 = StreamWatermark::new(1000, "s1", 1);
        let w3 = StreamWatermark::new(2000, "s1", 1);
        assert_eq!(w1, w2);
        assert_ne!(w1, w3);
    }
}