use crate::{EngineError, Result};
use arrow::json::reader::infer_json_schema;
use arrow::json::{LineDelimitedWriter, ReaderBuilder};
use arrow::record_batch::RecordBatch;
use serde_json::Value;
use std::io::{BufReader, Cursor, Seek};
use std::sync::Arc;
/// Serializes a set of Arrow `RecordBatch`es into a single `serde_json::Value`
/// array with one JSON object per row.
///
/// An empty batch slice yields an empty JSON array. Arrow writer failures are
/// surfaced as `EngineError::ArrowError`; a line that fails to re-parse is
/// reported as `EngineError::InvalidJson`.
pub fn record_batches_to_json(batches: &[RecordBatch]) -> Result<Value> {
    if batches.is_empty() {
        return Ok(Value::Array(Vec::new()));
    }

    // Render every batch as newline-delimited JSON into an in-memory buffer.
    let mut buffer = Vec::new();
    {
        let mut writer = LineDelimitedWriter::new(&mut buffer);
        for batch in batches {
            writer
                .write(batch)
                .map_err(|e| EngineError::ArrowError(e.to_string()))?;
        }
        writer
            .finish()
            .map_err(|e| EngineError::ArrowError(e.to_string()))?;
    }

    // Re-parse each non-empty NDJSON line into a `Value` and collect the rows.
    let text = String::from_utf8(buffer).map_err(|e| EngineError::ArrowError(e.to_string()))?;
    let mut rows = Vec::new();
    for line in text.lines() {
        if line.is_empty() {
            continue;
        }
        let row: Value =
            serde_json::from_str(line).map_err(|e| EngineError::InvalidJson(e.to_string()))?;
        rows.push(row);
    }
    Ok(Value::Array(rows))
}
/// Serializes a single `RecordBatch` as a JSON array of row objects.
///
/// Thin wrapper around [`record_batches_to_json`] that views the batch as a
/// one-element slice without copying it.
pub fn record_batch_to_json(batch: &RecordBatch) -> Result<Value> {
    let as_slice = std::slice::from_ref(batch);
    record_batches_to_json(as_slice)
}
/// Converts a JSON array of objects into Arrow `RecordBatch`es using the
/// default batch size.
pub fn json_to_record_batches(value: &Value) -> Result<Vec<RecordBatch>> {
    // Rows per batch when the caller does not specify a size.
    const DEFAULT_BATCH_SIZE: usize = 1024;
    json_to_record_batches_with_batch_size(value, DEFAULT_BATCH_SIZE)
}
/// Converts a JSON array of objects into Arrow `RecordBatch`es, splitting the
/// rows into batches of at most `batch_size`.
///
/// The schema is inferred from the full input by Arrow's NDJSON schema
/// inference, then the same buffer is rewound and decoded with that schema.
///
/// # Errors
///
/// Returns `EngineError::ArrowError` when `value` is not an array, the array
/// is empty, `batch_size` is zero, or Arrow fails to infer a schema or read
/// the rows; `EngineError::InvalidJson` when an element cannot be
/// re-serialized.
pub fn json_to_record_batches_with_batch_size(
    value: &Value,
    batch_size: usize,
) -> Result<Vec<RecordBatch>> {
    let array = value
        .as_array()
        .ok_or_else(|| EngineError::ArrowError("Arrow conversion requires an array".to_string()))?;
    if array.is_empty() {
        return Err(EngineError::ArrowError(
            "Cannot convert empty array to Arrow".to_string(),
        ));
    }
    // A zero batch size can never make progress in the Arrow reader; reject it
    // up front with a clear error instead of handing it to the reader.
    if batch_size == 0 {
        return Err(EngineError::ArrowError(
            "batch_size must be greater than zero".to_string(),
        ));
    }

    // Re-serialize the array as newline-delimited JSON, writing straight into
    // a byte buffer to avoid allocating one intermediate String per element.
    let mut ndjson: Vec<u8> = Vec::new();
    for item in array {
        serde_json::to_writer(&mut ndjson, item)
            .map_err(|e| EngineError::InvalidJson(e.to_string()))?;
        ndjson.push(b'\n');
    }

    // First pass infers the schema; rewind and decode the rows with it.
    let mut cursor = Cursor::new(ndjson);
    let (schema, _) =
        infer_json_schema(&mut cursor, None).map_err(|e| EngineError::ArrowError(e.to_string()))?;
    cursor
        .rewind()
        .map_err(|e| EngineError::ArrowError(e.to_string()))?;
    let buf_reader = BufReader::new(cursor);
    let json_reader = ReaderBuilder::new(Arc::new(schema))
        .with_batch_size(batch_size)
        .build(buf_reader)
        .map_err(|e| EngineError::ArrowError(e.to_string()))?;

    let batches: Vec<RecordBatch> = json_reader
        .collect::<std::result::Result<Vec<_>, _>>()
        .map_err(|e| EngineError::ArrowError(e.to_string()))?;
    if batches.is_empty() {
        return Err(EngineError::ArrowError(
            "No data converted to Arrow".to_string(),
        ));
    }
    Ok(batches)
}
/// Infers an Arrow schema from a JSON array of objects.
///
/// Only the first 100 elements are sampled, so fields that appear solely in
/// later elements will be missing from the result.
///
/// # Errors
///
/// Returns `EngineError::ArrowError` when `value` is not an array, the array
/// is empty, or Arrow's inference fails; `EngineError::InvalidJson` when a
/// sampled element cannot be re-serialized.
pub fn infer_schema_from_json(value: &Value) -> Result<arrow::datatypes::Schema> {
    // Cap the sample so huge inputs do not make inference arbitrarily slow.
    const SAMPLE_SIZE: usize = 100;

    let array = value
        .as_array()
        .ok_or_else(|| EngineError::ArrowError("Schema inference requires an array".to_string()))?;
    if array.is_empty() {
        return Err(EngineError::ArrowError(
            "Cannot infer schema from empty array".to_string(),
        ));
    }

    // Re-serialize the sampled rows as newline-delimited JSON for Arrow's
    // NDJSON-based inference; `take` already stops at the array's end, so no
    // separate `len().min(...)` bound is needed. Writing straight into a byte
    // buffer avoids one intermediate String per element.
    let mut ndjson: Vec<u8> = Vec::new();
    for item in array.iter().take(SAMPLE_SIZE) {
        serde_json::to_writer(&mut ndjson, item)
            .map_err(|e| EngineError::InvalidJson(e.to_string()))?;
        ndjson.push(b'\n');
    }
    let mut cursor = Cursor::new(ndjson);
    let (schema, _) =
        infer_json_schema(&mut cursor, None).map_err(|e| EngineError::ArrowError(e.to_string()))?;
    Ok(schema)
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    // JSON -> Arrow -> JSON must preserve row count and field values.
    #[test]
    fn test_json_to_arrow_roundtrip() {
        let source = json!([
            {"id": 1, "name": "alice", "score": 95.5},
            {"id": 2, "name": "bob", "score": 87.0},
            {"id": 3, "name": "carol", "score": 92.3}
        ]);
        let batches = json_to_record_batches(&source).unwrap();
        assert!(!batches.is_empty());
        let restored = record_batches_to_json(&batches).unwrap();
        let rows = restored.as_array().unwrap();
        assert_eq!(rows.len(), 3);
        assert_eq!(rows[0]["name"], "alice");
        assert_eq!(rows[1]["name"], "bob");
        assert_eq!(rows[2]["name"], "carol");
    }

    // No batches serializes to an empty JSON array rather than an error.
    #[test]
    fn test_empty_batches_to_json() {
        assert_eq!(record_batches_to_json(&[]).unwrap(), Value::Array(vec![]));
    }

    // An empty JSON array cannot be converted to Arrow.
    #[test]
    fn test_empty_array_to_arrow() {
        assert!(json_to_record_batches(&json!([])).is_err());
    }

    // Only arrays are accepted as conversion input.
    #[test]
    fn test_non_array_to_arrow() {
        assert!(json_to_record_batches(&json!({"not": "array"})).is_err());
    }

    // Schema inference discovers every field present in the sample.
    #[test]
    fn test_infer_schema() {
        let sample = json!([
            {"id": 1, "name": "alice", "active": true},
            {"id": 2, "name": "bob", "active": false}
        ]);
        let schema = infer_schema_from_json(&sample).unwrap();
        assert_eq!(schema.fields().len(), 3);
        for expected in ["id", "name", "active"] {
            assert!(schema
                .fields()
                .iter()
                .any(|field| field.name().as_str() == expected));
        }
    }

    // The batch-size knob controls how rows are split across batches.
    #[test]
    fn test_batch_size() {
        let rows: Vec<Value> = (0..100)
            .map(|i| json!({"id": i, "value": format!("item_{}", i)}))
            .collect();
        let batches = json_to_record_batches_with_batch_size(&Value::Array(rows), 10).unwrap();
        assert!(batches.len() >= 10);
        let row_count: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(row_count, 100);
    }

    // The single-batch helper delegates to the slice-based conversion.
    #[test]
    fn test_single_batch_to_json() {
        let source = json!([
            {"x": 1, "y": 2},
            {"x": 3, "y": 4}
        ]);
        let batches = json_to_record_batches(&source).unwrap();
        assert!(!batches.is_empty());
        let rows_value = record_batch_to_json(&batches[0]).unwrap();
        assert!(!rows_value.as_array().unwrap().is_empty());
    }

    // Ints, floats, strings, bools, and nulls all survive the roundtrip.
    #[test]
    fn test_various_json_types() {
        let source = json!([
            {
                "int_val": 42,
                "float_val": 3.125,
                "string_val": "hello",
                "bool_val": true,
                "null_val": null
            },
            {
                "int_val": -100,
                "float_val": 2.75,
                "string_val": "world",
                "bool_val": false,
                "null_val": null
            }
        ]);
        let restored = record_batches_to_json(&json_to_record_batches(&source).unwrap()).unwrap();
        let rows = restored.as_array().unwrap();
        assert_eq!(rows.len(), 2);
        assert_eq!(rows[0]["string_val"], "hello");
        assert_eq!(rows[1]["string_val"], "world");
        assert_eq!(rows[0]["bool_val"], true);
        assert_eq!(rows[1]["bool_val"], false);
    }

    // Nested objects may or may not convert; this only checks for no panic.
    #[test]
    fn test_nested_objects_flatten() {
        let source = json!([
            {"id": 1, "data": {"nested": "value"}},
            {"id": 2, "data": {"nested": "other"}}
        ]);
        let _ = json_to_record_batches(&source);
    }

    // List-typed fields may or may not convert; this only checks for no panic.
    #[test]
    fn test_array_field() {
        let source = json!([
            {"id": 1, "tags": ["a", "b", "c"]},
            {"id": 2, "tags": ["d", "e"]}
        ]);
        let _ = json_to_record_batches(&source);
    }

    // A 1000-row dataset survives the roundtrip without losing rows.
    #[test]
    fn test_large_dataset_roundtrip() {
        let rows: Vec<Value> = (0..1000)
            .map(|i| {
                json!({
                    "id": i,
                    "name": format!("user_{}", i),
                    "score": (i as f64) * 0.1
                })
            })
            .collect();
        let source = Value::Array(rows);
        let batches = json_to_record_batches(&source).unwrap();
        let row_count: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(row_count, 1000);
        let restored = record_batches_to_json(&batches).unwrap();
        assert_eq!(restored.as_array().unwrap().len(), 1000);
    }

    // Inference on an empty array reports an "empty" error.
    #[test]
    fn test_infer_schema_empty_array_error() {
        let result = infer_schema_from_json(&json!([]));
        assert!(result.is_err());
        assert!(result.unwrap_err().to_string().contains("empty"));
    }

    // Inference rejects non-array input.
    #[test]
    fn test_infer_schema_non_array_error() {
        assert!(infer_schema_from_json(&json!({"not": "array"})).is_err());
    }

    // Bare scalars and null are rejected by the conversion entry point.
    #[test]
    fn test_primitive_value_error() {
        for scalar in [json!(42), json!("string"), json!(true), json!(null)] {
            assert!(json_to_record_batches(&scalar).is_err());
        }
    }

    // Each JSON primitive type maps to its own named schema field.
    #[test]
    fn test_schema_field_types() {
        let sample = json!([
            {"int_field": 1, "float_field": 1.5, "str_field": "a", "bool_field": true}
        ]);
        let schema = infer_schema_from_json(&sample).unwrap();
        assert_eq!(schema.fields().len(), 4);
        for name in ["int_field", "float_field", "str_field", "bool_field"] {
            assert!(schema.field_with_name(name).is_ok());
        }
    }

    // A one-row array becomes exactly one one-row batch.
    #[test]
    fn test_single_row_roundtrip() {
        let source = json!([{"a": 1}]);
        let batches = json_to_record_batches(&source).unwrap();
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 1);
        let restored = record_batches_to_json(&batches).unwrap();
        let rows = restored.as_array().unwrap();
        assert_eq!(rows.len(), 1);
        assert_eq!(rows[0]["a"], 1);
    }

    // A column that is entirely null still roundtrips with the right row count.
    #[test]
    fn test_all_null_values() {
        let source = json!([
            {"a": null},
            {"a": null}
        ]);
        let batches = json_to_record_batches(&source).unwrap();
        assert!(!batches.is_empty());
        let row_count: usize = batches.iter().map(RecordBatch::num_rows).sum();
        assert_eq!(row_count, 2);
        let restored = record_batches_to_json(&batches).unwrap();
        let rows = restored.as_array().unwrap();
        assert_eq!(rows.len(), 2);
        for row in rows {
            // The writer may omit null fields; if present, the cell must be null.
            if let Some(cell) = row.as_object().unwrap().get("a") {
                assert!(cell.is_null());
            }
        }
    }

    // Nulls interleaved with values keep their positions.
    #[test]
    fn test_mixed_null_values() {
        let source = json!([
            {"a": 1},
            {"a": null},
            {"a": 3}
        ]);
        let batches = json_to_record_batches(&source).unwrap();
        let restored = record_batches_to_json(&batches).unwrap();
        let rows = restored.as_array().unwrap();
        assert_eq!(rows.len(), 3);
        assert_eq!(rows[0]["a"], 1);
        assert!(rows[1].get("a").is_none() || rows[1]["a"].is_null());
        assert_eq!(rows[2]["a"], 3);
    }

    // Pure string columns roundtrip exactly.
    #[test]
    fn test_string_only_data() {
        let source = json!([
            {"first": "Alice", "last": "Smith"},
            {"first": "Bob", "last": "Jones"},
            {"first": "Carol", "last": "White"}
        ]);
        let batches = json_to_record_batches(&source).unwrap();
        let restored = record_batches_to_json(&batches).unwrap();
        let rows = restored.as_array().unwrap();
        assert_eq!(rows.len(), 3);
        let expected = [("Alice", "Smith"), ("Bob", "Jones"), ("Carol", "White")];
        for (row, (first, last)) in rows.iter().zip(expected) {
            assert_eq!(row["first"], first);
            assert_eq!(row["last"], last);
        }
    }

    // Boolean columns keep their values and pairing with other fields.
    #[test]
    fn test_boolean_field_roundtrip() {
        let source = json!([
            {"name": "flag_a", "enabled": true},
            {"name": "flag_b", "enabled": false},
            {"name": "flag_c", "enabled": true}
        ]);
        let batches = json_to_record_batches(&source).unwrap();
        let restored = record_batches_to_json(&batches).unwrap();
        let rows = restored.as_array().unwrap();
        assert_eq!(rows.len(), 3);
        let expected = [("flag_a", true), ("flag_b", false), ("flag_c", true)];
        for (row, (name, enabled)) in rows.iter().zip(expected) {
            assert_eq!(row["name"], name);
            assert_eq!(row["enabled"], enabled);
        }
    }
}