1use arrow_array::builder::{
5 BooleanBuilder, Float64Builder, Int64Builder, LargeBinaryBuilder, StringBuilder,
6};
7use arrow_array::{Array, ArrayRef, LargeBinaryArray, StringArray};
8use arrow_schema::DataType;
9use datafusion::error::{DataFusionError, Result};
10use datafusion::logical_expr::{ScalarUDF, Volatility};
11use datafusion::physical_plan::ColumnarValue;
12use datafusion::prelude::create_udf;
13use std::sync::Arc;
14
15mod common {
17 use super::*;
18
19 #[derive(Debug, Clone)]
21 pub enum KeyType {
22 Field(String),
23 Index(usize),
24 }
25
26 impl KeyType {
27 pub fn parse(key: &str) -> Self {
29 if let Ok(index) = key.parse::<usize>() {
30 Self::Index(index)
31 } else {
32 Self::Field(key.to_string())
33 }
34 }
35 }
36
37 pub fn columnar_to_arrays(args: &[ColumnarValue]) -> Vec<ArrayRef> {
43 args.iter()
44 .map(|arg| match arg {
45 ColumnarValue::Array(arr) => arr.clone(),
46 ColumnarValue::Scalar(scalar) => scalar.to_array().unwrap(),
47 })
48 .collect()
49 }
50
51 pub fn execution_error(msg: impl Into<String>) -> DataFusionError {
53 DataFusionError::Execution(msg.into())
54 }
55
56 pub fn validate_arg_count(
58 args: &[ArrayRef],
59 expected: usize,
60 function_name: &str,
61 ) -> Result<()> {
62 if args.len() != expected {
63 return Err(execution_error(format!(
64 "{} requires exactly {} arguments",
65 function_name, expected
66 )));
67 }
68 Ok(())
69 }
70
71 pub fn extract_jsonb_array(args: &[ArrayRef]) -> Result<&LargeBinaryArray> {
73 args[0]
74 .as_any()
75 .downcast_ref::<LargeBinaryArray>()
76 .ok_or_else(|| execution_error("First argument must be LargeBinary"))
77 }
78
79 pub fn extract_string_array(args: &[ArrayRef], arg_index: usize) -> Result<&StringArray> {
81 args[arg_index]
82 .as_any()
83 .downcast_ref::<StringArray>()
84 .ok_or_else(|| execution_error(format!("Argument {} must be String", arg_index + 1)))
85 }
86
87 pub fn get_string_value_at(string_array: &StringArray, index: usize) -> Option<&str> {
91 let actual_index = if string_array.len() == 1 { 0 } else { index };
93
94 if string_array.is_null(actual_index) {
95 None
96 } else {
97 Some(string_array.value(actual_index))
98 }
99 }
100
101 pub fn get_json_value_by_key(
103 raw_jsonb: &jsonb::RawJsonb,
104 key_type: &KeyType,
105 ) -> Result<Option<jsonb::OwnedJsonb>> {
106 match key_type {
107 KeyType::Field(field) => raw_jsonb
108 .get_by_name(field, false)
109 .map_err(|e| execution_error(format!("Failed to get field '{}': {}", field, e))),
110 KeyType::Index(index) => raw_jsonb.get_by_index(*index).map_err(|e| {
111 execution_error(format!("Failed to get array element [{}]: {}", index, e))
112 }),
113 }
114 }
115
116 pub fn parse_json_path(path: &str) -> Result<jsonb::jsonpath::JsonPath<'_>> {
118 jsonb::jsonpath::parse_json_path(path.as_bytes())
119 .map_err(|e| execution_error(format!("Invalid JSONPath '{}': {}", path, e)))
120 }
121}
122
123fn json_value_to_string(value: jsonb::OwnedJsonb) -> Result<Option<String>> {
125 let raw_jsonb = value.as_raw();
126
127 if raw_jsonb
129 .is_null()
130 .map_err(|e| common::execution_error(format!("Failed to check null: {}", e)))?
131 {
132 return Ok(None);
133 }
134
135 raw_jsonb
137 .to_str()
138 .map(Some)
139 .map_err(|e| common::execution_error(format!("Failed to convert to string: {}", e)))
140}
141
142fn json_value_to_int(value: jsonb::OwnedJsonb) -> Result<Option<i64>> {
144 let raw_jsonb = value.as_raw();
145
146 if raw_jsonb
148 .is_null()
149 .map_err(|e| common::execution_error(format!("Failed to check null: {}", e)))?
150 {
151 return Ok(None);
152 }
153
154 raw_jsonb
156 .to_i64()
157 .map(Some)
158 .map_err(|e| common::execution_error(format!("Failed to convert to integer: {}", e)))
159}
160
161fn json_value_to_float(value: jsonb::OwnedJsonb) -> Result<Option<f64>> {
163 let raw_jsonb = value.as_raw();
164
165 if raw_jsonb
167 .is_null()
168 .map_err(|e| common::execution_error(format!("Failed to check null: {}", e)))?
169 {
170 return Ok(None);
171 }
172
173 raw_jsonb
175 .to_f64()
176 .map(Some)
177 .map_err(|e| common::execution_error(format!("Failed to convert to float: {}", e)))
178}
179
180fn json_value_to_bool(value: jsonb::OwnedJsonb) -> Result<Option<bool>> {
182 let raw_jsonb = value.as_raw();
183
184 if raw_jsonb
186 .is_null()
187 .map_err(|e| common::execution_error(format!("Failed to check null: {}", e)))?
188 {
189 return Ok(None);
190 }
191
192 raw_jsonb
194 .to_bool()
195 .map(Some)
196 .map_err(|e| common::execution_error(format!("Failed to convert to boolean: {}", e)))
197}
198
199pub fn json_extract_udf() -> ScalarUDF {
208 create_udf(
209 "json_extract",
210 vec![DataType::LargeBinary, DataType::Utf8],
211 DataType::Utf8,
212 Volatility::Immutable,
213 Arc::new(json_extract_columnar_impl),
214 )
215}
216
217fn json_extract_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
219 let arrays = common::columnar_to_arrays(args);
220 let result = json_extract_impl(&arrays)?;
221 Ok(ColumnarValue::Array(result))
222}
223
224fn json_extract_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
226 common::validate_arg_count(args, 2, "json_extract")?;
227
228 let jsonb_array = common::extract_jsonb_array(args)?;
229 let path_array = common::extract_string_array(args, 1)?;
230 let mut builder = StringBuilder::with_capacity(jsonb_array.len(), 1024);
231
232 for i in 0..jsonb_array.len() {
233 if jsonb_array.is_null(i) {
234 builder.append_null();
235 } else if let Some(path) = common::get_string_value_at(path_array, i) {
236 let jsonb_bytes = jsonb_array.value(i);
237 match extract_json_path(jsonb_bytes, path)? {
238 Some(value) => builder.append_value(&value),
239 None => builder.append_null(),
240 }
241 } else {
242 builder.append_null();
243 }
244 }
245
246 Ok(Arc::new(builder.finish()))
247}
248
249fn extract_json_path(jsonb_bytes: &[u8], path: &str) -> Result<Option<String>> {
253 let json_path = common::parse_json_path(path)?;
254
255 let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
256 let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb);
257 match selector.select_values(&json_path) {
258 Ok(values) => {
259 if values.is_empty() {
260 Ok(None)
261 } else {
262 Ok(Some(values[0].to_string()))
264 }
265 }
266 Err(e) => Err(common::execution_error(format!(
267 "Failed to select values from path '{}': {}",
268 path, e
269 ))),
270 }
271}
272
273pub fn json_exists_udf() -> ScalarUDF {
282 create_udf(
283 "json_exists",
284 vec![DataType::LargeBinary, DataType::Utf8],
285 DataType::Boolean,
286 Volatility::Immutable,
287 Arc::new(json_exists_columnar_impl),
288 )
289}
290
291fn json_exists_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
293 let arrays = common::columnar_to_arrays(args);
294 let result = json_exists_impl(&arrays)?;
295 Ok(ColumnarValue::Array(result))
296}
297
298fn json_exists_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
300 common::validate_arg_count(args, 2, "json_exists")?;
301
302 let jsonb_array = common::extract_jsonb_array(args)?;
303 let path_array = common::extract_string_array(args, 1)?;
304
305 let mut builder = BooleanBuilder::with_capacity(jsonb_array.len());
306
307 for i in 0..jsonb_array.len() {
308 if jsonb_array.is_null(i) {
309 builder.append_null();
310 } else if let Some(path) = common::get_string_value_at(path_array, i) {
311 let jsonb_bytes = jsonb_array.value(i);
312 let exists = check_json_path_exists(jsonb_bytes, path)?;
313 builder.append_value(exists);
314 } else {
315 builder.append_null();
316 }
317 }
318
319 Ok(Arc::new(builder.finish()))
320}
321
322fn check_json_path_exists(jsonb_bytes: &[u8], path: &str) -> Result<bool> {
324 let json_path = common::parse_json_path(path)?;
325
326 let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
327 let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb);
328 match selector.exists(&json_path) {
329 Ok(exists) => Ok(exists),
330 Err(e) => Err(common::execution_error(format!(
331 "Failed to check existence of path '{}': {}",
332 path, e
333 ))),
334 }
335}
336
337pub fn json_get_udf() -> ScalarUDF {
346 create_udf(
347 "json_get",
348 vec![DataType::LargeBinary, DataType::Utf8],
349 DataType::LargeBinary,
350 Volatility::Immutable,
351 Arc::new(json_get_columnar_impl),
352 )
353}
354
355fn json_get_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
357 let arrays = common::columnar_to_arrays(args);
358 let result = json_get_impl(&arrays)?;
359 Ok(ColumnarValue::Array(result))
360}
361
362fn json_get_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
364 common::validate_arg_count(args, 2, "json_get")?;
365
366 let jsonb_array = common::extract_jsonb_array(args)?;
367 let key_array = common::extract_string_array(args, 1)?;
368
369 let mut builder = LargeBinaryBuilder::with_capacity(jsonb_array.len(), 0);
370
371 for i in 0..jsonb_array.len() {
372 if jsonb_array.is_null(i) {
373 builder.append_null();
374 } else if let Some(key) = common::get_string_value_at(key_array, i) {
375 let jsonb_bytes = jsonb_array.value(i);
376 let key_type = common::KeyType::parse(key);
377 let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
378
379 match common::get_json_value_by_key(&raw_jsonb, &key_type)? {
380 Some(value) => builder.append_value(value.as_raw().as_ref()),
381 None => builder.append_null(),
382 }
383 } else {
384 builder.append_null();
385 }
386 }
387
388 Ok(Arc::new(builder.finish()))
389}
390
391pub fn json_get_string_udf() -> ScalarUDF {
400 create_udf(
401 "json_get_string",
402 vec![DataType::LargeBinary, DataType::Utf8],
403 DataType::Utf8,
404 Volatility::Immutable,
405 Arc::new(json_get_string_columnar_impl),
406 )
407}
408
409fn json_get_string_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
411 let arrays = common::columnar_to_arrays(args);
412 let result = json_get_string_impl(&arrays)?;
413 Ok(ColumnarValue::Array(result))
414}
415
416fn json_get_string_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
418 common::validate_arg_count(args, 2, "json_get_string")?;
419
420 let jsonb_array = common::extract_jsonb_array(args)?;
421 let key_array = common::extract_string_array(args, 1)?;
422
423 let mut builder = StringBuilder::with_capacity(jsonb_array.len(), 1024);
424
425 for i in 0..jsonb_array.len() {
426 if jsonb_array.is_null(i) {
427 builder.append_null();
428 } else if let Some(key) = common::get_string_value_at(key_array, i) {
429 let jsonb_bytes = jsonb_array.value(i);
430 let key_type = common::KeyType::parse(key);
431 let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
432
433 match common::get_json_value_by_key(&raw_jsonb, &key_type)? {
434 Some(value) => match json_value_to_string(value)? {
435 Some(string_val) => builder.append_value(&string_val),
436 None => builder.append_null(),
437 },
438 None => builder.append_null(),
439 }
440 } else {
441 builder.append_null();
442 }
443 }
444
445 Ok(Arc::new(builder.finish()))
446}
447
448pub fn json_get_int_udf() -> ScalarUDF {
457 create_udf(
458 "json_get_int",
459 vec![DataType::LargeBinary, DataType::Utf8],
460 DataType::Int64,
461 Volatility::Immutable,
462 Arc::new(json_get_int_columnar_impl),
463 )
464}
465
466fn json_get_int_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
468 let arrays = common::columnar_to_arrays(args);
469 let result = json_get_int_impl(&arrays)?;
470 Ok(ColumnarValue::Array(result))
471}
472
473fn json_get_int_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
475 common::validate_arg_count(args, 2, "json_get_int")?;
476
477 let jsonb_array = common::extract_jsonb_array(args)?;
478 let key_array = common::extract_string_array(args, 1)?;
479
480 let mut builder = Int64Builder::with_capacity(jsonb_array.len());
481
482 for i in 0..jsonb_array.len() {
483 if jsonb_array.is_null(i) {
484 builder.append_null();
485 } else if let Some(key) = common::get_string_value_at(key_array, i) {
486 let jsonb_bytes = jsonb_array.value(i);
487 let key_type = common::KeyType::parse(key);
488 let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
489
490 match common::get_json_value_by_key(&raw_jsonb, &key_type)? {
491 Some(value) => match json_value_to_int(value)? {
492 Some(int_val) => builder.append_value(int_val),
493 None => builder.append_null(),
494 },
495 None => builder.append_null(),
496 }
497 } else {
498 builder.append_null();
499 }
500 }
501
502 Ok(Arc::new(builder.finish()))
503}
504
505pub fn json_get_float_udf() -> ScalarUDF {
514 create_udf(
515 "json_get_float",
516 vec![DataType::LargeBinary, DataType::Utf8],
517 DataType::Float64,
518 Volatility::Immutable,
519 Arc::new(json_get_float_columnar_impl),
520 )
521}
522
523fn json_get_float_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
525 let arrays = common::columnar_to_arrays(args);
526 let result = json_get_float_impl(&arrays)?;
527 Ok(ColumnarValue::Array(result))
528}
529
530fn json_get_float_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
532 common::validate_arg_count(args, 2, "json_get_float")?;
533
534 let jsonb_array = common::extract_jsonb_array(args)?;
535 let key_array = common::extract_string_array(args, 1)?;
536
537 let mut builder = Float64Builder::with_capacity(jsonb_array.len());
538
539 for i in 0..jsonb_array.len() {
540 if jsonb_array.is_null(i) {
541 builder.append_null();
542 } else if let Some(key) = common::get_string_value_at(key_array, i) {
543 let jsonb_bytes = jsonb_array.value(i);
544 let key_type = common::KeyType::parse(key);
545 let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
546
547 match common::get_json_value_by_key(&raw_jsonb, &key_type)? {
548 Some(value) => match json_value_to_float(value)? {
549 Some(float_val) => builder.append_value(float_val),
550 None => builder.append_null(),
551 },
552 None => builder.append_null(),
553 }
554 } else {
555 builder.append_null();
556 }
557 }
558
559 Ok(Arc::new(builder.finish()))
560}
561
562pub fn json_get_bool_udf() -> ScalarUDF {
571 create_udf(
572 "json_get_bool",
573 vec![DataType::LargeBinary, DataType::Utf8],
574 DataType::Boolean,
575 Volatility::Immutable,
576 Arc::new(json_get_bool_columnar_impl),
577 )
578}
579
580fn json_get_bool_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
582 let arrays = common::columnar_to_arrays(args);
583 let result = json_get_bool_impl(&arrays)?;
584 Ok(ColumnarValue::Array(result))
585}
586
587fn json_get_bool_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
589 common::validate_arg_count(args, 2, "json_get_bool")?;
590
591 let jsonb_array = common::extract_jsonb_array(args)?;
592 let key_array = common::extract_string_array(args, 1)?;
593
594 let mut builder = BooleanBuilder::with_capacity(jsonb_array.len());
595
596 for i in 0..jsonb_array.len() {
597 if jsonb_array.is_null(i) {
598 builder.append_null();
599 } else if let Some(key) = common::get_string_value_at(key_array, i) {
600 let jsonb_bytes = jsonb_array.value(i);
601 let key_type = common::KeyType::parse(key);
602 let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
603
604 match common::get_json_value_by_key(&raw_jsonb, &key_type)? {
605 Some(value) => match json_value_to_bool(value)? {
606 Some(bool_val) => builder.append_value(bool_val),
607 None => builder.append_null(),
608 },
609 None => builder.append_null(),
610 }
611 } else {
612 builder.append_null();
613 }
614 }
615
616 Ok(Arc::new(builder.finish()))
617}
618
619pub fn json_array_contains_udf() -> ScalarUDF {
629 create_udf(
630 "json_array_contains",
631 vec![DataType::LargeBinary, DataType::Utf8, DataType::Utf8],
632 DataType::Boolean,
633 Volatility::Immutable,
634 Arc::new(json_array_contains_columnar_impl),
635 )
636}
637
638fn json_array_contains_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
640 let arrays = common::columnar_to_arrays(args);
641 let result = json_array_contains_impl(&arrays)?;
642 Ok(ColumnarValue::Array(result))
643}
644
645fn json_array_contains_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
647 common::validate_arg_count(args, 3, "json_array_contains")?;
648
649 let jsonb_array = common::extract_jsonb_array(args)?;
650 let path_array = common::extract_string_array(args, 1)?;
651 let value_array = common::extract_string_array(args, 2)?;
652
653 let mut builder = BooleanBuilder::with_capacity(jsonb_array.len());
654
655 for i in 0..jsonb_array.len() {
656 if jsonb_array.is_null(i) {
657 builder.append_null();
658 } else {
659 let path = common::get_string_value_at(path_array, i);
660 let value = common::get_string_value_at(value_array, i);
661
662 match (path, value) {
663 (Some(p), Some(v)) => {
664 let jsonb_bytes = jsonb_array.value(i);
665 let contains = check_array_contains(jsonb_bytes, p, v)?;
666 builder.append_value(contains);
667 }
668 _ => builder.append_null(),
669 }
670 }
671 }
672
673 Ok(Arc::new(builder.finish()))
674}
675
676fn check_array_contains(jsonb_bytes: &[u8], path: &str, value: &str) -> Result<bool> {
678 let json_path = common::parse_json_path(path)?;
679
680 let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
681 let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb);
682 match selector.select_values(&json_path) {
683 Ok(values) => {
684 for v in values {
685 let raw = v.as_raw();
687 let mut index = 0;
689 loop {
690 match raw.get_by_index(index) {
691 Ok(Some(elem)) => {
692 let elem_str = elem.to_string();
693 if elem_str == value || elem_str == format!("\"{}\"", value) {
695 return Ok(true);
696 }
697 index += 1;
698 }
699 Ok(None) => break, Err(_) => break, }
702 }
703 }
704 Ok(false)
705 }
706 Err(e) => Err(common::execution_error(format!(
707 "Failed to check array contains at path '{}': {}",
708 path, e
709 ))),
710 }
711}
712
713pub fn json_array_length_udf() -> ScalarUDF {
722 create_udf(
723 "json_array_length",
724 vec![DataType::LargeBinary, DataType::Utf8],
725 DataType::Int64,
726 Volatility::Immutable,
727 Arc::new(json_array_length_columnar_impl),
728 )
729}
730
731fn json_array_length_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
733 let arrays = common::columnar_to_arrays(args);
734 let result = json_array_length_impl(&arrays)?;
735 Ok(ColumnarValue::Array(result))
736}
737
738fn json_array_length_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
740 common::validate_arg_count(args, 2, "json_array_length")?;
741
742 let jsonb_array = common::extract_jsonb_array(args)?;
743 let path_array = common::extract_string_array(args, 1)?;
744
745 let mut builder = Int64Builder::with_capacity(jsonb_array.len());
746
747 for i in 0..jsonb_array.len() {
748 if jsonb_array.is_null(i) {
749 builder.append_null();
750 } else if let Some(path) = common::get_string_value_at(path_array, i) {
751 let jsonb_bytes = jsonb_array.value(i);
752 match get_array_length(jsonb_bytes, path)? {
753 Some(len) => builder.append_value(len),
754 None => builder.append_null(),
755 }
756 } else {
757 builder.append_null();
758 }
759 }
760
761 Ok(Arc::new(builder.finish()))
762}
763
764fn get_array_length(jsonb_bytes: &[u8], path: &str) -> Result<Option<i64>> {
766 let json_path = common::parse_json_path(path)?;
767
768 let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
769 let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb);
770 match selector.select_values(&json_path) {
771 Ok(values) => {
772 if values.is_empty() {
773 return Ok(None);
774 }
775 let first = &values[0];
776 let raw = first.as_raw();
777
778 let mut count = 0;
780 loop {
781 match raw.get_by_index(count) {
782 Ok(Some(_)) => count += 1,
783 Ok(None) => break, Err(_) => {
785 if count == 0 {
787 return Err(common::execution_error(format!(
788 "Path '{}' does not point to an array",
789 path
790 )));
791 }
792 break;
793 }
794 }
795 }
796 Ok(Some(count as i64))
797 }
798 Err(e) => Err(common::execution_error(format!(
799 "Failed to get array length at path '{}': {}",
800 path, e
801 ))),
802 }
803}
804
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::builder::LargeBinaryBuilder;
    use arrow_array::{BooleanArray, Int64Array};

    /// Encodes JSON text as JSONB bytes; panics on invalid JSON (test-only).
    fn create_test_jsonb(json_str: &str) -> Vec<u8> {
        jsonb::parse_value(json_str.as_bytes()).unwrap().to_vec()
    }

    /// Builds a `LargeBinary` column with one row per entry of `present`.
    ///
    /// A row holds the JSONB encoding of `json` where the entry is `true`,
    /// and NULL otherwise.
    fn jsonb_column(json: &str, present: &[bool]) -> ArrayRef {
        let bytes = create_test_jsonb(json);
        let mut builder = LargeBinaryBuilder::new();
        for &is_present in present {
            if is_present {
                builder.append_value(&bytes);
            } else {
                builder.append_null();
            }
        }
        Arc::new(builder.finish())
    }

    /// Builds a non-null `Utf8` column from `values`.
    fn utf8_column(values: &[&str]) -> ArrayRef {
        Arc::new(StringArray::from(values.to_vec()))
    }

    #[test]
    fn test_json_extract_udf() -> Result<()> {
        let jsonb = jsonb_column(
            r#"{"user": {"name": "Alice", "age": 30}}"#,
            &[true, true, false],
        );
        let paths = utf8_column(&["$.user.name", "$.user.age", "$.user.name"]);

        let result = json_extract_impl(&[jsonb, paths])?;
        let string_array = result.as_any().downcast_ref::<StringArray>().unwrap();

        assert_eq!(string_array.len(), 3);
        assert_eq!(string_array.value(0), "\"Alice\"");
        assert_eq!(string_array.value(1), "30");
        assert!(string_array.is_null(2));
        Ok(())
    }

    #[test]
    fn test_json_exists_udf() -> Result<()> {
        let jsonb = jsonb_column(
            r#"{"user": {"name": "Alice", "age": 30}, "tags": ["rust", "json"]}"#,
            &[true, true, true, false],
        );
        let paths = utf8_column(&["$.user.name", "$.user.email", "$.tags", "$.any"]);

        let result = json_exists_impl(&[jsonb, paths])?;
        let bool_array = result.as_any().downcast_ref::<BooleanArray>().unwrap();

        assert_eq!(bool_array.len(), 4);
        assert!(bool_array.value(0));
        assert!(!bool_array.value(1));
        assert!(bool_array.value(2));
        assert!(bool_array.is_null(3));
        Ok(())
    }

    #[test]
    fn test_json_get_string_udf() -> Result<()> {
        let jsonb = jsonb_column(
            r#"{"str": "hello", "num": 123, "bool": true, "null": null}"#,
            &[true, true, true, true],
        );
        let keys = utf8_column(&["str", "num", "bool", "null"]);

        let result = json_get_string_impl(&[jsonb, keys])?;
        let string_array = result.as_any().downcast_ref::<StringArray>().unwrap();

        assert_eq!(string_array.len(), 4);
        assert_eq!(string_array.value(0), "hello");
        assert_eq!(string_array.value(1), "123");
        assert_eq!(string_array.value(2), "true");
        assert!(string_array.is_null(3));
        Ok(())
    }

    #[test]
    fn test_json_get_int_udf() -> Result<()> {
        let jsonb = jsonb_column(
            r#"{"int": 42, "str_num": "99", "bool": true}"#,
            &[true, true, true],
        );
        let keys = utf8_column(&["int", "str_num", "bool"]);

        let result = json_get_int_impl(&[jsonb, keys])?;
        let int_array = result.as_any().downcast_ref::<Int64Array>().unwrap();

        assert_eq!(int_array.len(), 3);
        assert_eq!(int_array.value(0), 42);
        assert_eq!(int_array.value(1), 99);
        assert_eq!(int_array.value(2), 1);
        Ok(())
    }

    #[test]
    fn test_json_get_bool_udf() -> Result<()> {
        let jsonb = jsonb_column(
            r#"{"bool_true": true, "bool_false": false, "str_true": "true", "str_false": "false"}"#,
            &[true, true, true, true],
        );
        let keys = utf8_column(&["bool_true", "bool_false", "str_true", "str_false"]);

        let result = json_get_bool_impl(&[jsonb, keys])?;
        let bool_array = result.as_any().downcast_ref::<BooleanArray>().unwrap();

        assert_eq!(bool_array.len(), 4);
        assert!(bool_array.value(0));
        assert!(!bool_array.value(1));
        assert!(bool_array.value(2));
        assert!(!bool_array.value(3));
        Ok(())
    }

    #[test]
    fn test_json_array_contains_udf() -> Result<()> {
        let jsonb = jsonb_column(
            r#"{"tags": ["rust", "json", "database"], "nums": [1, 2, 3]}"#,
            &[true, true, true, false],
        );
        let paths = utf8_column(&["$.tags", "$.tags", "$.nums", "$.tags"]);
        let values = utf8_column(&["rust", "python", "2", "any"]);

        let result = json_array_contains_impl(&[jsonb, paths, values])?;
        let bool_array = result.as_any().downcast_ref::<BooleanArray>().unwrap();

        assert_eq!(bool_array.len(), 4);
        assert!(bool_array.value(0));
        assert!(!bool_array.value(1));
        assert!(bool_array.value(2));
        assert!(bool_array.is_null(3));
        Ok(())
    }

    #[test]
    fn test_json_array_length_udf() -> Result<()> {
        let jsonb = jsonb_column(
            r#"{"empty": [], "tags": ["a", "b", "c"], "nested": {"arr": [1, 2]}}"#,
            &[true, true, true, false],
        );
        let paths = utf8_column(&["$.empty", "$.tags", "$.nested.arr", "$.any"]);

        let result = json_array_length_impl(&[jsonb, paths])?;
        let int_array = result.as_any().downcast_ref::<Int64Array>().unwrap();

        assert_eq!(int_array.len(), 4);
        assert_eq!(int_array.value(0), 0);
        assert_eq!(int_array.value(1), 3);
        assert_eq!(int_array.value(2), 2);
        assert!(int_array.is_null(3));
        Ok(())
    }

    #[test]
    fn test_json_array_access() -> Result<()> {
        let jsonb = jsonb_column(r#"["first", "second", "third"]"#, &[true, true, true]);
        // "10" is out of range and must yield NULL rather than an error.
        let keys = utf8_column(&["0", "1", "10"]);

        let result = json_get_string_impl(&[jsonb, keys])?;
        let string_array = result.as_any().downcast_ref::<StringArray>().unwrap();

        assert_eq!(string_array.len(), 3);
        assert_eq!(string_array.value(0), "first");
        assert_eq!(string_array.value(1), "second");
        assert!(string_array.is_null(2));
        Ok(())
    }
}