vortex_scan/
repeated_scan.rs1use std::cmp;
5use std::iter;
6use std::ops::Range;
7use std::sync::Arc;
8
9use futures::Stream;
10use futures::future::BoxFuture;
11use itertools::Either;
12use itertools::Itertools;
13use vortex_array::ArrayRef;
14use vortex_array::expr::Expression;
15use vortex_array::iter::ArrayIterator;
16use vortex_array::iter::ArrayIteratorAdapter;
17use vortex_array::stream::ArrayStream;
18use vortex_array::stream::ArrayStreamAdapter;
19use vortex_dtype::DType;
20use vortex_error::VortexResult;
21use vortex_io::runtime::BlockingRuntime;
22use vortex_io::session::RuntimeSessionExt;
23use vortex_layout::LayoutReaderRef;
24use vortex_session::VortexSession;
25
26use crate::filter::FilterExpr;
27use crate::selection::Selection;
28use crate::splits::Splits;
29use crate::tasks::TaskContext;
30use crate::tasks::split_exec;
31
32pub struct RepeatedScan<A: 'static + Send> {
37 session: VortexSession,
38 layout_reader: LayoutReaderRef,
39 projection: Expression,
40 filter: Option<Expression>,
41 ordered: bool,
42 row_range: Option<Range<u64>>,
44 selection: Selection,
46 splits: Splits,
48 concurrency: usize,
50 map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
52 limit: Option<u64>,
54 dtype: DType,
56}
57
58impl RepeatedScan<ArrayRef> {
59 pub fn dtype(&self) -> &DType {
60 &self.dtype
61 }
62
63 pub fn execute_array_iter<B: BlockingRuntime>(
64 &self,
65 row_range: Option<Range<u64>>,
66 runtime: &B,
67 ) -> VortexResult<impl ArrayIterator + 'static> {
68 let dtype = self.dtype.clone();
69 let stream = self.execute_stream(row_range)?;
70 let iter = runtime.block_on_stream(stream);
71 Ok(ArrayIteratorAdapter::new(dtype, iter))
72 }
73
74 pub fn execute_array_stream(
75 &self,
76 row_range: Option<Range<u64>>,
77 ) -> VortexResult<impl ArrayStream + Send + 'static> {
78 let dtype = self.dtype.clone();
79 let stream = self.execute_stream(row_range)?;
80 Ok(ArrayStreamAdapter::new(dtype, stream))
81 }
82}
83
84impl<A: 'static + Send> RepeatedScan<A> {
85 #[expect(
87 clippy::too_many_arguments,
88 reason = "all arguments are needed for scan construction"
89 )]
90 pub(super) fn new(
91 session: VortexSession,
92 layout_reader: LayoutReaderRef,
93 projection: Expression,
94 filter: Option<Expression>,
95 ordered: bool,
96 row_range: Option<Range<u64>>,
97 selection: Selection,
98 splits: Splits,
99 concurrency: usize,
100 map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
101 limit: Option<u64>,
102 dtype: DType,
103 ) -> Self {
104 Self {
105 session,
106 layout_reader,
107 projection,
108 filter,
109 ordered,
110 row_range,
111 selection,
112 splits,
113 concurrency,
114 map_fn,
115 limit,
116 dtype,
117 }
118 }
119
120 pub fn execute(
121 &self,
122 row_range: Option<Range<u64>>,
123 ) -> VortexResult<Vec<BoxFuture<'static, VortexResult<Option<A>>>>> {
124 let ctx = Arc::new(TaskContext {
125 selection: self.selection.clone(),
126 filter: self.filter.clone().map(|f| Arc::new(FilterExpr::new(f))),
127 reader: self.layout_reader.clone(),
128 projection: self.projection.clone(),
129 mapper: self.map_fn.clone(),
130 });
131
132 let row_range = intersect_ranges(self.row_range.as_ref(), row_range);
133
134 let ranges = match &self.splits {
135 Splits::Natural(btree_set) => {
136 let splits_iter = match row_range {
137 None => Either::Left(btree_set.iter().copied()),
138 Some(range) => {
139 if range.is_empty() {
140 return Ok(Vec::new());
141 }
142 Either::Right(
143 iter::once(range.start)
144 .chain(btree_set.range(range.clone()).copied())
145 .chain(iter::once(range.end)),
146 )
147 }
148 };
149
150 Either::Left(splits_iter.tuple_windows().map(|(start, end)| start..end))
151 }
152 Splits::Ranges(ranges) => Either::Right(match row_range {
153 None => Either::Left(ranges.iter().cloned()),
154 Some(range) => {
155 if range.is_empty() {
156 return Ok(Vec::new());
157 }
158 Either::Right(ranges.iter().filter_map(move |r| {
159 let start = cmp::max(r.start, range.start);
160 let end = cmp::min(r.end, range.end);
161 (start < end).then_some(start..end)
162 }))
163 }
164 }),
165 };
166
167 let mut limit = self.limit;
168 let mut tasks = Vec::new();
169
170 for range in ranges {
171 if range.start >= range.end {
172 continue;
173 }
174
175 if limit.is_some_and(|l| l == 0) {
176 break;
177 }
178
179 tasks.push(split_exec(ctx.clone(), range, limit.as_mut())?);
180 }
181
182 Ok(tasks)
183 }
184
185 pub fn execute_stream(
186 &self,
187 row_range: Option<Range<u64>>,
188 ) -> VortexResult<impl Stream<Item = VortexResult<A>> + Send + 'static + use<A>> {
189 use futures::StreamExt;
190 let num_workers = std::thread::available_parallelism()
191 .map(|n| n.get())
192 .unwrap_or(1);
193 let concurrency = self.concurrency * num_workers;
194 let handle = self.session.handle();
195
196 let stream =
197 futures::stream::iter(self.execute(row_range)?).map(move |task| handle.spawn(task));
198
199 let stream = if self.ordered {
200 stream.buffered(concurrency).boxed()
201 } else {
202 stream.buffer_unordered(concurrency).boxed()
203 };
204
205 Ok(stream.filter_map(|chunk| async move { chunk.transpose() }))
206 }
207}
208
/// Intersects the scan's configured row range with a per-call requested row range.
///
/// `None` on either side means "no restriction from that side", so the result is `None`
/// only when both inputs are `None`.
///
/// The result is never inverted. If the ranges do not overlap, or an input is itself
/// inverted, an empty range `start..start` is returned. This keeps `start <= end` for
/// consumers such as `BTreeSet::range` (which panics otherwise) and for length
/// arithmetic (which would otherwise underflow).
fn intersect_ranges(left: Option<&Range<u64>>, right: Option<Range<u64>>) -> Option<Range<u64>> {
    let intersection = match (left, right) {
        (None, None) => return None,
        (None, Some(r)) => r,
        (Some(l), None) => l.clone(),
        (Some(l), Some(r)) => l.start.max(r.start)..l.end.min(r.end),
    };

    // Clamp the end so that `start <= end` always holds.
    let start = intersection.start;
    Some(start..intersection.end.max(start))
}