Skip to main content

vortex_array/
serde.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::borrow::Cow;
5use std::fmt::Debug;
6use std::fmt::Formatter;
7use std::iter;
8use std::sync::Arc;
9
10use flatbuffers::FlatBufferBuilder;
11use flatbuffers::Follow;
12use flatbuffers::WIPOffset;
13use flatbuffers::root;
14use vortex_buffer::Alignment;
15use vortex_buffer::ByteBuffer;
16use vortex_error::VortexError;
17use vortex_error::VortexExpect;
18use vortex_error::VortexResult;
19use vortex_error::vortex_bail;
20use vortex_error::vortex_err;
21use vortex_error::vortex_panic;
22use vortex_flatbuffers::FlatBuffer;
23use vortex_flatbuffers::WriteFlatBuffer;
24use vortex_flatbuffers::array as fba;
25use vortex_flatbuffers::array::Compression;
26use vortex_session::VortexSession;
27use vortex_session::registry::ReadContext;
28use vortex_utils::aliases::hash_map::HashMap;
29
30use crate::ArrayContext;
31use crate::ArrayRef;
32use crate::ArrayVisitor;
33use crate::ArrayVisitorExt;
34use crate::DynArray;
35use crate::buffer::BufferHandle;
36use crate::dtype::DType;
37use crate::dtype::TryFromBytes;
38use crate::session::ArraySessionExt;
39use crate::stats::StatsSet;
40
/// Knobs controlling how an array is laid out when it is serialized into byte buffers.
///
/// The default (`offset == 0`, `include_padding == false`) produces an unpadded layout.
#[derive(Default, Debug)]
pub struct SerializeOptions {
    /// Byte position at which the serialized output will start inside an external stream or
    /// file. Padding is computed relative to this position so buffers can be read zero-copy.
    pub offset: usize,
    /// When `true`, zero-filled padding is inserted so that every buffer starts on its required
    /// alignment.
    pub include_padding: bool,
}
50
51impl dyn DynArray + '_ {
52    /// Serialize the array into a sequence of byte buffers that should be written contiguously.
53    /// This function returns a vec to avoid copying data buffers.
54    ///
55    /// Optionally, padding can be included to guarantee buffer alignment and ensure zero-copy
56    /// reads within the context of an external file or stream. In this case, the alignment of
57    /// the first byte buffer should be respected when writing the buffers to the stream or file.
58    ///
59    /// The format of this blob is a sequence of data buffers, possible with prefixed padding,
60    /// followed by a flatbuffer containing an [`fba::Array`] message, and ending with a
61    /// little-endian u32 describing the length of the flatbuffer message.
62    pub fn serialize(
63        &self,
64        ctx: &ArrayContext,
65        options: &SerializeOptions,
66    ) -> VortexResult<Vec<ByteBuffer>> {
67        // Collect all array buffers
68        let array_buffers = self
69            .depth_first_traversal()
70            .flat_map(|f| f.buffers())
71            .collect::<Vec<_>>();
72
73        // Allocate result buffers, including a possible padding buffer for each.
74        let mut buffers = vec![];
75        let mut fb_buffers = Vec::with_capacity(buffers.capacity());
76
77        // If we're including padding, we need to find the maximum required buffer alignment.
78        let max_alignment = array_buffers
79            .iter()
80            .map(|buf| buf.alignment())
81            .chain(iter::once(FlatBuffer::alignment()))
82            .max()
83            .unwrap_or_else(FlatBuffer::alignment);
84
85        // Create a shared buffer of zeros we can use for padding
86        let zeros = ByteBuffer::zeroed(*max_alignment);
87
88        // We push an empty buffer with the maximum alignment, so then subsequent buffers
89        // will be aligned. For subsequent buffers, we always push a 1-byte alignment.
90        buffers.push(ByteBuffer::zeroed_aligned(0, max_alignment));
91
92        // Keep track of where we are in the "file" to calculate padding.
93        let mut pos = options.offset;
94
95        // Push all the array buffers with padding as necessary.
96        for buffer in array_buffers {
97            let padding = if options.include_padding {
98                let padding = pos.next_multiple_of(*buffer.alignment()) - pos;
99                if padding > 0 {
100                    pos += padding;
101                    buffers.push(zeros.slice(0..padding));
102                }
103                padding
104            } else {
105                0
106            };
107
108            fb_buffers.push(fba::Buffer::new(
109                u16::try_from(padding).vortex_expect("padding fits into u16"),
110                buffer.alignment().exponent(),
111                Compression::None,
112                u32::try_from(buffer.len())
113                    .map_err(|_| vortex_err!("All buffers must fit into u32 for serialization"))?,
114            ));
115
116            pos += buffer.len();
117            buffers.push(buffer.aligned(Alignment::none()));
118        }
119
120        // Set up the flatbuffer builder
121        let mut fbb = FlatBufferBuilder::new();
122
123        let root = ArrayNodeFlatBuffer::try_new(ctx, self)?;
124        let fb_root = root.try_write_flatbuffer(&mut fbb)?;
125
126        let fb_buffers = fbb.create_vector(&fb_buffers);
127        let fb_array = fba::Array::create(
128            &mut fbb,
129            &fba::ArrayArgs {
130                root: Some(fb_root),
131                buffers: Some(fb_buffers),
132            },
133        );
134        fbb.finish_minimal(fb_array);
135        let (fb_vec, fb_start) = fbb.collapse();
136        let fb_end = fb_vec.len();
137        let fb_buffer = ByteBuffer::from(fb_vec).slice(fb_start..fb_end);
138        let fb_length = fb_buffer.len();
139
140        if options.include_padding {
141            let padding = pos.next_multiple_of(*FlatBuffer::alignment()) - pos;
142            if padding > 0 {
143                buffers.push(zeros.slice(0..padding));
144            }
145        }
146        buffers.push(fb_buffer);
147
148        // Finally, we write down the u32 length for the flatbuffer.
149        buffers.push(ByteBuffer::from(
150            u32::try_from(fb_length)
151                .map_err(|_| vortex_err!("Array metadata flatbuffer must fit into u32 for serialization. Array encoding tree is too large."))?
152                .to_le_bytes()
153                .to_vec(),
154        ));
155
156        Ok(buffers)
157    }
158}
159
160/// A utility struct for creating an [`fba::ArrayNode`] flatbuffer.
161pub struct ArrayNodeFlatBuffer<'a> {
162    ctx: &'a ArrayContext,
163    array: &'a dyn DynArray,
164    buffer_idx: u16,
165}
166
167impl<'a> ArrayNodeFlatBuffer<'a> {
168    pub fn try_new(ctx: &'a ArrayContext, array: &'a dyn DynArray) -> VortexResult<Self> {
169        // Depth-first traversal of the array to ensure it supports serialization.
170        for child in array.depth_first_traversal() {
171            if child.metadata()?.is_none() {
172                vortex_bail!(
173                    "Array {} does not support serialization",
174                    child.encoding_id()
175                );
176            }
177        }
178        let n_buffers_recursive = array.nbuffers_recursive();
179        if n_buffers_recursive > u16::MAX as usize {
180            vortex_bail!(
181                "Array and all descendent arrays can have at most u16::MAX buffers: {}",
182                n_buffers_recursive
183            );
184        };
185        Ok(Self {
186            ctx,
187            array,
188            buffer_idx: 0,
189        })
190    }
191
192    pub fn try_write_flatbuffer<'fb>(
193        &self,
194        fbb: &mut FlatBufferBuilder<'fb>,
195    ) -> VortexResult<WIPOffset<fba::ArrayNode<'fb>>> {
196        let encoding_idx = self
197            .ctx
198            .intern(&self.array.encoding_id())
199            // TODO(ngates): write_flatbuffer should return a result if this can fail.
200            .ok_or_else(|| {
201                vortex_err!(
202                    "Array encoding {} not permitted by ctx",
203                    self.array.encoding_id()
204                )
205            })?;
206
207        let metadata = self.array.metadata()?.ok_or_else(|| {
208            vortex_err!(
209                "Array {} does not support serialization",
210                self.array.encoding_id()
211            )
212        })?;
213        let metadata = Some(fbb.create_vector(metadata.as_slice()));
214
215        // Assign buffer indices for all child arrays.
216        let nbuffers = u16::try_from(self.array.nbuffers())
217            .map_err(|_| vortex_err!("Array can have at most u16::MAX buffers"))?;
218        let mut child_buffer_idx = self.buffer_idx + nbuffers;
219
220        let children = &self
221            .array
222            .children()
223            .iter()
224            .map(|child| {
225                // Update the number of buffers required.
226                let msg = ArrayNodeFlatBuffer {
227                    ctx: self.ctx,
228                    array: child,
229                    buffer_idx: child_buffer_idx,
230                }
231                .try_write_flatbuffer(fbb)?;
232
233                child_buffer_idx = u16::try_from(child.nbuffers_recursive())
234                    .ok()
235                    .and_then(|nbuffers| nbuffers.checked_add(child_buffer_idx))
236                    .ok_or_else(|| vortex_err!("Too many buffers (u16) for Array"))?;
237
238                Ok(msg)
239            })
240            .collect::<VortexResult<Vec<_>>>()?;
241        let children = Some(fbb.create_vector(children));
242
243        let buffers = Some(fbb.create_vector_from_iter((0..nbuffers).map(|i| i + self.buffer_idx)));
244        let stats = Some(self.array.statistics().write_flatbuffer(fbb)?);
245
246        Ok(fba::ArrayNode::create(
247            fbb,
248            &fba::ArrayNodeArgs {
249                encoding: encoding_idx,
250                metadata,
251                children,
252                buffers,
253                stats,
254            },
255        ))
256    }
257}
258
259/// To minimize the serialized form, arrays do not persist their own dtype and length. Instead,
260/// parent arrays pass this information down during deserialization.
261pub trait ArrayChildren {
262    /// Returns the nth child of the array with the given dtype and length.
263    fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef>;
264
265    /// The number of children.
266    fn len(&self) -> usize;
267
268    /// Returns true if there are no children.
269    fn is_empty(&self) -> bool {
270        self.len() == 0
271    }
272}
273
274impl ArrayChildren for &[ArrayRef] {
275    fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef> {
276        let array = self[index].clone();
277        assert_eq!(array.len(), len);
278        assert_eq!(array.dtype(), dtype);
279        Ok(array)
280    }
281
282    fn len(&self) -> usize {
283        <[_]>::len(self)
284    }
285}
286
287/// [`ArrayParts`] represents a parsed but not-yet-decoded deserialized [`DynArray`].
288/// It contains all the information from the serialized form, without anything extra. i.e.
289/// it is missing a [`DType`] and `len`, and the `encoding_id` is not yet resolved to a concrete
290/// vtable.
291///
292/// An [`ArrayParts`] can be fully decoded into an [`ArrayRef`] using the `decode` function.
293#[derive(Clone)]
294pub struct ArrayParts {
295    // Typed as fb::ArrayNode
296    flatbuffer: FlatBuffer,
297    // The location of the current fb::ArrayNode
298    flatbuffer_loc: usize,
299    buffers: Arc<[BufferHandle]>,
300}
301
302impl Debug for ArrayParts {
303    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
304        f.debug_struct("ArrayParts")
305            .field("encoding_id", &self.encoding_id())
306            .field("children", &(0..self.nchildren()).map(|i| self.child(i)))
307            .field(
308                "buffers",
309                &(0..self.nbuffers()).map(|i| self.buffer(i).ok()),
310            )
311            .field("metadata", &self.metadata())
312            .finish()
313    }
314}
315
316impl ArrayParts {
317    /// Decode an [`ArrayParts`] into an [`ArrayRef`].
318    pub fn decode(
319        &self,
320        dtype: &DType,
321        len: usize,
322        ctx: &ReadContext,
323        session: &VortexSession,
324    ) -> VortexResult<ArrayRef> {
325        let encoding_idx = self.flatbuffer().encoding();
326        let encoding_id = ctx
327            .resolve(encoding_idx)
328            .ok_or_else(|| vortex_err!("Unknown encoding index: {}", encoding_idx))?;
329        let vtable = session
330            .arrays()
331            .registry()
332            .find(&encoding_id)
333            .ok_or_else(|| vortex_err!("Unknown encoding: {}", encoding_id))?;
334
335        let children = ArrayPartsChildren {
336            parts: self,
337            ctx,
338            session,
339        };
340
341        let buffers = self.collect_buffers()?;
342
343        let decoded = vtable.build(
344            encoding_id.clone(),
345            dtype,
346            len,
347            self.metadata(),
348            &buffers,
349            &children,
350            session,
351        )?;
352
353        assert_eq!(
354            decoded.len(),
355            len,
356            "Array decoded from {} has incorrect length {}, expected {}",
357            encoding_id,
358            decoded.len(),
359            len
360        );
361        assert_eq!(
362            decoded.dtype(),
363            dtype,
364            "Array decoded from {} has incorrect dtype {}, expected {}",
365            encoding_id,
366            decoded.dtype(),
367            dtype,
368        );
369        assert_eq!(
370            decoded.encoding_id(),
371            encoding_id,
372            "Array decoded from {} has incorrect encoding {}",
373            encoding_id,
374            decoded.encoding_id(),
375        );
376
377        // Populate statistics from the serialized array.
378        if let Some(stats) = self.flatbuffer().stats() {
379            decoded
380                .statistics()
381                .set_iter(StatsSet::from_flatbuffer(&stats, dtype, session)?.into_iter());
382        }
383
384        Ok(decoded)
385    }
386
387    /// Returns the array encoding.
388    pub fn encoding_id(&self) -> u16 {
389        self.flatbuffer().encoding()
390    }
391
392    /// Returns the array metadata bytes.
393    pub fn metadata(&self) -> &[u8] {
394        self.flatbuffer()
395            .metadata()
396            .map(|metadata| metadata.bytes())
397            .unwrap_or(&[])
398    }
399
400    /// Returns the number of children.
401    pub fn nchildren(&self) -> usize {
402        self.flatbuffer()
403            .children()
404            .map_or(0, |children| children.len())
405    }
406
407    /// Returns the nth child of the array.
408    pub fn child(&self, idx: usize) -> ArrayParts {
409        let children = self
410            .flatbuffer()
411            .children()
412            .vortex_expect("Expected array to have children");
413        if idx >= children.len() {
414            vortex_panic!(
415                "Invalid child index {} for array with {} children",
416                idx,
417                children.len()
418            );
419        }
420        self.with_root(children.get(idx))
421    }
422
423    /// Returns the number of buffers.
424    pub fn nbuffers(&self) -> usize {
425        self.flatbuffer()
426            .buffers()
427            .map_or(0, |buffers| buffers.len())
428    }
429
430    /// Returns the nth buffer of the current array.
431    pub fn buffer(&self, idx: usize) -> VortexResult<BufferHandle> {
432        let buffer_idx = self
433            .flatbuffer()
434            .buffers()
435            .ok_or_else(|| vortex_err!("Array has no buffers"))?
436            .get(idx);
437        self.buffers
438            .get(buffer_idx as usize)
439            .cloned()
440            .ok_or_else(|| {
441                vortex_err!(
442                    "Invalid buffer index {} for array with {} buffers",
443                    buffer_idx,
444                    self.nbuffers()
445                )
446            })
447    }
448
449    /// Returns all buffers for the current array node.
450    ///
451    /// If buffer indices are contiguous, returns a zero-copy borrowed slice.
452    /// Otherwise falls back to collecting each buffer individually.
453    fn collect_buffers(&self) -> VortexResult<Cow<'_, [BufferHandle]>> {
454        let Some(fb_buffers) = self.flatbuffer().buffers() else {
455            return Ok(Cow::Borrowed(&[]));
456        };
457        let count = fb_buffers.len();
458        if count == 0 {
459            return Ok(Cow::Borrowed(&[]));
460        }
461        let start = fb_buffers.get(0) as usize;
462        let contiguous = fb_buffers
463            .iter()
464            .enumerate()
465            .all(|(i, idx)| idx as usize == start + i);
466        if contiguous {
467            self.buffers.get(start..start + count).map_or_else(
468                || {
469                    vortex_bail!(
470                        "buffer indices {}..{} out of range for {} buffers",
471                        start,
472                        start + count,
473                        self.buffers.len()
474                    )
475                },
476                |slice| Ok(Cow::Borrowed(slice)),
477            )
478        } else {
479            (0..count)
480                .map(|idx| self.buffer(idx))
481                .collect::<VortexResult<Vec<_>>>()
482                .map(Cow::Owned)
483        }
484    }
485
486    /// Returns the buffer lengths as stored in the flatbuffer metadata.
487    ///
488    /// This reads the buffer descriptors from the flatbuffer, which contain the
489    /// serialized length of each buffer. This is useful for displaying buffer sizes
490    /// without needing to access the actual buffer data.
491    pub fn buffer_lengths(&self) -> Vec<usize> {
492        let fb_array = root::<fba::Array>(self.flatbuffer.as_ref())
493            .vortex_expect("ArrayParts flatbuffer must be a valid Array");
494        fb_array
495            .buffers()
496            .map(|buffers| buffers.iter().map(|b| b.length() as usize).collect())
497            .unwrap_or_default()
498    }
499
500    /// Validate and align the array tree flatbuffer, returning the aligned buffer and root location.
501    fn validate_array_tree(array_tree: impl Into<ByteBuffer>) -> VortexResult<(FlatBuffer, usize)> {
502        let fb_buffer = FlatBuffer::align_from(array_tree.into());
503        let fb_array = root::<fba::Array>(fb_buffer.as_ref())?;
504        let fb_root = fb_array
505            .root()
506            .ok_or_else(|| vortex_err!("Array must have a root node"))?;
507        let flatbuffer_loc = fb_root._tab.loc();
508        Ok((fb_buffer, flatbuffer_loc))
509    }
510
511    /// Create an [`ArrayParts`] from a pre-existing array tree flatbuffer and pre-resolved buffer
512    /// handles.
513    ///
514    /// The caller is responsible for resolving buffers from whatever source (device segments, host
515    /// overrides, or a mix). The buffers must be in the same order as the `Array.buffers` descriptor
516    /// list in the flatbuffer.
517    pub fn from_flatbuffer_with_buffers(
518        array_tree: impl Into<ByteBuffer>,
519        buffers: Vec<BufferHandle>,
520    ) -> VortexResult<Self> {
521        let (flatbuffer, flatbuffer_loc) = Self::validate_array_tree(array_tree)?;
522        Ok(ArrayParts {
523            flatbuffer,
524            flatbuffer_loc,
525            buffers: buffers.into(),
526        })
527    }
528
529    /// Create an [`ArrayParts`] from a raw array tree flatbuffer (metadata only).
530    ///
531    /// This constructor creates an `ArrayParts` with no buffer data, useful for
532    /// inspecting the metadata when the actual buffer data is not needed
533    /// (e.g., displaying buffer sizes from inlined array tree metadata).
534    ///
535    /// Note: Calling `buffer()` on the returned `ArrayParts` will fail since
536    /// no actual buffer data is available.
537    pub fn from_array_tree(array_tree: impl Into<ByteBuffer>) -> VortexResult<Self> {
538        let (flatbuffer, flatbuffer_loc) = Self::validate_array_tree(array_tree)?;
539        Ok(ArrayParts {
540            flatbuffer,
541            flatbuffer_loc,
542            buffers: Arc::new([]),
543        })
544    }
545
546    /// Returns the root ArrayNode flatbuffer.
547    fn flatbuffer(&self) -> fba::ArrayNode<'_> {
548        unsafe { fba::ArrayNode::follow(self.flatbuffer.as_ref(), self.flatbuffer_loc) }
549    }
550
551    /// Returns a new [`ArrayParts`] with the given node as the root
552    // TODO(ngates): we may want a wrapper that avoids this clone.
553    fn with_root(&self, root: fba::ArrayNode) -> Self {
554        let mut this = self.clone();
555        this.flatbuffer_loc = root._tab.loc();
556        this
557    }
558
559    /// Create an [`ArrayParts`] from a pre-existing flatbuffer (ArrayNode) and a segment containing
560    /// only the data buffers (without the flatbuffer suffix).
561    ///
562    /// This is used when the flatbuffer is stored separately in layout metadata (e.g., when
563    /// `FLAT_LAYOUT_INLINE_ARRAY_NODE` is enabled).
564    pub fn from_flatbuffer_and_segment(
565        array_tree: ByteBuffer,
566        segment: BufferHandle,
567    ) -> VortexResult<Self> {
568        // HashMap::new doesn't allocate when empty, so this has no overhead
569        Self::from_flatbuffer_and_segment_with_overrides(array_tree, segment, &HashMap::new())
570    }
571
572    /// Create an [`ArrayParts`] from a pre-existing flatbuffer (ArrayNode) and a segment,
573    /// substituting host-resident buffer overrides for specific buffer indices.
574    ///
575    /// Buffers whose index appears in `buffer_overrides` are resolved from the provided
576    /// host data instead of the segment. All other buffers are sliced from the segment
577    /// using the padding and alignment described in the flatbuffer.
578    pub fn from_flatbuffer_and_segment_with_overrides(
579        array_tree: ByteBuffer,
580        segment: BufferHandle,
581        buffer_overrides: &HashMap<u32, ByteBuffer>,
582    ) -> VortexResult<Self> {
583        // We align each buffer individually, so we remove alignment requirements on the segment
584        // for host-resident buffers. Device buffers are sliced directly.
585        let segment = segment.ensure_aligned(Alignment::none())?;
586
587        // this can't return the validated array because there is no lifetime to give it, so we
588        // need to cast it below, which is safe.
589        let (fb_buffer, flatbuffer_loc) = Self::validate_array_tree(array_tree)?;
590        // SAFETY: fb_buffer was already validated by validate_array_tree above.
591        let fb_array = unsafe { fba::root_as_array_unchecked(fb_buffer.as_ref()) };
592
593        let mut offset = 0;
594        let buffers = fb_array
595            .buffers()
596            .unwrap_or_default()
597            .iter()
598            .enumerate()
599            .map(|(idx, fb_buf)| {
600                offset += fb_buf.padding() as usize;
601                let buffer_len = fb_buf.length() as usize;
602                let alignment = Alignment::from_exponent(fb_buf.alignment_exponent());
603
604                let idx = u32::try_from(idx).vortex_expect("buffer count must fit in u32");
605                let handle = if let Some(host_data) = buffer_overrides.get(&idx) {
606                    BufferHandle::new_host(host_data.clone()).ensure_aligned(alignment)?
607                } else {
608                    let buffer = segment.slice(offset..(offset + buffer_len));
609                    buffer.ensure_aligned(alignment)?
610                };
611
612                offset += buffer_len;
613                Ok(handle)
614            })
615            .collect::<VortexResult<Arc<[_]>>>()?;
616
617        Ok(ArrayParts {
618            flatbuffer: fb_buffer,
619            flatbuffer_loc,
620            buffers,
621        })
622    }
623}
624
625struct ArrayPartsChildren<'a> {
626    parts: &'a ArrayParts,
627    ctx: &'a ReadContext,
628    session: &'a VortexSession,
629}
630
631impl ArrayChildren for ArrayPartsChildren<'_> {
632    fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef> {
633        self.parts
634            .child(index)
635            .decode(dtype, len, self.ctx, self.session)
636    }
637
638    fn len(&self) -> usize {
639        self.parts.nchildren()
640    }
641}
642
643impl TryFrom<ByteBuffer> for ArrayParts {
644    type Error = VortexError;
645
646    fn try_from(value: ByteBuffer) -> Result<Self, Self::Error> {
647        // The final 4 bytes contain the length of the flatbuffer.
648        if value.len() < 4 {
649            vortex_bail!("ArrayParts buffer is too short");
650        }
651
652        // We align each buffer individually, so we remove alignment requirements on the buffer.
653        let value = value.aligned(Alignment::none());
654
655        let fb_length = u32::try_from_le_bytes(&value.as_slice()[value.len() - 4..])? as usize;
656        if value.len() < 4 + fb_length {
657            vortex_bail!("ArrayParts buffer is too short for flatbuffer");
658        }
659
660        let fb_offset = value.len() - 4 - fb_length;
661        let array_tree = value.slice(fb_offset..fb_offset + fb_length);
662        let segment = BufferHandle::new_host(value.slice(0..fb_offset));
663
664        Self::from_flatbuffer_and_segment(array_tree, segment)
665    }
666}
667
668impl TryFrom<BufferHandle> for ArrayParts {
669    type Error = VortexError;
670
671    fn try_from(value: BufferHandle) -> Result<Self, Self::Error> {
672        Self::try_from(value.try_to_host_sync()?)
673    }
674}