1use std::iter;
2use std::sync::Arc;
3
4use futures::executor::LocalPool;
5use futures::future::BoxFuture;
6use futures::task::LocalSpawnExt;
7use futures::{FutureExt, StreamExt, stream};
8use itertools::Itertools;
9pub use selection::*;
10pub use split_by::*;
11use vortex_array::builders::builder_with_capacity;
12use vortex_array::iter::{ArrayIterator, ArrayIteratorAdapter};
13use vortex_array::stream::{ArrayStream, ArrayStreamAdapter};
14use vortex_array::{Array, ArrayRef};
15use vortex_buffer::Buffer;
16use vortex_dtype::{DType, Field, FieldMask, FieldName, FieldPath};
17use vortex_error::{VortexError, VortexExpect, VortexResult, vortex_err};
18use vortex_expr::transform::immediate_access::immediate_scope_access;
19use vortex_expr::transform::simplify_typed::simplify_typed;
20use vortex_expr::{ExprRef, Identity};
21use vortex_metrics::{VortexMetrics, instrument};
22
23use crate::layouts::filter::FilterLayoutReader;
24use crate::{ExprEvaluator, LayoutReader};
25
26pub mod row_mask;
27mod selection;
28mod split_by;
29
/// Builder for a scan over a [`LayoutReader`].
///
/// Configure the projection, an optional filter, the row selection, how rows are split into
/// tasks and how many tasks run concurrently, then consume the builder into an
/// [`ArrayStream`] or an [`ArrayIterator`].
pub struct ScanBuilder {
    /// Reader for the layout being scanned.
    layout_reader: Arc<dyn LayoutReader>,
    /// Expression evaluated over each selected row range to produce the output arrays.
    projection: ExprRef,
    /// Optional filter expression, evaluated per split to refine the row mask before projection.
    filter: Option<ExprRef>,
    /// Which rows of the layout are candidates for the scan.
    selection: Selection,
    /// Strategy for dividing the layout's rows into independently evaluated splits.
    split_by: SplitBy,
    /// Whether each output array is canonicalized by rebuilding it through an array builder.
    canonicalize: bool,
    /// Maximum number of split tasks in flight at once; must be non-zero.
    concurrency: usize,
    /// Metrics registry set via `with_metrics`.
    // NOTE(review): not read by the scan code visible here — confirm it is actually consumed.
    metrics: VortexMetrics,
}
44
45impl ScanBuilder {
46 pub fn new(layout_reader: Arc<dyn LayoutReader>) -> Self {
47 Self {
48 layout_reader,
49 projection: Identity::new_expr(),
50 filter: None,
51 selection: Default::default(),
52 split_by: SplitBy::Layout,
53 canonicalize: false,
54 concurrency: 16,
57 metrics: Default::default(),
58 }
59 }
60
61 pub fn with_filter(mut self, filter: ExprRef) -> Self {
62 self.filter = Some(filter);
63 self
64 }
65
66 pub fn with_some_filter(mut self, filter: Option<ExprRef>) -> Self {
67 self.filter = filter;
68 self
69 }
70
71 pub fn with_projection(mut self, projection: ExprRef) -> Self {
72 self.projection = projection;
73 self
74 }
75
76 pub fn with_selection(mut self, selection: Selection) -> Self {
77 self.selection = selection;
78 self
79 }
80
81 pub fn with_row_indices(mut self, row_indices: Buffer<u64>) -> Self {
82 self.selection = Selection::IncludeByIndex(row_indices);
83 self
84 }
85
86 pub fn with_split_by(mut self, split_by: SplitBy) -> Self {
87 self.split_by = split_by;
88 self
89 }
90
91 pub fn with_canonicalize(mut self, canonicalize: bool) -> Self {
93 self.canonicalize = canonicalize;
94 self
95 }
96
97 pub fn with_concurrency(mut self, concurrency: usize) -> Self {
99 assert!(concurrency > 0);
100 self.concurrency = concurrency;
101 self
102 }
103
104 pub fn with_metrics(mut self, metrics: VortexMetrics) -> Self {
105 self.metrics = metrics;
106 self
107 }
108
109 #[allow(clippy::unused_enumerate_index)]
110 fn build_tasks(
111 self,
112 ) -> VortexResult<Vec<impl Future<Output = VortexResult<Option<ArrayRef>>>>> {
113 let mut layout_reader = self.layout_reader;
116 if self.filter.is_some() {
117 layout_reader = Arc::new(FilterLayoutReader::new(layout_reader));
118 }
119
120 let projection = simplify_typed(self.projection.clone(), layout_reader.dtype())?;
122 let filter = self
123 .filter
124 .clone()
125 .map(|f| simplify_typed(f, layout_reader.dtype()))
126 .transpose()?;
127
128 let (filter_mask, projection_mask) =
130 filter_and_projection_masks(&projection, filter.as_ref(), layout_reader.dtype())?;
131 let field_mask: Vec<_> = filter_mask
132 .iter()
133 .cloned()
134 .chain(projection_mask.iter().cloned())
135 .collect();
136 let splits = self.split_by.splits(layout_reader.layout(), &field_mask)?;
137
138 let row_masks = splits
139 .into_iter()
140 .map(|row_range| self.selection.row_mask(&row_range))
141 .filter(|mask| !mask.mask().all_false())
142 .collect_vec();
143
144 row_masks
146 .into_iter()
147 .enumerate()
148 .map(|(_i, row_mask)| {
149 let row_range = row_mask.row_range();
150
151 let approx_filter_eval = filter
152 .as_ref()
153 .map(|expr| layout_reader.pruning_evaluation(&row_range, expr))
154 .transpose()?;
155 let exact_filter_eval = filter
156 .as_ref()
157 .map(|expr| layout_reader.filter_evaluation(&row_range, expr))
158 .transpose()?;
159 let project_eval = layout_reader.projection_evaluation(&row_range, &projection)?;
160
161 Ok::<_, VortexError>(instrument!("split", [split = _i], async move {
162 let mut mask = row_mask.mask().clone();
163 if mask.all_false() {
164 return Ok(None);
165 }
166
167 if let Some(approx_filter_eval) = approx_filter_eval {
168 log::debug!("Pruning row range {:?}", row_range);
170 mask = approx_filter_eval.invoke(mask).await?;
171 if mask.all_false() {
172 return Ok(None);
173 }
174 }
175
176 if let Some(exact_filter_eval) = exact_filter_eval {
177 log::debug!("Filtering row range {:?}", row_range);
179 mask = exact_filter_eval.invoke(mask).await?;
180 if mask.all_false() {
181 return Ok(None);
182 }
183 }
184
185 log::debug!("Projecting row range {:?}", row_range);
186 let mut array = project_eval.invoke(mask).await?;
187 if self.canonicalize {
188 log::debug!("Canonicalizing row range {:?}", row_range);
189 let mut builder = builder_with_capacity(array.dtype(), array.len());
190 array.append_to_builder(builder.as_mut())?;
191 array = builder.finish();
192 }
193
194 Ok(Some(array))
195 }))
196 })
197 .try_collect()
198 }
199
200 pub fn spawn_on<F, S>(self, mut spawner: S) -> VortexResult<impl ArrayStream + 'static>
202 where
203 F: Future<Output = VortexResult<Option<ArrayRef>>>,
204 S: FnMut(BoxFuture<'static, VortexResult<Option<ArrayRef>>>) -> F + 'static,
205 {
206 let concurrency = self.concurrency;
207 let dtype = self.projection.return_dtype(self.layout_reader.dtype())?;
208 let tasks = self.build_tasks()?;
209
210 let array_stream = stream::iter(tasks)
211 .map(move |task| spawner(task.boxed()))
212 .buffered(concurrency)
213 .filter_map(|v| async move { v.transpose() });
214
215 Ok(ArrayStreamAdapter::new(
216 dtype,
217 instrument!("array_stream", array_stream),
218 ))
219 }
220
221 #[cfg(feature = "tokio")]
226 pub fn spawn_tokio(
227 self,
228 handle: tokio::runtime::Handle,
229 ) -> VortexResult<impl ArrayStream + 'static> {
230 self.spawn_on(move |task| {
231 let handle = handle.clone();
232 async move {
233 handle
234 .spawn(task)
235 .await
236 .vortex_expect("Failed to join task")
237 }
238 })
239 }
240
241 #[cfg(feature = "tokio")]
244 pub fn spawn_tokio_blocking(
245 self,
246 handle: tokio::runtime::Handle,
247 ) -> VortexResult<impl ArrayStream + 'static> {
248 self.spawn_on(move |task| {
249 let handle = handle.clone();
250 async move {
251 handle
252 .spawn_blocking(|| futures::executor::block_on(task))
253 .await
254 .vortex_expect("Failed to join task")
255 }
256 })
257 }
258
259 pub fn into_array_stream(self) -> VortexResult<impl ArrayStream + 'static> {
262 self.spawn_on(|task| task)
263 }
264
265 pub fn into_array_iter(self) -> VortexResult<impl ArrayIterator + 'static> {
270 let mut local_pool = LocalPool::new();
271 let spawner = local_pool.spawner();
272 let array_stream = self.spawn_on(move |task| {
273 spawner
274 .spawn_local_with_handle(task)
275 .map_err(|e| vortex_err!("Failed to spawn task: {e}"))
276 .vortex_expect("Failed to spawn task")
277 })?;
278
279 let mut array_stream = Box::pin(array_stream);
280 Ok(ArrayIteratorAdapter::new(
281 array_stream.dtype().clone(),
282 iter::from_fn(move || local_pool.run_until(array_stream.next())),
283 ))
284 }
285}
286
287fn filter_and_projection_masks(
291 projection: &ExprRef,
292 filter: Option<&ExprRef>,
293 dtype: &DType,
294) -> VortexResult<(Vec<FieldMask>, Vec<FieldMask>)> {
295 let Some(struct_dtype) = dtype.as_struct() else {
296 return Ok(match filter {
297 Some(_) => (vec![FieldMask::All], vec![FieldMask::All]),
298 None => (Vec::new(), vec![FieldMask::All]),
299 });
300 };
301 let projection_mask = immediate_scope_access(projection, struct_dtype)?;
302 Ok(match filter {
303 None => (
304 Vec::new(),
305 projection_mask.into_iter().map(to_field_mask).collect_vec(),
306 ),
307 Some(f) => {
308 let filter_mask = immediate_scope_access(f, struct_dtype)?;
309 let only_projection_mask = projection_mask
310 .difference(&filter_mask)
311 .cloned()
312 .map(to_field_mask)
313 .collect_vec();
314 (
315 filter_mask.into_iter().map(to_field_mask).collect_vec(),
316 only_projection_mask,
317 )
318 }
319 })
320}
321
322fn to_field_mask(field: FieldName) -> FieldMask {
323 FieldMask::Prefix(FieldPath::from(Field::Name(field)))
324}