1use chrono::{DateTime, Duration, Utc};
2
3#[derive(Debug, Clone, Copy)]
6pub struct WatermarkPolicy {
7    pub allowed_lateness: Duration,
8}
9
10impl WatermarkPolicy {
11    pub fn new(allowed_lateness: Duration) -> Self {
12        Self { allowed_lateness }
13    }
14}
15
16#[derive(Debug, Clone)]
17pub struct WatermarkClock {
18    policy: WatermarkPolicy,
19    max_observed: Option<DateTime<Utc>>,
20}
21
22impl WatermarkClock {
23    pub fn new(policy: WatermarkPolicy) -> Self {
24        Self {
25            policy,
26            max_observed: None,
27        }
28    }
29
30    pub fn observe(&mut self, ts: DateTime<Utc>) {
31        self.max_observed = match self.max_observed {
32            Some(max) if ts > max => Some(ts),
33            None => Some(ts),
34            Some(max) => Some(max),
35        };
36    }
37
38    pub fn watermark(&self) -> Option<DateTime<Utc>> {
39        self.max_observed.map(|t| t - self.policy.allowed_lateness)
40    }
41}
42
43#[cfg(test)]
44mod tests {
45    use super::*;
46
47    #[test]
48    fn clock_advances_with_observations_and_lateness() {
49        let pol = WatermarkPolicy::new(Duration::seconds(10));
50        let mut clk = WatermarkClock::new(pol);
51        let t0 = DateTime::<Utc>::from_timestamp(1_700_000_000, 0).unwrap();
52        let t1 = DateTime::<Utc>::from_timestamp(1_700_000_030, 0).unwrap();
53        clk.observe(t0);
54        assert_eq!(clk.watermark(), Some(t0 - Duration::seconds(10)));
55        clk.observe(t1);
56        assert_eq!(clk.watermark(), Some(t1 - Duration::seconds(10)));
57        clk.observe(t0 - Duration::seconds(100));
59        assert_eq!(clk.watermark(), Some(t1 - Duration::seconds(10)));
60    }
61}