use std::collections::HashMap;
use std::io::{BufReader, Read, Seek, SeekFrom};
use std::sync::Arc;
use crate::array::*;
use crate::buffer::Buffer;
use crate::compute::cast;
use crate::datatypes::{DataType, Field, IntervalUnit, Schema, SchemaRef};
use crate::error::{ArrowError, Result};
use crate::ipc;
use crate::record_batch::{RecordBatch, RecordBatchReader};
use ipc::CONTINUATION_MARKER;
use DataType::*;
/// Copy the byte range described by an IPC buffer descriptor out of the
/// message body `a_data` into an owned `Buffer`.
fn read_buffer(buf: &ipc::Buffer, a_data: &[u8]) -> Buffer {
    let start_offset = buf.offset() as usize;
    let end_offset = start_offset + buf.length() as usize;
    // Fix: pass the slice directly instead of a redundant `&&[u8]` double
    // reference (the original only compiled via the generic `AsRef` impl).
    Buffer::from(&a_data[start_offset..end_offset])
}
/// Reconstruct one array for `data_type` from the flattened IPC record batch
/// components, recursing into child arrays for nested types.
///
/// * `nodes` - field nodes of the batch, in depth-first order.
/// * `data` - the raw message body that the `buffers` offsets point into.
/// * `buffers` - buffer descriptors of the batch, in depth-first order.
/// * `dictionaries` - previously-read dictionary values, one slot per field.
/// * `node_index` / `buffer_index` - cursors into `nodes` / `buffers`.
///
/// Returns the array plus the advanced node and buffer cursors so the caller
/// can decode the next field.
fn create_array(
    nodes: &[ipc::FieldNode],
    data_type: &DataType,
    data: &[u8],
    buffers: &[ipc::Buffer],
    dictionaries: &[Option<ArrayRef>],
    mut node_index: usize,
    mut buffer_index: usize,
) -> (ArrayRef, usize, usize) {
    use DataType::*;
    let array = match data_type {
        // Variable-length types carry 3 buffers: [validity, offsets, values].
        Utf8 | Binary | LargeBinary | LargeUtf8 => {
            let array = create_primitive_array(
                &nodes[node_index],
                data_type,
                buffers[buffer_index..buffer_index + 3]
                    .iter()
                    .map(|buf| read_buffer(buf, data))
                    .collect(),
            );
            node_index += 1;
            buffer_index += 3;
            array
        }
        // Fixed-size binary carries 2 buffers: [validity, values].
        FixedSizeBinary(_) => {
            let array = create_primitive_array(
                &nodes[node_index],
                data_type,
                buffers[buffer_index..buffer_index + 2]
                    .iter()
                    .map(|buf| read_buffer(buf, data))
                    .collect(),
            );
            node_index += 1;
            buffer_index += 2;
            array
        }
        // Lists carry 2 buffers ([validity, offsets]) plus one child array.
        List(ref list_field) | LargeList(ref list_field) => {
            let list_node = &nodes[node_index];
            let list_buffers: Vec<Buffer> = buffers[buffer_index..buffer_index + 2]
                .iter()
                .map(|buf| read_buffer(buf, data))
                .collect();
            node_index += 1;
            buffer_index += 2;
            // Recursively decode the child values array, advancing the cursors.
            let triple = create_array(
                nodes,
                list_field.data_type(),
                data,
                buffers,
                dictionaries,
                node_index,
                buffer_index,
            );
            node_index = triple.1;
            buffer_index = triple.2;
            create_list_array(list_node, data_type, &list_buffers[..], triple.0)
        }
        // Fixed-size lists have no offsets: 1 buffer ([validity]) plus a child.
        FixedSizeList(ref list_field, _) => {
            let list_node = &nodes[node_index];
            let list_buffers: Vec<Buffer> = buffers[buffer_index..=buffer_index]
                .iter()
                .map(|buf| read_buffer(buf, data))
                .collect();
            node_index += 1;
            buffer_index += 1;
            let triple = create_array(
                nodes,
                list_field.data_type(),
                data,
                buffers,
                dictionaries,
                node_index,
                buffer_index,
            );
            node_index = triple.1;
            buffer_index = triple.2;
            create_list_array(list_node, data_type, &list_buffers[..], triple.0)
        }
        // Structs carry a validity buffer plus one child array per field.
        Struct(struct_fields) => {
            let struct_node = &nodes[node_index];
            let null_buffer: Buffer = read_buffer(&buffers[buffer_index], data);
            node_index += 1;
            buffer_index += 1;
            // Decode children in schema order, threading the cursors through.
            let mut struct_arrays = vec![];
            for struct_field in struct_fields {
                let triple = create_array(
                    nodes,
                    struct_field.data_type(),
                    data,
                    buffers,
                    dictionaries,
                    node_index,
                    buffer_index,
                );
                node_index = triple.1;
                buffer_index = triple.2;
                struct_arrays.push((struct_field.clone(), triple.0));
            }
            let null_count = struct_node.null_count() as usize;
            // Only attach the validity buffer when the node reports nulls.
            let struct_array = if null_count > 0 {
                StructArray::from((struct_arrays, null_buffer))
            } else {
                StructArray::from(struct_arrays)
            };
            Arc::new(struct_array)
        }
        // Dictionary-encoded columns carry [validity, keys]; the values were
        // read earlier from dictionary messages into `dictionaries`.
        Dictionary(_, _) => {
            let index_node = &nodes[node_index];
            let index_buffers: Vec<Buffer> = buffers[buffer_index..buffer_index + 2]
                .iter()
                .map(|buf| read_buffer(buf, data))
                .collect();
            // NOTE(review): values are looked up by `node_index`, which only
            // matches the field position for top-level fields — confirm the
            // behaviour for dictionaries nested inside lists/structs.
            let value_array = dictionaries[node_index].clone().unwrap();
            node_index += 1;
            buffer_index += 2;
            create_dictionary_array(
                index_node,
                data_type,
                &index_buffers[..],
                value_array,
            )
        }
        // Null arrays have a field node but consume no buffers.
        Null => {
            let length = nodes[node_index].length() as usize;
            let data = ArrayData::builder(data_type.clone())
                .len(length)
                .offset(0)
                .build();
            node_index += 1;
            make_array(data)
        }
        // All remaining (primitive) types carry 2 buffers: [validity, values].
        _ => {
            let array = create_primitive_array(
                &nodes[node_index],
                data_type,
                buffers[buffer_index..buffer_index + 2]
                    .iter()
                    .map(|buf| read_buffer(buf, data))
                    .collect(),
            );
            node_index += 1;
            buffer_index += 2;
            array
        }
    };
    (array, node_index, buffer_index)
}
/// Build a primitive (or variable-length binary/string) array from its field
/// node and already-materialised buffers.
///
/// By IPC layout, `buffers[0]` is the validity bitmap (attached only when the
/// node reports nulls); the remaining buffers depend on the type.
///
/// # Panics
/// Panics if `data_type` is not a type this function supports.
fn create_primitive_array(
    field_node: &ipc::FieldNode,
    data_type: &DataType,
    buffers: Vec<Buffer>,
) -> ArrayRef {
    let length = field_node.length() as usize;
    let null_count = field_node.null_count() as usize;
    let array_data = match data_type {
        // [validity, offsets, values]
        Utf8 | Binary | LargeBinary | LargeUtf8 => {
            let mut builder = ArrayData::builder(data_type.clone())
                .len(length)
                .buffers(buffers[1..3].to_vec())
                .offset(0);
            if null_count > 0 {
                builder = builder.null_bit_buffer(buffers[0].clone())
            }
            builder.build()
        }
        // [validity, values] — values are `length * byte_width` bytes.
        FixedSizeBinary(_) => {
            let mut builder = ArrayData::builder(data_type.clone())
                .len(length)
                .buffers(buffers[1..2].to_vec())
                .offset(0);
            if null_count > 0 {
                builder = builder.null_bit_buffer(buffers[0].clone())
            }
            builder.build()
        }
        Int8
        | Int16
        | Int32
        | UInt8
        | UInt16
        | UInt32
        | Time32(_)
        | Date32(_)
        | Interval(IntervalUnit::YearMonth) => {
            // Heuristic: if the values buffer holds 8 bytes per slot, the
            // writer stored 64-bit values; read them as Int64 and cast down
            // to the declared 32-bit-or-smaller type. The `length != 1` guard
            // avoids a false positive for a single narrow value sitting in a
            // padded 8-byte buffer. NOTE(review): presumably accommodates
            // producers that widen values on the wire — confirm.
            if buffers[1].len() / 8 == length && length != 1 {
                let mut builder = ArrayData::builder(DataType::Int64)
                    .len(length)
                    .buffers(buffers[1..].to_vec())
                    .offset(0);
                if null_count > 0 {
                    builder = builder.null_bit_buffer(buffers[0].clone())
                }
                let values = Arc::new(Int64Array::from(builder.build())) as ArrayRef;
                let casted = cast(&values, data_type).unwrap();
                casted.data()
            } else {
                // Values already have the declared width; build directly.
                let mut builder = ArrayData::builder(data_type.clone())
                    .len(length)
                    .buffers(buffers[1..].to_vec())
                    .offset(0);
                if null_count > 0 {
                    builder = builder.null_bit_buffer(buffers[0].clone())
                }
                builder.build()
            }
        }
        Float32 => {
            // Same widened-values heuristic as above, for Float64 -> Float32.
            if buffers[1].len() / 8 == length && length != 1 {
                let mut builder = ArrayData::builder(DataType::Float64)
                    .len(length)
                    .buffers(buffers[1..].to_vec())
                    .offset(0);
                if null_count > 0 {
                    builder = builder.null_bit_buffer(buffers[0].clone())
                }
                let values = Arc::new(Float64Array::from(builder.build())) as ArrayRef;
                let casted = cast(&values, data_type).unwrap();
                casted.data()
            } else {
                let mut builder = ArrayData::builder(data_type.clone())
                    .len(length)
                    .buffers(buffers[1..].to_vec())
                    .offset(0);
                if null_count > 0 {
                    builder = builder.null_bit_buffer(buffers[0].clone())
                }
                builder.build()
            }
        }
        // 64-bit (or bit-packed boolean) types need no widening treatment.
        Boolean
        | Int64
        | UInt64
        | Float64
        | Time64(_)
        | Timestamp(_, _)
        | Date64(_)
        | Duration(_)
        | Interval(IntervalUnit::DayTime) => {
            let mut builder = ArrayData::builder(data_type.clone())
                .len(length)
                .buffers(buffers[1..].to_vec())
                .offset(0);
            if null_count > 0 {
                builder = builder.null_bit_buffer(buffers[0].clone())
            }
            builder.build()
        }
        // [validity, values] — 128-bit decimal values.
        Decimal(_, _) => {
            let mut builder = ArrayData::builder(data_type.clone())
                .len(length)
                .buffers(buffers[1..2].to_vec())
                .offset(0);
            if null_count > 0 {
                builder = builder.null_bit_buffer(buffers[0].clone())
            }
            builder.build()
        }
        t => panic!("Data type {:?} either unsupported or not primitive", t),
    };
    make_array(array_data)
}
/// Build a list-like array (`List`, `LargeList`, or `FixedSizeList`) from its
/// field node, its own buffers, and the already-decoded child array.
///
/// `buffers[0]` is the validity bitmap (attached only when the node reports
/// nulls). `List`/`LargeList` additionally carry an offsets buffer at
/// `buffers[1]`; `FixedSizeList` has no value buffers of its own.
///
/// # Panics
/// Panics if `data_type` is not a list type.
fn create_list_array(
    field_node: &ipc::FieldNode,
    data_type: &DataType,
    buffers: &[Buffer],
    child_array: ArrayRef,
) -> ArrayRef {
    let null_count = field_node.null_count() as usize;
    // Shared construction for every list flavour; only the value buffers
    // differ (the three original branches were otherwise identical).
    let build = |value_buffers: Vec<Buffer>| {
        let mut builder = ArrayData::builder(data_type.clone())
            .len(field_node.length() as usize)
            .buffers(value_buffers)
            .offset(0)
            .child_data(vec![child_array.data()]);
        if null_count > 0 {
            builder = builder.null_bit_buffer(buffers[0].clone())
        }
        make_array(builder.build())
    };
    match data_type {
        // Offsets buffer follows the validity bitmap.
        DataType::List(_) | DataType::LargeList(_) => build(buffers[1..2].to_vec()),
        // Fixed-size lists have no offsets (original used the empty slice
        // `buffers[1..1]`).
        DataType::FixedSizeList(_, _) => build(Vec::new()),
        _ => panic!("Cannot create list array from {:?}", data_type),
    }
}
/// Build a dictionary array from its key buffers and the previously-read
/// dictionary values array.
///
/// `buffers[0]` is the validity bitmap (attached only when the node reports
/// nulls) and `buffers[1]` holds the dictionary keys.
fn create_dictionary_array(
    field_node: &ipc::FieldNode,
    data_type: &DataType,
    buffers: &[Buffer],
    value_array: ArrayRef,
) -> ArrayRef {
    match *data_type {
        DataType::Dictionary(_, _) => {
            let null_count = field_node.null_count() as usize;
            let mut builder = ArrayData::builder(data_type.clone())
                .len(field_node.length() as usize)
                .buffers(buffers[1..2].to_vec())
                .offset(0)
                .child_data(vec![value_array.data()]);
            if null_count > 0 {
                builder = builder.null_bit_buffer(buffers[0].clone());
            }
            make_array(builder.build())
        }
        // Callers only reach this function with a dictionary type.
        _ => unreachable!("Cannot create dictionary array from {:?}", data_type),
    }
}
/// Decode a `RecordBatch` from an IPC `RecordBatch` message.
///
/// * `buf` - the message body holding the flattened buffers.
/// * `batch` - the flatbuffer record-batch metadata.
/// * `schema` - the schema the batch must conform to.
/// * `dictionaries` - previously-read dictionary values, one slot per field.
///
/// # Errors
/// Returns an error if the metadata is missing its buffers or field nodes,
/// or if the assembled columns do not form a valid `RecordBatch`.
pub fn read_record_batch(
    buf: &[u8],
    batch: ipc::RecordBatch,
    schema: SchemaRef,
    dictionaries: &[Option<ArrayRef>],
) -> Result<RecordBatch> {
    let buffers = batch.buffers().ok_or_else(|| {
        ArrowError::IoError("Unable to get buffers from IPC RecordBatch".to_string())
    })?;
    let field_nodes = batch.nodes().ok_or_else(|| {
        ArrowError::IoError("Unable to get field nodes from IPC RecordBatch".to_string())
    })?;
    // Cursors into the depth-first flattened nodes/buffers; each field decode
    // advances them past everything it (and its children) consumed.
    let mut buffer_index = 0;
    let mut node_index = 0;
    let mut arrays = vec![];
    for field in schema.fields() {
        // Fix: pass `buf` directly (the original's `&buf` created a needless
        // `&&[u8]` that only worked through deref coercion).
        let (array, next_node, next_buffer) = create_array(
            field_nodes,
            field.data_type(),
            buf,
            buffers,
            dictionaries,
            node_index,
            buffer_index,
        );
        node_index = next_node;
        buffer_index = next_buffer;
        arrays.push(array);
    }
    RecordBatch::try_new(schema, arrays)
}
/// Read an IPC `DictionaryBatch` message and store its values for every
/// schema field that uses the batch's dictionary id.
///
/// * `buf` - the message body holding the dictionary's buffers.
/// * `dictionaries_by_field` - updated in place, one slot per schema field.
///
/// # Errors
/// Returns an error for delta dictionaries (unsupported), when the dictionary
/// id is not present in `schema`, or when the batch data cannot be read.
pub fn read_dictionary(
    buf: &[u8],
    batch: ipc::DictionaryBatch,
    schema: &Schema,
    dictionaries_by_field: &mut [Option<ArrayRef>],
) -> Result<()> {
    if batch.isDelta() {
        return Err(ArrowError::IoError(
            "delta dictionary batches not supported".to_string(),
        ));
    }
    let id = batch.id();
    let fields_using_this_dictionary = schema.fields_with_dict_id(id);
    let first_field = fields_using_this_dictionary.first().ok_or_else(|| {
        ArrowError::InvalidArgumentError("dictionary id not found in schema".to_string())
    })?;
    // The dictionary values are encoded as a one-column record batch of the
    // dictionary's *value* type.
    let dictionary_values: ArrayRef = match first_field.data_type() {
        DataType::Dictionary(_, ref value_type) => {
            let value_schema = Schema {
                fields: vec![Field::new("", value_type.as_ref().clone(), false)],
                metadata: HashMap::new(),
            };
            // Fix: surface a missing data table as an error instead of the
            // original `unwrap()` panic on malformed input.
            let batch_data = batch.data().ok_or_else(|| {
                ArrowError::IoError(
                    "Unable to get data from IPC DictionaryBatch".to_string(),
                )
            })?;
            let record_batch = read_record_batch(
                buf,
                batch_data,
                Arc::new(value_schema),
                // Explicit shared reborrow of the mutable slice.
                &*dictionaries_by_field,
            )?;
            record_batch.column(0).clone()
        }
        // `fields_with_dict_id` matched this field, so a non-dictionary type
        // here means the schema lookup effectively failed.
        _ => {
            return Err(ArrowError::InvalidArgumentError(
                "dictionary id not found in schema".to_string(),
            ))
        }
    };
    // Share the values with every field keyed by this dictionary id.
    for (i, field) in schema.fields().iter().enumerate() {
        if field.dict_id() == Some(id) {
            dictionaries_by_field[i] = Some(dictionary_values.clone());
        }
    }
    Ok(())
}
/// Reader for the Arrow IPC *file* format: a magic-delimited file whose
/// footer lists the positions of all record-batch and dictionary blocks,
/// allowing random access via `Seek`.
pub struct FileReader<R: Read + Seek> {
    // Buffered wrapper around the underlying source.
    reader: BufReader<R>,
    // Schema decoded from the footer.
    schema: SchemaRef,
    // Record-batch blocks listed in the footer, in file order.
    blocks: Vec<ipc::Block>,
    // Index of the next block to read.
    current_block: usize,
    // Total number of record-batch blocks in the file.
    total_blocks: usize,
    // Dictionary values per schema field, read eagerly in `try_new`.
    dictionaries_by_field: Vec<Option<ArrayRef>>,
    // Metadata version from the footer; messages are validated against it.
    metadata_version: ipc::MetadataVersion,
}
impl<R: Read + Seek> FileReader<R> {
    /// Try to create a new file reader.
    ///
    /// Validates the leading and trailing `ARROW1` magic, decodes the footer,
    /// and eagerly reads every dictionary block so record batches can be
    /// decoded on demand.
    ///
    /// # Errors
    /// Returns an error if the magic bytes, footer, or dictionary blocks
    /// cannot be read or parsed.
    pub fn try_new(reader: R) -> Result<Self> {
        let mut reader = BufReader::new(reader);
        // Check the leading magic number ("ARROW1").
        let mut magic_buffer: [u8; 6] = [0; 6];
        reader.read_exact(&mut magic_buffer)?;
        if magic_buffer != super::ARROW_MAGIC {
            return Err(ArrowError::IoError(
                "Arrow file does not contain correct header".to_string(),
            ));
        }
        // The same magic terminates the file.
        reader.seek(SeekFrom::End(-6))?;
        reader.read_exact(&mut magic_buffer)?;
        if magic_buffer != super::ARROW_MAGIC {
            return Err(ArrowError::IoError(
                "Arrow file does not contain correct footer".to_string(),
            ));
        }
        // The 4 bytes before the trailing magic hold the footer length (LE).
        let mut footer_size: [u8; 4] = [0; 4];
        reader.seek(SeekFrom::End(-10))?;
        reader.read_exact(&mut footer_size)?;
        let footer_len = i32::from_le_bytes(footer_size);
        // The flatbuffer footer immediately precedes its length field.
        let mut footer_data = vec![0; footer_len as usize];
        reader.seek(SeekFrom::End(-10 - footer_len as i64))?;
        reader.read_exact(&mut footer_data)?;
        let footer = ipc::root_as_footer(&footer_data[..]).map_err(|err| {
            ArrowError::IoError(format!("Unable to get root as footer: {:?}", err))
        })?;
        let blocks = footer.recordBatches().ok_or_else(|| {
            ArrowError::IoError(
                "Unable to get record batches from IPC Footer".to_string(),
            )
        })?;
        let total_blocks = blocks.len();
        let ipc_schema = footer.schema().unwrap();
        let schema = ipc::convert::fb_to_schema(ipc_schema);
        // One dictionary slot per field, filled while scanning the
        // dictionary blocks below.
        let mut dictionaries_by_field = vec![None; schema.fields().len()];
        for block in footer.dictionaries().unwrap() {
            // Each block starts with a 4-byte metadata length, optionally
            // preceded by the 4-byte continuation marker.
            let mut message_size: [u8; 4] = [0; 4];
            reader.seek(SeekFrom::Start(block.offset() as u64))?;
            reader.read_exact(&mut message_size)?;
            // NOTE: despite the name, this is the message metadata length,
            // not the footer length.
            let footer_len = if message_size == CONTINUATION_MARKER {
                reader.read_exact(&mut message_size)?;
                i32::from_le_bytes(message_size)
            } else {
                i32::from_le_bytes(message_size)
            };
            let mut block_data = vec![0; footer_len as usize];
            reader.read_exact(&mut block_data)?;
            let message = ipc::root_as_message(&block_data[..]).map_err(|err| {
                ArrowError::IoError(format!("Unable to get root as message: {:?}", err))
            })?;
            match message.header_type() {
                ipc::MessageHeader::DictionaryBatch => {
                    let batch = message.header_as_dictionary_batch().unwrap();
                    // The message body follows the (padded) metadata; seek by
                    // the block's recorded metadata length to reach it.
                    let mut buf = vec![0; block.bodyLength() as usize];
                    reader.seek(SeekFrom::Start(
                        block.offset() as u64 + block.metaDataLength() as u64,
                    ))?;
                    reader.read_exact(&mut buf)?;
                    read_dictionary(&buf, batch, &schema, &mut dictionaries_by_field)?;
                }
                t => {
                    return Err(ArrowError::IoError(format!(
                        "Expecting DictionaryBatch in dictionary blocks, found {:?}.",
                        t
                    )));
                }
            };
        }
        Ok(Self {
            reader,
            schema: Arc::new(schema),
            blocks: blocks.to_vec(),
            current_block: 0,
            total_blocks,
            dictionaries_by_field,
            metadata_version: footer.version(),
        })
    }

    /// Return the number of batches in the file.
    pub fn num_batches(&self) -> usize {
        self.total_blocks
    }

    /// Return the schema of the file.
    pub fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    /// Position the reader so the next read yields batch number `index`.
    ///
    /// # Errors
    /// Returns an error if `index` is out of bounds.
    pub fn set_index(&mut self, index: usize) -> Result<()> {
        if index >= self.total_blocks {
            Err(ArrowError::IoError(format!(
                "Cannot set batch to index {} from {} total batches",
                index, self.total_blocks
            )))
        } else {
            self.current_block = index;
            Ok(())
        }
    }

    /// Read the record batch at `current_block` and advance the cursor.
    /// Returns `Ok(None)` for an empty (`NONE`-header) message.
    fn maybe_next(&mut self) -> Result<Option<RecordBatch>> {
        let block = self.blocks[self.current_block];
        self.current_block += 1;
        // Seek to the block and read its metadata length, skipping the
        // optional continuation marker.
        self.reader.seek(SeekFrom::Start(block.offset() as u64))?;
        let mut meta_buf = [0; 4];
        self.reader.read_exact(&mut meta_buf)?;
        if meta_buf == CONTINUATION_MARKER {
            self.reader.read_exact(&mut meta_buf)?;
        }
        let meta_len = i32::from_le_bytes(meta_buf);
        let mut block_data = vec![0; meta_len as usize];
        self.reader.read_exact(&mut block_data)?;
        let message = ipc::root_as_message(&block_data[..]).map_err(|err| {
            ArrowError::IoError(format!("Unable to get root as footer: {:?}", err))
        })?;
        // V1 files predate the version field in messages; otherwise the
        // message version must match the footer's.
        if self.metadata_version != ipc::MetadataVersion::V1
            && message.version() != self.metadata_version
        {
            return Err(ArrowError::IoError(
                "Could not read IPC message as metadata versions mismatch".to_string(),
            ));
        }
        match message.header_type() {
            ipc::MessageHeader::Schema => Err(ArrowError::IoError(
                "Not expecting a schema when messages are read".to_string(),
            )),
            ipc::MessageHeader::RecordBatch => {
                let batch = message.header_as_record_batch().ok_or_else(|| {
                    ArrowError::IoError(
                        "Unable to read IPC message as record batch".to_string(),
                    )
                })?;
                // The body follows the (padded) metadata within the block.
                let mut buf = vec![0; block.bodyLength() as usize];
                self.reader.seek(SeekFrom::Start(
                    block.offset() as u64 + block.metaDataLength() as u64,
                ))?;
                self.reader.read_exact(&mut buf)?;
                read_record_batch(
                    &buf,
                    batch,
                    self.schema(),
                    &self.dictionaries_by_field,
                ).map(Some)
            }
            ipc::MessageHeader::NONE => {
                Ok(None)
            }
            t => Err(ArrowError::IoError(format!(
                "Reading types other than record batches not yet supported, unable to read {:?}", t
            ))),
        }
    }
}
impl<R: Read + Seek> Iterator for FileReader<R> {
    type Item = Result<RecordBatch>;

    fn next(&mut self) -> Option<Self::Item> {
        // Stop once every block listed in the footer has been consumed.
        if self.current_block >= self.total_blocks {
            return None;
        }
        self.maybe_next().transpose()
    }
}
impl<R: Read + Seek> RecordBatchReader for FileReader<R> {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}
/// Reader for the Arrow IPC *stream* format: a schema message followed by a
/// sequence of dictionary/record-batch messages, read strictly forward
/// (no `Seek` required).
pub struct StreamReader<R: Read> {
    // Buffered wrapper around the underlying source.
    reader: BufReader<R>,
    // Schema decoded from the stream's leading schema message.
    schema: SchemaRef,
    // Dictionary values per schema field, filled as dictionary messages arrive.
    dictionaries_by_field: Vec<Option<ArrayRef>>,
    // Set once an end-of-stream marker (or clean EOF) is observed.
    finished: bool,
}
impl<R: Read> StreamReader<R> {
    /// Try to create a new stream reader, reading and validating the stream's
    /// leading schema message.
    ///
    /// # Errors
    /// Returns an error if the first message cannot be parsed or is not a
    /// schema message.
    pub fn try_new(reader: R) -> Result<Self> {
        let mut reader = BufReader::new(reader);
        // Every message is prefixed by a 4-byte little-endian metadata
        // length, optionally preceded by a 4-byte continuation marker.
        let mut meta_size: [u8; 4] = [0; 4];
        reader.read_exact(&mut meta_size)?;
        let meta_len = {
            if meta_size == CONTINUATION_MARKER {
                reader.read_exact(&mut meta_size)?;
            }
            i32::from_le_bytes(meta_size)
        };
        let mut meta_buffer = vec![0; meta_len as usize];
        reader.read_exact(&mut meta_buffer)?;
        let message = ipc::root_as_message(meta_buffer.as_slice()).map_err(|err| {
            ArrowError::IoError(format!("Unable to get root as message: {:?}", err))
        })?;
        let ipc_schema: ipc::Schema = message.header_as_schema().ok_or_else(|| {
            ArrowError::IoError("Unable to read IPC message as schema".to_string())
        })?;
        let schema = ipc::convert::fb_to_schema(ipc_schema);
        // One dictionary slot per field; filled lazily from dictionary messages.
        let dictionaries_by_field = vec![None; schema.fields().len()];
        Ok(Self {
            reader,
            schema: Arc::new(schema),
            finished: false,
            dictionaries_by_field,
        })
    }

    /// Return the schema of the stream.
    pub fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    /// Check if the stream is finished.
    pub fn is_finished(&self) -> bool {
        self.finished
    }

    /// Read the next record batch, returning `Ok(None)` at end of stream.
    /// Dictionary messages are consumed internally and reading continues
    /// with the following message.
    fn maybe_next(&mut self) -> Result<Option<RecordBatch>> {
        if self.finished {
            return Ok(None);
        }
        let mut meta_size: [u8; 4] = [0; 4];
        match self.reader.read_exact(&mut meta_size) {
            Ok(()) => (),
            Err(e) => {
                return if e.kind() == std::io::ErrorKind::UnexpectedEof {
                    // Stream ended without an explicit end-of-stream marker;
                    // treat a clean EOF here as end of stream.
                    self.finished = true;
                    Ok(None)
                } else {
                    Err(ArrowError::from(e))
                };
            }
        }
        let meta_len = {
            if meta_size == CONTINUATION_MARKER {
                self.reader.read_exact(&mut meta_size)?;
            }
            i32::from_le_bytes(meta_size)
        };
        if meta_len == 0 {
            // Zero-length metadata is the explicit end-of-stream marker.
            self.finished = true;
            return Ok(None);
        }
        let mut meta_buffer = vec![0; meta_len as usize];
        self.reader.read_exact(&mut meta_buffer)?;
        // Fix: parse the metadata in place; the original's
        // `let vecs = &meta_buffer.to_vec();` allocated a redundant copy of
        // every message's metadata.
        let message = ipc::root_as_message(meta_buffer.as_slice()).map_err(|err| {
            ArrowError::IoError(format!("Unable to get root as message: {:?}", err))
        })?;
        match message.header_type() {
            ipc::MessageHeader::Schema => Err(ArrowError::IoError(
                "Not expecting a schema when messages are read".to_string(),
            )),
            ipc::MessageHeader::RecordBatch => {
                let batch = message.header_as_record_batch().ok_or_else(|| {
                    ArrowError::IoError(
                        "Unable to read IPC message as record batch".to_string(),
                    )
                })?;
                // The message body (buffers) immediately follows the metadata.
                let mut buf = vec![0; message.bodyLength() as usize];
                self.reader.read_exact(&mut buf)?;
                read_record_batch(&buf, batch, self.schema(), &self.dictionaries_by_field)
                    .map(Some)
            }
            ipc::MessageHeader::DictionaryBatch => {
                let batch = message.header_as_dictionary_batch().ok_or_else(|| {
                    ArrowError::IoError(
                        "Unable to read IPC message as dictionary batch".to_string(),
                    )
                })?;
                let mut buf = vec![0; message.bodyLength() as usize];
                self.reader.read_exact(&mut buf)?;
                read_dictionary(
                    &buf, batch, &self.schema, &mut self.dictionaries_by_field
                )?;
                // A dictionary message carries no rows; keep reading for the
                // next record batch.
                self.maybe_next()
            }
            ipc::MessageHeader::NONE => {
                // NOTE(review): an empty header — presumably padding/EOS from
                // some writers; treated as "no batch". Confirm against spec.
                Ok(None)
            }
            t => Err(ArrowError::IoError(
                format!("Reading types other than record batches not yet supported, unable to read {:?} ", t)
            )),
        }
    }
}
impl<R: Read> Iterator for StreamReader<R> {
    type Item = Result<RecordBatch>;

    fn next(&mut self) -> Option<Self::Item> {
        // Flatten Result<Option<_>> into Option<Result<_>> for the iterator.
        match self.maybe_next() {
            Ok(Some(batch)) => Some(Ok(batch)),
            Ok(None) => None,
            Err(e) => Some(Err(e)),
        }
    }
}
impl<R: Read> RecordBatchReader for StreamReader<R> {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use flate2::read::GzDecoder;
    use crate::util::integration_util::*;
    use std::fs::File;

    // Round-trip check: every generated 0.14.1 integration file must match
    // its gzipped-JSON reference representation.
    #[test]
    fn read_generated_files() {
        let testdata = crate::util::test_util::arrow_test_data();
        let paths = vec![
            "generated_interval",
            "generated_datetime",
            "generated_dictionary",
            "generated_nested",
            "generated_primitive_no_batches",
            "generated_primitive_zerolength",
            "generated_primitive",
            "generated_decimal",
        ];
        paths.iter().for_each(|path| {
            let file = File::open(format!(
                "{}/arrow-ipc-stream/integration/0.14.1/{}.arrow_file",
                testdata, path
            ))
            .unwrap();
            let mut reader = FileReader::try_new(file).unwrap();
            // Compare every batch against the gzipped JSON reference.
            let arrow_json = read_gzip_json(path);
            assert!(arrow_json.equals_reader(&mut reader));
        });
    }

    // Big-endian decimal files are unsupported and must fail loudly.
    #[test]
    #[should_panic(expected = "Big Endian is not supported for Decimal!")]
    fn read_decimal_be_file_should_panic() {
        let testdata = crate::util::test_util::arrow_test_data();
        let file = File::open(format!(
            "{}/arrow-ipc-stream/integration/1.0.0-bigendian/generated_decimal.arrow_file",
            testdata
        ))
        .unwrap();
        FileReader::try_new(file).unwrap();
    }

    // Non-decimal big-endian files should still open without error (only
    // `try_new` is exercised; batch contents are not compared here).
    #[test]
    fn read_generated_be_files_should_work() {
        let testdata = crate::util::test_util::arrow_test_data();
        let paths = vec![
            "generated_interval",
            "generated_datetime",
            "generated_dictionary",
            "generated_nested",
            "generated_primitive_no_batches",
            "generated_primitive_zerolength",
            "generated_primitive",
        ];
        paths.iter().for_each(|path| {
            let file = File::open(format!(
                "{}/arrow-ipc-stream/integration/1.0.0-bigendian/{}.arrow_file",
                testdata, path
            ))
            .unwrap();
            FileReader::try_new(file).unwrap();
        });
    }

    // Same round-trip check as `read_generated_files`, but for the stream
    // format; also verifies the reader reports EOS exactly once.
    #[test]
    fn read_generated_streams() {
        let testdata = crate::util::test_util::arrow_test_data();
        let paths = vec![
            "generated_interval",
            "generated_datetime",
            "generated_dictionary",
            "generated_nested",
            "generated_primitive_no_batches",
            "generated_primitive_zerolength",
            "generated_primitive",
            "generated_decimal",
        ];
        paths.iter().for_each(|path| {
            let file = File::open(format!(
                "{}/arrow-ipc-stream/integration/0.14.1/{}.stream",
                testdata, path
            ))
            .unwrap();
            let mut reader = StreamReader::try_new(file).unwrap();
            let arrow_json = read_gzip_json(path);
            assert!(arrow_json.equals_reader(&mut reader));
            // The next batch after the last is empty and the stream finishes.
            assert!(reader.next().is_none());
            assert!(reader.is_finished());
        });
    }

    // Write a one-row batch through StreamWriter and read it back, checking
    // the float columns survive the round trip.
    // NOTE(review): assumes `target/debug/testdata` already exists — confirm
    // the build creates it.
    #[test]
    fn test_arrow_single_float_row() {
        let schema = Schema::new(vec![
            Field::new("a", DataType::Float32, false),
            Field::new("b", DataType::Float32, false),
            Field::new("c", DataType::Int32, false),
            Field::new("d", DataType::Int32, false),
        ]);
        let arrays = vec![
            Arc::new(Float32Array::from(vec![1.23])) as ArrayRef,
            Arc::new(Float32Array::from(vec![-6.50])) as ArrayRef,
            Arc::new(Int32Array::from(vec![2])) as ArrayRef,
            Arc::new(Int32Array::from(vec![1])) as ArrayRef,
        ];
        let batch = RecordBatch::try_new(Arc::new(schema.clone()), arrays).unwrap();
        let file = File::create("target/debug/testdata/float.stream").unwrap();
        let mut stream_writer =
            crate::ipc::writer::StreamWriter::try_new(file, &schema).unwrap();
        stream_writer.write(&batch).unwrap();
        stream_writer.finish().unwrap();
        let file = File::open("target/debug/testdata/float.stream").unwrap();
        let reader = StreamReader::try_new(file).unwrap();
        reader.for_each(|batch| {
            let batch = batch.unwrap();
            assert!(
                batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<Float32Array>()
                    .unwrap()
                    .value(0)
                    != 0.0
            );
            assert!(
                batch
                    .column(1)
                    .as_any()
                    .downcast_ref::<Float32Array>()
                    .unwrap()
                    .value(0)
                    != 0.0
            );
        })
    }

    /// Load the gzipped JSON reference representation for an integration file.
    fn read_gzip_json(path: &str) -> ArrowJson {
        let testdata = crate::util::test_util::arrow_test_data();
        let file = File::open(format!(
            "{}/arrow-ipc-stream/integration/0.14.1/{}.json.gz",
            testdata, path
        ))
        .unwrap();
        let mut gz = GzDecoder::new(&file);
        let mut s = String::new();
        gz.read_to_string(&mut s).unwrap();
        let arrow_json: ArrowJson = serde_json::from_str(&s).unwrap();
        arrow_json
    }
}