Skip to main content

laminar_sql/datafusion/
watermark_udf.rs

//! Watermark UDF for `DataFusion` integration
//!
//! Provides a `watermark()` scalar function that returns the current
//! watermark timestamp published by Ring 0 through a shared `Arc<AtomicI64>`.
//!
//! The watermark is the event-time boundary below which all events are
//! assumed to have already arrived. Queries can use
//! `WHERE event_time > watermark()` to filter out stale data. While no
//! watermark has been published, `watermark()` returns `NULL`, so such a
//! predicate matches no rows.
9
10use std::any::Any;
11use std::hash::{Hash, Hasher};
12use std::sync::atomic::{AtomicI64, Ordering};
13use std::sync::Arc;
14
15use arrow::datatypes::{DataType, TimeUnit};
16use datafusion_common::{Result, ScalarValue};
17use datafusion_expr::{
18    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility,
19};
20
/// Sentinel meaning "no watermark has been published yet".
///
/// Storing this value in the shared atomic makes `watermark()` evaluate to
/// SQL `NULL`. Any other negative value is treated the same way.
pub const NO_WATERMARK: i64 = -1;
24
25/// Scalar UDF that returns the current watermark timestamp.
26///
27/// The watermark is stored in a shared `Arc<AtomicI64>` that Ring 0
28/// updates as events are processed. This UDF reads it with relaxed
29/// ordering (appropriate since watermarks are monotonically advancing
30/// and a slightly stale read is acceptable).
31///
32/// Returns `NULL` when no watermark has been set (value < 0).
33#[derive(Debug)]
34pub struct WatermarkUdf {
35    signature: Signature,
36    watermark_ms: Arc<AtomicI64>,
37}
38
39impl WatermarkUdf {
40    /// Creates a new watermark UDF backed by the given atomic value.
41    ///
42    /// # Arguments
43    ///
44    /// * `watermark_ms` - Shared atomic holding the current watermark
45    ///   in milliseconds since epoch. Values < 0 mean "no watermark".
46    pub fn new(watermark_ms: Arc<AtomicI64>) -> Self {
47        Self {
48            signature: Signature::new(TypeSignature::Nullary, Volatility::Volatile),
49            watermark_ms,
50        }
51    }
52
53    /// Creates a watermark UDF that always returns `NULL`.
54    #[must_use]
55    pub fn unset() -> Self {
56        Self::new(Arc::new(AtomicI64::new(NO_WATERMARK)))
57    }
58
59    /// Returns a reference to the underlying atomic watermark value.
60    #[must_use]
61    pub fn watermark_ref(&self) -> &Arc<AtomicI64> {
62        &self.watermark_ms
63    }
64}
65
66impl PartialEq for WatermarkUdf {
67    fn eq(&self, _other: &Self) -> bool {
68        true // All watermark UDF instances serve the same purpose
69    }
70}
71
72impl Eq for WatermarkUdf {}
73
74impl Hash for WatermarkUdf {
75    fn hash<H: Hasher>(&self, state: &mut H) {
76        "watermark".hash(state);
77    }
78}
79
80impl ScalarUDFImpl for WatermarkUdf {
81    fn as_any(&self) -> &dyn Any {
82        self
83    }
84
85    fn name(&self) -> &'static str {
86        "watermark"
87    }
88
89    fn signature(&self) -> &Signature {
90        &self.signature
91    }
92
93    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
94        Ok(DataType::Timestamp(TimeUnit::Millisecond, None))
95    }
96
97    fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
98        let wm = self.watermark_ms.load(Ordering::Relaxed);
99        if wm < 0 {
100            Ok(ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
101                None, None,
102            )))
103        } else {
104            Ok(ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
105                Some(wm),
106                None,
107            )))
108        }
109    }
110}
111
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::datatypes::TimeUnit;
    use arrow_schema::Field;
    use datafusion_common::config::ConfigOptions;
    use datafusion_expr::ScalarUDF;

    /// Builds arguments for a nullary call evaluated over a single row.
    fn make_args() -> ScalarFunctionArgs {
        ScalarFunctionArgs {
            args: vec![],
            arg_fields: vec![],
            number_rows: 1,
            return_field: Arc::new(Field::new(
                "output",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                true,
            )),
            config_options: Arc::new(ConfigOptions::default()),
        }
    }

    #[test]
    fn test_watermark_default_null() {
        let udf = WatermarkUdf::unset();
        let result = udf.invoke_with_args(make_args()).unwrap();
        match result {
            ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(None, _)) => {}
            other => panic!("Expected NULL when no watermark set, got: {other:?}"),
        }
    }

    #[test]
    fn test_watermark_fixed_value() {
        let wm = Arc::new(AtomicI64::new(1_000_000));
        let udf = WatermarkUdf::new(Arc::clone(&wm));

        let result = udf.invoke_with_args(make_args()).unwrap();
        match result {
            ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(Some(v), _)) => {
                assert_eq!(v, 1_000_000);
            }
            other => panic!("Expected TimestampMillisecond(1_000_000), got: {other:?}"),
        }
    }

    #[test]
    fn test_watermark_atomic_update() {
        let wm = Arc::new(AtomicI64::new(NO_WATERMARK));
        let udf = WatermarkUdf::new(Arc::clone(&wm));

        // Initially NULL
        let result = udf.invoke_with_args(make_args()).unwrap();
        assert!(matches!(
            result,
            ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(None, _))
        ));

        // Update watermark
        wm.store(500_000, Ordering::Relaxed);
        let result = udf.invoke_with_args(make_args()).unwrap();
        match result {
            ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(Some(v), _)) => {
                assert_eq!(v, 500_000);
            }
            other => panic!("Expected updated watermark, got: {other:?}"),
        }
    }

    #[test]
    fn test_watermark_any_negative_is_null() {
        // The contract is "negative means unset", not only the -1 sentinel.
        let udf = WatermarkUdf::new(Arc::new(AtomicI64::new(-42)));
        let result = udf.invoke_with_args(make_args()).unwrap();
        assert!(
            matches!(
                result,
                ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(None, _))
            ),
            "Expected NULL for a negative watermark, got: {result:?}"
        );
    }

    #[test]
    fn test_watermark_zero_is_valid() {
        // 0 ms (the Unix epoch) is a real watermark and must not read as unset.
        let udf = WatermarkUdf::new(Arc::new(AtomicI64::new(0)));
        let result = udf.invoke_with_args(make_args()).unwrap();
        match result {
            ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(Some(v), _)) => {
                assert_eq!(v, 0);
            }
            other => panic!("Expected TimestampMillisecond(0), got: {other:?}"),
        }
    }

    #[test]
    fn test_watermark_ref_shares_state() {
        // Values stored through `watermark_ref` must be visible to the UDF.
        let udf = WatermarkUdf::unset();
        udf.watermark_ref().store(42, Ordering::Relaxed);
        let result = udf.invoke_with_args(make_args()).unwrap();
        match result {
            ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(Some(v), _)) => {
                assert_eq!(v, 42);
            }
            other => panic!("Expected TimestampMillisecond(42), got: {other:?}"),
        }
    }

    #[test]
    fn test_watermark_return_type() {
        let udf = WatermarkUdf::unset();
        assert_eq!(
            udf.return_type(&[]).unwrap(),
            DataType::Timestamp(TimeUnit::Millisecond, None)
        );
    }

    #[test]
    fn test_watermark_registration() {
        let udf = ScalarUDF::new_from_impl(WatermarkUdf::unset());
        assert_eq!(udf.name(), "watermark");
    }
}