/// A position in event time, stored as a raw `f64` timestamp.
#[derive(Debug, Clone, Copy, PartialEq, PartialOrd)]
pub struct Watermark {
    /// Event-time value carried by this watermark.
    pub timestamp: f64,
}

impl Watermark {
    /// Wraps `timestamp` in a watermark; the value is stored without validation.
    pub fn new(timestamp: f64) -> Self {
        Watermark { timestamp }
    }

    /// Returns `true` when this watermark is strictly later than `other`.
    ///
    /// The comparison is a plain float comparison, so it yields `false` if
    /// either timestamp is NaN.
    pub fn is_after(&self, other: &Watermark) -> bool {
        other.timestamp < self.timestamp
    }

    /// Returns `true` when `event_time` lies at or before this watermark.
    ///
    /// A NaN `event_time` is never covered.
    pub fn covers(&self, event_time: f64) -> bool {
        event_time <= self.timestamp
    }
}
/// Tracks event-time progress for one stream and derives a watermark that
/// trails the largest observed event time by a fixed delay.
#[derive(Debug, Clone)]
pub struct EventTimeTracker {
    /// Largest event time passed to `process_event` so far
    /// (`f64::NEG_INFINITY` until one is recorded).
    pub max_event_time: f64,
    /// Current watermark (`f64::NEG_INFINITY` until it first advances).
    pub watermark: f64,
    /// Amount subtracted from `max_event_time` to obtain the watermark.
    pub max_out_of_order_delay: f64,
    /// Number of times the watermark has moved forward.
    pub watermarks_emitted: usize,
    /// Number of events seen by `process_event` that were at or below the
    /// watermark at the moment they were processed.
    pub late_events: usize,
}

impl EventTimeTracker {
    /// Creates a tracker whose watermark trails the maximum event time by
    /// `max_delay`. Both the maximum event time and the watermark start at
    /// negative infinity and all counters start at zero.
    pub fn new(max_delay: f64) -> Self {
        EventTimeTracker {
            late_events: 0,
            watermarks_emitted: 0,
            max_out_of_order_delay: max_delay,
            watermark: f64::NEG_INFINITY,
            max_event_time: f64::NEG_INFINITY,
        }
    }

    /// Records one event and returns the new watermark if it moved forward.
    ///
    /// The event is counted as late when it is at or below the watermark as
    /// it stood before this call. Late events still raise `max_event_time`
    /// when they exceed it.
    pub fn process_event(&mut self, event_time: f64) -> Option<Watermark> {
        // Lateness is judged against the watermark before any update below.
        if self.is_event_late(event_time) {
            self.late_events += 1;
        }

        if self.max_event_time < event_time {
            self.max_event_time = event_time;
        }

        // The candidate only takes effect if it is strictly ahead of the
        // current watermark, which is exactly the rule `advance_to` applies.
        let candidate = self.max_event_time - self.max_out_of_order_delay;
        self.advance_to(candidate)
    }

    /// Moves the watermark to `time` if that is strictly ahead of the current
    /// watermark, returning the emitted watermark; otherwise returns `None`
    /// and changes nothing.
    ///
    /// `max_event_time` is not modified by this method.
    pub fn advance_to(&mut self, time: f64) -> Option<Watermark> {
        let moves_forward = time > self.watermark;
        if !moves_forward {
            return None;
        }

        self.watermarks_emitted += 1;
        self.watermark = time;
        Some(Watermark { timestamp: time })
    }

    /// Returns `true` when `event_time` is at or below the current watermark.
    pub fn is_event_late(&self, event_time: f64) -> bool {
        self.watermark >= event_time
    }

    /// Returns the current watermark, or `None` while it is still at its
    /// initial value of negative infinity.
    pub fn current_watermark(&self) -> Option<Watermark> {
        match self.watermark {
            unset if unset == f64::NEG_INFINITY => None,
            timestamp => Some(Watermark { timestamp }),
        }
    }
}
/// Combines the watermarks of several input streams into a single global
/// watermark, defined as the minimum over all per-stream watermarks.
#[derive(Debug, Clone)]
pub struct MultiStreamWatermark {
    /// Most recent watermark stored for each stream, indexed by stream id.
    /// Every slot starts at `f64::NEG_INFINITY`.
    stream_watermarks: Vec<f64>,
    /// Largest per-stream minimum observed so far. It only ever moves
    /// forward and stays at `f64::NEG_INFINITY` while any stream slot is
    /// still at negative infinity.
    pub global_watermark: f64,
}

impl MultiStreamWatermark {
    /// Creates a combiner for `num_streams` streams, with every stream and
    /// the global watermark starting at negative infinity.
    pub fn new(num_streams: usize) -> Self {
        Self {
            stream_watermarks: vec![f64::NEG_INFINITY; num_streams],
            global_watermark: f64::NEG_INFINITY,
        }
    }

    /// Stores `watermark` for `stream_id` and returns the new global
    /// watermark if the minimum across all streams moved forward.
    ///
    /// Returns `None`, leaving all state untouched, when `stream_id` is out
    /// of range or `watermark` is NaN. Also returns `None` when the update is
    /// stored but the global watermark does not advance.
    pub fn update_stream(&mut self, stream_id: usize, watermark: f64) -> Option<Watermark> {
        // `f64::min` skips NaN operands, so a NaN stored in a slot would make
        // that stream invisible to the minimum below and let the global
        // watermark run ahead of it. Reject NaN instead of storing it.
        if watermark.is_nan() {
            return None;
        }

        let slot = self.stream_watermarks.get_mut(stream_id)?;
        *slot = watermark;

        let new_global = self
            .stream_watermarks
            .iter()
            .fold(f64::INFINITY, |lowest, &wm| lowest.min(wm));

        // The global watermark never moves backwards, even if a stream
        // reported a lower value than before.
        if new_global > self.global_watermark {
            self.global_watermark = new_global;
            Some(Watermark {
                timestamp: new_global,
            })
        } else {
            None
        }
    }

    /// Returns the number of streams being tracked.
    pub fn num_streams(&self) -> usize {
        self.stream_watermarks.len()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The watermark trails the largest event time by the configured delay
    /// and is only emitted when it actually moves forward.
    #[test]
    fn test_watermark_advancement() {
        let mut tracker = EventTimeTracker::new(2.0);

        // Each new maximum event time yields a watermark 2.0 behind it.
        for (event_time, expected) in [(10.0, 8.0), (12.0, 10.0)] {
            let emitted = tracker.process_event(event_time);
            assert!(emitted.is_some());
            assert_eq!(emitted.expect("watermark should advance").timestamp, expected);
        }

        // An event below the current maximum leaves the watermark in place.
        let emitted = tracker.process_event(11.0);
        assert!(emitted.is_none(), "Watermark should not advance for lower event");
    }

    /// With zero delay the watermark equals the latest event time, so older
    /// events are late and newer ones are not.
    #[test]
    fn test_late_event_detection() {
        let mut tracker = EventTimeTracker::new(0.0);
        tracker.process_event(10.0);

        let older_is_late = tracker.is_event_late(9.0);
        let newer_is_late = tracker.is_event_late(11.0);
        assert!(older_is_late);
        assert!(!newer_is_late);
    }

    /// `advance_to` moves the watermark forward and refuses to move it back.
    #[test]
    fn test_advance_to() {
        let mut tracker = EventTimeTracker::new(1.0);

        let forward = tracker.advance_to(100.0);
        assert!(forward.is_some());
        assert_eq!(
            forward.expect("advance_to should emit watermark").timestamp,
            100.0
        );

        let backward = tracker.advance_to(50.0);
        assert!(backward.is_none());
    }

    /// The global watermark stays at negative infinity until every stream has
    /// reported, then equals the smallest per-stream watermark.
    #[test]
    fn test_multi_stream_watermark() {
        let mut combined = MultiStreamWatermark::new(3);

        combined.update_stream(0, 10.0);
        combined.update_stream(1, 8.0);
        // Stream 2 has not reported yet.
        assert_eq!(combined.global_watermark, f64::NEG_INFINITY);

        combined.update_stream(2, 6.0);
        assert_eq!(combined.global_watermark, 6.0);
    }
}