pub mod jsonb_tags;
use std::sync::Arc;
use arrow::buffer::Buffer;
use arrow_array::RecordBatch;
use arrow_ipc::reader::{StreamDecoder, StreamReader};
use arrow_ipc::writer::StreamWriter;
use arrow_schema::{ArrowError, Schema, SchemaRef};
/// Encodes `batch` as a complete, self-contained Arrow IPC stream.
///
/// The returned bytes hold the stream header for the batch's schema, the batch
/// itself, and whatever `StreamWriter::finish` appends to close the stream, so
/// they can be read back with [`deserialize_batch_stream`].
///
/// # Errors
///
/// Propagates any [`ArrowError`] raised while creating the IPC writer, writing
/// the batch, or finishing the stream.
pub fn serialize_batch_stream(batch: &RecordBatch) -> Result<Vec<u8>, arrow_schema::ArrowError> {
    let schema = batch.schema();
    let mut encoded = Vec::new();
    let mut stream = StreamWriter::try_new(&mut encoded, &schema)?;
    stream.write(batch)?;
    stream.finish()?;
    // The writer mutably borrows `encoded`; release it before returning the bytes.
    drop(stream);
    Ok(encoded)
}
/// Decodes the first record batch from an in-memory Arrow IPC stream.
///
/// This is the counterpart of [`serialize_batch_stream`]. Only the first batch
/// is read; any batches that follow it in `bytes` are ignored.
///
/// # Errors
///
/// Returns an [`ArrowError`] if the stream header cannot be read, if decoding
/// the first batch fails, or an `IpcError` when the stream holds no batch.
pub fn deserialize_batch_stream(bytes: &[u8]) -> Result<RecordBatch, arrow_schema::ArrowError> {
    let mut stream = StreamReader::try_new(std::io::Cursor::new(bytes), None)?;
    match stream.next() {
        Some(first) => first,
        None => Err(arrow_schema::ArrowError::IpcError(
            "no record batch in IPC stream".to_string(),
        )),
    }
}
/// Incrementally encodes record batches into one Arrow IPC stream.
///
/// The stream header is written when the encoder is created and is returned as
/// a prefix of the first chunk handed back by [`encode`](Self::encode) (or by
/// [`finish`](Self::finish) if nothing is encoded). Concatenating every
/// returned chunk in order reproduces the full stream.
pub struct BatchStreamEncoder {
    // Writes into an owned buffer that is drained after every call.
    writer: StreamWriter<Vec<u8>>,
    // Copy of the schema the encoder was created with.
    schema: SchemaRef,
}

impl BatchStreamEncoder {
    /// Creates an encoder for batches belonging to `schema`.
    ///
    /// # Errors
    ///
    /// Returns an [`ArrowError`] if the underlying IPC writer cannot be created.
    pub fn new(schema: &Schema) -> Result<Self, ArrowError> {
        let writer = StreamWriter::try_new(Vec::new(), schema)?;
        let schema = Arc::new(schema.clone());
        Ok(Self { writer, schema })
    }

    /// Returns the schema this encoder was created with.
    #[must_use]
    pub fn schema(&self) -> &SchemaRef {
        &self.schema
    }

    /// Writes `batch` to the stream and returns every byte produced since the
    /// previous drain.
    ///
    /// # Errors
    ///
    /// Propagates any [`ArrowError`] from the IPC writer. On error nothing is
    /// drained, so bytes already buffered are returned by the next successful call.
    pub fn encode(&mut self, batch: &RecordBatch) -> Result<Vec<u8>, ArrowError> {
        self.writer.write(batch)?;
        Ok(self.take_buffered())
    }

    /// Finishes the stream and returns the remaining buffered bytes.
    ///
    /// # Errors
    ///
    /// Propagates any [`ArrowError`] from the IPC writer's `finish`.
    pub fn finish(&mut self) -> Result<Vec<u8>, ArrowError> {
        self.writer.finish()?;
        Ok(self.take_buffered())
    }

    /// Moves the writer's buffered output out, leaving an empty buffer behind.
    fn take_buffered(&mut self) -> Vec<u8> {
        std::mem::take(self.writer.get_mut())
    }
}
/// Incrementally decodes an Arrow IPC stream that arrives as a sequence of
/// byte chunks, such as the chunks produced by `BatchStreamEncoder`.
///
/// Decoding state is kept in the wrapped [`StreamDecoder`] between calls to
/// [`decode_chunk`](Self::decode_chunk). Call [`finish`](Self::finish) after the
/// last chunk to detect a stream that was cut off partway through a message.
#[derive(Debug, Default)]
pub struct BatchStreamDecoder {
    decoder: StreamDecoder,
}

impl BatchStreamDecoder {
    /// Creates a decoder that has not yet consumed any stream bytes.
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Feeds the next chunk of the stream and returns the record batches that
    /// were decoded from it, in stream order.
    ///
    /// # Errors
    ///
    /// Propagates any [`ArrowError`] reported by the underlying decoder. On
    /// error, batches already decoded from this chunk are discarded.
    pub fn decode_chunk(&mut self, bytes: Vec<u8>) -> Result<Vec<RecordBatch>, ArrowError> {
        let mut buffer = Buffer::from_vec(bytes);
        let mut batches = Vec::new();
        // `decode` advances `buffer` past the bytes it consumes; keep calling it
        // until the whole chunk has been handed over.
        // NOTE(review): termination relies on `decode` consuming input (or
        // erroring) on every call — confirm against the arrow-ipc version in use.
        while !buffer.is_empty() {
            if let Some(batch) = self.decoder.decode(&mut buffer)? {
                batches.push(batch);
            }
        }
        Ok(batches)
    }

    /// Signals that no more chunks will be supplied.
    ///
    /// Without this check, a stream truncated in the middle of an IPC message is
    /// indistinguishable from a complete one, because `decode_chunk` simply
    /// returns the batches decoded so far.
    ///
    /// # Errors
    ///
    /// Returns an [`ArrowError`] if the underlying decoder still holds partial,
    /// undecoded data.
    pub fn finish(&mut self) -> Result<(), ArrowError> {
        self.decoder.finish()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{ArrayRef, Int32Array};
    use arrow_schema::{DataType, Field};
    use std::sync::Arc;

    /// Schema shared by every fixture: a single non-nullable `Int32` column `n`.
    fn int_schema() -> SchemaRef {
        Arc::new(Schema::new(vec![Field::new("n", DataType::Int32, false)]))
    }

    /// Builds a one-column batch holding `values`.
    fn batch(values: &[i32]) -> RecordBatch {
        let column: ArrayRef = Arc::new(Int32Array::from(values.to_vec()));
        RecordBatch::try_new(int_schema(), vec![column]).unwrap()
    }

    #[test]
    fn stream_encoder_emits_schema_once() {
        let mut encoder = BatchStreamEncoder::new(&int_schema()).unwrap();
        let with_header = encoder.encode(&batch(&[1, 2, 3])).unwrap();
        let without_header = encoder.encode(&batch(&[4, 5, 6])).unwrap();
        // Only the first chunk carries the stream header.
        assert!(with_header.len() > without_header.len());
        // A standalone stream repeats the header, so it outgrows a mid-stream chunk.
        let standalone = serialize_batch_stream(&batch(&[4, 5, 6])).unwrap();
        assert!(without_header.len() < standalone.len());
    }

    #[test]
    fn stream_encode_decode_roundtrip() {
        let mut encoder = BatchStreamEncoder::new(&int_schema()).unwrap();
        let inputs = [batch(&[1, 2]), batch(&[3, 4, 5]), batch(&[])];

        let mut chunks = Vec::with_capacity(inputs.len() + 1);
        for input in &inputs {
            chunks.push(encoder.encode(input).unwrap());
        }
        chunks.push(encoder.finish().unwrap());

        let mut decoder = BatchStreamDecoder::new();
        let decoded: Vec<RecordBatch> = chunks
            .into_iter()
            .flat_map(|chunk| decoder.decode_chunk(chunk).unwrap())
            .collect();
        assert_eq!(decoded, inputs);
    }
}