vortex_scan/
scan_builder.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::ops::Range;
5use std::sync::Arc;
6
7use futures::Stream;
8use futures::future::BoxFuture;
9use itertools::Itertools;
10use vortex_array::ArrayRef;
11use vortex_array::iter::{ArrayIterator, ArrayIteratorAdapter};
12use vortex_array::stats::StatsSet;
13use vortex_array::stream::{ArrayStream, ArrayStreamAdapter};
14use vortex_buffer::Buffer;
15use vortex_dtype::{DType, Field, FieldMask, FieldName, FieldPath};
16use vortex_error::{VortexResult, vortex_bail};
17use vortex_expr::transform::immediate_access::immediate_scope_access;
18use vortex_expr::transform::simplify_typed;
19use vortex_expr::{ExprRef, root};
20use vortex_io::runtime::{BlockingRuntime, Handle};
21use vortex_layout::layouts::row_idx::RowIdxLayoutReader;
22use vortex_layout::{LayoutReader, LayoutReaderRef};
23use vortex_metrics::VortexMetrics;
24
25use crate::RepeatedScan;
26use crate::selection::Selection;
27use crate::split_by::SplitBy;
28use crate::splits::{Splits, attempt_split_ranges};
29
30/// A struct for building a scan operation.
31pub struct ScanBuilder<A> {
32    handle: Option<Handle>,
33    layout_reader: LayoutReaderRef,
34    projection: ExprRef,
35    filter: Option<ExprRef>,
36    /// Whether the scan needs to return splits in the order they appear in the file.
37    ordered: bool,
38    /// Optionally read a subset of the rows in the file.
39    row_range: Option<Range<u64>>,
40    /// The selection mask to apply to the selected row range.
41    // TODO(joe): replace this is usage of row_id selection, see
42    selection: Selection,
43    /// How to split the file for concurrent processing.
44    split_by: SplitBy,
45    /// The number of splits to make progress on concurrently **per-thread**.
46    concurrency: usize,
47    /// Function to apply to each [`ArrayRef`] within the spawned split tasks.
48    map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
49    metrics: VortexMetrics,
50    /// Should we try to prune the file (using stats) on open.
51    file_stats: Option<Arc<[StatsSet]>>,
52    /// Maximal number of rows to read (after filtering)
53    limit: Option<usize>,
54    /// The row-offset assigned to the first row of the file. Used by the `row_idx` expression,
55    /// but not by the scan [`Selection`] which remains relative.
56    row_offset: u64,
57}
58
59impl ScanBuilder<ArrayRef> {
60    pub fn new(layout_reader: Arc<dyn LayoutReader>) -> Self {
61        Self {
62            handle: Handle::find(),
63            layout_reader,
64            projection: root(),
65            filter: None,
66            ordered: true,
67            row_range: None,
68            selection: Default::default(),
69            split_by: SplitBy::Layout,
70            // We default to four tasks per worker thread, which allows for some I/O lookahead
71            // without too much impact on work-stealing.
72            concurrency: 4,
73            map_fn: Arc::new(Ok),
74            metrics: Default::default(),
75            file_stats: None,
76            limit: None,
77            row_offset: 0,
78        }
79    }
80
81    /// Returns an [`ArrayStream`] with tasks spawned onto the scan's [`Handle`].
82    ///
83    /// See [`ScanBuilder::into_stream`] for more details.
84    pub fn into_array_stream(self) -> VortexResult<impl ArrayStream + Send + 'static> {
85        let dtype = self.dtype()?;
86        let stream = self.into_stream()?;
87        Ok(ArrayStreamAdapter::new(dtype, stream))
88    }
89
90    /// Returns an [`ArrayIterator`] using the given blocking runtime.
91    pub fn into_array_iter<B: BlockingRuntime>(
92        self,
93        runtime: &B,
94    ) -> VortexResult<impl ArrayIterator + 'static> {
95        let stream = self.with_handle(runtime.handle()).into_array_stream()?;
96        let dtype = stream.dtype().clone();
97        Ok(ArrayIteratorAdapter::new(
98            dtype,
99            runtime.block_on_stream(|_| stream),
100        ))
101    }
102}
103
104impl<A: 'static + Send> ScanBuilder<A> {
105    /// Provide a handle to the runtime on which to spawn tasks.
106    pub fn with_handle(mut self, handle: Handle) -> Self {
107        self.handle = Some(handle);
108        self
109    }
110
111    pub fn with_filter(mut self, filter: ExprRef) -> Self {
112        self.filter = Some(filter);
113        self
114    }
115
116    pub fn with_some_filter(mut self, filter: Option<ExprRef>) -> Self {
117        self.filter = filter;
118        self
119    }
120
121    pub fn with_projection(mut self, projection: ExprRef) -> Self {
122        self.projection = projection;
123        self
124    }
125
126    pub fn with_ordered(mut self, ordered: bool) -> Self {
127        self.ordered = ordered;
128        self
129    }
130
131    pub fn with_row_range(mut self, row_range: Range<u64>) -> Self {
132        self.row_range = Some(row_range);
133        self
134    }
135
136    pub fn with_selection(mut self, selection: Selection) -> Self {
137        self.selection = selection;
138        self
139    }
140
141    pub fn with_row_indices(mut self, row_indices: Buffer<u64>) -> Self {
142        self.selection = Selection::IncludeByIndex(row_indices);
143        self
144    }
145
146    pub fn with_row_offset(mut self, row_offset: u64) -> Self {
147        self.row_offset = row_offset;
148        self
149    }
150
151    pub fn with_split_by(mut self, split_by: SplitBy) -> Self {
152        self.split_by = split_by;
153        self
154    }
155
156    /// The number of row splits to make progress on concurrently per-thread, must
157    /// be greater than 0.
158    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
159        assert!(concurrency > 0);
160        self.concurrency = concurrency;
161        self
162    }
163
164    pub fn with_metrics(mut self, metrics: VortexMetrics) -> Self {
165        self.metrics = metrics;
166        self
167    }
168
169    pub fn with_limit(mut self, limit: usize) -> Self {
170        self.limit = Some(limit);
171        self
172    }
173
174    /// The [`DType`] returned by the scan, after applying the projection.
175    pub fn dtype(&self) -> VortexResult<DType> {
176        self.projection.return_dtype(self.layout_reader.dtype())
177    }
178
179    /// Map each split of the scan. The function will be run on the spawned task.
180    pub fn map<B: 'static>(
181        self,
182        map_fn: impl Fn(A) -> VortexResult<B> + 'static + Send + Sync,
183    ) -> ScanBuilder<B> {
184        let old_map_fn = self.map_fn;
185        ScanBuilder {
186            handle: self.handle,
187            layout_reader: self.layout_reader,
188            projection: self.projection,
189            filter: self.filter,
190            ordered: self.ordered,
191            row_range: self.row_range,
192            selection: self.selection,
193            split_by: self.split_by,
194            concurrency: self.concurrency,
195            metrics: self.metrics,
196            file_stats: self.file_stats,
197            limit: self.limit,
198            row_offset: self.row_offset,
199            map_fn: Arc::new(move |a| old_map_fn(a).and_then(&map_fn)),
200        }
201    }
202
203    pub fn prepare(self) -> VortexResult<RepeatedScan<A>> {
204        let dtype = self.dtype()?;
205
206        let Some(handle) = self.handle else {
207            vortex_bail!(
208                "A runtime handle must be provided to the scan builder using `with_handle`"
209            );
210        };
211        if self.filter.is_some() && self.limit.is_some() {
212            vortex_bail!("Vortex doesn't support scans with both a filter and a limit")
213        }
214
215        // Spin up the root layout reader, and wrap it in a FilterLayoutReader to perform
216        // conjunction splitting if a filter is provided.
217        let mut layout_reader = self.layout_reader;
218
219        // Enrich the layout reader to support RowIdx expressions.
220        // Note that this is applied below the filter layout reader since it can perform
221        // better over individual conjunctions.
222        layout_reader = Arc::new(RowIdxLayoutReader::new(self.row_offset, layout_reader));
223
224        // Normalize and simplify the expressions.
225        let projection = simplify_typed(self.projection, layout_reader.dtype())?;
226        let filter = self
227            .filter
228            .map(|f| simplify_typed(f, layout_reader.dtype()))
229            .transpose()?;
230
231        // Construct field masks and compute the row splits of the scan.
232        let (filter_mask, projection_mask) =
233            filter_and_projection_masks(&projection, filter.as_ref(), layout_reader.dtype())?;
234        let field_mask: Vec<_> = [filter_mask, projection_mask].concat();
235
236        let splits =
237            if let Some(ranges) = attempt_split_ranges(&self.selection, self.row_range.as_ref()) {
238                Splits::Ranges(ranges)
239            } else {
240                Splits::Natural(self.split_by.splits(layout_reader.as_ref(), &field_mask)?)
241            };
242
243        Ok(RepeatedScan::new(
244            handle,
245            layout_reader,
246            projection,
247            filter,
248            self.ordered,
249            self.row_range,
250            self.selection,
251            splits,
252            self.concurrency,
253            self.map_fn,
254            self.limit,
255            dtype,
256        ))
257    }
258
259    /// Constructs a task per row split of the scan, returned as a vector of futures.
260    pub fn build(self) -> VortexResult<Vec<BoxFuture<'static, VortexResult<Option<A>>>>> {
261        // The ultimate short circuit
262        if self.limit.is_some_and(|l| l == 0) {
263            return Ok(vec![]);
264        }
265
266        self.prepare()?.execute(None)
267    }
268
269    /// Returns a [`Stream`] with tasks spawned onto the scan's [`Handle`].
270    pub fn into_stream(
271        self,
272    ) -> VortexResult<impl Stream<Item = VortexResult<A>> + Send + 'static + use<A>> {
273        self.prepare()?.execute_stream(None)
274    }
275
276    /// Returns an [`Iterator`] using the given blocking runtime.
277    pub fn into_iter<B: BlockingRuntime>(
278        self,
279        runtime: &B,
280    ) -> VortexResult<impl Iterator<Item = VortexResult<A>> + 'static> {
281        let stream = self.with_handle(runtime.handle()).into_stream()?;
282        Ok(runtime.block_on_stream(|_| stream))
283    }
284}
285
286/// Compute masks of field paths referenced by the projection and filter in the scan.
287///
288/// Projection and filter must be pre-simplified.
289fn filter_and_projection_masks(
290    projection: &ExprRef,
291    filter: Option<&ExprRef>,
292    dtype: &DType,
293) -> VortexResult<(Vec<FieldMask>, Vec<FieldMask>)> {
294    let Some(struct_dtype) = dtype.as_struct_fields_opt() else {
295        return Ok(match filter {
296            Some(_) => (vec![FieldMask::All], vec![FieldMask::All]),
297            None => (Vec::new(), vec![FieldMask::All]),
298        });
299    };
300    let projection_mask = immediate_scope_access(projection, struct_dtype);
301    Ok(match filter {
302        None => (
303            Vec::new(),
304            projection_mask.into_iter().map(to_field_mask).collect_vec(),
305        ),
306        Some(f) => {
307            let filter_mask = immediate_scope_access(f, struct_dtype);
308            let only_projection_mask = projection_mask
309                .difference(&filter_mask)
310                .cloned()
311                .map(to_field_mask)
312                .collect_vec();
313            (
314                filter_mask.into_iter().map(to_field_mask).collect_vec(),
315                only_projection_mask,
316            )
317        }
318    })
319}
320
321fn to_field_mask(field: FieldName) -> FieldMask {
322    FieldMask::Prefix(FieldPath::from(Field::Name(field)))
323}