use super::event::StreamEvent;
use std::collections::VecDeque;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
/// A position in event time, below which the stream claims to have seen all events.
///
/// Ordering (derived) compares `timestamp` only, so a "greater" watermark is a later one.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct Watermark {
    /// Event-time position of the watermark. `from_system_time` produces milliseconds since
    /// the Unix epoch; presumably event timestamps use the same unit — confirm with producers.
    pub timestamp: u64,
}

impl Watermark {
    /// Creates a watermark at the given raw `timestamp`.
    pub fn new(timestamp: u64) -> Self {
        Self { timestamp }
    }

    /// Converts a wall-clock `time` into a watermark in milliseconds since the Unix epoch.
    ///
    /// Times before the epoch map to `0`; times whose millisecond count does not fit in a
    /// `u64` saturate to `u64::MAX` instead of wrapping around.
    pub fn from_system_time(time: SystemTime) -> Self {
        let millis = time
            .duration_since(UNIX_EPOCH)
            .map(|since_epoch| since_epoch.as_millis())
            .unwrap_or(0);
        // Clamp before narrowing: a bare `u128 as u64` silently wraps, producing a small,
        // wrong timestamp for very distant times. After the clamp the cast is lossless.
        Self {
            timestamp: millis.min(u128::from(u64::MAX)) as u64,
        }
    }

    /// Returns a watermark for the current wall-clock time.
    pub fn now() -> Self {
        Self::from_system_time(SystemTime::now())
    }

    /// Returns `true` if `self` is strictly earlier than `other`.
    pub fn is_before(&self, other: &Watermark) -> bool {
        self.timestamp < other.timestamp
    }

    /// Returns `true` if an event at `event_time` is strictly behind this watermark.
    ///
    /// An event exactly at the watermark is not considered late.
    pub fn is_late(&self, event_time: u64) -> bool {
        event_time < self.timestamp
    }
}
/// How a [`WatermarkGenerator`] derives watermarks from the event timestamps it observes.
#[derive(Clone, Debug)]
pub enum WatermarkStrategy {
    /// Emit the highest observed event timestamp once at least `interval` of wall-clock
    /// time has passed since the previous emission.
    Periodic { interval: Duration },
    /// Keep the watermark `max_delay` behind the highest observed event timestamp,
    /// tolerating that much out-of-order arrival.
    BoundedOutOfOrder { max_delay: Duration },
    /// Move the watermark up to the highest observed event timestamp immediately.
    MonotonicAscending,
    /// The generator emits nothing on its own for this strategy.
    Custom,
}
/// Tracks the highest event timestamp seen so far and turns it into watermarks according
/// to a [`WatermarkStrategy`].
// NOTE(review): every field appears to be read and `_pending_events` is underscore-prefixed,
// so this allow may no longer be needed — confirm before removing it.
#[allow(dead_code)]
pub struct WatermarkGenerator {
    /// Rule used to turn observed timestamps into watermarks.
    strategy: WatermarkStrategy,
    /// Most recently emitted watermark; only ever moves forward.
    current_watermark: Watermark,
    /// Largest event timestamp observed so far.
    max_timestamp: u64,
    /// Wall-clock time of the last periodic emission (or of construction).
    last_emission: SystemTime,
    /// Reserved buffer; initialized empty and not otherwise used in the visible code.
    _pending_events: VecDeque<StreamEvent>,
}
impl WatermarkGenerator {
    /// Creates a generator for `strategy` whose watermark and max timestamp start at 0.
    ///
    /// The periodic emission clock starts at construction time, so a `Periodic` generator
    /// does not emit before its first `interval` has elapsed.
    pub fn new(strategy: WatermarkStrategy) -> Self {
        Self {
            current_watermark: Watermark::new(0),
            strategy,
            max_timestamp: 0,
            last_emission: SystemTime::now(),
            _pending_events: VecDeque::new(),
        }
    }

    /// Records `event`'s timestamp and returns the new watermark if this advanced it.
    ///
    /// Returns `None` when the strategy chooses not to emit, or when the candidate
    /// watermark is not strictly greater than the current one (watermarks never regress).
    pub fn process_event(&mut self, event: &StreamEvent) -> Option<Watermark> {
        self.max_timestamp = self.max_timestamp.max(event.metadata.timestamp);
        self.maybe_generate_watermark()
    }

    /// Computes a candidate watermark for the configured strategy and adopts it only if it
    /// moves the current watermark forward.
    fn maybe_generate_watermark(&mut self) -> Option<Watermark> {
        let candidate = match &self.strategy {
            WatermarkStrategy::Periodic { interval } => {
                let now = SystemTime::now();
                // If the wall clock went backwards past `last_emission`, skip this round.
                // NOTE(review): `SystemTime` is not monotonic; `Instant` would be sturdier.
                let elapsed = now.duration_since(self.last_emission).ok()?;
                if elapsed < *interval {
                    return None;
                }
                // The emission clock resets even if the watermark turns out not to advance.
                self.last_emission = now;
                Watermark::new(self.max_timestamp)
            }
            WatermarkStrategy::BoundedOutOfOrder { max_delay } => {
                // Clamp before narrowing: `u128 as u64` wraps enormous durations into small,
                // wrong delays. After the clamp the cast is lossless.
                let delay_ms = max_delay.as_millis().min(u128::from(u64::MAX)) as u64;
                Watermark::new(self.max_timestamp.saturating_sub(delay_ms))
            }
            WatermarkStrategy::MonotonicAscending => Watermark::new(self.max_timestamp),
            // Custom strategies never emit from inside the generator.
            WatermarkStrategy::Custom => return None,
        };

        if candidate > self.current_watermark {
            self.current_watermark = candidate;
            Some(candidate)
        } else {
            None
        }
    }

    /// Returns the most recently emitted watermark (initially 0).
    pub fn current_watermark(&self) -> Watermark {
        self.current_watermark
    }

    /// Returns `true` if `event`'s timestamp is strictly behind the current watermark.
    pub fn is_late(&self, event: &StreamEvent) -> bool {
        self.current_watermark.is_late(event.metadata.timestamp)
    }
}
/// Policy a [`LateDataHandler`] applies to events that arrive behind the watermark.
#[derive(Clone, Debug)]
pub enum LateDataStrategy {
    /// Discard every late event.
    Drop,
    /// Process late events that are at most `max_lateness` behind the watermark;
    /// discard the rest.
    AllowedLateness { max_lateness: Duration },
    /// Buffer late events in the handler's side output.
    SideOutput,
    /// Hand late events back tagged for window recomputation; what gets recomputed is up
    /// to the caller.
    RecomputeWindows,
}
/// Applies a [`LateDataStrategy`] to late events and keeps counters about the outcomes.
pub struct LateDataHandler {
    /// Policy applied to every late event.
    strategy: LateDataStrategy,
    /// Total number of late events handled.
    late_count: usize,
    /// Late events that were discarded.
    dropped_count: usize,
    /// Late events that were let through (processed or tagged for recomputation).
    allowed_count: usize,
    /// Late events buffered under the `SideOutput` policy; cleared on demand.
    side_output: Vec<StreamEvent>,
}
impl LateDataHandler {
    /// Creates a handler applying `strategy`, with an empty side output and zeroed counters.
    pub fn new(strategy: LateDataStrategy) -> Self {
        Self {
            strategy,
            side_output: Vec::new(),
            late_count: 0,
            dropped_count: 0,
            allowed_count: 0,
        }
    }

    /// Decides what to do with `event`, which arrived behind `watermark`.
    ///
    /// Every call increments `total_late`. `Drop` (and `AllowedLateness` beyond the limit)
    /// increments `dropped`. `AllowedLateness` within the limit and `RecomputeWindows`
    /// increment `allowed`. `SideOutput` stores a copy of the event in the side-output buffer
    /// and also returns it.
    pub fn handle_late_event(
        &mut self,
        event: StreamEvent,
        watermark: &Watermark,
    ) -> LateEventDecision {
        self.late_count += 1;
        match &self.strategy {
            LateDataStrategy::Drop => {
                self.dropped_count += 1;
                LateEventDecision::Drop
            }
            LateDataStrategy::AllowedLateness { max_lateness } => {
                // How far behind the watermark the event is; 0 if it is not actually behind.
                let lateness = watermark.timestamp.saturating_sub(event.metadata.timestamp);
                // Clamp before narrowing: `u128 as u64` wraps enormous allowances into tiny
                // ones, which would wrongly drop events. After the clamp the cast is lossless.
                let max_lateness_ms = max_lateness.as_millis().min(u128::from(u64::MAX)) as u64;
                if lateness <= max_lateness_ms {
                    self.allowed_count += 1;
                    LateEventDecision::Process(event)
                } else {
                    self.dropped_count += 1;
                    LateEventDecision::Drop
                }
            }
            LateDataStrategy::SideOutput => {
                // Both the buffer and the caller get the event, hence the clone.
                self.side_output.push(event.clone());
                LateEventDecision::SideOutput(event)
            }
            LateDataStrategy::RecomputeWindows => {
                self.allowed_count += 1;
                LateEventDecision::Recompute(event)
            }
        }
    }

    /// Returns the events currently held in the side-output buffer.
    pub fn side_output(&self) -> &[StreamEvent] {
        &self.side_output
    }

    /// Empties the side-output buffer; the late/dropped/allowed counters are unaffected.
    pub fn clear_side_output(&mut self) {
        self.side_output.clear();
    }

    /// Returns a snapshot of the counters.
    ///
    /// `side_output` is the current buffer length, not a cumulative count, so it falls back
    /// to 0 after [`LateDataHandler::clear_side_output`].
    pub fn stats(&self) -> LateDataStats {
        LateDataStats {
            total_late: self.late_count,
            dropped: self.dropped_count,
            allowed: self.allowed_count,
            side_output: self.side_output.len(),
        }
    }
}
/// Outcome of [`LateDataHandler::handle_late_event`] for a single late event.
#[derive(Clone, Debug)]
pub enum LateEventDecision {
    /// The event was discarded.
    Drop,
    /// The event is within the allowed lateness and should be processed normally.
    Process(StreamEvent),
    /// The event was routed to the side output; a copy is also kept by the handler.
    SideOutput(StreamEvent),
    /// The event is handed back for window recomputation (left to the caller).
    Recompute(StreamEvent),
}
/// Snapshot of a [`LateDataHandler`]'s counters.
#[derive(Clone, Copy, Debug)]
pub struct LateDataStats {
    /// Number of late events handled in total.
    pub total_late: usize,
    /// Number of late events discarded.
    pub dropped: usize,
    /// Number of late events let through (processed or tagged for recomputation).
    pub allowed: usize,
    /// Number of events currently sitting in the side-output buffer.
    pub side_output: usize,
}
/// An in-memory event stream that tracks watermarks and routes late events through a
/// [`LateDataHandler`].
pub struct WatermarkedStream {
    /// Produces watermarks from the timestamps of on-time events.
    watermark_gen: WatermarkGenerator,
    /// Decides the fate of events that arrive behind the watermark.
    late_handler: LateDataHandler,
    /// Events accepted into the stream, in arrival order.
    events: Vec<StreamEvent>,
    /// Every watermark emitted so far, in emission order.
    watermark_history: Vec<Watermark>,
}
impl WatermarkedStream {
    /// Creates an empty stream that emits watermarks via `watermark_strategy` and treats
    /// late events according to `late_strategy`.
    pub fn new(watermark_strategy: WatermarkStrategy, late_strategy: LateDataStrategy) -> Self {
        Self {
            events: Vec::new(),
            watermark_gen: WatermarkGenerator::new(watermark_strategy),
            late_handler: LateDataHandler::new(late_strategy),
            watermark_history: Vec::new(),
        }
    }

    /// Ingests one event.
    ///
    /// An event at or after the current watermark is fed to the watermark generator and then
    /// stored. Any watermark it emits is appended to the history.
    ///
    /// An event strictly behind the watermark goes to the late-data handler and does not
    /// advance the watermark:
    /// - `Process` and `Recompute` decisions are stored.
    /// - `Drop` decisions are discarded.
    /// - `SideOutput` events live only in the handler's buffer (see
    ///   [`WatermarkedStream::side_output`]).
    ///
    /// # Errors
    ///
    /// Never returns `Err` at present; the `Result` is part of the public signature.
    pub fn add_event(&mut self, event: StreamEvent) -> Result<(), String> {
        if self.watermark_gen.is_late(&event) {
            let watermark = self.watermark_gen.current_watermark();
            match self.late_handler.handle_late_event(event, &watermark) {
                LateEventDecision::Process(e) | LateEventDecision::Recompute(e) => {
                    self.events.push(e);
                }
                // The handler already buffered side-output events; nothing to store here.
                LateEventDecision::Drop | LateEventDecision::SideOutput(_) => {}
            }
        } else {
            // Feed the generator before storing so the event can be moved, not cloned.
            if let Some(new_watermark) = self.watermark_gen.process_event(&event) {
                self.watermark_history.push(new_watermark);
            }
            self.events.push(event);
        }
        Ok(())
    }

    /// Returns the accepted events in arrival order.
    pub fn events(&self) -> &[StreamEvent] {
        &self.events
    }

    /// Returns the current watermark.
    pub fn current_watermark(&self) -> Watermark {
        self.watermark_gen.current_watermark()
    }

    /// Returns the late-data handler's counters.
    pub fn late_stats(&self) -> LateDataStats {
        self.late_handler.stats()
    }

    /// Returns the late events buffered by the `SideOutput` policy.
    pub fn side_output(&self) -> &[StreamEvent] {
        self.late_handler.side_output()
    }

    /// Returns every watermark emitted so far, oldest first.
    pub fn watermark_history(&self) -> &[Watermark] {
        &self.watermark_history
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Value;
    use std::collections::HashMap;

    /// Builds a `"TestEvent"` carrying a single `"value"` integer and the given event-time
    /// `timestamp`; every other metadata field comes from `StreamEvent::new`.
    fn create_event(timestamp: u64, value: i64) -> StreamEvent {
        let mut payload = HashMap::new();
        payload.insert("value".to_string(), Value::Integer(value));
        let base = StreamEvent::new("TestEvent", payload, "test");
        let metadata = super::super::event::EventMetadata {
            timestamp,
            ..base.metadata
        };
        StreamEvent { metadata, ..base }
    }

    #[test]
    fn test_watermark_ordering() {
        let (earlier, later) = (Watermark::new(1000), Watermark::new(2000));
        assert!(earlier.is_before(&later));
        assert!(!later.is_before(&earlier));
        assert!(earlier < later);
    }

    #[test]
    fn test_monotonic_watermark() {
        let mut generator = WatermarkGenerator::new(WatermarkStrategy::MonotonicAscending);
        // (event timestamp, expected watermark after processing it)
        let steps = [(1000, 1000), (2000, 2000), (1500, 2000)];
        for (value, &(event_ts, expected)) in (1..).zip(steps.iter()) {
            generator.process_event(&create_event(event_ts, value));
            assert_eq!(generator.current_watermark().timestamp, expected);
        }
    }

    #[test]
    fn test_bounded_out_of_order() {
        let mut generator = WatermarkGenerator::new(WatermarkStrategy::BoundedOutOfOrder {
            max_delay: Duration::from_millis(500),
        });
        generator.process_event(&create_event(2000, 1));
        assert_eq!(generator.current_watermark().timestamp, 1500);
    }

    #[test]
    fn test_late_data_drop() {
        let mut handler = LateDataHandler::new(LateDataStrategy::Drop);
        let decision = handler.handle_late_event(create_event(1000, 1), &Watermark::new(2000));
        if !matches!(decision, LateEventDecision::Drop) {
            panic!("Expected Drop decision");
        }
        let stats = handler.stats();
        assert_eq!(stats.total_late, 1);
        assert_eq!(stats.dropped, 1);
    }

    #[test]
    fn test_late_data_allowed_lateness() {
        let mut handler = LateDataHandler::new(LateDataStrategy::AllowedLateness {
            max_lateness: Duration::from_millis(500),
        });
        let watermark = Watermark::new(2000);

        // 400 ms behind the watermark: inside the allowance.
        let within = handler.handle_late_event(create_event(1600, 1), &watermark);
        if !matches!(within, LateEventDecision::Process(_)) {
            panic!("Expected Process decision");
        }
        assert_eq!(handler.stats().allowed, 1);

        // 600 ms behind the watermark: outside the allowance.
        let beyond = handler.handle_late_event(create_event(1400, 2), &watermark);
        if !matches!(beyond, LateEventDecision::Drop) {
            panic!("Expected Drop decision");
        }
        assert_eq!(handler.stats().dropped, 1);
    }

    #[test]
    fn test_watermarked_stream() {
        let mut stream = WatermarkedStream::new(
            WatermarkStrategy::BoundedOutOfOrder {
                max_delay: Duration::from_millis(500),
            },
            LateDataStrategy::Drop,
        );
        for &(ts, value) in [(1000, 1), (2000, 2)].iter() {
            stream.add_event(create_event(ts, value)).unwrap();
        }
        assert_eq!(stream.current_watermark().timestamp, 1500);

        // 1200 is behind the 1500 watermark, so it is dropped as late.
        stream.add_event(create_event(1200, 3)).unwrap();
        let stats = stream.late_stats();
        assert_eq!(stats.total_late, 1);
        assert_eq!(stats.dropped, 1);
        assert_eq!(stream.events().len(), 2);
    }
}