1use std::collections::VecDeque;
25use std::mem;
26use std::pin::Pin;
27use std::sync::Arc;
28use std::task::{Context, Poll};
29
30use crate::file_meta::FileMeta;
31use crate::file_scan_config::{FileScanConfig, PartitionColumnProjector};
32use crate::PartitionedFile;
33use arrow::datatypes::SchemaRef;
34use datafusion_common::error::Result;
35use datafusion_execution::RecordBatchStream;
36use datafusion_physical_plan::metrics::{
37 BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, Time,
38};
39
40use arrow::error::ArrowError;
41use arrow::record_batch::RecordBatch;
42use datafusion_common::instant::Instant;
43use datafusion_common::ScalarValue;
44
45use futures::future::BoxFuture;
46use futures::stream::BoxStream;
47use futures::{ready, FutureExt as _, Stream, StreamExt as _};
48
/// A stream that iterates record batch by record batch, file over file,
/// for the files assigned to a single partition.
pub struct FileStream {
    /// Queue of files remaining to be read (front is read next)
    file_iter: VecDeque<PartitionedFile>,
    /// The schema of the output batches (file columns after projection,
    /// including any table partition columns)
    projected_schema: SchemaRef,
    /// The remaining number of rows to produce, `None` if there is no limit
    remain: Option<usize>,
    /// Opens each file, yielding a [`FileOpenFuture`] that resolves to a
    /// stream of `RecordBatch`
    file_opener: Arc<dyn FileOpener>,
    /// Appends each file's partition column values to its batches
    pc_projector: PartitionColumnProjector,
    /// Current position in the Idle -> Open -> Scan state machine
    state: FileStreamState,
    /// Metrics specific to file streaming (open/scan timers and error counts)
    file_stream_metrics: FileStreamMetrics,
    /// Common per-operator metrics (output rows, poll time, ...)
    baseline_metrics: BaselineMetrics,
    /// Whether an open/scan error fails the stream or skips the file
    on_error: OnError,
}
72
impl FileStream {
    /// Creates a new `FileStream` that reads the files of `partition` within
    /// `config`, using `file_opener` to open each file.
    ///
    /// Metrics are registered against `metrics` under the given partition
    /// index. Errors fail the stream by default; see [`Self::with_on_error`].
    pub fn new(
        config: &FileScanConfig,
        partition: usize,
        file_opener: Arc<dyn FileOpener>,
        metrics: &ExecutionPlanMetricsSet,
    ) -> Result<Self> {
        let projected_schema = config.projected_schema();
        // Projects each file's partition column values onto the batches read
        // from that file so the output matches `projected_schema`.
        let pc_projector = PartitionColumnProjector::new(
            Arc::clone(&projected_schema),
            &config
                .table_partition_cols
                .iter()
                .map(|x| x.name().clone())
                .collect::<Vec<_>>(),
        );

        let file_group = config.file_groups[partition].clone();

        Ok(Self {
            file_iter: file_group.into_inner().into_iter().collect(),
            projected_schema,
            remain: config.limit,
            file_opener,
            pc_projector,
            state: FileStreamState::Idle,
            file_stream_metrics: FileStreamMetrics::new(metrics, partition),
            baseline_metrics: BaselineMetrics::new(metrics, partition),
            // Fail on the first error by default; override via `with_on_error`
            on_error: OnError::Fail,
        })
    }

    /// Specify the behavior when an error occurs opening or scanning a file.
    ///
    /// With [`OnError::Skip`] the stream skips the failing file and continues
    /// with the next one; with [`OnError::Fail`] (the default) the error is
    /// returned and the stream ends.
    pub fn with_on_error(mut self, on_error: OnError) -> Self {
        self.on_error = on_error;
        self
    }

    /// Begins opening the next queued file, returning the open future paired
    /// with that file's partition column values.
    ///
    /// Returns `None` when no files remain. The returned future has not been
    /// polled yet; callers drive it to completion.
    fn start_next_file(&mut self) -> Option<Result<(FileOpenFuture, Vec<ScalarValue>)>> {
        let part_file = self.file_iter.pop_front()?;

        let file_meta = FileMeta {
            object_meta: part_file.object_meta,
            range: part_file.range,
            extensions: part_file.extensions,
            metadata_size_hint: part_file.metadata_size_hint,
        };

        // Pair the open future with the file's partition values so they can
        // later be projected onto every batch read from this file.
        Some(
            self.file_opener
                .open(file_meta)
                .map(|future| (future, part_file.partition_values)),
        )
    }

    /// Drives the state machine (Idle -> Open -> Scan -> Limit/Error),
    /// looping until a batch is ready, the stream ends, or we must wait.
    fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<RecordBatch>>> {
        loop {
            match &mut self.state {
                FileStreamState::Idle => {
                    self.file_stream_metrics.time_opening.start();

                    match self.start_next_file().transpose() {
                        Ok(Some((future, partition_values))) => {
                            self.state = FileStreamState::Open {
                                future,
                                partition_values,
                            }
                        }
                        // No files left: end of stream
                        Ok(None) => return Poll::Ready(None),
                        Err(e) => {
                            self.state = FileStreamState::Error;
                            return Poll::Ready(Some(Err(e)));
                        }
                    }
                }
                FileStreamState::Open {
                    future,
                    partition_values,
                } => match ready!(future.poll_unpin(cx)) {
                    Ok(reader) => {
                        let partition_values = mem::take(partition_values);

                        // Open completed; start opening the next file in the
                        // background so open and scan overlap.
                        self.file_stream_metrics.time_opening.stop();
                        let next = self.start_next_file().transpose();
                        self.file_stream_metrics.time_scanning_until_data.start();
                        self.file_stream_metrics.time_scanning_total.start();

                        match next {
                            Ok(Some((next_future, next_partition_values))) => {
                                self.state = FileStreamState::Scan {
                                    partition_values,
                                    reader,
                                    next: Some((
                                        NextOpen::Pending(next_future),
                                        next_partition_values,
                                    )),
                                };
                            }
                            Ok(None) => {
                                self.state = FileStreamState::Scan {
                                    reader,
                                    partition_values,
                                    next: None,
                                };
                            }
                            Err(e) => {
                                self.state = FileStreamState::Error;
                                return Poll::Ready(Some(Err(e)));
                            }
                        }
                    }
                    Err(e) => {
                        self.file_stream_metrics.file_open_errors.add(1);
                        match self.on_error {
                            // Skip the file that failed to open; go back to
                            // Idle to try the next one
                            OnError::Skip => {
                                self.file_stream_metrics.time_opening.stop();
                                self.state = FileStreamState::Idle
                            }
                            OnError::Fail => {
                                self.state = FileStreamState::Error;
                                return Poll::Ready(Some(Err(e)));
                            }
                        }
                    }
                },
                FileStreamState::Scan {
                    reader,
                    partition_values,
                    next,
                } => {
                    // Drive the background open of the next file forward while
                    // scanning this one, caching its result once ready.
                    if let Some((next_open_future, _)) = next {
                        if let NextOpen::Pending(f) = next_open_future {
                            if let Poll::Ready(reader) = f.as_mut().poll(cx) {
                                *next_open_future = NextOpen::Ready(reader);
                            }
                        }
                    }
                    match ready!(reader.poll_next_unpin(cx)) {
                        Some(Ok(batch)) => {
                            self.file_stream_metrics.time_scanning_until_data.stop();
                            self.file_stream_metrics.time_scanning_total.stop();
                            // Append partition columns, then enforce the row
                            // limit (truncating the batch if it straddles it).
                            let result = self
                                .pc_projector
                                .project(batch, partition_values)
                                .map_err(|e| ArrowError::ExternalError(e.into()))
                                .map(|batch| match &mut self.remain {
                                    Some(remain) => {
                                        if *remain > batch.num_rows() {
                                            *remain -= batch.num_rows();
                                            batch
                                        } else {
                                            // Limit reached within this batch
                                            let batch = batch.slice(0, *remain);
                                            self.state = FileStreamState::Limit;
                                            *remain = 0;
                                            batch
                                        }
                                    }
                                    None => batch,
                                });

                            if result.is_err() {
                                // A projection failure ends the stream
                                // regardless of the `OnError` setting
                                self.state = FileStreamState::Error
                            }
                            // Keep the total-scanning timer running between
                            // batch deliveries
                            self.file_stream_metrics.time_scanning_total.start();
                            return Poll::Ready(Some(result.map_err(Into::into)));
                        }
                        Some(Err(err)) => {
                            self.file_stream_metrics.file_scan_errors.add(1);
                            self.file_stream_metrics.time_scanning_until_data.stop();
                            self.file_stream_metrics.time_scanning_total.stop();

                            match self.on_error {
                                // Skip the rest of this file; move to the next
                                // file, whose open may already have resolved
                                OnError::Skip => match mem::take(next) {
                                    Some((future, partition_values)) => {
                                        self.file_stream_metrics.time_opening.start();

                                        match future {
                                            NextOpen::Pending(future) => {
                                                self.state = FileStreamState::Open {
                                                    future,
                                                    partition_values,
                                                }
                                            }
                                            NextOpen::Ready(reader) => {
                                                self.state = FileStreamState::Open {
                                                    future: Box::pin(std::future::ready(
                                                        reader,
                                                    )),
                                                    partition_values,
                                                }
                                            }
                                        }
                                    }
                                    None => return Poll::Ready(None),
                                },
                                OnError::Fail => {
                                    self.state = FileStreamState::Error;
                                    return Poll::Ready(Some(Err(err.into())));
                                }
                            }
                        }
                        None => {
                            self.file_stream_metrics.time_scanning_until_data.stop();
                            self.file_stream_metrics.time_scanning_total.stop();

                            // Current file exhausted: switch to the next file
                            // (already opening in the background), or end.
                            match mem::take(next) {
                                Some((future, partition_values)) => {
                                    self.file_stream_metrics.time_opening.start();

                                    match future {
                                        NextOpen::Pending(future) => {
                                            self.state = FileStreamState::Open {
                                                future,
                                                partition_values,
                                            }
                                        }
                                        NextOpen::Ready(reader) => {
                                            self.state = FileStreamState::Open {
                                                future: Box::pin(std::future::ready(
                                                    reader,
                                                )),
                                                partition_values,
                                            }
                                        }
                                    }
                                }
                                None => return Poll::Ready(None),
                            }
                        }
                    }
                }
                // Terminal states: never produce anything further
                FileStreamState::Error | FileStreamState::Limit => {
                    return Poll::Ready(None)
                }
            }
        }
    }
}
324
325impl Stream for FileStream {
326 type Item = Result<RecordBatch>;
327
328 fn poll_next(
329 mut self: Pin<&mut Self>,
330 cx: &mut Context<'_>,
331 ) -> Poll<Option<Self::Item>> {
332 self.file_stream_metrics.time_processing.start();
333 let result = self.poll_inner(cx);
334 self.file_stream_metrics.time_processing.stop();
335 self.baseline_metrics.record_poll(result)
336 }
337}
338
impl RecordBatchStream for FileStream {
    /// Returns the schema of the batches this stream produces (the projected
    /// schema, including any table partition columns).
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.projected_schema)
    }
}
344
/// A fallible future that resolves to a stream of [`RecordBatch`] (or an
/// [`ArrowError`] per batch), as returned by [`FileOpener::open`].
pub type FileOpenFuture =
    BoxFuture<'static, Result<BoxStream<'static, Result<RecordBatch, ArrowError>>>>;
348
/// Describes the behavior of the [`FileStream`] when an error occurs while
/// opening or scanning a file.
// `#[default]` on the `Fail` variant replaces the previous hand-written
// `impl Default`, keeping the same default value with less code.
#[derive(Default)]
pub enum OnError {
    /// Fail the entire stream and return the underlying error
    #[default]
    Fail,
    /// Continue with the next file, skipping the one that errored
    Skip,
}
362
/// Generic API for opening a file and resolving it to a stream of
/// [`RecordBatch`]es.
pub trait FileOpener: Unpin + Send + Sync {
    /// Asynchronously open the file described by `file_meta`.
    ///
    /// Returns an error immediately if the open cannot be started; errors
    /// during the open itself surface when the returned [`FileOpenFuture`]
    /// resolves.
    fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture>;
}
372
/// The state of the background open of the *next* file, polled while the
/// current file is being scanned. Once the future completes its output is
/// cached here so it is not lost between polls.
pub enum NextOpen {
    /// The open future has not yet completed
    Pending(FileOpenFuture),
    /// The open future completed; holds its output (the opened reader stream
    /// or the open error)
    Ready(Result<BoxStream<'static, Result<RecordBatch, ArrowError>>>),
}
380
/// The state machine driven by [`FileStream::poll_inner`]:
/// `Idle -> Open -> Scan -> (Open | Limit | Error | done)`.
pub enum FileStreamState {
    /// No file is currently being read; the next poll starts opening one
    Idle,
    /// Waiting for the asynchronous open of a file to complete
    Open {
        /// The in-flight open, as returned by [`FileOpener::open`]
        future: FileOpenFuture,
        /// The partition column values for the file being opened
        partition_values: Vec<ScalarValue>,
    },
    /// Reading batches from an opened file
    Scan {
        /// The partition column values for the file being scanned
        partition_values: Vec<ScalarValue>,
        /// The stream of batches for the current file
        reader: BoxStream<'static, Result<RecordBatch, ArrowError>>,
        /// The open of the *next* file (and its partition values), if any,
        /// progressing in the background while the current file is scanned
        next: Option<(NextOpen, Vec<ScalarValue>)>,
    },
    /// Terminal: an unrecoverable error occurred
    Error,
    /// Terminal: the row limit was reached
    Limit,
}
410
/// A wall-clock timer that can be started and stopped repeatedly,
/// accumulating elapsed time into a [`Time`] metric.
pub struct StartableTime {
    /// The metric that accumulates total elapsed time
    pub metrics: Time,
    /// Instant of the most recent `start`; `None` while the timer is stopped
    pub start: Option<Instant>,
}
417
418impl StartableTime {
419 pub fn start(&mut self) {
420 assert!(self.start.is_none());
421 self.start = Some(Instant::now());
422 }
423
424 pub fn stop(&mut self) {
425 if let Some(start) = self.start.take() {
426 self.metrics.add_elapsed(start);
427 }
428 }
429}
430
/// Metrics recorded by a [`FileStream`], one set per partition.
#[allow(rustdoc::broken_intra_doc_links)]
pub struct FileStreamMetrics {
    /// Wall-clock time spent opening files (from entering the `Idle`/open
    /// path until the open future resolves); stopped while scanning overlaps
    /// a background open
    pub time_opening: StartableTime,
    /// Wall-clock time from the completion of an open until the first batch
    /// is produced from that file
    pub time_scanning_until_data: StartableTime,
    /// Total wall-clock time spent scanning (kept running between batch
    /// deliveries within a file)
    pub time_scanning_total: StartableTime,
    /// Wall-clock time spent inside the stream's `poll_next`
    pub time_processing: StartableTime,
    /// Number of files that failed to open; with [`OnError::Skip`] this is
    /// the count of files skipped entirely
    pub file_open_errors: Count,
    /// Number of errors encountered while scanning files
    pub file_scan_errors: Count,
}
479
480impl FileStreamMetrics {
481 pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
482 let time_opening = StartableTime {
483 metrics: MetricBuilder::new(metrics)
484 .subset_time("time_elapsed_opening", partition),
485 start: None,
486 };
487
488 let time_scanning_until_data = StartableTime {
489 metrics: MetricBuilder::new(metrics)
490 .subset_time("time_elapsed_scanning_until_data", partition),
491 start: None,
492 };
493
494 let time_scanning_total = StartableTime {
495 metrics: MetricBuilder::new(metrics)
496 .subset_time("time_elapsed_scanning_total", partition),
497 start: None,
498 };
499
500 let time_processing = StartableTime {
501 metrics: MetricBuilder::new(metrics)
502 .subset_time("time_elapsed_processing", partition),
503 start: None,
504 };
505
506 let file_open_errors =
507 MetricBuilder::new(metrics).counter("file_open_errors", partition);
508
509 let file_scan_errors =
510 MetricBuilder::new(metrics).counter("file_scan_errors", partition);
511
512 Self {
513 time_opening,
514 time_scanning_until_data,
515 time_scanning_total,
516 time_processing,
517 file_open_errors,
518 file_scan_errors,
519 }
520 }
521}
522
// Unit tests: exercise `FileStream` limit handling and the `OnError`
// skip/fail behavior using a canned `TestOpener`.
#[cfg(test)]
mod tests {
    use crate::file_scan_config::FileScanConfigBuilder;
    use crate::tests::make_partition;
    use crate::PartitionedFile;
    use arrow::error::ArrowError;
    use datafusion_common::error::Result;
    use datafusion_execution::object_store::ObjectStoreUrl;
    use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
    use futures::{FutureExt as _, StreamExt as _};
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    use crate::file_meta::FileMeta;
    use crate::file_stream::{FileOpenFuture, FileOpener, FileStream, OnError};
    use crate::test_util::MockSource;
    use arrow::array::RecordBatch;
    use arrow::datatypes::Schema;

    use datafusion_common::{assert_batches_eq, internal_err};

    /// A [`FileOpener`] for tests: returns the same canned batches for every
    /// file, but fails opening or scanning at configured file indices.
    #[derive(Default)]
    struct TestOpener {
        /// File indices whose `open` should fail immediately
        error_opening_idx: Vec<usize>,
        /// File indices whose returned stream should yield a scan error
        error_scanning_idx: Vec<usize>,
        /// Index of the next file to open (incremented on each `open` call)
        current_idx: AtomicUsize,
        /// Batches returned for every successfully opened file
        records: Vec<RecordBatch>,
    }

    impl FileOpener for TestOpener {
        /// Dispatches on the running file index: open error, scan error, or
        /// a stream of the canned `records`.
        fn open(&self, _file_meta: FileMeta) -> Result<FileOpenFuture> {
            let idx = self.current_idx.fetch_add(1, Ordering::SeqCst);

            if self.error_opening_idx.contains(&idx) {
                Ok(futures::future::ready(internal_err!("error opening")).boxed())
            } else if self.error_scanning_idx.contains(&idx) {
                let error = futures::future::ready(Err(ArrowError::IpcError(
                    "error scanning".to_owned(),
                )));
                let stream = futures::stream::once(error).boxed();
                Ok(futures::future::ready(Ok(stream)).boxed())
            } else {
                let iterator = self.records.clone().into_iter().map(Ok);
                let stream = futures::stream::iter(iterator).boxed();
                Ok(futures::future::ready(Ok(stream)).boxed())
            }
        }
    }

    /// Builder for a `FileStream` test scenario: number of files, optional
    /// limit, error behavior, and the `TestOpener` configuration.
    #[derive(Default)]
    struct FileStreamTest {
        /// Number of mock files fed to the stream
        num_files: usize,
        /// Optional row limit applied to the scan
        limit: Option<usize>,
        /// Error behavior under test (skip vs fail)
        on_error: OnError,
        /// The canned opener used for every file
        opener: TestOpener,
    }

    impl FileStreamTest {
        pub fn new() -> Self {
            Self::default()
        }

        /// Sets how many (mock) files the stream will scan.
        pub fn with_num_files(mut self, num_files: usize) -> Self {
            self.num_files = num_files;
            self
        }

        /// Sets the row limit applied to the scan.
        pub fn with_limit(mut self, limit: Option<usize>) -> Self {
            self.limit = limit;
            self
        }

        /// Marks the given file indices as failing during open.
        pub fn with_open_errors(mut self, idx: Vec<usize>) -> Self {
            self.opener.error_opening_idx = idx;
            self
        }

        /// Marks the given file indices as failing during scan.
        pub fn with_scan_errors(mut self, idx: Vec<usize>) -> Self {
            self.opener.error_scanning_idx = idx;
            self
        }

        /// Sets the stream's error behavior (skip vs fail).
        pub fn with_on_error(mut self, on_error: OnError) -> Self {
            self.on_error = on_error;
            self
        }

        /// Sets the batches returned for each successfully opened file.
        pub fn with_records(mut self, records: Vec<RecordBatch>) -> Self {
            self.opener.records = records;
            self
        }

        /// Builds the `FileStream` from this configuration, collects it to
        /// completion, and returns all produced batches (or the first error).
        pub async fn result(self) -> Result<Vec<RecordBatch>> {
            let file_schema = self
                .opener
                .records
                .first()
                .map(|batch| batch.schema())
                .unwrap_or_else(|| Arc::new(Schema::empty()));

            // Fabricate `num_files` mock file entries; names/sizes are
            // arbitrary since TestOpener ignores the file metadata.
            let mock_files: Vec<(String, u64)> = (0..self.num_files)
                .map(|idx| (format!("mock_file{idx}"), 10_u64))
                .collect();

            let file_group = mock_files
                .into_iter()
                .map(|(name, size)| PartitionedFile::new(name, size))
                .collect();

            let on_error = self.on_error;

            let config = FileScanConfigBuilder::new(
                ObjectStoreUrl::parse("test:///").unwrap(),
                file_schema,
                Arc::new(MockSource::default()),
            )
            .with_file_group(file_group)
            .with_limit(self.limit)
            .build();
            let metrics_set = ExecutionPlanMetricsSet::new();
            let file_stream =
                FileStream::new(&config, 0, Arc::new(self.opener), &metrics_set)
                    .unwrap()
                    .with_on_error(on_error);

            file_stream
                .collect::<Vec<_>>()
                .await
                .into_iter()
                .collect::<Result<Vec<_>>>()
        }
    }

    /// Scans two files of (3, 2)-row batches with the given limit and
    /// returns the collected batches.
    async fn create_and_collect(limit: Option<usize>) -> Vec<RecordBatch> {
        FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_limit(limit)
            .result()
            .await
            .expect("error executing stream")
    }

    /// `OnError::Skip` on open errors: the failing file is skipped and the
    /// remaining files are still fully scanned.
    #[tokio::test]
    async fn on_error_opening() -> Result<()> {
        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![0])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![0, 1])
            .result()
            .await?;

        // Both files skipped: empty result
        #[rustfmt::skip]
        assert_batches_eq!(&[
            "++",
            "++",
        ], &batches);

        Ok(())
    }

    /// `OnError::Fail` on a scan error: the whole stream returns an error.
    #[tokio::test]
    async fn on_error_scanning_fail() -> Result<()> {
        let result = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Fail)
            .with_scan_errors(vec![1])
            .result()
            .await;

        assert!(result.is_err());

        Ok(())
    }

    /// `OnError::Fail` on an open error: the whole stream returns an error.
    #[tokio::test]
    async fn on_error_opening_fail() -> Result<()> {
        let result = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Fail)
            .with_open_errors(vec![1])
            .result()
            .await;

        assert!(result.is_err());

        Ok(())
    }

    /// `OnError::Skip` on scan errors: the failing file's remaining batches
    /// are skipped while other files are fully scanned.
    #[tokio::test]
    async fn on_error_scanning() -> Result<()> {
        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Skip)
            .with_scan_errors(vec![0])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Skip)
            .with_scan_errors(vec![1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Skip)
            .with_scan_errors(vec![0, 1])
            .result()
            .await?;

        // Both files error while scanning: empty result
        #[rustfmt::skip]
        assert_batches_eq!(&[
            "++",
            "++",
        ], &batches);

        Ok(())
    }

    /// `OnError::Skip` with a mix of open and scan errors across three files:
    /// only the error-free files contribute batches.
    #[tokio::test]
    async fn on_error_mixed() -> Result<()> {
        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(3)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![1])
            .with_scan_errors(vec![0])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(3)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![0])
            .with_scan_errors(vec![1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(3)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![2])
            .with_scan_errors(vec![0, 1])
            .result()
            .await?;

        // All three files fail one way or another: empty result
        #[rustfmt::skip]
        assert_batches_eq!(&[
            "++",
            "++",
        ], &batches);

        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(3)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![0, 2])
            .with_scan_errors(vec![1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "++",
            "++",
        ], &batches);

        Ok(())
    }

    /// No limit: both files are scanned to completion (3 + 2 rows each).
    #[tokio::test]
    async fn without_limit() -> Result<()> {
        let batches = create_and_collect(None).await;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        Ok(())
    }

    /// Limit equal to the first file's row count: the second file is never
    /// surfaced.
    #[tokio::test]
    async fn with_limit_between_files() -> Result<()> {
        let batches = create_and_collect(Some(5)).await;
        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        Ok(())
    }

    /// Limit falling inside a batch: the final batch is truncated.
    #[tokio::test]
    async fn with_limit_at_middle_of_batch() -> Result<()> {
        let batches = create_and_collect(Some(6)).await;
        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "| 0 |",
            "+---+",
        ], &batches);

        Ok(())
    }
}