Skip to main content

oxigdal_streaming/windowing/
watermark.rs

1//! Watermark generation for event-time processing.
2
3use crate::core::stream::StreamElement;
4use crate::error::Result;
5use chrono::{DateTime, Duration, Utc};
6use serde::{Deserialize, Serialize};
7use std::collections::BTreeMap;
8use std::sync::Arc;
9use tokio::sync::RwLock;
10
/// A watermark representing event-time progress.
///
/// A watermark at time `t` is a claim that the stream has (presumably) delivered
/// all events with an event time at or before `t`. Ordering is derived from the
/// single `timestamp` field, so comparing two watermarks compares their times.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct Watermark {
    /// Event-time position of this watermark, in UTC.
    pub timestamp: DateTime<Utc>,
}
17
18impl Watermark {
19    /// Create a new watermark.
20    pub fn new(timestamp: DateTime<Utc>) -> Self {
21        Self { timestamp }
22    }
23
24    /// Get the minimum watermark (beginning of time).
25    pub fn min() -> Self {
26        Self {
27            timestamp: DateTime::from_timestamp(0, 0).unwrap_or_else(Utc::now),
28        }
29    }
30
31    /// Get the maximum watermark (end of time).
32    pub fn max() -> Self {
33        Self {
34            timestamp: DateTime::from_timestamp(i64::MAX / 1000, 0).unwrap_or_else(Utc::now),
35        }
36    }
37}
38
/// Strategy for generating watermarks.
///
/// Selected through `WatermarkConfig::strategy`. `PunctuatedWatermarkGenerator`
/// does not consult this value; it always applies the bounded-delay formula when
/// a marker element arrives.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum WatermarkStrategy {
    /// Ascending timestamps (watermark = max observed timestamp).
    Ascending,

    /// Bounded out-of-orderness (watermark = max timestamp - max delay).
    BoundedOutOfOrderness,

    /// Periodic watermarks.
    Periodic,

    /// Punctuated watermarks (based on special marker elements in the stream).
    Punctuated,
}
54
/// Configuration for watermark generation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WatermarkConfig {
    /// Watermark strategy.
    pub strategy: WatermarkStrategy,

    /// Maximum out-of-orderness: how far the watermark trails the largest
    /// observed event time under bounded-delay generation.
    /// NOTE(review): a negative value would place the watermark ahead of the
    /// observed events; nothing in this module validates it.
    pub max_out_of_orderness: Duration,

    /// Minimum wall-clock time between two emitted watermarks (periodic generation).
    pub interval: Duration,

    /// Idle timeout (intended: emit a watermark if no data arrives for this duration).
    /// NOTE(review): none of the generators in this module currently read this field.
    pub idle_timeout: Option<Duration>,
}
70
71impl Default for WatermarkConfig {
72    fn default() -> Self {
73        Self {
74            strategy: WatermarkStrategy::BoundedOutOfOrderness,
75            max_out_of_orderness: Duration::seconds(5),
76            interval: Duration::seconds(1),
77            idle_timeout: Some(Duration::seconds(10)),
78        }
79    }
80}
81
/// Generates watermarks for a stream.
///
/// The `Send + Sync` bound lets implementations be moved to and shared between threads.
pub trait WatermarkGenerator: Send + Sync {
    /// Observe an element; returns `Some` when observing it produced a new watermark.
    fn on_event(&mut self, element: &StreamElement) -> Option<Watermark>;

    /// Timer hook, presumably invoked periodically by the caller; returns `Some`
    /// when a new watermark is emitted.
    fn on_periodic_emit(&mut self) -> Option<Watermark>;

    /// The generator's current watermark.
    fn current_watermark(&self) -> Watermark;
}
93
/// Periodic watermark generator.
///
/// Tracks the largest event time seen through `on_event` and derives a
/// watermark from it in `on_periodic_emit`, at most once per `config.interval`.
pub struct PeriodicWatermarkGenerator {
    /// Generation settings (strategy, delay, interval).
    config: WatermarkConfig,
    /// Largest event time observed so far; `None` until the first event.
    max_timestamp: Option<DateTime<Utc>>,
    /// Last emitted watermark (initialised to `Watermark::min()`).
    current_watermark: Watermark,
    /// Wall-clock time of the last successful emission; `None` until the first one.
    last_emit: Option<DateTime<Utc>>,
}
101
102impl PeriodicWatermarkGenerator {
103    /// Create a new periodic watermark generator.
104    pub fn new(config: WatermarkConfig) -> Self {
105        Self {
106            config,
107            max_timestamp: None,
108            current_watermark: Watermark::min(),
109            last_emit: None,
110        }
111    }
112}
113
114impl WatermarkGenerator for PeriodicWatermarkGenerator {
115    fn on_event(&mut self, element: &StreamElement) -> Option<Watermark> {
116        if let Some(max_ts) = self.max_timestamp {
117            if element.event_time > max_ts {
118                self.max_timestamp = Some(element.event_time);
119            }
120        } else {
121            self.max_timestamp = Some(element.event_time);
122        }
123
124        None
125    }
126
127    fn on_periodic_emit(&mut self) -> Option<Watermark> {
128        let now = Utc::now();
129        let should_emit = if let Some(last) = self.last_emit {
130            now - last >= self.config.interval
131        } else {
132            true
133        };
134
135        if should_emit {
136            if let Some(max_ts) = self.max_timestamp {
137                let new_watermark = match self.config.strategy {
138                    WatermarkStrategy::Ascending => Watermark::new(max_ts),
139                    WatermarkStrategy::BoundedOutOfOrderness => {
140                        Watermark::new(max_ts - self.config.max_out_of_orderness)
141                    }
142                    _ => self.current_watermark,
143                };
144
145                if new_watermark > self.current_watermark {
146                    self.current_watermark = new_watermark;
147                    self.last_emit = Some(now);
148                    return Some(new_watermark);
149                }
150            }
151        }
152
153        None
154    }
155
156    fn current_watermark(&self) -> Watermark {
157        self.current_watermark
158    }
159}
160
/// Punctuated watermark generator.
///
/// Emits watermarks only from `on_event`, when an element's metadata marks it as
/// a watermark marker; `on_periodic_emit` never emits.
pub struct PunctuatedWatermarkGenerator {
    /// Generation settings; only `max_out_of_orderness` is read.
    config: WatermarkConfig,
    /// Last emitted watermark (initialised to `Watermark::min()`).
    current_watermark: Watermark,
    /// Largest event time observed so far; `None` until the first event.
    max_timestamp: Option<DateTime<Utc>>,
}
167
168impl PunctuatedWatermarkGenerator {
169    /// Create a new punctuated watermark generator.
170    pub fn new(config: WatermarkConfig) -> Self {
171        Self {
172            config,
173            current_watermark: Watermark::min(),
174            max_timestamp: None,
175        }
176    }
177
178    /// Check if an element should trigger a watermark.
179    fn should_emit_watermark(&self, element: &StreamElement) -> bool {
180        if let Some(marker) = element.metadata.attributes.get("watermark_marker") {
181            marker == "true"
182        } else {
183            false
184        }
185    }
186}
187
188impl WatermarkGenerator for PunctuatedWatermarkGenerator {
189    fn on_event(&mut self, element: &StreamElement) -> Option<Watermark> {
190        if let Some(max_ts) = self.max_timestamp {
191            if element.event_time > max_ts {
192                self.max_timestamp = Some(element.event_time);
193            }
194        } else {
195            self.max_timestamp = Some(element.event_time);
196        }
197
198        if self.should_emit_watermark(element) {
199            if let Some(max_ts) = self.max_timestamp {
200                let new_watermark = Watermark::new(max_ts - self.config.max_out_of_orderness);
201
202                if new_watermark > self.current_watermark {
203                    self.current_watermark = new_watermark;
204                    return Some(new_watermark);
205                }
206            }
207        }
208
209        None
210    }
211
212    fn on_periodic_emit(&mut self) -> Option<Watermark> {
213        None
214    }
215
216    fn current_watermark(&self) -> Watermark {
217        self.current_watermark
218    }
219}
220
/// Multi-source watermark manager.
///
/// Tracks the latest watermark reported by each source, plus a global watermark
/// derived from the minimum across sources. Both pieces of state sit behind
/// `tokio::sync::RwLock`s, so all methods take `&self` and are `async`.
pub struct MultiSourceWatermarkManager {
    /// Latest watermark per source id.
    source_watermarks: Arc<RwLock<BTreeMap<String, Watermark>>>,
    /// Combined watermark across all sources.
    global_watermark: Arc<RwLock<Watermark>>,
}
226
227impl MultiSourceWatermarkManager {
228    /// Create a new multi-source watermark manager.
229    pub fn new() -> Self {
230        Self {
231            source_watermarks: Arc::new(RwLock::new(BTreeMap::new())),
232            global_watermark: Arc::new(RwLock::new(Watermark::min())),
233        }
234    }
235
236    /// Update watermark for a source.
237    pub async fn update_source_watermark(
238        &self,
239        source_id: String,
240        watermark: Watermark,
241    ) -> Result<()> {
242        let mut watermarks = self.source_watermarks.write().await;
243        watermarks.insert(source_id, watermark);
244
245        let min_watermark = watermarks
246            .values()
247            .min()
248            .copied()
249            .unwrap_or(Watermark::min());
250
251        let mut global = self.global_watermark.write().await;
252        if min_watermark > *global {
253            *global = min_watermark;
254        }
255
256        Ok(())
257    }
258
259    /// Get the global watermark (minimum of all source watermarks).
260    pub async fn global_watermark(&self) -> Watermark {
261        *self.global_watermark.read().await
262    }
263
264    /// Get watermark for a specific source.
265    pub async fn source_watermark(&self, source_id: &str) -> Option<Watermark> {
266        self.source_watermarks.read().await.get(source_id).copied()
267    }
268
269    /// Remove a source.
270    pub async fn remove_source(&self, source_id: &str) -> Result<()> {
271        let mut watermarks = self.source_watermarks.write().await;
272        watermarks.remove(source_id);
273
274        let min_watermark = watermarks
275            .values()
276            .min()
277            .copied()
278            .unwrap_or(Watermark::max());
279
280        let mut global = self.global_watermark.write().await;
281        *global = min_watermark;
282
283        Ok(())
284    }
285}
286
287impl Default for MultiSourceWatermarkManager {
288    fn default() -> Self {
289        Self::new()
290    }
291}
292
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a manager where `source1` reports "now" and `source2` reports ten
    /// seconds later; returns the manager and both watermarks (early, late).
    async fn manager_with_two_sources() -> (MultiSourceWatermarkManager, Watermark, Watermark) {
        let manager = MultiSourceWatermarkManager::new();

        let early = Watermark::new(Utc::now());
        let late = Watermark::new(Utc::now() + Duration::seconds(10));

        manager
            .update_source_watermark("source1".to_string(), early)
            .await
            .expect("Test watermark update for source1 should succeed");
        manager
            .update_source_watermark("source2".to_string(), late)
            .await
            .expect("Test watermark update for source2 should succeed");

        (manager, early, late)
    }

    #[test]
    fn test_watermark_creation() {
        let instant = Utc::now();
        assert_eq!(Watermark::new(instant).timestamp, instant);
    }

    #[test]
    fn test_watermark_ordering() {
        let earlier = Watermark::new(Utc::now());
        let later = Watermark::new(Utc::now() + Duration::seconds(10));

        assert!(earlier < later);
        assert!(later > earlier);
    }

    #[tokio::test]
    async fn test_periodic_watermark_generator() {
        let mut periodic = PeriodicWatermarkGenerator::new(WatermarkConfig::default());

        let element = StreamElement::new(vec![1, 2, 3], Utc::now());
        periodic.on_event(&element);

        assert!(periodic.on_periodic_emit().is_some());
    }

    #[tokio::test]
    async fn test_multi_source_watermark_manager() {
        let (manager, early, _late) = manager_with_two_sources().await;

        assert_eq!(manager.global_watermark().await, early);
    }

    #[tokio::test]
    async fn test_remove_source_watermark() {
        let (manager, _early, late) = manager_with_two_sources().await;

        manager
            .remove_source("source1")
            .await
            .expect("Test source removal should succeed");

        assert_eq!(manager.global_watermark().await, late);
    }
}
369}