vortex_array/
serde.rs

1use std::fmt::{Debug, Formatter};
2use std::iter;
3use std::sync::Arc;
4
5use flatbuffers::{FlatBufferBuilder, Follow, WIPOffset, root};
6use itertools::Itertools;
7use vortex_buffer::{Alignment, ByteBuffer};
8use vortex_dtype::{DType, TryFromBytes};
9use vortex_error::{
10    VortexError, VortexExpect, VortexResult, vortex_bail, vortex_err, vortex_panic,
11};
12use vortex_flatbuffers::array::Compression;
13use vortex_flatbuffers::{
14    FlatBuffer, FlatBufferRoot, ReadFlatBuffer, WriteFlatBuffer, array as fba,
15};
16
17use crate::stats::StatsSet;
18use crate::{Array, ArrayContext, ArrayRef, ArrayVisitor, ArrayVisitorExt};
19
/// Configuration accepted when serializing an array.
#[derive(Debug, Default)]
pub struct SerializeOptions {
    /// Position within the external stream or file at which the serialized bytes will start.
    /// Padding is computed relative to this position so that buffers can be read zero-copy.
    pub offset: usize,
    /// When `true`, zero padding is emitted ahead of buffers so that each one starts on its
    /// required alignment.
    pub include_padding: bool,
}
29
30impl dyn Array + '_ {
31    /// Serialize the array into a sequence of byte buffers that should be written contiguously.
32    /// This function returns a vec to avoid copying data buffers.
33    ///
34    /// Optionally, padding can be included to guarantee buffer alignment and ensure zero-copy
35    /// reads within the context of an external file or stream. In this case, the alignment of
36    /// the first byte buffer should be respected when writing the buffers to the stream or file.
37    ///
38    /// The format of this blob is a sequence of data buffers, possible with prefixed padding,
39    /// followed by a flatbuffer containing an [`fba::Array`] message, and ending with a
40    /// little-endian u32 describing the length of the flatbuffer message.
41    pub fn serialize(&self, ctx: &ArrayContext, options: &SerializeOptions) -> Vec<ByteBuffer> {
42        // Collect all array buffers
43        let mut array_buffers = vec![];
44        for a in self.depth_first_traversal() {
45            for buffer in a.buffers() {
46                array_buffers.push(buffer);
47            }
48        }
49
50        // Allocate result buffers, including a possible padding buffer for each.
51        let mut buffers = vec![];
52        let mut fb_buffers = Vec::with_capacity(buffers.capacity());
53
54        // If we're including padding, we need to find the maximum required buffer alignment.
55        let max_alignment = array_buffers
56            .iter()
57            .map(|buf| buf.alignment())
58            .chain(iter::once(FlatBuffer::alignment()))
59            .max()
60            .unwrap_or_else(FlatBuffer::alignment);
61
62        // Create a shared buffer of zeros we can use for padding
63        let zeros = ByteBuffer::zeroed(*max_alignment);
64
65        // We push an empty buffer with the maximum alignment, so then subsequent buffers
66        // will be aligned. For subsequent buffers, we always push a 1-byte alignment.
67        buffers.push(ByteBuffer::zeroed_aligned(0, max_alignment));
68
69        // Keep track of where we are in the "file" to calculate padding.
70        let mut pos = options.offset;
71
72        // Push all the array buffers with padding as necessary.
73        for buffer in array_buffers {
74            let padding = if options.include_padding {
75                let padding = pos.next_multiple_of(*buffer.alignment()) - pos;
76                if padding > 0 {
77                    pos += padding;
78                    buffers.push(zeros.slice(0..padding));
79                }
80                padding
81            } else {
82                0
83            };
84
85            fb_buffers.push(fba::Buffer::new(
86                u16::try_from(padding).vortex_expect("padding fits into u16"),
87                buffer.alignment().exponent(),
88                Compression::None,
89                u32::try_from(buffer.len()).vortex_expect("buffers fit into u32"),
90            ));
91
92            pos += buffer.len();
93            buffers.push(buffer.aligned(Alignment::none()));
94        }
95
96        // Set up the flatbuffer builder
97        let mut fbb = FlatBufferBuilder::new();
98        let root = ArrayNodeFlatBuffer::new(ctx, self);
99        let fb_root = root.write_flatbuffer(&mut fbb);
100        let fb_buffers = fbb.create_vector(&fb_buffers);
101        let fb_array = fba::Array::create(
102            &mut fbb,
103            &fba::ArrayArgs {
104                root: Some(fb_root),
105                buffers: Some(fb_buffers),
106            },
107        );
108        fbb.finish_minimal(fb_array);
109        let (fb_vec, fb_start) = fbb.collapse();
110        let fb_end = fb_vec.len();
111        let fb_buffer = ByteBuffer::from(fb_vec).slice(fb_start..fb_end);
112        let fb_length = fb_buffer.len();
113
114        if options.include_padding {
115            let padding = pos.next_multiple_of(*FlatBuffer::alignment()) - pos;
116            if padding > 0 {
117                buffers.push(zeros.slice(0..padding));
118            }
119        }
120        buffers.push(fb_buffer);
121
122        // Finally, we write down the u32 length for the flatbuffer.
123        buffers.push(ByteBuffer::from(
124            u32::try_from(fb_length)
125                .vortex_expect("u32 fits into usize")
126                .to_le_bytes()
127                .to_vec(),
128        ));
129
130        buffers
131    }
132}
133
134/// A utility struct for creating an [`fba::ArrayNode`] flatbuffer.
135pub struct ArrayNodeFlatBuffer<'a> {
136    ctx: &'a ArrayContext,
137    array: &'a dyn Array,
138    buffer_idx: u16,
139}
140
141impl<'a> ArrayNodeFlatBuffer<'a> {
142    pub fn new(ctx: &'a ArrayContext, array: &'a dyn Array) -> Self {
143        Self {
144            ctx,
145            array,
146            buffer_idx: 0,
147        }
148    }
149}
150
151impl FlatBufferRoot for ArrayNodeFlatBuffer<'_> {}
152
153impl WriteFlatBuffer for ArrayNodeFlatBuffer<'_> {
154    type Target<'t> = fba::ArrayNode<'t>;
155
156    fn write_flatbuffer<'fb>(
157        &self,
158        fbb: &mut FlatBufferBuilder<'fb>,
159    ) -> WIPOffset<Self::Target<'fb>> {
160        let encoding = self.ctx.encoding_idx(&self.array.vtable());
161        let metadata = self
162            .array
163            .metadata()
164            .map(|bytes| fbb.create_vector(bytes.as_slice()));
165
166        // Assign buffer indices for all child arrays.
167        let nbuffers = u16::try_from(self.array.nbuffers())
168            .vortex_expect("Array can have at most u16::MAX buffers");
169        let child_buffer_idx = self.buffer_idx + nbuffers;
170
171        let children = self
172            .array
173            .children()
174            .iter()
175            .scan(child_buffer_idx, |buffer_idx, child| {
176                // Update the number of buffers required.
177                let msg = ArrayNodeFlatBuffer {
178                    ctx: self.ctx,
179                    array: child,
180                    buffer_idx: *buffer_idx,
181                }
182                .write_flatbuffer(fbb);
183                *buffer_idx = u16::try_from(child.nbuffers_recursive())
184                    .ok()
185                    .and_then(|nbuffers| nbuffers.checked_add(*buffer_idx))
186                    .vortex_expect("Too many buffers (u16) for Array");
187                Some(msg)
188            })
189            .collect_vec();
190        let children = Some(fbb.create_vector(&children));
191
192        let buffers = Some(fbb.create_vector_from_iter((0..nbuffers).map(|i| i + self.buffer_idx)));
193        let stats = Some(self.array.statistics().to_owned().write_flatbuffer(fbb));
194
195        fba::ArrayNode::create(
196            fbb,
197            &fba::ArrayNodeArgs {
198                encoding,
199                metadata,
200                children,
201                buffers,
202                stats,
203            },
204        )
205    }
206}
207
208/// [`ArrayParts`] represents the information from an [`ArrayRef`] that makes up the serialized
209/// form. For example, it uses stores integer encoding IDs rather than a reference to an encoding
210/// vtable, and it doesn't store any [`DType`] information.
211///
212/// An [`ArrayParts`] can be fully decoded into an [`ArrayRef`] using the `decode` function.
213#[derive(Clone)]
214pub struct ArrayParts {
215    // Typed as fb::ArrayNode
216    flatbuffer: FlatBuffer,
217    // The location of the current fb::ArrayNode
218    flatbuffer_loc: usize,
219    buffers: Arc<[ByteBuffer]>,
220}
221
222impl Debug for ArrayParts {
223    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
224        f.debug_struct("ArrayParts")
225            .field("encoding_id", &self.encoding_id())
226            .field("children", &(0..self.nchildren()).map(|i| self.child(i)))
227            .field(
228                "buffers",
229                &(0..self.nbuffers()).map(|i| self.buffer(i).ok()),
230            )
231            .field("metadata", &self.metadata())
232            .finish()
233    }
234}
235
236impl ArrayParts {
237    /// Decode an [`ArrayParts`] into an [`ArrayRef`].
238    pub fn decode(&self, ctx: &ArrayContext, dtype: DType, len: usize) -> VortexResult<ArrayRef> {
239        let encoding_id = self.flatbuffer().encoding();
240        let vtable = ctx
241            .lookup_encoding(encoding_id)
242            .ok_or_else(|| vortex_err!("Unknown encoding: {}", encoding_id))?;
243        let decoded = vtable.decode(self, ctx, dtype, len)?;
244        assert_eq!(
245            decoded.len(),
246            len,
247            "Array decoded from {} has incorrect length {}, expected {}",
248            vtable.id(),
249            decoded.len(),
250            len
251        );
252        assert_eq!(
253            decoded.encoding(),
254            vtable.id(),
255            "Array decoded from {} has incorrect encoding {}",
256            vtable.id(),
257            decoded.encoding(),
258        );
259
260        // Populate statistics from the serialized array.
261        if let Some(stats) = self.flatbuffer().stats() {
262            let decoded_statistics = decoded.statistics();
263            StatsSet::read_flatbuffer(&stats)?
264                .into_iter()
265                .for_each(|(stat, val)| decoded_statistics.set(stat, val));
266        }
267
268        Ok(decoded)
269    }
270
271    /// Returns the array encoding.
272    pub fn encoding_id(&self) -> u16 {
273        self.flatbuffer().encoding()
274    }
275
276    /// Returns the array metadata bytes.
277    pub fn metadata(&self) -> Option<&[u8]> {
278        self.flatbuffer()
279            .metadata()
280            .map(|metadata| metadata.bytes())
281    }
282
283    /// Returns the number of children.
284    pub fn nchildren(&self) -> usize {
285        self.flatbuffer()
286            .children()
287            .map_or(0, |children| children.len())
288    }
289
290    /// Returns the nth child of the array.
291    pub fn child(&self, idx: usize) -> ArrayParts {
292        let children = self
293            .flatbuffer()
294            .children()
295            .vortex_expect("Expected array to have children");
296        if idx >= children.len() {
297            vortex_panic!(
298                "Invalid child index {} for array with {} children",
299                idx,
300                children.len()
301            );
302        }
303        self.with_root(children.get(idx))
304    }
305
306    /// Iterate the children of this array.
307    pub fn children(&self) -> Vec<ArrayParts> {
308        self.flatbuffer()
309            .children()
310            .iter()
311            .flat_map(|children| children.iter())
312            .map(move |child| self.with_root(child))
313            .collect()
314    }
315
316    /// Returns the number of buffers.
317    pub fn nbuffers(&self) -> usize {
318        self.flatbuffer()
319            .buffers()
320            .map_or(0, |buffers| buffers.len())
321    }
322
323    /// Returns the nth buffer of the current array.
324    pub fn buffer(&self, idx: usize) -> VortexResult<ByteBuffer> {
325        let buffer_idx = self
326            .flatbuffer()
327            .buffers()
328            .ok_or_else(|| vortex_err!("Array has no buffers"))?
329            .get(idx);
330        self.buffers
331            .get(buffer_idx as usize)
332            .cloned()
333            .ok_or_else(|| {
334                vortex_err!(
335                    "Invalid buffer index {} for array with {} buffers",
336                    buffer_idx,
337                    self.nbuffers()
338                )
339            })
340    }
341
342    /// Returns the root ArrayNode flatbuffer.
343    fn flatbuffer(&self) -> fba::ArrayNode {
344        unsafe { fba::ArrayNode::follow(self.flatbuffer.as_ref(), self.flatbuffer_loc) }
345    }
346
347    /// Returns a new [`ArrayParts`] with the given node as the root
348    // TODO(ngates): we may want a wrapper that avoids this clone.
349    fn with_root(&self, root: fba::ArrayNode) -> Self {
350        let mut this = self.clone();
351        this.flatbuffer_loc = root._tab.loc();
352        this
353    }
354}
355
356impl TryFrom<ByteBuffer> for ArrayParts {
357    type Error = VortexError;
358
359    fn try_from(value: ByteBuffer) -> Result<Self, Self::Error> {
360        // The final 4 bytes contain the length of the flatbuffer.
361        if value.len() < 4 {
362            vortex_bail!("ArrayParts buffer is too short");
363        }
364
365        // We align each buffer individually, so we remove alignment requirements on the buffer.
366        let value = value.aligned(Alignment::none());
367
368        let fb_length = u32::try_from_le_bytes(&value.as_slice()[value.len() - 4..])? as usize;
369        let fb_offset = value.len() - 4 - fb_length;
370        let fb_buffer = value.slice(fb_offset..fb_offset + fb_length);
371        let fb_buffer = FlatBuffer::align_from(fb_buffer);
372
373        let fb_array = root::<fba::Array>(fb_buffer.as_ref())?;
374        let fb_root = fb_array.root().vortex_expect("Array must have a root node");
375
376        let mut offset = 0;
377        let buffers: Arc<[ByteBuffer]> = fb_array
378            .buffers()
379            .unwrap_or_default()
380            .iter()
381            .map(|fb_buffer| {
382                // Skip padding
383                offset += fb_buffer.padding() as usize;
384
385                let buffer_len = fb_buffer.length() as usize;
386
387                // Extract a buffer and ensure it's aligned, copying if necessary
388                let buffer = value
389                    .slice(offset..(offset + buffer_len))
390                    .aligned(Alignment::from_exponent(fb_buffer.alignment_exponent()));
391
392                offset += buffer_len;
393                buffer
394            })
395            .collect();
396
397        Ok(ArrayParts {
398            flatbuffer: fb_buffer.clone(),
399            flatbuffer_loc: fb_root._tab.loc(),
400            buffers,
401        })
402    }
403}