vortex_array/serde.rs

use std::fmt::{Debug, Formatter};
use std::iter;
use std::sync::Arc;

use flatbuffers::{FlatBufferBuilder, Follow, WIPOffset, root};
use itertools::Itertools;
use vortex_buffer::{Alignment, ByteBuffer};
use vortex_dtype::{DType, TryFromBytes};
use vortex_error::{
    VortexError, VortexExpect, VortexResult, vortex_bail, vortex_err, vortex_panic,
};
use vortex_flatbuffers::array::Compression;
use vortex_flatbuffers::{
    FlatBuffer, FlatBufferRoot, ReadFlatBuffer, WriteFlatBuffer, array as fba,
};

use crate::stats::StatsSet;
use crate::{Array, ArrayContext, ArrayRef, ArrayVisitor, ArrayVisitorExt};

/// Options for serializing an array.
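///
/// A hedged example of non-default options: when appending to a file or stream that already
/// contains `current_len` bytes (a hypothetical variable), the offset lets `serialize` compute
/// padding relative to that position:
///
/// ```ignore
/// let options = SerializeOptions { offset: current_len, include_padding: true };
/// ```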
#[derive(Default, Debug)]
pub struct SerializeOptions {
    /// The starting position within an external stream or file. This offset is used to compute
    /// appropriate padding to enable zero-copy reads.
    pub offset: usize,
    /// Whether to include sufficient zero-copy padding.
    pub include_padding: bool,
}

impl dyn Array + '_ {
    /// Serialize the array into a sequence of byte buffers that should be written contiguously.
    /// This function returns a vec to avoid copying data buffers.
    ///
    /// Optionally, padding can be included to guarantee buffer alignment and ensure zero-copy
    /// reads within the context of an external file or stream. In this case, the alignment of
    /// the first byte buffer should be respected when writing the buffers to the stream or file.
    ///
    /// The format of this blob is a sequence of data buffers, each possibly prefixed with
    /// padding, followed by a flatbuffer containing an [`fba::Array`] message, and ending with a
    /// little-endian u32 describing the length of the flatbuffer message.
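    ///
    /// A minimal round-trip sketch (assuming `array: ArrayRef` and `ctx: ArrayContext` bindings
    /// in the caller's scope, and that the returned buffers are concatenated verbatim):
    ///
    /// ```ignore
    /// let buffers = array.serialize(&ctx, &SerializeOptions::default());
    /// let mut blob = Vec::new();
    /// for buffer in &buffers {
    ///     blob.extend_from_slice(buffer.as_slice());
    /// }
    /// let parts = ArrayParts::try_from(ByteBuffer::from(blob))?;
    /// let decoded = parts.decode(&ctx, array.dtype().clone(), array.len())?;
    /// ```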
    pub fn serialize(&self, ctx: &ArrayContext, options: &SerializeOptions) -> Vec<ByteBuffer> {
        // Collect all array buffers
        let mut array_buffers = vec![];
        for a in self.depth_first_traversal() {
            for buffer in a.buffers() {
                array_buffers.push(buffer);
            }
        }

        // Allocate result buffers, including a possible padding buffer for each.
        let mut buffers = vec![];
        let mut fb_buffers = Vec::with_capacity(array_buffers.len());

        // Find the maximum required buffer alignment; it determines both the alignment of the
        // leading marker buffer and the size of the shared padding buffer.
        let max_alignment = array_buffers
            .iter()
            .map(|buf| buf.alignment())
            .chain(iter::once(FlatBuffer::alignment()))
            .max()
            .unwrap_or_else(FlatBuffer::alignment);

        // Create a shared buffer of zeros we can use for padding
        let zeros = ByteBuffer::zeroed(*max_alignment);

        // Push an empty buffer with the maximum alignment so that callers know how to align the
        // first byte they write. All subsequent buffers are pushed with 1-byte alignment and
        // rely on explicit padding instead.
        buffers.push(ByteBuffer::zeroed_aligned(0, max_alignment));

        // Keep track of where we are in the "file" to calculate padding.
        let mut pos = options.offset;

        // Push all the array buffers with padding as necessary.
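        // The padding needed is the distance from `pos` to the next multiple of the buffer's
        // alignment. For example (illustrative numbers only): at pos = 13 with an 8-byte
        // alignment, next_multiple_of(8) = 16, so 3 zero bytes are emitted before the buffer.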
        for buffer in array_buffers {
            let padding = if options.include_padding {
                let padding = pos.next_multiple_of(*buffer.alignment()) - pos;
                if padding > 0 {
                    pos += padding;
                    buffers.push(zeros.slice(0..padding));
                }
                padding
            } else {
                0
            };

            fb_buffers.push(fba::Buffer::new(
                u16::try_from(padding).vortex_expect("padding fits into u16"),
                buffer.alignment().exponent(),
                Compression::None,
                u32::try_from(buffer.len()).vortex_expect("buffers fit into u32"),
            ));

            pos += buffer.len();
            buffers.push(buffer.aligned(Alignment::none()));
        }

        // Set up the flatbuffer builder
        let mut fbb = FlatBufferBuilder::new();
        let root = ArrayNodeFlatBuffer::new(ctx, self);
        let fb_root = root.write_flatbuffer(&mut fbb);
        let fb_buffers = fbb.create_vector(&fb_buffers);
        let fb_array = fba::Array::create(
            &mut fbb,
            &fba::ArrayArgs {
                root: Some(fb_root),
                buffers: Some(fb_buffers),
            },
        );
        fbb.finish_minimal(fb_array);
        let (fb_vec, fb_start) = fbb.collapse();
        let fb_end = fb_vec.len();
        let fb_buffer = ByteBuffer::from(fb_vec).slice(fb_start..fb_end);
        let fb_length = fb_buffer.len();

        if options.include_padding {
            let padding = pos.next_multiple_of(*FlatBuffer::alignment()) - pos;
            if padding > 0 {
                buffers.push(zeros.slice(0..padding));
            }
        }
        buffers.push(fb_buffer);

        // Finally, append the length of the flatbuffer as a little-endian u32.
        buffers.push(ByteBuffer::from(
            u32::try_from(fb_length)
                .vortex_expect("flatbuffer length fits into u32")
                .to_le_bytes()
                .to_vec(),
        ));

        buffers
    }
}

/// A utility struct for creating an [`fba::ArrayNode`] flatbuffer.
pub struct ArrayNodeFlatBuffer<'a> {
    ctx: &'a ArrayContext,
    array: &'a dyn Array,
    buffer_idx: u16,
}

impl<'a> ArrayNodeFlatBuffer<'a> {
    pub fn new(ctx: &'a ArrayContext, array: &'a dyn Array) -> Self {
        Self {
            ctx,
            array,
            buffer_idx: 0,
        }
    }
}

impl FlatBufferRoot for ArrayNodeFlatBuffer<'_> {}

impl WriteFlatBuffer for ArrayNodeFlatBuffer<'_> {
    type Target<'t> = fba::ArrayNode<'t>;

    fn write_flatbuffer<'fb>(
        &self,
        fbb: &mut FlatBufferBuilder<'fb>,
    ) -> WIPOffset<Self::Target<'fb>> {
        let encoding = self.ctx.encoding_idx(&self.array.vtable());
        let metadata = self
            .array
            .metadata()
            .map(|bytes| fbb.create_vector(bytes.as_slice()));

        // Assign buffer indices for all child arrays.
        let nbuffers = u16::try_from(self.array.nbuffers())
            .vortex_expect("Array can have at most u16::MAX buffers");
        let child_buffer_idx = self.buffer_idx + nbuffers;

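        // This node's own buffers occupy indices [buffer_idx, buffer_idx + nbuffers); each child
        // subtree then claims the next `nbuffers_recursive()` indices. These indices refer to
        // positions in the flat buffer list produced by `serialize`, so the two traversals must
        // agree on ordering.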
        let children = self
            .array
            .children()
            .iter()
            .scan(child_buffer_idx, |buffer_idx, child| {
                // Write the child node with its starting buffer index, then advance the index
                // past all of the buffers in its subtree.
                let msg = ArrayNodeFlatBuffer {
                    ctx: self.ctx,
                    array: child,
                    buffer_idx: *buffer_idx,
                }
                .write_flatbuffer(fbb);
                *buffer_idx = u16::try_from(child.nbuffers_recursive())
                    .ok()
                    .and_then(|nbuffers| nbuffers.checked_add(*buffer_idx))
                    .vortex_expect("Too many buffers (u16) for Array");
                Some(msg)
            })
            .collect_vec();
        let children = Some(fbb.create_vector(&children));

        let buffers = Some(fbb.create_vector_from_iter((0..nbuffers).map(|i| i + self.buffer_idx)));
        let stats = Some(self.array.statistics().to_owned().write_flatbuffer(fbb));

        fba::ArrayNode::create(
            fbb,
            &fba::ArrayNodeArgs {
                encoding,
                metadata,
                children,
                buffers,
                stats,
            },
        )
    }
}

/// [`ArrayParts`] represents a parsed, but not yet decoded, serialized [`Array`].
/// It contains all the information from the serialized form and nothing more: it is missing a
/// [`DType`] and `len`, and the `encoding_id` has not yet been resolved to a concrete vtable.
///
/// An [`ArrayParts`] can be fully decoded into an [`ArrayRef`] using the `decode` function.
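///
/// A hedged decoding sketch (assuming `bytes: ByteBuffer`, `ctx: ArrayContext`, `dtype: DType`,
/// and `len: usize` are supplied by the caller, e.g. read from a file footer):
///
/// ```ignore
/// let parts = ArrayParts::try_from(bytes)?;
/// let array = parts.decode(&ctx, dtype, len)?;
/// ```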
#[derive(Clone)]
pub struct ArrayParts {
    // Typed as fba::ArrayNode
    flatbuffer: FlatBuffer,
    // The location of the current fba::ArrayNode
    flatbuffer_loc: usize,
    buffers: Arc<[ByteBuffer]>,
}

impl Debug for ArrayParts {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ArrayParts")
            .field("encoding_id", &self.encoding_id())
            .field(
                "children",
                &(0..self.nchildren()).map(|i| self.child(i)).collect_vec(),
            )
            .field(
                "buffers",
                &(0..self.nbuffers()).map(|i| self.buffer(i).ok()).collect_vec(),
            )
            .field("metadata", &self.metadata())
            .finish()
    }
}

impl ArrayParts {
    /// Decode an [`ArrayParts`] into an [`ArrayRef`].
    pub fn decode(&self, ctx: &ArrayContext, dtype: DType, len: usize) -> VortexResult<ArrayRef> {
        let encoding_id = self.flatbuffer().encoding();
        let vtable = ctx
            .lookup_encoding(encoding_id)
            .ok_or_else(|| vortex_err!("Unknown encoding: {}", encoding_id))?;
        let decoded = vtable.decode(self, ctx, dtype, len)?;
        assert_eq!(
            decoded.len(),
            len,
            "Array decoded from {} has incorrect length {}, expected {}",
            vtable.id(),
            decoded.len(),
            len
        );
        assert_eq!(
            decoded.encoding(),
            vtable.id(),
            "Array decoded from {} has incorrect encoding {}",
            vtable.id(),
            decoded.encoding(),
        );

        // Populate statistics from the serialized array.
        if let Some(stats) = self.flatbuffer().stats() {
            let decoded_statistics = decoded.statistics();
            StatsSet::read_flatbuffer(&stats)?
                .into_iter()
                .for_each(|(stat, val)| decoded_statistics.set(stat, val));
        }

        Ok(decoded)
    }

    /// Returns the array's encoding ID.
    pub fn encoding_id(&self) -> u16 {
        self.flatbuffer().encoding()
    }

    /// Returns the array metadata bytes.
    pub fn metadata(&self) -> Option<&[u8]> {
        self.flatbuffer()
            .metadata()
            .map(|metadata| metadata.bytes())
    }

    /// Returns the number of children.
    pub fn nchildren(&self) -> usize {
        self.flatbuffer()
            .children()
            .map_or(0, |children| children.len())
    }

    /// Returns the nth child of the array.
    pub fn child(&self, idx: usize) -> ArrayParts {
        let children = self
            .flatbuffer()
            .children()
            .vortex_expect("Expected array to have children");
        if idx >= children.len() {
            vortex_panic!(
                "Invalid child index {} for array with {} children",
                idx,
                children.len()
            );
        }
        self.with_root(children.get(idx))
    }

    /// Returns the number of buffers.
    pub fn nbuffers(&self) -> usize {
        self.flatbuffer()
            .buffers()
            .map_or(0, |buffers| buffers.len())
    }

    /// Returns the nth buffer of the current array.
    pub fn buffer(&self, idx: usize) -> VortexResult<ByteBuffer> {
        let buffer_idx = self
            .flatbuffer()
            .buffers()
            .ok_or_else(|| vortex_err!("Array has no buffers"))?
            .get(idx);
        self.buffers
            .get(buffer_idx as usize)
            .cloned()
            .ok_or_else(|| {
                vortex_err!(
                    "Invalid buffer index {} for array with {} buffers",
                    buffer_idx,
                    self.nbuffers()
                )
            })
    }

    /// Returns the [`fba::ArrayNode`] flatbuffer for this [`ArrayParts`].
    fn flatbuffer(&self) -> fba::ArrayNode {
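        // SAFETY: `flatbuffer_loc` always comes from a table location inside `self.flatbuffer`:
        // either the verified root node obtained in `try_from`, or one of its children via
        // `with_root`.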
        unsafe { fba::ArrayNode::follow(self.flatbuffer.as_ref(), self.flatbuffer_loc) }
    }

    /// Returns a new [`ArrayParts`] with the given node as the root.
    // TODO(ngates): we may want a wrapper that avoids this clone.
    fn with_root(&self, root: fba::ArrayNode) -> Self {
        let mut this = self.clone();
        this.flatbuffer_loc = root._tab.loc();
        this
    }
}

impl TryFrom<ByteBuffer> for ArrayParts {
    type Error = VortexError;

    fn try_from(value: ByteBuffer) -> Result<Self, Self::Error> {
        // The final 4 bytes contain the length of the flatbuffer.
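        // Expected layout, mirroring `serialize`:
        //   [padding?][data buffers]...[padding?][fba::Array flatbuffer][u32 LE flatbuffer length]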
        if value.len() < 4 {
            vortex_bail!("ArrayParts buffer is too short");
        }

        // Each buffer is re-aligned individually below, so we can drop any alignment
        // requirement on the input buffer.
        let value = value.aligned(Alignment::none());

        let fb_length = u32::try_from_le_bytes(&value.as_slice()[value.len() - 4..])? as usize;
        if value.len() < 4 + fb_length {
            vortex_bail!("ArrayParts buffer is too short for flatbuffer");
        }

        let fb_offset = value.len() - 4 - fb_length;
        let fb_buffer = value.slice(fb_offset..fb_offset + fb_length);
        let fb_buffer = FlatBuffer::align_from(fb_buffer);

        let fb_array = root::<fba::Array>(fb_buffer.as_ref())?;
        let fb_root = fb_array.root().vortex_expect("Array must have a root node");

        let mut offset = 0;
        let buffers: Arc<[ByteBuffer]> = fb_array
            .buffers()
            .unwrap_or_default()
            .iter()
            .map(|fb_buffer| {
                // Skip padding
                offset += fb_buffer.padding() as usize;

                let buffer_len = fb_buffer.length() as usize;

                // Extract a buffer and ensure it's aligned, copying if necessary
                let buffer = value
                    .slice(offset..(offset + buffer_len))
                    .aligned(Alignment::from_exponent(fb_buffer.alignment_exponent()));

                offset += buffer_len;
                buffer
            })
            .collect();

        Ok(ArrayParts {
            flatbuffer: fb_buffer.clone(),
            flatbuffer_loc: fb_root._tab.loc(),
            buffers,
        })
    }
}