vortex_scan/
lib.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::collections::BTreeSet;
5use std::ops::Range;
6use std::sync::Arc;
7use std::{cmp, iter};
8
9use futures::Stream;
10use futures::future::BoxFuture;
11use itertools::Itertools;
12pub use selection::*;
13pub use split_by::*;
14use tasks::{TaskContext, split_exec};
15use vortex_array::ArrayRef;
16use vortex_array::iter::{ArrayIterator, ArrayIteratorAdapter};
17use vortex_array::stats::StatsSet;
18use vortex_array::stream::{ArrayStream, ArrayStreamAdapter};
19use vortex_buffer::Buffer;
20use vortex_dtype::{DType, Field, FieldMask, FieldName, FieldPath};
21use vortex_error::{VortexResult, vortex_bail};
22use vortex_expr::transform::immediate_access::immediate_scope_access;
23use vortex_expr::transform::simplify_typed;
24use vortex_expr::{ExprRef, root};
25use vortex_io::runtime::{BlockingRuntime, Handle};
26use vortex_layout::layouts::row_idx::RowIdxLayoutReader;
27use vortex_layout::{LayoutReader, LayoutReaderRef};
28use vortex_metrics::VortexMetrics;
29
30use crate::filter::FilterExpr;
31
32pub mod arrow;
33mod filter;
34pub mod row_mask;
35mod selection;
36mod split_by;
37mod tasks;
38
39/// A struct for building a scan operation.
40pub struct ScanBuilder<A> {
41    handle: Option<Handle>,
42    layout_reader: LayoutReaderRef,
43    projection: ExprRef,
44    filter: Option<ExprRef>,
45    /// Whether the scan needs to return splits in the order they appear in the file.
46    ordered: bool,
47    /// Optionally read a subset of the rows in the file.
48    row_range: Option<Range<u64>>,
49    /// The selection mask to apply to the selected row range.
50    // TODO(joe): replace this is usage of row_id selection, see
51    selection: Selection,
52    /// How to split the file for concurrent processing.
53    split_by: SplitBy,
54    /// The number of splits to make progress on concurrently **per-thread**.
55    concurrency: usize,
56    /// Function to apply to each [`ArrayRef`] within the spawned split tasks.
57    map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
58    metrics: VortexMetrics,
59    /// Should we try to prune the file (using stats) on open.
60    file_stats: Option<Arc<[StatsSet]>>,
61    /// Maximal number of rows to read (after filtering)
62    limit: Option<usize>,
63    /// The row-offset assigned to the first row of the file. Used by the `row_idx` expression,
64    /// but not by the scan [`Selection`] which remains relative.
65    row_offset: u64,
66}
67
68impl<A: 'static + Send> ScanBuilder<A> {
69    /// Provide a handle to the runtime on which to spawn tasks.
70    pub fn with_handle(mut self, handle: Handle) -> Self {
71        self.handle = Some(handle);
72        self
73    }
74
75    pub fn with_filter(mut self, filter: ExprRef) -> Self {
76        self.filter = Some(filter);
77        self
78    }
79
80    pub fn with_some_filter(mut self, filter: Option<ExprRef>) -> Self {
81        self.filter = filter;
82        self
83    }
84
85    pub fn with_projection(mut self, projection: ExprRef) -> Self {
86        self.projection = projection;
87        self
88    }
89
90    pub fn with_ordered(mut self, ordered: bool) -> Self {
91        self.ordered = ordered;
92        self
93    }
94
95    pub fn with_row_range(mut self, row_range: Range<u64>) -> Self {
96        self.row_range = Some(row_range);
97        self
98    }
99
100    pub fn with_selection(mut self, selection: Selection) -> Self {
101        self.selection = selection;
102        self
103    }
104
105    pub fn with_row_indices(mut self, row_indices: Buffer<u64>) -> Self {
106        self.selection = Selection::IncludeByIndex(row_indices);
107        self
108    }
109
110    pub fn with_row_offset(mut self, row_offset: u64) -> Self {
111        self.row_offset = row_offset;
112        self
113    }
114
115    pub fn with_split_by(mut self, split_by: SplitBy) -> Self {
116        self.split_by = split_by;
117        self
118    }
119
120    /// The number of row splits to make progress on concurrently per-thread, must
121    /// be greater than 0.
122    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
123        assert!(concurrency > 0);
124        self.concurrency = concurrency;
125        self
126    }
127
128    pub fn with_metrics(mut self, metrics: VortexMetrics) -> Self {
129        self.metrics = metrics;
130        self
131    }
132
133    pub fn with_limit(mut self, limit: usize) -> Self {
134        self.limit = Some(limit);
135        self
136    }
137
138    /// The [`DType`] returned by the scan, after applying the projection.
139    pub fn dtype(&self) -> VortexResult<DType> {
140        self.projection.return_dtype(self.layout_reader.dtype())
141    }
142
143    /// Map each split of the scan. The function will be run on the spawned task.
144    pub fn map<B: 'static>(
145        self,
146        map_fn: impl Fn(A) -> VortexResult<B> + 'static + Send + Sync,
147    ) -> ScanBuilder<B> {
148        let old_map_fn = self.map_fn;
149        ScanBuilder {
150            handle: self.handle,
151            layout_reader: self.layout_reader,
152            projection: self.projection,
153            filter: self.filter,
154            ordered: self.ordered,
155            row_range: self.row_range,
156            selection: self.selection,
157            split_by: self.split_by,
158            concurrency: self.concurrency,
159            metrics: self.metrics,
160            file_stats: self.file_stats,
161            limit: self.limit,
162            row_offset: self.row_offset,
163            map_fn: Arc::new(move |a| old_map_fn(a).and_then(&map_fn)),
164        }
165    }
166
167    pub fn prepare(self) -> VortexResult<RepeatedScan<A>> {
168        let dtype = self.dtype()?;
169
170        let Some(handle) = self.handle else {
171            vortex_bail!(
172                "A runtime handle must be provided to the scan builder using `with_handle`"
173            );
174        };
175        if self.filter.is_some() && self.limit.is_some() {
176            vortex_bail!("Vortex doesn't support scans with both a filter and a limit")
177        }
178
179        // Spin up the root layout reader, and wrap it in a FilterLayoutReader to perform
180        // conjunction splitting if a filter is provided.
181        let mut layout_reader = self.layout_reader;
182
183        // Enrich the layout reader to support RowIdx expressions.
184        // Note that this is applied below the filter layout reader since it can perform
185        // better over individual conjunctions.
186        layout_reader = Arc::new(RowIdxLayoutReader::new(self.row_offset, layout_reader));
187
188        // Normalize and simplify the expressions.
189        let projection = simplify_typed(self.projection, layout_reader.dtype())?;
190        let filter = self
191            .filter
192            .map(|f| simplify_typed(f, layout_reader.dtype()))
193            .transpose()?;
194
195        // Construct field masks and compute the row splits of the scan.
196        let (filter_mask, projection_mask) =
197            filter_and_projection_masks(&projection, filter.as_ref(), layout_reader.dtype())?;
198        let field_mask: Vec<_> = [filter_mask, projection_mask].concat();
199        let splits = self.split_by.splits(layout_reader.as_ref(), &field_mask)?;
200        Ok(RepeatedScan {
201            handle,
202            layout_reader,
203            projection,
204            filter,
205            ordered: self.ordered,
206            row_range: self.row_range,
207            selection: self.selection,
208            splits,
209            concurrency: self.concurrency,
210            map_fn: self.map_fn,
211            limit: self.limit,
212            dtype,
213        })
214    }
215
216    /// Constructs a task per row split of the scan, returned as a vector of futures.
217    pub fn build(self) -> VortexResult<Vec<BoxFuture<'static, VortexResult<Option<A>>>>> {
218        // The ultimate short circuit
219        if self.limit.is_some_and(|l| l == 0) {
220            return Ok(vec![]);
221        }
222
223        self.prepare()?.execute(None)
224    }
225
226    /// Returns a [`Stream`] with tasks spawned onto the scan's [`Handle`].
227    pub fn into_stream(
228        self,
229    ) -> VortexResult<impl Stream<Item = VortexResult<A>> + Send + 'static + use<A>> {
230        self.prepare()?.execute_stream(None)
231    }
232
233    /// Returns an [`Iterator`] using the given blocking runtime.
234    pub fn into_iter<B: BlockingRuntime>(
235        self,
236        runtime: &B,
237    ) -> VortexResult<impl Iterator<Item = VortexResult<A>> + 'static> {
238        let stream = self.with_handle(runtime.handle()).into_stream()?;
239        Ok(runtime.block_on_stream(|_| stream))
240    }
241}
242
243impl ScanBuilder<ArrayRef> {
244    pub fn new(layout_reader: Arc<dyn LayoutReader>) -> Self {
245        Self {
246            handle: Handle::find(),
247            layout_reader,
248            projection: root(),
249            filter: None,
250            ordered: true,
251            row_range: None,
252            selection: Default::default(),
253            split_by: SplitBy::Layout,
254            // We default to four tasks per worker thread, which allows for some I/O lookahead
255            // without too much impact on work-stealing.
256            concurrency: 4,
257            map_fn: Arc::new(Ok),
258            metrics: Default::default(),
259            file_stats: None,
260            limit: None,
261            row_offset: 0,
262        }
263    }
264
265    /// Returns an [`ArrayStream`] with tasks spawned onto the scan's [`Handle`].
266    ///
267    /// See [`ScanBuilder::into_stream`] for more details.
268    pub fn into_array_stream(self) -> VortexResult<impl ArrayStream + Send + 'static> {
269        let dtype = self.dtype()?;
270        let stream = self.into_stream()?;
271        Ok(ArrayStreamAdapter::new(dtype, stream))
272    }
273
274    /// Returns an [`ArrayIterator`] using the given blocking runtime.
275    pub fn into_array_iter<B: BlockingRuntime>(
276        self,
277        runtime: &B,
278    ) -> VortexResult<impl ArrayIterator + 'static> {
279        let stream = self.with_handle(runtime.handle()).into_array_stream()?;
280        let dtype = stream.dtype().clone();
281        Ok(ArrayIteratorAdapter::new(
282            dtype,
283            runtime.block_on_stream(|_| stream),
284        ))
285    }
286}
287
288/// Compute masks of field paths referenced by the projection and filter in the scan.
289///
290/// Projection and filter must be pre-simplified.
291fn filter_and_projection_masks(
292    projection: &ExprRef,
293    filter: Option<&ExprRef>,
294    dtype: &DType,
295) -> VortexResult<(Vec<FieldMask>, Vec<FieldMask>)> {
296    let Some(struct_dtype) = dtype.as_struct_fields_opt() else {
297        return Ok(match filter {
298            Some(_) => (vec![FieldMask::All], vec![FieldMask::All]),
299            None => (Vec::new(), vec![FieldMask::All]),
300        });
301    };
302    let projection_mask = immediate_scope_access(projection, struct_dtype);
303    Ok(match filter {
304        None => (
305            Vec::new(),
306            projection_mask.into_iter().map(to_field_mask).collect_vec(),
307        ),
308        Some(f) => {
309            let filter_mask = immediate_scope_access(f, struct_dtype);
310            let only_projection_mask = projection_mask
311                .difference(&filter_mask)
312                .cloned()
313                .map(to_field_mask)
314                .collect_vec();
315            (
316                filter_mask.into_iter().map(to_field_mask).collect_vec(),
317                only_projection_mask,
318            )
319        }
320    })
321}
322
323fn to_field_mask(field: FieldName) -> FieldMask {
324    FieldMask::Prefix(FieldPath::from(Field::Name(field)))
325}
326
327/// A projected subset (by indices, range, and filter) of rows from a Vortex data source.
328///
329/// The method of this struct enable, possibly concurrent, scanning of multiple row ranges of this
330/// data source.
331///
332/// See also: [ScanBuilder].
333pub struct RepeatedScan<A: 'static + Send> {
334    handle: Handle,
335    layout_reader: LayoutReaderRef,
336    projection: ExprRef,
337    filter: Option<ExprRef>,
338    ordered: bool,
339    /// Optionally read a subset of the rows in the file.
340    row_range: Option<Range<u64>>,
341    /// The selection mask to apply to the selected row range.
342    selection: Selection,
343    /// The natural splits of the file.
344    splits: BTreeSet<u64>,
345    /// The number of splits to make progress on concurrently **per-thread**.
346    concurrency: usize,
347    /// Function to apply to each [`ArrayRef`] within the spawned split tasks.
348    map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
349    /// Maximal number of rows to read (after filtering)
350    limit: Option<usize>,
351    /// The dtype of the projected arrays.
352    dtype: DType,
353}
354
355impl<A: 'static + Send> RepeatedScan<A> {
356    pub fn execute(
357        &self,
358        row_range: Option<Range<u64>>,
359    ) -> VortexResult<Vec<BoxFuture<'static, VortexResult<Option<A>>>>> {
360        let ctx = Arc::new(TaskContext {
361            selection: self.selection.clone(),
362            filter: self.filter.clone().map(|f| Arc::new(FilterExpr::new(f))),
363            reader: self.layout_reader.clone(),
364            projection: self.projection.clone(),
365            mapper: self.map_fn.clone(),
366        });
367
368        let row_range = intersect_ranges(self.row_range.as_ref(), row_range);
369        let splits_iter: Box<dyn Iterator<Item = _>> = match row_range {
370            None => Box::new(self.splits.iter().copied()),
371            Some(range) => {
372                if range.start > range.end {
373                    return Ok(Vec::new());
374                }
375                Box::new(
376                    iter::once(range.start)
377                        .chain(self.splits.range(range.clone()).copied())
378                        .chain(iter::once(range.end)),
379                )
380            }
381        };
382
383        // Create a task that executes the full scan operator for each split.
384        let mut limit = self.limit;
385        let split_tasks = splits_iter
386            .tuple_windows()
387            .filter_map(|(start, end)| {
388                if limit.is_some_and(|l| l == 0) || start >= end {
389                    None
390                } else {
391                    Some(split_exec(ctx.clone(), start..end, limit.as_mut()))
392                }
393            })
394            .try_collect()?;
395
396        Ok(split_tasks)
397    }
398
399    pub fn execute_stream(
400        &self,
401        row_range: Option<Range<u64>>,
402    ) -> VortexResult<impl Stream<Item = VortexResult<A>> + Send + 'static + use<A>> {
403        use futures::StreamExt;
404        let num_workers = std::thread::available_parallelism()
405            .map(|n| n.get())
406            .unwrap_or(1);
407        let concurrency = self.concurrency * num_workers;
408        let handle = self.handle.clone();
409
410        let stream =
411            futures::stream::iter(self.execute(row_range)?).map(move |task| handle.spawn(task));
412
413        let stream = if self.ordered {
414            stream.buffered(concurrency).boxed()
415        } else {
416            stream.buffer_unordered(concurrency).boxed()
417        };
418
419        Ok(stream.filter_map(|chunk| async move { chunk.transpose() }))
420    }
421}
422
423impl RepeatedScan<ArrayRef> {
424    pub fn execute_array_iter<B: BlockingRuntime>(
425        &self,
426        row_range: Option<Range<u64>>,
427        runtime: &B,
428    ) -> VortexResult<impl ArrayIterator + 'static> {
429        let dtype = self.dtype.clone();
430        let stream = self.execute_stream(row_range)?;
431        let iter = runtime.block_on_stream(move |_h| stream);
432        Ok(ArrayIteratorAdapter::new(dtype, iter))
433    }
434
435    pub fn execute_array_stream(
436        &self,
437        row_range: Option<Range<u64>>,
438    ) -> VortexResult<impl ArrayStream + Send + 'static> {
439        let dtype = self.dtype.clone();
440        let stream = self.execute_stream(row_range)?;
441        Ok(ArrayStreamAdapter::new(dtype, stream))
442    }
443}
444
/// Intersect two optional row ranges, treating `None` as "unbounded".
///
/// When only one side is present it is returned as-is. When both are present the result
/// runs from the larger start to the smaller end; for disjoint inputs this is an inverted
/// range (`start > end`), which callers must treat as empty.
fn intersect_ranges(left: Option<&Range<u64>>, right: Option<Range<u64>>) -> Option<Range<u64>> {
    let Some(l) = left else {
        return right;
    };
    let Some(r) = right else {
        return Some(l.clone());
    };

    let start = cmp::max(l.start, r.start);
    let end = cmp::min(l.end, r.end);
    Some(start..end)
}
452}