use std::time::{SystemTime, UNIX_EPOCH};
/// Describes how a [`WatermarkGenerator`] derives a watermark from the event
/// times it has observed.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub enum WatermarkStrategy {
    /// The watermark equals the largest event time observed so far.
    MonotonousTimestamps,
    /// The watermark trails the largest observed event time by
    /// `max_lateness_ms`.
    BoundedOutOfOrder {
        /// Amount subtracted from the largest observed event time.
        max_lateness_ms: u64,
    },
    /// Wraps another strategy. The watermark value is computed by the wrapped
    /// strategy, but it is only emitted from [`WatermarkGenerator::tick`], at
    /// most once per `interval_ms` of processing time.
    Periodic {
        /// Minimum processing time that must elapse between two emitting ticks.
        interval_ms: u64,
        /// Strategy that determines the watermark value.
        strategy: Box<WatermarkStrategy>,
    },
}

/// Tracks observed event times and produces a non-decreasing watermark
/// according to a [`WatermarkStrategy`].
#[derive(Debug)]
pub struct WatermarkGenerator {
    /// Strategy used to compute and schedule watermarks.
    strategy: WatermarkStrategy,
    /// Last watermark emitted; `i64::MIN` until one has been emitted.
    current_watermark_ms: i64,
    /// Largest event time observed; `i64::MIN` until an event is observed.
    max_event_time_ms: i64,
    /// Processing time recorded by the most recent due `tick`; starts at 0.
    last_emit_processing_ms: u64,
}

impl WatermarkGenerator {
    /// Creates a generator that has observed no events and emitted no
    /// watermark (both are represented by `i64::MIN`).
    pub fn new(strategy: WatermarkStrategy) -> Self {
        Self {
            strategy,
            current_watermark_ms: i64::MIN,
            max_event_time_ms: i64::MIN,
            last_emit_processing_ms: 0,
        }
    }

    /// Records an event's timestamp.
    ///
    /// For non-periodic strategies, returns `Some(watermark)` when the event
    /// advances the watermark and `None` otherwise. For
    /// [`WatermarkStrategy::Periodic`] the event time is only recorded and
    /// `None` is returned; the watermark is emitted by [`Self::tick`].
    pub fn observe_event(&mut self, event_time_ms: i64) -> Option<i64> {
        self.max_event_time_ms = self.max_event_time_ms.max(event_time_ms);

        // If the watermark were advanced here for periodic strategies, `tick`
        // would never find a newer watermark and could never emit anything.
        if matches!(self.strategy, WatermarkStrategy::Periodic { .. }) {
            return None;
        }
        self.advance_watermark()
    }

    /// Drives periodic emission.
    ///
    /// Returns `None` unless the strategy is [`WatermarkStrategy::Periodic`]
    /// and at least `interval_ms` has elapsed since the last due tick. When a
    /// tick is due, `processing_time_ms` is recorded as the start of the next
    /// interval (whether or not the watermark moved) and `Some(watermark)` is
    /// returned if the watermark advanced.
    pub fn tick(&mut self, processing_time_ms: u64) -> Option<i64> {
        let interval_ms = if let WatermarkStrategy::Periodic { interval_ms, .. } = &self.strategy {
            *interval_ms
        } else {
            return None;
        };

        // Compare elapsed time instead of computing `last + interval`, which
        // can overflow `u64`. A clock reading earlier than the last recorded
        // tick is never due.
        let is_due = processing_time_ms
            .checked_sub(self.last_emit_processing_ms)
            .map_or(false, |elapsed_ms| elapsed_ms >= interval_ms);
        if !is_due {
            return None;
        }

        self.last_emit_processing_ms = processing_time_ms;
        self.advance_watermark()
    }

    /// Returns the last emitted watermark, or `i64::MIN` if none was emitted.
    pub fn current_watermark(&self) -> i64 {
        self.current_watermark_ms
    }

    /// Returns the largest observed event time, or `i64::MIN` if no event has
    /// been observed.
    pub fn max_event_time(&self) -> i64 {
        self.max_event_time_ms
    }

    /// Moves the current watermark forward to the computed value if that is
    /// strictly greater, returning the new watermark when it moved.
    fn advance_watermark(&mut self) -> Option<i64> {
        let candidate = self.compute_watermark();
        if candidate > self.current_watermark_ms {
            self.current_watermark_ms = candidate;
            Some(candidate)
        } else {
            None
        }
    }

    /// Computes the watermark implied by the largest observed event time,
    /// or `i64::MIN` when no event has been observed.
    fn compute_watermark(&self) -> i64 {
        if self.max_event_time_ms == i64::MIN {
            return i64::MIN;
        }
        // Saturate rather than overflow for event times close to `i64::MIN`.
        self.max_event_time_ms
            .saturating_sub(Self::effective_lateness_ms(&self.strategy))
    }

    /// Resolves the lateness bound of `strategy`, unwrapping any number of
    /// nested `Periodic` layers to reach the strategy that defines the value.
    fn effective_lateness_ms(strategy: &WatermarkStrategy) -> i64 {
        let mut current = strategy;
        loop {
            match current {
                WatermarkStrategy::MonotonousTimestamps => return 0,
                WatermarkStrategy::BoundedOutOfOrder { max_lateness_ms } => {
                    // Clamp before casting: a plain `as i64` turns values above
                    // `i64::MAX` negative, which would push the watermark
                    // *ahead* of the observed event time.
                    return (*max_lateness_ms).min(i64::MAX as u64) as i64;
                }
                WatermarkStrategy::Periodic { strategy, .. } => current = strategy.as_ref(),
            }
        }
    }
}
/// Policy deciding whether an element is still processed given the current
/// watermark.
#[non_exhaustive]
#[derive(Debug, Clone)]
pub enum LateElementPolicy {
    /// Process only elements whose event time is at or after the watermark.
    Drop,
    /// Additionally process elements that are at most `grace_ms` behind the
    /// watermark.
    AllowLateness {
        /// How far behind the watermark an element may be and still be
        /// processed.
        grace_ms: u64,
    },
    /// Every element passes `should_process`.
    // NOTE(review): presumably late elements are routed to a side output by
    // the caller; that routing is not visible here.
    SideOutput,
}

impl LateElementPolicy {
    /// Returns `true` when an element with `event_time_ms` should be
    /// processed under this policy, given the current `watermark_ms`.
    ///
    /// The comparison is inclusive: an element exactly at the threshold is
    /// processed. The threshold arithmetic saturates, so extreme watermarks
    /// (such as `i64::MIN`, meaning "no watermark yet") and very large grace
    /// periods are handled without overflow.
    pub fn should_process(&self, event_time_ms: i64, watermark_ms: i64) -> bool {
        match self {
            LateElementPolicy::Drop => event_time_ms >= watermark_ms,
            LateElementPolicy::AllowLateness { grace_ms } => {
                // Clamp before casting: `as i64` alone would wrap values above
                // `i64::MAX` to a negative grace and *raise* the threshold.
                let grace = (*grace_ms).min(i64::MAX as u64) as i64;
                // Saturating at `i64::MIN` is exact here: any event time is
                // >= a threshold that lies at or below `i64::MIN`.
                event_time_ms >= watermark_ms.saturating_sub(grace)
            }
            LateElementPolicy::SideOutput => true,
        }
    }
}
/// A value paired with an event timestamp and a processing timestamp.
#[derive(Debug, Clone)]
pub struct TimestampedRecord<T> {
    /// The wrapped payload.
    pub value: T,
    /// Caller-supplied event timestamp (the `_ms` suffix suggests
    /// milliseconds; the epoch is not fixed by this type).
    pub event_time_ms: i64,
    /// Processing timestamp. When built via [`TimestampedRecord::new`] this is
    /// milliseconds since the Unix epoch.
    pub processing_time_ms: i64,
}

impl<T> TimestampedRecord<T> {
    /// Builds a record stamped with the current wall-clock time.
    ///
    /// The processing time is the number of milliseconds since the Unix
    /// epoch, or `0` if the system clock reports a time before the epoch.
    pub fn new(value: T, event_time_ms: i64) -> Self {
        let wall_clock_ms = match SystemTime::now().duration_since(UNIX_EPOCH) {
            Ok(since_epoch) => since_epoch.as_millis() as i64,
            // Best effort: a clock set before the epoch yields 0.
            Err(_) => 0,
        };
        Self::with_processing_time(value, event_time_ms, wall_clock_ms)
    }

    /// Builds a record with an explicitly supplied processing time; all three
    /// arguments are stored unchanged.
    pub fn with_processing_time(value: T, event_time_ms: i64, processing_time_ms: i64) -> Self {
        TimestampedRecord {
            value,
            event_time_ms,
            processing_time_ms,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // NOTE: locals are named `generator` rather than `gen` because `gen` is a
    // reserved keyword starting with the Rust 2024 edition.

    /// An out-of-order event must not move a monotonous watermark backwards.
    #[test]
    fn test_monotone_watermark_never_decreases() {
        let mut generator = WatermarkGenerator::new(WatermarkStrategy::MonotonousTimestamps);
        generator.observe_event(1000);
        let wm1 = generator.current_watermark();
        // Older event arrives after a newer one.
        generator.observe_event(500);
        let wm2 = generator.current_watermark();
        assert!(
            wm2 >= wm1,
            "Watermark must be monotone: wm1={wm1}, wm2={wm2}"
        );
    }

    /// A monotonous watermark tracks the largest event time seen.
    #[test]
    fn test_monotone_watermark_advances_with_max_event() {
        let mut generator = WatermarkGenerator::new(WatermarkStrategy::MonotonousTimestamps);
        generator.observe_event(500);
        assert_eq!(generator.current_watermark(), 500);
        generator.observe_event(1200);
        assert_eq!(generator.current_watermark(), 1200);
        // A smaller event time leaves the watermark where it was.
        generator.observe_event(800);
        assert_eq!(generator.current_watermark(), 1200);
    }

    /// The bounded strategy lags the largest event time by `max_lateness_ms`.
    #[test]
    fn test_bounded_out_of_order_delays_by_max_lateness() {
        let mut generator = WatermarkGenerator::new(WatermarkStrategy::BoundedOutOfOrder {
            max_lateness_ms: 500,
        });
        generator.observe_event(1000);
        assert_eq!(generator.current_watermark(), 500);
        generator.observe_event(1200);
        assert_eq!(generator.current_watermark(), 700);
    }

    /// An out-of-order event must not move a bounded watermark backwards.
    #[test]
    fn test_bounded_out_of_order_watermark_never_decreases() {
        let mut generator = WatermarkGenerator::new(WatermarkStrategy::BoundedOutOfOrder {
            max_lateness_ms: 200,
        });
        generator.observe_event(1000);
        let wm1 = generator.current_watermark();
        generator.observe_event(900);
        let wm2 = generator.current_watermark();
        assert!(wm2 >= wm1);
    }

    /// `Drop` rejects elements behind the watermark and accepts those at or
    /// after it.
    #[test]
    fn test_late_element_policy_drop() {
        let policy = LateElementPolicy::Drop;
        assert!(!policy.should_process(500, 1000));
        // Boundary: an element exactly at the watermark is not late.
        assert!(policy.should_process(1000, 1000));
        assert!(policy.should_process(1001, 1000));
    }

    /// `AllowLateness` accepts elements up to `grace_ms` behind the watermark.
    #[test]
    fn test_late_element_policy_allow_lateness() {
        let policy = LateElementPolicy::AllowLateness { grace_ms: 300 };
        assert!(!policy.should_process(699, 1000));
        assert!(policy.should_process(700, 1000));
        assert!(policy.should_process(1200, 1000));
    }

    /// `SideOutput` never rejects an element in `should_process`.
    #[test]
    fn test_late_element_policy_side_output() {
        let policy = LateElementPolicy::SideOutput;
        assert!(policy.should_process(500, 1000));
        assert!(policy.should_process(1000, 1000));
        assert!(policy.should_process(1500, 1000));
    }

    /// `with_processing_time` stores all three arguments unchanged.
    #[test]
    fn test_timestamped_record_preserves_values() {
        let rec = TimestampedRecord::with_processing_time(42u64, 9000, 10000);
        assert_eq!(rec.value, 42u64);
        assert_eq!(rec.event_time_ms, 9000);
        assert_eq!(rec.processing_time_ms, 10000);
    }
}