1use std::ops::Range;
5use std::sync::Arc;
6
7use futures::future::BoxFuture;
8use itertools::Itertools;
9pub use multi_scan::*;
10pub use selection::*;
11pub use split_by::*;
12use tasks::{TaskContext, split_exec};
13use vortex_array::ArrayRef;
14use vortex_array::iter::ArrayIterator;
15use vortex_array::stats::StatsSet;
16use vortex_buffer::Buffer;
17use vortex_dtype::{DType, Field, FieldMask, FieldName, FieldPath};
18use vortex_error::{VortexResult, vortex_bail};
19use vortex_expr::transform::immediate_access::immediate_scope_access;
20use vortex_expr::transform::simplify_typed;
21use vortex_expr::{ExprRef, root};
22use vortex_layout::layouts::row_idx::RowIdxLayoutReader;
23use vortex_layout::{LayoutReader, LayoutReaderRef};
24pub use vortex_layout::{TaskExecutor, TaskExecutorExt};
25use vortex_metrics::VortexMetrics;
26
27use crate::filter::FilterExpr;
28use crate::work_queue::{TaskFactory, WorkStealingQueue};
29use crate::work_stealing_iter::{ArrayTask, WorkStealingArrayIterator};
30
31mod arrow;
32mod filter;
33mod multi_scan;
34#[cfg(feature = "tokio")]
35mod multi_thread;
36pub mod row_mask;
37mod selection;
38mod split_by;
39mod tasks;
40mod work_queue;
41mod work_stealing_iter;
42
43pub struct ScanBuilder<A> {
45 layout_reader: LayoutReaderRef,
46 projection: ExprRef,
47 filter: Option<ExprRef>,
48 row_range: Option<Range<u64>>,
50 selection: Selection,
53 split_by: SplitBy,
55 concurrency: usize,
57 map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
59 metrics: VortexMetrics,
60 file_stats: Option<Arc<[StatsSet]>>,
62 limit: Option<usize>,
64 row_offset: u64,
67}
68
69impl<A: 'static + Send> ScanBuilder<A> {
70 pub fn with_filter(mut self, filter: ExprRef) -> Self {
71 self.filter = Some(filter);
72 self
73 }
74
75 pub fn with_some_filter(mut self, filter: Option<ExprRef>) -> Self {
76 self.filter = filter;
77 self
78 }
79
80 pub fn with_projection(mut self, projection: ExprRef) -> Self {
81 self.projection = projection;
82 self
83 }
84
85 pub fn with_row_range(mut self, row_range: Range<u64>) -> Self {
86 self.row_range = Some(row_range);
87 self
88 }
89
90 pub fn with_selection(mut self, selection: Selection) -> Self {
91 self.selection = selection;
92 self
93 }
94
95 pub fn with_row_indices(mut self, row_indices: Buffer<u64>) -> Self {
96 self.selection = Selection::IncludeByIndex(row_indices);
97 self
98 }
99
100 pub fn with_row_offset(mut self, row_offset: u64) -> Self {
101 self.row_offset = row_offset;
102 self
103 }
104
105 pub fn with_split_by(mut self, split_by: SplitBy) -> Self {
106 self.split_by = split_by;
107 self
108 }
109
110 pub fn with_concurrency(mut self, concurrency: usize) -> Self {
113 assert!(concurrency > 0);
114 self.concurrency = concurrency;
115 self
116 }
117
118 pub fn with_metrics(mut self, metrics: VortexMetrics) -> Self {
119 self.metrics = metrics;
120 self
121 }
122
123 pub fn with_limit(mut self, limit: usize) -> Self {
124 self.limit = Some(limit);
125 self
126 }
127
128 pub fn dtype(&self) -> VortexResult<DType> {
130 self.projection.return_dtype(self.layout_reader.dtype())
131 }
132
133 pub fn map<B: 'static>(
135 self,
136 map_fn: impl Fn(A) -> VortexResult<B> + 'static + Send + Sync,
137 ) -> ScanBuilder<B> {
138 let old_map_fn = self.map_fn;
139 ScanBuilder {
140 layout_reader: self.layout_reader,
141 projection: self.projection,
142 filter: self.filter,
143 row_range: self.row_range,
144 selection: self.selection,
145 split_by: self.split_by,
146 concurrency: self.concurrency,
147 map_fn: Arc::new(move |a| map_fn(old_map_fn(a)?)),
148 metrics: self.metrics,
149 file_stats: self.file_stats,
150 limit: self.limit,
151 row_offset: self.row_offset,
152 }
153 }
154
155 pub fn build(mut self) -> VortexResult<Vec<BoxFuture<'static, VortexResult<Option<A>>>>> {
157 if self.filter.is_some() && self.limit.is_some() {
158 vortex_bail!("Vortex doesn't support scans with both a filter and a limit")
159 }
160
161 if self.limit.is_some_and(|l| l == 0) {
163 return Ok(vec![]);
164 }
165
166 let mut layout_reader = self.layout_reader;
169
170 layout_reader = Arc::new(RowIdxLayoutReader::new(self.row_offset, layout_reader));
174
175 let projection = simplify_typed(self.projection.clone(), layout_reader.dtype())?;
177 let filter = self
178 .filter
179 .clone()
180 .map(|f| simplify_typed(f, layout_reader.dtype()))
181 .transpose()?;
182
183 let (filter_mask, projection_mask) =
185 filter_and_projection_masks(&projection, filter.as_ref(), layout_reader.dtype())?;
186 let field_mask: Vec<_> = [filter_mask, projection_mask].concat();
187 let splits = self.split_by.splits(layout_reader.as_ref(), &field_mask)?;
188
189 let ctx = Arc::new(TaskContext {
190 row_range: self.row_range,
191 selection: self.selection,
192 filter: filter.map(|f| Arc::new(FilterExpr::new(f))),
193 reader: layout_reader,
194 projection,
195 mapper: self.map_fn,
196 });
197
198 let split_tasks = splits
200 .into_iter()
201 .filter_map(|split_range| {
202 if self.limit.is_some_and(|l| l == 0) {
203 None
204 } else {
205 Some(split_exec(ctx.clone(), split_range, self.limit.as_mut()))
206 }
207 })
208 .try_collect()?;
209
210 Ok(split_tasks)
211 }
212
213 #[cfg(feature = "tokio")]
221 pub fn into_tokio_stream(
222 self,
223 ) -> VortexResult<impl futures::Stream<Item = VortexResult<A>> + Send + 'static> {
224 use futures::StreamExt;
225 use vortex_error::vortex_err;
226
227 let handle = tokio::runtime::Handle::current();
228 let num_workers = handle.metrics().num_workers();
229 let concurrency = self.concurrency * num_workers;
230 Ok(futures::stream::iter(self.build()?)
231 .map(move |task| handle.spawn(task))
232 .buffered(concurrency)
233 .map(|task| {
234 task.map_err(|e| vortex_err!("Failed to join task: {e}"))
235 .flatten()
236 })
237 .filter_map(|chunk| async move { chunk.transpose() }))
238 }
239}
240
241impl ScanBuilder<ArrayRef> {
242 pub fn new(layout_reader: Arc<dyn LayoutReader>) -> Self {
243 Self {
244 layout_reader,
245 projection: root(),
246 filter: None,
247 row_range: None,
248 selection: Default::default(),
249 split_by: SplitBy::Layout,
250 concurrency: 4,
253 map_fn: Arc::new(Ok),
254 metrics: Default::default(),
255 file_stats: None,
256 limit: None,
257 row_offset: 0,
258 }
259 }
260
261 pub fn into_array_iter(self) -> VortexResult<impl ArrayIterator + Send + Clone + 'static> {
267 let dtype = self.dtype()?;
268 let concurrency = self.concurrency;
269 let tasks = self.build()?;
270 let queue = WorkStealingQueue::new([Box::new(move || Ok(tasks)) as TaskFactory<ArrayTask>]);
271
272 Ok(WorkStealingArrayIterator::new(
273 queue,
274 Arc::new(dtype),
275 concurrency,
276 ))
277 }
278
279 #[cfg(feature = "tokio")]
285 pub fn into_tokio_array_stream(
286 self,
287 ) -> VortexResult<impl vortex_array::stream::ArrayStream + Send + 'static> {
288 let dtype = self.dtype()?;
289 let stream = self.into_tokio_stream()?;
290 Ok(vortex_array::stream::ArrayStreamAdapter::new(dtype, stream))
291 }
292}
293
294fn filter_and_projection_masks(
298 projection: &ExprRef,
299 filter: Option<&ExprRef>,
300 dtype: &DType,
301) -> VortexResult<(Vec<FieldMask>, Vec<FieldMask>)> {
302 let Some(struct_dtype) = dtype.as_struct() else {
303 return Ok(match filter {
304 Some(_) => (vec![FieldMask::All], vec![FieldMask::All]),
305 None => (Vec::new(), vec![FieldMask::All]),
306 });
307 };
308 let projection_mask = immediate_scope_access(projection, struct_dtype);
309 Ok(match filter {
310 None => (
311 Vec::new(),
312 projection_mask.into_iter().map(to_field_mask).collect_vec(),
313 ),
314 Some(f) => {
315 let filter_mask = immediate_scope_access(f, struct_dtype);
316 let only_projection_mask = projection_mask
317 .difference(&filter_mask)
318 .cloned()
319 .map(to_field_mask)
320 .collect_vec();
321 (
322 filter_mask.into_iter().map(to_field_mask).collect_vec(),
323 only_projection_mask,
324 )
325 }
326 })
327}
328
329fn to_field_mask(field: FieldName) -> FieldMask {
330 FieldMask::Prefix(FieldPath::from(Field::Name(field)))
331}
332
333#[cfg(test)]
334mod tests;