vortex_layout/scan/
repeated_scan.rs1use std::cmp;
5use std::iter;
6use std::ops::Range;
7use std::sync::Arc;
8
9use futures::Stream;
10use futures::future::BoxFuture;
11use itertools::Either;
12use itertools::Itertools;
13use vortex_array::ArrayRef;
14use vortex_array::dtype::DType;
15use vortex_array::expr::Expression;
16use vortex_array::iter::ArrayIterator;
17use vortex_array::iter::ArrayIteratorAdapter;
18use vortex_array::stream::ArrayStream;
19use vortex_array::stream::ArrayStreamAdapter;
20use vortex_error::VortexResult;
21use vortex_io::runtime::BlockingRuntime;
22use vortex_io::session::RuntimeSessionExt;
23use vortex_scan::selection::Selection;
24use vortex_session::VortexSession;
25use vortex_utils::parallelism::get_available_parallelism;
26
27use crate::LayoutReaderRef;
28use crate::scan::filter::FilterExpr;
29use crate::scan::splits::Splits;
30use crate::scan::tasks::TaskContext;
31use crate::scan::tasks::split_exec;
32
33pub struct RepeatedScan<A: 'static + Send> {
38 session: VortexSession,
39 layout_reader: LayoutReaderRef,
40 projection: Expression,
41 filter: Option<Expression>,
42 ordered: bool,
43 row_range: Option<Range<u64>>,
45 selection: Selection,
47 splits: Splits,
49 concurrency: usize,
51 map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
53 limit: Option<u64>,
55 dtype: DType,
57}
58
59impl RepeatedScan<ArrayRef> {
60 pub fn dtype(&self) -> &DType {
61 &self.dtype
62 }
63
64 pub fn execute_array_iter<B: BlockingRuntime>(
65 &self,
66 row_range: Option<Range<u64>>,
67 runtime: &B,
68 ) -> VortexResult<impl ArrayIterator + 'static> {
69 let dtype = self.dtype.clone();
70 let stream = self.execute_stream(row_range)?;
71 let iter = runtime.block_on_stream(stream);
72 Ok(ArrayIteratorAdapter::new(dtype, iter))
73 }
74
75 pub fn execute_array_stream(
76 &self,
77 row_range: Option<Range<u64>>,
78 ) -> VortexResult<impl ArrayStream + Send + 'static> {
79 let dtype = self.dtype.clone();
80 let stream = self.execute_stream(row_range)?;
81 Ok(ArrayStreamAdapter::new(dtype, stream))
82 }
83}
84
85impl<A: 'static + Send> RepeatedScan<A> {
86 #[expect(
88 clippy::too_many_arguments,
89 reason = "all arguments are needed for scan construction"
90 )]
91 pub fn new(
92 session: VortexSession,
93 layout_reader: LayoutReaderRef,
94 projection: Expression,
95 filter: Option<Expression>,
96 ordered: bool,
97 row_range: Option<Range<u64>>,
98 selection: Selection,
99 splits: Splits,
100 concurrency: usize,
101 map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
102 limit: Option<u64>,
103 dtype: DType,
104 ) -> Self {
105 Self {
106 session,
107 layout_reader,
108 projection,
109 filter,
110 ordered,
111 row_range,
112 selection,
113 splits,
114 concurrency,
115 map_fn,
116 limit,
117 dtype,
118 }
119 }
120
121 pub fn execute(
122 &self,
123 row_range: Option<Range<u64>>,
124 ) -> VortexResult<Vec<BoxFuture<'static, VortexResult<Option<A>>>>> {
125 let ctx = Arc::new(TaskContext {
126 selection: self.selection.clone(),
127 filter: self.filter.clone().map(|f| Arc::new(FilterExpr::new(f))),
128 reader: Arc::clone(&self.layout_reader),
129 projection: self.projection.clone(),
130 mapper: Arc::clone(&self.map_fn),
131 });
132
133 let row_range = intersect_ranges(self.row_range.as_ref(), row_range);
134
135 let ranges = match &self.splits {
136 Splits::Natural(btree_set) => {
137 let splits_iter = match row_range {
138 None => Either::Left(btree_set.iter().copied()),
139 Some(range) => {
140 if range.is_empty() {
141 return Ok(Vec::new());
142 }
143 Either::Right(
144 iter::once(range.start)
145 .chain(btree_set.range(range.clone()).copied())
146 .chain(iter::once(range.end)),
147 )
148 }
149 };
150
151 Either::Left(splits_iter.tuple_windows().map(|(start, end)| start..end))
152 }
153 Splits::Ranges(ranges) => Either::Right(match row_range {
154 None => Either::Left(ranges.iter().cloned()),
155 Some(range) => {
156 if range.is_empty() {
157 return Ok(Vec::new());
158 }
159 Either::Right(ranges.iter().filter_map(move |r| {
160 let start = cmp::max(r.start, range.start);
161 let end = cmp::min(r.end, range.end);
162 (start < end).then_some(start..end)
163 }))
164 }
165 }),
166 };
167
168 let mut limit = self.limit;
169 let mut tasks = Vec::new();
170
171 for range in ranges {
172 if range.start >= range.end {
173 continue;
174 }
175
176 if limit.is_some_and(|l| l == 0) {
177 break;
178 }
179
180 tasks.push(split_exec(Arc::clone(&ctx), range, limit.as_mut())?);
181 }
182
183 Ok(tasks)
184 }
185
186 pub fn execute_stream(
187 &self,
188 row_range: Option<Range<u64>>,
189 ) -> VortexResult<impl Stream<Item = VortexResult<A>> + Send + 'static + use<A>> {
190 use futures::StreamExt;
191 let num_workers = get_available_parallelism().unwrap_or(1);
192 let concurrency = self.concurrency * num_workers;
193 let handle = self.session.handle();
194
195 let stream =
196 futures::stream::iter(self.execute(row_range)?).map(move |task| handle.spawn(task));
197
198 let stream = if self.ordered {
199 stream.buffered(concurrency).boxed()
200 } else {
201 stream.buffer_unordered(concurrency).boxed()
202 };
203
204 Ok(stream.filter_map(|chunk| async move { chunk.transpose() }))
205 }
206}
207
/// Intersects two optional row ranges, where `None` means "unbounded".
///
/// Returns `None` only when both inputs are `None`. The returned range is never inverted:
/// when the inputs do not overlap (or an input is itself inverted), the result is an empty
/// range anchored at the computed start. This makes it safe to pass to APIs such as
/// `BTreeSet::range`, which panic when `start > end`.
fn intersect_ranges(left: Option<&Range<u64>>, right: Option<Range<u64>>) -> Option<Range<u64>> {
    let (start, end) = match (left, right) {
        (None, None) => return None,
        (None, Some(r)) => (r.start, r.end),
        (Some(l), None) => (l.start, l.end),
        (Some(l), Some(r)) => (cmp::max(l.start, r.start), cmp::min(l.end, r.end)),
    };
    // Clamp `end` so disjoint inputs yield an empty range instead of an inverted one.
    Some(start..cmp::max(start, end))
}