Skip to main content

vortex_layout/scan/
split_by.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::collections::BTreeSet;
5use std::iter::once;
6use std::ops::Range;
7
8use vortex_array::dtype::FieldMask;
9use vortex_error::VortexResult;
10
11use crate::LayoutReader;
12
13/// Defines how the Vortex file is split into batches for reading.
14///
15/// Note that each split must fit into the platform's maximum usize.
16#[derive(Default, Copy, Clone, Debug)]
17pub enum SplitBy {
18    #[default]
19    /// Splits any time there is a chunk boundary in the file.
20    Layout,
21    /// Splits every n rows.
22    RowCount(usize),
23    // UncompressedSize(u64),
24}
25
26impl SplitBy {
27    /// Compute the splits for the given layout.
28    // TODO(ngates): remove this once layout readers are stream based.
29    pub fn splits(
30        &self,
31        layout_reader: &dyn LayoutReader,
32        row_range: &Range<u64>,
33        field_mask: &[FieldMask],
34    ) -> VortexResult<BTreeSet<u64>> {
35        Ok(match *self {
36            SplitBy::Layout => {
37                let mut row_splits = BTreeSet::<u64>::new();
38                row_splits.insert(row_range.start);
39
40                // Register all splits in the row range for all layouts that are needed
41                // to read the field mask.
42                layout_reader.register_splits(field_mask, row_range, &mut row_splits)?;
43                row_splits
44            }
45            SplitBy::RowCount(n) => row_range
46                .clone()
47                .step_by(n)
48                .chain(once(row_range.end))
49                .collect(),
50        })
51    }
52}
53
54#[cfg(test)]
55mod test {
56    use std::sync::Arc;
57
58    use vortex_array::ArrayContext;
59    use vortex_array::IntoArray;
60    use vortex_array::dtype::FieldPath;
61    use vortex_buffer::buffer;
62    use vortex_io::runtime::single::block_on;
63    use vortex_io::session::RuntimeSessionExt;
64
65    use super::*;
66    use crate::LayoutReaderRef;
67    use crate::LayoutStrategy;
68    use crate::layouts::flat::writer::FlatLayoutStrategy;
69    use crate::scan::test::SCAN_SESSION;
70    use crate::segments::TestSegments;
71    use crate::sequence::SequenceId;
72    use crate::sequence::SequentialArrayStreamExt;
73
74    fn reader() -> LayoutReaderRef {
75        let ctx = ArrayContext::empty();
76        let segments = Arc::new(TestSegments::default());
77        let (ptr, eof) = SequenceId::root().split();
78        let layout = block_on(|handle| async {
79            let session = SCAN_SESSION.clone().with_handle(handle);
80            FlatLayoutStrategy::default()
81                .write_stream(
82                    ctx,
83                    Arc::<TestSegments>::clone(&segments),
84                    buffer![1_i32; 10]
85                        .into_array()
86                        .to_array_stream()
87                        .sequenced(ptr),
88                    eof,
89                    &session,
90                )
91                .await
92        })
93        .unwrap();
94
95        layout
96            .new_reader("".into(), segments, &SCAN_SESSION)
97            .unwrap()
98    }
99
100    #[test]
101    fn test_layout_splits_flat() {
102        let reader = reader();
103
104        let splits = SplitBy::Layout
105            .splits(
106                reader.as_ref(),
107                &(0..10),
108                &[FieldMask::Exact(FieldPath::root())],
109            )
110            .unwrap();
111        assert_eq!(splits, [0, 10].into_iter().collect());
112    }
113
114    #[test]
115    fn test_row_count_splits() {
116        let reader = reader();
117
118        let splits = SplitBy::RowCount(3)
119            .splits(
120                reader.as_ref(),
121                &(0..10),
122                &[FieldMask::Exact(FieldPath::root())],
123            )
124            .unwrap();
125        assert_eq!(splits, [0, 3, 6, 9, 10].into_iter().collect());
126    }
127}