lance_datafusion/udf/
json.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use arrow_array::builder::{
5    BooleanBuilder, Float64Builder, Int64Builder, LargeBinaryBuilder, StringBuilder,
6};
7use arrow_array::{Array, ArrayRef, LargeBinaryArray, StringArray};
8use arrow_schema::DataType;
9use datafusion::error::{DataFusionError, Result};
10use datafusion::logical_expr::{ScalarUDF, Volatility};
11use datafusion::physical_plan::ColumnarValue;
12use datafusion::prelude::create_udf;
13use std::sync::Arc;
14
15/// Common helper functions and types for JSON UDFs
16mod common {
17    use super::*;
18
19    /// Key type for JSON field access - optimizes field/index parsing
20    #[derive(Debug, Clone)]
21    pub enum KeyType {
22        Field(String),
23        Index(usize),
24    }
25
26    impl KeyType {
27        /// Parse a key string into either a field name or array index (once per operation)
28        pub fn parse(key: &str) -> Self {
29            if let Ok(index) = key.parse::<usize>() {
30                Self::Index(index)
31            } else {
32                Self::Field(key.to_string())
33            }
34        }
35    }
36
37    /// Convert ColumnarValue arguments to ArrayRef vector
38    ///
39    /// Note: This implementation currently broadcasts scalars to arrays.
40    /// Future optimization: handle scalars directly without broadcasting
41    /// to improve performance for scalar inputs.
42    pub fn columnar_to_arrays(args: &[ColumnarValue]) -> Vec<ArrayRef> {
43        args.iter()
44            .map(|arg| match arg {
45                ColumnarValue::Array(arr) => arr.clone(),
46                ColumnarValue::Scalar(scalar) => scalar.to_array().unwrap(),
47            })
48            .collect()
49    }
50
51    /// Create DataFusionError for execution failures (simplified error wrapping)
52    pub fn execution_error(msg: impl Into<String>) -> DataFusionError {
53        DataFusionError::Execution(msg.into())
54    }
55
56    /// Validate argument count for UDF
57    pub fn validate_arg_count(
58        args: &[ArrayRef],
59        expected: usize,
60        function_name: &str,
61    ) -> Result<()> {
62        if args.len() != expected {
63            return Err(execution_error(format!(
64                "{} requires exactly {} arguments",
65                function_name, expected
66            )));
67        }
68        Ok(())
69    }
70
71    /// Extract and validate LargeBinaryArray from first argument
72    pub fn extract_jsonb_array(args: &[ArrayRef]) -> Result<&LargeBinaryArray> {
73        args[0]
74            .as_any()
75            .downcast_ref::<LargeBinaryArray>()
76            .ok_or_else(|| execution_error("First argument must be LargeBinary"))
77    }
78
79    /// Extract and validate StringArray from specified argument
80    pub fn extract_string_array(args: &[ArrayRef], arg_index: usize) -> Result<&StringArray> {
81        args[arg_index]
82            .as_any()
83            .downcast_ref::<StringArray>()
84            .ok_or_else(|| execution_error(format!("Argument {} must be String", arg_index + 1)))
85    }
86
87    /// Get string value at index, handling scalar broadcast case
88    /// When a scalar is converted to an array, it becomes a single-element array
89    /// This function handles accessing that value repeatedly for all rows
90    pub fn get_string_value_at(string_array: &StringArray, index: usize) -> Option<&str> {
91        // Handle scalar broadcast case: if array has only 1 element, always use index 0
92        let actual_index = if string_array.len() == 1 { 0 } else { index };
93
94        if string_array.is_null(actual_index) {
95            None
96        } else {
97            Some(string_array.value(actual_index))
98        }
99    }
100
101    /// Get JSON field/element using pre-parsed key type (avoids repeated parsing)
102    pub fn get_json_value_by_key(
103        raw_jsonb: &jsonb::RawJsonb,
104        key_type: &KeyType,
105    ) -> Result<Option<jsonb::OwnedJsonb>> {
106        match key_type {
107            KeyType::Field(field) => raw_jsonb
108                .get_by_name(field, false)
109                .map_err(|e| execution_error(format!("Failed to get field '{}': {}", field, e))),
110            KeyType::Index(index) => raw_jsonb.get_by_index(*index).map_err(|e| {
111                execution_error(format!("Failed to get array element [{}]: {}", index, e))
112            }),
113        }
114    }
115
116    /// Parse JSONPath with proper error handling (no false returns)
117    pub fn parse_json_path(path: &str) -> Result<jsonb::jsonpath::JsonPath<'_>> {
118        jsonb::jsonpath::parse_json_path(path.as_bytes())
119            .map_err(|e| execution_error(format!("Invalid JSONPath '{}': {}", path, e)))
120    }
121}
122
123/// Convert JSONB value to string using jsonb's built-in serde (strict mode)
124fn json_value_to_string(value: jsonb::OwnedJsonb) -> Result<Option<String>> {
125    let raw_jsonb = value.as_raw();
126
127    // Check for null first
128    if raw_jsonb
129        .is_null()
130        .map_err(|e| common::execution_error(format!("Failed to check null: {}", e)))?
131    {
132        return Ok(None);
133    }
134
135    // Use jsonb's built-in to_str() method - strict conversion
136    raw_jsonb
137        .to_str()
138        .map(Some)
139        .map_err(|e| common::execution_error(format!("Failed to convert to string: {}", e)))
140}
141
142/// Convert JSONB value to integer using jsonb's built-in serde (strict mode)
143fn json_value_to_int(value: jsonb::OwnedJsonb) -> Result<Option<i64>> {
144    let raw_jsonb = value.as_raw();
145
146    // Check for null first
147    if raw_jsonb
148        .is_null()
149        .map_err(|e| common::execution_error(format!("Failed to check null: {}", e)))?
150    {
151        return Ok(None);
152    }
153
154    // Use jsonb's built-in to_i64() method - strict conversion
155    raw_jsonb
156        .to_i64()
157        .map(Some)
158        .map_err(|e| common::execution_error(format!("Failed to convert to integer: {}", e)))
159}
160
161/// Convert JSONB value to float using jsonb's built-in serde (strict mode)
162fn json_value_to_float(value: jsonb::OwnedJsonb) -> Result<Option<f64>> {
163    let raw_jsonb = value.as_raw();
164
165    // Check for null first
166    if raw_jsonb
167        .is_null()
168        .map_err(|e| common::execution_error(format!("Failed to check null: {}", e)))?
169    {
170        return Ok(None);
171    }
172
173    // Use jsonb's built-in to_f64() method - strict conversion
174    raw_jsonb
175        .to_f64()
176        .map(Some)
177        .map_err(|e| common::execution_error(format!("Failed to convert to float: {}", e)))
178}
179
180/// Convert JSONB value to boolean using jsonb's built-in serde (strict mode)
181fn json_value_to_bool(value: jsonb::OwnedJsonb) -> Result<Option<bool>> {
182    let raw_jsonb = value.as_raw();
183
184    // Check for null first
185    if raw_jsonb
186        .is_null()
187        .map_err(|e| common::execution_error(format!("Failed to check null: {}", e)))?
188    {
189        return Ok(None);
190    }
191
192    // Use jsonb's built-in to_bool() method - strict conversion
193    raw_jsonb
194        .to_bool()
195        .map(Some)
196        .map_err(|e| common::execution_error(format!("Failed to convert to boolean: {}", e)))
197}
198
199/// Create the json_extract UDF for extracting JSONPath from JSON data
200///
201/// # Arguments
202/// * First parameter: JSONB binary data (LargeBinary)
203/// * Second parameter: JSONPath expression as string (Utf8)
204///
205/// # Returns
206/// String representation of the extracted value, or null if path not found
207pub fn json_extract_udf() -> ScalarUDF {
208    create_udf(
209        "json_extract",
210        vec![DataType::LargeBinary, DataType::Utf8],
211        DataType::Utf8,
212        Volatility::Immutable,
213        Arc::new(json_extract_columnar_impl),
214    )
215}
216
217/// Implementation of json_extract function with ColumnarValue
218fn json_extract_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
219    let arrays = common::columnar_to_arrays(args);
220    let result = json_extract_impl(&arrays)?;
221    Ok(ColumnarValue::Array(result))
222}
223
224/// Implementation of json_extract function
225fn json_extract_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
226    common::validate_arg_count(args, 2, "json_extract")?;
227
228    let jsonb_array = common::extract_jsonb_array(args)?;
229    let path_array = common::extract_string_array(args, 1)?;
230    let mut builder = StringBuilder::with_capacity(jsonb_array.len(), 1024);
231
232    for i in 0..jsonb_array.len() {
233        if jsonb_array.is_null(i) {
234            builder.append_null();
235        } else if let Some(path) = common::get_string_value_at(path_array, i) {
236            let jsonb_bytes = jsonb_array.value(i);
237            match extract_json_path(jsonb_bytes, path)? {
238                Some(value) => builder.append_value(&value),
239                None => builder.append_null(),
240            }
241        } else {
242            builder.append_null();
243        }
244    }
245
246    Ok(Arc::new(builder.finish()))
247}
248
249/// Extract value from JSONB using JSONPath
250///
251/// Note: Uses `select_values` instead of the deprecated `select_by_path` method
252fn extract_json_path(jsonb_bytes: &[u8], path: &str) -> Result<Option<String>> {
253    let json_path = common::parse_json_path(path)?;
254
255    let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
256    let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb);
257    match selector.select_values(&json_path) {
258        Ok(values) => {
259            if values.is_empty() {
260                Ok(None)
261            } else {
262                // Return the first matched value
263                Ok(Some(values[0].to_string()))
264            }
265        }
266        Err(e) => Err(common::execution_error(format!(
267            "Failed to select values from path '{}': {}",
268            path, e
269        ))),
270    }
271}
272
273/// Create the json_exists UDF for checking if a JSONPath exists
274///
275/// # Arguments
276/// * First parameter: JSONB binary data (LargeBinary)
277/// * Second parameter: JSONPath expression as string (Utf8)
278///
279/// # Returns
280/// Boolean indicating whether the path exists in the JSON data
281pub fn json_exists_udf() -> ScalarUDF {
282    create_udf(
283        "json_exists",
284        vec![DataType::LargeBinary, DataType::Utf8],
285        DataType::Boolean,
286        Volatility::Immutable,
287        Arc::new(json_exists_columnar_impl),
288    )
289}
290
291/// Implementation of json_exists function with ColumnarValue
292fn json_exists_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
293    let arrays = common::columnar_to_arrays(args);
294    let result = json_exists_impl(&arrays)?;
295    Ok(ColumnarValue::Array(result))
296}
297
298/// Implementation of json_exists function
299fn json_exists_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
300    common::validate_arg_count(args, 2, "json_exists")?;
301
302    let jsonb_array = common::extract_jsonb_array(args)?;
303    let path_array = common::extract_string_array(args, 1)?;
304
305    let mut builder = BooleanBuilder::with_capacity(jsonb_array.len());
306
307    for i in 0..jsonb_array.len() {
308        if jsonb_array.is_null(i) {
309            builder.append_null();
310        } else if let Some(path) = common::get_string_value_at(path_array, i) {
311            let jsonb_bytes = jsonb_array.value(i);
312            let exists = check_json_path_exists(jsonb_bytes, path)?;
313            builder.append_value(exists);
314        } else {
315            builder.append_null();
316        }
317    }
318
319    Ok(Arc::new(builder.finish()))
320}
321
322/// Check if a JSONPath exists in JSONB
323fn check_json_path_exists(jsonb_bytes: &[u8], path: &str) -> Result<bool> {
324    let json_path = common::parse_json_path(path)?;
325
326    let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
327    let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb);
328    match selector.exists(&json_path) {
329        Ok(exists) => Ok(exists),
330        Err(e) => Err(common::execution_error(format!(
331            "Failed to check existence of path '{}': {}",
332            path, e
333        ))),
334    }
335}
336
337/// Create the json_get UDF for getting a field value as JSON string
338///
339/// # Arguments
340/// * First parameter: JSONB binary data (LargeBinary)
341/// * Second parameter: Field name or array index as string (Utf8)
342///
343/// # Returns
344/// Raw JSONB bytes of the field value, or null if not found
345pub fn json_get_udf() -> ScalarUDF {
346    create_udf(
347        "json_get",
348        vec![DataType::LargeBinary, DataType::Utf8],
349        DataType::LargeBinary,
350        Volatility::Immutable,
351        Arc::new(json_get_columnar_impl),
352    )
353}
354
355/// Implementation of json_get function with ColumnarValue
356fn json_get_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
357    let arrays = common::columnar_to_arrays(args);
358    let result = json_get_impl(&arrays)?;
359    Ok(ColumnarValue::Array(result))
360}
361
362/// Implementation of json_get function
363fn json_get_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
364    common::validate_arg_count(args, 2, "json_get")?;
365
366    let jsonb_array = common::extract_jsonb_array(args)?;
367    let key_array = common::extract_string_array(args, 1)?;
368
369    let mut builder = LargeBinaryBuilder::with_capacity(jsonb_array.len(), 0);
370
371    for i in 0..jsonb_array.len() {
372        if jsonb_array.is_null(i) {
373            builder.append_null();
374        } else if let Some(key) = common::get_string_value_at(key_array, i) {
375            let jsonb_bytes = jsonb_array.value(i);
376            let key_type = common::KeyType::parse(key);
377            let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
378
379            match common::get_json_value_by_key(&raw_jsonb, &key_type)? {
380                Some(value) => builder.append_value(value.as_raw().as_ref()),
381                None => builder.append_null(),
382            }
383        } else {
384            builder.append_null();
385        }
386    }
387
388    Ok(Arc::new(builder.finish()))
389}
390
391/// Create the json_get_string UDF for getting a string value
392///
393/// # Arguments
394/// * First parameter: JSONB binary data (LargeBinary)
395/// * Second parameter: Field name or array index as string (Utf8)
396///
397/// # Returns
398/// String value with type coercion (numbers/booleans converted to strings)
399pub fn json_get_string_udf() -> ScalarUDF {
400    create_udf(
401        "json_get_string",
402        vec![DataType::LargeBinary, DataType::Utf8],
403        DataType::Utf8,
404        Volatility::Immutable,
405        Arc::new(json_get_string_columnar_impl),
406    )
407}
408
409/// Implementation of json_get_string function with ColumnarValue
410fn json_get_string_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
411    let arrays = common::columnar_to_arrays(args);
412    let result = json_get_string_impl(&arrays)?;
413    Ok(ColumnarValue::Array(result))
414}
415
416/// Implementation of json_get_string function
417fn json_get_string_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
418    common::validate_arg_count(args, 2, "json_get_string")?;
419
420    let jsonb_array = common::extract_jsonb_array(args)?;
421    let key_array = common::extract_string_array(args, 1)?;
422
423    let mut builder = StringBuilder::with_capacity(jsonb_array.len(), 1024);
424
425    for i in 0..jsonb_array.len() {
426        if jsonb_array.is_null(i) {
427            builder.append_null();
428        } else if let Some(key) = common::get_string_value_at(key_array, i) {
429            let jsonb_bytes = jsonb_array.value(i);
430            let key_type = common::KeyType::parse(key);
431            let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
432
433            match common::get_json_value_by_key(&raw_jsonb, &key_type)? {
434                Some(value) => match json_value_to_string(value)? {
435                    Some(string_val) => builder.append_value(&string_val),
436                    None => builder.append_null(),
437                },
438                None => builder.append_null(),
439            }
440        } else {
441            builder.append_null();
442        }
443    }
444
445    Ok(Arc::new(builder.finish()))
446}
447
448/// Create the json_get_int UDF for getting an integer value
449///
450/// # Arguments
451/// * First parameter: JSONB binary data (LargeBinary)
452/// * Second parameter: Field name or array index as string (Utf8)
453///
454/// # Returns
455/// Integer value with type coercion (strings/floats/booleans converted to int)
456pub fn json_get_int_udf() -> ScalarUDF {
457    create_udf(
458        "json_get_int",
459        vec![DataType::LargeBinary, DataType::Utf8],
460        DataType::Int64,
461        Volatility::Immutable,
462        Arc::new(json_get_int_columnar_impl),
463    )
464}
465
466/// Implementation of json_get_int function with ColumnarValue
467fn json_get_int_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
468    let arrays = common::columnar_to_arrays(args);
469    let result = json_get_int_impl(&arrays)?;
470    Ok(ColumnarValue::Array(result))
471}
472
473/// Implementation of json_get_int function
474fn json_get_int_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
475    common::validate_arg_count(args, 2, "json_get_int")?;
476
477    let jsonb_array = common::extract_jsonb_array(args)?;
478    let key_array = common::extract_string_array(args, 1)?;
479
480    let mut builder = Int64Builder::with_capacity(jsonb_array.len());
481
482    for i in 0..jsonb_array.len() {
483        if jsonb_array.is_null(i) {
484            builder.append_null();
485        } else if let Some(key) = common::get_string_value_at(key_array, i) {
486            let jsonb_bytes = jsonb_array.value(i);
487            let key_type = common::KeyType::parse(key);
488            let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
489
490            match common::get_json_value_by_key(&raw_jsonb, &key_type)? {
491                Some(value) => match json_value_to_int(value)? {
492                    Some(int_val) => builder.append_value(int_val),
493                    None => builder.append_null(),
494                },
495                None => builder.append_null(),
496            }
497        } else {
498            builder.append_null();
499        }
500    }
501
502    Ok(Arc::new(builder.finish()))
503}
504
505/// Create the json_get_float UDF for getting a float value
506///
507/// # Arguments
508/// * First parameter: JSONB binary data (LargeBinary)
509/// * Second parameter: Field name or array index as string (Utf8)
510///
511/// # Returns
512/// Float value with type coercion (strings/integers/booleans converted to float)
513pub fn json_get_float_udf() -> ScalarUDF {
514    create_udf(
515        "json_get_float",
516        vec![DataType::LargeBinary, DataType::Utf8],
517        DataType::Float64,
518        Volatility::Immutable,
519        Arc::new(json_get_float_columnar_impl),
520    )
521}
522
523/// Implementation of json_get_float function with ColumnarValue
524fn json_get_float_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
525    let arrays = common::columnar_to_arrays(args);
526    let result = json_get_float_impl(&arrays)?;
527    Ok(ColumnarValue::Array(result))
528}
529
530/// Implementation of json_get_float function
531fn json_get_float_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
532    common::validate_arg_count(args, 2, "json_get_float")?;
533
534    let jsonb_array = common::extract_jsonb_array(args)?;
535    let key_array = common::extract_string_array(args, 1)?;
536
537    let mut builder = Float64Builder::with_capacity(jsonb_array.len());
538
539    for i in 0..jsonb_array.len() {
540        if jsonb_array.is_null(i) {
541            builder.append_null();
542        } else if let Some(key) = common::get_string_value_at(key_array, i) {
543            let jsonb_bytes = jsonb_array.value(i);
544            let key_type = common::KeyType::parse(key);
545            let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
546
547            match common::get_json_value_by_key(&raw_jsonb, &key_type)? {
548                Some(value) => match json_value_to_float(value)? {
549                    Some(float_val) => builder.append_value(float_val),
550                    None => builder.append_null(),
551                },
552                None => builder.append_null(),
553            }
554        } else {
555            builder.append_null();
556        }
557    }
558
559    Ok(Arc::new(builder.finish()))
560}
561
562/// Create the json_get_bool UDF for getting a boolean value
563///
564/// # Arguments
565/// * First parameter: JSONB binary data (LargeBinary)
566/// * Second parameter: Field name or array index as string (Utf8)
567///
568/// # Returns
569/// Boolean value with flexible type coercion (strings like 'true'/'yes'/'1' become true)
570pub fn json_get_bool_udf() -> ScalarUDF {
571    create_udf(
572        "json_get_bool",
573        vec![DataType::LargeBinary, DataType::Utf8],
574        DataType::Boolean,
575        Volatility::Immutable,
576        Arc::new(json_get_bool_columnar_impl),
577    )
578}
579
580/// Implementation of json_get_bool function with ColumnarValue
581fn json_get_bool_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
582    let arrays = common::columnar_to_arrays(args);
583    let result = json_get_bool_impl(&arrays)?;
584    Ok(ColumnarValue::Array(result))
585}
586
587/// Implementation of json_get_bool function
588fn json_get_bool_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
589    common::validate_arg_count(args, 2, "json_get_bool")?;
590
591    let jsonb_array = common::extract_jsonb_array(args)?;
592    let key_array = common::extract_string_array(args, 1)?;
593
594    let mut builder = BooleanBuilder::with_capacity(jsonb_array.len());
595
596    for i in 0..jsonb_array.len() {
597        if jsonb_array.is_null(i) {
598            builder.append_null();
599        } else if let Some(key) = common::get_string_value_at(key_array, i) {
600            let jsonb_bytes = jsonb_array.value(i);
601            let key_type = common::KeyType::parse(key);
602            let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
603
604            match common::get_json_value_by_key(&raw_jsonb, &key_type)? {
605                Some(value) => match json_value_to_bool(value)? {
606                    Some(bool_val) => builder.append_value(bool_val),
607                    None => builder.append_null(),
608                },
609                None => builder.append_null(),
610            }
611        } else {
612            builder.append_null();
613        }
614    }
615
616    Ok(Arc::new(builder.finish()))
617}
618
619/// Create the json_array_contains UDF for checking if array contains a value
620///
621/// # Arguments
622/// * First parameter: JSONB binary data (LargeBinary)
623/// * Second parameter: JSONPath to array location (Utf8)
624/// * Third parameter: Value to search for as string (Utf8)
625///
626/// # Returns
627/// Boolean indicating whether the array contains the specified value
628pub fn json_array_contains_udf() -> ScalarUDF {
629    create_udf(
630        "json_array_contains",
631        vec![DataType::LargeBinary, DataType::Utf8, DataType::Utf8],
632        DataType::Boolean,
633        Volatility::Immutable,
634        Arc::new(json_array_contains_columnar_impl),
635    )
636}
637
638/// Implementation of json_array_contains function with ColumnarValue
639fn json_array_contains_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
640    let arrays = common::columnar_to_arrays(args);
641    let result = json_array_contains_impl(&arrays)?;
642    Ok(ColumnarValue::Array(result))
643}
644
645/// Implementation of json_array_contains function
646fn json_array_contains_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
647    common::validate_arg_count(args, 3, "json_array_contains")?;
648
649    let jsonb_array = common::extract_jsonb_array(args)?;
650    let path_array = common::extract_string_array(args, 1)?;
651    let value_array = common::extract_string_array(args, 2)?;
652
653    let mut builder = BooleanBuilder::with_capacity(jsonb_array.len());
654
655    for i in 0..jsonb_array.len() {
656        if jsonb_array.is_null(i) {
657            builder.append_null();
658        } else {
659            let path = common::get_string_value_at(path_array, i);
660            let value = common::get_string_value_at(value_array, i);
661
662            match (path, value) {
663                (Some(p), Some(v)) => {
664                    let jsonb_bytes = jsonb_array.value(i);
665                    let contains = check_array_contains(jsonb_bytes, p, v)?;
666                    builder.append_value(contains);
667                }
668                _ => builder.append_null(),
669            }
670        }
671    }
672
673    Ok(Arc::new(builder.finish()))
674}
675
676/// Check if a JSON array at path contains a value
677fn check_array_contains(jsonb_bytes: &[u8], path: &str, value: &str) -> Result<bool> {
678    let json_path = common::parse_json_path(path)?;
679
680    let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
681    let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb);
682    match selector.select_values(&json_path) {
683        Ok(values) => {
684            for v in values {
685                // Convert to raw JSONB for direct access
686                let raw = v.as_raw();
687                // Check if it's an array by trying to iterate
688                let mut index = 0;
689                loop {
690                    match raw.get_by_index(index) {
691                        Ok(Some(elem)) => {
692                            let elem_str = elem.to_string();
693                            // Compare as JSON strings (with quotes for strings)
694                            if elem_str == value || elem_str == format!("\"{}\"", value) {
695                                return Ok(true);
696                            }
697                            index += 1;
698                        }
699                        Ok(None) => break, // End of array
700                        Err(_) => break,   // Not an array or error
701                    }
702                }
703            }
704            Ok(false)
705        }
706        Err(e) => Err(common::execution_error(format!(
707            "Failed to check array contains at path '{}': {}",
708            path, e
709        ))),
710    }
711}
712
713/// Create the json_array_length UDF for getting array length
714///
715/// # Arguments
716/// * First parameter: JSONB binary data (LargeBinary)
717/// * Second parameter: JSONPath to array location (Utf8)
718///
719/// # Returns
720/// Integer length of the JSON array, or null if path doesn't point to an array
721pub fn json_array_length_udf() -> ScalarUDF {
722    create_udf(
723        "json_array_length",
724        vec![DataType::LargeBinary, DataType::Utf8],
725        DataType::Int64,
726        Volatility::Immutable,
727        Arc::new(json_array_length_columnar_impl),
728    )
729}
730
731/// Implementation of json_array_length function with ColumnarValue
732fn json_array_length_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
733    let arrays = common::columnar_to_arrays(args);
734    let result = json_array_length_impl(&arrays)?;
735    Ok(ColumnarValue::Array(result))
736}
737
738/// Implementation of json_array_length function
739fn json_array_length_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
740    common::validate_arg_count(args, 2, "json_array_length")?;
741
742    let jsonb_array = common::extract_jsonb_array(args)?;
743    let path_array = common::extract_string_array(args, 1)?;
744
745    let mut builder = Int64Builder::with_capacity(jsonb_array.len());
746
747    for i in 0..jsonb_array.len() {
748        if jsonb_array.is_null(i) {
749            builder.append_null();
750        } else if let Some(path) = common::get_string_value_at(path_array, i) {
751            let jsonb_bytes = jsonb_array.value(i);
752            match get_array_length(jsonb_bytes, path)? {
753                Some(len) => builder.append_value(len),
754                None => builder.append_null(),
755            }
756        } else {
757            builder.append_null();
758        }
759    }
760
761    Ok(Arc::new(builder.finish()))
762}
763
764/// Get the length of a JSON array at path
765fn get_array_length(jsonb_bytes: &[u8], path: &str) -> Result<Option<i64>> {
766    let json_path = common::parse_json_path(path)?;
767
768    let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
769    let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb);
770    match selector.select_values(&json_path) {
771        Ok(values) => {
772            if values.is_empty() {
773                return Ok(None);
774            }
775            let first = &values[0];
776            let raw = first.as_raw();
777
778            // Count array elements by iterating
779            let mut count = 0;
780            loop {
781                match raw.get_by_index(count) {
782                    Ok(Some(_)) => count += 1,
783                    Ok(None) => break, // End of array
784                    Err(_) => {
785                        // Not an array
786                        if count == 0 {
787                            return Err(common::execution_error(format!(
788                                "Path '{}' does not point to an array",
789                                path
790                            )));
791                        }
792                        break;
793                    }
794                }
795            }
796            Ok(Some(count as i64))
797        }
798        Err(e) => Err(common::execution_error(format!(
799            "Failed to get array length at path '{}': {}",
800            path, e
801        ))),
802    }
803}
804
#[cfg(test)]
mod tests {
    //! Unit tests for the JSON UDF kernels.
    //!
    //! Every test compares the *whole* output column as a `Vec<Option<_>>`.
    //! Reading `value(i)` alone is not enough: for a NULL slot Arrow returns an
    //! unspecified placeholder (in practice `false` / `0` / `""`), so checks such
    //! as `!value(i)` or `value(i) == 0` would also pass on an unexpected NULL.

    use super::*;
    use arrow_array::builder::LargeBinaryBuilder;
    use arrow_array::{BooleanArray, Int64Array};

    /// Parses `json_str` with `jsonb::parse_value` and returns the encoded bytes.
    ///
    /// Panics if `jsonb::parse_value` rejects the input.
    fn create_test_jsonb(json_str: &str) -> Vec<u8> {
        jsonb::parse_value(json_str.as_bytes()).unwrap().to_vec()
    }

    /// Builds a `LargeBinary` column with one row per entry of `rows`:
    /// `Some(json)` becomes the JSONB encoding of `json`, `None` becomes NULL.
    fn jsonb_column(rows: &[Option<&str>]) -> ArrayRef {
        let mut builder = LargeBinaryBuilder::new();
        for &row in rows {
            match row {
                Some(json) => builder.append_value(create_test_jsonb(json)),
                None => builder.append_null(),
            }
        }
        Arc::new(builder.finish())
    }

    /// Builds a non-nullable string column (paths, keys or search values).
    fn utf8_column(values: &[&str]) -> ArrayRef {
        Arc::new(StringArray::from(values.to_vec()))
    }

    /// `json_extract` returns the JSON text of the selected value and NULL for
    /// a NULL document.
    #[tokio::test]
    async fn test_json_extract_udf() -> Result<()> {
        let doc = r#"{"user": {"name": "Alice", "age": 30}}"#;

        let jsonb_array = jsonb_column(&[Some(doc), Some(doc), None]);
        let path_array = utf8_column(&["$.user.name", "$.user.age", "$.user.name"]);

        let result = json_extract_impl(&[jsonb_array, path_array])?;
        let strings = result
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("json_extract should produce a StringArray");

        assert_eq!(
            strings.iter().collect::<Vec<_>>(),
            vec![Some("\"Alice\""), Some("30"), None]
        );

        Ok(())
    }

    /// `json_exists` reports presence of a path as a non-NULL boolean and
    /// yields NULL for a NULL document.
    #[tokio::test]
    async fn test_json_exists_udf() -> Result<()> {
        let doc = r#"{"user": {"name": "Alice", "age": 30}, "tags": ["rust", "json"]}"#;

        let jsonb_array = jsonb_column(&[Some(doc), Some(doc), Some(doc), None]);
        let path_array = utf8_column(&["$.user.name", "$.user.email", "$.tags", "$.any"]);

        let result = json_exists_impl(&[jsonb_array, path_array])?;
        let flags = result
            .as_any()
            .downcast_ref::<BooleanArray>()
            .expect("json_exists should produce a BooleanArray");

        assert_eq!(
            flags.iter().collect::<Vec<_>>(),
            vec![Some(true), Some(false), Some(true), None]
        );

        Ok(())
    }

    /// `json_get_string` stringifies scalar values and maps JSON `null` to NULL.
    #[tokio::test]
    async fn test_json_get_string_udf() -> Result<()> {
        // Test valid string conversions
        let doc = r#"{"str": "hello", "num": 123, "bool": true, "null": null}"#;

        let jsonb_array = jsonb_column(&[Some(doc), Some(doc), Some(doc), Some(doc)]);
        let key_array = utf8_column(&["str", "num", "bool", "null"]);

        let result = json_get_string_impl(&[jsonb_array, key_array])?;
        let strings = result
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("json_get_string should produce a StringArray");

        assert_eq!(
            strings.iter().collect::<Vec<_>>(),
            vec![Some("hello"), Some("123"), Some("true"), None]
        );

        Ok(())
    }

    /// `json_get_int` converts integers, numeric strings and booleans to `i64`.
    #[tokio::test]
    async fn test_json_get_int_udf() -> Result<()> {
        let doc = r#"{"int": 42, "str_num": "99", "bool": true}"#;

        let jsonb_array = jsonb_column(&[Some(doc), Some(doc), Some(doc)]);
        let key_array = utf8_column(&["int", "str_num", "bool"]);

        let result = json_get_int_impl(&[jsonb_array, key_array])?;
        let ints = result
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("json_get_int should produce an Int64Array");

        assert_eq!(
            ints.iter().collect::<Vec<_>>(),
            // jsonb converts true to 1
            vec![Some(42), Some(99), Some(1)]
        );

        Ok(())
    }

    /// `json_get_bool` accepts both JSON booleans and the strings
    /// `"true"` / `"false"`.
    #[tokio::test]
    async fn test_json_get_bool_udf() -> Result<()> {
        let doc =
            r#"{"bool_true": true, "bool_false": false, "str_true": "true", "str_false": "false"}"#;

        let jsonb_array = jsonb_column(&[Some(doc), Some(doc), Some(doc), Some(doc)]);
        let key_array = utf8_column(&["bool_true", "bool_false", "str_true", "str_false"]);

        let result = json_get_bool_impl(&[jsonb_array, key_array])?;
        let flags = result
            .as_any()
            .downcast_ref::<BooleanArray>()
            .expect("json_get_bool should produce a BooleanArray");

        assert_eq!(
            flags.iter().collect::<Vec<_>>(),
            vec![
                Some(true),
                Some(false),
                Some(true),  // "true" string converts to true
                Some(false), // "false" string converts to false
            ]
        );

        Ok(())
    }

    /// `json_array_contains` matches string and numeric elements and yields
    /// NULL for a NULL document.
    #[tokio::test]
    async fn test_json_array_contains_udf() -> Result<()> {
        let doc = r#"{"tags": ["rust", "json", "database"], "nums": [1, 2, 3]}"#;

        let jsonb_array = jsonb_column(&[Some(doc), Some(doc), Some(doc), None]);
        let path_array = utf8_column(&["$.tags", "$.tags", "$.nums", "$.tags"]);
        let value_array = utf8_column(&["rust", "python", "2", "any"]);

        let result = json_array_contains_impl(&[jsonb_array, path_array, value_array])?;
        let flags = result
            .as_any()
            .downcast_ref::<BooleanArray>()
            .expect("json_array_contains should produce a BooleanArray");

        assert_eq!(
            flags.iter().collect::<Vec<_>>(),
            vec![Some(true), Some(false), Some(true), None]
        );

        Ok(())
    }

    /// `json_array_length` counts elements (an empty array is a non-NULL `0`)
    /// and yields NULL for a NULL document.
    #[tokio::test]
    async fn test_json_array_length_udf() -> Result<()> {
        let doc = r#"{"empty": [], "tags": ["a", "b", "c"], "nested": {"arr": [1, 2]}}"#;

        let jsonb_array = jsonb_column(&[Some(doc), Some(doc), Some(doc), None]);
        let path_array = utf8_column(&["$.empty", "$.tags", "$.nested.arr", "$.any"]);

        let result = json_array_length_impl(&[jsonb_array, path_array])?;
        let lengths = result
            .as_any()
            .downcast_ref::<Int64Array>()
            .expect("json_array_length should produce an Int64Array");

        assert_eq!(
            lengths.iter().collect::<Vec<_>>(),
            vec![Some(0), Some(3), Some(2), None]
        );

        Ok(())
    }

    /// Numeric keys index into a top-level JSON array; an out-of-bounds index
    /// yields NULL.
    #[tokio::test]
    async fn test_json_array_access() -> Result<()> {
        let doc = r#"["first", "second", "third"]"#;

        let jsonb_array = jsonb_column(&[Some(doc), Some(doc), Some(doc)]);
        let key_array = utf8_column(&[
            "0", "1", "10", // Out of bounds
        ]);

        let result = json_get_string_impl(&[jsonb_array, key_array])?;
        let strings = result
            .as_any()
            .downcast_ref::<StringArray>()
            .expect("json_get_string should produce a StringArray");

        assert_eq!(
            strings.iter().collect::<Vec<_>>(),
            vec![Some("first"), Some("second"), None]
        );

        Ok(())
    }
}