1use std::borrow::Cow;
5use std::fmt::Debug;
6use std::fmt::Formatter;
7use std::iter;
8use std::sync::Arc;
9
10use flatbuffers::FlatBufferBuilder;
11use flatbuffers::Follow;
12use flatbuffers::WIPOffset;
13use flatbuffers::root;
14use vortex_buffer::Alignment;
15use vortex_buffer::ByteBuffer;
16use vortex_error::VortexError;
17use vortex_error::VortexExpect;
18use vortex_error::VortexResult;
19use vortex_error::vortex_bail;
20use vortex_error::vortex_err;
21use vortex_error::vortex_panic;
22use vortex_flatbuffers::FlatBuffer;
23use vortex_flatbuffers::WriteFlatBuffer;
24use vortex_flatbuffers::array as fba;
25use vortex_flatbuffers::array::Compression;
26use vortex_session::VortexSession;
27use vortex_session::registry::ReadContext;
28use vortex_utils::aliases::hash_map::HashMap;
29
30use crate::ArrayContext;
31use crate::ArrayRef;
32use crate::ArraySlots;
33use crate::array::ArrayId;
34use crate::array::new_foreign_array;
35use crate::buffer::BufferHandle;
36use crate::dtype::DType;
37use crate::dtype::TryFromBytes;
38use crate::session::ArraySessionExt;
39use crate::stats::StatsSet;
40
/// Options controlling how [`ArrayRef::serialize`] lays out its output buffers.
#[derive(Debug, Default)]
pub struct SerializeOptions {
    /// Byte position at which the serialized output starts in the destination.
    ///
    /// Alignment padding is computed relative to this position rather than relative to
    /// the start of the returned buffers.
    pub offset: usize,
    /// When `true`, zero-filled padding is inserted before each data buffer (and before the
    /// trailing flatbuffer) so that each one starts on a multiple of its alignment.
    pub include_padding: bool,
}
50
51impl ArrayRef {
52 pub fn serialize(
63 &self,
64 ctx: &ArrayContext,
65 session: &VortexSession,
66 options: &SerializeOptions,
67 ) -> VortexResult<Vec<ByteBuffer>> {
68 let array_buffers = self
70 .depth_first_traversal()
71 .flat_map(|f| f.buffers())
72 .collect::<Vec<_>>();
73
74 let mut buffers = vec![];
76 let mut fb_buffers = Vec::with_capacity(buffers.capacity());
77
78 let max_alignment = array_buffers
80 .iter()
81 .map(|buf| buf.alignment())
82 .chain(iter::once(FlatBuffer::alignment()))
83 .max()
84 .unwrap_or_else(FlatBuffer::alignment);
85
86 let zeros = ByteBuffer::zeroed(*max_alignment);
88
89 buffers.push(ByteBuffer::zeroed_aligned(0, max_alignment));
92
93 let mut pos = options.offset;
95
96 for buffer in array_buffers {
98 let padding = if options.include_padding {
99 let padding = pos.next_multiple_of(*buffer.alignment()) - pos;
100 if padding > 0 {
101 pos += padding;
102 buffers.push(zeros.slice(0..padding));
103 }
104 padding
105 } else {
106 0
107 };
108
109 fb_buffers.push(fba::Buffer::new(
110 u16::try_from(padding).vortex_expect("padding fits into u16"),
111 buffer.alignment().exponent(),
112 Compression::None,
113 u32::try_from(buffer.len())
114 .map_err(|_| vortex_err!("All buffers must fit into u32 for serialization"))?,
115 ));
116
117 pos += buffer.len();
118 buffers.push(buffer.aligned(Alignment::none()));
119 }
120
121 let mut fbb = FlatBufferBuilder::new();
123
124 let root = ArrayNodeFlatBuffer::try_new(ctx, session, self)?;
125 let fb_root = root.try_write_flatbuffer(&mut fbb)?;
126
127 let fb_buffers = fbb.create_vector(&fb_buffers);
128 let fb_array = fba::Array::create(
129 &mut fbb,
130 &fba::ArrayArgs {
131 root: Some(fb_root),
132 buffers: Some(fb_buffers),
133 },
134 );
135 fbb.finish_minimal(fb_array);
136 let (fb_vec, fb_start) = fbb.collapse();
137 let fb_end = fb_vec.len();
138 let fb_buffer = ByteBuffer::from(fb_vec).slice(fb_start..fb_end);
139 let fb_length = fb_buffer.len();
140
141 if options.include_padding {
142 let padding = pos.next_multiple_of(*FlatBuffer::alignment()) - pos;
143 if padding > 0 {
144 buffers.push(zeros.slice(0..padding));
145 }
146 }
147 buffers.push(fb_buffer);
148
149 buffers.push(ByteBuffer::from(
151 u32::try_from(fb_length)
152 .map_err(|_| vortex_err!("Array metadata flatbuffer must fit into u32 for serialization. Array encoding tree is too large."))?
153 .to_le_bytes()
154 .to_vec(),
155 ));
156
157 Ok(buffers)
158 }
159}
160
161pub struct ArrayNodeFlatBuffer<'a> {
163 ctx: &'a ArrayContext,
164 session: &'a VortexSession,
165 array: &'a ArrayRef,
166 buffer_idx: u16,
167}
168
169impl<'a> ArrayNodeFlatBuffer<'a> {
170 pub fn try_new(
171 ctx: &'a ArrayContext,
172 session: &'a VortexSession,
173 array: &'a ArrayRef,
174 ) -> VortexResult<Self> {
175 let n_buffers_recursive = array.nbuffers_recursive();
176 if n_buffers_recursive > u16::MAX as usize {
177 vortex_bail!(
178 "Array and all descendent arrays can have at most u16::MAX buffers: {}",
179 n_buffers_recursive
180 );
181 };
182 Ok(Self {
183 ctx,
184 session,
185 array,
186 buffer_idx: 0,
187 })
188 }
189
190 pub fn try_write_flatbuffer<'fb>(
191 &self,
192 fbb: &mut FlatBufferBuilder<'fb>,
193 ) -> VortexResult<WIPOffset<fba::ArrayNode<'fb>>> {
194 let encoding_idx = self
195 .ctx
196 .intern(&self.array.encoding_id())
197 .ok_or_else(|| {
199 vortex_err!(
200 "Array encoding {} not permitted by ctx",
201 self.array.encoding_id()
202 )
203 })?;
204
205 let metadata_bytes = self.session.array_serialize(self.array)?.ok_or_else(|| {
206 vortex_err!(
207 "Array {} does not support serialization",
208 self.array.encoding_id()
209 )
210 })?;
211 let metadata = Some(fbb.create_vector(metadata_bytes.as_slice()));
212
213 let nbuffers = u16::try_from(self.array.nbuffers())
215 .map_err(|_| vortex_err!("Array can have at most u16::MAX buffers"))?;
216 let mut child_buffer_idx = self.buffer_idx + nbuffers;
217
218 let children = self
219 .array
220 .children()
221 .iter()
222 .map(|child| {
223 let msg = ArrayNodeFlatBuffer {
225 ctx: self.ctx,
226 session: self.session,
227 array: child,
228 buffer_idx: child_buffer_idx,
229 }
230 .try_write_flatbuffer(fbb)?;
231
232 child_buffer_idx = u16::try_from(child.nbuffers_recursive())
233 .ok()
234 .and_then(|nbuffers| nbuffers.checked_add(child_buffer_idx))
235 .ok_or_else(|| vortex_err!("Too many buffers (u16) for Array"))?;
236
237 Ok(msg)
238 })
239 .collect::<VortexResult<Vec<_>>>()?;
240 let children = Some(fbb.create_vector(&children));
241
242 let buffers = Some(fbb.create_vector_from_iter((0..nbuffers).map(|i| i + self.buffer_idx)));
243 let stats = Some(self.array.statistics().write_flatbuffer(fbb)?);
244
245 Ok(fba::ArrayNode::create(
246 fbb,
247 &fba::ArrayNodeArgs {
248 encoding: encoding_idx,
249 metadata,
250 children,
251 buffers,
252 stats,
253 },
254 ))
255 }
256}
257
258pub trait ArrayChildren {
261 fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef>;
263
264 fn len(&self) -> usize;
266
267 fn is_empty(&self) -> bool {
269 self.len() == 0
270 }
271}
272
273impl<T: AsRef<[ArrayRef]>> ArrayChildren for T {
274 fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef> {
275 let array = self.as_ref()[index].clone();
276 assert_eq!(array.len(), len);
277 assert_eq!(array.dtype(), dtype);
278 Ok(array)
279 }
280
281 fn len(&self) -> usize {
282 self.as_ref().len()
283 }
284}
285
286#[derive(Clone)]
293pub struct SerializedArray {
294 flatbuffer: FlatBuffer,
296 flatbuffer_loc: usize,
298 buffers: Arc<[BufferHandle]>,
299}
300
301impl Debug for SerializedArray {
302 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
303 f.debug_struct("SerializedArray")
304 .field("encoding_id", &self.encoding_id())
305 .field("children", &(0..self.nchildren()).map(|i| self.child(i)))
306 .field(
307 "buffers",
308 &(0..self.nbuffers()).map(|i| self.buffer(i).ok()),
309 )
310 .field("metadata", &self.metadata())
311 .finish()
312 }
313}
314
315impl SerializedArray {
316 pub fn decode(
318 &self,
319 dtype: &DType,
320 len: usize,
321 ctx: &ReadContext,
322 session: &VortexSession,
323 ) -> VortexResult<ArrayRef> {
324 let encoding_idx = self.flatbuffer().encoding();
325 let encoding_id = ctx
326 .resolve(encoding_idx)
327 .ok_or_else(|| vortex_err!("Unknown encoding index: {}", encoding_idx))?;
328 let Some(plugin) = session.arrays().registry().find(&encoding_id) else {
329 if session.allows_unknown() {
330 return self.decode_foreign(encoding_id, dtype, len, ctx);
331 }
332 return Err(vortex_err!("Unknown encoding: {}", encoding_id));
333 };
334
335 let children = SerializedArrayChildren {
336 ser: self,
337 ctx,
338 session,
339 };
340
341 let buffers = self.collect_buffers()?;
342
343 let decoded =
344 plugin.deserialize(dtype, len, self.metadata(), &buffers, &children, session)?;
345
346 assert_eq!(
347 decoded.len(),
348 len,
349 "Array decoded from {} has incorrect length {}, expected {}",
350 encoding_id,
351 decoded.len(),
352 len
353 );
354 assert_eq!(
355 decoded.dtype(),
356 dtype,
357 "Array decoded from {} has incorrect dtype {}, expected {}",
358 encoding_id,
359 decoded.dtype(),
360 dtype,
361 );
362
363 assert!(
364 plugin.is_supported_encoding(&decoded.encoding_id()),
365 "Array decoded from {} has incorrect encoding {}",
366 encoding_id,
367 decoded.encoding_id(),
368 );
369
370 if let Some(stats) = self.flatbuffer().stats() {
372 decoded
373 .statistics()
374 .set_iter(StatsSet::from_flatbuffer(&stats, dtype, session)?.into_iter());
375 }
376
377 Ok(decoded)
378 }
379
380 fn decode_foreign(
381 &self,
382 encoding_id: ArrayId,
383 dtype: &DType,
384 len: usize,
385 ctx: &ReadContext,
386 ) -> VortexResult<ArrayRef> {
387 let children = (0..self.nchildren())
388 .map(|idx| {
389 let child = self.child(idx);
390 let child_encoding_idx = child.flatbuffer().encoding();
391 let child_encoding_id = ctx
392 .resolve(child_encoding_idx)
393 .ok_or_else(|| vortex_err!("Unknown encoding index: {}", child_encoding_idx))?;
394 child
395 .decode_foreign(child_encoding_id, dtype, len, ctx)
396 .map(Some)
397 })
398 .collect::<VortexResult<ArraySlots>>()?;
399
400 new_foreign_array(
401 encoding_id,
402 dtype.clone(),
403 len,
404 self.metadata().to_vec(),
405 self.collect_buffers()?.into_owned(),
406 children,
407 )
408 }
409
410 pub fn encoding_id(&self) -> u16 {
412 self.flatbuffer().encoding()
413 }
414
415 pub fn metadata(&self) -> &[u8] {
417 self.flatbuffer()
418 .metadata()
419 .map(|metadata| metadata.bytes())
420 .unwrap_or(&[])
421 }
422
423 pub fn nchildren(&self) -> usize {
425 self.flatbuffer()
426 .children()
427 .map_or(0, |children| children.len())
428 }
429
430 pub fn child(&self, idx: usize) -> SerializedArray {
432 let children = self
433 .flatbuffer()
434 .children()
435 .vortex_expect("Expected array to have children");
436 if idx >= children.len() {
437 vortex_panic!(
438 "Invalid child index {} for array with {} children",
439 idx,
440 children.len()
441 );
442 }
443 self.with_root(children.get(idx))
444 }
445
446 pub fn nbuffers(&self) -> usize {
448 self.flatbuffer()
449 .buffers()
450 .map_or(0, |buffers| buffers.len())
451 }
452
453 pub fn buffer(&self, idx: usize) -> VortexResult<BufferHandle> {
455 let buffer_idx = self
456 .flatbuffer()
457 .buffers()
458 .ok_or_else(|| vortex_err!("Array has no buffers"))?
459 .get(idx);
460 self.buffers
461 .get(buffer_idx as usize)
462 .cloned()
463 .ok_or_else(|| {
464 vortex_err!(
465 "Invalid buffer index {} for array with {} buffers",
466 buffer_idx,
467 self.nbuffers()
468 )
469 })
470 }
471
472 fn collect_buffers(&self) -> VortexResult<Cow<'_, [BufferHandle]>> {
477 let Some(fb_buffers) = self.flatbuffer().buffers() else {
478 return Ok(Cow::Borrowed(&[]));
479 };
480 let count = fb_buffers.len();
481 if count == 0 {
482 return Ok(Cow::Borrowed(&[]));
483 }
484 let start = fb_buffers.get(0) as usize;
485 let contiguous = fb_buffers
486 .iter()
487 .enumerate()
488 .all(|(i, idx)| idx as usize == start + i);
489 if contiguous {
490 self.buffers.get(start..start + count).map_or_else(
491 || {
492 vortex_bail!(
493 "buffer indices {}..{} out of range for {} buffers",
494 start,
495 start + count,
496 self.buffers.len()
497 )
498 },
499 |slice| Ok(Cow::Borrowed(slice)),
500 )
501 } else {
502 (0..count)
503 .map(|idx| self.buffer(idx))
504 .collect::<VortexResult<Vec<_>>>()
505 .map(Cow::Owned)
506 }
507 }
508
509 pub fn buffer_lengths(&self) -> Vec<usize> {
515 let fb_array = root::<fba::Array>(self.flatbuffer.as_ref())
516 .vortex_expect("SerializedArray flatbuffer must be a valid Array");
517 fb_array
518 .buffers()
519 .map(|buffers| buffers.iter().map(|b| b.length() as usize).collect())
520 .unwrap_or_default()
521 }
522
523 fn validate_array_tree(array_tree: impl Into<ByteBuffer>) -> VortexResult<(FlatBuffer, usize)> {
525 let fb_buffer = FlatBuffer::align_from(array_tree.into());
526 let fb_array = root::<fba::Array>(fb_buffer.as_ref())?;
527 let fb_root = fb_array
528 .root()
529 .ok_or_else(|| vortex_err!("Array must have a root node"))?;
530 let flatbuffer_loc = fb_root._tab.loc();
531 Ok((fb_buffer, flatbuffer_loc))
532 }
533
534 pub fn from_flatbuffer_with_buffers(
541 array_tree: impl Into<ByteBuffer>,
542 buffers: Vec<BufferHandle>,
543 ) -> VortexResult<Self> {
544 let (flatbuffer, flatbuffer_loc) = Self::validate_array_tree(array_tree)?;
545 Ok(SerializedArray {
546 flatbuffer,
547 flatbuffer_loc,
548 buffers: buffers.into(),
549 })
550 }
551
552 pub fn from_array_tree(array_tree: impl Into<ByteBuffer>) -> VortexResult<Self> {
561 let (flatbuffer, flatbuffer_loc) = Self::validate_array_tree(array_tree)?;
562 Ok(SerializedArray {
563 flatbuffer,
564 flatbuffer_loc,
565 buffers: Arc::new([]),
566 })
567 }
568
569 fn flatbuffer(&self) -> fba::ArrayNode<'_> {
571 unsafe { fba::ArrayNode::follow(self.flatbuffer.as_ref(), self.flatbuffer_loc) }
572 }
573
574 fn with_root(&self, root: fba::ArrayNode) -> Self {
577 let mut this = self.clone();
578 this.flatbuffer_loc = root._tab.loc();
579 this
580 }
581
582 pub fn from_flatbuffer_and_segment(
588 array_tree: ByteBuffer,
589 segment: BufferHandle,
590 ) -> VortexResult<Self> {
591 Self::from_flatbuffer_and_segment_with_overrides(array_tree, segment, &HashMap::new())
593 }
594
595 pub fn from_flatbuffer_and_segment_with_overrides(
602 array_tree: ByteBuffer,
603 segment: BufferHandle,
604 buffer_overrides: &HashMap<u32, ByteBuffer>,
605 ) -> VortexResult<Self> {
606 let segment = segment.ensure_aligned(Alignment::none())?;
609
610 let (fb_buffer, flatbuffer_loc) = Self::validate_array_tree(array_tree)?;
613 let fb_array = unsafe { fba::root_as_array_unchecked(fb_buffer.as_ref()) };
615
616 let mut offset = 0;
617 let buffers = fb_array
618 .buffers()
619 .unwrap_or_default()
620 .iter()
621 .enumerate()
622 .map(|(idx, fb_buf)| {
623 offset += fb_buf.padding() as usize;
624 let buffer_len = fb_buf.length() as usize;
625 let alignment = Alignment::from_exponent(fb_buf.alignment_exponent());
626
627 let idx = u32::try_from(idx).vortex_expect("buffer count must fit in u32");
628 let handle = if let Some(host_data) = buffer_overrides.get(&idx) {
629 BufferHandle::new_host(host_data.clone()).ensure_aligned(alignment)?
630 } else {
631 let buffer = segment.slice(offset..(offset + buffer_len));
632 buffer.ensure_aligned(alignment)?
633 };
634
635 offset += buffer_len;
636 Ok(handle)
637 })
638 .collect::<VortexResult<Arc<[_]>>>()?;
639
640 Ok(SerializedArray {
641 flatbuffer: fb_buffer,
642 flatbuffer_loc,
643 buffers,
644 })
645 }
646}
647
648struct SerializedArrayChildren<'a> {
649 ser: &'a SerializedArray,
650 ctx: &'a ReadContext,
651 session: &'a VortexSession,
652}
653
654impl ArrayChildren for SerializedArrayChildren<'_> {
655 fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef> {
656 self.ser
657 .child(index)
658 .decode(dtype, len, self.ctx, self.session)
659 }
660
661 fn len(&self) -> usize {
662 self.ser.nchildren()
663 }
664}
665
666impl TryFrom<ByteBuffer> for SerializedArray {
667 type Error = VortexError;
668
669 fn try_from(value: ByteBuffer) -> Result<Self, Self::Error> {
670 if value.len() < 4 {
672 vortex_bail!("SerializedArray buffer is too short");
673 }
674
675 let value = value.aligned(Alignment::none());
677
678 let fb_length = u32::try_from_le_bytes(&value.as_slice()[value.len() - 4..])? as usize;
679 if value.len() < 4 + fb_length {
680 vortex_bail!("SerializedArray buffer is too short for flatbuffer");
681 }
682
683 let fb_offset = value.len() - 4 - fb_length;
684 let array_tree = value.slice(fb_offset..fb_offset + fb_length);
685 let segment = BufferHandle::new_host(value.slice(0..fb_offset));
686
687 Self::from_flatbuffer_and_segment(array_tree, segment)
688 }
689}
690
691impl TryFrom<BufferHandle> for SerializedArray {
692 type Error = VortexError;
693
694 fn try_from(value: BufferHandle) -> Result<Self, Self::Error> {
695 Self::try_from(value.try_to_host_sync()?)
696 }
697}