1use std::borrow::Cow;
5use std::fmt::Debug;
6use std::fmt::Formatter;
7use std::iter;
8use std::sync::Arc;
9
10use flatbuffers::FlatBufferBuilder;
11use flatbuffers::Follow;
12use flatbuffers::WIPOffset;
13use flatbuffers::root;
14use vortex_buffer::Alignment;
15use vortex_buffer::ByteBuffer;
16use vortex_error::VortexError;
17use vortex_error::VortexExpect;
18use vortex_error::VortexResult;
19use vortex_error::vortex_bail;
20use vortex_error::vortex_err;
21use vortex_error::vortex_panic;
22use vortex_flatbuffers::FlatBuffer;
23use vortex_flatbuffers::WriteFlatBuffer;
24use vortex_flatbuffers::array as fba;
25use vortex_flatbuffers::array::Compression;
26use vortex_session::VortexSession;
27use vortex_session::registry::ReadContext;
28use vortex_utils::aliases::hash_map::HashMap;
29
30use crate::ArrayContext;
31use crate::ArrayRef;
32use crate::array::new_foreign_array;
33use crate::buffer::BufferHandle;
34use crate::dtype::DType;
35use crate::dtype::TryFromBytes;
36use crate::session::ArraySessionExt;
37use crate::stats::StatsSet;
38
/// Options controlling how an array is serialized into a sequence of byte buffers.
#[derive(Default, Debug)]
pub struct SerializeOptions {
    /// Absolute byte position at which the serialized output will be written.
    /// Only used to compute alignment padding when `include_padding` is set.
    pub offset: usize,
    /// If true, zero-filled padding buffers are inserted so that each data buffer
    /// (and the trailing flatbuffer) begins at a position satisfying its alignment.
    pub include_padding: bool,
}
48
49impl ArrayRef {
50 pub fn serialize(
61 &self,
62 ctx: &ArrayContext,
63 session: &VortexSession,
64 options: &SerializeOptions,
65 ) -> VortexResult<Vec<ByteBuffer>> {
66 let array_buffers = self
68 .depth_first_traversal()
69 .flat_map(|f| f.buffers())
70 .collect::<Vec<_>>();
71
72 let mut buffers = vec![];
74 let mut fb_buffers = Vec::with_capacity(buffers.capacity());
75
76 let max_alignment = array_buffers
78 .iter()
79 .map(|buf| buf.alignment())
80 .chain(iter::once(FlatBuffer::alignment()))
81 .max()
82 .unwrap_or_else(FlatBuffer::alignment);
83
84 let zeros = ByteBuffer::zeroed(*max_alignment);
86
87 buffers.push(ByteBuffer::zeroed_aligned(0, max_alignment));
90
91 let mut pos = options.offset;
93
94 for buffer in array_buffers {
96 let padding = if options.include_padding {
97 let padding = pos.next_multiple_of(*buffer.alignment()) - pos;
98 if padding > 0 {
99 pos += padding;
100 buffers.push(zeros.slice(0..padding));
101 }
102 padding
103 } else {
104 0
105 };
106
107 fb_buffers.push(fba::Buffer::new(
108 u16::try_from(padding).vortex_expect("padding fits into u16"),
109 buffer.alignment().exponent(),
110 Compression::None,
111 u32::try_from(buffer.len())
112 .map_err(|_| vortex_err!("All buffers must fit into u32 for serialization"))?,
113 ));
114
115 pos += buffer.len();
116 buffers.push(buffer.aligned(Alignment::none()));
117 }
118
119 let mut fbb = FlatBufferBuilder::new();
121
122 let root = ArrayNodeFlatBuffer::try_new(ctx, session, self)?;
123 let fb_root = root.try_write_flatbuffer(&mut fbb)?;
124
125 let fb_buffers = fbb.create_vector(&fb_buffers);
126 let fb_array = fba::Array::create(
127 &mut fbb,
128 &fba::ArrayArgs {
129 root: Some(fb_root),
130 buffers: Some(fb_buffers),
131 },
132 );
133 fbb.finish_minimal(fb_array);
134 let (fb_vec, fb_start) = fbb.collapse();
135 let fb_end = fb_vec.len();
136 let fb_buffer = ByteBuffer::from(fb_vec).slice(fb_start..fb_end);
137 let fb_length = fb_buffer.len();
138
139 if options.include_padding {
140 let padding = pos.next_multiple_of(*FlatBuffer::alignment()) - pos;
141 if padding > 0 {
142 buffers.push(zeros.slice(0..padding));
143 }
144 }
145 buffers.push(fb_buffer);
146
147 buffers.push(ByteBuffer::from(
149 u32::try_from(fb_length)
150 .map_err(|_| vortex_err!("Array metadata flatbuffer must fit into u32 for serialization. Array encoding tree is too large."))?
151 .to_le_bytes()
152 .to_vec(),
153 ));
154
155 Ok(buffers)
156 }
157}
158
/// Writer that encodes one array node (and, recursively, its children) into the
/// `fba::ArrayNode` flatbuffer representation.
pub struct ArrayNodeFlatBuffer<'a> {
    // Context used to intern encoding ids into indices.
    ctx: &'a ArrayContext,
    // Session providing per-encoding serialization of metadata.
    session: &'a VortexSession,
    // The array node being written.
    array: &'a ArrayRef,
    // Index of this node's first buffer within the depth-first flattened buffer list.
    buffer_idx: u16,
}
166
impl<'a> ArrayNodeFlatBuffer<'a> {
    /// Create a writer rooted at `array`, starting at buffer index 0.
    ///
    /// # Errors
    ///
    /// Fails if the array tree's total (recursive) buffer count exceeds `u16::MAX`,
    /// since buffer indices are stored as `u16` in the flatbuffer.
    pub fn try_new(
        ctx: &'a ArrayContext,
        session: &'a VortexSession,
        array: &'a ArrayRef,
    ) -> VortexResult<Self> {
        let n_buffers_recursive = array.nbuffers_recursive();
        if n_buffers_recursive > u16::MAX as usize {
            vortex_bail!(
                "Array and all descendent arrays can have at most u16::MAX buffers: {}",
                n_buffers_recursive
            );
        };
        Ok(Self {
            ctx,
            session,
            array,
            buffer_idx: 0,
        })
    }

    /// Write this node (encoding index, metadata, children, buffer indices, stats)
    /// into `fbb`, recursing into children depth-first.
    ///
    /// Buffer index assignment must mirror the depth-first traversal used by
    /// `ArrayRef::serialize`: this node takes `buffer_idx..buffer_idx + nbuffers`,
    /// and each child starts after all buffers of the preceding siblings' subtrees.
    pub fn try_write_flatbuffer<'fb>(
        &self,
        fbb: &mut FlatBufferBuilder<'fb>,
    ) -> VortexResult<WIPOffset<fba::ArrayNode<'fb>>> {
        // Encodings are referenced by interned index, not by id, in the flatbuffer.
        let encoding_idx = self
            .ctx
            .intern(&self.array.encoding_id())
            .ok_or_else(|| {
                vortex_err!(
                    "Array encoding {} not permitted by ctx",
                    self.array.encoding_id()
                )
            })?;

        let metadata_bytes = self.session.array_serialize(self.array)?.ok_or_else(|| {
            vortex_err!(
                "Array {} does not support serialization",
                self.array.encoding_id()
            )
        })?;
        let metadata = Some(fbb.create_vector(metadata_bytes.as_slice()));

        let nbuffers = u16::try_from(self.array.nbuffers())
            .map_err(|_| vortex_err!("Array can have at most u16::MAX buffers"))?;
        // First buffer index available to the first child (after this node's own).
        let mut child_buffer_idx = self.buffer_idx + nbuffers;

        let children = self
            .array
            .children()
            .iter()
            .map(|child| {
                let msg = ArrayNodeFlatBuffer {
                    ctx: self.ctx,
                    session: self.session,
                    array: child,
                    buffer_idx: child_buffer_idx,
                }
                .try_write_flatbuffer(fbb)?;

                // Advance past the entire subtree's buffers, guarding against u16
                // overflow (try_new only validated the root's total).
                child_buffer_idx = u16::try_from(child.nbuffers_recursive())
                    .ok()
                    .and_then(|nbuffers| nbuffers.checked_add(child_buffer_idx))
                    .ok_or_else(|| vortex_err!("Too many buffers (u16) for Array"))?;

                Ok(msg)
            })
            .collect::<VortexResult<Vec<_>>>()?;
        let children = Some(fbb.create_vector(&children));

        // This node's buffers occupy a contiguous index range starting at buffer_idx.
        let buffers = Some(fbb.create_vector_from_iter((0..nbuffers).map(|i| i + self.buffer_idx)));
        let stats = Some(self.array.statistics().write_flatbuffer(fbb)?);

        Ok(fba::ArrayNode::create(
            fbb,
            &fba::ArrayNodeArgs {
                encoding: encoding_idx,
                metadata,
                children,
                buffers,
                stats,
            },
        ))
    }
}
255
/// Accessor for the child arrays of an array being deserialized.
///
/// Implementations may decode children lazily; `get` receives the dtype and length
/// the caller expects the child to have.
pub trait ArrayChildren {
    /// Returns the child at `index` with the expected `dtype` and `len`.
    fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef>;

    /// The number of children.
    fn len(&self) -> usize;

    /// True if there are no children.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
270
271impl<T: AsRef<[ArrayRef]>> ArrayChildren for T {
272 fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef> {
273 let array = self.as_ref()[index].clone();
274 assert_eq!(array.len(), len);
275 assert_eq!(array.dtype(), dtype);
276 Ok(array)
277 }
278
279 fn len(&self) -> usize {
280 self.as_ref().len()
281 }
282}
283
/// A view over a serialized array tree: a validated `fba::Array` flatbuffer plus the
/// data buffers it references.
///
/// Cloning is cheap: the flatbuffer and the buffer list are shared; only the node
/// location differs between a parent and the views produced by `child`.
#[derive(Clone)]
pub struct SerializedArray {
    // The validated flatbuffer containing the whole array tree.
    flatbuffer: FlatBuffer,
    // Byte offset of this node's `ArrayNode` table inside `flatbuffer`.
    flatbuffer_loc: usize,
    // All data buffers of the tree, indexed by the flattened buffer indices
    // stored in each node.
    buffers: Arc<[BufferHandle]>,
}
298
299impl Debug for SerializedArray {
300 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
301 f.debug_struct("SerializedArray")
302 .field("encoding_id", &self.encoding_id())
303 .field("children", &(0..self.nchildren()).map(|i| self.child(i)))
304 .field(
305 "buffers",
306 &(0..self.nbuffers()).map(|i| self.buffer(i).ok()),
307 )
308 .field("metadata", &self.metadata())
309 .finish()
310 }
311}
312
impl SerializedArray {
    /// Decode this serialized node into an [`ArrayRef`] with the given `dtype` and
    /// `len`, resolving the encoding through `ctx` and deserializing via the matching
    /// plugin from `session`. Unknown encodings are decoded as opaque "foreign"
    /// arrays when the session allows it, and are an error otherwise.
    ///
    /// # Panics
    ///
    /// Panics if the plugin returns an array whose length, dtype, or encoding does
    /// not match expectations — these are treated as plugin bugs, not I/O errors.
    pub fn decode(
        &self,
        dtype: &DType,
        len: usize,
        ctx: &ReadContext,
        session: &VortexSession,
    ) -> VortexResult<ArrayRef> {
        // Encodings are stored as interned indices; map back to an encoding id.
        let encoding_idx = self.flatbuffer().encoding();
        let encoding_id = ctx
            .resolve(encoding_idx)
            .ok_or_else(|| vortex_err!("Unknown encoding index: {}", encoding_idx))?;
        let Some(plugin) = session.arrays().registry().find(&encoding_id) else {
            if session.allows_unknown() {
                return self.decode_foreign(encoding_id, dtype, len, ctx);
            }
            return Err(vortex_err!("Unknown encoding: {}", encoding_id));
        };

        // Lazy child accessor: children are decoded on demand by the plugin.
        let children = SerializedArrayChildren {
            ser: self,
            ctx,
            session,
        };

        let buffers = self.collect_buffers()?;

        let decoded =
            plugin.deserialize(dtype, len, self.metadata(), &buffers, &children, session)?;

        // Sanity-check the plugin's output against caller-provided expectations.
        assert_eq!(
            decoded.len(),
            len,
            "Array decoded from {} has incorrect length {}, expected {}",
            encoding_id,
            decoded.len(),
            len
        );
        assert_eq!(
            decoded.dtype(),
            dtype,
            "Array decoded from {} has incorrect dtype {}, expected {}",
            encoding_id,
            decoded.dtype(),
            dtype,
        );

        assert!(
            plugin.is_supported_encoding(&decoded.encoding_id()),
            "Array decoded from {} has incorrect encoding {}",
            encoding_id,
            decoded.encoding_id(),
        );

        // Restore any statistics serialized alongside the array node.
        if let Some(stats) = self.flatbuffer().stats() {
            decoded
                .statistics()
                .set_iter(StatsSet::from_flatbuffer(&stats, dtype, session)?.into_iter());
        }

        Ok(decoded)
    }

    /// Decode a node whose encoding has no registered plugin into a "foreign" array
    /// that preserves its metadata, buffers, and (recursively foreign) children.
    ///
    /// NOTE(review): each child is decoded with the *parent's* `dtype` and `len`.
    /// Presumably foreign nodes carry no per-child type information, so these act as
    /// placeholders — confirm this is the intended contract of `new_foreign_array`.
    fn decode_foreign(
        &self,
        encoding_id: crate::array::ArrayId,
        dtype: &DType,
        len: usize,
        ctx: &ReadContext,
    ) -> VortexResult<ArrayRef> {
        let children = (0..self.nchildren())
            .map(|idx| {
                let child = self.child(idx);
                let child_encoding_idx = child.flatbuffer().encoding();
                let child_encoding_id = ctx
                    .resolve(child_encoding_idx)
                    .ok_or_else(|| vortex_err!("Unknown encoding index: {}", child_encoding_idx))?;
                child.decode_foreign(child_encoding_id, dtype, len, ctx)
            })
            .collect::<VortexResult<Vec<_>>>()?;

        new_foreign_array(
            encoding_id,
            dtype.clone(),
            len,
            self.metadata().to_vec(),
            self.collect_buffers()?.into_owned(),
            children,
        )
    }

    /// The interned encoding index of this node (resolve via a `ReadContext`).
    pub fn encoding_id(&self) -> u16 {
        self.flatbuffer().encoding()
    }

    /// The node's opaque metadata bytes; empty if none were serialized.
    pub fn metadata(&self) -> &[u8] {
        self.flatbuffer()
            .metadata()
            .map(|metadata| metadata.bytes())
            .unwrap_or(&[])
    }

    /// The number of direct children of this node.
    pub fn nchildren(&self) -> usize {
        self.flatbuffer()
            .children()
            .map_or(0, |children| children.len())
    }

    /// A view of the child node at `idx`, sharing this node's flatbuffer and buffers.
    ///
    /// # Panics
    ///
    /// Panics if `idx` is out of range or the node has no children.
    pub fn child(&self, idx: usize) -> SerializedArray {
        let children = self
            .flatbuffer()
            .children()
            .vortex_expect("Expected array to have children");
        if idx >= children.len() {
            vortex_panic!(
                "Invalid child index {} for array with {} children",
                idx,
                children.len()
            );
        }
        self.with_root(children.get(idx))
    }

    /// The number of buffers referenced by this node (not its children).
    pub fn nbuffers(&self) -> usize {
        self.flatbuffer()
            .buffers()
            .map_or(0, |buffers| buffers.len())
    }

    /// Fetch this node's `idx`-th buffer by translating the node-relative index into
    /// the tree-wide flattened buffer index stored in the flatbuffer.
    pub fn buffer(&self, idx: usize) -> VortexResult<BufferHandle> {
        let buffer_idx = self
            .flatbuffer()
            .buffers()
            .ok_or_else(|| vortex_err!("Array has no buffers"))?
            .get(idx);
        self.buffers
            .get(buffer_idx as usize)
            .cloned()
            .ok_or_else(|| {
                vortex_err!(
                    "Invalid buffer index {} for array with {} buffers",
                    buffer_idx,
                    self.nbuffers()
                )
            })
    }

    /// Collect all of this node's buffers, borrowing the shared slice when the
    /// flatbuffer indices happen to be contiguous (the common case for arrays
    /// produced by `ArrayRef::serialize`) and cloning handles otherwise.
    fn collect_buffers(&self) -> VortexResult<Cow<'_, [BufferHandle]>> {
        let Some(fb_buffers) = self.flatbuffer().buffers() else {
            return Ok(Cow::Borrowed(&[]));
        };
        let count = fb_buffers.len();
        if count == 0 {
            return Ok(Cow::Borrowed(&[]));
        }
        let start = fb_buffers.get(0) as usize;
        let contiguous = fb_buffers
            .iter()
            .enumerate()
            .all(|(i, idx)| idx as usize == start + i);
        if contiguous {
            self.buffers.get(start..start + count).map_or_else(
                || {
                    vortex_bail!(
                        "buffer indices {}..{} out of range for {} buffers",
                        start,
                        start + count,
                        self.buffers.len()
                    )
                },
                |slice| Ok(Cow::Borrowed(slice)),
            )
        } else {
            (0..count)
                .map(|idx| self.buffer(idx))
                .collect::<VortexResult<Vec<_>>>()
                .map(Cow::Owned)
        }
    }

    /// Lengths of *all* buffers declared in the top-level `fba::Array` table (the
    /// whole tree), in flattened order — not just this node's buffers.
    ///
    /// # Panics
    ///
    /// Panics if the stored flatbuffer is not a valid `fba::Array` (it was validated
    /// at construction, so this indicates corruption).
    pub fn buffer_lengths(&self) -> Vec<usize> {
        let fb_array = root::<fba::Array>(self.flatbuffer.as_ref())
            .vortex_expect("SerializedArray flatbuffer must be a valid Array");
        fb_array
            .buffers()
            .map(|buffers| buffers.iter().map(|b| b.length() as usize).collect())
            .unwrap_or_default()
    }

    /// Verify that `array_tree` is a valid `fba::Array` with a root node, returning
    /// the aligned flatbuffer and the byte offset of the root `ArrayNode` table.
    fn validate_array_tree(array_tree: impl Into<ByteBuffer>) -> VortexResult<(FlatBuffer, usize)> {
        let fb_buffer = FlatBuffer::align_from(array_tree.into());
        // `root` performs full flatbuffer verification.
        let fb_array = root::<fba::Array>(fb_buffer.as_ref())?;
        let fb_root = fb_array
            .root()
            .ok_or_else(|| vortex_err!("Array must have a root node"))?;
        let flatbuffer_loc = fb_root._tab.loc();
        Ok((fb_buffer, flatbuffer_loc))
    }

    /// Build a `SerializedArray` from a flatbuffer-encoded array tree and an already
    /// separated list of data buffers (one per flattened buffer index).
    pub fn from_flatbuffer_with_buffers(
        array_tree: impl Into<ByteBuffer>,
        buffers: Vec<BufferHandle>,
    ) -> VortexResult<Self> {
        let (flatbuffer, flatbuffer_loc) = Self::validate_array_tree(array_tree)?;
        Ok(SerializedArray {
            flatbuffer,
            flatbuffer_loc,
            buffers: buffers.into(),
        })
    }

    /// Build a `SerializedArray` from an array tree with no data buffers at all
    /// (only valid for trees whose nodes reference zero buffers).
    pub fn from_array_tree(array_tree: impl Into<ByteBuffer>) -> VortexResult<Self> {
        let (flatbuffer, flatbuffer_loc) = Self::validate_array_tree(array_tree)?;
        Ok(SerializedArray {
            flatbuffer,
            flatbuffer_loc,
            buffers: Arc::new([]),
        })
    }

    /// The `ArrayNode` table this view points at.
    fn flatbuffer(&self) -> fba::ArrayNode<'_> {
        // SAFETY: `flatbuffer_loc` was obtained either from a verified `fba::Array`
        // root (`validate_array_tree`) or from a child table of one (`with_root`),
        // and `self.flatbuffer` is the same (immutable, aligned) buffer it was
        // validated against.
        unsafe { fba::ArrayNode::follow(self.flatbuffer.as_ref(), self.flatbuffer_loc) }
    }

    /// A clone of this view re-pointed at another node within the same flatbuffer.
    fn with_root(&self, root: fba::ArrayNode) -> Self {
        let mut this = self.clone();
        this.flatbuffer_loc = root._tab.loc();
        this
    }

    /// Build a `SerializedArray` from an array tree plus a single contiguous segment
    /// holding all data buffers (with their serialized padding), as produced by
    /// `ArrayRef::serialize`.
    pub fn from_flatbuffer_and_segment(
        array_tree: ByteBuffer,
        segment: BufferHandle,
    ) -> VortexResult<Self> {
        Self::from_flatbuffer_and_segment_with_overrides(array_tree, segment, &HashMap::new())
    }

    /// Like [`Self::from_flatbuffer_and_segment`], but buffers whose flattened index
    /// appears in `buffer_overrides` are replaced with the supplied host bytes.
    /// Overridden buffers still advance the segment offset by their declared length,
    /// so subsequent buffers are sliced from the correct positions.
    pub fn from_flatbuffer_and_segment_with_overrides(
        array_tree: ByteBuffer,
        segment: BufferHandle,
        buffer_overrides: &HashMap<u32, ByteBuffer>,
    ) -> VortexResult<Self> {
        // Drop the segment's alignment; each slice is re-aligned individually below.
        let segment = segment.ensure_aligned(Alignment::none())?;

        let (fb_buffer, flatbuffer_loc) = Self::validate_array_tree(array_tree)?;
        // SAFETY: `validate_array_tree` just verified `fb_buffer` as an `fba::Array`.
        let fb_array = unsafe { fba::root_as_array_unchecked(fb_buffer.as_ref()) };

        // Walk the buffer descriptors, carving each buffer out of the segment using
        // the recorded padding and length.
        let mut offset = 0;
        let buffers = fb_array
            .buffers()
            .unwrap_or_default()
            .iter()
            .enumerate()
            .map(|(idx, fb_buf)| {
                offset += fb_buf.padding() as usize;
                let buffer_len = fb_buf.length() as usize;
                let alignment = Alignment::from_exponent(fb_buf.alignment_exponent());

                let idx = u32::try_from(idx).vortex_expect("buffer count must fit in u32");
                let handle = if let Some(host_data) = buffer_overrides.get(&idx) {
                    BufferHandle::new_host(host_data.clone()).ensure_aligned(alignment)?
                } else {
                    let buffer = segment.slice(offset..(offset + buffer_len));
                    buffer.ensure_aligned(alignment)?
                };

                offset += buffer_len;
                Ok(handle)
            })
            .collect::<VortexResult<Arc<[_]>>>()?;

        Ok(SerializedArray {
            flatbuffer: fb_buffer,
            flatbuffer_loc,
            buffers,
        })
    }
}
643
/// [`ArrayChildren`] implementation that lazily decodes the children of a
/// [`SerializedArray`] on demand during plugin deserialization.
struct SerializedArrayChildren<'a> {
    // The serialized parent node whose children are accessed.
    ser: &'a SerializedArray,
    // Context for resolving encoding indices.
    ctx: &'a ReadContext,
    // Session providing encoding plugins.
    session: &'a VortexSession,
}
649
650impl ArrayChildren for SerializedArrayChildren<'_> {
651 fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef> {
652 self.ser
653 .child(index)
654 .decode(dtype, len, self.ctx, self.session)
655 }
656
657 fn len(&self) -> usize {
658 self.ser.nchildren()
659 }
660}
661
662impl TryFrom<ByteBuffer> for SerializedArray {
663 type Error = VortexError;
664
665 fn try_from(value: ByteBuffer) -> Result<Self, Self::Error> {
666 if value.len() < 4 {
668 vortex_bail!("SerializedArray buffer is too short");
669 }
670
671 let value = value.aligned(Alignment::none());
673
674 let fb_length = u32::try_from_le_bytes(&value.as_slice()[value.len() - 4..])? as usize;
675 if value.len() < 4 + fb_length {
676 vortex_bail!("SerializedArray buffer is too short for flatbuffer");
677 }
678
679 let fb_offset = value.len() - 4 - fb_length;
680 let array_tree = value.slice(fb_offset..fb_offset + fb_length);
681 let segment = BufferHandle::new_host(value.slice(0..fb_offset));
682
683 Self::from_flatbuffer_and_segment(array_tree, segment)
684 }
685}
686
687impl TryFrom<BufferHandle> for SerializedArray {
688 type Error = VortexError;
689
690 fn try_from(value: BufferHandle) -> Result<Self, Self::Error> {
691 Self::try_from(value.try_to_host_sync()?)
692 }
693}