Skip to main content

datafusion_datasource/file_stream/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! A generic stream over file format readers that can be used by
//! any file format that reads its files from start to end.
20//!
21//! Note: Most traits here need to be marked `Sync + Send` to be
22//! compliant with the `SendableRecordBatchStream` trait.
23
24mod builder;
25mod metrics;
26mod scan_state;
27pub(crate) mod work_source;
28
29use std::pin::Pin;
30use std::sync::Arc;
31use std::task::{Context, Poll};
32
33use crate::PartitionedFile;
34use crate::file_scan_config::FileScanConfig;
35use arrow::datatypes::SchemaRef;
36use datafusion_common::Result;
37use datafusion_execution::RecordBatchStream;
38use datafusion_physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
39
40use arrow::record_batch::RecordBatch;
41
42use futures::Stream;
43use futures::future::BoxFuture;
44use futures::stream::BoxStream;
45
46use self::scan_state::{ScanAndReturn, ScanState};
47
48pub use builder::FileStreamBuilder;
49pub use metrics::{FileStreamMetrics, StartableTime};
50
/// A stream that iterates record batch by record batch, file over file.
///
/// Polling is driven by the internal `FileStreamState`; the result of every
/// poll is additionally passed through `baseline_metrics`.
pub struct FileStream {
    /// The stream schema (file schema including partition columns and after
    /// projection). This is the schema reported by `RecordBatchStream::schema`.
    projected_schema: SchemaRef,
    /// The stream state
    state: FileStreamState,
    /// runtime baseline metrics; each poll result is recorded here
    baseline_metrics: BaselineMetrics,
}
61
62impl FileStream {
63    /// Create a new `FileStream` using the give `FileOpener` to scan underlying files
64    #[deprecated(since = "54.0.0", note = "Use FileStreamBuilder instead")]
65    pub fn new(
66        config: &FileScanConfig,
67        partition: usize,
68        file_opener: Arc<dyn FileOpener>,
69        metrics: &ExecutionPlanMetricsSet,
70    ) -> Result<Self> {
71        FileStreamBuilder::new(config)
72            .with_partition(partition)
73            .with_file_opener(file_opener)
74            .with_metrics(metrics)
75            .build()
76    }
77
78    /// Specify the behavior when an error occurs opening or scanning a file
79    ///
80    /// If `OnError::Skip` the stream will skip files which encounter an error and continue
81    /// If `OnError:Fail` (default) the stream will fail and stop processing when an error occurs
82    pub fn with_on_error(mut self, on_error: OnError) -> Self {
83        match &mut self.state {
84            FileStreamState::Scan { scan_state } => scan_state.set_on_error(on_error),
85            FileStreamState::Error | FileStreamState::Done => {
86                // no effect as there are no more files to process
87            }
88        };
89        self
90    }
91
92    fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<RecordBatch>>> {
93        loop {
94            match &mut self.state {
95                FileStreamState::Scan { scan_state: queue } => {
96                    let action = queue.poll_scan(cx);
97                    match action {
98                        ScanAndReturn::Continue => continue,
99                        ScanAndReturn::Done(result) => {
100                            self.state = FileStreamState::Done;
101                            return Poll::Ready(result);
102                        }
103                        ScanAndReturn::Error(err) => {
104                            self.state = FileStreamState::Error;
105                            return Poll::Ready(Some(Err(err)));
106                        }
107                        ScanAndReturn::Return(result) => return result,
108                    }
109                }
110                FileStreamState::Error | FileStreamState::Done => {
111                    return Poll::Ready(None);
112                }
113            }
114        }
115    }
116}
117
118impl Stream for FileStream {
119    type Item = Result<RecordBatch>;
120
121    fn poll_next(
122        mut self: Pin<&mut Self>,
123        cx: &mut Context<'_>,
124    ) -> Poll<Option<Self::Item>> {
125        let result = self.poll_inner(cx);
126        self.baseline_metrics.record_poll(result)
127    }
128}
129
impl RecordBatchStream for FileStream {
    /// Returns the projected schema of this stream (a new `Arc` handle to the
    /// same schema, not a deep copy).
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.projected_schema)
    }
}
135
/// A fallible future that resolves to a stream of [`RecordBatch`]
///
/// Both the future and the stream it yields are boxed and `'static`. The
/// future itself may resolve to an error (the file could not be opened), and
/// each item of the resulting stream is a `Result` as well.
pub type FileOpenFuture =
    BoxFuture<'static, Result<BoxStream<'static, Result<RecordBatch>>>>;
139
/// Describes the behavior of the `FileStream` if file opening or scanning fails
///
/// The default is [`OnError::Fail`]. The type is a plain, field-less enum, so
/// it is cheap to copy, compare and print.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum OnError {
    /// Fail the entire stream and return the underlying error
    #[default]
    Fail,
    /// Continue scanning, ignoring the failed file
    Skip,
}
149
/// Generic API for opening a file using an [`ObjectStore`] and resolving to a
/// stream of [`RecordBatch`]
///
/// Implementations must be `Unpin + Send + Sync`.
///
/// [`ObjectStore`]: object_store::ObjectStore
pub trait FileOpener: Unpin + Send + Sync {
    /// Asynchronously open the specified file and return a stream
    /// of [`RecordBatch`]
    ///
    /// Three layers can fail independently: this call (the outer `Result`),
    /// the returned [`FileOpenFuture`], and each item of the stream that the
    /// future resolves to.
    fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture>;
}
159
/// Internal state of a `FileStream`.
///
/// `Scan` is the only active state; `Error` and `Done` are terminal, and
/// polling a stream in either of them yields `None`.
enum FileStreamState {
    /// Actively processing readers, ready morsels, and planner work.
    Scan {
        /// The ready queues and active reader for the current file.
        scan_state: Box<ScanState>,
    },
    /// Encountered an error
    Error,
    /// Finished scanning all requested data, possibly because a limit was reached
    Done,
}
171
172#[cfg(test)]
173mod tests {
174    use crate::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
175    use crate::morsel::mocks::{
176        IoFutureId, MockMorselizer, MockPlanBuilder, MockPlanner, MorselId,
177        PendingPlannerBuilder, PollsToResolve,
178    };
179    use crate::source::DataSource;
180    use crate::tests::make_partition;
181    use crate::{PartitionedFile, TableSchema};
182    use arrow::array::{AsArray, RecordBatch};
183    use arrow::datatypes::{DataType, Field, Int32Type, Schema};
184    use datafusion_common::DataFusionError;
185    use datafusion_common::error::Result;
186    use datafusion_execution::object_store::ObjectStoreUrl;
187    use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
188    use futures::{FutureExt as _, StreamExt as _};
189    use std::collections::{BTreeMap, VecDeque};
190    use std::sync::Arc;
191    use std::sync::atomic::{AtomicUsize, Ordering};
192
193    use crate::file_stream::{
194        FileOpenFuture, FileOpener, FileStream, FileStreamBuilder, OnError,
195        work_source::SharedWorkSource,
196    };
197    use crate::test_util::MockSource;
198
199    use datafusion_common::{assert_batches_eq, exec_err, internal_err};
200
    /// Test identifier for one `FileStream` partition.
    ///
    /// Thin `usize` newtype; the derives make it comparable, orderable and
    /// hashable. NOTE(review): its users are outside this part of the file.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
    struct PartitionId(usize);
204
    /// Test `FileOpener` which will simulate errors during file opening or scanning
    ///
    /// Files are identified by the order in which `open` is called (0-based),
    /// not by their path.
    #[derive(Default)]
    struct TestOpener {
        /// Index in stream of files which should throw an error while opening
        error_opening_idx: Vec<usize>,
        /// Index in stream of files which should throw an error while scanning
        error_scanning_idx: Vec<usize>,
        /// Number of `open` calls made so far; `open` uses the value before its
        /// own increment as the index of the file being opened
        current_idx: AtomicUsize,
        /// `RecordBatch` to return
        records: Vec<RecordBatch>,
    }
217
218    impl FileOpener for TestOpener {
219        fn open(&self, _partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
220            let idx = self.current_idx.fetch_add(1, Ordering::SeqCst);
221
222            if self.error_opening_idx.contains(&idx) {
223                Ok(futures::future::ready(internal_err!("error opening")).boxed())
224            } else if self.error_scanning_idx.contains(&idx) {
225                let error = futures::future::ready(exec_err!("error scanning"));
226                let stream = futures::stream::once(error).boxed();
227                Ok(futures::future::ready(Ok(stream)).boxed())
228            } else {
229                let iterator = self.records.clone().into_iter().map(Ok);
230                let stream = futures::stream::iter(iterator).boxed();
231                Ok(futures::future::ready(Ok(stream)).boxed())
232            }
233        }
234    }
235
    /// Builder-style fixture: describes a `FileStream` over mock files and, via
    /// `result`, builds the stream and collects everything it produces.
    #[derive(Default)]
    struct FileStreamTest {
        /// Number of files in the stream
        num_files: usize,
        /// Global limit of records emitted by the stream
        limit: Option<usize>,
        /// Error-handling behavior of the stream
        on_error: OnError,
        /// Mock `FileOpener`
        opener: TestOpener,
    }
247
248    impl FileStreamTest {
249        pub fn new() -> Self {
250            Self::default()
251        }
252
253        /// Specify the number of files in the stream
254        pub fn with_num_files(mut self, num_files: usize) -> Self {
255            self.num_files = num_files;
256            self
257        }
258
259        /// Specify the limit
260        pub fn with_limit(mut self, limit: Option<usize>) -> Self {
261            self.limit = limit;
262            self
263        }
264
265        /// Specify the index of files in the stream which should
266        /// throw an error when opening
267        pub fn with_open_errors(mut self, idx: Vec<usize>) -> Self {
268            self.opener.error_opening_idx = idx;
269            self
270        }
271
272        /// Specify the index of files in the stream which should
273        /// throw an error when scanning
274        pub fn with_scan_errors(mut self, idx: Vec<usize>) -> Self {
275            self.opener.error_scanning_idx = idx;
276            self
277        }
278
279        /// Specify the behavior of the stream when an error occurs
280        pub fn with_on_error(mut self, on_error: OnError) -> Self {
281            self.on_error = on_error;
282            self
283        }
284
285        /// Specify the record batches that should be returned from each
286        /// file that is successfully scanned
287        pub fn with_records(mut self, records: Vec<RecordBatch>) -> Self {
288            self.opener.records = records;
289            self
290        }
291
292        /// Collect the results of the `FileStream`
293        pub async fn result(self) -> Result<Vec<RecordBatch>> {
294            let file_schema = self
295                .opener
296                .records
297                .first()
298                .map(|batch| batch.schema())
299                .unwrap_or_else(|| Arc::new(Schema::empty()));
300
301            // let ctx = SessionContext::new();
302            let mock_files: Vec<(String, u64)> = (0..self.num_files)
303                .map(|idx| (format!("mock_file{idx}"), 10_u64))
304                .collect();
305
306            // let mock_files_ref: Vec<(&str, u64)> = mock_files
307            //     .iter()
308            //     .map(|(name, size)| (name.as_str(), *size))
309            //     .collect();
310
311            let file_group = mock_files
312                .into_iter()
313                .map(|(name, size)| PartitionedFile::new(name, size))
314                .collect();
315
316            let on_error = self.on_error;
317
318            let table_schema = TableSchema::new(file_schema, vec![]);
319            let config = FileScanConfigBuilder::new(
320                ObjectStoreUrl::parse("test:///").unwrap(),
321                Arc::new(MockSource::new(table_schema)),
322            )
323            .with_file_group(file_group)
324            .with_limit(self.limit)
325            .build();
326            let metrics_set = ExecutionPlanMetricsSet::new();
327            let file_stream = FileStreamBuilder::new(&config)
328                .with_partition(0)
329                .with_file_opener(Arc::new(self.opener))
330                .with_metrics(&metrics_set)
331                .with_on_error(on_error)
332                .build()?;
333
334            file_stream
335                .collect::<Vec<_>>()
336                .await
337                .into_iter()
338                .collect::<Result<Vec<_>>>()
339        }
340    }
341
342    /// helper that creates a stream of 2 files with the same pair of batches in each ([0,1,2] and [0,1])
343    async fn create_and_collect(limit: Option<usize>) -> Vec<RecordBatch> {
344        FileStreamTest::new()
345            .with_records(vec![make_partition(3), make_partition(2)])
346            .with_num_files(2)
347            .with_limit(limit)
348            .result()
349            .await
350            .expect("error executing stream")
351    }
352
353    /// Create the smallest valid file scan config for builder validation tests.
354    fn builder_test_config() -> FileScanConfig {
355        let table_schema = TableSchema::new(Arc::new(Schema::empty()), vec![]);
356        FileScanConfigBuilder::new(
357            ObjectStoreUrl::parse("test:///").unwrap(),
358            Arc::new(MockSource::new(table_schema)),
359        )
360        .with_file(PartitionedFile::new("mock_file", 10))
361        .build()
362    }
363
364    /// Convenience helper to keep builder error assertions focused on the
365    /// specific missing or invalid input under test.
366    fn builder_error(builder: FileStreamBuilder<'_>) -> String {
367        builder.build().err().unwrap().to_string()
368    }
369
370    #[tokio::test]
371    async fn on_error_opening() -> Result<()> {
372        let batches = FileStreamTest::new()
373            .with_records(vec![make_partition(3), make_partition(2)])
374            .with_num_files(2)
375            .with_on_error(OnError::Skip)
376            .with_open_errors(vec![0])
377            .result()
378            .await?;
379
380        #[rustfmt::skip]
381        assert_batches_eq!(&[
382            "+---+",
383            "| i |",
384            "+---+",
385            "| 0 |",
386            "| 1 |",
387            "| 2 |",
388            "| 0 |",
389            "| 1 |",
390            "+---+",
391        ], &batches);
392
393        let batches = FileStreamTest::new()
394            .with_records(vec![make_partition(3), make_partition(2)])
395            .with_num_files(2)
396            .with_on_error(OnError::Skip)
397            .with_open_errors(vec![1])
398            .result()
399            .await?;
400
401        #[rustfmt::skip]
402        assert_batches_eq!(&[
403            "+---+",
404            "| i |",
405            "+---+",
406            "| 0 |",
407            "| 1 |",
408            "| 2 |",
409            "| 0 |",
410            "| 1 |",
411            "+---+",
412        ], &batches);
413
414        let batches = FileStreamTest::new()
415            .with_records(vec![make_partition(3), make_partition(2)])
416            .with_num_files(2)
417            .with_on_error(OnError::Skip)
418            .with_open_errors(vec![0, 1])
419            .result()
420            .await?;
421
422        #[rustfmt::skip]
423        assert_batches_eq!(&[
424            "++",
425            "++",
426        ], &batches);
427
428        Ok(())
429    }
430
431    #[tokio::test]
432    async fn on_error_scanning_fail() -> Result<()> {
433        let result = FileStreamTest::new()
434            .with_records(vec![make_partition(3), make_partition(2)])
435            .with_num_files(2)
436            .with_on_error(OnError::Fail)
437            .with_scan_errors(vec![1])
438            .result()
439            .await;
440
441        assert!(result.is_err());
442
443        Ok(())
444    }
445
446    #[tokio::test]
447    async fn on_error_opening_fail() -> Result<()> {
448        let result = FileStreamTest::new()
449            .with_records(vec![make_partition(3), make_partition(2)])
450            .with_num_files(2)
451            .with_on_error(OnError::Fail)
452            .with_open_errors(vec![1])
453            .result()
454            .await;
455
456        assert!(result.is_err());
457
458        Ok(())
459    }
460
461    #[tokio::test]
462    async fn on_error_scanning() -> Result<()> {
463        let batches = FileStreamTest::new()
464            .with_records(vec![make_partition(3), make_partition(2)])
465            .with_num_files(2)
466            .with_on_error(OnError::Skip)
467            .with_scan_errors(vec![0])
468            .result()
469            .await?;
470
471        #[rustfmt::skip]
472        assert_batches_eq!(&[
473            "+---+",
474            "| i |",
475            "+---+",
476            "| 0 |",
477            "| 1 |",
478            "| 2 |",
479            "| 0 |",
480            "| 1 |",
481            "+---+",
482        ], &batches);
483
484        let batches = FileStreamTest::new()
485            .with_records(vec![make_partition(3), make_partition(2)])
486            .with_num_files(2)
487            .with_on_error(OnError::Skip)
488            .with_scan_errors(vec![1])
489            .result()
490            .await?;
491
492        #[rustfmt::skip]
493        assert_batches_eq!(&[
494            "+---+",
495            "| i |",
496            "+---+",
497            "| 0 |",
498            "| 1 |",
499            "| 2 |",
500            "| 0 |",
501            "| 1 |",
502            "+---+",
503        ], &batches);
504
505        let batches = FileStreamTest::new()
506            .with_records(vec![make_partition(3), make_partition(2)])
507            .with_num_files(2)
508            .with_on_error(OnError::Skip)
509            .with_scan_errors(vec![0, 1])
510            .result()
511            .await?;
512
513        #[rustfmt::skip]
514        assert_batches_eq!(&[
515            "++",
516            "++",
517        ], &batches);
518
519        Ok(())
520    }
521
522    #[tokio::test]
523    async fn on_error_mixed() -> Result<()> {
524        let batches = FileStreamTest::new()
525            .with_records(vec![make_partition(3), make_partition(2)])
526            .with_num_files(3)
527            .with_on_error(OnError::Skip)
528            .with_open_errors(vec![1])
529            .with_scan_errors(vec![0])
530            .result()
531            .await?;
532
533        #[rustfmt::skip]
534        assert_batches_eq!(&[
535            "+---+",
536            "| i |",
537            "+---+",
538            "| 0 |",
539            "| 1 |",
540            "| 2 |",
541            "| 0 |",
542            "| 1 |",
543            "+---+",
544        ], &batches);
545
546        let batches = FileStreamTest::new()
547            .with_records(vec![make_partition(3), make_partition(2)])
548            .with_num_files(3)
549            .with_on_error(OnError::Skip)
550            .with_open_errors(vec![0])
551            .with_scan_errors(vec![1])
552            .result()
553            .await?;
554
555        #[rustfmt::skip]
556        assert_batches_eq!(&[
557            "+---+",
558            "| i |",
559            "+---+",
560            "| 0 |",
561            "| 1 |",
562            "| 2 |",
563            "| 0 |",
564            "| 1 |",
565            "+---+",
566        ], &batches);
567
568        let batches = FileStreamTest::new()
569            .with_records(vec![make_partition(3), make_partition(2)])
570            .with_num_files(3)
571            .with_on_error(OnError::Skip)
572            .with_open_errors(vec![2])
573            .with_scan_errors(vec![0, 1])
574            .result()
575            .await?;
576
577        #[rustfmt::skip]
578        assert_batches_eq!(&[
579            "++",
580            "++",
581        ], &batches);
582
583        let batches = FileStreamTest::new()
584            .with_records(vec![make_partition(3), make_partition(2)])
585            .with_num_files(3)
586            .with_on_error(OnError::Skip)
587            .with_open_errors(vec![0, 2])
588            .with_scan_errors(vec![1])
589            .result()
590            .await?;
591
592        #[rustfmt::skip]
593        assert_batches_eq!(&[
594            "++",
595            "++",
596        ], &batches);
597
598        Ok(())
599    }
600
601    #[tokio::test]
602    async fn without_limit() -> Result<()> {
603        let batches = create_and_collect(None).await;
604
605        #[rustfmt::skip]
606        assert_batches_eq!(&[
607            "+---+",
608            "| i |",
609            "+---+",
610            "| 0 |",
611            "| 1 |",
612            "| 2 |",
613            "| 0 |",
614            "| 1 |",
615            "| 0 |",
616            "| 1 |",
617            "| 2 |",
618            "| 0 |",
619            "| 1 |",
620            "+---+",
621        ], &batches);
622
623        Ok(())
624    }
625
626    #[tokio::test]
627    async fn with_limit_between_files() -> Result<()> {
628        let batches = create_and_collect(Some(5)).await;
629        #[rustfmt::skip]
630        assert_batches_eq!(&[
631            "+---+",
632            "| i |",
633            "+---+",
634            "| 0 |",
635            "| 1 |",
636            "| 2 |",
637            "| 0 |",
638            "| 1 |",
639            "+---+",
640        ], &batches);
641
642        Ok(())
643    }
644
645    #[tokio::test]
646    async fn with_limit_at_middle_of_batch() -> Result<()> {
647        let batches = create_and_collect(Some(6)).await;
648        #[rustfmt::skip]
649        assert_batches_eq!(&[
650            "+---+",
651            "| i |",
652            "+---+",
653            "| 0 |",
654            "| 1 |",
655            "| 2 |",
656            "| 0 |",
657            "| 1 |",
658            "| 0 |",
659            "+---+",
660        ], &batches);
661
662        Ok(())
663    }
664
665    #[test]
666    fn builder_requires_partition_file_opener_and_metrics() {
667        let config = builder_test_config();
668
669        let err = builder_error(FileStreamBuilder::new(&config));
670        assert!(err.contains("FileStreamBuilder missing required partition"));
671
672        let err = builder_error(FileStreamBuilder::new(&config).with_partition(0));
673        assert!(err.contains("FileStreamBuilder missing required morselizer"));
674
675        let err = builder_error(
676            FileStreamBuilder::new(&config)
677                .with_partition(0)
678                .with_file_opener(Arc::new(TestOpener::default())),
679        );
680        assert!(err.contains("FileStreamBuilder missing required metrics"));
681    }
682
683    #[test]
684    fn builder_errors_on_invalid_partition() {
685        let config = builder_test_config();
686        let metrics = ExecutionPlanMetricsSet::new();
687
688        let err = builder_error(
689            FileStreamBuilder::new(&config)
690                .with_partition(1)
691                .with_file_opener(Arc::new(TestOpener::default()))
692                .with_metrics(&metrics),
693        );
694        assert!(err.contains("FileStreamBuilder invalid partition index: 1"));
695    }
696
    /// Verifies the simplest morsel-driven flow: one planner produces one
    /// morsel immediately, and that morsel is then scanned to completion.
    #[tokio::test]
    async fn morsel_no_io() -> Result<()> {
        // One file whose planner script is a single plan carrying `MorselId(10)`
        // with batch 42, terminated by `return_none()`.
        let test = FileStreamMorselTest::new().with_file(
            MockPlanner::builder("file1.parquet")
                .add_plan(MockPlanBuilder::new().with_morsel(MorselId(10), 42))
                .return_none(),
        );

        // Inline snapshot: the output of the stream followed by the ordered
        // event log, as labeled by the two section headers.
        insta::assert_snapshot!(test.run().await.unwrap(), @r"
        ----- Output Stream -----
        Batch: 42
        Done
        ----- File Stream Events -----
        morselize_file: file1.parquet
        planner_created: file1.parquet
        planner_called: file1.parquet
        morsel_produced: file1.parquet, MorselId(10)
        morsel_stream_started: MorselId(10)
        morsel_stream_batch_produced: MorselId(10), BatchId(42)
        morsel_stream_finished: MorselId(10)
        ");

        Ok(())
    }
723
    /// Verifies that a planner can block on one I/O phase and then produce a
    /// morsel containing two batches.
    #[tokio::test]
    async fn morsel_single_io_two_batches() -> Result<()> {
        let test = FileStreamMorselTest::new().with_file(
            MockPlanner::builder("file1.parquet")
                // First plan: pending I/O. With `PollsToResolve(1)` the snapshot
                // below shows two polls of the future before it resolves.
                .add_plan(
                    PendingPlannerBuilder::new(IoFutureId(1))
                        .with_polls_to_resolve(PollsToResolve(1)),
                )
                // Second plan: one morsel that carries batches 42 and 43
                .add_plan(
                    MockPlanBuilder::new()
                        .with_morsel_batches(MorselId(10), vec![42, 43]),
                )
                .return_none(),
        );

        // The planner is called a second time only after the I/O has resolved
        insta::assert_snapshot!(test.run().await.unwrap(), @r"
        ----- Output Stream -----
        Batch: 42
        Batch: 43
        Done
        ----- File Stream Events -----
        morselize_file: file1.parquet
        planner_created: file1.parquet
        planner_called: file1.parquet
        io_future_created: file1.parquet, IoFutureId(1)
        io_future_polled: file1.parquet, IoFutureId(1)
        io_future_polled: file1.parquet, IoFutureId(1)
        io_future_resolved: file1.parquet, IoFutureId(1)
        planner_called: file1.parquet
        morsel_produced: file1.parquet, MorselId(10)
        morsel_stream_started: MorselId(10)
        morsel_stream_batch_produced: MorselId(10), BatchId(42)
        morsel_stream_batch_produced: MorselId(10), BatchId(43)
        morsel_stream_finished: MorselId(10)
        ");

        Ok(())
    }
764
    /// Verifies that a planner can traverse two sequential I/O phases before
    /// producing one batch, similar to Parquet.
    #[tokio::test]
    async fn morsel_two_ios_one_batch() -> Result<()> {
        // Two pending I/O plans (ids 1 and 2) precede the plan that carries the
        // morsel. No `PollsToResolve` is given; per the snapshot each future
        // resolves on its first poll.
        let test = FileStreamMorselTest::new().with_file(
            MockPlanner::builder("file1.parquet")
                .add_plan(PendingPlannerBuilder::new(IoFutureId(1)))
                .add_plan(PendingPlannerBuilder::new(IoFutureId(2)))
                .add_plan(MockPlanBuilder::new().with_morsel(MorselId(10), 42))
                .return_none(),
        );

        // The second I/O is created only after the first one has resolved
        insta::assert_snapshot!(test.run().await.unwrap(), @r"
        ----- Output Stream -----
        Batch: 42
        Done
        ----- File Stream Events -----
        morselize_file: file1.parquet
        planner_created: file1.parquet
        planner_called: file1.parquet
        io_future_created: file1.parquet, IoFutureId(1)
        io_future_polled: file1.parquet, IoFutureId(1)
        io_future_resolved: file1.parquet, IoFutureId(1)
        planner_called: file1.parquet
        io_future_created: file1.parquet, IoFutureId(2)
        io_future_polled: file1.parquet, IoFutureId(2)
        io_future_resolved: file1.parquet, IoFutureId(2)
        planner_called: file1.parquet
        morsel_produced: file1.parquet, MorselId(10)
        morsel_stream_started: MorselId(10)
        morsel_stream_batch_produced: MorselId(10), BatchId(42)
        morsel_stream_finished: MorselId(10)
        ");

        Ok(())
    }
801
    /// Verifies that a planner I/O future can fail and terminate the stream.
    #[tokio::test]
    async fn morsel_io_error() -> Result<()> {
        // The only plan is a pending I/O that is configured to fail with the
        // given message; no morsel is ever scripted.
        let test = FileStreamMorselTest::new().with_file(
            MockPlanner::builder("file1.parquet").add_plan(
                PendingPlannerBuilder::new(IoFutureId(1))
                    .with_error("io failed while opening file"),
            ),
        );

        // The error appears in the output, followed directly by `Done`; the
        // event log ends with the failed I/O.
        insta::assert_snapshot!(test.run().await.unwrap(), @r"
        ----- Output Stream -----
        Error: io failed while opening file
        Done
        ----- File Stream Events -----
        morselize_file: file1.parquet
        planner_created: file1.parquet
        planner_called: file1.parquet
        io_future_created: file1.parquet, IoFutureId(1)
        io_future_polled: file1.parquet, IoFutureId(1)
        io_future_errored: file1.parquet, IoFutureId(1), io failed while opening file
        ");

        Ok(())
    }
827
    /// Verifies that pending planner I/O does not block draining the current
    /// morsel stream.
    #[tokio::test]
    async fn morsel_pending_planner_does_not_block_active_reader() -> Result<()> {
        let test = FileStreamMorselTest::new().with_file(
            MockPlanner::builder("file1.parquet")
                // A single plan that yields a morsel (batches 41 and 42) and, at
                // the same time, a pending planner I/O. With `PollsToResolve(3)`
                // the snapshot shows four polls of the future before it resolves.
                .add_plan(
                    MockPlanBuilder::new()
                        .with_morsel_batches(MorselId(10), vec![41, 42])
                        .with_pending_planner(IoFutureId(1), PollsToResolve(3), Ok(())),
                )
                .add_plan(MockPlanBuilder::new().with_morsel(MorselId(11), 43))
                .return_none(),
        );

        // The key events are:
        // 1. the first `planner_called` produces `MorselId(10)` and creates `IoFutureId(1)`
        // 2. `MorselId(10)` continues yielding both batches while that I/O is pending
        // 3. after the I/O resolves, planning resumes and yields `MorselId(11)`
        insta::assert_snapshot!(test.run().await.unwrap(), @r"
        ----- Output Stream -----
        Batch: 41
        Batch: 42
        Batch: 43
        Done
        ----- File Stream Events -----
        morselize_file: file1.parquet
        planner_created: file1.parquet
        planner_called: file1.parquet
        morsel_produced: file1.parquet, MorselId(10)
        io_future_created: file1.parquet, IoFutureId(1)
        io_future_polled: file1.parquet, IoFutureId(1)
        morsel_stream_started: MorselId(10)
        io_future_polled: file1.parquet, IoFutureId(1)
        morsel_stream_batch_produced: MorselId(10), BatchId(41)
        io_future_polled: file1.parquet, IoFutureId(1)
        morsel_stream_batch_produced: MorselId(10), BatchId(42)
        io_future_polled: file1.parquet, IoFutureId(1)
        io_future_resolved: file1.parquet, IoFutureId(1)
        morsel_stream_finished: MorselId(10)
        planner_called: file1.parquet
        morsel_produced: file1.parquet, MorselId(11)
        morsel_stream_started: MorselId(11)
        morsel_stream_batch_produced: MorselId(11), BatchId(43)
        morsel_stream_finished: MorselId(11)
        ");

        Ok(())
    }
877
    /// Verifies that one `plan()` call can return a ready child planner, which
    /// is then called to produce the morsel.
    #[tokio::test]
    async fn morsel_ready_child_planner() -> Result<()> {
        // The child planner is the one that actually carries the morsel
        let child_planner = MockPlanner::builder("child planner")
            .add_plan(MockPlanBuilder::new().with_morsel(MorselId(10), 42))
            .return_none();

        // The planner of the file only hands out the (ready) child planner
        let test = FileStreamMorselTest::new().with_file(
            MockPlanner::builder("file1.parquet")
                .add_plan(MockPlanBuilder::new().with_ready_planner(child_planner))
                .return_none(),
        );

        // `morsel_produced` is attributed to "child planner", not to the file
        insta::assert_snapshot!(test.run().await.unwrap(), @r"
        ----- Output Stream -----
        Batch: 42
        Done
        ----- File Stream Events -----
        morselize_file: file1.parquet
        planner_created: file1.parquet
        planner_called: file1.parquet
        planner_created: child planner
        planner_called: child planner
        morsel_produced: child planner, MorselId(10)
        morsel_stream_started: MorselId(10)
        morsel_stream_batch_produced: MorselId(10), BatchId(42)
        morsel_stream_finished: MorselId(10)
        ");

        Ok(())
    }
910
911    /// Verifies that planning can fail after a successful I/O phase.
912    #[tokio::test]
913    async fn morsel_plan_error_after_io() -> Result<()> {
914        let test = FileStreamMorselTest::new().with_file(
915            MockPlanner::builder("file1.parquet")
916                .add_plan(PendingPlannerBuilder::new(IoFutureId(1)))
917                .return_error("planner failed after io"),
918        );
919
920        insta::assert_snapshot!(test.run().await.unwrap(), @r"
921        ----- Output Stream -----
922        Error: planner failed after io
923        Done
924        ----- File Stream Events -----
925        morselize_file: file1.parquet
926        planner_created: file1.parquet
927        planner_called: file1.parquet
928        io_future_created: file1.parquet, IoFutureId(1)
929        io_future_polled: file1.parquet, IoFutureId(1)
930        io_future_resolved: file1.parquet, IoFutureId(1)
931        planner_called: file1.parquet
932        ");
933
934        Ok(())
935    }
936
937    /// Verifies that `FileStream` scans multiple files in order.
938    #[tokio::test]
939    async fn morsel_multiple_files() -> Result<()> {
940        let test = FileStreamMorselTest::new()
941            .with_file(
942                MockPlanner::builder("file1.parquet")
943                    .add_plan(MockPlanBuilder::new().with_morsel(MorselId(10), 41))
944                    .return_none(),
945            )
946            .with_file(
947                MockPlanner::builder("file2.parquet")
948                    .add_plan(MockPlanBuilder::new().with_morsel(MorselId(11), 42))
949                    .return_none(),
950            );
951
952        insta::assert_snapshot!(test.run().await.unwrap(), @r"
953        ----- Output Stream -----
954        Batch: 41
955        Batch: 42
956        Done
957        ----- File Stream Events -----
958        morselize_file: file1.parquet
959        planner_created: file1.parquet
960        planner_called: file1.parquet
961        morsel_produced: file1.parquet, MorselId(10)
962        morsel_stream_started: MorselId(10)
963        morsel_stream_batch_produced: MorselId(10), BatchId(41)
964        morsel_stream_finished: MorselId(10)
965        morselize_file: file2.parquet
966        planner_created: file2.parquet
967        planner_called: file2.parquet
968        morsel_produced: file2.parquet, MorselId(11)
969        morsel_stream_started: MorselId(11)
970        morsel_stream_batch_produced: MorselId(11), BatchId(42)
971        morsel_stream_finished: MorselId(11)
972        ");
973
974        Ok(())
975    }
976
977    /// Verifies that a global limit can stop the stream before a second file is opened.
978    #[tokio::test]
979    async fn morsel_limit_prevents_second_file() -> Result<()> {
980        let test = FileStreamMorselTest::new()
981            .with_file(
982                MockPlanner::builder("file1.parquet")
983                    .add_plan(
984                        MockPlanBuilder::new()
985                            .with_morsel_batches(MorselId(10), vec![41, 42]),
986                    )
987                    .return_none(),
988            )
989            .with_file(
990                MockPlanner::builder("file2.parquet")
991                    .add_plan(MockPlanBuilder::new().with_morsel(MorselId(11), 43))
992                    .return_none(),
993            )
994            .with_limit(1);
995
996        // Note the snapshot should not ever see planner id2
997        insta::assert_snapshot!(test.run().await.unwrap(), @r"
998        ----- Output Stream -----
999        Batch: 41
1000        Done
1001        ----- File Stream Events -----
1002        morselize_file: file1.parquet
1003        planner_created: file1.parquet
1004        planner_called: file1.parquet
1005        morsel_produced: file1.parquet, MorselId(10)
1006        morsel_stream_started: MorselId(10)
1007        morsel_stream_batch_produced: MorselId(10), BatchId(41)
1008        ");
1009
1010        Ok(())
1011    }
1012
1013    /// Return a morsel test with two partitions:
1014    /// Partition 0: file1, file2, file3
1015    /// Partition 1: file4
1016    ///
1017    /// Partition 1 has only 1 file but it polled first 4 times
1018    fn two_partition_morsel_test() -> FileStreamMorselTest {
1019        FileStreamMorselTest::new()
1020            // Partition 0 has three files
1021            .with_file_in_partition(
1022                PartitionId(0),
1023                MockPlanner::builder("file1.parquet")
1024                    .add_plan(MockPlanBuilder::new().with_morsel(MorselId(10), 101))
1025                    .return_none(),
1026            )
1027            .with_file_in_partition(
1028                PartitionId(0),
1029                MockPlanner::builder("file2.parquet")
1030                    .add_plan(MockPlanBuilder::new().with_morsel(MorselId(11), 102))
1031                    .return_none(),
1032            )
1033            .with_file_in_partition(
1034                PartitionId(0),
1035                MockPlanner::builder("file3.parquet")
1036                    .add_plan(MockPlanBuilder::new().with_morsel(MorselId(12), 103))
1037                    .return_none(),
1038            )
1039            // Partition 1 has only one file, but is polled first
1040            .with_file_in_partition(
1041                PartitionId(1),
1042                MockPlanner::builder("file4.parquet")
1043                    .add_plan(MockPlanBuilder::new().with_morsel(MorselId(13), 201))
1044                    .return_none(),
1045            )
1046            .with_reads(vec![
1047                PartitionId(1),
1048                PartitionId(1),
1049                PartitionId(1),
1050                PartitionId(1),
1051                PartitionId(1),
1052            ])
1053    }
1054
1055    /// Verifies that an idle sibling stream can steal shared files from
1056    /// another stream once it exhausts its own local work.
1057    #[tokio::test]
1058    async fn morsel_shared_files_can_be_stolen() -> Result<()> {
1059        let test = two_partition_morsel_test().with_file_stream_events(false);
1060
1061        // Partition 0 starts with 3 files, but Partition 1 is polled first.
1062        // Since Partition 1 is polled first, it will run all the files even those
1063        // that were assigned to Partition 0.
1064        insta::assert_snapshot!(test.run().await.unwrap(), @r"
1065        ----- Partition 0 -----
1066        Done
1067        ----- Partition 1 -----
1068        Batch: 101
1069        Batch: 102
1070        Batch: 103
1071        Batch: 201
1072        Done
1073        ----- File Stream Events -----
1074        (omitted due to with_file_stream_events(false))
1075        ");
1076
1077        Ok(())
1078    }
1079
1080    /// Verifies that a stream that must preserve order keeps its files local
1081    /// and therefore cannot steal from a sibling shared queue.
1082    #[tokio::test]
1083    async fn morsel_preserve_order_keeps_files_local() -> Result<()> {
1084        // same fixture as `morsel_shared_files_can_be_stolen` but marked as
1085        // preserve-order
1086        let test = two_partition_morsel_test()
1087            .with_preserve_order(true)
1088            .with_file_stream_events(false);
1089
1090        // Even though that Partition 1 is polled first, it can not steal files
1091        // from partition 0. The three files originally assigned to Partition 0
1092        // must be evaluated by Partition 0.
1093        insta::assert_snapshot!(test.run().await.unwrap(), @r"
1094        ----- Partition 0 -----
1095        Batch: 101
1096        Batch: 102
1097        Batch: 103
1098        Done
1099        ----- Partition 1 -----
1100        Batch: 201
1101        Done
1102        ----- File Stream Events -----
1103        (omitted due to with_file_stream_events(false))
1104        ");
1105
1106        Ok(())
1107    }
1108
1109    /// Verifies that `partitioned_by_file_group` disables shared work stealing.
1110    #[tokio::test]
1111    async fn morsel_partitioned_by_file_group_keeps_files_local() -> Result<()> {
1112        // same fixture as `morsel_shared_files_can_be_stolen` but marked as
1113        // preserve-partitioned
1114        let test = two_partition_morsel_test()
1115            .with_partitioned_by_file_group(true)
1116            .with_file_stream_events(false);
1117
1118        insta::assert_snapshot!(test.run().await.unwrap(), @r"
1119        ----- Partition 0 -----
1120        Batch: 101
1121        Batch: 102
1122        Batch: 103
1123        Done
1124        ----- Partition 1 -----
1125        Batch: 201
1126        Done
1127        ----- File Stream Events -----
1128        (omitted due to with_file_stream_events(false))
1129        ");
1130
1131        Ok(())
1132    }
1133
1134    /// Verifies that an empty sibling can immediately steal shared files when
1135    /// it is polled before the stream that originally owned them.
1136    #[tokio::test]
1137    async fn morsel_empty_sibling_can_steal() -> Result<()> {
1138        let test = FileStreamMorselTest::new()
1139            .with_file_in_partition(
1140                PartitionId(0),
1141                MockPlanner::builder("file1.parquet")
1142                    .add_plan(MockPlanBuilder::new().with_morsel(MorselId(10), 101))
1143                    .return_none(),
1144            )
1145            .with_file_in_partition(
1146                PartitionId(0),
1147                MockPlanner::builder("file2.parquet")
1148                    .add_plan(MockPlanBuilder::new().with_morsel(MorselId(11), 102))
1149                    .return_none(),
1150            )
1151            // Poll the empty sibling first so it steals both files.
1152            .with_reads(vec![PartitionId(1), PartitionId(1), PartitionId(1)])
1153            .with_file_stream_events(false);
1154
1155        insta::assert_snapshot!(test.run().await.unwrap(), @r"
1156        ----- Partition 0 -----
1157        Done
1158        ----- Partition 1 -----
1159        Batch: 101
1160        Batch: 102
1161        Done
1162        ----- File Stream Events -----
1163        (omitted due to with_file_stream_events(false))
1164        ");
1165
1166        Ok(())
1167    }
1168
1169    /// Ensures that if a sibling is built and polled
1170    /// before another sibling has been built and contributed its files to the
1171    /// shared queue, the first sibling does not finish prematurely.
1172    #[tokio::test]
1173    async fn morsel_empty_sibling_can_finish_before_shared_work_exists() -> Result<()> {
1174        let test = FileStreamMorselTest::new()
1175            .with_file_in_partition(
1176                PartitionId(0),
1177                MockPlanner::builder("file1.parquet")
1178                    .add_plan(MockPlanBuilder::new().with_morsel(MorselId(10), 101))
1179                    .return_none(),
1180            )
1181            .with_file_in_partition(
1182                PartitionId(0),
1183                MockPlanner::builder("file2.parquet")
1184                    .add_plan(MockPlanBuilder::new().with_morsel(MorselId(11), 102))
1185                    .return_none(),
1186            )
1187            // Build streams lazily so partition 1 can poll the shared queue
1188            // before partition 0 has contributed its files. Once partition 0
1189            // is built, a later poll of partition 1 can still steal one of
1190            // them from the shared queue.
1191            .with_build_streams_on_first_read(true)
1192            .with_reads(vec![PartitionId(1), PartitionId(0), PartitionId(1)])
1193            .with_file_stream_events(false);
1194
1195        // Partition 1 polls too early once, then later steals one file after
1196        // partition 0 has populated the shared queue.
1197        insta::assert_snapshot!(test.run().await.unwrap(), @r"
1198        ----- Partition 0 -----
1199        Batch: 102
1200        Done
1201        ----- Partition 1 -----
1202        Batch: 101
1203        Done
1204        ----- File Stream Events -----
1205        (omitted due to with_file_stream_events(false))
1206        ");
1207
1208        Ok(())
1209    }
1210
1211    /// Verifies that a sibling hitting its limit does not count shared files
1212    /// left in the queue as already processed by that stream.
1213    #[tokio::test]
1214    async fn morsel_shared_limit_does_not_double_count_files_processed() -> Result<()> {
1215        let test = two_partition_morsel_test();
1216        let unlimited_config = test.test_config();
1217        let limited_config = test.clone().with_limit(1).test_config();
1218        let shared_work_source = limited_config
1219            .create_sibling_state()
1220            .and_then(|state| state.as_ref().downcast_ref::<SharedWorkSource>().cloned())
1221            .expect("shared work source");
1222        let limited_metrics = ExecutionPlanMetricsSet::new();
1223        let unlimited_metrics = ExecutionPlanMetricsSet::new();
1224
1225        let limited_stream = FileStreamBuilder::new(&limited_config)
1226            .with_partition(1)
1227            .with_shared_work_source(Some(shared_work_source.clone()))
1228            .with_morselizer(Box::new(test.morselizer.clone()))
1229            .with_metrics(&limited_metrics)
1230            .build()?;
1231
1232        let unlimited_stream = FileStreamBuilder::new(&unlimited_config)
1233            .with_partition(0)
1234            .with_shared_work_source(Some(shared_work_source))
1235            .with_morselizer(Box::new(test.morselizer))
1236            .with_metrics(&unlimited_metrics)
1237            .build()?;
1238
1239        let limited_output = drain_stream_output(limited_stream).await?;
1240        let unlimited_output = drain_stream_output(unlimited_stream).await?;
1241
1242        insta::assert_snapshot!(format!(
1243            "----- Limited Stream -----\n{limited_output}\n----- Unlimited Stream -----\n{unlimited_output}"
1244        ), @r"
1245        ----- Limited Stream -----
1246        Batch: 101
1247        ----- Unlimited Stream -----
1248        Batch: 102
1249        Batch: 103
1250        Batch: 201
1251        ");
1252
1253        assert_eq!(
1254            metric_count(&limited_metrics, "files_opened"),
1255            1,
1256            "the limited stream should only open the file that produced its output"
1257        );
1258        assert_eq!(
1259            metric_count(&limited_metrics, "files_processed"),
1260            1,
1261            "the limited stream should only mark its own file as processed"
1262        );
1263        assert_eq!(
1264            metric_count(&unlimited_metrics, "files_opened"),
1265            3,
1266            "the draining stream should open the remaining shared files"
1267        );
1268        assert_eq!(
1269            metric_count(&unlimited_metrics, "files_processed"),
1270            3,
1271            "the draining stream should process exactly the files it opened"
1272        );
1273
1274        Ok(())
1275    }
1276
1277    /// Verifies that one fast sibling can drain shared files that originated
1278    /// in more than one other partition.
1279    #[tokio::test]
1280    async fn morsel_one_sibling_can_drain_multiple_siblings() -> Result<()> {
1281        let test = FileStreamMorselTest::new()
1282            .with_file_in_partition(
1283                PartitionId(0),
1284                MockPlanner::builder("file1.parquet")
1285                    .add_plan(MockPlanBuilder::new().with_morsel(MorselId(10), 101))
1286                    .return_none(),
1287            )
1288            // Partition 1 has two files
1289            .with_file_in_partition(
1290                PartitionId(1),
1291                MockPlanner::builder("file2.parquet")
1292                    .add_plan(MockPlanBuilder::new().with_morsel(MorselId(11), 102))
1293                    .return_none(),
1294            )
1295            .with_file_in_partition(
1296                PartitionId(1),
1297                MockPlanner::builder("file3.parquet")
1298                    .add_plan(MockPlanBuilder::new().with_morsel(MorselId(12), 103))
1299                    .return_none(),
1300            )
1301            // Partition 2 starts empty but is polled first, so it should drain
1302            // the shared queue across both sibling partitions.
1303            .with_reads(vec![
1304                PartitionId(2),
1305                PartitionId(2),
1306                PartitionId(1),
1307                PartitionId(2),
1308            ])
1309            .with_file_stream_events(false);
1310
1311        insta::assert_snapshot!(test.run().await.unwrap(), @r"
1312        ----- Partition 0 -----
1313        Done
1314        ----- Partition 1 -----
1315        Batch: 103
1316        Done
1317        ----- Partition 2 -----
1318        Batch: 101
1319        Batch: 102
1320        Done
1321        ----- File Stream Events -----
1322        (omitted due to with_file_stream_events(false))
1323        ");
1324
1325        Ok(())
1326    }
1327
1328    /// Tests how one or more `FileStream`s consume morselized file work.
1329    #[derive(Clone)]
1330    struct FileStreamMorselTest {
1331        morselizer: MockMorselizer,
1332        partition_files: BTreeMap<PartitionId, Vec<String>>,
1333        preserve_order: bool,
1334        partitioned_by_file_group: bool,
1335        file_stream_events: bool,
1336        build_streams_on_first_read: bool,
1337        reads: Vec<PartitionId>,
1338        limit: Option<usize>,
1339    }
1340
1341    impl FileStreamMorselTest {
1342        /// Creates an empty test harness.
1343        fn new() -> Self {
1344            Self {
1345                morselizer: MockMorselizer::new(),
1346                partition_files: BTreeMap::new(),
1347                preserve_order: false,
1348                partitioned_by_file_group: false,
1349                file_stream_events: true,
1350                build_streams_on_first_read: false,
1351                reads: vec![],
1352                limit: None,
1353            }
1354        }
1355
1356        /// Adds one file and its root planner to partition 0.
1357        fn with_file(self, planner: impl Into<MockPlanner>) -> Self {
1358            self.with_file_in_partition(PartitionId(0), planner)
1359        }
1360
1361        /// Adds one file and its root planner to the specified input partition.
1362        fn with_file_in_partition(
1363            mut self,
1364            partition: PartitionId,
1365            planner: impl Into<MockPlanner>,
1366        ) -> Self {
1367            let planner = planner.into();
1368            let file_path = planner.file_path().to_string();
1369            self.morselizer = self.morselizer.with_planner(planner);
1370            self.partition_files
1371                .entry(partition)
1372                .or_default()
1373                .push(file_path);
1374            self
1375        }
1376
1377        /// Marks the stream (and all partitions) to preserve the specified file
1378        /// order.
1379        fn with_preserve_order(mut self, preserve_order: bool) -> Self {
1380            self.preserve_order = preserve_order;
1381            self
1382        }
1383
1384        /// Marks the test scan as pre-partitioned by file group, which should
1385        /// force each stream to keep its own files local.
1386        fn with_partitioned_by_file_group(
1387            mut self,
1388            partitioned_by_file_group: bool,
1389        ) -> Self {
1390            self.partitioned_by_file_group = partitioned_by_file_group;
1391            self
1392        }
1393
1394        /// Controls whether scheduler events are included in the snapshot.
1395        ///
1396        /// When disabled, `run()` still includes the event section header but
1397        /// replaces the trace with a fixed placeholder so tests can focus only
1398        /// on the output batches.
1399        fn with_file_stream_events(mut self, file_stream_events: bool) -> Self {
1400            self.file_stream_events = file_stream_events;
1401            self
1402        }
1403
1404        /// Controls whether streams are all built up front or lazily on their
1405        /// first read.
1406        ///
1407        /// The default builds all streams before polling begins, which matches
1408        /// normal execution. Tests may enable lazy creation to model races
1409        /// where one sibling polls before another has contributed its files to
1410        /// the shared queue.
1411        fn with_build_streams_on_first_read(
1412            mut self,
1413            build_streams_on_first_read: bool,
1414        ) -> Self {
1415            self.build_streams_on_first_read = build_streams_on_first_read;
1416            self
1417        }
1418
1419        /// Sets the partition polling order.
1420        ///
1421        /// `run()` polls these partitions in the listed order first. After
1422        /// those explicit reads are exhausted, it completes to round
1423        /// robin across all configured partitions, skipping any streams that
1424        /// have already finished.
1425        ///
1426        /// This allows testing early scheduling decisions explicit in a test
1427        /// while avoiding a fully scripted poll trace for the remainder.
1428        fn with_reads(mut self, reads: Vec<PartitionId>) -> Self {
1429            self.reads = reads;
1430            self
1431        }
1432
1433        /// Sets a global output limit for all streams created by this test.
1434        fn with_limit(mut self, limit: usize) -> Self {
1435            self.limit = Some(limit);
1436            self
1437        }
1438
1439        /// Runs the test and returns combined stream output and scheduler
1440        /// trace text.
1441        async fn run(self) -> Result<String> {
1442            let observer = self.morselizer.observer().clone();
1443            observer.clear();
1444
1445            let metrics_set = ExecutionPlanMetricsSet::new();
1446            let partition_count = self.num_partitions();
1447
1448            let mut partitions = (0..partition_count)
1449                .map(|_| PartitionState::new())
1450                .collect::<Vec<_>>();
1451
1452            let mut build_order = Vec::new();
1453            for partition in self.reads.iter().map(|partition| partition.0) {
1454                if !build_order.contains(&partition) {
1455                    build_order.push(partition);
1456                }
1457            }
1458            for partition in 0..partition_count {
1459                if !build_order.contains(&partition) {
1460                    build_order.push(partition);
1461                }
1462            }
1463
1464            let config = self.test_config();
1465            // `DataSourceExec::execute` creates one execution-local shared
1466            // state object via `create_sibling_state()` and then passes it
1467            // to `open_with_sibling_state(...)`. These tests build
1468            // `FileStream`s directly, bypassing `DataSourceExec`, so they must
1469            // perform the same setup explicitly when exercising sibling-stream
1470            // work stealing.
1471            let shared_work_source = config.create_sibling_state().and_then(|state| {
1472                state.as_ref().downcast_ref::<SharedWorkSource>().cloned()
1473            });
1474            if !self.build_streams_on_first_read {
1475                for partition in build_order {
1476                    let stream = FileStreamBuilder::new(&config)
1477                        .with_partition(partition)
1478                        .with_shared_work_source(shared_work_source.clone())
1479                        .with_morselizer(Box::new(self.morselizer.clone()))
1480                        .with_metrics(&metrics_set)
1481                        .build()?;
1482                    partitions[partition].set_stream(stream);
1483                }
1484            }
1485
1486            let mut initial_reads: VecDeque<_> = self.reads.into();
1487            let mut next_round_robin = 0;
1488
1489            while !initial_reads.is_empty()
1490                || partitions.iter().any(PartitionState::is_active)
1491            {
1492                let partition = if let Some(partition) = initial_reads.pop_front() {
1493                    partition.0
1494                } else {
1495                    let partition = next_round_robin;
1496                    next_round_robin = (next_round_robin + 1) % partition_count.max(1);
1497                    partition
1498                };
1499
1500                let partition_state = &mut partitions[partition];
1501
1502                if self.build_streams_on_first_read && !partition_state.built {
1503                    let stream = FileStreamBuilder::new(&config)
1504                        .with_partition(partition)
1505                        .with_shared_work_source(shared_work_source.clone())
1506                        .with_morselizer(Box::new(self.morselizer.clone()))
1507                        .with_metrics(&metrics_set)
1508                        .build()?;
1509                    partition_state.set_stream(stream);
1510                }
1511
1512                let Some(stream) = partition_state.stream.as_mut() else {
1513                    continue;
1514                };
1515
1516                match stream.next().await {
1517                    Some(result) => partition_state.push_output(format_result(result)),
1518                    None => partition_state.finish(),
1519                }
1520            }
1521
1522            let output_text = if partition_count == 1 {
1523                format!(
1524                    "----- Output Stream -----\n{}",
1525                    partitions[0].output.join("\n")
1526                )
1527            } else {
1528                partitions
1529                    .into_iter()
1530                    .enumerate()
1531                    .map(|(partition, state)| {
1532                        format!(
1533                            "----- Partition {} -----\n{}",
1534                            partition,
1535                            state.output.join("\n")
1536                        )
1537                    })
1538                    .collect::<Vec<_>>()
1539                    .join("\n")
1540            };
1541
1542            let file_stream_events = if self.file_stream_events {
1543                observer.format_events()
1544            } else {
1545                "(omitted due to with_file_stream_events(false))".to_string()
1546            };
1547
1548            Ok(format!(
1549                "{output_text}\n----- File Stream Events -----\n{file_stream_events}",
1550            ))
1551        }
1552
1553        /// Returns the number of configured partitions, including empty ones
1554        /// that appear only in the explicit read schedule.
1555        fn num_partitions(&self) -> usize {
1556            self.partition_files
1557                .keys()
1558                .map(|partition| partition.0 + 1)
1559                .chain(self.reads.iter().map(|partition| partition.0 + 1))
1560                .max()
1561                .unwrap_or(1)
1562        }
1563
1564        /// Builds a `FileScanConfig` covering every configured partition.
1565        fn test_config(&self) -> FileScanConfig {
1566            let file_groups = (0..self.num_partitions())
1567                .map(|partition| {
1568                    self.partition_files
1569                        .get(&PartitionId(partition))
1570                        .into_iter()
1571                        .flat_map(|files| files.iter())
1572                        .map(|name| PartitionedFile::new(name, 10))
1573                        .collect::<Vec<_>>()
1574                        .into()
1575                })
1576                .collect::<Vec<_>>();
1577
1578            let table_schema = TableSchema::new(
1579                Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)])),
1580                vec![],
1581            );
1582            FileScanConfigBuilder::new(
1583                ObjectStoreUrl::parse("test:///").unwrap(),
1584                Arc::new(MockSource::new(table_schema)),
1585            )
1586            .with_file_groups(file_groups)
1587            .with_limit(self.limit)
1588            .with_preserve_order(self.preserve_order)
1589            .with_partitioned_by_file_group(self.partitioned_by_file_group)
1590            .build()
1591        }
1592    }
1593
1594    /// Formats one stream poll result into a stable snapshot line.
1595    fn format_result(result: Result<RecordBatch>) -> String {
1596        match result {
1597            Ok(batch) => {
1598                let col = batch.column(0).as_primitive::<Int32Type>();
1599                let batch_id = col.value(0);
1600                format!("Batch: {batch_id}")
1601            }
1602            Err(e) => {
1603                // Pull the actual message for external errors rather than
1604                // relying on DataFusionError formatting, which changes if
1605                // backtraces are enabled, etc.
1606                let message = if let DataFusionError::External(generic) = e {
1607                    generic.to_string()
1608                } else {
1609                    e.to_string()
1610                };
1611                format!("Error: {message}")
1612            }
1613        }
1614    }
1615
1616    async fn drain_stream_output(stream: FileStream) -> Result<String> {
1617        let output = stream
1618            .collect::<Vec<_>>()
1619            .await
1620            .into_iter()
1621            .map(|result| result.map(|batch| format_result(Ok(batch))))
1622            .collect::<Result<Vec<_>>>()?;
1623        Ok(output.join("\n"))
1624    }
1625
1626    fn metric_count(metrics: &ExecutionPlanMetricsSet, name: &str) -> usize {
1627        metrics
1628            .clone_inner()
1629            .sum_by_name(name)
1630            .unwrap_or_else(|| panic!("missing metric: {name}"))
1631            .as_usize()
1632    }
1633
1634    /// Test-only state for one stream partition in [`FileStreamMorselTest`].
1635    struct PartitionState {
1636        /// Whether the `FileStream` for this partition has been built yet.
1637        built: bool,
1638        /// The live stream, if this partition has not finished yet.
1639        stream: Option<FileStream>,
1640        /// Snapshot lines produced by this partition.
1641        output: Vec<String>,
1642    }
1643
1644    impl PartitionState {
1645        /// Create an unbuilt partition with no output yet.
1646        fn new() -> Self {
1647            Self {
1648                built: false,
1649                stream: None,
1650                output: vec![],
1651            }
1652        }
1653
1654        /// Returns true if this partition might still produce output.
1655        fn is_active(&self) -> bool {
1656            !self.built || self.stream.is_some()
1657        }
1658
1659        /// Records that this partition's stream has been built.
1660        fn set_stream(&mut self, stream: FileStream) {
1661            self.stream = Some(stream);
1662            self.built = true;
1663        }
1664
1665        /// Records one formatted output line for this partition.
1666        fn push_output(&mut self, line: String) {
1667            self.output.push(line);
1668        }
1669
1670        /// Marks this partition as finished.
1671        fn finish(&mut self) {
1672            self.push_output("Done".to_string());
1673            self.stream = None;
1674        }
1675    }
1676}