vortex_serde/file/read/layouts/
columnar.rs1use std::collections::BTreeSet;
2use std::sync::Arc;
3
4use bytes::Bytes;
5use itertools::Itertools;
6use vortex_dtype::field::Field;
7use vortex_dtype::DType;
8use vortex_error::{vortex_bail, vortex_err, vortex_panic, VortexResult};
9use vortex_expr::{Column, Select};
10use vortex_flatbuffers::footer;
11
12use crate::file::read::cache::{LazilyDeserializedDType, RelativeLayoutCache};
13use crate::file::read::column_batch::ColumnBatchReader;
14use crate::file::read::expr_project::expr_project;
15use crate::file::read::mask::RowMask;
16use crate::file::{
17 BatchRead, LayoutDeserializer, LayoutId, LayoutReader, LayoutSpec, RowFilter, Scan,
18 COLUMNAR_LAYOUT_ID,
19};
20
21#[derive(Debug)]
22pub struct ColumnarLayoutSpec;
23
24impl LayoutSpec for ColumnarLayoutSpec {
25 fn id(&self) -> LayoutId {
26 COLUMNAR_LAYOUT_ID
27 }
28
29 fn layout_reader(
30 &self,
31 fb_bytes: Bytes,
32 fb_loc: usize,
33 scan: Scan,
34 layout_serde: LayoutDeserializer,
35 message_cache: RelativeLayoutCache,
36 ) -> VortexResult<Box<dyn LayoutReader>> {
37 Ok(Box::new(ColumnarLayout::new(
38 fb_bytes,
39 fb_loc,
40 scan,
41 layout_serde,
42 message_cache,
43 )))
44 }
45}
46
47#[derive(Debug)]
51pub struct ColumnarLayout {
52 fb_bytes: Bytes,
53 fb_loc: usize,
54 scan: Scan,
55 layout_serde: LayoutDeserializer,
56 message_cache: RelativeLayoutCache,
57 reader: Option<ColumnBatchReader>,
58}
59
60impl ColumnarLayout {
61 pub fn new(
62 fb_bytes: Bytes,
63 fb_loc: usize,
64 scan: Scan,
65 layout_serde: LayoutDeserializer,
66 message_cache: RelativeLayoutCache,
67 ) -> Self {
68 Self {
69 fb_bytes,
70 fb_loc,
71 scan,
72 layout_serde,
73 message_cache,
74 reader: None,
75 }
76 }
77
78 fn flatbuffer(&self) -> footer::Layout {
79 unsafe {
80 let tab = flatbuffers::Table::new(&self.fb_bytes, self.fb_loc);
81 footer::Layout::init_from_table(tab)
82 }
83 }
84
85 fn children_for_splits(&self) -> VortexResult<Vec<Box<dyn LayoutReader>>> {
87 let (refs, lazy_dtype) = self.fields_with_dtypes()?;
88 let fb_children = self.flatbuffer().children().unwrap_or_default();
89
90 refs.into_iter()
91 .map(|field| {
92 let resolved_child = lazy_dtype.resolve_field(&field)?;
93 let child_loc = fb_children.get(resolved_child)._tab.loc();
94
95 self.layout_serde.read_layout(
96 self.fb_bytes.clone(),
97 child_loc,
98 Scan::new(None),
99 self.message_cache.unknown_dtype(resolved_child as u16),
100 )
101 })
102 .collect::<VortexResult<Vec<_>>>()
103 }
104
105 fn column_reader(&self) -> VortexResult<ColumnBatchReader> {
106 let (refs, lazy_dtype) = self.fields_with_dtypes()?;
107 let fb_children = self.flatbuffer().children().unwrap_or_default();
108
109 let filter_dtype = lazy_dtype.value()?;
110 let DType::Struct(s, ..) = filter_dtype else {
111 vortex_bail!("Column layout must have struct dtype")
112 };
113
114 let mut unhandled_names = Vec::new();
115 let mut unhandled_children = Vec::new();
116 let mut handled_children = Vec::new();
117 let mut handled_names = Vec::new();
118
119 for (field, (name, dtype)) in refs
120 .into_iter()
121 .zip_eq(s.names().iter().cloned().zip_eq(s.dtypes().iter().cloned()))
122 {
123 let resolved_child = lazy_dtype.resolve_field(&field)?;
124 let child_loc = fb_children.get(resolved_child)._tab.loc();
125 let projected_expr = self
126 .scan
127 .expr
128 .as_ref()
129 .and_then(|e| expr_project(e, &[field]));
130
131 let handled =
132 self.scan.expr.is_none() || (self.scan.expr.is_some() && projected_expr.is_some());
133
134 let child = self.layout_serde.read_layout(
135 self.fb_bytes.clone(),
136 child_loc,
137 Scan::new(projected_expr),
138 self.message_cache.relative(
139 resolved_child as u16,
140 Arc::new(LazilyDeserializedDType::from_dtype(dtype)),
141 ),
142 )?;
143
144 if handled {
145 handled_children.push(child);
146 handled_names.push(name);
147 } else {
148 unhandled_children.push(child);
149 unhandled_names.push(name);
150 }
151 }
152
153 if !unhandled_names.is_empty() {
154 let prf = self
155 .scan
156 .expr
157 .as_ref()
158 .and_then(|e| {
159 expr_project(
160 e,
161 &unhandled_names
162 .iter()
163 .map(|n| Field::from(n.as_ref()))
164 .collect::<Vec<_>>(),
165 )
166 })
167 .ok_or_else(|| {
168 vortex_err!(
169 "Must be able to project {:?} filter into unhandled space {}",
170 self.scan.expr.as_ref(),
171 unhandled_names.iter().format(",")
172 )
173 })?;
174
175 handled_children.push(Box::new(ColumnBatchReader::new(
176 unhandled_names.into(),
177 unhandled_children,
178 Some(prf),
179 false,
180 )));
181 handled_names.push("__unhandled".into());
182 }
183
184 let top_level_expr = self
185 .scan
186 .expr
187 .as_ref()
188 .map(|e| e.as_any().downcast_ref::<RowFilter>().is_some())
189 .unwrap_or(false)
190 .then(|| {
191 Arc::new(RowFilter::from_conjunction(
192 handled_names
193 .iter()
194 .map(|f| Arc::new(Column::new(Field::from(&**f))) as _)
195 .collect(),
196 )) as _
197 });
198 let shortcircuit_siblings = top_level_expr.is_some();
199 Ok(ColumnBatchReader::new(
200 handled_names.into(),
201 handled_children,
202 top_level_expr,
203 shortcircuit_siblings,
204 ))
205 }
206
207 fn fields_with_dtypes(&self) -> VortexResult<(Vec<Field>, Arc<LazilyDeserializedDType>)> {
209 let fb_children = self.flatbuffer().children().unwrap_or_default();
210 let field_refs = self.scan_fields();
211 let lazy_dtype = field_refs
212 .as_ref()
213 .map(|e| self.message_cache.dtype().project(e))
214 .unwrap_or_else(|| Ok(self.message_cache.dtype().clone()))?;
215
216 Ok((
217 field_refs.unwrap_or_else(|| (0..fb_children.len()).map(Field::from).collect()),
218 lazy_dtype,
219 ))
220 }
221
222 fn scan_fields(&self) -> Option<Vec<Field>> {
224 self.scan.expr.as_ref().map(|e| {
225 if let Some(se) = e.as_any().downcast_ref::<Select>() {
226 match se {
227 Select::Include(i) => i.clone(),
228 Select::Exclude(_) => vortex_panic!("Select::Exclude is not supported"),
229 }
230 } else {
231 e.references().into_iter().cloned().collect::<Vec<_>>()
232 }
233 })
234 }
235}
236
237impl LayoutReader for ColumnarLayout {
238 fn add_splits(&self, row_offset: usize, splits: &mut BTreeSet<usize>) -> VortexResult<()> {
239 for child in self.children_for_splits()? {
240 child.add_splits(row_offset, splits)?
241 }
242 Ok(())
243 }
244
245 fn read_selection(&mut self, selector: &RowMask) -> VortexResult<Option<BatchRead>> {
246 if let Some(r) = &mut self.reader {
247 r.read_selection(selector)
248 } else {
249 self.reader = Some(self.column_reader()?);
250 self.read_selection(selector)
251 }
252 }
253}
254
#[cfg(test)]
mod tests {
    use std::iter;
    use std::sync::{Arc, RwLock};

    use bytes::Bytes;
    use vortex_array::accessor::ArrayAccessor;
    use vortex_array::array::{ChunkedArray, PrimitiveArray, StructArray, VarBinArray};
    use vortex_array::validity::Validity;
    use vortex_array::{ArrayDType, IntoArray, IntoArrayVariant};
    use vortex_dtype::field::Field;
    use vortex_dtype::{DType, Nullability};
    use vortex_expr::{BinaryExpr, Column, Literal, Operator};

    use crate::file::read::builder::initial_read::{read_initial_bytes, read_layout_from_initial};
    use crate::file::read::cache::RelativeLayoutCache;
    use crate::file::read::layouts::test_read::{filter_read_layout, read_layout};
    use crate::file::{
        LayoutDeserializer, LayoutMessageCache, LayoutReader, RowFilter, Scan, VortexFileWriter,
    };

    /// Writes a two-column struct array and reads its root layout back twice.
    ///
    /// The columns are `ints` (five chunks of `0..100`) and `strs` (500 copies of
    /// `"test text"`), written with `write_array_columns`.
    ///
    /// Returns `(layout read with scan, layout read with Scan::new(None), file bytes, row count)`.
    /// Both layouts share `cache`.
    async fn layout_and_bytes(
        cache: Arc<RwLock<LayoutMessageCache>>,
        scan: Scan,
    ) -> (Box<dyn LayoutReader>, Box<dyn LayoutReader>, Bytes, usize) {
        let int_array = PrimitiveArray::from((0..100).collect::<Vec<_>>()).into_array();
        let int_dtype = int_array.dtype().clone();
        // Five identical chunks of 0..100, i.e. 500 rows in total.
        let chunked = ChunkedArray::try_new(iter::repeat(int_array).take(5).collect(), int_dtype)
            .unwrap()
            .into_array();
        let str_array = VarBinArray::from_vec(
            iter::repeat("test text").take(500).collect(),
            DType::Utf8(Nullability::NonNullable),
        )
        .into_array();
        let len = chunked.len();
        let struct_arr = StructArray::try_new(
            vec!["ints".into(), "strs".into()].into(),
            vec![chunked, str_array],
            len,
            Validity::NonNullable,
        )
        .unwrap()
        .into_array();

        let mut writer = VortexFileWriter::new(Vec::new());
        writer = writer.write_array_columns(struct_arr).await.unwrap();
        let written = writer.finalize().await.unwrap();

        let initial_read = read_initial_bytes(&written, written.len() as u64)
            .await
            .unwrap();
        let layout_serde = LayoutDeserializer::default();

        let dtype = Arc::new(initial_read.lazy_dtype().unwrap());
        (
            // Layout read with the caller's scan (used as the filter layout).
            read_layout_from_initial(
                &initial_read,
                &layout_serde,
                scan,
                RelativeLayoutCache::new(cache.clone(), dtype.clone()),
            )
            .unwrap(),
            // Layout read without any expression (used as the projection layout).
            read_layout_from_initial(
                &initial_read,
                &layout_serde,
                Scan::new(None),
                RelativeLayoutCache::new(cache.clone(), dtype),
            )
            .unwrap(),
            Bytes::from(written),
            len,
        )
    }

    /// Filters with `ints > 10` and checks only the first returned batch (`pop_front`).
    ///
    /// The expected values (ints `11..100`, 89 strings) correspond to the first 100-row chunk.
    #[tokio::test]
    #[cfg_attr(miri, ignore)]
    async fn read_range() {
        let cache = Arc::new(RwLock::new(LayoutMessageCache::default()));
        let (mut filter_layout, mut project_layout, buf, length) = layout_and_bytes(
            cache.clone(),
            Scan::new(Some(Arc::new(RowFilter::new(Arc::new(BinaryExpr::new(
                Arc::new(Column::new(Field::from("ints"))),
                Operator::Gt,
                Arc::new(Literal::new(10.into())),
            )))))),
        )
        .await;
        let arr = filter_read_layout(
            filter_layout.as_mut(),
            project_layout.as_mut(),
            cache,
            &buf,
            length,
        )
        .pop_front();

        assert!(arr.is_some());
        // Struct field 0 is `ints`, field 1 is `strs`.
        let prim_arr = arr
            .as_ref()
            .unwrap()
            .with_dyn(|a| a.as_struct_array_unchecked().field(0))
            .unwrap()
            .into_primitive()
            .unwrap();
        let str_arr = arr
            .as_ref()
            .unwrap()
            .with_dyn(|a| a.as_struct_array_unchecked().field(1))
            .unwrap()
            .into_varbinview()
            .unwrap();
        assert_eq!(
            prim_arr.maybe_null_slice::<i32>(),
            &(11..100).collect::<Vec<_>>()
        );
        assert_eq!(
            str_arr
                .with_iterator(|iter| iter
                    .flatten()
                    .map(|s| unsafe { String::from_utf8_unchecked(s.to_vec()) })
                    .collect::<Vec<_>>())
                .unwrap(),
            iter::repeat("test text").take(89).collect::<Vec<_>>()
        );
    }

    /// Reads without a filter and checks that the first batch holds the full first chunk
    /// (ints `0..100`, 100 strings).
    #[tokio::test]
    #[cfg_attr(miri, ignore)]
    async fn read_range_no_filter() {
        let cache = Arc::new(RwLock::new(LayoutMessageCache::default()));
        let (_, mut project_layout, buf, length) =
            layout_and_bytes(cache.clone(), Scan::new(None)).await;
        let arr = read_layout(project_layout.as_mut(), cache, &buf, length).pop_front();

        assert!(arr.is_some());
        let prim_arr = arr
            .as_ref()
            .unwrap()
            .with_dyn(|a| a.as_struct_array_unchecked().field(0))
            .unwrap()
            .into_primitive()
            .unwrap();
        let str_arr = arr
            .as_ref()
            .unwrap()
            .with_dyn(|a| a.as_struct_array_unchecked().field(1))
            .unwrap()
            .into_varbinview()
            .unwrap();
        assert_eq!(
            prim_arr.maybe_null_slice::<i32>(),
            (0..100).collect::<Vec<_>>()
        );
        assert_eq!(
            str_arr
                .with_iterator(|iter| iter
                    .flatten()
                    .map(|s| unsafe { String::from_utf8_unchecked(s.to_vec()) })
                    .collect::<Vec<_>>())
                .unwrap(),
            iter::repeat("test text").take(100).collect::<Vec<_>>()
        );
    }
}