laminar_sql/datafusion/
watermark_udf.rs1use std::any::Any;
11use std::hash::{Hash, Hasher};
12use std::sync::atomic::{AtomicI64, Ordering};
13use std::sync::Arc;
14
15use arrow::datatypes::{DataType, TimeUnit};
16use datafusion_common::{Result, ScalarValue};
17use datafusion_expr::{
18 ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility,
19};
20
/// Sentinel stored in the shared watermark atomic while no watermark has
/// been published yet.
///
/// [`WatermarkUdf::unset`] seeds its atomic with this value, and the UDF
/// reports SQL `NULL` for it (as it does for any other negative value).
pub const NO_WATERMARK: i64 = -1;
24
25#[derive(Debug)]
34pub struct WatermarkUdf {
35 signature: Signature,
36 watermark_ms: Arc<AtomicI64>,
37}
38
39impl WatermarkUdf {
40 pub fn new(watermark_ms: Arc<AtomicI64>) -> Self {
47 Self {
48 signature: Signature::new(TypeSignature::Nullary, Volatility::Volatile),
49 watermark_ms,
50 }
51 }
52
53 #[must_use]
55 pub fn unset() -> Self {
56 Self::new(Arc::new(AtomicI64::new(NO_WATERMARK)))
57 }
58
59 #[must_use]
61 pub fn watermark_ref(&self) -> &Arc<AtomicI64> {
62 &self.watermark_ms
63 }
64}
65
66impl PartialEq for WatermarkUdf {
67 fn eq(&self, _other: &Self) -> bool {
68 true }
70}
71
72impl Eq for WatermarkUdf {}
73
74impl Hash for WatermarkUdf {
75 fn hash<H: Hasher>(&self, state: &mut H) {
76 "watermark".hash(state);
77 }
78}
79
80impl ScalarUDFImpl for WatermarkUdf {
81 fn as_any(&self) -> &dyn Any {
82 self
83 }
84
85 fn name(&self) -> &'static str {
86 "watermark"
87 }
88
89 fn signature(&self) -> &Signature {
90 &self.signature
91 }
92
93 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
94 Ok(DataType::Timestamp(TimeUnit::Millisecond, None))
95 }
96
97 fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
98 let wm = self.watermark_ms.load(Ordering::Relaxed);
99 if wm < 0 {
100 Ok(ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
101 None, None,
102 )))
103 } else {
104 Ok(ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
105 Some(wm),
106 None,
107 )))
108 }
109 }
110}
111
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::datatypes::TimeUnit;
    use arrow_schema::Field;
    use datafusion_common::config::ConfigOptions;
    use datafusion_expr::ScalarUDF;

    /// Builds the argument bundle for a zero-argument, single-row invocation.
    fn make_args() -> ScalarFunctionArgs {
        let return_field = Arc::new(Field::new(
            "output",
            DataType::Timestamp(TimeUnit::Millisecond, None),
            true,
        ));
        let config_options = Arc::new(ConfigOptions::default());
        ScalarFunctionArgs {
            args: vec![],
            arg_fields: vec![],
            number_rows: 1,
            return_field,
            config_options,
        }
    }

    /// Invokes the UDF once with fresh arguments and unwraps the outcome.
    fn evaluate(udf: &WatermarkUdf) -> ColumnarValue {
        udf.invoke_with_args(make_args()).unwrap()
    }

    /// An unset UDF must evaluate to a NULL timestamp.
    #[test]
    fn test_watermark_default_null() {
        match evaluate(&WatermarkUdf::unset()) {
            ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(None, _)) => {}
            other => panic!("Expected NULL when no watermark set, got: {other:?}"),
        }
    }

    /// A UDF built over a pre-populated atomic returns that exact value.
    #[test]
    fn test_watermark_fixed_value() {
        let shared = Arc::new(AtomicI64::new(1_000_000));
        let udf = WatermarkUdf::new(Arc::clone(&shared));

        match evaluate(&udf) {
            ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(Some(v), _)) => {
                assert_eq!(v, 1_000_000);
            }
            other => panic!("Expected TimestampMillisecond(1_000_000), got: {other:?}"),
        }
    }

    /// Storing into the shared atomic is visible to the next invocation.
    #[test]
    fn test_watermark_atomic_update() {
        let shared = Arc::new(AtomicI64::new(NO_WATERMARK));
        let udf = WatermarkUdf::new(Arc::clone(&shared));

        // Before any store: NULL.
        let result = evaluate(&udf);
        assert!(matches!(
            result,
            ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(None, _))
        ));

        // After publishing a watermark: the stored value.
        shared.store(500_000, Ordering::Relaxed);
        match evaluate(&udf) {
            ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(Some(v), _)) => {
                assert_eq!(v, 500_000);
            }
            other => panic!("Expected updated watermark, got: {other:?}"),
        }
    }

    /// Wrapping the impl in a `ScalarUDF` keeps the `watermark` name.
    #[test]
    fn test_watermark_registration() {
        let registered = ScalarUDF::new_from_impl(WatermarkUdf::unset());
        assert_eq!(registered.name(), "watermark");
    }
}