vortex_scan/
repeated_scan.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::cmp;
5use std::iter;
6use std::ops::Range;
7use std::sync::Arc;
8
9use futures::Stream;
10use futures::future::BoxFuture;
11use itertools::Either;
12use itertools::Itertools;
13use vortex_array::ArrayRef;
14use vortex_array::expr::Expression;
15use vortex_array::iter::ArrayIterator;
16use vortex_array::iter::ArrayIteratorAdapter;
17use vortex_array::stream::ArrayStream;
18use vortex_array::stream::ArrayStreamAdapter;
19use vortex_dtype::DType;
20use vortex_error::VortexResult;
21use vortex_io::runtime::BlockingRuntime;
22use vortex_io::session::RuntimeSessionExt;
23use vortex_layout::LayoutReaderRef;
24use vortex_session::VortexSession;
25
26use crate::filter::FilterExpr;
27use crate::selection::Selection;
28use crate::splits::Splits;
29use crate::tasks::TaskContext;
30use crate::tasks::split_exec;
31
/// A projected subset (by indices, range, and filter) of rows from a Vortex data source.
///
/// The methods of this struct enable scanning of multiple row ranges of this data source,
/// possibly concurrently. Each call to `execute` (or one of its stream/iterator wrappers) may
/// narrow the scan further with its own row range.
pub struct RepeatedScan<A: 'static + Send> {
    /// Session whose handle is used to spawn the per-split tasks in `execute_stream`.
    session: VortexSession,
    /// Layout reader handed to every split task as its `reader`.
    layout_reader: LayoutReaderRef,
    /// Projection expression handed to every split task.
    projection: Expression,
    /// Optional filter expression; wrapped in a `FilterExpr` when a task context is built.
    filter: Option<Expression>,
    /// Whether `execute_stream` buffers tasks in order (`buffered`) or lets them complete in
    /// any order (`buffer_unordered`).
    ordered: bool,
    /// Optionally read a subset of the rows in the file.
    ///
    /// This is intersected with the row range passed to each `execute` call.
    row_range: Option<Range<u64>>,
    /// The selection mask to apply to the selected row range.
    selection: Selection,
    /// The natural splits of the file.
    splits: Splits,
    /// The number of splits to make progress on concurrently **per-thread**.
    ///
    /// `execute_stream` multiplies this by the available parallelism to size its buffer.
    concurrency: usize,
    /// Function to apply to each [`ArrayRef`] within the spawned split tasks.
    map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
    /// Maximal number of rows to read (after filtering)
    limit: Option<usize>,
    /// The dtype of the projected arrays.
    dtype: DType,
}
57
58impl RepeatedScan<ArrayRef> {
59    pub fn execute_array_iter<B: BlockingRuntime>(
60        &self,
61        row_range: Option<Range<u64>>,
62        runtime: &B,
63    ) -> VortexResult<impl ArrayIterator + 'static> {
64        let dtype = self.dtype.clone();
65        let stream = self.execute_stream(row_range)?;
66        let iter = runtime.block_on_stream(stream);
67        Ok(ArrayIteratorAdapter::new(dtype, iter))
68    }
69
70    pub fn execute_array_stream(
71        &self,
72        row_range: Option<Range<u64>>,
73    ) -> VortexResult<impl ArrayStream + Send + 'static> {
74        let dtype = self.dtype.clone();
75        let stream = self.execute_stream(row_range)?;
76        Ok(ArrayStreamAdapter::new(dtype, stream))
77    }
78}
79
80impl<A: 'static + Send> RepeatedScan<A> {
81    /// Constructor just to allow `scan_builder` to create a `RepeatedScan`.
82    #[expect(
83        clippy::too_many_arguments,
84        reason = "all arguments are needed for scan construction"
85    )]
86    pub(super) fn new(
87        session: VortexSession,
88        layout_reader: LayoutReaderRef,
89        projection: Expression,
90        filter: Option<Expression>,
91        ordered: bool,
92        row_range: Option<Range<u64>>,
93        selection: Selection,
94        splits: Splits,
95        concurrency: usize,
96        map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
97        limit: Option<usize>,
98        dtype: DType,
99    ) -> Self {
100        Self {
101            session,
102            layout_reader,
103            projection,
104            filter,
105            ordered,
106            row_range,
107            selection,
108            splits,
109            concurrency,
110            map_fn,
111            limit,
112            dtype,
113        }
114    }
115
116    pub fn execute(
117        &self,
118        row_range: Option<Range<u64>>,
119    ) -> VortexResult<Vec<BoxFuture<'static, VortexResult<Option<A>>>>> {
120        let ctx = Arc::new(TaskContext {
121            selection: self.selection.clone(),
122            filter: self.filter.clone().map(|f| Arc::new(FilterExpr::new(f))),
123            reader: self.layout_reader.clone(),
124            projection: self.projection.clone(),
125            mapper: self.map_fn.clone(),
126        });
127
128        let row_range = intersect_ranges(self.row_range.as_ref(), row_range);
129
130        let ranges = match &self.splits {
131            Splits::Natural(btree_set) => {
132                let splits_iter = match row_range {
133                    None => Either::Left(btree_set.iter().copied()),
134                    Some(range) => {
135                        if range.is_empty() {
136                            return Ok(Vec::new());
137                        }
138                        Either::Right(
139                            iter::once(range.start)
140                                .chain(btree_set.range(range.clone()).copied())
141                                .chain(iter::once(range.end)),
142                        )
143                    }
144                };
145
146                Either::Left(splits_iter.tuple_windows().map(|(start, end)| start..end))
147            }
148            Splits::Ranges(ranges) => Either::Right(match row_range {
149                None => Either::Left(ranges.iter().cloned()),
150                Some(range) => {
151                    if range.is_empty() {
152                        return Ok(Vec::new());
153                    }
154                    Either::Right(ranges.iter().filter_map(move |r| {
155                        let start = cmp::max(r.start, range.start);
156                        let end = cmp::min(r.end, range.end);
157                        (start < end).then_some(start..end)
158                    }))
159                }
160            }),
161        };
162
163        let mut limit = self.limit;
164        ranges
165            .filter_map(|range| {
166                if range.start >= range.end || limit.is_some_and(|l| l == 0) {
167                    None
168                } else {
169                    Some(split_exec(ctx.clone(), range, limit.as_mut()))
170                }
171            })
172            .try_collect()
173    }
174
175    pub fn execute_stream(
176        &self,
177        row_range: Option<Range<u64>>,
178    ) -> VortexResult<impl Stream<Item = VortexResult<A>> + Send + 'static + use<A>> {
179        use futures::StreamExt;
180        let num_workers = std::thread::available_parallelism()
181            .map(|n| n.get())
182            .unwrap_or(1);
183        let concurrency = self.concurrency * num_workers;
184        let handle = self.session.handle();
185
186        let stream =
187            futures::stream::iter(self.execute(row_range)?).map(move |task| handle.spawn(task));
188
189        let stream = if self.ordered {
190            stream.buffered(concurrency).boxed()
191        } else {
192            stream.buffer_unordered(concurrency).boxed()
193        };
194
195        Ok(stream.filter_map(|chunk| async move { chunk.transpose() }))
196    }
197}
198
/// Intersects two optional row ranges, treating `None` as "unbounded".
///
/// When only one side is present it is returned as-is. When both are present the result is
/// `max(starts)..min(ends)`; for disjoint inputs this is a range whose start is not below its
/// end, so callers should test it with `is_empty` before using its bounds.
fn intersect_ranges(left: Option<&Range<u64>>, right: Option<Range<u64>>) -> Option<Range<u64>> {
    let Some(lhs) = left else {
        return right;
    };
    let Some(rhs) = right else {
        return Some(lhs.clone());
    };

    let start = cmp::max(lhs.start, rhs.start);
    let end = cmp::min(lhs.end, rhs.end);
    Some(start..end)
}