oxigdal_streaming/windowing/
watermark.rs1use crate::core::stream::StreamElement;
4use crate::error::Result;
5use chrono::{DateTime, Duration, Utc};
6use serde::{Deserialize, Serialize};
7use std::collections::BTreeMap;
8use std::sync::Arc;
9use tokio::sync::RwLock;
10
11#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
13pub struct Watermark {
14 pub timestamp: DateTime<Utc>,
16}
17
18impl Watermark {
19 pub fn new(timestamp: DateTime<Utc>) -> Self {
21 Self { timestamp }
22 }
23
24 pub fn min() -> Self {
26 Self {
27 timestamp: DateTime::from_timestamp(0, 0).unwrap_or_else(Utc::now),
28 }
29 }
30
31 pub fn max() -> Self {
33 Self {
34 timestamp: DateTime::from_timestamp(i64::MAX / 1000, 0).unwrap_or_else(Utc::now),
35 }
36 }
37}
38
39#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
41pub enum WatermarkStrategy {
42 Ascending,
44
45 BoundedOutOfOrderness,
47
48 Periodic,
50
51 Punctuated,
53}
54
55#[derive(Debug, Clone, Serialize, Deserialize)]
57pub struct WatermarkConfig {
58 pub strategy: WatermarkStrategy,
60
61 pub max_out_of_orderness: Duration,
63
64 pub interval: Duration,
66
67 pub idle_timeout: Option<Duration>,
69}
70
71impl Default for WatermarkConfig {
72 fn default() -> Self {
73 Self {
74 strategy: WatermarkStrategy::BoundedOutOfOrderness,
75 max_out_of_orderness: Duration::seconds(5),
76 interval: Duration::seconds(1),
77 idle_timeout: Some(Duration::seconds(10)),
78 }
79 }
80}
81
82pub trait WatermarkGenerator: Send + Sync {
84 fn on_event(&mut self, element: &StreamElement) -> Option<Watermark>;
86
87 fn on_periodic_emit(&mut self) -> Option<Watermark>;
89
90 fn current_watermark(&self) -> Watermark;
92}
93
94pub struct PeriodicWatermarkGenerator {
96 config: WatermarkConfig,
97 max_timestamp: Option<DateTime<Utc>>,
98 current_watermark: Watermark,
99 last_emit: Option<DateTime<Utc>>,
100}
101
102impl PeriodicWatermarkGenerator {
103 pub fn new(config: WatermarkConfig) -> Self {
105 Self {
106 config,
107 max_timestamp: None,
108 current_watermark: Watermark::min(),
109 last_emit: None,
110 }
111 }
112}
113
114impl WatermarkGenerator for PeriodicWatermarkGenerator {
115 fn on_event(&mut self, element: &StreamElement) -> Option<Watermark> {
116 if let Some(max_ts) = self.max_timestamp {
117 if element.event_time > max_ts {
118 self.max_timestamp = Some(element.event_time);
119 }
120 } else {
121 self.max_timestamp = Some(element.event_time);
122 }
123
124 None
125 }
126
127 fn on_periodic_emit(&mut self) -> Option<Watermark> {
128 let now = Utc::now();
129 let should_emit = if let Some(last) = self.last_emit {
130 now - last >= self.config.interval
131 } else {
132 true
133 };
134
135 if should_emit {
136 if let Some(max_ts) = self.max_timestamp {
137 let new_watermark = match self.config.strategy {
138 WatermarkStrategy::Ascending => Watermark::new(max_ts),
139 WatermarkStrategy::BoundedOutOfOrderness => {
140 Watermark::new(max_ts - self.config.max_out_of_orderness)
141 }
142 _ => self.current_watermark,
143 };
144
145 if new_watermark > self.current_watermark {
146 self.current_watermark = new_watermark;
147 self.last_emit = Some(now);
148 return Some(new_watermark);
149 }
150 }
151 }
152
153 None
154 }
155
156 fn current_watermark(&self) -> Watermark {
157 self.current_watermark
158 }
159}
160
161pub struct PunctuatedWatermarkGenerator {
163 config: WatermarkConfig,
164 current_watermark: Watermark,
165 max_timestamp: Option<DateTime<Utc>>,
166}
167
168impl PunctuatedWatermarkGenerator {
169 pub fn new(config: WatermarkConfig) -> Self {
171 Self {
172 config,
173 current_watermark: Watermark::min(),
174 max_timestamp: None,
175 }
176 }
177
178 fn should_emit_watermark(&self, element: &StreamElement) -> bool {
180 if let Some(marker) = element.metadata.attributes.get("watermark_marker") {
181 marker == "true"
182 } else {
183 false
184 }
185 }
186}
187
188impl WatermarkGenerator for PunctuatedWatermarkGenerator {
189 fn on_event(&mut self, element: &StreamElement) -> Option<Watermark> {
190 if let Some(max_ts) = self.max_timestamp {
191 if element.event_time > max_ts {
192 self.max_timestamp = Some(element.event_time);
193 }
194 } else {
195 self.max_timestamp = Some(element.event_time);
196 }
197
198 if self.should_emit_watermark(element) {
199 if let Some(max_ts) = self.max_timestamp {
200 let new_watermark = Watermark::new(max_ts - self.config.max_out_of_orderness);
201
202 if new_watermark > self.current_watermark {
203 self.current_watermark = new_watermark;
204 return Some(new_watermark);
205 }
206 }
207 }
208
209 None
210 }
211
212 fn on_periodic_emit(&mut self) -> Option<Watermark> {
213 None
214 }
215
216 fn current_watermark(&self) -> Watermark {
217 self.current_watermark
218 }
219}
220
221pub struct MultiSourceWatermarkManager {
223 source_watermarks: Arc<RwLock<BTreeMap<String, Watermark>>>,
224 global_watermark: Arc<RwLock<Watermark>>,
225}
226
227impl MultiSourceWatermarkManager {
228 pub fn new() -> Self {
230 Self {
231 source_watermarks: Arc::new(RwLock::new(BTreeMap::new())),
232 global_watermark: Arc::new(RwLock::new(Watermark::min())),
233 }
234 }
235
236 pub async fn update_source_watermark(
238 &self,
239 source_id: String,
240 watermark: Watermark,
241 ) -> Result<()> {
242 let mut watermarks = self.source_watermarks.write().await;
243 watermarks.insert(source_id, watermark);
244
245 let min_watermark = watermarks
246 .values()
247 .min()
248 .copied()
249 .unwrap_or(Watermark::min());
250
251 let mut global = self.global_watermark.write().await;
252 if min_watermark > *global {
253 *global = min_watermark;
254 }
255
256 Ok(())
257 }
258
259 pub async fn global_watermark(&self) -> Watermark {
261 *self.global_watermark.read().await
262 }
263
264 pub async fn source_watermark(&self, source_id: &str) -> Option<Watermark> {
266 self.source_watermarks.read().await.get(source_id).copied()
267 }
268
269 pub async fn remove_source(&self, source_id: &str) -> Result<()> {
271 let mut watermarks = self.source_watermarks.write().await;
272 watermarks.remove(source_id);
273
274 let min_watermark = watermarks
275 .values()
276 .min()
277 .copied()
278 .unwrap_or(Watermark::max());
279
280 let mut global = self.global_watermark.write().await;
281 *global = min_watermark;
282
283 Ok(())
284 }
285}
286
287impl Default for MultiSourceWatermarkManager {
288 fn default() -> Self {
289 Self::new()
290 }
291}
292
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_watermark_creation() {
        let instant = Utc::now();
        assert_eq!(Watermark::new(instant).timestamp, instant);
    }

    #[test]
    fn test_watermark_ordering() {
        let earlier = Watermark::new(Utc::now());
        let later = Watermark::new(Utc::now() + Duration::seconds(10));

        assert!(earlier < later);
        assert!(later > earlier);
    }

    #[tokio::test]
    async fn test_periodic_watermark_generator() {
        let mut generator = PeriodicWatermarkGenerator::new(WatermarkConfig::default());
        let element = StreamElement::new(vec![1, 2, 3], Utc::now());

        // Events never emit on their own; the first periodic tick after one should.
        generator.on_event(&element);
        assert!(generator.on_periodic_emit().is_some());
    }

    #[tokio::test]
    async fn test_multi_source_watermark_manager() {
        let manager = MultiSourceWatermarkManager::new();
        let slow = Watermark::new(Utc::now());
        let fast = Watermark::new(Utc::now() + Duration::seconds(10));

        let reports = [
            ("source1", slow, "Test watermark update for source1 should succeed"),
            ("source2", fast, "Test watermark update for source2 should succeed"),
        ];
        for (id, wm, why) in reports {
            manager
                .update_source_watermark(id.to_string(), wm)
                .await
                .expect(why);
        }

        // The global watermark follows the slowest source.
        assert_eq!(manager.global_watermark().await, slow);
    }

    #[tokio::test]
    async fn test_remove_source_watermark() {
        let manager = MultiSourceWatermarkManager::new();
        let slow = Watermark::new(Utc::now());
        let fast = Watermark::new(Utc::now() + Duration::seconds(10));

        let reports = [
            ("source1", slow, "Test watermark update for source1 should succeed"),
            ("source2", fast, "Test watermark update for source2 should succeed"),
        ];
        for (id, wm, why) in reports {
            manager
                .update_source_watermark(id.to_string(), wm)
                .await
                .expect(why);
        }

        manager
            .remove_source("source1")
            .await
            .expect("Test source removal should succeed");

        // With the slow source gone, the remaining source defines the global watermark.
        assert_eq!(manager.global_watermark().await, fast);
    }
}
369}