1use chrono::{DateTime, Duration, Utc};
2
3#[derive(Debug, Clone, Copy)]
6pub struct WatermarkPolicy {
7 pub allowed_lateness: Duration,
8}
9
10impl WatermarkPolicy {
11 pub fn new(allowed_lateness: Duration) -> Self {
12 Self { allowed_lateness }
13 }
14}
15
16#[derive(Debug, Clone)]
17pub struct WatermarkClock {
18 policy: WatermarkPolicy,
19 max_observed: Option<DateTime<Utc>>,
20}
21
22impl WatermarkClock {
23 pub fn new(policy: WatermarkPolicy) -> Self {
24 Self {
25 policy,
26 max_observed: None,
27 }
28 }
29
30 pub fn observe(&mut self, ts: DateTime<Utc>) {
31 self.max_observed = match self.max_observed {
32 Some(max) if ts > max => Some(ts),
33 None => Some(ts),
34 Some(max) => Some(max),
35 };
36 }
37
38 pub fn watermark(&self) -> Option<DateTime<Utc>> {
39 self.max_observed
40 .map(|t| t - self.policy.allowed_lateness)
41 }
42}
43
44#[cfg(test)]
45mod tests {
46 use super::*;
47
48 #[test]
49 fn clock_advances_with_observations_and_lateness() {
50 let pol = WatermarkPolicy::new(Duration::seconds(10));
51 let mut clk = WatermarkClock::new(pol);
52 let t0 = DateTime::<Utc>::from_timestamp(1_700_000_000, 0).unwrap();
53 let t1 = DateTime::<Utc>::from_timestamp(1_700_000_030, 0).unwrap();
54 clk.observe(t0);
55 assert_eq!(clk.watermark(), Some(t0 - Duration::seconds(10)));
56 clk.observe(t1);
57 assert_eq!(clk.watermark(), Some(t1 - Duration::seconds(10)));
58 clk.observe(t0 - Duration::seconds(100));
60 assert_eq!(clk.watermark(), Some(t1 - Duration::seconds(10)));
61 }
62}