// vortex_layout/scan/mod.rs

1use std::sync::Arc;
2
3use executor::{Executor as _, TaskExecutor, ThreadsExecutor};
4use futures::{Stream, StreamExt, stream};
5use itertools::Itertools;
6pub use split_by::*;
7use vortex_array::builders::builder_with_capacity;
8use vortex_array::stream::{ArrayStream, ArrayStreamAdapter, ArrayStreamExt};
9use vortex_array::{Array, ArrayContext, ArrayRef};
10use vortex_buffer::Buffer;
11use vortex_dtype::{DType, Field, FieldMask, FieldName, FieldPath};
12use vortex_error::{ResultExt, VortexExpect, VortexResult, vortex_err};
13use vortex_expr::transform::immediate_access::immediate_scope_access;
14use vortex_expr::transform::simplify_typed::simplify_typed;
15use vortex_expr::{ExprRef, Identity};
16use vortex_mask::Mask;
17use vortex_metrics::VortexMetrics;
18
19use crate::scan::filter::FilterExpr;
20use crate::scan::unified::UnifiedDriverStream;
21use crate::segments::{AsyncSegmentReader, RowRangePruner, SegmentCollector, SegmentStream};
22use crate::{
23    ExprEvaluator, Layout, LayoutReader, LayoutReaderExt, RowMask, instrument, range_intersection,
24};
25
26pub mod executor;
27pub(crate) mod filter;
28mod split_by;
29pub mod unified;
30
31pub trait ScanDriver: 'static + Sized {
32    fn segment_reader(&self) -> Arc<dyn AsyncSegmentReader>;
33
34    /// Return a future that drives the I/O stream for the segment reader.
35    /// The future should return when the stream is complete, and can return an error to
36    /// terminate the scan early.
37    ///
38    /// It is recommended that I/O is spawned and processed on its own thread, with this driver
39    /// serving only as a mechanism to signal completion or error. There is no guarantee around
40    /// how frequently this future will be polled, so it should not be used to drive I/O.
41    ///
42    /// TODO(ngates): make this a future
43    fn io_stream(self, segments: SegmentStream) -> impl Stream<Item = VortexResult<()>>;
44}
45
46/// A struct for building a scan operation.
47pub struct ScanBuilder<D: ScanDriver> {
48    driver: D,
49    task_executor: Option<TaskExecutor>,
50    layout: Layout,
51    ctx: ArrayContext, // TODO(ngates): store this on larger context on Layout
52    projection: ExprRef,
53    filter: Option<ExprRef>,
54    row_indices: Option<Buffer<u64>>,
55    split_by: SplitBy,
56    canonicalize: bool,
57    // The number of splits to make progress on concurrently.
58    concurrency: usize,
59    prefetch_conjuncts: bool,
60    metrics: VortexMetrics,
61}
62
63impl<D: ScanDriver> ScanBuilder<D> {
64    pub fn new(driver: D, layout: Layout, ctx: ArrayContext) -> Self {
65        Self {
66            driver,
67            task_executor: None,
68            layout,
69            ctx,
70            projection: Identity::new_expr(),
71            filter: None,
72            row_indices: None,
73            split_by: SplitBy::Layout,
74            canonicalize: false,
75            prefetch_conjuncts: false,
76            concurrency: 1024,
77            metrics: Default::default(),
78        }
79    }
80
81    pub fn with_filter(mut self, filter: ExprRef) -> Self {
82        self.filter = Some(filter);
83        self
84    }
85
86    pub fn with_some_filter(mut self, filter: Option<ExprRef>) -> Self {
87        self.filter = filter;
88        self
89    }
90
91    pub fn with_projection(mut self, projection: ExprRef) -> Self {
92        self.projection = projection;
93        self
94    }
95
96    pub fn with_row_indices(mut self, row_indices: Buffer<u64>) -> Self {
97        self.row_indices = Some(row_indices);
98        self
99    }
100
101    pub fn with_some_row_indices(mut self, row_indices: Option<Buffer<u64>>) -> Self {
102        self.row_indices = row_indices;
103        self
104    }
105
106    pub fn with_split_by(mut self, split_by: SplitBy) -> Self {
107        self.split_by = split_by;
108        self
109    }
110
111    /// Set whether the scan should canonicalize the output.
112    pub fn with_canonicalize(mut self, canonicalize: bool) -> Self {
113        self.canonicalize = canonicalize;
114        self
115    }
116
117    /// The number of row splits to make progress on concurrently, must be greater than 0.
118    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
119        assert!(concurrency > 0);
120        self.concurrency = concurrency;
121        self
122    }
123
124    /// The number of row splits to make progress on concurrently, must be greater than 0.
125    pub fn with_prefetch_conjuncts(mut self, prefetch: bool) -> Self {
126        self.prefetch_conjuncts = prefetch;
127        self
128    }
129
130    pub fn with_task_executor(mut self, task_executor: TaskExecutor) -> Self {
131        self.task_executor = Some(task_executor);
132        self
133    }
134
135    pub fn with_metrics(mut self, metrics: VortexMetrics) -> Self {
136        self.metrics = metrics;
137        self
138    }
139
140    pub fn build(self) -> VortexResult<Scan<D>> {
141        let projection = simplify_typed(self.projection.clone(), self.layout.dtype())?;
142        let filter = self
143            .filter
144            .clone()
145            .map(|f| simplify_typed(f, self.layout.dtype()))
146            .transpose()?;
147        let (filter_mask, projection_mask) =
148            filter_and_projection_masks(&projection, filter.as_ref(), self.layout.dtype())?;
149
150        let field_mask: Vec<_> = filter_mask
151            .iter()
152            .cloned()
153            .chain(projection_mask.iter().cloned())
154            .collect();
155
156        let splits = self.split_by.splits(&self.layout, &field_mask)?;
157        let mut collector = SegmentCollector::new(self.metrics.clone());
158        self.layout
159            .required_segments(0, &filter_mask, &projection_mask, &mut collector)?;
160        let (mut row_range_pruner, segments) = collector.finish();
161        let row_indices = self.row_indices.clone();
162        if let Some(indices) = &row_indices {
163            row_range_pruner.retain_matching(indices.clone());
164        }
165
166        let row_masks = splits
167            .into_iter()
168            .filter_map(move |row_range| {
169                let Some(row_indices) = &row_indices else {
170                    // If there is no row indices filter, then take the whole range
171                    return Some(RowMask::new_valid_between(row_range.start, row_range.end));
172                };
173
174                // Otherwise, find the indices that are within the row range.
175                let intersection = range_intersection(&row_range, row_indices)?;
176
177                // Construct a row mask for the range.
178                let filter_mask = Mask::from_indices(
179                    usize::try_from(row_range.end - row_range.start)
180                        .vortex_expect("Split ranges are within usize"),
181                    row_indices[intersection]
182                        .iter()
183                        .map(|&idx| {
184                            usize::try_from(idx - row_range.start)
185                                .vortex_expect("index within range")
186                        })
187                        .collect(),
188                );
189                Some(RowMask::new(filter_mask, row_range.start))
190            })
191            .collect_vec();
192
193        Ok(Scan {
194            driver: self.driver,
195            task_executor: self
196                .task_executor
197                .unwrap_or(TaskExecutor::Threads(ThreadsExecutor::default())),
198            layout: self.layout,
199            ctx: self.ctx,
200            projection,
201            filter,
202            row_masks,
203            canonicalize: self.canonicalize,
204            concurrency: self.concurrency,
205            prefetch_conjuncts: self.prefetch_conjuncts,
206            row_range_pruner,
207            segments,
208        })
209    }
210
211    /// Perform the scan operation and return a stream of arrays.
212    pub fn into_array_stream(self) -> VortexResult<impl ArrayStream + 'static> {
213        self.build()?.into_array_stream()
214    }
215
216    pub async fn read_all(self) -> VortexResult<ArrayRef> {
217        self.into_array_stream()?.read_all().await
218    }
219}
220
221/// Compute masks of field paths referenced by the projection and filter in the scan.
222///
223/// Projection and filter must be pre-simplified.
224fn filter_and_projection_masks(
225    projection: &ExprRef,
226    filter: Option<&ExprRef>,
227    dtype: &DType,
228) -> VortexResult<(Vec<FieldMask>, Vec<FieldMask>)> {
229    let Some(struct_dtype) = dtype.as_struct() else {
230        return Ok(match filter {
231            Some(_) => (vec![FieldMask::All], vec![FieldMask::All]),
232            None => (Vec::new(), vec![FieldMask::All]),
233        });
234    };
235    let projection_mask = immediate_scope_access(projection, struct_dtype)?;
236    Ok(match filter {
237        None => (
238            Vec::new(),
239            projection_mask.into_iter().map(to_field_mask).collect_vec(),
240        ),
241        Some(f) => {
242            let filter_mask = immediate_scope_access(f, struct_dtype)?;
243            let only_projection_mask = projection_mask
244                .difference(&filter_mask)
245                .cloned()
246                .map(to_field_mask)
247                .collect_vec();
248            (
249                filter_mask.into_iter().map(to_field_mask).collect_vec(),
250                only_projection_mask,
251            )
252        }
253    })
254}
255
256fn to_field_mask(field: FieldName) -> FieldMask {
257    FieldMask::Prefix(FieldPath::from(Field::Name(field)))
258}
259
260pub struct Scan<D> {
261    driver: D,
262    task_executor: TaskExecutor,
263    layout: Layout,
264    ctx: ArrayContext,
265    // Guaranteed to be simplified
266    projection: ExprRef,
267    // Guaranteed to be simplified
268    filter: Option<ExprRef>,
269    row_masks: Vec<RowMask>,
270    canonicalize: bool,
271    //TODO(adam): bake this into the executors?
272    concurrency: usize,
273    prefetch_conjuncts: bool,
274    row_range_pruner: RowRangePruner,
275    segments: SegmentStream,
276}
277
278impl<D: ScanDriver> Scan<D> {
279    /// Perform the scan operation and return a stream of arrays.
280    ///
281    /// The returned stream should be considered to perform I/O-bound operations and requires
282    /// frequent polling to make progress.
283    pub fn into_array_stream(self) -> VortexResult<impl ArrayStream + 'static> {
284        // Create a single LayoutReader that is reused for the entire scan.
285        let segment_reader = self.driver.segment_reader();
286        let task_executor = self.task_executor.clone();
287        let reader: Arc<dyn LayoutReader> = self.layout.reader(segment_reader, self.ctx.clone())?;
288
289        let pruning = self
290            .filter
291            .as_ref()
292            .map(|filter| {
293                let pruning = Arc::new(FilterExpr::try_new(
294                    reader.dtype().as_struct().ok_or_else(|| {
295                        vortex_err!("Vortex scan currently only works for struct arrays")
296                    })?,
297                    filter.clone(),
298                    self.prefetch_conjuncts,
299                )?);
300
301                VortexResult::Ok(pruning)
302            })
303            .transpose()?;
304
305        // We start with a stream of row masks
306        let row_masks = stream::iter(self.row_masks);
307        let projection = self.projection.clone();
308        let row_range_pruner = self.row_range_pruner.clone();
309
310        let exec_stream = row_masks
311            .map(move |row_mask| {
312                let reader = reader.clone();
313                let projection = projection.clone();
314                let pruning = pruning.clone();
315                let reader = reader.clone();
316                let mut row_range_pruner = row_range_pruner.clone();
317
318                // This future is the processing task
319                instrument!("process", async move {
320                    let row_mask = match pruning {
321                        None => row_mask,
322                        Some(pruning_filter) => {
323                            pruning_filter
324                                .new_evaluation(&row_mask)
325                                .evaluate(reader.clone())
326                                .await?
327                        }
328                    };
329
330                    // Filter out all-false masks
331                    if row_mask.filter_mask().all_false() {
332                        row_range_pruner
333                            .remove(row_mask.begin()..row_mask.end())
334                            .await?;
335                        Ok(None)
336                    } else {
337                        let mut array = reader.evaluate_expr(row_mask, projection).await?;
338                        if self.canonicalize {
339                            let mut builder = builder_with_capacity(array.dtype(), array.len());
340                            array.append_to_builder(builder.as_mut())?;
341                            array = builder.finish();
342                        }
343                        VortexResult::Ok(Some(array))
344                    }
345                })
346            })
347            .map(move |processing_task| task_executor.spawn(processing_task))
348            .buffered(self.concurrency)
349            .filter_map(|v| async move { v.unnest().transpose() });
350
351        let exec_stream = instrument!("exec_stream", exec_stream);
352        let io_stream = self.driver.io_stream(self.segments);
353
354        let unified = UnifiedDriverStream {
355            exec_stream,
356            io_stream,
357        };
358
359        let result_dtype = self.projection.return_dtype(self.layout.dtype())?;
360        Ok(ArrayStreamAdapter::new(result_dtype, unified))
361    }
362
363    pub async fn read_all(self) -> VortexResult<ArrayRef> {
364        self.into_array_stream()?.read_all().await
365    }
366}