1use arrow_array::builder::{
5 BooleanBuilder, Float64Builder, Int64Builder, LargeBinaryBuilder, StringBuilder,
6};
7use arrow_array::{Array, ArrayRef, LargeBinaryArray, StringArray};
8use arrow_schema::DataType;
9use datafusion::error::{DataFusionError, Result};
10use datafusion::logical_expr::{ScalarUDF, Volatility};
11use datafusion::physical_plan::ColumnarValue;
12use datafusion::prelude::create_udf;
13use std::sync::Arc;
14
/// Type tag reported in the `type_tag` field of `json_extract_with_type` results.
///
/// The discriminants are the `u8` values written into that column, so existing
/// variants must keep their numbers.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JsonbType {
    /// JSON `null`; also reported when there is no value (NULL input, NULL path, no match).
    Null = 0,
    /// JSON `true` / `false`.
    Boolean = 1,
    /// A JSON number that is not stored as a float and fits in `i64`.
    Int64 = 2,
    /// Any other JSON number (e.g. stored as a float, or an integer outside `i64`).
    Float64 = 3,
    /// A JSON string.
    String = 4,
    /// A JSON array.
    Array = 5,
    /// A JSON object.
    Object = 6,
}

impl JsonbType {
    /// Decodes a type tag, returning `None` for any value outside `0..=6`.
    pub fn from_u8(value: u8) -> Option<Self> {
        match value {
            0 => Some(Self::Null),
            1 => Some(Self::Boolean),
            2 => Some(Self::Int64),
            3 => Some(Self::Float64),
            4 => Some(Self::String),
            5 => Some(Self::Array),
            6 => Some(Self::Object),
            _ => None,
        }
    }

    /// Returns the `u8` tag for this type (its `#[repr(u8)]` discriminant).
    pub const fn as_u8(self) -> u8 {
        self as u8
    }
}

/// Standard fallible conversion from a raw tag; the error carries the rejected byte.
impl std::convert::TryFrom<u8> for JsonbType {
    type Error = u8;

    fn try_from(value: u8) -> std::result::Result<Self, Self::Error> {
        Self::from_u8(value).ok_or(value)
    }
}

/// Standard infallible conversion to the raw tag.
impl From<JsonbType> for u8 {
    fn from(value: JsonbType) -> Self {
        value.as_u8()
    }
}
48
49mod common {
51 use super::*;
52
53 pub fn columnar_to_arrays(args: &[ColumnarValue]) -> Vec<ArrayRef> {
59 args.iter()
60 .map(|arg| match arg {
61 ColumnarValue::Array(arr) => arr.clone(),
62 ColumnarValue::Scalar(scalar) => scalar.to_array().unwrap(),
63 })
64 .collect()
65 }
66
67 pub fn execution_error(msg: impl Into<String>) -> DataFusionError {
69 DataFusionError::Execution(msg.into())
70 }
71
72 pub fn validate_arg_count(
74 args: &[ArrayRef],
75 expected: usize,
76 function_name: &str,
77 ) -> Result<()> {
78 if args.len() != expected {
79 return Err(execution_error(format!(
80 "{} requires exactly {} arguments",
81 function_name, expected
82 )));
83 }
84 Ok(())
85 }
86
87 pub fn extract_jsonb_array(args: &[ArrayRef]) -> Result<&LargeBinaryArray> {
89 args[0]
90 .as_any()
91 .downcast_ref::<LargeBinaryArray>()
92 .ok_or_else(|| execution_error("First argument must be LargeBinary"))
93 }
94
95 pub fn extract_string_array(args: &[ArrayRef], arg_index: usize) -> Result<&StringArray> {
97 args[arg_index]
98 .as_any()
99 .downcast_ref::<StringArray>()
100 .ok_or_else(|| execution_error(format!("Argument {} must be String", arg_index + 1)))
101 }
102
103 pub fn get_string_value_at(string_array: &StringArray, index: usize) -> Option<&str> {
107 let actual_index = if string_array.len() == 1 { 0 } else { index };
109
110 if string_array.is_null(actual_index) {
111 None
112 } else {
113 Some(string_array.value(actual_index))
114 }
115 }
116
117 pub fn get_json_value_by_key(
119 raw_jsonb: &jsonb::RawJsonb,
120 key: &str,
121 ) -> Result<Option<jsonb::OwnedJsonb>> {
122 if raw_jsonb.is_object().unwrap_or(false) {
123 raw_jsonb
124 .get_by_name(key, false)
125 .map_err(|e| execution_error(format!("Failed to get field '{}': {}", key, e)))
126 } else if raw_jsonb.is_array().unwrap_or(false) {
127 match key.parse::<usize>() {
128 Ok(index) => raw_jsonb.get_by_index(index).map_err(|e| {
129 execution_error(format!("Failed to get array element [{}]: {}", index, e))
130 }),
131 Err(_) => Ok(None),
132 }
133 } else {
134 Ok(None)
135 }
136 }
137
138 pub fn parse_json_path(path: &str) -> Result<jsonb::jsonpath::JsonPath<'_>> {
140 jsonb::jsonpath::parse_json_path(path.as_bytes())
141 .map_err(|e| execution_error(format!("Invalid JSONPath '{}': {}", path, e)))
142 }
143}
144
145fn json_value_to_string(value: jsonb::OwnedJsonb) -> Result<Option<String>> {
147 let raw_jsonb = value.as_raw();
148
149 if raw_jsonb
151 .is_null()
152 .map_err(|e| common::execution_error(format!("Failed to check null: {}", e)))?
153 {
154 return Ok(None);
155 }
156
157 raw_jsonb
159 .to_str()
160 .map(Some)
161 .map_err(|e| common::execution_error(format!("Failed to convert to string: {}", e)))
162}
163
164fn json_value_to_int(value: jsonb::OwnedJsonb) -> Result<Option<i64>> {
166 let raw_jsonb = value.as_raw();
167
168 if raw_jsonb
170 .is_null()
171 .map_err(|e| common::execution_error(format!("Failed to check null: {}", e)))?
172 {
173 return Ok(None);
174 }
175
176 raw_jsonb
178 .to_i64()
179 .map(Some)
180 .map_err(|e| common::execution_error(format!("Failed to convert to integer: {}", e)))
181}
182
183fn json_value_to_float(value: jsonb::OwnedJsonb) -> Result<Option<f64>> {
185 let raw_jsonb = value.as_raw();
186
187 if raw_jsonb
189 .is_null()
190 .map_err(|e| common::execution_error(format!("Failed to check null: {}", e)))?
191 {
192 return Ok(None);
193 }
194
195 raw_jsonb
197 .to_f64()
198 .map(Some)
199 .map_err(|e| common::execution_error(format!("Failed to convert to float: {}", e)))
200}
201
202fn json_value_to_bool(value: jsonb::OwnedJsonb) -> Result<Option<bool>> {
204 let raw_jsonb = value.as_raw();
205
206 if raw_jsonb
208 .is_null()
209 .map_err(|e| common::execution_error(format!("Failed to check null: {}", e)))?
210 {
211 return Ok(None);
212 }
213
214 raw_jsonb
216 .to_bool()
217 .map(Some)
218 .map_err(|e| common::execution_error(format!("Failed to convert to boolean: {}", e)))
219}
220
221pub fn json_extract_udf() -> ScalarUDF {
230 create_udf(
231 "json_extract",
232 vec![DataType::LargeBinary, DataType::Utf8],
233 DataType::Utf8,
234 Volatility::Immutable,
235 Arc::new(json_extract_columnar_impl),
236 )
237}
238
239pub fn json_extract_with_type_udf() -> ScalarUDF {
250 use arrow_schema::Fields;
251
252 let return_type = DataType::Struct(Fields::from(vec![
253 arrow_schema::Field::new("value", DataType::LargeBinary, true),
254 arrow_schema::Field::new("type_tag", DataType::UInt8, false),
255 ]));
256
257 create_udf(
258 "json_extract_with_type",
259 vec![DataType::LargeBinary, DataType::Utf8],
260 return_type,
261 Volatility::Immutable,
262 Arc::new(json_extract_with_type_columnar_impl),
263 )
264}
265
266fn json_extract_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
268 let arrays = common::columnar_to_arrays(args);
269 let result = json_extract_impl(&arrays)?;
270 Ok(ColumnarValue::Array(result))
271}
272
273fn json_extract_with_type_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
275 let arrays = common::columnar_to_arrays(args);
276 let result = json_extract_with_type_impl(&arrays)?;
277 Ok(ColumnarValue::Array(result))
278}
279
280fn json_extract_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
282 common::validate_arg_count(args, 2, "json_extract")?;
283
284 let jsonb_array = common::extract_jsonb_array(args)?;
285 let path_array = common::extract_string_array(args, 1)?;
286 let mut builder = StringBuilder::with_capacity(jsonb_array.len(), 1024);
287
288 for i in 0..jsonb_array.len() {
289 if jsonb_array.is_null(i) {
290 builder.append_null();
291 } else if let Some(path) = common::get_string_value_at(path_array, i) {
292 let jsonb_bytes = jsonb_array.value(i);
293 match extract_json_path(jsonb_bytes, path)? {
294 Some(value) => builder.append_value(&value),
295 None => builder.append_null(),
296 }
297 } else {
298 builder.append_null();
299 }
300 }
301
302 Ok(Arc::new(builder.finish()))
303}
304
305fn json_extract_with_type_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
307 use arrow_array::StructArray;
308 use arrow_array::builder::{LargeBinaryBuilder, UInt8Builder};
309
310 common::validate_arg_count(args, 2, "json_extract_with_type")?;
311
312 let jsonb_array = common::extract_jsonb_array(args)?;
313 let path_array = common::extract_string_array(args, 1)?;
314
315 let mut value_builder = LargeBinaryBuilder::with_capacity(jsonb_array.len(), 1024);
316 let mut type_builder = UInt8Builder::with_capacity(jsonb_array.len());
317
318 for i in 0..jsonb_array.len() {
319 if jsonb_array.is_null(i) {
320 value_builder.append_null();
321 type_builder.append_value(JsonbType::Null.as_u8());
322 } else if let Some(path) = common::get_string_value_at(path_array, i) {
323 let jsonb_bytes = jsonb_array.value(i);
324 match extract_json_path_with_type(jsonb_bytes, path)? {
325 Some((value_bytes, type_tag)) => {
326 value_builder.append_value(&value_bytes);
327 type_builder.append_value(type_tag);
328 }
329 None => {
330 value_builder.append_null();
331 type_builder.append_value(JsonbType::Null.as_u8());
332 }
333 }
334 } else {
335 value_builder.append_null();
336 type_builder.append_value(JsonbType::Null.as_u8());
337 }
338 }
339
340 let value_array = Arc::new(value_builder.finish()) as ArrayRef;
342 let type_array = Arc::new(type_builder.finish()) as ArrayRef;
343
344 let struct_array = StructArray::from(vec![
345 (
346 Arc::new(arrow_schema::Field::new(
347 "value",
348 DataType::LargeBinary,
349 true,
350 )),
351 value_array,
352 ),
353 (
354 Arc::new(arrow_schema::Field::new("type_tag", DataType::UInt8, false)),
355 type_array,
356 ),
357 ]);
358
359 Ok(Arc::new(struct_array))
360}
361
362fn extract_json_path_with_type(jsonb_bytes: &[u8], path: &str) -> Result<Option<(Vec<u8>, u8)>> {
365 let json_path = common::parse_json_path(path)?;
366
367 let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
368 let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb);
369 match selector.select_value(&json_path) {
370 Ok(Some(owned_value)) => {
371 let raw = owned_value.as_raw();
372
373 let jsonb_type = if raw.is_null().unwrap_or(false) {
375 JsonbType::Null
376 } else if raw.is_boolean().unwrap_or(false) {
377 JsonbType::Boolean
378 } else if raw.is_number().unwrap_or(false) {
379 let is_float_storage =
380 matches!(raw.as_number(), Ok(Some(jsonb::Number::Float64(_))));
381 if !is_float_storage && raw.is_i64().unwrap_or(false) {
382 JsonbType::Int64
383 } else {
384 JsonbType::Float64
385 }
386 } else if raw.is_string().unwrap_or(false) {
387 JsonbType::String
388 } else if raw.is_array().unwrap_or(false) {
389 JsonbType::Array
390 } else if raw.is_object().unwrap_or(false) {
391 JsonbType::Object
392 } else {
393 JsonbType::String };
395
396 Ok(Some((owned_value.to_vec(), jsonb_type.as_u8())))
398 }
399 Ok(None) => Ok(None),
400 Err(e) => Err(common::execution_error(format!(
401 "Failed to select value from path '{}': {}",
402 path, e
403 ))),
404 }
405}
406
407fn extract_json_path(jsonb_bytes: &[u8], path: &str) -> Result<Option<String>> {
412 let json_path = common::parse_json_path(path)?;
413
414 let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
415 let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb);
416 match selector.select_value(&json_path) {
417 Ok(value) => Ok(value.map(|value| value.to_string())),
418 Err(e) => Err(common::execution_error(format!(
419 "Failed to select value from path '{}': {}",
420 path, e
421 ))),
422 }
423}
424
425pub fn json_exists_udf() -> ScalarUDF {
434 create_udf(
435 "json_exists",
436 vec![DataType::LargeBinary, DataType::Utf8],
437 DataType::Boolean,
438 Volatility::Immutable,
439 Arc::new(json_exists_columnar_impl),
440 )
441}
442
443fn json_exists_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
445 let arrays = common::columnar_to_arrays(args);
446 let result = json_exists_impl(&arrays)?;
447 Ok(ColumnarValue::Array(result))
448}
449
450fn json_exists_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
452 common::validate_arg_count(args, 2, "json_exists")?;
453
454 let jsonb_array = common::extract_jsonb_array(args)?;
455 let path_array = common::extract_string_array(args, 1)?;
456
457 let mut builder = BooleanBuilder::with_capacity(jsonb_array.len());
458
459 for i in 0..jsonb_array.len() {
460 if jsonb_array.is_null(i) {
461 builder.append_null();
462 } else if let Some(path) = common::get_string_value_at(path_array, i) {
463 let jsonb_bytes = jsonb_array.value(i);
464 let exists = check_json_path_exists(jsonb_bytes, path)?;
465 builder.append_value(exists);
466 } else {
467 builder.append_null();
468 }
469 }
470
471 Ok(Arc::new(builder.finish()))
472}
473
474fn check_json_path_exists(jsonb_bytes: &[u8], path: &str) -> Result<bool> {
476 let json_path = common::parse_json_path(path)?;
477
478 let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
479 let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb);
480 match selector.exists(&json_path) {
481 Ok(exists) => Ok(exists),
482 Err(e) => Err(common::execution_error(format!(
483 "Failed to check existence of path '{}': {}",
484 path, e
485 ))),
486 }
487}
488
489pub fn json_get_udf() -> ScalarUDF {
498 create_udf(
499 "json_get",
500 vec![DataType::LargeBinary, DataType::Utf8],
501 DataType::LargeBinary,
502 Volatility::Immutable,
503 Arc::new(json_get_columnar_impl),
504 )
505}
506
507fn json_get_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
509 let arrays = common::columnar_to_arrays(args);
510 let result = json_get_impl(&arrays)?;
511 Ok(ColumnarValue::Array(result))
512}
513
514fn json_get_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
516 common::validate_arg_count(args, 2, "json_get")?;
517
518 let jsonb_array = common::extract_jsonb_array(args)?;
519 let key_array = common::extract_string_array(args, 1)?;
520
521 let mut builder = LargeBinaryBuilder::with_capacity(jsonb_array.len(), 0);
522
523 for i in 0..jsonb_array.len() {
524 if jsonb_array.is_null(i) {
525 builder.append_null();
526 } else if let Some(key) = common::get_string_value_at(key_array, i) {
527 let jsonb_bytes = jsonb_array.value(i);
528 let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
529
530 match common::get_json_value_by_key(&raw_jsonb, key)? {
531 Some(value) => builder.append_value(value.as_raw().as_ref()),
532 None => builder.append_null(),
533 }
534 } else {
535 builder.append_null();
536 }
537 }
538
539 Ok(Arc::new(builder.finish()))
540}
541
542pub fn json_get_string_udf() -> ScalarUDF {
551 create_udf(
552 "json_get_string",
553 vec![DataType::LargeBinary, DataType::Utf8],
554 DataType::Utf8,
555 Volatility::Immutable,
556 Arc::new(json_get_string_columnar_impl),
557 )
558}
559
560fn json_get_string_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
562 let arrays = common::columnar_to_arrays(args);
563 let result = json_get_string_impl(&arrays)?;
564 Ok(ColumnarValue::Array(result))
565}
566
567fn json_get_string_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
569 common::validate_arg_count(args, 2, "json_get_string")?;
570
571 let jsonb_array = common::extract_jsonb_array(args)?;
572 let key_array = common::extract_string_array(args, 1)?;
573
574 let mut builder = StringBuilder::with_capacity(jsonb_array.len(), 1024);
575
576 for i in 0..jsonb_array.len() {
577 if jsonb_array.is_null(i) {
578 builder.append_null();
579 } else if let Some(key) = common::get_string_value_at(key_array, i) {
580 let jsonb_bytes = jsonb_array.value(i);
581 let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
582
583 match common::get_json_value_by_key(&raw_jsonb, key)? {
584 Some(value) => match json_value_to_string(value)? {
585 Some(string_val) => builder.append_value(&string_val),
586 None => builder.append_null(),
587 },
588 None => builder.append_null(),
589 }
590 } else {
591 builder.append_null();
592 }
593 }
594
595 Ok(Arc::new(builder.finish()))
596}
597
598pub fn json_get_int_udf() -> ScalarUDF {
607 create_udf(
608 "json_get_int",
609 vec![DataType::LargeBinary, DataType::Utf8],
610 DataType::Int64,
611 Volatility::Immutable,
612 Arc::new(json_get_int_columnar_impl),
613 )
614}
615
616fn json_get_int_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
618 let arrays = common::columnar_to_arrays(args);
619 let result = json_get_int_impl(&arrays)?;
620 Ok(ColumnarValue::Array(result))
621}
622
623fn json_get_int_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
625 common::validate_arg_count(args, 2, "json_get_int")?;
626
627 let jsonb_array = common::extract_jsonb_array(args)?;
628 let key_array = common::extract_string_array(args, 1)?;
629
630 let mut builder = Int64Builder::with_capacity(jsonb_array.len());
631
632 for i in 0..jsonb_array.len() {
633 if jsonb_array.is_null(i) {
634 builder.append_null();
635 } else if let Some(key) = common::get_string_value_at(key_array, i) {
636 let jsonb_bytes = jsonb_array.value(i);
637 let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
638
639 match common::get_json_value_by_key(&raw_jsonb, key)? {
640 Some(value) => match json_value_to_int(value)? {
641 Some(int_val) => builder.append_value(int_val),
642 None => builder.append_null(),
643 },
644 None => builder.append_null(),
645 }
646 } else {
647 builder.append_null();
648 }
649 }
650
651 Ok(Arc::new(builder.finish()))
652}
653
654pub fn json_get_float_udf() -> ScalarUDF {
663 create_udf(
664 "json_get_float",
665 vec![DataType::LargeBinary, DataType::Utf8],
666 DataType::Float64,
667 Volatility::Immutable,
668 Arc::new(json_get_float_columnar_impl),
669 )
670}
671
672fn json_get_float_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
674 let arrays = common::columnar_to_arrays(args);
675 let result = json_get_float_impl(&arrays)?;
676 Ok(ColumnarValue::Array(result))
677}
678
679fn json_get_float_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
681 common::validate_arg_count(args, 2, "json_get_float")?;
682
683 let jsonb_array = common::extract_jsonb_array(args)?;
684 let key_array = common::extract_string_array(args, 1)?;
685
686 let mut builder = Float64Builder::with_capacity(jsonb_array.len());
687
688 for i in 0..jsonb_array.len() {
689 if jsonb_array.is_null(i) {
690 builder.append_null();
691 } else if let Some(key) = common::get_string_value_at(key_array, i) {
692 let jsonb_bytes = jsonb_array.value(i);
693 let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
694
695 match common::get_json_value_by_key(&raw_jsonb, key)? {
696 Some(value) => match json_value_to_float(value)? {
697 Some(float_val) => builder.append_value(float_val),
698 None => builder.append_null(),
699 },
700 None => builder.append_null(),
701 }
702 } else {
703 builder.append_null();
704 }
705 }
706
707 Ok(Arc::new(builder.finish()))
708}
709
710pub fn json_get_bool_udf() -> ScalarUDF {
719 create_udf(
720 "json_get_bool",
721 vec![DataType::LargeBinary, DataType::Utf8],
722 DataType::Boolean,
723 Volatility::Immutable,
724 Arc::new(json_get_bool_columnar_impl),
725 )
726}
727
728fn json_get_bool_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
730 let arrays = common::columnar_to_arrays(args);
731 let result = json_get_bool_impl(&arrays)?;
732 Ok(ColumnarValue::Array(result))
733}
734
735fn json_get_bool_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
737 common::validate_arg_count(args, 2, "json_get_bool")?;
738
739 let jsonb_array = common::extract_jsonb_array(args)?;
740 let key_array = common::extract_string_array(args, 1)?;
741
742 let mut builder = BooleanBuilder::with_capacity(jsonb_array.len());
743
744 for i in 0..jsonb_array.len() {
745 if jsonb_array.is_null(i) {
746 builder.append_null();
747 } else if let Some(key) = common::get_string_value_at(key_array, i) {
748 let jsonb_bytes = jsonb_array.value(i);
749 let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
750
751 match common::get_json_value_by_key(&raw_jsonb, key)? {
752 Some(value) => match json_value_to_bool(value)? {
753 Some(bool_val) => builder.append_value(bool_val),
754 None => builder.append_null(),
755 },
756 None => builder.append_null(),
757 }
758 } else {
759 builder.append_null();
760 }
761 }
762
763 Ok(Arc::new(builder.finish()))
764}
765
766pub fn json_array_contains_udf() -> ScalarUDF {
776 create_udf(
777 "json_array_contains",
778 vec![DataType::LargeBinary, DataType::Utf8, DataType::Utf8],
779 DataType::Boolean,
780 Volatility::Immutable,
781 Arc::new(json_array_contains_columnar_impl),
782 )
783}
784
785fn json_array_contains_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
787 let arrays = common::columnar_to_arrays(args);
788 let result = json_array_contains_impl(&arrays)?;
789 Ok(ColumnarValue::Array(result))
790}
791
792fn json_array_contains_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
794 common::validate_arg_count(args, 3, "json_array_contains")?;
795
796 let jsonb_array = common::extract_jsonb_array(args)?;
797 let path_array = common::extract_string_array(args, 1)?;
798 let value_array = common::extract_string_array(args, 2)?;
799
800 let mut builder = BooleanBuilder::with_capacity(jsonb_array.len());
801
802 for i in 0..jsonb_array.len() {
803 if jsonb_array.is_null(i) {
804 builder.append_null();
805 } else {
806 let path = common::get_string_value_at(path_array, i);
807 let value = common::get_string_value_at(value_array, i);
808
809 match (path, value) {
810 (Some(p), Some(v)) => {
811 let jsonb_bytes = jsonb_array.value(i);
812 let contains = check_array_contains(jsonb_bytes, p, v)?;
813 builder.append_value(contains);
814 }
815 _ => builder.append_null(),
816 }
817 }
818 }
819
820 Ok(Arc::new(builder.finish()))
821}
822
823fn check_array_contains(jsonb_bytes: &[u8], path: &str, value: &str) -> Result<bool> {
825 let json_path = common::parse_json_path(path)?;
826
827 let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
828 let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb);
829 match selector.select_values(&json_path) {
830 Ok(values) => {
831 for v in values {
832 let raw = v.as_raw();
834 let mut index = 0;
836 loop {
837 match raw.get_by_index(index) {
838 Ok(Some(elem)) => {
839 let elem_str = elem.to_string();
840 if elem_str == value || elem_str == format!("\"{}\"", value) {
842 return Ok(true);
843 }
844 index += 1;
845 }
846 Ok(None) => break, Err(_) => break, }
849 }
850 }
851 Ok(false)
852 }
853 Err(e) => Err(common::execution_error(format!(
854 "Failed to check array contains at path '{}': {}",
855 path, e
856 ))),
857 }
858}
859
860pub fn json_array_length_udf() -> ScalarUDF {
869 create_udf(
870 "json_array_length",
871 vec![DataType::LargeBinary, DataType::Utf8],
872 DataType::Int64,
873 Volatility::Immutable,
874 Arc::new(json_array_length_columnar_impl),
875 )
876}
877
878fn json_array_length_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
880 let arrays = common::columnar_to_arrays(args);
881 let result = json_array_length_impl(&arrays)?;
882 Ok(ColumnarValue::Array(result))
883}
884
885fn json_array_length_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
887 common::validate_arg_count(args, 2, "json_array_length")?;
888
889 let jsonb_array = common::extract_jsonb_array(args)?;
890 let path_array = common::extract_string_array(args, 1)?;
891
892 let mut builder = Int64Builder::with_capacity(jsonb_array.len());
893
894 for i in 0..jsonb_array.len() {
895 if jsonb_array.is_null(i) {
896 builder.append_null();
897 } else if let Some(path) = common::get_string_value_at(path_array, i) {
898 let jsonb_bytes = jsonb_array.value(i);
899 match get_array_length(jsonb_bytes, path)? {
900 Some(len) => builder.append_value(len),
901 None => builder.append_null(),
902 }
903 } else {
904 builder.append_null();
905 }
906 }
907
908 Ok(Arc::new(builder.finish()))
909}
910
911fn get_array_length(jsonb_bytes: &[u8], path: &str) -> Result<Option<i64>> {
913 let json_path = common::parse_json_path(path)?;
914
915 let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
916 let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb);
917 match selector.select_values(&json_path) {
918 Ok(values) => {
919 if values.is_empty() {
920 return Ok(None);
921 }
922 let first = &values[0];
923 let raw = first.as_raw();
924
925 let mut count = 0;
927 loop {
928 match raw.get_by_index(count) {
929 Ok(Some(_)) => count += 1,
930 Ok(None) => break, Err(_) => {
932 if count == 0 {
934 return Err(common::execution_error(format!(
935 "Path '{}' does not point to an array",
936 path
937 )));
938 }
939 break;
940 }
941 }
942 }
943 Ok(Some(count as i64))
944 }
945 Err(e) => Err(common::execution_error(format!(
946 "Failed to get array length at path '{}': {}",
947 path, e
948 ))),
949 }
950}
951
952#[cfg(test)]
953mod tests {
954 use super::*;
955 use arrow_array::builder::LargeBinaryBuilder;
956 use arrow_array::{BooleanArray, Float64Array, Int64Array};
957
958 fn create_test_jsonb(json_str: &str) -> Vec<u8> {
959 jsonb::parse_value(json_str.as_bytes()).unwrap().to_vec()
960 }
961
962 #[test]
963 fn test_jsonb_type_enum() {
964 assert_eq!(JsonbType::Null.as_u8(), 0);
966 assert_eq!(JsonbType::Boolean.as_u8(), 1);
967 assert_eq!(JsonbType::Int64.as_u8(), 2);
968 assert_eq!(JsonbType::Float64.as_u8(), 3);
969 assert_eq!(JsonbType::String.as_u8(), 4);
970 assert_eq!(JsonbType::Array.as_u8(), 5);
971 assert_eq!(JsonbType::Object.as_u8(), 6);
972
973 assert_eq!(JsonbType::from_u8(0), Some(JsonbType::Null));
975 assert_eq!(JsonbType::from_u8(1), Some(JsonbType::Boolean));
976 assert_eq!(JsonbType::from_u8(2), Some(JsonbType::Int64));
977 assert_eq!(JsonbType::from_u8(3), Some(JsonbType::Float64));
978 assert_eq!(JsonbType::from_u8(4), Some(JsonbType::String));
979 assert_eq!(JsonbType::from_u8(5), Some(JsonbType::Array));
980 assert_eq!(JsonbType::from_u8(6), Some(JsonbType::Object));
981 assert_eq!(JsonbType::from_u8(7), None); }
983
984 #[tokio::test]
985 async fn test_json_extract_udf() -> Result<()> {
986 let json = r#"{"user": {"name": "Alice", "age": 30}, "tags": ["python", "ml"]}"#;
987 let jsonb_bytes = create_test_jsonb(json);
988
989 let mut binary_builder = LargeBinaryBuilder::new();
990 binary_builder.append_value(&jsonb_bytes);
991 binary_builder.append_value(&jsonb_bytes);
992 binary_builder.append_value(&jsonb_bytes);
993 binary_builder.append_null();
994
995 let jsonb_array = Arc::new(binary_builder.finish());
996 let path_array = Arc::new(StringArray::from(vec![
997 Some("$.user.name"),
998 Some("$.user.age"),
999 Some("$.tags[*]"),
1000 Some("$.user.name"),
1001 ]));
1002
1003 let result = json_extract_impl(&[jsonb_array, path_array])?;
1004 let string_array = result.as_any().downcast_ref::<StringArray>().unwrap();
1005
1006 assert_eq!(string_array.len(), 4);
1007 assert_eq!(string_array.value(0), "\"Alice\"");
1008 assert_eq!(string_array.value(1), "30");
1009 assert_eq!(string_array.value(2), "[\"python\",\"ml\"]");
1010 assert!(string_array.is_null(3));
1011
1012 Ok(())
1013 }
1014
1015 #[tokio::test]
1016 async fn test_json_exists_udf() -> Result<()> {
1017 let json = r#"{"user": {"name": "Alice", "age": 30}, "tags": ["rust", "json"]}"#;
1018 let jsonb_bytes = create_test_jsonb(json);
1019
1020 let mut binary_builder = LargeBinaryBuilder::new();
1021 binary_builder.append_value(&jsonb_bytes);
1022 binary_builder.append_value(&jsonb_bytes);
1023 binary_builder.append_value(&jsonb_bytes);
1024 binary_builder.append_null();
1025
1026 let jsonb_array = Arc::new(binary_builder.finish());
1027 let path_array = Arc::new(StringArray::from(vec![
1028 Some("$.user.name"),
1029 Some("$.user.email"),
1030 Some("$.tags"),
1031 Some("$.any"),
1032 ]));
1033
1034 let result = json_exists_impl(&[jsonb_array, path_array])?;
1035 let bool_array = result.as_any().downcast_ref::<BooleanArray>().unwrap();
1036
1037 assert_eq!(bool_array.len(), 4);
1038 assert!(bool_array.value(0));
1039 assert!(!bool_array.value(1));
1040 assert!(bool_array.value(2));
1041 assert!(bool_array.is_null(3));
1042
1043 Ok(())
1044 }
1045
1046 #[tokio::test]
1047 async fn test_json_get_string_udf() -> Result<()> {
1048 let json = r#"{"str": "hello", "num": 123, "bool": true, "null": null}"#;
1050 let jsonb_bytes = create_test_jsonb(json);
1051
1052 let mut binary_builder = LargeBinaryBuilder::new();
1053 binary_builder.append_value(&jsonb_bytes);
1054 binary_builder.append_value(&jsonb_bytes);
1055 binary_builder.append_value(&jsonb_bytes);
1056 binary_builder.append_value(&jsonb_bytes);
1057
1058 let jsonb_array = Arc::new(binary_builder.finish());
1059 let key_array = Arc::new(StringArray::from(vec![
1060 Some("str"),
1061 Some("num"),
1062 Some("bool"),
1063 Some("null"),
1064 ]));
1065
1066 let result = json_get_string_impl(&[jsonb_array, key_array])?;
1067 let string_array = result.as_any().downcast_ref::<StringArray>().unwrap();
1068
1069 assert_eq!(string_array.len(), 4);
1070 assert_eq!(string_array.value(0), "hello");
1071 assert_eq!(string_array.value(1), "123");
1072 assert_eq!(string_array.value(2), "true");
1073 assert!(string_array.is_null(3));
1074
1075 Ok(())
1076 }
1077
1078 #[tokio::test]
1079 async fn test_json_get_int_udf() -> Result<()> {
1080 let json = r#"{"int": 42, "str_num": "99", "bool": true, "0": 7}"#;
1081 let jsonb_bytes = create_test_jsonb(json);
1082
1083 let mut binary_builder = LargeBinaryBuilder::new();
1084 for _ in 0..4 {
1085 binary_builder.append_value(&jsonb_bytes);
1086 }
1087
1088 let jsonb_array = Arc::new(binary_builder.finish());
1089 let key_array = Arc::new(StringArray::from(vec![
1090 Some("int"),
1091 Some("str_num"),
1092 Some("bool"),
1093 Some("0"),
1094 ]));
1095
1096 let result = json_get_int_impl(&[jsonb_array, key_array])?;
1097 let int_array = result.as_any().downcast_ref::<Int64Array>().unwrap();
1098
1099 assert_eq!(int_array.len(), 4);
1100 assert_eq!(int_array.value(0), 42);
1101 assert_eq!(int_array.value(1), 99);
1102 assert_eq!(int_array.value(2), 1); assert_eq!(int_array.value(3), 7);
1104
1105 Ok(())
1106 }
1107
1108 #[tokio::test]
1109 async fn test_json_get_float_udf() -> Result<()> {
1110 let json = r#"{
1111 "float_decimal": 1.5,
1112 "float_neg": -2.5,
1113 "float_int_value": 1.0,
1114 "float_exp": 1e2,
1115 "int_pos": 42,
1116 "int_neg": -7,
1117 "big_int": 9223372036854775808,
1118 "str_num": "3.5",
1119 "bool_true": true,
1120 "null_val": null
1121 }"#;
1122 let jsonb_bytes = create_test_jsonb(json);
1123
1124 let mut binary_builder = LargeBinaryBuilder::new();
1125 for _ in 0..11 {
1126 binary_builder.append_value(&jsonb_bytes);
1127 }
1128 let jsonb_array = Arc::new(binary_builder.finish());
1129 let key_array = Arc::new(StringArray::from(vec![
1130 Some("float_decimal"),
1131 Some("float_neg"),
1132 Some("float_int_value"),
1133 Some("float_exp"),
1134 Some("int_pos"),
1135 Some("int_neg"),
1136 Some("big_int"),
1137 Some("str_num"),
1138 Some("bool_true"),
1139 Some("null_val"),
1140 Some("missing"),
1141 ]));
1142
1143 let result = json_get_float_impl(&[jsonb_array, key_array])?;
1144 let float_array = result.as_any().downcast_ref::<Float64Array>().unwrap();
1145
1146 assert_eq!(float_array.len(), 11);
1147 assert_eq!(float_array.value(0), 1.5);
1148 assert_eq!(float_array.value(1), -2.5);
1149 assert_eq!(float_array.value(2), 1.0);
1150 assert_eq!(float_array.value(3), 100.0);
1151 assert_eq!(float_array.value(4), 42.0);
1152 assert_eq!(float_array.value(5), -7.0);
1153 assert_eq!(float_array.value(6), 9223372036854775808.0);
1155 assert_eq!(float_array.value(7), 3.5);
1156 assert_eq!(float_array.value(8), 1.0); assert!(float_array.is_null(9));
1158 assert!(float_array.is_null(10));
1159
1160 Ok(())
1161 }
1162
1163 #[tokio::test]
1164 async fn test_json_get_bool_udf() -> Result<()> {
1165 let json =
1166 r#"{"bool_true": true, "bool_false": false, "str_true": "true", "str_false": "false"}"#;
1167 let jsonb_bytes = create_test_jsonb(json);
1168
1169 let mut binary_builder = LargeBinaryBuilder::new();
1170 binary_builder.append_value(&jsonb_bytes);
1171 binary_builder.append_value(&jsonb_bytes);
1172 binary_builder.append_value(&jsonb_bytes);
1173 binary_builder.append_value(&jsonb_bytes);
1174
1175 let jsonb_array = Arc::new(binary_builder.finish());
1176 let key_array = Arc::new(StringArray::from(vec![
1177 Some("bool_true"),
1178 Some("bool_false"),
1179 Some("str_true"),
1180 Some("str_false"),
1181 ]));
1182
1183 let result = json_get_bool_impl(&[jsonb_array, key_array])?;
1184 let bool_array = result.as_any().downcast_ref::<BooleanArray>().unwrap();
1185
1186 assert_eq!(bool_array.len(), 4);
1187 assert!(bool_array.value(0));
1188 assert!(!bool_array.value(1));
1189 assert!(bool_array.value(2)); assert!(!bool_array.value(3)); Ok(())
1193 }
1194
1195 #[tokio::test]
1196 async fn test_json_array_contains_udf() -> Result<()> {
1197 let json = r#"{"tags": ["rust", "json", "database"], "nums": [1, 2, 3]}"#;
1198 let jsonb_bytes = create_test_jsonb(json);
1199
1200 let mut binary_builder = LargeBinaryBuilder::new();
1201 binary_builder.append_value(&jsonb_bytes);
1202 binary_builder.append_value(&jsonb_bytes);
1203 binary_builder.append_value(&jsonb_bytes);
1204 binary_builder.append_null();
1205
1206 let jsonb_array = Arc::new(binary_builder.finish());
1207 let path_array = Arc::new(StringArray::from(vec![
1208 Some("$.tags"),
1209 Some("$.tags"),
1210 Some("$.nums"),
1211 Some("$.tags"),
1212 ]));
1213 let value_array = Arc::new(StringArray::from(vec![
1214 Some("rust"),
1215 Some("python"),
1216 Some("2"),
1217 Some("any"),
1218 ]));
1219
1220 let result = json_array_contains_impl(&[jsonb_array, path_array, value_array])?;
1221 let bool_array = result.as_any().downcast_ref::<BooleanArray>().unwrap();
1222
1223 assert_eq!(bool_array.len(), 4);
1224 assert!(bool_array.value(0));
1225 assert!(!bool_array.value(1));
1226 assert!(bool_array.value(2));
1227 assert!(bool_array.is_null(3));
1228
1229 Ok(())
1230 }
1231
1232 #[tokio::test]
1233 async fn test_json_array_length_udf() -> Result<()> {
1234 let json = r#"{"empty": [], "tags": ["a", "b", "c"], "nested": {"arr": [1, 2]}}"#;
1235 let jsonb_bytes = create_test_jsonb(json);
1236
1237 let mut binary_builder = LargeBinaryBuilder::new();
1238 binary_builder.append_value(&jsonb_bytes);
1239 binary_builder.append_value(&jsonb_bytes);
1240 binary_builder.append_value(&jsonb_bytes);
1241 binary_builder.append_null();
1242
1243 let jsonb_array = Arc::new(binary_builder.finish());
1244 let path_array = Arc::new(StringArray::from(vec![
1245 Some("$.empty"),
1246 Some("$.tags"),
1247 Some("$.nested.arr"),
1248 Some("$.any"),
1249 ]));
1250
1251 let result = json_array_length_impl(&[jsonb_array, path_array])?;
1252 let int_array = result.as_any().downcast_ref::<Int64Array>().unwrap();
1253
1254 assert_eq!(int_array.len(), 4);
1255 assert_eq!(int_array.value(0), 0);
1256 assert_eq!(int_array.value(1), 3);
1257 assert_eq!(int_array.value(2), 2);
1258 assert!(int_array.is_null(3));
1259
1260 Ok(())
1261 }
1262
1263 #[tokio::test]
1264 async fn test_json_extract_with_type() -> Result<()> {
1265 use arrow_array::StructArray;
1266 use arrow_array::UInt8Array;
1267
1268 let cases: &[(&str, JsonbType)] = &[
1269 (r#"{"v": 1}"#, JsonbType::Int64),
1270 (r#"{"v": 0}"#, JsonbType::Int64),
1271 (r#"{"v": -42}"#, JsonbType::Int64),
1272 (r#"{"v": 9223372036854775807}"#, JsonbType::Int64), (r#"{"v": 9223372036854775808}"#, JsonbType::Float64), (r#"{"v": 1.0}"#, JsonbType::Float64),
1275 (r#"{"v": 2.7}"#, JsonbType::Float64),
1276 (r#"{"v": 1.5}"#, JsonbType::Float64),
1277 (r#"{"v": -1.5}"#, JsonbType::Float64),
1278 (r#"{"v": 1e2}"#, JsonbType::Float64),
1279 ];
1280
1281 for (json, expected) in cases {
1282 let bytes = create_test_jsonb(json);
1283 let mut binary_builder = LargeBinaryBuilder::new();
1284 binary_builder.append_value(&bytes);
1285 let jsonb_array: ArrayRef = Arc::new(binary_builder.finish());
1286 let path_array: ArrayRef = Arc::new(StringArray::from(vec![Some("$.v")]));
1287
1288 let result = json_extract_with_type_impl(&[jsonb_array, path_array])?;
1289 let struct_array = result.as_any().downcast_ref::<StructArray>().unwrap();
1290 let type_tags = struct_array
1291 .column_by_name("type_tag")
1292 .unwrap()
1293 .as_any()
1294 .downcast_ref::<UInt8Array>()
1295 .unwrap();
1296 assert_eq!(type_tags.value(0), expected.as_u8());
1297 }
1298
1299 Ok(())
1300 }
1301
1302 #[tokio::test]
1303 async fn test_json_extract_with_wildcard() -> Result<()> {
1304 use arrow_array::StructArray;
1305 use arrow_array::UInt8Array;
1306
1307 let json = r#"{"items": [{"price": 1}, {"price": 2}]}"#;
1308 let jsonb_bytes = create_test_jsonb(json);
1309
1310 let mut binary_builder = LargeBinaryBuilder::new();
1311 binary_builder.append_value(&jsonb_bytes);
1312
1313 let jsonb_array: ArrayRef = Arc::new(binary_builder.finish());
1314 let path_array: ArrayRef = Arc::new(StringArray::from(vec![Some("$.items[*].price")]));
1315
1316 let result = json_extract_with_type_impl(&[jsonb_array, path_array])?;
1317 let struct_array = result.as_any().downcast_ref::<StructArray>().unwrap();
1318 let values = struct_array
1319 .column_by_name("value")
1320 .unwrap()
1321 .as_any()
1322 .downcast_ref::<LargeBinaryArray>()
1323 .unwrap();
1324 let type_tags = struct_array
1325 .column_by_name("type_tag")
1326 .unwrap()
1327 .as_any()
1328 .downcast_ref::<UInt8Array>()
1329 .unwrap();
1330
1331 assert_eq!(jsonb::RawJsonb::new(values.value(0)).to_string(), "[1,2]");
1332 assert_eq!(type_tags.value(0), JsonbType::Array.as_u8());
1333
1334 Ok(())
1335 }
1336
1337 #[tokio::test]
1338 async fn test_json_array_access() -> Result<()> {
1339 let json = r#"["first", "second", "third"]"#;
1340 let jsonb_bytes = create_test_jsonb(json);
1341
1342 let mut binary_builder = LargeBinaryBuilder::new();
1343 binary_builder.append_value(&jsonb_bytes);
1344 binary_builder.append_value(&jsonb_bytes);
1345 binary_builder.append_value(&jsonb_bytes);
1346
1347 let jsonb_array = Arc::new(binary_builder.finish());
1348 let key_array = Arc::new(StringArray::from(vec![
1349 Some("0"),
1350 Some("1"),
1351 Some("10"), ]));
1353
1354 let result = json_get_string_impl(&[jsonb_array, key_array])?;
1355 let string_array = result.as_any().downcast_ref::<StringArray>().unwrap();
1356
1357 assert_eq!(string_array.len(), 3);
1358 assert_eq!(string_array.value(0), "first");
1359 assert_eq!(string_array.value(1), "second");
1360 assert!(string_array.is_null(2));
1361
1362 Ok(())
1363 }
1364
1365 #[tokio::test]
1366 async fn test_json_get_numeric_object_key() -> Result<()> {
1367 let obj_bytes = create_test_jsonb(r#"{"0": "from_object", "1": 42}"#);
1368 let arr_bytes = create_test_jsonb(r#"["zero", "one", "two"]"#);
1369
1370 let mut binary_builder = LargeBinaryBuilder::new();
1371 binary_builder.append_value(&obj_bytes);
1372 binary_builder.append_value(&arr_bytes);
1373 binary_builder.append_value(&arr_bytes);
1374
1375 let jsonb_array = Arc::new(binary_builder.finish());
1376 let key_array = Arc::new(StringArray::from(vec![
1377 Some("0"), Some("0"), Some("foo"), ]));
1381
1382 let result = json_get_string_impl(&[jsonb_array, key_array])?;
1383 let string_array = result.as_any().downcast_ref::<StringArray>().unwrap();
1384
1385 assert_eq!(string_array.len(), 3);
1386 assert_eq!(string_array.value(0), "from_object");
1387 assert_eq!(string_array.value(1), "zero");
1388 assert!(string_array.is_null(2));
1389
1390 Ok(())
1391 }
1392}