1use chrono::{DateTime, Duration, Utc};
2
3#[derive(Debug, Clone, Copy)]
6pub struct WatermarkPolicy {
7 pub allowed_lateness: Duration,
8}
9
10impl WatermarkPolicy {
11 pub fn new(allowed_lateness: Duration) -> Self {
12 Self { allowed_lateness }
13 }
14}
15
16#[derive(Debug, Clone)]
17pub struct WatermarkClock {
18 policy: WatermarkPolicy,
19 max_observed: Option<DateTime<Utc>>,
20}
21
22impl WatermarkClock {
23 pub fn new(policy: WatermarkPolicy) -> Self {
24 Self {
25 policy,
26 max_observed: None,
27 }
28 }
29
30 pub fn observe(&mut self, ts: DateTime<Utc>) {
31 self.max_observed = match self.max_observed {
32 Some(max) if ts > max => Some(ts),
33 None => Some(ts),
34 Some(max) => Some(max),
35 };
36 }
37
38 pub fn watermark(&self) -> Option<DateTime<Utc>> {
39 self.max_observed.map(|t| t - self.policy.allowed_lateness)
40 }
41}
42
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a UTC timestamp from whole seconds since the Unix epoch.
    fn at(secs: i64) -> DateTime<Utc> {
        DateTime::<Utc>::from_timestamp(secs, 0).expect("test timestamp is within chrono's range")
    }

    #[test]
    fn watermark_is_none_before_any_observation() {
        let clk = WatermarkClock::new(WatermarkPolicy::new(Duration::seconds(10)));
        assert_eq!(clk.watermark(), None);
    }

    #[test]
    fn clock_advances_with_observations_and_lateness() {
        let lateness = Duration::seconds(10);
        let mut clk = WatermarkClock::new(WatermarkPolicy::new(lateness));
        let t0 = at(1_700_000_000);
        let t1 = at(1_700_000_030);

        // The first observation establishes the watermark.
        clk.observe(t0);
        assert_eq!(clk.watermark(), Some(t0 - lateness));

        // A newer observation advances it.
        clk.observe(t1);
        assert_eq!(clk.watermark(), Some(t1 - lateness));

        // An older observation must not move the watermark backwards.
        clk.observe(t0 - Duration::seconds(100));
        assert_eq!(clk.watermark(), Some(t1 - lateness));
    }
}