vortex_scan/
repeated_scan.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::ops::Range;
5use std::sync::Arc;
6use std::{cmp, iter};
7
8use futures::Stream;
9use futures::future::BoxFuture;
10use itertools::{Either, Itertools};
11use vortex_array::ArrayRef;
12use vortex_array::iter::{ArrayIterator, ArrayIteratorAdapter};
13use vortex_array::stream::{ArrayStream, ArrayStreamAdapter};
14use vortex_dtype::DType;
15use vortex_error::VortexResult;
16use vortex_expr::ExprRef;
17use vortex_io::runtime::{BlockingRuntime, Handle};
18use vortex_layout::LayoutReaderRef;
19
20use crate::filter::FilterExpr;
21use crate::selection::Selection;
22use crate::splits::Splits;
23use crate::tasks::{TaskContext, split_exec};
24
25/// A projected subset (by indices, range, and filter) of rows from a Vortex data source.
26///
27/// The method of this struct enable, possibly concurrent, scanning of multiple row ranges of this
28/// data source.
29pub struct RepeatedScan<A: 'static + Send> {
30    handle: Handle,
31    layout_reader: LayoutReaderRef,
32    projection: ExprRef,
33    filter: Option<ExprRef>,
34    ordered: bool,
35    /// Optionally read a subset of the rows in the file.
36    row_range: Option<Range<u64>>,
37    /// The selection mask to apply to the selected row range.
38    selection: Selection,
39    /// The natural splits of the file.
40    splits: Splits,
41    /// The number of splits to make progress on concurrently **per-thread**.
42    concurrency: usize,
43    /// Function to apply to each [`ArrayRef`] within the spawned split tasks.
44    map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
45    /// Maximal number of rows to read (after filtering)
46    limit: Option<usize>,
47    /// The dtype of the projected arrays.
48    dtype: DType,
49}
50
51impl RepeatedScan<ArrayRef> {
52    pub fn execute_array_iter<B: BlockingRuntime>(
53        &self,
54        row_range: Option<Range<u64>>,
55        runtime: &B,
56    ) -> VortexResult<impl ArrayIterator + 'static> {
57        let dtype = self.dtype.clone();
58        let stream = self.execute_stream(row_range)?;
59        let iter = runtime.block_on_stream(move |_h| stream);
60        Ok(ArrayIteratorAdapter::new(dtype, iter))
61    }
62
63    pub fn execute_array_stream(
64        &self,
65        row_range: Option<Range<u64>>,
66    ) -> VortexResult<impl ArrayStream + Send + 'static> {
67        let dtype = self.dtype.clone();
68        let stream = self.execute_stream(row_range)?;
69        Ok(ArrayStreamAdapter::new(dtype, stream))
70    }
71}
72
73impl<A: 'static + Send> RepeatedScan<A> {
74    /// Constructor just to allow `scan_builder` to create a `RepeatedScan`.
75    #[allow(clippy::too_many_arguments)]
76    pub(super) fn new(
77        handle: Handle,
78        layout_reader: LayoutReaderRef,
79        projection: ExprRef,
80        filter: Option<ExprRef>,
81        ordered: bool,
82        row_range: Option<Range<u64>>,
83        selection: Selection,
84        splits: Splits,
85        concurrency: usize,
86        map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
87        limit: Option<usize>,
88        dtype: DType,
89    ) -> Self {
90        Self {
91            handle,
92            layout_reader,
93            projection,
94            filter,
95            ordered,
96            row_range,
97            selection,
98            splits,
99            concurrency,
100            map_fn,
101            limit,
102            dtype,
103        }
104    }
105
106    pub fn execute(
107        &self,
108        row_range: Option<Range<u64>>,
109    ) -> VortexResult<Vec<BoxFuture<'static, VortexResult<Option<A>>>>> {
110        let ctx = Arc::new(TaskContext {
111            selection: self.selection.clone(),
112            filter: self.filter.clone().map(|f| Arc::new(FilterExpr::new(f))),
113            reader: self.layout_reader.clone(),
114            projection: self.projection.clone(),
115            mapper: self.map_fn.clone(),
116        });
117
118        let row_range = intersect_ranges(self.row_range.as_ref(), row_range);
119
120        let ranges = match &self.splits {
121            Splits::Natural(btree_set) => {
122                let splits_iter = match row_range {
123                    None => Either::Left(btree_set.iter().copied()),
124                    Some(range) => {
125                        if range.is_empty() {
126                            return Ok(Vec::new());
127                        }
128                        Either::Right(
129                            iter::once(range.start)
130                                .chain(btree_set.range(range.clone()).copied())
131                                .chain(iter::once(range.end)),
132                        )
133                    }
134                };
135
136                Either::Left(splits_iter.tuple_windows().map(|(start, end)| start..end))
137            }
138            Splits::Ranges(ranges) => Either::Right(match row_range {
139                None => Either::Left(ranges.iter().cloned()),
140                Some(range) => {
141                    if range.is_empty() {
142                        return Ok(Vec::new());
143                    }
144                    Either::Right(ranges.iter().filter_map(move |r| {
145                        let start = cmp::max(r.start, range.start);
146                        let end = cmp::min(r.end, range.end);
147                        (start < end).then_some(start..end)
148                    }))
149                }
150            }),
151        };
152
153        let mut limit = self.limit;
154        ranges
155            .filter_map(|range| {
156                if range.start >= range.end || limit.is_some_and(|l| l == 0) {
157                    None
158                } else {
159                    Some(split_exec(ctx.clone(), range, limit.as_mut()))
160                }
161            })
162            .try_collect()
163    }
164
165    pub fn execute_stream(
166        &self,
167        row_range: Option<Range<u64>>,
168    ) -> VortexResult<impl Stream<Item = VortexResult<A>> + Send + 'static + use<A>> {
169        use futures::StreamExt;
170        let num_workers = std::thread::available_parallelism()
171            .map(|n| n.get())
172            .unwrap_or(1);
173        let concurrency = self.concurrency * num_workers;
174        let handle = self.handle.clone();
175
176        let stream =
177            futures::stream::iter(self.execute(row_range)?).map(move |task| handle.spawn(task));
178
179        let stream = if self.ordered {
180            stream.buffered(concurrency).boxed()
181        } else {
182            stream.buffer_unordered(concurrency).boxed()
183        };
184
185        Ok(stream.filter_map(|chunk| async move { chunk.transpose() }))
186    }
187}
188
/// Intersects two optional row ranges, where `None` means "no restriction".
///
/// Returns `None` only when both inputs are `None`. When only one side is given, it is returned
/// unchanged.
///
/// When both sides are given and do not overlap, the result is the empty range `start..start`
/// rather than an inverted range. This keeps the result safe to hand to APIs such as
/// `BTreeSet::range`, which panic when `start > end`.
fn intersect_ranges(left: Option<&Range<u64>>, right: Option<Range<u64>>) -> Option<Range<u64>> {
    match (left, right) {
        (None, None) => None,
        (None, Some(r)) => Some(r),
        (Some(l), None) => Some(l.clone()),
        (Some(l), Some(r)) => {
            let start = cmp::max(l.start, r.start);
            let end = cmp::min(l.end, r.end);
            // Clamp disjoint intersections to an empty range instead of an inverted one.
            Some(start..cmp::max(start, end))
        }
    }
}