vortex_scan/
scan_builder.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::ops::Range;
5use std::sync::Arc;
6
7use futures::Stream;
8use futures::future::BoxFuture;
9use itertools::Itertools;
10use vortex_array::ArrayRef;
11use vortex_array::expr::Expression;
12use vortex_array::expr::analysis::immediate_access::immediate_scope_access;
13use vortex_array::expr::root;
14use vortex_array::iter::ArrayIterator;
15use vortex_array::iter::ArrayIteratorAdapter;
16use vortex_array::stats::StatsSet;
17use vortex_array::stream::ArrayStream;
18use vortex_array::stream::ArrayStreamAdapter;
19use vortex_buffer::Buffer;
20use vortex_dtype::DType;
21use vortex_dtype::Field;
22use vortex_dtype::FieldMask;
23use vortex_dtype::FieldName;
24use vortex_dtype::FieldPath;
25use vortex_error::VortexResult;
26use vortex_error::vortex_bail;
27use vortex_io::runtime::BlockingRuntime;
28use vortex_layout::LayoutReader;
29use vortex_layout::LayoutReaderRef;
30use vortex_layout::layouts::row_idx::RowIdxLayoutReader;
31use vortex_metrics::VortexMetrics;
32use vortex_session::VortexSession;
33
34use crate::RepeatedScan;
35use crate::selection::Selection;
36use crate::split_by::SplitBy;
37use crate::splits::Splits;
38use crate::splits::attempt_split_ranges;
39
40/// A struct for building a scan operation.
41pub struct ScanBuilder<A> {
42    session: VortexSession,
43    layout_reader: LayoutReaderRef,
44    projection: Expression,
45    filter: Option<Expression>,
46    /// Whether the scan needs to return splits in the order they appear in the file.
47    ordered: bool,
48    /// Optionally read a subset of the rows in the file.
49    row_range: Option<Range<u64>>,
50    /// The selection mask to apply to the selected row range.
51    // TODO(joe): replace this is usage of row_id selection, see
52    selection: Selection,
53    /// How to split the file for concurrent processing.
54    split_by: SplitBy,
55    /// The number of splits to make progress on concurrently **per-thread**.
56    concurrency: usize,
57    /// Function to apply to each [`ArrayRef`] within the spawned split tasks.
58    map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
59    metrics: VortexMetrics,
60    /// Should we try to prune the file (using stats) on open.
61    file_stats: Option<Arc<[StatsSet]>>,
62    /// Maximal number of rows to read (after filtering)
63    limit: Option<usize>,
64    /// The row-offset assigned to the first row of the file. Used by the `row_idx` expression,
65    /// but not by the scan [`Selection`] which remains relative.
66    row_offset: u64,
67}
68
69impl ScanBuilder<ArrayRef> {
70    pub fn new(session: VortexSession, layout_reader: Arc<dyn LayoutReader>) -> Self {
71        Self {
72            session,
73            layout_reader,
74            projection: root(),
75            filter: None,
76            ordered: true,
77            row_range: None,
78            selection: Default::default(),
79            split_by: SplitBy::Layout,
80            // We default to four tasks per worker thread, which allows for some I/O lookahead
81            // without too much impact on work-stealing.
82            concurrency: 4,
83            map_fn: Arc::new(Ok),
84            metrics: Default::default(),
85            file_stats: None,
86            limit: None,
87            row_offset: 0,
88        }
89    }
90
91    /// Returns an [`ArrayStream`] with tasks spawned onto the session's runtime handle.
92    ///
93    /// See [`ScanBuilder::into_stream`] for more details.
94    pub fn into_array_stream(self) -> VortexResult<impl ArrayStream + Send + 'static> {
95        let dtype = self.dtype()?;
96        let stream = self.into_stream()?;
97        Ok(ArrayStreamAdapter::new(dtype, stream))
98    }
99
100    /// Returns an [`ArrayIterator`] using the given blocking runtime.
101    pub fn into_array_iter<B: BlockingRuntime>(
102        self,
103        runtime: &B,
104    ) -> VortexResult<impl ArrayIterator + 'static> {
105        let stream = self.into_array_stream()?;
106        let dtype = stream.dtype().clone();
107        Ok(ArrayIteratorAdapter::new(
108            dtype,
109            runtime.block_on_stream(stream),
110        ))
111    }
112}
113
114impl<A: 'static + Send> ScanBuilder<A> {
115    pub fn with_filter(mut self, filter: Expression) -> Self {
116        self.filter = Some(filter);
117        self
118    }
119
120    pub fn with_some_filter(mut self, filter: Option<Expression>) -> Self {
121        self.filter = filter;
122        self
123    }
124
125    pub fn with_projection(mut self, projection: Expression) -> Self {
126        self.projection = projection;
127        self
128    }
129
130    pub fn with_ordered(mut self, ordered: bool) -> Self {
131        self.ordered = ordered;
132        self
133    }
134
135    pub fn with_row_range(mut self, row_range: Range<u64>) -> Self {
136        self.row_range = Some(row_range);
137        self
138    }
139
140    pub fn with_selection(mut self, selection: Selection) -> Self {
141        self.selection = selection;
142        self
143    }
144
145    pub fn with_row_indices(mut self, row_indices: Buffer<u64>) -> Self {
146        self.selection = Selection::IncludeByIndex(row_indices);
147        self
148    }
149
150    pub fn with_row_offset(mut self, row_offset: u64) -> Self {
151        self.row_offset = row_offset;
152        self
153    }
154
155    pub fn with_split_by(mut self, split_by: SplitBy) -> Self {
156        self.split_by = split_by;
157        self
158    }
159
160    /// The number of row splits to make progress on concurrently per-thread, must
161    /// be greater than 0.
162    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
163        assert!(concurrency > 0);
164        self.concurrency = concurrency;
165        self
166    }
167
168    pub fn with_metrics(mut self, metrics: VortexMetrics) -> Self {
169        self.metrics = metrics;
170        self
171    }
172
173    pub fn with_limit(mut self, limit: usize) -> Self {
174        self.limit = Some(limit);
175        self
176    }
177
178    /// The [`DType`] returned by the scan, after applying the projection.
179    pub fn dtype(&self) -> VortexResult<DType> {
180        self.projection.return_dtype(self.layout_reader.dtype())
181    }
182
183    /// The session used by the scan.
184    pub fn session(&self) -> &VortexSession {
185        &self.session
186    }
187
188    /// Map each split of the scan. The function will be run on the spawned task.
189    pub fn map<B: 'static>(
190        self,
191        map_fn: impl Fn(A) -> VortexResult<B> + 'static + Send + Sync,
192    ) -> ScanBuilder<B> {
193        let old_map_fn = self.map_fn;
194        ScanBuilder {
195            session: self.session,
196            layout_reader: self.layout_reader,
197            projection: self.projection,
198            filter: self.filter,
199            ordered: self.ordered,
200            row_range: self.row_range,
201            selection: self.selection,
202            split_by: self.split_by,
203            concurrency: self.concurrency,
204            metrics: self.metrics,
205            file_stats: self.file_stats,
206            limit: self.limit,
207            row_offset: self.row_offset,
208            map_fn: Arc::new(move |a| old_map_fn(a).and_then(&map_fn)),
209        }
210    }
211
212    pub fn prepare(self) -> VortexResult<RepeatedScan<A>> {
213        let dtype = self.dtype()?;
214
215        if self.filter.is_some() && self.limit.is_some() {
216            vortex_bail!("Vortex doesn't support scans with both a filter and a limit")
217        }
218
219        // Spin up the root layout reader, and wrap it in a FilterLayoutReader to perform
220        // conjunction splitting if a filter is provided.
221        let mut layout_reader = self.layout_reader;
222
223        // Enrich the layout reader to support RowIdx expressions.
224        // Note that this is applied below the filter layout reader since it can perform
225        // better over individual conjunctions.
226        layout_reader = Arc::new(RowIdxLayoutReader::new(
227            self.row_offset,
228            layout_reader,
229            self.session.clone(),
230        ));
231
232        // Normalize and simplify the expressions.
233        let projection = self.projection.optimize_recursive(layout_reader.dtype())?;
234
235        let filter = self
236            .filter
237            .map(|f| f.optimize_recursive(layout_reader.dtype()))
238            .transpose()?;
239
240        // Construct field masks and compute the row splits of the scan.
241        let (filter_mask, projection_mask) =
242            filter_and_projection_masks(&projection, filter.as_ref(), layout_reader.dtype())?;
243        let field_mask: Vec<_> = [filter_mask, projection_mask].concat();
244
245        let splits =
246            if let Some(ranges) = attempt_split_ranges(&self.selection, self.row_range.as_ref()) {
247                Splits::Ranges(ranges)
248            } else {
249                let split_range = self
250                    .row_range
251                    .clone()
252                    .unwrap_or_else(|| 0..layout_reader.row_count());
253                Splits::Natural(self.split_by.splits(
254                    layout_reader.as_ref(),
255                    &split_range,
256                    &field_mask,
257                )?)
258            };
259
260        Ok(RepeatedScan::new(
261            self.session.clone(),
262            layout_reader,
263            projection,
264            filter,
265            self.ordered,
266            self.row_range,
267            self.selection,
268            splits,
269            self.concurrency,
270            self.map_fn,
271            self.limit,
272            dtype,
273        ))
274    }
275
276    /// Constructs a task per row split of the scan, returned as a vector of futures.
277    pub fn build(self) -> VortexResult<Vec<BoxFuture<'static, VortexResult<Option<A>>>>> {
278        // The ultimate short circuit
279        if self.limit.is_some_and(|l| l == 0) {
280            return Ok(vec![]);
281        }
282
283        self.prepare()?.execute(None)
284    }
285
286    /// Returns a [`Stream`] with tasks spawned onto the session's runtime handle.
287    pub fn into_stream(
288        self,
289    ) -> VortexResult<impl Stream<Item = VortexResult<A>> + Send + 'static + use<A>> {
290        self.prepare()?.execute_stream(None)
291    }
292
293    /// Returns an [`Iterator`] using the session's runtime.
294    pub fn into_iter<B: BlockingRuntime>(
295        self,
296        runtime: &B,
297    ) -> VortexResult<impl Iterator<Item = VortexResult<A>> + 'static> {
298        let stream = self.into_stream()?;
299        Ok(runtime.block_on_stream(stream))
300    }
301}
302
303/// Compute masks of field paths referenced by the projection and filter in the scan.
304///
305/// Projection and filter must be pre-simplified.
306pub(crate) fn filter_and_projection_masks(
307    projection: &Expression,
308    filter: Option<&Expression>,
309    dtype: &DType,
310) -> VortexResult<(Vec<FieldMask>, Vec<FieldMask>)> {
311    let Some(struct_dtype) = dtype.as_struct_fields_opt() else {
312        return Ok(match filter {
313            Some(_) => (vec![FieldMask::All], vec![FieldMask::All]),
314            None => (Vec::new(), vec![FieldMask::All]),
315        });
316    };
317    let projection_mask = immediate_scope_access(projection, struct_dtype);
318    Ok(match filter {
319        None => (
320            Vec::new(),
321            projection_mask.into_iter().map(to_field_mask).collect_vec(),
322        ),
323        Some(f) => {
324            let filter_mask = immediate_scope_access(f, struct_dtype);
325            let only_projection_mask = projection_mask
326                .difference(&filter_mask)
327                .cloned()
328                .map(to_field_mask)
329                .collect_vec();
330            (
331                filter_mask.into_iter().map(to_field_mask).collect_vec(),
332                only_projection_mask,
333            )
334        }
335    })
336}
337
338fn to_field_mask(field: FieldName) -> FieldMask {
339    FieldMask::Prefix(FieldPath::from(Field::Name(field)))
340}