// vortex_serde/file/read/layouts/columnar.rs

1use std::collections::BTreeSet;
2use std::sync::Arc;
3
4use bytes::Bytes;
5use itertools::Itertools;
6use vortex_dtype::field::Field;
7use vortex_dtype::DType;
8use vortex_error::{vortex_bail, vortex_err, vortex_panic, VortexResult};
9use vortex_expr::{Column, Select};
10use vortex_flatbuffers::footer;
11
12use crate::file::read::cache::{LazilyDeserializedDType, RelativeLayoutCache};
13use crate::file::read::column_batch::ColumnBatchReader;
14use crate::file::read::expr_project::expr_project;
15use crate::file::read::mask::RowMask;
16use crate::file::{
17    BatchRead, LayoutDeserializer, LayoutId, LayoutReader, LayoutSpec, RowFilter, Scan,
18    COLUMNAR_LAYOUT_ID,
19};
20
21#[derive(Debug)]
22pub struct ColumnarLayoutSpec;
23
24impl LayoutSpec for ColumnarLayoutSpec {
25    fn id(&self) -> LayoutId {
26        COLUMNAR_LAYOUT_ID
27    }
28
29    fn layout_reader(
30        &self,
31        fb_bytes: Bytes,
32        fb_loc: usize,
33        scan: Scan,
34        layout_serde: LayoutDeserializer,
35        message_cache: RelativeLayoutCache,
36    ) -> VortexResult<Box<dyn LayoutReader>> {
37        Ok(Box::new(ColumnarLayout::new(
38            fb_bytes,
39            fb_loc,
40            scan,
41            layout_serde,
42            message_cache,
43        )))
44    }
45}
46
47/// In memory representation of Columnar NestedLayout.
48///
49/// Each child represents a column
50#[derive(Debug)]
51pub struct ColumnarLayout {
52    fb_bytes: Bytes,
53    fb_loc: usize,
54    scan: Scan,
55    layout_serde: LayoutDeserializer,
56    message_cache: RelativeLayoutCache,
57    reader: Option<ColumnBatchReader>,
58}
59
60impl ColumnarLayout {
61    pub fn new(
62        fb_bytes: Bytes,
63        fb_loc: usize,
64        scan: Scan,
65        layout_serde: LayoutDeserializer,
66        message_cache: RelativeLayoutCache,
67    ) -> Self {
68        Self {
69            fb_bytes,
70            fb_loc,
71            scan,
72            layout_serde,
73            message_cache,
74            reader: None,
75        }
76    }
77
78    fn flatbuffer(&self) -> footer::Layout {
79        unsafe {
80            let tab = flatbuffers::Table::new(&self.fb_bytes, self.fb_loc);
81            footer::Layout::init_from_table(tab)
82        }
83    }
84
85    /// Perform minimal amount of work to construct children that can be queried for splits
86    fn children_for_splits(&self) -> VortexResult<Vec<Box<dyn LayoutReader>>> {
87        let (refs, lazy_dtype) = self.fields_with_dtypes()?;
88        let fb_children = self.flatbuffer().children().unwrap_or_default();
89
90        refs.into_iter()
91            .map(|field| {
92                let resolved_child = lazy_dtype.resolve_field(&field)?;
93                let child_loc = fb_children.get(resolved_child)._tab.loc();
94
95                self.layout_serde.read_layout(
96                    self.fb_bytes.clone(),
97                    child_loc,
98                    Scan::new(None),
99                    self.message_cache.unknown_dtype(resolved_child as u16),
100                )
101            })
102            .collect::<VortexResult<Vec<_>>>()
103    }
104
105    fn column_reader(&self) -> VortexResult<ColumnBatchReader> {
106        let (refs, lazy_dtype) = self.fields_with_dtypes()?;
107        let fb_children = self.flatbuffer().children().unwrap_or_default();
108
109        let filter_dtype = lazy_dtype.value()?;
110        let DType::Struct(s, ..) = filter_dtype else {
111            vortex_bail!("Column layout must have struct dtype")
112        };
113
114        let mut unhandled_names = Vec::new();
115        let mut unhandled_children = Vec::new();
116        let mut handled_children = Vec::new();
117        let mut handled_names = Vec::new();
118
119        for (field, (name, dtype)) in refs
120            .into_iter()
121            .zip_eq(s.names().iter().cloned().zip_eq(s.dtypes().iter().cloned()))
122        {
123            let resolved_child = lazy_dtype.resolve_field(&field)?;
124            let child_loc = fb_children.get(resolved_child)._tab.loc();
125            let projected_expr = self
126                .scan
127                .expr
128                .as_ref()
129                .and_then(|e| expr_project(e, &[field]));
130
131            let handled =
132                self.scan.expr.is_none() || (self.scan.expr.is_some() && projected_expr.is_some());
133
134            let child = self.layout_serde.read_layout(
135                self.fb_bytes.clone(),
136                child_loc,
137                Scan::new(projected_expr),
138                self.message_cache.relative(
139                    resolved_child as u16,
140                    Arc::new(LazilyDeserializedDType::from_dtype(dtype)),
141                ),
142            )?;
143
144            if handled {
145                handled_children.push(child);
146                handled_names.push(name);
147            } else {
148                unhandled_children.push(child);
149                unhandled_names.push(name);
150            }
151        }
152
153        if !unhandled_names.is_empty() {
154            let prf = self
155                .scan
156                .expr
157                .as_ref()
158                .and_then(|e| {
159                    expr_project(
160                        e,
161                        &unhandled_names
162                            .iter()
163                            .map(|n| Field::from(n.as_ref()))
164                            .collect::<Vec<_>>(),
165                    )
166                })
167                .ok_or_else(|| {
168                    vortex_err!(
169                        "Must be able to project {:?} filter into unhandled space {}",
170                        self.scan.expr.as_ref(),
171                        unhandled_names.iter().format(",")
172                    )
173                })?;
174
175            handled_children.push(Box::new(ColumnBatchReader::new(
176                unhandled_names.into(),
177                unhandled_children,
178                Some(prf),
179                false,
180            )));
181            handled_names.push("__unhandled".into());
182        }
183
184        let top_level_expr = self
185            .scan
186            .expr
187            .as_ref()
188            .map(|e| e.as_any().downcast_ref::<RowFilter>().is_some())
189            .unwrap_or(false)
190            .then(|| {
191                Arc::new(RowFilter::from_conjunction(
192                    handled_names
193                        .iter()
194                        .map(|f| Arc::new(Column::new(Field::from(&**f))) as _)
195                        .collect(),
196                )) as _
197            });
198        let shortcircuit_siblings = top_level_expr.is_some();
199        Ok(ColumnBatchReader::new(
200            handled_names.into(),
201            handled_children,
202            top_level_expr,
203            shortcircuit_siblings,
204        ))
205    }
206
207    /// Get fields referenced by scan expression along with their dtype
208    fn fields_with_dtypes(&self) -> VortexResult<(Vec<Field>, Arc<LazilyDeserializedDType>)> {
209        let fb_children = self.flatbuffer().children().unwrap_or_default();
210        let field_refs = self.scan_fields();
211        let lazy_dtype = field_refs
212            .as_ref()
213            .map(|e| self.message_cache.dtype().project(e))
214            .unwrap_or_else(|| Ok(self.message_cache.dtype().clone()))?;
215
216        Ok((
217            field_refs.unwrap_or_else(|| (0..fb_children.len()).map(Field::from).collect()),
218            lazy_dtype,
219        ))
220    }
221
222    /// Get fields referenced by scan expression preserving order if we're using select to project
223    fn scan_fields(&self) -> Option<Vec<Field>> {
224        self.scan.expr.as_ref().map(|e| {
225            if let Some(se) = e.as_any().downcast_ref::<Select>() {
226                match se {
227                    Select::Include(i) => i.clone(),
228                    Select::Exclude(_) => vortex_panic!("Select::Exclude is not supported"),
229                }
230            } else {
231                e.references().into_iter().cloned().collect::<Vec<_>>()
232            }
233        })
234    }
235}
236
237impl LayoutReader for ColumnarLayout {
238    fn add_splits(&self, row_offset: usize, splits: &mut BTreeSet<usize>) -> VortexResult<()> {
239        for child in self.children_for_splits()? {
240            child.add_splits(row_offset, splits)?
241        }
242        Ok(())
243    }
244
245    fn read_selection(&mut self, selector: &RowMask) -> VortexResult<Option<BatchRead>> {
246        if let Some(r) = &mut self.reader {
247            r.read_selection(selector)
248        } else {
249            self.reader = Some(self.column_reader()?);
250            self.read_selection(selector)
251        }
252    }
253}
254
255#[cfg(test)]
256mod tests {
257    use std::iter;
258    use std::sync::{Arc, RwLock};
259
260    use bytes::Bytes;
261    use vortex_array::accessor::ArrayAccessor;
262    use vortex_array::array::{ChunkedArray, PrimitiveArray, StructArray, VarBinArray};
263    use vortex_array::validity::Validity;
264    use vortex_array::{ArrayDType, IntoArray, IntoArrayVariant};
265    use vortex_dtype::field::Field;
266    use vortex_dtype::{DType, Nullability};
267    use vortex_expr::{BinaryExpr, Column, Literal, Operator};
268
269    use crate::file::read::builder::initial_read::{read_initial_bytes, read_layout_from_initial};
270    use crate::file::read::cache::RelativeLayoutCache;
271    use crate::file::read::layouts::test_read::{filter_read_layout, read_layout};
272    use crate::file::{
273        LayoutDeserializer, LayoutMessageCache, LayoutReader, RowFilter, Scan, VortexFileWriter,
274    };
275
276    async fn layout_and_bytes(
277        cache: Arc<RwLock<LayoutMessageCache>>,
278        scan: Scan,
279    ) -> (Box<dyn LayoutReader>, Box<dyn LayoutReader>, Bytes, usize) {
280        let int_array = PrimitiveArray::from((0..100).collect::<Vec<_>>()).into_array();
281        let int_dtype = int_array.dtype().clone();
282        let chunked = ChunkedArray::try_new(iter::repeat(int_array).take(5).collect(), int_dtype)
283            .unwrap()
284            .into_array();
285        let str_array = VarBinArray::from_vec(
286            iter::repeat("test text").take(500).collect(),
287            DType::Utf8(Nullability::NonNullable),
288        )
289        .into_array();
290        let len = chunked.len();
291        let struct_arr = StructArray::try_new(
292            vec!["ints".into(), "strs".into()].into(),
293            vec![chunked, str_array],
294            len,
295            Validity::NonNullable,
296        )
297        .unwrap()
298        .into_array();
299
300        let mut writer = VortexFileWriter::new(Vec::new());
301        writer = writer.write_array_columns(struct_arr).await.unwrap();
302        let written = writer.finalize().await.unwrap();
303
304        let initial_read = read_initial_bytes(&written, written.len() as u64)
305            .await
306            .unwrap();
307        let layout_serde = LayoutDeserializer::default();
308
309        let dtype = Arc::new(initial_read.lazy_dtype().unwrap());
310        (
311            read_layout_from_initial(
312                &initial_read,
313                &layout_serde,
314                scan,
315                RelativeLayoutCache::new(cache.clone(), dtype.clone()),
316            )
317            .unwrap(),
318            read_layout_from_initial(
319                &initial_read,
320                &layout_serde,
321                Scan::new(None),
322                RelativeLayoutCache::new(cache.clone(), dtype),
323            )
324            .unwrap(),
325            Bytes::from(written),
326            len,
327        )
328    }
329
330    #[tokio::test]
331    #[cfg_attr(miri, ignore)]
332    async fn read_range() {
333        let cache = Arc::new(RwLock::new(LayoutMessageCache::default()));
334        let (mut filter_layout, mut project_layout, buf, length) = layout_and_bytes(
335            cache.clone(),
336            Scan::new(Some(Arc::new(RowFilter::new(Arc::new(BinaryExpr::new(
337                Arc::new(Column::new(Field::from("ints"))),
338                Operator::Gt,
339                Arc::new(Literal::new(10.into())),
340            )))))),
341        )
342        .await;
343        let arr = filter_read_layout(
344            filter_layout.as_mut(),
345            project_layout.as_mut(),
346            cache,
347            &buf,
348            length,
349        )
350        .pop_front();
351
352        assert!(arr.is_some());
353        let prim_arr = arr
354            .as_ref()
355            .unwrap()
356            .with_dyn(|a| a.as_struct_array_unchecked().field(0))
357            .unwrap()
358            .into_primitive()
359            .unwrap();
360        let str_arr = arr
361            .as_ref()
362            .unwrap()
363            .with_dyn(|a| a.as_struct_array_unchecked().field(1))
364            .unwrap()
365            .into_varbinview()
366            .unwrap();
367        assert_eq!(
368            prim_arr.maybe_null_slice::<i32>(),
369            &(11..100).collect::<Vec<_>>()
370        );
371        assert_eq!(
372            str_arr
373                .with_iterator(|iter| iter
374                    .flatten()
375                    .map(|s| unsafe { String::from_utf8_unchecked(s.to_vec()) })
376                    .collect::<Vec<_>>())
377                .unwrap(),
378            iter::repeat("test text").take(89).collect::<Vec<_>>()
379        );
380    }
381
382    #[tokio::test]
383    #[cfg_attr(miri, ignore)]
384    async fn read_range_no_filter() {
385        let cache = Arc::new(RwLock::new(LayoutMessageCache::default()));
386        let (_, mut project_layout, buf, length) =
387            layout_and_bytes(cache.clone(), Scan::new(None)).await;
388        let arr = read_layout(project_layout.as_mut(), cache, &buf, length).pop_front();
389
390        assert!(arr.is_some());
391        let prim_arr = arr
392            .as_ref()
393            .unwrap()
394            .with_dyn(|a| a.as_struct_array_unchecked().field(0))
395            .unwrap()
396            .into_primitive()
397            .unwrap();
398        let str_arr = arr
399            .as_ref()
400            .unwrap()
401            .with_dyn(|a| a.as_struct_array_unchecked().field(1))
402            .unwrap()
403            .into_varbinview()
404            .unwrap();
405        assert_eq!(
406            prim_arr.maybe_null_slice::<i32>(),
407            (0..100).collect::<Vec<_>>()
408        );
409        assert_eq!(
410            str_arr
411                .with_iterator(|iter| iter
412                    .flatten()
413                    .map(|s| unsafe { String::from_utf8_unchecked(s.to_vec()) })
414                    .collect::<Vec<_>>())
415                .unwrap(),
416            iter::repeat("test text").take(100).collect::<Vec<_>>()
417        );
418    }
419}