vortex_scan/
lib.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::cmp;
5use std::ops::Range;
6use std::sync::Arc;
7
8use futures::future::BoxFuture;
9use itertools::Itertools;
10pub use multi_scan::*;
11pub use selection::*;
12pub use split_by::*;
13use tasks::{TaskContext, split_exec};
14use vortex_array::ArrayRef;
15use vortex_array::iter::ArrayIterator;
16use vortex_array::stats::StatsSet;
17use vortex_buffer::Buffer;
18use vortex_dtype::{DType, Field, FieldMask, FieldName, FieldPath};
19use vortex_error::{VortexResult, vortex_bail};
20use vortex_expr::transform::immediate_access::immediate_scope_access;
21use vortex_expr::transform::simplify_typed;
22use vortex_expr::{ExprRef, root};
23use vortex_layout::layouts::row_idx::RowIdxLayoutReader;
24use vortex_layout::{LayoutReader, LayoutReaderRef};
25pub use vortex_layout::{TaskExecutor, TaskExecutorExt};
26use vortex_metrics::VortexMetrics;
27
28use crate::filter::FilterExpr;
29use crate::work_queue::{TaskFactory, WorkStealingQueue};
30use crate::work_stealing_iter::{ArrayTask, WorkStealingArrayIterator};
31
32mod arrow;
33mod filter;
34mod multi_scan;
35#[cfg(feature = "tokio")]
36mod multi_thread;
37pub mod row_mask;
38mod selection;
39mod split_by;
40mod tasks;
41mod work_queue;
42mod work_stealing_iter;
43
44/// A struct for building a scan operation.
45pub struct ScanBuilder<A> {
46    layout_reader: LayoutReaderRef,
47    projection: ExprRef,
48    filter: Option<ExprRef>,
49    /// Optionally read a subset of the rows in the file.
50    row_range: Option<Range<u64>>,
51    /// The selection mask to apply to the selected row range.
52    // TODO(joe): replace this is usage of row_id selection, see
53    selection: Selection,
54    /// How to split the file for concurrent processing.
55    split_by: SplitBy,
56    /// The number of splits to make progress on concurrently **per-thread**.
57    concurrency: usize,
58    /// Function to apply to each [`ArrayRef`] within the spawned split tasks.
59    map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
60    metrics: VortexMetrics,
61    /// Should we try to prune the file (using stats) on open.
62    file_stats: Option<Arc<[StatsSet]>>,
63    /// Maximal number of rows to read (after filtering)
64    limit: Option<usize>,
65    /// The row-offset assigned to the first row of the file. Used by the `row_idx` expression,
66    /// but not by the scan [`Selection`] which remains relative.
67    row_offset: u64,
68}
69
70impl<A: 'static + Send> ScanBuilder<A> {
71    pub fn with_filter(mut self, filter: ExprRef) -> Self {
72        self.filter = Some(filter);
73        self
74    }
75
76    pub fn with_some_filter(mut self, filter: Option<ExprRef>) -> Self {
77        self.filter = filter;
78        self
79    }
80
81    pub fn with_projection(mut self, projection: ExprRef) -> Self {
82        self.projection = projection;
83        self
84    }
85
86    pub fn with_row_range(mut self, row_range: Range<u64>) -> Self {
87        self.row_range = Some(row_range);
88        self
89    }
90
91    pub fn with_selection(mut self, selection: Selection) -> Self {
92        self.selection = selection;
93        self
94    }
95
96    pub fn with_row_indices(mut self, row_indices: Buffer<u64>) -> Self {
97        self.selection = Selection::IncludeByIndex(row_indices);
98        self
99    }
100
101    pub fn with_row_offset(mut self, row_offset: u64) -> Self {
102        self.row_offset = row_offset;
103        self
104    }
105
106    pub fn with_split_by(mut self, split_by: SplitBy) -> Self {
107        self.split_by = split_by;
108        self
109    }
110
111    /// The number of row splits to make progress on concurrently per-thread, must
112    /// be greater than 0.
113    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
114        assert!(concurrency > 0);
115        self.concurrency = concurrency;
116        self
117    }
118
119    pub fn with_metrics(mut self, metrics: VortexMetrics) -> Self {
120        self.metrics = metrics;
121        self
122    }
123
124    pub fn with_limit(mut self, limit: usize) -> Self {
125        self.limit = Some(limit);
126        self
127    }
128
129    /// The [`DType`] returned by the scan, after applying the projection.
130    pub fn dtype(&self) -> VortexResult<DType> {
131        self.projection.return_dtype(self.layout_reader.dtype())
132    }
133
134    /// Map each split of the scan. The function will be run on the spawned task.
135    pub fn map<B: 'static>(
136        self,
137        map_fn: impl Fn(A) -> VortexResult<B> + 'static + Send + Sync,
138    ) -> ScanBuilder<B> {
139        let old_map_fn = self.map_fn;
140        ScanBuilder {
141            layout_reader: self.layout_reader,
142            projection: self.projection,
143            filter: self.filter,
144            row_range: self.row_range,
145            selection: self.selection,
146            split_by: self.split_by,
147            concurrency: self.concurrency,
148            map_fn: Arc::new(move |a| map_fn(old_map_fn(a)?)),
149            metrics: self.metrics,
150            file_stats: self.file_stats,
151            limit: self.limit,
152            row_offset: self.row_offset,
153        }
154    }
155
156    pub fn prepare(self) -> VortexResult<RepeatedScan<A>> {
157        if self.filter.is_some() && self.limit.is_some() {
158            vortex_bail!("Vortex doesn't support scans with both a filter and a limit")
159        }
160
161        let dtype = self.dtype()?;
162
163        // Spin up the root layout reader, and wrap it in a FilterLayoutReader to perform
164        // conjunction splitting if a filter is provided.
165        let mut layout_reader = self.layout_reader;
166
167        // Enrich the layout reader to support RowIdx expressions.
168        // Note that this is applied below the filter layout reader since it can perform
169        // better over individual conjunctions.
170        layout_reader = Arc::new(RowIdxLayoutReader::new(self.row_offset, layout_reader));
171
172        // Normalize and simplify the expressions.
173        let projection = simplify_typed(self.projection, layout_reader.dtype())?;
174        let filter = self
175            .filter
176            .map(|f| simplify_typed(f, layout_reader.dtype()))
177            .transpose()?;
178
179        // Construct field masks and compute the row splits of the scan.
180        let (filter_mask, projection_mask) =
181            filter_and_projection_masks(&projection, filter.as_ref(), layout_reader.dtype())?;
182        let field_mask: Vec<_> = [filter_mask, projection_mask].concat();
183        let splits = self.split_by.splits(layout_reader.as_ref(), &field_mask)?;
184        Ok(RepeatedScan {
185            layout_reader,
186            projection,
187            filter,
188            row_range: self.row_range,
189            selection: self.selection,
190            splits,
191            concurrency: self.concurrency,
192            map_fn: self.map_fn,
193            limit: self.limit,
194            dtype,
195        })
196    }
197
198    /// Constructs a task per row split of the scan, returned as a vector of futures.
199    pub fn build(self) -> VortexResult<Vec<BoxFuture<'static, VortexResult<Option<A>>>>> {
200        // The ultimate short circuit
201        if self.limit.is_some_and(|l| l == 0) {
202            return Ok(vec![]);
203        }
204
205        let row_range = self.row_range.clone();
206        self.prepare()?.execute(row_range)
207    }
208
209    /// Returns a [`Stream`](futures::Stream) with tasks spawned onto the current Tokio runtime.
210    ///
211    /// The stream performs CPU work on the polling thread, with I/O operations dispatched as
212    /// per the Vortex I/O traits.
213    ///
214    /// Task concurrency is the product of the `concurrency` parameter and the number of worker
215    /// threads in the Tokio runtime.
216    #[cfg(feature = "tokio")]
217    pub fn into_tokio_stream(
218        self,
219    ) -> VortexResult<impl futures::Stream<Item = VortexResult<A>> + Send + 'static + use<A>> {
220        let row_range = self.row_range.clone();
221        self.prepare()?.execute_tokio_stream(row_range)
222    }
223}
224
225impl ScanBuilder<ArrayRef> {
226    pub fn new(layout_reader: Arc<dyn LayoutReader>) -> Self {
227        Self {
228            layout_reader,
229            projection: root(),
230            filter: None,
231            row_range: None,
232            selection: Default::default(),
233            split_by: SplitBy::Layout,
234            // We default to four tasks per worker thread, which allows for some I/O lookahead
235            // without too much impact on work-stealing.
236            concurrency: 4,
237            map_fn: Arc::new(Ok),
238            metrics: Default::default(),
239            file_stats: None,
240            limit: None,
241            row_offset: 0,
242        }
243    }
244
245    /// Returns a thread-safe [`ArrayIterator`] that can be cloned and passed
246    /// to other threads to make progress on the same scan concurrently.
247    ///
248    /// Within each thread, the array chunks will be emitted in the original order they are within
249    /// the scan. Between threads, the order is not guaranteed.
250    pub fn into_array_iter(self) -> VortexResult<impl ArrayIterator + Send + Clone + 'static> {
251        let dtype = self.dtype()?;
252        let concurrency = self.concurrency;
253        let tasks = self.build()?;
254        let queue = WorkStealingQueue::new([Box::new(move || Ok(tasks)) as TaskFactory<ArrayTask>]);
255
256        Ok(WorkStealingArrayIterator::new(
257            queue,
258            Arc::new(dtype),
259            concurrency,
260        ))
261    }
262
263    /// Returns an `ArrayStream` with tasks spawned onto the current Tokio runtime.
264    ///
265    /// See [`ScanBuilder::into_tokio_stream`] for more details.
266    ///
267    /// [`ArrayStream`]: vortex_array::stream::ArrayStreamAdapter
268    #[cfg(feature = "tokio")]
269    pub fn into_tokio_array_stream(
270        self,
271    ) -> VortexResult<impl vortex_array::stream::ArrayStream + Send + 'static> {
272        let dtype = self.dtype()?;
273        let stream = self.into_tokio_stream()?;
274        Ok(vortex_array::stream::ArrayStreamAdapter::new(dtype, stream))
275    }
276}
277
278/// Compute masks of field paths referenced by the projection and filter in the scan.
279///
280/// Projection and filter must be pre-simplified.
281fn filter_and_projection_masks(
282    projection: &ExprRef,
283    filter: Option<&ExprRef>,
284    dtype: &DType,
285) -> VortexResult<(Vec<FieldMask>, Vec<FieldMask>)> {
286    let Some(struct_dtype) = dtype.as_struct_fields_opt() else {
287        return Ok(match filter {
288            Some(_) => (vec![FieldMask::All], vec![FieldMask::All]),
289            None => (Vec::new(), vec![FieldMask::All]),
290        });
291    };
292    let projection_mask = immediate_scope_access(projection, struct_dtype);
293    Ok(match filter {
294        None => (
295            Vec::new(),
296            projection_mask.into_iter().map(to_field_mask).collect_vec(),
297        ),
298        Some(f) => {
299            let filter_mask = immediate_scope_access(f, struct_dtype);
300            let only_projection_mask = projection_mask
301                .difference(&filter_mask)
302                .cloned()
303                .map(to_field_mask)
304                .collect_vec();
305            (
306                filter_mask.into_iter().map(to_field_mask).collect_vec(),
307                only_projection_mask,
308            )
309        }
310    })
311}
312
313fn to_field_mask(field: FieldName) -> FieldMask {
314    FieldMask::Prefix(FieldPath::from(Field::Name(field)))
315}
316
317/// A projected subset (by indices, range, and filter) of rows from a Vortex data source.
318///
319/// The method of this struct enable, possibly concurrent, scanning of multiple row ranges of this
320/// data source.
321///
322/// See also: [ScanBuilder].
323pub struct RepeatedScan<A: 'static + Send> {
324    layout_reader: LayoutReaderRef,
325    projection: ExprRef,
326    filter: Option<ExprRef>,
327    /// Optionally read a subset of the rows in the file.
328    row_range: Option<Range<u64>>,
329    /// The selection mask to apply to the selected row range.
330    selection: Selection,
331    /// The natural splits of the file.
332    splits: Vec<Range<u64>>,
333    /// The number of splits to make progress on concurrently **per-thread**.
334    concurrency: usize,
335    /// Function to apply to each [`ArrayRef`] within the spawned split tasks.
336    map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
337    /// Maximal number of rows to read (after filtering)
338    limit: Option<usize>,
339    /// The dtype of the projected arrays.
340    dtype: DType,
341}
342
343impl<A: 'static + Send> RepeatedScan<A> {
344    pub fn execute(
345        &self,
346        row_range: Option<Range<u64>>,
347    ) -> VortexResult<Vec<BoxFuture<'static, VortexResult<Option<A>>>>> {
348        let row_range = intersect_ranges(self.row_range.as_ref(), row_range);
349
350        let ctx = Arc::new(TaskContext {
351            row_range,
352            selection: self.selection.clone(),
353            filter: self.filter.clone().map(|f| Arc::new(FilterExpr::new(f))),
354            reader: self.layout_reader.clone(),
355            projection: self.projection.clone(),
356            mapper: self.map_fn.clone(),
357        });
358
359        // Create a task that executes the full scan pipeline for each split.
360        let mut limit = self.limit;
361        let split_tasks = self
362            .splits
363            .iter()
364            .filter_map(|split_range| {
365                if limit.is_some_and(|l| l == 0) {
366                    None
367                } else {
368                    Some(split_exec(ctx.clone(), split_range.clone(), limit.as_mut()))
369                }
370            })
371            .try_collect()?;
372
373        Ok(split_tasks)
374    }
375
376    #[cfg(feature = "tokio")]
377    pub fn execute_tokio_stream(
378        &self,
379        row_range: Option<Range<u64>>,
380    ) -> VortexResult<impl futures::Stream<Item = VortexResult<A>> + Send + 'static + use<A>> {
381        let row_range = intersect_ranges(self.row_range.as_ref(), row_range);
382
383        use futures::StreamExt;
384        use vortex_error::vortex_err;
385
386        let handle = tokio::runtime::Handle::current();
387        let num_workers = handle.metrics().num_workers();
388        let concurrency = self.concurrency * num_workers;
389        Ok(futures::stream::iter(self.execute(row_range)?)
390            .map(move |task| handle.spawn(task))
391            .buffered(concurrency)
392            .map(|task| {
393                task.map_err(|e| vortex_err!("Failed to join task: {e}"))
394                    .flatten()
395            })
396            .filter_map(|chunk| async move { chunk.transpose() }))
397    }
398}
399
400impl RepeatedScan<ArrayRef> {
401    pub fn execute_array_iter(
402        &self,
403        row_range: Option<Range<u64>>,
404    ) -> VortexResult<impl ArrayIterator + Send + Clone + 'static> {
405        let row_range = intersect_ranges(self.row_range.as_ref(), row_range);
406
407        let dtype = self.dtype.clone();
408        let tasks = self.execute(row_range)?;
409        let queue = WorkStealingQueue::new([Box::new(move || Ok(tasks)) as TaskFactory<ArrayTask>]);
410
411        Ok(WorkStealingArrayIterator::new(
412            queue,
413            Arc::new(dtype),
414            self.concurrency,
415        ))
416    }
417
418    #[cfg(feature = "tokio")]
419    pub fn execute_tokio_array_stream(
420        &self,
421        row_range: Option<Range<u64>>,
422    ) -> VortexResult<impl vortex_array::stream::ArrayStream + Send + 'static> {
423        let row_range = intersect_ranges(self.row_range.as_ref(), row_range);
424
425        let dtype = self.dtype.clone();
426        let stream = self.execute_tokio_stream(row_range)?;
427        Ok(vortex_array::stream::ArrayStreamAdapter::new(dtype, stream))
428    }
429}
430
/// Intersects an optional row range with another optional row range.
///
/// `None` means "unbounded", so it acts as the identity. When both ranges are present but do
/// not overlap, the result is an empty range `start..start` rather than an inverted range
/// (`start > end`), so callers can safely compute `end - start`.
fn intersect_ranges(left: Option<&Range<u64>>, right: Option<Range<u64>>) -> Option<Range<u64>> {
    match (left, right) {
        (None, r) => r,
        (Some(l), None) => Some(l.clone()),
        (Some(l), Some(r)) => {
            let start = cmp::max(l.start, r.start);
            // Clamp so disjoint inputs yield a well-formed empty range.
            let end = cmp::min(l.end, r.end).max(start);
            Some(start..end)
        }
    }
}
439
440#[cfg(test)]
441mod tests;