Skip to main content

vortex_layout/scan/
scan_builder.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::ops::Range;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::task::Context;
8use std::task::Poll;
9use std::task::ready;
10
11use futures::Stream;
12use futures::StreamExt;
13use futures::future::BoxFuture;
14use futures::stream::BoxStream;
15use itertools::Itertools;
16use vortex_array::ArrayRef;
17use vortex_array::dtype::DType;
18use vortex_array::dtype::Field;
19use vortex_array::dtype::FieldMask;
20use vortex_array::dtype::FieldName;
21use vortex_array::dtype::FieldPath;
22use vortex_array::expr::Expression;
23use vortex_array::expr::analysis::immediate_access::immediate_scope_access;
24use vortex_array::expr::root;
25use vortex_array::iter::ArrayIterator;
26use vortex_array::iter::ArrayIteratorAdapter;
27use vortex_array::stats::StatsSet;
28use vortex_array::stream::ArrayStream;
29use vortex_array::stream::ArrayStreamAdapter;
30use vortex_buffer::Buffer;
31use vortex_error::VortexExpect;
32use vortex_error::VortexResult;
33use vortex_error::vortex_bail;
34use vortex_io::runtime::BlockingRuntime;
35use vortex_io::runtime::Handle;
36use vortex_io::runtime::Task;
37use vortex_io::session::RuntimeSessionExt;
38use vortex_metrics::MetricsRegistry;
39use vortex_scan::selection::Selection;
40use vortex_session::VortexSession;
41use vortex_utils::parallelism::get_available_parallelism;
42
43use crate::LayoutReader;
44use crate::LayoutReaderRef;
45use crate::layouts::row_idx::RowIdxLayoutReader;
46use crate::scan::repeated_scan::RepeatedScan;
47use crate::scan::split_by::SplitBy;
48use crate::scan::splits::Splits;
49use crate::scan::splits::attempt_split_ranges;
50
51/// A struct for building a scan operation.
52pub struct ScanBuilder<A> {
53    session: VortexSession,
54    layout_reader: LayoutReaderRef,
55    projection: Expression,
56    filter: Option<Expression>,
57    /// Whether the scan needs to return splits in the order they appear in the file.
58    ordered: bool,
59    /// Optionally read a subset of the rows in the file.
60    row_range: Option<Range<u64>>,
61    /// The selection mask to apply to the selected row range.
62    // TODO(joe): replace this is usage of row_id selection, see
63    selection: Selection,
64    /// How to split the file for concurrent processing.
65    split_by: SplitBy,
66    /// The number of splits to make progress on concurrently **per-thread**.
67    concurrency: usize,
68    /// Function to apply to each [`ArrayRef`] within the spawned split tasks.
69    map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
70    metrics_registry: Option<Arc<dyn MetricsRegistry>>,
71    /// Should we try to prune the file (using stats) on open.
72    file_stats: Option<Arc<[StatsSet]>>,
73    /// Maximal number of rows to read (after filtering)
74    limit: Option<u64>,
75    /// The row-offset assigned to the first row of the file. Used by the `row_idx` expression,
76    /// but not by the scan [`Selection`] which remains relative.
77    row_offset: u64,
78}
79
80impl ScanBuilder<ArrayRef> {
81    pub fn new(session: VortexSession, layout_reader: Arc<dyn LayoutReader>) -> Self {
82        Self {
83            session,
84            layout_reader,
85            projection: root(),
86            filter: None,
87            ordered: true,
88            row_range: None,
89            selection: Default::default(),
90            split_by: SplitBy::Layout,
91            // We default to four tasks per worker thread, which allows for some I/O lookahead
92            // without too much impact on work-stealing.
93            concurrency: 4,
94            map_fn: Arc::new(Ok),
95            metrics_registry: None,
96            file_stats: None,
97            limit: None,
98            row_offset: 0,
99        }
100    }
101
102    /// Returns an [`ArrayStream`] with tasks spawned onto the session's runtime handle.
103    ///
104    /// See [`ScanBuilder::into_stream`] for more details.
105    pub fn into_array_stream(self) -> VortexResult<impl ArrayStream + Send + 'static> {
106        let dtype = self.dtype()?;
107        let stream = self.into_stream()?;
108        Ok(ArrayStreamAdapter::new(dtype, stream))
109    }
110
111    /// Returns an [`ArrayIterator`] using the given blocking runtime.
112    pub fn into_array_iter<B: BlockingRuntime>(
113        self,
114        runtime: &B,
115    ) -> VortexResult<impl ArrayIterator + 'static> {
116        let stream = self.into_array_stream()?;
117        let dtype = stream.dtype().clone();
118        Ok(ArrayIteratorAdapter::new(
119            dtype,
120            runtime.block_on_stream(stream),
121        ))
122    }
123}
124
125impl<A: 'static + Send> ScanBuilder<A> {
126    pub fn with_filter(mut self, filter: Expression) -> Self {
127        self.filter = Some(filter);
128        self
129    }
130
131    pub fn with_some_filter(mut self, filter: Option<Expression>) -> Self {
132        self.filter = filter;
133        self
134    }
135
136    pub fn with_projection(mut self, projection: Expression) -> Self {
137        self.projection = projection;
138        self
139    }
140
141    pub fn ordered(&self) -> bool {
142        self.ordered
143    }
144
145    pub fn with_ordered(mut self, ordered: bool) -> Self {
146        self.ordered = ordered;
147        self
148    }
149
150    pub fn with_row_range(mut self, row_range: Range<u64>) -> Self {
151        self.row_range = Some(row_range);
152        self
153    }
154
155    pub fn with_selection(mut self, selection: Selection) -> Self {
156        self.selection = selection;
157        self
158    }
159
160    pub fn with_row_indices(mut self, row_indices: Buffer<u64>) -> Self {
161        self.selection = Selection::IncludeByIndex(row_indices);
162        self
163    }
164
165    pub fn with_row_offset(mut self, row_offset: u64) -> Self {
166        self.row_offset = row_offset;
167        self
168    }
169
170    pub fn with_split_by(mut self, split_by: SplitBy) -> Self {
171        self.split_by = split_by;
172        self
173    }
174
175    pub fn concurrency(&self) -> usize {
176        self.concurrency
177    }
178
179    /// The number of row splits to make progress on concurrently per-thread, must
180    /// be greater than 0.
181    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
182        assert!(concurrency > 0);
183        self.concurrency = concurrency;
184        self
185    }
186
187    pub fn with_some_metrics_registry(mut self, metrics: Option<Arc<dyn MetricsRegistry>>) -> Self {
188        self.metrics_registry = metrics;
189        self
190    }
191
192    pub fn with_metrics_registry(mut self, metrics: Arc<dyn MetricsRegistry>) -> Self {
193        self.metrics_registry = Some(metrics);
194        self
195    }
196
197    pub fn with_some_limit(mut self, limit: Option<u64>) -> Self {
198        self.limit = limit;
199        self
200    }
201
202    pub fn with_limit(mut self, limit: u64) -> Self {
203        self.limit = Some(limit);
204        self
205    }
206
207    /// The [`DType`] returned by the scan, after applying the projection.
208    pub fn dtype(&self) -> VortexResult<DType> {
209        self.projection.return_dtype(self.layout_reader.dtype())
210    }
211
212    /// The session used by the scan.
213    pub fn session(&self) -> &VortexSession {
214        &self.session
215    }
216
217    /// Map each split of the scan. The function will be run on the spawned task.
218    pub fn map<B: 'static>(
219        self,
220        map_fn: impl Fn(A) -> VortexResult<B> + 'static + Send + Sync,
221    ) -> ScanBuilder<B> {
222        let old_map_fn = self.map_fn;
223        ScanBuilder {
224            session: self.session,
225            layout_reader: self.layout_reader,
226            projection: self.projection,
227            filter: self.filter,
228            ordered: self.ordered,
229            row_range: self.row_range,
230            selection: self.selection,
231            split_by: self.split_by,
232            concurrency: self.concurrency,
233            metrics_registry: self.metrics_registry,
234            file_stats: self.file_stats,
235            limit: self.limit,
236            row_offset: self.row_offset,
237            map_fn: Arc::new(move |a| old_map_fn(a).and_then(&map_fn)),
238        }
239    }
240
241    pub fn prepare(self) -> VortexResult<RepeatedScan<A>> {
242        let dtype = self.dtype()?;
243
244        if self.filter.is_some() && self.limit.is_some() {
245            vortex_bail!("Vortex doesn't support scans with both a filter and a limit")
246        }
247
248        // Spin up the root layout reader, and wrap it in a FilterLayoutReader to perform
249        // conjunction splitting if a filter is provided.
250        let mut layout_reader = self.layout_reader;
251
252        // Enrich the layout reader to support RowIdx expressions.
253        // Note that this is applied below the filter layout reader since it can perform
254        // better over individual conjunctions.
255        layout_reader = Arc::new(RowIdxLayoutReader::new(
256            self.row_offset,
257            layout_reader,
258            self.session.clone(),
259        ));
260
261        // Normalize and simplify the expressions.
262        let projection = self.projection.optimize_recursive(layout_reader.dtype())?;
263
264        let filter = self
265            .filter
266            .map(|f| f.optimize_recursive(layout_reader.dtype()))
267            .transpose()?;
268
269        // Construct field masks and compute the row splits of the scan.
270        let (filter_mask, projection_mask) =
271            filter_and_projection_masks(&projection, filter.as_ref(), layout_reader.dtype())?;
272        let field_mask: Vec<_> = [filter_mask, projection_mask].concat();
273
274        let splits =
275            if let Some(ranges) = attempt_split_ranges(&self.selection, self.row_range.as_ref()) {
276                Splits::Ranges(ranges)
277            } else {
278                let split_range = self
279                    .row_range
280                    .clone()
281                    .unwrap_or_else(|| 0..layout_reader.row_count());
282                Splits::Natural(self.split_by.splits(
283                    layout_reader.as_ref(),
284                    &split_range,
285                    &field_mask,
286                )?)
287            };
288
289        Ok(RepeatedScan::new(
290            self.session.clone(),
291            layout_reader,
292            projection,
293            filter,
294            self.ordered,
295            self.row_range,
296            self.selection,
297            splits,
298            self.concurrency,
299            self.map_fn,
300            self.limit,
301            dtype,
302        ))
303    }
304
305    /// Constructs a task per row split of the scan, returned as a vector of futures.
306    pub fn build(self) -> VortexResult<Vec<BoxFuture<'static, VortexResult<Option<A>>>>> {
307        // The ultimate short circuit
308        if self.limit.is_some_and(|l| l == 0) {
309            return Ok(vec![]);
310        }
311
312        self.prepare()?.execute(None)
313    }
314
315    /// Returns a [`Stream`] with tasks spawned onto the session's runtime handle.
316    pub fn into_stream(
317        self,
318    ) -> VortexResult<impl Stream<Item = VortexResult<A>> + Send + 'static + use<A>> {
319        Ok(LazyScanStream::new(self))
320    }
321
322    /// Returns an [`Iterator`] using the session's runtime.
323    pub fn into_iter<B: BlockingRuntime>(
324        self,
325        runtime: &B,
326    ) -> VortexResult<impl Iterator<Item = VortexResult<A>> + 'static> {
327        let stream = self.into_stream()?;
328        Ok(runtime.block_on_stream(stream))
329    }
330}
331
332enum LazyScanState<A: 'static + Send> {
333    Builder(Option<Box<ScanBuilder<A>>>),
334    Preparing(PreparingScan<A>),
335    Stream(BoxStream<'static, VortexResult<A>>),
336    Error(Option<vortex_error::VortexError>),
337}
338
339type PreparedScanTasks<A> = Vec<BoxFuture<'static, VortexResult<Option<A>>>>;
340
341struct PreparingScan<A: 'static + Send> {
342    ordered: bool,
343    concurrency: usize,
344    handle: Handle,
345    task: Task<VortexResult<PreparedScanTasks<A>>>,
346}
347
348struct LazyScanStream<A: 'static + Send> {
349    state: LazyScanState<A>,
350}
351
352impl<A: 'static + Send> LazyScanStream<A> {
353    fn new(builder: ScanBuilder<A>) -> Self {
354        Self {
355            state: LazyScanState::Builder(Some(Box::new(builder))),
356        }
357    }
358}
359
360impl<A: 'static + Send> Unpin for LazyScanStream<A> {}
361
362impl<A: 'static + Send> Stream for LazyScanStream<A> {
363    type Item = VortexResult<A>;
364
365    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
366        loop {
367            match &mut self.state {
368                LazyScanState::Builder(builder) => {
369                    let builder = builder.take().vortex_expect("polled after completion");
370                    let ordered = builder.ordered;
371                    let num_workers = get_available_parallelism().unwrap_or(1);
372                    let concurrency = builder.concurrency * num_workers;
373                    let handle = builder.session.handle();
374                    let task = handle.spawn_blocking(move || {
375                        builder.prepare().and_then(|scan| scan.execute(None))
376                    });
377                    self.state = LazyScanState::Preparing(PreparingScan {
378                        ordered,
379                        concurrency,
380                        handle,
381                        task,
382                    });
383                }
384                LazyScanState::Preparing(preparing) => {
385                    match ready!(Pin::new(&mut preparing.task).poll(cx)) {
386                        Ok(tasks) => {
387                            let ordered = preparing.ordered;
388                            let concurrency = preparing.concurrency;
389                            let handle = preparing.handle.clone();
390                            let stream =
391                                futures::stream::iter(tasks).map(move |task| handle.spawn(task));
392                            let stream = if ordered {
393                                stream.buffered(concurrency).boxed()
394                            } else {
395                                stream.buffer_unordered(concurrency).boxed()
396                            };
397                            let stream = stream
398                                .filter_map(|chunk| async move { chunk.transpose() })
399                                .boxed();
400                            self.state = LazyScanState::Stream(stream);
401                        }
402                        Err(err) => self.state = LazyScanState::Error(Some(err)),
403                    }
404                }
405                LazyScanState::Stream(stream) => return stream.as_mut().poll_next(cx),
406                LazyScanState::Error(err) => return Poll::Ready(err.take().map(Err)),
407            }
408        }
409    }
410}
411
412/// Compute masks of field paths referenced by the projection and filter in the scan.
413///
414/// Projection and filter must be pre-simplified.
415pub fn filter_and_projection_masks(
416    projection: &Expression,
417    filter: Option<&Expression>,
418    dtype: &DType,
419) -> VortexResult<(Vec<FieldMask>, Vec<FieldMask>)> {
420    let Some(struct_dtype) = dtype.as_struct_fields_opt() else {
421        return Ok(match filter {
422            Some(_) => (vec![FieldMask::All], vec![FieldMask::All]),
423            None => (Vec::new(), vec![FieldMask::All]),
424        });
425    };
426    let projection_mask = immediate_scope_access(projection, struct_dtype);
427    Ok(match filter {
428        None => (
429            Vec::new(),
430            projection_mask.into_iter().map(to_field_mask).collect_vec(),
431        ),
432        Some(f) => {
433            let filter_mask = immediate_scope_access(f, struct_dtype);
434            let only_projection_mask = projection_mask
435                .difference(&filter_mask)
436                .cloned()
437                .map(to_field_mask)
438                .collect_vec();
439            (
440                filter_mask.into_iter().map(to_field_mask).collect_vec(),
441                only_projection_mask,
442            )
443        }
444    })
445}
446
447fn to_field_mask(field: FieldName) -> FieldMask {
448    FieldMask::Prefix(FieldPath::from(Field::Name(field)))
449}
450
#[cfg(test)]
mod test {
    use std::collections::BTreeSet;
    use std::ops::Range;
    use std::pin::Pin;
    use std::sync::Arc;
    use std::sync::atomic::AtomicUsize;
    use std::sync::atomic::Ordering;
    use std::task::Context;
    use std::task::Poll;
    use std::time::Duration;

    use futures::Stream;
    use futures::task::noop_waker_ref;
    use parking_lot::Mutex;
    use vortex_array::IntoArray;
    use vortex_array::LEGACY_SESSION;
    use vortex_array::MaskFuture;
    use vortex_array::VortexSessionExecute;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::FieldMask;
    use vortex_array::dtype::Nullability;
    use vortex_array::dtype::PType;
    use vortex_array::expr::Expression;
    use vortex_error::VortexResult;
    use vortex_error::vortex_err;
    use vortex_io::runtime::BlockingRuntime;
    use vortex_io::runtime::single::SingleThreadRuntime;
    use vortex_mask::Mask;

    use super::ScanBuilder;
    use crate::ArrayFuture;
    use crate::LayoutReader;

    /// Single-row, non-nullable `i32` reader that counts `register_splits` calls and registers
    /// one split at the end of the requested range. Its evaluation methods must not be reached:
    /// they are `unimplemented!`, or return a future that panics if polled.
    #[derive(Debug)]
    struct CountingLayoutReader {
        name: Arc<str>,
        dtype: DType,
        row_count: u64,
        register_splits_calls: Arc<AtomicUsize>,
    }

    impl CountingLayoutReader {
        fn new(register_splits_calls: Arc<AtomicUsize>) -> Self {
            Self {
                name: Arc::from("counting"),
                dtype: DType::Primitive(PType::I32, Nullability::NonNullable),
                row_count: 1,
                register_splits_calls,
            }
        }
    }

    impl LayoutReader for CountingLayoutReader {
        fn name(&self) -> &Arc<str> {
            &self.name
        }

        fn dtype(&self) -> &DType {
            &self.dtype
        }

        fn row_count(&self) -> u64 {
            self.row_count
        }

        fn register_splits(
            &self,
            _field_mask: &[FieldMask],
            row_range: &Range<u64>,
            splits: &mut BTreeSet<u64>,
        ) -> VortexResult<()> {
            self.register_splits_calls.fetch_add(1, Ordering::Relaxed);
            splits.insert(row_range.end);
            Ok(())
        }

        fn pruning_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            _mask: Mask,
        ) -> VortexResult<MaskFuture> {
            unimplemented!("not needed for this test");
        }

        fn filter_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            _mask: MaskFuture,
        ) -> VortexResult<MaskFuture> {
            unimplemented!("not needed for this test");
        }

        fn projection_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            _mask: MaskFuture,
        ) -> VortexResult<ArrayFuture> {
            Ok(Box::pin(async move {
                unreachable!("scan should not be polled in this test")
            }))
        }

        fn as_any(&self) -> &dyn std::any::Any {
            self
        }
    }

    /// Building the stream alone must not prepare the scan: no splits are registered until
    /// the stream is polled.
    #[test]
    fn into_stream_is_lazy() {
        let calls = Arc::new(AtomicUsize::new(0));
        let reader = Arc::new(CountingLayoutReader::new(Arc::clone(&calls)));

        let session = crate::scan::test::SCAN_SESSION.clone();

        let _stream = ScanBuilder::new(session, reader).into_stream().unwrap();

        assert_eq!(calls.load(Ordering::Relaxed), 0);
    }

    /// Four-row, non-nullable `i32` reader that registers a split after every row, so each
    /// split covers one row. Pruning and filtering pass the mask through unchanged, and the
    /// projection returns the split's row indices as `i32` values.
    #[derive(Debug)]
    struct SplittingLayoutReader {
        name: Arc<str>,
        dtype: DType,
        row_count: u64,
        register_splits_calls: Arc<AtomicUsize>,
    }

    impl SplittingLayoutReader {
        fn new(register_splits_calls: Arc<AtomicUsize>) -> Self {
            Self {
                name: Arc::from("splitting"),
                dtype: DType::Primitive(PType::I32, Nullability::NonNullable),
                row_count: 4,
                register_splits_calls,
            }
        }
    }

    impl LayoutReader for SplittingLayoutReader {
        fn name(&self) -> &Arc<str> {
            &self.name
        }

        fn dtype(&self) -> &DType {
            &self.dtype
        }

        fn row_count(&self) -> u64 {
            self.row_count
        }

        fn register_splits(
            &self,
            _field_mask: &[FieldMask],
            row_range: &Range<u64>,
            splits: &mut BTreeSet<u64>,
        ) -> VortexResult<()> {
            self.register_splits_calls.fetch_add(1, Ordering::Relaxed);
            // One split boundary after every row in the range.
            for split in (row_range.start + 1)..=row_range.end {
                splits.insert(split);
            }
            Ok(())
        }

        fn pruning_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            mask: Mask,
        ) -> VortexResult<MaskFuture> {
            Ok(MaskFuture::ready(mask))
        }

        fn filter_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            mask: MaskFuture,
        ) -> VortexResult<MaskFuture> {
            Ok(mask)
        }

        fn projection_evaluation(
            &self,
            row_range: &Range<u64>,
            _expr: &Expression,
            _mask: MaskFuture,
        ) -> VortexResult<ArrayFuture> {
            let start = usize::try_from(row_range.start)
                .map_err(|_| vortex_err!("row_range.start must fit in usize"))?;
            let end = usize::try_from(row_range.end)
                .map_err(|_| vortex_err!("row_range.end must fit in usize"))?;

            let values: VortexResult<Vec<i32>> = (start..end)
                .map(|v| i32::try_from(v).map_err(|_| vortex_err!("split value must fit in i32")))
                .collect();

            let array = PrimitiveArray::from_iter(values?).into_array();
            Ok(Box::pin(async move { Ok(array) }))
        }

        fn as_any(&self) -> &dyn std::any::Any {
            self
        }
    }

    /// Driving the stream to completion registers splits exactly once and yields every
    /// single-row split in file order (the default scan is ordered).
    #[test]
    fn into_stream_executes_after_prepare() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let calls = Arc::new(AtomicUsize::new(0));
        let reader = Arc::new(SplittingLayoutReader::new(Arc::clone(&calls)));

        let runtime = SingleThreadRuntime::default();
        let session = crate::scan::test::session_with_handle(runtime.handle());

        let stream = ScanBuilder::new(session, reader).into_stream()?;
        let mut iter = runtime.block_on_stream(stream);

        let mut values = Vec::new();
        for chunk in &mut iter {
            // Each chunk is expected to hold a single row; record its first value.
            let prim = chunk?.execute::<PrimitiveArray>(&mut ctx)?;
            values.push(prim.into_buffer::<i32>()[0]);
        }

        assert_eq!(calls.load(Ordering::Relaxed), 1);
        assert_eq!(values.as_ref(), [0, 1, 2, 3]);

        Ok(())
    }

    /// Single-row reader whose `register_splits` increments its counter, then blocks on `gate`
    /// until the test releases the lock. Its evaluation methods must not be reached.
    #[derive(Debug)]
    struct BlockingSplitsLayoutReader {
        name: Arc<str>,
        dtype: DType,
        row_count: u64,
        register_splits_calls: Arc<AtomicUsize>,
        gate: Arc<Mutex<()>>,
    }

    impl BlockingSplitsLayoutReader {
        fn new(gate: Arc<Mutex<()>>, register_splits_calls: Arc<AtomicUsize>) -> Self {
            Self {
                name: Arc::from("blocking-splits"),
                dtype: DType::Primitive(PType::I32, Nullability::NonNullable),
                row_count: 1,
                register_splits_calls,
                gate,
            }
        }
    }

    impl LayoutReader for BlockingSplitsLayoutReader {
        fn name(&self) -> &Arc<str> {
            &self.name
        }

        fn dtype(&self) -> &DType {
            &self.dtype
        }

        fn row_count(&self) -> u64 {
            self.row_count
        }

        fn register_splits(
            &self,
            _field_mask: &[FieldMask],
            row_range: &Range<u64>,
            splits: &mut BTreeSet<u64>,
        ) -> VortexResult<()> {
            // The counter is bumped before blocking, so a call shows up even while gated.
            self.register_splits_calls.fetch_add(1, Ordering::Relaxed);
            let _guard = self.gate.lock();
            splits.insert(row_range.end);
            Ok(())
        }

        fn pruning_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            _mask: Mask,
        ) -> VortexResult<MaskFuture> {
            unimplemented!("not needed for this test");
        }

        fn filter_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            _mask: MaskFuture,
        ) -> VortexResult<MaskFuture> {
            unimplemented!("not needed for this test");
        }

        fn projection_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            _mask: MaskFuture,
        ) -> VortexResult<ArrayFuture> {
            Ok(Box::pin(async move {
                unreachable!("scan should not be polled in this test")
            }))
        }

        fn as_any(&self) -> &dyn std::any::Any {
            self
        }
    }

    /// The first poll must return `Poll::Pending` promptly instead of running (and blocking
    /// on) scan preparation inline on the polling thread.
    ///
    /// The poll runs on a separate thread so a blocking poll is detected via a 1s timeout
    /// rather than hanging the test. NOTE(review): the final `calls == 0` assertion presumes the
    /// spawned blocking task never runs because `runtime` is never driven; confirm this holds
    /// for `SingleThreadRuntime::spawn_blocking`.
    #[test]
    fn into_stream_first_poll_does_not_block() {
        let gate = Arc::new(Mutex::new(()));
        let guard = gate.lock();

        let calls = Arc::new(AtomicUsize::new(0));
        let reader = Arc::new(BlockingSplitsLayoutReader::new(
            Arc::clone(&gate),
            Arc::clone(&calls),
        ));

        let runtime = SingleThreadRuntime::default();
        let session = crate::scan::test::session_with_handle(runtime.handle());

        let mut stream = ScanBuilder::new(session, reader).into_stream().unwrap();

        let (send, recv) = std::sync::mpsc::channel::<bool>();
        let join = std::thread::spawn(move || {
            let waker = noop_waker_ref();
            let mut cx = Context::from_waker(waker);
            let poll = Pin::new(&mut stream).poll_next(&mut cx);
            let _ = send.send(matches!(poll, Poll::Pending));
        });

        // `None` here means the poll did not return within the timeout.
        let polled_pending = recv.recv_timeout(Duration::from_secs(1)).ok();

        // Always release the gate and join the thread so failures don't hang the test process.
        drop(guard);
        drop(join.join());

        let polled_pending = polled_pending.expect("poll_next blocked; expected quick return");
        assert!(
            polled_pending,
            "expected Poll::Pending while prepare is blocked"
        );
        assert_eq!(calls.load(Ordering::Relaxed), 0);

        drop(runtime);
    }
}