1use std::iter;
2use std::ops::{Deref, Range};
3use std::sync::Arc;
4
5use arrow_array::RecordBatch;
6use arrow_schema::SchemaRef;
7pub use executor::*;
8use futures::executor::LocalPool;
9use futures::future::ok;
10use futures::task::LocalSpawnExt;
11use futures::{FutureExt, Stream, StreamExt, stream};
12use itertools::Itertools;
13pub use selection::*;
14pub use split_by::*;
15use vortex_array::iter::{ArrayIterator, ArrayIteratorAdapter};
16use vortex_array::stats::StatsSet;
17use vortex_array::stream::{ArrayStream, ArrayStreamAdapter};
18use vortex_array::{ArrayRef, ToCanonical};
19use vortex_buffer::Buffer;
20use vortex_dtype::{DType, Field, FieldMask, FieldName, FieldPath};
21use vortex_error::{VortexError, VortexExpect, VortexResult, vortex_err};
22use vortex_expr::transform::immediate_access::immediate_scope_access;
23use vortex_expr::transform::simplify_typed::simplify_typed;
24use vortex_expr::{ExprRef, Identity};
25use vortex_metrics::VortexMetrics;
26
27use crate::LayoutReader;
28use crate::layouts::filter::FilterLayoutReader;
29mod executor;
30pub mod row_mask;
31mod selection;
32mod split_by;
33
34pub struct ScanBuilder<A> {
36 layout_reader: Arc<dyn LayoutReader>,
37 projection: ExprRef,
38 filter: Option<ExprRef>,
39 row_range: Option<Range<u64>>,
41 selection: Selection,
43 split_by: SplitBy,
45 concurrency: usize,
47 map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
49 executor: Option<Arc<dyn TaskExecutor>>,
51 metrics: VortexMetrics,
52 file_stats: Option<Arc<[StatsSet]>>,
54}
55
56impl<A: 'static + Send> ScanBuilder<A> {
57 pub fn with_filter(mut self, filter: ExprRef) -> Self {
58 self.filter = Some(filter);
59 self
60 }
61
62 pub fn with_some_filter(mut self, filter: Option<ExprRef>) -> Self {
63 self.filter = filter;
64 self
65 }
66
67 pub fn with_projection(mut self, projection: ExprRef) -> Self {
68 self.projection = projection;
69 self
70 }
71
72 pub fn with_row_range(mut self, row_range: Range<u64>) -> Self {
73 self.row_range = Some(row_range);
74 self
75 }
76
77 pub fn with_some_row_range(mut self, row_range: Option<Range<u64>>) -> Self {
78 self.row_range = row_range;
79 self
80 }
81
82 pub fn with_selection(mut self, selection: Selection) -> Self {
83 self.selection = selection;
84 self
85 }
86
87 pub fn with_row_indices(mut self, row_indices: Buffer<u64>) -> Self {
88 self.selection = Selection::IncludeByIndex(row_indices);
89 self
90 }
91
92 pub fn with_split_by(mut self, split_by: SplitBy) -> Self {
93 self.split_by = split_by;
94 self
95 }
96
97 pub fn with_concurrency(mut self, concurrency: usize) -> Self {
99 assert!(concurrency > 0);
100 self.concurrency = concurrency;
101 self
102 }
103
104 #[cfg(feature = "tokio")]
109 pub fn with_tokio_executor(mut self, handle: tokio::runtime::Handle) -> Self {
110 self.executor = Some(Arc::new(handle));
111 self
112 }
113
114 pub fn with_executor(mut self, executor: Arc<dyn TaskExecutor>) -> Self {
115 self.executor = Some(executor);
116 self
117 }
118
119 pub fn with_metrics(mut self, metrics: VortexMetrics) -> Self {
120 self.metrics = metrics;
121 self
122 }
123
124 pub fn with_prune_file_on_open(mut self, stats_set: Arc<[StatsSet]>) -> Self {
125 self.file_stats = Some(stats_set);
126 self
127 }
128
129 pub fn map<B: 'static>(
131 self,
132 map_fn: impl Fn(A) -> VortexResult<B> + 'static + Send + Sync,
133 ) -> ScanBuilder<B> {
134 let old_map_fn = self.map_fn;
135 ScanBuilder {
136 layout_reader: self.layout_reader,
137 projection: self.projection,
138 filter: self.filter,
139 row_range: self.row_range,
140 selection: self.selection,
141 split_by: self.split_by,
142 concurrency: self.concurrency,
143 map_fn: Arc::new(move |a| map_fn(old_map_fn(a)?)),
144 executor: self.executor,
145 metrics: self.metrics,
146 file_stats: self.file_stats,
147 }
148 }
149
150 pub fn dtype(&self) -> VortexResult<DType> {
152 self.projection.return_dtype(self.layout_reader.dtype())
153 }
154
155 #[allow(clippy::unused_enumerate_index)]
157 pub fn build(self) -> VortexResult<Vec<impl Future<Output = VortexResult<Option<A>>>>> {
158 let mut layout_reader = self.layout_reader;
161 if self.filter.is_some() {
162 layout_reader = Arc::new(FilterLayoutReader::new(layout_reader));
163 }
164
165 let projection = simplify_typed(self.projection.clone(), layout_reader.dtype())?;
167 let filter = self
168 .filter
169 .clone()
170 .map(|f| simplify_typed(f, layout_reader.dtype()))
171 .transpose()?;
172
173 let (filter_mask, projection_mask) =
175 filter_and_projection_masks(&projection, filter.as_ref(), layout_reader.dtype())?;
176 let field_mask: Vec<_> = filter_mask
177 .iter()
178 .cloned()
179 .chain(projection_mask.iter().cloned())
180 .collect();
181 let splits = self.split_by.splits(layout_reader.deref(), &field_mask)?;
182
183 let row_masks = splits
184 .into_iter()
185 .filter_map(|row_range| {
186 if let Some(scan_range) = &self.row_range {
187 if row_range.start >= scan_range.end || row_range.end < scan_range.start {
189 return None;
190 }
191 return Some(
193 row_range.start.max(scan_range.start)..row_range.end.min(scan_range.end),
194 );
195 } else {
196 Some(row_range)
197 }
198 })
199 .map(|row_range| self.selection.row_mask(&row_range))
200 .filter(|mask| !mask.mask().all_false())
201 .map(|row_mask| {
202 let row_range = row_mask.row_range();
203 (row_range, ok(row_mask.mask().clone()).boxed())
204 })
205 .collect_vec();
206
207 let row_masks = if let Some(filter) = &filter {
214 let row_masks: Vec<_> = row_masks
216 .into_iter()
217 .map(|(row_range, mask_fut)| {
218 let eval = layout_reader.pruning_evaluation(&row_range, filter)?;
219 let mask_fut = async move {
220 let mask = mask_fut.await?;
221 if mask.all_false() {
222 Ok(mask)
223 } else {
224 eval.invoke(mask).await
225 }
226 }
227 .boxed();
228 Ok::<_, VortexError>((row_range, mask_fut))
229 })
230 .try_collect()?;
231
232 row_masks
234 .into_iter()
235 .map(|(row_range, mask_fut)| {
236 let eval = layout_reader.filter_evaluation(&row_range, filter)?;
237 let mask_fut = async move {
238 let mask = mask_fut.await?;
239 if mask.all_false() {
240 Ok(mask)
241 } else {
242 eval.invoke(mask).await
243 }
244 }
245 .boxed();
246 Ok::<_, VortexError>((row_range, mask_fut))
247 })
248 .try_collect()?
249 } else {
250 row_masks
251 };
252
253 row_masks
255 .into_iter()
256 .map(|(row_range, mask_fut)| {
257 let map_fn = self.map_fn.clone();
258 let eval = layout_reader.projection_evaluation(&row_range, &projection)?;
259 let array_fut = async move {
260 let mask = mask_fut.await?;
261 if mask.all_false() {
262 Ok(None)
263 } else {
264 map_fn(eval.invoke(mask).await?).map(Some)
265 }
266 }
267 .boxed();
268
269 Ok(match &self.executor {
270 None => array_fut,
271 Some(executor) => executor.spawn(array_fut),
272 })
273 })
274 .try_collect()
275 }
276
277 pub fn into_stream(self) -> VortexResult<impl Stream<Item = VortexResult<A>> + 'static> {
279 let concurrency = self.concurrency;
280 Ok(stream::iter(self.build()?)
281 .buffered(concurrency)
282 .filter_map(|r| async move { r.transpose() }))
283 }
284}
285
286impl ScanBuilder<ArrayRef> {
287 pub fn new(layout_reader: Arc<dyn LayoutReader>) -> Self {
288 Self {
289 layout_reader,
290 projection: Identity::new_expr(),
291 filter: None,
292 row_range: None,
293 selection: Default::default(),
294 split_by: SplitBy::Layout,
295 concurrency: 16,
298 map_fn: Arc::new(Ok),
299 executor: None,
300 metrics: Default::default(),
301 file_stats: None,
302 }
303 }
304
305 pub fn map_to_record_batch(self, schema: SchemaRef) -> ScanBuilder<RecordBatch> {
307 self.map(move |array| {
308 let st = array.to_struct()?;
309 st.into_record_batch_with_schema(schema.as_ref())
310 })
311 }
312
313 pub fn into_array_stream(self) -> VortexResult<impl ArrayStream + 'static> {
316 let dtype = self.dtype()?;
317 let stream = self.into_stream()?;
318 Ok(ArrayStreamAdapter::new(dtype, stream))
319 }
320
321 pub fn into_array_iter(self) -> VortexResult<impl ArrayIterator + 'static> {
326 let dtype = self.dtype()?;
327 let concurrency = self.concurrency;
328
329 let mut local_pool = LocalPool::new();
330 let spawner = local_pool.spawner();
331
332 let mut stream = stream::iter(self.build()?)
333 .map(move |task| {
334 spawner
335 .spawn_local_with_handle(task)
336 .map_err(|e| vortex_err!("Failed to spawn task: {e}"))
337 .vortex_expect("Failed to spawn task")
338 })
339 .buffered(concurrency)
340 .filter_map(|a| async move { a.transpose() })
341 .boxed_local();
342
343 Ok(ArrayIteratorAdapter::new(
344 dtype,
345 iter::from_fn(move || local_pool.run_until(stream.next())),
346 ))
347 }
348}
349
350fn filter_and_projection_masks(
354 projection: &ExprRef,
355 filter: Option<&ExprRef>,
356 dtype: &DType,
357) -> VortexResult<(Vec<FieldMask>, Vec<FieldMask>)> {
358 let Some(struct_dtype) = dtype.as_struct() else {
359 return Ok(match filter {
360 Some(_) => (vec![FieldMask::All], vec![FieldMask::All]),
361 None => (Vec::new(), vec![FieldMask::All]),
362 });
363 };
364 let projection_mask = immediate_scope_access(projection, struct_dtype)?;
365 Ok(match filter {
366 None => (
367 Vec::new(),
368 projection_mask.into_iter().map(to_field_mask).collect_vec(),
369 ),
370 Some(f) => {
371 let filter_mask = immediate_scope_access(f, struct_dtype)?;
372 let only_projection_mask = projection_mask
373 .difference(&filter_mask)
374 .cloned()
375 .map(to_field_mask)
376 .collect_vec();
377 (
378 filter_mask.into_iter().map(to_field_mask).collect_vec(),
379 only_projection_mask,
380 )
381 }
382 })
383}
384
385fn to_field_mask(field: FieldName) -> FieldMask {
386 FieldMask::Prefix(FieldPath::from(Field::Name(field)))
387}