1use std::borrow::Cow;
5use std::fmt::Debug;
6use std::fmt::Formatter;
7use std::iter;
8use std::sync::Arc;
9
10use flatbuffers::FlatBufferBuilder;
11use flatbuffers::Follow;
12use flatbuffers::WIPOffset;
13use flatbuffers::root;
14use vortex_buffer::Alignment;
15use vortex_buffer::ByteBuffer;
16use vortex_error::VortexError;
17use vortex_error::VortexExpect;
18use vortex_error::VortexResult;
19use vortex_error::vortex_bail;
20use vortex_error::vortex_err;
21use vortex_error::vortex_panic;
22use vortex_flatbuffers::FlatBuffer;
23use vortex_flatbuffers::WriteFlatBuffer;
24use vortex_flatbuffers::array as fba;
25use vortex_flatbuffers::array::Compression;
26use vortex_session::VortexSession;
27use vortex_session::registry::ReadContext;
28use vortex_utils::aliases::hash_map::HashMap;
29
30use crate::ArrayContext;
31use crate::ArrayRef;
32use crate::ArrayVisitor;
33use crate::ArrayVisitorExt;
34use crate::DynArray;
35use crate::buffer::BufferHandle;
36use crate::dtype::DType;
37use crate::dtype::TryFromBytes;
38use crate::session::ArraySessionExt;
39use crate::stats::StatsSet;
40
/// Options controlling the byte layout produced by array serialization.
#[derive(Default, Debug)]
pub struct SerializeOptions {
    /// Position in the output stream at which the serialized bytes will begin;
    /// used as the starting point for computing alignment padding.
    pub offset: usize,
    /// Whether to emit zero-filled padding buffers so that each data buffer (and
    /// the trailing flatbuffer) lands at its required alignment in the stream.
    pub include_padding: bool,
}
50
51impl dyn DynArray + '_ {
52 pub fn serialize(
63 &self,
64 ctx: &ArrayContext,
65 options: &SerializeOptions,
66 ) -> VortexResult<Vec<ByteBuffer>> {
67 let array_buffers = self
69 .depth_first_traversal()
70 .flat_map(|f| f.buffers())
71 .collect::<Vec<_>>();
72
73 let mut buffers = vec![];
75 let mut fb_buffers = Vec::with_capacity(buffers.capacity());
76
77 let max_alignment = array_buffers
79 .iter()
80 .map(|buf| buf.alignment())
81 .chain(iter::once(FlatBuffer::alignment()))
82 .max()
83 .unwrap_or_else(FlatBuffer::alignment);
84
85 let zeros = ByteBuffer::zeroed(*max_alignment);
87
88 buffers.push(ByteBuffer::zeroed_aligned(0, max_alignment));
91
92 let mut pos = options.offset;
94
95 for buffer in array_buffers {
97 let padding = if options.include_padding {
98 let padding = pos.next_multiple_of(*buffer.alignment()) - pos;
99 if padding > 0 {
100 pos += padding;
101 buffers.push(zeros.slice(0..padding));
102 }
103 padding
104 } else {
105 0
106 };
107
108 fb_buffers.push(fba::Buffer::new(
109 u16::try_from(padding).vortex_expect("padding fits into u16"),
110 buffer.alignment().exponent(),
111 Compression::None,
112 u32::try_from(buffer.len())
113 .map_err(|_| vortex_err!("All buffers must fit into u32 for serialization"))?,
114 ));
115
116 pos += buffer.len();
117 buffers.push(buffer.aligned(Alignment::none()));
118 }
119
120 let mut fbb = FlatBufferBuilder::new();
122
123 let root = ArrayNodeFlatBuffer::try_new(ctx, self)?;
124 let fb_root = root.try_write_flatbuffer(&mut fbb)?;
125
126 let fb_buffers = fbb.create_vector(&fb_buffers);
127 let fb_array = fba::Array::create(
128 &mut fbb,
129 &fba::ArrayArgs {
130 root: Some(fb_root),
131 buffers: Some(fb_buffers),
132 },
133 );
134 fbb.finish_minimal(fb_array);
135 let (fb_vec, fb_start) = fbb.collapse();
136 let fb_end = fb_vec.len();
137 let fb_buffer = ByteBuffer::from(fb_vec).slice(fb_start..fb_end);
138 let fb_length = fb_buffer.len();
139
140 if options.include_padding {
141 let padding = pos.next_multiple_of(*FlatBuffer::alignment()) - pos;
142 if padding > 0 {
143 buffers.push(zeros.slice(0..padding));
144 }
145 }
146 buffers.push(fb_buffer);
147
148 buffers.push(ByteBuffer::from(
150 u32::try_from(fb_length)
151 .map_err(|_| vortex_err!("Array metadata flatbuffer must fit into u32 for serialization. Array encoding tree is too large."))?
152 .to_le_bytes()
153 .to_vec(),
154 ));
155
156 Ok(buffers)
157 }
158}
159
/// Writer for one array node (and, recursively, its subtree) into the array-tree
/// flatbuffer.
pub struct ArrayNodeFlatBuffer<'a> {
    // Context used to map the encoding id to a serialized encoding index.
    ctx: &'a ArrayContext,
    // The array whose node is being written.
    array: &'a dyn DynArray,
    // Index of this node's first buffer within the depth-first buffer ordering.
    buffer_idx: u16,
}
166
impl<'a> ArrayNodeFlatBuffer<'a> {
    /// Validates that the whole array tree is serializable and creates a writer
    /// rooted at `array`.
    ///
    /// # Errors
    ///
    /// Fails if any array in the tree returns no metadata (i.e. does not support
    /// serialization), or if the tree holds more than `u16::MAX` buffers.
    pub fn try_new(ctx: &'a ArrayContext, array: &'a dyn DynArray) -> VortexResult<Self> {
        // Pre-validate so try_write_flatbuffer can assume metadata exists below.
        for child in array.depth_first_traversal() {
            if child.metadata()?.is_none() {
                vortex_bail!(
                    "Array {} does not support serialization",
                    child.encoding_id()
                );
            }
        }
        // Buffer indices are stored as u16, so the total buffer count across the
        // tree must fit.
        let n_buffers_recursive = array.nbuffers_recursive();
        if n_buffers_recursive > u16::MAX as usize {
            vortex_bail!(
                "Array and all descendent arrays can have at most u16::MAX buffers: {}",
                n_buffers_recursive
            );
        };
        Ok(Self {
            ctx,
            array,
            buffer_idx: 0,
        })
    }

    /// Writes this node and, recursively, its children into `fbb`, returning the
    /// offset of the created `ArrayNode` table.
    pub fn try_write_flatbuffer<'fb>(
        &self,
        fbb: &mut FlatBufferBuilder<'fb>,
    ) -> VortexResult<WIPOffset<fba::ArrayNode<'fb>>> {
        // The encoding is serialized as an index assigned by the context.
        let encoding_idx = self
            .ctx
            .intern(&self.array.encoding_id())
            .ok_or_else(|| {
                vortex_err!(
                    "Array encoding {} not permitted by ctx",
                    self.array.encoding_id()
                )
            })?;

        let metadata = self.array.metadata()?.ok_or_else(|| {
            vortex_err!(
                "Array {} does not support serialization",
                self.array.encoding_id()
            )
        })?;
        let metadata = Some(fbb.create_vector(metadata.as_slice()));

        // This node claims buffer indices [buffer_idx, buffer_idx + nbuffers);
        // each child subtree claims the range after the previous one, matching
        // the depth-first order used when the buffers were emitted.
        let nbuffers = u16::try_from(self.array.nbuffers())
            .map_err(|_| vortex_err!("Array can have at most u16::MAX buffers"))?;
        let mut child_buffer_idx = self.buffer_idx + nbuffers;

        let children = &self
            .array
            .children()
            .iter()
            .map(|child| {
                let msg = ArrayNodeFlatBuffer {
                    ctx: self.ctx,
                    array: child,
                    buffer_idx: child_buffer_idx,
                }
                .try_write_flatbuffer(fbb)?;

                // Advance past every buffer owned by this child's subtree.
                child_buffer_idx = u16::try_from(child.nbuffers_recursive())
                    .ok()
                    .and_then(|nbuffers| nbuffers.checked_add(child_buffer_idx))
                    .ok_or_else(|| vortex_err!("Too many buffers (u16) for Array"))?;

                Ok(msg)
            })
            .collect::<VortexResult<Vec<_>>>()?;
        let children = Some(fbb.create_vector(children));

        // The node stores absolute buffer indices for its own buffers.
        let buffers = Some(fbb.create_vector_from_iter((0..nbuffers).map(|i| i + self.buffer_idx)));
        let stats = Some(self.array.statistics().write_flatbuffer(fbb)?);

        Ok(fba::ArrayNode::create(
            fbb,
            &fba::ArrayNodeArgs {
                encoding: encoding_idx,
                metadata,
                children,
                buffers,
                stats,
            },
        ))
    }
}
258
/// Access to an array's children during decoding, allowing implementations to
/// produce children lazily.
pub trait ArrayChildren {
    /// Returns the child at `index`; it must have the given dtype and length.
    fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef>;

    /// The number of children.
    fn len(&self) -> usize;

    /// Returns `true` if there are no children.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
273
274impl ArrayChildren for &[ArrayRef] {
275 fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef> {
276 let array = self[index].clone();
277 assert_eq!(array.len(), len);
278 assert_eq!(array.dtype(), dtype);
279 Ok(array)
280 }
281
282 fn len(&self) -> usize {
283 <[_]>::len(self)
284 }
285}
286
/// A deserialized-but-not-yet-decoded array: the flatbuffer tree describing the
/// encodings plus the raw data buffers. Cloning shares the buffer list via `Arc`.
#[derive(Clone)]
pub struct ArrayParts {
    // The flatbuffer holding the full serialized `Array` tree.
    flatbuffer: FlatBuffer,
    // Byte location of this node's `ArrayNode` table within `flatbuffer`.
    flatbuffer_loc: usize,
    // All buffers of the tree; nodes reference them by absolute index.
    buffers: Arc<[BufferHandle]>,
}
301
302impl Debug for ArrayParts {
303 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
304 f.debug_struct("ArrayParts")
305 .field("encoding_id", &self.encoding_id())
306 .field("children", &(0..self.nchildren()).map(|i| self.child(i)))
307 .field(
308 "buffers",
309 &(0..self.nbuffers()).map(|i| self.buffer(i).ok()),
310 )
311 .field("metadata", &self.metadata())
312 .finish()
313 }
314}
315
impl ArrayParts {
    /// Decodes this `ArrayParts` into an [`ArrayRef`] with the given dtype and
    /// length, resolving the encoding through `ctx` and the session's registry.
    ///
    /// # Errors
    ///
    /// Fails if the encoding index or id cannot be resolved, or if the encoding's
    /// `build` fails.
    ///
    /// # Panics
    ///
    /// Panics if the decoded array's length, dtype, or encoding id disagrees with
    /// what was requested — that indicates a bug in the encoding implementation.
    pub fn decode(
        &self,
        dtype: &DType,
        len: usize,
        ctx: &ReadContext,
        session: &VortexSession,
    ) -> VortexResult<ArrayRef> {
        let encoding_idx = self.flatbuffer().encoding();
        let encoding_id = ctx
            .resolve(encoding_idx)
            .ok_or_else(|| vortex_err!("Unknown encoding index: {}", encoding_idx))?;
        let vtable = session
            .arrays()
            .registry()
            .find(&encoding_id)
            .ok_or_else(|| vortex_err!("Unknown encoding: {}", encoding_id))?;

        // Children are decoded on demand, as build() requests them.
        let children = ArrayPartsChildren {
            parts: self,
            ctx,
            session,
        };

        let buffers = self.collect_buffers()?;

        let decoded = vtable.build(
            encoding_id.clone(),
            dtype,
            len,
            self.metadata(),
            &buffers,
            &children,
            session,
        )?;

        // Sanity-check the contract of the encoding's build().
        assert_eq!(
            decoded.len(),
            len,
            "Array decoded from {} has incorrect length {}, expected {}",
            encoding_id,
            decoded.len(),
            len
        );
        assert_eq!(
            decoded.dtype(),
            dtype,
            "Array decoded from {} has incorrect dtype {}, expected {}",
            encoding_id,
            decoded.dtype(),
            dtype,
        );
        assert_eq!(
            decoded.encoding_id(),
            encoding_id,
            "Array decoded from {} has incorrect encoding {}",
            encoding_id,
            decoded.encoding_id(),
        );

        // Restore any statistics that were serialized with the node.
        if let Some(stats) = self.flatbuffer().stats() {
            decoded
                .statistics()
                .set_iter(StatsSet::from_flatbuffer(&stats, dtype, session)?.into_iter());
        }

        Ok(decoded)
    }

    /// The encoding tag stored in this node — an index resolved against a read
    /// context (see [`Self::decode`]), not a global encoding id.
    pub fn encoding_id(&self) -> u16 {
        self.flatbuffer().encoding()
    }

    /// The encoding-specific metadata bytes; empty if none were serialized.
    pub fn metadata(&self) -> &[u8] {
        self.flatbuffer()
            .metadata()
            .map(|metadata| metadata.bytes())
            .unwrap_or(&[])
    }

    /// The number of child nodes of this node.
    pub fn nchildren(&self) -> usize {
        self.flatbuffer()
            .children()
            .map_or(0, |children| children.len())
    }

    /// Returns the child at `idx` as a new `ArrayParts` sharing this one's
    /// flatbuffer and buffer list.
    ///
    /// # Panics
    ///
    /// Panics if the node has no children or `idx` is out of range.
    pub fn child(&self, idx: usize) -> ArrayParts {
        let children = self
            .flatbuffer()
            .children()
            .vortex_expect("Expected array to have children");
        if idx >= children.len() {
            vortex_panic!(
                "Invalid child index {} for array with {} children",
                idx,
                children.len()
            );
        }
        self.with_root(children.get(idx))
    }

    /// The number of buffers belonging to this node (not including children's).
    pub fn nbuffers(&self) -> usize {
        self.flatbuffer()
            .buffers()
            .map_or(0, |buffers| buffers.len())
    }

    /// Returns this node's `idx`-th buffer, translating the node-relative index
    /// through the flatbuffer's absolute buffer-index table.
    pub fn buffer(&self, idx: usize) -> VortexResult<BufferHandle> {
        let buffer_idx = self
            .flatbuffer()
            .buffers()
            .ok_or_else(|| vortex_err!("Array has no buffers"))?
            .get(idx);
        self.buffers
            .get(buffer_idx as usize)
            .cloned()
            .ok_or_else(|| {
                vortex_err!(
                    "Invalid buffer index {} for array with {} buffers",
                    buffer_idx,
                    self.nbuffers()
                )
            })
    }

    /// Collects this node's buffers, borrowing a sub-slice of the backing list
    /// when the indices are contiguous and cloning handles one-by-one otherwise.
    fn collect_buffers(&self) -> VortexResult<Cow<'_, [BufferHandle]>> {
        let Some(fb_buffers) = self.flatbuffer().buffers() else {
            return Ok(Cow::Borrowed(&[]));
        };
        let count = fb_buffers.len();
        if count == 0 {
            return Ok(Cow::Borrowed(&[]));
        }
        let start = fb_buffers.get(0) as usize;
        let contiguous = fb_buffers
            .iter()
            .enumerate()
            .all(|(i, idx)| idx as usize == start + i);
        if contiguous {
            // Fast path: hand out a borrowed sub-slice, no per-buffer clones.
            self.buffers.get(start..start + count).map_or_else(
                || {
                    vortex_bail!(
                        "buffer indices {}..{} out of range for {} buffers",
                        start,
                        start + count,
                        self.buffers.len()
                    )
                },
                |slice| Ok(Cow::Borrowed(slice)),
            )
        } else {
            (0..count)
                .map(|idx| self.buffer(idx))
                .collect::<VortexResult<Vec<_>>>()
                .map(Cow::Owned)
        }
    }

    /// The lengths of all buffers recorded in the root `Array` flatbuffer — i.e.
    /// every buffer of the whole serialized tree, not just this node's.
    pub fn buffer_lengths(&self) -> Vec<usize> {
        let fb_array = root::<fba::Array>(self.flatbuffer.as_ref())
            .vortex_expect("ArrayParts flatbuffer must be a valid Array");
        fb_array
            .buffers()
            .map(|buffers| buffers.iter().map(|b| b.length() as usize).collect())
            .unwrap_or_default()
    }

    /// Verifies the bytes form a valid `Array` flatbuffer with a root node, and
    /// returns the aligned flatbuffer plus the byte location of that root node.
    fn validate_array_tree(array_tree: impl Into<ByteBuffer>) -> VortexResult<(FlatBuffer, usize)> {
        let fb_buffer = FlatBuffer::align_from(array_tree.into());
        let fb_array = root::<fba::Array>(fb_buffer.as_ref())?;
        let fb_root = fb_array
            .root()
            .ok_or_else(|| vortex_err!("Array must have a root node"))?;
        let flatbuffer_loc = fb_root._tab.loc();
        Ok((fb_buffer, flatbuffer_loc))
    }

    /// Builds an `ArrayParts` from a serialized array-tree flatbuffer and an
    /// already-split list of buffer handles.
    pub fn from_flatbuffer_with_buffers(
        array_tree: impl Into<ByteBuffer>,
        buffers: Vec<BufferHandle>,
    ) -> VortexResult<Self> {
        let (flatbuffer, flatbuffer_loc) = Self::validate_array_tree(array_tree)?;
        Ok(ArrayParts {
            flatbuffer,
            flatbuffer_loc,
            buffers: buffers.into(),
        })
    }

    /// Builds an `ArrayParts` with an empty buffer list from a serialized
    /// array-tree flatbuffer.
    pub fn from_array_tree(array_tree: impl Into<ByteBuffer>) -> VortexResult<Self> {
        let (flatbuffer, flatbuffer_loc) = Self::validate_array_tree(array_tree)?;
        Ok(ArrayParts {
            flatbuffer,
            flatbuffer_loc,
            buffers: Arc::new([]),
        })
    }

    /// The `ArrayNode` table this `ArrayParts` currently points at.
    fn flatbuffer(&self) -> fba::ArrayNode<'_> {
        // SAFETY: `flatbuffer_loc` was obtained from a node of a validated Array
        // flatbuffer over these same bytes (validate_array_tree / with_root).
        unsafe { fba::ArrayNode::follow(self.flatbuffer.as_ref(), self.flatbuffer_loc) }
    }

    /// Returns a clone of self re-pointed at `root`, sharing the flatbuffer and
    /// buffer list.
    fn with_root(&self, root: fba::ArrayNode) -> Self {
        let mut this = self.clone();
        this.flatbuffer_loc = root._tab.loc();
        this
    }

    /// Builds an `ArrayParts` from a flatbuffer plus a single contiguous data
    /// segment laid out as written by serialization (padding before each buffer).
    pub fn from_flatbuffer_and_segment(
        array_tree: ByteBuffer,
        segment: BufferHandle,
    ) -> VortexResult<Self> {
        Self::from_flatbuffer_and_segment_with_overrides(array_tree, segment, &HashMap::new())
    }

    /// Like [`Self::from_flatbuffer_and_segment`], but any buffer whose index
    /// appears in `buffer_overrides` takes its contents from the override instead
    /// of the segment. An overridden buffer still advances the segment offset, so
    /// the remaining buffers keep their original positions.
    pub fn from_flatbuffer_and_segment_with_overrides(
        array_tree: ByteBuffer,
        segment: BufferHandle,
        buffer_overrides: &HashMap<u32, ByteBuffer>,
    ) -> VortexResult<Self> {
        let segment = segment.ensure_aligned(Alignment::none())?;

        let (fb_buffer, flatbuffer_loc) = Self::validate_array_tree(array_tree)?;
        // SAFETY: validate_array_tree just verified these bytes as an Array root.
        let fb_array = unsafe { fba::root_as_array_unchecked(fb_buffer.as_ref()) };

        // Walk the buffer descriptors, slicing each buffer out of the segment at
        // its recorded padding and length.
        let mut offset = 0;
        let buffers = fb_array
            .buffers()
            .unwrap_or_default()
            .iter()
            .enumerate()
            .map(|(idx, fb_buf)| {
                offset += fb_buf.padding() as usize;
                let buffer_len = fb_buf.length() as usize;
                let alignment = Alignment::from_exponent(fb_buf.alignment_exponent());

                let idx = u32::try_from(idx).vortex_expect("buffer count must fit in u32");
                let handle = if let Some(host_data) = buffer_overrides.get(&idx) {
                    BufferHandle::new_host(host_data.clone()).ensure_aligned(alignment)?
                } else {
                    let buffer = segment.slice(offset..(offset + buffer_len));
                    buffer.ensure_aligned(alignment)?
                };

                offset += buffer_len;
                Ok(handle)
            })
            .collect::<VortexResult<Arc<[_]>>>()?;

        Ok(ArrayParts {
            flatbuffer: fb_buffer,
            flatbuffer_loc,
            buffers,
        })
    }
}
624
/// [`ArrayChildren`] implementation that decodes children on demand out of an
/// [`ArrayParts`].
struct ArrayPartsChildren<'a> {
    parts: &'a ArrayParts,
    ctx: &'a ReadContext,
    session: &'a VortexSession,
}
630
631impl ArrayChildren for ArrayPartsChildren<'_> {
632 fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef> {
633 self.parts
634 .child(index)
635 .decode(dtype, len, self.ctx, self.session)
636 }
637
638 fn len(&self) -> usize {
639 self.parts.nchildren()
640 }
641}
642
643impl TryFrom<ByteBuffer> for ArrayParts {
644 type Error = VortexError;
645
646 fn try_from(value: ByteBuffer) -> Result<Self, Self::Error> {
647 if value.len() < 4 {
649 vortex_bail!("ArrayParts buffer is too short");
650 }
651
652 let value = value.aligned(Alignment::none());
654
655 let fb_length = u32::try_from_le_bytes(&value.as_slice()[value.len() - 4..])? as usize;
656 if value.len() < 4 + fb_length {
657 vortex_bail!("ArrayParts buffer is too short for flatbuffer");
658 }
659
660 let fb_offset = value.len() - 4 - fb_length;
661 let array_tree = value.slice(fb_offset..fb_offset + fb_length);
662 let segment = BufferHandle::new_host(value.slice(0..fb_offset));
663
664 Self::from_flatbuffer_and_segment(array_tree, segment)
665 }
666}
667
impl TryFrom<BufferHandle> for ArrayParts {
    type Error = VortexError;

    // Materializes the handle on the host synchronously, then parses the
    // resulting ByteBuffer via the TryFrom<ByteBuffer> impl.
    fn try_from(value: BufferHandle) -> Result<Self, Self::Error> {
        Self::try_from(value.try_to_host_sync()?)
    }
}