use std::collections::{HashMap, VecDeque};
/// A watermark timestamp reported by a single source.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Watermark {
    /// Watermark time reported by the source.
    pub timestamp: i64,
    /// Identifier of the reporting source.
    pub source_id: String,
}

impl Watermark {
    /// Builds a watermark for `source_id` at `timestamp`.
    pub fn new(source_id: impl Into<String>, timestamp: i64) -> Self {
        let source_id: String = source_id.into();
        Watermark {
            timestamp,
            source_id,
        }
    }
}
/// Tunable parameters for the watermark tracker.
#[derive(Debug, Clone)]
pub struct WatermarkConfig {
    /// An event more than this many ms behind the global watermark is late.
    pub allowed_lateness_ms: i64,
    /// A source whose last update is at least this many ms old is idle.
    pub idle_source_timeout_ms: i64,
    /// Capacity of the global-watermark history (oldest entries evicted first).
    pub max_history_size: usize,
}

impl Default for WatermarkConfig {
    /// 5 s allowed lateness, 30 s idle timeout, 1 000 history entries.
    fn default() -> Self {
        const LATENESS_MS: i64 = 5_000;
        const IDLE_TIMEOUT_MS: i64 = 30_000;
        const HISTORY_CAPACITY: usize = 1_000;

        WatermarkConfig {
            max_history_size: HISTORY_CAPACITY,
            idle_source_timeout_ms: IDLE_TIMEOUT_MS,
            allowed_lateness_ms: LATENESS_MS,
        }
    }
}
/// Watermark state for a single input source.
///
/// A fresh source has `current == i64::MIN` and `last_updated_opt == None`.
#[derive(Debug, Clone)]
pub struct SourceWatermark {
    /// Identifier of the source this state belongs to.
    pub source_id: String,
    /// Latest watermark value held for this source.
    pub current: i64,
    /// Caller-supplied time (ms) of the most recent update, if any.
    pub last_updated_opt: Option<i64>,
}

impl SourceWatermark {
    /// Fresh state for `source_id`: no watermark yet and never updated.
    fn new(source_id: impl Into<String>) -> Self {
        let source_id = source_id.into();
        Self {
            last_updated_opt: None,
            current: i64::MIN,
            source_id,
        }
    }

    /// Time of the last update, or `0` if the source was never updated.
    ///
    /// `0` is ambiguous with a genuine update at time 0; read
    /// `last_updated_opt` directly when the distinction matters.
    pub fn last_updated(&self) -> i64 {
        match self.last_updated_opt {
            Some(ts) => ts,
            None => 0,
        }
    }
}
#[derive(Debug)]
pub struct WatermarkTracker {
sources: HashMap<String, SourceWatermark>,
global_watermark: i64,
config: WatermarkConfig,
watermark_history: VecDeque<i64>,
}
impl WatermarkTracker {
pub fn new(config: WatermarkConfig) -> Self {
Self {
sources: HashMap::new(),
global_watermark: i64::MIN,
config,
watermark_history: VecDeque::new(),
}
}
pub fn register_source(&mut self, source_id: impl Into<String>) {
let id: String = source_id.into();
self.sources
.entry(id.clone())
.or_insert_with(|| SourceWatermark::new(id));
self.recompute_global();
}
pub fn update(&mut self, watermark: Watermark, current_time_ms: i64) -> Option<i64> {
let old_global = self.global_watermark;
let source = self
.sources
.entry(watermark.source_id.clone())
.or_insert_with(|| SourceWatermark::new(watermark.source_id.clone()));
if watermark.timestamp > source.current {
source.current = watermark.timestamp;
}
source.last_updated_opt = Some(current_time_ms);
self.recompute_global();
if self.global_watermark > old_global {
self.record_history(self.global_watermark);
Some(self.global_watermark)
} else {
None
}
}
pub fn global_watermark(&self) -> i64 {
self.global_watermark
}
pub fn source_watermark(&self, source_id: &str) -> Option<i64> {
self.sources.get(source_id).map(|s| s.current)
}
pub fn deregister_idle_sources(&mut self, current_time_ms: i64) -> Vec<String> {
let timeout = self.config.idle_source_timeout_ms;
let idle: Vec<String> = self
.sources
.iter()
.filter(|(_, s)| {
if let Some(last) = s.last_updated_opt {
(current_time_ms - last) >= timeout
} else {
false }
})
.map(|(id, _)| id.clone())
.collect();
for id in &idle {
self.sources.remove(id);
}
if !idle.is_empty() {
let old_global = self.global_watermark;
self.recompute_global();
if self.global_watermark > old_global {
self.record_history(self.global_watermark);
}
}
idle
}
pub fn source_count(&self) -> usize {
self.sources.len()
}
pub fn active_source_count(&self, current_time_ms: i64) -> usize {
let timeout = self.config.idle_source_timeout_ms;
self.sources
.values()
.filter(|s| {
match s.last_updated_opt {
None => true, Some(last) => (current_time_ms - last) < timeout,
}
})
.count()
}
pub fn watermark_history(&self) -> &VecDeque<i64> {
&self.watermark_history
}
pub fn is_late(&self, event_time: i64) -> bool {
if self.global_watermark == i64::MIN {
return false;
}
event_time < self.global_watermark - self.config.allowed_lateness_ms
}
pub fn advance_to(&mut self, timestamp: i64) -> bool {
if timestamp > self.global_watermark {
self.global_watermark = timestamp;
self.record_history(timestamp);
true
} else {
false
}
}
fn recompute_global(&mut self) {
if self.sources.is_empty() {
return;
}
let min = self
.sources
.values()
.map(|s| s.current)
.min()
.unwrap_or(i64::MIN);
if min > self.global_watermark {
self.global_watermark = min;
}
}
fn record_history(&mut self, value: i64) {
if self.watermark_history.len() >= self.config.max_history_size {
self.watermark_history.pop_front();
}
self.watermark_history.push_back(value);
}
}
impl Default for WatermarkTracker {
fn default() -> Self {
Self::new(WatermarkConfig::default())
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Small, test-friendly configuration shared by most tests.
    fn cfg() -> WatermarkConfig {
        WatermarkConfig {
            allowed_lateness_ms: 100,
            idle_source_timeout_ms: 1_000,
            max_history_size: 50,
        }
    }

    /// Shorthand for building a `Watermark`.
    fn wm(source: &str, ts: i64) -> Watermark {
        Watermark::new(source, ts)
    }

    #[test]
    fn test_single_source_watermark_advance() {
        let mut tracker = WatermarkTracker::new(cfg());
        tracker.register_source("s1");
        let result = tracker.update(wm("s1", 100), 0);
        assert_eq!(result, Some(100));
        assert_eq!(tracker.global_watermark(), 100);
    }

    #[test]
    fn test_single_source_monotonic() {
        let mut tracker = WatermarkTracker::new(cfg());
        tracker.register_source("s1");
        tracker.update(wm("s1", 200), 0);
        // A lower timestamp must neither regress nor report an advance.
        let result = tracker.update(wm("s1", 100), 1);
        assert_eq!(result, None);
        assert_eq!(tracker.global_watermark(), 200);
    }

    #[test]
    fn test_multi_source_minimum() {
        let mut tracker = WatermarkTracker::new(cfg());
        tracker.register_source("s1");
        tracker.register_source("s2");
        tracker.update(wm("s1", 500), 0);
        // s2 has not reported yet, so it still pins the global at MIN.
        assert_eq!(tracker.global_watermark(), i64::MIN);
        tracker.update(wm("s2", 200), 0);
        assert_eq!(tracker.global_watermark(), 200);
    }

    #[test]
    fn test_multi_source_both_advance() {
        let mut tracker = WatermarkTracker::new(cfg());
        tracker.register_source("s1");
        tracker.register_source("s2");
        tracker.update(wm("s1", 100), 0);
        tracker.update(wm("s2", 100), 0);
        assert_eq!(tracker.global_watermark(), 100);
        tracker.update(wm("s1", 300), 1);
        tracker.update(wm("s2", 300), 1);
        assert_eq!(tracker.global_watermark(), 300);
    }

    #[test]
    fn test_idle_source_removal() {
        let mut tracker = WatermarkTracker::new(WatermarkConfig {
            idle_source_timeout_ms: 1_000,
            ..cfg()
        });
        tracker.register_source("s1");
        tracker.register_source("s2");
        tracker.update(wm("s1", 100), 0);
        tracker.update(wm("s2", 200), 0);
        let removed = tracker.deregister_idle_sources(2_000);
        assert_eq!(removed.len(), 2);
        assert_eq!(tracker.source_count(), 0);
    }

    #[test]
    fn test_deregister_returns_correct_ids() {
        let mut tracker = WatermarkTracker::new(WatermarkConfig {
            idle_source_timeout_ms: 1_000,
            ..cfg()
        });
        tracker.register_source("s1");
        tracker.register_source("s2");
        tracker.update(wm("s1", 100), 0);
        tracker.update(wm("s2", 200), 0);
        let mut removed = tracker.deregister_idle_sources(1_500);
        // Removal order follows HashMap iteration order, so normalise it.
        removed.sort();
        assert_eq!(removed, vec!["s1".to_string(), "s2".to_string()]);
    }

    #[test]
    fn test_never_updated_not_idle() {
        let mut tracker = WatermarkTracker::new(WatermarkConfig {
            idle_source_timeout_ms: 1_000,
            ..cfg()
        });
        tracker.register_source("s1");
        let removed = tracker.deregister_idle_sources(5_000);
        assert!(removed.is_empty());
    }

    #[test]
    fn test_late_event_detection() {
        let mut tracker = WatermarkTracker::new(WatermarkConfig {
            allowed_lateness_ms: 100,
            ..cfg()
        });
        tracker.register_source("s1");
        tracker.update(wm("s1", 1_000), 0);
        // Threshold is 1_000 - 100 = 900; only strictly-below is late.
        assert!(tracker.is_late(895));
        assert!(!tracker.is_late(905));
        assert!(!tracker.is_late(900));
    }

    #[test]
    fn test_not_late_when_no_watermark() {
        let tracker = WatermarkTracker::new(cfg());
        assert!(!tracker.is_late(0));
        assert!(!tracker.is_late(i64::MIN));
    }

    #[test]
    fn test_advance_to() {
        let mut tracker = WatermarkTracker::new(cfg());
        let advanced = tracker.advance_to(500);
        assert!(advanced);
        assert_eq!(tracker.global_watermark(), 500);
    }

    #[test]
    fn test_advance_to_no_backward() {
        let mut tracker = WatermarkTracker::new(cfg());
        tracker.advance_to(500);
        let advanced = tracker.advance_to(100);
        assert!(!advanced);
        assert_eq!(tracker.global_watermark(), 500);
    }

    #[test]
    fn test_advance_to_same_value() {
        let mut tracker = WatermarkTracker::new(cfg());
        tracker.advance_to(500);
        let result = tracker.advance_to(500);
        assert!(!result);
    }

    #[test]
    fn test_history_recording() {
        let mut tracker = WatermarkTracker::new(cfg());
        tracker.register_source("s1");
        tracker.update(wm("s1", 100), 0);
        tracker.update(wm("s1", 200), 1);
        tracker.update(wm("s1", 300), 2);
        let h = tracker.watermark_history();
        assert_eq!(h.len(), 3);
        assert_eq!(h.front(), Some(&100));
        assert_eq!(h.back(), Some(&300));
    }

    #[test]
    fn test_history_capped() {
        let mut tracker = WatermarkTracker::new(WatermarkConfig {
            max_history_size: 5,
            ..cfg()
        });
        tracker.register_source("s1");
        for i in 1..=10 {
            tracker.update(wm("s1", i * 100), i);
        }
        let h = tracker.watermark_history();
        assert_eq!(h.len(), 5);
        // The oldest entries are evicted first.
        assert_eq!(h.front(), Some(&600));
        assert_eq!(h.back(), Some(&1_000));
    }

    #[test]
    fn test_advance_to_history() {
        let mut tracker = WatermarkTracker::new(cfg());
        tracker.advance_to(1_000);
        assert_eq!(tracker.watermark_history().len(), 1);
        assert_eq!(tracker.watermark_history().front(), Some(&1_000));
    }

    #[test]
    fn test_source_watermark() {
        let mut tracker = WatermarkTracker::new(cfg());
        tracker.register_source("s1");
        assert_eq!(tracker.source_watermark("s1"), Some(i64::MIN));
        tracker.update(wm("s1", 300), 0);
        assert_eq!(tracker.source_watermark("s1"), Some(300));
    }

    #[test]
    fn test_source_watermark_unknown() {
        let tracker = WatermarkTracker::new(cfg());
        assert_eq!(tracker.source_watermark("unknown"), None);
    }

    #[test]
    fn test_source_count() {
        let mut tracker = WatermarkTracker::new(cfg());
        assert_eq!(tracker.source_count(), 0);
        tracker.register_source("s1");
        tracker.register_source("s2");
        assert_eq!(tracker.source_count(), 2);
    }

    #[test]
    fn test_active_source_count() {
        let mut tracker = WatermarkTracker::new(WatermarkConfig {
            idle_source_timeout_ms: 1_000,
            ..cfg()
        });
        tracker.register_source("s1");
        tracker.register_source("s2");
        assert_eq!(tracker.active_source_count(0), 2);
        tracker.update(wm("s1", 100), 0);
        // s1 is now idle; s2 was never updated and so still counts as active.
        assert_eq!(tracker.active_source_count(2_000), 1);
    }

    #[test]
    fn test_global_is_minimum_of_sources() {
        let mut tracker = WatermarkTracker::new(cfg());
        tracker.register_source("s1");
        tracker.register_source("s2");
        tracker.register_source("s3");
        tracker.update(wm("s1", 1_000), 0);
        tracker.update(wm("s2", 500), 0);
        tracker.update(wm("s3", 750), 0);
        assert_eq!(tracker.global_watermark(), 500);
    }

    #[test]
    fn test_global_advances_after_source_removal() {
        let mut tracker = WatermarkTracker::new(WatermarkConfig {
            idle_source_timeout_ms: 1_000,
            ..cfg()
        });
        tracker.register_source("s1");
        tracker.register_source("s2");
        tracker.update(wm("s1", 1_000), 0);
        tracker.update(wm("s2", 50), 0);
        assert_eq!(tracker.global_watermark(), 50);
        tracker.update(wm("s1", 1_500), 1_500);
        let removed = tracker.deregister_idle_sources(2_000);
        assert!(removed.contains(&"s2".to_string()));
        assert_eq!(tracker.global_watermark(), 1_500);
    }

    #[test]
    fn test_register_source_idempotent() {
        let mut tracker = WatermarkTracker::new(cfg());
        tracker.register_source("s1");
        tracker.register_source("s1");
        assert_eq!(tracker.source_count(), 1);
    }

    #[test]
    fn test_update_auto_registers_source() {
        let mut tracker = WatermarkTracker::new(cfg());
        tracker.update(wm("new_source", 100), 0);
        assert!(tracker.source_watermark("new_source").is_some());
    }

    #[test]
    fn test_is_late_boundary() {
        let mut tracker = WatermarkTracker::new(WatermarkConfig {
            allowed_lateness_ms: 200,
            ..cfg()
        });
        tracker.advance_to(1_000);
        assert!(!tracker.is_late(800));
        assert!(tracker.is_late(799));
    }

    #[test]
    fn test_advance_to_then_source_update() {
        let mut tracker = WatermarkTracker::new(cfg());
        tracker.advance_to(1_000);
        tracker.register_source("s1");
        assert_eq!(tracker.global_watermark(), 1_000);
        // A source reporting below a forced global must not drag it back.
        assert_eq!(tracker.update(wm("s1", 200), 0), None);
        assert_eq!(tracker.global_watermark(), 1_000);
    }

    #[test]
    fn test_update_returns_none_no_global_advance() {
        let mut tracker = WatermarkTracker::new(cfg());
        tracker.register_source("s1");
        tracker.register_source("s2");
        tracker.update(wm("s2", 1_000), 0);
        // s1 has never reported, so the global cannot move.
        let result = tracker.update(wm("s2", 2_000), 1);
        assert_eq!(result, None);
    }

    #[test]
    fn test_watermark_equality() {
        let w1 = Watermark::new("s1", 100);
        let w2 = Watermark::new("s1", 100);
        assert_eq!(w1, w2);
    }

    #[test]
    fn test_default_tracker() {
        let tracker = WatermarkTracker::default();
        assert_eq!(tracker.global_watermark(), i64::MIN);
        assert_eq!(tracker.source_count(), 0);
    }

    #[test]
    fn test_empty_history_initially() {
        let tracker = WatermarkTracker::new(cfg());
        assert!(tracker.watermark_history().is_empty());
    }

    #[test]
    fn test_fast_source_does_not_move_global_past_slow() {
        let mut tracker = WatermarkTracker::new(cfg());
        tracker.register_source("fast");
        tracker.register_source("slow");
        tracker.update(wm("slow", 10), 0);
        tracker.update(wm("fast", 10), 0);
        for i in 1..=10 {
            tracker.update(wm("fast", i * 1_000), i);
        }
        assert_eq!(tracker.global_watermark(), 10);
    }

    #[test]
    fn test_deregister_no_idle() {
        let mut tracker = WatermarkTracker::new(WatermarkConfig {
            idle_source_timeout_ms: 10_000,
            ..cfg()
        });
        tracker.register_source("s1");
        tracker.update(wm("s1", 100), 0);
        let removed = tracker.deregister_idle_sources(5_000);
        assert!(removed.is_empty());
    }

    #[test]
    fn test_active_source_count_empty() {
        let tracker = WatermarkTracker::new(cfg());
        assert_eq!(tracker.active_source_count(0), 0);
    }

    #[test]
    fn test_source_count_after_deregister() {
        let mut tracker = WatermarkTracker::new(WatermarkConfig {
            idle_source_timeout_ms: 1_000,
            ..cfg()
        });
        tracker.register_source("s1");
        tracker.update(wm("s1", 100), 0);
        tracker.deregister_idle_sources(5_000);
        assert_eq!(tracker.source_count(), 0);
    }

    #[test]
    fn test_history_does_not_grow_on_no_advance() {
        let mut tracker = WatermarkTracker::new(cfg());
        tracker.register_source("s1");
        tracker.register_source("s2");
        tracker.update(wm("s1", 500), 0);
        assert!(tracker.watermark_history().is_empty());
    }

    #[test]
    fn test_is_late_zero_lateness() {
        let mut tracker = WatermarkTracker::new(WatermarkConfig {
            allowed_lateness_ms: 0,
            ..cfg()
        });
        tracker.advance_to(1_000);
        assert!(tracker.is_late(999));
        assert!(!tracker.is_late(1_000));
    }

    #[test]
    fn test_watermark_config_default() {
        let cfg = WatermarkConfig::default();
        assert_eq!(cfg.allowed_lateness_ms, 5_000);
        assert_eq!(cfg.idle_source_timeout_ms, 30_000);
        assert_eq!(cfg.max_history_size, 1_000);
    }

    #[test]
    fn test_source_watermark_starts_at_min() {
        let mut tracker = WatermarkTracker::new(cfg());
        tracker.register_source("s1");
        assert_eq!(tracker.source_watermark("s1"), Some(i64::MIN));
    }

    #[test]
    fn test_remove_slowest_source_global_advances() {
        let mut tracker = WatermarkTracker::new(WatermarkConfig {
            idle_source_timeout_ms: 1_000,
            ..cfg()
        });
        tracker.register_source("s1");
        tracker.register_source("s2");
        tracker.register_source("s3");
        tracker.update(wm("s1", 100), 0);
        tracker.update(wm("s2", 500), 0);
        tracker.update(wm("s3", 900), 0);
        assert_eq!(tracker.global_watermark(), 100);
        tracker.update(wm("s2", 600), 2_000);
        tracker.update(wm("s3", 1_000), 2_000);
        let removed = tracker.deregister_idle_sources(2_000);
        assert!(removed.contains(&"s1".to_string()));
        assert_eq!(tracker.global_watermark(), 600);
    }

    #[test]
    fn test_advance_to_increments_history() {
        let mut tracker = WatermarkTracker::new(cfg());
        tracker.advance_to(1_000);
        tracker.advance_to(2_000);
        tracker.advance_to(3_000);
        assert_eq!(tracker.watermark_history().len(), 3);
    }

    #[test]
    fn test_history_recorded_on_source_advance() {
        let mut tracker = WatermarkTracker::new(cfg());
        tracker.register_source("s1");
        tracker.update(wm("s1", 1_000), 0);
        assert_eq!(tracker.watermark_history().len(), 1);
        assert_eq!(tracker.watermark_history().back(), Some(&1_000));
    }

    #[test]
    fn test_history_no_duplicate_same_ts() {
        let mut tracker = WatermarkTracker::new(cfg());
        tracker.register_source("s1");
        tracker.update(wm("s1", 500), 0);
        tracker.update(wm("s1", 500), 1);
        assert_eq!(tracker.watermark_history().len(), 1);
    }

    #[test]
    fn test_is_late_global_min() {
        let tracker = WatermarkTracker::new(cfg());
        assert!(!tracker.is_late(i64::MAX));
    }

    #[test]
    fn test_source_watermark_reflects_latest() {
        let mut tracker = WatermarkTracker::new(cfg());
        tracker.register_source("s1");
        tracker.update(wm("s1", 100), 0);
        tracker.update(wm("s1", 200), 1);
        tracker.update(wm("s1", 300), 2);
        assert_eq!(tracker.source_watermark("s1"), Some(300));
    }

    #[test]
    fn test_source_registered_after_advance_to() {
        let mut tracker = WatermarkTracker::new(cfg());
        tracker.advance_to(500);
        tracker.register_source("s1");
        assert_eq!(tracker.global_watermark(), 500);
    }

    #[test]
    fn test_active_count_after_deregister() {
        let mut tracker = WatermarkTracker::new(WatermarkConfig {
            idle_source_timeout_ms: 1_000,
            ..cfg()
        });
        tracker.register_source("s1");
        tracker.register_source("s2");
        tracker.update(wm("s1", 100), 0);
        tracker.update(wm("s2", 200), 0);
        tracker.deregister_idle_sources(5_000);
        assert_eq!(tracker.active_source_count(5_000), 0);
    }

    #[test]
    fn test_watermark_new_helper() {
        let w = Watermark::new("src", 42);
        assert_eq!(w.source_id, "src");
        assert_eq!(w.timestamp, 42);
    }

    #[test]
    fn test_many_sources_global_is_minimum() {
        let mut tracker = WatermarkTracker::new(cfg());
        for i in 1..=20 {
            let id = format!("s{i}");
            tracker.register_source(id.as_str());
            tracker.update(wm(&id, i * 100), 0);
        }
        assert_eq!(tracker.global_watermark(), 100);
    }
}