// rust_rule_engine/streaming/watermark.rs
use crate::types::Value;
use super::event::StreamEvent;
use std::collections::{BTreeMap, VecDeque};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
10
/// A position in event time used to decide whether an event arrived late.
///
/// Ordering and equality are derived from the wrapped `timestamp`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct Watermark {
    /// Event-time value of the watermark. [`Watermark::from_system_time`]
    /// fills it with whole milliseconds since the Unix epoch.
    pub timestamp: u64,
}

impl Watermark {
    /// Wraps a raw timestamp in a watermark.
    pub fn new(timestamp: u64) -> Self {
        Watermark { timestamp }
    }

    /// Builds a watermark from a wall-clock time, expressed as whole
    /// milliseconds since the Unix epoch.
    ///
    /// Times that precede the epoch are clamped to `0`.
    pub fn from_system_time(time: SystemTime) -> Self {
        let since_epoch = match time.duration_since(UNIX_EPOCH) {
            Ok(elapsed) => elapsed,
            Err(_) => Duration::ZERO,
        };
        Self::new(since_epoch.as_millis() as u64)
    }

    /// Builds a watermark for the current wall-clock time.
    pub fn now() -> Self {
        let current = SystemTime::now();
        Self::from_system_time(current)
    }

    /// Returns `true` when this watermark is strictly earlier than `other`.
    pub fn is_before(&self, other: &Watermark) -> bool {
        other.timestamp > self.timestamp
    }

    /// Returns `true` when `event_time` lies strictly before this watermark.
    pub fn is_late(&self, event_time: u64) -> bool {
        self.timestamp > event_time
    }
}
48
/// Policy that decides when a `WatermarkGenerator` advances its watermark.
#[derive(Debug, Clone)]
pub enum WatermarkStrategy {
    /// Propose the highest event timestamp seen so far, but only once at
    /// least `interval` has elapsed since the previous proposal.
    Periodic {
        /// Minimum time between two proposals.
        interval: Duration,
    },

    /// Trail the highest event timestamp seen so far by `max_delay`.
    BoundedOutOfOrder {
        /// Amount subtracted (in milliseconds) from the highest timestamp.
        max_delay: Duration,
    },

    /// Follow the highest event timestamp seen so far with no delay.
    MonotonicAscending,

    /// No automatic watermark: the generator never emits one for this variant.
    Custom,
}
70
71pub struct WatermarkGenerator {
73 current_watermark: Watermark,
75
76 strategy: WatermarkStrategy,
78
79 max_timestamp: u64,
81
82 last_emission: SystemTime,
84
85 pending_events: VecDeque<StreamEvent>,
87}
88
89impl WatermarkGenerator {
90 pub fn new(strategy: WatermarkStrategy) -> Self {
92 Self {
93 current_watermark: Watermark::new(0),
94 strategy,
95 max_timestamp: 0,
96 last_emission: SystemTime::now(),
97 pending_events: VecDeque::new(),
98 }
99 }
100
101 pub fn process_event(&mut self, event: &StreamEvent) -> Option<Watermark> {
103 let event_time = event.metadata.timestamp;
104
105 if event_time > self.max_timestamp {
107 self.max_timestamp = event_time;
108 }
109
110 self.maybe_generate_watermark()
112 }
113
114 fn maybe_generate_watermark(&mut self) -> Option<Watermark> {
116 let new_watermark = match &self.strategy {
117 WatermarkStrategy::Periodic { interval } => {
118 let now = SystemTime::now();
119 let elapsed = now.duration_since(self.last_emission).ok()?;
120
121 if elapsed >= *interval {
122 self.last_emission = now;
123 Some(Watermark::new(self.max_timestamp))
124 } else {
125 None
126 }
127 }
128
129 WatermarkStrategy::BoundedOutOfOrder { max_delay } => {
130 let delay_ms = max_delay.as_millis() as u64;
131 let new_ts = self.max_timestamp.saturating_sub(delay_ms);
132
133 if new_ts > self.current_watermark.timestamp {
134 Some(Watermark::new(new_ts))
135 } else {
136 None
137 }
138 }
139
140 WatermarkStrategy::MonotonicAscending => {
141 if self.max_timestamp > self.current_watermark.timestamp {
142 Some(Watermark::new(self.max_timestamp))
143 } else {
144 None
145 }
146 }
147
148 WatermarkStrategy::Custom => {
149 None
151 }
152 };
153
154 if let Some(wm) = new_watermark {
155 if wm > self.current_watermark {
156 self.current_watermark = wm;
157 return Some(wm);
158 }
159 }
160
161 None
162 }
163
164 pub fn current_watermark(&self) -> Watermark {
166 self.current_watermark
167 }
168
169 pub fn is_late(&self, event: &StreamEvent) -> bool {
171 self.current_watermark.is_late(event.metadata.timestamp)
172 }
173}
174
/// Policy applied by `LateDataHandler` to events that arrive behind the
/// watermark.
#[derive(Debug, Clone)]
pub enum LateDataStrategy {
    /// Discard every late event.
    Drop,

    /// Accept a late event when it is at most `max_lateness` behind the
    /// watermark; discard it otherwise.
    AllowedLateness {
        /// Largest tolerated gap between the watermark and the event.
        max_lateness: Duration,
    },

    /// Keep late events in the handler's side-output buffer.
    SideOutput,

    /// Hand late events back with a `Recompute` decision.
    RecomputeWindows,
}
193
194pub struct LateDataHandler {
196 strategy: LateDataStrategy,
198
199 side_output: Vec<StreamEvent>,
201
202 late_count: usize,
204 dropped_count: usize,
205 allowed_count: usize,
206}
207
208impl LateDataHandler {
209 pub fn new(strategy: LateDataStrategy) -> Self {
211 Self {
212 strategy,
213 side_output: Vec::new(),
214 late_count: 0,
215 dropped_count: 0,
216 allowed_count: 0,
217 }
218 }
219
220 pub fn handle_late_event(
222 &mut self,
223 event: StreamEvent,
224 watermark: &Watermark,
225 ) -> LateEventDecision {
226 self.late_count += 1;
227
228 let lateness = watermark.timestamp.saturating_sub(event.metadata.timestamp);
229
230 match &self.strategy {
231 LateDataStrategy::Drop => {
232 self.dropped_count += 1;
233 LateEventDecision::Drop
234 }
235
236 LateDataStrategy::AllowedLateness { max_lateness } => {
237 let max_lateness_ms = max_lateness.as_millis() as u64;
238
239 if lateness <= max_lateness_ms {
240 self.allowed_count += 1;
241 LateEventDecision::Process(event)
242 } else {
243 self.dropped_count += 1;
244 LateEventDecision::Drop
245 }
246 }
247
248 LateDataStrategy::SideOutput => {
249 self.side_output.push(event.clone());
250 LateEventDecision::SideOutput(event)
251 }
252
253 LateDataStrategy::RecomputeWindows => {
254 self.allowed_count += 1;
255 LateEventDecision::Recompute(event)
256 }
257 }
258 }
259
260 pub fn side_output(&self) -> &[StreamEvent] {
262 &self.side_output
263 }
264
265 pub fn clear_side_output(&mut self) {
267 self.side_output.clear();
268 }
269
270 pub fn stats(&self) -> LateDataStats {
272 LateDataStats {
273 total_late: self.late_count,
274 dropped: self.dropped_count,
275 allowed: self.allowed_count,
276 side_output: self.side_output.len(),
277 }
278 }
279}
280
281#[derive(Debug, Clone)]
283pub enum LateEventDecision {
284 Drop,
286
287 Process(StreamEvent),
289
290 SideOutput(StreamEvent),
292
293 Recompute(StreamEvent),
295}
296
/// Snapshot of the counters kept by `LateDataHandler`.
#[derive(Debug, Clone, Copy)]
pub struct LateDataStats {
    /// Number of late events handled so far.
    pub total_late: usize,

    /// Number of late events that were discarded.
    pub dropped: usize,

    /// Number of late events accepted for processing or recomputation.
    pub allowed: usize,

    /// Number of events held in the side output when the snapshot was taken.
    pub side_output: usize,
}
312
313pub struct WatermarkedStream {
315 events: Vec<StreamEvent>,
317
318 watermark_gen: WatermarkGenerator,
320
321 late_handler: LateDataHandler,
323
324 watermark_history: Vec<Watermark>,
326}
327
328impl WatermarkedStream {
329 pub fn new(
331 watermark_strategy: WatermarkStrategy,
332 late_strategy: LateDataStrategy,
333 ) -> Self {
334 Self {
335 events: Vec::new(),
336 watermark_gen: WatermarkGenerator::new(watermark_strategy),
337 late_handler: LateDataHandler::new(late_strategy),
338 watermark_history: Vec::new(),
339 }
340 }
341
342 pub fn add_event(&mut self, event: StreamEvent) -> Result<(), String> {
344 if self.watermark_gen.is_late(&event) {
346 match self.late_handler.handle_late_event(event, &self.watermark_gen.current_watermark()) {
348 LateEventDecision::Drop => {
349 }
351 LateEventDecision::Process(e) => {
352 self.events.push(e);
353 }
354 LateEventDecision::SideOutput(_) => {
355 }
357 LateEventDecision::Recompute(e) => {
358 self.events.push(e);
359 }
360 }
361 } else {
362 self.events.push(event.clone());
364
365 if let Some(new_watermark) = self.watermark_gen.process_event(&event) {
367 self.watermark_history.push(new_watermark);
368 }
369 }
370
371 Ok(())
372 }
373
374 pub fn events(&self) -> &[StreamEvent] {
376 &self.events
377 }
378
379 pub fn current_watermark(&self) -> Watermark {
381 self.watermark_gen.current_watermark()
382 }
383
384 pub fn late_stats(&self) -> LateDataStats {
386 self.late_handler.stats()
387 }
388
389 pub fn side_output(&self) -> &[StreamEvent] {
391 self.late_handler.side_output()
392 }
393
394 pub fn watermark_history(&self) -> &[Watermark] {
396 &self.watermark_history
397 }
398}
399
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;
    use std::thread;

    /// Builds a `TestEvent` carrying `value` and stamps it with `timestamp`.
    fn create_event(timestamp: u64, value: i64) -> StreamEvent {
        let mut fields = HashMap::new();
        fields.insert("value".to_string(), Value::Integer(value));

        let mut event = StreamEvent::new("TestEvent", fields, "test");
        // Override the timestamp assigned at construction.
        event.metadata.timestamp = timestamp;
        event
    }

    #[test]
    fn test_watermark_ordering() {
        let earlier = Watermark::new(1000);
        let later = Watermark::new(2000);

        assert!(earlier.is_before(&later));
        assert!(!later.is_before(&earlier));
        assert!(earlier < later);
    }

    #[test]
    fn test_monotonic_watermark() {
        let mut generator = WatermarkGenerator::new(WatermarkStrategy::MonotonicAscending);

        let first = create_event(1000, 1);
        let second = create_event(2000, 2);
        let out_of_order = create_event(1500, 3);

        generator.process_event(&first);
        assert_eq!(generator.current_watermark().timestamp, 1000);

        generator.process_event(&second);
        assert_eq!(generator.current_watermark().timestamp, 2000);

        // An older event must not pull the watermark back.
        generator.process_event(&out_of_order);
        assert_eq!(generator.current_watermark().timestamp, 2000);
    }

    #[test]
    fn test_bounded_out_of_order() {
        let mut generator = WatermarkGenerator::new(WatermarkStrategy::BoundedOutOfOrder {
            max_delay: Duration::from_millis(500),
        });

        generator.process_event(&create_event(2000, 1));

        // Highest timestamp (2000) minus the 500 ms delay.
        assert_eq!(generator.current_watermark().timestamp, 1500);
    }

    #[test]
    fn test_late_data_drop() {
        let mut handler = LateDataHandler::new(LateDataStrategy::Drop);
        let watermark = Watermark::new(2000);

        let decision = handler.handle_late_event(create_event(1000, 1), &watermark);
        if !matches!(decision, LateEventDecision::Drop) {
            panic!("Expected Drop decision");
        }

        let stats = handler.stats();
        assert_eq!(stats.total_late, 1);
        assert_eq!(stats.dropped, 1);
    }

    #[test]
    fn test_late_data_allowed_lateness() {
        let mut handler = LateDataHandler::new(LateDataStrategy::AllowedLateness {
            max_lateness: Duration::from_millis(500),
        });
        let watermark = Watermark::new(2000);

        // 400 ms behind the watermark: within the allowance.
        let within = handler.handle_late_event(create_event(1600, 1), &watermark);
        if !matches!(within, LateEventDecision::Process(_)) {
            panic!("Expected Process decision");
        }
        assert_eq!(handler.stats().allowed, 1);

        // 600 ms behind the watermark: beyond the allowance.
        let beyond = handler.handle_late_event(create_event(1400, 2), &watermark);
        if !matches!(beyond, LateEventDecision::Drop) {
            panic!("Expected Drop decision");
        }
        assert_eq!(handler.stats().dropped, 1);
    }

    #[test]
    fn test_watermarked_stream() {
        let mut stream = WatermarkedStream::new(
            WatermarkStrategy::BoundedOutOfOrder {
                max_delay: Duration::from_millis(500),
            },
            LateDataStrategy::Drop,
        );

        stream.add_event(create_event(1000, 1)).unwrap();
        stream.add_event(create_event(2000, 2)).unwrap();

        assert_eq!(stream.current_watermark().timestamp, 1500);

        // 1200 is behind the 1500 watermark, so it is late and dropped.
        stream.add_event(create_event(1200, 3)).unwrap();

        let stats = stream.late_stats();
        assert_eq!(stats.total_late, 1);
        assert_eq!(stats.dropped, 1);

        assert_eq!(stream.events().len(), 2);
    }
}