use std::fmt::Debug;
use std::fmt::Formatter;
use std::iter;
use std::sync::Arc;

use flatbuffers::FlatBufferBuilder;
use flatbuffers::Follow;
use flatbuffers::WIPOffset;
use flatbuffers::root;
use itertools::Itertools;
use vortex_buffer::Alignment;
use vortex_buffer::ByteBuffer;
use vortex_dtype::DType;
use vortex_dtype::TryFromBytes;
use vortex_error::VortexError;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_error::vortex_err;
use vortex_error::vortex_panic;
use vortex_flatbuffers::FlatBuffer;
use vortex_flatbuffers::WriteFlatBuffer;
use vortex_flatbuffers::array as fba;
use vortex_flatbuffers::array::Compression;
use vortex_session::VortexSession;

use crate::Array;
use crate::ArrayContext;
use crate::ArrayRef;
use crate::ArrayVisitor;
use crate::ArrayVisitorExt;
use crate::buffer::BufferHandle;
use crate::session::ArraySessionExt;
use crate::stats::StatsSet;

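/// Options controlling how an array is serialized into a sequence of byte buffers.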
#[derive(Default, Debug)]
pub struct SerializeOptions {
    /// The byte offset in the output stream at which the first buffer will be written. Padding
    /// is computed relative to this offset so buffers keep their natural alignment.
    pub offset: usize,
    /// Whether to emit zero-filled padding so that each buffer (and the trailing flatbuffer)
    /// starts at an aligned position.
    pub include_padding: bool,
}

impl dyn Array + '_ {
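    /// Serialize this array (and its children) into a sequence of [`ByteBuffer`]s intended to be
    /// written end-to-end: each data buffer (optionally preceded by zero padding), followed by
    /// the flatbuffer-encoded array tree, followed by the little-endian `u32` length of that
    /// flatbuffer.
    ///
    /// A minimal usage sketch (not compiled here); `array`, `ctx`, and the destination are
    /// placeholders you are assumed to already have:
    ///
    /// ```ignore
    /// let buffers = array.serialize(&ctx, &SerializeOptions::default())?;
    /// // Concatenating the buffers in order yields bytes that `ArrayParts::try_from` accepts.
    /// let mut bytes = Vec::new();
    /// for buf in &buffers {
    ///     bytes.extend_from_slice(buf.as_slice());
    /// }
    /// ```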
    pub fn serialize(
        &self,
        ctx: &ArrayContext,
        options: &SerializeOptions,
    ) -> VortexResult<Vec<ByteBuffer>> {
        // Collect the buffers of this array and all of its descendants, in depth-first order.
        let array_buffers = self
            .depth_first_traversal()
            .flat_map(|f| f.buffers())
            .collect::<Vec<_>>();

        let mut buffers = vec![];
        let mut fb_buffers = Vec::with_capacity(array_buffers.len());

        // The largest alignment we must satisfy, including that of the trailing flatbuffer.
        let max_alignment = array_buffers
            .iter()
            .map(|buf| buf.alignment())
            .chain(iter::once(FlatBuffer::alignment()))
            .max()
            .unwrap_or_else(FlatBuffer::alignment);

        // A shared buffer of zeros used to emit padding slices.
        let zeros = ByteBuffer::zeroed(*max_alignment);

        // Start with an empty buffer that carries the maximum alignment.
        buffers.push(ByteBuffer::zeroed_aligned(0, max_alignment));

        // Track the current write position, starting from the caller-provided offset.
        let mut pos = options.offset;

        for buffer in array_buffers {
            let padding = if options.include_padding {
                let padding = pos.next_multiple_of(*buffer.alignment()) - pos;
                if padding > 0 {
                    pos += padding;
                    buffers.push(zeros.slice(0..padding));
                }
                padding
            } else {
                0
            };

            fb_buffers.push(fba::Buffer::new(
                u16::try_from(padding).vortex_expect("padding fits into u16"),
                buffer.alignment().exponent(),
                Compression::None,
                u32::try_from(buffer.len())
                    .map_err(|_| vortex_err!("All buffers must fit into u32 for serialization"))?,
            ));

            pos += buffer.len();
            buffers.push(buffer.aligned(Alignment::none()));
        }

        // Write the flatbuffer-encoded array tree.
        let mut fbb = FlatBufferBuilder::new();

        let root = ArrayNodeFlatBuffer::try_new(ctx, self)?;
        let fb_root = root.try_write_flatbuffer(&mut fbb)?;

        let fb_buffers = fbb.create_vector(&fb_buffers);
        let fb_array = fba::Array::create(
            &mut fbb,
            &fba::ArrayArgs {
                root: Some(fb_root),
                buffers: Some(fb_buffers),
            },
        );
        fbb.finish_minimal(fb_array);
        let (fb_vec, fb_start) = fbb.collapse();
        let fb_end = fb_vec.len();
        let fb_buffer = ByteBuffer::from(fb_vec).slice(fb_start..fb_end);
        let fb_length = fb_buffer.len();

        if options.include_padding {
            let padding = pos.next_multiple_of(*FlatBuffer::alignment()) - pos;
            if padding > 0 {
                buffers.push(zeros.slice(0..padding));
            }
        }
        buffers.push(fb_buffer);

        // Finally, append the little-endian u32 length of the flatbuffer so a reader can locate
        // it from the end of the stream.
        buffers.push(ByteBuffer::from(
            u32::try_from(fb_length)
                .map_err(|_| vortex_err!("Array metadata flatbuffer must fit into u32 for serialization. Array encoding tree is too large."))?
                .to_le_bytes()
                .to_vec(),
        ));

        Ok(buffers)
    }
}

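/// A helper that writes one array node, and recursively its children, into an in-progress
/// flatbuffer, assigning each node the indices of the buffers it owns.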
pub struct ArrayNodeFlatBuffer<'a> {
    ctx: &'a ArrayContext,
    array: &'a dyn Array,
    buffer_idx: u16,
}

impl<'a> ArrayNodeFlatBuffer<'a> {
    pub fn try_new(ctx: &'a ArrayContext, array: &'a dyn Array) -> VortexResult<Self> {
        for child in array.depth_first_traversal() {
            if child.metadata()?.is_none() {
                vortex_bail!(
                    "Array {} does not support serialization",
                    child.encoding_id()
                );
            }
        }
        let n_buffers_recursive = array.nbuffers_recursive();
        if n_buffers_recursive > u16::MAX as usize {
            vortex_bail!(
                "Array and all descendant arrays can have at most u16::MAX buffers: {}",
                n_buffers_recursive
            );
        }
        Ok(Self {
            ctx,
            array,
            buffer_idx: 0,
        })
    }

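    /// Write this node into `fbb`, recursing into its children and assigning them buffer indices
    /// that follow this node's own buffers.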
    pub fn try_write_flatbuffer<'fb>(
        &self,
        fbb: &mut FlatBufferBuilder<'fb>,
    ) -> VortexResult<WIPOffset<fba::ArrayNode<'fb>>> {
        let encoding_idx = self
            .ctx
            .intern(&self.array.encoding_id())
            .ok_or_else(|| {
                vortex_err!(
                    "Array encoding {} not permitted by ctx",
                    self.array.encoding_id()
                )
            })?;

        let metadata = self.array.metadata()?.ok_or_else(|| {
            vortex_err!(
                "Array {} does not support serialization",
                self.array.encoding_id()
            )
        })?;
        let metadata = Some(fbb.create_vector(metadata.as_slice()));

        let nbuffers = u16::try_from(self.array.nbuffers())
            .map_err(|_| vortex_err!("Array can have at most u16::MAX buffers"))?;
        // Children reference buffers that come after this node's own buffers.
        let mut child_buffer_idx = self.buffer_idx + nbuffers;

        let children = &self
            .array
            .children()
            .iter()
            .map(|child| {
                let msg = ArrayNodeFlatBuffer {
                    ctx: self.ctx,
                    array: child,
                    buffer_idx: child_buffer_idx,
                }
                .try_write_flatbuffer(fbb)?;

                child_buffer_idx = u16::try_from(child.nbuffers_recursive())
                    .ok()
                    .and_then(|nbuffers| nbuffers.checked_add(child_buffer_idx))
                    .ok_or_else(|| vortex_err!("Too many buffers (u16) for Array"))?;

                Ok(msg)
            })
            .collect::<VortexResult<Vec<_>>>()?;
        let children = Some(fbb.create_vector(children));

        let buffers = Some(fbb.create_vector_from_iter((0..nbuffers).map(|i| i + self.buffer_idx)));
        let stats = Some(self.array.statistics().write_flatbuffer(fbb)?);

        Ok(fba::ArrayNode::create(
            fbb,
            &fba::ArrayNodeArgs {
                encoding: encoding_idx,
                metadata,
                children,
                buffers,
                stats,
            },
        ))
    }
}

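/// A lazy accessor for the children of an array, used when building an array back up from its
/// serialized parts. Implementations return the child at `index`, which is expected to match the
/// given `dtype` and `len`.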
pub trait ArrayChildren {
    /// Returns the child array at `index`, which must have the given `dtype` and `len`.
    fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef>;

    /// The number of children.
    fn len(&self) -> usize;

    /// Returns true if there are no children.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
}

impl ArrayChildren for &[ArrayRef] {
    fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef> {
        let array = self[index].clone();
        assert_eq!(array.len(), len);
        assert_eq!(array.dtype(), dtype);
        Ok(array)
    }

    fn len(&self) -> usize {
        <[_]>::len(self)
    }
}

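/// The serialized form of an array: a flatbuffer-encoded tree of [`fba::ArrayNode`]s together
/// with the data buffers that the tree references. An `ArrayParts` is cheap to clone, can be
/// re-rooted at any of its children, and can be decoded back into an [`ArrayRef`] with
/// [`ArrayParts::decode`].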
#[derive(Clone)]
pub struct ArrayParts {
    flatbuffer: FlatBuffer,
    // The location of this node within the flatbuffer.
    flatbuffer_loc: usize,
    buffers: Arc<[BufferHandle]>,
}

impl Debug for ArrayParts {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ArrayParts")
            .field("encoding_id", &self.encoding_id())
            .field("children", &(0..self.nchildren()).map(|i| self.child(i)))
            .field(
                "buffers",
                &(0..self.nbuffers()).map(|i| self.buffer(i).ok()),
            )
            .field("metadata", &self.metadata())
            .finish()
    }
}

impl ArrayParts {
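    /// Decode these parts back into an [`ArrayRef`], given the `dtype` and `len` known at
    /// serialization time and the same [`ArrayContext`] that was used to serialize.
    ///
    /// A minimal sketch (not compiled here); `parts`, `dtype`, `len`, `ctx`, and `session` are
    /// assumed to come from the surrounding application:
    ///
    /// ```ignore
    /// let array = parts.decode(&dtype, len, &ctx, &session)?;
    /// assert_eq!(array.len(), len);
    /// ```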
    pub fn decode(
        &self,
        dtype: &DType,
        len: usize,
        ctx: &ArrayContext,
        session: &VortexSession,
    ) -> VortexResult<ArrayRef> {
        // Resolve the encoding via the context, then look up its vtable in the session registry.
        let encoding_idx = self.flatbuffer().encoding();
        let encoding_id = ctx
            .resolve(encoding_idx)
            .ok_or_else(|| vortex_err!("Unknown encoding index: {}", encoding_idx))?;
        let vtable = session
            .arrays()
            .registry()
            .find(&encoding_id)
            .ok_or_else(|| vortex_err!("Unknown encoding: {}", encoding_id))?;

        let buffers: Vec<_> = (0..self.nbuffers())
            .map(|idx| self.buffer(idx))
            .try_collect()?;

        let children = ArrayPartsChildren {
            parts: self,
            ctx,
            session,
        };

        let decoded = vtable.build(
            encoding_id.clone(),
            dtype,
            len,
            self.metadata(),
            &buffers,
            &children,
            session,
        )?;

        assert_eq!(
            decoded.len(),
            len,
            "Array decoded from {} has incorrect length {}, expected {}",
            encoding_id,
            decoded.len(),
            len
        );
        assert_eq!(
            decoded.dtype(),
            dtype,
            "Array decoded from {} has incorrect dtype {}, expected {}",
            encoding_id,
            decoded.dtype(),
            dtype,
        );
        assert_eq!(
            decoded.encoding_id(),
            encoding_id,
            "Array decoded from {} has incorrect encoding {}",
            encoding_id,
            decoded.encoding_id(),
        );

        // Restore any statistics carried in the flatbuffer onto the decoded array.
        if let Some(stats) = self.flatbuffer().stats() {
            let decoded_statistics = decoded.statistics();
            StatsSet::from_flatbuffer(&stats, dtype)?
                .into_iter()
                .for_each(|(stat, val)| decoded_statistics.set(stat, val));
        }

        Ok(decoded)
    }

    /// The encoding index of this node, resolvable via an [`ArrayContext`].
    pub fn encoding_id(&self) -> u16 {
        self.flatbuffer().encoding()
    }

    /// The opaque metadata bytes of this node.
    pub fn metadata(&self) -> &[u8] {
        self.flatbuffer()
            .metadata()
            .map(|metadata| metadata.bytes())
            .unwrap_or(&[])
    }

    /// The number of child nodes.
    pub fn nchildren(&self) -> usize {
        self.flatbuffer()
            .children()
            .map_or(0, |children| children.len())
    }

    /// Returns the child at `idx`, panicking if the index is out of bounds.
    pub fn child(&self, idx: usize) -> ArrayParts {
        let children = self
            .flatbuffer()
            .children()
            .vortex_expect("Expected array to have children");
        if idx >= children.len() {
            vortex_panic!(
                "Invalid child index {} for array with {} children",
                idx,
                children.len()
            );
        }
        self.with_root(children.get(idx))
    }

    /// The number of buffers referenced by this node (excluding its children).
    pub fn nbuffers(&self) -> usize {
        self.flatbuffer()
            .buffers()
            .map_or(0, |buffers| buffers.len())
    }

    /// Returns the buffer at `idx`, as referenced by this node.
    pub fn buffer(&self, idx: usize) -> VortexResult<BufferHandle> {
        let buffer_idx = self
            .flatbuffer()
            .buffers()
            .ok_or_else(|| vortex_err!("Array has no buffers"))?
            .get(idx);
        self.buffers
            .get(buffer_idx as usize)
            .cloned()
            .ok_or_else(|| {
                vortex_err!(
                    "Invalid buffer index {} for array with {} buffers",
                    buffer_idx,
                    self.nbuffers()
                )
            })
    }

    /// The lengths, in bytes, of all buffers referenced by the root [`fba::Array`] message.
    pub fn buffer_lengths(&self) -> Vec<usize> {
        let fb_array = root::<fba::Array>(self.flatbuffer.as_ref())
            .vortex_expect("ArrayParts flatbuffer must be a valid Array");
        fb_array
            .buffers()
            .map(|buffers| buffers.iter().map(|b| b.length() as usize).collect())
            .unwrap_or_default()
    }

    /// Validates that `array_tree` is a well-formed [`fba::Array`] flatbuffer with a root node,
    /// returning the aligned flatbuffer and the location of the root node within it.
    fn validate_array_tree(array_tree: impl Into<ByteBuffer>) -> VortexResult<(FlatBuffer, usize)> {
        let fb_buffer = FlatBuffer::align_from(array_tree.into());
        let fb_array = root::<fba::Array>(fb_buffer.as_ref())?;
        let fb_root = fb_array
            .root()
            .ok_or_else(|| vortex_err!("Array must have a root node"))?;
        let flatbuffer_loc = fb_root._tab.loc();
        Ok((fb_buffer, flatbuffer_loc))
    }

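    /// Creates [`ArrayParts`] from a flatbuffer-encoded array tree and an explicit list of
    /// buffers, assumed to be in the order the tree references them.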
    pub fn from_flatbuffer_with_buffers(
        array_tree: impl Into<ByteBuffer>,
        buffers: Vec<BufferHandle>,
    ) -> VortexResult<Self> {
        let (flatbuffer, flatbuffer_loc) = Self::validate_array_tree(array_tree)?;
        Ok(ArrayParts {
            flatbuffer,
            flatbuffer_loc,
            buffers: buffers.into(),
        })
    }

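    /// Creates [`ArrayParts`] from a flatbuffer-encoded array tree with no data buffers.
    /// Decoding will fail for any node that references a buffer.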
    pub fn from_array_tree(array_tree: impl Into<ByteBuffer>) -> VortexResult<Self> {
        let (flatbuffer, flatbuffer_loc) = Self::validate_array_tree(array_tree)?;
        Ok(ArrayParts {
            flatbuffer,
            flatbuffer_loc,
            buffers: Arc::new([]),
        })
    }

    /// The [`fba::ArrayNode`] this `ArrayParts` points at within the flatbuffer.
    fn flatbuffer(&self) -> fba::ArrayNode<'_> {
        unsafe { fba::ArrayNode::follow(self.flatbuffer.as_ref(), self.flatbuffer_loc) }
    }

    /// Returns a copy of these parts re-rooted at the given node (used to descend into children).
    fn with_root(&self, root: fba::ArrayNode) -> Self {
        let mut this = self.clone();
        this.flatbuffer_loc = root._tab.loc();
        this
    }

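    /// Creates [`ArrayParts`] from a flatbuffer-encoded array tree and a single contiguous
    /// segment holding all of the data buffers, laid out with the per-buffer padding recorded in
    /// the flatbuffer (i.e. the layout produced by `serialize`).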
    pub fn from_flatbuffer_and_segment(
        array_tree: ByteBuffer,
        segment: BufferHandle,
    ) -> VortexResult<Self> {
        let segment = segment.ensure_aligned(Alignment::none())?;

        let (fb_buffer, flatbuffer_loc) = Self::validate_array_tree(array_tree)?;
        // SAFETY: validate_array_tree has already verified the flatbuffer.
        let fb_array = unsafe { fba::root_as_array_unchecked(fb_buffer.as_ref()) };

        // Slice each buffer out of the segment, respecting the padding and alignment recorded
        // alongside it.
        let mut offset = 0;
        let buffers = fb_array
            .buffers()
            .unwrap_or_default()
            .iter()
            .map(|fb_buf| {
                offset += fb_buf.padding() as usize;
                let buffer_len = fb_buf.length() as usize;
                let buffer = segment.slice(offset..(offset + buffer_len));
                let buffer =
                    buffer.ensure_aligned(Alignment::from_exponent(fb_buf.alignment_exponent()))?;
                offset += buffer_len;
                Ok(buffer)
            })
            .collect::<VortexResult<Arc<[_]>>>()?;

        Ok(ArrayParts {
            flatbuffer: fb_buffer,
            flatbuffer_loc,
            buffers,
        })
    }
}

/// Lazily decodes the children of an [`ArrayParts`] as they are requested.
struct ArrayPartsChildren<'a> {
    parts: &'a ArrayParts,
    ctx: &'a ArrayContext,
    session: &'a VortexSession,
}

impl ArrayChildren for ArrayPartsChildren<'_> {
    fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef> {
        self.parts
            .child(index)
            .decode(dtype, len, self.ctx, self.session)
    }

    fn len(&self) -> usize {
        self.parts.nchildren()
    }
}

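/// Parses [`ArrayParts`] from a contiguous byte buffer laid out as produced by `serialize`: the
/// data buffers, followed by the flatbuffer-encoded array tree, followed by the little-endian
/// `u32` length of that flatbuffer.
///
/// A round-trip sketch (not compiled here); `array`, `ctx`, `session`, `dtype`, and `len` are
/// placeholders assumed to exist in the surrounding code:
///
/// ```ignore
/// let mut bytes = Vec::new();
/// for buf in array.serialize(&ctx, &SerializeOptions::default())? {
///     bytes.extend_from_slice(buf.as_slice());
/// }
/// let parts = ArrayParts::try_from(ByteBuffer::from(bytes))?;
/// let decoded = parts.decode(&dtype, len, &ctx, &session)?;
/// assert_eq!(decoded.len(), len);
/// ```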
impl TryFrom<ByteBuffer> for ArrayParts {
    type Error = VortexError;

    fn try_from(value: ByteBuffer) -> Result<Self, Self::Error> {
        if value.len() < 4 {
            vortex_bail!("ArrayParts buffer is too short");
        }

        let value = value.aligned(Alignment::none());

        // The final four bytes hold the little-endian length of the flatbuffer that precedes them.
        let fb_length = u32::try_from_le_bytes(&value.as_slice()[value.len() - 4..])? as usize;
        if value.len() < 4 + fb_length {
            vortex_bail!("ArrayParts buffer is too short for flatbuffer");
        }

        let fb_offset = value.len() - 4 - fb_length;
        let array_tree = value.slice(fb_offset..fb_offset + fb_length);
        let segment = BufferHandle::new_host(value.slice(0..fb_offset));

        Self::from_flatbuffer_and_segment(array_tree, segment)
    }
}

impl TryFrom<BufferHandle> for ArrayParts {
    type Error = VortexError;

    fn try_from(value: BufferHandle) -> Result<Self, Self::Error> {
        Self::try_from(value.try_to_host_sync()?)
    }
}