1use std::collections::BTreeSet;
2use std::ops::Deref;
3use std::sync::Arc;
4
5use bytes::Bytes;
6use flatbuffers::{FlatBufferBuilder, Follow, WIPOffset};
7use vortex_array::ArrayContext;
8use vortex_dtype::{DType, FieldMask};
9use vortex_error::{VortexExpect, VortexResult, vortex_bail, vortex_err, vortex_panic};
10use vortex_flatbuffers::{FlatBuffer, FlatBufferRoot, WriteFlatBuffer, layout};
11
12use crate::LayoutId;
13use crate::context::LayoutContext;
14use crate::reader::LayoutReader;
15use crate::segments::{SegmentId, SegmentSource};
16use crate::vtable::LayoutVTableRef;
17
18#[derive(Debug, Clone)]
21pub struct Layout(Inner);
22
23#[derive(Debug, Clone)]
24enum Inner {
25 Owned(OwnedLayout),
26 Viewed(ViewedLayout),
27}
28
29#[derive(Debug, Clone)]
31pub struct OwnedLayout {
32 name: Arc<str>,
33 vtable: LayoutVTableRef,
34 dtype: DType,
35 row_count: u64,
36 segments: Vec<SegmentId>,
37 children: Vec<Layout>,
38 metadata: Option<Bytes>,
39}
40
41#[derive(Debug, Clone)]
43struct ViewedLayout {
44 name: Arc<str>,
45 vtable: LayoutVTableRef,
46 dtype: DType,
47 flatbuffer: FlatBuffer,
48 flatbuffer_loc: usize,
49 ctx: LayoutContext,
50}
51
52impl ViewedLayout {
53 fn flatbuffer(&self) -> layout::Layout<'_> {
55 unsafe { layout::Layout::follow(self.flatbuffer.as_ref(), self.flatbuffer_loc) }
56 }
57}
58
59impl Layout {
60 pub fn new_owned(
62 name: Arc<str>,
63 vtable: LayoutVTableRef,
64 dtype: DType,
65 row_count: u64,
66 segments: Vec<SegmentId>,
67 children: Vec<Layout>,
68 metadata: Option<Bytes>,
69 ) -> Self {
70 Self(Inner::Owned(OwnedLayout {
71 name,
72 vtable,
73 dtype,
74 row_count,
75 segments,
76 children,
77 metadata,
78 }))
79 }
80
81 pub unsafe fn new_viewed_unchecked(
87 name: Arc<str>,
88 encoding: LayoutVTableRef,
89 dtype: DType,
90 flatbuffer: FlatBuffer,
91 flatbuffer_loc: usize,
92 ctx: LayoutContext,
93 ) -> Self {
94 Self(Inner::Viewed(ViewedLayout {
95 name,
96 vtable: encoding,
97 dtype,
98 flatbuffer,
99 flatbuffer_loc,
100 ctx,
101 }))
102 }
103
104 pub fn name(&self) -> &Arc<str> {
106 match &self.0 {
107 Inner::Owned(owned) => &owned.name,
108 Inner::Viewed(viewed) => &viewed.name,
109 }
110 }
111
112 pub fn vtable(&self) -> &LayoutVTableRef {
114 match &self.0 {
115 Inner::Owned(owned) => &owned.vtable,
116 Inner::Viewed(viewed) => &viewed.vtable,
117 }
118 }
119
120 pub fn id(&self) -> LayoutId {
122 self.vtable().id()
123 }
124
125 pub fn row_count(&self) -> u64 {
127 match &self.0 {
128 Inner::Owned(owned) => owned.row_count,
129 Inner::Viewed(viewed) => viewed.flatbuffer().row_count(),
130 }
131 }
132
133 pub fn dtype(&self) -> &DType {
135 match &self.0 {
136 Inner::Owned(owned) => &owned.dtype,
137 Inner::Viewed(viewed) => &viewed.dtype,
138 }
139 }
140
141 pub fn nchildren(&self) -> usize {
143 match &self.0 {
144 Inner::Owned(owned) => owned.children.len(),
145 Inner::Viewed(viewed) => viewed
146 .flatbuffer()
147 .children()
148 .map_or(0, |children| children.len()),
149 }
150 }
151
152 pub fn child(&self, i: usize, dtype: DType, name: impl AsRef<str>) -> VortexResult<Layout> {
158 if i >= self.nchildren() {
159 vortex_panic!("child index out of bounds");
160 }
161 match &self.0 {
162 Inner::Owned(o) => {
163 let child = o.children[i].clone();
164 if child.dtype() != &dtype {
165 vortex_bail!(
166 "Child has dtype {}, but was requested with {}",
167 child.dtype(),
168 dtype
169 );
170 }
171 Ok(child)
172 }
173 Inner::Viewed(v) => {
174 let fb = v
175 .flatbuffer()
176 .children()
177 .vortex_expect("child bounds already checked")
178 .get(i);
179 let encoding = v
180 .ctx
181 .lookup_encoding(fb.encoding())
182 .ok_or_else(|| {
183 vortex_err!("Child layout encoding {} not found", fb.encoding())
184 })?
185 .clone();
186
187 Ok(Self(Inner::Viewed(ViewedLayout {
188 name: format!("{}.{}", v.name, name.as_ref()).into(),
189 vtable: encoding,
190 dtype,
191 flatbuffer: v.flatbuffer.clone(),
192 flatbuffer_loc: fb._tab.loc(),
193 ctx: v.ctx.clone(),
194 })))
195 }
196 }
197 }
198
199 pub fn child_row_count(&self, i: usize) -> u64 {
205 if i >= self.nchildren() {
206 vortex_panic!("child index out of bounds");
207 }
208 match &self.0 {
209 Inner::Owned(o) => o.children[i].row_count(),
210 Inner::Viewed(v) => v
211 .flatbuffer()
212 .children()
213 .vortex_expect("child bounds already checked")
214 .get(i)
215 .row_count(),
216 }
217 }
218
219 pub fn nsegments(&self) -> usize {
221 match &self.0 {
222 Inner::Owned(owned) => owned.segments.len(),
223 Inner::Viewed(viewed) => viewed
224 .flatbuffer()
225 .segments()
226 .map_or(0, |segments| segments.len()),
227 }
228 }
229
230 pub fn segment_id(&self, i: usize) -> Option<SegmentId> {
232 match &self.0 {
233 Inner::Owned(owned) => owned.segments.get(i).copied(),
234 Inner::Viewed(viewed) => viewed
235 .flatbuffer()
236 .segments()
237 .and_then(|segments| (i < segments.len()).then(|| segments.get(i)))
238 .map(SegmentId::from),
239 }
240 }
241
242 pub fn segments(&self) -> impl Iterator<Item = SegmentId> + '_ {
244 (0..self.nsegments()).map(move |i| self.segment_id(i).vortex_expect("segment bounds"))
245 }
246
247 pub fn metadata(&self) -> Option<Bytes> {
249 match &self.0 {
250 Inner::Owned(owned) => owned.metadata.clone(),
251 Inner::Viewed(viewed) => viewed.flatbuffer().metadata().map(|m| {
252 viewed.flatbuffer.as_ref().inner().slice_ref(m.bytes())
254 }),
255 }
256 }
257
258 pub fn reader(
260 &self,
261 segment_source: &Arc<dyn SegmentSource>,
262 ctx: &ArrayContext,
263 ) -> VortexResult<Arc<dyn LayoutReader>> {
264 self.vtable().reader(self.clone(), segment_source, ctx)
265 }
266
267 pub fn register_splits(
269 &self,
270 field_mask: &[FieldMask],
271 row_offset: u64,
272 splits: &mut BTreeSet<u64>,
273 ) -> VortexResult<()> {
274 self.vtable()
275 .register_splits(self, field_mask, row_offset, splits)
276 }
277
278 pub fn flatbuffer_writer<'a>(
280 &'a self,
281 ctx: &'a LayoutContext,
282 ) -> impl WriteFlatBuffer<Target<'a> = layout::Layout<'a>> + FlatBufferRoot + 'a {
283 LayoutFlatBufferWriter { layout: self, ctx }
284 }
285}
286
287struct LayoutFlatBufferWriter<'a> {
289 layout: &'a Layout,
290 ctx: &'a LayoutContext,
291}
292
293impl FlatBufferRoot for LayoutFlatBufferWriter<'_> {}
294
295impl WriteFlatBuffer for LayoutFlatBufferWriter<'_> {
296 type Target<'t> = layout::Layout<'t>;
297
298 fn write_flatbuffer<'fb>(
299 &self,
300 fbb: &mut FlatBufferBuilder<'fb>,
301 ) -> WIPOffset<Self::Target<'fb>> {
302 match &self.layout.0 {
303 Inner::Owned(layout) => {
304 let metadata = layout.metadata.as_ref().map(|b| fbb.create_vector(b));
305
306 let children = (!layout.children.is_empty()).then(|| {
307 layout
308 .children
309 .iter()
310 .map(|c| {
311 LayoutFlatBufferWriter {
312 layout: c,
313 ctx: self.ctx,
314 }
315 .write_flatbuffer(fbb)
316 })
317 .collect::<Vec<_>>()
318 });
319
320 let children = children.map(|c| fbb.create_vector(&c));
321 let segments = (!layout.segments.is_empty()).then(|| {
322 layout
323 .segments
324 .iter()
325 .map(|s| s.deref())
326 .copied()
327 .collect::<Vec<u32>>()
328 });
329 let segments = segments.map(|m| fbb.create_vector(&m));
330
331 let encoding_idx = self.ctx.encoding_idx(&layout.vtable);
332
333 layout::Layout::create(
334 fbb,
335 &layout::LayoutArgs {
336 encoding: encoding_idx,
337 row_count: layout.row_count,
338 metadata,
339 children,
340 segments,
341 },
342 )
343 }
344 Inner::Viewed(layout) => LayoutFlatBuffer(layout.flatbuffer()).write_flatbuffer(fbb),
345 }
346 }
347}
348
349struct LayoutFlatBuffer<'l>(layout::Layout<'l>);
350
351impl WriteFlatBuffer for LayoutFlatBuffer<'_> {
352 type Target<'a> = layout::Layout<'a>;
353
354 fn write_flatbuffer<'fb>(
355 &self,
356 fbb: &mut FlatBufferBuilder<'fb>,
357 ) -> WIPOffset<Self::Target<'fb>> {
358 let metadata = self.0.metadata().map(|m| fbb.create_vector(m.bytes()));
359 let children = self.0.children().map(|c| {
360 c.iter()
361 .map(|child| LayoutFlatBuffer(child).write_flatbuffer(fbb))
362 .collect::<Vec<_>>()
363 });
364 let children = children.map(|c| fbb.create_vector(&c));
365 let segments = self
366 .0
367 .segments()
368 .map(|m| fbb.create_vector_from_iter(m.iter()));
369
370 layout::Layout::create(
371 fbb,
372 &layout::LayoutArgs {
373 encoding: self.0.encoding(),
374 row_count: self.0.row_count(),
375 metadata,
376 children,
377 segments,
378 },
379 )
380 }
381}