vortex_array/
serde.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::fmt::Debug;
5use std::fmt::Formatter;
6use std::iter;
7use std::sync::Arc;
8
9use flatbuffers::FlatBufferBuilder;
10use flatbuffers::Follow;
11use flatbuffers::WIPOffset;
12use flatbuffers::root;
13use itertools::Itertools;
14use vortex_buffer::Alignment;
15use vortex_buffer::BufferHandle;
16use vortex_buffer::ByteBuffer;
17use vortex_dtype::DType;
18use vortex_dtype::TryFromBytes;
19use vortex_error::VortexError;
20use vortex_error::VortexExpect;
21use vortex_error::VortexResult;
22use vortex_error::vortex_bail;
23use vortex_error::vortex_err;
24use vortex_error::vortex_panic;
25use vortex_flatbuffers::FlatBuffer;
26use vortex_flatbuffers::FlatBufferRoot;
27use vortex_flatbuffers::ReadFlatBuffer;
28use vortex_flatbuffers::WriteFlatBuffer;
29use vortex_flatbuffers::array as fba;
30use vortex_flatbuffers::array::Compression;
31
32use crate::Array;
33use crate::ArrayContext;
34use crate::ArrayRef;
35use crate::ArrayVisitor;
36use crate::ArrayVisitorExt;
37use crate::stats::StatsSet;
38
39/// Options for serializing an array.
/// Settings that control how an array is laid out when it is serialized.
#[derive(Debug, Default)]
pub struct SerializeOptions {
    /// Byte position, within the enclosing stream or file, at which the serialized output
    /// begins. Padding is computed relative to this position so that buffers can later be
    /// read without copying.
    pub offset: usize,
    /// When `true`, zero padding is emitted ahead of buffers so that each one starts at a
    /// position satisfying its alignment.
    pub include_padding: bool,
}
48
49impl dyn Array + '_ {
50    /// Serialize the array into a sequence of byte buffers that should be written contiguously.
51    /// This function returns a vec to avoid copying data buffers.
52    ///
53    /// Optionally, padding can be included to guarantee buffer alignment and ensure zero-copy
54    /// reads within the context of an external file or stream. In this case, the alignment of
55    /// the first byte buffer should be respected when writing the buffers to the stream or file.
56    ///
57    /// The format of this blob is a sequence of data buffers, possible with prefixed padding,
58    /// followed by a flatbuffer containing an [`fba::Array`] message, and ending with a
59    /// little-endian u32 describing the length of the flatbuffer message.
60    pub fn serialize(
61        &self,
62        ctx: &ArrayContext,
63        options: &SerializeOptions,
64    ) -> VortexResult<Vec<ByteBuffer>> {
65        // Collect all array buffers
66        let array_buffers = self
67            .depth_first_traversal()
68            .flat_map(|f| f.buffers())
69            .collect::<Vec<_>>();
70
71        // Allocate result buffers, including a possible padding buffer for each.
72        let mut buffers = vec![];
73        let mut fb_buffers = Vec::with_capacity(buffers.capacity());
74
75        // If we're including padding, we need to find the maximum required buffer alignment.
76        let max_alignment = array_buffers
77            .iter()
78            .map(|buf| buf.alignment())
79            .chain(iter::once(FlatBuffer::alignment()))
80            .max()
81            .unwrap_or_else(FlatBuffer::alignment);
82
83        // Create a shared buffer of zeros we can use for padding
84        let zeros = ByteBuffer::zeroed(*max_alignment);
85
86        // We push an empty buffer with the maximum alignment, so then subsequent buffers
87        // will be aligned. For subsequent buffers, we always push a 1-byte alignment.
88        buffers.push(ByteBuffer::zeroed_aligned(0, max_alignment));
89
90        // Keep track of where we are in the "file" to calculate padding.
91        let mut pos = options.offset;
92
93        // Push all the array buffers with padding as necessary.
94        for buffer in array_buffers {
95            let padding = if options.include_padding {
96                let padding = pos.next_multiple_of(*buffer.alignment()) - pos;
97                if padding > 0 {
98                    pos += padding;
99                    buffers.push(zeros.slice(0..padding));
100                }
101                padding
102            } else {
103                0
104            };
105
106            fb_buffers.push(fba::Buffer::new(
107                u16::try_from(padding).vortex_expect("padding fits into u16"),
108                buffer.alignment().exponent(),
109                Compression::None,
110                u32::try_from(buffer.len())
111                    .map_err(|_| vortex_err!("All buffers must fit into u32 for serialization"))?,
112            ));
113
114            pos += buffer.len();
115            buffers.push(buffer.aligned(Alignment::none()));
116        }
117
118        // Set up the flatbuffer builder
119        let mut fbb = FlatBufferBuilder::new();
120        let root = ArrayNodeFlatBuffer::try_new(ctx, self)?;
121        let fb_root = root.write_flatbuffer(&mut fbb);
122        let fb_buffers = fbb.create_vector(&fb_buffers);
123        let fb_array = fba::Array::create(
124            &mut fbb,
125            &fba::ArrayArgs {
126                root: Some(fb_root),
127                buffers: Some(fb_buffers),
128            },
129        );
130        fbb.finish_minimal(fb_array);
131        let (fb_vec, fb_start) = fbb.collapse();
132        let fb_end = fb_vec.len();
133        let fb_buffer = ByteBuffer::from(fb_vec).slice(fb_start..fb_end);
134        let fb_length = fb_buffer.len();
135
136        if options.include_padding {
137            let padding = pos.next_multiple_of(*FlatBuffer::alignment()) - pos;
138            if padding > 0 {
139                buffers.push(zeros.slice(0..padding));
140            }
141        }
142        buffers.push(fb_buffer);
143
144        // Finally, we write down the u32 length for the flatbuffer.
145        buffers.push(ByteBuffer::from(
146            u32::try_from(fb_length)
147                .map_err(|_| vortex_err!("Array metadata flatbuffer must fit into u32 for serialization. Array encoding tree is too large."))?
148                .to_le_bytes()
149                .to_vec(),
150        ));
151
152        Ok(buffers)
153    }
154}
155
156/// A utility struct for creating an [`fba::ArrayNode`] flatbuffer.
157pub struct ArrayNodeFlatBuffer<'a> {
158    ctx: &'a ArrayContext,
159    array: &'a dyn Array,
160    buffer_idx: u16,
161}
162
163impl<'a> ArrayNodeFlatBuffer<'a> {
164    pub fn try_new(ctx: &'a ArrayContext, array: &'a dyn Array) -> VortexResult<Self> {
165        // Depth-first traversal of the array to ensure it supports serialization.
166        for child in array.depth_first_traversal() {
167            if child.metadata()?.is_none() {
168                vortex_bail!(
169                    "Array {} does not support serialization",
170                    child.encoding_id()
171                );
172            }
173        }
174        let n_buffers_recursive = array.nbuffers_recursive();
175        if n_buffers_recursive > u16::MAX as usize {
176            vortex_bail!(
177                "Array and all descendent arrays can have at most u16::MAX buffers: {}",
178                n_buffers_recursive
179            );
180        };
181        Ok(Self {
182            ctx,
183            array,
184            buffer_idx: 0,
185        })
186    }
187}
188
189impl FlatBufferRoot for ArrayNodeFlatBuffer<'_> {}
190
191impl WriteFlatBuffer for ArrayNodeFlatBuffer<'_> {
192    type Target<'t> = fba::ArrayNode<'t>;
193
194    fn write_flatbuffer<'fb>(
195        &self,
196        fbb: &mut FlatBufferBuilder<'fb>,
197    ) -> WIPOffset<Self::Target<'fb>> {
198        let encoding = self.ctx.encoding_idx(&self.array.encoding());
199        let metadata = self
200            .array
201            .metadata()
202            // TODO(ngates): add try_write_flatbuffer
203            .vortex_expect("Failed to serialize metadata")
204            .vortex_expect("Validated that all arrays support serialization");
205        let metadata = Some(fbb.create_vector(metadata.as_slice()));
206
207        // Assign buffer indices for all child arrays.
208        let nbuffers = u16::try_from(self.array.nbuffers())
209            .vortex_expect("Array can have at most u16::MAX buffers");
210        let mut child_buffer_idx = self.buffer_idx + nbuffers;
211
212        let children = &self
213            .array
214            .children()
215            .iter()
216            .map(|child| {
217                // Update the number of buffers required.
218                let msg = ArrayNodeFlatBuffer {
219                    ctx: self.ctx,
220                    array: child,
221                    buffer_idx: child_buffer_idx,
222                }
223                .write_flatbuffer(fbb);
224                child_buffer_idx = u16::try_from(child.nbuffers_recursive())
225                    .ok()
226                    .and_then(|nbuffers| nbuffers.checked_add(child_buffer_idx))
227                    .vortex_expect("Too many buffers (u16) for Array");
228                msg
229            })
230            .collect::<Vec<_>>();
231        let children = Some(fbb.create_vector(children));
232
233        let buffers = Some(fbb.create_vector_from_iter((0..nbuffers).map(|i| i + self.buffer_idx)));
234        let stats = Some(self.array.statistics().write_flatbuffer(fbb));
235
236        fba::ArrayNode::create(
237            fbb,
238            &fba::ArrayNodeArgs {
239                encoding,
240                metadata,
241                children,
242                buffers,
243                stats,
244            },
245        )
246    }
247}
248
249/// To minimize the serialized form, arrays do not persist their own dtype and length. Instead,
250/// parent arrays pass this information down during deserialization.
251pub trait ArrayChildren {
252    /// Returns the nth child of the array with the given dtype and length.
253    fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef>;
254
255    /// The number of children.
256    fn len(&self) -> usize;
257
258    /// Returns true if there are no children.
259    fn is_empty(&self) -> bool {
260        self.len() == 0
261    }
262}
263
264/// [`ArrayParts`] represents a parsed but not-yet-decoded deserialized [`Array`].
265/// It contains all the information from the serialized form, without anything extra. i.e.
266/// it is missing a [`DType`] and `len`, and the `encoding_id` is not yet resolved to a concrete
267/// vtable.
268///
269/// An [`ArrayParts`] can be fully decoded into an [`ArrayRef`] using the `decode` function.
270#[derive(Clone)]
271pub struct ArrayParts {
272    // Typed as fb::ArrayNode
273    flatbuffer: FlatBuffer,
274    // The location of the current fb::ArrayNode
275    flatbuffer_loc: usize,
276    buffers: Arc<[BufferHandle]>,
277}
278
279impl Debug for ArrayParts {
280    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
281        f.debug_struct("ArrayParts")
282            .field("encoding_id", &self.encoding_id())
283            .field("children", &(0..self.nchildren()).map(|i| self.child(i)))
284            .field(
285                "buffers",
286                &(0..self.nbuffers()).map(|i| self.buffer(i).ok()),
287            )
288            .field("metadata", &self.metadata())
289            .finish()
290    }
291}
292
293impl ArrayParts {
294    /// Decode an [`ArrayParts`] into an [`ArrayRef`].
295    pub fn decode(&self, ctx: &ArrayContext, dtype: &DType, len: usize) -> VortexResult<ArrayRef> {
296        let encoding_id = self.flatbuffer().encoding();
297        let vtable = ctx
298            .lookup_encoding(encoding_id)
299            .ok_or_else(|| vortex_err!("Unknown encoding: {}", encoding_id))?;
300
301        let buffers: Vec<_> = (0..self.nbuffers())
302            .map(|idx| self.buffer(idx))
303            .try_collect()?;
304
305        let children = ArrayPartsChildren { parts: self, ctx };
306
307        let decoded = vtable.build(dtype, len, self.metadata(), &buffers, &children)?;
308
309        assert_eq!(
310            decoded.len(),
311            len,
312            "Array decoded from {} has incorrect length {}, expected {}",
313            vtable.id(),
314            decoded.len(),
315            len
316        );
317        assert_eq!(
318            decoded.dtype(),
319            dtype,
320            "Array decoded from {} has incorrect dtype {}, expected {}",
321            vtable.id(),
322            decoded.dtype(),
323            dtype,
324        );
325        assert_eq!(
326            decoded.encoding_id(),
327            vtable.id(),
328            "Array decoded from {} has incorrect encoding {}",
329            vtable.id(),
330            decoded.encoding_id(),
331        );
332
333        // Populate statistics from the serialized array.
334        if let Some(stats) = self.flatbuffer().stats() {
335            let decoded_statistics = decoded.statistics();
336            StatsSet::read_flatbuffer(&stats)?
337                .into_iter()
338                .for_each(|(stat, val)| decoded_statistics.set(stat, val));
339        }
340
341        Ok(decoded)
342    }
343
344    /// Returns the array encoding.
345    pub fn encoding_id(&self) -> u16 {
346        self.flatbuffer().encoding()
347    }
348
349    /// Returns the array metadata bytes.
350    pub fn metadata(&self) -> &[u8] {
351        self.flatbuffer()
352            .metadata()
353            .map(|metadata| metadata.bytes())
354            .unwrap_or(&[])
355    }
356
357    /// Returns the number of children.
358    pub fn nchildren(&self) -> usize {
359        self.flatbuffer()
360            .children()
361            .map_or(0, |children| children.len())
362    }
363
364    /// Returns the nth child of the array.
365    pub fn child(&self, idx: usize) -> ArrayParts {
366        let children = self
367            .flatbuffer()
368            .children()
369            .vortex_expect("Expected array to have children");
370        if idx >= children.len() {
371            vortex_panic!(
372                "Invalid child index {} for array with {} children",
373                idx,
374                children.len()
375            );
376        }
377        self.with_root(children.get(idx))
378    }
379
380    /// Returns the number of buffers.
381    pub fn nbuffers(&self) -> usize {
382        self.flatbuffer()
383            .buffers()
384            .map_or(0, |buffers| buffers.len())
385    }
386
387    /// Returns the nth buffer of the current array.
388    pub fn buffer(&self, idx: usize) -> VortexResult<BufferHandle> {
389        let buffer_idx = self
390            .flatbuffer()
391            .buffers()
392            .ok_or_else(|| vortex_err!("Array has no buffers"))?
393            .get(idx);
394        self.buffers
395            .get(buffer_idx as usize)
396            .cloned()
397            .ok_or_else(|| {
398                vortex_err!(
399                    "Invalid buffer index {} for array with {} buffers",
400                    buffer_idx,
401                    self.nbuffers()
402                )
403            })
404    }
405
406    /// Returns the root ArrayNode flatbuffer.
407    fn flatbuffer(&self) -> fba::ArrayNode<'_> {
408        unsafe { fba::ArrayNode::follow(self.flatbuffer.as_ref(), self.flatbuffer_loc) }
409    }
410
411    /// Returns a new [`ArrayParts`] with the given node as the root
412    // TODO(ngates): we may want a wrapper that avoids this clone.
413    fn with_root(&self, root: fba::ArrayNode) -> Self {
414        let mut this = self.clone();
415        this.flatbuffer_loc = root._tab.loc();
416        this
417    }
418
419    /// Create an [`ArrayParts`] from a pre-existing flatbuffer (ArrayNode) and a segment containing
420    /// only the data buffers (without the flatbuffer suffix).
421    ///
422    /// This is used when the flatbuffer is stored separately in layout metadata (e.g., when
423    /// `FLAT_LAYOUT_INLINE_ARRAY_NODE` is enabled).
424    pub fn from_flatbuffer_and_segment(
425        array_tree: ByteBuffer,
426        segment: BufferHandle,
427    ) -> VortexResult<Self> {
428        // TODO: this can also work with device buffers.
429        let segment = segment.try_to_bytes()?;
430        // We align each buffer individually, so we remove alignment requirements on the buffer.
431        let segment = segment.aligned(Alignment::none());
432
433        let fb_buffer = FlatBuffer::align_from(array_tree);
434
435        // Parse the flatbuffer to extract buffer descriptors and root location.
436        let (flatbuffer_loc, buffers) = {
437            let fb_array = root::<fba::Array>(fb_buffer.as_ref())?;
438            let fb_root = fb_array.root().vortex_expect("Array must have a root node");
439            let flatbuffer_loc = fb_root._tab.loc();
440
441            let mut offset = 0;
442            let buffers: Arc<[_]> = fb_array
443                .buffers()
444                .unwrap_or_default()
445                .iter()
446                .map(|fb_buf| {
447                    // Skip padding
448                    offset += fb_buf.padding() as usize;
449
450                    let buffer_len = fb_buf.length() as usize;
451
452                    // Extract a buffer and ensure it's aligned, copying if necessary
453                    let buffer = segment
454                        .slice(offset..(offset + buffer_len))
455                        .aligned(Alignment::from_exponent(fb_buf.alignment_exponent()));
456
457                    offset += buffer_len;
458                    BufferHandle::Buffer(buffer)
459                })
460                .collect();
461
462            (flatbuffer_loc, buffers)
463        };
464
465        Ok(ArrayParts {
466            flatbuffer: fb_buffer,
467            flatbuffer_loc,
468            buffers,
469        })
470    }
471}
472
473struct ArrayPartsChildren<'a> {
474    parts: &'a ArrayParts,
475    ctx: &'a ArrayContext,
476}
477
478impl ArrayChildren for ArrayPartsChildren<'_> {
479    fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef> {
480        self.parts.child(index).decode(self.ctx, dtype, len)
481    }
482
483    fn len(&self) -> usize {
484        self.parts.nchildren()
485    }
486}
487
488impl TryFrom<ByteBuffer> for ArrayParts {
489    type Error = VortexError;
490
491    fn try_from(value: ByteBuffer) -> Result<Self, Self::Error> {
492        // The final 4 bytes contain the length of the flatbuffer.
493        if value.len() < 4 {
494            vortex_bail!("ArrayParts buffer is too short");
495        }
496
497        // We align each buffer individually, so we remove alignment requirements on the buffer.
498        let value = value.aligned(Alignment::none());
499
500        let fb_length = u32::try_from_le_bytes(&value.as_slice()[value.len() - 4..])? as usize;
501        if value.len() < 4 + fb_length {
502            vortex_bail!("ArrayParts buffer is too short for flatbuffer");
503        }
504
505        let fb_offset = value.len() - 4 - fb_length;
506        let fb_buffer = value.slice(fb_offset..fb_offset + fb_length);
507        let fb_buffer = FlatBuffer::align_from(fb_buffer);
508
509        let fb_array = root::<fba::Array>(fb_buffer.as_ref())?;
510        let fb_root = fb_array.root().vortex_expect("Array must have a root node");
511
512        let mut offset = 0;
513        let buffers: Arc<[_]> = fb_array
514            .buffers()
515            .unwrap_or_default()
516            .iter()
517            .map(|fb_buffer| {
518                // Skip padding
519                offset += fb_buffer.padding() as usize;
520
521                let buffer_len = fb_buffer.length() as usize;
522
523                // Extract a buffer and ensure it's aligned, copying if necessary
524                let buffer = value
525                    .slice(offset..(offset + buffer_len))
526                    .aligned(Alignment::from_exponent(fb_buffer.alignment_exponent()));
527
528                offset += buffer_len;
529                BufferHandle::Buffer(buffer)
530            })
531            .collect();
532
533        Ok(ArrayParts {
534            flatbuffer: fb_buffer.clone(),
535            flatbuffer_loc: fb_root._tab.loc(),
536            buffers,
537        })
538    }
539}
540
541impl TryFrom<BufferHandle> for ArrayParts {
542    type Error = VortexError;
543
544    fn try_from(value: BufferHandle) -> Result<Self, Self::Error> {
545        Self::try_from(value.try_to_bytes()?)
546    }
547}