1use std::ops::Range;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::task::Context;
8use std::task::Poll;
9use std::task::ready;
10
11use futures::Stream;
12use futures::StreamExt;
13use futures::future::BoxFuture;
14use futures::stream::BoxStream;
15use itertools::Itertools;
16use vortex_array::ArrayRef;
17use vortex_array::expr::Expression;
18use vortex_array::expr::analysis::immediate_access::immediate_scope_access;
19use vortex_array::expr::root;
20use vortex_array::iter::ArrayIterator;
21use vortex_array::iter::ArrayIteratorAdapter;
22use vortex_array::stats::StatsSet;
23use vortex_array::stream::ArrayStream;
24use vortex_array::stream::ArrayStreamAdapter;
25use vortex_buffer::Buffer;
26use vortex_dtype::DType;
27use vortex_dtype::Field;
28use vortex_dtype::FieldMask;
29use vortex_dtype::FieldName;
30use vortex_dtype::FieldPath;
31use vortex_error::VortexExpect;
32use vortex_error::VortexResult;
33use vortex_error::vortex_bail;
34use vortex_io::runtime::BlockingRuntime;
35use vortex_io::runtime::Handle;
36use vortex_io::runtime::Task;
37use vortex_io::session::RuntimeSessionExt;
38use vortex_layout::LayoutReader;
39use vortex_layout::LayoutReaderRef;
40use vortex_layout::layouts::row_idx::RowIdxLayoutReader;
41use vortex_metrics::MetricsRegistry;
42use vortex_session::VortexSession;
43
44use crate::RepeatedScan;
45use crate::selection::Selection;
46use crate::split_by::SplitBy;
47use crate::splits::Splits;
48use crate::splits::attempt_split_ranges;
49
/// Builder for a scan over the data exposed by a [`LayoutReader`].
///
/// The builder is configured with the `with_*` methods and then consumed by
/// [`ScanBuilder::prepare`], [`ScanBuilder::build`], [`ScanBuilder::into_stream`] or
/// [`ScanBuilder::into_iter`]. `A` is the item type of the scan: each produced [`ArrayRef`]
/// is passed through `map_fn` to obtain an `A`.
pub struct ScanBuilder<A> {
    /// Session providing the runtime handle used by `into_stream`; it is also handed to the
    /// row-index layout reader and to the prepared scan.
    session: VortexSession,
    /// Reader for the data being scanned.
    layout_reader: LayoutReaderRef,
    /// Expression evaluated to produce the scan output (`root()` by default).
    projection: Expression,
    /// Optional filter expression. Cannot be combined with `limit` (`prepare` rejects it).
    filter: Option<Expression>,
    /// When streaming, whether results are yielded in task order (`buffered`) or as tasks
    /// complete (`buffer_unordered`).
    ordered: bool,
    /// Optional restriction of the scan to a range of rows.
    row_range: Option<Range<u64>>,
    /// Row selection applied to the scan.
    selection: Selection,
    /// Strategy used to split the scan into tasks.
    split_by: SplitBy,
    /// Concurrency factor; `into_stream` multiplies it by the available parallelism.
    concurrency: usize,
    /// Function applied to each produced array to obtain an `A`.
    map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
    /// Optional metrics registry (not read within this section of the file).
    metrics_registry: Option<Arc<dyn MetricsRegistry>>,
    /// Optional per-file statistics (no setter is visible in this section of the file).
    file_stats: Option<Arc<[StatsSet]>>,
    /// Optional scan limit (presumably a row count — TODO confirm); `Some(0)` makes `build`
    /// return no tasks.
    limit: Option<u64>,
    /// Row offset handed to the row-index layout reader in `prepare`.
    row_offset: u64,
}
78
79impl ScanBuilder<ArrayRef> {
80 pub fn new(session: VortexSession, layout_reader: Arc<dyn LayoutReader>) -> Self {
81 Self {
82 session,
83 layout_reader,
84 projection: root(),
85 filter: None,
86 ordered: true,
87 row_range: None,
88 selection: Default::default(),
89 split_by: SplitBy::Layout,
90 concurrency: 4,
93 map_fn: Arc::new(Ok),
94 metrics_registry: None,
95 file_stats: None,
96 limit: None,
97 row_offset: 0,
98 }
99 }
100
101 pub fn into_array_stream(self) -> VortexResult<impl ArrayStream + Send + 'static> {
105 let dtype = self.dtype()?;
106 let stream = self.into_stream()?;
107 Ok(ArrayStreamAdapter::new(dtype, stream))
108 }
109
110 pub fn into_array_iter<B: BlockingRuntime>(
112 self,
113 runtime: &B,
114 ) -> VortexResult<impl ArrayIterator + 'static> {
115 let stream = self.into_array_stream()?;
116 let dtype = stream.dtype().clone();
117 Ok(ArrayIteratorAdapter::new(
118 dtype,
119 runtime.block_on_stream(stream),
120 ))
121 }
122}
123
124impl<A: 'static + Send> ScanBuilder<A> {
125 pub fn with_filter(mut self, filter: Expression) -> Self {
126 self.filter = Some(filter);
127 self
128 }
129
130 pub fn with_some_filter(mut self, filter: Option<Expression>) -> Self {
131 self.filter = filter;
132 self
133 }
134
135 pub fn with_projection(mut self, projection: Expression) -> Self {
136 self.projection = projection;
137 self
138 }
139
140 pub fn with_ordered(mut self, ordered: bool) -> Self {
141 self.ordered = ordered;
142 self
143 }
144
145 pub fn with_row_range(mut self, row_range: Range<u64>) -> Self {
146 self.row_range = Some(row_range);
147 self
148 }
149
150 pub fn with_selection(mut self, selection: Selection) -> Self {
151 self.selection = selection;
152 self
153 }
154
155 pub fn with_row_indices(mut self, row_indices: Buffer<u64>) -> Self {
156 self.selection = Selection::IncludeByIndex(row_indices);
157 self
158 }
159
160 pub fn with_row_offset(mut self, row_offset: u64) -> Self {
161 self.row_offset = row_offset;
162 self
163 }
164
165 pub fn with_split_by(mut self, split_by: SplitBy) -> Self {
166 self.split_by = split_by;
167 self
168 }
169
170 pub fn with_concurrency(mut self, concurrency: usize) -> Self {
173 assert!(concurrency > 0);
174 self.concurrency = concurrency;
175 self
176 }
177
178 pub fn with_metrics_registry(mut self, metrics: Arc<dyn MetricsRegistry>) -> Self {
179 self.metrics_registry = Some(metrics);
180 self
181 }
182
183 pub fn with_limit(mut self, limit: u64) -> Self {
184 self.limit = Some(limit);
185 self
186 }
187
188 pub fn dtype(&self) -> VortexResult<DType> {
190 self.projection.return_dtype(self.layout_reader.dtype())
191 }
192
193 pub fn session(&self) -> &VortexSession {
195 &self.session
196 }
197
198 pub fn map<B: 'static>(
200 self,
201 map_fn: impl Fn(A) -> VortexResult<B> + 'static + Send + Sync,
202 ) -> ScanBuilder<B> {
203 let old_map_fn = self.map_fn;
204 ScanBuilder {
205 session: self.session,
206 layout_reader: self.layout_reader,
207 projection: self.projection,
208 filter: self.filter,
209 ordered: self.ordered,
210 row_range: self.row_range,
211 selection: self.selection,
212 split_by: self.split_by,
213 concurrency: self.concurrency,
214 metrics_registry: self.metrics_registry,
215 file_stats: self.file_stats,
216 limit: self.limit,
217 row_offset: self.row_offset,
218 map_fn: Arc::new(move |a| old_map_fn(a).and_then(&map_fn)),
219 }
220 }
221
222 pub fn prepare(self) -> VortexResult<RepeatedScan<A>> {
223 let dtype = self.dtype()?;
224
225 if self.filter.is_some() && self.limit.is_some() {
226 vortex_bail!("Vortex doesn't support scans with both a filter and a limit")
227 }
228
229 let mut layout_reader = self.layout_reader;
232
233 layout_reader = Arc::new(RowIdxLayoutReader::new(
237 self.row_offset,
238 layout_reader,
239 self.session.clone(),
240 ));
241
242 let projection = self.projection.optimize_recursive(layout_reader.dtype())?;
244
245 let filter = self
246 .filter
247 .map(|f| f.optimize_recursive(layout_reader.dtype()))
248 .transpose()?;
249
250 let (filter_mask, projection_mask) =
252 filter_and_projection_masks(&projection, filter.as_ref(), layout_reader.dtype())?;
253 let field_mask: Vec<_> = [filter_mask, projection_mask].concat();
254
255 let splits =
256 if let Some(ranges) = attempt_split_ranges(&self.selection, self.row_range.as_ref()) {
257 Splits::Ranges(ranges)
258 } else {
259 let split_range = self
260 .row_range
261 .clone()
262 .unwrap_or_else(|| 0..layout_reader.row_count());
263 Splits::Natural(self.split_by.splits(
264 layout_reader.as_ref(),
265 &split_range,
266 &field_mask,
267 )?)
268 };
269
270 Ok(RepeatedScan::new(
271 self.session.clone(),
272 layout_reader,
273 projection,
274 filter,
275 self.ordered,
276 self.row_range,
277 self.selection,
278 splits,
279 self.concurrency,
280 self.map_fn,
281 self.limit,
282 dtype,
283 ))
284 }
285
286 pub fn build(self) -> VortexResult<Vec<BoxFuture<'static, VortexResult<Option<A>>>>> {
288 if self.limit.is_some_and(|l| l == 0) {
290 return Ok(vec![]);
291 }
292
293 self.prepare()?.execute(None)
294 }
295
296 pub fn into_stream(
298 self,
299 ) -> VortexResult<impl Stream<Item = VortexResult<A>> + Send + 'static + use<A>> {
300 Ok(LazyScanStream::new(self))
301 }
302
303 pub fn into_iter<B: BlockingRuntime>(
305 self,
306 runtime: &B,
307 ) -> VortexResult<impl Iterator<Item = VortexResult<A>> + 'static> {
308 let stream = self.into_stream()?;
309 Ok(runtime.block_on_stream(stream))
310 }
311}
312
/// State machine driving [`LazyScanStream`].
enum LazyScanState<A: 'static + Send> {
    /// Not yet polled. The first poll takes the builder out (leaving `None`) and starts
    /// preparation.
    Builder(Option<Box<ScanBuilder<A>>>),
    /// Preparation is running on a blocking task.
    Preparing(PreparingScan<A>),
    /// Preparation finished; results come from the spawned scan tasks.
    Stream(BoxStream<'static, VortexResult<A>>),
    /// Preparation failed. The error is yielded once (taken out of the `Option`), after which
    /// the stream yields `None`.
    Error(Option<vortex_error::VortexError>),
}
319
/// Task futures produced by executing a prepared scan. `LazyScanStream` drops `Ok(None)`
/// results and yields the rest.
type PreparedScanTasks<A> = Vec<BoxFuture<'static, VortexResult<Option<A>>>>;
321
/// A scan whose preparation runs on a blocking task. It also holds the settings needed to
/// build the result stream once preparation finishes.
struct PreparingScan<A: 'static + Send> {
    /// Whether results are yielded in task order (`buffered`) or as completed
    /// (`buffer_unordered`).
    ordered: bool,
    /// Maximum number of spawned scan tasks buffered at once.
    concurrency: usize,
    /// Runtime handle used to spawn each scan task.
    handle: Handle,
    /// Blocking task that produces the scan's task futures.
    task: Task<VortexResult<PreparedScanTasks<A>>>,
}
328
329struct LazyScanStream<A: 'static + Send> {
330 state: LazyScanState<A>,
331}
332
333impl<A: 'static + Send> LazyScanStream<A> {
334 fn new(builder: ScanBuilder<A>) -> Self {
335 Self {
336 state: LazyScanState::Builder(Some(Box::new(builder))),
337 }
338 }
339}
340
// `poll_next` reassigns `self.state` through `Pin<&mut Self>`, which requires `Self: Unpin`.
// No field is structurally pinned: the result stream is a `BoxStream` (already pinned on the
// heap), and the preparing task is polled via `Pin::new`.
impl<A: 'static + Send> Unpin for LazyScanStream<A> {}
342
343impl<A: 'static + Send> Stream for LazyScanStream<A> {
344 type Item = VortexResult<A>;
345
346 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
347 loop {
348 match &mut self.state {
349 LazyScanState::Builder(builder) => {
350 let builder = builder.take().vortex_expect("polled after completion");
351 let ordered = builder.ordered;
352 let num_workers = std::thread::available_parallelism()
353 .map(|n| n.get())
354 .unwrap_or(1);
355 let concurrency = builder.concurrency * num_workers;
356 let handle = builder.session.handle();
357 let task = handle.spawn_blocking(move || {
358 builder.prepare().and_then(|scan| scan.execute(None))
359 });
360 self.state = LazyScanState::Preparing(PreparingScan {
361 ordered,
362 concurrency,
363 handle,
364 task,
365 });
366 }
367 LazyScanState::Preparing(preparing) => {
368 match ready!(Pin::new(&mut preparing.task).poll(cx)) {
369 Ok(tasks) => {
370 let ordered = preparing.ordered;
371 let concurrency = preparing.concurrency;
372 let handle = preparing.handle.clone();
373 let stream =
374 futures::stream::iter(tasks).map(move |task| handle.spawn(task));
375 let stream = if ordered {
376 stream.buffered(concurrency).boxed()
377 } else {
378 stream.buffer_unordered(concurrency).boxed()
379 };
380 let stream = stream
381 .filter_map(|chunk| async move { chunk.transpose() })
382 .boxed();
383 self.state = LazyScanState::Stream(stream);
384 }
385 Err(err) => self.state = LazyScanState::Error(Some(err)),
386 }
387 }
388 LazyScanState::Stream(stream) => return stream.as_mut().poll_next(cx),
389 LazyScanState::Error(err) => return Poll::Ready(err.take().map(Err)),
390 }
391 }
392 }
393}
394
395pub(crate) fn filter_and_projection_masks(
399 projection: &Expression,
400 filter: Option<&Expression>,
401 dtype: &DType,
402) -> VortexResult<(Vec<FieldMask>, Vec<FieldMask>)> {
403 let Some(struct_dtype) = dtype.as_struct_fields_opt() else {
404 return Ok(match filter {
405 Some(_) => (vec![FieldMask::All], vec![FieldMask::All]),
406 None => (Vec::new(), vec![FieldMask::All]),
407 });
408 };
409 let projection_mask = immediate_scope_access(projection, struct_dtype);
410 Ok(match filter {
411 None => (
412 Vec::new(),
413 projection_mask.into_iter().map(to_field_mask).collect_vec(),
414 ),
415 Some(f) => {
416 let filter_mask = immediate_scope_access(f, struct_dtype);
417 let only_projection_mask = projection_mask
418 .difference(&filter_mask)
419 .cloned()
420 .map(to_field_mask)
421 .collect_vec();
422 (
423 filter_mask.into_iter().map(to_field_mask).collect_vec(),
424 only_projection_mask,
425 )
426 }
427 })
428}
429
430fn to_field_mask(field: FieldName) -> FieldMask {
431 FieldMask::Prefix(FieldPath::from(Field::Name(field)))
432}
433
#[cfg(test)]
mod test {
    use std::collections::BTreeSet;
    use std::ops::Range;
    use std::pin::Pin;
    use std::sync::Arc;
    use std::sync::atomic::AtomicUsize;
    use std::sync::atomic::Ordering;
    use std::task::Context;
    use std::task::Poll;
    use std::time::Duration;

    use futures::Stream;
    use futures::task::noop_waker_ref;
    use parking_lot::Mutex;
    use vortex_array::IntoArray;
    use vortex_array::MaskFuture;
    use vortex_array::ToCanonical;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::expr::Expression;
    use vortex_dtype::DType;
    use vortex_dtype::FieldMask;
    use vortex_dtype::Nullability;
    use vortex_dtype::PType;
    use vortex_error::VortexResult;
    use vortex_error::vortex_err;
    use vortex_io::runtime::BlockingRuntime;
    use vortex_io::runtime::single::SingleThreadRuntime;
    use vortex_layout::ArrayFuture;
    use vortex_layout::LayoutReader;
    use vortex_mask::Mask;

    use super::ScanBuilder;

    /// Single-row `I32` reader that counts `register_splits` calls.
    /// Pruning and filter evaluation are never expected.
    #[derive(Debug)]
    struct CountingLayoutReader {
        name: Arc<str>,
        dtype: DType,
        row_count: u64,
        // Shared with the test so it can observe how often splits were requested.
        register_splits_calls: Arc<AtomicUsize>,
    }

    impl CountingLayoutReader {
        fn new(register_splits_calls: Arc<AtomicUsize>) -> Self {
            Self {
                name: Arc::from("counting"),
                dtype: DType::Primitive(PType::I32, Nullability::NonNullable),
                row_count: 1,
                register_splits_calls,
            }
        }
    }

    impl LayoutReader for CountingLayoutReader {
        fn name(&self) -> &Arc<str> {
            &self.name
        }

        fn dtype(&self) -> &DType {
            &self.dtype
        }

        fn row_count(&self) -> u64 {
            self.row_count
        }

        // Records the call and registers a single split at the end of the range.
        fn register_splits(
            &self,
            _field_mask: &[FieldMask],
            row_range: &Range<u64>,
            splits: &mut BTreeSet<u64>,
        ) -> VortexResult<()> {
            self.register_splits_calls.fetch_add(1, Ordering::Relaxed);
            splits.insert(row_range.end);
            Ok(())
        }

        fn pruning_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            _mask: Mask,
        ) -> VortexResult<MaskFuture> {
            unimplemented!("not needed for this test");
        }

        fn filter_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            _mask: MaskFuture,
        ) -> VortexResult<MaskFuture> {
            unimplemented!("not needed for this test");
        }

        // Returns a future that panics if polled: the laziness test must never drive the scan.
        fn projection_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            _mask: MaskFuture,
        ) -> VortexResult<ArrayFuture> {
            Ok(Box::pin(async move {
                unreachable!("scan should not be polled in this test")
            }))
        }
    }

    /// Creating the stream must not compute splits; that work is deferred to the first poll.
    #[test]
    fn into_stream_is_lazy() {
        let calls = Arc::new(AtomicUsize::new(0));
        let reader = Arc::new(CountingLayoutReader::new(calls.clone()));

        let session = crate::test::SCAN_SESSION.clone();

        let _stream = ScanBuilder::new(session, reader).into_stream().unwrap();

        assert_eq!(calls.load(Ordering::Relaxed), 0);
    }

    /// Four-row `I32` reader. It registers a split after every row and projects each range
    /// to the row indices it covers.
    #[derive(Debug)]
    struct SplittingLayoutReader {
        name: Arc<str>,
        dtype: DType,
        row_count: u64,
        register_splits_calls: Arc<AtomicUsize>,
    }

    impl SplittingLayoutReader {
        fn new(register_splits_calls: Arc<AtomicUsize>) -> Self {
            Self {
                name: Arc::from("splitting"),
                dtype: DType::Primitive(PType::I32, Nullability::NonNullable),
                row_count: 4,
                register_splits_calls,
            }
        }
    }

    impl LayoutReader for SplittingLayoutReader {
        fn name(&self) -> &Arc<str> {
            &self.name
        }

        fn dtype(&self) -> &DType {
            &self.dtype
        }

        fn row_count(&self) -> u64 {
            self.row_count
        }

        // Registers a split at every row boundary in `(start, end]`, giving one-row splits.
        fn register_splits(
            &self,
            _field_mask: &[FieldMask],
            row_range: &Range<u64>,
            splits: &mut BTreeSet<u64>,
        ) -> VortexResult<()> {
            self.register_splits_calls.fetch_add(1, Ordering::Relaxed);
            for split in (row_range.start + 1)..=row_range.end {
                splits.insert(split);
            }
            Ok(())
        }

        // Pass-through: no rows are pruned.
        fn pruning_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            mask: Mask,
        ) -> VortexResult<MaskFuture> {
            Ok(MaskFuture::ready(mask))
        }

        // Pass-through: no rows are filtered.
        fn filter_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            mask: MaskFuture,
        ) -> VortexResult<MaskFuture> {
            Ok(mask)
        }

        // Produces the row indices of `row_range` as an `i32` primitive array.
        fn projection_evaluation(
            &self,
            row_range: &Range<u64>,
            _expr: &Expression,
            _mask: MaskFuture,
        ) -> VortexResult<ArrayFuture> {
            let start = usize::try_from(row_range.start)
                .map_err(|_| vortex_err!("row_range.start must fit in usize"))?;
            let end = usize::try_from(row_range.end)
                .map_err(|_| vortex_err!("row_range.end must fit in usize"))?;

            let values: VortexResult<Vec<i32>> = (start..end)
                .map(|v| i32::try_from(v).map_err(|_| vortex_err!("split value must fit in i32")))
                .collect();

            let array = PrimitiveArray::from_iter(values?).into_array();
            Ok(Box::pin(async move { Ok(array) }))
        }
    }

    /// Driving the stream to completion prepares the scan exactly once. Each one-row split
    /// is then yielded in order.
    #[test]
    fn into_stream_executes_after_prepare() -> VortexResult<()> {
        let calls = Arc::new(AtomicUsize::new(0));
        let reader = Arc::new(SplittingLayoutReader::new(calls.clone()));

        let runtime = SingleThreadRuntime::default();
        let session = crate::test::session_with_handle(runtime.handle());

        let stream = ScanBuilder::new(session, reader).into_stream().unwrap();
        let mut iter = runtime.block_on_stream(stream);

        let mut values = Vec::new();
        for chunk in &mut iter {
            // Each chunk holds a single row; record its value.
            values.push(chunk?.to_primitive().into_buffer::<i32>()[0]);
        }

        assert_eq!(calls.load(Ordering::Relaxed), 1);
        assert_eq!(values.as_ref(), [0, 1, 2, 3]);

        Ok(())
    }

    /// Single-row reader whose `register_splits` blocks on `gate`. This simulates a slow
    /// split computation.
    #[derive(Debug)]
    struct BlockingSplitsLayoutReader {
        name: Arc<str>,
        dtype: DType,
        row_count: u64,
        register_splits_calls: Arc<AtomicUsize>,
        // Held locked by the test to stall `register_splits`.
        gate: Arc<Mutex<()>>,
    }

    impl BlockingSplitsLayoutReader {
        fn new(gate: Arc<Mutex<()>>, register_splits_calls: Arc<AtomicUsize>) -> Self {
            Self {
                name: Arc::from("blocking-splits"),
                dtype: DType::Primitive(PType::I32, Nullability::NonNullable),
                row_count: 1,
                register_splits_calls,
                gate,
            }
        }
    }

    impl LayoutReader for BlockingSplitsLayoutReader {
        fn name(&self) -> &Arc<str> {
            &self.name
        }

        fn dtype(&self) -> &DType {
            &self.dtype
        }

        fn row_count(&self) -> u64 {
            self.row_count
        }

        // Counts the call, then waits for the gate before registering the end of the range.
        fn register_splits(
            &self,
            _field_mask: &[FieldMask],
            row_range: &Range<u64>,
            splits: &mut BTreeSet<u64>,
        ) -> VortexResult<()> {
            self.register_splits_calls.fetch_add(1, Ordering::Relaxed);
            let _guard = self.gate.lock();
            splits.insert(row_range.end);
            Ok(())
        }

        fn pruning_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            _mask: Mask,
        ) -> VortexResult<MaskFuture> {
            unimplemented!("not needed for this test");
        }

        fn filter_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            _mask: MaskFuture,
        ) -> VortexResult<MaskFuture> {
            unimplemented!("not needed for this test");
        }

        fn projection_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            _mask: MaskFuture,
        ) -> VortexResult<ArrayFuture> {
            Ok(Box::pin(async move {
                unreachable!("scan should not be polled in this test")
            }))
        }
    }

    /// The first `poll_next` must return `Poll::Pending` quickly, even while split
    /// computation would block. Preparation is handed off rather than run inline.
    #[test]
    fn into_stream_first_poll_does_not_block() {
        // Lock the gate up front so any `register_splits` call would stall.
        let gate = Arc::new(Mutex::new(()));
        let guard = gate.lock();

        let calls = Arc::new(AtomicUsize::new(0));
        let reader = Arc::new(BlockingSplitsLayoutReader::new(gate.clone(), calls.clone()));

        let runtime = SingleThreadRuntime::default();
        let session = crate::test::session_with_handle(runtime.handle());

        let mut stream = ScanBuilder::new(session, reader).into_stream().unwrap();

        // Poll once on a separate thread. The channel reports whether the poll returned
        // Pending, so a blocking poll is detected via the timeout below instead of hanging.
        let (send, recv) = std::sync::mpsc::channel::<bool>();
        let join = std::thread::spawn(move || {
            let waker = noop_waker_ref();
            let mut cx = Context::from_waker(waker);
            let poll = Pin::new(&mut stream).poll_next(&mut cx);
            let _ = send.send(matches!(poll, Poll::Pending));
        });

        let polled_pending = recv.recv_timeout(Duration::from_secs(1)).ok();

        // Release the gate before joining, so a poll that did block can finish and the join
        // cannot deadlock.
        drop(guard);
        drop(join.join());

        let polled_pending = polled_pending.expect("poll_next blocked; expected quick return");
        assert!(
            polled_pending,
            "expected Poll::Pending while prepare is blocked"
        );
        // The runtime was never driven here, so the spawned preparation has not reached
        // `register_splits`. NOTE(review): relies on SingleThreadRuntime not running
        // spawn_blocking work eagerly — confirm.
        assert_eq!(calls.load(Ordering::Relaxed), 0);

        drop(runtime);
    }
}