1use std::sync::Arc;
2
3use executor::{Executor as _, TaskExecutor, ThreadsExecutor};
4use futures::{Stream, StreamExt, stream};
5use itertools::Itertools;
6pub use split_by::*;
7use vortex_array::builders::builder_with_capacity;
8use vortex_array::stream::{ArrayStream, ArrayStreamAdapter, ArrayStreamExt};
9use vortex_array::{Array, ArrayContext, ArrayRef};
10use vortex_buffer::Buffer;
11use vortex_dtype::{DType, Field, FieldMask, FieldName, FieldPath};
12use vortex_error::{ResultExt, VortexExpect, VortexResult, vortex_err};
13use vortex_expr::transform::immediate_access::immediate_scope_access;
14use vortex_expr::transform::simplify_typed::simplify_typed;
15use vortex_expr::{ExprRef, Identity};
16use vortex_mask::Mask;
17use vortex_metrics::VortexMetrics;
18
19use crate::scan::filter::FilterExpr;
20use crate::scan::unified::UnifiedDriverStream;
21use crate::segments::{AsyncSegmentReader, RowRangePruner, SegmentCollector, SegmentStream};
22use crate::{
23 ExprEvaluator, Layout, LayoutReader, LayoutReaderExt, RowMask, instrument, range_intersection,
24};
25
26pub mod executor;
27pub(crate) mod filter;
28mod split_by;
29pub mod unified;
30
31pub trait ScanDriver: 'static + Sized {
32 fn segment_reader(&self) -> Arc<dyn AsyncSegmentReader>;
33
34 fn io_stream(self, segments: SegmentStream) -> impl Stream<Item = VortexResult<()>>;
44}
45
46pub struct ScanBuilder<D: ScanDriver> {
48 driver: D,
49 task_executor: Option<TaskExecutor>,
50 layout: Layout,
51 ctx: ArrayContext, projection: ExprRef,
53 filter: Option<ExprRef>,
54 row_indices: Option<Buffer<u64>>,
55 split_by: SplitBy,
56 canonicalize: bool,
57 concurrency: usize,
59 prefetch_conjuncts: bool,
60 metrics: VortexMetrics,
61}
62
63impl<D: ScanDriver> ScanBuilder<D> {
64 pub fn new(driver: D, layout: Layout, ctx: ArrayContext) -> Self {
65 Self {
66 driver,
67 task_executor: None,
68 layout,
69 ctx,
70 projection: Identity::new_expr(),
71 filter: None,
72 row_indices: None,
73 split_by: SplitBy::Layout,
74 canonicalize: false,
75 prefetch_conjuncts: false,
76 concurrency: 1024,
77 metrics: Default::default(),
78 }
79 }
80
81 pub fn with_filter(mut self, filter: ExprRef) -> Self {
82 self.filter = Some(filter);
83 self
84 }
85
86 pub fn with_some_filter(mut self, filter: Option<ExprRef>) -> Self {
87 self.filter = filter;
88 self
89 }
90
91 pub fn with_projection(mut self, projection: ExprRef) -> Self {
92 self.projection = projection;
93 self
94 }
95
96 pub fn with_row_indices(mut self, row_indices: Buffer<u64>) -> Self {
97 self.row_indices = Some(row_indices);
98 self
99 }
100
101 pub fn with_some_row_indices(mut self, row_indices: Option<Buffer<u64>>) -> Self {
102 self.row_indices = row_indices;
103 self
104 }
105
106 pub fn with_split_by(mut self, split_by: SplitBy) -> Self {
107 self.split_by = split_by;
108 self
109 }
110
111 pub fn with_canonicalize(mut self, canonicalize: bool) -> Self {
113 self.canonicalize = canonicalize;
114 self
115 }
116
117 pub fn with_concurrency(mut self, concurrency: usize) -> Self {
119 assert!(concurrency > 0);
120 self.concurrency = concurrency;
121 self
122 }
123
124 pub fn with_prefetch_conjuncts(mut self, prefetch: bool) -> Self {
126 self.prefetch_conjuncts = prefetch;
127 self
128 }
129
130 pub fn with_task_executor(mut self, task_executor: TaskExecutor) -> Self {
131 self.task_executor = Some(task_executor);
132 self
133 }
134
135 pub fn with_metrics(mut self, metrics: VortexMetrics) -> Self {
136 self.metrics = metrics;
137 self
138 }
139
140 pub fn build(self) -> VortexResult<Scan<D>> {
141 let projection = simplify_typed(self.projection.clone(), self.layout.dtype())?;
142 let filter = self
143 .filter
144 .clone()
145 .map(|f| simplify_typed(f, self.layout.dtype()))
146 .transpose()?;
147 let (filter_mask, projection_mask) =
148 filter_and_projection_masks(&projection, filter.as_ref(), self.layout.dtype())?;
149
150 let field_mask: Vec<_> = filter_mask
151 .iter()
152 .cloned()
153 .chain(projection_mask.iter().cloned())
154 .collect();
155
156 let splits = self.split_by.splits(&self.layout, &field_mask)?;
157 let mut collector = SegmentCollector::new(self.metrics.clone());
158 self.layout
159 .required_segments(0, &filter_mask, &projection_mask, &mut collector)?;
160 let (mut row_range_pruner, segments) = collector.finish();
161 let row_indices = self.row_indices.clone();
162 if let Some(indices) = &row_indices {
163 row_range_pruner.retain_matching(indices.clone());
164 }
165
166 let row_masks = splits
167 .into_iter()
168 .filter_map(move |row_range| {
169 let Some(row_indices) = &row_indices else {
170 return Some(RowMask::new_valid_between(row_range.start, row_range.end));
172 };
173
174 let intersection = range_intersection(&row_range, row_indices)?;
176
177 let filter_mask = Mask::from_indices(
179 usize::try_from(row_range.end - row_range.start)
180 .vortex_expect("Split ranges are within usize"),
181 row_indices[intersection]
182 .iter()
183 .map(|&idx| {
184 usize::try_from(idx - row_range.start)
185 .vortex_expect("index within range")
186 })
187 .collect(),
188 );
189 Some(RowMask::new(filter_mask, row_range.start))
190 })
191 .collect_vec();
192
193 Ok(Scan {
194 driver: self.driver,
195 task_executor: self
196 .task_executor
197 .unwrap_or(TaskExecutor::Threads(ThreadsExecutor::default())),
198 layout: self.layout,
199 ctx: self.ctx,
200 projection,
201 filter,
202 row_masks,
203 canonicalize: self.canonicalize,
204 concurrency: self.concurrency,
205 prefetch_conjuncts: self.prefetch_conjuncts,
206 row_range_pruner,
207 segments,
208 })
209 }
210
211 pub fn into_array_stream(self) -> VortexResult<impl ArrayStream + 'static> {
213 self.build()?.into_array_stream()
214 }
215
216 pub async fn read_all(self) -> VortexResult<ArrayRef> {
217 self.into_array_stream()?.read_all().await
218 }
219}
220
221fn filter_and_projection_masks(
225 projection: &ExprRef,
226 filter: Option<&ExprRef>,
227 dtype: &DType,
228) -> VortexResult<(Vec<FieldMask>, Vec<FieldMask>)> {
229 let Some(struct_dtype) = dtype.as_struct() else {
230 return Ok(match filter {
231 Some(_) => (vec![FieldMask::All], vec![FieldMask::All]),
232 None => (Vec::new(), vec![FieldMask::All]),
233 });
234 };
235 let projection_mask = immediate_scope_access(projection, struct_dtype)?;
236 Ok(match filter {
237 None => (
238 Vec::new(),
239 projection_mask.into_iter().map(to_field_mask).collect_vec(),
240 ),
241 Some(f) => {
242 let filter_mask = immediate_scope_access(f, struct_dtype)?;
243 let only_projection_mask = projection_mask
244 .difference(&filter_mask)
245 .cloned()
246 .map(to_field_mask)
247 .collect_vec();
248 (
249 filter_mask.into_iter().map(to_field_mask).collect_vec(),
250 only_projection_mask,
251 )
252 }
253 })
254}
255
256fn to_field_mask(field: FieldName) -> FieldMask {
257 FieldMask::Prefix(FieldPath::from(Field::Name(field)))
258}
259
260pub struct Scan<D> {
261 driver: D,
262 task_executor: TaskExecutor,
263 layout: Layout,
264 ctx: ArrayContext,
265 projection: ExprRef,
267 filter: Option<ExprRef>,
269 row_masks: Vec<RowMask>,
270 canonicalize: bool,
271 concurrency: usize,
273 prefetch_conjuncts: bool,
274 row_range_pruner: RowRangePruner,
275 segments: SegmentStream,
276}
277
278impl<D: ScanDriver> Scan<D> {
279 pub fn into_array_stream(self) -> VortexResult<impl ArrayStream + 'static> {
284 let segment_reader = self.driver.segment_reader();
286 let task_executor = self.task_executor.clone();
287 let reader: Arc<dyn LayoutReader> = self.layout.reader(segment_reader, self.ctx.clone())?;
288
289 let pruning = self
290 .filter
291 .as_ref()
292 .map(|filter| {
293 let pruning = Arc::new(FilterExpr::try_new(
294 reader.dtype().as_struct().ok_or_else(|| {
295 vortex_err!("Vortex scan currently only works for struct arrays")
296 })?,
297 filter.clone(),
298 self.prefetch_conjuncts,
299 )?);
300
301 VortexResult::Ok(pruning)
302 })
303 .transpose()?;
304
305 let row_masks = stream::iter(self.row_masks);
307 let projection = self.projection.clone();
308 let row_range_pruner = self.row_range_pruner.clone();
309
310 let exec_stream = row_masks
311 .map(move |row_mask| {
312 let reader = reader.clone();
313 let projection = projection.clone();
314 let pruning = pruning.clone();
315 let reader = reader.clone();
316 let mut row_range_pruner = row_range_pruner.clone();
317
318 instrument!("process", async move {
320 let row_mask = match pruning {
321 None => row_mask,
322 Some(pruning_filter) => {
323 pruning_filter
324 .new_evaluation(&row_mask)
325 .evaluate(reader.clone())
326 .await?
327 }
328 };
329
330 if row_mask.filter_mask().all_false() {
332 row_range_pruner
333 .remove(row_mask.begin()..row_mask.end())
334 .await?;
335 Ok(None)
336 } else {
337 let mut array = reader.evaluate_expr(row_mask, projection).await?;
338 if self.canonicalize {
339 let mut builder = builder_with_capacity(array.dtype(), array.len());
340 array.append_to_builder(builder.as_mut())?;
341 array = builder.finish();
342 }
343 VortexResult::Ok(Some(array))
344 }
345 })
346 })
347 .map(move |processing_task| task_executor.spawn(processing_task))
348 .buffered(self.concurrency)
349 .filter_map(|v| async move { v.unnest().transpose() });
350
351 let exec_stream = instrument!("exec_stream", exec_stream);
352 let io_stream = self.driver.io_stream(self.segments);
353
354 let unified = UnifiedDriverStream {
355 exec_stream,
356 io_stream,
357 };
358
359 let result_dtype = self.projection.return_dtype(self.layout.dtype())?;
360 Ok(ArrayStreamAdapter::new(result_dtype, unified))
361 }
362
363 pub async fn read_all(self) -> VortexResult<ArrayRef> {
364 self.into_array_stream()?.read_all().await
365 }
366}