vortex_array/
serde.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::fmt::{Debug, Formatter};
5use std::iter;
6use std::sync::Arc;
7
8use flatbuffers::{FlatBufferBuilder, Follow, WIPOffset, root};
9use itertools::Itertools;
10use vortex_buffer::{Alignment, ByteBuffer};
11use vortex_dtype::{DType, TryFromBytes};
12use vortex_error::{
13    VortexError, VortexExpect, VortexResult, vortex_bail, vortex_err, vortex_panic,
14};
15use vortex_flatbuffers::array::Compression;
16use vortex_flatbuffers::{
17    FlatBuffer, FlatBufferRoot, ReadFlatBuffer, WriteFlatBuffer, array as fba,
18};
19
20use crate::stats::StatsSet;
21use crate::{Array, ArrayContext, ArrayRef, ArrayVisitor, ArrayVisitorExt};
22
/// Options for serializing an array.
#[derive(Default, Debug)]
pub struct SerializeOptions {
    /// The starting position within an external stream or file. This offset is used to compute
    /// appropriate padding to enable zero-copy reads.
    ///
    /// Only consulted when `include_padding` is true; otherwise no padding is emitted.
    pub offset: usize,
    /// Whether to include sufficient zero-copy padding.
    pub include_padding: bool,
}
32
33impl dyn Array + '_ {
34    /// Serialize the array into a sequence of byte buffers that should be written contiguously.
35    /// This function returns a vec to avoid copying data buffers.
36    ///
37    /// Optionally, padding can be included to guarantee buffer alignment and ensure zero-copy
38    /// reads within the context of an external file or stream. In this case, the alignment of
39    /// the first byte buffer should be respected when writing the buffers to the stream or file.
40    ///
41    /// The format of this blob is a sequence of data buffers, possible with prefixed padding,
42    /// followed by a flatbuffer containing an [`fba::Array`] message, and ending with a
43    /// little-endian u32 describing the length of the flatbuffer message.
44    pub fn serialize(
45        &self,
46        ctx: &ArrayContext,
47        options: &SerializeOptions,
48    ) -> VortexResult<Vec<ByteBuffer>> {
49        // Collect all array buffers
50        let array_buffers = self
51            .depth_first_traversal()
52            .flat_map(|f| f.buffers())
53            .collect::<Vec<_>>();
54
55        // Allocate result buffers, including a possible padding buffer for each.
56        let mut buffers = vec![];
57        let mut fb_buffers = Vec::with_capacity(buffers.capacity());
58
59        // If we're including padding, we need to find the maximum required buffer alignment.
60        let max_alignment = array_buffers
61            .iter()
62            .map(|buf| buf.alignment())
63            .chain(iter::once(FlatBuffer::alignment()))
64            .max()
65            .unwrap_or_else(FlatBuffer::alignment);
66
67        // Create a shared buffer of zeros we can use for padding
68        let zeros = ByteBuffer::zeroed(*max_alignment);
69
70        // We push an empty buffer with the maximum alignment, so then subsequent buffers
71        // will be aligned. For subsequent buffers, we always push a 1-byte alignment.
72        buffers.push(ByteBuffer::zeroed_aligned(0, max_alignment));
73
74        // Keep track of where we are in the "file" to calculate padding.
75        let mut pos = options.offset;
76
77        // Push all the array buffers with padding as necessary.
78        for buffer in array_buffers {
79            let padding = if options.include_padding {
80                let padding = pos.next_multiple_of(*buffer.alignment()) - pos;
81                if padding > 0 {
82                    pos += padding;
83                    buffers.push(zeros.slice(0..padding));
84                }
85                padding
86            } else {
87                0
88            };
89
90            fb_buffers.push(fba::Buffer::new(
91                u16::try_from(padding).vortex_expect("padding fits into u16"),
92                buffer.alignment().exponent(),
93                Compression::None,
94                u32::try_from(buffer.len())
95                    .map_err(|_| vortex_err!("All buffers must fit into u32 for serialization"))?,
96            ));
97
98            pos += buffer.len();
99            buffers.push(buffer.aligned(Alignment::none()));
100        }
101
102        // Set up the flatbuffer builder
103        let mut fbb = FlatBufferBuilder::new();
104        let root = ArrayNodeFlatBuffer::try_new(ctx, self)?;
105        let fb_root = root.write_flatbuffer(&mut fbb);
106        let fb_buffers = fbb.create_vector(&fb_buffers);
107        let fb_array = fba::Array::create(
108            &mut fbb,
109            &fba::ArrayArgs {
110                root: Some(fb_root),
111                buffers: Some(fb_buffers),
112            },
113        );
114        fbb.finish_minimal(fb_array);
115        let (fb_vec, fb_start) = fbb.collapse();
116        let fb_end = fb_vec.len();
117        let fb_buffer = ByteBuffer::from(fb_vec).slice(fb_start..fb_end);
118        let fb_length = fb_buffer.len();
119
120        if options.include_padding {
121            let padding = pos.next_multiple_of(*FlatBuffer::alignment()) - pos;
122            if padding > 0 {
123                buffers.push(zeros.slice(0..padding));
124            }
125        }
126        buffers.push(fb_buffer);
127
128        // Finally, we write down the u32 length for the flatbuffer.
129        buffers.push(ByteBuffer::from(
130            u32::try_from(fb_length)
131                .map_err(|_| vortex_err!("Array metadata flatbuffer must fit into u32 for serialization. Array encoding tree is too large."))?
132                .to_le_bytes()
133                .to_vec(),
134        ));
135
136        Ok(buffers)
137    }
138}
139
/// A utility struct for creating an [`fba::ArrayNode`] flatbuffer.
pub struct ArrayNodeFlatBuffer<'a> {
    // Context used to resolve each array's encoding to its serialized index.
    ctx: &'a ArrayContext,
    // The array at the root of the subtree this node serializes.
    array: &'a dyn Array,
    // Index of this node's first buffer within the flattened depth-first buffer list.
    buffer_idx: u16,
}
146
147impl<'a> ArrayNodeFlatBuffer<'a> {
148    pub fn try_new(ctx: &'a ArrayContext, array: &'a dyn Array) -> VortexResult<Self> {
149        // Depth-first traversal of the array to ensure it supports serialization.
150        for child in array.depth_first_traversal() {
151            if child.metadata()?.is_none() {
152                vortex_bail!(
153                    "Array {} does not support serialization",
154                    child.encoding_id()
155                );
156            }
157        }
158        let n_buffers_recursive = array.nbuffers_recursive();
159        if n_buffers_recursive > u16::MAX as usize {
160            vortex_bail!(
161                "Array and all descendent arrays can have at most u16::MAX buffers: {}",
162                n_buffers_recursive
163            );
164        };
165        Ok(Self {
166            ctx,
167            array,
168            buffer_idx: 0,
169        })
170    }
171}
172
173impl FlatBufferRoot for ArrayNodeFlatBuffer<'_> {}
174
impl WriteFlatBuffer for ArrayNodeFlatBuffer<'_> {
    type Target<'t> = fba::ArrayNode<'t>;

    /// Writes this node, and recursively its children, into the flatbuffer builder.
    ///
    /// Buffer indices are assigned pre-order: this node claims indices
    /// `[buffer_idx, buffer_idx + nbuffers)`, and each child subtree claims the next
    /// `nbuffers_recursive()` indices following its preceding siblings.
    fn write_flatbuffer<'fb>(
        &self,
        fbb: &mut FlatBufferBuilder<'fb>,
    ) -> WIPOffset<Self::Target<'fb>> {
        // Resolve the encoding to its index within the serialization context.
        let encoding = self.ctx.encoding_idx(&self.array.encoding());
        // Metadata presence was validated in `try_new`, hence the expects here.
        let metadata = self
            .array
            .metadata()
            // TODO(ngates): add try_write_flatbuffer
            .vortex_expect("Failed to serialize metadata")
            .vortex_expect("Validated that all arrays support serialization");
        let metadata = Some(fbb.create_vector(metadata.as_slice()));

        // Assign buffer indices for all child arrays.
        let nbuffers = u16::try_from(self.array.nbuffers())
            .vortex_expect("Array can have at most u16::MAX buffers");
        let mut child_buffer_idx = self.buffer_idx + nbuffers;

        // NOTE: child vectors must be created before the parent `ArrayNode` table; flatbuffer
        // builders require nested objects to be finished before their parents.
        let children = &self
            .array
            .children()
            .iter()
            .map(|child| {
                // Update the number of buffers required.
                let msg = ArrayNodeFlatBuffer {
                    ctx: self.ctx,
                    array: child,
                    buffer_idx: child_buffer_idx,
                }
                .write_flatbuffer(fbb);
                // Advance past every buffer in the child's subtree, checking for u16 overflow.
                child_buffer_idx = u16::try_from(child.nbuffers_recursive())
                    .ok()
                    .and_then(|nbuffers| nbuffers.checked_add(child_buffer_idx))
                    .vortex_expect("Too many buffers (u16) for Array");
                msg
            })
            .collect::<Vec<_>>();
        let children = Some(fbb.create_vector(children));

        // This node's own buffers occupy a contiguous run starting at `buffer_idx`.
        let buffers = Some(fbb.create_vector_from_iter((0..nbuffers).map(|i| i + self.buffer_idx)));
        let stats = Some(self.array.statistics().write_flatbuffer(fbb));

        fba::ArrayNode::create(
            fbb,
            &fba::ArrayNodeArgs {
                encoding,
                metadata,
                children,
                buffers,
                stats,
            },
        )
    }
}
232
/// To minimize the serialized form, arrays do not persist their own dtype and length. Instead,
/// parent arrays pass this information down during deserialization.
pub trait ArrayChildren {
    /// Returns the nth child of the array with the given dtype and length.
    ///
    /// # Errors
    ///
    /// Returns an error if the child at `index` cannot be produced.
    fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef>;

    /// The number of children.
    fn len(&self) -> usize;

    /// Returns true if there are no children.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
247
/// [`ArrayParts`] represents a parsed but not-yet-decoded deserialized [`Array`].
/// It contains all the information from the serialized form, without anything extra. i.e.
/// it is missing a [`DType`] and `len`, and the `encoding_id` is not yet resolved to a concrete
/// vtable.
///
/// An [`ArrayParts`] can be fully decoded into an [`ArrayRef`] using the `decode` function.
#[derive(Clone)]
pub struct ArrayParts {
    // Typed as fb::ArrayNode
    flatbuffer: FlatBuffer,
    // The location of the current fb::ArrayNode within `flatbuffer` (root node or a child
    // selected via `with_root`).
    flatbuffer_loc: usize,
    // All data buffers of the serialized tree; each node indexes into this shared slice.
    buffers: Arc<[ByteBuffer]>,
}
262
263impl Debug for ArrayParts {
264    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
265        f.debug_struct("ArrayParts")
266            .field("encoding_id", &self.encoding_id())
267            .field("children", &(0..self.nchildren()).map(|i| self.child(i)))
268            .field(
269                "buffers",
270                &(0..self.nbuffers()).map(|i| self.buffer(i).ok()),
271            )
272            .field("metadata", &self.metadata())
273            .finish()
274    }
275}
276
impl ArrayParts {
    /// Decode an [`ArrayParts`] into an [`ArrayRef`].
    ///
    /// The caller supplies `dtype` and `len` because they are intentionally not part of the
    /// serialized form; parents pass them down during deserialization (see [`ArrayChildren`]).
    ///
    /// # Panics
    ///
    /// Panics (via `assert_eq!`) if the built array's length, dtype, or encoding disagree with
    /// what was requested — this indicates a bug in the encoding's `build` implementation.
    pub fn decode(&self, ctx: &ArrayContext, dtype: &DType, len: usize) -> VortexResult<ArrayRef> {
        let encoding_id = self.flatbuffer().encoding();
        let vtable = ctx
            .lookup_encoding(encoding_id)
            .ok_or_else(|| vortex_err!("Unknown encoding: {}", encoding_id))?;

        // Materialize this node's buffers, failing fast on any invalid buffer index.
        let buffers: Vec<_> = (0..self.nbuffers())
            .map(|idx| self.buffer(idx))
            .try_collect()?;

        // Children are decoded lazily: the vtable requests each child with its dtype and len.
        let children = ArrayPartsChildren { parts: self, ctx };

        let decoded = vtable.build(dtype, len, self.metadata(), &buffers, &children)?;

        assert_eq!(
            decoded.len(),
            len,
            "Array decoded from {} has incorrect length {}, expected {}",
            vtable.id(),
            decoded.len(),
            len
        );
        assert_eq!(
            decoded.dtype(),
            dtype,
            "Array decoded from {} has incorrect dtype {}, expected {}",
            vtable.id(),
            decoded.dtype(),
            dtype,
        );
        assert_eq!(
            decoded.encoding_id(),
            vtable.id(),
            "Array decoded from {} has incorrect encoding {}",
            vtable.id(),
            decoded.encoding_id(),
        );

        // Populate statistics from the serialized array.
        if let Some(stats) = self.flatbuffer().stats() {
            let decoded_statistics = decoded.statistics();
            StatsSet::read_flatbuffer(&stats)?
                .into_iter()
                .for_each(|(stat, val)| decoded_statistics.set(stat, val));
        }

        Ok(decoded)
    }

    /// Returns the array encoding.
    pub fn encoding_id(&self) -> u16 {
        self.flatbuffer().encoding()
    }

    /// Returns the array metadata bytes.
    ///
    /// Returns an empty slice when the node carries no metadata vector.
    pub fn metadata(&self) -> &[u8] {
        self.flatbuffer()
            .metadata()
            .map(|metadata| metadata.bytes())
            .unwrap_or(&[])
    }

    /// Returns the number of children.
    pub fn nchildren(&self) -> usize {
        self.flatbuffer()
            .children()
            .map_or(0, |children| children.len())
    }

    /// Returns the nth child of the array.
    ///
    /// # Panics
    ///
    /// Panics if the node has no children vector or `idx` is out of range.
    pub fn child(&self, idx: usize) -> ArrayParts {
        let children = self
            .flatbuffer()
            .children()
            .vortex_expect("Expected array to have children");
        if idx >= children.len() {
            vortex_panic!(
                "Invalid child index {} for array with {} children",
                idx,
                children.len()
            );
        }
        self.with_root(children.get(idx))
    }

    /// Returns the number of buffers.
    pub fn nbuffers(&self) -> usize {
        self.flatbuffer()
            .buffers()
            .map_or(0, |buffers| buffers.len())
    }

    /// Returns the nth buffer of the current array.
    ///
    /// # Errors
    ///
    /// Returns an error if the node has no buffers vector, or if the stored buffer index does
    /// not resolve to one of the deserialized buffers.
    pub fn buffer(&self, idx: usize) -> VortexResult<ByteBuffer> {
        let buffer_idx = self
            .flatbuffer()
            .buffers()
            .ok_or_else(|| vortex_err!("Array has no buffers"))?
            .get(idx);
        self.buffers
            .get(buffer_idx as usize)
            .cloned()
            .ok_or_else(|| {
                vortex_err!(
                    "Invalid buffer index {} for array with {} buffers",
                    buffer_idx,
                    self.nbuffers()
                )
            })
    }

    /// Returns the root ArrayNode flatbuffer.
    fn flatbuffer(&self) -> fba::ArrayNode<'_> {
        // SAFETY: `flatbuffer_loc` is either the root node's location taken from a buffer that
        // `TryFrom<ByteBuffer>` verified via `root::<fba::Array>`, or the location of a child
        // node obtained from that same verified flatbuffer in `with_root`.
        unsafe { fba::ArrayNode::follow(self.flatbuffer.as_ref(), self.flatbuffer_loc) }
    }

    /// Returns a new [`ArrayParts`] with the given node as the root
    // TODO(ngates): we may want a wrapper that avoids this clone.
    fn with_root(&self, root: fba::ArrayNode) -> Self {
        let mut this = self.clone();
        this.flatbuffer_loc = root._tab.loc();
        this
    }
}
403
// Adapter exposing the children of an `ArrayParts` through the `ArrayChildren` trait,
// so encodings can decode each child on demand with its caller-supplied dtype and length.
struct ArrayPartsChildren<'a> {
    parts: &'a ArrayParts,
    ctx: &'a ArrayContext,
}
408
409impl ArrayChildren for ArrayPartsChildren<'_> {
410    fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef> {
411        self.parts.child(index).decode(self.ctx, dtype, len)
412    }
413
414    fn len(&self) -> usize {
415        self.parts.nchildren()
416    }
417}
418
419impl TryFrom<ByteBuffer> for ArrayParts {
420    type Error = VortexError;
421
422    fn try_from(value: ByteBuffer) -> Result<Self, Self::Error> {
423        // The final 4 bytes contain the length of the flatbuffer.
424        if value.len() < 4 {
425            vortex_bail!("ArrayParts buffer is too short");
426        }
427
428        // We align each buffer individually, so we remove alignment requirements on the buffer.
429        let value = value.aligned(Alignment::none());
430
431        let fb_length = u32::try_from_le_bytes(&value.as_slice()[value.len() - 4..])? as usize;
432        if value.len() < 4 + fb_length {
433            vortex_bail!("ArrayParts buffer is too short for flatbuffer");
434        }
435
436        let fb_offset = value.len() - 4 - fb_length;
437        let fb_buffer = value.slice(fb_offset..fb_offset + fb_length);
438        let fb_buffer = FlatBuffer::align_from(fb_buffer);
439
440        let fb_array = root::<fba::Array>(fb_buffer.as_ref())?;
441        let fb_root = fb_array.root().vortex_expect("Array must have a root node");
442
443        let mut offset = 0;
444        let buffers: Arc<[ByteBuffer]> = fb_array
445            .buffers()
446            .unwrap_or_default()
447            .iter()
448            .map(|fb_buffer| {
449                // Skip padding
450                offset += fb_buffer.padding() as usize;
451
452                let buffer_len = fb_buffer.length() as usize;
453
454                // Extract a buffer and ensure it's aligned, copying if necessary
455                let buffer = value
456                    .slice(offset..(offset + buffer_len))
457                    .aligned(Alignment::from_exponent(fb_buffer.alignment_exponent()));
458
459                offset += buffer_len;
460                buffer
461            })
462            .collect();
463
464        Ok(ArrayParts {
465            flatbuffer: fb_buffer.clone(),
466            flatbuffer_loc: fb_root._tab.loc(),
467            buffers,
468        })
469    }
470}