Skip to main content

varpulis_runtime/
watermark.rs

//! Per-source watermark tracking for multi-source event-time processing.
//!
//! Tracks watermarks independently per source stream and computes the effective
//! watermark as the minimum across all sources that have produced a watermark.
//! This ensures correct event-time processing when events arrive from multiple
//! sources with different latencies.
6
7use std::time::Instant;
8
9use chrono::{DateTime, Duration, Utc};
10use rustc_hash::FxHashMap;
11
12use crate::persistence::{SourceWatermarkCheckpoint, WatermarkCheckpoint};
13
14/// Tracks watermarks for multiple event sources.
15///
16/// The effective watermark is the minimum watermark across all registered sources,
17/// ensuring no source's events are prematurely considered late.
18#[derive(Debug)]
19pub struct PerSourceWatermarkTracker {
20    sources: FxHashMap<String, SourceWatermark>,
21    effective_watermark: Option<DateTime<Utc>>,
22}
23
24#[derive(Debug)]
25struct SourceWatermark {
26    watermark: Option<DateTime<Utc>>,
27    max_timestamp: Option<DateTime<Utc>>,
28    max_out_of_orderness: Duration,
29    last_event_time: Option<Instant>,
30}
31
32impl PerSourceWatermarkTracker {
33    /// Create a new empty tracker.
34    pub fn new() -> Self {
35        Self {
36            sources: FxHashMap::default(),
37            effective_watermark: None,
38        }
39    }
40
41    /// Register a source with its maximum out-of-orderness tolerance.
42    pub fn register_source(&mut self, name: &str, max_ooo: Duration) {
43        self.sources.insert(
44            name.to_string(),
45            SourceWatermark {
46                watermark: None,
47                max_timestamp: None,
48                max_out_of_orderness: max_ooo,
49                last_event_time: None,
50            },
51        );
52    }
53
54    /// Observe an event from a source, updating its watermark.
55    pub fn observe_event(&mut self, source: &str, event_ts: DateTime<Utc>) {
56        if let Some(sw) = self.sources.get_mut(source) {
57            sw.last_event_time = Some(Instant::now());
58
59            // Update max timestamp
60            let updated = match sw.max_timestamp {
61                Some(max_ts) if event_ts > max_ts => {
62                    sw.max_timestamp = Some(event_ts);
63                    true
64                }
65                None => {
66                    sw.max_timestamp = Some(event_ts);
67                    true
68                }
69                _ => false,
70            };
71
72            // Recompute source watermark: max_timestamp - max_out_of_orderness
73            if updated {
74                if let Some(max_ts) = sw.max_timestamp {
75                    let new_wm = max_ts - sw.max_out_of_orderness;
76                    // Watermark never recedes
77                    match sw.watermark {
78                        Some(wm) if new_wm > wm => sw.watermark = Some(new_wm),
79                        None => sw.watermark = Some(new_wm),
80                        _ => {}
81                    }
82                }
83            }
84
85            self.recompute_effective();
86        } else {
87            // Auto-register unknown sources with zero out-of-orderness
88            self.register_source(source, Duration::zero());
89            self.observe_event(source, event_ts);
90        }
91    }
92
93    /// Get the effective (minimum) watermark across all sources.
94    pub const fn effective_watermark(&self) -> Option<DateTime<Utc>> {
95        self.effective_watermark
96    }
97
98    /// Manually advance a source's watermark (e.g., from upstream context).
99    pub fn advance_source_watermark(&mut self, source: &str, wm: DateTime<Utc>) {
100        if let Some(sw) = self.sources.get_mut(source) {
101            match sw.watermark {
102                Some(current) if wm > current => sw.watermark = Some(wm),
103                None => sw.watermark = Some(wm),
104                _ => {}
105            }
106            self.recompute_effective();
107        }
108    }
109
110    /// Recompute the effective watermark as the minimum of all source watermarks.
111    fn recompute_effective(&mut self) {
112        if self.sources.is_empty() {
113            self.effective_watermark = None;
114            return;
115        }
116
117        let mut min_wm: Option<DateTime<Utc>> = None;
118        for sw in self.sources.values() {
119            match (min_wm, sw.watermark) {
120                (Some(current_min), Some(source_wm)) => {
121                    if source_wm < current_min {
122                        min_wm = Some(source_wm);
123                    }
124                }
125                (None, Some(source_wm)) => {
126                    min_wm = Some(source_wm);
127                }
128                // If any source has no watermark yet, effective watermark stays at current
129                // (we don't block on uninitialized sources)
130                _ => {}
131            }
132        }
133
134        // Set effective watermark to the min across all sources.
135        // The effective watermark CAN decrease when a previously-uninitialized
136        // source registers its first (lower) watermark. Per-source monotonicity
137        // is enforced in observe_event/advance_source_watermark.
138        if let Some(new) = min_wm {
139            self.effective_watermark = Some(new);
140        }
141    }
142
143    /// Create a checkpoint of the tracker state.
144    pub fn checkpoint(&self) -> WatermarkCheckpoint {
145        let sources = self
146            .sources
147            .iter()
148            .map(|(name, sw)| {
149                (
150                    name.clone(),
151                    SourceWatermarkCheckpoint {
152                        watermark_ms: sw.watermark.map(|w| w.timestamp_millis()),
153                        max_timestamp_ms: sw.max_timestamp.map(|t| t.timestamp_millis()),
154                        max_out_of_orderness_ms: sw.max_out_of_orderness.num_milliseconds(),
155                    },
156                )
157            })
158            .collect();
159
160        WatermarkCheckpoint {
161            sources,
162            effective_watermark_ms: self.effective_watermark.map(|w| w.timestamp_millis()),
163        }
164    }
165
166    /// Restore tracker state from a checkpoint.
167    pub fn restore(&mut self, cp: &WatermarkCheckpoint) {
168        for (name, scp) in &cp.sources {
169            let sw = self
170                .sources
171                .entry(name.clone())
172                .or_insert_with(|| SourceWatermark {
173                    watermark: None,
174                    max_timestamp: None,
175                    max_out_of_orderness: Duration::milliseconds(scp.max_out_of_orderness_ms),
176                    last_event_time: None,
177                });
178            sw.watermark = scp.watermark_ms.and_then(DateTime::from_timestamp_millis);
179            sw.max_timestamp = scp
180                .max_timestamp_ms
181                .and_then(DateTime::from_timestamp_millis);
182            sw.max_out_of_orderness = Duration::milliseconds(scp.max_out_of_orderness_ms);
183        }
184
185        self.effective_watermark = cp
186            .effective_watermark_ms
187            .and_then(DateTime::from_timestamp_millis);
188    }
189
190    /// Check if any sources are registered.
191    pub fn has_sources(&self) -> bool {
192        !self.sources.is_empty()
193    }
194}
195
196impl Default for PerSourceWatermarkTracker {
197    fn default() -> Self {
198        Self::new()
199    }
200}
201
#[cfg(test)]
mod tests {
    use super::*;

    /// With one source, the effective watermark trails the newest event by the
    /// source's out-of-orderness tolerance.
    #[test]
    fn test_single_source() {
        let tolerance = Duration::seconds(5);
        let mut tracker = PerSourceWatermarkTracker::new();
        tracker.register_source("src1", tolerance);

        let origin = Utc::now();
        tracker.observe_event("src1", origin);

        let effective = tracker.effective_watermark().unwrap();
        assert_eq!(effective, origin - tolerance);
    }

    /// With two sources, the effective watermark is the smaller of the two
    /// per-source watermarks.
    #[test]
    fn test_two_sources_min_watermark() {
        let mut tracker = PerSourceWatermarkTracker::new();
        tracker.register_source("fast", Duration::seconds(1));
        tracker.register_source("slow", Duration::seconds(10));

        let origin = Utc::now();
        // fast: origin + 20s - 1s  = origin + 19s
        tracker.observe_event("fast", origin + Duration::seconds(20));
        // slow: origin + 15s - 10s = origin + 5s
        tracker.observe_event("slow", origin + Duration::seconds(15));

        // min(origin + 19s, origin + 5s) = origin + 5s
        let effective = tracker.effective_watermark().unwrap();
        assert_eq!(effective, origin + Duration::seconds(5));
    }

    /// An event older than the current maximum must not pull the watermark back.
    #[test]
    fn test_watermark_never_recedes() {
        let mut tracker = PerSourceWatermarkTracker::new();
        tracker.register_source("src1", Duration::seconds(0));

        let origin = Utc::now();
        tracker.observe_event("src1", origin + Duration::seconds(10));
        let before_late_event = tracker.effective_watermark().unwrap();

        tracker.observe_event("src1", origin + Duration::seconds(5));
        let after_late_event = tracker.effective_watermark().unwrap();

        assert_eq!(before_late_event, after_late_event);
    }

    /// Manually advancing the only source's watermark sets the effective watermark.
    #[test]
    fn test_advance_source_watermark() {
        let mut tracker = PerSourceWatermarkTracker::new();
        tracker.register_source("upstream", Duration::seconds(0));

        let origin = Utc::now();
        tracker.advance_source_watermark("upstream", origin);

        assert_eq!(tracker.effective_watermark(), Some(origin));
    }

    /// A tracker restored from a checkpoint reports the same effective
    /// watermark (compared at the checkpoint's millisecond resolution).
    #[test]
    fn test_checkpoint_restore_roundtrip() {
        let mut tracker = PerSourceWatermarkTracker::new();
        tracker.register_source("src1", Duration::seconds(5));
        tracker.register_source("src2", Duration::seconds(10));

        let origin = Utc::now();
        tracker.observe_event("src1", origin + Duration::seconds(20));
        tracker.observe_event("src2", origin + Duration::seconds(15));

        let snapshot = tracker.checkpoint();

        let mut restored = PerSourceWatermarkTracker::new();
        restored.restore(&snapshot);

        let original_ms = tracker.effective_watermark().map(|w| w.timestamp_millis());
        let restored_ms = restored.effective_watermark().map(|w| w.timestamp_millis());
        assert_eq!(original_ms, restored_ms);
    }
}
282}