vortex_array/
serde.rs

1use std::fmt::{Debug, Formatter};
2use std::iter;
3use std::sync::Arc;
4
5use flatbuffers::{FlatBufferBuilder, Follow, WIPOffset, root};
6use itertools::Itertools;
7use vortex_buffer::{Alignment, ByteBuffer};
8use vortex_dtype::{DType, TryFromBytes};
9use vortex_error::{
10    VortexError, VortexExpect, VortexResult, vortex_bail, vortex_err, vortex_panic,
11};
12use vortex_flatbuffers::array::Compression;
13use vortex_flatbuffers::{
14    FlatBuffer, FlatBufferRoot, ReadFlatBuffer, WriteFlatBuffer, array as fba,
15};
16
17use crate::stats::StatsSet;
18use crate::{Array, ArrayContext, ArrayRef, ArrayVisitor, ArrayVisitorExt};
19
/// Options for serializing an array.
#[derive(Default, Debug)]
pub struct SerializeOptions {
    /// Byte position at which the serialized array will begin within an external stream or
    /// file. Padding is computed relative to this offset so buffers land on their required
    /// alignment boundaries, enabling zero-copy reads.
    pub offset: usize,
    /// When true, zero-filled padding buffers are emitted before each data buffer (and the
    /// trailing flatbuffer) as needed to satisfy alignment.
    pub include_padding: bool,
}
29
30impl dyn Array + '_ {
31    /// Serialize the array into a sequence of byte buffers that should be written contiguously.
32    /// This function returns a vec to avoid copying data buffers.
33    ///
34    /// Optionally, padding can be included to guarantee buffer alignment and ensure zero-copy
35    /// reads within the context of an external file or stream. In this case, the alignment of
36    /// the first byte buffer should be respected when writing the buffers to the stream or file.
37    ///
38    /// The format of this blob is a sequence of data buffers, possible with prefixed padding,
39    /// followed by a flatbuffer containing an [`fba::Array`] message, and ending with a
40    /// little-endian u32 describing the length of the flatbuffer message.
41    pub fn serialize(
42        &self,
43        ctx: &ArrayContext,
44        options: &SerializeOptions,
45    ) -> VortexResult<Vec<ByteBuffer>> {
46        // Collect all array buffers
47        let array_buffers = self
48            .depth_first_traversal()
49            .flat_map(|f| f.buffers())
50            .collect::<Vec<_>>();
51
52        // Allocate result buffers, including a possible padding buffer for each.
53        let mut buffers = vec![];
54        let mut fb_buffers = Vec::with_capacity(buffers.capacity());
55
56        // If we're including padding, we need to find the maximum required buffer alignment.
57        let max_alignment = array_buffers
58            .iter()
59            .map(|buf| buf.alignment())
60            .chain(iter::once(FlatBuffer::alignment()))
61            .max()
62            .unwrap_or_else(FlatBuffer::alignment);
63
64        // Create a shared buffer of zeros we can use for padding
65        let zeros = ByteBuffer::zeroed(*max_alignment);
66
67        // We push an empty buffer with the maximum alignment, so then subsequent buffers
68        // will be aligned. For subsequent buffers, we always push a 1-byte alignment.
69        buffers.push(ByteBuffer::zeroed_aligned(0, max_alignment));
70
71        // Keep track of where we are in the "file" to calculate padding.
72        let mut pos = options.offset;
73
74        // Push all the array buffers with padding as necessary.
75        for buffer in array_buffers {
76            let padding = if options.include_padding {
77                let padding = pos.next_multiple_of(*buffer.alignment()) - pos;
78                if padding > 0 {
79                    pos += padding;
80                    buffers.push(zeros.slice(0..padding));
81                }
82                padding
83            } else {
84                0
85            };
86
87            fb_buffers.push(fba::Buffer::new(
88                u16::try_from(padding).vortex_expect("padding fits into u16"),
89                buffer.alignment().exponent(),
90                Compression::None,
91                u32::try_from(buffer.len())
92                    .map_err(|_| vortex_err!("All buffers must fit into u32 for serialization"))?,
93            ));
94
95            pos += buffer.len();
96            buffers.push(buffer.aligned(Alignment::none()));
97        }
98
99        // Set up the flatbuffer builder
100        let mut fbb = FlatBufferBuilder::new();
101        let root = ArrayNodeFlatBuffer::try_new(ctx, self)?;
102        let fb_root = root.write_flatbuffer(&mut fbb);
103        let fb_buffers = fbb.create_vector(&fb_buffers);
104        let fb_array = fba::Array::create(
105            &mut fbb,
106            &fba::ArrayArgs {
107                root: Some(fb_root),
108                buffers: Some(fb_buffers),
109            },
110        );
111        fbb.finish_minimal(fb_array);
112        let (fb_vec, fb_start) = fbb.collapse();
113        let fb_end = fb_vec.len();
114        let fb_buffer = ByteBuffer::from(fb_vec).slice(fb_start..fb_end);
115        let fb_length = fb_buffer.len();
116
117        if options.include_padding {
118            let padding = pos.next_multiple_of(*FlatBuffer::alignment()) - pos;
119            if padding > 0 {
120                buffers.push(zeros.slice(0..padding));
121            }
122        }
123        buffers.push(fb_buffer);
124
125        // Finally, we write down the u32 length for the flatbuffer.
126        buffers.push(ByteBuffer::from(
127            u32::try_from(fb_length)
128                .map_err(|_| vortex_err!("Array metadata flatbuffer must fit into u32 for serialization. Array encoding tree is too large."))?
129                .to_le_bytes()
130                .to_vec(),
131        ));
132
133        Ok(buffers)
134    }
135}
136
/// A utility struct for creating an [`fba::ArrayNode`] flatbuffer.
pub struct ArrayNodeFlatBuffer<'a> {
    // Context used to resolve each array's encoding to an index.
    ctx: &'a ArrayContext,
    // The array (tree) being serialized.
    array: &'a dyn Array,
    // Index of this node's first buffer within the flattened, depth-first buffer list.
    buffer_idx: u16,
}
143
144impl<'a> ArrayNodeFlatBuffer<'a> {
145    pub fn try_new(ctx: &'a ArrayContext, array: &'a dyn Array) -> VortexResult<Self> {
146        // Depth-first traversal of the array to ensure it supports serialization.
147        for child in array.depth_first_traversal() {
148            if child.metadata()?.is_none() {
149                vortex_bail!(
150                    "Array {} does not support serialization",
151                    child.encoding_id()
152                );
153            }
154        }
155        Ok(Self {
156            ctx,
157            array,
158            buffer_idx: 0,
159        })
160    }
161}
162
163impl FlatBufferRoot for ArrayNodeFlatBuffer<'_> {}
164
165impl WriteFlatBuffer for ArrayNodeFlatBuffer<'_> {
166    type Target<'t> = fba::ArrayNode<'t>;
167
168    fn write_flatbuffer<'fb>(
169        &self,
170        fbb: &mut FlatBufferBuilder<'fb>,
171    ) -> WIPOffset<Self::Target<'fb>> {
172        let encoding = self.ctx.encoding_idx(&self.array.encoding());
173        let metadata = self
174            .array
175            .metadata()
176            // TODO(ngates): add try_write_flatbuffer
177            .vortex_expect("Failed to serialize metadata")
178            .vortex_expect("Validated that all arrays support serialization");
179        let metadata = Some(fbb.create_vector(metadata.as_slice()));
180
181        // Assign buffer indices for all child arrays.
182        let nbuffers = u16::try_from(self.array.nbuffers())
183            .vortex_expect("Array can have at most u16::MAX buffers");
184        let mut child_buffer_idx = self.buffer_idx + nbuffers;
185
186        let children = &self
187            .array
188            .children()
189            .iter()
190            .map(|child| {
191                // Update the number of buffers required.
192                let msg = ArrayNodeFlatBuffer {
193                    ctx: self.ctx,
194                    array: child,
195                    buffer_idx: child_buffer_idx,
196                }
197                .write_flatbuffer(fbb);
198                child_buffer_idx = u16::try_from(child.nbuffers_recursive())
199                    .ok()
200                    .and_then(|nbuffers| nbuffers.checked_add(child_buffer_idx))
201                    .vortex_expect("Too many buffers (u16) for Array");
202                msg
203            })
204            .collect::<Vec<_>>();
205        let children = Some(fbb.create_vector(children));
206
207        let buffers = Some(fbb.create_vector_from_iter((0..nbuffers).map(|i| i + self.buffer_idx)));
208        let stats = Some(self.array.statistics().to_owned().write_flatbuffer(fbb));
209
210        fba::ArrayNode::create(
211            fbb,
212            &fba::ArrayNodeArgs {
213                encoding,
214                metadata,
215                children,
216                buffers,
217                stats,
218            },
219        )
220    }
221}
222
/// To minimize the serialized form, arrays do not persist their own dtype and length. Instead,
/// parent arrays pass this information down during deserialization. This trait abstracts
/// over either a serialized [`crate::serde::ArrayParts`] or the
/// in-memory [`crate::data::ArrayData`].
pub trait ArrayChildren {
    /// Returns the nth child of the array with the given dtype and length.
    ///
    /// The caller supplies `dtype` and `len` because they are not stored with the child.
    fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef>;

    /// The number of children.
    fn len(&self) -> usize;

    /// Returns true if there are no children.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
239
/// [`ArrayParts`] represents a parsed but not-yet-decoded deserialized [`Array`].
/// It contains all the information from the serialized form, without anything extra. i.e.
/// it is missing a [`DType`] and `len`, and the `encoding_id` is not yet resolved to a concrete
/// vtable.
///
/// An [`ArrayParts`] can be fully decoded into an [`ArrayRef`] using the `decode` function.
#[derive(Clone)]
pub struct ArrayParts {
    // Typed as fb::ArrayNode
    flatbuffer: FlatBuffer,
    // The location of the current fb::ArrayNode
    flatbuffer_loc: usize,
    // All data buffers of the serialized blob; the Arc is shared (cheaply cloned) by every
    // child `ArrayParts` view into the same flatbuffer (see `with_root`).
    buffers: Arc<[ByteBuffer]>,
}
254
255impl Debug for ArrayParts {
256    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
257        f.debug_struct("ArrayParts")
258            .field("encoding_id", &self.encoding_id())
259            .field("children", &(0..self.nchildren()).map(|i| self.child(i)))
260            .field(
261                "buffers",
262                &(0..self.nbuffers()).map(|i| self.buffer(i).ok()),
263            )
264            .field("metadata", &self.metadata())
265            .finish()
266    }
267}
268
impl ArrayParts {
    /// Decode an [`ArrayParts`] into an [`ArrayRef`].
    ///
    /// `dtype` and `len` are supplied by the caller since they are not stored in the
    /// serialized form (parents pass them down; see [`ArrayChildren`]).
    ///
    /// # Errors
    ///
    /// Returns an error if the encoding is unknown to `ctx`, if any buffer index is invalid,
    /// or if the encoding's vtable fails to build the array.
    ///
    /// # Panics
    ///
    /// Panics if the decoded array disagrees with the requested `len`, `dtype`, or encoding —
    /// these assertions guard against buggy encoding implementations.
    pub fn decode(&self, ctx: &ArrayContext, dtype: &DType, len: usize) -> VortexResult<ArrayRef> {
        let encoding_id = self.flatbuffer().encoding();
        let vtable = ctx
            .lookup_encoding(encoding_id)
            .ok_or_else(|| vortex_err!("Unknown encoding: {}", encoding_id))?;

        // Resolve all of this node's buffers up-front; fails on any invalid buffer index.
        let buffers: Vec<_> = (0..self.nbuffers())
            .map(|idx| self.buffer(idx))
            .try_collect()?;

        // Children are decoded lazily: the vtable requests each one with its dtype and length.
        let children = ArrayPartsChildren { parts: self, ctx };

        let decoded = vtable.build(dtype, len, self.metadata(), &buffers, &children)?;

        // Sanity-check the vtable's output against what the caller asked for.
        assert_eq!(
            decoded.len(),
            len,
            "Array decoded from {} has incorrect length {}, expected {}",
            vtable.id(),
            decoded.len(),
            len
        );
        assert_eq!(
            decoded.dtype(),
            dtype,
            "Array decoded from {} has incorrect dtype {}, expected {}",
            vtable.id(),
            decoded.dtype(),
            dtype,
        );
        assert_eq!(
            decoded.encoding_id(),
            vtable.id(),
            "Array decoded from {} has incorrect encoding {}",
            vtable.id(),
            decoded.encoding_id(),
        );

        // Populate statistics from the serialized array.
        if let Some(stats) = self.flatbuffer().stats() {
            let decoded_statistics = decoded.statistics();
            StatsSet::read_flatbuffer(&stats)?
                .into_iter()
                .for_each(|(stat, val)| decoded_statistics.set(stat, val));
        }

        Ok(decoded)
    }

    /// Returns the array encoding.
    pub fn encoding_id(&self) -> u16 {
        self.flatbuffer().encoding()
    }

    /// Returns the array metadata bytes, or an empty slice if none were serialized.
    pub fn metadata(&self) -> &[u8] {
        self.flatbuffer()
            .metadata()
            .map(|metadata| metadata.bytes())
            .unwrap_or(&[])
    }

    /// Returns the number of children.
    pub fn nchildren(&self) -> usize {
        self.flatbuffer()
            .children()
            .map_or(0, |children| children.len())
    }

    /// Returns the nth child of the array.
    ///
    /// # Panics
    ///
    /// Panics if the array has no children or `idx` is out of range.
    pub fn child(&self, idx: usize) -> ArrayParts {
        let children = self
            .flatbuffer()
            .children()
            .vortex_expect("Expected array to have children");
        if idx >= children.len() {
            vortex_panic!(
                "Invalid child index {} for array with {} children",
                idx,
                children.len()
            );
        }
        self.with_root(children.get(idx))
    }

    /// Returns the number of buffers.
    pub fn nbuffers(&self) -> usize {
        self.flatbuffer()
            .buffers()
            .map_or(0, |buffers| buffers.len())
    }

    /// Returns the nth buffer of the current array.
    ///
    /// # Errors
    ///
    /// Returns an error if the array has no buffers, or if the stored index does not refer to
    /// one of the deserialized shared buffers.
    pub fn buffer(&self, idx: usize) -> VortexResult<ByteBuffer> {
        // The flatbuffer stores, per logical buffer, an index into the shared buffer list.
        // NOTE(review): flatbuffer `get(idx)` presumably panics when `idx` is out of range of
        // the vector itself — confirm callers always pass idx < nbuffers().
        let buffer_idx = self
            .flatbuffer()
            .buffers()
            .ok_or_else(|| vortex_err!("Array has no buffers"))?
            .get(idx);
        self.buffers
            .get(buffer_idx as usize)
            .cloned()
            .ok_or_else(|| {
                vortex_err!(
                    "Invalid buffer index {} for array with {} buffers",
                    buffer_idx,
                    self.nbuffers()
                )
            })
    }

    /// Returns the root ArrayNode flatbuffer.
    fn flatbuffer(&self) -> fba::ArrayNode<'_> {
        // SAFETY: `flatbuffer_loc` is only ever produced from a verified root table (see
        // `TryFrom<ByteBuffer>`, which runs the flatbuffer verifier via `root::<fba::Array>`)
        // or from a child node of that same table (see `with_root`), so it is a valid
        // `fba::ArrayNode` location within `self.flatbuffer`.
        unsafe { fba::ArrayNode::follow(self.flatbuffer.as_ref(), self.flatbuffer_loc) }
    }

    /// Returns a new [`ArrayParts`] with the given node as the root
    // TODO(ngates): we may want a wrapper that avoids this clone.
    fn with_root(&self, root: fba::ArrayNode) -> Self {
        let mut this = self.clone();
        this.flatbuffer_loc = root._tab.loc();
        this
    }
}
395
/// An [`ArrayChildren`] implementation backed by a serialized [`ArrayParts`], decoding each
/// child on demand.
struct ArrayPartsChildren<'a> {
    // The serialized parent node whose children are being exposed.
    parts: &'a ArrayParts,
    // Context used to resolve encodings while decoding children.
    ctx: &'a ArrayContext,
}
400
impl ArrayChildren for ArrayPartsChildren<'_> {
    /// Decodes the nth serialized child using the dtype and length supplied by the parent.
    fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef> {
        self.parts.child(index).decode(self.ctx, dtype, len)
    }

    /// Number of serialized children of the parent node.
    fn len(&self) -> usize {
        self.parts.nchildren()
    }
}
410
411impl TryFrom<ByteBuffer> for ArrayParts {
412    type Error = VortexError;
413
414    fn try_from(value: ByteBuffer) -> Result<Self, Self::Error> {
415        // The final 4 bytes contain the length of the flatbuffer.
416        if value.len() < 4 {
417            vortex_bail!("ArrayParts buffer is too short");
418        }
419
420        // We align each buffer individually, so we remove alignment requirements on the buffer.
421        let value = value.aligned(Alignment::none());
422
423        let fb_length = u32::try_from_le_bytes(&value.as_slice()[value.len() - 4..])? as usize;
424        if value.len() < 4 + fb_length {
425            vortex_bail!("ArrayParts buffer is too short for flatbuffer");
426        }
427
428        let fb_offset = value.len() - 4 - fb_length;
429        let fb_buffer = value.slice(fb_offset..fb_offset + fb_length);
430        let fb_buffer = FlatBuffer::align_from(fb_buffer);
431
432        let fb_array = root::<fba::Array>(fb_buffer.as_ref())?;
433        let fb_root = fb_array.root().vortex_expect("Array must have a root node");
434
435        let mut offset = 0;
436        let buffers: Arc<[ByteBuffer]> = fb_array
437            .buffers()
438            .unwrap_or_default()
439            .iter()
440            .map(|fb_buffer| {
441                // Skip padding
442                offset += fb_buffer.padding() as usize;
443
444                let buffer_len = fb_buffer.length() as usize;
445
446                // Extract a buffer and ensure it's aligned, copying if necessary
447                let buffer = value
448                    .slice(offset..(offset + buffer_len))
449                    .aligned(Alignment::from_exponent(fb_buffer.alignment_exponent()));
450
451                offset += buffer_len;
452                buffer
453            })
454            .collect();
455
456        Ok(ArrayParts {
457            flatbuffer: fb_buffer.clone(),
458            flatbuffer_loc: fb_root._tab.loc(),
459            buffers,
460        })
461    }
462}