vortex_scan/
repeated_scan.rs1use std::cmp;
5use std::iter;
6use std::ops::Range;
7use std::sync::Arc;
8
9use futures::Stream;
10use futures::future::BoxFuture;
11use itertools::Either;
12use itertools::Itertools;
13use vortex_array::ArrayRef;
14use vortex_array::expr::Expression;
15use vortex_array::iter::ArrayIterator;
16use vortex_array::iter::ArrayIteratorAdapter;
17use vortex_array::stream::ArrayStream;
18use vortex_array::stream::ArrayStreamAdapter;
19use vortex_dtype::DType;
20use vortex_error::VortexResult;
21use vortex_io::runtime::BlockingRuntime;
22use vortex_io::session::RuntimeSessionExt;
23use vortex_layout::LayoutReaderRef;
24use vortex_session::VortexSession;
25
26use crate::filter::FilterExpr;
27use crate::selection::Selection;
28use crate::splits::Splits;
29use crate::tasks::TaskContext;
30use crate::tasks::split_exec;
31
32pub struct RepeatedScan<A: 'static + Send> {
37 session: VortexSession,
38 layout_reader: LayoutReaderRef,
39 projection: Expression,
40 filter: Option<Expression>,
41 ordered: bool,
42 row_range: Option<Range<u64>>,
44 selection: Selection,
46 splits: Splits,
48 concurrency: usize,
50 map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
52 limit: Option<usize>,
54 dtype: DType,
56}
57
58impl RepeatedScan<ArrayRef> {
59 pub fn execute_array_iter<B: BlockingRuntime>(
60 &self,
61 row_range: Option<Range<u64>>,
62 runtime: &B,
63 ) -> VortexResult<impl ArrayIterator + 'static> {
64 let dtype = self.dtype.clone();
65 let stream = self.execute_stream(row_range)?;
66 let iter = runtime.block_on_stream(stream);
67 Ok(ArrayIteratorAdapter::new(dtype, iter))
68 }
69
70 pub fn execute_array_stream(
71 &self,
72 row_range: Option<Range<u64>>,
73 ) -> VortexResult<impl ArrayStream + Send + 'static> {
74 let dtype = self.dtype.clone();
75 let stream = self.execute_stream(row_range)?;
76 Ok(ArrayStreamAdapter::new(dtype, stream))
77 }
78}
79
80impl<A: 'static + Send> RepeatedScan<A> {
81 #[expect(
83 clippy::too_many_arguments,
84 reason = "all arguments are needed for scan construction"
85 )]
86 pub(super) fn new(
87 session: VortexSession,
88 layout_reader: LayoutReaderRef,
89 projection: Expression,
90 filter: Option<Expression>,
91 ordered: bool,
92 row_range: Option<Range<u64>>,
93 selection: Selection,
94 splits: Splits,
95 concurrency: usize,
96 map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
97 limit: Option<usize>,
98 dtype: DType,
99 ) -> Self {
100 Self {
101 session,
102 layout_reader,
103 projection,
104 filter,
105 ordered,
106 row_range,
107 selection,
108 splits,
109 concurrency,
110 map_fn,
111 limit,
112 dtype,
113 }
114 }
115
116 pub fn execute(
117 &self,
118 row_range: Option<Range<u64>>,
119 ) -> VortexResult<Vec<BoxFuture<'static, VortexResult<Option<A>>>>> {
120 let ctx = Arc::new(TaskContext {
121 selection: self.selection.clone(),
122 filter: self.filter.clone().map(|f| Arc::new(FilterExpr::new(f))),
123 reader: self.layout_reader.clone(),
124 projection: self.projection.clone(),
125 mapper: self.map_fn.clone(),
126 });
127
128 let row_range = intersect_ranges(self.row_range.as_ref(), row_range);
129
130 let ranges = match &self.splits {
131 Splits::Natural(btree_set) => {
132 let splits_iter = match row_range {
133 None => Either::Left(btree_set.iter().copied()),
134 Some(range) => {
135 if range.is_empty() {
136 return Ok(Vec::new());
137 }
138 Either::Right(
139 iter::once(range.start)
140 .chain(btree_set.range(range.clone()).copied())
141 .chain(iter::once(range.end)),
142 )
143 }
144 };
145
146 Either::Left(splits_iter.tuple_windows().map(|(start, end)| start..end))
147 }
148 Splits::Ranges(ranges) => Either::Right(match row_range {
149 None => Either::Left(ranges.iter().cloned()),
150 Some(range) => {
151 if range.is_empty() {
152 return Ok(Vec::new());
153 }
154 Either::Right(ranges.iter().filter_map(move |r| {
155 let start = cmp::max(r.start, range.start);
156 let end = cmp::min(r.end, range.end);
157 (start < end).then_some(start..end)
158 }))
159 }
160 }),
161 };
162
163 let mut limit = self.limit;
164 ranges
165 .filter_map(|range| {
166 if range.start >= range.end || limit.is_some_and(|l| l == 0) {
167 None
168 } else {
169 Some(split_exec(ctx.clone(), range, limit.as_mut()))
170 }
171 })
172 .try_collect()
173 }
174
175 pub fn execute_stream(
176 &self,
177 row_range: Option<Range<u64>>,
178 ) -> VortexResult<impl Stream<Item = VortexResult<A>> + Send + 'static + use<A>> {
179 use futures::StreamExt;
180 let num_workers = std::thread::available_parallelism()
181 .map(|n| n.get())
182 .unwrap_or(1);
183 let concurrency = self.concurrency * num_workers;
184 let handle = self.session.handle();
185
186 let stream =
187 futures::stream::iter(self.execute(row_range)?).map(move |task| handle.spawn(task));
188
189 let stream = if self.ordered {
190 stream.buffered(concurrency).boxed()
191 } else {
192 stream.buffer_unordered(concurrency).boxed()
193 };
194
195 Ok(stream.filter_map(|chunk| async move { chunk.transpose() }))
196 }
197}
198
/// Intersects two optional row ranges.
///
/// A missing side imposes no constraint, so the other side is returned as-is
/// (`None` when both are missing). When both are present the result runs from
/// the larger start to the smaller end; for disjoint inputs that range is
/// inverted, which `Range::is_empty` reports as empty.
fn intersect_ranges(left: Option<&Range<u64>>, right: Option<Range<u64>>) -> Option<Range<u64>> {
    let Some(l) = left else {
        return right;
    };
    let Some(r) = right else {
        return Some(l.clone());
    };

    let lower = cmp::max(l.start, r.start);
    let upper = cmp::min(l.end, r.end);
    Some(lower..upper)
}