Skip to main content

vortex_scan/
repeated_scan.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::cmp;
5use std::iter;
6use std::ops::Range;
7use std::sync::Arc;
8
9use futures::Stream;
10use futures::future::BoxFuture;
11use itertools::Either;
12use itertools::Itertools;
13use vortex_array::ArrayRef;
14use vortex_array::expr::Expression;
15use vortex_array::iter::ArrayIterator;
16use vortex_array::iter::ArrayIteratorAdapter;
17use vortex_array::stream::ArrayStream;
18use vortex_array::stream::ArrayStreamAdapter;
19use vortex_dtype::DType;
20use vortex_error::VortexResult;
21use vortex_io::runtime::BlockingRuntime;
22use vortex_io::session::RuntimeSessionExt;
23use vortex_layout::LayoutReaderRef;
24use vortex_session::VortexSession;
25
26use crate::filter::FilterExpr;
27use crate::selection::Selection;
28use crate::splits::Splits;
29use crate::tasks::TaskContext;
30use crate::tasks::split_exec;
31
32/// A projected subset (by indices, range, and filter) of rows from a Vortex data source.
33///
34/// The method of this struct enable, possibly concurrent, scanning of multiple row ranges of this
35/// data source.
36pub struct RepeatedScan<A: 'static + Send> {
37    session: VortexSession,
38    layout_reader: LayoutReaderRef,
39    projection: Expression,
40    filter: Option<Expression>,
41    ordered: bool,
42    /// Optionally read a subset of the rows in the file.
43    row_range: Option<Range<u64>>,
44    /// The selection mask to apply to the selected row range.
45    selection: Selection,
46    /// The natural splits of the file.
47    splits: Splits,
48    /// The number of splits to make progress on concurrently **per-thread**.
49    concurrency: usize,
50    /// Function to apply to each [`ArrayRef`] within the spawned split tasks.
51    map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
52    /// Maximal number of rows to read (after filtering)
53    limit: Option<u64>,
54    /// The dtype of the projected arrays.
55    dtype: DType,
56}
57
58impl RepeatedScan<ArrayRef> {
59    pub fn dtype(&self) -> &DType {
60        &self.dtype
61    }
62
63    pub fn execute_array_iter<B: BlockingRuntime>(
64        &self,
65        row_range: Option<Range<u64>>,
66        runtime: &B,
67    ) -> VortexResult<impl ArrayIterator + 'static> {
68        let dtype = self.dtype.clone();
69        let stream = self.execute_stream(row_range)?;
70        let iter = runtime.block_on_stream(stream);
71        Ok(ArrayIteratorAdapter::new(dtype, iter))
72    }
73
74    pub fn execute_array_stream(
75        &self,
76        row_range: Option<Range<u64>>,
77    ) -> VortexResult<impl ArrayStream + Send + 'static> {
78        let dtype = self.dtype.clone();
79        let stream = self.execute_stream(row_range)?;
80        Ok(ArrayStreamAdapter::new(dtype, stream))
81    }
82}
83
84impl<A: 'static + Send> RepeatedScan<A> {
85    /// Constructor just to allow `scan_builder` to create a `RepeatedScan`.
86    #[expect(
87        clippy::too_many_arguments,
88        reason = "all arguments are needed for scan construction"
89    )]
90    pub(super) fn new(
91        session: VortexSession,
92        layout_reader: LayoutReaderRef,
93        projection: Expression,
94        filter: Option<Expression>,
95        ordered: bool,
96        row_range: Option<Range<u64>>,
97        selection: Selection,
98        splits: Splits,
99        concurrency: usize,
100        map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
101        limit: Option<u64>,
102        dtype: DType,
103    ) -> Self {
104        Self {
105            session,
106            layout_reader,
107            projection,
108            filter,
109            ordered,
110            row_range,
111            selection,
112            splits,
113            concurrency,
114            map_fn,
115            limit,
116            dtype,
117        }
118    }
119
120    pub fn execute(
121        &self,
122        row_range: Option<Range<u64>>,
123    ) -> VortexResult<Vec<BoxFuture<'static, VortexResult<Option<A>>>>> {
124        let ctx = Arc::new(TaskContext {
125            selection: self.selection.clone(),
126            filter: self.filter.clone().map(|f| Arc::new(FilterExpr::new(f))),
127            reader: self.layout_reader.clone(),
128            projection: self.projection.clone(),
129            mapper: self.map_fn.clone(),
130        });
131
132        let row_range = intersect_ranges(self.row_range.as_ref(), row_range);
133
134        let ranges = match &self.splits {
135            Splits::Natural(btree_set) => {
136                let splits_iter = match row_range {
137                    None => Either::Left(btree_set.iter().copied()),
138                    Some(range) => {
139                        if range.is_empty() {
140                            return Ok(Vec::new());
141                        }
142                        Either::Right(
143                            iter::once(range.start)
144                                .chain(btree_set.range(range.clone()).copied())
145                                .chain(iter::once(range.end)),
146                        )
147                    }
148                };
149
150                Either::Left(splits_iter.tuple_windows().map(|(start, end)| start..end))
151            }
152            Splits::Ranges(ranges) => Either::Right(match row_range {
153                None => Either::Left(ranges.iter().cloned()),
154                Some(range) => {
155                    if range.is_empty() {
156                        return Ok(Vec::new());
157                    }
158                    Either::Right(ranges.iter().filter_map(move |r| {
159                        let start = cmp::max(r.start, range.start);
160                        let end = cmp::min(r.end, range.end);
161                        (start < end).then_some(start..end)
162                    }))
163                }
164            }),
165        };
166
167        let mut limit = self.limit;
168        let mut tasks = Vec::new();
169
170        for range in ranges {
171            if range.start >= range.end {
172                continue;
173            }
174
175            if limit.is_some_and(|l| l == 0) {
176                break;
177            }
178
179            tasks.push(split_exec(ctx.clone(), range, limit.as_mut())?);
180        }
181
182        Ok(tasks)
183    }
184
185    pub fn execute_stream(
186        &self,
187        row_range: Option<Range<u64>>,
188    ) -> VortexResult<impl Stream<Item = VortexResult<A>> + Send + 'static + use<A>> {
189        use futures::StreamExt;
190        let num_workers = std::thread::available_parallelism()
191            .map(|n| n.get())
192            .unwrap_or(1);
193        let concurrency = self.concurrency * num_workers;
194        let handle = self.session.handle();
195
196        let stream =
197            futures::stream::iter(self.execute(row_range)?).map(move |task| handle.spawn(task));
198
199        let stream = if self.ordered {
200            stream.buffered(concurrency).boxed()
201        } else {
202            stream.buffer_unordered(concurrency).boxed()
203        };
204
205        Ok(stream.filter_map(|chunk| async move { chunk.transpose() }))
206    }
207}
208
/// Intersects the scan's stored row range with a per-call requested row range.
///
/// `None` means "unbounded", so the result is `None` only when both inputs are `None`, and a
/// single `Some` input is returned unchanged. When both are present, the result is their
/// overlap. If the two ranges are disjoint, the result is the empty range `start..start`
/// rather than an inverted range, so it is safe to pass to APIs such as `BTreeSet::range`
/// that panic when `start > end`.
fn intersect_ranges(left: Option<&Range<u64>>, right: Option<Range<u64>>) -> Option<Range<u64>> {
    match (left, right) {
        (None, None) => None,
        (None, Some(r)) => Some(r),
        (Some(l), None) => Some(l.clone()),
        (Some(l), Some(r)) => {
            let start = cmp::max(l.start, r.start);
            let end = cmp::min(l.end, r.end);
            // Clamp so disjoint inputs yield an empty range instead of an inverted one.
            Some(start..cmp::max(start, end))
        }
    }
}