1use std::fmt::{Debug, Formatter};
5use std::iter;
6use std::sync::Arc;
7
8use flatbuffers::{FlatBufferBuilder, Follow, WIPOffset, root};
9use itertools::Itertools;
10use vortex_buffer::{Alignment, ByteBuffer};
11use vortex_dtype::{DType, TryFromBytes};
12use vortex_error::{
13 VortexError, VortexExpect, VortexResult, vortex_bail, vortex_err, vortex_panic,
14};
15use vortex_flatbuffers::array::Compression;
16use vortex_flatbuffers::{
17 FlatBuffer, FlatBufferRoot, ReadFlatBuffer, WriteFlatBuffer, array as fba,
18};
19
20use crate::stats::StatsSet;
21use crate::{Array, ArrayContext, ArrayRef, ArrayVisitor, ArrayVisitorExt};
22
/// Options controlling how an array is serialized into byte buffers.
#[derive(Default, Debug)]
pub struct SerializeOptions {
    /// Absolute position at which the caller will write the first serialized byte.
    /// Only used to compute alignment padding when `include_padding` is set.
    pub offset: usize,
    /// When true, zero-filled padding buffers are inserted so each data buffer starts
    /// at a position (relative to `offset`) satisfying its alignment requirement.
    pub include_padding: bool,
}
32
33impl dyn Array + '_ {
34 pub fn serialize(
45 &self,
46 ctx: &ArrayContext,
47 options: &SerializeOptions,
48 ) -> VortexResult<Vec<ByteBuffer>> {
49 let array_buffers = self
51 .depth_first_traversal()
52 .flat_map(|f| f.buffers())
53 .collect::<Vec<_>>();
54
55 let mut buffers = vec![];
57 let mut fb_buffers = Vec::with_capacity(buffers.capacity());
58
59 let max_alignment = array_buffers
61 .iter()
62 .map(|buf| buf.alignment())
63 .chain(iter::once(FlatBuffer::alignment()))
64 .max()
65 .unwrap_or_else(FlatBuffer::alignment);
66
67 let zeros = ByteBuffer::zeroed(*max_alignment);
69
70 buffers.push(ByteBuffer::zeroed_aligned(0, max_alignment));
73
74 let mut pos = options.offset;
76
77 for buffer in array_buffers {
79 let padding = if options.include_padding {
80 let padding = pos.next_multiple_of(*buffer.alignment()) - pos;
81 if padding > 0 {
82 pos += padding;
83 buffers.push(zeros.slice(0..padding));
84 }
85 padding
86 } else {
87 0
88 };
89
90 fb_buffers.push(fba::Buffer::new(
91 u16::try_from(padding).vortex_expect("padding fits into u16"),
92 buffer.alignment().exponent(),
93 Compression::None,
94 u32::try_from(buffer.len())
95 .map_err(|_| vortex_err!("All buffers must fit into u32 for serialization"))?,
96 ));
97
98 pos += buffer.len();
99 buffers.push(buffer.aligned(Alignment::none()));
100 }
101
102 let mut fbb = FlatBufferBuilder::new();
104 let root = ArrayNodeFlatBuffer::try_new(ctx, self)?;
105 let fb_root = root.write_flatbuffer(&mut fbb);
106 let fb_buffers = fbb.create_vector(&fb_buffers);
107 let fb_array = fba::Array::create(
108 &mut fbb,
109 &fba::ArrayArgs {
110 root: Some(fb_root),
111 buffers: Some(fb_buffers),
112 },
113 );
114 fbb.finish_minimal(fb_array);
115 let (fb_vec, fb_start) = fbb.collapse();
116 let fb_end = fb_vec.len();
117 let fb_buffer = ByteBuffer::from(fb_vec).slice(fb_start..fb_end);
118 let fb_length = fb_buffer.len();
119
120 if options.include_padding {
121 let padding = pos.next_multiple_of(*FlatBuffer::alignment()) - pos;
122 if padding > 0 {
123 buffers.push(zeros.slice(0..padding));
124 }
125 }
126 buffers.push(fb_buffer);
127
128 buffers.push(ByteBuffer::from(
130 u32::try_from(fb_length)
131 .map_err(|_| vortex_err!("Array metadata flatbuffer must fit into u32 for serialization. Array encoding tree is too large."))?
132 .to_le_bytes()
133 .to_vec(),
134 ));
135
136 Ok(buffers)
137 }
138}
139
/// Helper that writes one node of an array encoding tree as a flatbuffer `ArrayNode`.
pub struct ArrayNodeFlatBuffer<'a> {
    // Context used to translate encodings into numeric ids.
    ctx: &'a ArrayContext,
    // The array node being serialized.
    array: &'a dyn Array,
    // Index of this node's first buffer within the flattened (depth-first) buffer list.
    buffer_idx: u16,
}
146
147impl<'a> ArrayNodeFlatBuffer<'a> {
148 pub fn try_new(ctx: &'a ArrayContext, array: &'a dyn Array) -> VortexResult<Self> {
149 for child in array.depth_first_traversal() {
151 if child.metadata()?.is_none() {
152 vortex_bail!(
153 "Array {} does not support serialization",
154 child.encoding_id()
155 );
156 }
157 }
158 Ok(Self {
159 ctx,
160 array,
161 buffer_idx: 0,
162 })
163 }
164}
165
// Marker impl: allows `ArrayNodeFlatBuffer` to be written as the root of a flatbuffer message.
impl FlatBufferRoot for ArrayNodeFlatBuffer<'_> {}
167
impl WriteFlatBuffer for ArrayNodeFlatBuffer<'_> {
    type Target<'t> = fba::ArrayNode<'t>;

    /// Writes this node and, recursively, its children into the flatbuffer builder.
    ///
    /// Buffer indices are assigned in depth-first pre-order: this node's own buffers
    /// occupy `[self.buffer_idx, self.buffer_idx + nbuffers)`, followed by each child
    /// subtree's buffers. This must match the flattening order used by
    /// `depth_first_traversal` in `serialize`.
    fn write_flatbuffer<'fb>(
        &self,
        fbb: &mut FlatBufferBuilder<'fb>,
    ) -> WIPOffset<Self::Target<'fb>> {
        let encoding = self.ctx.encoding_idx(&self.array.encoding());
        // Metadata presence for the whole tree was validated in `try_new`, so the
        // expects here indicate a broken invariant rather than a recoverable error.
        let metadata = self
            .array
            .metadata()
            .vortex_expect("Failed to serialize metadata")
            .vortex_expect("Validated that all arrays support serialization");
        let metadata = Some(fbb.create_vector(metadata.as_slice()));

        let nbuffers = u16::try_from(self.array.nbuffers())
            .vortex_expect("Array can have at most u16::MAX buffers");
        // First buffer index available to the first child's subtree.
        let mut child_buffer_idx = self.buffer_idx + nbuffers;

        let children = &self
            .array
            .children()
            .iter()
            .map(|child| {
                let msg = ArrayNodeFlatBuffer {
                    ctx: self.ctx,
                    array: child,
                    buffer_idx: child_buffer_idx,
                }
                .write_flatbuffer(fbb);
                // Advance past all buffers in the child's subtree (checked so the
                // running index cannot silently wrap past u16::MAX).
                child_buffer_idx = u16::try_from(child.nbuffers_recursive())
                    .ok()
                    .and_then(|nbuffers| nbuffers.checked_add(child_buffer_idx))
                    .vortex_expect("Too many buffers (u16) for Array");
                msg
            })
            .collect::<Vec<_>>();
        let children = Some(fbb.create_vector(children));

        // This node's own buffer indices within the flattened buffer list.
        let buffers = Some(fbb.create_vector_from_iter((0..nbuffers).map(|i| i + self.buffer_idx)));
        let stats = Some(self.array.statistics().to_owned().write_flatbuffer(fbb));

        fba::ArrayNode::create(
            fbb,
            &fba::ArrayNodeArgs {
                encoding,
                metadata,
                children,
                buffers,
                stats,
            },
        )
    }
}
225
/// A lazy view over an array's children, used when rebuilding an array from its
/// serialized parts.
pub trait ArrayChildren {
    /// Returns the child at `index`, decoded with the given `dtype` and `len`.
    fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef>;

    /// The number of children.
    fn len(&self) -> usize;

    /// Returns true if there are no children.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
242
/// The deserialized parts of an array: a flatbuffer describing one node of the encoding
/// tree plus the shared data buffers. Cloning is cheap (shared `FlatBuffer` and
/// `Arc`-backed buffer slice).
#[derive(Clone)]
pub struct ArrayParts {
    // Flatbuffer holding the full encoding tree.
    flatbuffer: FlatBuffer,
    // Byte offset of this node's `ArrayNode` table within `flatbuffer`.
    flatbuffer_loc: usize,
    // All data buffers of the tree, in depth-first order.
    buffers: Arc<[ByteBuffer]>,
}
257
258impl Debug for ArrayParts {
259 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
260 f.debug_struct("ArrayParts")
261 .field("encoding_id", &self.encoding_id())
262 .field("children", &(0..self.nchildren()).map(|i| self.child(i)))
263 .field(
264 "buffers",
265 &(0..self.nbuffers()).map(|i| self.buffer(i).ok()),
266 )
267 .field("metadata", &self.metadata())
268 .finish()
269 }
270}
271
impl ArrayParts {
    /// Decodes this node into an [`ArrayRef`], recursively decoding children on demand.
    ///
    /// The caller supplies `dtype` and `len` because they are not stored in the
    /// serialized form; the decoded array is asserted to match both, as well as the
    /// expected encoding.
    pub fn decode(&self, ctx: &ArrayContext, dtype: &DType, len: usize) -> VortexResult<ArrayRef> {
        let encoding_id = self.flatbuffer().encoding();
        let vtable = ctx
            .lookup_encoding(encoding_id)
            .ok_or_else(|| vortex_err!("Unknown encoding: {}", encoding_id))?;

        let buffers: Vec<_> = (0..self.nbuffers())
            .map(|idx| self.buffer(idx))
            .try_collect()?;

        // Children are decoded lazily via this adapter as the vtable requests them.
        let children = ArrayPartsChildren { parts: self, ctx };

        let decoded = vtable.build(dtype, len, self.metadata(), &buffers, &children)?;

        // A mismatch here is a bug in the encoding's `build`, not a recoverable error.
        assert_eq!(
            decoded.len(),
            len,
            "Array decoded from {} has incorrect length {}, expected {}",
            vtable.id(),
            decoded.len(),
            len
        );
        assert_eq!(
            decoded.dtype(),
            dtype,
            "Array decoded from {} has incorrect dtype {}, expected {}",
            vtable.id(),
            decoded.dtype(),
            dtype,
        );
        assert_eq!(
            decoded.encoding_id(),
            vtable.id(),
            "Array decoded from {} has incorrect encoding {}",
            vtable.id(),
            decoded.encoding_id(),
        );

        // Restore any statistics that were serialized alongside the array.
        if let Some(stats) = self.flatbuffer().stats() {
            let decoded_statistics = decoded.statistics();
            StatsSet::read_flatbuffer(&stats)?
                .into_iter()
                .for_each(|(stat, val)| decoded_statistics.set(stat, val));
        }

        Ok(decoded)
    }

    /// The numeric encoding id of this node (resolved via an `ArrayContext`).
    pub fn encoding_id(&self) -> u16 {
        self.flatbuffer().encoding()
    }

    /// This node's opaque metadata bytes; empty if none were serialized.
    pub fn metadata(&self) -> &[u8] {
        self.flatbuffer()
            .metadata()
            .map(|metadata| metadata.bytes())
            .unwrap_or(&[])
    }

    /// The number of child nodes.
    pub fn nchildren(&self) -> usize {
        self.flatbuffer()
            .children()
            .map_or(0, |children| children.len())
    }

    /// Returns the child at `idx` as its own `ArrayParts` (sharing this node's
    /// flatbuffer and buffers).
    ///
    /// # Panics
    ///
    /// Panics if `idx` is out of range.
    pub fn child(&self, idx: usize) -> ArrayParts {
        let children = self
            .flatbuffer()
            .children()
            .vortex_expect("Expected array to have children");
        if idx >= children.len() {
            vortex_panic!(
                "Invalid child index {} for array with {} children",
                idx,
                children.len()
            );
        }
        self.with_root(children.get(idx))
    }

    /// The number of buffers belonging to this node (not including children's buffers).
    pub fn nbuffers(&self) -> usize {
        self.flatbuffer()
            .buffers()
            .map_or(0, |buffers| buffers.len())
    }

    /// Returns this node's `idx`-th buffer, resolving the flatbuffer's buffer index
    /// into the shared buffer list.
    ///
    /// # Errors
    ///
    /// Fails if this node has no buffers or the recorded index is out of range.
    pub fn buffer(&self, idx: usize) -> VortexResult<ByteBuffer> {
        let buffer_idx = self
            .flatbuffer()
            .buffers()
            .ok_or_else(|| vortex_err!("Array has no buffers"))?
            .get(idx);
        self.buffers
            .get(buffer_idx as usize)
            .cloned()
            .ok_or_else(|| {
                vortex_err!(
                    "Invalid buffer index {} for array with {} buffers",
                    buffer_idx,
                    self.nbuffers()
                )
            })
    }

    /// Re-reads this node's `ArrayNode` table from the shared flatbuffer.
    fn flatbuffer(&self) -> fba::ArrayNode<'_> {
        // SAFETY: `flatbuffer_loc` is either the verified root location produced by
        // `root::<fba::Array>` in `TryFrom<ByteBuffer>`, or the location of a child
        // table obtained from that same verified buffer in `with_root`.
        unsafe { fba::ArrayNode::follow(self.flatbuffer.as_ref(), self.flatbuffer_loc) }
    }

    /// Clones this `ArrayParts` but re-points it at a different node within the same
    /// flatbuffer (used to descend into children).
    fn with_root(&self, root: fba::ArrayNode) -> Self {
        let mut this = self.clone();
        this.flatbuffer_loc = root._tab.loc();
        this
    }
}
398
// Adapter exposing an `ArrayParts`' children through the `ArrayChildren` trait, so that
// encodings can lazily decode only the children they need.
struct ArrayPartsChildren<'a> {
    parts: &'a ArrayParts,
    ctx: &'a ArrayContext,
}
403
404impl ArrayChildren for ArrayPartsChildren<'_> {
405 fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef> {
406 self.parts.child(index).decode(self.ctx, dtype, len)
407 }
408
409 fn len(&self) -> usize {
410 self.parts.nchildren()
411 }
412}
413
414impl TryFrom<ByteBuffer> for ArrayParts {
415 type Error = VortexError;
416
417 fn try_from(value: ByteBuffer) -> Result<Self, Self::Error> {
418 if value.len() < 4 {
420 vortex_bail!("ArrayParts buffer is too short");
421 }
422
423 let value = value.aligned(Alignment::none());
425
426 let fb_length = u32::try_from_le_bytes(&value.as_slice()[value.len() - 4..])? as usize;
427 if value.len() < 4 + fb_length {
428 vortex_bail!("ArrayParts buffer is too short for flatbuffer");
429 }
430
431 let fb_offset = value.len() - 4 - fb_length;
432 let fb_buffer = value.slice(fb_offset..fb_offset + fb_length);
433 let fb_buffer = FlatBuffer::align_from(fb_buffer);
434
435 let fb_array = root::<fba::Array>(fb_buffer.as_ref())?;
436 let fb_root = fb_array.root().vortex_expect("Array must have a root node");
437
438 let mut offset = 0;
439 let buffers: Arc<[ByteBuffer]> = fb_array
440 .buffers()
441 .unwrap_or_default()
442 .iter()
443 .map(|fb_buffer| {
444 offset += fb_buffer.padding() as usize;
446
447 let buffer_len = fb_buffer.length() as usize;
448
449 let buffer = value
451 .slice(offset..(offset + buffer_len))
452 .aligned(Alignment::from_exponent(fb_buffer.alignment_exponent()));
453
454 offset += buffer_len;
455 buffer
456 })
457 .collect();
458
459 Ok(ArrayParts {
460 flatbuffer: fb_buffer.clone(),
461 flatbuffer_loc: fb_root._tab.loc(),
462 buffers,
463 })
464 }
465}