use std::io::{BufRead, BufReader, Read, Seek};
use std::sync::Arc;
use indexmap::map::IndexMap as HashMap;
use indexmap::set::IndexSet as HashSet;
use serde_json::json;
use serde_json::{map::Map as JsonMap, Value};
use arrow_array::builder::*;
use arrow_array::types::*;
use arrow_array::*;
use arrow_buffer::{bit_util, Buffer, MutableBuffer};
use arrow_cast::parse::Parser;
use arrow_data::{ArrayData, ArrayDataBuilder};
use arrow_schema::*;
/// Intermediate description of a JSON value's shape, accumulated across
/// records during schema inference and turned into an Arrow [`DataType`] at
/// the end.
#[derive(Debug, Clone)]
enum InferredType {
    /// Primitive value; holds every primitive Arrow type observed so far.
    Scalar(HashSet<DataType>),
    /// JSON array whose elements are described by the boxed type.
    Array(Box<InferredType>),
    /// JSON object; one entry per key, kept in first-seen (insertion) order.
    Object(HashMap<String, InferredType>),
    /// Nothing informative seen yet (e.g. only empty arrays).
    Any,
}

impl InferredType {
    /// Widens `self` in place so that it also describes `other`.
    ///
    /// * `Any` on either side yields the other side.
    /// * Scalars union their type sets, objects merge key by key and arrays
    ///   merge their element types.
    /// * A scalar mixed with an array is folded into the array's element type.
    ///
    /// # Errors
    /// Returns [`ArrowError::JsonError`] for shapes that cannot be reconciled,
    /// such as an object versus a scalar or an array.
    fn merge(&mut self, other: InferredType) -> Result<(), ArrowError> {
        match (self, other) {
            (Self::Array(mine), Self::Array(theirs)) => mine.merge(*theirs),
            (Self::Scalar(mine), Self::Scalar(theirs)) => {
                mine.extend(theirs);
                Ok(())
            }
            (Self::Object(mine), Self::Object(theirs)) => {
                for (key, value) in theirs {
                    mine.entry(key).or_insert(Self::Any).merge(value)?;
                }
                Ok(())
            }
            (slot @ Self::Any, incoming) => {
                *slot = incoming;
                Ok(())
            }
            (_, Self::Any) => Ok(()),
            (Self::Array(element), scalar @ Self::Scalar(_)) => element.merge(scalar),
            (slot @ Self::Scalar(_), Self::Array(mut element)) => {
                // Fold the scalar into the array's element type, then become that array.
                element.merge(slot.clone())?;
                *slot = Self::Array(element);
                Ok(())
            }
            (mine, theirs) => Err(ArrowError::JsonError(format!(
                "Incompatible type found during schema inference: {mine:?} v.s. {theirs:?}",
            ))),
        }
    }
}
/// Collapses every primitive type observed for one JSON field into a single
/// Arrow type.
///
/// * No observed types → `Utf8`.
/// * Any mix of `Int64` and `Float64` → `Float64`.
/// * Two lists are coerced element-wise. A scalar mixed with a list promotes
///   to a list whose item type also covers the scalar.
/// * Any other mixture falls back to `Utf8`.
fn coerce_data_type(dt: Vec<&DataType>) -> DataType {
    let mut dt_iter = dt.into_iter().cloned();
    let dt_init = dt_iter.next().unwrap_or(DataType::Utf8);
    dt_iter.fold(dt_init, |l, r| match (l, r) {
        (DataType::Boolean, DataType::Boolean) => DataType::Boolean,
        (DataType::Int64, DataType::Int64) => DataType::Int64,
        (DataType::Float64, DataType::Float64)
        | (DataType::Float64, DataType::Int64)
        | (DataType::Int64, DataType::Float64) => DataType::Float64,
        (DataType::List(l), DataType::List(r)) => DataType::List(Box::new(Field::new(
            "item",
            coerce_data_type(vec![l.data_type(), r.data_type()]),
            true,
        ))),
        // A scalar next to a list: treat the scalar as another list item.
        (DataType::List(e), not_list) | (not_list, DataType::List(e)) => {
            DataType::List(Box::new(Field::new(
                "item",
                coerce_data_type(vec![e.data_type(), &not_list]),
                true,
            )))
        }
        _ => DataType::Utf8,
    })
}
/// Converts an inferred JSON shape into the Arrow [`DataType`] used for the
/// final schema.
///
/// `Any` (nothing informative observed) maps to `Null`. Arrays become
/// nullable `"item"` lists, and objects become structs.
fn generate_datatype(t: &InferredType) -> Result<DataType, ArrowError> {
    let data_type = match t {
        InferredType::Any => DataType::Null,
        InferredType::Scalar(observed) => coerce_data_type(observed.iter().collect()),
        InferredType::Array(element) => {
            let item_type = generate_datatype(element)?;
            DataType::List(Box::new(Field::new("item", item_type, true)))
        }
        InferredType::Object(children) => DataType::Struct(generate_fields(children)?),
    };
    Ok(data_type)
}
/// Builds one nullable Arrow [`Field`] per inferred key, in the map's
/// iteration (insertion) order.
fn generate_fields(
    spec: &HashMap<String, InferredType>,
) -> Result<Vec<Field>, ArrowError> {
    let mut fields = Vec::with_capacity(spec.len());
    for (name, inferred) in spec {
        let data_type = generate_datatype(inferred)?;
        fields.push(Field::new(name, data_type, true));
    }
    Ok(fields)
}
/// Turns the top-level inferred field map into an Arrow [`Schema`].
fn generate_schema(spec: HashMap<String, InferredType>) -> Result<Schema, ArrowError> {
    generate_fields(&spec).map(Schema::new)
}
/// Iterator over newline-delimited JSON records read from a [`BufReader`].
///
/// Whitespace-only lines are skipped and do not count towards
/// `max_read_records`. Each remaining line is parsed as one JSON value.
#[derive(Debug)]
pub struct ValueIter<'a, R: Read> {
    /// Source of the JSON lines.
    reader: &'a mut BufReader<R>,
    /// Optional cap on the number of records yielded.
    max_read_records: Option<usize>,
    /// Number of non-blank records yielded so far.
    record_count: usize,
    /// Reusable line buffer, so lines are not reallocated on every read.
    line_buf: String,
}

impl<'a, R: Read> ValueIter<'a, R> {
    /// Creates an iterator over `reader`. It yields at most
    /// `max_read_records` values when a limit is given.
    pub fn new(reader: &'a mut BufReader<R>, max_read_records: Option<usize>) -> Self {
        let line_buf = String::new();
        Self {
            reader,
            max_read_records,
            record_count: 0,
            line_buf,
        }
    }
}

impl<'a, R: Read> Iterator for ValueIter<'a, R> {
    type Item = Result<Value, ArrowError>;

    fn next(&mut self) -> Option<Self::Item> {
        let limit_reached =
            matches!(self.max_read_records, Some(max) if self.record_count >= max);
        if limit_reached {
            return None;
        }
        loop {
            self.line_buf.clear();
            let bytes_read = match self.reader.read_line(&mut self.line_buf) {
                Ok(n) => n,
                Err(e) => {
                    return Some(Err(ArrowError::JsonError(format!(
                        "Failed to read JSON record: {e}"
                    ))));
                }
            };
            if bytes_read == 0 {
                // End of input.
                return None;
            }
            let line = self.line_buf.trim();
            if line.is_empty() {
                continue;
            }
            self.record_count += 1;
            let parsed = serde_json::from_str(line)
                .map_err(|e| ArrowError::JsonError(format!("Not valid JSON: {e}")));
            return Some(parsed);
        }
    }
}
pub fn infer_json_schema_from_seekable<R: Read + Seek>(
reader: &mut BufReader<R>,
max_read_records: Option<usize>,
) -> Result<Schema, ArrowError> {
let schema = infer_json_schema(reader, max_read_records);
reader.rewind()?;
schema
}
/// Infers a schema from newline-delimited JSON in `reader`, looking at no
/// more than `max_read_records` records when a limit is given.
///
/// The reader is consumed as far as inference reads; it is not rewound.
pub fn infer_json_schema<R: Read>(
    reader: &mut BufReader<R>,
    max_read_records: Option<usize>,
) -> Result<Schema, ArrowError> {
    let records = ValueIter::new(reader, max_read_records);
    infer_json_schema_from_iterator(records)
}
/// Records that `key` held a scalar of type `ftype`.
///
/// A new key starts as an empty scalar type set. If the key was previously
/// seen as an array, `ftype` is merged into the array's element type.
///
/// # Errors
/// Returns an error when the key was previously inferred as an object.
fn set_object_scalar_field_type(
    field_types: &mut HashMap<String, InferredType>,
    key: &str,
    ftype: DataType,
) -> Result<(), ArrowError> {
    if !field_types.contains_key(key) {
        field_types.insert(key.to_string(), InferredType::Scalar(HashSet::new()));
    }
    let current = field_types.get_mut(key).unwrap();
    match current {
        InferredType::Scalar(observed) => {
            observed.insert(ftype);
            Ok(())
        }
        array_type @ InferredType::Array(_) => {
            array_type.merge(InferredType::Scalar(std::iter::once(ftype).collect()))
        }
        t => Err(ArrowError::JsonError(format!(
            "Expected scalar or scalar array JSON type, found: {t:?}",
        ))),
    }
}
/// Collects the primitive types present in an array of scalars.
///
/// Integers that fit in `i64` count as `Int64`, and all other numbers count
/// as `Float64`. `null` elements are ignored.
///
/// # Errors
/// Returns an error if any element is itself an array or an object.
fn infer_scalar_array_type(array: &[Value]) -> Result<InferredType, ArrowError> {
    let mut seen = HashSet::new();
    for element in array {
        let data_type = match element {
            Value::Null => continue,
            Value::Bool(_) => DataType::Boolean,
            Value::Number(n) if n.is_i64() => DataType::Int64,
            Value::Number(_) => DataType::Float64,
            Value::String(_) => DataType::Utf8,
            Value::Array(_) | Value::Object(_) => {
                return Err(ArrowError::JsonError(format!(
                    "Expected scalar value for scalar array, got: {element:?}"
                )));
            }
        };
        seen.insert(data_type);
    }
    Ok(InferredType::Scalar(seen))
}
/// Infers the type of an array whose elements are themselves arrays. The
/// element types of all inner arrays are merged together.
///
/// # Errors
/// Returns an error if any element is not an array, or if the inner element
/// types cannot be merged.
fn infer_nested_array_type(array: &[Value]) -> Result<InferredType, ArrowError> {
    let element_type = array.iter().try_fold(InferredType::Any, |mut acc, item| {
        if let Value::Array(inner) = item {
            acc.merge(infer_array_element_type(inner)?)?;
            Ok(acc)
        } else {
            Err(ArrowError::JsonError(format!(
                "Got non array element in nested array: {item:?}"
            )))
        }
    })?;
    Ok(InferredType::Array(Box::new(element_type)))
}
/// Infers a single object type that covers every object in `array`.
///
/// # Errors
/// Returns an error if any element is not a JSON object, or if field types
/// conflict.
fn infer_struct_array_type(array: &[Value]) -> Result<InferredType, ArrowError> {
    let mut field_types = HashMap::new();
    for item in array {
        match item.as_object() {
            Some(object) => collect_field_types_from_object(&mut field_types, object)?,
            None => {
                return Err(ArrowError::JsonError(format!(
                    "Expected struct value for struct array, got: {item:?}"
                )));
            }
        }
    }
    Ok(InferredType::Object(field_types))
}
/// Infers the element type of a JSON array. The first element decides
/// whether the array is treated as nested, struct or scalar. An empty array
/// yields `Any`.
fn infer_array_element_type(array: &[Value]) -> Result<InferredType, ArrowError> {
    match array.first() {
        None => Ok(InferredType::Any),
        Some(Value::Array(_)) => infer_nested_array_type(array),
        Some(Value::Object(_)) => infer_struct_array_type(array),
        Some(_) => infer_scalar_array_type(array),
    }
}
/// Folds the fields of one JSON object into `field_types`, widening the
/// inferred type of every key present in `map`.
///
/// Keys whose value is `null` are skipped, and no entry is created for them.
/// Numbers follow the same rule as array elements: integers that fit in `i64`
/// are `Int64`, and all others are `Float64`.
///
/// # Errors
/// Returns an error when a value conflicts with what was previously inferred
/// for its key (e.g. an object where an array was seen before).
fn collect_field_types_from_object(
    field_types: &mut HashMap<String, InferredType>,
    map: &JsonMap<String, Value>,
) -> Result<(), ArrowError> {
    for (k, v) in map {
        match v {
            Value::Array(array) => {
                let ele_type = infer_array_element_type(array)?;
                if !field_types.contains_key(k) {
                    // Seed a new key with an empty list of the right shape; the
                    // observed element type is merged in just below.
                    let seed = match ele_type {
                        InferredType::Scalar(_) => InferredType::Scalar(HashSet::new()),
                        InferredType::Object(_) => InferredType::Object(HashMap::new()),
                        InferredType::Any | InferredType::Array(_) => InferredType::Any,
                    };
                    field_types.insert(k.to_string(), InferredType::Array(Box::new(seed)));
                }
                match field_types
                    .get_mut(k)
                    .expect("entry exists: inserted above when missing")
                {
                    InferredType::Array(inner_type) => {
                        inner_type.merge(ele_type)?;
                    }
                    field_type @ InferredType::Scalar(_) => {
                        // Seen as a scalar before and as an array now: merge,
                        // then promote the field to a list of the merged type.
                        field_type.merge(ele_type)?;
                        let merged = std::mem::replace(field_type, InferredType::Any);
                        *field_type = InferredType::Array(Box::new(merged));
                    }
                    t => {
                        return Err(ArrowError::JsonError(format!(
                            "Expected array json type, found: {t:?}",
                        )));
                    }
                }
            }
            Value::Bool(_) => {
                set_object_scalar_field_type(field_types, k, DataType::Boolean)?;
            }
            Value::Null => {
                // Nulls carry no type information.
            }
            Value::Number(n) => {
                // Use `is_i64` (as `infer_scalar_array_type` does) rather than
                // `!is_f64`. Otherwise u64 values above i64::MAX are inferred as
                // Int64, and the reader cannot cast them, so they silently become null.
                let data_type = if n.is_i64() {
                    DataType::Int64
                } else {
                    DataType::Float64
                };
                set_object_scalar_field_type(field_types, k, data_type)?;
            }
            Value::String(_) => {
                set_object_scalar_field_type(field_types, k, DataType::Utf8)?;
            }
            Value::Object(inner_map) => {
                if !field_types.contains_key(k) {
                    field_types.insert(k.to_string(), InferredType::Object(HashMap::new()));
                }
                match field_types
                    .get_mut(k)
                    .expect("entry exists: inserted above when missing")
                {
                    InferredType::Object(inner_field_types) => {
                        collect_field_types_from_object(inner_field_types, inner_map)?;
                    }
                    t => {
                        return Err(ArrowError::JsonError(format!(
                            "Expected object json type, found: {t:?}",
                        )));
                    }
                }
            }
        }
    }
    Ok(())
}
/// Infers a schema from an iterator of already-parsed JSON records.
///
/// # Errors
/// Propagates the first error yielded by `value_iter`, rejects records that
/// are not JSON objects, and reports type conflicts between records.
pub fn infer_json_schema_from_iterator<I>(mut value_iter: I) -> Result<Schema, ArrowError>
where
    I: Iterator<Item = Result<Value, ArrowError>>,
{
    let field_types = value_iter.try_fold(
        HashMap::<String, InferredType>::new(),
        |mut acc, record| -> Result<HashMap<String, InferredType>, ArrowError> {
            match record? {
                Value::Object(map) => collect_field_types_from_object(&mut acc, &map)?,
                value => {
                    return Err(ArrowError::JsonError(format!(
                        "Expected JSON record to be an object, found {value:?}"
                    )));
                }
            }
            Ok(acc)
        },
    )?;
    generate_schema(field_types)
}
/// Decodes rows of JSON values into Arrow [`RecordBatch`]es according to a
/// fixed schema.
#[derive(Debug)]
pub struct Decoder {
    /// Full schema that every decoded row is interpreted against.
    schema: SchemaRef,
    /// Batch size, projection and format-string settings.
    options: DecoderOptions,
}

/// Settings for a [`Decoder`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DecoderOptions {
    /// Maximum number of rows per batch (1024 by default).
    batch_size: usize,
    /// Names of the columns to keep; `None` keeps every column.
    projection: Option<Vec<String>>,
    /// Per-column format strings, passed to `Parser::parse_formatted` when a
    /// primitive column holds a JSON string.
    format_strings: Option<HashMap<String, String>>,
}

impl Default for DecoderOptions {
    fn default() -> Self {
        let batch_size = 1024;
        Self {
            batch_size,
            projection: None,
            format_strings: None,
        }
    }
}

impl DecoderOptions {
    /// Same as [`DecoderOptions::default`].
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the maximum number of rows per batch.
    pub fn with_batch_size(self, batch_size: usize) -> Self {
        Self { batch_size, ..self }
    }

    /// Restricts decoding to the named columns.
    pub fn with_projection(self, projection: Vec<String>) -> Self {
        Self {
            projection: Some(projection),
            ..self
        }
    }

    /// Sets per-column format strings used when parsing string values.
    pub fn with_format_strings(
        self,
        format_strings: HashMap<String, String>,
    ) -> Self {
        Self {
            format_strings: Some(format_strings),
            ..self
        }
    }
}
impl Decoder {
pub fn new(schema: SchemaRef, options: DecoderOptions) -> Self {
Self { schema, options }
}
/// Returns the schema of the batches this decoder produces.
///
/// Without a projection this is the full schema. With one, it contains only
/// the projected fields, in the order they appear in the full schema.
pub fn schema(&self) -> SchemaRef {
    if let Some(projection) = &self.options.projection {
        let projected_fields: Vec<Field> = self
            .schema
            .fields()
            .iter()
            .filter(|field| projection.contains(field.name()))
            .cloned()
            .collect();
        Arc::new(Schema::new(projected_fields))
    } else {
        self.schema.clone()
    }
}
/// Pulls up to `batch_size` rows from `value_iter` and decodes them into a
/// [`RecordBatch`].
///
/// Returns `Ok(None)` once `value_iter` yields no more rows. The batch schema
/// is always equal to [`Decoder::schema`].
///
/// # Errors
/// Propagates errors from `value_iter`, rejects rows that are not JSON
/// objects, and returns any error raised while building columns or the batch.
pub fn next_batch<I>(
    &self,
    value_iter: &mut I,
) -> Result<Option<RecordBatch>, ArrowError>
where
    I: Iterator<Item = Result<Value, ArrowError>>,
{
    let batch_size = self.options.batch_size;
    let mut rows: Vec<Value> = Vec::with_capacity(batch_size);
    for value in value_iter.by_ref().take(batch_size) {
        match value? {
            v @ Value::Object(_) => rows.push(v),
            v => {
                return Err(ArrowError::JsonError(format!(
                    "Row needs to be of type object, got: {v:?}"
                )));
            }
        }
    }
    if rows.is_empty() {
        return Ok(None);
    }
    let arrays =
        self.build_struct_array(&rows, self.schema.fields(), &self.options.projection)?;
    // `build_struct_array` emits the projected columns in *schema* order, so the
    // batch schema must list its fields in that same order. If it were built by
    // walking the projection list, any projection not in schema order (or with
    // repeated names) would pair columns with the wrong fields.
    let projected_schema = self.schema();
    RecordBatch::try_new_with_options(
        projected_schema,
        arrays,
        &RecordBatchOptions::new()
            .with_match_field_names(true)
            .with_row_count(Some(rows.len())),
    )
    .map(Some)
}
fn build_wrapped_list_array(
&self,
rows: &[Value],
col_name: &str,
key_type: &DataType,
) -> Result<ArrayRef, ArrowError> {
match *key_type {
DataType::Int8 => {
let dtype = DataType::Dictionary(
Box::new(DataType::Int8),
Box::new(DataType::Utf8),
);
self.list_array_string_array_builder::<Int8Type>(&dtype, col_name, rows)
}
DataType::Int16 => {
let dtype = DataType::Dictionary(
Box::new(DataType::Int16),
Box::new(DataType::Utf8),
);
self.list_array_string_array_builder::<Int16Type>(&dtype, col_name, rows)
}
DataType::Int32 => {
let dtype = DataType::Dictionary(
Box::new(DataType::Int32),
Box::new(DataType::Utf8),
);
self.list_array_string_array_builder::<Int32Type>(&dtype, col_name, rows)
}
DataType::Int64 => {
let dtype = DataType::Dictionary(
Box::new(DataType::Int64),
Box::new(DataType::Utf8),
);
self.list_array_string_array_builder::<Int64Type>(&dtype, col_name, rows)
}
DataType::UInt8 => {
let dtype = DataType::Dictionary(
Box::new(DataType::UInt8),
Box::new(DataType::Utf8),
);
self.list_array_string_array_builder::<UInt8Type>(&dtype, col_name, rows)
}
DataType::UInt16 => {
let dtype = DataType::Dictionary(
Box::new(DataType::UInt16),
Box::new(DataType::Utf8),
);
self.list_array_string_array_builder::<UInt16Type>(&dtype, col_name, rows)
}
DataType::UInt32 => {
let dtype = DataType::Dictionary(
Box::new(DataType::UInt32),
Box::new(DataType::Utf8),
);
self.list_array_string_array_builder::<UInt32Type>(&dtype, col_name, rows)
}
DataType::UInt64 => {
let dtype = DataType::Dictionary(
Box::new(DataType::UInt64),
Box::new(DataType::Utf8),
);
self.list_array_string_array_builder::<UInt64Type>(&dtype, col_name, rows)
}
ref e => Err(ArrowError::JsonError(format!(
"Data type is currently not supported for dictionaries in list : {e:?}"
))),
}
}
/// Builds a list column of strings (`data_type == Utf8`) or of string
/// dictionaries keyed by `DT` (`data_type == Dictionary(..)`) from the
/// `col_name` value of every row.
///
/// How each row's value maps to one list entry:
/// * a string or other scalar becomes a one-element list;
/// * an array becomes a list of its elements (nested arrays, objects and
///   nulls become null elements);
/// * `null` becomes a list holding one null element;
/// * a missing key becomes a null list, so the column has one entry per row.
///
/// # Errors
/// Returns an error for object values, unsupported `data_type`s, or when a
/// dictionary value cannot be appended (e.g. key overflow).
#[inline(always)]
fn list_array_string_array_builder<DT>(
    &self,
    data_type: &DataType,
    col_name: &str,
    rows: &[Value],
) -> Result<ArrayRef, ArrowError>
where
    DT: ArrowPrimitiveType + ArrowDictionaryKeyType,
{
    let mut builder: Box<dyn ArrayBuilder> = match data_type {
        DataType::Utf8 => {
            let values_builder =
                StringBuilder::with_capacity(rows.len(), rows.len() * 5);
            Box::new(ListBuilder::new(values_builder))
        }
        DataType::Dictionary(_, _) => {
            let values_builder =
                self.build_string_dictionary_builder::<DT>(rows.len() * 5);
            Box::new(ListBuilder::new(values_builder))
        }
        e => {
            return Err(ArrowError::JsonError(format!(
                "Nested list data builder type is not supported: {e:?}"
            )))
        }
    };
    for row in rows {
        // `None` means the row has no value at all for this column.
        let vals: Option<Vec<Option<String>>> = match row.get(col_name) {
            None => None,
            Some(Value::String(v)) => Some(vec![Some(v.to_string())]),
            Some(Value::Array(n)) => Some(
                n.iter()
                    .map(|v| match v {
                        Value::String(s) => Some(s.to_string()),
                        Value::Array(_) | Value::Object(_) | Value::Null => None,
                        other => Some(other.to_string()),
                    })
                    .collect(),
            ),
            Some(Value::Null) => Some(vec![None]),
            Some(Value::Object(_)) => {
                return Err(ArrowError::JsonError(
                    "Only scalars are currently supported in JSON arrays".to_string(),
                ));
            }
            Some(scalar) => Some(vec![Some(scalar.to_string())]),
        };
        match data_type {
            DataType::Utf8 => {
                let builder = builder
                    .as_any_mut()
                    .downcast_mut::<ListBuilder<StringBuilder>>()
                    .ok_or_else(|| ArrowError::JsonError(
                        "Cast failed for ListBuilder<StringBuilder> during nested data parsing".to_string(),
                    ))?;
                match vals {
                    Some(vals) => {
                        for val in vals {
                            match val {
                                Some(v) => builder.values().append_value(&v),
                                None => builder.values().append_null(),
                            }
                        }
                        builder.append(true);
                    }
                    // Keep the column aligned with the other columns of the batch.
                    None => builder.append(false),
                }
            }
            DataType::Dictionary(_, _) => {
                let builder = builder
                    .as_any_mut()
                    .downcast_mut::<ListBuilder<StringDictionaryBuilder<DT>>>()
                    .ok_or_else(|| ArrowError::JsonError(
                        "Cast failed for ListBuilder<StringDictionaryBuilder> during nested data parsing".to_string(),
                    ))?;
                match vals {
                    Some(vals) => {
                        for val in vals {
                            match val {
                                // Propagate key overflow instead of silently
                                // dropping the element from the list.
                                Some(v) => {
                                    builder.values().append(&v)?;
                                }
                                None => builder.values().append_null(),
                            }
                        }
                        builder.append(true);
                    }
                    // Keep the column aligned with the other columns of the batch.
                    None => builder.append(false),
                }
            }
            e => {
                return Err(ArrowError::JsonError(format!(
                    "Nested list data builder type is not supported: {e:?}"
                )))
            }
        }
    }
    Ok(builder.finish() as ArrayRef)
}
/// Creates a string dictionary builder sized for `row_len` entries. It
/// reserves room for `row_len` distinct values and about 5 bytes of string
/// data per value.
#[inline(always)]
fn build_string_dictionary_builder<T>(
    &self,
    row_len: usize,
) -> StringDictionaryBuilder<T>
where
    T: ArrowPrimitiveType + ArrowDictionaryKeyType,
{
    let value_bytes = row_len * 5;
    StringDictionaryBuilder::with_capacity(row_len, row_len, value_bytes)
}
/// Builds a UTF-8 dictionary column for `col_name` with the integer key type
/// given by `key_type`.
///
/// # Errors
/// Returns an error when `value_type` is not `Utf8` or `key_type` is not an
/// integer type.
#[inline(always)]
fn build_string_dictionary_array(
    &self,
    rows: &[Value],
    col_name: &str,
    key_type: &DataType,
    value_type: &DataType,
) -> Result<ArrayRef, ArrowError> {
    if !matches!(value_type, DataType::Utf8) {
        return Err(ArrowError::JsonError(
            "dictionary types other than UTF-8 not yet supported".to_string(),
        ));
    }
    match key_type {
        DataType::Int8 => self.build_dictionary_array::<Int8Type>(rows, col_name),
        DataType::Int16 => self.build_dictionary_array::<Int16Type>(rows, col_name),
        DataType::Int32 => self.build_dictionary_array::<Int32Type>(rows, col_name),
        DataType::Int64 => self.build_dictionary_array::<Int64Type>(rows, col_name),
        DataType::UInt8 => self.build_dictionary_array::<UInt8Type>(rows, col_name),
        DataType::UInt16 => self.build_dictionary_array::<UInt16Type>(rows, col_name),
        DataType::UInt32 => self.build_dictionary_array::<UInt32Type>(rows, col_name),
        DataType::UInt64 => self.build_dictionary_array::<UInt64Type>(rows, col_name),
        _ => Err(ArrowError::JsonError(
            "unsupported dictionary key type".to_string(),
        )),
    }
}
fn build_boolean_array(
&self,
rows: &[Value],
col_name: &str,
) -> Result<ArrayRef, ArrowError> {
let mut builder = BooleanBuilder::with_capacity(rows.len());
for row in rows {
if let Some(value) = row.get(col_name) {
if let Some(boolean) = value.as_bool() {
builder.append_value(boolean);
} else {
builder.append_null();
}
} else {
builder.append_null();
}
}
Ok(Arc::new(builder.finish()))
}
/// Builds a primitive column of type `T` from `col_name`.
///
/// Conversion rules, tried in order:
/// * integer values are cast exactly;
/// * strings are parsed with `T`'s [`Parser`], using the column's configured
///   format string when there is one;
/// * any other value is cast through `f64`.
///
/// Missing keys and values that fail conversion become nulls.
fn build_primitive_array<T: ArrowPrimitiveType + Parser>(
    &self,
    rows: &[Value],
    col_name: &str,
) -> Result<ArrayRef, ArrowError>
where
    T: ArrowPrimitiveType,
    T::Native: num::NumCast,
{
    let format_string = self
        .options
        .format_strings
        .as_ref()
        .and_then(|formats| formats.get(col_name));
    let array = rows
        .iter()
        .map(|row| {
            let value = row.get(col_name)?;
            if let Some(int) = value.as_i64() {
                num::cast::cast(int)
            } else if let Some(uint) = value.as_u64() {
                num::cast::cast(uint)
            } else if let Some(text) = value.as_str() {
                format_string
                    .map_or_else(|| T::parse(text), |fmt| T::parse_formatted(text, fmt))
            } else {
                value.as_f64().and_then(num::cast::cast)
            }
        })
        .collect::<PrimitiveArray<T>>();
    Ok(Arc::new(array))
}
/// Builds a list column whose items are described by `list_field`. The
/// column is a `List` for `i32` offsets and a `LargeList` for `i64` offsets.
///
/// Each entry of `rows` is one list slot:
/// * a JSON array contributes its elements and the slot is marked valid;
/// * `null` contributes no elements and the slot is marked null;
/// * any other value contributes itself as a single element. The slot stays
///   marked null in the validity bitmap, which is the existing behaviour.
///
/// Every child builder below must produce exactly as many values as the
/// offsets count, because the arrays are assembled with `build_unchecked`.
///
/// # Errors
/// Returns an error for unsupported item types, or when the total number of
/// elements does not fit in `OffsetSize`.
fn build_nested_list_array<OffsetSize: OffsetSizeTrait>(
    &self,
    rows: &[Value],
    list_field: &Field,
) -> Result<ArrayRef, ArrowError> {
    // The child elements backing one list slot; defines the offsets below.
    fn list_elements(row: &Value) -> &[Value] {
        match row {
            Value::Array(values) => values.as_slice(),
            Value::Null => &[],
            scalar => std::slice::from_ref(scalar),
        }
    }

    let list_len = rows.len();
    let mut offsets = Vec::with_capacity(list_len + 1);
    let mut list_nulls = MutableBuffer::from_len_zeroed(bit_util::ceil(list_len, 8));
    offsets.push(OffsetSize::zero());
    let mut total: usize = 0;
    for (i, row) in rows.iter().enumerate() {
        total += list_elements(row).len();
        let offset = OffsetSize::from_usize(total).ok_or_else(|| {
            ArrowError::JsonError(format!(
                "List of {total} elements overflows the list offset type"
            ))
        })?;
        offsets.push(offset);
        if matches!(row, Value::Array(_)) {
            bit_util::set_bit(list_nulls.as_slice_mut(), i);
        }
    }
    let valid_len = total;

    let array_data = match list_field.data_type() {
        DataType::Null => NullArray::new(valid_len).into_data(),
        DataType::Boolean => {
            let num_bytes = bit_util::ceil(valid_len, 8);
            let mut bool_values = MutableBuffer::from_len_zeroed(num_bytes);
            let mut bool_nulls =
                MutableBuffer::new(num_bytes).with_bitset(num_bytes, true);
            // Visit exactly the elements counted by the offsets, including
            // scalar rows, so the child stays aligned with them.
            for (index, value) in rows.iter().flat_map(list_elements).enumerate() {
                match value {
                    Value::Bool(true) => bit_util::set_bit(bool_values.as_slice_mut(), index),
                    Value::Bool(false) => {}
                    _ => bit_util::unset_bit(bool_nulls.as_slice_mut(), index),
                }
            }
            // SAFETY: both bitmaps hold `valid_len` bits, and exactly
            // `valid_len` elements were visited above.
            unsafe {
                ArrayData::builder(list_field.data_type().clone())
                    .len(valid_len)
                    .add_buffer(bool_values.into())
                    .null_bit_buffer(Some(bool_nulls.into()))
                    .build_unchecked()
            }
        }
        DataType::Int8 => self.read_primitive_list_values::<Int8Type>(rows),
        DataType::Int16 => self.read_primitive_list_values::<Int16Type>(rows),
        DataType::Int32 => self.read_primitive_list_values::<Int32Type>(rows),
        DataType::Int64 => self.read_primitive_list_values::<Int64Type>(rows),
        DataType::UInt8 => self.read_primitive_list_values::<UInt8Type>(rows),
        DataType::UInt16 => self.read_primitive_list_values::<UInt16Type>(rows),
        DataType::UInt32 => self.read_primitive_list_values::<UInt32Type>(rows),
        DataType::UInt64 => self.read_primitive_list_values::<UInt64Type>(rows),
        DataType::Float16 => {
            return Err(ArrowError::JsonError("Float16 not supported".to_string()))
        }
        DataType::Float32 => self.read_primitive_list_values::<Float32Type>(rows),
        DataType::Float64 => self.read_primitive_list_values::<Float64Type>(rows),
        DataType::Timestamp(_, _)
        | DataType::Date32
        | DataType::Date64
        | DataType::Time32(_)
        | DataType::Time64(_) => {
            return Err(ArrowError::JsonError(
                "Temporal types are not yet supported, see ARROW-4803".to_string(),
            ))
        }
        DataType::Utf8 => flatten_json_string_values(rows)
            .into_iter()
            .collect::<StringArray>()
            .into_data(),
        DataType::LargeUtf8 => flatten_json_string_values(rows)
            .into_iter()
            .collect::<LargeStringArray>()
            .into_data(),
        DataType::List(field) => {
            let child = self
                .build_nested_list_array::<i32>(&flatten_json_values(rows), field)?;
            child.into_data()
        }
        DataType::LargeList(field) => {
            let child = self
                .build_nested_list_array::<i64>(&flatten_json_values(rows), field)?;
            child.into_data()
        }
        DataType::Struct(fields) => {
            // One struct per counted element (scalar rows included). Any
            // non-null element is a valid struct.
            let struct_rows: Vec<Value> =
                rows.iter().flat_map(list_elements).cloned().collect();
            let mut null_buffer =
                MutableBuffer::from_len_zeroed(bit_util::ceil(struct_rows.len(), 8));
            for (index, value) in struct_rows.iter().enumerate() {
                if !value.is_null() {
                    bit_util::set_bit(null_buffer.as_slice_mut(), index);
                }
            }
            let arrays = self.build_struct_array(&struct_rows, fields, &None)?;
            // SAFETY: the validity bitmap covers `struct_rows.len()` bits, and
            // the children were built from `struct_rows`.
            unsafe {
                ArrayDataBuilder::new(DataType::Struct(fields.clone()))
                    .len(struct_rows.len())
                    .null_bit_buffer(Some(null_buffer.into()))
                    .child_data(arrays.into_iter().map(|a| a.into_data()).collect())
                    .build_unchecked()
            }
        }
        datatype => {
            return Err(ArrowError::JsonError(format!(
                "Nested list of {datatype:?} not supported"
            )));
        }
    };
    // `GenericListArray::<i64>` requires a `LargeList` data type, so pick the
    // list type that matches the offset width.
    let list_type = if OffsetSize::IS_LARGE {
        DataType::LargeList(Box::new(list_field.clone()))
    } else {
        DataType::List(Box::new(list_field.clone()))
    };
    let list_data = ArrayData::builder(list_type)
        .len(list_len)
        .add_buffer(Buffer::from_slice_ref(&offsets))
        .add_child_data(array_data)
        .null_bit_buffer(Some(list_nulls.into()));
    // SAFETY: `offsets` has `list_len + 1` non-decreasing entries ending at
    // `valid_len`. The child is built to hold `valid_len` elements, and the
    // validity bitmap covers `list_len` bits.
    let list_data = unsafe { list_data.build_unchecked() };
    Ok(Arc::new(GenericListArray::<OffsetSize>::from(list_data)))
}
fn build_struct_array(
&self,
rows: &[Value],
struct_fields: &[Field],
projection: &Option<Vec<String>>,
) -> Result<Vec<ArrayRef>, ArrowError> {
let arrays: Result<Vec<ArrayRef>, ArrowError> = struct_fields
.iter()
.filter(|field| {
projection
.as_ref()
.map(|p| p.contains(field.name()))
.unwrap_or(true)
})
.map(|field| {
match field.data_type() {
DataType::Null => {
Ok(Arc::new(NullArray::new(rows.len())) as ArrayRef)
}
DataType::Boolean => self.build_boolean_array(rows, field.name()),
DataType::Float64 => {
self.build_primitive_array::<Float64Type>(rows, field.name())
}
DataType::Float32 => {
self.build_primitive_array::<Float32Type>(rows, field.name())
}
DataType::Int64 => {
self.build_primitive_array::<Int64Type>(rows, field.name())
}
DataType::Int32 => {
self.build_primitive_array::<Int32Type>(rows, field.name())
}
DataType::Int16 => {
self.build_primitive_array::<Int16Type>(rows, field.name())
}
DataType::Int8 => {
self.build_primitive_array::<Int8Type>(rows, field.name())
}
DataType::UInt64 => {
self.build_primitive_array::<UInt64Type>(rows, field.name())
}
DataType::UInt32 => {
self.build_primitive_array::<UInt32Type>(rows, field.name())
}
DataType::UInt16 => {
self.build_primitive_array::<UInt16Type>(rows, field.name())
}
DataType::UInt8 => {
self.build_primitive_array::<UInt8Type>(rows, field.name())
}
DataType::Timestamp(unit, _) => match unit {
TimeUnit::Second => self
.build_primitive_array::<TimestampSecondType>(
rows,
field.name(),
),
TimeUnit::Microsecond => self
.build_primitive_array::<TimestampMicrosecondType>(
rows,
field.name(),
),
TimeUnit::Millisecond => self
.build_primitive_array::<TimestampMillisecondType>(
rows,
field.name(),
),
TimeUnit::Nanosecond => self
.build_primitive_array::<TimestampNanosecondType>(
rows,
field.name(),
),
},
DataType::Date64 => {
self.build_primitive_array::<Date64Type>(rows, field.name())
}
DataType::Date32 => {
self.build_primitive_array::<Date32Type>(rows, field.name())
}
DataType::Time64(unit) => match unit {
TimeUnit::Microsecond => self
.build_primitive_array::<Time64MicrosecondType>(
rows,
field.name(),
),
TimeUnit::Nanosecond => self
.build_primitive_array::<Time64NanosecondType>(
rows,
field.name(),
),
t => Err(ArrowError::JsonError(format!(
"TimeUnit {t:?} not supported with Time64"
))),
},
DataType::Time32(unit) => match unit {
TimeUnit::Second => self
.build_primitive_array::<Time32SecondType>(
rows,
field.name(),
),
TimeUnit::Millisecond => self
.build_primitive_array::<Time32MillisecondType>(
rows,
field.name(),
),
t => Err(ArrowError::JsonError(format!(
"TimeUnit {t:?} not supported with Time32"
))),
},
DataType::Utf8 => Ok(Arc::new(
rows.iter()
.map(|row| {
let maybe_value = row.get(field.name());
maybe_value.and_then(|value| value.as_str())
})
.collect::<StringArray>(),
) as ArrayRef),
DataType::Binary => Ok(Arc::new(
rows.iter()
.map(|row| {
let maybe_value = row.get(field.name());
maybe_value.and_then(|value| value.as_str())
})
.collect::<BinaryArray>(),
) as ArrayRef),
DataType::List(ref list_field) => {
match list_field.data_type() {
DataType::Dictionary(ref key_ty, _) => {
self.build_wrapped_list_array(rows, field.name(), key_ty)
}
_ => {
let extracted_rows = rows
.iter()
.map(|row| {
row.get(field.name())
.cloned()
.unwrap_or(Value::Null)
})
.collect::<Vec<Value>>();
self.build_nested_list_array::<i32>(
extracted_rows.as_slice(),
list_field,
)
}
}
}
DataType::Dictionary(ref key_ty, ref val_ty) => self
.build_string_dictionary_array(
rows,
field.name(),
key_ty,
val_ty,
),
DataType::Struct(fields) => {
let len = rows.len();
let num_bytes = bit_util::ceil(len, 8);
let mut null_buffer = MutableBuffer::from_len_zeroed(num_bytes);
let struct_rows = rows
.iter()
.enumerate()
.map(|(i, row)| {
(i, row.as_object().and_then(|v| v.get(field.name())))
})
.map(|(i, v)| match v {
Some(Value::Object(value)) => {
bit_util::set_bit(null_buffer.as_slice_mut(), i);
Value::Object(value.clone())
}
_ => Value::Object(Default::default()),
})
.collect::<Vec<Value>>();
let arrays =
self.build_struct_array(&struct_rows, fields, &None)?;
let data_type = DataType::Struct(fields.clone());
let data = ArrayDataBuilder::new(data_type)
.len(len)
.null_bit_buffer(Some(null_buffer.into()))
.child_data(
arrays.into_iter().map(|a| a.into_data()).collect(),
);
let data = unsafe { data.build_unchecked() };
Ok(make_array(data))
}
DataType::Map(map_field, _) => self.build_map_array(
rows,
field.name(),
field.data_type(),
map_field,
),
_ => Err(ArrowError::JsonError(format!(
"{:?} type is not supported",
field.data_type()
))),
}
})
.collect();
arrays
}
fn build_map_array(
&self,
rows: &[Value],
field_name: &str,
map_type: &DataType,
struct_field: &Field,
) -> Result<ArrayRef, ArrowError> {
let (key_field, value_field) =
if let DataType::Struct(fields) = struct_field.data_type() {
if fields.len() != 2 {
return Err(ArrowError::InvalidArgumentError(format!(
"DataType::Map expects a struct with 2 fields, found {} fields",
fields.len()
)));
}
(&fields[0], &fields[1])
} else {
return Err(ArrowError::InvalidArgumentError(format!(
"JSON map array builder expects a DataType::Map, found {:?}",
struct_field.data_type()
)));
};
let value_map_iter = rows.iter().map(|value| {
value
.get(field_name)
.and_then(|v| v.as_object().map(|map| (map, map.len() as i32)))
});
let rows_len = rows.len();
let mut list_offsets = Vec::with_capacity(rows_len + 1);
list_offsets.push(0i32);
let mut last_offset = 0;
let num_bytes = bit_util::ceil(rows_len, 8);
let mut list_bitmap = MutableBuffer::from_len_zeroed(num_bytes);
let null_data = list_bitmap.as_slice_mut();
let struct_rows = value_map_iter
.enumerate()
.filter_map(|(i, v)| match v {
Some((map, len)) => {
list_offsets.push(last_offset + len);
last_offset += len;
bit_util::set_bit(null_data, i);
Some(map.iter().map(|(k, v)| {
json!({
key_field.name(): k,
value_field.name(): v
})
}))
}
None => {
list_offsets.push(last_offset);
None
}
})
.flatten()
.collect::<Vec<Value>>();
let struct_children = self.build_struct_array(
struct_rows.as_slice(),
&[key_field.clone(), value_field.clone()],
&None,
)?;
unsafe {
Ok(make_array(ArrayData::new_unchecked(
map_type.clone(),
rows_len,
None,
Some(list_bitmap.into()),
0,
vec![Buffer::from_slice_ref(&list_offsets)],
vec![ArrayData::new_unchecked(
struct_field.data_type().clone(),
struct_children[0].len(),
None,
None,
0,
vec![],
struct_children
.into_iter()
.map(|array| array.into_data())
.collect(),
)],
)))
}
}
#[inline(always)]
fn build_dictionary_array<T>(
&self,
rows: &[Value],
col_name: &str,
) -> Result<ArrayRef, ArrowError>
where
T::Native: num::NumCast,
T: ArrowPrimitiveType + ArrowDictionaryKeyType,
{
let mut builder: StringDictionaryBuilder<T> =
self.build_string_dictionary_builder(rows.len());
for row in rows {
if let Some(value) = row.get(col_name) {
if let Some(str_v) = value.as_str() {
builder.append(str_v).map(drop)?
} else {
builder.append_null();
}
} else {
builder.append_null();
}
}
Ok(Arc::new(builder.finish()) as ArrayRef)
}
/// Flattens the numeric items of list-valued `rows` into one primitive child
/// array for `build_nested_list_array`.
///
/// Element counts match that function's offsets:
/// * an array contributes one value per element;
/// * `null` contributes nothing;
/// * any other value contributes exactly one value.
///
/// Items that are not numbers, or that do not fit in `T`, become nulls
/// rather than being dropped.
fn read_primitive_list_values<T>(&self, rows: &[Value]) -> ArrayData
where
    T: ArrowPrimitiveType,
    T::Native: num::NumCast,
{
    // Prefer exact integer conversion so 64-bit integers beyond 2^53 are not
    // rounded through f64; fall back to f64 for floats.
    fn to_native<N: num::NumCast>(value: &Value) -> Option<N> {
        if let Some(int) = value.as_i64() {
            num::cast::cast(int)
        } else if let Some(uint) = value.as_u64() {
            num::cast::cast(uint)
        } else {
            value.as_f64().and_then(num::cast::cast)
        }
    }

    let mut values: Vec<Option<T::Native>> = Vec::with_capacity(rows.len());
    for row in rows {
        match row {
            Value::Array(elements) => {
                values.extend(elements.iter().map(to_native::<T::Native>))
            }
            Value::Null => {}
            scalar => values.push(to_native::<T::Native>(scalar)),
        }
    }
    values.iter().collect::<PrimitiveArray<T>>().into_data()
}
}
#[inline(always)]
fn json_value_as_string(value: &Value) -> Option<String> {
match value {
Value::Null => None,
Value::String(string) => Some(string.clone()),
_ => Some(value.to_string()),
}
}
/// Flattens list slots into their child elements, for building a nested
/// list's child column.
///
/// * An array contributes its elements.
/// * `null` contributes nothing, because `build_nested_list_array` leaves the
///   offset unchanged for null slots. Emitting a placeholder here would shift
///   every later element onto the wrong list.
/// * Any other value is treated as a single-element list.
#[inline]
fn flatten_json_values(values: &[Value]) -> Vec<Value> {
    let mut flattened = Vec::with_capacity(values.len());
    for row in values {
        match row {
            Value::Array(elements) => flattened.extend(elements.iter().cloned()),
            Value::Null => {}
            scalar => flattened.push(scalar.clone()),
        }
    }
    flattened
}
/// Flattens list slots into the string form of their elements.
///
/// An array contributes each element (see `json_value_as_string`), `null`
/// contributes nothing, and any other value contributes itself.
#[inline]
fn flatten_json_string_values(values: &[Value]) -> Vec<Option<String>> {
    let mut flattened = Vec::new();
    for row in values {
        match row {
            Value::Array(elements) => {
                flattened.extend(elements.iter().map(json_value_as_string))
            }
            Value::Null => {}
            scalar => flattened.push(json_value_as_string(scalar)),
        }
    }
    flattened
}
/// Reads newline-delimited JSON from `R` and yields [`RecordBatch`]es.
#[derive(Debug)]
pub struct Reader<R: Read> {
    /// Buffered source of JSON lines.
    reader: BufReader<R>,
    /// Converts parsed rows into batches.
    decoder: Decoder,
}

impl<R: Read> Reader<R> {
    /// Wraps `reader` in a [`BufReader`] and decodes it against `schema`.
    pub fn new(reader: R, schema: SchemaRef, options: DecoderOptions) -> Self {
        let buffered = BufReader::new(reader);
        Self::from_buf_reader(buffered, schema, options)
    }

    /// Creates a reader over an existing [`BufReader`].
    pub fn from_buf_reader(
        reader: BufReader<R>,
        schema: SchemaRef,
        options: DecoderOptions,
    ) -> Self {
        let decoder = Decoder::new(schema, options);
        Self { reader, decoder }
    }

    /// Schema of the batches produced, after any projection.
    pub fn schema(&self) -> SchemaRef {
        self.decoder.schema()
    }

    /// Reads the next batch, or `Ok(None)` at end of input.
    // Kept as an inherent method returning `Result<Option<_>>`; the
    // `Iterator` impl adapts it, hence the clippy allowance.
    #[allow(clippy::should_implement_trait)]
    pub fn next(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
        let mut values = ValueIter::new(&mut self.reader, None);
        self.decoder.next_batch(&mut values)
    }
}
#[derive(Debug, Default)]
pub struct ReaderBuilder {
schema: Option<SchemaRef>,
max_records: Option<usize>,
options: DecoderOptions,
}
impl ReaderBuilder {
    /// Creates a builder with no explicit schema (so `build` will infer one)
    /// and default decoder options.
    pub fn new() -> Self {
        Default::default()
    }

    /// Decodes with `schema` instead of inferring one from the input.
    pub fn with_schema(self, schema: SchemaRef) -> Self {
        Self {
            schema: Some(schema),
            ..self
        }
    }

    /// Discards any explicit schema so `build` infers one.
    ///
    /// `max_records` is forwarded unchanged to the schema-inference routine.
    pub fn infer_schema(self, max_records: Option<usize>) -> Self {
        Self {
            schema: None,
            max_records,
            ..self
        }
    }

    /// Sets the decoder's batch size via `DecoderOptions::with_batch_size`.
    pub fn with_batch_size(self, batch_size: usize) -> Self {
        let options = self.options.with_batch_size(batch_size);
        Self { options, ..self }
    }

    /// Sets the decoder's column projection via `DecoderOptions::with_projection`.
    pub fn with_projection(self, projection: Vec<String>) -> Self {
        let options = self.options.with_projection(projection);
        Self { options, ..self }
    }

    /// Sets per-column format strings via `DecoderOptions::with_format_strings`.
    pub fn with_format_strings(
        self,
        format_strings: HashMap<String, String>,
    ) -> Self {
        let options = self.options.with_format_strings(format_strings);
        Self { options, ..self }
    }

    /// Wraps `source` in a [`BufReader`] and builds a [`Reader`] over it.
    ///
    /// When no explicit schema was configured, the schema is first inferred
    /// from the buffered source (which is why `Seek` is required).
    ///
    /// # Errors
    /// Returns any error raised by schema inference.
    pub fn build<R>(self, source: R) -> Result<Reader<R>, ArrowError>
    where
        R: Read + Seek,
    {
        let mut buffered = BufReader::new(source);
        let schema = if let Some(explicit) = self.schema {
            explicit
        } else {
            let inferred =
                infer_json_schema_from_seekable(&mut buffered, self.max_records)?;
            Arc::new(inferred)
        };
        Ok(Reader::from_buf_reader(buffered, schema, self.options))
    }
}
impl<R: Read> Iterator for Reader<R> {
type Item = Result<RecordBatch, ArrowError>;
fn next(&mut self) -> Option<Self::Item> {
self.next().transpose()
}
}
#[cfg(test)]
mod tests {
use super::*;
use arrow_array::cast::{
as_boolean_array, as_dictionary_array, as_primitive_array, as_string_array,
as_struct_array,
};
use arrow_buffer::ToByteSlice;
use arrow_schema::DataType::{Dictionary, List};
use flate2::read::GzDecoder;
use std::fs::File;
use std::io::Cursor;
#[test]
fn test_json_basic() {
let builder = ReaderBuilder::new().infer_schema(None).with_batch_size(64);
let mut reader: Reader<File> = builder
.build::<File>(File::open("test/data/basic.json").unwrap())
.unwrap();
let batch = reader.next().unwrap().unwrap();
assert_eq!(5, batch.num_columns());
assert_eq!(12, batch.num_rows());
let schema = reader.schema();
let batch_schema = batch.schema();
assert_eq!(schema, batch_schema);
let a = schema.column_with_name("a").unwrap();
assert_eq!(0, a.0);
assert_eq!(&DataType::Int64, a.1.data_type());
let b = schema.column_with_name("b").unwrap();
assert_eq!(1, b.0);
assert_eq!(&DataType::Float64, b.1.data_type());
let c = schema.column_with_name("c").unwrap();
assert_eq!(2, c.0);
assert_eq!(&DataType::Boolean, c.1.data_type());
let d = schema.column_with_name("d").unwrap();
assert_eq!(3, d.0);
assert_eq!(&DataType::Utf8, d.1.data_type());
let aa = batch
.column(a.0)
.as_any()
.downcast_ref::<Int64Array>()
.unwrap();
assert_eq!(1, aa.value(0));
assert_eq!(-10, aa.value(1));
let bb = batch
.column(b.0)
.as_any()
.downcast_ref::<Float64Array>()
.unwrap();
assert_eq!(2.0, bb.value(0));
assert_eq!(-3.5, bb.value(1));
let cc = batch
.column(c.0)
.as_any()
.downcast_ref::<BooleanArray>()
.unwrap();
assert!(!cc.value(0));
assert!(cc.value(10));
let dd = batch
.column(d.0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert_eq!("4", dd.value(0));
assert_eq!("text", dd.value(8));
}
#[test]
fn test_json_empty_projection() {
let builder = ReaderBuilder::new()
.infer_schema(None)
.with_batch_size(64)
.with_projection(vec![]);
let mut reader: Reader<File> = builder
.build::<File>(File::open("test/data/basic.json").unwrap())
.unwrap();
let batch = reader.next().unwrap().unwrap();
assert_eq!(0, batch.num_columns());
assert_eq!(12, batch.num_rows());
}
#[test]
fn test_json_basic_with_nulls() {
let builder = ReaderBuilder::new().infer_schema(None).with_batch_size(64);
let mut reader: Reader<File> = builder
.build::<File>(File::open("test/data/basic_nulls.json").unwrap())
.unwrap();
let batch = reader.next().unwrap().unwrap();
assert_eq!(4, batch.num_columns());
assert_eq!(12, batch.num_rows());
let schema = reader.schema();
let batch_schema = batch.schema();
assert_eq!(schema, batch_schema);
let a = schema.column_with_name("a").unwrap();
assert_eq!(&DataType::Int64, a.1.data_type());
let b = schema.column_with_name("b").unwrap();
assert_eq!(&DataType::Float64, b.1.data_type());
let c = schema.column_with_name("c").unwrap();
assert_eq!(&DataType::Boolean, c.1.data_type());
let d = schema.column_with_name("d").unwrap();
assert_eq!(&DataType::Utf8, d.1.data_type());
let aa = batch
.column(a.0)
.as_any()
.downcast_ref::<Int64Array>()
.unwrap();
assert!(aa.is_valid(0));
assert!(!aa.is_valid(1));
assert!(!aa.is_valid(11));
let bb = batch
.column(b.0)
.as_any()
.downcast_ref::<Float64Array>()
.unwrap();
assert!(bb.is_valid(0));
assert!(!bb.is_valid(2));
assert!(!bb.is_valid(11));
let cc = batch
.column(c.0)
.as_any()
.downcast_ref::<BooleanArray>()
.unwrap();
assert!(cc.is_valid(0));
assert!(!cc.is_valid(4));
assert!(!cc.is_valid(11));
let dd = batch
.column(d.0)
.as_any()
.downcast_ref::<StringArray>()
.unwrap();
assert!(!dd.is_valid(0));
assert!(dd.is_valid(1));
assert!(!dd.is_valid(4));
assert!(!dd.is_valid(11));
}
#[test]
fn test_json_basic_schema() {
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Float32, false),
Field::new("c", DataType::Boolean, false),
Field::new("d", DataType::Utf8, false),
]);
let mut reader: Reader<File> = Reader::new(
File::open("test/data/basic.json").unwrap(),
Arc::new(schema.clone()),
DecoderOptions::new(),
);
let reader_schema = reader.schema();
assert_eq!(reader_schema, Arc::new(schema));
let batch = reader.next().unwrap().unwrap();
assert_eq!(4, batch.num_columns());
assert_eq!(12, batch.num_rows());
let schema = batch.schema();
let a = schema.column_with_name("a").unwrap();
assert_eq!(&DataType::Int32, a.1.data_type());
let b = schema.column_with_name("b").unwrap();
assert_eq!(&DataType::Float32, b.1.data_type());
let c = schema.column_with_name("c").unwrap();
assert_eq!(&DataType::Boolean, c.1.data_type());
let d = schema.column_with_name("d").unwrap();
assert_eq!(&DataType::Utf8, d.1.data_type());
let aa = batch
.column(a.0)
.as_any()
.downcast_ref::<Int32Array>()
.unwrap();
assert_eq!(1, aa.value(0));
assert!(!aa.is_valid(11));
let bb = batch
.column(b.0)
.as_any()
.downcast_ref::<Float32Array>()
.unwrap();
assert_eq!(2.0, bb.value(0));
assert_eq!(-3.5, bb.value(1));
}
#[test]
fn test_json_format_strings_for_date() {
let schema = Arc::new(Schema::new(vec![Field::new("e", DataType::Date32, true)]));
let e = schema.column_with_name("e").unwrap();
assert_eq!(&DataType::Date32, e.1.data_type());
let mut fmts = HashMap::new();
let date_format = "%Y-%m-%d".to_string();
fmts.insert("e".to_string(), date_format.clone());
let mut reader: Reader<File> = Reader::new(
File::open("test/data/basic.json").unwrap(),
schema.clone(),
DecoderOptions::new().with_format_strings(fmts),
);
let reader_schema = reader.schema();
assert_eq!(reader_schema, schema);
let batch = reader.next().unwrap().unwrap();
let ee = batch
.column(e.0)
.as_any()
.downcast_ref::<Date32Array>()
.unwrap();
let dt = Date32Type::parse_formatted("1970-1-2", &date_format).unwrap();
assert_eq!(dt, ee.value(0));
let dt = Date32Type::parse_formatted("1969-12-31", &date_format).unwrap();
assert_eq!(dt, ee.value(1));
assert!(!ee.is_valid(2));
}
#[test]
fn test_json_basic_schema_projection() {
let schema = Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Float32, false),
Field::new("c", DataType::Boolean, false),
]);
let mut reader: Reader<File> = Reader::new(
File::open("test/data/basic.json").unwrap(),
Arc::new(schema),
DecoderOptions::new().with_projection(vec!["a".to_string(), "c".to_string()]),
);
let reader_schema = reader.schema();
let expected_schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("c", DataType::Boolean, false),
]));
assert_eq!(reader_schema, expected_schema);
let batch = reader.next().unwrap().unwrap();
assert_eq!(2, batch.num_columns());
assert_eq!(2, batch.schema().fields().len());
assert_eq!(12, batch.num_rows());
let schema = batch.schema();
assert_eq!(reader_schema, schema);
let a = schema.column_with_name("a").unwrap();
assert_eq!(0, a.0);
assert_eq!(&DataType::Int32, a.1.data_type());
let c = schema.column_with_name("c").unwrap();
assert_eq!(1, c.0);
assert_eq!(&DataType::Boolean, c.1.data_type());
}
#[test]
fn test_json_arrays() {
let builder = ReaderBuilder::new().infer_schema(None).with_batch_size(64);
let mut reader: Reader<File> = builder
.build::<File>(File::open("test/data/arrays.json").unwrap())
.unwrap();
let batch = reader.next().unwrap().unwrap();
assert_eq!(4, batch.num_columns());
assert_eq!(3, batch.num_rows());
let schema = batch.schema();
let a = schema.column_with_name("a").unwrap();
assert_eq!(&DataType::Int64, a.1.data_type());
let b = schema.column_with_name("b").unwrap();
assert_eq!(
&DataType::List(Box::new(Field::new("item", DataType::Float64, true))),
b.1.data_type()
);
let c = schema.column_with_name("c").unwrap();
assert_eq!(
&DataType::List(Box::new(Field::new("item", DataType::Boolean, true))),
c.1.data_type()
);
let d = schema.column_with_name("d").unwrap();
assert_eq!(&DataType::Utf8, d.1.data_type());
let aa = batch
.column(a.0)
.as_any()
.downcast_ref::<Int64Array>()
.unwrap();
assert_eq!(1, aa.value(0));
assert_eq!(-10, aa.value(1));
assert_eq!(1627668684594000000, aa.value(2));
let bb = batch
.column(b.0)
.as_any()
.downcast_ref::<ListArray>()
.unwrap();
let bb = as_primitive_array::<Float64Type>(bb.values());
assert_eq!(9, bb.len());
assert_eq!(2.0, bb.value(0));
assert_eq!(-6.1, bb.value(5));
assert!(!bb.is_valid(7));
let cc = batch
.column(c.0)
.as_any()
.downcast_ref::<ListArray>()
.unwrap();
let cc = as_boolean_array(cc.values());
assert_eq!(6, cc.len());
assert!(!cc.value(0));
assert!(!cc.value(4));
assert!(!cc.is_valid(5));
}
#[test]
fn test_invalid_json_infer_schema() {
let re =
infer_json_schema_from_seekable(&mut BufReader::new(Cursor::new(b"}")), None);
assert_eq!(
re.err().unwrap().to_string(),
"Json error: Not valid JSON: expected value at line 1 column 1",
);
}
#[test]
fn test_invalid_json_read_record() {
let schema = Arc::new(Schema::new(vec![Field::new(
"a",
DataType::Struct(vec![Field::new("a", DataType::Utf8, true)]),
true,
)]));
let builder = ReaderBuilder::new().with_schema(schema).with_batch_size(64);
let mut reader = builder.build(Cursor::new(b"}")).unwrap();
assert_eq!(
reader.next().err().unwrap().to_string(),
"Json error: Not valid JSON: expected value at line 1 column 1",
);
}
#[test]
fn test_coersion_scalar_and_list() {
use arrow_schema::DataType::*;
assert_eq!(
List(Box::new(Field::new("item", Float64, true))),
coerce_data_type(vec![
&Float64,
&List(Box::new(Field::new("item", Float64, true)))
])
);
assert_eq!(
List(Box::new(Field::new("item", Float64, true))),
coerce_data_type(vec![
&Float64,
&List(Box::new(Field::new("item", Int64, true)))
])
);
assert_eq!(
List(Box::new(Field::new("item", Int64, true))),
coerce_data_type(vec![
&Int64,
&List(Box::new(Field::new("item", Int64, true)))
])
);
assert_eq!(
List(Box::new(Field::new("item", Utf8, true))),
coerce_data_type(vec![
&Boolean,
&List(Box::new(Field::new("item", Float64, true)))
])
);
}
#[test]
fn test_mixed_json_arrays() {
let builder = ReaderBuilder::new().infer_schema(None).with_batch_size(64);
let mut reader: Reader<File> = builder
.build::<File>(File::open("test/data/mixed_arrays.json").unwrap())
.unwrap();
let batch = reader.next().unwrap().unwrap();
let mut file = File::open("test/data/mixed_arrays.json.gz").unwrap();
let mut reader = BufReader::new(GzDecoder::new(&file));
let schema = infer_json_schema(&mut reader, None).unwrap();
file.rewind().unwrap();
let reader = BufReader::new(GzDecoder::new(&file));
let options = DecoderOptions::new().with_batch_size(64);
let mut reader = Reader::from_buf_reader(reader, Arc::new(schema), options);
let batch_gz = reader.next().unwrap().unwrap();
for batch in vec![batch, batch_gz] {
assert_eq!(4, batch.num_columns());
assert_eq!(4, batch.num_rows());
let schema = batch.schema();
let a = schema.column_with_name("a").unwrap();
assert_eq!(&DataType::Int64, a.1.data_type());
let b = schema.column_with_name("b").unwrap();
assert_eq!(
&DataType::List(Box::new(Field::new("item", DataType::Float64, true))),
b.1.data_type()
);
let c = schema.column_with_name("c").unwrap();
assert_eq!(
&DataType::List(Box::new(Field::new("item", DataType::Boolean, true))),
c.1.data_type()
);
let d = schema.column_with_name("d").unwrap();
assert_eq!(
&DataType::List(Box::new(Field::new("item", DataType::Utf8, true))),
d.1.data_type()
);
let bb = batch
.column(b.0)
.as_any()
.downcast_ref::<ListArray>()
.unwrap();
let bb = as_primitive_array::<Float64Type>(bb.values());
assert_eq!(10, bb.len());
assert_eq!(4.0, bb.value(9));
let cc = batch
.column(c.0)
.as_any()
.downcast_ref::<ListArray>()
.unwrap();
assert_eq!(
cc.data().buffers()[0],
Buffer::from_slice_ref([0i32, 2, 2, 4, 5])
);
let cc = as_boolean_array(cc.values());
let cc_expected = BooleanArray::from(vec![
Some(false),
Some(true),
Some(false),
None,
Some(false),
]);
assert_eq!(cc.data_ref(), cc_expected.data_ref());
let dd: &ListArray = batch
.column(d.0)
.as_any()
.downcast_ref::<ListArray>()
.unwrap();
assert_eq!(
dd.data().buffers()[0],
Buffer::from_slice_ref([0i32, 1, 1, 2, 6])
);
let dd = as_string_array(dd.values());
assert_eq!(6, dd.len());
assert_eq!("text", dd.value(1));
assert_eq!("1", dd.value(2));
assert_eq!("false", dd.value(3));
assert_eq!("array", dd.value(4));
assert_eq!("2.4", dd.value(5));
}
}
#[test]
fn test_nested_struct_json_arrays() {
let c_field = Field::new(
"c",
DataType::Struct(vec![Field::new("d", DataType::Utf8, true)]),
true,
);
let a_field = Field::new(
"a",
DataType::Struct(vec![
Field::new("b", DataType::Boolean, true),
c_field.clone(),
]),
true,
);
let schema = Arc::new(Schema::new(vec![a_field.clone()]));
let builder = ReaderBuilder::new().with_schema(schema).with_batch_size(64);
let mut reader: Reader<File> = builder
.build::<File>(File::open("test/data/nested_structs.json").unwrap())
.unwrap();
let d = StringArray::from(vec![Some("text"), None, Some("text"), None]);
let c = ArrayDataBuilder::new(c_field.data_type().clone())
.len(4)
.add_child_data(d.into_data())
.null_bit_buffer(Some(Buffer::from(vec![0b00000101])))
.build()
.unwrap();
let b = BooleanArray::from(vec![Some(true), Some(false), Some(true), None]);
let a = ArrayDataBuilder::new(a_field.data_type().clone())
.len(4)
.add_child_data(b.into_data())
.add_child_data(c)
.null_bit_buffer(Some(Buffer::from(vec![0b00000111])))
.build()
.unwrap();
let expected = make_array(a);
let batch = reader.next().unwrap().unwrap();
let read = batch.column(0);
assert!(
expected.data_ref() == read.data_ref(),
"{:?} != {:?}",
expected.data(),
read.data(),
);
}
#[test]
fn test_nested_list_json_arrays() {
let c_field = Field::new(
"c",
DataType::Struct(vec![Field::new("d", DataType::Utf8, true)]),
true,
);
let a_struct_field = Field::new(
"a",
DataType::Struct(vec![
Field::new("b", DataType::Boolean, true),
c_field.clone(),
]),
true,
);
let a_field =
Field::new("a", DataType::List(Box::new(a_struct_field.clone())), true);
let schema = Arc::new(Schema::new(vec![a_field.clone()]));
let builder = ReaderBuilder::new().with_schema(schema).with_batch_size(64);
let json_content = r#"
{"a": [{"b": true, "c": {"d": "a_text"}}, {"b": false, "c": {"d": "b_text"}}]}
{"a": [{"b": false, "c": null}]}
{"a": [{"b": true, "c": {"d": "c_text"}}, {"b": null, "c": {"d": "d_text"}}, {"b": true, "c": {"d": null}}]}
{"a": null}
{"a": []}
{"a": [null]}
"#;
let mut reader = builder.build(Cursor::new(json_content)).unwrap();
let d = StringArray::from(vec![
Some("a_text"),
Some("b_text"),
None,
Some("c_text"),
Some("d_text"),
None,
None,
]);
let c = ArrayDataBuilder::new(c_field.data_type().clone())
.len(7)
.add_child_data(d.data().clone())
.null_bit_buffer(Some(Buffer::from(vec![0b00111011])))
.build()
.unwrap();
let b = BooleanArray::from(vec![
Some(true),
Some(false),
Some(false),
Some(true),
None,
Some(true),
None,
]);
let a = ArrayDataBuilder::new(a_struct_field.data_type().clone())
.len(7)
.add_child_data(b.data().clone())
.add_child_data(c.clone())
.null_bit_buffer(Some(Buffer::from(vec![0b00111111])))
.build()
.unwrap();
let a_list = ArrayDataBuilder::new(a_field.data_type().clone())
.len(6)
.add_buffer(Buffer::from_slice_ref([0i32, 2, 3, 6, 6, 6, 7]))
.add_child_data(a)
.null_bit_buffer(Some(Buffer::from(vec![0b00110111])))
.build()
.unwrap();
let expected = make_array(a_list);
let batch = reader.next().unwrap().unwrap();
let read = batch.column(0);
assert_eq!(read.len(), 6);
let read: &ListArray = read.as_any().downcast_ref::<ListArray>().unwrap();
let expected = expected.as_any().downcast_ref::<ListArray>().unwrap();
assert_eq!(
read.data().buffers()[0],
Buffer::from_slice_ref([0i32, 2, 3, 6, 6, 6, 7])
);
assert_eq!(read.data().null_buffer(), expected.data().null_buffer());
let struct_array = as_struct_array(read.values());
let expected_struct_array = as_struct_array(expected.values());
assert_eq!(7, struct_array.len());
assert_eq!(1, struct_array.null_count());
assert_eq!(7, expected_struct_array.len());
assert_eq!(1, expected_struct_array.null_count());
assert_eq!(
struct_array.data().null_buffer(),
expected_struct_array.data().null_buffer()
);
let read_b = struct_array.column(0);
assert_eq!(b.data_ref(), read_b.data_ref());
let read_c = struct_array.column(1);
assert_eq!(&c, read_c.data_ref());
let read_c: &StructArray = read_c.as_any().downcast_ref::<StructArray>().unwrap();
let read_d = read_c.column(0);
assert_eq!(d.data_ref(), read_d.data_ref());
assert_eq!(read.data_ref(), expected.data_ref());
}
#[test]
fn test_map_json_arrays() {
let account_field = Field::new("account", DataType::UInt16, false);
let value_list_type =
DataType::List(Box::new(Field::new("item", DataType::Utf8, false)));
let entries_struct_type = DataType::Struct(vec![
Field::new("key", DataType::Utf8, false),
Field::new("value", value_list_type.clone(), true),
]);
let stocks_field = Field::new(
"stocks",
DataType::Map(
Box::new(Field::new("entries", entries_struct_type.clone(), false)),
false,
),
true,
);
let schema = Arc::new(Schema::new(vec![account_field, stocks_field.clone()]));
let builder = ReaderBuilder::new().with_schema(schema).with_batch_size(64);
let json_content = r#"
{"account": 123, "stocks":{"long": ["$AAA", "$BBB"], "short": ["$CCC", "$D"]}}
{"account": 456, "stocks":{"long": null, "long": ["$AAA", "$CCC", "$D"], "short": null}}
{"account": 789, "stocks":{"hedged": ["$YYY"], "long": null, "short": ["$D"]}}
"#;
let mut reader = builder.build(Cursor::new(json_content)).unwrap();
let expected_accounts = UInt16Array::from(vec![123, 456, 789]);
let expected_keys = StringArray::from(vec![
"long", "short", "long", "short", "hedged", "long", "short",
])
.data()
.clone();
let expected_value_array_data = StringArray::from(vec![
"$AAA", "$BBB", "$CCC", "$D", "$AAA", "$CCC", "$D", "$YYY", "$D",
])
.data()
.clone();
let expected_values = ArrayDataBuilder::new(value_list_type)
.len(7)
.add_buffer(Buffer::from(
vec![0i32, 2, 4, 7, 7, 8, 8, 9].to_byte_slice(),
))
.add_child_data(expected_value_array_data)
.null_bit_buffer(Some(Buffer::from(vec![0b01010111])))
.build()
.unwrap();
let expected_stocks_entries_data = ArrayDataBuilder::new(entries_struct_type)
.len(7)
.add_child_data(expected_keys)
.add_child_data(expected_values)
.build()
.unwrap();
let expected_stocks_data =
ArrayDataBuilder::new(stocks_field.data_type().clone())
.len(3)
.add_buffer(Buffer::from(vec![0i32, 2, 4, 7].to_byte_slice()))
.add_child_data(expected_stocks_entries_data)
.build()
.unwrap();
let expected_stocks = make_array(expected_stocks_data);
let batch = reader.next().unwrap().unwrap();
assert_eq!(batch.num_rows(), 3);
assert_eq!(batch.num_columns(), 2);
let col1 = batch.column(0);
assert_eq!(col1.data(), expected_accounts.data());
let col2 = batch.column(1);
assert_eq!(col2.data(), expected_stocks.data());
}
#[test]
fn test_dictionary_from_json_basic_with_nulls() {
let schema = Schema::new(vec![Field::new(
"d",
Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
true,
)]);
let builder = ReaderBuilder::new()
.with_schema(Arc::new(schema))
.with_batch_size(64);
let mut reader: Reader<File> = builder
.build::<File>(File::open("test/data/basic_nulls.json").unwrap())
.unwrap();
let batch = reader.next().unwrap().unwrap();
assert_eq!(1, batch.num_columns());
assert_eq!(12, batch.num_rows());
let schema = reader.schema();
let batch_schema = batch.schema();
assert_eq!(schema, batch_schema);
let d = schema.column_with_name("d").unwrap();
assert_eq!(
&Dictionary(Box::new(DataType::Int16), Box::new(DataType::Utf8)),
d.1.data_type()
);
let dd = batch
.column(d.0)
.as_any()
.downcast_ref::<DictionaryArray<Int16Type>>()
.unwrap();
assert!(!dd.is_valid(0));
assert!(dd.is_valid(1));
assert!(dd.is_valid(2));
assert!(!dd.is_valid(11));
assert_eq!(
dd.keys(),
&Int16Array::from(vec![
None,
Some(0),
Some(1),
Some(0),
None,
None,
Some(0),
None,
Some(1),
Some(0),
Some(0),
None
])
);
}
#[test]
fn test_dictionary_from_json_int8() {
let schema = Schema::new(vec![Field::new(
"d",
Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
true,
)]);
let builder = ReaderBuilder::new()
.with_schema(Arc::new(schema))
.with_batch_size(64);
let mut reader: Reader<File> = builder
.build::<File>(File::open("test/data/basic_nulls.json").unwrap())
.unwrap();
let batch = reader.next().unwrap().unwrap();
assert_eq!(1, batch.num_columns());
assert_eq!(12, batch.num_rows());
let schema = reader.schema();
let batch_schema = batch.schema();
assert_eq!(schema, batch_schema);
let d = schema.column_with_name("d").unwrap();
assert_eq!(
&Dictionary(Box::new(DataType::Int8), Box::new(DataType::Utf8)),
d.1.data_type()
);
}
#[test]
fn test_dictionary_from_json_int32() {
let schema = Schema::new(vec![Field::new(
"d",
Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
true,
)]);
let builder = ReaderBuilder::new()
.with_schema(Arc::new(schema))
.with_batch_size(64);
let mut reader: Reader<File> = builder
.build::<File>(File::open("test/data/basic_nulls.json").unwrap())
.unwrap();
let batch = reader.next().unwrap().unwrap();
assert_eq!(1, batch.num_columns());
assert_eq!(12, batch.num_rows());
let schema = reader.schema();
let batch_schema = batch.schema();
assert_eq!(schema, batch_schema);
let d = schema.column_with_name("d").unwrap();
assert_eq!(
&Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
d.1.data_type()
);
}
#[test]
fn test_dictionary_from_json_int64() {
let schema = Schema::new(vec![Field::new(
"d",
Dictionary(Box::new(DataType::Int64), Box::new(DataType::Utf8)),
true,
)]);
let builder = ReaderBuilder::new()
.with_schema(Arc::new(schema))
.with_batch_size(64);
let mut reader: Reader<File> = builder
.build::<File>(File::open("test/data/basic_nulls.json").unwrap())
.unwrap();
let batch = reader.next().unwrap().unwrap();
assert_eq!(1, batch.num_columns());
assert_eq!(12, batch.num_rows());
let schema = reader.schema();
let batch_schema = batch.schema();
assert_eq!(schema, batch_schema);
let d = schema.column_with_name("d").unwrap();
assert_eq!(
&Dictionary(Box::new(DataType::Int64), Box::new(DataType::Utf8)),
d.1.data_type()
);
}
#[test]
fn test_skip_empty_lines() {
let builder = ReaderBuilder::new().infer_schema(None).with_batch_size(64);
let json_content = "
{\"a\": 1}
{\"a\": 2}
{\"a\": 3}";
let mut reader = builder.build(Cursor::new(json_content)).unwrap();
let batch = reader.next().unwrap().unwrap();
assert_eq!(1, batch.num_columns());
assert_eq!(3, batch.num_rows());
let schema = reader.schema();
let c = schema.column_with_name("a").unwrap();
assert_eq!(&DataType::Int64, c.1.data_type());
}
#[test]
fn test_row_type_validation() {
let builder = ReaderBuilder::new().infer_schema(None).with_batch_size(64);
let json_content = "
[1, \"hello\"]
\"world\"";
let re = builder.build(Cursor::new(json_content));
assert_eq!(
re.err().unwrap().to_string(),
r#"Json error: Expected JSON record to be an object, found Array [Number(1), String("hello")]"#,
);
}
#[test]
fn test_list_of_string_dictionary_from_json() {
let schema = Schema::new(vec![Field::new(
"events",
List(Box::new(Field::new(
"item",
Dictionary(Box::new(DataType::UInt64), Box::new(DataType::Utf8)),
true,
))),
true,
)]);
let builder = ReaderBuilder::new()
.with_schema(Arc::new(schema))
.with_batch_size(64);
let mut reader: Reader<File> = builder
.build::<File>(File::open("test/data/list_string_dict_nested.json").unwrap())
.unwrap();
let batch = reader.next().unwrap().unwrap();
assert_eq!(1, batch.num_columns());
assert_eq!(3, batch.num_rows());
let schema = reader.schema();
let batch_schema = batch.schema();
assert_eq!(schema, batch_schema);
let events = schema.column_with_name("events").unwrap();
assert_eq!(
&List(Box::new(Field::new(
"item",
Dictionary(Box::new(DataType::UInt64), Box::new(DataType::Utf8)),
true
))),
events.1.data_type()
);
let evs_list = batch
.column(events.0)
.as_any()
.downcast_ref::<ListArray>()
.unwrap();
let evs_list = as_dictionary_array::<UInt64Type>(evs_list.values());
assert_eq!(6, evs_list.len());
assert!(evs_list.is_valid(1));
assert_eq!(DataType::Utf8, evs_list.value_type());
let dict_el = evs_list.values();
let dict_el = dict_el.as_any().downcast_ref::<StringArray>().unwrap();
assert_eq!(3, dict_el.len());
assert_eq!("Elect Leader", dict_el.value(0));
assert_eq!("Do Ballot", dict_el.value(1));
assert_eq!("Send Data", dict_el.value(2));
}
#[test]
fn test_list_of_string_dictionary_from_json_with_nulls() {
let schema = Schema::new(vec![Field::new(
"events",
List(Box::new(Field::new(
"item",
Dictionary(Box::new(DataType::UInt64), Box::new(DataType::Utf8)),
true,
))),
true,
)]);
let builder = ReaderBuilder::new()
.with_schema(Arc::new(schema))
.with_batch_size(64);
let mut reader: Reader<File> = builder
.build::<File>(
File::open("test/data/list_string_dict_nested_nulls.json").unwrap(),
)
.unwrap();
let batch = reader.next().unwrap().unwrap();
assert_eq!(1, batch.num_columns());
assert_eq!(3, batch.num_rows());
let schema = reader.schema();
let batch_schema = batch.schema();
assert_eq!(schema, batch_schema);
let events = schema.column_with_name("events").unwrap();
assert_eq!(
&List(Box::new(Field::new(
"item",
Dictionary(Box::new(DataType::UInt64), Box::new(DataType::Utf8)),
true
))),
events.1.data_type()
);
let evs_list = batch
.column(events.0)
.as_any()
.downcast_ref::<ListArray>()
.unwrap();
let evs_list = as_dictionary_array::<UInt64Type>(evs_list.values());
assert_eq!(8, evs_list.len());
assert!(evs_list.is_valid(1));
assert_eq!(DataType::Utf8, evs_list.value_type());
let dict_el = evs_list.values();
let dict_el = dict_el.as_any().downcast_ref::<StringArray>().unwrap();
assert_eq!(2, evs_list.null_count());
assert_eq!(3, dict_el.len());
assert_eq!("Elect Leader", dict_el.value(0));
assert_eq!("Do Ballot", dict_el.value(1));
assert_eq!("Send Data", dict_el.value(2));
}
#[test]
fn test_dictionary_from_json_uint8() {
let schema = Schema::new(vec![Field::new(
"d",
Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
true,
)]);
let builder = ReaderBuilder::new()
.with_schema(Arc::new(schema))
.with_batch_size(64);
let mut reader: Reader<File> = builder
.build::<File>(File::open("test/data/basic_nulls.json").unwrap())
.unwrap();
let batch = reader.next().unwrap().unwrap();
assert_eq!(1, batch.num_columns());
assert_eq!(12, batch.num_rows());
let schema = reader.schema();
let batch_schema = batch.schema();
assert_eq!(schema, batch_schema);
let d = schema.column_with_name("d").unwrap();
assert_eq!(
&Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
d.1.data_type()
);
}
#[test]
fn test_dictionary_from_json_uint32() {
let schema = Schema::new(vec![Field::new(
"d",
Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)),
true,
)]);
let builder = ReaderBuilder::new()
.with_schema(Arc::new(schema))
.with_batch_size(64);
let mut reader: Reader<File> = builder
.build::<File>(File::open("test/data/basic_nulls.json").unwrap())
.unwrap();
let batch = reader.next().unwrap().unwrap();
assert_eq!(1, batch.num_columns());
assert_eq!(12, batch.num_rows());
let schema = reader.schema();
let batch_schema = batch.schema();
assert_eq!(schema, batch_schema);
let d = schema.column_with_name("d").unwrap();
assert_eq!(
&Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)),
d.1.data_type()
);
}
#[test]
fn test_dictionary_from_json_uint64() {
let schema = Schema::new(vec![Field::new(
"d",
Dictionary(Box::new(DataType::UInt64), Box::new(DataType::Utf8)),
true,
)]);
let builder = ReaderBuilder::new()
.with_schema(Arc::new(schema))
.with_batch_size(64);
let mut reader: Reader<File> = builder
.build::<File>(File::open("test/data/basic_nulls.json").unwrap())
.unwrap();
let batch = reader.next().unwrap().unwrap();
assert_eq!(1, batch.num_columns());
assert_eq!(12, batch.num_rows());
let schema = reader.schema();
let batch_schema = batch.schema();
assert_eq!(schema, batch_schema);
let d = schema.column_with_name("d").unwrap();
assert_eq!(
&Dictionary(Box::new(DataType::UInt64), Box::new(DataType::Utf8)),
d.1.data_type()
);
}
#[test]
fn test_with_multiple_batches() {
let builder = ReaderBuilder::new()
.infer_schema(Some(4))
.with_batch_size(5);
let mut reader: Reader<File> = builder
.build::<File>(File::open("test/data/basic_nulls.json").unwrap())
.unwrap();
let mut num_records = Vec::new();
while let Some(rb) = reader.next().unwrap() {
num_records.push(rb.num_rows());
}
assert_eq!(vec![5, 5, 2], num_records);
}
#[test]
fn test_json_infer_schema() {
let schema = Schema::new(vec![
Field::new("a", DataType::Int64, true),
Field::new(
"b",
DataType::List(Box::new(Field::new("item", DataType::Float64, true))),
true,
),
Field::new(
"c",
DataType::List(Box::new(Field::new("item", DataType::Boolean, true))),
true,
),
Field::new(
"d",
DataType::List(Box::new(Field::new("item", DataType::Utf8, true))),
true,
),
]);
let mut reader =
BufReader::new(File::open("test/data/mixed_arrays.json").unwrap());
let inferred_schema = infer_json_schema_from_seekable(&mut reader, None).unwrap();
assert_eq!(inferred_schema, schema);
let file = File::open("test/data/mixed_arrays.json.gz").unwrap();
let mut reader = BufReader::new(GzDecoder::new(&file));
let inferred_schema = infer_json_schema(&mut reader, None).unwrap();
assert_eq!(inferred_schema, schema);
}
#[test]
fn test_json_infer_schema_nested_structs() {
let schema = Schema::new(vec![
Field::new(
"c1",
DataType::Struct(vec![
Field::new("a", DataType::Boolean, true),
Field::new(
"b",
DataType::Struct(vec![Field::new("c", DataType::Utf8, true)]),
true,
),
]),
true,
),
Field::new("c2", DataType::Int64, true),
Field::new("c3", DataType::Utf8, true),
]);
let inferred_schema = infer_json_schema_from_iterator(
vec![
Ok(serde_json::json!({"c1": {"a": true, "b": {"c": "text"}}, "c2": 1})),
Ok(serde_json::json!({"c1": {"a": false, "b": null}, "c2": 0})),
Ok(serde_json::json!({"c1": {"a": true, "b": {"c": "text"}}, "c3": "ok"})),
]
.into_iter(),
)
.unwrap();
assert_eq!(inferred_schema, schema);
}
#[test]
fn test_json_infer_schema_struct_in_list() {
let schema = Schema::new(vec![
Field::new(
"c1",
DataType::List(Box::new(Field::new(
"item",
DataType::Struct(vec![
Field::new("a", DataType::Utf8, true),
Field::new("b", DataType::Int64, true),
Field::new("c", DataType::Boolean, true),
]),
true,
))),
true,
),
Field::new("c2", DataType::Float64, true),
Field::new(
"c3",
DataType::List(Box::new(Field::new("item", DataType::Null, true))),
true,
),
]);
let inferred_schema = infer_json_schema_from_iterator(
vec![
Ok(serde_json::json!({
"c1": [{"a": "foo", "b": 100}], "c2": 1, "c3": [],
})),
Ok(serde_json::json!({
"c1": [{"a": "bar", "b": 2}, {"a": "foo", "c": true}], "c2": 0, "c3": [],
})),
Ok(serde_json::json!({"c1": [], "c2": 0.5, "c3": []})),
]
.into_iter(),
)
.unwrap();
assert_eq!(inferred_schema, schema);
}
#[test]
fn test_json_infer_schema_nested_list() {
    // `c1` starts out as an empty list before nested string lists show up;
    // `c2` is absent from one record and mixes an integer with a float.
    let rows = vec![
        Ok(serde_json::json!({"c1": [], "c2": 12})),
        Ok(serde_json::json!({"c1": [["a", "b"], ["c"]]})),
        Ok(serde_json::json!({"c1": [["foo"]], "c2": 0.11})),
    ];
    let inferred_schema = infer_json_schema_from_iterator(rows.into_iter()).unwrap();

    let string_list = DataType::List(Box::new(Field::new("item", DataType::Utf8, true)));
    let c1_type = DataType::List(Box::new(Field::new("item", string_list, true)));
    let expected = Schema::new(vec![
        Field::new("c1", c1_type, true),
        Field::new("c2", DataType::Float64, true),
    ]);
    assert_eq!(inferred_schema, expected);
}
#[test]
fn test_timestamp_from_json_seconds() {
let schema = Schema::new(vec![Field::new(
"a",
DataType::Timestamp(TimeUnit::Second, None),
true,
)]);
let builder = ReaderBuilder::new()
.with_schema(Arc::new(schema))
.with_batch_size(64);
let mut reader: Reader<File> = builder
.build::<File>(File::open("test/data/basic_nulls.json").unwrap())
.unwrap();
let batch = reader.next().unwrap().unwrap();
assert_eq!(1, batch.num_columns());
assert_eq!(12, batch.num_rows());
let schema = reader.schema();
let batch_schema = batch.schema();
assert_eq!(schema, batch_schema);
let a = schema.column_with_name("a").unwrap();
assert_eq!(
&DataType::Timestamp(TimeUnit::Second, None),
a.1.data_type()
);
let aa = batch
.column(a.0)
.as_any()
.downcast_ref::<TimestampSecondArray>()
.unwrap();
assert!(aa.is_valid(0));
assert!(!aa.is_valid(1));
assert!(!aa.is_valid(2));
assert_eq!(1, aa.value(0));
assert_eq!(1, aa.value(3));
assert_eq!(5, aa.value(7));
}
#[test]
fn test_timestamp_from_json_milliseconds() {
let schema = Schema::new(vec![Field::new(
"a",
DataType::Timestamp(TimeUnit::Millisecond, None),
true,
)]);
let builder = ReaderBuilder::new()
.with_schema(Arc::new(schema))
.with_batch_size(64);
let mut reader: Reader<File> = builder
.build::<File>(File::open("test/data/basic_nulls.json").unwrap())
.unwrap();
let batch = reader.next().unwrap().unwrap();
assert_eq!(1, batch.num_columns());
assert_eq!(12, batch.num_rows());
let schema = reader.schema();
let batch_schema = batch.schema();
assert_eq!(schema, batch_schema);
let a = schema.column_with_name("a").unwrap();
assert_eq!(
&DataType::Timestamp(TimeUnit::Millisecond, None),
a.1.data_type()
);
let aa = batch
.column(a.0)
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.unwrap();
assert!(aa.is_valid(0));
assert!(!aa.is_valid(1));
assert!(!aa.is_valid(2));
assert_eq!(1, aa.value(0));
assert_eq!(1, aa.value(3));
assert_eq!(5, aa.value(7));
}
#[test]
fn test_date_from_json_milliseconds() {
let schema = Schema::new(vec![Field::new("a", DataType::Date64, true)]);
let builder = ReaderBuilder::new()
.with_schema(Arc::new(schema))
.with_batch_size(64);
let mut reader: Reader<File> = builder
.build::<File>(File::open("test/data/basic_nulls.json").unwrap())
.unwrap();
let batch = reader.next().unwrap().unwrap();
assert_eq!(1, batch.num_columns());
assert_eq!(12, batch.num_rows());
let schema = reader.schema();
let batch_schema = batch.schema();
assert_eq!(schema, batch_schema);
let a = schema.column_with_name("a").unwrap();
assert_eq!(&DataType::Date64, a.1.data_type());
let aa = batch
.column(a.0)
.as_any()
.downcast_ref::<Date64Array>()
.unwrap();
assert!(aa.is_valid(0));
assert!(!aa.is_valid(1));
assert!(!aa.is_valid(2));
assert_eq!(1, aa.value(0));
assert_eq!(1, aa.value(3));
assert_eq!(5, aa.value(7));
}
#[test]
fn test_time_from_json_nanoseconds() {
let schema = Schema::new(vec![Field::new(
"a",
DataType::Time64(TimeUnit::Nanosecond),
true,
)]);
let builder = ReaderBuilder::new()
.with_schema(Arc::new(schema))
.with_batch_size(64);
let mut reader: Reader<File> = builder
.build::<File>(File::open("test/data/basic_nulls.json").unwrap())
.unwrap();
let batch = reader.next().unwrap().unwrap();
assert_eq!(1, batch.num_columns());
assert_eq!(12, batch.num_rows());
let schema = reader.schema();
let batch_schema = batch.schema();
assert_eq!(schema, batch_schema);
let a = schema.column_with_name("a").unwrap();
assert_eq!(&DataType::Time64(TimeUnit::Nanosecond), a.1.data_type());
let aa = batch
.column(a.0)
.as_any()
.downcast_ref::<Time64NanosecondArray>()
.unwrap();
assert!(aa.is_valid(0));
assert!(!aa.is_valid(1));
assert!(!aa.is_valid(2));
assert_eq!(1, aa.value(0));
assert_eq!(1, aa.value(3));
assert_eq!(5, aa.value(7));
}
/// For every time type, row 1 of column `d` in `basic_nulls.json` must be read
/// as the native value 4 (and row 2 must be null); see `parse_string_column`.
#[test]
fn test_time_from_string() {
    // 64-bit time types carry an `i64` native value.
    parse_string_column::<Time64NanosecondType>(4_i64);
    parse_string_column::<Time64MicrosecondType>(4_i64);
    // 32-bit time types carry an `i32` native value.
    parse_string_column::<Time32MillisecondType>(4_i32);
    parse_string_column::<Time32SecondType>(4_i32);
}
/// Reads `test/data/basic_nulls.json` as a single nullable column `d` of
/// primitive type `T`. Asserts that row 1 equals `value` and row 2 is null.
fn parse_string_column<T>(value: T::Native)
where
    T: ArrowPrimitiveType,
{
    let field = Field::new("d", T::DATA_TYPE, true);
    let mut reader: Reader<File> = ReaderBuilder::new()
        .with_schema(Arc::new(Schema::new(vec![field])))
        .with_batch_size(64)
        .build::<File>(File::open("test/data/basic_nulls.json").unwrap())
        .unwrap();

    let batch = reader.next().unwrap().unwrap();
    let column = batch.column(0);
    let parsed = column
        .as_any()
        .downcast_ref::<PrimitiveArray<T>>()
        .unwrap();

    assert_eq!(value, parsed.value(1));
    assert!(!parsed.is_valid(2));
}
#[test]
fn test_json_read_nested_list() {
    let inner_list = DataType::List(Box::new(Field::new("item", DataType::Utf8, true)));
    let outer_list = DataType::List(Box::new(Field::new("item", inner_list, true)));
    let schema = Arc::new(Schema::new(vec![Field::new("c1", outer_list, true)]));
    let decoder = Decoder::new(schema, DecoderOptions::new());

    // An empty outer list, a long outer list, then a short one.
    let mut rows = vec![
        Ok(serde_json::json!({"c1": []})),
        Ok(serde_json::json!({
            "c1": [["a", "b"], ["c"], ["e", "f"], ["g"], ["h"], ["i"], ["j"], ["k"]],
        })),
        Ok(serde_json::json!({"c1": [["foo"], ["bar"]]})),
    ]
    .into_iter();
    let batch = decoder.next_batch(&mut rows).unwrap().unwrap();

    assert_eq!(batch.num_columns(), 1);
    assert_eq!(batch.num_rows(), 3);
}
#[test]
fn test_json_read_list_of_structs() {
    let element = DataType::Struct(vec![Field::new("a", DataType::Int64, true)]);
    let c1 = Field::new(
        "c1",
        DataType::List(Box::new(Field::new("item", element, true))),
        true,
    );
    let decoder = Decoder::new(Arc::new(Schema::new(vec![c1])), DecoderOptions::new());

    // Three rows whose lists hold 1, 6 and 2 structs respectively.
    let mut rows = vec![
        Ok(serde_json::json!({"c1": [{"a": 1}]})),
        Ok(serde_json::json!({
            "c1": [{"a": 2}, {"a": 3}, {"a": 4}, {"a": 5}, {"a": 6}, {"a": 7}],
        })),
        Ok(serde_json::json!({"c1": [{"a": 10}, {"a": 11}]})),
    ]
    .into_iter();
    let batch = decoder.next_batch(&mut rows).unwrap().unwrap();

    assert_eq!(batch.num_columns(), 1);
    assert_eq!(batch.num_rows(), 3);
}
/// Decodes JSON strings, one of them non-ASCII, into a `Binary` column.
/// Checks that the stored bytes are the UTF-8 encoding of the input strings.
#[test]
fn test_json_read_binary_structs() {
    let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Binary, true)]));
    let decoder = Decoder::new(Arc::clone(&schema), DecoderOptions::new());
    let batch = decoder
        .next_batch(
            &mut vec![
                Ok(serde_json::json!({
                    "c1": "₁₂₃",
                })),
                Ok(serde_json::json!({
                    "c1": "foo",
                })),
            ]
            .into_iter(),
        )
        .unwrap()
        .unwrap();

    // Check the shape first so a row-count mismatch fails with a clear message.
    assert_eq!(batch.num_columns(), 1);
    assert_eq!(batch.num_rows(), 2);

    // Reuse the decoder's schema rather than re-declaring an identical one.
    let binary_values = BinaryArray::from(vec!["₁₂₃".as_bytes(), "foo".as_bytes()]);
    let expected_batch =
        RecordBatch::try_new(schema, vec![Arc::new(binary_values)]).unwrap();

    // `&[ArrayRef]` already implements `PartialEq`/`Debug`, so the column
    // slices can be compared directly.
    assert_eq!(batch.columns(), expected_batch.columns());
}
/// Iterates a reader over `test/data/basic.json` with an inferred schema and a
/// batch size of 5, checking per-batch column count and schema, plus the total
/// row count, the batch count and the sum of column `a`.
#[test]
fn test_json_iterator() {
    let builder = ReaderBuilder::new().infer_schema(None).with_batch_size(5);
    let reader: Reader<File> = builder
        .build::<File>(File::open("test/data/basic.json").unwrap())
        .unwrap();
    let schema = reader.schema();
    let (col_a_index, _) = schema.column_with_name("a").unwrap();

    let mut sum_num_rows = 0;
    let mut num_batches = 0;
    let mut sum_a = 0_i64;
    for batch in reader {
        let batch = batch.unwrap();
        assert_eq!(5, batch.num_columns());
        let batch_schema = batch.schema();
        assert_eq!(schema, batch_schema);
        sum_num_rows += batch.num_rows();
        num_batches += 1;

        let a_array = batch
            .column(col_a_index)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        // Sum only valid slots. `value(i)` on a null slot returns an
        // unspecified physical value, which must not leak into the total.
        sum_a += a_array.iter().flatten().sum::<i64>();
    }
    assert_eq!(12, sum_num_rows);
    assert_eq!(3, num_batches);
    assert_eq!(100000000000011, sum_a);
}
/// A cloned `DecoderOptions` must compare equal to its source, including the
/// batch size configured through `with_batch_size`.
#[test]
fn test_options_clone() {
    let original = DecoderOptions::new().with_batch_size(64);
    assert_eq!(original, original.clone());
}
}