use std::io::{BufReader, Read, Seek, SeekFrom};
use indexmap::map::IndexMap as HashMap;
use indexmap::set::IndexSet as HashSet;
use serde_json::Value;
use super::util::ValueIter;
use crate::datatypes::*;
use crate::error::{ArrowError, Result};
/// Collapses the candidate types observed for one field into a single [`DataType`].
///
/// Rules, in order:
/// * exactly one candidate: that candidate is returned unchanged;
/// * more than two candidates: a list of nullable `Utf8` items;
/// * two candidates:
///   * equal types yield that type;
///   * two lists yield a list whose item type is the coercion of both item types;
///   * a list and a non-list yield a list whose item type is the coercion of the
///     non-list type with the list's item type;
///   * `Int64` with `Float64` yields `Float64`;
///   * `Int64` with `Boolean` yields `Int64`;
///   * anything else yields `Utf8`.
///
/// # Panics
///
/// Panics if `dt` is empty, because the first candidate is indexed unconditionally.
fn coerce_data_type(dt: &[&DataType]) -> DataType {
    use DataType::*;

    // Every list produced here has a nullable child field named "item".
    let list_of = |inner: DataType| List(Box::new(Field::new("item", inner, true)));

    match dt.len() {
        1 => return dt[0].clone(),
        n if n > 2 => return list_of(Utf8),
        _ => {}
    }

    // Only lengths 0 and 2 reach this point; an empty slice panics on the index below.
    let (lhs, rhs) = (dt[0], dt[1]);
    if lhs == rhs {
        return lhs.clone();
    }

    match (lhs, rhs) {
        (List(left), List(right)) => {
            list_of(coerce_data_type(&[left.data_type(), right.data_type()]))
        }
        // A scalar next to a list is folded into the list's item type.
        (scalar, List(list)) | (List(list), scalar) => {
            list_of(coerce_data_type(&[scalar, list.data_type()]))
        }
        (Float64, Int64) | (Int64, Float64) => Float64,
        (Int64, Boolean) | (Boolean, Int64) => Int64,
        _ => Utf8,
    }
}
/// Builds a [`Schema`] from the per-field sets of observed types.
///
/// One nullable field is emitted per entry of `spec`, in the map's iteration order.
/// Each field's type is whatever `coerce_data_type` returns for that entry's set of
/// candidate types.
fn generate_schema(spec: HashMap<String, HashSet<DataType>>) -> Schema {
    let mut fields: Vec<Field> = Vec::with_capacity(spec.len());
    for (name, observed) in spec.iter() {
        let candidates: Vec<&DataType> = observed.iter().collect();
        fields.push(Field::new(name, coerce_data_type(&candidates), true));
    }
    Schema::new(fields)
}
/// Infers a [`Schema`] from the JSON records produced by reading `reader`.
///
/// The reader is wrapped in a `ValueIter` and the resulting records are handed to
/// [`infer_json_schema_from_iterator`]; any error from that function is returned as is.
///
/// `max_read_records` is forwarded to `ValueIter::new` unchanged; presumably it caps
/// how many records are sampled, with `None` meaning no cap (confirm against
/// `ValueIter`).
///
/// The reader is not rewound afterwards.
pub fn infer_json_schema<R: Read>(
    reader: &mut BufReader<R>,
    max_read_records: Option<usize>,
) -> Result<Schema> {
    let records = ValueIter::new(reader, max_read_records);
    infer_json_schema_from_iterator(records)
}
/// Infers a [`Schema`] from an iterator of parsed JSON records.
///
/// Every record must be a JSON object. For each key, the set of types observed across
/// all records is collected and later reduced to a single field type by
/// `generate_schema`:
///
/// * `null` contributes nothing;
/// * booleans contribute `Boolean`, strings contribute `Utf8`;
/// * numbers contribute `Float64` when `Number::is_f64` holds and `Int64` otherwise;
/// * arrays of scalars contribute a list whose nullable child field is named `"item"`;
///   arrays that are empty or contain only `null` contribute nothing.
///
/// Keys that never carry a non-null value do not appear in the schema.
///
/// # Errors
///
/// * any error yielded by `value_iter` is returned unchanged;
/// * `ArrowError::ExternalFormat` if a record is not a JSON object;
/// * `ArrowError::NotYetImplemented` if a field holds a JSON object, or an array that
///   itself contains arrays or objects.
pub fn infer_json_schema_from_iterator<I>(value_iter: I) -> Result<Schema>
where
    I: Iterator<Item = Result<Value>>,
{
    let mut values: HashMap<String, HashSet<DataType>> = HashMap::new();

    for record in value_iter {
        let map = match record? {
            Value::Object(map) => map,
            value => {
                return Err(ArrowError::ExternalFormat(format!(
                    "Expected JSON record to be an object, found {:?}",
                    value
                )));
            }
        };

        for (k, v) in map.iter() {
            let observed = match v {
                // Nulls carry no type information.
                Value::Null => None,
                Value::Bool(_) => Some(DataType::Boolean),
                // NOTE(review): scalars are classified with `is_f64` while array
                // elements are classified with `is_i64`; the two disagree for integers
                // that do not fit in an i64. Confirm which one is intended.
                Value::Number(n) => Some(if n.is_f64() {
                    DataType::Float64
                } else {
                    DataType::Int64
                }),
                Value::String(_) => Some(DataType::Utf8),
                Value::Array(items) => infer_array_item_type(items)?
                    .map(|item| DataType::List(Box::new(Field::new("item", item, true)))),
                Value::Object(_) => {
                    return Err(ArrowError::NotYetImplemented(
                        "Inferring schema from nested JSON structs currently not supported"
                            .to_string(),
                    ));
                }
            };
            if let Some(data_type) = observed {
                record_observed_type(&mut values, k, data_type);
            }
        }
    }

    Ok(generate_schema(values))
}

/// Adds `data_type` to the set of types seen for `key`, creating the set on first use.
fn record_observed_type(
    values: &mut HashMap<String, HashSet<DataType>>,
    key: &str,
    data_type: DataType,
) {
    if let Some(seen) = values.get_mut(key) {
        seen.insert(data_type);
    } else {
        // The key string is only allocated the first time the key is encountered.
        let mut seen = HashSet::new();
        seen.insert(data_type);
        values.insert(key.to_string(), seen);
    }
}

/// Determines the item type of a JSON array of scalars.
///
/// Returns `Ok(None)` when the array is empty or holds only `null`, and an
/// `ArrowError::NotYetImplemented` error when it contains a nested array or object.
fn infer_array_item_type(items: &[Value]) -> Result<Option<DataType>> {
    // An ordered set removes every duplicate, not only adjacent ones, so an array
    // such as `[1, 2.5, 3]` is seen as {Int64, Float64} rather than three entries.
    let mut distinct: HashSet<DataType> = HashSet::new();
    for item in items {
        let data_type = match item {
            Value::Null => continue,
            Value::Number(n) => {
                if n.is_i64() {
                    DataType::Int64
                } else {
                    DataType::Float64
                }
            }
            Value::Bool(_) => DataType::Boolean,
            Value::String(_) => DataType::Utf8,
            Value::Array(_) | Value::Object(_) => {
                return Err(ArrowError::NotYetImplemented(
                    "Nested lists and structs not supported".to_string(),
                ));
            }
        };
        distinct.insert(data_type);
    }

    Ok(match distinct.len() {
        0 => None,
        1 | 2 => {
            let candidates: Vec<&DataType> = distinct.iter().collect();
            Some(coerce_data_type(&candidates))
        }
        // With three or more distinct scalar types at least one pair has no common
        // numeric type, so the items are strings. `coerce_data_type` is not used here
        // because for more than two candidates it returns a list type, which would
        // turn a flat array into a list of lists.
        _ => Some(DataType::Utf8),
    })
}
pub fn infer_json_schema_from_seekable<R: Read + Seek>(
reader: &mut BufReader<R>,
max_read_records: Option<usize>,
) -> Result<Schema> {
let schema = infer_json_schema(reader, max_read_records);
reader.seek(SeekFrom::Start(0))?;
schema
}
#[cfg(test)]
mod test {
    use super::*;

    /// Builds a list type whose nullable child field is named "item".
    fn list_of(inner: DataType) -> DataType {
        DataType::List(Box::new(Field::new("item", inner, true)))
    }

    /// Coercing a scalar type with a list type must yield a list whose item type is
    /// the coercion of the scalar with the list's item type.
    #[test]
    fn test_coersion_scalar_and_list() {
        use crate::datatypes::DataType::*;

        // (scalar type, list item type, expected item type of the coerced list)
        let cases = [
            (Float64, Float64, Float64),
            (Float64, Int64, Float64),
            (Int64, Int64, Int64),
            (Boolean, Float64, Utf8),
        ];

        for (scalar, item, expected) in cases.iter() {
            assert_eq!(
                list_of(expected.clone()),
                coerce_data_type(&[scalar, &list_of(item.clone())])
            );
        }
    }
}