vortex_scan/
scan_builder.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::ops::Range;
5use std::sync::Arc;
6
7use futures::Stream;
8use futures::future::BoxFuture;
9use itertools::Itertools;
10use vortex_array::ArrayRef;
11use vortex_array::expr::transform::immediate_access::immediate_scope_access;
12use vortex_array::expr::transform::simplify_typed;
13use vortex_array::expr::{Expression, root};
14use vortex_array::iter::{ArrayIterator, ArrayIteratorAdapter};
15use vortex_array::stats::StatsSet;
16use vortex_array::stream::{ArrayStream, ArrayStreamAdapter};
17use vortex_buffer::Buffer;
18use vortex_dtype::{DType, Field, FieldMask, FieldName, FieldPath};
19use vortex_error::{VortexResult, vortex_bail};
20use vortex_io::runtime::BlockingRuntime;
21use vortex_layout::layouts::row_idx::RowIdxLayoutReader;
22use vortex_layout::{LayoutReader, LayoutReaderRef};
23use vortex_metrics::VortexMetrics;
24use vortex_session::VortexSession;
25
26use crate::RepeatedScan;
27use crate::selection::Selection;
28use crate::split_by::SplitBy;
29use crate::splits::{Splits, attempt_split_ranges};
30
31/// A struct for building a scan operation.
32pub struct ScanBuilder<A> {
33    session: VortexSession,
34    layout_reader: LayoutReaderRef,
35    projection: Expression,
36    filter: Option<Expression>,
37    /// Whether the scan needs to return splits in the order they appear in the file.
38    ordered: bool,
39    /// Optionally read a subset of the rows in the file.
40    row_range: Option<Range<u64>>,
41    /// The selection mask to apply to the selected row range.
42    // TODO(joe): replace this is usage of row_id selection, see
43    selection: Selection,
44    /// How to split the file for concurrent processing.
45    split_by: SplitBy,
46    /// The number of splits to make progress on concurrently **per-thread**.
47    concurrency: usize,
48    /// Function to apply to each [`ArrayRef`] within the spawned split tasks.
49    map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
50    metrics: VortexMetrics,
51    /// Should we try to prune the file (using stats) on open.
52    file_stats: Option<Arc<[StatsSet]>>,
53    /// Maximal number of rows to read (after filtering)
54    limit: Option<usize>,
55    /// The row-offset assigned to the first row of the file. Used by the `row_idx` expression,
56    /// but not by the scan [`Selection`] which remains relative.
57    row_offset: u64,
58}
59
60impl ScanBuilder<ArrayRef> {
61    pub fn new(session: VortexSession, layout_reader: Arc<dyn LayoutReader>) -> Self {
62        Self {
63            session,
64            layout_reader,
65            projection: root(),
66            filter: None,
67            ordered: true,
68            row_range: None,
69            selection: Default::default(),
70            split_by: SplitBy::Layout,
71            // We default to four tasks per worker thread, which allows for some I/O lookahead
72            // without too much impact on work-stealing.
73            concurrency: 4,
74            map_fn: Arc::new(Ok),
75            metrics: Default::default(),
76            file_stats: None,
77            limit: None,
78            row_offset: 0,
79        }
80    }
81
82    /// Returns an [`ArrayStream`] with tasks spawned onto the session's runtime handle.
83    ///
84    /// See [`ScanBuilder::into_stream`] for more details.
85    pub fn into_array_stream(self) -> VortexResult<impl ArrayStream + Send + 'static> {
86        let dtype = self.dtype()?;
87        let stream = self.into_stream()?;
88        Ok(ArrayStreamAdapter::new(dtype, stream))
89    }
90
91    /// Returns an [`ArrayIterator`] using the given blocking runtime.
92    pub fn into_array_iter<B: BlockingRuntime>(
93        self,
94        runtime: &B,
95    ) -> VortexResult<impl ArrayIterator + 'static> {
96        let stream = self.into_array_stream()?;
97        let dtype = stream.dtype().clone();
98        Ok(ArrayIteratorAdapter::new(
99            dtype,
100            runtime.block_on_stream(stream),
101        ))
102    }
103}
104
105impl<A: 'static + Send> ScanBuilder<A> {
106    pub fn with_filter(mut self, filter: Expression) -> Self {
107        self.filter = Some(filter);
108        self
109    }
110
111    pub fn with_some_filter(mut self, filter: Option<Expression>) -> Self {
112        self.filter = filter;
113        self
114    }
115
116    pub fn with_projection(mut self, projection: Expression) -> Self {
117        self.projection = projection;
118        self
119    }
120
121    pub fn with_ordered(mut self, ordered: bool) -> Self {
122        self.ordered = ordered;
123        self
124    }
125
126    pub fn with_row_range(mut self, row_range: Range<u64>) -> Self {
127        self.row_range = Some(row_range);
128        self
129    }
130
131    pub fn with_selection(mut self, selection: Selection) -> Self {
132        self.selection = selection;
133        self
134    }
135
136    pub fn with_row_indices(mut self, row_indices: Buffer<u64>) -> Self {
137        self.selection = Selection::IncludeByIndex(row_indices);
138        self
139    }
140
141    pub fn with_row_offset(mut self, row_offset: u64) -> Self {
142        self.row_offset = row_offset;
143        self
144    }
145
146    pub fn with_split_by(mut self, split_by: SplitBy) -> Self {
147        self.split_by = split_by;
148        self
149    }
150
151    /// The number of row splits to make progress on concurrently per-thread, must
152    /// be greater than 0.
153    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
154        assert!(concurrency > 0);
155        self.concurrency = concurrency;
156        self
157    }
158
159    pub fn with_metrics(mut self, metrics: VortexMetrics) -> Self {
160        self.metrics = metrics;
161        self
162    }
163
164    pub fn with_limit(mut self, limit: usize) -> Self {
165        self.limit = Some(limit);
166        self
167    }
168
169    /// The [`DType`] returned by the scan, after applying the projection.
170    pub fn dtype(&self) -> VortexResult<DType> {
171        self.projection.return_dtype(self.layout_reader.dtype())
172    }
173
174    /// Map each split of the scan. The function will be run on the spawned task.
175    pub fn map<B: 'static>(
176        self,
177        map_fn: impl Fn(A) -> VortexResult<B> + 'static + Send + Sync,
178    ) -> ScanBuilder<B> {
179        let old_map_fn = self.map_fn;
180        ScanBuilder {
181            session: self.session,
182            layout_reader: self.layout_reader,
183            projection: self.projection,
184            filter: self.filter,
185            ordered: self.ordered,
186            row_range: self.row_range,
187            selection: self.selection,
188            split_by: self.split_by,
189            concurrency: self.concurrency,
190            metrics: self.metrics,
191            file_stats: self.file_stats,
192            limit: self.limit,
193            row_offset: self.row_offset,
194            map_fn: Arc::new(move |a| old_map_fn(a).and_then(&map_fn)),
195        }
196    }
197
198    pub fn prepare(self) -> VortexResult<RepeatedScan<A>> {
199        let dtype = self.dtype()?;
200
201        if self.filter.is_some() && self.limit.is_some() {
202            vortex_bail!("Vortex doesn't support scans with both a filter and a limit")
203        }
204
205        // Spin up the root layout reader, and wrap it in a FilterLayoutReader to perform
206        // conjunction splitting if a filter is provided.
207        let mut layout_reader = self.layout_reader;
208
209        // Enrich the layout reader to support RowIdx expressions.
210        // Note that this is applied below the filter layout reader since it can perform
211        // better over individual conjunctions.
212        layout_reader = Arc::new(RowIdxLayoutReader::new(self.row_offset, layout_reader));
213
214        // Normalize and simplify the expressions.
215        let projection = simplify_typed(self.projection, layout_reader.dtype())?;
216        let filter = self
217            .filter
218            .map(|f| simplify_typed(f, layout_reader.dtype()))
219            .transpose()?;
220
221        // Construct field masks and compute the row splits of the scan.
222        let (filter_mask, projection_mask) =
223            filter_and_projection_masks(&projection, filter.as_ref(), layout_reader.dtype())?;
224        let field_mask: Vec<_> = [filter_mask, projection_mask].concat();
225
226        let splits =
227            if let Some(ranges) = attempt_split_ranges(&self.selection, self.row_range.as_ref()) {
228                Splits::Ranges(ranges)
229            } else {
230                let split_range = self
231                    .row_range
232                    .clone()
233                    .unwrap_or_else(|| 0..layout_reader.row_count());
234                Splits::Natural(self.split_by.splits(
235                    layout_reader.as_ref(),
236                    &split_range,
237                    &field_mask,
238                )?)
239            };
240
241        Ok(RepeatedScan::new(
242            self.session.clone(),
243            layout_reader,
244            projection,
245            filter,
246            self.ordered,
247            self.row_range,
248            self.selection,
249            splits,
250            self.concurrency,
251            self.map_fn,
252            self.limit,
253            dtype,
254        ))
255    }
256
257    /// Constructs a task per row split of the scan, returned as a vector of futures.
258    pub fn build(self) -> VortexResult<Vec<BoxFuture<'static, VortexResult<Option<A>>>>> {
259        // The ultimate short circuit
260        if self.limit.is_some_and(|l| l == 0) {
261            return Ok(vec![]);
262        }
263
264        self.prepare()?.execute(None)
265    }
266
267    /// Returns a [`Stream`] with tasks spawned onto the session's runtime handle.
268    pub fn into_stream(
269        self,
270    ) -> VortexResult<impl Stream<Item = VortexResult<A>> + Send + 'static + use<A>> {
271        self.prepare()?.execute_stream(None)
272    }
273
274    /// Returns an [`Iterator`] using the session's runtime.
275    pub fn into_iter<B: BlockingRuntime>(
276        self,
277        runtime: &B,
278    ) -> VortexResult<impl Iterator<Item = VortexResult<A>> + 'static> {
279        let stream = self.into_stream()?;
280        Ok(runtime.block_on_stream(stream))
281    }
282}
283
284/// Compute masks of field paths referenced by the projection and filter in the scan.
285///
286/// Projection and filter must be pre-simplified.
287pub(crate) fn filter_and_projection_masks(
288    projection: &Expression,
289    filter: Option<&Expression>,
290    dtype: &DType,
291) -> VortexResult<(Vec<FieldMask>, Vec<FieldMask>)> {
292    let Some(struct_dtype) = dtype.as_struct_fields_opt() else {
293        return Ok(match filter {
294            Some(_) => (vec![FieldMask::All], vec![FieldMask::All]),
295            None => (Vec::new(), vec![FieldMask::All]),
296        });
297    };
298    let projection_mask = immediate_scope_access(projection, struct_dtype);
299    Ok(match filter {
300        None => (
301            Vec::new(),
302            projection_mask.into_iter().map(to_field_mask).collect_vec(),
303        ),
304        Some(f) => {
305            let filter_mask = immediate_scope_access(f, struct_dtype);
306            let only_projection_mask = projection_mask
307                .difference(&filter_mask)
308                .cloned()
309                .map(to_field_mask)
310                .collect_vec();
311            (
312                filter_mask.into_iter().map(to_field_mask).collect_vec(),
313                only_projection_mask,
314            )
315        }
316    })
317}
318
319fn to_field_mask(field: FieldName) -> FieldMask {
320    FieldMask::Prefix(FieldPath::from(Field::Name(field)))
321}