1use std::ops::Range;
5use std::sync::Arc;
6
7use futures::Stream;
8use futures::future::BoxFuture;
9use itertools::Itertools;
10use vortex_array::ArrayRef;
11use vortex_array::iter::{ArrayIterator, ArrayIteratorAdapter};
12use vortex_array::stats::StatsSet;
13use vortex_array::stream::{ArrayStream, ArrayStreamAdapter};
14use vortex_buffer::Buffer;
15use vortex_dtype::{DType, Field, FieldMask, FieldName, FieldPath};
16use vortex_error::{VortexResult, vortex_bail};
17use vortex_expr::transform::immediate_access::immediate_scope_access;
18use vortex_expr::transform::simplify_typed;
19use vortex_expr::{ExprRef, root};
20use vortex_io::runtime::{BlockingRuntime, Handle};
21use vortex_layout::layouts::row_idx::RowIdxLayoutReader;
22use vortex_layout::{LayoutReader, LayoutReaderRef};
23use vortex_metrics::VortexMetrics;
24
25use crate::RepeatedScan;
26use crate::selection::Selection;
27use crate::split_by::SplitBy;
28use crate::splits::{Splits, attempt_split_ranges};
29
/// Builder that configures a scan over a [`LayoutReader`] and turns it into a [`RepeatedScan`],
/// a stream, an iterator, or a list of futures.
///
/// `A` is the item type produced by the scan: every scanned [`ArrayRef`] is passed through
/// `map_fn` to obtain an `A`. `ScanBuilder::new` starts with `A = ArrayRef` and an identity map.
pub struct ScanBuilder<A> {
    /// Runtime handle used to drive the scan; `prepare` fails if this is still `None`.
    handle: Option<Handle>,
    /// Reader for the data being scanned.
    layout_reader: LayoutReaderRef,
    /// Expression describing the scan output, evaluated against the reader's dtype.
    projection: ExprRef,
    /// Optional filter expression.
    filter: Option<ExprRef>,
    /// Flag forwarded to `RepeatedScan`; presumably whether results keep split order — confirm.
    ordered: bool,
    /// Optional restriction of the scan to a range of rows.
    row_range: Option<Range<u64>>,
    /// Row selection (for example an explicit list of row indices).
    selection: Selection,
    /// Strategy used to compute splits when they cannot be derived from `row_range`/`selection`.
    split_by: SplitBy,
    /// Concurrency level forwarded to `RepeatedScan`; `with_concurrency` rejects zero.
    concurrency: usize,
    /// Function applied to every scanned array to produce the output items.
    map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
    /// Metrics registry. NOTE(review): `prepare` does not forward this — confirm intent.
    metrics: VortexMetrics,
    /// Optional file-level statistics. NOTE(review): `prepare` does not read this.
    file_stats: Option<Arc<[StatsSet]>>,
    /// Optional row limit; `prepare` rejects it when a filter is also set.
    limit: Option<usize>,
    /// Row offset handed to `RowIdxLayoutReader::new` when the scan is prepared.
    row_offset: u64,
}
58
59impl ScanBuilder<ArrayRef> {
60 pub fn new(layout_reader: Arc<dyn LayoutReader>) -> Self {
61 Self {
62 handle: Handle::find(),
63 layout_reader,
64 projection: root(),
65 filter: None,
66 ordered: true,
67 row_range: None,
68 selection: Default::default(),
69 split_by: SplitBy::Layout,
70 concurrency: 4,
73 map_fn: Arc::new(Ok),
74 metrics: Default::default(),
75 file_stats: None,
76 limit: None,
77 row_offset: 0,
78 }
79 }
80
81 pub fn into_array_stream(self) -> VortexResult<impl ArrayStream + Send + 'static> {
85 let dtype = self.dtype()?;
86 let stream = self.into_stream()?;
87 Ok(ArrayStreamAdapter::new(dtype, stream))
88 }
89
90 pub fn into_array_iter<B: BlockingRuntime>(
92 self,
93 runtime: &B,
94 ) -> VortexResult<impl ArrayIterator + 'static> {
95 let stream = self.with_handle(runtime.handle()).into_array_stream()?;
96 let dtype = stream.dtype().clone();
97 Ok(ArrayIteratorAdapter::new(
98 dtype,
99 runtime.block_on_stream(|_| stream),
100 ))
101 }
102}
103
104impl<A: 'static + Send> ScanBuilder<A> {
105 pub fn with_handle(mut self, handle: Handle) -> Self {
107 self.handle = Some(handle);
108 self
109 }
110
111 pub fn with_filter(mut self, filter: ExprRef) -> Self {
112 self.filter = Some(filter);
113 self
114 }
115
116 pub fn with_some_filter(mut self, filter: Option<ExprRef>) -> Self {
117 self.filter = filter;
118 self
119 }
120
121 pub fn with_projection(mut self, projection: ExprRef) -> Self {
122 self.projection = projection;
123 self
124 }
125
126 pub fn with_ordered(mut self, ordered: bool) -> Self {
127 self.ordered = ordered;
128 self
129 }
130
131 pub fn with_row_range(mut self, row_range: Range<u64>) -> Self {
132 self.row_range = Some(row_range);
133 self
134 }
135
136 pub fn with_selection(mut self, selection: Selection) -> Self {
137 self.selection = selection;
138 self
139 }
140
141 pub fn with_row_indices(mut self, row_indices: Buffer<u64>) -> Self {
142 self.selection = Selection::IncludeByIndex(row_indices);
143 self
144 }
145
146 pub fn with_row_offset(mut self, row_offset: u64) -> Self {
147 self.row_offset = row_offset;
148 self
149 }
150
151 pub fn with_split_by(mut self, split_by: SplitBy) -> Self {
152 self.split_by = split_by;
153 self
154 }
155
156 pub fn with_concurrency(mut self, concurrency: usize) -> Self {
159 assert!(concurrency > 0);
160 self.concurrency = concurrency;
161 self
162 }
163
164 pub fn with_metrics(mut self, metrics: VortexMetrics) -> Self {
165 self.metrics = metrics;
166 self
167 }
168
169 pub fn with_limit(mut self, limit: usize) -> Self {
170 self.limit = Some(limit);
171 self
172 }
173
174 pub fn dtype(&self) -> VortexResult<DType> {
176 self.projection.return_dtype(self.layout_reader.dtype())
177 }
178
179 pub fn map<B: 'static>(
181 self,
182 map_fn: impl Fn(A) -> VortexResult<B> + 'static + Send + Sync,
183 ) -> ScanBuilder<B> {
184 let old_map_fn = self.map_fn;
185 ScanBuilder {
186 handle: self.handle,
187 layout_reader: self.layout_reader,
188 projection: self.projection,
189 filter: self.filter,
190 ordered: self.ordered,
191 row_range: self.row_range,
192 selection: self.selection,
193 split_by: self.split_by,
194 concurrency: self.concurrency,
195 metrics: self.metrics,
196 file_stats: self.file_stats,
197 limit: self.limit,
198 row_offset: self.row_offset,
199 map_fn: Arc::new(move |a| old_map_fn(a).and_then(&map_fn)),
200 }
201 }
202
203 pub fn prepare(self) -> VortexResult<RepeatedScan<A>> {
204 let dtype = self.dtype()?;
205
206 let Some(handle) = self.handle else {
207 vortex_bail!(
208 "A runtime handle must be provided to the scan builder using `with_handle`"
209 );
210 };
211 if self.filter.is_some() && self.limit.is_some() {
212 vortex_bail!("Vortex doesn't support scans with both a filter and a limit")
213 }
214
215 let mut layout_reader = self.layout_reader;
218
219 layout_reader = Arc::new(RowIdxLayoutReader::new(self.row_offset, layout_reader));
223
224 let projection = simplify_typed(self.projection, layout_reader.dtype())?;
226 let filter = self
227 .filter
228 .map(|f| simplify_typed(f, layout_reader.dtype()))
229 .transpose()?;
230
231 let (filter_mask, projection_mask) =
233 filter_and_projection_masks(&projection, filter.as_ref(), layout_reader.dtype())?;
234 let field_mask: Vec<_> = [filter_mask, projection_mask].concat();
235
236 let splits =
237 if let Some(ranges) = attempt_split_ranges(&self.selection, self.row_range.as_ref()) {
238 Splits::Ranges(ranges)
239 } else {
240 Splits::Natural(self.split_by.splits(layout_reader.as_ref(), &field_mask)?)
241 };
242
243 Ok(RepeatedScan::new(
244 handle,
245 layout_reader,
246 projection,
247 filter,
248 self.ordered,
249 self.row_range,
250 self.selection,
251 splits,
252 self.concurrency,
253 self.map_fn,
254 self.limit,
255 dtype,
256 ))
257 }
258
259 pub fn build(self) -> VortexResult<Vec<BoxFuture<'static, VortexResult<Option<A>>>>> {
261 if self.limit.is_some_and(|l| l == 0) {
263 return Ok(vec![]);
264 }
265
266 self.prepare()?.execute(None)
267 }
268
269 pub fn into_stream(
271 self,
272 ) -> VortexResult<impl Stream<Item = VortexResult<A>> + Send + 'static + use<A>> {
273 self.prepare()?.execute_stream(None)
274 }
275
276 pub fn into_iter<B: BlockingRuntime>(
278 self,
279 runtime: &B,
280 ) -> VortexResult<impl Iterator<Item = VortexResult<A>> + 'static> {
281 let stream = self.with_handle(runtime.handle()).into_stream()?;
282 Ok(runtime.block_on_stream(|_| stream))
283 }
284}
285
286fn filter_and_projection_masks(
290 projection: &ExprRef,
291 filter: Option<&ExprRef>,
292 dtype: &DType,
293) -> VortexResult<(Vec<FieldMask>, Vec<FieldMask>)> {
294 let Some(struct_dtype) = dtype.as_struct_fields_opt() else {
295 return Ok(match filter {
296 Some(_) => (vec![FieldMask::All], vec![FieldMask::All]),
297 None => (Vec::new(), vec![FieldMask::All]),
298 });
299 };
300 let projection_mask = immediate_scope_access(projection, struct_dtype);
301 Ok(match filter {
302 None => (
303 Vec::new(),
304 projection_mask.into_iter().map(to_field_mask).collect_vec(),
305 ),
306 Some(f) => {
307 let filter_mask = immediate_scope_access(f, struct_dtype);
308 let only_projection_mask = projection_mask
309 .difference(&filter_mask)
310 .cloned()
311 .map(to_field_mask)
312 .collect_vec();
313 (
314 filter_mask.into_iter().map(to_field_mask).collect_vec(),
315 only_projection_mask,
316 )
317 }
318 })
319}
320
321fn to_field_mask(field: FieldName) -> FieldMask {
322 FieldMask::Prefix(FieldPath::from(Field::Name(field)))
323}