1use arrow_array::builder::{
5 BooleanBuilder, Float64Builder, Int64Builder, LargeBinaryBuilder, StringBuilder,
6};
7use arrow_array::{Array, ArrayRef, LargeBinaryArray, StringArray};
8use arrow_schema::DataType;
9use datafusion::error::{DataFusionError, Result};
10use datafusion::logical_expr::{ScalarUDF, Volatility};
11use datafusion::physical_plan::ColumnarValue;
12use datafusion::prelude::create_udf;
13use std::sync::Arc;
14
/// Type tag written to the `type_tag` column produced by `json_extract_with_type`.
///
/// The explicit discriminants are the tag values stored in query output, so an
/// existing variant must never change its number.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JsonbType {
    /// JSON `null`, or no value at all (NULL input / path matched nothing).
    Null = 0,
    /// JSON `true` / `false`.
    Boolean = 1,
    /// A number representable as a signed 64-bit integer.
    Int64 = 2,
    /// Any other number.
    Float64 = 3,
    /// JSON string.
    String = 4,
    /// JSON array.
    Array = 5,
    /// JSON object.
    Object = 6,
}

impl JsonbType {
    /// Every variant, stored at the index equal to its tag value.
    /// Keep this in sync with the discriminants above.
    const BY_TAG: [JsonbType; 7] = [
        JsonbType::Null,
        JsonbType::Boolean,
        JsonbType::Int64,
        JsonbType::Float64,
        JsonbType::String,
        JsonbType::Array,
        JsonbType::Object,
    ];

    /// Decodes a tag byte; bytes that name no variant yield `None`.
    pub fn from_u8(value: u8) -> Option<Self> {
        Self::BY_TAG.get(usize::from(value)).copied()
    }

    /// Encodes this variant as its tag byte.
    pub fn as_u8(self) -> u8 {
        self as u8
    }
}
48
49mod common {
51 use super::*;
52
53 #[derive(Debug, Clone)]
55 pub enum KeyType {
56 Field(String),
57 Index(usize),
58 }
59
60 impl KeyType {
61 pub fn parse(key: &str) -> Self {
63 if let Ok(index) = key.parse::<usize>() {
64 Self::Index(index)
65 } else {
66 Self::Field(key.to_string())
67 }
68 }
69 }
70
71 pub fn columnar_to_arrays(args: &[ColumnarValue]) -> Vec<ArrayRef> {
77 args.iter()
78 .map(|arg| match arg {
79 ColumnarValue::Array(arr) => arr.clone(),
80 ColumnarValue::Scalar(scalar) => scalar.to_array().unwrap(),
81 })
82 .collect()
83 }
84
85 pub fn execution_error(msg: impl Into<String>) -> DataFusionError {
87 DataFusionError::Execution(msg.into())
88 }
89
90 pub fn validate_arg_count(
92 args: &[ArrayRef],
93 expected: usize,
94 function_name: &str,
95 ) -> Result<()> {
96 if args.len() != expected {
97 return Err(execution_error(format!(
98 "{} requires exactly {} arguments",
99 function_name, expected
100 )));
101 }
102 Ok(())
103 }
104
105 pub fn extract_jsonb_array(args: &[ArrayRef]) -> Result<&LargeBinaryArray> {
107 args[0]
108 .as_any()
109 .downcast_ref::<LargeBinaryArray>()
110 .ok_or_else(|| execution_error("First argument must be LargeBinary"))
111 }
112
113 pub fn extract_string_array(args: &[ArrayRef], arg_index: usize) -> Result<&StringArray> {
115 args[arg_index]
116 .as_any()
117 .downcast_ref::<StringArray>()
118 .ok_or_else(|| execution_error(format!("Argument {} must be String", arg_index + 1)))
119 }
120
121 pub fn get_string_value_at(string_array: &StringArray, index: usize) -> Option<&str> {
125 let actual_index = if string_array.len() == 1 { 0 } else { index };
127
128 if string_array.is_null(actual_index) {
129 None
130 } else {
131 Some(string_array.value(actual_index))
132 }
133 }
134
135 pub fn get_json_value_by_key(
137 raw_jsonb: &jsonb::RawJsonb,
138 key_type: &KeyType,
139 ) -> Result<Option<jsonb::OwnedJsonb>> {
140 match key_type {
141 KeyType::Field(field) => raw_jsonb
142 .get_by_name(field, false)
143 .map_err(|e| execution_error(format!("Failed to get field '{}': {}", field, e))),
144 KeyType::Index(index) => raw_jsonb.get_by_index(*index).map_err(|e| {
145 execution_error(format!("Failed to get array element [{}]: {}", index, e))
146 }),
147 }
148 }
149
150 pub fn parse_json_path(path: &str) -> Result<jsonb::jsonpath::JsonPath<'_>> {
152 jsonb::jsonpath::parse_json_path(path.as_bytes())
153 .map_err(|e| execution_error(format!("Invalid JSONPath '{}': {}", path, e)))
154 }
155}
156
157fn json_value_to_string(value: jsonb::OwnedJsonb) -> Result<Option<String>> {
159 let raw_jsonb = value.as_raw();
160
161 if raw_jsonb
163 .is_null()
164 .map_err(|e| common::execution_error(format!("Failed to check null: {}", e)))?
165 {
166 return Ok(None);
167 }
168
169 raw_jsonb
171 .to_str()
172 .map(Some)
173 .map_err(|e| common::execution_error(format!("Failed to convert to string: {}", e)))
174}
175
176fn json_value_to_int(value: jsonb::OwnedJsonb) -> Result<Option<i64>> {
178 let raw_jsonb = value.as_raw();
179
180 if raw_jsonb
182 .is_null()
183 .map_err(|e| common::execution_error(format!("Failed to check null: {}", e)))?
184 {
185 return Ok(None);
186 }
187
188 raw_jsonb
190 .to_i64()
191 .map(Some)
192 .map_err(|e| common::execution_error(format!("Failed to convert to integer: {}", e)))
193}
194
195fn json_value_to_float(value: jsonb::OwnedJsonb) -> Result<Option<f64>> {
197 let raw_jsonb = value.as_raw();
198
199 if raw_jsonb
201 .is_null()
202 .map_err(|e| common::execution_error(format!("Failed to check null: {}", e)))?
203 {
204 return Ok(None);
205 }
206
207 raw_jsonb
209 .to_f64()
210 .map(Some)
211 .map_err(|e| common::execution_error(format!("Failed to convert to float: {}", e)))
212}
213
214fn json_value_to_bool(value: jsonb::OwnedJsonb) -> Result<Option<bool>> {
216 let raw_jsonb = value.as_raw();
217
218 if raw_jsonb
220 .is_null()
221 .map_err(|e| common::execution_error(format!("Failed to check null: {}", e)))?
222 {
223 return Ok(None);
224 }
225
226 raw_jsonb
228 .to_bool()
229 .map(Some)
230 .map_err(|e| common::execution_error(format!("Failed to convert to boolean: {}", e)))
231}
232
233pub fn json_extract_udf() -> ScalarUDF {
242 create_udf(
243 "json_extract",
244 vec![DataType::LargeBinary, DataType::Utf8],
245 DataType::Utf8,
246 Volatility::Immutable,
247 Arc::new(json_extract_columnar_impl),
248 )
249}
250
251pub fn json_extract_with_type_udf() -> ScalarUDF {
262 use arrow_schema::Fields;
263
264 let return_type = DataType::Struct(Fields::from(vec![
265 arrow_schema::Field::new("value", DataType::LargeBinary, true),
266 arrow_schema::Field::new("type_tag", DataType::UInt8, false),
267 ]));
268
269 create_udf(
270 "json_extract_with_type",
271 vec![DataType::LargeBinary, DataType::Utf8],
272 return_type,
273 Volatility::Immutable,
274 Arc::new(json_extract_with_type_columnar_impl),
275 )
276}
277
278fn json_extract_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
280 let arrays = common::columnar_to_arrays(args);
281 let result = json_extract_impl(&arrays)?;
282 Ok(ColumnarValue::Array(result))
283}
284
285fn json_extract_with_type_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
287 let arrays = common::columnar_to_arrays(args);
288 let result = json_extract_with_type_impl(&arrays)?;
289 Ok(ColumnarValue::Array(result))
290}
291
292fn json_extract_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
294 common::validate_arg_count(args, 2, "json_extract")?;
295
296 let jsonb_array = common::extract_jsonb_array(args)?;
297 let path_array = common::extract_string_array(args, 1)?;
298 let mut builder = StringBuilder::with_capacity(jsonb_array.len(), 1024);
299
300 for i in 0..jsonb_array.len() {
301 if jsonb_array.is_null(i) {
302 builder.append_null();
303 } else if let Some(path) = common::get_string_value_at(path_array, i) {
304 let jsonb_bytes = jsonb_array.value(i);
305 match extract_json_path(jsonb_bytes, path)? {
306 Some(value) => builder.append_value(&value),
307 None => builder.append_null(),
308 }
309 } else {
310 builder.append_null();
311 }
312 }
313
314 Ok(Arc::new(builder.finish()))
315}
316
317fn json_extract_with_type_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
319 use arrow_array::StructArray;
320 use arrow_array::builder::{LargeBinaryBuilder, UInt8Builder};
321
322 common::validate_arg_count(args, 2, "json_extract_with_type")?;
323
324 let jsonb_array = common::extract_jsonb_array(args)?;
325 let path_array = common::extract_string_array(args, 1)?;
326
327 let mut value_builder = LargeBinaryBuilder::with_capacity(jsonb_array.len(), 1024);
328 let mut type_builder = UInt8Builder::with_capacity(jsonb_array.len());
329
330 for i in 0..jsonb_array.len() {
331 if jsonb_array.is_null(i) {
332 value_builder.append_null();
333 type_builder.append_value(JsonbType::Null.as_u8());
334 } else if let Some(path) = common::get_string_value_at(path_array, i) {
335 let jsonb_bytes = jsonb_array.value(i);
336 match extract_json_path_with_type(jsonb_bytes, path)? {
337 Some((value_bytes, type_tag)) => {
338 value_builder.append_value(&value_bytes);
339 type_builder.append_value(type_tag);
340 }
341 None => {
342 value_builder.append_null();
343 type_builder.append_value(JsonbType::Null.as_u8());
344 }
345 }
346 } else {
347 value_builder.append_null();
348 type_builder.append_value(JsonbType::Null.as_u8());
349 }
350 }
351
352 let value_array = Arc::new(value_builder.finish()) as ArrayRef;
354 let type_array = Arc::new(type_builder.finish()) as ArrayRef;
355
356 let struct_array = StructArray::from(vec![
357 (
358 Arc::new(arrow_schema::Field::new(
359 "value",
360 DataType::LargeBinary,
361 true,
362 )),
363 value_array,
364 ),
365 (
366 Arc::new(arrow_schema::Field::new("type_tag", DataType::UInt8, false)),
367 type_array,
368 ),
369 ]);
370
371 Ok(Arc::new(struct_array))
372}
373
374fn extract_json_path_with_type(jsonb_bytes: &[u8], path: &str) -> Result<Option<(Vec<u8>, u8)>> {
377 let json_path = common::parse_json_path(path)?;
378
379 let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
380 let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb);
381 match selector.select_values(&json_path) {
382 Ok(values) => {
383 if values.is_empty() {
384 Ok(None)
385 } else {
386 let owned_value = &values[0];
388 let raw = owned_value.as_raw();
389
390 let jsonb_type = if raw.is_null().unwrap_or(false) {
392 JsonbType::Null
393 } else if raw.is_boolean().unwrap_or(false) {
394 JsonbType::Boolean
395 } else if raw.is_number().unwrap_or(false) {
396 let is_float_storage =
397 matches!(raw.as_number(), Ok(Some(jsonb::Number::Float64(_))));
398 if !is_float_storage && raw.is_i64().unwrap_or(false) {
399 JsonbType::Int64
400 } else {
401 JsonbType::Float64
402 }
403 } else if raw.is_string().unwrap_or(false) {
404 JsonbType::String
405 } else if raw.is_array().unwrap_or(false) {
406 JsonbType::Array
407 } else if raw.is_object().unwrap_or(false) {
408 JsonbType::Object
409 } else {
410 JsonbType::String };
412
413 Ok(Some((owned_value.clone().to_vec(), jsonb_type.as_u8())))
415 }
416 }
417 Err(e) => Err(common::execution_error(format!(
418 "Failed to select values from path '{}': {}",
419 path, e
420 ))),
421 }
422}
423
424fn extract_json_path(jsonb_bytes: &[u8], path: &str) -> Result<Option<String>> {
428 let json_path = common::parse_json_path(path)?;
429
430 let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
431 let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb);
432 match selector.select_values(&json_path) {
433 Ok(values) => {
434 if values.is_empty() {
435 Ok(None)
436 } else {
437 Ok(Some(values[0].to_string()))
439 }
440 }
441 Err(e) => Err(common::execution_error(format!(
442 "Failed to select values from path '{}': {}",
443 path, e
444 ))),
445 }
446}
447
448pub fn json_exists_udf() -> ScalarUDF {
457 create_udf(
458 "json_exists",
459 vec![DataType::LargeBinary, DataType::Utf8],
460 DataType::Boolean,
461 Volatility::Immutable,
462 Arc::new(json_exists_columnar_impl),
463 )
464}
465
466fn json_exists_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
468 let arrays = common::columnar_to_arrays(args);
469 let result = json_exists_impl(&arrays)?;
470 Ok(ColumnarValue::Array(result))
471}
472
473fn json_exists_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
475 common::validate_arg_count(args, 2, "json_exists")?;
476
477 let jsonb_array = common::extract_jsonb_array(args)?;
478 let path_array = common::extract_string_array(args, 1)?;
479
480 let mut builder = BooleanBuilder::with_capacity(jsonb_array.len());
481
482 for i in 0..jsonb_array.len() {
483 if jsonb_array.is_null(i) {
484 builder.append_null();
485 } else if let Some(path) = common::get_string_value_at(path_array, i) {
486 let jsonb_bytes = jsonb_array.value(i);
487 let exists = check_json_path_exists(jsonb_bytes, path)?;
488 builder.append_value(exists);
489 } else {
490 builder.append_null();
491 }
492 }
493
494 Ok(Arc::new(builder.finish()))
495}
496
497fn check_json_path_exists(jsonb_bytes: &[u8], path: &str) -> Result<bool> {
499 let json_path = common::parse_json_path(path)?;
500
501 let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
502 let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb);
503 match selector.exists(&json_path) {
504 Ok(exists) => Ok(exists),
505 Err(e) => Err(common::execution_error(format!(
506 "Failed to check existence of path '{}': {}",
507 path, e
508 ))),
509 }
510}
511
512pub fn json_get_udf() -> ScalarUDF {
521 create_udf(
522 "json_get",
523 vec![DataType::LargeBinary, DataType::Utf8],
524 DataType::LargeBinary,
525 Volatility::Immutable,
526 Arc::new(json_get_columnar_impl),
527 )
528}
529
530fn json_get_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
532 let arrays = common::columnar_to_arrays(args);
533 let result = json_get_impl(&arrays)?;
534 Ok(ColumnarValue::Array(result))
535}
536
537fn json_get_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
539 common::validate_arg_count(args, 2, "json_get")?;
540
541 let jsonb_array = common::extract_jsonb_array(args)?;
542 let key_array = common::extract_string_array(args, 1)?;
543
544 let mut builder = LargeBinaryBuilder::with_capacity(jsonb_array.len(), 0);
545
546 for i in 0..jsonb_array.len() {
547 if jsonb_array.is_null(i) {
548 builder.append_null();
549 } else if let Some(key) = common::get_string_value_at(key_array, i) {
550 let jsonb_bytes = jsonb_array.value(i);
551 let key_type = common::KeyType::parse(key);
552 let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
553
554 match common::get_json_value_by_key(&raw_jsonb, &key_type)? {
555 Some(value) => builder.append_value(value.as_raw().as_ref()),
556 None => builder.append_null(),
557 }
558 } else {
559 builder.append_null();
560 }
561 }
562
563 Ok(Arc::new(builder.finish()))
564}
565
566pub fn json_get_string_udf() -> ScalarUDF {
575 create_udf(
576 "json_get_string",
577 vec![DataType::LargeBinary, DataType::Utf8],
578 DataType::Utf8,
579 Volatility::Immutable,
580 Arc::new(json_get_string_columnar_impl),
581 )
582}
583
584fn json_get_string_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
586 let arrays = common::columnar_to_arrays(args);
587 let result = json_get_string_impl(&arrays)?;
588 Ok(ColumnarValue::Array(result))
589}
590
591fn json_get_string_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
593 common::validate_arg_count(args, 2, "json_get_string")?;
594
595 let jsonb_array = common::extract_jsonb_array(args)?;
596 let key_array = common::extract_string_array(args, 1)?;
597
598 let mut builder = StringBuilder::with_capacity(jsonb_array.len(), 1024);
599
600 for i in 0..jsonb_array.len() {
601 if jsonb_array.is_null(i) {
602 builder.append_null();
603 } else if let Some(key) = common::get_string_value_at(key_array, i) {
604 let jsonb_bytes = jsonb_array.value(i);
605 let key_type = common::KeyType::parse(key);
606 let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
607
608 match common::get_json_value_by_key(&raw_jsonb, &key_type)? {
609 Some(value) => match json_value_to_string(value)? {
610 Some(string_val) => builder.append_value(&string_val),
611 None => builder.append_null(),
612 },
613 None => builder.append_null(),
614 }
615 } else {
616 builder.append_null();
617 }
618 }
619
620 Ok(Arc::new(builder.finish()))
621}
622
623pub fn json_get_int_udf() -> ScalarUDF {
632 create_udf(
633 "json_get_int",
634 vec![DataType::LargeBinary, DataType::Utf8],
635 DataType::Int64,
636 Volatility::Immutable,
637 Arc::new(json_get_int_columnar_impl),
638 )
639}
640
641fn json_get_int_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
643 let arrays = common::columnar_to_arrays(args);
644 let result = json_get_int_impl(&arrays)?;
645 Ok(ColumnarValue::Array(result))
646}
647
648fn json_get_int_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
650 common::validate_arg_count(args, 2, "json_get_int")?;
651
652 let jsonb_array = common::extract_jsonb_array(args)?;
653 let key_array = common::extract_string_array(args, 1)?;
654
655 let mut builder = Int64Builder::with_capacity(jsonb_array.len());
656
657 for i in 0..jsonb_array.len() {
658 if jsonb_array.is_null(i) {
659 builder.append_null();
660 } else if let Some(key) = common::get_string_value_at(key_array, i) {
661 let jsonb_bytes = jsonb_array.value(i);
662 let key_type = common::KeyType::parse(key);
663 let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
664
665 match common::get_json_value_by_key(&raw_jsonb, &key_type)? {
666 Some(value) => match json_value_to_int(value)? {
667 Some(int_val) => builder.append_value(int_val),
668 None => builder.append_null(),
669 },
670 None => builder.append_null(),
671 }
672 } else {
673 builder.append_null();
674 }
675 }
676
677 Ok(Arc::new(builder.finish()))
678}
679
680pub fn json_get_float_udf() -> ScalarUDF {
689 create_udf(
690 "json_get_float",
691 vec![DataType::LargeBinary, DataType::Utf8],
692 DataType::Float64,
693 Volatility::Immutable,
694 Arc::new(json_get_float_columnar_impl),
695 )
696}
697
698fn json_get_float_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
700 let arrays = common::columnar_to_arrays(args);
701 let result = json_get_float_impl(&arrays)?;
702 Ok(ColumnarValue::Array(result))
703}
704
705fn json_get_float_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
707 common::validate_arg_count(args, 2, "json_get_float")?;
708
709 let jsonb_array = common::extract_jsonb_array(args)?;
710 let key_array = common::extract_string_array(args, 1)?;
711
712 let mut builder = Float64Builder::with_capacity(jsonb_array.len());
713
714 for i in 0..jsonb_array.len() {
715 if jsonb_array.is_null(i) {
716 builder.append_null();
717 } else if let Some(key) = common::get_string_value_at(key_array, i) {
718 let jsonb_bytes = jsonb_array.value(i);
719 let key_type = common::KeyType::parse(key);
720 let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
721
722 match common::get_json_value_by_key(&raw_jsonb, &key_type)? {
723 Some(value) => match json_value_to_float(value)? {
724 Some(float_val) => builder.append_value(float_val),
725 None => builder.append_null(),
726 },
727 None => builder.append_null(),
728 }
729 } else {
730 builder.append_null();
731 }
732 }
733
734 Ok(Arc::new(builder.finish()))
735}
736
737pub fn json_get_bool_udf() -> ScalarUDF {
746 create_udf(
747 "json_get_bool",
748 vec![DataType::LargeBinary, DataType::Utf8],
749 DataType::Boolean,
750 Volatility::Immutable,
751 Arc::new(json_get_bool_columnar_impl),
752 )
753}
754
755fn json_get_bool_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
757 let arrays = common::columnar_to_arrays(args);
758 let result = json_get_bool_impl(&arrays)?;
759 Ok(ColumnarValue::Array(result))
760}
761
762fn json_get_bool_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
764 common::validate_arg_count(args, 2, "json_get_bool")?;
765
766 let jsonb_array = common::extract_jsonb_array(args)?;
767 let key_array = common::extract_string_array(args, 1)?;
768
769 let mut builder = BooleanBuilder::with_capacity(jsonb_array.len());
770
771 for i in 0..jsonb_array.len() {
772 if jsonb_array.is_null(i) {
773 builder.append_null();
774 } else if let Some(key) = common::get_string_value_at(key_array, i) {
775 let jsonb_bytes = jsonb_array.value(i);
776 let key_type = common::KeyType::parse(key);
777 let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
778
779 match common::get_json_value_by_key(&raw_jsonb, &key_type)? {
780 Some(value) => match json_value_to_bool(value)? {
781 Some(bool_val) => builder.append_value(bool_val),
782 None => builder.append_null(),
783 },
784 None => builder.append_null(),
785 }
786 } else {
787 builder.append_null();
788 }
789 }
790
791 Ok(Arc::new(builder.finish()))
792}
793
794pub fn json_array_contains_udf() -> ScalarUDF {
804 create_udf(
805 "json_array_contains",
806 vec![DataType::LargeBinary, DataType::Utf8, DataType::Utf8],
807 DataType::Boolean,
808 Volatility::Immutable,
809 Arc::new(json_array_contains_columnar_impl),
810 )
811}
812
813fn json_array_contains_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
815 let arrays = common::columnar_to_arrays(args);
816 let result = json_array_contains_impl(&arrays)?;
817 Ok(ColumnarValue::Array(result))
818}
819
820fn json_array_contains_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
822 common::validate_arg_count(args, 3, "json_array_contains")?;
823
824 let jsonb_array = common::extract_jsonb_array(args)?;
825 let path_array = common::extract_string_array(args, 1)?;
826 let value_array = common::extract_string_array(args, 2)?;
827
828 let mut builder = BooleanBuilder::with_capacity(jsonb_array.len());
829
830 for i in 0..jsonb_array.len() {
831 if jsonb_array.is_null(i) {
832 builder.append_null();
833 } else {
834 let path = common::get_string_value_at(path_array, i);
835 let value = common::get_string_value_at(value_array, i);
836
837 match (path, value) {
838 (Some(p), Some(v)) => {
839 let jsonb_bytes = jsonb_array.value(i);
840 let contains = check_array_contains(jsonb_bytes, p, v)?;
841 builder.append_value(contains);
842 }
843 _ => builder.append_null(),
844 }
845 }
846 }
847
848 Ok(Arc::new(builder.finish()))
849}
850
851fn check_array_contains(jsonb_bytes: &[u8], path: &str, value: &str) -> Result<bool> {
853 let json_path = common::parse_json_path(path)?;
854
855 let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
856 let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb);
857 match selector.select_values(&json_path) {
858 Ok(values) => {
859 for v in values {
860 let raw = v.as_raw();
862 let mut index = 0;
864 loop {
865 match raw.get_by_index(index) {
866 Ok(Some(elem)) => {
867 let elem_str = elem.to_string();
868 if elem_str == value || elem_str == format!("\"{}\"", value) {
870 return Ok(true);
871 }
872 index += 1;
873 }
874 Ok(None) => break, Err(_) => break, }
877 }
878 }
879 Ok(false)
880 }
881 Err(e) => Err(common::execution_error(format!(
882 "Failed to check array contains at path '{}': {}",
883 path, e
884 ))),
885 }
886}
887
888pub fn json_array_length_udf() -> ScalarUDF {
897 create_udf(
898 "json_array_length",
899 vec![DataType::LargeBinary, DataType::Utf8],
900 DataType::Int64,
901 Volatility::Immutable,
902 Arc::new(json_array_length_columnar_impl),
903 )
904}
905
906fn json_array_length_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
908 let arrays = common::columnar_to_arrays(args);
909 let result = json_array_length_impl(&arrays)?;
910 Ok(ColumnarValue::Array(result))
911}
912
913fn json_array_length_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
915 common::validate_arg_count(args, 2, "json_array_length")?;
916
917 let jsonb_array = common::extract_jsonb_array(args)?;
918 let path_array = common::extract_string_array(args, 1)?;
919
920 let mut builder = Int64Builder::with_capacity(jsonb_array.len());
921
922 for i in 0..jsonb_array.len() {
923 if jsonb_array.is_null(i) {
924 builder.append_null();
925 } else if let Some(path) = common::get_string_value_at(path_array, i) {
926 let jsonb_bytes = jsonb_array.value(i);
927 match get_array_length(jsonb_bytes, path)? {
928 Some(len) => builder.append_value(len),
929 None => builder.append_null(),
930 }
931 } else {
932 builder.append_null();
933 }
934 }
935
936 Ok(Arc::new(builder.finish()))
937}
938
939fn get_array_length(jsonb_bytes: &[u8], path: &str) -> Result<Option<i64>> {
941 let json_path = common::parse_json_path(path)?;
942
943 let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
944 let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb);
945 match selector.select_values(&json_path) {
946 Ok(values) => {
947 if values.is_empty() {
948 return Ok(None);
949 }
950 let first = &values[0];
951 let raw = first.as_raw();
952
953 let mut count = 0;
955 loop {
956 match raw.get_by_index(count) {
957 Ok(Some(_)) => count += 1,
958 Ok(None) => break, Err(_) => {
960 if count == 0 {
962 return Err(common::execution_error(format!(
963 "Path '{}' does not point to an array",
964 path
965 )));
966 }
967 break;
968 }
969 }
970 }
971 Ok(Some(count as i64))
972 }
973 Err(e) => Err(common::execution_error(format!(
974 "Failed to get array length at path '{}': {}",
975 path, e
976 ))),
977 }
978}
979
980#[cfg(test)]
981mod tests {
982 use super::*;
983 use arrow_array::builder::LargeBinaryBuilder;
984 use arrow_array::{BooleanArray, Float64Array, Int64Array};
985
986 fn create_test_jsonb(json_str: &str) -> Vec<u8> {
987 jsonb::parse_value(json_str.as_bytes()).unwrap().to_vec()
988 }
989
990 #[test]
991 fn test_jsonb_type_enum() {
992 assert_eq!(JsonbType::Null.as_u8(), 0);
994 assert_eq!(JsonbType::Boolean.as_u8(), 1);
995 assert_eq!(JsonbType::Int64.as_u8(), 2);
996 assert_eq!(JsonbType::Float64.as_u8(), 3);
997 assert_eq!(JsonbType::String.as_u8(), 4);
998 assert_eq!(JsonbType::Array.as_u8(), 5);
999 assert_eq!(JsonbType::Object.as_u8(), 6);
1000
1001 assert_eq!(JsonbType::from_u8(0), Some(JsonbType::Null));
1003 assert_eq!(JsonbType::from_u8(1), Some(JsonbType::Boolean));
1004 assert_eq!(JsonbType::from_u8(2), Some(JsonbType::Int64));
1005 assert_eq!(JsonbType::from_u8(3), Some(JsonbType::Float64));
1006 assert_eq!(JsonbType::from_u8(4), Some(JsonbType::String));
1007 assert_eq!(JsonbType::from_u8(5), Some(JsonbType::Array));
1008 assert_eq!(JsonbType::from_u8(6), Some(JsonbType::Object));
1009 assert_eq!(JsonbType::from_u8(7), None); }
1011
1012 #[tokio::test]
1013 async fn test_json_extract_udf() -> Result<()> {
1014 let json = r#"{"user": {"name": "Alice", "age": 30}}"#;
1015 let jsonb_bytes = create_test_jsonb(json);
1016
1017 let mut binary_builder = LargeBinaryBuilder::new();
1018 binary_builder.append_value(&jsonb_bytes);
1019 binary_builder.append_value(&jsonb_bytes);
1020 binary_builder.append_null();
1021
1022 let jsonb_array = Arc::new(binary_builder.finish());
1023 let path_array = Arc::new(StringArray::from(vec![
1024 Some("$.user.name"),
1025 Some("$.user.age"),
1026 Some("$.user.name"),
1027 ]));
1028
1029 let result = json_extract_impl(&[jsonb_array, path_array])?;
1030 let string_array = result.as_any().downcast_ref::<StringArray>().unwrap();
1031
1032 assert_eq!(string_array.len(), 3);
1033 assert_eq!(string_array.value(0), "\"Alice\"");
1034 assert_eq!(string_array.value(1), "30");
1035 assert!(string_array.is_null(2));
1036
1037 Ok(())
1038 }
1039
1040 #[tokio::test]
1041 async fn test_json_exists_udf() -> Result<()> {
1042 let json = r#"{"user": {"name": "Alice", "age": 30}, "tags": ["rust", "json"]}"#;
1043 let jsonb_bytes = create_test_jsonb(json);
1044
1045 let mut binary_builder = LargeBinaryBuilder::new();
1046 binary_builder.append_value(&jsonb_bytes);
1047 binary_builder.append_value(&jsonb_bytes);
1048 binary_builder.append_value(&jsonb_bytes);
1049 binary_builder.append_null();
1050
1051 let jsonb_array = Arc::new(binary_builder.finish());
1052 let path_array = Arc::new(StringArray::from(vec![
1053 Some("$.user.name"),
1054 Some("$.user.email"),
1055 Some("$.tags"),
1056 Some("$.any"),
1057 ]));
1058
1059 let result = json_exists_impl(&[jsonb_array, path_array])?;
1060 let bool_array = result.as_any().downcast_ref::<BooleanArray>().unwrap();
1061
1062 assert_eq!(bool_array.len(), 4);
1063 assert!(bool_array.value(0));
1064 assert!(!bool_array.value(1));
1065 assert!(bool_array.value(2));
1066 assert!(bool_array.is_null(3));
1067
1068 Ok(())
1069 }
1070
1071 #[tokio::test]
1072 async fn test_json_get_string_udf() -> Result<()> {
1073 let json = r#"{"str": "hello", "num": 123, "bool": true, "null": null}"#;
1075 let jsonb_bytes = create_test_jsonb(json);
1076
1077 let mut binary_builder = LargeBinaryBuilder::new();
1078 binary_builder.append_value(&jsonb_bytes);
1079 binary_builder.append_value(&jsonb_bytes);
1080 binary_builder.append_value(&jsonb_bytes);
1081 binary_builder.append_value(&jsonb_bytes);
1082
1083 let jsonb_array = Arc::new(binary_builder.finish());
1084 let key_array = Arc::new(StringArray::from(vec![
1085 Some("str"),
1086 Some("num"),
1087 Some("bool"),
1088 Some("null"),
1089 ]));
1090
1091 let result = json_get_string_impl(&[jsonb_array, key_array])?;
1092 let string_array = result.as_any().downcast_ref::<StringArray>().unwrap();
1093
1094 assert_eq!(string_array.len(), 4);
1095 assert_eq!(string_array.value(0), "hello");
1096 assert_eq!(string_array.value(1), "123");
1097 assert_eq!(string_array.value(2), "true");
1098 assert!(string_array.is_null(3));
1099
1100 Ok(())
1101 }
1102
1103 #[tokio::test]
1104 async fn test_json_get_int_udf() -> Result<()> {
1105 let json = r#"{"int": 42, "str_num": "99", "bool": true}"#;
1106 let jsonb_bytes = create_test_jsonb(json);
1107
1108 let mut binary_builder = LargeBinaryBuilder::new();
1109 binary_builder.append_value(&jsonb_bytes);
1110 binary_builder.append_value(&jsonb_bytes);
1111 binary_builder.append_value(&jsonb_bytes);
1112
1113 let jsonb_array = Arc::new(binary_builder.finish());
1114 let key_array = Arc::new(StringArray::from(vec![
1115 Some("int"),
1116 Some("str_num"),
1117 Some("bool"),
1118 ]));
1119
1120 let result = json_get_int_impl(&[jsonb_array, key_array])?;
1121 let int_array = result.as_any().downcast_ref::<Int64Array>().unwrap();
1122
1123 assert_eq!(int_array.len(), 3);
1124 assert_eq!(int_array.value(0), 42);
1125 assert_eq!(int_array.value(1), 99);
1126 assert_eq!(int_array.value(2), 1); Ok(())
1129 }
1130
1131 #[tokio::test]
1132 async fn test_json_get_float_udf() -> Result<()> {
1133 let json = r#"{
1134 "float_decimal": 1.5,
1135 "float_neg": -2.5,
1136 "float_int_value": 1.0,
1137 "float_exp": 1e2,
1138 "int_pos": 42,
1139 "int_neg": -7,
1140 "big_int": 9223372036854775808,
1141 "str_num": "3.5",
1142 "bool_true": true,
1143 "null_val": null
1144 }"#;
1145 let jsonb_bytes = create_test_jsonb(json);
1146
1147 let mut binary_builder = LargeBinaryBuilder::new();
1148 for _ in 0..11 {
1149 binary_builder.append_value(&jsonb_bytes);
1150 }
1151 let jsonb_array = Arc::new(binary_builder.finish());
1152 let key_array = Arc::new(StringArray::from(vec![
1153 Some("float_decimal"),
1154 Some("float_neg"),
1155 Some("float_int_value"),
1156 Some("float_exp"),
1157 Some("int_pos"),
1158 Some("int_neg"),
1159 Some("big_int"),
1160 Some("str_num"),
1161 Some("bool_true"),
1162 Some("null_val"),
1163 Some("missing"),
1164 ]));
1165
1166 let result = json_get_float_impl(&[jsonb_array, key_array])?;
1167 let float_array = result.as_any().downcast_ref::<Float64Array>().unwrap();
1168
1169 assert_eq!(float_array.len(), 11);
1170 assert_eq!(float_array.value(0), 1.5);
1171 assert_eq!(float_array.value(1), -2.5);
1172 assert_eq!(float_array.value(2), 1.0);
1173 assert_eq!(float_array.value(3), 100.0);
1174 assert_eq!(float_array.value(4), 42.0);
1175 assert_eq!(float_array.value(5), -7.0);
1176 assert_eq!(float_array.value(6), 9223372036854775808.0);
1178 assert_eq!(float_array.value(7), 3.5);
1179 assert_eq!(float_array.value(8), 1.0); assert!(float_array.is_null(9));
1181 assert!(float_array.is_null(10));
1182
1183 Ok(())
1184 }
1185
1186 #[tokio::test]
1187 async fn test_json_get_bool_udf() -> Result<()> {
1188 let json =
1189 r#"{"bool_true": true, "bool_false": false, "str_true": "true", "str_false": "false"}"#;
1190 let jsonb_bytes = create_test_jsonb(json);
1191
1192 let mut binary_builder = LargeBinaryBuilder::new();
1193 binary_builder.append_value(&jsonb_bytes);
1194 binary_builder.append_value(&jsonb_bytes);
1195 binary_builder.append_value(&jsonb_bytes);
1196 binary_builder.append_value(&jsonb_bytes);
1197
1198 let jsonb_array = Arc::new(binary_builder.finish());
1199 let key_array = Arc::new(StringArray::from(vec![
1200 Some("bool_true"),
1201 Some("bool_false"),
1202 Some("str_true"),
1203 Some("str_false"),
1204 ]));
1205
1206 let result = json_get_bool_impl(&[jsonb_array, key_array])?;
1207 let bool_array = result.as_any().downcast_ref::<BooleanArray>().unwrap();
1208
1209 assert_eq!(bool_array.len(), 4);
1210 assert!(bool_array.value(0));
1211 assert!(!bool_array.value(1));
1212 assert!(bool_array.value(2)); assert!(!bool_array.value(3)); Ok(())
1216 }
1217
1218 #[tokio::test]
1219 async fn test_json_array_contains_udf() -> Result<()> {
1220 let json = r#"{"tags": ["rust", "json", "database"], "nums": [1, 2, 3]}"#;
1221 let jsonb_bytes = create_test_jsonb(json);
1222
1223 let mut binary_builder = LargeBinaryBuilder::new();
1224 binary_builder.append_value(&jsonb_bytes);
1225 binary_builder.append_value(&jsonb_bytes);
1226 binary_builder.append_value(&jsonb_bytes);
1227 binary_builder.append_null();
1228
1229 let jsonb_array = Arc::new(binary_builder.finish());
1230 let path_array = Arc::new(StringArray::from(vec![
1231 Some("$.tags"),
1232 Some("$.tags"),
1233 Some("$.nums"),
1234 Some("$.tags"),
1235 ]));
1236 let value_array = Arc::new(StringArray::from(vec![
1237 Some("rust"),
1238 Some("python"),
1239 Some("2"),
1240 Some("any"),
1241 ]));
1242
1243 let result = json_array_contains_impl(&[jsonb_array, path_array, value_array])?;
1244 let bool_array = result.as_any().downcast_ref::<BooleanArray>().unwrap();
1245
1246 assert_eq!(bool_array.len(), 4);
1247 assert!(bool_array.value(0));
1248 assert!(!bool_array.value(1));
1249 assert!(bool_array.value(2));
1250 assert!(bool_array.is_null(3));
1251
1252 Ok(())
1253 }
1254
1255 #[tokio::test]
1256 async fn test_json_array_length_udf() -> Result<()> {
1257 let json = r#"{"empty": [], "tags": ["a", "b", "c"], "nested": {"arr": [1, 2]}}"#;
1258 let jsonb_bytes = create_test_jsonb(json);
1259
1260 let mut binary_builder = LargeBinaryBuilder::new();
1261 binary_builder.append_value(&jsonb_bytes);
1262 binary_builder.append_value(&jsonb_bytes);
1263 binary_builder.append_value(&jsonb_bytes);
1264 binary_builder.append_null();
1265
1266 let jsonb_array = Arc::new(binary_builder.finish());
1267 let path_array = Arc::new(StringArray::from(vec![
1268 Some("$.empty"),
1269 Some("$.tags"),
1270 Some("$.nested.arr"),
1271 Some("$.any"),
1272 ]));
1273
1274 let result = json_array_length_impl(&[jsonb_array, path_array])?;
1275 let int_array = result.as_any().downcast_ref::<Int64Array>().unwrap();
1276
1277 assert_eq!(int_array.len(), 4);
1278 assert_eq!(int_array.value(0), 0);
1279 assert_eq!(int_array.value(1), 3);
1280 assert_eq!(int_array.value(2), 2);
1281 assert!(int_array.is_null(3));
1282
1283 Ok(())
1284 }
1285
1286 #[tokio::test]
1287 async fn test_json_extract_with_type() -> Result<()> {
1288 use arrow_array::StructArray;
1289 use arrow_array::UInt8Array;
1290
1291 let cases: &[(&str, JsonbType)] = &[
1292 (r#"{"v": 1}"#, JsonbType::Int64),
1293 (r#"{"v": 0}"#, JsonbType::Int64),
1294 (r#"{"v": -42}"#, JsonbType::Int64),
1295 (r#"{"v": 9223372036854775807}"#, JsonbType::Int64), (r#"{"v": 9223372036854775808}"#, JsonbType::Float64), (r#"{"v": 1.0}"#, JsonbType::Float64),
1298 (r#"{"v": 2.7}"#, JsonbType::Float64),
1299 (r#"{"v": 1.5}"#, JsonbType::Float64),
1300 (r#"{"v": -1.5}"#, JsonbType::Float64),
1301 (r#"{"v": 1e2}"#, JsonbType::Float64),
1302 ];
1303
1304 for (json, expected) in cases {
1305 let bytes = create_test_jsonb(json);
1306 let mut binary_builder = LargeBinaryBuilder::new();
1307 binary_builder.append_value(&bytes);
1308 let jsonb_array: ArrayRef = Arc::new(binary_builder.finish());
1309 let path_array: ArrayRef = Arc::new(StringArray::from(vec![Some("$.v")]));
1310
1311 let result = json_extract_with_type_impl(&[jsonb_array, path_array])?;
1312 let struct_array = result.as_any().downcast_ref::<StructArray>().unwrap();
1313 let type_tags = struct_array
1314 .column_by_name("type_tag")
1315 .unwrap()
1316 .as_any()
1317 .downcast_ref::<UInt8Array>()
1318 .unwrap();
1319 assert_eq!(type_tags.value(0), expected.as_u8());
1320 }
1321
1322 Ok(())
1323 }
1324
1325 #[tokio::test]
1326 async fn test_json_array_access() -> Result<()> {
1327 let json = r#"["first", "second", "third"]"#;
1328 let jsonb_bytes = create_test_jsonb(json);
1329
1330 let mut binary_builder = LargeBinaryBuilder::new();
1331 binary_builder.append_value(&jsonb_bytes);
1332 binary_builder.append_value(&jsonb_bytes);
1333 binary_builder.append_value(&jsonb_bytes);
1334
1335 let jsonb_array = Arc::new(binary_builder.finish());
1336 let key_array = Arc::new(StringArray::from(vec![
1337 Some("0"),
1338 Some("1"),
1339 Some("10"), ]));
1341
1342 let result = json_get_string_impl(&[jsonb_array, key_array])?;
1343 let string_array = result.as_any().downcast_ref::<StringArray>().unwrap();
1344
1345 assert_eq!(string_array.len(), 3);
1346 assert_eq!(string_array.value(0), "first");
1347 assert_eq!(string_array.value(1), "second");
1348 assert!(string_array.is_null(2));
1349
1350 Ok(())
1351 }
1352}