varpulis_runtime/
watermark.rs1use std::time::Instant;
8
9use chrono::{DateTime, Duration, Utc};
10use rustc_hash::FxHashMap;
11
12use crate::persistence::{SourceWatermarkCheckpoint, WatermarkCheckpoint};
13
14#[derive(Debug)]
19pub struct PerSourceWatermarkTracker {
20 sources: FxHashMap<String, SourceWatermark>,
21 effective_watermark: Option<DateTime<Utc>>,
22}
23
24#[derive(Debug)]
25struct SourceWatermark {
26 watermark: Option<DateTime<Utc>>,
27 max_timestamp: Option<DateTime<Utc>>,
28 max_out_of_orderness: Duration,
29 last_event_time: Option<Instant>,
30}
31
32impl PerSourceWatermarkTracker {
33 pub fn new() -> Self {
35 Self {
36 sources: FxHashMap::default(),
37 effective_watermark: None,
38 }
39 }
40
41 pub fn register_source(&mut self, name: &str, max_ooo: Duration) {
43 self.sources.insert(
44 name.to_string(),
45 SourceWatermark {
46 watermark: None,
47 max_timestamp: None,
48 max_out_of_orderness: max_ooo,
49 last_event_time: None,
50 },
51 );
52 }
53
54 pub fn observe_event(&mut self, source: &str, event_ts: DateTime<Utc>) {
56 if let Some(sw) = self.sources.get_mut(source) {
57 sw.last_event_time = Some(Instant::now());
58
59 let updated = match sw.max_timestamp {
61 Some(max_ts) if event_ts > max_ts => {
62 sw.max_timestamp = Some(event_ts);
63 true
64 }
65 None => {
66 sw.max_timestamp = Some(event_ts);
67 true
68 }
69 _ => false,
70 };
71
72 if updated {
74 if let Some(max_ts) = sw.max_timestamp {
75 let new_wm = max_ts - sw.max_out_of_orderness;
76 match sw.watermark {
78 Some(wm) if new_wm > wm => sw.watermark = Some(new_wm),
79 None => sw.watermark = Some(new_wm),
80 _ => {}
81 }
82 }
83 }
84
85 self.recompute_effective();
86 } else {
87 self.register_source(source, Duration::zero());
89 self.observe_event(source, event_ts);
90 }
91 }
92
93 pub const fn effective_watermark(&self) -> Option<DateTime<Utc>> {
95 self.effective_watermark
96 }
97
98 pub fn advance_source_watermark(&mut self, source: &str, wm: DateTime<Utc>) {
100 if let Some(sw) = self.sources.get_mut(source) {
101 match sw.watermark {
102 Some(current) if wm > current => sw.watermark = Some(wm),
103 None => sw.watermark = Some(wm),
104 _ => {}
105 }
106 self.recompute_effective();
107 }
108 }
109
110 fn recompute_effective(&mut self) {
112 if self.sources.is_empty() {
113 self.effective_watermark = None;
114 return;
115 }
116
117 let mut min_wm: Option<DateTime<Utc>> = None;
118 for sw in self.sources.values() {
119 match (min_wm, sw.watermark) {
120 (Some(current_min), Some(source_wm)) => {
121 if source_wm < current_min {
122 min_wm = Some(source_wm);
123 }
124 }
125 (None, Some(source_wm)) => {
126 min_wm = Some(source_wm);
127 }
128 _ => {}
131 }
132 }
133
134 if let Some(new) = min_wm {
139 self.effective_watermark = Some(new);
140 }
141 }
142
143 pub fn checkpoint(&self) -> WatermarkCheckpoint {
145 let sources = self
146 .sources
147 .iter()
148 .map(|(name, sw)| {
149 (
150 name.clone(),
151 SourceWatermarkCheckpoint {
152 watermark_ms: sw.watermark.map(|w| w.timestamp_millis()),
153 max_timestamp_ms: sw.max_timestamp.map(|t| t.timestamp_millis()),
154 max_out_of_orderness_ms: sw.max_out_of_orderness.num_milliseconds(),
155 },
156 )
157 })
158 .collect();
159
160 WatermarkCheckpoint {
161 sources,
162 effective_watermark_ms: self.effective_watermark.map(|w| w.timestamp_millis()),
163 }
164 }
165
166 pub fn restore(&mut self, cp: &WatermarkCheckpoint) {
168 for (name, scp) in &cp.sources {
169 let sw = self
170 .sources
171 .entry(name.clone())
172 .or_insert_with(|| SourceWatermark {
173 watermark: None,
174 max_timestamp: None,
175 max_out_of_orderness: Duration::milliseconds(scp.max_out_of_orderness_ms),
176 last_event_time: None,
177 });
178 sw.watermark = scp.watermark_ms.and_then(DateTime::from_timestamp_millis);
179 sw.max_timestamp = scp
180 .max_timestamp_ms
181 .and_then(DateTime::from_timestamp_millis);
182 sw.max_out_of_orderness = Duration::milliseconds(scp.max_out_of_orderness_ms);
183 }
184
185 self.effective_watermark = cp
186 .effective_watermark_ms
187 .and_then(DateTime::from_timestamp_millis);
188 }
189
190 pub fn has_sources(&self) -> bool {
192 !self.sources.is_empty()
193 }
194}
195
196impl Default for PerSourceWatermarkTracker {
197 fn default() -> Self {
198 Self::new()
199 }
200}
201
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a tracker with the given `(name, out-of-orderness in seconds)`
    /// sources registered.
    fn tracker_with(sources: &[(&str, i64)]) -> PerSourceWatermarkTracker {
        let mut tracker = PerSourceWatermarkTracker::new();
        for &(name, ooo_secs) in sources {
            tracker.register_source(name, Duration::seconds(ooo_secs));
        }
        tracker
    }

    #[test]
    fn test_single_source() {
        let mut tracker = tracker_with(&[("src1", 5)]);

        let now = Utc::now();
        tracker.observe_event("src1", now);

        // The watermark trails the only event by the 5 s bound.
        let effective = tracker.effective_watermark().unwrap();
        assert_eq!(effective, now - Duration::seconds(5));
    }

    #[test]
    fn test_two_sources_min_watermark() {
        let mut tracker = tracker_with(&[("fast", 1), ("slow", 10)]);

        let now = Utc::now();
        tracker.observe_event("fast", now + Duration::seconds(20));
        tracker.observe_event("slow", now + Duration::seconds(15));

        // fast: +20 s - 1 s = +19 s; slow: +15 s - 10 s = +5 s; the minimum wins.
        let effective = tracker.effective_watermark().unwrap();
        assert_eq!(effective, now + Duration::seconds(5));
    }

    #[test]
    fn test_watermark_never_recedes() {
        let mut tracker = tracker_with(&[("src1", 0)]);

        let now = Utc::now();
        tracker.observe_event("src1", now + Duration::seconds(10));
        let before = tracker.effective_watermark().unwrap();

        // An older event must leave the watermark where it was.
        tracker.observe_event("src1", now + Duration::seconds(5));
        let after = tracker.effective_watermark().unwrap();
        assert_eq!(before, after);
    }

    #[test]
    fn test_advance_source_watermark() {
        let mut tracker = tracker_with(&[("upstream", 0)]);

        let now = Utc::now();
        tracker.advance_source_watermark("upstream", now);

        assert_eq!(tracker.effective_watermark(), Some(now));
    }

    #[test]
    fn test_checkpoint_restore_roundtrip() {
        let mut tracker = tracker_with(&[("src1", 5), ("src2", 10)]);

        let now = Utc::now();
        tracker.observe_event("src1", now + Duration::seconds(20));
        tracker.observe_event("src2", now + Duration::seconds(15));

        let snapshot = tracker.checkpoint();

        let mut restored = PerSourceWatermarkTracker::new();
        restored.restore(&snapshot);

        // Checkpoints store milliseconds, so compare at that resolution.
        let original_ms = tracker.effective_watermark().map(|w| w.timestamp_millis());
        let restored_ms = restored.effective_watermark().map(|w| w.timestamp_millis());
        assert_eq!(original_ms, restored_ms);
    }
}