1use std::any::Any;
13use std::collections::HashSet;
14use std::hash::{Hash, Hasher};
15use std::sync::Arc;
16
17use arrow::datatypes::DataType;
18use arrow_array::{
19 builder::{LargeBinaryBuilder, StringBuilder},
20 Array, LargeBinaryArray, ListArray, MapArray, StringArray,
21};
22use datafusion_common::Result;
23use datafusion_expr::{
24 ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility,
25};
26
27use super::json_types;
28use super::json_udf::expand_args;
29
/// Maximum nesting depth that the recursive helpers `deep_merge` and
/// `flatten_value` will descend to before returning an error, bounding their
/// recursion depth.
const MAX_DEPTH: usize = 64;
32
33#[derive(Debug)]
42pub struct JsonbMerge {
43 signature: Signature,
44}
45
46impl JsonbMerge {
47 #[must_use]
49 pub fn new() -> Self {
50 Self {
51 signature: Signature::new(
52 TypeSignature::Exact(vec![DataType::LargeBinary, DataType::LargeBinary]),
53 Volatility::Immutable,
54 ),
55 }
56 }
57}
58
59impl Default for JsonbMerge {
60 fn default() -> Self {
61 Self::new()
62 }
63}
64
65impl PartialEq for JsonbMerge {
66 fn eq(&self, _other: &Self) -> bool {
67 true
68 }
69}
70
71impl Eq for JsonbMerge {}
72
73impl Hash for JsonbMerge {
74 fn hash<H: Hasher>(&self, state: &mut H) {
75 "jsonb_merge".hash(state);
76 }
77}
78
79impl ScalarUDFImpl for JsonbMerge {
80 fn as_any(&self) -> &dyn Any {
81 self
82 }
83
84 fn name(&self) -> &'static str {
85 "jsonb_merge"
86 }
87
88 fn signature(&self) -> &Signature {
89 &self.signature
90 }
91
92 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
93 Ok(DataType::LargeBinary)
94 }
95
96 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
97 let expanded = expand_args(&args.args)?;
98 let left_arr = expanded[0]
99 .as_any()
100 .downcast_ref::<LargeBinaryArray>()
101 .ok_or_else(|| {
102 datafusion_common::DataFusionError::Internal(
103 "jsonb_merge: first arg must be LargeBinary".into(),
104 )
105 })?;
106 let right_arr = expanded[1]
107 .as_any()
108 .downcast_ref::<LargeBinaryArray>()
109 .ok_or_else(|| {
110 datafusion_common::DataFusionError::Internal(
111 "jsonb_merge: second arg must be LargeBinary".into(),
112 )
113 })?;
114
115 let mut builder = LargeBinaryBuilder::with_capacity(left_arr.len(), 256);
116 for i in 0..left_arr.len() {
117 if left_arr.is_null(i) || right_arr.is_null(i) {
118 builder.append_null();
119 } else {
120 let left_val = json_types::jsonb_to_value(left_arr.value(i));
121 let right_val = json_types::jsonb_to_value(right_arr.value(i));
122 match (left_val, right_val) {
123 (
124 Some(serde_json::Value::Object(mut l)),
125 Some(serde_json::Value::Object(r)),
126 ) => {
127 for (k, v) in r {
128 l.insert(k, v);
129 }
130 builder
131 .append_value(json_types::encode_jsonb(&serde_json::Value::Object(l)));
132 }
133 (_, Some(r)) => {
134 builder.append_value(json_types::encode_jsonb(&r));
135 }
136 _ => builder.append_null(),
137 }
138 }
139 }
140 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
141 }
142}
143
144#[derive(Debug)]
153pub struct JsonbDeepMerge {
154 signature: Signature,
155}
156
157impl JsonbDeepMerge {
158 #[must_use]
160 pub fn new() -> Self {
161 Self {
162 signature: Signature::new(
163 TypeSignature::Exact(vec![DataType::LargeBinary, DataType::LargeBinary]),
164 Volatility::Immutable,
165 ),
166 }
167 }
168}
169
170impl Default for JsonbDeepMerge {
171 fn default() -> Self {
172 Self::new()
173 }
174}
175
176impl PartialEq for JsonbDeepMerge {
177 fn eq(&self, _other: &Self) -> bool {
178 true
179 }
180}
181
182impl Eq for JsonbDeepMerge {}
183
184impl Hash for JsonbDeepMerge {
185 fn hash<H: Hasher>(&self, state: &mut H) {
186 "jsonb_deep_merge".hash(state);
187 }
188}
189
190fn deep_merge(
191 left: serde_json::Value,
192 right: serde_json::Value,
193 depth: usize,
194) -> std::result::Result<serde_json::Value, String> {
195 if depth > MAX_DEPTH {
196 return Err("jsonb_deep_merge: max depth exceeded".into());
197 }
198 match (left, right) {
199 (serde_json::Value::Object(mut l), serde_json::Value::Object(r)) => {
200 for (k, rv) in r {
201 let merged = if let Some(lv) = l.remove(&k) {
202 deep_merge(lv, rv, depth + 1)?
203 } else {
204 rv
205 };
206 l.insert(k, merged);
207 }
208 Ok(serde_json::Value::Object(l))
209 }
210 (_, r) => Ok(r),
211 }
212}
213
214impl ScalarUDFImpl for JsonbDeepMerge {
215 fn as_any(&self) -> &dyn Any {
216 self
217 }
218
219 fn name(&self) -> &'static str {
220 "jsonb_deep_merge"
221 }
222
223 fn signature(&self) -> &Signature {
224 &self.signature
225 }
226
227 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
228 Ok(DataType::LargeBinary)
229 }
230
231 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
232 let expanded = expand_args(&args.args)?;
233 let left_arr = expanded[0]
234 .as_any()
235 .downcast_ref::<LargeBinaryArray>()
236 .ok_or_else(|| {
237 datafusion_common::DataFusionError::Internal(
238 "jsonb_deep_merge: first arg must be LargeBinary".into(),
239 )
240 })?;
241 let right_arr = expanded[1]
242 .as_any()
243 .downcast_ref::<LargeBinaryArray>()
244 .ok_or_else(|| {
245 datafusion_common::DataFusionError::Internal(
246 "jsonb_deep_merge: second arg must be LargeBinary".into(),
247 )
248 })?;
249
250 let mut builder = LargeBinaryBuilder::with_capacity(left_arr.len(), 256);
251 for i in 0..left_arr.len() {
252 if left_arr.is_null(i) || right_arr.is_null(i) {
253 builder.append_null();
254 } else {
255 let left_val = json_types::jsonb_to_value(left_arr.value(i));
256 let right_val = json_types::jsonb_to_value(right_arr.value(i));
257 match (left_val, right_val) {
258 (Some(l), Some(r)) => {
259 let merged = deep_merge(l, r, 0)
260 .map_err(datafusion_common::DataFusionError::Execution)?;
261 builder.append_value(json_types::encode_jsonb(&merged));
262 }
263 _ => builder.append_null(),
264 }
265 }
266 }
267 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
268 }
269}
270
271#[derive(Debug)]
280pub struct JsonbStripNulls {
281 signature: Signature,
282}
283
284impl JsonbStripNulls {
285 #[must_use]
287 pub fn new() -> Self {
288 Self {
289 signature: Signature::new(
290 TypeSignature::Exact(vec![DataType::LargeBinary]),
291 Volatility::Immutable,
292 ),
293 }
294 }
295}
296
297impl Default for JsonbStripNulls {
298 fn default() -> Self {
299 Self::new()
300 }
301}
302
303impl PartialEq for JsonbStripNulls {
304 fn eq(&self, _other: &Self) -> bool {
305 true
306 }
307}
308
309impl Eq for JsonbStripNulls {}
310
311impl Hash for JsonbStripNulls {
312 fn hash<H: Hasher>(&self, state: &mut H) {
313 "jsonb_strip_nulls".hash(state);
314 }
315}
316
317fn strip_nulls(val: serde_json::Value) -> serde_json::Value {
318 match val {
319 serde_json::Value::Object(obj) => {
320 let filtered: serde_json::Map<String, serde_json::Value> = obj
321 .into_iter()
322 .filter(|(_, v)| !v.is_null())
323 .map(|(k, v)| (k, strip_nulls(v)))
324 .collect();
325 serde_json::Value::Object(filtered)
326 }
327 serde_json::Value::Array(arr) => {
328 serde_json::Value::Array(arr.into_iter().map(strip_nulls).collect())
329 }
330 other => other,
331 }
332}
333
334impl ScalarUDFImpl for JsonbStripNulls {
335 fn as_any(&self) -> &dyn Any {
336 self
337 }
338
339 fn name(&self) -> &'static str {
340 "jsonb_strip_nulls"
341 }
342
343 fn signature(&self) -> &Signature {
344 &self.signature
345 }
346
347 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
348 Ok(DataType::LargeBinary)
349 }
350
351 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
352 let expanded = expand_args(&args.args)?;
353 let jsonb_arr = expanded[0]
354 .as_any()
355 .downcast_ref::<LargeBinaryArray>()
356 .ok_or_else(|| {
357 datafusion_common::DataFusionError::Internal(
358 "jsonb_strip_nulls: arg must be LargeBinary".into(),
359 )
360 })?;
361
362 let mut builder = LargeBinaryBuilder::with_capacity(jsonb_arr.len(), 256);
363 for i in 0..jsonb_arr.len() {
364 if jsonb_arr.is_null(i) {
365 builder.append_null();
366 } else {
367 match json_types::jsonb_to_value(jsonb_arr.value(i)) {
368 Some(val) => {
369 builder.append_value(json_types::encode_jsonb(&strip_nulls(val)));
370 }
371 None => builder.append_null(),
372 }
373 }
374 }
375 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
376 }
377}
378
379#[derive(Debug)]
388pub struct JsonbRenameKeys {
389 signature: Signature,
390}
391
392impl JsonbRenameKeys {
393 #[must_use]
395 pub fn new() -> Self {
396 Self {
397 signature: Signature::new(
398 TypeSignature::Exact(vec![
399 DataType::LargeBinary,
400 DataType::Map(
401 Arc::new(arrow_schema::Field::new(
402 "entries",
403 DataType::Struct(
404 vec![
405 arrow_schema::Field::new("key", DataType::Utf8, false),
406 arrow_schema::Field::new("value", DataType::Utf8, true),
407 ]
408 .into(),
409 ),
410 false,
411 )),
412 false,
413 ),
414 ]),
415 Volatility::Immutable,
416 ),
417 }
418 }
419}
420
421impl Default for JsonbRenameKeys {
422 fn default() -> Self {
423 Self::new()
424 }
425}
426
427impl PartialEq for JsonbRenameKeys {
428 fn eq(&self, _other: &Self) -> bool {
429 true
430 }
431}
432
433impl Eq for JsonbRenameKeys {}
434
435impl Hash for JsonbRenameKeys {
436 fn hash<H: Hasher>(&self, state: &mut H) {
437 "jsonb_rename_keys".hash(state);
438 }
439}
440
441impl ScalarUDFImpl for JsonbRenameKeys {
442 fn as_any(&self) -> &dyn Any {
443 self
444 }
445
446 fn name(&self) -> &'static str {
447 "jsonb_rename_keys"
448 }
449
450 fn signature(&self) -> &Signature {
451 &self.signature
452 }
453
454 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
455 Ok(DataType::LargeBinary)
456 }
457
458 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
459 let expanded = expand_args(&args.args)?;
460 let jsonb_arr = expanded[0]
461 .as_any()
462 .downcast_ref::<LargeBinaryArray>()
463 .ok_or_else(|| {
464 datafusion_common::DataFusionError::Internal(
465 "jsonb_rename_keys: first arg must be LargeBinary".into(),
466 )
467 })?;
468 let map_arr = expanded[1]
469 .as_any()
470 .downcast_ref::<MapArray>()
471 .ok_or_else(|| {
472 datafusion_common::DataFusionError::Internal(
473 "jsonb_rename_keys: second arg must be Map<Utf8,Utf8>".into(),
474 )
475 })?;
476
477 let mut builder = LargeBinaryBuilder::with_capacity(jsonb_arr.len(), 256);
478 for i in 0..jsonb_arr.len() {
479 if jsonb_arr.is_null(i) || map_arr.is_null(i) {
480 builder.append_null();
481 } else {
482 let rename_map = extract_string_map(map_arr, i);
483 match json_types::jsonb_to_value(jsonb_arr.value(i)) {
484 Some(serde_json::Value::Object(obj)) => {
485 let renamed: serde_json::Map<String, serde_json::Value> = obj
486 .into_iter()
487 .map(|(k, v)| {
488 let new_key = rename_map.get(k.as_str()).cloned().unwrap_or(k);
489 (new_key, v)
490 })
491 .collect();
492 builder.append_value(json_types::encode_jsonb(&serde_json::Value::Object(
493 renamed,
494 )));
495 }
496 Some(other) => {
497 builder.append_value(json_types::encode_jsonb(&other));
498 }
499 None => builder.append_null(),
500 }
501 }
502 }
503 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
504 }
505}
506
507fn extract_string_map(map_arr: &MapArray, row: usize) -> std::collections::HashMap<String, String> {
509 let mut result = std::collections::HashMap::new();
510 let entries = map_arr.value(row);
511 let struct_arr = entries
512 .as_any()
513 .downcast_ref::<arrow_array::StructArray>()
514 .unwrap();
515 let keys = struct_arr
516 .column(0)
517 .as_any()
518 .downcast_ref::<StringArray>()
519 .unwrap();
520 let vals = struct_arr
521 .column(1)
522 .as_any()
523 .downcast_ref::<StringArray>()
524 .unwrap();
525 for j in 0..keys.len() {
526 if !keys.is_null(j) && !vals.is_null(j) {
527 result.insert(keys.value(j).to_owned(), vals.value(j).to_owned());
528 }
529 }
530 result
531}
532
533#[derive(Debug)]
541pub struct JsonbPick {
542 signature: Signature,
543}
544
545impl JsonbPick {
546 #[must_use]
548 pub fn new() -> Self {
549 Self {
550 signature: Signature::new(
551 TypeSignature::Exact(vec![
552 DataType::LargeBinary,
553 DataType::List(Arc::new(arrow_schema::Field::new(
554 "item",
555 DataType::Utf8,
556 true,
557 ))),
558 ]),
559 Volatility::Immutable,
560 ),
561 }
562 }
563}
564
565impl Default for JsonbPick {
566 fn default() -> Self {
567 Self::new()
568 }
569}
570
571impl PartialEq for JsonbPick {
572 fn eq(&self, _other: &Self) -> bool {
573 true
574 }
575}
576
577impl Eq for JsonbPick {}
578
579impl Hash for JsonbPick {
580 fn hash<H: Hasher>(&self, state: &mut H) {
581 "jsonb_pick".hash(state);
582 }
583}
584
585impl ScalarUDFImpl for JsonbPick {
586 fn as_any(&self) -> &dyn Any {
587 self
588 }
589
590 fn name(&self) -> &'static str {
591 "jsonb_pick"
592 }
593
594 fn signature(&self) -> &Signature {
595 &self.signature
596 }
597
598 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
599 Ok(DataType::LargeBinary)
600 }
601
602 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
603 let expanded = expand_args(&args.args)?;
604 let jsonb_arr = expanded[0]
605 .as_any()
606 .downcast_ref::<LargeBinaryArray>()
607 .ok_or_else(|| {
608 datafusion_common::DataFusionError::Internal(
609 "jsonb_pick: first arg must be LargeBinary".into(),
610 )
611 })?;
612 let keys_arr = expanded[1]
613 .as_any()
614 .downcast_ref::<ListArray>()
615 .ok_or_else(|| {
616 datafusion_common::DataFusionError::Internal(
617 "jsonb_pick: second arg must be List<Utf8>".into(),
618 )
619 })?;
620
621 let mut builder = LargeBinaryBuilder::with_capacity(jsonb_arr.len(), 256);
622 for i in 0..jsonb_arr.len() {
623 if jsonb_arr.is_null(i) || keys_arr.is_null(i) {
624 builder.append_null();
625 } else {
626 let key_set = extract_string_set(keys_arr, i);
627 match json_types::jsonb_to_value(jsonb_arr.value(i)) {
628 Some(serde_json::Value::Object(obj)) => {
629 let picked: serde_json::Map<String, serde_json::Value> = obj
630 .into_iter()
631 .filter(|(k, _)| key_set.contains(k.as_str()))
632 .collect();
633 builder.append_value(json_types::encode_jsonb(&serde_json::Value::Object(
634 picked,
635 )));
636 }
637 Some(other) => {
638 builder.append_value(json_types::encode_jsonb(&other));
639 }
640 None => builder.append_null(),
641 }
642 }
643 }
644 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
645 }
646}
647
648fn extract_string_set(list_arr: &ListArray, row: usize) -> HashSet<String> {
650 let values = list_arr.value(row);
651 let str_arr = values.as_any().downcast_ref::<StringArray>();
652 let mut result = HashSet::new();
653 if let Some(arr) = str_arr {
654 for j in 0..arr.len() {
655 if !arr.is_null(j) {
656 result.insert(arr.value(j).to_owned());
657 }
658 }
659 }
660 result
661}
662
663#[derive(Debug)]
671pub struct JsonbExcept {
672 signature: Signature,
673}
674
675impl JsonbExcept {
676 #[must_use]
678 pub fn new() -> Self {
679 Self {
680 signature: Signature::new(
681 TypeSignature::Exact(vec![
682 DataType::LargeBinary,
683 DataType::List(Arc::new(arrow_schema::Field::new(
684 "item",
685 DataType::Utf8,
686 true,
687 ))),
688 ]),
689 Volatility::Immutable,
690 ),
691 }
692 }
693}
694
695impl Default for JsonbExcept {
696 fn default() -> Self {
697 Self::new()
698 }
699}
700
701impl PartialEq for JsonbExcept {
702 fn eq(&self, _other: &Self) -> bool {
703 true
704 }
705}
706
707impl Eq for JsonbExcept {}
708
709impl Hash for JsonbExcept {
710 fn hash<H: Hasher>(&self, state: &mut H) {
711 "jsonb_except".hash(state);
712 }
713}
714
715impl ScalarUDFImpl for JsonbExcept {
716 fn as_any(&self) -> &dyn Any {
717 self
718 }
719
720 fn name(&self) -> &'static str {
721 "jsonb_except"
722 }
723
724 fn signature(&self) -> &Signature {
725 &self.signature
726 }
727
728 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
729 Ok(DataType::LargeBinary)
730 }
731
732 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
733 let expanded = expand_args(&args.args)?;
734 let jsonb_arr = expanded[0]
735 .as_any()
736 .downcast_ref::<LargeBinaryArray>()
737 .ok_or_else(|| {
738 datafusion_common::DataFusionError::Internal(
739 "jsonb_except: first arg must be LargeBinary".into(),
740 )
741 })?;
742 let keys_arr = expanded[1]
743 .as_any()
744 .downcast_ref::<ListArray>()
745 .ok_or_else(|| {
746 datafusion_common::DataFusionError::Internal(
747 "jsonb_except: second arg must be List<Utf8>".into(),
748 )
749 })?;
750
751 let mut builder = LargeBinaryBuilder::with_capacity(jsonb_arr.len(), 256);
752 for i in 0..jsonb_arr.len() {
753 if jsonb_arr.is_null(i) || keys_arr.is_null(i) {
754 builder.append_null();
755 } else {
756 let exclude_set = extract_string_set(keys_arr, i);
757 match json_types::jsonb_to_value(jsonb_arr.value(i)) {
758 Some(serde_json::Value::Object(obj)) => {
759 let filtered: serde_json::Map<String, serde_json::Value> = obj
760 .into_iter()
761 .filter(|(k, _)| !exclude_set.contains(k.as_str()))
762 .collect();
763 builder.append_value(json_types::encode_jsonb(&serde_json::Value::Object(
764 filtered,
765 )));
766 }
767 Some(other) => {
768 builder.append_value(json_types::encode_jsonb(&other));
769 }
770 None => builder.append_null(),
771 }
772 }
773 }
774 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
775 }
776}
777
778#[derive(Debug)]
787pub struct JsonbFlatten {
788 signature: Signature,
789}
790
791impl JsonbFlatten {
792 #[must_use]
794 pub fn new() -> Self {
795 Self {
796 signature: Signature::new(
797 TypeSignature::Exact(vec![DataType::LargeBinary, DataType::Utf8]),
798 Volatility::Immutable,
799 ),
800 }
801 }
802}
803
804impl Default for JsonbFlatten {
805 fn default() -> Self {
806 Self::new()
807 }
808}
809
810impl PartialEq for JsonbFlatten {
811 fn eq(&self, _other: &Self) -> bool {
812 true
813 }
814}
815
816impl Eq for JsonbFlatten {}
817
818impl Hash for JsonbFlatten {
819 fn hash<H: Hasher>(&self, state: &mut H) {
820 "jsonb_flatten".hash(state);
821 }
822}
823
824fn flatten_value(
825 val: &serde_json::Value,
826 prefix: &str,
827 sep: &str,
828 out: &mut serde_json::Map<String, serde_json::Value>,
829 depth: usize,
830) -> std::result::Result<(), String> {
831 if depth > MAX_DEPTH {
832 return Err("jsonb_flatten: max depth exceeded".into());
833 }
834 match val {
835 serde_json::Value::Object(obj) => {
836 for (k, v) in obj {
837 let new_key = if prefix.is_empty() {
838 k.clone()
839 } else {
840 format!("{prefix}{sep}{k}")
841 };
842 flatten_value(v, &new_key, sep, out, depth + 1)?;
843 }
844 }
845 serde_json::Value::Array(arr) => {
846 for (idx, v) in arr.iter().enumerate() {
847 let new_key = if prefix.is_empty() {
848 idx.to_string()
849 } else {
850 format!("{prefix}{sep}{idx}")
851 };
852 flatten_value(v, &new_key, sep, out, depth + 1)?;
853 }
854 }
855 _ => {
856 out.insert(prefix.to_owned(), val.clone());
857 }
858 }
859 Ok(())
860}
861
862impl ScalarUDFImpl for JsonbFlatten {
863 fn as_any(&self) -> &dyn Any {
864 self
865 }
866
867 fn name(&self) -> &'static str {
868 "jsonb_flatten"
869 }
870
871 fn signature(&self) -> &Signature {
872 &self.signature
873 }
874
875 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
876 Ok(DataType::LargeBinary)
877 }
878
879 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
880 let expanded = expand_args(&args.args)?;
881 let jsonb_arr = expanded[0]
882 .as_any()
883 .downcast_ref::<LargeBinaryArray>()
884 .ok_or_else(|| {
885 datafusion_common::DataFusionError::Internal(
886 "jsonb_flatten: first arg must be LargeBinary".into(),
887 )
888 })?;
889 let sep_arr = expanded[1]
890 .as_any()
891 .downcast_ref::<StringArray>()
892 .ok_or_else(|| {
893 datafusion_common::DataFusionError::Internal(
894 "jsonb_flatten: second arg must be Utf8".into(),
895 )
896 })?;
897
898 let mut builder = LargeBinaryBuilder::with_capacity(jsonb_arr.len(), 256);
899 for i in 0..jsonb_arr.len() {
900 if jsonb_arr.is_null(i) || sep_arr.is_null(i) {
901 builder.append_null();
902 } else {
903 let sep = sep_arr.value(i);
904 match json_types::jsonb_to_value(jsonb_arr.value(i)) {
905 Some(val) => {
906 let mut flat = serde_json::Map::new();
907 flatten_value(&val, "", sep, &mut flat, 0)
908 .map_err(datafusion_common::DataFusionError::Execution)?;
909 builder.append_value(json_types::encode_jsonb(&serde_json::Value::Object(
910 flat,
911 )));
912 }
913 None => builder.append_null(),
914 }
915 }
916 }
917 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
918 }
919}
920
921#[derive(Debug)]
931pub struct JsonbUnflatten {
932 signature: Signature,
933}
934
935impl JsonbUnflatten {
936 #[must_use]
938 pub fn new() -> Self {
939 Self {
940 signature: Signature::new(
941 TypeSignature::Exact(vec![DataType::LargeBinary, DataType::Utf8]),
942 Volatility::Immutable,
943 ),
944 }
945 }
946}
947
948impl Default for JsonbUnflatten {
949 fn default() -> Self {
950 Self::new()
951 }
952}
953
954impl PartialEq for JsonbUnflatten {
955 fn eq(&self, _other: &Self) -> bool {
956 true
957 }
958}
959
960impl Eq for JsonbUnflatten {}
961
962impl Hash for JsonbUnflatten {
963 fn hash<H: Hasher>(&self, state: &mut H) {
964 "jsonb_unflatten".hash(state);
965 }
966}
967
968fn unflatten_insert(root: &mut serde_json::Value, parts: &[&str], value: serde_json::Value) {
969 if parts.is_empty() {
970 return;
971 }
972 if parts.len() == 1 {
973 if let serde_json::Value::Object(obj) = root {
974 obj.insert(parts[0].to_owned(), value);
975 }
976 return;
977 }
978 if let serde_json::Value::Object(obj) = root {
979 let child = obj
980 .entry(parts[0].to_owned())
981 .or_insert_with(|| serde_json::Value::Object(serde_json::Map::new()));
982 unflatten_insert(child, &parts[1..], value);
983 }
984}
985
986impl ScalarUDFImpl for JsonbUnflatten {
987 fn as_any(&self) -> &dyn Any {
988 self
989 }
990
991 fn name(&self) -> &'static str {
992 "jsonb_unflatten"
993 }
994
995 fn signature(&self) -> &Signature {
996 &self.signature
997 }
998
999 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
1000 Ok(DataType::LargeBinary)
1001 }
1002
1003 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
1004 let expanded = expand_args(&args.args)?;
1005 let jsonb_arr = expanded[0]
1006 .as_any()
1007 .downcast_ref::<LargeBinaryArray>()
1008 .ok_or_else(|| {
1009 datafusion_common::DataFusionError::Internal(
1010 "jsonb_unflatten: first arg must be LargeBinary".into(),
1011 )
1012 })?;
1013 let sep_arr = expanded[1]
1014 .as_any()
1015 .downcast_ref::<StringArray>()
1016 .ok_or_else(|| {
1017 datafusion_common::DataFusionError::Internal(
1018 "jsonb_unflatten: second arg must be Utf8".into(),
1019 )
1020 })?;
1021
1022 let mut builder = LargeBinaryBuilder::with_capacity(jsonb_arr.len(), 256);
1023 for i in 0..jsonb_arr.len() {
1024 if jsonb_arr.is_null(i) || sep_arr.is_null(i) {
1025 builder.append_null();
1026 } else {
1027 let sep = sep_arr.value(i);
1028 match json_types::jsonb_to_value(jsonb_arr.value(i)) {
1029 Some(serde_json::Value::Object(flat)) => {
1030 let mut root = serde_json::Value::Object(serde_json::Map::new());
1031 for (key, val) in flat {
1032 let parts: Vec<&str> = key.split(sep).collect();
1033 unflatten_insert(&mut root, &parts, val);
1034 }
1035 builder.append_value(json_types::encode_jsonb(&root));
1036 }
1037 Some(other) => {
1038 builder.append_value(json_types::encode_jsonb(&other));
1039 }
1040 None => builder.append_null(),
1041 }
1042 }
1043 }
1044 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
1045 }
1046}
1047
1048#[derive(Debug)]
1059pub struct JsonToColumns {
1060 signature: Signature,
1061}
1062
1063impl JsonToColumns {
1064 #[must_use]
1066 pub fn new() -> Self {
1067 Self {
1068 signature: Signature::new(
1069 TypeSignature::Exact(vec![DataType::LargeBinary, DataType::Utf8]),
1070 Volatility::Immutable,
1071 ),
1072 }
1073 }
1074}
1075
1076impl Default for JsonToColumns {
1077 fn default() -> Self {
1078 Self::new()
1079 }
1080}
1081
1082impl PartialEq for JsonToColumns {
1083 fn eq(&self, _other: &Self) -> bool {
1084 true
1085 }
1086}
1087
1088impl Eq for JsonToColumns {}
1089
1090impl Hash for JsonToColumns {
1091 fn hash<H: Hasher>(&self, state: &mut H) {
1092 "json_to_columns".hash(state);
1093 }
1094}
1095
/// Extracts the field names from a column type spec such as
/// `"a INT, b STRUCT(x INT, y INT), c MAP<VARCHAR, INT>"`.
///
/// The spec is split on commas that sit outside any `()`, `<>` or `[]`
/// brackets, so commas inside nested type arguments are not mistaken for
/// field separators. The first whitespace-delimited token of each segment is
/// taken as the field name, and empty segments are skipped. Unbalanced
/// closing brackets are tolerated (depth never goes below zero).
fn parse_type_spec_fields(spec: &str) -> Vec<String> {
    let mut segments: Vec<&str> = Vec::new();
    let mut depth: usize = 0;
    let mut start = 0;
    for (idx, ch) in spec.char_indices() {
        match ch {
            '(' | '<' | '[' => depth += 1,
            ')' | '>' | ']' => depth = depth.saturating_sub(1),
            ',' if depth == 0 => {
                segments.push(&spec[start..idx]);
                // ',' is a single byte, so this is still a char boundary.
                start = idx + 1;
            }
            _ => {}
        }
    }
    segments.push(&spec[start..]);

    segments
        .into_iter()
        .filter_map(|segment| segment.split_whitespace().next().map(ToOwned::to_owned))
        .collect()
}
1106
1107impl ScalarUDFImpl for JsonToColumns {
1108 fn as_any(&self) -> &dyn Any {
1109 self
1110 }
1111
1112 fn name(&self) -> &'static str {
1113 "json_to_columns"
1114 }
1115
1116 fn signature(&self) -> &Signature {
1117 &self.signature
1118 }
1119
1120 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
1121 Ok(DataType::LargeBinary)
1122 }
1123
1124 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
1125 let expanded = expand_args(&args.args)?;
1126 let jsonb_arr = expanded[0]
1127 .as_any()
1128 .downcast_ref::<LargeBinaryArray>()
1129 .ok_or_else(|| {
1130 datafusion_common::DataFusionError::Internal(
1131 "json_to_columns: first arg must be LargeBinary".into(),
1132 )
1133 })?;
1134 let spec_arr = expanded[1]
1135 .as_any()
1136 .downcast_ref::<StringArray>()
1137 .ok_or_else(|| {
1138 datafusion_common::DataFusionError::Internal(
1139 "json_to_columns: second arg must be Utf8".into(),
1140 )
1141 })?;
1142
1143 let mut builder = LargeBinaryBuilder::with_capacity(jsonb_arr.len(), 256);
1144 for i in 0..jsonb_arr.len() {
1145 if jsonb_arr.is_null(i) || spec_arr.is_null(i) {
1146 builder.append_null();
1147 } else {
1148 let fields = parse_type_spec_fields(spec_arr.value(i));
1149 let field_set: HashSet<&str> = fields.iter().map(String::as_str).collect();
1150 match json_types::jsonb_to_value(jsonb_arr.value(i)) {
1151 Some(serde_json::Value::Object(obj)) => {
1152 let picked: serde_json::Map<String, serde_json::Value> = obj
1153 .into_iter()
1154 .filter(|(k, _)| field_set.contains(k.as_str()))
1155 .collect();
1156 builder.append_value(json_types::encode_jsonb(&serde_json::Value::Object(
1157 picked,
1158 )));
1159 }
1160 Some(other) => {
1161 builder.append_value(json_types::encode_jsonb(&other));
1162 }
1163 None => builder.append_null(),
1164 }
1165 }
1166 }
1167 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
1168 }
1169}
1170
1171#[derive(Debug)]
1180pub struct JsonInferSchema {
1181 signature: Signature,
1182}
1183
1184impl JsonInferSchema {
1185 #[must_use]
1187 pub fn new() -> Self {
1188 Self {
1189 signature: Signature::new(
1190 TypeSignature::Exact(vec![DataType::LargeBinary]),
1191 Volatility::Immutable,
1192 ),
1193 }
1194 }
1195}
1196
1197impl Default for JsonInferSchema {
1198 fn default() -> Self {
1199 Self::new()
1200 }
1201}
1202
1203impl PartialEq for JsonInferSchema {
1204 fn eq(&self, _other: &Self) -> bool {
1205 true
1206 }
1207}
1208
1209impl Eq for JsonInferSchema {}
1210
1211impl Hash for JsonInferSchema {
1212 fn hash<H: Hasher>(&self, state: &mut H) {
1213 "json_infer_schema".hash(state);
1214 }
1215}
1216
1217fn infer_type(val: &serde_json::Value) -> String {
1218 match val {
1219 serde_json::Value::Null => "NULL".to_owned(),
1220 serde_json::Value::Bool(_) => "BOOLEAN".to_owned(),
1221 serde_json::Value::Number(n) => {
1222 if n.is_i64() || n.is_u64() {
1223 "BIGINT".to_owned()
1224 } else {
1225 "DOUBLE".to_owned()
1226 }
1227 }
1228 serde_json::Value::String(_) => "VARCHAR".to_owned(),
1229 serde_json::Value::Array(arr) => {
1230 let inner = arr.first().map_or("NULL".to_owned(), infer_type);
1231 format!("ARRAY<{inner}>")
1232 }
1233 serde_json::Value::Object(obj) => {
1234 let fields: Vec<String> = obj
1235 .iter()
1236 .map(|(k, v)| format!("{k} {}", infer_type(v)))
1237 .collect();
1238 format!("STRUCT({})", fields.join(", "))
1239 }
1240 }
1241}
1242
1243impl ScalarUDFImpl for JsonInferSchema {
1244 fn as_any(&self) -> &dyn Any {
1245 self
1246 }
1247
1248 fn name(&self) -> &'static str {
1249 "json_infer_schema"
1250 }
1251
1252 fn signature(&self) -> &Signature {
1253 &self.signature
1254 }
1255
1256 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
1257 Ok(DataType::Utf8)
1258 }
1259
1260 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
1261 let expanded = expand_args(&args.args)?;
1262 let jsonb_arr = expanded[0]
1263 .as_any()
1264 .downcast_ref::<LargeBinaryArray>()
1265 .ok_or_else(|| {
1266 datafusion_common::DataFusionError::Internal(
1267 "json_infer_schema: arg must be LargeBinary".into(),
1268 )
1269 })?;
1270
1271 let mut builder = StringBuilder::with_capacity(jsonb_arr.len(), 256);
1272 for i in 0..jsonb_arr.len() {
1273 if jsonb_arr.is_null(i) {
1274 builder.append_null();
1275 } else {
1276 match json_types::jsonb_to_value(jsonb_arr.value(i)) {
1277 Some(serde_json::Value::Object(obj)) => {
1278 let schema: serde_json::Map<String, serde_json::Value> = obj
1279 .iter()
1280 .map(|(k, v)| (k.clone(), serde_json::Value::String(infer_type(v))))
1281 .collect();
1282 builder.append_value(
1283 serde_json::to_string(&serde_json::Value::Object(schema))
1284 .unwrap_or_default(),
1285 );
1286 }
1287 Some(val) => {
1288 builder.append_value(infer_type(&val));
1289 }
1290 None => builder.append_null(),
1291 }
1292 }
1293 }
1294 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
1295 }
1296}
1297
1298pub fn register_json_extensions(ctx: &datafusion::prelude::SessionContext) {
1304 use datafusion_expr::ScalarUDF;
1305
1306 ctx.register_udf(ScalarUDF::new_from_impl(JsonbMerge::new()));
1307 ctx.register_udf(ScalarUDF::new_from_impl(JsonbDeepMerge::new()));
1308 ctx.register_udf(ScalarUDF::new_from_impl(JsonbStripNulls::new()));
1309 ctx.register_udf(ScalarUDF::new_from_impl(JsonbRenameKeys::new()));
1310 ctx.register_udf(ScalarUDF::new_from_impl(JsonbPick::new()));
1311 ctx.register_udf(ScalarUDF::new_from_impl(JsonbExcept::new()));
1312 ctx.register_udf(ScalarUDF::new_from_impl(JsonbFlatten::new()));
1313 ctx.register_udf(ScalarUDF::new_from_impl(JsonbUnflatten::new()));
1314 ctx.register_udf(ScalarUDF::new_from_impl(JsonToColumns::new()));
1315 ctx.register_udf(ScalarUDF::new_from_impl(JsonInferSchema::new()));
1316}
1317
/// Unit tests for the JSON/JSONB extension UDFs defined in this file.
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::builder::{ListBuilder, MapBuilder, StringBuilder};
    use arrow_array::ArrayRef;
    use arrow_schema::Field;
    use datafusion_common::config::ConfigOptions;
    use serde_json::json;

    // ------------------------------------------------------------------
    // Helpers
    // ------------------------------------------------------------------

    /// Encodes a JSON value with the crate's JSONB encoder.
    fn enc(v: &serde_json::Value) -> Vec<u8> {
        json_types::encode_jsonb(v)
    }

    /// Builds a `LargeBinaryArray` holding one non-null JSONB row per input value.
    fn make_jsonb_array(vals: &[serde_json::Value]) -> LargeBinaryArray {
        let encoded: Vec<Vec<u8>> = vals.iter().map(enc).collect();
        let refs: Vec<&[u8]> = encoded.iter().map(Vec::as_slice).collect();
        LargeBinaryArray::from_iter_values(refs)
    }

    /// Builds a `List<Utf8>` array with one non-null list row per slice in `rows`.
    fn make_string_list(rows: &[&[&str]]) -> ListArray {
        let mut builder = ListBuilder::new(StringBuilder::new());
        for row in rows {
            for &s in *row {
                builder.values().append_value(s);
            }
            builder.append(true);
        }
        builder.finish()
    }

    /// Wraps `args` as array-valued UDF arguments with the given declared return type.
    ///
    /// `number_rows` is taken from the first argument's length so it agrees with
    /// the arrays actually passed in (0 when there are no arguments).
    fn make_args(args: Vec<ArrayRef>, return_type: DataType) -> ScalarFunctionArgs {
        let number_rows = args.first().map_or(0, |a| a.len());
        ScalarFunctionArgs {
            args: args.into_iter().map(ColumnarValue::Array).collect(),
            arg_fields: vec![],
            number_rows,
            return_field: Arc::new(Field::new("output", return_type, true)),
            config_options: Arc::new(ConfigOptions::default()),
        }
    }

    /// Two array arguments, `LargeBinary` return type.
    fn make_args_2(a: ArrayRef, b: ArrayRef) -> ScalarFunctionArgs {
        make_args(vec![a, b], DataType::LargeBinary)
    }

    /// One array argument, `LargeBinary` return type.
    fn make_args_1(a: ArrayRef) -> ScalarFunctionArgs {
        make_args(vec![a], DataType::LargeBinary)
    }

    /// One array argument, `Utf8` return type (for text-producing UDFs).
    fn make_text_args_1(a: ArrayRef) -> ScalarFunctionArgs {
        make_args(vec![a], DataType::Utf8)
    }

    /// Decodes row `row` of a `LargeBinary` array result from JSONB into JSON.
    ///
    /// Panics if the result is not an array of `LargeBinary`, the row is null,
    /// or the bytes fail to decode.
    fn decode_jsonb_result(result: ColumnarValue, row: usize) -> serde_json::Value {
        let ColumnarValue::Array(arr) = result else {
            panic!("expected array")
        };
        let bin = arr
            .as_any()
            .downcast_ref::<LargeBinaryArray>()
            .expect("expected LargeBinary output");
        assert!(!bin.is_null(row), "unexpected null at row {row}");
        json_types::jsonb_to_value(bin.value(row)).expect("invalid jsonb")
    }

    /// Returns row `row` of a `Utf8` array result as an owned string.
    ///
    /// Panics if the result is not an array of `Utf8` or the row is null. The
    /// null check mirrors `decode_jsonb_result`, so a null is not silently read
    /// as an empty string.
    fn decode_text_result(result: ColumnarValue, row: usize) -> String {
        let ColumnarValue::Array(arr) = result else {
            panic!("expected array")
        };
        let str_arr = arr
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("expected Utf8 output");
        assert!(!str_arr.is_null(row), "unexpected null at row {row}");
        str_arr.value(row).to_owned()
    }

    // ------------------------------------------------------------------
    // jsonb_merge / deep merge
    // ------------------------------------------------------------------

    /// Shallow merge: right-hand keys override left-hand keys.
    #[test]
    fn test_json_ext_merge_objects() {
        let udf = JsonbMerge::new();
        let left = make_jsonb_array(&[json!({"a": 1, "b": 2})]);
        let right = make_jsonb_array(&[json!({"b": 99, "c": 3})]);
        let result = udf
            .invoke_with_args(make_args_2(Arc::new(left), Arc::new(right)))
            .unwrap();
        assert_eq!(
            decode_jsonb_result(result, 0),
            json!({"a": 1, "b": 99, "c": 3})
        );
    }

    /// Merging non-objects yields the right-hand value.
    #[test]
    fn test_json_ext_merge_non_object() {
        let udf = JsonbMerge::new();
        let left = make_jsonb_array(&[json!(42)]);
        let right = make_jsonb_array(&[json!("hello")]);
        let result = udf
            .invoke_with_args(make_args_2(Arc::new(left), Arc::new(right)))
            .unwrap();
        assert_eq!(decode_jsonb_result(result, 0), json!("hello"));
    }

    /// Deep merge recurses into nested objects present on both sides.
    #[test]
    fn test_json_ext_deep_merge() {
        let udf = JsonbDeepMerge::new();
        let left = make_jsonb_array(&[json!({"a": {"x": 1, "y": 2}, "b": 10})]);
        let right = make_jsonb_array(&[json!({"a": {"y": 99, "z": 3}, "c": 20})]);
        let result = udf
            .invoke_with_args(make_args_2(Arc::new(left), Arc::new(right)))
            .unwrap();
        assert_eq!(
            decode_jsonb_result(result, 0),
            json!({"a": {"x": 1, "y": 99, "z": 3}, "b": 10, "c": 20})
        );
    }

    /// Deep merge replaces a nested object when the right side is a scalar.
    #[test]
    fn test_json_ext_deep_merge_non_object_override() {
        let udf = JsonbDeepMerge::new();
        let left = make_jsonb_array(&[json!({"a": {"x": 1}})]);
        let right = make_jsonb_array(&[json!({"a": 42})]);
        let result = udf
            .invoke_with_args(make_args_2(Arc::new(left), Arc::new(right)))
            .unwrap();
        assert_eq!(decode_jsonb_result(result, 0), json!({"a": 42}));
    }

    /// A SQL-null left input produces a SQL-null result row.
    #[test]
    fn test_json_ext_merge_null_input() {
        let udf = JsonbMerge::new();
        let left = LargeBinaryArray::new_null(1);
        let right = make_jsonb_array(&[json!({"a": 1})]);
        let result = udf
            .invoke_with_args(make_args_2(Arc::new(left), Arc::new(right)))
            .unwrap();
        let ColumnarValue::Array(arr) = result else {
            panic!("expected array")
        };
        let bin = arr
            .as_any()
            .downcast_ref::<LargeBinaryArray>()
            .expect("expected LargeBinary output");
        assert!(bin.is_null(0));
    }

    // ------------------------------------------------------------------
    // jsonb_strip_nulls
    // ------------------------------------------------------------------

    /// Null-valued object keys are removed, including in nested objects.
    #[test]
    fn test_json_ext_strip_nulls() {
        let udf = JsonbStripNulls::new();
        let input = make_jsonb_array(&[json!({"a": 1, "b": null, "c": {"d": null, "e": 2}})]);
        let result = udf.invoke_with_args(make_args_1(Arc::new(input))).unwrap();
        assert_eq!(
            decode_jsonb_result(result, 0),
            json!({"a": 1, "c": {"e": 2}})
        );
    }

    /// Nulls inside arrays are kept; only object members are stripped.
    #[test]
    fn test_json_ext_strip_nulls_array_preserved() {
        let udf = JsonbStripNulls::new();
        let input = make_jsonb_array(&[json!({"arr": [1, null, 3]})]);
        let result = udf.invoke_with_args(make_args_1(Arc::new(input))).unwrap();
        assert_eq!(decode_jsonb_result(result, 0), json!({"arr": [1, null, 3]}));
    }

    // ------------------------------------------------------------------
    // Key selection / renaming
    // ------------------------------------------------------------------

    /// Keys listed in the rename map are renamed; others are untouched.
    #[test]
    fn test_json_ext_rename_keys() {
        let udf = JsonbRenameKeys::new();
        let input = make_jsonb_array(&[json!({"old_name": 1, "keep": 2})]);

        let mut map_builder = MapBuilder::new(None, StringBuilder::new(), StringBuilder::new());
        map_builder.keys().append_value("old_name");
        map_builder.values().append_value("new_name");
        map_builder.append(true).unwrap();
        let map_arr = map_builder.finish();

        let result = udf
            .invoke_with_args(make_args_2(Arc::new(input), Arc::new(map_arr)))
            .unwrap();
        assert_eq!(
            decode_jsonb_result(result, 0),
            json!({"keep": 2, "new_name": 1})
        );
    }

    /// Only the listed keys are kept.
    #[test]
    fn test_json_ext_pick() {
        let udf = JsonbPick::new();
        let input = make_jsonb_array(&[json!({"a": 1, "b": 2, "c": 3})]);
        let keys = make_string_list(&[&["a", "c"]]);
        let result = udf
            .invoke_with_args(make_args_2(Arc::new(input), Arc::new(keys)))
            .unwrap();
        assert_eq!(decode_jsonb_result(result, 0), json!({"a": 1, "c": 3}));
    }

    /// The listed keys are removed.
    #[test]
    fn test_json_ext_except() {
        let udf = JsonbExcept::new();
        let input = make_jsonb_array(&[json!({"a": 1, "b": 2, "c": 3})]);
        let keys = make_string_list(&[&["b"]]);
        let result = udf
            .invoke_with_args(make_args_2(Arc::new(input), Arc::new(keys)))
            .unwrap();
        assert_eq!(decode_jsonb_result(result, 0), json!({"a": 1, "c": 3}));
    }

    // ------------------------------------------------------------------
    // Flatten / unflatten
    // ------------------------------------------------------------------

    /// Nested objects and arrays flatten to separator-joined paths.
    #[test]
    fn test_json_ext_flatten() {
        let udf = JsonbFlatten::new();
        let input = make_jsonb_array(&[json!({"a": {"b": 1, "c": [2, 3]}})]);
        let sep = StringArray::from(vec!["."]);
        let result = udf
            .invoke_with_args(make_args_2(Arc::new(input), Arc::new(sep)))
            .unwrap();
        assert_eq!(
            decode_jsonb_result(result, 0),
            json!({"a.b": 1, "a.c.0": 2, "a.c.1": 3})
        );
    }

    /// A non-default separator is honoured.
    #[test]
    fn test_json_ext_flatten_custom_sep() {
        let udf = JsonbFlatten::new();
        let input = make_jsonb_array(&[json!({"x": {"y": 42}})]);
        let sep = StringArray::from(vec!["/"]);
        let result = udf
            .invoke_with_args(make_args_2(Arc::new(input), Arc::new(sep)))
            .unwrap();
        assert_eq!(decode_jsonb_result(result, 0), json!({"x/y": 42}));
    }

    /// Separator-joined keys are rebuilt into nested objects.
    #[test]
    fn test_json_ext_unflatten() {
        let udf = JsonbUnflatten::new();
        let input = make_jsonb_array(&[json!({"a.b": 1, "a.c": 2, "d": 3})]);
        let sep = StringArray::from(vec!["."]);
        let result = udf
            .invoke_with_args(make_args_2(Arc::new(input), Arc::new(sep)))
            .unwrap();
        assert_eq!(
            decode_jsonb_result(result, 0),
            json!({"a": {"b": 1, "c": 2}, "d": 3})
        );
    }

    // ------------------------------------------------------------------
    // Schema-related helpers
    // ------------------------------------------------------------------

    /// Only the columns named in the spec are projected.
    #[test]
    fn test_json_ext_to_columns() {
        let udf = JsonToColumns::new();
        let input = make_jsonb_array(&[json!({"name": "Alice", "age": 30, "active": true})]);
        let spec = StringArray::from(vec!["name VARCHAR, age BIGINT"]);
        let result = udf
            .invoke_with_args(make_args_2(Arc::new(input), Arc::new(spec)))
            .unwrap();
        assert_eq!(
            decode_jsonb_result(result, 0),
            json!({"age": 30, "name": "Alice"})
        );
    }

    /// Top-level object fields map to SQL scalar type names.
    #[test]
    fn test_json_ext_infer_schema_object() {
        let udf = JsonInferSchema::new();
        let input = make_jsonb_array(&[json!({"name": "Alice", "age": 30, "active": true})]);
        let result = udf
            .invoke_with_args(make_text_args_1(Arc::new(input)))
            .unwrap();
        let text = decode_text_result(result, 0);
        let parsed: serde_json::Value = serde_json::from_str(&text).unwrap();
        assert_eq!(parsed["name"], "VARCHAR");
        assert_eq!(parsed["age"], "BIGINT");
        assert_eq!(parsed["active"], "BOOLEAN");
    }

    /// Arrays and nested objects map to ARRAY<..> / STRUCT(..) type names.
    #[test]
    fn test_json_ext_infer_schema_nested() {
        let udf = JsonInferSchema::new();
        let input = make_jsonb_array(&[json!({"tags": [1, 2], "meta": {"x": 1.5}})]);
        let result = udf
            .invoke_with_args(make_text_args_1(Arc::new(input)))
            .unwrap();
        let text = decode_text_result(result, 0);
        let parsed: serde_json::Value = serde_json::from_str(&text).unwrap();
        assert_eq!(parsed["tags"], "ARRAY<BIGINT>");
        assert_eq!(parsed["meta"], "STRUCT(x DOUBLE)");
    }

    /// A top-level scalar yields a bare type name.
    #[test]
    fn test_json_ext_infer_schema_scalar() {
        let udf = JsonInferSchema::new();
        let input = make_jsonb_array(&[json!(42)]);
        let result = udf
            .invoke_with_args(make_text_args_1(Arc::new(input)))
            .unwrap();
        assert_eq!(decode_text_result(result, 0), "BIGINT");
    }

    // ------------------------------------------------------------------
    // Registration
    // ------------------------------------------------------------------

    /// Every UDF wraps into a `ScalarUDF` with a non-empty, unique name.
    ///
    /// Uniqueness matters because registering two UDFs under the same name
    /// would make one silently replace the other.
    #[test]
    fn test_json_ext_all_udfs_register() {
        use datafusion_expr::ScalarUDF;

        let udfs = [
            ScalarUDF::new_from_impl(JsonbMerge::new()),
            ScalarUDF::new_from_impl(JsonbDeepMerge::new()),
            ScalarUDF::new_from_impl(JsonbStripNulls::new()),
            ScalarUDF::new_from_impl(JsonbRenameKeys::new()),
            ScalarUDF::new_from_impl(JsonbPick::new()),
            ScalarUDF::new_from_impl(JsonbExcept::new()),
            ScalarUDF::new_from_impl(JsonbFlatten::new()),
            ScalarUDF::new_from_impl(JsonbUnflatten::new()),
            ScalarUDF::new_from_impl(JsonToColumns::new()),
            ScalarUDF::new_from_impl(JsonInferSchema::new()),
        ];
        let names: Vec<&str> = udfs.iter().map(ScalarUDF::name).collect();

        for name in &names {
            assert!(!name.is_empty(), "UDF has empty name");
        }
        let unique: std::collections::HashSet<&str> = names.iter().copied().collect();
        assert_eq!(unique.len(), names.len(), "duplicate UDF names in {names:?}");
    }
}