vortex_layout/scan/
mod.rs

1use std::iter;
2use std::ops::Range;
3use std::sync::Arc;
4
5use futures::executor::LocalPool;
6use futures::future::BoxFuture;
7use futures::task::LocalSpawnExt;
8use futures::{FutureExt, StreamExt, stream};
9use itertools::Itertools;
10pub use selection::*;
11pub use split_by::*;
12use vortex_array::builders::builder_with_capacity;
13use vortex_array::iter::{ArrayIterator, ArrayIteratorAdapter};
14use vortex_array::stream::{ArrayStream, ArrayStreamAdapter};
15use vortex_array::{Array, ArrayRef};
16use vortex_buffer::Buffer;
17use vortex_dtype::{DType, Field, FieldMask, FieldName, FieldPath};
18use vortex_error::{VortexError, VortexExpect, VortexResult, vortex_err};
19use vortex_expr::transform::immediate_access::immediate_scope_access;
20use vortex_expr::transform::simplify_typed::simplify_typed;
21use vortex_expr::{ExprRef, Identity};
22use vortex_metrics::{VortexMetrics, instrument};
23
24use crate::layouts::filter::FilterLayoutReader;
25use crate::{ExprEvaluator, LayoutReader};
26
27pub mod row_mask;
28mod selection;
29mod split_by;
30
/// A struct for building a scan operation.
///
/// Configure the scan with the `with_*` methods, then consume the builder with one of the
/// `spawn_*` / `into_*` methods to obtain a stream or iterator of arrays.
pub struct ScanBuilder {
    /// The root layout reader that the scan reads from.
    layout_reader: Arc<dyn LayoutReader>,
    /// The expression evaluated to produce the scan output (identity by default).
    projection: ExprRef,
    /// An optional filter expression; when set, the reader is wrapped in a `FilterLayoutReader`.
    filter: Option<ExprRef>,
    /// Optionally read a subset of the rows in the file.
    row_range: Option<Range<u64>>,
    /// The selection mask to apply to the selected row range.
    selection: Selection,
    /// How to split the file for concurrent processing.
    split_by: SplitBy,
    /// Whether the arrays returned by the scan should be in canonical form.
    canonicalize: bool,
    /// The number of splits to make progress on concurrently.
    concurrency: usize,
    /// Metrics handle supplied via `with_metrics`.
    // NOTE(review): not read by any method visible in this file — confirm where it is consumed.
    metrics: VortexMetrics,
}
48
49impl ScanBuilder {
50    pub fn new(layout_reader: Arc<dyn LayoutReader>) -> Self {
51        Self {
52            layout_reader,
53            projection: Identity::new_expr(),
54            filter: None,
55            row_range: None,
56            selection: Default::default(),
57            split_by: SplitBy::Layout,
58            canonicalize: false,
59            // How many row splits to make progress on concurrently (not necessarily in parallel,
60            // that is decided by the TaskExecutor).
61            concurrency: 16,
62            metrics: Default::default(),
63        }
64    }
65
66    pub fn with_filter(mut self, filter: ExprRef) -> Self {
67        self.filter = Some(filter);
68        self
69    }
70
71    pub fn with_some_filter(mut self, filter: Option<ExprRef>) -> Self {
72        self.filter = filter;
73        self
74    }
75
76    pub fn with_projection(mut self, projection: ExprRef) -> Self {
77        self.projection = projection;
78        self
79    }
80
81    pub fn with_row_range(mut self, row_range: Range<u64>) -> Self {
82        self.row_range = Some(row_range);
83        self
84    }
85
86    pub fn with_some_row_range(mut self, row_range: Option<Range<u64>>) -> Self {
87        self.row_range = row_range;
88        self
89    }
90
91    pub fn with_selection(mut self, selection: Selection) -> Self {
92        self.selection = selection;
93        self
94    }
95
96    pub fn with_row_indices(mut self, row_indices: Buffer<u64>) -> Self {
97        self.selection = Selection::IncludeByIndex(row_indices);
98        self
99    }
100
101    pub fn with_split_by(mut self, split_by: SplitBy) -> Self {
102        self.split_by = split_by;
103        self
104    }
105
106    /// Set whether the scan should canonicalize the output.
107    pub fn with_canonicalize(mut self, canonicalize: bool) -> Self {
108        self.canonicalize = canonicalize;
109        self
110    }
111
112    /// The number of row splits to make progress on concurrently, must be greater than 0.
113    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
114        assert!(concurrency > 0);
115        self.concurrency = concurrency;
116        self
117    }
118
119    pub fn with_metrics(mut self, metrics: VortexMetrics) -> Self {
120        self.metrics = metrics;
121        self
122    }
123
124    #[allow(clippy::unused_enumerate_index)]
125    fn build_tasks(
126        self,
127    ) -> VortexResult<Vec<impl Future<Output = VortexResult<Option<ArrayRef>>>>> {
128        // Spin up the root layout reader, and wrap it in a FilterLayoutReader to perform
129        // conjunction splitting if a filter is provided.
130        let mut layout_reader = self.layout_reader;
131        if self.filter.is_some() {
132            layout_reader = Arc::new(FilterLayoutReader::new(layout_reader));
133        }
134
135        // Normalize and simplify the expressions.
136        let projection = simplify_typed(self.projection.clone(), layout_reader.dtype())?;
137        let filter = self
138            .filter
139            .clone()
140            .map(|f| simplify_typed(f, layout_reader.dtype()))
141            .transpose()?;
142
143        // Construct field masks and compute the row splits of the scan.
144        let (filter_mask, projection_mask) =
145            filter_and_projection_masks(&projection, filter.as_ref(), layout_reader.dtype())?;
146        let field_mask: Vec<_> = filter_mask
147            .iter()
148            .cloned()
149            .chain(projection_mask.iter().cloned())
150            .collect();
151        let splits = self.split_by.splits(layout_reader.layout(), &field_mask)?;
152
153        let row_masks = splits
154            .into_iter()
155            .filter_map(|row_range| {
156                if let Some(scan_range) = &self.row_range {
157                    // If the row range is fully within the scan range, return it.
158                    if row_range.start >= scan_range.end || row_range.end < scan_range.start {
159                        return None;
160                    }
161                    // Otherwise, take the intersection of the range.
162                    return Some(
163                        row_range.start.max(scan_range.start)..row_range.end.min(scan_range.end),
164                    );
165                } else {
166                    Some(row_range)
167                }
168            })
169            .map(|row_range| self.selection.row_mask(&row_range))
170            .filter(|mask| !mask.mask().all_false())
171            .collect_vec();
172
173        // Create a future to process each row split of the scan.
174        row_masks
175            .into_iter()
176            .enumerate()
177            .map(|(_i, row_mask)| {
178                let row_range = row_mask.row_range();
179
180                let approx_filter_eval = filter
181                    .as_ref()
182                    .map(|expr| layout_reader.pruning_evaluation(&row_range, expr))
183                    .transpose()?;
184                let exact_filter_eval = filter
185                    .as_ref()
186                    .map(|expr| layout_reader.filter_evaluation(&row_range, expr))
187                    .transpose()?;
188                let project_eval = layout_reader.projection_evaluation(&row_range, &projection)?;
189
190                Ok::<_, VortexError>(instrument!("split", [split = _i], async move {
191                    let mut mask = row_mask.mask().clone();
192                    if mask.all_false() {
193                        return Ok(None);
194                    }
195
196                    if let Some(approx_filter_eval) = approx_filter_eval {
197                        // First, we run an approximate evaluation to prune the row range.
198                        log::debug!("Pruning row range {:?}", row_range);
199                        mask = approx_filter_eval.invoke(mask).await?;
200                        if mask.all_false() {
201                            return Ok(None);
202                        }
203                    }
204
205                    if let Some(exact_filter_eval) = exact_filter_eval {
206                        // Then, we run the full evaluation.
207                        log::debug!("Filtering row range {:?}", row_range);
208                        mask = exact_filter_eval.invoke(mask).await?;
209                        if mask.all_false() {
210                            return Ok(None);
211                        }
212                    }
213
214                    log::debug!("Projecting row range {:?}", row_range);
215                    let mut array = project_eval.invoke(mask).await?;
216                    if self.canonicalize {
217                        log::debug!("Canonicalizing row range {:?}", row_range);
218                        let mut builder = builder_with_capacity(array.dtype(), array.len());
219                        array.append_to_builder(builder.as_mut())?;
220                        array = builder.finish();
221                    }
222
223                    Ok(Some(array))
224                }))
225            })
226            .try_collect()
227    }
228
229    /// Returns a stream over the scan with each CPU task spawned using the given spawn function.
230    pub fn spawn_on<F, S>(self, mut spawner: S) -> VortexResult<impl ArrayStream + 'static>
231    where
232        F: Future<Output = VortexResult<Option<ArrayRef>>>,
233        S: FnMut(BoxFuture<'static, VortexResult<Option<ArrayRef>>>) -> F + 'static,
234    {
235        let concurrency = self.concurrency;
236        let dtype = self.projection.return_dtype(self.layout_reader.dtype())?;
237        let tasks = self.build_tasks()?;
238
239        let array_stream = stream::iter(tasks)
240            .map(move |task| spawner(task.boxed()))
241            .buffered(concurrency)
242            .filter_map(|v| async move { v.transpose() });
243
244        Ok(ArrayStreamAdapter::new(
245            dtype,
246            instrument!("array_stream", array_stream),
247        ))
248    }
249
250    /// Returns a stream over the scan with each CPU task spawned onto the given Tokio runtime
251    /// using [`tokio::runtime::Handle::spawn`].
252    ///
253    /// Note that this should only be used if the Tokio runtime is dedicated to CPU-bound tasks.
254    #[cfg(feature = "tokio")]
255    pub fn spawn_tokio(
256        self,
257        handle: tokio::runtime::Handle,
258    ) -> VortexResult<impl ArrayStream + 'static> {
259        self.spawn_on(move |task| {
260            let handle = handle.clone();
261            async move {
262                handle
263                    .spawn(task)
264                    .await
265                    .vortex_expect("Failed to join task")
266            }
267        })
268    }
269
270    /// Returns a stream over the scan with each CPU task spawned onto a Tokio worker thread
271    /// using [`tokio::runtime::Handle::spawn_blocking`].
272    #[cfg(feature = "tokio")]
273    pub fn spawn_tokio_blocking(
274        self,
275        handle: tokio::runtime::Handle,
276    ) -> VortexResult<impl ArrayStream + 'static> {
277        self.spawn_on(move |task| {
278            let handle = handle.clone();
279            async move {
280                handle
281                    .spawn_blocking(|| futures::executor::block_on(task))
282                    .await
283                    .vortex_expect("Failed to join task")
284            }
285        })
286    }
287
288    /// Returns a stream over the scan with each CPU task polled on the current thread as per
289    /// the behaviour of [`futures::stream::Buffered`].
290    pub fn into_array_stream(self) -> VortexResult<impl ArrayStream + 'static> {
291        self.spawn_on(|task| task)
292    }
293
294    /// Returns a blocking iterator over the scan.
295    ///
296    /// All work will be performed on the current thread, with tasks interleaved per the
297    /// configured concurrency.
298    pub fn into_array_iter(self) -> VortexResult<impl ArrayIterator + 'static> {
299        let mut local_pool = LocalPool::new();
300        let spawner = local_pool.spawner();
301        let array_stream = self.spawn_on(move |task| {
302            spawner
303                .spawn_local_with_handle(task)
304                .map_err(|e| vortex_err!("Failed to spawn task: {e}"))
305                .vortex_expect("Failed to spawn task")
306        })?;
307
308        let mut array_stream = Box::pin(array_stream);
309        Ok(ArrayIteratorAdapter::new(
310            array_stream.dtype().clone(),
311            iter::from_fn(move || local_pool.run_until(array_stream.next())),
312        ))
313    }
314}
315
316/// Compute masks of field paths referenced by the projection and filter in the scan.
317///
318/// Projection and filter must be pre-simplified.
319fn filter_and_projection_masks(
320    projection: &ExprRef,
321    filter: Option<&ExprRef>,
322    dtype: &DType,
323) -> VortexResult<(Vec<FieldMask>, Vec<FieldMask>)> {
324    let Some(struct_dtype) = dtype.as_struct() else {
325        return Ok(match filter {
326            Some(_) => (vec![FieldMask::All], vec![FieldMask::All]),
327            None => (Vec::new(), vec![FieldMask::All]),
328        });
329    };
330    let projection_mask = immediate_scope_access(projection, struct_dtype)?;
331    Ok(match filter {
332        None => (
333            Vec::new(),
334            projection_mask.into_iter().map(to_field_mask).collect_vec(),
335        ),
336        Some(f) => {
337            let filter_mask = immediate_scope_access(f, struct_dtype)?;
338            let only_projection_mask = projection_mask
339                .difference(&filter_mask)
340                .cloned()
341                .map(to_field_mask)
342                .collect_vec();
343            (
344                filter_mask.into_iter().map(to_field_mask).collect_vec(),
345                only_projection_mask,
346            )
347        }
348    })
349}
350
351fn to_field_mask(field: FieldName) -> FieldMask {
352    FieldMask::Prefix(FieldPath::from(Field::Name(field)))
353}