1use std::iter;
2use std::ops::{Deref, Range};
3use std::sync::Arc;
4
5use arrow_array::RecordBatch;
6use arrow_schema::SchemaRef;
7pub use executor::*;
8use futures::executor::LocalPool;
9use futures::future::ok;
10use futures::task::LocalSpawnExt;
11use futures::{FutureExt, Stream, StreamExt, stream};
12use itertools::Itertools;
13pub use selection::*;
14pub use split_by::*;
15use vortex_array::iter::{ArrayIterator, ArrayIteratorAdapter};
16use vortex_array::stream::{ArrayStream, ArrayStreamAdapter};
17use vortex_array::{ArrayRef, ToCanonical};
18use vortex_buffer::Buffer;
19use vortex_dtype::{DType, Field, FieldMask, FieldName, FieldPath};
20use vortex_error::{VortexError, VortexExpect, VortexResult, vortex_err};
21use vortex_expr::transform::immediate_access::immediate_scope_access;
22use vortex_expr::transform::simplify_typed::simplify_typed;
23use vortex_expr::{ExprRef, Identity};
24use vortex_metrics::VortexMetrics;
25
26use crate::LayoutReader;
27use crate::layouts::filter::FilterLayoutReader;
28mod executor;
29pub mod row_mask;
30mod selection;
31mod split_by;
32
33pub struct ScanBuilder<A> {
35 layout_reader: Arc<dyn LayoutReader>,
36 projection: ExprRef,
37 filter: Option<ExprRef>,
38 row_range: Option<Range<u64>>,
40 selection: Selection,
42 split_by: SplitBy,
44 concurrency: usize,
46 map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
48 executor: Option<Arc<dyn TaskExecutor>>,
50 metrics: VortexMetrics,
51}
52
53impl<A: 'static + Send> ScanBuilder<A> {
54 pub fn with_filter(mut self, filter: ExprRef) -> Self {
55 self.filter = Some(filter);
56 self
57 }
58
59 pub fn with_some_filter(mut self, filter: Option<ExprRef>) -> Self {
60 self.filter = filter;
61 self
62 }
63
64 pub fn with_projection(mut self, projection: ExprRef) -> Self {
65 self.projection = projection;
66 self
67 }
68
69 pub fn with_row_range(mut self, row_range: Range<u64>) -> Self {
70 self.row_range = Some(row_range);
71 self
72 }
73
74 pub fn with_some_row_range(mut self, row_range: Option<Range<u64>>) -> Self {
75 self.row_range = row_range;
76 self
77 }
78
79 pub fn with_selection(mut self, selection: Selection) -> Self {
80 self.selection = selection;
81 self
82 }
83
84 pub fn with_row_indices(mut self, row_indices: Buffer<u64>) -> Self {
85 self.selection = Selection::IncludeByIndex(row_indices);
86 self
87 }
88
89 pub fn with_split_by(mut self, split_by: SplitBy) -> Self {
90 self.split_by = split_by;
91 self
92 }
93
94 pub fn with_concurrency(mut self, concurrency: usize) -> Self {
96 assert!(concurrency > 0);
97 self.concurrency = concurrency;
98 self
99 }
100
101 #[cfg(feature = "tokio")]
106 pub fn with_tokio_executor(mut self, handle: tokio::runtime::Handle) -> Self {
107 self.executor = Some(Arc::new(handle));
108 self
109 }
110
111 pub fn with_metrics(mut self, metrics: VortexMetrics) -> Self {
112 self.metrics = metrics;
113 self
114 }
115
116 pub fn map<B: 'static>(
118 self,
119 map_fn: impl Fn(A) -> VortexResult<B> + 'static + Send + Sync,
120 ) -> ScanBuilder<B> {
121 let old_map_fn = self.map_fn;
122 ScanBuilder {
123 layout_reader: self.layout_reader,
124 projection: self.projection,
125 filter: self.filter,
126 row_range: self.row_range,
127 selection: self.selection,
128 split_by: self.split_by,
129 concurrency: self.concurrency,
130 map_fn: Arc::new(move |a| map_fn(old_map_fn(a)?)),
131 executor: self.executor,
132 metrics: self.metrics,
133 }
134 }
135
136 pub fn dtype(&self) -> VortexResult<DType> {
138 self.projection.return_dtype(self.layout_reader.dtype())
139 }
140
141 #[allow(clippy::unused_enumerate_index)]
143 pub fn build(self) -> VortexResult<Vec<impl Future<Output = VortexResult<Option<A>>>>> {
144 let mut layout_reader = self.layout_reader;
147 if self.filter.is_some() {
148 layout_reader = Arc::new(FilterLayoutReader::new(layout_reader));
149 }
150
151 let projection = simplify_typed(self.projection.clone(), layout_reader.dtype())?;
153 let filter = self
154 .filter
155 .clone()
156 .map(|f| simplify_typed(f, layout_reader.dtype()))
157 .transpose()?;
158
159 let (filter_mask, projection_mask) =
161 filter_and_projection_masks(&projection, filter.as_ref(), layout_reader.dtype())?;
162 let field_mask: Vec<_> = filter_mask
163 .iter()
164 .cloned()
165 .chain(projection_mask.iter().cloned())
166 .collect();
167 let splits = self.split_by.splits(layout_reader.deref(), &field_mask)?;
168
169 let row_masks = splits
170 .into_iter()
171 .filter_map(|row_range| {
172 if let Some(scan_range) = &self.row_range {
173 if row_range.start >= scan_range.end || row_range.end < scan_range.start {
175 return None;
176 }
177 return Some(
179 row_range.start.max(scan_range.start)..row_range.end.min(scan_range.end),
180 );
181 } else {
182 Some(row_range)
183 }
184 })
185 .map(|row_range| self.selection.row_mask(&row_range))
186 .filter(|mask| !mask.mask().all_false())
187 .map(|row_mask| {
188 let row_range = row_mask.row_range();
189 (row_range, ok(row_mask.mask().clone()).boxed())
190 })
191 .collect_vec();
192
193 let row_masks = if let Some(filter) = &filter {
200 let row_masks: Vec<_> = row_masks
202 .into_iter()
203 .map(|(row_range, mask_fut)| {
204 let eval = layout_reader.pruning_evaluation(&row_range, filter)?;
205 let mask_fut = async move {
206 let mask = mask_fut.await?;
207 if mask.all_false() {
208 Ok(mask)
209 } else {
210 eval.invoke(mask).await
211 }
212 }
213 .boxed();
214 Ok::<_, VortexError>((row_range, mask_fut))
215 })
216 .try_collect()?;
217
218 row_masks
220 .into_iter()
221 .map(|(row_range, mask_fut)| {
222 let eval = layout_reader.filter_evaluation(&row_range, filter)?;
223 let mask_fut = async move {
224 let mask = mask_fut.await?;
225 if mask.all_false() {
226 Ok(mask)
227 } else {
228 eval.invoke(mask).await
229 }
230 }
231 .boxed();
232 Ok::<_, VortexError>((row_range, mask_fut))
233 })
234 .try_collect()?
235 } else {
236 row_masks
237 };
238
239 row_masks
241 .into_iter()
242 .map(|(row_range, mask_fut)| {
243 let map_fn = self.map_fn.clone();
244 let eval = layout_reader.projection_evaluation(&row_range, &projection)?;
245 let array_fut = async move {
246 let mask = mask_fut.await?;
247 if mask.all_false() {
248 Ok(None)
249 } else {
250 map_fn(eval.invoke(mask).await?).map(Some)
251 }
252 }
253 .boxed();
254
255 Ok(match &self.executor {
256 None => array_fut,
257 Some(executor) => executor.spawn(array_fut),
258 })
259 })
260 .try_collect()
261 }
262
263 pub fn into_stream(self) -> VortexResult<impl Stream<Item = VortexResult<A>> + 'static> {
265 let concurrency = self.concurrency;
266 Ok(stream::iter(self.build()?)
267 .buffered(concurrency)
268 .filter_map(|r| async move { r.transpose() }))
269 }
270}
271
272impl ScanBuilder<ArrayRef> {
273 pub fn new(layout_reader: Arc<dyn LayoutReader>) -> Self {
274 Self {
275 layout_reader,
276 projection: Identity::new_expr(),
277 filter: None,
278 row_range: None,
279 selection: Default::default(),
280 split_by: SplitBy::Layout,
281 concurrency: 16,
284 map_fn: Arc::new(Ok),
285 executor: None,
286 metrics: Default::default(),
287 }
288 }
289
290 pub fn map_to_record_batch(self, schema: SchemaRef) -> ScanBuilder<RecordBatch> {
292 self.map(move |array| {
293 let st = array.to_struct()?;
294 st.into_record_batch_with_schema(schema.as_ref())
295 })
296 }
297
298 pub fn into_array_stream(self) -> VortexResult<impl ArrayStream + 'static> {
301 let dtype = self.dtype()?;
302 let stream = self.into_stream()?;
303 Ok(ArrayStreamAdapter::new(dtype, stream))
304 }
305
306 pub fn into_array_iter(self) -> VortexResult<impl ArrayIterator + 'static> {
311 let dtype = self.dtype()?;
312 let concurrency = self.concurrency;
313
314 let mut local_pool = LocalPool::new();
315 let spawner = local_pool.spawner();
316
317 let mut stream = stream::iter(self.build()?)
318 .map(move |task| {
319 spawner
320 .spawn_local_with_handle(task)
321 .map_err(|e| vortex_err!("Failed to spawn task: {e}"))
322 .vortex_expect("Failed to spawn task")
323 })
324 .buffered(concurrency)
325 .filter_map(|a| async move { a.transpose() })
326 .boxed_local();
327
328 Ok(ArrayIteratorAdapter::new(
329 dtype,
330 iter::from_fn(move || local_pool.run_until(stream.next())),
331 ))
332 }
333}
334
335fn filter_and_projection_masks(
339 projection: &ExprRef,
340 filter: Option<&ExprRef>,
341 dtype: &DType,
342) -> VortexResult<(Vec<FieldMask>, Vec<FieldMask>)> {
343 let Some(struct_dtype) = dtype.as_struct() else {
344 return Ok(match filter {
345 Some(_) => (vec![FieldMask::All], vec![FieldMask::All]),
346 None => (Vec::new(), vec![FieldMask::All]),
347 });
348 };
349 let projection_mask = immediate_scope_access(projection, struct_dtype)?;
350 Ok(match filter {
351 None => (
352 Vec::new(),
353 projection_mask.into_iter().map(to_field_mask).collect_vec(),
354 ),
355 Some(f) => {
356 let filter_mask = immediate_scope_access(f, struct_dtype)?;
357 let only_projection_mask = projection_mask
358 .difference(&filter_mask)
359 .cloned()
360 .map(to_field_mask)
361 .collect_vec();
362 (
363 filter_mask.into_iter().map(to_field_mask).collect_vec(),
364 only_projection_mask,
365 )
366 }
367 })
368}
369
370fn to_field_mask(field: FieldName) -> FieldMask {
371 FieldMask::Prefix(FieldPath::from(Field::Name(field)))
372}