Skip to main content

vortex_array/
serde.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::borrow::Cow;
5use std::fmt::Debug;
6use std::fmt::Formatter;
7use std::iter;
8use std::sync::Arc;
9
10use flatbuffers::FlatBufferBuilder;
11use flatbuffers::Follow;
12use flatbuffers::WIPOffset;
13use flatbuffers::root;
14use vortex_buffer::Alignment;
15use vortex_buffer::ByteBuffer;
16use vortex_error::VortexError;
17use vortex_error::VortexExpect;
18use vortex_error::VortexResult;
19use vortex_error::vortex_bail;
20use vortex_error::vortex_err;
21use vortex_error::vortex_panic;
22use vortex_flatbuffers::FlatBuffer;
23use vortex_flatbuffers::WriteFlatBuffer;
24use vortex_flatbuffers::array as fba;
25use vortex_flatbuffers::array::Compression;
26use vortex_session::VortexSession;
27use vortex_session::registry::ReadContext;
28use vortex_utils::aliases::hash_map::HashMap;
29
30use crate::ArrayContext;
31use crate::ArrayRef;
32use crate::array::new_foreign_array;
33use crate::buffer::BufferHandle;
34use crate::dtype::DType;
35use crate::dtype::TryFromBytes;
36use crate::session::ArraySessionExt;
37use crate::stats::StatsSet;
38
/// Options for serializing an array.
///
/// The `Default` impl gives `offset = 0` and `include_padding = false`.
#[derive(Default, Debug)]
pub struct SerializeOptions {
    /// The starting position within an external stream or file. This offset is used to compute
    /// appropriate padding to enable zero-copy reads.
    pub offset: usize,
    /// Whether to include sufficient zero-copy padding.
    pub include_padding: bool,
}
48
49impl ArrayRef {
50    /// Serialize the array into a sequence of byte buffers that should be written contiguously.
51    /// This function returns a vec to avoid copying data buffers.
52    ///
53    /// Optionally, padding can be included to guarantee buffer alignment and ensure zero-copy
54    /// reads within the context of an external file or stream. In this case, the alignment of
55    /// the first byte buffer should be respected when writing the buffers to the stream or file.
56    ///
57    /// The format of this blob is a sequence of data buffers, possible with prefixed padding,
58    /// followed by a flatbuffer containing an [`fba::Array`] message, and ending with a
59    /// little-endian u32 describing the length of the flatbuffer message.
60    pub fn serialize(
61        &self,
62        ctx: &ArrayContext,
63        session: &VortexSession,
64        options: &SerializeOptions,
65    ) -> VortexResult<Vec<ByteBuffer>> {
66        // Collect all array buffers
67        let array_buffers = self
68            .depth_first_traversal()
69            .flat_map(|f| f.buffers())
70            .collect::<Vec<_>>();
71
72        // Allocate result buffers, including a possible padding buffer for each.
73        let mut buffers = vec![];
74        let mut fb_buffers = Vec::with_capacity(buffers.capacity());
75
76        // If we're including padding, we need to find the maximum required buffer alignment.
77        let max_alignment = array_buffers
78            .iter()
79            .map(|buf| buf.alignment())
80            .chain(iter::once(FlatBuffer::alignment()))
81            .max()
82            .unwrap_or_else(FlatBuffer::alignment);
83
84        // Create a shared buffer of zeros we can use for padding
85        let zeros = ByteBuffer::zeroed(*max_alignment);
86
87        // We push an empty buffer with the maximum alignment, so then subsequent buffers
88        // will be aligned. For subsequent buffers, we always push a 1-byte alignment.
89        buffers.push(ByteBuffer::zeroed_aligned(0, max_alignment));
90
91        // Keep track of where we are in the "file" to calculate padding.
92        let mut pos = options.offset;
93
94        // Push all the array buffers with padding as necessary.
95        for buffer in array_buffers {
96            let padding = if options.include_padding {
97                let padding = pos.next_multiple_of(*buffer.alignment()) - pos;
98                if padding > 0 {
99                    pos += padding;
100                    buffers.push(zeros.slice(0..padding));
101                }
102                padding
103            } else {
104                0
105            };
106
107            fb_buffers.push(fba::Buffer::new(
108                u16::try_from(padding).vortex_expect("padding fits into u16"),
109                buffer.alignment().exponent(),
110                Compression::None,
111                u32::try_from(buffer.len())
112                    .map_err(|_| vortex_err!("All buffers must fit into u32 for serialization"))?,
113            ));
114
115            pos += buffer.len();
116            buffers.push(buffer.aligned(Alignment::none()));
117        }
118
119        // Set up the flatbuffer builder
120        let mut fbb = FlatBufferBuilder::new();
121
122        let root = ArrayNodeFlatBuffer::try_new(ctx, session, self)?;
123        let fb_root = root.try_write_flatbuffer(&mut fbb)?;
124
125        let fb_buffers = fbb.create_vector(&fb_buffers);
126        let fb_array = fba::Array::create(
127            &mut fbb,
128            &fba::ArrayArgs {
129                root: Some(fb_root),
130                buffers: Some(fb_buffers),
131            },
132        );
133        fbb.finish_minimal(fb_array);
134        let (fb_vec, fb_start) = fbb.collapse();
135        let fb_end = fb_vec.len();
136        let fb_buffer = ByteBuffer::from(fb_vec).slice(fb_start..fb_end);
137        let fb_length = fb_buffer.len();
138
139        if options.include_padding {
140            let padding = pos.next_multiple_of(*FlatBuffer::alignment()) - pos;
141            if padding > 0 {
142                buffers.push(zeros.slice(0..padding));
143            }
144        }
145        buffers.push(fb_buffer);
146
147        // Finally, we write down the u32 length for the flatbuffer.
148        buffers.push(ByteBuffer::from(
149            u32::try_from(fb_length)
150                .map_err(|_| vortex_err!("Array metadata flatbuffer must fit into u32 for serialization. Array encoding tree is too large."))?
151                .to_le_bytes()
152                .to_vec(),
153        ));
154
155        Ok(buffers)
156    }
157}
158
/// A utility struct for creating an [`fba::ArrayNode`] flatbuffer.
pub struct ArrayNodeFlatBuffer<'a> {
    // Context used to intern encoding ids into flatbuffer encoding indices.
    ctx: &'a ArrayContext,
    // Session used to obtain each array's serialized metadata.
    session: &'a VortexSession,
    // The array rooted at this node.
    array: &'a ArrayRef,
    // Index of this node's first buffer within the serialized buffer list.
    buffer_idx: u16,
}
166
impl<'a> ArrayNodeFlatBuffer<'a> {
    /// Create a root [`ArrayNodeFlatBuffer`] for `array`, validating up front that every node
    /// in the tree supports serialization and that the recursive buffer count fits in a `u16`.
    pub fn try_new(
        ctx: &'a ArrayContext,
        session: &'a VortexSession,
        array: &'a ArrayRef,
    ) -> VortexResult<Self> {
        // Depth-first traversal of the array to ensure it supports serialization.
        // FIXME(ngates): this serializes the metadata and throws it away!
        for child in array.depth_first_traversal() {
            if child.metadata(session)?.is_none() {
                vortex_bail!(
                    "Array {} does not support serialization",
                    child.encoding_id()
                );
            }
        }
        // Buffer indices are u16 in the flatbuffer schema, so the whole tree must fit.
        let n_buffers_recursive = array.nbuffers_recursive();
        if n_buffers_recursive > u16::MAX as usize {
            vortex_bail!(
                "Array and all descendent arrays can have at most u16::MAX buffers: {}",
                n_buffers_recursive
            );
        };
        Ok(Self {
            ctx,
            session,
            array,
            buffer_idx: 0,
        })
    }

    /// Recursively write this node and its children into `fbb`, assigning each node's buffers
    /// contiguous indices starting at `self.buffer_idx`.
    pub fn try_write_flatbuffer<'fb>(
        &self,
        fbb: &mut FlatBufferBuilder<'fb>,
    ) -> VortexResult<WIPOffset<fba::ArrayNode<'fb>>> {
        let encoding_idx = self
            .ctx
            .intern(&self.array.encoding_id())
            // TODO(ngates): write_flatbuffer should return a result if this can fail.
            .ok_or_else(|| {
                vortex_err!(
                    "Array encoding {} not permitted by ctx",
                    self.array.encoding_id()
                )
            })?;

        let metadata = self.array.metadata(self.session)?.ok_or_else(|| {
            vortex_err!(
                "Array {} does not support serialization",
                self.array.encoding_id()
            )
        })?;
        let metadata = Some(fbb.create_vector(metadata.as_slice()));

        // Assign buffer indices for all child arrays.
        let nbuffers = u16::try_from(self.array.nbuffers())
            .map_err(|_| vortex_err!("Array can have at most u16::MAX buffers"))?;
        // Children's buffers start immediately after this node's own buffers.
        let mut child_buffer_idx = self.buffer_idx + nbuffers;

        let children = self
            .array
            .children()
            .iter()
            .map(|child| {
                // Update the number of buffers required.
                let msg = ArrayNodeFlatBuffer {
                    ctx: self.ctx,
                    session: self.session,
                    array: child,
                    buffer_idx: child_buffer_idx,
                }
                .try_write_flatbuffer(fbb)?;

                // Advance past all buffers claimed by this child's subtree, guarding
                // against u16 overflow.
                child_buffer_idx = u16::try_from(child.nbuffers_recursive())
                    .ok()
                    .and_then(|nbuffers| nbuffers.checked_add(child_buffer_idx))
                    .ok_or_else(|| vortex_err!("Too many buffers (u16) for Array"))?;

                Ok(msg)
            })
            .collect::<VortexResult<Vec<_>>>()?;
        let children = Some(fbb.create_vector(&children));

        // This node's own buffer indices: [buffer_idx, buffer_idx + nbuffers).
        let buffers = Some(fbb.create_vector_from_iter((0..nbuffers).map(|i| i + self.buffer_idx)));
        let stats = Some(self.array.statistics().write_flatbuffer(fbb)?);

        Ok(fba::ArrayNode::create(
            fbb,
            &fba::ArrayNodeArgs {
                encoding: encoding_idx,
                metadata,
                children,
                buffers,
                stats,
            },
        ))
    }
}
265
266/// To minimize the serialized form, arrays do not persist their own dtype and length. Instead,
267/// parent arrays pass this information down during deserialization.
pub trait ArrayChildren {
    /// Returns the nth child of the array with the given dtype and length.
    ///
    /// The caller supplies `dtype` and `len` because the serialized form does not record them
    /// per child (see the module note above this trait).
    fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef>;

    /// The number of children.
    fn len(&self) -> usize;

    /// Returns true if there are no children.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
280
281impl<T: AsRef<[ArrayRef]>> ArrayChildren for T {
282    fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef> {
283        let array = self.as_ref()[index].clone();
284        assert_eq!(array.len(), len);
285        assert_eq!(array.dtype(), dtype);
286        Ok(array)
287    }
288
289    fn len(&self) -> usize {
290        self.as_ref().len()
291    }
292}
293
/// [`SerializedArray`] represents a parsed but not-yet-decoded deserialized array.
/// It contains all the information from the serialized form, without anything extra. i.e.
/// it is missing a [`DType`] and `len`, and the `encoding_id` is not yet resolved to a concrete
/// vtable.
///
/// A [`SerializedArray`] can be fully decoded into an [`ArrayRef`] using the `decode` function.
#[derive(Clone)]
pub struct SerializedArray {
    // The flatbuffer containing the serialized array tree (an fba::Array message whose root
    // is an fb::ArrayNode).
    flatbuffer: FlatBuffer,
    // The location of the current fb::ArrayNode within `flatbuffer`.
    flatbuffer_loc: usize,
    // Buffer handles for the whole tree; nodes index into this via the flatbuffer's
    // per-node buffer-index lists.
    buffers: Arc<[BufferHandle]>,
}
308
309impl Debug for SerializedArray {
310    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
311        f.debug_struct("SerializedArray")
312            .field("encoding_id", &self.encoding_id())
313            .field("children", &(0..self.nchildren()).map(|i| self.child(i)))
314            .field(
315                "buffers",
316                &(0..self.nbuffers()).map(|i| self.buffer(i).ok()),
317            )
318            .field("metadata", &self.metadata())
319            .finish()
320    }
321}
322
323impl SerializedArray {
    /// Decode a [`SerializedArray`] into an [`ArrayRef`].
    ///
    /// Resolves the encoding via `ctx`, looks up the matching plugin in `session`, and asks
    /// the plugin to deserialize. If no plugin is registered and the session allows unknown
    /// encodings, falls back to an opaque "foreign" array instead of failing.
    pub fn decode(
        &self,
        dtype: &DType,
        len: usize,
        ctx: &ReadContext,
        session: &VortexSession,
    ) -> VortexResult<ArrayRef> {
        let encoding_idx = self.flatbuffer().encoding();
        let encoding_id = ctx
            .resolve(encoding_idx)
            .ok_or_else(|| vortex_err!("Unknown encoding index: {}", encoding_idx))?;
        let Some(plugin) = session.arrays().registry().find(&encoding_id) else {
            if session.allows_unknown() {
                return self.decode_foreign(encoding_id, dtype, len, ctx);
            }
            return Err(vortex_err!("Unknown encoding: {}", encoding_id));
        };

        // Children are decoded on demand by the plugin, which supplies each child's dtype/len.
        let children = SerializedArrayChildren {
            ser: self,
            ctx,
            session,
        };

        let buffers = self.collect_buffers()?;

        let decoded =
            plugin.deserialize(dtype, len, self.metadata(), &buffers, &children, session)?;

        // Sanity-check that the plugin honored the requested contract; a mismatch is a bug
        // in the plugin, hence assert rather than Err.
        assert_eq!(
            decoded.len(),
            len,
            "Array decoded from {} has incorrect length {}, expected {}",
            encoding_id,
            decoded.len(),
            len
        );
        assert_eq!(
            decoded.dtype(),
            dtype,
            "Array decoded from {} has incorrect dtype {}, expected {}",
            encoding_id,
            decoded.dtype(),
            dtype,
        );
        assert_eq!(
            decoded.encoding_id(),
            encoding_id,
            "Array decoded from {} has incorrect encoding {}",
            encoding_id,
            decoded.encoding_id(),
        );

        // Populate statistics from the serialized array.
        if let Some(stats) = self.flatbuffer().stats() {
            decoded
                .statistics()
                .set_iter(StatsSet::from_flatbuffer(&stats, dtype, session)?.into_iter());
        }

        Ok(decoded)
    }
387
    /// Decode an array with an unregistered encoding into an opaque "foreign" array that
    /// preserves metadata, buffers, and children without interpreting them.
    fn decode_foreign(
        &self,
        encoding_id: crate::array::ArrayId,
        dtype: &DType,
        len: usize,
        ctx: &ReadContext,
    ) -> VortexResult<ArrayRef> {
        let children = (0..self.nchildren())
            .map(|idx| {
                let child = self.child(idx);
                let child_encoding_idx = child.flatbuffer().encoding();
                let child_encoding_id = ctx
                    .resolve(child_encoding_idx)
                    .ok_or_else(|| vortex_err!("Unknown encoding index: {}", child_encoding_idx))?;
                // NOTE(review): children are decoded with the *parent's* dtype/len, since the
                // serialized form does not record per-child types — confirm foreign arrays are
                // expected to tolerate this placeholder.
                child.decode_foreign(child_encoding_id, dtype, len, ctx)
            })
            .collect::<VortexResult<Vec<_>>>()?;

        new_foreign_array(
            encoding_id,
            dtype.clone(),
            len,
            self.metadata().to_vec(),
            self.collect_buffers()?.into_owned(),
            children,
        )
    }
415
    /// Returns the array encoding index as stored in the flatbuffer (not yet resolved to an id).
    pub fn encoding_id(&self) -> u16 {
        self.flatbuffer().encoding()
    }

    /// Returns the array metadata bytes (an empty slice if none were serialized).
    pub fn metadata(&self) -> &[u8] {
        self.flatbuffer()
            .metadata()
            .map(|metadata| metadata.bytes())
            .unwrap_or(&[])
    }

    /// Returns the number of children.
    pub fn nchildren(&self) -> usize {
        self.flatbuffer()
            .children()
            .map_or(0, |children| children.len())
    }

    /// Returns the nth child of the array.
    ///
    /// Panics if `idx` is out of bounds. The returned child shares this array's flatbuffer
    /// and buffer handles.
    pub fn child(&self, idx: usize) -> SerializedArray {
        let children = self
            .flatbuffer()
            .children()
            .vortex_expect("Expected array to have children");
        if idx >= children.len() {
            vortex_panic!(
                "Invalid child index {} for array with {} children",
                idx,
                children.len()
            );
        }
        self.with_root(children.get(idx))
    }

    /// Returns the number of buffers.
    pub fn nbuffers(&self) -> usize {
        self.flatbuffer()
            .buffers()
            .map_or(0, |buffers| buffers.len())
    }
458
459    /// Returns the nth buffer of the current array.
460    pub fn buffer(&self, idx: usize) -> VortexResult<BufferHandle> {
461        let buffer_idx = self
462            .flatbuffer()
463            .buffers()
464            .ok_or_else(|| vortex_err!("Array has no buffers"))?
465            .get(idx);
466        self.buffers
467            .get(buffer_idx as usize)
468            .cloned()
469            .ok_or_else(|| {
470                vortex_err!(
471                    "Invalid buffer index {} for array with {} buffers",
472                    buffer_idx,
473                    self.nbuffers()
474                )
475            })
476    }
477
    /// Returns all buffers for the current array node.
    ///
    /// If buffer indices are contiguous, returns a zero-copy borrowed slice.
    /// Otherwise falls back to collecting each buffer individually.
    fn collect_buffers(&self) -> VortexResult<Cow<'_, [BufferHandle]>> {
        let Some(fb_buffers) = self.flatbuffer().buffers() else {
            return Ok(Cow::Borrowed(&[]));
        };
        let count = fb_buffers.len();
        if count == 0 {
            return Ok(Cow::Borrowed(&[]));
        }
        // Contiguity check: indices forming [start, start + count) let us borrow a slice
        // of the shared handle array directly.
        let start = fb_buffers.get(0) as usize;
        let contiguous = fb_buffers
            .iter()
            .enumerate()
            .all(|(i, idx)| idx as usize == start + i);
        if contiguous {
            self.buffers.get(start..start + count).map_or_else(
                || {
                    vortex_bail!(
                        "buffer indices {}..{} out of range for {} buffers",
                        start,
                        start + count,
                        self.buffers.len()
                    )
                },
                |slice| Ok(Cow::Borrowed(slice)),
            )
        } else {
            // Non-contiguous indices: resolve and clone each handle via `buffer()`.
            (0..count)
                .map(|idx| self.buffer(idx))
                .collect::<VortexResult<Vec<_>>>()
                .map(Cow::Owned)
        }
    }
514
    /// Returns the buffer lengths as stored in the flatbuffer metadata.
    ///
    /// This reads the buffer descriptors from the flatbuffer, which contain the
    /// serialized length of each buffer. This is useful for displaying buffer sizes
    /// without needing to access the actual buffer data.
    ///
    /// NOTE(review): this reads the *root* `fba::Array` message, so even when called on a
    /// child node it returns the descriptor list for the whole tree — confirm intended.
    pub fn buffer_lengths(&self) -> Vec<usize> {
        let fb_array = root::<fba::Array>(self.flatbuffer.as_ref())
            .vortex_expect("SerializedArray flatbuffer must be a valid Array");
        fb_array
            .buffers()
            .map(|buffers| buffers.iter().map(|b| b.length() as usize).collect())
            .unwrap_or_default()
    }
528
529    /// Validate and align the array tree flatbuffer, returning the aligned buffer and root location.
530    fn validate_array_tree(array_tree: impl Into<ByteBuffer>) -> VortexResult<(FlatBuffer, usize)> {
531        let fb_buffer = FlatBuffer::align_from(array_tree.into());
532        let fb_array = root::<fba::Array>(fb_buffer.as_ref())?;
533        let fb_root = fb_array
534            .root()
535            .ok_or_else(|| vortex_err!("Array must have a root node"))?;
536        let flatbuffer_loc = fb_root._tab.loc();
537        Ok((fb_buffer, flatbuffer_loc))
538    }
539
    /// Create a [`SerializedArray`] from a pre-existing array tree flatbuffer and pre-resolved buffer
    /// handles.
    ///
    /// The caller is responsible for resolving buffers from whatever source (device segments, host
    /// overrides, or a mix). The buffers must be in the same order as the `Array.buffers` descriptor
    /// list in the flatbuffer.
    pub fn from_flatbuffer_with_buffers(
        array_tree: impl Into<ByteBuffer>,
        buffers: Vec<BufferHandle>,
    ) -> VortexResult<Self> {
        let (flatbuffer, flatbuffer_loc) = Self::validate_array_tree(array_tree)?;
        Ok(SerializedArray {
            flatbuffer,
            flatbuffer_loc,
            buffers: buffers.into(),
        })
    }
557
    /// Create a [`SerializedArray`] from a raw array tree flatbuffer (metadata only).
    ///
    /// This constructor creates a `SerializedArray` with no buffer data, useful for
    /// inspecting the metadata when the actual buffer data is not needed
    /// (e.g., displaying buffer sizes from inlined array tree metadata).
    ///
    /// Note: Calling `buffer()` on the returned `SerializedArray` will fail since
    /// no actual buffer data is available.
    pub fn from_array_tree(array_tree: impl Into<ByteBuffer>) -> VortexResult<Self> {
        let (flatbuffer, flatbuffer_loc) = Self::validate_array_tree(array_tree)?;
        Ok(SerializedArray {
            flatbuffer,
            flatbuffer_loc,
            // No buffer data: lookups through `buffer()` will error.
            buffers: Arc::new([]),
        })
    }
574
    /// Returns the root ArrayNode flatbuffer.
    fn flatbuffer(&self) -> fba::ArrayNode<'_> {
        // SAFETY: `flatbuffer_loc` always originates from a node within this same buffer,
        // which was verified by `validate_array_tree` in the constructors (child locations
        // come from `with_root` on an already-verified tree).
        unsafe { fba::ArrayNode::follow(self.flatbuffer.as_ref(), self.flatbuffer_loc) }
    }

    /// Returns a new [`SerializedArray`] with the given node as the root
    // TODO(ngates): we may want a wrapper that avoids this clone.
    fn with_root(&self, root: fba::ArrayNode) -> Self {
        // Clone shares the flatbuffer and the Arc of buffer handles; only the node
        // location changes.
        let mut this = self.clone();
        this.flatbuffer_loc = root._tab.loc();
        this
    }
587
    /// Create a [`SerializedArray`] from a pre-existing flatbuffer (ArrayNode) and a segment containing
    /// only the data buffers (without the flatbuffer suffix).
    ///
    /// This is used when the flatbuffer is stored separately in layout metadata (e.g., when
    /// `FLAT_LAYOUT_INLINE_ARRAY_NODE` is enabled).
    pub fn from_flatbuffer_and_segment(
        array_tree: ByteBuffer,
        segment: BufferHandle,
    ) -> VortexResult<Self> {
        // HashMap::new doesn't allocate when empty, so this has no overhead
        Self::from_flatbuffer_and_segment_with_overrides(array_tree, segment, &HashMap::new())
    }
600
    /// Create a [`SerializedArray`] from a pre-existing flatbuffer (ArrayNode) and a segment,
    /// substituting host-resident buffer overrides for specific buffer indices.
    ///
    /// Buffers whose index appears in `buffer_overrides` are resolved from the provided
    /// host data instead of the segment. All other buffers are sliced from the segment
    /// using the padding and alignment described in the flatbuffer.
    pub fn from_flatbuffer_and_segment_with_overrides(
        array_tree: ByteBuffer,
        segment: BufferHandle,
        buffer_overrides: &HashMap<u32, ByteBuffer>,
    ) -> VortexResult<Self> {
        // We align each buffer individually, so we remove alignment requirements on the segment
        // for host-resident buffers. Device buffers are sliced directly.
        let segment = segment.ensure_aligned(Alignment::none())?;

        // this can't return the validated array because there is no lifetime to give it, so we
        // need to cast it below, which is safe.
        let (fb_buffer, flatbuffer_loc) = Self::validate_array_tree(array_tree)?;
        // SAFETY: fb_buffer was already validated by validate_array_tree above.
        let fb_array = unsafe { fba::root_as_array_unchecked(fb_buffer.as_ref()) };

        // Walk the buffer descriptors, tracking the byte offset of each buffer within
        // the segment (padding precedes each buffer, per the serialized layout).
        let mut offset = 0;
        let buffers = fb_array
            .buffers()
            .unwrap_or_default()
            .iter()
            .enumerate()
            .map(|(idx, fb_buf)| {
                offset += fb_buf.padding() as usize;
                let buffer_len = fb_buf.length() as usize;
                let alignment = Alignment::from_exponent(fb_buf.alignment_exponent());

                let idx = u32::try_from(idx).vortex_expect("buffer count must fit in u32");
                let handle = if let Some(host_data) = buffer_overrides.get(&idx) {
                    BufferHandle::new_host(host_data.clone()).ensure_aligned(alignment)?
                } else {
                    let buffer = segment.slice(offset..(offset + buffer_len));
                    buffer.ensure_aligned(alignment)?
                };

                // Advance past the buffer's bytes even when overridden, so later offsets
                // still line up with the segment layout.
                offset += buffer_len;
                Ok(handle)
            })
            .collect::<VortexResult<Arc<[_]>>>()?;

        Ok(SerializedArray {
            flatbuffer: fb_buffer,
            flatbuffer_loc,
            buffers,
        })
    }
652}
653
// Adapter that lets a plugin lazily decode the children of a `SerializedArray` during
// deserialization (see `SerializedArray::decode`).
struct SerializedArrayChildren<'a> {
    // The serialized parent whose children are being decoded.
    ser: &'a SerializedArray,
    // Context used to resolve encoding indices for each child.
    ctx: &'a ReadContext,
    // Session used to look up plugins when decoding children.
    session: &'a VortexSession,
}
659
660impl ArrayChildren for SerializedArrayChildren<'_> {
661    fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef> {
662        self.ser
663            .child(index)
664            .decode(dtype, len, self.ctx, self.session)
665    }
666
667    fn len(&self) -> usize {
668        self.ser.nchildren()
669    }
670}
671
672impl TryFrom<ByteBuffer> for SerializedArray {
673    type Error = VortexError;
674
675    fn try_from(value: ByteBuffer) -> Result<Self, Self::Error> {
676        // The final 4 bytes contain the length of the flatbuffer.
677        if value.len() < 4 {
678            vortex_bail!("SerializedArray buffer is too short");
679        }
680
681        // We align each buffer individually, so we remove alignment requirements on the buffer.
682        let value = value.aligned(Alignment::none());
683
684        let fb_length = u32::try_from_le_bytes(&value.as_slice()[value.len() - 4..])? as usize;
685        if value.len() < 4 + fb_length {
686            vortex_bail!("SerializedArray buffer is too short for flatbuffer");
687        }
688
689        let fb_offset = value.len() - 4 - fb_length;
690        let array_tree = value.slice(fb_offset..fb_offset + fb_length);
691        let segment = BufferHandle::new_host(value.slice(0..fb_offset));
692
693        Self::from_flatbuffer_and_segment(array_tree, segment)
694    }
695}
696
impl TryFrom<BufferHandle> for SerializedArray {
    type Error = VortexError;

    // Materialize the handle on the host, then delegate to the ByteBuffer parser.
    fn try_from(value: BufferHandle) -> Result<Self, Self::Error> {
        Self::try_from(value.try_to_host_sync()?)
    }
}
704
#[cfg(test)]
mod tests {
    use std::sync::LazyLock;

    use flatbuffers::FlatBufferBuilder;
    use vortex_session::VortexSession;
    use vortex_session::registry::ReadContext;

    use super::SerializeOptions;
    use super::SerializedArray;
    use crate::ArrayContext;
    use crate::array::ArrayId;
    use crate::dtype::DType;
    use crate::dtype::Nullability;
    use crate::flatbuffers as fba;
    use crate::session::ArraySession;

    static SESSION: LazyLock<VortexSession> = LazyLock::new(VortexSession::empty);

    /// Decoding a tree of unregistered encodings with `allow_unknown` should produce opaque
    /// "foreign" arrays that preserve metadata/children and remain serializable.
    #[test]
    fn unknown_array_encoding_allow_unknown() {
        let mut fbb = FlatBufferBuilder::new();

        // Build a two-node tree by hand: a root (encoding index 0) with one child
        // (encoding index 1), neither of which is registered in the session.
        let child_metadata = fbb.create_vector(&[9u8]);
        let child = fba::ArrayNode::create(
            &mut fbb,
            &fba::ArrayNodeArgs {
                encoding: 1,
                metadata: Some(child_metadata),
                children: None,
                buffers: None,
                stats: None,
            },
        );

        let children = fbb.create_vector(&[child]);
        let metadata = fbb.create_vector(&[1u8, 2, 3]);
        let root = fba::ArrayNode::create(
            &mut fbb,
            &fba::ArrayNodeArgs {
                encoding: 0,
                metadata: Some(metadata),
                children: Some(children),
                buffers: None,
                stats: None,
            },
        );
        let array = fba::Array::create(
            &mut fbb,
            &fba::ArrayArgs {
                root: Some(root),
                buffers: None,
            },
        );
        fbb.finish_minimal(array);
        let (buf, start) = fbb.collapse();
        let tree = vortex_buffer::ByteBuffer::from(buf).slice(start..);

        let ser = SerializedArray::from_array_tree(tree).unwrap();
        // The ReadContext maps encoding indices 0 and 1 to these ids.
        let ctx = ReadContext::new([
            ArrayId::new_ref("vortex.test.foreign_array"),
            ArrayId::new_ref("vortex.test.foreign_child"),
        ]);
        let session = VortexSession::empty()
            .with::<ArraySession>()
            .allow_unknown();

        let decoded = ser
            .decode(&DType::Variant(Nullability::Nullable), 5, &ctx, &session)
            .unwrap();
        assert_eq!(decoded.encoding_id().as_ref(), "vortex.test.foreign_array");
        assert_eq!(decoded.nchildren(), 1);
        assert_eq!(
            decoded.nth_child(0).unwrap().encoding_id().as_ref(),
            "vortex.test.foreign_child"
        );
        assert_eq!(decoded.metadata(&SESSION).unwrap().unwrap(), vec![1, 2, 3]);
        assert_eq!(
            decoded
                .nth_child(0)
                .unwrap()
                .metadata(&SESSION)
                .unwrap()
                .unwrap(),
            vec![9]
        );

        // A foreign array must round-trip back through serialize without error.
        let serialized = decoded
            .serialize(
                &ArrayContext::default(),
                &SESSION,
                &SerializeOptions::default(),
            )
            .unwrap();
        assert!(!serialized.is_empty());
    }
}
801}