vortex_scan/lib.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::ops::Range;
use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use futures::executor::ThreadPool;
use futures::future::BoxFuture;
use futures::stream::FuturesOrdered;
use futures::task::SpawnExt;
use futures::{Stream, StreamExt, stream};
use itertools::Itertools;
pub use multi_scan::*;
pub use selection::*;
pub use split_by::*;
use tasks::{TaskContext, split_exec};
use vortex_array::iter::ArrayIterator;
use vortex_array::stats::StatsSet;
use vortex_array::stream::{ArrayStream, ArrayStreamAdapter};
use vortex_array::{ArrayRef, ToCanonical};
use vortex_buffer::Buffer;
use vortex_dtype::{DType, Field, FieldMask, FieldName, FieldPath};
use vortex_error::{VortexResult, vortex_bail, vortex_err};
use vortex_expr::transform::immediate_access::immediate_scope_access;
use vortex_expr::transform::simplify_typed::simplify_typed;
use vortex_expr::{ExprRef, root};
use vortex_layout::layouts::filter::FilterLayoutReader;
use vortex_layout::layouts::row_idx::RowIdxLayoutReader;
use vortex_layout::{LayoutReader, LayoutReaderRef};
pub use vortex_layout::{TaskExecutor, TaskExecutorExt};
use vortex_metrics::VortexMetrics;

use crate::work_queue::{TaskFactory, WorkStealingQueue};
use crate::work_stealing_iter::{ArrayTask, WorkStealingArrayIterator};

mod multi_scan;
#[cfg(feature = "rayon")]
pub mod rayon;
pub mod row_mask;
mod selection;
mod split_by;
mod tasks;
mod work_queue;
mod work_stealing_iter;

/// A struct for building a scan operation.
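///
/// # Example
///
/// A minimal sketch of typical usage (not compiled: `layout_reader` and `filter_expr` are
/// assumed to come from elsewhere, e.g. an opened Vortex file and the `vortex_expr` builders,
/// and an async context is assumed):
///
/// ```ignore
/// use futures::StreamExt;
/// use vortex_scan::ScanBuilder;
///
/// // Configure the scan, then drive it as a stream of decoded arrays.
/// let mut stream = ScanBuilder::new(layout_reader)
///     .with_filter(filter_expr)
///     .with_row_range(0..1_000)
///     .with_concurrency(4)
///     .into_array_stream()?;
///
/// while let Some(array) = stream.next().await {
///     let array = array?;
///     // Process each decoded split here.
/// }
/// ```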
pub struct ScanBuilder<A> {
    layout_reader: LayoutReaderRef,
    projection: ExprRef,
    filter: Option<ExprRef>,
    /// Optionally read a subset of the rows in the file.
    row_range: Option<Range<u64>>,
    /// The selection mask to apply to the selected row range.
    // TODO(joe): replace this with usage of row_id selection, see
    selection: Selection,
    /// How to split the file for concurrent processing.
    split_by: SplitBy,
    /// The number of splits to make progress on concurrently.
    concurrency: usize,
    /// Function to apply to each [`ArrayRef`] within the spawned split tasks.
    map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
    /// The executor used to spawn each split task.
    executor: Option<Arc<dyn TaskExecutor>>,
    metrics: VortexMetrics,
    /// Whether to try pruning the file (using stats) on open.
    file_stats: Option<Arc<[StatsSet]>>,
    /// Maximum number of rows to read (after filtering).
    limit: Option<usize>,
    /// The row-offset assigned to the first row of the file. Used by the `row_idx` expression,
    /// but not by the scan [`Selection`], which remains relative.
    row_offset: u64,
}

impl<A: 'static + Send> ScanBuilder<A> {
    pub fn with_filter(mut self, filter: ExprRef) -> Self {
        self.filter = Some(filter);
        self
    }

    pub fn with_some_filter(mut self, filter: Option<ExprRef>) -> Self {
        self.filter = filter;
        self
    }

    pub fn with_projection(mut self, projection: ExprRef) -> Self {
        self.projection = projection;
        self
    }

    pub fn with_row_range(mut self, row_range: Range<u64>) -> Self {
        self.row_range = Some(row_range);
        self
    }

    pub fn with_selection(mut self, selection: Selection) -> Self {
        self.selection = selection;
        self
    }

    pub fn with_row_indices(mut self, row_indices: Buffer<u64>) -> Self {
        self.selection = Selection::IncludeByIndex(row_indices);
        self
    }

    pub fn with_row_offset(mut self, row_offset: u64) -> Self {
        self.row_offset = row_offset;
        self
    }

    pub fn with_split_by(mut self, split_by: SplitBy) -> Self {
        self.split_by = split_by;
        self
    }

    /// The number of row splits to make progress on concurrently; must be greater than 0.
    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
        assert!(concurrency > 0);
        self.concurrency = concurrency;
        self
    }

    /// Spawn each CPU task onto the given Tokio runtime.
    ///
    /// Note that this is a somewhat unusual use of the Tokio runtime, which is typically used
    /// for I/O-bound rather than CPU-bound tasks.
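    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled; requires the `tokio` feature and an existing runtime):
    ///
    /// ```ignore
    /// let runtime = tokio::runtime::Runtime::new()?;
    /// // CPU tasks for each split will be spawned onto this runtime.
    /// let builder = builder.with_tokio_executor(runtime.handle().clone());
    /// ```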
    #[cfg(feature = "tokio")]
    pub fn with_tokio_executor(mut self, handle: tokio::runtime::Handle) -> Self {
        self.executor = Some(Arc::new(handle));
        self
    }

    pub fn with_executor(mut self, executor: Arc<dyn TaskExecutor>) -> Self {
        self.executor = Some(executor);
        self
    }

    pub fn with_metrics(mut self, metrics: VortexMetrics) -> Self {
        self.metrics = metrics;
        self
    }

    pub fn with_limit(mut self, limit: usize) -> Self {
        self.limit = Some(limit);
        self
    }

    /// The [`DType`] returned by the scan, after applying the projection.
    pub fn dtype(&self) -> VortexResult<DType> {
        self.projection.return_dtype(self.layout_reader.dtype())
    }

    /// Map each split of the scan. The function will be run on the spawned task.
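    ///
    /// # Example
    ///
    /// A sketch (not compiled) that canonicalizes each split to a struct array on the spawned
    /// task, assuming a `builder` of type `ScanBuilder<ArrayRef>`:
    ///
    /// ```ignore
    /// use vortex_array::ToCanonical;
    ///
    /// let builder = builder.map(|array| array.to_struct());
    /// ```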
    pub fn map<B: 'static>(
        self,
        map_fn: impl Fn(A) -> VortexResult<B> + 'static + Send + Sync,
    ) -> ScanBuilder<B> {
        let old_map_fn = self.map_fn;
        ScanBuilder {
            layout_reader: self.layout_reader,
            projection: self.projection,
            filter: self.filter,
            row_range: self.row_range,
            selection: self.selection,
            split_by: self.split_by,
            concurrency: self.concurrency,
            map_fn: Arc::new(move |a| map_fn(old_map_fn(a)?)),
            executor: self.executor,
            metrics: self.metrics,
            file_stats: self.file_stats,
            limit: self.limit,
            row_offset: self.row_offset,
        }
    }

    /// Constructs one task per row split of the scan, returning them as a vector of futures.
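    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled; assumes an async context). Each future resolves to
    /// `None` when its split is entirely filtered out:
    ///
    /// ```ignore
    /// for task in builder.build()? {
    ///     if let Some(array) = task.await? {
    ///         // Process each decoded split here.
    ///     }
    /// }
    /// ```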
    pub fn build(mut self) -> VortexResult<Vec<BoxFuture<'static, VortexResult<Option<A>>>>> {
        if self.filter.is_some() && self.limit.is_some() {
            vortex_bail!("Vortex doesn't support scans with both a filter and a limit")
        }

        // The ultimate short circuit: a limit of zero reads nothing.
        if self.limit.is_some_and(|l| l == 0) {
            return Ok(vec![]);
        }

        // Spin up the root layout reader, and wrap it in a FilterLayoutReader to perform
        // conjunction splitting if a filter is provided.
        let mut layout_reader = self.layout_reader;

        // Enrich the layout reader to support RowIdx expressions.
        // Note that this is applied below the filter layout reader since it can perform
        // better over individual conjunctions.
        layout_reader = Arc::new(RowIdxLayoutReader::new(self.row_offset, layout_reader));

        if self.filter.is_some() {
            layout_reader = Arc::new(FilterLayoutReader::new(layout_reader));
        }

        // Normalize and simplify the expressions.
        let projection = simplify_typed(self.projection.clone(), layout_reader.dtype())?;
        let filter = self
            .filter
            .clone()
            .map(|f| simplify_typed(f, layout_reader.dtype()))
            .transpose()?;

        // Construct field masks and compute the row splits of the scan.
        let (filter_mask, projection_mask) =
            filter_and_projection_masks(&projection, filter.as_ref(), layout_reader.dtype())?;
        let field_mask: Vec<_> = [filter_mask, projection_mask].concat();
        let splits = self.split_by.splits(layout_reader.as_ref(), &field_mask)?;

        // Create a task that executes the full scan pipeline for each split.
        let split_tasks = splits
            .into_iter()
            .filter_map(|split_range| {
                let ctx = Arc::new(TaskContext {
                    row_range: self.row_range.clone(),
                    selection: self.selection.clone(),
                    filter: self.filter.clone(),
                    reader: layout_reader.clone(),
                    projection: projection.clone(),
                    mapper: self.map_fn.clone(),
                    task_executor: self.executor.clone(),
                });

                if self.limit.is_some_and(|l| l == 0) {
                    None
                } else {
                    Some(split_exec(ctx, split_range, self.limit.as_mut()))
                }
            })
            .try_collect()?;

        Ok(split_tasks)
    }

    /// Returns a stream over the scan, polling up to `concurrency` split tasks at a time on the
    /// current thread as per the behaviour of [`futures::stream::Buffered`].
    pub fn into_stream(self) -> VortexResult<impl Stream<Item = VortexResult<A>> + 'static> {
        let concurrency = self.concurrency;
        let split_tasks = self.build()?;
        Ok(stream::iter(split_tasks)
            .buffered(concurrency)
            .filter_map(|r| async move { r.transpose() }))
    }

    /// Returns a blocking iterator over the scan, where tasks perform CPU work on the provided
    /// thread pool.
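    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled) using the `futures` executor's [`ThreadPool`]:
    ///
    /// ```ignore
    /// let pool = futures::executor::ThreadPool::new()?;
    /// for array in builder.into_thread_pool_iter(pool)? {
    ///     let array = array?;
    ///     // CPU work runs on the pool; this loop blocks on the results in order.
    /// }
    /// ```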
    pub fn into_thread_pool_iter(
        self,
        pool: ThreadPool,
    ) -> VortexResult<impl Iterator<Item = VortexResult<A>> + Send + 'static> {
        let futures: FuturesOrdered<_> = self
            .build()?
            .into_iter()
            .map(|task| {
                let fut = pool.spawn_with_handle(task);
                async move {
                    fut.map_err(|e| vortex_err!("Failed to spawn task onto thread pool: {e}"))?
                        .await
                }
            })
            .collect();

        Ok(futures::executor::block_on_stream(futures).filter_map_ok(|v| v))
    }
}

impl ScanBuilder<ArrayRef> {
    pub fn new(layout_reader: Arc<dyn LayoutReader>) -> Self {
        Self {
            layout_reader,
            projection: root(),
            filter: None,
            row_range: None,
            selection: Default::default(),
            split_by: SplitBy::Layout,
            // We default to two tasks per worker thread, which allows for some I/O lookahead
            // without too much impact on work-stealing.
            concurrency: 2,
            map_fn: Arc::new(Ok),
            executor: None,
            metrics: Default::default(),
            file_stats: None,
            limit: None,
            row_offset: 0,
        }
    }

    /// Map each array of the scan into an Arrow [`RecordBatch`] with the given schema.
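    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled; `schema` is an Arrow [`SchemaRef`] assumed to match the
    /// scan's projected dtype):
    ///
    /// ```ignore
    /// let batches = builder.map_to_record_batch(schema).into_stream()?;
    /// ```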
    pub fn map_to_record_batch(self, schema: SchemaRef) -> ScanBuilder<RecordBatch> {
        self.map(move |array| {
            let st = array.to_struct()?;
            st.into_record_batch_with_schema(schema.as_ref())
        })
    }

    /// Returns a stream over the scan with each CPU task polled on the current thread as per
    /// the behaviour of [`futures::stream::Buffered`].
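    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled; assumes an async context and that the returned
    /// [`ArrayStream`] exposes its [`DType`] via a `dtype()` accessor):
    ///
    /// ```ignore
    /// use futures::StreamExt;
    ///
    /// let mut stream = builder.into_array_stream()?;
    /// let dtype = stream.dtype().clone();
    /// while let Some(array) = stream.next().await {
    ///     let array = array?;
    ///     debug_assert_eq!(array.dtype(), &dtype);
    /// }
    /// ```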
    pub fn into_array_stream(self) -> VortexResult<impl ArrayStream + 'static> {
        let concurrency = self.concurrency;
        let dtype = self.dtype()?;
        let split_tasks = self.build()?;
        Ok(ArrayStreamAdapter::new(
            dtype,
            stream::iter(split_tasks)
                .buffered(concurrency)
                .filter_map(|r| async move { r.transpose() }),
        ))
    }

    /// Returns a thread-safe [`ArrayIterator`] that can be cloned and passed
    /// to other threads to make progress on the same scan concurrently.
    ///
    /// Within each thread, array chunks are emitted in the order they appear in the scan.
    /// Across threads, the order is not guaranteed.
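    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled; assumes the iterator yields `VortexResult<ArrayRef>`
    /// items and that `VortexResult` is the crate's usual result alias):
    ///
    /// ```ignore
    /// let iter = builder.into_array_iter()?;
    /// let workers: Vec<_> = (0..4)
    ///     .map(|_| {
    ///         let iter = iter.clone();
    ///         std::thread::spawn(move || -> VortexResult<()> {
    ///             for array in iter {
    ///                 // Each thread pulls chunks from the same underlying scan.
    ///                 let _array = array?;
    ///             }
    ///             Ok(())
    ///         })
    ///     })
    ///     .collect();
    /// ```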
    pub fn into_array_iter(self) -> VortexResult<impl ArrayIterator + Send + Clone + 'static> {
        let dtype = self.dtype()?;
        let concurrency = self.concurrency;
        let tasks = self.build()?;
        let queue = WorkStealingQueue::new([Box::new(move || Ok(tasks)) as TaskFactory<ArrayTask>]);

        Ok(WorkStealingArrayIterator::new(
            queue,
            Arc::new(dtype),
            concurrency,
        ))
    }
}

/// Compute masks of field paths referenced by the projection and filter in the scan.
///
/// Projection and filter must be pre-simplified.
fn filter_and_projection_masks(
    projection: &ExprRef,
    filter: Option<&ExprRef>,
    dtype: &DType,
) -> VortexResult<(Vec<FieldMask>, Vec<FieldMask>)> {
    let Some(struct_dtype) = dtype.as_struct() else {
        return Ok(match filter {
            Some(_) => (vec![FieldMask::All], vec![FieldMask::All]),
            None => (Vec::new(), vec![FieldMask::All]),
        });
    };
    let projection_mask = immediate_scope_access(projection, struct_dtype);
    Ok(match filter {
        None => (
            Vec::new(),
            projection_mask.into_iter().map(to_field_mask).collect_vec(),
        ),
        Some(f) => {
            let filter_mask = immediate_scope_access(f, struct_dtype);
            let only_projection_mask = projection_mask
                .difference(&filter_mask)
                .cloned()
                .map(to_field_mask)
                .collect_vec();
            (
                filter_mask.into_iter().map(to_field_mask).collect_vec(),
                only_projection_mask,
            )
        }
    })
}

fn to_field_mask(field: FieldName) -> FieldMask {
    FieldMask::Prefix(FieldPath::from(Field::Name(field)))
}