vortex_scan/
repeated_scan.rs1use std::ops::Range;
5use std::sync::Arc;
6use std::{cmp, iter};
7
8use futures::Stream;
9use futures::future::BoxFuture;
10use itertools::{Either, Itertools};
11use vortex_array::ArrayRef;
12use vortex_array::iter::{ArrayIterator, ArrayIteratorAdapter};
13use vortex_array::stream::{ArrayStream, ArrayStreamAdapter};
14use vortex_dtype::DType;
15use vortex_error::VortexResult;
16use vortex_expr::ExprRef;
17use vortex_io::runtime::{BlockingRuntime, Handle};
18use vortex_layout::LayoutReaderRef;
19
20use crate::filter::FilterExpr;
21use crate::selection::Selection;
22use crate::splits::Splits;
23use crate::tasks::{TaskContext, split_exec};
24
25pub struct RepeatedScan<A: 'static + Send> {
30 handle: Handle,
31 layout_reader: LayoutReaderRef,
32 projection: ExprRef,
33 filter: Option<ExprRef>,
34 ordered: bool,
35 row_range: Option<Range<u64>>,
37 selection: Selection,
39 splits: Splits,
41 concurrency: usize,
43 map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
45 limit: Option<usize>,
47 dtype: DType,
49}
50
51impl RepeatedScan<ArrayRef> {
52 pub fn execute_array_iter<B: BlockingRuntime>(
53 &self,
54 row_range: Option<Range<u64>>,
55 runtime: &B,
56 ) -> VortexResult<impl ArrayIterator + 'static> {
57 let dtype = self.dtype.clone();
58 let stream = self.execute_stream(row_range)?;
59 let iter = runtime.block_on_stream(move |_h| stream);
60 Ok(ArrayIteratorAdapter::new(dtype, iter))
61 }
62
63 pub fn execute_array_stream(
64 &self,
65 row_range: Option<Range<u64>>,
66 ) -> VortexResult<impl ArrayStream + Send + 'static> {
67 let dtype = self.dtype.clone();
68 let stream = self.execute_stream(row_range)?;
69 Ok(ArrayStreamAdapter::new(dtype, stream))
70 }
71}
72
73impl<A: 'static + Send> RepeatedScan<A> {
74 #[allow(clippy::too_many_arguments)]
76 pub(super) fn new(
77 handle: Handle,
78 layout_reader: LayoutReaderRef,
79 projection: ExprRef,
80 filter: Option<ExprRef>,
81 ordered: bool,
82 row_range: Option<Range<u64>>,
83 selection: Selection,
84 splits: Splits,
85 concurrency: usize,
86 map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
87 limit: Option<usize>,
88 dtype: DType,
89 ) -> Self {
90 Self {
91 handle,
92 layout_reader,
93 projection,
94 filter,
95 ordered,
96 row_range,
97 selection,
98 splits,
99 concurrency,
100 map_fn,
101 limit,
102 dtype,
103 }
104 }
105
106 pub fn execute(
107 &self,
108 row_range: Option<Range<u64>>,
109 ) -> VortexResult<Vec<BoxFuture<'static, VortexResult<Option<A>>>>> {
110 let ctx = Arc::new(TaskContext {
111 selection: self.selection.clone(),
112 filter: self.filter.clone().map(|f| Arc::new(FilterExpr::new(f))),
113 reader: self.layout_reader.clone(),
114 projection: self.projection.clone(),
115 mapper: self.map_fn.clone(),
116 });
117
118 let row_range = intersect_ranges(self.row_range.as_ref(), row_range);
119
120 let ranges = match &self.splits {
121 Splits::Natural(btree_set) => {
122 let splits_iter = match row_range {
123 None => Either::Left(btree_set.iter().copied()),
124 Some(range) => {
125 if range.is_empty() {
126 return Ok(Vec::new());
127 }
128 Either::Right(
129 iter::once(range.start)
130 .chain(btree_set.range(range.clone()).copied())
131 .chain(iter::once(range.end)),
132 )
133 }
134 };
135
136 Either::Left(splits_iter.tuple_windows().map(|(start, end)| start..end))
137 }
138 Splits::Ranges(ranges) => Either::Right(match row_range {
139 None => Either::Left(ranges.iter().cloned()),
140 Some(range) => {
141 if range.is_empty() {
142 return Ok(Vec::new());
143 }
144 Either::Right(ranges.iter().filter_map(move |r| {
145 let start = cmp::max(r.start, range.start);
146 let end = cmp::min(r.end, range.end);
147 (start < end).then_some(start..end)
148 }))
149 }
150 }),
151 };
152
153 let mut limit = self.limit;
154 ranges
155 .filter_map(|range| {
156 if range.start >= range.end || limit.is_some_and(|l| l == 0) {
157 None
158 } else {
159 Some(split_exec(ctx.clone(), range, limit.as_mut()))
160 }
161 })
162 .try_collect()
163 }
164
165 pub fn execute_stream(
166 &self,
167 row_range: Option<Range<u64>>,
168 ) -> VortexResult<impl Stream<Item = VortexResult<A>> + Send + 'static + use<A>> {
169 use futures::StreamExt;
170 let num_workers = std::thread::available_parallelism()
171 .map(|n| n.get())
172 .unwrap_or(1);
173 let concurrency = self.concurrency * num_workers;
174 let handle = self.handle.clone();
175
176 let stream =
177 futures::stream::iter(self.execute(row_range)?).map(move |task| handle.spawn(task));
178
179 let stream = if self.ordered {
180 stream.buffered(concurrency).boxed()
181 } else {
182 stream.buffer_unordered(concurrency).boxed()
183 };
184
185 Ok(stream.filter_map(|chunk| async move { chunk.transpose() }))
186 }
187}
188
/// Intersects two optional row ranges, where `None` means "unbounded".
///
/// Returns `None` only when both inputs are `None`. The result is never inverted: when the
/// ranges do not overlap (or an input is itself inverted), an empty range `start..start` is
/// returned instead. This keeps the result safe to pass to APIs such as `BTreeSet::range`,
/// which panic when `start > end`.
fn intersect_ranges(left: Option<&Range<u64>>, right: Option<Range<u64>>) -> Option<Range<u64>> {
    let (start, end) = match (left, right) {
        (None, None) => return None,
        (None, Some(r)) => (r.start, r.end),
        (Some(l), None) => (l.start, l.end),
        (Some(l), Some(r)) => (cmp::max(l.start, r.start), cmp::min(l.end, r.end)),
    };
    // Clamp the end so an empty intersection becomes `start..start` rather than `start..end`
    // with `end < start`.
    Some(start..cmp::max(start, end))
}