1pub mod late_handler;
21pub mod propagation;
22
23pub use late_handler::{AllowedLatenessTracker, SideOutputRouter};
24pub use propagation::{OperatorId, OperatorWatermarkAggregator, WatermarkPropagator};
25
26use std::collections::HashMap;
27
/// A watermark emitted on behalf of one source.
///
/// In this file, values are produced by [`WatermarkGenerator::observe`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamWatermark {
    /// Watermark value. When produced by [`WatermarkGenerator`] this is the
    /// generator's current watermark, in milliseconds.
    pub timestamp: i64,
    /// Identifier of the source the watermark belongs to.
    pub source_id: String,
    /// When produced by [`WatermarkGenerator`], the number of watermarks that
    /// generator has emitted so far, including this one.
    pub advance_count: u64,
}

impl StreamWatermark {
    /// Builds a watermark from its three components.
    ///
    /// `source_id` accepts anything convertible into a `String`.
    pub fn new(timestamp: i64, source_id: impl Into<String>, advance_count: u64) -> Self {
        Self {
            timestamp,
            source_id: source_id.into(),
            advance_count,
        }
    }
}

/// Produces watermarks that trail the largest observed event timestamp by a
/// fixed out-of-order allowance.
///
/// A watermark is emitted once every `advance_threshold` observed events
/// (100 by default).
pub struct WatermarkGenerator {
    /// Allowance subtracted from `current_max_ts` to form the watermark.
    max_out_of_order_ms: i64,
    /// Largest event timestamp observed so far; `i64::MIN` means "none yet".
    current_max_ts: i64,
    /// Number of observed events between two emitted watermarks.
    advance_threshold: usize,
    /// Events observed since the last emission.
    event_count: usize,
    /// Total number of watermarks emitted by this generator.
    advance_count: u64,
    /// Source identifier stamped on every emitted watermark.
    source_id: String,
}

impl WatermarkGenerator {
    /// Creates a generator with the given out-of-order allowance.
    ///
    /// Defaults: emission every 100 events, source id `"default"`.
    pub fn new(max_out_of_order_ms: i64) -> Self {
        Self {
            max_out_of_order_ms,
            current_max_ts: i64::MIN,
            advance_threshold: 100,
            event_count: 0,
            advance_count: 0,
            source_id: "default".to_string(),
        }
    }

    /// Sets how many events must be observed between emitted watermarks.
    ///
    /// A threshold of `0` behaves like `1`: every observed event emits.
    pub fn with_advance_threshold(mut self, threshold: usize) -> Self {
        self.advance_threshold = threshold;
        self
    }

    /// Sets the source identifier attached to emitted watermarks.
    pub fn with_source_id(mut self, source_id: impl Into<String>) -> Self {
        self.source_id = source_id.into();
        self
    }

    /// Records one event and, every `advance_threshold` events, returns a new
    /// watermark.
    ///
    /// Returns `None` for events that do not complete a batch. The running
    /// maximum timestamp is updated on every call, whether or not a watermark
    /// is emitted.
    pub fn observe(&mut self, event_timestamp_ms: i64) -> Option<StreamWatermark> {
        self.current_max_ts = self.current_max_ts.max(event_timestamp_ms);
        self.event_count += 1;

        if self.event_count < self.advance_threshold {
            return None;
        }

        // Batch complete: start a new counting window and emit.
        self.event_count = 0;
        self.advance_count += 1;
        Some(StreamWatermark::new(
            self.current_watermark(),
            self.source_id.clone(),
            self.advance_count,
        ))
    }

    /// Returns the current watermark: largest observed timestamp minus the
    /// out-of-order allowance, or `i64::MIN` before any event was observed.
    ///
    /// The subtraction saturates at the `i64` bounds, so timestamps close to
    /// `i64::MIN` (or a negative allowance combined with timestamps close to
    /// `i64::MAX`) clamp instead of overflowing.
    pub fn current_watermark(&self) -> i64 {
        if self.current_max_ts == i64::MIN {
            // Sentinel for "nothing observed": keep it as-is even when the
            // allowance is negative.
            i64::MIN
        } else {
            self.current_max_ts
                .saturating_sub(self.max_out_of_order_ms)
        }
    }

    /// Returns `true` when the timestamp is strictly behind the current
    /// watermark. Before any event is observed nothing is considered late.
    pub fn is_late(&self, event_timestamp_ms: i64) -> bool {
        event_timestamp_ms < self.current_watermark()
    }
}
135
/// Keeps the latest watermark reported by each source and answers questions
/// about the combined (minimum) watermark.
#[derive(Default)]
pub struct WatermarkAligner {
    /// Most recently reported watermark per source id.
    sources: HashMap<String, i64>,
}

impl WatermarkAligner {
    /// Creates an aligner with no registered sources.
    pub fn new() -> Self {
        Self::default()
    }

    /// Stores `watermark_ms` for `source_id`, replacing any previous value
    /// for that source (even a larger one).
    pub fn update(&mut self, source_id: &str, watermark_ms: i64) {
        let key = source_id.to_owned();
        self.sources.insert(key, watermark_ms);
    }

    /// Returns the smallest watermark across all sources, or `i64::MIN` when
    /// no source has reported yet.
    pub fn global_watermark(&self) -> i64 {
        match self.sources.values().min() {
            Some(&lowest) => lowest,
            None => i64::MIN,
        }
    }

    /// Returns the number of distinct sources that have reported.
    pub fn source_count(&self) -> usize {
        self.sources.len()
    }

    /// Returns `true` when at least one source exists and every source's
    /// watermark is strictly greater than `timestamp_ms`.
    pub fn all_beyond(&self, timestamp_ms: i64) -> bool {
        // Every watermark exceeds the timestamp exactly when the smallest one
        // does; with no sources there is no minimum and the answer is `false`.
        self.sources
            .values()
            .min()
            .map_or(false, |&lowest| lowest > timestamp_ms)
    }
}
186
/// Strategy applied to events whose timestamp is behind the watermark.
#[derive(Debug, Clone)]
pub enum LateDataPolicy {
    /// Discard every late event.
    Drop,
    /// Reassign late events that are at most `max_lateness_ms` behind the
    /// watermark; events later than that are dropped.
    Reassign { max_lateness_ms: i64 },
    /// Send late events to a side output. `channel` is carried on the policy;
    /// [`LateDataHandler::handle`] does not read it.
    SideOutput { channel: String },
}

/// Outcome of [`LateDataHandler::handle`] for a single event.
#[derive(Debug, Clone, PartialEq)]
pub enum LateDataDecision {
    /// The event is not late (timestamp at or after the watermark).
    Process,
    /// The event is late and must be discarded.
    Drop,
    /// The event is late but tolerated; the payload is the watermark value
    /// that was passed to `handle` (presumably the event's new timestamp).
    Reassign(i64),
    /// The event is late and should go to the side output.
    SideOutput,
}

/// Classifies events against a watermark according to a [`LateDataPolicy`]
/// and counts how many were late.
pub struct LateDataHandler {
    /// Policy applied to late events.
    pub policy: LateDataPolicy,
    /// Number of late events seen so far, regardless of the decision taken.
    pub late_event_count: u64,
}

impl LateDataHandler {
    /// Creates a handler with the given policy and a late-event count of zero.
    pub fn new(policy: LateDataPolicy) -> Self {
        Self {
            policy,
            late_event_count: 0,
        }
    }

    /// Decides what to do with an event given the current watermark.
    ///
    /// Events with `event_ts_ms >= watermark_ms` are on time and yield
    /// [`LateDataDecision::Process`] without touching the counter. Every other
    /// event increments `late_event_count` and is resolved by the policy.
    ///
    /// Under `Reassign`, the lateness `watermark_ms - event_ts_ms` is computed
    /// with overflow checking: a gap too large to fit in an `i64` exceeds any
    /// possible `max_lateness_ms`, so the event is dropped.
    pub fn handle(&mut self, event_ts_ms: i64, watermark_ms: i64) -> LateDataDecision {
        if event_ts_ms >= watermark_ms {
            return LateDataDecision::Process;
        }
        self.late_event_count += 1;
        match &self.policy {
            LateDataPolicy::Drop => LateDataDecision::Drop,
            LateDataPolicy::Reassign { max_lateness_ms } => {
                match watermark_ms.checked_sub(event_ts_ms) {
                    Some(lateness) if lateness <= *max_lateness_ms => {
                        LateDataDecision::Reassign(watermark_ms)
                    }
                    // Too late, or so late that the gap overflows `i64`.
                    _ => LateDataDecision::Drop,
                }
            }
            LateDataPolicy::SideOutput { .. } => LateDataDecision::SideOutput,
        }
    }
}
252
/// Unit tests for the watermark generator, aligner, late-data handler and
/// the `StreamWatermark` value type defined in this file.
#[cfg(test)]
mod tests {
    use super::*;

    // ---- WatermarkGenerator -------------------------------------------

    #[test]
    fn test_generator_initial_watermark_is_min() {
        let generator = WatermarkGenerator::new(500);
        assert_eq!(generator.current_watermark(), i64::MIN);
    }

    #[test]
    fn test_generator_no_watermark_before_threshold() {
        let mut generator = WatermarkGenerator::new(100).with_advance_threshold(10);
        // Nine events with a threshold of ten: nothing may be emitted yet.
        for ts in 0..9 {
            let wm = generator.observe(ts * 100);
            assert!(wm.is_none(), "should not emit before threshold");
        }
    }

    #[test]
    fn test_generator_emits_at_threshold() {
        let mut generator = WatermarkGenerator::new(100).with_advance_threshold(5);
        let mut last_wm = None;
        for i in 0..5 {
            last_wm = generator.observe(i * 1000);
        }
        let w = last_wm.expect("watermark must be emitted at threshold");
        // Largest timestamp is 4000; minus the 100 ms allowance gives 3900.
        assert_eq!(w.timestamp, 3900);
        assert_eq!(w.advance_count, 1);
        // The event after an emission starts a fresh counting window.
        assert!(generator.observe(5000).is_none());
    }

    #[test]
    fn test_generator_advance_count_increments() {
        let mut generator = WatermarkGenerator::new(0).with_advance_threshold(3);
        // Six events with a threshold of three: two emissions.
        for i in 0..6 {
            generator.observe(i * 1000);
        }
        assert_eq!(generator.advance_count, 2);
    }

    #[test]
    fn test_generator_current_watermark_formula() {
        let mut generator = WatermarkGenerator::new(200).with_advance_threshold(100);
        generator.observe(5000);
        // 5000 - 200
        assert_eq!(generator.current_watermark(), 4800);
    }

    #[test]
    fn test_generator_is_late_true() {
        let mut generator = WatermarkGenerator::new(100).with_advance_threshold(100);
        generator.observe(10_000);
        // Watermark is 9900, so 9000 is behind it.
        assert!(generator.is_late(9000));
    }

    #[test]
    fn test_generator_is_late_false() {
        let mut generator = WatermarkGenerator::new(100).with_advance_threshold(100);
        generator.observe(10_000);
        // Exactly at the watermark (9900) is not late.
        assert!(!generator.is_late(9900));
        assert!(!generator.is_late(10_000));
    }

    #[test]
    fn test_generator_source_id() {
        let mut generator = WatermarkGenerator::new(0)
            .with_advance_threshold(1)
            .with_source_id("sensor-42");
        let wm = generator.observe(1000).expect("should emit");
        assert_eq!(wm.source_id, "sensor-42");
    }

    #[test]
    fn test_generator_max_ts_tracks_maximum() {
        let mut generator = WatermarkGenerator::new(0).with_advance_threshold(100);
        generator.observe(500);
        generator.observe(1000);
        // An older event must not pull the maximum back.
        generator.observe(300);
        assert_eq!(generator.current_max_ts, 1000);
        assert_eq!(generator.current_watermark(), 1000);
    }

    // ---- WatermarkAligner ---------------------------------------------

    #[test]
    fn test_aligner_empty_returns_min() {
        let aligner = WatermarkAligner::new();
        assert_eq!(aligner.global_watermark(), i64::MIN);
        assert_eq!(aligner.source_count(), 0);
    }

    #[test]
    fn test_aligner_single_source() {
        let mut aligner = WatermarkAligner::new();
        aligner.update("src-A", 5000);
        assert_eq!(aligner.global_watermark(), 5000);
        assert_eq!(aligner.source_count(), 1);
    }

    #[test]
    fn test_aligner_global_is_minimum() {
        let mut aligner = WatermarkAligner::new();
        aligner.update("src-A", 5000);
        aligner.update("src-B", 3000);
        aligner.update("src-C", 7000);
        assert_eq!(aligner.global_watermark(), 3000);
    }

    #[test]
    fn test_aligner_all_beyond_true() {
        let mut aligner = WatermarkAligner::new();
        aligner.update("A", 10_000);
        aligner.update("B", 12_000);
        assert!(aligner.all_beyond(9_999));
    }

    #[test]
    fn test_aligner_all_beyond_false_one_lagging() {
        let mut aligner = WatermarkAligner::new();
        aligner.update("A", 10_000);
        aligner.update("B", 5_000);
        assert!(!aligner.all_beyond(6_000));
    }

    #[test]
    fn test_aligner_all_beyond_empty_is_false() {
        let aligner = WatermarkAligner::new();
        assert!(!aligner.all_beyond(0));
    }

    #[test]
    fn test_aligner_update_overwrites() {
        let mut aligner = WatermarkAligner::new();
        aligner.update("src", 1000);
        aligner.update("src", 9000);
        assert_eq!(aligner.global_watermark(), 9000);
        assert_eq!(aligner.source_count(), 1);
    }

    // ---- LateDataHandler ----------------------------------------------

    #[test]
    fn test_late_handler_on_time_event_is_process() {
        let mut handler = LateDataHandler::new(LateDataPolicy::Drop);
        let decision = handler.handle(5000, 4000);
        assert_eq!(decision, LateDataDecision::Process);
        assert_eq!(handler.late_event_count, 0);
    }

    #[test]
    fn test_late_handler_drop_policy() {
        let mut handler = LateDataHandler::new(LateDataPolicy::Drop);
        let decision = handler.handle(1000, 5000);
        assert_eq!(decision, LateDataDecision::Drop);
        assert_eq!(handler.late_event_count, 1);
    }

    #[test]
    fn test_late_handler_reassign_within_max_lateness() {
        let mut handler = LateDataHandler::new(LateDataPolicy::Reassign {
            max_lateness_ms: 2000,
        });
        // 1000 ms late, within the 2000 ms bound.
        let decision = handler.handle(4000, 5000);
        assert_eq!(decision, LateDataDecision::Reassign(5000));
    }

    #[test]
    fn test_late_handler_reassign_exceeds_max_lateness_drops() {
        let mut handler = LateDataHandler::new(LateDataPolicy::Reassign {
            max_lateness_ms: 500,
        });
        // 2000 ms late, beyond the 500 ms bound.
        let decision = handler.handle(3000, 5000);
        assert_eq!(decision, LateDataDecision::Drop);
        assert_eq!(handler.late_event_count, 1);
    }

    #[test]
    fn test_late_handler_side_output_policy() {
        let mut handler = LateDataHandler::new(LateDataPolicy::SideOutput {
            channel: "late-events".to_string(),
        });
        let decision = handler.handle(1000, 5000);
        assert_eq!(decision, LateDataDecision::SideOutput);
        assert_eq!(handler.late_event_count, 1);
    }

    #[test]
    fn test_late_handler_counts_accumulate() {
        let mut handler = LateDataHandler::new(LateDataPolicy::Drop);
        handler.handle(100, 1000);
        handler.handle(200, 1000);
        // On-time event: must not be counted.
        handler.handle(5000, 1000);
        assert_eq!(handler.late_event_count, 2);
    }

    // ---- StreamWatermark ----------------------------------------------

    #[test]
    fn test_stream_watermark_equality() {
        let w1 = StreamWatermark::new(1000, "s1", 1);
        let w2 = StreamWatermark::new(1000, "s1", 1);
        let w3 = StreamWatermark::new(2000, "s1", 1);
        assert_eq!(w1, w2);
        assert_ne!(w1, w3);
    }
}