1use std::convert::TryFrom;
7use std::sync::Arc;
8
9use arrow_array::builder::LargeBinaryBuilder;
10use arrow_array::{Array, ArrayRef, LargeBinaryArray, LargeStringArray, RecordBatch, StringArray};
11use arrow_data::ArrayData;
12use arrow_schema::{ArrowError, DataType, Field as ArrowField, Schema};
13
14use crate::ARROW_EXT_NAME_KEY;
15
/// Extension type name for Lance JSON fields: JSONB-encoded bytes stored as
/// `LargeBinary`.
pub const JSON_EXT_NAME: &str = "lance.json";

/// Extension type name for the Arrow JSON extension type: JSON text stored as
/// `Utf8`/`LargeUtf8`.
pub const ARROW_JSON_EXT_NAME: &str = "arrow.json";
21
22pub fn is_json_field(field: &ArrowField) -> bool {
24 field.data_type() == &DataType::LargeBinary
25 && field
26 .metadata()
27 .get(ARROW_EXT_NAME_KEY)
28 .map(|name| name == JSON_EXT_NAME)
29 .unwrap_or_default()
30}
31
32pub fn is_arrow_json_field(field: &ArrowField) -> bool {
34 (field.data_type() == &DataType::Utf8 || field.data_type() == &DataType::LargeUtf8)
36 && field
37 .metadata()
38 .get(ARROW_EXT_NAME_KEY)
39 .map(|name| name == ARROW_JSON_EXT_NAME)
40 .unwrap_or_default()
41}
42
43pub fn has_json_fields(field: &ArrowField) -> bool {
45 if is_json_field(field) {
46 return true;
47 }
48
49 match field.data_type() {
50 DataType::Struct(fields) => fields.iter().any(|f| has_json_fields(f)),
51 DataType::List(f) | DataType::LargeList(f) | DataType::FixedSizeList(f, _) => {
52 has_json_fields(f)
53 }
54 DataType::Map(f, _) => has_json_fields(f),
55 _ => false,
56 }
57}
58
59pub fn json_field(name: &str, nullable: bool) -> ArrowField {
61 let mut field = ArrowField::new(name, DataType::LargeBinary, nullable);
62 let mut metadata = std::collections::HashMap::new();
63 metadata.insert(ARROW_EXT_NAME_KEY.to_string(), JSON_EXT_NAME.to_string());
64 field.set_metadata(metadata);
65 field
66}
67
/// An Arrow-compatible array of JSON values.
///
/// Each non-null entry is a JSONB-encoded document held in a
/// `LargeBinaryArray`, the physical storage for the `lance.json`
/// extension type.
#[derive(Debug, Clone)]
pub struct JsonArray {
    // JSONB-encoded payload; one binary value per JSON document.
    inner: LargeBinaryArray,
}
73
74impl JsonArray {
75 pub fn try_from_iter<I, S>(iter: I) -> Result<Self, ArrowError>
77 where
78 I: IntoIterator<Item = Option<S>>,
79 S: AsRef<str>,
80 {
81 let mut builder = LargeBinaryBuilder::new();
82
83 for json_str in iter {
84 match json_str {
85 Some(s) => {
86 let encoded = encode_json(s.as_ref()).map_err(|e| {
87 ArrowError::InvalidArgumentError(format!("Failed to encode JSON: {}", e))
88 })?;
89 builder.append_value(&encoded);
90 }
91 None => builder.append_null(),
92 }
93 }
94
95 Ok(Self {
96 inner: builder.finish(),
97 })
98 }
99
100 pub fn into_inner(self) -> LargeBinaryArray {
102 self.inner
103 }
104
105 pub fn inner(&self) -> &LargeBinaryArray {
107 &self.inner
108 }
109
110 pub fn value(&self, i: usize) -> Result<String, ArrowError> {
112 if self.inner.is_null(i) {
113 return Err(ArrowError::InvalidArgumentError(
114 "Value is null".to_string(),
115 ));
116 }
117
118 let jsonb_bytes = self.inner.value(i);
119 decode_json(jsonb_bytes)
120 .map_err(|e| ArrowError::InvalidArgumentError(format!("Failed to decode JSON: {}", e)))
121 }
122
123 pub fn value_bytes(&self, i: usize) -> &[u8] {
125 self.inner.value(i)
126 }
127
128 pub fn json_path(&self, i: usize, path: &str) -> Result<Option<String>, ArrowError> {
130 if self.inner.is_null(i) {
131 return Ok(None);
132 }
133
134 let jsonb_bytes = self.inner.value(i);
135 get_json_path(jsonb_bytes, path).map_err(|e| {
136 ArrowError::InvalidArgumentError(format!("Failed to extract JSONPath: {}", e))
137 })
138 }
139
140 pub fn to_arrow_json(&self) -> Result<ArrayRef, ArrowError> {
142 let mut builder = arrow_array::builder::StringBuilder::new();
143
144 for i in 0..self.len() {
145 if self.is_null(i) {
146 builder.append_null();
147 } else {
148 let jsonb_bytes = self.inner.value(i);
149 let json_str = decode_json(jsonb_bytes).map_err(|e| {
150 ArrowError::InvalidArgumentError(format!("Failed to decode JSON: {}", e))
151 })?;
152 builder.append_value(&json_str);
153 }
154 }
155
156 Ok(Arc::new(builder.finish()))
158 }
159}
160
// `Array` implementation that delegates everything to the underlying
// `LargeBinaryArray`, so a `JsonArray` can be used wherever a generic Arrow
// array is expected.
impl Array for JsonArray {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn to_data(&self) -> ArrayData {
        self.inner.to_data()
    }

    fn into_data(self) -> ArrayData {
        self.inner.into_data()
    }

    // Reports the physical storage type; the logical "JSON" semantics live in
    // the field metadata (extension name), not in the Arrow `DataType`.
    fn data_type(&self) -> &DataType {
        &DataType::LargeBinary
    }

    // Slicing preserves the `JsonArray` wrapper around the sliced storage.
    fn slice(&self, offset: usize, length: usize) -> ArrayRef {
        Arc::new(Self {
            inner: self.inner.slice(offset, length),
        })
    }

    fn len(&self) -> usize {
        self.inner.len()
    }

    fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }

    fn offset(&self) -> usize {
        self.inner.offset()
    }

    fn nulls(&self) -> Option<&arrow_buffer::NullBuffer> {
        self.inner.nulls()
    }

    fn get_buffer_memory_size(&self) -> usize {
        self.inner.get_buffer_memory_size()
    }

    fn get_array_memory_size(&self) -> usize {
        self.inner.get_array_memory_size()
    }
}
208
209impl TryFrom<StringArray> for JsonArray {
211 type Error = ArrowError;
212
213 fn try_from(array: StringArray) -> Result<Self, Self::Error> {
214 Self::try_from(&array)
215 }
216}
217
218impl TryFrom<&StringArray> for JsonArray {
219 type Error = ArrowError;
220
221 fn try_from(array: &StringArray) -> Result<Self, Self::Error> {
222 let mut builder = LargeBinaryBuilder::with_capacity(array.len(), array.value_data().len());
223
224 for i in 0..array.len() {
225 if array.is_null(i) {
226 builder.append_null();
227 } else {
228 let json_str = array.value(i);
229 let encoded = encode_json(json_str).map_err(|e| {
230 ArrowError::InvalidArgumentError(format!("Failed to encode JSON: {}", e))
231 })?;
232 builder.append_value(&encoded);
233 }
234 }
235
236 Ok(Self {
237 inner: builder.finish(),
238 })
239 }
240}
241
242impl TryFrom<LargeStringArray> for JsonArray {
243 type Error = ArrowError;
244
245 fn try_from(array: LargeStringArray) -> Result<Self, Self::Error> {
246 Self::try_from(&array)
247 }
248}
249
250impl TryFrom<&LargeStringArray> for JsonArray {
251 type Error = ArrowError;
252
253 fn try_from(array: &LargeStringArray) -> Result<Self, Self::Error> {
254 let mut builder = LargeBinaryBuilder::with_capacity(array.len(), array.value_data().len());
255
256 for i in 0..array.len() {
257 if array.is_null(i) {
258 builder.append_null();
259 } else {
260 let json_str = array.value(i);
261 let encoded = encode_json(json_str).map_err(|e| {
262 ArrowError::InvalidArgumentError(format!("Failed to encode JSON: {}", e))
263 })?;
264 builder.append_value(&encoded);
265 }
266 }
267
268 Ok(Self {
269 inner: builder.finish(),
270 })
271 }
272}
273
274impl TryFrom<ArrayRef> for JsonArray {
275 type Error = ArrowError;
276
277 fn try_from(array_ref: ArrayRef) -> Result<Self, Self::Error> {
278 match array_ref.data_type() {
279 DataType::Utf8 => {
280 let string_array = array_ref
281 .as_any()
282 .downcast_ref::<StringArray>()
283 .ok_or_else(|| {
284 ArrowError::InvalidArgumentError("Failed to downcast to StringArray".into())
285 })?;
286 Self::try_from(string_array)
287 }
288 DataType::LargeUtf8 => {
289 let large_string_array = array_ref
290 .as_any()
291 .downcast_ref::<LargeStringArray>()
292 .ok_or_else(|| {
293 ArrowError::InvalidArgumentError(
294 "Failed to downcast to LargeStringArray".into(),
295 )
296 })?;
297 Self::try_from(large_string_array)
298 }
299 dt => Err(ArrowError::InvalidArgumentError(format!(
300 "Unsupported array type for JSON: {:?}. Expected Utf8 or LargeUtf8",
301 dt
302 ))),
303 }
304 }
305}
306
307pub fn encode_json(json_str: &str) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
309 let value = jsonb::parse_value(json_str.as_bytes())?;
310 Ok(value.to_vec())
311}
312
313pub fn decode_json(jsonb_bytes: &[u8]) -> Result<String, Box<dyn std::error::Error>> {
315 let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
316 Ok(raw_jsonb.to_string())
317}
318
319fn get_json_path(
321 jsonb_bytes: &[u8],
322 path: &str,
323) -> Result<Option<String>, Box<dyn std::error::Error>> {
324 let json_path = jsonb::jsonpath::parse_json_path(path.as_bytes())?;
325 let raw_jsonb = jsonb::RawJsonb::new(jsonb_bytes);
326 let mut selector = jsonb::jsonpath::Selector::new(raw_jsonb);
327
328 match selector.select_values(&json_path) {
329 Ok(values) => {
330 if values.is_empty() {
331 Ok(None)
332 } else {
333 Ok(Some(values[0].to_string()))
334 }
335 }
336 Err(e) => Err(Box::new(e)),
337 }
338}
339
340pub fn arrow_json_to_lance_json(field: &ArrowField) -> ArrowField {
342 if is_arrow_json_field(field) {
343 let mut new_field =
346 ArrowField::new(field.name(), DataType::LargeBinary, field.is_nullable());
347
348 let mut metadata = field.metadata().clone();
350 metadata.insert(ARROW_EXT_NAME_KEY.to_string(), JSON_EXT_NAME.to_string());
352
353 new_field = new_field.with_metadata(metadata);
354 new_field
355 } else {
356 field.clone()
357 }
358}
359
360pub fn convert_lance_json_to_arrow(
362 batch: &arrow_array::RecordBatch,
363) -> Result<arrow_array::RecordBatch, ArrowError> {
364 let schema = batch.schema();
365 let mut needs_conversion = false;
366 let mut new_fields = Vec::with_capacity(schema.fields().len());
367 let mut new_columns = Vec::with_capacity(batch.num_columns());
368
369 for (i, field) in schema.fields().iter().enumerate() {
370 let column = batch.column(i);
371
372 if is_json_field(field) {
373 needs_conversion = true;
374
375 let mut new_field = ArrowField::new(field.name(), DataType::Utf8, field.is_nullable());
377 let mut metadata = field.metadata().clone();
378 metadata.insert(
380 ARROW_EXT_NAME_KEY.to_string(),
381 ARROW_JSON_EXT_NAME.to_string(),
382 );
383 new_field.set_metadata(metadata);
384 new_fields.push(new_field);
385
386 if batch.num_rows() == 0 {
388 let empty_strings = arrow_array::builder::StringBuilder::new().finish();
390 new_columns.push(Arc::new(empty_strings) as ArrayRef);
391 } else {
392 let binary_array = column
394 .as_any()
395 .downcast_ref::<LargeBinaryArray>()
396 .ok_or_else(|| {
397 ArrowError::InvalidArgumentError(format!(
398 "Lance JSON field '{}' has unexpected type",
399 field.name()
400 ))
401 })?;
402
403 let mut builder = arrow_array::builder::StringBuilder::new();
404 for i in 0..binary_array.len() {
405 if binary_array.is_null(i) {
406 builder.append_null();
407 } else {
408 let jsonb_bytes = binary_array.value(i);
409 let json_str = decode_json(jsonb_bytes).map_err(|e| {
410 ArrowError::InvalidArgumentError(format!(
411 "Failed to decode JSON: {}",
412 e
413 ))
414 })?;
415 builder.append_value(&json_str);
416 }
417 }
418 new_columns.push(Arc::new(builder.finish()) as ArrayRef);
419 }
420 } else {
421 new_fields.push(field.as_ref().clone());
422 new_columns.push(column.clone());
423 }
424 }
425
426 if needs_conversion {
427 let new_schema = Arc::new(Schema::new_with_metadata(
428 new_fields,
429 schema.metadata().clone(),
430 ));
431 RecordBatch::try_new(new_schema, new_columns)
432 } else {
433 Ok(batch.clone())
435 }
436}
437
438pub fn convert_json_columns(
440 batch: &arrow_array::RecordBatch,
441) -> Result<arrow_array::RecordBatch, ArrowError> {
442 let schema = batch.schema();
443 let mut needs_conversion = false;
444 let mut new_fields = Vec::with_capacity(schema.fields().len());
445 let mut new_columns = Vec::with_capacity(batch.num_columns());
446
447 for (i, field) in schema.fields().iter().enumerate() {
448 let column = batch.column(i);
449
450 if is_arrow_json_field(field) {
451 needs_conversion = true;
452
453 new_fields.push(arrow_json_to_lance_json(field));
455
456 if batch.num_rows() == 0 {
458 let empty_binary = LargeBinaryBuilder::new().finish();
460 new_columns.push(Arc::new(empty_binary) as ArrayRef);
461 } else {
462 let json_array =
464 if let Some(string_array) = column.as_any().downcast_ref::<StringArray>() {
465 JsonArray::try_from(string_array)?
466 } else if let Some(large_string_array) =
467 column.as_any().downcast_ref::<LargeStringArray>()
468 {
469 JsonArray::try_from(large_string_array)?
470 } else {
471 return Err(ArrowError::InvalidArgumentError(format!(
472 "Arrow JSON field '{}' has unexpected storage type: {:?}",
473 field.name(),
474 column.data_type()
475 )));
476 };
477
478 let binary_array = json_array.into_inner();
479
480 new_columns.push(Arc::new(binary_array) as ArrayRef);
481 }
482 } else {
483 new_fields.push(field.as_ref().clone());
484 new_columns.push(column.clone());
485 }
486 }
487
488 if needs_conversion {
489 let new_schema = Arc::new(Schema::new_with_metadata(
490 new_fields,
491 schema.metadata().clone(),
492 ));
493 RecordBatch::try_new(new_schema, new_columns)
494 } else {
495 Ok(batch.clone())
497 }
498}
499
#[cfg(test)]
mod tests {
    use super::*;

    // `json_field` should produce a LargeBinary field tagged with the Lance
    // JSON extension name, recognizable by `is_json_field`.
    #[test]
    fn test_json_field_creation() {
        let field = json_field("data", true);
        assert_eq!(field.name(), "data");
        assert_eq!(field.data_type(), &DataType::LargeBinary);
        assert!(field.is_nullable());
        assert!(is_json_field(&field));
    }

    // Nulls in the input iterator must round-trip as null entries; non-null
    // entries must decode back to text containing the original payload.
    #[test]
    fn test_json_array_from_strings() {
        let json_strings = vec![
            Some(r#"{"name": "Alice", "age": 30}"#),
            None,
            Some(r#"{"name": "Bob", "age": 25}"#),
        ];

        let array = JsonArray::try_from_iter(json_strings).unwrap();
        assert_eq!(array.len(), 3);
        assert!(!array.is_null(0));
        assert!(array.is_null(1));
        assert!(!array.is_null(2));

        // Decoded text is re-serialized from JSONB, so only check that the
        // payload survived rather than asserting exact formatting.
        let decoded = array.value(0).unwrap();
        assert!(decoded.contains("Alice"));
    }

    // Conversion from an Arrow StringArray preserves length and null slots.
    #[test]
    fn test_json_array_from_string_array() {
        let string_array = StringArray::from(vec![
            Some(r#"{"name": "Alice"}"#),
            Some(r#"{"name": "Bob"}"#),
            None,
        ]);

        let json_array = JsonArray::try_from(string_array).unwrap();
        assert_eq!(json_array.len(), 3);
        assert!(!json_array.is_null(0));
        assert!(!json_array.is_null(1));
        assert!(json_array.is_null(2));
    }

    // JSONPath selection: a present path yields the first match rendered as
    // JSON (string values keep their quotes); a missing path yields None.
    #[test]
    fn test_json_path_extraction() {
        let json_array = JsonArray::try_from_iter(vec![
            Some(r#"{"user": {"name": "Alice", "age": 30}}"#),
            Some(r#"{"user": {"name": "Bob"}}"#),
        ])
        .unwrap();

        let name = json_array.json_path(0, "$.user.name").unwrap();
        assert_eq!(name, Some("\"Alice\"".to_string()));

        let age = json_array.json_path(1, "$.user.age").unwrap();
        assert_eq!(age, None);
    }

    // End-to-end conversion of an `arrow.json` Utf8 column into Lance JSONB
    // LargeBinary storage, including the field-metadata rewrite and a decode
    // round-trip of every row.
    #[test]
    fn test_convert_json_columns() {
        let json_strings = vec![Some(r#"{"name": "Alice"}"#), Some(r#"{"name": "Bob"}"#)];
        let json_arr = StringArray::from(json_strings);

        let mut field = ArrowField::new("data", DataType::Utf8, false);
        let mut metadata = std::collections::HashMap::new();
        metadata.insert(
            ARROW_EXT_NAME_KEY.to_string(),
            ARROW_JSON_EXT_NAME.to_string(),
        );
        field.set_metadata(metadata);

        let schema = Arc::new(Schema::new(vec![field]));
        let batch = RecordBatch::try_new(schema, vec![Arc::new(json_arr) as ArrayRef]).unwrap();

        let converted = convert_json_columns(&batch).unwrap();

        // Field should now be LargeBinary tagged with the Lance extension name.
        assert_eq!(converted.num_columns(), 1);
        let converted_schema = converted.schema();
        let converted_field = converted_schema.field(0);
        assert_eq!(converted_field.data_type(), &DataType::LargeBinary);
        assert_eq!(
            converted_field.metadata().get(ARROW_EXT_NAME_KEY),
            Some(&JSON_EXT_NAME.to_string())
        );

        // Column data should be binary with the same row count.
        let converted_column = converted.column(0);
        assert_eq!(converted_column.data_type(), &DataType::LargeBinary);
        assert_eq!(converted_column.len(), 2);

        // Every stored JSONB value must decode back to JSON text.
        let binary_array = converted_column
            .as_any()
            .downcast_ref::<LargeBinaryArray>()
            .unwrap();
        for i in 0..binary_array.len() {
            let jsonb_bytes = binary_array.value(i);
            let decoded = decode_json(jsonb_bytes).unwrap();
            assert!(decoded.contains("name"));
        }
    }
}