1use std::borrow::Cow;
5use std::fmt::Debug;
6use std::fmt::Formatter;
7use std::iter;
8use std::sync::Arc;
9
10use flatbuffers::FlatBufferBuilder;
11use flatbuffers::Follow;
12use flatbuffers::WIPOffset;
13use flatbuffers::root;
14use vortex_buffer::Alignment;
15use vortex_buffer::ByteBuffer;
16use vortex_error::VortexError;
17use vortex_error::VortexExpect;
18use vortex_error::VortexResult;
19use vortex_error::vortex_bail;
20use vortex_error::vortex_err;
21use vortex_error::vortex_panic;
22use vortex_flatbuffers::FlatBuffer;
23use vortex_flatbuffers::WriteFlatBuffer;
24use vortex_flatbuffers::array as fba;
25use vortex_flatbuffers::array::Compression;
26use vortex_session::VortexSession;
27use vortex_utils::aliases::hash_map::HashMap;
28
29use crate::Array;
30use crate::ArrayContext;
31use crate::ArrayRef;
32use crate::ArrayVisitor;
33use crate::ArrayVisitorExt;
34use crate::buffer::BufferHandle;
35use crate::dtype::DType;
36use crate::dtype::TryFromBytes;
37use crate::session::ArraySessionExt;
38use crate::stats::StatsSet;
39
/// Options controlling how an array is serialized into byte buffers.
#[derive(Default, Debug)]
pub struct SerializeOptions {
    /// Byte position at which the serialized output will ultimately be placed;
    /// alignment padding is computed relative to this offset.
    pub offset: usize,
    /// Whether to emit zero-filled padding buffers so each data buffer (and the
    /// trailing flatbuffer) lands at its required alignment relative to `offset`.
    pub include_padding: bool,
}
49
50impl dyn Array + '_ {
51 pub fn serialize(
62 &self,
63 ctx: &ArrayContext,
64 options: &SerializeOptions,
65 ) -> VortexResult<Vec<ByteBuffer>> {
66 let array_buffers = self
68 .depth_first_traversal()
69 .flat_map(|f| f.buffers())
70 .collect::<Vec<_>>();
71
72 let mut buffers = vec![];
74 let mut fb_buffers = Vec::with_capacity(buffers.capacity());
75
76 let max_alignment = array_buffers
78 .iter()
79 .map(|buf| buf.alignment())
80 .chain(iter::once(FlatBuffer::alignment()))
81 .max()
82 .unwrap_or_else(FlatBuffer::alignment);
83
84 let zeros = ByteBuffer::zeroed(*max_alignment);
86
87 buffers.push(ByteBuffer::zeroed_aligned(0, max_alignment));
90
91 let mut pos = options.offset;
93
94 for buffer in array_buffers {
96 let padding = if options.include_padding {
97 let padding = pos.next_multiple_of(*buffer.alignment()) - pos;
98 if padding > 0 {
99 pos += padding;
100 buffers.push(zeros.slice(0..padding));
101 }
102 padding
103 } else {
104 0
105 };
106
107 fb_buffers.push(fba::Buffer::new(
108 u16::try_from(padding).vortex_expect("padding fits into u16"),
109 buffer.alignment().exponent(),
110 Compression::None,
111 u32::try_from(buffer.len())
112 .map_err(|_| vortex_err!("All buffers must fit into u32 for serialization"))?,
113 ));
114
115 pos += buffer.len();
116 buffers.push(buffer.aligned(Alignment::none()));
117 }
118
119 let mut fbb = FlatBufferBuilder::new();
121
122 let root = ArrayNodeFlatBuffer::try_new(ctx, self)?;
123 let fb_root = root.try_write_flatbuffer(&mut fbb)?;
124
125 let fb_buffers = fbb.create_vector(&fb_buffers);
126 let fb_array = fba::Array::create(
127 &mut fbb,
128 &fba::ArrayArgs {
129 root: Some(fb_root),
130 buffers: Some(fb_buffers),
131 },
132 );
133 fbb.finish_minimal(fb_array);
134 let (fb_vec, fb_start) = fbb.collapse();
135 let fb_end = fb_vec.len();
136 let fb_buffer = ByteBuffer::from(fb_vec).slice(fb_start..fb_end);
137 let fb_length = fb_buffer.len();
138
139 if options.include_padding {
140 let padding = pos.next_multiple_of(*FlatBuffer::alignment()) - pos;
141 if padding > 0 {
142 buffers.push(zeros.slice(0..padding));
143 }
144 }
145 buffers.push(fb_buffer);
146
147 buffers.push(ByteBuffer::from(
149 u32::try_from(fb_length)
150 .map_err(|_| vortex_err!("Array metadata flatbuffer must fit into u32 for serialization. Array encoding tree is too large."))?
151 .to_le_bytes()
152 .to_vec(),
153 ));
154
155 Ok(buffers)
156 }
157}
158
/// Writer for the `fba::ArrayNode` flatbuffer tree describing an array and its
/// descendants.
pub struct ArrayNodeFlatBuffer<'a> {
    // Context used to intern encoding ids into flatbuffer encoding indices.
    ctx: &'a ArrayContext,
    // The array node being serialized.
    array: &'a dyn Array,
    // Index of this node's first buffer within the tree-wide buffer ordering.
    buffer_idx: u16,
}
165
impl<'a> ArrayNodeFlatBuffer<'a> {
    /// Validate that `array` and all of its descendants can be serialized.
    ///
    /// # Errors
    ///
    /// Fails if any node in the tree reports no metadata (such encodings cannot be
    /// serialized), or if the recursive buffer count exceeds the `u16` index space
    /// used by the flatbuffer format.
    pub fn try_new(ctx: &'a ArrayContext, array: &'a dyn Array) -> VortexResult<Self> {
        for child in array.depth_first_traversal() {
            if child.metadata()?.is_none() {
                vortex_bail!(
                    "Array {} does not support serialization",
                    child.encoding_id()
                );
            }
        }
        let n_buffers_recursive = array.nbuffers_recursive();
        if n_buffers_recursive > u16::MAX as usize {
            vortex_bail!(
                "Array and all descendent arrays can have at most u16::MAX buffers: {}",
                n_buffers_recursive
            );
        };
        Ok(Self {
            ctx,
            array,
            // The root node's buffers start at index 0 in the tree-wide ordering.
            buffer_idx: 0,
        })
    }

    /// Recursively write this node and its children into `fbb`, returning the
    /// offset of the created `fba::ArrayNode` table.
    ///
    /// Buffer indices are assigned so this node's own buffers come first, followed
    /// by each child's subtree in order — matching the depth-first buffer order
    /// produced by `serialize`.
    pub fn try_write_flatbuffer<'fb>(
        &self,
        fbb: &mut FlatBufferBuilder<'fb>,
    ) -> VortexResult<WIPOffset<fba::ArrayNode<'fb>>> {
        // The encoding must be registered in the context so readers can resolve it.
        let encoding_idx = self
            .ctx
            .intern(&self.array.encoding_id())
            .ok_or_else(|| {
                vortex_err!(
                    "Array encoding {} not permitted by ctx",
                    self.array.encoding_id()
                )
            })?;

        // `try_new` already verified metadata exists for every node, so this error
        // should be unreachable in practice.
        let metadata = self.array.metadata()?.ok_or_else(|| {
            vortex_err!(
                "Array {} does not support serialization",
                self.array.encoding_id()
            )
        })?;
        let metadata = Some(fbb.create_vector(metadata.as_slice()));

        let nbuffers = u16::try_from(self.array.nbuffers())
            .map_err(|_| vortex_err!("Array can have at most u16::MAX buffers"))?;
        // Children's buffers are numbered immediately after this node's own.
        let mut child_buffer_idx = self.buffer_idx + nbuffers;

        let children = &self
            .array
            .children()
            .iter()
            .map(|child| {
                let msg = ArrayNodeFlatBuffer {
                    ctx: self.ctx,
                    array: child,
                    buffer_idx: child_buffer_idx,
                }
                .try_write_flatbuffer(fbb)?;

                // Advance past every buffer owned by this child's entire subtree so
                // the next sibling starts at the correct index.
                child_buffer_idx = u16::try_from(child.nbuffers_recursive())
                    .ok()
                    .and_then(|nbuffers| nbuffers.checked_add(child_buffer_idx))
                    .ok_or_else(|| vortex_err!("Too many buffers (u16) for Array"))?;

                Ok(msg)
            })
            .collect::<VortexResult<Vec<_>>>()?;
        let children = Some(fbb.create_vector(children));

        // This node's buffers: a contiguous run of indices starting at buffer_idx.
        let buffers = Some(fbb.create_vector_from_iter((0..nbuffers).map(|i| i + self.buffer_idx)));
        let stats = Some(self.array.statistics().write_flatbuffer(fbb)?);

        Ok(fba::ArrayNode::create(
            fbb,
            &fba::ArrayNodeArgs {
                encoding: encoding_idx,
                metadata,
                children,
                buffers,
                stats,
            },
        ))
    }
}
257
/// Provider of an array's children during decoding.
pub trait ArrayChildren {
    /// Returns the child at `index`; the returned array must have the given
    /// `dtype` and `len`.
    fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef>;

    /// The number of children available.
    fn len(&self) -> usize;

    /// Returns true if there are no children.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
272
273impl ArrayChildren for &[ArrayRef] {
274 fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef> {
275 let array = self[index].clone();
276 assert_eq!(array.len(), len);
277 assert_eq!(array.dtype(), dtype);
278 Ok(array)
279 }
280
281 fn len(&self) -> usize {
282 <[_]>::len(self)
283 }
284}
285
/// A lazily-parsed serialized array: a flatbuffer describing the encoding tree
/// plus the data buffers it references.
#[derive(Clone)]
pub struct ArrayParts {
    // The flatbuffer containing the serialized `fba::Array` tree.
    flatbuffer: FlatBuffer,
    // Byte offset of this node's `fba::ArrayNode` table within `flatbuffer`.
    flatbuffer_loc: usize,
    // All buffers for the entire tree, indexed by the flatbuffer's buffer indices.
    buffers: Arc<[BufferHandle]>,
}
300
301impl Debug for ArrayParts {
302 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
303 f.debug_struct("ArrayParts")
304 .field("encoding_id", &self.encoding_id())
305 .field("children", &(0..self.nchildren()).map(|i| self.child(i)))
306 .field(
307 "buffers",
308 &(0..self.nbuffers()).map(|i| self.buffer(i).ok()),
309 )
310 .field("metadata", &self.metadata())
311 .finish()
312 }
313}
314
impl ArrayParts {
    /// Decode these parts into an [`ArrayRef`] with the given `dtype` and `len`.
    ///
    /// The flatbuffer's encoding index is resolved through `ctx`, and the matching
    /// vtable is looked up in the session's array registry. Children are decoded on
    /// demand via [`ArrayPartsChildren`].
    ///
    /// # Panics
    ///
    /// Asserts that the decoded array reports the requested length, dtype, and
    /// encoding — a mismatch is a bug in the encoding implementation, not a
    /// recoverable error.
    pub fn decode(
        &self,
        dtype: &DType,
        len: usize,
        ctx: &ArrayContext,
        session: &VortexSession,
    ) -> VortexResult<ArrayRef> {
        let encoding_idx = self.flatbuffer().encoding();
        let encoding_id = ctx
            .resolve(encoding_idx)
            .ok_or_else(|| vortex_err!("Unknown encoding index: {}", encoding_idx))?;
        let vtable = session
            .arrays()
            .registry()
            .find(&encoding_id)
            .ok_or_else(|| vortex_err!("Unknown encoding: {}", encoding_id))?;

        // Lazy adapter: children are only decoded when the vtable asks for them.
        let children = ArrayPartsChildren {
            parts: self,
            ctx,
            session,
        };

        let buffers = self.collect_buffers()?;

        let decoded = vtable.build(
            encoding_id.clone(),
            dtype,
            len,
            self.metadata(),
            &buffers,
            &children,
            session,
        )?;

        assert_eq!(
            decoded.len(),
            len,
            "Array decoded from {} has incorrect length {}, expected {}",
            encoding_id,
            decoded.len(),
            len
        );
        assert_eq!(
            decoded.dtype(),
            dtype,
            "Array decoded from {} has incorrect dtype {}, expected {}",
            encoding_id,
            decoded.dtype(),
            dtype,
        );
        assert_eq!(
            decoded.encoding_id(),
            encoding_id,
            "Array decoded from {} has incorrect encoding {}",
            encoding_id,
            decoded.encoding_id(),
        );

        // Restore any statistics that were serialized alongside this node.
        if let Some(stats) = self.flatbuffer().stats() {
            decoded
                .statistics()
                .set_iter(StatsSet::from_flatbuffer(&stats, dtype)?.into_iter());
        }

        Ok(decoded)
    }

    /// The raw encoding index of this node, as stored in the flatbuffer.
    pub fn encoding_id(&self) -> u16 {
        self.flatbuffer().encoding()
    }

    /// The opaque metadata bytes of this node (empty if none were written).
    pub fn metadata(&self) -> &[u8] {
        self.flatbuffer()
            .metadata()
            .map(|metadata| metadata.bytes())
            .unwrap_or(&[])
    }

    /// The number of children recorded for this node.
    pub fn nchildren(&self) -> usize {
        self.flatbuffer()
            .children()
            .map_or(0, |children| children.len())
    }

    /// Returns the `idx`-th child as a new [`ArrayParts`] sharing the same
    /// flatbuffer and buffer set.
    ///
    /// # Panics
    ///
    /// Panics if the node has no children vector or `idx` is out of range.
    pub fn child(&self, idx: usize) -> ArrayParts {
        let children = self
            .flatbuffer()
            .children()
            .vortex_expect("Expected array to have children");
        if idx >= children.len() {
            vortex_panic!(
                "Invalid child index {} for array with {} children",
                idx,
                children.len()
            );
        }
        self.with_root(children.get(idx))
    }

    /// The number of buffers referenced directly by this node.
    pub fn nbuffers(&self) -> usize {
        self.flatbuffer()
            .buffers()
            .map_or(0, |buffers| buffers.len())
    }

    /// Resolve this node's `idx`-th buffer reference into a [`BufferHandle`].
    ///
    /// The flatbuffer stores indices into the tree-wide `buffers` array.
    pub fn buffer(&self, idx: usize) -> VortexResult<BufferHandle> {
        let buffer_idx = self
            .flatbuffer()
            .buffers()
            .ok_or_else(|| vortex_err!("Array has no buffers"))?
            .get(idx);
        self.buffers
            .get(buffer_idx as usize)
            .cloned()
            .ok_or_else(|| {
                vortex_err!(
                    "Invalid buffer index {} for array with {} buffers",
                    buffer_idx,
                    self.nbuffers()
                )
            })
    }

    /// Gather all buffers referenced by this node.
    ///
    /// Fast path: when the flatbuffer indices form a contiguous run (the common
    /// case for serialized trees), borrow a sub-slice of `self.buffers` without
    /// cloning; otherwise clone each referenced handle into an owned `Vec`.
    fn collect_buffers(&self) -> VortexResult<Cow<'_, [BufferHandle]>> {
        let Some(fb_buffers) = self.flatbuffer().buffers() else {
            return Ok(Cow::Borrowed(&[]));
        };
        let count = fb_buffers.len();
        if count == 0 {
            return Ok(Cow::Borrowed(&[]));
        }
        let start = fb_buffers.get(0) as usize;
        let contiguous = fb_buffers
            .iter()
            .enumerate()
            .all(|(i, idx)| idx as usize == start + i);
        if contiguous {
            self.buffers.get(start..start + count).map_or_else(
                || {
                    vortex_bail!(
                        "buffer indices {}..{} out of range for {} buffers",
                        start,
                        start + count,
                        self.buffers.len()
                    )
                },
                |slice| Ok(Cow::Borrowed(slice)),
            )
        } else {
            (0..count)
                .map(|idx| self.buffer(idx))
                .collect::<VortexResult<Vec<_>>>()
                .map(Cow::Owned)
        }
    }

    /// Byte lengths of every buffer in the whole serialized tree (not just this
    /// node), in the order they appear in the root `fba::Array`.
    pub fn buffer_lengths(&self) -> Vec<usize> {
        let fb_array = root::<fba::Array>(self.flatbuffer.as_ref())
            .vortex_expect("ArrayParts flatbuffer must be a valid Array");
        fb_array
            .buffers()
            .map(|buffers| buffers.iter().map(|b| b.length() as usize).collect())
            .unwrap_or_default()
    }

    /// Verify that `array_tree` contains a valid `fba::Array` with a root node,
    /// returning the (re-aligned) flatbuffer and the root node's table location.
    ///
    /// The verification performed by `root` here is what makes the unchecked
    /// accesses in `flatbuffer()` and `from_flatbuffer_and_segment_with_overrides`
    /// sound.
    fn validate_array_tree(array_tree: impl Into<ByteBuffer>) -> VortexResult<(FlatBuffer, usize)> {
        let fb_buffer = FlatBuffer::align_from(array_tree.into());
        let fb_array = root::<fba::Array>(fb_buffer.as_ref())?;
        let fb_root = fb_array
            .root()
            .ok_or_else(|| vortex_err!("Array must have a root node"))?;
        let flatbuffer_loc = fb_root._tab.loc();
        Ok((fb_buffer, flatbuffer_loc))
    }

    /// Construct [`ArrayParts`] from a flatbuffer tree plus an already-split list
    /// of buffers, one entry per tree-wide buffer index.
    pub fn from_flatbuffer_with_buffers(
        array_tree: impl Into<ByteBuffer>,
        buffers: Vec<BufferHandle>,
    ) -> VortexResult<Self> {
        let (flatbuffer, flatbuffer_loc) = Self::validate_array_tree(array_tree)?;
        Ok(ArrayParts {
            flatbuffer,
            flatbuffer_loc,
            buffers: buffers.into(),
        })
    }

    /// Construct [`ArrayParts`] from a flatbuffer tree with no buffers (only valid
    /// for trees whose encodings reference no data buffers).
    pub fn from_array_tree(array_tree: impl Into<ByteBuffer>) -> VortexResult<Self> {
        let (flatbuffer, flatbuffer_loc) = Self::validate_array_tree(array_tree)?;
        Ok(ArrayParts {
            flatbuffer,
            flatbuffer_loc,
            buffers: Arc::new([]),
        })
    }

    /// View this node's `fba::ArrayNode` table.
    fn flatbuffer(&self) -> fba::ArrayNode<'_> {
        // SAFETY: `flatbuffer_loc` is always a table location inside
        // `self.flatbuffer`, taken either from a verified root
        // (`validate_array_tree`) or from a child of an already-verified node
        // (`with_root`), so following it is sound.
        unsafe { fba::ArrayNode::follow(self.flatbuffer.as_ref(), self.flatbuffer_loc) }
    }

    /// Re-point a clone of `self` at a different node within the same flatbuffer,
    /// sharing the buffer set (used to descend into children).
    fn with_root(&self, root: fba::ArrayNode) -> Self {
        let mut this = self.clone();
        this.flatbuffer_loc = root._tab.loc();
        this
    }

    /// Construct [`ArrayParts`] from a flatbuffer tree and a single contiguous
    /// data segment containing all buffers back-to-back with their recorded
    /// padding.
    pub fn from_flatbuffer_and_segment(
        array_tree: ByteBuffer,
        segment: BufferHandle,
    ) -> VortexResult<Self> {
        Self::from_flatbuffer_and_segment_with_overrides(array_tree, segment, &HashMap::new())
    }

    /// Like [`Self::from_flatbuffer_and_segment`], but individual buffers may be
    /// replaced by host-side overrides keyed on the tree-wide buffer index. The
    /// segment layout is still walked in full so non-overridden buffers resolve to
    /// the correct offsets.
    pub fn from_flatbuffer_and_segment_with_overrides(
        array_tree: ByteBuffer,
        segment: BufferHandle,
        buffer_overrides: &HashMap<u32, ByteBuffer>,
    ) -> VortexResult<Self> {
        // Drop the segment's alignment requirement so we can slice at arbitrary
        // offsets; each slice is re-aligned individually below.
        let segment = segment.ensure_aligned(Alignment::none())?;

        let (fb_buffer, flatbuffer_loc) = Self::validate_array_tree(array_tree)?;
        // The buffer was just verified by `validate_array_tree`, so the unchecked
        // root access here cannot read out of bounds.
        let fb_array = unsafe { fba::root_as_array_unchecked(fb_buffer.as_ref()) };

        // Walk the segment honoring each buffer's recorded padding and length to
        // recover the per-buffer slices.
        let mut offset = 0;
        let buffers = fb_array
            .buffers()
            .unwrap_or_default()
            .iter()
            .enumerate()
            .map(|(idx, fb_buf)| {
                offset += fb_buf.padding() as usize;
                let buffer_len = fb_buf.length() as usize;
                let alignment = Alignment::from_exponent(fb_buf.alignment_exponent());

                let idx = u32::try_from(idx).vortex_expect("buffer count must fit in u32");
                let handle = if let Some(host_data) = buffer_overrides.get(&idx) {
                    BufferHandle::new_host(host_data.clone()).ensure_aligned(alignment)?
                } else {
                    let buffer = segment.slice(offset..(offset + buffer_len));
                    buffer.ensure_aligned(alignment)?
                };

                // Advance past this buffer in the segment even when overridden, so
                // subsequent offsets stay in sync with the serialized layout.
                offset += buffer_len;
                Ok(handle)
            })
            .collect::<VortexResult<Arc<[_]>>>()?;

        Ok(ArrayParts {
            flatbuffer: fb_buffer,
            flatbuffer_loc,
            buffers,
        })
    }
}
623
/// [`ArrayChildren`] adapter that decodes children lazily from [`ArrayParts`].
struct ArrayPartsChildren<'a> {
    // The parts whose children are being exposed.
    parts: &'a ArrayParts,
    // Context used to resolve child encoding indices.
    ctx: &'a ArrayContext,
    // Session providing the array registry for child decoding.
    session: &'a VortexSession,
}
629
630impl ArrayChildren for ArrayPartsChildren<'_> {
631 fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef> {
632 self.parts
633 .child(index)
634 .decode(dtype, len, self.ctx, self.session)
635 }
636
637 fn len(&self) -> usize {
638 self.parts.nchildren()
639 }
640}
641
642impl TryFrom<ByteBuffer> for ArrayParts {
643 type Error = VortexError;
644
645 fn try_from(value: ByteBuffer) -> Result<Self, Self::Error> {
646 if value.len() < 4 {
648 vortex_bail!("ArrayParts buffer is too short");
649 }
650
651 let value = value.aligned(Alignment::none());
653
654 let fb_length = u32::try_from_le_bytes(&value.as_slice()[value.len() - 4..])? as usize;
655 if value.len() < 4 + fb_length {
656 vortex_bail!("ArrayParts buffer is too short for flatbuffer");
657 }
658
659 let fb_offset = value.len() - 4 - fb_length;
660 let array_tree = value.slice(fb_offset..fb_offset + fb_length);
661 let segment = BufferHandle::new_host(value.slice(0..fb_offset));
662
663 Self::from_flatbuffer_and_segment(array_tree, segment)
664 }
665}
666
impl TryFrom<BufferHandle> for ArrayParts {
    type Error = VortexError;

    /// Materialize the handle onto the host (synchronously), then parse it with
    /// the `TryFrom<ByteBuffer>` implementation above.
    fn try_from(value: BufferHandle) -> Result<Self, Self::Error> {
        Self::try_from(value.try_to_host_sync()?)
    }
}
673}