1use arrow_array::builder::{
5 BooleanBuilder, Float64Builder, Int64Builder, LargeBinaryBuilder, StringBuilder,
6};
7use arrow_array::{Array, ArrayRef, LargeBinaryArray, StringArray};
8use arrow_schema::DataType;
9use datafusion::error::{DataFusionError, Result};
10use datafusion::logical_expr::{ScalarUDF, Volatility};
11use datafusion::physical_plan::ColumnarValue;
12use datafusion::prelude::create_udf;
13use std::sync::Arc;
14
/// Type tag emitted in the `type_tag` column of `json_extract_with_type`.
///
/// The explicit discriminants are the on-the-wire tag values, so existing variants
/// should keep their numbers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum JsonbType {
    /// JSON `null`; also used for rows where nothing was extracted.
    Null = 0,
    /// JSON `true` / `false`.
    Boolean = 1,
    /// A number accepted by `RawJsonb::to_i64`.
    Int64 = 2,
    /// Any other number.
    Float64 = 3,
    /// A JSON string (also the fallback when no other probe matches).
    String = 4,
    /// A JSON array.
    Array = 5,
    /// A JSON object.
    Object = 6,
}
27
28impl JsonbType {
29 pub fn from_u8(value: u8) -> Option<Self> {
31 match value {
32 0 => Some(Self::Null),
33 1 => Some(Self::Boolean),
34 2 => Some(Self::Int64),
35 3 => Some(Self::Float64),
36 4 => Some(Self::String),
37 5 => Some(Self::Array),
38 6 => Some(Self::Object),
39 _ => None,
40 }
41 }
42
43 pub fn as_u8(self) -> u8 {
45 self as u8
46 }
47}
48
49mod common {
51 use super::*;
52
53 #[derive(Debug, Clone)]
55 pub enum KeyType {
56 Field(String),
57 Index(usize),
58 }
59
60 impl KeyType {
61 pub fn parse(key: &str) -> Self {
63 if let Ok(index) = key.parse::<usize>() {
64 Self::Index(index)
65 } else {
66 Self::Field(key.to_string())
67 }
68 }
69 }
70
71 pub fn columnar_to_arrays(args: &[ColumnarValue]) -> Vec<ArrayRef> {
77 args.iter()
78 .map(|arg| match arg {
79 ColumnarValue::Array(arr) => arr.clone(),
80 ColumnarValue::Scalar(scalar) => scalar.to_array().unwrap(),
81 })
82 .collect()
83 }
84
85 pub fn execution_error(msg: impl Into<String>) -> DataFusionError {
87 DataFusionError::Execution(msg.into())
88 }
89
90 pub fn validate_arg_count(
92 args: &[ArrayRef],
93 expected: usize,
94 function_name: &str,
95 ) -> Result<()> {
96 if args.len() != expected {
97 return Err(execution_error(format!(
98 "{} requires exactly {} arguments",
99 function_name, expected
100 )));
101 }
102 Ok(())
103 }
104
105 pub fn extract_jsonb_array(args: &[ArrayRef]) -> Result<&LargeBinaryArray> {
107 args[0]
108 .as_any()
109 .downcast_ref::<LargeBinaryArray>()
110 .ok_or_else(|| execution_error("First argument must be LargeBinary"))
111 }
112
113 pub fn extract_string_array(args: &[ArrayRef], arg_index: usize) -> Result<&StringArray> {
115 args[arg_index]
116 .as_any()
117 .downcast_ref::<StringArray>()
118 .ok_or_else(|| execution_error(format!("Argument {} must be String", arg_index + 1)))
119 }
120
121 pub fn get_string_value_at(string_array: &StringArray, index: usize) -> Option<&str> {
125 let actual_index = if string_array.len() == 1 { 0 } else { index };
127
128 if string_array.is_null(actual_index) {
129 None
130 } else {
131 Some(string_array.value(actual_index))
132 }
133 }
134
135 pub fn get_json_value_by_key(
137 raw_jsonb: &jsonb::RawJsonb,
138 key_type: &KeyType,
139 ) -> Result<Option<jsonb::OwnedJsonb>> {
140 match key_type {
141 KeyType::Field(field) => raw_jsonb
142 .get_by_name(field, false)
143 .map_err(|e| execution_error(format!("Failed to get field '{}': {}", field, e))),
144 KeyType::Index(index) => raw_jsonb.get_by_index(*index).map_err(|e| {
145 execution_error(format!("Failed to get array element [{}]: {}", index, e))
146 }),
147 }
148 }
149
150 pub fn parse_json_path(path: &str) -> Result<jsonb::jsonpath::JsonPath<'_>> {
152 jsonb::jsonpath::parse_json_path(path.as_bytes())
153 .map_err(|e| execution_error(format!("Invalid JSONPath '{}': {}", path, e)))
154 }
155}
156
157fn json_value_to_string(value: jsonb::OwnedJsonb) -> Result<Option<String>> {
159 let raw_jsonb = value.as_raw();
160
161 if raw_jsonb
163 .is_null()
164 .map_err(|e| common::execution_error(format!("Failed to check null: {}", e)))?
165 {
166 return Ok(None);
167 }
168
169 raw_jsonb
171 .to_str()
172 .map(Some)
173 .map_err(|e| common::execution_error(format!("Failed to convert to string: {}", e)))
174}
175
176fn json_value_to_int(value: jsonb::OwnedJsonb) -> Result<Option<i64>> {
178 let raw_jsonb = value.as_raw();
179
180 if raw_jsonb
182 .is_null()
183 .map_err(|e| common::execution_error(format!("Failed to check null: {}", e)))?
184 {
185 return Ok(None);
186 }
187
188 raw_jsonb
190 .to_i64()
191 .map(Some)
192 .map_err(|e| common::execution_error(format!("Failed to convert to integer: {}", e)))
193}
194
195fn json_value_to_float(value: jsonb::OwnedJsonb) -> Result<Option<f64>> {
197 let raw_jsonb = value.as_raw();
198
199 if raw_jsonb
201 .is_null()
202 .map_err(|e| common::execution_error(format!("Failed to check null: {}", e)))?
203 {
204 return Ok(None);
205 }
206
207 raw_jsonb
209 .to_f64()
210 .map(Some)
211 .map_err(|e| common::execution_error(format!("Failed to convert to float: {}", e)))
212}
213
214fn json_value_to_bool(value: jsonb::OwnedJsonb) -> Result<Option<bool>> {
216 let raw_jsonb = value.as_raw();
217
218 if raw_jsonb
220 .is_null()
221 .map_err(|e| common::execution_error(format!("Failed to check null: {}", e)))?
222 {
223 return Ok(None);
224 }
225
226 raw_jsonb
228 .to_bool()
229 .map(Some)
230 .map_err(|e| common::execution_error(format!("Failed to convert to boolean: {}", e)))
231}
232
233pub fn json_extract_udf() -> ScalarUDF {
242 create_udf(
243 "json_extract",
244 vec![DataType::LargeBinary, DataType::Utf8],
245 DataType::Utf8,
246 Volatility::Immutable,
247 Arc::new(json_extract_columnar_impl),
248 )
249}
250
251pub fn json_extract_with_type_udf() -> ScalarUDF {
262 use arrow_schema::Fields;
263
264 let return_type = DataType::Struct(Fields::from(vec![
265 arrow_schema::Field::new("value", DataType::LargeBinary, true),
266 arrow_schema::Field::new("type_tag", DataType::UInt8, false),
267 ]));
268
269 create_udf(
270 "json_extract_with_type",
271 vec![DataType::LargeBinary, DataType::Utf8],
272 return_type,
273 Volatility::Immutable,
274 Arc::new(json_extract_with_type_columnar_impl),
275 )
276}
277
278fn json_extract_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
280 let arrays = common::columnar_to_arrays(args);
281 let result = json_extract_impl(&arrays)?;
282 Ok(ColumnarValue::Array(result))
283}
284
285fn json_extract_with_type_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
287 let arrays = common::columnar_to_arrays(args);
288 let result = json_extract_with_type_impl(&arrays)?;
289 Ok(ColumnarValue::Array(result))
290}
291
292fn json_extract_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
294 common::validate_arg_count(args, 2, "json_extract")?;
295
296 let jsonb_array = common::extract_jsonb_array(args)?;
297 let path_array = common::extract_string_array(args, 1)?;
298 let mut builder = StringBuilder::with_capacity(jsonb_array.len(), 1024);
299
300 for i in 0..jsonb_array.len() {
301 if jsonb_array.is_null(i) {
302 builder.append_null();
303 } else if let Some(path) = common::get_string_value_at(path_array, i) {
304 let jsonb_bytes = jsonb_array.value(i);
305 match extract_json_path(jsonb_bytes, path)? {
306 Some(value) => builder.append_value(&value),
307 None => builder.append_null(),
308 }
309 } else {
310 builder.append_null();
311 }
312 }
313
314 Ok(Arc::new(builder.finish()))
315}
316
317fn json_extract_with_type_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
319 use arrow_array::builder::{LargeBinaryBuilder, UInt8Builder};
320 use arrow_array::StructArray;
321
322 common::validate_arg_count(args, 2, "json_extract_with_type")?;
323
324 let jsonb_array = common::extract_jsonb_array(args)?;
325 let path_array = common::extract_string_array(args, 1)?;
326
327 let mut value_builder = LargeBinaryBuilder::with_capacity(jsonb_array.len(), 1024);
328 let mut type_builder = UInt8Builder::with_capacity(jsonb_array.len());
329
330 for i in 0..jsonb_array.len() {
331 if jsonb_array.is_null(i) {
332 value_builder.append_null();
333 type_builder.append_value(JsonbType::Null.as_u8());
334 } else if let Some(path) = common::get_string_value_at(path_array, i) {
335 let jsonb_bytes = jsonb_array.value(i);
336 match extract_json_path_with_type(jsonb_bytes, path)? {
337 Some((value_bytes, type_tag)) => {
338 value_builder.append_value(&value_bytes);
339 type_builder.append_value(type_tag);
340 }
341 None => {
342 value_builder.append_null();
343 type_builder.append_value(JsonbType::Null.as_u8());
344 }
345 }
346 } else {
347 value_builder.append_null();
348 type_builder.append_value(JsonbType::Null.as_u8());
349 }
350 }
351
352 let value_array = Arc::new(value_builder.finish()) as ArrayRef;
354 let type_array = Arc::new(type_builder.finish()) as ArrayRef;
355
356 let struct_array = StructArray::from(vec![
357 (
358 Arc::new(arrow_schema::Field::new(
359 "value",
360 DataType::LargeBinary,
361 true,
362 )),
363 value_array,
364 ),
365 (
366 Arc::new(arrow_schema::Field::new("type_tag", DataType::UInt8, false)),
367 type_array,
368 ),
369 ]);
370
371 Ok(Arc::new(struct_array))
372}
373
374fn extract_json_path_with_type(jsonb_bytes: &[u8], path: &str) -> Result<Option<(Vec<u8>, u8)>> {
377 let json_path = common::parse_json_path(path)?;
378
379 let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
380 let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb);
381 match selector.select_values(&json_path) {
382 Ok(values) => {
383 if values.is_empty() {
384 Ok(None)
385 } else {
386 let owned_value = &values[0];
388 let raw = owned_value.as_raw();
389
390 let jsonb_type = if raw.is_null().unwrap_or(false) {
392 JsonbType::Null
393 } else if raw.is_boolean().unwrap_or(false) {
394 JsonbType::Boolean
395 } else if raw.is_number().unwrap_or(false) {
396 if raw.to_i64().is_ok() {
398 JsonbType::Int64
399 } else {
400 JsonbType::Float64
401 }
402 } else if raw.is_string().unwrap_or(false) {
403 JsonbType::String
404 } else if raw.is_array().unwrap_or(false) {
405 JsonbType::Array
406 } else if raw.is_object().unwrap_or(false) {
407 JsonbType::Object
408 } else {
409 JsonbType::String };
411
412 Ok(Some((owned_value.clone().to_vec(), jsonb_type.as_u8())))
414 }
415 }
416 Err(e) => Err(common::execution_error(format!(
417 "Failed to select values from path '{}': {}",
418 path, e
419 ))),
420 }
421}
422
423fn extract_json_path(jsonb_bytes: &[u8], path: &str) -> Result<Option<String>> {
427 let json_path = common::parse_json_path(path)?;
428
429 let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
430 let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb);
431 match selector.select_values(&json_path) {
432 Ok(values) => {
433 if values.is_empty() {
434 Ok(None)
435 } else {
436 Ok(Some(values[0].to_string()))
438 }
439 }
440 Err(e) => Err(common::execution_error(format!(
441 "Failed to select values from path '{}': {}",
442 path, e
443 ))),
444 }
445}
446
447pub fn json_exists_udf() -> ScalarUDF {
456 create_udf(
457 "json_exists",
458 vec![DataType::LargeBinary, DataType::Utf8],
459 DataType::Boolean,
460 Volatility::Immutable,
461 Arc::new(json_exists_columnar_impl),
462 )
463}
464
465fn json_exists_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
467 let arrays = common::columnar_to_arrays(args);
468 let result = json_exists_impl(&arrays)?;
469 Ok(ColumnarValue::Array(result))
470}
471
472fn json_exists_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
474 common::validate_arg_count(args, 2, "json_exists")?;
475
476 let jsonb_array = common::extract_jsonb_array(args)?;
477 let path_array = common::extract_string_array(args, 1)?;
478
479 let mut builder = BooleanBuilder::with_capacity(jsonb_array.len());
480
481 for i in 0..jsonb_array.len() {
482 if jsonb_array.is_null(i) {
483 builder.append_null();
484 } else if let Some(path) = common::get_string_value_at(path_array, i) {
485 let jsonb_bytes = jsonb_array.value(i);
486 let exists = check_json_path_exists(jsonb_bytes, path)?;
487 builder.append_value(exists);
488 } else {
489 builder.append_null();
490 }
491 }
492
493 Ok(Arc::new(builder.finish()))
494}
495
496fn check_json_path_exists(jsonb_bytes: &[u8], path: &str) -> Result<bool> {
498 let json_path = common::parse_json_path(path)?;
499
500 let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
501 let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb);
502 match selector.exists(&json_path) {
503 Ok(exists) => Ok(exists),
504 Err(e) => Err(common::execution_error(format!(
505 "Failed to check existence of path '{}': {}",
506 path, e
507 ))),
508 }
509}
510
511pub fn json_get_udf() -> ScalarUDF {
520 create_udf(
521 "json_get",
522 vec![DataType::LargeBinary, DataType::Utf8],
523 DataType::LargeBinary,
524 Volatility::Immutable,
525 Arc::new(json_get_columnar_impl),
526 )
527}
528
529fn json_get_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
531 let arrays = common::columnar_to_arrays(args);
532 let result = json_get_impl(&arrays)?;
533 Ok(ColumnarValue::Array(result))
534}
535
536fn json_get_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
538 common::validate_arg_count(args, 2, "json_get")?;
539
540 let jsonb_array = common::extract_jsonb_array(args)?;
541 let key_array = common::extract_string_array(args, 1)?;
542
543 let mut builder = LargeBinaryBuilder::with_capacity(jsonb_array.len(), 0);
544
545 for i in 0..jsonb_array.len() {
546 if jsonb_array.is_null(i) {
547 builder.append_null();
548 } else if let Some(key) = common::get_string_value_at(key_array, i) {
549 let jsonb_bytes = jsonb_array.value(i);
550 let key_type = common::KeyType::parse(key);
551 let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
552
553 match common::get_json_value_by_key(&raw_jsonb, &key_type)? {
554 Some(value) => builder.append_value(value.as_raw().as_ref()),
555 None => builder.append_null(),
556 }
557 } else {
558 builder.append_null();
559 }
560 }
561
562 Ok(Arc::new(builder.finish()))
563}
564
565pub fn json_get_string_udf() -> ScalarUDF {
574 create_udf(
575 "json_get_string",
576 vec![DataType::LargeBinary, DataType::Utf8],
577 DataType::Utf8,
578 Volatility::Immutable,
579 Arc::new(json_get_string_columnar_impl),
580 )
581}
582
583fn json_get_string_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
585 let arrays = common::columnar_to_arrays(args);
586 let result = json_get_string_impl(&arrays)?;
587 Ok(ColumnarValue::Array(result))
588}
589
590fn json_get_string_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
592 common::validate_arg_count(args, 2, "json_get_string")?;
593
594 let jsonb_array = common::extract_jsonb_array(args)?;
595 let key_array = common::extract_string_array(args, 1)?;
596
597 let mut builder = StringBuilder::with_capacity(jsonb_array.len(), 1024);
598
599 for i in 0..jsonb_array.len() {
600 if jsonb_array.is_null(i) {
601 builder.append_null();
602 } else if let Some(key) = common::get_string_value_at(key_array, i) {
603 let jsonb_bytes = jsonb_array.value(i);
604 let key_type = common::KeyType::parse(key);
605 let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
606
607 match common::get_json_value_by_key(&raw_jsonb, &key_type)? {
608 Some(value) => match json_value_to_string(value)? {
609 Some(string_val) => builder.append_value(&string_val),
610 None => builder.append_null(),
611 },
612 None => builder.append_null(),
613 }
614 } else {
615 builder.append_null();
616 }
617 }
618
619 Ok(Arc::new(builder.finish()))
620}
621
622pub fn json_get_int_udf() -> ScalarUDF {
631 create_udf(
632 "json_get_int",
633 vec![DataType::LargeBinary, DataType::Utf8],
634 DataType::Int64,
635 Volatility::Immutable,
636 Arc::new(json_get_int_columnar_impl),
637 )
638}
639
640fn json_get_int_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
642 let arrays = common::columnar_to_arrays(args);
643 let result = json_get_int_impl(&arrays)?;
644 Ok(ColumnarValue::Array(result))
645}
646
647fn json_get_int_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
649 common::validate_arg_count(args, 2, "json_get_int")?;
650
651 let jsonb_array = common::extract_jsonb_array(args)?;
652 let key_array = common::extract_string_array(args, 1)?;
653
654 let mut builder = Int64Builder::with_capacity(jsonb_array.len());
655
656 for i in 0..jsonb_array.len() {
657 if jsonb_array.is_null(i) {
658 builder.append_null();
659 } else if let Some(key) = common::get_string_value_at(key_array, i) {
660 let jsonb_bytes = jsonb_array.value(i);
661 let key_type = common::KeyType::parse(key);
662 let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
663
664 match common::get_json_value_by_key(&raw_jsonb, &key_type)? {
665 Some(value) => match json_value_to_int(value)? {
666 Some(int_val) => builder.append_value(int_val),
667 None => builder.append_null(),
668 },
669 None => builder.append_null(),
670 }
671 } else {
672 builder.append_null();
673 }
674 }
675
676 Ok(Arc::new(builder.finish()))
677}
678
679pub fn json_get_float_udf() -> ScalarUDF {
688 create_udf(
689 "json_get_float",
690 vec![DataType::LargeBinary, DataType::Utf8],
691 DataType::Float64,
692 Volatility::Immutable,
693 Arc::new(json_get_float_columnar_impl),
694 )
695}
696
697fn json_get_float_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
699 let arrays = common::columnar_to_arrays(args);
700 let result = json_get_float_impl(&arrays)?;
701 Ok(ColumnarValue::Array(result))
702}
703
704fn json_get_float_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
706 common::validate_arg_count(args, 2, "json_get_float")?;
707
708 let jsonb_array = common::extract_jsonb_array(args)?;
709 let key_array = common::extract_string_array(args, 1)?;
710
711 let mut builder = Float64Builder::with_capacity(jsonb_array.len());
712
713 for i in 0..jsonb_array.len() {
714 if jsonb_array.is_null(i) {
715 builder.append_null();
716 } else if let Some(key) = common::get_string_value_at(key_array, i) {
717 let jsonb_bytes = jsonb_array.value(i);
718 let key_type = common::KeyType::parse(key);
719 let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
720
721 match common::get_json_value_by_key(&raw_jsonb, &key_type)? {
722 Some(value) => match json_value_to_float(value)? {
723 Some(float_val) => builder.append_value(float_val),
724 None => builder.append_null(),
725 },
726 None => builder.append_null(),
727 }
728 } else {
729 builder.append_null();
730 }
731 }
732
733 Ok(Arc::new(builder.finish()))
734}
735
736pub fn json_get_bool_udf() -> ScalarUDF {
745 create_udf(
746 "json_get_bool",
747 vec![DataType::LargeBinary, DataType::Utf8],
748 DataType::Boolean,
749 Volatility::Immutable,
750 Arc::new(json_get_bool_columnar_impl),
751 )
752}
753
754fn json_get_bool_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
756 let arrays = common::columnar_to_arrays(args);
757 let result = json_get_bool_impl(&arrays)?;
758 Ok(ColumnarValue::Array(result))
759}
760
761fn json_get_bool_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
763 common::validate_arg_count(args, 2, "json_get_bool")?;
764
765 let jsonb_array = common::extract_jsonb_array(args)?;
766 let key_array = common::extract_string_array(args, 1)?;
767
768 let mut builder = BooleanBuilder::with_capacity(jsonb_array.len());
769
770 for i in 0..jsonb_array.len() {
771 if jsonb_array.is_null(i) {
772 builder.append_null();
773 } else if let Some(key) = common::get_string_value_at(key_array, i) {
774 let jsonb_bytes = jsonb_array.value(i);
775 let key_type = common::KeyType::parse(key);
776 let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
777
778 match common::get_json_value_by_key(&raw_jsonb, &key_type)? {
779 Some(value) => match json_value_to_bool(value)? {
780 Some(bool_val) => builder.append_value(bool_val),
781 None => builder.append_null(),
782 },
783 None => builder.append_null(),
784 }
785 } else {
786 builder.append_null();
787 }
788 }
789
790 Ok(Arc::new(builder.finish()))
791}
792
793pub fn json_array_contains_udf() -> ScalarUDF {
803 create_udf(
804 "json_array_contains",
805 vec![DataType::LargeBinary, DataType::Utf8, DataType::Utf8],
806 DataType::Boolean,
807 Volatility::Immutable,
808 Arc::new(json_array_contains_columnar_impl),
809 )
810}
811
812fn json_array_contains_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
814 let arrays = common::columnar_to_arrays(args);
815 let result = json_array_contains_impl(&arrays)?;
816 Ok(ColumnarValue::Array(result))
817}
818
819fn json_array_contains_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
821 common::validate_arg_count(args, 3, "json_array_contains")?;
822
823 let jsonb_array = common::extract_jsonb_array(args)?;
824 let path_array = common::extract_string_array(args, 1)?;
825 let value_array = common::extract_string_array(args, 2)?;
826
827 let mut builder = BooleanBuilder::with_capacity(jsonb_array.len());
828
829 for i in 0..jsonb_array.len() {
830 if jsonb_array.is_null(i) {
831 builder.append_null();
832 } else {
833 let path = common::get_string_value_at(path_array, i);
834 let value = common::get_string_value_at(value_array, i);
835
836 match (path, value) {
837 (Some(p), Some(v)) => {
838 let jsonb_bytes = jsonb_array.value(i);
839 let contains = check_array_contains(jsonb_bytes, p, v)?;
840 builder.append_value(contains);
841 }
842 _ => builder.append_null(),
843 }
844 }
845 }
846
847 Ok(Arc::new(builder.finish()))
848}
849
850fn check_array_contains(jsonb_bytes: &[u8], path: &str, value: &str) -> Result<bool> {
852 let json_path = common::parse_json_path(path)?;
853
854 let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
855 let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb);
856 match selector.select_values(&json_path) {
857 Ok(values) => {
858 for v in values {
859 let raw = v.as_raw();
861 let mut index = 0;
863 loop {
864 match raw.get_by_index(index) {
865 Ok(Some(elem)) => {
866 let elem_str = elem.to_string();
867 if elem_str == value || elem_str == format!("\"{}\"", value) {
869 return Ok(true);
870 }
871 index += 1;
872 }
873 Ok(None) => break, Err(_) => break, }
876 }
877 }
878 Ok(false)
879 }
880 Err(e) => Err(common::execution_error(format!(
881 "Failed to check array contains at path '{}': {}",
882 path, e
883 ))),
884 }
885}
886
887pub fn json_array_length_udf() -> ScalarUDF {
896 create_udf(
897 "json_array_length",
898 vec![DataType::LargeBinary, DataType::Utf8],
899 DataType::Int64,
900 Volatility::Immutable,
901 Arc::new(json_array_length_columnar_impl),
902 )
903}
904
905fn json_array_length_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
907 let arrays = common::columnar_to_arrays(args);
908 let result = json_array_length_impl(&arrays)?;
909 Ok(ColumnarValue::Array(result))
910}
911
912fn json_array_length_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
914 common::validate_arg_count(args, 2, "json_array_length")?;
915
916 let jsonb_array = common::extract_jsonb_array(args)?;
917 let path_array = common::extract_string_array(args, 1)?;
918
919 let mut builder = Int64Builder::with_capacity(jsonb_array.len());
920
921 for i in 0..jsonb_array.len() {
922 if jsonb_array.is_null(i) {
923 builder.append_null();
924 } else if let Some(path) = common::get_string_value_at(path_array, i) {
925 let jsonb_bytes = jsonb_array.value(i);
926 match get_array_length(jsonb_bytes, path)? {
927 Some(len) => builder.append_value(len),
928 None => builder.append_null(),
929 }
930 } else {
931 builder.append_null();
932 }
933 }
934
935 Ok(Arc::new(builder.finish()))
936}
937
938fn get_array_length(jsonb_bytes: &[u8], path: &str) -> Result<Option<i64>> {
940 let json_path = common::parse_json_path(path)?;
941
942 let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
943 let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb);
944 match selector.select_values(&json_path) {
945 Ok(values) => {
946 if values.is_empty() {
947 return Ok(None);
948 }
949 let first = &values[0];
950 let raw = first.as_raw();
951
952 let mut count = 0;
954 loop {
955 match raw.get_by_index(count) {
956 Ok(Some(_)) => count += 1,
957 Ok(None) => break, Err(_) => {
959 if count == 0 {
961 return Err(common::execution_error(format!(
962 "Path '{}' does not point to an array",
963 path
964 )));
965 }
966 break;
967 }
968 }
969 }
970 Ok(Some(count as i64))
971 }
972 Err(e) => Err(common::execution_error(format!(
973 "Failed to get array length at path '{}': {}",
974 path, e
975 ))),
976 }
977}
978
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::builder::LargeBinaryBuilder;
    use arrow_array::{BooleanArray, Int64Array};

    /// Encodes a JSON text document into JSONB bytes.
    fn create_test_jsonb(json_str: &str) -> Vec<u8> {
        jsonb::parse_value(json_str.as_bytes()).unwrap().to_vec()
    }

    /// JSONB column holding `doc` `copies` times, followed by `nulls` NULL rows.
    fn jsonb_column(doc: &[u8], copies: usize, nulls: usize) -> ArrayRef {
        let mut builder = LargeBinaryBuilder::new();
        for _ in 0..copies {
            builder.append_value(doc);
        }
        for _ in 0..nulls {
            builder.append_null();
        }
        Arc::new(builder.finish())
    }

    /// Utf8 column with every entry non-null.
    fn utf8_column(values: &[&str]) -> ArrayRef {
        Arc::new(StringArray::from(values.to_vec()))
    }

    // The UDF bodies are synchronous, so plain `#[test]` is enough; no async
    // runtime is needed.

    #[test]
    fn test_jsonb_type_enum() {
        let expected = [
            (JsonbType::Null, 0),
            (JsonbType::Boolean, 1),
            (JsonbType::Int64, 2),
            (JsonbType::Float64, 3),
            (JsonbType::String, 4),
            (JsonbType::Array, 5),
            (JsonbType::Object, 6),
        ];
        for (ty, tag) in expected {
            assert_eq!(ty.as_u8(), tag);
            assert_eq!(JsonbType::from_u8(tag), Some(ty));
        }
        // First value past the last variant.
        assert_eq!(JsonbType::from_u8(7), None);
    }

    #[test]
    fn test_json_extract_udf() -> Result<()> {
        let doc = create_test_jsonb(r#"{"user": {"name": "Alice", "age": 30}}"#);
        let jsonb_array = jsonb_column(&doc, 2, 1);
        let path_array = utf8_column(&["$.user.name", "$.user.age", "$.user.name"]);

        let result = json_extract_impl(&[jsonb_array, path_array])?;
        let string_array = result.as_any().downcast_ref::<StringArray>().unwrap();

        assert_eq!(string_array.len(), 3);
        assert_eq!(string_array.value(0), "\"Alice\"");
        assert_eq!(string_array.value(1), "30");
        assert!(string_array.is_null(2));
        Ok(())
    }

    #[test]
    fn test_json_exists_udf() -> Result<()> {
        let doc =
            create_test_jsonb(r#"{"user": {"name": "Alice", "age": 30}, "tags": ["rust", "json"]}"#);
        let jsonb_array = jsonb_column(&doc, 3, 1);
        let path_array = utf8_column(&["$.user.name", "$.user.email", "$.tags", "$.any"]);

        let result = json_exists_impl(&[jsonb_array, path_array])?;
        let bool_array = result.as_any().downcast_ref::<BooleanArray>().unwrap();

        assert_eq!(bool_array.len(), 4);
        assert!(bool_array.value(0));
        assert!(!bool_array.value(1));
        assert!(bool_array.value(2));
        assert!(bool_array.is_null(3));
        Ok(())
    }

    #[test]
    fn test_json_get_string_udf() -> Result<()> {
        // Scalars of every kind are rendered as text; JSON null becomes SQL NULL.
        let doc = create_test_jsonb(r#"{"str": "hello", "num": 123, "bool": true, "null": null}"#);
        let jsonb_array = jsonb_column(&doc, 4, 0);
        let key_array = utf8_column(&["str", "num", "bool", "null"]);

        let result = json_get_string_impl(&[jsonb_array, key_array])?;
        let string_array = result.as_any().downcast_ref::<StringArray>().unwrap();

        assert_eq!(string_array.len(), 4);
        assert_eq!(string_array.value(0), "hello");
        assert_eq!(string_array.value(1), "123");
        assert_eq!(string_array.value(2), "true");
        assert!(string_array.is_null(3));
        Ok(())
    }

    #[test]
    fn test_json_get_int_udf() -> Result<()> {
        let doc = create_test_jsonb(r#"{"int": 42, "str_num": "99", "bool": true}"#);
        let jsonb_array = jsonb_column(&doc, 3, 0);
        let key_array = utf8_column(&["int", "str_num", "bool"]);

        let result = json_get_int_impl(&[jsonb_array, key_array])?;
        let int_array = result.as_any().downcast_ref::<Int64Array>().unwrap();

        assert_eq!(int_array.len(), 3);
        assert_eq!(int_array.value(0), 42);
        assert_eq!(int_array.value(1), 99);
        // `true` converts to 1.
        assert_eq!(int_array.value(2), 1);
        Ok(())
    }

    #[test]
    fn test_json_get_bool_udf() -> Result<()> {
        let doc = create_test_jsonb(
            r#"{"bool_true": true, "bool_false": false, "str_true": "true", "str_false": "false"}"#,
        );
        let jsonb_array = jsonb_column(&doc, 4, 0);
        let key_array = utf8_column(&["bool_true", "bool_false", "str_true", "str_false"]);

        let result = json_get_bool_impl(&[jsonb_array, key_array])?;
        let bool_array = result.as_any().downcast_ref::<BooleanArray>().unwrap();

        assert_eq!(bool_array.len(), 4);
        assert!(bool_array.value(0));
        assert!(!bool_array.value(1));
        // The strings "true" / "false" convert to booleans as well.
        assert!(bool_array.value(2));
        assert!(!bool_array.value(3));
        Ok(())
    }

    #[test]
    fn test_json_array_contains_udf() -> Result<()> {
        let doc = create_test_jsonb(r#"{"tags": ["rust", "json", "database"], "nums": [1, 2, 3]}"#);
        let jsonb_array = jsonb_column(&doc, 3, 1);
        let path_array = utf8_column(&["$.tags", "$.tags", "$.nums", "$.tags"]);
        let value_array = utf8_column(&["rust", "python", "2", "any"]);

        let result = json_array_contains_impl(&[jsonb_array, path_array, value_array])?;
        let bool_array = result.as_any().downcast_ref::<BooleanArray>().unwrap();

        assert_eq!(bool_array.len(), 4);
        assert!(bool_array.value(0));
        assert!(!bool_array.value(1));
        assert!(bool_array.value(2));
        assert!(bool_array.is_null(3));
        Ok(())
    }

    #[test]
    fn test_json_array_length_udf() -> Result<()> {
        let doc = create_test_jsonb(r#"{"empty": [], "tags": ["a", "b", "c"], "nested": {"arr": [1, 2]}}"#);
        let jsonb_array = jsonb_column(&doc, 3, 1);
        let path_array = utf8_column(&["$.empty", "$.tags", "$.nested.arr", "$.any"]);

        let result = json_array_length_impl(&[jsonb_array, path_array])?;
        let int_array = result.as_any().downcast_ref::<Int64Array>().unwrap();

        assert_eq!(int_array.len(), 4);
        assert_eq!(int_array.value(0), 0);
        assert_eq!(int_array.value(1), 3);
        assert_eq!(int_array.value(2), 2);
        assert!(int_array.is_null(3));
        Ok(())
    }

    #[test]
    fn test_json_array_access() -> Result<()> {
        let doc = create_test_jsonb(r#"["first", "second", "third"]"#);
        let jsonb_array = jsonb_column(&doc, 3, 0);
        // "10" is past the end of the array.
        let key_array = utf8_column(&["0", "1", "10"]);

        let result = json_get_string_impl(&[jsonb_array, key_array])?;
        let string_array = result.as_any().downcast_ref::<StringArray>().unwrap();

        assert_eq!(string_array.len(), 3);
        assert_eq!(string_array.value(0), "first");
        assert_eq!(string_array.value(1), "second");
        assert!(string_array.is_null(2));
        Ok(())
    }
}