rust_rule_engine/streaming/
watermark.rs

//! Watermark and Late Data Handling
//!
//! This module provides watermark generation and late-data handling for
//! event-time stream processing with out-of-order events.
5
6use crate::types::Value;
7use super::event::StreamEvent;
8use std::collections::{BTreeMap, VecDeque};
9use std::time::{Duration, SystemTime, UNIX_EPOCH};
10
/// Watermark representing event-time progress.
///
/// An event whose timestamp is strictly smaller than the watermark's
/// timestamp is considered late (see [`Watermark::is_late`]). Watermarks are
/// ordered by their timestamp.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct Watermark {
    /// Timestamp in milliseconds since UNIX epoch
    pub timestamp: u64,
}

impl Watermark {
    /// Create a new watermark with the given timestamp (milliseconds since UNIX epoch).
    pub fn new(timestamp: u64) -> Self {
        Self { timestamp }
    }

    /// Create a watermark from a system time.
    ///
    /// Times before the UNIX epoch clamp to `0`. Times too far in the future
    /// to be represented as a `u64` millisecond count clamp to `u64::MAX`
    /// instead of silently wrapping around to a small value.
    pub fn from_system_time(time: SystemTime) -> Self {
        let millis = time
            .duration_since(UNIX_EPOCH)
            .unwrap_or(Duration::ZERO)
            .as_millis();
        // `as_millis` yields a u128; clamp first so the narrowing cast is lossless.
        let timestamp = millis.min(u128::from(u64::MAX)) as u64;
        Self { timestamp }
    }

    /// Get a watermark for the current system time.
    pub fn now() -> Self {
        Self::from_system_time(SystemTime::now())
    }

    /// Check if this watermark is strictly before another.
    pub fn is_before(&self, other: &Watermark) -> bool {
        self.timestamp < other.timestamp
    }

    /// Check if an event with `event_time` (ms since epoch) is late, i.e.
    /// strictly before this watermark. An event exactly at the watermark is
    /// not late.
    pub fn is_late(&self, event_time: u64) -> bool {
        event_time < self.timestamp
    }
}
48
/// Policy a [`WatermarkGenerator`] follows to decide when, and to which
/// value, the watermark advances.
#[derive(Clone, Debug)]
pub enum WatermarkStrategy {
    /// On each processed event, if at least `interval` of processing time has
    /// passed since the previous emission, advance the watermark to the
    /// largest event timestamp observed so far.
    Periodic {
        /// Minimum processing time between two emissions
        interval: Duration,
    },

    /// Trail the largest observed event timestamp by a fixed bound:
    /// `watermark = max_timestamp - max_delay`, saturating at zero.
    BoundedOutOfOrder {
        /// Largest out-of-orderness that is tolerated
        max_delay: Duration,
    },

    /// Watermark equals the largest observed event timestamp; events are
    /// expected in ascending order, so there is no out-of-order tolerance.
    MonotonicAscending,

    /// Placeholder for user-defined generation. The built-in generator never
    /// advances the watermark for this variant.
    Custom,
}
70
71/// Watermark generator that tracks event-time progress
72pub struct WatermarkGenerator {
73    /// Current watermark
74    current_watermark: Watermark,
75    
76    /// Strategy for generating watermarks
77    strategy: WatermarkStrategy,
78    
79    /// Maximum observed event timestamp
80    max_timestamp: u64,
81    
82    /// Last watermark emission time (processing time)
83    last_emission: SystemTime,
84    
85    /// Pending events waiting for watermark advancement
86    pending_events: VecDeque<StreamEvent>,
87}
88
89impl WatermarkGenerator {
90    /// Create a new watermark generator with the given strategy
91    pub fn new(strategy: WatermarkStrategy) -> Self {
92        Self {
93            current_watermark: Watermark::new(0),
94            strategy,
95            max_timestamp: 0,
96            last_emission: SystemTime::now(),
97            pending_events: VecDeque::new(),
98        }
99    }
100    
101    /// Process an event and update watermark if needed
102    pub fn process_event(&mut self, event: &StreamEvent) -> Option<Watermark> {
103        let event_time = event.metadata.timestamp;
104        
105        // Track maximum timestamp
106        if event_time > self.max_timestamp {
107            self.max_timestamp = event_time;
108        }
109        
110        // Generate watermark based on strategy
111        self.maybe_generate_watermark()
112    }
113    
114    /// Generate watermark based on strategy
115    fn maybe_generate_watermark(&mut self) -> Option<Watermark> {
116        let new_watermark = match &self.strategy {
117            WatermarkStrategy::Periodic { interval } => {
118                let now = SystemTime::now();
119                let elapsed = now.duration_since(self.last_emission).ok()?;
120                
121                if elapsed >= *interval {
122                    self.last_emission = now;
123                    Some(Watermark::new(self.max_timestamp))
124                } else {
125                    None
126                }
127            }
128            
129            WatermarkStrategy::BoundedOutOfOrder { max_delay } => {
130                let delay_ms = max_delay.as_millis() as u64;
131                let new_ts = self.max_timestamp.saturating_sub(delay_ms);
132                
133                if new_ts > self.current_watermark.timestamp {
134                    Some(Watermark::new(new_ts))
135                } else {
136                    None
137                }
138            }
139            
140            WatermarkStrategy::MonotonicAscending => {
141                if self.max_timestamp > self.current_watermark.timestamp {
142                    Some(Watermark::new(self.max_timestamp))
143                } else {
144                    None
145                }
146            }
147            
148            WatermarkStrategy::Custom => {
149                // Custom logic can be implemented by subclassing
150                None
151            }
152        };
153        
154        if let Some(wm) = new_watermark {
155            if wm > self.current_watermark {
156                self.current_watermark = wm;
157                return Some(wm);
158            }
159        }
160        
161        None
162    }
163    
164    /// Get the current watermark
165    pub fn current_watermark(&self) -> Watermark {
166        self.current_watermark
167    }
168    
169    /// Check if an event is late
170    pub fn is_late(&self, event: &StreamEvent) -> bool {
171        self.current_watermark.is_late(event.metadata.timestamp)
172    }
173}
174
/// Policy applied by a late-data handler to events that arrive behind the
/// current watermark.
#[derive(Clone, Debug)]
pub enum LateDataStrategy {
    /// Discard every late event.
    Drop,

    /// Keep late events whose lateness (watermark minus event time) does not
    /// exceed `max_lateness`; discard the rest.
    AllowedLateness {
        /// Largest lateness that is still accepted
        max_lateness: Duration,
    },

    /// Divert late events into a side-output buffer for separate handling.
    SideOutput,

    /// Accept late events and signal that the windows they affect must be
    /// recomputed.
    RecomputeWindows,
}
193
194/// Handler for late data events
195pub struct LateDataHandler {
196    /// Strategy for handling late data
197    strategy: LateDataStrategy,
198    
199    /// Side output for late events
200    side_output: Vec<StreamEvent>,
201    
202    /// Statistics about late events
203    late_count: usize,
204    dropped_count: usize,
205    allowed_count: usize,
206}
207
208impl LateDataHandler {
209    /// Create a new late data handler with the given strategy
210    pub fn new(strategy: LateDataStrategy) -> Self {
211        Self {
212            strategy,
213            side_output: Vec::new(),
214            late_count: 0,
215            dropped_count: 0,
216            allowed_count: 0,
217        }
218    }
219    
220    /// Handle a late event according to the strategy
221    pub fn handle_late_event(
222        &mut self,
223        event: StreamEvent,
224        watermark: &Watermark,
225    ) -> LateEventDecision {
226        self.late_count += 1;
227        
228        let lateness = watermark.timestamp.saturating_sub(event.metadata.timestamp);
229        
230        match &self.strategy {
231            LateDataStrategy::Drop => {
232                self.dropped_count += 1;
233                LateEventDecision::Drop
234            }
235            
236            LateDataStrategy::AllowedLateness { max_lateness } => {
237                let max_lateness_ms = max_lateness.as_millis() as u64;
238                
239                if lateness <= max_lateness_ms {
240                    self.allowed_count += 1;
241                    LateEventDecision::Process(event)
242                } else {
243                    self.dropped_count += 1;
244                    LateEventDecision::Drop
245                }
246            }
247            
248            LateDataStrategy::SideOutput => {
249                self.side_output.push(event.clone());
250                LateEventDecision::SideOutput(event)
251            }
252            
253            LateDataStrategy::RecomputeWindows => {
254                self.allowed_count += 1;
255                LateEventDecision::Recompute(event)
256            }
257        }
258    }
259    
260    /// Get the side output events
261    pub fn side_output(&self) -> &[StreamEvent] {
262        &self.side_output
263    }
264    
265    /// Clear the side output
266    pub fn clear_side_output(&mut self) {
267        self.side_output.clear();
268    }
269    
270    /// Get statistics about late events
271    pub fn stats(&self) -> LateDataStats {
272        LateDataStats {
273            total_late: self.late_count,
274            dropped: self.dropped_count,
275            allowed: self.allowed_count,
276            side_output: self.side_output.len(),
277        }
278    }
279}
280
281/// Decision for how to handle a late event
282#[derive(Debug, Clone)]
283pub enum LateEventDecision {
284    /// Drop the event
285    Drop,
286    
287    /// Process the event normally
288    Process(StreamEvent),
289    
290    /// Route to side output
291    SideOutput(StreamEvent),
292    
293    /// Recompute affected windows
294    Recompute(StreamEvent),
295}
296
/// Snapshot of the counters kept while handling late data.
#[derive(Clone, Copy, Debug)]
pub struct LateDataStats {
    /// How many events were classified as late in total
    pub total_late: usize,

    /// How many late events were discarded
    pub dropped: usize,

    /// How many late events were let through (processed or sent for recomputation)
    pub allowed: usize,

    /// How many events currently sit in the side-output buffer
    pub side_output: usize,
}
312
313/// Watermark-aware stream that tracks event-time progress
314pub struct WatermarkedStream {
315    /// Events in the stream
316    events: Vec<StreamEvent>,
317    
318    /// Watermark generator
319    watermark_gen: WatermarkGenerator,
320    
321    /// Late data handler
322    late_handler: LateDataHandler,
323    
324    /// Watermark history for debugging
325    watermark_history: Vec<Watermark>,
326}
327
328impl WatermarkedStream {
329    /// Create a new watermarked stream
330    pub fn new(
331        watermark_strategy: WatermarkStrategy,
332        late_strategy: LateDataStrategy,
333    ) -> Self {
334        Self {
335            events: Vec::new(),
336            watermark_gen: WatermarkGenerator::new(watermark_strategy),
337            late_handler: LateDataHandler::new(late_strategy),
338            watermark_history: Vec::new(),
339        }
340    }
341    
342    /// Add an event to the stream, checking for lateness
343    pub fn add_event(&mut self, event: StreamEvent) -> Result<(), String> {
344        // Check if event is late
345        if self.watermark_gen.is_late(&event) {
346            // Handle late event
347            match self.late_handler.handle_late_event(event, &self.watermark_gen.current_watermark()) {
348                LateEventDecision::Drop => {
349                    // Event dropped, do nothing
350                }
351                LateEventDecision::Process(e) => {
352                    self.events.push(e);
353                }
354                LateEventDecision::SideOutput(_) => {
355                    // Event stored in side output
356                }
357                LateEventDecision::Recompute(e) => {
358                    self.events.push(e);
359                }
360            }
361        } else {
362            // Event is on-time
363            self.events.push(event.clone());
364            
365            // Update watermark
366            if let Some(new_watermark) = self.watermark_gen.process_event(&event) {
367                self.watermark_history.push(new_watermark);
368            }
369        }
370        
371        Ok(())
372    }
373    
374    /// Get all events
375    pub fn events(&self) -> &[StreamEvent] {
376        &self.events
377    }
378    
379    /// Get current watermark
380    pub fn current_watermark(&self) -> Watermark {
381        self.watermark_gen.current_watermark()
382    }
383    
384    /// Get late data statistics
385    pub fn late_stats(&self) -> LateDataStats {
386        self.late_handler.stats()
387    }
388    
389    /// Get side output events
390    pub fn side_output(&self) -> &[StreamEvent] {
391        self.late_handler.side_output()
392    }
393    
394    /// Get watermark history
395    pub fn watermark_history(&self) -> &[Watermark] {
396        &self.watermark_history
397    }
398}
399
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Build a `TestEvent` carrying `value` whose metadata timestamp is forced
    /// to `timestamp`.
    fn create_event(timestamp: u64, value: i64) -> StreamEvent {
        let mut data = HashMap::new();
        data.insert("value".to_string(), Value::Integer(value));
        let event = StreamEvent::new("TestEvent", data, "test");

        // Manually set timestamp
        StreamEvent {
            metadata: super::super::event::EventMetadata {
                timestamp,
                ..event.metadata
            },
            ..event
        }
    }

    #[test]
    fn test_watermark_ordering() {
        let wm1 = Watermark::new(1000);
        let wm2 = Watermark::new(2000);

        assert!(wm1.is_before(&wm2));
        assert!(!wm2.is_before(&wm1));
        assert!(wm1 < wm2);
    }

    #[test]
    fn test_watermark_is_late_boundary() {
        let wm = Watermark::new(1000);
        assert!(wm.is_late(999));
        // An event exactly at the watermark is on time.
        assert!(!wm.is_late(1000));
    }

    #[test]
    fn test_monotonic_watermark() {
        let mut gen = WatermarkGenerator::new(WatermarkStrategy::MonotonicAscending);

        let e1 = create_event(1000, 1);
        let e2 = create_event(2000, 2);
        let e3 = create_event(1500, 3); // Out of order

        gen.process_event(&e1);
        assert_eq!(gen.current_watermark().timestamp, 1000);

        gen.process_event(&e2);
        assert_eq!(gen.current_watermark().timestamp, 2000);

        gen.process_event(&e3);
        // Watermark stays at 2000 (monotonic)
        assert_eq!(gen.current_watermark().timestamp, 2000);
    }

    #[test]
    fn test_bounded_out_of_order() {
        let strategy = WatermarkStrategy::BoundedOutOfOrder {
            max_delay: Duration::from_millis(500),
        };
        let mut gen = WatermarkGenerator::new(strategy);

        let e1 = create_event(2000, 1);
        gen.process_event(&e1);

        // Watermark should be max_timestamp - max_delay = 2000 - 500 = 1500
        assert_eq!(gen.current_watermark().timestamp, 1500);
    }

    #[test]
    fn test_late_data_drop() {
        let mut handler = LateDataHandler::new(LateDataStrategy::Drop);
        let watermark = Watermark::new(2000);

        let late_event = create_event(1000, 1); // 1000ms late

        match handler.handle_late_event(late_event, &watermark) {
            LateEventDecision::Drop => {
                let stats = handler.stats();
                assert_eq!(stats.total_late, 1);
                assert_eq!(stats.dropped, 1);
            }
            _ => panic!("Expected Drop decision"),
        }
    }

    #[test]
    fn test_late_data_allowed_lateness() {
        let strategy = LateDataStrategy::AllowedLateness {
            max_lateness: Duration::from_millis(500),
        };
        let mut handler = LateDataHandler::new(strategy);
        let watermark = Watermark::new(2000);

        // Event within allowed lateness
        let late_event1 = create_event(1600, 1); // 400ms late
        match handler.handle_late_event(late_event1, &watermark) {
            LateEventDecision::Process(_) => {
                assert_eq!(handler.stats().allowed, 1);
            }
            _ => panic!("Expected Process decision"),
        }

        // Event beyond allowed lateness
        let late_event2 = create_event(1400, 2); // 600ms late
        match handler.handle_late_event(late_event2, &watermark) {
            LateEventDecision::Drop => {
                assert_eq!(handler.stats().dropped, 1);
            }
            _ => panic!("Expected Drop decision"),
        }
    }

    #[test]
    fn test_late_data_allowed_lateness_boundary() {
        let strategy = LateDataStrategy::AllowedLateness {
            max_lateness: Duration::from_millis(500),
        };
        let mut handler = LateDataHandler::new(strategy);

        // Exactly max_lateness behind the watermark is still allowed.
        match handler.handle_late_event(create_event(1500, 1), &Watermark::new(2000)) {
            LateEventDecision::Process(_) => assert_eq!(handler.stats().allowed, 1),
            _ => panic!("Expected Process decision"),
        }
    }

    #[test]
    fn test_late_data_side_output() {
        let mut handler = LateDataHandler::new(LateDataStrategy::SideOutput);
        let watermark = Watermark::new(2000);

        match handler.handle_late_event(create_event(1000, 1), &watermark) {
            LateEventDecision::SideOutput(e) => assert_eq!(e.metadata.timestamp, 1000),
            _ => panic!("Expected SideOutput decision"),
        }

        assert_eq!(handler.side_output().len(), 1);
        let stats = handler.stats();
        assert_eq!(stats.total_late, 1);
        assert_eq!(stats.side_output, 1);
        assert_eq!(stats.dropped, 0);
        assert_eq!(stats.allowed, 0);

        handler.clear_side_output();
        assert!(handler.side_output().is_empty());
        assert_eq!(handler.stats().total_late, 1);
    }

    #[test]
    fn test_late_data_recompute() {
        let mut handler = LateDataHandler::new(LateDataStrategy::RecomputeWindows);

        match handler.handle_late_event(create_event(1000, 1), &Watermark::new(5000)) {
            LateEventDecision::Recompute(_) => assert_eq!(handler.stats().allowed, 1),
            _ => panic!("Expected Recompute decision"),
        }
    }

    #[test]
    fn test_watermarked_stream() {
        let strategy = WatermarkStrategy::BoundedOutOfOrder {
            max_delay: Duration::from_millis(500),
        };
        let late_strategy = LateDataStrategy::Drop;

        let mut stream = WatermarkedStream::new(strategy, late_strategy);

        // Add events in order
        stream.add_event(create_event(1000, 1)).unwrap();
        stream.add_event(create_event(2000, 2)).unwrap();

        // Watermark should be 2000 - 500 = 1500
        assert_eq!(stream.current_watermark().timestamp, 1500);

        // Add late event (before watermark)
        stream.add_event(create_event(1200, 3)).unwrap();

        // Late event should be dropped
        let stats = stream.late_stats();
        assert_eq!(stats.total_late, 1);
        assert_eq!(stats.dropped, 1);

        // Should have 2 on-time events
        assert_eq!(stream.events().len(), 2);
    }

    #[test]
    fn test_watermarked_stream_side_output_and_history() {
        let strategy = WatermarkStrategy::BoundedOutOfOrder {
            max_delay: Duration::from_millis(500),
        };
        let mut stream = WatermarkedStream::new(strategy, LateDataStrategy::SideOutput);

        stream.add_event(create_event(1000, 1)).unwrap();
        stream.add_event(create_event(2000, 2)).unwrap();
        stream.add_event(create_event(1200, 3)).unwrap(); // late

        assert_eq!(stream.events().len(), 2);
        assert_eq!(stream.side_output().len(), 1);
        assert_eq!(stream.late_stats().total_late, 1);
        assert_eq!(
            stream.watermark_history().to_vec(),
            vec![Watermark::new(500), Watermark::new(1500)]
        );
    }
}