rust_rule_engine/streaming/
watermark.rs

//! Watermark and Late Data Handling
//!
//! This module provides watermark generation and late-data handling for
//! stream processing of out-of-order events.
5
6use super::event::StreamEvent;
7use std::collections::VecDeque;
8use std::time::{Duration, SystemTime, UNIX_EPOCH};
9
/// A watermark: a marker stating that event time has progressed to `timestamp`.
///
/// An event whose timestamp is strictly below a watermark's `timestamp` is
/// considered late relative to it (see [`Watermark::is_late`]).
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub struct Watermark {
    /// Timestamp in milliseconds since the UNIX epoch.
    pub timestamp: u64,
}

impl Watermark {
    /// Creates a watermark at `timestamp` (milliseconds since the UNIX epoch).
    pub fn new(timestamp: u64) -> Self {
        Self { timestamp }
    }

    /// Creates a watermark from a wall-clock [`SystemTime`].
    ///
    /// Times before the UNIX epoch map to `0`. Times too far in the future for
    /// their millisecond count to fit in a `u64` saturate to `u64::MAX` instead
    /// of silently wrapping around.
    pub fn from_system_time(time: SystemTime) -> Self {
        let millis = time
            .duration_since(UNIX_EPOCH)
            .unwrap_or(Duration::ZERO)
            .as_millis();
        Self {
            // `as_millis` yields u128; a plain `as u64` would truncate.
            timestamp: u64::try_from(millis).unwrap_or(u64::MAX),
        }
    }

    /// Creates a watermark at the current wall-clock time.
    pub fn now() -> Self {
        Self::from_system_time(SystemTime::now())
    }

    /// Returns `true` if this watermark is strictly earlier than `other`.
    pub fn is_before(&self, other: &Watermark) -> bool {
        self.timestamp < other.timestamp
    }

    /// Returns `true` if an event with `event_time` (ms since epoch) is late,
    /// i.e. strictly earlier than this watermark.
    pub fn is_late(&self, event_time: u64) -> bool {
        event_time < self.timestamp
    }
}
47
/// Strategy a [`WatermarkGenerator`] uses to derive watermarks from observed events.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WatermarkStrategy {
    /// Emit a watermark equal to the maximum observed event timestamp, at most
    /// once per `interval` of processing time.
    ///
    /// The interval is only checked when an event is processed, so nothing is
    /// emitted while the stream is idle.
    Periodic {
        /// Minimum processing-time gap between two emission attempts.
        interval: Duration,
    },

    /// Bounded out-of-orderness: watermark = `max_timestamp - max_delay`,
    /// saturating at zero.
    BoundedOutOfOrder {
        /// Maximum expected delay of out-of-order events.
        max_delay: Duration,
    },

    /// The watermark tracks the maximum observed timestamp exactly, so any
    /// out-of-order event is treated as late.
    MonotonicAscending,

    /// Placeholder for user-defined generation.
    ///
    /// This variant carries no callback; the built-in generator never emits a
    /// watermark for it, so the watermark stays at its initial value.
    Custom,
}
69
70/// Watermark generator that tracks event-time progress
71#[allow(dead_code)]
72pub struct WatermarkGenerator {
73    /// Current watermark
74    current_watermark: Watermark,
75
76    /// Strategy for generating watermarks
77    strategy: WatermarkStrategy,
78
79    /// Maximum observed event timestamp
80    max_timestamp: u64,
81
82    /// Last watermark emission time (processing time)
83    last_emission: SystemTime,
84
85    /// Pending events waiting for watermark advancement
86    _pending_events: VecDeque<StreamEvent>,
87}
88
89impl WatermarkGenerator {
90    /// Create a new watermark generator with the given strategy
91    pub fn new(strategy: WatermarkStrategy) -> Self {
92        Self {
93            current_watermark: Watermark::new(0),
94            strategy,
95            max_timestamp: 0,
96            last_emission: SystemTime::now(),
97            _pending_events: VecDeque::new(),
98        }
99    }
100
101    /// Process an event and update watermark if needed
102    pub fn process_event(&mut self, event: &StreamEvent) -> Option<Watermark> {
103        let event_time = event.metadata.timestamp;
104
105        // Track maximum timestamp
106        if event_time > self.max_timestamp {
107            self.max_timestamp = event_time;
108        }
109
110        // Generate watermark based on strategy
111        self.maybe_generate_watermark()
112    }
113
114    /// Generate watermark based on strategy
115    fn maybe_generate_watermark(&mut self) -> Option<Watermark> {
116        let new_watermark = match &self.strategy {
117            WatermarkStrategy::Periodic { interval } => {
118                let now = SystemTime::now();
119                let elapsed = now.duration_since(self.last_emission).ok()?;
120
121                if elapsed >= *interval {
122                    self.last_emission = now;
123                    Some(Watermark::new(self.max_timestamp))
124                } else {
125                    None
126                }
127            }
128
129            WatermarkStrategy::BoundedOutOfOrder { max_delay } => {
130                let delay_ms = max_delay.as_millis() as u64;
131                let new_ts = self.max_timestamp.saturating_sub(delay_ms);
132
133                if new_ts > self.current_watermark.timestamp {
134                    Some(Watermark::new(new_ts))
135                } else {
136                    None
137                }
138            }
139
140            WatermarkStrategy::MonotonicAscending => {
141                if self.max_timestamp > self.current_watermark.timestamp {
142                    Some(Watermark::new(self.max_timestamp))
143                } else {
144                    None
145                }
146            }
147
148            WatermarkStrategy::Custom => {
149                // Custom logic can be implemented by subclassing
150                None
151            }
152        };
153
154        if let Some(wm) = new_watermark {
155            if wm > self.current_watermark {
156                self.current_watermark = wm;
157                return Some(wm);
158            }
159        }
160
161        None
162    }
163
164    /// Get the current watermark
165    pub fn current_watermark(&self) -> Watermark {
166        self.current_watermark
167    }
168
169    /// Check if an event is late
170    pub fn is_late(&self, event: &StreamEvent) -> bool {
171        self.current_watermark.is_late(event.metadata.timestamp)
172    }
173}
174
/// Policy applied by a [`LateDataHandler`] to events that arrive behind the watermark.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LateDataStrategy {
    /// Discard every late event.
    Drop,

    /// Keep late events whose lateness (watermark minus event time) is at most
    /// `max_lateness`; discard the rest.
    AllowedLateness {
        /// Maximum lateness that is still accepted (inclusive bound).
        max_lateness: Duration,
    },

    /// Buffer late events in a side output for separate processing.
    SideOutput,

    /// Accept late events and signal that affected windows must be recomputed.
    RecomputeWindows,
}
193
194/// Handler for late data events
195pub struct LateDataHandler {
196    /// Strategy for handling late data
197    strategy: LateDataStrategy,
198
199    /// Side output for late events
200    side_output: Vec<StreamEvent>,
201
202    /// Statistics about late events
203    late_count: usize,
204    dropped_count: usize,
205    allowed_count: usize,
206}
207
208impl LateDataHandler {
209    /// Create a new late data handler with the given strategy
210    pub fn new(strategy: LateDataStrategy) -> Self {
211        Self {
212            strategy,
213            side_output: Vec::new(),
214            late_count: 0,
215            dropped_count: 0,
216            allowed_count: 0,
217        }
218    }
219
220    /// Handle a late event according to the strategy
221    pub fn handle_late_event(
222        &mut self,
223        event: StreamEvent,
224        watermark: &Watermark,
225    ) -> LateEventDecision {
226        self.late_count += 1;
227
228        let lateness = watermark.timestamp.saturating_sub(event.metadata.timestamp);
229
230        match &self.strategy {
231            LateDataStrategy::Drop => {
232                self.dropped_count += 1;
233                LateEventDecision::Drop
234            }
235
236            LateDataStrategy::AllowedLateness { max_lateness } => {
237                let max_lateness_ms = max_lateness.as_millis() as u64;
238
239                if lateness <= max_lateness_ms {
240                    self.allowed_count += 1;
241                    LateEventDecision::Process(event)
242                } else {
243                    self.dropped_count += 1;
244                    LateEventDecision::Drop
245                }
246            }
247
248            LateDataStrategy::SideOutput => {
249                self.side_output.push(event.clone());
250                LateEventDecision::SideOutput(event)
251            }
252
253            LateDataStrategy::RecomputeWindows => {
254                self.allowed_count += 1;
255                LateEventDecision::Recompute(event)
256            }
257        }
258    }
259
260    /// Get the side output events
261    pub fn side_output(&self) -> &[StreamEvent] {
262        &self.side_output
263    }
264
265    /// Clear the side output
266    pub fn clear_side_output(&mut self) {
267        self.side_output.clear();
268    }
269
270    /// Get statistics about late events
271    pub fn stats(&self) -> LateDataStats {
272        LateDataStats {
273            total_late: self.late_count,
274            dropped: self.dropped_count,
275            allowed: self.allowed_count,
276            side_output: self.side_output.len(),
277        }
278    }
279}
280
281/// Decision for how to handle a late event
282#[derive(Debug, Clone)]
283pub enum LateEventDecision {
284    /// Drop the event
285    Drop,
286
287    /// Process the event normally
288    Process(StreamEvent),
289
290    /// Route to side output
291    SideOutput(StreamEvent),
292
293    /// Recompute affected windows
294    Recompute(StreamEvent),
295}
296
/// Snapshot of late-data counters reported by a [`LateDataHandler`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
pub struct LateDataStats {
    /// Total number of late events handled.
    pub total_late: usize,

    /// Late events that were discarded.
    pub dropped: usize,

    /// Late events accepted for processing or window recomputation.
    pub allowed: usize,

    /// Events currently held in the side-output buffer.
    pub side_output: usize,
}
312
313/// Watermark-aware stream that tracks event-time progress
314pub struct WatermarkedStream {
315    /// Events in the stream
316    events: Vec<StreamEvent>,
317
318    /// Watermark generator
319    watermark_gen: WatermarkGenerator,
320
321    /// Late data handler
322    late_handler: LateDataHandler,
323
324    /// Watermark history for debugging
325    watermark_history: Vec<Watermark>,
326}
327
328impl WatermarkedStream {
329    /// Create a new watermarked stream
330    pub fn new(watermark_strategy: WatermarkStrategy, late_strategy: LateDataStrategy) -> Self {
331        Self {
332            events: Vec::new(),
333            watermark_gen: WatermarkGenerator::new(watermark_strategy),
334            late_handler: LateDataHandler::new(late_strategy),
335            watermark_history: Vec::new(),
336        }
337    }
338
339    /// Add an event to the stream, checking for lateness
340    pub fn add_event(&mut self, event: StreamEvent) -> Result<(), String> {
341        // Check if event is late
342        if self.watermark_gen.is_late(&event) {
343            // Handle late event
344            match self
345                .late_handler
346                .handle_late_event(event, &self.watermark_gen.current_watermark())
347            {
348                LateEventDecision::Drop => {
349                    // Event dropped, do nothing
350                }
351                LateEventDecision::Process(e) => {
352                    self.events.push(e);
353                }
354                LateEventDecision::SideOutput(_) => {
355                    // Event stored in side output
356                }
357                LateEventDecision::Recompute(e) => {
358                    self.events.push(e);
359                }
360            }
361        } else {
362            // Event is on-time
363            self.events.push(event.clone());
364
365            // Update watermark
366            if let Some(new_watermark) = self.watermark_gen.process_event(&event) {
367                self.watermark_history.push(new_watermark);
368            }
369        }
370
371        Ok(())
372    }
373
374    /// Get all events
375    pub fn events(&self) -> &[StreamEvent] {
376        &self.events
377    }
378
379    /// Get current watermark
380    pub fn current_watermark(&self) -> Watermark {
381        self.watermark_gen.current_watermark()
382    }
383
384    /// Get late data statistics
385    pub fn late_stats(&self) -> LateDataStats {
386        self.late_handler.stats()
387    }
388
389    /// Get side output events
390    pub fn side_output(&self) -> &[StreamEvent] {
391        self.late_handler.side_output()
392    }
393
394    /// Get watermark history
395    pub fn watermark_history(&self) -> &[Watermark] {
396        &self.watermark_history
397    }
398}
399
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Value;
    use std::collections::HashMap;

    /// Builds a `TestEvent` carrying `value`, with its event time forced to `timestamp`.
    fn create_event(timestamp: u64, value: i64) -> StreamEvent {
        let mut data = HashMap::new();
        data.insert("value".to_string(), Value::Integer(value));
        let event = StreamEvent::new("TestEvent", data, "test");

        // Override whatever timestamp `StreamEvent::new` assigned, for determinism.
        StreamEvent {
            metadata: super::super::event::EventMetadata {
                timestamp,
                ..event.metadata
            },
            ..event
        }
    }

    #[test]
    fn test_watermark_ordering() {
        let early = Watermark::new(1000);
        let late = Watermark::new(2000);

        assert!(early.is_before(&late));
        assert!(!late.is_before(&early));
        assert!(early < late);
    }

    #[test]
    fn test_monotonic_watermark() {
        let mut generator = WatermarkGenerator::new(WatermarkStrategy::MonotonicAscending);

        generator.process_event(&create_event(1000, 1));
        assert_eq!(generator.current_watermark().timestamp, 1000);

        generator.process_event(&create_event(2000, 2));
        assert_eq!(generator.current_watermark().timestamp, 2000);

        // Out-of-order event: the watermark must not move backwards.
        generator.process_event(&create_event(1500, 3));
        assert_eq!(generator.current_watermark().timestamp, 2000);
    }

    #[test]
    fn test_bounded_out_of_order() {
        let mut generator = WatermarkGenerator::new(WatermarkStrategy::BoundedOutOfOrder {
            max_delay: Duration::from_millis(500),
        });

        generator.process_event(&create_event(2000, 1));

        // max_timestamp - max_delay = 2000 - 500
        assert_eq!(generator.current_watermark().timestamp, 1500);
    }

    #[test]
    fn test_periodic_zero_interval() {
        let mut generator = WatermarkGenerator::new(WatermarkStrategy::Periodic {
            interval: Duration::ZERO,
        });

        assert_eq!(
            generator.process_event(&create_event(1000, 1)),
            Some(Watermark::new(1000))
        );
        // A smaller timestamp cannot produce a new (smaller) watermark.
        assert_eq!(generator.process_event(&create_event(500, 2)), None);
        assert_eq!(generator.current_watermark().timestamp, 1000);
    }

    #[test]
    fn test_custom_strategy_never_emits() {
        let mut generator = WatermarkGenerator::new(WatermarkStrategy::Custom);

        assert_eq!(generator.process_event(&create_event(1000, 1)), None);
        assert_eq!(generator.current_watermark().timestamp, 0);
    }

    #[test]
    fn test_late_data_drop() {
        let mut handler = LateDataHandler::new(LateDataStrategy::Drop);
        let watermark = Watermark::new(2000);

        // 1000ms late
        let decision = handler.handle_late_event(create_event(1000, 1), &watermark);
        assert!(matches!(decision, LateEventDecision::Drop), "Expected Drop decision");

        let stats = handler.stats();
        assert_eq!(stats.total_late, 1);
        assert_eq!(stats.dropped, 1);
    }

    #[test]
    fn test_late_data_allowed_lateness() {
        let mut handler = LateDataHandler::new(LateDataStrategy::AllowedLateness {
            max_lateness: Duration::from_millis(500),
        });
        let watermark = Watermark::new(2000);

        // 400ms late: within the allowed lateness.
        let decision = handler.handle_late_event(create_event(1600, 1), &watermark);
        assert!(matches!(decision, LateEventDecision::Process(_)), "Expected Process decision");
        assert_eq!(handler.stats().allowed, 1);

        // Exactly 500ms late: the bound is inclusive.
        let decision = handler.handle_late_event(create_event(1500, 2), &watermark);
        assert!(matches!(decision, LateEventDecision::Process(_)), "Expected Process decision");
        assert_eq!(handler.stats().allowed, 2);

        // 600ms late: beyond the allowed lateness.
        let decision = handler.handle_late_event(create_event(1400, 3), &watermark);
        assert!(matches!(decision, LateEventDecision::Drop), "Expected Drop decision");
        assert_eq!(handler.stats().dropped, 1);
        assert_eq!(handler.stats().total_late, 3);
    }

    #[test]
    fn test_late_data_side_output() {
        let mut handler = LateDataHandler::new(LateDataStrategy::SideOutput);
        let watermark = Watermark::new(2000);

        match handler.handle_late_event(create_event(1000, 1), &watermark) {
            LateEventDecision::SideOutput(e) => assert_eq!(e.metadata.timestamp, 1000),
            other => panic!("Expected SideOutput decision, got {:?}", other),
        }

        let stats = handler.stats();
        assert_eq!(stats.total_late, 1);
        assert_eq!(stats.side_output, 1);
        assert_eq!(stats.dropped, 0);
        assert_eq!(stats.allowed, 0);
        assert_eq!(handler.side_output().len(), 1);

        handler.clear_side_output();
        assert!(handler.side_output().is_empty());
        assert_eq!(handler.stats().side_output, 0);
        assert_eq!(handler.stats().total_late, 1);
    }

    #[test]
    fn test_late_data_recompute() {
        let mut handler = LateDataHandler::new(LateDataStrategy::RecomputeWindows);
        let watermark = Watermark::new(2000);

        let decision = handler.handle_late_event(create_event(1000, 1), &watermark);
        assert!(matches!(decision, LateEventDecision::Recompute(_)), "Expected Recompute decision");
        assert_eq!(handler.stats().allowed, 1);
        assert_eq!(handler.stats().total_late, 1);
    }

    #[test]
    fn test_watermarked_stream() {
        let mut stream = WatermarkedStream::new(
            WatermarkStrategy::BoundedOutOfOrder {
                max_delay: Duration::from_millis(500),
            },
            LateDataStrategy::Drop,
        );

        // In-order events.
        stream.add_event(create_event(1000, 1)).unwrap();
        stream.add_event(create_event(2000, 2)).unwrap();

        // 2000 - 500
        assert_eq!(stream.current_watermark().timestamp, 1500);

        // Behind the watermark: dropped.
        stream.add_event(create_event(1200, 3)).unwrap();

        let stats = stream.late_stats();
        assert_eq!(stats.total_late, 1);
        assert_eq!(stats.dropped, 1);

        // Only the two on-time events are kept.
        assert_eq!(stream.events().len(), 2);
    }

    #[test]
    fn test_watermarked_stream_side_output_and_history() {
        let mut stream = WatermarkedStream::new(
            WatermarkStrategy::BoundedOutOfOrder {
                max_delay: Duration::from_millis(500),
            },
            LateDataStrategy::SideOutput,
        );

        stream.add_event(create_event(1000, 1)).unwrap();
        stream.add_event(create_event(2000, 2)).unwrap();
        // Behind watermark 1500: routed to the side output, not stored.
        stream.add_event(create_event(1200, 3)).unwrap();

        assert_eq!(stream.events().len(), 2);
        assert_eq!(stream.side_output().len(), 1);
        assert_eq!(stream.side_output()[0].metadata.timestamp, 1200);
        assert_eq!(stream.late_stats().side_output, 1);
        assert_eq!(
            stream.watermark_history(),
            &[Watermark::new(500), Watermark::new(1500)][..]
        );
    }
}
535}