vortex_array/
serde.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::fmt::Debug;
5use std::fmt::Formatter;
6use std::iter;
7use std::sync::Arc;
8
9use flatbuffers::FlatBufferBuilder;
10use flatbuffers::Follow;
11use flatbuffers::WIPOffset;
12use flatbuffers::root;
13use itertools::Itertools;
14use vortex_buffer::Alignment;
15use vortex_buffer::ByteBuffer;
16use vortex_dtype::DType;
17use vortex_dtype::TryFromBytes;
18use vortex_error::VortexError;
19use vortex_error::VortexExpect;
20use vortex_error::VortexResult;
21use vortex_error::vortex_bail;
22use vortex_error::vortex_err;
23use vortex_error::vortex_panic;
24use vortex_flatbuffers::FlatBuffer;
25use vortex_flatbuffers::FlatBufferRoot;
26use vortex_flatbuffers::ReadFlatBuffer;
27use vortex_flatbuffers::WriteFlatBuffer;
28use vortex_flatbuffers::array as fba;
29use vortex_flatbuffers::array::Compression;
30
31use crate::Array;
32use crate::ArrayContext;
33use crate::ArrayRef;
34use crate::ArrayVisitor;
35use crate::ArrayVisitorExt;
36use crate::buffer::BufferHandle;
37use crate::stats::StatsSet;
38
/// Settings that control how an array is serialized.
#[derive(Debug, Default)]
pub struct SerializeOptions {
    /// Byte position within the enclosing stream or file at which the serialized output will
    /// start. Padding is computed relative to this position so that buffers can later be read
    /// without copying.
    pub offset: usize,
    /// When `true`, zero bytes are inserted so that every buffer starts at its required
    /// alignment (enabling zero-copy reads).
    pub include_padding: bool,
}
48
49impl dyn Array + '_ {
50    /// Serialize the array into a sequence of byte buffers that should be written contiguously.
51    /// This function returns a vec to avoid copying data buffers.
52    ///
53    /// Optionally, padding can be included to guarantee buffer alignment and ensure zero-copy
54    /// reads within the context of an external file or stream. In this case, the alignment of
55    /// the first byte buffer should be respected when writing the buffers to the stream or file.
56    ///
57    /// The format of this blob is a sequence of data buffers, possible with prefixed padding,
58    /// followed by a flatbuffer containing an [`fba::Array`] message, and ending with a
59    /// little-endian u32 describing the length of the flatbuffer message.
60    pub fn serialize(
61        &self,
62        ctx: &ArrayContext,
63        options: &SerializeOptions,
64    ) -> VortexResult<Vec<ByteBuffer>> {
65        // Collect all array buffers
66        let array_buffers = self
67            .depth_first_traversal()
68            .flat_map(|f| f.buffers())
69            .collect::<Vec<_>>();
70
71        // Allocate result buffers, including a possible padding buffer for each.
72        let mut buffers = vec![];
73        let mut fb_buffers = Vec::with_capacity(buffers.capacity());
74
75        // If we're including padding, we need to find the maximum required buffer alignment.
76        let max_alignment = array_buffers
77            .iter()
78            .map(|buf| buf.alignment())
79            .chain(iter::once(FlatBuffer::alignment()))
80            .max()
81            .unwrap_or_else(FlatBuffer::alignment);
82
83        // Create a shared buffer of zeros we can use for padding
84        let zeros = ByteBuffer::zeroed(*max_alignment);
85
86        // We push an empty buffer with the maximum alignment, so then subsequent buffers
87        // will be aligned. For subsequent buffers, we always push a 1-byte alignment.
88        buffers.push(ByteBuffer::zeroed_aligned(0, max_alignment));
89
90        // Keep track of where we are in the "file" to calculate padding.
91        let mut pos = options.offset;
92
93        // Push all the array buffers with padding as necessary.
94        for buffer in array_buffers {
95            let padding = if options.include_padding {
96                let padding = pos.next_multiple_of(*buffer.alignment()) - pos;
97                if padding > 0 {
98                    pos += padding;
99                    buffers.push(zeros.slice(0..padding));
100                }
101                padding
102            } else {
103                0
104            };
105
106            fb_buffers.push(fba::Buffer::new(
107                u16::try_from(padding).vortex_expect("padding fits into u16"),
108                buffer.alignment().exponent(),
109                Compression::None,
110                u32::try_from(buffer.len())
111                    .map_err(|_| vortex_err!("All buffers must fit into u32 for serialization"))?,
112            ));
113
114            pos += buffer.len();
115            buffers.push(buffer.aligned(Alignment::none()));
116        }
117
118        // Set up the flatbuffer builder
119        let mut fbb = FlatBufferBuilder::new();
120        let root = ArrayNodeFlatBuffer::try_new(ctx, self)?;
121        let fb_root = root.write_flatbuffer(&mut fbb);
122        let fb_buffers = fbb.create_vector(&fb_buffers);
123        let fb_array = fba::Array::create(
124            &mut fbb,
125            &fba::ArrayArgs {
126                root: Some(fb_root),
127                buffers: Some(fb_buffers),
128            },
129        );
130        fbb.finish_minimal(fb_array);
131        let (fb_vec, fb_start) = fbb.collapse();
132        let fb_end = fb_vec.len();
133        let fb_buffer = ByteBuffer::from(fb_vec).slice(fb_start..fb_end);
134        let fb_length = fb_buffer.len();
135
136        if options.include_padding {
137            let padding = pos.next_multiple_of(*FlatBuffer::alignment()) - pos;
138            if padding > 0 {
139                buffers.push(zeros.slice(0..padding));
140            }
141        }
142        buffers.push(fb_buffer);
143
144        // Finally, we write down the u32 length for the flatbuffer.
145        buffers.push(ByteBuffer::from(
146            u32::try_from(fb_length)
147                .map_err(|_| vortex_err!("Array metadata flatbuffer must fit into u32 for serialization. Array encoding tree is too large."))?
148                .to_le_bytes()
149                .to_vec(),
150        ));
151
152        Ok(buffers)
153    }
154}
155
156/// A utility struct for creating an [`fba::ArrayNode`] flatbuffer.
157pub struct ArrayNodeFlatBuffer<'a> {
158    ctx: &'a ArrayContext,
159    array: &'a dyn Array,
160    buffer_idx: u16,
161}
162
163impl<'a> ArrayNodeFlatBuffer<'a> {
164    pub fn try_new(ctx: &'a ArrayContext, array: &'a dyn Array) -> VortexResult<Self> {
165        // Depth-first traversal of the array to ensure it supports serialization.
166        for child in array.depth_first_traversal() {
167            if child.metadata()?.is_none() {
168                vortex_bail!(
169                    "Array {} does not support serialization",
170                    child.encoding_id()
171                );
172            }
173        }
174        let n_buffers_recursive = array.nbuffers_recursive();
175        if n_buffers_recursive > u16::MAX as usize {
176            vortex_bail!(
177                "Array and all descendent arrays can have at most u16::MAX buffers: {}",
178                n_buffers_recursive
179            );
180        };
181        Ok(Self {
182            ctx,
183            array,
184            buffer_idx: 0,
185        })
186    }
187}
188
189impl FlatBufferRoot for ArrayNodeFlatBuffer<'_> {}
190
191impl WriteFlatBuffer for ArrayNodeFlatBuffer<'_> {
192    type Target<'t> = fba::ArrayNode<'t>;
193
194    fn write_flatbuffer<'fb>(
195        &self,
196        fbb: &mut FlatBufferBuilder<'fb>,
197    ) -> WIPOffset<Self::Target<'fb>> {
198        let encoding = self.ctx.encoding_idx(&self.array.encoding());
199        let metadata = self
200            .array
201            .metadata()
202            // TODO(ngates): add try_write_flatbuffer
203            .vortex_expect("Failed to serialize metadata")
204            .vortex_expect("Validated that all arrays support serialization");
205        let metadata = Some(fbb.create_vector(metadata.as_slice()));
206
207        // Assign buffer indices for all child arrays.
208        let nbuffers = u16::try_from(self.array.nbuffers())
209            .vortex_expect("Array can have at most u16::MAX buffers");
210        let mut child_buffer_idx = self.buffer_idx + nbuffers;
211
212        let children = &self
213            .array
214            .children()
215            .iter()
216            .map(|child| {
217                // Update the number of buffers required.
218                let msg = ArrayNodeFlatBuffer {
219                    ctx: self.ctx,
220                    array: child,
221                    buffer_idx: child_buffer_idx,
222                }
223                .write_flatbuffer(fbb);
224                child_buffer_idx = u16::try_from(child.nbuffers_recursive())
225                    .ok()
226                    .and_then(|nbuffers| nbuffers.checked_add(child_buffer_idx))
227                    .vortex_expect("Too many buffers (u16) for Array");
228                msg
229            })
230            .collect::<Vec<_>>();
231        let children = Some(fbb.create_vector(children));
232
233        let buffers = Some(fbb.create_vector_from_iter((0..nbuffers).map(|i| i + self.buffer_idx)));
234        let stats = Some(self.array.statistics().write_flatbuffer(fbb));
235
236        fba::ArrayNode::create(
237            fbb,
238            &fba::ArrayNodeArgs {
239                encoding,
240                metadata,
241                children,
242                buffers,
243                stats,
244            },
245        )
246    }
247}
248
249/// To minimize the serialized form, arrays do not persist their own dtype and length. Instead,
250/// parent arrays pass this information down during deserialization.
251pub trait ArrayChildren {
252    /// Returns the nth child of the array with the given dtype and length.
253    fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef>;
254
255    /// The number of children.
256    fn len(&self) -> usize;
257
258    /// Returns true if there are no children.
259    fn is_empty(&self) -> bool {
260        self.len() == 0
261    }
262}
263
264/// [`ArrayParts`] represents a parsed but not-yet-decoded deserialized [`Array`].
265/// It contains all the information from the serialized form, without anything extra. i.e.
266/// it is missing a [`DType`] and `len`, and the `encoding_id` is not yet resolved to a concrete
267/// vtable.
268///
269/// An [`ArrayParts`] can be fully decoded into an [`ArrayRef`] using the `decode` function.
270#[derive(Clone)]
271pub struct ArrayParts {
272    // Typed as fb::ArrayNode
273    flatbuffer: FlatBuffer,
274    // The location of the current fb::ArrayNode
275    flatbuffer_loc: usize,
276    buffers: Arc<[BufferHandle]>,
277}
278
279impl Debug for ArrayParts {
280    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
281        f.debug_struct("ArrayParts")
282            .field("encoding_id", &self.encoding_id())
283            .field("children", &(0..self.nchildren()).map(|i| self.child(i)))
284            .field(
285                "buffers",
286                &(0..self.nbuffers()).map(|i| self.buffer(i).ok()),
287            )
288            .field("metadata", &self.metadata())
289            .finish()
290    }
291}
292
293impl ArrayParts {
294    /// Decode an [`ArrayParts`] into an [`ArrayRef`].
295    pub fn decode(&self, ctx: &ArrayContext, dtype: &DType, len: usize) -> VortexResult<ArrayRef> {
296        let encoding_id = self.flatbuffer().encoding();
297        let vtable = ctx
298            .lookup_encoding(encoding_id)
299            .ok_or_else(|| vortex_err!("Unknown encoding: {}", encoding_id))?;
300
301        let buffers: Vec<_> = (0..self.nbuffers())
302            .map(|idx| self.buffer(idx))
303            .try_collect()?;
304
305        let children = ArrayPartsChildren { parts: self, ctx };
306
307        let decoded = vtable
308            .as_dyn()
309            .build(dtype, len, self.metadata(), &buffers, &children)?;
310
311        assert_eq!(
312            decoded.len(),
313            len,
314            "Array decoded from {} has incorrect length {}, expected {}",
315            vtable.id(),
316            decoded.len(),
317            len
318        );
319        assert_eq!(
320            decoded.dtype(),
321            dtype,
322            "Array decoded from {} has incorrect dtype {}, expected {}",
323            vtable.id(),
324            decoded.dtype(),
325            dtype,
326        );
327        assert_eq!(
328            decoded.encoding_id(),
329            vtable.id(),
330            "Array decoded from {} has incorrect encoding {}",
331            vtable.id(),
332            decoded.encoding_id(),
333        );
334
335        // Populate statistics from the serialized array.
336        if let Some(stats) = self.flatbuffer().stats() {
337            let decoded_statistics = decoded.statistics();
338            StatsSet::read_flatbuffer(&stats)?
339                .into_iter()
340                .for_each(|(stat, val)| decoded_statistics.set(stat, val));
341        }
342
343        Ok(decoded)
344    }
345
346    /// Returns the array encoding.
347    pub fn encoding_id(&self) -> u16 {
348        self.flatbuffer().encoding()
349    }
350
351    /// Returns the array metadata bytes.
352    pub fn metadata(&self) -> &[u8] {
353        self.flatbuffer()
354            .metadata()
355            .map(|metadata| metadata.bytes())
356            .unwrap_or(&[])
357    }
358
359    /// Returns the number of children.
360    pub fn nchildren(&self) -> usize {
361        self.flatbuffer()
362            .children()
363            .map_or(0, |children| children.len())
364    }
365
366    /// Returns the nth child of the array.
367    pub fn child(&self, idx: usize) -> ArrayParts {
368        let children = self
369            .flatbuffer()
370            .children()
371            .vortex_expect("Expected array to have children");
372        if idx >= children.len() {
373            vortex_panic!(
374                "Invalid child index {} for array with {} children",
375                idx,
376                children.len()
377            );
378        }
379        self.with_root(children.get(idx))
380    }
381
382    /// Returns the number of buffers.
383    pub fn nbuffers(&self) -> usize {
384        self.flatbuffer()
385            .buffers()
386            .map_or(0, |buffers| buffers.len())
387    }
388
389    /// Returns the nth buffer of the current array.
390    pub fn buffer(&self, idx: usize) -> VortexResult<BufferHandle> {
391        let buffer_idx = self
392            .flatbuffer()
393            .buffers()
394            .ok_or_else(|| vortex_err!("Array has no buffers"))?
395            .get(idx);
396        self.buffers
397            .get(buffer_idx as usize)
398            .cloned()
399            .ok_or_else(|| {
400                vortex_err!(
401                    "Invalid buffer index {} for array with {} buffers",
402                    buffer_idx,
403                    self.nbuffers()
404                )
405            })
406    }
407
408    /// Returns the buffer lengths as stored in the flatbuffer metadata.
409    ///
410    /// This reads the buffer descriptors from the flatbuffer, which contain the
411    /// serialized length of each buffer. This is useful for displaying buffer sizes
412    /// without needing to access the actual buffer data.
413    pub fn buffer_lengths(&self) -> Vec<usize> {
414        let fb_array = root::<fba::Array>(self.flatbuffer.as_ref())
415            .vortex_expect("ArrayParts flatbuffer must be a valid Array");
416        fb_array
417            .buffers()
418            .map(|buffers| buffers.iter().map(|b| b.length() as usize).collect())
419            .unwrap_or_default()
420    }
421
422    /// Create an [`ArrayParts`] from a raw array tree flatbuffer (metadata only).
423    ///
424    /// This constructor creates an `ArrayParts` with no buffer data, useful for
425    /// inspecting the metadata when the actual buffer data is not needed
426    /// (e.g., displaying buffer sizes from inlined array tree metadata).
427    ///
428    /// Note: Calling `buffer()` on the returned `ArrayParts` will fail since
429    /// no actual buffer data is available.
430    pub fn from_array_tree(array_tree: impl Into<ByteBuffer>) -> VortexResult<Self> {
431        let fb_buffer = FlatBuffer::align_from(array_tree.into());
432        let fb_array = root::<fba::Array>(fb_buffer.as_ref())?;
433        let fb_root = fb_array
434            .root()
435            .ok_or_else(|| vortex_err!("Array must have a root node"))?;
436        let flatbuffer_loc = fb_root._tab.loc();
437
438        Ok(ArrayParts {
439            flatbuffer: fb_buffer,
440            flatbuffer_loc,
441            buffers: Arc::new([]),
442        })
443    }
444
445    /// Returns the root ArrayNode flatbuffer.
446    fn flatbuffer(&self) -> fba::ArrayNode<'_> {
447        unsafe { fba::ArrayNode::follow(self.flatbuffer.as_ref(), self.flatbuffer_loc) }
448    }
449
450    /// Returns a new [`ArrayParts`] with the given node as the root
451    // TODO(ngates): we may want a wrapper that avoids this clone.
452    fn with_root(&self, root: fba::ArrayNode) -> Self {
453        let mut this = self.clone();
454        this.flatbuffer_loc = root._tab.loc();
455        this
456    }
457
458    /// Create an [`ArrayParts`] from a pre-existing flatbuffer (ArrayNode) and a segment containing
459    /// only the data buffers (without the flatbuffer suffix).
460    ///
461    /// This is used when the flatbuffer is stored separately in layout metadata (e.g., when
462    /// `FLAT_LAYOUT_INLINE_ARRAY_NODE` is enabled).
463    pub fn from_flatbuffer_and_segment(
464        array_tree: ByteBuffer,
465        segment: BufferHandle,
466    ) -> VortexResult<Self> {
467        // TODO: this can also work with device buffers.
468        let segment = segment.try_to_bytes()?;
469        // We align each buffer individually, so we remove alignment requirements on the buffer.
470        let segment = segment.aligned(Alignment::none());
471
472        let fb_buffer = FlatBuffer::align_from(array_tree);
473
474        // Parse the flatbuffer to extract buffer descriptors and root location.
475        let (flatbuffer_loc, buffers) = {
476            let fb_array = root::<fba::Array>(fb_buffer.as_ref())?;
477            let fb_root = fb_array.root().vortex_expect("Array must have a root node");
478            let flatbuffer_loc = fb_root._tab.loc();
479
480            let mut offset = 0;
481            let buffers: Arc<[_]> = fb_array
482                .buffers()
483                .unwrap_or_default()
484                .iter()
485                .map(|fb_buf| {
486                    // Skip padding
487                    offset += fb_buf.padding() as usize;
488
489                    let buffer_len = fb_buf.length() as usize;
490
491                    // Extract a buffer and ensure it's aligned, copying if necessary
492                    let buffer = segment
493                        .slice(offset..(offset + buffer_len))
494                        .aligned(Alignment::from_exponent(fb_buf.alignment_exponent()));
495
496                    offset += buffer_len;
497                    BufferHandle::Host(buffer)
498                })
499                .collect();
500
501            (flatbuffer_loc, buffers)
502        };
503
504        Ok(ArrayParts {
505            flatbuffer: fb_buffer,
506            flatbuffer_loc,
507            buffers,
508        })
509    }
510}
511
512struct ArrayPartsChildren<'a> {
513    parts: &'a ArrayParts,
514    ctx: &'a ArrayContext,
515}
516
517impl ArrayChildren for ArrayPartsChildren<'_> {
518    fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef> {
519        self.parts.child(index).decode(self.ctx, dtype, len)
520    }
521
522    fn len(&self) -> usize {
523        self.parts.nchildren()
524    }
525}
526
527impl TryFrom<ByteBuffer> for ArrayParts {
528    type Error = VortexError;
529
530    fn try_from(value: ByteBuffer) -> Result<Self, Self::Error> {
531        // The final 4 bytes contain the length of the flatbuffer.
532        if value.len() < 4 {
533            vortex_bail!("ArrayParts buffer is too short");
534        }
535
536        // We align each buffer individually, so we remove alignment requirements on the buffer.
537        let value = value.aligned(Alignment::none());
538
539        let fb_length = u32::try_from_le_bytes(&value.as_slice()[value.len() - 4..])? as usize;
540        if value.len() < 4 + fb_length {
541            vortex_bail!("ArrayParts buffer is too short for flatbuffer");
542        }
543
544        let fb_offset = value.len() - 4 - fb_length;
545        let fb_buffer = value.slice(fb_offset..fb_offset + fb_length);
546        let fb_buffer = FlatBuffer::align_from(fb_buffer);
547
548        let fb_array = root::<fba::Array>(fb_buffer.as_ref())?;
549        let fb_root = fb_array.root().vortex_expect("Array must have a root node");
550
551        let mut offset = 0;
552        let buffers: Arc<[_]> = fb_array
553            .buffers()
554            .unwrap_or_default()
555            .iter()
556            .map(|fb_buffer| {
557                // Skip padding
558                offset += fb_buffer.padding() as usize;
559
560                let buffer_len = fb_buffer.length() as usize;
561
562                // Extract a buffer and ensure it's aligned, copying if necessary
563                let buffer = value
564                    .slice(offset..(offset + buffer_len))
565                    .aligned(Alignment::from_exponent(fb_buffer.alignment_exponent()));
566
567                offset += buffer_len;
568                BufferHandle::Host(buffer)
569            })
570            .collect();
571
572        Ok(ArrayParts {
573            flatbuffer: fb_buffer.clone(),
574            flatbuffer_loc: fb_root._tab.loc(),
575            buffers,
576        })
577    }
578}
579
580impl TryFrom<BufferHandle> for ArrayParts {
581    type Error = VortexError;
582
583    fn try_from(value: BufferHandle) -> Result<Self, Self::Error> {
584        Self::try_from(value.try_to_bytes()?)
585    }
586}