Skip to main content

vortex_scan/
scan_builder.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::ops::Range;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::task::Context;
8use std::task::Poll;
9use std::task::ready;
10
11use futures::Stream;
12use futures::StreamExt;
13use futures::future::BoxFuture;
14use futures::stream::BoxStream;
15use itertools::Itertools;
16use vortex_array::ArrayRef;
17use vortex_array::expr::Expression;
18use vortex_array::expr::analysis::immediate_access::immediate_scope_access;
19use vortex_array::expr::root;
20use vortex_array::iter::ArrayIterator;
21use vortex_array::iter::ArrayIteratorAdapter;
22use vortex_array::stats::StatsSet;
23use vortex_array::stream::ArrayStream;
24use vortex_array::stream::ArrayStreamAdapter;
25use vortex_buffer::Buffer;
26use vortex_dtype::DType;
27use vortex_dtype::Field;
28use vortex_dtype::FieldMask;
29use vortex_dtype::FieldName;
30use vortex_dtype::FieldPath;
31use vortex_error::VortexExpect;
32use vortex_error::VortexResult;
33use vortex_error::vortex_bail;
34use vortex_io::runtime::BlockingRuntime;
35use vortex_io::runtime::Handle;
36use vortex_io::runtime::Task;
37use vortex_io::session::RuntimeSessionExt;
38use vortex_layout::LayoutReader;
39use vortex_layout::LayoutReaderRef;
40use vortex_layout::layouts::row_idx::RowIdxLayoutReader;
41use vortex_metrics::MetricsRegistry;
42use vortex_session::VortexSession;
43
44use crate::RepeatedScan;
45use crate::selection::Selection;
46use crate::split_by::SplitBy;
47use crate::splits::Splits;
48use crate::splits::attempt_split_ranges;
49
50/// A struct for building a scan operation.
51pub struct ScanBuilder<A> {
52    session: VortexSession,
53    layout_reader: LayoutReaderRef,
54    projection: Expression,
55    filter: Option<Expression>,
56    /// Whether the scan needs to return splits in the order they appear in the file.
57    ordered: bool,
58    /// Optionally read a subset of the rows in the file.
59    row_range: Option<Range<u64>>,
60    /// The selection mask to apply to the selected row range.
61    // TODO(joe): replace this is usage of row_id selection, see
62    selection: Selection,
63    /// How to split the file for concurrent processing.
64    split_by: SplitBy,
65    /// The number of splits to make progress on concurrently **per-thread**.
66    concurrency: usize,
67    /// Function to apply to each [`ArrayRef`] within the spawned split tasks.
68    map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
69    metrics_registry: Option<Arc<dyn MetricsRegistry>>,
70    /// Should we try to prune the file (using stats) on open.
71    file_stats: Option<Arc<[StatsSet]>>,
72    /// Maximal number of rows to read (after filtering)
73    limit: Option<u64>,
74    /// The row-offset assigned to the first row of the file. Used by the `row_idx` expression,
75    /// but not by the scan [`Selection`] which remains relative.
76    row_offset: u64,
77}
78
79impl ScanBuilder<ArrayRef> {
80    pub fn new(session: VortexSession, layout_reader: Arc<dyn LayoutReader>) -> Self {
81        Self {
82            session,
83            layout_reader,
84            projection: root(),
85            filter: None,
86            ordered: true,
87            row_range: None,
88            selection: Default::default(),
89            split_by: SplitBy::Layout,
90            // We default to four tasks per worker thread, which allows for some I/O lookahead
91            // without too much impact on work-stealing.
92            concurrency: 4,
93            map_fn: Arc::new(Ok),
94            metrics_registry: None,
95            file_stats: None,
96            limit: None,
97            row_offset: 0,
98        }
99    }
100
101    /// Returns an [`ArrayStream`] with tasks spawned onto the session's runtime handle.
102    ///
103    /// See [`ScanBuilder::into_stream`] for more details.
104    pub fn into_array_stream(self) -> VortexResult<impl ArrayStream + Send + 'static> {
105        let dtype = self.dtype()?;
106        let stream = self.into_stream()?;
107        Ok(ArrayStreamAdapter::new(dtype, stream))
108    }
109
110    /// Returns an [`ArrayIterator`] using the given blocking runtime.
111    pub fn into_array_iter<B: BlockingRuntime>(
112        self,
113        runtime: &B,
114    ) -> VortexResult<impl ArrayIterator + 'static> {
115        let stream = self.into_array_stream()?;
116        let dtype = stream.dtype().clone();
117        Ok(ArrayIteratorAdapter::new(
118            dtype,
119            runtime.block_on_stream(stream),
120        ))
121    }
122}
123
124impl<A: 'static + Send> ScanBuilder<A> {
125    pub fn with_filter(mut self, filter: Expression) -> Self {
126        self.filter = Some(filter);
127        self
128    }
129
130    pub fn with_some_filter(mut self, filter: Option<Expression>) -> Self {
131        self.filter = filter;
132        self
133    }
134
135    pub fn with_projection(mut self, projection: Expression) -> Self {
136        self.projection = projection;
137        self
138    }
139
140    pub fn with_ordered(mut self, ordered: bool) -> Self {
141        self.ordered = ordered;
142        self
143    }
144
145    pub fn with_row_range(mut self, row_range: Range<u64>) -> Self {
146        self.row_range = Some(row_range);
147        self
148    }
149
150    pub fn with_selection(mut self, selection: Selection) -> Self {
151        self.selection = selection;
152        self
153    }
154
155    pub fn with_row_indices(mut self, row_indices: Buffer<u64>) -> Self {
156        self.selection = Selection::IncludeByIndex(row_indices);
157        self
158    }
159
160    pub fn with_row_offset(mut self, row_offset: u64) -> Self {
161        self.row_offset = row_offset;
162        self
163    }
164
165    pub fn with_split_by(mut self, split_by: SplitBy) -> Self {
166        self.split_by = split_by;
167        self
168    }
169
170    /// The number of row splits to make progress on concurrently per-thread, must
171    /// be greater than 0.
172    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
173        assert!(concurrency > 0);
174        self.concurrency = concurrency;
175        self
176    }
177
178    pub fn with_metrics_registry(mut self, metrics: Arc<dyn MetricsRegistry>) -> Self {
179        self.metrics_registry = Some(metrics);
180        self
181    }
182
183    pub fn with_limit(mut self, limit: u64) -> Self {
184        self.limit = Some(limit);
185        self
186    }
187
188    /// The [`DType`] returned by the scan, after applying the projection.
189    pub fn dtype(&self) -> VortexResult<DType> {
190        self.projection.return_dtype(self.layout_reader.dtype())
191    }
192
193    /// The session used by the scan.
194    pub fn session(&self) -> &VortexSession {
195        &self.session
196    }
197
198    /// Map each split of the scan. The function will be run on the spawned task.
199    pub fn map<B: 'static>(
200        self,
201        map_fn: impl Fn(A) -> VortexResult<B> + 'static + Send + Sync,
202    ) -> ScanBuilder<B> {
203        let old_map_fn = self.map_fn;
204        ScanBuilder {
205            session: self.session,
206            layout_reader: self.layout_reader,
207            projection: self.projection,
208            filter: self.filter,
209            ordered: self.ordered,
210            row_range: self.row_range,
211            selection: self.selection,
212            split_by: self.split_by,
213            concurrency: self.concurrency,
214            metrics_registry: self.metrics_registry,
215            file_stats: self.file_stats,
216            limit: self.limit,
217            row_offset: self.row_offset,
218            map_fn: Arc::new(move |a| old_map_fn(a).and_then(&map_fn)),
219        }
220    }
221
222    pub fn prepare(self) -> VortexResult<RepeatedScan<A>> {
223        let dtype = self.dtype()?;
224
225        if self.filter.is_some() && self.limit.is_some() {
226            vortex_bail!("Vortex doesn't support scans with both a filter and a limit")
227        }
228
229        // Spin up the root layout reader, and wrap it in a FilterLayoutReader to perform
230        // conjunction splitting if a filter is provided.
231        let mut layout_reader = self.layout_reader;
232
233        // Enrich the layout reader to support RowIdx expressions.
234        // Note that this is applied below the filter layout reader since it can perform
235        // better over individual conjunctions.
236        layout_reader = Arc::new(RowIdxLayoutReader::new(
237            self.row_offset,
238            layout_reader,
239            self.session.clone(),
240        ));
241
242        // Normalize and simplify the expressions.
243        let projection = self.projection.optimize_recursive(layout_reader.dtype())?;
244
245        let filter = self
246            .filter
247            .map(|f| f.optimize_recursive(layout_reader.dtype()))
248            .transpose()?;
249
250        // Construct field masks and compute the row splits of the scan.
251        let (filter_mask, projection_mask) =
252            filter_and_projection_masks(&projection, filter.as_ref(), layout_reader.dtype())?;
253        let field_mask: Vec<_> = [filter_mask, projection_mask].concat();
254
255        let splits =
256            if let Some(ranges) = attempt_split_ranges(&self.selection, self.row_range.as_ref()) {
257                Splits::Ranges(ranges)
258            } else {
259                let split_range = self
260                    .row_range
261                    .clone()
262                    .unwrap_or_else(|| 0..layout_reader.row_count());
263                Splits::Natural(self.split_by.splits(
264                    layout_reader.as_ref(),
265                    &split_range,
266                    &field_mask,
267                )?)
268            };
269
270        Ok(RepeatedScan::new(
271            self.session.clone(),
272            layout_reader,
273            projection,
274            filter,
275            self.ordered,
276            self.row_range,
277            self.selection,
278            splits,
279            self.concurrency,
280            self.map_fn,
281            self.limit,
282            dtype,
283        ))
284    }
285
286    /// Constructs a task per row split of the scan, returned as a vector of futures.
287    pub fn build(self) -> VortexResult<Vec<BoxFuture<'static, VortexResult<Option<A>>>>> {
288        // The ultimate short circuit
289        if self.limit.is_some_and(|l| l == 0) {
290            return Ok(vec![]);
291        }
292
293        self.prepare()?.execute(None)
294    }
295
296    /// Returns a [`Stream`] with tasks spawned onto the session's runtime handle.
297    pub fn into_stream(
298        self,
299    ) -> VortexResult<impl Stream<Item = VortexResult<A>> + Send + 'static + use<A>> {
300        Ok(LazyScanStream::new(self))
301    }
302
303    /// Returns an [`Iterator`] using the session's runtime.
304    pub fn into_iter<B: BlockingRuntime>(
305        self,
306        runtime: &B,
307    ) -> VortexResult<impl Iterator<Item = VortexResult<A>> + 'static> {
308        let stream = self.into_stream()?;
309        Ok(runtime.block_on_stream(stream))
310    }
311}
312
313enum LazyScanState<A: 'static + Send> {
314    Builder(Option<Box<ScanBuilder<A>>>),
315    Preparing(PreparingScan<A>),
316    Stream(BoxStream<'static, VortexResult<A>>),
317    Error(Option<vortex_error::VortexError>),
318}
319
320type PreparedScanTasks<A> = Vec<BoxFuture<'static, VortexResult<Option<A>>>>;
321
322struct PreparingScan<A: 'static + Send> {
323    ordered: bool,
324    concurrency: usize,
325    handle: Handle,
326    task: Task<VortexResult<PreparedScanTasks<A>>>,
327}
328
329struct LazyScanStream<A: 'static + Send> {
330    state: LazyScanState<A>,
331}
332
333impl<A: 'static + Send> LazyScanStream<A> {
334    fn new(builder: ScanBuilder<A>) -> Self {
335        Self {
336            state: LazyScanState::Builder(Some(Box::new(builder))),
337        }
338    }
339}
340
341impl<A: 'static + Send> Unpin for LazyScanStream<A> {}
342
343impl<A: 'static + Send> Stream for LazyScanStream<A> {
344    type Item = VortexResult<A>;
345
346    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
347        loop {
348            match &mut self.state {
349                LazyScanState::Builder(builder) => {
350                    let builder = builder.take().vortex_expect("polled after completion");
351                    let ordered = builder.ordered;
352                    let num_workers = std::thread::available_parallelism()
353                        .map(|n| n.get())
354                        .unwrap_or(1);
355                    let concurrency = builder.concurrency * num_workers;
356                    let handle = builder.session.handle();
357                    let task = handle.spawn_blocking(move || {
358                        builder.prepare().and_then(|scan| scan.execute(None))
359                    });
360                    self.state = LazyScanState::Preparing(PreparingScan {
361                        ordered,
362                        concurrency,
363                        handle,
364                        task,
365                    });
366                }
367                LazyScanState::Preparing(preparing) => {
368                    match ready!(Pin::new(&mut preparing.task).poll(cx)) {
369                        Ok(tasks) => {
370                            let ordered = preparing.ordered;
371                            let concurrency = preparing.concurrency;
372                            let handle = preparing.handle.clone();
373                            let stream =
374                                futures::stream::iter(tasks).map(move |task| handle.spawn(task));
375                            let stream = if ordered {
376                                stream.buffered(concurrency).boxed()
377                            } else {
378                                stream.buffer_unordered(concurrency).boxed()
379                            };
380                            let stream = stream
381                                .filter_map(|chunk| async move { chunk.transpose() })
382                                .boxed();
383                            self.state = LazyScanState::Stream(stream);
384                        }
385                        Err(err) => self.state = LazyScanState::Error(Some(err)),
386                    }
387                }
388                LazyScanState::Stream(stream) => return stream.as_mut().poll_next(cx),
389                LazyScanState::Error(err) => return Poll::Ready(err.take().map(Err)),
390            }
391        }
392    }
393}
394
395/// Compute masks of field paths referenced by the projection and filter in the scan.
396///
397/// Projection and filter must be pre-simplified.
398pub(crate) fn filter_and_projection_masks(
399    projection: &Expression,
400    filter: Option<&Expression>,
401    dtype: &DType,
402) -> VortexResult<(Vec<FieldMask>, Vec<FieldMask>)> {
403    let Some(struct_dtype) = dtype.as_struct_fields_opt() else {
404        return Ok(match filter {
405            Some(_) => (vec![FieldMask::All], vec![FieldMask::All]),
406            None => (Vec::new(), vec![FieldMask::All]),
407        });
408    };
409    let projection_mask = immediate_scope_access(projection, struct_dtype);
410    Ok(match filter {
411        None => (
412            Vec::new(),
413            projection_mask.into_iter().map(to_field_mask).collect_vec(),
414        ),
415        Some(f) => {
416            let filter_mask = immediate_scope_access(f, struct_dtype);
417            let only_projection_mask = projection_mask
418                .difference(&filter_mask)
419                .cloned()
420                .map(to_field_mask)
421                .collect_vec();
422            (
423                filter_mask.into_iter().map(to_field_mask).collect_vec(),
424                only_projection_mask,
425            )
426        }
427    })
428}
429
430fn to_field_mask(field: FieldName) -> FieldMask {
431    FieldMask::Prefix(FieldPath::from(Field::Name(field)))
432}
433
#[cfg(test)]
mod test {
    use std::collections::BTreeSet;
    use std::ops::Range;
    use std::pin::Pin;
    use std::sync::Arc;
    use std::sync::atomic::AtomicUsize;
    use std::sync::atomic::Ordering;
    use std::task::Context;
    use std::task::Poll;
    use std::time::Duration;

    use futures::Stream;
    use futures::task::noop_waker_ref;
    use parking_lot::Mutex;
    use vortex_array::IntoArray;
    use vortex_array::MaskFuture;
    use vortex_array::ToCanonical;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::expr::Expression;
    use vortex_dtype::DType;
    use vortex_dtype::FieldMask;
    use vortex_dtype::Nullability;
    use vortex_dtype::PType;
    use vortex_error::VortexResult;
    use vortex_error::vortex_err;
    use vortex_io::runtime::BlockingRuntime;
    use vortex_io::runtime::single::SingleThreadRuntime;
    use vortex_layout::ArrayFuture;
    use vortex_layout::LayoutReader;
    use vortex_mask::Mask;

    use super::ScanBuilder;

    /// A one-row reader that counts `register_splits` calls; its data must never be read.
    #[derive(Debug)]
    struct CountingLayoutReader {
        name: Arc<str>,
        dtype: DType,
        row_count: u64,
        register_splits_calls: Arc<AtomicUsize>,
    }

    impl CountingLayoutReader {
        fn new(register_splits_calls: Arc<AtomicUsize>) -> Self {
            Self {
                name: Arc::from("counting"),
                dtype: DType::Primitive(PType::I32, Nullability::NonNullable),
                row_count: 1,
                register_splits_calls,
            }
        }
    }

    impl LayoutReader for CountingLayoutReader {
        fn name(&self) -> &Arc<str> {
            &self.name
        }

        fn dtype(&self) -> &DType {
            &self.dtype
        }

        fn row_count(&self) -> u64 {
            self.row_count
        }

        fn register_splits(
            &self,
            _field_mask: &[FieldMask],
            row_range: &Range<u64>,
            splits: &mut BTreeSet<u64>,
        ) -> VortexResult<()> {
            self.register_splits_calls.fetch_add(1, Ordering::Relaxed);
            splits.insert(row_range.end);
            Ok(())
        }

        fn pruning_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            _mask: Mask,
        ) -> VortexResult<MaskFuture> {
            unimplemented!("not needed for this test");
        }

        fn filter_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            _mask: MaskFuture,
        ) -> VortexResult<MaskFuture> {
            unimplemented!("not needed for this test");
        }

        fn projection_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            _mask: MaskFuture,
        ) -> VortexResult<ArrayFuture> {
            Ok(Box::pin(async move {
                unreachable!("scan should not be polled in this test")
            }))
        }
    }

    /// Creating the stream must not prepare the scan (no splits are registered).
    #[test]
    fn into_stream_is_lazy() {
        let calls = Arc::new(AtomicUsize::new(0));
        let reader = Arc::new(CountingLayoutReader::new(calls.clone()));

        let session = crate::test::SCAN_SESSION.clone();

        let _stream = ScanBuilder::new(session, reader).into_stream().unwrap();

        assert_eq!(calls.load(Ordering::Relaxed), 0);
    }

    /// A four-row reader that registers a split after every row and projects each row index
    /// as an `i32` value.
    #[derive(Debug)]
    struct SplittingLayoutReader {
        name: Arc<str>,
        dtype: DType,
        row_count: u64,
        register_splits_calls: Arc<AtomicUsize>,
    }

    impl SplittingLayoutReader {
        fn new(register_splits_calls: Arc<AtomicUsize>) -> Self {
            Self {
                name: Arc::from("splitting"),
                dtype: DType::Primitive(PType::I32, Nullability::NonNullable),
                row_count: 4,
                register_splits_calls,
            }
        }
    }

    impl LayoutReader for SplittingLayoutReader {
        fn name(&self) -> &Arc<str> {
            &self.name
        }

        fn dtype(&self) -> &DType {
            &self.dtype
        }

        fn row_count(&self) -> u64 {
            self.row_count
        }

        fn register_splits(
            &self,
            _field_mask: &[FieldMask],
            row_range: &Range<u64>,
            splits: &mut BTreeSet<u64>,
        ) -> VortexResult<()> {
            self.register_splits_calls.fetch_add(1, Ordering::Relaxed);
            for split in (row_range.start + 1)..=row_range.end {
                splits.insert(split);
            }
            Ok(())
        }

        fn pruning_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            mask: Mask,
        ) -> VortexResult<MaskFuture> {
            Ok(MaskFuture::ready(mask))
        }

        fn filter_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            mask: MaskFuture,
        ) -> VortexResult<MaskFuture> {
            Ok(mask)
        }

        fn projection_evaluation(
            &self,
            row_range: &Range<u64>,
            _expr: &Expression,
            _mask: MaskFuture,
        ) -> VortexResult<ArrayFuture> {
            let start = usize::try_from(row_range.start)
                .map_err(|_| vortex_err!("row_range.start must fit in usize"))?;
            let end = usize::try_from(row_range.end)
                .map_err(|_| vortex_err!("row_range.end must fit in usize"))?;

            let values: VortexResult<Vec<i32>> = (start..end)
                .map(|v| i32::try_from(v).map_err(|_| vortex_err!("split value must fit in i32")))
                .collect();

            let array = PrimitiveArray::from_iter(values?).into_array();
            Ok(Box::pin(async move { Ok(array) }))
        }
    }

    /// Driving the stream prepares the scan exactly once and yields every split in order.
    #[test]
    fn into_stream_executes_after_prepare() -> VortexResult<()> {
        let calls = Arc::new(AtomicUsize::new(0));
        let reader = Arc::new(SplittingLayoutReader::new(calls.clone()));

        let runtime = SingleThreadRuntime::default();
        let session = crate::test::session_with_handle(runtime.handle());

        let stream = ScanBuilder::new(session, reader).into_stream().unwrap();
        let mut iter = runtime.block_on_stream(stream);

        let mut values = Vec::new();
        for chunk in &mut iter {
            values.push(chunk?.to_primitive().into_buffer::<i32>()[0]);
        }

        assert_eq!(calls.load(Ordering::Relaxed), 1);
        assert_eq!(values.as_ref(), [0, 1, 2, 3]);

        Ok(())
    }

    /// A one-row reader whose `register_splits` blocks until `gate` can be locked.
    #[derive(Debug)]
    struct BlockingSplitsLayoutReader {
        name: Arc<str>,
        dtype: DType,
        row_count: u64,
        register_splits_calls: Arc<AtomicUsize>,
        gate: Arc<Mutex<()>>,
    }

    impl BlockingSplitsLayoutReader {
        fn new(gate: Arc<Mutex<()>>, register_splits_calls: Arc<AtomicUsize>) -> Self {
            Self {
                name: Arc::from("blocking-splits"),
                dtype: DType::Primitive(PType::I32, Nullability::NonNullable),
                row_count: 1,
                register_splits_calls,
                gate,
            }
        }
    }

    impl LayoutReader for BlockingSplitsLayoutReader {
        fn name(&self) -> &Arc<str> {
            &self.name
        }

        fn dtype(&self) -> &DType {
            &self.dtype
        }

        fn row_count(&self) -> u64 {
            self.row_count
        }

        fn register_splits(
            &self,
            _field_mask: &[FieldMask],
            row_range: &Range<u64>,
            splits: &mut BTreeSet<u64>,
        ) -> VortexResult<()> {
            self.register_splits_calls.fetch_add(1, Ordering::Relaxed);
            let _guard = self.gate.lock();
            splits.insert(row_range.end);
            Ok(())
        }

        fn pruning_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            _mask: Mask,
        ) -> VortexResult<MaskFuture> {
            unimplemented!("not needed for this test");
        }

        fn filter_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            _mask: MaskFuture,
        ) -> VortexResult<MaskFuture> {
            unimplemented!("not needed for this test");
        }

        fn projection_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            _mask: MaskFuture,
        ) -> VortexResult<ArrayFuture> {
            Ok(Box::pin(async move {
                unreachable!("scan should not be polled in this test")
            }))
        }
    }

    /// The first poll must return `Pending` promptly even while split registration would
    /// block, because preparation runs on a blocking task rather than inside `poll_next`.
    #[test]
    fn into_stream_first_poll_does_not_block() {
        let gate = Arc::new(Mutex::new(()));
        // Hold the gate so that any `register_splits` call blocks until it is released.
        let guard = gate.lock();

        let calls = Arc::new(AtomicUsize::new(0));
        let reader = Arc::new(BlockingSplitsLayoutReader::new(gate.clone(), calls.clone()));

        let runtime = SingleThreadRuntime::default();
        let session = crate::test::session_with_handle(runtime.handle());

        let mut stream = ScanBuilder::new(session, reader).into_stream().unwrap();

        // Poll from another thread so that a blocking `poll_next` is caught by the timeout
        // below instead of hanging this thread.
        let (send, recv) = std::sync::mpsc::channel::<bool>();
        let join = std::thread::spawn(move || {
            let waker = noop_waker_ref();
            let mut cx = Context::from_waker(waker);
            let poll = Pin::new(&mut stream).poll_next(&mut cx);
            let _ = send.send(matches!(poll, Poll::Pending));
        });

        let polled_pending = recv.recv_timeout(Duration::from_secs(1)).ok();

        // Always release the gate and join the thread so failures don't hang the test process.
        drop(guard);
        // A panicking polling thread drops `send` without sending, which would otherwise be
        // misreported below as a blocked poll; re-raise its panic instead.
        if let Err(panic) = join.join() {
            std::panic::resume_unwind(panic);
        }

        let polled_pending = polled_pending.expect("poll_next blocked; expected quick return");
        assert!(
            polled_pending,
            "expected Poll::Pending while prepare is blocked"
        );
        assert_eq!(calls.load(Ordering::Relaxed), 0);

        drop(runtime);
    }
}