vortex_layout/scan/mod.rs

1use std::iter;
2use std::ops::{Deref, Range};
3use std::sync::Arc;
4
5use arrow_array::RecordBatch;
6use arrow_schema::SchemaRef;
7pub use executor::*;
8use futures::executor::LocalPool;
9use futures::future::ok;
10use futures::task::LocalSpawnExt;
11use futures::{FutureExt, Stream, StreamExt, stream};
12use itertools::Itertools;
13pub use selection::*;
14pub use split_by::*;
15use vortex_array::iter::{ArrayIterator, ArrayIteratorAdapter};
16use vortex_array::stream::{ArrayStream, ArrayStreamAdapter};
17use vortex_array::{ArrayRef, ToCanonical};
18use vortex_buffer::Buffer;
19use vortex_dtype::{DType, Field, FieldMask, FieldName, FieldPath};
20use vortex_error::{VortexError, VortexExpect, VortexResult, vortex_err};
21use vortex_expr::transform::immediate_access::immediate_scope_access;
22use vortex_expr::transform::simplify_typed::simplify_typed;
23use vortex_expr::{ExprRef, Identity};
24use vortex_metrics::VortexMetrics;
25
26use crate::LayoutReader;
27use crate::layouts::filter::FilterLayoutReader;
28mod executor;
29pub mod row_mask;
30mod selection;
31mod split_by;
32
33/// A struct for building a scan operation.
34pub struct ScanBuilder<A> {
35    layout_reader: Arc<dyn LayoutReader>,
36    projection: ExprRef,
37    filter: Option<ExprRef>,
38    /// Optionally read a subset of the rows in the file.
39    row_range: Option<Range<u64>>,
40    /// The selection mask to apply to the selected row range.
41    selection: Selection,
42    /// How to split the file for concurrent processing.
43    split_by: SplitBy,
44    /// The number of splits to make progress on concurrently.
45    concurrency: usize,
46    /// Function to apply to each [`ArrayRef`] within the spawned split tasks.
47    map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
48    /// The executor used to spawn each split task.
49    executor: Option<Arc<dyn TaskExecutor>>,
50    metrics: VortexMetrics,
51}
52
53impl<A: 'static + Send> ScanBuilder<A> {
54    pub fn with_filter(mut self, filter: ExprRef) -> Self {
55        self.filter = Some(filter);
56        self
57    }
58
59    pub fn with_some_filter(mut self, filter: Option<ExprRef>) -> Self {
60        self.filter = filter;
61        self
62    }
63
64    pub fn with_projection(mut self, projection: ExprRef) -> Self {
65        self.projection = projection;
66        self
67    }
68
69    pub fn with_row_range(mut self, row_range: Range<u64>) -> Self {
70        self.row_range = Some(row_range);
71        self
72    }
73
74    pub fn with_some_row_range(mut self, row_range: Option<Range<u64>>) -> Self {
75        self.row_range = row_range;
76        self
77    }
78
79    pub fn with_selection(mut self, selection: Selection) -> Self {
80        self.selection = selection;
81        self
82    }
83
84    pub fn with_row_indices(mut self, row_indices: Buffer<u64>) -> Self {
85        self.selection = Selection::IncludeByIndex(row_indices);
86        self
87    }
88
89    pub fn with_split_by(mut self, split_by: SplitBy) -> Self {
90        self.split_by = split_by;
91        self
92    }
93
94    /// The number of row splits to make progress on concurrently, must be greater than 0.
95    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
96        assert!(concurrency > 0);
97        self.concurrency = concurrency;
98        self
99    }
100
101    /// Spawn each CPU task onto the given Tokio runtime.
102    ///
103    /// Note that this is an odd use of the Tokio runtime. Typically, it is used predominantly
104    /// for I/O bound tasks.
105    #[cfg(feature = "tokio")]
106    pub fn with_tokio_executor(mut self, handle: tokio::runtime::Handle) -> Self {
107        self.executor = Some(Arc::new(handle));
108        self
109    }
110
111    pub fn with_metrics(mut self, metrics: VortexMetrics) -> Self {
112        self.metrics = metrics;
113        self
114    }
115
116    /// Map each split of the scan. The function will be run on the spawned task.
117    pub fn map<B: 'static>(
118        self,
119        map_fn: impl Fn(A) -> VortexResult<B> + 'static + Send + Sync,
120    ) -> ScanBuilder<B> {
121        let old_map_fn = self.map_fn;
122        ScanBuilder {
123            layout_reader: self.layout_reader,
124            projection: self.projection,
125            filter: self.filter,
126            row_range: self.row_range,
127            selection: self.selection,
128            split_by: self.split_by,
129            concurrency: self.concurrency,
130            map_fn: Arc::new(move |a| map_fn(old_map_fn(a)?)),
131            executor: self.executor,
132            metrics: self.metrics,
133        }
134    }
135
136    /// Returns the output [`DType`] of the scan.
137    pub fn dtype(&self) -> VortexResult<DType> {
138        self.projection.return_dtype(self.layout_reader.dtype())
139    }
140
141    /// Constructs a task per row split of the scan, returned as a vector of futures.
142    #[allow(clippy::unused_enumerate_index)]
143    pub fn build(self) -> VortexResult<Vec<impl Future<Output = VortexResult<Option<A>>>>> {
144        // Spin up the root layout reader, and wrap it in a FilterLayoutReader to perform
145        // conjunction splitting if a filter is provided.
146        let mut layout_reader = self.layout_reader;
147        if self.filter.is_some() {
148            layout_reader = Arc::new(FilterLayoutReader::new(layout_reader));
149        }
150
151        // Normalize and simplify the expressions.
152        let projection = simplify_typed(self.projection.clone(), layout_reader.dtype())?;
153        let filter = self
154            .filter
155            .clone()
156            .map(|f| simplify_typed(f, layout_reader.dtype()))
157            .transpose()?;
158
159        // Construct field masks and compute the row splits of the scan.
160        let (filter_mask, projection_mask) =
161            filter_and_projection_masks(&projection, filter.as_ref(), layout_reader.dtype())?;
162        let field_mask: Vec<_> = filter_mask
163            .iter()
164            .cloned()
165            .chain(projection_mask.iter().cloned())
166            .collect();
167        let splits = self.split_by.splits(layout_reader.deref(), &field_mask)?;
168
169        let row_masks = splits
170            .into_iter()
171            .filter_map(|row_range| {
172                if let Some(scan_range) = &self.row_range {
173                    // If the row range is fully within the scan range, return it.
174                    if row_range.start >= scan_range.end || row_range.end < scan_range.start {
175                        return None;
176                    }
177                    // Otherwise, take the intersection of the range.
178                    return Some(
179                        row_range.start.max(scan_range.start)..row_range.end.min(scan_range.end),
180                    );
181                } else {
182                    Some(row_range)
183                }
184            })
185            .map(|row_range| self.selection.row_mask(&row_range))
186            .filter(|mask| !mask.mask().all_false())
187            .map(|row_mask| {
188                let row_range = row_mask.row_range();
189                (row_range, ok(row_mask.mask().clone()).boxed())
190            })
191            .collect_vec();
192
193        // NOTE(ngates): since segment prefetching occurs in insertion order, we construct
194        //  all pruning tasks, then all filter tasks, then all projection tasks. When a task
195        //  explicitly polls a segment, it jumps to the front of the queue so this shouldn't
196        //  impact the time-to-first-chunk latency.
197
198        // If a filter expression is provided, then we setup pruning and filter evaluations.
199        let row_masks = if let Some(filter) = &filter {
200            // Map the row masks through the pruning evaluation
201            let row_masks: Vec<_> = row_masks
202                .into_iter()
203                .map(|(row_range, mask_fut)| {
204                    let eval = layout_reader.pruning_evaluation(&row_range, filter)?;
205                    let mask_fut = async move {
206                        let mask = mask_fut.await?;
207                        if mask.all_false() {
208                            Ok(mask)
209                        } else {
210                            eval.invoke(mask).await
211                        }
212                    }
213                    .boxed();
214                    Ok::<_, VortexError>((row_range, mask_fut))
215                })
216                .try_collect()?;
217
218            // Map the row masks through the filter evaluation
219            row_masks
220                .into_iter()
221                .map(|(row_range, mask_fut)| {
222                    let eval = layout_reader.filter_evaluation(&row_range, filter)?;
223                    let mask_fut = async move {
224                        let mask = mask_fut.await?;
225                        if mask.all_false() {
226                            Ok(mask)
227                        } else {
228                            eval.invoke(mask).await
229                        }
230                    }
231                    .boxed();
232                    Ok::<_, VortexError>((row_range, mask_fut))
233                })
234                .try_collect()?
235        } else {
236            row_masks
237        };
238
239        // Finally, map the row masks through the projection evaluation
240        row_masks
241            .into_iter()
242            .map(|(row_range, mask_fut)| {
243                let map_fn = self.map_fn.clone();
244                let eval = layout_reader.projection_evaluation(&row_range, &projection)?;
245                let array_fut = async move {
246                    let mask = mask_fut.await?;
247                    if mask.all_false() {
248                        Ok(None)
249                    } else {
250                        map_fn(eval.invoke(mask).await?).map(Some)
251                    }
252                }
253                .boxed();
254
255                Ok(match &self.executor {
256                    None => array_fut,
257                    Some(executor) => executor.spawn(array_fut),
258                })
259            })
260            .try_collect()
261    }
262
263    /// Returns a stream over the scan objects.
264    pub fn into_stream(self) -> VortexResult<impl Stream<Item = VortexResult<A>> + 'static> {
265        let concurrency = self.concurrency;
266        Ok(stream::iter(self.build()?)
267            .buffered(concurrency)
268            .filter_map(|r| async move { r.transpose() }))
269    }
270}
271
272impl ScanBuilder<ArrayRef> {
273    pub fn new(layout_reader: Arc<dyn LayoutReader>) -> Self {
274        Self {
275            layout_reader,
276            projection: Identity::new_expr(),
277            filter: None,
278            row_range: None,
279            selection: Default::default(),
280            split_by: SplitBy::Layout,
281            // How many row splits to make progress on concurrently (not necessarily in parallel,
282            // that is decided by the TaskExecutor).
283            concurrency: 16,
284            map_fn: Arc::new(Ok),
285            executor: None,
286            metrics: Default::default(),
287        }
288    }
289
290    /// Map the scan into a stream of Arrow [`RecordBatch`].
291    pub fn map_to_record_batch(self, schema: SchemaRef) -> ScanBuilder<RecordBatch> {
292        self.map(move |array| {
293            let st = array.to_struct()?;
294            st.into_record_batch_with_schema(schema.as_ref())
295        })
296    }
297
298    /// Returns a stream over the scan with each CPU task polled on the current thread as per
299    /// the behaviour of [`futures::stream::Buffered`].
300    pub fn into_array_stream(self) -> VortexResult<impl ArrayStream + 'static> {
301        let dtype = self.dtype()?;
302        let stream = self.into_stream()?;
303        Ok(ArrayStreamAdapter::new(dtype, stream))
304    }
305
306    /// Returns a blocking iterator over the scan.
307    ///
308    /// All work will be performed on the current thread, with tasks interleaved per the
309    /// configured concurrency. Any configured executor will be ignored.
310    pub fn into_array_iter(self) -> VortexResult<impl ArrayIterator + 'static> {
311        let dtype = self.dtype()?;
312        let concurrency = self.concurrency;
313
314        let mut local_pool = LocalPool::new();
315        let spawner = local_pool.spawner();
316
317        let mut stream = stream::iter(self.build()?)
318            .map(move |task| {
319                spawner
320                    .spawn_local_with_handle(task)
321                    .map_err(|e| vortex_err!("Failed to spawn task: {e}"))
322                    .vortex_expect("Failed to spawn task")
323            })
324            .buffered(concurrency)
325            .filter_map(|a| async move { a.transpose() })
326            .boxed_local();
327
328        Ok(ArrayIteratorAdapter::new(
329            dtype,
330            iter::from_fn(move || local_pool.run_until(stream.next())),
331        ))
332    }
333}
334
335/// Compute masks of field paths referenced by the projection and filter in the scan.
336///
337/// Projection and filter must be pre-simplified.
338fn filter_and_projection_masks(
339    projection: &ExprRef,
340    filter: Option<&ExprRef>,
341    dtype: &DType,
342) -> VortexResult<(Vec<FieldMask>, Vec<FieldMask>)> {
343    let Some(struct_dtype) = dtype.as_struct() else {
344        return Ok(match filter {
345            Some(_) => (vec![FieldMask::All], vec![FieldMask::All]),
346            None => (Vec::new(), vec![FieldMask::All]),
347        });
348    };
349    let projection_mask = immediate_scope_access(projection, struct_dtype)?;
350    Ok(match filter {
351        None => (
352            Vec::new(),
353            projection_mask.into_iter().map(to_field_mask).collect_vec(),
354        ),
355        Some(f) => {
356            let filter_mask = immediate_scope_access(f, struct_dtype)?;
357            let only_projection_mask = projection_mask
358                .difference(&filter_mask)
359                .cloned()
360                .map(to_field_mask)
361                .collect_vec();
362            (
363                filter_mask.into_iter().map(to_field_mask).collect_vec(),
364                only_projection_mask,
365            )
366        }
367    })
368}
369
370fn to_field_mask(field: FieldName) -> FieldMask {
371    FieldMask::Prefix(FieldPath::from(Field::Name(field)))
372}