// vortex_layout/scan/mod.rs

1use std::iter;
2use std::sync::Arc;
3
4use futures::executor::LocalPool;
5use futures::future::BoxFuture;
6use futures::task::LocalSpawnExt;
7use futures::{FutureExt, StreamExt, stream};
8use itertools::Itertools;
9pub use selection::*;
10pub use split_by::*;
11use vortex_array::builders::builder_with_capacity;
12use vortex_array::iter::{ArrayIterator, ArrayIteratorAdapter};
13use vortex_array::stream::{ArrayStream, ArrayStreamAdapter};
14use vortex_array::{Array, ArrayRef};
15use vortex_buffer::Buffer;
16use vortex_dtype::{DType, Field, FieldMask, FieldName, FieldPath};
17use vortex_error::{VortexError, VortexExpect, VortexResult, vortex_err};
18use vortex_expr::transform::immediate_access::immediate_scope_access;
19use vortex_expr::transform::simplify_typed::simplify_typed;
20use vortex_expr::{ExprRef, Identity};
21use vortex_metrics::{VortexMetrics, instrument};
22
23use crate::layouts::filter::FilterLayoutReader;
24use crate::{ExprEvaluator, LayoutReader};
25
26pub mod row_mask;
27mod selection;
28mod split_by;
29
30/// A struct for building a scan operation.
31pub struct ScanBuilder {
32    layout_reader: Arc<dyn LayoutReader>,
33    projection: ExprRef,
34    filter: Option<ExprRef>,
35    selection: Selection,
36    /// How to split the file for concurrent processing.
37    split_by: SplitBy,
38    /// Whether the arrays returned by the scan should be in canonical form.
39    canonicalize: bool,
40    /// The number of splits to make progress on concurrently.
41    concurrency: usize,
42    metrics: VortexMetrics,
43}
44
45impl ScanBuilder {
46    pub fn new(layout_reader: Arc<dyn LayoutReader>) -> Self {
47        Self {
48            layout_reader,
49            projection: Identity::new_expr(),
50            filter: None,
51            selection: Default::default(),
52            split_by: SplitBy::Layout,
53            canonicalize: false,
54            // How many row splits to make progress on concurrently (not necessarily in parallel,
55            // that is decided by the TaskExecutor).
56            concurrency: 16,
57            metrics: Default::default(),
58        }
59    }
60
61    pub fn with_filter(mut self, filter: ExprRef) -> Self {
62        self.filter = Some(filter);
63        self
64    }
65
66    pub fn with_some_filter(mut self, filter: Option<ExprRef>) -> Self {
67        self.filter = filter;
68        self
69    }
70
71    pub fn with_projection(mut self, projection: ExprRef) -> Self {
72        self.projection = projection;
73        self
74    }
75
76    pub fn with_selection(mut self, selection: Selection) -> Self {
77        self.selection = selection;
78        self
79    }
80
81    pub fn with_row_indices(mut self, row_indices: Buffer<u64>) -> Self {
82        self.selection = Selection::IncludeByIndex(row_indices);
83        self
84    }
85
86    pub fn with_split_by(mut self, split_by: SplitBy) -> Self {
87        self.split_by = split_by;
88        self
89    }
90
91    /// Set whether the scan should canonicalize the output.
92    pub fn with_canonicalize(mut self, canonicalize: bool) -> Self {
93        self.canonicalize = canonicalize;
94        self
95    }
96
97    /// The number of row splits to make progress on concurrently, must be greater than 0.
98    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
99        assert!(concurrency > 0);
100        self.concurrency = concurrency;
101        self
102    }
103
104    pub fn with_metrics(mut self, metrics: VortexMetrics) -> Self {
105        self.metrics = metrics;
106        self
107    }
108
109    #[allow(clippy::unused_enumerate_index)]
110    fn build_tasks(
111        self,
112    ) -> VortexResult<Vec<impl Future<Output = VortexResult<Option<ArrayRef>>>>> {
113        // Spin up the root layout reader, and wrap it in a FilterLayoutReader to perform
114        // conjunction splitting if a filter is provided.
115        let mut layout_reader = self.layout_reader;
116        if self.filter.is_some() {
117            layout_reader = Arc::new(FilterLayoutReader::new(layout_reader));
118        }
119
120        // Normalize and simplify the expressions.
121        let projection = simplify_typed(self.projection.clone(), layout_reader.dtype())?;
122        let filter = self
123            .filter
124            .clone()
125            .map(|f| simplify_typed(f, layout_reader.dtype()))
126            .transpose()?;
127
128        // Construct field masks and compute the row splits of the scan.
129        let (filter_mask, projection_mask) =
130            filter_and_projection_masks(&projection, filter.as_ref(), layout_reader.dtype())?;
131        let field_mask: Vec<_> = filter_mask
132            .iter()
133            .cloned()
134            .chain(projection_mask.iter().cloned())
135            .collect();
136        let splits = self.split_by.splits(layout_reader.layout(), &field_mask)?;
137
138        let row_masks = splits
139            .into_iter()
140            .map(|row_range| self.selection.row_mask(&row_range))
141            .filter(|mask| !mask.mask().all_false())
142            .collect_vec();
143
144        // Create a future to process each row split of the scan.
145        row_masks
146            .into_iter()
147            .enumerate()
148            .map(|(_i, row_mask)| {
149                let row_range = row_mask.row_range();
150
151                let approx_filter_eval = filter
152                    .as_ref()
153                    .map(|expr| layout_reader.pruning_evaluation(&row_range, expr))
154                    .transpose()?;
155                let exact_filter_eval = filter
156                    .as_ref()
157                    .map(|expr| layout_reader.filter_evaluation(&row_range, expr))
158                    .transpose()?;
159                let project_eval = layout_reader.projection_evaluation(&row_range, &projection)?;
160
161                Ok::<_, VortexError>(instrument!("split", [split = _i], async move {
162                    let mut mask = row_mask.mask().clone();
163                    if mask.all_false() {
164                        return Ok(None);
165                    }
166
167                    if let Some(approx_filter_eval) = approx_filter_eval {
168                        // First, we run an approximate evaluation to prune the row range.
169                        log::debug!("Pruning row range {:?}", row_range);
170                        mask = approx_filter_eval.invoke(mask).await?;
171                        if mask.all_false() {
172                            return Ok(None);
173                        }
174                    }
175
176                    if let Some(exact_filter_eval) = exact_filter_eval {
177                        // Then, we run the full evaluation.
178                        log::debug!("Filtering row range {:?}", row_range);
179                        mask = exact_filter_eval.invoke(mask).await?;
180                        if mask.all_false() {
181                            return Ok(None);
182                        }
183                    }
184
185                    log::debug!("Projecting row range {:?}", row_range);
186                    let mut array = project_eval.invoke(mask).await?;
187                    if self.canonicalize {
188                        log::debug!("Canonicalizing row range {:?}", row_range);
189                        let mut builder = builder_with_capacity(array.dtype(), array.len());
190                        array.append_to_builder(builder.as_mut())?;
191                        array = builder.finish();
192                    }
193
194                    Ok(Some(array))
195                }))
196            })
197            .try_collect()
198    }
199
200    /// Returns a stream over the scan with each CPU task spawned using the given spawn function.
201    pub fn spawn_on<F, S>(self, mut spawner: S) -> VortexResult<impl ArrayStream + 'static>
202    where
203        F: Future<Output = VortexResult<Option<ArrayRef>>>,
204        S: FnMut(BoxFuture<'static, VortexResult<Option<ArrayRef>>>) -> F + 'static,
205    {
206        let concurrency = self.concurrency;
207        let dtype = self.projection.return_dtype(self.layout_reader.dtype())?;
208        let tasks = self.build_tasks()?;
209
210        let array_stream = stream::iter(tasks)
211            .map(move |task| spawner(task.boxed()))
212            .buffered(concurrency)
213            .filter_map(|v| async move { v.transpose() });
214
215        Ok(ArrayStreamAdapter::new(
216            dtype,
217            instrument!("array_stream", array_stream),
218        ))
219    }
220
221    /// Returns a stream over the scan with each CPU task spawned onto the given Tokio runtime
222    /// using [`tokio::runtime::Handle::spawn`].
223    ///
224    /// Note that this should only be used if the Tokio runtime is dedicated to CPU-bound tasks.
225    #[cfg(feature = "tokio")]
226    pub fn spawn_tokio(
227        self,
228        handle: tokio::runtime::Handle,
229    ) -> VortexResult<impl ArrayStream + 'static> {
230        self.spawn_on(move |task| {
231            let handle = handle.clone();
232            async move {
233                handle
234                    .spawn(task)
235                    .await
236                    .vortex_expect("Failed to join task")
237            }
238        })
239    }
240
241    /// Returns a stream over the scan with each CPU task spawned onto a Tokio worker thread
242    /// using [`tokio::runtime::Handle::spawn_blocking`].
243    #[cfg(feature = "tokio")]
244    pub fn spawn_tokio_blocking(
245        self,
246        handle: tokio::runtime::Handle,
247    ) -> VortexResult<impl ArrayStream + 'static> {
248        self.spawn_on(move |task| {
249            let handle = handle.clone();
250            async move {
251                handle
252                    .spawn_blocking(|| futures::executor::block_on(task))
253                    .await
254                    .vortex_expect("Failed to join task")
255            }
256        })
257    }
258
259    /// Returns a stream over the scan with each CPU task polled on the current thread as per
260    /// the behaviour of [`futures::stream::Buffered`].
261    pub fn into_array_stream(self) -> VortexResult<impl ArrayStream + 'static> {
262        self.spawn_on(|task| task)
263    }
264
265    /// Returns a blocking iterator over the scan.
266    ///
267    /// All work will be performed on the current thread, with tasks interleaved per the
268    /// configured concurrency.
269    pub fn into_array_iter(self) -> VortexResult<impl ArrayIterator + 'static> {
270        let mut local_pool = LocalPool::new();
271        let spawner = local_pool.spawner();
272        let array_stream = self.spawn_on(move |task| {
273            spawner
274                .spawn_local_with_handle(task)
275                .map_err(|e| vortex_err!("Failed to spawn task: {e}"))
276                .vortex_expect("Failed to spawn task")
277        })?;
278
279        let mut array_stream = Box::pin(array_stream);
280        Ok(ArrayIteratorAdapter::new(
281            array_stream.dtype().clone(),
282            iter::from_fn(move || local_pool.run_until(array_stream.next())),
283        ))
284    }
285}
286
287/// Compute masks of field paths referenced by the projection and filter in the scan.
288///
289/// Projection and filter must be pre-simplified.
290fn filter_and_projection_masks(
291    projection: &ExprRef,
292    filter: Option<&ExprRef>,
293    dtype: &DType,
294) -> VortexResult<(Vec<FieldMask>, Vec<FieldMask>)> {
295    let Some(struct_dtype) = dtype.as_struct() else {
296        return Ok(match filter {
297            Some(_) => (vec![FieldMask::All], vec![FieldMask::All]),
298            None => (Vec::new(), vec![FieldMask::All]),
299        });
300    };
301    let projection_mask = immediate_scope_access(projection, struct_dtype)?;
302    Ok(match filter {
303        None => (
304            Vec::new(),
305            projection_mask.into_iter().map(to_field_mask).collect_vec(),
306        ),
307        Some(f) => {
308            let filter_mask = immediate_scope_access(f, struct_dtype)?;
309            let only_projection_mask = projection_mask
310                .difference(&filter_mask)
311                .cloned()
312                .map(to_field_mask)
313                .collect_vec();
314            (
315                filter_mask.into_iter().map(to_field_mask).collect_vec(),
316                only_projection_mask,
317            )
318        }
319    })
320}
321
322fn to_field_mask(field: FieldName) -> FieldMask {
323    FieldMask::Prefix(FieldPath::from(Field::Name(field)))
324}