1use std::ops::Range;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::task::Context;
8use std::task::Poll;
9use std::task::ready;
10
11use futures::Stream;
12use futures::StreamExt;
13use futures::future::BoxFuture;
14use futures::stream::BoxStream;
15use itertools::Itertools;
16use vortex_array::ArrayRef;
17use vortex_array::dtype::DType;
18use vortex_array::dtype::Field;
19use vortex_array::dtype::FieldMask;
20use vortex_array::dtype::FieldName;
21use vortex_array::dtype::FieldPath;
22use vortex_array::expr::Expression;
23use vortex_array::expr::analysis::immediate_access::immediate_scope_access;
24use vortex_array::expr::root;
25use vortex_array::iter::ArrayIterator;
26use vortex_array::iter::ArrayIteratorAdapter;
27use vortex_array::stats::StatsSet;
28use vortex_array::stream::ArrayStream;
29use vortex_array::stream::ArrayStreamAdapter;
30use vortex_buffer::Buffer;
31use vortex_error::VortexExpect;
32use vortex_error::VortexResult;
33use vortex_error::vortex_bail;
34use vortex_io::runtime::BlockingRuntime;
35use vortex_io::runtime::Handle;
36use vortex_io::runtime::Task;
37use vortex_io::session::RuntimeSessionExt;
38use vortex_metrics::MetricsRegistry;
39use vortex_scan::selection::Selection;
40use vortex_session::VortexSession;
41use vortex_utils::parallelism::get_available_parallelism;
42
43use crate::LayoutReader;
44use crate::LayoutReaderRef;
45use crate::layouts::row_idx::RowIdxLayoutReader;
46use crate::scan::repeated_scan::RepeatedScan;
47use crate::scan::split_by::SplitBy;
48use crate::scan::splits::Splits;
49use crate::scan::splits::attempt_split_ranges;
50
51pub struct ScanBuilder<A> {
53 session: VortexSession,
54 layout_reader: LayoutReaderRef,
55 projection: Expression,
56 filter: Option<Expression>,
57 ordered: bool,
59 row_range: Option<Range<u64>>,
61 selection: Selection,
64 split_by: SplitBy,
66 concurrency: usize,
68 map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
70 metrics_registry: Option<Arc<dyn MetricsRegistry>>,
71 file_stats: Option<Arc<[StatsSet]>>,
73 limit: Option<u64>,
75 row_offset: u64,
78}
79
80impl ScanBuilder<ArrayRef> {
81 pub fn new(session: VortexSession, layout_reader: Arc<dyn LayoutReader>) -> Self {
82 Self {
83 session,
84 layout_reader,
85 projection: root(),
86 filter: None,
87 ordered: true,
88 row_range: None,
89 selection: Default::default(),
90 split_by: SplitBy::Layout,
91 concurrency: 4,
94 map_fn: Arc::new(Ok),
95 metrics_registry: None,
96 file_stats: None,
97 limit: None,
98 row_offset: 0,
99 }
100 }
101
102 pub fn into_array_stream(self) -> VortexResult<impl ArrayStream + Send + 'static> {
106 let dtype = self.dtype()?;
107 let stream = self.into_stream()?;
108 Ok(ArrayStreamAdapter::new(dtype, stream))
109 }
110
111 pub fn into_array_iter<B: BlockingRuntime>(
113 self,
114 runtime: &B,
115 ) -> VortexResult<impl ArrayIterator + 'static> {
116 let stream = self.into_array_stream()?;
117 let dtype = stream.dtype().clone();
118 Ok(ArrayIteratorAdapter::new(
119 dtype,
120 runtime.block_on_stream(stream),
121 ))
122 }
123}
124
125impl<A: 'static + Send> ScanBuilder<A> {
126 pub fn with_filter(mut self, filter: Expression) -> Self {
127 self.filter = Some(filter);
128 self
129 }
130
131 pub fn with_some_filter(mut self, filter: Option<Expression>) -> Self {
132 self.filter = filter;
133 self
134 }
135
136 pub fn with_projection(mut self, projection: Expression) -> Self {
137 self.projection = projection;
138 self
139 }
140
141 pub fn ordered(&self) -> bool {
142 self.ordered
143 }
144
145 pub fn with_ordered(mut self, ordered: bool) -> Self {
146 self.ordered = ordered;
147 self
148 }
149
150 pub fn with_row_range(mut self, row_range: Range<u64>) -> Self {
151 self.row_range = Some(row_range);
152 self
153 }
154
155 pub fn with_selection(mut self, selection: Selection) -> Self {
156 self.selection = selection;
157 self
158 }
159
160 pub fn with_row_indices(mut self, row_indices: Buffer<u64>) -> Self {
161 self.selection = Selection::IncludeByIndex(row_indices);
162 self
163 }
164
165 pub fn with_row_offset(mut self, row_offset: u64) -> Self {
166 self.row_offset = row_offset;
167 self
168 }
169
170 pub fn with_split_by(mut self, split_by: SplitBy) -> Self {
171 self.split_by = split_by;
172 self
173 }
174
175 pub fn concurrency(&self) -> usize {
176 self.concurrency
177 }
178
179 pub fn with_concurrency(mut self, concurrency: usize) -> Self {
182 assert!(concurrency > 0);
183 self.concurrency = concurrency;
184 self
185 }
186
187 pub fn with_some_metrics_registry(mut self, metrics: Option<Arc<dyn MetricsRegistry>>) -> Self {
188 self.metrics_registry = metrics;
189 self
190 }
191
192 pub fn with_metrics_registry(mut self, metrics: Arc<dyn MetricsRegistry>) -> Self {
193 self.metrics_registry = Some(metrics);
194 self
195 }
196
197 pub fn with_some_limit(mut self, limit: Option<u64>) -> Self {
198 self.limit = limit;
199 self
200 }
201
202 pub fn with_limit(mut self, limit: u64) -> Self {
203 self.limit = Some(limit);
204 self
205 }
206
207 pub fn dtype(&self) -> VortexResult<DType> {
209 self.projection.return_dtype(self.layout_reader.dtype())
210 }
211
212 pub fn session(&self) -> &VortexSession {
214 &self.session
215 }
216
217 pub fn map<B: 'static>(
219 self,
220 map_fn: impl Fn(A) -> VortexResult<B> + 'static + Send + Sync,
221 ) -> ScanBuilder<B> {
222 let old_map_fn = self.map_fn;
223 ScanBuilder {
224 session: self.session,
225 layout_reader: self.layout_reader,
226 projection: self.projection,
227 filter: self.filter,
228 ordered: self.ordered,
229 row_range: self.row_range,
230 selection: self.selection,
231 split_by: self.split_by,
232 concurrency: self.concurrency,
233 metrics_registry: self.metrics_registry,
234 file_stats: self.file_stats,
235 limit: self.limit,
236 row_offset: self.row_offset,
237 map_fn: Arc::new(move |a| old_map_fn(a).and_then(&map_fn)),
238 }
239 }
240
241 pub fn prepare(self) -> VortexResult<RepeatedScan<A>> {
242 let dtype = self.dtype()?;
243
244 if self.filter.is_some() && self.limit.is_some() {
245 vortex_bail!("Vortex doesn't support scans with both a filter and a limit")
246 }
247
248 let mut layout_reader = self.layout_reader;
251
252 layout_reader = Arc::new(RowIdxLayoutReader::new(
256 self.row_offset,
257 layout_reader,
258 self.session.clone(),
259 ));
260
261 let projection = self.projection.optimize_recursive(layout_reader.dtype())?;
263
264 let filter = self
265 .filter
266 .map(|f| f.optimize_recursive(layout_reader.dtype()))
267 .transpose()?;
268
269 let (filter_mask, projection_mask) =
271 filter_and_projection_masks(&projection, filter.as_ref(), layout_reader.dtype())?;
272 let field_mask: Vec<_> = [filter_mask, projection_mask].concat();
273
274 let splits =
275 if let Some(ranges) = attempt_split_ranges(&self.selection, self.row_range.as_ref()) {
276 Splits::Ranges(ranges)
277 } else {
278 let split_range = self
279 .row_range
280 .clone()
281 .unwrap_or_else(|| 0..layout_reader.row_count());
282 Splits::Natural(self.split_by.splits(
283 layout_reader.as_ref(),
284 &split_range,
285 &field_mask,
286 )?)
287 };
288
289 Ok(RepeatedScan::new(
290 self.session.clone(),
291 layout_reader,
292 projection,
293 filter,
294 self.ordered,
295 self.row_range,
296 self.selection,
297 splits,
298 self.concurrency,
299 self.map_fn,
300 self.limit,
301 dtype,
302 ))
303 }
304
305 pub fn build(self) -> VortexResult<Vec<BoxFuture<'static, VortexResult<Option<A>>>>> {
307 if self.limit.is_some_and(|l| l == 0) {
309 return Ok(vec![]);
310 }
311
312 self.prepare()?.execute(None)
313 }
314
315 pub fn into_stream(
317 self,
318 ) -> VortexResult<impl Stream<Item = VortexResult<A>> + Send + 'static + use<A>> {
319 Ok(LazyScanStream::new(self))
320 }
321
322 pub fn into_iter<B: BlockingRuntime>(
324 self,
325 runtime: &B,
326 ) -> VortexResult<impl Iterator<Item = VortexResult<A>> + 'static> {
327 let stream = self.into_stream()?;
328 Ok(runtime.block_on_stream(stream))
329 }
330}
331
332enum LazyScanState<A: 'static + Send> {
333 Builder(Option<Box<ScanBuilder<A>>>),
334 Preparing(PreparingScan<A>),
335 Stream(BoxStream<'static, VortexResult<A>>),
336 Error(Option<vortex_error::VortexError>),
337}
338
339type PreparedScanTasks<A> = Vec<BoxFuture<'static, VortexResult<Option<A>>>>;
340
341struct PreparingScan<A: 'static + Send> {
342 ordered: bool,
343 concurrency: usize,
344 handle: Handle,
345 task: Task<VortexResult<PreparedScanTasks<A>>>,
346}
347
348struct LazyScanStream<A: 'static + Send> {
349 state: LazyScanState<A>,
350}
351
352impl<A: 'static + Send> LazyScanStream<A> {
353 fn new(builder: ScanBuilder<A>) -> Self {
354 Self {
355 state: LazyScanState::Builder(Some(Box::new(builder))),
356 }
357 }
358}
359
360impl<A: 'static + Send> Unpin for LazyScanStream<A> {}
361
362impl<A: 'static + Send> Stream for LazyScanStream<A> {
363 type Item = VortexResult<A>;
364
365 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
366 loop {
367 match &mut self.state {
368 LazyScanState::Builder(builder) => {
369 let builder = builder.take().vortex_expect("polled after completion");
370 let ordered = builder.ordered;
371 let num_workers = get_available_parallelism().unwrap_or(1);
372 let concurrency = builder.concurrency * num_workers;
373 let handle = builder.session.handle();
374 let task = handle.spawn_blocking(move || {
375 builder.prepare().and_then(|scan| scan.execute(None))
376 });
377 self.state = LazyScanState::Preparing(PreparingScan {
378 ordered,
379 concurrency,
380 handle,
381 task,
382 });
383 }
384 LazyScanState::Preparing(preparing) => {
385 match ready!(Pin::new(&mut preparing.task).poll(cx)) {
386 Ok(tasks) => {
387 let ordered = preparing.ordered;
388 let concurrency = preparing.concurrency;
389 let handle = preparing.handle.clone();
390 let stream =
391 futures::stream::iter(tasks).map(move |task| handle.spawn(task));
392 let stream = if ordered {
393 stream.buffered(concurrency).boxed()
394 } else {
395 stream.buffer_unordered(concurrency).boxed()
396 };
397 let stream = stream
398 .filter_map(|chunk| async move { chunk.transpose() })
399 .boxed();
400 self.state = LazyScanState::Stream(stream);
401 }
402 Err(err) => self.state = LazyScanState::Error(Some(err)),
403 }
404 }
405 LazyScanState::Stream(stream) => return stream.as_mut().poll_next(cx),
406 LazyScanState::Error(err) => return Poll::Ready(err.take().map(Err)),
407 }
408 }
409 }
410}
411
412pub fn filter_and_projection_masks(
416 projection: &Expression,
417 filter: Option<&Expression>,
418 dtype: &DType,
419) -> VortexResult<(Vec<FieldMask>, Vec<FieldMask>)> {
420 let Some(struct_dtype) = dtype.as_struct_fields_opt() else {
421 return Ok(match filter {
422 Some(_) => (vec![FieldMask::All], vec![FieldMask::All]),
423 None => (Vec::new(), vec![FieldMask::All]),
424 });
425 };
426 let projection_mask = immediate_scope_access(projection, struct_dtype);
427 Ok(match filter {
428 None => (
429 Vec::new(),
430 projection_mask.into_iter().map(to_field_mask).collect_vec(),
431 ),
432 Some(f) => {
433 let filter_mask = immediate_scope_access(f, struct_dtype);
434 let only_projection_mask = projection_mask
435 .difference(&filter_mask)
436 .cloned()
437 .map(to_field_mask)
438 .collect_vec();
439 (
440 filter_mask.into_iter().map(to_field_mask).collect_vec(),
441 only_projection_mask,
442 )
443 }
444 })
445}
446
447fn to_field_mask(field: FieldName) -> FieldMask {
448 FieldMask::Prefix(FieldPath::from(Field::Name(field)))
449}
450
// Tests for the laziness and execution of `ScanBuilder::into_stream`.
#[cfg(test)]
mod test {
    use std::collections::BTreeSet;
    use std::ops::Range;
    use std::pin::Pin;
    use std::sync::Arc;
    use std::sync::atomic::AtomicUsize;
    use std::sync::atomic::Ordering;
    use std::task::Context;
    use std::task::Poll;
    use std::time::Duration;

    use futures::Stream;
    use futures::task::noop_waker_ref;
    use parking_lot::Mutex;
    use vortex_array::IntoArray;
    use vortex_array::LEGACY_SESSION;
    use vortex_array::MaskFuture;
    use vortex_array::VortexSessionExecute;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::FieldMask;
    use vortex_array::dtype::Nullability;
    use vortex_array::dtype::PType;
    use vortex_array::expr::Expression;
    use vortex_error::VortexResult;
    use vortex_error::vortex_err;
    use vortex_io::runtime::BlockingRuntime;
    use vortex_io::runtime::single::SingleThreadRuntime;
    use vortex_mask::Mask;

    use super::ScanBuilder;
    use crate::ArrayFuture;
    use crate::LayoutReader;

    /// Single-row, non-nullable `I32` reader that counts `register_splits` calls.
    /// Its projection future panics if polled.
    #[derive(Debug)]
    struct CountingLayoutReader {
        name: Arc<str>,
        dtype: DType,
        row_count: u64,
        register_splits_calls: Arc<AtomicUsize>,
    }

    impl CountingLayoutReader {
        fn new(register_splits_calls: Arc<AtomicUsize>) -> Self {
            Self {
                name: Arc::from("counting"),
                dtype: DType::Primitive(PType::I32, Nullability::NonNullable),
                row_count: 1,
                register_splits_calls,
            }
        }
    }

    impl LayoutReader for CountingLayoutReader {
        fn name(&self) -> &Arc<str> {
            &self.name
        }

        fn dtype(&self) -> &DType {
            &self.dtype
        }

        fn row_count(&self) -> u64 {
            self.row_count
        }

        // Records the call, then registers a single split at the end of the requested range.
        fn register_splits(
            &self,
            _field_mask: &[FieldMask],
            row_range: &Range<u64>,
            splits: &mut BTreeSet<u64>,
        ) -> VortexResult<()> {
            self.register_splits_calls.fetch_add(1, Ordering::Relaxed);
            splits.insert(row_range.end);
            Ok(())
        }

        fn pruning_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            _mask: Mask,
        ) -> VortexResult<MaskFuture> {
            unimplemented!("not needed for this test");
        }

        fn filter_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            _mask: MaskFuture,
        ) -> VortexResult<MaskFuture> {
            unimplemented!("not needed for this test");
        }

        // Returns a future that panics if it is ever polled.
        fn projection_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            _mask: MaskFuture,
        ) -> VortexResult<ArrayFuture> {
            Ok(Box::pin(async move {
                unreachable!("scan should not be polled in this test")
            }))
        }

        fn as_any(&self) -> &dyn std::any::Any {
            self
        }
    }

    // Creating the stream alone must not plan splits.
    #[test]
    fn into_stream_is_lazy() {
        let calls = Arc::new(AtomicUsize::new(0));
        let reader = Arc::new(CountingLayoutReader::new(Arc::clone(&calls)));

        let session = crate::scan::test::SCAN_SESSION.clone();

        let _stream = ScanBuilder::new(session, reader).into_stream().unwrap();

        assert_eq!(calls.load(Ordering::Relaxed), 0);
    }

    /// Four-row, non-nullable `I32` reader.
    /// - Registers a split after every row.
    /// - Projects a range `start..end` as the values `start..end`.
    /// - Pruning and filtering pass the incoming mask through.
    #[derive(Debug)]
    struct SplittingLayoutReader {
        name: Arc<str>,
        dtype: DType,
        row_count: u64,
        register_splits_calls: Arc<AtomicUsize>,
    }

    impl SplittingLayoutReader {
        fn new(register_splits_calls: Arc<AtomicUsize>) -> Self {
            Self {
                name: Arc::from("splitting"),
                dtype: DType::Primitive(PType::I32, Nullability::NonNullable),
                row_count: 4,
                register_splits_calls,
            }
        }
    }

    impl LayoutReader for SplittingLayoutReader {
        fn name(&self) -> &Arc<str> {
            &self.name
        }

        fn dtype(&self) -> &DType {
            &self.dtype
        }

        fn row_count(&self) -> u64 {
            self.row_count
        }

        // Records the call, then inserts a split point after every row in `row_range`.
        fn register_splits(
            &self,
            _field_mask: &[FieldMask],
            row_range: &Range<u64>,
            splits: &mut BTreeSet<u64>,
        ) -> VortexResult<()> {
            self.register_splits_calls.fetch_add(1, Ordering::Relaxed);
            for split in (row_range.start + 1)..=row_range.end {
                splits.insert(split);
            }
            Ok(())
        }

        fn pruning_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            mask: Mask,
        ) -> VortexResult<MaskFuture> {
            Ok(MaskFuture::ready(mask))
        }

        fn filter_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            mask: MaskFuture,
        ) -> VortexResult<MaskFuture> {
            Ok(mask)
        }

        // Materializes the row indices `start..end` as an `I32` primitive array.
        fn projection_evaluation(
            &self,
            row_range: &Range<u64>,
            _expr: &Expression,
            _mask: MaskFuture,
        ) -> VortexResult<ArrayFuture> {
            let start = usize::try_from(row_range.start)
                .map_err(|_| vortex_err!("row_range.start must fit in usize"))?;
            let end = usize::try_from(row_range.end)
                .map_err(|_| vortex_err!("row_range.end must fit in usize"))?;

            let values: VortexResult<Vec<i32>> = (start..end)
                .map(|v| i32::try_from(v).map_err(|_| vortex_err!("split value must fit in i32")))
                .collect();

            let array = PrimitiveArray::from_iter(values?).into_array();
            Ok(Box::pin(async move { Ok(array) }))
        }

        fn as_any(&self) -> &dyn std::any::Any {
            self
        }
    }

    // Drives a full scan to completion. Checks that:
    // - split planning ran exactly once;
    // - the first value of each chunk, in order, is 0..4 (presumably one chunk per one-row
    //   split).
    #[test]
    fn into_stream_executes_after_prepare() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let calls = Arc::new(AtomicUsize::new(0));
        let reader = Arc::new(SplittingLayoutReader::new(Arc::clone(&calls)));

        let runtime = SingleThreadRuntime::default();
        let session = crate::scan::test::session_with_handle(runtime.handle());

        let stream = ScanBuilder::new(session, reader).into_stream()?;
        let mut iter = runtime.block_on_stream(stream);

        let mut values = Vec::new();
        for chunk in &mut iter {
            let prim = chunk?.execute::<PrimitiveArray>(&mut ctx)?;
            values.push(prim.into_buffer::<i32>()[0]);
        }

        assert_eq!(calls.load(Ordering::Relaxed), 1);
        assert_eq!(values.as_ref(), [0, 1, 2, 3]);

        Ok(())
    }

    /// Single-row, non-nullable `I32` reader whose `register_splits` blocks on `gate`.
    /// The call counter is bumped before the gate is acquired.
    #[derive(Debug)]
    struct BlockingSplitsLayoutReader {
        name: Arc<str>,
        dtype: DType,
        row_count: u64,
        register_splits_calls: Arc<AtomicUsize>,
        gate: Arc<Mutex<()>>,
    }

    impl BlockingSplitsLayoutReader {
        fn new(gate: Arc<Mutex<()>>, register_splits_calls: Arc<AtomicUsize>) -> Self {
            Self {
                name: Arc::from("blocking-splits"),
                dtype: DType::Primitive(PType::I32, Nullability::NonNullable),
                row_count: 1,
                register_splits_calls,
                gate,
            }
        }
    }

    impl LayoutReader for BlockingSplitsLayoutReader {
        fn name(&self) -> &Arc<str> {
            &self.name
        }

        fn dtype(&self) -> &DType {
            &self.dtype
        }

        fn row_count(&self) -> u64 {
            self.row_count
        }

        // Counts the call, then blocks until the test releases `gate`.
        fn register_splits(
            &self,
            _field_mask: &[FieldMask],
            row_range: &Range<u64>,
            splits: &mut BTreeSet<u64>,
        ) -> VortexResult<()> {
            self.register_splits_calls.fetch_add(1, Ordering::Relaxed);
            let _guard = self.gate.lock();
            splits.insert(row_range.end);
            Ok(())
        }

        fn pruning_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            _mask: Mask,
        ) -> VortexResult<MaskFuture> {
            unimplemented!("not needed for this test");
        }

        fn filter_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            _mask: MaskFuture,
        ) -> VortexResult<MaskFuture> {
            unimplemented!("not needed for this test");
        }

        // Returns a future that panics if it is ever polled.
        fn projection_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            _mask: MaskFuture,
        ) -> VortexResult<ArrayFuture> {
            Ok(Box::pin(async move {
                unreachable!("scan should not be polled in this test")
            }))
        }

        fn as_any(&self) -> &dyn std::any::Any {
            self
        }
    }

    // The first poll must return `Poll::Pending` within a second even while split planning
    // would block on `gate`.
    #[test]
    fn into_stream_first_poll_does_not_block() {
        // Hold the gate so any `register_splits` call blocks until it is released below.
        let gate = Arc::new(Mutex::new(()));
        let guard = gate.lock();

        let calls = Arc::new(AtomicUsize::new(0));
        let reader = Arc::new(BlockingSplitsLayoutReader::new(
            Arc::clone(&gate),
            Arc::clone(&calls),
        ));

        let runtime = SingleThreadRuntime::default();
        let session = crate::scan::test::session_with_handle(runtime.handle());

        let mut stream = ScanBuilder::new(session, reader).into_stream().unwrap();

        // Poll once on a separate thread and report whether the poll returned `Pending`, so a
        // blocking poll shows up as a timeout rather than hanging the test thread.
        let (send, recv) = std::sync::mpsc::channel::<bool>();
        let join = std::thread::spawn(move || {
            let waker = noop_waker_ref();
            let mut cx = Context::from_waker(waker);
            let poll = Pin::new(&mut stream).poll_next(&mut cx);
            let _ = send.send(matches!(poll, Poll::Pending));
        });

        // `None` here means the poll did not report back within the timeout.
        let polled_pending = recv.recv_timeout(Duration::from_secs(1)).ok();

        // Release the gate so anything blocked on it can finish, then wait for the poller thread.
        drop(guard);
        drop(join.join());

        let polled_pending = polled_pending.expect("poll_next blocked; expected quick return");
        assert!(
            polled_pending,
            "expected Poll::Pending while prepare is blocked"
        );
        // NOTE(review): this relies on the spawned prepare task never reaching
        // `register_splits`, presumably because the single-thread runtime is never driven in
        // this test. Confirm against `SingleThreadRuntime` semantics.
        assert_eq!(calls.load(Ordering::Relaxed), 0);

        drop(runtime);
    }
}