Skip to main content

lance_datafusion/udf/
json.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use arrow_array::builder::{
5    BooleanBuilder, Float64Builder, Int64Builder, LargeBinaryBuilder, StringBuilder,
6};
7use arrow_array::{Array, ArrayRef, LargeBinaryArray, StringArray};
8use arrow_schema::DataType;
9use datafusion::error::{DataFusionError, Result};
10use datafusion::logical_expr::{ScalarUDF, Volatility};
11use datafusion::physical_plan::ColumnarValue;
12use datafusion::prelude::create_udf;
13use std::sync::Arc;
14
/// Type tag describing the kind of value held in a JSONB payload.
///
/// The discriminants are fixed by `#[repr(u8)]`, so a tag can be written as a
/// single byte with [`JsonbType::as_u8`] and decoded again with
/// [`JsonbType::from_u8`].
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JsonbType {
    Null = 0,
    Boolean = 1,
    Int64 = 2,
    Float64 = 3,
    String = 4,
    Array = 5,
    Object = 6,
}

impl JsonbType {
    /// Decode a raw tag byte.
    ///
    /// Returns `None` when `value` is not one of the discriminants `0..=6`.
    pub const fn from_u8(value: u8) -> Option<Self> {
        match value {
            0 => Some(Self::Null),
            1 => Some(Self::Boolean),
            2 => Some(Self::Int64),
            3 => Some(Self::Float64),
            4 => Some(Self::String),
            5 => Some(Self::Array),
            6 => Some(Self::Object),
            _ => None,
        }
    }

    /// Return the tag byte of this variant (its `#[repr(u8)]` discriminant),
    /// suitable for storage in Arrow arrays.
    pub const fn as_u8(self) -> u8 {
        self as u8
    }
}

impl From<JsonbType> for u8 {
    /// Same conversion as [`JsonbType::as_u8`].
    fn from(value: JsonbType) -> Self {
        value.as_u8()
    }
}

impl std::convert::TryFrom<u8> for JsonbType {
    /// The unrecognized tag byte is handed back to the caller.
    type Error = u8;

    /// Same conversion as [`JsonbType::from_u8`], with the rejected byte as the error.
    fn try_from(value: u8) -> std::result::Result<Self, Self::Error> {
        Self::from_u8(value).ok_or(value)
    }
}
48
49/// Common helper functions and types for JSON UDFs
50mod common {
51    use super::*;
52
53    /// Convert ColumnarValue arguments to ArrayRef vector
54    ///
55    /// Note: This implementation currently broadcasts scalars to arrays.
56    /// Future optimization: handle scalars directly without broadcasting
57    /// to improve performance for scalar inputs.
58    pub fn columnar_to_arrays(args: &[ColumnarValue]) -> Vec<ArrayRef> {
59        args.iter()
60            .map(|arg| match arg {
61                ColumnarValue::Array(arr) => arr.clone(),
62                ColumnarValue::Scalar(scalar) => scalar.to_array().unwrap(),
63            })
64            .collect()
65    }
66
67    /// Create DataFusionError for execution failures (simplified error wrapping)
68    pub fn execution_error(msg: impl Into<String>) -> DataFusionError {
69        DataFusionError::Execution(msg.into())
70    }
71
72    /// Validate argument count for UDF
73    pub fn validate_arg_count(
74        args: &[ArrayRef],
75        expected: usize,
76        function_name: &str,
77    ) -> Result<()> {
78        if args.len() != expected {
79            return Err(execution_error(format!(
80                "{} requires exactly {} arguments",
81                function_name, expected
82            )));
83        }
84        Ok(())
85    }
86
87    /// Extract and validate LargeBinaryArray from first argument
88    pub fn extract_jsonb_array(args: &[ArrayRef]) -> Result<&LargeBinaryArray> {
89        args[0]
90            .as_any()
91            .downcast_ref::<LargeBinaryArray>()
92            .ok_or_else(|| execution_error("First argument must be LargeBinary"))
93    }
94
95    /// Extract and validate StringArray from specified argument
96    pub fn extract_string_array(args: &[ArrayRef], arg_index: usize) -> Result<&StringArray> {
97        args[arg_index]
98            .as_any()
99            .downcast_ref::<StringArray>()
100            .ok_or_else(|| execution_error(format!("Argument {} must be String", arg_index + 1)))
101    }
102
103    /// Get string value at index, handling scalar broadcast case
104    /// When a scalar is converted to an array, it becomes a single-element array
105    /// This function handles accessing that value repeatedly for all rows
106    pub fn get_string_value_at(string_array: &StringArray, index: usize) -> Option<&str> {
107        // Handle scalar broadcast case: if array has only 1 element, always use index 0
108        let actual_index = if string_array.len() == 1 { 0 } else { index };
109
110        if string_array.is_null(actual_index) {
111            None
112        } else {
113            Some(string_array.value(actual_index))
114        }
115    }
116
117    /// Get a JSON field or array element by key.
118    pub fn get_json_value_by_key(
119        raw_jsonb: &jsonb::RawJsonb,
120        key: &str,
121    ) -> Result<Option<jsonb::OwnedJsonb>> {
122        if raw_jsonb.is_object().unwrap_or(false) {
123            raw_jsonb
124                .get_by_name(key, false)
125                .map_err(|e| execution_error(format!("Failed to get field '{}': {}", key, e)))
126        } else if raw_jsonb.is_array().unwrap_or(false) {
127            match key.parse::<usize>() {
128                Ok(index) => raw_jsonb.get_by_index(index).map_err(|e| {
129                    execution_error(format!("Failed to get array element [{}]: {}", index, e))
130                }),
131                Err(_) => Ok(None),
132            }
133        } else {
134            Ok(None)
135        }
136    }
137
138    /// Parse JSONPath with proper error handling (no false returns)
139    pub fn parse_json_path(path: &str) -> Result<jsonb::jsonpath::JsonPath<'_>> {
140        jsonb::jsonpath::parse_json_path(path.as_bytes())
141            .map_err(|e| execution_error(format!("Invalid JSONPath '{}': {}", path, e)))
142    }
143}
144
145/// Convert JSONB value to string using jsonb's built-in serde (strict mode)
146fn json_value_to_string(value: jsonb::OwnedJsonb) -> Result<Option<String>> {
147    let raw_jsonb = value.as_raw();
148
149    // Check for null first
150    if raw_jsonb
151        .is_null()
152        .map_err(|e| common::execution_error(format!("Failed to check null: {}", e)))?
153    {
154        return Ok(None);
155    }
156
157    // Use jsonb's built-in to_str() method - strict conversion
158    raw_jsonb
159        .to_str()
160        .map(Some)
161        .map_err(|e| common::execution_error(format!("Failed to convert to string: {}", e)))
162}
163
164/// Convert JSONB value to integer using jsonb's built-in serde (strict mode)
165fn json_value_to_int(value: jsonb::OwnedJsonb) -> Result<Option<i64>> {
166    let raw_jsonb = value.as_raw();
167
168    // Check for null first
169    if raw_jsonb
170        .is_null()
171        .map_err(|e| common::execution_error(format!("Failed to check null: {}", e)))?
172    {
173        return Ok(None);
174    }
175
176    // Use jsonb's built-in to_i64() method - strict conversion
177    raw_jsonb
178        .to_i64()
179        .map(Some)
180        .map_err(|e| common::execution_error(format!("Failed to convert to integer: {}", e)))
181}
182
183/// Convert JSONB value to float using jsonb's built-in serde (strict mode)
184fn json_value_to_float(value: jsonb::OwnedJsonb) -> Result<Option<f64>> {
185    let raw_jsonb = value.as_raw();
186
187    // Check for null first
188    if raw_jsonb
189        .is_null()
190        .map_err(|e| common::execution_error(format!("Failed to check null: {}", e)))?
191    {
192        return Ok(None);
193    }
194
195    // Use jsonb's built-in to_f64() method - strict conversion
196    raw_jsonb
197        .to_f64()
198        .map(Some)
199        .map_err(|e| common::execution_error(format!("Failed to convert to float: {}", e)))
200}
201
202/// Convert JSONB value to boolean using jsonb's built-in serde (strict mode)
203fn json_value_to_bool(value: jsonb::OwnedJsonb) -> Result<Option<bool>> {
204    let raw_jsonb = value.as_raw();
205
206    // Check for null first
207    if raw_jsonb
208        .is_null()
209        .map_err(|e| common::execution_error(format!("Failed to check null: {}", e)))?
210    {
211        return Ok(None);
212    }
213
214    // Use jsonb's built-in to_bool() method - strict conversion
215    raw_jsonb
216        .to_bool()
217        .map(Some)
218        .map_err(|e| common::execution_error(format!("Failed to convert to boolean: {}", e)))
219}
220
221/// Create the json_extract UDF for extracting JSONPath from JSON data
222///
223/// # Arguments
224/// * First parameter: JSONB binary data (LargeBinary)
225/// * Second parameter: JSONPath expression as string (Utf8)
226///
227/// # Returns
228/// String representation of the extracted value, or null if path not found
229pub fn json_extract_udf() -> ScalarUDF {
230    create_udf(
231        "json_extract",
232        vec![DataType::LargeBinary, DataType::Utf8],
233        DataType::Utf8,
234        Volatility::Immutable,
235        Arc::new(json_extract_columnar_impl),
236    )
237}
238
239/// Create the json_extract_with_type UDF that returns JSONB bytes with type information
240///
241/// # Arguments
242/// * First parameter: JSONB binary data (LargeBinary)
243/// * Second parameter: JSONPath expression as string (Utf8)
244///
245/// # Returns
246/// A struct with two fields:
247/// - value: LargeBinary (the extracted JSONB value)
248/// - type_tag: UInt8 (type information: 0=null, 1=bool, 2=int64, 3=float64, 4=string, 5=array, 6=object)
249pub fn json_extract_with_type_udf() -> ScalarUDF {
250    use arrow_schema::Fields;
251
252    let return_type = DataType::Struct(Fields::from(vec![
253        arrow_schema::Field::new("value", DataType::LargeBinary, true),
254        arrow_schema::Field::new("type_tag", DataType::UInt8, false),
255    ]));
256
257    create_udf(
258        "json_extract_with_type",
259        vec![DataType::LargeBinary, DataType::Utf8],
260        return_type,
261        Volatility::Immutable,
262        Arc::new(json_extract_with_type_columnar_impl),
263    )
264}
265
266/// Implementation of json_extract function with ColumnarValue
267fn json_extract_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
268    let arrays = common::columnar_to_arrays(args);
269    let result = json_extract_impl(&arrays)?;
270    Ok(ColumnarValue::Array(result))
271}
272
273/// Implementation of json_extract_with_type function with ColumnarValue
274fn json_extract_with_type_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
275    let arrays = common::columnar_to_arrays(args);
276    let result = json_extract_with_type_impl(&arrays)?;
277    Ok(ColumnarValue::Array(result))
278}
279
280/// Implementation of json_extract function
281fn json_extract_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
282    common::validate_arg_count(args, 2, "json_extract")?;
283
284    let jsonb_array = common::extract_jsonb_array(args)?;
285    let path_array = common::extract_string_array(args, 1)?;
286    let mut builder = StringBuilder::with_capacity(jsonb_array.len(), 1024);
287
288    for i in 0..jsonb_array.len() {
289        if jsonb_array.is_null(i) {
290            builder.append_null();
291        } else if let Some(path) = common::get_string_value_at(path_array, i) {
292            let jsonb_bytes = jsonb_array.value(i);
293            match extract_json_path(jsonb_bytes, path)? {
294                Some(value) => builder.append_value(&value),
295                None => builder.append_null(),
296            }
297        } else {
298            builder.append_null();
299        }
300    }
301
302    Ok(Arc::new(builder.finish()))
303}
304
305/// Implementation of json_extract_with_type function
306fn json_extract_with_type_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
307    use arrow_array::StructArray;
308    use arrow_array::builder::{LargeBinaryBuilder, UInt8Builder};
309
310    common::validate_arg_count(args, 2, "json_extract_with_type")?;
311
312    let jsonb_array = common::extract_jsonb_array(args)?;
313    let path_array = common::extract_string_array(args, 1)?;
314
315    let mut value_builder = LargeBinaryBuilder::with_capacity(jsonb_array.len(), 1024);
316    let mut type_builder = UInt8Builder::with_capacity(jsonb_array.len());
317
318    for i in 0..jsonb_array.len() {
319        if jsonb_array.is_null(i) {
320            value_builder.append_null();
321            type_builder.append_value(JsonbType::Null.as_u8());
322        } else if let Some(path) = common::get_string_value_at(path_array, i) {
323            let jsonb_bytes = jsonb_array.value(i);
324            match extract_json_path_with_type(jsonb_bytes, path)? {
325                Some((value_bytes, type_tag)) => {
326                    value_builder.append_value(&value_bytes);
327                    type_builder.append_value(type_tag);
328                }
329                None => {
330                    value_builder.append_null();
331                    type_builder.append_value(JsonbType::Null.as_u8());
332                }
333            }
334        } else {
335            value_builder.append_null();
336            type_builder.append_value(JsonbType::Null.as_u8());
337        }
338    }
339
340    // Create struct array with two fields
341    let value_array = Arc::new(value_builder.finish()) as ArrayRef;
342    let type_array = Arc::new(type_builder.finish()) as ArrayRef;
343
344    let struct_array = StructArray::from(vec![
345        (
346            Arc::new(arrow_schema::Field::new(
347                "value",
348                DataType::LargeBinary,
349                true,
350            )),
351            value_array,
352        ),
353        (
354            Arc::new(arrow_schema::Field::new("type_tag", DataType::UInt8, false)),
355            type_array,
356        ),
357    ]);
358
359    Ok(Arc::new(struct_array))
360}
361
362/// Extract value from JSONB using JSONPath and return with type information
363/// Returns (JSONB bytes, type_tag) where type_tag represents the JsonbType
364fn extract_json_path_with_type(jsonb_bytes: &[u8], path: &str) -> Result<Option<(Vec<u8>, u8)>> {
365    let json_path = common::parse_json_path(path)?;
366
367    let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
368    let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb);
369    match selector.select_value(&json_path) {
370        Ok(Some(owned_value)) => {
371            let raw = owned_value.as_raw();
372
373            // Determine type using JsonbType enum
374            let jsonb_type = if raw.is_null().unwrap_or(false) {
375                JsonbType::Null
376            } else if raw.is_boolean().unwrap_or(false) {
377                JsonbType::Boolean
378            } else if raw.is_number().unwrap_or(false) {
379                let is_float_storage =
380                    matches!(raw.as_number(), Ok(Some(jsonb::Number::Float64(_))));
381                if !is_float_storage && raw.is_i64().unwrap_or(false) {
382                    JsonbType::Int64
383                } else {
384                    JsonbType::Float64
385                }
386            } else if raw.is_string().unwrap_or(false) {
387                JsonbType::String
388            } else if raw.is_array().unwrap_or(false) {
389                JsonbType::Array
390            } else if raw.is_object().unwrap_or(false) {
391                JsonbType::Object
392            } else {
393                JsonbType::String // default to string
394            };
395
396            // Return the JSONB bytes and type tag as u8
397            Ok(Some((owned_value.to_vec(), jsonb_type.as_u8())))
398        }
399        Ok(None) => Ok(None),
400        Err(e) => Err(common::execution_error(format!(
401            "Failed to select value from path '{}': {}",
402            path, e
403        ))),
404    }
405}
406
407/// Extract value from JSONB using JSONPath
408///
409/// Note: Uses `select_value` so JSONPath expressions matching multiple values
410/// return a JSON array instead of silently dropping all but the first match.
411fn extract_json_path(jsonb_bytes: &[u8], path: &str) -> Result<Option<String>> {
412    let json_path = common::parse_json_path(path)?;
413
414    let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
415    let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb);
416    match selector.select_value(&json_path) {
417        Ok(value) => Ok(value.map(|value| value.to_string())),
418        Err(e) => Err(common::execution_error(format!(
419            "Failed to select value from path '{}': {}",
420            path, e
421        ))),
422    }
423}
424
425/// Create the json_exists UDF for checking if a JSONPath exists
426///
427/// # Arguments
428/// * First parameter: JSONB binary data (LargeBinary)
429/// * Second parameter: JSONPath expression as string (Utf8)
430///
431/// # Returns
432/// Boolean indicating whether the path exists in the JSON data
433pub fn json_exists_udf() -> ScalarUDF {
434    create_udf(
435        "json_exists",
436        vec![DataType::LargeBinary, DataType::Utf8],
437        DataType::Boolean,
438        Volatility::Immutable,
439        Arc::new(json_exists_columnar_impl),
440    )
441}
442
443/// Implementation of json_exists function with ColumnarValue
444fn json_exists_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
445    let arrays = common::columnar_to_arrays(args);
446    let result = json_exists_impl(&arrays)?;
447    Ok(ColumnarValue::Array(result))
448}
449
450/// Implementation of json_exists function
451fn json_exists_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
452    common::validate_arg_count(args, 2, "json_exists")?;
453
454    let jsonb_array = common::extract_jsonb_array(args)?;
455    let path_array = common::extract_string_array(args, 1)?;
456
457    let mut builder = BooleanBuilder::with_capacity(jsonb_array.len());
458
459    for i in 0..jsonb_array.len() {
460        if jsonb_array.is_null(i) {
461            builder.append_null();
462        } else if let Some(path) = common::get_string_value_at(path_array, i) {
463            let jsonb_bytes = jsonb_array.value(i);
464            let exists = check_json_path_exists(jsonb_bytes, path)?;
465            builder.append_value(exists);
466        } else {
467            builder.append_null();
468        }
469    }
470
471    Ok(Arc::new(builder.finish()))
472}
473
474/// Check if a JSONPath exists in JSONB
475fn check_json_path_exists(jsonb_bytes: &[u8], path: &str) -> Result<bool> {
476    let json_path = common::parse_json_path(path)?;
477
478    let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
479    let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb);
480    match selector.exists(&json_path) {
481        Ok(exists) => Ok(exists),
482        Err(e) => Err(common::execution_error(format!(
483            "Failed to check existence of path '{}': {}",
484            path, e
485        ))),
486    }
487}
488
489/// Create the json_get UDF for getting a field value as JSON string
490///
491/// # Arguments
492/// * First parameter: JSONB binary data (LargeBinary)
493/// * Second parameter: Field name or array index as string (Utf8)
494///
495/// # Returns
496/// Raw JSONB bytes of the field value, or null if not found
497pub fn json_get_udf() -> ScalarUDF {
498    create_udf(
499        "json_get",
500        vec![DataType::LargeBinary, DataType::Utf8],
501        DataType::LargeBinary,
502        Volatility::Immutable,
503        Arc::new(json_get_columnar_impl),
504    )
505}
506
507/// Implementation of json_get function with ColumnarValue
508fn json_get_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
509    let arrays = common::columnar_to_arrays(args);
510    let result = json_get_impl(&arrays)?;
511    Ok(ColumnarValue::Array(result))
512}
513
514/// Implementation of json_get function
515fn json_get_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
516    common::validate_arg_count(args, 2, "json_get")?;
517
518    let jsonb_array = common::extract_jsonb_array(args)?;
519    let key_array = common::extract_string_array(args, 1)?;
520
521    let mut builder = LargeBinaryBuilder::with_capacity(jsonb_array.len(), 0);
522
523    for i in 0..jsonb_array.len() {
524        if jsonb_array.is_null(i) {
525            builder.append_null();
526        } else if let Some(key) = common::get_string_value_at(key_array, i) {
527            let jsonb_bytes = jsonb_array.value(i);
528            let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
529
530            match common::get_json_value_by_key(&raw_jsonb, key)? {
531                Some(value) => builder.append_value(value.as_raw().as_ref()),
532                None => builder.append_null(),
533            }
534        } else {
535            builder.append_null();
536        }
537    }
538
539    Ok(Arc::new(builder.finish()))
540}
541
542/// Create the json_get_string UDF for getting a string value
543///
544/// # Arguments
545/// * First parameter: JSONB binary data (LargeBinary)
546/// * Second parameter: Field name or array index as string (Utf8)
547///
548/// # Returns
549/// String value with type coercion (numbers/booleans converted to strings)
550pub fn json_get_string_udf() -> ScalarUDF {
551    create_udf(
552        "json_get_string",
553        vec![DataType::LargeBinary, DataType::Utf8],
554        DataType::Utf8,
555        Volatility::Immutable,
556        Arc::new(json_get_string_columnar_impl),
557    )
558}
559
560/// Implementation of json_get_string function with ColumnarValue
561fn json_get_string_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
562    let arrays = common::columnar_to_arrays(args);
563    let result = json_get_string_impl(&arrays)?;
564    Ok(ColumnarValue::Array(result))
565}
566
567/// Implementation of json_get_string function
568fn json_get_string_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
569    common::validate_arg_count(args, 2, "json_get_string")?;
570
571    let jsonb_array = common::extract_jsonb_array(args)?;
572    let key_array = common::extract_string_array(args, 1)?;
573
574    let mut builder = StringBuilder::with_capacity(jsonb_array.len(), 1024);
575
576    for i in 0..jsonb_array.len() {
577        if jsonb_array.is_null(i) {
578            builder.append_null();
579        } else if let Some(key) = common::get_string_value_at(key_array, i) {
580            let jsonb_bytes = jsonb_array.value(i);
581            let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
582
583            match common::get_json_value_by_key(&raw_jsonb, key)? {
584                Some(value) => match json_value_to_string(value)? {
585                    Some(string_val) => builder.append_value(&string_val),
586                    None => builder.append_null(),
587                },
588                None => builder.append_null(),
589            }
590        } else {
591            builder.append_null();
592        }
593    }
594
595    Ok(Arc::new(builder.finish()))
596}
597
598/// Create the json_get_int UDF for getting an integer value
599///
600/// # Arguments
601/// * First parameter: JSONB binary data (LargeBinary)
602/// * Second parameter: Field name or array index as string (Utf8)
603///
604/// # Returns
605/// Integer value with type coercion (strings/floats/booleans converted to int)
606pub fn json_get_int_udf() -> ScalarUDF {
607    create_udf(
608        "json_get_int",
609        vec![DataType::LargeBinary, DataType::Utf8],
610        DataType::Int64,
611        Volatility::Immutable,
612        Arc::new(json_get_int_columnar_impl),
613    )
614}
615
616/// Implementation of json_get_int function with ColumnarValue
617fn json_get_int_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
618    let arrays = common::columnar_to_arrays(args);
619    let result = json_get_int_impl(&arrays)?;
620    Ok(ColumnarValue::Array(result))
621}
622
623/// Implementation of json_get_int function
624fn json_get_int_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
625    common::validate_arg_count(args, 2, "json_get_int")?;
626
627    let jsonb_array = common::extract_jsonb_array(args)?;
628    let key_array = common::extract_string_array(args, 1)?;
629
630    let mut builder = Int64Builder::with_capacity(jsonb_array.len());
631
632    for i in 0..jsonb_array.len() {
633        if jsonb_array.is_null(i) {
634            builder.append_null();
635        } else if let Some(key) = common::get_string_value_at(key_array, i) {
636            let jsonb_bytes = jsonb_array.value(i);
637            let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
638
639            match common::get_json_value_by_key(&raw_jsonb, key)? {
640                Some(value) => match json_value_to_int(value)? {
641                    Some(int_val) => builder.append_value(int_val),
642                    None => builder.append_null(),
643                },
644                None => builder.append_null(),
645            }
646        } else {
647            builder.append_null();
648        }
649    }
650
651    Ok(Arc::new(builder.finish()))
652}
653
654/// Create the json_get_float UDF for getting a float value
655///
656/// # Arguments
657/// * First parameter: JSONB binary data (LargeBinary)
658/// * Second parameter: Field name or array index as string (Utf8)
659///
660/// # Returns
661/// Float value with type coercion (strings/integers/booleans converted to float)
662pub fn json_get_float_udf() -> ScalarUDF {
663    create_udf(
664        "json_get_float",
665        vec![DataType::LargeBinary, DataType::Utf8],
666        DataType::Float64,
667        Volatility::Immutable,
668        Arc::new(json_get_float_columnar_impl),
669    )
670}
671
672/// Implementation of json_get_float function with ColumnarValue
673fn json_get_float_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
674    let arrays = common::columnar_to_arrays(args);
675    let result = json_get_float_impl(&arrays)?;
676    Ok(ColumnarValue::Array(result))
677}
678
679/// Implementation of json_get_float function
680fn json_get_float_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
681    common::validate_arg_count(args, 2, "json_get_float")?;
682
683    let jsonb_array = common::extract_jsonb_array(args)?;
684    let key_array = common::extract_string_array(args, 1)?;
685
686    let mut builder = Float64Builder::with_capacity(jsonb_array.len());
687
688    for i in 0..jsonb_array.len() {
689        if jsonb_array.is_null(i) {
690            builder.append_null();
691        } else if let Some(key) = common::get_string_value_at(key_array, i) {
692            let jsonb_bytes = jsonb_array.value(i);
693            let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
694
695            match common::get_json_value_by_key(&raw_jsonb, key)? {
696                Some(value) => match json_value_to_float(value)? {
697                    Some(float_val) => builder.append_value(float_val),
698                    None => builder.append_null(),
699                },
700                None => builder.append_null(),
701            }
702        } else {
703            builder.append_null();
704        }
705    }
706
707    Ok(Arc::new(builder.finish()))
708}
709
710/// Create the json_get_bool UDF for getting a boolean value
711///
712/// # Arguments
713/// * First parameter: JSONB binary data (LargeBinary)
714/// * Second parameter: Field name or array index as string (Utf8)
715///
716/// # Returns
717/// Boolean value with flexible type coercion (strings like 'true'/'yes'/'1' become true)
718pub fn json_get_bool_udf() -> ScalarUDF {
719    create_udf(
720        "json_get_bool",
721        vec![DataType::LargeBinary, DataType::Utf8],
722        DataType::Boolean,
723        Volatility::Immutable,
724        Arc::new(json_get_bool_columnar_impl),
725    )
726}
727
728/// Implementation of json_get_bool function with ColumnarValue
729fn json_get_bool_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
730    let arrays = common::columnar_to_arrays(args);
731    let result = json_get_bool_impl(&arrays)?;
732    Ok(ColumnarValue::Array(result))
733}
734
735/// Implementation of json_get_bool function
736fn json_get_bool_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
737    common::validate_arg_count(args, 2, "json_get_bool")?;
738
739    let jsonb_array = common::extract_jsonb_array(args)?;
740    let key_array = common::extract_string_array(args, 1)?;
741
742    let mut builder = BooleanBuilder::with_capacity(jsonb_array.len());
743
744    for i in 0..jsonb_array.len() {
745        if jsonb_array.is_null(i) {
746            builder.append_null();
747        } else if let Some(key) = common::get_string_value_at(key_array, i) {
748            let jsonb_bytes = jsonb_array.value(i);
749            let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
750
751            match common::get_json_value_by_key(&raw_jsonb, key)? {
752                Some(value) => match json_value_to_bool(value)? {
753                    Some(bool_val) => builder.append_value(bool_val),
754                    None => builder.append_null(),
755                },
756                None => builder.append_null(),
757            }
758        } else {
759            builder.append_null();
760        }
761    }
762
763    Ok(Arc::new(builder.finish()))
764}
765
766/// Create the json_array_contains UDF for checking if array contains a value
767///
768/// # Arguments
769/// * First parameter: JSONB binary data (LargeBinary)
770/// * Second parameter: JSONPath to array location (Utf8)
771/// * Third parameter: Value to search for as string (Utf8)
772///
773/// # Returns
774/// Boolean indicating whether the array contains the specified value
775pub fn json_array_contains_udf() -> ScalarUDF {
776    create_udf(
777        "json_array_contains",
778        vec![DataType::LargeBinary, DataType::Utf8, DataType::Utf8],
779        DataType::Boolean,
780        Volatility::Immutable,
781        Arc::new(json_array_contains_columnar_impl),
782    )
783}
784
785/// Implementation of json_array_contains function with ColumnarValue
786fn json_array_contains_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
787    let arrays = common::columnar_to_arrays(args);
788    let result = json_array_contains_impl(&arrays)?;
789    Ok(ColumnarValue::Array(result))
790}
791
792/// Implementation of json_array_contains function
793fn json_array_contains_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
794    common::validate_arg_count(args, 3, "json_array_contains")?;
795
796    let jsonb_array = common::extract_jsonb_array(args)?;
797    let path_array = common::extract_string_array(args, 1)?;
798    let value_array = common::extract_string_array(args, 2)?;
799
800    let mut builder = BooleanBuilder::with_capacity(jsonb_array.len());
801
802    for i in 0..jsonb_array.len() {
803        if jsonb_array.is_null(i) {
804            builder.append_null();
805        } else {
806            let path = common::get_string_value_at(path_array, i);
807            let value = common::get_string_value_at(value_array, i);
808
809            match (path, value) {
810                (Some(p), Some(v)) => {
811                    let jsonb_bytes = jsonb_array.value(i);
812                    let contains = check_array_contains(jsonb_bytes, p, v)?;
813                    builder.append_value(contains);
814                }
815                _ => builder.append_null(),
816            }
817        }
818    }
819
820    Ok(Arc::new(builder.finish()))
821}
822
823/// Check if a JSON array at path contains a value
824fn check_array_contains(jsonb_bytes: &[u8], path: &str, value: &str) -> Result<bool> {
825    let json_path = common::parse_json_path(path)?;
826
827    let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
828    let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb);
829    match selector.select_values(&json_path) {
830        Ok(values) => {
831            for v in values {
832                // Convert to raw JSONB for direct access
833                let raw = v.as_raw();
834                // Check if it's an array by trying to iterate
835                let mut index = 0;
836                loop {
837                    match raw.get_by_index(index) {
838                        Ok(Some(elem)) => {
839                            let elem_str = elem.to_string();
840                            // Compare as JSON strings (with quotes for strings)
841                            if elem_str == value || elem_str == format!("\"{}\"", value) {
842                                return Ok(true);
843                            }
844                            index += 1;
845                        }
846                        Ok(None) => break, // End of array
847                        Err(_) => break,   // Not an array or error
848                    }
849                }
850            }
851            Ok(false)
852        }
853        Err(e) => Err(common::execution_error(format!(
854            "Failed to check array contains at path '{}': {}",
855            path, e
856        ))),
857    }
858}
859
860/// Create the json_array_length UDF for getting array length
861///
862/// # Arguments
863/// * First parameter: JSONB binary data (LargeBinary)
864/// * Second parameter: JSONPath to array location (Utf8)
865///
866/// # Returns
867/// Integer length of the JSON array, or null if path doesn't point to an array
868pub fn json_array_length_udf() -> ScalarUDF {
869    create_udf(
870        "json_array_length",
871        vec![DataType::LargeBinary, DataType::Utf8],
872        DataType::Int64,
873        Volatility::Immutable,
874        Arc::new(json_array_length_columnar_impl),
875    )
876}
877
878/// Implementation of json_array_length function with ColumnarValue
879fn json_array_length_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
880    let arrays = common::columnar_to_arrays(args);
881    let result = json_array_length_impl(&arrays)?;
882    Ok(ColumnarValue::Array(result))
883}
884
885/// Implementation of json_array_length function
886fn json_array_length_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
887    common::validate_arg_count(args, 2, "json_array_length")?;
888
889    let jsonb_array = common::extract_jsonb_array(args)?;
890    let path_array = common::extract_string_array(args, 1)?;
891
892    let mut builder = Int64Builder::with_capacity(jsonb_array.len());
893
894    for i in 0..jsonb_array.len() {
895        if jsonb_array.is_null(i) {
896            builder.append_null();
897        } else if let Some(path) = common::get_string_value_at(path_array, i) {
898            let jsonb_bytes = jsonb_array.value(i);
899            match get_array_length(jsonb_bytes, path)? {
900                Some(len) => builder.append_value(len),
901                None => builder.append_null(),
902            }
903        } else {
904            builder.append_null();
905        }
906    }
907
908    Ok(Arc::new(builder.finish()))
909}
910
911/// Get the length of a JSON array at path
912fn get_array_length(jsonb_bytes: &[u8], path: &str) -> Result<Option<i64>> {
913    let json_path = common::parse_json_path(path)?;
914
915    let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
916    let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb);
917    match selector.select_values(&json_path) {
918        Ok(values) => {
919            if values.is_empty() {
920                return Ok(None);
921            }
922            let first = &values[0];
923            let raw = first.as_raw();
924
925            // Count array elements by iterating
926            let mut count = 0;
927            loop {
928                match raw.get_by_index(count) {
929                    Ok(Some(_)) => count += 1,
930                    Ok(None) => break, // End of array
931                    Err(_) => {
932                        // Not an array
933                        if count == 0 {
934                            return Err(common::execution_error(format!(
935                                "Path '{}' does not point to an array",
936                                path
937                            )));
938                        }
939                        break;
940                    }
941                }
942            }
943            Ok(Some(count as i64))
944        }
945        Err(e) => Err(common::execution_error(format!(
946            "Failed to get array length at path '{}': {}",
947            path, e
948        ))),
949    }
950}
951
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::builder::LargeBinaryBuilder;
    use arrow_array::{BooleanArray, Float64Array, Int64Array};

    /// Parse a JSON document and return its JSONB encoding.
    ///
    /// Panics if `json_str` is not valid JSON.
    fn create_test_jsonb(json_str: &str) -> Vec<u8> {
        let parsed = jsonb::parse_value(json_str.as_bytes()).unwrap();
        parsed.to_vec()
    }

    /// Build a LargeBinary column; `None` entries become null rows.
    fn jsonb_column(rows: &[Option<&[u8]>]) -> ArrayRef {
        let mut builder = LargeBinaryBuilder::new();
        for &row in rows {
            match row {
                Some(bytes) => builder.append_value(bytes),
                None => builder.append_null(),
            }
        }
        Arc::new(builder.finish())
    }

    /// Build a Utf8 column in which every row is non-null.
    fn utf8_column(values: &[&str]) -> ArrayRef {
        let rows: Vec<Option<&str>> = values.iter().copied().map(Some).collect();
        Arc::new(StringArray::from(rows))
    }

    /// Downcast a dynamically typed array to `T`, panicking on a mismatch.
    fn downcast<T: 'static>(array: &ArrayRef) -> &T {
        array.as_any().downcast_ref::<T>().unwrap()
    }

    #[test]
    fn test_jsonb_type_enum() {
        // Each variant must round-trip through its u8 tag.
        let variants = [
            (JsonbType::Null, 0u8),
            (JsonbType::Boolean, 1),
            (JsonbType::Int64, 2),
            (JsonbType::Float64, 3),
            (JsonbType::String, 4),
            (JsonbType::Array, 5),
            (JsonbType::Object, 6),
        ];
        for &(variant, tag) in &variants {
            assert_eq!(variant.as_u8(), tag);
            assert_eq!(JsonbType::from_u8(tag), Some(variant));
        }

        // Tags outside the known range are rejected.
        assert_eq!(JsonbType::from_u8(7), None);
    }

    #[tokio::test]
    async fn test_json_extract_udf() -> Result<()> {
        let json = r#"{"user": {"name": "Alice", "age": 30}, "tags": ["python", "ml"]}"#;
        let jsonb_bytes = create_test_jsonb(json);
        let doc = Some(jsonb_bytes.as_slice());

        let jsonb_array = jsonb_column(&[doc, doc, doc, None]);
        let path_array = utf8_column(&["$.user.name", "$.user.age", "$.tags[*]", "$.user.name"]);

        let result = json_extract_impl(&[jsonb_array, path_array])?;
        let string_array = downcast::<StringArray>(&result);

        assert_eq!(string_array.len(), 4);
        assert_eq!(string_array.value(0), "\"Alice\"");
        assert_eq!(string_array.value(1), "30");
        assert_eq!(string_array.value(2), "[\"python\",\"ml\"]");
        assert!(string_array.is_null(3));

        Ok(())
    }

    #[tokio::test]
    async fn test_json_exists_udf() -> Result<()> {
        let json = r#"{"user": {"name": "Alice", "age": 30}, "tags": ["rust", "json"]}"#;
        let jsonb_bytes = create_test_jsonb(json);
        let doc = Some(jsonb_bytes.as_slice());

        let jsonb_array = jsonb_column(&[doc, doc, doc, None]);
        let path_array = utf8_column(&["$.user.name", "$.user.email", "$.tags", "$.any"]);

        let result = json_exists_impl(&[jsonb_array, path_array])?;
        let bool_array = downcast::<BooleanArray>(&result);

        assert_eq!(bool_array.len(), 4);
        assert!(bool_array.value(0));
        assert!(!bool_array.value(1));
        assert!(bool_array.value(2));
        assert!(bool_array.is_null(3));

        Ok(())
    }

    #[tokio::test]
    async fn test_json_get_string_udf() -> Result<()> {
        // Scalars of several kinds are rendered as strings; JSON null maps to null.
        let json = r#"{"str": "hello", "num": 123, "bool": true, "null": null}"#;
        let jsonb_bytes = create_test_jsonb(json);

        let jsonb_array = jsonb_column(&[Some(jsonb_bytes.as_slice()); 4]);
        let key_array = utf8_column(&["str", "num", "bool", "null"]);

        let result = json_get_string_impl(&[jsonb_array, key_array])?;
        let string_array = downcast::<StringArray>(&result);

        assert_eq!(string_array.len(), 4);
        assert_eq!(string_array.value(0), "hello");
        assert_eq!(string_array.value(1), "123");
        assert_eq!(string_array.value(2), "true");
        assert!(string_array.is_null(3));

        Ok(())
    }

    #[tokio::test]
    async fn test_json_get_int_udf() -> Result<()> {
        let json = r#"{"int": 42, "str_num": "99", "bool": true, "0": 7}"#;
        let jsonb_bytes = create_test_jsonb(json);

        let jsonb_array = jsonb_column(&[Some(jsonb_bytes.as_slice()); 4]);
        let key_array = utf8_column(&["int", "str_num", "bool", "0"]);

        let result = json_get_int_impl(&[jsonb_array, key_array])?;
        let int_array = downcast::<Int64Array>(&result);

        assert_eq!(int_array.len(), 4);
        assert_eq!(int_array.value(0), 42);
        assert_eq!(int_array.value(1), 99);
        assert_eq!(int_array.value(2), 1); // jsonb converts true to 1
        assert_eq!(int_array.value(3), 7);

        Ok(())
    }

    #[tokio::test]
    async fn test_json_get_float_udf() -> Result<()> {
        let json = r#"{
            "float_decimal": 1.5,
            "float_neg": -2.5,
            "float_int_value": 1.0,
            "float_exp": 1e2,
            "int_pos": 42,
            "int_neg": -7,
            "big_int": 9223372036854775808,
            "str_num": "3.5",
            "bool_true": true,
            "null_val": null
        }"#;
        let jsonb_bytes = create_test_jsonb(json);

        // One row per key below; the last key is absent from the document.
        let jsonb_array = jsonb_column(&[Some(jsonb_bytes.as_slice()); 11]);
        let key_array = utf8_column(&[
            "float_decimal",
            "float_neg",
            "float_int_value",
            "float_exp",
            "int_pos",
            "int_neg",
            "big_int",
            "str_num",
            "bool_true",
            "null_val",
            "missing",
        ]);

        let result = json_get_float_impl(&[jsonb_array, key_array])?;
        let float_array = downcast::<Float64Array>(&result);

        assert_eq!(float_array.len(), 11);
        assert_eq!(float_array.value(0), 1.5);
        assert_eq!(float_array.value(1), -2.5);
        assert_eq!(float_array.value(2), 1.0);
        assert_eq!(float_array.value(3), 100.0);
        assert_eq!(float_array.value(4), 42.0);
        assert_eq!(float_array.value(5), -7.0);
        // 2^63 is exactly representable in f64.
        assert_eq!(float_array.value(6), 9223372036854775808.0);
        assert_eq!(float_array.value(7), 3.5);
        assert_eq!(float_array.value(8), 1.0); // jsonb converts true to 1.0
        assert!(float_array.is_null(9));
        assert!(float_array.is_null(10));

        Ok(())
    }

    #[tokio::test]
    async fn test_json_get_bool_udf() -> Result<()> {
        let json =
            r#"{"bool_true": true, "bool_false": false, "str_true": "true", "str_false": "false"}"#;
        let jsonb_bytes = create_test_jsonb(json);

        let jsonb_array = jsonb_column(&[Some(jsonb_bytes.as_slice()); 4]);
        let key_array = utf8_column(&["bool_true", "bool_false", "str_true", "str_false"]);

        let result = json_get_bool_impl(&[jsonb_array, key_array])?;
        let bool_array = downcast::<BooleanArray>(&result);

        assert_eq!(bool_array.len(), 4);
        assert!(bool_array.value(0));
        assert!(!bool_array.value(1));
        assert!(bool_array.value(2)); // "true" string converts to true
        assert!(!bool_array.value(3)); // "false" string converts to false

        Ok(())
    }

    #[tokio::test]
    async fn test_json_array_contains_udf() -> Result<()> {
        let json = r#"{"tags": ["rust", "json", "database"], "nums": [1, 2, 3]}"#;
        let jsonb_bytes = create_test_jsonb(json);
        let doc = Some(jsonb_bytes.as_slice());

        let jsonb_array = jsonb_column(&[doc, doc, doc, None]);
        let path_array = utf8_column(&["$.tags", "$.tags", "$.nums", "$.tags"]);
        let value_array = utf8_column(&["rust", "python", "2", "any"]);

        let result = json_array_contains_impl(&[jsonb_array, path_array, value_array])?;
        let bool_array = downcast::<BooleanArray>(&result);

        assert_eq!(bool_array.len(), 4);
        assert!(bool_array.value(0));
        assert!(!bool_array.value(1));
        assert!(bool_array.value(2));
        assert!(bool_array.is_null(3));

        Ok(())
    }

    #[tokio::test]
    async fn test_json_array_length_udf() -> Result<()> {
        let json = r#"{"empty": [], "tags": ["a", "b", "c"], "nested": {"arr": [1, 2]}}"#;
        let jsonb_bytes = create_test_jsonb(json);
        let doc = Some(jsonb_bytes.as_slice());

        let jsonb_array = jsonb_column(&[doc, doc, doc, None]);
        let path_array = utf8_column(&["$.empty", "$.tags", "$.nested.arr", "$.any"]);

        let result = json_array_length_impl(&[jsonb_array, path_array])?;
        let int_array = downcast::<Int64Array>(&result);

        assert_eq!(int_array.len(), 4);
        assert_eq!(int_array.value(0), 0);
        assert_eq!(int_array.value(1), 3);
        assert_eq!(int_array.value(2), 2);
        assert!(int_array.is_null(3));

        Ok(())
    }

    #[tokio::test]
    async fn test_json_extract_with_type() -> Result<()> {
        use arrow_array::{StructArray, UInt8Array};

        // Each document's `$.v` must be tagged with the expected JSONB type.
        let cases: &[(&str, JsonbType)] = &[
            (r#"{"v": 1}"#, JsonbType::Int64),
            (r#"{"v": 0}"#, JsonbType::Int64),
            (r#"{"v": -42}"#, JsonbType::Int64),
            (r#"{"v": 9223372036854775807}"#, JsonbType::Int64), // i64::MAX
            (r#"{"v": 9223372036854775808}"#, JsonbType::Float64), // i64::MAX + 1
            (r#"{"v": 1.0}"#, JsonbType::Float64),
            (r#"{"v": 2.7}"#, JsonbType::Float64),
            (r#"{"v": 1.5}"#, JsonbType::Float64),
            (r#"{"v": -1.5}"#, JsonbType::Float64),
            (r#"{"v": 1e2}"#, JsonbType::Float64),
        ];

        for (json, expected) in cases {
            let bytes = create_test_jsonb(json);
            let jsonb_array = jsonb_column(&[Some(bytes.as_slice())]);
            let path_array = utf8_column(&["$.v"]);

            let result = json_extract_with_type_impl(&[jsonb_array, path_array])?;
            let struct_array = downcast::<StructArray>(&result);
            let type_tags =
                downcast::<UInt8Array>(struct_array.column_by_name("type_tag").unwrap());
            assert_eq!(type_tags.value(0), expected.as_u8());
        }

        Ok(())
    }

    #[tokio::test]
    async fn test_json_extract_with_wildcard() -> Result<()> {
        use arrow_array::{StructArray, UInt8Array};

        let json = r#"{"items": [{"price": 1}, {"price": 2}]}"#;
        let jsonb_bytes = create_test_jsonb(json);

        let jsonb_array = jsonb_column(&[Some(jsonb_bytes.as_slice())]);
        let path_array = utf8_column(&["$.items[*].price"]);

        let result = json_extract_with_type_impl(&[jsonb_array, path_array])?;
        let struct_array = downcast::<StructArray>(&result);
        let values = downcast::<LargeBinaryArray>(struct_array.column_by_name("value").unwrap());
        let type_tags = downcast::<UInt8Array>(struct_array.column_by_name("type_tag").unwrap());

        assert_eq!(jsonb::RawJsonb::new(values.value(0)).to_string(), "[1,2]");
        assert_eq!(type_tags.value(0), JsonbType::Array.as_u8());

        Ok(())
    }

    #[tokio::test]
    async fn test_json_array_access() -> Result<()> {
        let json = r#"["first", "second", "third"]"#;
        let jsonb_bytes = create_test_jsonb(json);

        let jsonb_array = jsonb_column(&[Some(jsonb_bytes.as_slice()); 3]);
        let key_array = utf8_column(&[
            "0", "1", "10", // "10" is out of bounds
        ]);

        let result = json_get_string_impl(&[jsonb_array, key_array])?;
        let string_array = downcast::<StringArray>(&result);

        assert_eq!(string_array.len(), 3);
        assert_eq!(string_array.value(0), "first");
        assert_eq!(string_array.value(1), "second");
        assert!(string_array.is_null(2));

        Ok(())
    }

    #[tokio::test]
    async fn test_json_get_numeric_object_key() -> Result<()> {
        let obj_bytes = create_test_jsonb(r#"{"0": "from_object", "1": 42}"#);
        let arr_bytes = create_test_jsonb(r#"["zero", "one", "two"]"#);

        let jsonb_array = jsonb_column(&[
            Some(obj_bytes.as_slice()),
            Some(arr_bytes.as_slice()),
            Some(arr_bytes.as_slice()),
        ]);
        let key_array = utf8_column(&[
            "0",   // Numeric key on object: looks up the "0" field.
            "0",   // Numeric key on array: looks up index 0.
            "foo", // Non-numeric key on array: no match.
        ]);

        let result = json_get_string_impl(&[jsonb_array, key_array])?;
        let string_array = downcast::<StringArray>(&result);

        assert_eq!(string_array.len(), 3);
        assert_eq!(string_array.value(0), "from_object");
        assert_eq!(string_array.value(1), "zero");
        assert!(string_array.is_null(2));

        Ok(())
    }
}