use std::io::Cursor;
use std::sync::Arc;
use arrow_array::RecordBatch;
use arrow_ipc::writer::StreamWriter;
use arrow_schema::{Schema, SchemaRef};
use vgi_rpc::wire::StreamReader as VgiStreamReader;
use vgi_rpc::{Result, RpcError};
pub fn write_batch(batch: &RecordBatch) -> Result<Vec<u8>> {
write_batch_with_schema(batch, batch.schema().as_ref())
}
pub fn write_batch_with_schema(batch: &RecordBatch, schema: &Schema) -> Result<Vec<u8>> {
let mut buf = Vec::new();
{
let mut w = StreamWriter::try_new(&mut buf, schema)
.map_err(|e| RpcError::runtime_error(format!("ipc writer: {e}")))?;
w.write(batch)
.map_err(|e| RpcError::runtime_error(format!("ipc write: {e}")))?;
w.finish()
.map_err(|e| RpcError::runtime_error(format!("ipc finish: {e}")))?;
}
Ok(buf)
}
pub fn read_batch(bytes: &[u8]) -> Result<RecordBatch> {
let mut reader = VgiStreamReader::new(Cursor::new(bytes))?.relax_nullability();
match reader.read_next()? {
Some((batch, _meta)) => Ok(batch),
None => Err(RpcError::type_error("ipc stream had no record batch")),
}
}
pub fn read_schema(bytes: &[u8]) -> Result<SchemaRef> {
let reader = VgiStreamReader::new(Cursor::new(bytes))?;
Ok(reader.schema())
}
pub fn write_schema(schema: &Schema) -> Result<Vec<u8>> {
let mut buf = Vec::new();
{
let mut w = StreamWriter::try_new(&mut buf, schema)
.map_err(|e| RpcError::runtime_error(format!("ipc schema writer: {e}")))?;
w.finish()
.map_err(|e| RpcError::runtime_error(format!("ipc schema finish: {e}")))?;
}
Ok(buf)
}
pub fn write_schema_ref(schema: &SchemaRef) -> Result<Vec<u8>> {
write_schema(schema.as_ref())
}
pub fn arc_schema(schema: Schema) -> SchemaRef {
Arc::new(schema)
}