1use std::borrow::Cow;
5use std::fmt::Debug;
6use std::fmt::Formatter;
7use std::iter;
8use std::sync::Arc;
9
10use flatbuffers::FlatBufferBuilder;
11use flatbuffers::Follow;
12use flatbuffers::WIPOffset;
13use flatbuffers::root;
14use vortex_buffer::Alignment;
15use vortex_buffer::ByteBuffer;
16use vortex_error::VortexError;
17use vortex_error::VortexExpect;
18use vortex_error::VortexResult;
19use vortex_error::vortex_bail;
20use vortex_error::vortex_err;
21use vortex_error::vortex_panic;
22use vortex_flatbuffers::FlatBuffer;
23use vortex_flatbuffers::WriteFlatBuffer;
24use vortex_flatbuffers::array as fba;
25use vortex_flatbuffers::array::Compression;
26use vortex_session::VortexSession;
27use vortex_session::registry::ReadContext;
28use vortex_utils::aliases::hash_map::HashMap;
29
30use crate::ArrayContext;
31use crate::ArrayRef;
32use crate::array::ArrayId;
33use crate::array::new_foreign_array;
34use crate::buffer::BufferHandle;
35use crate::dtype::DType;
36use crate::dtype::TryFromBytes;
37use crate::session::ArraySessionExt;
38use crate::stats::StatsSet;
39
/// Options controlling how an array is serialized into a sequence of buffers.
#[derive(Default, Debug)]
pub struct SerializeOptions {
    /// Byte position at which the serialized output will be written (e.g. an
    /// offset within an enclosing file). Used to compute alignment padding.
    pub offset: usize,
    /// If true, zero-filled padding buffers are emitted so that each data
    /// buffer starts at a multiple of its own alignment.
    pub include_padding: bool,
}
49
50impl ArrayRef {
51 pub fn serialize(
62 &self,
63 ctx: &ArrayContext,
64 session: &VortexSession,
65 options: &SerializeOptions,
66 ) -> VortexResult<Vec<ByteBuffer>> {
67 let array_buffers = self
69 .depth_first_traversal()
70 .flat_map(|f| f.buffers())
71 .collect::<Vec<_>>();
72
73 let mut buffers = vec![];
75 let mut fb_buffers = Vec::with_capacity(buffers.capacity());
76
77 let max_alignment = array_buffers
79 .iter()
80 .map(|buf| buf.alignment())
81 .chain(iter::once(FlatBuffer::alignment()))
82 .max()
83 .unwrap_or_else(FlatBuffer::alignment);
84
85 let zeros = ByteBuffer::zeroed(*max_alignment);
87
88 buffers.push(ByteBuffer::zeroed_aligned(0, max_alignment));
91
92 let mut pos = options.offset;
94
95 for buffer in array_buffers {
97 let padding = if options.include_padding {
98 let padding = pos.next_multiple_of(*buffer.alignment()) - pos;
99 if padding > 0 {
100 pos += padding;
101 buffers.push(zeros.slice(0..padding));
102 }
103 padding
104 } else {
105 0
106 };
107
108 fb_buffers.push(fba::Buffer::new(
109 u16::try_from(padding).vortex_expect("padding fits into u16"),
110 buffer.alignment().exponent(),
111 Compression::None,
112 u32::try_from(buffer.len())
113 .map_err(|_| vortex_err!("All buffers must fit into u32 for serialization"))?,
114 ));
115
116 pos += buffer.len();
117 buffers.push(buffer.aligned(Alignment::none()));
118 }
119
120 let mut fbb = FlatBufferBuilder::new();
122
123 let root = ArrayNodeFlatBuffer::try_new(ctx, session, self)?;
124 let fb_root = root.try_write_flatbuffer(&mut fbb)?;
125
126 let fb_buffers = fbb.create_vector(&fb_buffers);
127 let fb_array = fba::Array::create(
128 &mut fbb,
129 &fba::ArrayArgs {
130 root: Some(fb_root),
131 buffers: Some(fb_buffers),
132 },
133 );
134 fbb.finish_minimal(fb_array);
135 let (fb_vec, fb_start) = fbb.collapse();
136 let fb_end = fb_vec.len();
137 let fb_buffer = ByteBuffer::from(fb_vec).slice(fb_start..fb_end);
138 let fb_length = fb_buffer.len();
139
140 if options.include_padding {
141 let padding = pos.next_multiple_of(*FlatBuffer::alignment()) - pos;
142 if padding > 0 {
143 buffers.push(zeros.slice(0..padding));
144 }
145 }
146 buffers.push(fb_buffer);
147
148 buffers.push(ByteBuffer::from(
150 u32::try_from(fb_length)
151 .map_err(|_| vortex_err!("Array metadata flatbuffer must fit into u32 for serialization. Array encoding tree is too large."))?
152 .to_le_bytes()
153 .to_vec(),
154 ));
155
156 Ok(buffers)
157 }
158}
159
/// Writer for a single node of an array's encoding tree as an
/// `fba::ArrayNode` flatbuffer table.
pub struct ArrayNodeFlatBuffer<'a> {
    ctx: &'a ArrayContext,
    session: &'a VortexSession,
    array: &'a ArrayRef,
    // Index of this node's first buffer within the depth-first flattened
    // buffer list of the whole tree (0 for the root).
    buffer_idx: u16,
}
167
impl<'a> ArrayNodeFlatBuffer<'a> {
    /// Creates a writer for the root of `array`'s encoding tree.
    ///
    /// # Errors
    ///
    /// Fails if the total number of buffers across the array and all of its
    /// descendants exceeds `u16::MAX`, since buffer indices are stored as u16.
    /// This root-level check also guards the unchecked `u16` additions
    /// performed while writing child nodes.
    pub fn try_new(
        ctx: &'a ArrayContext,
        session: &'a VortexSession,
        array: &'a ArrayRef,
    ) -> VortexResult<Self> {
        let n_buffers_recursive = array.nbuffers_recursive();
        if n_buffers_recursive > u16::MAX as usize {
            vortex_bail!(
                "Array and all descendent arrays can have at most u16::MAX buffers: {}",
                n_buffers_recursive
            );
        };
        Ok(Self {
            ctx,
            session,
            array,
            buffer_idx: 0,
        })
    }

    /// Writes this node (and, recursively, its children) into `fbb`,
    /// returning the offset of the created `ArrayNode` table.
    ///
    /// Buffer indices are assigned depth-first: this node's own buffers come
    /// first, followed by each child's subtree in order. This must agree with
    /// the order produced by `depth_first_traversal` during serialization —
    /// presumably a pre-order walk; confirm against its implementation.
    pub fn try_write_flatbuffer<'fb>(
        &self,
        fbb: &mut FlatBufferBuilder<'fb>,
    ) -> VortexResult<WIPOffset<fba::ArrayNode<'fb>>> {
        // Encodings are stored as indices interned in the write context.
        let encoding_idx = self
            .ctx
            .intern(&self.array.encoding_id())
            .ok_or_else(|| {
                vortex_err!(
                    "Array encoding {} not permitted by ctx",
                    self.array.encoding_id()
                )
            })?;

        let metadata_bytes = self.session.array_serialize(self.array)?.ok_or_else(|| {
            vortex_err!(
                "Array {} does not support serialization",
                self.array.encoding_id()
            )
        })?;
        let metadata = Some(fbb.create_vector(metadata_bytes.as_slice()));

        let nbuffers = u16::try_from(self.array.nbuffers())
            .map_err(|_| vortex_err!("Array can have at most u16::MAX buffers"))?;
        // The first child's buffers start right after this node's own buffers.
        let mut child_buffer_idx = self.buffer_idx + nbuffers;

        let children = self
            .array
            .children()
            .iter()
            .map(|child| {
                let msg = ArrayNodeFlatBuffer {
                    ctx: self.ctx,
                    session: self.session,
                    array: child,
                    buffer_idx: child_buffer_idx,
                }
                .try_write_flatbuffer(fbb)?;

                // Advance past the entire subtree's buffers, checking for
                // u16 overflow as a defensive measure.
                child_buffer_idx = u16::try_from(child.nbuffers_recursive())
                    .ok()
                    .and_then(|nbuffers| nbuffers.checked_add(child_buffer_idx))
                    .ok_or_else(|| vortex_err!("Too many buffers (u16) for Array"))?;

                Ok(msg)
            })
            .collect::<VortexResult<Vec<_>>>()?;
        let children = Some(fbb.create_vector(&children));

        // This node's buffers occupy the contiguous index range
        // [buffer_idx, buffer_idx + nbuffers).
        let buffers = Some(fbb.create_vector_from_iter((0..nbuffers).map(|i| i + self.buffer_idx)));
        let stats = Some(self.array.statistics().write_flatbuffer(fbb)?);

        Ok(fba::ArrayNode::create(
            fbb,
            &fba::ArrayNodeArgs {
                encoding: encoding_idx,
                metadata,
                children,
                buffers,
                stats,
            },
        ))
    }
}
256
/// Access to the children of an array during deserialization, allowing
/// implementations to decode children lazily on demand.
pub trait ArrayChildren {
    /// Returns the child at `index`, which must have the given `dtype` and
    /// `len`.
    fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef>;

    /// The number of children.
    fn len(&self) -> usize;

    /// Returns true if there are no children.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
271
272impl<T: AsRef<[ArrayRef]>> ArrayChildren for T {
273 fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef> {
274 let array = self.as_ref()[index].clone();
275 assert_eq!(array.len(), len);
276 assert_eq!(array.dtype(), dtype);
277 Ok(array)
278 }
279
280 fn len(&self) -> usize {
281 self.as_ref().len()
282 }
283}
284
/// A view over an array serialized as a flatbuffer encoding tree plus a flat
/// list of data buffers, ready to be decoded back into an `ArrayRef`.
///
/// Cloning is cheap: the flatbuffer and buffer list are shared, and child
/// views differ only in their node location within the shared flatbuffer.
#[derive(Clone)]
pub struct SerializedArray {
    // The flatbuffer holding the entire array tree.
    flatbuffer: FlatBuffer,
    // Byte location of this node's `ArrayNode` table within `flatbuffer`.
    flatbuffer_loc: usize,
    // All buffers of the tree, flattened; indexed by the flatbuffer's
    // per-node buffer index lists.
    buffers: Arc<[BufferHandle]>,
}
299
300impl Debug for SerializedArray {
301 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
302 f.debug_struct("SerializedArray")
303 .field("encoding_id", &self.encoding_id())
304 .field("children", &(0..self.nchildren()).map(|i| self.child(i)))
305 .field(
306 "buffers",
307 &(0..self.nbuffers()).map(|i| self.buffer(i).ok()),
308 )
309 .field("metadata", &self.metadata())
310 .finish()
311 }
312}
313
impl SerializedArray {
    /// Decode this serialized node (and, transitively, its children) into an
    /// [`ArrayRef`] with the given `dtype` and `len`.
    ///
    /// If the encoding is not registered in the session, either falls back to
    /// a foreign-array representation (when the session allows unknown
    /// encodings) or returns an error.
    ///
    /// # Panics
    ///
    /// Panics if a deserializer plugin returns an array whose length, dtype,
    /// or encoding is inconsistent with what was requested — that indicates a
    /// buggy plugin, not bad input.
    pub fn decode(
        &self,
        dtype: &DType,
        len: usize,
        ctx: &ReadContext,
        session: &VortexSession,
    ) -> VortexResult<ArrayRef> {
        // Resolve the per-file encoding index into an encoding id.
        let encoding_idx = self.flatbuffer().encoding();
        let encoding_id = ctx
            .resolve(encoding_idx)
            .ok_or_else(|| vortex_err!("Unknown encoding index: {}", encoding_idx))?;
        let Some(plugin) = session.arrays().registry().find(&encoding_id) else {
            if session.allows_unknown() {
                return self.decode_foreign(encoding_id, dtype, len, ctx);
            }
            return Err(vortex_err!("Unknown encoding: {}", encoding_id));
        };

        // Children are decoded lazily, on demand by the plugin.
        let children = SerializedArrayChildren {
            ser: self,
            ctx,
            session,
        };

        let buffers = self.collect_buffers()?;

        let decoded =
            plugin.deserialize(dtype, len, self.metadata(), &buffers, &children, session)?;

        // Validate that the plugin honored the requested contract.
        assert_eq!(
            decoded.len(),
            len,
            "Array decoded from {} has incorrect length {}, expected {}",
            encoding_id,
            decoded.len(),
            len
        );
        assert_eq!(
            decoded.dtype(),
            dtype,
            "Array decoded from {} has incorrect dtype {}, expected {}",
            encoding_id,
            decoded.dtype(),
            dtype,
        );

        assert!(
            plugin.is_supported_encoding(&decoded.encoding_id()),
            "Array decoded from {} has incorrect encoding {}",
            encoding_id,
            decoded.encoding_id(),
        );

        // Attach any serialized statistics to the freshly decoded array.
        if let Some(stats) = self.flatbuffer().stats() {
            decoded
                .statistics()
                .set_iter(StatsSet::from_flatbuffer(&stats, dtype, session)?.into_iter());
        }

        Ok(decoded)
    }

    /// Decode a node with an unknown encoding into a generic "foreign" array
    /// that preserves its metadata, buffers, and children opaquely.
    fn decode_foreign(
        &self,
        encoding_id: ArrayId,
        dtype: &DType,
        len: usize,
        ctx: &ReadContext,
    ) -> VortexResult<ArrayRef> {
        // NOTE(review): children are decoded with the *parent's* dtype/len —
        // presumably placeholders, since the true child types cannot be known
        // for a foreign encoding; confirm against `new_foreign_array`'s
        // contract.
        let children = (0..self.nchildren())
            .map(|idx| {
                let child = self.child(idx);
                let child_encoding_idx = child.flatbuffer().encoding();
                let child_encoding_id = ctx
                    .resolve(child_encoding_idx)
                    .ok_or_else(|| vortex_err!("Unknown encoding index: {}", child_encoding_idx))?;
                child.decode_foreign(child_encoding_id, dtype, len, ctx)
            })
            .collect::<VortexResult<Vec<_>>>()?;

        new_foreign_array(
            encoding_id,
            dtype.clone(),
            len,
            self.metadata().to_vec(),
            self.collect_buffers()?.into_owned(),
            children,
        )
    }

    /// The raw encoding index of this node as stored in the flatbuffer
    /// (not yet resolved through a `ReadContext`).
    pub fn encoding_id(&self) -> u16 {
        self.flatbuffer().encoding()
    }

    /// This node's opaque metadata bytes, or empty if none were written.
    pub fn metadata(&self) -> &[u8] {
        self.flatbuffer()
            .metadata()
            .map(|metadata| metadata.bytes())
            .unwrap_or(&[])
    }

    /// The number of child nodes of this node.
    pub fn nchildren(&self) -> usize {
        self.flatbuffer()
            .children()
            .map_or(0, |children| children.len())
    }

    /// Returns a view over the child node at `idx`, sharing this node's
    /// flatbuffer and buffer list.
    ///
    /// # Panics
    ///
    /// Panics if `idx` is out of range or the node has no children vector.
    pub fn child(&self, idx: usize) -> SerializedArray {
        let children = self
            .flatbuffer()
            .children()
            .vortex_expect("Expected array to have children");
        if idx >= children.len() {
            vortex_panic!(
                "Invalid child index {} for array with {} children",
                idx,
                children.len()
            );
        }
        self.with_root(children.get(idx))
    }

    /// The number of buffers referenced by this node (not its children).
    pub fn nbuffers(&self) -> usize {
        self.flatbuffer()
            .buffers()
            .map_or(0, |buffers| buffers.len())
    }

    /// Returns this node's `idx`-th buffer, resolving the node-local index
    /// through the flatbuffer into the tree-wide flattened buffer list.
    pub fn buffer(&self, idx: usize) -> VortexResult<BufferHandle> {
        let buffer_idx = self
            .flatbuffer()
            .buffers()
            .ok_or_else(|| vortex_err!("Array has no buffers"))?
            .get(idx);
        self.buffers
            .get(buffer_idx as usize)
            .cloned()
            .ok_or_else(|| {
                vortex_err!(
                    "Invalid buffer index {} for array with {} buffers",
                    buffer_idx,
                    self.nbuffers()
                )
            })
    }

    /// Gathers this node's buffers. When the node's buffer indices form a
    /// contiguous run (the layout produced by serialization), a borrowed
    /// sub-slice is returned; otherwise handles are cloned into an owned Vec.
    fn collect_buffers(&self) -> VortexResult<Cow<'_, [BufferHandle]>> {
        let Some(fb_buffers) = self.flatbuffer().buffers() else {
            return Ok(Cow::Borrowed(&[]));
        };
        let count = fb_buffers.len();
        if count == 0 {
            return Ok(Cow::Borrowed(&[]));
        }
        let start = fb_buffers.get(0) as usize;
        let contiguous = fb_buffers
            .iter()
            .enumerate()
            .all(|(i, idx)| idx as usize == start + i);
        if contiguous {
            self.buffers.get(start..start + count).map_or_else(
                || {
                    vortex_bail!(
                        "buffer indices {}..{} out of range for {} buffers",
                        start,
                        start + count,
                        self.buffers.len()
                    )
                },
                |slice| Ok(Cow::Borrowed(slice)),
            )
        } else {
            (0..count)
                .map(|idx| self.buffer(idx))
                .collect::<VortexResult<Vec<_>>>()
                .map(Cow::Owned)
        }
    }

    /// Lengths of all buffers in the serialized tree, read from the
    /// top-level `Array` table (not just this node's buffers).
    ///
    /// # Panics
    ///
    /// Panics if the stored flatbuffer is not a valid `fba::Array`.
    pub fn buffer_lengths(&self) -> Vec<usize> {
        let fb_array = root::<fba::Array>(self.flatbuffer.as_ref())
            .vortex_expect("SerializedArray flatbuffer must be a valid Array");
        fb_array
            .buffers()
            .map(|buffers| buffers.iter().map(|b| b.length() as usize).collect())
            .unwrap_or_default()
    }

    /// Verifies that `array_tree` parses as an `fba::Array` with a root
    /// node, returning the (aligned) flatbuffer and the byte location of the
    /// root `ArrayNode` table within it.
    fn validate_array_tree(array_tree: impl Into<ByteBuffer>) -> VortexResult<(FlatBuffer, usize)> {
        let fb_buffer = FlatBuffer::align_from(array_tree.into());
        let fb_array = root::<fba::Array>(fb_buffer.as_ref())?;
        let fb_root = fb_array
            .root()
            .ok_or_else(|| vortex_err!("Array must have a root node"))?;
        let flatbuffer_loc = fb_root._tab.loc();
        Ok((fb_buffer, flatbuffer_loc))
    }

    /// Constructs a [`SerializedArray`] from an array-tree flatbuffer and an
    /// already-materialized list of buffer handles, one per buffer descriptor
    /// in the flatbuffer's tree-wide buffer list.
    pub fn from_flatbuffer_with_buffers(
        array_tree: impl Into<ByteBuffer>,
        buffers: Vec<BufferHandle>,
    ) -> VortexResult<Self> {
        let (flatbuffer, flatbuffer_loc) = Self::validate_array_tree(array_tree)?;
        Ok(SerializedArray {
            flatbuffer,
            flatbuffer_loc,
            buffers: buffers.into(),
        })
    }

    /// Constructs a [`SerializedArray`] from an array-tree flatbuffer alone,
    /// with no data buffers. Buffer lookups on the result will fail unless
    /// the tree references no buffers.
    pub fn from_array_tree(array_tree: impl Into<ByteBuffer>) -> VortexResult<Self> {
        let (flatbuffer, flatbuffer_loc) = Self::validate_array_tree(array_tree)?;
        Ok(SerializedArray {
            flatbuffer,
            flatbuffer_loc,
            buffers: Arc::new([]),
        })
    }

    /// Follows the stored location to this node's `ArrayNode` table.
    fn flatbuffer(&self) -> fba::ArrayNode<'_> {
        // SAFETY: `flatbuffer_loc` was obtained either from a root verified by
        // `validate_array_tree`, or from a child node of that same verified
        // buffer (via `child`/`with_root`), so it points at a valid
        // `ArrayNode` table within `self.flatbuffer`.
        unsafe { fba::ArrayNode::follow(self.flatbuffer.as_ref(), self.flatbuffer_loc) }
    }

    /// Cheap view over a different node within the same flatbuffer and
    /// buffer list — only the node location changes.
    fn with_root(&self, root: fba::ArrayNode) -> Self {
        let mut this = self.clone();
        this.flatbuffer_loc = root._tab.loc();
        this
    }

    /// Constructs a [`SerializedArray`] from an array-tree flatbuffer and a
    /// single contiguous data segment containing all buffers (with padding),
    /// as produced by `ArrayRef::serialize`.
    pub fn from_flatbuffer_and_segment(
        array_tree: ByteBuffer,
        segment: BufferHandle,
    ) -> VortexResult<Self> {
        Self::from_flatbuffer_and_segment_with_overrides(array_tree, segment, &HashMap::new())
    }

    /// Like [`Self::from_flatbuffer_and_segment`], but buffers whose
    /// tree-wide index appears in `buffer_overrides` are taken from the map
    /// instead of sliced out of `segment`. Overridden buffers still consume
    /// their padding + length span within the segment walk.
    pub fn from_flatbuffer_and_segment_with_overrides(
        array_tree: ByteBuffer,
        segment: BufferHandle,
        buffer_overrides: &HashMap<u32, ByteBuffer>,
    ) -> VortexResult<Self> {
        let segment = segment.ensure_aligned(Alignment::none())?;

        let (fb_buffer, flatbuffer_loc) = Self::validate_array_tree(array_tree)?;
        // SAFETY: `validate_array_tree` just verified this buffer parses as a
        // valid `fba::Array`.
        let fb_array = unsafe { fba::root_as_array_unchecked(fb_buffer.as_ref()) };

        // Walk the buffer descriptors in order, advancing a running offset
        // through the segment: padding first, then the buffer's bytes.
        let mut offset = 0;
        let buffers = fb_array
            .buffers()
            .unwrap_or_default()
            .iter()
            .enumerate()
            .map(|(idx, fb_buf)| {
                offset += fb_buf.padding() as usize;
                let buffer_len = fb_buf.length() as usize;
                let alignment = Alignment::from_exponent(fb_buf.alignment_exponent());

                let idx = u32::try_from(idx).vortex_expect("buffer count must fit in u32");
                let handle = if let Some(host_data) = buffer_overrides.get(&idx) {
                    BufferHandle::new_host(host_data.clone()).ensure_aligned(alignment)?
                } else {
                    let buffer = segment.slice(offset..(offset + buffer_len));
                    buffer.ensure_aligned(alignment)?
                };

                offset += buffer_len;
                Ok(handle)
            })
            .collect::<VortexResult<Arc<[_]>>>()?;

        Ok(SerializedArray {
            flatbuffer: fb_buffer,
            flatbuffer_loc,
            buffers,
        })
    }
}
644
/// [`ArrayChildren`] implementation that decodes children lazily from a
/// [`SerializedArray`] as deserializer plugins request them.
struct SerializedArrayChildren<'a> {
    ser: &'a SerializedArray,
    ctx: &'a ReadContext,
    session: &'a VortexSession,
}
650
651impl ArrayChildren for SerializedArrayChildren<'_> {
652 fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef> {
653 self.ser
654 .child(index)
655 .decode(dtype, len, self.ctx, self.session)
656 }
657
658 fn len(&self) -> usize {
659 self.ser.nchildren()
660 }
661}
662
impl TryFrom<ByteBuffer> for SerializedArray {
    type Error = VortexError;

    /// Parses a serialized array from one contiguous buffer laid out as
    /// `[data segment][array-tree flatbuffer][u32 LE flatbuffer length]` —
    /// i.e. the concatenation of the buffers produced by
    /// `ArrayRef::serialize`.
    fn try_from(value: ByteBuffer) -> Result<Self, Self::Error> {
        // At minimum the 4-byte length footer must be present.
        if value.len() < 4 {
            vortex_bail!("SerializedArray buffer is too short");
        }

        // Treat the blob as unaligned; per-buffer alignment is re-established
        // from the flatbuffer descriptors during decoding.
        let value = value.aligned(Alignment::none());

        // The trailing u32 gives the flatbuffer's length, measured backwards
        // from the footer.
        let fb_length = u32::try_from_le_bytes(&value.as_slice()[value.len() - 4..])? as usize;
        if value.len() < 4 + fb_length {
            vortex_bail!("SerializedArray buffer is too short for flatbuffer");
        }

        let fb_offset = value.len() - 4 - fb_length;
        let array_tree = value.slice(fb_offset..fb_offset + fb_length);
        // Everything before the flatbuffer is the data segment holding the
        // array buffers (including any alignment padding).
        let segment = BufferHandle::new_host(value.slice(0..fb_offset));

        Self::from_flatbuffer_and_segment(array_tree, segment)
    }
}
687
688impl TryFrom<BufferHandle> for SerializedArray {
689 type Error = VortexError;
690
691 fn try_from(value: BufferHandle) -> Result<Self, Self::Error> {
692 Self::try_from(value.try_to_host_sync()?)
693 }
694}