1use std::collections::BTreeSet;
2use std::ops::Deref;
3use std::sync::Arc;
4
5use bytes::Bytes;
6use flatbuffers::{FlatBufferBuilder, Follow, WIPOffset};
7use vortex_array::ArrayContext;
8use vortex_dtype::{DType, FieldMask};
9use vortex_error::{VortexExpect, VortexResult, vortex_bail, vortex_err, vortex_panic};
10use vortex_flatbuffers::{FlatBuffer, FlatBufferRoot, WriteFlatBuffer, layout};
11
12use crate::LayoutId;
13use crate::context::LayoutContext;
14use crate::reader::LayoutReader;
15use crate::segments::{AsyncSegmentReader, SegmentCollector, SegmentId};
16use crate::vtable::LayoutVTableRef;
17
18#[derive(Debug, Clone)]
21pub struct Layout(Inner);
22
23#[derive(Debug, Clone)]
24enum Inner {
25 Owned(OwnedLayout),
26 Viewed(ViewedLayout),
27}
28
29#[derive(Debug, Clone)]
31pub struct OwnedLayout {
32 name: Arc<str>,
33 vtable: LayoutVTableRef,
34 dtype: DType,
35 row_count: u64,
36 segments: Vec<SegmentId>,
37 children: Vec<Layout>,
38 metadata: Option<Bytes>,
39}
40
41#[derive(Debug, Clone)]
43struct ViewedLayout {
44 name: Arc<str>,
45 vtable: LayoutVTableRef,
46 dtype: DType,
47 flatbuffer: FlatBuffer,
48 flatbuffer_loc: usize,
49 ctx: LayoutContext,
50}
51
52impl ViewedLayout {
53 fn flatbuffer(&self) -> layout::Layout<'_> {
55 unsafe { layout::Layout::follow(self.flatbuffer.as_ref(), self.flatbuffer_loc) }
56 }
57}
58
59impl Layout {
60 pub fn new_owned(
62 name: Arc<str>,
63 vtable: LayoutVTableRef,
64 dtype: DType,
65 row_count: u64,
66 segments: Vec<SegmentId>,
67 children: Vec<Layout>,
68 metadata: Option<Bytes>,
69 ) -> Self {
70 Self(Inner::Owned(OwnedLayout {
71 name,
72 vtable,
73 dtype,
74 row_count,
75 segments,
76 children,
77 metadata,
78 }))
79 }
80
81 pub unsafe fn new_viewed_unchecked(
87 name: Arc<str>,
88 encoding: LayoutVTableRef,
89 dtype: DType,
90 flatbuffer: FlatBuffer,
91 flatbuffer_loc: usize,
92 ctx: LayoutContext,
93 ) -> Self {
94 Self(Inner::Viewed(ViewedLayout {
95 name,
96 vtable: encoding,
97 dtype,
98 flatbuffer,
99 flatbuffer_loc,
100 ctx,
101 }))
102 }
103
104 pub fn name(&self) -> &str {
106 match &self.0 {
107 Inner::Owned(owned) => owned.name.as_ref(),
108 Inner::Viewed(viewed) => viewed.name.as_ref(),
109 }
110 }
111
112 pub fn vtable(&self) -> &LayoutVTableRef {
114 match &self.0 {
115 Inner::Owned(owned) => &owned.vtable,
116 Inner::Viewed(viewed) => &viewed.vtable,
117 }
118 }
119
120 pub fn id(&self) -> LayoutId {
122 self.vtable().id()
123 }
124
125 pub fn row_count(&self) -> u64 {
127 match &self.0 {
128 Inner::Owned(owned) => owned.row_count,
129 Inner::Viewed(viewed) => viewed.flatbuffer().row_count(),
130 }
131 }
132
133 pub fn dtype(&self) -> &DType {
135 match &self.0 {
136 Inner::Owned(owned) => &owned.dtype,
137 Inner::Viewed(viewed) => &viewed.dtype,
138 }
139 }
140
141 pub fn nchildren(&self) -> usize {
143 match &self.0 {
144 Inner::Owned(owned) => owned.children.len(),
145 Inner::Viewed(viewed) => viewed
146 .flatbuffer()
147 .children()
148 .map_or(0, |children| children.len()),
149 }
150 }
151
152 pub fn child(&self, i: usize, dtype: DType, name: impl AsRef<str>) -> VortexResult<Layout> {
158 if i >= self.nchildren() {
159 vortex_panic!("child index out of bounds");
160 }
161 match &self.0 {
162 Inner::Owned(o) => {
163 let child = o.children[i].clone();
164 if child.dtype() != &dtype {
165 vortex_bail!(
166 "Child has dtype {}, but was requested with {}",
167 child.dtype(),
168 dtype
169 );
170 }
171 Ok(child)
172 }
173 Inner::Viewed(v) => {
174 let fb = v
175 .flatbuffer()
176 .children()
177 .vortex_expect("child bounds already checked")
178 .get(i);
179 let encoding = v
180 .ctx
181 .lookup_encoding(fb.encoding())
182 .ok_or_else(|| {
183 vortex_err!("Child layout encoding {} not found", fb.encoding())
184 })?
185 .clone();
186
187 Ok(Self(Inner::Viewed(ViewedLayout {
188 name: format!("{}.{}", v.name, name.as_ref()).into(),
189 vtable: encoding,
190 dtype,
191 flatbuffer: v.flatbuffer.clone(),
192 flatbuffer_loc: fb._tab.loc(),
193 ctx: v.ctx.clone(),
194 })))
195 }
196 }
197 }
198
199 pub fn child_row_count(&self, i: usize) -> u64 {
205 if i >= self.nchildren() {
206 vortex_panic!("child index out of bounds");
207 }
208 match &self.0 {
209 Inner::Owned(o) => o.children[i].row_count(),
210 Inner::Viewed(v) => v
211 .flatbuffer()
212 .children()
213 .vortex_expect("child bounds already checked")
214 .get(i)
215 .row_count(),
216 }
217 }
218
219 pub fn nsegments(&self) -> usize {
221 match &self.0 {
222 Inner::Owned(owned) => owned.segments.len(),
223 Inner::Viewed(viewed) => viewed
224 .flatbuffer()
225 .segments()
226 .map_or(0, |segments| segments.len()),
227 }
228 }
229
230 pub fn segment_id(&self, i: usize) -> Option<SegmentId> {
232 match &self.0 {
233 Inner::Owned(owned) => owned.segments.get(i).copied(),
234 Inner::Viewed(viewed) => viewed
235 .flatbuffer()
236 .segments()
237 .and_then(|segments| (i < segments.len()).then(|| segments.get(i)))
238 .map(SegmentId::from),
239 }
240 }
241
242 pub fn segments(&self) -> impl Iterator<Item = SegmentId> + '_ {
244 (0..self.nsegments()).map(move |i| self.segment_id(i).vortex_expect("segment bounds"))
245 }
246
247 pub fn metadata(&self) -> Option<Bytes> {
249 match &self.0 {
250 Inner::Owned(owned) => owned.metadata.clone(),
251 Inner::Viewed(viewed) => viewed.flatbuffer().metadata().map(|m| {
252 viewed.flatbuffer.as_ref().inner().slice_ref(m.bytes())
254 }),
255 }
256 }
257
258 pub fn reader(
260 &self,
261 segment_reader: Arc<dyn AsyncSegmentReader>,
262 ctx: ArrayContext,
263 ) -> VortexResult<Arc<dyn LayoutReader>> {
264 self.vtable().reader(self.clone(), ctx, segment_reader)
265 }
266
267 pub fn register_splits(
269 &self,
270 field_mask: &[FieldMask],
271 row_offset: u64,
272 splits: &mut BTreeSet<u64>,
273 ) -> VortexResult<()> {
274 self.vtable()
275 .register_splits(self, field_mask, row_offset, splits)
276 }
277
278 pub fn required_segments(
280 &self,
281 row_offset: u64,
282 filter_field_mask: &[FieldMask],
283 projection_field_mask: &[FieldMask],
284 segments: &mut SegmentCollector,
285 ) -> VortexResult<()> {
286 self.vtable().required_segments(
287 self,
288 row_offset,
289 filter_field_mask,
290 projection_field_mask,
291 segments,
292 )
293 }
294
295 pub fn write_flatbuffer<'fbb>(
297 &self,
298 fbb: &mut FlatBufferBuilder<'fbb>,
299 ctx: &LayoutContext,
300 ) -> WIPOffset<layout::Layout<'fbb>> {
301 LayoutFlatBufferWriter { layout: self, ctx }.write_flatbuffer(fbb)
302 }
303}
304
305struct LayoutFlatBufferWriter<'a> {
307 layout: &'a Layout,
308 ctx: &'a LayoutContext,
309}
310
311impl FlatBufferRoot for LayoutFlatBufferWriter<'_> {}
312
313impl WriteFlatBuffer for LayoutFlatBufferWriter<'_> {
314 type Target<'t> = layout::Layout<'t>;
315
316 fn write_flatbuffer<'fb>(
317 &self,
318 fbb: &mut FlatBufferBuilder<'fb>,
319 ) -> WIPOffset<Self::Target<'fb>> {
320 match &self.layout.0 {
321 Inner::Owned(layout) => {
322 let metadata = layout.metadata.as_ref().map(|b| fbb.create_vector(b));
323
324 let children = (!layout.children.is_empty()).then(|| {
325 layout
326 .children
327 .iter()
328 .map(|c| {
329 LayoutFlatBufferWriter {
330 layout: c,
331 ctx: self.ctx,
332 }
333 .write_flatbuffer(fbb)
334 })
335 .collect::<Vec<_>>()
336 });
337
338 let children = children.map(|c| fbb.create_vector(&c));
339 let segments = (!layout.segments.is_empty()).then(|| {
340 layout
341 .segments
342 .iter()
343 .map(|s| s.deref())
344 .copied()
345 .collect::<Vec<u32>>()
346 });
347 let segments = segments.map(|m| fbb.create_vector(&m));
348
349 let encoding_idx = self.ctx.encoding_idx(&layout.vtable);
350
351 layout::Layout::create(
352 fbb,
353 &layout::LayoutArgs {
354 encoding: encoding_idx,
355 row_count: layout.row_count,
356 metadata,
357 children,
358 segments,
359 },
360 )
361 }
362 Inner::Viewed(layout) => LayoutFlatBuffer(layout.flatbuffer()).write_flatbuffer(fbb),
363 }
364 }
365}
366
367struct LayoutFlatBuffer<'l>(layout::Layout<'l>);
368
369impl WriteFlatBuffer for LayoutFlatBuffer<'_> {
370 type Target<'a> = layout::Layout<'a>;
371
372 fn write_flatbuffer<'fb>(
373 &self,
374 fbb: &mut FlatBufferBuilder<'fb>,
375 ) -> WIPOffset<Self::Target<'fb>> {
376 let metadata = self.0.metadata().map(|m| fbb.create_vector(m.bytes()));
377 let children = self.0.children().map(|c| {
378 c.iter()
379 .map(|child| LayoutFlatBuffer(child).write_flatbuffer(fbb))
380 .collect::<Vec<_>>()
381 });
382 let children = children.map(|c| fbb.create_vector(&c));
383 let segments = self
384 .0
385 .segments()
386 .map(|m| fbb.create_vector_from_iter(m.iter()));
387
388 layout::Layout::create(
389 fbb,
390 &layout::LayoutArgs {
391 encoding: self.0.encoding(),
392 row_count: self.0.row_count(),
393 metadata,
394 children,
395 segments,
396 },
397 )
398 }
399}