1use std::fmt::{Debug, Formatter};
2use std::iter;
3use std::sync::Arc;
4
5use flatbuffers::{FlatBufferBuilder, Follow, WIPOffset, root};
6use itertools::Itertools;
7use vortex_buffer::{Alignment, ByteBuffer};
8use vortex_dtype::{DType, TryFromBytes};
9use vortex_error::{
10 VortexError, VortexExpect, VortexResult, vortex_bail, vortex_err, vortex_panic,
11};
12use vortex_flatbuffers::array::Compression;
13use vortex_flatbuffers::{
14 FlatBuffer, FlatBufferRoot, ReadFlatBuffer, WriteFlatBuffer, array as fba,
15};
16
17use crate::stats::StatsSet;
18use crate::{Array, ArrayContext, ArrayRef, ArrayVisitor, ArrayVisitorExt};
19
/// Options controlling how an array tree is serialized into a byte stream.
#[derive(Default, Debug)]
pub struct SerializeOptions {
    /// Byte offset within the enclosing stream at which the serialized output begins.
    /// Used as the starting position when computing alignment padding.
    pub offset: usize,
    /// If true, zero-filled padding buffers are inserted so that each data buffer
    /// (and the trailing flatbuffer) starts at its required alignment.
    pub include_padding: bool,
}
29
30impl dyn Array + '_ {
31 pub fn serialize(
42 &self,
43 ctx: &ArrayContext,
44 options: &SerializeOptions,
45 ) -> VortexResult<Vec<ByteBuffer>> {
46 let array_buffers = self
48 .depth_first_traversal()
49 .flat_map(|f| f.buffers())
50 .collect::<Vec<_>>();
51
52 let mut buffers = vec![];
54 let mut fb_buffers = Vec::with_capacity(buffers.capacity());
55
56 let max_alignment = array_buffers
58 .iter()
59 .map(|buf| buf.alignment())
60 .chain(iter::once(FlatBuffer::alignment()))
61 .max()
62 .unwrap_or_else(FlatBuffer::alignment);
63
64 let zeros = ByteBuffer::zeroed(*max_alignment);
66
67 buffers.push(ByteBuffer::zeroed_aligned(0, max_alignment));
70
71 let mut pos = options.offset;
73
74 for buffer in array_buffers {
76 let padding = if options.include_padding {
77 let padding = pos.next_multiple_of(*buffer.alignment()) - pos;
78 if padding > 0 {
79 pos += padding;
80 buffers.push(zeros.slice(0..padding));
81 }
82 padding
83 } else {
84 0
85 };
86
87 fb_buffers.push(fba::Buffer::new(
88 u16::try_from(padding).vortex_expect("padding fits into u16"),
89 buffer.alignment().exponent(),
90 Compression::None,
91 u32::try_from(buffer.len())
92 .map_err(|_| vortex_err!("All buffers must fit into u32 for serialization"))?,
93 ));
94
95 pos += buffer.len();
96 buffers.push(buffer.aligned(Alignment::none()));
97 }
98
99 let mut fbb = FlatBufferBuilder::new();
101 let root = ArrayNodeFlatBuffer::try_new(ctx, self)?;
102 let fb_root = root.write_flatbuffer(&mut fbb);
103 let fb_buffers = fbb.create_vector(&fb_buffers);
104 let fb_array = fba::Array::create(
105 &mut fbb,
106 &fba::ArrayArgs {
107 root: Some(fb_root),
108 buffers: Some(fb_buffers),
109 },
110 );
111 fbb.finish_minimal(fb_array);
112 let (fb_vec, fb_start) = fbb.collapse();
113 let fb_end = fb_vec.len();
114 let fb_buffer = ByteBuffer::from(fb_vec).slice(fb_start..fb_end);
115 let fb_length = fb_buffer.len();
116
117 if options.include_padding {
118 let padding = pos.next_multiple_of(*FlatBuffer::alignment()) - pos;
119 if padding > 0 {
120 buffers.push(zeros.slice(0..padding));
121 }
122 }
123 buffers.push(fb_buffer);
124
125 buffers.push(ByteBuffer::from(
127 u32::try_from(fb_length)
128 .map_err(|_| vortex_err!("Array metadata flatbuffer must fit into u32 for serialization. Array encoding tree is too large."))?
129 .to_le_bytes()
130 .to_vec(),
131 ));
132
133 Ok(buffers)
134 }
135}
136
/// Writer that encodes a `dyn Array` tree as an `fba::ArrayNode` flatbuffer.
pub struct ArrayNodeFlatBuffer<'a> {
    // Context used to resolve each array's encoding to an index.
    ctx: &'a ArrayContext,
    // The root array of the (sub)tree being written.
    array: &'a dyn Array,
    // Index of this node's first buffer within the flattened, depth-first buffer list
    // produced during serialization.
    buffer_idx: u16,
}
143
144impl<'a> ArrayNodeFlatBuffer<'a> {
145 pub fn try_new(ctx: &'a ArrayContext, array: &'a dyn Array) -> VortexResult<Self> {
146 for child in array.depth_first_traversal() {
148 if child.metadata()?.is_none() {
149 vortex_bail!(
150 "Array {} does not support serialization",
151 child.encoding_id()
152 );
153 }
154 }
155 Ok(Self {
156 ctx,
157 array,
158 buffer_idx: 0,
159 })
160 }
161}
162
// Marker impl: an ArrayNodeFlatBuffer may be written as the root of a flatbuffer message.
impl FlatBufferRoot for ArrayNodeFlatBuffer<'_> {}
164
impl WriteFlatBuffer for ArrayNodeFlatBuffer<'_> {
    type Target<'t> = fba::ArrayNode<'t>;

    /// Recursively write this node and its children into `fbb`, assigning each node a
    /// contiguous range of buffer indices in depth-first order.
    fn write_flatbuffer<'fb>(
        &self,
        fbb: &mut FlatBufferBuilder<'fb>,
    ) -> WIPOffset<Self::Target<'fb>> {
        let encoding = self.ctx.encoding_idx(&self.array.encoding());
        // Safe to expect: try_new already validated that every node has metadata.
        let metadata = self
            .array
            .metadata()
            .vortex_expect("Failed to serialize metadata")
            .vortex_expect("Validated that all arrays support serialization");
        let metadata = Some(fbb.create_vector(metadata.as_slice()));

        // Positive case: we have a buffer.
        let nbuffers = u16::try_from(self.array.nbuffers())
            .vortex_expect("Array can have at most u16::MAX buffers");
        // This node's own buffers occupy [buffer_idx, buffer_idx + nbuffers); children's
        // buffers follow immediately after.
        // NOTE(review): this addition is unchecked (unlike the checked_add below) and
        // could overflow u16 — confirm total buffer count is bounded upstream.
        let mut child_buffer_idx = self.buffer_idx + nbuffers;

        let children = &self
            .array
            .children()
            .iter()
            .map(|child| {
                let msg = ArrayNodeFlatBuffer {
                    ctx: self.ctx,
                    array: child,
                    buffer_idx: child_buffer_idx,
                }
                .write_flatbuffer(fbb);
                // Advance past all buffers in the child's subtree so the next sibling
                // starts at the correct index.
                child_buffer_idx = u16::try_from(child.nbuffers_recursive())
                    .ok()
                    .and_then(|nbuffers| nbuffers.checked_add(child_buffer_idx))
                    .vortex_expect("Too many buffers (u16) for Array");
                msg
            })
            .collect::<Vec<_>>();
        let children = Some(fbb.create_vector(children));

        // This node's buffer indices into the global depth-first buffer list.
        let buffers = Some(fbb.create_vector_from_iter((0..nbuffers).map(|i| i + self.buffer_idx)));
        // Persist statistics so they survive a serialize/deserialize round-trip.
        let stats = Some(self.array.statistics().to_owned().write_flatbuffer(fbb));

        fba::ArrayNode::create(
            fbb,
            &fba::ArrayNodeArgs {
                encoding,
                metadata,
                children,
                buffers,
                stats,
            },
        )
    }
}
222
/// Lazy accessor for an array's children during decoding: children are materialized
/// on demand with the dtype and length the parent expects.
pub trait ArrayChildren {
    /// Return the child at `index`, decoded with the given `dtype` and `len`.
    fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef>;

    /// Number of children available.
    fn len(&self) -> usize;

    /// True if there are no children.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
239
/// A deserialized-but-not-yet-decoded array: the flatbuffer tree description plus the
/// raw data buffers. Cheap to clone (buffers are behind an `Arc`).
#[derive(Clone)]
pub struct ArrayParts {
    // The flatbuffer containing the ArrayNode tree.
    flatbuffer: FlatBuffer,
    // Byte location of this node's ArrayNode table within `flatbuffer`; child views
    // share the same flatbuffer and differ only in this location.
    flatbuffer_loc: usize,
    // All data buffers for the whole tree, indexed by the flatbuffer's buffer table.
    buffers: Arc<[ByteBuffer]>,
}
254
255impl Debug for ArrayParts {
256 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
257 f.debug_struct("ArrayParts")
258 .field("encoding_id", &self.encoding_id())
259 .field("children", &(0..self.nchildren()).map(|i| self.child(i)))
260 .field(
261 "buffers",
262 &(0..self.nbuffers()).map(|i| self.buffer(i).ok()),
263 )
264 .field("metadata", &self.metadata())
265 .finish()
266 }
267}
268
impl ArrayParts {
    /// Decode this node into a concrete array using the encodings registered in `ctx`.
    ///
    /// The caller supplies the expected `dtype` and `len`; the decoded array is
    /// asserted to match both, as well as the resolved encoding.
    ///
    /// # Errors
    /// Fails if the encoding is unknown, a buffer index is invalid, or the encoding's
    /// `build` rejects the parts.
    ///
    /// # Panics
    /// Panics (via `assert_eq!`) if the decoded array disagrees with the expected
    /// length, dtype, or encoding — these indicate a buggy encoding implementation.
    pub fn decode(&self, ctx: &ArrayContext, dtype: &DType, len: usize) -> VortexResult<ArrayRef> {
        let encoding_id = self.flatbuffer().encoding();
        let vtable = ctx
            .lookup_encoding(encoding_id)
            .ok_or_else(|| vortex_err!("Unknown encoding: {}", encoding_id))?;

        // Resolve this node's buffers eagerly; children are decoded lazily below.
        let buffers: Vec<_> = (0..self.nbuffers())
            .map(|idx| self.buffer(idx))
            .try_collect()?;

        let children = ArrayPartsChildren { parts: self, ctx };

        let decoded = vtable.build(dtype, len, self.metadata(), &buffers, &children)?;

        assert_eq!(
            decoded.len(),
            len,
            "Array decoded from {} has incorrect length {}, expected {}",
            vtable.id(),
            decoded.len(),
            len
        );
        assert_eq!(
            decoded.dtype(),
            dtype,
            "Array decoded from {} has incorrect dtype {}, expected {}",
            vtable.id(),
            decoded.dtype(),
            dtype,
        );
        assert_eq!(
            decoded.encoding_id(),
            vtable.id(),
            "Array decoded from {} has incorrect encoding {}",
            vtable.id(),
            decoded.encoding_id(),
        );

        // Restore any statistics that were serialized alongside the tree.
        if let Some(stats) = self.flatbuffer().stats() {
            let decoded_statistics = decoded.statistics();
            StatsSet::read_flatbuffer(&stats)?
                .into_iter()
                .for_each(|(stat, val)| decoded_statistics.set(stat, val));
        }

        Ok(decoded)
    }

    /// The encoding id stored in the flatbuffer for this node.
    pub fn encoding_id(&self) -> u16 {
        self.flatbuffer().encoding()
    }

    /// The node's opaque metadata bytes; empty slice if none were written.
    pub fn metadata(&self) -> &[u8] {
        self.flatbuffer()
            .metadata()
            .map(|metadata| metadata.bytes())
            .unwrap_or(&[])
    }

    /// Number of children recorded for this node.
    pub fn nchildren(&self) -> usize {
        self.flatbuffer()
            .children()
            .map_or(0, |children| children.len())
    }

    /// A view over the `idx`-th child, sharing this node's flatbuffer and buffers.
    ///
    /// # Panics
    /// Panics if the node has no children or `idx` is out of range.
    pub fn child(&self, idx: usize) -> ArrayParts {
        let children = self
            .flatbuffer()
            .children()
            .vortex_expect("Expected array to have children");
        if idx >= children.len() {
            vortex_panic!(
                "Invalid child index {} for array with {} children",
                idx,
                children.len()
            );
        }
        self.with_root(children.get(idx))
    }

    /// Number of buffers referenced by this node (not including children's buffers).
    pub fn nbuffers(&self) -> usize {
        self.flatbuffer()
            .buffers()
            .map_or(0, |buffers| buffers.len())
    }

    /// Fetch the `idx`-th buffer of this node by resolving the flatbuffer's buffer
    /// table entry into the shared buffer list.
    ///
    /// # Errors
    /// Fails if the node has no buffer table or the resolved index is out of range.
    pub fn buffer(&self, idx: usize) -> VortexResult<ByteBuffer> {
        let buffer_idx = self
            .flatbuffer()
            .buffers()
            .ok_or_else(|| vortex_err!("Array has no buffers"))?
            .get(idx);
        self.buffers
            .get(buffer_idx as usize)
            // ByteBuffer clone is cheap (shared underlying storage is the convention
            // here — the whole struct is designed around Arc'd buffers).
            .cloned()
            .ok_or_else(|| {
                vortex_err!(
                    "Invalid buffer index {} for array with {} buffers",
                    buffer_idx,
                    self.nbuffers()
                )
            })
    }

    /// View this node's ArrayNode table within the shared flatbuffer.
    fn flatbuffer(&self) -> fba::ArrayNode<'_> {
        // SAFETY: `flatbuffer_loc` is either the verified root location (set in
        // `TryFrom<ByteBuffer>`, after `root::<fba::Array>` validated the buffer) or a
        // child table location obtained from a valid ArrayNode via `with_root`.
        unsafe { fba::ArrayNode::follow(self.flatbuffer.as_ref(), self.flatbuffer_loc) }
    }

    /// Clone this view, re-rooted at a different ArrayNode table in the same flatbuffer.
    fn with_root(&self, root: fba::ArrayNode) -> Self {
        let mut this = self.clone();
        this.flatbuffer_loc = root._tab.loc();
        this
    }
}
395
/// Adapter exposing an `ArrayParts` node's children through the `ArrayChildren` trait,
/// so encodings can decode children lazily during `ArrayParts::decode`.
struct ArrayPartsChildren<'a> {
    parts: &'a ArrayParts,
    ctx: &'a ArrayContext,
}
400
401impl ArrayChildren for ArrayPartsChildren<'_> {
402 fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef> {
403 self.parts.child(index).decode(self.ctx, dtype, len)
404 }
405
406 fn len(&self) -> usize {
407 self.parts.nchildren()
408 }
409}
410
impl TryFrom<ByteBuffer> for ArrayParts {
    type Error = VortexError;

    /// Parse a serialized array stream (as produced by `Array::serialize`) back into
    /// its parts: the trailing u32 gives the flatbuffer length, the flatbuffer sits
    /// just before it, and the data buffers occupy the front of the stream.
    fn try_from(value: ByteBuffer) -> Result<Self, Self::Error> {
        // Need at least the trailing 4-byte length field.
        if value.len() < 4 {
            vortex_bail!("ArrayParts buffer is too short");
        }

        // Drop the alignment tag; slices below re-tag each buffer individually.
        let value = value.aligned(Alignment::none());

        // Trailing little-endian u32 holds the flatbuffer's length.
        let fb_length = u32::try_from_le_bytes(&value.as_slice()[value.len() - 4..])? as usize;
        if value.len() < 4 + fb_length {
            vortex_bail!("ArrayParts buffer is too short for flatbuffer");
        }

        // Flatbuffer occupies the bytes immediately before the length field.
        let fb_offset = value.len() - 4 - fb_length;
        let fb_buffer = value.slice(fb_offset..fb_offset + fb_length);
        let fb_buffer = FlatBuffer::align_from(fb_buffer);

        // `root` validates the flatbuffer; this underpins the SAFETY of later
        // unchecked `follow` calls.
        let fb_array = root::<fba::Array>(fb_buffer.as_ref())?;
        let fb_root = fb_array.root().vortex_expect("Array must have a root node");

        // Walk the buffer table, slicing each data buffer out of the front of the
        // stream. `offset` tracks the current position; each entry records how much
        // padding precedes it and how long it is.
        let mut offset = 0;
        let buffers: Arc<[ByteBuffer]> = fb_array
            .buffers()
            .unwrap_or_default()
            .iter()
            .map(|fb_buffer| {
                // Skip the alignment padding written before this buffer.
                offset += fb_buffer.padding() as usize;

                let buffer_len = fb_buffer.length() as usize;

                // Re-tag the slice with the alignment recorded at serialization time.
                let buffer = value
                    .slice(offset..(offset + buffer_len))
                    .aligned(Alignment::from_exponent(fb_buffer.alignment_exponent()));

                offset += buffer_len;
                buffer
            })
            .collect();

        Ok(ArrayParts {
            flatbuffer: fb_buffer.clone(),
            flatbuffer_loc: fb_root._tab.loc(),
            buffers,
        })
    }
}