Skip to main content

vortex_layout/scan/
split_by.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::iter::once;
5use std::ops::Range;
6
7use vortex_array::dtype::FieldMask;
8use vortex_error::VortexResult;
9
10use crate::LayoutReader;
11use crate::RowSplits;
12use crate::SplitRange;
13
/// Strategy for dividing a Vortex file into the batches that are read from it.
///
/// Note that each split must fit into the platform's maximum usize.
#[derive(Debug, Clone, Copy, Default)]
pub enum SplitBy {
    /// Split wherever the file has a chunk boundary. This is the default strategy.
    #[default]
    Layout,
    /// Split after every `n` rows, where `n` is the wrapped value.
    RowCount(usize),
    // UncompressedSize(u64),
}
26
27impl SplitBy {
28    /// Compute the splits for the given layout.
29    // TODO(ngates): remove this once layout readers are stream based.
30    pub fn splits(
31        &self,
32        layout_reader: &dyn LayoutReader,
33        row_range: &Range<u64>,
34        field_mask: &[FieldMask],
35    ) -> VortexResult<Vec<u64>> {
36        Ok(match *self {
37            SplitBy::Layout => {
38                // We usually have under 100 splits so reserving upfront saves
39                // us some allocations
40                let mut row_splits = RowSplits::new_capacity(128);
41                row_splits.push(row_range.start);
42                layout_reader.register_splits(
43                    field_mask,
44                    &SplitRange::root(row_range.clone())?,
45                    &mut row_splits,
46                )?;
47                row_splits.into_sorted_deduped()
48            }
49            SplitBy::RowCount(n) => row_range
50                .clone()
51                .step_by(n)
52                .chain(once(row_range.end))
53                .collect(),
54        })
55    }
56}
57
58#[cfg(test)]
59mod test {
60    use std::any::Any;
61    use std::sync::Arc;
62
63    use futures::future::BoxFuture;
64    use vortex_array::ArrayContext;
65    use vortex_array::ArrayRef;
66    use vortex_array::IntoArray;
67    use vortex_array::MaskFuture;
68    use vortex_array::dtype::DType;
69    use vortex_array::dtype::FieldPath;
70    use vortex_array::dtype::Nullability;
71    use vortex_array::dtype::PType;
72    use vortex_array::expr::Expression;
73    use vortex_buffer::buffer;
74    use vortex_io::runtime::single::block_on;
75    use vortex_io::session::RuntimeSessionExt;
76    use vortex_mask::Mask;
77
78    use super::*;
79    use crate::LayoutReaderRef;
80    use crate::LayoutStrategy;
81    use crate::RowSplits;
82    use crate::layouts::flat::writer::FlatLayoutStrategy;
83    use crate::scan::test::SCAN_SESSION;
84    use crate::segments::TestSegments;
85    use crate::sequence::SequenceId;
86    use crate::sequence::SequentialArrayStreamExt;
87
88    fn reader() -> LayoutReaderRef {
89        let ctx = ArrayContext::empty();
90        let segments = Arc::new(TestSegments::default());
91        let (ptr, eof) = SequenceId::root().split();
92        let layout = block_on(|handle| async {
93            let session = SCAN_SESSION.clone().with_handle(handle);
94            FlatLayoutStrategy::default()
95                .write_stream(
96                    ctx,
97                    Arc::<TestSegments>::clone(&segments),
98                    buffer![1_i32; 10]
99                        .into_array()
100                        .to_array_stream()
101                        .sequenced(ptr),
102                    eof,
103                    &session,
104                )
105                .await
106        })
107        .unwrap();
108
109        layout
110            .new_reader("".into(), segments, &SCAN_SESSION)
111            .unwrap()
112    }
113
114    #[test]
115    fn test_layout_splits_flat() {
116        let reader = reader();
117
118        let splits = SplitBy::Layout
119            .splits(
120                reader.as_ref(),
121                &(0..10),
122                &[FieldMask::Exact(FieldPath::root())],
123            )
124            .unwrap();
125        assert_eq!(splits, vec![0u64, 10]);
126    }
127
128    #[test]
129    fn test_row_count_splits() {
130        let reader = reader();
131
132        let splits = SplitBy::RowCount(3)
133            .splits(
134                reader.as_ref(),
135                &(0..10),
136                &[FieldMask::Exact(FieldPath::root())],
137            )
138            .unwrap();
139        assert_eq!(splits, vec![0u64, 3, 6, 9, 10]);
140    }
141
142    #[test]
143    fn test_layout_splits_dedup() {
144        struct DupReader {
145            name: Arc<str>,
146            dtype: DType,
147        }
148
149        impl LayoutReader for DupReader {
150            fn name(&self) -> &Arc<str> {
151                &self.name
152            }
153
154            fn as_any(&self) -> &dyn Any {
155                self
156            }
157
158            fn dtype(&self) -> &DType {
159                &self.dtype
160            }
161
162            fn row_count(&self) -> u64 {
163                10
164            }
165
166            fn register_splits(
167                &self,
168                _field_mask: &[FieldMask],
169                split_range: &SplitRange,
170                splits: &mut RowSplits,
171            ) -> VortexResult<()> {
172                splits.push(split_range.row_offset() + 5);
173                splits.push(split_range.row_offset() + 5);
174                splits.push(split_range.root_row_range().end);
175                Ok(())
176            }
177
178            fn pruning_evaluation(
179                &self,
180                _: &Range<u64>,
181                _: &Expression,
182                _: Mask,
183            ) -> VortexResult<MaskFuture> {
184                unimplemented!()
185            }
186
187            fn filter_evaluation(
188                &self,
189                _: &Range<u64>,
190                _: &Expression,
191                _: MaskFuture,
192            ) -> VortexResult<MaskFuture> {
193                unimplemented!()
194            }
195
196            fn projection_evaluation(
197                &self,
198                _: &Range<u64>,
199                _: &Expression,
200                _: MaskFuture,
201            ) -> VortexResult<BoxFuture<'static, VortexResult<ArrayRef>>> {
202                unimplemented!()
203            }
204        }
205
206        let reader = DupReader {
207            name: Arc::from("dup"),
208            dtype: DType::Primitive(PType::U8, Nullability::NonNullable),
209        };
210        let splits = SplitBy::Layout
211            .splits(&reader, &(0..10), &[FieldMask::All])
212            .unwrap();
213        assert_eq!(splits, vec![0u64, 5, 10]);
214    }
215}