Skip to main content

vortex_layout/scan/
repeated_scan.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::cmp;
5use std::iter;
6use std::ops::Range;
7use std::sync::Arc;
8
9use futures::Stream;
10use futures::future::BoxFuture;
11use itertools::Either;
12use itertools::Itertools;
13use vortex_array::ArrayRef;
14use vortex_array::dtype::DType;
15use vortex_array::expr::Expression;
16use vortex_array::iter::ArrayIterator;
17use vortex_array::iter::ArrayIteratorAdapter;
18use vortex_array::stream::ArrayStream;
19use vortex_array::stream::ArrayStreamAdapter;
20use vortex_error::VortexExpect;
21use vortex_error::VortexResult;
22use vortex_io::runtime::BlockingRuntime;
23use vortex_io::session::RuntimeSessionExt;
24use vortex_scan::selection::Selection;
25use vortex_session::VortexSession;
26use vortex_utils::parallelism::get_available_parallelism;
27
28use crate::LayoutReaderRef;
29use crate::scan::filter::FilterExpr;
30use crate::scan::splits::Splits;
31use crate::scan::tasks::TaskContext;
32use crate::scan::tasks::split_exec;
33
/// A projected subset (by indices, range, and filter) of rows from a Vortex data source.
///
/// The methods of this struct enable possibly-concurrent scanning of multiple row ranges of this
/// data source. Execution only borrows `&self`, so the same configured scan can be run repeatedly,
/// each time optionally narrowed by an additional row range.
pub struct RepeatedScan<A: 'static + Send> {
    /// Session whose runtime handle is used to spawn the split tasks.
    session: VortexSession,
    /// Reader for the layout being scanned; shared with every split task.
    layout_reader: LayoutReaderRef,
    /// Projection expression handed to every split task.
    projection: Expression,
    /// Optional row filter, wrapped once and shared with every split task.
    filter: Option<Expression>,
    /// When `true`, streamed results preserve split order; otherwise they are yielded in
    /// completion order.
    ordered: bool,
    /// Optionally read a subset of the rows in the file.
    row_range: Option<Range<u64>>,
    /// The selection mask to apply to the selected row range.
    selection: Selection,
    /// How the file is cut into splits: either natural split points or explicit row ranges.
    splits: Splits,
    /// The number of splits to make progress on concurrently **per-thread**.
    concurrency: usize,
    /// Function to apply to each [`ArrayRef`] within the spawned split tasks.
    map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
    /// Maximal number of rows to read (after filtering).
    limit: Option<u64>,
    /// The dtype of the projected arrays.
    dtype: DType,
}
59
60impl RepeatedScan<ArrayRef> {
61    pub fn dtype(&self) -> &DType {
62        &self.dtype
63    }
64
65    pub fn execute_array_iter<B: BlockingRuntime>(
66        &self,
67        row_range: Option<Range<u64>>,
68        runtime: &B,
69    ) -> VortexResult<impl ArrayIterator + 'static> {
70        let dtype = self.dtype.clone();
71        let stream = self.execute_stream(row_range)?;
72        let iter = runtime.block_on_stream(stream);
73        Ok(ArrayIteratorAdapter::new(dtype, iter))
74    }
75
76    pub fn execute_array_stream(
77        &self,
78        row_range: Option<Range<u64>>,
79    ) -> VortexResult<impl ArrayStream + Send + 'static> {
80        let dtype = self.dtype.clone();
81        let stream = self.execute_stream(row_range)?;
82        Ok(ArrayStreamAdapter::new(dtype, stream))
83    }
84}
85
86impl<A: 'static + Send> RepeatedScan<A> {
87    /// Constructor just to allow `scan_builder` to create a `RepeatedScan`.
88    #[expect(
89        clippy::too_many_arguments,
90        reason = "all arguments are needed for scan construction"
91    )]
92    pub fn new(
93        session: VortexSession,
94        layout_reader: LayoutReaderRef,
95        projection: Expression,
96        filter: Option<Expression>,
97        ordered: bool,
98        row_range: Option<Range<u64>>,
99        selection: Selection,
100        splits: Splits,
101        concurrency: usize,
102        map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
103        limit: Option<u64>,
104        dtype: DType,
105    ) -> Self {
106        Self {
107            session,
108            layout_reader,
109            projection,
110            filter,
111            ordered,
112            row_range,
113            selection,
114            splits,
115            concurrency,
116            map_fn,
117            limit,
118            dtype,
119        }
120    }
121
122    pub fn execute(
123        &self,
124        row_range: Option<Range<u64>>,
125    ) -> VortexResult<Vec<BoxFuture<'static, VortexResult<Option<A>>>>> {
126        let selection_range: Option<Range<u64>> = match &self.selection {
127            Selection::IncludeByIndex(buf) if !buf.is_empty() => {
128                Some(buf[0]..buf[buf.len() - 1] + 1)
129            }
130            Selection::IncludeRoaring(roaring) if !roaring.is_empty() => {
131                Some(roaring.min().vortex_expect("empty")..roaring.max().vortex_expect("empty") + 1)
132            }
133            _ => None,
134        };
135        let row_range = intersect_ranges(self.row_range.as_ref(), row_range);
136        let row_range = intersect_ranges(row_range.as_ref(), selection_range);
137
138        let ranges = match &self.splits {
139            Splits::Natural(vec) => {
140                debug_assert!(vec.is_sorted());
141                let splits_iter = match row_range {
142                    None => Either::Left(vec.iter().copied()),
143                    Some(range) => {
144                        if range.is_empty() {
145                            return Ok(Vec::new());
146                        }
147                        let lo = vec.partition_point(|&x| x < range.start);
148                        let hi = vec.partition_point(|&x| x < range.end);
149                        Either::Right(
150                            iter::once(range.start)
151                                .chain(vec[lo..hi].iter().copied())
152                                .chain(iter::once(range.end)),
153                        )
154                    }
155                };
156
157                Either::Left(splits_iter.tuple_windows().map(|(start, end)| start..end))
158            }
159            Splits::Ranges(ranges) => Either::Right(match row_range {
160                None => Either::Left(ranges.iter().cloned()),
161                Some(range) => {
162                    if range.is_empty() {
163                        return Ok(Vec::new());
164                    }
165                    Either::Right(ranges.iter().filter_map(move |r| {
166                        let start = cmp::max(r.start, range.start);
167                        let end = cmp::min(r.end, range.end);
168                        (start < end).then_some(start..end)
169                    }))
170                }
171            }),
172        };
173
174        let mut limit = self.limit;
175        let mut tasks = Vec::new();
176        let ctx = Arc::new(TaskContext {
177            filter: self.filter.clone().map(|f| Arc::new(FilterExpr::new(f))),
178            reader: Arc::clone(&self.layout_reader),
179            projection: self.projection.clone(),
180            mapper: Arc::clone(&self.map_fn),
181        });
182
183        for range in ranges {
184            let row_mask = self.selection.row_mask(&range);
185            if row_mask.mask().all_false() {
186                continue;
187            }
188
189            tasks.push(split_exec(Arc::clone(&ctx), row_mask, limit.as_mut())?);
190            if limit.is_some_and(|l| l == 0) {
191                break;
192            }
193        }
194
195        Ok(tasks)
196    }
197
198    pub fn execute_stream(
199        &self,
200        row_range: Option<Range<u64>>,
201    ) -> VortexResult<impl Stream<Item = VortexResult<A>> + Send + 'static + use<A>> {
202        use futures::StreamExt;
203        let num_workers = get_available_parallelism().unwrap_or(1);
204        let concurrency = self.concurrency * num_workers;
205        let handle = self.session.handle();
206
207        let stream =
208            futures::stream::iter(self.execute(row_range)?).map(move |task| handle.spawn(task));
209
210        let stream = if self.ordered {
211            stream.buffered(concurrency).boxed()
212        } else {
213            stream.buffer_unordered(concurrency).boxed()
214        };
215
216        Ok(stream.filter_map(|chunk| async move { chunk.transpose() }))
217    }
218}
219
/// Intersects two optional row ranges, where `None` means "unbounded".
///
/// Returns `None` only when both inputs are `None`. For disjoint inputs the result is an
/// inverted range (`start > end`), which [`Range::is_empty`] reports as empty.
fn intersect_ranges(left: Option<&Range<u64>>, right: Option<Range<u64>>) -> Option<Range<u64>> {
    let Some(left) = left else {
        return right;
    };
    let intersection = match right {
        Some(right) => left.start.max(right.start)..left.end.min(right.end),
        None => left.clone(),
    };
    Some(intersection)
}