Skip to main content

vortex_scan/
scan_builder.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::ops::Range;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::task::Context;
8use std::task::Poll;
9use std::task::ready;
10
11use futures::Stream;
12use futures::StreamExt;
13use futures::future::BoxFuture;
14use futures::stream::BoxStream;
15use itertools::Itertools;
16use vortex_array::ArrayRef;
17use vortex_array::dtype::DType;
18use vortex_array::dtype::Field;
19use vortex_array::dtype::FieldMask;
20use vortex_array::dtype::FieldName;
21use vortex_array::dtype::FieldPath;
22use vortex_array::expr::Expression;
23use vortex_array::expr::analysis::immediate_access::immediate_scope_access;
24use vortex_array::expr::root;
25use vortex_array::iter::ArrayIterator;
26use vortex_array::iter::ArrayIteratorAdapter;
27use vortex_array::stats::StatsSet;
28use vortex_array::stream::ArrayStream;
29use vortex_array::stream::ArrayStreamAdapter;
30use vortex_buffer::Buffer;
31use vortex_error::VortexExpect;
32use vortex_error::VortexResult;
33use vortex_error::vortex_bail;
34use vortex_io::runtime::BlockingRuntime;
35use vortex_io::runtime::Handle;
36use vortex_io::runtime::Task;
37use vortex_io::session::RuntimeSessionExt;
38use vortex_layout::LayoutReader;
39use vortex_layout::LayoutReaderRef;
40use vortex_layout::layouts::row_idx::RowIdxLayoutReader;
41use vortex_metrics::MetricsRegistry;
42use vortex_session::VortexSession;
43
44use crate::RepeatedScan;
45use crate::selection::Selection;
46use crate::split_by::SplitBy;
47use crate::splits::Splits;
48use crate::splits::attempt_split_ranges;
49
50/// A struct for building a scan operation.
51pub struct ScanBuilder<A> {
52    session: VortexSession,
53    layout_reader: LayoutReaderRef,
54    projection: Expression,
55    filter: Option<Expression>,
56    /// Whether the scan needs to return splits in the order they appear in the file.
57    ordered: bool,
58    /// Optionally read a subset of the rows in the file.
59    row_range: Option<Range<u64>>,
60    /// The selection mask to apply to the selected row range.
61    // TODO(joe): replace this is usage of row_id selection, see
62    selection: Selection,
63    /// How to split the file for concurrent processing.
64    split_by: SplitBy,
65    /// The number of splits to make progress on concurrently **per-thread**.
66    concurrency: usize,
67    /// Function to apply to each [`ArrayRef`] within the spawned split tasks.
68    map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
69    metrics_registry: Option<Arc<dyn MetricsRegistry>>,
70    /// Should we try to prune the file (using stats) on open.
71    file_stats: Option<Arc<[StatsSet]>>,
72    /// Maximal number of rows to read (after filtering)
73    limit: Option<u64>,
74    /// The row-offset assigned to the first row of the file. Used by the `row_idx` expression,
75    /// but not by the scan [`Selection`] which remains relative.
76    row_offset: u64,
77}
78
79impl ScanBuilder<ArrayRef> {
80    pub fn new(session: VortexSession, layout_reader: Arc<dyn LayoutReader>) -> Self {
81        Self {
82            session,
83            layout_reader,
84            projection: root(),
85            filter: None,
86            ordered: true,
87            row_range: None,
88            selection: Default::default(),
89            split_by: SplitBy::Layout,
90            // We default to four tasks per worker thread, which allows for some I/O lookahead
91            // without too much impact on work-stealing.
92            concurrency: 4,
93            map_fn: Arc::new(Ok),
94            metrics_registry: None,
95            file_stats: None,
96            limit: None,
97            row_offset: 0,
98        }
99    }
100
101    /// Returns an [`ArrayStream`] with tasks spawned onto the session's runtime handle.
102    ///
103    /// See [`ScanBuilder::into_stream`] for more details.
104    pub fn into_array_stream(self) -> VortexResult<impl ArrayStream + Send + 'static> {
105        let dtype = self.dtype()?;
106        let stream = self.into_stream()?;
107        Ok(ArrayStreamAdapter::new(dtype, stream))
108    }
109
110    /// Returns an [`ArrayIterator`] using the given blocking runtime.
111    pub fn into_array_iter<B: BlockingRuntime>(
112        self,
113        runtime: &B,
114    ) -> VortexResult<impl ArrayIterator + 'static> {
115        let stream = self.into_array_stream()?;
116        let dtype = stream.dtype().clone();
117        Ok(ArrayIteratorAdapter::new(
118            dtype,
119            runtime.block_on_stream(stream),
120        ))
121    }
122}
123
124impl<A: 'static + Send> ScanBuilder<A> {
125    pub fn with_filter(mut self, filter: Expression) -> Self {
126        self.filter = Some(filter);
127        self
128    }
129
130    pub fn with_some_filter(mut self, filter: Option<Expression>) -> Self {
131        self.filter = filter;
132        self
133    }
134
135    pub fn with_projection(mut self, projection: Expression) -> Self {
136        self.projection = projection;
137        self
138    }
139
140    pub fn ordered(&self) -> bool {
141        self.ordered
142    }
143
144    pub fn with_ordered(mut self, ordered: bool) -> Self {
145        self.ordered = ordered;
146        self
147    }
148
149    pub fn with_row_range(mut self, row_range: Range<u64>) -> Self {
150        self.row_range = Some(row_range);
151        self
152    }
153
154    pub fn with_selection(mut self, selection: Selection) -> Self {
155        self.selection = selection;
156        self
157    }
158
159    pub fn with_row_indices(mut self, row_indices: Buffer<u64>) -> Self {
160        self.selection = Selection::IncludeByIndex(row_indices);
161        self
162    }
163
164    pub fn with_row_offset(mut self, row_offset: u64) -> Self {
165        self.row_offset = row_offset;
166        self
167    }
168
169    pub fn with_split_by(mut self, split_by: SplitBy) -> Self {
170        self.split_by = split_by;
171        self
172    }
173
174    pub fn concurrency(&self) -> usize {
175        self.concurrency
176    }
177
178    /// The number of row splits to make progress on concurrently per-thread, must
179    /// be greater than 0.
180    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
181        assert!(concurrency > 0);
182        self.concurrency = concurrency;
183        self
184    }
185
186    pub fn with_some_metrics_registry(mut self, metrics: Option<Arc<dyn MetricsRegistry>>) -> Self {
187        self.metrics_registry = metrics;
188        self
189    }
190
191    pub fn with_metrics_registry(mut self, metrics: Arc<dyn MetricsRegistry>) -> Self {
192        self.metrics_registry = Some(metrics);
193        self
194    }
195
196    pub fn with_some_limit(mut self, limit: Option<u64>) -> Self {
197        self.limit = limit;
198        self
199    }
200
201    pub fn with_limit(mut self, limit: u64) -> Self {
202        self.limit = Some(limit);
203        self
204    }
205
206    /// The [`DType`] returned by the scan, after applying the projection.
207    pub fn dtype(&self) -> VortexResult<DType> {
208        self.projection.return_dtype(self.layout_reader.dtype())
209    }
210
211    /// The session used by the scan.
212    pub fn session(&self) -> &VortexSession {
213        &self.session
214    }
215
216    /// Map each split of the scan. The function will be run on the spawned task.
217    pub fn map<B: 'static>(
218        self,
219        map_fn: impl Fn(A) -> VortexResult<B> + 'static + Send + Sync,
220    ) -> ScanBuilder<B> {
221        let old_map_fn = self.map_fn;
222        ScanBuilder {
223            session: self.session,
224            layout_reader: self.layout_reader,
225            projection: self.projection,
226            filter: self.filter,
227            ordered: self.ordered,
228            row_range: self.row_range,
229            selection: self.selection,
230            split_by: self.split_by,
231            concurrency: self.concurrency,
232            metrics_registry: self.metrics_registry,
233            file_stats: self.file_stats,
234            limit: self.limit,
235            row_offset: self.row_offset,
236            map_fn: Arc::new(move |a| old_map_fn(a).and_then(&map_fn)),
237        }
238    }
239
240    pub fn prepare(self) -> VortexResult<RepeatedScan<A>> {
241        let dtype = self.dtype()?;
242
243        if self.filter.is_some() && self.limit.is_some() {
244            vortex_bail!("Vortex doesn't support scans with both a filter and a limit")
245        }
246
247        // Spin up the root layout reader, and wrap it in a FilterLayoutReader to perform
248        // conjunction splitting if a filter is provided.
249        let mut layout_reader = self.layout_reader;
250
251        // Enrich the layout reader to support RowIdx expressions.
252        // Note that this is applied below the filter layout reader since it can perform
253        // better over individual conjunctions.
254        layout_reader = Arc::new(RowIdxLayoutReader::new(
255            self.row_offset,
256            layout_reader,
257            self.session.clone(),
258        ));
259
260        // Normalize and simplify the expressions.
261        let projection = self.projection.optimize_recursive(layout_reader.dtype())?;
262
263        let filter = self
264            .filter
265            .map(|f| f.optimize_recursive(layout_reader.dtype()))
266            .transpose()?;
267
268        // Construct field masks and compute the row splits of the scan.
269        let (filter_mask, projection_mask) =
270            filter_and_projection_masks(&projection, filter.as_ref(), layout_reader.dtype())?;
271        let field_mask: Vec<_> = [filter_mask, projection_mask].concat();
272
273        let splits =
274            if let Some(ranges) = attempt_split_ranges(&self.selection, self.row_range.as_ref()) {
275                Splits::Ranges(ranges)
276            } else {
277                let split_range = self
278                    .row_range
279                    .clone()
280                    .unwrap_or_else(|| 0..layout_reader.row_count());
281                Splits::Natural(self.split_by.splits(
282                    layout_reader.as_ref(),
283                    &split_range,
284                    &field_mask,
285                )?)
286            };
287
288        Ok(RepeatedScan::new(
289            self.session.clone(),
290            layout_reader,
291            projection,
292            filter,
293            self.ordered,
294            self.row_range,
295            self.selection,
296            splits,
297            self.concurrency,
298            self.map_fn,
299            self.limit,
300            dtype,
301        ))
302    }
303
304    /// Constructs a task per row split of the scan, returned as a vector of futures.
305    pub fn build(self) -> VortexResult<Vec<BoxFuture<'static, VortexResult<Option<A>>>>> {
306        // The ultimate short circuit
307        if self.limit.is_some_and(|l| l == 0) {
308            return Ok(vec![]);
309        }
310
311        self.prepare()?.execute(None)
312    }
313
314    /// Returns a [`Stream`] with tasks spawned onto the session's runtime handle.
315    pub fn into_stream(
316        self,
317    ) -> VortexResult<impl Stream<Item = VortexResult<A>> + Send + 'static + use<A>> {
318        Ok(LazyScanStream::new(self))
319    }
320
321    /// Returns an [`Iterator`] using the session's runtime.
322    pub fn into_iter<B: BlockingRuntime>(
323        self,
324        runtime: &B,
325    ) -> VortexResult<impl Iterator<Item = VortexResult<A>> + 'static> {
326        let stream = self.into_stream()?;
327        Ok(runtime.block_on_stream(stream))
328    }
329}
330
331enum LazyScanState<A: 'static + Send> {
332    Builder(Option<Box<ScanBuilder<A>>>),
333    Preparing(PreparingScan<A>),
334    Stream(BoxStream<'static, VortexResult<A>>),
335    Error(Option<vortex_error::VortexError>),
336}
337
338type PreparedScanTasks<A> = Vec<BoxFuture<'static, VortexResult<Option<A>>>>;
339
340struct PreparingScan<A: 'static + Send> {
341    ordered: bool,
342    concurrency: usize,
343    handle: Handle,
344    task: Task<VortexResult<PreparedScanTasks<A>>>,
345}
346
347struct LazyScanStream<A: 'static + Send> {
348    state: LazyScanState<A>,
349}
350
351impl<A: 'static + Send> LazyScanStream<A> {
352    fn new(builder: ScanBuilder<A>) -> Self {
353        Self {
354            state: LazyScanState::Builder(Some(Box::new(builder))),
355        }
356    }
357}
358
359impl<A: 'static + Send> Unpin for LazyScanStream<A> {}
360
361impl<A: 'static + Send> Stream for LazyScanStream<A> {
362    type Item = VortexResult<A>;
363
364    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
365        loop {
366            match &mut self.state {
367                LazyScanState::Builder(builder) => {
368                    let builder = builder.take().vortex_expect("polled after completion");
369                    let ordered = builder.ordered;
370                    let num_workers = std::thread::available_parallelism()
371                        .map(|n| n.get())
372                        .unwrap_or(1);
373                    let concurrency = builder.concurrency * num_workers;
374                    let handle = builder.session.handle();
375                    let task = handle.spawn_blocking(move || {
376                        builder.prepare().and_then(|scan| scan.execute(None))
377                    });
378                    self.state = LazyScanState::Preparing(PreparingScan {
379                        ordered,
380                        concurrency,
381                        handle,
382                        task,
383                    });
384                }
385                LazyScanState::Preparing(preparing) => {
386                    match ready!(Pin::new(&mut preparing.task).poll(cx)) {
387                        Ok(tasks) => {
388                            let ordered = preparing.ordered;
389                            let concurrency = preparing.concurrency;
390                            let handle = preparing.handle.clone();
391                            let stream =
392                                futures::stream::iter(tasks).map(move |task| handle.spawn(task));
393                            let stream = if ordered {
394                                stream.buffered(concurrency).boxed()
395                            } else {
396                                stream.buffer_unordered(concurrency).boxed()
397                            };
398                            let stream = stream
399                                .filter_map(|chunk| async move { chunk.transpose() })
400                                .boxed();
401                            self.state = LazyScanState::Stream(stream);
402                        }
403                        Err(err) => self.state = LazyScanState::Error(Some(err)),
404                    }
405                }
406                LazyScanState::Stream(stream) => return stream.as_mut().poll_next(cx),
407                LazyScanState::Error(err) => return Poll::Ready(err.take().map(Err)),
408            }
409        }
410    }
411}
412
413/// Compute masks of field paths referenced by the projection and filter in the scan.
414///
415/// Projection and filter must be pre-simplified.
416pub(crate) fn filter_and_projection_masks(
417    projection: &Expression,
418    filter: Option<&Expression>,
419    dtype: &DType,
420) -> VortexResult<(Vec<FieldMask>, Vec<FieldMask>)> {
421    let Some(struct_dtype) = dtype.as_struct_fields_opt() else {
422        return Ok(match filter {
423            Some(_) => (vec![FieldMask::All], vec![FieldMask::All]),
424            None => (Vec::new(), vec![FieldMask::All]),
425        });
426    };
427    let projection_mask = immediate_scope_access(projection, struct_dtype);
428    Ok(match filter {
429        None => (
430            Vec::new(),
431            projection_mask.into_iter().map(to_field_mask).collect_vec(),
432        ),
433        Some(f) => {
434            let filter_mask = immediate_scope_access(f, struct_dtype);
435            let only_projection_mask = projection_mask
436                .difference(&filter_mask)
437                .cloned()
438                .map(to_field_mask)
439                .collect_vec();
440            (
441                filter_mask.into_iter().map(to_field_mask).collect_vec(),
442                only_projection_mask,
443            )
444        }
445    })
446}
447
448fn to_field_mask(field: FieldName) -> FieldMask {
449    FieldMask::Prefix(FieldPath::from(Field::Name(field)))
450}
451
#[cfg(test)]
mod test {
    use std::collections::BTreeSet;
    use std::ops::Range;
    use std::pin::Pin;
    use std::sync::Arc;
    use std::sync::atomic::AtomicUsize;
    use std::sync::atomic::Ordering;
    use std::task::Context;
    use std::task::Poll;
    use std::time::Duration;

    use futures::Stream;
    use futures::task::noop_waker_ref;
    use parking_lot::Mutex;
    use vortex_array::IntoArray;
    use vortex_array::MaskFuture;
    use vortex_array::ToCanonical;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::FieldMask;
    use vortex_array::dtype::Nullability;
    use vortex_array::dtype::PType;
    use vortex_array::expr::Expression;
    use vortex_error::VortexResult;
    use vortex_error::vortex_err;
    use vortex_io::runtime::BlockingRuntime;
    use vortex_io::runtime::single::SingleThreadRuntime;
    use vortex_layout::ArrayFuture;
    use vortex_layout::LayoutReader;
    use vortex_mask::Mask;

    use super::ScanBuilder;

    /// A single-row reader that counts `register_splits` calls.
    ///
    /// Its evaluation methods must never be reached by the tests that use it.
    #[derive(Debug)]
    struct CountingLayoutReader {
        name: Arc<str>,
        dtype: DType,
        row_count: u64,
        register_splits_calls: Arc<AtomicUsize>,
    }

    impl CountingLayoutReader {
        fn new(register_splits_calls: Arc<AtomicUsize>) -> Self {
            Self {
                name: Arc::from("counting"),
                dtype: DType::Primitive(PType::I32, Nullability::NonNullable),
                row_count: 1,
                register_splits_calls,
            }
        }
    }

    impl LayoutReader for CountingLayoutReader {
        fn name(&self) -> &Arc<str> {
            &self.name
        }

        fn dtype(&self) -> &DType {
            &self.dtype
        }

        fn row_count(&self) -> u64 {
            self.row_count
        }

        fn register_splits(
            &self,
            _field_mask: &[FieldMask],
            row_range: &Range<u64>,
            splits: &mut BTreeSet<u64>,
        ) -> VortexResult<()> {
            self.register_splits_calls.fetch_add(1, Ordering::Relaxed);
            splits.insert(row_range.end);
            Ok(())
        }

        fn pruning_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            _mask: Mask,
        ) -> VortexResult<MaskFuture> {
            unimplemented!("not needed for this test");
        }

        fn filter_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            _mask: MaskFuture,
        ) -> VortexResult<MaskFuture> {
            unimplemented!("not needed for this test");
        }

        fn projection_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            _mask: MaskFuture,
        ) -> VortexResult<ArrayFuture> {
            Ok(Box::pin(async move {
                unreachable!("scan should not be polled in this test")
            }))
        }
    }

    /// Building a stream must not compute splits before the stream is polled.
    #[test]
    fn into_stream_is_lazy() {
        let calls = Arc::new(AtomicUsize::new(0));
        let reader = Arc::new(CountingLayoutReader::new(calls.clone()));

        let session = crate::test::SCAN_SESSION.clone();

        let _stream = ScanBuilder::new(session, reader).into_stream().unwrap();

        assert_eq!(calls.load(Ordering::Relaxed), 0);
    }

    /// A four-row reader that places a split boundary after every row.
    ///
    /// Its projection returns, for each row range, the row indices that range covers.
    #[derive(Debug)]
    struct SplittingLayoutReader {
        name: Arc<str>,
        dtype: DType,
        row_count: u64,
        register_splits_calls: Arc<AtomicUsize>,
    }

    impl SplittingLayoutReader {
        fn new(register_splits_calls: Arc<AtomicUsize>) -> Self {
            Self {
                name: Arc::from("splitting"),
                dtype: DType::Primitive(PType::I32, Nullability::NonNullable),
                row_count: 4,
                register_splits_calls,
            }
        }
    }

    impl LayoutReader for SplittingLayoutReader {
        fn name(&self) -> &Arc<str> {
            &self.name
        }

        fn dtype(&self) -> &DType {
            &self.dtype
        }

        fn row_count(&self) -> u64 {
            self.row_count
        }

        fn register_splits(
            &self,
            _field_mask: &[FieldMask],
            row_range: &Range<u64>,
            splits: &mut BTreeSet<u64>,
        ) -> VortexResult<()> {
            self.register_splits_calls.fetch_add(1, Ordering::Relaxed);
            for split in (row_range.start + 1)..=row_range.end {
                splits.insert(split);
            }
            Ok(())
        }

        fn pruning_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            mask: Mask,
        ) -> VortexResult<MaskFuture> {
            Ok(MaskFuture::ready(mask))
        }

        fn filter_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            mask: MaskFuture,
        ) -> VortexResult<MaskFuture> {
            Ok(mask)
        }

        fn projection_evaluation(
            &self,
            row_range: &Range<u64>,
            _expr: &Expression,
            _mask: MaskFuture,
        ) -> VortexResult<ArrayFuture> {
            let start = usize::try_from(row_range.start)
                .map_err(|_| vortex_err!("row_range.start must fit in usize"))?;
            let end = usize::try_from(row_range.end)
                .map_err(|_| vortex_err!("row_range.end must fit in usize"))?;

            let values: VortexResult<Vec<i32>> = (start..end)
                .map(|v| i32::try_from(v).map_err(|_| vortex_err!("split value must fit in i32")))
                .collect();

            let array = PrimitiveArray::from_iter(values?).into_array();
            Ok(Box::pin(async move { Ok(array) }))
        }
    }

    /// Polling the stream prepares the scan exactly once and yields every split in order.
    #[test]
    fn into_stream_executes_after_prepare() -> VortexResult<()> {
        let calls = Arc::new(AtomicUsize::new(0));
        let reader = Arc::new(SplittingLayoutReader::new(calls.clone()));

        let runtime = SingleThreadRuntime::default();
        let session = crate::test::session_with_handle(runtime.handle());

        let stream = ScanBuilder::new(session, reader).into_stream()?;
        let mut iter = runtime.block_on_stream(stream);

        let mut values = Vec::new();
        for chunk in &mut iter {
            values.push(chunk?.to_primitive().into_buffer::<i32>()[0]);
        }

        assert_eq!(calls.load(Ordering::Relaxed), 1);
        assert_eq!(values, [0, 1, 2, 3]);

        Ok(())
    }

    /// A single-row reader whose `register_splits` blocks until `gate` is released.
    ///
    /// The call counter is incremented before blocking. Tests use it to detect whether split
    /// computation ran on the polling thread.
    #[derive(Debug)]
    struct BlockingSplitsLayoutReader {
        name: Arc<str>,
        dtype: DType,
        row_count: u64,
        register_splits_calls: Arc<AtomicUsize>,
        gate: Arc<Mutex<()>>,
    }

    impl BlockingSplitsLayoutReader {
        fn new(gate: Arc<Mutex<()>>, register_splits_calls: Arc<AtomicUsize>) -> Self {
            Self {
                name: Arc::from("blocking-splits"),
                dtype: DType::Primitive(PType::I32, Nullability::NonNullable),
                row_count: 1,
                register_splits_calls,
                gate,
            }
        }
    }

    impl LayoutReader for BlockingSplitsLayoutReader {
        fn name(&self) -> &Arc<str> {
            &self.name
        }

        fn dtype(&self) -> &DType {
            &self.dtype
        }

        fn row_count(&self) -> u64 {
            self.row_count
        }

        fn register_splits(
            &self,
            _field_mask: &[FieldMask],
            row_range: &Range<u64>,
            splits: &mut BTreeSet<u64>,
        ) -> VortexResult<()> {
            self.register_splits_calls.fetch_add(1, Ordering::Relaxed);
            let _guard = self.gate.lock();
            splits.insert(row_range.end);
            Ok(())
        }

        fn pruning_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            _mask: Mask,
        ) -> VortexResult<MaskFuture> {
            unimplemented!("not needed for this test");
        }

        fn filter_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            _mask: MaskFuture,
        ) -> VortexResult<MaskFuture> {
            unimplemented!("not needed for this test");
        }

        fn projection_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            _mask: MaskFuture,
        ) -> VortexResult<ArrayFuture> {
            Ok(Box::pin(async move {
                unreachable!("scan should not be polled in this test")
            }))
        }
    }

    /// The first poll must return `Pending` rather than computing splits on the polling thread.
    #[test]
    fn into_stream_first_poll_does_not_block() {
        let gate = Arc::new(Mutex::new(()));
        let guard = gate.lock();

        let calls = Arc::new(AtomicUsize::new(0));
        let reader = Arc::new(BlockingSplitsLayoutReader::new(gate.clone(), calls.clone()));

        let runtime = SingleThreadRuntime::default();
        let session = crate::test::session_with_handle(runtime.handle());

        let mut stream = ScanBuilder::new(session, reader).into_stream().unwrap();

        let (send, recv) = std::sync::mpsc::channel::<bool>();
        let join = std::thread::spawn(move || {
            let waker = noop_waker_ref();
            let mut cx = Context::from_waker(waker);
            let poll = Pin::new(&mut stream).poll_next(&mut cx);
            let _ = send.send(matches!(poll, Poll::Pending));
        });

        // A timeout here means `poll_next` is stuck on the gate held above.
        let polled = recv.recv_timeout(Duration::from_secs(1));

        // Always release the gate and join the thread so failures don't hang the test process.
        drop(guard);
        let joined = join.join();

        // Report a panic in the polling thread as such, rather than as a misleading timeout
        // (a panicking thread drops its sender, which also makes `recv_timeout` fail).
        assert!(joined.is_ok(), "polling thread panicked");
        let polled_pending = polled.expect("poll_next blocked; expected quick return");
        assert!(
            polled_pending,
            "expected Poll::Pending while prepare is blocked"
        );
        assert_eq!(calls.load(Ordering::Relaxed), 0);

        drop(runtime);
    }
}
788}