1use std::collections::VecDeque;
25use std::mem;
26use std::pin::Pin;
27use std::sync::Arc;
28use std::task::{Context, Poll};
29
30use crate::PartitionedFile;
31use crate::file_scan_config::FileScanConfig;
32use arrow::datatypes::SchemaRef;
33use datafusion_common::error::Result;
34use datafusion_execution::RecordBatchStream;
35use datafusion_physical_plan::metrics::{
36 BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, Time,
37};
38
39use arrow::record_batch::RecordBatch;
40use datafusion_common::instant::Instant;
41
42use futures::future::BoxFuture;
43use futures::stream::BoxStream;
44use futures::{FutureExt as _, Stream, StreamExt as _, ready};
45
/// A [`Stream`] of [`RecordBatch`]es produced by opening and scanning the
/// files of a single partition one after another.
///
/// While one file is being scanned, the open of the next file is started so
/// the two can overlap. An optional row limit truncates the output, and
/// [`OnError`] controls whether failing files abort the stream or are skipped.
pub struct FileStream {
    /// Files that have not been opened yet, in the order they will be opened.
    file_iter: VecDeque<PartitionedFile>,
    /// Schema reported by [`RecordBatchStream::schema`].
    projected_schema: SchemaRef,
    /// Number of rows still allowed to be emitted, or `None` for no limit.
    remain: Option<usize>,
    /// Turns each [`PartitionedFile`] into a future of a batch stream.
    file_opener: Arc<dyn FileOpener>,
    /// Current state of the open/scan state machine.
    state: FileStreamState,
    /// File-level timers and error counters.
    file_stream_metrics: FileStreamMetrics,
    /// Baseline metrics, updated via `record_poll` on every poll.
    baseline_metrics: BaselineMetrics,
    /// Whether open/scan failures end the stream or skip the file.
    on_error: OnError,
}
67
68impl FileStream {
69 pub fn new(
71 config: &FileScanConfig,
72 partition: usize,
73 file_opener: Arc<dyn FileOpener>,
74 metrics: &ExecutionPlanMetricsSet,
75 ) -> Result<Self> {
76 let projected_schema = config.projected_schema()?;
77
78 let file_group = config.file_groups[partition].clone();
79
80 Ok(Self {
81 file_iter: file_group.into_inner().into_iter().collect(),
82 projected_schema,
83 remain: config.limit,
84 file_opener,
85 state: FileStreamState::Idle,
86 file_stream_metrics: FileStreamMetrics::new(metrics, partition),
87 baseline_metrics: BaselineMetrics::new(metrics, partition),
88 on_error: OnError::Fail,
89 })
90 }
91
92 pub fn with_on_error(mut self, on_error: OnError) -> Self {
97 self.on_error = on_error;
98 self
99 }
100
101 fn start_next_file(&mut self) -> Option<Result<FileOpenFuture>> {
106 let part_file = self.file_iter.pop_front()?;
107 Some(self.file_opener.open(part_file))
108 }
109
110 fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<RecordBatch>>> {
111 loop {
112 match &mut self.state {
113 FileStreamState::Idle => {
114 self.file_stream_metrics.time_opening.start();
115
116 match self.start_next_file().transpose() {
117 Ok(Some(future)) => self.state = FileStreamState::Open { future },
118 Ok(None) => return Poll::Ready(None),
119 Err(e) => {
120 self.state = FileStreamState::Error;
121 return Poll::Ready(Some(Err(e)));
122 }
123 }
124 }
125 FileStreamState::Open { future } => match ready!(future.poll_unpin(cx)) {
126 Ok(reader) => {
127 self.file_stream_metrics.time_opening.stop();
129 let next = self.start_next_file().transpose();
130 self.file_stream_metrics.time_scanning_until_data.start();
131 self.file_stream_metrics.time_scanning_total.start();
132
133 match next {
134 Ok(Some(next_future)) => {
135 self.state = FileStreamState::Scan {
136 reader,
137 next: Some(NextOpen::Pending(next_future)),
138 };
139 }
140 Ok(None) => {
141 self.state = FileStreamState::Scan { reader, next: None };
142 }
143 Err(e) => {
144 self.state = FileStreamState::Error;
145 return Poll::Ready(Some(Err(e)));
146 }
147 }
148 }
149 Err(e) => {
150 self.file_stream_metrics.file_open_errors.add(1);
151 match self.on_error {
152 OnError::Skip => {
153 self.file_stream_metrics.time_opening.stop();
154 self.state = FileStreamState::Idle
155 }
156 OnError::Fail => {
157 self.state = FileStreamState::Error;
158 return Poll::Ready(Some(Err(e)));
159 }
160 }
161 }
162 },
163 FileStreamState::Scan { reader, next } => {
164 if let Some(next_open_future) = next
166 && let NextOpen::Pending(f) = next_open_future
167 && let Poll::Ready(reader) = f.as_mut().poll(cx)
168 {
169 *next_open_future = NextOpen::Ready(reader);
170 }
171 match ready!(reader.poll_next_unpin(cx)) {
172 Some(Ok(batch)) => {
173 self.file_stream_metrics.time_scanning_until_data.stop();
174 self.file_stream_metrics.time_scanning_total.stop();
175 let batch = match &mut self.remain {
176 Some(remain) => {
177 if *remain > batch.num_rows() {
178 *remain -= batch.num_rows();
179 batch
180 } else {
181 let batch = batch.slice(0, *remain);
182 self.state = FileStreamState::Limit;
183 *remain = 0;
184 batch
185 }
186 }
187 None => batch,
188 };
189 self.file_stream_metrics.time_scanning_total.start();
190 return Poll::Ready(Some(Ok(batch)));
191 }
192 Some(Err(err)) => {
193 self.file_stream_metrics.file_scan_errors.add(1);
194 self.file_stream_metrics.time_scanning_until_data.stop();
195 self.file_stream_metrics.time_scanning_total.stop();
196
197 match self.on_error {
198 OnError::Skip => match mem::take(next) {
200 Some(future) => {
201 self.file_stream_metrics.time_opening.start();
202
203 match future {
204 NextOpen::Pending(future) => {
205 self.state =
206 FileStreamState::Open { future }
207 }
208 NextOpen::Ready(reader) => {
209 self.state = FileStreamState::Open {
210 future: Box::pin(std::future::ready(
211 reader,
212 )),
213 }
214 }
215 }
216 }
217 None => return Poll::Ready(None),
218 },
219 OnError::Fail => {
220 self.state = FileStreamState::Error;
221 return Poll::Ready(Some(Err(err)));
222 }
223 }
224 }
225 None => {
226 self.file_stream_metrics.time_scanning_until_data.stop();
227 self.file_stream_metrics.time_scanning_total.stop();
228
229 match mem::take(next) {
230 Some(future) => {
231 self.file_stream_metrics.time_opening.start();
232
233 match future {
234 NextOpen::Pending(future) => {
235 self.state = FileStreamState::Open { future }
236 }
237 NextOpen::Ready(reader) => {
238 self.state = FileStreamState::Open {
239 future: Box::pin(std::future::ready(
240 reader,
241 )),
242 }
243 }
244 }
245 }
246 None => return Poll::Ready(None),
247 }
248 }
249 }
250 }
251 FileStreamState::Error | FileStreamState::Limit => {
252 return Poll::Ready(None);
253 }
254 }
255 }
256 }
257}
258
259impl Stream for FileStream {
260 type Item = Result<RecordBatch>;
261
262 fn poll_next(
263 mut self: Pin<&mut Self>,
264 cx: &mut Context<'_>,
265 ) -> Poll<Option<Self::Item>> {
266 self.file_stream_metrics.time_processing.start();
267 let result = self.poll_inner(cx);
268 self.file_stream_metrics.time_processing.stop();
269 self.baseline_metrics.record_poll(result)
270 }
271}
272
273impl RecordBatchStream for FileStream {
274 fn schema(&self) -> SchemaRef {
275 Arc::clone(&self.projected_schema)
276 }
277}
278
/// Boxed future returned by [`FileOpener::open`]. It resolves either to a
/// boxed stream of [`RecordBatch`] results for one file, or to an error if the
/// file could not be opened.
pub type FileOpenFuture =
    BoxFuture<'static, Result<BoxStream<'static, Result<RecordBatch>>>>;
282
/// Policy applied by [`FileStream`] when a file fails to open or to scan.
///
/// Derives `Copy`/`PartialEq`/`Debug` so the policy can be passed around,
/// compared and logged like any other small configuration value.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum OnError {
    /// Surface the error and end the stream (the default).
    #[default]
    Fail,
    /// Count the error in the file metrics and continue with the next file.
    Skip,
}
292
/// Opens individual files for a [`FileStream`].
pub trait FileOpener: Unpin + Send + Sync {
    /// Starts opening `partitioned_file`, returning a future that resolves to
    /// the file's stream of record batches.
    ///
    /// An `Err` returned directly from this method always ends the
    /// [`FileStream`], even with [`OnError::Skip`]. Errors produced by the
    /// returned future, or by the stream it yields, are handled according to
    /// the stream's [`OnError`] policy.
    fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture>;
}
302
/// The open of the next file, started while the current file is being
/// scanned.
pub enum NextOpen {
    /// The open is still in progress.
    Pending(FileOpenFuture),
    /// The open has finished; its result is held until the current file is
    /// done.
    Ready(Result<BoxStream<'static, Result<RecordBatch>>>),
}
310
/// States of the [`FileStream`] open/scan state machine.
pub enum FileStreamState {
    /// No file is being opened or scanned; the next poll starts opening the
    /// next queued file, or ends the stream if none remain.
    Idle,
    /// A file is being opened.
    Open {
        /// Resolves to the file's batch stream, or to an open error.
        future: FileOpenFuture,
    },
    /// Batches are being read from an opened file.
    Scan {
        /// Stream of batches for the current file.
        reader: BoxStream<'static, Result<RecordBatch>>,
        /// Open of the following file, started when the current file
        /// opened; `None` if no files were left at that point.
        next: Option<NextOpen>,
    },
    /// An error was returned; subsequent polls yield `None`.
    Error,
    /// The row limit was reached; subsequent polls yield `None`.
    Limit,
}
335
/// A [`Time`] metric paired with an optional start instant, so elapsed time
/// can be accumulated over explicit start/stop intervals.
pub struct StartableTime {
    /// Metric that accumulates the elapsed time of each completed interval.
    pub metrics: Time,
    /// Start of the interval in progress, or `None` if the timer is stopped.
    pub start: Option<Instant>,
}
342
343impl StartableTime {
344 pub fn start(&mut self) {
345 assert!(self.start.is_none());
346 self.start = Some(Instant::now());
347 }
348
349 pub fn stop(&mut self) {
350 if let Some(start) = self.start.take() {
351 self.metrics.add_elapsed(start);
352 }
353 }
354}
355
/// Per-partition metrics recorded by [`FileStream`].
pub struct FileStreamMetrics {
    /// Time spent waiting for files to open. It runs from when the stream
    /// starts (or resumes, for a prefetched file) opening a file until the
    /// open future resolves.
    pub time_opening: StartableTime,
    /// Time from a file becoming open until its first batch, error, or end
    /// of stream.
    pub time_scanning_until_data: StartableTime,
    /// Wall-clock time while files are being scanned. The timer is restarted
    /// just before each batch is returned, so it also includes time the
    /// consumer spends between polls.
    pub time_scanning_total: StartableTime,
    /// Time spent inside `FileStream::poll_next`.
    pub time_processing: StartableTime,
    /// Number of file open futures that resolved to an error. Synchronous
    /// errors from `FileOpener::open` are not counted here.
    pub file_open_errors: Count,
    /// Number of errors yielded while scanning opened files.
    pub file_scan_errors: Count,
}
403
404impl FileStreamMetrics {
405 pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
406 let time_opening = StartableTime {
407 metrics: MetricBuilder::new(metrics)
408 .subset_time("time_elapsed_opening", partition),
409 start: None,
410 };
411
412 let time_scanning_until_data = StartableTime {
413 metrics: MetricBuilder::new(metrics)
414 .subset_time("time_elapsed_scanning_until_data", partition),
415 start: None,
416 };
417
418 let time_scanning_total = StartableTime {
419 metrics: MetricBuilder::new(metrics)
420 .subset_time("time_elapsed_scanning_total", partition),
421 start: None,
422 };
423
424 let time_processing = StartableTime {
425 metrics: MetricBuilder::new(metrics)
426 .subset_time("time_elapsed_processing", partition),
427 start: None,
428 };
429
430 let file_open_errors =
431 MetricBuilder::new(metrics).counter("file_open_errors", partition);
432
433 let file_scan_errors =
434 MetricBuilder::new(metrics).counter("file_scan_errors", partition);
435
436 Self {
437 time_opening,
438 time_scanning_until_data,
439 time_scanning_total,
440 time_processing,
441 file_open_errors,
442 file_scan_errors,
443 }
444 }
445}
446
#[cfg(test)]
mod tests {
    use crate::PartitionedFile;
    use crate::file_scan_config::FileScanConfigBuilder;
    use crate::tests::make_partition;
    use datafusion_common::error::Result;
    use datafusion_execution::object_store::ObjectStoreUrl;
    use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
    use futures::{FutureExt as _, StreamExt as _};
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    use crate::file_stream::{FileOpenFuture, FileOpener, FileStream, OnError};
    use crate::test_util::MockSource;
    use arrow::array::RecordBatch;
    use arrow::datatypes::Schema;

    use datafusion_common::{assert_batches_eq, exec_err, internal_err};

    /// Test opener that serves the same `records` for every file and injects
    /// open or scan failures by file index (the order of `open` calls).
    #[derive(Default)]
    struct TestOpener {
        /// File indices whose open future resolves to an error.
        error_opening_idx: Vec<usize>,
        /// File indices whose stream yields a single error instead of data.
        error_scanning_idx: Vec<usize>,
        /// Number of `open` calls so far, i.e. the index of the next file.
        current_idx: AtomicUsize,
        /// Batches returned for every successfully scanned file.
        records: Vec<RecordBatch>,
    }

    impl FileOpener for TestOpener {
        fn open(&self, _partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
            let idx = self.current_idx.fetch_add(1, Ordering::SeqCst);

            if self.error_opening_idx.contains(&idx) {
                Ok(futures::future::ready(internal_err!("error opening")).boxed())
            } else if self.error_scanning_idx.contains(&idx) {
                let error = futures::future::ready(exec_err!("error scanning"));
                let stream = futures::stream::once(error).boxed();
                Ok(futures::future::ready(Ok(stream)).boxed())
            } else {
                let iterator = self.records.clone().into_iter().map(Ok);
                let stream = futures::stream::iter(iterator).boxed();
                Ok(futures::future::ready(Ok(stream)).boxed())
            }
        }
    }

    /// Builder for a `FileStream` over `num_files` mock files backed by a
    /// [`TestOpener`].
    #[derive(Default)]
    struct FileStreamTest {
        /// Number of mock files in the single file group.
        num_files: usize,
        /// Row limit passed to the scan config.
        limit: Option<usize>,
        /// Error policy applied to the stream.
        on_error: OnError,
        /// Opener supplying data and injected failures.
        opener: TestOpener,
    }

    impl FileStreamTest {
        pub fn new() -> Self {
            Self::default()
        }

        /// Sets how many mock files the partition contains.
        pub fn with_num_files(mut self, num_files: usize) -> Self {
            self.num_files = num_files;
            self
        }

        /// Sets the row limit for the scan.
        pub fn with_limit(mut self, limit: Option<usize>) -> Self {
            self.limit = limit;
            self
        }

        /// Makes the open future fail for the files at these indices.
        pub fn with_open_errors(mut self, idx: Vec<usize>) -> Self {
            self.opener.error_opening_idx = idx;
            self
        }

        /// Makes scanning fail for the files at these indices.
        pub fn with_scan_errors(mut self, idx: Vec<usize>) -> Self {
            self.opener.error_scanning_idx = idx;
            self
        }

        /// Sets the stream's error policy.
        pub fn with_on_error(mut self, on_error: OnError) -> Self {
            self.on_error = on_error;
            self
        }

        /// Sets the batches each successfully scanned file yields.
        pub fn with_records(mut self, records: Vec<RecordBatch>) -> Self {
            self.opener.records = records;
            self
        }

        /// Builds the `FileStream` for partition 0 and collects all of its
        /// output, returning the first error if any item fails.
        pub async fn result(self) -> Result<Vec<RecordBatch>> {
            // Use the schema of the first record batch, or an empty schema
            // when no records are configured.
            let file_schema = self
                .opener
                .records
                .first()
                .map(|batch| batch.schema())
                .unwrap_or_else(|| Arc::new(Schema::empty()));

            let mock_files: Vec<(String, u64)> = (0..self.num_files)
                .map(|idx| (format!("mock_file{idx}"), 10_u64))
                .collect();

            let file_group = mock_files
                .into_iter()
                .map(|(name, size)| PartitionedFile::new(name, size))
                .collect();

            let on_error = self.on_error;

            let table_schema = crate::table_schema::TableSchema::new(file_schema, vec![]);
            let config = FileScanConfigBuilder::new(
                ObjectStoreUrl::parse("test:///").unwrap(),
                Arc::new(MockSource::new(table_schema)),
            )
            .with_file_group(file_group)
            .with_limit(self.limit)
            .build();
            let metrics_set = ExecutionPlanMetricsSet::new();
            let file_stream =
                FileStream::new(&config, 0, Arc::new(self.opener), &metrics_set)
                    .unwrap()
                    .with_on_error(on_error);

            file_stream
                .collect::<Vec<_>>()
                .await
                .into_iter()
                .collect::<Result<Vec<_>>>()
        }
    }

    /// Scans two files, each yielding batches of 3 and 2 rows, with the given
    /// limit and no injected errors.
    async fn create_and_collect(limit: Option<usize>) -> Vec<RecordBatch> {
        FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_limit(limit)
            .result()
            .await
            .expect("error executing stream")
    }

    /// With `OnError::Skip`, files whose open fails are dropped and the rest
    /// are still scanned.
    #[tokio::test]
    async fn on_error_opening() -> Result<()> {
        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![0])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![0, 1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "++",
            "++",
        ], &batches);

        Ok(())
    }

    /// With `OnError::Fail`, a scan error surfaces as an error.
    #[tokio::test]
    async fn on_error_scanning_fail() -> Result<()> {
        let result = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Fail)
            .with_scan_errors(vec![1])
            .result()
            .await;

        assert!(result.is_err());

        Ok(())
    }

    /// With `OnError::Fail`, an open error surfaces as an error.
    #[tokio::test]
    async fn on_error_opening_fail() -> Result<()> {
        let result = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Fail)
            .with_open_errors(vec![1])
            .result()
            .await;

        assert!(result.is_err());

        Ok(())
    }

    /// With `OnError::Skip`, files whose scan fails are dropped and the rest
    /// are still scanned.
    #[tokio::test]
    async fn on_error_scanning() -> Result<()> {
        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Skip)
            .with_scan_errors(vec![0])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Skip)
            .with_scan_errors(vec![1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Skip)
            .with_scan_errors(vec![0, 1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "++",
            "++",
        ], &batches);

        Ok(())
    }

    /// Mixed open and scan failures under `OnError::Skip` leave only the
    /// healthy files' data.
    #[tokio::test]
    async fn on_error_mixed() -> Result<()> {
        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(3)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![1])
            .with_scan_errors(vec![0])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(3)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![0])
            .with_scan_errors(vec![1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(3)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![2])
            .with_scan_errors(vec![0, 1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "++",
            "++",
        ], &batches);

        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(3)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![0, 2])
            .with_scan_errors(vec![1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "++",
            "++",
        ], &batches);

        Ok(())
    }

    /// Without a limit, all batches of all files are returned.
    #[tokio::test]
    async fn without_limit() -> Result<()> {
        let batches = create_and_collect(None).await;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        Ok(())
    }

    /// A limit equal to one file's row count stops exactly after that file.
    #[tokio::test]
    async fn with_limit_between_files() -> Result<()> {
        let batches = create_and_collect(Some(5)).await;
        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        Ok(())
    }

    /// A limit that falls inside a batch slices that batch.
    #[tokio::test]
    async fn with_limit_at_middle_of_batch() -> Result<()> {
        let batches = create_and_collect(Some(6)).await;
        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "| 0 |",
            "+---+",
        ], &batches);

        Ok(())
    }
}