1use std::fmt::{Debug, Formatter};
5use std::iter;
6use std::sync::Arc;
7
8use flatbuffers::{FlatBufferBuilder, Follow, WIPOffset, root};
9use itertools::Itertools;
10use vortex_buffer::{Alignment, ByteBuffer};
11use vortex_dtype::{DType, TryFromBytes};
12use vortex_error::{
13 VortexError, VortexExpect, VortexResult, vortex_bail, vortex_err, vortex_panic,
14};
15use vortex_flatbuffers::array::Compression;
16use vortex_flatbuffers::{
17 FlatBuffer, FlatBufferRoot, ReadFlatBuffer, WriteFlatBuffer, array as fba,
18};
19
20use crate::stats::StatsSet;
21use crate::{Array, ArrayContext, ArrayRef, ArrayVisitor, ArrayVisitorExt};
22
#[derive(Default, Debug)]
/// Options controlling how an array tree is serialized into byte buffers.
pub struct SerializeOptions {
    /// Byte position at which the serialized output will be written; used to compute
    /// alignment padding relative to the final destination (e.g. a file offset).
    pub offset: usize,
    /// When true, zero-filled padding buffers are inserted so that each data buffer
    /// starts at a position satisfying its alignment requirement.
    pub include_padding: bool,
}
32
33impl dyn Array + '_ {
34 pub fn serialize(
45 &self,
46 ctx: &ArrayContext,
47 options: &SerializeOptions,
48 ) -> VortexResult<Vec<ByteBuffer>> {
49 let array_buffers = self
51 .depth_first_traversal()
52 .flat_map(|f| f.buffers())
53 .collect::<Vec<_>>();
54
55 let mut buffers = vec![];
57 let mut fb_buffers = Vec::with_capacity(buffers.capacity());
58
59 let max_alignment = array_buffers
61 .iter()
62 .map(|buf| buf.alignment())
63 .chain(iter::once(FlatBuffer::alignment()))
64 .max()
65 .unwrap_or_else(FlatBuffer::alignment);
66
67 let zeros = ByteBuffer::zeroed(*max_alignment);
69
70 buffers.push(ByteBuffer::zeroed_aligned(0, max_alignment));
73
74 let mut pos = options.offset;
76
77 for buffer in array_buffers {
79 let padding = if options.include_padding {
80 let padding = pos.next_multiple_of(*buffer.alignment()) - pos;
81 if padding > 0 {
82 pos += padding;
83 buffers.push(zeros.slice(0..padding));
84 }
85 padding
86 } else {
87 0
88 };
89
90 fb_buffers.push(fba::Buffer::new(
91 u16::try_from(padding).vortex_expect("padding fits into u16"),
92 buffer.alignment().exponent(),
93 Compression::None,
94 u32::try_from(buffer.len())
95 .map_err(|_| vortex_err!("All buffers must fit into u32 for serialization"))?,
96 ));
97
98 pos += buffer.len();
99 buffers.push(buffer.aligned(Alignment::none()));
100 }
101
102 let mut fbb = FlatBufferBuilder::new();
104 let root = ArrayNodeFlatBuffer::try_new(ctx, self)?;
105 let fb_root = root.write_flatbuffer(&mut fbb);
106 let fb_buffers = fbb.create_vector(&fb_buffers);
107 let fb_array = fba::Array::create(
108 &mut fbb,
109 &fba::ArrayArgs {
110 root: Some(fb_root),
111 buffers: Some(fb_buffers),
112 },
113 );
114 fbb.finish_minimal(fb_array);
115 let (fb_vec, fb_start) = fbb.collapse();
116 let fb_end = fb_vec.len();
117 let fb_buffer = ByteBuffer::from(fb_vec).slice(fb_start..fb_end);
118 let fb_length = fb_buffer.len();
119
120 if options.include_padding {
121 let padding = pos.next_multiple_of(*FlatBuffer::alignment()) - pos;
122 if padding > 0 {
123 buffers.push(zeros.slice(0..padding));
124 }
125 }
126 buffers.push(fb_buffer);
127
128 buffers.push(ByteBuffer::from(
130 u32::try_from(fb_length)
131 .map_err(|_| vortex_err!("Array metadata flatbuffer must fit into u32 for serialization. Array encoding tree is too large."))?
132 .to_le_bytes()
133 .to_vec(),
134 ));
135
136 Ok(buffers)
137 }
138}
139
/// A lazily-written flatbuffer node for one array in the tree.
///
/// Holds the context used to resolve encoding indices plus the array itself;
/// `buffer_idx` is the index of this array's first buffer within the flattened,
/// depth-first buffer table of the whole message.
pub struct ArrayNodeFlatBuffer<'a> {
    ctx: &'a ArrayContext,
    array: &'a dyn Array,
    buffer_idx: u16,
}
146
147impl<'a> ArrayNodeFlatBuffer<'a> {
148 pub fn try_new(ctx: &'a ArrayContext, array: &'a dyn Array) -> VortexResult<Self> {
149 for child in array.depth_first_traversal() {
151 if child.metadata()?.is_none() {
152 vortex_bail!(
153 "Array {} does not support serialization",
154 child.encoding_id()
155 );
156 }
157 }
158 let n_buffers_recursive = array.nbuffers_recursive();
159 if n_buffers_recursive > u16::MAX as usize {
160 vortex_bail!(
161 "Array and all descendent arrays can have at most u16::MAX buffers: {}",
162 n_buffers_recursive
163 );
164 };
165 Ok(Self {
166 ctx,
167 array,
168 buffer_idx: 0,
169 })
170 }
171}
172
173impl FlatBufferRoot for ArrayNodeFlatBuffer<'_> {}
174
impl WriteFlatBuffer for ArrayNodeFlatBuffer<'_> {
    type Target<'t> = fba::ArrayNode<'t>;

    /// Recursively writes this array node and all of its children into `fbb`.
    ///
    /// Buffer indices are assigned depth-first: this node claims
    /// `self.buffer_idx..self.buffer_idx + nbuffers`, and each child subtree claims
    /// the next contiguous range. This ordering must match the buffer order produced
    /// by `serialize` above.
    fn write_flatbuffer<'fb>(
        &self,
        fbb: &mut FlatBufferBuilder<'fb>,
    ) -> WIPOffset<Self::Target<'fb>> {
        let encoding = self.ctx.encoding_idx(&self.array.encoding());
        // Both expects are safe: `try_new` already verified metadata for every node.
        let metadata = self
            .array
            .metadata()
            .vortex_expect("Failed to serialize metadata")
            .vortex_expect("Validated that all arrays support serialization");
        let metadata = Some(fbb.create_vector(metadata.as_slice()));

        let nbuffers = u16::try_from(self.array.nbuffers())
            .vortex_expect("Array can have at most u16::MAX buffers");
        // First buffer index available to child subtrees (after this node's buffers).
        let mut child_buffer_idx = self.buffer_idx + nbuffers;

        let children = &self
            .array
            .children()
            .iter()
            .map(|child| {
                let msg = ArrayNodeFlatBuffer {
                    ctx: self.ctx,
                    array: child,
                    buffer_idx: child_buffer_idx,
                }
                .write_flatbuffer(fbb);
                // Advance past every buffer in the child's subtree; checked add so
                // overflow of the u16 index space panics rather than wrapping.
                child_buffer_idx = u16::try_from(child.nbuffers_recursive())
                    .ok()
                    .and_then(|nbuffers| nbuffers.checked_add(child_buffer_idx))
                    .vortex_expect("Too many buffers (u16) for Array");
                msg
            })
            .collect::<Vec<_>>();
        let children = Some(fbb.create_vector(children));

        // This node's own buffers occupy a contiguous index range starting at buffer_idx.
        let buffers = Some(fbb.create_vector_from_iter((0..nbuffers).map(|i| i + self.buffer_idx)));
        let stats = Some(self.array.statistics().write_flatbuffer(fbb));

        fba::ArrayNode::create(
            fbb,
            &fba::ArrayNodeArgs {
                encoding,
                metadata,
                children,
                buffers,
                stats,
            },
        )
    }
}
232
/// Access to the children of an array node during decoding.
pub trait ArrayChildren {
    /// Returns the child at `index`, decoded with the given `dtype` and `len`.
    fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef>;

    /// The number of children.
    fn len(&self) -> usize;

    /// True if there are no children.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
247
#[derive(Clone)]
/// A deserialized-but-not-yet-decoded array message: the flatbuffer describing the
/// encoding tree plus the shared data buffers it references.
///
/// Cloning is cheap: the flatbuffer and the buffer table are reference-counted.
pub struct ArrayParts {
    // The flatbuffer containing the full `ArrayNode` tree.
    flatbuffer: FlatBuffer,
    // Byte offset of this node's table within `flatbuffer`; `with_root` re-points
    // a clone at a child node without copying the flatbuffer.
    flatbuffer_loc: usize,
    // All data buffers of the message, indexed by the flatbuffer's buffer table.
    buffers: Arc<[ByteBuffer]>,
}
262
263impl Debug for ArrayParts {
264 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
265 f.debug_struct("ArrayParts")
266 .field("encoding_id", &self.encoding_id())
267 .field("children", &(0..self.nchildren()).map(|i| self.child(i)))
268 .field(
269 "buffers",
270 &(0..self.nbuffers()).map(|i| self.buffer(i).ok()),
271 )
272 .field("metadata", &self.metadata())
273 .finish()
274 }
275}
276
impl ArrayParts {
    /// Decode this node into a concrete array using the encodings registered in `ctx`.
    ///
    /// The caller supplies the expected `dtype` and `len`; after building, the result
    /// is asserted to match them (a mismatch indicates a buggy encoding, so it panics
    /// rather than returning an error). Any statistics stored in the flatbuffer are
    /// copied onto the decoded array.
    ///
    /// # Errors
    ///
    /// Fails if the encoding id is unknown, a buffer index is invalid, or the
    /// encoding's `build` rejects the parts.
    pub fn decode(&self, ctx: &ArrayContext, dtype: &DType, len: usize) -> VortexResult<ArrayRef> {
        let encoding_id = self.flatbuffer().encoding();
        let vtable = ctx
            .lookup_encoding(encoding_id)
            .ok_or_else(|| vortex_err!("Unknown encoding: {}", encoding_id))?;

        // Resolve this node's buffers up front; fails fast on a bad index.
        let buffers: Vec<_> = (0..self.nbuffers())
            .map(|idx| self.buffer(idx))
            .try_collect()?;

        // Children are decoded lazily, on demand, via the ArrayChildren trait.
        let children = ArrayPartsChildren { parts: self, ctx };

        let decoded = vtable.build(dtype, len, self.metadata(), &buffers, &children)?;

        // Post-conditions: the encoding must have honored the requested shape.
        assert_eq!(
            decoded.len(),
            len,
            "Array decoded from {} has incorrect length {}, expected {}",
            vtable.id(),
            decoded.len(),
            len
        );
        assert_eq!(
            decoded.dtype(),
            dtype,
            "Array decoded from {} has incorrect dtype {}, expected {}",
            vtable.id(),
            decoded.dtype(),
            dtype,
        );
        assert_eq!(
            decoded.encoding_id(),
            vtable.id(),
            "Array decoded from {} has incorrect encoding {}",
            vtable.id(),
            decoded.encoding_id(),
        );

        // Restore any serialized statistics onto the freshly decoded array.
        if let Some(stats) = self.flatbuffer().stats() {
            let decoded_statistics = decoded.statistics();
            StatsSet::read_flatbuffer(&stats)?
                .into_iter()
                .for_each(|(stat, val)| decoded_statistics.set(stat, val));
        }

        Ok(decoded)
    }

    /// The encoding id of this node, as stored in the flatbuffer.
    pub fn encoding_id(&self) -> u16 {
        self.flatbuffer().encoding()
    }

    /// This node's opaque metadata bytes; empty if none were written.
    pub fn metadata(&self) -> &[u8] {
        self.flatbuffer()
            .metadata()
            .map(|metadata| metadata.bytes())
            .unwrap_or(&[])
    }

    /// The number of child nodes.
    pub fn nchildren(&self) -> usize {
        self.flatbuffer()
            .children()
            .map_or(0, |children| children.len())
    }

    /// Returns the child at `idx` as its own [`ArrayParts`], sharing this node's
    /// flatbuffer and buffer table.
    ///
    /// # Panics
    ///
    /// Panics if `idx` is out of range or this node has no children.
    pub fn child(&self, idx: usize) -> ArrayParts {
        let children = self
            .flatbuffer()
            .children()
            .vortex_expect("Expected array to have children");
        if idx >= children.len() {
            vortex_panic!(
                "Invalid child index {} for array with {} children",
                idx,
                children.len()
            );
        }
        self.with_root(children.get(idx))
    }

    /// The number of buffers referenced by this node (not its children).
    pub fn nbuffers(&self) -> usize {
        self.flatbuffer()
            .buffers()
            .map_or(0, |buffers| buffers.len())
    }

    /// Returns the `idx`-th buffer of this node, resolved through the flatbuffer's
    /// buffer-index table into the shared buffer list.
    ///
    /// # Errors
    ///
    /// Fails if this node has no buffer table or the stored index is out of range.
    pub fn buffer(&self, idx: usize) -> VortexResult<ByteBuffer> {
        let buffer_idx = self
            .flatbuffer()
            .buffers()
            .ok_or_else(|| vortex_err!("Array has no buffers"))?
            .get(idx);
        self.buffers
            .get(buffer_idx as usize)
            .cloned()
            .ok_or_else(|| {
                vortex_err!(
                    "Invalid buffer index {} for array with {} buffers",
                    buffer_idx,
                    self.nbuffers()
                )
            })
    }

    /// Re-reads this node's flatbuffer table at `flatbuffer_loc`.
    fn flatbuffer(&self) -> fba::ArrayNode<'_> {
        // SAFETY: `flatbuffer_loc` always comes from a table location obtained from a
        // verified root (`TryFrom<ByteBuffer>` uses `root::<fba::Array>`) or from a
        // child table of such a root via `with_root`, so it points at a valid
        // `ArrayNode` table within `self.flatbuffer`.
        unsafe { fba::ArrayNode::follow(self.flatbuffer.as_ref(), self.flatbuffer_loc) }
    }

    /// Clone of `self` re-pointed at another node in the same flatbuffer; cheap
    /// because the flatbuffer and buffer table are shared.
    fn with_root(&self, root: fba::ArrayNode) -> Self {
        let mut this = self.clone();
        this.flatbuffer_loc = root._tab.loc();
        this
    }
}
403
/// Adapter exposing an [`ArrayParts`] node's children through the [`ArrayChildren`]
/// trait, so encodings can decode children lazily during `ArrayParts::decode`.
struct ArrayPartsChildren<'a> {
    parts: &'a ArrayParts,
    ctx: &'a ArrayContext,
}
408
409impl ArrayChildren for ArrayPartsChildren<'_> {
410 fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef> {
411 self.parts.child(index).decode(self.ctx, dtype, len)
412 }
413
414 fn len(&self) -> usize {
415 self.parts.nchildren()
416 }
417}
418
impl TryFrom<ByteBuffer> for ArrayParts {
    type Error = VortexError;

    /// Parses a serialized message (the output of `serialize`, concatenated) back
    /// into [`ArrayParts`]: reads the trailing u32 little-endian flatbuffer length,
    /// locates and verifies the flatbuffer, then slices out each data buffer using
    /// the padding/length records in the buffer table.
    ///
    /// NOTE(review): buffer offsets are taken on trust from the flatbuffer; a
    /// corrupted length/padding that points past the end of `value` will panic in
    /// `slice` rather than return an error — verify this is acceptable for the
    /// callers' threat model.
    fn try_from(value: ByteBuffer) -> Result<Self, Self::Error> {
        // Need at least the trailing 4-byte length field.
        if value.len() < 4 {
            vortex_bail!("ArrayParts buffer is too short");
        }

        // Drop any alignment requirement; slices below re-apply per-buffer alignment.
        let value = value.aligned(Alignment::none());

        // The last 4 bytes are the flatbuffer's length (little-endian u32).
        let fb_length = u32::try_from_le_bytes(&value.as_slice()[value.len() - 4..])? as usize;
        if value.len() < 4 + fb_length {
            vortex_bail!("ArrayParts buffer is too short for flatbuffer");
        }

        // The flatbuffer sits immediately before the trailing length field.
        let fb_offset = value.len() - 4 - fb_length;
        let fb_buffer = value.slice(fb_offset..fb_offset + fb_length);
        let fb_buffer = FlatBuffer::align_from(fb_buffer);

        // `root` verifies the flatbuffer before we trust its contents.
        let fb_array = root::<fba::Array>(fb_buffer.as_ref())?;
        let fb_root = fb_array.root().vortex_expect("Array must have a root node");

        // Walk the buffer table, reconstructing each buffer's position from its
        // recorded padding and length (mirrors the layout written by `serialize`).
        let mut offset = 0;
        let buffers: Arc<[ByteBuffer]> = fb_array
            .buffers()
            .unwrap_or_default()
            .iter()
            .map(|fb_buffer| {
                // Skip the alignment padding emitted before this buffer (0 if none).
                offset += fb_buffer.padding() as usize;

                let buffer_len = fb_buffer.length() as usize;

                // Restore the buffer's original alignment from its stored exponent.
                let buffer = value
                    .slice(offset..(offset + buffer_len))
                    .aligned(Alignment::from_exponent(fb_buffer.alignment_exponent()));

                offset += buffer_len;
                buffer
            })
            .collect();

        Ok(ArrayParts {
            flatbuffer: fb_buffer.clone(),
            flatbuffer_loc: fb_root._tab.loc(),
            buffers,
        })
    }
}