use std::convert::TryInto;
use std::io::{Read, Seek, SeekFrom};
use std::sync::Arc;
use crate::array::Array;
use crate::chunk::Chunk;
use crate::datatypes::{Field, Schema};
use crate::error::{ArrowError, Result};
use crate::io::ipc::IpcSchema;
use super::super::{ARROW_MAGIC, CONTINUATION_MARKER};
use super::common::*;
use super::schema::fb_to_schema;
use super::Dictionaries;
use arrow_format::ipc::planus::{ReadAsRoot, Vector};
/// Metadata of an Arrow IPC file, as assembled by [`read_file_metadata`] from the file's footer.
#[derive(Debug, Clone)]
pub struct FileMetadata {
/// The logical schema, obtained by passing the footer's schema through `fb_to_schema`.
pub schema: Schema,
/// The IPC-specific counterpart of `schema`, returned alongside it by `fb_to_schema`.
pub ipc_schema: IpcSchema,
/// The record batch blocks listed in the footer; [`read_batch`] indexes into this list.
pub(super) blocks: Vec<arrow_format::ipc::Block>,
/// The dictionaries read from the footer's dictionary blocks
/// (the default value when the footer lists none).
pub(super) dictionaries: Dictionaries,
}
/// An iterator over the record batches of an Arrow IPC file.
///
/// Each call to `next` reads the block at `current_block` through [`read_batch`].
pub struct FileReader<R: Read + Seek> {
// The underlying source; handed back to the caller by `into_inner`.
reader: R,
// The file metadata supplied to `new`.
metadata: FileMetadata,
// Index of the next block in `metadata.blocks` that `next` will read.
current_block: usize,
// The optional projection: the selected column indices and the schema restricted to them.
projection: Option<(Vec<usize>, Schema)>,
// Scratch buffer passed to `read_batch` on every iteration so its allocation is reused.
buffer: Vec<u8>,
}
fn read_dictionary_message<R: Read + Seek>(
reader: &mut R,
offset: u64,
data: &mut Vec<u8>,
) -> Result<()> {
let mut message_size: [u8; 4] = [0; 4];
reader.seek(SeekFrom::Start(offset))?;
reader.read_exact(&mut message_size)?;
if message_size == CONTINUATION_MARKER {
reader.read_exact(&mut message_size)?;
};
let footer_len = i32::from_le_bytes(message_size);
data.clear();
data.resize(footer_len as usize, 0);
reader.read_exact(data)?;
Ok(())
}
fn read_dictionaries<R: Read + Seek>(
reader: &mut R,
fields: &[Field],
ipc_schema: &IpcSchema,
blocks: Vector<arrow_format::ipc::BlockRef>,
) -> Result<Dictionaries> {
let mut dictionaries = Default::default();
let mut data = vec![];
for block in blocks {
let offset = block.offset() as u64;
let length = block.meta_data_length() as u64;
read_dictionary_message(reader, offset, &mut data)?;
let message = arrow_format::ipc::MessageRef::read_as_root(&data).map_err(|err| {
ArrowError::OutOfSpec(format!("Unable to get root as message: {:?}", err))
})?;
let header = message
.header()?
.ok_or_else(|| ArrowError::oos("Message must have an header"))?;
match header {
arrow_format::ipc::MessageHeaderRef::DictionaryBatch(batch) => {
let block_offset = offset + length;
read_dictionary(
batch,
fields,
ipc_schema,
&mut dictionaries,
reader,
block_offset,
)?;
}
t => {
return Err(ArrowError::OutOfSpec(format!(
"Expecting DictionaryBatch in dictionary blocks, found {:?}.",
t
)));
}
};
}
Ok(dictionaries)
}
/// Reads the [`FileMetadata`] of an Arrow IPC file from `reader`.
///
/// The function checks the magic bytes at the reader's current position and at the end of
/// the file, reads the footer whose length is stored in the 4 bytes preceding the trailing
/// magic, converts the footer's schema, and reads all dictionaries listed in the footer.
///
/// # Errors
/// Returns [`ArrowError::OutOfSpec`] when either magic does not match, when the declared
/// footer length is negative, or when the footer cannot be parsed or lacks the record
/// batches or the schema. I/O errors and dictionary-reading errors are propagated.
pub fn read_file_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetadata> {
    // check that the header contains the correct magic bytes
    let mut magic_buffer: [u8; 6] = [0; 6];
    reader.read_exact(&mut magic_buffer)?;
    if magic_buffer != ARROW_MAGIC {
        return Err(ArrowError::OutOfSpec(
            "Arrow file does not contain correct header".to_string(),
        ));
    }

    // check that the file ends with the same magic bytes
    reader.seek(SeekFrom::End(-6))?;
    reader.read_exact(&mut magic_buffer)?;
    if magic_buffer != ARROW_MAGIC {
        return Err(ArrowError::OutOfSpec(
            "Arrow file does not contain correct footer".to_string(),
        ));
    }

    // the footer length sits in the 4 bytes right before the trailing magic
    let mut footer_size: [u8; 4] = [0; 4];
    reader.seek(SeekFrom::End(-10))?;
    reader.read_exact(&mut footer_size)?;
    let footer_len = i32::from_le_bytes(footer_size);

    // The length comes from the file: reject negatives instead of letting an `as` cast
    // turn them into an enormous allocation.
    let footer_size: usize = footer_len
        .try_into()
        .map_err(|_| ArrowError::oos("IPC: the footer length of the file is negative"))?;

    // read the footer itself
    let mut footer_data = vec![0; footer_size];
    reader.seek(SeekFrom::End(-10 - i64::from(footer_len)))?;
    reader.read_exact(&mut footer_data)?;

    let footer = arrow_format::ipc::FooterRef::read_as_root(&footer_data)
        .map_err(|err| ArrowError::OutOfSpec(format!("Unable to get root as footer: {:?}", err)))?;

    let blocks = footer.record_batches()?.ok_or_else(|| {
        ArrowError::OutOfSpec("Unable to get record batches from footer".to_string())
    })?;

    let ipc_schema = footer
        .schema()?
        .ok_or_else(|| ArrowError::OutOfSpec("Unable to get the schema from footer".to_string()))?;
    let (schema, ipc_schema) = fb_to_schema(ipc_schema)?;

    // dictionaries are optional in the footer
    let dictionary_blocks = footer.dictionaries()?;
    let dictionaries = if let Some(blocks) = dictionary_blocks {
        read_dictionaries(reader, &schema.fields, &ipc_schema, blocks)?
    } else {
        Default::default()
    };

    Ok(FileMetadata {
        schema,
        ipc_schema,
        blocks: blocks
            .iter()
            .map(|block| Ok(block.try_into()?))
            .collect::<Result<Vec<_>>>()?,
        dictionaries,
    })
}
/// Extracts the record batch carried by `message`.
///
/// # Errors
/// Returns [`ArrowError::OutOfSpec`] when the message has no header, or when its header
/// is anything other than a record batch (a schema header gets a dedicated message).
pub(super) fn get_serialized_batch<'a>(
    message: &'a arrow_format::ipc::MessageRef,
) -> Result<arrow_format::ipc::RecordBatchRef<'a>> {
    use arrow_format::ipc::MessageHeaderRef;

    let header = match message.header()? {
        Some(header) => header,
        None => {
            return Err(ArrowError::oos(
                "IPC: unable to fetch the message header. The file or stream is corrupted.",
            ))
        }
    };

    match header {
        MessageHeaderRef::RecordBatch(batch) => Ok(batch),
        MessageHeaderRef::Schema(_) => Err(ArrowError::OutOfSpec(String::from(
            "Not expecting a schema when messages are read",
        ))),
        other => {
            let reason = format!(
                "Reading types other than record batches not yet supported, unable to read {:?}",
                other
            );
            Err(ArrowError::OutOfSpec(reason))
        }
    }
}
pub fn read_batch<R: Read + Seek>(
reader: &mut R,
metadata: &FileMetadata,
projection: Option<&[usize]>,
block: usize,
block_data: &mut Vec<u8>,
) -> Result<Chunk<Arc<dyn Array>>> {
let block = metadata.blocks[block];
reader.seek(SeekFrom::Start(block.offset as u64))?;
let mut meta_buf = [0; 4];
reader.read_exact(&mut meta_buf)?;
if meta_buf == CONTINUATION_MARKER {
reader.read_exact(&mut meta_buf)?;
}
let meta_len = i32::from_le_bytes(meta_buf) as usize;
block_data.clear();
block_data.resize(meta_len, 0);
reader.read_exact(block_data)?;
let message = arrow_format::ipc::MessageRef::read_as_root(&block_data[..])
.map_err(|err| ArrowError::oos(format!("Unable parse message: {:?}", err)))?;
let batch = get_serialized_batch(&message)?;
read_record_batch(
batch,
&metadata.schema.fields,
&metadata.ipc_schema,
projection,
&metadata.dictionaries,
message.version()?,
reader,
block.offset as u64 + block.meta_data_length as u64,
)
}
impl<R: Read + Seek> FileReader<R> {
    /// Creates a new [`FileReader`] over `reader`, optionally restricted to the columns
    /// whose indices are listed in `projection`.
    ///
    /// # Panics
    /// Panics when `projection` is not strictly increasing, or when one of its indices is
    /// out of bounds of the fields of the schema in `metadata`.
    pub fn new(reader: R, metadata: FileMetadata, projection: Option<Vec<usize>>) -> Self {
        let projection = match projection {
            None => None,
            Some(columns) => {
                // every index must be strictly greater than the one before it
                for pair in columns.windows(2) {
                    assert!(
                        pair[0] < pair[1],
                        "The projection on IPC must be ordered and non-overlapping"
                    );
                }

                // the projected schema keeps the metadata of the original one
                let fields = columns
                    .iter()
                    .map(|&index| metadata.schema.fields[index].clone())
                    .collect();
                let projected = Schema {
                    fields,
                    metadata: metadata.schema.metadata.clone(),
                };
                Some((columns, projected))
            }
        };

        Self {
            reader,
            metadata,
            projection,
            current_block: 0,
            buffer: vec![],
        }
    }

    /// Returns the schema of the chunks this reader yields: the projected schema when a
    /// projection was given, the schema of the file otherwise.
    pub fn schema(&self) -> &Schema {
        match &self.projection {
            Some((_, projected)) => projected,
            None => &self.metadata.schema,
        }
    }

    /// Returns the [`FileMetadata`] this reader was created with.
    pub fn metadata(&self) -> &FileMetadata {
        &self.metadata
    }

    /// Consumes this reader, returning the underlying `reader`.
    pub fn into_inner(self) -> R {
        self.reader
    }
}
impl<R: Read + Seek> Iterator for FileReader<R> {
    type Item = Result<Chunk<Arc<dyn Array>>>;

    /// Reads the next block of the file, or returns `None` once every block was visited.
    ///
    /// The block counter advances before the read, so a failed read is not retried.
    fn next(&mut self) -> Option<Self::Item> {
        let index = self.current_block;
        if index >= self.metadata.blocks.len() {
            return None;
        }
        self.current_block = index + 1;

        let columns = self
            .projection
            .as_ref()
            .map(|(columns, _)| columns.as_slice());

        Some(read_batch(
            &mut self.reader,
            &self.metadata,
            columns,
            index,
            &mut self.buffer,
        ))
    }
}