use hashlink::LinkedHashMap;
use std::{
any::type_name, collections::HashMap, convert::{TryFrom, TryInto}, fmt, hash::{BuildHasher, Hash}, marker::PhantomData, string::FromUtf8Error, sync::Arc
};
use sum::{Sum2, Sum3};
use amadeus_types::{
Bson, Data, Date, DateTime, DateTimeWithoutTimezone, DateWithoutTimezone, Decimal, Enum, Group, IpAddr, Json, List, Time, TimeWithoutTimezone, Timezone, Url, Value, Webpage
};
use crate::internal::{
basic::{LogicalType, Repetition, Type as PhysicalType}, column::reader::ColumnReader, data_type::{
BoolType, ByteArrayType, DoubleType, FixedLenByteArrayType, FloatType, Int32Type, Int64Type, Int96, Int96Type
}, errors::{ParquetError, Result}, record::{
display::{DisplayFmt, DisplaySchemaGroup}, predicates::{GroupPredicate, MapPredicate, ValuePredicate}, reader::{
BoolReader, BoxFixedLenByteArrayReader, BoxReader, ByteArrayReader, F32Reader, F64Reader, FixedLenByteArrayReader, GroupReader, I32Reader, I64Reader, I96Reader, KeyValueReader, MapReader, OptionReader, RepeatedReader, RootReader, TryIntoReader, TupleReader, ValueReader, VecU8Reader
}, schemas::{
BoolSchema, BoxSchema, BsonSchema, ByteArraySchema, DateSchema, DateTimeSchema, DecimalSchema, EnumSchema, F32Schema, F64Schema, FixedByteArraySchema, GroupSchema, I16Schema, I32Schema, I64Schema, I8Schema, JsonSchema, ListSchema, ListSchemaType, MapSchema, OptionSchema, RootSchema, StringSchema, TimeSchema, TupleSchema, U16Schema, U32Schema, U64Schema, U8Schema, ValueSchema, VecU8Schema
}, triplet::TypedTripletIter, types::{downcast, Downcast, Root}, ParquetData, Predicate, Reader, Schema
}, schema::types::{ColumnPath, Type}
};
/// Implements [`ParquetData`] for types that are stored as strings and constructed with
/// `str::parse`.
///
/// Each entry is a token tree (in practice a description string literal) followed by the
/// target type. The description is accepted by the matcher but is not expanded into the
/// generated impl.
macro_rules! via_string {
    ($($description:tt $target:ty)*) => ($(
        impl ParquetData for $target {
            type Schema = StringSchema;
            type Reader = impl Reader<Item = Self>;
            type Predicate = Predicate;

            fn parse(
                schema: &Type, _predicate: Option<&Self::Predicate>,
                repetition: Option<Repetition>,
            ) -> Result<(String, Self::Schema)> {
                // The predicate is ignored; parsing goes through the generic `Value` schema.
                let parsed = Value::parse(schema, None, repetition)?;
                downcast(parsed)
            }

            fn reader(
                schema: &Self::Schema, path: &mut Vec<String>, def_level: i16, rep_level: i16,
                paths: &mut HashMap<ColumnPath, ColumnReader>, batch_size: usize,
            ) -> Self::Reader {
                let strings = String::reader(schema, path, def_level, rep_level, paths, batch_size);
                // Parse failures are converted into the reader's error type via `Into`.
                MapReader(strings, |text: String| text.parse().map_err(Into::into))
            }
        }
    )*)
}
impl ParquetData for Bson {
type Schema = BsonSchema;
type Reader = impl Reader<Item = Self>;
type Predicate = Predicate;
fn parse(
schema: &Type, _predicate: Option<&Self::Predicate>, repetition: Option<Repetition>,
) -> Result<(String, Self::Schema)> {
Value::parse(schema, None, repetition).and_then(downcast)
}
fn reader(
schema: &Self::Schema, path: &mut Vec<String>, def_level: i16, rep_level: i16,
paths: &mut HashMap<ColumnPath, ColumnReader>, batch_size: usize,
) -> Self::Reader {
MapReader(
byte_array_reader(&schema.0, path, def_level, rep_level, paths, batch_size),
|x| Ok(Bson::from(Vec::from(x))),
)
}
}
impl ParquetData for String {
type Schema = StringSchema;
type Reader = impl Reader<Item = Self>;
type Predicate = Predicate;
fn parse(
schema: &Type, _predicate: Option<&Self::Predicate>, repetition: Option<Repetition>,
) -> Result<(String, Self::Schema)> {
Value::parse(schema, None, repetition).and_then(downcast)
}
fn reader(
schema: &Self::Schema, path: &mut Vec<String>, def_level: i16, rep_level: i16,
paths: &mut HashMap<ColumnPath, ColumnReader>, batch_size: usize,
) -> Self::Reader {
MapReader(
byte_array_reader(&schema.0, path, def_level, rep_level, paths, batch_size),
|x| {
String::from_utf8(Vec::from(x))
.map_err(|err: FromUtf8Error| ParquetError::General(err.to_string()))
},
)
}
}
impl ParquetData for Json {
type Schema = JsonSchema;
type Reader = impl Reader<Item = Self>;
type Predicate = Predicate;
fn parse(
schema: &Type, _predicate: Option<&Self::Predicate>, repetition: Option<Repetition>,
) -> Result<(String, Self::Schema)> {
Value::parse(schema, None, repetition).and_then(downcast)
}
fn reader(
schema: &Self::Schema, path: &mut Vec<String>, def_level: i16, rep_level: i16,
paths: &mut HashMap<ColumnPath, ColumnReader>, batch_size: usize,
) -> Self::Reader {
MapReader(
String::reader(&schema.0, path, def_level, rep_level, paths, batch_size),
|x| Ok(From::from(x)),
)
}
}
impl ParquetData for Enum {
type Schema = EnumSchema;
type Reader = impl Reader<Item = Self>;
type Predicate = Predicate;
fn parse(
schema: &Type, _predicate: Option<&Self::Predicate>, repetition: Option<Repetition>,
) -> Result<(String, Self::Schema)> {
Value::parse(schema, None, repetition).and_then(downcast)
}
fn reader(
schema: &Self::Schema, path: &mut Vec<String>, def_level: i16, rep_level: i16,
paths: &mut HashMap<ColumnPath, ColumnReader>, batch_size: usize,
) -> Self::Reader {
MapReader(
String::reader(&schema.0, path, def_level, rep_level, paths, batch_size),
|x| Ok(From::from(x)),
)
}
}
/// Implements [`ParquetData`] for `[u8; N]` (and, on nightly, `Box<[u8; N]>`) for every
/// length `N` passed to the macro.
///
/// The schema is accepted only when it is a required, primitive fixed-length byte array
/// with no logical type and a type length equal to `N`.
macro_rules! array {
    ($($len:tt)*) => {$(
        impl ParquetData for [u8; $len] {
            type Schema = FixedByteArraySchema<Self>;
            type Reader = FixedLenByteArrayReader<Self>;
            type Predicate = Predicate;

            fn parse(
                schema: &Type, _predicate: Option<&Self::Predicate>,
                repetition: Option<Repetition>,
            ) -> Result<(String, Self::Schema)> {
                let is_fixed_array = schema.is_primitive()
                    && repetition == Some(Repetition::Required)
                    && schema.get_physical_type() == PhysicalType::FixedLenByteArray
                    && schema.get_basic_info().logical_type() == LogicalType::None
                    && schema.get_type_length() == $len;
                if !is_fixed_array {
                    return Err(ParquetError::General(format!(
                        "Can't parse array {:?}",
                        schema
                    )));
                }
                Ok((schema.name().to_owned(), FixedByteArraySchema(PhantomData)))
            }

            fn reader(
                _schema: &Self::Schema, path: &mut Vec<String>, def_level: i16, rep_level: i16,
                paths: &mut HashMap<ColumnPath, ColumnReader>, batch_size: usize,
            ) -> Self::Reader {
                // Panics if no column reader is registered under the current path.
                let col_reader = paths.remove(&ColumnPath::new(path.to_vec())).unwrap();
                let column = TypedTripletIter::<FixedLenByteArrayType>::new(
                    def_level, rep_level, col_reader, batch_size,
                );
                FixedLenByteArrayReader::<[u8; $len]> {
                    column,
                    marker: PhantomData,
                }
            }
        }

        #[cfg(nightly)]
        impl ParquetData for Box<[u8; $len]> {
            type Schema = FixedByteArraySchema<[u8; $len]>;
            type Reader = BoxFixedLenByteArrayReader<[u8; $len]>;

            fn parse(
                schema: &Type, predicate: Option<&Self::Predicate>,
                repetition: Option<Repetition>,
            ) -> Result<(String, Self::Schema)> {
                // Same schema rules as the unboxed array.
                <[u8; $len]>::parse(schema, predicate, repetition)
            }

            fn reader(
                _schema: &Self::Schema, path: &mut Vec<String>, def_level: i16, rep_level: i16,
                paths: &mut HashMap<ColumnPath, ColumnReader>, batch_size: usize,
            ) -> Self::Reader {
                // Panics if no column reader is registered under the current path.
                let col_reader = paths.remove(&ColumnPath::new(path.to_vec())).unwrap();
                let column = TypedTripletIter::<FixedLenByteArrayType>::new(
                    def_level, rep_level, col_reader, batch_size,
                );
                BoxFixedLenByteArrayReader::<[u8; $len]> {
                    column,
                    marker: PhantomData,
                }
            }
        }
    )*};
}
// Instantiate the impls above for every array length supplied by `amadeus_types::array!`.
amadeus_types::array!(array);
/// Default [`ParquetData`] impl for `Box<T>`: parses and reads exactly like `T`, wrapping the
/// schema in [`BoxSchema`] and the reader in [`BoxReader`].
///
/// The associated types and methods are `default` so that more specific impls can override
/// them; `type_coerce` bridges between `Self::Schema`/`Self::Reader` and the concrete
/// wrapper types used here.
impl<T> ParquetData for Box<T>
where
    T: ParquetData,
{
    default type Schema = BoxSchema<T::Schema>;
    default type Reader = BoxReader<T::Reader>;
    type Predicate = T::Predicate;

    default fn parse(
        schema: &Type, predicate: Option<&Self::Predicate>, repetition: Option<Repetition>,
    ) -> Result<(String, Self::Schema)> {
        let (name, inner) = T::parse(schema, predicate, repetition)?;
        Ok((name, type_coerce(BoxSchema(inner))))
    }

    default fn reader(
        schema: &Self::Schema, path: &mut Vec<String>, def_level: i16, rep_level: i16,
        paths: &mut HashMap<ColumnPath, ColumnReader>, batch_size: usize,
    ) -> Self::Reader {
        let boxed_schema: &BoxSchema<T::Schema> = type_coerce(schema);
        let inner_reader = T::reader(
            &boxed_schema.0,
            path,
            def_level,
            rep_level,
            paths,
            batch_size,
        );
        type_coerce(BoxReader(inner_reader))
    }
}
impl ParquetData for Decimal {
type Schema = DecimalSchema;
type Reader = impl Reader<Item = Self>;
type Predicate = Predicate;
fn parse(
schema: &Type, _predicate: Option<&Self::Predicate>, repetition: Option<Repetition>,
) -> Result<(String, Self::Schema)> {
Value::parse(schema, None, repetition).and_then(downcast)
}
fn reader(
schema: &Self::Schema, path: &mut Vec<String>, def_level: i16, rep_level: i16,
paths: &mut HashMap<ColumnPath, ColumnReader>, batch_size: usize,
) -> Self::Reader {
match *schema {
DecimalSchema::Int32 { precision, scale } => DecimalReader::Int32 {
reader: i32::reader(&I32Schema, path, def_level, rep_level, paths, batch_size),
precision,
scale,
},
DecimalSchema::Int64 { precision, scale } => DecimalReader::Int64 {
reader: i64::reader(&I64Schema, path, def_level, rep_level, paths, batch_size),
precision,
scale,
},
DecimalSchema::Array {
ref byte_array_schema,
precision,
scale,
} => DecimalReader::Array {
reader: byte_array_reader(
byte_array_schema,
path,
def_level,
rep_level,
paths,
batch_size,
),
precision,
scale,
},
}
}
}
/// Reader for [`Decimal`] values, with one variant per underlying column reader type.
///
/// Every variant carries the `precision` and `scale` taken from the schema alongside the
/// reader that yields the raw values.
pub enum DecimalReader {
/// Decimal backed by the `i32` column reader.
Int32 {
reader: <i32 as ParquetData>::Reader,
precision: u8,
scale: u8,
},
/// Decimal backed by the `i64` column reader.
Int64 {
reader: <i64 as ParquetData>::Reader,
precision: u8,
scale: u8,
},
/// Decimal backed by a byte-array column reader.
Array {
reader: ByteArrayReader,
precision: u32,
scale: u32,
},
}
impl Reader for DecimalReader {
type Item = Decimal;
#[inline]
fn read(&mut self, def_level: i16, rep_level: i16) -> Result<Self::Item> {
match self {
DecimalReader::Int32 {
reader,
precision,
scale,
} => reader.read(def_level, rep_level).map(|_bytes| {
let _ = (precision, scale);
unimplemented!()
}),
DecimalReader::Int64 {
reader,
precision,
scale,
} => reader.read(def_level, rep_level).map(|_bytes| {
let _ = (precision, scale);
unimplemented!()
}),
DecimalReader::Array {
reader,
precision,
scale,
} => reader
.read(def_level, rep_level)
.map(|bytes| Decimal::from_bytes(bytes.into(), *precision as i32, *scale as i32)),
}
}
#[inline]
fn advance_columns(&mut self) -> Result<()> {
match self {
DecimalReader::Int32 { reader, .. } => reader.advance_columns(),
DecimalReader::Int64 { reader, .. } => reader.advance_columns(),
DecimalReader::Array { reader, .. } => reader.advance_columns(),
}
}
#[inline]
fn has_next(&self) -> bool {
match self {
DecimalReader::Int32 { reader, .. } => reader.has_next(),
DecimalReader::Int64 { reader, .. } => reader.has_next(),
DecimalReader::Array { reader, .. } => reader.has_next(),
}
}
#[inline]
fn current_def_level(&self) -> i16 {
match self {
DecimalReader::Int32 { reader, .. } => reader.current_def_level(),
DecimalReader::Int64 { reader, .. } => reader.current_def_level(),
DecimalReader::Array { reader, .. } => reader.current_def_level(),
}
}
#[inline]
fn current_rep_level(&self) -> i16 {
match self {
DecimalReader::Int32 { reader, .. } => reader.current_rep_level(),
DecimalReader::Int64 { reader, .. } => reader.current_rep_level(),
DecimalReader::Array { reader, .. } => reader.current_rep_level(),
}
}
}
/// [`ParquetData`] for [`Group`]: a required group whose fields are each parsed and read as
/// a generic [`Value`].
impl ParquetData for Group {
    type Schema = GroupSchema;
    type Reader = GroupReader;
    type Predicate = GroupPredicate;

    /// Parses a required group schema.
    ///
    /// With a predicate, only the fields named by the predicate are parsed, in predicate
    /// order, and a predicate field missing from the schema is an error. Without one, every
    /// field is parsed in schema order. The resulting schema pairs the field schemas with a
    /// name-to-index map.
    ///
    /// # Panics
    ///
    /// Panics if two parsed fields produce the same name.
    fn parse(
        schema: &Type, predicate: Option<&Self::Predicate>, repetition: Option<Repetition>,
    ) -> Result<(String, Self::Schema)> {
        if !(schema.is_group() && repetition == Some(Repetition::Required)) {
            return Err(ParquetError::General(format!(
                "Can't parse Group {:?}",
                schema
            )));
        }

        let mut name_to_index = LinkedHashMap::with_capacity_and_hasher(
            schema.get_fields().len(),
            Default::default(),
        );
        let all_fields = schema.get_fields().iter();
        let field_schemas = match predicate {
            Some(group_predicate) => {
                // Index the schema's fields by name so predicate entries can be looked up.
                let by_name = all_fields
                    .map(|field| (field.name(), field))
                    .collect::<HashMap<_, _>>();
                group_predicate
                    .0
                    .iter()
                    .enumerate()
                    .map(|(index, (field_name, field_predicate))| {
                        let field = by_name.get(&**field_name).ok_or_else(|| {
                            ParquetError::General(format!(
                                "Group has predicate field \"{}\" not in the schema",
                                field_name
                            ))
                        })?;
                        let (name, field_schema) = <Value as ParquetData>::parse(
                            &**field,
                            field_predicate.as_ref(),
                            Some(field.get_basic_info().repetition()),
                        )?;
                        let previous = name_to_index.insert(name, index);
                        assert!(previous.is_none());
                        Ok(field_schema)
                    })
                    .collect::<Result<Vec<ValueSchema>>>()?
            }
            None => all_fields
                .enumerate()
                .map(|(index, field)| {
                    let (name, field_schema) = <Value as ParquetData>::parse(
                        &**field,
                        None,
                        Some(field.get_basic_info().repetition()),
                    )?;
                    let previous = name_to_index.insert(name, index);
                    assert!(previous.is_none());
                    Ok(field_schema)
                })
                .collect::<Result<Vec<ValueSchema>>>()?,
        };

        Ok((
            schema.name().to_owned(),
            GroupSchema(field_schemas, name_to_index),
        ))
    }

    /// Builds one [`Value`] reader per field, pushing the field name onto `path` for the
    /// duration of each child's construction.
    fn reader(
        schema: &Self::Schema, path: &mut Vec<String>, def_level: i16, rep_level: i16,
        paths: &mut HashMap<ColumnPath, ColumnReader>, batch_size: usize,
    ) -> Self::Reader {
        // Names (map iteration order) are paired positionally with the field schemas.
        let readers = schema
            .1
            .iter()
            .zip(schema.0.iter())
            .map(|((name, _index), field_schema)| {
                path.push(name.clone());
                let child =
                    Value::reader(field_schema, path, def_level, rep_level, paths, batch_size);
                let _ = path.pop().unwrap();
                child
            })
            .collect();
        let fields = Arc::new(schema.1.clone());
        GroupReader { readers, fields }
    }
}
/// Parses a group annotated with the `List` logical type into a [`ListSchema`].
///
/// The group must have exactly one repeated child. If that child is itself a group with
/// exactly one field and is not named `array` or `<list name>_tuple`, its single field is
/// the element (`ListSchemaType::List`, recording any non-default `list`/`element` names).
/// Otherwise the repeated child itself is the element (`ListSchemaType::ListCompat`).
///
/// # Errors
///
/// Returns `ParquetError::General` when the shape does not match, and propagates errors
/// from parsing the element as `T`.
fn parse_list<T: ParquetData>(
    schema: &Type, predicate: Option<&T::Predicate>,
) -> Result<ListSchema<T::Schema>> {
    let unparsable = || ParquetError::General(String::from("Couldn't parse Vec<T>"));

    let is_list_group = schema.is_group()
        && schema.get_basic_info().logical_type() == LogicalType::List
        && schema.get_fields().len() == 1;
    if !is_list_group {
        return Err(unparsable());
    }

    let sub_schema = schema.get_fields().iter().next().unwrap();
    if sub_schema.get_basic_info().repetition() != Repetition::Repeated {
        return Err(unparsable());
    }

    let wraps_single_element = sub_schema.is_group()
        && sub_schema.get_fields().len() == 1
        && sub_schema.name() != "array"
        && sub_schema.name() != format!("{}_tuple", schema.name());

    if !wraps_single_element {
        // Compatibility layout: the repeated child is the element itself.
        let element_name = sub_schema.name().to_owned();
        let element_schema = T::parse(&*sub_schema, predicate, Some(Repetition::Repeated))?.1;
        return Ok(ListSchema(
            element_schema,
            ListSchemaType::ListCompat(element_name),
        ));
    }

    let element = sub_schema.get_fields().iter().next().unwrap();
    // Only names that differ from the defaults are recorded.
    let list_name = match sub_schema.name() {
        "list" => None,
        other => Some(other.to_owned()),
    };
    let element_name = match element.name() {
        "element" => None,
        other => Some(other.to_owned()),
    };
    let element_schema = T::parse(
        &*element,
        predicate,
        Some(element.get_basic_info().repetition()),
    )?
    .1;
    Ok(ListSchema(
        element_schema,
        ListSchemaType::List(list_name, element_name),
    ))
}
/// Default [`ParquetData`] impl for [`List<T>`].
///
/// A required schema is parsed as a list group via `parse_list`; a repeated schema is
/// treated as a bare repeated element (`ListSchemaType::Repeated`). Items are `default` so
/// more specific impls can override them, with `type_coerce` bridging to the concrete types.
impl<T: Data> ParquetData for List<T>
where
    T: ParquetData,
{
    default type Schema = ListSchema<T::Schema>;
    default type Reader = RepeatedReader<T::Reader>;
    type Predicate = T::Predicate;

    default fn parse(
        schema: &Type, predicate: Option<&Self::Predicate>, repetition: Option<Repetition>,
    ) -> Result<(String, Self::Schema)> {
        if repetition == Some(Repetition::Required) {
            let list_schema = parse_list::<T>(schema, predicate)?;
            Ok((schema.name().to_owned(), type_coerce(list_schema)))
        } else if repetition == Some(Repetition::Repeated) {
            // The element is parsed as if it were required; repetition is handled by the list.
            let element_schema = T::parse(schema, predicate, Some(Repetition::Required))?.1;
            let list_schema = ListSchema(element_schema, ListSchemaType::Repeated);
            Ok((schema.name().to_owned(), type_coerce(list_schema)))
        } else {
            Err(ParquetError::General(String::from("Couldn't parse Vec<T>")))
        }
    }

    default fn reader(
        schema: &Self::Schema, path: &mut Vec<String>, def_level: i16, rep_level: i16,
        paths: &mut HashMap<ColumnPath, ColumnReader>, batch_size: usize,
    ) -> Self::Reader {
        let list_schema: &ListSchema<T::Schema> = type_coerce(schema);
        let repeated = list_reader::<T>(
            list_schema,
            path,
            def_level,
            rep_level,
            paths,
            batch_size,
        );
        type_coerce(repeated)
    }
}
/// Builds a [`RepeatedReader`] for the elements of a list schema.
///
/// Depending on the list layout, zero, one or two path segments are pushed onto `path`
/// while the element reader is constructed and popped again afterwards. The element reader
/// is always created with both `def_level` and `rep_level` incremented by one.
fn list_reader<T>(
    schema: &ListSchema<T::Schema>, path: &mut Vec<String>, def_level: i16, rep_level: i16,
    paths: &mut HashMap<ColumnPath, ColumnReader>, batch_size: usize,
) -> RepeatedReader<T::Reader>
where
    T: ParquetData,
{
    // Push the layout-specific path segments and remember how many to pop afterwards.
    let pushed_segments: usize = match &schema.1 {
        ListSchemaType::List(list_name, element_name) => {
            path.push(list_name.as_deref().unwrap_or("list").to_owned());
            path.push(element_name.as_deref().unwrap_or("element").to_owned());
            2
        }
        ListSchemaType::ListCompat(element_name) => {
            path.push(element_name.to_owned());
            1
        }
        ListSchemaType::Repeated => 0,
    };

    let reader = T::reader(
        &schema.0,
        path,
        def_level + 1,
        rep_level + 1,
        paths,
        batch_size,
    );

    for _ in 0..pushed_segments {
        let _ = path.pop().unwrap();
    }
    RepeatedReader { reader }
}
/// Builds a [`ByteArrayReader`] over the column registered under the current `path`.
///
/// The column reader is removed from `paths`, so each column can be claimed only once.
///
/// # Panics
///
/// Panics if `paths` has no entry for the current `path`.
fn byte_array_reader(
    _schema: &ByteArraySchema, path: &mut Vec<String>, def_level: i16, rep_level: i16,
    paths: &mut HashMap<ColumnPath, ColumnReader>, batch_size: usize,
) -> ByteArrayReader {
    let col_reader = paths.remove(&ColumnPath::new(path.to_vec())).unwrap();
    let column =
        TypedTripletIter::<ByteArrayType>::new(def_level, rep_level, col_reader, batch_size);
    ByteArrayReader { column }
}
impl ParquetData for List<u8> {
type Schema = VecU8Schema;
type Reader = VecU8Reader;
fn parse(
schema: &Type, _predicate: Option<&Self::Predicate>, repetition: Option<Repetition>,
) -> Result<(String, Self::Schema)> {
Value::parse(schema, None, repetition).and_then(downcast)
}
fn reader(
schema: &Self::Schema, path: &mut Vec<String>, def_level: i16, rep_level: i16,
paths: &mut HashMap<ColumnPath, ColumnReader>, batch_size: usize,
) -> Self::Reader {
match schema {
VecU8Schema::ByteArray(schema) => VecU8Reader::ByteArray(byte_array_reader(
schema, path, def_level, rep_level, paths, batch_size,
)),
VecU8Schema::List(schema) => VecU8Reader::List(list_reader::<u8>(
schema, path, def_level, rep_level, paths, batch_size,
)),
}
}
}
/// [`ParquetData`] for `HashMap<K, V, S>`, read from a Parquet map group.
///
/// The schema must be a required group with the `Map` or `MapKeyValue` logical type whose
/// single child is a repeated group with exactly two fields (key, then value). Names that
/// differ from the defaults `key_value`, `key` and `value` are recorded in the schema.
impl<K, V, S> ParquetData for HashMap<K, V, S>
where
    K: ParquetData + Hash + Eq,
    V: ParquetData,
    S: BuildHasher + Default + Clone + Send + 'static,
{
    type Schema = MapSchema<K::Schema, V::Schema>;
    type Reader = impl Reader<Item = Self>;
    type Predicate = MapPredicate<K::Predicate, V::Predicate>;

    fn parse(
        schema: &Type, predicate: Option<&Self::Predicate>, repetition: Option<Repetition>,
    ) -> Result<(String, Self::Schema)> {
        let unparsable = || ParquetError::General(String::from("Couldn't parse HashMap<K,V>"));
        // Records a name only when it differs from the conventional default.
        let custom_name = |actual: &str, default: &str| -> Option<String> {
            if actual == default {
                None
            } else {
                Some(actual.to_owned())
            }
        };

        let is_map_group = schema.is_group()
            && repetition == Some(Repetition::Required)
            && (schema.get_basic_info().logical_type() == LogicalType::Map
                || schema.get_basic_info().logical_type() == LogicalType::MapKeyValue)
            && schema.get_fields().len() == 1;
        if !is_map_group {
            return Err(unparsable());
        }

        let sub_schema = schema.get_fields().iter().next().unwrap();
        let is_key_value_group = sub_schema.is_group()
            && sub_schema.get_basic_info().repetition() == Repetition::Repeated
            && sub_schema.get_fields().len() == 2;
        if !is_key_value_group {
            return Err(unparsable());
        }

        let entries = sub_schema.get_fields();
        let (key, value) = (&entries[0], &entries[1]);
        let key_value_name = custom_name(sub_schema.name(), "key_value");
        let key_name = custom_name(key.name(), "key");
        let value_name = custom_name(value.name(), "value");

        let key_schema = K::parse(
            &*key,
            predicate.and_then(|p| p.key.as_ref()),
            Some(key.get_basic_info().repetition()),
        )?
        .1;
        let value_schema = V::parse(
            &*value,
            predicate.and_then(|p| p.value.as_ref()),
            Some(value.get_basic_info().repetition()),
        )?
        .1;

        Ok((
            schema.name().to_owned(),
            MapSchema(
                key_schema,
                value_schema,
                key_value_name,
                key_name,
                value_name,
            ),
        ))
    }

    /// Builds key and value readers under `<key_value>/<key>` and `<key_value>/<value>`
    /// (both one definition and one repetition level deeper) and collects each entry list
    /// into a `HashMap`.
    fn reader(
        schema: &Self::Schema, path: &mut Vec<String>, def_level: i16, rep_level: i16,
        paths: &mut HashMap<ColumnPath, ColumnReader>, batch_size: usize,
    ) -> Self::Reader {
        let key_value_name = schema.2.as_deref().unwrap_or("key_value");
        let key_name = schema.3.as_deref().unwrap_or("key");
        let value_name = schema.4.as_deref().unwrap_or("value");

        path.push(key_value_name.to_owned());

        path.push(key_name.to_owned());
        let keys_reader = K::reader(
            &schema.0,
            path,
            def_level + 1,
            rep_level + 1,
            paths,
            batch_size,
        );
        let _ = path.pop().unwrap();

        path.push(value_name.to_owned());
        let values_reader = V::reader(
            &schema.1,
            path,
            def_level + 1,
            rep_level + 1,
            paths,
            batch_size,
        );
        let _ = path.pop().unwrap();

        let _ = path.pop().unwrap();

        let entries_reader = KeyValueReader {
            keys_reader,
            values_reader,
        };
        MapReader(entries_reader, |x: List<_>| {
            let collected = Vec::from(x).into_iter().collect::<HashMap<_, _, S>>();
            Ok(From::from(collected))
        })
    }
}
impl ParquetData for bool {
type Schema = BoolSchema;
type Reader = BoolReader;
type Predicate = Predicate;
fn parse(
schema: &Type, _predicate: Option<&Self::Predicate>, repetition: Option<Repetition>,
) -> Result<(String, Self::Schema)> {
Value::parse(schema, None, repetition).and_then(downcast)
}
fn reader(
_schema: &Self::Schema, path: &mut Vec<String>, def_level: i16, rep_level: i16,
paths: &mut HashMap<ColumnPath, ColumnReader>, batch_size: usize,
) -> Self::Reader {
let col_path = ColumnPath::new(path.to_vec());
let col_reader = paths.remove(&col_path).unwrap();
BoolReader {
column: TypedTripletIter::<BoolType>::new(def_level, rep_level, col_reader, batch_size),
}
}
}
impl ParquetData for i8 {
type Schema = I8Schema;
type Reader = TryIntoReader<I32Reader, i8>;
type Predicate = Predicate;
fn parse(
schema: &Type, _predicate: Option<&Self::Predicate>, repetition: Option<Repetition>,
) -> Result<(String, Self::Schema)> {
Value::parse(schema, None, repetition).and_then(downcast)
}
fn reader(
_schema: &Self::Schema, path: &mut Vec<String>, def_level: i16, rep_level: i16,
paths: &mut HashMap<ColumnPath, ColumnReader>, batch_size: usize,
) -> Self::Reader {
TryIntoReader(
i32::reader(&I32Schema, path, def_level, rep_level, paths, batch_size),
PhantomData,
)
}
}
impl ParquetData for u8 {
type Schema = U8Schema;
type Reader = TryIntoReader<I32Reader, u8>;
type Predicate = Predicate;
fn parse(
schema: &Type, _predicate: Option<&Self::Predicate>, repetition: Option<Repetition>,
) -> Result<(String, Self::Schema)> {
Value::parse(schema, None, repetition).and_then(downcast)
}
fn reader(
_schema: &Self::Schema, path: &mut Vec<String>, def_level: i16, rep_level: i16,
paths: &mut HashMap<ColumnPath, ColumnReader>, batch_size: usize,
) -> Self::Reader {
TryIntoReader(
i32::reader(&I32Schema, path, def_level, rep_level, paths, batch_size),
PhantomData,
)
}
}
impl ParquetData for i16 {
type Schema = I16Schema;
type Reader = TryIntoReader<I32Reader, i16>;
type Predicate = Predicate;
fn parse(
schema: &Type, _predicate: Option<&Self::Predicate>, repetition: Option<Repetition>,
) -> Result<(String, Self::Schema)> {
Value::parse(schema, None, repetition).and_then(downcast)
}
fn reader(
_schema: &Self::Schema, path: &mut Vec<String>, def_level: i16, rep_level: i16,
paths: &mut HashMap<ColumnPath, ColumnReader>, batch_size: usize,
) -> Self::Reader {
TryIntoReader(
i32::reader(&I32Schema, path, def_level, rep_level, paths, batch_size),
PhantomData,
)
}
}
impl ParquetData for u16 {
type Schema = U16Schema;
type Reader = TryIntoReader<I32Reader, u16>;
type Predicate = Predicate;
fn parse(
schema: &Type, _predicate: Option<&Self::Predicate>, repetition: Option<Repetition>,
) -> Result<(String, Self::Schema)> {
Value::parse(schema, None, repetition).and_then(downcast)
}
fn reader(
_schema: &Self::Schema, path: &mut Vec<String>, def_level: i16, rep_level: i16,
paths: &mut HashMap<ColumnPath, ColumnReader>, batch_size: usize,
) -> Self::Reader {
TryIntoReader(
i32::reader(&I32Schema, path, def_level, rep_level, paths, batch_size),
PhantomData,
)
}
}
impl ParquetData for i32 {
type Schema = I32Schema;
type Reader = I32Reader;
type Predicate = Predicate;
fn parse(
schema: &Type, _predicate: Option<&Self::Predicate>, repetition: Option<Repetition>,
) -> Result<(String, Self::Schema)> {
Value::parse(schema, None, repetition).and_then(downcast)
}
fn reader(
_schema: &Self::Schema, path: &mut Vec<String>, def_level: i16, rep_level: i16,
paths: &mut HashMap<ColumnPath, ColumnReader>, batch_size: usize,
) -> Self::Reader {
let col_path = ColumnPath::new(path.to_vec());
let col_reader = paths.remove(&col_path).unwrap();
I32Reader {
column: TypedTripletIter::<Int32Type>::new(
def_level, rep_level, col_reader, batch_size,
),
}
}
}
impl ParquetData for u32 {
type Schema = U32Schema;
type Reader = impl Reader<Item = Self>;
type Predicate = Predicate;
fn parse(
schema: &Type, _predicate: Option<&Self::Predicate>, repetition: Option<Repetition>,
) -> Result<(String, Self::Schema)> {
Value::parse(schema, None, repetition).and_then(downcast)
}
fn reader(
_schema: &Self::Schema, path: &mut Vec<String>, def_level: i16, rep_level: i16,
paths: &mut HashMap<ColumnPath, ColumnReader>, batch_size: usize,
) -> Self::Reader {
MapReader(
i32::reader(&I32Schema, path, def_level, rep_level, paths, batch_size),
|x| Ok(x as u32),
)
}
}
impl ParquetData for i64 {
type Schema = I64Schema;
type Reader = I64Reader;
type Predicate = Predicate;
fn parse(
schema: &Type, _predicate: Option<&Self::Predicate>, repetition: Option<Repetition>,
) -> Result<(String, Self::Schema)> {
Value::parse(schema, None, repetition).and_then(downcast)
}
fn reader(
_schema: &Self::Schema, path: &mut Vec<String>, def_level: i16, rep_level: i16,
paths: &mut HashMap<ColumnPath, ColumnReader>, batch_size: usize,
) -> Self::Reader {
let col_path = ColumnPath::new(path.to_vec());
let col_reader = paths.remove(&col_path).unwrap();
I64Reader {
column: TypedTripletIter::<Int64Type>::new(
def_level, rep_level, col_reader, batch_size,
),
}
}
}
impl ParquetData for u64 {
type Schema = U64Schema;
type Reader = impl Reader<Item = Self>;
type Predicate = Predicate;
fn parse(
schema: &Type, _predicate: Option<&Self::Predicate>, repetition: Option<Repetition>,
) -> Result<(String, Self::Schema)> {
Value::parse(schema, None, repetition).and_then(downcast)
}
fn reader(
_schema: &Self::Schema, path: &mut Vec<String>, def_level: i16, rep_level: i16,
paths: &mut HashMap<ColumnPath, ColumnReader>, batch_size: usize,
) -> Self::Reader {
MapReader(
i64::reader(&I64Schema, path, def_level, rep_level, paths, batch_size),
|x| Ok(x as u64),
)
}
}
impl ParquetData for f32 {
type Schema = F32Schema;
type Reader = F32Reader;
type Predicate = Predicate;
fn parse(
schema: &Type, _predicate: Option<&Self::Predicate>, repetition: Option<Repetition>,
) -> Result<(String, Self::Schema)> {
Value::parse(schema, None, repetition).and_then(downcast)
}
fn reader(
_schema: &Self::Schema, path: &mut Vec<String>, def_level: i16, rep_level: i16,
paths: &mut HashMap<ColumnPath, ColumnReader>, batch_size: usize,
) -> Self::Reader {
let col_path = ColumnPath::new(path.to_vec());
let col_reader = paths.remove(&col_path).unwrap();
F32Reader {
column: TypedTripletIter::<FloatType>::new(
def_level, rep_level, col_reader, batch_size,
),
}
}
}
impl ParquetData for f64 {
type Schema = F64Schema;
type Reader = F64Reader;
type Predicate = Predicate;
fn parse(
schema: &Type, _predicate: Option<&Self::Predicate>, repetition: Option<Repetition>,
) -> Result<(String, Self::Schema)> {
Value::parse(schema, None, repetition).and_then(downcast)
}
fn reader(
_schema: &Self::Schema, path: &mut Vec<String>, def_level: i16, rep_level: i16,
paths: &mut HashMap<ColumnPath, ColumnReader>, batch_size: usize,
) -> Self::Reader {
let col_path = ColumnPath::new(path.to_vec());
let col_reader = paths.remove(&col_path).unwrap();
F64Reader {
column: TypedTripletIter::<DoubleType>::new(
def_level, rep_level, col_reader, batch_size,
),
}
}
}
/// [`ParquetData`] for `Option<T>`: an optional field whose inner value is parsed and read
/// as `T`.
impl<T> ParquetData for Option<T>
where
    T: ParquetData,
{
    type Schema = OptionSchema<T::Schema>;
    type Reader = OptionReader<T::Reader>;
    type Predicate = T::Predicate;

    /// Accepts only `Repetition::Optional`; the inner type is then parsed as if the field
    /// were required.
    fn parse(
        schema: &Type, predicate: Option<&Self::Predicate>, repetition: Option<Repetition>,
    ) -> Result<(String, Self::Schema)> {
        if repetition != Some(Repetition::Optional) {
            return Err(ParquetError::General(String::from(
                "Couldn't parse Option<T>",
            )));
        }
        let (name, inner) = T::parse(schema, predicate, Some(Repetition::Required))?;
        Ok((name, OptionSchema(inner)))
    }

    /// Builds the inner reader one definition level deeper; the repetition level is unchanged.
    fn reader(
        schema: &Self::Schema, path: &mut Vec<String>, def_level: i16, rep_level: i16,
        paths: &mut HashMap<ColumnPath, ColumnReader>, batch_size: usize,
    ) -> Self::Reader {
        let reader = <T as ParquetData>::reader(
            &schema.0,
            path,
            def_level + 1,
            rep_level,
            paths,
            batch_size,
        );
        OptionReader { reader }
    }
}
/// [`ParquetData`] for [`Root<T>`], the top-level (message) schema wrapping a record type `T`.
impl<T> ParquetData for Root<T>
where
T: ParquetData,
{
type Schema = RootSchema<T>;
type Reader = RootReader<T::Reader>;
type Predicate = T::Predicate;
/// Parses the root schema as `T`.
///
/// On success the returned name is the empty string and the name produced by `T::parse` is
/// stored inside the `RootSchema`. On failure the error is replaced by a message showing
/// the file's actual schema next to the schema `T` requires.
///
/// # Panics
///
/// Panics if `repetition` is not `None`. In builds with debug assertions it also panics if
/// the printed schema does not survive a print/parse round trip.
fn parse(
schema: &Type, predicate: Option<&Self::Predicate>, repetition: Option<Repetition>,
) -> Result<(String, Self::Schema)> {
// The root has no repetition of its own.
assert!(repetition.is_none());
if schema.is_schema() {
T::parse(schema, predicate, Some(Repetition::Required))
.map(|(name, schema_)| (String::from(""), RootSchema(name, schema_, PhantomData)))
.map_err(|err| {
// Re-parse as a generic `Value` so the actual schema can be printed in the error.
let actual_schema = Value::parse(schema, None, Some(Repetition::Required))
.map(|(name, schema_)| RootSchema(name, schema_, PhantomData));
let actual_schema = match actual_schema {
Ok(actual_schema) => actual_schema,
// If even the generic parse fails, report that error instead.
Err(err) => return err,
};
let actual_schema = DisplayFmt::new(|fmt| {
<<Root<Value> as ParquetData>::Schema>::fmt(
Some(&actual_schema),
None,
None,
fmt,
)
});
// Formatted without a schema instance: this renders what the type `T` requires.
let schema_ = DisplayFmt::new(|fmt| {
<<Root<T> as ParquetData>::Schema>::fmt(None, None, None, fmt)
});
ParquetError::General(format!(
"Types don't match schema.\nSchema is:\n{}\nBut types require:\n{}\nError: {}",
actual_schema,
schema_,
err
))
})
.map(|(name, schema_)| {
// Debug-only sanity check: print the schema, parse the printed text back, and verify
// that parsing it as `T` and as `Value` both print identically.
#[cfg(debug_assertions)]
{
use crate::internal::schema::parser::parse_message_type;
let printed = format!("{}", schema_);
let schema_2 = parse_message_type(&printed).unwrap();
let schema2 = T::parse(&schema_2, None, Some(Repetition::Required))
.map(|(name, schema_)| RootSchema::<T>(name, schema_, PhantomData))
.unwrap();
let printed2 = format!("{}", schema2);
assert_eq!(printed, printed2, "{:#?}", schema);
let schema3 = Value::parse(&schema_2, None, Some(Repetition::Required))
.map(|(name, schema_)| RootSchema::<Value>(name, schema_, PhantomData))
.unwrap();
let printed3 = format!("{}", schema3);
assert_eq!(printed, printed3, "{:#?}", schema);
}
(name, schema_)
})
} else {
Err(ParquetError::General(format!(
"Not a valid root schema {:?}",
schema
)))
}
}
/// Wraps the reader built by `T` for the inner schema in a [`RootReader`]; levels and path
/// are passed through unchanged.
fn reader(
schema: &Self::Schema, path: &mut Vec<String>, def_level: i16, rep_level: i16,
paths: &mut HashMap<ColumnPath, ColumnReader>, batch_size: usize,
) -> Self::Reader {
RootReader(T::reader(
&schema.1, path, def_level, rep_level, paths, batch_size,
))
}
}
/// Julian day number subtracted from an Int96 timestamp's day component to obtain a day
/// count relative to the Unix epoch.
const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588;
/// Number of seconds in one day (24 h x 60 min x 60 s).
const SECONDS_PER_DAY: u64 = 24 * 60 * 60;
/// Number of milliseconds in one second.
const MILLIS_PER_SECOND: u64 = 1_000;
/// Number of microseconds in one millisecond.
const MICROS_PER_MILLI: u64 = 1_000;
/// Number of nanoseconds in one microsecond.
const NANOS_PER_MICRO: u64 = 1_000;
// Implement `ParquetData` for the string-encoded date/time types below. Each entry is a
// description string literal followed by the type; values are read as strings and converted
// with `str::parse`.
via_string!(
"Corresponds to string `%:z OR name`" Timezone
"Corresponds to string `%Y-%m-%d`" DateWithoutTimezone
"Corresponds to RFC 3339 and ISO 8601 string `%H:%M:%S%.9f%:z`" TimeWithoutTimezone
"Corresponds to RFC 3339 and ISO 8601 string `%Y-%m-%dT%H:%M:%S%.9f%:z`" DateTimeWithoutTimezone
);
fn date_from_parquet(days: i32) -> Result<Date> {
Ok(Date::from_days(i64::from(days), Timezone::UTC).unwrap())
}
impl ParquetData for Date {
type Schema = DateSchema;
type Reader = impl Reader<Item = Self>;
type Predicate = Predicate;
fn parse(
schema: &Type, _predicate: Option<&Self::Predicate>, repetition: Option<Repetition>,
) -> Result<(String, Self::Schema)> {
Value::parse(schema, None, repetition).and_then(downcast)
}
fn reader(
_schema: &Self::Schema, path: &mut Vec<String>, def_level: i16, rep_level: i16,
paths: &mut HashMap<ColumnPath, ColumnReader>, batch_size: usize,
) -> Self::Reader {
MapReader(
i32::reader(&I32Schema, path, def_level, rep_level, paths, batch_size),
|days| date_from_parquet(days),
)
}
}
/// Converts a Parquet time-of-day value into a UTC [`Time`].
///
/// `Sum2::A` carries microseconds as `i64`; `Sum2::B` carries milliseconds as `i32`. The
/// value is split into whole seconds and a nanosecond remainder.
///
/// # Errors
///
/// Returns `ParquetError::General` (quoting the raw input) when the value is negative, the
/// seconds do not fit the type expected by `Time::from_seconds`, or `Time::from_seconds`
/// rejects the result.
fn time_from_parquet(time: Sum2<i64, i32>) -> Result<Time> {
    match time {
        Sum2::A(raw_micros) => {
            let invalid =
                || ParquetError::General(format!("Invalid Time Micros {}", raw_micros));
            let micros_per_second = MICROS_PER_MILLI * MILLIS_PER_SECOND;
            let micros = u64::try_from(raw_micros).map_err(|_| invalid())?;
            let seconds = (micros / micros_per_second)
                .try_into()
                .map_err(|_| invalid())?;
            // The remainder is below one million, so it always fits in a `u32`.
            let subsec_micros = u32::try_from(micros % micros_per_second).unwrap();
            let nanos = subsec_micros * NANOS_PER_MICRO as u32;
            Time::from_seconds(seconds, nanos, Timezone::UTC).ok_or_else(invalid)
        }
        Sum2::B(raw_millis) => {
            let invalid =
                || ParquetError::General(format!("Invalid Time Millis {}", raw_millis));
            let millis_per_second = MILLIS_PER_SECOND as u32;
            let millis = u32::try_from(raw_millis).map_err(|_| invalid())?;
            let seconds = (millis / millis_per_second)
                .try_into()
                .map_err(|_| invalid())?;
            let subsec_millis = millis % millis_per_second;
            let nanos = subsec_millis * (MICROS_PER_MILLI * NANOS_PER_MICRO) as u32;
            Time::from_seconds(seconds, nanos, Timezone::UTC).ok_or_else(invalid)
        }
    }
}
impl ParquetData for Time {
type Schema = TimeSchema;
type Reader = impl Reader<Item = Self>;
type Predicate = Predicate;
fn parse(
schema: &Type, _predicate: Option<&Self::Predicate>, repetition: Option<Repetition>,
) -> Result<(String, Self::Schema)> {
Value::parse(schema, None, repetition).and_then(downcast)
}
fn reader(
schema: &Self::Schema, path: &mut Vec<String>, def_level: i16, rep_level: i16,
paths: &mut HashMap<ColumnPath, ColumnReader>, batch_size: usize,
) -> Self::Reader {
match schema {
TimeSchema::Micros => Sum2::A(MapReader(
i64::reader(&I64Schema, path, def_level, rep_level, paths, batch_size),
|micros: i64| time_from_parquet(Sum2::A(micros)),
)),
TimeSchema::Millis => Sum2::B(MapReader(
i32::reader(&I32Schema, path, def_level, rep_level, paths, batch_size),
|millis: i32| time_from_parquet(Sum2::B(millis)),
)),
}
}
}
fn date_time_from_parquet(date_time: Sum3<Int96, i64, i64>) -> Result<DateTime> {
match date_time {
Sum3::A(date_time) => {
let mut day = i64::from(date_time.data()[2]);
let nanoseconds =
(i64::from(date_time.data()[1]) << 32) + i64::from(date_time.data()[0]);
let nanos_per_second = NANOS_PER_MICRO * MICROS_PER_MILLI * MILLIS_PER_SECOND;
let nanos_per_day = (nanos_per_second * SECONDS_PER_DAY) as i64;
day += nanoseconds.div_euclid(nanos_per_day);
let nanoseconds: u64 = nanoseconds.rem_euclid(nanos_per_day).try_into().unwrap();
let date =
Date::from_days(day.checked_sub(JULIAN_DAY_OF_EPOCH).unwrap(), Timezone::UTC)
.unwrap();
let time = Time::from_seconds(
(nanoseconds / nanos_per_second).try_into().unwrap(),
(nanoseconds % nanos_per_second).try_into().unwrap(),
Timezone::UTC,
)
.unwrap();
Ok(DateTime::from_date_time(date, time).unwrap())
}
Sum3::B(millis) => {
let millis_per_day = (MILLIS_PER_SECOND * SECONDS_PER_DAY) as i64;
let days = millis.div_euclid(millis_per_day);
let millis: u32 = millis.rem_euclid(millis_per_day).try_into().unwrap();
let date = Date::from_days(days, Timezone::UTC).unwrap();
let time = Time::from_seconds(
(millis / MILLIS_PER_SECOND as u32).try_into().unwrap(),
(millis % MILLIS_PER_SECOND as u32).try_into().unwrap(),
Timezone::UTC,
)
.unwrap();
Ok(DateTime::from_date_time(date, time).unwrap())
}
Sum3::C(micros) => {
let micros_per_day = (MICROS_PER_MILLI * MILLIS_PER_SECOND * SECONDS_PER_DAY) as i64;
let micros_per_second = (MICROS_PER_MILLI * MILLIS_PER_SECOND) as u64;
let days = micros.div_euclid(micros_per_day);
let micros: u64 = micros.rem_euclid(micros_per_day).try_into().unwrap();
let date = Date::from_days(days, Timezone::UTC).unwrap();
let time = Time::from_seconds(
(micros / micros_per_second).try_into().unwrap(),
(micros % micros_per_second).try_into().unwrap(),
Timezone::UTC,
)
.unwrap();
Ok(DateTime::from_date_time(date, time).unwrap())
}
}
}
impl ParquetData for DateTime {
type Schema = DateTimeSchema;
type Reader = impl Reader<Item = Self>;
type Predicate = Predicate;
fn parse(
schema: &Type, _predicate: Option<&Self::Predicate>, repetition: Option<Repetition>,
) -> Result<(String, Self::Schema)> {
Value::parse(schema, None, repetition).and_then(downcast)
}
fn reader(
schema: &Self::Schema, path: &mut Vec<String>, def_level: i16, rep_level: i16,
paths: &mut HashMap<ColumnPath, ColumnReader>, batch_size: usize,
) -> Self::Reader {
match schema {
DateTimeSchema::Int96 => Sum3::A(MapReader(
{
let col_path = ColumnPath::new(path.to_vec());
let col_reader = paths.remove(&col_path).unwrap();
I96Reader {
column: TypedTripletIter::<Int96Type>::new(
def_level, rep_level, col_reader, batch_size,
),
}
},
|date_time: Int96| date_time_from_parquet(Sum3::A(date_time)),
)),
DateTimeSchema::Millis => Sum3::B(MapReader(
i64::reader(&I64Schema, path, def_level, rep_level, paths, batch_size),
|millis: i64| date_time_from_parquet(Sum3::B(millis)),
)),
DateTimeSchema::Micros => Sum3::C(MapReader(
i64::reader(&I64Schema, path, def_level, rep_level, paths, batch_size),
|micros: i64| date_time_from_parquet(Sum3::C(micros)),
)),
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use amadeus_types::Timezone;
    use chrono::{offset::TimeZone, NaiveDate, NaiveTime, Utc};

    /// Int96 timestamps convert to the expected Unix millisecond values.
    #[test]
    fn test_int96() {
        // (arguments to `Int96::new`, expected `timestamp_millis()`)
        let cases = [
            ((0, 0, 2454923), 1238544000000_i64),
            ((4165425152, 13, 2454923), 1238544060000),
            ((0, 0, 0), -210866803200000),
        ];
        for &((word0, word1, word2), expected_millis) in &cases {
            let value =
                date_time_from_parquet(Sum3::A(Int96::new(word0, word1, word2))).unwrap();
            assert_eq!(value.as_chrono().unwrap().timestamp_millis(), expected_millis);
        }
    }

    /// Day counts converted by `date_from_parquet` print like chrono's UTC date.
    #[test]
    fn test_convert_date_to_string() {
        fn check_date_conversion(y: i32, m: u32, d: u32) {
            let midnight = Utc.from_utc_datetime(&NaiveDate::from_ymd(y, m, d).and_hms(0, 0, 0));
            let days = (midnight.timestamp() / SECONDS_PER_DAY as i64) as i32;
            let date = date_from_parquet(days).unwrap();
            assert_eq!(date.to_string(), midnight.format("%Y-%m-%d %:z").to_string());
        }
        let cases = [
            (2010, 1, 2),
            (2014, 5, 1),
            (2016, 2, 29),
            (2017, 9, 12),
            (2018, 3, 31),
        ];
        for &(y, m, d) in &cases {
            check_date_conversion(y, m, d);
        }
    }

    /// A UTC `Time` prints as chrono's nanosecond-precision time plus `+00:00`.
    #[test]
    fn test_convert_time_to_string() {
        fn check_time_conversion(h: u32, mi: u32, s: u32) {
            let chrono_time = NaiveTime::from_hms(h, mi, s);
            let time = TimeWithoutTimezone::from_chrono(&chrono_time).with_timezone(Timezone::UTC);
            let expected = format!("{}+00:00", chrono_time.format("%H:%M:%S%.9f"));
            assert_eq!(time.to_string(), expected);
        }
        let cases = [(13, 12, 54), (8, 23, 1), (11, 6, 32), (16, 38, 0), (21, 15, 12)];
        for &(h, mi, s) in &cases {
            check_time_conversion(h, mi, s);
        }
    }

    /// A `DateTime` built from chrono prints like chrono's own formatting.
    #[test]
    fn test_convert_timestamp_to_string() {
        #[allow(clippy::many_single_char_names)]
        fn check_datetime_conversion(y: i32, m: u32, d: u32, h: u32, mi: u32, s: u32) {
            let dt = Utc.from_utc_datetime(&NaiveDate::from_ymd(y, m, d).and_hms(h, mi, s));
            let expected = dt.format("%Y-%m-%d %H:%M:%S%.9f %:z").to_string();
            let actual = DateTime::from_chrono(&dt).to_string();
            assert_eq!(actual, expected);
        }
        let cases = [
            (2010, 1, 2, 13, 12, 54),
            (2011, 1, 3, 8, 23, 1),
            (2012, 4, 5, 11, 6, 32),
            (2013, 5, 12, 16, 38, 0),
            (2014, 11, 28, 21, 15, 12),
        ];
        for &(y, m, d, h, mi, s) in &cases {
            check_datetime_conversion(y, m, d, h, mi, s);
        }
    }
}
// Generates the Parquet record plumbing for one tuple arity.
//
// Input: `$len` (captured but not used in the expansion) followed by pairs of
// `$t` (a type-parameter identifier) and `$i` (the tuple index of that field).
//
// For that tuple shape the macro emits:
//   * `Reader` for `TupleReader<(..)>`
//   * `Default`, `fmt::Debug` and `Schema` for `TupleSchema<((String, T),..)>`
//   * `ParquetData` for the tuple `(T,..)` itself
//   * `Downcast<TupleSchema<..>>` for `ValueSchema`
macro_rules! tuple {
($len:tt $($t:ident $i:tt)*) => (
impl<$($t,)*> Reader for TupleReader<($($t,)*)> where $($t: Reader,)* {
type Item = ($($t::Item,)*);
#[allow(unused_variables, non_snake_case)]
fn read(&mut self, def_level: i16, rep_level: i16) -> Result<Self::Item> {
// Every field reader is invoked before any error is inspected, so all of them
// run even when an earlier one failed.
$(
let $t = (self.0).$i.read(def_level, rep_level);
)*
// If any field failed, propagate the first error in field order; one of the
// `?` below is guaranteed to return, hence the `unreachable!()`.
if $($t.is_err() ||)* false {
$(let _ = $t?;)*
unreachable!()
}
Ok((
$($t.unwrap(),)*
))
}
fn advance_columns(&mut self) -> Result<()> {
// Advances every field; `Result::and` keeps the earliest error, if any.
#[allow(unused_mut)]
let mut res = Ok(());
$(
res = res.and((self.0).$i.advance_columns());
)*
res
}
#[inline]
fn has_next(&self) -> bool {
// Expands to `if true { field0 } else if true { field1 } .. else { fallback }`,
// so only the first field is consulted; the fallback is reached only for the
// empty tuple.
$(if true { (self.0).$i.has_next() } else)*
{
true
}
}
#[inline]
fn current_def_level(&self) -> i16 {
// Same `if true .. else` chain: delegates to the first field, panics for the
// empty tuple.
$(if true { (self.0).$i.current_def_level() } else)*
{
panic!("Current definition level: empty group reader")
}
}
#[inline]
fn current_rep_level(&self) -> i16 {
// Same `if true .. else` chain: delegates to the first field, panics for the
// empty tuple.
$(if true { (self.0).$i.current_rep_level() } else)*
{
panic!("Current repetition level: empty group reader")
}
}
}
// Default schema: each field is named `field_<index>` and uses its own default schema.
impl<$($t,)*> Default for TupleSchema<($((String,$t,),)*)> where $($t: Default,)* {
fn default() -> Self {
Self(($((format!("field_{}", $i), Default::default()),)*))
}
}
impl<$($t,)*> fmt::Debug for TupleSchema<($((String,$t,),)*)> where $($t: fmt::Debug,)* {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_tuple("TupleSchema")
$(.field(&(self.0).$i))*
.finish()
}
}
// Schema display: rendered as a group whose fields are the (name, schema) pairs.
impl<$($t,)*> Schema for TupleSchema<($((String,$t,),)*)> where $($t: Schema,)* {
#[allow(unused_variables)]
fn fmt(self_: Option<&Self>, r: Option<Repetition>, name: Option<&str>, f: &mut fmt::Formatter) -> fmt::Result {
DisplaySchemaGroup::new(r, name, None, f)
$(.field(self_.map(|self_|&*(self_.0).$i.0), self_.map(|self_|&(self_.0).$i.1)))*
.finish()
}
}
impl<$($t,)*> ParquetData for ($($t,)*) where $($t: ParquetData,)* {
type Schema = TupleSchema<($((String,$t::Schema,),)*)>;
type Reader = TupleReader<($($t::Reader,)*)>;
type Predicate = ($(Option<$t::Predicate>,)*);
#[allow(unused_variables)]
fn parse(schema: &Type, predicate: Option<&Self::Predicate>, repetition: Option<Repetition>) -> Result<(String, Self::Schema)> {
// Only a required group can be read as a tuple. Fields are matched purely by
// position: too few fields yields "Group missing field"; leftover fields (or a
// non-group / non-required schema) fall through to the generic error below.
if schema.is_group() && repetition == Some(Repetition::Required) {
let mut fields = schema.get_fields().iter();
let schema_ = TupleSchema(($(fields.next().ok_or_else(|| ParquetError::General(String::from("Group missing field"))).and_then(|x|$t::parse(&**x, predicate.and_then(|predicate| predicate.$i.as_ref()), Some(x.get_basic_info().repetition())))?,)*));
if fields.next().is_none() {
return Ok((schema.name().to_owned(), schema_))
}
}
Err(ParquetError::General(format!("Can't parse Tuple {:?}", schema)))
}
#[allow(unused_variables)]
fn reader(schema: &Self::Schema, path: &mut Vec<String>, def_level: i16, rep_level: i16, paths: &mut HashMap<ColumnPath, ColumnReader>, batch_size: usize) -> Self::Reader {
// For each field: push its name onto the column path, build the field reader,
// then pop the name again so `path` is unchanged on return.
$(
path.push((schema.0).$i.0.to_owned());
#[allow(non_snake_case)]
let $t = <$t as ParquetData>::reader(&(schema.0).$i.1, path, def_level, rep_level, paths, batch_size);
let _ = path.pop().unwrap();
)*
TupleReader(($($t,)*))
}
}
impl<$($t,)*> Downcast<TupleSchema<($((String,$t,),)*)>> for ValueSchema where ValueSchema: $(Downcast<$t> +)* {
fn downcast(self) -> Result<TupleSchema<($((String,$t,),)*)>> {
let group = self.into_group()?;
// `group.0` supplies the field schemas and `group.1` the (name, index) pairs;
// both are consumed in iteration order.
// NOTE(review): the `unwrap()`s below panic if the group has fewer fields than
// the tuple, and surplus group fields are silently ignored - confirm callers
// guarantee matching arity. Also presumably `group.1` iterates in the same
// order as `group.0`; verify against the group schema definition.
#[allow(unused_mut,unused_variables)]
let mut fields = group.0.into_iter();
#[allow(unused_mut,unused_variables)]
let mut names = group.1.into_iter().map(|(name,_index)|name);
Ok(TupleSchema(($({let _ = $i;(names.next().unwrap(),fields.next().unwrap().downcast()?)},)*)))
}
}
);
}
// Hands the `tuple` macro above to `amadeus_types::tuple!`, which presumably
// invokes it once per supported tuple arity - confirm in `amadeus_types`.
amadeus_types::tuple!(tuple);
impl ParquetData for Value {
type Schema = ValueSchema;
type Reader = ValueReader;
type Predicate = ValuePredicate;
fn parse(
schema: &Type, predicate: Option<&Self::Predicate>, repetition: Option<Repetition>,
) -> Result<(String, Self::Schema)> {
let mut value = None;
if repetition.is_some() && schema.is_primitive() {
value = Some(
match (
schema.get_physical_type(),
schema.get_basic_info().logical_type(),
) {
(PhysicalType::Boolean, LogicalType::None) => ValueSchema::Bool(BoolSchema),
(PhysicalType::Int32, LogicalType::Uint8) => ValueSchema::U8(U8Schema),
(PhysicalType::Int32, LogicalType::Int8) => ValueSchema::I8(I8Schema),
(PhysicalType::Int32, LogicalType::Uint16) => ValueSchema::U16(U16Schema),
(PhysicalType::Int32, LogicalType::Int16) => ValueSchema::I16(I16Schema),
(PhysicalType::Int32, LogicalType::Uint32) => ValueSchema::U32(U32Schema),
(PhysicalType::Int32, LogicalType::Int32)
| (PhysicalType::Int32, LogicalType::None) => ValueSchema::I32(I32Schema),
(PhysicalType::Int32, LogicalType::Date) => ValueSchema::Date(DateSchema),
(PhysicalType::Int32, LogicalType::TimeMillis) => {
ValueSchema::Time(TimeSchema::Millis)
}
(PhysicalType::Int32, LogicalType::Decimal) => {
let (precision, scale) = (schema.get_precision(), schema.get_scale());
let (precision, scale) =
(precision.try_into().unwrap(), scale.try_into().unwrap());
ValueSchema::Decimal(DecimalSchema::Int32 { precision, scale })
}
(PhysicalType::Int64, LogicalType::Uint64) => ValueSchema::U64(U64Schema),
(PhysicalType::Int64, LogicalType::Int64)
| (PhysicalType::Int64, LogicalType::None) => ValueSchema::I64(I64Schema),
(PhysicalType::Int64, LogicalType::TimeMicros) => {
ValueSchema::Time(TimeSchema::Micros)
}
(PhysicalType::Int64, LogicalType::TimestampMillis) => {
ValueSchema::DateTime(DateTimeSchema::Millis)
}
(PhysicalType::Int64, LogicalType::TimestampMicros) => {
ValueSchema::DateTime(DateTimeSchema::Micros)
}
(PhysicalType::Int64, LogicalType::Decimal) => {
let (precision, scale) = (schema.get_precision(), schema.get_scale());
let (precision, scale) =
(precision.try_into().unwrap(), scale.try_into().unwrap());
ValueSchema::Decimal(DecimalSchema::Int64 { precision, scale })
}
(PhysicalType::Int96, LogicalType::None) => {
ValueSchema::DateTime(DateTimeSchema::Int96)
}
(PhysicalType::Float, LogicalType::None) => ValueSchema::F32(F32Schema),
(PhysicalType::Double, LogicalType::None) => ValueSchema::F64(F64Schema),
(PhysicalType::ByteArray, LogicalType::Utf8)
| (PhysicalType::FixedLenByteArray, LogicalType::Utf8) => {
ValueSchema::String(StringSchema(ByteArraySchema(
if schema.get_physical_type() == PhysicalType::FixedLenByteArray {
Some(schema.get_type_length().try_into().unwrap())
} else {
None
},
)))
}
(PhysicalType::ByteArray, LogicalType::Json)
| (PhysicalType::FixedLenByteArray, LogicalType::Json) => {
ValueSchema::Json(JsonSchema(StringSchema(ByteArraySchema(
if schema.get_physical_type() == PhysicalType::FixedLenByteArray {
Some(schema.get_type_length().try_into().unwrap())
} else {
None
},
))))
}
(PhysicalType::ByteArray, LogicalType::Enum)
| (PhysicalType::FixedLenByteArray, LogicalType::Enum) => {
ValueSchema::Enum(EnumSchema(StringSchema(ByteArraySchema(
if schema.get_physical_type() == PhysicalType::FixedLenByteArray {
Some(schema.get_type_length().try_into().unwrap())
} else {
None
},
))))
}
(PhysicalType::ByteArray, LogicalType::None)
| (PhysicalType::FixedLenByteArray, LogicalType::None) => {
ValueSchema::ByteArray(ByteArraySchema(
if schema.get_physical_type() == PhysicalType::FixedLenByteArray {
Some(schema.get_type_length().try_into().unwrap())
} else {
None
},
))
}
(PhysicalType::ByteArray, LogicalType::Bson)
| (PhysicalType::FixedLenByteArray, LogicalType::Bson) => {
ValueSchema::Bson(BsonSchema(ByteArraySchema(
if schema.get_physical_type() == PhysicalType::FixedLenByteArray {
Some(schema.get_type_length().try_into().unwrap())
} else {
None
},
)))
}
(PhysicalType::ByteArray, LogicalType::Decimal)
| (PhysicalType::FixedLenByteArray, LogicalType::Decimal) => {
let byte_array_schema = ByteArraySchema(
if schema.get_physical_type() == PhysicalType::FixedLenByteArray {
Some(schema.get_type_length().try_into().unwrap())
} else {
None
},
);
let (precision, scale) = (schema.get_precision(), schema.get_scale());
let (precision, scale) =
(precision.try_into().unwrap(), scale.try_into().unwrap());
ValueSchema::Decimal(DecimalSchema::Array {
byte_array_schema,
precision,
scale,
})
}
(PhysicalType::ByteArray, LogicalType::Interval)
| (PhysicalType::FixedLenByteArray, LogicalType::Interval) => {
unimplemented!("Interval logical type not yet implemented")
}
(PhysicalType::Boolean, _) => ValueSchema::Bool(BoolSchema),
(PhysicalType::Int32, _) => ValueSchema::I32(I32Schema),
(PhysicalType::Int64, _) => ValueSchema::I64(I64Schema),
(PhysicalType::Int96, _) => ValueSchema::DateTime(DateTimeSchema::Int96),
(PhysicalType::Float, _) => ValueSchema::F32(F32Schema),
(PhysicalType::Double, _) => ValueSchema::F64(F64Schema),
(PhysicalType::ByteArray, _) | (PhysicalType::FixedLenByteArray, _) => {
ValueSchema::ByteArray(ByteArraySchema(
if schema.get_physical_type() == PhysicalType::FixedLenByteArray {
Some(schema.get_type_length().try_into().unwrap())
} else {
None
},
))
}
},
);
}
if repetition.is_some()
&& value.is_none()
&& predicate
.map(|predicate| predicate.is_list())
.unwrap_or(true)
{
value = parse_list::<Value>(
schema,
predicate.and_then(|predicate| predicate.as_list().unwrap().as_ref()),
)
.ok()
.map(|value| ValueSchema::List(Box::new(value)));
}
if repetition.is_some()
&& value.is_none()
&& predicate
.map(|predicate| predicate.is_map())
.unwrap_or(true)
{
let predicate = predicate.and_then(|predicate| predicate.as_map().unwrap().as_ref());
value = HashMap::<Value, Value>::parse(schema, predicate, Some(Repetition::Required))
.ok()
.map(|(_, value)| ValueSchema::Map(Box::new(value)));
}
if repetition.is_some()
&& value.is_none()
&& schema.is_group()
&& predicate
.map(|predicate| predicate.is_group())
.unwrap_or(true)
{
let predicate = predicate.and_then(|predicate| predicate.as_group().unwrap().as_ref());
value = Some(ValueSchema::Group(
Group::parse(schema, predicate, Some(Repetition::Required))?.1,
));
}
let mut value = value
.ok_or_else(|| ParquetError::General(format!("Can't parse value {:?}", schema)))?;
match repetition.unwrap() {
Repetition::Optional => {
value = ValueSchema::Option(Box::new(OptionSchema(value)));
}
Repetition::Repeated => {
value = ValueSchema::List(Box::new(ListSchema(value, ListSchemaType::Repeated)));
}
Repetition::Required => (),
}
Ok((schema.name().to_owned(), value))
}
fn reader(
schema: &Self::Schema, path: &mut Vec<String>, def_level: i16, rep_level: i16,
paths: &mut HashMap<ColumnPath, ColumnReader>, batch_size: usize,
) -> Self::Reader {
match *schema {
ValueSchema::Bool(ref schema) => ValueReader::Bool(<bool as ParquetData>::reader(
schema, path, def_level, rep_level, paths, batch_size,
)),
ValueSchema::U8(ref schema) => ValueReader::U8(<u8 as ParquetData>::reader(
schema, path, def_level, rep_level, paths, batch_size,
)),
ValueSchema::I8(ref schema) => ValueReader::I8(<i8 as ParquetData>::reader(
schema, path, def_level, rep_level, paths, batch_size,
)),
ValueSchema::U16(ref schema) => ValueReader::U16(<u16 as ParquetData>::reader(
schema, path, def_level, rep_level, paths, batch_size,
)),
ValueSchema::I16(ref schema) => ValueReader::I16(<i16 as ParquetData>::reader(
schema, path, def_level, rep_level, paths, batch_size,
)),
ValueSchema::U32(ref schema) => ValueReader::U32(<u32 as ParquetData>::reader(
schema, path, def_level, rep_level, paths, batch_size,
)),
ValueSchema::I32(ref schema) => ValueReader::I32(<i32 as ParquetData>::reader(
schema, path, def_level, rep_level, paths, batch_size,
)),
ValueSchema::U64(ref schema) => ValueReader::U64(<u64 as ParquetData>::reader(
schema, path, def_level, rep_level, paths, batch_size,
)),
ValueSchema::I64(ref schema) => ValueReader::I64(<i64 as ParquetData>::reader(
schema, path, def_level, rep_level, paths, batch_size,
)),
ValueSchema::F32(ref schema) => ValueReader::F32(<f32 as ParquetData>::reader(
schema, path, def_level, rep_level, paths, batch_size,
)),
ValueSchema::F64(ref schema) => ValueReader::F64(<f64 as ParquetData>::reader(
schema, path, def_level, rep_level, paths, batch_size,
)),
ValueSchema::Date(ref schema) => ValueReader::Date(<Date as ParquetData>::reader(
schema, path, def_level, rep_level, paths, batch_size,
)),
ValueSchema::Time(ref schema) => ValueReader::Time(<Time as ParquetData>::reader(
schema, path, def_level, rep_level, paths, batch_size,
)),
ValueSchema::DateTime(ref schema) => {
ValueReader::DateTime(<DateTime as ParquetData>::reader(
schema, path, def_level, rep_level, paths, batch_size,
))
}
ValueSchema::Decimal(ref schema) => {
ValueReader::Decimal(<Decimal as ParquetData>::reader(
schema, path, def_level, rep_level, paths, batch_size,
))
}
ValueSchema::ByteArray(ref schema) => ValueReader::ByteArray(byte_array_reader(
schema, path, def_level, rep_level, paths, batch_size,
)),
ValueSchema::Bson(ref schema) => ValueReader::Bson(<Bson as ParquetData>::reader(
schema, path, def_level, rep_level, paths, batch_size,
)),
ValueSchema::String(ref schema) => {
ValueReader::String(<String as ParquetData>::reader(
schema, path, def_level, rep_level, paths, batch_size,
))
}
ValueSchema::Json(ref schema) => ValueReader::Json(<Json as ParquetData>::reader(
schema, path, def_level, rep_level, paths, batch_size,
)),
ValueSchema::Enum(ref schema) => ValueReader::Enum(<Enum as ParquetData>::reader(
schema, path, def_level, rep_level, paths, batch_size,
)),
ValueSchema::List(ref schema) => {
ValueReader::List(Box::new(<List<Value> as ParquetData>::reader(
type_coerce(&**schema),
path,
def_level,
rep_level,
paths,
batch_size,
)))
}
ValueSchema::Map(ref schema) => {
ValueReader::Map(Box::new(<HashMap<Value, Value> as ParquetData>::reader(
schema, path, def_level, rep_level, paths, batch_size,
)))
}
ValueSchema::Group(ref schema) => ValueReader::Group(<Group as ParquetData>::reader(
schema, path, def_level, rep_level, paths, batch_size,
)),
ValueSchema::Option(ref schema) => {
ValueReader::Option(Box::new(<Option<Value> as ParquetData>::reader(
schema, path, def_level, rep_level, paths, batch_size,
)))
}
}
}
}
// Implements `ParquetData` for these types through the `via_string!` macro,
// which gives each of them a `StringSchema`. The leading string literal of
// every entry is the macro's `$doc` argument and is left empty here.
via_string!(
"" Webpage<'static>
"" Url
"" IpAddr
);
impl From<amadeus_types::ParseDateError> for ParquetError {
fn from(err: amadeus_types::ParseDateError) -> Self {
ParquetError::General(err.to_string())
}
}
impl From<amadeus_types::ParseAddrError> for ParquetError {
fn from(err: amadeus_types::ParseAddrError) -> Self {
ParquetError::General(err.to_string())
}
}
impl From<amadeus_types::ParseUrlError> for ParquetError {
fn from(err: amadeus_types::ParseUrlError) -> Self {
ParquetError::General(err.to_string())
}
}
impl From<amadeus_types::ParseWebpageError> for ParquetError {
fn from(err: amadeus_types::ParseWebpageError) -> Self {
ParquetError::General(err.to_string())
}
}
/// Converts `a` from type `A` to type `B` when the two are the same type.
///
/// # Panics
///
/// Panics with both type names if `try_type_coerce` reports that `A` and `B`
/// differ.
fn type_coerce<A, B>(a: A) -> B {
    match try_type_coerce(a) {
        Some(coerced) => coerced,
        None => panic!("can't coerce {} to {}", type_name::<A>(), type_name::<B>()),
    }
}
/// Returns `Some(a)` re-typed as `B` when `A` and `B` are the same type, and
/// `None` otherwise.
///
/// The decision is made at compile time through impl specialization (the
/// `default fn` below), so no runtime type information is consulted.
fn try_type_coerce<A, B>(a: A) -> Option<B> {
// Local helper trait: "turn `self` into a `B` if that is possible".
trait Eq<B> {
fn eq(self) -> Option<B>;
}
// Carries the value together with the requested target type `B`.
struct Foo<A, B>(A, PhantomData<fn() -> B>);
// General case: `A` and `B` are unrelated, so there is nothing to return.
impl<A, B> Eq<B> for Foo<A, B> {
default fn eq(self) -> Option<B> {
None
}
}
// Specialized case: source and target are the same type, so hand the value back.
impl<A> Eq<A> for Foo<A, A> {
fn eq(self) -> Option<A> {
Some(self.0)
}
}
Foo::<A, B>(a, PhantomData).eq()
}