use std::convert::TryFrom;
use std::sync::Arc;
use arrow::array::{Array, ArrayData, ArrayRef, FixedSizeListArray, Float32Array, make_array};
use arrow::buffer::Buffer;
use arrow::datatypes::{DataType, Field, Schema};
use num_enum::{IntoPrimitive, TryFromPrimitive};
use simd_r_drive_entry_handle::EntryHandle;
use llkv_result::{Error, Result};
/// Four-byte tag (`ARR0`) written at the start of every serialized array blob; the
/// decoder rejects any input that does not begin with it.
const MAGIC: [u8; 4] = [b'A', b'R', b'R', b'0'];
/// One-byte layout tag stored at offset 4 of the blob header. It selects how the
/// payload that follows the 24-byte header is interpreted when decoding.
#[repr(u8)]
enum Layout {
    /// A single values buffer described by a `PrimType` code.
    Primitive = 0,
    /// `FixedSizeList<Float32>`: the payload is the child's Float32 values buffer.
    FslFloat32 = 1,
    /// Variable-length data: an offsets buffer immediately followed by a values buffer.
    Varlen = 2,
    /// A struct column wrapped in an Arrow IPC stream holding one single-column batch.
    Struct = 3,
}
/// Element-type code stored at offset 5 of the blob header for the `Primitive` and
/// `Varlen` layouts.
///
/// The numeric values are written into serialized blobs, so an existing variant must
/// never be renumbered; new variants take the next unused code.
#[repr(u8)]
#[derive(Copy, Clone, Debug, Eq, PartialEq, IntoPrimitive, TryFromPrimitive)]
enum PrimType {
    UInt64 = 1,
    Int32 = 2,
    UInt32 = 3,
    Float32 = 4,
    /// Variable-length bytes with 32-bit offsets.
    Binary = 5,
    Int64 = 6,
    Int16 = 7,
    Int8 = 8,
    UInt16 = 9,
    UInt8 = 10,
    Float64 = 11,
    /// Variable-length UTF-8 with 32-bit offsets.
    Utf8 = 12,
    /// Variable-length bytes with 64-bit offsets.
    LargeBinary = 13,
    /// Variable-length UTF-8 with 64-bit offsets.
    LargeUtf8 = 14,
    Boolean = 15,
    Date32 = 16,
    Date64 = 17,
    /// Precision and scale travel in header bytes 6 and 7.
    Decimal128 = 18,
    /// Serialized as `Utf8` offsets/values and cast back to a view array when decoded.
    Utf8View = 19,
}
use crate::codecs::{read_u32_le, read_u64_le, write_u32_le, write_u64_le};
#[inline]
fn prim_from_datatype(dt: &DataType) -> Result<PrimType> {
use DataType::*;
let p = match dt {
UInt64 => PrimType::UInt64,
Int64 => PrimType::Int64,
Int32 => PrimType::Int32,
Int16 => PrimType::Int16,
Int8 => PrimType::Int8,
UInt32 => PrimType::UInt32,
UInt16 => PrimType::UInt16,
UInt8 => PrimType::UInt8,
Float32 => PrimType::Float32,
Float64 => PrimType::Float64,
Binary => PrimType::Binary,
Utf8 => PrimType::Utf8,
Utf8View => PrimType::Utf8View,
LargeBinary => PrimType::LargeBinary,
LargeUtf8 => PrimType::LargeUtf8,
Boolean => PrimType::Boolean,
Date32 => PrimType::Date32,
Date64 => PrimType::Date64,
Decimal128(_, _) => PrimType::Decimal128,
_ => return Err(Error::Internal("unsupported Arrow type".into())),
};
Ok(p)
}
#[inline]
fn datatype_from_prim(p: PrimType, precision: u8, scale: u8) -> Result<DataType> {
use DataType::*;
let dt = match p {
PrimType::UInt64 => UInt64,
PrimType::Int64 => Int64,
PrimType::Int32 => Int32,
PrimType::Int16 => Int16,
PrimType::Int8 => Int8,
PrimType::UInt32 => UInt32,
PrimType::UInt16 => UInt16,
PrimType::UInt8 => UInt8,
PrimType::Float32 => Float32,
PrimType::Float64 => Float64,
PrimType::Binary => Binary,
PrimType::Utf8 => Utf8,
PrimType::Utf8View => Utf8View,
PrimType::LargeBinary => LargeBinary,
PrimType::LargeUtf8 => LargeUtf8,
PrimType::Boolean => Boolean,
PrimType::Date32 => Date32,
PrimType::Date64 => Date64,
PrimType::Decimal128 => Decimal128(precision, scale as i8),
};
Ok(dt)
}
pub fn serialize_array(arr: &dyn Array) -> Result<Vec<u8>> {
match arr.data_type() {
&DataType::Binary => serialize_varlen(arr, PrimType::Binary),
&DataType::Utf8 => serialize_varlen(arr, PrimType::Utf8),
&DataType::LargeBinary => serialize_varlen(arr, PrimType::LargeBinary),
&DataType::LargeUtf8 => serialize_varlen(arr, PrimType::LargeUtf8),
&DataType::Utf8View => {
let casted = arrow::compute::cast(arr, &DataType::Utf8)
.map_err(|e| Error::Internal(format!("failed to cast Utf8View to Utf8: {}", e)))?;
serialize_varlen(&casted, PrimType::Utf8View)
}
&DataType::FixedSizeList(ref child, list_size) => {
if child.data_type() != &DataType::Float32 {
return Err(Error::Internal(
"Only FixedSizeList<Float32> supported".into(),
));
}
serialize_fsl_float32(arr, list_size)
}
DataType::Struct(_) => serialize_struct(arr),
dt => {
let p = prim_from_datatype(dt)?;
serialize_primitive(arr, p)
}
}
}
fn serialize_primitive(arr: &dyn Array, code: PrimType) -> Result<Vec<u8>> {
if arr.null_count() != 0 {
return Err(Error::Internal(
"nulls not supported in zero-copy format (yet)".into(),
));
}
let data = arr.to_data();
let len = data.len() as u64;
let values = data
.buffers()
.first()
.ok_or_else(|| Error::Internal("missing values buffer".into()))?;
let values_bytes = values.as_slice();
let values_len = u32::try_from(values_bytes.len())
.map_err(|_| Error::Internal("values too large".into()))?;
let mut out = Vec::with_capacity(24 + values_bytes.len());
out.extend_from_slice(&MAGIC);
out.push(Layout::Primitive as u8);
out.push(u8::from(code));
match code {
PrimType::Decimal128 => {
if let DataType::Decimal128(precision, scale) = arr.data_type() {
out.push(*precision);
out.push(*scale as u8);
} else {
return Err(Error::Internal("expected Decimal128 data type".into()));
}
}
_ => {
out.extend_from_slice(&[0u8; 2]);
}
}
write_u64_le(&mut out, len);
write_u32_le(&mut out, values_len);
write_u32_le(&mut out, 0);
out.extend_from_slice(values_bytes);
Ok(out)
}
fn serialize_varlen(arr: &dyn Array, code: PrimType) -> Result<Vec<u8>> {
if arr.null_count() != 0 {
return Err(Error::Internal(
"nulls not supported in zero-copy format (yet)".into(),
));
}
let data = arr.to_data();
let len = data.len() as u64;
let offsets_buf = data
.buffers()
.first()
.ok_or_else(|| Error::Internal("missing offsets buffer".into()))?;
let values_buf = data
.buffers()
.get(1)
.ok_or_else(|| Error::Internal("missing values buffer for varlen".into()))?;
let offsets_bytes = offsets_buf.as_slice();
let values_bytes = values_buf.as_slice();
let offsets_len = u32::try_from(offsets_bytes.len())
.map_err(|_| Error::Internal("offsets buffer too large".into()))?;
let values_len = u32::try_from(values_bytes.len())
.map_err(|_| Error::Internal("values buffer too large".into()))?;
let mut out = Vec::with_capacity(24 + offsets_bytes.len() + values_bytes.len());
out.extend_from_slice(&MAGIC);
out.push(Layout::Varlen as u8);
out.push(u8::from(code));
out.extend_from_slice(&[0u8; 2]);
write_u64_le(&mut out, len);
write_u32_le(&mut out, offsets_len);
write_u32_le(&mut out, values_len);
out.extend_from_slice(offsets_bytes);
out.extend_from_slice(values_bytes);
Ok(out)
}
fn serialize_fsl_float32(arr: &dyn Array, list_size: i32) -> Result<Vec<u8>> {
if arr.null_count() != 0 {
return Err(Error::Internal(
"nulls not supported in zero-copy format (yet)".into(),
));
}
let fsl = arr
.as_any()
.downcast_ref::<FixedSizeListArray>()
.ok_or_else(|| Error::Internal("FSL downcast failed".into()))?;
let values = fsl.values();
if values.null_count() != 0 || values.data_type() != &DataType::Float32 {
return Err(Error::Internal("FSL child must be non-null Float32".into()));
}
let child = values.to_data();
let child_buf = child
.buffers()
.first()
.ok_or_else(|| Error::Internal("missing child values".into()))?;
let child_bytes = child_buf.as_slice();
let child_len =
u32::try_from(child_bytes.len()).map_err(|_| Error::Internal("child too large".into()))?;
let mut out = Vec::with_capacity(24 + child_bytes.len());
out.extend_from_slice(&MAGIC);
out.push(Layout::FslFloat32 as u8);
out.push(0); out.extend_from_slice(&[0u8; 2]);
write_u64_le(&mut out, fsl.len() as u64);
write_u32_le(&mut out, u32::try_from(list_size).unwrap());
write_u32_le(&mut out, child_len);
out.extend_from_slice(child_bytes);
Ok(out)
}
/// Serializes a struct array into the `Struct` layout by wrapping it in an Arrow IPC
/// stream that contains one record batch with a single column named `struct_col`.
///
/// Blob layout, all integers little-endian: magic, layout tag, three zero bytes, row
/// count as `u64`, a zero `u32`, IPC byte length as `u32`, then the IPC stream.
///
/// # Errors
/// Returns `Error::Internal` when the array has top-level nulls, when building the
/// record batch or writing the IPC stream fails, or when the stream is larger than
/// `u32::MAX` bytes.
fn serialize_struct(arr: &dyn Array) -> Result<Vec<u8>> {
    use arrow::ipc::writer::StreamWriter;
    use arrow::record_batch::RecordBatch;

    if arr.null_count() != 0 {
        return Err(Error::Internal(
            "nulls not supported in zero-copy format (yet)".into(),
        ));
    }

    let field = Field::new("struct_col", arr.data_type().clone(), false);
    let schema = Arc::new(Schema::new(vec![field]));
    let column = make_array(arr.to_data());
    let batch = RecordBatch::try_new(schema, vec![column])
        .map_err(|e| Error::Internal(format!("failed to create record batch: {}", e)))?;

    let mut ipc_bytes = Vec::new();
    let mut writer = StreamWriter::try_new(&mut ipc_bytes, &batch.schema())
        .map_err(|e| Error::Internal(format!("failed to create IPC writer: {}", e)))?;
    writer
        .write(&batch)
        .map_err(|e| Error::Internal(format!("failed to write IPC: {}", e)))?;
    writer
        .finish()
        .map_err(|e| Error::Internal(format!("failed to finish IPC: {}", e)))?;
    // Release the writer's mutable borrow before the bytes are read back.
    drop(writer);

    let payload_len = u32::try_from(ipc_bytes.len())
        .map_err(|_| Error::Internal("IPC payload too large".into()))?;

    let mut blob = Vec::with_capacity(24 + ipc_bytes.len());
    blob.extend_from_slice(&MAGIC);
    blob.push(Layout::Struct as u8);
    // Type code and both parameter bytes are unused by this layout.
    blob.push(0);
    blob.extend_from_slice(&[0u8; 2]);
    write_u64_le(&mut blob, arr.len() as u64);
    // First header slot is unused; the second carries the IPC length.
    write_u32_le(&mut blob, 0);
    write_u32_le(&mut blob, payload_len);
    blob.extend_from_slice(&ipc_bytes);
    Ok(blob)
}
pub fn deserialize_array(blob: EntryHandle) -> Result<ArrayRef> {
let raw = blob.as_ref();
if raw.len() < 24 || raw[0..4] != MAGIC {
return Err(Error::Internal("bad array blob magic/size".into()));
}
let layout = raw[4];
let type_code = raw[5];
let precision = raw[6]; let scale = raw[7];
let mut o = 8usize;
let len = read_u64_le(raw, &mut o) as usize;
let extra_a = read_u32_le(raw, &mut o);
let extra_b = read_u32_le(raw, &mut o);
let whole: Buffer = blob.as_arrow_buffer();
let payload: Buffer = whole.slice_with_length(o, whole.len() - o);
match layout {
x if x == Layout::Primitive as u8 => {
let values_len = extra_a as usize;
if payload.len() != values_len {
return Err(Error::Internal("primitive payload length mismatch".into()));
}
let p = PrimType::try_from(type_code)
.map_err(|_| Error::Internal("unsupported primitive code".into()))?;
let data_type = datatype_from_prim(p, precision, scale)?;
let buffer = if matches!(data_type, DataType::Decimal128(_, _)) {
let ptr = payload.as_ptr();
if !(ptr as usize).is_multiple_of(16) {
let mut aligned_vec = Vec::with_capacity(payload.len());
aligned_vec.extend_from_slice(&payload);
arrow::buffer::Buffer::from(aligned_vec)
} else {
payload
}
} else {
payload
};
let data = ArrayData::builder(data_type)
.len(len)
.add_buffer(buffer)
.build()?;
Ok(make_array(data))
}
x if x == Layout::FslFloat32 as u8 => {
let list_size = extra_a as i32;
let child_values_len = extra_b as usize;
if payload.len() != child_values_len {
return Err(Error::Internal("fsl child length mismatch".into()));
}
let child_values = payload;
let child_len = len * list_size as usize;
let child_data = ArrayData::builder(DataType::Float32)
.len(child_len)
.add_buffer(child_values)
.build()?;
let child = Arc::new(Float32Array::from(child_data)) as ArrayRef;
let field = Arc::new(Field::new("item", DataType::Float32, false));
let arr_data = ArrayData::builder(DataType::FixedSizeList(field, list_size))
.len(len)
.add_child_data(child.to_data())
.build()?;
Ok(Arc::new(FixedSizeListArray::from(arr_data)))
}
x if x == Layout::Varlen as u8 => {
let offsets_len = extra_a as usize;
let values_len = extra_b as usize;
if payload.len() != offsets_len + values_len {
return Err(Error::Internal("varlen payload length mismatch".into()));
}
let offsets = payload.slice_with_length(0, offsets_len);
let values = payload.slice_with_length(offsets_len, values_len);
let p = PrimType::try_from(type_code)
.map_err(|_| Error::Internal("unsupported varlen code".into()))?;
if p == PrimType::Utf8View {
let data_type = DataType::Utf8;
let data = ArrayData::builder(data_type)
.len(len)
.add_buffer(offsets)
.add_buffer(values)
.build()?;
let utf8_array = make_array(data);
let view_array =
arrow::compute::cast(&utf8_array, &DataType::Utf8View).map_err(|e| {
Error::Internal(format!("failed to cast Utf8 to Utf8View: {}", e))
})?;
return Ok(view_array);
}
let data_type = datatype_from_prim(p, 0, 0)?;
let data = ArrayData::builder(data_type)
.len(len)
.add_buffer(offsets)
.add_buffer(values)
.build()?;
Ok(make_array(data))
}
x if x == Layout::Struct as u8 => {
let payload_len = extra_b as usize;
if payload.len() != payload_len {
return Err(Error::Internal("struct payload length mismatch".into()));
}
use arrow::ipc::reader::StreamReader;
use std::io::Cursor;
let cursor = Cursor::new(payload.as_slice());
let mut reader = StreamReader::try_new(cursor, None)
.map_err(|e| Error::Internal(format!("failed to create IPC reader: {}", e)))?;
let batch = reader
.next()
.ok_or_else(|| Error::Internal("no batch in IPC stream".into()))?
.map_err(|e| Error::Internal(format!("failed to read IPC batch: {}", e)))?;
if batch.num_columns() != 1 {
return Err(Error::Internal(
"expected single column in struct batch".into(),
));
}
Ok(batch.column(0).clone())
}
_ => Err(Error::Internal("unknown layout".into())),
}
}
#[allow(clippy::no_effect)]
const _: () = {
["code changed"][!(PrimType::UInt64 as u8 == 1) as usize];
["code changed"][!(PrimType::Int32 as u8 == 2) as usize];
["code changed"][!(PrimType::UInt32 as u8 == 3) as usize];
["code changed"][!(PrimType::Float32 as u8 == 4) as usize];
["code changed"][!(PrimType::Binary as u8 == 5) as usize];
["code changed"][!(PrimType::Int64 as u8 == 6) as usize];
["code changed"][!(PrimType::Int16 as u8 == 7) as usize];
["code changed"][!(PrimType::Int8 as u8 == 8) as usize];
["code changed"][!(PrimType::UInt16 as u8 == 9) as usize];
["code changed"][!(PrimType::UInt8 as u8 == 10) as usize];
["code changed"][!(PrimType::Float64 as u8 == 11) as usize];
["code changed"][!(PrimType::Utf8 as u8 == 12) as usize];
["code changed"][!(PrimType::LargeBinary as u8 == 13) as usize];
["code changed"][!(PrimType::LargeUtf8 as u8 == 14) as usize];
["code changed"][!(PrimType::Boolean as u8 == 15) as usize];
["code changed"][!(PrimType::Date32 as u8 == 16) as usize];
["code changed"][!(PrimType::Date64 as u8 == 17) as usize];
["code changed"][!(PrimType::Utf8View as u8 == 19) as usize];
};