llkv_column_map/
serialization.rs

1//! Zero-copy array persistence for fixed/var width Arrow arrays used by the
2//! store.
3//!
4//! ## Why a custom on-disk format instead of Arrow IPC?
5//!
6//! This module persists Arrow arrays in a minimal, mmap-friendly container so
7//! we can reconstruct `ArrayData` *zero-copy* from a single memory-mapped
8//! region. Using Arrow IPC (stream/file) directly would:
9//!   - Increase file size from extra framing, padding, and metadata.
10//!   - Require additional allocations and buffer copies during decode.
11//!   - Prevent us from keeping a single contiguous payload per array, which
12//!     hurts scan performance.
13//!
14//! Design goals of this format:
15//! - **Minimal headers**: fixed-size header plus raw buffers only, no framing
16//!   or schema objects.
17//! - **Predictable contiguous payloads**: each array’s bytes live together in
18//!   one region, ideal for mmap and SIMD access.
19//! - **True zero-copy rebuild**: deserialization produces `ArrayData` that
20//!   references the original mmap directly, avoiding memcpy.
21//! - **Simpler invariants**: deliberately omits certain features (e.g., null
22//!   bitmaps) to keep the format compact and reconstruction trivial.
23//! - **Stable codes**: layout and type tags are explicitly pinned with
24//!   compile-time checks to avoid silent corruption.
25//!
26//! Net effect: smaller files, faster scans, and preserved zero-copy semantics
27//! tailored to this storage engine’s access pattern, at the cost of leaving
28//! out some of the generality of Arrow IPC.
29
30use std::convert::TryFrom;
31use std::sync::Arc;
32
33use arrow::array::{Array, ArrayData, ArrayRef, FixedSizeListArray, Float32Array, make_array};
34use arrow::buffer::Buffer;
35use arrow::datatypes::{DataType, Field, Schema};
36use num_enum::{IntoPrimitive, TryFromPrimitive};
37use simd_r_drive_entry_handle::EntryHandle;
38
39use llkv_result::{Error, Result};
40
/// Four-byte signature written at the start of every serialized array blob
/// and checked by `deserialize_array` before any other header field is read.
const MAGIC: [u8; 4] = *b"ARR0";
42
/// On-disk layout selector for serialized Arrow arrays.
///
/// Every blob starts with a fixed 24-byte header followed by the payload:
///   bytes 0..=3  : MAGIC = b"ARR0"
///   byte  4      : layout code (see `Layout`)
///   byte  5      : type code (`PrimType`; written as 0 by the `FslFloat32`
///                  and `Struct` layouts)
///   bytes 6..=7  : reserved (written as 0), except for `Primitive` blobs
///                  holding `Decimal128`, where byte 6 is the precision and
///                  byte 7 is the scale (an `i8` stored as its `u8` bit
///                  pattern)
///   bytes 8..=15 : len (u64) = logical array length (number of elements)
///   bytes 16..=19: extra_a (u32) = layout-specific
///   bytes 20..=23: extra_b (u32) = layout-specific
///   bytes 24..   : payload (layout-specific), no null bitmaps supported yet
///
/// Integer fields are written with the `write_u64_le` / `write_u32_le`
/// helpers and read back with `read_u64_le` / `read_u32_le`.
///
/// Variants:
///
/// - `Primitive`
///
///   - Single-buffer arrays: every type accepted by `prim_from_datatype`
///     that is not routed to `Varlen` (integers, floats, Boolean, Date32,
///     Date64, Decimal128).
///
///   - Header:
///     - len = element count
///     - extra_a = values_len (u32), byte length of the values buffer
///     - extra_b = 0
///
///   - Payload:
///     - [values buffer bytes]
///
///   - Notes:
///     - `type_code` (header byte 5) is a `PrimType` that chooses the Arrow
///       `DataType`.
///     - Nulls are not supported yet (null_count must be 0).
///
/// - `FslFloat32`
///
///   - Specialized fast-path for `FixedSizeList<Float32>`. Encodes a
///     vector-like column (e.g., embeddings) as one contiguous `Float32`
///     child buffer, without per-list headers.
///
///   - Header:
///     - len = number of lists (rows)
///     - extra_a = list_size (u32), number of f32 values per list
///     - extra_b = child_values_len (u32), total bytes in the child f32
///       buffer
///
///   - Payload:
///     - [child Float32 values as a single buffer]
///       The child has `len * list_size` elements, each 4 bytes (f32).
///
///   - Constraints:
///     - The parent `FixedSizeList` has no nulls.
///     - The child array has `DataType::Float32` and no nulls.
///     - `type_code` (header byte 5) is unused here and written as 0.
///
///   - Rationale:
///     - Many workloads store dense float vectors (embeddings) as
///       `FixedSizeList<Float32>`. This variant avoids extra nesting
///       overhead and allows a direct slice into the contiguous f32 buffer.
///
/// - `Varlen`
///
///   - Variable-length arrays stored as offsets + values: `Binary`, `Utf8`,
///     `LargeBinary` and `LargeUtf8`. `Utf8View` arrays are cast to `Utf8`
///     before being written and cast back to `Utf8View` when read.
///
///   - Header:
///     - len = number of values
///     - extra_a = offsets_len (u32) in bytes
///     - extra_b = values_len (u32) in bytes
///
///   - Payload:
///     - [offsets buffer bytes][values buffer bytes]
///
///   - Notes:
///     - `type_code` is the `PrimType` of the logical array type
///       (`Utf8View` for arrays that were cast to `Utf8` for storage).
///     - Nulls are not supported yet (null_count must be 0).
///
/// - `Struct`
///
///   - Header:
///     - len = number of struct values
///     - extra_a = unused (0)
///     - extra_b = payload_len (u32) in bytes
///
///   - Payload:
///     - [Arrow IPC stream holding one record batch with a single column,
///       the struct array]
///
///   - Notes:
///     - Uses Arrow IPC format for struct serialization, so this layout is
///       decoded through the IPC reader rather than by slicing the payload.
///     - `type_code` (header byte 5) is unused here and written as 0.
///     - Nulls are not supported yet (null_count must be 0).
#[repr(u8)]
enum Layout {
    Primitive = 0,
    FslFloat32 = 1,
    Varlen = 2,
    Struct = 3,
}
136
/// Stable on-disk type codes stored in header byte 5. Do not reorder. Only
/// append new variants at the end with explicit numeric values.
///
/// Using `num_enum` gives zero-cost Into/TryFrom conversions and avoids
/// manual matches for u8 <-> enum mapping.
///
/// The codes cover both layouts that carry a type tag: the fixed-width types
/// written by `serialize_primitive` and the offset-based types written by
/// `serialize_varlen`.
///
/// NOTE: Decimal128 stores precision and scale inline as the next two bytes
/// after the type tag (header bytes 6 and 7).
#[repr(u8)]
#[derive(Copy, Clone, Debug, Eq, PartialEq, IntoPrimitive, TryFromPrimitive)]
enum PrimType {
    UInt64 = 1,
    Int32 = 2,
    UInt32 = 3,
    Float32 = 4,
    // Written with the `Varlen` layout.
    Binary = 5,
    Int64 = 6,
    Int16 = 7,
    Int8 = 8,
    UInt16 = 9,
    UInt8 = 10,
    Float64 = 11,
    // Written with the `Varlen` layout.
    Utf8 = 12,
    // Written with the `Varlen` layout.
    LargeBinary = 13,
    // Written with the `Varlen` layout.
    LargeUtf8 = 14,
    Boolean = 15,
    Date32 = 16,
    Date64 = 17,
    // Precision and scale travel in header bytes 6 and 7.
    Decimal128 = 18,
    // Written with the `Varlen` layout after a cast to `Utf8`.
    Utf8View = 19,
}
167
168use crate::codecs::{read_u32_le, read_u64_le, write_u32_le, write_u64_le};
169
170/// Map Arrow `DataType` to on-disk `PrimType`.
171/// For Decimal128, precision and scale are stored separately in the serialized format.
172#[inline]
173fn prim_from_datatype(dt: &DataType) -> Result<PrimType> {
174    use DataType::*;
175    let p = match dt {
176        UInt64 => PrimType::UInt64,
177        Int64 => PrimType::Int64,
178        Int32 => PrimType::Int32,
179        Int16 => PrimType::Int16,
180        Int8 => PrimType::Int8,
181        UInt32 => PrimType::UInt32,
182        UInt16 => PrimType::UInt16,
183        UInt8 => PrimType::UInt8,
184        Float32 => PrimType::Float32,
185        Float64 => PrimType::Float64,
186        Binary => PrimType::Binary,
187        Utf8 => PrimType::Utf8,
188        Utf8View => PrimType::Utf8View,
189        LargeBinary => PrimType::LargeBinary,
190        LargeUtf8 => PrimType::LargeUtf8,
191        Boolean => PrimType::Boolean,
192        Date32 => PrimType::Date32,
193        Date64 => PrimType::Date64,
194        Decimal128(_, _) => PrimType::Decimal128,
195        _ => return Err(Error::Internal("unsupported Arrow type".into())),
196    };
197    Ok(p)
198}
199
200/// Map on-disk `PrimType` to Arrow `DataType`.
201/// For Decimal128, precision and scale must be provided separately.
202#[inline]
203fn datatype_from_prim(p: PrimType, precision: u8, scale: u8) -> Result<DataType> {
204    use DataType::*;
205    let dt = match p {
206        PrimType::UInt64 => UInt64,
207        PrimType::Int64 => Int64,
208        PrimType::Int32 => Int32,
209        PrimType::Int16 => Int16,
210        PrimType::Int8 => Int8,
211        PrimType::UInt32 => UInt32,
212        PrimType::UInt16 => UInt16,
213        PrimType::UInt8 => UInt8,
214        PrimType::Float32 => Float32,
215        PrimType::Float64 => Float64,
216        PrimType::Binary => Binary,
217        PrimType::Utf8 => Utf8,
218        PrimType::Utf8View => Utf8View,
219        PrimType::LargeBinary => LargeBinary,
220        PrimType::LargeUtf8 => LargeUtf8,
221        PrimType::Boolean => Boolean,
222        PrimType::Date32 => Date32,
223        PrimType::Date64 => Date64,
224        PrimType::Decimal128 => Decimal128(precision, scale as i8),
225    };
226    Ok(dt)
227}
228
229/// Serialize array buffers with a minimal header (no nulls supported yet).
230pub fn serialize_array(arr: &dyn Array) -> Result<Vec<u8>> {
231    match arr.data_type() {
232        // Var-len path stays explicit to preserve layout.
233        &DataType::Binary => serialize_varlen(arr, PrimType::Binary),
234        &DataType::Utf8 => serialize_varlen(arr, PrimType::Utf8),
235        &DataType::LargeBinary => serialize_varlen(arr, PrimType::LargeBinary),
236        &DataType::LargeUtf8 => serialize_varlen(arr, PrimType::LargeUtf8),
237        &DataType::Utf8View => {
238            let casted = arrow::compute::cast(arr, &DataType::Utf8)
239                .map_err(|e| Error::Internal(format!("failed to cast Utf8View to Utf8: {}", e)))?;
240            serialize_varlen(&casted, PrimType::Utf8View)
241        }
242
243        // Special-case fixed-size list of f32 as before.
244        &DataType::FixedSizeList(ref child, list_size) => {
245            if child.data_type() != &DataType::Float32 {
246                return Err(Error::Internal(
247                    "Only FixedSizeList<Float32> supported".into(),
248                ));
249            }
250            serialize_fsl_float32(arr, list_size)
251        }
252
253        // Struct types use IPC serialization
254        DataType::Struct(_) => serialize_struct(arr),
255
256        // All remaining supported fixed-width primitives route here.
257        dt => {
258            let p = prim_from_datatype(dt)?;
259            serialize_primitive(arr, p)
260        }
261    }
262}
263
264fn serialize_primitive(arr: &dyn Array, code: PrimType) -> Result<Vec<u8>> {
265    if arr.null_count() != 0 {
266        return Err(Error::Internal(
267            "nulls not supported in zero-copy format (yet)".into(),
268        ));
269    }
270    let data = arr.to_data();
271    let len = data.len() as u64;
272    let values = data
273        .buffers()
274        .first()
275        .ok_or_else(|| Error::Internal("missing values buffer".into()))?;
276    let values_bytes = values.as_slice();
277    let values_len = u32::try_from(values_bytes.len())
278        .map_err(|_| Error::Internal("values too large".into()))?;
279
280    let mut out = Vec::with_capacity(24 + values_bytes.len());
281    out.extend_from_slice(&MAGIC);
282    out.push(Layout::Primitive as u8);
283    out.push(u8::from(code));
284
285    // For Decimal128, store precision and scale in the 2 padding bytes
286    // For other types, use zeros as before
287    match code {
288        PrimType::Decimal128 => {
289            if let DataType::Decimal128(precision, scale) = arr.data_type() {
290                out.push(*precision);
291                out.push(*scale as u8);
292            } else {
293                return Err(Error::Internal("expected Decimal128 data type".into()));
294            }
295        }
296        _ => {
297            // 2 bytes padding reserved (e.g., future versioning).
298            out.extend_from_slice(&[0u8; 2]);
299        }
300    }
301
302    write_u64_le(&mut out, len);
303    write_u32_le(&mut out, values_len);
304    write_u32_le(&mut out, 0);
305    out.extend_from_slice(values_bytes);
306    Ok(out)
307}
308
309fn serialize_varlen(arr: &dyn Array, code: PrimType) -> Result<Vec<u8>> {
310    if arr.null_count() != 0 {
311        return Err(Error::Internal(
312            "nulls not supported in zero-copy format (yet)".into(),
313        ));
314    }
315    let data = arr.to_data();
316    let len = data.len() as u64;
317
318    let offsets_buf = data
319        .buffers()
320        .first()
321        .ok_or_else(|| Error::Internal("missing offsets buffer".into()))?;
322    let values_buf = data
323        .buffers()
324        .get(1)
325        .ok_or_else(|| Error::Internal("missing values buffer for varlen".into()))?;
326
327    let offsets_bytes = offsets_buf.as_slice();
328    let values_bytes = values_buf.as_slice();
329
330    let offsets_len = u32::try_from(offsets_bytes.len())
331        .map_err(|_| Error::Internal("offsets buffer too large".into()))?;
332    let values_len = u32::try_from(values_bytes.len())
333        .map_err(|_| Error::Internal("values buffer too large".into()))?;
334
335    let mut out = Vec::with_capacity(24 + offsets_bytes.len() + values_bytes.len());
336    out.extend_from_slice(&MAGIC);
337    out.push(Layout::Varlen as u8);
338    out.push(u8::from(code));
339    // 2 bytes padding reserved (e.g., future versioning).
340    out.extend_from_slice(&[0u8; 2]);
341    write_u64_le(&mut out, len);
342    write_u32_le(&mut out, offsets_len);
343    write_u32_le(&mut out, values_len);
344    out.extend_from_slice(offsets_bytes);
345    out.extend_from_slice(values_bytes);
346    Ok(out)
347}
348
349fn serialize_fsl_float32(arr: &dyn Array, list_size: i32) -> Result<Vec<u8>> {
350    if arr.null_count() != 0 {
351        return Err(Error::Internal(
352            "nulls not supported in zero-copy format (yet)".into(),
353        ));
354    }
355    let fsl = arr
356        .as_any()
357        .downcast_ref::<FixedSizeListArray>()
358        .ok_or_else(|| Error::Internal("FSL downcast failed".into()))?;
359
360    let values = fsl.values();
361    if values.null_count() != 0 || values.data_type() != &DataType::Float32 {
362        return Err(Error::Internal("FSL child must be non-null Float32".into()));
363    }
364
365    let child = values.to_data();
366    let child_buf = child
367        .buffers()
368        .first()
369        .ok_or_else(|| Error::Internal("missing child values".into()))?;
370    let child_bytes = child_buf.as_slice();
371
372    let child_len =
373        u32::try_from(child_bytes.len()).map_err(|_| Error::Internal("child too large".into()))?;
374
375    let mut out = Vec::with_capacity(24 + child_bytes.len());
376    out.extend_from_slice(&MAGIC);
377    out.push(Layout::FslFloat32 as u8);
378    out.push(0); // no PrimType for FSL header slot
379    // 2 bytes padding reserved (e.g., future versioning).
380    out.extend_from_slice(&[0u8; 2]);
381    write_u64_le(&mut out, fsl.len() as u64);
382    write_u32_le(&mut out, u32::try_from(list_size).unwrap());
383    write_u32_le(&mut out, child_len);
384    out.extend_from_slice(child_bytes);
385    Ok(out)
386}
387
388fn serialize_struct(arr: &dyn Array) -> Result<Vec<u8>> {
389    if arr.null_count() != 0 {
390        return Err(Error::Internal(
391            "nulls not supported in zero-copy format (yet)".into(),
392        ));
393    }
394
395    // Use Arrow IPC format to serialize the struct array
396    use arrow::ipc::writer::StreamWriter;
397    use arrow::record_batch::RecordBatch;
398
399    // Create a RecordBatch with a single column containing the struct array
400    let schema = Arc::new(Schema::new(vec![Field::new(
401        "struct_col",
402        arr.data_type().clone(),
403        false,
404    )]));
405    let array_ref = make_array(arr.to_data());
406    let batch = RecordBatch::try_new(schema, vec![array_ref])
407        .map_err(|e| Error::Internal(format!("failed to create record batch: {}", e)))?;
408
409    // Serialize to IPC format
410    let mut ipc_bytes = Vec::new();
411    {
412        let mut writer = StreamWriter::try_new(&mut ipc_bytes, &batch.schema())
413            .map_err(|e| Error::Internal(format!("failed to create IPC writer: {}", e)))?;
414        writer
415            .write(&batch)
416            .map_err(|e| Error::Internal(format!("failed to write IPC: {}", e)))?;
417        writer
418            .finish()
419            .map_err(|e| Error::Internal(format!("failed to finish IPC: {}", e)))?;
420    }
421
422    let payload_len = u32::try_from(ipc_bytes.len())
423        .map_err(|_| Error::Internal("IPC payload too large".into()))?;
424
425    let mut out = Vec::with_capacity(24 + ipc_bytes.len());
426    out.extend_from_slice(&MAGIC);
427    out.push(Layout::Struct as u8);
428    out.push(0); // no PrimType for struct
429    out.extend_from_slice(&[0u8; 2]); // padding
430    write_u64_le(&mut out, arr.len() as u64);
431    write_u32_le(&mut out, 0); // extra_a unused
432    write_u32_le(&mut out, payload_len);
433    out.extend_from_slice(&ipc_bytes);
434    Ok(out)
435}
436
437/// Deserialize zero-copy from a pager blob.
438pub fn deserialize_array(blob: EntryHandle) -> Result<ArrayRef> {
439    let raw = blob.as_ref();
440    if raw.len() < 24 || raw[0..4] != MAGIC {
441        return Err(Error::Internal("bad array blob magic/size".into()));
442    }
443
444    let layout = raw[4];
445    let type_code = raw[5];
446    let precision = raw[6]; // Used for Decimal128
447    let scale = raw[7]; // Used for Decimal128
448
449    let mut o = 8usize;
450    let len = read_u64_le(raw, &mut o) as usize;
451    let extra_a = read_u32_le(raw, &mut o);
452    let extra_b = read_u32_le(raw, &mut o);
453
454    let whole: Buffer = blob.as_arrow_buffer();
455    let payload: Buffer = whole.slice_with_length(o, whole.len() - o);
456
457    match layout {
458        x if x == Layout::Primitive as u8 => {
459            let values_len = extra_a as usize;
460            if payload.len() != values_len {
461                return Err(Error::Internal("primitive payload length mismatch".into()));
462            }
463
464            let p = PrimType::try_from(type_code)
465                .map_err(|_| Error::Internal("unsupported primitive code".into()))?;
466            let data_type = datatype_from_prim(p, precision, scale)?;
467
468            // Decimal128 requires 16-byte alignment. If the buffer isn't aligned, copy it.
469            let buffer = if matches!(data_type, DataType::Decimal128(_, _)) {
470                let ptr = payload.as_ptr();
471                if !(ptr as usize).is_multiple_of(16) {
472                    // Buffer is not 16-byte aligned, need to copy to aligned buffer
473                    let mut aligned_vec = Vec::with_capacity(payload.len());
474                    aligned_vec.extend_from_slice(&payload);
475                    arrow::buffer::Buffer::from(aligned_vec)
476                } else {
477                    payload
478                }
479            } else {
480                payload
481            };
482
483            let data = ArrayData::builder(data_type)
484                .len(len)
485                .add_buffer(buffer)
486                .build()?;
487            Ok(make_array(data))
488        }
489
490        x if x == Layout::FslFloat32 as u8 => {
491            let list_size = extra_a as i32;
492            let child_values_len = extra_b as usize;
493            if payload.len() != child_values_len {
494                return Err(Error::Internal("fsl child length mismatch".into()));
495            }
496
497            let child_values = payload;
498            let child_len = len * list_size as usize;
499
500            let child_data = ArrayData::builder(DataType::Float32)
501                .len(child_len)
502                .add_buffer(child_values)
503                .build()?;
504            let child = Arc::new(Float32Array::from(child_data)) as ArrayRef;
505
506            let field = Arc::new(Field::new("item", DataType::Float32, false));
507            let arr_data = ArrayData::builder(DataType::FixedSizeList(field, list_size))
508                .len(len)
509                .add_child_data(child.to_data())
510                .build()?;
511            Ok(Arc::new(FixedSizeListArray::from(arr_data)))
512        }
513
514        x if x == Layout::Varlen as u8 => {
515            let offsets_len = extra_a as usize;
516            let values_len = extra_b as usize;
517            if payload.len() != offsets_len + values_len {
518                return Err(Error::Internal("varlen payload length mismatch".into()));
519            }
520
521            let offsets = payload.slice_with_length(0, offsets_len);
522            let values = payload.slice_with_length(offsets_len, values_len);
523
524            let p = PrimType::try_from(type_code)
525                .map_err(|_| Error::Internal("unsupported varlen code".into()))?;
526
527            // Special handling for Utf8View stored as Utf8
528            if p == PrimType::Utf8View {
529                let data_type = DataType::Utf8;
530                let data = ArrayData::builder(data_type)
531                    .len(len)
532                    .add_buffer(offsets)
533                    .add_buffer(values)
534                    .build()?;
535                let utf8_array = make_array(data);
536                // Cast to Utf8View
537                let view_array =
538                    arrow::compute::cast(&utf8_array, &DataType::Utf8View).map_err(|e| {
539                        Error::Internal(format!("failed to cast Utf8 to Utf8View: {}", e))
540                    })?;
541                return Ok(view_array);
542            }
543
544            // Varlen types (Binary, Utf8, etc.) don't use precision/scale
545            let data_type = datatype_from_prim(p, 0, 0)?;
546
547            let data = ArrayData::builder(data_type)
548                .len(len)
549                .add_buffer(offsets)
550                .add_buffer(values)
551                .build()?;
552            Ok(make_array(data))
553        }
554
555        x if x == Layout::Struct as u8 => {
556            let payload_len = extra_b as usize;
557            if payload.len() != payload_len {
558                return Err(Error::Internal("struct payload length mismatch".into()));
559            }
560
561            // Deserialize from IPC format
562            use arrow::ipc::reader::StreamReader;
563            use std::io::Cursor;
564
565            let cursor = Cursor::new(payload.as_slice());
566            let mut reader = StreamReader::try_new(cursor, None)
567                .map_err(|e| Error::Internal(format!("failed to create IPC reader: {}", e)))?;
568
569            let batch = reader
570                .next()
571                .ok_or_else(|| Error::Internal("no batch in IPC stream".into()))?
572                .map_err(|e| Error::Internal(format!("failed to read IPC batch: {}", e)))?;
573
574            if batch.num_columns() != 1 {
575                return Err(Error::Internal(
576                    "expected single column in struct batch".into(),
577                ));
578            }
579
580            Ok(batch.column(0).clone())
581        }
582
583        _ => Err(Error::Internal("unknown layout".into())),
584    }
585}
586
587/* ---- Compile-time pinning of on-disk codes -------------------------------
588   Changing any discriminant silently would corrupt persistence. These const
589   checks make such edits fail to compile immediately.
590*/
591#[allow(clippy::no_effect)]
592const _: () = {
593    // true -> 1, false -> 0; index out of bounds if false.
594    ["code changed"][!(PrimType::UInt64 as u8 == 1) as usize];
595    ["code changed"][!(PrimType::Int32 as u8 == 2) as usize];
596    ["code changed"][!(PrimType::UInt32 as u8 == 3) as usize];
597    ["code changed"][!(PrimType::Float32 as u8 == 4) as usize];
598    ["code changed"][!(PrimType::Binary as u8 == 5) as usize];
599    ["code changed"][!(PrimType::Int64 as u8 == 6) as usize];
600    ["code changed"][!(PrimType::Int16 as u8 == 7) as usize];
601    ["code changed"][!(PrimType::Int8 as u8 == 8) as usize];
602    ["code changed"][!(PrimType::UInt16 as u8 == 9) as usize];
603    ["code changed"][!(PrimType::UInt8 as u8 == 10) as usize];
604    ["code changed"][!(PrimType::Float64 as u8 == 11) as usize];
605    ["code changed"][!(PrimType::Utf8 as u8 == 12) as usize];
606    ["code changed"][!(PrimType::LargeBinary as u8 == 13) as usize];
607    ["code changed"][!(PrimType::LargeUtf8 as u8 == 14) as usize];
608    ["code changed"][!(PrimType::Boolean as u8 == 15) as usize];
609    ["code changed"][!(PrimType::Date32 as u8 == 16) as usize];
610    ["code changed"][!(PrimType::Date64 as u8 == 17) as usize];
611    ["code changed"][!(PrimType::Utf8View as u8 == 19) as usize];
612};