//! Generic [`FileStream`] implementation: a stream of [`RecordBatch`]es
//! produced by opening and scanning a sequence of files.

use std::collections::VecDeque;
use std::mem;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use crate::file_meta::FileMeta;
use crate::file_scan_config::{FileScanConfig, PartitionColumnProjector};
use crate::PartitionedFile;
use arrow::datatypes::SchemaRef;
use datafusion_common::error::Result;
use datafusion_execution::RecordBatchStream;
use datafusion_physical_plan::metrics::{
    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, Time,
};

use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;
use datafusion_common::instant::Instant;
use datafusion_common::ScalarValue;

use futures::future::BoxFuture;
use futures::stream::BoxStream;
use futures::{ready, FutureExt as _, Stream, StreamExt as _};
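
/// A stream that reads a partition's files one after another, yielding their
/// [`RecordBatch`]es as a single [`Stream`].
///
/// Each file is opened asynchronously via a [`FileOpener`]; while the current
/// file is being scanned, the next file is opened in the background so that
/// I/O overlaps with decoding.
///
/// # Example
///
/// A minimal usage sketch (`config` and `opener` are hypothetical; how a
/// [`FileScanConfig`] is built depends on the file format in use):
///
/// ```rust,ignore
/// use futures::StreamExt;
///
/// let metrics = ExecutionPlanMetricsSet::new();
/// let mut stream = FileStream::new(&config, /* partition */ 0, opener, &metrics)?
///     // Skip files that fail instead of ending the stream (default: Fail)
///     .with_on_error(OnError::Skip);
///
/// while let Some(batch) = stream.next().await {
///     println!("got a batch with {} rows", batch?.num_rows());
/// }
/// ```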
pub struct FileStream {
    /// The remaining files to read for this partition.
    file_iter: VecDeque<PartitionedFile>,
    /// The schema of the output batches, including any partition columns.
    projected_schema: SchemaRef,
    /// The number of rows still to be produced, if a limit was set.
    remain: Option<usize>,
    /// Opens each file, producing a stream of record batches.
    file_opener: Arc<dyn FileOpener>,
    /// Appends partition column values to each batch.
    pc_projector: PartitionColumnProjector,
    /// The current state of the stream's state machine.
    state: FileStreamState,
    /// File-level metrics: open/scan timings and error counts.
    file_stream_metrics: FileStreamMetrics,
    /// Operator-level baseline metrics.
    baseline_metrics: BaselineMetrics,
    /// Whether to fail or skip when a file cannot be opened or scanned.
    on_error: OnError,
}

impl FileStream {
    /// Create a new `FileStream` for the files in partition `partition` of
    /// `config`, using `file_opener` to open each file.
    pub fn new(
        config: &FileScanConfig,
        partition: usize,
        file_opener: Arc<dyn FileOpener>,
        metrics: &ExecutionPlanMetricsSet,
    ) -> Result<Self> {
        let projected_schema = config.projected_schema();
        let pc_projector = PartitionColumnProjector::new(
            Arc::clone(&projected_schema),
            &config
                .table_partition_cols
                .iter()
                .map(|x| x.name().clone())
                .collect::<Vec<_>>(),
        );

        let file_group = config.file_groups[partition].clone();

        Ok(Self {
            file_iter: file_group.into_inner().into_iter().collect(),
            projected_schema,
            remain: config.limit,
            file_opener,
            pc_projector,
            state: FileStreamState::Idle,
            file_stream_metrics: FileStreamMetrics::new(metrics, partition),
            baseline_metrics: BaselineMetrics::new(metrics, partition),
            on_error: OnError::Fail,
        })
    }

    /// Specify the behavior when an error occurs while opening or scanning
    /// a file: with [`OnError::Fail`] (the default) the stream returns the
    /// error and ends; with [`OnError::Skip`] it skips the offending file
    /// and continues with the next one.
    pub fn with_on_error(mut self, on_error: OnError) -> Self {
        self.on_error = on_error;
        self
    }

    /// Begin opening the next file in the queue.
    ///
    /// Since opening is asynchronous, this returns a future that resolves to
    /// the opened reader, paired with the file's partition values. Returns
    /// `None` if there are no more files to open.
    fn start_next_file(&mut self) -> Option<Result<(FileOpenFuture, Vec<ScalarValue>)>> {
        let part_file = self.file_iter.pop_front()?;

        let file_meta = FileMeta {
            object_meta: part_file.object_meta.clone(),
            range: part_file.range.clone(),
            extensions: part_file.extensions.clone(),
            metadata_size_hint: part_file.metadata_size_hint,
        };

        let partition_values = part_file.partition_values.clone();
        Some(
            self.file_opener
                .open(file_meta, part_file)
                .map(|future| (future, partition_values)),
        )
    }

    fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<RecordBatch>>> {
        loop {
            match &mut self.state {
                FileStreamState::Idle => {
                    self.file_stream_metrics.time_opening.start();

                    match self.start_next_file().transpose() {
                        Ok(Some((future, partition_values))) => {
                            self.state = FileStreamState::Open {
                                future,
                                partition_values,
                            }
                        }
                        Ok(None) => return Poll::Ready(None),
                        Err(e) => {
                            self.state = FileStreamState::Error;
                            return Poll::Ready(Some(Err(e)));
                        }
                    }
                }
                FileStreamState::Open {
                    future,
                    partition_values,
                } => match ready!(future.poll_unpin(cx)) {
                    Ok(reader) => {
                        let partition_values = mem::take(partition_values);

                        // Begin opening the next file in the background so it
                        // makes progress while the current one is scanned.
                        self.file_stream_metrics.time_opening.stop();
                        let next = self.start_next_file().transpose();
                        self.file_stream_metrics.time_scanning_until_data.start();
                        self.file_stream_metrics.time_scanning_total.start();

                        match next {
                            Ok(Some((next_future, next_partition_values))) => {
                                self.state = FileStreamState::Scan {
                                    partition_values,
                                    reader,
                                    next: Some((
                                        NextOpen::Pending(next_future),
                                        next_partition_values,
                                    )),
                                };
                            }
                            Ok(None) => {
                                self.state = FileStreamState::Scan {
                                    reader,
                                    partition_values,
                                    next: None,
                                };
                            }
                            Err(e) => {
                                self.state = FileStreamState::Error;
                                return Poll::Ready(Some(Err(e)));
                            }
                        }
                    }
                    Err(e) => {
                        self.file_stream_metrics.file_open_errors.add(1);
                        match self.on_error {
                            OnError::Skip => {
                                self.file_stream_metrics.time_opening.stop();
                                self.state = FileStreamState::Idle
                            }
                            OnError::Fail => {
                                self.state = FileStreamState::Error;
                                return Poll::Ready(Some(Err(e)));
                            }
                        }
                    }
                },
                FileStreamState::Scan {
                    reader,
                    partition_values,
                    next,
                } => {
                    // Drive the pending open of the next file forward while
                    // the current file is being scanned.
                    if let Some((next_open_future, _)) = next {
                        if let NextOpen::Pending(f) = next_open_future {
                            if let Poll::Ready(reader) = f.as_mut().poll(cx) {
                                *next_open_future = NextOpen::Ready(reader);
                            }
                        }
                    }
                    match ready!(reader.poll_next_unpin(cx)) {
                        Some(Ok(batch)) => {
                            self.file_stream_metrics.time_scanning_until_data.stop();
                            self.file_stream_metrics.time_scanning_total.stop();
                            let result = self
                                .pc_projector
                                .project(batch, partition_values)
                                .map_err(|e| ArrowError::ExternalError(e.into()))
                                .map(|batch| match &mut self.remain {
                                    Some(remain) => {
                                        if *remain > batch.num_rows() {
                                            *remain -= batch.num_rows();
                                            batch
                                        } else {
                                            // The limit is reached inside this
                                            // batch: truncate it and stop.
                                            let batch = batch.slice(0, *remain);
                                            self.state = FileStreamState::Limit;
                                            *remain = 0;
                                            batch
                                        }
                                    }
                                    None => batch,
                                });

                            if result.is_err() {
                                self.state = FileStreamState::Error
                            }
                            self.file_stream_metrics.time_scanning_total.start();
                            return Poll::Ready(Some(result.map_err(Into::into)));
                        }
                        Some(Err(err)) => {
                            self.file_stream_metrics.file_scan_errors.add(1);
                            self.file_stream_metrics.time_scanning_until_data.stop();
                            self.file_stream_metrics.time_scanning_total.stop();

                            match self.on_error {
                                // Skip the rest of this file and move on to
                                // the next one, if any.
                                OnError::Skip => match mem::take(next) {
                                    Some((future, partition_values)) => {
                                        self.file_stream_metrics.time_opening.start();

                                        match future {
                                            NextOpen::Pending(future) => {
                                                self.state = FileStreamState::Open {
                                                    future,
                                                    partition_values,
                                                }
                                            }
                                            NextOpen::Ready(reader) => {
                                                self.state = FileStreamState::Open {
                                                    future: Box::pin(std::future::ready(
                                                        reader,
                                                    )),
                                                    partition_values,
                                                }
                                            }
                                        }
                                    }
                                    None => return Poll::Ready(None),
                                },
                                OnError::Fail => {
                                    self.state = FileStreamState::Error;
                                    return Poll::Ready(Some(Err(err.into())));
                                }
                            }
                        }
                        None => {
                            self.file_stream_metrics.time_scanning_until_data.stop();
                            self.file_stream_metrics.time_scanning_total.stop();

                            match mem::take(next) {
                                Some((future, partition_values)) => {
                                    self.file_stream_metrics.time_opening.start();

                                    match future {
                                        NextOpen::Pending(future) => {
                                            self.state = FileStreamState::Open {
                                                future,
                                                partition_values,
                                            }
                                        }
                                        NextOpen::Ready(reader) => {
                                            self.state = FileStreamState::Open {
                                                future: Box::pin(std::future::ready(
                                                    reader,
                                                )),
                                                partition_values,
                                            }
                                        }
                                    }
                                }
                                None => return Poll::Ready(None),
                            }
                        }
                    }
                }
                FileStreamState::Error | FileStreamState::Limit => {
                    return Poll::Ready(None)
                }
            }
        }
    }
}

impl Stream for FileStream {
    type Item = Result<RecordBatch>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        self.file_stream_metrics.time_processing.start();
        let result = self.poll_inner(cx);
        self.file_stream_metrics.time_processing.stop();
        self.baseline_metrics.record_poll(result)
    }
}

impl RecordBatchStream for FileStream {
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.projected_schema)
    }
}

/// A fallible future that resolves to a stream of [`RecordBatch`]es.
pub type FileOpenFuture =
    BoxFuture<'static, Result<BoxStream<'static, Result<RecordBatch, ArrowError>>>>;

/// Describes the behavior of [`FileStream`] when an error occurs while
/// opening or scanning a file.
pub enum OnError {
    /// Fail the entire stream and return the underlying error (default).
    Fail,
    /// Continue with the next file, skipping the failed one.
    Skip,
}

impl Default for OnError {
    fn default() -> Self {
        Self::Fail
    }
}

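/// Generic API for opening a file and producing a stream of [`RecordBatch`]es.
///
/// Each file format provides its own implementation. Below is a minimal
/// sketch of an opener that serves pre-computed batches, modeled on the
/// `TestOpener` in this module's tests (the `InMemoryOpener` name and its
/// `batches` field are hypothetical; real openers perform async I/O against
/// an object store before the future resolves):
///
/// ```rust,ignore
/// use futures::{FutureExt, StreamExt};
///
/// struct InMemoryOpener {
///     batches: Vec<RecordBatch>,
/// }
///
/// impl FileOpener for InMemoryOpener {
///     fn open(&self, _meta: FileMeta, _file: PartitionedFile) -> Result<FileOpenFuture> {
///         let stream = futures::stream::iter(
///             self.batches.clone().into_iter().map(Ok),
///         )
///         .boxed();
///         // The outer future resolves immediately here; real openers await
///         // I/O (e.g. fetching file metadata) before yielding the stream.
///         Ok(futures::future::ready(Ok(stream)).boxed())
///     }
/// }
/// ```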
pub trait FileOpener: Unpin + Send + Sync {
    /// Asynchronously open the file described by `file_meta` and return a
    /// stream of its [`RecordBatch`]es.
    fn open(&self, file_meta: FileMeta, file: PartitionedFile) -> Result<FileOpenFuture>;
}

/// The state of the next [`FileOpenFuture`] while the current file is still
/// being scanned.
pub enum NextOpen {
    /// The open future is still pending.
    Pending(FileOpenFuture),
    /// The open future has resolved (successfully or not).
    Ready(Result<BoxStream<'static, Result<RecordBatch, ArrowError>>>),
}

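/// The state machine driven by [`FileStream::poll_inner`]. A sketch of the
/// transitions it implements (the skip-on-open-error path back to `Idle` is
/// omitted for brevity):
///
/// ```text
/// Idle ──open next──▶ Open ──ready──▶ Scan ──batches──▶ (yield)
///                      │                │
///                      ▼                ├── limit reached ─────────▶ Limit
///                    (done)             ├── EOF / skipped error ──▶ Open (next file)
///                                       └── error, OnError::Fail ─▶ Error
/// ```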
pub enum FileStreamState {
    /// Idle: no file is currently being opened.
    Idle,
    /// A file is in the process of being opened.
    Open {
        /// The [`FileOpenFuture`] returned by [`FileOpener::open`].
        future: FileOpenFuture,
        /// The partition values for this file.
        partition_values: Vec<ScalarValue>,
    },
    /// A file is being scanned.
    Scan {
        /// The partition values for the current file.
        partition_values: Vec<ScalarValue>,
        /// The stream of [`RecordBatch`]es for the current file.
        reader: BoxStream<'static, Result<RecordBatch, ArrowError>>,
        /// The next file to open, opened eagerly while the current file is
        /// scanned, together with its partition values.
        next: Option<(NextOpen, Vec<ScalarValue>)>,
    },
    /// An error occurred; the stream is finished.
    Error,
    /// The row limit has been reached; the stream is finished.
    Limit,
}

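/// A [`Time`] metric paired with an optional start timestamp, so that a timer
/// can be started and stopped repeatedly, accumulating elapsed wall-clock
/// time. A small usage sketch (the `"my_time"` metric name is made up):
///
/// ```rust,ignore
/// let mut timer = StartableTime {
///     metrics: MetricBuilder::new(&metrics).subset_time("my_time", partition),
///     start: None,
/// };
/// timer.start();
/// // ... do some work ...
/// timer.stop(); // adds the elapsed time to `metrics`
/// ```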
pub struct StartableTime {
    /// The accumulated elapsed time.
    pub metrics: Time,
    /// When the current measurement started, if one is running.
    pub start: Option<Instant>,
}

impl StartableTime {
    /// Start the timer. Panics if the timer is already running.
    pub fn start(&mut self) {
        assert!(self.start.is_none());
        self.start = Some(Instant::now());
    }

    /// Stop the timer and add the elapsed time to `metrics`.
    /// Does nothing if the timer is not running.
    pub fn stop(&mut self) {
        if let Some(start) = self.start.take() {
            self.metrics.add_elapsed(start);
        }
    }
}

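/// Metrics for [`FileStream`], registered under the names noted on each field
/// below. Because `poll_next` time encloses all other work, and the next file
/// is opened while the current one is scanned, the individual timings can
/// overlap and need not sum to the total elapsed time.
///
/// A sketch of inspecting them after the stream is drained (assuming the
/// `ExecutionPlanMetricsSet` the stream was created with, here called
/// `metrics_set`, is still in scope):
///
/// ```rust,ignore
/// let aggregated = metrics_set.clone_inner().aggregate_by_name();
/// println!("{aggregated}");
/// ```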
#[allow(rustdoc::broken_intra_doc_links)]
pub struct FileStreamMetrics {
    /// Wall-clock time spent opening files (`time_elapsed_opening`).
    pub time_opening: StartableTime,
    /// Wall-clock time from starting a scan until the first batch arrives
    /// (`time_elapsed_scanning_until_data`).
    pub time_scanning_until_data: StartableTime,
    /// Total wall-clock time spent scanning files
    /// (`time_elapsed_scanning_total`).
    pub time_scanning_total: StartableTime,
    /// Wall-clock time spent inside `poll_next` (`time_elapsed_processing`).
    pub time_processing: StartableTime,
    /// Number of errors encountered while opening files (`file_open_errors`).
    pub file_open_errors: Count,
    /// Number of errors encountered while scanning files (`file_scan_errors`).
    pub file_scan_errors: Count,
}

impl FileStreamMetrics {
    pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
        let time_opening = StartableTime {
            metrics: MetricBuilder::new(metrics)
                .subset_time("time_elapsed_opening", partition),
            start: None,
        };

        let time_scanning_until_data = StartableTime {
            metrics: MetricBuilder::new(metrics)
                .subset_time("time_elapsed_scanning_until_data", partition),
            start: None,
        };

        let time_scanning_total = StartableTime {
            metrics: MetricBuilder::new(metrics)
                .subset_time("time_elapsed_scanning_total", partition),
            start: None,
        };

        let time_processing = StartableTime {
            metrics: MetricBuilder::new(metrics)
                .subset_time("time_elapsed_processing", partition),
            start: None,
        };

        let file_open_errors =
            MetricBuilder::new(metrics).counter("file_open_errors", partition);

        let file_scan_errors =
            MetricBuilder::new(metrics).counter("file_scan_errors", partition);

        Self {
            time_opening,
            time_scanning_until_data,
            time_scanning_total,
            time_processing,
            file_open_errors,
            file_scan_errors,
        }
    }
}

#[cfg(test)]
mod tests {
    use crate::file_scan_config::FileScanConfigBuilder;
    use crate::tests::make_partition;
    use crate::PartitionedFile;
    use arrow::error::ArrowError;
    use datafusion_common::error::Result;
    use datafusion_execution::object_store::ObjectStoreUrl;
    use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
    use futures::{FutureExt as _, StreamExt as _};
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    use crate::file_meta::FileMeta;
    use crate::file_stream::{FileOpenFuture, FileOpener, FileStream, OnError};
    use crate::test_util::MockSource;
    use arrow::array::RecordBatch;
    use arrow::datatypes::Schema;

    use datafusion_common::{assert_batches_eq, internal_err};

    /// A [`FileOpener`] that returns canned record batches, optionally
    /// injecting errors when opening or scanning the file at a given index.
    #[derive(Default)]
    struct TestOpener {
        /// Indices of files that should fail to open.
        error_opening_idx: Vec<usize>,
        /// Indices of files that should fail while scanning.
        error_scanning_idx: Vec<usize>,
        /// Index of the next file to open.
        current_idx: AtomicUsize,
        /// The batches returned by every successfully opened file.
        records: Vec<RecordBatch>,
    }

    impl FileOpener for TestOpener {
        fn open(
            &self,
            _file_meta: FileMeta,
            _file: PartitionedFile,
        ) -> Result<FileOpenFuture> {
            let idx = self.current_idx.fetch_add(1, Ordering::SeqCst);

            if self.error_opening_idx.contains(&idx) {
                Ok(futures::future::ready(internal_err!("error opening")).boxed())
            } else if self.error_scanning_idx.contains(&idx) {
                let error = futures::future::ready(Err(ArrowError::IpcError(
                    "error scanning".to_owned(),
                )));
                let stream = futures::stream::once(error).boxed();
                Ok(futures::future::ready(Ok(stream)).boxed())
            } else {
                let iterator = self.records.clone().into_iter().map(Ok);
                let stream = futures::stream::iter(iterator).boxed();
                Ok(futures::future::ready(Ok(stream)).boxed())
            }
        }
    }

    /// Builder for running a [`FileStream`] against a [`TestOpener`] with a
    /// configurable number of files, row limit, and error behavior.
    #[derive(Default)]
    struct FileStreamTest {
        /// Number of files in the stream.
        num_files: usize,
        /// Global row limit, if any.
        limit: Option<usize>,
        /// Error behavior of the stream under test.
        on_error: OnError,
        /// The opener used by the stream.
        opener: TestOpener,
    }

    impl FileStreamTest {
        pub fn new() -> Self {
            Self::default()
        }

        /// Specify the number of files in the stream.
        pub fn with_num_files(mut self, num_files: usize) -> Self {
            self.num_files = num_files;
            self
        }

        /// Specify the global row limit.
        pub fn with_limit(mut self, limit: Option<usize>) -> Self {
            self.limit = limit;
            self
        }

        /// Specify the indices of files that should fail to open.
        pub fn with_open_errors(mut self, idx: Vec<usize>) -> Self {
            self.opener.error_opening_idx = idx;
            self
        }

        /// Specify the indices of files that should fail while scanning.
        pub fn with_scan_errors(mut self, idx: Vec<usize>) -> Self {
            self.opener.error_scanning_idx = idx;
            self
        }

        /// Specify the behavior of the stream when an error occurs.
        pub fn with_on_error(mut self, on_error: OnError) -> Self {
            self.on_error = on_error;
            self
        }

        /// Specify the record batches that each successfully opened file
        /// should return.
        pub fn with_records(mut self, records: Vec<RecordBatch>) -> Self {
            self.opener.records = records;
            self
        }

        /// Collect the results of the [`FileStream`] under test.
        pub async fn result(self) -> Result<Vec<RecordBatch>> {
            let file_schema = self
                .opener
                .records
                .first()
                .map(|batch| batch.schema())
                .unwrap_or_else(|| Arc::new(Schema::empty()));

            // The opener never reads these files, so only their names and
            // (arbitrary) sizes matter.
            let mock_files: Vec<(String, u64)> = (0..self.num_files)
                .map(|idx| (format!("mock_file{idx}"), 10_u64))
                .collect();

            let file_group = mock_files
                .into_iter()
                .map(|(name, size)| PartitionedFile::new(name, size))
                .collect();

            let on_error = self.on_error;

            let config = FileScanConfigBuilder::new(
                ObjectStoreUrl::parse("test:///").unwrap(),
                file_schema,
                Arc::new(MockSource::default()),
            )
            .with_file_group(file_group)
            .with_limit(self.limit)
            .build();
            let metrics_set = ExecutionPlanMetricsSet::new();
            let file_stream =
                FileStream::new(&config, 0, Arc::new(self.opener), &metrics_set)
                    .unwrap()
                    .with_on_error(on_error);

            file_stream
                .collect::<Vec<_>>()
                .await
                .into_iter()
                .collect::<Result<Vec<_>>>()
        }
    }

    /// Create a stream over two mock files, each yielding a three-row and a
    /// two-row batch, apply `limit` if given, and collect all batches.
    async fn create_and_collect(limit: Option<usize>) -> Vec<RecordBatch> {
        FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_limit(limit)
            .result()
            .await
            .expect("error executing stream")
    }

    #[tokio::test]
    async fn on_error_opening() -> Result<()> {
        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![0])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![0, 1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "++",
            "++",
        ], &batches);

        Ok(())
    }

    #[tokio::test]
    async fn on_error_scanning_fail() -> Result<()> {
        let result = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Fail)
            .with_scan_errors(vec![1])
            .result()
            .await;

        assert!(result.is_err());

        Ok(())
    }

    #[tokio::test]
    async fn on_error_opening_fail() -> Result<()> {
        let result = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Fail)
            .with_open_errors(vec![1])
            .result()
            .await;

        assert!(result.is_err());

        Ok(())
    }

    #[tokio::test]
    async fn on_error_scanning() -> Result<()> {
        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Skip)
            .with_scan_errors(vec![0])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Skip)
            .with_scan_errors(vec![1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Skip)
            .with_scan_errors(vec![0, 1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "++",
            "++",
        ], &batches);

        Ok(())
    }

    #[tokio::test]
    async fn on_error_mixed() -> Result<()> {
        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(3)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![1])
            .with_scan_errors(vec![0])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(3)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![0])
            .with_scan_errors(vec![1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(3)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![2])
            .with_scan_errors(vec![0, 1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "++",
            "++",
        ], &batches);

        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(3)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![0, 2])
            .with_scan_errors(vec![1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "++",
            "++",
        ], &batches);

        Ok(())
    }

    #[tokio::test]
    async fn without_limit() -> Result<()> {
        let batches = create_and_collect(None).await;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        Ok(())
    }

    #[tokio::test]
    async fn with_limit_between_files() -> Result<()> {
        let batches = create_and_collect(Some(5)).await;
        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        Ok(())
    }

    #[tokio::test]
    async fn with_limit_at_middle_of_batch() -> Result<()> {
        let batches = create_and_collect(Some(6)).await;
        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "| 0 |",
            "+---+",
        ], &batches);

        Ok(())
    }
}