Skip to main content

vortex_array/
serde.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::borrow::Cow;
5use std::fmt::Debug;
6use std::fmt::Formatter;
7use std::iter;
8use std::sync::Arc;
9
10use flatbuffers::FlatBufferBuilder;
11use flatbuffers::Follow;
12use flatbuffers::WIPOffset;
13use flatbuffers::root;
14use vortex_buffer::Alignment;
15use vortex_buffer::ByteBuffer;
16use vortex_error::VortexError;
17use vortex_error::VortexExpect;
18use vortex_error::VortexResult;
19use vortex_error::vortex_bail;
20use vortex_error::vortex_err;
21use vortex_error::vortex_panic;
22use vortex_flatbuffers::FlatBuffer;
23use vortex_flatbuffers::WriteFlatBuffer;
24use vortex_flatbuffers::array as fba;
25use vortex_flatbuffers::array::Compression;
26use vortex_session::VortexSession;
27use vortex_utils::aliases::hash_map::HashMap;
28
29use crate::Array;
30use crate::ArrayContext;
31use crate::ArrayRef;
32use crate::ArrayVisitor;
33use crate::ArrayVisitorExt;
34use crate::buffer::BufferHandle;
35use crate::dtype::DType;
36use crate::dtype::TryFromBytes;
37use crate::session::ArraySessionExt;
38use crate::stats::StatsSet;
39
/// Configuration for array serialization.
///
/// The derived `Default` yields an offset of `0` with padding disabled.
#[derive(Default, Debug)]
pub struct SerializeOptions {
    /// Byte position at which the serialized output will begin inside an enclosing stream or
    /// file. Padding is computed relative to this position so that buffers can later be read
    /// without copying.
    pub offset: usize,
    /// When `true`, zero bytes are inserted before buffers as needed so that each one starts at
    /// a multiple of its required alignment.
    pub include_padding: bool,
}
49
50impl dyn Array + '_ {
51    /// Serialize the array into a sequence of byte buffers that should be written contiguously.
52    /// This function returns a vec to avoid copying data buffers.
53    ///
54    /// Optionally, padding can be included to guarantee buffer alignment and ensure zero-copy
55    /// reads within the context of an external file or stream. In this case, the alignment of
56    /// the first byte buffer should be respected when writing the buffers to the stream or file.
57    ///
58    /// The format of this blob is a sequence of data buffers, possible with prefixed padding,
59    /// followed by a flatbuffer containing an [`fba::Array`] message, and ending with a
60    /// little-endian u32 describing the length of the flatbuffer message.
61    pub fn serialize(
62        &self,
63        ctx: &ArrayContext,
64        options: &SerializeOptions,
65    ) -> VortexResult<Vec<ByteBuffer>> {
66        // Collect all array buffers
67        let array_buffers = self
68            .depth_first_traversal()
69            .flat_map(|f| f.buffers())
70            .collect::<Vec<_>>();
71
72        // Allocate result buffers, including a possible padding buffer for each.
73        let mut buffers = vec![];
74        let mut fb_buffers = Vec::with_capacity(buffers.capacity());
75
76        // If we're including padding, we need to find the maximum required buffer alignment.
77        let max_alignment = array_buffers
78            .iter()
79            .map(|buf| buf.alignment())
80            .chain(iter::once(FlatBuffer::alignment()))
81            .max()
82            .unwrap_or_else(FlatBuffer::alignment);
83
84        // Create a shared buffer of zeros we can use for padding
85        let zeros = ByteBuffer::zeroed(*max_alignment);
86
87        // We push an empty buffer with the maximum alignment, so then subsequent buffers
88        // will be aligned. For subsequent buffers, we always push a 1-byte alignment.
89        buffers.push(ByteBuffer::zeroed_aligned(0, max_alignment));
90
91        // Keep track of where we are in the "file" to calculate padding.
92        let mut pos = options.offset;
93
94        // Push all the array buffers with padding as necessary.
95        for buffer in array_buffers {
96            let padding = if options.include_padding {
97                let padding = pos.next_multiple_of(*buffer.alignment()) - pos;
98                if padding > 0 {
99                    pos += padding;
100                    buffers.push(zeros.slice(0..padding));
101                }
102                padding
103            } else {
104                0
105            };
106
107            fb_buffers.push(fba::Buffer::new(
108                u16::try_from(padding).vortex_expect("padding fits into u16"),
109                buffer.alignment().exponent(),
110                Compression::None,
111                u32::try_from(buffer.len())
112                    .map_err(|_| vortex_err!("All buffers must fit into u32 for serialization"))?,
113            ));
114
115            pos += buffer.len();
116            buffers.push(buffer.aligned(Alignment::none()));
117        }
118
119        // Set up the flatbuffer builder
120        let mut fbb = FlatBufferBuilder::new();
121
122        let root = ArrayNodeFlatBuffer::try_new(ctx, self)?;
123        let fb_root = root.try_write_flatbuffer(&mut fbb)?;
124
125        let fb_buffers = fbb.create_vector(&fb_buffers);
126        let fb_array = fba::Array::create(
127            &mut fbb,
128            &fba::ArrayArgs {
129                root: Some(fb_root),
130                buffers: Some(fb_buffers),
131            },
132        );
133        fbb.finish_minimal(fb_array);
134        let (fb_vec, fb_start) = fbb.collapse();
135        let fb_end = fb_vec.len();
136        let fb_buffer = ByteBuffer::from(fb_vec).slice(fb_start..fb_end);
137        let fb_length = fb_buffer.len();
138
139        if options.include_padding {
140            let padding = pos.next_multiple_of(*FlatBuffer::alignment()) - pos;
141            if padding > 0 {
142                buffers.push(zeros.slice(0..padding));
143            }
144        }
145        buffers.push(fb_buffer);
146
147        // Finally, we write down the u32 length for the flatbuffer.
148        buffers.push(ByteBuffer::from(
149            u32::try_from(fb_length)
150                .map_err(|_| vortex_err!("Array metadata flatbuffer must fit into u32 for serialization. Array encoding tree is too large."))?
151                .to_le_bytes()
152                .to_vec(),
153        ));
154
155        Ok(buffers)
156    }
157}
158
159/// A utility struct for creating an [`fba::ArrayNode`] flatbuffer.
160pub struct ArrayNodeFlatBuffer<'a> {
161    ctx: &'a ArrayContext,
162    array: &'a dyn Array,
163    buffer_idx: u16,
164}
165
166impl<'a> ArrayNodeFlatBuffer<'a> {
167    pub fn try_new(ctx: &'a ArrayContext, array: &'a dyn Array) -> VortexResult<Self> {
168        // Depth-first traversal of the array to ensure it supports serialization.
169        for child in array.depth_first_traversal() {
170            if child.metadata()?.is_none() {
171                vortex_bail!(
172                    "Array {} does not support serialization",
173                    child.encoding_id()
174                );
175            }
176        }
177        let n_buffers_recursive = array.nbuffers_recursive();
178        if n_buffers_recursive > u16::MAX as usize {
179            vortex_bail!(
180                "Array and all descendent arrays can have at most u16::MAX buffers: {}",
181                n_buffers_recursive
182            );
183        };
184        Ok(Self {
185            ctx,
186            array,
187            buffer_idx: 0,
188        })
189    }
190
191    pub fn try_write_flatbuffer<'fb>(
192        &self,
193        fbb: &mut FlatBufferBuilder<'fb>,
194    ) -> VortexResult<WIPOffset<fba::ArrayNode<'fb>>> {
195        let encoding_idx = self
196            .ctx
197            .intern(&self.array.encoding_id())
198            // TODO(ngates): write_flatbuffer should return a result if this can fail.
199            .ok_or_else(|| {
200                vortex_err!(
201                    "Array encoding {} not permitted by ctx",
202                    self.array.encoding_id()
203                )
204            })?;
205
206        let metadata = self.array.metadata()?.ok_or_else(|| {
207            vortex_err!(
208                "Array {} does not support serialization",
209                self.array.encoding_id()
210            )
211        })?;
212        let metadata = Some(fbb.create_vector(metadata.as_slice()));
213
214        // Assign buffer indices for all child arrays.
215        let nbuffers = u16::try_from(self.array.nbuffers())
216            .map_err(|_| vortex_err!("Array can have at most u16::MAX buffers"))?;
217        let mut child_buffer_idx = self.buffer_idx + nbuffers;
218
219        let children = &self
220            .array
221            .children()
222            .iter()
223            .map(|child| {
224                // Update the number of buffers required.
225                let msg = ArrayNodeFlatBuffer {
226                    ctx: self.ctx,
227                    array: child,
228                    buffer_idx: child_buffer_idx,
229                }
230                .try_write_flatbuffer(fbb)?;
231
232                child_buffer_idx = u16::try_from(child.nbuffers_recursive())
233                    .ok()
234                    .and_then(|nbuffers| nbuffers.checked_add(child_buffer_idx))
235                    .ok_or_else(|| vortex_err!("Too many buffers (u16) for Array"))?;
236
237                Ok(msg)
238            })
239            .collect::<VortexResult<Vec<_>>>()?;
240        let children = Some(fbb.create_vector(children));
241
242        let buffers = Some(fbb.create_vector_from_iter((0..nbuffers).map(|i| i + self.buffer_idx)));
243        let stats = Some(self.array.statistics().write_flatbuffer(fbb)?);
244
245        Ok(fba::ArrayNode::create(
246            fbb,
247            &fba::ArrayNodeArgs {
248                encoding: encoding_idx,
249                metadata,
250                children,
251                buffers,
252                stats,
253            },
254        ))
255    }
256}
257
258/// To minimize the serialized form, arrays do not persist their own dtype and length. Instead,
259/// parent arrays pass this information down during deserialization.
260pub trait ArrayChildren {
261    /// Returns the nth child of the array with the given dtype and length.
262    fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef>;
263
264    /// The number of children.
265    fn len(&self) -> usize;
266
267    /// Returns true if there are no children.
268    fn is_empty(&self) -> bool {
269        self.len() == 0
270    }
271}
272
273impl ArrayChildren for &[ArrayRef] {
274    fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef> {
275        let array = self[index].clone();
276        assert_eq!(array.len(), len);
277        assert_eq!(array.dtype(), dtype);
278        Ok(array)
279    }
280
281    fn len(&self) -> usize {
282        <[_]>::len(self)
283    }
284}
285
286/// [`ArrayParts`] represents a parsed but not-yet-decoded deserialized [`Array`].
287/// It contains all the information from the serialized form, without anything extra. i.e.
288/// it is missing a [`DType`] and `len`, and the `encoding_id` is not yet resolved to a concrete
289/// vtable.
290///
291/// An [`ArrayParts`] can be fully decoded into an [`ArrayRef`] using the `decode` function.
292#[derive(Clone)]
293pub struct ArrayParts {
294    // Typed as fb::ArrayNode
295    flatbuffer: FlatBuffer,
296    // The location of the current fb::ArrayNode
297    flatbuffer_loc: usize,
298    buffers: Arc<[BufferHandle]>,
299}
300
301impl Debug for ArrayParts {
302    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
303        f.debug_struct("ArrayParts")
304            .field("encoding_id", &self.encoding_id())
305            .field("children", &(0..self.nchildren()).map(|i| self.child(i)))
306            .field(
307                "buffers",
308                &(0..self.nbuffers()).map(|i| self.buffer(i).ok()),
309            )
310            .field("metadata", &self.metadata())
311            .finish()
312    }
313}
314
315impl ArrayParts {
316    /// Decode an [`ArrayParts`] into an [`ArrayRef`].
317    pub fn decode(
318        &self,
319        dtype: &DType,
320        len: usize,
321        ctx: &ArrayContext,
322        session: &VortexSession,
323    ) -> VortexResult<ArrayRef> {
324        let encoding_idx = self.flatbuffer().encoding();
325        let encoding_id = ctx
326            .resolve(encoding_idx)
327            .ok_or_else(|| vortex_err!("Unknown encoding index: {}", encoding_idx))?;
328        let vtable = session
329            .arrays()
330            .registry()
331            .find(&encoding_id)
332            .ok_or_else(|| vortex_err!("Unknown encoding: {}", encoding_id))?;
333
334        let children = ArrayPartsChildren {
335            parts: self,
336            ctx,
337            session,
338        };
339
340        let buffers = self.collect_buffers()?;
341
342        let decoded = vtable.build(
343            encoding_id.clone(),
344            dtype,
345            len,
346            self.metadata(),
347            &buffers,
348            &children,
349            session,
350        )?;
351
352        assert_eq!(
353            decoded.len(),
354            len,
355            "Array decoded from {} has incorrect length {}, expected {}",
356            encoding_id,
357            decoded.len(),
358            len
359        );
360        assert_eq!(
361            decoded.dtype(),
362            dtype,
363            "Array decoded from {} has incorrect dtype {}, expected {}",
364            encoding_id,
365            decoded.dtype(),
366            dtype,
367        );
368        assert_eq!(
369            decoded.encoding_id(),
370            encoding_id,
371            "Array decoded from {} has incorrect encoding {}",
372            encoding_id,
373            decoded.encoding_id(),
374        );
375
376        // Populate statistics from the serialized array.
377        if let Some(stats) = self.flatbuffer().stats() {
378            decoded
379                .statistics()
380                .set_iter(StatsSet::from_flatbuffer(&stats, dtype)?.into_iter());
381        }
382
383        Ok(decoded)
384    }
385
386    /// Returns the array encoding.
387    pub fn encoding_id(&self) -> u16 {
388        self.flatbuffer().encoding()
389    }
390
391    /// Returns the array metadata bytes.
392    pub fn metadata(&self) -> &[u8] {
393        self.flatbuffer()
394            .metadata()
395            .map(|metadata| metadata.bytes())
396            .unwrap_or(&[])
397    }
398
399    /// Returns the number of children.
400    pub fn nchildren(&self) -> usize {
401        self.flatbuffer()
402            .children()
403            .map_or(0, |children| children.len())
404    }
405
406    /// Returns the nth child of the array.
407    pub fn child(&self, idx: usize) -> ArrayParts {
408        let children = self
409            .flatbuffer()
410            .children()
411            .vortex_expect("Expected array to have children");
412        if idx >= children.len() {
413            vortex_panic!(
414                "Invalid child index {} for array with {} children",
415                idx,
416                children.len()
417            );
418        }
419        self.with_root(children.get(idx))
420    }
421
422    /// Returns the number of buffers.
423    pub fn nbuffers(&self) -> usize {
424        self.flatbuffer()
425            .buffers()
426            .map_or(0, |buffers| buffers.len())
427    }
428
429    /// Returns the nth buffer of the current array.
430    pub fn buffer(&self, idx: usize) -> VortexResult<BufferHandle> {
431        let buffer_idx = self
432            .flatbuffer()
433            .buffers()
434            .ok_or_else(|| vortex_err!("Array has no buffers"))?
435            .get(idx);
436        self.buffers
437            .get(buffer_idx as usize)
438            .cloned()
439            .ok_or_else(|| {
440                vortex_err!(
441                    "Invalid buffer index {} for array with {} buffers",
442                    buffer_idx,
443                    self.nbuffers()
444                )
445            })
446    }
447
448    /// Returns all buffers for the current array node.
449    ///
450    /// If buffer indices are contiguous, returns a zero-copy borrowed slice.
451    /// Otherwise falls back to collecting each buffer individually.
452    fn collect_buffers(&self) -> VortexResult<Cow<'_, [BufferHandle]>> {
453        let Some(fb_buffers) = self.flatbuffer().buffers() else {
454            return Ok(Cow::Borrowed(&[]));
455        };
456        let count = fb_buffers.len();
457        if count == 0 {
458            return Ok(Cow::Borrowed(&[]));
459        }
460        let start = fb_buffers.get(0) as usize;
461        let contiguous = fb_buffers
462            .iter()
463            .enumerate()
464            .all(|(i, idx)| idx as usize == start + i);
465        if contiguous {
466            self.buffers.get(start..start + count).map_or_else(
467                || {
468                    vortex_bail!(
469                        "buffer indices {}..{} out of range for {} buffers",
470                        start,
471                        start + count,
472                        self.buffers.len()
473                    )
474                },
475                |slice| Ok(Cow::Borrowed(slice)),
476            )
477        } else {
478            (0..count)
479                .map(|idx| self.buffer(idx))
480                .collect::<VortexResult<Vec<_>>>()
481                .map(Cow::Owned)
482        }
483    }
484
485    /// Returns the buffer lengths as stored in the flatbuffer metadata.
486    ///
487    /// This reads the buffer descriptors from the flatbuffer, which contain the
488    /// serialized length of each buffer. This is useful for displaying buffer sizes
489    /// without needing to access the actual buffer data.
490    pub fn buffer_lengths(&self) -> Vec<usize> {
491        let fb_array = root::<fba::Array>(self.flatbuffer.as_ref())
492            .vortex_expect("ArrayParts flatbuffer must be a valid Array");
493        fb_array
494            .buffers()
495            .map(|buffers| buffers.iter().map(|b| b.length() as usize).collect())
496            .unwrap_or_default()
497    }
498
499    /// Validate and align the array tree flatbuffer, returning the aligned buffer and root location.
500    fn validate_array_tree(array_tree: impl Into<ByteBuffer>) -> VortexResult<(FlatBuffer, usize)> {
501        let fb_buffer = FlatBuffer::align_from(array_tree.into());
502        let fb_array = root::<fba::Array>(fb_buffer.as_ref())?;
503        let fb_root = fb_array
504            .root()
505            .ok_or_else(|| vortex_err!("Array must have a root node"))?;
506        let flatbuffer_loc = fb_root._tab.loc();
507        Ok((fb_buffer, flatbuffer_loc))
508    }
509
510    /// Create an [`ArrayParts`] from a pre-existing array tree flatbuffer and pre-resolved buffer
511    /// handles.
512    ///
513    /// The caller is responsible for resolving buffers from whatever source (device segments, host
514    /// overrides, or a mix). The buffers must be in the same order as the `Array.buffers` descriptor
515    /// list in the flatbuffer.
516    pub fn from_flatbuffer_with_buffers(
517        array_tree: impl Into<ByteBuffer>,
518        buffers: Vec<BufferHandle>,
519    ) -> VortexResult<Self> {
520        let (flatbuffer, flatbuffer_loc) = Self::validate_array_tree(array_tree)?;
521        Ok(ArrayParts {
522            flatbuffer,
523            flatbuffer_loc,
524            buffers: buffers.into(),
525        })
526    }
527
528    /// Create an [`ArrayParts`] from a raw array tree flatbuffer (metadata only).
529    ///
530    /// This constructor creates an `ArrayParts` with no buffer data, useful for
531    /// inspecting the metadata when the actual buffer data is not needed
532    /// (e.g., displaying buffer sizes from inlined array tree metadata).
533    ///
534    /// Note: Calling `buffer()` on the returned `ArrayParts` will fail since
535    /// no actual buffer data is available.
536    pub fn from_array_tree(array_tree: impl Into<ByteBuffer>) -> VortexResult<Self> {
537        let (flatbuffer, flatbuffer_loc) = Self::validate_array_tree(array_tree)?;
538        Ok(ArrayParts {
539            flatbuffer,
540            flatbuffer_loc,
541            buffers: Arc::new([]),
542        })
543    }
544
545    /// Returns the root ArrayNode flatbuffer.
546    fn flatbuffer(&self) -> fba::ArrayNode<'_> {
547        unsafe { fba::ArrayNode::follow(self.flatbuffer.as_ref(), self.flatbuffer_loc) }
548    }
549
550    /// Returns a new [`ArrayParts`] with the given node as the root
551    // TODO(ngates): we may want a wrapper that avoids this clone.
552    fn with_root(&self, root: fba::ArrayNode) -> Self {
553        let mut this = self.clone();
554        this.flatbuffer_loc = root._tab.loc();
555        this
556    }
557
558    /// Create an [`ArrayParts`] from a pre-existing flatbuffer (ArrayNode) and a segment containing
559    /// only the data buffers (without the flatbuffer suffix).
560    ///
561    /// This is used when the flatbuffer is stored separately in layout metadata (e.g., when
562    /// `FLAT_LAYOUT_INLINE_ARRAY_NODE` is enabled).
563    pub fn from_flatbuffer_and_segment(
564        array_tree: ByteBuffer,
565        segment: BufferHandle,
566    ) -> VortexResult<Self> {
567        // HashMap::new doesn't allocate when empty, so this has no overhead
568        Self::from_flatbuffer_and_segment_with_overrides(array_tree, segment, &HashMap::new())
569    }
570
571    /// Create an [`ArrayParts`] from a pre-existing flatbuffer (ArrayNode) and a segment,
572    /// substituting host-resident buffer overrides for specific buffer indices.
573    ///
574    /// Buffers whose index appears in `buffer_overrides` are resolved from the provided
575    /// host data instead of the segment. All other buffers are sliced from the segment
576    /// using the padding and alignment described in the flatbuffer.
577    pub fn from_flatbuffer_and_segment_with_overrides(
578        array_tree: ByteBuffer,
579        segment: BufferHandle,
580        buffer_overrides: &HashMap<u32, ByteBuffer>,
581    ) -> VortexResult<Self> {
582        // We align each buffer individually, so we remove alignment requirements on the segment
583        // for host-resident buffers. Device buffers are sliced directly.
584        let segment = segment.ensure_aligned(Alignment::none())?;
585
586        // this can't return the validated array because there is no lifetime to give it, so we
587        // need to cast it below, which is safe.
588        let (fb_buffer, flatbuffer_loc) = Self::validate_array_tree(array_tree)?;
589        // SAFETY: fb_buffer was already validated by validate_array_tree above.
590        let fb_array = unsafe { fba::root_as_array_unchecked(fb_buffer.as_ref()) };
591
592        let mut offset = 0;
593        let buffers = fb_array
594            .buffers()
595            .unwrap_or_default()
596            .iter()
597            .enumerate()
598            .map(|(idx, fb_buf)| {
599                offset += fb_buf.padding() as usize;
600                let buffer_len = fb_buf.length() as usize;
601                let alignment = Alignment::from_exponent(fb_buf.alignment_exponent());
602
603                let idx = u32::try_from(idx).vortex_expect("buffer count must fit in u32");
604                let handle = if let Some(host_data) = buffer_overrides.get(&idx) {
605                    BufferHandle::new_host(host_data.clone()).ensure_aligned(alignment)?
606                } else {
607                    let buffer = segment.slice(offset..(offset + buffer_len));
608                    buffer.ensure_aligned(alignment)?
609                };
610
611                offset += buffer_len;
612                Ok(handle)
613            })
614            .collect::<VortexResult<Arc<[_]>>>()?;
615
616        Ok(ArrayParts {
617            flatbuffer: fb_buffer,
618            flatbuffer_loc,
619            buffers,
620        })
621    }
622}
623
624struct ArrayPartsChildren<'a> {
625    parts: &'a ArrayParts,
626    ctx: &'a ArrayContext,
627    session: &'a VortexSession,
628}
629
630impl ArrayChildren for ArrayPartsChildren<'_> {
631    fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef> {
632        self.parts
633            .child(index)
634            .decode(dtype, len, self.ctx, self.session)
635    }
636
637    fn len(&self) -> usize {
638        self.parts.nchildren()
639    }
640}
641
642impl TryFrom<ByteBuffer> for ArrayParts {
643    type Error = VortexError;
644
645    fn try_from(value: ByteBuffer) -> Result<Self, Self::Error> {
646        // The final 4 bytes contain the length of the flatbuffer.
647        if value.len() < 4 {
648            vortex_bail!("ArrayParts buffer is too short");
649        }
650
651        // We align each buffer individually, so we remove alignment requirements on the buffer.
652        let value = value.aligned(Alignment::none());
653
654        let fb_length = u32::try_from_le_bytes(&value.as_slice()[value.len() - 4..])? as usize;
655        if value.len() < 4 + fb_length {
656            vortex_bail!("ArrayParts buffer is too short for flatbuffer");
657        }
658
659        let fb_offset = value.len() - 4 - fb_length;
660        let array_tree = value.slice(fb_offset..fb_offset + fb_length);
661        let segment = BufferHandle::new_host(value.slice(0..fb_offset));
662
663        Self::from_flatbuffer_and_segment(array_tree, segment)
664    }
665}
666
667impl TryFrom<BufferHandle> for ArrayParts {
668    type Error = VortexError;
669
670    fn try_from(value: BufferHandle) -> Result<Self, Self::Error> {
671        Self::try_from(value.try_to_host_sync()?)
672    }
673}