Skip to main content

vortex_layout/scan/
scan_builder.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::ops::Range;
5use std::pin::Pin;
6use std::sync::Arc;
7use std::task::Context;
8use std::task::Poll;
9use std::task::ready;
10
11use futures::Stream;
12use futures::StreamExt;
13use futures::future::BoxFuture;
14use futures::stream::BoxStream;
15use itertools::Itertools;
16use vortex_array::ArrayRef;
17use vortex_array::dtype::DType;
18use vortex_array::dtype::FieldMask;
19use vortex_array::expr::Expression;
20use vortex_array::expr::analysis::referenced_field_paths;
21use vortex_array::expr::root;
22use vortex_array::iter::ArrayIterator;
23use vortex_array::iter::ArrayIteratorAdapter;
24use vortex_array::stats::StatsSet;
25use vortex_array::stream::ArrayStream;
26use vortex_array::stream::ArrayStreamAdapter;
27use vortex_buffer::Buffer;
28use vortex_error::VortexExpect;
29use vortex_error::VortexResult;
30use vortex_error::vortex_bail;
31use vortex_io::runtime::BlockingRuntime;
32use vortex_io::runtime::Handle;
33use vortex_io::runtime::Task;
34use vortex_io::session::RuntimeSessionExt;
35use vortex_metrics::MetricsRegistry;
36use vortex_scan::selection::Selection;
37use vortex_session::VortexSession;
38use vortex_utils::parallelism::get_available_parallelism;
39
40use crate::LayoutReader;
41use crate::LayoutReaderRef;
42use crate::layouts::row_idx::RowIdxLayoutReader;
43use crate::scan::repeated_scan::RepeatedScan;
44use crate::scan::split_by::SplitBy;
45use crate::scan::splits::Splits;
46use crate::scan::splits::attempt_split_ranges;
47
48/// A struct for building a scan operation.
49pub struct ScanBuilder<A> {
50    session: VortexSession,
51    layout_reader: LayoutReaderRef,
52    projection: Expression,
53    filter: Option<Expression>,
54    /// Whether the scan needs to return splits in the order they appear in the file.
55    ordered: bool,
56    /// Optionally read a subset of the rows in the file.
57    row_range: Option<Range<u64>>,
58    /// The selection mask to apply to the selected row range.
59    // TODO(joe): replace this is usage of row_id selection, see
60    selection: Selection,
61    /// How to split the file for concurrent processing.
62    split_by: SplitBy,
63    /// The number of splits to make progress on concurrently **per-thread**.
64    concurrency: usize,
65    /// Function to apply to each [`ArrayRef`] within the spawned split tasks.
66    map_fn: Arc<dyn Fn(ArrayRef) -> VortexResult<A> + Send + Sync>,
67    metrics_registry: Option<Arc<dyn MetricsRegistry>>,
68    /// Should we try to prune the file (using stats) on open.
69    file_stats: Option<Arc<[StatsSet]>>,
70    /// Maximal number of rows to read (after filtering)
71    limit: Option<u64>,
72    /// The row-offset assigned to the first row of the file. Used by the `row_idx` expression,
73    /// but not by the scan [`Selection`] which remains relative.
74    row_offset: u64,
75}
76
77impl ScanBuilder<ArrayRef> {
78    pub fn new(session: VortexSession, layout_reader: Arc<dyn LayoutReader>) -> Self {
79        Self {
80            session,
81            layout_reader,
82            projection: root(),
83            filter: None,
84            ordered: true,
85            row_range: None,
86            selection: Default::default(),
87            split_by: SplitBy::Layout,
88            // We default to four tasks per worker thread, which allows for some I/O lookahead
89            // without too much impact on work-stealing.
90            concurrency: 4,
91            map_fn: Arc::new(Ok),
92            metrics_registry: None,
93            file_stats: None,
94            limit: None,
95            row_offset: 0,
96        }
97    }
98
99    /// Returns an [`ArrayStream`] with tasks spawned onto the session's runtime handle.
100    ///
101    /// See [`ScanBuilder::into_stream`] for more details.
102    pub fn into_array_stream(self) -> VortexResult<impl ArrayStream + Send + 'static> {
103        let dtype = self.dtype()?;
104        let stream = self.into_stream()?;
105        Ok(ArrayStreamAdapter::new(dtype, stream))
106    }
107
108    /// Returns an [`ArrayIterator`] using the given blocking runtime.
109    pub fn into_array_iter<B: BlockingRuntime>(
110        self,
111        runtime: &B,
112    ) -> VortexResult<impl ArrayIterator + 'static> {
113        let stream = self.into_array_stream()?;
114        let dtype = stream.dtype().clone();
115        Ok(ArrayIteratorAdapter::new(
116            dtype,
117            runtime.block_on_stream(stream),
118        ))
119    }
120}
121
122impl<A: 'static + Send> ScanBuilder<A> {
123    pub fn with_filter(mut self, filter: Expression) -> Self {
124        self.filter = Some(filter);
125        self
126    }
127
128    pub fn with_some_filter(mut self, filter: Option<Expression>) -> Self {
129        self.filter = filter;
130        self
131    }
132
133    pub fn with_projection(mut self, projection: Expression) -> Self {
134        self.projection = projection;
135        self
136    }
137
138    pub fn ordered(&self) -> bool {
139        self.ordered
140    }
141
142    pub fn with_ordered(mut self, ordered: bool) -> Self {
143        self.ordered = ordered;
144        self
145    }
146
147    pub fn with_row_range(mut self, row_range: Range<u64>) -> Self {
148        self.row_range = Some(row_range);
149        self
150    }
151
152    pub fn with_selection(mut self, selection: Selection) -> Self {
153        self.selection = selection;
154        self
155    }
156
157    pub fn with_row_indices(mut self, row_indices: Buffer<u64>) -> Self {
158        self.selection = Selection::IncludeByIndex(row_indices);
159        self
160    }
161
162    pub fn with_row_offset(mut self, row_offset: u64) -> Self {
163        self.row_offset = row_offset;
164        self
165    }
166
167    pub fn with_split_by(mut self, split_by: SplitBy) -> Self {
168        self.split_by = split_by;
169        self
170    }
171
172    pub fn concurrency(&self) -> usize {
173        self.concurrency
174    }
175
176    /// The number of row splits to make progress on concurrently per-thread, must
177    /// be greater than 0.
178    pub fn with_concurrency(mut self, concurrency: usize) -> Self {
179        assert!(concurrency > 0);
180        self.concurrency = concurrency;
181        self
182    }
183
184    pub fn with_some_metrics_registry(mut self, metrics: Option<Arc<dyn MetricsRegistry>>) -> Self {
185        self.metrics_registry = metrics;
186        self
187    }
188
189    pub fn with_metrics_registry(mut self, metrics: Arc<dyn MetricsRegistry>) -> Self {
190        self.metrics_registry = Some(metrics);
191        self
192    }
193
194    pub fn with_some_limit(mut self, limit: Option<u64>) -> Self {
195        self.limit = limit;
196        self
197    }
198
199    pub fn with_limit(mut self, limit: u64) -> Self {
200        self.limit = Some(limit);
201        self
202    }
203
204    /// The [`DType`] returned by the scan, after applying the projection.
205    pub fn dtype(&self) -> VortexResult<DType> {
206        self.projection.return_dtype(self.layout_reader.dtype())
207    }
208
209    /// The session used by the scan.
210    pub fn session(&self) -> &VortexSession {
211        &self.session
212    }
213
214    /// Map each split of the scan. The function will be run on the spawned task.
215    pub fn map<B: 'static>(
216        self,
217        map_fn: impl Fn(A) -> VortexResult<B> + 'static + Send + Sync,
218    ) -> ScanBuilder<B> {
219        let old_map_fn = self.map_fn;
220        ScanBuilder {
221            session: self.session,
222            layout_reader: self.layout_reader,
223            projection: self.projection,
224            filter: self.filter,
225            ordered: self.ordered,
226            row_range: self.row_range,
227            selection: self.selection,
228            split_by: self.split_by,
229            concurrency: self.concurrency,
230            metrics_registry: self.metrics_registry,
231            file_stats: self.file_stats,
232            limit: self.limit,
233            row_offset: self.row_offset,
234            map_fn: Arc::new(move |a| old_map_fn(a).and_then(&map_fn)),
235        }
236    }
237
238    pub fn prepare(self) -> VortexResult<RepeatedScan<A>> {
239        let dtype = self.dtype()?;
240
241        if self.filter.is_some() && self.limit.is_some() {
242            vortex_bail!("Vortex doesn't support scans with both a filter and a limit")
243        }
244
245        // Spin up the root layout reader, and wrap it in a FilterLayoutReader to perform
246        // conjunction splitting if a filter is provided.
247        let mut layout_reader = self.layout_reader;
248
249        // Enrich the layout reader to support RowIdx expressions.
250        // Note that this is applied below the filter layout reader since it can perform
251        // better over individual conjunctions.
252        layout_reader = Arc::new(RowIdxLayoutReader::new(
253            self.row_offset,
254            layout_reader,
255            self.session.clone(),
256        ));
257
258        // Normalize and simplify the expressions.
259        let projection = self.projection.optimize_recursive(layout_reader.dtype())?;
260
261        let filter = self
262            .filter
263            .map(|f| f.optimize_recursive(layout_reader.dtype()))
264            .transpose()?;
265
266        // Construct field masks and compute the row splits of the scan.
267        let field_mask =
268            referenced_field_masks(&projection, filter.as_ref(), layout_reader.dtype())?;
269
270        let splits =
271            if let Some(ranges) = attempt_split_ranges(&self.selection, self.row_range.as_ref()) {
272                Splits::Ranges(ranges)
273            } else {
274                let split_range = self
275                    .row_range
276                    .clone()
277                    .unwrap_or_else(|| 0..layout_reader.row_count());
278                Splits::Natural(self.split_by.splits(
279                    layout_reader.as_ref(),
280                    &split_range,
281                    &field_mask,
282                )?)
283            };
284
285        Ok(RepeatedScan::new(
286            self.session.clone(),
287            layout_reader,
288            projection,
289            filter,
290            self.ordered,
291            self.row_range,
292            self.selection,
293            splits,
294            self.concurrency,
295            self.map_fn,
296            self.limit,
297            dtype,
298        ))
299    }
300
301    /// Constructs a task per row split of the scan, returned as a vector of futures.
302    pub fn build(self) -> VortexResult<Vec<BoxFuture<'static, VortexResult<Option<A>>>>> {
303        // The ultimate short circuit
304        if self.limit.is_some_and(|l| l == 0) {
305            return Ok(vec![]);
306        }
307
308        self.prepare()?.execute(None)
309    }
310
311    /// Returns a [`Stream`] with tasks spawned onto the session's runtime handle.
312    pub fn into_stream(
313        self,
314    ) -> VortexResult<impl Stream<Item = VortexResult<A>> + Send + 'static + use<A>> {
315        Ok(LazyScanStream::new(self))
316    }
317
318    /// Returns an [`Iterator`] using the session's runtime.
319    pub fn into_iter<B: BlockingRuntime>(
320        self,
321        runtime: &B,
322    ) -> VortexResult<impl Iterator<Item = VortexResult<A>> + 'static> {
323        let stream = self.into_stream()?;
324        Ok(runtime.block_on_stream(stream))
325    }
326}
327
328enum LazyScanState<A: 'static + Send> {
329    Builder(Option<Box<ScanBuilder<A>>>),
330    Preparing(PreparingScan<A>),
331    Stream(BoxStream<'static, VortexResult<A>>),
332    Error(Option<vortex_error::VortexError>),
333}
334
335type PreparedScanTasks<A> = Vec<BoxFuture<'static, VortexResult<Option<A>>>>;
336
337struct PreparingScan<A: 'static + Send> {
338    ordered: bool,
339    concurrency: usize,
340    handle: Handle,
341    task: Task<VortexResult<PreparedScanTasks<A>>>,
342}
343
344struct LazyScanStream<A: 'static + Send> {
345    state: LazyScanState<A>,
346}
347
348impl<A: 'static + Send> LazyScanStream<A> {
349    fn new(builder: ScanBuilder<A>) -> Self {
350        Self {
351            state: LazyScanState::Builder(Some(Box::new(builder))),
352        }
353    }
354}
355
356impl<A: 'static + Send> Unpin for LazyScanStream<A> {}
357
358impl<A: 'static + Send> Stream for LazyScanStream<A> {
359    type Item = VortexResult<A>;
360
361    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
362        loop {
363            match &mut self.state {
364                LazyScanState::Builder(builder) => {
365                    let builder = builder.take().vortex_expect("polled after completion");
366                    let ordered = builder.ordered;
367                    let num_workers = get_available_parallelism().unwrap_or(1);
368                    let concurrency = builder.concurrency * num_workers;
369                    let handle = builder.session.handle();
370                    let task = handle.spawn_blocking(move || {
371                        builder.prepare().and_then(|scan| scan.execute(None))
372                    });
373                    self.state = LazyScanState::Preparing(PreparingScan {
374                        ordered,
375                        concurrency,
376                        handle,
377                        task,
378                    });
379                }
380                LazyScanState::Preparing(preparing) => {
381                    match ready!(Pin::new(&mut preparing.task).poll(cx)) {
382                        Ok(tasks) => {
383                            let ordered = preparing.ordered;
384                            let concurrency = preparing.concurrency;
385                            let handle = preparing.handle.clone();
386                            let stream =
387                                futures::stream::iter(tasks).map(move |task| handle.spawn(task));
388                            let stream = if ordered {
389                                stream.buffered(concurrency).boxed()
390                            } else {
391                                stream.buffer_unordered(concurrency).boxed()
392                            };
393                            let stream = stream
394                                .filter_map(|chunk| async move { chunk.transpose() })
395                                .boxed();
396                            self.state = LazyScanState::Stream(stream);
397                        }
398                        Err(err) => self.state = LazyScanState::Error(Some(err)),
399                    }
400                }
401                LazyScanState::Stream(stream) => return stream.as_mut().poll_next(cx),
402                LazyScanState::Error(err) => return Poll::Ready(err.take().map(Err)),
403            }
404        }
405    }
406}
407
408/// Compute masks of field paths referenced by the projection and filter in the scan.
409///
410/// Projection and filter must be pre-simplified.
411pub fn referenced_field_masks(
412    projection: &Expression,
413    filter: Option<&Expression>,
414    dtype: &DType,
415) -> VortexResult<Vec<FieldMask>> {
416    if dtype.as_struct_fields_opt().is_none() {
417        return Ok(vec![FieldMask::All]);
418    }
419
420    let mut field_paths = referenced_field_paths(projection, dtype)?;
421    if let Some(filter) = filter {
422        field_paths.extend(referenced_field_paths(filter, dtype)?);
423    }
424    Ok(field_paths.into_iter().map(FieldMask::Prefix).collect_vec())
425}
426
#[cfg(test)]
mod test {
    use std::ops::Range;
    use std::pin::Pin;
    use std::sync::Arc;
    use std::sync::atomic::AtomicUsize;
    use std::sync::atomic::Ordering;
    use std::sync::mpsc;
    use std::task::Context;
    use std::task::Poll;
    use std::time::Duration;

    use futures::Stream;
    use futures::task::noop_waker_ref;
    use parking_lot::Mutex;
    use vortex_array::IntoArray;
    use vortex_array::LEGACY_SESSION;
    use vortex_array::MaskFuture;
    use vortex_array::VortexSessionExecute;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::FieldMask;
    use vortex_array::dtype::FieldPath;
    use vortex_array::dtype::Nullability;
    use vortex_array::dtype::PType;
    use vortex_array::dtype::StructFields;
    use vortex_array::expr::Expression;
    use vortex_array::expr::eq;
    use vortex_array::expr::get_item;
    use vortex_array::expr::is_not_null;
    use vortex_array::expr::lit;
    use vortex_array::expr::root;
    use vortex_error::VortexResult;
    use vortex_error::vortex_err;
    use vortex_io::runtime::BlockingRuntime;
    use vortex_io::runtime::single::SingleThreadRuntime;
    use vortex_mask::Mask;

    use super::ScanBuilder;
    use super::referenced_field_masks;
    use crate::ArrayFuture;
    use crate::LayoutReader;
    use crate::RowSplits;
    use crate::SplitRange;
    use crate::scan::test::SCAN_SESSION;
    use crate::scan::test::session_with_handle;

    /// Non-nullable struct dtype `{a: {1: i32, 2: i32}, b: i32}`.
    fn nested_dtype() -> DType {
        DType::Struct(
            StructFields::from_iter([
                (
                    "a",
                    DType::Struct(
                        StructFields::from_iter([
                            ("1", DType::Primitive(PType::I32, Nullability::NonNullable)),
                            ("2", DType::Primitive(PType::I32, Nullability::NonNullable)),
                        ]),
                        Nullability::NonNullable,
                    ),
                ),
                ("b", DType::Primitive(PType::I32, Nullability::NonNullable)),
            ]),
            Nullability::NonNullable,
        )
    }

    /// Projection reads `a.1` and the filter reads `a.2`: both full nested paths must be kept
    /// as separate prefix masks.
    #[test]
    fn nested_projection_preserves_field_path_in_split_mask() -> VortexResult<()> {
        let projection = get_item("1", get_item("a", root()));
        let filter = eq(get_item("2", get_item("a", root())), lit(0_i32));

        let field_masks = referenced_field_masks(&projection, Some(&filter), &nested_dtype())?;

        assert_eq!(field_masks.len(), 2);
        assert!(field_masks.contains(&FieldMask::Prefix(FieldPath::from_name("a").push("1"))));
        assert!(field_masks.contains(&FieldMask::Prefix(FieldPath::from_name("a").push("2"))));
        Ok(())
    }

    /// A filter on `a` subsumes the projection's `a.1`, leaving only the `a` prefix.
    #[test]
    fn filter_path_covers_nested_projection_path() -> VortexResult<()> {
        let projection = get_item("1", get_item("a", root()));
        let filter = is_not_null(get_item("a", root()));

        let field_masks = referenced_field_masks(&projection, Some(&filter), &nested_dtype())?;

        assert_eq!(field_masks, [FieldMask::Prefix(FieldPath::from_name("a"))]);
        Ok(())
    }

    /// A projection of `a` subsumes the filter's `a.1`, leaving only the `a` prefix.
    #[test]
    fn parent_projection_path_covers_nested_filter_path() -> VortexResult<()> {
        let projection = get_item("a", root());
        let filter = is_not_null(get_item("1", get_item("a", root())));

        let field_masks = referenced_field_masks(&projection, Some(&filter), &nested_dtype())?;

        assert_eq!(field_masks, [FieldMask::Prefix(FieldPath::from_name("a"))]);
        Ok(())
    }

    /// One-row `i32` reader that counts `register_splits` calls.
    ///
    /// Its projection future is `unreachable!`, so any test using it must never poll the
    /// scan's data.
    #[derive(Debug)]
    struct CountingLayoutReader {
        name: Arc<str>,
        dtype: DType,
        row_count: u64,
        register_splits_calls: Arc<AtomicUsize>,
    }

    impl CountingLayoutReader {
        fn new(register_splits_calls: Arc<AtomicUsize>) -> Self {
            Self {
                name: Arc::from("counting"),
                dtype: DType::Primitive(PType::I32, Nullability::NonNullable),
                row_count: 1,
                register_splits_calls,
            }
        }
    }

    impl LayoutReader for CountingLayoutReader {
        fn name(&self) -> &Arc<str> {
            &self.name
        }

        fn dtype(&self) -> &DType {
            &self.dtype
        }

        fn row_count(&self) -> u64 {
            self.row_count
        }

        fn register_splits(
            &self,
            _field_mask: &[FieldMask],
            split_range: &SplitRange,
            splits: &mut RowSplits,
        ) -> VortexResult<()> {
            self.register_splits_calls.fetch_add(1, Ordering::Relaxed);
            // A single split covering the whole root range.
            splits.push(split_range.root_row_range().end);
            Ok(())
        }

        fn pruning_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            _mask: Mask,
        ) -> VortexResult<MaskFuture> {
            unimplemented!("not needed for this test");
        }

        fn filter_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            _mask: MaskFuture,
        ) -> VortexResult<MaskFuture> {
            unimplemented!("not needed for this test");
        }

        fn projection_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            _mask: MaskFuture,
        ) -> VortexResult<ArrayFuture> {
            Ok(Box::pin(async move {
                unreachable!("scan should not be polled in this test")
            }))
        }

        fn as_any(&self) -> &dyn std::any::Any {
            self
        }
    }

    /// Creating the stream must not compute splits; that only happens once it is polled.
    #[test]
    fn into_stream_is_lazy() {
        let calls = Arc::new(AtomicUsize::new(0));
        let reader = Arc::new(CountingLayoutReader::new(Arc::clone(&calls)));

        let session = SCAN_SESSION.clone();

        let _stream = ScanBuilder::new(session, reader).into_stream().unwrap();

        assert_eq!(calls.load(Ordering::Relaxed), 0);
    }

    /// Four-row `i32` reader that registers a split boundary after every row.
    ///
    /// For any row range, its projection returns the row indices in that range as values.
    /// Pruning and filtering pass the mask through unchanged.
    #[derive(Debug)]
    struct SplittingLayoutReader {
        name: Arc<str>,
        dtype: DType,
        row_count: u64,
        register_splits_calls: Arc<AtomicUsize>,
    }

    impl SplittingLayoutReader {
        fn new(register_splits_calls: Arc<AtomicUsize>) -> Self {
            Self {
                name: Arc::from("splitting"),
                dtype: DType::Primitive(PType::I32, Nullability::NonNullable),
                row_count: 4,
                register_splits_calls,
            }
        }
    }

    impl LayoutReader for SplittingLayoutReader {
        fn name(&self) -> &Arc<str> {
            &self.name
        }

        fn dtype(&self) -> &DType {
            &self.dtype
        }

        fn row_count(&self) -> u64 {
            self.row_count
        }

        fn register_splits(
            &self,
            _field_mask: &[FieldMask],
            split_range: &SplitRange,
            splits: &mut RowSplits,
        ) -> VortexResult<()> {
            self.register_splits_calls.fetch_add(1, Ordering::Relaxed);
            // Push a boundary after each row, translated by the range's row offset.
            for split in (split_range.row_range().start + 1)..=split_range.row_range().end {
                splits.push(split_range.row_offset() + split);
            }
            Ok(())
        }

        fn pruning_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            mask: Mask,
        ) -> VortexResult<MaskFuture> {
            Ok(MaskFuture::ready(mask))
        }

        fn filter_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            mask: MaskFuture,
        ) -> VortexResult<MaskFuture> {
            Ok(mask)
        }

        fn projection_evaluation(
            &self,
            row_range: &Range<u64>,
            _expr: &Expression,
            _mask: MaskFuture,
        ) -> VortexResult<ArrayFuture> {
            let start = usize::try_from(row_range.start)
                .map_err(|_| vortex_err!("row_range.start must fit in usize"))?;
            let end = usize::try_from(row_range.end)
                .map_err(|_| vortex_err!("row_range.end must fit in usize"))?;

            // Each row's value is its own row index.
            let values: VortexResult<Vec<i32>> = (start..end)
                .map(|v| i32::try_from(v).map_err(|_| vortex_err!("split value must fit in i32")))
                .collect();

            let array = PrimitiveArray::from_iter(values?).into_array();
            Ok(Box::pin(async move { Ok(array) }))
        }

        fn as_any(&self) -> &dyn std::any::Any {
            self
        }
    }

    /// End-to-end run: splits are registered exactly once, and each one-row split yields its
    /// row index in file order.
    #[test]
    fn into_stream_executes_after_prepare() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let calls = Arc::new(AtomicUsize::new(0));
        let reader = Arc::new(SplittingLayoutReader::new(Arc::clone(&calls)));

        let runtime = SingleThreadRuntime::default();
        let session = session_with_handle(runtime.handle());

        let stream = ScanBuilder::new(session, reader).into_stream()?;
        let mut iter = runtime.block_on_stream(stream);

        let mut values = Vec::new();
        for chunk in &mut iter {
            let prim = chunk?.execute::<PrimitiveArray>(&mut ctx)?;
            values.push(prim.into_buffer::<i32>()[0]);
        }

        assert_eq!(calls.load(Ordering::Relaxed), 1);
        assert_eq!(values.as_ref(), [0, 1, 2, 3]);

        Ok(())
    }

    /// One-row reader whose `register_splits` increments a counter and then blocks on
    /// `gate`, letting a test hold split computation open.
    #[derive(Debug)]
    struct BlockingSplitsLayoutReader {
        name: Arc<str>,
        dtype: DType,
        row_count: u64,
        register_splits_calls: Arc<AtomicUsize>,
        gate: Arc<Mutex<()>>,
    }

    impl BlockingSplitsLayoutReader {
        fn new(gate: Arc<Mutex<()>>, register_splits_calls: Arc<AtomicUsize>) -> Self {
            Self {
                name: Arc::from("blocking-splits"),
                dtype: DType::Primitive(PType::I32, Nullability::NonNullable),
                row_count: 1,
                register_splits_calls,
                gate,
            }
        }
    }

    impl LayoutReader for BlockingSplitsLayoutReader {
        fn name(&self) -> &Arc<str> {
            &self.name
        }

        fn dtype(&self) -> &DType {
            &self.dtype
        }

        fn row_count(&self) -> u64 {
            self.row_count
        }

        fn register_splits(
            &self,
            _field_mask: &[FieldMask],
            split_range: &SplitRange,
            splits: &mut RowSplits,
        ) -> VortexResult<()> {
            // Count the call before blocking, so a started-but-blocked prepare is observable.
            self.register_splits_calls.fetch_add(1, Ordering::Relaxed);
            let _guard = self.gate.lock();
            splits.push(split_range.root_row_range().end);
            Ok(())
        }

        fn pruning_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            _mask: Mask,
        ) -> VortexResult<MaskFuture> {
            unimplemented!("not needed for this test");
        }

        fn filter_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            _mask: MaskFuture,
        ) -> VortexResult<MaskFuture> {
            unimplemented!("not needed for this test");
        }

        fn projection_evaluation(
            &self,
            _row_range: &Range<u64>,
            _expr: &Expression,
            _mask: MaskFuture,
        ) -> VortexResult<ArrayFuture> {
            Ok(Box::pin(async move {
                unreachable!("scan should not be polled in this test")
            }))
        }

        fn as_any(&self) -> &dyn std::any::Any {
            self
        }
    }

    /// The first poll must hand preparation off and return `Poll::Pending` rather than
    /// computing splits inline on the polling thread.
    #[test]
    fn into_stream_first_poll_does_not_block() {
        // Hold the gate so any split computation would block inside `register_splits`.
        let gate = Arc::new(Mutex::new(()));
        let guard = gate.lock();

        let calls = Arc::new(AtomicUsize::new(0));
        let reader = Arc::new(BlockingSplitsLayoutReader::new(
            Arc::clone(&gate),
            Arc::clone(&calls),
        ));

        let runtime = SingleThreadRuntime::default();
        let session = session_with_handle(runtime.handle());

        let mut stream = ScanBuilder::new(session, reader).into_stream().unwrap();

        // Poll on a separate thread so a blocking first poll shows up as a timeout below
        // instead of hanging the test.
        let (send, recv) = mpsc::channel::<bool>();
        let join = std::thread::spawn(move || {
            let waker = noop_waker_ref();
            let mut cx = Context::from_waker(waker);
            let poll = Pin::new(&mut stream).poll_next(&mut cx);
            let _ = send.send(matches!(poll, Poll::Pending));
        });

        let polled_pending = recv.recv_timeout(Duration::from_secs(1)).ok();

        // Always release the gate and join the thread so failures don't hang the test process.
        // NOTE(review): `drop(join.join())` discards a panic from the polling thread. In that
        // case `polled_pending` is `None`, and the failure is reported as "blocked" below.
        drop(guard);
        drop(join.join());

        let polled_pending = polled_pending.expect("poll_next blocked; expected quick return");
        assert!(
            polled_pending,
            "expected Poll::Pending while prepare is blocked"
        );
        // Zero calls means `register_splits` never started. Presumably the blocking task is
        // not driven here because the single-thread runtime is never blocked on — confirm.
        assert_eq!(calls.load(Ordering::Relaxed), 0);

        drop(runtime);
    }

    /// Restricting the scan to rows `1..3` yields exactly the values of rows 1 and 2.
    #[test]
    fn into_stream_with_row_range() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let calls = Arc::new(AtomicUsize::new(0));
        let reader = Arc::new(SplittingLayoutReader::new(Arc::clone(&calls)));

        let runtime = SingleThreadRuntime::default();
        let session = session_with_handle(runtime.handle());

        let stream = ScanBuilder::new(session, reader)
            .with_row_range(1..3)
            .into_stream()?;
        let mut iter = runtime.block_on_stream(stream);

        let mut values = Vec::new();
        for chunk in &mut iter {
            let prim = chunk?.execute::<PrimitiveArray>(&mut ctx)?;
            values.extend(prim.into_buffer::<i32>().iter().copied());
        }

        assert_eq!(calls.load(Ordering::Relaxed), 1);
        assert_eq!(values.as_ref(), [1, 2]);

        Ok(())
    }
}