vortex_array/serde.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::fmt::Debug;
use std::fmt::Formatter;
use std::iter;
use std::sync::Arc;

use flatbuffers::FlatBufferBuilder;
use flatbuffers::Follow;
use flatbuffers::WIPOffset;
use flatbuffers::root;
use itertools::Itertools;
use vortex_buffer::Alignment;
use vortex_buffer::ByteBuffer;
use vortex_dtype::DType;
use vortex_dtype::TryFromBytes;
use vortex_error::VortexError;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_error::vortex_err;
use vortex_error::vortex_panic;
use vortex_flatbuffers::FlatBuffer;
use vortex_flatbuffers::WriteFlatBuffer;
use vortex_flatbuffers::array as fba;
use vortex_flatbuffers::array::Compression;
use vortex_session::VortexSession;

use crate::Array;
use crate::ArrayContext;
use crate::ArrayRef;
use crate::ArrayVisitor;
use crate::ArrayVisitorExt;
use crate::buffer::BufferHandle;
use crate::session::ArraySessionExt;
use crate::stats::StatsSet;

/// Options for serializing an array.
#[derive(Default, Debug)]
pub struct SerializeOptions {
    /// The starting position within an external stream or file. This offset is used to compute
    /// appropriate padding to enable zero-copy reads.
    pub offset: usize,
    /// Whether to include sufficient zero-copy padding.
    pub include_padding: bool,
}

impl dyn Array + '_ {
    /// Serialize the array into a sequence of byte buffers that should be written contiguously.
    /// This function returns a vec to avoid copying data buffers.
    ///
    /// Optionally, padding can be included to guarantee buffer alignment and ensure zero-copy
    /// reads within the context of an external file or stream. In this case, the alignment of
    /// the first byte buffer should be respected when writing the buffers to the stream or file.
    ///
    /// The format of this blob is a sequence of data buffers, each possibly prefixed with
    /// padding, followed by a flatbuffer containing an [`fba::Array`] message, and ending with a
    /// little-endian u32 describing the length of the flatbuffer message.
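    ///
    /// A minimal sketch of writing the serialized buffers to a file; `array`, `ctx`, and `file`
    /// are assumed to exist in the caller's context:
    ///
    /// ```ignore
    /// let buffers = array.serialize(&ctx, &SerializeOptions {
    ///     offset: file.stream_position()? as usize,
    ///     include_padding: true,
    /// })?;
    /// for buffer in &buffers {
    ///     file.write_all(buffer.as_slice())?;
    /// }
    /// ```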
    pub fn serialize(
        &self,
        ctx: &ArrayContext,
        options: &SerializeOptions,
    ) -> VortexResult<Vec<ByteBuffer>> {
        // Collect all array buffers
        let array_buffers = self
            .depth_first_traversal()
            .flat_map(|f| f.buffers())
            .collect::<Vec<_>>();

        // Allocate result buffers, including a possible padding buffer for each.
        let mut buffers = vec![];
        let mut fb_buffers = Vec::with_capacity(array_buffers.len());

        // If we're including padding, we need to find the maximum required buffer alignment.
        let max_alignment = array_buffers
            .iter()
            .map(|buf| buf.alignment())
            .chain(iter::once(FlatBuffer::alignment()))
            .max()
            .unwrap_or_else(FlatBuffer::alignment);

        // Create a shared buffer of zeros we can use for padding
        let zeros = ByteBuffer::zeroed(*max_alignment);

        // We push an empty buffer with the maximum alignment so that, provided the caller
        // respects the alignment of this first buffer, every subsequent buffer lands at its
        // required alignment. All subsequent buffers are pushed with a 1-byte alignment.
        buffers.push(ByteBuffer::zeroed_aligned(0, max_alignment));

        // Keep track of where we are in the "file" to calculate padding.
        let mut pos = options.offset;

        // Push all the array buffers with padding as necessary.
        for buffer in array_buffers {
            let padding = if options.include_padding {
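                // Round `pos` up to the buffer's alignment; the difference is the padding we
                // must emit first (e.g. pos = 13 with an 8-byte alignment rounds up to 16,
                // so padding = 3).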
                let padding = pos.next_multiple_of(*buffer.alignment()) - pos;
                if padding > 0 {
                    pos += padding;
                    buffers.push(zeros.slice(0..padding));
                }
                padding
            } else {
                0
            };

            fb_buffers.push(fba::Buffer::new(
                u16::try_from(padding).vortex_expect("padding fits into u16"),
                buffer.alignment().exponent(),
                Compression::None,
                u32::try_from(buffer.len())
                    .map_err(|_| vortex_err!("All buffers must fit into u32 for serialization"))?,
            ));

            pos += buffer.len();
            buffers.push(buffer.aligned(Alignment::none()));
        }

        // Set up the flatbuffer builder
        let mut fbb = FlatBufferBuilder::new();

        let root = ArrayNodeFlatBuffer::try_new(ctx, self)?;
        let fb_root = root.try_write_flatbuffer(&mut fbb)?;

        let fb_buffers = fbb.create_vector(&fb_buffers);
        let fb_array = fba::Array::create(
            &mut fbb,
            &fba::ArrayArgs {
                root: Some(fb_root),
                buffers: Some(fb_buffers),
            },
        );
        fbb.finish_minimal(fb_array);
        let (fb_vec, fb_start) = fbb.collapse();
        let fb_end = fb_vec.len();
        let fb_buffer = ByteBuffer::from(fb_vec).slice(fb_start..fb_end);
        let fb_length = fb_buffer.len();

        if options.include_padding {
            let padding = pos.next_multiple_of(*FlatBuffer::alignment()) - pos;
            if padding > 0 {
                buffers.push(zeros.slice(0..padding));
            }
        }
        buffers.push(fb_buffer);

        // Finally, we write down the u32 length for the flatbuffer.
        buffers.push(ByteBuffer::from(
            u32::try_from(fb_length)
                .map_err(|_| vortex_err!("Array metadata flatbuffer must fit into u32 for serialization. Array encoding tree is too large."))?
                .to_le_bytes()
                .to_vec(),
        ));

        Ok(buffers)
    }
}

/// A utility struct for creating an [`fba::ArrayNode`] flatbuffer.
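///
/// A minimal sketch of building the flatbuffer for an array, mirroring what `serialize` does
/// internally (`ctx` and `array` are assumed to exist in the caller's context):
///
/// ```ignore
/// let mut fbb = FlatBufferBuilder::new();
/// let root = ArrayNodeFlatBuffer::try_new(&ctx, array)?.try_write_flatbuffer(&mut fbb)?;
/// ```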
pub struct ArrayNodeFlatBuffer<'a> {
    ctx: &'a ArrayContext,
    array: &'a dyn Array,
    buffer_idx: u16,
}

impl<'a> ArrayNodeFlatBuffer<'a> {
    pub fn try_new(ctx: &'a ArrayContext, array: &'a dyn Array) -> VortexResult<Self> {
        // Depth-first traversal of the array to ensure it supports serialization.
        for child in array.depth_first_traversal() {
            if child.metadata()?.is_none() {
                vortex_bail!(
                    "Array {} does not support serialization",
                    child.encoding_id()
                );
            }
        }
        let n_buffers_recursive = array.nbuffers_recursive();
        if n_buffers_recursive > u16::MAX as usize {
            vortex_bail!(
179                "Array and all descendent arrays can have at most u16::MAX buffers: {}",
                n_buffers_recursive
            );
        };
        Ok(Self {
            ctx,
            array,
            buffer_idx: 0,
        })
    }

    pub fn try_write_flatbuffer<'fb>(
        &self,
        fbb: &mut FlatBufferBuilder<'fb>,
    ) -> VortexResult<WIPOffset<fba::ArrayNode<'fb>>> {
        let encoding_idx = self
            .ctx
            .intern(&self.array.encoding_id())
            // TODO(ngates): write_flatbuffer should return a result if this can fail.
            .ok_or_else(|| {
                vortex_err!(
                    "Array encoding {} not permitted by ctx",
                    self.array.encoding_id()
                )
            })?;

        let metadata = self.array.metadata()?.ok_or_else(|| {
            vortex_err!(
                "Array {} does not support serialization",
                self.array.encoding_id()
            )
        })?;
        let metadata = Some(fbb.create_vector(metadata.as_slice()));

        // Assign buffer indices for all child arrays.
        let nbuffers = u16::try_from(self.array.nbuffers())
            .map_err(|_| vortex_err!("Array can have at most u16::MAX buffers"))?;
        let mut child_buffer_idx = self.buffer_idx + nbuffers;

        let children = &self
            .array
            .children()
            .iter()
            .map(|child| {
                // Update the number of buffers required.
                let msg = ArrayNodeFlatBuffer {
                    ctx: self.ctx,
                    array: child,
                    buffer_idx: child_buffer_idx,
                }
                .try_write_flatbuffer(fbb)?;

                child_buffer_idx = u16::try_from(child.nbuffers_recursive())
                    .ok()
                    .and_then(|nbuffers| nbuffers.checked_add(child_buffer_idx))
                    .ok_or_else(|| vortex_err!("Too many buffers (u16) for Array"))?;

                Ok(msg)
            })
            .collect::<VortexResult<Vec<_>>>()?;
        let children = Some(fbb.create_vector(children));

        let buffers = Some(fbb.create_vector_from_iter((0..nbuffers).map(|i| i + self.buffer_idx)));
        let stats = Some(self.array.statistics().write_flatbuffer(fbb)?);

        Ok(fba::ArrayNode::create(
            fbb,
            &fba::ArrayNodeArgs {
                encoding: encoding_idx,
                metadata,
                children,
                buffers,
                stats,
            },
        ))
    }
}

/// To keep the serialized form compact, arrays do not persist their own dtype and length.
/// Instead, parent arrays pass this information down during deserialization.
pub trait ArrayChildren {
    /// Returns the nth child of the array with the given dtype and length.
    fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef>;

    /// The number of children.
    fn len(&self) -> usize;

    /// Returns true if there are no children.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
}

impl ArrayChildren for &[ArrayRef] {
    fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef> {
        let array = self[index].clone();
        assert_eq!(array.len(), len);
        assert_eq!(array.dtype(), dtype);
        Ok(array)
    }

    fn len(&self) -> usize {
        <[_]>::len(self)
    }
}

/// [`ArrayParts`] represents a parsed but not-yet-decoded [`Array`].
/// It contains all the information from the serialized form, and nothing extra; i.e. it is
/// missing a [`DType`] and `len`, and the `encoding_id` is not yet resolved to a concrete
/// vtable.
///
/// An [`ArrayParts`] can be fully decoded into an [`ArrayRef`] using the `decode` function.
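///
/// A minimal round-trip sketch, assuming `array`, `ctx`, and `session` are available in the
/// caller's context:
///
/// ```ignore
/// let mut bytes = Vec::new();
/// for buffer in array.serialize(&ctx, &SerializeOptions::default())? {
///     bytes.extend_from_slice(buffer.as_slice());
/// }
/// let parts = ArrayParts::try_from(ByteBuffer::from(bytes))?;
/// let decoded = parts.decode(array.dtype(), array.len(), &ctx, &session)?;
/// ```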
#[derive(Clone)]
pub struct ArrayParts {
    // Typed as fba::ArrayNode
    flatbuffer: FlatBuffer,
    // The location of the current fba::ArrayNode
    flatbuffer_loc: usize,
    buffers: Arc<[BufferHandle]>,
}

impl Debug for ArrayParts {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ArrayParts")
            .field("encoding_id", &self.encoding_id())
            .field("children", &(0..self.nchildren()).map(|i| self.child(i)))
            .field(
                "buffers",
                &(0..self.nbuffers()).map(|i| self.buffer(i).ok()),
            )
            .field("metadata", &self.metadata())
            .finish()
    }
}

impl ArrayParts {
    /// Decode an [`ArrayParts`] into an [`ArrayRef`].
    pub fn decode(
        &self,
        dtype: &DType,
        len: usize,
        ctx: &ArrayContext,
        session: &VortexSession,
    ) -> VortexResult<ArrayRef> {
        let encoding_idx = self.flatbuffer().encoding();
        let encoding_id = ctx
            .resolve(encoding_idx)
            .ok_or_else(|| vortex_err!("Unknown encoding index: {}", encoding_idx))?;
        let vtable = session
            .arrays()
            .registry()
            .find(&encoding_id)
            .ok_or_else(|| vortex_err!("Unknown encoding: {}", encoding_id))?;

        let buffers: Vec<_> = (0..self.nbuffers())
            .map(|idx| self.buffer(idx))
            .try_collect()?;

        let children = ArrayPartsChildren {
            parts: self,
            ctx,
            session,
        };

        let decoded = vtable.build(
            encoding_id.clone(),
            dtype,
            len,
            self.metadata(),
            &buffers,
            &children,
            session,
        )?;

        assert_eq!(
            decoded.len(),
            len,
            "Array decoded from {} has incorrect length {}, expected {}",
            encoding_id,
            decoded.len(),
            len
        );
        assert_eq!(
            decoded.dtype(),
            dtype,
            "Array decoded from {} has incorrect dtype {}, expected {}",
            encoding_id,
            decoded.dtype(),
            dtype,
        );
        assert_eq!(
            decoded.encoding_id(),
            encoding_id,
            "Array decoded from {} has incorrect encoding {}",
            encoding_id,
            decoded.encoding_id(),
        );

        // Populate statistics from the serialized array.
        if let Some(stats) = self.flatbuffer().stats() {
            let decoded_statistics = decoded.statistics();
            StatsSet::from_flatbuffer(&stats, dtype)?
                .into_iter()
                .for_each(|(stat, val)| decoded_statistics.set(stat, val));
        }

        Ok(decoded)
    }

    /// Returns the encoding index of the array.
    pub fn encoding_id(&self) -> u16 {
        self.flatbuffer().encoding()
    }

    /// Returns the array metadata bytes.
    pub fn metadata(&self) -> &[u8] {
        self.flatbuffer()
            .metadata()
            .map(|metadata| metadata.bytes())
            .unwrap_or(&[])
    }

    /// Returns the number of children.
    pub fn nchildren(&self) -> usize {
        self.flatbuffer()
            .children()
            .map_or(0, |children| children.len())
    }

    /// Returns the nth child of the array.
    pub fn child(&self, idx: usize) -> ArrayParts {
        let children = self
            .flatbuffer()
            .children()
            .vortex_expect("Expected array to have children");
        if idx >= children.len() {
            vortex_panic!(
                "Invalid child index {} for array with {} children",
                idx,
                children.len()
            );
        }
        self.with_root(children.get(idx))
    }

    /// Returns the number of buffers.
    pub fn nbuffers(&self) -> usize {
        self.flatbuffer()
            .buffers()
            .map_or(0, |buffers| buffers.len())
    }

    /// Returns the nth buffer of the current array.
    pub fn buffer(&self, idx: usize) -> VortexResult<BufferHandle> {
        let buffer_idx = self
            .flatbuffer()
            .buffers()
            .ok_or_else(|| vortex_err!("Array has no buffers"))?
            .get(idx);
        self.buffers
            .get(buffer_idx as usize)
            .cloned()
            .ok_or_else(|| {
                vortex_err!(
                    "Invalid buffer index {} for array with {} buffers",
                    buffer_idx,
                    self.nbuffers()
                )
            })
    }

    /// Returns the buffer lengths as stored in the flatbuffer metadata.
    ///
    /// This reads the buffer descriptors from the flatbuffer, which contain the
    /// serialized length of each buffer. This is useful for displaying buffer sizes
    /// without needing to access the actual buffer data.
    pub fn buffer_lengths(&self) -> Vec<usize> {
        let fb_array = root::<fba::Array>(self.flatbuffer.as_ref())
            .vortex_expect("ArrayParts flatbuffer must be a valid Array");
        fb_array
            .buffers()
            .map(|buffers| buffers.iter().map(|b| b.length() as usize).collect())
            .unwrap_or_default()
    }

    /// Validate and align the array tree flatbuffer, returning the aligned buffer and root location.
    fn validate_array_tree(array_tree: impl Into<ByteBuffer>) -> VortexResult<(FlatBuffer, usize)> {
        let fb_buffer = FlatBuffer::align_from(array_tree.into());
        let fb_array = root::<fba::Array>(fb_buffer.as_ref())?;
        let fb_root = fb_array
            .root()
            .ok_or_else(|| vortex_err!("Array must have a root node"))?;
        let flatbuffer_loc = fb_root._tab.loc();
        Ok((fb_buffer, flatbuffer_loc))
    }

    /// Create an [`ArrayParts`] from a pre-existing array tree flatbuffer and pre-resolved buffer
    /// handles.
    ///
    /// The caller is responsible for resolving buffers from whatever source (device segments, host
    /// overrides, or a mix). The buffers must be in the same order as the `Array.buffers` descriptor
    /// list in the flatbuffer.
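    ///
    /// A minimal sketch, where `tree_bytes` holds the serialized [`fba::Array`] flatbuffer and
    /// `resolved` is a `Vec<BufferHandle>` in descriptor order (both are hypothetical variables
    /// assumed to be produced by the caller):
    ///
    /// ```ignore
    /// let parts = ArrayParts::from_flatbuffer_with_buffers(tree_bytes, resolved)?;
    /// ```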
    pub fn from_flatbuffer_with_buffers(
        array_tree: impl Into<ByteBuffer>,
        buffers: Vec<BufferHandle>,
    ) -> VortexResult<Self> {
        let (flatbuffer, flatbuffer_loc) = Self::validate_array_tree(array_tree)?;
        Ok(ArrayParts {
            flatbuffer,
            flatbuffer_loc,
            buffers: buffers.into(),
        })
    }

    /// Create an [`ArrayParts`] from a raw array tree flatbuffer (metadata only).
    ///
    /// This constructor creates an `ArrayParts` with no buffer data, useful for
    /// inspecting the metadata when the actual buffer data is not needed
    /// (e.g., displaying buffer sizes from inlined array tree metadata).
    ///
    /// Note: Calling `buffer()` on the returned `ArrayParts` will fail since
    /// no actual buffer data is available.
    pub fn from_array_tree(array_tree: impl Into<ByteBuffer>) -> VortexResult<Self> {
        let (flatbuffer, flatbuffer_loc) = Self::validate_array_tree(array_tree)?;
        Ok(ArrayParts {
            flatbuffer,
            flatbuffer_loc,
            buffers: Arc::new([]),
        })
    }

    /// Returns the root ArrayNode flatbuffer.
    fn flatbuffer(&self) -> fba::ArrayNode<'_> {
        unsafe { fba::ArrayNode::follow(self.flatbuffer.as_ref(), self.flatbuffer_loc) }
    }

    /// Returns a new [`ArrayParts`] with the given node as the root
    // TODO(ngates): we may want a wrapper that avoids this clone.
    fn with_root(&self, root: fba::ArrayNode) -> Self {
        let mut this = self.clone();
        this.flatbuffer_loc = root._tab.loc();
        this
    }

    /// Create an [`ArrayParts`] from a pre-existing flatbuffer (ArrayNode) and a segment containing
    /// only the data buffers (without the flatbuffer suffix).
    ///
    /// This is used when the flatbuffer is stored separately in layout metadata (e.g., when
    /// `FLAT_LAYOUT_INLINE_ARRAY_NODE` is enabled).
    pub fn from_flatbuffer_and_segment(
        array_tree: ByteBuffer,
        segment: BufferHandle,
    ) -> VortexResult<Self> {
        // We align each buffer individually, so we remove alignment requirements on the segment
        // for host-resident buffers. Device buffers are sliced directly.
        let segment = segment.ensure_aligned(Alignment::none())?;

        // `validate_array_tree` can't return the validated `fba::Array` because there is no
        // lifetime to tie it to, so we re-derive it from the buffer below, which is safe.
        let (fb_buffer, flatbuffer_loc) = Self::validate_array_tree(array_tree)?;
        // SAFETY: fb_buffer was already validated by validate_array_tree above.
        let fb_array = unsafe { fba::root_as_array_unchecked(fb_buffer.as_ref()) };

        let mut offset = 0;
        let buffers = fb_array
            .buffers()
            .unwrap_or_default()
            .iter()
            .map(|fb_buf| {
                offset += fb_buf.padding() as usize;
                let buffer_len = fb_buf.length() as usize;
                let buffer = segment.slice(offset..(offset + buffer_len));
                let buffer =
                    buffer.ensure_aligned(Alignment::from_exponent(fb_buf.alignment_exponent()))?;
                offset += buffer_len;
                Ok(buffer)
            })
            .collect::<VortexResult<Arc<[_]>>>()?;

        Ok(ArrayParts {
            flatbuffer: fb_buffer,
            flatbuffer_loc,
            buffers,
        })
    }
}

struct ArrayPartsChildren<'a> {
    parts: &'a ArrayParts,
    ctx: &'a ArrayContext,
    session: &'a VortexSession,
}

impl ArrayChildren for ArrayPartsChildren<'_> {
    fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef> {
        self.parts
            .child(index)
            .decode(dtype, len, self.ctx, self.session)
    }

    fn len(&self) -> usize {
        self.parts.nchildren()
    }
}

impl TryFrom<ByteBuffer> for ArrayParts {
    type Error = VortexError;

    fn try_from(value: ByteBuffer) -> Result<Self, Self::Error> {
        // The final 4 bytes contain the length of the flatbuffer.
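        // Overall layout (as produced by `serialize`):
        //   [data buffers, each preceded by any padding] [fba::Array flatbuffer] [u32 LE length]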
        if value.len() < 4 {
            vortex_bail!("ArrayParts buffer is too short");
        }

        // We align each buffer individually, so we remove alignment requirements on the buffer.
        let value = value.aligned(Alignment::none());

        let fb_length = u32::try_from_le_bytes(&value.as_slice()[value.len() - 4..])? as usize;
        if value.len() < 4 + fb_length {
            vortex_bail!("ArrayParts buffer is too short for flatbuffer");
        }

        let fb_offset = value.len() - 4 - fb_length;
        let array_tree = value.slice(fb_offset..fb_offset + fb_length);
        let segment = BufferHandle::new_host(value.slice(0..fb_offset));

        Self::from_flatbuffer_and_segment(array_tree, segment)
    }
}

impl TryFrom<BufferHandle> for ArrayParts {
    type Error = VortexError;

    fn try_from(value: BufferHandle) -> Result<Self, Self::Error> {
        Self::try_from(value.try_to_host_sync()?)
    }
}