vortex_layout/scan/
mod.rs

1use std::iter;
2use std::ops::{Deref, Range};
3use std::sync::Arc;
4
5use arrow_array::RecordBatch;
6use arrow_schema::SchemaRef;
7pub use executor::*;
8use futures::executor::LocalPool;
9use futures::future::ok;
10use futures::task::LocalSpawnExt;
11use futures::{FutureExt, Stream, StreamExt, stream};
12use itertools::Itertools;
13pub use selection::*;
14pub use split_by::*;
15use vortex_array::iter::{ArrayIterator, ArrayIteratorAdapter};
16use vortex_array::stats::StatsSet;
17use vortex_array::stream::{ArrayStream, ArrayStreamAdapter};
18use vortex_array::{ArrayRef, ToCanonical};
19use vortex_buffer::Buffer;
20use vortex_dtype::{DType, Field, FieldMask, FieldName, FieldPath};
21use vortex_error::{VortexError, VortexExpect, VortexResult, vortex_err};
22use vortex_expr::transform::immediate_access::immediate_scope_access;
23use vortex_expr::transform::simplify_typed::simplify_typed;
24use vortex_expr::{ExprRef, Identity};
25use vortex_metrics::VortexMetrics;
26
27use crate::LayoutReader;
28use crate::layouts::filter::FilterLayoutReader;
29mod executor;
30pub mod row_mask;
31mod selection;
32mod split_by;
33
/// A struct for building a scan operation.
pub struct ScanBuilder<A> {
    /// The root layout reader over the file being scanned.
    layout_reader: Arc<dyn LayoutReader>,
    /// Expression evaluated to produce the scan's output for each row split.
    projection: ExprRef,
    /// Optional predicate used to prune and select rows during the scan.
    filter: Option<ExprRef>,
    /// Optionally read a subset of the rows in the file.
    row_range: Option<Range<u64>>,
    /// The selection mask to apply to the selected row range.
    selection: Selection,
    /// How to split the file for concurrent processing.
    split_by: SplitBy,
    /// The number of splits to make progress on concurrently.
    concurrency: usize,
    /// Function to apply to each [`ArrayRef`] within the spawned split tasks.
    map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
    /// The executor used to spawn each split task.
    executor: Option<Arc<dyn TaskExecutor>>,
    // NOTE(review): stored but not read anywhere in this module's visible code —
    // presumably consumed by layout readers elsewhere; confirm before removing.
    metrics: VortexMetrics,
    /// Should we try to prune the file (using stats) on open.
    file_stats: Option<Arc<[StatsSet]>>,
}
55
56impl<A: 'static + Send> ScanBuilder<A> {
57    pub fn with_filter(mut self, filter: ExprRef) -> Self {
58        self.filter = Some(filter);
59        self
60    }
61
62    pub fn with_some_filter(mut self, filter: Option<ExprRef>) -> Self {
63        self.filter = filter;
64        self
65    }
66
67    pub fn with_projection(mut self, projection: ExprRef) -> Self {
68        self.projection = projection;
69        self
70    }
71
72    pub fn with_row_range(mut self, row_range: Range<u64>) -> Self {
73        self.row_range = Some(row_range);
74        self
75    }
76
77    pub fn with_some_row_range(mut self, row_range: Option<Range<u64>>) -> Self {
78        self.row_range = row_range;
79        self
80    }
81
82    pub fn with_selection(mut self, selection: Selection) -> Self {
83        self.selection = selection;
84        self
85    }
86
87    pub fn with_row_indices(mut self, row_indices: Buffer<u64>) -> Self {
88        self.selection = Selection::IncludeByIndex(row_indices);
89        self
90    }
91
92    pub fn with_split_by(mut self, split_by: SplitBy) -> Self {
93        self.split_by = split_by;
94        self
95    }
96
97    /// The number of row splits to make progress on concurrently, must be greater than 0.
98    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
99        assert!(concurrency > 0);
100        self.concurrency = concurrency;
101        self
102    }
103
104    /// Spawn each CPU task onto the given Tokio runtime.
105    ///
106    /// Note that this is an odd use of the Tokio runtime. Typically, it is used predominantly
107    /// for I/O bound tasks.
108    #[cfg(feature = "tokio")]
109    pub fn with_tokio_executor(mut self, handle: tokio::runtime::Handle) -> Self {
110        self.executor = Some(Arc::new(handle));
111        self
112    }
113
114    pub fn with_executor(mut self, executor: Arc<dyn TaskExecutor>) -> Self {
115        self.executor = Some(executor);
116        self
117    }
118
119    pub fn with_metrics(mut self, metrics: VortexMetrics) -> Self {
120        self.metrics = metrics;
121        self
122    }
123
124    pub fn with_prune_file_on_open(mut self, stats_set: Arc<[StatsSet]>) -> Self {
125        self.file_stats = Some(stats_set);
126        self
127    }
128
129    /// Map each split of the scan. The function will be run on the spawned task.
130    pub fn map<B: 'static>(
131        self,
132        map_fn: impl Fn(A) -> VortexResult<B> + 'static + Send + Sync,
133    ) -> ScanBuilder<B> {
134        let old_map_fn = self.map_fn;
135        ScanBuilder {
136            layout_reader: self.layout_reader,
137            projection: self.projection,
138            filter: self.filter,
139            row_range: self.row_range,
140            selection: self.selection,
141            split_by: self.split_by,
142            concurrency: self.concurrency,
143            map_fn: Arc::new(move |a| map_fn(old_map_fn(a)?)),
144            executor: self.executor,
145            metrics: self.metrics,
146            file_stats: self.file_stats,
147        }
148    }
149
150    /// Returns the output [`DType`] of the scan.
151    pub fn dtype(&self) -> VortexResult<DType> {
152        self.projection.return_dtype(self.layout_reader.dtype())
153    }
154
155    /// Constructs a task per row split of the scan, returned as a vector of futures.
156    #[allow(clippy::unused_enumerate_index)]
157    pub fn build(self) -> VortexResult<Vec<impl Future<Output = VortexResult<Option<A>>>>> {
158        // Spin up the root layout reader, and wrap it in a FilterLayoutReader to perform
159        // conjunction splitting if a filter is provided.
160        let mut layout_reader = self.layout_reader;
161        if self.filter.is_some() {
162            layout_reader = Arc::new(FilterLayoutReader::new(layout_reader));
163        }
164
165        // Normalize and simplify the expressions.
166        let projection = simplify_typed(self.projection.clone(), layout_reader.dtype())?;
167        let filter = self
168            .filter
169            .clone()
170            .map(|f| simplify_typed(f, layout_reader.dtype()))
171            .transpose()?;
172
173        // Construct field masks and compute the row splits of the scan.
174        let (filter_mask, projection_mask) =
175            filter_and_projection_masks(&projection, filter.as_ref(), layout_reader.dtype())?;
176        let field_mask: Vec<_> = filter_mask
177            .iter()
178            .cloned()
179            .chain(projection_mask.iter().cloned())
180            .collect();
181        let splits = self.split_by.splits(layout_reader.deref(), &field_mask)?;
182
183        let row_masks = splits
184            .into_iter()
185            .filter_map(|row_range| {
186                if let Some(scan_range) = &self.row_range {
187                    // If the row range is fully within the scan range, return it.
188                    if row_range.start >= scan_range.end || row_range.end < scan_range.start {
189                        return None;
190                    }
191                    // Otherwise, take the intersection of the range.
192                    return Some(
193                        row_range.start.max(scan_range.start)..row_range.end.min(scan_range.end),
194                    );
195                } else {
196                    Some(row_range)
197                }
198            })
199            .map(|row_range| self.selection.row_mask(&row_range))
200            .filter(|mask| !mask.mask().all_false())
201            .map(|row_mask| {
202                let row_range = row_mask.row_range();
203                (row_range, ok(row_mask.mask().clone()).boxed())
204            })
205            .collect_vec();
206
207        // NOTE(ngates): since segment prefetching occurs in insertion order, we construct
208        //  all pruning tasks, then all filter tasks, then all projection tasks. When a task
209        //  explicitly polls a segment, it jumps to the front of the queue so this shouldn't
210        //  impact the time-to-first-chunk latency.
211
212        // If a filter expression is provided, then we setup pruning and filter evaluations.
213        let row_masks = if let Some(filter) = &filter {
214            // Map the row masks through the pruning evaluation
215            let row_masks: Vec<_> = row_masks
216                .into_iter()
217                .map(|(row_range, mask_fut)| {
218                    let eval = layout_reader.pruning_evaluation(&row_range, filter)?;
219                    let mask_fut = async move {
220                        let mask = mask_fut.await?;
221                        if mask.all_false() {
222                            Ok(mask)
223                        } else {
224                            eval.invoke(mask).await
225                        }
226                    }
227                    .boxed();
228                    Ok::<_, VortexError>((row_range, mask_fut))
229                })
230                .try_collect()?;
231
232            // Map the row masks through the filter evaluation
233            row_masks
234                .into_iter()
235                .map(|(row_range, mask_fut)| {
236                    let eval = layout_reader.filter_evaluation(&row_range, filter)?;
237                    let mask_fut = async move {
238                        let mask = mask_fut.await?;
239                        if mask.all_false() {
240                            Ok(mask)
241                        } else {
242                            eval.invoke(mask).await
243                        }
244                    }
245                    .boxed();
246                    Ok::<_, VortexError>((row_range, mask_fut))
247                })
248                .try_collect()?
249        } else {
250            row_masks
251        };
252
253        // Finally, map the row masks through the projection evaluation and spawn.
254        row_masks
255            .into_iter()
256            .map(|(row_range, mask_fut)| {
257                let map_fn = self.map_fn.clone();
258                let eval = layout_reader.projection_evaluation(&row_range, &projection)?;
259                let array_fut = async move {
260                    let mask = mask_fut.await?;
261                    if mask.all_false() {
262                        Ok(None)
263                    } else {
264                        map_fn(eval.invoke(mask).await?).map(Some)
265                    }
266                }
267                .boxed();
268
269                Ok(match &self.executor {
270                    None => array_fut,
271                    Some(executor) => executor.spawn(array_fut),
272                })
273            })
274            .try_collect()
275    }
276
277    /// Returns a stream over the scan objects.
278    pub fn into_stream(self) -> VortexResult<impl Stream<Item = VortexResult<A>> + 'static> {
279        let concurrency = self.concurrency;
280        Ok(stream::iter(self.build()?)
281            .buffered(concurrency)
282            .filter_map(|r| async move { r.transpose() }))
283    }
284}
285
impl ScanBuilder<ArrayRef> {
    /// Creates a scan builder over the given layout reader with default settings:
    /// identity projection, no filter, all rows selected, and layout-driven splits.
    pub fn new(layout_reader: Arc<dyn LayoutReader>) -> Self {
        Self {
            layout_reader,
            projection: Identity::new_expr(),
            filter: None,
            row_range: None,
            selection: Default::default(),
            split_by: SplitBy::Layout,
            // How many row splits to make progress on concurrently (not necessarily in parallel,
            // that is decided by the TaskExecutor).
            concurrency: 16,
            // Identity mapping: each array passes through unchanged.
            map_fn: Arc::new(Ok),
            executor: None,
            metrics: Default::default(),
            file_stats: None,
        }
    }

    /// Map the scan into a stream of Arrow [`RecordBatch`].
    pub fn map_to_record_batch(self, schema: SchemaRef) -> ScanBuilder<RecordBatch> {
        self.map(move |array| {
            // Canonicalize to a struct array before converting to Arrow.
            let st = array.to_struct()?;
            st.into_record_batch_with_schema(schema.as_ref())
        })
    }

    /// Returns a stream over the scan with each CPU task polled on the current thread as per
    /// the behaviour of [`futures::stream::Buffered`].
    pub fn into_array_stream(self) -> VortexResult<impl ArrayStream + 'static> {
        let dtype = self.dtype()?;
        let stream = self.into_stream()?;
        Ok(ArrayStreamAdapter::new(dtype, stream))
    }

    /// Returns a blocking iterator over the scan.
    ///
    /// All work will be performed on the current thread, with tasks interleaved per the
    /// configured concurrency. Any configured executor will be ignored.
    pub fn into_array_iter(self) -> VortexResult<impl ArrayIterator + 'static> {
        let dtype = self.dtype()?;
        let concurrency = self.concurrency;

        // A single-threaded pool lets us interleave split tasks without any executor.
        let mut local_pool = LocalPool::new();
        let spawner = local_pool.spawner();

        let mut stream = stream::iter(self.build()?)
            .map(move |task| {
                spawner
                    .spawn_local_with_handle(task)
                    .map_err(|e| vortex_err!("Failed to spawn task: {e}"))
                    .vortex_expect("Failed to spawn task")
            })
            .buffered(concurrency)
            // Drop splits that resolved to `None` (fully filtered out).
            .filter_map(|a| async move { a.transpose() })
            .boxed_local();

        Ok(ArrayIteratorAdapter::new(
            dtype,
            // Block the current thread, driving the pool until the next item is ready.
            iter::from_fn(move || local_pool.run_until(stream.next())),
        ))
    }
}
349
350/// Compute masks of field paths referenced by the projection and filter in the scan.
351///
352/// Projection and filter must be pre-simplified.
353fn filter_and_projection_masks(
354    projection: &ExprRef,
355    filter: Option<&ExprRef>,
356    dtype: &DType,
357) -> VortexResult<(Vec<FieldMask>, Vec<FieldMask>)> {
358    let Some(struct_dtype) = dtype.as_struct() else {
359        return Ok(match filter {
360            Some(_) => (vec![FieldMask::All], vec![FieldMask::All]),
361            None => (Vec::new(), vec![FieldMask::All]),
362        });
363    };
364    let projection_mask = immediate_scope_access(projection, struct_dtype)?;
365    Ok(match filter {
366        None => (
367            Vec::new(),
368            projection_mask.into_iter().map(to_field_mask).collect_vec(),
369        ),
370        Some(f) => {
371            let filter_mask = immediate_scope_access(f, struct_dtype)?;
372            let only_projection_mask = projection_mask
373                .difference(&filter_mask)
374                .cloned()
375                .map(to_field_mask)
376                .collect_vec();
377            (
378                filter_mask.into_iter().map(to_field_mask).collect_vec(),
379                only_projection_mask,
380            )
381        }
382    })
383}
384
385fn to_field_mask(field: FieldName) -> FieldMask {
386    FieldMask::Prefix(FieldPath::from(Field::Name(field)))
387}