// laminar_sql/datafusion/format_bridge_udf.rs
//! Format bridge scalar UDFs (F-SCHEMA-014).
//!
//! SQL-callable functions for inline format conversion within queries:
//!
//! - [`ParseEpochUdf`] — `parse_epoch(number, unit) -> timestamp`
//! - [`ParseTimestampUdf`] — `parse_timestamp(string, format) -> timestamp`
//! - [`ToJsonUdf`] — `to_json(value) -> text`
//! - [`FromJsonUdf`] — `from_json(string) -> jsonb`
9
use std::any::Any;
use std::hash::{Hash, Hasher};
use std::sync::Arc;

use arrow::datatypes::{DataType, TimeUnit};
use arrow_array::{
    builder::{LargeBinaryBuilder, StringBuilder, TimestampMicrosecondBuilder},
    Array, ArrayRef, LargeBinaryArray, StringArray, TimestampMicrosecondArray,
};
use datafusion_common::Result;
use datafusion_expr::{
    ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility,
};

use super::json_types;
25
26// ── Helpers ──────────────────────────────────────────────────────
27
28/// Expand all args to arrays of the same length.
29fn expand_args(args: &[ColumnarValue]) -> Result<Vec<ArrayRef>> {
30    let len = args
31        .iter()
32        .find_map(|a| match a {
33            ColumnarValue::Array(arr) => Some(arr.len()),
34            ColumnarValue::Scalar(_) => None,
35        })
36        .unwrap_or(1);
37
38    args.iter()
39        .map(|a| match a {
40            ColumnarValue::Array(arr) => Ok(Arc::clone(arr)),
41            ColumnarValue::Scalar(s) => s.to_array_of_size(len),
42        })
43        .collect()
44}
45
46// ══════════════════════════════════════════════════════════════════
47// parse_epoch(number, unit) -> timestamp
48// ══════════════════════════════════════════════════════════════════
49
50/// `parse_epoch(number, unit) -> timestamp`
51///
52/// Converts an epoch number to a timestamp. The unit parameter specifies
53/// the time unit: 'seconds', 'milliseconds', 'microseconds', 'nanoseconds'.
54///
55/// Returns `TimestampMicrosecond` (microsecond precision, no timezone).
56/// Executes in Ring 0 — pure arithmetic, no allocation.
57///
58/// # Examples
59///
60/// ```sql
61/// SELECT parse_epoch(1708528800, 'seconds') AS ts;
62/// SELECT parse_epoch(1708528800000, 'milliseconds') AS ts;
63/// ```
64#[derive(Debug)]
65pub struct ParseEpochUdf {
66    signature: Signature,
67}
68
69impl ParseEpochUdf {
70    /// Creates a new `parse_epoch` UDF.
71    #[must_use]
72    pub fn new() -> Self {
73        Self {
74            signature: Signature::new(
75                TypeSignature::Exact(vec![DataType::Int64, DataType::Utf8]),
76                Volatility::Immutable,
77            ),
78        }
79    }
80}
81
82impl Default for ParseEpochUdf {
83    fn default() -> Self {
84        Self::new()
85    }
86}
87
88impl PartialEq for ParseEpochUdf {
89    fn eq(&self, _other: &Self) -> bool {
90        true
91    }
92}
93
94impl Eq for ParseEpochUdf {}
95
96impl Hash for ParseEpochUdf {
97    fn hash<H: Hasher>(&self, state: &mut H) {
98        "parse_epoch".hash(state);
99    }
100}
101
102impl ScalarUDFImpl for ParseEpochUdf {
103    fn as_any(&self) -> &dyn Any {
104        self
105    }
106
107    fn name(&self) -> &'static str {
108        "parse_epoch"
109    }
110
111    fn signature(&self) -> &Signature {
112        &self.signature
113    }
114
115    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
116        Ok(DataType::Timestamp(TimeUnit::Microsecond, None))
117    }
118
119    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
120        let expanded = expand_args(&args.args)?;
121        let val_arr = expanded[0]
122            .as_any()
123            .downcast_ref::<arrow_array::Int64Array>()
124            .ok_or_else(|| {
125                datafusion_common::DataFusionError::Internal(
126                    "parse_epoch: first arg must be Int64".into(),
127                )
128            })?;
129        let unit_arr = expanded[1]
130            .as_any()
131            .downcast_ref::<StringArray>()
132            .ok_or_else(|| {
133                datafusion_common::DataFusionError::Internal(
134                    "parse_epoch: second arg must be Utf8".into(),
135                )
136            })?;
137
138        let mut builder = TimestampMicrosecondBuilder::with_capacity(val_arr.len());
139        for i in 0..val_arr.len() {
140            if val_arr.is_null(i) || unit_arr.is_null(i) {
141                builder.append_null();
142            } else {
143                let value = val_arr.value(i);
144                let unit = unit_arr.value(i);
145                let micros = epoch_to_micros(value, unit)?;
146                builder.append_value(micros);
147            }
148        }
149        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
150    }
151}
152
153/// Convert an epoch value with the given unit string to microseconds.
154fn epoch_to_micros(value: i64, unit: &str) -> Result<i64> {
155    match unit.to_ascii_lowercase().as_str() {
156        "seconds" | "s" => Ok(value.saturating_mul(1_000_000)),
157        "milliseconds" | "ms" => Ok(value.saturating_mul(1_000)),
158        "microseconds" | "us" => Ok(value),
159        "nanoseconds" | "ns" => Ok(value / 1_000),
160        _ => Err(datafusion_common::DataFusionError::Execution(format!(
161            "parse_epoch: invalid unit '{unit}'. \
162             Expected: seconds, milliseconds, microseconds, nanoseconds"
163        ))),
164    }
165}
166
// ══════════════════════════════════════════════════════════════════
// parse_timestamp(string, format) -> timestamp
// ══════════════════════════════════════════════════════════════════
170
171/// `parse_timestamp(string, format) -> timestamp`
172///
173/// Parses a timestamp string using the given chrono format string.
174/// Use `'iso8601'` as a shortcut for ISO 8601 parsing.
175///
176/// Returns `TimestampMicrosecond`.
177///
178/// # Examples
179///
180/// ```sql
181/// SELECT parse_timestamp('2026-02-21 14:30:00', '%Y-%m-%d %H:%M:%S');
182/// SELECT parse_timestamp('2026-02-21T14:30:00Z', 'iso8601');
183/// ```
184#[derive(Debug)]
185pub struct ParseTimestampUdf {
186    signature: Signature,
187}
188
189impl ParseTimestampUdf {
190    /// Creates a new `parse_timestamp` UDF.
191    #[must_use]
192    pub fn new() -> Self {
193        Self {
194            signature: Signature::new(
195                TypeSignature::Exact(vec![DataType::Utf8, DataType::Utf8]),
196                Volatility::Immutable,
197            ),
198        }
199    }
200}
201
202impl Default for ParseTimestampUdf {
203    fn default() -> Self {
204        Self::new()
205    }
206}
207
208impl PartialEq for ParseTimestampUdf {
209    fn eq(&self, _other: &Self) -> bool {
210        true
211    }
212}
213
214impl Eq for ParseTimestampUdf {}
215
216impl Hash for ParseTimestampUdf {
217    fn hash<H: Hasher>(&self, state: &mut H) {
218        "parse_timestamp".hash(state);
219    }
220}
221
222impl ScalarUDFImpl for ParseTimestampUdf {
223    fn as_any(&self) -> &dyn Any {
224        self
225    }
226
227    fn name(&self) -> &'static str {
228        "parse_timestamp"
229    }
230
231    fn signature(&self) -> &Signature {
232        &self.signature
233    }
234
235    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
236        Ok(DataType::Timestamp(TimeUnit::Microsecond, None))
237    }
238
239    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
240        let expanded = expand_args(&args.args)?;
241        let str_arr = expanded[0]
242            .as_any()
243            .downcast_ref::<StringArray>()
244            .ok_or_else(|| {
245                datafusion_common::DataFusionError::Internal(
246                    "parse_timestamp: first arg must be Utf8".into(),
247                )
248            })?;
249        let fmt_arr = expanded[1]
250            .as_any()
251            .downcast_ref::<StringArray>()
252            .ok_or_else(|| {
253                datafusion_common::DataFusionError::Internal(
254                    "parse_timestamp: second arg must be Utf8".into(),
255                )
256            })?;
257
258        let mut builder = TimestampMicrosecondBuilder::with_capacity(str_arr.len());
259        for i in 0..str_arr.len() {
260            if str_arr.is_null(i) || fmt_arr.is_null(i) {
261                builder.append_null();
262            } else {
263                let ts_str = str_arr.value(i);
264                let fmt = fmt_arr.value(i);
265                match parse_ts_string(ts_str, fmt) {
266                    Ok(micros) => builder.append_value(micros),
267                    Err(_) => builder.append_null(),
268                }
269            }
270        }
271        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
272    }
273}
274
275/// Parse a timestamp string with the given format to microseconds since epoch.
276fn parse_ts_string(ts_str: &str, fmt: &str) -> std::result::Result<i64, String> {
277    use chrono::NaiveDateTime;
278
279    if fmt.eq_ignore_ascii_case("iso8601") {
280        // Try RFC 3339 / ISO 8601
281        let dt = ts_str
282            .parse::<chrono::DateTime<chrono::Utc>>()
283            .or_else(|_| {
284                NaiveDateTime::parse_from_str(ts_str, "%Y-%m-%dT%H:%M:%S%.f")
285                    .map(|ndt| ndt.and_utc())
286            })
287            .or_else(|_| {
288                NaiveDateTime::parse_from_str(ts_str, "%Y-%m-%dT%H:%M:%S").map(|ndt| ndt.and_utc())
289            })
290            .map_err(|e| e.to_string())?;
291        return Ok(dt.timestamp_micros());
292    }
293
294    let ndt = NaiveDateTime::parse_from_str(ts_str, fmt).map_err(|e| e.to_string())?;
295    Ok(ndt.and_utc().timestamp_micros())
296}
297
298// ══════════════════════════════════════════════════════════════════
299// to_json(value) -> text
300// ══════════════════════════════════════════════════════════════════
301
302/// `to_json(value) -> text`
303///
304/// Converts any SQL value to a JSON text string.
305/// Works with all Arrow data types — strings, numbers, booleans, structs.
306///
307/// # Examples
308///
309/// ```sql
310/// SELECT to_json(42);           -- Returns: '42'
311/// SELECT to_json('hello');      -- Returns: '"hello"'
312/// ```
313#[derive(Debug)]
314pub struct ToJsonUdf {
315    signature: Signature,
316}
317
318impl ToJsonUdf {
319    /// Creates a new `to_json` UDF.
320    #[must_use]
321    pub fn new() -> Self {
322        Self {
323            signature: Signature::new(TypeSignature::Any(1), Volatility::Immutable),
324        }
325    }
326}
327
328impl Default for ToJsonUdf {
329    fn default() -> Self {
330        Self::new()
331    }
332}
333
334impl PartialEq for ToJsonUdf {
335    fn eq(&self, _other: &Self) -> bool {
336        true
337    }
338}
339
340impl Eq for ToJsonUdf {}
341
342impl Hash for ToJsonUdf {
343    fn hash<H: Hasher>(&self, state: &mut H) {
344        "to_json".hash(state);
345    }
346}
347
348impl ScalarUDFImpl for ToJsonUdf {
349    fn as_any(&self) -> &dyn Any {
350        self
351    }
352
353    fn name(&self) -> &'static str {
354        "to_json"
355    }
356
357    fn signature(&self) -> &Signature {
358        &self.signature
359    }
360
361    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
362        Ok(DataType::Utf8)
363    }
364
365    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
366        let expanded = expand_args(&args.args)?;
367        let arr = &expanded[0];
368        let len = arr.len();
369
370        let mut builder = StringBuilder::with_capacity(len, 256);
371        for row in 0..len {
372            if arr.is_null(row) {
373                builder.append_value("null");
374            } else {
375                let val = arrow_value_to_json(arr, row);
376                builder.append_value(val.to_string());
377            }
378        }
379        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
380    }
381}
382
383/// Convert an Arrow array value at a given row to a `serde_json::Value`.
384fn arrow_value_to_json(arr: &ArrayRef, row: usize) -> serde_json::Value {
385    if arr.is_null(row) {
386        return serde_json::Value::Null;
387    }
388    if let Some(a) = arr.as_any().downcast_ref::<StringArray>() {
389        return serde_json::Value::String(a.value(row).to_owned());
390    }
391    if let Some(a) = arr.as_any().downcast_ref::<arrow_array::Int64Array>() {
392        return serde_json::Value::Number(a.value(row).into());
393    }
394    if let Some(a) = arr.as_any().downcast_ref::<arrow_array::Int32Array>() {
395        return serde_json::Value::Number(i64::from(a.value(row)).into());
396    }
397    if let Some(a) = arr.as_any().downcast_ref::<arrow_array::Float64Array>() {
398        if let Some(n) = serde_json::Number::from_f64(a.value(row)) {
399            return serde_json::Value::Number(n);
400        }
401        return serde_json::Value::Null;
402    }
403    if let Some(a) = arr.as_any().downcast_ref::<arrow_array::BooleanArray>() {
404        return serde_json::Value::Bool(a.value(row));
405    }
406    // JSONB passthrough
407    if let Some(a) = arr.as_any().downcast_ref::<LargeBinaryArray>() {
408        if let Some(text) = json_types::jsonb_to_text(a.value(row)) {
409            if let Ok(val) = serde_json::from_str::<serde_json::Value>(&text) {
410                return val;
411            }
412            return serde_json::Value::String(text);
413        }
414        return serde_json::Value::Null;
415    }
416    // Timestamp types
417    if let Some(a) = arr.as_any().downcast_ref::<TimestampMicrosecondArray>() {
418        let micros = a.value(row);
419        let secs = micros / 1_000_000;
420        let sub_micros = (micros % 1_000_000).unsigned_abs() * 1_000;
421        #[allow(clippy::cast_possible_truncation)]
422        let nsecs = sub_micros as u32;
423        if let Some(dt) = chrono::DateTime::from_timestamp(secs, nsecs) {
424            return serde_json::Value::String(dt.to_rfc3339());
425        }
426        return serde_json::Value::Number(micros.into());
427    }
428    // Fallback via ScalarValue display
429    let sv = datafusion_common::ScalarValue::try_from_array(arr, row).ok();
430    match sv {
431        Some(s) => serde_json::Value::String(s.to_string()),
432        None => serde_json::Value::Null,
433    }
434}
435
436// ══════════════════════════════════════════════════════════════════
437// from_json(string) -> jsonb
438// ══════════════════════════════════════════════════════════════════
439
440/// `from_json(string) -> jsonb`
441///
442/// Parses a JSON string and returns the JSONB binary representation.
443/// Returns NULL for invalid JSON.
444///
445/// # Examples
446///
447/// ```sql
448/// SELECT from_json('{"name": "Alice", "age": 30}');
449/// SELECT json_typeof(from_json('42'));  -- Returns: 'number'
450/// ```
451#[derive(Debug)]
452pub struct FromJsonUdf {
453    signature: Signature,
454}
455
456impl FromJsonUdf {
457    /// Creates a new `from_json` UDF.
458    #[must_use]
459    pub fn new() -> Self {
460        Self {
461            signature: Signature::new(
462                TypeSignature::Exact(vec![DataType::Utf8]),
463                Volatility::Immutable,
464            ),
465        }
466    }
467}
468
469impl Default for FromJsonUdf {
470    fn default() -> Self {
471        Self::new()
472    }
473}
474
475impl PartialEq for FromJsonUdf {
476    fn eq(&self, _other: &Self) -> bool {
477        true
478    }
479}
480
481impl Eq for FromJsonUdf {}
482
483impl Hash for FromJsonUdf {
484    fn hash<H: Hasher>(&self, state: &mut H) {
485        "from_json".hash(state);
486    }
487}
488
489impl ScalarUDFImpl for FromJsonUdf {
490    fn as_any(&self) -> &dyn Any {
491        self
492    }
493
494    fn name(&self) -> &'static str {
495        "from_json"
496    }
497
498    fn signature(&self) -> &Signature {
499        &self.signature
500    }
501
502    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
503        Ok(DataType::LargeBinary) // Returns JSONB
504    }
505
506    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
507        let expanded = expand_args(&args.args)?;
508        let str_arr = expanded[0]
509            .as_any()
510            .downcast_ref::<StringArray>()
511            .ok_or_else(|| {
512                datafusion_common::DataFusionError::Internal("from_json: arg must be Utf8".into())
513            })?;
514
515        let mut builder = LargeBinaryBuilder::with_capacity(str_arr.len(), 256);
516        for i in 0..str_arr.len() {
517            if str_arr.is_null(i) {
518                builder.append_null();
519            } else {
520                let json_str = str_arr.value(i);
521                match serde_json::from_str::<serde_json::Value>(json_str) {
522                    Ok(val) => {
523                        let jsonb = json_types::encode_jsonb(&val);
524                        builder.append_value(&jsonb);
525                    }
526                    Err(_) => builder.append_null(),
527                }
528            }
529        }
530        Ok(ColumnarValue::Array(Arc::new(builder.finish())))
531    }
532}
533
// ══════════════════════════════════════════════════════════════════
// Tests
// ══════════════════════════════════════════════════════════════════

#[cfg(test)]
mod tests {
    use super::*;
    use arrow_schema::Field;
    use datafusion_common::config::ConfigOptions;

    /// Builds `ScalarFunctionArgs` for two-array-argument calls
    /// (parse_epoch / parse_timestamp; both return a microsecond timestamp).
    fn make_args_2(a: ArrayRef, b: ArrayRef) -> ScalarFunctionArgs {
        ScalarFunctionArgs {
            args: vec![ColumnarValue::Array(a), ColumnarValue::Array(b)],
            arg_fields: vec![],
            number_rows: 0,
            return_field: Arc::new(Field::new(
                "output",
                DataType::Timestamp(TimeUnit::Microsecond, None),
                true,
            )),
            config_options: Arc::new(ConfigOptions::default()),
        }
    }

    /// Builds `ScalarFunctionArgs` for single-array-argument calls
    /// (to_json / from_json). The Utf8 return field is nominal metadata;
    /// the UDFs under test do not read it.
    fn make_args_1(a: ArrayRef) -> ScalarFunctionArgs {
        ScalarFunctionArgs {
            args: vec![ColumnarValue::Array(a)],
            arg_fields: vec![],
            number_rows: 0,
            return_field: Arc::new(Field::new("output", DataType::Utf8, true)),
            config_options: Arc::new(ConfigOptions::default()),
        }
    }

    // ── parse_epoch tests ────────────────────────────────────

    #[test]
    fn test_parse_epoch_seconds() {
        let udf = ParseEpochUdf::new();
        let vals = Arc::new(arrow_array::Int64Array::from(vec![1_708_528_800])) as ArrayRef;
        let units = Arc::new(StringArray::from(vec!["seconds"])) as ArrayRef;
        let result = udf.invoke_with_args(make_args_2(vals, units)).unwrap();
        let ColumnarValue::Array(arr) = result else {
            panic!("expected array")
        };
        let ts = arr
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .unwrap();
        assert_eq!(ts.value(0), 1_708_528_800_000_000);
    }

    #[test]
    fn test_parse_epoch_milliseconds() {
        let udf = ParseEpochUdf::new();
        let vals = Arc::new(arrow_array::Int64Array::from(vec![1_708_528_800_000])) as ArrayRef;
        let units = Arc::new(StringArray::from(vec!["milliseconds"])) as ArrayRef;
        let result = udf.invoke_with_args(make_args_2(vals, units)).unwrap();
        let ColumnarValue::Array(arr) = result else {
            panic!("expected array")
        };
        let ts = arr
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .unwrap();
        assert_eq!(ts.value(0), 1_708_528_800_000_000);
    }

    #[test]
    fn test_parse_epoch_microseconds() {
        // Microseconds pass through unchanged.
        let udf = ParseEpochUdf::new();
        let vals = Arc::new(arrow_array::Int64Array::from(vec![1_708_528_800_000_000])) as ArrayRef;
        let units = Arc::new(StringArray::from(vec!["microseconds"])) as ArrayRef;
        let result = udf.invoke_with_args(make_args_2(vals, units)).unwrap();
        let ColumnarValue::Array(arr) = result else {
            panic!("expected array")
        };
        let ts = arr
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .unwrap();
        assert_eq!(ts.value(0), 1_708_528_800_000_000);
    }

    #[test]
    fn test_parse_epoch_nanoseconds() {
        // Nanoseconds are divided down to microsecond precision.
        let udf = ParseEpochUdf::new();
        let vals = Arc::new(arrow_array::Int64Array::from(vec![
            1_708_528_800_000_000_000,
        ])) as ArrayRef;
        let units = Arc::new(StringArray::from(vec!["nanoseconds"])) as ArrayRef;
        let result = udf.invoke_with_args(make_args_2(vals, units)).unwrap();
        let ColumnarValue::Array(arr) = result else {
            panic!("expected array")
        };
        let ts = arr
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .unwrap();
        assert_eq!(ts.value(0), 1_708_528_800_000_000);
    }

    #[test]
    fn test_parse_epoch_short_units() {
        // All short unit aliases ('s', 'ms', 'us', 'ns') map to the same
        // microsecond result as their long forms.
        let udf = ParseEpochUdf::new();
        for (val, unit, expected) in [
            (100i64, "s", 100_000_000i64),
            (100_000, "ms", 100_000_000),
            (100_000_000, "us", 100_000_000),
            (100_000_000_000, "ns", 100_000_000),
        ] {
            let vals = Arc::new(arrow_array::Int64Array::from(vec![val])) as ArrayRef;
            let units = Arc::new(StringArray::from(vec![unit])) as ArrayRef;
            let result = udf.invoke_with_args(make_args_2(vals, units)).unwrap();
            let ColumnarValue::Array(arr) = result else {
                panic!("expected array")
            };
            let ts = arr
                .as_any()
                .downcast_ref::<TimestampMicrosecondArray>()
                .unwrap();
            assert_eq!(ts.value(0), expected, "Failed for unit '{unit}'");
        }
    }

    #[test]
    fn test_parse_epoch_invalid_unit() {
        // An unrecognized unit string fails the whole invocation.
        let udf = ParseEpochUdf::new();
        let vals = Arc::new(arrow_array::Int64Array::from(vec![100])) as ArrayRef;
        let units = Arc::new(StringArray::from(vec!["invalid"])) as ArrayRef;
        assert!(udf.invoke_with_args(make_args_2(vals, units)).is_err());
    }

    #[test]
    fn test_parse_epoch_null_handling() {
        // A NULL value row yields a NULL timestamp; other rows are unaffected.
        let udf = ParseEpochUdf::new();
        let vals = Arc::new(arrow_array::Int64Array::from(vec![
            Some(100),
            None,
            Some(200),
        ])) as ArrayRef;
        let units = Arc::new(StringArray::from(vec![
            Some("seconds"),
            Some("seconds"),
            Some("seconds"),
        ])) as ArrayRef;
        let result = udf.invoke_with_args(make_args_2(vals, units)).unwrap();
        let ColumnarValue::Array(arr) = result else {
            panic!("expected array")
        };
        let ts = arr
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .unwrap();
        assert!(!ts.is_null(0));
        assert!(ts.is_null(1));
        assert!(!ts.is_null(2));
    }

    // ── parse_timestamp tests ────────────────────────────────

    #[test]
    fn test_parse_timestamp_custom_format() {
        let udf = ParseTimestampUdf::new();
        let strs = Arc::new(StringArray::from(vec!["2026-02-21 14:30:00"])) as ArrayRef;
        let fmts = Arc::new(StringArray::from(vec!["%Y-%m-%d %H:%M:%S"])) as ArrayRef;
        let result = udf.invoke_with_args(make_args_2(strs, fmts)).unwrap();
        let ColumnarValue::Array(arr) = result else {
            panic!("expected array")
        };
        let ts = arr
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .unwrap();
        assert!(!ts.is_null(0));
        // 2026-02-21 14:30:00 UTC = 1771684200 seconds since epoch
        let expected = 1_771_684_200_000_000i64;
        assert_eq!(ts.value(0), expected);
    }

    #[test]
    fn test_parse_timestamp_iso8601() {
        // The 'iso8601' shortcut accepts RFC 3339 input with a 'Z' suffix.
        let udf = ParseTimestampUdf::new();
        let strs = Arc::new(StringArray::from(vec!["2026-02-21T14:30:00Z"])) as ArrayRef;
        let fmts = Arc::new(StringArray::from(vec!["iso8601"])) as ArrayRef;
        let result = udf.invoke_with_args(make_args_2(strs, fmts)).unwrap();
        let ColumnarValue::Array(arr) = result else {
            panic!("expected array")
        };
        let ts = arr
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .unwrap();
        assert!(!ts.is_null(0));
    }

    #[test]
    fn test_parse_timestamp_invalid_returns_null() {
        // Unparseable strings produce NULL rather than failing the batch.
        let udf = ParseTimestampUdf::new();
        let strs = Arc::new(StringArray::from(vec!["not-a-timestamp"])) as ArrayRef;
        let fmts = Arc::new(StringArray::from(vec!["%Y-%m-%d %H:%M:%S"])) as ArrayRef;
        let result = udf.invoke_with_args(make_args_2(strs, fmts)).unwrap();
        let ColumnarValue::Array(arr) = result else {
            panic!("expected array")
        };
        let ts = arr
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .unwrap();
        assert!(ts.is_null(0));
    }

    // ── to_json tests ────────────────────────────────────────

    #[test]
    fn test_to_json_int() {
        let udf = ToJsonUdf::new();
        let vals = Arc::new(arrow_array::Int64Array::from(vec![42])) as ArrayRef;
        let result = udf.invoke_with_args(make_args_1(vals)).unwrap();
        let ColumnarValue::Array(arr) = result else {
            panic!("expected array")
        };
        let str_arr = arr.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(str_arr.value(0), "42");
    }

    #[test]
    fn test_to_json_string() {
        // Strings are JSON-quoted.
        let udf = ToJsonUdf::new();
        let vals = Arc::new(StringArray::from(vec!["hello"])) as ArrayRef;
        let result = udf.invoke_with_args(make_args_1(vals)).unwrap();
        let ColumnarValue::Array(arr) = result else {
            panic!("expected array")
        };
        let str_arr = arr.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(str_arr.value(0), "\"hello\"");
    }

    #[test]
    fn test_to_json_bool() {
        let udf = ToJsonUdf::new();
        let vals = Arc::new(arrow_array::BooleanArray::from(vec![true, false])) as ArrayRef;
        let result = udf.invoke_with_args(make_args_1(vals)).unwrap();
        let ColumnarValue::Array(arr) = result else {
            panic!("expected array")
        };
        let str_arr = arr.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(str_arr.value(0), "true");
        assert_eq!(str_arr.value(1), "false");
    }

    #[test]
    fn test_to_json_null() {
        // SQL NULL becomes the JSON text `null` (a non-null output string).
        let udf = ToJsonUdf::new();
        let vals = Arc::new(arrow_array::Int64Array::from(vec![None::<i64>])) as ArrayRef;
        let result = udf.invoke_with_args(make_args_1(vals)).unwrap();
        let ColumnarValue::Array(arr) = result else {
            panic!("expected array")
        };
        let str_arr = arr.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(str_arr.value(0), "null");
    }

    // ── from_json tests ──────────────────────────────────────

    #[test]
    fn test_from_json_object() {
        // A JSON object round-trips through JSONB with field access intact.
        let udf = FromJsonUdf::new();
        let strs = Arc::new(StringArray::from(vec![r#"{"name":"Alice","age":30}"#])) as ArrayRef;
        let result = udf.invoke_with_args(make_args_1(strs)).unwrap();
        let ColumnarValue::Array(arr) = result else {
            panic!("expected array")
        };
        let bin = arr.as_any().downcast_ref::<LargeBinaryArray>().unwrap();
        assert!(!bin.is_null(0));
        let val = bin.value(0);
        assert_eq!(json_types::jsonb_type_name(val), Some("object"));
        let name = json_types::jsonb_get_field(val, "name").unwrap();
        assert_eq!(json_types::jsonb_to_text(name), Some("Alice".to_owned()));
    }

    #[test]
    fn test_from_json_number() {
        let udf = FromJsonUdf::new();
        let strs = Arc::new(StringArray::from(vec!["42"])) as ArrayRef;
        let result = udf.invoke_with_args(make_args_1(strs)).unwrap();
        let ColumnarValue::Array(arr) = result else {
            panic!("expected array")
        };
        let bin = arr.as_any().downcast_ref::<LargeBinaryArray>().unwrap();
        assert_eq!(json_types::jsonb_type_name(bin.value(0)), Some("number"));
    }

    #[test]
    fn test_from_json_invalid_returns_null() {
        // Invalid JSON produces NULL rather than failing the batch.
        let udf = FromJsonUdf::new();
        let strs = Arc::new(StringArray::from(vec!["not json {{{"])) as ArrayRef;
        let result = udf.invoke_with_args(make_args_1(strs)).unwrap();
        let ColumnarValue::Array(arr) = result else {
            panic!("expected array")
        };
        let bin = arr.as_any().downcast_ref::<LargeBinaryArray>().unwrap();
        assert!(bin.is_null(0));
    }

    #[test]
    fn test_from_json_null_input() {
        let udf = FromJsonUdf::new();
        let strs = Arc::new(StringArray::from(vec![None::<&str>])) as ArrayRef;
        let result = udf.invoke_with_args(make_args_1(strs)).unwrap();
        let ColumnarValue::Array(arr) = result else {
            panic!("expected array")
        };
        let bin = arr.as_any().downcast_ref::<LargeBinaryArray>().unwrap();
        assert!(bin.is_null(0));
    }

    // ── Registration tests ───────────────────────────────────

    #[test]
    fn test_registration() {
        use datafusion_expr::ScalarUDF;

        // All four UDFs wrap cleanly and expose their SQL names.
        let udfs = [
            ScalarUDF::new_from_impl(ParseEpochUdf::new()),
            ScalarUDF::new_from_impl(ParseTimestampUdf::new()),
            ScalarUDF::new_from_impl(ToJsonUdf::new()),
            ScalarUDF::new_from_impl(FromJsonUdf::new()),
        ];
        let names: Vec<&str> = udfs.iter().map(datafusion_expr::ScalarUDF::name).collect();
        assert_eq!(
            names,
            &["parse_epoch", "parse_timestamp", "to_json", "from_json"]
        );
    }
}