1use std::ops::Range;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::task::Context;
8use std::task::Poll;
9use std::task::ready;
10
11use futures::Stream;
12use futures::StreamExt;
13use futures::future::BoxFuture;
14use futures::stream::BoxStream;
15use itertools::Itertools;
16use vortex_array::ArrayRef;
17use vortex_array::dtype::DType;
18use vortex_array::dtype::Field;
19use vortex_array::dtype::FieldMask;
20use vortex_array::dtype::FieldName;
21use vortex_array::dtype::FieldPath;
22use vortex_array::expr::Expression;
23use vortex_array::expr::analysis::immediate_access::immediate_scope_access;
24use vortex_array::expr::root;
25use vortex_array::iter::ArrayIterator;
26use vortex_array::iter::ArrayIteratorAdapter;
27use vortex_array::stats::StatsSet;
28use vortex_array::stream::ArrayStream;
29use vortex_array::stream::ArrayStreamAdapter;
30use vortex_buffer::Buffer;
31use vortex_error::VortexExpect;
32use vortex_error::VortexResult;
33use vortex_error::vortex_bail;
34use vortex_io::runtime::BlockingRuntime;
35use vortex_io::runtime::Handle;
36use vortex_io::runtime::Task;
37use vortex_io::session::RuntimeSessionExt;
38use vortex_metrics::MetricsRegistry;
39use vortex_scan::selection::Selection;
40use vortex_session::VortexSession;
41use vortex_utils::parallelism::get_available_parallelism;
42
43use crate::LayoutReader;
44use crate::LayoutReaderRef;
45use crate::layouts::row_idx::RowIdxLayoutReader;
46use crate::scan::repeated_scan::RepeatedScan;
47use crate::scan::split_by::SplitBy;
48use crate::scan::splits::Splits;
49use crate::scan::splits::attempt_split_ranges;
50
/// Builder for a scan over a [`LayoutReader`], configurable with a projection, an
/// optional filter, row restrictions, splitting strategy, concurrency and a mapping
/// function applied to each produced array.
pub struct ScanBuilder<A> {
    /// Session used to obtain the runtime handle that scan work is spawned on; it is
    /// also handed to the row-index reader and to the prepared scan.
    session: VortexSession,
    /// Reader for the layout being scanned.
    layout_reader: LayoutReaderRef,
    /// Projection expression; the scan's output dtype is its return dtype.
    projection: Expression,
    /// Optional filter expression. `prepare` rejects it in combination with `limit`.
    filter: Option<Expression>,
    /// Whether results are yielded in task order (`buffered`) rather than completion
    /// order (`buffer_unordered`) by the stream path.
    ordered: bool,
    /// Optional restriction of the scan to a range of rows.
    row_range: Option<Range<u64>>,
    /// Row selection applied by the scan.
    selection: Selection,
    /// Strategy for computing natural splits when explicit split ranges cannot be
    /// derived from `selection` and `row_range`.
    split_by: SplitBy,
    /// Concurrency setting; must be non-zero (enforced by `with_concurrency`).
    concurrency: usize,
    /// Function applied to every array produced by the scan.
    map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
    /// Optional metrics registry.
    /// NOTE(review): not forwarded by `prepare` in the code visible here — confirm use.
    metrics_registry: Option<Arc<dyn MetricsRegistry>>,
    /// Optional per-file statistics.
    /// NOTE(review): no setter or consumer visible here — confirm use.
    file_stats: Option<Arc<[StatsSet]>>,
    /// Optional limit forwarded to the prepared scan; `Some(0)` makes `build` return
    /// no tasks.
    limit: Option<u64>,
    /// Offset passed to the `RowIdxLayoutReader` that wraps `layout_reader`.
    row_offset: u64,
}
79
80impl ScanBuilder<ArrayRef> {
81 pub fn new(session: VortexSession, layout_reader: Arc<dyn LayoutReader>) -> Self {
82 Self {
83 session,
84 layout_reader,
85 projection: root(),
86 filter: None,
87 ordered: true,
88 row_range: None,
89 selection: Default::default(),
90 split_by: SplitBy::Layout,
91 concurrency: 4,
94 map_fn: Arc::new(Ok),
95 metrics_registry: None,
96 file_stats: None,
97 limit: None,
98 row_offset: 0,
99 }
100 }
101
102 pub fn into_array_stream(self) -> VortexResult<impl ArrayStream + Send + 'static> {
106 let dtype = self.dtype()?;
107 let stream = self.into_stream()?;
108 Ok(ArrayStreamAdapter::new(dtype, stream))
109 }
110
111 pub fn into_array_iter<B: BlockingRuntime>(
113 self,
114 runtime: &B,
115 ) -> VortexResult<impl ArrayIterator + 'static> {
116 let stream = self.into_array_stream()?;
117 let dtype = stream.dtype().clone();
118 Ok(ArrayIteratorAdapter::new(
119 dtype,
120 runtime.block_on_stream(stream),
121 ))
122 }
123}
124
125impl<A: 'static + Send> ScanBuilder<A> {
126 pub fn with_filter(mut self, filter: Expression) -> Self {
127 self.filter = Some(filter);
128 self
129 }
130
131 pub fn with_some_filter(mut self, filter: Option<Expression>) -> Self {
132 self.filter = filter;
133 self
134 }
135
136 pub fn with_projection(mut self, projection: Expression) -> Self {
137 self.projection = projection;
138 self
139 }
140
141 pub fn ordered(&self) -> bool {
142 self.ordered
143 }
144
145 pub fn with_ordered(mut self, ordered: bool) -> Self {
146 self.ordered = ordered;
147 self
148 }
149
150 pub fn with_row_range(mut self, row_range: Range<u64>) -> Self {
151 self.row_range = Some(row_range);
152 self
153 }
154
155 pub fn with_selection(mut self, selection: Selection) -> Self {
156 self.selection = selection;
157 self
158 }
159
160 pub fn with_row_indices(mut self, row_indices: Buffer<u64>) -> Self {
161 self.selection = Selection::IncludeByIndex(row_indices);
162 self
163 }
164
165 pub fn with_row_offset(mut self, row_offset: u64) -> Self {
166 self.row_offset = row_offset;
167 self
168 }
169
170 pub fn with_split_by(mut self, split_by: SplitBy) -> Self {
171 self.split_by = split_by;
172 self
173 }
174
175 pub fn concurrency(&self) -> usize {
176 self.concurrency
177 }
178
179 pub fn with_concurrency(mut self, concurrency: usize) -> Self {
182 assert!(concurrency > 0);
183 self.concurrency = concurrency;
184 self
185 }
186
187 pub fn with_some_metrics_registry(mut self, metrics: Option<Arc<dyn MetricsRegistry>>) -> Self {
188 self.metrics_registry = metrics;
189 self
190 }
191
192 pub fn with_metrics_registry(mut self, metrics: Arc<dyn MetricsRegistry>) -> Self {
193 self.metrics_registry = Some(metrics);
194 self
195 }
196
197 pub fn with_some_limit(mut self, limit: Option<u64>) -> Self {
198 self.limit = limit;
199 self
200 }
201
202 pub fn with_limit(mut self, limit: u64) -> Self {
203 self.limit = Some(limit);
204 self
205 }
206
207 pub fn dtype(&self) -> VortexResult<DType> {
209 self.projection.return_dtype(self.layout_reader.dtype())
210 }
211
212 pub fn session(&self) -> &VortexSession {
214 &self.session
215 }
216
217 pub fn map<B: 'static>(
219 self,
220 map_fn: impl Fn(A) -> VortexResult<B> + 'static + Send + Sync,
221 ) -> ScanBuilder<B> {
222 let old_map_fn = self.map_fn;
223 ScanBuilder {
224 session: self.session,
225 layout_reader: self.layout_reader,
226 projection: self.projection,
227 filter: self.filter,
228 ordered: self.ordered,
229 row_range: self.row_range,
230 selection: self.selection,
231 split_by: self.split_by,
232 concurrency: self.concurrency,
233 metrics_registry: self.metrics_registry,
234 file_stats: self.file_stats,
235 limit: self.limit,
236 row_offset: self.row_offset,
237 map_fn: Arc::new(move |a| old_map_fn(a).and_then(&map_fn)),
238 }
239 }
240
241 pub fn prepare(self) -> VortexResult<RepeatedScan<A>> {
242 let dtype = self.dtype()?;
243
244 if self.filter.is_some() && self.limit.is_some() {
245 vortex_bail!("Vortex doesn't support scans with both a filter and a limit")
246 }
247
248 let mut layout_reader = self.layout_reader;
251
252 layout_reader = Arc::new(RowIdxLayoutReader::new(
256 self.row_offset,
257 layout_reader,
258 self.session.clone(),
259 ));
260
261 let projection = self.projection.optimize_recursive(layout_reader.dtype())?;
263
264 let filter = self
265 .filter
266 .map(|f| f.optimize_recursive(layout_reader.dtype()))
267 .transpose()?;
268
269 let (filter_mask, projection_mask) =
271 filter_and_projection_masks(&projection, filter.as_ref(), layout_reader.dtype())?;
272 let field_mask: Vec<_> = [filter_mask, projection_mask].concat();
273
274 let splits =
275 if let Some(ranges) = attempt_split_ranges(&self.selection, self.row_range.as_ref()) {
276 Splits::Ranges(ranges)
277 } else {
278 let split_range = self
279 .row_range
280 .clone()
281 .unwrap_or_else(|| 0..layout_reader.row_count());
282 Splits::Natural(self.split_by.splits(
283 layout_reader.as_ref(),
284 &split_range,
285 &field_mask,
286 )?)
287 };
288
289 Ok(RepeatedScan::new(
290 self.session.clone(),
291 layout_reader,
292 projection,
293 filter,
294 self.ordered,
295 self.row_range,
296 self.selection,
297 splits,
298 self.concurrency,
299 self.map_fn,
300 self.limit,
301 dtype,
302 ))
303 }
304
305 pub fn build(self) -> VortexResult<Vec<BoxFuture<'static, VortexResult<Option<A>>>>> {
307 if self.limit.is_some_and(|l| l == 0) {
309 return Ok(vec![]);
310 }
311
312 self.prepare()?.execute(None)
313 }
314
315 pub fn into_stream(
317 self,
318 ) -> VortexResult<impl Stream<Item = VortexResult<A>> + Send + 'static + use<A>> {
319 Ok(LazyScanStream::new(self))
320 }
321
322 pub fn into_iter<B: BlockingRuntime>(
324 self,
325 runtime: &B,
326 ) -> VortexResult<impl Iterator<Item = VortexResult<A>> + 'static> {
327 let stream = self.into_stream()?;
328 Ok(runtime.block_on_stream(stream))
329 }
330}
331
/// States of [`LazyScanStream`], normally visited in declaration order
/// (`Builder` -> `Preparing` -> `Stream`, or `Preparing` -> `Error` on failure).
enum LazyScanState<A: 'static + Send> {
    /// Not yet polled; the first `poll_next` takes the builder out of the `Option`.
    Builder(Option<Box<ScanBuilder<A>>>),
    /// Waiting on the blocking task that prepares the scan tasks.
    Preparing(PreparingScan<A>),
    /// Yielding the results of the spawned scan tasks.
    Stream(BoxStream<'static, VortexResult<A>>),
    /// Preparation failed; the error is yielded once (taken), after which the stream ends.
    Error(Option<vortex_error::VortexError>),
}

/// Task futures produced by preparing and executing a scan.
type PreparedScanTasks<A> = Vec<BoxFuture<'static, VortexResult<Option<A>>>>;

/// Data carried from the `Builder` state into the `Preparing` state.
struct PreparingScan<A: 'static + Send> {
    // Selects `buffered` (true) vs `buffer_unordered` (false) for the result stream.
    ordered: bool,
    // Maximum number of task futures buffered at once.
    concurrency: usize,
    // Handle each prepared task future is spawned on.
    handle: Handle,
    // Blocking task producing the prepared task futures.
    task: Task<VortexResult<PreparedScanTasks<A>>>,
}

/// Stream returned by `ScanBuilder::into_stream`; defers all work to its first poll.
struct LazyScanStream<A: 'static + Send> {
    state: LazyScanState<A>,
}
351
352impl<A: 'static + Send> LazyScanStream<A> {
353 fn new(builder: ScanBuilder<A>) -> Self {
354 Self {
355 state: LazyScanState::Builder(Some(Box::new(builder))),
356 }
357 }
358}
359
360impl<A: 'static + Send> Unpin for LazyScanStream<A> {}
361
362impl<A: 'static + Send> Stream for LazyScanStream<A> {
363 type Item = VortexResult<A>;
364
365 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
366 loop {
367 match &mut self.state {
368 LazyScanState::Builder(builder) => {
369 let builder = builder.take().vortex_expect("polled after completion");
370 let ordered = builder.ordered;
371 let num_workers = get_available_parallelism().unwrap_or(1);
372 let concurrency = builder.concurrency * num_workers;
373 let handle = builder.session.handle();
374 let task = handle.spawn_blocking(move || {
375 builder.prepare().and_then(|scan| scan.execute(None))
376 });
377 self.state = LazyScanState::Preparing(PreparingScan {
378 ordered,
379 concurrency,
380 handle,
381 task,
382 });
383 }
384 LazyScanState::Preparing(preparing) => {
385 match ready!(Pin::new(&mut preparing.task).poll(cx)) {
386 Ok(tasks) => {
387 let ordered = preparing.ordered;
388 let concurrency = preparing.concurrency;
389 let handle = preparing.handle.clone();
390 let stream =
391 futures::stream::iter(tasks).map(move |task| handle.spawn(task));
392 let stream = if ordered {
393 stream.buffered(concurrency).boxed()
394 } else {
395 stream.buffer_unordered(concurrency).boxed()
396 };
397 let stream = stream
398 .filter_map(|chunk| async move { chunk.transpose() })
399 .boxed();
400 self.state = LazyScanState::Stream(stream);
401 }
402 Err(err) => self.state = LazyScanState::Error(Some(err)),
403 }
404 }
405 LazyScanState::Stream(stream) => return stream.as_mut().poll_next(cx),
406 LazyScanState::Error(err) => return Poll::Ready(err.take().map(Err)),
407 }
408 }
409 }
410}
411
412pub fn filter_and_projection_masks(
416 projection: &Expression,
417 filter: Option<&Expression>,
418 dtype: &DType,
419) -> VortexResult<(Vec<FieldMask>, Vec<FieldMask>)> {
420 let Some(struct_dtype) = dtype.as_struct_fields_opt() else {
421 return Ok(match filter {
422 Some(_) => (vec![FieldMask::All], vec![FieldMask::All]),
423 None => (Vec::new(), vec![FieldMask::All]),
424 });
425 };
426 let projection_mask = immediate_scope_access(projection, struct_dtype);
427 Ok(match filter {
428 None => (
429 Vec::new(),
430 projection_mask.into_iter().map(to_field_mask).collect_vec(),
431 ),
432 Some(f) => {
433 let filter_mask = immediate_scope_access(f, struct_dtype);
434 let only_projection_mask = projection_mask
435 .difference(&filter_mask)
436 .cloned()
437 .map(to_field_mask)
438 .collect_vec();
439 (
440 filter_mask.into_iter().map(to_field_mask).collect_vec(),
441 only_projection_mask,
442 )
443 }
444 })
445}
446
447fn to_field_mask(field: FieldName) -> FieldMask {
448 FieldMask::Prefix(FieldPath::from(Field::Name(field)))
449}
450
#[cfg(test)]
mod test {
    use std::collections::BTreeSet;
    use std::ops::Range;
    use std::pin::Pin;
    use std::sync::Arc;
    use std::sync::atomic::AtomicUsize;
    use std::sync::atomic::Ordering;
    use std::sync::mpsc;
    use std::task::Context;
    use std::task::Poll;
    use std::time::Duration;

    use futures::Stream;
    use futures::task::noop_waker_ref;
    use parking_lot::Mutex;
    use vortex_array::IntoArray;
    use vortex_array::LEGACY_SESSION;
    use vortex_array::MaskFuture;
    use vortex_array::VortexSessionExecute;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::FieldMask;
    use vortex_array::dtype::Nullability;
    use vortex_array::dtype::PType;
    use vortex_array::expr::Expression;
    use vortex_error::VortexResult;
    use vortex_error::vortex_err;
    use vortex_io::runtime::BlockingRuntime;
    use vortex_io::runtime::single::SingleThreadRuntime;
    use vortex_mask::Mask;

    use super::ScanBuilder;
    use crate::ArrayFuture;
    use crate::LayoutReader;
    use crate::SplitRange;
    use crate::scan::test::SCAN_SESSION;
    use crate::scan::test::session_with_handle;

    /// One-row, non-nullable i32 reader that counts `register_splits` calls. Its
    /// evaluation methods panic if actually used, so any unexpected scan work fails
    /// the test.
    #[derive(Debug)]
    struct CountingLayoutReader {
        name: Arc<str>,
        dtype: DType,
        row_count: u64,
        register_splits_calls: Arc<AtomicUsize>,
    }

    impl CountingLayoutReader {
        fn new(register_splits_calls: Arc<AtomicUsize>) -> Self {
            Self {
                name: Arc::from("counting"),
                dtype: DType::Primitive(PType::I32, Nullability::NonNullable),
                row_count: 1,
                register_splits_calls,
            }
        }
    }

    impl LayoutReader for CountingLayoutReader {
        fn name(&self) -> &Arc<str> {
            &self.name
        }

        fn dtype(&self) -> &DType {
            &self.dtype
        }

        fn row_count(&self) -> u64 {
            self.row_count
        }

        // Records the call and registers a single split at the end of the root range.
        fn register_splits(
            &self,
            _field_mask: &[FieldMask],
            split_range: &SplitRange,
            splits: &mut BTreeSet<u64>,
        ) -> VortexResult<()> {
            self.register_splits_calls.fetch_add(1, Ordering::Relaxed);
            splits.insert(split_range.root_row_range().end);
            Ok(())
        }

        fn pruning_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            _mask: Mask,
        ) -> VortexResult<MaskFuture> {
            unimplemented!("not needed for this test");
        }

        fn filter_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            _mask: MaskFuture,
        ) -> VortexResult<MaskFuture> {
            unimplemented!("not needed for this test");
        }

        // Returns a future that panics only if it is polled.
        fn projection_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            _mask: MaskFuture,
        ) -> VortexResult<ArrayFuture> {
            Ok(Box::pin(async move {
                unreachable!("scan should not be polled in this test")
            }))
        }

        fn as_any(&self) -> &dyn std::any::Any {
            self
        }
    }

    /// Creating the stream must not touch the layout: `register_splits` is never called.
    #[test]
    fn into_stream_is_lazy() {
        let calls = Arc::new(AtomicUsize::new(0));
        let reader = Arc::new(CountingLayoutReader::new(Arc::clone(&calls)));

        let session = SCAN_SESSION.clone();

        let _stream = ScanBuilder::new(session, reader).into_stream().unwrap();

        assert_eq!(calls.load(Ordering::Relaxed), 0);
    }

    /// Four-row, non-nullable i32 reader that registers a split after every row and
    /// projects each row range to the i32 values of its row indices.
    #[derive(Debug)]
    struct SplittingLayoutReader {
        name: Arc<str>,
        dtype: DType,
        row_count: u64,
        register_splits_calls: Arc<AtomicUsize>,
    }

    impl SplittingLayoutReader {
        fn new(register_splits_calls: Arc<AtomicUsize>) -> Self {
            Self {
                name: Arc::from("splitting"),
                dtype: DType::Primitive(PType::I32, Nullability::NonNullable),
                row_count: 4,
                register_splits_calls,
            }
        }
    }

    impl LayoutReader for SplittingLayoutReader {
        fn name(&self) -> &Arc<str> {
            &self.name
        }

        fn dtype(&self) -> &DType {
            &self.dtype
        }

        fn row_count(&self) -> u64 {
            self.row_count
        }

        // Inserts a split boundary after every row of the range, shifted by the row offset.
        fn register_splits(
            &self,
            _field_mask: &[FieldMask],
            split_range: &SplitRange,
            splits: &mut BTreeSet<u64>,
        ) -> VortexResult<()> {
            self.register_splits_calls.fetch_add(1, Ordering::Relaxed);
            for split in (split_range.row_range().start + 1)..=split_range.row_range().end {
                splits.insert(split_range.row_offset() + split);
            }
            Ok(())
        }

        // Passes the incoming mask through unchanged.
        fn pruning_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            mask: Mask,
        ) -> VortexResult<MaskFuture> {
            Ok(MaskFuture::ready(mask))
        }

        // Passes the incoming mask through unchanged.
        fn filter_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            mask: MaskFuture,
        ) -> VortexResult<MaskFuture> {
            Ok(mask)
        }

        // Produces an i32 array containing the row indices `start..end`.
        fn projection_evaluation(
            &self,
            row_range: &Range<u64>,
            _expr: &Expression,
            _mask: MaskFuture,
        ) -> VortexResult<ArrayFuture> {
            let start = usize::try_from(row_range.start)
                .map_err(|_| vortex_err!("row_range.start must fit in usize"))?;
            let end = usize::try_from(row_range.end)
                .map_err(|_| vortex_err!("row_range.end must fit in usize"))?;

            let values: VortexResult<Vec<i32>> = (start..end)
                .map(|v| i32::try_from(v).map_err(|_| vortex_err!("split value must fit in i32")))
                .collect();

            let array = PrimitiveArray::from_iter(values?).into_array();
            Ok(Box::pin(async move { Ok(array) }))
        }

        fn as_any(&self) -> &dyn std::any::Any {
            self
        }
    }

    /// Driving the stream prepares once (one `register_splits` call) and yields one
    /// single-row chunk per split, in order.
    #[test]
    fn into_stream_executes_after_prepare() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let calls = Arc::new(AtomicUsize::new(0));
        let reader = Arc::new(SplittingLayoutReader::new(Arc::clone(&calls)));

        let runtime = SingleThreadRuntime::default();
        let session = session_with_handle(runtime.handle());

        let stream = ScanBuilder::new(session, reader).into_stream()?;
        let mut iter = runtime.block_on_stream(stream);

        let mut values = Vec::new();
        for chunk in &mut iter {
            let prim = chunk?.execute::<PrimitiveArray>(&mut ctx)?;
            // Each chunk covers exactly one row, so only its first value is collected.
            values.push(prim.into_buffer::<i32>()[0]);
        }

        assert_eq!(calls.load(Ordering::Relaxed), 1);
        assert_eq!(values.as_ref(), [0, 1, 2, 3]);

        Ok(())
    }

    /// One-row reader whose `register_splits` blocks on `gate` (after bumping the
    /// counter) until the test releases it.
    #[derive(Debug)]
    struct BlockingSplitsLayoutReader {
        name: Arc<str>,
        dtype: DType,
        row_count: u64,
        register_splits_calls: Arc<AtomicUsize>,
        gate: Arc<Mutex<()>>,
    }

    impl BlockingSplitsLayoutReader {
        fn new(gate: Arc<Mutex<()>>, register_splits_calls: Arc<AtomicUsize>) -> Self {
            Self {
                name: Arc::from("blocking-splits"),
                dtype: DType::Primitive(PType::I32, Nullability::NonNullable),
                row_count: 1,
                register_splits_calls,
                gate,
            }
        }
    }

    impl LayoutReader for BlockingSplitsLayoutReader {
        fn name(&self) -> &Arc<str> {
            &self.name
        }

        fn dtype(&self) -> &DType {
            &self.dtype
        }

        fn row_count(&self) -> u64 {
            self.row_count
        }

        // The counter is incremented before waiting on the gate, so a count of zero
        // means this method was never entered.
        fn register_splits(
            &self,
            _field_mask: &[FieldMask],
            split_range: &SplitRange,
            splits: &mut BTreeSet<u64>,
        ) -> VortexResult<()> {
            self.register_splits_calls.fetch_add(1, Ordering::Relaxed);
            let _guard = self.gate.lock();
            splits.insert(split_range.root_row_range().end);
            Ok(())
        }

        fn pruning_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            _mask: Mask,
        ) -> VortexResult<MaskFuture> {
            unimplemented!("not needed for this test");
        }

        fn filter_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            _mask: MaskFuture,
        ) -> VortexResult<MaskFuture> {
            unimplemented!("not needed for this test");
        }

        fn projection_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            _mask: MaskFuture,
        ) -> VortexResult<ArrayFuture> {
            Ok(Box::pin(async move {
                unreachable!("scan should not be polled in this test")
            }))
        }

        fn as_any(&self) -> &dyn std::any::Any {
            self
        }
    }

    /// The first `poll_next` must return `Poll::Pending` promptly rather than running
    /// the (blocked) split computation inline.
    #[test]
    fn into_stream_first_poll_does_not_block() {
        // Hold the gate so any `register_splits` call would block.
        let gate = Arc::new(Mutex::new(()));
        let guard = gate.lock();

        let calls = Arc::new(AtomicUsize::new(0));
        let reader = Arc::new(BlockingSplitsLayoutReader::new(
            Arc::clone(&gate),
            Arc::clone(&calls),
        ));

        let runtime = SingleThreadRuntime::default();
        let session = session_with_handle(runtime.handle());

        let mut stream = ScanBuilder::new(session, reader).into_stream().unwrap();

        // Poll once on another thread so a blocking poll can be detected with a timeout
        // instead of hanging the test thread.
        let (send, recv) = mpsc::channel::<bool>();
        let join = std::thread::spawn(move || {
            let waker = noop_waker_ref();
            let mut cx = Context::from_waker(waker);
            let poll = Pin::new(&mut stream).poll_next(&mut cx);
            let _ = send.send(matches!(poll, Poll::Pending));
        });

        let polled_pending = recv.recv_timeout(Duration::from_secs(1)).ok();

        // Release the gate before joining so a blocked poll can finish and the join
        // cannot deadlock.
        drop(guard);
        drop(join.join());

        let polled_pending = polled_pending.expect("poll_next blocked; expected quick return");
        assert!(
            polled_pending,
            "expected Poll::Pending while prepare is blocked"
        );
        // The single-threaded runtime was never driven, so the spawned prepare task
        // has not run `register_splits`.
        assert_eq!(calls.load(Ordering::Relaxed), 0);

        drop(runtime);
    }
}
809}