lance_arrow/json.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! JSON support for Apache Arrow.
5
6use std::convert::TryFrom;
7use std::sync::Arc;
8
9use arrow_array::builder::LargeBinaryBuilder;
10use arrow_array::{Array, ArrayRef, LargeBinaryArray, LargeStringArray, RecordBatch, StringArray};
11use arrow_data::ArrayData;
12use arrow_schema::{ArrowError, DataType, Field as ArrowField, Schema};
13
14use crate::ARROW_EXT_NAME_KEY;
15
/// Arrow extension type name for JSON data (Lance internal).
///
/// Marks a `LargeBinary` field whose bytes are JSONB-encoded documents
/// (see `is_json_field`).
pub const JSON_EXT_NAME: &str = "lance.json";

/// Arrow extension type name for JSON data (Arrow official).
///
/// Marks a `Utf8`/`LargeUtf8` field carrying JSON text, e.g. PyArrow's
/// `pa.json()` type (see `is_arrow_json_field`).
pub const ARROW_JSON_EXT_NAME: &str = "arrow.json";
21
22/// Check if a field is a JSON extension field (Lance internal JSONB storage)
23pub fn is_json_field(field: &ArrowField) -> bool {
24    field.data_type() == &DataType::LargeBinary
25        && field
26            .metadata()
27            .get(ARROW_EXT_NAME_KEY)
28            .map(|name| name == JSON_EXT_NAME)
29            .unwrap_or_default()
30}
31
32/// Check if a field is an Arrow JSON extension field (PyArrow pa.json() type)
33pub fn is_arrow_json_field(field: &ArrowField) -> bool {
34    // Arrow JSON extension type uses Utf8 or LargeUtf8 as storage type
35    (field.data_type() == &DataType::Utf8 || field.data_type() == &DataType::LargeUtf8)
36        && field
37            .metadata()
38            .get(ARROW_EXT_NAME_KEY)
39            .map(|name| name == ARROW_JSON_EXT_NAME)
40            .unwrap_or_default()
41}
42
43/// Check if a field or any of its descendants is a JSON field
44pub fn has_json_fields(field: &ArrowField) -> bool {
45    if is_json_field(field) {
46        return true;
47    }
48
49    match field.data_type() {
50        DataType::Struct(fields) => fields.iter().any(|f| has_json_fields(f)),
51        DataType::List(f) | DataType::LargeList(f) | DataType::FixedSizeList(f, _) => {
52            has_json_fields(f)
53        }
54        DataType::Map(f, _) => has_json_fields(f),
55        _ => false,
56    }
57}
58
59/// Create a JSON field with the appropriate extension metadata
60pub fn json_field(name: &str, nullable: bool) -> ArrowField {
61    let mut field = ArrowField::new(name, DataType::LargeBinary, nullable);
62    let mut metadata = std::collections::HashMap::new();
63    metadata.insert(ARROW_EXT_NAME_KEY.to_string(), JSON_EXT_NAME.to_string());
64    field.set_metadata(metadata);
65    field
66}
67
68/// A specialized array for JSON data stored as JSONB binary format
/// A specialized array for JSON data stored as JSONB binary format.
///
/// Wraps a `LargeBinaryArray` whose values are JSONB-encoded documents; use
/// `value`/`json_path`/`to_arrow_json` to decode entries back to JSON text.
#[derive(Debug, Clone)]
pub struct JsonArray {
    // JSONB-encoded values; a null slot here is a null JSON entry.
    inner: LargeBinaryArray,
}
73
74impl JsonArray {
75    /// Create a new JsonArray from an iterator of JSON strings
76    pub fn try_from_iter<I, S>(iter: I) -> Result<Self, ArrowError>
77    where
78        I: IntoIterator<Item = Option<S>>,
79        S: AsRef<str>,
80    {
81        let mut builder = LargeBinaryBuilder::new();
82
83        for json_str in iter {
84            match json_str {
85                Some(s) => {
86                    let encoded = encode_json(s.as_ref()).map_err(|e| {
87                        ArrowError::InvalidArgumentError(format!("Failed to encode JSON: {}", e))
88                    })?;
89                    builder.append_value(&encoded);
90                }
91                None => builder.append_null(),
92            }
93        }
94
95        Ok(Self {
96            inner: builder.finish(),
97        })
98    }
99
100    /// Get the underlying LargeBinaryArray
101    pub fn into_inner(self) -> LargeBinaryArray {
102        self.inner
103    }
104
105    /// Get a reference to the underlying LargeBinaryArray
106    pub fn inner(&self) -> &LargeBinaryArray {
107        &self.inner
108    }
109
110    /// Get the value at index i as decoded JSON string
111    pub fn value(&self, i: usize) -> Result<String, ArrowError> {
112        if self.inner.is_null(i) {
113            return Err(ArrowError::InvalidArgumentError(
114                "Value is null".to_string(),
115            ));
116        }
117
118        let jsonb_bytes = self.inner.value(i);
119        decode_json(jsonb_bytes)
120            .map_err(|e| ArrowError::InvalidArgumentError(format!("Failed to decode JSON: {}", e)))
121    }
122
123    /// Get the value at index i as raw JSONB bytes
124    pub fn value_bytes(&self, i: usize) -> &[u8] {
125        self.inner.value(i)
126    }
127
128    /// Get JSONPath value from the JSON at index i
129    pub fn json_path(&self, i: usize, path: &str) -> Result<Option<String>, ArrowError> {
130        if self.inner.is_null(i) {
131            return Ok(None);
132        }
133
134        let jsonb_bytes = self.inner.value(i);
135        get_json_path(jsonb_bytes, path).map_err(|e| {
136            ArrowError::InvalidArgumentError(format!("Failed to extract JSONPath: {}", e))
137        })
138    }
139
140    /// Convert to Arrow string array (JSON as UTF-8)
141    pub fn to_arrow_json(&self) -> Result<ArrayRef, ArrowError> {
142        let mut builder = arrow_array::builder::StringBuilder::new();
143
144        for i in 0..self.len() {
145            if self.is_null(i) {
146                builder.append_null();
147            } else {
148                let jsonb_bytes = self.inner.value(i);
149                let json_str = decode_json(jsonb_bytes).map_err(|e| {
150                    ArrowError::InvalidArgumentError(format!("Failed to decode JSON: {}", e))
151                })?;
152                builder.append_value(&json_str);
153            }
154        }
155
156        // Return as UTF-8 string array (Arrow represents JSON as strings)
157        Ok(Arc::new(builder.finish()))
158    }
159}
160
// Every Array operation delegates to the wrapped LargeBinaryArray, so a
// JsonArray behaves exactly like its JSONB storage from Arrow's perspective.
impl Array for JsonArray {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn to_data(&self) -> ArrayData {
        self.inner.to_data()
    }

    fn into_data(self) -> ArrayData {
        self.inner.into_data()
    }

    // Reports the storage type, not an extension type: extension info lives
    // in field metadata (see JSON_EXT_NAME), not in DataType.
    fn data_type(&self) -> &DataType {
        &DataType::LargeBinary
    }

    // Slicing re-wraps the sliced storage so the result stays a JsonArray.
    fn slice(&self, offset: usize, length: usize) -> ArrayRef {
        Arc::new(Self {
            inner: self.inner.slice(offset, length),
        })
    }

    fn len(&self) -> usize {
        self.inner.len()
    }

    fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }

    fn offset(&self) -> usize {
        self.inner.offset()
    }

    fn nulls(&self) -> Option<&arrow_buffer::NullBuffer> {
        self.inner.nulls()
    }

    fn get_buffer_memory_size(&self) -> usize {
        self.inner.get_buffer_memory_size()
    }

    fn get_array_memory_size(&self) -> usize {
        self.inner.get_array_memory_size()
    }
}
208
// TryFrom implementations for string arrays
impl TryFrom<StringArray> for JsonArray {
    type Error = ArrowError;

    /// Encode an owned `StringArray` of JSON text into JSONB storage.
    /// Thin wrapper that forwards to the by-reference implementation.
    fn try_from(array: StringArray) -> Result<Self, Self::Error> {
        Self::try_from(&array)
    }
}
217
218impl TryFrom<&StringArray> for JsonArray {
219    type Error = ArrowError;
220
221    fn try_from(array: &StringArray) -> Result<Self, Self::Error> {
222        let mut builder = LargeBinaryBuilder::with_capacity(array.len(), array.value_data().len());
223
224        for i in 0..array.len() {
225            if array.is_null(i) {
226                builder.append_null();
227            } else {
228                let json_str = array.value(i);
229                let encoded = encode_json(json_str).map_err(|e| {
230                    ArrowError::InvalidArgumentError(format!("Failed to encode JSON: {}", e))
231                })?;
232                builder.append_value(&encoded);
233            }
234        }
235
236        Ok(Self {
237            inner: builder.finish(),
238        })
239    }
240}
241
impl TryFrom<LargeStringArray> for JsonArray {
    type Error = ArrowError;

    /// Encode an owned `LargeStringArray` of JSON text into JSONB storage.
    /// Thin wrapper that forwards to the by-reference implementation.
    fn try_from(array: LargeStringArray) -> Result<Self, Self::Error> {
        Self::try_from(&array)
    }
}
249
250impl TryFrom<&LargeStringArray> for JsonArray {
251    type Error = ArrowError;
252
253    fn try_from(array: &LargeStringArray) -> Result<Self, Self::Error> {
254        let mut builder = LargeBinaryBuilder::with_capacity(array.len(), array.value_data().len());
255
256        for i in 0..array.len() {
257            if array.is_null(i) {
258                builder.append_null();
259            } else {
260                let json_str = array.value(i);
261                let encoded = encode_json(json_str).map_err(|e| {
262                    ArrowError::InvalidArgumentError(format!("Failed to encode JSON: {}", e))
263                })?;
264                builder.append_value(&encoded);
265            }
266        }
267
268        Ok(Self {
269            inner: builder.finish(),
270        })
271    }
272}
273
274impl TryFrom<ArrayRef> for JsonArray {
275    type Error = ArrowError;
276
277    fn try_from(array_ref: ArrayRef) -> Result<Self, Self::Error> {
278        match array_ref.data_type() {
279            DataType::Utf8 => {
280                let string_array = array_ref
281                    .as_any()
282                    .downcast_ref::<StringArray>()
283                    .ok_or_else(|| {
284                        ArrowError::InvalidArgumentError("Failed to downcast to StringArray".into())
285                    })?;
286                Self::try_from(string_array)
287            }
288            DataType::LargeUtf8 => {
289                let large_string_array = array_ref
290                    .as_any()
291                    .downcast_ref::<LargeStringArray>()
292                    .ok_or_else(|| {
293                        ArrowError::InvalidArgumentError(
294                            "Failed to downcast to LargeStringArray".into(),
295                        )
296                    })?;
297                Self::try_from(large_string_array)
298            }
299            dt => Err(ArrowError::InvalidArgumentError(format!(
300                "Unsupported array type for JSON: {:?}. Expected Utf8 or LargeUtf8",
301                dt
302            ))),
303        }
304    }
305}
306
307/// Encode JSON string to JSONB format
308pub fn encode_json(json_str: &str) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
309    let value = jsonb::parse_value(json_str.as_bytes())?;
310    Ok(value.to_vec())
311}
312
313/// Decode JSONB bytes to JSON string
314pub fn decode_json(jsonb_bytes: &[u8]) -> Result<String, Box<dyn std::error::Error>> {
315    let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
316    Ok(raw_jsonb.to_string())
317}
318
319/// Extract JSONPath value from JSONB
320fn get_json_path(
321    jsonb_bytes: &[u8],
322    path: &str,
323) -> Result<Option<String>, Box<dyn std::error::Error>> {
324    let json_path = jsonb::jsonpath::parse_json_path(path.as_bytes())?;
325    let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
326    let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb);
327
328    match selector.select_values(&json_path) {
329        Ok(values) => {
330            if values.is_empty() {
331                Ok(None)
332            } else {
333                Ok(Some(values[0].to_string()))
334            }
335        }
336        Err(e) => Err(Box::new(e)),
337    }
338}
339
340/// Convert an Arrow JSON field to Lance JSON field (with JSONB storage)
341pub fn arrow_json_to_lance_json(field: &ArrowField) -> ArrowField {
342    if is_arrow_json_field(field) {
343        // Convert Arrow JSON (Utf8/LargeUtf8) to Lance JSON (LargeBinary)
344        // Preserve all metadata from the original field
345        let mut new_field =
346            ArrowField::new(field.name(), DataType::LargeBinary, field.is_nullable());
347
348        // Copy all metadata from the original field
349        let mut metadata = field.metadata().clone();
350        // Add/override the extension metadata for Lance JSON
351        metadata.insert(ARROW_EXT_NAME_KEY.to_string(), JSON_EXT_NAME.to_string());
352
353        new_field = new_field.with_metadata(metadata);
354        new_field
355    } else {
356        field.clone()
357    }
358}
359
360/// Convert a RecordBatch with Lance JSON columns (JSONB) back to Arrow JSON format (strings)
361pub fn convert_lance_json_to_arrow(
362    batch: &arrow_array::RecordBatch,
363) -> Result<arrow_array::RecordBatch, ArrowError> {
364    let schema = batch.schema();
365    let mut needs_conversion = false;
366    let mut new_fields = Vec::with_capacity(schema.fields().len());
367    let mut new_columns = Vec::with_capacity(batch.num_columns());
368
369    for (i, field) in schema.fields().iter().enumerate() {
370        let column = batch.column(i);
371
372        if is_json_field(field) {
373            needs_conversion = true;
374
375            // Convert the field back to Arrow JSON (Utf8)
376            let mut new_field = ArrowField::new(field.name(), DataType::Utf8, field.is_nullable());
377            let mut metadata = field.metadata().clone();
378            // Change from lance.json to arrow.json
379            metadata.insert(
380                ARROW_EXT_NAME_KEY.to_string(),
381                ARROW_JSON_EXT_NAME.to_string(),
382            );
383            new_field.set_metadata(metadata);
384            new_fields.push(new_field);
385
386            // Convert the data from JSONB to JSON strings
387            if batch.num_rows() == 0 {
388                // For empty batches, create an empty String array
389                let empty_strings = arrow_array::builder::StringBuilder::new().finish();
390                new_columns.push(Arc::new(empty_strings) as ArrayRef);
391            } else {
392                // Convert JSONB back to JSON strings
393                let binary_array = column
394                    .as_any()
395                    .downcast_ref::<LargeBinaryArray>()
396                    .ok_or_else(|| {
397                        ArrowError::InvalidArgumentError(format!(
398                            "Lance JSON field '{}' has unexpected type",
399                            field.name()
400                        ))
401                    })?;
402
403                let mut builder = arrow_array::builder::StringBuilder::new();
404                for i in 0..binary_array.len() {
405                    if binary_array.is_null(i) {
406                        builder.append_null();
407                    } else {
408                        let jsonb_bytes = binary_array.value(i);
409                        let json_str = decode_json(jsonb_bytes).map_err(|e| {
410                            ArrowError::InvalidArgumentError(format!(
411                                "Failed to decode JSON: {}",
412                                e
413                            ))
414                        })?;
415                        builder.append_value(&json_str);
416                    }
417                }
418                new_columns.push(Arc::new(builder.finish()) as ArrayRef);
419            }
420        } else {
421            new_fields.push(field.as_ref().clone());
422            new_columns.push(column.clone());
423        }
424    }
425
426    if needs_conversion {
427        let new_schema = Arc::new(Schema::new_with_metadata(
428            new_fields,
429            schema.metadata().clone(),
430        ));
431        RecordBatch::try_new(new_schema, new_columns)
432    } else {
433        // No conversion needed, return original batch
434        Ok(batch.clone())
435    }
436}
437
438/// Convert a RecordBatch with Arrow JSON columns to Lance JSON format (JSONB)
439pub fn convert_json_columns(
440    batch: &arrow_array::RecordBatch,
441) -> Result<arrow_array::RecordBatch, ArrowError> {
442    let schema = batch.schema();
443    let mut needs_conversion = false;
444    let mut new_fields = Vec::with_capacity(schema.fields().len());
445    let mut new_columns = Vec::with_capacity(batch.num_columns());
446
447    for (i, field) in schema.fields().iter().enumerate() {
448        let column = batch.column(i);
449
450        if is_arrow_json_field(field) {
451            needs_conversion = true;
452
453            // Convert the field metadata
454            new_fields.push(arrow_json_to_lance_json(field));
455
456            // Convert the data from JSON strings to JSONB
457            if batch.num_rows() == 0 {
458                // For empty batches, create an empty LargeBinary array
459                let empty_binary = LargeBinaryBuilder::new().finish();
460                new_columns.push(Arc::new(empty_binary) as ArrayRef);
461            } else {
462                // Convert non-empty data
463                let json_array =
464                    if let Some(string_array) = column.as_any().downcast_ref::<StringArray>() {
465                        JsonArray::try_from(string_array)?
466                    } else if let Some(large_string_array) =
467                        column.as_any().downcast_ref::<LargeStringArray>()
468                    {
469                        JsonArray::try_from(large_string_array)?
470                    } else {
471                        return Err(ArrowError::InvalidArgumentError(format!(
472                            "Arrow JSON field '{}' has unexpected storage type: {:?}",
473                            field.name(),
474                            column.data_type()
475                        )));
476                    };
477
478                let binary_array = json_array.into_inner();
479
480                new_columns.push(Arc::new(binary_array) as ArrayRef);
481            }
482        } else {
483            new_fields.push(field.as_ref().clone());
484            new_columns.push(column.clone());
485        }
486    }
487
488    if needs_conversion {
489        let new_schema = Arc::new(Schema::new_with_metadata(
490            new_fields,
491            schema.metadata().clone(),
492        ));
493        RecordBatch::try_new(new_schema, new_columns)
494    } else {
495        // No conversion needed, return original batch
496        Ok(batch.clone())
497    }
498}
499
#[cfg(test)]
mod tests {
    use super::*;

    // NOTE(review): several expected values below pin the observed output of
    // the external `jsonb` crate (encoding, decoding, JSONPath selection);
    // a jsonb upgrade that changes rendering would surface here.

    #[test]
    fn test_json_field_creation() {
        // json_field must produce LargeBinary storage tagged as lance.json.
        let field = json_field("data", true);
        assert_eq!(field.name(), "data");
        assert_eq!(field.data_type(), &DataType::LargeBinary);
        assert!(field.is_nullable());
        assert!(is_json_field(&field));
    }

    #[test]
    fn test_json_array_from_strings() {
        // None entries must round-trip as nulls; Some entries get encoded.
        let json_strings = vec![
            Some(r#"{"name": "Alice", "age": 30}"#),
            None,
            Some(r#"{"name": "Bob", "age": 25}"#),
        ];

        let array = JsonArray::try_from_iter(json_strings).unwrap();
        assert_eq!(array.len(), 3);
        assert!(!array.is_null(0));
        assert!(array.is_null(1));
        assert!(!array.is_null(2));

        // Decoded text may be reformatted by jsonb, so only check content.
        let decoded = array.value(0).unwrap();
        assert!(decoded.contains("Alice"));
    }

    #[test]
    fn test_json_array_from_string_array() {
        // TryFrom<StringArray> must preserve length and null positions.
        let string_array = StringArray::from(vec![
            Some(r#"{"name": "Alice"}"#),
            Some(r#"{"name": "Bob"}"#),
            None,
        ]);

        let json_array = JsonArray::try_from(string_array).unwrap();
        assert_eq!(json_array.len(), 3);
        assert!(!json_array.is_null(0));
        assert!(!json_array.is_null(1));
        assert!(json_array.is_null(2));
    }

    #[test]
    fn test_json_path_extraction() {
        let json_array = JsonArray::try_from_iter(vec![
            Some(r#"{"user": {"name": "Alice", "age": 30}}"#),
            Some(r#"{"user": {"name": "Bob"}}"#),
        ])
        .unwrap();

        // jsonb renders a selected string value with surrounding quotes.
        let name = json_array.json_path(0, "$.user.name").unwrap();
        assert_eq!(name, Some("\"Alice\"".to_string()));

        // A path that matches nothing yields None, not an error.
        let age = json_array.json_path(1, "$.user.age").unwrap();
        assert_eq!(age, None);
    }

    #[test]
    fn test_convert_json_columns() {
        // Create a batch with Arrow JSON column
        let json_strings = vec![Some(r#"{"name": "Alice"}"#), Some(r#"{"name": "Bob"}"#)];
        let json_arr = StringArray::from(json_strings);

        // Create field with arrow.json extension
        let mut field = ArrowField::new("data", DataType::Utf8, false);
        let mut metadata = std::collections::HashMap::new();
        metadata.insert(
            ARROW_EXT_NAME_KEY.to_string(),
            ARROW_JSON_EXT_NAME.to_string(),
        );
        field.set_metadata(metadata);

        let schema = Arc::new(Schema::new(vec![field]));
        let batch = RecordBatch::try_new(schema, vec![Arc::new(json_arr) as ArrayRef]).unwrap();

        // Convert the batch
        let converted = convert_json_columns(&batch).unwrap();

        // Check the converted schema: storage becomes LargeBinary and the
        // extension name flips from arrow.json to lance.json.
        assert_eq!(converted.num_columns(), 1);
        let converted_schema = converted.schema();
        let converted_field = converted_schema.field(0);
        assert_eq!(converted_field.data_type(), &DataType::LargeBinary);
        assert_eq!(
            converted_field.metadata().get(ARROW_EXT_NAME_KEY),
            Some(&JSON_EXT_NAME.to_string())
        );

        // Check the data was converted
        let converted_column = converted.column(0);
        assert_eq!(converted_column.data_type(), &DataType::LargeBinary);
        assert_eq!(converted_column.len(), 2);

        // Verify the data is valid JSONB by decoding every row back.
        let binary_array = converted_column
            .as_any()
            .downcast_ref::<LargeBinaryArray>()
            .unwrap();
        for i in 0..binary_array.len() {
            let jsonb_bytes = binary_array.value(i);
            let decoded = decode_json(jsonb_bytes).unwrap();
            assert!(decoded.contains("name"));
        }
    }
}