1use std::ops::Range;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::task::Context;
8use std::task::Poll;
9use std::task::ready;
10
11use futures::Stream;
12use futures::StreamExt;
13use futures::future::BoxFuture;
14use futures::stream::BoxStream;
15use itertools::Itertools;
16use vortex_array::ArrayRef;
17use vortex_array::dtype::DType;
18use vortex_array::dtype::FieldMask;
19use vortex_array::expr::Expression;
20use vortex_array::expr::analysis::referenced_field_paths;
21use vortex_array::expr::root;
22use vortex_array::iter::ArrayIterator;
23use vortex_array::iter::ArrayIteratorAdapter;
24use vortex_array::stats::StatsSet;
25use vortex_array::stream::ArrayStream;
26use vortex_array::stream::ArrayStreamAdapter;
27use vortex_buffer::Buffer;
28use vortex_error::VortexExpect;
29use vortex_error::VortexResult;
30use vortex_error::vortex_bail;
31use vortex_io::runtime::BlockingRuntime;
32use vortex_io::runtime::Handle;
33use vortex_io::runtime::Task;
34use vortex_io::session::RuntimeSessionExt;
35use vortex_metrics::MetricsRegistry;
36use vortex_scan::selection::Selection;
37use vortex_session::VortexSession;
38use vortex_utils::parallelism::get_available_parallelism;
39
40use crate::LayoutReader;
41use crate::LayoutReaderRef;
42use crate::layouts::row_idx::RowIdxLayoutReader;
43use crate::scan::repeated_scan::RepeatedScan;
44use crate::scan::split_by::SplitBy;
45use crate::scan::splits::Splits;
46use crate::scan::splits::attempt_split_ranges;
47
/// Builder for a scan over a [`LayoutReader`].
///
/// The scan is configured with a projection, an optional filter, an optional row range,
/// a row selection, a split strategy, a concurrency setting and an optional limit. It is then
/// turned into task futures (`build`), a lazy stream (`into_stream`), a blocking iterator
/// (`into_iter`) or a reusable [`RepeatedScan`] (`prepare`).
///
/// `A` is the item type produced by `map_fn` for every array chunk read by the scan.
pub struct ScanBuilder<A> {
    /// Session forwarded to the prepared scan; its runtime handle drives `into_stream`.
    session: VortexSession,
    /// Reader for the layout being scanned.
    layout_reader: LayoutReaderRef,
    /// Projection expression evaluated for every chunk (`root()` by default).
    projection: Expression,
    /// Optional filter expression; cannot be combined with `limit` (see `prepare`).
    filter: Option<Expression>,
    /// Whether results keep task order; `into_stream` picks `buffered` vs `buffer_unordered`.
    ordered: bool,
    /// Optional row range the scan is restricted to; all rows when `None`.
    row_range: Option<Range<u64>>,
    /// Row selection (e.g. explicit row indices set via `with_row_indices`).
    selection: Selection,
    /// Strategy for natural splits, used when `attempt_split_ranges` yields no explicit ranges.
    split_by: SplitBy,
    /// Concurrency for executing scan tasks; `with_concurrency` rejects zero, and
    /// `into_stream` multiplies it by the available parallelism.
    concurrency: usize,
    /// Function converting each produced array into an `A`.
    map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
    /// Optional metrics registry.
    /// NOTE(review): `prepare` in this file does not forward it anywhere — confirm intent.
    metrics_registry: Option<Arc<dyn MetricsRegistry>>,
    /// Optional per-file statistics.
    /// NOTE(review): no setter or consumer of this field is visible in this file.
    file_stats: Option<Arc<[StatsSet]>>,
    /// Optional limit forwarded to the prepared scan; `build` returns no tasks when it is 0.
    limit: Option<u64>,
    /// Offset handed to the `RowIdxLayoutReader` wrapper created in `prepare`.
    row_offset: u64,
}
76
77impl ScanBuilder<ArrayRef> {
78 pub fn new(session: VortexSession, layout_reader: Arc<dyn LayoutReader>) -> Self {
79 Self {
80 session,
81 layout_reader,
82 projection: root(),
83 filter: None,
84 ordered: true,
85 row_range: None,
86 selection: Default::default(),
87 split_by: SplitBy::Layout,
88 concurrency: 4,
91 map_fn: Arc::new(Ok),
92 metrics_registry: None,
93 file_stats: None,
94 limit: None,
95 row_offset: 0,
96 }
97 }
98
99 pub fn into_array_stream(self) -> VortexResult<impl ArrayStream + Send + 'static> {
103 let dtype = self.dtype()?;
104 let stream = self.into_stream()?;
105 Ok(ArrayStreamAdapter::new(dtype, stream))
106 }
107
108 pub fn into_array_iter<B: BlockingRuntime>(
110 self,
111 runtime: &B,
112 ) -> VortexResult<impl ArrayIterator + 'static> {
113 let stream = self.into_array_stream()?;
114 let dtype = stream.dtype().clone();
115 Ok(ArrayIteratorAdapter::new(
116 dtype,
117 runtime.block_on_stream(stream),
118 ))
119 }
120}
121
122impl<A: 'static + Send> ScanBuilder<A> {
123 pub fn with_filter(mut self, filter: Expression) -> Self {
124 self.filter = Some(filter);
125 self
126 }
127
128 pub fn with_some_filter(mut self, filter: Option<Expression>) -> Self {
129 self.filter = filter;
130 self
131 }
132
133 pub fn with_projection(mut self, projection: Expression) -> Self {
134 self.projection = projection;
135 self
136 }
137
138 pub fn ordered(&self) -> bool {
139 self.ordered
140 }
141
142 pub fn with_ordered(mut self, ordered: bool) -> Self {
143 self.ordered = ordered;
144 self
145 }
146
147 pub fn with_row_range(mut self, row_range: Range<u64>) -> Self {
148 self.row_range = Some(row_range);
149 self
150 }
151
152 pub fn with_selection(mut self, selection: Selection) -> Self {
153 self.selection = selection;
154 self
155 }
156
157 pub fn with_row_indices(mut self, row_indices: Buffer<u64>) -> Self {
158 self.selection = Selection::IncludeByIndex(row_indices);
159 self
160 }
161
162 pub fn with_row_offset(mut self, row_offset: u64) -> Self {
163 self.row_offset = row_offset;
164 self
165 }
166
167 pub fn with_split_by(mut self, split_by: SplitBy) -> Self {
168 self.split_by = split_by;
169 self
170 }
171
172 pub fn concurrency(&self) -> usize {
173 self.concurrency
174 }
175
176 pub fn with_concurrency(mut self, concurrency: usize) -> Self {
179 assert!(concurrency > 0);
180 self.concurrency = concurrency;
181 self
182 }
183
184 pub fn with_some_metrics_registry(mut self, metrics: Option<Arc<dyn MetricsRegistry>>) -> Self {
185 self.metrics_registry = metrics;
186 self
187 }
188
189 pub fn with_metrics_registry(mut self, metrics: Arc<dyn MetricsRegistry>) -> Self {
190 self.metrics_registry = Some(metrics);
191 self
192 }
193
194 pub fn with_some_limit(mut self, limit: Option<u64>) -> Self {
195 self.limit = limit;
196 self
197 }
198
199 pub fn with_limit(mut self, limit: u64) -> Self {
200 self.limit = Some(limit);
201 self
202 }
203
204 pub fn dtype(&self) -> VortexResult<DType> {
206 self.projection.return_dtype(self.layout_reader.dtype())
207 }
208
209 pub fn session(&self) -> &VortexSession {
211 &self.session
212 }
213
214 pub fn map<B: 'static>(
216 self,
217 map_fn: impl Fn(A) -> VortexResult<B> + 'static + Send + Sync,
218 ) -> ScanBuilder<B> {
219 let old_map_fn = self.map_fn;
220 ScanBuilder {
221 session: self.session,
222 layout_reader: self.layout_reader,
223 projection: self.projection,
224 filter: self.filter,
225 ordered: self.ordered,
226 row_range: self.row_range,
227 selection: self.selection,
228 split_by: self.split_by,
229 concurrency: self.concurrency,
230 metrics_registry: self.metrics_registry,
231 file_stats: self.file_stats,
232 limit: self.limit,
233 row_offset: self.row_offset,
234 map_fn: Arc::new(move |a| old_map_fn(a).and_then(&map_fn)),
235 }
236 }
237
238 pub fn prepare(self) -> VortexResult<RepeatedScan<A>> {
239 let dtype = self.dtype()?;
240
241 if self.filter.is_some() && self.limit.is_some() {
242 vortex_bail!("Vortex doesn't support scans with both a filter and a limit")
243 }
244
245 let mut layout_reader = self.layout_reader;
248
249 layout_reader = Arc::new(RowIdxLayoutReader::new(
253 self.row_offset,
254 layout_reader,
255 self.session.clone(),
256 ));
257
258 let projection = self.projection.optimize_recursive(layout_reader.dtype())?;
260
261 let filter = self
262 .filter
263 .map(|f| f.optimize_recursive(layout_reader.dtype()))
264 .transpose()?;
265
266 let field_mask =
268 referenced_field_masks(&projection, filter.as_ref(), layout_reader.dtype())?;
269
270 let splits =
271 if let Some(ranges) = attempt_split_ranges(&self.selection, self.row_range.as_ref()) {
272 Splits::Ranges(ranges)
273 } else {
274 let split_range = self
275 .row_range
276 .clone()
277 .unwrap_or_else(|| 0..layout_reader.row_count());
278 Splits::Natural(self.split_by.splits(
279 layout_reader.as_ref(),
280 &split_range,
281 &field_mask,
282 )?)
283 };
284
285 Ok(RepeatedScan::new(
286 self.session.clone(),
287 layout_reader,
288 projection,
289 filter,
290 self.ordered,
291 self.row_range,
292 self.selection,
293 splits,
294 self.concurrency,
295 self.map_fn,
296 self.limit,
297 dtype,
298 ))
299 }
300
301 pub fn build(self) -> VortexResult<Vec<BoxFuture<'static, VortexResult<Option<A>>>>> {
303 if self.limit.is_some_and(|l| l == 0) {
305 return Ok(vec![]);
306 }
307
308 self.prepare()?.execute(None)
309 }
310
311 pub fn into_stream(
313 self,
314 ) -> VortexResult<impl Stream<Item = VortexResult<A>> + Send + 'static + use<A>> {
315 Ok(LazyScanStream::new(self))
316 }
317
318 pub fn into_iter<B: BlockingRuntime>(
320 self,
321 runtime: &B,
322 ) -> VortexResult<impl Iterator<Item = VortexResult<A>> + 'static> {
323 let stream = self.into_stream()?;
324 Ok(runtime.block_on_stream(stream))
325 }
326}
327
/// States of the [`LazyScanStream`] state machine, advanced by `poll_next`.
enum LazyScanState<A: 'static + Send> {
    /// Not yet polled: holds the builder, which the first poll takes out.
    Builder(Option<Box<ScanBuilder<A>>>),
    /// Preparation is running on a blocking task.
    Preparing(PreparingScan<A>),
    /// Preparation succeeded: yields results from the spawned scan tasks.
    Stream(BoxStream<'static, VortexResult<A>>),
    /// Preparation failed: the error is yielded once, after which the stream ends.
    Error(Option<vortex_error::VortexError>),
}
334
/// Futures produced by executing a prepared scan. Each resolves to an optional item, and
/// `None` results are skipped by the lazy stream.
type PreparedScanTasks<A> = Vec<BoxFuture<'static, VortexResult<Option<A>>>>;
336
/// Bookkeeping for a scan whose preparation is running on a blocking task.
struct PreparingScan<A: 'static + Send> {
    /// Copied from the builder: whether results must keep task order.
    ordered: bool,
    /// Maximum number of scan tasks in flight (builder concurrency × available parallelism).
    concurrency: usize,
    /// Runtime handle used to spawn the individual scan tasks once preparation completes.
    handle: Handle,
    /// Blocking task that prepares the scan and returns its task futures.
    task: Task<VortexResult<PreparedScanTasks<A>>>,
}
343
/// Stream returned by `ScanBuilder::into_stream`.
///
/// Constructing it does no work. The scan is only prepared when the stream is first polled.
struct LazyScanStream<A: 'static + Send> {
    /// Current position in the builder → preparing → streaming (or error) state machine.
    state: LazyScanState<A>,
}
347
348impl<A: 'static + Send> LazyScanStream<A> {
349 fn new(builder: ScanBuilder<A>) -> Self {
350 Self {
351 state: LazyScanState::Builder(Some(Box::new(builder))),
352 }
353 }
354}
355
// `poll_next` mutates `self.state` through `Pin<&mut Self>`, which requires `Self: Unpin`.
// Implementing it manually is sound because no field relies on being pinned in place:
// the blocking task is polled via `Pin::new`, which itself requires the task to be `Unpin`,
// and the result stream is a `BoxStream` that carries its own pinned allocation.
impl<A: 'static + Send> Unpin for LazyScanStream<A> {}
357
358impl<A: 'static + Send> Stream for LazyScanStream<A> {
359 type Item = VortexResult<A>;
360
361 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
362 loop {
363 match &mut self.state {
364 LazyScanState::Builder(builder) => {
365 let builder = builder.take().vortex_expect("polled after completion");
366 let ordered = builder.ordered;
367 let num_workers = get_available_parallelism().unwrap_or(1);
368 let concurrency = builder.concurrency * num_workers;
369 let handle = builder.session.handle();
370 let task = handle.spawn_blocking(move || {
371 builder.prepare().and_then(|scan| scan.execute(None))
372 });
373 self.state = LazyScanState::Preparing(PreparingScan {
374 ordered,
375 concurrency,
376 handle,
377 task,
378 });
379 }
380 LazyScanState::Preparing(preparing) => {
381 match ready!(Pin::new(&mut preparing.task).poll(cx)) {
382 Ok(tasks) => {
383 let ordered = preparing.ordered;
384 let concurrency = preparing.concurrency;
385 let handle = preparing.handle.clone();
386 let stream =
387 futures::stream::iter(tasks).map(move |task| handle.spawn(task));
388 let stream = if ordered {
389 stream.buffered(concurrency).boxed()
390 } else {
391 stream.buffer_unordered(concurrency).boxed()
392 };
393 let stream = stream
394 .filter_map(|chunk| async move { chunk.transpose() })
395 .boxed();
396 self.state = LazyScanState::Stream(stream);
397 }
398 Err(err) => self.state = LazyScanState::Error(Some(err)),
399 }
400 }
401 LazyScanState::Stream(stream) => return stream.as_mut().poll_next(cx),
402 LazyScanState::Error(err) => return Poll::Ready(err.take().map(Err)),
403 }
404 }
405 }
406}
407
408pub fn referenced_field_masks(
412 projection: &Expression,
413 filter: Option<&Expression>,
414 dtype: &DType,
415) -> VortexResult<Vec<FieldMask>> {
416 if dtype.as_struct_fields_opt().is_none() {
417 return Ok(vec![FieldMask::All]);
418 }
419
420 let mut field_paths = referenced_field_paths(projection, dtype)?;
421 if let Some(filter) = filter {
422 field_paths.extend(referenced_field_paths(filter, dtype)?);
423 }
424 Ok(field_paths.into_iter().map(FieldMask::Prefix).collect_vec())
425}
426
#[cfg(test)]
mod test {
    // Tests for field-mask computation and for the laziness and execution behaviour of
    // `ScanBuilder::into_stream`.
    use std::ops::Range;
    use std::pin::Pin;
    use std::sync::Arc;
    use std::sync::atomic::AtomicUsize;
    use std::sync::atomic::Ordering;
    use std::sync::mpsc;
    use std::task::Context;
    use std::task::Poll;
    use std::time::Duration;

    use futures::Stream;
    use futures::task::noop_waker_ref;
    use parking_lot::Mutex;
    use vortex_array::IntoArray;
    use vortex_array::LEGACY_SESSION;
    use vortex_array::MaskFuture;
    use vortex_array::VortexSessionExecute;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::FieldMask;
    use vortex_array::dtype::FieldPath;
    use vortex_array::dtype::Nullability;
    use vortex_array::dtype::PType;
    use vortex_array::dtype::StructFields;
    use vortex_array::expr::Expression;
    use vortex_array::expr::eq;
    use vortex_array::expr::get_item;
    use vortex_array::expr::is_not_null;
    use vortex_array::expr::lit;
    use vortex_array::expr::root;
    use vortex_error::VortexResult;
    use vortex_error::vortex_err;
    use vortex_io::runtime::BlockingRuntime;
    use vortex_io::runtime::single::SingleThreadRuntime;
    use vortex_mask::Mask;

    use super::ScanBuilder;
    use super::referenced_field_masks;
    use crate::ArrayFuture;
    use crate::LayoutReader;
    use crate::RowSplits;
    use crate::SplitRange;
    use crate::scan::test::SCAN_SESSION;
    use crate::scan::test::session_with_handle;

    /// Builds the non-nullable struct dtype `{a: {1: i32, 2: i32}, b: i32}` used by the
    /// field-mask tests.
    fn nested_dtype() -> DType {
        DType::Struct(
            StructFields::from_iter([
                (
                    "a",
                    DType::Struct(
                        StructFields::from_iter([
                            ("1", DType::Primitive(PType::I32, Nullability::NonNullable)),
                            ("2", DType::Primitive(PType::I32, Nullability::NonNullable)),
                        ]),
                        Nullability::NonNullable,
                    ),
                ),
                ("b", DType::Primitive(PType::I32, Nullability::NonNullable)),
            ]),
            Nullability::NonNullable,
        )
    }

    /// Projection and filter on sibling nested fields (`a.1`, `a.2`) keep both full paths.
    #[test]
    fn nested_projection_preserves_field_path_in_split_mask() -> VortexResult<()> {
        let projection = get_item("1", get_item("a", root()));
        let filter = eq(get_item("2", get_item("a", root())), lit(0_i32));

        let field_masks = referenced_field_masks(&projection, Some(&filter), &nested_dtype())?;

        assert_eq!(field_masks.len(), 2);
        assert!(field_masks.contains(&FieldMask::Prefix(FieldPath::from_name("a").push("1"))));
        assert!(field_masks.contains(&FieldMask::Prefix(FieldPath::from_name("a").push("2"))));
        Ok(())
    }

    /// A filter on the parent `a` subsumes the projection's nested path `a.1`.
    #[test]
    fn filter_path_covers_nested_projection_path() -> VortexResult<()> {
        let projection = get_item("1", get_item("a", root()));
        let filter = is_not_null(get_item("a", root()));

        let field_masks = referenced_field_masks(&projection, Some(&filter), &nested_dtype())?;

        assert_eq!(field_masks, [FieldMask::Prefix(FieldPath::from_name("a"))]);
        Ok(())
    }

    /// A projection of the parent `a` subsumes the filter's nested path `a.1`.
    #[test]
    fn parent_projection_path_covers_nested_filter_path() -> VortexResult<()> {
        let projection = get_item("a", root());
        let filter = is_not_null(get_item("1", get_item("a", root())));

        let field_masks = referenced_field_masks(&projection, Some(&filter), &nested_dtype())?;

        assert_eq!(field_masks, [FieldMask::Prefix(FieldPath::from_name("a"))]);
        Ok(())
    }

    /// Single-row reader that only counts `register_splits` calls. Its evaluation methods are
    /// not expected to be reached by the tests that use it.
    #[derive(Debug)]
    struct CountingLayoutReader {
        name: Arc<str>,
        dtype: DType,
        row_count: u64,
        register_splits_calls: Arc<AtomicUsize>,
    }

    impl CountingLayoutReader {
        fn new(register_splits_calls: Arc<AtomicUsize>) -> Self {
            Self {
                name: Arc::from("counting"),
                dtype: DType::Primitive(PType::I32, Nullability::NonNullable),
                row_count: 1,
                register_splits_calls,
            }
        }
    }

    impl LayoutReader for CountingLayoutReader {
        fn name(&self) -> &Arc<str> {
            &self.name
        }

        fn dtype(&self) -> &DType {
            &self.dtype
        }

        fn row_count(&self) -> u64 {
            self.row_count
        }

        /// Records the call and registers one split at the end of the root row range.
        fn register_splits(
            &self,
            _field_mask: &[FieldMask],
            split_range: &SplitRange,
            splits: &mut RowSplits,
        ) -> VortexResult<()> {
            self.register_splits_calls.fetch_add(1, Ordering::Relaxed);
            splits.push(split_range.root_row_range().end);
            Ok(())
        }

        fn pruning_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            _mask: Mask,
        ) -> VortexResult<MaskFuture> {
            unimplemented!("not needed for this test");
        }

        fn filter_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            _mask: MaskFuture,
        ) -> VortexResult<MaskFuture> {
            unimplemented!("not needed for this test");
        }

        /// Returns a future that panics if polled; the tests never drive it.
        fn projection_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            _mask: MaskFuture,
        ) -> VortexResult<ArrayFuture> {
            Ok(Box::pin(async move {
                unreachable!("scan should not be polled in this test")
            }))
        }

        fn as_any(&self) -> &dyn std::any::Any {
            self
        }
    }

    /// Creating the stream must not prepare the scan, so `register_splits` is never called.
    #[test]
    fn into_stream_is_lazy() {
        let calls = Arc::new(AtomicUsize::new(0));
        let reader = Arc::new(CountingLayoutReader::new(Arc::clone(&calls)));

        let session = SCAN_SESSION.clone();

        let _stream = ScanBuilder::new(session, reader).into_stream().unwrap();

        assert_eq!(calls.load(Ordering::Relaxed), 0);
    }

    /// Four-row reader that registers a split boundary after every row. Pruning and filtering
    /// pass the mask through unchanged, and projection returns the requested row indices as
    /// `i32` values.
    #[derive(Debug)]
    struct SplittingLayoutReader {
        name: Arc<str>,
        dtype: DType,
        row_count: u64,
        register_splits_calls: Arc<AtomicUsize>,
    }

    impl SplittingLayoutReader {
        fn new(register_splits_calls: Arc<AtomicUsize>) -> Self {
            Self {
                name: Arc::from("splitting"),
                dtype: DType::Primitive(PType::I32, Nullability::NonNullable),
                row_count: 4,
                register_splits_calls,
            }
        }
    }

    impl LayoutReader for SplittingLayoutReader {
        fn name(&self) -> &Arc<str> {
            &self.name
        }

        fn dtype(&self) -> &DType {
            &self.dtype
        }

        fn row_count(&self) -> u64 {
            self.row_count
        }

        fn register_splits(
            &self,
            _field_mask: &[FieldMask],
            split_range: &SplitRange,
            splits: &mut RowSplits,
        ) -> VortexResult<()> {
            self.register_splits_calls.fetch_add(1, Ordering::Relaxed);
            // One boundary after each row of the range, shifted by the split's row offset.
            for split in (split_range.row_range().start + 1)..=split_range.row_range().end {
                splits.push(split_range.row_offset() + split);
            }
            Ok(())
        }

        fn pruning_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            mask: Mask,
        ) -> VortexResult<MaskFuture> {
            Ok(MaskFuture::ready(mask))
        }

        fn filter_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            mask: MaskFuture,
        ) -> VortexResult<MaskFuture> {
            Ok(mask)
        }

        /// Materializes the row indices `row_range.start..row_range.end` as an `i32` array.
        fn projection_evaluation(
            &self,
            row_range: &Range<u64>,
            _expr: &Expression,
            _mask: MaskFuture,
        ) -> VortexResult<ArrayFuture> {
            let start = usize::try_from(row_range.start)
                .map_err(|_| vortex_err!("row_range.start must fit in usize"))?;
            let end = usize::try_from(row_range.end)
                .map_err(|_| vortex_err!("row_range.end must fit in usize"))?;

            let values: VortexResult<Vec<i32>> = (start..end)
                .map(|v| i32::try_from(v).map_err(|_| vortex_err!("split value must fit in i32")))
                .collect();

            let array = PrimitiveArray::from_iter(values?).into_array();
            Ok(Box::pin(async move { Ok(array) }))
        }

        fn as_any(&self) -> &dyn std::any::Any {
            self
        }
    }

    /// Draining the stream prepares the scan exactly once and yields rows 0..4 in order,
    /// taking the first value of each chunk.
    #[test]
    fn into_stream_executes_after_prepare() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let calls = Arc::new(AtomicUsize::new(0));
        let reader = Arc::new(SplittingLayoutReader::new(Arc::clone(&calls)));

        let runtime = SingleThreadRuntime::default();
        let session = session_with_handle(runtime.handle());

        let stream = ScanBuilder::new(session, reader).into_stream()?;
        let mut iter = runtime.block_on_stream(stream);

        let mut values = Vec::new();
        for chunk in &mut iter {
            let prim = chunk?.execute::<PrimitiveArray>(&mut ctx)?;
            values.push(prim.into_buffer::<i32>()[0]);
        }

        assert_eq!(calls.load(Ordering::Relaxed), 1);
        assert_eq!(values.as_ref(), [0, 1, 2, 3]);

        Ok(())
    }

    /// Single-row reader whose `register_splits` blocks until `gate` can be locked,
    /// simulating a slow scan preparation.
    #[derive(Debug)]
    struct BlockingSplitsLayoutReader {
        name: Arc<str>,
        dtype: DType,
        row_count: u64,
        register_splits_calls: Arc<AtomicUsize>,
        gate: Arc<Mutex<()>>,
    }

    impl BlockingSplitsLayoutReader {
        fn new(gate: Arc<Mutex<()>>, register_splits_calls: Arc<AtomicUsize>) -> Self {
            Self {
                name: Arc::from("blocking-splits"),
                dtype: DType::Primitive(PType::I32, Nullability::NonNullable),
                row_count: 1,
                register_splits_calls,
                gate,
            }
        }
    }

    impl LayoutReader for BlockingSplitsLayoutReader {
        fn name(&self) -> &Arc<str> {
            &self.name
        }

        fn dtype(&self) -> &DType {
            &self.dtype
        }

        fn row_count(&self) -> u64 {
            self.row_count
        }

        fn register_splits(
            &self,
            _field_mask: &[FieldMask],
            split_range: &SplitRange,
            splits: &mut RowSplits,
        ) -> VortexResult<()> {
            // Counted before waiting on the gate, so a call is observable even while blocked.
            self.register_splits_calls.fetch_add(1, Ordering::Relaxed);
            let _guard = self.gate.lock();
            splits.push(split_range.root_row_range().end);
            Ok(())
        }

        fn pruning_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            _mask: Mask,
        ) -> VortexResult<MaskFuture> {
            unimplemented!("not needed for this test");
        }

        fn filter_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            _mask: MaskFuture,
        ) -> VortexResult<MaskFuture> {
            unimplemented!("not needed for this test");
        }

        /// Returns a future that panics if polled; the test never drives it.
        fn projection_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            _mask: MaskFuture,
        ) -> VortexResult<ArrayFuture> {
            Ok(Box::pin(async move {
                unreachable!("scan should not be polled in this test")
            }))
        }

        fn as_any(&self) -> &dyn std::any::Any {
            self
        }
    }

    /// The first `poll_next` must return `Poll::Pending` promptly instead of running the
    /// (blocked) preparation inline.
    #[test]
    fn into_stream_first_poll_does_not_block() {
        // Hold the gate so any call to `register_splits` blocks until it is released.
        let gate = Arc::new(Mutex::new(()));
        let guard = gate.lock();

        let calls = Arc::new(AtomicUsize::new(0));
        let reader = Arc::new(BlockingSplitsLayoutReader::new(
            Arc::clone(&gate),
            Arc::clone(&calls),
        ));

        let runtime = SingleThreadRuntime::default();
        let session = session_with_handle(runtime.handle());

        let mut stream = ScanBuilder::new(session, reader).into_stream().unwrap();

        // Poll once on a separate thread. If preparation ran inline it would block on the gate
        // and never report back, which the timeout below detects.
        let (send, recv) = mpsc::channel::<bool>();
        let join = std::thread::spawn(move || {
            let waker = noop_waker_ref();
            let mut cx = Context::from_waker(waker);
            let poll = Pin::new(&mut stream).poll_next(&mut cx);
            let _ = send.send(matches!(poll, Poll::Pending));
        });

        let polled_pending = recv.recv_timeout(Duration::from_secs(1)).ok();

        // Release the gate so a poll that did block can finish before joining the thread.
        drop(guard);
        drop(join.join());

        let polled_pending = polled_pending.expect("poll_next blocked; expected quick return");
        assert!(
            polled_pending,
            "expected Poll::Pending while prepare is blocked"
        );
        // NOTE(review): assumes the blocking task spawned by the first poll has not started
        // yet (presumably `SingleThreadRuntime` only runs work while it is driven) — confirm.
        assert_eq!(calls.load(Ordering::Relaxed), 0);

        drop(runtime);
    }

    /// A row range restricts the scan to rows 1..3, and preparation still happens once.
    #[test]
    fn into_stream_with_row_range() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let calls = Arc::new(AtomicUsize::new(0));
        let reader = Arc::new(SplittingLayoutReader::new(Arc::clone(&calls)));

        let runtime = SingleThreadRuntime::default();
        let session = session_with_handle(runtime.handle());

        let stream = ScanBuilder::new(session, reader)
            .with_row_range(1..3)
            .into_stream()?;
        let mut iter = runtime.block_on_stream(stream);

        let mut values = Vec::new();
        for chunk in &mut iter {
            let prim = chunk?.execute::<PrimitiveArray>(&mut ctx)?;
            values.extend(prim.into_buffer::<i32>().iter().copied());
        }

        assert_eq!(calls.load(Ordering::Relaxed), 1);
        assert_eq!(values.as_ref(), [1, 2]);

        Ok(())
    }
}