use std::convert::TryFrom;
use std::sync::Arc;
use arrow_array::builder::LargeBinaryBuilder;
use arrow_array::{Array, ArrayRef, LargeBinaryArray, LargeStringArray, RecordBatch, StringArray};
use arrow_schema::{ArrowError, DataType, Field as ArrowField, Schema};
use crate::ARROW_EXT_NAME_KEY;
/// Extension name that marks a `LargeBinary` field as Lance JSON
/// (this is the value `is_json_field` looks for under `ARROW_EXT_NAME_KEY`).
pub const JSON_EXT_NAME: &str = "lance.json";
/// Extension name that marks a `Utf8`/`LargeUtf8` field as Arrow JSON
/// (this is the value `is_arrow_json_field` looks for under `ARROW_EXT_NAME_KEY`).
pub const ARROW_JSON_EXT_NAME: &str = "arrow.json";
/// Returns `true` when `field` is a Lance JSON field.
///
/// A field qualifies only if its data type is `LargeBinary` *and* its
/// metadata maps `ARROW_EXT_NAME_KEY` to [`JSON_EXT_NAME`].
pub fn is_json_field(field: &ArrowField) -> bool {
    if field.data_type() != &DataType::LargeBinary {
        return false;
    }
    match field.metadata().get(ARROW_EXT_NAME_KEY) {
        Some(ext_name) => ext_name == JSON_EXT_NAME,
        // No extension name recorded at all.
        None => false,
    }
}
/// Returns `true` when `field` is an Arrow JSON field.
///
/// A field qualifies only if its data type is `Utf8` or `LargeUtf8` *and* its
/// metadata maps `ARROW_EXT_NAME_KEY` to [`ARROW_JSON_EXT_NAME`].
pub fn is_arrow_json_field(field: &ArrowField) -> bool {
    if !matches!(field.data_type(), DataType::Utf8 | DataType::LargeUtf8) {
        return false;
    }
    match field.metadata().get(ARROW_EXT_NAME_KEY) {
        Some(ext_name) => ext_name == ARROW_JSON_EXT_NAME,
        // No extension name recorded at all.
        None => false,
    }
}
/// Returns `true` if `field` itself, or any field nested inside it, is a
/// Lance JSON field (see [`is_json_field`]).
///
/// Nested fields are searched through `Struct`, `List`, `LargeList`,
/// `FixedSizeList` and `Map` data types; every other data type is treated as
/// having no children.
pub fn has_json_fields(field: &ArrowField) -> bool {
    is_json_field(field)
        || match field.data_type() {
            DataType::Struct(children) => children.iter().any(|child| has_json_fields(child)),
            DataType::List(item) | DataType::LargeList(item) | DataType::FixedSizeList(item, _) => {
                has_json_fields(item)
            }
            DataType::Map(entries, _) => has_json_fields(entries),
            _ => false,
        }
}
/// Creates a Lance JSON field named `name` with the given nullability.
///
/// The field uses `LargeBinary` storage and its metadata contains exactly one
/// entry: `ARROW_EXT_NAME_KEY` mapped to [`JSON_EXT_NAME`].
pub fn json_field(name: &str, nullable: bool) -> ArrowField {
    let metadata = std::collections::HashMap::from([(
        ARROW_EXT_NAME_KEY.to_string(),
        JSON_EXT_NAME.to_string(),
    )]);
    ArrowField::new(name, DataType::LargeBinary, nullable).with_metadata(metadata)
}
/// Array of JSON values backed by a `LargeBinaryArray`.
///
/// The constructors in this file fill the backing array with the bytes
/// returned by `encode_json`; null entries stay null.
#[derive(Debug, Clone)]
pub struct JsonArray {
/// Backing binary storage, one (possibly null) encoded value per entry.
inner: LargeBinaryArray,
}
impl JsonArray {
    /// Builds a `JsonArray` from an iterator of optional JSON strings.
    ///
    /// `Some` items are encoded with [`encode_json`]; `None` items become nulls.
    ///
    /// # Errors
    ///
    /// Returns `ArrowError::InvalidArgumentError` as soon as one string fails
    /// to encode.
    pub fn try_from_iter<I, S>(iter: I) -> Result<Self, ArrowError>
    where
        I: IntoIterator<Item = Option<S>>,
        S: AsRef<str>,
    {
        let mut binary = LargeBinaryBuilder::new();
        for item in iter {
            if let Some(text) = item {
                let jsonb = encode_json(text.as_ref()).map_err(|e| {
                    ArrowError::InvalidArgumentError(format!("Failed to encode JSON: {}", e))
                })?;
                binary.append_value(&jsonb);
            } else {
                binary.append_null();
            }
        }
        let inner = binary.finish();
        Ok(Self { inner })
    }

    /// Consumes the wrapper and returns the backing `LargeBinaryArray`.
    pub fn into_inner(self) -> LargeBinaryArray {
        self.inner
    }

    /// Borrows the backing `LargeBinaryArray`.
    pub fn inner(&self) -> &LargeBinaryArray {
        &self.inner
    }

    /// Decodes the entry at index `i` with [`decode_json`].
    ///
    /// # Errors
    ///
    /// Returns `ArrowError::InvalidArgumentError` when the entry is null.
    pub fn value(&self, i: usize) -> Result<String, ArrowError> {
        if self.inner.is_null(i) {
            Err(ArrowError::InvalidArgumentError(
                "Value is null".to_string(),
            ))
        } else {
            Ok(decode_json(self.inner.value(i)))
        }
    }

    /// Returns the stored bytes at index `i` without decoding them.
    ///
    /// No null check is performed here; the backing array's `value(i)` is
    /// returned as-is.
    pub fn value_bytes(&self, i: usize) -> &[u8] {
        self.inner.value(i)
    }

    /// Evaluates the JSONPath expression `path` against the entry at index `i`.
    ///
    /// Yields `Ok(None)` when the entry is null or nothing matches; otherwise
    /// the first match rendered as a string.
    ///
    /// # Errors
    ///
    /// Returns `ArrowError::InvalidArgumentError` when path parsing or
    /// selection fails.
    pub fn json_path(&self, i: usize, path: &str) -> Result<Option<String>, ArrowError> {
        if self.inner.is_null(i) {
            Ok(None)
        } else {
            get_json_path(self.inner.value(i), path).map_err(|e| {
                ArrowError::InvalidArgumentError(format!("Failed to extract JSONPath: {}", e))
            })
        }
    }

    /// Decodes every entry into a new `StringArray` (returned as `ArrayRef`),
    /// keeping nulls in place.
    pub fn to_arrow_json(&self) -> ArrayRef {
        let mut strings = arrow_array::builder::StringBuilder::new();
        for entry in self.inner.iter() {
            match entry {
                Some(jsonb_bytes) => strings.append_value(decode_json(jsonb_bytes)),
                None => strings.append_null(),
            }
        }
        Arc::new(strings.finish())
    }

    /// Number of entries, nulls included.
    pub fn len(&self) -> usize {
        self.inner.len()
    }

    /// Returns `true` when the array holds no entries.
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }

    /// Returns `true` when the entry at index `i` is null.
    pub fn is_null(&self, i: usize) -> bool {
        self.inner.is_null(i)
    }
}
/// Owned-array convenience conversion; forwards to the `&StringArray` impl.
impl TryFrom<StringArray> for JsonArray {
    type Error = ArrowError;

    fn try_from(array: StringArray) -> Result<Self, Self::Error> {
        <Self as TryFrom<&StringArray>>::try_from(&array)
    }
}
/// Encodes every non-null string of a `StringArray` with [`encode_json`];
/// null entries stay null.
impl TryFrom<&StringArray> for JsonArray {
    type Error = ArrowError;

    /// # Errors
    ///
    /// Returns `ArrowError::InvalidArgumentError` on the first string that
    /// fails to encode.
    fn try_from(array: &StringArray) -> Result<Self, Self::Error> {
        // Pre-size the builder from the source: one slot per entry and as
        // many value bytes as the source text occupies.
        let mut binary =
            LargeBinaryBuilder::with_capacity(array.len(), array.value_data().len());
        for entry in array.iter() {
            match entry {
                None => binary.append_null(),
                Some(text) => {
                    let jsonb = encode_json(text).map_err(|e| {
                        ArrowError::InvalidArgumentError(format!("Failed to encode JSON: {}", e))
                    })?;
                    binary.append_value(&jsonb);
                }
            }
        }
        let inner = binary.finish();
        Ok(Self { inner })
    }
}
/// Owned-array convenience conversion; forwards to the `&LargeStringArray` impl.
impl TryFrom<LargeStringArray> for JsonArray {
    type Error = ArrowError;

    fn try_from(array: LargeStringArray) -> Result<Self, Self::Error> {
        <Self as TryFrom<&LargeStringArray>>::try_from(&array)
    }
}
/// Encodes every non-null string of a `LargeStringArray` with
/// [`encode_json`]; null entries stay null.
impl TryFrom<&LargeStringArray> for JsonArray {
    type Error = ArrowError;

    /// # Errors
    ///
    /// Returns `ArrowError::InvalidArgumentError` on the first string that
    /// fails to encode.
    fn try_from(array: &LargeStringArray) -> Result<Self, Self::Error> {
        // Pre-size the builder from the source: one slot per entry and as
        // many value bytes as the source text occupies.
        let mut binary =
            LargeBinaryBuilder::with_capacity(array.len(), array.value_data().len());
        for entry in array.iter() {
            match entry {
                None => binary.append_null(),
                Some(text) => {
                    let jsonb = encode_json(text).map_err(|e| {
                        ArrowError::InvalidArgumentError(format!("Failed to encode JSON: {}", e))
                    })?;
                    binary.append_value(&jsonb);
                }
            }
        }
        let inner = binary.finish();
        Ok(Self { inner })
    }
}
/// Converts a dynamically typed array by dispatching on its data type:
/// `Utf8` and `LargeUtf8` are forwarded to the typed conversions, anything
/// else is rejected.
impl TryFrom<ArrayRef> for JsonArray {
    type Error = ArrowError;

    /// # Errors
    ///
    /// Returns `ArrowError::InvalidArgumentError` for any data type other
    /// than `Utf8`/`LargeUtf8`, or when a string fails to encode.
    ///
    /// # Panics
    ///
    /// Panics if the array reports `Utf8`/`LargeUtf8` but cannot be downcast
    /// to the matching string array type.
    fn try_from(array_ref: ArrayRef) -> Result<Self, Self::Error> {
        let any = array_ref.as_any();
        match array_ref.data_type() {
            DataType::Utf8 => Self::try_from(
                any.downcast_ref::<StringArray>()
                    .expect("DataType::Utf8 array must be StringArray"),
            ),
            DataType::LargeUtf8 => Self::try_from(
                any.downcast_ref::<LargeStringArray>()
                    .expect("DataType::LargeUtf8 array must be LargeStringArray"),
            ),
            other => Err(ArrowError::InvalidArgumentError(format!(
                "Unsupported array type for JSON: {:?}. Expected Utf8 or LargeUtf8",
                other
            ))),
        }
    }
}
/// Parses `json_str` with `jsonb::parse_value` and returns the parsed value's
/// `to_vec()` bytes.
///
/// # Errors
///
/// Returns the parser's error (boxed) when `json_str` cannot be parsed.
pub fn encode_json(json_str: &str) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
    let parsed = jsonb::parse_value(json_str.as_bytes())?;
    let encoded = parsed.to_vec();
    Ok(encoded)
}
/// Renders encoded bytes as a string by wrapping them in `jsonb::RawJsonb`
/// and calling its `to_string()`.
pub fn decode_json(jsonb_bytes: &[u8]) -> String {
    jsonb::RawJsonb::new(jsonb_bytes).to_string()
}
/// Evaluates the JSONPath expression `path` against encoded bytes.
///
/// Returns `Ok(None)` when the selection is empty, otherwise the first
/// selected value rendered with `to_string()`.
///
/// # Errors
///
/// Propagates (boxed) any error from parsing `path` or from selecting values.
fn get_json_path(
    jsonb_bytes: &[u8],
    path: &str,
) -> Result<Option<String>, Box<dyn std::error::Error>> {
    let parsed_path = jsonb::jsonpath::parse_json_path(path.as_bytes())?;
    let mut selector = jsonb::jsonpath::Selector::new(jsonb::RawJsonb::new(jsonb_bytes));
    let selected = selector.select_values(&parsed_path)?;
    // Only the first match is reported; later matches are ignored.
    Ok((!selected.is_empty()).then(|| selected[0].to_string()))
}
/// Maps an Arrow JSON field to its Lance JSON counterpart.
///
/// For a field accepted by [`is_arrow_json_field`], the result keeps the name,
/// nullability and existing metadata, switches the data type to `LargeBinary`
/// and sets `ARROW_EXT_NAME_KEY` to [`JSON_EXT_NAME`]. Any other field is
/// returned as a plain clone.
pub fn arrow_json_to_lance_json(field: &ArrowField) -> ArrowField {
    if !is_arrow_json_field(field) {
        return field.clone();
    }
    let mut metadata = field.metadata().clone();
    // Overwrites the Arrow JSON extension name; other entries are kept.
    metadata.insert(ARROW_EXT_NAME_KEY.to_string(), JSON_EXT_NAME.to_string());
    ArrowField::new(field.name(), DataType::LargeBinary, field.is_nullable())
        .with_metadata(metadata)
}
/// Rewrites every top-level Lance JSON column of `batch` as an Arrow JSON
/// column.
///
/// Each field accepted by [`is_json_field`] becomes a `Utf8` field with the
/// same name, nullability and metadata, except that `ARROW_EXT_NAME_KEY` is
/// set to [`ARROW_JSON_EXT_NAME`]; its values are decoded with
/// [`decode_json`]. Other columns are passed through unchanged. When no
/// column needs conversion a clone of `batch` is returned.
///
/// # Errors
///
/// Returns the error from `RecordBatch::try_new` if the rebuilt batch is
/// rejected.
///
/// # Panics
///
/// Panics if a column whose field is Lance JSON cannot be downcast to
/// `LargeBinaryArray`.
pub fn convert_lance_json_to_arrow(
    batch: &arrow_array::RecordBatch,
) -> Result<arrow_array::RecordBatch, ArrowError> {
    let schema = batch.schema();
    let mut out_fields = Vec::with_capacity(schema.fields().len());
    let mut out_columns: Vec<ArrayRef> = Vec::with_capacity(batch.num_columns());
    let mut converted_any = false;

    for (field, column) in schema.fields().iter().zip(batch.columns()) {
        if !is_json_field(field) {
            // Not a Lance JSON column: pass field and data through untouched.
            out_fields.push(field.as_ref().clone());
            out_columns.push(column.clone());
            continue;
        }
        converted_any = true;

        let mut metadata = field.metadata().clone();
        metadata.insert(
            ARROW_EXT_NAME_KEY.to_string(),
            ARROW_JSON_EXT_NAME.to_string(),
        );
        out_fields.push(
            ArrowField::new(field.name(), DataType::Utf8, field.is_nullable())
                .with_metadata(metadata),
        );

        let mut strings = arrow_array::builder::StringBuilder::new();
        // An empty batch yields an empty string column without touching the
        // source column at all.
        if batch.num_rows() != 0 {
            let jsonb_column = column
                .as_any()
                .downcast_ref::<LargeBinaryArray>()
                .expect("Lance JSON field must be LargeBinaryArray");
            for entry in jsonb_column.iter() {
                match entry {
                    Some(jsonb_bytes) => strings.append_value(decode_json(jsonb_bytes)),
                    None => strings.append_null(),
                }
            }
        }
        out_columns.push(Arc::new(strings.finish()) as ArrayRef);
    }

    if !converted_any {
        return Ok(batch.clone());
    }
    let out_schema = Schema::new_with_metadata(out_fields, schema.metadata().clone());
    RecordBatch::try_new(Arc::new(out_schema), out_columns)
}
/// Rewrites every top-level Arrow JSON column of `batch` as a Lance JSON
/// column.
///
/// Each field accepted by [`is_arrow_json_field`] is mapped through
/// [`arrow_json_to_lance_json`] and its string values are encoded into a
/// `LargeBinaryArray` via `JsonArray::try_from(ArrayRef)`. Other columns are
/// passed through unchanged. When no column needs conversion a clone of
/// `batch` is returned.
///
/// The column conversion is delegated to the `TryFrom<ArrayRef>` impl so the
/// `Utf8`/`LargeUtf8` dispatch lives in one place and an unexpected column
/// type is reported as an `ArrowError` instead of a panic raised here. An
/// empty batch needs no special case: converting a zero-length string column
/// already yields a zero-length `LargeBinaryArray`.
///
/// # Errors
///
/// Returns `ArrowError::InvalidArgumentError` when a value is not valid JSON
/// or a JSON-tagged column has an unsupported array type, and the error from
/// `RecordBatch::try_new` if the rebuilt batch is rejected.
pub fn convert_json_columns(
    batch: &arrow_array::RecordBatch,
) -> Result<arrow_array::RecordBatch, ArrowError> {
    let schema = batch.schema();
    let mut needs_conversion = false;
    let mut new_fields = Vec::with_capacity(schema.fields().len());
    let mut new_columns: Vec<ArrayRef> = Vec::with_capacity(batch.num_columns());

    for (field, column) in schema.fields().iter().zip(batch.columns()) {
        if is_arrow_json_field(field) {
            needs_conversion = true;
            new_fields.push(arrow_json_to_lance_json(field));
            // Cloning the `Arc` is a cheap refcount bump; the data is not copied.
            let json_array = JsonArray::try_from(Arc::clone(column))?;
            new_columns.push(Arc::new(json_array.into_inner()) as ArrayRef);
        } else {
            new_fields.push(field.as_ref().clone());
            new_columns.push(Arc::clone(column));
        }
    }

    if !needs_conversion {
        return Ok(batch.clone());
    }
    let new_schema = Arc::new(Schema::new_with_metadata(
        new_fields,
        schema.metadata().clone(),
    ));
    RecordBatch::try_new(new_schema, new_columns)
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Builds a field whose metadata maps `ARROW_EXT_NAME_KEY` to `ext_name`.
    fn field_with_extension(
        name: &str,
        data_type: DataType,
        nullable: bool,
        ext_name: &str,
    ) -> ArrowField {
        ArrowField::new(name, data_type, nullable).with_metadata(HashMap::from([(
            ARROW_EXT_NAME_KEY.to_string(),
            ext_name.to_string(),
        )]))
    }

    /// Builds a field tagged with the Arrow JSON extension name.
    fn arrow_json_field(name: &str, data_type: DataType, nullable: bool) -> ArrowField {
        field_with_extension(name, data_type, nullable, ARROW_JSON_EXT_NAME)
    }

    /// Wraps one field and one column into a single-column record batch.
    fn single_column_batch(field: ArrowField, column: ArrayRef) -> RecordBatch {
        RecordBatch::try_new(Arc::new(Schema::new(vec![field])), vec![column]).unwrap()
    }

    #[test]
    fn test_json_field_creation() {
        let field = json_field("data", true);
        assert_eq!(field.name(), "data");
        assert_eq!(field.data_type(), &DataType::LargeBinary);
        assert!(field.is_nullable());
        assert!(is_json_field(&field));
    }

    #[test]
    fn test_json_array_from_strings() {
        let json_strings = vec![
            Some(r#"{"name": "Alice", "age": 30}"#),
            None,
            Some(r#"{"name": "Bob", "age": 25}"#),
        ];
        let array = JsonArray::try_from_iter(json_strings).unwrap();
        assert_eq!(array.len(), 3);
        assert!(!array.is_null(0));
        assert!(array.is_null(1));
        assert!(!array.is_null(2));
        let decoded = array.value(0).unwrap();
        assert!(decoded.contains("Alice"));
    }

    #[test]
    fn test_json_array_from_string_array() {
        let string_array = StringArray::from(vec![
            Some(r#"{"name": "Alice"}"#),
            Some(r#"{"name": "Bob"}"#),
            None,
        ]);
        let json_array = JsonArray::try_from(string_array).unwrap();
        assert_eq!(json_array.len(), 3);
        assert!(!json_array.is_null(0));
        assert!(!json_array.is_null(1));
        assert!(json_array.is_null(2));
    }

    #[test]
    fn test_json_path_extraction() {
        let json_array = JsonArray::try_from_iter(vec![
            Some(r#"{"user": {"name": "Alice", "age": 30}}"#),
            Some(r#"{"user": {"name": "Bob"}}"#),
        ])
        .unwrap();
        let name = json_array.json_path(0, "$.user.name").unwrap();
        assert_eq!(name, Some("\"Alice\"".to_string()));
        let age = json_array.json_path(1, "$.user.age").unwrap();
        assert_eq!(age, None);
    }

    #[test]
    fn test_convert_json_columns() {
        let json_arr = StringArray::from(vec![
            Some(r#"{"name": "Alice"}"#),
            Some(r#"{"name": "Bob"}"#),
        ]);
        let batch = single_column_batch(
            arrow_json_field("data", DataType::Utf8, false),
            Arc::new(json_arr),
        );
        let converted = convert_json_columns(&batch).unwrap();
        assert_eq!(converted.num_columns(), 1);
        let converted_schema = converted.schema();
        let converted_field = converted_schema.field(0);
        assert_eq!(converted_field.data_type(), &DataType::LargeBinary);
        assert_eq!(
            converted_field.metadata().get(ARROW_EXT_NAME_KEY),
            Some(&JSON_EXT_NAME.to_string())
        );
        let converted_column = converted.column(0);
        assert_eq!(converted_column.data_type(), &DataType::LargeBinary);
        assert_eq!(converted_column.len(), 2);
        let binary_array = converted_column
            .as_any()
            .downcast_ref::<LargeBinaryArray>()
            .unwrap();
        for i in 0..binary_array.len() {
            let decoded = decode_json(binary_array.value(i));
            assert!(decoded.contains("name"));
        }
    }

    #[test]
    fn test_has_json_fields() {
        let json_f = json_field("data", true);
        assert!(has_json_fields(&json_f));

        let non_json = ArrowField::new("data", DataType::Utf8, true);
        assert!(!has_json_fields(&non_json));

        let struct_field = ArrowField::new(
            "struct",
            DataType::Struct(vec![json_field("nested_json", true)].into()),
            true,
        );
        assert!(has_json_fields(&struct_field));

        let struct_no_json = ArrowField::new(
            "struct",
            DataType::Struct(vec![ArrowField::new("text", DataType::Utf8, true)].into()),
            true,
        );
        assert!(!has_json_fields(&struct_no_json));

        let list_field = ArrowField::new(
            "list",
            DataType::List(Arc::new(json_field("item", true))),
            true,
        );
        assert!(has_json_fields(&list_field));

        let large_list_field = ArrowField::new(
            "large_list",
            DataType::LargeList(Arc::new(json_field("item", true))),
            true,
        );
        assert!(has_json_fields(&large_list_field));

        let fixed_list_field = ArrowField::new(
            "fixed_list",
            DataType::FixedSizeList(Arc::new(json_field("item", true)), 3),
            true,
        );
        assert!(has_json_fields(&fixed_list_field));

        let map_field = ArrowField::new(
            "map",
            DataType::Map(
                Arc::new(ArrowField::new(
                    "entries",
                    DataType::Struct(
                        vec![
                            ArrowField::new("key", DataType::Utf8, false),
                            json_field("value", true),
                        ]
                        .into(),
                    ),
                    false,
                )),
                false,
            ),
            true,
        );
        assert!(has_json_fields(&map_field));
    }

    #[test]
    fn test_json_array_inner() {
        let json_array = JsonArray::try_from_iter(vec![Some(r#"{"a": 1}"#)]).unwrap();
        let inner = json_array.inner();
        assert_eq!(inner.len(), 1);
    }

    #[test]
    fn test_json_array_value_null_error() {
        let json_array = JsonArray::try_from_iter(vec![None::<&str>]).unwrap();
        let result = json_array.value(0);
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("null"));
    }

    #[test]
    fn test_json_array_value_bytes() {
        let json_array = JsonArray::try_from_iter(vec![Some(r#"{"a": 1}"#)]).unwrap();
        let bytes = json_array.value_bytes(0);
        assert!(!bytes.is_empty());
    }

    #[test]
    fn test_json_path_with_null() {
        let json_array =
            JsonArray::try_from_iter(vec![Some(r#"{"user": {"name": "Alice"}}"#), None::<&str>])
                .unwrap();
        let result = json_array.json_path(1, "$.user.name").unwrap();
        assert_eq!(result, None);
    }

    #[test]
    fn test_to_arrow_json() {
        let json_array = JsonArray::try_from_iter(vec![
            Some(r#"{"name": "Alice"}"#),
            None::<&str>,
            Some(r#"{"name": "Bob"}"#),
        ])
        .unwrap();
        let arrow_json = json_array.to_arrow_json();
        assert_eq!(arrow_json.len(), 3);
        assert!(!arrow_json.is_null(0));
        assert!(arrow_json.is_null(1));
        assert!(!arrow_json.is_null(2));
        let string_array = arrow_json.as_any().downcast_ref::<StringArray>().unwrap();
        assert!(string_array.value(0).contains("Alice"));
        assert!(string_array.value(2).contains("Bob"));
    }

    #[test]
    fn test_json_array_trait_methods() {
        let json_array =
            JsonArray::try_from_iter(vec![Some(r#"{"a": 1}"#), Some(r#"{"b": 2}"#)]).unwrap();
        assert_eq!(json_array.len(), 2);
        assert!(!json_array.is_empty());
        assert!(!json_array.is_null(0));
        assert_eq!(json_array.inner().data_type(), &DataType::LargeBinary);
        assert_eq!(json_array.inner().len(), 2);
    }

    #[test]
    fn test_json_array_empty() {
        let json_array = JsonArray::try_from_iter(Vec::<Option<&str>>::new()).unwrap();
        assert!(json_array.is_empty());
        assert_eq!(json_array.len(), 0);
    }

    #[test]
    fn test_try_from_large_string_array() {
        let large_string_array = LargeStringArray::from(vec![
            Some(r#"{"name": "Alice"}"#),
            Some(r#"{"name": "Bob"}"#),
            None,
        ]);
        let json_array = JsonArray::try_from(&large_string_array).unwrap();
        assert_eq!(json_array.len(), 3);
        assert!(!json_array.is_null(0));
        assert!(!json_array.is_null(1));
        assert!(json_array.is_null(2));

        let large_string_array2 = LargeStringArray::from(vec![Some(r#"{"x": 1}"#)]);
        let json_array2 = JsonArray::try_from(large_string_array2).unwrap();
        assert_eq!(json_array2.len(), 1);
    }

    #[test]
    fn test_try_from_array_ref() {
        let string_array: ArrayRef = Arc::new(StringArray::from(vec![
            Some(r#"{"a": 1}"#),
            Some(r#"{"b": 2}"#),
        ]));
        let json_array = JsonArray::try_from(string_array).unwrap();
        assert_eq!(json_array.len(), 2);

        let large_string_array: ArrayRef = Arc::new(LargeStringArray::from(vec![
            Some(r#"{"c": 3}"#),
            Some(r#"{"d": 4}"#),
        ]));
        let json_array2 = JsonArray::try_from(large_string_array).unwrap();
        assert_eq!(json_array2.len(), 2);

        let int_array: ArrayRef = Arc::new(arrow_array::Int32Array::from(vec![1, 2, 3]));
        let result = JsonArray::try_from(int_array);
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("Unsupported"));
    }

    #[test]
    fn test_arrow_json_to_lance_json_non_json_field() {
        let field = ArrowField::new("text", DataType::Utf8, true);
        let converted = arrow_json_to_lance_json(&field);
        assert_eq!(converted.data_type(), &DataType::Utf8);
        assert_eq!(converted.name(), "text");
    }

    #[test]
    fn test_convert_lance_json_to_arrow() {
        let json_array = JsonArray::try_from_iter(vec![
            Some(r#"{"name": "Alice"}"#),
            None::<&str>,
            Some(r#"{"name": "Bob"}"#),
        ])
        .unwrap();
        let batch = single_column_batch(
            json_field("data", true),
            Arc::new(json_array.into_inner()),
        );
        let converted = convert_lance_json_to_arrow(&batch).unwrap();
        let converted_schema = converted.schema();
        let converted_field = converted_schema.field(0);
        assert_eq!(converted_field.data_type(), &DataType::Utf8);
        assert_eq!(
            converted_field.metadata().get(ARROW_EXT_NAME_KEY),
            Some(&ARROW_JSON_EXT_NAME.to_string())
        );
        let string_array = converted
            .column(0)
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert!(!string_array.is_null(0));
        assert!(string_array.is_null(1));
        assert!(!string_array.is_null(2));
        assert!(string_array.value(0).contains("Alice"));
        assert!(string_array.value(2).contains("Bob"));
    }

    #[test]
    fn test_convert_lance_json_to_arrow_empty_batch() {
        let empty_binary = LargeBinaryBuilder::new().finish();
        let batch = single_column_batch(json_field("data", true), Arc::new(empty_binary));
        let converted = convert_lance_json_to_arrow(&batch).unwrap();
        assert_eq!(converted.num_rows(), 0);
        assert_eq!(converted.schema().field(0).data_type(), &DataType::Utf8);
    }

    #[test]
    fn test_convert_lance_json_to_arrow_no_json_columns() {
        let string_array = StringArray::from(vec![Some("hello"), Some("world")]);
        let batch = single_column_batch(
            ArrowField::new("text", DataType::Utf8, true),
            Arc::new(string_array),
        );
        let converted = convert_lance_json_to_arrow(&batch).unwrap();
        assert_eq!(converted.num_columns(), 1);
        assert_eq!(converted.schema().field(0).data_type(), &DataType::Utf8);
        // Nothing to convert, so the batch must come back unchanged.
        assert_eq!(converted, batch);
    }

    #[test]
    fn test_convert_json_columns_empty_batch() {
        let empty_strings = arrow_array::builder::StringBuilder::new().finish();
        let batch = single_column_batch(
            arrow_json_field("data", DataType::Utf8, false),
            Arc::new(empty_strings),
        );
        let converted = convert_json_columns(&batch).unwrap();
        assert_eq!(converted.num_rows(), 0);
        assert_eq!(
            converted.schema().field(0).data_type(),
            &DataType::LargeBinary
        );
    }

    #[test]
    fn test_convert_json_columns_large_string() {
        let json_strings = LargeStringArray::from(vec![
            Some(r#"{"name": "Alice"}"#),
            Some(r#"{"name": "Bob"}"#),
        ]);
        let batch = single_column_batch(
            arrow_json_field("data", DataType::LargeUtf8, false),
            Arc::new(json_strings),
        );
        let converted = convert_json_columns(&batch).unwrap();
        assert_eq!(converted.num_columns(), 1);
        assert_eq!(
            converted.schema().field(0).data_type(),
            &DataType::LargeBinary
        );
        assert_eq!(converted.num_rows(), 2);
    }

    #[test]
    fn test_convert_json_columns_no_json_columns() {
        let string_array = StringArray::from(vec![Some("hello"), Some("world")]);
        let batch = single_column_batch(
            ArrowField::new("text", DataType::Utf8, true),
            Arc::new(string_array),
        );
        let converted = convert_json_columns(&batch).unwrap();
        assert_eq!(converted.num_columns(), 1);
        assert_eq!(converted.schema().field(0).data_type(), &DataType::Utf8);
        // Nothing to convert, so the batch must come back unchanged.
        assert_eq!(converted, batch);
    }

    #[test]
    fn test_convert_json_columns_mixed_columns() {
        let json_strings = StringArray::from(vec![
            Some(r#"{"name": "Alice"}"#),
            Some(r#"{"name": "Bob"}"#),
        ]);
        let text_strings = StringArray::from(vec![Some("hello"), Some("world")]);
        let json_col_field = arrow_json_field("json_data", DataType::Utf8, false);
        let text_field = ArrowField::new("text_data", DataType::Utf8, true);
        let schema = Arc::new(Schema::new(vec![json_col_field, text_field]));
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(json_strings) as ArrayRef,
                Arc::new(text_strings) as ArrayRef,
            ],
        )
        .unwrap();
        let converted = convert_json_columns(&batch).unwrap();
        assert_eq!(converted.num_columns(), 2);
        assert_eq!(
            converted.schema().field(0).data_type(),
            &DataType::LargeBinary
        );
        assert_eq!(converted.schema().field(1).data_type(), &DataType::Utf8);
    }

    #[test]
    fn test_is_arrow_json_field_large_utf8() {
        let field = arrow_json_field("data", DataType::LargeUtf8, true);
        assert!(is_arrow_json_field(&field));
    }

    #[test]
    fn test_encode_json_invalid() {
        let result = encode_json("not valid json {");
        assert!(result.is_err());
    }

    #[test]
    fn test_json_array_from_invalid_json() {
        let result = JsonArray::try_from_iter(vec![Some("invalid json {")]);
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("Failed to encode"));
    }

    #[test]
    fn test_try_from_string_array_invalid_json() {
        let string_array = StringArray::from(vec![Some("invalid json {")]);
        let result = JsonArray::try_from(string_array);
        assert!(result.is_err());
    }

    #[test]
    fn test_try_from_large_string_array_invalid_json() {
        let large_string_array = LargeStringArray::from(vec![Some("invalid json {")]);
        let result = JsonArray::try_from(large_string_array);
        assert!(result.is_err());
    }

    #[test]
    fn test_convert_lance_json_to_arrow_mixed_columns() {
        let json_array = JsonArray::try_from_iter(vec![
            Some(r#"{"name": "Alice"}"#),
            Some(r#"{"name": "Bob"}"#),
        ])
        .unwrap();
        let text_strings = StringArray::from(vec![Some("hello"), Some("world")]);
        let json_f = json_field("json_data", true);
        let text_field = ArrowField::new("text_data", DataType::Utf8, true);
        let schema = Arc::new(Schema::new(vec![json_f, text_field]));
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(json_array.into_inner()) as ArrayRef,
                Arc::new(text_strings) as ArrayRef,
            ],
        )
        .unwrap();
        let converted = convert_lance_json_to_arrow(&batch).unwrap();
        assert_eq!(converted.num_columns(), 2);
        assert_eq!(converted.schema().field(0).data_type(), &DataType::Utf8);
        assert_eq!(converted.schema().field(1).data_type(), &DataType::Utf8);
    }

    #[test]
    fn test_json_path_invalid_path() {
        let json_array = JsonArray::try_from_iter(vec![Some(r#"{"a": 1}"#)]).unwrap();
        let result = json_array.json_path(0, "invalid path without $");
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("Failed to extract JSONPath")
        );
    }

    #[test]
    fn test_convert_json_columns_invalid_storage_type() {
        // An Int32 field tagged as Arrow JSON is not recognised as a JSON
        // field, so the batch must pass through untouched.
        let int_array = arrow_array::Int32Array::from(vec![1, 2, 3]);
        let batch = single_column_batch(
            arrow_json_field("data", DataType::Int32, false),
            Arc::new(int_array),
        );
        let converted = convert_json_columns(&batch).unwrap();
        assert_eq!(converted, batch);
    }

    #[test]
    fn test_is_json_field_wrong_extension() {
        let field = ArrowField::new("data", DataType::LargeBinary, true);
        assert!(!is_json_field(&field));

        let field2 =
            field_with_extension("data", DataType::LargeBinary, true, "other.extension");
        assert!(!is_json_field(&field2));
    }

    #[test]
    fn test_is_arrow_json_field_wrong_extension() {
        let field = ArrowField::new("data", DataType::Utf8, true);
        assert!(!is_arrow_json_field(&field));

        let field2 = field_with_extension("data", DataType::Utf8, true, "other.extension");
        assert!(!is_arrow_json_field(&field2));

        let field3 = ArrowField::new("data", DataType::Int32, true);
        assert!(!is_arrow_json_field(&field3));
    }

    #[test]
    fn test_convert_json_columns_invalid_json_utf8() {
        let invalid_json = StringArray::from(vec![Some("invalid json {")]);
        let batch = single_column_batch(
            arrow_json_field("data", DataType::Utf8, false),
            Arc::new(invalid_json),
        );
        let result = convert_json_columns(&batch);
        assert!(result.is_err());
    }

    #[test]
    fn test_convert_json_columns_invalid_json_large_utf8() {
        let invalid_json = LargeStringArray::from(vec![Some("invalid json {")]);
        let batch = single_column_batch(
            arrow_json_field("data", DataType::LargeUtf8, false),
            Arc::new(invalid_json),
        );
        let result = convert_json_columns(&batch);
        assert!(result.is_err());
    }

    #[test]
    fn test_json_path_on_corrupted_jsonb() {
        // Smoke test: the outcome (Ok or Err) is intentionally not asserted;
        // the call only has to return without panicking.
        let corrupted_bytes: &[u8] = &[0xFF, 0xFE, 0x00, 0x01, 0x02];
        let corrupted_binary = LargeBinaryArray::from(vec![Some(corrupted_bytes)]);
        let corrupted_json = JsonArray {
            inner: corrupted_binary,
        };
        let _result = corrupted_json.json_path(0, "$.a");
    }

    #[test]
    fn test_decode_json_on_various_inputs() {
        let valid_jsonb = encode_json(r#"{"key": "value"}"#).unwrap();
        let decoded = decode_json(&valid_jsonb);
        assert!(decoded.contains("key"));

        // Smoke tests: the decoded text for empty or garbage input is
        // intentionally not asserted; the calls only have to return.
        let _ = decode_json(&[]);
        let _ = decode_json(&[0xFF, 0xFE, 0x00]);
    }
}