vortex_layout/scan/
repeated_scan.rs1use std::cmp;
5use std::iter;
6use std::ops::Range;
7use std::sync::Arc;
8
9use futures::Stream;
10use futures::future::BoxFuture;
11use itertools::Either;
12use itertools::Itertools;
13use vortex_array::ArrayRef;
14use vortex_array::dtype::DType;
15use vortex_array::expr::Expression;
16use vortex_array::iter::ArrayIterator;
17use vortex_array::iter::ArrayIteratorAdapter;
18use vortex_array::stream::ArrayStream;
19use vortex_array::stream::ArrayStreamAdapter;
20use vortex_error::VortexExpect;
21use vortex_error::VortexResult;
22use vortex_io::runtime::BlockingRuntime;
23use vortex_io::session::RuntimeSessionExt;
24use vortex_scan::selection::Selection;
25use vortex_session::VortexSession;
26use vortex_utils::parallelism::get_available_parallelism;
27
28use crate::LayoutReaderRef;
29use crate::scan::filter::FilterExpr;
30use crate::scan::splits::Splits;
31use crate::scan::tasks::TaskContext;
32use crate::scan::tasks::split_exec;
33
34pub struct RepeatedScan<A: 'static + Send> {
39 session: VortexSession,
40 layout_reader: LayoutReaderRef,
41 projection: Expression,
42 filter: Option<Expression>,
43 ordered: bool,
44 row_range: Option<Range<u64>>,
46 selection: Selection,
48 splits: Splits,
50 concurrency: usize,
52 map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
54 limit: Option<u64>,
56 dtype: DType,
58}
59
60impl RepeatedScan<ArrayRef> {
61 pub fn dtype(&self) -> &DType {
62 &self.dtype
63 }
64
65 pub fn execute_array_iter<B: BlockingRuntime>(
66 &self,
67 row_range: Option<Range<u64>>,
68 runtime: &B,
69 ) -> VortexResult<impl ArrayIterator + 'static> {
70 let dtype = self.dtype.clone();
71 let stream = self.execute_stream(row_range)?;
72 let iter = runtime.block_on_stream(stream);
73 Ok(ArrayIteratorAdapter::new(dtype, iter))
74 }
75
76 pub fn execute_array_stream(
77 &self,
78 row_range: Option<Range<u64>>,
79 ) -> VortexResult<impl ArrayStream + Send + 'static> {
80 let dtype = self.dtype.clone();
81 let stream = self.execute_stream(row_range)?;
82 Ok(ArrayStreamAdapter::new(dtype, stream))
83 }
84}
85
86impl<A: 'static + Send> RepeatedScan<A> {
87 #[expect(
89 clippy::too_many_arguments,
90 reason = "all arguments are needed for scan construction"
91 )]
92 pub fn new(
93 session: VortexSession,
94 layout_reader: LayoutReaderRef,
95 projection: Expression,
96 filter: Option<Expression>,
97 ordered: bool,
98 row_range: Option<Range<u64>>,
99 selection: Selection,
100 splits: Splits,
101 concurrency: usize,
102 map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
103 limit: Option<u64>,
104 dtype: DType,
105 ) -> Self {
106 Self {
107 session,
108 layout_reader,
109 projection,
110 filter,
111 ordered,
112 row_range,
113 selection,
114 splits,
115 concurrency,
116 map_fn,
117 limit,
118 dtype,
119 }
120 }
121
122 pub fn execute(
123 &self,
124 row_range: Option<Range<u64>>,
125 ) -> VortexResult<Vec<BoxFuture<'static, VortexResult<Option<A>>>>> {
126 let selection_range: Option<Range<u64>> = match &self.selection {
127 Selection::IncludeByIndex(buf) if !buf.is_empty() => {
128 Some(buf[0]..buf[buf.len() - 1] + 1)
129 }
130 Selection::IncludeRoaring(roaring) if !roaring.is_empty() => {
131 Some(roaring.min().vortex_expect("empty")..roaring.max().vortex_expect("empty") + 1)
132 }
133 _ => None,
134 };
135 let row_range = intersect_ranges(self.row_range.as_ref(), row_range);
136 let row_range = intersect_ranges(row_range.as_ref(), selection_range);
137
138 let ranges = match &self.splits {
139 Splits::Natural(vec) => {
140 debug_assert!(vec.is_sorted());
141 let splits_iter = match row_range {
142 None => Either::Left(vec.iter().copied()),
143 Some(range) => {
144 if range.is_empty() {
145 return Ok(Vec::new());
146 }
147 let lo = vec.partition_point(|&x| x < range.start);
148 let hi = vec.partition_point(|&x| x < range.end);
149 Either::Right(
150 iter::once(range.start)
151 .chain(vec[lo..hi].iter().copied())
152 .chain(iter::once(range.end)),
153 )
154 }
155 };
156
157 Either::Left(splits_iter.tuple_windows().map(|(start, end)| start..end))
158 }
159 Splits::Ranges(ranges) => Either::Right(match row_range {
160 None => Either::Left(ranges.iter().cloned()),
161 Some(range) => {
162 if range.is_empty() {
163 return Ok(Vec::new());
164 }
165 Either::Right(ranges.iter().filter_map(move |r| {
166 let start = cmp::max(r.start, range.start);
167 let end = cmp::min(r.end, range.end);
168 (start < end).then_some(start..end)
169 }))
170 }
171 }),
172 };
173
174 let mut limit = self.limit;
175 let mut tasks = Vec::new();
176 let ctx = Arc::new(TaskContext {
177 filter: self.filter.clone().map(|f| Arc::new(FilterExpr::new(f))),
178 reader: Arc::clone(&self.layout_reader),
179 projection: self.projection.clone(),
180 mapper: Arc::clone(&self.map_fn),
181 });
182
183 for range in ranges {
184 let row_mask = self.selection.row_mask(&range);
185 if row_mask.mask().all_false() {
186 continue;
187 }
188
189 tasks.push(split_exec(Arc::clone(&ctx), row_mask, limit.as_mut())?);
190 if limit.is_some_and(|l| l == 0) {
191 break;
192 }
193 }
194
195 Ok(tasks)
196 }
197
198 pub fn execute_stream(
199 &self,
200 row_range: Option<Range<u64>>,
201 ) -> VortexResult<impl Stream<Item = VortexResult<A>> + Send + 'static + use<A>> {
202 use futures::StreamExt;
203 let num_workers = get_available_parallelism().unwrap_or(1);
204 let concurrency = self.concurrency * num_workers;
205 let handle = self.session.handle();
206
207 let stream =
208 futures::stream::iter(self.execute(row_range)?).map(move |task| handle.spawn(task));
209
210 let stream = if self.ordered {
211 stream.buffered(concurrency).boxed()
212 } else {
213 stream.buffer_unordered(concurrency).boxed()
214 };
215
216 Ok(stream.filter_map(|chunk| async move { chunk.transpose() }))
217 }
218}
219
/// Intersects an optional borrowed range with an optional owned range.
///
/// `None` on either side means "unbounded", so the other side is returned unchanged. When
/// both ranges are present but disjoint, the result is an empty range (possibly with
/// `start > end`). Callers are expected to check `Range::is_empty` before using it.
fn intersect_ranges(left: Option<&Range<u64>>, right: Option<Range<u64>>) -> Option<Range<u64>> {
    let Some(left) = left else {
        // Nothing fixed on the left: the right-hand range (if any) is the whole answer.
        return right;
    };
    let intersection = match right {
        None => left.clone(),
        Some(right) => left.start.max(right.start)..left.end.min(right.end),
    };
    Some(intersection)
}