1use std::fmt::{Debug, Formatter};
2use std::iter;
3use std::sync::Arc;
4
5use flatbuffers::{FlatBufferBuilder, Follow, WIPOffset, root};
6use itertools::Itertools;
7use vortex_buffer::{Alignment, ByteBuffer};
8use vortex_dtype::{DType, TryFromBytes};
9use vortex_error::{
10 VortexError, VortexExpect, VortexResult, vortex_bail, vortex_err, vortex_panic,
11};
12use vortex_flatbuffers::array::Compression;
13use vortex_flatbuffers::{
14 FlatBuffer, FlatBufferRoot, ReadFlatBuffer, WriteFlatBuffer, array as fba,
15};
16
17use crate::stats::StatsSet;
18use crate::{Array, ArrayContext, ArrayRef, ArrayVisitor, ArrayVisitorExt};
19
/// Options controlling the byte layout produced by `serialize`.
#[derive(Debug, Default)]
pub struct SerializeOptions {
    /// Stream position at which the first serialized byte will be written.
    ///
    /// Padding is computed relative to this position, so buffers end up aligned in the
    /// destination stream rather than merely within the serialized message.
    pub offset: usize,
    /// When `true`, zero-filled padding buffers are inserted so that every data buffer and the
    /// trailing flatbuffer start at a multiple of their required alignment.
    pub include_padding: bool,
}
29
30impl dyn Array + '_ {
31 pub fn serialize(&self, ctx: &ArrayContext, options: &SerializeOptions) -> Vec<ByteBuffer> {
42 let mut array_buffers = vec![];
44 for a in self.depth_first_traversal() {
45 for buffer in a.buffers() {
46 array_buffers.push(buffer);
47 }
48 }
49
50 let mut buffers = vec![];
52 let mut fb_buffers = Vec::with_capacity(buffers.capacity());
53
54 let max_alignment = array_buffers
56 .iter()
57 .map(|buf| buf.alignment())
58 .chain(iter::once(FlatBuffer::alignment()))
59 .max()
60 .unwrap_or_else(FlatBuffer::alignment);
61
62 let zeros = ByteBuffer::zeroed(*max_alignment);
64
65 buffers.push(ByteBuffer::zeroed_aligned(0, max_alignment));
68
69 let mut pos = options.offset;
71
72 for buffer in array_buffers {
74 let padding = if options.include_padding {
75 let padding = pos.next_multiple_of(*buffer.alignment()) - pos;
76 if padding > 0 {
77 pos += padding;
78 buffers.push(zeros.slice(0..padding));
79 }
80 padding
81 } else {
82 0
83 };
84
85 fb_buffers.push(fba::Buffer::new(
86 u16::try_from(padding).vortex_expect("padding fits into u16"),
87 buffer.alignment().exponent(),
88 Compression::None,
89 u32::try_from(buffer.len()).vortex_expect("buffers fit into u32"),
90 ));
91
92 pos += buffer.len();
93 buffers.push(buffer.aligned(Alignment::none()));
94 }
95
96 let mut fbb = FlatBufferBuilder::new();
98 let root = ArrayNodeFlatBuffer::new(ctx, self);
99 let fb_root = root.write_flatbuffer(&mut fbb);
100 let fb_buffers = fbb.create_vector(&fb_buffers);
101 let fb_array = fba::Array::create(
102 &mut fbb,
103 &fba::ArrayArgs {
104 root: Some(fb_root),
105 buffers: Some(fb_buffers),
106 },
107 );
108 fbb.finish_minimal(fb_array);
109 let (fb_vec, fb_start) = fbb.collapse();
110 let fb_end = fb_vec.len();
111 let fb_buffer = ByteBuffer::from(fb_vec).slice(fb_start..fb_end);
112 let fb_length = fb_buffer.len();
113
114 if options.include_padding {
115 let padding = pos.next_multiple_of(*FlatBuffer::alignment()) - pos;
116 if padding > 0 {
117 buffers.push(zeros.slice(0..padding));
118 }
119 }
120 buffers.push(fb_buffer);
121
122 buffers.push(ByteBuffer::from(
124 u32::try_from(fb_length)
125 .vortex_expect("u32 fits into usize")
126 .to_le_bytes()
127 .to_vec(),
128 ));
129
130 buffers
131 }
132}
133
134pub struct ArrayNodeFlatBuffer<'a> {
136 ctx: &'a ArrayContext,
137 array: &'a dyn Array,
138 buffer_idx: u16,
139}
140
141impl<'a> ArrayNodeFlatBuffer<'a> {
142 pub fn new(ctx: &'a ArrayContext, array: &'a dyn Array) -> Self {
143 Self {
144 ctx,
145 array,
146 buffer_idx: 0,
147 }
148 }
149}
150
151impl FlatBufferRoot for ArrayNodeFlatBuffer<'_> {}
152
153impl WriteFlatBuffer for ArrayNodeFlatBuffer<'_> {
154 type Target<'t> = fba::ArrayNode<'t>;
155
156 fn write_flatbuffer<'fb>(
157 &self,
158 fbb: &mut FlatBufferBuilder<'fb>,
159 ) -> WIPOffset<Self::Target<'fb>> {
160 let encoding = self.ctx.encoding_idx(&self.array.vtable());
161 let metadata = self
162 .array
163 .metadata()
164 .map(|bytes| fbb.create_vector(bytes.as_slice()));
165
166 let nbuffers = u16::try_from(self.array.nbuffers())
168 .vortex_expect("Array can have at most u16::MAX buffers");
169 let child_buffer_idx = self.buffer_idx + nbuffers;
170
171 let children = self
172 .array
173 .children()
174 .iter()
175 .scan(child_buffer_idx, |buffer_idx, child| {
176 let msg = ArrayNodeFlatBuffer {
178 ctx: self.ctx,
179 array: child,
180 buffer_idx: *buffer_idx,
181 }
182 .write_flatbuffer(fbb);
183 *buffer_idx = u16::try_from(child.nbuffers_recursive())
184 .ok()
185 .and_then(|nbuffers| nbuffers.checked_add(*buffer_idx))
186 .vortex_expect("Too many buffers (u16) for Array");
187 Some(msg)
188 })
189 .collect_vec();
190 let children = Some(fbb.create_vector(&children));
191
192 let buffers = Some(fbb.create_vector_from_iter((0..nbuffers).map(|i| i + self.buffer_idx)));
193 let stats = Some(self.array.statistics().to_owned().write_flatbuffer(fbb));
194
195 fba::ArrayNode::create(
196 fbb,
197 &fba::ArrayNodeArgs {
198 encoding,
199 metadata,
200 children,
201 buffers,
202 stats,
203 },
204 )
205 }
206}
207
208#[derive(Clone)]
215pub struct ArrayParts {
216 flatbuffer: FlatBuffer,
218 flatbuffer_loc: usize,
220 buffers: Arc<[ByteBuffer]>,
221}
222
223impl Debug for ArrayParts {
224 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
225 f.debug_struct("ArrayParts")
226 .field("encoding_id", &self.encoding_id())
227 .field("children", &(0..self.nchildren()).map(|i| self.child(i)))
228 .field(
229 "buffers",
230 &(0..self.nbuffers()).map(|i| self.buffer(i).ok()),
231 )
232 .field("metadata", &self.metadata())
233 .finish()
234 }
235}
236
237impl ArrayParts {
238 pub fn decode(&self, ctx: &ArrayContext, dtype: DType, len: usize) -> VortexResult<ArrayRef> {
240 let encoding_id = self.flatbuffer().encoding();
241 let vtable = ctx
242 .lookup_encoding(encoding_id)
243 .ok_or_else(|| vortex_err!("Unknown encoding: {}", encoding_id))?;
244 let decoded = vtable.decode(self, ctx, dtype, len)?;
245 assert_eq!(
246 decoded.len(),
247 len,
248 "Array decoded from {} has incorrect length {}, expected {}",
249 vtable.id(),
250 decoded.len(),
251 len
252 );
253 assert_eq!(
254 decoded.encoding(),
255 vtable.id(),
256 "Array decoded from {} has incorrect encoding {}",
257 vtable.id(),
258 decoded.encoding(),
259 );
260
261 if let Some(stats) = self.flatbuffer().stats() {
263 let decoded_statistics = decoded.statistics();
264 StatsSet::read_flatbuffer(&stats)?
265 .into_iter()
266 .for_each(|(stat, val)| decoded_statistics.set(stat, val));
267 }
268
269 Ok(decoded)
270 }
271
272 pub fn encoding_id(&self) -> u16 {
274 self.flatbuffer().encoding()
275 }
276
277 pub fn metadata(&self) -> Option<&[u8]> {
279 self.flatbuffer()
280 .metadata()
281 .map(|metadata| metadata.bytes())
282 }
283
284 pub fn nchildren(&self) -> usize {
286 self.flatbuffer()
287 .children()
288 .map_or(0, |children| children.len())
289 }
290
291 pub fn child(&self, idx: usize) -> ArrayParts {
293 let children = self
294 .flatbuffer()
295 .children()
296 .vortex_expect("Expected array to have children");
297 if idx >= children.len() {
298 vortex_panic!(
299 "Invalid child index {} for array with {} children",
300 idx,
301 children.len()
302 );
303 }
304 self.with_root(children.get(idx))
305 }
306
307 pub fn nbuffers(&self) -> usize {
309 self.flatbuffer()
310 .buffers()
311 .map_or(0, |buffers| buffers.len())
312 }
313
314 pub fn buffer(&self, idx: usize) -> VortexResult<ByteBuffer> {
316 let buffer_idx = self
317 .flatbuffer()
318 .buffers()
319 .ok_or_else(|| vortex_err!("Array has no buffers"))?
320 .get(idx);
321 self.buffers
322 .get(buffer_idx as usize)
323 .cloned()
324 .ok_or_else(|| {
325 vortex_err!(
326 "Invalid buffer index {} for array with {} buffers",
327 buffer_idx,
328 self.nbuffers()
329 )
330 })
331 }
332
333 fn flatbuffer(&self) -> fba::ArrayNode {
335 unsafe { fba::ArrayNode::follow(self.flatbuffer.as_ref(), self.flatbuffer_loc) }
336 }
337
338 fn with_root(&self, root: fba::ArrayNode) -> Self {
341 let mut this = self.clone();
342 this.flatbuffer_loc = root._tab.loc();
343 this
344 }
345}
346
347impl TryFrom<ByteBuffer> for ArrayParts {
348 type Error = VortexError;
349
350 fn try_from(value: ByteBuffer) -> Result<Self, Self::Error> {
351 if value.len() < 4 {
353 vortex_bail!("ArrayParts buffer is too short");
354 }
355
356 let value = value.aligned(Alignment::none());
358
359 let fb_length = u32::try_from_le_bytes(&value.as_slice()[value.len() - 4..])? as usize;
360 if value.len() < 4 + fb_length {
361 vortex_bail!("ArrayParts buffer is too short for flatbuffer");
362 }
363
364 let fb_offset = value.len() - 4 - fb_length;
365 let fb_buffer = value.slice(fb_offset..fb_offset + fb_length);
366 let fb_buffer = FlatBuffer::align_from(fb_buffer);
367
368 let fb_array = root::<fba::Array>(fb_buffer.as_ref())?;
369 let fb_root = fb_array.root().vortex_expect("Array must have a root node");
370
371 let mut offset = 0;
372 let buffers: Arc<[ByteBuffer]> = fb_array
373 .buffers()
374 .unwrap_or_default()
375 .iter()
376 .map(|fb_buffer| {
377 offset += fb_buffer.padding() as usize;
379
380 let buffer_len = fb_buffer.length() as usize;
381
382 let buffer = value
384 .slice(offset..(offset + buffer_len))
385 .aligned(Alignment::from_exponent(fb_buffer.alignment_exponent()));
386
387 offset += buffer_len;
388 buffer
389 })
390 .collect();
391
392 Ok(ArrayParts {
393 flatbuffer: fb_buffer.clone(),
394 flatbuffer_loc: fb_root._tab.loc(),
395 buffers,
396 })
397 }
398}