vortex_layout/scan/
split_by.rs

use std::collections::BTreeSet;
use std::ops::Range;

use itertools::Itertools;
use vortex_dtype::FieldMask;
use vortex_error::{VortexResult, vortex_bail};

use crate::Layout;
9
/// Defines how a Vortex layout is split into row ranges ("splits") for reading.
///
/// Note that each split must fit into the platform's maximum `usize`.
#[derive(Default, Debug, Copy, Clone, PartialEq, Eq)]
pub enum SplitBy {
    /// Splits any time there is a chunk boundary in the file.
    #[default]
    Layout,
    /// Splits every `n` rows; the final split may contain fewer than `n` rows.
    ///
    /// `n` must be greater than zero.
    RowCount(usize),
    // Possible future variant (not implemented): `UncompressedSize(u64)`.
}
22
23impl SplitBy {
24    /// Compute the splits for the given layout.
25    pub(crate) fn splits(
26        &self,
27        layout: &Layout,
28        field_mask: &[FieldMask],
29    ) -> VortexResult<Vec<Range<u64>>> {
30        Ok(match *self {
31            SplitBy::Layout => {
32                let mut row_splits = BTreeSet::<u64>::new();
33                // Make sure we always have the first and last row.
34                row_splits.insert(0);
35                row_splits.insert(layout.row_count());
36                // Register the splits for all the layouts.
37                layout.register_splits(field_mask, 0, &mut row_splits)?;
38
39                row_splits
40                    .into_iter()
41                    .tuple_windows()
42                    .map(|(start, end)| start..end)
43                    .collect()
44            }
45            SplitBy::RowCount(n) => {
46                let row_count = layout.row_count();
47                let mut splits =
48                    Vec::with_capacity(usize::try_from((row_count + n as u64) / n as u64)?);
49                for start in (0..row_count).step_by(n) {
50                    let end = (start + n as u64).min(row_count);
51                    splits.push(start..end);
52                }
53                splits
54            }
55        })
56    }
57}
58
#[cfg(test)]
mod test {
    use vortex_array::{ArrayContext, IntoArray};
    use vortex_buffer::buffer;
    use vortex_dtype::Nullability::NonNullable;
    use vortex_dtype::{DType, FieldPath, PType};

    use super::*;
    use crate::LayoutWriterExt;
    use crate::layouts::flat::writer::FlatLayoutWriter;
    use crate::segments::TestSegments;

    /// Build a single flat layout holding ten non-nullable `i32` values.
    ///
    /// The declared dtype matches the data pushed (`buffer![1i32; 10]`).
    fn flat_layout() -> Layout {
        let mut segments = TestSegments::default();
        FlatLayoutWriter::new(
            ArrayContext::empty(),
            DType::Primitive(PType::I32, NonNullable),
            Default::default(),
        )
        .push_one(&mut segments, buffer![1i32; 10].into_array())
        .unwrap()
    }

    /// Field mask selecting the whole (root) field.
    fn root_mask() -> Vec<FieldMask> {
        vec![FieldMask::Exact(FieldPath::root())]
    }

    #[test]
    fn test_layout_splits_flat() {
        let splits = SplitBy::Layout
            .splits(&flat_layout(), &root_mask())
            .unwrap();
        assert_eq!(splits, vec![0..10]);
    }

    #[test]
    fn test_row_count_splits() {
        let splits = SplitBy::RowCount(3)
            .splits(&flat_layout(), &root_mask())
            .unwrap();
        assert_eq!(splits, vec![0..3, 3..6, 6..9, 9..10]);
    }

    #[test]
    fn test_row_count_splits_exact_multiple() {
        let splits = SplitBy::RowCount(5)
            .splits(&flat_layout(), &root_mask())
            .unwrap();
        assert_eq!(splits, vec![0..5, 5..10]);
    }

    #[test]
    fn test_row_count_splits_single_rows() {
        let splits = SplitBy::RowCount(1)
            .splits(&flat_layout(), &root_mask())
            .unwrap();
        let expected: Vec<_> = (0..10u64).map(|i| i..i + 1).collect();
        assert_eq!(splits, expected);
    }

    #[test]
    fn test_row_count_splits_larger_than_layout() {
        let splits = SplitBy::RowCount(100)
            .splits(&flat_layout(), &root_mask())
            .unwrap();
        assert_eq!(splits, vec![0..10]);
    }
}