use std::io::{BufReader, Read, Seek, SeekFrom};
use std::sync::Arc;
use crate::array::*;
use crate::buffer::Buffer;
use crate::compute::cast;
use crate::datatypes::{DataType, IntervalUnit, Schema, SchemaRef};
use crate::error::{ArrowError, Result};
use crate::ipc;
use crate::record_batch::{RecordBatch, RecordBatchReader};
use DataType::*;
/// Copies a buffer's bytes out of the IPC message body `a_data`, using the
/// offset and length recorded in the flatbuffer `Buffer` descriptor.
///
/// Panics if the descriptor points outside `a_data`; well-formed IPC
/// messages guarantee in-bounds offsets.
fn read_buffer(buf: &ipc::Buffer, a_data: &[u8]) -> Buffer {
    let start_offset = buf.offset() as usize;
    let end_offset = start_offset + buf.length() as usize;
    Buffer::from(&a_data[start_offset..end_offset])
}
/// Recursively builds an array of `data_type` from the flattened IPC
/// field nodes and buffers, starting at `node_index` / `buffer_index`.
///
/// Nodes and buffers are laid out depth-first in field order, so each
/// (possibly nested) field consumes a contiguous run of both. Returns the
/// array together with the next node index and buffer index, letting the
/// caller continue decoding sibling fields.
fn create_array(
    nodes: &[ipc::FieldNode],
    data_type: &DataType,
    data: &Vec<u8>,
    buffers: &[ipc::Buffer],
    mut node_index: usize,
    mut buffer_index: usize,
) -> (ArrayRef, usize, usize) {
    use DataType::*;
    let array = match data_type {
        // Variable-length types carry 3 buffers: null bitmap, offsets, values.
        Utf8 | Binary => {
            let array = create_primitive_array(
                &nodes[node_index],
                data_type,
                buffers[buffer_index..buffer_index + 3]
                    .iter()
                    .map(|buf| read_buffer(buf, data))
                    .collect(),
            );
            node_index = node_index + 1;
            buffer_index = buffer_index + 3;
            array
        }
        // Fixed-size binary carries 2 buffers: null bitmap + values
        // (no offsets buffer, since every slot has the same width).
        FixedSizeBinary(_) => {
            let array = create_primitive_array(
                &nodes[node_index],
                data_type,
                buffers[buffer_index..buffer_index + 2]
                    .iter()
                    .map(|buf| read_buffer(buf, data))
                    .collect(),
            );
            node_index = node_index + 1;
            buffer_index = buffer_index + 2;
            array
        }
        // List consumes its own node + 2 buffers (null bitmap, offsets),
        // then recurses to decode the child values array.
        List(ref list_data_type) => {
            let list_node = &nodes[node_index];
            let list_buffers: Vec<Buffer> = buffers[buffer_index..buffer_index + 2]
                .iter()
                .map(|buf| read_buffer(buf, data))
                .collect();
            node_index = node_index + 1;
            buffer_index = buffer_index + 2;
            let triple = create_array(
                nodes,
                list_data_type,
                data,
                buffers,
                node_index,
                buffer_index,
            );
            // Propagate where the child decoding stopped.
            node_index = triple.1;
            buffer_index = triple.2;
            create_list_array(list_node, data_type, &list_buffers[..], triple.0)
        }
        // FixedSizeList has no offsets buffer: only the null bitmap,
        // then the recursively-decoded child.
        FixedSizeList(ref list_data_type, _) => {
            let list_node = &nodes[node_index];
            let list_buffers: Vec<Buffer> = buffers[buffer_index..buffer_index + 1]
                .iter()
                .map(|buf| read_buffer(buf, data))
                .collect();
            node_index = node_index + 1;
            buffer_index = buffer_index + 1;
            let triple = create_array(
                nodes,
                list_data_type,
                data,
                buffers,
                node_index,
                buffer_index,
            );
            node_index = triple.1;
            buffer_index = triple.2;
            create_list_array(list_node, data_type, &list_buffers[..], triple.0)
        }
        // Struct consumes its node + a single null-bitmap buffer, then
        // decodes each child field in declaration order.
        Struct(struct_fields) => {
            let struct_node = &nodes[node_index];
            // NOTE: the null buffer is read (and buffer_index advanced)
            // even when null_count turns out to be 0 below.
            let null_buffer: Buffer = read_buffer(&buffers[buffer_index], data);
            node_index = node_index + 1;
            buffer_index = buffer_index + 1;
            let mut struct_arrays = vec![];
            for struct_field in struct_fields {
                let triple = create_array(
                    nodes,
                    struct_field.data_type(),
                    data,
                    buffers,
                    node_index,
                    buffer_index,
                );
                node_index = triple.1;
                buffer_index = triple.2;
                struct_arrays.push((struct_field.clone(), triple.0));
            }
            let null_count = struct_node.null_count() as usize;
            let struct_array = if null_count > 0 {
                // Only attach the validity bitmap when there are nulls.
                StructArray::from((
                    struct_arrays,
                    null_buffer,
                    struct_node.null_count() as usize,
                ))
            } else {
                StructArray::from(struct_arrays)
            };
            Arc::new(struct_array)
        }
        // All remaining (primitive) types carry 2 buffers:
        // null bitmap + values.
        _ => {
            let array = create_primitive_array(
                &nodes[node_index],
                data_type,
                buffers[buffer_index..buffer_index + 2]
                    .iter()
                    .map(|buf| read_buffer(buf, data))
                    .collect(),
            );
            node_index = node_index + 1;
            buffer_index = buffer_index + 2;
            array
        }
    };
    (array, node_index, buffer_index)
}
/// Builds a primitive (non-nested) array of `data_type` from a field node
/// and its already-extracted buffers.
///
/// Buffer layout convention: `buffers[0]` is the null bitmap; the value
/// (and, for variable-length types, offset) buffers follow. The null
/// bitmap is only attached when the node reports a non-zero null count.
///
/// Panics if `data_type` is not a supported primitive type.
fn create_primitive_array(
    field_node: &ipc::FieldNode,
    data_type: &DataType,
    buffers: Vec<Buffer>,
) -> ArrayRef {
    let length = field_node.length() as usize;
    let null_count = field_node.null_count() as usize;
    let array_data = match data_type {
        Utf8 | Binary => {
            // buffers[1..3] = offsets + values.
            let mut builder = ArrayData::builder(data_type.clone())
                .len(length)
                .buffers(buffers[1..3].to_vec())
                .offset(0);
            if null_count > 0 {
                builder = builder
                    .null_count(null_count)
                    .null_bit_buffer(buffers[0].clone())
            }
            builder.build()
        }
        FixedSizeBinary(_) => {
            // buffers[1..2] = values only (no offsets).
            let mut builder = ArrayData::builder(data_type.clone())
                .len(length)
                .buffers(buffers[1..2].to_vec())
                .offset(0);
            if null_count > 0 {
                builder = builder
                    .null_count(null_count)
                    .null_bit_buffer(buffers[0].clone())
            }
            builder.build()
        }
        Int8
        | Int16
        | Int32
        | UInt8
        | UInt16
        | UInt32
        | Time32(_)
        | Date32(_)
        | Interval(IntervalUnit::YearMonth) => {
            // If the values buffer holds 8 bytes per slot, the writer
            // encoded these ≤32-bit values as 64-bit integers — presumably
            // for alignment/compatibility with other writers (TODO confirm
            // against the producing implementation). Read as Int64 and
            // cast down to the declared type.
            if buffers[1].len() / 8 == length {
                let mut builder = ArrayData::builder(DataType::Int64)
                    .len(length)
                    .buffers(buffers[1..].to_vec())
                    .offset(0);
                if null_count > 0 {
                    builder = builder
                        .null_count(null_count)
                        .null_bit_buffer(buffers[0].clone())
                }
                let values = Arc::new(Int64Array::from(builder.build())) as ArrayRef;
                let casted = cast(&values, data_type).unwrap();
                casted.data()
            } else {
                // Values are already in the declared width.
                let mut builder = ArrayData::builder(data_type.clone())
                    .len(length)
                    .buffers(buffers[1..].to_vec())
                    .offset(0);
                if null_count > 0 {
                    builder = builder
                        .null_count(null_count)
                        .null_bit_buffer(buffers[0].clone())
                }
                builder.build()
            }
        }
        Float32 => {
            // Same widening convention as above, but via Float64.
            if buffers[1].len() / 8 == length {
                let mut builder = ArrayData::builder(DataType::Float64)
                    .len(length)
                    .buffers(buffers[1..].to_vec())
                    .offset(0);
                if null_count > 0 {
                    builder = builder
                        .null_count(null_count)
                        .null_bit_buffer(buffers[0].clone())
                }
                let values = Arc::new(Float64Array::from(builder.build())) as ArrayRef;
                let casted = cast(&values, data_type).unwrap();
                casted.data()
            } else {
                let mut builder = ArrayData::builder(data_type.clone())
                    .len(length)
                    .buffers(buffers[1..].to_vec())
                    .offset(0);
                if null_count > 0 {
                    builder = builder
                        .null_count(null_count)
                        .null_bit_buffer(buffers[0].clone())
                }
                builder.build()
            }
        }
        // 64-bit (and bit-packed boolean) types: values are used as-is.
        Boolean
        | Int64
        | UInt64
        | Float64
        | Time64(_)
        | Timestamp(_, _)
        | Date64(_)
        | Duration(_)
        | Interval(IntervalUnit::DayTime) => {
            let mut builder = ArrayData::builder(data_type.clone())
                .len(length)
                .buffers(buffers[1..].to_vec())
                .offset(0);
            if null_count > 0 {
                builder = builder
                    .null_count(null_count)
                    .null_bit_buffer(buffers[0].clone())
            }
            builder.build()
        }
        t @ _ => panic!("Data type {:?} either unsupported or not primitive", t),
    };
    make_array(array_data)
}
/// Wraps an already-decoded child array into a list-typed array, using the
/// list's own field node (length/null count) and its buffers.
///
/// Panics if `data_type` is neither `List` nor `FixedSizeList`.
fn create_list_array(
    field_node: &ipc::FieldNode,
    data_type: &DataType,
    buffers: &[Buffer],
    child_array: ArrayRef,
) -> ArrayRef {
    let length = field_node.length() as usize;
    let null_count = field_node.null_count() as usize;
    match data_type {
        DataType::List(_) => {
            // Variable-size list: an offsets buffer follows the null bitmap.
            let mut builder = ArrayData::builder(data_type.clone())
                .len(length)
                .buffers(buffers[1..2].to_vec())
                .offset(0)
                .child_data(vec![child_array.data()]);
            if null_count > 0 {
                builder = builder
                    .null_count(null_count)
                    .null_bit_buffer(buffers[0].clone())
            }
            make_array(builder.build())
        }
        DataType::FixedSizeList(_, _) => {
            // Fixed-size list: no offsets buffer, so pass an empty slice
            // of value buffers (buffers[1..1]).
            let mut builder = ArrayData::builder(data_type.clone())
                .len(length)
                .buffers(buffers[1..1].to_vec())
                .offset(0)
                .child_data(vec![child_array.data()]);
            if null_count > 0 {
                builder = builder
                    .null_count(null_count)
                    .null_bit_buffer(buffers[0].clone())
            }
            make_array(builder.build())
        }
        _ => panic!("Cannot create list array from {:?}", data_type),
    }
}
/// Decodes a single IPC `RecordBatch` message.
///
/// Walks the schema's fields in order, building one array per field from
/// the message's flattened field nodes and buffers (each field reports
/// back where the next one should start), then assembles the arrays into
/// a `RecordBatch`.
///
/// # Errors
/// Returns an `ArrowError::IoError` if the message is missing its buffers
/// or field nodes, or if `RecordBatch::try_new` rejects the arrays.
fn read_record_batch(
    buf: &Vec<u8>,
    batch: ipc::RecordBatch,
    schema: Arc<Schema>,
) -> Result<Option<RecordBatch>> {
    let buffers = batch.buffers().ok_or_else(|| {
        ArrowError::IoError("Unable to get buffers from IPC RecordBatch".to_string())
    })?;
    let field_nodes = batch.nodes().ok_or_else(|| {
        ArrowError::IoError("Unable to get field nodes from IPC RecordBatch".to_string())
    })?;
    let mut buffer_index = 0;
    let mut node_index = 0;
    let mut arrays = Vec::with_capacity(schema.fields().len());
    for field in schema.fields() {
        let (array, next_node, next_buffer) = create_array(
            field_nodes,
            field.data_type(),
            buf,
            buffers,
            node_index,
            buffer_index,
        );
        node_index = next_node;
        buffer_index = next_buffer;
        arrays.push(array);
    }
    RecordBatch::try_new(schema.clone(), arrays).map(Some)
}
/// Arrow File format reader: random-access reader over a magic-framed
/// Arrow file whose footer indexes every record batch.
pub struct FileReader<R: Read + Seek> {
    // Buffered wrapper over the underlying seekable source.
    reader: BufReader<R>,
    // Schema decoded from the file's schema message.
    schema: Arc<Schema>,
    // Footer blocks, one per record batch in the file.
    blocks: Vec<ipc::Block>,
    // Index of the next block to read.
    current_block: usize,
    // Total number of blocks (== number of record batches).
    total_blocks: usize,
}
impl<R: Read + Seek> FileReader<R> {
    /// Try to create a new file reader.
    ///
    /// Validates the Arrow magic bytes at both ends of the file, then
    /// eagerly reads the schema message and the footer's record-batch
    /// block index.
    ///
    /// # Errors
    /// Returns an `ArrowError::IoError` if the magic bytes are wrong, the
    /// schema message cannot be decoded, or the footer has no blocks.
    pub fn try_new(reader: R) -> Result<Self> {
        let mut reader = BufReader::new(reader);
        // Check the 6-byte leading magic number.
        let mut magic_buffer: [u8; 6] = [0; 6];
        reader.read_exact(&mut magic_buffer)?;
        if magic_buffer != super::ARROW_MAGIC {
            return Err(ArrowError::IoError(
                "Arrow file does not contain correct header".to_string(),
            ));
        }
        // The same magic number must terminate the file.
        reader.seek(SeekFrom::End(-6))?;
        reader.read_exact(&mut magic_buffer)?;
        if magic_buffer != super::ARROW_MAGIC {
            return Err(ArrowError::IoError(
                "Arrow file does not contain correct footer".to_string(),
            ));
        }
        // The schema message starts at offset 8 (magic + padding): a 4-byte
        // little-endian metadata length followed by the flatbuffer itself.
        reader.seek(SeekFrom::Start(8))?;
        let mut meta_size: [u8; 4] = [0; 4];
        reader.read_exact(&mut meta_size)?;
        let meta_len = u32::from_le_bytes(meta_size);
        let mut meta_buffer = vec![0; meta_len as usize];
        reader.seek(SeekFrom::Start(12))?;
        reader.read_exact(&mut meta_buffer)?;
        let message = ipc::get_root_as_message(&meta_buffer[..]);
        let ipc_schema: ipc::Schema = message.header_as_schema().ok_or_else(|| {
            ArrowError::IoError("Unable to read IPC message as schema".to_string())
        })?;
        let schema = ipc::convert::fb_to_schema(ipc_schema);
        // The footer flatbuffer sits immediately before the trailing
        // 4-byte footer length and the 6-byte magic.
        let mut footer_size: [u8; 4] = [0; 4];
        reader.seek(SeekFrom::End(-10))?;
        reader.read_exact(&mut footer_size)?;
        let footer_len = u32::from_le_bytes(footer_size);
        let mut footer_data = vec![0; footer_len as usize];
        reader.seek(SeekFrom::End(-10 - footer_len as i64))?;
        reader.read_exact(&mut footer_data)?;
        let footer = ipc::get_root_as_footer(&footer_data[..]);
        let blocks = footer.recordBatches().ok_or_else(|| {
            ArrowError::IoError("Unable to get record batches from IPC Footer".to_string())
        })?;
        let total_blocks = blocks.len();
        Ok(Self {
            reader,
            schema: Arc::new(schema),
            blocks: blocks.to_vec(),
            current_block: 0,
            total_blocks,
        })
    }

    /// Return the number of batches (blocks) in the file.
    pub fn num_batches(&self) -> usize {
        self.total_blocks
    }

    /// Return a shared handle to the file's schema.
    pub fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    /// Read the next record batch, advancing the block cursor.
    /// Returns `Ok(None)` once all blocks have been read.
    pub fn next(&mut self) -> Result<Option<RecordBatch>> {
        if self.current_block >= self.total_blocks {
            return Ok(None);
        }
        let block = self.blocks[self.current_block];
        self.current_block += 1;
        // The block's first 4 bytes are the metadata length prefix, which
        // we skip before reading the message flatbuffer.
        let meta_len = block.metaDataLength() - 4;
        let mut block_data = vec![0; meta_len as usize];
        self.reader
            .seek(SeekFrom::Start(block.offset() as u64 + 4))?;
        self.reader.read_exact(&mut block_data)?;
        let message = ipc::get_root_as_message(&block_data[..]);
        match message.header_type() {
            ipc::MessageHeader::Schema => Err(ArrowError::IoError(
                "Not expecting a schema when messages are read".to_string(),
            )),
            ipc::MessageHeader::RecordBatch => {
                let batch = message.header_as_record_batch().ok_or_else(|| {
                    ArrowError::IoError(
                        "Unable to read IPC message as record batch".to_string(),
                    )
                })?;
                // The message body follows the metadata within the block.
                let mut buf = vec![0; block.bodyLength() as usize];
                self.reader.seek(SeekFrom::Start(
                    block.offset() as u64 + block.metaDataLength() as u64,
                ))?;
                self.reader.read_exact(&mut buf)?;
                read_record_batch(&buf, batch, self.schema())
            }
            _ => Err(ArrowError::IoError(
                "Reading types other than record batches not yet supported".to_string(),
            )),
        }
    }

    /// Position the cursor so the next read returns batch `index`.
    ///
    /// # Errors
    /// Returns an `ArrowError::IoError` if `index` is out of range.
    pub fn set_index(&mut self, index: usize) -> Result<()> {
        if index >= self.total_blocks {
            Err(ArrowError::IoError(format!(
                "Cannot set batch to index {} from {} total batches",
                index, self.total_blocks
            )))
        } else {
            self.current_block = index;
            Ok(())
        }
    }
}
impl<R: Read + Seek> RecordBatchReader for FileReader<R> {
fn schema(&mut self) -> SchemaRef {
self.schema.clone()
}
fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
self.next()
}
}
/// Arrow Streaming format reader: sequential reader over a stream of
/// length-prefixed IPC messages, beginning with a schema message.
pub struct StreamReader<R: Read> {
    // Buffered wrapper over the underlying byte stream.
    reader: BufReader<R>,
    // Schema decoded from the stream's leading schema message.
    schema: Arc<Schema>,
    // Set once the end-of-stream marker (zero metadata length) is seen.
    finished: bool,
}
impl<R: Read> StreamReader<R> {
    /// Try to create a new stream reader, eagerly reading the schema
    /// message that begins an Arrow stream.
    ///
    /// # Errors
    /// Returns an `ArrowError::IoError` if the leading message cannot be
    /// read or is not a schema.
    pub fn try_new(reader: R) -> Result<Self> {
        let mut reader = BufReader::new(reader);
        // Each stream message starts with a 4-byte little-endian metadata
        // length, followed by the message flatbuffer.
        let mut meta_size: [u8; 4] = [0; 4];
        reader.read_exact(&mut meta_size)?;
        let meta_len = u32::from_le_bytes(meta_size);
        let mut meta_buffer = vec![0; meta_len as usize];
        reader.read_exact(&mut meta_buffer)?;
        let message = ipc::get_root_as_message(&meta_buffer[..]);
        let ipc_schema: ipc::Schema = message.header_as_schema().ok_or_else(|| {
            ArrowError::IoError("Unable to read IPC message as schema".to_string())
        })?;
        let schema = ipc::convert::fb_to_schema(ipc_schema);
        Ok(Self {
            reader,
            schema: Arc::new(schema),
            finished: false,
        })
    }

    /// Return a shared handle to the stream's schema.
    pub fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    /// Read the next record batch from the stream.
    ///
    /// Returns `Ok(None)` after the end-of-stream marker (a zero metadata
    /// length) has been seen. NOTE(review): a stream truncated before the
    /// marker surfaces as an `UnexpectedEof` I/O error from `read_exact`.
    pub fn next(&mut self) -> Result<Option<RecordBatch>> {
        if self.finished {
            return Ok(None);
        }
        let mut meta_size: [u8; 4] = [0; 4];
        self.reader.read_exact(&mut meta_size)?;
        let meta_len = u32::from_le_bytes(meta_size);
        if meta_len == 0 {
            // End-of-stream marker.
            self.finished = true;
            return Ok(None);
        }
        let mut meta_buffer = vec![0; meta_len as usize];
        self.reader.read_exact(&mut meta_buffer)?;
        let message = ipc::get_root_as_message(&meta_buffer[..]);
        match message.header_type() {
            ipc::MessageHeader::Schema => Err(ArrowError::IoError(
                "Not expecting a schema when messages are read".to_string(),
            )),
            ipc::MessageHeader::RecordBatch => {
                let batch = message.header_as_record_batch().ok_or_else(|| {
                    ArrowError::IoError(
                        "Unable to read IPC message as record batch".to_string(),
                    )
                })?;
                // The message body immediately follows the metadata.
                let mut buf = vec![0; message.bodyLength() as usize];
                self.reader.read_exact(&mut buf)?;
                read_record_batch(&buf, batch, self.schema())
            }
            _ => Err(ArrowError::IoError(
                "Reading types other than record batches not yet supported".to_string(),
            )),
        }
    }

    /// Whether the end-of-stream marker has been reached.
    pub fn is_finished(&self) -> bool {
        self.finished
    }
}
impl<R: Read> RecordBatchReader for StreamReader<R> {
fn schema(&mut self) -> SchemaRef {
self.schema.clone()
}
fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
self.next()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use flate2::read::GzDecoder;
    use crate::util::integration_util::*;
    use std::env;
    use std::fs::File;

    // File stems shared by the file and stream integration fixtures.
    const TEST_PATHS: [&str; 6] = [
        "generated_interval",
        "generated_datetime",
        "generated_nested",
        "generated_primitive_no_batches",
        "generated_primitive_zerolength",
        "generated_primitive",
    ];

    // Root of the Arrow integration test data, taken from the environment.
    fn testdata_dir() -> String {
        env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined")
    }

    #[test]
    fn read_generated_files() {
        let testdata = testdata_dir();
        for path in &TEST_PATHS {
            let filename = format!(
                "{}/arrow-ipc-stream/integration/0.14.1/{}.arrow_file",
                testdata, path
            );
            let mut reader = FileReader::try_new(File::open(filename).unwrap()).unwrap();
            // Every file must round-trip against its JSON representation.
            let arrow_json = read_gzip_json(path);
            assert!(arrow_json.equals_reader(&mut reader));
        }
    }

    #[test]
    fn read_generated_streams() {
        let testdata = testdata_dir();
        for path in &TEST_PATHS {
            let filename = format!(
                "{}/arrow-ipc-stream/integration/0.14.1/{}.stream",
                testdata, path
            );
            let mut reader = StreamReader::try_new(File::open(filename).unwrap()).unwrap();
            let arrow_json = read_gzip_json(path);
            assert!(arrow_json.equals_reader(&mut reader));
            // Reading past the end-of-stream marker must keep yielding None.
            assert!(reader.next().unwrap().is_none());
            assert!(reader.is_finished());
        }
    }

    // Loads the gzipped JSON fixture that mirrors an integration file.
    fn read_gzip_json(path: &str) -> ArrowJson {
        let testdata = testdata_dir();
        let file = File::open(format!(
            "{}/arrow-ipc-stream/integration/0.14.1/{}.json.gz",
            testdata, path
        ))
        .unwrap();
        let mut gz = GzDecoder::new(&file);
        let mut s = String::new();
        gz.read_to_string(&mut s).unwrap();
        serde_json::from_str::<ArrowJson>(&s).unwrap()
    }
}