1use std::collections::BTreeSet;
2use std::ops::Deref;
3use std::sync::Arc;
4
5use bytes::Bytes;
6use flatbuffers::{FlatBufferBuilder, Follow, WIPOffset};
7use vortex_array::ArrayContext;
8use vortex_dtype::{DType, FieldMask};
9use vortex_error::{VortexExpect, VortexResult, vortex_bail, vortex_err, vortex_panic};
10use vortex_flatbuffers::{FlatBuffer, FlatBufferRoot, WriteFlatBuffer, layout};
11
12use crate::LayoutId;
13use crate::context::LayoutContext;
14use crate::reader::LayoutReader;
15use crate::segments::{SegmentId, SegmentSource};
16use crate::vtable::LayoutVTableRef;
17
18#[derive(Debug, Clone)]
21pub struct Layout(Inner);
22
23#[derive(Debug, Clone)]
24enum Inner {
25 Owned(OwnedLayout),
26 Viewed(ViewedLayout),
27}
28
29#[derive(Debug, Clone)]
31pub struct OwnedLayout {
32 name: Arc<str>,
33 vtable: LayoutVTableRef,
34 dtype: DType,
35 row_count: u64,
36 segments: Vec<SegmentId>,
37 children: Vec<Layout>,
38 metadata: Option<Bytes>,
39}
40
41#[derive(Debug, Clone)]
43struct ViewedLayout {
44 name: Arc<str>,
45 vtable: LayoutVTableRef,
46 dtype: DType,
47 flatbuffer: FlatBuffer,
48 flatbuffer_loc: usize,
49 ctx: LayoutContext,
50}
51
52impl ViewedLayout {
53 fn flatbuffer(&self) -> layout::Layout<'_> {
55 unsafe { layout::Layout::follow(self.flatbuffer.as_ref(), self.flatbuffer_loc) }
56 }
57}
58
59impl Layout {
60 pub fn new_owned(
62 name: Arc<str>,
63 vtable: LayoutVTableRef,
64 dtype: DType,
65 row_count: u64,
66 segments: Vec<SegmentId>,
67 children: Vec<Layout>,
68 metadata: Option<Bytes>,
69 ) -> Self {
70 Self(Inner::Owned(OwnedLayout {
71 name,
72 vtable,
73 dtype,
74 row_count,
75 segments,
76 children,
77 metadata,
78 }))
79 }
80
81 pub unsafe fn new_viewed_unchecked(
87 name: Arc<str>,
88 encoding: LayoutVTableRef,
89 dtype: DType,
90 flatbuffer: FlatBuffer,
91 flatbuffer_loc: usize,
92 ctx: LayoutContext,
93 ) -> Self {
94 Self(Inner::Viewed(ViewedLayout {
95 name,
96 vtable: encoding,
97 dtype,
98 flatbuffer,
99 flatbuffer_loc,
100 ctx,
101 }))
102 }
103
104 pub fn name(&self) -> &Arc<str> {
106 match &self.0 {
107 Inner::Owned(owned) => &owned.name,
108 Inner::Viewed(viewed) => &viewed.name,
109 }
110 }
111
112 pub fn vtable(&self) -> &LayoutVTableRef {
114 match &self.0 {
115 Inner::Owned(owned) => &owned.vtable,
116 Inner::Viewed(viewed) => &viewed.vtable,
117 }
118 }
119
120 pub fn id(&self) -> LayoutId {
122 self.vtable().id()
123 }
124
125 pub fn row_count(&self) -> u64 {
127 match &self.0 {
128 Inner::Owned(owned) => owned.row_count,
129 Inner::Viewed(viewed) => viewed.flatbuffer().row_count(),
130 }
131 }
132
133 pub fn dtype(&self) -> &DType {
135 match &self.0 {
136 Inner::Owned(owned) => &owned.dtype,
137 Inner::Viewed(viewed) => &viewed.dtype,
138 }
139 }
140
141 pub fn nchildren(&self) -> usize {
143 match &self.0 {
144 Inner::Owned(owned) => owned.children.len(),
145 Inner::Viewed(viewed) => viewed
146 .flatbuffer()
147 .children()
148 .map_or(0, |children| children.len()),
149 }
150 }
151
152 pub fn child(&self, i: usize, dtype: DType, name: impl AsRef<str>) -> VortexResult<Layout> {
158 if i >= self.nchildren() {
159 vortex_panic!("child index out of bounds");
160 }
161 match &self.0 {
162 Inner::Owned(o) => {
163 let child = o.children[i].clone();
164 if child.dtype() != &dtype {
165 vortex_bail!(
166 "Child has dtype {}, but was requested with {}",
167 child.dtype(),
168 dtype
169 );
170 }
171 Ok(child)
172 }
173 Inner::Viewed(v) => {
174 let fb = v
175 .flatbuffer()
176 .children()
177 .vortex_expect("child bounds already checked")
178 .get(i);
179 let encoding = v
180 .ctx
181 .lookup_encoding(fb.encoding())
182 .ok_or_else(|| {
183 vortex_err!("Child layout encoding {} not found", fb.encoding())
184 })?
185 .clone();
186
187 Ok(Self(Inner::Viewed(ViewedLayout {
188 name: format!("{}.{}", v.name, name.as_ref()).into(),
189 vtable: encoding,
190 dtype,
191 flatbuffer: v.flatbuffer.clone(),
192 flatbuffer_loc: fb._tab.loc(),
193 ctx: v.ctx.clone(),
194 })))
195 }
196 }
197 }
198
199 pub fn child_row_count(&self, i: usize) -> u64 {
205 if i >= self.nchildren() {
206 vortex_panic!("child index out of bounds");
207 }
208 match &self.0 {
209 Inner::Owned(o) => o.children[i].row_count(),
210 Inner::Viewed(v) => v
211 .flatbuffer()
212 .children()
213 .vortex_expect("child bounds already checked")
214 .get(i)
215 .row_count(),
216 }
217 }
218
219 pub fn nsegments(&self) -> usize {
221 match &self.0 {
222 Inner::Owned(owned) => owned.segments.len(),
223 Inner::Viewed(viewed) => viewed
224 .flatbuffer()
225 .segments()
226 .map_or(0, |segments| segments.len()),
227 }
228 }
229
230 pub fn segment_id(&self, i: usize) -> Option<SegmentId> {
232 match &self.0 {
233 Inner::Owned(owned) => owned.segments.get(i).copied(),
234 Inner::Viewed(viewed) => viewed
235 .flatbuffer()
236 .segments()
237 .and_then(|segments| (i < segments.len()).then(|| segments.get(i)))
238 .map(SegmentId::from),
239 }
240 }
241
242 pub fn segments(&self) -> impl Iterator<Item = SegmentId> + '_ {
244 (0..self.nsegments()).map(move |i| self.segment_id(i).vortex_expect("segment bounds"))
245 }
246
247 pub fn metadata(&self) -> Option<Bytes> {
249 match &self.0 {
250 Inner::Owned(owned) => owned.metadata.clone(),
251 Inner::Viewed(viewed) => viewed.flatbuffer().metadata().map(|m| {
252 viewed.flatbuffer.as_ref().inner().slice_ref(m.bytes())
254 }),
255 }
256 }
257
258 pub fn reader(
260 &self,
261 segment_source: &Arc<dyn SegmentSource>,
262 ctx: &ArrayContext,
263 ) -> VortexResult<Arc<dyn LayoutReader>> {
264 self.vtable().reader(self.clone(), segment_source, ctx)
265 }
266
267 pub fn register_splits(
269 &self,
270 field_mask: &[FieldMask],
271 row_offset: u64,
272 splits: &mut BTreeSet<u64>,
273 ) -> VortexResult<()> {
274 self.vtable()
275 .register_splits(self, field_mask, row_offset, splits)
276 }
277
278 pub fn write_flatbuffer<'fbb>(
280 &self,
281 fbb: &mut FlatBufferBuilder<'fbb>,
282 ctx: &LayoutContext,
283 ) -> WIPOffset<layout::Layout<'fbb>> {
284 LayoutFlatBufferWriter { layout: self, ctx }.write_flatbuffer(fbb)
285 }
286}
287
288struct LayoutFlatBufferWriter<'a> {
290 layout: &'a Layout,
291 ctx: &'a LayoutContext,
292}
293
294impl FlatBufferRoot for LayoutFlatBufferWriter<'_> {}
295
296impl WriteFlatBuffer for LayoutFlatBufferWriter<'_> {
297 type Target<'t> = layout::Layout<'t>;
298
299 fn write_flatbuffer<'fb>(
300 &self,
301 fbb: &mut FlatBufferBuilder<'fb>,
302 ) -> WIPOffset<Self::Target<'fb>> {
303 match &self.layout.0 {
304 Inner::Owned(layout) => {
305 let metadata = layout.metadata.as_ref().map(|b| fbb.create_vector(b));
306
307 let children = (!layout.children.is_empty()).then(|| {
308 layout
309 .children
310 .iter()
311 .map(|c| {
312 LayoutFlatBufferWriter {
313 layout: c,
314 ctx: self.ctx,
315 }
316 .write_flatbuffer(fbb)
317 })
318 .collect::<Vec<_>>()
319 });
320
321 let children = children.map(|c| fbb.create_vector(&c));
322 let segments = (!layout.segments.is_empty()).then(|| {
323 layout
324 .segments
325 .iter()
326 .map(|s| s.deref())
327 .copied()
328 .collect::<Vec<u32>>()
329 });
330 let segments = segments.map(|m| fbb.create_vector(&m));
331
332 let encoding_idx = self.ctx.encoding_idx(&layout.vtable);
333
334 layout::Layout::create(
335 fbb,
336 &layout::LayoutArgs {
337 encoding: encoding_idx,
338 row_count: layout.row_count,
339 metadata,
340 children,
341 segments,
342 },
343 )
344 }
345 Inner::Viewed(layout) => LayoutFlatBuffer(layout.flatbuffer()).write_flatbuffer(fbb),
346 }
347 }
348}
349
350struct LayoutFlatBuffer<'l>(layout::Layout<'l>);
351
352impl WriteFlatBuffer for LayoutFlatBuffer<'_> {
353 type Target<'a> = layout::Layout<'a>;
354
355 fn write_flatbuffer<'fb>(
356 &self,
357 fbb: &mut FlatBufferBuilder<'fb>,
358 ) -> WIPOffset<Self::Target<'fb>> {
359 let metadata = self.0.metadata().map(|m| fbb.create_vector(m.bytes()));
360 let children = self.0.children().map(|c| {
361 c.iter()
362 .map(|child| LayoutFlatBuffer(child).write_flatbuffer(fbb))
363 .collect::<Vec<_>>()
364 });
365 let children = children.map(|c| fbb.create_vector(&c));
366 let segments = self
367 .0
368 .segments()
369 .map(|m| fbb.create_vector_from_iter(m.iter()));
370
371 layout::Layout::create(
372 fbb,
373 &layout::LayoutArgs {
374 encoding: self.0.encoding(),
375 row_count: self.0.row_count(),
376 metadata,
377 children,
378 segments,
379 },
380 )
381 }
382}