1use std::fmt::{Debug, Formatter};
2use std::iter;
3use std::sync::Arc;
4
5use flatbuffers::{FlatBufferBuilder, Follow, WIPOffset, root};
6use itertools::Itertools;
7use vortex_buffer::{Alignment, ByteBuffer};
8use vortex_dtype::{DType, TryFromBytes};
9use vortex_error::{
10 VortexError, VortexExpect, VortexResult, vortex_bail, vortex_err, vortex_panic,
11};
12use vortex_flatbuffers::array::Compression;
13use vortex_flatbuffers::{
14 FlatBuffer, FlatBufferRoot, ReadFlatBuffer, WriteFlatBuffer, array as fba,
15};
16
17use crate::stats::StatsSet;
18use crate::{Array, ArrayContext, ArrayRef, ArrayVisitor, ArrayVisitorExt};
19
/// Options controlling how an array is laid out by `serialize`.
#[derive(Debug, Default)]
pub struct SerializeOptions {
    /// Byte position at which the serialized output is assumed to start. Padding is computed
    /// relative to this position rather than relative to zero.
    pub offset: usize,
    /// When `true`, zero padding is inserted so that each data buffer, and the trailing
    /// flatbuffer, starts at a multiple of its alignment.
    pub include_padding: bool,
}
29
30impl dyn Array + '_ {
31 pub fn serialize(&self, ctx: &ArrayContext, options: &SerializeOptions) -> Vec<ByteBuffer> {
42 let mut array_buffers = vec![];
44 for a in self.depth_first_traversal() {
45 for buffer in a.buffers() {
46 array_buffers.push(buffer);
47 }
48 }
49
50 let mut buffers = vec![];
52 let mut fb_buffers = Vec::with_capacity(buffers.capacity());
53
54 let max_alignment = array_buffers
56 .iter()
57 .map(|buf| buf.alignment())
58 .chain(iter::once(FlatBuffer::alignment()))
59 .max()
60 .unwrap_or_else(FlatBuffer::alignment);
61
62 let zeros = ByteBuffer::zeroed(*max_alignment);
64
65 buffers.push(ByteBuffer::zeroed_aligned(0, max_alignment));
68
69 let mut pos = options.offset;
71
72 for buffer in array_buffers {
74 let padding = if options.include_padding {
75 let padding = pos.next_multiple_of(*buffer.alignment()) - pos;
76 if padding > 0 {
77 pos += padding;
78 buffers.push(zeros.slice(0..padding));
79 }
80 padding
81 } else {
82 0
83 };
84
85 fb_buffers.push(fba::Buffer::new(
86 u16::try_from(padding).vortex_expect("padding fits into u16"),
87 buffer.alignment().exponent(),
88 Compression::None,
89 u32::try_from(buffer.len()).vortex_expect("buffers fit into u32"),
90 ));
91
92 pos += buffer.len();
93 buffers.push(buffer.aligned(Alignment::none()));
94 }
95
96 let mut fbb = FlatBufferBuilder::new();
98 let root = ArrayNodeFlatBuffer::new(ctx, self);
99 let fb_root = root.write_flatbuffer(&mut fbb);
100 let fb_buffers = fbb.create_vector(&fb_buffers);
101 let fb_array = fba::Array::create(
102 &mut fbb,
103 &fba::ArrayArgs {
104 root: Some(fb_root),
105 buffers: Some(fb_buffers),
106 },
107 );
108 fbb.finish_minimal(fb_array);
109 let (fb_vec, fb_start) = fbb.collapse();
110 let fb_end = fb_vec.len();
111 let fb_buffer = ByteBuffer::from(fb_vec).slice(fb_start..fb_end);
112 let fb_length = fb_buffer.len();
113
114 if options.include_padding {
115 let padding = pos.next_multiple_of(*FlatBuffer::alignment()) - pos;
116 if padding > 0 {
117 buffers.push(zeros.slice(0..padding));
118 }
119 }
120 buffers.push(fb_buffer);
121
122 buffers.push(ByteBuffer::from(
124 u32::try_from(fb_length)
125 .vortex_expect("u32 fits into usize")
126 .to_le_bytes()
127 .to_vec(),
128 ));
129
130 buffers
131 }
132}
133
134pub struct ArrayNodeFlatBuffer<'a> {
136 ctx: &'a ArrayContext,
137 array: &'a dyn Array,
138 buffer_idx: u16,
139}
140
141impl<'a> ArrayNodeFlatBuffer<'a> {
142 pub fn new(ctx: &'a ArrayContext, array: &'a dyn Array) -> Self {
143 Self {
144 ctx,
145 array,
146 buffer_idx: 0,
147 }
148 }
149}
150
151impl FlatBufferRoot for ArrayNodeFlatBuffer<'_> {}
152
153impl WriteFlatBuffer for ArrayNodeFlatBuffer<'_> {
154 type Target<'t> = fba::ArrayNode<'t>;
155
156 fn write_flatbuffer<'fb>(
157 &self,
158 fbb: &mut FlatBufferBuilder<'fb>,
159 ) -> WIPOffset<Self::Target<'fb>> {
160 let encoding = self.ctx.encoding_idx(&self.array.vtable());
161 let metadata = self
162 .array
163 .metadata()
164 .map(|bytes| fbb.create_vector(bytes.as_slice()));
165
166 let nbuffers = u16::try_from(self.array.nbuffers())
168 .vortex_expect("Array can have at most u16::MAX buffers");
169 let child_buffer_idx = self.buffer_idx + nbuffers;
170
171 let children = self
172 .array
173 .children()
174 .iter()
175 .scan(child_buffer_idx, |buffer_idx, child| {
176 let msg = ArrayNodeFlatBuffer {
178 ctx: self.ctx,
179 array: child,
180 buffer_idx: *buffer_idx,
181 }
182 .write_flatbuffer(fbb);
183 *buffer_idx = u16::try_from(child.nbuffers_recursive())
184 .ok()
185 .and_then(|nbuffers| nbuffers.checked_add(*buffer_idx))
186 .vortex_expect("Too many buffers (u16) for Array");
187 Some(msg)
188 })
189 .collect_vec();
190 let children = Some(fbb.create_vector(&children));
191
192 let buffers = Some(fbb.create_vector_from_iter((0..nbuffers).map(|i| i + self.buffer_idx)));
193 let stats = Some(self.array.statistics().to_owned().write_flatbuffer(fbb));
194
195 fba::ArrayNode::create(
196 fbb,
197 &fba::ArrayNodeArgs {
198 encoding,
199 metadata,
200 children,
201 buffers,
202 stats,
203 },
204 )
205 }
206}
207
208#[derive(Clone)]
214pub struct ArrayParts {
215 flatbuffer: FlatBuffer,
217 flatbuffer_loc: usize,
219 buffers: Arc<[ByteBuffer]>,
220}
221
222impl Debug for ArrayParts {
223 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
224 f.debug_struct("ArrayParts")
225 .field("encoding_id", &self.encoding_id())
226 .field("children", &(0..self.nchildren()).map(|i| self.child(i)))
227 .field(
228 "buffers",
229 &(0..self.nbuffers()).map(|i| self.buffer(i).ok()),
230 )
231 .field("metadata", &self.metadata())
232 .finish()
233 }
234}
235
236impl ArrayParts {
237 pub fn decode(&self, ctx: &ArrayContext, dtype: DType, len: usize) -> VortexResult<ArrayRef> {
239 let encoding_id = self.flatbuffer().encoding();
240 let vtable = ctx
241 .lookup_encoding(encoding_id)
242 .ok_or_else(|| vortex_err!("Unknown encoding: {}", encoding_id))?;
243 let decoded = vtable.decode(self, ctx, dtype, len)?;
244 assert_eq!(
245 decoded.len(),
246 len,
247 "Array decoded from {} has incorrect length {}, expected {}",
248 vtable.id(),
249 decoded.len(),
250 len
251 );
252 assert_eq!(
253 decoded.encoding(),
254 vtable.id(),
255 "Array decoded from {} has incorrect encoding {}",
256 vtable.id(),
257 decoded.encoding(),
258 );
259
260 if let Some(stats) = self.flatbuffer().stats() {
262 let decoded_statistics = decoded.statistics();
263 StatsSet::read_flatbuffer(&stats)?
264 .into_iter()
265 .for_each(|(stat, val)| decoded_statistics.set(stat, val));
266 }
267
268 Ok(decoded)
269 }
270
271 pub fn encoding_id(&self) -> u16 {
273 self.flatbuffer().encoding()
274 }
275
276 pub fn metadata(&self) -> Option<&[u8]> {
278 self.flatbuffer()
279 .metadata()
280 .map(|metadata| metadata.bytes())
281 }
282
283 pub fn nchildren(&self) -> usize {
285 self.flatbuffer()
286 .children()
287 .map_or(0, |children| children.len())
288 }
289
290 pub fn child(&self, idx: usize) -> ArrayParts {
292 let children = self
293 .flatbuffer()
294 .children()
295 .vortex_expect("Expected array to have children");
296 if idx >= children.len() {
297 vortex_panic!(
298 "Invalid child index {} for array with {} children",
299 idx,
300 children.len()
301 );
302 }
303 self.with_root(children.get(idx))
304 }
305
306 pub fn children(&self) -> Vec<ArrayParts> {
308 self.flatbuffer()
309 .children()
310 .iter()
311 .flat_map(|children| children.iter())
312 .map(move |child| self.with_root(child))
313 .collect()
314 }
315
316 pub fn nbuffers(&self) -> usize {
318 self.flatbuffer()
319 .buffers()
320 .map_or(0, |buffers| buffers.len())
321 }
322
323 pub fn buffer(&self, idx: usize) -> VortexResult<ByteBuffer> {
325 let buffer_idx = self
326 .flatbuffer()
327 .buffers()
328 .ok_or_else(|| vortex_err!("Array has no buffers"))?
329 .get(idx);
330 self.buffers
331 .get(buffer_idx as usize)
332 .cloned()
333 .ok_or_else(|| {
334 vortex_err!(
335 "Invalid buffer index {} for array with {} buffers",
336 buffer_idx,
337 self.nbuffers()
338 )
339 })
340 }
341
342 fn flatbuffer(&self) -> fba::ArrayNode {
344 unsafe { fba::ArrayNode::follow(self.flatbuffer.as_ref(), self.flatbuffer_loc) }
345 }
346
347 fn with_root(&self, root: fba::ArrayNode) -> Self {
350 let mut this = self.clone();
351 this.flatbuffer_loc = root._tab.loc();
352 this
353 }
354}
355
356impl TryFrom<ByteBuffer> for ArrayParts {
357 type Error = VortexError;
358
359 fn try_from(value: ByteBuffer) -> Result<Self, Self::Error> {
360 if value.len() < 4 {
362 vortex_bail!("ArrayParts buffer is too short");
363 }
364
365 let value = value.aligned(Alignment::none());
367
368 let fb_length = u32::try_from_le_bytes(&value.as_slice()[value.len() - 4..])? as usize;
369 let fb_offset = value.len() - 4 - fb_length;
370 let fb_buffer = value.slice(fb_offset..fb_offset + fb_length);
371 let fb_buffer = FlatBuffer::align_from(fb_buffer);
372
373 let fb_array = root::<fba::Array>(fb_buffer.as_ref())?;
374 let fb_root = fb_array.root().vortex_expect("Array must have a root node");
375
376 let mut offset = 0;
377 let buffers: Arc<[ByteBuffer]> = fb_array
378 .buffers()
379 .unwrap_or_default()
380 .iter()
381 .map(|fb_buffer| {
382 offset += fb_buffer.padding() as usize;
384
385 let buffer_len = fb_buffer.length() as usize;
386
387 let buffer = value
389 .slice(offset..(offset + buffer_len))
390 .aligned(Alignment::from_exponent(fb_buffer.alignment_exponent()));
391
392 offset += buffer_len;
393 buffer
394 })
395 .collect();
396
397 Ok(ArrayParts {
398 flatbuffer: fb_buffer.clone(),
399 flatbuffer_loc: fb_root._tab.loc(),
400 buffers,
401 })
402 }
403}