use std::sync::Arc;
use arrow_array::{Array, BinaryArray, RecordBatch, StructArray};
use arrow_schema::{DataType, Field, Schema, SchemaRef};
use vgi_rpc::{Result, RpcError, VgiArrow};
pub fn flat_schema<T: VgiArrow>() -> SchemaRef {
match T::arrow_data_type() {
arrow_schema::DataType::Struct(fields) => Arc::new(Schema::new(fields)),
other => unreachable!(
"flat_schema requires a struct VgiArrow type, got {other:?} for {}",
T::describe_name()
),
}
}
/// Converts a `VgiArrow` value into a `RecordBatch`.
///
/// The value is first turned into an Arrow array via `T::build_singleton`;
/// that array must be a `StructArray`, which is then converted into the
/// returned batch.
///
/// # Errors
///
/// Propagates any error from `T::build_singleton`, and returns a type error
/// when the built array is not a `StructArray`.
pub fn to_batch<T: VgiArrow>(value: T) -> Result<RecordBatch> {
    let built = T::build_singleton(value)?;
    match built.as_any().downcast_ref::<StructArray>() {
        Some(struct_array) => Ok(RecordBatch::from(struct_array)),
        None => Err(RpcError::type_error(
            "VgiArrow DTO did not build a StructArray",
        )),
    }
}
/// Decodes a `T` from row 0 of `batch`.
///
/// The batch is viewed as a `StructArray` and handed to `T::read` with index
/// `0`; rows after the first are not inspected by this function.
///
/// # Errors
///
/// Returns a type error naming `T::describe_name()` when the batch has no
/// rows, and otherwise propagates whatever `T::read` returns.
pub fn from_batch<T: VgiArrow>(batch: &RecordBatch) -> Result<T> {
    match batch.num_rows() {
        0 => Err(RpcError::type_error(format!(
            "empty request batch for {}",
            T::describe_name()
        ))),
        _ => {
            // `StructArray::from` takes the batch by value, hence the clone.
            let rows = StructArray::from(batch.clone());
            T::read(&rows, 0)
        }
    }
}
/// Reads a `T` from `arr` at position `idx` by delegating to
/// `VgiArrow::read`.
///
/// # Errors
///
/// Returns whatever error `T::read` produces.
pub fn read_struct<T: VgiArrow>(arr: &dyn Array, idx: usize) -> Result<T> {
    <T as VgiArrow>::read(arr, idx)
}
/// Returns the envelope schema: a single non-nullable `Binary` column named
/// `"result"`.
pub fn result_binary_schema() -> SchemaRef {
    let result_field = Field::new("result", DataType::Binary, false);
    let schema = Schema::new(vec![result_field]);
    Arc::new(schema)
}
/// Wraps `value` in a result envelope batch.
///
/// The value is converted with `to_batch`, serialized with
/// `crate::ipc::write_batch`, and the resulting bytes are stored as the single
/// row of a batch using `result_binary_schema()`.
///
/// # Errors
///
/// Propagates errors from `to_batch` and `crate::ipc::write_batch`, and returns
/// a runtime error if the envelope batch cannot be constructed.
pub fn to_result_batch<T: VgiArrow>(value: T) -> Result<RecordBatch> {
    let payload = crate::ipc::write_batch(&to_batch(value)?)?;
    let column = BinaryArray::from(vec![payload.as_slice()]);
    match RecordBatch::try_new(result_binary_schema(), vec![Arc::new(column)]) {
        Ok(envelope) => Ok(envelope),
        Err(e) => Err(RpcError::runtime_error(format!(
            "build result envelope: {e}"
        ))),
    }
}
/// Builds a result envelope batch whose single `"result"` cell holds `bytes`.
///
/// The bytes are stored as given; this function does not inspect or validate
/// their contents.
///
/// # Errors
///
/// Returns a runtime error if the envelope batch cannot be constructed.
pub fn result_batch_from_bytes(bytes: &[u8]) -> Result<RecordBatch> {
    let column = BinaryArray::from(vec![bytes]);
    let envelope = RecordBatch::try_new(result_binary_schema(), vec![Arc::new(column)]);
    envelope.map_err(|e| RpcError::runtime_error(format!("build result envelope: {e}")))
}
/// Builds a result envelope whose payload is a serialized batch with no
/// columns and zero rows.
///
/// # Errors
///
/// Returns a runtime error if either the inner empty batch or the outer
/// envelope batch cannot be constructed, and propagates any error from
/// `crate::ipc::write_batch`.
pub fn empty_result_batch() -> Result<RecordBatch> {
    use arrow_array::RecordBatchOptions;

    // The inner batch has no columns, so its row count is given explicitly
    // through the options.
    let zero_rows = RecordBatchOptions::new().with_row_count(Some(0));
    let inner =
        RecordBatch::try_new_with_options(Arc::new(Schema::empty()), Vec::new(), &zero_rows)
            .map_err(|e| RpcError::runtime_error(format!("empty inner: {e}")))?;

    let payload = crate::ipc::write_batch(&inner)?;
    let column = BinaryArray::from(vec![payload.as_slice()]);
    match RecordBatch::try_new(result_binary_schema(), vec![Arc::new(column)]) {
        Ok(envelope) => Ok(envelope),
        Err(e) => Err(RpcError::runtime_error(format!(
            "build empty envelope: {e}"
        ))),
    }
}