Skip to main content

lance_arrow/
json.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! JSON support for Apache Arrow.
5
6use std::convert::TryFrom;
7use std::sync::Arc;
8
9use arrow_array::builder::{LargeBinaryBuilder, StringBuilder};
10use arrow_array::cast::AsArray;
11use arrow_array::{
12    Array, ArrayRef, FixedSizeListArray, LargeBinaryArray, LargeListArray, LargeStringArray,
13    ListArray, MapArray, RecordBatch, StringArray, StructArray,
14};
15use arrow_schema::{ArrowError, DataType, Field as ArrowField, Fields, Schema};
16
17use crate::ARROW_EXT_NAME_KEY;
18
/// Arrow extension type name for JSON data (Lance internal).
///
/// [`is_json_field`] recognises a field as Lance JSON when this name is stored
/// under `ARROW_EXT_NAME_KEY` in the field metadata and the storage type is
/// `LargeBinary`.
pub const JSON_EXT_NAME: &str = "lance.json";

/// Arrow extension type name for JSON data (Arrow official).
///
/// [`is_arrow_json_field`] recognises a field as Arrow JSON when this name is
/// stored under `ARROW_EXT_NAME_KEY` in the field metadata and the storage type
/// is `Utf8` or `LargeUtf8`.
pub const ARROW_JSON_EXT_NAME: &str = "arrow.json";
24
25/// Check if a field is a JSON extension field (Lance internal JSONB storage)
26pub fn is_json_field(field: &ArrowField) -> bool {
27    field.data_type() == &DataType::LargeBinary
28        && field
29            .metadata()
30            .get(ARROW_EXT_NAME_KEY)
31            .map(|name| name == JSON_EXT_NAME)
32            .unwrap_or_default()
33}
34
35/// Check if a field is an Arrow JSON extension field (PyArrow pa.json() type)
36pub fn is_arrow_json_field(field: &ArrowField) -> bool {
37    // Arrow JSON extension type uses Utf8 or LargeUtf8 as storage type
38    (field.data_type() == &DataType::Utf8 || field.data_type() == &DataType::LargeUtf8)
39        && field
40            .metadata()
41            .get(ARROW_EXT_NAME_KEY)
42            .map(|name| name == ARROW_JSON_EXT_NAME)
43            .unwrap_or_default()
44}
45
46/// Check if a field or any of its descendants is a JSON field
47pub fn has_json_fields(field: &ArrowField) -> bool {
48    if is_json_field(field) {
49        return true;
50    }
51
52    match field.data_type() {
53        DataType::Struct(fields) => fields.iter().any(|f| has_json_fields(f)),
54        DataType::List(f) | DataType::LargeList(f) | DataType::FixedSizeList(f, _) => {
55            has_json_fields(f)
56        }
57        DataType::Map(f, _) => has_json_fields(f),
58        _ => false,
59    }
60}
61
62/// Check if a field or any of its descendants is an Arrow JSON field
63pub fn has_arrow_json_fields(field: &ArrowField) -> bool {
64    if is_arrow_json_field(field) {
65        return true;
66    }
67
68    match field.data_type() {
69        DataType::Struct(fields) => fields.iter().any(|f| has_arrow_json_fields(f)),
70        DataType::List(f) | DataType::LargeList(f) | DataType::FixedSizeList(f, _) => {
71            has_arrow_json_fields(f)
72        }
73        DataType::Map(f, _) => has_arrow_json_fields(f),
74        _ => false,
75    }
76}
77
78/// Create a JSON field with the appropriate extension metadata
79pub fn json_field(name: &str, nullable: bool) -> ArrowField {
80    let mut field = ArrowField::new(name, DataType::LargeBinary, nullable);
81    let mut metadata = std::collections::HashMap::new();
82    metadata.insert(ARROW_EXT_NAME_KEY.to_string(), JSON_EXT_NAME.to_string());
83    field.set_metadata(metadata);
84    field
85}
86
87/// A specialized array for JSON data stored as JSONB binary format
88#[derive(Debug, Clone)]
89pub struct JsonArray {
90    inner: LargeBinaryArray,
91}
92
93impl JsonArray {
94    /// Create a new JsonArray from an iterator of JSON strings
95    pub fn try_from_iter<I, S>(iter: I) -> Result<Self, ArrowError>
96    where
97        I: IntoIterator<Item = Option<S>>,
98        S: AsRef<str>,
99    {
100        let mut builder = LargeBinaryBuilder::new();
101
102        for json_str in iter {
103            match json_str {
104                Some(s) => {
105                    let encoded = encode_json(s.as_ref()).map_err(|e| {
106                        ArrowError::InvalidArgumentError(format!("Failed to encode JSON: {}", e))
107                    })?;
108                    builder.append_value(&encoded);
109                }
110                None => builder.append_null(),
111            }
112        }
113
114        Ok(Self {
115            inner: builder.finish(),
116        })
117    }
118
119    /// Get the underlying LargeBinaryArray
120    pub fn into_inner(self) -> LargeBinaryArray {
121        self.inner
122    }
123
124    /// Get a reference to the underlying LargeBinaryArray
125    pub fn inner(&self) -> &LargeBinaryArray {
126        &self.inner
127    }
128
129    /// Get the value at index i as decoded JSON string
130    pub fn value(&self, i: usize) -> Result<String, ArrowError> {
131        if self.inner.is_null(i) {
132            return Err(ArrowError::InvalidArgumentError(
133                "Value is null".to_string(),
134            ));
135        }
136
137        let jsonb_bytes = self.inner.value(i);
138        Ok(decode_json(jsonb_bytes))
139    }
140
141    /// Get the value at index i as raw JSONB bytes
142    pub fn value_bytes(&self, i: usize) -> &[u8] {
143        self.inner.value(i)
144    }
145
146    /// Get JSONPath value from the JSON at index i
147    pub fn json_path(&self, i: usize, path: &str) -> Result<Option<String>, ArrowError> {
148        if self.inner.is_null(i) {
149            return Ok(None);
150        }
151
152        let jsonb_bytes = self.inner.value(i);
153        get_json_path(jsonb_bytes, path).map_err(|e| {
154            ArrowError::InvalidArgumentError(format!("Failed to extract JSONPath: {}", e))
155        })
156    }
157
158    /// Convert to Arrow string array (JSON as UTF-8)
159    pub fn to_arrow_json(&self) -> ArrayRef {
160        let mut builder = arrow_array::builder::StringBuilder::new();
161
162        for i in 0..self.inner.len() {
163            if self.inner.is_null(i) {
164                builder.append_null();
165            } else {
166                let jsonb_bytes = self.inner.value(i);
167                let json_str = decode_json(jsonb_bytes);
168                builder.append_value(&json_str);
169            }
170        }
171
172        // Return as UTF-8 string array (Arrow represents JSON as strings)
173        Arc::new(builder.finish())
174    }
175
176    pub fn len(&self) -> usize {
177        self.inner.len()
178    }
179
180    pub fn is_empty(&self) -> bool {
181        self.inner.is_empty()
182    }
183
184    pub fn is_null(&self, i: usize) -> bool {
185        self.inner.is_null(i)
186    }
187}
188
189// TryFrom implementations for string arrays
190impl TryFrom<StringArray> for JsonArray {
191    type Error = ArrowError;
192
193    fn try_from(array: StringArray) -> Result<Self, Self::Error> {
194        Self::try_from(&array)
195    }
196}
197
198impl TryFrom<&StringArray> for JsonArray {
199    type Error = ArrowError;
200
201    fn try_from(array: &StringArray) -> Result<Self, Self::Error> {
202        let mut builder = LargeBinaryBuilder::with_capacity(array.len(), array.value_data().len());
203
204        for i in 0..array.len() {
205            if array.is_null(i) {
206                builder.append_null();
207            } else {
208                let json_str = array.value(i);
209                let encoded = encode_json(json_str).map_err(|e| {
210                    ArrowError::InvalidArgumentError(format!("Failed to encode JSON: {}", e))
211                })?;
212                builder.append_value(&encoded);
213            }
214        }
215
216        Ok(Self {
217            inner: builder.finish(),
218        })
219    }
220}
221
222impl TryFrom<LargeStringArray> for JsonArray {
223    type Error = ArrowError;
224
225    fn try_from(array: LargeStringArray) -> Result<Self, Self::Error> {
226        Self::try_from(&array)
227    }
228}
229
230impl TryFrom<&LargeStringArray> for JsonArray {
231    type Error = ArrowError;
232
233    fn try_from(array: &LargeStringArray) -> Result<Self, Self::Error> {
234        let mut builder = LargeBinaryBuilder::with_capacity(array.len(), array.value_data().len());
235
236        for i in 0..array.len() {
237            if array.is_null(i) {
238                builder.append_null();
239            } else {
240                let json_str = array.value(i);
241                let encoded = encode_json(json_str).map_err(|e| {
242                    ArrowError::InvalidArgumentError(format!("Failed to encode JSON: {}", e))
243                })?;
244                builder.append_value(&encoded);
245            }
246        }
247
248        Ok(Self {
249            inner: builder.finish(),
250        })
251    }
252}
253
254impl TryFrom<ArrayRef> for JsonArray {
255    type Error = ArrowError;
256
257    fn try_from(array_ref: ArrayRef) -> Result<Self, Self::Error> {
258        match array_ref.data_type() {
259            DataType::Utf8 => {
260                // Downcast is guaranteed to succeed after matching on DataType::Utf8
261                let string_array = array_ref
262                    .as_any()
263                    .downcast_ref::<StringArray>()
264                    .expect("DataType::Utf8 array must be StringArray");
265                Self::try_from(string_array)
266            }
267            DataType::LargeUtf8 => {
268                // Downcast is guaranteed to succeed after matching on DataType::LargeUtf8
269                let large_string_array = array_ref
270                    .as_any()
271                    .downcast_ref::<LargeStringArray>()
272                    .expect("DataType::LargeUtf8 array must be LargeStringArray");
273                Self::try_from(large_string_array)
274            }
275            dt => Err(ArrowError::InvalidArgumentError(format!(
276                "Unsupported array type for JSON: {:?}. Expected Utf8 or LargeUtf8",
277                dt
278            ))),
279        }
280    }
281}
282
283/// Encode JSON string to JSONB format
284pub fn encode_json(json_str: &str) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
285    let value = jsonb::parse_value(json_str.as_bytes())?;
286    Ok(value.to_vec())
287}
288
289/// Decode JSONB bytes to JSON string
290pub fn decode_json(jsonb_bytes: &[u8]) -> String {
291    let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
292    raw_jsonb.to_string()
293}
294
295/// Extract JSONPath value from JSONB
296fn get_json_path(
297    jsonb_bytes: &[u8],
298    path: &str,
299) -> Result<Option<String>, Box<dyn std::error::Error>> {
300    let json_path = jsonb::jsonpath::parse_json_path(path.as_bytes())?;
301    let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
302    let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb);
303
304    let values = selector.select_values(&json_path)?;
305    if values.is_empty() {
306        Ok(None)
307    } else {
308        Ok(Some(values[0].to_string()))
309    }
310}
311
312/// Convert an Arrow JSON field to Lance JSON field (with JSONB storage)
313pub fn arrow_json_to_lance_json(field: &ArrowField) -> ArrowField {
314    if is_arrow_json_field(field) {
315        return field_with_extension(field, DataType::LargeBinary, JSON_EXT_NAME);
316    }
317
318    let data_type = match field.data_type() {
319        DataType::Struct(fields) => {
320            let fields = fields
321                .iter()
322                .map(|field| Arc::new(arrow_json_to_lance_json(field)))
323                .collect::<Vec<_>>();
324            DataType::Struct(Fields::from(fields))
325        }
326        DataType::List(item) => DataType::List(Arc::new(arrow_json_to_lance_json(item))),
327        DataType::LargeList(item) => DataType::LargeList(Arc::new(arrow_json_to_lance_json(item))),
328        DataType::FixedSizeList(item, size) => {
329            DataType::FixedSizeList(Arc::new(arrow_json_to_lance_json(item)), *size)
330        }
331        DataType::Map(entries, keys_sorted) => {
332            DataType::Map(Arc::new(arrow_json_to_lance_json(entries)), *keys_sorted)
333        }
334        _ => return field.clone(),
335    };
336
337    field_with_data_type(field, data_type)
338}
339
340/// Convert a Lance JSON field to Arrow JSON field.
341pub fn lance_json_to_arrow_json(field: &ArrowField) -> ArrowField {
342    if is_json_field(field) {
343        return field_with_extension(field, DataType::Utf8, ARROW_JSON_EXT_NAME);
344    }
345
346    let data_type = match field.data_type() {
347        DataType::Struct(fields) => {
348            let fields = fields
349                .iter()
350                .map(|field| Arc::new(lance_json_to_arrow_json(field)))
351                .collect::<Vec<_>>();
352            DataType::Struct(Fields::from(fields))
353        }
354        DataType::List(item) => DataType::List(Arc::new(lance_json_to_arrow_json(item))),
355        DataType::LargeList(item) => DataType::LargeList(Arc::new(lance_json_to_arrow_json(item))),
356        DataType::FixedSizeList(item, size) => {
357            DataType::FixedSizeList(Arc::new(lance_json_to_arrow_json(item)), *size)
358        }
359        DataType::Map(entries, keys_sorted) => {
360            DataType::Map(Arc::new(lance_json_to_arrow_json(entries)), *keys_sorted)
361        }
362        _ => return field.clone(),
363    };
364
365    field_with_data_type(field, data_type)
366}
367
368fn field_with_data_type(field: &ArrowField, data_type: DataType) -> ArrowField {
369    ArrowField::new(field.name(), data_type, field.is_nullable())
370        .with_metadata(field.metadata().clone())
371}
372
373fn field_with_extension(
374    field: &ArrowField,
375    data_type: DataType,
376    extension_name: &str,
377) -> ArrowField {
378    let mut metadata = field.metadata().clone();
379    metadata.insert(ARROW_EXT_NAME_KEY.to_string(), extension_name.to_string());
380    ArrowField::new(field.name(), data_type, field.is_nullable()).with_metadata(metadata)
381}
382
383fn convert_json_array<F>(
384    field: &ArrowField,
385    array: &ArrayRef,
386    convert_leaf: &F,
387) -> Result<(ArrowField, ArrayRef, bool), ArrowError>
388where
389    F: Fn(&ArrowField, &ArrayRef) -> Result<Option<(ArrowField, ArrayRef)>, ArrowError>,
390{
391    if let Some((field, array)) = convert_leaf(field, array)? {
392        return Ok((field, array, true));
393    }
394
395    match field.data_type() {
396        DataType::Struct(fields) => {
397            let struct_array = array.as_struct();
398            let mut new_fields = Vec::with_capacity(fields.len());
399            let mut new_columns = Vec::with_capacity(fields.len());
400            let mut changed = false;
401
402            for (field, column) in fields.iter().zip(struct_array.columns()) {
403                let (new_field, new_column, field_changed) =
404                    convert_json_array(field, column, convert_leaf)?;
405                changed |= field_changed;
406                new_fields.push(Arc::new(new_field));
407                new_columns.push(new_column);
408            }
409
410            if changed {
411                let fields = Fields::from(new_fields);
412                let new_field = field_with_data_type(field, DataType::Struct(fields.clone()));
413                let new_array =
414                    StructArray::new(fields, new_columns, struct_array.nulls().cloned());
415                Ok((new_field, Arc::new(new_array) as ArrayRef, true))
416            } else {
417                Ok((field.clone(), array.clone(), false))
418            }
419        }
420        DataType::List(item) => {
421            let list_array: &ListArray = array.as_list();
422            let (new_item, new_values, changed) =
423                convert_json_array(item, list_array.values(), convert_leaf)?;
424            if changed {
425                let new_field =
426                    field_with_data_type(field, DataType::List(Arc::new(new_item.clone())));
427                let new_array = ListArray::new(
428                    Arc::new(new_item),
429                    list_array.offsets().clone(),
430                    new_values,
431                    list_array.nulls().cloned(),
432                );
433                Ok((new_field, Arc::new(new_array) as ArrayRef, true))
434            } else {
435                Ok((field.clone(), array.clone(), false))
436            }
437        }
438        DataType::LargeList(item) => {
439            let list_array: &LargeListArray = array.as_list();
440            let (new_item, new_values, changed) =
441                convert_json_array(item, list_array.values(), convert_leaf)?;
442            if changed {
443                let new_field =
444                    field_with_data_type(field, DataType::LargeList(Arc::new(new_item.clone())));
445                let new_array = LargeListArray::new(
446                    Arc::new(new_item),
447                    list_array.offsets().clone(),
448                    new_values,
449                    list_array.nulls().cloned(),
450                );
451                Ok((new_field, Arc::new(new_array) as ArrayRef, true))
452            } else {
453                Ok((field.clone(), array.clone(), false))
454            }
455        }
456        DataType::FixedSizeList(item, size) => {
457            let list_array: &FixedSizeListArray = array.as_fixed_size_list();
458            let (new_item, new_values, changed) =
459                convert_json_array(item, list_array.values(), convert_leaf)?;
460            if changed {
461                let new_field = field_with_data_type(
462                    field,
463                    DataType::FixedSizeList(Arc::new(new_item.clone()), *size),
464                );
465                let new_array = FixedSizeListArray::try_new_with_length(
466                    Arc::new(new_item),
467                    *size,
468                    new_values,
469                    list_array.nulls().cloned(),
470                    list_array.len(),
471                )?;
472                Ok((new_field, Arc::new(new_array) as ArrayRef, true))
473            } else {
474                Ok((field.clone(), array.clone(), false))
475            }
476        }
477        DataType::Map(entries, keys_sorted) => {
478            let map_array = array
479                .as_any()
480                .downcast_ref::<MapArray>()
481                .expect("DataType::Map array must be MapArray");
482            let entries_array = Arc::new(map_array.entries().clone()) as ArrayRef;
483            let (new_entries, new_entries_array, changed) =
484                convert_json_array(entries, &entries_array, convert_leaf)?;
485            if changed {
486                let entries_struct = new_entries_array
487                    .as_any()
488                    .downcast_ref::<StructArray>()
489                    .expect("Map entries must be StructArray")
490                    .clone();
491                let new_field = field_with_data_type(
492                    field,
493                    DataType::Map(Arc::new(new_entries.clone()), *keys_sorted),
494                );
495                let new_array = MapArray::new(
496                    Arc::new(new_entries),
497                    map_array.offsets().clone(),
498                    entries_struct,
499                    map_array.nulls().cloned(),
500                    *keys_sorted,
501                );
502                Ok((new_field, Arc::new(new_array) as ArrayRef, true))
503            } else {
504                Ok((field.clone(), array.clone(), false))
505            }
506        }
507        _ => Ok((field.clone(), array.clone(), false)),
508    }
509}
510
511fn convert_arrow_json_array(
512    field: &ArrowField,
513    array: &ArrayRef,
514) -> Result<(ArrowField, ArrayRef, bool), ArrowError> {
515    convert_json_array(field, array, &|field, array| {
516        if is_arrow_json_field(field) {
517            let json_array = JsonArray::try_from(array.clone())?;
518            Ok(Some((
519                arrow_json_to_lance_json(field),
520                Arc::new(json_array.into_inner()) as ArrayRef,
521            )))
522        } else {
523            Ok(None)
524        }
525    })
526}
527
528fn convert_lance_json_array(
529    field: &ArrowField,
530    array: &ArrayRef,
531) -> Result<(ArrowField, ArrayRef, bool), ArrowError> {
532    convert_json_array(field, array, &|field, array| {
533        if is_json_field(field) {
534            let binary_array = array
535                .as_any()
536                .downcast_ref::<LargeBinaryArray>()
537                .expect("Lance JSON field must be LargeBinaryArray");
538            let mut builder = StringBuilder::new();
539
540            for i in 0..binary_array.len() {
541                if binary_array.is_null(i) {
542                    builder.append_null();
543                } else {
544                    let jsonb_bytes = binary_array.value(i);
545                    let json_str = decode_json(jsonb_bytes);
546                    builder.append_value(&json_str);
547                }
548            }
549
550            Ok(Some((
551                lance_json_to_arrow_json(field),
552                Arc::new(builder.finish()) as ArrayRef,
553            )))
554        } else {
555            Ok(None)
556        }
557    })
558}
559
560/// Convert a RecordBatch with Lance JSON columns (JSONB) back to Arrow JSON format (strings)
561pub fn convert_lance_json_to_arrow(
562    batch: &arrow_array::RecordBatch,
563) -> Result<arrow_array::RecordBatch, ArrowError> {
564    let schema = batch.schema();
565    let mut needs_conversion = false;
566    let mut new_fields = Vec::with_capacity(schema.fields().len());
567    let mut new_columns = Vec::with_capacity(batch.num_columns());
568
569    for (i, field) in schema.fields().iter().enumerate() {
570        let column = batch.column(i);
571        let (new_field, new_column, changed) = convert_lance_json_array(field, column)?;
572
573        needs_conversion |= changed;
574        new_fields.push(new_field);
575        new_columns.push(new_column);
576    }
577
578    if needs_conversion {
579        let new_schema = Arc::new(Schema::new_with_metadata(
580            new_fields,
581            schema.metadata().clone(),
582        ));
583        RecordBatch::try_new(new_schema, new_columns)
584    } else {
585        // No conversion needed, return original batch
586        Ok(batch.clone())
587    }
588}
589
590/// Convert a RecordBatch with Arrow JSON columns to Lance JSON format (JSONB)
591pub fn convert_json_columns(
592    batch: &arrow_array::RecordBatch,
593) -> Result<arrow_array::RecordBatch, ArrowError> {
594    let schema = batch.schema();
595    let mut needs_conversion = false;
596    let mut new_fields = Vec::with_capacity(schema.fields().len());
597    let mut new_columns = Vec::with_capacity(batch.num_columns());
598
599    for (i, field) in schema.fields().iter().enumerate() {
600        let column = batch.column(i);
601        let (new_field, new_column, changed) = convert_arrow_json_array(field, column)?;
602
603        needs_conversion |= changed;
604        new_fields.push(new_field);
605        new_columns.push(new_column);
606    }
607
608    if needs_conversion {
609        let new_schema = Arc::new(Schema::new_with_metadata(
610            new_fields,
611            schema.metadata().clone(),
612        ));
613        RecordBatch::try_new(new_schema, new_columns)
614    } else {
615        // No conversion needed, return original batch
616        Ok(batch.clone())
617    }
618}
619
620#[cfg(test)]
621mod tests {
622    use super::*;
623
624    #[test]
625    fn test_json_field_creation() {
626        let field = json_field("data", true);
627        assert_eq!(field.name(), "data");
628        assert_eq!(field.data_type(), &DataType::LargeBinary);
629        assert!(field.is_nullable());
630        assert!(is_json_field(&field));
631    }
632
633    #[test]
634    fn test_json_array_from_strings() {
635        let json_strings = vec![
636            Some(r#"{"name": "Alice", "age": 30}"#),
637            None,
638            Some(r#"{"name": "Bob", "age": 25}"#),
639        ];
640
641        let array = JsonArray::try_from_iter(json_strings).unwrap();
642        assert_eq!(array.len(), 3);
643        assert!(!array.is_null(0));
644        assert!(array.is_null(1));
645        assert!(!array.is_null(2));
646
647        let decoded = array.value(0).unwrap();
648        assert!(decoded.contains("Alice"));
649    }
650
651    #[test]
652    fn test_json_array_from_string_array() {
653        let string_array = StringArray::from(vec![
654            Some(r#"{"name": "Alice"}"#),
655            Some(r#"{"name": "Bob"}"#),
656            None,
657        ]);
658
659        let json_array = JsonArray::try_from(string_array).unwrap();
660        assert_eq!(json_array.len(), 3);
661        assert!(!json_array.is_null(0));
662        assert!(!json_array.is_null(1));
663        assert!(json_array.is_null(2));
664    }
665
666    #[test]
667    fn test_json_path_extraction() {
668        let json_array = JsonArray::try_from_iter(vec![
669            Some(r#"{"user": {"name": "Alice", "age": 30}}"#),
670            Some(r#"{"user": {"name": "Bob"}}"#),
671        ])
672        .unwrap();
673
674        let name = json_array.json_path(0, "$.user.name").unwrap();
675        assert_eq!(name, Some("\"Alice\"".to_string()));
676
677        let age = json_array.json_path(1, "$.user.age").unwrap();
678        assert_eq!(age, None);
679    }
680
681    #[test]
682    fn test_convert_json_columns() {
683        // Create a batch with Arrow JSON column
684        let json_strings = vec![Some(r#"{"name": "Alice"}"#), Some(r#"{"name": "Bob"}"#)];
685        let json_arr = StringArray::from(json_strings);
686
687        // Create field with arrow.json extension
688        let mut field = ArrowField::new("data", DataType::Utf8, false);
689        let mut metadata = std::collections::HashMap::new();
690        metadata.insert(
691            ARROW_EXT_NAME_KEY.to_string(),
692            ARROW_JSON_EXT_NAME.to_string(),
693        );
694        field.set_metadata(metadata);
695
696        let schema = Arc::new(Schema::new(vec![field]));
697        let batch = RecordBatch::try_new(schema, vec![Arc::new(json_arr) as ArrayRef]).unwrap();
698
699        // Convert the batch
700        let converted = convert_json_columns(&batch).unwrap();
701
702        // Check the converted schema
703        assert_eq!(converted.num_columns(), 1);
704        let converted_schema = converted.schema();
705        let converted_field = converted_schema.field(0);
706        assert_eq!(converted_field.data_type(), &DataType::LargeBinary);
707        assert_eq!(
708            converted_field.metadata().get(ARROW_EXT_NAME_KEY),
709            Some(&JSON_EXT_NAME.to_string())
710        );
711
712        // Check the data was converted
713        let converted_column = converted.column(0);
714        assert_eq!(converted_column.data_type(), &DataType::LargeBinary);
715        assert_eq!(converted_column.len(), 2);
716
717        // Verify the data is valid JSONB
718        let binary_array = converted_column
719            .as_any()
720            .downcast_ref::<LargeBinaryArray>()
721            .unwrap();
722        for i in 0..binary_array.len() {
723            let jsonb_bytes = binary_array.value(i);
724            let decoded = decode_json(jsonb_bytes);
725            assert!(decoded.contains("name"));
726        }
727    }
728
729    #[test]
730    fn test_convert_nested_json_columns() {
731        use arrow_buffer::{OffsetBuffer, ScalarBuffer};
732
733        let uri_field = Arc::new(ArrowField::new("uri", DataType::Utf8, false));
734        let mut metadata = std::collections::HashMap::new();
735        metadata.insert(
736            ARROW_EXT_NAME_KEY.to_string(),
737            ARROW_JSON_EXT_NAME.to_string(),
738        );
739        let extra_field =
740            Arc::new(ArrowField::new("extra", DataType::Utf8, true).with_metadata(metadata));
741        let item_fields = Fields::from(vec![uri_field, extra_field]);
742
743        let values = StructArray::new(
744            item_fields.clone(),
745            vec![
746                Arc::new(StringArray::from(vec![Some("a.jpg"), Some("b.jpg")])) as ArrayRef,
747                Arc::new(StringArray::from(vec![
748                    Some(r#"{"codec":"h264"}"#),
749                    None::<&str>,
750                ])) as ArrayRef,
751            ],
752            None,
753        );
754        let item = Arc::new(ArrowField::new("item", DataType::Struct(item_fields), true));
755        let media = ListArray::new(
756            item,
757            OffsetBuffer::new(ScalarBuffer::from(vec![0, 1, 2])),
758            Arc::new(values),
759            None,
760        );
761        let schema = Arc::new(Schema::new(vec![ArrowField::new(
762            "media",
763            media.data_type().clone(),
764            true,
765        )]));
766        let batch = RecordBatch::try_new(schema, vec![Arc::new(media) as ArrayRef]).unwrap();
767
768        assert!(has_arrow_json_fields(batch.schema().field(0)));
769
770        let converted = convert_json_columns(&batch).unwrap();
771        let converted_schema = converted.schema();
772        let DataType::List(item) = converted_schema.field(0).data_type() else {
773            panic!("expected list field");
774        };
775        let DataType::Struct(fields) = item.data_type() else {
776            panic!("expected struct item");
777        };
778        assert!(is_json_field(&fields[1]));
779
780        let list_array: &ListArray = converted.column(0).as_list();
781        let values = list_array.values().as_struct();
782        let extra = values
783            .column(1)
784            .as_any()
785            .downcast_ref::<LargeBinaryArray>()
786            .unwrap();
787        assert!(decode_json(extra.value(0)).contains("h264"));
788        assert!(extra.is_null(1));
789
790        let logical = convert_lance_json_to_arrow(&converted).unwrap();
791        let logical_schema = logical.schema();
792        let DataType::List(item) = logical_schema.field(0).data_type() else {
793            panic!("expected list field");
794        };
795        let DataType::Struct(fields) = item.data_type() else {
796            panic!("expected struct item");
797        };
798        assert!(is_arrow_json_field(&fields[1]));
799
800        let list_array: &ListArray = logical.column(0).as_list();
801        let values = list_array.values().as_struct();
802        let extra = values
803            .column(1)
804            .as_any()
805            .downcast_ref::<StringArray>()
806            .unwrap();
807        assert!(extra.value(0).contains("h264"));
808        assert!(extra.is_null(1));
809    }
810
811    #[test]
812    fn test_convert_fixed_size_list_zero_json_preserves_length() {
813        let mut metadata = std::collections::HashMap::new();
814        metadata.insert(
815            ARROW_EXT_NAME_KEY.to_string(),
816            ARROW_JSON_EXT_NAME.to_string(),
817        );
818        let item = Arc::new(ArrowField::new("item", DataType::Utf8, true).with_metadata(metadata));
819        let values = Arc::new(StringArray::from(Vec::<Option<&str>>::new())) as ArrayRef;
820        let lists = FixedSizeListArray::try_new_with_length(item, 0, values, None, 3).unwrap();
821        let schema = Arc::new(Schema::new(vec![ArrowField::new(
822            "lists",
823            lists.data_type().clone(),
824            true,
825        )]));
826        let batch = RecordBatch::try_new(schema, vec![Arc::new(lists) as ArrayRef]).unwrap();
827
828        let converted = convert_json_columns(&batch).unwrap();
829        assert_eq!(converted.num_rows(), 3);
830        assert_eq!(converted.column(0).len(), 3);
831
832        let converted_schema = converted.schema();
833        let DataType::FixedSizeList(item, size) = converted_schema.field(0).data_type() else {
834            panic!("expected fixed size list field");
835        };
836        assert_eq!(*size, 0);
837        assert!(is_json_field(item));
838
839        let logical = convert_lance_json_to_arrow(&converted).unwrap();
840        assert_eq!(logical.num_rows(), 3);
841        assert_eq!(logical.column(0).len(), 3);
842
843        let logical_schema = logical.schema();
844        let DataType::FixedSizeList(item, size) = logical_schema.field(0).data_type() else {
845            panic!("expected fixed size list field");
846        };
847        assert_eq!(*size, 0);
848        assert!(is_arrow_json_field(item));
849    }
850
851    #[test]
852    fn test_has_json_fields() {
853        // Test direct JSON field
854        let json_f = json_field("data", true);
855        assert!(has_json_fields(&json_f));
856
857        // Test non-JSON field
858        let non_json = ArrowField::new("data", DataType::Utf8, true);
859        assert!(!has_json_fields(&non_json));
860
861        // Test struct containing JSON field
862        let struct_field = ArrowField::new(
863            "struct",
864            DataType::Struct(vec![json_field("nested_json", true)].into()),
865            true,
866        );
867        assert!(has_json_fields(&struct_field));
868
869        // Test struct without JSON field
870        let struct_no_json = ArrowField::new(
871            "struct",
872            DataType::Struct(vec![ArrowField::new("text", DataType::Utf8, true)].into()),
873            true,
874        );
875        assert!(!has_json_fields(&struct_no_json));
876
877        // Test List containing JSON field
878        let list_field = ArrowField::new(
879            "list",
880            DataType::List(Arc::new(json_field("item", true))),
881            true,
882        );
883        assert!(has_json_fields(&list_field));
884
885        // Test LargeList containing JSON field
886        let large_list_field = ArrowField::new(
887            "large_list",
888            DataType::LargeList(Arc::new(json_field("item", true))),
889            true,
890        );
891        assert!(has_json_fields(&large_list_field));
892
893        // Test FixedSizeList containing JSON field
894        let fixed_list_field = ArrowField::new(
895            "fixed_list",
896            DataType::FixedSizeList(Arc::new(json_field("item", true)), 3),
897            true,
898        );
899        assert!(has_json_fields(&fixed_list_field));
900
901        // Test Map containing JSON field
902        let map_field = ArrowField::new(
903            "map",
904            DataType::Map(
905                Arc::new(ArrowField::new(
906                    "entries",
907                    DataType::Struct(
908                        vec![
909                            ArrowField::new("key", DataType::Utf8, false),
910                            json_field("value", true),
911                        ]
912                        .into(),
913                    ),
914                    false,
915                )),
916                false,
917            ),
918            true,
919        );
920        assert!(has_json_fields(&map_field));
921    }
922
923    #[test]
924    fn test_json_array_inner() {
925        let json_array = JsonArray::try_from_iter(vec![Some(r#"{"a": 1}"#)]).unwrap();
926        let inner = json_array.inner();
927        assert_eq!(inner.len(), 1);
928    }
929
930    #[test]
931    fn test_json_array_value_null_error() {
932        let json_array = JsonArray::try_from_iter(vec![None::<&str>]).unwrap();
933        let result = json_array.value(0);
934        assert!(result.is_err());
935        assert!(result.unwrap_err().to_string().contains("null"));
936    }
937
938    #[test]
939    fn test_json_array_value_bytes() {
940        let json_array = JsonArray::try_from_iter(vec![Some(r#"{"a": 1}"#)]).unwrap();
941        let bytes = json_array.value_bytes(0);
942        assert!(!bytes.is_empty());
943    }
944
945    #[test]
946    fn test_json_path_with_null() {
947        let json_array =
948            JsonArray::try_from_iter(vec![Some(r#"{"user": {"name": "Alice"}}"#), None::<&str>])
949                .unwrap();
950
951        let result = json_array.json_path(1, "$.user.name").unwrap();
952        assert_eq!(result, None);
953    }
954
955    #[test]
956    fn test_to_arrow_json() {
957        let json_array = JsonArray::try_from_iter(vec![
958            Some(r#"{"name": "Alice"}"#),
959            None::<&str>,
960            Some(r#"{"name": "Bob"}"#),
961        ])
962        .unwrap();
963
964        let arrow_json = json_array.to_arrow_json();
965        assert_eq!(arrow_json.len(), 3);
966        assert!(!arrow_json.is_null(0));
967        assert!(arrow_json.is_null(1));
968        assert!(!arrow_json.is_null(2));
969
970        let string_array = arrow_json.as_any().downcast_ref::<StringArray>().unwrap();
971        assert!(string_array.value(0).contains("Alice"));
972        assert!(string_array.value(2).contains("Bob"));
973    }
974
975    #[test]
976    fn test_json_array_trait_methods() {
977        let json_array =
978            JsonArray::try_from_iter(vec![Some(r#"{"a": 1}"#), Some(r#"{"b": 2}"#)]).unwrap();
979
980        // Wrapper methods
981        assert_eq!(json_array.len(), 2);
982        assert!(!json_array.is_empty());
983        assert!(!json_array.is_null(0));
984
985        // Underlying Arrow array
986        assert_eq!(json_array.inner().data_type(), &DataType::LargeBinary);
987        assert_eq!(json_array.inner().len(), 2);
988    }
989
990    #[test]
991    fn test_json_array_empty() {
992        let json_array = JsonArray::try_from_iter(Vec::<Option<&str>>::new()).unwrap();
993        assert!(json_array.is_empty());
994        assert_eq!(json_array.len(), 0);
995    }
996
997    #[test]
998    fn test_try_from_large_string_array() {
999        let large_string_array = LargeStringArray::from(vec![
1000            Some(r#"{"name": "Alice"}"#),
1001            Some(r#"{"name": "Bob"}"#),
1002            None,
1003        ]);
1004
1005        // Test TryFrom<&LargeStringArray>
1006        let json_array = JsonArray::try_from(&large_string_array).unwrap();
1007        assert_eq!(json_array.len(), 3);
1008        assert!(!json_array.is_null(0));
1009        assert!(!json_array.is_null(1));
1010        assert!(json_array.is_null(2));
1011
1012        // Test TryFrom<LargeStringArray> (owned)
1013        let large_string_array2 = LargeStringArray::from(vec![Some(r#"{"x": 1}"#)]);
1014        let json_array2 = JsonArray::try_from(large_string_array2).unwrap();
1015        assert_eq!(json_array2.len(), 1);
1016    }
1017
1018    #[test]
1019    fn test_try_from_array_ref() {
1020        // Test with Utf8
1021        let string_array: ArrayRef = Arc::new(StringArray::from(vec![
1022            Some(r#"{"a": 1}"#),
1023            Some(r#"{"b": 2}"#),
1024        ]));
1025        let json_array = JsonArray::try_from(string_array).unwrap();
1026        assert_eq!(json_array.len(), 2);
1027
1028        // Test with LargeUtf8
1029        let large_string_array: ArrayRef = Arc::new(LargeStringArray::from(vec![
1030            Some(r#"{"c": 3}"#),
1031            Some(r#"{"d": 4}"#),
1032        ]));
1033        let json_array2 = JsonArray::try_from(large_string_array).unwrap();
1034        assert_eq!(json_array2.len(), 2);
1035
1036        // Test with unsupported type
1037        let int_array: ArrayRef = Arc::new(arrow_array::Int32Array::from(vec![1, 2, 3]));
1038        let result = JsonArray::try_from(int_array);
1039        assert!(result.is_err());
1040        assert!(result.unwrap_err().to_string().contains("Unsupported"));
1041    }
1042
1043    #[test]
1044    fn test_arrow_json_to_lance_json_non_json_field() {
1045        // Test that non-JSON fields are returned unchanged
1046        let field = ArrowField::new("text", DataType::Utf8, true);
1047        let converted = arrow_json_to_lance_json(&field);
1048        assert_eq!(converted.data_type(), &DataType::Utf8);
1049        assert_eq!(converted.name(), "text");
1050    }
1051
1052    #[test]
1053    fn test_convert_lance_json_to_arrow() {
1054        // Create a batch with Lance JSON column (JSONB)
1055        let json_array = JsonArray::try_from_iter(vec![
1056            Some(r#"{"name": "Alice"}"#),
1057            None::<&str>,
1058            Some(r#"{"name": "Bob"}"#),
1059        ])
1060        .unwrap();
1061
1062        let lance_json_field = json_field("data", true);
1063        let schema = Arc::new(Schema::new(vec![lance_json_field]));
1064        let batch =
1065            RecordBatch::try_new(schema, vec![Arc::new(json_array.into_inner()) as ArrayRef])
1066                .unwrap();
1067
1068        // Convert back to Arrow JSON
1069        let converted = convert_lance_json_to_arrow(&batch).unwrap();
1070
1071        // Check schema
1072        let converted_schema = converted.schema();
1073        let converted_field = converted_schema.field(0);
1074        assert_eq!(converted_field.data_type(), &DataType::Utf8);
1075        assert_eq!(
1076            converted_field.metadata().get(ARROW_EXT_NAME_KEY),
1077            Some(&ARROW_JSON_EXT_NAME.to_string())
1078        );
1079
1080        // Check data
1081        let string_array = converted
1082            .column(0)
1083            .as_any()
1084            .downcast_ref::<StringArray>()
1085            .unwrap();
1086        assert!(!string_array.is_null(0));
1087        assert!(string_array.is_null(1));
1088        assert!(!string_array.is_null(2));
1089        assert!(string_array.value(0).contains("Alice"));
1090        assert!(string_array.value(2).contains("Bob"));
1091    }
1092
1093    #[test]
1094    fn test_convert_lance_json_to_arrow_empty_batch() {
1095        // Create an empty batch with Lance JSON column
1096        let lance_json_field = json_field("data", true);
1097        let schema = Arc::new(Schema::new(vec![lance_json_field]));
1098        let empty_binary = LargeBinaryBuilder::new().finish();
1099        let batch = RecordBatch::try_new(schema, vec![Arc::new(empty_binary) as ArrayRef]).unwrap();
1100
1101        // Convert back to Arrow JSON
1102        let converted = convert_lance_json_to_arrow(&batch).unwrap();
1103        assert_eq!(converted.num_rows(), 0);
1104        assert_eq!(converted.schema().field(0).data_type(), &DataType::Utf8);
1105    }
1106
1107    #[test]
1108    fn test_convert_lance_json_to_arrow_no_json_columns() {
1109        // Create a batch without JSON columns
1110        let field = ArrowField::new("text", DataType::Utf8, true);
1111        let schema = Arc::new(Schema::new(vec![field]));
1112        let string_array = StringArray::from(vec![Some("hello"), Some("world")]);
1113        let batch = RecordBatch::try_new(schema, vec![Arc::new(string_array) as ArrayRef]).unwrap();
1114
1115        // Convert - should return the same batch
1116        let converted = convert_lance_json_to_arrow(&batch).unwrap();
1117        assert_eq!(converted.num_columns(), 1);
1118        assert_eq!(converted.schema().field(0).data_type(), &DataType::Utf8);
1119    }
1120
1121    #[test]
1122    fn test_convert_json_columns_empty_batch() {
1123        // Create an empty batch with Arrow JSON column
1124        let mut field = ArrowField::new("data", DataType::Utf8, false);
1125        let mut metadata = std::collections::HashMap::new();
1126        metadata.insert(
1127            ARROW_EXT_NAME_KEY.to_string(),
1128            ARROW_JSON_EXT_NAME.to_string(),
1129        );
1130        field.set_metadata(metadata);
1131
1132        let schema = Arc::new(Schema::new(vec![field]));
1133        let empty_strings = arrow_array::builder::StringBuilder::new().finish();
1134        let batch =
1135            RecordBatch::try_new(schema, vec![Arc::new(empty_strings) as ArrayRef]).unwrap();
1136
1137        let converted = convert_json_columns(&batch).unwrap();
1138        assert_eq!(converted.num_rows(), 0);
1139        assert_eq!(
1140            converted.schema().field(0).data_type(),
1141            &DataType::LargeBinary
1142        );
1143    }
1144
1145    #[test]
1146    fn test_convert_json_columns_large_string() {
1147        // Create a batch with Arrow JSON column using LargeUtf8
1148        let json_strings = LargeStringArray::from(vec![
1149            Some(r#"{"name": "Alice"}"#),
1150            Some(r#"{"name": "Bob"}"#),
1151        ]);
1152
1153        let mut field = ArrowField::new("data", DataType::LargeUtf8, false);
1154        let mut metadata = std::collections::HashMap::new();
1155        metadata.insert(
1156            ARROW_EXT_NAME_KEY.to_string(),
1157            ARROW_JSON_EXT_NAME.to_string(),
1158        );
1159        field.set_metadata(metadata);
1160
1161        let schema = Arc::new(Schema::new(vec![field]));
1162        let batch = RecordBatch::try_new(schema, vec![Arc::new(json_strings) as ArrayRef]).unwrap();
1163
1164        let converted = convert_json_columns(&batch).unwrap();
1165        assert_eq!(converted.num_columns(), 1);
1166        assert_eq!(
1167            converted.schema().field(0).data_type(),
1168            &DataType::LargeBinary
1169        );
1170        assert_eq!(converted.num_rows(), 2);
1171    }
1172
1173    #[test]
1174    fn test_convert_json_columns_no_json_columns() {
1175        // Create a batch without JSON columns
1176        let field = ArrowField::new("text", DataType::Utf8, true);
1177        let schema = Arc::new(Schema::new(vec![field]));
1178        let string_array = StringArray::from(vec![Some("hello"), Some("world")]);
1179        let batch = RecordBatch::try_new(schema, vec![Arc::new(string_array) as ArrayRef]).unwrap();
1180
1181        // Convert - should return the same batch
1182        let converted = convert_json_columns(&batch).unwrap();
1183        assert_eq!(converted.num_columns(), 1);
1184        assert_eq!(converted.schema().field(0).data_type(), &DataType::Utf8);
1185    }
1186
1187    #[test]
1188    fn test_convert_json_columns_mixed_columns() {
1189        // Create a batch with both JSON and non-JSON columns
1190        let json_strings = StringArray::from(vec![
1191            Some(r#"{"name": "Alice"}"#),
1192            Some(r#"{"name": "Bob"}"#),
1193        ]);
1194        let text_strings = StringArray::from(vec![Some("hello"), Some("world")]);
1195
1196        let mut json_field = ArrowField::new("json_data", DataType::Utf8, false);
1197        let mut metadata = std::collections::HashMap::new();
1198        metadata.insert(
1199            ARROW_EXT_NAME_KEY.to_string(),
1200            ARROW_JSON_EXT_NAME.to_string(),
1201        );
1202        json_field.set_metadata(metadata);
1203
1204        let text_field = ArrowField::new("text_data", DataType::Utf8, true);
1205
1206        let schema = Arc::new(Schema::new(vec![json_field, text_field]));
1207        let batch = RecordBatch::try_new(
1208            schema,
1209            vec![
1210                Arc::new(json_strings) as ArrayRef,
1211                Arc::new(text_strings) as ArrayRef,
1212            ],
1213        )
1214        .unwrap();
1215
1216        let converted = convert_json_columns(&batch).unwrap();
1217        assert_eq!(converted.num_columns(), 2);
1218        assert_eq!(
1219            converted.schema().field(0).data_type(),
1220            &DataType::LargeBinary
1221        );
1222        assert_eq!(converted.schema().field(1).data_type(), &DataType::Utf8);
1223    }
1224
1225    #[test]
1226    fn test_is_arrow_json_field_large_utf8() {
1227        // Test with LargeUtf8 storage type
1228        let mut field = ArrowField::new("data", DataType::LargeUtf8, true);
1229        let mut metadata = std::collections::HashMap::new();
1230        metadata.insert(
1231            ARROW_EXT_NAME_KEY.to_string(),
1232            ARROW_JSON_EXT_NAME.to_string(),
1233        );
1234        field.set_metadata(metadata);
1235
1236        assert!(is_arrow_json_field(&field));
1237    }
1238
1239    #[test]
1240    fn test_encode_json_invalid() {
1241        // Test encoding invalid JSON
1242        let result = encode_json("not valid json {");
1243        assert!(result.is_err());
1244    }
1245
1246    #[test]
1247    fn test_json_array_from_invalid_json() {
1248        // Test creating JsonArray from invalid JSON strings
1249        let result = JsonArray::try_from_iter(vec![Some("invalid json {")]);
1250        assert!(result.is_err());
1251        assert!(result.unwrap_err().to_string().contains("Failed to encode"));
1252    }
1253
1254    #[test]
1255    fn test_try_from_string_array_invalid_json() {
1256        let string_array = StringArray::from(vec![Some("invalid json {")]);
1257        let result = JsonArray::try_from(string_array);
1258        assert!(result.is_err());
1259    }
1260
1261    #[test]
1262    fn test_try_from_large_string_array_invalid_json() {
1263        let large_string_array = LargeStringArray::from(vec![Some("invalid json {")]);
1264        let result = JsonArray::try_from(large_string_array);
1265        assert!(result.is_err());
1266    }
1267
1268    #[test]
1269    fn test_convert_lance_json_to_arrow_mixed_columns() {
1270        // Create a batch with both JSON and non-JSON columns
1271        let json_array = JsonArray::try_from_iter(vec![
1272            Some(r#"{"name": "Alice"}"#),
1273            Some(r#"{"name": "Bob"}"#),
1274        ])
1275        .unwrap();
1276        let text_strings = StringArray::from(vec![Some("hello"), Some("world")]);
1277
1278        let json_f = json_field("json_data", true);
1279        let text_field = ArrowField::new("text_data", DataType::Utf8, true);
1280
1281        let schema = Arc::new(Schema::new(vec![json_f, text_field]));
1282        let batch = RecordBatch::try_new(
1283            schema,
1284            vec![
1285                Arc::new(json_array.into_inner()) as ArrayRef,
1286                Arc::new(text_strings) as ArrayRef,
1287            ],
1288        )
1289        .unwrap();
1290
1291        let converted = convert_lance_json_to_arrow(&batch).unwrap();
1292        assert_eq!(converted.num_columns(), 2);
1293        assert_eq!(converted.schema().field(0).data_type(), &DataType::Utf8);
1294        assert_eq!(converted.schema().field(1).data_type(), &DataType::Utf8);
1295    }
1296
1297    #[test]
1298    fn test_json_path_invalid_path() {
1299        let json_array = JsonArray::try_from_iter(vec![Some(r#"{"a": 1}"#)]).unwrap();
1300        // Invalid JSONPath syntax should return error
1301        let result = json_array.json_path(0, "invalid path without $");
1302        assert!(result.is_err());
1303        assert!(
1304            result
1305                .unwrap_err()
1306                .to_string()
1307                .contains("Failed to extract JSONPath")
1308        );
1309    }
1310
1311    #[test]
1312    fn test_convert_json_columns_invalid_storage_type() {
1313        // Create a batch with Arrow JSON field but wrong storage type (Int32 instead of Utf8)
1314        let int_array = arrow_array::Int32Array::from(vec![1, 2, 3]);
1315
1316        let mut field = ArrowField::new("data", DataType::Int32, false);
1317        let mut metadata = std::collections::HashMap::new();
1318        metadata.insert(
1319            ARROW_EXT_NAME_KEY.to_string(),
1320            ARROW_JSON_EXT_NAME.to_string(),
1321        );
1322        field.set_metadata(metadata);
1323
1324        let schema = Arc::new(Schema::new(vec![field]));
1325        let batch = RecordBatch::try_new(schema, vec![Arc::new(int_array) as ArrayRef]).unwrap();
1326
1327        // This should succeed since Int32 doesn't match is_arrow_json_field check
1328        // (is_arrow_json_field requires Utf8 or LargeUtf8)
1329        let result = convert_json_columns(&batch);
1330        assert!(result.is_ok());
1331    }
1332
1333    #[test]
1334    fn test_is_json_field_wrong_extension() {
1335        // LargeBinary field without the correct extension metadata
1336        let field = ArrowField::new("data", DataType::LargeBinary, true);
1337        assert!(!is_json_field(&field));
1338
1339        // LargeBinary field with wrong extension name
1340        let mut field2 = ArrowField::new("data", DataType::LargeBinary, true);
1341        let mut metadata = std::collections::HashMap::new();
1342        metadata.insert(
1343            ARROW_EXT_NAME_KEY.to_string(),
1344            "other.extension".to_string(),
1345        );
1346        field2.set_metadata(metadata);
1347        assert!(!is_json_field(&field2));
1348    }
1349
1350    #[test]
1351    fn test_is_arrow_json_field_wrong_extension() {
1352        // Utf8 field without extension metadata
1353        let field = ArrowField::new("data", DataType::Utf8, true);
1354        assert!(!is_arrow_json_field(&field));
1355
1356        // Utf8 field with wrong extension name
1357        let mut field2 = ArrowField::new("data", DataType::Utf8, true);
1358        let mut metadata = std::collections::HashMap::new();
1359        metadata.insert(
1360            ARROW_EXT_NAME_KEY.to_string(),
1361            "other.extension".to_string(),
1362        );
1363        field2.set_metadata(metadata);
1364        assert!(!is_arrow_json_field(&field2));
1365
1366        // Wrong type entirely
1367        let field3 = ArrowField::new("data", DataType::Int32, true);
1368        assert!(!is_arrow_json_field(&field3));
1369    }
1370
1371    #[test]
1372    fn test_convert_json_columns_invalid_json_utf8() {
1373        // Test error propagation when converting invalid JSON (Utf8)
1374        let invalid_json = StringArray::from(vec![Some("invalid json {")]);
1375
1376        let mut field = ArrowField::new("data", DataType::Utf8, false);
1377        let mut metadata = std::collections::HashMap::new();
1378        metadata.insert(
1379            ARROW_EXT_NAME_KEY.to_string(),
1380            ARROW_JSON_EXT_NAME.to_string(),
1381        );
1382        field.set_metadata(metadata);
1383
1384        let schema = Arc::new(Schema::new(vec![field]));
1385        let batch = RecordBatch::try_new(schema, vec![Arc::new(invalid_json) as ArrayRef]).unwrap();
1386
1387        let result = convert_json_columns(&batch);
1388        assert!(result.is_err());
1389    }
1390
1391    #[test]
1392    fn test_convert_json_columns_invalid_json_large_utf8() {
1393        // Test error propagation when converting invalid JSON (LargeUtf8)
1394        let invalid_json = LargeStringArray::from(vec![Some("invalid json {")]);
1395
1396        let mut field = ArrowField::new("data", DataType::LargeUtf8, false);
1397        let mut metadata = std::collections::HashMap::new();
1398        metadata.insert(
1399            ARROW_EXT_NAME_KEY.to_string(),
1400            ARROW_JSON_EXT_NAME.to_string(),
1401        );
1402        field.set_metadata(metadata);
1403
1404        let schema = Arc::new(Schema::new(vec![field]));
1405        let batch = RecordBatch::try_new(schema, vec![Arc::new(invalid_json) as ArrayRef]).unwrap();
1406
1407        let result = convert_json_columns(&batch);
1408        assert!(result.is_err());
1409    }
1410
1411    #[test]
1412    fn test_json_path_on_corrupted_jsonb() {
1413        // Create corrupted JSONB bytes directly
1414        let corrupted_bytes: &[u8] = &[0xFF, 0xFE, 0x00, 0x01, 0x02];
1415        let corrupted_binary = LargeBinaryArray::from(vec![Some(corrupted_bytes)]);
1416
1417        // Wrap in JsonArray
1418        let corrupted_json = JsonArray {
1419            inner: corrupted_binary,
1420        };
1421
1422        // Try to use json_path on corrupted data - the selector might fail or return unexpected results
1423        // This exercises the code path but may not produce an error depending on jsonb library behavior
1424        let _result = corrupted_json.json_path(0, "$.a");
1425        // We don't assert on the result as the behavior depends on the jsonb library
1426    }
1427
1428    #[test]
1429    fn test_decode_json_on_various_inputs() {
1430        // Test decode_json with various inputs
1431        let valid_jsonb = encode_json(r#"{"key": "value"}"#).unwrap();
1432        let decoded = decode_json(&valid_jsonb);
1433        assert!(decoded.contains("key"));
1434
1435        // Empty bytes - jsonb library handles this gracefully
1436        let decoded_empty = decode_json(&[]);
1437        // Just verify it doesn't panic
1438        let _ = decoded_empty;
1439
1440        // Random bytes - jsonb library handles this gracefully
1441        let decoded_random = decode_json(&[0xFF, 0xFE, 0x00]);
1442        // Just verify it doesn't panic
1443        let _ = decoded_random;
1444    }
1445}