use std::collections::HashMap;
/// A watermark emitted for one source: an event-time bound in milliseconds.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamWatermark {
    /// Watermark timestamp in milliseconds.
    pub timestamp: i64,
    /// Identifier of the source this watermark belongs to.
    pub source_id: String,
    /// Emission sequence number; [`WatermarkGenerator::observe`] starts it at 1.
    pub advance_count: u64,
}

impl StreamWatermark {
    /// Builds a watermark from its parts, converting `source_id` into an owned `String`.
    pub fn new(timestamp: i64, source_id: impl Into<String>, advance_count: u64) -> Self {
        Self {
            timestamp,
            source_id: source_id.into(),
            advance_count,
        }
    }
}

/// Bounded-out-of-orderness watermark generator.
///
/// Tracks the largest event timestamp seen and derives the watermark as
/// `max_ts - max_out_of_order_ms`. A [`StreamWatermark`] is emitted once every
/// `advance_threshold` observed events.
#[derive(Debug, Clone)]
pub struct WatermarkGenerator {
    /// How far (ms) behind the maximum timestamp the watermark trails.
    max_out_of_order_ms: i64,
    /// Largest event timestamp observed; `i64::MIN` until an event raises it.
    current_max_ts: i64,
    /// Number of observed events between two emissions.
    advance_threshold: usize,
    /// Events observed since the last emission.
    event_count: usize,
    /// Number of watermarks emitted so far.
    advance_count: u64,
    /// Source id stamped on every emitted watermark.
    source_id: String,
}

impl WatermarkGenerator {
    /// Creates a generator whose watermark trails the max timestamp by `max_out_of_order_ms`.
    ///
    /// Defaults: emit every 100 events, source id `"default"`.
    pub fn new(max_out_of_order_ms: i64) -> Self {
        Self {
            max_out_of_order_ms,
            current_max_ts: i64::MIN,
            advance_threshold: 100,
            event_count: 0,
            advance_count: 0,
            source_id: "default".to_string(),
        }
    }

    /// Sets how many observed events trigger one emission (`0` behaves like `1`).
    #[must_use]
    pub fn with_advance_threshold(mut self, threshold: usize) -> Self {
        self.advance_threshold = threshold;
        self
    }

    /// Sets the source id stamped on emitted watermarks.
    #[must_use]
    pub fn with_source_id(mut self, source_id: impl Into<String>) -> Self {
        self.source_id = source_id.into();
        self
    }

    /// Records one event timestamp.
    ///
    /// Returns `Some(watermark)` when this event completes a batch of
    /// `advance_threshold` events (the batch counter then resets), else `None`.
    pub fn observe(&mut self, event_timestamp_ms: i64) -> Option<StreamWatermark> {
        self.current_max_ts = self.current_max_ts.max(event_timestamp_ms);
        self.event_count += 1;
        if self.event_count < self.advance_threshold {
            return None;
        }
        self.event_count = 0;
        self.advance_count += 1;
        Some(StreamWatermark::new(
            self.current_watermark(),
            self.source_id.clone(),
            self.advance_count,
        ))
    }

    /// Current watermark: `i64::MIN` before any event raised the max, otherwise
    /// `max_ts - max_out_of_order_ms`, saturated to the `i64` range.
    pub fn current_watermark(&self) -> i64 {
        if self.current_max_ts == i64::MIN {
            i64::MIN
        } else {
            // Saturate rather than overflow: timestamps near the i64 limits (or a
            // negative bound) would otherwise panic in debug and wrap in release.
            self.current_max_ts.saturating_sub(self.max_out_of_order_ms)
        }
    }

    /// Returns `true` when `event_timestamp_ms` is strictly behind the current watermark.
    pub fn is_late(&self, event_timestamp_ms: i64) -> bool {
        event_timestamp_ms < self.current_watermark()
    }
}
/// Combines per-source watermarks into one global watermark.
///
/// The global watermark is the minimum over all registered sources, so it only
/// moves forward once the slowest source does.
#[derive(Debug, Clone, Default)]
pub struct WatermarkAligner {
    /// Latest watermark reported by each source, keyed by source id.
    sources: HashMap<String, i64>,
}

impl WatermarkAligner {
    /// Creates an aligner with no registered sources.
    pub fn new() -> Self {
        Self {
            sources: HashMap::new(),
        }
    }

    /// Records `watermark_ms` as the latest watermark of `source_id`.
    ///
    /// An unseen id registers a new source. An existing value is overwritten
    /// unconditionally, so a source may move its watermark backwards.
    pub fn update(&mut self, source_id: &str, watermark_ms: i64) {
        // Look up by `&str` first so updates for already-known sources do not
        // allocate a fresh `String` key on every call.
        if let Some(slot) = self.sources.get_mut(source_id) {
            *slot = watermark_ms;
        } else {
            self.sources.insert(source_id.to_owned(), watermark_ms);
        }
    }

    /// Minimum watermark across all sources, or `i64::MIN` when none are registered.
    pub fn global_watermark(&self) -> i64 {
        self.sources.values().copied().min().unwrap_or(i64::MIN)
    }

    /// Number of distinct registered sources.
    pub fn source_count(&self) -> usize {
        self.sources.len()
    }

    /// Returns `true` when at least one source is registered and every source's
    /// watermark is strictly greater than `timestamp_ms`.
    pub fn all_beyond(&self, timestamp_ms: i64) -> bool {
        !self.sources.is_empty() && self.sources.values().all(|&wm| wm > timestamp_ms)
    }
}
/// Strategy applied to events whose timestamp is behind the watermark.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LateDataPolicy {
    /// Drop every late event.
    Drop,
    /// Reassign events at most `max_lateness_ms` behind the watermark to the
    /// watermark timestamp; drop anything later.
    Reassign { max_lateness_ms: i64 },
    /// Send late events to a side output; `channel` names it (the handler itself
    /// does not read it).
    SideOutput { channel: String },
}

/// Outcome of [`LateDataHandler::handle`] for a single event.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LateDataDecision {
    /// The event is on time.
    Process,
    /// The event should be discarded.
    Drop,
    /// The event should be processed with this (watermark) timestamp.
    Reassign(i64),
    /// The event should go to the policy's side output.
    SideOutput,
}

/// Applies a [`LateDataPolicy`] to events and counts how many were late.
#[derive(Debug, Clone)]
pub struct LateDataHandler {
    /// Policy used for late events.
    pub policy: LateDataPolicy,
    /// Number of events seen strictly behind the watermark, regardless of outcome.
    pub late_event_count: u64,
}

impl LateDataHandler {
    /// Creates a handler with the given policy and a zero late-event count.
    pub fn new(policy: LateDataPolicy) -> Self {
        Self {
            policy,
            late_event_count: 0,
        }
    }

    /// Decides what to do with an event at `event_ts_ms` given `watermark_ms`.
    ///
    /// Events at or after the watermark are [`LateDataDecision::Process`]; all
    /// others increment `late_event_count` and are resolved by the policy.
    pub fn handle(&mut self, event_ts_ms: i64, watermark_ms: i64) -> LateDataDecision {
        if event_ts_ms >= watermark_ms {
            return LateDataDecision::Process;
        }
        self.late_event_count += 1;
        match &self.policy {
            LateDataPolicy::Drop => LateDataDecision::Drop,
            LateDataPolicy::Reassign { max_lateness_ms } => {
                // Here `watermark_ms > event_ts_ms`, so the lateness is positive; the
                // subtraction only overflows when it exceeds `i64::MAX`, which is
                // beyond any representable bound, so treat that as too late.
                match watermark_ms.checked_sub(event_ts_ms) {
                    Some(lateness) if lateness <= *max_lateness_ms => {
                        LateDataDecision::Reassign(watermark_ms)
                    }
                    _ => LateDataDecision::Drop,
                }
            }
            LateDataPolicy::SideOutput { .. } => LateDataDecision::SideOutput,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // Locals are named `generator` rather than `gen`: `gen` is a reserved
    // keyword in the Rust 2024 edition.

    #[test]
    fn test_generator_initial_watermark_is_min() {
        let generator = WatermarkGenerator::new(500);
        assert_eq!(generator.current_watermark(), i64::MIN);
    }

    #[test]
    fn test_generator_no_watermark_before_threshold() {
        let mut generator = WatermarkGenerator::new(100).with_advance_threshold(10);
        for ts in 0..9 {
            let wm = generator.observe(ts * 100);
            assert!(wm.is_none(), "should not emit before threshold");
        }
    }

    #[test]
    fn test_generator_emits_at_threshold() {
        let mut generator = WatermarkGenerator::new(100).with_advance_threshold(5);
        // The first four events stay below the threshold.
        for ts in 0..4 {
            assert!(generator.observe(ts * 1000).is_none());
        }
        // The fifth event reaches it: max ts 4000 minus the 100 ms bound.
        let wm = generator
            .observe(4000)
            .expect("watermark must be emitted at threshold");
        assert_eq!(wm.timestamp, 3900);
        assert_eq!(wm.advance_count, 1);
        // The batch counter resets, so the next event does not emit.
        assert!(generator.observe(5000).is_none());
    }

    #[test]
    fn test_generator_advance_count_increments() {
        let mut generator = WatermarkGenerator::new(0).with_advance_threshold(3);
        for i in 0..6 {
            generator.observe(i * 1000);
        }
        assert_eq!(generator.advance_count, 2);
    }

    #[test]
    fn test_generator_current_watermark_formula() {
        let mut generator = WatermarkGenerator::new(200).with_advance_threshold(100);
        generator.observe(5000);
        assert_eq!(generator.current_watermark(), 4800);
    }

    #[test]
    fn test_generator_is_late_true() {
        let mut generator = WatermarkGenerator::new(100).with_advance_threshold(100);
        generator.observe(10_000);
        assert!(generator.is_late(9000));
    }

    #[test]
    fn test_generator_is_late_false() {
        let mut generator = WatermarkGenerator::new(100).with_advance_threshold(100);
        generator.observe(10_000);
        assert!(!generator.is_late(9900));
        assert!(!generator.is_late(10_000));
    }

    #[test]
    fn test_generator_source_id() {
        let mut generator = WatermarkGenerator::new(0)
            .with_advance_threshold(1)
            .with_source_id("sensor-42");
        let wm = generator.observe(1000).expect("should emit");
        assert_eq!(wm.source_id, "sensor-42");
    }

    #[test]
    fn test_generator_max_ts_tracks_maximum() {
        let mut generator = WatermarkGenerator::new(0).with_advance_threshold(100);
        generator.observe(500);
        generator.observe(1000);
        generator.observe(300);
        assert_eq!(generator.current_max_ts, 1000);
        assert_eq!(generator.current_watermark(), 1000);
    }

    #[test]
    fn test_aligner_empty_returns_min() {
        let aligner = WatermarkAligner::new();
        assert_eq!(aligner.global_watermark(), i64::MIN);
        assert_eq!(aligner.source_count(), 0);
    }

    #[test]
    fn test_aligner_single_source() {
        let mut aligner = WatermarkAligner::new();
        aligner.update("src-A", 5000);
        assert_eq!(aligner.global_watermark(), 5000);
        assert_eq!(aligner.source_count(), 1);
    }

    #[test]
    fn test_aligner_global_is_minimum() {
        let mut aligner = WatermarkAligner::new();
        aligner.update("src-A", 5000);
        aligner.update("src-B", 3000);
        aligner.update("src-C", 7000);
        assert_eq!(aligner.global_watermark(), 3000);
    }

    #[test]
    fn test_aligner_all_beyond_true() {
        let mut aligner = WatermarkAligner::new();
        aligner.update("A", 10_000);
        aligner.update("B", 12_000);
        assert!(aligner.all_beyond(9_999));
    }

    #[test]
    fn test_aligner_all_beyond_false_one_lagging() {
        let mut aligner = WatermarkAligner::new();
        aligner.update("A", 10_000);
        aligner.update("B", 5_000);
        assert!(!aligner.all_beyond(6_000));
    }

    #[test]
    fn test_aligner_all_beyond_empty_is_false() {
        let aligner = WatermarkAligner::new();
        assert!(!aligner.all_beyond(0));
    }

    #[test]
    fn test_aligner_update_overwrites() {
        let mut aligner = WatermarkAligner::new();
        aligner.update("src", 1000);
        aligner.update("src", 9000);
        assert_eq!(aligner.global_watermark(), 9000);
        assert_eq!(aligner.source_count(), 1);
    }

    #[test]
    fn test_late_handler_on_time_event_is_process() {
        let mut handler = LateDataHandler::new(LateDataPolicy::Drop);
        let decision = handler.handle(5000, 4000);
        assert_eq!(decision, LateDataDecision::Process);
        assert_eq!(handler.late_event_count, 0);
    }

    #[test]
    fn test_late_handler_drop_policy() {
        let mut handler = LateDataHandler::new(LateDataPolicy::Drop);
        let decision = handler.handle(1000, 5000);
        assert_eq!(decision, LateDataDecision::Drop);
        assert_eq!(handler.late_event_count, 1);
    }

    #[test]
    fn test_late_handler_reassign_within_max_lateness() {
        let mut handler = LateDataHandler::new(LateDataPolicy::Reassign {
            max_lateness_ms: 2000,
        });
        let decision = handler.handle(4000, 5000);
        assert_eq!(decision, LateDataDecision::Reassign(5000));
    }

    #[test]
    fn test_late_handler_reassign_exceeds_max_lateness_drops() {
        let mut handler = LateDataHandler::new(LateDataPolicy::Reassign {
            max_lateness_ms: 500,
        });
        let decision = handler.handle(3000, 5000);
        assert_eq!(decision, LateDataDecision::Drop);
        assert_eq!(handler.late_event_count, 1);
    }

    #[test]
    fn test_late_handler_side_output_policy() {
        let mut handler = LateDataHandler::new(LateDataPolicy::SideOutput {
            channel: "late-events".to_string(),
        });
        let decision = handler.handle(1000, 5000);
        assert_eq!(decision, LateDataDecision::SideOutput);
        assert_eq!(handler.late_event_count, 1);
    }

    #[test]
    fn test_late_handler_counts_accumulate() {
        let mut handler = LateDataHandler::new(LateDataPolicy::Drop);
        handler.handle(100, 1000);
        handler.handle(200, 1000);
        // On time: must not be counted as late.
        handler.handle(5000, 1000);
        assert_eq!(handler.late_event_count, 2);
    }

    #[test]
    fn test_stream_watermark_equality() {
        let w1 = StreamWatermark::new(1000, "s1", 1);
        let w2 = StreamWatermark::new(1000, "s1", 1);
        let w3 = StreamWatermark::new(2000, "s1", 1);
        assert_eq!(w1, w2);
        assert_ne!(w1, w3);
    }
}