#![expect(
dead_code,
reason = "experimental zero-copy Arrow path; helpers retained for upcoming gRPC/TCP wiring"
)]
use std::collections::VecDeque;
use std::sync::Arc;
use arrow::array::{
Array, BinaryArray, BooleanArray, Date32Array, Float32Array, Float64Array, Int16Array,
Int32Array, Int64Array, LargeBinaryArray, LargeStringArray, StringArray,
TimestampMicrosecondArray,
};
use arrow::buffer::Buffer;
use arrow::datatypes::{DataType, Schema, TimeUnit};
use arrow::ipc::reader::StreamDecoder;
use arrow::record_batch::RecordBatch;
use bytes::Bytes;
use crate::error::{Error, Result};
use hyperdb_api_core::types::SqlType;
/// A borrowed view of a single row within a `RecordBatch`.
#[derive(Debug)]
pub struct ArrowRow<'a> {
batch: &'a RecordBatch,
row_index: usize,
}
impl<'a> ArrowRow<'a> {
pub(crate) fn new(batch: &'a RecordBatch, row_index: usize) -> Self {
ArrowRow { batch, row_index }
}
#[must_use]
pub fn column_count(&self) -> usize {
self.batch.num_columns()
}
    /// Returns the value at `col` converted to `T`, or `None` if the column
    /// is out of bounds, the value is null, or the Arrow type cannot be
    /// coerced to `T`.
    #[must_use]
    pub fn get<T: FromArrowValue>(&self, col: usize) -> Option<T> {
if col >= self.batch.num_columns() {
return None;
}
T::from_arrow_column(self.batch.column(col), self.row_index)
}
#[must_use]
pub fn get_i16(&self, col: usize) -> Option<i16> {
self.get::<i16>(col)
}
#[must_use]
pub fn get_i32(&self, col: usize) -> Option<i32> {
self.get::<i32>(col)
}
#[must_use]
pub fn get_i64(&self, col: usize) -> Option<i64> {
self.get::<i64>(col)
}
#[must_use]
pub fn get_f32(&self, col: usize) -> Option<f32> {
self.get::<f32>(col)
}
#[must_use]
pub fn get_f64(&self, col: usize) -> Option<f64> {
self.get::<f64>(col)
}
#[must_use]
pub fn get_bool(&self, col: usize) -> Option<bool> {
self.get::<bool>(col)
}
#[must_use]
pub fn get_string(&self, col: usize) -> Option<String> {
self.get::<String>(col)
}
#[must_use]
pub fn get_bytes(&self, col: usize) -> Option<Vec<u8>> {
self.get::<Vec<u8>>(col)
}
    /// Returns `true` if the value is null or `col` is out of bounds.
    #[must_use]
    pub fn is_null(&self, col: usize) -> bool {
if col >= self.batch.num_columns() {
return true;
}
self.batch.column(col).is_null(self.row_index)
}
}
/// Conversion from an Arrow column value to a Rust type, with lossless
/// widening and checked narrowing between integer widths.
pub trait FromArrowValue: Sized {
    /// Extracts `row` from `array`, returning `None` for nulls and failed
    /// conversions.
    fn from_arrow_column(array: &Arc<dyn Array>, row: usize) -> Option<Self>;
}
impl FromArrowValue for i16 {
fn from_arrow_column(array: &Arc<dyn Array>, row: usize) -> Option<Self> {
if array.is_null(row) {
return None;
}
if let Some(arr) = array.as_any().downcast_ref::<Int16Array>() {
Some(arr.value(row))
} else if let Some(arr) = array.as_any().downcast_ref::<Int32Array>() {
i16::try_from(arr.value(row)).ok()
} else {
array
.as_any()
.downcast_ref::<Int64Array>()
.and_then(|arr| i16::try_from(arr.value(row)).ok())
}
}
}
impl FromArrowValue for i32 {
fn from_arrow_column(array: &Arc<dyn Array>, row: usize) -> Option<Self> {
if array.is_null(row) {
return None;
}
if let Some(arr) = array.as_any().downcast_ref::<Int32Array>() {
Some(arr.value(row))
} else if let Some(arr) = array.as_any().downcast_ref::<Int16Array>() {
Some(i32::from(arr.value(row)))
} else {
array
.as_any()
.downcast_ref::<Int64Array>()
.and_then(|arr| i32::try_from(arr.value(row)).ok())
}
}
}
impl FromArrowValue for i64 {
fn from_arrow_column(array: &Arc<dyn Array>, row: usize) -> Option<Self> {
if array.is_null(row) {
return None;
}
if let Some(arr) = array.as_any().downcast_ref::<Int64Array>() {
Some(arr.value(row))
} else if let Some(arr) = array.as_any().downcast_ref::<Int32Array>() {
Some(i64::from(arr.value(row)))
} else if let Some(arr) = array.as_any().downcast_ref::<Int16Array>() {
Some(i64::from(arr.value(row)))
} else if let Some(arr) = array.as_any().downcast_ref::<Date32Array>() {
Some(i64::from(arr.value(row)))
} else {
array
.as_any()
.downcast_ref::<TimestampMicrosecondArray>()
.map(|arr| arr.value(row))
}
}
}
impl FromArrowValue for f32 {
fn from_arrow_column(array: &Arc<dyn Array>, row: usize) -> Option<Self> {
if array.is_null(row) {
return None;
}
if let Some(arr) = array.as_any().downcast_ref::<Float32Array>() {
Some(arr.value(row))
} else {
array.as_any().downcast_ref::<Float64Array>().map(|arr| {
#[expect(
clippy::cast_possible_truncation,
reason = "f64 → f32 narrowing is caller-accepted precision loss for this column-coercion path"
)]
let narrowed = arr.value(row) as f32;
narrowed
})
}
}
}
impl FromArrowValue for f64 {
fn from_arrow_column(array: &Arc<dyn Array>, row: usize) -> Option<Self> {
if array.is_null(row) {
return None;
}
if let Some(arr) = array.as_any().downcast_ref::<Float64Array>() {
Some(arr.value(row))
} else {
array
.as_any()
.downcast_ref::<Float32Array>()
.map(|arr| f64::from(arr.value(row)))
}
}
}
impl FromArrowValue for bool {
fn from_arrow_column(array: &Arc<dyn Array>, row: usize) -> Option<Self> {
if array.is_null(row) {
return None;
}
array
.as_any()
.downcast_ref::<BooleanArray>()
.map(|arr| arr.value(row))
}
}
impl FromArrowValue for String {
fn from_arrow_column(array: &Arc<dyn Array>, row: usize) -> Option<Self> {
if array.is_null(row) {
return None;
}
if let Some(arr) = array.as_any().downcast_ref::<StringArray>() {
Some(arr.value(row).to_string())
} else {
array
.as_any()
.downcast_ref::<LargeStringArray>()
.map(|arr| arr.value(row).to_string())
}
}
}
impl FromArrowValue for Vec<u8> {
fn from_arrow_column(array: &Arc<dyn Array>, row: usize) -> Option<Self> {
if array.is_null(row) {
return None;
}
if let Some(arr) = array.as_any().downcast_ref::<BinaryArray>() {
Some(arr.value(row).to_vec())
} else {
array
.as_any()
.downcast_ref::<LargeBinaryArray>()
.map(|arr| arr.value(row).to_vec())
}
}
}
/// An owned `RecordBatch` wrapper exposing row-oriented access.
#[derive(Debug)]
pub struct ArrowChunk {
batch: RecordBatch,
}
impl ArrowChunk {
pub(crate) fn new(batch: RecordBatch) -> Self {
ArrowChunk { batch }
}
#[must_use]
pub fn len(&self) -> usize {
self.batch.num_rows()
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.batch.num_rows() == 0
}
#[must_use]
pub fn column_count(&self) -> usize {
self.batch.num_columns()
}
#[must_use]
pub fn row(&self, index: usize) -> Option<ArrowRow<'_>> {
if index < self.batch.num_rows() {
Some(ArrowRow::new(&self.batch, index))
} else {
None
}
}
#[must_use]
pub fn first(&self) -> Option<ArrowRow<'_>> {
self.row(0)
}
#[must_use]
pub fn iter(&self) -> ArrowChunkIter<'_> {
ArrowChunkIter {
chunk: self,
index: 0,
}
}
#[must_use]
pub fn into_batch(self) -> RecordBatch {
self.batch
}
}
impl<'a> IntoIterator for &'a ArrowChunk {
type Item = ArrowRow<'a>;
type IntoIter = ArrowChunkIter<'a>;
fn into_iter(self) -> Self::IntoIter {
self.iter()
}
}
/// Iterator over the rows of an [`ArrowChunk`].
#[derive(Debug)]
pub struct ArrowChunkIter<'a> {
chunk: &'a ArrowChunk,
index: usize,
}
impl<'a> Iterator for ArrowChunkIter<'a> {
type Item = ArrowRow<'a>;
fn next(&mut self) -> Option<Self::Item> {
if self.index < self.chunk.len() {
let row = ArrowRow::new(&self.chunk.batch, self.index);
self.index += 1;
Some(row)
} else {
None
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
let remaining = self.chunk.len() - self.index;
(remaining, Some(remaining))
}
}
impl ExactSizeIterator for ArrowChunkIter<'_> {}
/// A pull-based source of raw Arrow IPC bytes, e.g. a gRPC or TCP
/// transport.
pub trait ChunkSource: Send {
    /// Returns the next chunk of IPC bytes; `Ok(None)` signals end of
    /// stream. Empty chunks are permitted and skipped by the consumer.
    fn next_chunk(&mut self) -> Result<Option<Bytes>>;
}
/// A result set decoded from Arrow IPC data, either fully buffered or
/// decoded incrementally from a [`ChunkSource`].
pub struct ArrowRowset {
inner: ArrowRowsetInner,
schema: Arc<Schema>,
}
impl std::fmt::Debug for ArrowRowset {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ArrowRowset")
.field("schema", &self.schema)
.finish_non_exhaustive()
}
}
enum ArrowRowsetInner {
    /// All batches decoded up front; `current` is the read cursor.
    Buffered {
        batches: Vec<RecordBatch>,
        current: usize,
    },
    /// Batches decoded on demand; `leftover` holds bytes not yet fed to the
    /// decoder, and `pending` holds decoded-but-unread batches.
    Streaming {
source: Box<dyn ChunkSource>,
decoder: StreamDecoder,
pending: VecDeque<RecordBatch>,
leftover: Option<Buffer>,
exhausted: bool,
},
}
impl ArrowRowset {
fn empty() -> Self {
ArrowRowset {
inner: ArrowRowsetInner::Buffered {
batches: Vec::new(),
current: 0,
},
schema: Arc::new(Schema::empty()),
}
}
    /// Decodes a fully buffered rowset from raw Arrow IPC stream bytes,
    /// which may contain several concatenated streams.
    pub fn from_bytes(bytes: Bytes) -> Result<Self> {
if bytes.is_empty() {
return Ok(Self::empty());
}
Self::from_buffer(Buffer::from(bytes))
}
    /// Decodes a fully buffered rowset from an Arrow [`Buffer`].
    pub fn from_buffer(buf: Buffer) -> Result<Self> {
if buf.is_empty() {
return Ok(Self::empty());
}
let (schema, batches) = decode_possibly_concatenated_streams(buf)?;
Ok(ArrowRowset {
inner: ArrowRowsetInner::Buffered {
batches,
current: 0,
},
schema,
})
}
    /// Decodes and buffers every chunk up front; the schema is taken from
    /// the first chunk that carries one.
    pub fn from_chunks<I>(chunks: I) -> Result<Self>
where
I: IntoIterator<Item = Bytes>,
{
let mut batches = Vec::new();
let mut schema = Arc::new(Schema::empty());
for chunk in chunks {
if chunk.is_empty() {
continue;
}
let (chunk_schema, chunk_batches) = decode_chunk(chunk)?;
if schema.fields().is_empty() {
schema = chunk_schema;
}
batches.extend(chunk_batches);
}
Ok(ArrowRowset {
inner: ArrowRowsetInner::Buffered {
batches,
current: 0,
},
schema,
})
}
    /// Builds a streaming rowset over `source`, eagerly decoding until the
    /// schema is known so column metadata is available before the first
    /// chunk is read.
    pub fn from_stream(source: Box<dyn ChunkSource>) -> Result<Self> {
let mut rowset = ArrowRowset {
inner: ArrowRowsetInner::Streaming {
source,
decoder: StreamDecoder::new(),
pending: VecDeque::new(),
leftover: None,
exhausted: false,
},
schema: Arc::new(Schema::empty()),
};
rowset.prime_stream()?;
Ok(rowset)
}
    /// Pulls chunks from the source until the stream's schema has been
    /// decoded (or the source ends), so `schema()` is meaningful right
    /// after construction.
    fn prime_stream(&mut self) -> Result<()> {
let new_schema = {
let ArrowRowsetInner::Streaming {
source,
decoder,
pending,
leftover,
exhausted,
} = &mut self.inner
else {
return Ok(());
};
while decoder.schema().is_none() && !*exhausted {
let mut buf = match leftover.take() {
Some(b) => b,
None => match source.next_chunk()? {
Some(bytes) if !bytes.is_empty() => Buffer::from(bytes),
Some(_) => continue,
None => {
*exhausted = true;
break;
}
},
};
drive_streaming_decoder(decoder, &mut buf, pending)?;
if !buf.is_empty() {
*leftover = Some(buf);
}
}
decoder.schema()
};
if let Some(s) = new_schema {
self.schema = s;
}
Ok(())
}
    /// Decodes a rowset from a borrowed IPC byte slice, copying it into an
    /// owned buffer first.
    pub fn from_ipc_slice(data: &[u8]) -> Result<Self> {
if data.is_empty() {
return Ok(Self::empty());
}
Self::from_buffer(Buffer::from(data.to_vec()))
}
#[must_use]
pub fn schema(&self) -> &Arc<Schema> {
&self.schema
}
#[must_use]
pub fn column_count(&self) -> usize {
self.schema.fields().len()
}
#[must_use]
pub fn column_names(&self) -> Vec<String> {
self.schema
.fields()
.iter()
.map(|f| f.name().clone())
.collect()
}
#[must_use]
pub fn column_name(&self, index: usize) -> Option<&str> {
self.schema.fields().get(index).map(|f| f.name().as_str())
}
    /// Returns the next decoded chunk, or `None` once the rowset is
    /// exhausted. Streaming rowsets pull from their source on demand.
    pub fn next_chunk(&mut self) -> Result<Option<ArrowChunk>> {
match &mut self.inner {
ArrowRowsetInner::Buffered { batches, current } => {
if *current >= batches.len() {
return Ok(None);
}
let batch = batches[*current].clone();
*current += 1;
Ok(Some(ArrowChunk::new(batch)))
}
ArrowRowsetInner::Streaming {
source,
decoder,
pending,
leftover,
exhausted,
} => loop {
if let Some(batch) = pending.pop_front() {
return Ok(Some(ArrowChunk::new(batch)));
}
if *exhausted {
return Ok(None);
}
let mut buf = match leftover.take() {
Some(b) => b,
None => match source.next_chunk()? {
Some(bytes) if !bytes.is_empty() => Buffer::from(bytes),
Some(_) => continue,
None => {
*exhausted = true;
continue;
}
},
};
drive_streaming_decoder(decoder, &mut buf, pending)?;
if !buf.is_empty() {
*leftover = Some(buf);
}
},
}
}
    /// Total rows across decoded batches. For buffered rowsets this is the
    /// full count; for streaming rowsets it only covers batches that have
    /// been decoded but not yet consumed.
    #[must_use]
    pub fn total_rows(&self) -> usize {
match &self.inner {
ArrowRowsetInner::Buffered { batches, .. } => batches
.iter()
.map(arrow::array::RecordBatch::num_rows)
.sum(),
ArrowRowsetInner::Streaming { pending, .. } => pending
.iter()
.map(arrow::array::RecordBatch::num_rows)
.sum(),
}
}
    /// Returns `true` if the rowset holds no rows: a buffered rowset with
    /// no non-empty batches, or a streaming rowset whose source is
    /// exhausted with nothing pending.
    #[must_use]
    pub fn is_empty(&self) -> bool {
match &self.inner {
ArrowRowsetInner::Buffered { batches, .. } => {
batches.is_empty() || batches.iter().all(|b| b.num_rows() == 0)
}
ArrowRowsetInner::Streaming {
pending, exhausted, ..
} => *exhausted && pending.is_empty(),
}
}
}
/// Maps an Arrow `DataType` to a SQL type name. Unsigned integers map to
/// the next wider signed SQL type, except `UInt64`, which maps lossily to
/// `BIGINT`.
fn arrow_type_to_sql_name(dt: &DataType) -> String {
match dt {
DataType::Boolean => "BOOLEAN".to_string(),
DataType::Int8 => "SMALLINT".to_string(),
DataType::Int16 => "SMALLINT".to_string(),
DataType::Int32 => "INTEGER".to_string(),
DataType::Int64 => "BIGINT".to_string(),
DataType::UInt8 => "SMALLINT".to_string(),
DataType::UInt16 => "INTEGER".to_string(),
DataType::UInt32 => "BIGINT".to_string(),
DataType::UInt64 => "BIGINT".to_string(),
DataType::Float16 => "REAL".to_string(),
DataType::Float32 => "REAL".to_string(),
DataType::Float64 => "DOUBLE PRECISION".to_string(),
DataType::Utf8 | DataType::LargeUtf8 => "TEXT".to_string(),
DataType::Binary | DataType::LargeBinary => "BYTEA".to_string(),
DataType::Date32 | DataType::Date64 => "DATE".to_string(),
DataType::Time32(_) | DataType::Time64(_) => "TIME".to_string(),
DataType::Timestamp(TimeUnit::Microsecond, None) => "TIMESTAMP".to_string(),
DataType::Timestamp(TimeUnit::Microsecond, Some(_)) => "TIMESTAMPTZ".to_string(),
DataType::Timestamp(_, None) => "TIMESTAMP".to_string(),
DataType::Timestamp(_, Some(_)) => "TIMESTAMPTZ".to_string(),
DataType::Decimal128(p, s) => format!("NUMERIC({p}, {s})"),
DataType::Decimal256(p, s) => format!("NUMERIC({p}, {s})"),
DataType::Interval(_) => "INTERVAL".to_string(),
DataType::List(_) => "ARRAY".to_string(),
DataType::Struct(_) => "RECORD".to_string(),
_ => "UNKNOWN".to_string(),
}
}
/// Clamps a negative Arrow decimal scale to zero, since
/// `SqlType::Numeric` stores scale as `u32`.
fn decimal_scale_to_u32(scale: i8) -> u32 {
u32::try_from(scale).unwrap_or(0)
}
/// Maps an Arrow `DataType` to the wire-level [`SqlType`], falling back to
/// `Text` for types without a direct SQL equivalent.
pub(crate) fn arrow_type_to_sql_type(dt: &DataType) -> SqlType {
match dt {
DataType::Boolean => SqlType::Bool,
DataType::Int8 | DataType::Int16 => SqlType::SmallInt,
DataType::Int32 => SqlType::Int,
DataType::Int64 => SqlType::BigInt,
        // Widen unsigned types to the next signed SQL type, mirroring
        // `arrow_type_to_sql_name`; `UInt64` still maps lossily to `BigInt`.
        DataType::UInt8 => SqlType::SmallInt,
        DataType::UInt16 => SqlType::Int,
        DataType::UInt32 | DataType::UInt64 => SqlType::BigInt,
DataType::Float16 | DataType::Float32 => SqlType::Float,
DataType::Float64 => SqlType::Double,
DataType::Utf8 | DataType::LargeUtf8 => SqlType::Text,
DataType::Binary | DataType::LargeBinary => SqlType::ByteA,
DataType::Date32 | DataType::Date64 => SqlType::Date,
DataType::Time32(_) | DataType::Time64(_) => SqlType::Time,
DataType::Timestamp(_, None) => SqlType::Timestamp,
DataType::Timestamp(_, Some(_)) => SqlType::TimestampTz,
DataType::Decimal128(p, s) => SqlType::Numeric {
precision: u32::from(*p),
scale: decimal_scale_to_u32(*s),
},
DataType::Decimal256(p, s) => SqlType::Numeric {
precision: u32::from(*p),
scale: decimal_scale_to_u32(*s),
},
DataType::Interval(_) => SqlType::Interval,
        _ => SqlType::Text,
    }
}
/// Decodes one or more concatenated Arrow IPC streams into their record
/// batches, discarding the schema.
pub fn parse_arrow_ipc(bytes: Bytes) -> Result<Vec<RecordBatch>> {
if bytes.is_empty() {
return Ok(Vec::new());
}
let (_, batches) = decode_possibly_concatenated_streams(Buffer::from(bytes))?;
Ok(batches)
}
fn decode_chunk(bytes: Bytes) -> Result<(Arc<Schema>, Vec<RecordBatch>)> {
decode_possibly_concatenated_streams(Buffer::from(bytes))
}
/// The 8-byte end-of-stream marker that terminates an Arrow IPC stream: a
/// `0xFFFFFFFF` continuation followed by a zero length.
const ARROW_IPC_EOS: [u8; 8] = [0xFF, 0xFF, 0xFF, 0xFF, 0, 0, 0, 0];
/// Feeds `buf` into `decoder`, pushing completed batches onto `pending`.
/// Tolerates concatenated streams by stripping EOS markers, skipping
/// repeated schema messages, and resetting the decoder when a new stream
/// begins mid-buffer. Leaves any unconsumed bytes in `buf`.
fn drive_streaming_decoder(
    decoder: &mut StreamDecoder,
    buf: &mut Buffer,
    pending: &mut VecDeque<RecordBatch>,
) -> Result<()> {
loop {
if buf.is_empty() {
return Ok(());
}
        // Strip the end-of-stream marker; any bytes after it belong to a
        // new concatenated stream, which needs a fresh decoder.
        if buf.len() >= ARROW_IPC_EOS.len() && buf[..ARROW_IPC_EOS.len()] == ARROW_IPC_EOS {
let new_len = buf.len() - ARROW_IPC_EOS.len();
*buf = buf.slice_with_length(ARROW_IPC_EOS.len(), new_len);
if !buf.is_empty() {
*decoder = StreamDecoder::new();
}
continue;
}
        // Once the schema is known, a complete schema message that merely
        // re-announces it (the start of a concatenated stream) is skipped.
        if decoder.schema().is_some() && peek_is_schema_message(buf) {
match peek_message_total_size(buf) {
Some(total) if buf.len() >= total => {
*buf = buf.slice_with_length(total, buf.len() - total);
continue;
}
_ => {}
}
}
let buf_before = buf.clone();
match decoder.decode(buf) {
Ok(Some(batch)) => pending.push_back(batch),
Ok(None) => return Ok(()),
            Err(e) => {
                // The decoder rejects both a second schema message and a
                // mid-stream EOS; either indicates a new concatenated
                // stream, so restore the unread bytes and start fresh.
                let msg = e.to_string();
                if msg.contains("Not expecting a schema when messages are read")
                    || msg.contains("Unexpected EOS")
                {
                    *buf = buf_before;
                    *decoder = StreamDecoder::new();
                    continue;
                }
                return Err(Error::new(format!("Failed to parse Arrow IPC data: {e}")));
            }
}
}
}
/// Returns `true` if `buf` starts with a complete encapsulated IPC message
/// whose header is a `Schema`.
fn peek_is_schema_message(buf: &Buffer) -> bool {
let Some((_len, body)) = peek_message_body(buf) else {
return false;
};
match arrow::ipc::root_as_message(body) {
Ok(msg) => msg.header_type() == arrow::ipc::MessageHeader::Schema,
Err(_) => false,
}
}
/// Parses the encapsulated-message framing at the start of `buf` (an
/// optional `0xFFFFFFFF` continuation, then a little-endian metadata
/// length) and returns the total size of prefix plus metadata along with
/// the metadata bytes, or `None` if the message is still incomplete.
fn peek_message_body(buf: &Buffer) -> Option<(usize, &[u8])> {
let bytes: &[u8] = buf;
let (length_offset, remaining) = if bytes.len() >= 4 && bytes[0..4] == [0xFF; 4] {
(4, &bytes[4..])
} else {
(0, bytes)
};
if remaining.len() < 4 {
return None;
}
let length =
u32::from_le_bytes([remaining[0], remaining[1], remaining[2], remaining[3]]) as usize;
let body_start = length_offset + 4;
if buf.len() < body_start + length {
return None;
}
Some((body_start + length, &bytes[body_start..body_start + length]))
}
fn peek_message_total_size(buf: &Buffer) -> Option<usize> {
let (total, _body) = peek_message_body(buf)?;
Some(total)
}
/// Decodes a buffer that may contain several back-to-back Arrow IPC
/// streams, returning the decoded schema and all record batches. Errors if
/// the decoder stops making progress on a non-empty buffer.
fn decode_possibly_concatenated_streams(
    mut buf: Buffer,
) -> Result<(Arc<Schema>, Vec<RecordBatch>)> {
let mut decoder = StreamDecoder::new();
let mut pending = VecDeque::new();
while !buf.is_empty() {
let before_len = buf.len();
drive_streaming_decoder(&mut decoder, &mut buf, &mut pending)?;
if buf.len() == before_len {
if !buf.is_empty() {
return Err(Error::new(
"Failed to parse Arrow IPC data: decoder made no progress",
));
}
break;
}
}
let schema = decoder
.schema()
.unwrap_or_else(|| Arc::new(Schema::empty()));
Ok((schema, pending.into_iter().collect()))
}
#[cfg(test)]
mod tests {
use super::*;
use arrow::datatypes::Field;
#[test]
fn test_arrow_rowset_empty() {
let rowset = ArrowRowset::from_bytes(Bytes::new()).unwrap();
assert!(rowset.is_empty());
assert_eq!(rowset.column_count(), 0);
let rowset = ArrowRowset::from_ipc_slice(&[]).unwrap();
assert!(rowset.is_empty());
}
#[test]
fn test_arrow_chunk_iteration() {
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("name", DataType::Utf8, true),
]));
let id_array = Int32Array::from(vec![1, 2, 3]);
let name_array = StringArray::from(vec![Some("Alice"), Some("Bob"), None]);
let batch =
RecordBatch::try_new(schema, vec![Arc::new(id_array), Arc::new(name_array)]).unwrap();
let chunk = ArrowChunk::new(batch);
assert_eq!(chunk.len(), 3);
assert_eq!(chunk.column_count(), 2);
let mut iter = chunk.iter();
let row0 = iter.next().unwrap();
assert_eq!(row0.get::<i32>(0), Some(1));
assert_eq!(row0.get::<String>(1), Some("Alice".to_string()));
let row1 = iter.next().unwrap();
assert_eq!(row1.get::<i32>(0), Some(2));
assert_eq!(row1.get::<String>(1), Some("Bob".to_string()));
let row2 = iter.next().unwrap();
assert_eq!(row2.get::<i32>(0), Some(3));
assert_eq!(row2.get::<String>(1), None);
assert!(row2.is_null(1));
assert!(iter.next().is_none());
}
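    // A sketch (using only the API above) of the cross-width coercions in
    // `FromArrowValue`: lossless widening succeeds, while checked narrowing
    // returns `None` when the value does not fit.
    #[test]
    fn test_numeric_coercions() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("small", DataType::Int32, false),
            Field::new("wide", DataType::Int32, false),
            Field::new("day", DataType::Date32, false),
        ]));
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int32Array::from(vec![42])),
                Arc::new(Int32Array::from(vec![100_000])),
                Arc::new(Date32Array::from(vec![19_000])),
            ],
        )
        .unwrap();
        let chunk = ArrowChunk::new(batch);
        let row = chunk.first().expect("chunk has a first row");
        // Int32 narrows to i16 only when the value is in range.
        assert_eq!(row.get_i16(0), Some(42));
        assert_eq!(row.get_i16(1), None);
        // Int32 and Date32 (days since epoch) both widen to i64.
        assert_eq!(row.get_i64(1), Some(100_000));
        assert_eq!(row.get_i64(2), Some(19_000));
        // Out-of-bounds columns read as None / null.
        assert_eq!(row.get_i32(9), None);
        assert!(row.is_null(9));
    }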
struct VecChunkSource {
chunks: VecDeque<Bytes>,
}
impl VecChunkSource {
fn new(chunks: Vec<Bytes>) -> Self {
VecChunkSource {
chunks: chunks.into(),
}
}
}
impl ChunkSource for VecChunkSource {
fn next_chunk(&mut self) -> Result<Option<Bytes>> {
Ok(self.chunks.pop_front())
}
}
fn serialize_independent_streams(num_streams: usize, rows_per_stream: i32) -> Vec<Bytes> {
use arrow::ipc::writer::StreamWriter;
let schema = Arc::new(Schema::new(vec![
Field::new("id", DataType::Int32, false),
Field::new("name", DataType::Utf8, true),
]));
let mut out = Vec::with_capacity(num_streams);
for s in 0..num_streams {
let start = i32::try_from(s).expect("test uses small stream counts") * rows_per_stream;
let id_array = Int32Array::from((start..start + rows_per_stream).collect::<Vec<_>>());
let name_array = StringArray::from(
(start..start + rows_per_stream)
.map(|i| Some(format!("n{i}")))
.collect::<Vec<_>>(),
);
let batch = RecordBatch::try_new(
Arc::clone(&schema),
vec![Arc::new(id_array), Arc::new(name_array)],
)
.unwrap();
let mut buf: Vec<u8> = Vec::new();
{
let mut writer = StreamWriter::try_new(&mut buf, &schema).unwrap();
writer.write(&batch).unwrap();
writer.finish().unwrap();
}
out.push(Bytes::from(buf));
}
out
}
#[test]
fn test_streaming_rowset_single_chunk() {
let chunks = serialize_independent_streams(1, 100);
let source = Box::new(VecChunkSource::new(chunks));
let mut rowset = ArrowRowset::from_stream(source).unwrap();
assert_eq!(rowset.column_count(), 2);
assert_eq!(rowset.column_name(0), Some("id"));
let chunk = rowset.next_chunk().unwrap().expect("one chunk");
assert_eq!(chunk.len(), 100);
assert!(rowset.next_chunk().unwrap().is_none());
}
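    // A round-trip sketch for the slice-based entry points, which copy the
    // bytes into an owned buffer before decoding.
    #[test]
    fn test_from_ipc_slice_and_parse() {
        let stream = serialize_independent_streams(1, 7).remove(0);
        let mut rowset = ArrowRowset::from_ipc_slice(&stream).unwrap();
        let chunk = rowset.next_chunk().unwrap().expect("one chunk");
        assert_eq!(chunk.len(), 7);
        // `parse_arrow_ipc` exposes the raw batches instead of a rowset.
        let batches = parse_arrow_ipc(stream).unwrap();
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 7);
    }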
#[test]
fn test_streaming_rowset_multiple_streams() {
let chunks = serialize_independent_streams(4, 500);
assert_eq!(chunks.len(), 4);
let source = Box::new(VecChunkSource::new(chunks));
let mut rowset = ArrowRowset::from_stream(source).unwrap();
let mut total_rows = 0;
while let Some(chunk) = rowset.next_chunk().unwrap() {
total_rows += chunk.len();
}
assert_eq!(total_rows, 2000);
}
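    // A sketch of the keep-alive path: empty `Bytes` chunks from the source
    // are skipped by the `Some(_) => continue` arms in `prime_stream` and
    // `next_chunk`.
    #[test]
    fn test_streaming_rowset_skips_empty_chunks() {
        let mut chunks = serialize_independent_streams(2, 10);
        chunks.insert(1, Bytes::new());
        chunks.push(Bytes::new());
        let source = Box::new(VecChunkSource::new(chunks));
        let mut rowset = ArrowRowset::from_stream(source).unwrap();
        let mut total_rows = 0;
        while let Some(chunk) = rowset.next_chunk().unwrap() {
            total_rows += chunk.len();
        }
        assert_eq!(total_rows, 20);
    }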
#[test]
fn test_streaming_rowset_empty_source() {
let source = Box::new(VecChunkSource::new(vec![]));
let mut rowset = ArrowRowset::from_stream(source).unwrap();
assert!(rowset.next_chunk().unwrap().is_none());
assert!(rowset.is_empty());
}
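    // `from_chunks` buffers everything eagerly; this sketch checks schema
    // capture and `total_rows` across several independent streams.
    #[test]
    fn test_from_chunks_buffers_all_batches() {
        let chunks = serialize_independent_streams(3, 50);
        let rowset = ArrowRowset::from_chunks(chunks).unwrap();
        assert_eq!(rowset.column_count(), 2);
        assert_eq!(rowset.column_names(), vec!["id", "name"]);
        assert_eq!(rowset.total_rows(), 150);
        assert!(!rowset.is_empty());
    }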
#[test]
fn test_from_bytes_concatenated_streams() {
let streams = serialize_independent_streams(2, 300);
let mut concat = bytes::BytesMut::new();
for s in &streams {
concat.extend_from_slice(s);
}
        let mut rowset = ArrowRowset::from_bytes(concat.freeze()).unwrap();
        let mut total_rows = 0usize;
while let Some(chunk) = rowset.next_chunk().unwrap() {
total_rows += chunk.len();
}
assert_eq!(total_rows, 600);
}
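    // Spot-checks of the Arrow-to-SQL mappings; `matches!` is used so the
    // test does not assume `SqlType` implements `PartialEq`.
    #[test]
    fn test_arrow_type_to_sql_mappings() {
        assert_eq!(arrow_type_to_sql_name(&DataType::Int64), "BIGINT");
        assert_eq!(arrow_type_to_sql_name(&DataType::LargeUtf8), "TEXT");
        assert_eq!(
            arrow_type_to_sql_name(&DataType::Decimal128(10, 2)),
            "NUMERIC(10, 2)"
        );
        assert!(matches!(
            arrow_type_to_sql_type(&DataType::Boolean),
            SqlType::Bool
        ));
        assert!(matches!(
            arrow_type_to_sql_type(&DataType::Decimal128(10, 2)),
            SqlType::Numeric { precision: 10, scale: 2 }
        ));
        // Negative decimal scales clamp to zero.
        assert_eq!(decimal_scale_to_u32(-3), 0);
    }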
}