Skip to main content

vortex_layout/scan/
repeated_scan.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::cmp;
5use std::iter;
6use std::ops::Range;
7use std::sync::Arc;
8
9use futures::Stream;
10use futures::future::BoxFuture;
11use itertools::Either;
12use itertools::Itertools;
13use vortex_array::ArrayRef;
14use vortex_array::dtype::DType;
15use vortex_array::expr::Expression;
16use vortex_array::iter::ArrayIterator;
17use vortex_array::iter::ArrayIteratorAdapter;
18use vortex_array::stream::ArrayStream;
19use vortex_array::stream::ArrayStreamAdapter;
20use vortex_error::VortexResult;
21use vortex_io::runtime::BlockingRuntime;
22use vortex_io::session::RuntimeSessionExt;
23use vortex_scan::selection::Selection;
24use vortex_session::VortexSession;
25use vortex_utils::parallelism::get_available_parallelism;
26
27use crate::LayoutReaderRef;
28use crate::scan::filter::FilterExpr;
29use crate::scan::splits::Splits;
30use crate::scan::tasks::TaskContext;
31use crate::scan::tasks::split_exec;
32
33/// A projected subset (by indices, range, and filter) of rows from a Vortex data source.
34///
35/// The method of this struct enable, possibly concurrent, scanning of multiple row ranges of this
36/// data source.
37pub struct RepeatedScan<A: 'static + Send> {
38    session: VortexSession,
39    layout_reader: LayoutReaderRef,
40    projection: Expression,
41    filter: Option<Expression>,
42    ordered: bool,
43    /// Optionally read a subset of the rows in the file.
44    row_range: Option<Range<u64>>,
45    /// The selection mask to apply to the selected row range.
46    selection: Selection,
47    /// The natural splits of the file.
48    splits: Splits,
49    /// The number of splits to make progress on concurrently **per-thread**.
50    concurrency: usize,
51    /// Function to apply to each [`ArrayRef`] within the spawned split tasks.
52    map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
53    /// Maximal number of rows to read (after filtering)
54    limit: Option<u64>,
55    /// The dtype of the projected arrays.
56    dtype: DType,
57}
58
59impl RepeatedScan<ArrayRef> {
60    pub fn dtype(&self) -> &DType {
61        &self.dtype
62    }
63
64    pub fn execute_array_iter<B: BlockingRuntime>(
65        &self,
66        row_range: Option<Range<u64>>,
67        runtime: &B,
68    ) -> VortexResult<impl ArrayIterator + 'static> {
69        let dtype = self.dtype.clone();
70        let stream = self.execute_stream(row_range)?;
71        let iter = runtime.block_on_stream(stream);
72        Ok(ArrayIteratorAdapter::new(dtype, iter))
73    }
74
75    pub fn execute_array_stream(
76        &self,
77        row_range: Option<Range<u64>>,
78    ) -> VortexResult<impl ArrayStream + Send + 'static> {
79        let dtype = self.dtype.clone();
80        let stream = self.execute_stream(row_range)?;
81        Ok(ArrayStreamAdapter::new(dtype, stream))
82    }
83}
84
85impl<A: 'static + Send> RepeatedScan<A> {
86    /// Constructor just to allow `scan_builder` to create a `RepeatedScan`.
87    #[expect(
88        clippy::too_many_arguments,
89        reason = "all arguments are needed for scan construction"
90    )]
91    pub fn new(
92        session: VortexSession,
93        layout_reader: LayoutReaderRef,
94        projection: Expression,
95        filter: Option<Expression>,
96        ordered: bool,
97        row_range: Option<Range<u64>>,
98        selection: Selection,
99        splits: Splits,
100        concurrency: usize,
101        map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
102        limit: Option<u64>,
103        dtype: DType,
104    ) -> Self {
105        Self {
106            session,
107            layout_reader,
108            projection,
109            filter,
110            ordered,
111            row_range,
112            selection,
113            splits,
114            concurrency,
115            map_fn,
116            limit,
117            dtype,
118        }
119    }
120
121    pub fn execute(
122        &self,
123        row_range: Option<Range<u64>>,
124    ) -> VortexResult<Vec<BoxFuture<'static, VortexResult<Option<A>>>>> {
125        let ctx = Arc::new(TaskContext {
126            selection: self.selection.clone(),
127            filter: self.filter.clone().map(|f| Arc::new(FilterExpr::new(f))),
128            reader: Arc::clone(&self.layout_reader),
129            projection: self.projection.clone(),
130            mapper: Arc::clone(&self.map_fn),
131        });
132
133        let row_range = intersect_ranges(self.row_range.as_ref(), row_range);
134
135        let ranges = match &self.splits {
136            Splits::Natural(btree_set) => {
137                let splits_iter = match row_range {
138                    None => Either::Left(btree_set.iter().copied()),
139                    Some(range) => {
140                        if range.is_empty() {
141                            return Ok(Vec::new());
142                        }
143                        Either::Right(
144                            iter::once(range.start)
145                                .chain(btree_set.range(range.clone()).copied())
146                                .chain(iter::once(range.end)),
147                        )
148                    }
149                };
150
151                Either::Left(splits_iter.tuple_windows().map(|(start, end)| start..end))
152            }
153            Splits::Ranges(ranges) => Either::Right(match row_range {
154                None => Either::Left(ranges.iter().cloned()),
155                Some(range) => {
156                    if range.is_empty() {
157                        return Ok(Vec::new());
158                    }
159                    Either::Right(ranges.iter().filter_map(move |r| {
160                        let start = cmp::max(r.start, range.start);
161                        let end = cmp::min(r.end, range.end);
162                        (start < end).then_some(start..end)
163                    }))
164                }
165            }),
166        };
167
168        let mut limit = self.limit;
169        let mut tasks = Vec::new();
170
171        for range in ranges {
172            if range.start >= range.end {
173                continue;
174            }
175
176            if limit.is_some_and(|l| l == 0) {
177                break;
178            }
179
180            tasks.push(split_exec(Arc::clone(&ctx), range, limit.as_mut())?);
181        }
182
183        Ok(tasks)
184    }
185
186    pub fn execute_stream(
187        &self,
188        row_range: Option<Range<u64>>,
189    ) -> VortexResult<impl Stream<Item = VortexResult<A>> + Send + 'static + use<A>> {
190        use futures::StreamExt;
191        let num_workers = get_available_parallelism().unwrap_or(1);
192        let concurrency = self.concurrency * num_workers;
193        let handle = self.session.handle();
194
195        let stream =
196            futures::stream::iter(self.execute(row_range)?).map(move |task| handle.spawn(task));
197
198        let stream = if self.ordered {
199            stream.buffered(concurrency).boxed()
200        } else {
201            stream.buffer_unordered(concurrency).boxed()
202        };
203
204        Ok(stream.filter_map(|chunk| async move { chunk.transpose() }))
205    }
206}
207
/// Intersects an optional configured row range with an optional requested one.
///
/// `None` on either side means "unbounded": the other side is returned unchanged (cloned when
/// borrowed). When both are present, the result spans from the later start to the earlier end.
/// Disjoint inputs therefore produce a range whose `start` is not below its `end`. That range is
/// empty but not normalized, so callers should test it with [`Range::is_empty`].
fn intersect_ranges(left: Option<&Range<u64>>, right: Option<Range<u64>>) -> Option<Range<u64>> {
    let Some(left) = left else {
        return right;
    };
    let intersection = match right {
        Some(right) => cmp::max(left.start, right.start)..cmp::min(left.end, right.end),
        None => left.clone(),
    };
    Some(intersection)
}