use crate::core::stream::StreamElement;
use crate::error::Result;
use chrono::{DateTime, Duration, Utc};
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::sync::Arc;
use tokio::sync::RwLock;
/// A point in event time carried through the stream.
///
/// Watermarks are totally ordered by their `timestamp`, so comparing two
/// watermarks compares the instants they represent.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct Watermark {
    /// Event-time instant this watermark represents.
    pub timestamp: DateTime<Utc>,
}

impl Watermark {
    /// Creates a watermark at `timestamp`.
    pub fn new(timestamp: DateTime<Utc>) -> Self {
        Self { timestamp }
    }

    /// The initial watermark: the Unix epoch (timestamp 0).
    pub fn min() -> Self {
        Self {
            // Timestamp 0 is always representable, so the fallback is never taken.
            timestamp: DateTime::from_timestamp(0, 0).unwrap_or_else(Utc::now),
        }
    }

    /// The largest representable watermark (`DateTime::<Utc>::MAX_UTC`).
    ///
    /// `MultiSourceWatermarkManager::remove_source` falls back to this value when
    /// no sources remain.
    pub fn max() -> Self {
        // The previous `from_timestamp(i64::MAX / 1000, 0)` is outside chrono's
        // supported range, returned `None`, and so degraded to `Utc::now()`.
        Self {
            timestamp: DateTime::<Utc>::MAX_UTC,
        }
    }
}
/// Rule used to turn observed event times into a watermark.
///
/// Only `PeriodicWatermarkGenerator` reads this value; `PunctuatedWatermarkGenerator`
/// ignores it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum WatermarkStrategy {
/// Watermark equals the largest event time seen so far.
Ascending,
/// Watermark is the largest event time seen minus `max_out_of_orderness`.
BoundedOutOfOrderness,
/// `PeriodicWatermarkGenerator` derives no new watermark for this variant.
Periodic,
/// `PeriodicWatermarkGenerator` derives no new watermark for this variant.
Punctuated,
}
/// Tuning parameters shared by the watermark generators.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WatermarkConfig {
    /// Rule `PeriodicWatermarkGenerator` uses to derive a watermark from the
    /// largest observed event time.
    pub strategy: WatermarkStrategy,
    /// Amount subtracted from the largest observed event time by the
    /// bounded-out-of-orderness strategy and by the punctuated generator.
    pub max_out_of_orderness: Duration,
    /// Minimum wall-clock time between two periodic emissions.
    pub interval: Duration,
    /// Not read by the generators in this file; presumably a threshold for
    /// treating a source as idle — TODO confirm with callers.
    pub idle_timeout: Option<Duration>,
}

impl Default for WatermarkConfig {
    /// Bounded out-of-orderness of 5 s, a 1 s emit interval and a 10 s idle timeout.
    fn default() -> Self {
        let lateness_bound = Duration::seconds(5);
        let emit_every = Duration::seconds(1);
        let idle_after = Duration::seconds(10);
        WatermarkConfig {
            idle_timeout: Some(idle_after),
            interval: emit_every,
            max_out_of_orderness: lateness_bound,
            strategy: WatermarkStrategy::BoundedOutOfOrderness,
        }
    }
}
/// Produces watermarks from a stream of elements.
///
/// Implementors must be `Send + Sync` so they can be shared across threads/tasks.
pub trait WatermarkGenerator: Send + Sync {
/// Observes `element`; returns a watermark if one should be emitted right away.
fn on_event(&mut self, element: &StreamElement) -> Option<Watermark>;
/// Hook for time-driven emission (presumably invoked on a timer by the caller —
/// confirm); returns a watermark if one should be emitted now.
fn on_periodic_emit(&mut self) -> Option<Watermark>;
/// Returns the generator's current watermark. In the implementations in this
/// file, that is the last watermark emitted, or `Watermark::min()` initially.
fn current_watermark(&self) -> Watermark;
}
/// Watermark generator that records the largest event time seen in
/// [`WatermarkGenerator::on_event`] and derives a watermark from it in
/// [`WatermarkGenerator::on_periodic_emit`].
///
/// A watermark is emitted only if at least `config.interval` of wall-clock time
/// has passed since the previous emission and the candidate is strictly greater
/// than the current watermark, so emitted watermarks never decrease.
pub struct PeriodicWatermarkGenerator {
    config: WatermarkConfig,
    /// Largest `event_time` observed so far; `None` until the first event.
    max_timestamp: Option<DateTime<Utc>>,
    /// Most recently emitted watermark, or [`Watermark::min`] before the first one.
    current_watermark: Watermark,
    /// Wall-clock time (`Utc::now()`) of the most recent emission.
    last_emit: Option<DateTime<Utc>>,
}

impl PeriodicWatermarkGenerator {
    /// Creates a generator that has seen no events and starts at [`Watermark::min`].
    pub fn new(config: WatermarkConfig) -> Self {
        Self {
            config,
            max_timestamp: None,
            current_watermark: Watermark::min(),
            last_emit: None,
        }
    }

    /// Derives the watermark implied by `max_ts` under the configured strategy.
    ///
    /// Returns `None` for strategies this generator does not derive (`Periodic`,
    /// `Punctuated`). It also returns `None` when `max_ts - max_out_of_orderness`
    /// falls outside the range `DateTime<Utc>` can represent; chrono's `-` operator
    /// would panic in that case.
    fn candidate_watermark(&self, max_ts: DateTime<Utc>) -> Option<Watermark> {
        match self.config.strategy {
            WatermarkStrategy::Ascending => Some(Watermark::new(max_ts)),
            WatermarkStrategy::BoundedOutOfOrderness => max_ts
                .checked_sub_signed(self.config.max_out_of_orderness)
                .map(Watermark::new),
            // Listed explicitly (no `_` arm) so a new strategy forces a decision here.
            WatermarkStrategy::Periodic | WatermarkStrategy::Punctuated => None,
        }
    }
}

impl WatermarkGenerator for PeriodicWatermarkGenerator {
    /// Folds `element.event_time` into the running maximum; never emits directly.
    fn on_event(&mut self, element: &StreamElement) -> Option<Watermark> {
        let event_time = element.event_time;
        self.max_timestamp = Some(
            self.max_timestamp
                .map_or(event_time, |max_ts| max_ts.max(event_time)),
        );
        None
    }

    /// Emits a new watermark if the emit interval has elapsed and the derived
    /// watermark advances past the current one; otherwise returns `None`.
    fn on_periodic_emit(&mut self) -> Option<Watermark> {
        let now = Utc::now();
        // `last_emit` only moves on an actual emission, so a tick that produced
        // nothing does not delay the next attempt.
        let interval_elapsed = match self.last_emit {
            Some(last) => now - last >= self.config.interval,
            None => true,
        };
        if !interval_elapsed {
            return None;
        }
        let candidate = self.candidate_watermark(self.max_timestamp?)?;
        if candidate <= self.current_watermark {
            return None;
        }
        self.current_watermark = candidate;
        self.last_emit = Some(now);
        Some(candidate)
    }

    fn current_watermark(&self) -> Watermark {
        self.current_watermark
    }
}
/// Watermark generator that emits only when an element carries the attribute
/// `watermark_marker` with value `"true"`.
///
/// The emitted watermark is the largest event time seen (including the marker
/// element itself) minus `config.max_out_of_orderness`. `config.strategy` is not
/// consulted.
pub struct PunctuatedWatermarkGenerator {
    config: WatermarkConfig,
    /// Most recently emitted watermark, or [`Watermark::min`] before the first one.
    current_watermark: Watermark,
    /// Largest `event_time` observed so far; `None` until the first event.
    max_timestamp: Option<DateTime<Utc>>,
}

impl PunctuatedWatermarkGenerator {
    /// Creates a generator that has seen no events and starts at [`Watermark::min`].
    pub fn new(config: WatermarkConfig) -> Self {
        Self {
            config,
            current_watermark: Watermark::min(),
            max_timestamp: None,
        }
    }

    /// True when `element`'s `watermark_marker` attribute equals `"true"`.
    fn should_emit_watermark(&self, element: &StreamElement) -> bool {
        matches!(
            element.metadata.attributes.get("watermark_marker"),
            Some(marker) if marker == "true"
        )
    }
}

impl WatermarkGenerator for PunctuatedWatermarkGenerator {
    /// Updates the running maximum event time. If `element` is a marker, emits
    /// `max - max_out_of_orderness` when that advances the current watermark.
    fn on_event(&mut self, element: &StreamElement) -> Option<Watermark> {
        let event_time = element.event_time;
        let max_ts = self
            .max_timestamp
            .map_or(event_time, |seen| seen.max(event_time));
        self.max_timestamp = Some(max_ts);
        if !self.should_emit_watermark(element) {
            return None;
        }
        // `checked_sub_signed` instead of `-`: chrono's `Sub` panics when the
        // result is out of range, and `event_time` comes from stream input.
        let candidate = max_ts
            .checked_sub_signed(self.config.max_out_of_orderness)
            .map(Watermark::new)?;
        if candidate <= self.current_watermark {
            return None;
        }
        self.current_watermark = candidate;
        Some(candidate)
    }

    /// Punctuated generators only emit from `on_event`.
    fn on_periodic_emit(&mut self) -> Option<Watermark> {
        None
    }

    fn current_watermark(&self) -> Watermark {
        self.current_watermark
    }
}
/// Combines watermarks from several named sources into one global watermark.
///
/// The global watermark is the minimum of the per-source watermarks and only
/// ever moves forward.
pub struct MultiSourceWatermarkManager {
    source_watermarks: Arc<RwLock<BTreeMap<String, Watermark>>>,
    global_watermark: Arc<RwLock<Watermark>>,
}

impl MultiSourceWatermarkManager {
    /// Creates a manager with no sources and a global watermark of [`Watermark::min`].
    pub fn new() -> Self {
        Self {
            source_watermarks: Arc::new(RwLock::new(BTreeMap::new())),
            global_watermark: Arc::new(RwLock::new(Watermark::min())),
        }
    }

    /// Stores `watermark` as the watermark of `source_id`, replacing any previous
    /// value (even a larger one). Advances the global watermark if the minimum
    /// across sources is now higher.
    ///
    /// # Errors
    /// Currently never fails; always returns `Ok(())`.
    pub async fn update_source_watermark(
        &self,
        source_id: String,
        watermark: Watermark,
    ) -> Result<()> {
        let mut watermarks = self.source_watermarks.write().await;
        watermarks.insert(source_id, watermark);
        // Lock order (sources, then global) is the same in `remove_source`.
        let mut global = self.global_watermark.write().await;
        Self::advance_global(&mut global, &watermarks);
        Ok(())
    }

    /// Returns the current global watermark.
    pub async fn global_watermark(&self) -> Watermark {
        *self.global_watermark.read().await
    }

    /// Returns the last watermark recorded for `source_id`, if any.
    pub async fn source_watermark(&self, source_id: &str) -> Option<Watermark> {
        self.source_watermarks.read().await.get(source_id).copied()
    }

    /// Forgets `source_id` (a no-op if it is unknown). Then advances the global
    /// watermark to the minimum of the remaining sources, or to [`Watermark::max`]
    /// when none remain.
    ///
    /// The global watermark is never lowered. A remaining source can lag behind
    /// it, for example one registered after the global watermark had already
    /// advanced. In that case the global watermark is left unchanged.
    ///
    /// # Errors
    /// Currently never fails; always returns `Ok(())`.
    pub async fn remove_source(&self, source_id: &str) -> Result<()> {
        let mut watermarks = self.source_watermarks.write().await;
        watermarks.remove(source_id);
        let mut global = self.global_watermark.write().await;
        Self::advance_global(&mut global, &watermarks);
        Ok(())
    }

    /// Raises `global` to the minimum of `watermarks` (or [`Watermark::max`] when
    /// the map is empty), never lowering it.
    fn advance_global(global: &mut Watermark, watermarks: &BTreeMap<String, Watermark>) {
        let floor = watermarks
            .values()
            .min()
            .copied()
            .unwrap_or_else(Watermark::max);
        if floor > *global {
            *global = floor;
        }
    }
}

impl Default for MultiSourceWatermarkManager {
    /// Same as [`MultiSourceWatermarkManager::new`].
    fn default() -> Self {
        Self::new()
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Registers `source1` at "now" and `source2` ten seconds later. Returns the
    /// manager together with both watermarks, earlier first.
    async fn two_source_manager() -> (MultiSourceWatermarkManager, Watermark, Watermark) {
        let manager = MultiSourceWatermarkManager::new();
        let earlier = Watermark::new(Utc::now());
        let later = Watermark::new(Utc::now() + Duration::seconds(10));
        manager
            .update_source_watermark("source1".to_string(), earlier)
            .await
            .expect("Test watermark update for source1 should succeed");
        manager
            .update_source_watermark("source2".to_string(), later)
            .await
            .expect("Test watermark update for source2 should succeed");
        (manager, earlier, later)
    }

    #[test]
    fn test_watermark_creation() {
        let instant = Utc::now();
        assert_eq!(Watermark::new(instant).timestamp, instant);
    }

    #[test]
    fn test_watermark_ordering() {
        let earlier = Watermark::new(Utc::now());
        let later = Watermark::new(Utc::now() + Duration::seconds(10));
        assert!(earlier < later);
        assert!(later > earlier);
    }

    #[tokio::test]
    async fn test_periodic_watermark_generator() {
        let mut generator = PeriodicWatermarkGenerator::new(WatermarkConfig::default());
        let element = StreamElement::new(vec![1, 2, 3], Utc::now());
        generator.on_event(&element);
        assert!(generator.on_periodic_emit().is_some());
    }

    #[tokio::test]
    async fn test_multi_source_watermark_manager() {
        let (manager, earlier, _later) = two_source_manager().await;
        assert_eq!(manager.global_watermark().await, earlier);
    }

    #[tokio::test]
    async fn test_remove_source_watermark() {
        let (manager, _earlier, later) = two_source_manager().await;
        manager
            .remove_source("source1")
            .await
            .expect("Test source removal should succeed");
        assert_eq!(manager.global_watermark().await, later);
    }
}