use std::collections::VecDeque;
use std::mem;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use crate::file_meta::FileMeta;
use crate::file_scan_config::{FileScanConfig, PartitionColumnProjector};
use crate::PartitionedFile;
use arrow::datatypes::SchemaRef;
use datafusion_common::error::Result;
use datafusion_execution::RecordBatchStream;
use datafusion_physical_plan::metrics::{
    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, Time,
};

use arrow::record_batch::RecordBatch;
use datafusion_common::instant::Instant;
use datafusion_common::ScalarValue;

use futures::future::BoxFuture;
use futures::stream::BoxStream;
use futures::{ready, FutureExt as _, Stream, StreamExt as _};

/// A stream that iterates record batch by record batch, file over file.
pub struct FileStream {
    /// An iterator over input files.
    file_iter: VecDeque<PartitionedFile>,
    /// The stream schema (file schema including partition columns and after
    /// projection).
    projected_schema: SchemaRef,
    /// The remaining number of records to parse, `None` if no limit.
    remain: Option<usize>,
    /// A dynamic [`FileOpener`]. Calling `open()` returns a [`FileOpenFuture`],
    /// which can be resolved to a stream of `RecordBatch`.
    file_opener: Arc<dyn FileOpener>,
    /// The partition column projector.
    pc_projector: PartitionColumnProjector,
    /// The stream state.
    state: FileStreamState,
    /// File stream specific metrics.
    file_stream_metrics: FileStreamMetrics,
    /// Runtime baseline metrics.
    baseline_metrics: BaselineMetrics,
    /// Describes the behavior of the stream when file opening or scanning fails.
    on_error: OnError,
}

impl FileStream {
    /// Create a new `FileStream` using the given `FileOpener` to scan
    /// underlying files.
    pub fn new(
        config: &FileScanConfig,
        partition: usize,
        file_opener: Arc<dyn FileOpener>,
        metrics: &ExecutionPlanMetricsSet,
    ) -> Result<Self> {
        let projected_schema = config.projected_schema();
        let pc_projector = PartitionColumnProjector::new(
            Arc::clone(&projected_schema),
            &config
                .table_partition_cols
                .iter()
                .map(|x| x.name().clone())
                .collect::<Vec<_>>(),
        );

        let file_group = config.file_groups[partition].clone();

        Ok(Self {
            file_iter: file_group.into_inner().into_iter().collect(),
            projected_schema,
            remain: config.limit,
            file_opener,
            pc_projector,
            state: FileStreamState::Idle,
            file_stream_metrics: FileStreamMetrics::new(metrics, partition),
            baseline_metrics: BaselineMetrics::new(metrics, partition),
            on_error: OnError::Fail,
        })
    }

    /// Specify the behavior of the stream when an error occurs while opening
    /// or scanning a file: fail the whole stream (the default) or skip the
    /// offending file and continue with the next one.
    pub fn with_on_error(mut self, on_error: OnError) -> Self {
        self.on_error = on_error;
        self
    }

    /// Begin opening the next file in parallel while decoding the current file
    /// in the stream.
    ///
    /// Returns `None` if there are no more files to open.
    fn start_next_file(&mut self) -> Option<Result<(FileOpenFuture, Vec<ScalarValue>)>> {
        let part_file = self.file_iter.pop_front()?;

        let file_meta = FileMeta {
            object_meta: part_file.object_meta.clone(),
            range: part_file.range.clone(),
            extensions: part_file.extensions.clone(),
            metadata_size_hint: part_file.metadata_size_hint,
        };

        let partition_values = part_file.partition_values.clone();
        Some(
            self.file_opener
                .open(file_meta, part_file)
                .map(|future| (future, partition_values)),
        )
    }

    fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<RecordBatch>>> {
        loop {
            match &mut self.state {
                FileStreamState::Idle => {
                    self.file_stream_metrics.time_opening.start();

                    match self.start_next_file().transpose() {
                        Ok(Some((future, partition_values))) => {
                            self.state = FileStreamState::Open {
                                future,
                                partition_values,
                            }
                        }
                        Ok(None) => return Poll::Ready(None),
                        Err(e) => {
                            self.state = FileStreamState::Error;
                            return Poll::Ready(Some(Err(e)));
                        }
                    }
                }
                FileStreamState::Open {
                    future,
                    partition_values,
                } => match ready!(future.poll_unpin(cx)) {
                    Ok(reader) => {
                        let partition_values = mem::take(partition_values);

                        // Begin opening the next file in the background while
                        // this one is scanned
                        self.file_stream_metrics.time_opening.stop();
                        let next = self.start_next_file().transpose();
                        self.file_stream_metrics.time_scanning_until_data.start();
                        self.file_stream_metrics.time_scanning_total.start();

                        match next {
                            Ok(Some((next_future, next_partition_values))) => {
                                self.state = FileStreamState::Scan {
                                    partition_values,
                                    reader,
                                    next: Some((
                                        NextOpen::Pending(next_future),
                                        next_partition_values,
                                    )),
                                };
                            }
                            Ok(None) => {
                                self.state = FileStreamState::Scan {
                                    reader,
                                    partition_values,
                                    next: None,
                                };
                            }
                            Err(e) => {
                                self.state = FileStreamState::Error;
                                return Poll::Ready(Some(Err(e)));
                            }
                        }
                    }
                    Err(e) => {
                        self.file_stream_metrics.file_open_errors.add(1);
                        match self.on_error {
                            OnError::Skip => {
                                self.file_stream_metrics.time_opening.stop();
                                self.state = FileStreamState::Idle
                            }
                            OnError::Fail => {
                                self.state = FileStreamState::Error;
                                return Poll::Ready(Some(Err(e)));
                            }
                        }
                    }
                },
                FileStreamState::Scan {
                    reader,
                    partition_values,
                    next,
                } => {
                    // Poll the next `FileOpenFuture` here to drive it forward
                    // while the current file is being scanned
                    if let Some((next_open_future, _)) = next {
                        if let NextOpen::Pending(f) = next_open_future {
                            if let Poll::Ready(reader) = f.as_mut().poll(cx) {
                                *next_open_future = NextOpen::Ready(reader);
                            }
                        }
                    }
                    match ready!(reader.poll_next_unpin(cx)) {
                        Some(Ok(batch)) => {
                            self.file_stream_metrics.time_scanning_until_data.stop();
                            self.file_stream_metrics.time_scanning_total.stop();
                            let result = self
                                .pc_projector
                                .project(batch, partition_values)
                                .map(|batch| match &mut self.remain {
                                    Some(remain) => {
                                        if *remain > batch.num_rows() {
                                            *remain -= batch.num_rows();
                                            batch
                                        } else {
                                            let batch = batch.slice(0, *remain);
                                            self.state = FileStreamState::Limit;
                                            *remain = 0;
                                            batch
                                        }
                                    }
                                    None => batch,
                                });

                            if result.is_err() {
                                // A projection error is terminal for the
                                // stream, regardless of the `OnError` mode
                                self.state = FileStreamState::Error
                            }
                            self.file_stream_metrics.time_scanning_total.start();
                            return Poll::Ready(Some(result));
                        }
                        Some(Err(err)) => {
                            self.file_stream_metrics.file_scan_errors.add(1);
                            self.file_stream_metrics.time_scanning_until_data.stop();
                            self.file_stream_metrics.time_scanning_total.stop();

                            match self.on_error {
                                // Skip the rest of this file and move on to
                                // the next one, if any
                                OnError::Skip => match mem::take(next) {
                                    Some((future, partition_values)) => {
                                        self.file_stream_metrics.time_opening.start();

                                        match future {
                                            NextOpen::Pending(future) => {
                                                self.state = FileStreamState::Open {
                                                    future,
                                                    partition_values,
                                                }
                                            }
                                            NextOpen::Ready(reader) => {
                                                self.state = FileStreamState::Open {
                                                    future: Box::pin(std::future::ready(
                                                        reader,
                                                    )),
                                                    partition_values,
                                                }
                                            }
                                        }
                                    }
                                    None => return Poll::Ready(None),
                                },
                                OnError::Fail => {
                                    self.state = FileStreamState::Error;
                                    return Poll::Ready(Some(Err(err)));
                                }
                            }
                        }
                        None => {
                            self.file_stream_metrics.time_scanning_until_data.stop();
                            self.file_stream_metrics.time_scanning_total.stop();

                            match mem::take(next) {
                                Some((future, partition_values)) => {
                                    self.file_stream_metrics.time_opening.start();

                                    match future {
                                        NextOpen::Pending(future) => {
                                            self.state = FileStreamState::Open {
                                                future,
                                                partition_values,
                                            }
                                        }
                                        NextOpen::Ready(reader) => {
                                            self.state = FileStreamState::Open {
                                                future: Box::pin(std::future::ready(
                                                    reader,
                                                )),
                                                partition_values,
                                            }
                                        }
                                    }
                                }
                                None => return Poll::Ready(None),
                            }
                        }
                    }
                }
                FileStreamState::Error | FileStreamState::Limit => {
                    return Poll::Ready(None)
                }
            }
        }
    }
}

impl Stream for FileStream {
    type Item = Result<RecordBatch>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        self.file_stream_metrics.time_processing.start();
        let result = self.poll_inner(cx);
        self.file_stream_metrics.time_processing.stop();
        self.baseline_metrics.record_poll(result)
    }
}

impl RecordBatchStream for FileStream {
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.projected_schema)
    }
}

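// Consumption sketch: `FileStream` is an ordinary `Stream` of
// `Result<RecordBatch>`, so it can be drained with `StreamExt` combinators.
// The `config`, `opener`, and `metrics` values below are assumed to be in
// scope; the tests at the bottom of this file drive the stream the same way:
//
//   let stream = FileStream::new(&config, /* partition */ 0, opener, &metrics)?
//       .with_on_error(OnError::Skip);
//   let batches = stream
//       .collect::<Vec<_>>()
//       .await
//       .into_iter()
//       .collect::<Result<Vec<_>>>()?;
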
/// A fallible future that resolves to a stream of [`RecordBatch`].
pub type FileOpenFuture =
    BoxFuture<'static, Result<BoxStream<'static, Result<RecordBatch>>>>;

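// A minimal sketch of building a `FileOpenFuture` by hand, with no IO at all
// (assuming a `batch: RecordBatch` is in scope). The batch stream is boxed
// first, then wrapped in an immediately-ready boxed future:
//
//   let stream = futures::stream::iter([Ok(batch)]).boxed();
//   let open_future: FileOpenFuture = futures::future::ready(Ok(stream)).boxed();
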
/// Describes the behavior of the [`FileStream`] if file opening or scanning fails.
pub enum OnError {
    /// Fail the entire stream and return the underlying error.
    Fail,
    /// Continue scanning, ignoring the failed file.
    Skip,
}

impl Default for OnError {
    fn default() -> Self {
        Self::Fail
    }
}

/// Generic API for opening a file and resolving it to a stream of [`RecordBatch`].
pub trait FileOpener: Unpin + Send + Sync {
    /// Asynchronously open the specified file and return a stream of
    /// [`RecordBatch`].
    fn open(&self, file_meta: FileMeta, file: PartitionedFile) -> Result<FileOpenFuture>;
}

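// A minimal sketch of a `FileOpener` that serves one pre-loaded batch and
// never touches object storage. `InMemoryOpener` is an illustrative name, not
// part of this crate; `TestOpener` in the tests below is the variant actually
// exercised here:
//
//   struct InMemoryOpener {
//       batch: RecordBatch,
//   }
//
//   impl FileOpener for InMemoryOpener {
//       fn open(&self, _meta: FileMeta, _file: PartitionedFile) -> Result<FileOpenFuture> {
//           // A one-element stream of the batch, wrapped in a ready future,
//           // matching the `FileOpenFuture` shape
//           let stream = futures::stream::iter([Ok(self.batch.clone())]).boxed();
//           Ok(futures::future::ready(Ok(stream)).boxed())
//       }
//   }
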
/// Represents the state of the next [`FileOpenFuture`]. Since we need to poll
/// this future while scanning the current file, we need to store the result
/// once it is ready.
pub enum NextOpen {
    Pending(FileOpenFuture),
    Ready(Result<BoxStream<'static, Result<RecordBatch>>>),
}

/// The state machine driven by [`FileStream`].
pub enum FileStreamState {
    /// The idle state, no file is currently being read.
    Idle,
    /// Currently performing asynchronous IO to obtain a stream of `RecordBatch`
    /// for a given file.
    Open {
        /// A [`FileOpenFuture`] returned by [`FileOpener::open`].
        future: FileOpenFuture,
        /// The partition values for this file.
        partition_values: Vec<ScalarValue>,
    },
    /// Scanning the [`BoxStream`] returned by the completion of a
    /// [`FileOpenFuture`] returned by [`FileOpener::open`].
    Scan {
        /// Partitioning column values for the current batch iterator.
        partition_values: Vec<ScalarValue>,
        /// The reader instance.
        reader: BoxStream<'static, Result<RecordBatch>>,
        /// A [`FileOpenFuture`] for the next file to be processed, and its
        /// corresponding partition column values, if any. This allows the
        /// next file to be opened in parallel while the current file is read.
        next: Option<(NextOpen, Vec<ScalarValue>)>,
    },
    /// Encountered an error.
    Error,
    /// Reached the row limit.
    Limit,
}

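// State transitions driven by `poll_inner`, for orientation:
//
//   Idle --(start_next_file)--> Open --(reader ready)--> Scan
//   Scan --(reader exhausted, `next` queued)--> Open
//   Scan --(error, OnError::Skip, `next` queued)--> Open
//   Scan --(limit reached)--> Limit
//   Open --(error, OnError::Skip)--> Idle
//   Open/Scan --(error, OnError::Fail)--> Error
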
/// A timer that can be started and stopped, accumulating elapsed wall clock
/// time into a [`Time`] metric.
pub struct StartableTime {
    pub metrics: Time,
    /// Records when the current measurement started; the elapsed time is
    /// added to `metrics` on `stop()`.
    pub start: Option<Instant>,
}

impl StartableTime {
    pub fn start(&mut self) {
        assert!(self.start.is_none());
        self.start = Some(Instant::now());
    }

    pub fn stop(&mut self) {
        if let Some(start) = self.start.take() {
            self.metrics.add_elapsed(start);
        }
    }
}

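// Usage sketch: `start()` panics if the timer is already running, but `stop()`
// on a stopped timer is a no-op, which is what lets the state machine above
// call `time_scanning_*.stop()` unconditionally:
//
//   let mut t = StartableTime { metrics: Time::new(), start: None };
//   t.start();
//   // ... timed work ...
//   t.stop(); // accumulates the elapsed time into `t.metrics`
//   t.stop(); // no-op
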
/// Metrics for [`FileStream`].
///
/// Note that all of these metrics are in terms of wall clock time (not CPU
/// time), so they include time spent waiting on IO as well as other operators.
#[allow(rustdoc::broken_intra_doc_links)]
pub struct FileStreamMetrics {
    /// Wall clock time elapsed for file opening: the time between when
    /// [`FileOpener::open`] is called and when the `FileStream` receives a
    /// stream for reading. When multiple files are scanned, the next file is
    /// opened in the background; this metric only captures time spent opening
    /// while not also scanning.
    pub time_opening: StartableTime,
    /// Wall clock time elapsed between requesting data from the stream and
    /// the first [`RecordBatch`] being produced (scanning, decompression,
    /// and decoding of the first batch).
    pub time_scanning_until_data: StartableTime,
    /// Total wall clock time elapsed producing all record batches in the
    /// stream. Note that this also includes the time of the parent
    /// operator's execution.
    pub time_scanning_total: StartableTime,
    /// Total wall clock time spent inside `poll_next`, covering the opening
    /// and scanning work done by this stream.
    pub time_processing: StartableTime,
    /// Count of errors opening files. With [`OnError::Skip`], this is the
    /// number of files skipped and excluded from the scan results.
    pub file_open_errors: Count,
    /// Count of errors scanning files. With [`OnError::Skip`], this is the
    /// number of files skipped and excluded from the scan results.
    pub file_scan_errors: Count,
}

impl FileStreamMetrics {
    pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
        let time_opening = StartableTime {
            metrics: MetricBuilder::new(metrics)
                .subset_time("time_elapsed_opening", partition),
            start: None,
        };

        let time_scanning_until_data = StartableTime {
            metrics: MetricBuilder::new(metrics)
                .subset_time("time_elapsed_scanning_until_data", partition),
            start: None,
        };

        let time_scanning_total = StartableTime {
            metrics: MetricBuilder::new(metrics)
                .subset_time("time_elapsed_scanning_total", partition),
            start: None,
        };

        let time_processing = StartableTime {
            metrics: MetricBuilder::new(metrics)
                .subset_time("time_elapsed_processing", partition),
            start: None,
        };

        let file_open_errors =
            MetricBuilder::new(metrics).counter("file_open_errors", partition);

        let file_scan_errors =
            MetricBuilder::new(metrics).counter("file_scan_errors", partition);

        Self {
            time_opening,
            time_scanning_until_data,
            time_scanning_total,
            time_processing,
            file_open_errors,
            file_scan_errors,
        }
    }
}

#[cfg(test)]
mod tests {
    use crate::file_scan_config::FileScanConfigBuilder;
    use crate::tests::make_partition;
    use crate::PartitionedFile;
    use datafusion_common::error::Result;
    use datafusion_execution::object_store::ObjectStoreUrl;
    use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
    use futures::{FutureExt as _, StreamExt as _};
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    use crate::file_meta::FileMeta;
    use crate::file_stream::{FileOpenFuture, FileOpener, FileStream, OnError};
    use crate::test_util::MockSource;
    use arrow::array::RecordBatch;
    use arrow::datatypes::Schema;

    use datafusion_common::{assert_batches_eq, exec_err, internal_err};

    /// A [`FileOpener`] that injects errors at configurable positions in the
    /// file list.
    #[derive(Default)]
    struct TestOpener {
        /// Indexes of files that should fail while opening.
        error_opening_idx: Vec<usize>,
        /// Indexes of files that should fail while scanning.
        error_scanning_idx: Vec<usize>,
        /// Index of the file currently being opened.
        current_idx: AtomicUsize,
        /// The batches that each (successfully opened) file yields.
        records: Vec<RecordBatch>,
    }

    impl FileOpener for TestOpener {
        fn open(
            &self,
            _file_meta: FileMeta,
            _file: PartitionedFile,
        ) -> Result<FileOpenFuture> {
            let idx = self.current_idx.fetch_add(1, Ordering::SeqCst);

            if self.error_opening_idx.contains(&idx) {
                Ok(futures::future::ready(internal_err!("error opening")).boxed())
            } else if self.error_scanning_idx.contains(&idx) {
                let error = futures::future::ready(exec_err!("error scanning"));
                let stream = futures::stream::once(error).boxed();
                Ok(futures::future::ready(Ok(stream)).boxed())
            } else {
                let iterator = self.records.clone().into_iter().map(Ok);
                let stream = futures::stream::iter(iterator).boxed();
                Ok(futures::future::ready(Ok(stream)).boxed())
            }
        }
    }

    /// Builder for `FileStream` test cases.
    #[derive(Default)]
    struct FileStreamTest {
        /// Number of files in the stream.
        num_files: usize,
        /// Global limit on the number of emitted records.
        limit: Option<usize>,
        /// Error-handling behavior of the stream.
        on_error: OnError,
        /// Mock `FileOpener`.
        opener: TestOpener,
    }

    impl FileStreamTest {
        pub fn new() -> Self {
            Self::default()
        }

        /// Specify the number of files in the stream.
        pub fn with_num_files(mut self, num_files: usize) -> Self {
            self.num_files = num_files;
            self
        }

        /// Specify the global record limit.
        pub fn with_limit(mut self, limit: Option<usize>) -> Self {
            self.limit = limit;
            self
        }

        /// Specify the indexes of files that will return an error while opening.
        pub fn with_open_errors(mut self, idx: Vec<usize>) -> Self {
            self.opener.error_opening_idx = idx;
            self
        }

        /// Specify the indexes of files that will return an error while scanning.
        pub fn with_scan_errors(mut self, idx: Vec<usize>) -> Self {
            self.opener.error_scanning_idx = idx;
            self
        }

        /// Specify the behavior of the stream when an error occurs.
        pub fn with_on_error(mut self, on_error: OnError) -> Self {
            self.on_error = on_error;
            self
        }

        /// Specify the record batches that each file will yield.
        pub fn with_records(mut self, records: Vec<RecordBatch>) -> Self {
            self.opener.records = records;
            self
        }

        /// Collect the batches resulting from the stream.
        pub async fn result(self) -> Result<Vec<RecordBatch>> {
            let file_schema = self
                .opener
                .records
                .first()
                .map(|batch| batch.schema())
                .unwrap_or_else(|| Arc::new(Schema::empty()));

            let mock_files: Vec<(String, u64)> = (0..self.num_files)
                .map(|idx| (format!("mock_file{idx}"), 10_u64))
                .collect();

            let file_group = mock_files
                .into_iter()
                .map(|(name, size)| PartitionedFile::new(name, size))
                .collect();

            let on_error = self.on_error;

            let config = FileScanConfigBuilder::new(
                ObjectStoreUrl::parse("test:///").unwrap(),
                file_schema,
                Arc::new(MockSource::default()),
            )
            .with_file_group(file_group)
            .with_limit(self.limit)
            .build();
            let metrics_set = ExecutionPlanMetricsSet::new();
            let file_stream =
                FileStream::new(&config, 0, Arc::new(self.opener), &metrics_set)
                    .unwrap()
                    .with_on_error(on_error);

            file_stream
                .collect::<Vec<_>>()
                .await
                .into_iter()
                .collect::<Result<Vec<_>>>()
        }
    }

    /// Helper that creates a stream over 2 files, each yielding the same pair
    /// of batches ([0, 1, 2] and [0, 1]).
    async fn create_and_collect(limit: Option<usize>) -> Vec<RecordBatch> {
        FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_limit(limit)
            .result()
            .await
            .expect("error executing stream")
    }

    #[tokio::test]
    async fn on_error_opening() -> Result<()> {
        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![0])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![0, 1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "++",
            "++",
        ], &batches);

        Ok(())
    }

    #[tokio::test]
    async fn on_error_scanning_fail() -> Result<()> {
        let result = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Fail)
            .with_scan_errors(vec![1])
            .result()
            .await;

        assert!(result.is_err());

        Ok(())
    }

    #[tokio::test]
    async fn on_error_opening_fail() -> Result<()> {
        let result = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Fail)
            .with_open_errors(vec![1])
            .result()
            .await;

        assert!(result.is_err());

        Ok(())
    }

    #[tokio::test]
    async fn on_error_scanning() -> Result<()> {
        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Skip)
            .with_scan_errors(vec![0])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Skip)
            .with_scan_errors(vec![1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Skip)
            .with_scan_errors(vec![0, 1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "++",
            "++",
        ], &batches);

        Ok(())
    }

    #[tokio::test]
    async fn on_error_mixed() -> Result<()> {
        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(3)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![1])
            .with_scan_errors(vec![0])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(3)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![0])
            .with_scan_errors(vec![1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(3)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![2])
            .with_scan_errors(vec![0, 1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "++",
            "++",
        ], &batches);

        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(3)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![0, 2])
            .with_scan_errors(vec![1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "++",
            "++",
        ], &batches);

        Ok(())
    }

    #[tokio::test]
    async fn without_limit() -> Result<()> {
        let batches = create_and_collect(None).await;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        Ok(())
    }

    #[tokio::test]
    async fn with_limit_between_files() -> Result<()> {
        let batches = create_and_collect(Some(5)).await;
        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        Ok(())
    }

    #[tokio::test]
    async fn with_limit_at_middle_of_batch() -> Result<()> {
        let batches = create_and_collect(Some(6)).await;
        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "| 0 |",
            "+---+",
        ], &batches);

        Ok(())
    }
}