1use std::borrow::Cow;
5use std::fmt::Debug;
6use std::fmt::Formatter;
7use std::iter;
8use std::sync::Arc;
9
10use flatbuffers::FlatBufferBuilder;
11use flatbuffers::Follow;
12use flatbuffers::WIPOffset;
13use flatbuffers::root;
14use vortex_buffer::Alignment;
15use vortex_buffer::ByteBuffer;
16use vortex_error::VortexError;
17use vortex_error::VortexExpect;
18use vortex_error::VortexResult;
19use vortex_error::vortex_bail;
20use vortex_error::vortex_err;
21use vortex_error::vortex_panic;
22use vortex_flatbuffers::FlatBuffer;
23use vortex_flatbuffers::WriteFlatBuffer;
24use vortex_flatbuffers::array as fba;
25use vortex_flatbuffers::array::Compression;
26use vortex_session::VortexSession;
27use vortex_session::registry::ReadContext;
28use vortex_utils::aliases::hash_map::HashMap;
29
30use crate::ArrayContext;
31use crate::ArrayRef;
32use crate::array::new_foreign_array;
33use crate::buffer::BufferHandle;
34use crate::dtype::DType;
35use crate::dtype::TryFromBytes;
36use crate::session::ArraySessionExt;
37use crate::stats::StatsSet;
38
/// Options controlling how an array is serialized into byte buffers.
#[derive(Default, Debug)]
pub struct SerializeOptions {
 // Byte position at which the serialized output will be written; used to
 // compute alignment padding relative to the final destination.
 pub offset: usize,
 // When true, zero-padding buffers are inserted so each data buffer (and the
 // trailing flatbuffer) lands at its required alignment.
 pub include_padding: bool,
}
48
49impl ArrayRef {
50 pub fn serialize(
61 &self,
62 ctx: &ArrayContext,
63 session: &VortexSession,
64 options: &SerializeOptions,
65 ) -> VortexResult<Vec<ByteBuffer>> {
66 let array_buffers = self
68 .depth_first_traversal()
69 .flat_map(|f| f.buffers())
70 .collect::<Vec<_>>();
71
72 let mut buffers = vec![];
74 let mut fb_buffers = Vec::with_capacity(buffers.capacity());
75
76 let max_alignment = array_buffers
78 .iter()
79 .map(|buf| buf.alignment())
80 .chain(iter::once(FlatBuffer::alignment()))
81 .max()
82 .unwrap_or_else(FlatBuffer::alignment);
83
84 let zeros = ByteBuffer::zeroed(*max_alignment);
86
87 buffers.push(ByteBuffer::zeroed_aligned(0, max_alignment));
90
91 let mut pos = options.offset;
93
94 for buffer in array_buffers {
96 let padding = if options.include_padding {
97 let padding = pos.next_multiple_of(*buffer.alignment()) - pos;
98 if padding > 0 {
99 pos += padding;
100 buffers.push(zeros.slice(0..padding));
101 }
102 padding
103 } else {
104 0
105 };
106
107 fb_buffers.push(fba::Buffer::new(
108 u16::try_from(padding).vortex_expect("padding fits into u16"),
109 buffer.alignment().exponent(),
110 Compression::None,
111 u32::try_from(buffer.len())
112 .map_err(|_| vortex_err!("All buffers must fit into u32 for serialization"))?,
113 ));
114
115 pos += buffer.len();
116 buffers.push(buffer.aligned(Alignment::none()));
117 }
118
119 let mut fbb = FlatBufferBuilder::new();
121
122 let root = ArrayNodeFlatBuffer::try_new(ctx, session, self)?;
123 let fb_root = root.try_write_flatbuffer(&mut fbb)?;
124
125 let fb_buffers = fbb.create_vector(&fb_buffers);
126 let fb_array = fba::Array::create(
127 &mut fbb,
128 &fba::ArrayArgs {
129 root: Some(fb_root),
130 buffers: Some(fb_buffers),
131 },
132 );
133 fbb.finish_minimal(fb_array);
134 let (fb_vec, fb_start) = fbb.collapse();
135 let fb_end = fb_vec.len();
136 let fb_buffer = ByteBuffer::from(fb_vec).slice(fb_start..fb_end);
137 let fb_length = fb_buffer.len();
138
139 if options.include_padding {
140 let padding = pos.next_multiple_of(*FlatBuffer::alignment()) - pos;
141 if padding > 0 {
142 buffers.push(zeros.slice(0..padding));
143 }
144 }
145 buffers.push(fb_buffer);
146
147 buffers.push(ByteBuffer::from(
149 u32::try_from(fb_length)
150 .map_err(|_| vortex_err!("Array metadata flatbuffer must fit into u32 for serialization. Array encoding tree is too large."))?
151 .to_le_bytes()
152 .to_vec(),
153 ));
154
155 Ok(buffers)
156 }
157}
158
/// Writer that converts an [`ArrayRef`] tree into `fba::ArrayNode` flatbuffer
/// nodes, assigning flat buffer indices depth-first.
pub struct ArrayNodeFlatBuffer<'a> {
 // Context used to intern encoding ids into u16 indices.
 ctx: &'a ArrayContext,
 session: &'a VortexSession,
 array: &'a ArrayRef,
 // Index of this node's first buffer within the flattened buffer list.
 buffer_idx: u16,
}
166
impl<'a> ArrayNodeFlatBuffer<'a> {
    /// Create a flatbuffer writer rooted at `array`, validating up-front that
    /// the whole tree can be serialized.
    ///
    /// # Errors
    /// Fails if any node in the tree has no serialized metadata, or if the
    /// tree contains more than `u16::MAX` buffers in total (buffer indices
    /// are stored as `u16`).
    pub fn try_new(
        ctx: &'a ArrayContext,
        session: &'a VortexSession,
        array: &'a ArrayRef,
    ) -> VortexResult<Self> {
        // Fail early rather than partway through writing the flatbuffer.
        for child in array.depth_first_traversal() {
            if child.metadata(session)?.is_none() {
                vortex_bail!(
                    "Array {} does not support serialization",
                    child.encoding_id()
                );
            }
        }
        let n_buffers_recursive = array.nbuffers_recursive();
        if n_buffers_recursive > u16::MAX as usize {
            vortex_bail!(
                "Array and all descendent arrays can have at most u16::MAX buffers: {}",
                n_buffers_recursive
            );
        };
        Ok(Self {
            ctx,
            session,
            array,
            buffer_idx: 0,
        })
    }

    /// Write this node, and recursively its children, into `fbb`.
    ///
    /// Buffer indices are assigned depth-first: this node's own buffers come
    /// first (starting at `self.buffer_idx`), followed by each child subtree's
    /// buffers. This must match the buffer order produced by
    /// `ArrayRef::serialize`.
    pub fn try_write_flatbuffer<'fb>(
        &self,
        fbb: &mut FlatBufferBuilder<'fb>,
    ) -> VortexResult<WIPOffset<fba::ArrayNode<'fb>>> {
        let encoding_idx = self
            .ctx
            .intern(&self.array.encoding_id())
            .ok_or_else(|| {
                vortex_err!(
                    "Array encoding {} not permitted by ctx",
                    self.array.encoding_id()
                )
            })?;

        // try_new already verified metadata exists for every node; this error
        // guards against a concurrent change or misuse.
        let metadata = self.array.metadata(self.session)?.ok_or_else(|| {
            vortex_err!(
                "Array {} does not support serialization",
                self.array.encoding_id()
            )
        })?;
        let metadata = Some(fbb.create_vector(metadata.as_slice()));

        let nbuffers = u16::try_from(self.array.nbuffers())
            .map_err(|_| vortex_err!("Array can have at most u16::MAX buffers"))?;
        // First buffer index available to the first child subtree.
        let mut child_buffer_idx = self.buffer_idx + nbuffers;

        let children = self
            .array
            .children()
            .iter()
            .map(|child| {
                let msg = ArrayNodeFlatBuffer {
                    ctx: self.ctx,
                    session: self.session,
                    array: child,
                    buffer_idx: child_buffer_idx,
                }
                .try_write_flatbuffer(fbb)?;

                // Advance past all buffers consumed by this child's subtree,
                // checking for u16 overflow.
                child_buffer_idx = u16::try_from(child.nbuffers_recursive())
                    .ok()
                    .and_then(|nbuffers| nbuffers.checked_add(child_buffer_idx))
                    .ok_or_else(|| vortex_err!("Too many buffers (u16) for Array"))?;

                Ok(msg)
            })
            .collect::<VortexResult<Vec<_>>>()?;
        let children = Some(fbb.create_vector(&children));

        // This node's own buffers occupy the contiguous index range
        // [buffer_idx, buffer_idx + nbuffers).
        let buffers = Some(fbb.create_vector_from_iter((0..nbuffers).map(|i| i + self.buffer_idx)));
        let stats = Some(self.array.statistics().write_flatbuffer(fbb)?);

        Ok(fba::ArrayNode::create(
            fbb,
            &fba::ArrayNodeArgs {
                encoding: encoding_idx,
                metadata,
                children,
                buffers,
                stats,
            },
        ))
    }
}
265
/// Provider of child arrays during deserialization: a decoder asks for each
/// child by index, declaring the dtype and length it expects.
pub trait ArrayChildren {
 // Returns the child at `index`, which must have the given `dtype` and `len`.
 fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef>;

 // Number of children available.
 fn len(&self) -> usize;

 fn is_empty(&self) -> bool {
 self.len() == 0
 }
}
280
281impl<T: AsRef<[ArrayRef]>> ArrayChildren for T {
282 fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef> {
283 let array = self.as_ref()[index].clone();
284 assert_eq!(array.len(), len);
285 assert_eq!(array.dtype(), dtype);
286 Ok(array)
287 }
288
289 fn len(&self) -> usize {
290 self.as_ref().len()
291 }
292}
293
/// A lazily-decoded view over a serialized array tree: a flatbuffer describing
/// the encodings plus the flat list of data buffers it references.
#[derive(Clone)]
pub struct SerializedArray {
 // The full `fba::Array` flatbuffer bytes.
 flatbuffer: FlatBuffer,
 // Byte offset of the current `fba::ArrayNode` within `flatbuffer`; child
 // views share the flatbuffer and differ only in this location.
 flatbuffer_loc: usize,
 // All data buffers for the whole tree, indexed by the flatbuffer's
 // per-node buffer-index vectors.
 buffers: Arc<[BufferHandle]>,
}
308
309impl Debug for SerializedArray {
310 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
311 f.debug_struct("SerializedArray")
312 .field("encoding_id", &self.encoding_id())
313 .field("children", &(0..self.nchildren()).map(|i| self.child(i)))
314 .field(
315 "buffers",
316 &(0..self.nbuffers()).map(|i| self.buffer(i).ok()),
317 )
318 .field("metadata", &self.metadata())
319 .finish()
320 }
321}
322
impl SerializedArray {
    /// Decode this node into a concrete [`ArrayRef`] with the given `dtype`
    /// and `len`, resolving encodings through `ctx` and the session registry.
    ///
    /// Unknown encodings are tolerated (as opaque foreign arrays) only when
    /// the session allows unknown encodings; otherwise they are an error.
    pub fn decode(
        &self,
        dtype: &DType,
        len: usize,
        ctx: &ReadContext,
        session: &VortexSession,
    ) -> VortexResult<ArrayRef> {
        // Map the flatbuffer's numeric encoding index back to an encoding id.
        let encoding_idx = self.flatbuffer().encoding();
        let encoding_id = ctx
            .resolve(encoding_idx)
            .ok_or_else(|| vortex_err!("Unknown encoding index: {}", encoding_idx))?;
        let Some(plugin) = session.arrays().registry().find(&encoding_id) else {
            if session.allows_unknown() {
                return self.decode_foreign(encoding_id, dtype, len, ctx);
            }
            return Err(vortex_err!("Unknown encoding: {}", encoding_id));
        };

        // Children are decoded lazily, on demand, by the plugin.
        let children = SerializedArrayChildren {
            ser: self,
            ctx,
            session,
        };

        let buffers = self.collect_buffers()?;

        let decoded =
            plugin.deserialize(dtype, len, self.metadata(), &buffers, &children, session)?;

        // Sanity-check that the plugin honored the requested contract; a
        // mismatch here is a plugin bug, hence assertions rather than errors.
        assert_eq!(
            decoded.len(),
            len,
            "Array decoded from {} has incorrect length {}, expected {}",
            encoding_id,
            decoded.len(),
            len
        );
        assert_eq!(
            decoded.dtype(),
            dtype,
            "Array decoded from {} has incorrect dtype {}, expected {}",
            encoding_id,
            decoded.dtype(),
            dtype,
        );
        assert_eq!(
            decoded.encoding_id(),
            encoding_id,
            "Array decoded from {} has incorrect encoding {}",
            encoding_id,
            decoded.encoding_id(),
        );

        // Restore any statistics that were serialized alongside the array.
        if let Some(stats) = self.flatbuffer().stats() {
            decoded
                .statistics()
                .set_iter(StatsSet::from_flatbuffer(&stats, dtype, session)?.into_iter());
        }

        Ok(decoded)
    }

    /// Decode an array whose encoding is not registered in this session,
    /// producing an opaque "foreign" array that preserves metadata, buffers,
    /// and children for round-tripping.
    fn decode_foreign(
        &self,
        encoding_id: crate::array::ArrayId,
        dtype: &DType,
        len: usize,
        ctx: &ReadContext,
    ) -> VortexResult<ArrayRef> {
        // NOTE(review): each child is decoded with the PARENT's dtype and len
        // — per-child types are not recorded for foreign arrays; confirm this
        // is intentional for opaque round-tripping.
        let children = (0..self.nchildren())
            .map(|idx| {
                let child = self.child(idx);
                let child_encoding_idx = child.flatbuffer().encoding();
                let child_encoding_id = ctx
                    .resolve(child_encoding_idx)
                    .ok_or_else(|| vortex_err!("Unknown encoding index: {}", child_encoding_idx))?;
                child.decode_foreign(child_encoding_id, dtype, len, ctx)
            })
            .collect::<VortexResult<Vec<_>>>()?;

        new_foreign_array(
            encoding_id,
            dtype.clone(),
            len,
            self.metadata().to_vec(),
            self.collect_buffers()?.into_owned(),
            children,
        )
    }

    /// The numeric encoding index of this node (meaningful only relative to
    /// the `ReadContext` it was written with).
    pub fn encoding_id(&self) -> u16 {
        self.flatbuffer().encoding()
    }

    /// This node's opaque metadata bytes; empty if none were written.
    pub fn metadata(&self) -> &[u8] {
        self.flatbuffer()
            .metadata()
            .map(|metadata| metadata.bytes())
            .unwrap_or(&[])
    }

    /// Number of direct children of this node.
    pub fn nchildren(&self) -> usize {
        self.flatbuffer()
            .children()
            .map_or(0, |children| children.len())
    }

    /// A view of the `idx`-th child, sharing this node's flatbuffer and
    /// buffer list.
    ///
    /// # Panics
    /// Panics if `idx` is out of range or the node has no children.
    pub fn child(&self, idx: usize) -> SerializedArray {
        let children = self
            .flatbuffer()
            .children()
            .vortex_expect("Expected array to have children");
        if idx >= children.len() {
            vortex_panic!(
                "Invalid child index {} for array with {} children",
                idx,
                children.len()
            );
        }
        self.with_root(children.get(idx))
    }

    /// Number of buffers referenced directly by this node (not its children).
    pub fn nbuffers(&self) -> usize {
        self.flatbuffer()
            .buffers()
            .map_or(0, |buffers| buffers.len())
    }

    /// This node's `idx`-th buffer, resolved through the node's buffer-index
    /// vector into the shared flat buffer list.
    pub fn buffer(&self, idx: usize) -> VortexResult<BufferHandle> {
        let buffer_idx = self
            .flatbuffer()
            .buffers()
            .ok_or_else(|| vortex_err!("Array has no buffers"))?
            .get(idx);
        self.buffers
            .get(buffer_idx as usize)
            .cloned()
            .ok_or_else(|| {
                vortex_err!(
                    "Invalid buffer index {} for array with {} buffers",
                    buffer_idx,
                    self.nbuffers()
                )
            })
    }

    /// All of this node's buffers, borrowing the shared slice when the
    /// node's buffer indices are contiguous (the common case from
    /// depth-first serialization), cloning otherwise.
    fn collect_buffers(&self) -> VortexResult<Cow<'_, [BufferHandle]>> {
        let Some(fb_buffers) = self.flatbuffer().buffers() else {
            return Ok(Cow::Borrowed(&[]));
        };
        let count = fb_buffers.len();
        if count == 0 {
            return Ok(Cow::Borrowed(&[]));
        }
        let start = fb_buffers.get(0) as usize;
        let contiguous = fb_buffers
            .iter()
            .enumerate()
            .all(|(i, idx)| idx as usize == start + i);
        if contiguous {
            self.buffers.get(start..start + count).map_or_else(
                || {
                    vortex_bail!(
                        "buffer indices {}..{} out of range for {} buffers",
                        start,
                        start + count,
                        self.buffers.len()
                    )
                },
                |slice| Ok(Cow::Borrowed(slice)),
            )
        } else {
            // Non-contiguous indices: fall back to per-buffer clones.
            (0..count)
                .map(|idx| self.buffer(idx))
                .collect::<VortexResult<Vec<_>>>()
                .map(Cow::Owned)
        }
    }

    /// Lengths of every buffer in the whole tree, in serialized order, read
    /// from the top-level `fba::Array` table (not just this node).
    pub fn buffer_lengths(&self) -> Vec<usize> {
        let fb_array = root::<fba::Array>(self.flatbuffer.as_ref())
            .vortex_expect("SerializedArray flatbuffer must be a valid Array");
        fb_array
            .buffers()
            .map(|buffers| buffers.iter().map(|b| b.length() as usize).collect())
            .unwrap_or_default()
    }

    /// Verify `array_tree` is a valid `fba::Array` flatbuffer with a root
    /// node, returning the aligned flatbuffer and the root node's byte offset.
    fn validate_array_tree(array_tree: impl Into<ByteBuffer>) -> VortexResult<(FlatBuffer, usize)> {
        let fb_buffer = FlatBuffer::align_from(array_tree.into());
        let fb_array = root::<fba::Array>(fb_buffer.as_ref())?;
        let fb_root = fb_array
            .root()
            .ok_or_else(|| vortex_err!("Array must have a root node"))?;
        let flatbuffer_loc = fb_root._tab.loc();
        Ok((fb_buffer, flatbuffer_loc))
    }

    /// Build a `SerializedArray` from a flatbuffer tree plus an already
    /// materialized list of buffers (one per flatbuffer buffer descriptor).
    pub fn from_flatbuffer_with_buffers(
        array_tree: impl Into<ByteBuffer>,
        buffers: Vec<BufferHandle>,
    ) -> VortexResult<Self> {
        let (flatbuffer, flatbuffer_loc) = Self::validate_array_tree(array_tree)?;
        Ok(SerializedArray {
            flatbuffer,
            flatbuffer_loc,
            buffers: buffers.into(),
        })
    }

    /// Build a `SerializedArray` from a flatbuffer tree that references no
    /// data buffers (e.g. metadata-only trees, as in the tests below).
    pub fn from_array_tree(array_tree: impl Into<ByteBuffer>) -> VortexResult<Self> {
        let (flatbuffer, flatbuffer_loc) = Self::validate_array_tree(array_tree)?;
        Ok(SerializedArray {
            flatbuffer,
            flatbuffer_loc,
            buffers: Arc::new([]),
        })
    }

    /// The `fba::ArrayNode` this view points at.
    fn flatbuffer(&self) -> fba::ArrayNode<'_> {
        // SAFETY: flatbuffer_loc was produced either by validate_array_tree
        // (root of a verified fba::Array) or by with_root from a node obtained
        // through the verified flatbuffer, so it is a valid ArrayNode offset.
        unsafe { fba::ArrayNode::follow(self.flatbuffer.as_ref(), self.flatbuffer_loc) }
    }

    /// Clone this view, re-pointed at a different node of the same flatbuffer.
    fn with_root(&self, root: fba::ArrayNode) -> Self {
        let mut this = self.clone();
        this.flatbuffer_loc = root._tab.loc();
        this
    }

    /// Build a `SerializedArray` from a flatbuffer tree plus a single segment
    /// containing all buffer bytes laid out back-to-back (with any padding
    /// recorded in the flatbuffer buffer descriptors).
    pub fn from_flatbuffer_and_segment(
        array_tree: ByteBuffer,
        segment: BufferHandle,
    ) -> VortexResult<Self> {
        Self::from_flatbuffer_and_segment_with_overrides(array_tree, segment, &HashMap::new())
    }

    /// Like [`Self::from_flatbuffer_and_segment`], but individual buffers may
    /// be replaced by host-memory overrides keyed by their flat buffer index
    /// (the segment is still consumed positionally for non-overridden buffers).
    pub fn from_flatbuffer_and_segment_with_overrides(
        array_tree: ByteBuffer,
        segment: BufferHandle,
        buffer_overrides: &HashMap<u32, ByteBuffer>,
    ) -> VortexResult<Self> {
        // Strip alignment so slicing at arbitrary offsets is permitted.
        let segment = segment.ensure_aligned(Alignment::none())?;

        let (fb_buffer, flatbuffer_loc) = Self::validate_array_tree(array_tree)?;
        // SAFETY: validate_array_tree just verified fb_buffer as fba::Array.
        let fb_array = unsafe { fba::root_as_array_unchecked(fb_buffer.as_ref()) };

        // Walk the segment: each descriptor records its leading padding,
        // length, and required alignment.
        let mut offset = 0;
        let buffers = fb_array
            .buffers()
            .unwrap_or_default()
            .iter()
            .enumerate()
            .map(|(idx, fb_buf)| {
                offset += fb_buf.padding() as usize;
                let buffer_len = fb_buf.length() as usize;
                let alignment = Alignment::from_exponent(fb_buf.alignment_exponent());

                let idx = u32::try_from(idx).vortex_expect("buffer count must fit in u32");
                let handle = if let Some(host_data) = buffer_overrides.get(&idx) {
                    BufferHandle::new_host(host_data.clone()).ensure_aligned(alignment)?
                } else {
                    let buffer = segment.slice(offset..(offset + buffer_len));
                    buffer.ensure_aligned(alignment)?
                };

                // Advance past the buffer even when overridden, so subsequent
                // offsets stay in sync with the segment layout.
                offset += buffer_len;
                Ok(handle)
            })
            .collect::<VortexResult<Arc<[_]>>>()?;

        Ok(SerializedArray {
            flatbuffer: fb_buffer,
            flatbuffer_loc,
            buffers,
        })
    }
}
653
/// [`ArrayChildren`] adapter that decodes children of a [`SerializedArray`]
/// on demand during deserialization.
struct SerializedArrayChildren<'a> {
 ser: &'a SerializedArray,
 ctx: &'a ReadContext,
 session: &'a VortexSession,
}
659
660impl ArrayChildren for SerializedArrayChildren<'_> {
661 fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef> {
662 self.ser
663 .child(index)
664 .decode(dtype, len, self.ctx, self.session)
665 }
666
667 fn len(&self) -> usize {
668 self.ser.nchildren()
669 }
670}
671
672impl TryFrom<ByteBuffer> for SerializedArray {
673 type Error = VortexError;
674
675 fn try_from(value: ByteBuffer) -> Result<Self, Self::Error> {
676 if value.len() < 4 {
678 vortex_bail!("SerializedArray buffer is too short");
679 }
680
681 let value = value.aligned(Alignment::none());
683
684 let fb_length = u32::try_from_le_bytes(&value.as_slice()[value.len() - 4..])? as usize;
685 if value.len() < 4 + fb_length {
686 vortex_bail!("SerializedArray buffer is too short for flatbuffer");
687 }
688
689 let fb_offset = value.len() - 4 - fb_length;
690 let array_tree = value.slice(fb_offset..fb_offset + fb_length);
691 let segment = BufferHandle::new_host(value.slice(0..fb_offset));
692
693 Self::from_flatbuffer_and_segment(array_tree, segment)
694 }
695}
696
impl TryFrom<BufferHandle> for SerializedArray {
    type Error = VortexError;

    /// Materialize the handle into host memory, then parse it as a
    /// serialized array (see `TryFrom<ByteBuffer>`).
    fn try_from(value: BufferHandle) -> Result<Self, Self::Error> {
        Self::try_from(value.try_to_host_sync()?)
    }
}
704
#[cfg(test)]
mod tests {
    use std::sync::LazyLock;

    use flatbuffers::FlatBufferBuilder;
    use vortex_session::VortexSession;
    use vortex_session::registry::ReadContext;

    use super::SerializeOptions;
    use super::SerializedArray;
    use crate::ArrayContext;
    use crate::array::ArrayId;
    use crate::dtype::DType;
    use crate::dtype::Nullability;
    use crate::flatbuffers as fba;
    use crate::session::ArraySession;

    // Shared empty session for metadata lookups / re-serialization.
    static SESSION: LazyLock<VortexSession> = LazyLock::new(VortexSession::empty);

    // Decoding a tree with unregistered encodings should produce foreign
    // arrays (preserving metadata and children) when the session allows
    // unknown encodings, and those arrays should re-serialize.
    #[test]
    fn unknown_array_encoding_allow_unknown() {
        // Hand-build a two-node Array flatbuffer: root (encoding 0) with one
        // child (encoding 1), no data buffers.
        let mut fbb = FlatBufferBuilder::new();

        let child_metadata = fbb.create_vector(&[9u8]);
        let child = fba::ArrayNode::create(
            &mut fbb,
            &fba::ArrayNodeArgs {
                encoding: 1,
                metadata: Some(child_metadata),
                children: None,
                buffers: None,
                stats: None,
            },
        );

        let children = fbb.create_vector(&[child]);
        let metadata = fbb.create_vector(&[1u8, 2, 3]);
        let root = fba::ArrayNode::create(
            &mut fbb,
            &fba::ArrayNodeArgs {
                encoding: 0,
                metadata: Some(metadata),
                children: Some(children),
                buffers: None,
                stats: None,
            },
        );
        let array = fba::Array::create(
            &mut fbb,
            &fba::ArrayArgs {
                root: Some(root),
                buffers: None,
            },
        );
        fbb.finish_minimal(array);
        let (buf, start) = fbb.collapse();
        let tree = vortex_buffer::ByteBuffer::from(buf).slice(start..);

        let ser = SerializedArray::from_array_tree(tree).unwrap();
        // Map encoding indices 0/1 to ids that are NOT registered in the
        // session, forcing the foreign-array path.
        let ctx = ReadContext::new([
            ArrayId::new_ref("vortex.test.foreign_array"),
            ArrayId::new_ref("vortex.test.foreign_child"),
        ]);
        let session = VortexSession::empty()
            .with::<ArraySession>()
            .allow_unknown();

        let decoded = ser
            .decode(&DType::Variant(Nullability::Nullable), 5, &ctx, &session)
            .unwrap();
        assert_eq!(decoded.encoding_id().as_ref(), "vortex.test.foreign_array");
        assert_eq!(decoded.nchildren(), 1);
        assert_eq!(
            decoded.nth_child(0).unwrap().encoding_id().as_ref(),
            "vortex.test.foreign_child"
        );
        // Metadata must round-trip through the foreign representation.
        assert_eq!(decoded.metadata(&SESSION).unwrap().unwrap(), vec![1, 2, 3]);
        assert_eq!(
            decoded
                .nth_child(0)
                .unwrap()
                .metadata(&SESSION)
                .unwrap()
                .unwrap(),
            vec![9]
        );

        // Foreign arrays must remain serializable.
        let serialized = decoded
            .serialize(
                &ArrayContext::default(),
                &SESSION,
                &SerializeOptions::default(),
            )
            .unwrap();
        assert!(!serialized.is_empty());
    }
}