use std::io::Read;
use std::sync::Arc;
use arrow_format;
use arrow_format::ipc::planus::ReadAsRoot;
use crate::array::Array;
use crate::chunk::Chunk;
use crate::datatypes::Schema;
use crate::error::{ArrowError, Result};
use crate::io::ipc::IpcSchema;
use super::super::CONTINUATION_MARKER;
use super::common::*;
use super::schema::deserialize_stream_metadata;
use super::Dictionaries;
/// Metadata of an IPC stream, as returned by [`read_stream_metadata`] and
/// required to construct a [`StreamReader`].
#[derive(Debug, Clone)]
pub struct StreamMetadata {
/// The schema of the stream; its fields are used when decoding record and dictionary batches.
pub schema: Schema,
/// The IPC metadata version, forwarded to the record batch reader.
pub version: arrow_format::ipc::MetadataVersion,
/// The IPC-specific schema information, forwarded to the record and dictionary batch readers.
pub ipc_schema: IpcSchema,
}
pub fn read_stream_metadata<R: Read>(reader: &mut R) -> Result<StreamMetadata> {
let mut meta_size: [u8; 4] = [0; 4];
reader.read_exact(&mut meta_size)?;
let meta_len = {
if meta_size == CONTINUATION_MARKER {
reader.read_exact(&mut meta_size)?;
}
i32::from_le_bytes(meta_size)
};
let mut meta_buffer = vec![0; meta_len as usize];
reader.read_exact(&mut meta_buffer)?;
deserialize_stream_metadata(&meta_buffer)
}
/// The outcome of trying to read the next item of a stream.
pub enum StreamState {
/// The reader hit the end of the available input while reading the length
/// prefix of the next message, so no batch could be produced.
Waiting,
/// A chunk decoded from a record batch message.
Some(Chunk<Arc<dyn Array>>),
}
impl StreamState {
    /// Consumes the state and returns the chunk it holds.
    ///
    /// # Panics
    /// Panics if the state is [`StreamState::Waiting`], i.e. no chunk is available.
    pub fn unwrap(self) -> Chunk<Arc<dyn Array>> {
        match self {
            StreamState::Some(chunk) => chunk,
            StreamState::Waiting => panic!("The batch is not available"),
        }
    }
}
/// Reads messages from `reader` until a record batch, the end of the stream
/// or an error is found.
///
/// Dictionary batches encountered on the way are decoded into `dictionaries`
/// and reading continues with the following message. `message_buffer` and
/// `data_buffer` are scratch buffers that are cleared and resized on each use
/// so their allocations can be shared between calls.
///
/// # Returns
/// * `Ok(Some(StreamState::Some(chunk)))` when a record batch was decoded,
/// * `Ok(Some(StreamState::Waiting))` when `reader` reports `UnexpectedEof`
///   while reading the 4-byte prefix of a message,
/// * `Ok(None)` when a message length of zero is read.
///
/// # Errors
/// Returns an error on any other I/O failure (including an end of input in the
/// middle of a message), when a declared metadata or body length is negative
/// or does not fit in `usize`, when the message cannot be parsed, or when the
/// message is neither a record batch nor a dictionary batch.
fn read_next<R: Read>(
    reader: &mut R,
    metadata: &StreamMetadata,
    dictionaries: &mut Dictionaries,
    message_buffer: &mut Vec<u8>,
    data_buffer: &mut Vec<u8>,
) -> Result<Option<StreamState>> {
    // A loop (rather than recursion) is used to skip over dictionary batches so
    // that the stack depth does not grow with the number of such messages.
    loop {
        let mut prefix = [0u8; 4];
        if let Err(e) = reader.read_exact(&mut prefix) {
            return if e.kind() == std::io::ErrorKind::UnexpectedEof {
                Ok(Some(StreamState::Waiting))
            } else {
                Err(ArrowError::from(e))
            };
        }
        if prefix == CONTINUATION_MARKER {
            // The actual length follows the continuation marker.
            reader.read_exact(&mut prefix)?;
        }

        // Lengths are signed on the wire; reject negative values instead of
        // letting an `as` cast turn them into a gigantic allocation request.
        let meta_length = <usize as std::convert::TryFrom<_>>::try_from(i32::from_le_bytes(prefix))
            .map_err(|_| ArrowError::oos("IPC: the length of the message metadata is negative"))?;

        if meta_length == 0 {
            return Ok(None);
        }

        message_buffer.clear();
        message_buffer.resize(meta_length, 0);
        reader.read_exact(message_buffer)?;

        let message =
            arrow_format::ipc::MessageRef::read_as_root(message_buffer).map_err(|err| {
                ArrowError::OutOfSpec(format!("Unable to get root as message: {:?}", err))
            })?;
        let header = message.header()?.ok_or_else(|| {
            ArrowError::oos(
                "IPC: unable to fetch the message header. The file or stream is corrupted.",
            )
        })?;

        match header {
            arrow_format::ipc::MessageHeaderRef::Schema(_) => {
                return Err(ArrowError::oos(
                    "IPC: a stream must not contain a schema message after the stream metadata",
                ));
            }
            arrow_format::ipc::MessageHeaderRef::RecordBatch(batch) => {
                let body_length =
                    <usize as std::convert::TryFrom<_>>::try_from(message.body_length()?)
                        .map_err(|_| {
                            ArrowError::oos(
                                "IPC: the body length of the message is negative or too large",
                            )
                        })?;

                data_buffer.clear();
                data_buffer.resize(body_length, 0);
                reader.read_exact(data_buffer)?;

                let mut body = std::io::Cursor::new(&mut *data_buffer);

                return read_record_batch(
                    batch,
                    &metadata.schema.fields,
                    &metadata.ipc_schema,
                    None,
                    dictionaries,
                    metadata.version,
                    &mut body,
                    0,
                )
                .map(|chunk| Some(StreamState::Some(chunk)));
            }
            arrow_format::ipc::MessageHeaderRef::DictionaryBatch(batch) => {
                let body_length =
                    <usize as std::convert::TryFrom<_>>::try_from(message.body_length()?)
                        .map_err(|_| {
                            ArrowError::oos(
                                "IPC: the body length of the message is negative or too large",
                            )
                        })?;

                let mut buf = vec![0; body_length];
                reader.read_exact(&mut buf)?;

                let mut dict_reader = std::io::Cursor::new(buf);

                read_dictionary(
                    batch,
                    &metadata.schema.fields,
                    &metadata.ipc_schema,
                    dictionaries,
                    &mut dict_reader,
                    0,
                )?;
                // The dictionary is registered; go on with the next message.
            }
            t => {
                return Err(ArrowError::OutOfSpec(format!(
                    "Reading types other than record batches not yet supported, unable to read {:?} ",
                    t
                )));
            }
        }
    }
}
/// An iterator of [`StreamState`] read from an IPC stream.
///
/// The iterator ends once a message length of zero is read from the
/// underlying reader.
pub struct StreamReader<R: Read> {
// The source the messages are read from.
reader: R,
// The stream metadata supplied on construction.
metadata: StreamMetadata,
// Dictionaries decoded so far from dictionary batch messages.
dictionaries: Dictionaries,
// Set once the end of the stream was read; no further reads are attempted afterwards.
finished: bool,
// Scratch buffer holding the body of the record batch being read.
data_buffer: Vec<u8>,
// Scratch buffer holding the metadata of the message being read.
message_buffer: Vec<u8>,
}
impl<R: Read> StreamReader<R> {
    /// Creates a new [`StreamReader`] over `reader`.
    ///
    /// `metadata` is the stream metadata used to decode the messages; it can be
    /// obtained with [`read_stream_metadata`]. The reader starts with no
    /// dictionaries, empty scratch buffers and is not finished.
    pub fn new(reader: R, metadata: StreamMetadata) -> Self {
        Self {
            reader,
            metadata,
            dictionaries: Default::default(),
            finished: false,
            data_buffer: Vec::new(),
            message_buffer: Vec::new(),
        }
    }

    /// Returns the metadata this reader was created with.
    pub fn metadata(&self) -> &StreamMetadata {
        &self.metadata
    }

    /// Returns whether the end of the stream has been read.
    pub fn is_finished(&self) -> bool {
        self.finished
    }

    /// Reads the next state of the stream, or `None` once the stream ended.
    ///
    /// After the end of the stream is observed, the underlying reader is no
    /// longer touched and `Ok(None)` is returned on every call.
    fn maybe_next(&mut self) -> Result<Option<StreamState>> {
        if self.finished {
            return Ok(None);
        }

        let next = read_next(
            &mut self.reader,
            &self.metadata,
            &mut self.dictionaries,
            &mut self.message_buffer,
            &mut self.data_buffer,
        )?;

        match next {
            None => {
                // Remember the end of the stream so later calls do not read again.
                self.finished = true;
                Ok(None)
            }
            state => Ok(state),
        }
    }
}
impl<R: Read> Iterator for StreamReader<R> {
    type Item = Result<StreamState>;

    /// Yields the next state of the stream, an error if reading failed, or
    /// `None` once the stream has ended.
    fn next(&mut self) -> Option<Self::Item> {
        match self.maybe_next() {
            Ok(Some(state)) => Some(Ok(state)),
            Ok(None) => None,
            Err(error) => Some(Err(error)),
        }
    }
}