use std::collections::VecDeque;
use std::mem;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use crate::file_meta::FileMeta;
use crate::file_scan_config::{FileScanConfig, PartitionColumnProjector};
use crate::PartitionedFile;
use arrow::datatypes::SchemaRef;
use datafusion_common::error::Result;
use datafusion_execution::RecordBatchStream;
use datafusion_physical_plan::metrics::{
    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, Time,
};

use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use datafusion_common::instant::Instant;
use datafusion_common::ScalarValue;

use futures::future::BoxFuture;
use futures::stream::BoxStream;
use futures::{ready, FutureExt as _, Stream, StreamExt as _};

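/// A stream that iterates record batches from a list of files, one file at a
/// time, while opening the next file in the background.
///
/// A sketch of typical usage, assuming an existing [`FileScanConfig`] and a
/// concrete [`FileOpener`] implementation (`config` and `my_opener` are
/// illustrative names, not part of this module):
///
/// ```ignore
/// let metrics = ExecutionPlanMetricsSet::new();
/// // Build a stream over partition 0 of the scan configuration.
/// let stream = FileStream::new(&config, 0, Arc::new(my_opener), &metrics)?
///     // Optionally skip files that fail to open or scan.
///     .with_on_error(OnError::Skip);
/// // `FileStream` implements `Stream<Item = Result<RecordBatch>>`.
/// let batches: Vec<_> = stream.collect::<Vec<_>>().await;
/// ```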
pub struct FileStream {
    /// The files to read, in order
    file_iter: VecDeque<PartitionedFile>,
    /// The schema of the output batches, including projected partition columns
    projected_schema: SchemaRef,
    /// The remaining number of rows to read, if a limit was set
    remain: Option<usize>,
    /// Opens each file and produces a stream of its record batches
    file_opener: Arc<dyn FileOpener>,
    /// Appends the partition column values to each projected batch
    pc_projector: PartitionColumnProjector,
    /// The current state of the stream's internal state machine
    state: FileStreamState,
    /// File stream specific metrics (open / scan times, error counts)
    file_stream_metrics: FileStreamMetrics,
    /// Common execution metrics (output rows, poll times)
    baseline_metrics: BaselineMetrics,
    /// Describes the behavior of the stream when a file cannot be opened or scanned
    on_error: OnError,
}

impl FileStream {
    /// Create a new `FileStream` that reads the files of the given
    /// `partition` in `config`, using `file_opener` to open each file.
    pub fn new(
        config: &FileScanConfig,
        partition: usize,
        file_opener: Arc<dyn FileOpener>,
        metrics: &ExecutionPlanMetricsSet,
    ) -> Result<Self> {
        let (projected_schema, ..) = config.project();
        let pc_projector = PartitionColumnProjector::new(
            Arc::clone(&projected_schema),
            &config
                .table_partition_cols
                .iter()
                .map(|x| x.name().clone())
                .collect::<Vec<_>>(),
        );

        let files = config.file_groups[partition].clone();

        Ok(Self {
            file_iter: files.into(),
            projected_schema,
            remain: config.limit,
            file_opener,
            pc_projector,
            state: FileStreamState::Idle,
            file_stream_metrics: FileStreamMetrics::new(metrics, partition),
            baseline_metrics: BaselineMetrics::new(metrics, partition),
            on_error: OnError::Fail,
        })
    }

    /// Specify the behavior when an error occurs opening or scanning a file.
    ///
    /// If `OnError::Skip` the stream skips files which encounter an error and
    /// continues. If `OnError::Fail` (the default) the stream fails and stops
    /// processing as soon as an error occurs.
    pub fn with_on_error(mut self, on_error: OnError) -> Self {
        self.on_error = on_error;
        self
    }

    /// Begin opening the next file, returning its [`FileOpenFuture`] and
    /// partition column values, or `None` if there are no more files.
    ///
    /// Since file opening is mostly IO, it can proceed in parallel with
    /// decoding of the current file.
    fn start_next_file(&mut self) -> Option<Result<(FileOpenFuture, Vec<ScalarValue>)>> {
        let part_file = self.file_iter.pop_front()?;

        let file_meta = FileMeta {
            object_meta: part_file.object_meta,
            range: part_file.range,
            extensions: part_file.extensions,
            metadata_size_hint: part_file.metadata_size_hint,
        };

        Some(
            self.file_opener
                .open(file_meta)
                .map(|future| (future, part_file.partition_values)),
        )
    }

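    /// Drive the stream's internal state machine.
    ///
    /// * `Idle`: start opening the first file, or finish if there are none
    /// * `Open`: wait for the current file to open, kick off opening the next
    ///   file in the background, then switch to `Scan`
    /// * `Scan`: emit batches from the current reader, applying the partition
    ///   column projection and the optional row limit; on end of file, switch
    ///   back to `Open` for the next file (if any)
    /// * `Error` / `Limit`: terminal states, the stream returns `None`
    ///
    /// Errors while opening or scanning are handled according to the
    /// configured `OnError` behavior; errors from the partition column
    /// projection always terminate the stream.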
    fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<RecordBatch>>> {
        loop {
            match &mut self.state {
                FileStreamState::Idle => {
                    self.file_stream_metrics.time_opening.start();

                    match self.start_next_file().transpose() {
                        Ok(Some((future, partition_values))) => {
                            self.state = FileStreamState::Open {
                                future,
                                partition_values,
                            }
                        }
                        Ok(None) => return Poll::Ready(None),
                        Err(e) => {
                            self.state = FileStreamState::Error;
                            return Poll::Ready(Some(Err(e)));
                        }
                    }
                }
                FileStreamState::Open {
                    future,
                    partition_values,
                } => match ready!(future.poll_unpin(cx)) {
                    Ok(reader) => {
                        let partition_values = mem::take(partition_values);

                        self.file_stream_metrics.time_opening.stop();
                        // Begin opening the next file while the current one is scanned
                        let next = self.start_next_file().transpose();
                        self.file_stream_metrics.time_scanning_until_data.start();
                        self.file_stream_metrics.time_scanning_total.start();

                        match next {
                            Ok(Some((next_future, next_partition_values))) => {
                                self.state = FileStreamState::Scan {
                                    partition_values,
                                    reader,
                                    next: Some((
                                        NextOpen::Pending(next_future),
                                        next_partition_values,
                                    )),
                                };
                            }
                            Ok(None) => {
                                self.state = FileStreamState::Scan {
                                    reader,
                                    partition_values,
                                    next: None,
                                };
                            }
                            Err(e) => {
                                self.state = FileStreamState::Error;
                                return Poll::Ready(Some(Err(e)));
                            }
                        }
                    }
                    Err(e) => {
                        self.file_stream_metrics.file_open_errors.add(1);
                        match self.on_error {
                            OnError::Skip => {
                                self.file_stream_metrics.time_opening.stop();
                                self.state = FileStreamState::Idle
                            }
                            OnError::Fail => {
                                self.state = FileStreamState::Error;
                                return Poll::Ready(Some(Err(e)));
                            }
                        }
                    }
                },
                FileStreamState::Scan {
                    reader,
                    partition_values,
                    next,
                } => {
                    // Poll the next file's open future here to drive it
                    // forward while the current file is being scanned
                    if let Some((next_open_future, _)) = next {
                        if let NextOpen::Pending(f) = next_open_future {
                            if let Poll::Ready(reader) = f.as_mut().poll(cx) {
                                *next_open_future = NextOpen::Ready(reader);
                            }
                        }
                    }
                    match ready!(reader.poll_next_unpin(cx)) {
                        Some(Ok(batch)) => {
                            self.file_stream_metrics.time_scanning_until_data.stop();
                            self.file_stream_metrics.time_scanning_total.stop();
                            let result = self
                                .pc_projector
                                .project(batch, partition_values)
                                .map_err(|e| ArrowError::ExternalError(e.into()))
                                .map(|batch| match &mut self.remain {
                                    Some(remain) => {
                                        if *remain > batch.num_rows() {
                                            *remain -= batch.num_rows();
                                            batch
                                        } else {
                                            let batch = batch.slice(0, *remain);
                                            self.state = FileStreamState::Limit;
                                            *remain = 0;
                                            batch
                                        }
                                    }
                                    None => batch,
                                });

                            if result.is_err() {
                                // If the partition value projection fails, this is
                                // not governed by the `on_error` setting: always fail
                                self.state = FileStreamState::Error
                            }
                            self.file_stream_metrics.time_scanning_total.start();
                            return Poll::Ready(Some(result.map_err(Into::into)));
                        }
                        Some(Err(err)) => {
                            self.file_stream_metrics.file_scan_errors.add(1);
                            self.file_stream_metrics.time_scanning_until_data.stop();
                            self.file_stream_metrics.time_scanning_total.stop();

                            match self.on_error {
                                // With `OnError::Skip`, abandon this file and move
                                // on to the next one (if any) on the first error
                                OnError::Skip => match mem::take(next) {
                                    Some((future, partition_values)) => {
                                        self.file_stream_metrics.time_opening.start();

                                        match future {
                                            NextOpen::Pending(future) => {
                                                self.state = FileStreamState::Open {
                                                    future,
                                                    partition_values,
                                                }
                                            }
                                            NextOpen::Ready(reader) => {
                                                self.state = FileStreamState::Open {
                                                    future: Box::pin(std::future::ready(
                                                        reader,
                                                    )),
                                                    partition_values,
                                                }
                                            }
                                        }
                                    }
                                    None => return Poll::Ready(None),
                                },
                                OnError::Fail => {
                                    self.state = FileStreamState::Error;
                                    return Poll::Ready(Some(Err(err.into())));
                                }
                            }
                        }
                        None => {
                            self.file_stream_metrics.time_scanning_until_data.stop();
                            self.file_stream_metrics.time_scanning_total.stop();

                            match mem::take(next) {
                                Some((future, partition_values)) => {
                                    self.file_stream_metrics.time_opening.start();

                                    match future {
                                        NextOpen::Pending(future) => {
                                            self.state = FileStreamState::Open {
                                                future,
                                                partition_values,
                                            }
                                        }
                                        NextOpen::Ready(reader) => {
                                            self.state = FileStreamState::Open {
                                                future: Box::pin(std::future::ready(
                                                    reader,
                                                )),
                                                partition_values,
                                            }
                                        }
                                    }
                                }
                                None => return Poll::Ready(None),
                            }
                        }
                    }
                }
                FileStreamState::Error | FileStreamState::Limit => {
                    return Poll::Ready(None)
                }
            }
        }
    }
}

impl Stream for FileStream {
    type Item = Result<RecordBatch>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        self.file_stream_metrics.time_processing.start();
        let result = self.poll_inner(cx);
        self.file_stream_metrics.time_processing.stop();
        self.baseline_metrics.record_poll(result)
    }
}

impl RecordBatchStream for FileStream {
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.projected_schema)
    }
}

/// A fallible future that resolves to a stream of [`RecordBatch`]es
pub type FileOpenFuture =
    BoxFuture<'static, Result<BoxStream<'static, Result<RecordBatch, ArrowError>>>>;

/// Describes the behavior of the [`FileStream`] if file opening or scanning fails
pub enum OnError {
    /// Fail the entire stream and return the underlying error
    Fail,
    /// Continue with the next file, ignoring the one that failed
    Skip,
}

impl Default for OnError {
    fn default() -> Self {
        Self::Fail
    }
}

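/// Generic API for opening a file and resolving it to a stream of
/// [`RecordBatch`].
///
/// A minimal sketch of an implementation, assuming a `load_batches` helper
/// that produces the file's batches (the helper is illustrative, not part of
/// this crate):
///
/// ```ignore
/// struct MyOpener;
///
/// impl FileOpener for MyOpener {
///     fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture> {
///         Ok(Box::pin(async move {
///             // Any async work (e.g. fetching file metadata) happens here.
///             let batches: Vec<Result<RecordBatch, ArrowError>> =
///                 load_batches(&file_meta)?;
///             Ok(futures::stream::iter(batches).boxed())
///         }))
///     }
/// }
/// ```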
pub trait FileOpener: Unpin + Send + Sync {
    /// Asynchronously open the specified file and return a stream
    /// of [`RecordBatch`]
    fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture>;
}

/// Represents the state of the next [`FileOpenFuture`]. Since we need to poll
/// this future while scanning the current file, we need to store the result
/// if it is ready
pub enum NextOpen {
    Pending(FileOpenFuture),
    Ready(Result<BoxStream<'static, Result<RecordBatch, ArrowError>>>),
}

pub enum FileStreamState {
    /// The idle state, no file is currently being read
    Idle,
    /// Currently performing asynchronous IO to obtain a stream of RecordBatch
    /// for a given file
    Open {
        /// A [`FileOpenFuture`] returned by [`FileOpener::open`]
        future: FileOpenFuture,
        /// The partition values for this file
        partition_values: Vec<ScalarValue>,
    },
    /// Scanning the [`BoxStream`] returned by the completion of a
    /// [`FileOpenFuture`] returned by [`FileOpener::open`]
    Scan {
        /// Partitioning column values for the current file
        partition_values: Vec<ScalarValue>,
        /// The reader instance
        reader: BoxStream<'static, Result<RecordBatch, ArrowError>>,
        /// A [`FileOpenFuture`] for the next file to be processed, and its
        /// corresponding partition column values, if any. This allows the
        /// next file to be opened in parallel while the current file is read.
        next: Option<(NextOpen, Vec<ScalarValue>)>,
    },
    /// Encountered an error
    Error,
    /// Reached the row limit
    Limit,
}

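/// A [`Time`] metric paired with the [`Instant`] at which it was last
/// started, allowing a timer to be started and stopped across separate
/// `poll` calls.
///
/// A small usage sketch (a real `Time` would normally come from a
/// [`MetricBuilder`]):
///
/// ```ignore
/// let mut timer = StartableTime { metrics: Time::new(), start: None };
/// timer.start();
/// // ... do some work ...
/// timer.stop(); // accumulates elapsed time; a no-op if never started
/// ```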
pub struct StartableTime {
    pub metrics: Time,
    /// The time at which the timer was last started, if currently running
    pub start: Option<Instant>,
}

impl StartableTime {
    /// Start the timer; panics if the timer is already running
    pub fn start(&mut self) {
        assert!(self.start.is_none());
        self.start = Some(Instant::now());
    }

    /// Stop the timer, adding the elapsed time to `metrics`.
    /// Does nothing if the timer is not running.
    pub fn stop(&mut self) {
        if let Some(start) = self.start.take() {
            self.metrics.add_elapsed(start);
        }
    }
}

/// Metrics for [`FileStream`]
#[allow(rustdoc::broken_intra_doc_links)]
pub struct FileStreamMetrics {
    /// Wall clock time elapsed for file opening: the time between when
    /// [`FileOpener::open`] is called and when the [`FileStream`] receives a
    /// stream for reading. Only captures time spent opening while not also
    /// scanning, since the next file is opened while the current one is read.
    pub time_opening: StartableTime,
    /// Wall clock time between when the [`FileStream`] requests data from a
    /// stream and when the first [`RecordBatch`] is produced
    pub time_scanning_until_data: StartableTime,
    /// Total wall clock time spent scanning and decoding record batches,
    /// summed over all batches in the stream
    pub time_scanning_total: StartableTime,
    /// Wall clock time spent inside this stream's `poll_next` calls
    pub time_processing: StartableTime,
    /// Count of errors opening files. With [`OnError::Skip`] this is the
    /// number of files that were skipped and excluded from the results.
    pub file_open_errors: Count,
    /// Count of errors while scanning a file
    pub file_scan_errors: Count,
}

impl FileStreamMetrics {
    pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
        let time_opening = StartableTime {
            metrics: MetricBuilder::new(metrics)
                .subset_time("time_elapsed_opening", partition),
            start: None,
        };

        let time_scanning_until_data = StartableTime {
            metrics: MetricBuilder::new(metrics)
                .subset_time("time_elapsed_scanning_until_data", partition),
            start: None,
        };

        let time_scanning_total = StartableTime {
            metrics: MetricBuilder::new(metrics)
                .subset_time("time_elapsed_scanning_total", partition),
            start: None,
        };

        let time_processing = StartableTime {
            metrics: MetricBuilder::new(metrics)
                .subset_time("time_elapsed_processing", partition),
            start: None,
        };

        let file_open_errors =
            MetricBuilder::new(metrics).counter("file_open_errors", partition);

        let file_scan_errors =
            MetricBuilder::new(metrics).counter("file_scan_errors", partition);

        Self {
            time_opening,
            time_scanning_until_data,
            time_scanning_total,
            time_processing,
            file_open_errors,
            file_scan_errors,
        }
    }
}

#[cfg(test)]
mod tests {
    use crate::file_scan_config::FileScanConfig;
    use crate::tests::make_partition;
    use crate::PartitionedFile;
    use arrow::error::ArrowError;
    use datafusion_common::error::Result;
    use datafusion_execution::object_store::ObjectStoreUrl;
    use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
    use futures::{FutureExt as _, StreamExt as _};
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    use crate::file_meta::FileMeta;
    use crate::file_stream::{FileOpenFuture, FileOpener, FileStream, OnError};
    use crate::test_util::MockSource;
    use arrow::array::RecordBatch;
    use arrow::datatypes::Schema;

    use datafusion_common::{assert_batches_eq, internal_err};

    /// A [`FileOpener`] for testing that returns a canned set of batches,
    /// failing to open or scan the files at the configured indexes
    #[derive(Default)]
    struct TestOpener {
        /// Index(es) of files that should fail while opening
        error_opening_idx: Vec<usize>,
        /// Index(es) of files that should fail while scanning
        error_scanning_idx: Vec<usize>,
        /// Index of the file currently being opened
        current_idx: AtomicUsize,
        /// The batches returned for each successfully opened file
        records: Vec<RecordBatch>,
    }

    impl FileOpener for TestOpener {
        fn open(&self, _file_meta: FileMeta) -> Result<FileOpenFuture> {
            let idx = self.current_idx.fetch_add(1, Ordering::SeqCst);

            if self.error_opening_idx.contains(&idx) {
                Ok(futures::future::ready(internal_err!("error opening")).boxed())
            } else if self.error_scanning_idx.contains(&idx) {
                let error = futures::future::ready(Err(ArrowError::IpcError(
                    "error scanning".to_owned(),
                )));
                let stream = futures::stream::once(error).boxed();
                Ok(futures::future::ready(Ok(stream)).boxed())
            } else {
                let iterator = self.records.clone().into_iter().map(Ok);
                let stream = futures::stream::iter(iterator).boxed();
                Ok(futures::future::ready(Ok(stream)).boxed())
            }
        }
    }

    /// A builder for running a [`FileStream`] over a set of mock files
    #[derive(Default)]
    struct FileStreamTest {
        /// Number of mock files to generate
        num_files: usize,
        /// Global row limit to apply to the scan
        limit: Option<usize>,
        /// Error behavior of the stream under test
        on_error: OnError,
        /// The opener used by the stream
        opener: TestOpener,
    }

    impl FileStreamTest {
        pub fn new() -> Self {
            Self::default()
        }

        /// Specify the number of files in the stream
        pub fn with_num_files(mut self, num_files: usize) -> Self {
            self.num_files = num_files;
            self
        }

        /// Specify the global row limit
        pub fn with_limit(mut self, limit: Option<usize>) -> Self {
            self.limit = limit;
            self
        }

        /// Specify the index(es) of files that should fail while opening
        pub fn with_open_errors(mut self, idx: Vec<usize>) -> Self {
            self.opener.error_opening_idx = idx;
            self
        }

        /// Specify the index(es) of files that should fail while scanning
        pub fn with_scan_errors(mut self, idx: Vec<usize>) -> Self {
            self.opener.error_scanning_idx = idx;
            self
        }

        /// Specify the behavior of the stream when an error occurs
        pub fn with_on_error(mut self, on_error: OnError) -> Self {
            self.on_error = on_error;
            self
        }

        /// Specify the record batches that each file will produce
        pub fn with_records(mut self, records: Vec<RecordBatch>) -> Self {
            self.opener.records = records;
            self
        }

        /// Collect the results of running the [`FileStream`]
        pub async fn result(self) -> Result<Vec<RecordBatch>> {
            let file_schema = self
                .opener
                .records
                .first()
                .map(|batch| batch.schema())
                .unwrap_or_else(|| Arc::new(Schema::empty()));

            // Create mock files with a fixed placeholder size of 10 bytes
            let mock_files: Vec<(String, u64)> = (0..self.num_files)
                .map(|idx| (format!("mock_file{idx}"), 10_u64))
                .collect();

            // Wrap the mock files into a single group of `PartitionedFile`s
            let file_group = mock_files
                .into_iter()
                .map(|(name, size)| PartitionedFile::new(name, size))
                .collect();

            let on_error = self.on_error;

            let config = FileScanConfig::new(
                ObjectStoreUrl::parse("test:///").unwrap(),
                file_schema,
                Arc::new(MockSource::default()),
            )
            .with_file_group(file_group)
            .with_limit(self.limit);
            let metrics_set = ExecutionPlanMetricsSet::new();
            let file_stream =
                FileStream::new(&config, 0, Arc::new(self.opener), &metrics_set)
                    .unwrap()
                    .with_on_error(on_error);

            file_stream
                .collect::<Vec<_>>()
                .await
                .into_iter()
                .collect::<Result<Vec<_>>>()
        }
    }

    /// Create a [`FileStream`] over two files, each yielding a three-row and
    /// a two-row batch, and collect the resulting batches
    async fn create_and_collect(limit: Option<usize>) -> Vec<RecordBatch> {
        FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_limit(limit)
            .result()
            .await
            .expect("error executing stream")
    }

    #[tokio::test]
    async fn on_error_opening() -> Result<()> {
        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![0])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![0, 1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "++",
            "++",
        ], &batches);

        Ok(())
    }

    #[tokio::test]
    async fn on_error_scanning_fail() -> Result<()> {
        let result = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Fail)
            .with_scan_errors(vec![1])
            .result()
            .await;

        assert!(result.is_err());

        Ok(())
    }

    #[tokio::test]
    async fn on_error_opening_fail() -> Result<()> {
        let result = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Fail)
            .with_open_errors(vec![1])
            .result()
            .await;

        assert!(result.is_err());

        Ok(())
    }

    #[tokio::test]
    async fn on_error_scanning() -> Result<()> {
        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Skip)
            .with_scan_errors(vec![0])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Skip)
            .with_scan_errors(vec![1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Skip)
            .with_scan_errors(vec![0, 1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "++",
            "++",
        ], &batches);

        Ok(())
    }

    #[tokio::test]
    async fn on_error_mixed() -> Result<()> {
        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(3)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![1])
            .with_scan_errors(vec![0])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(3)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![0])
            .with_scan_errors(vec![1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(3)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![2])
            .with_scan_errors(vec![0, 1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "++",
            "++",
        ], &batches);

        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(3)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![0, 2])
            .with_scan_errors(vec![1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "++",
            "++",
        ], &batches);

        Ok(())
    }

    #[tokio::test]
    async fn without_limit() -> Result<()> {
        let batches = create_and_collect(None).await;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        Ok(())
    }

    #[tokio::test]
    async fn with_limit_between_files() -> Result<()> {
        let batches = create_and_collect(Some(5)).await;
        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        Ok(())
    }

    #[tokio::test]
    async fn with_limit_at_middle_of_batch() -> Result<()> {
        let batches = create_and_collect(Some(6)).await;
        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "| 0 |",
            "+---+",
        ], &batches);

        Ok(())
    }
}