1use std::ops::Range;
5use std::sync::Arc;
6
7use futures::Stream;
8use futures::future::BoxFuture;
9use itertools::Itertools;
10use vortex_array::ArrayRef;
11use vortex_array::expr::Expression;
12use vortex_array::expr::analysis::immediate_access::immediate_scope_access;
13use vortex_array::expr::root;
14use vortex_array::iter::ArrayIterator;
15use vortex_array::iter::ArrayIteratorAdapter;
16use vortex_array::stats::StatsSet;
17use vortex_array::stream::ArrayStream;
18use vortex_array::stream::ArrayStreamAdapter;
19use vortex_buffer::Buffer;
20use vortex_dtype::DType;
21use vortex_dtype::Field;
22use vortex_dtype::FieldMask;
23use vortex_dtype::FieldName;
24use vortex_dtype::FieldPath;
25use vortex_error::VortexResult;
26use vortex_error::vortex_bail;
27use vortex_io::runtime::BlockingRuntime;
28use vortex_layout::LayoutReader;
29use vortex_layout::LayoutReaderRef;
30use vortex_layout::layouts::row_idx::RowIdxLayoutReader;
31use vortex_metrics::VortexMetrics;
32use vortex_session::VortexSession;
33
34use crate::RepeatedScan;
35use crate::selection::Selection;
36use crate::split_by::SplitBy;
37use crate::splits::Splits;
38use crate::splits::attempt_split_ranges;
39
/// Builder for a scan over a layout whose scanned arrays are mapped to items of type `A`.
///
/// Configure the scan with the `with_*` methods, then consume the builder with `prepare`,
/// `build`, `into_stream` or `into_iter`.
pub struct ScanBuilder<A> {
    /// Session the scan runs in; passed to the row-index reader and to the prepared scan.
    session: VortexSession,
    /// Reader for the layout being scanned.
    layout_reader: LayoutReaderRef,
    /// Projection expression; its return type over the reader's dtype is the scan dtype.
    projection: Expression,
    /// Optional filter expression. `prepare` rejects a filter combined with a `limit`.
    filter: Option<Expression>,
    /// Flag forwarded to `RepeatedScan`; presumably whether results keep split order
    /// (TODO confirm against `RepeatedScan`).
    ordered: bool,
    /// Optional row range; when absent, natural splits cover `0..row_count`.
    row_range: Option<Range<u64>>,
    /// Row selection; combined with `row_range` when computing splits.
    selection: Selection,
    /// Strategy for natural splits when explicit split ranges cannot be derived.
    split_by: SplitBy,
    /// Concurrency forwarded to `RepeatedScan`; `with_concurrency` requires it to be non-zero.
    concurrency: usize,
    /// Mapping from scanned arrays to output items, forwarded to `RepeatedScan`;
    /// `map` composes further functions onto it.
    map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
    /// Metrics collector.
    /// NOTE(review): `prepare` does not forward this to `RepeatedScan` — confirm where it is used.
    metrics: VortexMetrics,
    /// Optional per-file statistics.
    /// NOTE(review): only carried through `map` in the code visible here — confirm it is still needed.
    file_stats: Option<Arc<[StatsSet]>>,
    /// Optional limit forwarded to `RepeatedScan`; `Some(0)` makes `build` return no futures.
    limit: Option<usize>,
    /// Row offset given to the `RowIdxLayoutReader` that `prepare` wraps around `layout_reader`.
    row_offset: u64,
}
68
69impl ScanBuilder<ArrayRef> {
70 pub fn new(session: VortexSession, layout_reader: Arc<dyn LayoutReader>) -> Self {
71 Self {
72 session,
73 layout_reader,
74 projection: root(),
75 filter: None,
76 ordered: true,
77 row_range: None,
78 selection: Default::default(),
79 split_by: SplitBy::Layout,
80 concurrency: 4,
83 map_fn: Arc::new(Ok),
84 metrics: Default::default(),
85 file_stats: None,
86 limit: None,
87 row_offset: 0,
88 }
89 }
90
91 pub fn into_array_stream(self) -> VortexResult<impl ArrayStream + Send + 'static> {
95 let dtype = self.dtype()?;
96 let stream = self.into_stream()?;
97 Ok(ArrayStreamAdapter::new(dtype, stream))
98 }
99
100 pub fn into_array_iter<B: BlockingRuntime>(
102 self,
103 runtime: &B,
104 ) -> VortexResult<impl ArrayIterator + 'static> {
105 let stream = self.into_array_stream()?;
106 let dtype = stream.dtype().clone();
107 Ok(ArrayIteratorAdapter::new(
108 dtype,
109 runtime.block_on_stream(stream),
110 ))
111 }
112}
113
114impl<A: 'static + Send> ScanBuilder<A> {
115 pub fn with_filter(mut self, filter: Expression) -> Self {
116 self.filter = Some(filter);
117 self
118 }
119
120 pub fn with_some_filter(mut self, filter: Option<Expression>) -> Self {
121 self.filter = filter;
122 self
123 }
124
125 pub fn with_projection(mut self, projection: Expression) -> Self {
126 self.projection = projection;
127 self
128 }
129
130 pub fn with_ordered(mut self, ordered: bool) -> Self {
131 self.ordered = ordered;
132 self
133 }
134
135 pub fn with_row_range(mut self, row_range: Range<u64>) -> Self {
136 self.row_range = Some(row_range);
137 self
138 }
139
140 pub fn with_selection(mut self, selection: Selection) -> Self {
141 self.selection = selection;
142 self
143 }
144
145 pub fn with_row_indices(mut self, row_indices: Buffer<u64>) -> Self {
146 self.selection = Selection::IncludeByIndex(row_indices);
147 self
148 }
149
150 pub fn with_row_offset(mut self, row_offset: u64) -> Self {
151 self.row_offset = row_offset;
152 self
153 }
154
155 pub fn with_split_by(mut self, split_by: SplitBy) -> Self {
156 self.split_by = split_by;
157 self
158 }
159
160 pub fn with_concurrency(mut self, concurrency: usize) -> Self {
163 assert!(concurrency > 0);
164 self.concurrency = concurrency;
165 self
166 }
167
168 pub fn with_metrics(mut self, metrics: VortexMetrics) -> Self {
169 self.metrics = metrics;
170 self
171 }
172
173 pub fn with_limit(mut self, limit: usize) -> Self {
174 self.limit = Some(limit);
175 self
176 }
177
178 pub fn dtype(&self) -> VortexResult<DType> {
180 self.projection.return_dtype(self.layout_reader.dtype())
181 }
182
183 pub fn session(&self) -> &VortexSession {
185 &self.session
186 }
187
188 pub fn map<B: 'static>(
190 self,
191 map_fn: impl Fn(A) -> VortexResult<B> + 'static + Send + Sync,
192 ) -> ScanBuilder<B> {
193 let old_map_fn = self.map_fn;
194 ScanBuilder {
195 session: self.session,
196 layout_reader: self.layout_reader,
197 projection: self.projection,
198 filter: self.filter,
199 ordered: self.ordered,
200 row_range: self.row_range,
201 selection: self.selection,
202 split_by: self.split_by,
203 concurrency: self.concurrency,
204 metrics: self.metrics,
205 file_stats: self.file_stats,
206 limit: self.limit,
207 row_offset: self.row_offset,
208 map_fn: Arc::new(move |a| old_map_fn(a).and_then(&map_fn)),
209 }
210 }
211
212 pub fn prepare(self) -> VortexResult<RepeatedScan<A>> {
213 let dtype = self.dtype()?;
214
215 if self.filter.is_some() && self.limit.is_some() {
216 vortex_bail!("Vortex doesn't support scans with both a filter and a limit")
217 }
218
219 let mut layout_reader = self.layout_reader;
222
223 layout_reader = Arc::new(RowIdxLayoutReader::new(
227 self.row_offset,
228 layout_reader,
229 self.session.clone(),
230 ));
231
232 let projection = self.projection.optimize_recursive(layout_reader.dtype())?;
234
235 let filter = self
236 .filter
237 .map(|f| f.optimize_recursive(layout_reader.dtype()))
238 .transpose()?;
239
240 let (filter_mask, projection_mask) =
242 filter_and_projection_masks(&projection, filter.as_ref(), layout_reader.dtype())?;
243 let field_mask: Vec<_> = [filter_mask, projection_mask].concat();
244
245 let splits =
246 if let Some(ranges) = attempt_split_ranges(&self.selection, self.row_range.as_ref()) {
247 Splits::Ranges(ranges)
248 } else {
249 let split_range = self
250 .row_range
251 .clone()
252 .unwrap_or_else(|| 0..layout_reader.row_count());
253 Splits::Natural(self.split_by.splits(
254 layout_reader.as_ref(),
255 &split_range,
256 &field_mask,
257 )?)
258 };
259
260 Ok(RepeatedScan::new(
261 self.session.clone(),
262 layout_reader,
263 projection,
264 filter,
265 self.ordered,
266 self.row_range,
267 self.selection,
268 splits,
269 self.concurrency,
270 self.map_fn,
271 self.limit,
272 dtype,
273 ))
274 }
275
276 pub fn build(self) -> VortexResult<Vec<BoxFuture<'static, VortexResult<Option<A>>>>> {
278 if self.limit.is_some_and(|l| l == 0) {
280 return Ok(vec![]);
281 }
282
283 self.prepare()?.execute(None)
284 }
285
286 pub fn into_stream(
288 self,
289 ) -> VortexResult<impl Stream<Item = VortexResult<A>> + Send + 'static + use<A>> {
290 self.prepare()?.execute_stream(None)
291 }
292
293 pub fn into_iter<B: BlockingRuntime>(
295 self,
296 runtime: &B,
297 ) -> VortexResult<impl Iterator<Item = VortexResult<A>> + 'static> {
298 let stream = self.into_stream()?;
299 Ok(runtime.block_on_stream(stream))
300 }
301}
302
303pub(crate) fn filter_and_projection_masks(
307 projection: &Expression,
308 filter: Option<&Expression>,
309 dtype: &DType,
310) -> VortexResult<(Vec<FieldMask>, Vec<FieldMask>)> {
311 let Some(struct_dtype) = dtype.as_struct_fields_opt() else {
312 return Ok(match filter {
313 Some(_) => (vec![FieldMask::All], vec![FieldMask::All]),
314 None => (Vec::new(), vec![FieldMask::All]),
315 });
316 };
317 let projection_mask = immediate_scope_access(projection, struct_dtype);
318 Ok(match filter {
319 None => (
320 Vec::new(),
321 projection_mask.into_iter().map(to_field_mask).collect_vec(),
322 ),
323 Some(f) => {
324 let filter_mask = immediate_scope_access(f, struct_dtype);
325 let only_projection_mask = projection_mask
326 .difference(&filter_mask)
327 .cloned()
328 .map(to_field_mask)
329 .collect_vec();
330 (
331 filter_mask.into_iter().map(to_field_mask).collect_vec(),
332 only_projection_mask,
333 )
334 }
335 })
336}
337
338fn to_field_mask(field: FieldName) -> FieldMask {
339 FieldMask::Prefix(FieldPath::from(Field::Name(field)))
340}