1use std::convert::TryFrom;
7use std::sync::Arc;
8
9use arrow_array::builder::LargeBinaryBuilder;
10use arrow_array::{Array, ArrayRef, LargeBinaryArray, LargeStringArray, RecordBatch, StringArray};
11use arrow_schema::{ArrowError, DataType, Field as ArrowField, Schema};
12
13use crate::ARROW_EXT_NAME_KEY;
14
/// Extension name stored under `ARROW_EXT_NAME_KEY` in a field's metadata to mark a
/// `LargeBinary` field as Lance JSON (see `is_json_field`).
pub const JSON_EXT_NAME: &str = "lance.json";
17
/// Extension name stored under `ARROW_EXT_NAME_KEY` in a field's metadata to mark a
/// `Utf8`/`LargeUtf8` field as Arrow JSON text (see `is_arrow_json_field`).
pub const ARROW_JSON_EXT_NAME: &str = "arrow.json";
20
21pub fn is_json_field(field: &ArrowField) -> bool {
23 field.data_type() == &DataType::LargeBinary
24 && field
25 .metadata()
26 .get(ARROW_EXT_NAME_KEY)
27 .map(|name| name == JSON_EXT_NAME)
28 .unwrap_or_default()
29}
30
31pub fn is_arrow_json_field(field: &ArrowField) -> bool {
33 (field.data_type() == &DataType::Utf8 || field.data_type() == &DataType::LargeUtf8)
35 && field
36 .metadata()
37 .get(ARROW_EXT_NAME_KEY)
38 .map(|name| name == ARROW_JSON_EXT_NAME)
39 .unwrap_or_default()
40}
41
42pub fn has_json_fields(field: &ArrowField) -> bool {
44 if is_json_field(field) {
45 return true;
46 }
47
48 match field.data_type() {
49 DataType::Struct(fields) => fields.iter().any(|f| has_json_fields(f)),
50 DataType::List(f) | DataType::LargeList(f) | DataType::FixedSizeList(f, _) => {
51 has_json_fields(f)
52 }
53 DataType::Map(f, _) => has_json_fields(f),
54 _ => false,
55 }
56}
57
58pub fn json_field(name: &str, nullable: bool) -> ArrowField {
60 let mut field = ArrowField::new(name, DataType::LargeBinary, nullable);
61 let mut metadata = std::collections::HashMap::new();
62 metadata.insert(ARROW_EXT_NAME_KEY.to_string(), JSON_EXT_NAME.to_string());
63 field.set_metadata(metadata);
64 field
65}
66
67#[derive(Debug, Clone)]
69pub struct JsonArray {
70 inner: LargeBinaryArray,
71}
72
73impl JsonArray {
74 pub fn try_from_iter<I, S>(iter: I) -> Result<Self, ArrowError>
76 where
77 I: IntoIterator<Item = Option<S>>,
78 S: AsRef<str>,
79 {
80 let mut builder = LargeBinaryBuilder::new();
81
82 for json_str in iter {
83 match json_str {
84 Some(s) => {
85 let encoded = encode_json(s.as_ref()).map_err(|e| {
86 ArrowError::InvalidArgumentError(format!("Failed to encode JSON: {}", e))
87 })?;
88 builder.append_value(&encoded);
89 }
90 None => builder.append_null(),
91 }
92 }
93
94 Ok(Self {
95 inner: builder.finish(),
96 })
97 }
98
99 pub fn into_inner(self) -> LargeBinaryArray {
101 self.inner
102 }
103
104 pub fn inner(&self) -> &LargeBinaryArray {
106 &self.inner
107 }
108
109 pub fn value(&self, i: usize) -> Result<String, ArrowError> {
111 if self.inner.is_null(i) {
112 return Err(ArrowError::InvalidArgumentError(
113 "Value is null".to_string(),
114 ));
115 }
116
117 let jsonb_bytes = self.inner.value(i);
118 Ok(decode_json(jsonb_bytes))
119 }
120
121 pub fn value_bytes(&self, i: usize) -> &[u8] {
123 self.inner.value(i)
124 }
125
126 pub fn json_path(&self, i: usize, path: &str) -> Result<Option<String>, ArrowError> {
128 if self.inner.is_null(i) {
129 return Ok(None);
130 }
131
132 let jsonb_bytes = self.inner.value(i);
133 get_json_path(jsonb_bytes, path).map_err(|e| {
134 ArrowError::InvalidArgumentError(format!("Failed to extract JSONPath: {}", e))
135 })
136 }
137
138 pub fn to_arrow_json(&self) -> ArrayRef {
140 let mut builder = arrow_array::builder::StringBuilder::new();
141
142 for i in 0..self.inner.len() {
143 if self.inner.is_null(i) {
144 builder.append_null();
145 } else {
146 let jsonb_bytes = self.inner.value(i);
147 let json_str = decode_json(jsonb_bytes);
148 builder.append_value(&json_str);
149 }
150 }
151
152 Arc::new(builder.finish())
154 }
155
156 pub fn len(&self) -> usize {
157 self.inner.len()
158 }
159
160 pub fn is_empty(&self) -> bool {
161 self.inner.is_empty()
162 }
163
164 pub fn is_null(&self, i: usize) -> bool {
165 self.inner.is_null(i)
166 }
167}
168
169impl TryFrom<StringArray> for JsonArray {
171 type Error = ArrowError;
172
173 fn try_from(array: StringArray) -> Result<Self, Self::Error> {
174 Self::try_from(&array)
175 }
176}
177
178impl TryFrom<&StringArray> for JsonArray {
179 type Error = ArrowError;
180
181 fn try_from(array: &StringArray) -> Result<Self, Self::Error> {
182 let mut builder = LargeBinaryBuilder::with_capacity(array.len(), array.value_data().len());
183
184 for i in 0..array.len() {
185 if array.is_null(i) {
186 builder.append_null();
187 } else {
188 let json_str = array.value(i);
189 let encoded = encode_json(json_str).map_err(|e| {
190 ArrowError::InvalidArgumentError(format!("Failed to encode JSON: {}", e))
191 })?;
192 builder.append_value(&encoded);
193 }
194 }
195
196 Ok(Self {
197 inner: builder.finish(),
198 })
199 }
200}
201
202impl TryFrom<LargeStringArray> for JsonArray {
203 type Error = ArrowError;
204
205 fn try_from(array: LargeStringArray) -> Result<Self, Self::Error> {
206 Self::try_from(&array)
207 }
208}
209
210impl TryFrom<&LargeStringArray> for JsonArray {
211 type Error = ArrowError;
212
213 fn try_from(array: &LargeStringArray) -> Result<Self, Self::Error> {
214 let mut builder = LargeBinaryBuilder::with_capacity(array.len(), array.value_data().len());
215
216 for i in 0..array.len() {
217 if array.is_null(i) {
218 builder.append_null();
219 } else {
220 let json_str = array.value(i);
221 let encoded = encode_json(json_str).map_err(|e| {
222 ArrowError::InvalidArgumentError(format!("Failed to encode JSON: {}", e))
223 })?;
224 builder.append_value(&encoded);
225 }
226 }
227
228 Ok(Self {
229 inner: builder.finish(),
230 })
231 }
232}
233
234impl TryFrom<ArrayRef> for JsonArray {
235 type Error = ArrowError;
236
237 fn try_from(array_ref: ArrayRef) -> Result<Self, Self::Error> {
238 match array_ref.data_type() {
239 DataType::Utf8 => {
240 let string_array = array_ref
242 .as_any()
243 .downcast_ref::<StringArray>()
244 .expect("DataType::Utf8 array must be StringArray");
245 Self::try_from(string_array)
246 }
247 DataType::LargeUtf8 => {
248 let large_string_array = array_ref
250 .as_any()
251 .downcast_ref::<LargeStringArray>()
252 .expect("DataType::LargeUtf8 array must be LargeStringArray");
253 Self::try_from(large_string_array)
254 }
255 dt => Err(ArrowError::InvalidArgumentError(format!(
256 "Unsupported array type for JSON: {:?}. Expected Utf8 or LargeUtf8",
257 dt
258 ))),
259 }
260 }
261}
262
263pub fn encode_json(json_str: &str) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
265 let value = jsonb::parse_value(json_str.as_bytes())?;
266 Ok(value.to_vec())
267}
268
269pub fn decode_json(jsonb_bytes: &[u8]) -> String {
271 let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
272 raw_jsonb.to_string()
273}
274
275fn get_json_path(
277 jsonb_bytes: &[u8],
278 path: &str,
279) -> Result<Option<String>, Box<dyn std::error::Error>> {
280 let json_path = jsonb::jsonpath::parse_json_path(path.as_bytes())?;
281 let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
282 let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb);
283
284 let values = selector.select_values(&json_path)?;
285 if values.is_empty() {
286 Ok(None)
287 } else {
288 Ok(Some(values[0].to_string()))
289 }
290}
291
292pub fn arrow_json_to_lance_json(field: &ArrowField) -> ArrowField {
294 if is_arrow_json_field(field) {
295 let mut new_field =
298 ArrowField::new(field.name(), DataType::LargeBinary, field.is_nullable());
299
300 let mut metadata = field.metadata().clone();
302 metadata.insert(ARROW_EXT_NAME_KEY.to_string(), JSON_EXT_NAME.to_string());
304
305 new_field = new_field.with_metadata(metadata);
306 new_field
307 } else {
308 field.clone()
309 }
310}
311
312pub fn convert_lance_json_to_arrow(
314 batch: &arrow_array::RecordBatch,
315) -> Result<arrow_array::RecordBatch, ArrowError> {
316 let schema = batch.schema();
317 let mut needs_conversion = false;
318 let mut new_fields = Vec::with_capacity(schema.fields().len());
319 let mut new_columns = Vec::with_capacity(batch.num_columns());
320
321 for (i, field) in schema.fields().iter().enumerate() {
322 let column = batch.column(i);
323
324 if is_json_field(field) {
325 needs_conversion = true;
326
327 let mut new_field = ArrowField::new(field.name(), DataType::Utf8, field.is_nullable());
329 let mut metadata = field.metadata().clone();
330 metadata.insert(
332 ARROW_EXT_NAME_KEY.to_string(),
333 ARROW_JSON_EXT_NAME.to_string(),
334 );
335 new_field.set_metadata(metadata);
336 new_fields.push(new_field);
337
338 if batch.num_rows() == 0 {
340 let empty_strings = arrow_array::builder::StringBuilder::new().finish();
342 new_columns.push(Arc::new(empty_strings) as ArrayRef);
343 } else {
344 let binary_array = column
347 .as_any()
348 .downcast_ref::<LargeBinaryArray>()
349 .expect("Lance JSON field must be LargeBinaryArray");
350
351 let mut builder = arrow_array::builder::StringBuilder::new();
352 for i in 0..binary_array.len() {
353 if binary_array.is_null(i) {
354 builder.append_null();
355 } else {
356 let jsonb_bytes = binary_array.value(i);
357 let json_str = decode_json(jsonb_bytes);
358 builder.append_value(&json_str);
359 }
360 }
361 new_columns.push(Arc::new(builder.finish()) as ArrayRef);
362 }
363 } else {
364 new_fields.push(field.as_ref().clone());
365 new_columns.push(column.clone());
366 }
367 }
368
369 if needs_conversion {
370 let new_schema = Arc::new(Schema::new_with_metadata(
371 new_fields,
372 schema.metadata().clone(),
373 ));
374 RecordBatch::try_new(new_schema, new_columns)
375 } else {
376 Ok(batch.clone())
378 }
379}
380
381pub fn convert_json_columns(
383 batch: &arrow_array::RecordBatch,
384) -> Result<arrow_array::RecordBatch, ArrowError> {
385 let schema = batch.schema();
386 let mut needs_conversion = false;
387 let mut new_fields = Vec::with_capacity(schema.fields().len());
388 let mut new_columns = Vec::with_capacity(batch.num_columns());
389
390 for (i, field) in schema.fields().iter().enumerate() {
391 let column = batch.column(i);
392
393 if is_arrow_json_field(field) {
394 needs_conversion = true;
395
396 new_fields.push(arrow_json_to_lance_json(field));
398
399 if batch.num_rows() == 0 {
401 let empty_binary = LargeBinaryBuilder::new().finish();
403 new_columns.push(Arc::new(empty_binary) as ArrayRef);
404 } else {
405 let json_array =
408 if let Some(string_array) = column.as_any().downcast_ref::<StringArray>() {
409 JsonArray::try_from(string_array)?
410 } else {
411 let large_string_array = column
412 .as_any()
413 .downcast_ref::<LargeStringArray>()
414 .expect("Arrow JSON field must be Utf8 or LargeUtf8");
415 JsonArray::try_from(large_string_array)?
416 };
417
418 let binary_array = json_array.into_inner();
419
420 new_columns.push(Arc::new(binary_array) as ArrayRef);
421 }
422 } else {
423 new_fields.push(field.as_ref().clone());
424 new_columns.push(column.clone());
425 }
426 }
427
428 if needs_conversion {
429 let new_schema = Arc::new(Schema::new_with_metadata(
430 new_fields,
431 schema.metadata().clone(),
432 ));
433 RecordBatch::try_new(new_schema, new_columns)
434 } else {
435 Ok(batch.clone())
437 }
438}
439
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use super::*;

    /// Builds a field of `data_type` whose extension-name metadata is set to `extension`.
    fn field_with_extension(
        name: &str,
        data_type: DataType,
        nullable: bool,
        extension: &str,
    ) -> ArrowField {
        ArrowField::new(name, data_type, nullable).with_metadata(HashMap::from([(
            ARROW_EXT_NAME_KEY.to_string(),
            extension.to_string(),
        )]))
    }

    /// Builds a field tagged with the Arrow JSON extension name.
    fn arrow_json_field(name: &str, data_type: DataType, nullable: bool) -> ArrowField {
        field_with_extension(name, data_type, nullable, ARROW_JSON_EXT_NAME)
    }

    /// Assembles a record batch from parallel lists of fields and columns.
    fn make_batch(fields: Vec<ArrowField>, columns: Vec<ArrayRef>) -> RecordBatch {
        RecordBatch::try_new(Arc::new(Schema::new(fields)), columns).unwrap()
    }

    #[test]
    fn test_json_field_creation() {
        let field = json_field("data", true);
        assert_eq!(field.name(), "data");
        assert_eq!(field.data_type(), &DataType::LargeBinary);
        assert!(field.is_nullable());
        assert!(is_json_field(&field));
    }

    #[test]
    fn test_json_array_from_strings() {
        let json_strings = vec![
            Some(r#"{"name": "Alice", "age": 30}"#),
            None,
            Some(r#"{"name": "Bob", "age": 25}"#),
        ];

        let array = JsonArray::try_from_iter(json_strings).unwrap();
        assert_eq!(array.len(), 3);
        assert!(!array.is_null(0));
        assert!(array.is_null(1));
        assert!(!array.is_null(2));

        let decoded = array.value(0).unwrap();
        assert!(decoded.contains("Alice"));
    }

    #[test]
    fn test_json_array_from_string_array() {
        let string_array = StringArray::from(vec![
            Some(r#"{"name": "Alice"}"#),
            Some(r#"{"name": "Bob"}"#),
            None,
        ]);

        let json_array = JsonArray::try_from(string_array).unwrap();
        assert_eq!(json_array.len(), 3);
        assert!(!json_array.is_null(0));
        assert!(!json_array.is_null(1));
        assert!(json_array.is_null(2));
    }

    #[test]
    fn test_json_path_extraction() {
        let json_array = JsonArray::try_from_iter(vec![
            Some(r#"{"user": {"name": "Alice", "age": 30}}"#),
            Some(r#"{"user": {"name": "Bob"}}"#),
        ])
        .unwrap();

        let name = json_array.json_path(0, "$.user.name").unwrap();
        assert_eq!(name, Some("\"Alice\"".to_string()));

        let age = json_array.json_path(1, "$.user.age").unwrap();
        assert_eq!(age, None);
    }

    #[test]
    fn test_convert_json_columns() {
        let json_arr = StringArray::from(vec![
            Some(r#"{"name": "Alice"}"#),
            Some(r#"{"name": "Bob"}"#),
        ]);
        let batch = make_batch(
            vec![arrow_json_field("data", DataType::Utf8, false)],
            vec![Arc::new(json_arr) as ArrayRef],
        );

        let converted = convert_json_columns(&batch).unwrap();
        assert_eq!(converted.num_columns(), 1);

        let converted_schema = converted.schema();
        let converted_field = converted_schema.field(0);
        assert_eq!(converted_field.data_type(), &DataType::LargeBinary);
        assert_eq!(
            converted_field.metadata().get(ARROW_EXT_NAME_KEY),
            Some(&JSON_EXT_NAME.to_string())
        );

        let converted_column = converted.column(0);
        assert_eq!(converted_column.data_type(), &DataType::LargeBinary);
        assert_eq!(converted_column.len(), 2);

        let binary_array = converted_column
            .as_any()
            .downcast_ref::<LargeBinaryArray>()
            .unwrap();
        for i in 0..binary_array.len() {
            let decoded = decode_json(binary_array.value(i));
            assert!(decoded.contains("name"));
        }
    }

    #[test]
    fn test_has_json_fields() {
        let json_f = json_field("data", true);
        assert!(has_json_fields(&json_f));

        let non_json = ArrowField::new("data", DataType::Utf8, true);
        assert!(!has_json_fields(&non_json));

        let struct_field = ArrowField::new(
            "struct",
            DataType::Struct(vec![json_field("nested_json", true)].into()),
            true,
        );
        assert!(has_json_fields(&struct_field));

        let struct_no_json = ArrowField::new(
            "struct",
            DataType::Struct(vec![ArrowField::new("text", DataType::Utf8, true)].into()),
            true,
        );
        assert!(!has_json_fields(&struct_no_json));

        let list_field = ArrowField::new(
            "list",
            DataType::List(Arc::new(json_field("item", true))),
            true,
        );
        assert!(has_json_fields(&list_field));

        let large_list_field = ArrowField::new(
            "large_list",
            DataType::LargeList(Arc::new(json_field("item", true))),
            true,
        );
        assert!(has_json_fields(&large_list_field));

        let fixed_list_field = ArrowField::new(
            "fixed_list",
            DataType::FixedSizeList(Arc::new(json_field("item", true)), 3),
            true,
        );
        assert!(has_json_fields(&fixed_list_field));

        let map_field = ArrowField::new(
            "map",
            DataType::Map(
                Arc::new(ArrowField::new(
                    "entries",
                    DataType::Struct(
                        vec![
                            ArrowField::new("key", DataType::Utf8, false),
                            json_field("value", true),
                        ]
                        .into(),
                    ),
                    false,
                )),
                false,
            ),
            true,
        );
        assert!(has_json_fields(&map_field));
    }

    #[test]
    fn test_json_array_inner() {
        let json_array = JsonArray::try_from_iter(vec![Some(r#"{"a": 1}"#)]).unwrap();
        let inner = json_array.inner();
        assert_eq!(inner.len(), 1);
    }

    #[test]
    fn test_json_array_value_null_error() {
        let json_array = JsonArray::try_from_iter(vec![None::<&str>]).unwrap();
        let result = json_array.value(0);
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("null"));
    }

    #[test]
    fn test_json_array_value_bytes() {
        let json_array = JsonArray::try_from_iter(vec![Some(r#"{"a": 1}"#)]).unwrap();
        let bytes = json_array.value_bytes(0);
        assert!(!bytes.is_empty());
    }

    #[test]
    fn test_json_path_with_null() {
        let json_array =
            JsonArray::try_from_iter(vec![Some(r#"{"user": {"name": "Alice"}}"#), None::<&str>])
                .unwrap();

        let result = json_array.json_path(1, "$.user.name").unwrap();
        assert_eq!(result, None);
    }

    #[test]
    fn test_to_arrow_json() {
        let json_array = JsonArray::try_from_iter(vec![
            Some(r#"{"name": "Alice"}"#),
            None::<&str>,
            Some(r#"{"name": "Bob"}"#),
        ])
        .unwrap();

        let arrow_json = json_array.to_arrow_json();
        assert_eq!(arrow_json.len(), 3);
        assert!(!arrow_json.is_null(0));
        assert!(arrow_json.is_null(1));
        assert!(!arrow_json.is_null(2));

        let string_array = arrow_json.as_any().downcast_ref::<StringArray>().unwrap();
        assert!(string_array.value(0).contains("Alice"));
        assert!(string_array.value(2).contains("Bob"));
    }

    #[test]
    fn test_json_array_trait_methods() {
        let json_array =
            JsonArray::try_from_iter(vec![Some(r#"{"a": 1}"#), Some(r#"{"b": 2}"#)]).unwrap();

        assert_eq!(json_array.len(), 2);
        assert!(!json_array.is_empty());
        assert!(!json_array.is_null(0));

        assert_eq!(json_array.inner().data_type(), &DataType::LargeBinary);
        assert_eq!(json_array.inner().len(), 2);
    }

    #[test]
    fn test_json_array_empty() {
        let json_array = JsonArray::try_from_iter(Vec::<Option<&str>>::new()).unwrap();
        assert!(json_array.is_empty());
        assert_eq!(json_array.len(), 0);
    }

    #[test]
    fn test_try_from_large_string_array() {
        let large_string_array = LargeStringArray::from(vec![
            Some(r#"{"name": "Alice"}"#),
            Some(r#"{"name": "Bob"}"#),
            None,
        ]);

        // Conversion from a reference.
        let json_array = JsonArray::try_from(&large_string_array).unwrap();
        assert_eq!(json_array.len(), 3);
        assert!(!json_array.is_null(0));
        assert!(!json_array.is_null(1));
        assert!(json_array.is_null(2));

        // Conversion from an owned array.
        let large_string_array2 = LargeStringArray::from(vec![Some(r#"{"x": 1}"#)]);
        let json_array2 = JsonArray::try_from(large_string_array2).unwrap();
        assert_eq!(json_array2.len(), 1);
    }

    #[test]
    fn test_try_from_array_ref() {
        let string_array: ArrayRef = Arc::new(StringArray::from(vec![
            Some(r#"{"a": 1}"#),
            Some(r#"{"b": 2}"#),
        ]));
        let json_array = JsonArray::try_from(string_array).unwrap();
        assert_eq!(json_array.len(), 2);

        let large_string_array: ArrayRef = Arc::new(LargeStringArray::from(vec![
            Some(r#"{"c": 3}"#),
            Some(r#"{"d": 4}"#),
        ]));
        let json_array2 = JsonArray::try_from(large_string_array).unwrap();
        assert_eq!(json_array2.len(), 2);

        let int_array: ArrayRef = Arc::new(arrow_array::Int32Array::from(vec![1, 2, 3]));
        let result = JsonArray::try_from(int_array);
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("Unsupported"));
    }

    #[test]
    fn test_arrow_json_to_lance_json_non_json_field() {
        let field = ArrowField::new("text", DataType::Utf8, true);
        let converted = arrow_json_to_lance_json(&field);
        assert_eq!(converted.data_type(), &DataType::Utf8);
        assert_eq!(converted.name(), "text");
    }

    #[test]
    fn test_arrow_json_to_lance_json_json_field() {
        let field = arrow_json_field("payload", DataType::LargeUtf8, false);
        let converted = arrow_json_to_lance_json(&field);

        // Name and nullability are kept; storage and extension name are swapped.
        assert_eq!(converted.name(), "payload");
        assert!(!converted.is_nullable());
        assert_eq!(converted.data_type(), &DataType::LargeBinary);
        assert!(is_json_field(&converted));
    }

    #[test]
    fn test_convert_lance_json_to_arrow() {
        let json_array = JsonArray::try_from_iter(vec![
            Some(r#"{"name": "Alice"}"#),
            None::<&str>,
            Some(r#"{"name": "Bob"}"#),
        ])
        .unwrap();

        let batch = make_batch(
            vec![json_field("data", true)],
            vec![Arc::new(json_array.into_inner()) as ArrayRef],
        );

        let converted = convert_lance_json_to_arrow(&batch).unwrap();

        let converted_schema = converted.schema();
        let converted_field = converted_schema.field(0);
        assert_eq!(converted_field.data_type(), &DataType::Utf8);
        assert_eq!(
            converted_field.metadata().get(ARROW_EXT_NAME_KEY),
            Some(&ARROW_JSON_EXT_NAME.to_string())
        );

        let string_array = converted
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert!(!string_array.is_null(0));
        assert!(string_array.is_null(1));
        assert!(!string_array.is_null(2));
        assert!(string_array.value(0).contains("Alice"));
        assert!(string_array.value(2).contains("Bob"));
    }

    #[test]
    fn test_convert_lance_json_to_arrow_empty_batch() {
        let empty_binary = LargeBinaryBuilder::new().finish();
        let batch = make_batch(
            vec![json_field("data", true)],
            vec![Arc::new(empty_binary) as ArrayRef],
        );

        let converted = convert_lance_json_to_arrow(&batch).unwrap();
        assert_eq!(converted.num_rows(), 0);
        assert_eq!(converted.schema().field(0).data_type(), &DataType::Utf8);
    }

    #[test]
    fn test_convert_lance_json_to_arrow_no_json_columns() {
        let string_array = StringArray::from(vec![Some("hello"), Some("world")]);
        let batch = make_batch(
            vec![ArrowField::new("text", DataType::Utf8, true)],
            vec![Arc::new(string_array) as ArrayRef],
        );

        let converted = convert_lance_json_to_arrow(&batch).unwrap();
        assert_eq!(converted.num_columns(), 1);
        assert_eq!(converted.schema().field(0).data_type(), &DataType::Utf8);
        // Without JSON columns the batch must come back untouched.
        assert_eq!(converted, batch);
    }

    #[test]
    fn test_convert_json_columns_empty_batch() {
        let empty_strings = arrow_array::builder::StringBuilder::new().finish();
        let batch = make_batch(
            vec![arrow_json_field("data", DataType::Utf8, false)],
            vec![Arc::new(empty_strings) as ArrayRef],
        );

        let converted = convert_json_columns(&batch).unwrap();
        assert_eq!(converted.num_rows(), 0);
        assert_eq!(
            converted.schema().field(0).data_type(),
            &DataType::LargeBinary
        );
    }

    #[test]
    fn test_convert_json_columns_large_string() {
        let json_strings = LargeStringArray::from(vec![
            Some(r#"{"name": "Alice"}"#),
            Some(r#"{"name": "Bob"}"#),
        ]);
        let batch = make_batch(
            vec![arrow_json_field("data", DataType::LargeUtf8, false)],
            vec![Arc::new(json_strings) as ArrayRef],
        );

        let converted = convert_json_columns(&batch).unwrap();
        assert_eq!(converted.num_columns(), 1);
        assert_eq!(
            converted.schema().field(0).data_type(),
            &DataType::LargeBinary
        );
        assert_eq!(converted.num_rows(), 2);
    }

    #[test]
    fn test_convert_json_columns_no_json_columns() {
        let string_array = StringArray::from(vec![Some("hello"), Some("world")]);
        let batch = make_batch(
            vec![ArrowField::new("text", DataType::Utf8, true)],
            vec![Arc::new(string_array) as ArrayRef],
        );

        let converted = convert_json_columns(&batch).unwrap();
        assert_eq!(converted.num_columns(), 1);
        assert_eq!(converted.schema().field(0).data_type(), &DataType::Utf8);
        // Without JSON columns the batch must come back untouched.
        assert_eq!(converted, batch);
    }

    #[test]
    fn test_convert_json_columns_mixed_columns() {
        let json_strings = StringArray::from(vec![
            Some(r#"{"name": "Alice"}"#),
            Some(r#"{"name": "Bob"}"#),
        ]);
        let text_strings = StringArray::from(vec![Some("hello"), Some("world")]);

        let batch = make_batch(
            vec![
                arrow_json_field("json_data", DataType::Utf8, false),
                ArrowField::new("text_data", DataType::Utf8, true),
            ],
            vec![
                Arc::new(json_strings) as ArrayRef,
                Arc::new(text_strings) as ArrayRef,
            ],
        );

        let converted = convert_json_columns(&batch).unwrap();
        assert_eq!(converted.num_columns(), 2);
        assert_eq!(
            converted.schema().field(0).data_type(),
            &DataType::LargeBinary
        );
        assert_eq!(converted.schema().field(1).data_type(), &DataType::Utf8);
    }

    #[test]
    fn test_is_arrow_json_field_large_utf8() {
        let field = arrow_json_field("data", DataType::LargeUtf8, true);
        assert!(is_arrow_json_field(&field));
    }

    #[test]
    fn test_encode_json_invalid() {
        let result = encode_json("not valid json {");
        assert!(result.is_err());
    }

    #[test]
    fn test_json_array_from_invalid_json() {
        let result = JsonArray::try_from_iter(vec![Some("invalid json {")]);
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("Failed to encode"));
    }

    #[test]
    fn test_try_from_string_array_invalid_json() {
        let string_array = StringArray::from(vec![Some("invalid json {")]);
        let result = JsonArray::try_from(string_array);
        assert!(result.is_err());
    }

    #[test]
    fn test_try_from_large_string_array_invalid_json() {
        let large_string_array = LargeStringArray::from(vec![Some("invalid json {")]);
        let result = JsonArray::try_from(large_string_array);
        assert!(result.is_err());
    }

    #[test]
    fn test_convert_lance_json_to_arrow_mixed_columns() {
        let json_array = JsonArray::try_from_iter(vec![
            Some(r#"{"name": "Alice"}"#),
            Some(r#"{"name": "Bob"}"#),
        ])
        .unwrap();
        let text_strings = StringArray::from(vec![Some("hello"), Some("world")]);

        let batch = make_batch(
            vec![
                json_field("json_data", true),
                ArrowField::new("text_data", DataType::Utf8, true),
            ],
            vec![
                Arc::new(json_array.into_inner()) as ArrayRef,
                Arc::new(text_strings) as ArrayRef,
            ],
        );

        let converted = convert_lance_json_to_arrow(&batch).unwrap();
        assert_eq!(converted.num_columns(), 2);
        assert_eq!(converted.schema().field(0).data_type(), &DataType::Utf8);
        assert_eq!(converted.schema().field(1).data_type(), &DataType::Utf8);
    }

    #[test]
    fn test_json_path_invalid_path() {
        let json_array = JsonArray::try_from_iter(vec![Some(r#"{"a": 1}"#)]).unwrap();
        let result = json_array.json_path(0, "invalid path without $");
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("Failed to extract JSONPath"));
    }

    #[test]
    fn test_convert_json_columns_invalid_storage_type() {
        // An Int32 field carrying the arrow.json extension name is not an Arrow JSON
        // field (storage must be Utf8/LargeUtf8), so the batch passes through unchanged.
        let int_array = arrow_array::Int32Array::from(vec![1, 2, 3]);
        let batch = make_batch(
            vec![arrow_json_field("data", DataType::Int32, false)],
            vec![Arc::new(int_array) as ArrayRef],
        );

        let converted = convert_json_columns(&batch).unwrap();
        assert_eq!(converted.schema().field(0).data_type(), &DataType::Int32);
        assert_eq!(converted, batch);
    }

    #[test]
    fn test_is_json_field_wrong_extension() {
        // LargeBinary without any extension metadata.
        let field = ArrowField::new("data", DataType::LargeBinary, true);
        assert!(!is_json_field(&field));

        // LargeBinary with an unrelated extension name.
        let field2 = field_with_extension("data", DataType::LargeBinary, true, "other.extension");
        assert!(!is_json_field(&field2));
    }

    #[test]
    fn test_is_arrow_json_field_wrong_extension() {
        // Utf8 without any extension metadata.
        let field = ArrowField::new("data", DataType::Utf8, true);
        assert!(!is_arrow_json_field(&field));

        // Utf8 with an unrelated extension name.
        let field2 = field_with_extension("data", DataType::Utf8, true, "other.extension");
        assert!(!is_arrow_json_field(&field2));

        // Wrong storage type.
        let field3 = ArrowField::new("data", DataType::Int32, true);
        assert!(!is_arrow_json_field(&field3));
    }

    #[test]
    fn test_convert_json_columns_invalid_json_utf8() {
        let invalid_json = StringArray::from(vec![Some("invalid json {")]);
        let batch = make_batch(
            vec![arrow_json_field("data", DataType::Utf8, false)],
            vec![Arc::new(invalid_json) as ArrayRef],
        );

        let result = convert_json_columns(&batch);
        assert!(result.is_err());
    }

    #[test]
    fn test_convert_json_columns_invalid_json_large_utf8() {
        let invalid_json = LargeStringArray::from(vec![Some("invalid json {")]);
        let batch = make_batch(
            vec![arrow_json_field("data", DataType::LargeUtf8, false)],
            vec![Arc::new(invalid_json) as ArrayRef],
        );

        let result = convert_json_columns(&batch);
        assert!(result.is_err());
    }

    #[test]
    fn test_json_path_on_corrupted_jsonb() {
        // Smoke test: the outcome on malformed bytes is decided by the jsonb crate, so
        // this only checks that the call returns instead of panicking.
        let corrupted_bytes: &[u8] = &[0xFF, 0xFE, 0x00, 0x01, 0x02];
        let corrupted_json = JsonArray {
            inner: LargeBinaryArray::from(vec![Some(corrupted_bytes)]),
        };

        let _result = corrupted_json.json_path(0, "$.a");
    }

    #[test]
    fn test_decode_json_on_various_inputs() {
        let valid_jsonb = encode_json(r#"{"key": "value"}"#).unwrap();
        let decoded = decode_json(&valid_jsonb);
        assert!(decoded.contains("key"));

        // Smoke tests: the output for empty or malformed input is decided by the jsonb
        // crate, so these only check that decoding returns instead of panicking.
        let _ = decode_json(&[]);
        let _ = decode_json(&[0xFF, 0xFE, 0x00]);
    }
}