1use std::ops::Range;
5use std::sync::Arc;
6
7use futures::Stream;
8use futures::future::BoxFuture;
9use itertools::Itertools;
10use vortex_array::ArrayRef;
11use vortex_array::expr::transform::immediate_access::immediate_scope_access;
12use vortex_array::expr::transform::simplify_typed;
13use vortex_array::expr::{Expression, root};
14use vortex_array::iter::{ArrayIterator, ArrayIteratorAdapter};
15use vortex_array::stats::StatsSet;
16use vortex_array::stream::{ArrayStream, ArrayStreamAdapter};
17use vortex_buffer::Buffer;
18use vortex_dtype::{DType, Field, FieldMask, FieldName, FieldPath};
19use vortex_error::{VortexResult, vortex_bail};
20use vortex_io::runtime::BlockingRuntime;
21use vortex_layout::layouts::row_idx::RowIdxLayoutReader;
22use vortex_layout::{LayoutReader, LayoutReaderRef};
23use vortex_metrics::VortexMetrics;
24use vortex_session::VortexSession;
25
26use crate::RepeatedScan;
27use crate::selection::Selection;
28use crate::split_by::SplitBy;
29use crate::splits::{Splits, attempt_split_ranges};
30
31pub struct ScanBuilder<A> {
33 session: VortexSession,
34 layout_reader: LayoutReaderRef,
35 projection: Expression,
36 filter: Option<Expression>,
37 ordered: bool,
39 row_range: Option<Range<u64>>,
41 selection: Selection,
44 split_by: SplitBy,
46 concurrency: usize,
48 map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
50 metrics: VortexMetrics,
51 file_stats: Option<Arc<[StatsSet]>>,
53 limit: Option<usize>,
55 row_offset: u64,
58}
59
60impl ScanBuilder<ArrayRef> {
61 pub fn new(session: VortexSession, layout_reader: Arc<dyn LayoutReader>) -> Self {
62 Self {
63 session,
64 layout_reader,
65 projection: root(),
66 filter: None,
67 ordered: true,
68 row_range: None,
69 selection: Default::default(),
70 split_by: SplitBy::Layout,
71 concurrency: 4,
74 map_fn: Arc::new(Ok),
75 metrics: Default::default(),
76 file_stats: None,
77 limit: None,
78 row_offset: 0,
79 }
80 }
81
82 pub fn into_array_stream(self) -> VortexResult<impl ArrayStream + Send + 'static> {
86 let dtype = self.dtype()?;
87 let stream = self.into_stream()?;
88 Ok(ArrayStreamAdapter::new(dtype, stream))
89 }
90
91 pub fn into_array_iter<B: BlockingRuntime>(
93 self,
94 runtime: &B,
95 ) -> VortexResult<impl ArrayIterator + 'static> {
96 let stream = self.into_array_stream()?;
97 let dtype = stream.dtype().clone();
98 Ok(ArrayIteratorAdapter::new(
99 dtype,
100 runtime.block_on_stream(stream),
101 ))
102 }
103}
104
105impl<A: 'static + Send> ScanBuilder<A> {
106 pub fn with_filter(mut self, filter: Expression) -> Self {
107 self.filter = Some(filter);
108 self
109 }
110
111 pub fn with_some_filter(mut self, filter: Option<Expression>) -> Self {
112 self.filter = filter;
113 self
114 }
115
116 pub fn with_projection(mut self, projection: Expression) -> Self {
117 self.projection = projection;
118 self
119 }
120
121 pub fn with_ordered(mut self, ordered: bool) -> Self {
122 self.ordered = ordered;
123 self
124 }
125
126 pub fn with_row_range(mut self, row_range: Range<u64>) -> Self {
127 self.row_range = Some(row_range);
128 self
129 }
130
131 pub fn with_selection(mut self, selection: Selection) -> Self {
132 self.selection = selection;
133 self
134 }
135
136 pub fn with_row_indices(mut self, row_indices: Buffer<u64>) -> Self {
137 self.selection = Selection::IncludeByIndex(row_indices);
138 self
139 }
140
141 pub fn with_row_offset(mut self, row_offset: u64) -> Self {
142 self.row_offset = row_offset;
143 self
144 }
145
146 pub fn with_split_by(mut self, split_by: SplitBy) -> Self {
147 self.split_by = split_by;
148 self
149 }
150
151 pub fn with_concurrency(mut self, concurrency: usize) -> Self {
154 assert!(concurrency > 0);
155 self.concurrency = concurrency;
156 self
157 }
158
159 pub fn with_metrics(mut self, metrics: VortexMetrics) -> Self {
160 self.metrics = metrics;
161 self
162 }
163
164 pub fn with_limit(mut self, limit: usize) -> Self {
165 self.limit = Some(limit);
166 self
167 }
168
169 pub fn dtype(&self) -> VortexResult<DType> {
171 self.projection.return_dtype(self.layout_reader.dtype())
172 }
173
174 pub fn map<B: 'static>(
176 self,
177 map_fn: impl Fn(A) -> VortexResult<B> + 'static + Send + Sync,
178 ) -> ScanBuilder<B> {
179 let old_map_fn = self.map_fn;
180 ScanBuilder {
181 session: self.session,
182 layout_reader: self.layout_reader,
183 projection: self.projection,
184 filter: self.filter,
185 ordered: self.ordered,
186 row_range: self.row_range,
187 selection: self.selection,
188 split_by: self.split_by,
189 concurrency: self.concurrency,
190 metrics: self.metrics,
191 file_stats: self.file_stats,
192 limit: self.limit,
193 row_offset: self.row_offset,
194 map_fn: Arc::new(move |a| old_map_fn(a).and_then(&map_fn)),
195 }
196 }
197
198 pub fn prepare(self) -> VortexResult<RepeatedScan<A>> {
199 let dtype = self.dtype()?;
200
201 if self.filter.is_some() && self.limit.is_some() {
202 vortex_bail!("Vortex doesn't support scans with both a filter and a limit")
203 }
204
205 let mut layout_reader = self.layout_reader;
208
209 layout_reader = Arc::new(RowIdxLayoutReader::new(self.row_offset, layout_reader));
213
214 let projection = simplify_typed(self.projection, layout_reader.dtype())?;
216 let filter = self
217 .filter
218 .map(|f| simplify_typed(f, layout_reader.dtype()))
219 .transpose()?;
220
221 let (filter_mask, projection_mask) =
223 filter_and_projection_masks(&projection, filter.as_ref(), layout_reader.dtype())?;
224 let field_mask: Vec<_> = [filter_mask, projection_mask].concat();
225
226 let splits =
227 if let Some(ranges) = attempt_split_ranges(&self.selection, self.row_range.as_ref()) {
228 Splits::Ranges(ranges)
229 } else {
230 let split_range = self
231 .row_range
232 .clone()
233 .unwrap_or_else(|| 0..layout_reader.row_count());
234 Splits::Natural(self.split_by.splits(
235 layout_reader.as_ref(),
236 &split_range,
237 &field_mask,
238 )?)
239 };
240
241 Ok(RepeatedScan::new(
242 self.session.clone(),
243 layout_reader,
244 projection,
245 filter,
246 self.ordered,
247 self.row_range,
248 self.selection,
249 splits,
250 self.concurrency,
251 self.map_fn,
252 self.limit,
253 dtype,
254 ))
255 }
256
257 pub fn build(self) -> VortexResult<Vec<BoxFuture<'static, VortexResult<Option<A>>>>> {
259 if self.limit.is_some_and(|l| l == 0) {
261 return Ok(vec![]);
262 }
263
264 self.prepare()?.execute(None)
265 }
266
267 pub fn into_stream(
269 self,
270 ) -> VortexResult<impl Stream<Item = VortexResult<A>> + Send + 'static + use<A>> {
271 self.prepare()?.execute_stream(None)
272 }
273
274 pub fn into_iter<B: BlockingRuntime>(
276 self,
277 runtime: &B,
278 ) -> VortexResult<impl Iterator<Item = VortexResult<A>> + 'static> {
279 let stream = self.into_stream()?;
280 Ok(runtime.block_on_stream(stream))
281 }
282}
283
284pub(crate) fn filter_and_projection_masks(
288 projection: &Expression,
289 filter: Option<&Expression>,
290 dtype: &DType,
291) -> VortexResult<(Vec<FieldMask>, Vec<FieldMask>)> {
292 let Some(struct_dtype) = dtype.as_struct_fields_opt() else {
293 return Ok(match filter {
294 Some(_) => (vec![FieldMask::All], vec![FieldMask::All]),
295 None => (Vec::new(), vec![FieldMask::All]),
296 });
297 };
298 let projection_mask = immediate_scope_access(projection, struct_dtype);
299 Ok(match filter {
300 None => (
301 Vec::new(),
302 projection_mask.into_iter().map(to_field_mask).collect_vec(),
303 ),
304 Some(f) => {
305 let filter_mask = immediate_scope_access(f, struct_dtype);
306 let only_projection_mask = projection_mask
307 .difference(&filter_mask)
308 .cloned()
309 .map(to_field_mask)
310 .collect_vec();
311 (
312 filter_mask.into_iter().map(to_field_mask).collect_vec(),
313 only_projection_mask,
314 )
315 }
316 })
317}
318
319fn to_field_mask(field: FieldName) -> FieldMask {
320 FieldMask::Prefix(FieldPath::from(Field::Name(field)))
321}