1mod builder;
25mod metrics;
26mod scan_state;
27pub(crate) mod work_source;
28
29use std::pin::Pin;
30use std::sync::Arc;
31use std::task::{Context, Poll};
32
33use crate::PartitionedFile;
34use crate::file_scan_config::FileScanConfig;
35use arrow::datatypes::SchemaRef;
36use datafusion_common::Result;
37use datafusion_execution::RecordBatchStream;
38use datafusion_physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
39
40use arrow::record_batch::RecordBatch;
41
42use futures::Stream;
43use futures::future::BoxFuture;
44use futures::stream::BoxStream;
45
46use self::scan_state::{ScanAndReturn, ScanState};
47
48pub use builder::FileStreamBuilder;
49pub use metrics::{FileStreamMetrics, StartableTime};
50
/// A stream of [`RecordBatch`]es produced by scanning files.
///
/// Instances are created through [`FileStreamBuilder`].
pub struct FileStream {
    /// Schema reported by [`RecordBatchStream::schema`] for this stream.
    projected_schema: SchemaRef,
    /// Current state of the stream; it decides what the next poll does.
    state: FileStreamState,
    /// Baseline metrics; every poll result is recorded through them.
    baseline_metrics: BaselineMetrics,
}
61
62impl FileStream {
63 #[deprecated(since = "54.0.0", note = "Use FileStreamBuilder instead")]
65 pub fn new(
66 config: &FileScanConfig,
67 partition: usize,
68 file_opener: Arc<dyn FileOpener>,
69 metrics: &ExecutionPlanMetricsSet,
70 ) -> Result<Self> {
71 FileStreamBuilder::new(config)
72 .with_partition(partition)
73 .with_file_opener(file_opener)
74 .with_metrics(metrics)
75 .build()
76 }
77
78 pub fn with_on_error(mut self, on_error: OnError) -> Self {
83 match &mut self.state {
84 FileStreamState::Scan { scan_state } => scan_state.set_on_error(on_error),
85 FileStreamState::Error | FileStreamState::Done => {
86 }
88 };
89 self
90 }
91
92 fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<RecordBatch>>> {
93 loop {
94 match &mut self.state {
95 FileStreamState::Scan { scan_state: queue } => {
96 let action = queue.poll_scan(cx);
97 match action {
98 ScanAndReturn::Continue => continue,
99 ScanAndReturn::Done(result) => {
100 self.state = FileStreamState::Done;
101 return Poll::Ready(result);
102 }
103 ScanAndReturn::Error(err) => {
104 self.state = FileStreamState::Error;
105 return Poll::Ready(Some(Err(err)));
106 }
107 ScanAndReturn::Return(result) => return result,
108 }
109 }
110 FileStreamState::Error | FileStreamState::Done => {
111 return Poll::Ready(None);
112 }
113 }
114 }
115 }
116}
117
118impl Stream for FileStream {
119 type Item = Result<RecordBatch>;
120
121 fn poll_next(
122 mut self: Pin<&mut Self>,
123 cx: &mut Context<'_>,
124 ) -> Poll<Option<Self::Item>> {
125 let result = self.poll_inner(cx);
126 self.baseline_metrics.record_poll(result)
127 }
128}
129
130impl RecordBatchStream for FileStream {
131 fn schema(&self) -> SchemaRef {
132 Arc::clone(&self.projected_schema)
133 }
134}
135
/// A boxed future, as returned by [`FileOpener::open`], that resolves to a
/// boxed stream of [`RecordBatch`] results (or to an error).
pub type FileOpenFuture =
    BoxFuture<'static, Result<BoxStream<'static, Result<RecordBatch>>>>;
139
/// Policy applied by a `FileStream` when opening or scanning a file fails.
///
/// The enum is a plain, field-less value, so it derives the usual set of
/// value-type traits (`Debug`, `Clone`, `Copy`, `PartialEq`, `Eq`) in
/// addition to `Default`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum OnError {
    /// Surface the error to the consumer of the stream (the default).
    #[default]
    Fail,
    /// Skip the failing file and keep going with the remaining files.
    Skip,
}
149
/// Opens a [`PartitionedFile`] and produces a stream of [`RecordBatch`]es
/// from it.
pub trait FileOpener: Unpin + Send + Sync {
    /// Starts opening `partitioned_file`.
    ///
    /// On success returns a [`FileOpenFuture`] that resolves to the stream of
    /// record batch results for the file. A failure can be reported directly
    /// by this call, when the future resolves, or by items of the stream.
    fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture>;
}
159
/// State machine behind [`FileStream`].
enum FileStreamState {
    /// The stream is actively scanning; each poll is forwarded to the
    /// contained scan state.
    Scan {
        /// Scan state driving this stream (boxed).
        scan_state: Box<ScanState>,
    },
    /// The scan reported an error; further polls return `None`.
    Error,
    /// The scan completed; further polls return `None`.
    Done,
}
171
172#[cfg(test)]
173mod tests {
174 use crate::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
175 use crate::morsel::mocks::{
176 IoFutureId, MockMorselizer, MockPlanBuilder, MockPlanner, MorselId,
177 PendingPlannerBuilder, PollsToResolve,
178 };
179 use crate::source::DataSource;
180 use crate::tests::make_partition;
181 use crate::{PartitionedFile, TableSchema};
182 use arrow::array::{AsArray, RecordBatch};
183 use arrow::datatypes::{DataType, Field, Int32Type, Schema};
184 use datafusion_common::DataFusionError;
185 use datafusion_common::error::Result;
186 use datafusion_execution::object_store::ObjectStoreUrl;
187 use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
188 use futures::{FutureExt as _, StreamExt as _};
189 use std::collections::{BTreeMap, VecDeque};
190 use std::sync::Arc;
191 use std::sync::atomic::{AtomicUsize, Ordering};
192
193 use crate::file_stream::{
194 FileOpenFuture, FileOpener, FileStream, FileStreamBuilder, OnError,
195 work_source::SharedWorkSource,
196 };
197 use crate::test_util::MockSource;
198
199 use datafusion_common::{assert_batches_eq, exec_err, internal_err};
200
    /// Index of a partition in the morsel tests; used both to assign files to
    /// partitions and to script the order in which partitions are read.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
    struct PartitionId(usize);
204
    /// Test [`FileOpener`] that returns the configured record batches, or a
    /// simulated failure, depending on how many times `open` was called.
    #[derive(Default)]
    struct TestOpener {
        /// Indices of `open` calls whose returned future resolves to an error.
        error_opening_idx: Vec<usize>,
        /// Indices of `open` calls whose stream yields an error.
        error_scanning_idx: Vec<usize>,
        /// Number of `open` calls made so far (index of the next call).
        current_idx: AtomicUsize,
        /// Batches yielded by every successfully opened file.
        records: Vec<RecordBatch>,
    }
217
218 impl FileOpener for TestOpener {
219 fn open(&self, _partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
220 let idx = self.current_idx.fetch_add(1, Ordering::SeqCst);
221
222 if self.error_opening_idx.contains(&idx) {
223 Ok(futures::future::ready(internal_err!("error opening")).boxed())
224 } else if self.error_scanning_idx.contains(&idx) {
225 let error = futures::future::ready(exec_err!("error scanning"));
226 let stream = futures::stream::once(error).boxed();
227 Ok(futures::future::ready(Ok(stream)).boxed())
228 } else {
229 let iterator = self.records.clone().into_iter().map(Ok);
230 let stream = futures::stream::iter(iterator).boxed();
231 Ok(futures::future::ready(Ok(stream)).boxed())
232 }
233 }
234 }
235
    /// Builder-style harness that runs a [`FileStream`] over mock files using
    /// a [`TestOpener`].
    #[derive(Default)]
    struct FileStreamTest {
        /// Number of mock files placed in the single file group.
        num_files: usize,
        /// Optional limit passed to the scan config.
        limit: Option<usize>,
        /// Error policy passed to the stream builder.
        on_error: OnError,
        /// Opener that produces the mock data and simulated failures.
        opener: TestOpener,
    }
247
248 impl FileStreamTest {
249 pub fn new() -> Self {
250 Self::default()
251 }
252
253 pub fn with_num_files(mut self, num_files: usize) -> Self {
255 self.num_files = num_files;
256 self
257 }
258
259 pub fn with_limit(mut self, limit: Option<usize>) -> Self {
261 self.limit = limit;
262 self
263 }
264
265 pub fn with_open_errors(mut self, idx: Vec<usize>) -> Self {
268 self.opener.error_opening_idx = idx;
269 self
270 }
271
272 pub fn with_scan_errors(mut self, idx: Vec<usize>) -> Self {
275 self.opener.error_scanning_idx = idx;
276 self
277 }
278
279 pub fn with_on_error(mut self, on_error: OnError) -> Self {
281 self.on_error = on_error;
282 self
283 }
284
285 pub fn with_records(mut self, records: Vec<RecordBatch>) -> Self {
288 self.opener.records = records;
289 self
290 }
291
292 pub async fn result(self) -> Result<Vec<RecordBatch>> {
294 let file_schema = self
295 .opener
296 .records
297 .first()
298 .map(|batch| batch.schema())
299 .unwrap_or_else(|| Arc::new(Schema::empty()));
300
301 let mock_files: Vec<(String, u64)> = (0..self.num_files)
303 .map(|idx| (format!("mock_file{idx}"), 10_u64))
304 .collect();
305
306 let file_group = mock_files
312 .into_iter()
313 .map(|(name, size)| PartitionedFile::new(name, size))
314 .collect();
315
316 let on_error = self.on_error;
317
318 let table_schema = TableSchema::new(file_schema, vec![]);
319 let config = FileScanConfigBuilder::new(
320 ObjectStoreUrl::parse("test:///").unwrap(),
321 Arc::new(MockSource::new(table_schema)),
322 )
323 .with_file_group(file_group)
324 .with_limit(self.limit)
325 .build();
326 let metrics_set = ExecutionPlanMetricsSet::new();
327 let file_stream = FileStreamBuilder::new(&config)
328 .with_partition(0)
329 .with_file_opener(Arc::new(self.opener))
330 .with_metrics(&metrics_set)
331 .with_on_error(on_error)
332 .build()?;
333
334 file_stream
335 .collect::<Vec<_>>()
336 .await
337 .into_iter()
338 .collect::<Result<Vec<_>>>()
339 }
340 }
341
342 async fn create_and_collect(limit: Option<usize>) -> Vec<RecordBatch> {
344 FileStreamTest::new()
345 .with_records(vec![make_partition(3), make_partition(2)])
346 .with_num_files(2)
347 .with_limit(limit)
348 .result()
349 .await
350 .expect("error executing stream")
351 }
352
353 fn builder_test_config() -> FileScanConfig {
355 let table_schema = TableSchema::new(Arc::new(Schema::empty()), vec![]);
356 FileScanConfigBuilder::new(
357 ObjectStoreUrl::parse("test:///").unwrap(),
358 Arc::new(MockSource::new(table_schema)),
359 )
360 .with_file(PartitionedFile::new("mock_file", 10))
361 .build()
362 }
363
364 fn builder_error(builder: FileStreamBuilder<'_>) -> String {
367 builder.build().err().unwrap().to_string()
368 }
369
370 #[tokio::test]
371 async fn on_error_opening() -> Result<()> {
372 let batches = FileStreamTest::new()
373 .with_records(vec![make_partition(3), make_partition(2)])
374 .with_num_files(2)
375 .with_on_error(OnError::Skip)
376 .with_open_errors(vec![0])
377 .result()
378 .await?;
379
380 #[rustfmt::skip]
381 assert_batches_eq!(&[
382 "+---+",
383 "| i |",
384 "+---+",
385 "| 0 |",
386 "| 1 |",
387 "| 2 |",
388 "| 0 |",
389 "| 1 |",
390 "+---+",
391 ], &batches);
392
393 let batches = FileStreamTest::new()
394 .with_records(vec![make_partition(3), make_partition(2)])
395 .with_num_files(2)
396 .with_on_error(OnError::Skip)
397 .with_open_errors(vec![1])
398 .result()
399 .await?;
400
401 #[rustfmt::skip]
402 assert_batches_eq!(&[
403 "+---+",
404 "| i |",
405 "+---+",
406 "| 0 |",
407 "| 1 |",
408 "| 2 |",
409 "| 0 |",
410 "| 1 |",
411 "+---+",
412 ], &batches);
413
414 let batches = FileStreamTest::new()
415 .with_records(vec![make_partition(3), make_partition(2)])
416 .with_num_files(2)
417 .with_on_error(OnError::Skip)
418 .with_open_errors(vec![0, 1])
419 .result()
420 .await?;
421
422 #[rustfmt::skip]
423 assert_batches_eq!(&[
424 "++",
425 "++",
426 ], &batches);
427
428 Ok(())
429 }
430
431 #[tokio::test]
432 async fn on_error_scanning_fail() -> Result<()> {
433 let result = FileStreamTest::new()
434 .with_records(vec![make_partition(3), make_partition(2)])
435 .with_num_files(2)
436 .with_on_error(OnError::Fail)
437 .with_scan_errors(vec![1])
438 .result()
439 .await;
440
441 assert!(result.is_err());
442
443 Ok(())
444 }
445
446 #[tokio::test]
447 async fn on_error_opening_fail() -> Result<()> {
448 let result = FileStreamTest::new()
449 .with_records(vec![make_partition(3), make_partition(2)])
450 .with_num_files(2)
451 .with_on_error(OnError::Fail)
452 .with_open_errors(vec![1])
453 .result()
454 .await;
455
456 assert!(result.is_err());
457
458 Ok(())
459 }
460
461 #[tokio::test]
462 async fn on_error_scanning() -> Result<()> {
463 let batches = FileStreamTest::new()
464 .with_records(vec![make_partition(3), make_partition(2)])
465 .with_num_files(2)
466 .with_on_error(OnError::Skip)
467 .with_scan_errors(vec![0])
468 .result()
469 .await?;
470
471 #[rustfmt::skip]
472 assert_batches_eq!(&[
473 "+---+",
474 "| i |",
475 "+---+",
476 "| 0 |",
477 "| 1 |",
478 "| 2 |",
479 "| 0 |",
480 "| 1 |",
481 "+---+",
482 ], &batches);
483
484 let batches = FileStreamTest::new()
485 .with_records(vec![make_partition(3), make_partition(2)])
486 .with_num_files(2)
487 .with_on_error(OnError::Skip)
488 .with_scan_errors(vec![1])
489 .result()
490 .await?;
491
492 #[rustfmt::skip]
493 assert_batches_eq!(&[
494 "+---+",
495 "| i |",
496 "+---+",
497 "| 0 |",
498 "| 1 |",
499 "| 2 |",
500 "| 0 |",
501 "| 1 |",
502 "+---+",
503 ], &batches);
504
505 let batches = FileStreamTest::new()
506 .with_records(vec![make_partition(3), make_partition(2)])
507 .with_num_files(2)
508 .with_on_error(OnError::Skip)
509 .with_scan_errors(vec![0, 1])
510 .result()
511 .await?;
512
513 #[rustfmt::skip]
514 assert_batches_eq!(&[
515 "++",
516 "++",
517 ], &batches);
518
519 Ok(())
520 }
521
522 #[tokio::test]
523 async fn on_error_mixed() -> Result<()> {
524 let batches = FileStreamTest::new()
525 .with_records(vec![make_partition(3), make_partition(2)])
526 .with_num_files(3)
527 .with_on_error(OnError::Skip)
528 .with_open_errors(vec![1])
529 .with_scan_errors(vec![0])
530 .result()
531 .await?;
532
533 #[rustfmt::skip]
534 assert_batches_eq!(&[
535 "+---+",
536 "| i |",
537 "+---+",
538 "| 0 |",
539 "| 1 |",
540 "| 2 |",
541 "| 0 |",
542 "| 1 |",
543 "+---+",
544 ], &batches);
545
546 let batches = FileStreamTest::new()
547 .with_records(vec![make_partition(3), make_partition(2)])
548 .with_num_files(3)
549 .with_on_error(OnError::Skip)
550 .with_open_errors(vec![0])
551 .with_scan_errors(vec![1])
552 .result()
553 .await?;
554
555 #[rustfmt::skip]
556 assert_batches_eq!(&[
557 "+---+",
558 "| i |",
559 "+---+",
560 "| 0 |",
561 "| 1 |",
562 "| 2 |",
563 "| 0 |",
564 "| 1 |",
565 "+---+",
566 ], &batches);
567
568 let batches = FileStreamTest::new()
569 .with_records(vec![make_partition(3), make_partition(2)])
570 .with_num_files(3)
571 .with_on_error(OnError::Skip)
572 .with_open_errors(vec![2])
573 .with_scan_errors(vec![0, 1])
574 .result()
575 .await?;
576
577 #[rustfmt::skip]
578 assert_batches_eq!(&[
579 "++",
580 "++",
581 ], &batches);
582
583 let batches = FileStreamTest::new()
584 .with_records(vec![make_partition(3), make_partition(2)])
585 .with_num_files(3)
586 .with_on_error(OnError::Skip)
587 .with_open_errors(vec![0, 2])
588 .with_scan_errors(vec![1])
589 .result()
590 .await?;
591
592 #[rustfmt::skip]
593 assert_batches_eq!(&[
594 "++",
595 "++",
596 ], &batches);
597
598 Ok(())
599 }
600
601 #[tokio::test]
602 async fn without_limit() -> Result<()> {
603 let batches = create_and_collect(None).await;
604
605 #[rustfmt::skip]
606 assert_batches_eq!(&[
607 "+---+",
608 "| i |",
609 "+---+",
610 "| 0 |",
611 "| 1 |",
612 "| 2 |",
613 "| 0 |",
614 "| 1 |",
615 "| 0 |",
616 "| 1 |",
617 "| 2 |",
618 "| 0 |",
619 "| 1 |",
620 "+---+",
621 ], &batches);
622
623 Ok(())
624 }
625
626 #[tokio::test]
627 async fn with_limit_between_files() -> Result<()> {
628 let batches = create_and_collect(Some(5)).await;
629 #[rustfmt::skip]
630 assert_batches_eq!(&[
631 "+---+",
632 "| i |",
633 "+---+",
634 "| 0 |",
635 "| 1 |",
636 "| 2 |",
637 "| 0 |",
638 "| 1 |",
639 "+---+",
640 ], &batches);
641
642 Ok(())
643 }
644
645 #[tokio::test]
646 async fn with_limit_at_middle_of_batch() -> Result<()> {
647 let batches = create_and_collect(Some(6)).await;
648 #[rustfmt::skip]
649 assert_batches_eq!(&[
650 "+---+",
651 "| i |",
652 "+---+",
653 "| 0 |",
654 "| 1 |",
655 "| 2 |",
656 "| 0 |",
657 "| 1 |",
658 "| 0 |",
659 "+---+",
660 ], &batches);
661
662 Ok(())
663 }
664
665 #[test]
666 fn builder_requires_partition_file_opener_and_metrics() {
667 let config = builder_test_config();
668
669 let err = builder_error(FileStreamBuilder::new(&config));
670 assert!(err.contains("FileStreamBuilder missing required partition"));
671
672 let err = builder_error(FileStreamBuilder::new(&config).with_partition(0));
673 assert!(err.contains("FileStreamBuilder missing required morselizer"));
674
675 let err = builder_error(
676 FileStreamBuilder::new(&config)
677 .with_partition(0)
678 .with_file_opener(Arc::new(TestOpener::default())),
679 );
680 assert!(err.contains("FileStreamBuilder missing required metrics"));
681 }
682
683 #[test]
684 fn builder_errors_on_invalid_partition() {
685 let config = builder_test_config();
686 let metrics = ExecutionPlanMetricsSet::new();
687
688 let err = builder_error(
689 FileStreamBuilder::new(&config)
690 .with_partition(1)
691 .with_file_opener(Arc::new(TestOpener::default()))
692 .with_metrics(&metrics),
693 );
694 assert!(err.contains("FileStreamBuilder invalid partition index: 1"));
695 }
696
    /// A planner that produces one single-batch morsel without any pending
    /// I/O: the batch is emitted and the stream finishes.
    #[tokio::test]
    async fn morsel_no_io() -> Result<()> {
        let test = FileStreamMorselTest::new().with_file(
            MockPlanner::builder("file1.parquet")
                .add_plan(MockPlanBuilder::new().with_morsel(MorselId(10), 42))
                .return_none(),
        );

        insta::assert_snapshot!(test.run().await.unwrap(), @r"
        ----- Output Stream -----
        Batch: 42
        Done
        ----- File Stream Events -----
        morselize_file: file1.parquet
        planner_created: file1.parquet
        planner_called: file1.parquet
        morsel_produced: file1.parquet, MorselId(10)
        morsel_stream_started: MorselId(10)
        morsel_stream_batch_produced: MorselId(10), BatchId(42)
        morsel_stream_finished: MorselId(10)
        ");

        Ok(())
    }
723
    /// The planner first waits on one I/O future (scripted with
    /// `PollsToResolve(1)`) and then produces a morsel with two batches.
    #[tokio::test]
    async fn morsel_single_io_two_batches() -> Result<()> {
        let test = FileStreamMorselTest::new().with_file(
            MockPlanner::builder("file1.parquet")
                .add_plan(
                    PendingPlannerBuilder::new(IoFutureId(1))
                        .with_polls_to_resolve(PollsToResolve(1)),
                )
                .add_plan(
                    MockPlanBuilder::new()
                        .with_morsel_batches(MorselId(10), vec![42, 43]),
                )
                .return_none(),
        );

        // The event log shows the I/O future resolving before the planner is
        // called again and the morsel is produced.
        insta::assert_snapshot!(test.run().await.unwrap(), @r"
        ----- Output Stream -----
        Batch: 42
        Batch: 43
        Done
        ----- File Stream Events -----
        morselize_file: file1.parquet
        planner_created: file1.parquet
        planner_called: file1.parquet
        io_future_created: file1.parquet, IoFutureId(1)
        io_future_polled: file1.parquet, IoFutureId(1)
        io_future_polled: file1.parquet, IoFutureId(1)
        io_future_resolved: file1.parquet, IoFutureId(1)
        planner_called: file1.parquet
        morsel_produced: file1.parquet, MorselId(10)
        morsel_stream_started: MorselId(10)
        morsel_stream_batch_produced: MorselId(10), BatchId(42)
        morsel_stream_batch_produced: MorselId(10), BatchId(43)
        morsel_stream_finished: MorselId(10)
        ");

        Ok(())
    }
764
    /// The planner waits on two I/O futures, one after the other, before it
    /// produces a single-batch morsel.
    #[tokio::test]
    async fn morsel_two_ios_one_batch() -> Result<()> {
        let test = FileStreamMorselTest::new().with_file(
            MockPlanner::builder("file1.parquet")
                .add_plan(PendingPlannerBuilder::new(IoFutureId(1)))
                .add_plan(PendingPlannerBuilder::new(IoFutureId(2)))
                .add_plan(MockPlanBuilder::new().with_morsel(MorselId(10), 42))
                .return_none(),
        );

        insta::assert_snapshot!(test.run().await.unwrap(), @r"
        ----- Output Stream -----
        Batch: 42
        Done
        ----- File Stream Events -----
        morselize_file: file1.parquet
        planner_created: file1.parquet
        planner_called: file1.parquet
        io_future_created: file1.parquet, IoFutureId(1)
        io_future_polled: file1.parquet, IoFutureId(1)
        io_future_resolved: file1.parquet, IoFutureId(1)
        planner_called: file1.parquet
        io_future_created: file1.parquet, IoFutureId(2)
        io_future_polled: file1.parquet, IoFutureId(2)
        io_future_resolved: file1.parquet, IoFutureId(2)
        planner_called: file1.parquet
        morsel_produced: file1.parquet, MorselId(10)
        morsel_stream_started: MorselId(10)
        morsel_stream_batch_produced: MorselId(10), BatchId(42)
        morsel_stream_finished: MorselId(10)
        ");

        Ok(())
    }
801
    /// An I/O future that fails surfaces its error on the output stream,
    /// after which the stream is done.
    #[tokio::test]
    async fn morsel_io_error() -> Result<()> {
        let test = FileStreamMorselTest::new().with_file(
            MockPlanner::builder("file1.parquet").add_plan(
                PendingPlannerBuilder::new(IoFutureId(1))
                    .with_error("io failed while opening file"),
            ),
        );

        insta::assert_snapshot!(test.run().await.unwrap(), @r"
        ----- Output Stream -----
        Error: io failed while opening file
        Done
        ----- File Stream Events -----
        morselize_file: file1.parquet
        planner_created: file1.parquet
        planner_called: file1.parquet
        io_future_created: file1.parquet, IoFutureId(1)
        io_future_polled: file1.parquet, IoFutureId(1)
        io_future_errored: file1.parquet, IoFutureId(1), io failed while opening file
        ");

        Ok(())
    }
827
    /// The first plan yields a two-batch morsel together with a pending
    /// planner whose I/O needs several polls; the morsel's batches are
    /// emitted while that I/O is still being polled.
    #[tokio::test]
    async fn morsel_pending_planner_does_not_block_active_reader() -> Result<()> {
        let test = FileStreamMorselTest::new().with_file(
            MockPlanner::builder("file1.parquet")
                .add_plan(
                    MockPlanBuilder::new()
                        .with_morsel_batches(MorselId(10), vec![41, 42])
                        .with_pending_planner(IoFutureId(1), PollsToResolve(3), Ok(())),
                )
                .add_plan(MockPlanBuilder::new().with_morsel(MorselId(11), 43))
                .return_none(),
        );

        // The expected event log interleaves `io_future_polled` entries with
        // the batches of MorselId(10); the second morsel is only planned
        // after the I/O future has resolved.
        insta::assert_snapshot!(test.run().await.unwrap(), @r"
        ----- Output Stream -----
        Batch: 41
        Batch: 42
        Batch: 43
        Done
        ----- File Stream Events -----
        morselize_file: file1.parquet
        planner_created: file1.parquet
        planner_called: file1.parquet
        morsel_produced: file1.parquet, MorselId(10)
        io_future_created: file1.parquet, IoFutureId(1)
        io_future_polled: file1.parquet, IoFutureId(1)
        morsel_stream_started: MorselId(10)
        io_future_polled: file1.parquet, IoFutureId(1)
        morsel_stream_batch_produced: MorselId(10), BatchId(41)
        io_future_polled: file1.parquet, IoFutureId(1)
        morsel_stream_batch_produced: MorselId(10), BatchId(42)
        io_future_polled: file1.parquet, IoFutureId(1)
        io_future_resolved: file1.parquet, IoFutureId(1)
        morsel_stream_finished: MorselId(10)
        planner_called: file1.parquet
        morsel_produced: file1.parquet, MorselId(11)
        morsel_stream_started: MorselId(11)
        morsel_stream_batch_produced: MorselId(11), BatchId(43)
        morsel_stream_finished: MorselId(11)
        ");

        Ok(())
    }
877
    /// A plan that hands back an already-ready child planner: the child is
    /// called and its morsel is what ends up on the output stream.
    #[tokio::test]
    async fn morsel_ready_child_planner() -> Result<()> {
        let child_planner = MockPlanner::builder("child planner")
            .add_plan(MockPlanBuilder::new().with_morsel(MorselId(10), 42))
            .return_none();

        let test = FileStreamMorselTest::new().with_file(
            MockPlanner::builder("file1.parquet")
                .add_plan(MockPlanBuilder::new().with_ready_planner(child_planner))
                .return_none(),
        );

        insta::assert_snapshot!(test.run().await.unwrap(), @r"
        ----- Output Stream -----
        Batch: 42
        Done
        ----- File Stream Events -----
        morselize_file: file1.parquet
        planner_created: file1.parquet
        planner_called: file1.parquet
        planner_created: child planner
        planner_called: child planner
        morsel_produced: child planner, MorselId(10)
        morsel_stream_started: MorselId(10)
        morsel_stream_batch_produced: MorselId(10), BatchId(42)
        morsel_stream_finished: MorselId(10)
        ");

        Ok(())
    }
910
    /// The planner fails when it is called again after its I/O future has
    /// resolved; the error is surfaced on the output stream.
    #[tokio::test]
    async fn morsel_plan_error_after_io() -> Result<()> {
        let test = FileStreamMorselTest::new().with_file(
            MockPlanner::builder("file1.parquet")
                .add_plan(PendingPlannerBuilder::new(IoFutureId(1)))
                .return_error("planner failed after io"),
        );

        insta::assert_snapshot!(test.run().await.unwrap(), @r"
        ----- Output Stream -----
        Error: planner failed after io
        Done
        ----- File Stream Events -----
        morselize_file: file1.parquet
        planner_created: file1.parquet
        planner_called: file1.parquet
        io_future_created: file1.parquet, IoFutureId(1)
        io_future_polled: file1.parquet, IoFutureId(1)
        io_future_resolved: file1.parquet, IoFutureId(1)
        planner_called: file1.parquet
        ");

        Ok(())
    }
936
    /// Two files in the same partition are processed one after the other.
    #[tokio::test]
    async fn morsel_multiple_files() -> Result<()> {
        let test = FileStreamMorselTest::new()
            .with_file(
                MockPlanner::builder("file1.parquet")
                    .add_plan(MockPlanBuilder::new().with_morsel(MorselId(10), 41))
                    .return_none(),
            )
            .with_file(
                MockPlanner::builder("file2.parquet")
                    .add_plan(MockPlanBuilder::new().with_morsel(MorselId(11), 42))
                    .return_none(),
            );

        insta::assert_snapshot!(test.run().await.unwrap(), @r"
        ----- Output Stream -----
        Batch: 41
        Batch: 42
        Done
        ----- File Stream Events -----
        morselize_file: file1.parquet
        planner_created: file1.parquet
        planner_called: file1.parquet
        morsel_produced: file1.parquet, MorselId(10)
        morsel_stream_started: MorselId(10)
        morsel_stream_batch_produced: MorselId(10), BatchId(41)
        morsel_stream_finished: MorselId(10)
        morselize_file: file2.parquet
        planner_created: file2.parquet
        planner_called: file2.parquet
        morsel_produced: file2.parquet, MorselId(11)
        morsel_stream_started: MorselId(11)
        morsel_stream_batch_produced: MorselId(11), BatchId(42)
        morsel_stream_finished: MorselId(11)
        ");

        Ok(())
    }
976
    /// With a limit of 1, the stream stops after the first batch of the
    /// first file; the second file never shows up in the event log.
    #[tokio::test]
    async fn morsel_limit_prevents_second_file() -> Result<()> {
        let test = FileStreamMorselTest::new()
            .with_file(
                MockPlanner::builder("file1.parquet")
                    .add_plan(
                        MockPlanBuilder::new()
                            .with_morsel_batches(MorselId(10), vec![41, 42]),
                    )
                    .return_none(),
            )
            .with_file(
                MockPlanner::builder("file2.parquet")
                    .add_plan(MockPlanBuilder::new().with_morsel(MorselId(11), 43))
                    .return_none(),
            )
            .with_limit(1);

        // Note the absence of any `file2.parquet` event and of a
        // `morsel_stream_finished` entry for MorselId(10).
        insta::assert_snapshot!(test.run().await.unwrap(), @r"
        ----- Output Stream -----
        Batch: 41
        Done
        ----- File Stream Events -----
        morselize_file: file1.parquet
        planner_created: file1.parquet
        planner_called: file1.parquet
        morsel_produced: file1.parquet, MorselId(10)
        morsel_stream_started: MorselId(10)
        morsel_stream_batch_produced: MorselId(10), BatchId(41)
        ");

        Ok(())
    }
1012
1013 fn two_partition_morsel_test() -> FileStreamMorselTest {
1019 FileStreamMorselTest::new()
1020 .with_file_in_partition(
1022 PartitionId(0),
1023 MockPlanner::builder("file1.parquet")
1024 .add_plan(MockPlanBuilder::new().with_morsel(MorselId(10), 101))
1025 .return_none(),
1026 )
1027 .with_file_in_partition(
1028 PartitionId(0),
1029 MockPlanner::builder("file2.parquet")
1030 .add_plan(MockPlanBuilder::new().with_morsel(MorselId(11), 102))
1031 .return_none(),
1032 )
1033 .with_file_in_partition(
1034 PartitionId(0),
1035 MockPlanner::builder("file3.parquet")
1036 .add_plan(MockPlanBuilder::new().with_morsel(MorselId(12), 103))
1037 .return_none(),
1038 )
1039 .with_file_in_partition(
1041 PartitionId(1),
1042 MockPlanner::builder("file4.parquet")
1043 .add_plan(MockPlanBuilder::new().with_morsel(MorselId(13), 201))
1044 .return_none(),
1045 )
1046 .with_reads(vec![
1047 PartitionId(1),
1048 PartitionId(1),
1049 PartitionId(1),
1050 PartitionId(1),
1051 PartitionId(1),
1052 ])
1053 }
1054
    /// All scripted reads go to partition 1; the snapshot expects it to emit
    /// the batches of all four files, including the three registered under
    /// partition 0, while partition 0 only reports `Done`.
    #[tokio::test]
    async fn morsel_shared_files_can_be_stolen() -> Result<()> {
        let test = two_partition_morsel_test().with_file_stream_events(false);

        insta::assert_snapshot!(test.run().await.unwrap(), @r"
        ----- Partition 0 -----
        Done
        ----- Partition 1 -----
        Batch: 101
        Batch: 102
        Batch: 103
        Batch: 201
        Done
        ----- File Stream Events -----
        (omitted due to with_file_stream_events(false))
        ");

        Ok(())
    }
1079
    /// With `preserve_order` enabled, each partition emits only the batches
    /// of the files registered under it, even though the scripted reads all
    /// target partition 1.
    #[tokio::test]
    async fn morsel_preserve_order_keeps_files_local() -> Result<()> {
        let test = two_partition_morsel_test()
            .with_preserve_order(true)
            .with_file_stream_events(false);

        insta::assert_snapshot!(test.run().await.unwrap(), @r"
        ----- Partition 0 -----
        Batch: 101
        Batch: 102
        Batch: 103
        Done
        ----- Partition 1 -----
        Batch: 201
        Done
        ----- File Stream Events -----
        (omitted due to with_file_stream_events(false))
        ");

        Ok(())
    }
1108
    /// With `partitioned_by_file_group` enabled, each partition emits only
    /// the batches of the files registered under it.
    #[tokio::test]
    async fn morsel_partitioned_by_file_group_keeps_files_local() -> Result<()> {
        let test = two_partition_morsel_test()
            .with_partitioned_by_file_group(true)
            .with_file_stream_events(false);

        insta::assert_snapshot!(test.run().await.unwrap(), @r"
        ----- Partition 0 -----
        Batch: 101
        Batch: 102
        Batch: 103
        Done
        ----- Partition 1 -----
        Batch: 201
        Done
        ----- File Stream Events -----
        (omitted due to with_file_stream_events(false))
        ");

        Ok(())
    }
1133
    /// Both files are registered under partition 0 and none under partition
    /// 1, yet the scripted reads of partition 1 are expected to yield both
    /// batches.
    #[tokio::test]
    async fn morsel_empty_sibling_can_steal() -> Result<()> {
        let test = FileStreamMorselTest::new()
            .with_file_in_partition(
                PartitionId(0),
                MockPlanner::builder("file1.parquet")
                    .add_plan(MockPlanBuilder::new().with_morsel(MorselId(10), 101))
                    .return_none(),
            )
            .with_file_in_partition(
                PartitionId(0),
                MockPlanner::builder("file2.parquet")
                    .add_plan(MockPlanBuilder::new().with_morsel(MorselId(11), 102))
                    .return_none(),
            )
            .with_reads(vec![PartitionId(1), PartitionId(1), PartitionId(1)])
            .with_file_stream_events(false);

        insta::assert_snapshot!(test.run().await.unwrap(), @r"
        ----- Partition 0 -----
        Done
        ----- Partition 1 -----
        Batch: 101
        Batch: 102
        Done
        ----- File Stream Events -----
        (omitted due to with_file_stream_events(false))
        ");

        Ok(())
    }
1168
    /// Both files are registered under partition 0, streams are built on
    /// first read, and the scripted read order is partition 1, partition 0,
    /// partition 1. The snapshot pins which partition emits which batch.
    ///
    /// NOTE(review): the exact interaction between stream construction on
    /// first read and the shared work is implemented outside this view —
    /// confirm the expected snapshot against the harness when changing it.
    #[tokio::test]
    async fn morsel_empty_sibling_can_finish_before_shared_work_exists() -> Result<()> {
        let test = FileStreamMorselTest::new()
            .with_file_in_partition(
                PartitionId(0),
                MockPlanner::builder("file1.parquet")
                    .add_plan(MockPlanBuilder::new().with_morsel(MorselId(10), 101))
                    .return_none(),
            )
            .with_file_in_partition(
                PartitionId(0),
                MockPlanner::builder("file2.parquet")
                    .add_plan(MockPlanBuilder::new().with_morsel(MorselId(11), 102))
                    .return_none(),
            )
            .with_build_streams_on_first_read(true)
            .with_reads(vec![PartitionId(1), PartitionId(0), PartitionId(1)])
            .with_file_stream_events(false);

        insta::assert_snapshot!(test.run().await.unwrap(), @r"
        ----- Partition 0 -----
        Batch: 102
        Done
        ----- Partition 1 -----
        Batch: 101
        Done
        ----- File Stream Events -----
        (omitted due to with_file_stream_events(false))
        ");

        Ok(())
    }
1210
    /// Two streams share one work source: the stream for partition 1 has a
    /// limit of 1, the stream for partition 0 has none. The limited stream is
    /// drained first, then the unlimited one; the `files_opened` and
    /// `files_processed` metrics of each stream must only count the files
    /// that stream handled itself.
    #[tokio::test]
    async fn morsel_shared_limit_does_not_double_count_files_processed() -> Result<()> {
        let test = two_partition_morsel_test();
        let unlimited_config = test.test_config();
        let limited_config = test.clone().with_limit(1).test_config();
        // Both streams below are handed the same shared work source.
        let shared_work_source = limited_config
            .create_sibling_state()
            .and_then(|state| state.as_ref().downcast_ref::<SharedWorkSource>().cloned())
            .expect("shared work source");
        let limited_metrics = ExecutionPlanMetricsSet::new();
        let unlimited_metrics = ExecutionPlanMetricsSet::new();

        let limited_stream = FileStreamBuilder::new(&limited_config)
            .with_partition(1)
            .with_shared_work_source(Some(shared_work_source.clone()))
            .with_morselizer(Box::new(test.morselizer.clone()))
            .with_metrics(&limited_metrics)
            .build()?;

        let unlimited_stream = FileStreamBuilder::new(&unlimited_config)
            .with_partition(0)
            .with_shared_work_source(Some(shared_work_source))
            .with_morselizer(Box::new(test.morselizer))
            .with_metrics(&unlimited_metrics)
            .build()?;

        // Drain order matters: the limited stream goes first.
        let limited_output = drain_stream_output(limited_stream).await?;
        let unlimited_output = drain_stream_output(unlimited_stream).await?;

        insta::assert_snapshot!(format!(
            "----- Limited Stream -----\n{limited_output}\n----- Unlimited Stream -----\n{unlimited_output}"
        ), @r"
        ----- Limited Stream -----
        Batch: 101
        ----- Unlimited Stream -----
        Batch: 102
        Batch: 103
        Batch: 201
        ");

        assert_eq!(
            metric_count(&limited_metrics, "files_opened"),
            1,
            "the limited stream should only open the file that produced its output"
        );
        assert_eq!(
            metric_count(&limited_metrics, "files_processed"),
            1,
            "the limited stream should only mark its own file as processed"
        );
        assert_eq!(
            metric_count(&unlimited_metrics, "files_opened"),
            3,
            "the draining stream should open the remaining shared files"
        );
        assert_eq!(
            metric_count(&unlimited_metrics, "files_processed"),
            3,
            "the draining stream should process exactly the files it opened"
        );

        Ok(())
    }
1276
    /// Partition 2 has no files of its own. With the scripted read order
    /// 2, 2, 1, 2 the snapshot expects it to emit a batch from a file of
    /// partition 0 and one from a file of partition 1, while partition 1
    /// emits the remaining batch.
    #[tokio::test]
    async fn morsel_one_sibling_can_drain_multiple_siblings() -> Result<()> {
        let test = FileStreamMorselTest::new()
            .with_file_in_partition(
                PartitionId(0),
                MockPlanner::builder("file1.parquet")
                    .add_plan(MockPlanBuilder::new().with_morsel(MorselId(10), 101))
                    .return_none(),
            )
            .with_file_in_partition(
                PartitionId(1),
                MockPlanner::builder("file2.parquet")
                    .add_plan(MockPlanBuilder::new().with_morsel(MorselId(11), 102))
                    .return_none(),
            )
            .with_file_in_partition(
                PartitionId(1),
                MockPlanner::builder("file3.parquet")
                    .add_plan(MockPlanBuilder::new().with_morsel(MorselId(12), 103))
                    .return_none(),
            )
            .with_reads(vec![
                PartitionId(2),
                PartitionId(2),
                PartitionId(1),
                PartitionId(2),
            ])
            .with_file_stream_events(false);

        insta::assert_snapshot!(test.run().await.unwrap(), @r"
        ----- Partition 0 -----
        Done
        ----- Partition 1 -----
        Batch: 103
        Done
        ----- Partition 2 -----
        Batch: 101
        Batch: 102
        Done
        ----- File Stream Events -----
        (omitted due to with_file_stream_events(false))
        ");

        Ok(())
    }
1327
1328 #[derive(Clone)]
1330 struct FileStreamMorselTest {
1331 morselizer: MockMorselizer,
1332 partition_files: BTreeMap<PartitionId, Vec<String>>,
1333 preserve_order: bool,
1334 partitioned_by_file_group: bool,
1335 file_stream_events: bool,
1336 build_streams_on_first_read: bool,
1337 reads: Vec<PartitionId>,
1338 limit: Option<usize>,
1339 }
1340
1341 impl FileStreamMorselTest {
1342 fn new() -> Self {
1344 Self {
1345 morselizer: MockMorselizer::new(),
1346 partition_files: BTreeMap::new(),
1347 preserve_order: false,
1348 partitioned_by_file_group: false,
1349 file_stream_events: true,
1350 build_streams_on_first_read: false,
1351 reads: vec![],
1352 limit: None,
1353 }
1354 }
1355
1356 fn with_file(self, planner: impl Into<MockPlanner>) -> Self {
1358 self.with_file_in_partition(PartitionId(0), planner)
1359 }
1360
1361 fn with_file_in_partition(
1363 mut self,
1364 partition: PartitionId,
1365 planner: impl Into<MockPlanner>,
1366 ) -> Self {
1367 let planner = planner.into();
1368 let file_path = planner.file_path().to_string();
1369 self.morselizer = self.morselizer.with_planner(planner);
1370 self.partition_files
1371 .entry(partition)
1372 .or_default()
1373 .push(file_path);
1374 self
1375 }
1376
1377 fn with_preserve_order(mut self, preserve_order: bool) -> Self {
1380 self.preserve_order = preserve_order;
1381 self
1382 }
1383
1384 fn with_partitioned_by_file_group(
1387 mut self,
1388 partitioned_by_file_group: bool,
1389 ) -> Self {
1390 self.partitioned_by_file_group = partitioned_by_file_group;
1391 self
1392 }
1393
1394 fn with_file_stream_events(mut self, file_stream_events: bool) -> Self {
1400 self.file_stream_events = file_stream_events;
1401 self
1402 }
1403
1404 fn with_build_streams_on_first_read(
1412 mut self,
1413 build_streams_on_first_read: bool,
1414 ) -> Self {
1415 self.build_streams_on_first_read = build_streams_on_first_read;
1416 self
1417 }
1418
1419 fn with_reads(mut self, reads: Vec<PartitionId>) -> Self {
1429 self.reads = reads;
1430 self
1431 }
1432
1433 fn with_limit(mut self, limit: usize) -> Self {
1435 self.limit = Some(limit);
1436 self
1437 }
1438
1439 async fn run(self) -> Result<String> {
1442 let observer = self.morselizer.observer().clone();
1443 observer.clear();
1444
1445 let metrics_set = ExecutionPlanMetricsSet::new();
1446 let partition_count = self.num_partitions();
1447
1448 let mut partitions = (0..partition_count)
1449 .map(|_| PartitionState::new())
1450 .collect::<Vec<_>>();
1451
1452 let mut build_order = Vec::new();
1453 for partition in self.reads.iter().map(|partition| partition.0) {
1454 if !build_order.contains(&partition) {
1455 build_order.push(partition);
1456 }
1457 }
1458 for partition in 0..partition_count {
1459 if !build_order.contains(&partition) {
1460 build_order.push(partition);
1461 }
1462 }
1463
1464 let config = self.test_config();
1465 let shared_work_source = config.create_sibling_state().and_then(|state| {
1472 state.as_ref().downcast_ref::<SharedWorkSource>().cloned()
1473 });
1474 if !self.build_streams_on_first_read {
1475 for partition in build_order {
1476 let stream = FileStreamBuilder::new(&config)
1477 .with_partition(partition)
1478 .with_shared_work_source(shared_work_source.clone())
1479 .with_morselizer(Box::new(self.morselizer.clone()))
1480 .with_metrics(&metrics_set)
1481 .build()?;
1482 partitions[partition].set_stream(stream);
1483 }
1484 }
1485
1486 let mut initial_reads: VecDeque<_> = self.reads.into();
1487 let mut next_round_robin = 0;
1488
1489 while !initial_reads.is_empty()
1490 || partitions.iter().any(PartitionState::is_active)
1491 {
1492 let partition = if let Some(partition) = initial_reads.pop_front() {
1493 partition.0
1494 } else {
1495 let partition = next_round_robin;
1496 next_round_robin = (next_round_robin + 1) % partition_count.max(1);
1497 partition
1498 };
1499
1500 let partition_state = &mut partitions[partition];
1501
1502 if self.build_streams_on_first_read && !partition_state.built {
1503 let stream = FileStreamBuilder::new(&config)
1504 .with_partition(partition)
1505 .with_shared_work_source(shared_work_source.clone())
1506 .with_morselizer(Box::new(self.morselizer.clone()))
1507 .with_metrics(&metrics_set)
1508 .build()?;
1509 partition_state.set_stream(stream);
1510 }
1511
1512 let Some(stream) = partition_state.stream.as_mut() else {
1513 continue;
1514 };
1515
1516 match stream.next().await {
1517 Some(result) => partition_state.push_output(format_result(result)),
1518 None => partition_state.finish(),
1519 }
1520 }
1521
1522 let output_text = if partition_count == 1 {
1523 format!(
1524 "----- Output Stream -----\n{}",
1525 partitions[0].output.join("\n")
1526 )
1527 } else {
1528 partitions
1529 .into_iter()
1530 .enumerate()
1531 .map(|(partition, state)| {
1532 format!(
1533 "----- Partition {} -----\n{}",
1534 partition,
1535 state.output.join("\n")
1536 )
1537 })
1538 .collect::<Vec<_>>()
1539 .join("\n")
1540 };
1541
1542 let file_stream_events = if self.file_stream_events {
1543 observer.format_events()
1544 } else {
1545 "(omitted due to with_file_stream_events(false))".to_string()
1546 };
1547
1548 Ok(format!(
1549 "{output_text}\n----- File Stream Events -----\n{file_stream_events}",
1550 ))
1551 }
1552
1553 fn num_partitions(&self) -> usize {
1556 self.partition_files
1557 .keys()
1558 .map(|partition| partition.0 + 1)
1559 .chain(self.reads.iter().map(|partition| partition.0 + 1))
1560 .max()
1561 .unwrap_or(1)
1562 }
1563
1564 fn test_config(&self) -> FileScanConfig {
1566 let file_groups = (0..self.num_partitions())
1567 .map(|partition| {
1568 self.partition_files
1569 .get(&PartitionId(partition))
1570 .into_iter()
1571 .flat_map(|files| files.iter())
1572 .map(|name| PartitionedFile::new(name, 10))
1573 .collect::<Vec<_>>()
1574 .into()
1575 })
1576 .collect::<Vec<_>>();
1577
1578 let table_schema = TableSchema::new(
1579 Arc::new(Schema::new(vec![Field::new("i", DataType::Int32, false)])),
1580 vec![],
1581 );
1582 FileScanConfigBuilder::new(
1583 ObjectStoreUrl::parse("test:///").unwrap(),
1584 Arc::new(MockSource::new(table_schema)),
1585 )
1586 .with_file_groups(file_groups)
1587 .with_limit(self.limit)
1588 .with_preserve_order(self.preserve_order)
1589 .with_partitioned_by_file_group(self.partitioned_by_file_group)
1590 .build()
1591 }
1592 }
1593
1594 fn format_result(result: Result<RecordBatch>) -> String {
1596 match result {
1597 Ok(batch) => {
1598 let col = batch.column(0).as_primitive::<Int32Type>();
1599 let batch_id = col.value(0);
1600 format!("Batch: {batch_id}")
1601 }
1602 Err(e) => {
1603 let message = if let DataFusionError::External(generic) = e {
1607 generic.to_string()
1608 } else {
1609 e.to_string()
1610 };
1611 format!("Error: {message}")
1612 }
1613 }
1614 }
1615
1616 async fn drain_stream_output(stream: FileStream) -> Result<String> {
1617 let output = stream
1618 .collect::<Vec<_>>()
1619 .await
1620 .into_iter()
1621 .map(|result| result.map(|batch| format_result(Ok(batch))))
1622 .collect::<Result<Vec<_>>>()?;
1623 Ok(output.join("\n"))
1624 }
1625
1626 fn metric_count(metrics: &ExecutionPlanMetricsSet, name: &str) -> usize {
1627 metrics
1628 .clone_inner()
1629 .sum_by_name(name)
1630 .unwrap_or_else(|| panic!("missing metric: {name}"))
1631 .as_usize()
1632 }
1633
1634 struct PartitionState {
1636 built: bool,
1638 stream: Option<FileStream>,
1640 output: Vec<String>,
1642 }
1643
1644 impl PartitionState {
1645 fn new() -> Self {
1647 Self {
1648 built: false,
1649 stream: None,
1650 output: vec![],
1651 }
1652 }
1653
1654 fn is_active(&self) -> bool {
1656 !self.built || self.stream.is_some()
1657 }
1658
1659 fn set_stream(&mut self, stream: FileStream) {
1661 self.stream = Some(stream);
1662 self.built = true;
1663 }
1664
1665 fn push_output(&mut self, line: String) {
1667 self.output.push(line);
1668 }
1669
1670 fn finish(&mut self) {
1672 self.push_output("Done".to_string());
1673 self.stream = None;
1674 }
1675 }
1676}