1use std::any::Any;
11use std::hash::{Hash, Hasher};
12use std::sync::Arc;
13
14use arrow::datatypes::{DataType, TimeUnit};
15use arrow_array::{
16 builder::{LargeBinaryBuilder, StringBuilder, TimestampMicrosecondBuilder},
17 Array, ArrayRef, LargeBinaryArray, StringArray, TimestampMicrosecondArray,
18};
19use datafusion_common::Result;
20use datafusion_expr::{
21 ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility,
22};
23
24use super::json_types;
25
26fn expand_args(args: &[ColumnarValue]) -> Result<Vec<ArrayRef>> {
30 let len = args
31 .iter()
32 .find_map(|a| match a {
33 ColumnarValue::Array(arr) => Some(arr.len()),
34 ColumnarValue::Scalar(_) => None,
35 })
36 .unwrap_or(1);
37
38 args.iter()
39 .map(|a| match a {
40 ColumnarValue::Array(arr) => Ok(Arc::clone(arr)),
41 ColumnarValue::Scalar(s) => s.to_array_of_size(len),
42 })
43 .collect()
44}
45
/// Scalar UDF `parse_epoch(value, unit)`.
///
/// Converts an `Int64` epoch count plus a `Utf8` unit name into a
/// microsecond-precision, timezone-less timestamp.
#[derive(Debug)]
pub struct ParseEpochUdf {
    // Fixed (Int64, Utf8) signature, built once in `new`.
    signature: Signature,
}
68
69impl ParseEpochUdf {
70 #[must_use]
72 pub fn new() -> Self {
73 Self {
74 signature: Signature::new(
75 TypeSignature::Exact(vec![DataType::Int64, DataType::Utf8]),
76 Volatility::Immutable,
77 ),
78 }
79 }
80}
81
impl Default for ParseEpochUdf {
    // Delegates to `new`; the UDF carries no configurable state.
    fn default() -> Self {
        Self::new()
    }
}
87
impl PartialEq for ParseEpochUdf {
    // All instances are interchangeable (the signature is a constant),
    // so equality is unconditional.
    fn eq(&self, _other: &Self) -> bool {
        true
    }
}
93
94impl Eq for ParseEpochUdf {}
95
impl Hash for ParseEpochUdf {
    // Hash by function name only, consistent with the constant `eq`.
    fn hash<H: Hasher>(&self, state: &mut H) {
        "parse_epoch".hash(state);
    }
}
101
102impl ScalarUDFImpl for ParseEpochUdf {
103 fn as_any(&self) -> &dyn Any {
104 self
105 }
106
107 fn name(&self) -> &'static str {
108 "parse_epoch"
109 }
110
111 fn signature(&self) -> &Signature {
112 &self.signature
113 }
114
115 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
116 Ok(DataType::Timestamp(TimeUnit::Microsecond, None))
117 }
118
119 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
120 let expanded = expand_args(&args.args)?;
121 let val_arr = expanded[0]
122 .as_any()
123 .downcast_ref::<arrow_array::Int64Array>()
124 .ok_or_else(|| {
125 datafusion_common::DataFusionError::Internal(
126 "parse_epoch: first arg must be Int64".into(),
127 )
128 })?;
129 let unit_arr = expanded[1]
130 .as_any()
131 .downcast_ref::<StringArray>()
132 .ok_or_else(|| {
133 datafusion_common::DataFusionError::Internal(
134 "parse_epoch: second arg must be Utf8".into(),
135 )
136 })?;
137
138 let mut builder = TimestampMicrosecondBuilder::with_capacity(val_arr.len());
139 for i in 0..val_arr.len() {
140 if val_arr.is_null(i) || unit_arr.is_null(i) {
141 builder.append_null();
142 } else {
143 let value = val_arr.value(i);
144 let unit = unit_arr.value(i);
145 let micros = epoch_to_micros(value, unit)?;
146 builder.append_value(micros);
147 }
148 }
149 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
150 }
151}
152
153fn epoch_to_micros(value: i64, unit: &str) -> Result<i64> {
155 match unit.to_ascii_lowercase().as_str() {
156 "seconds" | "s" => Ok(value.saturating_mul(1_000_000)),
157 "milliseconds" | "ms" => Ok(value.saturating_mul(1_000)),
158 "microseconds" | "us" => Ok(value),
159 "nanoseconds" | "ns" => Ok(value / 1_000),
160 _ => Err(datafusion_common::DataFusionError::Execution(format!(
161 "parse_epoch: invalid unit '{unit}'. \
162 Expected: seconds, milliseconds, microseconds, nanoseconds"
163 ))),
164 }
165}
166
/// Scalar UDF `parse_timestamp(string, format)`.
///
/// Parses a `Utf8` timestamp string with a per-row chrono format (or the
/// special `"iso8601"` format) into a microsecond, timezone-less timestamp.
#[derive(Debug)]
pub struct ParseTimestampUdf {
    // Fixed (Utf8, Utf8) signature, built once in `new`.
    signature: Signature,
}
188
189impl ParseTimestampUdf {
190 #[must_use]
192 pub fn new() -> Self {
193 Self {
194 signature: Signature::new(
195 TypeSignature::Exact(vec![DataType::Utf8, DataType::Utf8]),
196 Volatility::Immutable,
197 ),
198 }
199 }
200}
201
impl Default for ParseTimestampUdf {
    // Delegates to `new`; the UDF carries no configurable state.
    fn default() -> Self {
        Self::new()
    }
}
207
impl PartialEq for ParseTimestampUdf {
    // All instances are interchangeable (the signature is a constant),
    // so equality is unconditional.
    fn eq(&self, _other: &Self) -> bool {
        true
    }
}
213
214impl Eq for ParseTimestampUdf {}
215
impl Hash for ParseTimestampUdf {
    // Hash by function name only, consistent with the constant `eq`.
    fn hash<H: Hasher>(&self, state: &mut H) {
        "parse_timestamp".hash(state);
    }
}
221
222impl ScalarUDFImpl for ParseTimestampUdf {
223 fn as_any(&self) -> &dyn Any {
224 self
225 }
226
227 fn name(&self) -> &'static str {
228 "parse_timestamp"
229 }
230
231 fn signature(&self) -> &Signature {
232 &self.signature
233 }
234
235 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
236 Ok(DataType::Timestamp(TimeUnit::Microsecond, None))
237 }
238
239 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
240 let expanded = expand_args(&args.args)?;
241 let str_arr = expanded[0]
242 .as_any()
243 .downcast_ref::<StringArray>()
244 .ok_or_else(|| {
245 datafusion_common::DataFusionError::Internal(
246 "parse_timestamp: first arg must be Utf8".into(),
247 )
248 })?;
249 let fmt_arr = expanded[1]
250 .as_any()
251 .downcast_ref::<StringArray>()
252 .ok_or_else(|| {
253 datafusion_common::DataFusionError::Internal(
254 "parse_timestamp: second arg must be Utf8".into(),
255 )
256 })?;
257
258 let mut builder = TimestampMicrosecondBuilder::with_capacity(str_arr.len());
259 for i in 0..str_arr.len() {
260 if str_arr.is_null(i) || fmt_arr.is_null(i) {
261 builder.append_null();
262 } else {
263 let ts_str = str_arr.value(i);
264 let fmt = fmt_arr.value(i);
265 match parse_ts_string(ts_str, fmt) {
266 Ok(micros) => builder.append_value(micros),
267 Err(_) => builder.append_null(),
268 }
269 }
270 }
271 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
272 }
273}
274
275fn parse_ts_string(ts_str: &str, fmt: &str) -> std::result::Result<i64, String> {
277 use chrono::NaiveDateTime;
278
279 if fmt.eq_ignore_ascii_case("iso8601") {
280 let dt = ts_str
282 .parse::<chrono::DateTime<chrono::Utc>>()
283 .or_else(|_| {
284 NaiveDateTime::parse_from_str(ts_str, "%Y-%m-%dT%H:%M:%S%.f")
285 .map(|ndt| ndt.and_utc())
286 })
287 .or_else(|_| {
288 NaiveDateTime::parse_from_str(ts_str, "%Y-%m-%dT%H:%M:%S").map(|ndt| ndt.and_utc())
289 })
290 .map_err(|e| e.to_string())?;
291 return Ok(dt.timestamp_micros());
292 }
293
294 let ndt = NaiveDateTime::parse_from_str(ts_str, fmt).map_err(|e| e.to_string())?;
295 Ok(ndt.and_utc().timestamp_micros())
296}
297
/// Scalar UDF `to_json(value)`.
///
/// Renders any single argument as a JSON text (`Utf8`) column; unsupported
/// types fall back to their `ScalarValue` display string.
#[derive(Debug)]
pub struct ToJsonUdf {
    // Accepts any single argument; built once in `new`.
    signature: Signature,
}
317
318impl ToJsonUdf {
319 #[must_use]
321 pub fn new() -> Self {
322 Self {
323 signature: Signature::new(TypeSignature::Any(1), Volatility::Immutable),
324 }
325 }
326}
327
impl Default for ToJsonUdf {
    // Delegates to `new`; the UDF carries no configurable state.
    fn default() -> Self {
        Self::new()
    }
}
333
impl PartialEq for ToJsonUdf {
    // All instances are interchangeable (the signature is a constant),
    // so equality is unconditional.
    fn eq(&self, _other: &Self) -> bool {
        true
    }
}
339
340impl Eq for ToJsonUdf {}
341
impl Hash for ToJsonUdf {
    // Hash by function name only, consistent with the constant `eq`.
    fn hash<H: Hasher>(&self, state: &mut H) {
        "to_json".hash(state);
    }
}
347
348impl ScalarUDFImpl for ToJsonUdf {
349 fn as_any(&self) -> &dyn Any {
350 self
351 }
352
353 fn name(&self) -> &'static str {
354 "to_json"
355 }
356
357 fn signature(&self) -> &Signature {
358 &self.signature
359 }
360
361 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
362 Ok(DataType::Utf8)
363 }
364
365 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
366 let expanded = expand_args(&args.args)?;
367 let arr = &expanded[0];
368 let len = arr.len();
369
370 let mut builder = StringBuilder::with_capacity(len, 256);
371 for row in 0..len {
372 if arr.is_null(row) {
373 builder.append_value("null");
374 } else {
375 let val = arrow_value_to_json(arr, row);
376 builder.append_value(val.to_string());
377 }
378 }
379 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
380 }
381}
382
383fn arrow_value_to_json(arr: &ArrayRef, row: usize) -> serde_json::Value {
385 if arr.is_null(row) {
386 return serde_json::Value::Null;
387 }
388 if let Some(a) = arr.as_any().downcast_ref::<StringArray>() {
389 return serde_json::Value::String(a.value(row).to_owned());
390 }
391 if let Some(a) = arr.as_any().downcast_ref::<arrow_array::Int64Array>() {
392 return serde_json::Value::Number(a.value(row).into());
393 }
394 if let Some(a) = arr.as_any().downcast_ref::<arrow_array::Int32Array>() {
395 return serde_json::Value::Number(i64::from(a.value(row)).into());
396 }
397 if let Some(a) = arr.as_any().downcast_ref::<arrow_array::Float64Array>() {
398 if let Some(n) = serde_json::Number::from_f64(a.value(row)) {
399 return serde_json::Value::Number(n);
400 }
401 return serde_json::Value::Null;
402 }
403 if let Some(a) = arr.as_any().downcast_ref::<arrow_array::BooleanArray>() {
404 return serde_json::Value::Bool(a.value(row));
405 }
406 if let Some(a) = arr.as_any().downcast_ref::<LargeBinaryArray>() {
408 if let Some(text) = json_types::jsonb_to_text(a.value(row)) {
409 if let Ok(val) = serde_json::from_str::<serde_json::Value>(&text) {
410 return val;
411 }
412 return serde_json::Value::String(text);
413 }
414 return serde_json::Value::Null;
415 }
416 if let Some(a) = arr.as_any().downcast_ref::<TimestampMicrosecondArray>() {
418 let micros = a.value(row);
419 let secs = micros / 1_000_000;
420 let sub_micros = (micros % 1_000_000).unsigned_abs() * 1_000;
421 #[allow(clippy::cast_possible_truncation)]
422 let nsecs = sub_micros as u32;
423 if let Some(dt) = chrono::DateTime::from_timestamp(secs, nsecs) {
424 return serde_json::Value::String(dt.to_rfc3339());
425 }
426 return serde_json::Value::Number(micros.into());
427 }
428 let sv = datafusion_common::ScalarValue::try_from_array(arr, row).ok();
430 match sv {
431 Some(s) => serde_json::Value::String(s.to_string()),
432 None => serde_json::Value::Null,
433 }
434}
435
/// Scalar UDF `from_json(text)`.
///
/// Parses a `Utf8` JSON document into the JSONB binary encoding
/// (`LargeBinary`) used by `json_types`; invalid JSON becomes null.
#[derive(Debug)]
pub struct FromJsonUdf {
    // Fixed (Utf8) signature, built once in `new`.
    signature: Signature,
}
455
456impl FromJsonUdf {
457 #[must_use]
459 pub fn new() -> Self {
460 Self {
461 signature: Signature::new(
462 TypeSignature::Exact(vec![DataType::Utf8]),
463 Volatility::Immutable,
464 ),
465 }
466 }
467}
468
impl Default for FromJsonUdf {
    // Delegates to `new`; the UDF carries no configurable state.
    fn default() -> Self {
        Self::new()
    }
}
474
impl PartialEq for FromJsonUdf {
    // All instances are interchangeable (the signature is a constant),
    // so equality is unconditional.
    fn eq(&self, _other: &Self) -> bool {
        true
    }
}
480
481impl Eq for FromJsonUdf {}
482
impl Hash for FromJsonUdf {
    // Hash by function name only, consistent with the constant `eq`.
    fn hash<H: Hasher>(&self, state: &mut H) {
        "from_json".hash(state);
    }
}
488
489impl ScalarUDFImpl for FromJsonUdf {
490 fn as_any(&self) -> &dyn Any {
491 self
492 }
493
494 fn name(&self) -> &'static str {
495 "from_json"
496 }
497
498 fn signature(&self) -> &Signature {
499 &self.signature
500 }
501
502 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
503 Ok(DataType::LargeBinary) }
505
506 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
507 let expanded = expand_args(&args.args)?;
508 let str_arr = expanded[0]
509 .as_any()
510 .downcast_ref::<StringArray>()
511 .ok_or_else(|| {
512 datafusion_common::DataFusionError::Internal("from_json: arg must be Utf8".into())
513 })?;
514
515 let mut builder = LargeBinaryBuilder::with_capacity(str_arr.len(), 256);
516 for i in 0..str_arr.len() {
517 if str_arr.is_null(i) {
518 builder.append_null();
519 } else {
520 let json_str = str_arr.value(i);
521 match serde_json::from_str::<serde_json::Value>(json_str) {
522 Ok(val) => {
523 let jsonb = json_types::encode_jsonb(&val);
524 builder.append_value(&jsonb);
525 }
526 Err(_) => builder.append_null(),
527 }
528 }
529 }
530 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
531 }
532}
533
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_schema::Field;
    use datafusion_common::config::ConfigOptions;

    // Helper: ScalarFunctionArgs for a two-argument UDF call. The UDFs under
    // test size their output from the argument arrays, so `number_rows: 0`
    // appears inert here — NOTE(review): confirm against DataFusion's
    // ScalarFunctionArgs contract.
    fn make_args_2(a: ArrayRef, b: ArrayRef) -> ScalarFunctionArgs {
        ScalarFunctionArgs {
            args: vec![ColumnarValue::Array(a), ColumnarValue::Array(b)],
            arg_fields: vec![],
            number_rows: 0,
            return_field: Arc::new(Field::new(
                "output",
                DataType::Timestamp(TimeUnit::Microsecond, None),
                true,
            )),
            config_options: Arc::new(ConfigOptions::default()),
        }
    }

    // Helper: ScalarFunctionArgs for a one-argument UDF call with a Utf8
    // return field.
    fn make_args_1(a: ArrayRef) -> ScalarFunctionArgs {
        ScalarFunctionArgs {
            args: vec![ColumnarValue::Array(a)],
            arg_fields: vec![],
            number_rows: 0,
            return_field: Arc::new(Field::new("output", DataType::Utf8, true)),
            config_options: Arc::new(ConfigOptions::default()),
        }
    }

    // --- parse_epoch ---

    #[test]
    fn test_parse_epoch_seconds() {
        let udf = ParseEpochUdf::new();
        let vals = Arc::new(arrow_array::Int64Array::from(vec![1_708_528_800])) as ArrayRef;
        let units = Arc::new(StringArray::from(vec!["seconds"])) as ArrayRef;
        let result = udf.invoke_with_args(make_args_2(vals, units)).unwrap();
        let ColumnarValue::Array(arr) = result else {
            panic!("expected array")
        };
        let ts = arr
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .unwrap();
        assert_eq!(ts.value(0), 1_708_528_800_000_000);
    }

    #[test]
    fn test_parse_epoch_milliseconds() {
        let udf = ParseEpochUdf::new();
        let vals = Arc::new(arrow_array::Int64Array::from(vec![1_708_528_800_000])) as ArrayRef;
        let units = Arc::new(StringArray::from(vec!["milliseconds"])) as ArrayRef;
        let result = udf.invoke_with_args(make_args_2(vals, units)).unwrap();
        let ColumnarValue::Array(arr) = result else {
            panic!("expected array")
        };
        let ts = arr
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .unwrap();
        assert_eq!(ts.value(0), 1_708_528_800_000_000);
    }

    #[test]
    fn test_parse_epoch_microseconds() {
        let udf = ParseEpochUdf::new();
        let vals = Arc::new(arrow_array::Int64Array::from(vec![1_708_528_800_000_000])) as ArrayRef;
        let units = Arc::new(StringArray::from(vec!["microseconds"])) as ArrayRef;
        let result = udf.invoke_with_args(make_args_2(vals, units)).unwrap();
        let ColumnarValue::Array(arr) = result else {
            panic!("expected array")
        };
        let ts = arr
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .unwrap();
        assert_eq!(ts.value(0), 1_708_528_800_000_000);
    }

    #[test]
    fn test_parse_epoch_nanoseconds() {
        let udf = ParseEpochUdf::new();
        let vals = Arc::new(arrow_array::Int64Array::from(vec![
            1_708_528_800_000_000_000,
        ])) as ArrayRef;
        let units = Arc::new(StringArray::from(vec!["nanoseconds"])) as ArrayRef;
        let result = udf.invoke_with_args(make_args_2(vals, units)).unwrap();
        let ColumnarValue::Array(arr) = result else {
            panic!("expected array")
        };
        let ts = arr
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .unwrap();
        assert_eq!(ts.value(0), 1_708_528_800_000_000);
    }

    // All four short unit spellings convert to the same microsecond value.
    #[test]
    fn test_parse_epoch_short_units() {
        let udf = ParseEpochUdf::new();
        for (val, unit, expected) in [
            (100i64, "s", 100_000_000i64),
            (100_000, "ms", 100_000_000),
            (100_000_000, "us", 100_000_000),
            (100_000_000_000, "ns", 100_000_000),
        ] {
            let vals = Arc::new(arrow_array::Int64Array::from(vec![val])) as ArrayRef;
            let units = Arc::new(StringArray::from(vec![unit])) as ArrayRef;
            let result = udf.invoke_with_args(make_args_2(vals, units)).unwrap();
            let ColumnarValue::Array(arr) = result else {
                panic!("expected array")
            };
            let ts = arr
                .as_any()
                .downcast_ref::<TimestampMicrosecondArray>()
                .unwrap();
            assert_eq!(ts.value(0), expected, "Failed for unit '{unit}'");
        }
    }

    // Unknown units are a hard error, unlike parse_timestamp's null fallback.
    #[test]
    fn test_parse_epoch_invalid_unit() {
        let udf = ParseEpochUdf::new();
        let vals = Arc::new(arrow_array::Int64Array::from(vec![100])) as ArrayRef;
        let units = Arc::new(StringArray::from(vec!["invalid"])) as ArrayRef;
        assert!(udf.invoke_with_args(make_args_2(vals, units)).is_err());
    }

    #[test]
    fn test_parse_epoch_null_handling() {
        let udf = ParseEpochUdf::new();
        let vals = Arc::new(arrow_array::Int64Array::from(vec![
            Some(100),
            None,
            Some(200),
        ])) as ArrayRef;
        let units = Arc::new(StringArray::from(vec![
            Some("seconds"),
            Some("seconds"),
            Some("seconds"),
        ])) as ArrayRef;
        let result = udf.invoke_with_args(make_args_2(vals, units)).unwrap();
        let ColumnarValue::Array(arr) = result else {
            panic!("expected array")
        };
        let ts = arr
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .unwrap();
        assert!(!ts.is_null(0));
        assert!(ts.is_null(1));
        assert!(!ts.is_null(2));
    }

    // --- parse_timestamp ---

    #[test]
    fn test_parse_timestamp_custom_format() {
        let udf = ParseTimestampUdf::new();
        let strs = Arc::new(StringArray::from(vec!["2026-02-21 14:30:00"])) as ArrayRef;
        let fmts = Arc::new(StringArray::from(vec!["%Y-%m-%d %H:%M:%S"])) as ArrayRef;
        let result = udf.invoke_with_args(make_args_2(strs, fmts)).unwrap();
        let ColumnarValue::Array(arr) = result else {
            panic!("expected array")
        };
        let ts = arr
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .unwrap();
        assert!(!ts.is_null(0));
        // 2026-02-21T14:30:00Z expressed in microseconds since the epoch.
        let expected = 1_771_684_200_000_000i64;
        assert_eq!(ts.value(0), expected);
    }

    #[test]
    fn test_parse_timestamp_iso8601() {
        let udf = ParseTimestampUdf::new();
        let strs = Arc::new(StringArray::from(vec!["2026-02-21T14:30:00Z"])) as ArrayRef;
        let fmts = Arc::new(StringArray::from(vec!["iso8601"])) as ArrayRef;
        let result = udf.invoke_with_args(make_args_2(strs, fmts)).unwrap();
        let ColumnarValue::Array(arr) = result else {
            panic!("expected array")
        };
        let ts = arr
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .unwrap();
        assert!(!ts.is_null(0));
    }

    // Parse failures surface as nulls, not errors.
    #[test]
    fn test_parse_timestamp_invalid_returns_null() {
        let udf = ParseTimestampUdf::new();
        let strs = Arc::new(StringArray::from(vec!["not-a-timestamp"])) as ArrayRef;
        let fmts = Arc::new(StringArray::from(vec!["%Y-%m-%d %H:%M:%S"])) as ArrayRef;
        let result = udf.invoke_with_args(make_args_2(strs, fmts)).unwrap();
        let ColumnarValue::Array(arr) = result else {
            panic!("expected array")
        };
        let ts = arr
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .unwrap();
        assert!(ts.is_null(0));
    }

    // --- to_json ---

    #[test]
    fn test_to_json_int() {
        let udf = ToJsonUdf::new();
        let vals = Arc::new(arrow_array::Int64Array::from(vec![42])) as ArrayRef;
        let result = udf.invoke_with_args(make_args_1(vals)).unwrap();
        let ColumnarValue::Array(arr) = result else {
            panic!("expected array")
        };
        let str_arr = arr.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(str_arr.value(0), "42");
    }

    #[test]
    fn test_to_json_string() {
        let udf = ToJsonUdf::new();
        let vals = Arc::new(StringArray::from(vec!["hello"])) as ArrayRef;
        let result = udf.invoke_with_args(make_args_1(vals)).unwrap();
        let ColumnarValue::Array(arr) = result else {
            panic!("expected array")
        };
        let str_arr = arr.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(str_arr.value(0), "\"hello\"");
    }

    #[test]
    fn test_to_json_bool() {
        let udf = ToJsonUdf::new();
        let vals = Arc::new(arrow_array::BooleanArray::from(vec![true, false])) as ArrayRef;
        let result = udf.invoke_with_args(make_args_1(vals)).unwrap();
        let ColumnarValue::Array(arr) = result else {
            panic!("expected array")
        };
        let str_arr = arr.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(str_arr.value(0), "true");
        assert_eq!(str_arr.value(1), "false");
    }

    // SQL NULL is rendered as the JSON literal "null" (a non-null string).
    #[test]
    fn test_to_json_null() {
        let udf = ToJsonUdf::new();
        let vals = Arc::new(arrow_array::Int64Array::from(vec![None::<i64>])) as ArrayRef;
        let result = udf.invoke_with_args(make_args_1(vals)).unwrap();
        let ColumnarValue::Array(arr) = result else {
            panic!("expected array")
        };
        let str_arr = arr.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(str_arr.value(0), "null");
    }

    // --- from_json ---

    #[test]
    fn test_from_json_object() {
        let udf = FromJsonUdf::new();
        let strs = Arc::new(StringArray::from(vec![r#"{"name":"Alice","age":30}"#])) as ArrayRef;
        let result = udf.invoke_with_args(make_args_1(strs)).unwrap();
        let ColumnarValue::Array(arr) = result else {
            panic!("expected array")
        };
        let bin = arr.as_any().downcast_ref::<LargeBinaryArray>().unwrap();
        assert!(!bin.is_null(0));
        let val = bin.value(0);
        assert_eq!(json_types::jsonb_type_name(val), Some("object"));
        let name = json_types::jsonb_get_field(val, "name").unwrap();
        assert_eq!(json_types::jsonb_to_text(name), Some("Alice".to_owned()));
    }

    #[test]
    fn test_from_json_number() {
        let udf = FromJsonUdf::new();
        let strs = Arc::new(StringArray::from(vec!["42"])) as ArrayRef;
        let result = udf.invoke_with_args(make_args_1(strs)).unwrap();
        let ColumnarValue::Array(arr) = result else {
            panic!("expected array")
        };
        let bin = arr.as_any().downcast_ref::<LargeBinaryArray>().unwrap();
        assert_eq!(json_types::jsonb_type_name(bin.value(0)), Some("number"));
    }

    // Malformed JSON becomes a null row rather than an error.
    #[test]
    fn test_from_json_invalid_returns_null() {
        let udf = FromJsonUdf::new();
        let strs = Arc::new(StringArray::from(vec!["not json {{{"])) as ArrayRef;
        let result = udf.invoke_with_args(make_args_1(strs)).unwrap();
        let ColumnarValue::Array(arr) = result else {
            panic!("expected array")
        };
        let bin = arr.as_any().downcast_ref::<LargeBinaryArray>().unwrap();
        assert!(bin.is_null(0));
    }

    #[test]
    fn test_from_json_null_input() {
        let udf = FromJsonUdf::new();
        let strs = Arc::new(StringArray::from(vec![None::<&str>])) as ArrayRef;
        let result = udf.invoke_with_args(make_args_1(strs)).unwrap();
        let ColumnarValue::Array(arr) = result else {
            panic!("expected array")
        };
        let bin = arr.as_any().downcast_ref::<LargeBinaryArray>().unwrap();
        assert!(bin.is_null(0));
    }

    // All four UDFs can be wrapped in ScalarUDF and expose their SQL names.
    #[test]
    fn test_registration() {
        use datafusion_expr::ScalarUDF;

        let udfs = [
            ScalarUDF::new_from_impl(ParseEpochUdf::new()),
            ScalarUDF::new_from_impl(ParseTimestampUdf::new()),
            ScalarUDF::new_from_impl(ToJsonUdf::new()),
            ScalarUDF::new_from_impl(FromJsonUdf::new()),
        ];
        let names: Vec<&str> = udfs.iter().map(datafusion_expr::ScalarUDF::name).collect();
        assert_eq!(
            names,
            &["parse_epoch", "parse_timestamp", "to_json", "from_json"]
        );
    }
}