lance_datafusion/udf/
json.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use arrow_array::builder::{
5    BooleanBuilder, Float64Builder, Int64Builder, LargeBinaryBuilder, StringBuilder,
6};
7use arrow_array::{Array, ArrayRef, LargeBinaryArray, StringArray};
8use arrow_schema::DataType;
9use datafusion::error::{DataFusionError, Result};
10use datafusion::logical_expr::{ScalarUDF, Volatility};
11use datafusion::physical_plan::ColumnarValue;
12use datafusion::prelude::create_udf;
13use std::sync::Arc;
14
/// Type tag describing the kind of a JSONB value.
///
/// The discriminants are the `u8` values written to the `type_tag` column
/// produced by `json_extract_with_type`, so they must stay stable.
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JsonbType {
    Null = 0,
    Boolean = 1,
    Int64 = 2,
    Float64 = 3,
    String = 4,
    Array = 5,
    Object = 6,
}

impl JsonbType {
    /// Decode a stored tag.
    ///
    /// Returns `None` for any value outside `0..=6`.
    pub const fn from_u8(value: u8) -> Option<Self> {
        match value {
            0 => Some(Self::Null),
            1 => Some(Self::Boolean),
            2 => Some(Self::Int64),
            3 => Some(Self::Float64),
            4 => Some(Self::String),
            5 => Some(Self::Array),
            6 => Some(Self::Object),
            _ => None,
        }
    }

    /// Encode as the `u8` tag stored in Arrow arrays.
    pub const fn as_u8(self) -> u8 {
        self as u8
    }
}

/// Fallible decoding via the standard conversion trait; the rejected byte is
/// returned as the error.
impl std::convert::TryFrom<u8> for JsonbType {
    type Error = u8;

    fn try_from(value: u8) -> std::result::Result<Self, Self::Error> {
        Self::from_u8(value).ok_or(value)
    }
}

/// Infallible encoding via the standard conversion trait.
impl From<JsonbType> for u8 {
    fn from(value: JsonbType) -> Self {
        value.as_u8()
    }
}
48
49/// Common helper functions and types for JSON UDFs
50mod common {
51    use super::*;
52
53    /// Key type for JSON field access - optimizes field/index parsing
54    #[derive(Debug, Clone)]
55    pub enum KeyType {
56        Field(String),
57        Index(usize),
58    }
59
60    impl KeyType {
61        /// Parse a key string into either a field name or array index (once per operation)
62        pub fn parse(key: &str) -> Self {
63            if let Ok(index) = key.parse::<usize>() {
64                Self::Index(index)
65            } else {
66                Self::Field(key.to_string())
67            }
68        }
69    }
70
71    /// Convert ColumnarValue arguments to ArrayRef vector
72    ///
73    /// Note: This implementation currently broadcasts scalars to arrays.
74    /// Future optimization: handle scalars directly without broadcasting
75    /// to improve performance for scalar inputs.
76    pub fn columnar_to_arrays(args: &[ColumnarValue]) -> Vec<ArrayRef> {
77        args.iter()
78            .map(|arg| match arg {
79                ColumnarValue::Array(arr) => arr.clone(),
80                ColumnarValue::Scalar(scalar) => scalar.to_array().unwrap(),
81            })
82            .collect()
83    }
84
85    /// Create DataFusionError for execution failures (simplified error wrapping)
86    pub fn execution_error(msg: impl Into<String>) -> DataFusionError {
87        DataFusionError::Execution(msg.into())
88    }
89
90    /// Validate argument count for UDF
91    pub fn validate_arg_count(
92        args: &[ArrayRef],
93        expected: usize,
94        function_name: &str,
95    ) -> Result<()> {
96        if args.len() != expected {
97            return Err(execution_error(format!(
98                "{} requires exactly {} arguments",
99                function_name, expected
100            )));
101        }
102        Ok(())
103    }
104
105    /// Extract and validate LargeBinaryArray from first argument
106    pub fn extract_jsonb_array(args: &[ArrayRef]) -> Result<&LargeBinaryArray> {
107        args[0]
108            .as_any()
109            .downcast_ref::<LargeBinaryArray>()
110            .ok_or_else(|| execution_error("First argument must be LargeBinary"))
111    }
112
113    /// Extract and validate StringArray from specified argument
114    pub fn extract_string_array(args: &[ArrayRef], arg_index: usize) -> Result<&StringArray> {
115        args[arg_index]
116            .as_any()
117            .downcast_ref::<StringArray>()
118            .ok_or_else(|| execution_error(format!("Argument {} must be String", arg_index + 1)))
119    }
120
121    /// Get string value at index, handling scalar broadcast case
122    /// When a scalar is converted to an array, it becomes a single-element array
123    /// This function handles accessing that value repeatedly for all rows
124    pub fn get_string_value_at(string_array: &StringArray, index: usize) -> Option<&str> {
125        // Handle scalar broadcast case: if array has only 1 element, always use index 0
126        let actual_index = if string_array.len() == 1 { 0 } else { index };
127
128        if string_array.is_null(actual_index) {
129            None
130        } else {
131            Some(string_array.value(actual_index))
132        }
133    }
134
135    /// Get JSON field/element using pre-parsed key type (avoids repeated parsing)
136    pub fn get_json_value_by_key(
137        raw_jsonb: &jsonb::RawJsonb,
138        key_type: &KeyType,
139    ) -> Result<Option<jsonb::OwnedJsonb>> {
140        match key_type {
141            KeyType::Field(field) => raw_jsonb
142                .get_by_name(field, false)
143                .map_err(|e| execution_error(format!("Failed to get field '{}': {}", field, e))),
144            KeyType::Index(index) => raw_jsonb.get_by_index(*index).map_err(|e| {
145                execution_error(format!("Failed to get array element [{}]: {}", index, e))
146            }),
147        }
148    }
149
150    /// Parse JSONPath with proper error handling (no false returns)
151    pub fn parse_json_path(path: &str) -> Result<jsonb::jsonpath::JsonPath<'_>> {
152        jsonb::jsonpath::parse_json_path(path.as_bytes())
153            .map_err(|e| execution_error(format!("Invalid JSONPath '{}': {}", path, e)))
154    }
155}
156
157/// Convert JSONB value to string using jsonb's built-in serde (strict mode)
158fn json_value_to_string(value: jsonb::OwnedJsonb) -> Result<Option<String>> {
159    let raw_jsonb = value.as_raw();
160
161    // Check for null first
162    if raw_jsonb
163        .is_null()
164        .map_err(|e| common::execution_error(format!("Failed to check null: {}", e)))?
165    {
166        return Ok(None);
167    }
168
169    // Use jsonb's built-in to_str() method - strict conversion
170    raw_jsonb
171        .to_str()
172        .map(Some)
173        .map_err(|e| common::execution_error(format!("Failed to convert to string: {}", e)))
174}
175
176/// Convert JSONB value to integer using jsonb's built-in serde (strict mode)
177fn json_value_to_int(value: jsonb::OwnedJsonb) -> Result<Option<i64>> {
178    let raw_jsonb = value.as_raw();
179
180    // Check for null first
181    if raw_jsonb
182        .is_null()
183        .map_err(|e| common::execution_error(format!("Failed to check null: {}", e)))?
184    {
185        return Ok(None);
186    }
187
188    // Use jsonb's built-in to_i64() method - strict conversion
189    raw_jsonb
190        .to_i64()
191        .map(Some)
192        .map_err(|e| common::execution_error(format!("Failed to convert to integer: {}", e)))
193}
194
195/// Convert JSONB value to float using jsonb's built-in serde (strict mode)
196fn json_value_to_float(value: jsonb::OwnedJsonb) -> Result<Option<f64>> {
197    let raw_jsonb = value.as_raw();
198
199    // Check for null first
200    if raw_jsonb
201        .is_null()
202        .map_err(|e| common::execution_error(format!("Failed to check null: {}", e)))?
203    {
204        return Ok(None);
205    }
206
207    // Use jsonb's built-in to_f64() method - strict conversion
208    raw_jsonb
209        .to_f64()
210        .map(Some)
211        .map_err(|e| common::execution_error(format!("Failed to convert to float: {}", e)))
212}
213
214/// Convert JSONB value to boolean using jsonb's built-in serde (strict mode)
215fn json_value_to_bool(value: jsonb::OwnedJsonb) -> Result<Option<bool>> {
216    let raw_jsonb = value.as_raw();
217
218    // Check for null first
219    if raw_jsonb
220        .is_null()
221        .map_err(|e| common::execution_error(format!("Failed to check null: {}", e)))?
222    {
223        return Ok(None);
224    }
225
226    // Use jsonb's built-in to_bool() method - strict conversion
227    raw_jsonb
228        .to_bool()
229        .map(Some)
230        .map_err(|e| common::execution_error(format!("Failed to convert to boolean: {}", e)))
231}
232
233/// Create the json_extract UDF for extracting JSONPath from JSON data
234///
235/// # Arguments
236/// * First parameter: JSONB binary data (LargeBinary)
237/// * Second parameter: JSONPath expression as string (Utf8)
238///
239/// # Returns
240/// String representation of the extracted value, or null if path not found
241pub fn json_extract_udf() -> ScalarUDF {
242    create_udf(
243        "json_extract",
244        vec![DataType::LargeBinary, DataType::Utf8],
245        DataType::Utf8,
246        Volatility::Immutable,
247        Arc::new(json_extract_columnar_impl),
248    )
249}
250
251/// Create the json_extract_with_type UDF that returns JSONB bytes with type information
252///
253/// # Arguments
254/// * First parameter: JSONB binary data (LargeBinary)
255/// * Second parameter: JSONPath expression as string (Utf8)
256///
257/// # Returns
258/// A struct with two fields:
259/// - value: LargeBinary (the extracted JSONB value)
260/// - type_tag: UInt8 (type information: 0=null, 1=bool, 2=int64, 3=float64, 4=string, 5=array, 6=object)
261pub fn json_extract_with_type_udf() -> ScalarUDF {
262    use arrow_schema::Fields;
263
264    let return_type = DataType::Struct(Fields::from(vec![
265        arrow_schema::Field::new("value", DataType::LargeBinary, true),
266        arrow_schema::Field::new("type_tag", DataType::UInt8, false),
267    ]));
268
269    create_udf(
270        "json_extract_with_type",
271        vec![DataType::LargeBinary, DataType::Utf8],
272        return_type,
273        Volatility::Immutable,
274        Arc::new(json_extract_with_type_columnar_impl),
275    )
276}
277
278/// Implementation of json_extract function with ColumnarValue
279fn json_extract_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
280    let arrays = common::columnar_to_arrays(args);
281    let result = json_extract_impl(&arrays)?;
282    Ok(ColumnarValue::Array(result))
283}
284
285/// Implementation of json_extract_with_type function with ColumnarValue
286fn json_extract_with_type_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
287    let arrays = common::columnar_to_arrays(args);
288    let result = json_extract_with_type_impl(&arrays)?;
289    Ok(ColumnarValue::Array(result))
290}
291
292/// Implementation of json_extract function
293fn json_extract_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
294    common::validate_arg_count(args, 2, "json_extract")?;
295
296    let jsonb_array = common::extract_jsonb_array(args)?;
297    let path_array = common::extract_string_array(args, 1)?;
298    let mut builder = StringBuilder::with_capacity(jsonb_array.len(), 1024);
299
300    for i in 0..jsonb_array.len() {
301        if jsonb_array.is_null(i) {
302            builder.append_null();
303        } else if let Some(path) = common::get_string_value_at(path_array, i) {
304            let jsonb_bytes = jsonb_array.value(i);
305            match extract_json_path(jsonb_bytes, path)? {
306                Some(value) => builder.append_value(&value),
307                None => builder.append_null(),
308            }
309        } else {
310            builder.append_null();
311        }
312    }
313
314    Ok(Arc::new(builder.finish()))
315}
316
317/// Implementation of json_extract_with_type function
318fn json_extract_with_type_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
319    use arrow_array::builder::{LargeBinaryBuilder, UInt8Builder};
320    use arrow_array::StructArray;
321
322    common::validate_arg_count(args, 2, "json_extract_with_type")?;
323
324    let jsonb_array = common::extract_jsonb_array(args)?;
325    let path_array = common::extract_string_array(args, 1)?;
326
327    let mut value_builder = LargeBinaryBuilder::with_capacity(jsonb_array.len(), 1024);
328    let mut type_builder = UInt8Builder::with_capacity(jsonb_array.len());
329
330    for i in 0..jsonb_array.len() {
331        if jsonb_array.is_null(i) {
332            value_builder.append_null();
333            type_builder.append_value(JsonbType::Null.as_u8());
334        } else if let Some(path) = common::get_string_value_at(path_array, i) {
335            let jsonb_bytes = jsonb_array.value(i);
336            match extract_json_path_with_type(jsonb_bytes, path)? {
337                Some((value_bytes, type_tag)) => {
338                    value_builder.append_value(&value_bytes);
339                    type_builder.append_value(type_tag);
340                }
341                None => {
342                    value_builder.append_null();
343                    type_builder.append_value(JsonbType::Null.as_u8());
344                }
345            }
346        } else {
347            value_builder.append_null();
348            type_builder.append_value(JsonbType::Null.as_u8());
349        }
350    }
351
352    // Create struct array with two fields
353    let value_array = Arc::new(value_builder.finish()) as ArrayRef;
354    let type_array = Arc::new(type_builder.finish()) as ArrayRef;
355
356    let struct_array = StructArray::from(vec![
357        (
358            Arc::new(arrow_schema::Field::new(
359                "value",
360                DataType::LargeBinary,
361                true,
362            )),
363            value_array,
364        ),
365        (
366            Arc::new(arrow_schema::Field::new("type_tag", DataType::UInt8, false)),
367            type_array,
368        ),
369    ]);
370
371    Ok(Arc::new(struct_array))
372}
373
374/// Extract value from JSONB using JSONPath and return with type information
375/// Returns (JSONB bytes, type_tag) where type_tag represents the JsonbType
376fn extract_json_path_with_type(jsonb_bytes: &[u8], path: &str) -> Result<Option<(Vec<u8>, u8)>> {
377    let json_path = common::parse_json_path(path)?;
378
379    let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
380    let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb);
381    match selector.select_values(&json_path) {
382        Ok(values) => {
383            if values.is_empty() {
384                Ok(None)
385            } else {
386                // Get the first matched value
387                let owned_value = &values[0];
388                let raw = owned_value.as_raw();
389
390                // Determine type using JsonbType enum
391                let jsonb_type = if raw.is_null().unwrap_or(false) {
392                    JsonbType::Null
393                } else if raw.is_boolean().unwrap_or(false) {
394                    JsonbType::Boolean
395                } else if raw.is_number().unwrap_or(false) {
396                    // Try to determine if it's an integer or float
397                    if raw.to_i64().is_ok() {
398                        JsonbType::Int64
399                    } else {
400                        JsonbType::Float64
401                    }
402                } else if raw.is_string().unwrap_or(false) {
403                    JsonbType::String
404                } else if raw.is_array().unwrap_or(false) {
405                    JsonbType::Array
406                } else if raw.is_object().unwrap_or(false) {
407                    JsonbType::Object
408                } else {
409                    JsonbType::String // default to string
410                };
411
412                // Return the JSONB bytes and type tag as u8
413                Ok(Some((owned_value.clone().to_vec(), jsonb_type.as_u8())))
414            }
415        }
416        Err(e) => Err(common::execution_error(format!(
417            "Failed to select values from path '{}': {}",
418            path, e
419        ))),
420    }
421}
422
423/// Extract value from JSONB using JSONPath
424///
425/// Note: Uses `select_values` instead of the deprecated `select_by_path` method
426fn extract_json_path(jsonb_bytes: &[u8], path: &str) -> Result<Option<String>> {
427    let json_path = common::parse_json_path(path)?;
428
429    let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
430    let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb);
431    match selector.select_values(&json_path) {
432        Ok(values) => {
433            if values.is_empty() {
434                Ok(None)
435            } else {
436                // Return the first matched value
437                Ok(Some(values[0].to_string()))
438            }
439        }
440        Err(e) => Err(common::execution_error(format!(
441            "Failed to select values from path '{}': {}",
442            path, e
443        ))),
444    }
445}
446
447/// Create the json_exists UDF for checking if a JSONPath exists
448///
449/// # Arguments
450/// * First parameter: JSONB binary data (LargeBinary)
451/// * Second parameter: JSONPath expression as string (Utf8)
452///
453/// # Returns
454/// Boolean indicating whether the path exists in the JSON data
455pub fn json_exists_udf() -> ScalarUDF {
456    create_udf(
457        "json_exists",
458        vec![DataType::LargeBinary, DataType::Utf8],
459        DataType::Boolean,
460        Volatility::Immutable,
461        Arc::new(json_exists_columnar_impl),
462    )
463}
464
465/// Implementation of json_exists function with ColumnarValue
466fn json_exists_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
467    let arrays = common::columnar_to_arrays(args);
468    let result = json_exists_impl(&arrays)?;
469    Ok(ColumnarValue::Array(result))
470}
471
472/// Implementation of json_exists function
473fn json_exists_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
474    common::validate_arg_count(args, 2, "json_exists")?;
475
476    let jsonb_array = common::extract_jsonb_array(args)?;
477    let path_array = common::extract_string_array(args, 1)?;
478
479    let mut builder = BooleanBuilder::with_capacity(jsonb_array.len());
480
481    for i in 0..jsonb_array.len() {
482        if jsonb_array.is_null(i) {
483            builder.append_null();
484        } else if let Some(path) = common::get_string_value_at(path_array, i) {
485            let jsonb_bytes = jsonb_array.value(i);
486            let exists = check_json_path_exists(jsonb_bytes, path)?;
487            builder.append_value(exists);
488        } else {
489            builder.append_null();
490        }
491    }
492
493    Ok(Arc::new(builder.finish()))
494}
495
496/// Check if a JSONPath exists in JSONB
497fn check_json_path_exists(jsonb_bytes: &[u8], path: &str) -> Result<bool> {
498    let json_path = common::parse_json_path(path)?;
499
500    let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
501    let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb);
502    match selector.exists(&json_path) {
503        Ok(exists) => Ok(exists),
504        Err(e) => Err(common::execution_error(format!(
505            "Failed to check existence of path '{}': {}",
506            path, e
507        ))),
508    }
509}
510
511/// Create the json_get UDF for getting a field value as JSON string
512///
513/// # Arguments
514/// * First parameter: JSONB binary data (LargeBinary)
515/// * Second parameter: Field name or array index as string (Utf8)
516///
517/// # Returns
518/// Raw JSONB bytes of the field value, or null if not found
519pub fn json_get_udf() -> ScalarUDF {
520    create_udf(
521        "json_get",
522        vec![DataType::LargeBinary, DataType::Utf8],
523        DataType::LargeBinary,
524        Volatility::Immutable,
525        Arc::new(json_get_columnar_impl),
526    )
527}
528
529/// Implementation of json_get function with ColumnarValue
530fn json_get_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
531    let arrays = common::columnar_to_arrays(args);
532    let result = json_get_impl(&arrays)?;
533    Ok(ColumnarValue::Array(result))
534}
535
536/// Implementation of json_get function
537fn json_get_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
538    common::validate_arg_count(args, 2, "json_get")?;
539
540    let jsonb_array = common::extract_jsonb_array(args)?;
541    let key_array = common::extract_string_array(args, 1)?;
542
543    let mut builder = LargeBinaryBuilder::with_capacity(jsonb_array.len(), 0);
544
545    for i in 0..jsonb_array.len() {
546        if jsonb_array.is_null(i) {
547            builder.append_null();
548        } else if let Some(key) = common::get_string_value_at(key_array, i) {
549            let jsonb_bytes = jsonb_array.value(i);
550            let key_type = common::KeyType::parse(key);
551            let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
552
553            match common::get_json_value_by_key(&raw_jsonb, &key_type)? {
554                Some(value) => builder.append_value(value.as_raw().as_ref()),
555                None => builder.append_null(),
556            }
557        } else {
558            builder.append_null();
559        }
560    }
561
562    Ok(Arc::new(builder.finish()))
563}
564
565/// Create the json_get_string UDF for getting a string value
566///
567/// # Arguments
568/// * First parameter: JSONB binary data (LargeBinary)
569/// * Second parameter: Field name or array index as string (Utf8)
570///
571/// # Returns
572/// String value with type coercion (numbers/booleans converted to strings)
573pub fn json_get_string_udf() -> ScalarUDF {
574    create_udf(
575        "json_get_string",
576        vec![DataType::LargeBinary, DataType::Utf8],
577        DataType::Utf8,
578        Volatility::Immutable,
579        Arc::new(json_get_string_columnar_impl),
580    )
581}
582
583/// Implementation of json_get_string function with ColumnarValue
584fn json_get_string_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
585    let arrays = common::columnar_to_arrays(args);
586    let result = json_get_string_impl(&arrays)?;
587    Ok(ColumnarValue::Array(result))
588}
589
590/// Implementation of json_get_string function
591fn json_get_string_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
592    common::validate_arg_count(args, 2, "json_get_string")?;
593
594    let jsonb_array = common::extract_jsonb_array(args)?;
595    let key_array = common::extract_string_array(args, 1)?;
596
597    let mut builder = StringBuilder::with_capacity(jsonb_array.len(), 1024);
598
599    for i in 0..jsonb_array.len() {
600        if jsonb_array.is_null(i) {
601            builder.append_null();
602        } else if let Some(key) = common::get_string_value_at(key_array, i) {
603            let jsonb_bytes = jsonb_array.value(i);
604            let key_type = common::KeyType::parse(key);
605            let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
606
607            match common::get_json_value_by_key(&raw_jsonb, &key_type)? {
608                Some(value) => match json_value_to_string(value)? {
609                    Some(string_val) => builder.append_value(&string_val),
610                    None => builder.append_null(),
611                },
612                None => builder.append_null(),
613            }
614        } else {
615            builder.append_null();
616        }
617    }
618
619    Ok(Arc::new(builder.finish()))
620}
621
622/// Create the json_get_int UDF for getting an integer value
623///
624/// # Arguments
625/// * First parameter: JSONB binary data (LargeBinary)
626/// * Second parameter: Field name or array index as string (Utf8)
627///
628/// # Returns
629/// Integer value with type coercion (strings/floats/booleans converted to int)
630pub fn json_get_int_udf() -> ScalarUDF {
631    create_udf(
632        "json_get_int",
633        vec![DataType::LargeBinary, DataType::Utf8],
634        DataType::Int64,
635        Volatility::Immutable,
636        Arc::new(json_get_int_columnar_impl),
637    )
638}
639
640/// Implementation of json_get_int function with ColumnarValue
641fn json_get_int_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
642    let arrays = common::columnar_to_arrays(args);
643    let result = json_get_int_impl(&arrays)?;
644    Ok(ColumnarValue::Array(result))
645}
646
647/// Implementation of json_get_int function
648fn json_get_int_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
649    common::validate_arg_count(args, 2, "json_get_int")?;
650
651    let jsonb_array = common::extract_jsonb_array(args)?;
652    let key_array = common::extract_string_array(args, 1)?;
653
654    let mut builder = Int64Builder::with_capacity(jsonb_array.len());
655
656    for i in 0..jsonb_array.len() {
657        if jsonb_array.is_null(i) {
658            builder.append_null();
659        } else if let Some(key) = common::get_string_value_at(key_array, i) {
660            let jsonb_bytes = jsonb_array.value(i);
661            let key_type = common::KeyType::parse(key);
662            let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
663
664            match common::get_json_value_by_key(&raw_jsonb, &key_type)? {
665                Some(value) => match json_value_to_int(value)? {
666                    Some(int_val) => builder.append_value(int_val),
667                    None => builder.append_null(),
668                },
669                None => builder.append_null(),
670            }
671        } else {
672            builder.append_null();
673        }
674    }
675
676    Ok(Arc::new(builder.finish()))
677}
678
679/// Create the json_get_float UDF for getting a float value
680///
681/// # Arguments
682/// * First parameter: JSONB binary data (LargeBinary)
683/// * Second parameter: Field name or array index as string (Utf8)
684///
685/// # Returns
686/// Float value with type coercion (strings/integers/booleans converted to float)
687pub fn json_get_float_udf() -> ScalarUDF {
688    create_udf(
689        "json_get_float",
690        vec![DataType::LargeBinary, DataType::Utf8],
691        DataType::Float64,
692        Volatility::Immutable,
693        Arc::new(json_get_float_columnar_impl),
694    )
695}
696
697/// Implementation of json_get_float function with ColumnarValue
698fn json_get_float_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
699    let arrays = common::columnar_to_arrays(args);
700    let result = json_get_float_impl(&arrays)?;
701    Ok(ColumnarValue::Array(result))
702}
703
704/// Implementation of json_get_float function
705fn json_get_float_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
706    common::validate_arg_count(args, 2, "json_get_float")?;
707
708    let jsonb_array = common::extract_jsonb_array(args)?;
709    let key_array = common::extract_string_array(args, 1)?;
710
711    let mut builder = Float64Builder::with_capacity(jsonb_array.len());
712
713    for i in 0..jsonb_array.len() {
714        if jsonb_array.is_null(i) {
715            builder.append_null();
716        } else if let Some(key) = common::get_string_value_at(key_array, i) {
717            let jsonb_bytes = jsonb_array.value(i);
718            let key_type = common::KeyType::parse(key);
719            let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
720
721            match common::get_json_value_by_key(&raw_jsonb, &key_type)? {
722                Some(value) => match json_value_to_float(value)? {
723                    Some(float_val) => builder.append_value(float_val),
724                    None => builder.append_null(),
725                },
726                None => builder.append_null(),
727            }
728        } else {
729            builder.append_null();
730        }
731    }
732
733    Ok(Arc::new(builder.finish()))
734}
735
736/// Create the json_get_bool UDF for getting a boolean value
737///
738/// # Arguments
739/// * First parameter: JSONB binary data (LargeBinary)
740/// * Second parameter: Field name or array index as string (Utf8)
741///
742/// # Returns
743/// Boolean value with flexible type coercion (strings like 'true'/'yes'/'1' become true)
744pub fn json_get_bool_udf() -> ScalarUDF {
745    create_udf(
746        "json_get_bool",
747        vec![DataType::LargeBinary, DataType::Utf8],
748        DataType::Boolean,
749        Volatility::Immutable,
750        Arc::new(json_get_bool_columnar_impl),
751    )
752}
753
754/// Implementation of json_get_bool function with ColumnarValue
755fn json_get_bool_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
756    let arrays = common::columnar_to_arrays(args);
757    let result = json_get_bool_impl(&arrays)?;
758    Ok(ColumnarValue::Array(result))
759}
760
761/// Implementation of json_get_bool function
762fn json_get_bool_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
763    common::validate_arg_count(args, 2, "json_get_bool")?;
764
765    let jsonb_array = common::extract_jsonb_array(args)?;
766    let key_array = common::extract_string_array(args, 1)?;
767
768    let mut builder = BooleanBuilder::with_capacity(jsonb_array.len());
769
770    for i in 0..jsonb_array.len() {
771        if jsonb_array.is_null(i) {
772            builder.append_null();
773        } else if let Some(key) = common::get_string_value_at(key_array, i) {
774            let jsonb_bytes = jsonb_array.value(i);
775            let key_type = common::KeyType::parse(key);
776            let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
777
778            match common::get_json_value_by_key(&raw_jsonb, &key_type)? {
779                Some(value) => match json_value_to_bool(value)? {
780                    Some(bool_val) => builder.append_value(bool_val),
781                    None => builder.append_null(),
782                },
783                None => builder.append_null(),
784            }
785        } else {
786            builder.append_null();
787        }
788    }
789
790    Ok(Arc::new(builder.finish()))
791}
792
793/// Create the json_array_contains UDF for checking if array contains a value
794///
795/// # Arguments
796/// * First parameter: JSONB binary data (LargeBinary)
797/// * Second parameter: JSONPath to array location (Utf8)
798/// * Third parameter: Value to search for as string (Utf8)
799///
800/// # Returns
801/// Boolean indicating whether the array contains the specified value
802pub fn json_array_contains_udf() -> ScalarUDF {
803    create_udf(
804        "json_array_contains",
805        vec![DataType::LargeBinary, DataType::Utf8, DataType::Utf8],
806        DataType::Boolean,
807        Volatility::Immutable,
808        Arc::new(json_array_contains_columnar_impl),
809    )
810}
811
812/// Implementation of json_array_contains function with ColumnarValue
813fn json_array_contains_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
814    let arrays = common::columnar_to_arrays(args);
815    let result = json_array_contains_impl(&arrays)?;
816    Ok(ColumnarValue::Array(result))
817}
818
819/// Implementation of json_array_contains function
820fn json_array_contains_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
821    common::validate_arg_count(args, 3, "json_array_contains")?;
822
823    let jsonb_array = common::extract_jsonb_array(args)?;
824    let path_array = common::extract_string_array(args, 1)?;
825    let value_array = common::extract_string_array(args, 2)?;
826
827    let mut builder = BooleanBuilder::with_capacity(jsonb_array.len());
828
829    for i in 0..jsonb_array.len() {
830        if jsonb_array.is_null(i) {
831            builder.append_null();
832        } else {
833            let path = common::get_string_value_at(path_array, i);
834            let value = common::get_string_value_at(value_array, i);
835
836            match (path, value) {
837                (Some(p), Some(v)) => {
838                    let jsonb_bytes = jsonb_array.value(i);
839                    let contains = check_array_contains(jsonb_bytes, p, v)?;
840                    builder.append_value(contains);
841                }
842                _ => builder.append_null(),
843            }
844        }
845    }
846
847    Ok(Arc::new(builder.finish()))
848}
849
850/// Check if a JSON array at path contains a value
851fn check_array_contains(jsonb_bytes: &[u8], path: &str, value: &str) -> Result<bool> {
852    let json_path = common::parse_json_path(path)?;
853
854    let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
855    let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb);
856    match selector.select_values(&json_path) {
857        Ok(values) => {
858            for v in values {
859                // Convert to raw JSONB for direct access
860                let raw = v.as_raw();
861                // Check if it's an array by trying to iterate
862                let mut index = 0;
863                loop {
864                    match raw.get_by_index(index) {
865                        Ok(Some(elem)) => {
866                            let elem_str = elem.to_string();
867                            // Compare as JSON strings (with quotes for strings)
868                            if elem_str == value || elem_str == format!("\"{}\"", value) {
869                                return Ok(true);
870                            }
871                            index += 1;
872                        }
873                        Ok(None) => break, // End of array
874                        Err(_) => break,   // Not an array or error
875                    }
876                }
877            }
878            Ok(false)
879        }
880        Err(e) => Err(common::execution_error(format!(
881            "Failed to check array contains at path '{}': {}",
882            path, e
883        ))),
884    }
885}
886
887/// Create the json_array_length UDF for getting array length
888///
889/// # Arguments
890/// * First parameter: JSONB binary data (LargeBinary)
891/// * Second parameter: JSONPath to array location (Utf8)
892///
893/// # Returns
894/// Integer length of the JSON array, or null if path doesn't point to an array
895pub fn json_array_length_udf() -> ScalarUDF {
896    create_udf(
897        "json_array_length",
898        vec![DataType::LargeBinary, DataType::Utf8],
899        DataType::Int64,
900        Volatility::Immutable,
901        Arc::new(json_array_length_columnar_impl),
902    )
903}
904
905/// Implementation of json_array_length function with ColumnarValue
906fn json_array_length_columnar_impl(args: &[ColumnarValue]) -> Result<ColumnarValue> {
907    let arrays = common::columnar_to_arrays(args);
908    let result = json_array_length_impl(&arrays)?;
909    Ok(ColumnarValue::Array(result))
910}
911
912/// Implementation of json_array_length function
913fn json_array_length_impl(args: &[ArrayRef]) -> Result<ArrayRef> {
914    common::validate_arg_count(args, 2, "json_array_length")?;
915
916    let jsonb_array = common::extract_jsonb_array(args)?;
917    let path_array = common::extract_string_array(args, 1)?;
918
919    let mut builder = Int64Builder::with_capacity(jsonb_array.len());
920
921    for i in 0..jsonb_array.len() {
922        if jsonb_array.is_null(i) {
923            builder.append_null();
924        } else if let Some(path) = common::get_string_value_at(path_array, i) {
925            let jsonb_bytes = jsonb_array.value(i);
926            match get_array_length(jsonb_bytes, path)? {
927                Some(len) => builder.append_value(len),
928                None => builder.append_null(),
929            }
930        } else {
931            builder.append_null();
932        }
933    }
934
935    Ok(Arc::new(builder.finish()))
936}
937
938/// Get the length of a JSON array at path
939fn get_array_length(jsonb_bytes: &[u8], path: &str) -> Result<Option<i64>> {
940    let json_path = common::parse_json_path(path)?;
941
942    let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
943    let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb);
944    match selector.select_values(&json_path) {
945        Ok(values) => {
946            if values.is_empty() {
947                return Ok(None);
948            }
949            let first = &values[0];
950            let raw = first.as_raw();
951
952            // Count array elements by iterating
953            let mut count = 0;
954            loop {
955                match raw.get_by_index(count) {
956                    Ok(Some(_)) => count += 1,
957                    Ok(None) => break, // End of array
958                    Err(_) => {
959                        // Not an array
960                        if count == 0 {
961                            return Err(common::execution_error(format!(
962                                "Path '{}' does not point to an array",
963                                path
964                            )));
965                        }
966                        break;
967                    }
968                }
969            }
970            Ok(Some(count as i64))
971        }
972        Err(e) => Err(common::execution_error(format!(
973            "Failed to get array length at path '{}': {}",
974            path, e
975        ))),
976    }
977}
978
/// Unit tests for the JSON UDF row-wise implementations (`*_impl` functions), built on
/// JSONB fixtures encoded with `jsonb::parse_value`.
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::builder::LargeBinaryBuilder;
    use arrow_array::{BooleanArray, Int64Array};

    /// Encode a JSON text literal into JSONB bytes; panics on invalid JSON (test-only).
    fn create_test_jsonb(json_str: &str) -> Vec<u8> {
        jsonb::parse_value(json_str.as_bytes()).unwrap().to_vec()
    }

    /// `JsonbType` discriminants round-trip through `as_u8` / `from_u8`; unknown codes map to `None`.
    #[test]
    fn test_jsonb_type_enum() {
        // Test enum conversion to/from u8
        assert_eq!(JsonbType::Null.as_u8(), 0);
        assert_eq!(JsonbType::Boolean.as_u8(), 1);
        assert_eq!(JsonbType::Int64.as_u8(), 2);
        assert_eq!(JsonbType::Float64.as_u8(), 3);
        assert_eq!(JsonbType::String.as_u8(), 4);
        assert_eq!(JsonbType::Array.as_u8(), 5);
        assert_eq!(JsonbType::Object.as_u8(), 6);

        // Test from_u8 conversion
        assert_eq!(JsonbType::from_u8(0), Some(JsonbType::Null));
        assert_eq!(JsonbType::from_u8(1), Some(JsonbType::Boolean));
        assert_eq!(JsonbType::from_u8(2), Some(JsonbType::Int64));
        assert_eq!(JsonbType::from_u8(3), Some(JsonbType::Float64));
        assert_eq!(JsonbType::from_u8(4), Some(JsonbType::String));
        assert_eq!(JsonbType::from_u8(5), Some(JsonbType::Array));
        assert_eq!(JsonbType::from_u8(6), Some(JsonbType::Object));
        assert_eq!(JsonbType::from_u8(7), None); // Invalid value
    }

    /// `json_extract` returns the JSON text of the matched value (strings keep their
    /// quotes) and null for a null document.
    #[tokio::test]
    async fn test_json_extract_udf() -> Result<()> {
        let json = r#"{"user": {"name": "Alice", "age": 30}}"#;
        let jsonb_bytes = create_test_jsonb(json);

        let mut binary_builder = LargeBinaryBuilder::new();
        binary_builder.append_value(&jsonb_bytes);
        binary_builder.append_value(&jsonb_bytes);
        binary_builder.append_null();

        let jsonb_array = Arc::new(binary_builder.finish());
        let path_array = Arc::new(StringArray::from(vec![
            Some("$.user.name"),
            Some("$.user.age"),
            Some("$.user.name"),
        ]));

        let result = json_extract_impl(&[jsonb_array, path_array])?;
        let string_array = result.as_any().downcast_ref::<StringArray>().unwrap();

        assert_eq!(string_array.len(), 3);
        assert_eq!(string_array.value(0), "\"Alice\"");
        assert_eq!(string_array.value(1), "30");
        assert!(string_array.is_null(2));

        Ok(())
    }

    /// `json_exists` reports whether a path matches (objects, scalars and arrays alike)
    /// and yields null for a null document.
    #[tokio::test]
    async fn test_json_exists_udf() -> Result<()> {
        let json = r#"{"user": {"name": "Alice", "age": 30}, "tags": ["rust", "json"]}"#;
        let jsonb_bytes = create_test_jsonb(json);

        let mut binary_builder = LargeBinaryBuilder::new();
        binary_builder.append_value(&jsonb_bytes);
        binary_builder.append_value(&jsonb_bytes);
        binary_builder.append_value(&jsonb_bytes);
        binary_builder.append_null();

        let jsonb_array = Arc::new(binary_builder.finish());
        let path_array = Arc::new(StringArray::from(vec![
            Some("$.user.name"),
            Some("$.user.email"),
            Some("$.tags"),
            Some("$.any"),
        ]));

        let result = json_exists_impl(&[jsonb_array, path_array])?;
        let bool_array = result.as_any().downcast_ref::<BooleanArray>().unwrap();

        assert_eq!(bool_array.len(), 4);
        assert!(bool_array.value(0));
        assert!(!bool_array.value(1));
        assert!(bool_array.value(2));
        assert!(bool_array.is_null(3));

        Ok(())
    }

    /// `json_get_string` stringifies scalar fields (unquoted) and maps JSON null to SQL null.
    #[tokio::test]
    async fn test_json_get_string_udf() -> Result<()> {
        // Test valid string conversions
        let json = r#"{"str": "hello", "num": 123, "bool": true, "null": null}"#;
        let jsonb_bytes = create_test_jsonb(json);

        let mut binary_builder = LargeBinaryBuilder::new();
        binary_builder.append_value(&jsonb_bytes);
        binary_builder.append_value(&jsonb_bytes);
        binary_builder.append_value(&jsonb_bytes);
        binary_builder.append_value(&jsonb_bytes);

        let jsonb_array = Arc::new(binary_builder.finish());
        let key_array = Arc::new(StringArray::from(vec![
            Some("str"),
            Some("num"),
            Some("bool"),
            Some("null"),
        ]));

        let result = json_get_string_impl(&[jsonb_array, key_array])?;
        let string_array = result.as_any().downcast_ref::<StringArray>().unwrap();

        assert_eq!(string_array.len(), 4);
        assert_eq!(string_array.value(0), "hello");
        assert_eq!(string_array.value(1), "123");
        assert_eq!(string_array.value(2), "true");
        assert!(string_array.is_null(3));

        Ok(())
    }

    /// `json_get_int` converts integers, numeric strings and booleans to Int64.
    #[tokio::test]
    async fn test_json_get_int_udf() -> Result<()> {
        let json = r#"{"int": 42, "str_num": "99", "bool": true}"#;
        let jsonb_bytes = create_test_jsonb(json);

        let mut binary_builder = LargeBinaryBuilder::new();
        binary_builder.append_value(&jsonb_bytes);
        binary_builder.append_value(&jsonb_bytes);
        binary_builder.append_value(&jsonb_bytes);

        let jsonb_array = Arc::new(binary_builder.finish());
        let key_array = Arc::new(StringArray::from(vec![
            Some("int"),
            Some("str_num"),
            Some("bool"),
        ]));

        let result = json_get_int_impl(&[jsonb_array, key_array])?;
        let int_array = result.as_any().downcast_ref::<Int64Array>().unwrap();

        assert_eq!(int_array.len(), 3);
        assert_eq!(int_array.value(0), 42);
        assert_eq!(int_array.value(1), 99);
        assert_eq!(int_array.value(2), 1); // jsonb converts true to 1

        Ok(())
    }

    /// `json_get_bool` converts booleans and the strings "true"/"false" to Boolean.
    #[tokio::test]
    async fn test_json_get_bool_udf() -> Result<()> {
        let json =
            r#"{"bool_true": true, "bool_false": false, "str_true": "true", "str_false": "false"}"#;
        let jsonb_bytes = create_test_jsonb(json);

        let mut binary_builder = LargeBinaryBuilder::new();
        binary_builder.append_value(&jsonb_bytes);
        binary_builder.append_value(&jsonb_bytes);
        binary_builder.append_value(&jsonb_bytes);
        binary_builder.append_value(&jsonb_bytes);

        let jsonb_array = Arc::new(binary_builder.finish());
        let key_array = Arc::new(StringArray::from(vec![
            Some("bool_true"),
            Some("bool_false"),
            Some("str_true"),
            Some("str_false"),
        ]));

        let result = json_get_bool_impl(&[jsonb_array, key_array])?;
        let bool_array = result.as_any().downcast_ref::<BooleanArray>().unwrap();

        assert_eq!(bool_array.len(), 4);
        assert!(bool_array.value(0));
        assert!(!bool_array.value(1));
        assert!(bool_array.value(2)); // "true" string converts to true
        assert!(!bool_array.value(3)); // "false" string converts to false

        Ok(())
    }

    /// `json_array_contains` finds string and numeric elements by their text form,
    /// reports false for absent values, and yields null for a null document.
    #[tokio::test]
    async fn test_json_array_contains_udf() -> Result<()> {
        let json = r#"{"tags": ["rust", "json", "database"], "nums": [1, 2, 3]}"#;
        let jsonb_bytes = create_test_jsonb(json);

        let mut binary_builder = LargeBinaryBuilder::new();
        binary_builder.append_value(&jsonb_bytes);
        binary_builder.append_value(&jsonb_bytes);
        binary_builder.append_value(&jsonb_bytes);
        binary_builder.append_null();

        let jsonb_array = Arc::new(binary_builder.finish());
        let path_array = Arc::new(StringArray::from(vec![
            Some("$.tags"),
            Some("$.tags"),
            Some("$.nums"),
            Some("$.tags"),
        ]));
        let value_array = Arc::new(StringArray::from(vec![
            Some("rust"),
            Some("python"),
            Some("2"),
            Some("any"),
        ]));

        let result = json_array_contains_impl(&[jsonb_array, path_array, value_array])?;
        let bool_array = result.as_any().downcast_ref::<BooleanArray>().unwrap();

        assert_eq!(bool_array.len(), 4);
        assert!(bool_array.value(0));
        assert!(!bool_array.value(1));
        assert!(bool_array.value(2));
        assert!(bool_array.is_null(3));

        Ok(())
    }

    /// `json_array_length` counts elements of empty, flat and nested arrays and yields
    /// null for a null document.
    #[tokio::test]
    async fn test_json_array_length_udf() -> Result<()> {
        let json = r#"{"empty": [], "tags": ["a", "b", "c"], "nested": {"arr": [1, 2]}}"#;
        let jsonb_bytes = create_test_jsonb(json);

        let mut binary_builder = LargeBinaryBuilder::new();
        binary_builder.append_value(&jsonb_bytes);
        binary_builder.append_value(&jsonb_bytes);
        binary_builder.append_value(&jsonb_bytes);
        binary_builder.append_null();

        let jsonb_array = Arc::new(binary_builder.finish());
        let path_array = Arc::new(StringArray::from(vec![
            Some("$.empty"),
            Some("$.tags"),
            Some("$.nested.arr"),
            Some("$.any"),
        ]));

        let result = json_array_length_impl(&[jsonb_array, path_array])?;
        let int_array = result.as_any().downcast_ref::<Int64Array>().unwrap();

        assert_eq!(int_array.len(), 4);
        assert_eq!(int_array.value(0), 0);
        assert_eq!(int_array.value(1), 3);
        assert_eq!(int_array.value(2), 2);
        assert!(int_array.is_null(3));

        Ok(())
    }

    /// Numeric keys passed to `json_get_string` index into a top-level array; an
    /// out-of-bounds index yields null.
    #[tokio::test]
    async fn test_json_array_access() -> Result<()> {
        let json = r#"["first", "second", "third"]"#;
        let jsonb_bytes = create_test_jsonb(json);

        let mut binary_builder = LargeBinaryBuilder::new();
        binary_builder.append_value(&jsonb_bytes);
        binary_builder.append_value(&jsonb_bytes);
        binary_builder.append_value(&jsonb_bytes);

        let jsonb_array = Arc::new(binary_builder.finish());
        let key_array = Arc::new(StringArray::from(vec![
            Some("0"),
            Some("1"),
            Some("10"), // Out of bounds
        ]));

        let result = json_get_string_impl(&[jsonb_array, key_array])?;
        let string_array = result.as_any().downcast_ref::<StringArray>().unwrap();

        assert_eq!(string_array.len(), 3);
        assert_eq!(string_array.value(0), "first");
        assert_eq!(string_array.value(1), "second");
        assert!(string_array.is_null(2));

        Ok(())
    }
}