use std::ops::Range;
use std::sync::Arc;

use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;
use futures::executor::ThreadPool;
use futures::future::BoxFuture;
use futures::stream::FuturesOrdered;
use futures::task::SpawnExt;
use futures::{Stream, StreamExt, stream};
use itertools::Itertools;
pub use multi_scan::*;
pub use selection::*;
pub use split_by::*;
use tasks::{TaskContext, split_exec};
use vortex_array::iter::ArrayIterator;
use vortex_array::stats::StatsSet;
use vortex_array::stream::{ArrayStream, ArrayStreamAdapter};
use vortex_array::{ArrayRef, ToCanonical};
use vortex_buffer::Buffer;
use vortex_dtype::{DType, Field, FieldMask, FieldName, FieldPath};
use vortex_error::{VortexResult, vortex_bail, vortex_err};
use vortex_expr::transform::immediate_access::immediate_scope_access;
use vortex_expr::transform::simplify_typed::simplify_typed;
use vortex_expr::{ExprRef, root};
use vortex_layout::layouts::filter::FilterLayoutReader;
use vortex_layout::layouts::row_idx::RowIdxLayoutReader;
use vortex_layout::{LayoutReader, LayoutReaderRef};
pub use vortex_layout::{TaskExecutor, TaskExecutorExt};
use vortex_metrics::VortexMetrics;

use crate::work_queue::{TaskFactory, WorkStealingQueue};
use crate::work_stealing_iter::{ArrayTask, WorkStealingArrayIterator};

mod multi_scan;
#[cfg(feature = "rayon")]
pub mod rayon;
pub mod row_mask;
mod selection;
mod split_by;
mod tasks;
mod work_queue;
mod work_stealing_iter;

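/// Builder for configuring a scan over a Vortex layout and turning it into a
/// stream or iterator of scanned values.
///
/// A minimal usage sketch, assuming `layout_reader` was obtained from an
/// already-opened Vortex file (the names here are illustrative, not prescriptive):
///
/// ```ignore
/// let arrays = ScanBuilder::new(layout_reader)
///     .with_projection(root())
///     .with_concurrency(4)
///     .into_array_iter()?
///     .collect::<VortexResult<Vec<_>>>()?;
/// ```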
pub struct ScanBuilder<A> {
    layout_reader: LayoutReaderRef,
    /// Expression evaluated against the layout to produce the scan's output.
    projection: ExprRef,
    /// Optional predicate used to prune rows before the projection is evaluated.
    filter: Option<ExprRef>,
    /// Optional absolute row range restricting the scan.
    row_range: Option<Range<u64>>,
    /// Row selection (e.g. an explicit set of row indices) applied to the scan.
    selection: Selection,
    /// Strategy for splitting the layout into independently scannable row ranges.
    split_by: SplitBy,
    /// Maximum number of splits processed concurrently.
    concurrency: usize,
    /// Post-processing applied to each scanned array to produce an `A`.
    map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
    /// Optional executor on which per-split tasks are run.
    executor: Option<Arc<dyn TaskExecutor>>,
    metrics: VortexMetrics,
    file_stats: Option<Arc<[StatsSet]>>,
    /// Optional cap on the total number of rows returned.
    limit: Option<usize>,
    /// Row offset passed to the `RowIdxLayoutReader` wrapped around the layout.
    row_offset: u64,
}

impl<A: 'static + Send> ScanBuilder<A> {
    pub fn with_filter(mut self, filter: ExprRef) -> Self {
        self.filter = Some(filter);
        self
    }

    pub fn with_some_filter(mut self, filter: Option<ExprRef>) -> Self {
        self.filter = filter;
        self
    }

    pub fn with_projection(mut self, projection: ExprRef) -> Self {
        self.projection = projection;
        self
    }

    pub fn with_row_range(mut self, row_range: Range<u64>) -> Self {
        self.row_range = Some(row_range);
        self
    }

    pub fn with_selection(mut self, selection: Selection) -> Self {
        self.selection = selection;
        self
    }

    pub fn with_row_indices(mut self, row_indices: Buffer<u64>) -> Self {
        self.selection = Selection::IncludeByIndex(row_indices);
        self
    }

    pub fn with_row_offset(mut self, row_offset: u64) -> Self {
        self.row_offset = row_offset;
        self
    }

    pub fn with_split_by(mut self, split_by: SplitBy) -> Self {
        self.split_by = split_by;
        self
    }

    /// Sets the maximum number of splits processed concurrently.
    ///
    /// Must be greater than zero.
    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
        assert!(concurrency > 0);
        self.concurrency = concurrency;
        self
    }

    /// Uses the given Tokio runtime handle as the task executor for this scan.
    #[cfg(feature = "tokio")]
    pub fn with_tokio_executor(mut self, handle: tokio::runtime::Handle) -> Self {
        self.executor = Some(Arc::new(handle));
        self
    }

    pub fn with_executor(mut self, executor: Arc<dyn TaskExecutor>) -> Self {
        self.executor = Some(executor);
        self
    }

    pub fn with_metrics(mut self, metrics: VortexMetrics) -> Self {
        self.metrics = metrics;
        self
    }

    /// Limits the scan to at most `limit` rows. A limit cannot be combined
    /// with a filter.
    pub fn with_limit(mut self, limit: usize) -> Self {
        self.limit = Some(limit);
        self
    }

    /// Returns the [`DType`] of the scan's output, i.e. the projection
    /// expression applied to the layout's dtype.
    pub fn dtype(&self) -> VortexResult<DType> {
        self.projection.return_dtype(self.layout_reader.dtype())
    }

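    /// Composes an additional mapping step onto the scan, transforming each
    /// scanned value of type `A` into a `B`.
    ///
    /// A hedged sketch of one possible use, counting rows per split instead of
    /// materializing arrays (the variable names are illustrative):
    ///
    /// ```ignore
    /// let row_counts = ScanBuilder::new(layout_reader)
    ///     .map(|array| Ok(array.len()))
    ///     .into_stream()?;
    /// ```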
    pub fn map<B: 'static>(
        self,
        map_fn: impl Fn(A) -> VortexResult<B> + 'static + Send + Sync,
    ) -> ScanBuilder<B> {
        let old_map_fn = self.map_fn;
        ScanBuilder {
            layout_reader: self.layout_reader,
            projection: self.projection,
            filter: self.filter,
            row_range: self.row_range,
            selection: self.selection,
            split_by: self.split_by,
            concurrency: self.concurrency,
            map_fn: Arc::new(move |a| map_fn(old_map_fn(a)?)),
            executor: self.executor,
            metrics: self.metrics,
            file_stats: self.file_stats,
            limit: self.limit,
            row_offset: self.row_offset,
        }
    }

    /// Builds the scan into one boxed future per split. Each future resolves
    /// to `Some(A)` when its split produces output, or `None` when it does not.
    pub fn build(mut self) -> VortexResult<Vec<BoxFuture<'static, VortexResult<Option<A>>>>> {
        if self.filter.is_some() && self.limit.is_some() {
            vortex_bail!("Vortex doesn't support scans with both a filter and a limit")
        }

        if self.limit.is_some_and(|l| l == 0) {
            return Ok(vec![]);
        }

        let mut layout_reader = self.layout_reader;

        // Wrap the layout so it reports row indices, offset by `row_offset`.
        layout_reader = Arc::new(RowIdxLayoutReader::new(self.row_offset, layout_reader));

        // If a filter was provided, wrap the layout in a filter-aware reader.
        if self.filter.is_some() {
            layout_reader = Arc::new(FilterLayoutReader::new(layout_reader));
        }

        // Simplify the projection and filter expressions against the layout's dtype.
        let projection = simplify_typed(self.projection.clone(), layout_reader.dtype())?;
        let filter = self
            .filter
            .clone()
            .map(|f| simplify_typed(f, layout_reader.dtype()))
            .transpose()?;

        // Compute the field masks touched by the filter and projection, then use
        // them to split the layout into row ranges.
        let (filter_mask, projection_mask) =
            filter_and_projection_masks(&projection, filter.as_ref(), layout_reader.dtype())?;
        let field_mask: Vec<_> = [filter_mask, projection_mask].concat();
        let splits = self.split_by.splits(layout_reader.as_ref(), &field_mask)?;

        // Create one task per split, sharing a common task context. Splits are
        // skipped once the remaining limit (tracked via the mutable limit passed
        // to `split_exec`) reaches zero.
        let split_tasks = splits
            .into_iter()
            .filter_map(|split_range| {
                let ctx = Arc::new(TaskContext {
                    row_range: self.row_range.clone(),
                    selection: self.selection.clone(),
                    filter: self.filter.clone(),
                    reader: layout_reader.clone(),
                    projection: projection.clone(),
                    mapper: self.map_fn.clone(),
                    task_executor: self.executor.clone(),
                });

                if self.limit.is_some_and(|l| l == 0) {
                    None
                } else {
                    Some(split_exec(ctx, split_range, self.limit.as_mut()))
                }
            })
            .try_collect()?;

        Ok(split_tasks)
    }

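    /// Builds the scan and returns it as a stream of mapped values, evaluating
    /// up to `concurrency` splits at a time.
    ///
    /// A hedged sketch of driving the stream (assumes an async runtime such as
    /// Tokio is available; `process` is a placeholder for caller code):
    ///
    /// ```ignore
    /// let mut results = builder.into_stream()?;
    /// while let Some(item) = results.next().await {
    ///     process(item?);
    /// }
    /// ```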
    pub fn into_stream(self) -> VortexResult<impl Stream<Item = VortexResult<A>> + 'static> {
        let concurrency = self.concurrency;
        let split_tasks = self.build()?;
        Ok(stream::iter(split_tasks)
            .buffered(concurrency)
            .filter_map(|r| async move { r.transpose() }))
    }

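    /// Builds the scan and runs its split futures on the given thread pool,
    /// returning a blocking iterator over the results.
    ///
    /// A hedged usage sketch (assumes a `futures` `ThreadPool`; `process` is a
    /// placeholder for caller code):
    ///
    /// ```ignore
    /// let pool = futures::executor::ThreadPool::new()?;
    /// for item in builder.into_thread_pool_iter(pool)? {
    ///     process(item?);
    /// }
    /// ```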
    pub fn into_thread_pool_iter(
        self,
        pool: ThreadPool,
    ) -> VortexResult<impl Iterator<Item = VortexResult<A>> + Send + 'static> {
        let futures: FuturesOrdered<_> = self
            .build()?
            .into_iter()
            .map(|task| {
                let fut = pool.spawn_with_handle(task);
                async move {
                    fut.map_err(|e| vortex_err!("Failed to spawn task onto thread pool: {e}"))?
                        .await
                }
            })
            .collect();

        Ok(futures::executor::block_on_stream(futures).filter_map_ok(|v| v))
    }
}

impl ScanBuilder<ArrayRef> {
    /// Creates a scan builder over the given layout with an identity projection
    /// and no filter.
    pub fn new(layout_reader: Arc<dyn LayoutReader>) -> Self {
        Self {
            layout_reader,
            projection: root(),
            filter: None,
            row_range: None,
            selection: Default::default(),
            split_by: SplitBy::Layout,
            concurrency: 2,
            // The default mapping is the identity: scanned arrays are returned as-is.
            map_fn: Arc::new(Ok),
            executor: None,
            metrics: Default::default(),
            file_stats: None,
            limit: None,
            row_offset: 0,
        }
    }

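    /// Maps each scanned array into an Arrow [`RecordBatch`] with the provided
    /// schema.
    ///
    /// A hedged sketch (assumes `schema` is an Arrow `SchemaRef` matching the
    /// projected dtype):
    ///
    /// ```ignore
    /// let mut batches = ScanBuilder::new(layout_reader)
    ///     .map_to_record_batch(schema)
    ///     .into_stream()?;
    /// ```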
    pub fn map_to_record_batch(self, schema: SchemaRef) -> ScanBuilder<RecordBatch> {
        self.map(move |array| {
            let st = array.to_struct()?;
            st.into_record_batch_with_schema(schema.as_ref())
        })
    }

    /// Builds the scan as an [`ArrayStream`], i.e. a stream of arrays that also
    /// carries the scan's output [`DType`].
    pub fn into_array_stream(self) -> VortexResult<impl ArrayStream + 'static> {
        let concurrency = self.concurrency;
        let dtype = self.dtype()?;
        let split_tasks = self.build()?;
        Ok(ArrayStreamAdapter::new(
            dtype,
            stream::iter(split_tasks)
                .buffered(concurrency)
                .filter_map(|r| async move { r.transpose() }),
        ))
    }

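    /// Builds the scan as a synchronous, work-stealing [`ArrayIterator`]: the
    /// split tasks are pushed onto a shared queue and executed with the
    /// builder's configured `concurrency`.
    ///
    /// A hedged usage sketch (synchronous consumption, no async runtime required):
    ///
    /// ```ignore
    /// for array in ScanBuilder::new(layout_reader).into_array_iter()? {
    ///     let array = array?;
    ///     // ... process the array ...
    /// }
    /// ```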
    pub fn into_array_iter(self) -> VortexResult<impl ArrayIterator + Send + Clone + 'static> {
        let dtype = self.dtype()?;
        let concurrency = self.concurrency;
        let tasks = self.build()?;
        // A single task factory hands all split tasks to the shared work-stealing queue.
        let queue = WorkStealingQueue::new([Box::new(move || Ok(tasks)) as TaskFactory<ArrayTask>]);

        Ok(WorkStealingArrayIterator::new(
            queue,
            Arc::new(dtype),
            concurrency,
        ))
    }
}

/// Computes the [`FieldMask`]s accessed by the filter and by the projection.
///
/// Returns `(filter_mask, projection_mask)`, where the projection mask only
/// contains fields that are not already covered by the filter mask.
fn filter_and_projection_masks(
    projection: &ExprRef,
    filter: Option<&ExprRef>,
    dtype: &DType,
) -> VortexResult<(Vec<FieldMask>, Vec<FieldMask>)> {
    // Non-struct dtypes have no named fields to narrow the masks down to.
    let Some(struct_dtype) = dtype.as_struct() else {
        return Ok(match filter {
            Some(_) => (vec![FieldMask::All], vec![FieldMask::All]),
            None => (Vec::new(), vec![FieldMask::All]),
        });
    };
    let projection_mask = immediate_scope_access(projection, struct_dtype);
    Ok(match filter {
        None => (
            Vec::new(),
            projection_mask.into_iter().map(to_field_mask).collect_vec(),
        ),
        Some(f) => {
            let filter_mask = immediate_scope_access(f, struct_dtype);
            let only_projection_mask = projection_mask
                .difference(&filter_mask)
                .cloned()
                .map(to_field_mask)
                .collect_vec();
            (
                filter_mask.into_iter().map(to_field_mask).collect_vec(),
                only_projection_mask,
            )
        }
    })
}

fn to_field_mask(field: FieldName) -> FieldMask {
    FieldMask::Prefix(FieldPath::from(Field::Name(field)))
}