1use std::any::Any;
13use std::hash::{Hash, Hasher};
14use std::sync::Arc;
15
16use arrow::datatypes::DataType;
17use arrow_array::{
18 builder::{LargeBinaryBuilder, StringBuilder},
19 Array, ArrayRef, BooleanArray, LargeBinaryArray, ListArray, StringArray,
20};
21use datafusion_common::Result;
22use datafusion_expr::{
23 ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, TypeSignature, Volatility,
24};
25
26use super::json_types;
27
28fn output_len(args: &[ColumnarValue]) -> usize {
32 for a in args {
33 if let ColumnarValue::Array(arr) = a {
34 return arr.len();
35 }
36 }
37 1
38}
39
40pub fn expand_args(args: &[ColumnarValue]) -> Result<Vec<ArrayRef>> {
46 let len = output_len(args);
47 args.iter()
48 .map(|a| match a {
49 ColumnarValue::Array(arr) => Ok(Arc::clone(arr)),
50 ColumnarValue::Scalar(s) => s.to_array_of_size(len),
51 })
52 .collect()
53}
54
55#[derive(Debug)]
64pub struct JsonbGet {
65 signature: Signature,
66}
67
68impl JsonbGet {
69 #[must_use]
71 pub fn new() -> Self {
72 Self {
73 signature: Signature::new(
74 TypeSignature::Exact(vec![DataType::LargeBinary, DataType::Utf8]),
75 Volatility::Immutable,
76 ),
77 }
78 }
79}
80
81impl Default for JsonbGet {
82 fn default() -> Self {
83 Self::new()
84 }
85}
86
87impl PartialEq for JsonbGet {
88 fn eq(&self, _other: &Self) -> bool {
89 true
90 }
91}
92
93impl Eq for JsonbGet {}
94
95impl Hash for JsonbGet {
96 fn hash<H: Hasher>(&self, state: &mut H) {
97 "jsonb_get".hash(state);
98 }
99}
100
101impl ScalarUDFImpl for JsonbGet {
102 fn as_any(&self) -> &dyn Any {
103 self
104 }
105
106 fn name(&self) -> &'static str {
107 "jsonb_get"
108 }
109
110 fn signature(&self) -> &Signature {
111 &self.signature
112 }
113
114 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
115 Ok(DataType::LargeBinary)
116 }
117
118 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
119 let expanded = expand_args(&args.args)?;
120 let jsonb_arr = expanded[0]
121 .as_any()
122 .downcast_ref::<LargeBinaryArray>()
123 .ok_or_else(|| {
124 datafusion_common::DataFusionError::Internal(
125 "jsonb_get: first arg must be LargeBinary".into(),
126 )
127 })?;
128 let key_arr = expanded[1]
129 .as_any()
130 .downcast_ref::<StringArray>()
131 .ok_or_else(|| {
132 datafusion_common::DataFusionError::Internal(
133 "jsonb_get: second arg must be Utf8".into(),
134 )
135 })?;
136
137 let mut builder = LargeBinaryBuilder::with_capacity(jsonb_arr.len(), 256);
138 for i in 0..jsonb_arr.len() {
139 if jsonb_arr.is_null(i) || key_arr.is_null(i) {
140 builder.append_null();
141 } else {
142 let jsonb = jsonb_arr.value(i);
143 let key = key_arr.value(i);
144 match json_types::jsonb_get_field(jsonb, key) {
145 Some(val) => builder.append_value(val),
146 None => builder.append_null(),
147 }
148 }
149 }
150 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
151 }
152}
153
154#[derive(Debug)]
163pub struct JsonbGetIdx {
164 signature: Signature,
165}
166
167impl JsonbGetIdx {
168 #[must_use]
170 pub fn new() -> Self {
171 Self {
172 signature: Signature::new(
173 TypeSignature::Exact(vec![DataType::LargeBinary, DataType::Int32]),
174 Volatility::Immutable,
175 ),
176 }
177 }
178}
179
180impl Default for JsonbGetIdx {
181 fn default() -> Self {
182 Self::new()
183 }
184}
185
186impl PartialEq for JsonbGetIdx {
187 fn eq(&self, _other: &Self) -> bool {
188 true
189 }
190}
191
192impl Eq for JsonbGetIdx {}
193
194impl Hash for JsonbGetIdx {
195 fn hash<H: Hasher>(&self, state: &mut H) {
196 "jsonb_get_idx".hash(state);
197 }
198}
199
200impl ScalarUDFImpl for JsonbGetIdx {
201 fn as_any(&self) -> &dyn Any {
202 self
203 }
204
205 fn name(&self) -> &'static str {
206 "jsonb_get_idx"
207 }
208
209 fn signature(&self) -> &Signature {
210 &self.signature
211 }
212
213 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
214 Ok(DataType::LargeBinary)
215 }
216
217 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
218 let expanded = expand_args(&args.args)?;
219 let jsonb_arr = expanded[0]
220 .as_any()
221 .downcast_ref::<LargeBinaryArray>()
222 .ok_or_else(|| {
223 datafusion_common::DataFusionError::Internal(
224 "jsonb_get_idx: first arg must be LargeBinary".into(),
225 )
226 })?;
227 let idx_arr = expanded[1]
228 .as_any()
229 .downcast_ref::<arrow_array::Int32Array>()
230 .ok_or_else(|| {
231 datafusion_common::DataFusionError::Internal(
232 "jsonb_get_idx: second arg must be Int32".into(),
233 )
234 })?;
235
236 let mut builder = LargeBinaryBuilder::with_capacity(jsonb_arr.len(), 256);
237 for i in 0..jsonb_arr.len() {
238 if jsonb_arr.is_null(i) || idx_arr.is_null(i) {
239 builder.append_null();
240 } else {
241 let jsonb = jsonb_arr.value(i);
242 let idx = idx_arr.value(i);
243 match usize::try_from(idx)
244 .ok()
245 .and_then(|u| json_types::jsonb_array_get(jsonb, u))
246 {
247 Some(val) => builder.append_value(val),
248 None => builder.append_null(),
249 }
250 }
251 }
252 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
253 }
254}
255
256#[derive(Debug)]
265pub struct JsonbGetText {
266 signature: Signature,
267}
268
269impl JsonbGetText {
270 #[must_use]
272 pub fn new() -> Self {
273 Self {
274 signature: Signature::new(
275 TypeSignature::Exact(vec![DataType::LargeBinary, DataType::Utf8]),
276 Volatility::Immutable,
277 ),
278 }
279 }
280}
281
282impl Default for JsonbGetText {
283 fn default() -> Self {
284 Self::new()
285 }
286}
287
288impl PartialEq for JsonbGetText {
289 fn eq(&self, _other: &Self) -> bool {
290 true
291 }
292}
293
294impl Eq for JsonbGetText {}
295
296impl Hash for JsonbGetText {
297 fn hash<H: Hasher>(&self, state: &mut H) {
298 "jsonb_get_text".hash(state);
299 }
300}
301
302impl ScalarUDFImpl for JsonbGetText {
303 fn as_any(&self) -> &dyn Any {
304 self
305 }
306
307 fn name(&self) -> &'static str {
308 "jsonb_get_text"
309 }
310
311 fn signature(&self) -> &Signature {
312 &self.signature
313 }
314
315 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
316 Ok(DataType::Utf8)
317 }
318
319 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
320 let expanded = expand_args(&args.args)?;
321 let jsonb_arr = expanded[0]
322 .as_any()
323 .downcast_ref::<LargeBinaryArray>()
324 .ok_or_else(|| {
325 datafusion_common::DataFusionError::Internal(
326 "jsonb_get_text: first arg must be LargeBinary".into(),
327 )
328 })?;
329 let key_arr = expanded[1]
330 .as_any()
331 .downcast_ref::<StringArray>()
332 .ok_or_else(|| {
333 datafusion_common::DataFusionError::Internal(
334 "jsonb_get_text: second arg must be Utf8".into(),
335 )
336 })?;
337
338 let mut builder = StringBuilder::with_capacity(jsonb_arr.len(), 256);
339 for i in 0..jsonb_arr.len() {
340 if jsonb_arr.is_null(i) || key_arr.is_null(i) {
341 builder.append_null();
342 } else {
343 let jsonb = jsonb_arr.value(i);
344 let key = key_arr.value(i);
345 match json_types::jsonb_get_field(jsonb, key) {
346 Some(val) => match json_types::jsonb_to_text(val) {
347 Some(text) => builder.append_value(&text),
348 None => builder.append_null(),
349 },
350 None => builder.append_null(),
351 }
352 }
353 }
354 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
355 }
356}
357
358#[derive(Debug)]
367pub struct JsonbGetTextIdx {
368 signature: Signature,
369}
370
371impl JsonbGetTextIdx {
372 #[must_use]
374 pub fn new() -> Self {
375 Self {
376 signature: Signature::new(
377 TypeSignature::Exact(vec![DataType::LargeBinary, DataType::Int32]),
378 Volatility::Immutable,
379 ),
380 }
381 }
382}
383
384impl Default for JsonbGetTextIdx {
385 fn default() -> Self {
386 Self::new()
387 }
388}
389
390impl PartialEq for JsonbGetTextIdx {
391 fn eq(&self, _other: &Self) -> bool {
392 true
393 }
394}
395
396impl Eq for JsonbGetTextIdx {}
397
398impl Hash for JsonbGetTextIdx {
399 fn hash<H: Hasher>(&self, state: &mut H) {
400 "jsonb_get_text_idx".hash(state);
401 }
402}
403
404impl ScalarUDFImpl for JsonbGetTextIdx {
405 fn as_any(&self) -> &dyn Any {
406 self
407 }
408
409 fn name(&self) -> &'static str {
410 "jsonb_get_text_idx"
411 }
412
413 fn signature(&self) -> &Signature {
414 &self.signature
415 }
416
417 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
418 Ok(DataType::Utf8)
419 }
420
421 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
422 let expanded = expand_args(&args.args)?;
423 let jsonb_arr = expanded[0]
424 .as_any()
425 .downcast_ref::<LargeBinaryArray>()
426 .ok_or_else(|| {
427 datafusion_common::DataFusionError::Internal(
428 "jsonb_get_text_idx: first arg must be LargeBinary".into(),
429 )
430 })?;
431 let idx_arr = expanded[1]
432 .as_any()
433 .downcast_ref::<arrow_array::Int32Array>()
434 .ok_or_else(|| {
435 datafusion_common::DataFusionError::Internal(
436 "jsonb_get_text_idx: second arg must be Int32".into(),
437 )
438 })?;
439
440 let mut builder = StringBuilder::with_capacity(jsonb_arr.len(), 256);
441 for i in 0..jsonb_arr.len() {
442 if jsonb_arr.is_null(i) || idx_arr.is_null(i) {
443 builder.append_null();
444 } else {
445 let jsonb = jsonb_arr.value(i);
446 let idx = idx_arr.value(i);
447 match usize::try_from(idx)
448 .ok()
449 .and_then(|u| json_types::jsonb_array_get(jsonb, u))
450 {
451 Some(val) => match json_types::jsonb_to_text(val) {
452 Some(text) => builder.append_value(&text),
453 None => builder.append_null(),
454 },
455 None => builder.append_null(),
456 }
457 }
458 }
459 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
460 }
461}
462
463#[derive(Debug)]
472pub struct JsonbGetPath {
473 signature: Signature,
474}
475
476impl JsonbGetPath {
477 #[must_use]
479 pub fn new() -> Self {
480 Self {
481 signature: Signature::new(
482 TypeSignature::Exact(vec![
483 DataType::LargeBinary,
484 DataType::List(Arc::new(arrow_schema::Field::new(
485 "item",
486 DataType::Utf8,
487 true,
488 ))),
489 ]),
490 Volatility::Immutable,
491 ),
492 }
493 }
494}
495
496impl Default for JsonbGetPath {
497 fn default() -> Self {
498 Self::new()
499 }
500}
501
502impl PartialEq for JsonbGetPath {
503 fn eq(&self, _other: &Self) -> bool {
504 true
505 }
506}
507
508impl Eq for JsonbGetPath {}
509
510impl Hash for JsonbGetPath {
511 fn hash<H: Hasher>(&self, state: &mut H) {
512 "jsonb_get_path".hash(state);
513 }
514}
515
516impl ScalarUDFImpl for JsonbGetPath {
517 fn as_any(&self) -> &dyn Any {
518 self
519 }
520
521 fn name(&self) -> &'static str {
522 "jsonb_get_path"
523 }
524
525 fn signature(&self) -> &Signature {
526 &self.signature
527 }
528
529 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
530 Ok(DataType::LargeBinary)
531 }
532
533 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
534 let expanded = expand_args(&args.args)?;
535 let jsonb_arr = expanded[0]
536 .as_any()
537 .downcast_ref::<LargeBinaryArray>()
538 .ok_or_else(|| {
539 datafusion_common::DataFusionError::Internal(
540 "jsonb_get_path: first arg must be LargeBinary".into(),
541 )
542 })?;
543 let path_arr = expanded[1]
544 .as_any()
545 .downcast_ref::<ListArray>()
546 .ok_or_else(|| {
547 datafusion_common::DataFusionError::Internal(
548 "jsonb_get_path: second arg must be List<Utf8>".into(),
549 )
550 })?;
551
552 let mut builder = LargeBinaryBuilder::with_capacity(jsonb_arr.len(), 256);
553 for i in 0..jsonb_arr.len() {
554 if jsonb_arr.is_null(i) || path_arr.is_null(i) {
555 builder.append_null();
556 } else {
557 let jsonb = jsonb_arr.value(i);
558 let path_list = path_arr.value(i);
559 let path_strings = path_list
560 .as_any()
561 .downcast_ref::<StringArray>()
562 .ok_or_else(|| {
563 datafusion_common::DataFusionError::Internal(
564 "jsonb_get_path: path elements must be Utf8".into(),
565 )
566 })?;
567 match walk_path(jsonb, path_strings) {
568 Some(val) => builder.append_value(val),
569 None => builder.append_null(),
570 }
571 }
572 }
573 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
574 }
575}
576
577fn walk_path<'a>(mut jsonb: &'a [u8], path: &StringArray) -> Option<&'a [u8]> {
579 for i in 0..path.len() {
580 if path.is_null(i) {
581 return None;
582 }
583 let key = path.value(i);
584 if let Some(next) = json_types::jsonb_get_field(jsonb, key) {
586 jsonb = next;
587 } else if let Ok(idx) = key.parse::<usize>() {
588 jsonb = json_types::jsonb_array_get(jsonb, idx)?;
590 } else {
591 return None;
592 }
593 }
594 Some(jsonb)
595}
596
597#[derive(Debug)]
606pub struct JsonbGetPathText {
607 signature: Signature,
608}
609
610impl JsonbGetPathText {
611 #[must_use]
613 pub fn new() -> Self {
614 Self {
615 signature: Signature::new(
616 TypeSignature::Exact(vec![
617 DataType::LargeBinary,
618 DataType::List(Arc::new(arrow_schema::Field::new(
619 "item",
620 DataType::Utf8,
621 true,
622 ))),
623 ]),
624 Volatility::Immutable,
625 ),
626 }
627 }
628}
629
630impl Default for JsonbGetPathText {
631 fn default() -> Self {
632 Self::new()
633 }
634}
635
636impl PartialEq for JsonbGetPathText {
637 fn eq(&self, _other: &Self) -> bool {
638 true
639 }
640}
641
642impl Eq for JsonbGetPathText {}
643
644impl Hash for JsonbGetPathText {
645 fn hash<H: Hasher>(&self, state: &mut H) {
646 "jsonb_get_path_text".hash(state);
647 }
648}
649
650impl ScalarUDFImpl for JsonbGetPathText {
651 fn as_any(&self) -> &dyn Any {
652 self
653 }
654
655 fn name(&self) -> &'static str {
656 "jsonb_get_path_text"
657 }
658
659 fn signature(&self) -> &Signature {
660 &self.signature
661 }
662
663 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
664 Ok(DataType::Utf8)
665 }
666
667 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
668 let expanded = expand_args(&args.args)?;
669 let jsonb_arr = expanded[0]
670 .as_any()
671 .downcast_ref::<LargeBinaryArray>()
672 .ok_or_else(|| {
673 datafusion_common::DataFusionError::Internal(
674 "jsonb_get_path_text: first arg must be LargeBinary".into(),
675 )
676 })?;
677 let path_arr = expanded[1]
678 .as_any()
679 .downcast_ref::<ListArray>()
680 .ok_or_else(|| {
681 datafusion_common::DataFusionError::Internal(
682 "jsonb_get_path_text: second arg must be List<Utf8>".into(),
683 )
684 })?;
685
686 let mut builder = StringBuilder::with_capacity(jsonb_arr.len(), 256);
687 for i in 0..jsonb_arr.len() {
688 if jsonb_arr.is_null(i) || path_arr.is_null(i) {
689 builder.append_null();
690 } else {
691 let jsonb = jsonb_arr.value(i);
692 let path_list = path_arr.value(i);
693 let path_strings = path_list
694 .as_any()
695 .downcast_ref::<StringArray>()
696 .ok_or_else(|| {
697 datafusion_common::DataFusionError::Internal(
698 "jsonb_get_path_text: path elements must be Utf8".into(),
699 )
700 })?;
701 match walk_path(jsonb, path_strings) {
702 Some(val) => match json_types::jsonb_to_text(val) {
703 Some(text) => builder.append_value(&text),
704 None => builder.append_null(),
705 },
706 None => builder.append_null(),
707 }
708 }
709 }
710 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
711 }
712}
713
714#[derive(Debug)]
723pub struct JsonbExists {
724 signature: Signature,
725}
726
727impl JsonbExists {
728 #[must_use]
730 pub fn new() -> Self {
731 Self {
732 signature: Signature::new(
733 TypeSignature::Exact(vec![DataType::LargeBinary, DataType::Utf8]),
734 Volatility::Immutable,
735 ),
736 }
737 }
738}
739
740impl Default for JsonbExists {
741 fn default() -> Self {
742 Self::new()
743 }
744}
745
746impl PartialEq for JsonbExists {
747 fn eq(&self, _other: &Self) -> bool {
748 true
749 }
750}
751
752impl Eq for JsonbExists {}
753
754impl Hash for JsonbExists {
755 fn hash<H: Hasher>(&self, state: &mut H) {
756 "jsonb_exists".hash(state);
757 }
758}
759
760impl ScalarUDFImpl for JsonbExists {
761 fn as_any(&self) -> &dyn Any {
762 self
763 }
764
765 fn name(&self) -> &'static str {
766 "jsonb_exists"
767 }
768
769 fn signature(&self) -> &Signature {
770 &self.signature
771 }
772
773 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
774 Ok(DataType::Boolean)
775 }
776
777 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
778 let expanded = expand_args(&args.args)?;
779 let jsonb_arr = expanded[0]
780 .as_any()
781 .downcast_ref::<LargeBinaryArray>()
782 .ok_or_else(|| {
783 datafusion_common::DataFusionError::Internal(
784 "jsonb_exists: first arg must be LargeBinary".into(),
785 )
786 })?;
787 let key_arr = expanded[1]
788 .as_any()
789 .downcast_ref::<StringArray>()
790 .ok_or_else(|| {
791 datafusion_common::DataFusionError::Internal(
792 "jsonb_exists: second arg must be Utf8".into(),
793 )
794 })?;
795
796 let result: BooleanArray = (0..jsonb_arr.len())
797 .map(|i| {
798 if jsonb_arr.is_null(i) || key_arr.is_null(i) {
799 None
800 } else {
801 Some(json_types::jsonb_has_key(
802 jsonb_arr.value(i),
803 key_arr.value(i),
804 ))
805 }
806 })
807 .collect();
808 Ok(ColumnarValue::Array(Arc::new(result)))
809 }
810}
811
812#[derive(Debug)]
821pub struct JsonbExistsAny {
822 signature: Signature,
823}
824
825impl JsonbExistsAny {
826 #[must_use]
828 pub fn new() -> Self {
829 Self {
830 signature: Signature::new(
831 TypeSignature::Exact(vec![
832 DataType::LargeBinary,
833 DataType::List(Arc::new(arrow_schema::Field::new(
834 "item",
835 DataType::Utf8,
836 true,
837 ))),
838 ]),
839 Volatility::Immutable,
840 ),
841 }
842 }
843}
844
845impl Default for JsonbExistsAny {
846 fn default() -> Self {
847 Self::new()
848 }
849}
850
851impl PartialEq for JsonbExistsAny {
852 fn eq(&self, _other: &Self) -> bool {
853 true
854 }
855}
856
857impl Eq for JsonbExistsAny {}
858
859impl Hash for JsonbExistsAny {
860 fn hash<H: Hasher>(&self, state: &mut H) {
861 "jsonb_exists_any".hash(state);
862 }
863}
864
865impl ScalarUDFImpl for JsonbExistsAny {
866 fn as_any(&self) -> &dyn Any {
867 self
868 }
869
870 fn name(&self) -> &'static str {
871 "jsonb_exists_any"
872 }
873
874 fn signature(&self) -> &Signature {
875 &self.signature
876 }
877
878 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
879 Ok(DataType::Boolean)
880 }
881
882 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
883 let expanded = expand_args(&args.args)?;
884 let jsonb_arr = expanded[0]
885 .as_any()
886 .downcast_ref::<LargeBinaryArray>()
887 .ok_or_else(|| {
888 datafusion_common::DataFusionError::Internal(
889 "jsonb_exists_any: first arg must be LargeBinary".into(),
890 )
891 })?;
892 let keys_arr = expanded[1]
893 .as_any()
894 .downcast_ref::<ListArray>()
895 .ok_or_else(|| {
896 datafusion_common::DataFusionError::Internal(
897 "jsonb_exists_any: second arg must be List<Utf8>".into(),
898 )
899 })?;
900
901 let result: BooleanArray =
902 (0..jsonb_arr.len())
903 .map(|i| {
904 if jsonb_arr.is_null(i) || keys_arr.is_null(i) {
905 return None;
906 }
907 let jsonb = jsonb_arr.value(i);
908 let keys_list = keys_arr.value(i);
909 let keys = keys_list.as_any().downcast_ref::<StringArray>()?;
910 Some((0..keys.len()).any(|k| {
911 !keys.is_null(k) && json_types::jsonb_has_key(jsonb, keys.value(k))
912 }))
913 })
914 .collect();
915 Ok(ColumnarValue::Array(Arc::new(result)))
916 }
917}
918
919#[derive(Debug)]
928pub struct JsonbExistsAll {
929 signature: Signature,
930}
931
932impl JsonbExistsAll {
933 #[must_use]
935 pub fn new() -> Self {
936 Self {
937 signature: Signature::new(
938 TypeSignature::Exact(vec![
939 DataType::LargeBinary,
940 DataType::List(Arc::new(arrow_schema::Field::new(
941 "item",
942 DataType::Utf8,
943 true,
944 ))),
945 ]),
946 Volatility::Immutable,
947 ),
948 }
949 }
950}
951
952impl Default for JsonbExistsAll {
953 fn default() -> Self {
954 Self::new()
955 }
956}
957
958impl PartialEq for JsonbExistsAll {
959 fn eq(&self, _other: &Self) -> bool {
960 true
961 }
962}
963
964impl Eq for JsonbExistsAll {}
965
966impl Hash for JsonbExistsAll {
967 fn hash<H: Hasher>(&self, state: &mut H) {
968 "jsonb_exists_all".hash(state);
969 }
970}
971
972impl ScalarUDFImpl for JsonbExistsAll {
973 fn as_any(&self) -> &dyn Any {
974 self
975 }
976
977 fn name(&self) -> &'static str {
978 "jsonb_exists_all"
979 }
980
981 fn signature(&self) -> &Signature {
982 &self.signature
983 }
984
985 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
986 Ok(DataType::Boolean)
987 }
988
989 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
990 let expanded = expand_args(&args.args)?;
991 let jsonb_arr = expanded[0]
992 .as_any()
993 .downcast_ref::<LargeBinaryArray>()
994 .ok_or_else(|| {
995 datafusion_common::DataFusionError::Internal(
996 "jsonb_exists_all: first arg must be LargeBinary".into(),
997 )
998 })?;
999 let keys_arr = expanded[1]
1000 .as_any()
1001 .downcast_ref::<ListArray>()
1002 .ok_or_else(|| {
1003 datafusion_common::DataFusionError::Internal(
1004 "jsonb_exists_all: second arg must be List<Utf8>".into(),
1005 )
1006 })?;
1007
1008 let result: BooleanArray =
1009 (0..jsonb_arr.len())
1010 .map(|i| {
1011 if jsonb_arr.is_null(i) || keys_arr.is_null(i) {
1012 return None;
1013 }
1014 let jsonb = jsonb_arr.value(i);
1015 let keys_list = keys_arr.value(i);
1016 let keys = keys_list.as_any().downcast_ref::<StringArray>()?;
1017 Some((0..keys.len()).all(|k| {
1018 !keys.is_null(k) && json_types::jsonb_has_key(jsonb, keys.value(k))
1019 }))
1020 })
1021 .collect();
1022 Ok(ColumnarValue::Array(Arc::new(result)))
1023 }
1024}
1025
1026#[derive(Debug)]
1035pub struct JsonbContains {
1036 signature: Signature,
1037}
1038
1039impl JsonbContains {
1040 #[must_use]
1042 pub fn new() -> Self {
1043 Self {
1044 signature: Signature::new(
1045 TypeSignature::Exact(vec![DataType::LargeBinary, DataType::LargeBinary]),
1046 Volatility::Immutable,
1047 ),
1048 }
1049 }
1050}
1051
1052impl Default for JsonbContains {
1053 fn default() -> Self {
1054 Self::new()
1055 }
1056}
1057
1058impl PartialEq for JsonbContains {
1059 fn eq(&self, _other: &Self) -> bool {
1060 true
1061 }
1062}
1063
1064impl Eq for JsonbContains {}
1065
1066impl Hash for JsonbContains {
1067 fn hash<H: Hasher>(&self, state: &mut H) {
1068 "jsonb_contains".hash(state);
1069 }
1070}
1071
1072impl ScalarUDFImpl for JsonbContains {
1073 fn as_any(&self) -> &dyn Any {
1074 self
1075 }
1076
1077 fn name(&self) -> &'static str {
1078 "jsonb_contains"
1079 }
1080
1081 fn signature(&self) -> &Signature {
1082 &self.signature
1083 }
1084
1085 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
1086 Ok(DataType::Boolean)
1087 }
1088
1089 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
1090 let expanded = expand_args(&args.args)?;
1091 let left_arr = expanded[0]
1092 .as_any()
1093 .downcast_ref::<LargeBinaryArray>()
1094 .ok_or_else(|| {
1095 datafusion_common::DataFusionError::Internal(
1096 "jsonb_contains: first arg must be LargeBinary".into(),
1097 )
1098 })?;
1099 let right_arr = expanded[1]
1100 .as_any()
1101 .downcast_ref::<LargeBinaryArray>()
1102 .ok_or_else(|| {
1103 datafusion_common::DataFusionError::Internal(
1104 "jsonb_contains: second arg must be LargeBinary".into(),
1105 )
1106 })?;
1107
1108 let result: BooleanArray = (0..left_arr.len())
1109 .map(|i| {
1110 if left_arr.is_null(i) || right_arr.is_null(i) {
1111 None
1112 } else {
1113 json_types::jsonb_contains(left_arr.value(i), right_arr.value(i))
1114 }
1115 })
1116 .collect();
1117 Ok(ColumnarValue::Array(Arc::new(result)))
1118 }
1119}
1120
1121#[derive(Debug)]
1130pub struct JsonbContainedBy {
1131 signature: Signature,
1132}
1133
1134impl JsonbContainedBy {
1135 #[must_use]
1137 pub fn new() -> Self {
1138 Self {
1139 signature: Signature::new(
1140 TypeSignature::Exact(vec![DataType::LargeBinary, DataType::LargeBinary]),
1141 Volatility::Immutable,
1142 ),
1143 }
1144 }
1145}
1146
1147impl Default for JsonbContainedBy {
1148 fn default() -> Self {
1149 Self::new()
1150 }
1151}
1152
1153impl PartialEq for JsonbContainedBy {
1154 fn eq(&self, _other: &Self) -> bool {
1155 true
1156 }
1157}
1158
1159impl Eq for JsonbContainedBy {}
1160
1161impl Hash for JsonbContainedBy {
1162 fn hash<H: Hasher>(&self, state: &mut H) {
1163 "jsonb_contained_by".hash(state);
1164 }
1165}
1166
1167impl ScalarUDFImpl for JsonbContainedBy {
1168 fn as_any(&self) -> &dyn Any {
1169 self
1170 }
1171
1172 fn name(&self) -> &'static str {
1173 "jsonb_contained_by"
1174 }
1175
1176 fn signature(&self) -> &Signature {
1177 &self.signature
1178 }
1179
1180 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
1181 Ok(DataType::Boolean)
1182 }
1183
1184 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
1185 let expanded = expand_args(&args.args)?;
1186 let left_arr = expanded[0]
1187 .as_any()
1188 .downcast_ref::<LargeBinaryArray>()
1189 .ok_or_else(|| {
1190 datafusion_common::DataFusionError::Internal(
1191 "jsonb_contained_by: first arg must be LargeBinary".into(),
1192 )
1193 })?;
1194 let right_arr = expanded[1]
1195 .as_any()
1196 .downcast_ref::<LargeBinaryArray>()
1197 .ok_or_else(|| {
1198 datafusion_common::DataFusionError::Internal(
1199 "jsonb_contained_by: second arg must be LargeBinary".into(),
1200 )
1201 })?;
1202
1203 let result: BooleanArray = (0..left_arr.len())
1205 .map(|i| {
1206 if left_arr.is_null(i) || right_arr.is_null(i) {
1207 None
1208 } else {
1209 json_types::jsonb_contains(right_arr.value(i), left_arr.value(i))
1210 }
1211 })
1212 .collect();
1213 Ok(ColumnarValue::Array(Arc::new(result)))
1214 }
1215}
1216
1217#[derive(Debug)]
1228pub struct JsonTypeof {
1229 signature: Signature,
1230}
1231
1232impl JsonTypeof {
1233 #[must_use]
1235 pub fn new() -> Self {
1236 Self {
1237 signature: Signature::new(
1238 TypeSignature::Exact(vec![DataType::LargeBinary]),
1239 Volatility::Immutable,
1240 ),
1241 }
1242 }
1243}
1244
1245impl Default for JsonTypeof {
1246 fn default() -> Self {
1247 Self::new()
1248 }
1249}
1250
1251impl PartialEq for JsonTypeof {
1252 fn eq(&self, _other: &Self) -> bool {
1253 true
1254 }
1255}
1256
1257impl Eq for JsonTypeof {}
1258
1259impl Hash for JsonTypeof {
1260 fn hash<H: Hasher>(&self, state: &mut H) {
1261 "json_typeof".hash(state);
1262 }
1263}
1264
1265impl ScalarUDFImpl for JsonTypeof {
1266 fn as_any(&self) -> &dyn Any {
1267 self
1268 }
1269
1270 fn name(&self) -> &'static str {
1271 "json_typeof"
1272 }
1273
1274 fn signature(&self) -> &Signature {
1275 &self.signature
1276 }
1277
1278 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
1279 Ok(DataType::Utf8)
1280 }
1281
1282 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
1283 let expanded = expand_args(&args.args)?;
1284 let jsonb_arr = expanded[0]
1285 .as_any()
1286 .downcast_ref::<LargeBinaryArray>()
1287 .ok_or_else(|| {
1288 datafusion_common::DataFusionError::Internal(
1289 "json_typeof: arg must be LargeBinary".into(),
1290 )
1291 })?;
1292
1293 let mut builder = StringBuilder::with_capacity(jsonb_arr.len(), jsonb_arr.len() * 8);
1294 for i in 0..jsonb_arr.len() {
1295 if jsonb_arr.is_null(i) {
1296 builder.append_null();
1297 } else {
1298 match json_types::jsonb_type_name(jsonb_arr.value(i)) {
1299 Some(name) => builder.append_value(name),
1300 None => builder.append_null(),
1301 }
1302 }
1303 }
1304 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
1305 }
1306}
1307
1308#[derive(Debug)]
1317pub struct JsonBuildObject {
1318 signature: Signature,
1319}
1320
1321impl JsonBuildObject {
1322 #[must_use]
1324 pub fn new() -> Self {
1325 Self {
1326 signature: Signature::new(TypeSignature::VariadicAny, Volatility::Immutable),
1327 }
1328 }
1329}
1330
1331impl Default for JsonBuildObject {
1332 fn default() -> Self {
1333 Self::new()
1334 }
1335}
1336
1337impl PartialEq for JsonBuildObject {
1338 fn eq(&self, _other: &Self) -> bool {
1339 true
1340 }
1341}
1342
1343impl Eq for JsonBuildObject {}
1344
1345impl Hash for JsonBuildObject {
1346 fn hash<H: Hasher>(&self, state: &mut H) {
1347 "json_build_object".hash(state);
1348 }
1349}
1350
1351impl ScalarUDFImpl for JsonBuildObject {
1352 fn as_any(&self) -> &dyn Any {
1353 self
1354 }
1355
1356 fn name(&self) -> &'static str {
1357 "json_build_object"
1358 }
1359
1360 fn signature(&self) -> &Signature {
1361 &self.signature
1362 }
1363
1364 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
1365 Ok(DataType::LargeBinary)
1366 }
1367
1368 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
1369 if !args.args.len().is_multiple_of(2) {
1370 return Err(datafusion_common::DataFusionError::Execution(
1371 "json_build_object requires an even number of arguments".into(),
1372 ));
1373 }
1374
1375 let expanded = expand_args(&args.args)?;
1376 let len = expanded.first().map_or(1, Array::len);
1377
1378 let mut builder = LargeBinaryBuilder::with_capacity(len, 256);
1379 for row in 0..len {
1380 let mut obj = serde_json::Map::new();
1381 let mut is_null = false;
1382 for pair in expanded.chunks(2) {
1383 let key_arr = &pair[0];
1384 let val_arr = &pair[1];
1385 if key_arr.is_null(row) {
1386 is_null = true;
1387 break;
1388 }
1389 let key = scalar_to_json_key(key_arr, row)?;
1390 let val = scalar_to_json_value(val_arr, row);
1391 obj.insert(key, val);
1392 }
1393 if is_null {
1394 builder.append_null();
1395 } else {
1396 let jsonb = json_types::encode_jsonb(&serde_json::Value::Object(obj));
1397 builder.append_value(&jsonb);
1398 }
1399 }
1400 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
1401 }
1402}
1403
1404#[derive(Debug)]
1413pub struct JsonBuildArray {
1414 signature: Signature,
1415}
1416
1417impl JsonBuildArray {
1418 #[must_use]
1420 pub fn new() -> Self {
1421 Self {
1422 signature: Signature::new(TypeSignature::VariadicAny, Volatility::Immutable),
1423 }
1424 }
1425}
1426
1427impl Default for JsonBuildArray {
1428 fn default() -> Self {
1429 Self::new()
1430 }
1431}
1432
1433impl PartialEq for JsonBuildArray {
1434 fn eq(&self, _other: &Self) -> bool {
1435 true
1436 }
1437}
1438
1439impl Eq for JsonBuildArray {}
1440
1441impl Hash for JsonBuildArray {
1442 fn hash<H: Hasher>(&self, state: &mut H) {
1443 "json_build_array".hash(state);
1444 }
1445}
1446
1447impl ScalarUDFImpl for JsonBuildArray {
1448 fn as_any(&self) -> &dyn Any {
1449 self
1450 }
1451
1452 fn name(&self) -> &'static str {
1453 "json_build_array"
1454 }
1455
1456 fn signature(&self) -> &Signature {
1457 &self.signature
1458 }
1459
1460 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
1461 Ok(DataType::LargeBinary)
1462 }
1463
1464 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
1465 let expanded = expand_args(&args.args)?;
1466 let len = expanded.first().map_or(1, Array::len);
1467
1468 let mut builder = LargeBinaryBuilder::with_capacity(len, 256);
1469 for row in 0..len {
1470 let mut arr = Vec::with_capacity(expanded.len());
1471 for col in &expanded {
1472 arr.push(scalar_to_json_value(col, row));
1473 }
1474 let jsonb = json_types::encode_jsonb(&serde_json::Value::Array(arr));
1475 builder.append_value(&jsonb);
1476 }
1477 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
1478 }
1479}
1480
1481#[derive(Debug)]
1489pub struct ToJsonb {
1490 signature: Signature,
1491}
1492
1493impl ToJsonb {
1494 #[must_use]
1496 pub fn new() -> Self {
1497 Self {
1498 signature: Signature::new(TypeSignature::Any(1), Volatility::Immutable),
1499 }
1500 }
1501}
1502
1503impl Default for ToJsonb {
1504 fn default() -> Self {
1505 Self::new()
1506 }
1507}
1508
1509impl PartialEq for ToJsonb {
1510 fn eq(&self, _other: &Self) -> bool {
1511 true
1512 }
1513}
1514
1515impl Eq for ToJsonb {}
1516
1517impl Hash for ToJsonb {
1518 fn hash<H: Hasher>(&self, state: &mut H) {
1519 "to_jsonb".hash(state);
1520 }
1521}
1522
1523impl ScalarUDFImpl for ToJsonb {
1524 fn as_any(&self) -> &dyn Any {
1525 self
1526 }
1527
1528 fn name(&self) -> &'static str {
1529 "to_jsonb"
1530 }
1531
1532 fn signature(&self) -> &Signature {
1533 &self.signature
1534 }
1535
1536 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType> {
1537 Ok(DataType::LargeBinary)
1538 }
1539
1540 fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue> {
1541 let expanded = expand_args(&args.args)?;
1542 let arr = &expanded[0];
1543 let len = arr.len();
1544
1545 let mut builder = LargeBinaryBuilder::with_capacity(len, 64);
1546 for row in 0..len {
1547 if arr.is_null(row) {
1548 let jsonb = json_types::encode_jsonb(&serde_json::Value::Null);
1549 builder.append_value(&jsonb);
1550 } else {
1551 let val = scalar_to_json_value(arr, row);
1552 let jsonb = json_types::encode_jsonb(&val);
1553 builder.append_value(&jsonb);
1554 }
1555 }
1556 Ok(ColumnarValue::Array(Arc::new(builder.finish())))
1557 }
1558}
1559
1560fn scalar_to_json_key(arr: &ArrayRef, row: usize) -> Result<String> {
1564 if let Some(s) = arr.as_any().downcast_ref::<StringArray>() {
1565 return Ok(s.value(row).to_owned());
1566 }
1567 Err(datafusion_common::DataFusionError::Execution(
1569 "json_build_object keys must be text".into(),
1570 ))
1571}
1572
1573fn scalar_to_json_value(arr: &ArrayRef, row: usize) -> serde_json::Value {
1575 if arr.is_null(row) {
1576 return serde_json::Value::Null;
1577 }
1578
1579 if let Some(a) = arr.as_any().downcast_ref::<StringArray>() {
1581 return serde_json::Value::String(a.value(row).to_owned());
1582 }
1583 if let Some(a) = arr.as_any().downcast_ref::<arrow_array::Int64Array>() {
1584 return serde_json::Value::Number(a.value(row).into());
1585 }
1586 if let Some(a) = arr.as_any().downcast_ref::<arrow_array::Int32Array>() {
1587 return serde_json::Value::Number(i64::from(a.value(row)).into());
1588 }
1589 if let Some(a) = arr.as_any().downcast_ref::<arrow_array::Float64Array>() {
1590 if let Some(n) = serde_json::Number::from_f64(a.value(row)) {
1591 return serde_json::Value::Number(n);
1592 }
1593 return serde_json::Value::Null;
1594 }
1595 if let Some(a) = arr.as_any().downcast_ref::<BooleanArray>() {
1596 return serde_json::Value::Bool(a.value(row));
1597 }
1598 if let Some(a) = arr.as_any().downcast_ref::<LargeBinaryArray>() {
1599 let bytes = a.value(row);
1601 if let Some(text) = json_types::jsonb_to_text(bytes) {
1602 if let Ok(val) = serde_json::from_str::<serde_json::Value>(&text) {
1604 return val;
1605 }
1606 return serde_json::Value::String(text);
1607 }
1608 return serde_json::Value::Null;
1609 }
1610
1611 let scalar = datafusion_common::ScalarValue::try_from_array(arr, row).ok();
1613 match scalar {
1614 Some(s) => serde_json::Value::String(s.to_string()),
1615 None => serde_json::Value::Null,
1616 }
1617}
1618
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_schema::Field;
    use datafusion_common::config::ConfigOptions;
    use datafusion_expr::ScalarUDF;

    /// Encodes a JSON value with `json_types::encode_jsonb`.
    fn enc(v: &serde_json::Value) -> Vec<u8> {
        json_types::encode_jsonb(v)
    }

    /// Builds a `LargeBinaryArray` holding one encoded value per input.
    fn make_jsonb_array(vals: &[serde_json::Value]) -> LargeBinaryArray {
        LargeBinaryArray::from_iter_values(vals.iter().map(enc))
    }

    /// Builds a non-null `StringArray` from string slices.
    fn make_string_array(vals: &[&str]) -> StringArray {
        StringArray::from(vals.to_vec())
    }

    /// Wraps `arrays` as array-valued UDF arguments.
    ///
    /// `number_rows` is taken from the first array (0 when there is none) so
    /// the fixture agrees with the data actually passed in.
    fn make_args(arrays: Vec<ArrayRef>, return_type: DataType) -> ScalarFunctionArgs {
        let number_rows = arrays.first().map_or(0, Array::len);
        ScalarFunctionArgs {
            args: arrays.into_iter().map(ColumnarValue::Array).collect(),
            arg_fields: vec![],
            number_rows,
            return_field: Arc::new(Field::new("output", return_type, true)),
            config_options: Arc::new(ConfigOptions::default()),
        }
    }

    /// Invokes `udf` on `arrays` and returns the resulting array.
    ///
    /// The declared return field is whatever `udf.return_type` reports for the
    /// argument types. Panics if the UDF errors or returns a scalar.
    fn invoke(udf: &dyn ScalarUDFImpl, arrays: Vec<ArrayRef>) -> ArrayRef {
        let arg_types: Vec<DataType> = arrays.iter().map(|a| a.data_type().clone()).collect();
        let return_type = udf.return_type(&arg_types).unwrap();
        let result = udf
            .invoke_with_args(make_args(arrays, return_type))
            .unwrap();
        let ColumnarValue::Array(out) = result else {
            panic!("expected array")
        };
        out
    }

    /// Views a UDF result as `LargeBinaryArray`, panicking on any other type.
    fn as_binary(arr: &ArrayRef) -> &LargeBinaryArray {
        arr.as_any().downcast_ref::<LargeBinaryArray>().unwrap()
    }

    /// Views a UDF result as `StringArray`, panicking on any other type.
    fn as_strings(arr: &ArrayRef) -> &StringArray {
        arr.as_any().downcast_ref::<StringArray>().unwrap()
    }

    /// Views a UDF result as `BooleanArray`, panicking on any other type.
    fn as_bools(arr: &ArrayRef) -> &BooleanArray {
        arr.as_any().downcast_ref::<BooleanArray>().unwrap()
    }

    // ---- jsonb_get ----

    #[test]
    fn test_jsonb_get_object_field() {
        let jsonb = make_jsonb_array(&[serde_json::json!({"name": "Alice", "age": 30})]);
        let keys = make_string_array(&["name"]);
        let out = invoke(&JsonbGet::new(), vec![Arc::new(jsonb), Arc::new(keys)]);

        let bin = as_binary(&out);
        assert!(!bin.is_null(0));
        assert_eq!(
            json_types::jsonb_to_text(bin.value(0)),
            Some("Alice".to_owned())
        );
    }

    #[test]
    fn test_jsonb_get_missing_key() {
        let jsonb = make_jsonb_array(&[serde_json::json!({"a": 1})]);
        let keys = make_string_array(&["missing"]);
        let out = invoke(&JsonbGet::new(), vec![Arc::new(jsonb), Arc::new(keys)]);

        assert!(as_binary(&out).is_null(0));
    }

    // ---- jsonb_get by index ----

    #[test]
    fn test_jsonb_get_idx() {
        let jsonb = make_jsonb_array(&[serde_json::json!([10, 20, 30])]);
        let idxs = arrow_array::Int32Array::from(vec![1]);
        let out = invoke(&JsonbGetIdx::new(), vec![Arc::new(jsonb), Arc::new(idxs)]);

        let bin = as_binary(&out);
        assert_eq!(
            json_types::jsonb_to_text(bin.value(0)),
            Some("20".to_owned())
        );
    }

    #[test]
    fn test_jsonb_get_idx_out_of_bounds() {
        let jsonb = make_jsonb_array(&[serde_json::json!([1, 2, 3])]);
        let idxs = arrow_array::Int32Array::from(vec![10]);
        let out = invoke(&JsonbGetIdx::new(), vec![Arc::new(jsonb), Arc::new(idxs)]);

        assert!(as_binary(&out).is_null(0));
    }

    // ---- jsonb_get as text ----

    #[test]
    fn test_jsonb_get_text_string() {
        let jsonb = make_jsonb_array(&[serde_json::json!({"name": "Alice"})]);
        let keys = make_string_array(&["name"]);
        let out = invoke(&JsonbGetText::new(), vec![Arc::new(jsonb), Arc::new(keys)]);

        assert_eq!(as_strings(&out).value(0), "Alice");
    }

    #[test]
    fn test_jsonb_get_text_number() {
        let jsonb = make_jsonb_array(&[serde_json::json!({"age": 30})]);
        let keys = make_string_array(&["age"]);
        let out = invoke(&JsonbGetText::new(), vec![Arc::new(jsonb), Arc::new(keys)]);

        assert_eq!(as_strings(&out).value(0), "30");
    }

    #[test]
    fn test_jsonb_get_text_idx() {
        let jsonb = make_jsonb_array(&[serde_json::json!([10, 20, 30])]);
        let idxs = arrow_array::Int32Array::from(vec![2]);
        let out = invoke(
            &JsonbGetTextIdx::new(),
            vec![Arc::new(jsonb), Arc::new(idxs)],
        );

        assert_eq!(as_strings(&out).value(0), "30");
    }

    // ---- key existence ----

    #[test]
    fn test_jsonb_exists_true() {
        let jsonb = make_jsonb_array(&[serde_json::json!({"name": "Alice", "age": 30})]);
        let keys = make_string_array(&["name"]);
        let out = invoke(&JsonbExists::new(), vec![Arc::new(jsonb), Arc::new(keys)]);

        assert!(as_bools(&out).value(0));
    }

    #[test]
    fn test_jsonb_exists_false() {
        let jsonb = make_jsonb_array(&[serde_json::json!({"name": "Alice"})]);
        let keys = make_string_array(&["missing"]);
        let out = invoke(&JsonbExists::new(), vec![Arc::new(jsonb), Arc::new(keys)]);

        assert!(!as_bools(&out).value(0));
    }

    // ---- containment ----

    #[test]
    fn test_jsonb_contains_true() {
        let left = make_jsonb_array(&[serde_json::json!({"a": 1, "b": 2, "c": 3})]);
        let right = make_jsonb_array(&[serde_json::json!({"a": 1, "c": 3})]);
        let out = invoke(&JsonbContains::new(), vec![Arc::new(left), Arc::new(right)]);

        assert!(as_bools(&out).value(0));
    }

    #[test]
    fn test_jsonb_contains_false() {
        let left = make_jsonb_array(&[serde_json::json!({"a": 1})]);
        let right = make_jsonb_array(&[serde_json::json!({"a": 1, "b": 2})]);
        let out = invoke(&JsonbContains::new(), vec![Arc::new(left), Arc::new(right)]);

        assert!(!as_bools(&out).value(0));
    }

    #[test]
    fn test_jsonb_contained_by() {
        let left = make_jsonb_array(&[serde_json::json!({"a": 1})]);
        let right = make_jsonb_array(&[serde_json::json!({"a": 1, "b": 2})]);
        let out = invoke(
            &JsonbContainedBy::new(),
            vec![Arc::new(left), Arc::new(right)],
        );

        assert!(as_bools(&out).value(0));
    }

    // ---- json_typeof ----

    #[test]
    fn test_json_typeof_all_types() {
        let jsonb = make_jsonb_array(&[
            serde_json::json!({"a": 1}),
            serde_json::json!([1, 2]),
            serde_json::json!("hello"),
            serde_json::json!(42),
            serde_json::json!(true),
            serde_json::json!(null),
        ]);
        let out = invoke(&JsonTypeof::new(), vec![Arc::new(jsonb)]);

        let type_names = as_strings(&out);
        let expected = ["object", "array", "string", "number", "boolean", "null"];
        for (row, want) in expected.iter().enumerate() {
            assert_eq!(type_names.value(row), *want, "row {row}");
        }
    }

    // ---- json_build_object ----

    #[test]
    fn test_json_build_object() {
        let keys = make_string_array(&["name"]);
        let vals = make_string_array(&["Alice"]);
        let out = invoke(&JsonBuildObject::new(), vec![Arc::new(keys), Arc::new(vals)]);

        let bin = as_binary(&out);
        let name = json_types::jsonb_get_field(bin.value(0), "name").unwrap();
        assert_eq!(json_types::jsonb_to_text(name), Some("Alice".to_owned()));
    }

    #[test]
    fn test_json_build_object_odd_args() {
        let udf = JsonBuildObject::new();
        let lone_key = Arc::new(make_string_array(&["key"])) as ArrayRef;
        let args = make_args(vec![lone_key], DataType::LargeBinary);

        assert!(udf.invoke_with_args(args).is_err());
    }

    // ---- json_build_array ----

    #[test]
    fn test_json_build_array() {
        let a = arrow_array::Int64Array::from(vec![1]);
        let b = make_string_array(&["two"]);
        let out = invoke(&JsonBuildArray::new(), vec![Arc::new(a), Arc::new(b)]);

        let bin = as_binary(&out);
        let val = bin.value(0);
        assert_eq!(json_types::jsonb_type_name(val), Some("array"));
        let elem0 = json_types::jsonb_array_get(val, 0).unwrap();
        assert_eq!(json_types::jsonb_to_text(elem0), Some("1".to_owned()));
        let elem1 = json_types::jsonb_array_get(val, 1).unwrap();
        assert_eq!(json_types::jsonb_to_text(elem1), Some("two".to_owned()));
    }

    // ---- to_jsonb ----

    #[test]
    fn test_to_jsonb_int() {
        let a = arrow_array::Int64Array::from(vec![42]);
        let out = invoke(&ToJsonb::new(), vec![Arc::new(a)]);

        let bin = as_binary(&out);
        assert_eq!(
            json_types::jsonb_to_text(bin.value(0)),
            Some("42".to_owned())
        );
    }

    #[test]
    fn test_to_jsonb_string() {
        let a = make_string_array(&["hello"]);
        let out = invoke(&ToJsonb::new(), vec![Arc::new(a)]);

        let bin = as_binary(&out);
        assert_eq!(
            json_types::jsonb_to_text(bin.value(0)),
            Some("hello".to_owned())
        );
    }

    // ---- registration ----

    #[test]
    fn test_all_udfs_register() {
        let names = [
            ScalarUDF::new_from_impl(JsonbGet::new()).name().to_owned(),
            ScalarUDF::new_from_impl(JsonbGetIdx::new())
                .name()
                .to_owned(),
            ScalarUDF::new_from_impl(JsonbGetText::new())
                .name()
                .to_owned(),
            ScalarUDF::new_from_impl(JsonbGetTextIdx::new())
                .name()
                .to_owned(),
            ScalarUDF::new_from_impl(JsonbExists::new())
                .name()
                .to_owned(),
            ScalarUDF::new_from_impl(JsonbContains::new())
                .name()
                .to_owned(),
            ScalarUDF::new_from_impl(JsonbContainedBy::new())
                .name()
                .to_owned(),
            ScalarUDF::new_from_impl(JsonTypeof::new())
                .name()
                .to_owned(),
            ScalarUDF::new_from_impl(JsonBuildObject::new())
                .name()
                .to_owned(),
            ScalarUDF::new_from_impl(JsonBuildArray::new())
                .name()
                .to_owned(),
            ScalarUDF::new_from_impl(ToJsonb::new()).name().to_owned(),
        ];
        for name in &names {
            assert!(!name.is_empty(), "UDF has empty name");
        }
        assert_eq!(names.len(), 11);
    }

    // ---- direct json_types access ----

    #[test]
    fn test_nested_extraction() {
        let data = serde_json::json!({
            "user": {"address": {"city": "London"}}
        });
        let jsonb_bytes = enc(&data);

        let user = json_types::jsonb_get_field(&jsonb_bytes, "user").unwrap();
        let addr = json_types::jsonb_get_field(user, "address").unwrap();
        let city = json_types::jsonb_get_field(addr, "city").unwrap();
        assert_eq!(json_types::jsonb_to_text(city), Some("London".to_owned()));
    }

    // ---- multi-row batches ----

    #[test]
    fn test_jsonb_get_multiple_rows() {
        let jsonb = make_jsonb_array(&[
            serde_json::json!({"name": "Alice"}),
            serde_json::json!({"name": "Bob"}),
            serde_json::json!({"age": 30}),
        ]);
        let keys = make_string_array(&["name", "name", "name"]);
        let out = invoke(&JsonbGet::new(), vec![Arc::new(jsonb), Arc::new(keys)]);

        let bin = as_binary(&out);
        assert!(!bin.is_null(0));
        assert!(!bin.is_null(1));
        // The third object has no "name" key.
        assert!(bin.is_null(2));
    }
}