vortex_array/
serde.rs

1use std::fmt::{Debug, Formatter};
2use std::iter;
3use std::sync::Arc;
4
5use flatbuffers::{FlatBufferBuilder, Follow, WIPOffset, root};
6use itertools::Itertools;
7use vortex_buffer::{Alignment, ByteBuffer};
8use vortex_dtype::{DType, TryFromBytes};
9use vortex_error::{
10    VortexError, VortexExpect, VortexResult, vortex_bail, vortex_err, vortex_panic,
11};
12use vortex_flatbuffers::array::Compression;
13use vortex_flatbuffers::{
14    FlatBuffer, FlatBufferRoot, ReadFlatBuffer, WriteFlatBuffer, array as fba,
15};
16
17use crate::stats::StatsSet;
18use crate::{Array, ArrayContext, ArrayRef, ArrayVisitor, ArrayVisitorExt};
19
/// Options for serializing an array.
///
/// The derived [`Default`] yields `offset == 0` and `include_padding == false`.
#[derive(Default, Debug)]
pub struct SerializeOptions {
    /// The starting position within an external stream or file. This offset is used to compute
    /// appropriate padding to enable zero-copy reads.
    pub offset: usize,
    /// Whether to include sufficient zero-copy padding.
    pub include_padding: bool,
}
29
30impl dyn Array + '_ {
31    /// Serialize the array into a sequence of byte buffers that should be written contiguously.
32    /// This function returns a vec to avoid copying data buffers.
33    ///
34    /// Optionally, padding can be included to guarantee buffer alignment and ensure zero-copy
35    /// reads within the context of an external file or stream. In this case, the alignment of
36    /// the first byte buffer should be respected when writing the buffers to the stream or file.
37    ///
38    /// The format of this blob is a sequence of data buffers, possible with prefixed padding,
39    /// followed by a flatbuffer containing an [`fba::Array`] message, and ending with a
40    /// little-endian u32 describing the length of the flatbuffer message.
41    pub fn serialize(
42        &self,
43        ctx: &ArrayContext,
44        options: &SerializeOptions,
45    ) -> VortexResult<Vec<ByteBuffer>> {
46        // Collect all array buffers
47        let mut array_buffers = vec![];
48        for a in self.depth_first_traversal() {
49            for buffer in a.buffers() {
50                array_buffers.push(buffer);
51            }
52        }
53
54        // Allocate result buffers, including a possible padding buffer for each.
55        let mut buffers = vec![];
56        let mut fb_buffers = Vec::with_capacity(buffers.capacity());
57
58        // If we're including padding, we need to find the maximum required buffer alignment.
59        let max_alignment = array_buffers
60            .iter()
61            .map(|buf| buf.alignment())
62            .chain(iter::once(FlatBuffer::alignment()))
63            .max()
64            .unwrap_or_else(FlatBuffer::alignment);
65
66        // Create a shared buffer of zeros we can use for padding
67        let zeros = ByteBuffer::zeroed(*max_alignment);
68
69        // We push an empty buffer with the maximum alignment, so then subsequent buffers
70        // will be aligned. For subsequent buffers, we always push a 1-byte alignment.
71        buffers.push(ByteBuffer::zeroed_aligned(0, max_alignment));
72
73        // Keep track of where we are in the "file" to calculate padding.
74        let mut pos = options.offset;
75
76        // Push all the array buffers with padding as necessary.
77        for buffer in array_buffers {
78            let padding = if options.include_padding {
79                let padding = pos.next_multiple_of(*buffer.alignment()) - pos;
80                if padding > 0 {
81                    pos += padding;
82                    buffers.push(zeros.slice(0..padding));
83                }
84                padding
85            } else {
86                0
87            };
88
89            fb_buffers.push(fba::Buffer::new(
90                u16::try_from(padding).vortex_expect("padding fits into u16"),
91                buffer.alignment().exponent(),
92                Compression::None,
93                u32::try_from(buffer.len())
94                    .map_err(|_| vortex_err!("All buffers must fit into u32 for serialization"))?,
95            ));
96
97            pos += buffer.len();
98            buffers.push(buffer.aligned(Alignment::none()));
99        }
100
101        // Set up the flatbuffer builder
102        let mut fbb = FlatBufferBuilder::new();
103        let root = ArrayNodeFlatBuffer::try_new(ctx, self)?;
104        let fb_root = root.write_flatbuffer(&mut fbb);
105        let fb_buffers = fbb.create_vector(&fb_buffers);
106        let fb_array = fba::Array::create(
107            &mut fbb,
108            &fba::ArrayArgs {
109                root: Some(fb_root),
110                buffers: Some(fb_buffers),
111            },
112        );
113        fbb.finish_minimal(fb_array);
114        let (fb_vec, fb_start) = fbb.collapse();
115        let fb_end = fb_vec.len();
116        let fb_buffer = ByteBuffer::from(fb_vec).slice(fb_start..fb_end);
117        let fb_length = fb_buffer.len();
118
119        if options.include_padding {
120            let padding = pos.next_multiple_of(*FlatBuffer::alignment()) - pos;
121            if padding > 0 {
122                buffers.push(zeros.slice(0..padding));
123            }
124        }
125        buffers.push(fb_buffer);
126
127        // Finally, we write down the u32 length for the flatbuffer.
128        buffers.push(ByteBuffer::from(
129            u32::try_from(fb_length)
130                .map_err(|_| vortex_err!("Array metadata flatbuffer must fit into u32 for serialization. Array encoding tree is too large."))?
131                .to_le_bytes()
132                .to_vec(),
133        ));
134
135        Ok(buffers)
136    }
137}
138
/// A utility struct for creating an [`fba::ArrayNode`] flatbuffer.
pub struct ArrayNodeFlatBuffer<'a> {
    // Encoding registry used to resolve each array's encoding to a numeric index.
    ctx: &'a ArrayContext,
    // Root array of the (sub)tree this node will serialize.
    array: &'a dyn Array,
    // Index of this node's first buffer within the overall serialized buffer sequence.
    buffer_idx: u16,
}
145
146impl<'a> ArrayNodeFlatBuffer<'a> {
147    pub fn try_new(ctx: &'a ArrayContext, array: &'a dyn Array) -> VortexResult<Self> {
148        // Depth-first traversal of the array to ensure it supports serialization.
149        for child in array.depth_first_traversal() {
150            if child.metadata()?.is_none() {
151                vortex_bail!(
152                    "Array {} does not support serialization",
153                    child.encoding_id()
154                );
155            }
156        }
157        Ok(Self {
158            ctx,
159            array,
160            buffer_idx: 0,
161        })
162    }
163}
164
165impl FlatBufferRoot for ArrayNodeFlatBuffer<'_> {}
166
impl WriteFlatBuffer for ArrayNodeFlatBuffer<'_> {
    type Target<'t> = fba::ArrayNode<'t>;

    /// Recursively writes this node and its children into the flatbuffer, assigning every
    /// array's buffers consecutive u16 indices in depth-first order (self's buffers first,
    /// then each child subtree's).
    fn write_flatbuffer<'fb>(
        &self,
        fbb: &mut FlatBufferBuilder<'fb>,
    ) -> WIPOffset<Self::Target<'fb>> {
        let encoding = self.ctx.encoding_idx(&self.array.encoding());
        let metadata = self
            .array
            .metadata()
            // TODO(ngates): add try_write_flatbuffer
            .vortex_expect("Failed to serialize metadata")
            // try_new already checked that every node in the tree returns Some metadata.
            .vortex_expect("Validated that all arrays support serialization");
        let metadata = Some(fbb.create_vector(metadata.as_slice()));

        // Assign buffer indices for all child arrays.
        let nbuffers = u16::try_from(self.array.nbuffers())
            .vortex_expect("Array can have at most u16::MAX buffers");
        // Children's buffers start immediately after this node's own buffers.
        let child_buffer_idx = self.buffer_idx + nbuffers;

        let children = self
            .array
            .children()
            .iter()
            .scan(child_buffer_idx, |buffer_idx, child| {
                // Update the number of buffers required.
                let msg = ArrayNodeFlatBuffer {
                    ctx: self.ctx,
                    array: child,
                    buffer_idx: *buffer_idx,
                }
                .write_flatbuffer(fbb);
                // Advance past every buffer in the child's subtree, checking u16 overflow.
                *buffer_idx = u16::try_from(child.nbuffers_recursive())
                    .ok()
                    .and_then(|nbuffers| nbuffers.checked_add(*buffer_idx))
                    .vortex_expect("Too many buffers (u16) for Array");
                Some(msg)
            })
            .collect_vec();
        let children = Some(fbb.create_vector(&children));

        // This node's own buffer indices: [buffer_idx, buffer_idx + nbuffers).
        let buffers = Some(fbb.create_vector_from_iter((0..nbuffers).map(|i| i + self.buffer_idx)));
        let stats = Some(self.array.statistics().to_owned().write_flatbuffer(fbb));

        fba::ArrayNode::create(
            fbb,
            &fba::ArrayNodeArgs {
                encoding,
                metadata,
                children,
                buffers,
                stats,
            },
        )
    }
}
224
/// To minimize the serialized form, arrays do not persist their own dtype and length. Instead,
/// parent arrays pass this information down during deserialization. This trait abstracts
/// over either a serialized [`crate::serde::ArrayParts`] or the
/// in-memory [`crate::data::ArrayData`].
pub trait ArrayChildren {
    /// Returns the nth child of the array with the given dtype and length.
    ///
    /// # Errors
    ///
    /// Returns an error if the child cannot be produced (e.g. fails to decode).
    fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef>;

    /// The number of children.
    fn len(&self) -> usize;

    /// Returns true if there are no children.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
241
/// [`ArrayParts`] represents a parsed but not-yet-decoded deserialized [`Array`].
/// It contains all the information from the serialized form, without anything extra. i.e.
/// it is missing a [`DType`] and `len`, and the `encoding_id` is not yet resolved to a concrete
/// vtable.
///
/// An [`ArrayParts`] can be fully decoded into an [`ArrayRef`] using the `decode` function.
#[derive(Clone)]
pub struct ArrayParts {
    // Typed as fb::ArrayNode
    flatbuffer: FlatBuffer,
    // The location of the current fb::ArrayNode
    flatbuffer_loc: usize,
    // All data buffers of the serialized tree, shared so child ArrayParts clone cheaply.
    buffers: Arc<[ByteBuffer]>,
}
256
257impl Debug for ArrayParts {
258    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
259        f.debug_struct("ArrayParts")
260            .field("encoding_id", &self.encoding_id())
261            .field("children", &(0..self.nchildren()).map(|i| self.child(i)))
262            .field(
263                "buffers",
264                &(0..self.nbuffers()).map(|i| self.buffer(i).ok()),
265            )
266            .field("metadata", &self.metadata())
267            .finish()
268    }
269}
270
impl ArrayParts {
    /// Decode an [`ArrayParts`] into an [`ArrayRef`].
    ///
    /// The caller supplies `dtype` and `len` since neither is persisted in the serialized
    /// form. The built array is validated against the expected length, dtype, and encoding,
    /// and any serialized statistics are restored onto it.
    pub fn decode(&self, ctx: &ArrayContext, dtype: &DType, len: usize) -> VortexResult<ArrayRef> {
        let encoding_id = self.flatbuffer().encoding();
        let vtable = ctx
            .lookup_encoding(encoding_id)
            .ok_or_else(|| vortex_err!("Unknown encoding: {}", encoding_id))?;

        // Resolve this node's buffers eagerly; an invalid buffer index fails the decode.
        let buffers: Vec<_> = (0..self.nbuffers())
            .map(|idx| self.buffer(idx))
            .try_collect()?;

        // Children are decoded on demand by the vtable through the ArrayChildren trait.
        let children = ArrayPartsChildren { parts: self, ctx };

        let decoded = vtable.build(dtype, len, self.metadata(), &buffers, &children)?;

        // Sanity-check the vtable's build output; a mismatch here is a bug in the encoding.
        assert_eq!(
            decoded.len(),
            len,
            "Array decoded from {} has incorrect length {}, expected {}",
            vtable.id(),
            decoded.len(),
            len
        );
        assert_eq!(
            decoded.dtype(),
            dtype,
            "Array decoded from {} has incorrect dtype {}, expected {}",
            vtable.id(),
            decoded.dtype(),
            dtype,
        );
        assert_eq!(
            decoded.encoding_id(),
            vtable.id(),
            "Array decoded from {} has incorrect encoding {}",
            vtable.id(),
            decoded.encoding_id(),
        );

        // Populate statistics from the serialized array.
        if let Some(stats) = self.flatbuffer().stats() {
            let decoded_statistics = decoded.statistics();
            StatsSet::read_flatbuffer(&stats)?
                .into_iter()
                .for_each(|(stat, val)| decoded_statistics.set(stat, val));
        }

        Ok(decoded)
    }

    /// Returns the array encoding.
    pub fn encoding_id(&self) -> u16 {
        self.flatbuffer().encoding()
    }

    /// Returns the array metadata bytes.
    ///
    /// Returns an empty slice if no metadata was serialized for this node.
    pub fn metadata(&self) -> &[u8] {
        self.flatbuffer()
            .metadata()
            .map(|metadata| metadata.bytes())
            .unwrap_or(&[])
    }

    /// Returns the number of children.
    pub fn nchildren(&self) -> usize {
        self.flatbuffer()
            .children()
            .map_or(0, |children| children.len())
    }

    /// Returns the nth child of the array.
    ///
    /// # Panics
    ///
    /// Panics if `idx` is out of bounds or the node has no children vector.
    pub fn child(&self, idx: usize) -> ArrayParts {
        let children = self
            .flatbuffer()
            .children()
            .vortex_expect("Expected array to have children");
        if idx >= children.len() {
            vortex_panic!(
                "Invalid child index {} for array with {} children",
                idx,
                children.len()
            );
        }
        self.with_root(children.get(idx))
    }

    /// Returns the number of buffers.
    pub fn nbuffers(&self) -> usize {
        self.flatbuffer()
            .buffers()
            .map_or(0, |buffers| buffers.len())
    }

    /// Returns the nth buffer of the current array.
    pub fn buffer(&self, idx: usize) -> VortexResult<ByteBuffer> {
        // Indirect through the node's buffer-index table into the shared buffer list.
        let buffer_idx = self
            .flatbuffer()
            .buffers()
            .ok_or_else(|| vortex_err!("Array has no buffers"))?
            .get(idx);
        self.buffers
            .get(buffer_idx as usize)
            .cloned()
            .ok_or_else(|| {
                vortex_err!(
                    "Invalid buffer index {} for array with {} buffers",
                    buffer_idx,
                    self.nbuffers()
                )
            })
    }

    /// Returns the root ArrayNode flatbuffer.
    fn flatbuffer(&self) -> fba::ArrayNode {
        // SAFETY: flatbuffer_loc is a table offset taken from either the verified root
        // (see `TryFrom<ByteBuffer>`, which uses the checked `root::<fba::Array>`) or one
        // of its child nodes — NOTE(review): assumes all construction paths go through a
        // verified buffer; confirm no other constructor exists outside this view.
        unsafe { fba::ArrayNode::follow(self.flatbuffer.as_ref(), self.flatbuffer_loc) }
    }

    /// Returns a new [`ArrayParts`] with the given node as the root
    // TODO(ngates): we may want a wrapper that avoids this clone.
    fn with_root(&self, root: fba::ArrayNode) -> Self {
        let mut this = self.clone();
        this.flatbuffer_loc = root._tab.loc();
        this
    }
}
397
/// Adapter exposing an [`ArrayParts`]'s serialized children through the [`ArrayChildren`]
/// trait, decoding each child on demand.
struct ArrayPartsChildren<'a> {
    parts: &'a ArrayParts,
    ctx: &'a ArrayContext,
}
402
impl ArrayChildren for ArrayPartsChildren<'_> {
    /// Decodes the `index`-th serialized child using the caller-provided dtype and length.
    fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef> {
        self.parts.child(index).decode(self.ctx, dtype, len)
    }

    /// Number of serialized children on this node.
    fn len(&self) -> usize {
        self.parts.nchildren()
    }
}
412
impl TryFrom<ByteBuffer> for ArrayParts {
    type Error = VortexError;

    /// Parses the serialized layout produced by serialization: a sequence of (possibly
    /// padded) data buffers, followed by an [`fba::Array`] flatbuffer, ending with a
    /// little-endian u32 giving the flatbuffer's length.
    fn try_from(value: ByteBuffer) -> Result<Self, Self::Error> {
        // The final 4 bytes contain the length of the flatbuffer.
        if value.len() < 4 {
            vortex_bail!("ArrayParts buffer is too short");
        }

        // We align each buffer individually, so we remove alignment requirements on the buffer.
        let value = value.aligned(Alignment::none());

        let fb_length = u32::try_from_le_bytes(&value.as_slice()[value.len() - 4..])? as usize;
        if value.len() < 4 + fb_length {
            vortex_bail!("ArrayParts buffer is too short for flatbuffer");
        }

        // The flatbuffer sits immediately before the 4-byte length trailer.
        let fb_offset = value.len() - 4 - fb_length;
        let fb_buffer = value.slice(fb_offset..fb_offset + fb_length);
        let fb_buffer = FlatBuffer::align_from(fb_buffer);

        // `root` verifies the flatbuffer before any of its offsets are trusted.
        let fb_array = root::<fba::Array>(fb_buffer.as_ref())?;
        let fb_root = fb_array.root().vortex_expect("Array must have a root node");

        // Walk the buffer descriptors, slicing each data buffer out of `value` in order;
        // `offset` tracks the running position from the start of the buffer.
        let mut offset = 0;
        let buffers: Arc<[ByteBuffer]> = fb_array
            .buffers()
            .unwrap_or_default()
            .iter()
            .map(|fb_buffer| {
                // Skip padding
                offset += fb_buffer.padding() as usize;

                let buffer_len = fb_buffer.length() as usize;

                // Extract a buffer and ensure it's aligned, copying if necessary
                let buffer = value
                    .slice(offset..(offset + buffer_len))
                    .aligned(Alignment::from_exponent(fb_buffer.alignment_exponent()));

                offset += buffer_len;
                buffer
            })
            .collect();

        Ok(ArrayParts {
            flatbuffer: fb_buffer.clone(),
            flatbuffer_loc: fb_root._tab.loc(),
            buffers,
        })
    }
}