1use std::convert::TryFrom;
31use std::sync::Arc;
32
33use arrow::array::{Array, ArrayData, ArrayRef, FixedSizeListArray, Float32Array, make_array};
34use arrow::buffer::Buffer;
35use arrow::datatypes::{DataType, Field, Schema};
36use num_enum::{IntoPrimitive, TryFromPrimitive};
37use simd_r_drive_entry_handle::EntryHandle;
38
39use llkv_result::{Error, Result};
40
/// Four-byte tag written at the start of every serialized array blob; `deserialize_array`
/// rejects any blob whose first four bytes differ from it.
const MAGIC: [u8; 4] = [b'A', b'R', b'R', b'0'];
42
/// Layout tag stored in byte 4 of the blob header.
///
/// `deserialize_array` dispatches on this byte to decide how the payload that follows
/// the 24-byte header is interpreted. The discriminants are written into serialized
/// blobs, so they must not be renumbered.
#[repr(u8)]
enum Layout {
    /// One values buffer (fixed-width primitives and `Boolean`).
    Primitive = 0,
    /// `FixedSizeList<Float32>`: the flattened child `f32` values buffer.
    FslFloat32 = 1,
    /// Offsets buffer immediately followed by the values buffer.
    Varlen = 2,
    /// A single-column Arrow IPC stream carrying the struct array.
    Struct = 3,
}
136
137#[repr(u8)]
145#[derive(Copy, Clone, Debug, Eq, PartialEq, IntoPrimitive, TryFromPrimitive)]
146enum PrimType {
147 UInt64 = 1,
148 Int32 = 2,
149 UInt32 = 3,
150 Float32 = 4,
151 Binary = 5,
152 Int64 = 6,
153 Int16 = 7,
154 Int8 = 8,
155 UInt16 = 9,
156 UInt8 = 10,
157 Float64 = 11,
158 Utf8 = 12,
159 LargeBinary = 13,
160 LargeUtf8 = 14,
161 Boolean = 15,
162 Date32 = 16,
163 Date64 = 17,
164 Decimal128 = 18,
165 Utf8View = 19,
166}
167
168use crate::codecs::{read_u32_le, read_u64_le, write_u32_le, write_u64_le};
169
170#[inline]
173fn prim_from_datatype(dt: &DataType) -> Result<PrimType> {
174 use DataType::*;
175 let p = match dt {
176 UInt64 => PrimType::UInt64,
177 Int64 => PrimType::Int64,
178 Int32 => PrimType::Int32,
179 Int16 => PrimType::Int16,
180 Int8 => PrimType::Int8,
181 UInt32 => PrimType::UInt32,
182 UInt16 => PrimType::UInt16,
183 UInt8 => PrimType::UInt8,
184 Float32 => PrimType::Float32,
185 Float64 => PrimType::Float64,
186 Binary => PrimType::Binary,
187 Utf8 => PrimType::Utf8,
188 Utf8View => PrimType::Utf8View,
189 LargeBinary => PrimType::LargeBinary,
190 LargeUtf8 => PrimType::LargeUtf8,
191 Boolean => PrimType::Boolean,
192 Date32 => PrimType::Date32,
193 Date64 => PrimType::Date64,
194 Decimal128(_, _) => PrimType::Decimal128,
195 _ => return Err(Error::Internal("unsupported Arrow type".into())),
196 };
197 Ok(p)
198}
199
200#[inline]
203fn datatype_from_prim(p: PrimType, precision: u8, scale: u8) -> Result<DataType> {
204 use DataType::*;
205 let dt = match p {
206 PrimType::UInt64 => UInt64,
207 PrimType::Int64 => Int64,
208 PrimType::Int32 => Int32,
209 PrimType::Int16 => Int16,
210 PrimType::Int8 => Int8,
211 PrimType::UInt32 => UInt32,
212 PrimType::UInt16 => UInt16,
213 PrimType::UInt8 => UInt8,
214 PrimType::Float32 => Float32,
215 PrimType::Float64 => Float64,
216 PrimType::Binary => Binary,
217 PrimType::Utf8 => Utf8,
218 PrimType::Utf8View => Utf8View,
219 PrimType::LargeBinary => LargeBinary,
220 PrimType::LargeUtf8 => LargeUtf8,
221 PrimType::Boolean => Boolean,
222 PrimType::Date32 => Date32,
223 PrimType::Date64 => Date64,
224 PrimType::Decimal128 => Decimal128(precision, scale as i8),
225 };
226 Ok(dt)
227}
228
229pub fn serialize_array(arr: &dyn Array) -> Result<Vec<u8>> {
231 match arr.data_type() {
232 &DataType::Binary => serialize_varlen(arr, PrimType::Binary),
234 &DataType::Utf8 => serialize_varlen(arr, PrimType::Utf8),
235 &DataType::LargeBinary => serialize_varlen(arr, PrimType::LargeBinary),
236 &DataType::LargeUtf8 => serialize_varlen(arr, PrimType::LargeUtf8),
237 &DataType::Utf8View => {
238 let casted = arrow::compute::cast(arr, &DataType::Utf8)
239 .map_err(|e| Error::Internal(format!("failed to cast Utf8View to Utf8: {}", e)))?;
240 serialize_varlen(&casted, PrimType::Utf8View)
241 }
242
243 &DataType::FixedSizeList(ref child, list_size) => {
245 if child.data_type() != &DataType::Float32 {
246 return Err(Error::Internal(
247 "Only FixedSizeList<Float32> supported".into(),
248 ));
249 }
250 serialize_fsl_float32(arr, list_size)
251 }
252
253 DataType::Struct(_) => serialize_struct(arr),
255
256 dt => {
258 let p = prim_from_datatype(dt)?;
259 serialize_primitive(arr, p)
260 }
261 }
262}
263
264fn serialize_primitive(arr: &dyn Array, code: PrimType) -> Result<Vec<u8>> {
265 if arr.null_count() != 0 {
266 return Err(Error::Internal(
267 "nulls not supported in zero-copy format (yet)".into(),
268 ));
269 }
270 let data = arr.to_data();
271 let len = data.len() as u64;
272 let values = data
273 .buffers()
274 .first()
275 .ok_or_else(|| Error::Internal("missing values buffer".into()))?;
276 let values_bytes = values.as_slice();
277 let values_len = u32::try_from(values_bytes.len())
278 .map_err(|_| Error::Internal("values too large".into()))?;
279
280 let mut out = Vec::with_capacity(24 + values_bytes.len());
281 out.extend_from_slice(&MAGIC);
282 out.push(Layout::Primitive as u8);
283 out.push(u8::from(code));
284
285 match code {
288 PrimType::Decimal128 => {
289 if let DataType::Decimal128(precision, scale) = arr.data_type() {
290 out.push(*precision);
291 out.push(*scale as u8);
292 } else {
293 return Err(Error::Internal("expected Decimal128 data type".into()));
294 }
295 }
296 _ => {
297 out.extend_from_slice(&[0u8; 2]);
299 }
300 }
301
302 write_u64_le(&mut out, len);
303 write_u32_le(&mut out, values_len);
304 write_u32_le(&mut out, 0);
305 out.extend_from_slice(values_bytes);
306 Ok(out)
307}
308
309fn serialize_varlen(arr: &dyn Array, code: PrimType) -> Result<Vec<u8>> {
310 if arr.null_count() != 0 {
311 return Err(Error::Internal(
312 "nulls not supported in zero-copy format (yet)".into(),
313 ));
314 }
315 let data = arr.to_data();
316 let len = data.len() as u64;
317
318 let offsets_buf = data
319 .buffers()
320 .first()
321 .ok_or_else(|| Error::Internal("missing offsets buffer".into()))?;
322 let values_buf = data
323 .buffers()
324 .get(1)
325 .ok_or_else(|| Error::Internal("missing values buffer for varlen".into()))?;
326
327 let offsets_bytes = offsets_buf.as_slice();
328 let values_bytes = values_buf.as_slice();
329
330 let offsets_len = u32::try_from(offsets_bytes.len())
331 .map_err(|_| Error::Internal("offsets buffer too large".into()))?;
332 let values_len = u32::try_from(values_bytes.len())
333 .map_err(|_| Error::Internal("values buffer too large".into()))?;
334
335 let mut out = Vec::with_capacity(24 + offsets_bytes.len() + values_bytes.len());
336 out.extend_from_slice(&MAGIC);
337 out.push(Layout::Varlen as u8);
338 out.push(u8::from(code));
339 out.extend_from_slice(&[0u8; 2]);
341 write_u64_le(&mut out, len);
342 write_u32_le(&mut out, offsets_len);
343 write_u32_le(&mut out, values_len);
344 out.extend_from_slice(offsets_bytes);
345 out.extend_from_slice(values_bytes);
346 Ok(out)
347}
348
349fn serialize_fsl_float32(arr: &dyn Array, list_size: i32) -> Result<Vec<u8>> {
350 if arr.null_count() != 0 {
351 return Err(Error::Internal(
352 "nulls not supported in zero-copy format (yet)".into(),
353 ));
354 }
355 let fsl = arr
356 .as_any()
357 .downcast_ref::<FixedSizeListArray>()
358 .ok_or_else(|| Error::Internal("FSL downcast failed".into()))?;
359
360 let values = fsl.values();
361 if values.null_count() != 0 || values.data_type() != &DataType::Float32 {
362 return Err(Error::Internal("FSL child must be non-null Float32".into()));
363 }
364
365 let child = values.to_data();
366 let child_buf = child
367 .buffers()
368 .first()
369 .ok_or_else(|| Error::Internal("missing child values".into()))?;
370 let child_bytes = child_buf.as_slice();
371
372 let child_len =
373 u32::try_from(child_bytes.len()).map_err(|_| Error::Internal("child too large".into()))?;
374
375 let mut out = Vec::with_capacity(24 + child_bytes.len());
376 out.extend_from_slice(&MAGIC);
377 out.push(Layout::FslFloat32 as u8);
378 out.push(0); out.extend_from_slice(&[0u8; 2]);
381 write_u64_le(&mut out, fsl.len() as u64);
382 write_u32_le(&mut out, u32::try_from(list_size).unwrap());
383 write_u32_le(&mut out, child_len);
384 out.extend_from_slice(child_bytes);
385 Ok(out)
386}
387
388fn serialize_struct(arr: &dyn Array) -> Result<Vec<u8>> {
389 if arr.null_count() != 0 {
390 return Err(Error::Internal(
391 "nulls not supported in zero-copy format (yet)".into(),
392 ));
393 }
394
395 use arrow::ipc::writer::StreamWriter;
397 use arrow::record_batch::RecordBatch;
398
399 let schema = Arc::new(Schema::new(vec![Field::new(
401 "struct_col",
402 arr.data_type().clone(),
403 false,
404 )]));
405 let array_ref = make_array(arr.to_data());
406 let batch = RecordBatch::try_new(schema, vec![array_ref])
407 .map_err(|e| Error::Internal(format!("failed to create record batch: {}", e)))?;
408
409 let mut ipc_bytes = Vec::new();
411 {
412 let mut writer = StreamWriter::try_new(&mut ipc_bytes, &batch.schema())
413 .map_err(|e| Error::Internal(format!("failed to create IPC writer: {}", e)))?;
414 writer
415 .write(&batch)
416 .map_err(|e| Error::Internal(format!("failed to write IPC: {}", e)))?;
417 writer
418 .finish()
419 .map_err(|e| Error::Internal(format!("failed to finish IPC: {}", e)))?;
420 }
421
422 let payload_len = u32::try_from(ipc_bytes.len())
423 .map_err(|_| Error::Internal("IPC payload too large".into()))?;
424
425 let mut out = Vec::with_capacity(24 + ipc_bytes.len());
426 out.extend_from_slice(&MAGIC);
427 out.push(Layout::Struct as u8);
428 out.push(0); out.extend_from_slice(&[0u8; 2]); write_u64_le(&mut out, arr.len() as u64);
431 write_u32_le(&mut out, 0); write_u32_le(&mut out, payload_len);
433 out.extend_from_slice(&ipc_bytes);
434 Ok(out)
435}
436
437pub fn deserialize_array(blob: EntryHandle) -> Result<ArrayRef> {
439 let raw = blob.as_ref();
440 if raw.len() < 24 || raw[0..4] != MAGIC {
441 return Err(Error::Internal("bad array blob magic/size".into()));
442 }
443
444 let layout = raw[4];
445 let type_code = raw[5];
446 let precision = raw[6]; let scale = raw[7]; let mut o = 8usize;
450 let len = read_u64_le(raw, &mut o) as usize;
451 let extra_a = read_u32_le(raw, &mut o);
452 let extra_b = read_u32_le(raw, &mut o);
453
454 let whole: Buffer = blob.as_arrow_buffer();
455 let payload: Buffer = whole.slice_with_length(o, whole.len() - o);
456
457 match layout {
458 x if x == Layout::Primitive as u8 => {
459 let values_len = extra_a as usize;
460 if payload.len() != values_len {
461 return Err(Error::Internal("primitive payload length mismatch".into()));
462 }
463
464 let p = PrimType::try_from(type_code)
465 .map_err(|_| Error::Internal("unsupported primitive code".into()))?;
466 let data_type = datatype_from_prim(p, precision, scale)?;
467
468 let buffer = if matches!(data_type, DataType::Decimal128(_, _)) {
470 let ptr = payload.as_ptr();
471 if !(ptr as usize).is_multiple_of(16) {
472 let mut aligned_vec = Vec::with_capacity(payload.len());
474 aligned_vec.extend_from_slice(&payload);
475 arrow::buffer::Buffer::from(aligned_vec)
476 } else {
477 payload
478 }
479 } else {
480 payload
481 };
482
483 let data = ArrayData::builder(data_type)
484 .len(len)
485 .add_buffer(buffer)
486 .build()?;
487 Ok(make_array(data))
488 }
489
490 x if x == Layout::FslFloat32 as u8 => {
491 let list_size = extra_a as i32;
492 let child_values_len = extra_b as usize;
493 if payload.len() != child_values_len {
494 return Err(Error::Internal("fsl child length mismatch".into()));
495 }
496
497 let child_values = payload;
498 let child_len = len * list_size as usize;
499
500 let child_data = ArrayData::builder(DataType::Float32)
501 .len(child_len)
502 .add_buffer(child_values)
503 .build()?;
504 let child = Arc::new(Float32Array::from(child_data)) as ArrayRef;
505
506 let field = Arc::new(Field::new("item", DataType::Float32, false));
507 let arr_data = ArrayData::builder(DataType::FixedSizeList(field, list_size))
508 .len(len)
509 .add_child_data(child.to_data())
510 .build()?;
511 Ok(Arc::new(FixedSizeListArray::from(arr_data)))
512 }
513
514 x if x == Layout::Varlen as u8 => {
515 let offsets_len = extra_a as usize;
516 let values_len = extra_b as usize;
517 if payload.len() != offsets_len + values_len {
518 return Err(Error::Internal("varlen payload length mismatch".into()));
519 }
520
521 let offsets = payload.slice_with_length(0, offsets_len);
522 let values = payload.slice_with_length(offsets_len, values_len);
523
524 let p = PrimType::try_from(type_code)
525 .map_err(|_| Error::Internal("unsupported varlen code".into()))?;
526
527 if p == PrimType::Utf8View {
529 let data_type = DataType::Utf8;
530 let data = ArrayData::builder(data_type)
531 .len(len)
532 .add_buffer(offsets)
533 .add_buffer(values)
534 .build()?;
535 let utf8_array = make_array(data);
536 let view_array =
538 arrow::compute::cast(&utf8_array, &DataType::Utf8View).map_err(|e| {
539 Error::Internal(format!("failed to cast Utf8 to Utf8View: {}", e))
540 })?;
541 return Ok(view_array);
542 }
543
544 let data_type = datatype_from_prim(p, 0, 0)?;
546
547 let data = ArrayData::builder(data_type)
548 .len(len)
549 .add_buffer(offsets)
550 .add_buffer(values)
551 .build()?;
552 Ok(make_array(data))
553 }
554
555 x if x == Layout::Struct as u8 => {
556 let payload_len = extra_b as usize;
557 if payload.len() != payload_len {
558 return Err(Error::Internal("struct payload length mismatch".into()));
559 }
560
561 use arrow::ipc::reader::StreamReader;
563 use std::io::Cursor;
564
565 let cursor = Cursor::new(payload.as_slice());
566 let mut reader = StreamReader::try_new(cursor, None)
567 .map_err(|e| Error::Internal(format!("failed to create IPC reader: {}", e)))?;
568
569 let batch = reader
570 .next()
571 .ok_or_else(|| Error::Internal("no batch in IPC stream".into()))?
572 .map_err(|e| Error::Internal(format!("failed to read IPC batch: {}", e)))?;
573
574 if batch.num_columns() != 1 {
575 return Err(Error::Internal(
576 "expected single column in struct batch".into(),
577 ));
578 }
579
580 Ok(batch.column(0).clone())
581 }
582
583 _ => Err(Error::Internal("unknown layout".into())),
584 }
585}
586
587#[allow(clippy::no_effect)]
592const _: () = {
593 ["code changed"][!(PrimType::UInt64 as u8 == 1) as usize];
595 ["code changed"][!(PrimType::Int32 as u8 == 2) as usize];
596 ["code changed"][!(PrimType::UInt32 as u8 == 3) as usize];
597 ["code changed"][!(PrimType::Float32 as u8 == 4) as usize];
598 ["code changed"][!(PrimType::Binary as u8 == 5) as usize];
599 ["code changed"][!(PrimType::Int64 as u8 == 6) as usize];
600 ["code changed"][!(PrimType::Int16 as u8 == 7) as usize];
601 ["code changed"][!(PrimType::Int8 as u8 == 8) as usize];
602 ["code changed"][!(PrimType::UInt16 as u8 == 9) as usize];
603 ["code changed"][!(PrimType::UInt8 as u8 == 10) as usize];
604 ["code changed"][!(PrimType::Float64 as u8 == 11) as usize];
605 ["code changed"][!(PrimType::Utf8 as u8 == 12) as usize];
606 ["code changed"][!(PrimType::LargeBinary as u8 == 13) as usize];
607 ["code changed"][!(PrimType::LargeUtf8 as u8 == 14) as usize];
608 ["code changed"][!(PrimType::Boolean as u8 == 15) as usize];
609 ["code changed"][!(PrimType::Date32 as u8 == 16) as usize];
610 ["code changed"][!(PrimType::Date64 as u8 == 17) as usize];
611 ["code changed"][!(PrimType::Utf8View as u8 == 19) as usize];
612};