1use std::fmt::{Debug, Formatter};
2use std::iter;
3use std::sync::Arc;
4
5use flatbuffers::{FlatBufferBuilder, Follow, WIPOffset, root};
6use itertools::Itertools;
7use vortex_buffer::{Alignment, ByteBuffer};
8use vortex_dtype::{DType, TryFromBytes};
9use vortex_error::{
10 VortexError, VortexExpect, VortexResult, vortex_bail, vortex_err, vortex_panic,
11};
12use vortex_flatbuffers::array::Compression;
13use vortex_flatbuffers::{
14 FlatBuffer, FlatBufferRoot, ReadFlatBuffer, WriteFlatBuffer, array as fba,
15};
16
17use crate::stats::StatsSet;
18use crate::{Array, ArrayContext, ArrayRef, ArrayVisitor, ArrayVisitorExt};
19
/// Options controlling how an array is serialized into a sequence of byte buffers.
#[derive(Default, Debug)]
pub struct SerializeOptions {
    /// The byte position at which the serialized message is assumed to begin; alignment
    /// padding is computed relative to this offset.
    pub offset: usize,
    /// If true, zero-filled padding buffers are inserted so each data buffer (and the
    /// trailing flatbuffer) starts at its preferred alignment.
    pub include_padding: bool,
}
29
30impl dyn Array + '_ {
31 pub fn serialize(
42 &self,
43 ctx: &ArrayContext,
44 options: &SerializeOptions,
45 ) -> VortexResult<Vec<ByteBuffer>> {
46 let mut array_buffers = vec![];
48 for a in self.depth_first_traversal() {
49 for buffer in a.buffers() {
50 array_buffers.push(buffer);
51 }
52 }
53
54 let mut buffers = vec![];
56 let mut fb_buffers = Vec::with_capacity(buffers.capacity());
57
58 let max_alignment = array_buffers
60 .iter()
61 .map(|buf| buf.alignment())
62 .chain(iter::once(FlatBuffer::alignment()))
63 .max()
64 .unwrap_or_else(FlatBuffer::alignment);
65
66 let zeros = ByteBuffer::zeroed(*max_alignment);
68
69 buffers.push(ByteBuffer::zeroed_aligned(0, max_alignment));
72
73 let mut pos = options.offset;
75
76 for buffer in array_buffers {
78 let padding = if options.include_padding {
79 let padding = pos.next_multiple_of(*buffer.alignment()) - pos;
80 if padding > 0 {
81 pos += padding;
82 buffers.push(zeros.slice(0..padding));
83 }
84 padding
85 } else {
86 0
87 };
88
89 fb_buffers.push(fba::Buffer::new(
90 u16::try_from(padding).vortex_expect("padding fits into u16"),
91 buffer.alignment().exponent(),
92 Compression::None,
93 u32::try_from(buffer.len())
94 .map_err(|_| vortex_err!("All buffers must fit into u32 for serialization"))?,
95 ));
96
97 pos += buffer.len();
98 buffers.push(buffer.aligned(Alignment::none()));
99 }
100
101 let mut fbb = FlatBufferBuilder::new();
103 let root = ArrayNodeFlatBuffer::try_new(ctx, self)?;
104 let fb_root = root.write_flatbuffer(&mut fbb);
105 let fb_buffers = fbb.create_vector(&fb_buffers);
106 let fb_array = fba::Array::create(
107 &mut fbb,
108 &fba::ArrayArgs {
109 root: Some(fb_root),
110 buffers: Some(fb_buffers),
111 },
112 );
113 fbb.finish_minimal(fb_array);
114 let (fb_vec, fb_start) = fbb.collapse();
115 let fb_end = fb_vec.len();
116 let fb_buffer = ByteBuffer::from(fb_vec).slice(fb_start..fb_end);
117 let fb_length = fb_buffer.len();
118
119 if options.include_padding {
120 let padding = pos.next_multiple_of(*FlatBuffer::alignment()) - pos;
121 if padding > 0 {
122 buffers.push(zeros.slice(0..padding));
123 }
124 }
125 buffers.push(fb_buffer);
126
127 buffers.push(ByteBuffer::from(
129 u32::try_from(fb_length)
130 .map_err(|_| vortex_err!("Array metadata flatbuffer must fit into u32 for serialization. Array encoding tree is too large."))?
131 .to_le_bytes()
132 .to_vec(),
133 ));
134
135 Ok(buffers)
136 }
137}
138
/// Writer for one node of an array tree as an `fba::ArrayNode` flatbuffer, tracking the
/// index of the node's first buffer within the flattened, depth-first buffer list.
pub struct ArrayNodeFlatBuffer<'a> {
    /// Context used to map encodings to their serialized indices.
    ctx: &'a ArrayContext,
    /// The array node being written.
    array: &'a dyn Array,
    /// Index of this node's first buffer in the depth-first flattened buffer list.
    buffer_idx: u16,
}
145
146impl<'a> ArrayNodeFlatBuffer<'a> {
147 pub fn try_new(ctx: &'a ArrayContext, array: &'a dyn Array) -> VortexResult<Self> {
148 for child in array.depth_first_traversal() {
150 if child.metadata()?.is_none() {
151 vortex_bail!(
152 "Array {} does not support serialization",
153 child.encoding_id()
154 );
155 }
156 }
157 Ok(Self {
158 ctx,
159 array,
160 buffer_idx: 0,
161 })
162 }
163}
164
// Marker impl: an ArrayNodeFlatBuffer may be written as the root of a flatbuffer message.
impl FlatBufferRoot for ArrayNodeFlatBuffer<'_> {}
166
167impl WriteFlatBuffer for ArrayNodeFlatBuffer<'_> {
168 type Target<'t> = fba::ArrayNode<'t>;
169
170 fn write_flatbuffer<'fb>(
171 &self,
172 fbb: &mut FlatBufferBuilder<'fb>,
173 ) -> WIPOffset<Self::Target<'fb>> {
174 let encoding = self.ctx.encoding_idx(&self.array.encoding());
175 let metadata = self
176 .array
177 .metadata()
178 .vortex_expect("Failed to serialize metadata")
180 .vortex_expect("Validated that all arrays support serialization");
181 let metadata = Some(fbb.create_vector(metadata.as_slice()));
182
183 let nbuffers = u16::try_from(self.array.nbuffers())
185 .vortex_expect("Array can have at most u16::MAX buffers");
186 let child_buffer_idx = self.buffer_idx + nbuffers;
187
188 let children = self
189 .array
190 .children()
191 .iter()
192 .scan(child_buffer_idx, |buffer_idx, child| {
193 let msg = ArrayNodeFlatBuffer {
195 ctx: self.ctx,
196 array: child,
197 buffer_idx: *buffer_idx,
198 }
199 .write_flatbuffer(fbb);
200 *buffer_idx = u16::try_from(child.nbuffers_recursive())
201 .ok()
202 .and_then(|nbuffers| nbuffers.checked_add(*buffer_idx))
203 .vortex_expect("Too many buffers (u16) for Array");
204 Some(msg)
205 })
206 .collect_vec();
207 let children = Some(fbb.create_vector(&children));
208
209 let buffers = Some(fbb.create_vector_from_iter((0..nbuffers).map(|i| i + self.buffer_idx)));
210 let stats = Some(self.array.statistics().to_owned().write_flatbuffer(fbb));
211
212 fba::ArrayNode::create(
213 fbb,
214 &fba::ArrayNodeArgs {
215 encoding,
216 metadata,
217 children,
218 buffers,
219 stats,
220 },
221 )
222 }
223}
224
/// A lazy accessor for the children of an array being decoded, allowing encodings to
/// decode only the children they need.
pub trait ArrayChildren {
    /// Returns the child at `index`, decoded with the given dtype and length.
    fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef>;

    /// The number of children.
    fn len(&self) -> usize;

    /// Returns true if there are no children.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
241
/// A deserialized-but-not-yet-decoded array message: a flatbuffer describing the array
/// tree plus the shared list of data buffers. Cheap to clone — children produced via
/// `child()` share the same flatbuffer and buffer list, differing only in location.
#[derive(Clone)]
pub struct ArrayParts {
    // The flatbuffer holding the whole array tree.
    flatbuffer: FlatBuffer,
    // Byte location of this node's `fba::ArrayNode` table within `flatbuffer`.
    flatbuffer_loc: usize,
    // All data buffers of the whole tree, indexed by the flatbuffer's buffer indices.
    buffers: Arc<[ByteBuffer]>,
}
256
257impl Debug for ArrayParts {
258 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
259 f.debug_struct("ArrayParts")
260 .field("encoding_id", &self.encoding_id())
261 .field("children", &(0..self.nchildren()).map(|i| self.child(i)))
262 .field(
263 "buffers",
264 &(0..self.nbuffers()).map(|i| self.buffer(i).ok()),
265 )
266 .field("metadata", &self.metadata())
267 .finish()
268 }
269}
270
impl ArrayParts {
    /// Decodes these parts into an [`ArrayRef`] with the given dtype and length.
    ///
    /// # Panics
    ///
    /// Panics (via `assert_eq!`) if the decoded array disagrees with the requested
    /// length, dtype, or resolved encoding — these indicate a corrupt message or a
    /// buggy encoding implementation.
    pub fn decode(&self, ctx: &ArrayContext, dtype: &DType, len: usize) -> VortexResult<ArrayRef> {
        // Resolve the encoding id recorded in the flatbuffer against the context.
        let encoding_id = self.flatbuffer().encoding();
        let vtable = ctx
            .lookup_encoding(encoding_id)
            .ok_or_else(|| vortex_err!("Unknown encoding: {}", encoding_id))?;

        // Materialize this node's buffers up-front; children remain lazy.
        let buffers: Vec<_> = (0..self.nbuffers())
            .map(|idx| self.buffer(idx))
            .try_collect()?;

        let children = ArrayPartsChildren { parts: self, ctx };

        let decoded = vtable.build(dtype, len, self.metadata(), &buffers, &children)?;

        assert_eq!(
            decoded.len(),
            len,
            "Array decoded from {} has incorrect length {}, expected {}",
            vtable.id(),
            decoded.len(),
            len
        );
        assert_eq!(
            decoded.dtype(),
            dtype,
            "Array decoded from {} has incorrect dtype {}, expected {}",
            vtable.id(),
            decoded.dtype(),
            dtype,
        );
        assert_eq!(
            decoded.encoding_id(),
            vtable.id(),
            "Array decoded from {} has incorrect encoding {}",
            vtable.id(),
            decoded.encoding_id(),
        );

        // Restore any statistics that were serialized alongside the array.
        if let Some(stats) = self.flatbuffer().stats() {
            let decoded_statistics = decoded.statistics();
            StatsSet::read_flatbuffer(&stats)?
                .into_iter()
                .for_each(|(stat, val)| decoded_statistics.set(stat, val));
        }

        Ok(decoded)
    }

    /// The encoding id of this node, as recorded in the flatbuffer.
    pub fn encoding_id(&self) -> u16 {
        self.flatbuffer().encoding()
    }

    /// The opaque metadata bytes of this node (empty slice if none were written).
    pub fn metadata(&self) -> &[u8] {
        self.flatbuffer()
            .metadata()
            .map(|metadata| metadata.bytes())
            .unwrap_or(&[])
    }

    /// The number of child nodes.
    pub fn nchildren(&self) -> usize {
        self.flatbuffer()
            .children()
            .map_or(0, |children| children.len())
    }

    /// Returns the child at `idx` as a new `ArrayParts` sharing this one's flatbuffer
    /// and buffer list.
    ///
    /// # Panics
    ///
    /// Panics if `idx` is out of bounds or the node has no children vector.
    pub fn child(&self, idx: usize) -> ArrayParts {
        let children = self
            .flatbuffer()
            .children()
            .vortex_expect("Expected array to have children");
        if idx >= children.len() {
            vortex_panic!(
                "Invalid child index {} for array with {} children",
                idx,
                children.len()
            );
        }
        self.with_root(children.get(idx))
    }

    /// The number of buffers referenced by this node.
    pub fn nbuffers(&self) -> usize {
        self.flatbuffer()
            .buffers()
            .map_or(0, |buffers| buffers.len())
    }

    /// Returns this node's `idx`-th buffer, resolved through the node's buffer-index
    /// table into the shared buffer list.
    pub fn buffer(&self, idx: usize) -> VortexResult<ByteBuffer> {
        let buffer_idx = self
            .flatbuffer()
            .buffers()
            .ok_or_else(|| vortex_err!("Array has no buffers"))?
            .get(idx);
        self.buffers
            .get(buffer_idx as usize)
            .cloned()
            .ok_or_else(|| {
                vortex_err!(
                    "Invalid buffer index {} for array with {} buffers",
                    buffer_idx,
                    self.nbuffers()
                )
            })
    }

    /// Follows this parts' recorded location to its `ArrayNode` table.
    fn flatbuffer(&self) -> fba::ArrayNode {
        // SAFETY: `flatbuffer_loc` is either the root node location validated by
        // `root::<fba::Array>` in `TryFrom<ByteBuffer>`, or a child table location taken
        // from an already-followed node in `with_root`, so it points at a valid
        // `ArrayNode` table within `flatbuffer`.
        unsafe { fba::ArrayNode::follow(self.flatbuffer.as_ref(), self.flatbuffer_loc) }
    }

    /// Returns a clone of self re-rooted at the given (already-followed) node.
    fn with_root(&self, root: fba::ArrayNode) -> Self {
        let mut this = self.clone();
        this.flatbuffer_loc = root._tab.loc();
        this
    }
}
397
/// An [`ArrayChildren`] implementation backed by an [`ArrayParts`] tree, decoding
/// children on demand with a shared context.
struct ArrayPartsChildren<'a> {
    parts: &'a ArrayParts,
    ctx: &'a ArrayContext,
}
402
403impl ArrayChildren for ArrayPartsChildren<'_> {
404 fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef> {
405 self.parts.child(index).decode(self.ctx, dtype, len)
406 }
407
408 fn len(&self) -> usize {
409 self.parts.nchildren()
410 }
411}
412
impl TryFrom<ByteBuffer> for ArrayParts {
    type Error = VortexError;

    /// Parses an [`ArrayParts`] from a serialized message produced by
    /// `Array::serialize`: `[data buffers] [flatbuffer] [u32 LE flatbuffer length]`.
    fn try_from(value: ByteBuffer) -> Result<Self, Self::Error> {
        // The message must at least contain the trailing u32 length.
        if value.len() < 4 {
            vortex_bail!("ArrayParts buffer is too short");
        }

        // Drop alignment so slicing at arbitrary byte offsets is permitted.
        let value = value.aligned(Alignment::none());

        // Read the trailing length and verify the flatbuffer fits before it.
        let fb_length = u32::try_from_le_bytes(&value.as_slice()[value.len() - 4..])? as usize;
        if value.len() < 4 + fb_length {
            vortex_bail!("ArrayParts buffer is too short for flatbuffer");
        }

        // The flatbuffer sits immediately before the trailing length.
        let fb_offset = value.len() - 4 - fb_length;
        let fb_buffer = value.slice(fb_offset..fb_offset + fb_length);
        let fb_buffer = FlatBuffer::align_from(fb_buffer);

        // `root` verifies the flatbuffer; then grab the root array node.
        let fb_array = root::<fba::Array>(fb_buffer.as_ref())?;
        let fb_root = fb_array.root().vortex_expect("Array must have a root node");

        // Reconstruct each data buffer from its recorded padding, length, and alignment
        // exponent, walking forward from the start of the message.
        let mut offset = 0;
        let buffers: Arc<[ByteBuffer]> = fb_array
            .buffers()
            .unwrap_or_default()
            .iter()
            .map(|fb_buffer| {
                // Skip any zero padding inserted to align this buffer.
                offset += fb_buffer.padding() as usize;

                let buffer_len = fb_buffer.length() as usize;

                // Restore the alignment the buffer was serialized with.
                let buffer = value
                    .slice(offset..(offset + buffer_len))
                    .aligned(Alignment::from_exponent(fb_buffer.alignment_exponent()));

                offset += buffer_len;
                buffer
            })
            .collect();

        Ok(ArrayParts {
            flatbuffer: fb_buffer.clone(),
            flatbuffer_loc: fb_root._tab.loc(),
            buffers,
        })
    }
}