use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WatermarkState {
pub watermark_ts: i64,
pub o3_watermark_ts: Option<i64>,
pub rows_aggregated: u64,
pub last_refresh_ms: i64,
}
impl Default for WatermarkState {
fn default() -> Self {
Self {
watermark_ts: i64::MIN,
o3_watermark_ts: None,
rows_aggregated: 0,
last_refresh_ms: 0,
}
}
}
impl WatermarkState {
pub fn record_o3(&mut self, ts: i64) {
if ts <= self.watermark_ts {
match self.o3_watermark_ts {
Some(current) if ts < current => self.o3_watermark_ts = Some(ts),
None => self.o3_watermark_ts = Some(ts),
_ => {}
}
}
}
pub fn advance(&mut self, max_ts: i64, rows: u64, now_ms: i64) {
if max_ts > self.watermark_ts {
self.watermark_ts = max_ts;
}
self.rows_aggregated += rows;
self.last_refresh_ms = now_ms;
}
pub fn clear_o3(&mut self) {
self.o3_watermark_ts = None;
}
}