use std::fmt::Debug;
use std::fmt::Formatter;
use std::iter;
use std::sync::Arc;

use flatbuffers::FlatBufferBuilder;
use flatbuffers::Follow;
use flatbuffers::WIPOffset;
use flatbuffers::root;
use itertools::Itertools;
use vortex_buffer::Alignment;
use vortex_buffer::ByteBuffer;
use vortex_dtype::DType;
use vortex_dtype::TryFromBytes;
use vortex_error::VortexError;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_error::vortex_err;
use vortex_error::vortex_panic;
use vortex_flatbuffers::FlatBuffer;
use vortex_flatbuffers::FlatBufferRoot;
use vortex_flatbuffers::ReadFlatBuffer;
use vortex_flatbuffers::WriteFlatBuffer;
use vortex_flatbuffers::array as fba;
use vortex_flatbuffers::array::Compression;

use crate::Array;
use crate::ArrayContext;
use crate::ArrayRef;
use crate::ArrayVisitor;
use crate::ArrayVisitorExt;
use crate::buffer::BufferHandle;
use crate::stats::StatsSet;

/// Options for serializing an [`Array`] into a sequence of byte buffers.
#[derive(Default, Debug)]
pub struct SerializeOptions {
    /// The byte offset at which the serialized buffers will be written. Alignment padding is
    /// computed relative to this position.
    pub offset: usize,
    /// Whether to emit zero-filled padding buffers so that each serialized buffer begins at its
    /// required alignment.
    pub include_padding: bool,
}

impl dyn Array + '_ {
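    /// Serialize this array into a sequence of [`ByteBuffer`]s intended to be written
    /// back-to-back.
    ///
    /// The layout produced here is: the array's buffers in depth-first order (each optionally
    /// preceded by zero padding up to its alignment), followed by a flatbuffer describing the
    /// array tree, followed by the little-endian `u32` length of that flatbuffer. This is the
    /// layout that [`ArrayParts`] parses back via its `TryFrom<ByteBuffer>` implementation.
    ///
    /// A round-trip sketch, assuming an `array: ArrayRef`, a matching `ctx: ArrayContext`, and
    /// some way of concatenating the returned buffers into one contiguous buffer:
    ///
    /// ```ignore
    /// let buffers = array.serialize(&ctx, &SerializeOptions {
    ///     offset: 0,
    ///     include_padding: true,
    /// })?;
    /// let bytes: ByteBuffer = concat(buffers); // hypothetical concatenation helper
    /// let parts = ArrayParts::try_from(bytes)?;
    /// let decoded = parts.decode(&ctx, array.dtype(), array.len())?;
    /// ```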
    pub fn serialize(
        &self,
        ctx: &ArrayContext,
        options: &SerializeOptions,
    ) -> VortexResult<Vec<ByteBuffer>> {
        // Collect the buffers of this array and all of its children in depth-first order.
        let array_buffers = self
            .depth_first_traversal()
            .flat_map(|f| f.buffers())
            .collect::<Vec<_>>();

        let mut buffers = vec![];
        let mut fb_buffers = Vec::with_capacity(array_buffers.len());

        // The widest alignment required by any buffer, or the flatbuffer alignment if larger.
        let max_alignment = array_buffers
            .iter()
            .map(|buf| buf.alignment())
            .chain(iter::once(FlatBuffer::alignment()))
            .max()
            .unwrap_or_else(FlatBuffer::alignment);

        // A scratch buffer of zeros used to carve out padding slices.
        let zeros = ByteBuffer::zeroed(*max_alignment);

        // An empty leading buffer that carries the maximum alignment of the serialized output.
        buffers.push(ByteBuffer::zeroed_aligned(0, max_alignment));

        // Track the position within the output stream so alignment padding can be computed.
        let mut pos = options.offset;

        for buffer in array_buffers {
            let padding = if options.include_padding {
                let padding = pos.next_multiple_of(*buffer.alignment()) - pos;
                if padding > 0 {
                    pos += padding;
                    buffers.push(zeros.slice(0..padding));
                }
                padding
            } else {
                0
            };

            fb_buffers.push(fba::Buffer::new(
                u16::try_from(padding).vortex_expect("padding fits into u16"),
                buffer.alignment().exponent(),
                Compression::None,
                u32::try_from(buffer.len())
                    .map_err(|_| vortex_err!("All buffers must fit into u32 for serialization"))?,
            ));

            pos += buffer.len();
            buffers.push(buffer.aligned(Alignment::none()));
        }

        // Write the flatbuffer describing the array tree.
        let mut fbb = FlatBufferBuilder::new();
        let root = ArrayNodeFlatBuffer::try_new(ctx, self)?;
        let fb_root = root.write_flatbuffer(&mut fbb);
        let fb_buffers = fbb.create_vector(&fb_buffers);
        let fb_array = fba::Array::create(
            &mut fbb,
            &fba::ArrayArgs {
                root: Some(fb_root),
                buffers: Some(fb_buffers),
            },
        );
        fbb.finish_minimal(fb_array);
        let (fb_vec, fb_start) = fbb.collapse();
        let fb_end = fb_vec.len();
        let fb_buffer = ByteBuffer::from(fb_vec).slice(fb_start..fb_end);
        let fb_length = fb_buffer.len();

        // Pad so the flatbuffer itself begins at its required alignment.
        if options.include_padding {
            let padding = pos.next_multiple_of(*FlatBuffer::alignment()) - pos;
            if padding > 0 {
                buffers.push(zeros.slice(0..padding));
            }
        }
        buffers.push(fb_buffer);

        // Finally, append the flatbuffer length as a little-endian u32 suffix.
        buffers.push(ByteBuffer::from(
            u32::try_from(fb_length)
                .map_err(|_| vortex_err!("Array metadata flatbuffer must fit into u32 for serialization. Array encoding tree is too large."))?
                .to_le_bytes()
                .to_vec(),
        ));

        Ok(buffers)
    }
}

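/// A [`WriteFlatBuffer`] wrapper that writes an array, together with its children, as an
/// `fba::ArrayNode`. The `buffer_idx` field tracks the index of the node's first buffer within
/// the flattened, depth-first buffer list produced by serialization.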
pub struct ArrayNodeFlatBuffer<'a> {
    ctx: &'a ArrayContext,
    array: &'a dyn Array,
    buffer_idx: u16,
}

impl<'a> ArrayNodeFlatBuffer<'a> {
    pub fn try_new(ctx: &'a ArrayContext, array: &'a dyn Array) -> VortexResult<Self> {
        // Every array in the tree must expose metadata in order to be serialized.
        for child in array.depth_first_traversal() {
            if child.metadata()?.is_none() {
                vortex_bail!(
                    "Array {} does not support serialization",
                    child.encoding_id()
                );
            }
        }
        let n_buffers_recursive = array.nbuffers_recursive();
        if n_buffers_recursive > u16::MAX as usize {
            vortex_bail!(
                "Array and all descendant arrays can have at most u16::MAX buffers: {}",
                n_buffers_recursive
            );
        }
        Ok(Self {
            ctx,
            array,
            buffer_idx: 0,
        })
    }
}

impl FlatBufferRoot for ArrayNodeFlatBuffer<'_> {}

impl WriteFlatBuffer for ArrayNodeFlatBuffer<'_> {
    type Target<'t> = fba::ArrayNode<'t>;

    fn write_flatbuffer<'fb>(
        &self,
        fbb: &mut FlatBufferBuilder<'fb>,
    ) -> WIPOffset<Self::Target<'fb>> {
        let encoding = self.ctx.encoding_idx(&self.array.encoding());
        let metadata = self
            .array
            .metadata()
            .vortex_expect("Failed to serialize metadata")
            .vortex_expect("Validated that all arrays support serialization");
        let metadata = Some(fbb.create_vector(metadata.as_slice()));

        // This node's buffers come first, then each child claims the next contiguous range of
        // buffer indices in depth-first order.
        let nbuffers = u16::try_from(self.array.nbuffers())
            .vortex_expect("Array can have at most u16::MAX buffers");
        let mut child_buffer_idx = self.buffer_idx + nbuffers;

        let children = &self
            .array
            .children()
            .iter()
            .map(|child| {
                let msg = ArrayNodeFlatBuffer {
                    ctx: self.ctx,
                    array: child,
                    buffer_idx: child_buffer_idx,
                }
                .write_flatbuffer(fbb);
                child_buffer_idx = u16::try_from(child.nbuffers_recursive())
                    .ok()
                    .and_then(|nbuffers| nbuffers.checked_add(child_buffer_idx))
                    .vortex_expect("Too many buffers (u16) for Array");
                msg
            })
            .collect::<Vec<_>>();
        let children = Some(fbb.create_vector(children));

        let buffers = Some(fbb.create_vector_from_iter((0..nbuffers).map(|i| i + self.buffer_idx)));
        let stats = Some(self.array.statistics().write_flatbuffer(fbb));

        fba::ArrayNode::create(
            fbb,
            &fba::ArrayNodeArgs {
                encoding,
                metadata,
                children,
                buffers,
                stats,
            },
        )
    }
}

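/// A lazy accessor to the children of a serialized array, allowing each child to be decoded on
/// demand with an expected dtype and length.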
pub trait ArrayChildren {
    /// Returns the child at `index`, decoded to the given dtype and length.
    fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef>;

    /// The number of children.
    fn len(&self) -> usize;

    /// Returns true if there are no children.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
}

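/// A lazily-decoded view over a serialized array: a flatbuffer describing the array tree, the
/// location of the current node within that flatbuffer, and the set of data buffers. Call
/// [`ArrayParts::decode`] to materialize it into an [`ArrayRef`].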
#[derive(Clone)]
pub struct ArrayParts {
    flatbuffer: FlatBuffer,
    flatbuffer_loc: usize,
    buffers: Arc<[BufferHandle]>,
}

impl Debug for ArrayParts {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ArrayParts")
            .field("encoding_id", &self.encoding_id())
            .field("children", &(0..self.nchildren()).map(|i| self.child(i)))
            .field(
                "buffers",
                &(0..self.nbuffers()).map(|i| self.buffer(i).ok()),
            )
            .field("metadata", &self.metadata())
            .finish()
    }
}

impl ArrayParts {
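    /// Decode this `ArrayParts` into an [`ArrayRef`], verifying that the decoded array matches
    /// the expected length, dtype, and encoding.
    ///
    /// A minimal sketch, assuming a serialized `bytes: ByteBuffer` and a `ctx: ArrayContext`
    /// containing the relevant encodings, plus a known `dtype` and `len`:
    ///
    /// ```ignore
    /// let parts = ArrayParts::try_from(bytes)?;
    /// let array = parts.decode(&ctx, &dtype, len)?;
    /// ```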
    pub fn decode(&self, ctx: &ArrayContext, dtype: &DType, len: usize) -> VortexResult<ArrayRef> {
        let encoding_id = self.flatbuffer().encoding();
        let vtable = ctx
            .lookup_encoding(encoding_id)
            .ok_or_else(|| vortex_err!("Unknown encoding: {}", encoding_id))?;

        let buffers: Vec<_> = (0..self.nbuffers())
            .map(|idx| self.buffer(idx))
            .try_collect()?;

        let children = ArrayPartsChildren { parts: self, ctx };

        let decoded = vtable
            .as_dyn()
            .build(dtype, len, self.metadata(), &buffers, &children)?;

        assert_eq!(
            decoded.len(),
            len,
            "Array decoded from {} has incorrect length {}, expected {}",
            vtable.id(),
            decoded.len(),
            len
        );
        assert_eq!(
            decoded.dtype(),
            dtype,
            "Array decoded from {} has incorrect dtype {}, expected {}",
            vtable.id(),
            decoded.dtype(),
            dtype,
        );
        assert_eq!(
            decoded.encoding_id(),
            vtable.id(),
            "Array decoded from {} has incorrect encoding {}",
            vtable.id(),
            decoded.encoding_id(),
        );

        // Restore any statistics that were serialized alongside the array.
        if let Some(stats) = self.flatbuffer().stats() {
            let decoded_statistics = decoded.statistics();
            StatsSet::read_flatbuffer(&stats)?
                .into_iter()
                .for_each(|(stat, val)| decoded_statistics.set(stat, val));
        }

        Ok(decoded)
    }

    /// The encoding id recorded in the flatbuffer for this array node.
    pub fn encoding_id(&self) -> u16 {
        self.flatbuffer().encoding()
    }

    /// The serialized metadata of this array node, or an empty slice if none was written.
    pub fn metadata(&self) -> &[u8] {
        self.flatbuffer()
            .metadata()
            .map(|metadata| metadata.bytes())
            .unwrap_or(&[])
    }

    /// The number of children of this array node.
    pub fn nchildren(&self) -> usize {
        self.flatbuffer()
            .children()
            .map_or(0, |children| children.len())
    }

    /// Returns the child at `idx`, panicking if the index is out of bounds.
    pub fn child(&self, idx: usize) -> ArrayParts {
        let children = self
            .flatbuffer()
            .children()
            .vortex_expect("Expected array to have children");
        if idx >= children.len() {
            vortex_panic!(
                "Invalid child index {} for array with {} children",
                idx,
                children.len()
            );
        }
        self.with_root(children.get(idx))
    }

    /// The number of buffers belonging to this array node.
    pub fn nbuffers(&self) -> usize {
        self.flatbuffer()
            .buffers()
            .map_or(0, |buffers| buffers.len())
    }

    /// Returns the buffer at `idx`, resolving the node-local index into the shared buffer list.
    pub fn buffer(&self, idx: usize) -> VortexResult<BufferHandle> {
        let buffer_idx = self
            .flatbuffer()
            .buffers()
            .ok_or_else(|| vortex_err!("Array has no buffers"))?
            .get(idx);
        self.buffers
            .get(buffer_idx as usize)
            .cloned()
            .ok_or_else(|| {
                vortex_err!(
                    "Invalid buffer index {} for array with {} buffers",
                    buffer_idx,
                    self.nbuffers()
                )
            })
    }

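    /// The length in bytes of each serialized buffer, as recorded in the `fba::Array`
    /// flatbuffer. Note that this reads the root `Array` table, so it reflects all buffers in
    /// the serialized tree, not only those of the current node.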
    pub fn buffer_lengths(&self) -> Vec<usize> {
        let fb_array = root::<fba::Array>(self.flatbuffer.as_ref())
            .vortex_expect("ArrayParts flatbuffer must be a valid Array");
        fb_array
            .buffers()
            .map(|buffers| buffers.iter().map(|b| b.length() as usize).collect())
            .unwrap_or_default()
    }

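    /// Constructs an `ArrayParts` from a serialized `fba::Array` flatbuffer alone, with no data
    /// buffers attached.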
    pub fn from_array_tree(array_tree: impl Into<ByteBuffer>) -> VortexResult<Self> {
        let fb_buffer = FlatBuffer::align_from(array_tree.into());
        let fb_array = root::<fba::Array>(fb_buffer.as_ref())?;
        let fb_root = fb_array
            .root()
            .ok_or_else(|| vortex_err!("Array must have a root node"))?;
        let flatbuffer_loc = fb_root._tab.loc();

        Ok(ArrayParts {
            flatbuffer: fb_buffer,
            flatbuffer_loc,
            buffers: Arc::new([]),
        })
    }

    /// Follows the stored location to return the `ArrayNode` for the current node.
    fn flatbuffer(&self) -> fba::ArrayNode<'_> {
        unsafe { fba::ArrayNode::follow(self.flatbuffer.as_ref(), self.flatbuffer_loc) }
    }

    /// Returns a clone of `self` re-rooted at the given `ArrayNode`, sharing the same
    /// flatbuffer and buffers.
    fn with_root(&self, root: fba::ArrayNode) -> Self {
        let mut this = self.clone();
        this.flatbuffer_loc = root._tab.loc();
        this
    }

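    /// Constructs an `ArrayParts` from a serialized `fba::Array` flatbuffer plus a single
    /// contiguous segment containing all of the (possibly padded) data buffers, in the order
    /// recorded by the flatbuffer.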
    pub fn from_flatbuffer_and_segment(
        array_tree: ByteBuffer,
        segment: BufferHandle,
    ) -> VortexResult<Self> {
        let segment = segment.try_to_bytes()?;
        let segment = segment.aligned(Alignment::none());

        let fb_buffer = FlatBuffer::align_from(array_tree);

        let (flatbuffer_loc, buffers) = {
            let fb_array = root::<fba::Array>(fb_buffer.as_ref())?;
            let fb_root = fb_array.root().vortex_expect("Array must have a root node");
            let flatbuffer_loc = fb_root._tab.loc();

            // Slice each buffer out of the segment, skipping any padding recorded ahead of it.
            let mut offset = 0;
            let buffers: Arc<[_]> = fb_array
                .buffers()
                .unwrap_or_default()
                .iter()
                .map(|fb_buf| {
                    offset += fb_buf.padding() as usize;

                    let buffer_len = fb_buf.length() as usize;

                    let buffer = segment
                        .slice(offset..(offset + buffer_len))
                        .aligned(Alignment::from_exponent(fb_buf.alignment_exponent()));

                    offset += buffer_len;
                    BufferHandle::Host(buffer)
                })
                .collect();

            (flatbuffer_loc, buffers)
        };

        Ok(ArrayParts {
            flatbuffer: fb_buffer,
            flatbuffer_loc,
            buffers,
        })
    }
}

struct ArrayPartsChildren<'a> {
    parts: &'a ArrayParts,
    ctx: &'a ArrayContext,
}

impl ArrayChildren for ArrayPartsChildren<'_> {
    fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef> {
        self.parts.child(index).decode(self.ctx, dtype, len)
    }

    fn len(&self) -> usize {
        self.parts.nchildren()
    }
}

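/// Parses a contiguous buffer laid out as `[buffers..][padding][flatbuffer][u32 LE flatbuffer
/// length]`, i.e. the concatenation of the buffers produced by `serialize`.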
impl TryFrom<ByteBuffer> for ArrayParts {
    type Error = VortexError;

    fn try_from(value: ByteBuffer) -> Result<Self, Self::Error> {
        // The last four bytes hold the flatbuffer length, so anything shorter is invalid.
        if value.len() < 4 {
            vortex_bail!("ArrayParts buffer is too short");
        }

        let value = value.aligned(Alignment::none());

        // Read the trailing little-endian u32 to locate the flatbuffer at the end of the buffer.
        let fb_length = u32::try_from_le_bytes(&value.as_slice()[value.len() - 4..])? as usize;
        if value.len() < 4 + fb_length {
            vortex_bail!("ArrayParts buffer is too short for flatbuffer");
        }

        let fb_offset = value.len() - 4 - fb_length;
        let fb_buffer = value.slice(fb_offset..fb_offset + fb_length);
        let fb_buffer = FlatBuffer::align_from(fb_buffer);

        let fb_array = root::<fba::Array>(fb_buffer.as_ref())?;
        let fb_root = fb_array.root().vortex_expect("Array must have a root node");

        // Slice each data buffer out of the front of the value, skipping recorded padding.
        let mut offset = 0;
        let buffers: Arc<[_]> = fb_array
            .buffers()
            .unwrap_or_default()
            .iter()
            .map(|fb_buffer| {
                offset += fb_buffer.padding() as usize;

                let buffer_len = fb_buffer.length() as usize;

                let buffer = value
                    .slice(offset..(offset + buffer_len))
                    .aligned(Alignment::from_exponent(fb_buffer.alignment_exponent()));

                offset += buffer_len;
                BufferHandle::Host(buffer)
            })
            .collect();

        Ok(ArrayParts {
            flatbuffer: fb_buffer.clone(),
            flatbuffer_loc: fb_root._tab.loc(),
            buffers,
        })
    }
}

impl TryFrom<BufferHandle> for ArrayParts {
    type Error = VortexError;

    fn try_from(value: BufferHandle) -> Result<Self, Self::Error> {
        Self::try_from(value.try_to_bytes()?)
    }
}