laminar_sql/datafusion/
proctime_udf.rs1use std::any::Any;
15use std::hash::{Hash, Hasher};
16use std::time::{SystemTime, UNIX_EPOCH};
17
18use arrow::datatypes::{DataType, TimeUnit};
19use datafusion_common::{Result, ScalarValue};
20use datafusion_expr::{
21 ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility,
22};
23
24#[derive(Debug)]
30pub struct ProcTimeUdf {
31 signature: Signature,
32}
33
34impl ProcTimeUdf {
35 #[must_use]
37 pub fn new() -> Self {
38 Self {
39 signature: Signature::new(TypeSignature::Nullary, Volatility::Volatile),
40 }
41 }
42}
43
44impl Default for ProcTimeUdf {
45 fn default() -> Self {
46 Self::new()
47 }
48}
49
50impl PartialEq for ProcTimeUdf {
51 fn eq(&self, _other: &Self) -> bool {
52 true
53 }
54}
55
56impl Eq for ProcTimeUdf {}
57
58impl Hash for ProcTimeUdf {
59 fn hash<H: Hasher>(&self, state: &mut H) {
60 "proctime".hash(state);
61 }
62}
63
64impl ScalarUDFImpl for ProcTimeUdf {
65 fn as_any(&self) -> &dyn Any {
66 self
67 }
68
69 fn name(&self) -> &'static str {
70 "proctime"
71 }
72
73 fn signature(&self) -> &Signature {
74 &self.signature
75 }
76
77 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
78 Ok(DataType::Timestamp(TimeUnit::Millisecond, None))
79 }
80
81 #[allow(clippy::cast_possible_truncation)]
82 fn invoke_with_args(&self, _args: ScalarFunctionArgs) -> Result<ColumnarValue> {
83 let now_ms = SystemTime::now()
84 .duration_since(UNIX_EPOCH)
85 .unwrap_or_default()
86 .as_millis() as i64;
87 Ok(ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(
88 Some(now_ms),
89 None,
90 )))
91 }
92}
93
94#[cfg(test)]
95mod tests {
96 use super::*;
97 use std::sync::Arc;
98
99 use arrow_schema::Field;
100 use datafusion_common::config::ConfigOptions;
101 use datafusion_expr::ScalarUDF;
102
103 fn make_args() -> ScalarFunctionArgs {
104 ScalarFunctionArgs {
105 args: vec![],
106 arg_fields: vec![],
107 number_rows: 1,
108 return_field: Arc::new(Field::new(
109 "output",
110 DataType::Timestamp(TimeUnit::Millisecond, None),
111 true,
112 )),
113 config_options: Arc::new(ConfigOptions::default()),
114 }
115 }
116
117 #[test]
118 fn test_proctime_returns_timestamp() {
119 let udf = ProcTimeUdf::new();
120 let result = udf.invoke_with_args(make_args()).unwrap();
121 match result {
122 ColumnarValue::Scalar(ScalarValue::TimestampMillisecond(Some(v), _)) => {
123 assert!(v > 1_577_836_800_000, "timestamp too old: {v}");
125 }
126 other => panic!("Expected TimestampMillisecond, got: {other:?}"),
127 }
128 }
129
130 #[test]
131 fn test_proctime_registration() {
132 let udf = ScalarUDF::new_from_impl(ProcTimeUdf::new());
133 assert_eq!(udf.name(), "proctime");
134 }
135
136 #[test]
137 fn test_proctime_volatile() {
138 let udf = ProcTimeUdf::new();
139 assert_eq!(udf.signature().volatility, Volatility::Volatile);
140 }
141
142 #[test]
143 fn test_proctime_return_type() {
144 let udf = ProcTimeUdf::new();
145 let rt = udf.return_type(&[]).unwrap();
146 assert_eq!(rt, DataType::Timestamp(TimeUnit::Millisecond, None));
147 }
148}