1use std::iter;
2use std::ops::Range;
3use std::sync::Arc;
4
5use futures::executor::LocalPool;
6use futures::future::BoxFuture;
7use futures::task::LocalSpawnExt;
8use futures::{FutureExt, StreamExt, stream};
9use itertools::Itertools;
10pub use selection::*;
11pub use split_by::*;
12use vortex_array::builders::builder_with_capacity;
13use vortex_array::iter::{ArrayIterator, ArrayIteratorAdapter};
14use vortex_array::stream::{ArrayStream, ArrayStreamAdapter};
15use vortex_array::{Array, ArrayRef};
16use vortex_buffer::Buffer;
17use vortex_dtype::{DType, Field, FieldMask, FieldName, FieldPath};
18use vortex_error::{VortexError, VortexExpect, VortexResult, vortex_err};
19use vortex_expr::transform::immediate_access::immediate_scope_access;
20use vortex_expr::transform::simplify_typed::simplify_typed;
21use vortex_expr::{ExprRef, Identity};
22use vortex_metrics::{VortexMetrics, instrument};
23
24use crate::layouts::filter::FilterLayoutReader;
25use crate::{ExprEvaluator, LayoutReader};
26
27pub mod row_mask;
28mod selection;
29mod split_by;
30
31pub struct ScanBuilder {
33 layout_reader: Arc<dyn LayoutReader>,
34 projection: ExprRef,
35 filter: Option<ExprRef>,
36 row_range: Option<Range<u64>>,
38 selection: Selection,
40 split_by: SplitBy,
42 canonicalize: bool,
44 concurrency: usize,
46 metrics: VortexMetrics,
47}
48
49impl ScanBuilder {
50 pub fn new(layout_reader: Arc<dyn LayoutReader>) -> Self {
51 Self {
52 layout_reader,
53 projection: Identity::new_expr(),
54 filter: None,
55 row_range: None,
56 selection: Default::default(),
57 split_by: SplitBy::Layout,
58 canonicalize: false,
59 concurrency: 16,
62 metrics: Default::default(),
63 }
64 }
65
66 pub fn with_filter(mut self, filter: ExprRef) -> Self {
67 self.filter = Some(filter);
68 self
69 }
70
71 pub fn with_some_filter(mut self, filter: Option<ExprRef>) -> Self {
72 self.filter = filter;
73 self
74 }
75
76 pub fn with_projection(mut self, projection: ExprRef) -> Self {
77 self.projection = projection;
78 self
79 }
80
81 pub fn with_row_range(mut self, row_range: Range<u64>) -> Self {
82 self.row_range = Some(row_range);
83 self
84 }
85
86 pub fn with_some_row_range(mut self, row_range: Option<Range<u64>>) -> Self {
87 self.row_range = row_range;
88 self
89 }
90
91 pub fn with_selection(mut self, selection: Selection) -> Self {
92 self.selection = selection;
93 self
94 }
95
96 pub fn with_row_indices(mut self, row_indices: Buffer<u64>) -> Self {
97 self.selection = Selection::IncludeByIndex(row_indices);
98 self
99 }
100
101 pub fn with_split_by(mut self, split_by: SplitBy) -> Self {
102 self.split_by = split_by;
103 self
104 }
105
106 pub fn with_canonicalize(mut self, canonicalize: bool) -> Self {
108 self.canonicalize = canonicalize;
109 self
110 }
111
112 pub fn with_concurrency(mut self, concurrency: usize) -> Self {
114 assert!(concurrency > 0);
115 self.concurrency = concurrency;
116 self
117 }
118
119 pub fn with_metrics(mut self, metrics: VortexMetrics) -> Self {
120 self.metrics = metrics;
121 self
122 }
123
124 #[allow(clippy::unused_enumerate_index)]
125 fn build_tasks(
126 self,
127 ) -> VortexResult<Vec<impl Future<Output = VortexResult<Option<ArrayRef>>>>> {
128 let mut layout_reader = self.layout_reader;
131 if self.filter.is_some() {
132 layout_reader = Arc::new(FilterLayoutReader::new(layout_reader));
133 }
134
135 let projection = simplify_typed(self.projection.clone(), layout_reader.dtype())?;
137 let filter = self
138 .filter
139 .clone()
140 .map(|f| simplify_typed(f, layout_reader.dtype()))
141 .transpose()?;
142
143 let (filter_mask, projection_mask) =
145 filter_and_projection_masks(&projection, filter.as_ref(), layout_reader.dtype())?;
146 let field_mask: Vec<_> = filter_mask
147 .iter()
148 .cloned()
149 .chain(projection_mask.iter().cloned())
150 .collect();
151 let splits = self.split_by.splits(layout_reader.layout(), &field_mask)?;
152
153 let row_masks = splits
154 .into_iter()
155 .filter_map(|row_range| {
156 if let Some(scan_range) = &self.row_range {
157 if row_range.start >= scan_range.end || row_range.end < scan_range.start {
159 return None;
160 }
161 return Some(
163 row_range.start.max(scan_range.start)..row_range.end.min(scan_range.end),
164 );
165 } else {
166 Some(row_range)
167 }
168 })
169 .map(|row_range| self.selection.row_mask(&row_range))
170 .filter(|mask| !mask.mask().all_false())
171 .collect_vec();
172
173 row_masks
175 .into_iter()
176 .enumerate()
177 .map(|(_i, row_mask)| {
178 let row_range = row_mask.row_range();
179
180 let approx_filter_eval = filter
181 .as_ref()
182 .map(|expr| layout_reader.pruning_evaluation(&row_range, expr))
183 .transpose()?;
184 let exact_filter_eval = filter
185 .as_ref()
186 .map(|expr| layout_reader.filter_evaluation(&row_range, expr))
187 .transpose()?;
188 let project_eval = layout_reader.projection_evaluation(&row_range, &projection)?;
189
190 Ok::<_, VortexError>(instrument!("split", [split = _i], async move {
191 let mut mask = row_mask.mask().clone();
192 if mask.all_false() {
193 return Ok(None);
194 }
195
196 if let Some(approx_filter_eval) = approx_filter_eval {
197 log::debug!("Pruning row range {:?}", row_range);
199 mask = approx_filter_eval.invoke(mask).await?;
200 if mask.all_false() {
201 return Ok(None);
202 }
203 }
204
205 if let Some(exact_filter_eval) = exact_filter_eval {
206 log::debug!("Filtering row range {:?}", row_range);
208 mask = exact_filter_eval.invoke(mask).await?;
209 if mask.all_false() {
210 return Ok(None);
211 }
212 }
213
214 log::debug!("Projecting row range {:?}", row_range);
215 let mut array = project_eval.invoke(mask).await?;
216 if self.canonicalize {
217 log::debug!("Canonicalizing row range {:?}", row_range);
218 let mut builder = builder_with_capacity(array.dtype(), array.len());
219 array.append_to_builder(builder.as_mut())?;
220 array = builder.finish();
221 }
222
223 Ok(Some(array))
224 }))
225 })
226 .try_collect()
227 }
228
229 pub fn spawn_on<F, S>(self, mut spawner: S) -> VortexResult<impl ArrayStream + 'static>
231 where
232 F: Future<Output = VortexResult<Option<ArrayRef>>>,
233 S: FnMut(BoxFuture<'static, VortexResult<Option<ArrayRef>>>) -> F + 'static,
234 {
235 let concurrency = self.concurrency;
236 let dtype = self.projection.return_dtype(self.layout_reader.dtype())?;
237 let tasks = self.build_tasks()?;
238
239 let array_stream = stream::iter(tasks)
240 .map(move |task| spawner(task.boxed()))
241 .buffered(concurrency)
242 .filter_map(|v| async move { v.transpose() });
243
244 Ok(ArrayStreamAdapter::new(
245 dtype,
246 instrument!("array_stream", array_stream),
247 ))
248 }
249
/// Returns an [`ArrayStream`] whose tasks are spawned on the given tokio runtime handle.
///
/// # Panics
///
/// The stream panics with "Failed to join task" if a spawned task cannot be joined.
#[cfg(feature = "tokio")]
pub fn spawn_tokio(
    self,
    handle: tokio::runtime::Handle,
) -> VortexResult<impl ArrayStream + 'static> {
    self.spawn_on(move |task| {
        let runtime = handle.clone();
        // The task is spawned only once this future is first polled.
        async move {
            let joined = runtime.spawn(task).await;
            joined.vortex_expect("Failed to join task")
        }
    })
}
269
/// Returns an [`ArrayStream`] whose tasks each run to completion inside
/// `spawn_blocking` on the given tokio runtime handle, driven by
/// `futures::executor::block_on`.
///
/// # Panics
///
/// The stream panics with "Failed to join task" if a blocking task cannot be joined.
#[cfg(feature = "tokio")]
pub fn spawn_tokio_blocking(
    self,
    handle: tokio::runtime::Handle,
) -> VortexResult<impl ArrayStream + 'static> {
    self.spawn_on(move |task| {
        let runtime = handle.clone();
        // The blocking task is spawned only once this future is first polled.
        async move {
            let joined = runtime
                .spawn_blocking(move || futures::executor::block_on(task))
                .await;
            joined.vortex_expect("Failed to join task")
        }
    })
}
287
288 pub fn into_array_stream(self) -> VortexResult<impl ArrayStream + 'static> {
291 self.spawn_on(|task| task)
292 }
293
294 pub fn into_array_iter(self) -> VortexResult<impl ArrayIterator + 'static> {
299 let mut local_pool = LocalPool::new();
300 let spawner = local_pool.spawner();
301 let array_stream = self.spawn_on(move |task| {
302 spawner
303 .spawn_local_with_handle(task)
304 .map_err(|e| vortex_err!("Failed to spawn task: {e}"))
305 .vortex_expect("Failed to spawn task")
306 })?;
307
308 let mut array_stream = Box::pin(array_stream);
309 Ok(ArrayIteratorAdapter::new(
310 array_stream.dtype().clone(),
311 iter::from_fn(move || local_pool.run_until(array_stream.next())),
312 ))
313 }
314}
315
316fn filter_and_projection_masks(
320 projection: &ExprRef,
321 filter: Option<&ExprRef>,
322 dtype: &DType,
323) -> VortexResult<(Vec<FieldMask>, Vec<FieldMask>)> {
324 let Some(struct_dtype) = dtype.as_struct() else {
325 return Ok(match filter {
326 Some(_) => (vec![FieldMask::All], vec![FieldMask::All]),
327 None => (Vec::new(), vec![FieldMask::All]),
328 });
329 };
330 let projection_mask = immediate_scope_access(projection, struct_dtype)?;
331 Ok(match filter {
332 None => (
333 Vec::new(),
334 projection_mask.into_iter().map(to_field_mask).collect_vec(),
335 ),
336 Some(f) => {
337 let filter_mask = immediate_scope_access(f, struct_dtype)?;
338 let only_projection_mask = projection_mask
339 .difference(&filter_mask)
340 .cloned()
341 .map(to_field_mask)
342 .collect_vec();
343 (
344 filter_mask.into_iter().map(to_field_mask).collect_vec(),
345 only_projection_mask,
346 )
347 }
348 })
349}
350
351fn to_field_mask(field: FieldName) -> FieldMask {
352 FieldMask::Prefix(FieldPath::from(Field::Name(field)))
353}