use std::collections::HashMap;
use std::io::{BufReader, Read, Seek, SeekFrom};
use std::sync::Arc;
use crate::array::*;
use crate::buffer::Buffer;
use crate::compute::cast;
use crate::datatypes::{DataType, Field, IntervalUnit, Schema, SchemaRef};
use crate::error::{ArrowError, Result};
use crate::ipc;
use crate::record_batch::{RecordBatch, RecordBatchReader};
use DataType::*;
const CONTINUATION_MARKER: u32 = 0xffff_ffff;
/// Materialize an in-memory `Buffer` from the region of the IPC message body
/// (`a_data`) that the flatbuffer `ipc::Buffer` descriptor points at.
fn read_buffer(buf: &ipc::Buffer, a_data: &[u8]) -> Buffer {
    let start = buf.offset() as usize;
    let data_slice = &a_data[start..start + buf.length() as usize];
    Buffer::from(&data_slice)
}
/// Reads one column out of the flattened `nodes`/`buffers` lists of an IPC
/// RecordBatch message, recursing into child arrays for nested types.
///
/// `node_index` and `buffer_index` are cursors into `nodes` and `buffers`;
/// the returned tuple is `(array, next_node_index, next_buffer_index)` so the
/// caller can continue decoding the next column where this one stopped.
fn create_array(
    nodes: &[ipc::FieldNode],
    data_type: &DataType,
    data: &[u8],
    buffers: &[ipc::Buffer],
    dictionaries: &[Option<ArrayRef>],
    mut node_index: usize,
    mut buffer_index: usize,
) -> (ArrayRef, usize, usize) {
    use DataType::*;
    let array = match data_type {
        Utf8 | Binary | LargeBinary | LargeUtf8 => {
            // Variable-length binary/string: validity + offsets + values (3 buffers).
            let array = create_primitive_array(
                &nodes[node_index],
                data_type,
                buffers[buffer_index..buffer_index + 3]
                    .iter()
                    .map(|buf| read_buffer(buf, data))
                    .collect(),
            );
            node_index += 1;
            buffer_index += 3;
            array
        }
        FixedSizeBinary(_) => {
            // Fixed-size binary: validity + values (2 buffers, no offsets).
            let array = create_primitive_array(
                &nodes[node_index],
                data_type,
                buffers[buffer_index..buffer_index + 2]
                    .iter()
                    .map(|buf| read_buffer(buf, data))
                    .collect(),
            );
            node_index += 1;
            buffer_index += 2;
            array
        }
        List(ref list_data_type) | LargeList(ref list_data_type) => {
            // List: validity + offsets belong to the list itself, then recurse
            // into the single child (values) array.
            let list_node = &nodes[node_index];
            let list_buffers: Vec<Buffer> = buffers[buffer_index..buffer_index + 2]
                .iter()
                .map(|buf| read_buffer(buf, data))
                .collect();
            node_index += 1;
            buffer_index += 2;
            let triple = create_array(
                nodes,
                list_data_type,
                data,
                buffers,
                dictionaries,
                node_index,
                buffer_index,
            );
            // Child may have consumed several nodes/buffers; adopt its cursors.
            node_index = triple.1;
            buffer_index = triple.2;
            create_list_array(list_node, data_type, &list_buffers[..], triple.0)
        }
        FixedSizeList(ref list_data_type, _) => {
            // Fixed-size list: only a validity buffer (offsets are implicit),
            // then recurse into the child values array.
            let list_node = &nodes[node_index];
            let list_buffers: Vec<Buffer> = buffers[buffer_index..=buffer_index]
                .iter()
                .map(|buf| read_buffer(buf, data))
                .collect();
            node_index += 1;
            buffer_index += 1;
            let triple = create_array(
                nodes,
                list_data_type,
                data,
                buffers,
                dictionaries,
                node_index,
                buffer_index,
            );
            node_index = triple.1;
            buffer_index = triple.2;
            create_list_array(list_node, data_type, &list_buffers[..], triple.0)
        }
        Struct(struct_fields) => {
            // Struct: one validity buffer of its own, then one child array per
            // struct field, decoded in declaration order.
            let struct_node = &nodes[node_index];
            let null_buffer: Buffer = read_buffer(&buffers[buffer_index], data);
            node_index += 1;
            buffer_index += 1;
            let mut struct_arrays = vec![];
            for struct_field in struct_fields {
                let triple = create_array(
                    nodes,
                    struct_field.data_type(),
                    data,
                    buffers,
                    dictionaries,
                    node_index,
                    buffer_index,
                );
                node_index = triple.1;
                buffer_index = triple.2;
                struct_arrays.push((struct_field.clone(), triple.0));
            }
            let null_count = struct_node.null_count() as usize;
            let struct_array = if null_count > 0 {
                // Attach the validity buffer only when there are actual nulls.
                StructArray::from((
                    struct_arrays,
                    null_buffer,
                    struct_node.null_count() as usize,
                ))
            } else {
                StructArray::from(struct_arrays)
            };
            Arc::new(struct_array)
        }
        Dictionary(_, _) => {
            // Dictionary-encoded column: validity + keys buffers appear here;
            // the values array was decoded earlier from a DictionaryBatch.
            let index_node = &nodes[node_index];
            let index_buffers: Vec<Buffer> = buffers[buffer_index..buffer_index + 2]
                .iter()
                .map(|buf| read_buffer(buf, data))
                .collect();
            // NOTE(review): `dictionaries` is looked up by node position here,
            // which matches how `FileReader::try_new` fills it per schema field
            // — confirm this stays correct for nested dictionary fields.
            let value_array = dictionaries[node_index].clone().unwrap();
            node_index += 1;
            buffer_index += 2;
            create_dictionary_array(
                index_node,
                data_type,
                &index_buffers[..],
                value_array,
            )
        }
        Null => {
            // Null arrays carry no buffers at all, only a length.
            let length = nodes[node_index].length() as usize;
            let data = ArrayData::builder(data_type.clone())
                .len(length)
                .offset(0)
                .build();
            node_index += 1;
            // no buffer increases
            make_array(data)
        }
        _ => {
            // All remaining (primitive) types: validity + values (2 buffers).
            let array = create_primitive_array(
                &nodes[node_index],
                data_type,
                buffers[buffer_index..buffer_index + 2]
                    .iter()
                    .map(|buf| read_buffer(buf, data))
                    .collect(),
            );
            node_index += 1;
            buffer_index += 2;
            array
        }
    };
    (array, node_index, buffer_index)
}
/// Builds a primitive (or variable-length binary/string) `ArrayRef` from the
/// buffers read for a single field node.
///
/// `buffers[0]` is always the validity bitmap; it is only attached when the
/// node reports a non-zero null count.
fn create_primitive_array(
    field_node: &ipc::FieldNode,
    data_type: &DataType,
    buffers: Vec<Buffer>,
) -> ArrayRef {
    let length = field_node.length() as usize;
    let null_count = field_node.null_count() as usize;
    let array_data = match data_type {
        Utf8 | Binary | LargeBinary | LargeUtf8 => {
            // buffers: [validity, offsets, values]
            let mut builder = ArrayData::builder(data_type.clone())
                .len(length)
                .buffers(buffers[1..3].to_vec())
                .offset(0);
            if null_count > 0 {
                builder = builder
                    .null_count(null_count)
                    .null_bit_buffer(buffers[0].clone())
            }
            builder.build()
        }
        FixedSizeBinary(_) => {
            // buffers: [validity, values]
            let mut builder = ArrayData::builder(data_type.clone())
                .len(length)
                .buffers(buffers[1..2].to_vec())
                .offset(0);
            if null_count > 0 {
                builder = builder
                    .null_count(null_count)
                    .null_bit_buffer(buffers[0].clone())
            }
            builder.build()
        }
        Int8
        | Int16
        | Int32
        | UInt8
        | UInt16
        | UInt32
        | Time32(_)
        | Date32(_)
        | Interval(IntervalUnit::YearMonth) => {
            // If the values buffer holds 8 bytes per element, the producer
            // appears to have written the data as 64-bit values; read as Int64
            // and cast down to the declared narrower type.
            // NOTE(review): the `length != 1` guard presumably avoids a false
            // positive when a single narrow value sits in an 8-byte-padded
            // buffer — confirm against the writers this reader targets.
            if buffers[1].len() / 8 == length && length != 1 {
                let mut builder = ArrayData::builder(DataType::Int64)
                    .len(length)
                    .buffers(buffers[1..].to_vec())
                    .offset(0);
                if null_count > 0 {
                    builder = builder
                        .null_count(null_count)
                        .null_bit_buffer(buffers[0].clone())
                }
                let values = Arc::new(Int64Array::from(builder.build())) as ArrayRef;
                let casted = cast(&values, data_type).unwrap();
                casted.data()
            } else {
                // Buffer width matches the declared type; build directly.
                let mut builder = ArrayData::builder(data_type.clone())
                    .len(length)
                    .buffers(buffers[1..].to_vec())
                    .offset(0);
                if null_count > 0 {
                    builder = builder
                        .null_count(null_count)
                        .null_bit_buffer(buffers[0].clone())
                }
                builder.build()
            }
        }
        Float32 => {
            // Same widening workaround as above: detect Float32 data written
            // as 8-byte Float64 values, then cast down.
            if buffers[1].len() / 8 == length && length != 1 {
                let mut builder = ArrayData::builder(DataType::Float64)
                    .len(length)
                    .buffers(buffers[1..].to_vec())
                    .offset(0);
                if null_count > 0 {
                    builder = builder
                        .null_count(null_count)
                        .null_bit_buffer(buffers[0].clone())
                }
                let values = Arc::new(Float64Array::from(builder.build())) as ArrayRef;
                let casted = cast(&values, data_type).unwrap();
                casted.data()
            } else {
                let mut builder = ArrayData::builder(data_type.clone())
                    .len(length)
                    .buffers(buffers[1..].to_vec())
                    .offset(0);
                if null_count > 0 {
                    builder = builder
                        .null_count(null_count)
                        .null_bit_buffer(buffers[0].clone())
                }
                builder.build()
            }
        }
        Boolean
        | Int64
        | UInt64
        | Float64
        | Time64(_)
        | Timestamp(_, _)
        | Date64(_)
        | Duration(_)
        | Interval(IntervalUnit::DayTime) => {
            // 64-bit-wide (or bit-packed Boolean) types need no width fix-up.
            let mut builder = ArrayData::builder(data_type.clone())
                .len(length)
                .buffers(buffers[1..].to_vec())
                .offset(0);
            if null_count > 0 {
                builder = builder
                    .null_count(null_count)
                    .null_bit_buffer(buffers[0].clone())
            }
            builder.build()
        }
        t => panic!("Data type {:?} either unsupported or not primitive", t),
    };
    make_array(array_data)
}
/// Builds a list-typed `ArrayRef` (`List` or `FixedSizeList`) from the list's
/// own buffers plus the already-decoded child (values) array.
///
/// Panics if `data_type` is not a list type.
fn create_list_array(
    field_node: &ipc::FieldNode,
    data_type: &DataType,
    buffers: &[Buffer],
    child_array: ArrayRef,
) -> ArrayRef {
    // Variable-size lists carry an offsets buffer after the validity bitmap;
    // fixed-size lists have no offsets at all.
    let value_buffers: Vec<Buffer> = match *data_type {
        DataType::List(_) => buffers[1..2].to_vec(),
        DataType::FixedSizeList(_, _) => Vec::new(),
        _ => panic!("Cannot create list array from {:?}", data_type),
    };
    let null_count = field_node.null_count() as usize;
    let mut builder = ArrayData::builder(data_type.clone())
        .len(field_node.length() as usize)
        .buffers(value_buffers)
        .offset(0)
        .child_data(vec![child_array.data()]);
    // Attach the validity bitmap only when nulls are present.
    if null_count > 0 {
        builder = builder
            .null_count(null_count)
            .null_bit_buffer(buffers[0].clone())
    }
    make_array(builder.build())
}
/// Builds a dictionary-encoded `ArrayRef` from the keys buffers and the
/// previously decoded dictionary values array.
fn create_dictionary_array(
    field_node: &ipc::FieldNode,
    data_type: &DataType,
    buffers: &[Buffer],
    value_array: ArrayRef,
) -> ArrayRef {
    match *data_type {
        DataType::Dictionary(_, _) => {
            let null_count = field_node.null_count() as usize;
            // buffers[0] is validity, buffers[1] holds the dictionary keys;
            // the values array is attached as child data.
            let mut builder = ArrayData::builder(data_type.clone())
                .len(field_node.length() as usize)
                .buffers(buffers[1..2].to_vec())
                .offset(0)
                .child_data(vec![value_array.data()]);
            if null_count > 0 {
                builder = builder
                    .null_count(null_count)
                    .null_bit_buffer(buffers[0].clone())
            }
            make_array(builder.build())
        }
        _ => unreachable!("Cannot create dictionary array from {:?}", data_type),
    }
}
/// Creates a `RecordBatch` from the flatbuffer `RecordBatch` metadata and the
/// raw message body `buf`, decoding one column per schema field.
///
/// `dictionaries` supplies the already-decoded values arrays for any
/// dictionary-encoded fields.
///
/// # Errors
/// Returns an error if the IPC metadata is missing its buffer or field-node
/// lists, or if the decoded columns do not match `schema`.
pub(crate) fn read_record_batch(
    buf: &[u8],
    batch: ipc::RecordBatch,
    schema: SchemaRef,
    dictionaries: &[Option<ArrayRef>],
) -> Result<Option<RecordBatch>> {
    let buffers = batch.buffers().ok_or_else(|| {
        ArrowError::IoError("Unable to get buffers from IPC RecordBatch".to_string())
    })?;
    let field_nodes = batch.nodes().ok_or_else(|| {
        ArrowError::IoError("Unable to get field nodes from IPC RecordBatch".to_string())
    })?;
    // Cursors into the flattened node/buffer lists; each column advances them
    // past whatever it consumed (children included).
    let mut node_index = 0;
    let mut buffer_index = 0;
    let mut arrays = Vec::with_capacity(schema.fields().len());
    for field in schema.fields() {
        let (array, next_node, next_buffer) = create_array(
            field_nodes,
            field.data_type(),
            buf, // was `&buf` — a redundant double reference
            buffers,
            dictionaries,
            node_index,
            buffer_index,
        );
        node_index = next_node;
        buffer_index = next_buffer;
        arrays.push(array);
    }
    RecordBatch::try_new(schema, arrays).map(Some)
}
fn find_dictionary_field(ipc_schema: &ipc::Schema, id: i64) -> Option<usize> {
let fields = ipc_schema.fields().unwrap();
for i in 0..fields.len() {
let field: ipc::Field = fields.get(i);
if let Some(dictionary) = field.dictionary() {
if dictionary.id() == id {
return Some(i);
}
}
}
None
}
/// Reader for the Arrow IPC file format, requiring random access (`Seek`).
pub struct FileReader<R: Read + Seek> {
    /// Buffered reader over the underlying file.
    reader: BufReader<R>,
    /// Schema decoded from the file footer.
    schema: SchemaRef,
    /// Record-batch block locations listed in the footer.
    blocks: Vec<ipc::Block>,
    /// Index of the next block to read.
    current_block: usize,
    /// Total number of record-batch blocks in the file.
    total_blocks: usize,
    /// Dictionary values per schema field, pre-loaded from the footer's
    /// dictionary blocks (`None` for non-dictionary fields).
    dictionaries_by_field: Vec<Option<ArrayRef>>,
}
impl<R: Read + Seek> FileReader<R> {
    /// Try to create a new file reader.
    ///
    /// Validates the `ARROW1` magic bytes at both ends of the file, decodes
    /// the footer (schema + block locations), and eagerly loads every
    /// dictionary batch so dictionary-encoded columns can be decoded later.
    ///
    /// # Errors
    /// Returns an error if the magic bytes are missing, the footer cannot be
    /// read, or any I/O operation fails.
    pub fn try_new(reader: R) -> Result<Self> {
        let mut reader = BufReader::new(reader);
        // Check that the file starts with the magic number.
        let mut magic_buffer: [u8; 6] = [0; 6];
        reader.read_exact(&mut magic_buffer)?;
        if magic_buffer != super::ARROW_MAGIC {
            return Err(ArrowError::IoError(
                "Arrow file does not contain correct header".to_string(),
            ));
        }
        // The same magic number must also terminate the file.
        reader.seek(SeekFrom::End(-6))?;
        reader.read_exact(&mut magic_buffer)?;
        if magic_buffer != super::ARROW_MAGIC {
            return Err(ArrowError::IoError(
                "Arrow file does not contain correct footer".to_string(),
            ));
        }
        // The 4 bytes immediately before the trailing magic hold the footer length.
        let mut footer_size: [u8; 4] = [0; 4];
        reader.seek(SeekFrom::End(-10))?;
        reader.read_exact(&mut footer_size)?;
        let footer_len = u32::from_le_bytes(footer_size);
        // Read the footer flatbuffer itself.
        let mut footer_data = vec![0; footer_len as usize];
        reader.seek(SeekFrom::End(-10 - footer_len as i64))?;
        reader.read_exact(&mut footer_data)?;
        let footer = ipc::get_root_as_footer(&footer_data[..]);
        let blocks = footer.recordBatches().ok_or_else(|| {
            ArrowError::IoError(
                "Unable to get record batches from IPC Footer".to_string(),
            )
        })?;
        let total_blocks = blocks.len();
        let ipc_schema = footer.schema().unwrap();
        let schema = ipc::convert::fb_to_schema(ipc_schema);
        // Eagerly decode every dictionary batch, keyed by schema field index.
        let mut dictionaries_by_field = vec![None; schema.fields().len()];
        for block in footer.dictionaries().unwrap() {
            // The recorded metadata length includes the 4-byte length prefix,
            // which we skip by seeking 4 bytes past the block offset.
            let meta_len = block.metaDataLength() - 4;
            let mut block_data = vec![0; meta_len as usize];
            reader.seek(SeekFrom::Start(block.offset() as u64 + 4))?;
            reader.read_exact(&mut block_data)?;
            let message = ipc::get_root_as_message(&block_data[..]);
            match message.header_type() {
                ipc::MessageHeader::DictionaryBatch => {
                    let batch = message.header_as_dictionary_batch().unwrap();
                    // Read the message body, which holds the buffer data.
                    let mut buf = vec![0; block.bodyLength() as usize];
                    reader.seek(SeekFrom::Start(
                        block.offset() as u64 + block.metaDataLength() as u64,
                    ))?;
                    reader.read_exact(&mut buf)?;
                    if batch.isDelta() {
                        panic!("delta dictionary batches not supported");
                    }
                    let id = batch.id();
                    // Find a field using this dictionary id so we can recover
                    // the value type from the schema (the batch itself does not
                    // carry it).
                    // (fixed typo: message previously read "shchema")
                    let first_field = find_dictionary_field(&ipc_schema, id)
                        .expect("dictionary id not found in schema");
                    let dictionary_values: ArrayRef =
                        match schema.field(first_field).data_type() {
                            DataType::Dictionary(_, ref value_type) => {
                                // Synthesize a one-column schema for the values.
                                let schema = Schema {
                                    fields: vec![Field::new(
                                        "",
                                        value_type.as_ref().clone(),
                                        false,
                                    )],
                                    metadata: HashMap::new(),
                                };
                                // Decode the values as a single-column batch.
                                let record_batch = read_record_batch(
                                    &buf,
                                    batch.data().unwrap(),
                                    Arc::new(schema),
                                    &dictionaries_by_field,
                                )?
                                .unwrap();
                                Some(record_batch.column(0).clone())
                            }
                            _ => None,
                        }
                        .expect("dictionary id not found in schema");
                    // Share the decoded values with every field that references
                    // the same dictionary id.
                    let fields = ipc_schema.fields().unwrap();
                    for (i, field) in fields.iter().enumerate() {
                        if let Some(dictionary) = field.dictionary() {
                            if dictionary.id() == id {
                                dictionaries_by_field[i] =
                                    Some(dictionary_values.clone());
                            }
                        }
                    }
                }
                _ => panic!("Expecting DictionaryBatch in dictionary blocks."),
            };
        }
        Ok(Self {
            reader,
            schema: Arc::new(schema),
            blocks: blocks.to_vec(),
            current_block: 0,
            total_blocks,
            dictionaries_by_field,
        })
    }

    /// Return the number of record-batch blocks in the file.
    pub fn num_batches(&self) -> usize {
        self.total_blocks
    }

    /// Return the schema decoded from the file footer.
    pub fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    /// Position the reader at a specific batch index, allowing random reads.
    ///
    /// # Errors
    /// Returns an error if `index` is out of range.
    pub fn set_index(&mut self, index: usize) -> Result<()> {
        if index >= self.total_blocks {
            Err(ArrowError::IoError(format!(
                "Cannot set batch to index {} from {} total batches",
                index, self.total_blocks
            )))
        } else {
            self.current_block = index;
            Ok(())
        }
    }
}
impl<R: Read + Seek> RecordBatchReader for FileReader<R> {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    /// Read the record batch at the current block, advancing the cursor.
    ///
    /// Returns `Ok(None)` once every block has been read.
    fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
        if self.current_block < self.total_blocks {
            let block = self.blocks[self.current_block];
            self.current_block += 1;
            // The recorded metadata length includes the 4-byte length prefix,
            // skipped here by seeking 4 bytes past the block offset.
            let meta_len = block.metaDataLength() - 4;
            let mut block_data = vec![0; meta_len as usize];
            self.reader
                .seek(SeekFrom::Start(block.offset() as u64 + 4))?;
            self.reader.read_exact(&mut block_data)?;
            let message = ipc::get_root_as_message(&block_data[..]);
            match message.header_type() {
                ipc::MessageHeader::Schema => Err(ArrowError::IoError(
                    "Not expecting a schema when messages are read".to_string(),
                )),
                ipc::MessageHeader::RecordBatch => {
                    let batch = message.header_as_record_batch().ok_or_else(|| {
                        ArrowError::IoError(
                            "Unable to read IPC message as record batch".to_string(),
                        )
                    })?;
                    // Read the message body (the buffer data) that follows the
                    // metadata within this block.
                    let mut buf = vec![0; block.bodyLength() as usize];
                    self.reader.seek(SeekFrom::Start(
                        block.offset() as u64 + block.metaDataLength() as u64,
                    ))?;
                    self.reader.read_exact(&mut buf)?;
                    read_record_batch(
                        &buf,
                        batch,
                        self.schema(),
                        &self.dictionaries_by_field,
                    )
                }
                ipc::MessageHeader::NONE => {
                    // An empty message header yields no batch.
                    Ok(None)
                }
                t => Err(ArrowError::IoError(format!(
                    "Reading types other than record batches not yet supported, unable to read {:?}", t
                ))),
            }
        } else {
            Ok(None)
        }
    }
}
/// Reader for the Arrow IPC streaming format (sequential access only).
pub struct StreamReader<R: Read> {
    /// Buffered reader over the underlying stream.
    reader: BufReader<R>,
    /// Schema decoded from the stream's first message.
    schema: SchemaRef,
    /// Set to `true` once EOF or a zero-length end-of-stream marker is seen.
    finished: bool,
    /// Dictionary values per schema field (`None` for non-dictionary fields).
    dictionaries_by_field: Vec<Option<ArrayRef>>,
}
impl<R: Read> StreamReader<R> {
    /// Try to create a new stream reader by reading and decoding the stream's
    /// leading schema message.
    ///
    /// # Errors
    /// Returns an error if the first message is not a schema or if I/O fails.
    pub fn try_new(reader: R) -> Result<Self> {
        let mut reader = BufReader::new(reader);
        // Read the length of the first (schema) message's metadata.
        let mut meta_size: [u8; 4] = [0; 4];
        reader.read_exact(&mut meta_size)?;
        let meta_len = {
            let meta_len = u32::from_le_bytes(meta_size);
            // A continuation marker means the real length is in the next 4 bytes.
            if meta_len == CONTINUATION_MARKER {
                reader.read_exact(&mut meta_size)?;
                u32::from_le_bytes(meta_size)
            } else {
                meta_len
            }
        };
        let mut meta_buffer = vec![0; meta_len as usize];
        reader.read_exact(&mut meta_buffer)?;
        // Parse the flatbuffer directly from the bytes we just read; the
        // previous `.to_vec()` made a needless extra allocation and copy.
        let message = ipc::get_root_as_message(&meta_buffer[..]);
        let ipc_schema: ipc::Schema = message.header_as_schema().ok_or_else(|| {
            ArrowError::IoError("Unable to read IPC message as schema".to_string())
        })?;
        let schema = ipc::convert::fb_to_schema(ipc_schema);
        // Dictionaries are filled in lazily as dictionary messages arrive.
        let dictionaries_by_field = vec![None; schema.fields().len()];
        Ok(Self {
            reader,
            schema: Arc::new(schema),
            finished: false,
            dictionaries_by_field,
        })
    }

    /// Return the schema decoded from the stream's first message.
    pub fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    /// Check if the stream is finished (EOF or end-of-stream marker seen).
    pub fn is_finished(&self) -> bool {
        self.finished
    }
}
impl<R: Read> RecordBatchReader for StreamReader<R> {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    /// Read the next record batch from the stream.
    ///
    /// Returns `Ok(None)` once the stream is exhausted — either a clean EOF or
    /// an explicit zero-length end-of-stream marker.
    fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
        if self.finished {
            return Ok(None);
        }
        // Read the next metadata length; an unexpected EOF here means the
        // producer ended the stream without an end-of-stream marker, which we
        // treat as a normal end.
        let mut meta_size: [u8; 4] = [0; 4];
        match self.reader.read_exact(&mut meta_size) {
            Ok(()) => (),
            Err(e) => {
                return if e.kind() == std::io::ErrorKind::UnexpectedEof {
                    self.finished = true;
                    Ok(None)
                } else {
                    Err(ArrowError::from(e))
                };
            }
        }
        let meta_len = {
            let meta_len = u32::from_le_bytes(meta_size);
            // A continuation marker means the real length is in the next 4 bytes.
            if meta_len == CONTINUATION_MARKER {
                self.reader.read_exact(&mut meta_size)?;
                u32::from_le_bytes(meta_size)
            } else {
                meta_len
            }
        };
        if meta_len == 0 {
            // Explicit end-of-stream marker.
            self.finished = true;
            return Ok(None);
        }
        let mut meta_buffer = vec![0; meta_len as usize];
        self.reader.read_exact(&mut meta_buffer)?;
        // Parse the flatbuffer directly from the bytes we just read; the
        // previous `.to_vec()` made a needless extra allocation and copy.
        let message = ipc::get_root_as_message(&meta_buffer[..]);
        match message.header_type() {
            ipc::MessageHeader::Schema => Err(ArrowError::IoError(
                "Not expecting a schema when messages are read".to_string(),
            )),
            ipc::MessageHeader::RecordBatch => {
                let batch = message.header_as_record_batch().ok_or_else(|| {
                    ArrowError::IoError(
                        "Unable to read IPC message as record batch".to_string(),
                    )
                })?;
                // Read the message body, which holds the buffer data.
                let mut buf = vec![0; message.bodyLength() as usize];
                self.reader.read_exact(&mut buf)?;
                read_record_batch(&buf, batch, self.schema(), &self.dictionaries_by_field)
            }
            ipc::MessageHeader::NONE => {
                // An empty message header yields no batch.
                Ok(None)
            }
            t => Err(ArrowError::IoError(
                format!("Reading types other than record batches not yet supported, unable to read {:?} ", t)
            )),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use flate2::read::GzDecoder;
    use crate::util::integration_util::*;
    use std::env;
    use std::fs::File;

    /// Round-trip check: read each 0.14.1 integration `.arrow_file` and compare
    /// it against the gzipped JSON reference produced by the integration suite.
    #[test]
    fn read_generated_files() {
        let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined");
        let paths = vec![
            "generated_interval",
            "generated_datetime",
            "generated_dictionary",
            "generated_nested",
            "generated_primitive_no_batches",
            "generated_primitive_zerolength",
            "generated_primitive",
        ];
        paths.iter().for_each(|path| {
            let file = File::open(format!(
                "{}/arrow-ipc-stream/integration/0.14.1/{}.arrow_file",
                testdata, path
            ))
            .unwrap();
            let mut reader = FileReader::try_new(file).unwrap();
            // Compare the decoded batches against the JSON reference data.
            let arrow_json = read_gzip_json(path);
            assert!(arrow_json.equals_reader(&mut reader));
        });
    }

    /// Same check for the streaming format (`.stream` files), plus a check
    /// that the reader reports being finished after the last batch.
    #[test]
    fn read_generated_streams() {
        let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined");
        let paths = vec![
            "generated_interval",
            "generated_datetime",
            "generated_nested",
            "generated_primitive_no_batches",
            "generated_primitive_zerolength",
            "generated_primitive",
        ];
        paths.iter().for_each(|path| {
            let file = File::open(format!(
                "{}/arrow-ipc-stream/integration/0.14.1/{}.stream",
                testdata, path
            ))
            .unwrap();
            let mut reader = StreamReader::try_new(file).unwrap();
            let arrow_json = read_gzip_json(path);
            assert!(arrow_json.equals_reader(&mut reader));
            // The next batch after the end must be None, and the reader should
            // now report the stream as finished.
            assert!(reader.next_batch().unwrap().is_none());
            assert!(reader.is_finished());
        });
    }

    /// Write a single-row batch through the stream writer, read it back, and
    /// check the float columns survived the round trip (non-zero values).
    #[test]
    fn test_arrow_single_float_row() {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Float32, false),
            Field::new("b", DataType::Float32, false),
            Field::new("c", DataType::Int32, false),
            Field::new("d", DataType::Int32, false),
        ]);
        let arrays = vec![
            Arc::new(Float32Array::from(vec![1.23])) as ArrayRef,
            Arc::new(Float32Array::from(vec![-6.50])) as ArrayRef,
            Arc::new(Int32Array::from(vec![2])) as ArrayRef,
            Arc::new(Int32Array::from(vec![1])) as ArrayRef,
        ];
        let batch = RecordBatch::try_new(Arc::new(schema.clone()), arrays).unwrap();
        // Write the stream to a scratch file under target/.
        let file = File::create("target/debug/testdata/float.stream").unwrap();
        let mut stream_writer =
            crate::ipc::writer::StreamWriter::try_new(file, &schema).unwrap();
        stream_writer.write(&batch).unwrap();
        stream_writer.finish().unwrap();
        // Read the file back and verify the float columns.
        let file = File::open("target/debug/testdata/float.stream").unwrap();
        let mut reader = StreamReader::try_new(file).unwrap();
        while let Some(batch) = reader.next_batch().unwrap() {
            assert!(
                batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<Float32Array>()
                    .unwrap()
                    .value(0)
                    != 0.0
            );
            assert!(
                batch
                    .column(1)
                    .as_any()
                    .downcast_ref::<Float32Array>()
                    .unwrap()
                    .value(0)
                    != 0.0
            );
        }
    }

    /// Load the gzipped JSON reference file for an integration test case.
    fn read_gzip_json(path: &str) -> ArrowJson {
        let testdata = env::var("ARROW_TEST_DATA").expect("ARROW_TEST_DATA not defined");
        let file = File::open(format!(
            "{}/arrow-ipc-stream/integration/0.14.1/{}.json.gz",
            testdata, path
        ))
        .unwrap();
        let mut gz = GzDecoder::new(&file);
        let mut s = String::new();
        gz.read_to_string(&mut s).unwrap();
        let arrow_json: ArrowJson = serde_json::from_str(&s).unwrap();
        arrow_json
    }
}