1use std::ops::Range;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::task::Context;
8use std::task::Poll;
9use std::task::ready;
10
11use futures::Stream;
12use futures::StreamExt;
13use futures::future::BoxFuture;
14use futures::stream::BoxStream;
15use itertools::Itertools;
16use vortex_array::ArrayRef;
17use vortex_array::dtype::DType;
18use vortex_array::dtype::Field;
19use vortex_array::dtype::FieldMask;
20use vortex_array::dtype::FieldName;
21use vortex_array::dtype::FieldPath;
22use vortex_array::expr::Expression;
23use vortex_array::expr::analysis::immediate_access::immediate_scope_access;
24use vortex_array::expr::root;
25use vortex_array::iter::ArrayIterator;
26use vortex_array::iter::ArrayIteratorAdapter;
27use vortex_array::stats::StatsSet;
28use vortex_array::stream::ArrayStream;
29use vortex_array::stream::ArrayStreamAdapter;
30use vortex_buffer::Buffer;
31use vortex_error::VortexExpect;
32use vortex_error::VortexResult;
33use vortex_error::vortex_bail;
34use vortex_io::runtime::BlockingRuntime;
35use vortex_io::runtime::Handle;
36use vortex_io::runtime::Task;
37use vortex_io::session::RuntimeSessionExt;
38use vortex_layout::LayoutReader;
39use vortex_layout::LayoutReaderRef;
40use vortex_layout::layouts::row_idx::RowIdxLayoutReader;
41use vortex_metrics::MetricsRegistry;
42use vortex_session::VortexSession;
43
44use crate::RepeatedScan;
45use crate::selection::Selection;
46use crate::split_by::SplitBy;
47use crate::splits::Splits;
48use crate::splits::attempt_split_ranges;
49
/// Builder for a scan over a [`LayoutReader`].
///
/// `A` is the item type the scan yields. It starts as [`ArrayRef`] (see `ScanBuilder::new`)
/// and is changed by composing mapping functions with `ScanBuilder::map`.
pub struct ScanBuilder<A> {
    /// Session whose runtime handle drives `into_stream`; it is also handed to the row-index
    /// reader wrapper and to the prepared scan.
    session: VortexSession,
    /// Reader over the data being scanned.
    layout_reader: LayoutReaderRef,
    /// Projection expression; its return type over the reader's dtype is the scan's output
    /// dtype (see `ScanBuilder::dtype`).
    projection: Expression,
    /// Optional filter expression. `prepare` rejects a builder that sets both this and `limit`.
    filter: Option<Expression>,
    /// When true, the stream yields results in task order (`buffered`); otherwise in
    /// completion order (`buffer_unordered`).
    ordered: bool,
    /// Optional row range to scan; when unset, natural splits cover `0..row_count`.
    row_range: Option<Range<u64>>,
    /// Row selection; together with `row_range` it may yield explicit split ranges.
    selection: Selection,
    /// Strategy used to compute natural splits when no explicit ranges apply.
    split_by: SplitBy,
    /// Per-worker concurrency; `into_stream` multiplies it by the available parallelism.
    concurrency: usize,
    /// Mapping from each produced `ArrayRef` to an item of type `A`; presumably applied by
    /// `RepeatedScan` to every chunk — confirm there.
    map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
    /// Optional metrics registry.
    /// NOTE(review): stored here but not passed to `RepeatedScan::new` in `prepare` — confirm
    /// whether it should be forwarded.
    metrics_registry: Option<Arc<dyn MetricsRegistry>>,
    /// Optional per-file statistics.
    /// NOTE(review): no setter or reader for this field is visible in this file — confirm use.
    file_stats: Option<Arc<[StatsSet]>>,
    /// Optional row limit; `build` returns no tasks when it is `Some(0)`.
    limit: Option<u64>,
    /// Row offset passed to the `RowIdxLayoutReader` wrapper created in `prepare`.
    row_offset: u64,
}
78
79impl ScanBuilder<ArrayRef> {
80 pub fn new(session: VortexSession, layout_reader: Arc<dyn LayoutReader>) -> Self {
81 Self {
82 session,
83 layout_reader,
84 projection: root(),
85 filter: None,
86 ordered: true,
87 row_range: None,
88 selection: Default::default(),
89 split_by: SplitBy::Layout,
90 concurrency: 4,
93 map_fn: Arc::new(Ok),
94 metrics_registry: None,
95 file_stats: None,
96 limit: None,
97 row_offset: 0,
98 }
99 }
100
101 pub fn into_array_stream(self) -> VortexResult<impl ArrayStream + Send + 'static> {
105 let dtype = self.dtype()?;
106 let stream = self.into_stream()?;
107 Ok(ArrayStreamAdapter::new(dtype, stream))
108 }
109
110 pub fn into_array_iter<B: BlockingRuntime>(
112 self,
113 runtime: &B,
114 ) -> VortexResult<impl ArrayIterator + 'static> {
115 let stream = self.into_array_stream()?;
116 let dtype = stream.dtype().clone();
117 Ok(ArrayIteratorAdapter::new(
118 dtype,
119 runtime.block_on_stream(stream),
120 ))
121 }
122}
123
124impl<A: 'static + Send> ScanBuilder<A> {
125 pub fn with_filter(mut self, filter: Expression) -> Self {
126 self.filter = Some(filter);
127 self
128 }
129
130 pub fn with_some_filter(mut self, filter: Option<Expression>) -> Self {
131 self.filter = filter;
132 self
133 }
134
135 pub fn with_projection(mut self, projection: Expression) -> Self {
136 self.projection = projection;
137 self
138 }
139
140 pub fn ordered(&self) -> bool {
141 self.ordered
142 }
143
144 pub fn with_ordered(mut self, ordered: bool) -> Self {
145 self.ordered = ordered;
146 self
147 }
148
149 pub fn with_row_range(mut self, row_range: Range<u64>) -> Self {
150 self.row_range = Some(row_range);
151 self
152 }
153
154 pub fn with_selection(mut self, selection: Selection) -> Self {
155 self.selection = selection;
156 self
157 }
158
159 pub fn with_row_indices(mut self, row_indices: Buffer<u64>) -> Self {
160 self.selection = Selection::IncludeByIndex(row_indices);
161 self
162 }
163
164 pub fn with_row_offset(mut self, row_offset: u64) -> Self {
165 self.row_offset = row_offset;
166 self
167 }
168
169 pub fn with_split_by(mut self, split_by: SplitBy) -> Self {
170 self.split_by = split_by;
171 self
172 }
173
174 pub fn concurrency(&self) -> usize {
175 self.concurrency
176 }
177
178 pub fn with_concurrency(mut self, concurrency: usize) -> Self {
181 assert!(concurrency > 0);
182 self.concurrency = concurrency;
183 self
184 }
185
186 pub fn with_some_metrics_registry(mut self, metrics: Option<Arc<dyn MetricsRegistry>>) -> Self {
187 self.metrics_registry = metrics;
188 self
189 }
190
191 pub fn with_metrics_registry(mut self, metrics: Arc<dyn MetricsRegistry>) -> Self {
192 self.metrics_registry = Some(metrics);
193 self
194 }
195
196 pub fn with_some_limit(mut self, limit: Option<u64>) -> Self {
197 self.limit = limit;
198 self
199 }
200
201 pub fn with_limit(mut self, limit: u64) -> Self {
202 self.limit = Some(limit);
203 self
204 }
205
206 pub fn dtype(&self) -> VortexResult<DType> {
208 self.projection.return_dtype(self.layout_reader.dtype())
209 }
210
211 pub fn session(&self) -> &VortexSession {
213 &self.session
214 }
215
216 pub fn map<B: 'static>(
218 self,
219 map_fn: impl Fn(A) -> VortexResult<B> + 'static + Send + Sync,
220 ) -> ScanBuilder<B> {
221 let old_map_fn = self.map_fn;
222 ScanBuilder {
223 session: self.session,
224 layout_reader: self.layout_reader,
225 projection: self.projection,
226 filter: self.filter,
227 ordered: self.ordered,
228 row_range: self.row_range,
229 selection: self.selection,
230 split_by: self.split_by,
231 concurrency: self.concurrency,
232 metrics_registry: self.metrics_registry,
233 file_stats: self.file_stats,
234 limit: self.limit,
235 row_offset: self.row_offset,
236 map_fn: Arc::new(move |a| old_map_fn(a).and_then(&map_fn)),
237 }
238 }
239
240 pub fn prepare(self) -> VortexResult<RepeatedScan<A>> {
241 let dtype = self.dtype()?;
242
243 if self.filter.is_some() && self.limit.is_some() {
244 vortex_bail!("Vortex doesn't support scans with both a filter and a limit")
245 }
246
247 let mut layout_reader = self.layout_reader;
250
251 layout_reader = Arc::new(RowIdxLayoutReader::new(
255 self.row_offset,
256 layout_reader,
257 self.session.clone(),
258 ));
259
260 let projection = self.projection.optimize_recursive(layout_reader.dtype())?;
262
263 let filter = self
264 .filter
265 .map(|f| f.optimize_recursive(layout_reader.dtype()))
266 .transpose()?;
267
268 let (filter_mask, projection_mask) =
270 filter_and_projection_masks(&projection, filter.as_ref(), layout_reader.dtype())?;
271 let field_mask: Vec<_> = [filter_mask, projection_mask].concat();
272
273 let splits =
274 if let Some(ranges) = attempt_split_ranges(&self.selection, self.row_range.as_ref()) {
275 Splits::Ranges(ranges)
276 } else {
277 let split_range = self
278 .row_range
279 .clone()
280 .unwrap_or_else(|| 0..layout_reader.row_count());
281 Splits::Natural(self.split_by.splits(
282 layout_reader.as_ref(),
283 &split_range,
284 &field_mask,
285 )?)
286 };
287
288 Ok(RepeatedScan::new(
289 self.session.clone(),
290 layout_reader,
291 projection,
292 filter,
293 self.ordered,
294 self.row_range,
295 self.selection,
296 splits,
297 self.concurrency,
298 self.map_fn,
299 self.limit,
300 dtype,
301 ))
302 }
303
304 pub fn build(self) -> VortexResult<Vec<BoxFuture<'static, VortexResult<Option<A>>>>> {
306 if self.limit.is_some_and(|l| l == 0) {
308 return Ok(vec![]);
309 }
310
311 self.prepare()?.execute(None)
312 }
313
314 pub fn into_stream(
316 self,
317 ) -> VortexResult<impl Stream<Item = VortexResult<A>> + Send + 'static + use<A>> {
318 Ok(LazyScanStream::new(self))
319 }
320
321 pub fn into_iter<B: BlockingRuntime>(
323 self,
324 runtime: &B,
325 ) -> VortexResult<impl Iterator<Item = VortexResult<A>> + 'static> {
326 let stream = self.into_stream()?;
327 Ok(runtime.block_on_stream(stream))
328 }
329}
330
/// State machine driving [`LazyScanStream`].
enum LazyScanState<A: 'static + Send> {
    /// Not yet polled. The first poll takes the builder out of the `Option`.
    Builder(Option<Box<ScanBuilder<A>>>),
    /// Planning is running on a blocking task.
    Preparing(PreparingScan<A>),
    /// Planning finished; items are yielded from the spawned scan tasks.
    Stream(BoxStream<'static, VortexResult<A>>),
    /// Planning failed. The error is yielded once, after which the stream ends.
    Error(Option<vortex_error::VortexError>),
}
337
/// Futures produced by planning a scan; a future resolving to `Ok(None)` yields no item.
type PreparedScanTasks<A> = Vec<BoxFuture<'static, VortexResult<Option<A>>>>;
339
/// Data carried while a scan is being planned on a blocking task.
struct PreparingScan<A: 'static + Send> {
    /// Selects ordered (`buffered`) vs unordered (`buffer_unordered`) output.
    ordered: bool,
    /// Maximum number of scan tasks buffered at once after planning completes.
    concurrency: usize,
    /// Runtime handle used to spawn each scan task.
    handle: Handle,
    /// Blocking task that plans the scan and produces its futures.
    task: Task<VortexResult<PreparedScanTasks<A>>>,
}
346
/// Stream returned by `ScanBuilder::into_stream`; it defers planning until first polled.
struct LazyScanStream<A: 'static + Send> {
    /// Current phase: unpolled builder, planning, streaming, or failed.
    state: LazyScanState<A>,
}
350
351impl<A: 'static + Send> LazyScanStream<A> {
352 fn new(builder: ScanBuilder<A>) -> Self {
353 Self {
354 state: LazyScanState::Builder(Some(Box::new(builder))),
355 }
356 }
357}
358
// `poll_next` takes `Pin<&mut Self>` and mutates `self.state` directly, which needs
// `Self: Unpin`. No field depends on a stable address:
// - the item stream is a boxed `BoxStream`;
// - the planning task is re-pinned with `Pin::new` on every poll.
impl<A: 'static + Send> Unpin for LazyScanStream<A> {}
360
361impl<A: 'static + Send> Stream for LazyScanStream<A> {
362 type Item = VortexResult<A>;
363
364 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
365 loop {
366 match &mut self.state {
367 LazyScanState::Builder(builder) => {
368 let builder = builder.take().vortex_expect("polled after completion");
369 let ordered = builder.ordered;
370 let num_workers = std::thread::available_parallelism()
371 .map(|n| n.get())
372 .unwrap_or(1);
373 let concurrency = builder.concurrency * num_workers;
374 let handle = builder.session.handle();
375 let task = handle.spawn_blocking(move || {
376 builder.prepare().and_then(|scan| scan.execute(None))
377 });
378 self.state = LazyScanState::Preparing(PreparingScan {
379 ordered,
380 concurrency,
381 handle,
382 task,
383 });
384 }
385 LazyScanState::Preparing(preparing) => {
386 match ready!(Pin::new(&mut preparing.task).poll(cx)) {
387 Ok(tasks) => {
388 let ordered = preparing.ordered;
389 let concurrency = preparing.concurrency;
390 let handle = preparing.handle.clone();
391 let stream =
392 futures::stream::iter(tasks).map(move |task| handle.spawn(task));
393 let stream = if ordered {
394 stream.buffered(concurrency).boxed()
395 } else {
396 stream.buffer_unordered(concurrency).boxed()
397 };
398 let stream = stream
399 .filter_map(|chunk| async move { chunk.transpose() })
400 .boxed();
401 self.state = LazyScanState::Stream(stream);
402 }
403 Err(err) => self.state = LazyScanState::Error(Some(err)),
404 }
405 }
406 LazyScanState::Stream(stream) => return stream.as_mut().poll_next(cx),
407 LazyScanState::Error(err) => return Poll::Ready(err.take().map(Err)),
408 }
409 }
410 }
411}
412
413pub(crate) fn filter_and_projection_masks(
417 projection: &Expression,
418 filter: Option<&Expression>,
419 dtype: &DType,
420) -> VortexResult<(Vec<FieldMask>, Vec<FieldMask>)> {
421 let Some(struct_dtype) = dtype.as_struct_fields_opt() else {
422 return Ok(match filter {
423 Some(_) => (vec![FieldMask::All], vec![FieldMask::All]),
424 None => (Vec::new(), vec![FieldMask::All]),
425 });
426 };
427 let projection_mask = immediate_scope_access(projection, struct_dtype);
428 Ok(match filter {
429 None => (
430 Vec::new(),
431 projection_mask.into_iter().map(to_field_mask).collect_vec(),
432 ),
433 Some(f) => {
434 let filter_mask = immediate_scope_access(f, struct_dtype);
435 let only_projection_mask = projection_mask
436 .difference(&filter_mask)
437 .cloned()
438 .map(to_field_mask)
439 .collect_vec();
440 (
441 filter_mask.into_iter().map(to_field_mask).collect_vec(),
442 only_projection_mask,
443 )
444 }
445 })
446}
447
448fn to_field_mask(field: FieldName) -> FieldMask {
449 FieldMask::Prefix(FieldPath::from(Field::Name(field)))
450}
451
452#[cfg(test)]
453mod test {
454 use std::collections::BTreeSet;
455 use std::ops::Range;
456 use std::pin::Pin;
457 use std::sync::Arc;
458 use std::sync::atomic::AtomicUsize;
459 use std::sync::atomic::Ordering;
460 use std::task::Context;
461 use std::task::Poll;
462 use std::time::Duration;
463
464 use futures::Stream;
465 use futures::task::noop_waker_ref;
466 use parking_lot::Mutex;
467 use vortex_array::IntoArray;
468 use vortex_array::MaskFuture;
469 use vortex_array::ToCanonical;
470 use vortex_array::arrays::PrimitiveArray;
471 use vortex_array::dtype::DType;
472 use vortex_array::dtype::FieldMask;
473 use vortex_array::dtype::Nullability;
474 use vortex_array::dtype::PType;
475 use vortex_array::expr::Expression;
476 use vortex_error::VortexResult;
477 use vortex_error::vortex_err;
478 use vortex_io::runtime::BlockingRuntime;
479 use vortex_io::runtime::single::SingleThreadRuntime;
480 use vortex_layout::ArrayFuture;
481 use vortex_layout::LayoutReader;
482 use vortex_mask::Mask;
483
484 use super::ScanBuilder;
485
486 #[derive(Debug)]
487 struct CountingLayoutReader {
488 name: Arc<str>,
489 dtype: DType,
490 row_count: u64,
491 register_splits_calls: Arc<AtomicUsize>,
492 }
493
494 impl CountingLayoutReader {
495 fn new(register_splits_calls: Arc<AtomicUsize>) -> Self {
496 Self {
497 name: Arc::from("counting"),
498 dtype: DType::Primitive(PType::I32, Nullability::NonNullable),
499 row_count: 1,
500 register_splits_calls,
501 }
502 }
503 }
504
505 impl LayoutReader for CountingLayoutReader {
506 fn name(&self) -> &Arc<str> {
507 &self.name
508 }
509
510 fn dtype(&self) -> &DType {
511 &self.dtype
512 }
513
514 fn row_count(&self) -> u64 {
515 self.row_count
516 }
517
518 fn register_splits(
519 &self,
520 _field_mask: &[FieldMask],
521 row_range: &Range<u64>,
522 splits: &mut BTreeSet<u64>,
523 ) -> VortexResult<()> {
524 self.register_splits_calls.fetch_add(1, Ordering::Relaxed);
525 splits.insert(row_range.end);
526 Ok(())
527 }
528
529 fn pruning_evaluation(
530 &self,
531 _row_range: &Range<u64>,
532 _expr: &Expression,
533 _mask: Mask,
534 ) -> VortexResult<MaskFuture> {
535 unimplemented!("not needed for this test");
536 }
537
538 fn filter_evaluation(
539 &self,
540 _row_range: &Range<u64>,
541 _expr: &Expression,
542 _mask: MaskFuture,
543 ) -> VortexResult<MaskFuture> {
544 unimplemented!("not needed for this test");
545 }
546
547 fn projection_evaluation(
548 &self,
549 _row_range: &Range<u64>,
550 _expr: &Expression,
551 _mask: MaskFuture,
552 ) -> VortexResult<ArrayFuture> {
553 Ok(Box::pin(async move {
554 unreachable!("scan should not be polled in this test")
555 }))
556 }
557 }
558
559 #[test]
560 fn into_stream_is_lazy() {
561 let calls = Arc::new(AtomicUsize::new(0));
562 let reader = Arc::new(CountingLayoutReader::new(calls.clone()));
563
564 let session = crate::test::SCAN_SESSION.clone();
565
566 let _stream = ScanBuilder::new(session, reader).into_stream().unwrap();
567
568 assert_eq!(calls.load(Ordering::Relaxed), 0);
569 }
570
571 #[derive(Debug)]
572 struct SplittingLayoutReader {
573 name: Arc<str>,
574 dtype: DType,
575 row_count: u64,
576 register_splits_calls: Arc<AtomicUsize>,
577 }
578
579 impl SplittingLayoutReader {
580 fn new(register_splits_calls: Arc<AtomicUsize>) -> Self {
581 Self {
582 name: Arc::from("splitting"),
583 dtype: DType::Primitive(PType::I32, Nullability::NonNullable),
584 row_count: 4,
585 register_splits_calls,
586 }
587 }
588 }
589
590 impl LayoutReader for SplittingLayoutReader {
591 fn name(&self) -> &Arc<str> {
592 &self.name
593 }
594
595 fn dtype(&self) -> &DType {
596 &self.dtype
597 }
598
599 fn row_count(&self) -> u64 {
600 self.row_count
601 }
602
603 fn register_splits(
604 &self,
605 _field_mask: &[FieldMask],
606 row_range: &Range<u64>,
607 splits: &mut BTreeSet<u64>,
608 ) -> VortexResult<()> {
609 self.register_splits_calls.fetch_add(1, Ordering::Relaxed);
610 for split in (row_range.start + 1)..=row_range.end {
611 splits.insert(split);
612 }
613 Ok(())
614 }
615
616 fn pruning_evaluation(
617 &self,
618 _row_range: &Range<u64>,
619 _expr: &Expression,
620 mask: Mask,
621 ) -> VortexResult<MaskFuture> {
622 Ok(MaskFuture::ready(mask))
623 }
624
625 fn filter_evaluation(
626 &self,
627 _row_range: &Range<u64>,
628 _expr: &Expression,
629 mask: MaskFuture,
630 ) -> VortexResult<MaskFuture> {
631 Ok(mask)
632 }
633
634 fn projection_evaluation(
635 &self,
636 row_range: &Range<u64>,
637 _expr: &Expression,
638 _mask: MaskFuture,
639 ) -> VortexResult<ArrayFuture> {
640 let start = usize::try_from(row_range.start)
641 .map_err(|_| vortex_err!("row_range.start must fit in usize"))?;
642 let end = usize::try_from(row_range.end)
643 .map_err(|_| vortex_err!("row_range.end must fit in usize"))?;
644
645 let values: VortexResult<Vec<i32>> = (start..end)
646 .map(|v| i32::try_from(v).map_err(|_| vortex_err!("split value must fit in i32")))
647 .collect();
648
649 let array = PrimitiveArray::from_iter(values?).into_array();
650 Ok(Box::pin(async move { Ok(array) }))
651 }
652 }
653
654 #[test]
655 fn into_stream_executes_after_prepare() -> VortexResult<()> {
656 let calls = Arc::new(AtomicUsize::new(0));
657 let reader = Arc::new(SplittingLayoutReader::new(calls.clone()));
658
659 let runtime = SingleThreadRuntime::default();
660 let session = crate::test::session_with_handle(runtime.handle());
661
662 let stream = ScanBuilder::new(session, reader).into_stream().unwrap();
663 let mut iter = runtime.block_on_stream(stream);
664
665 let mut values = Vec::new();
666 for chunk in &mut iter {
667 values.push(chunk?.to_primitive().into_buffer::<i32>()[0]);
668 }
669
670 assert_eq!(calls.load(Ordering::Relaxed), 1);
671 assert_eq!(values.as_ref(), [0, 1, 2, 3]);
672
673 Ok(())
674 }
675
676 #[derive(Debug)]
677 struct BlockingSplitsLayoutReader {
678 name: Arc<str>,
679 dtype: DType,
680 row_count: u64,
681 register_splits_calls: Arc<AtomicUsize>,
682 gate: Arc<Mutex<()>>,
683 }
684
685 impl BlockingSplitsLayoutReader {
686 fn new(gate: Arc<Mutex<()>>, register_splits_calls: Arc<AtomicUsize>) -> Self {
687 Self {
688 name: Arc::from("blocking-splits"),
689 dtype: DType::Primitive(PType::I32, Nullability::NonNullable),
690 row_count: 1,
691 register_splits_calls,
692 gate,
693 }
694 }
695 }
696
697 impl LayoutReader for BlockingSplitsLayoutReader {
698 fn name(&self) -> &Arc<str> {
699 &self.name
700 }
701
702 fn dtype(&self) -> &DType {
703 &self.dtype
704 }
705
706 fn row_count(&self) -> u64 {
707 self.row_count
708 }
709
710 fn register_splits(
711 &self,
712 _field_mask: &[FieldMask],
713 row_range: &Range<u64>,
714 splits: &mut BTreeSet<u64>,
715 ) -> VortexResult<()> {
716 self.register_splits_calls.fetch_add(1, Ordering::Relaxed);
717 let _guard = self.gate.lock();
718 splits.insert(row_range.end);
719 Ok(())
720 }
721
722 fn pruning_evaluation(
723 &self,
724 _row_range: &Range<u64>,
725 _expr: &Expression,
726 _mask: Mask,
727 ) -> VortexResult<MaskFuture> {
728 unimplemented!("not needed for this test");
729 }
730
731 fn filter_evaluation(
732 &self,
733 _row_range: &Range<u64>,
734 _expr: &Expression,
735 _mask: MaskFuture,
736 ) -> VortexResult<MaskFuture> {
737 unimplemented!("not needed for this test");
738 }
739
740 fn projection_evaluation(
741 &self,
742 _row_range: &Range<u64>,
743 _expr: &Expression,
744 _mask: MaskFuture,
745 ) -> VortexResult<ArrayFuture> {
746 Ok(Box::pin(async move {
747 unreachable!("scan should not be polled in this test")
748 }))
749 }
750 }
751
752 #[test]
753 fn into_stream_first_poll_does_not_block() {
754 let gate = Arc::new(Mutex::new(()));
755 let guard = gate.lock();
756
757 let calls = Arc::new(AtomicUsize::new(0));
758 let reader = Arc::new(BlockingSplitsLayoutReader::new(gate.clone(), calls.clone()));
759
760 let runtime = SingleThreadRuntime::default();
761 let session = crate::test::session_with_handle(runtime.handle());
762
763 let mut stream = ScanBuilder::new(session, reader).into_stream().unwrap();
764
765 let (send, recv) = std::sync::mpsc::channel::<bool>();
766 let join = std::thread::spawn(move || {
767 let waker = noop_waker_ref();
768 let mut cx = Context::from_waker(waker);
769 let poll = Pin::new(&mut stream).poll_next(&mut cx);
770 let _ = send.send(matches!(poll, Poll::Pending));
771 });
772
773 let polled_pending = recv.recv_timeout(Duration::from_secs(1)).ok();
774
775 drop(guard);
777 drop(join.join());
778
779 let polled_pending = polled_pending.expect("poll_next blocked; expected quick return");
780 assert!(
781 polled_pending,
782 "expected Poll::Pending while prepare is blocked"
783 );
784 assert_eq!(calls.load(Ordering::Relaxed), 0);
785
786 drop(runtime);
787 }
788}