vortex_layout/scan/
split_by.rs1use std::iter::once;
5use std::ops::Range;
6
7use vortex_array::dtype::FieldMask;
8use vortex_error::VortexResult;
9
10use crate::LayoutReader;
11use crate::RowSplits;
12use crate::SplitRange;
13
/// Strategy for choosing the row offsets at which a scan's row range is split.
///
/// See `SplitBy::splits` for how each strategy turns a row range into offsets.
#[derive(Default, Copy, Clone, Debug, PartialEq, Eq, Hash)]
pub enum SplitBy {
    /// Split wherever the layout reader reports a boundary through
    /// `LayoutReader::register_splits`.
    #[default]
    Layout,
    /// Split every `n` rows; the final sub-range may be shorter than `n`.
    ///
    /// `n` must be non-zero.
    RowCount(usize),
}
26
27impl SplitBy {
28 pub fn splits(
31 &self,
32 layout_reader: &dyn LayoutReader,
33 row_range: &Range<u64>,
34 field_mask: &[FieldMask],
35 ) -> VortexResult<Vec<u64>> {
36 Ok(match *self {
37 SplitBy::Layout => {
38 let mut row_splits = RowSplits::new_capacity(128);
41 row_splits.push(row_range.start);
42 layout_reader.register_splits(
43 field_mask,
44 &SplitRange::root(row_range.clone())?,
45 &mut row_splits,
46 )?;
47 row_splits.into_sorted_deduped()
48 }
49 SplitBy::RowCount(n) => row_range
50 .clone()
51 .step_by(n)
52 .chain(once(row_range.end))
53 .collect(),
54 })
55 }
56}
57
#[cfg(test)]
mod test {
    use std::any::Any;
    use std::sync::Arc;

    use futures::future::BoxFuture;
    use vortex_array::ArrayContext;
    use vortex_array::ArrayRef;
    use vortex_array::IntoArray;
    use vortex_array::MaskFuture;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::FieldPath;
    use vortex_array::dtype::Nullability;
    use vortex_array::dtype::PType;
    use vortex_array::expr::Expression;
    use vortex_buffer::buffer;
    use vortex_io::runtime::single::block_on;
    use vortex_io::session::RuntimeSessionExt;
    use vortex_mask::Mask;

    use super::*;
    use crate::LayoutReaderRef;
    use crate::LayoutStrategy;
    use crate::RowSplits;
    use crate::layouts::flat::writer::FlatLayoutStrategy;
    use crate::scan::test::SCAN_SESSION;
    use crate::segments::TestSegments;
    use crate::sequence::SequenceId;
    use crate::sequence::SequentialArrayStreamExt;

    /// Writes ten `1_i32` values with the flat layout strategy into in-memory test
    /// segments and returns a reader over the written layout.
    fn flat_reader() -> LayoutReaderRef {
        let array_ctx = ArrayContext::empty();
        let segments = Arc::new(TestSegments::default());
        let (sequence_ptr, eof) = SequenceId::root().split();

        let written = block_on(|handle| async {
            let session = SCAN_SESSION.clone().with_handle(handle);
            let values = buffer![1_i32; 10].into_array();
            FlatLayoutStrategy::default()
                .write_stream(
                    array_ctx,
                    Arc::<TestSegments>::clone(&segments),
                    values.to_array_stream().sequenced(sequence_ptr),
                    eof,
                    &session,
                )
                .await
        });

        written
            .unwrap()
            .new_reader("".into(), segments, &SCAN_SESSION)
            .unwrap()
    }

    /// A field mask selecting exactly the root field.
    fn root_mask() -> [FieldMask; 1] {
        [FieldMask::Exact(FieldPath::root())]
    }

    #[test]
    fn test_layout_splits_flat() {
        let flat = flat_reader();

        let actual = SplitBy::Layout
            .splits(flat.as_ref(), &(0..10), &root_mask())
            .unwrap();

        assert_eq!(actual, vec![0u64, 10]);
    }

    #[test]
    fn test_row_count_splits() {
        let flat = flat_reader();

        let actual = SplitBy::RowCount(3)
            .splits(flat.as_ref(), &(0..10), &root_mask())
            .unwrap();

        assert_eq!(actual, vec![0u64, 3, 6, 9, 10]);
    }

    #[test]
    fn test_layout_splits_dedup() {
        /// Reader whose only interesting behavior is reporting the same split twice.
        struct DupReader {
            name: Arc<str>,
            dtype: DType,
        }

        impl LayoutReader for DupReader {
            fn name(&self) -> &Arc<str> {
                &self.name
            }

            fn as_any(&self) -> &dyn Any {
                self
            }

            fn dtype(&self) -> &DType {
                &self.dtype
            }

            fn row_count(&self) -> u64 {
                10
            }

            fn register_splits(
                &self,
                _field_mask: &[FieldMask],
                split_range: &SplitRange,
                splits: &mut RowSplits,
            ) -> VortexResult<()> {
                // Push the same midpoint twice so `SplitBy::Layout` must de-duplicate it.
                let midpoint = split_range.row_offset() + 5;
                splits.push(midpoint);
                splits.push(midpoint);
                splits.push(split_range.root_row_range().end);
                Ok(())
            }

            fn pruning_evaluation(
                &self,
                _: &Range<u64>,
                _: &Expression,
                _: Mask,
            ) -> VortexResult<MaskFuture> {
                unimplemented!()
            }

            fn filter_evaluation(
                &self,
                _: &Range<u64>,
                _: &Expression,
                _: MaskFuture,
            ) -> VortexResult<MaskFuture> {
                unimplemented!()
            }

            fn projection_evaluation(
                &self,
                _: &Range<u64>,
                _: &Expression,
                _: MaskFuture,
            ) -> VortexResult<BoxFuture<'static, VortexResult<ArrayRef>>> {
                unimplemented!()
            }
        }

        let dup = DupReader {
            name: "dup".into(),
            dtype: DType::Primitive(PType::U8, Nullability::NonNullable),
        };

        let deduped = SplitBy::Layout
            .splits(&dup, &(0..10), &[FieldMask::All])
            .unwrap();

        assert_eq!(deduped, vec![0u64, 5, 10]);
    }
}
215}