use std::io::Read;
use std::sync::Arc;
use avro_schema::{Record, Schema as AvroSchema};
use fallible_streaming_iterator::FallibleStreamingIterator;
mod block;
mod decompress;
pub use block::BlockStreamIterator;
pub use decompress::{decompress_block, Decompressor};
mod deserialize;
pub use deserialize::deserialize;
mod header;
mod nested;
mod schema;
mod util;
pub(super) use header::deserialize_header;
pub(super) use schema::infer_schema;
use crate::array::Array;
use crate::chunk::Chunk;
use crate::datatypes::{Field, Schema};
use crate::error::{Error, Result};
use super::Compression;
#[allow(clippy::type_complexity)]
pub fn read_metadata<R: std::io::Read>(
reader: &mut R,
) -> Result<(Vec<AvroSchema>, Schema, Option<Compression>, [u8; 16])> {
let (avro_schema, codec, marker) = util::read_schema(reader)?;
let schema = infer_schema(&avro_schema)?;
let avro_schema = if let AvroSchema::Record(Record { fields, .. }) = avro_schema {
fields.into_iter().map(|x| x.schema).collect()
} else {
panic!()
};
Ok((avro_schema, schema, codec, marker))
}
/// Iterator over [`Chunk`]s deserialized from the data blocks of an Avro file.
pub struct Reader<R>
where
    R: Read,
{
    /// Source of the decompressed data blocks.
    iter: Decompressor<R>,
    /// Avro schema of each top-level field of the file.
    avro_schemas: Vec<AvroSchema>,
    /// Arrow fields the blocks are deserialized into.
    fields: Vec<Field>,
    /// Per-field selection flags: `true` means the field is read.
    projection: Vec<bool>,
}
impl<R: Read> Reader<R> {
    /// Creates a new [`Reader`] that deserializes the blocks yielded by `iter`.
    ///
    /// * `iter` - decompressor over the file's data blocks.
    /// * `avro_schemas` - Avro schema of each field (e.g. as returned by [`read_metadata`]).
    /// * `fields` - Arrow fields the blocks are deserialized into.
    /// * `projection` - per-field flags selecting which fields to read. When `None`,
    ///   every field is read.
    ///
    /// NOTE(review): `projection` is presumably expected to have one entry per
    /// field. Its length is not checked here.
    pub fn new(
        iter: Decompressor<R>,
        avro_schemas: Vec<AvroSchema>,
        fields: Vec<Field>,
        projection: Option<Vec<bool>>,
    ) -> Self {
        // Default projection: select every field.
        let projection = projection.unwrap_or_else(|| vec![true; fields.len()]);
        Self {
            iter,
            avro_schemas,
            fields,
            projection,
        }
    }

    /// Consumes this reader and returns the reader it was built on.
    pub fn into_inner(self) -> R {
        self.iter.into_inner()
    }
}
impl<R: Read> Iterator for Reader<R> {
    type Item = Result<Chunk<Arc<dyn Array>>>;

    /// Decompresses the next data block and deserializes it into a [`Chunk`].
    ///
    /// Returns `None` once the decompressor is exhausted. A failure while
    /// reading or decompressing is yielded as `Some(Err(..))`.
    fn next(&mut self) -> Option<Self::Item> {
        // Advance the decompressor. This mutably borrows only `self.iter`, so the
        // metadata fields can still be borrowed below.
        let block = match self.iter.next() {
            Ok(Some(block)) => block,
            Ok(None) => return None,
            Err(error) => return Some(Err(error)),
        };
        Some(deserialize(
            block,
            self.fields.as_slice(),
            &self.avro_schemas,
            &self.projection,
        ))
    }
}