vortex_layout/scan/
split_by.rs1use std::collections::BTreeSet;
5use std::iter::once;
6use std::ops::Range;
7
8use vortex_array::dtype::FieldMask;
9use vortex_error::VortexResult;
10
11use crate::LayoutReader;
12
/// Strategy used to decide where a row range is cut into splits.
#[derive(Debug, Clone, Copy, Default)]
pub enum SplitBy {
    /// Take the split points that the layout reader registers for the requested fields.
    #[default]
    Layout,
    /// Place a split every given number of rows, counted from the start of the row range.
    RowCount(usize),
}
25
26impl SplitBy {
27 pub fn splits(
30 &self,
31 layout_reader: &dyn LayoutReader,
32 row_range: &Range<u64>,
33 field_mask: &[FieldMask],
34 ) -> VortexResult<BTreeSet<u64>> {
35 Ok(match *self {
36 SplitBy::Layout => {
37 let mut row_splits = BTreeSet::<u64>::new();
38 row_splits.insert(row_range.start);
39
40 layout_reader.register_splits(field_mask, row_range, &mut row_splits)?;
43 row_splits
44 }
45 SplitBy::RowCount(n) => row_range
46 .clone()
47 .step_by(n)
48 .chain(once(row_range.end))
49 .collect(),
50 })
51 }
52}
53
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use vortex_array::ArrayContext;
    use vortex_array::IntoArray;
    use vortex_array::dtype::FieldPath;
    use vortex_buffer::buffer;
    use vortex_io::runtime::single::block_on;
    use vortex_io::session::RuntimeSessionExt;

    use super::*;
    use crate::LayoutReaderRef;
    use crate::LayoutStrategy;
    use crate::layouts::flat::writer::FlatLayoutStrategy;
    use crate::scan::test::SCAN_SESSION;
    use crate::segments::TestSegments;
    use crate::sequence::SequenceId;
    use crate::sequence::SequentialArrayStreamExt;

    /// Writes a ten-element `i32` array through `FlatLayoutStrategy` into test segments and
    /// returns a reader over the resulting layout.
    fn flat_layout_reader() -> LayoutReaderRef {
        let segments = Arc::new(TestSegments::default());
        let array_ctx = ArrayContext::empty();
        let (sequence_ptr, sequence_eof) = SequenceId::root().split();

        let written = block_on(|handle| async {
            let session = SCAN_SESSION.clone().with_handle(handle);
            let stream = buffer![1_i32; 10]
                .into_array()
                .to_array_stream()
                .sequenced(sequence_ptr);

            FlatLayoutStrategy::default()
                .write_stream(
                    array_ctx,
                    Arc::<TestSegments>::clone(&segments),
                    stream,
                    sequence_eof,
                    &session,
                )
                .await
        });
        let layout = written.unwrap();

        layout
            .new_reader("".into(), segments, &SCAN_SESSION)
            .unwrap()
    }

    /// Layout-driven splitting of the ten-row flat layout yields only the range bounds.
    #[test]
    fn test_layout_splits_flat() {
        let layout_reader = flat_layout_reader();
        let rows = 0..10;
        let mask = [FieldMask::Exact(FieldPath::root())];

        let splits = SplitBy::Layout
            .splits(layout_reader.as_ref(), &rows, &mask)
            .unwrap();

        assert_eq!(splits, BTreeSet::from([0, 10]));
    }

    /// Splitting every three rows yields each multiple of three plus the end of the range.
    #[test]
    fn test_row_count_splits() {
        let layout_reader = flat_layout_reader();
        let rows = 0..10;
        let mask = [FieldMask::Exact(FieldPath::root())];

        let splits = SplitBy::RowCount(3)
            .splits(layout_reader.as_ref(), &rows, &mask)
            .unwrap();

        assert_eq!(splits, BTreeSet::from([0, 3, 6, 9, 10]));
    }
}