vortex_layout/
data.rs

1use std::collections::BTreeSet;
2use std::ops::Deref;
3use std::sync::Arc;
4
5use bytes::Bytes;
6use flatbuffers::{FlatBufferBuilder, Follow, WIPOffset};
7use vortex_array::ArrayContext;
8use vortex_dtype::{DType, FieldMask};
9use vortex_error::{VortexExpect, VortexResult, vortex_bail, vortex_err, vortex_panic};
10use vortex_flatbuffers::{FlatBuffer, FlatBufferRoot, WriteFlatBuffer, layout};
11
12use crate::LayoutId;
13use crate::context::LayoutContext;
14use crate::reader::LayoutReader;
15use crate::segments::{AsyncSegmentReader, SegmentCollector, SegmentId};
16use crate::vtable::LayoutVTableRef;
17
/// [`Layout`] is the lazy equivalent to [`vortex_array::ArrayRef`], providing a hierarchical
/// structure.
///
/// A layout is held in one of two internal representations: owned (all fields stored
/// directly) or viewed (most fields read on demand from a flatbuffer message). The accessor
/// methods on this type dispatch on whichever representation is present.
#[derive(Debug, Clone)]
pub struct Layout(Inner);
22
/// The two internal representations backing a [`Layout`].
#[derive(Debug, Clone)]
enum Inner {
    /// All fields are stored directly in an [`OwnedLayout`].
    Owned(OwnedLayout),
    /// Fields are read from a flatbuffer message through a [`ViewedLayout`].
    Viewed(ViewedLayout),
}
28
/// A layout that is fully deserialized and heap-allocated.
#[derive(Debug, Clone)]
pub struct OwnedLayout {
    /// Human-readable name, returned by [`Layout::name`].
    name: Arc<str>,
    /// The vtable implementing this layout; its ID is reported by [`Layout::id`].
    vtable: LayoutVTableRef,
    /// Data type of the layout, returned by [`Layout::dtype`].
    dtype: DType,
    /// Row count of the layout, returned by [`Layout::row_count`].
    row_count: u64,
    /// Segment IDs of this layout, in the order exposed by [`Layout::segment_id`].
    segments: Vec<SegmentId>,
    /// Child layouts, indexed by [`Layout::child`].
    children: Vec<Layout>,
    /// Optional metadata bytes, returned by [`Layout::metadata`].
    metadata: Option<Bytes>,
}
40
/// A layout that is lazily deserialized from a flatbuffer message.
///
/// The name, vtable and dtype are stored directly; row count, children, segments and
/// metadata are read from the flatbuffer each time they are requested.
#[derive(Debug, Clone)]
struct ViewedLayout {
    /// Human-readable name, returned by [`Layout::name`].
    name: Arc<str>,
    /// The vtable implementing this layout.
    vtable: LayoutVTableRef,
    /// Data type of the layout; supplied by the caller rather than read from the flatbuffer.
    dtype: DType,
    /// The buffer holding the serialized layout message. Child views created by
    /// [`Layout::child`] clone this same buffer.
    flatbuffer: FlatBuffer,
    /// Location within `flatbuffer` from which this layout's table is followed.
    flatbuffer_loc: usize,
    /// Context used to look up the encodings of child layouts.
    ctx: LayoutContext,
}
51
52impl ViewedLayout {
53    /// Return the flatbuffer layout message.
54    fn flatbuffer(&self) -> layout::Layout<'_> {
55        unsafe { layout::Layout::follow(self.flatbuffer.as_ref(), self.flatbuffer_loc) }
56    }
57}
58
59impl Layout {
60    /// Create a new owned layout.
61    pub fn new_owned(
62        name: Arc<str>,
63        vtable: LayoutVTableRef,
64        dtype: DType,
65        row_count: u64,
66        segments: Vec<SegmentId>,
67        children: Vec<Layout>,
68        metadata: Option<Bytes>,
69    ) -> Self {
70        Self(Inner::Owned(OwnedLayout {
71            name,
72            vtable,
73            dtype,
74            row_count,
75            segments,
76            children,
77            metadata,
78        }))
79    }
80
81    /// Create a new viewed layout from a flatbuffer root message.
82    ///
83    /// # SAFETY
84    ///
85    /// Assumes that flatbuffer has been previously validated and has same encoding id as the passed encoding
86    pub unsafe fn new_viewed_unchecked(
87        name: Arc<str>,
88        encoding: LayoutVTableRef,
89        dtype: DType,
90        flatbuffer: FlatBuffer,
91        flatbuffer_loc: usize,
92        ctx: LayoutContext,
93    ) -> Self {
94        Self(Inner::Viewed(ViewedLayout {
95            name,
96            vtable: encoding,
97            dtype,
98            flatbuffer,
99            flatbuffer_loc,
100            ctx,
101        }))
102    }
103
104    /// Returns the human-readable name of the layout.
105    pub fn name(&self) -> &str {
106        match &self.0 {
107            Inner::Owned(owned) => owned.name.as_ref(),
108            Inner::Viewed(viewed) => viewed.name.as_ref(),
109        }
110    }
111
112    /// Returns the [`crate::LayoutVTable`] for this layout.
113    pub fn vtable(&self) -> &LayoutVTableRef {
114        match &self.0 {
115            Inner::Owned(owned) => &owned.vtable,
116            Inner::Viewed(viewed) => &viewed.vtable,
117        }
118    }
119
120    /// Returns the ID of the layout.
121    pub fn id(&self) -> LayoutId {
122        self.vtable().id()
123    }
124
125    /// Return the row-count of the layout.
126    pub fn row_count(&self) -> u64 {
127        match &self.0 {
128            Inner::Owned(owned) => owned.row_count,
129            Inner::Viewed(viewed) => viewed.flatbuffer().row_count(),
130        }
131    }
132
133    /// Return the data type of the layout.
134    pub fn dtype(&self) -> &DType {
135        match &self.0 {
136            Inner::Owned(owned) => &owned.dtype,
137            Inner::Viewed(viewed) => &viewed.dtype,
138        }
139    }
140
141    /// Returns the number of children of the layout.
142    pub fn nchildren(&self) -> usize {
143        match &self.0 {
144            Inner::Owned(owned) => owned.children.len(),
145            Inner::Viewed(viewed) => viewed
146                .flatbuffer()
147                .children()
148                .map_or(0, |children| children.len()),
149        }
150    }
151
152    /// Fetch the i'th child layout.
153    ///
154    /// ## Panics
155    ///
156    /// Panics if the child index is out of bounds.
157    pub fn child(&self, i: usize, dtype: DType, name: impl AsRef<str>) -> VortexResult<Layout> {
158        if i >= self.nchildren() {
159            vortex_panic!("child index out of bounds");
160        }
161        match &self.0 {
162            Inner::Owned(o) => {
163                let child = o.children[i].clone();
164                if child.dtype() != &dtype {
165                    vortex_bail!(
166                        "Child has dtype {}, but was requested with {}",
167                        child.dtype(),
168                        dtype
169                    );
170                }
171                Ok(child)
172            }
173            Inner::Viewed(v) => {
174                let fb = v
175                    .flatbuffer()
176                    .children()
177                    .vortex_expect("child bounds already checked")
178                    .get(i);
179                let encoding = v
180                    .ctx
181                    .lookup_encoding(fb.encoding())
182                    .ok_or_else(|| {
183                        vortex_err!("Child layout encoding {} not found", fb.encoding())
184                    })?
185                    .clone();
186
187                Ok(Self(Inner::Viewed(ViewedLayout {
188                    name: format!("{}.{}", v.name, name.as_ref()).into(),
189                    vtable: encoding,
190                    dtype,
191                    flatbuffer: v.flatbuffer.clone(),
192                    flatbuffer_loc: fb._tab.loc(),
193                    ctx: v.ctx.clone(),
194                })))
195            }
196        }
197    }
198
199    /// Fetch the row count of the i'th child layout.
200    ///
201    /// ## Panics
202    ///
203    /// Panics if the child index is out of bounds.
204    pub fn child_row_count(&self, i: usize) -> u64 {
205        if i >= self.nchildren() {
206            vortex_panic!("child index out of bounds");
207        }
208        match &self.0 {
209            Inner::Owned(o) => o.children[i].row_count(),
210            Inner::Viewed(v) => v
211                .flatbuffer()
212                .children()
213                .vortex_expect("child bounds already checked")
214                .get(i)
215                .row_count(),
216        }
217    }
218
219    /// Returns the number of segments in the layout.
220    pub fn nsegments(&self) -> usize {
221        match &self.0 {
222            Inner::Owned(owned) => owned.segments.len(),
223            Inner::Viewed(viewed) => viewed
224                .flatbuffer()
225                .segments()
226                .map_or(0, |segments| segments.len()),
227        }
228    }
229
230    /// Fetch the i'th segment id of the layout.
231    pub fn segment_id(&self, i: usize) -> Option<SegmentId> {
232        match &self.0 {
233            Inner::Owned(owned) => owned.segments.get(i).copied(),
234            Inner::Viewed(viewed) => viewed
235                .flatbuffer()
236                .segments()
237                .and_then(|segments| (i < segments.len()).then(|| segments.get(i)))
238                .map(SegmentId::from),
239        }
240    }
241
242    /// Iterate the segment IDs of the layout.
243    pub fn segments(&self) -> impl Iterator<Item = SegmentId> + '_ {
244        (0..self.nsegments()).map(move |i| self.segment_id(i).vortex_expect("segment bounds"))
245    }
246
247    /// Returns the bytes of the metadata stored in the layout's flatbuffer.
248    pub fn metadata(&self) -> Option<Bytes> {
249        match &self.0 {
250            Inner::Owned(owned) => owned.metadata.clone(),
251            Inner::Viewed(viewed) => viewed.flatbuffer().metadata().map(|m| {
252                // Return the metadata bytes zero-copy by finding them in the flatbuffer.
253                viewed.flatbuffer.as_ref().inner().slice_ref(m.bytes())
254            }),
255        }
256    }
257
258    /// Create a reader for this layout.
259    pub fn reader(
260        &self,
261        segment_reader: Arc<dyn AsyncSegmentReader>,
262        ctx: ArrayContext,
263    ) -> VortexResult<Arc<dyn LayoutReader>> {
264        self.vtable().reader(self.clone(), ctx, segment_reader)
265    }
266
267    /// Register splits for this layout.
268    pub fn register_splits(
269        &self,
270        field_mask: &[FieldMask],
271        row_offset: u64,
272        splits: &mut BTreeSet<u64>,
273    ) -> VortexResult<()> {
274        self.vtable()
275            .register_splits(self, field_mask, row_offset, splits)
276    }
277
278    /// Registers matching segments to the given filter and projection field mask.
279    pub fn required_segments(
280        &self,
281        row_offset: u64,
282        filter_field_mask: &[FieldMask],
283        projection_field_mask: &[FieldMask],
284        segments: &mut SegmentCollector,
285    ) -> VortexResult<()> {
286        self.vtable().required_segments(
287            self,
288            row_offset,
289            filter_field_mask,
290            projection_field_mask,
291            segments,
292        )
293    }
294
295    /// Serialize the layout into a [`FlatBufferBuilder`].
296    pub fn write_flatbuffer<'fbb>(
297        &self,
298        fbb: &mut FlatBufferBuilder<'fbb>,
299        ctx: &LayoutContext,
300    ) -> WIPOffset<layout::Layout<'fbb>> {
301        LayoutFlatBufferWriter { layout: self, ctx }.write_flatbuffer(fbb)
302    }
303}
304
/// An adapter struct for writing a layout to a FlatBuffer.
///
/// It pairs a layout with the context used to resolve encoding indices during
/// serialization.
struct LayoutFlatBufferWriter<'a> {
    /// The layout to serialize.
    layout: &'a Layout,
    /// Context from which the encoding index of each owned layout is obtained.
    ctx: &'a LayoutContext,
}
310
// Marker impl with no items. NOTE(review): presumably this is what allows the writer to be
// emitted as the root of a flatbuffer message — confirm against `vortex_flatbuffers`.
impl FlatBufferRoot for LayoutFlatBufferWriter<'_> {}
312
313impl WriteFlatBuffer for LayoutFlatBufferWriter<'_> {
314    type Target<'t> = layout::Layout<'t>;
315
316    fn write_flatbuffer<'fb>(
317        &self,
318        fbb: &mut FlatBufferBuilder<'fb>,
319    ) -> WIPOffset<Self::Target<'fb>> {
320        match &self.layout.0 {
321            Inner::Owned(layout) => {
322                let metadata = layout.metadata.as_ref().map(|b| fbb.create_vector(b));
323
324                let children = (!layout.children.is_empty()).then(|| {
325                    layout
326                        .children
327                        .iter()
328                        .map(|c| {
329                            LayoutFlatBufferWriter {
330                                layout: c,
331                                ctx: self.ctx,
332                            }
333                            .write_flatbuffer(fbb)
334                        })
335                        .collect::<Vec<_>>()
336                });
337
338                let children = children.map(|c| fbb.create_vector(&c));
339                let segments = (!layout.segments.is_empty()).then(|| {
340                    layout
341                        .segments
342                        .iter()
343                        .map(|s| s.deref())
344                        .copied()
345                        .collect::<Vec<u32>>()
346                });
347                let segments = segments.map(|m| fbb.create_vector(&m));
348
349                let encoding_idx = self.ctx.encoding_idx(&layout.vtable);
350
351                layout::Layout::create(
352                    fbb,
353                    &layout::LayoutArgs {
354                        encoding: encoding_idx,
355                        row_count: layout.row_count,
356                        metadata,
357                        children,
358                        segments,
359                    },
360                )
361            }
362            Inner::Viewed(layout) => LayoutFlatBuffer(layout.flatbuffer()).write_flatbuffer(fbb),
363        }
364    }
365}
366
/// Wrapper around a flatbuffer layout table that re-serializes it into another builder.
struct LayoutFlatBuffer<'l>(layout::Layout<'l>);
368
369impl WriteFlatBuffer for LayoutFlatBuffer<'_> {
370    type Target<'a> = layout::Layout<'a>;
371
372    fn write_flatbuffer<'fb>(
373        &self,
374        fbb: &mut FlatBufferBuilder<'fb>,
375    ) -> WIPOffset<Self::Target<'fb>> {
376        let metadata = self.0.metadata().map(|m| fbb.create_vector(m.bytes()));
377        let children = self.0.children().map(|c| {
378            c.iter()
379                .map(|child| LayoutFlatBuffer(child).write_flatbuffer(fbb))
380                .collect::<Vec<_>>()
381        });
382        let children = children.map(|c| fbb.create_vector(&c));
383        let segments = self
384            .0
385            .segments()
386            .map(|m| fbb.create_vector_from_iter(m.iter()));
387
388        layout::Layout::create(
389            fbb,
390            &layout::LayoutArgs {
391                encoding: self.0.encoding(),
392                row_count: self.0.row_count(),
393                metadata,
394                children,
395                segments,
396            },
397        )
398    }
399}