1use std::cmp;
5use std::ops::Range;
6use std::sync::Arc;
7
8use futures::future::BoxFuture;
9use itertools::Itertools;
10pub use multi_scan::*;
11pub use selection::*;
12pub use split_by::*;
13use tasks::{TaskContext, split_exec};
14use vortex_array::ArrayRef;
15use vortex_array::iter::ArrayIterator;
16use vortex_array::stats::StatsSet;
17use vortex_buffer::Buffer;
18use vortex_dtype::{DType, Field, FieldMask, FieldName, FieldPath};
19use vortex_error::{VortexResult, vortex_bail};
20use vortex_expr::transform::immediate_access::immediate_scope_access;
21use vortex_expr::transform::simplify_typed;
22use vortex_expr::{ExprRef, root};
23use vortex_layout::layouts::row_idx::RowIdxLayoutReader;
24use vortex_layout::{LayoutReader, LayoutReaderRef};
25pub use vortex_layout::{TaskExecutor, TaskExecutorExt};
26use vortex_metrics::VortexMetrics;
27
28use crate::filter::FilterExpr;
29use crate::work_queue::{TaskFactory, WorkStealingQueue};
30use crate::work_stealing_iter::{ArrayTask, WorkStealingArrayIterator};
31
32mod arrow;
33mod filter;
34mod multi_scan;
35#[cfg(feature = "tokio")]
36mod multi_thread;
37pub mod row_mask;
38mod selection;
39mod split_by;
40mod tasks;
41mod work_queue;
42mod work_stealing_iter;
43
/// Builder for a scan over a [`LayoutReader`].
///
/// Configure the scan with the `with_*` methods and optionally transform each output array with
/// [`ScanBuilder::map`]. Then either [`prepare`](ScanBuilder::prepare) it into a reusable
/// [`RepeatedScan`] or execute it once (e.g. via [`build`](ScanBuilder::build)).
///
/// `A` is the per-array output type produced by `map_fn`. [`ScanBuilder::new`] starts with
/// `A = ArrayRef` and an identity mapping.
pub struct ScanBuilder<A> {
    /// Reader for the data being scanned.
    layout_reader: LayoutReaderRef,
    /// Projection expression; `ScanBuilder::new` defaults it to `root()`.
    projection: ExprRef,
    /// Optional filter expression. Cannot be combined with `limit` (`prepare` bails).
    filter: Option<ExprRef>,
    /// Optional row range restricting the scan. A prepared `RepeatedScan` intersects it with
    /// any range passed at execution time.
    row_range: Option<Range<u64>>,
    /// Row selection for the scan; `with_row_indices` sets `Selection::IncludeByIndex`.
    selection: Selection,
    /// Strategy `prepare` uses to compute split ranges; `ScanBuilder::new` uses `SplitBy::Layout`.
    split_by: SplitBy,
    /// Concurrency factor for execution. Must be non-zero (`with_concurrency` asserts this);
    /// `ScanBuilder::new` defaults it to 4.
    concurrency: usize,
    /// Function applied to every array produced by the scan.
    map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
    /// Metrics handle. NOTE(review): `prepare` does not forward this to the `RepeatedScan` —
    /// confirm whether it is consumed elsewhere.
    metrics: VortexMetrics,
    /// Optional per-file statistics. NOTE(review): no setter is visible here and `prepare`
    /// does not consume it — confirm intended use.
    file_stats: Option<Arc<[StatsSet]>>,
    /// Optional output limit, handed mutably to `split_exec` for each split (presumably a row
    /// count — confirm against `split_exec`). A limit of zero produces no tasks.
    limit: Option<usize>,
    /// Row offset passed to `RowIdxLayoutReader::new` when the scan is prepared.
    row_offset: u64,
}
69
70impl<A: 'static + Send> ScanBuilder<A> {
71 pub fn with_filter(mut self, filter: ExprRef) -> Self {
72 self.filter = Some(filter);
73 self
74 }
75
76 pub fn with_some_filter(mut self, filter: Option<ExprRef>) -> Self {
77 self.filter = filter;
78 self
79 }
80
81 pub fn with_projection(mut self, projection: ExprRef) -> Self {
82 self.projection = projection;
83 self
84 }
85
86 pub fn with_row_range(mut self, row_range: Range<u64>) -> Self {
87 self.row_range = Some(row_range);
88 self
89 }
90
91 pub fn with_selection(mut self, selection: Selection) -> Self {
92 self.selection = selection;
93 self
94 }
95
96 pub fn with_row_indices(mut self, row_indices: Buffer<u64>) -> Self {
97 self.selection = Selection::IncludeByIndex(row_indices);
98 self
99 }
100
101 pub fn with_row_offset(mut self, row_offset: u64) -> Self {
102 self.row_offset = row_offset;
103 self
104 }
105
106 pub fn with_split_by(mut self, split_by: SplitBy) -> Self {
107 self.split_by = split_by;
108 self
109 }
110
111 pub fn with_concurrency(mut self, concurrency: usize) -> Self {
114 assert!(concurrency > 0);
115 self.concurrency = concurrency;
116 self
117 }
118
119 pub fn with_metrics(mut self, metrics: VortexMetrics) -> Self {
120 self.metrics = metrics;
121 self
122 }
123
124 pub fn with_limit(mut self, limit: usize) -> Self {
125 self.limit = Some(limit);
126 self
127 }
128
129 pub fn dtype(&self) -> VortexResult<DType> {
131 self.projection.return_dtype(self.layout_reader.dtype())
132 }
133
134 pub fn map<B: 'static>(
136 self,
137 map_fn: impl Fn(A) -> VortexResult<B> + 'static + Send + Sync,
138 ) -> ScanBuilder<B> {
139 let old_map_fn = self.map_fn;
140 ScanBuilder {
141 layout_reader: self.layout_reader,
142 projection: self.projection,
143 filter: self.filter,
144 row_range: self.row_range,
145 selection: self.selection,
146 split_by: self.split_by,
147 concurrency: self.concurrency,
148 map_fn: Arc::new(move |a| map_fn(old_map_fn(a)?)),
149 metrics: self.metrics,
150 file_stats: self.file_stats,
151 limit: self.limit,
152 row_offset: self.row_offset,
153 }
154 }
155
156 pub fn prepare(self) -> VortexResult<RepeatedScan<A>> {
157 if self.filter.is_some() && self.limit.is_some() {
158 vortex_bail!("Vortex doesn't support scans with both a filter and a limit")
159 }
160
161 let dtype = self.dtype()?;
162
163 let mut layout_reader = self.layout_reader;
166
167 layout_reader = Arc::new(RowIdxLayoutReader::new(self.row_offset, layout_reader));
171
172 let projection = simplify_typed(self.projection, layout_reader.dtype())?;
174 let filter = self
175 .filter
176 .map(|f| simplify_typed(f, layout_reader.dtype()))
177 .transpose()?;
178
179 let (filter_mask, projection_mask) =
181 filter_and_projection_masks(&projection, filter.as_ref(), layout_reader.dtype())?;
182 let field_mask: Vec<_> = [filter_mask, projection_mask].concat();
183 let splits = self.split_by.splits(layout_reader.as_ref(), &field_mask)?;
184 Ok(RepeatedScan {
185 layout_reader,
186 projection,
187 filter,
188 row_range: self.row_range,
189 selection: self.selection,
190 splits,
191 concurrency: self.concurrency,
192 map_fn: self.map_fn,
193 limit: self.limit,
194 dtype,
195 })
196 }
197
198 pub fn build(self) -> VortexResult<Vec<BoxFuture<'static, VortexResult<Option<A>>>>> {
200 if self.limit.is_some_and(|l| l == 0) {
202 return Ok(vec![]);
203 }
204
205 let row_range = self.row_range.clone();
206 self.prepare()?.execute(row_range)
207 }
208
209 #[cfg(feature = "tokio")]
217 pub fn into_tokio_stream(
218 self,
219 ) -> VortexResult<impl futures::Stream<Item = VortexResult<A>> + Send + 'static + use<A>> {
220 let row_range = self.row_range.clone();
221 self.prepare()?.execute_tokio_stream(row_range)
222 }
223}
224
225impl ScanBuilder<ArrayRef> {
226 pub fn new(layout_reader: Arc<dyn LayoutReader>) -> Self {
227 Self {
228 layout_reader,
229 projection: root(),
230 filter: None,
231 row_range: None,
232 selection: Default::default(),
233 split_by: SplitBy::Layout,
234 concurrency: 4,
237 map_fn: Arc::new(Ok),
238 metrics: Default::default(),
239 file_stats: None,
240 limit: None,
241 row_offset: 0,
242 }
243 }
244
245 pub fn into_array_iter(self) -> VortexResult<impl ArrayIterator + Send + Clone + 'static> {
251 let dtype = self.dtype()?;
252 let concurrency = self.concurrency;
253 let tasks = self.build()?;
254 let queue = WorkStealingQueue::new([Box::new(move || Ok(tasks)) as TaskFactory<ArrayTask>]);
255
256 Ok(WorkStealingArrayIterator::new(
257 queue,
258 Arc::new(dtype),
259 concurrency,
260 ))
261 }
262
263 #[cfg(feature = "tokio")]
269 pub fn into_tokio_array_stream(
270 self,
271 ) -> VortexResult<impl vortex_array::stream::ArrayStream + Send + 'static> {
272 let dtype = self.dtype()?;
273 let stream = self.into_tokio_stream()?;
274 Ok(vortex_array::stream::ArrayStreamAdapter::new(dtype, stream))
275 }
276}
277
278fn filter_and_projection_masks(
282 projection: &ExprRef,
283 filter: Option<&ExprRef>,
284 dtype: &DType,
285) -> VortexResult<(Vec<FieldMask>, Vec<FieldMask>)> {
286 let Some(struct_dtype) = dtype.as_struct_fields_opt() else {
287 return Ok(match filter {
288 Some(_) => (vec![FieldMask::All], vec![FieldMask::All]),
289 None => (Vec::new(), vec![FieldMask::All]),
290 });
291 };
292 let projection_mask = immediate_scope_access(projection, struct_dtype);
293 Ok(match filter {
294 None => (
295 Vec::new(),
296 projection_mask.into_iter().map(to_field_mask).collect_vec(),
297 ),
298 Some(f) => {
299 let filter_mask = immediate_scope_access(f, struct_dtype);
300 let only_projection_mask = projection_mask
301 .difference(&filter_mask)
302 .cloned()
303 .map(to_field_mask)
304 .collect_vec();
305 (
306 filter_mask.into_iter().map(to_field_mask).collect_vec(),
307 only_projection_mask,
308 )
309 }
310 })
311}
312
313fn to_field_mask(field: FieldName) -> FieldMask {
314 FieldMask::Prefix(FieldPath::from(Field::Name(field)))
315}
316
/// A prepared scan, produced by [`ScanBuilder::prepare`], that can be executed multiple times.
///
/// Holds the simplified projection/filter expressions and the precomputed split ranges, so each
/// execution only needs to build the per-split tasks.
pub struct RepeatedScan<A: 'static + Send> {
    /// Reader for the scan; `ScanBuilder::prepare` wraps it in a `RowIdxLayoutReader`.
    layout_reader: LayoutReaderRef,
    /// Projection expression, simplified against the reader's dtype.
    projection: ExprRef,
    /// Optional filter expression, simplified against the reader's dtype.
    filter: Option<ExprRef>,
    /// Base row range, intersected with the range passed to each `execute*` call.
    row_range: Option<Range<u64>>,
    /// Row selection, cloned into each execution's task context.
    selection: Selection,
    /// Split ranges computed by the builder's `SplitBy`; at most one task is created per split.
    splits: Vec<Range<u64>>,
    /// Concurrency factor. Used directly by `execute_array_iter` and multiplied by the number of
    /// tokio workers in `execute_tokio_stream`.
    concurrency: usize,
    /// Function applied to every array produced by the scan.
    map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
    /// Optional output limit; once it reaches zero no further splits are scheduled.
    limit: Option<usize>,
    /// Output dtype of the projection, computed by `ScanBuilder::dtype` before the reader was
    /// wrapped.
    dtype: DType,
}
342
343impl<A: 'static + Send> RepeatedScan<A> {
344 pub fn execute(
345 &self,
346 row_range: Option<Range<u64>>,
347 ) -> VortexResult<Vec<BoxFuture<'static, VortexResult<Option<A>>>>> {
348 let row_range = intersect_ranges(self.row_range.as_ref(), row_range);
349
350 let ctx = Arc::new(TaskContext {
351 row_range,
352 selection: self.selection.clone(),
353 filter: self.filter.clone().map(|f| Arc::new(FilterExpr::new(f))),
354 reader: self.layout_reader.clone(),
355 projection: self.projection.clone(),
356 mapper: self.map_fn.clone(),
357 });
358
359 let mut limit = self.limit;
361 let split_tasks = self
362 .splits
363 .iter()
364 .filter_map(|split_range| {
365 if limit.is_some_and(|l| l == 0) {
366 None
367 } else {
368 Some(split_exec(ctx.clone(), split_range.clone(), limit.as_mut()))
369 }
370 })
371 .try_collect()?;
372
373 Ok(split_tasks)
374 }
375
376 #[cfg(feature = "tokio")]
377 pub fn execute_tokio_stream(
378 &self,
379 row_range: Option<Range<u64>>,
380 ) -> VortexResult<impl futures::Stream<Item = VortexResult<A>> + Send + 'static + use<A>> {
381 let row_range = intersect_ranges(self.row_range.as_ref(), row_range);
382
383 use futures::StreamExt;
384 use vortex_error::vortex_err;
385
386 let handle = tokio::runtime::Handle::current();
387 let num_workers = handle.metrics().num_workers();
388 let concurrency = self.concurrency * num_workers;
389 Ok(futures::stream::iter(self.execute(row_range)?)
390 .map(move |task| handle.spawn(task))
391 .buffered(concurrency)
392 .map(|task| {
393 task.map_err(|e| vortex_err!("Failed to join task: {e}"))
394 .flatten()
395 })
396 .filter_map(|chunk| async move { chunk.transpose() }))
397 }
398}
399
400impl RepeatedScan<ArrayRef> {
401 pub fn execute_array_iter(
402 &self,
403 row_range: Option<Range<u64>>,
404 ) -> VortexResult<impl ArrayIterator + Send + Clone + 'static> {
405 let row_range = intersect_ranges(self.row_range.as_ref(), row_range);
406
407 let dtype = self.dtype.clone();
408 let tasks = self.execute(row_range)?;
409 let queue = WorkStealingQueue::new([Box::new(move || Ok(tasks)) as TaskFactory<ArrayTask>]);
410
411 Ok(WorkStealingArrayIterator::new(
412 queue,
413 Arc::new(dtype),
414 self.concurrency,
415 ))
416 }
417
418 #[cfg(feature = "tokio")]
419 pub fn execute_tokio_array_stream(
420 &self,
421 row_range: Option<Range<u64>>,
422 ) -> VortexResult<impl vortex_array::stream::ArrayStream + Send + 'static> {
423 let row_range = intersect_ranges(self.row_range.as_ref(), row_range);
424
425 let dtype = self.dtype.clone();
426 let stream = self.execute_tokio_stream(row_range)?;
427 Ok(vortex_array::stream::ArrayStreamAdapter::new(dtype, stream))
428 }
429}
430
/// Intersects an optional base range with an optional requested range.
///
/// `None` acts as "unbounded". If only one range is present it is returned unchanged, and if
/// neither is present the result is `None`.
///
/// When both are present the result is their intersection. Disjoint inputs yield an empty range
/// anchored at the larger start (`start..start`) rather than an inverted range such as `10..5`.
/// An inverted range would make arithmetic like `end - start` underflow.
fn intersect_ranges(left: Option<&Range<u64>>, right: Option<Range<u64>>) -> Option<Range<u64>> {
    match (left, right) {
        (None, right) => right,
        (Some(left), None) => Some(left.clone()),
        (Some(left), Some(right)) => {
            let start = cmp::max(left.start, right.start);
            // Clamp so disjoint inputs produce an empty range instead of an inverted one.
            let end = cmp::min(left.end, right.end).max(start);
            Some(start..end)
        }
    }
}
439
440#[cfg(test)]
441mod tests;