1use std::fmt::{Debug, Formatter};
5use std::iter;
6use std::sync::Arc;
7
8use flatbuffers::{FlatBufferBuilder, Follow, WIPOffset, root};
9use itertools::Itertools;
10use vortex_buffer::{Alignment, ByteBuffer};
11use vortex_dtype::{DType, TryFromBytes};
12use vortex_error::{
13 VortexError, VortexExpect, VortexResult, vortex_bail, vortex_err, vortex_panic,
14};
15use vortex_flatbuffers::array::Compression;
16use vortex_flatbuffers::{
17 FlatBuffer, FlatBufferRoot, ReadFlatBuffer, WriteFlatBuffer, array as fba,
18};
19
20use crate::stats::StatsSet;
21use crate::{Array, ArrayContext, ArrayRef, ArrayVisitor, ArrayVisitorExt};
22
/// Options controlling how an array is laid out by `serialize`.
#[derive(Debug, Default)]
pub struct SerializeOptions {
    /// Starting position that `serialize` uses as the base when computing
    /// alignment padding for each buffer.
    pub offset: usize,
    /// When `true`, `serialize` emits zero-filled padding buffers so that each
    /// buffer (and the trailing flatbuffer) starts at a multiple of its
    /// alignment, counted from `offset`. When `false`, no padding is emitted
    /// and a padding of `0` is recorded for every buffer.
    pub include_padding: bool,
}
32
33impl dyn Array + '_ {
34 pub fn serialize(
45 &self,
46 ctx: &ArrayContext,
47 options: &SerializeOptions,
48 ) -> VortexResult<Vec<ByteBuffer>> {
49 let array_buffers = self
51 .depth_first_traversal()
52 .flat_map(|f| f.buffers())
53 .collect::<Vec<_>>();
54
55 let mut buffers = vec![];
57 let mut fb_buffers = Vec::with_capacity(buffers.capacity());
58
59 let max_alignment = array_buffers
61 .iter()
62 .map(|buf| buf.alignment())
63 .chain(iter::once(FlatBuffer::alignment()))
64 .max()
65 .unwrap_or_else(FlatBuffer::alignment);
66
67 let zeros = ByteBuffer::zeroed(*max_alignment);
69
70 buffers.push(ByteBuffer::zeroed_aligned(0, max_alignment));
73
74 let mut pos = options.offset;
76
77 for buffer in array_buffers {
79 let padding = if options.include_padding {
80 let padding = pos.next_multiple_of(*buffer.alignment()) - pos;
81 if padding > 0 {
82 pos += padding;
83 buffers.push(zeros.slice(0..padding));
84 }
85 padding
86 } else {
87 0
88 };
89
90 fb_buffers.push(fba::Buffer::new(
91 u16::try_from(padding).vortex_expect("padding fits into u16"),
92 buffer.alignment().exponent(),
93 Compression::None,
94 u32::try_from(buffer.len())
95 .map_err(|_| vortex_err!("All buffers must fit into u32 for serialization"))?,
96 ));
97
98 pos += buffer.len();
99 buffers.push(buffer.aligned(Alignment::none()));
100 }
101
102 let mut fbb = FlatBufferBuilder::new();
104 let root = ArrayNodeFlatBuffer::try_new(ctx, self)?;
105 let fb_root = root.write_flatbuffer(&mut fbb);
106 let fb_buffers = fbb.create_vector(&fb_buffers);
107 let fb_array = fba::Array::create(
108 &mut fbb,
109 &fba::ArrayArgs {
110 root: Some(fb_root),
111 buffers: Some(fb_buffers),
112 },
113 );
114 fbb.finish_minimal(fb_array);
115 let (fb_vec, fb_start) = fbb.collapse();
116 let fb_end = fb_vec.len();
117 let fb_buffer = ByteBuffer::from(fb_vec).slice(fb_start..fb_end);
118 let fb_length = fb_buffer.len();
119
120 if options.include_padding {
121 let padding = pos.next_multiple_of(*FlatBuffer::alignment()) - pos;
122 if padding > 0 {
123 buffers.push(zeros.slice(0..padding));
124 }
125 }
126 buffers.push(fb_buffer);
127
128 buffers.push(ByteBuffer::from(
130 u32::try_from(fb_length)
131 .map_err(|_| vortex_err!("Array metadata flatbuffer must fit into u32 for serialization. Array encoding tree is too large."))?
132 .to_le_bytes()
133 .to_vec(),
134 ));
135
136 Ok(buffers)
137 }
138}
139
/// Writer for a single node of an array tree as an `fba::ArrayNode`
/// flatbuffer table.
pub struct ArrayNodeFlatBuffer<'a> {
    /// Context used to look up the encoding index written for `array`.
    ctx: &'a ArrayContext,
    /// The array described by this node.
    array: &'a dyn Array,
    /// Index assigned to this array's first buffer; this array's remaining
    /// buffers and then its children's buffers are numbered consecutively
    /// after it.
    buffer_idx: u16,
}
146
147impl<'a> ArrayNodeFlatBuffer<'a> {
148 pub fn try_new(ctx: &'a ArrayContext, array: &'a dyn Array) -> VortexResult<Self> {
149 for child in array.depth_first_traversal() {
151 if child.metadata()?.is_none() {
152 vortex_bail!(
153 "Array {} does not support serialization",
154 child.encoding_id()
155 );
156 }
157 }
158 Ok(Self {
159 ctx,
160 array,
161 buffer_idx: 0,
162 })
163 }
164}
165
// Empty marker impl: no items are defined here.
// NOTE(review): presumably this lets an `ArrayNodeFlatBuffer` be written as a
// standalone root flatbuffer message — confirm against `vortex_flatbuffers`.
impl FlatBufferRoot for ArrayNodeFlatBuffer<'_> {}
167
168impl WriteFlatBuffer for ArrayNodeFlatBuffer<'_> {
169 type Target<'t> = fba::ArrayNode<'t>;
170
171 fn write_flatbuffer<'fb>(
172 &self,
173 fbb: &mut FlatBufferBuilder<'fb>,
174 ) -> WIPOffset<Self::Target<'fb>> {
175 let encoding = self.ctx.encoding_idx(&self.array.encoding());
176 let metadata = self
177 .array
178 .metadata()
179 .vortex_expect("Failed to serialize metadata")
181 .vortex_expect("Validated that all arrays support serialization");
182 let metadata = Some(fbb.create_vector(metadata.as_slice()));
183
184 let nbuffers = u16::try_from(self.array.nbuffers())
186 .vortex_expect("Array can have at most u16::MAX buffers");
187 let mut child_buffer_idx = self.buffer_idx + nbuffers;
188
189 let children = &self
190 .array
191 .children()
192 .iter()
193 .map(|child| {
194 let msg = ArrayNodeFlatBuffer {
196 ctx: self.ctx,
197 array: child,
198 buffer_idx: child_buffer_idx,
199 }
200 .write_flatbuffer(fbb);
201 child_buffer_idx = u16::try_from(child.nbuffers_recursive())
202 .ok()
203 .and_then(|nbuffers| nbuffers.checked_add(child_buffer_idx))
204 .vortex_expect("Too many buffers (u16) for Array");
205 msg
206 })
207 .collect::<Vec<_>>();
208 let children = Some(fbb.create_vector(children));
209
210 let buffers = Some(fbb.create_vector_from_iter((0..nbuffers).map(|i| i + self.buffer_idx)));
211 let stats = Some(self.array.statistics().write_flatbuffer(fbb));
212
213 fba::ArrayNode::create(
214 fbb,
215 &fba::ArrayNodeArgs {
216 encoding,
217 metadata,
218 children,
219 buffers,
220 stats,
221 },
222 )
223 }
224}
225
/// Access to the children of an array while it is being built.
pub trait ArrayChildren {
    /// Returns the child at `index`, given the `dtype` and `len` expected of it.
    ///
    /// NOTE(review): the implementation in this file decodes the child on
    /// demand with the given `dtype` and `len`; other implementations may
    /// differ — confirm before relying on this.
    fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef>;

    /// Returns the number of children.
    fn len(&self) -> usize;

    /// Returns `true` when there are no children.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
240
/// One node of a serialized array tree, together with the flatbuffer and the
/// data buffers that the node refers to.
#[derive(Clone)]
pub struct ArrayParts {
    /// Flatbuffer bytes that contain the array tree.
    flatbuffer: FlatBuffer,
    /// Location within `flatbuffer` of the `ArrayNode` table this value
    /// represents.
    flatbuffer_loc: usize,
    /// Data buffers of the serialized array; nodes refer to them by index.
    buffers: Arc<[ByteBuffer]>,
}
255
256impl Debug for ArrayParts {
257 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
258 f.debug_struct("ArrayParts")
259 .field("encoding_id", &self.encoding_id())
260 .field("children", &(0..self.nchildren()).map(|i| self.child(i)))
261 .field(
262 "buffers",
263 &(0..self.nbuffers()).map(|i| self.buffer(i).ok()),
264 )
265 .field("metadata", &self.metadata())
266 .finish()
267 }
268}
269
270impl ArrayParts {
271 pub fn decode(&self, ctx: &ArrayContext, dtype: &DType, len: usize) -> VortexResult<ArrayRef> {
273 let encoding_id = self.flatbuffer().encoding();
274 let vtable = ctx
275 .lookup_encoding(encoding_id)
276 .ok_or_else(|| vortex_err!("Unknown encoding: {}", encoding_id))?;
277
278 let buffers: Vec<_> = (0..self.nbuffers())
279 .map(|idx| self.buffer(idx))
280 .try_collect()?;
281
282 let children = ArrayPartsChildren { parts: self, ctx };
283
284 let decoded = vtable.build(dtype, len, self.metadata(), &buffers, &children)?;
285
286 assert_eq!(
287 decoded.len(),
288 len,
289 "Array decoded from {} has incorrect length {}, expected {}",
290 vtable.id(),
291 decoded.len(),
292 len
293 );
294 assert_eq!(
295 decoded.dtype(),
296 dtype,
297 "Array decoded from {} has incorrect dtype {}, expected {}",
298 vtable.id(),
299 decoded.dtype(),
300 dtype,
301 );
302 assert_eq!(
303 decoded.encoding_id(),
304 vtable.id(),
305 "Array decoded from {} has incorrect encoding {}",
306 vtable.id(),
307 decoded.encoding_id(),
308 );
309
310 if let Some(stats) = self.flatbuffer().stats() {
312 let decoded_statistics = decoded.statistics();
313 StatsSet::read_flatbuffer(&stats)?
314 .into_iter()
315 .for_each(|(stat, val)| decoded_statistics.set(stat, val));
316 }
317
318 Ok(decoded)
319 }
320
321 pub fn encoding_id(&self) -> u16 {
323 self.flatbuffer().encoding()
324 }
325
326 pub fn metadata(&self) -> &[u8] {
328 self.flatbuffer()
329 .metadata()
330 .map(|metadata| metadata.bytes())
331 .unwrap_or(&[])
332 }
333
334 pub fn nchildren(&self) -> usize {
336 self.flatbuffer()
337 .children()
338 .map_or(0, |children| children.len())
339 }
340
341 pub fn child(&self, idx: usize) -> ArrayParts {
343 let children = self
344 .flatbuffer()
345 .children()
346 .vortex_expect("Expected array to have children");
347 if idx >= children.len() {
348 vortex_panic!(
349 "Invalid child index {} for array with {} children",
350 idx,
351 children.len()
352 );
353 }
354 self.with_root(children.get(idx))
355 }
356
357 pub fn nbuffers(&self) -> usize {
359 self.flatbuffer()
360 .buffers()
361 .map_or(0, |buffers| buffers.len())
362 }
363
364 pub fn buffer(&self, idx: usize) -> VortexResult<ByteBuffer> {
366 let buffer_idx = self
367 .flatbuffer()
368 .buffers()
369 .ok_or_else(|| vortex_err!("Array has no buffers"))?
370 .get(idx);
371 self.buffers
372 .get(buffer_idx as usize)
373 .cloned()
374 .ok_or_else(|| {
375 vortex_err!(
376 "Invalid buffer index {} for array with {} buffers",
377 buffer_idx,
378 self.nbuffers()
379 )
380 })
381 }
382
383 fn flatbuffer(&self) -> fba::ArrayNode<'_> {
385 unsafe { fba::ArrayNode::follow(self.flatbuffer.as_ref(), self.flatbuffer_loc) }
386 }
387
388 fn with_root(&self, root: fba::ArrayNode) -> Self {
391 let mut this = self.clone();
392 this.flatbuffer_loc = root._tab.loc();
393 this
394 }
395}
396
/// `ArrayChildren` implementation that decodes the children of an
/// `ArrayParts` node on demand.
struct ArrayPartsChildren<'a> {
    /// The node whose children are exposed.
    parts: &'a ArrayParts,
    /// Context passed to `decode` for each child.
    ctx: &'a ArrayContext,
}
401
402impl ArrayChildren for ArrayPartsChildren<'_> {
403 fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef> {
404 self.parts.child(index).decode(self.ctx, dtype, len)
405 }
406
407 fn len(&self) -> usize {
408 self.parts.nchildren()
409 }
410}
411
412impl TryFrom<ByteBuffer> for ArrayParts {
413 type Error = VortexError;
414
415 fn try_from(value: ByteBuffer) -> Result<Self, Self::Error> {
416 if value.len() < 4 {
418 vortex_bail!("ArrayParts buffer is too short");
419 }
420
421 let value = value.aligned(Alignment::none());
423
424 let fb_length = u32::try_from_le_bytes(&value.as_slice()[value.len() - 4..])? as usize;
425 if value.len() < 4 + fb_length {
426 vortex_bail!("ArrayParts buffer is too short for flatbuffer");
427 }
428
429 let fb_offset = value.len() - 4 - fb_length;
430 let fb_buffer = value.slice(fb_offset..fb_offset + fb_length);
431 let fb_buffer = FlatBuffer::align_from(fb_buffer);
432
433 let fb_array = root::<fba::Array>(fb_buffer.as_ref())?;
434 let fb_root = fb_array.root().vortex_expect("Array must have a root node");
435
436 let mut offset = 0;
437 let buffers: Arc<[ByteBuffer]> = fb_array
438 .buffers()
439 .unwrap_or_default()
440 .iter()
441 .map(|fb_buffer| {
442 offset += fb_buffer.padding() as usize;
444
445 let buffer_len = fb_buffer.length() as usize;
446
447 let buffer = value
449 .slice(offset..(offset + buffer_len))
450 .aligned(Alignment::from_exponent(fb_buffer.alignment_exponent()));
451
452 offset += buffer_len;
453 buffer
454 })
455 .collect();
456
457 Ok(ArrayParts {
458 flatbuffer: fb_buffer.clone(),
459 flatbuffer_loc: fb_root._tab.loc(),
460 buffers,
461 })
462 }
463}