1use std::collections::HashMap;
13
/// A watermark reading published by a single source.
///
/// `WatermarkGenerator::observe` produces these. Equality and hashing take all
/// three fields into account.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct StreamWatermark {
    /// Watermark value. When produced by `WatermarkGenerator` it is the running
    /// maximum event timestamp minus the configured out-of-order allowance
    /// (presumably milliseconds, given the generator's `_ms` parameter names).
    pub timestamp: i64,
    /// Identifier of the source that emitted this watermark.
    pub source_id: String,
    /// Sequence number of the emission; `WatermarkGenerator` numbers its
    /// emissions 1, 2, 3, ...
    pub advance_count: u64,
}
29
30impl StreamWatermark {
31 pub fn new(timestamp: i64, source_id: impl Into<String>, advance_count: u64) -> Self {
32 Self {
33 timestamp,
34 source_id: source_id.into(),
35 advance_count,
36 }
37 }
38}
39
/// Derives watermarks from a stream of event timestamps.
///
/// The watermark trails the largest timestamp observed so far by
/// `max_out_of_order_ms`. A [`StreamWatermark`] is emitted once
/// `advance_threshold` events have been observed since the previous emission.
#[derive(Debug, Clone)]
pub struct WatermarkGenerator {
    /// Allowed out-of-orderness, subtracted from the running maximum.
    max_out_of_order_ms: i64,
    /// Largest event timestamp observed; `i64::MIN` until the first event.
    current_max_ts: i64,
    /// Number of observed events that triggers one emission.
    advance_threshold: usize,
    /// Events observed since the last emission.
    event_count: usize,
    /// Number of watermarks emitted so far.
    advance_count: u64,
    /// Identifier copied into every emitted watermark.
    source_id: String,
}
55
56impl WatermarkGenerator {
57 pub fn new(max_out_of_order_ms: i64) -> Self {
61 Self {
62 max_out_of_order_ms,
63 current_max_ts: i64::MIN,
64 advance_threshold: 100,
65 event_count: 0,
66 advance_count: 0,
67 source_id: "default".to_string(),
68 }
69 }
70
71 pub fn with_advance_threshold(mut self, threshold: usize) -> Self {
73 self.advance_threshold = threshold;
74 self
75 }
76
77 pub fn with_source_id(mut self, source_id: impl Into<String>) -> Self {
79 self.source_id = source_id.into();
80 self
81 }
82
83 pub fn observe(&mut self, event_timestamp_ms: i64) -> Option<StreamWatermark> {
87 if event_timestamp_ms > self.current_max_ts {
88 self.current_max_ts = event_timestamp_ms;
89 }
90 self.event_count += 1;
91
92 if self.event_count >= self.advance_threshold {
93 self.event_count = 0;
94 self.advance_count += 1;
95 Some(StreamWatermark::new(
96 self.current_watermark(),
97 self.source_id.clone(),
98 self.advance_count,
99 ))
100 } else {
101 None
102 }
103 }
104
105 pub fn current_watermark(&self) -> i64 {
109 if self.current_max_ts == i64::MIN {
110 i64::MIN
111 } else {
112 self.current_max_ts - self.max_out_of_order_ms
113 }
114 }
115
116 pub fn is_late(&self, event_timestamp_ms: i64) -> bool {
118 event_timestamp_ms < self.current_watermark()
119 }
120}
121
/// Combines per-source watermarks into a single global watermark.
///
/// The latest watermark reported by each source is kept, and the global
/// watermark is the minimum across sources, so the slowest source determines it.
#[derive(Debug, Clone)]
pub struct WatermarkAligner {
    /// Most recently reported watermark, keyed by source id.
    sources: HashMap<String, i64>,
}
131
132impl WatermarkAligner {
133 pub fn new() -> Self {
135 Self {
136 sources: HashMap::new(),
137 }
138 }
139
140 pub fn update(&mut self, source_id: &str, watermark_ms: i64) {
142 self.sources.insert(source_id.to_string(), watermark_ms);
143 }
144
145 pub fn global_watermark(&self) -> i64 {
149 self.sources.values().copied().min().unwrap_or(i64::MIN)
150 }
151
152 pub fn source_count(&self) -> usize {
154 self.sources.len()
155 }
156
157 pub fn all_beyond(&self, timestamp_ms: i64) -> bool {
160 if self.sources.is_empty() {
161 return false;
162 }
163 self.sources.values().all(|&wm| wm > timestamp_ms)
164 }
165}
166
167impl Default for WatermarkAligner {
168 fn default() -> Self {
169 Self::new()
170 }
171}
172
/// How `LateDataHandler::handle` resolves an event that is older than the
/// watermark.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum LateDataPolicy {
    /// Every late event yields `LateDataDecision::Drop`.
    Drop,
    /// Late events at most `max_lateness_ms` behind the watermark yield
    /// `LateDataDecision::Reassign(watermark)`; later ones are dropped.
    Reassign { max_lateness_ms: i64 },
    /// Every late event yields `LateDataDecision::SideOutput`. `channel` is
    /// carried for the caller; `LateDataHandler::handle` does not read it.
    SideOutput { channel: String },
}
186
/// Outcome of `LateDataHandler::handle` for a single event.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum LateDataDecision {
    /// The event is not late: its timestamp is at or after the watermark.
    Process,
    /// The event is late and should be discarded.
    Drop,
    /// The event is late but within tolerance; use this timestamp for it.
    Reassign(i64),
    /// The event is late and should be sent to the side output.
    SideOutput,
}
199
200pub struct LateDataHandler {
202 pub policy: LateDataPolicy,
203 pub late_event_count: u64,
204}
205
206impl LateDataHandler {
207 pub fn new(policy: LateDataPolicy) -> Self {
209 Self {
210 policy,
211 late_event_count: 0,
212 }
213 }
214
215 pub fn handle(&mut self, event_ts_ms: i64, watermark_ms: i64) -> LateDataDecision {
220 if event_ts_ms >= watermark_ms {
221 return LateDataDecision::Process;
222 }
223 self.late_event_count += 1;
224 match &self.policy {
225 LateDataPolicy::Drop => LateDataDecision::Drop,
226 LateDataPolicy::Reassign { max_lateness_ms } => {
227 let lateness = watermark_ms - event_ts_ms;
228 if lateness <= *max_lateness_ms {
229 LateDataDecision::Reassign(watermark_ms)
230 } else {
231 LateDataDecision::Drop
232 }
233 }
234 LateDataPolicy::SideOutput { .. } => LateDataDecision::SideOutput,
235 }
236 }
237}
238
#[cfg(test)]
mod tests {
    use super::*;

    // Local bindings avoid the name `gen`, which is a reserved keyword
    // starting with the Rust 2024 edition.

    // ---- WatermarkGenerator ----

    #[test]
    fn test_generator_initial_watermark_is_min() {
        let generator = WatermarkGenerator::new(500);
        assert_eq!(generator.current_watermark(), i64::MIN);
    }

    #[test]
    fn test_generator_no_watermark_before_threshold() {
        let mut generator = WatermarkGenerator::new(100).with_advance_threshold(10);
        for ts in 0..9 {
            let wm = generator.observe(ts * 100);
            assert!(wm.is_none(), "should not emit before threshold");
        }
    }

    #[test]
    fn test_generator_emits_at_threshold() {
        let mut generator = WatermarkGenerator::new(100).with_advance_threshold(5);
        let mut last_wm = None;
        for i in 0..5 {
            last_wm = generator.observe(i * 1000);
        }
        let w = last_wm.expect("watermark must be emitted at threshold");
        // Largest observed timestamp (4000) minus the 100 ms allowance.
        assert_eq!(w.timestamp, 3900);
        assert_eq!(w.advance_count, 1);
        // The event counter resets after an emission, so the next event is quiet.
        assert!(generator.observe(5000).is_none());
    }

    #[test]
    fn test_generator_advance_count_increments() {
        let mut generator = WatermarkGenerator::new(0).with_advance_threshold(3);
        for i in 0..6 {
            generator.observe(i * 1000);
        }
        assert_eq!(generator.advance_count, 2);
    }

    #[test]
    fn test_generator_current_watermark_formula() {
        let mut generator = WatermarkGenerator::new(200).with_advance_threshold(100);
        generator.observe(5000);
        assert_eq!(generator.current_watermark(), 4800);
    }

    #[test]
    fn test_generator_is_late_true() {
        let mut generator = WatermarkGenerator::new(100).with_advance_threshold(100);
        generator.observe(10_000);
        // Watermark is 9900.
        assert!(generator.is_late(9000));
    }

    #[test]
    fn test_generator_is_late_false() {
        let mut generator = WatermarkGenerator::new(100).with_advance_threshold(100);
        generator.observe(10_000);
        // Exactly at the watermark is not late.
        assert!(!generator.is_late(9900));
        assert!(!generator.is_late(10_000));
    }

    #[test]
    fn test_generator_source_id() {
        let mut generator = WatermarkGenerator::new(0)
            .with_advance_threshold(1)
            .with_source_id("sensor-42");
        let wm = generator.observe(1000).expect("should emit");
        assert_eq!(wm.source_id, "sensor-42");
    }

    #[test]
    fn test_generator_max_ts_tracks_maximum() {
        let mut generator = WatermarkGenerator::new(0).with_advance_threshold(100);
        generator.observe(500);
        // An out-of-order event must not lower the running maximum.
        generator.observe(1000);
        generator.observe(300);
        assert_eq!(generator.current_max_ts, 1000);
        assert_eq!(generator.current_watermark(), 1000);
    }

    // ---- WatermarkAligner ----

    #[test]
    fn test_aligner_empty_returns_min() {
        let aligner = WatermarkAligner::new();
        assert_eq!(aligner.global_watermark(), i64::MIN);
        assert_eq!(aligner.source_count(), 0);
    }

    #[test]
    fn test_aligner_single_source() {
        let mut aligner = WatermarkAligner::new();
        aligner.update("src-A", 5000);
        assert_eq!(aligner.global_watermark(), 5000);
        assert_eq!(aligner.source_count(), 1);
    }

    #[test]
    fn test_aligner_global_is_minimum() {
        let mut aligner = WatermarkAligner::new();
        aligner.update("src-A", 5000);
        aligner.update("src-B", 3000);
        aligner.update("src-C", 7000);
        assert_eq!(aligner.global_watermark(), 3000);
    }

    #[test]
    fn test_aligner_all_beyond_true() {
        let mut aligner = WatermarkAligner::new();
        aligner.update("A", 10_000);
        aligner.update("B", 12_000);
        assert!(aligner.all_beyond(9_999));
    }

    #[test]
    fn test_aligner_all_beyond_false_one_lagging() {
        let mut aligner = WatermarkAligner::new();
        aligner.update("A", 10_000);
        aligner.update("B", 5_000);
        assert!(!aligner.all_beyond(6_000));
    }

    #[test]
    fn test_aligner_all_beyond_empty_is_false() {
        let aligner = WatermarkAligner::new();
        assert!(!aligner.all_beyond(0));
    }

    #[test]
    fn test_aligner_update_overwrites() {
        let mut aligner = WatermarkAligner::new();
        aligner.update("src", 1000);
        aligner.update("src", 9000);
        assert_eq!(aligner.global_watermark(), 9000);
        assert_eq!(aligner.source_count(), 1);
    }

    // ---- LateDataHandler ----

    #[test]
    fn test_late_handler_on_time_event_is_process() {
        let mut handler = LateDataHandler::new(LateDataPolicy::Drop);
        let decision = handler.handle(5000, 4000);
        assert_eq!(decision, LateDataDecision::Process);
        assert_eq!(handler.late_event_count, 0);
    }

    #[test]
    fn test_late_handler_drop_policy() {
        let mut handler = LateDataHandler::new(LateDataPolicy::Drop);
        let decision = handler.handle(1000, 5000);
        assert_eq!(decision, LateDataDecision::Drop);
        assert_eq!(handler.late_event_count, 1);
    }

    #[test]
    fn test_late_handler_reassign_within_max_lateness() {
        let mut handler = LateDataHandler::new(LateDataPolicy::Reassign {
            max_lateness_ms: 2000,
        });
        // 1000 ms late, within the 2000 ms bound.
        let decision = handler.handle(4000, 5000);
        assert_eq!(decision, LateDataDecision::Reassign(5000));
    }

    #[test]
    fn test_late_handler_reassign_exceeds_max_lateness_drops() {
        let mut handler = LateDataHandler::new(LateDataPolicy::Reassign {
            max_lateness_ms: 500,
        });
        // 2000 ms late, beyond the 500 ms bound.
        let decision = handler.handle(3000, 5000);
        assert_eq!(decision, LateDataDecision::Drop);
        assert_eq!(handler.late_event_count, 1);
    }

    #[test]
    fn test_late_handler_side_output_policy() {
        let mut handler = LateDataHandler::new(LateDataPolicy::SideOutput {
            channel: "late-events".to_string(),
        });
        let decision = handler.handle(1000, 5000);
        assert_eq!(decision, LateDataDecision::SideOutput);
        assert_eq!(handler.late_event_count, 1);
    }

    #[test]
    fn test_late_handler_counts_accumulate() {
        let mut handler = LateDataHandler::new(LateDataPolicy::Drop);
        handler.handle(100, 1000);
        handler.handle(200, 1000);
        // On time: not counted.
        handler.handle(5000, 1000);
        assert_eq!(handler.late_event_count, 2);
    }

    // ---- StreamWatermark ----

    #[test]
    fn test_stream_watermark_equality() {
        let w1 = StreamWatermark::new(1000, "s1", 1);
        let w2 = StreamWatermark::new(1000, "s1", 1);
        let w3 = StreamWatermark::new(2000, "s1", 1);
        assert_eq!(w1, w2);
        assert_ne!(w1, w3);
    }
}