Skip to main content

lance_datafusion/udf/
json.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use arrow_array::builder::{
5    BooleanBuilder, Float64Builder, Int64Builder, LargeBinaryBuilder, StringBuilder,
6};
7use arrow_array::{Array, ArrayRef, LargeBinaryArray, StringArray};
8use arrow_schema::DataType;
9use datafusion::error::{DataFusionError, Result};
10use datafusion::logical_expr::{ScalarUDF, Volatility};
11use datafusion::physical_plan::ColumnarValue;
12use datafusion::prelude::create_udf;
13use std::sync::Arc;
14
/// Tag identifying which kind of JSON value a JSONB payload holds.
///
/// The discriminants are pinned with `#[repr(u8)]`, so a tag fits in a single
/// byte and can be written to / read back from a `UInt8` column.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JsonbType {
    Null = 0,
    Boolean = 1,
    Int64 = 2,
    Float64 = 3,
    String = 4,
    Array = 5,
    Object = 6,
}

impl JsonbType {
    /// Decode a tag byte back into a variant.
    ///
    /// Returns `None` for any byte that does not correspond to a variant
    /// (i.e. anything greater than 6).
    pub fn from_u8(value: u8) -> Option<Self> {
        // Variants are listed in discriminant order, so the tag doubles as
        // an index into this table.
        const BY_TAG: [JsonbType; 7] = [
            JsonbType::Null,
            JsonbType::Boolean,
            JsonbType::Int64,
            JsonbType::Float64,
            JsonbType::String,
            JsonbType::Array,
            JsonbType::Object,
        ];
        BY_TAG.get(usize::from(value)).copied()
    }

    /// Encode this variant as its one-byte tag (the enum discriminant).
    pub fn as_u8(self) -> u8 {
        self as u8
    }
}
48
49/// Common helper functions and types for JSON UDFs
50mod common {
51    use super::*;
52
53    /// Key type for JSON field access - optimizes field/index parsing
54    #[derive(Debug, Clone)]
55    pub enum KeyType {
56        Field(String),
57        Index(usize),
58    }
59
60    impl KeyType {
61        /// Parse a key string into either a field name or array index (once per operation)
62        pub fn parse(key: &str) -> Self {
63            if let Ok(index) = key.parse::<usize>() {
64                Self::Index(index)
65            } else {
66                Self::Field(key.to_string())
67            }
68        }
69    }
70
71    /// Convert ColumnarValue arguments to ArrayRef vector
72    ///
73    /// Note: This implementation currently broadcasts scalars to arrays.
74    /// Future optimization: handle scalars directly without broadcasting
75    /// to improve performance for scalar inputs.
76    pub fn columnar_to_arrays(args: &[ColumnarValue]) -> Vec<ArrayRef> {
77        args.iter()
78            .map(|arg| match arg {
79                ColumnarValue::Array(arr) => arr.clone(),
80                ColumnarValue::Scalar(scalar) => scalar.to_array().unwrap(),
81            })
82            .collect()
83    }
84
85    /// Create DataFusionError for execution failures (simplified error wrapping)
86    pub fn execution_error(msg: impl Into<String>) -> DataFusionError {
87        DataFusionError::Execution(msg.into())
88    }
89
90    /// Validate argument count for UDF
91    pub fn validate_arg_count(
92        args: &[ArrayRef],
93        expected: usize,
94        function_name: &str,
95    ) -> Result<()> {
96        if args.len() != expected {
97            return Err(execution_error(format!(
98                "{} requires exactly {} arguments",
99                function_name, expected
100            )));
101        }
102        Ok(())
103    }
104
105    /// Extract and validate LargeBinaryArray from first argument
106    pub fn extract_jsonb_array(args: &[ArrayRef]) -> Result<&LargeBinaryArray> {
107        args[0]
108            .as_any()
109            .downcast_ref::<LargeBinaryArray>()
110            .ok_or_else(|| execution_error("First argument must be LargeBinary"))
111    }
112
113    /// Extract and validate StringArray from specified argument
114    pub fn extract_string_array(args: &[ArrayRef], arg_index: usize) -> Result<&StringArray> {
115        args[arg_index]
116            .as_any()
117            .downcast_ref::<StringArray>()
118            .ok_or_else(|| execution_error(format!("Argument {} must be String", arg_index + 1)))
119    }
120
121    /// Get string value at index, handling scalar broadcast case
122    /// When a scalar is converted to an array, it becomes a single-element array
123    /// This function handles accessing that value repeatedly for all rows
124    pub fn get_string_value_at(string_array: &StringArray, index: usize) -> Option<&str> {
125        // Handle scalar broadcast case: if array has only 1 element, always use index 0
126        let actual_index = if string_array.len() == 1 { 0 } else { index };
127
128        if string_array.is_null(actual_index) {
129            None
130        } else {
131            Some(string_array.value(actual_index))
132        }
133    }
134
135    /// Get JSON field/element using pre-parsed key type (avoids repeated parsing)
136    pub fn get_json_value_by_key(
137        raw_jsonb: &jsonb::RawJsonb,
138        key_type: &KeyType,
139    ) -> Result<Option<jsonb::OwnedJsonb>> {
140        match key_type {
141            KeyType::Field(field) => raw_jsonb
142                .get_by_name(field, false)
143                .map_err(|e| execution_error(format!("Failed to get field '{}': {}", field, e))),
144            KeyType::Index(index) => raw_jsonb.get_by_index(*index).map_err(|e| {
145                execution_error(format!("Failed to get array element [{}]: {}", index, e))
146            }),
147        }
148    }
149
150    /// Parse JSONPath with proper error handling (no false returns)
151    pub fn parse_json_path(path: &str) -> Result<jsonb::jsonpath::JsonPath<'_>> {
152        jsonb::jsonpath::parse_json_path(path.as_bytes())
153            .map_err(|e| execution_error(format!("Invalid JSONPath '{}': {}", path, e)))
154    }
155}
156
157/// Convert JSONB value to string using jsonb's built-in serde (strict mode)
158fn json_value_to_string(value: jsonb::OwnedJsonb) -> Result<Option<String>> {
159    let raw_jsonb = value.as_raw();
160
161    // Check for null first
162    if raw_jsonb
163        .is_null()
164        .map_err(|e| common::execution_error(format!("Failed to check null: {}", e)))?
165    {
166        return Ok(None);
167    }
168
169    // Use jsonb's built-in to_str() method - strict conversion
170    raw_jsonb
171        .to_str()
172        .map(Some)
173        .map_err(|e| common::execution_error(format!("Failed to convert to string: {}", e)))
174}
175
176/// Convert JSONB value to integer using jsonb's built-in serde (strict mode)
177fn json_value_to_int(value: jsonb::OwnedJsonb) -> Result<Option<i64>> {
178    let raw_jsonb = value.as_raw();
179
180    // Check for null first
181    if raw_jsonb
182        .is_null()
183        .map_err(|e| common::execution_error(format!("Failed to check null: {}", e)))?
184    {
185        return Ok(None);
186    }
187
188    // Use jsonb's built-in to_i64() method - strict conversion
189    raw_jsonb
190        .to_i64()
191        .map(Some)
192        .map_err(|e| common::execution_error(format!("Failed to convert to integer: {}", e)))
193}
194
195/// Convert JSONB value to float using jsonb's built-in serde (strict mode)
196fn json_value_to_float(value: jsonb::OwnedJsonb) -> Result<Option<f64>> {
197    let raw_jsonb = value.as_raw();
198
199    // Check for null first
200    if raw_jsonb
201        .is_null()
202        .map_err(|e| common::execution_error(format!("Failed to check null: {}", e)))?
203    {
204        return Ok(None);
205    }
206
207    // Use jsonb's built-in to_f64() method - strict conversion
208    raw_jsonb
209        .to_f64()
210        .map(Some)
211        .map_err(|e| common::execution_error(format!("Failed to convert to float: {}", e)))
212}
213
214/// Convert JSONB value to boolean using jsonb's built-in serde (strict mode)
215fn json_value_to_bool(value: jsonb::OwnedJsonb) -> Result<Option<bool>> {
216    let raw_jsonb = value.as_raw();
217
218    // Check for null first
219    if raw_jsonb
220        .is_null()
221        .map_err(|e| common::execution_error(format!("Failed to check null: {}", e)))?
222    {
223        return Ok(None);
224    }
225
226    // Use jsonb's built-in to_bool() method - strict conversion
227    raw_jsonb
228        .to_bool()
229        .map(Some)
230        .map_err(|e| common::execution_error(format!("Failed to convert to boolean: {}", e)))
231}
232
233/// Create the json_extract UDF for extracting JSONPath from JSON data
234///
235/// # Arguments
236/// * First parameter: JSONB binary data (LargeBinary)
237/// * Second parameter: JSONPath expression as string (Utf8)
238///
239/// # Returns
240/// String representation of the extracted value, or null if path not found
241pub fn json_extract_udf() -> ScalarUDF {
242    create_udf(
243        "json_extract",
244        vec![DataType::LargeBinary, DataType::Utf8],
245        DataType::Utf8,
246        Volatility::Immutable,
247        Arc::new(json_extract_columnar_impl),
248    )
249}
250
251/// Create the json_extract_with_type UDF that returns JSONB bytes with type information
252///
253/// # Arguments
254/// * First parameter: JSONB binary data (LargeBinary)
255/// * Second parameter: JSONPath expression as string (Utf8)
256///
257/// # Returns
258/// A struct with two fields:
259/// - value: LargeBinary (the extracted JSONB value)
260/// - type_tag: UInt8 (type information: 0=null, 1=bool, 2=int64, 3=float64, 4=string, 5=array, 6=object)
261pub fn json_extract_with_type_udf() -> ScalarUDF {
262    use arrow_schema::Fields;
263
264    let return_type = DataType::Struct(Fields::from(vec![
265        arrow_schema::Field::new("value", DataType::LargeBinary, true),
266        arrow_schema::Field::new("type_tag", DataType::UInt8, false),
267    ]));
268
269    create_udf(
270        "json_extract_with_type",
271        vec![DataType::LargeBinary, DataType::Utf8],
272        return_type,
273        Volatility::Immutable,
274        Arc::new(json_extract_with_type_columnar_impl),
275    )
276}
277
278/// Implementation of json_extract function with ColumnarValue
279fn json_extract_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
280    let arrays = common::columnar_to_arrays(args);
281    let result = json_extract_impl(&arrays)?;
282    Ok(ColumnarValue::Array(result))
283}
284
285/// Implementation of json_extract_with_type function with ColumnarValue
286fn json_extract_with_type_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
287    let arrays = common::columnar_to_arrays(args);
288    let result = json_extract_with_type_impl(&arrays)?;
289    Ok(ColumnarValue::Array(result))
290}
291
292/// Implementation of json_extract function
293fn json_extract_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
294    common::validate_arg_count(args, 2, "json_extract")?;
295
296    let jsonb_array = common::extract_jsonb_array(args)?;
297    let path_array = common::extract_string_array(args, 1)?;
298    let mut builder = StringBuilder::with_capacity(jsonb_array.len(), 1024);
299
300    for i in 0..jsonb_array.len() {
301        if jsonb_array.is_null(i) {
302            builder.append_null();
303        } else if let Some(path) = common::get_string_value_at(path_array, i) {
304            let jsonb_bytes = jsonb_array.value(i);
305            match extract_json_path(jsonb_bytes, path)? {
306                Some(value) => builder.append_value(&value),
307                None => builder.append_null(),
308            }
309        } else {
310            builder.append_null();
311        }
312    }
313
314    Ok(Arc::new(builder.finish()))
315}
316
317/// Implementation of json_extract_with_type function
318fn json_extract_with_type_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
319    use arrow_array::StructArray;
320    use arrow_array::builder::{LargeBinaryBuilder, UInt8Builder};
321
322    common::validate_arg_count(args, 2, "json_extract_with_type")?;
323
324    let jsonb_array = common::extract_jsonb_array(args)?;
325    let path_array = common::extract_string_array(args, 1)?;
326
327    let mut value_builder = LargeBinaryBuilder::with_capacity(jsonb_array.len(), 1024);
328    let mut type_builder = UInt8Builder::with_capacity(jsonb_array.len());
329
330    for i in 0..jsonb_array.len() {
331        if jsonb_array.is_null(i) {
332            value_builder.append_null();
333            type_builder.append_value(JsonbType::Null.as_u8());
334        } else if let Some(path) = common::get_string_value_at(path_array, i) {
335            let jsonb_bytes = jsonb_array.value(i);
336            match extract_json_path_with_type(jsonb_bytes, path)? {
337                Some((value_bytes, type_tag)) => {
338                    value_builder.append_value(&value_bytes);
339                    type_builder.append_value(type_tag);
340                }
341                None => {
342                    value_builder.append_null();
343                    type_builder.append_value(JsonbType::Null.as_u8());
344                }
345            }
346        } else {
347            value_builder.append_null();
348            type_builder.append_value(JsonbType::Null.as_u8());
349        }
350    }
351
352    // Create struct array with two fields
353    let value_array = Arc::new(value_builder.finish()) as ArrayRef;
354    let type_array = Arc::new(type_builder.finish()) as ArrayRef;
355
356    let struct_array = StructArray::from(vec![
357        (
358            Arc::new(arrow_schema::Field::new(
359                "value",
360                DataType::LargeBinary,
361                true,
362            )),
363            value_array,
364        ),
365        (
366            Arc::new(arrow_schema::Field::new("type_tag", DataType::UInt8, false)),
367            type_array,
368        ),
369    ]);
370
371    Ok(Arc::new(struct_array))
372}
373
374/// Extract value from JSONB using JSONPath and return with type information
375/// Returns (JSONB bytes, type_tag) where type_tag represents the JsonbType
376fn extract_json_path_with_type(jsonb_bytes: &[u8], path: &str) -> Result<Option<(Vec<u8>, u8)>> {
377    let json_path = common::parse_json_path(path)?;
378
379    let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
380    let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb);
381    match selector.select_values(&json_path) {
382        Ok(values) => {
383            if values.is_empty() {
384                Ok(None)
385            } else {
386                // Get the first matched value
387                let owned_value = &values[0];
388                let raw = owned_value.as_raw();
389
390                // Determine type using JsonbType enum
391                let jsonb_type = if raw.is_null().unwrap_or(false) {
392                    JsonbType::Null
393                } else if raw.is_boolean().unwrap_or(false) {
394                    JsonbType::Boolean
395                } else if raw.is_number().unwrap_or(false) {
396                    let is_float_storage =
397                        matches!(raw.as_number(), Ok(Some(jsonb::Number::Float64(_))));
398                    if !is_float_storage && raw.is_i64().unwrap_or(false) {
399                        JsonbType::Int64
400                    } else {
401                        JsonbType::Float64
402                    }
403                } else if raw.is_string().unwrap_or(false) {
404                    JsonbType::String
405                } else if raw.is_array().unwrap_or(false) {
406                    JsonbType::Array
407                } else if raw.is_object().unwrap_or(false) {
408                    JsonbType::Object
409                } else {
410                    JsonbType::String // default to string
411                };
412
413                // Return the JSONB bytes and type tag as u8
414                Ok(Some((owned_value.clone().to_vec(), jsonb_type.as_u8())))
415            }
416        }
417        Err(e) => Err(common::execution_error(format!(
418            "Failed to select values from path '{}': {}",
419            path, e
420        ))),
421    }
422}
423
424/// Extract value from JSONB using JSONPath
425///
426/// Note: Uses `select_values` instead of the deprecated `select_by_path` method
427fn extract_json_path(jsonb_bytes: &[u8], path: &str) -> Result<Option<String>> {
428    let json_path = common::parse_json_path(path)?;
429
430    let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
431    let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb);
432    match selector.select_values(&json_path) {
433        Ok(values) => {
434            if values.is_empty() {
435                Ok(None)
436            } else {
437                // Return the first matched value
438                Ok(Some(values[0].to_string()))
439            }
440        }
441        Err(e) => Err(common::execution_error(format!(
442            "Failed to select values from path '{}': {}",
443            path, e
444        ))),
445    }
446}
447
448/// Create the json_exists UDF for checking if a JSONPath exists
449///
450/// # Arguments
451/// * First parameter: JSONB binary data (LargeBinary)
452/// * Second parameter: JSONPath expression as string (Utf8)
453///
454/// # Returns
455/// Boolean indicating whether the path exists in the JSON data
456pub fn json_exists_udf() -> ScalarUDF {
457    create_udf(
458        "json_exists",
459        vec![DataType::LargeBinary, DataType::Utf8],
460        DataType::Boolean,
461        Volatility::Immutable,
462        Arc::new(json_exists_columnar_impl),
463    )
464}
465
466/// Implementation of json_exists function with ColumnarValue
467fn json_exists_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
468    let arrays = common::columnar_to_arrays(args);
469    let result = json_exists_impl(&arrays)?;
470    Ok(ColumnarValue::Array(result))
471}
472
473/// Implementation of json_exists function
474fn json_exists_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
475    common::validate_arg_count(args, 2, "json_exists")?;
476
477    let jsonb_array = common::extract_jsonb_array(args)?;
478    let path_array = common::extract_string_array(args, 1)?;
479
480    let mut builder = BooleanBuilder::with_capacity(jsonb_array.len());
481
482    for i in 0..jsonb_array.len() {
483        if jsonb_array.is_null(i) {
484            builder.append_null();
485        } else if let Some(path) = common::get_string_value_at(path_array, i) {
486            let jsonb_bytes = jsonb_array.value(i);
487            let exists = check_json_path_exists(jsonb_bytes, path)?;
488            builder.append_value(exists);
489        } else {
490            builder.append_null();
491        }
492    }
493
494    Ok(Arc::new(builder.finish()))
495}
496
497/// Check if a JSONPath exists in JSONB
498fn check_json_path_exists(jsonb_bytes: &[u8], path: &str) -> Result<bool> {
499    let json_path = common::parse_json_path(path)?;
500
501    let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
502    let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb);
503    match selector.exists(&json_path) {
504        Ok(exists) => Ok(exists),
505        Err(e) => Err(common::execution_error(format!(
506            "Failed to check existence of path '{}': {}",
507            path, e
508        ))),
509    }
510}
511
512/// Create the json_get UDF for getting a field value as JSON string
513///
514/// # Arguments
515/// * First parameter: JSONB binary data (LargeBinary)
516/// * Second parameter: Field name or array index as string (Utf8)
517///
518/// # Returns
519/// Raw JSONB bytes of the field value, or null if not found
520pub fn json_get_udf() -> ScalarUDF {
521    create_udf(
522        "json_get",
523        vec![DataType::LargeBinary, DataType::Utf8],
524        DataType::LargeBinary,
525        Volatility::Immutable,
526        Arc::new(json_get_columnar_impl),
527    )
528}
529
530/// Implementation of json_get function with ColumnarValue
531fn json_get_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
532    let arrays = common::columnar_to_arrays(args);
533    let result = json_get_impl(&arrays)?;
534    Ok(ColumnarValue::Array(result))
535}
536
537/// Implementation of json_get function
538fn json_get_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
539    common::validate_arg_count(args, 2, "json_get")?;
540
541    let jsonb_array = common::extract_jsonb_array(args)?;
542    let key_array = common::extract_string_array(args, 1)?;
543
544    let mut builder = LargeBinaryBuilder::with_capacity(jsonb_array.len(), 0);
545
546    for i in 0..jsonb_array.len() {
547        if jsonb_array.is_null(i) {
548            builder.append_null();
549        } else if let Some(key) = common::get_string_value_at(key_array, i) {
550            let jsonb_bytes = jsonb_array.value(i);
551            let key_type = common::KeyType::parse(key);
552            let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
553
554            match common::get_json_value_by_key(&raw_jsonb, &key_type)? {
555                Some(value) => builder.append_value(value.as_raw().as_ref()),
556                None => builder.append_null(),
557            }
558        } else {
559            builder.append_null();
560        }
561    }
562
563    Ok(Arc::new(builder.finish()))
564}
565
566/// Create the json_get_string UDF for getting a string value
567///
568/// # Arguments
569/// * First parameter: JSONB binary data (LargeBinary)
570/// * Second parameter: Field name or array index as string (Utf8)
571///
572/// # Returns
573/// String value with type coercion (numbers/booleans converted to strings)
574pub fn json_get_string_udf() -> ScalarUDF {
575    create_udf(
576        "json_get_string",
577        vec![DataType::LargeBinary, DataType::Utf8],
578        DataType::Utf8,
579        Volatility::Immutable,
580        Arc::new(json_get_string_columnar_impl),
581    )
582}
583
584/// Implementation of json_get_string function with ColumnarValue
585fn json_get_string_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
586    let arrays = common::columnar_to_arrays(args);
587    let result = json_get_string_impl(&arrays)?;
588    Ok(ColumnarValue::Array(result))
589}
590
591/// Implementation of json_get_string function
592fn json_get_string_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
593    common::validate_arg_count(args, 2, "json_get_string")?;
594
595    let jsonb_array = common::extract_jsonb_array(args)?;
596    let key_array = common::extract_string_array(args, 1)?;
597
598    let mut builder = StringBuilder::with_capacity(jsonb_array.len(), 1024);
599
600    for i in 0..jsonb_array.len() {
601        if jsonb_array.is_null(i) {
602            builder.append_null();
603        } else if let Some(key) = common::get_string_value_at(key_array, i) {
604            let jsonb_bytes = jsonb_array.value(i);
605            let key_type = common::KeyType::parse(key);
606            let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
607
608            match common::get_json_value_by_key(&raw_jsonb, &key_type)? {
609                Some(value) => match json_value_to_string(value)? {
610                    Some(string_val) => builder.append_value(&string_val),
611                    None => builder.append_null(),
612                },
613                None => builder.append_null(),
614            }
615        } else {
616            builder.append_null();
617        }
618    }
619
620    Ok(Arc::new(builder.finish()))
621}
622
623/// Create the json_get_int UDF for getting an integer value
624///
625/// # Arguments
626/// * First parameter: JSONB binary data (LargeBinary)
627/// * Second parameter: Field name or array index as string (Utf8)
628///
629/// # Returns
630/// Integer value with type coercion (strings/floats/booleans converted to int)
631pub fn json_get_int_udf() -> ScalarUDF {
632    create_udf(
633        "json_get_int",
634        vec![DataType::LargeBinary, DataType::Utf8],
635        DataType::Int64,
636        Volatility::Immutable,
637        Arc::new(json_get_int_columnar_impl),
638    )
639}
640
641/// Implementation of json_get_int function with ColumnarValue
642fn json_get_int_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
643    let arrays = common::columnar_to_arrays(args);
644    let result = json_get_int_impl(&arrays)?;
645    Ok(ColumnarValue::Array(result))
646}
647
648/// Implementation of json_get_int function
649fn json_get_int_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
650    common::validate_arg_count(args, 2, "json_get_int")?;
651
652    let jsonb_array = common::extract_jsonb_array(args)?;
653    let key_array = common::extract_string_array(args, 1)?;
654
655    let mut builder = Int64Builder::with_capacity(jsonb_array.len());
656
657    for i in 0..jsonb_array.len() {
658        if jsonb_array.is_null(i) {
659            builder.append_null();
660        } else if let Some(key) = common::get_string_value_at(key_array, i) {
661            let jsonb_bytes = jsonb_array.value(i);
662            let key_type = common::KeyType::parse(key);
663            let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
664
665            match common::get_json_value_by_key(&raw_jsonb, &key_type)? {
666                Some(value) => match json_value_to_int(value)? {
667                    Some(int_val) => builder.append_value(int_val),
668                    None => builder.append_null(),
669                },
670                None => builder.append_null(),
671            }
672        } else {
673            builder.append_null();
674        }
675    }
676
677    Ok(Arc::new(builder.finish()))
678}
679
680/// Create the json_get_float UDF for getting a float value
681///
682/// # Arguments
683/// * First parameter: JSONB binary data (LargeBinary)
684/// * Second parameter: Field name or array index as string (Utf8)
685///
686/// # Returns
687/// Float value with type coercion (strings/integers/booleans converted to float)
688pub fn json_get_float_udf() -> ScalarUDF {
689    create_udf(
690        "json_get_float",
691        vec![DataType::LargeBinary, DataType::Utf8],
692        DataType::Float64,
693        Volatility::Immutable,
694        Arc::new(json_get_float_columnar_impl),
695    )
696}
697
698/// Implementation of json_get_float function with ColumnarValue
699fn json_get_float_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
700    let arrays = common::columnar_to_arrays(args);
701    let result = json_get_float_impl(&arrays)?;
702    Ok(ColumnarValue::Array(result))
703}
704
705/// Implementation of json_get_float function
706fn json_get_float_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
707    common::validate_arg_count(args, 2, "json_get_float")?;
708
709    let jsonb_array = common::extract_jsonb_array(args)?;
710    let key_array = common::extract_string_array(args, 1)?;
711
712    let mut builder = Float64Builder::with_capacity(jsonb_array.len());
713
714    for i in 0..jsonb_array.len() {
715        if jsonb_array.is_null(i) {
716            builder.append_null();
717        } else if let Some(key) = common::get_string_value_at(key_array, i) {
718            let jsonb_bytes = jsonb_array.value(i);
719            let key_type = common::KeyType::parse(key);
720            let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
721
722            match common::get_json_value_by_key(&raw_jsonb, &key_type)? {
723                Some(value) => match json_value_to_float(value)? {
724                    Some(float_val) => builder.append_value(float_val),
725                    None => builder.append_null(),
726                },
727                None => builder.append_null(),
728            }
729        } else {
730            builder.append_null();
731        }
732    }
733
734    Ok(Arc::new(builder.finish()))
735}
736
737/// Create the json_get_bool UDF for getting a boolean value
738///
739/// # Arguments
740/// * First parameter: JSONB binary data (LargeBinary)
741/// * Second parameter: Field name or array index as string (Utf8)
742///
743/// # Returns
744/// Boolean value with flexible type coercion (strings like 'true'/'yes'/'1' become true)
745pub fn json_get_bool_udf() -> ScalarUDF {
746    create_udf(
747        "json_get_bool",
748        vec![DataType::LargeBinary, DataType::Utf8],
749        DataType::Boolean,
750        Volatility::Immutable,
751        Arc::new(json_get_bool_columnar_impl),
752    )
753}
754
755/// Implementation of json_get_bool function with ColumnarValue
756fn json_get_bool_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
757    let arrays = common::columnar_to_arrays(args);
758    let result = json_get_bool_impl(&arrays)?;
759    Ok(ColumnarValue::Array(result))
760}
761
762/// Implementation of json_get_bool function
763fn json_get_bool_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
764    common::validate_arg_count(args, 2, "json_get_bool")?;
765
766    let jsonb_array = common::extract_jsonb_array(args)?;
767    let key_array = common::extract_string_array(args, 1)?;
768
769    let mut builder = BooleanBuilder::with_capacity(jsonb_array.len());
770
771    for i in 0..jsonb_array.len() {
772        if jsonb_array.is_null(i) {
773            builder.append_null();
774        } else if let Some(key) = common::get_string_value_at(key_array, i) {
775            let jsonb_bytes = jsonb_array.value(i);
776            let key_type = common::KeyType::parse(key);
777            let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
778
779            match common::get_json_value_by_key(&raw_jsonb, &key_type)? {
780                Some(value) => match json_value_to_bool(value)? {
781                    Some(bool_val) => builder.append_value(bool_val),
782                    None => builder.append_null(),
783                },
784                None => builder.append_null(),
785            }
786        } else {
787            builder.append_null();
788        }
789    }
790
791    Ok(Arc::new(builder.finish()))
792}
793
794/// Create the json_array_contains UDF for checking if array contains a value
795///
796/// # Arguments
797/// * First parameter: JSONB binary data (LargeBinary)
798/// * Second parameter: JSONPath to array location (Utf8)
799/// * Third parameter: Value to search for as string (Utf8)
800///
801/// # Returns
802/// Boolean indicating whether the array contains the specified value
803pub fn json_array_contains_udf() -> ScalarUDF {
804    create_udf(
805        "json_array_contains",
806        vec![DataType::LargeBinary, DataType::Utf8, DataType::Utf8],
807        DataType::Boolean,
808        Volatility::Immutable,
809        Arc::new(json_array_contains_columnar_impl),
810    )
811}
812
813/// Implementation of json_array_contains function with ColumnarValue
814fn json_array_contains_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
815    let arrays = common::columnar_to_arrays(args);
816    let result = json_array_contains_impl(&arrays)?;
817    Ok(ColumnarValue::Array(result))
818}
819
820/// Implementation of json_array_contains function
821fn json_array_contains_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
822    common::validate_arg_count(args, 3, "json_array_contains")?;
823
824    let jsonb_array = common::extract_jsonb_array(args)?;
825    let path_array = common::extract_string_array(args, 1)?;
826    let value_array = common::extract_string_array(args, 2)?;
827
828    let mut builder = BooleanBuilder::with_capacity(jsonb_array.len());
829
830    for i in 0..jsonb_array.len() {
831        if jsonb_array.is_null(i) {
832            builder.append_null();
833        } else {
834            let path = common::get_string_value_at(path_array, i);
835            let value = common::get_string_value_at(value_array, i);
836
837            match (path, value) {
838                (Some(p), Some(v)) => {
839                    let jsonb_bytes = jsonb_array.value(i);
840                    let contains = check_array_contains(jsonb_bytes, p, v)?;
841                    builder.append_value(contains);
842                }
843                _ => builder.append_null(),
844            }
845        }
846    }
847
848    Ok(Arc::new(builder.finish()))
849}
850
851/// Check if a JSON array at path contains a value
852fn check_array_contains(jsonb_bytes: &[u8], path: &str, value: &str) -> Result<bool> {
853    let json_path = common::parse_json_path(path)?;
854
855    let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
856    let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb);
857    match selector.select_values(&json_path) {
858        Ok(values) => {
859            for v in values {
860                // Convert to raw JSONB for direct access
861                let raw = v.as_raw();
862                // Check if it's an array by trying to iterate
863                let mut index = 0;
864                loop {
865                    match raw.get_by_index(index) {
866                        Ok(Some(elem)) => {
867                            let elem_str = elem.to_string();
868                            // Compare as JSON strings (with quotes for strings)
869                            if elem_str == value || elem_str == format!("\"{}\"", value) {
870                                return Ok(true);
871                            }
872                            index += 1;
873                        }
874                        Ok(None) => break, // End of array
875                        Err(_) => break,   // Not an array or error
876                    }
877                }
878            }
879            Ok(false)
880        }
881        Err(e) => Err(common::execution_error(format!(
882            "Failed to check array contains at path '{}': {}",
883            path, e
884        ))),
885    }
886}
887
888/// Create the json_array_length UDF for getting array length
889///
890/// # Arguments
891/// * First parameter: JSONB binary data (LargeBinary)
892/// * Second parameter: JSONPath to array location (Utf8)
893///
894/// # Returns
895/// Integer length of the JSON array, or null if path doesn't point to an array
896pub fn json_array_length_udf() -> ScalarUDF {
897    create_udf(
898        "json_array_length",
899        vec![DataType::LargeBinary, DataType::Utf8],
900        DataType::Int64,
901        Volatility::Immutable,
902        Arc::new(json_array_length_columnar_impl),
903    )
904}
905
906/// Implementation of json_array_length function with ColumnarValue
907fn json_array_length_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
908    let arrays = common::columnar_to_arrays(args);
909    let result = json_array_length_impl(&arrays)?;
910    Ok(ColumnarValue::Array(result))
911}
912
913/// Implementation of json_array_length function
914fn json_array_length_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
915    common::validate_arg_count(args, 2, "json_array_length")?;
916
917    let jsonb_array = common::extract_jsonb_array(args)?;
918    let path_array = common::extract_string_array(args, 1)?;
919
920    let mut builder = Int64Builder::with_capacity(jsonb_array.len());
921
922    for i in 0..jsonb_array.len() {
923        if jsonb_array.is_null(i) {
924            builder.append_null();
925        } else if let Some(path) = common::get_string_value_at(path_array, i) {
926            let jsonb_bytes = jsonb_array.value(i);
927            match get_array_length(jsonb_bytes, path)? {
928                Some(len) => builder.append_value(len),
929                None => builder.append_null(),
930            }
931        } else {
932            builder.append_null();
933        }
934    }
935
936    Ok(Arc::new(builder.finish()))
937}
938
939/// Get the length of a JSON array at path
940fn get_array_length(jsonb_bytes: &[u8], path: &str) -> Result<Option<i64>> {
941    let json_path = common::parse_json_path(path)?;
942
943    let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
944    let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb);
945    match selector.select_values(&json_path) {
946        Ok(values) => {
947            if values.is_empty() {
948                return Ok(None);
949            }
950            let first = &values[0];
951            let raw = first.as_raw();
952
953            // Count array elements by iterating
954            let mut count = 0;
955            loop {
956                match raw.get_by_index(count) {
957                    Ok(Some(_)) => count += 1,
958                    Ok(None) => break, // End of array
959                    Err(_) => {
960                        // Not an array
961                        if count == 0 {
962                            return Err(common::execution_error(format!(
963                                "Path '{}' does not point to an array",
964                                path
965                            )));
966                        }
967                        break;
968                    }
969                }
970            }
971            Ok(Some(count as i64))
972        }
973        Err(e) => Err(common::execution_error(format!(
974            "Failed to get array length at path '{}': {}",
975            path, e
976        ))),
977    }
978}
979
#[cfg(test)]
mod tests {
    //! Unit tests for the JSON UDF array implementations.
    //!
    //! Every `*_impl` function under test is synchronous, so the tests are
    //! plain `#[test]` functions and need no async runtime.

    use super::*;
    use arrow_array::builder::LargeBinaryBuilder;
    use arrow_array::{BooleanArray, Float64Array, Int64Array};

    /// Encode JSON text as JSONB bytes; panics if the text does not parse.
    fn create_test_jsonb(json_str: &str) -> Vec<u8> {
        jsonb::parse_value(json_str.as_bytes())
            .expect("test JSON must parse")
            .to_vec()
    }

    /// Build a LargeBinary column; `None` entries become null rows.
    fn jsonb_column(rows: &[Option<&[u8]>]) -> ArrayRef {
        let mut builder = LargeBinaryBuilder::new();
        for row in rows {
            match row {
                Some(bytes) => builder.append_value(bytes),
                None => builder.append_null(),
            }
        }
        Arc::new(builder.finish())
    }

    /// Build a Utf8 column without nulls from string literals.
    fn utf8_column(values: &[&str]) -> ArrayRef {
        Arc::new(StringArray::from(values.to_vec()))
    }

    /// Downcast a dynamically typed column to its concrete array type.
    fn downcast<T: 'static>(column: &ArrayRef) -> &T {
        column
            .as_any()
            .downcast_ref::<T>()
            .expect("column has an unexpected array type")
    }

    #[test]
    fn test_jsonb_type_enum() {
        // Each variant must round-trip through its u8 tag.
        let tagged = [
            (JsonbType::Null, 0u8),
            (JsonbType::Boolean, 1),
            (JsonbType::Int64, 2),
            (JsonbType::Float64, 3),
            (JsonbType::String, 4),
            (JsonbType::Array, 5),
            (JsonbType::Object, 6),
        ];
        for &(variant, tag) in tagged.iter() {
            assert_eq!(variant.as_u8(), tag);
            assert_eq!(JsonbType::from_u8(tag), Some(variant));
        }

        // Tags outside the known range are rejected.
        assert_eq!(JsonbType::from_u8(7), None);
    }

    #[test]
    fn test_json_extract_udf() -> Result<()> {
        let doc = create_test_jsonb(r#"{"user": {"name": "Alice", "age": 30}}"#);

        let jsonb_array = jsonb_column(&[Some(doc.as_slice()), Some(doc.as_slice()), None]);
        let path_array = utf8_column(&["$.user.name", "$.user.age", "$.user.name"]);

        let result = json_extract_impl(&[jsonb_array, path_array])?;
        let strings: &StringArray = downcast(&result);

        assert_eq!(strings.len(), 3);
        assert_eq!(strings.value(0), "\"Alice\"");
        assert_eq!(strings.value(1), "30");
        assert!(strings.is_null(2));

        Ok(())
    }

    #[test]
    fn test_json_exists_udf() -> Result<()> {
        let doc = create_test_jsonb(
            r#"{"user": {"name": "Alice", "age": 30}, "tags": ["rust", "json"]}"#,
        );

        let jsonb_array = jsonb_column(&[
            Some(doc.as_slice()),
            Some(doc.as_slice()),
            Some(doc.as_slice()),
            None,
        ]);
        let path_array = utf8_column(&["$.user.name", "$.user.email", "$.tags", "$.any"]);

        let result = json_exists_impl(&[jsonb_array, path_array])?;
        let flags: &BooleanArray = downcast(&result);

        assert_eq!(flags.len(), 4);
        assert!(flags.value(0));
        assert!(!flags.value(1));
        assert!(flags.value(2));
        assert!(flags.is_null(3));

        Ok(())
    }

    #[test]
    fn test_json_get_string_udf() -> Result<()> {
        // Scalars of every kind are rendered as strings; JSON null stays null.
        let doc = create_test_jsonb(r#"{"str": "hello", "num": 123, "bool": true, "null": null}"#);

        let rows = vec![Some(doc.as_slice()); 4];
        let jsonb_array = jsonb_column(&rows);
        let key_array = utf8_column(&["str", "num", "bool", "null"]);

        let result = json_get_string_impl(&[jsonb_array, key_array])?;
        let strings: &StringArray = downcast(&result);

        assert_eq!(strings.len(), 4);
        assert_eq!(strings.value(0), "hello");
        assert_eq!(strings.value(1), "123");
        assert_eq!(strings.value(2), "true");
        assert!(strings.is_null(3));

        Ok(())
    }

    #[test]
    fn test_json_get_int_udf() -> Result<()> {
        let doc = create_test_jsonb(r#"{"int": 42, "str_num": "99", "bool": true}"#);

        let rows = vec![Some(doc.as_slice()); 3];
        let jsonb_array = jsonb_column(&rows);
        let key_array = utf8_column(&["int", "str_num", "bool"]);

        let result = json_get_int_impl(&[jsonb_array, key_array])?;
        let ints: &Int64Array = downcast(&result);

        assert_eq!(ints.len(), 3);
        assert_eq!(ints.value(0), 42);
        assert_eq!(ints.value(1), 99);
        assert_eq!(ints.value(2), 1); // jsonb converts true to 1

        Ok(())
    }

    #[test]
    fn test_json_get_float_udf() -> Result<()> {
        let json = r#"{
            "float_decimal": 1.5,
            "float_neg": -2.5,
            "float_int_value": 1.0,
            "float_exp": 1e2,
            "int_pos": 42,
            "int_neg": -7,
            "big_int": 9223372036854775808,
            "str_num": "3.5",
            "bool_true": true,
            "null_val": null
        }"#;
        let doc = create_test_jsonb(json);

        // Key to look up, paired with the expected value (`None` means a null row).
        let expectations: [(&str, Option<f64>); 11] = [
            ("float_decimal", Some(1.5)),
            ("float_neg", Some(-2.5)),
            ("float_int_value", Some(1.0)),
            ("float_exp", Some(100.0)),
            ("int_pos", Some(42.0)),
            ("int_neg", Some(-7.0)),
            // 2^63 is exactly representable in f64.
            ("big_int", Some(9223372036854775808.0)),
            ("str_num", Some(3.5)),
            // jsonb converts true to 1.0
            ("bool_true", Some(1.0)),
            ("null_val", None),
            ("missing", None),
        ];

        let rows = vec![Some(doc.as_slice()); expectations.len()];
        let jsonb_array = jsonb_column(&rows);
        let keys: Vec<&str> = expectations.iter().map(|&(key, _)| key).collect();
        let key_array = utf8_column(&keys);

        let result = json_get_float_impl(&[jsonb_array, key_array])?;
        let floats: &Float64Array = downcast(&result);

        assert_eq!(floats.len(), 11);
        for (row, &(key, expected)) in expectations.iter().enumerate() {
            match expected {
                Some(want) => assert_eq!(floats.value(row), want, "key {}", key),
                None => assert!(floats.is_null(row), "key {}", key),
            }
        }

        Ok(())
    }

    #[test]
    fn test_json_get_bool_udf() -> Result<()> {
        let json =
            r#"{"bool_true": true, "bool_false": false, "str_true": "true", "str_false": "false"}"#;
        let doc = create_test_jsonb(json);

        let rows = vec![Some(doc.as_slice()); 4];
        let jsonb_array = jsonb_column(&rows);
        let key_array = utf8_column(&["bool_true", "bool_false", "str_true", "str_false"]);

        let result = json_get_bool_impl(&[jsonb_array, key_array])?;
        let flags: &BooleanArray = downcast(&result);

        assert_eq!(flags.len(), 4);
        assert!(flags.value(0));
        assert!(!flags.value(1));
        assert!(flags.value(2)); // "true" string converts to true
        assert!(!flags.value(3)); // "false" string converts to false

        Ok(())
    }

    #[test]
    fn test_json_array_contains_udf() -> Result<()> {
        let doc = create_test_jsonb(r#"{"tags": ["rust", "json", "database"], "nums": [1, 2, 3]}"#);

        let jsonb_array = jsonb_column(&[
            Some(doc.as_slice()),
            Some(doc.as_slice()),
            Some(doc.as_slice()),
            None,
        ]);
        let path_array = utf8_column(&["$.tags", "$.tags", "$.nums", "$.tags"]);
        let value_array = utf8_column(&["rust", "python", "2", "any"]);

        let result = json_array_contains_impl(&[jsonb_array, path_array, value_array])?;
        let flags: &BooleanArray = downcast(&result);

        assert_eq!(flags.len(), 4);
        assert!(flags.value(0));
        assert!(!flags.value(1));
        assert!(flags.value(2));
        assert!(flags.is_null(3));

        Ok(())
    }

    #[test]
    fn test_json_array_length_udf() -> Result<()> {
        let doc = create_test_jsonb(
            r#"{"empty": [], "tags": ["a", "b", "c"], "nested": {"arr": [1, 2]}}"#,
        );

        let jsonb_array = jsonb_column(&[
            Some(doc.as_slice()),
            Some(doc.as_slice()),
            Some(doc.as_slice()),
            None,
        ]);
        let path_array = utf8_column(&["$.empty", "$.tags", "$.nested.arr", "$.any"]);

        let result = json_array_length_impl(&[jsonb_array, path_array])?;
        let lengths: &Int64Array = downcast(&result);

        assert_eq!(lengths.len(), 4);
        assert_eq!(lengths.value(0), 0);
        assert_eq!(lengths.value(1), 3);
        assert_eq!(lengths.value(2), 2);
        assert!(lengths.is_null(3));

        Ok(())
    }

    #[test]
    fn test_json_extract_with_type() -> Result<()> {
        use arrow_array::StructArray;
        use arrow_array::UInt8Array;

        let cases: &[(&str, JsonbType)] = &[
            (r#"{"v": 1}"#, JsonbType::Int64),
            (r#"{"v": 0}"#, JsonbType::Int64),
            (r#"{"v": -42}"#, JsonbType::Int64),
            (r#"{"v": 9223372036854775807}"#, JsonbType::Int64), // i64::MAX
            (r#"{"v": 9223372036854775808}"#, JsonbType::Float64), // i64::MAX + 1
            (r#"{"v": 1.0}"#, JsonbType::Float64),
            (r#"{"v": 2.7}"#, JsonbType::Float64),
            (r#"{"v": 1.5}"#, JsonbType::Float64),
            (r#"{"v": -1.5}"#, JsonbType::Float64),
            (r#"{"v": 1e2}"#, JsonbType::Float64),
        ];

        for (json, expected) in cases {
            let doc = create_test_jsonb(json);
            let jsonb_array = jsonb_column(&[Some(doc.as_slice())]);
            let path_array = utf8_column(&["$.v"]);

            let result = json_extract_with_type_impl(&[jsonb_array, path_array])?;
            let fields: &StructArray = downcast(&result);
            let type_tags: &UInt8Array = downcast(
                fields
                    .column_by_name("type_tag")
                    .expect("result struct has a type_tag column"),
            );
            assert_eq!(type_tags.value(0), expected.as_u8(), "input {}", json);
        }

        Ok(())
    }

    #[test]
    fn test_json_array_access() -> Result<()> {
        // Numeric keys index into a top-level array.
        let doc = create_test_jsonb(r#"["first", "second", "third"]"#);

        let rows = vec![Some(doc.as_slice()); 3];
        let jsonb_array = jsonb_column(&rows);
        let key_array = utf8_column(&[
            "0", "1", "10", // Out of bounds
        ]);

        let result = json_get_string_impl(&[jsonb_array, key_array])?;
        let strings: &StringArray = downcast(&result);

        assert_eq!(strings.len(), 3);
        assert_eq!(strings.value(0), "first");
        assert_eq!(strings.value(1), "second");
        assert!(strings.is_null(2));

        Ok(())
    }
}