use std::collections::VecDeque;
use std::mem;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use crate::file_scan_config::{FileScanConfig, PartitionColumnProjector};
use crate::PartitionedFile;
use arrow::datatypes::SchemaRef;
use datafusion_common::error::Result;
use datafusion_execution::RecordBatchStream;
use datafusion_physical_plan::metrics::{
    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, Time,
};

use arrow::record_batch::RecordBatch;
use datafusion_common::instant::Instant;
use datafusion_common::ScalarValue;

use futures::future::BoxFuture;
use futures::stream::BoxStream;
use futures::{ready, FutureExt as _, Stream, StreamExt as _};

/// A stream that iterates record batch by record batch, file over file.
pub struct FileStream {
    /// An iterator over input files
    file_iter: VecDeque<PartitionedFile>,
    /// The stream schema (file schema including partition columns and after
    /// projection)
    projected_schema: SchemaRef,
    /// The remaining number of rows to read, `None` if no limit
    remain: Option<usize>,
    /// A dynamic [`FileOpener`]; calling `open()` returns a
    /// [`FileOpenFuture`], which resolves to a stream of `RecordBatch`
    file_opener: Arc<dyn FileOpener>,
    /// The partition column projector
    pc_projector: PartitionColumnProjector,
    /// The stream state
    state: FileStreamState,
    /// File stream specific metrics
    file_stream_metrics: FileStreamMetrics,
    /// Runtime baseline metrics
    baseline_metrics: BaselineMetrics,
    /// Describes the behavior of the stream when an error occurs
    on_error: OnError,
}

impl FileStream {
    /// Create a new `FileStream` using the given `FileOpener` to scan the
    /// files in `partition` of `config`
    pub fn new(
        config: &FileScanConfig,
        partition: usize,
        file_opener: Arc<dyn FileOpener>,
        metrics: &ExecutionPlanMetricsSet,
    ) -> Result<Self> {
        let projected_schema = config.projected_schema();
        let pc_projector = PartitionColumnProjector::new(
            Arc::clone(&projected_schema),
            &config
                .table_partition_cols()
                .iter()
                .map(|x| x.name().clone())
                .collect::<Vec<_>>(),
        );

        let file_group = config.file_groups[partition].clone();

        Ok(Self {
            file_iter: file_group.into_inner().into_iter().collect(),
            projected_schema,
            remain: config.limit,
            file_opener,
            pc_projector,
            state: FileStreamState::Idle,
            file_stream_metrics: FileStreamMetrics::new(metrics, partition),
            baseline_metrics: BaselineMetrics::new(metrics, partition),
            on_error: OnError::Fail,
        })
    }

    /// Specify the behavior when an error occurs opening or scanning a file
    ///
    /// If `OnError::Skip` the stream skips files which encounter an error and continues
    /// If `OnError::Fail` (the default) the stream returns the error and terminates
    pub fn with_on_error(mut self, on_error: OnError) -> Self {
        self.on_error = on_error;
        self
    }

    /// Begin opening the next file in parallel while decoding the current file.
    ///
    /// Since file opening is mostly IO (and may involve a bunch of sequential
    /// IO), it can be parallelized with decoding.
    fn start_next_file(&mut self) -> Option<Result<(FileOpenFuture, Vec<ScalarValue>)>> {
        let part_file = self.file_iter.pop_front()?;

        let partition_values = part_file.partition_values.clone();
        Some(
            self.file_opener
                .open(part_file)
                .map(|future| (future, partition_values)),
        )
    }

    fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<RecordBatch>>> {
        loop {
            match &mut self.state {
                FileStreamState::Idle => {
                    self.file_stream_metrics.time_opening.start();

                    match self.start_next_file().transpose() {
                        Ok(Some((future, partition_values))) => {
                            self.state = FileStreamState::Open {
                                future,
                                partition_values,
                            }
                        }
                        Ok(None) => return Poll::Ready(None),
                        Err(e) => {
                            self.state = FileStreamState::Error;
                            return Poll::Ready(Some(Err(e)));
                        }
                    }
                }
                FileStreamState::Open {
                    future,
                    partition_values,
                } => match ready!(future.poll_unpin(cx)) {
                    Ok(reader) => {
                        let partition_values = mem::take(partition_values);

                        self.file_stream_metrics.time_opening.stop();
                        // begin opening the next file in the background
                        let next = self.start_next_file().transpose();
                        self.file_stream_metrics.time_scanning_until_data.start();
                        self.file_stream_metrics.time_scanning_total.start();

                        match next {
                            Ok(Some((next_future, next_partition_values))) => {
                                self.state = FileStreamState::Scan {
                                    partition_values,
                                    reader,
                                    next: Some((
                                        NextOpen::Pending(next_future),
                                        next_partition_values,
                                    )),
                                };
                            }
                            Ok(None) => {
                                self.state = FileStreamState::Scan {
                                    reader,
                                    partition_values,
                                    next: None,
                                };
                            }
                            Err(e) => {
                                self.state = FileStreamState::Error;
                                return Poll::Ready(Some(Err(e)));
                            }
                        }
                    }
                    Err(e) => {
                        self.file_stream_metrics.file_open_errors.add(1);
                        match self.on_error {
                            OnError::Skip => {
                                self.file_stream_metrics.time_opening.stop();
                                self.state = FileStreamState::Idle
                            }
                            OnError::Fail => {
                                self.state = FileStreamState::Error;
                                return Poll::Ready(Some(Err(e)));
                            }
                        }
                    }
                },
                FileStreamState::Scan {
                    reader,
                    partition_values,
                    next,
                } => {
                    // Poll the next file's open future here so it makes
                    // progress while the current file is being scanned
                    if let Some((next_open_future, _)) = next {
                        if let NextOpen::Pending(f) = next_open_future {
                            if let Poll::Ready(reader) = f.as_mut().poll(cx) {
                                *next_open_future = NextOpen::Ready(reader);
                            }
                        }
                    }
                    match ready!(reader.poll_next_unpin(cx)) {
                        Some(Ok(batch)) => {
                            self.file_stream_metrics.time_scanning_until_data.stop();
                            self.file_stream_metrics.time_scanning_total.stop();
                            let result = self
                                .pc_projector
                                .project(batch, partition_values)
                                .map(|batch| match &mut self.remain {
                                    Some(remain) => {
                                        if *remain > batch.num_rows() {
                                            *remain -= batch.num_rows();
                                            batch
                                        } else {
                                            let batch = batch.slice(0, *remain);
                                            self.state = FileStreamState::Limit;
                                            *remain = 0;
                                            batch
                                        }
                                    }
                                    None => batch,
                                });

                            if result.is_err() {
                                // If the partition value projection fails, this
                                // is not governed by the `OnError` behavior
                                self.state = FileStreamState::Error
                            }
                            self.file_stream_metrics.time_scanning_total.start();
                            return Poll::Ready(Some(result));
                        }
                        Some(Err(err)) => {
                            self.file_stream_metrics.file_scan_errors.add(1);
                            self.file_stream_metrics.time_scanning_until_data.stop();
                            self.file_stream_metrics.time_scanning_total.stop();

                            match self.on_error {
                                // If `OnError::Skip`, move on to the next file
                                OnError::Skip => match mem::take(next) {
                                    Some((future, partition_values)) => {
                                        self.file_stream_metrics.time_opening.start();

                                        match future {
                                            NextOpen::Pending(future) => {
                                                self.state = FileStreamState::Open {
                                                    future,
                                                    partition_values,
                                                }
                                            }
                                            NextOpen::Ready(reader) => {
                                                self.state = FileStreamState::Open {
                                                    future: Box::pin(std::future::ready(
                                                        reader,
                                                    )),
                                                    partition_values,
                                                }
                                            }
                                        }
                                    }
                                    None => return Poll::Ready(None),
                                },
                                OnError::Fail => {
                                    self.state = FileStreamState::Error;
                                    return Poll::Ready(Some(Err(err)));
                                }
                            }
                        }
                        None => {
                            self.file_stream_metrics.time_scanning_until_data.stop();
                            self.file_stream_metrics.time_scanning_total.stop();

                            match mem::take(next) {
                                Some((future, partition_values)) => {
                                    self.file_stream_metrics.time_opening.start();

                                    match future {
                                        NextOpen::Pending(future) => {
                                            self.state = FileStreamState::Open {
                                                future,
                                                partition_values,
                                            }
                                        }
                                        NextOpen::Ready(reader) => {
                                            self.state = FileStreamState::Open {
                                                future: Box::pin(std::future::ready(
                                                    reader,
                                                )),
                                                partition_values,
                                            }
                                        }
                                    }
                                }
                                None => return Poll::Ready(None),
                            }
                        }
                    }
                }
                FileStreamState::Error | FileStreamState::Limit => {
                    return Poll::Ready(None)
                }
            }
        }
    }
}

impl Stream for FileStream {
    type Item = Result<RecordBatch>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        self.file_stream_metrics.time_processing.start();
        let result = self.poll_inner(cx);
        self.file_stream_metrics.time_processing.stop();
        self.baseline_metrics.record_poll(result)
    }
}

impl RecordBatchStream for FileStream {
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.projected_schema)
    }
}
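
// A minimal usage sketch, assuming `config` is a `FileScanConfig` built
// elsewhere, `opener` is an `Arc<dyn FileOpener>`, and the caller is inside an
// async context (none of these names are defined in this module):
//
//     let metrics = ExecutionPlanMetricsSet::new();
//     let stream = FileStream::new(&config, /* partition */ 0, opener, &metrics)?
//         .with_on_error(OnError::Skip);
//     let batches: Vec<Result<RecordBatch>> = stream.collect::<Vec<_>>().await;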

/// A fallible future that resolves to a stream of [`RecordBatch`]
pub type FileOpenFuture =
    BoxFuture<'static, Result<BoxStream<'static, Result<RecordBatch>>>>;

/// Describes the behavior of the `FileStream` if file opening or scanning fails
#[derive(Default)]
pub enum OnError {
    /// Fail the entire stream and return the underlying error
    #[default]
    Fail,
    /// Continue scanning, ignoring the failed file
    Skip,
}

/// Generic API for opening a file and resolving it to a stream of [`RecordBatch`]
pub trait FileOpener: Unpin + Send + Sync {
    /// Asynchronously open the specified file and return a stream of [`RecordBatch`]
    fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture>;
}
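
// A minimal `FileOpener` sketch; `fetch_batches` stands in for a hypothetical
// async helper returning `Result<BoxStream<'static, Result<RecordBatch>>>`.
// See `TestOpener` in the tests below for a concrete, runnable implementation.
//
//     struct SimpleOpener;
//
//     impl FileOpener for SimpleOpener {
//         fn open(&self, file: PartitionedFile) -> Result<FileOpenFuture> {
//             Ok(Box::pin(async move { fetch_batches(&file).await }))
//         }
//     }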

/// Represents the state of the next [`FileOpenFuture`]. Since we need to poll
/// this future while scanning the current file, we need to store the result
/// if it is ready
pub enum NextOpen {
    Pending(FileOpenFuture),
    Ready(Result<BoxStream<'static, Result<RecordBatch>>>),
}

pub enum FileStreamState {
    /// The idle state, no file is currently being read
    Idle,
    /// Currently performing asynchronous IO to obtain a stream of RecordBatch
    /// for a given file
    Open {
        /// A [`FileOpenFuture`] returned by [`FileOpener::open`]
        future: FileOpenFuture,
        /// The partition values for this file
        partition_values: Vec<ScalarValue>,
    },
    /// Scanning the [`BoxStream`] returned by the completion of a
    /// [`FileOpenFuture`] returned by [`FileOpener::open`]
    Scan {
        /// Partitioning column values for the current batch being scanned
        partition_values: Vec<ScalarValue>,
        /// The reader instance
        reader: BoxStream<'static, Result<RecordBatch>>,
        /// A [`FileOpenFuture`] for the next file to be processed, and its
        /// corresponding partition column values, if any. This allows the
        /// next file to be opened in parallel while the current file is read.
        next: Option<(NextOpen, Vec<ScalarValue>)>,
    },
    /// Encountered an error
    Error,
    /// Reached the row limit
    Limit,
}
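
// State transitions driven by `poll_inner` (descriptive sketch):
//
//   Idle -> Open    an open future was started for the next file
//   Open -> Scan    the open future resolved to a stream of record batches
//   Scan -> Scan    a batch was produced (or -> Limit once `remain` reaches 0)
//   Scan -> Open    the current file is exhausted and another file remains
//   any  -> Error   on failure, unless the file is skipped via `OnError::Skip`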

/// A timer that can be started and stopped.
pub struct StartableTime {
    pub metrics: Time,
    /// Time of the most recent `start` call, `None` if the timer is stopped.
    /// Elapsed time is accumulated into `metrics` on `stop`.
    pub start: Option<Instant>,
}

impl StartableTime {
    pub fn start(&mut self) {
        assert!(self.start.is_none());
        self.start = Some(Instant::now());
    }

    /// Stop the timer, recording the elapsed time since `start`.
    /// Does nothing if the timer is not started.
    pub fn stop(&mut self) {
        if let Some(start) = self.start.take() {
            self.metrics.add_elapsed(start);
        }
    }
}
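
// Typical `StartableTime` usage, a sketch assuming `time` is a `Time` metric
// created via `MetricBuilder` (as in `FileStreamMetrics::new` below):
//
//     let mut timer = StartableTime { metrics: time, start: None };
//     timer.start();
//     // ... timed work ...
//     timer.stop(); // elapsed wall clock time is added to `timer.metrics`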

#[allow(rustdoc::broken_intra_doc_links)]
/// Metrics for [`FileStream`]
pub struct FileStreamMetrics {
    /// Wall clock time elapsed for file opening.
    ///
    /// Time between when [`FileOpener::open`] is called and when the
    /// `FileStream` receives a stream for reading.
    ///
    /// If there are multiple files being scanned, the stream will open the
    /// next file in the background while scanning the current file. This
    /// metric only captures time spent opening while not also scanning.
    pub time_opening: StartableTime,
    /// Wall clock time elapsed for file scanning + first record batch of
    /// decompression + decoding
    pub time_scanning_until_data: StartableTime,
    /// Total elapsed wall clock time for scanning + record batch
    /// decompression / decoding
    pub time_scanning_total: StartableTime,
    /// Wall clock time elapsed for data decompression + decoding
    pub time_processing: StartableTime,
    /// Count of errors opening files.
    ///
    /// If using `OnError::Skip` this provides a count of the number of files
    /// which were skipped and will not be included in the scan results.
    pub file_open_errors: Count,
    /// Count of errors scanning files
    pub file_scan_errors: Count,
}

impl FileStreamMetrics {
    pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
        let time_opening = StartableTime {
            metrics: MetricBuilder::new(metrics)
                .subset_time("time_elapsed_opening", partition),
            start: None,
        };

        let time_scanning_until_data = StartableTime {
            metrics: MetricBuilder::new(metrics)
                .subset_time("time_elapsed_scanning_until_data", partition),
            start: None,
        };

        let time_scanning_total = StartableTime {
            metrics: MetricBuilder::new(metrics)
                .subset_time("time_elapsed_scanning_total", partition),
            start: None,
        };

        let time_processing = StartableTime {
            metrics: MetricBuilder::new(metrics)
                .subset_time("time_elapsed_processing", partition),
            start: None,
        };

        let file_open_errors =
            MetricBuilder::new(metrics).counter("file_open_errors", partition);

        let file_scan_errors =
            MetricBuilder::new(metrics).counter("file_scan_errors", partition);

        Self {
            time_opening,
            time_scanning_until_data,
            time_scanning_total,
            time_processing,
            file_open_errors,
            file_scan_errors,
        }
    }
}

#[cfg(test)]
mod tests {
    use crate::file_scan_config::FileScanConfigBuilder;
    use crate::tests::make_partition;
    use crate::PartitionedFile;
    use datafusion_common::error::Result;
    use datafusion_execution::object_store::ObjectStoreUrl;
    use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
    use futures::{FutureExt as _, StreamExt as _};
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    use crate::file_stream::{FileOpenFuture, FileOpener, FileStream, OnError};
    use crate::test_util::MockSource;
    use arrow::array::RecordBatch;
    use arrow::datatypes::Schema;

    use datafusion_common::{assert_batches_eq, exec_err, internal_err};

    /// A [`FileOpener`] that either returns the configured record batches or
    /// fails at a configured index while opening or scanning
    #[derive(Default)]
    struct TestOpener {
        /// Index in the stream of files which should throw an error while opening
        error_opening_idx: Vec<usize>,
        /// Index in the stream of files which should throw an error while scanning
        error_scanning_idx: Vec<usize>,
        /// Index of the last file in the stream
        current_idx: AtomicUsize,
        /// `RecordBatch`es to return from each successfully opened file
        records: Vec<RecordBatch>,
    }

    impl FileOpener for TestOpener {
        fn open(&self, _partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
            let idx = self.current_idx.fetch_add(1, Ordering::SeqCst);

            if self.error_opening_idx.contains(&idx) {
                Ok(futures::future::ready(internal_err!("error opening")).boxed())
            } else if self.error_scanning_idx.contains(&idx) {
                let error = futures::future::ready(exec_err!("error scanning"));
                let stream = futures::stream::once(error).boxed();
                Ok(futures::future::ready(Ok(stream)).boxed())
            } else {
                let iterator = self.records.clone().into_iter().map(Ok);
                let stream = futures::stream::iter(iterator).boxed();
                Ok(futures::future::ready(Ok(stream)).boxed())
            }
        }
    }

    #[derive(Default)]
    struct FileStreamTest {
        /// Number of files in the stream
        num_files: usize,
        /// Global limit of records emitted by the stream
        limit: Option<usize>,
        /// Error-handling behavior of the stream
        on_error: OnError,
        /// Mock `FileOpener`
        opener: TestOpener,
    }

    impl FileStreamTest {
        pub fn new() -> Self {
            Self::default()
        }

        /// Specify the number of files in the stream
        pub fn with_num_files(mut self, num_files: usize) -> Self {
            self.num_files = num_files;
            self
        }

        /// Specify the limit
        pub fn with_limit(mut self, limit: Option<usize>) -> Self {
            self.limit = limit;
            self
        }

        /// Specify the index of files in the stream which should
        /// throw an error while opening
        pub fn with_open_errors(mut self, idx: Vec<usize>) -> Self {
            self.opener.error_opening_idx = idx;
            self
        }

        /// Specify the index of files in the stream which should
        /// throw an error while scanning
        pub fn with_scan_errors(mut self, idx: Vec<usize>) -> Self {
            self.opener.error_scanning_idx = idx;
            self
        }

        /// Specify the behavior of the stream when an error occurs
        pub fn with_on_error(mut self, on_error: OnError) -> Self {
            self.on_error = on_error;
            self
        }

        /// Specify the record batches that should be returned from each
        /// file that is successfully scanned
        pub fn with_records(mut self, records: Vec<RecordBatch>) -> Self {
            self.opener.records = records;
            self
        }

        /// Collect the results of the `FileStream`
        pub async fn result(self) -> Result<Vec<RecordBatch>> {
            let file_schema = self
                .opener
                .records
                .first()
                .map(|batch| batch.schema())
                .unwrap_or_else(|| Arc::new(Schema::empty()));

            let mock_files: Vec<(String, u64)> = (0..self.num_files)
                .map(|idx| (format!("mock_file{idx}"), 10_u64))
                .collect();

            let file_group = mock_files
                .into_iter()
                .map(|(name, size)| PartitionedFile::new(name, size))
                .collect();

            let on_error = self.on_error;

            let config = FileScanConfigBuilder::new(
                ObjectStoreUrl::parse("test:///").unwrap(),
                file_schema,
                Arc::new(MockSource::default()),
            )
            .with_file_group(file_group)
            .with_limit(self.limit)
            .build();
            let metrics_set = ExecutionPlanMetricsSet::new();
            let file_stream =
                FileStream::new(&config, 0, Arc::new(self.opener), &metrics_set)
                    .unwrap()
                    .with_on_error(on_error);

            file_stream
                .collect::<Vec<_>>()
                .await
                .into_iter()
                .collect::<Result<Vec<_>>>()
        }
    }

    /// Creates a stream over 2 files, each returning the same pair of batches
    /// ([0, 1, 2] and [0, 1]), and collects the results under the given limit
    async fn create_and_collect(limit: Option<usize>) -> Vec<RecordBatch> {
        FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_limit(limit)
            .result()
            .await
            .expect("error executing stream")
    }

    #[tokio::test]
    async fn on_error_opening() -> Result<()> {
        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![0])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![0, 1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "++",
            "++",
        ], &batches);

        Ok(())
    }

    #[tokio::test]
    async fn on_error_scanning_fail() -> Result<()> {
        let result = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Fail)
            .with_scan_errors(vec![1])
            .result()
            .await;

        assert!(result.is_err());

        Ok(())
    }

    #[tokio::test]
    async fn on_error_opening_fail() -> Result<()> {
        let result = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Fail)
            .with_open_errors(vec![1])
            .result()
            .await;

        assert!(result.is_err());

        Ok(())
    }

    #[tokio::test]
    async fn on_error_scanning() -> Result<()> {
        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Skip)
            .with_scan_errors(vec![0])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Skip)
            .with_scan_errors(vec![1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Skip)
            .with_scan_errors(vec![0, 1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "++",
            "++",
        ], &batches);

        Ok(())
    }

    #[tokio::test]
    async fn on_error_mixed() -> Result<()> {
        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(3)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![1])
            .with_scan_errors(vec![0])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(3)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![0])
            .with_scan_errors(vec![1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(3)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![2])
            .with_scan_errors(vec![0, 1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "++",
            "++",
        ], &batches);

        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(3)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![0, 2])
            .with_scan_errors(vec![1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "++",
            "++",
        ], &batches);

        Ok(())
    }

    #[tokio::test]
    async fn without_limit() -> Result<()> {
        let batches = create_and_collect(None).await;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        Ok(())
    }

    #[tokio::test]
    async fn with_limit_between_files() -> Result<()> {
        let batches = create_and_collect(Some(5)).await;
        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        Ok(())
    }

    #[tokio::test]
    async fn with_limit_at_middle_of_batch() -> Result<()> {
        let batches = create_and_collect(Some(6)).await;
        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "| 0 |",
            "+---+",
        ], &batches);

        Ok(())
    }
}