1use std::collections::BTreeSet;
5use std::ops::Range;
6use std::sync::Arc;
7use std::{cmp, iter};
8
9use futures::Stream;
10use futures::future::BoxFuture;
11use itertools::Itertools;
12pub use selection::*;
13pub use split_by::*;
14use tasks::{TaskContext, split_exec};
15use vortex_array::ArrayRef;
16use vortex_array::iter::{ArrayIterator, ArrayIteratorAdapter};
17use vortex_array::stats::StatsSet;
18use vortex_array::stream::{ArrayStream, ArrayStreamAdapter};
19use vortex_buffer::Buffer;
20use vortex_dtype::{DType, Field, FieldMask, FieldName, FieldPath};
21use vortex_error::{VortexResult, vortex_bail};
22use vortex_expr::transform::immediate_access::immediate_scope_access;
23use vortex_expr::transform::simplify_typed;
24use vortex_expr::{ExprRef, root};
25use vortex_io::runtime::{BlockingRuntime, Handle};
26use vortex_layout::layouts::row_idx::RowIdxLayoutReader;
27use vortex_layout::{LayoutReader, LayoutReaderRef};
28use vortex_metrics::VortexMetrics;
29
30use crate::filter::FilterExpr;
31
32pub mod arrow;
33mod filter;
34pub mod row_mask;
35mod selection;
36mod split_by;
37mod tasks;
38
39pub struct ScanBuilder<A> {
41 handle: Option<Handle>,
42 layout_reader: LayoutReaderRef,
43 projection: ExprRef,
44 filter: Option<ExprRef>,
45 ordered: bool,
47 row_range: Option<Range<u64>>,
49 selection: Selection,
52 split_by: SplitBy,
54 concurrency: usize,
56 map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
58 metrics: VortexMetrics,
59 file_stats: Option<Arc<[StatsSet]>>,
61 limit: Option<usize>,
63 row_offset: u64,
66}
67
68impl<A: 'static + Send> ScanBuilder<A> {
69 pub fn with_handle(mut self, handle: Handle) -> Self {
71 self.handle = Some(handle);
72 self
73 }
74
75 pub fn with_filter(mut self, filter: ExprRef) -> Self {
76 self.filter = Some(filter);
77 self
78 }
79
80 pub fn with_some_filter(mut self, filter: Option<ExprRef>) -> Self {
81 self.filter = filter;
82 self
83 }
84
85 pub fn with_projection(mut self, projection: ExprRef) -> Self {
86 self.projection = projection;
87 self
88 }
89
90 pub fn with_ordered(mut self, ordered: bool) -> Self {
91 self.ordered = ordered;
92 self
93 }
94
95 pub fn with_row_range(mut self, row_range: Range<u64>) -> Self {
96 self.row_range = Some(row_range);
97 self
98 }
99
100 pub fn with_selection(mut self, selection: Selection) -> Self {
101 self.selection = selection;
102 self
103 }
104
105 pub fn with_row_indices(mut self, row_indices: Buffer<u64>) -> Self {
106 self.selection = Selection::IncludeByIndex(row_indices);
107 self
108 }
109
110 pub fn with_row_offset(mut self, row_offset: u64) -> Self {
111 self.row_offset = row_offset;
112 self
113 }
114
115 pub fn with_split_by(mut self, split_by: SplitBy) -> Self {
116 self.split_by = split_by;
117 self
118 }
119
120 pub fn with_concurrency(mut self, concurrency: usize) -> Self {
123 assert!(concurrency > 0);
124 self.concurrency = concurrency;
125 self
126 }
127
128 pub fn with_metrics(mut self, metrics: VortexMetrics) -> Self {
129 self.metrics = metrics;
130 self
131 }
132
133 pub fn with_limit(mut self, limit: usize) -> Self {
134 self.limit = Some(limit);
135 self
136 }
137
138 pub fn dtype(&self) -> VortexResult<DType> {
140 self.projection.return_dtype(self.layout_reader.dtype())
141 }
142
143 pub fn map<B: 'static>(
145 self,
146 map_fn: impl Fn(A) -> VortexResult<B> + 'static + Send + Sync,
147 ) -> ScanBuilder<B> {
148 let old_map_fn = self.map_fn;
149 ScanBuilder {
150 handle: self.handle,
151 layout_reader: self.layout_reader,
152 projection: self.projection,
153 filter: self.filter,
154 ordered: self.ordered,
155 row_range: self.row_range,
156 selection: self.selection,
157 split_by: self.split_by,
158 concurrency: self.concurrency,
159 metrics: self.metrics,
160 file_stats: self.file_stats,
161 limit: self.limit,
162 row_offset: self.row_offset,
163 map_fn: Arc::new(move |a| old_map_fn(a).and_then(&map_fn)),
164 }
165 }
166
167 pub fn prepare(self) -> VortexResult<RepeatedScan<A>> {
168 let dtype = self.dtype()?;
169
170 let Some(handle) = self.handle else {
171 vortex_bail!(
172 "A runtime handle must be provided to the scan builder using `with_handle`"
173 );
174 };
175 if self.filter.is_some() && self.limit.is_some() {
176 vortex_bail!("Vortex doesn't support scans with both a filter and a limit")
177 }
178
179 let mut layout_reader = self.layout_reader;
182
183 layout_reader = Arc::new(RowIdxLayoutReader::new(self.row_offset, layout_reader));
187
188 let projection = simplify_typed(self.projection, layout_reader.dtype())?;
190 let filter = self
191 .filter
192 .map(|f| simplify_typed(f, layout_reader.dtype()))
193 .transpose()?;
194
195 let (filter_mask, projection_mask) =
197 filter_and_projection_masks(&projection, filter.as_ref(), layout_reader.dtype())?;
198 let field_mask: Vec<_> = [filter_mask, projection_mask].concat();
199 let splits = self.split_by.splits(layout_reader.as_ref(), &field_mask)?;
200 Ok(RepeatedScan {
201 handle,
202 layout_reader,
203 projection,
204 filter,
205 ordered: self.ordered,
206 row_range: self.row_range,
207 selection: self.selection,
208 splits,
209 concurrency: self.concurrency,
210 map_fn: self.map_fn,
211 limit: self.limit,
212 dtype,
213 })
214 }
215
216 pub fn build(self) -> VortexResult<Vec<BoxFuture<'static, VortexResult<Option<A>>>>> {
218 if self.limit.is_some_and(|l| l == 0) {
220 return Ok(vec![]);
221 }
222
223 self.prepare()?.execute(None)
224 }
225
226 pub fn into_stream(
228 self,
229 ) -> VortexResult<impl Stream<Item = VortexResult<A>> + Send + 'static + use<A>> {
230 self.prepare()?.execute_stream(None)
231 }
232
233 pub fn into_iter<B: BlockingRuntime>(
235 self,
236 runtime: &B,
237 ) -> VortexResult<impl Iterator<Item = VortexResult<A>> + 'static> {
238 let stream = self.with_handle(runtime.handle()).into_stream()?;
239 Ok(runtime.block_on_stream(|_| stream))
240 }
241}
242
243impl ScanBuilder<ArrayRef> {
244 pub fn new(layout_reader: Arc<dyn LayoutReader>) -> Self {
245 Self {
246 handle: Handle::find(),
247 layout_reader,
248 projection: root(),
249 filter: None,
250 ordered: true,
251 row_range: None,
252 selection: Default::default(),
253 split_by: SplitBy::Layout,
254 concurrency: 4,
257 map_fn: Arc::new(Ok),
258 metrics: Default::default(),
259 file_stats: None,
260 limit: None,
261 row_offset: 0,
262 }
263 }
264
265 pub fn into_array_stream(self) -> VortexResult<impl ArrayStream + Send + 'static> {
269 let dtype = self.dtype()?;
270 let stream = self.into_stream()?;
271 Ok(ArrayStreamAdapter::new(dtype, stream))
272 }
273
274 pub fn into_array_iter<B: BlockingRuntime>(
276 self,
277 runtime: &B,
278 ) -> VortexResult<impl ArrayIterator + 'static> {
279 let stream = self.with_handle(runtime.handle()).into_array_stream()?;
280 let dtype = stream.dtype().clone();
281 Ok(ArrayIteratorAdapter::new(
282 dtype,
283 runtime.block_on_stream(|_| stream),
284 ))
285 }
286}
287
288fn filter_and_projection_masks(
292 projection: &ExprRef,
293 filter: Option<&ExprRef>,
294 dtype: &DType,
295) -> VortexResult<(Vec<FieldMask>, Vec<FieldMask>)> {
296 let Some(struct_dtype) = dtype.as_struct_fields_opt() else {
297 return Ok(match filter {
298 Some(_) => (vec![FieldMask::All], vec![FieldMask::All]),
299 None => (Vec::new(), vec![FieldMask::All]),
300 });
301 };
302 let projection_mask = immediate_scope_access(projection, struct_dtype);
303 Ok(match filter {
304 None => (
305 Vec::new(),
306 projection_mask.into_iter().map(to_field_mask).collect_vec(),
307 ),
308 Some(f) => {
309 let filter_mask = immediate_scope_access(f, struct_dtype);
310 let only_projection_mask = projection_mask
311 .difference(&filter_mask)
312 .cloned()
313 .map(to_field_mask)
314 .collect_vec();
315 (
316 filter_mask.into_iter().map(to_field_mask).collect_vec(),
317 only_projection_mask,
318 )
319 }
320 })
321}
322
323fn to_field_mask(field: FieldName) -> FieldMask {
324 FieldMask::Prefix(FieldPath::from(Field::Name(field)))
325}
326
327pub struct RepeatedScan<A: 'static + Send> {
334 handle: Handle,
335 layout_reader: LayoutReaderRef,
336 projection: ExprRef,
337 filter: Option<ExprRef>,
338 ordered: bool,
339 row_range: Option<Range<u64>>,
341 selection: Selection,
343 splits: BTreeSet<u64>,
345 concurrency: usize,
347 map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
349 limit: Option<usize>,
351 dtype: DType,
353}
354
355impl<A: 'static + Send> RepeatedScan<A> {
356 pub fn execute(
357 &self,
358 row_range: Option<Range<u64>>,
359 ) -> VortexResult<Vec<BoxFuture<'static, VortexResult<Option<A>>>>> {
360 let ctx = Arc::new(TaskContext {
361 selection: self.selection.clone(),
362 filter: self.filter.clone().map(|f| Arc::new(FilterExpr::new(f))),
363 reader: self.layout_reader.clone(),
364 projection: self.projection.clone(),
365 mapper: self.map_fn.clone(),
366 });
367
368 let row_range = intersect_ranges(self.row_range.as_ref(), row_range);
369 let splits_iter: Box<dyn Iterator<Item = _>> = match row_range {
370 None => Box::new(self.splits.iter().copied()),
371 Some(range) => {
372 if range.start > range.end {
373 return Ok(Vec::new());
374 }
375 Box::new(
376 iter::once(range.start)
377 .chain(self.splits.range(range.clone()).copied())
378 .chain(iter::once(range.end)),
379 )
380 }
381 };
382
383 let mut limit = self.limit;
385 let split_tasks = splits_iter
386 .tuple_windows()
387 .filter_map(|(start, end)| {
388 if limit.is_some_and(|l| l == 0) || start >= end {
389 None
390 } else {
391 Some(split_exec(ctx.clone(), start..end, limit.as_mut()))
392 }
393 })
394 .try_collect()?;
395
396 Ok(split_tasks)
397 }
398
399 pub fn execute_stream(
400 &self,
401 row_range: Option<Range<u64>>,
402 ) -> VortexResult<impl Stream<Item = VortexResult<A>> + Send + 'static + use<A>> {
403 use futures::StreamExt;
404 let num_workers = std::thread::available_parallelism()
405 .map(|n| n.get())
406 .unwrap_or(1);
407 let concurrency = self.concurrency * num_workers;
408 let handle = self.handle.clone();
409
410 let stream =
411 futures::stream::iter(self.execute(row_range)?).map(move |task| handle.spawn(task));
412
413 let stream = if self.ordered {
414 stream.buffered(concurrency).boxed()
415 } else {
416 stream.buffer_unordered(concurrency).boxed()
417 };
418
419 Ok(stream.filter_map(|chunk| async move { chunk.transpose() }))
420 }
421}
422
423impl RepeatedScan<ArrayRef> {
424 pub fn execute_array_iter<B: BlockingRuntime>(
425 &self,
426 row_range: Option<Range<u64>>,
427 runtime: &B,
428 ) -> VortexResult<impl ArrayIterator + 'static> {
429 let dtype = self.dtype.clone();
430 let stream = self.execute_stream(row_range)?;
431 let iter = runtime.block_on_stream(move |_h| stream);
432 Ok(ArrayIteratorAdapter::new(dtype, iter))
433 }
434
435 pub fn execute_array_stream(
436 &self,
437 row_range: Option<Range<u64>>,
438 ) -> VortexResult<impl ArrayStream + Send + 'static> {
439 let dtype = self.dtype.clone();
440 let stream = self.execute_stream(row_range)?;
441 Ok(ArrayStreamAdapter::new(dtype, stream))
442 }
443}
444
/// Intersects two optional row ranges.
///
/// A missing side places no restriction, so the other side is returned as is.
/// When both are present the result runs from the larger start to the smaller
/// end; for disjoint inputs that range is inverted (`start > end`), which the
/// caller is expected to check.
fn intersect_ranges(left: Option<&Range<u64>>, right: Option<Range<u64>>) -> Option<Range<u64>> {
    let Some(l) = left else {
        return right;
    };
    let Some(r) = right else {
        return Some(l.clone());
    };

    let start = cmp::max(l.start, r.start);
    let end = cmp::min(l.end, r.end);
    Some(start..end)
}