vortex_scan/
lib.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::ops::Range;
5use std::sync::Arc;
6
7use futures::future::BoxFuture;
8use itertools::Itertools;
9pub use multi_scan::*;
10pub use selection::*;
11pub use split_by::*;
12use tasks::{TaskContext, split_exec};
13use vortex_array::ArrayRef;
14use vortex_array::iter::ArrayIterator;
15use vortex_array::stats::StatsSet;
16use vortex_buffer::Buffer;
17use vortex_dtype::{DType, Field, FieldMask, FieldName, FieldPath};
18use vortex_error::{VortexResult, vortex_bail};
19use vortex_expr::transform::immediate_access::immediate_scope_access;
20use vortex_expr::transform::simplify_typed;
21use vortex_expr::{ExprRef, root};
22use vortex_layout::layouts::row_idx::RowIdxLayoutReader;
23use vortex_layout::{LayoutReader, LayoutReaderRef};
24pub use vortex_layout::{TaskExecutor, TaskExecutorExt};
25use vortex_metrics::VortexMetrics;
26
27use crate::filter::FilterExpr;
28use crate::work_queue::{TaskFactory, WorkStealingQueue};
29use crate::work_stealing_iter::{ArrayTask, WorkStealingArrayIterator};
30
31mod arrow;
32mod filter;
33mod multi_scan;
34#[cfg(feature = "tokio")]
35mod multi_thread;
36pub mod row_mask;
37mod selection;
38mod split_by;
39mod tasks;
40mod work_queue;
41mod work_stealing_iter;
42
/// A struct for building a scan operation.
///
/// `A` is the type produced for each split by the builder's mapping function (`map_fn`).
pub struct ScanBuilder<A> {
    /// Layout reader the scan is executed against.
    layout_reader: LayoutReaderRef,
    /// Expression evaluated to produce the scan's output.
    projection: ExprRef,
    /// Optional filter expression for the scan.
    filter: Option<ExprRef>,
    /// Optionally read a subset of the rows in the file.
    row_range: Option<Range<u64>>,
    /// The selection mask to apply to the selected row range.
    // TODO(joe): replace this with usage of row_id selection.
    selection: Selection,
    /// How to split the file for concurrent processing.
    split_by: SplitBy,
    /// The number of splits to make progress on concurrently **per-thread**.
    concurrency: usize,
    /// Function to apply to each [`ArrayRef`] within the spawned split tasks.
    map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
    /// Metrics handle associated with the scan.
    metrics: VortexMetrics,
    /// Should we try to prune the file (using stats) on open.
    file_stats: Option<Arc<[StatsSet]>>,
    /// Maximal number of rows to read (after filtering)
    limit: Option<usize>,
    /// The row-offset assigned to the first row of the file. Used by the `row_idx` expression,
    /// but not by the scan [`Selection`] which remains relative.
    row_offset: u64,
}
68
69impl<A: 'static + Send> ScanBuilder<A> {
70    pub fn with_filter(mut self, filter: ExprRef) -> Self {
71        self.filter = Some(filter);
72        self
73    }
74
75    pub fn with_some_filter(mut self, filter: Option<ExprRef>) -> Self {
76        self.filter = filter;
77        self
78    }
79
80    pub fn with_projection(mut self, projection: ExprRef) -> Self {
81        self.projection = projection;
82        self
83    }
84
85    pub fn with_row_range(mut self, row_range: Range<u64>) -> Self {
86        self.row_range = Some(row_range);
87        self
88    }
89
90    pub fn with_selection(mut self, selection: Selection) -> Self {
91        self.selection = selection;
92        self
93    }
94
95    pub fn with_row_indices(mut self, row_indices: Buffer<u64>) -> Self {
96        self.selection = Selection::IncludeByIndex(row_indices);
97        self
98    }
99
100    pub fn with_row_offset(mut self, row_offset: u64) -> Self {
101        self.row_offset = row_offset;
102        self
103    }
104
105    pub fn with_split_by(mut self, split_by: SplitBy) -> Self {
106        self.split_by = split_by;
107        self
108    }
109
110    /// The number of row splits to make progress on concurrently per-thread, must
111    /// be greater than 0.
112    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
113        assert!(concurrency > 0);
114        self.concurrency = concurrency;
115        self
116    }
117
118    pub fn with_metrics(mut self, metrics: VortexMetrics) -> Self {
119        self.metrics = metrics;
120        self
121    }
122
123    pub fn with_limit(mut self, limit: usize) -> Self {
124        self.limit = Some(limit);
125        self
126    }
127
128    /// The [`DType`] returned by the scan, after applying the projection.
129    pub fn dtype(&self) -> VortexResult<DType> {
130        self.projection.return_dtype(self.layout_reader.dtype())
131    }
132
133    /// Map each split of the scan. The function will be run on the spawned task.
134    pub fn map<B: 'static>(
135        self,
136        map_fn: impl Fn(A) -> VortexResult<B> + 'static + Send + Sync,
137    ) -> ScanBuilder<B> {
138        let old_map_fn = self.map_fn;
139        ScanBuilder {
140            layout_reader: self.layout_reader,
141            projection: self.projection,
142            filter: self.filter,
143            row_range: self.row_range,
144            selection: self.selection,
145            split_by: self.split_by,
146            concurrency: self.concurrency,
147            map_fn: Arc::new(move |a| map_fn(old_map_fn(a)?)),
148            metrics: self.metrics,
149            file_stats: self.file_stats,
150            limit: self.limit,
151            row_offset: self.row_offset,
152        }
153    }
154
155    /// Constructs a task per row split of the scan, returned as a vector of futures.
156    pub fn build(mut self) -> VortexResult<Vec<BoxFuture<'static, VortexResult<Option<A>>>>> {
157        if self.filter.is_some() && self.limit.is_some() {
158            vortex_bail!("Vortex doesn't support scans with both a filter and a limit")
159        }
160
161        // The ultimate short circuit
162        if self.limit.is_some_and(|l| l == 0) {
163            return Ok(vec![]);
164        }
165
166        // Spin up the root layout reader, and wrap it in a FilterLayoutReader to perform
167        // conjunction splitting if a filter is provided.
168        let mut layout_reader = self.layout_reader;
169
170        // Enrich the layout reader to support RowIdx expressions.
171        // Note that this is applied below the filter layout reader since it can perform
172        // better over individual conjunctions.
173        layout_reader = Arc::new(RowIdxLayoutReader::new(self.row_offset, layout_reader));
174
175        // Normalize and simplify the expressions.
176        let projection = simplify_typed(self.projection.clone(), layout_reader.dtype())?;
177        let filter = self
178            .filter
179            .clone()
180            .map(|f| simplify_typed(f, layout_reader.dtype()))
181            .transpose()?;
182
183        // Construct field masks and compute the row splits of the scan.
184        let (filter_mask, projection_mask) =
185            filter_and_projection_masks(&projection, filter.as_ref(), layout_reader.dtype())?;
186        let field_mask: Vec<_> = [filter_mask, projection_mask].concat();
187        let splits = self.split_by.splits(layout_reader.as_ref(), &field_mask)?;
188
189        let ctx = Arc::new(TaskContext {
190            row_range: self.row_range,
191            selection: self.selection,
192            filter: filter.map(|f| Arc::new(FilterExpr::new(f))),
193            reader: layout_reader,
194            projection,
195            mapper: self.map_fn,
196        });
197
198        // Create a task that executes the full scan pipeline for each split.
199        let split_tasks = splits
200            .into_iter()
201            .filter_map(|split_range| {
202                if self.limit.is_some_and(|l| l == 0) {
203                    None
204                } else {
205                    Some(split_exec(ctx.clone(), split_range, self.limit.as_mut()))
206                }
207            })
208            .try_collect()?;
209
210        Ok(split_tasks)
211    }
212
213    /// Returns a [`Stream`](futures::Stream) with tasks spawned onto the current Tokio runtime.
214    ///
215    /// The stream performs CPU work on the polling thread, with I/O operations dispatched as
216    /// per the Vortex I/O traits.
217    ///
218    /// Task concurrency is the product of the `concurrency` parameter and the number of worker
219    /// threads in the Tokio runtime.
220    #[cfg(feature = "tokio")]
221    pub fn into_tokio_stream(
222        self,
223    ) -> VortexResult<impl futures::Stream<Item = VortexResult<A>> + Send + 'static> {
224        use futures::StreamExt;
225        use vortex_error::vortex_err;
226
227        let handle = tokio::runtime::Handle::current();
228        let num_workers = handle.metrics().num_workers();
229        let concurrency = self.concurrency * num_workers;
230        Ok(futures::stream::iter(self.build()?)
231            .map(move |task| handle.spawn(task))
232            .buffered(concurrency)
233            .map(|task| {
234                task.map_err(|e| vortex_err!("Failed to join task: {e}"))
235                    .flatten()
236            })
237            .filter_map(|chunk| async move { chunk.transpose() }))
238    }
239}
240
241impl ScanBuilder<ArrayRef> {
242    pub fn new(layout_reader: Arc<dyn LayoutReader>) -> Self {
243        Self {
244            layout_reader,
245            projection: root(),
246            filter: None,
247            row_range: None,
248            selection: Default::default(),
249            split_by: SplitBy::Layout,
250            // We default to four tasks per worker thread, which allows for some I/O lookahead
251            // without too much impact on work-stealing.
252            concurrency: 4,
253            map_fn: Arc::new(Ok),
254            metrics: Default::default(),
255            file_stats: None,
256            limit: None,
257            row_offset: 0,
258        }
259    }
260
261    /// Returns a thread-safe [`ArrayIterator`] that can be cloned and passed
262    /// to other threads to make progress on the same scan concurrently.
263    ///
264    /// Within each thread, the array chunks will be emitted in the original order they are within
265    /// the scan. Between threads, the order is not guaranteed.
266    pub fn into_array_iter(self) -> VortexResult<impl ArrayIterator + Send + Clone + 'static> {
267        let dtype = self.dtype()?;
268        let concurrency = self.concurrency;
269        let tasks = self.build()?;
270        let queue = WorkStealingQueue::new([Box::new(move || Ok(tasks)) as TaskFactory<ArrayTask>]);
271
272        Ok(WorkStealingArrayIterator::new(
273            queue,
274            Arc::new(dtype),
275            concurrency,
276        ))
277    }
278
279    /// Returns an [`ArrayStream`] with tasks spawned onto the current Tokio runtime.
280    ///
281    /// See [`ScanBuilder::into_tokio_stream`] for more details.
282    ///
283    /// [`ArrayStream`]: vortex_array::stream::ArrayStreamAdapter
284    #[cfg(feature = "tokio")]
285    pub fn into_tokio_array_stream(
286        self,
287    ) -> VortexResult<impl vortex_array::stream::ArrayStream + Send + 'static> {
288        let dtype = self.dtype()?;
289        let stream = self.into_tokio_stream()?;
290        Ok(vortex_array::stream::ArrayStreamAdapter::new(dtype, stream))
291    }
292}
293
294/// Compute masks of field paths referenced by the projection and filter in the scan.
295///
296/// Projection and filter must be pre-simplified.
297fn filter_and_projection_masks(
298    projection: &ExprRef,
299    filter: Option<&ExprRef>,
300    dtype: &DType,
301) -> VortexResult<(Vec<FieldMask>, Vec<FieldMask>)> {
302    let Some(struct_dtype) = dtype.as_struct() else {
303        return Ok(match filter {
304            Some(_) => (vec![FieldMask::All], vec![FieldMask::All]),
305            None => (Vec::new(), vec![FieldMask::All]),
306        });
307    };
308    let projection_mask = immediate_scope_access(projection, struct_dtype);
309    Ok(match filter {
310        None => (
311            Vec::new(),
312            projection_mask.into_iter().map(to_field_mask).collect_vec(),
313        ),
314        Some(f) => {
315            let filter_mask = immediate_scope_access(f, struct_dtype);
316            let only_projection_mask = projection_mask
317                .difference(&filter_mask)
318                .cloned()
319                .map(to_field_mask)
320                .collect_vec();
321            (
322                filter_mask.into_iter().map(to_field_mask).collect_vec(),
323                only_projection_mask,
324            )
325        }
326    })
327}
328
329fn to_field_mask(field: FieldName) -> FieldMask {
330    FieldMask::Prefix(FieldPath::from(Field::Name(field)))
331}
332
333#[cfg(test)]
334mod tests;