datafusion_datasource/
file_stream.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! A generic stream over file format readers that can be used by
//! any file format that reads its files from start to end.
20//!
21//! Note: Most traits here need to be marked `Sync + Send` to be
22//! compliant with the `SendableRecordBatchStream` trait.
23
24use std::collections::VecDeque;
25use std::mem;
26use std::pin::Pin;
27use std::sync::Arc;
28use std::task::{Context, Poll};
29
30use crate::file_meta::FileMeta;
31use crate::file_scan_config::{FileScanConfig, PartitionColumnProjector};
32use crate::PartitionedFile;
33use arrow::datatypes::SchemaRef;
34use datafusion_common::error::Result;
35use datafusion_execution::RecordBatchStream;
36use datafusion_physical_plan::metrics::{
37    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, Time,
38};
39
40use arrow::error::ArrowError;
41use arrow::record_batch::RecordBatch;
42use datafusion_common::instant::Instant;
43use datafusion_common::ScalarValue;
44
45use futures::future::BoxFuture;
46use futures::stream::BoxStream;
47use futures::{ready, FutureExt as _, Stream, StreamExt as _};
48
/// A stream that iterates record batch by record batch, file over file.
///
/// Files are opened through a [`FileOpener`]. As soon as one file's reader is
/// available, opening of the following file is started so that its I/O can
/// overlap with scanning the current file. Each batch is projected to include
/// the file's partition column values, and an optional row limit is enforced
/// across all files of the stream.
pub struct FileStream {
    /// The files that have not been handed to the opener yet, in read order.
    file_iter: VecDeque<PartitionedFile>,
    /// The stream schema (file schema including partition columns and after
    /// projection).
    projected_schema: SchemaRef,
    /// The remaining number of rows this stream may still emit, or `None` if
    /// there is no limit.
    remain: Option<usize>,
    /// A dynamic [`FileOpener`]. Calling `open()` returns a [`FileOpenFuture`],
    /// which can be resolved to a stream of `RecordBatch`.
    file_opener: Arc<dyn FileOpener>,
    /// Projects every file batch into `projected_schema`, filling in the
    /// partition column values of the file it came from.
    pc_projector: PartitionColumnProjector,
    /// The current state of the open/scan state machine.
    state: FileStreamState,
    /// File stream specific metrics
    file_stream_metrics: FileStreamMetrics,
    /// runtime baseline metrics
    baseline_metrics: BaselineMetrics,
    /// Describes the behavior of the `FileStream` if file opening or scanning fails
    on_error: OnError,
}
72
73impl FileStream {
74    /// Create a new `FileStream` using the give `FileOpener` to scan underlying files
75    pub fn new(
76        config: &FileScanConfig,
77        partition: usize,
78        file_opener: Arc<dyn FileOpener>,
79        metrics: &ExecutionPlanMetricsSet,
80    ) -> Result<Self> {
81        let projected_schema = config.projected_schema();
82        let pc_projector = PartitionColumnProjector::new(
83            Arc::clone(&projected_schema),
84            &config
85                .table_partition_cols
86                .iter()
87                .map(|x| x.name().clone())
88                .collect::<Vec<_>>(),
89        );
90
91        let file_group = config.file_groups[partition].clone();
92
93        Ok(Self {
94            file_iter: file_group.into_inner().into_iter().collect(),
95            projected_schema,
96            remain: config.limit,
97            file_opener,
98            pc_projector,
99            state: FileStreamState::Idle,
100            file_stream_metrics: FileStreamMetrics::new(metrics, partition),
101            baseline_metrics: BaselineMetrics::new(metrics, partition),
102            on_error: OnError::Fail,
103        })
104    }
105
106    /// Specify the behavior when an error occurs opening or scanning a file
107    ///
108    /// If `OnError::Skip` the stream will skip files which encounter an error and continue
109    /// If `OnError:Fail` (default) the stream will fail and stop processing when an error occurs
110    pub fn with_on_error(mut self, on_error: OnError) -> Self {
111        self.on_error = on_error;
112        self
113    }
114
115    /// Begin opening the next file in parallel while decoding the current file in FileStream.
116    ///
117    /// Since file opening is mostly IO (and may involve a
118    /// bunch of sequential IO), it can be parallelized with decoding.
119    fn start_next_file(&mut self) -> Option<Result<(FileOpenFuture, Vec<ScalarValue>)>> {
120        let part_file = self.file_iter.pop_front()?;
121
122        let file_meta = FileMeta {
123            object_meta: part_file.object_meta.clone(),
124            range: part_file.range.clone(),
125            extensions: part_file.extensions.clone(),
126            metadata_size_hint: part_file.metadata_size_hint,
127        };
128
129        let partition_values = part_file.partition_values.clone();
130        Some(
131            self.file_opener
132                .open(file_meta, part_file)
133                .map(|future| (future, partition_values)),
134        )
135    }
136
137    fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<RecordBatch>>> {
138        loop {
139            match &mut self.state {
140                FileStreamState::Idle => {
141                    self.file_stream_metrics.time_opening.start();
142
143                    match self.start_next_file().transpose() {
144                        Ok(Some((future, partition_values))) => {
145                            self.state = FileStreamState::Open {
146                                future,
147                                partition_values,
148                            }
149                        }
150                        Ok(None) => return Poll::Ready(None),
151                        Err(e) => {
152                            self.state = FileStreamState::Error;
153                            return Poll::Ready(Some(Err(e)));
154                        }
155                    }
156                }
157                FileStreamState::Open {
158                    future,
159                    partition_values,
160                } => match ready!(future.poll_unpin(cx)) {
161                    Ok(reader) => {
162                        let partition_values = mem::take(partition_values);
163
164                        // include time needed to start opening in `start_next_file`
165                        self.file_stream_metrics.time_opening.stop();
166                        let next = self.start_next_file().transpose();
167                        self.file_stream_metrics.time_scanning_until_data.start();
168                        self.file_stream_metrics.time_scanning_total.start();
169
170                        match next {
171                            Ok(Some((next_future, next_partition_values))) => {
172                                self.state = FileStreamState::Scan {
173                                    partition_values,
174                                    reader,
175                                    next: Some((
176                                        NextOpen::Pending(next_future),
177                                        next_partition_values,
178                                    )),
179                                };
180                            }
181                            Ok(None) => {
182                                self.state = FileStreamState::Scan {
183                                    reader,
184                                    partition_values,
185                                    next: None,
186                                };
187                            }
188                            Err(e) => {
189                                self.state = FileStreamState::Error;
190                                return Poll::Ready(Some(Err(e)));
191                            }
192                        }
193                    }
194                    Err(e) => {
195                        self.file_stream_metrics.file_open_errors.add(1);
196                        match self.on_error {
197                            OnError::Skip => {
198                                self.file_stream_metrics.time_opening.stop();
199                                self.state = FileStreamState::Idle
200                            }
201                            OnError::Fail => {
202                                self.state = FileStreamState::Error;
203                                return Poll::Ready(Some(Err(e)));
204                            }
205                        }
206                    }
207                },
208                FileStreamState::Scan {
209                    reader,
210                    partition_values,
211                    next,
212                } => {
213                    // We need to poll the next `FileOpenFuture` here to drive it forward
214                    if let Some((next_open_future, _)) = next {
215                        if let NextOpen::Pending(f) = next_open_future {
216                            if let Poll::Ready(reader) = f.as_mut().poll(cx) {
217                                *next_open_future = NextOpen::Ready(reader);
218                            }
219                        }
220                    }
221                    match ready!(reader.poll_next_unpin(cx)) {
222                        Some(Ok(batch)) => {
223                            self.file_stream_metrics.time_scanning_until_data.stop();
224                            self.file_stream_metrics.time_scanning_total.stop();
225                            let result = self
226                                .pc_projector
227                                .project(batch, partition_values)
228                                .map_err(|e| ArrowError::ExternalError(e.into()))
229                                .map(|batch| match &mut self.remain {
230                                    Some(remain) => {
231                                        if *remain > batch.num_rows() {
232                                            *remain -= batch.num_rows();
233                                            batch
234                                        } else {
235                                            let batch = batch.slice(0, *remain);
236                                            self.state = FileStreamState::Limit;
237                                            *remain = 0;
238                                            batch
239                                        }
240                                    }
241                                    None => batch,
242                                });
243
244                            if result.is_err() {
245                                // If the partition value projection fails, this is not governed by
246                                // the `OnError` behavior
247                                self.state = FileStreamState::Error
248                            }
249                            self.file_stream_metrics.time_scanning_total.start();
250                            return Poll::Ready(Some(result.map_err(Into::into)));
251                        }
252                        Some(Err(err)) => {
253                            self.file_stream_metrics.file_scan_errors.add(1);
254                            self.file_stream_metrics.time_scanning_until_data.stop();
255                            self.file_stream_metrics.time_scanning_total.stop();
256
257                            match self.on_error {
258                                // If `OnError::Skip` we skip the file as soon as we hit the first error
259                                OnError::Skip => match mem::take(next) {
260                                    Some((future, partition_values)) => {
261                                        self.file_stream_metrics.time_opening.start();
262
263                                        match future {
264                                            NextOpen::Pending(future) => {
265                                                self.state = FileStreamState::Open {
266                                                    future,
267                                                    partition_values,
268                                                }
269                                            }
270                                            NextOpen::Ready(reader) => {
271                                                self.state = FileStreamState::Open {
272                                                    future: Box::pin(std::future::ready(
273                                                        reader,
274                                                    )),
275                                                    partition_values,
276                                                }
277                                            }
278                                        }
279                                    }
280                                    None => return Poll::Ready(None),
281                                },
282                                OnError::Fail => {
283                                    self.state = FileStreamState::Error;
284                                    return Poll::Ready(Some(Err(err.into())));
285                                }
286                            }
287                        }
288                        None => {
289                            self.file_stream_metrics.time_scanning_until_data.stop();
290                            self.file_stream_metrics.time_scanning_total.stop();
291
292                            match mem::take(next) {
293                                Some((future, partition_values)) => {
294                                    self.file_stream_metrics.time_opening.start();
295
296                                    match future {
297                                        NextOpen::Pending(future) => {
298                                            self.state = FileStreamState::Open {
299                                                future,
300                                                partition_values,
301                                            }
302                                        }
303                                        NextOpen::Ready(reader) => {
304                                            self.state = FileStreamState::Open {
305                                                future: Box::pin(std::future::ready(
306                                                    reader,
307                                                )),
308                                                partition_values,
309                                            }
310                                        }
311                                    }
312                                }
313                                None => return Poll::Ready(None),
314                            }
315                        }
316                    }
317                }
318                FileStreamState::Error | FileStreamState::Limit => {
319                    return Poll::Ready(None)
320                }
321            }
322        }
323    }
324}
325
326impl Stream for FileStream {
327    type Item = Result<RecordBatch>;
328
329    fn poll_next(
330        mut self: Pin<&mut Self>,
331        cx: &mut Context<'_>,
332    ) -> Poll<Option<Self::Item>> {
333        self.file_stream_metrics.time_processing.start();
334        let result = self.poll_inner(cx);
335        self.file_stream_metrics.time_processing.stop();
336        self.baseline_metrics.record_poll(result)
337    }
338}
339
340impl RecordBatchStream for FileStream {
341    fn schema(&self) -> SchemaRef {
342        Arc::clone(&self.projected_schema)
343    }
344}
345
/// A fallible future that resolves to a stream of [`RecordBatch`]
///
/// An `Err` from the future itself is treated by [`FileStream`] as a failure
/// to open the file. Errors yielded later by the resolved stream are treated
/// as failures while scanning the file.
pub type FileOpenFuture =
    BoxFuture<'static, Result<BoxStream<'static, Result<RecordBatch, ArrowError>>>>;
349
/// Describes the behavior of the `FileStream` if file opening or scanning fails
///
/// Derives `Debug`, `Clone`, `Copy`, `PartialEq` and `Eq` so the setting can be
/// logged, compared and passed around freely.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OnError {
    /// Fail the entire stream and return the underlying error
    ///
    /// This is the default.
    Fail,
    /// Continue scanning, ignoring the failed file
    ///
    /// This applies to errors produced by the future returned from
    /// [`FileOpener::open`] and to errors yielded while scanning a file. For a
    /// scan error, the rest of that file is skipped, but batches it already
    /// produced are kept. An `Err` returned directly by [`FileOpener::open`]
    /// still fails the stream.
    Skip,
}

impl Default for OnError {
    /// Returns [`OnError::Fail`].
    fn default() -> Self {
        Self::Fail
    }
}
363
/// Generic API for opening a file using an [`ObjectStore`] and resolving to a
/// stream of [`RecordBatch`]
///
/// [`FileStream`] calls [`FileOpener::open`] once for each file it reads, in
/// order. It starts opening the next file as soon as the current file's reader
/// is available, i.e. before the current file has been fully read.
///
/// [`ObjectStore`]: object_store::ObjectStore
pub trait FileOpener: Unpin + Send + Sync {
    /// Asynchronously open the specified file and return a stream
    /// of [`RecordBatch`]
    ///
    /// When called by [`FileStream`], `file_meta` is built from `file` (its
    /// object metadata, byte range, extensions and metadata size hint).
    ///
    /// An `Err` returned directly from this method always fails the
    /// [`FileStream`]. An error produced by the returned future is handled
    /// according to the stream's [`OnError`] setting.
    fn open(&self, file_meta: FileMeta, file: PartitionedFile) -> Result<FileOpenFuture>;
}
373
/// Represents the state of the next `FileOpenFuture`. Since we need to poll
/// this future while scanning the current file, we need to store the result if it
/// is ready
pub enum NextOpen {
    /// Opening has been started but the future has not completed yet.
    Pending(FileOpenFuture),
    /// The future completed (successfully or not) while the previous file was
    /// still being scanned; this holds its output until it is consumed.
    Ready(Result<BoxStream<'static, Result<RecordBatch, ArrowError>>>),
}
381
/// The state of a [`FileStream`]'s internal open/scan state machine.
///
/// A stream starts in `Idle`. It moves to `Open` while a file is being opened
/// and to `Scan` while that file's batches are read. From `Scan` it moves back
/// to `Open` for each subsequent file. `Error` and `Limit` are terminal: once
/// either is reached, the stream only returns `None`.
pub enum FileStreamState {
    /// The idle state, no file is currently being read
    ///
    /// The next poll starts opening the next file, if there is one.
    Idle,
    /// Currently performing asynchronous IO to obtain a stream of RecordBatch
    /// for a given file
    Open {
        /// A [`FileOpenFuture`] returned by [`FileOpener::open`]
        future: FileOpenFuture,
        /// The partition values for this file
        partition_values: Vec<ScalarValue>,
    },
    /// Scanning the [`BoxStream`] returned by the completion of a [`FileOpenFuture`]
    /// returned by [`FileOpener::open`]
    Scan {
        /// Partitioning column values for the current batch_iter
        partition_values: Vec<ScalarValue>,
        /// The reader instance
        reader: BoxStream<'static, Result<RecordBatch, ArrowError>>,
        /// A [`FileOpenFuture`] for the next file to be processed,
        /// and its corresponding partition column values, if any.
        /// This allows the next file to be opened in parallel while the
        /// current file is read.
        next: Option<(NextOpen, Vec<ScalarValue>)>,
    },
    /// Encountered an error; the stream returns `None` from now on
    Error,
    /// Reached the row limit; the stream returns `None` from now on
    Limit,
}
411
412/// A timer that can be started and stopped.
413pub struct StartableTime {
414    pub metrics: Time,
415    // use for record each part cost time, will eventually add into 'metrics'.
416    pub start: Option<Instant>,
417}
418
419impl StartableTime {
420    pub fn start(&mut self) {
421        assert!(self.start.is_none());
422        self.start = Some(Instant::now());
423    }
424
425    pub fn stop(&mut self) {
426        if let Some(start) = self.start.take() {
427            self.metrics.add_elapsed(start);
428        }
429    }
430}
431
#[allow(rustdoc::broken_intra_doc_links)]
/// Metrics for [`FileStream`]
///
/// Note that all of the timing metrics are in terms of wall clock time
/// (not cpu time) so they include time spent waiting on I/O as well
/// as other operators.
pub struct FileStreamMetrics {
    /// Wall clock time elapsed for file opening.
    ///
    /// Time between when [`FileOpener::open`] is called and when the
    /// [`FileStream`] receives a stream for reading.
    ///
    /// If there are multiple files being scanned, the stream
    /// will open the next file in the background while scanning the
    /// current file. This metric will only capture time spent opening
    /// while not also scanning.
    pub time_opening: StartableTime,
    /// Wall clock time elapsed for file scanning + first record batch of decompression + decoding
    ///
    /// Time between when the [`FileStream`] requests data from the
    /// stream and when the first [`RecordBatch`] is produced.
    pub time_scanning_until_data: StartableTime,
    /// Total elapsed wall clock time for scanning + record batch decompression / decoding
    ///
    /// Sum of time between when the [`FileStream`] requests data from
    /// the stream and when a [`RecordBatch`] is produced for all
    /// record batches in the stream. Note that this metric also
    /// includes the time of the parent operator's execution.
    pub time_scanning_total: StartableTime,
    /// Wall clock time spent inside the [`FileStream`]'s `poll_next`.
    ///
    /// This includes any work (such as decompression and decoding) that the
    /// underlying file readers perform while being polled. It does not include
    /// the time between polls.
    pub time_processing: StartableTime,
    /// Count of errors opening file.
    ///
    /// If using `OnError::Skip` this will provide a count of the number of files
    /// which were skipped and will not be included in the scan results.
    pub file_open_errors: Count,
    /// Count of errors scanning file
    ///
    /// If using `OnError::Skip` this will provide a count of the number of files
    /// which were skipped and will not be included in the scan results.
    pub file_scan_errors: Count,
}
480
481impl FileStreamMetrics {
482    pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
483        let time_opening = StartableTime {
484            metrics: MetricBuilder::new(metrics)
485                .subset_time("time_elapsed_opening", partition),
486            start: None,
487        };
488
489        let time_scanning_until_data = StartableTime {
490            metrics: MetricBuilder::new(metrics)
491                .subset_time("time_elapsed_scanning_until_data", partition),
492            start: None,
493        };
494
495        let time_scanning_total = StartableTime {
496            metrics: MetricBuilder::new(metrics)
497                .subset_time("time_elapsed_scanning_total", partition),
498            start: None,
499        };
500
501        let time_processing = StartableTime {
502            metrics: MetricBuilder::new(metrics)
503                .subset_time("time_elapsed_processing", partition),
504            start: None,
505        };
506
507        let file_open_errors =
508            MetricBuilder::new(metrics).counter("file_open_errors", partition);
509
510        let file_scan_errors =
511            MetricBuilder::new(metrics).counter("file_scan_errors", partition);
512
513        Self {
514            time_opening,
515            time_scanning_until_data,
516            time_scanning_total,
517            time_processing,
518            file_open_errors,
519            file_scan_errors,
520        }
521    }
522}
523
524#[cfg(test)]
525mod tests {
526    use crate::file_scan_config::FileScanConfigBuilder;
527    use crate::tests::make_partition;
528    use crate::PartitionedFile;
529    use arrow::error::ArrowError;
530    use datafusion_common::error::Result;
531    use datafusion_execution::object_store::ObjectStoreUrl;
532    use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
533    use futures::{FutureExt as _, StreamExt as _};
534    use std::sync::atomic::{AtomicUsize, Ordering};
535    use std::sync::Arc;
536
537    use crate::file_meta::FileMeta;
538    use crate::file_stream::{FileOpenFuture, FileOpener, FileStream, OnError};
539    use crate::test_util::MockSource;
540    use arrow::array::RecordBatch;
541    use arrow::datatypes::Schema;
542
543    use datafusion_common::{assert_batches_eq, internal_err};
544
    /// Test `FileOpener` which will simulate errors during file opening or scanning
    #[derive(Default)]
    struct TestOpener {
        /// Index in stream of files which should throw an error while opening
        error_opening_idx: Vec<usize>,
        /// Index in stream of files which should throw an error while scanning
        error_scanning_idx: Vec<usize>,
        /// Index that will be assigned to the next file passed to `open`
        /// (incremented on every call, so it follows the order of `open` calls)
        current_idx: AtomicUsize,
        /// `RecordBatch`es returned, in order, by every file that opens and
        /// scans without a simulated error
        records: Vec<RecordBatch>,
    }
557
558    impl FileOpener for TestOpener {
559        fn open(
560            &self,
561            _file_meta: FileMeta,
562            _file: PartitionedFile,
563        ) -> Result<FileOpenFuture> {
564            let idx = self.current_idx.fetch_add(1, Ordering::SeqCst);
565
566            if self.error_opening_idx.contains(&idx) {
567                Ok(futures::future::ready(internal_err!("error opening")).boxed())
568            } else if self.error_scanning_idx.contains(&idx) {
569                let error = futures::future::ready(Err(ArrowError::IpcError(
570                    "error scanning".to_owned(),
571                )));
572                let stream = futures::stream::once(error).boxed();
573                Ok(futures::future::ready(Ok(stream)).boxed())
574            } else {
575                let iterator = self.records.clone().into_iter().map(Ok);
576                let stream = futures::stream::iter(iterator).boxed();
577                Ok(futures::future::ready(Ok(stream)).boxed())
578            }
579        }
580    }
581
    /// Builder describing one `FileStream` test scenario.
    #[derive(Default)]
    struct FileStreamTest {
        /// Number of files in the stream
        num_files: usize,
        /// Global limit of records emitted by the stream
        limit: Option<usize>,
        /// Error-handling behavior of the stream
        on_error: OnError,
        /// Mock `FileOpener` that supplies (or fails) every file's data
        opener: TestOpener,
    }
593
594    impl FileStreamTest {
595        pub fn new() -> Self {
596            Self::default()
597        }
598
599        /// Specify the number of files in the stream
600        pub fn with_num_files(mut self, num_files: usize) -> Self {
601            self.num_files = num_files;
602            self
603        }
604
605        /// Specify the limit
606        pub fn with_limit(mut self, limit: Option<usize>) -> Self {
607            self.limit = limit;
608            self
609        }
610
611        /// Specify the index of files in the stream which should
612        /// throw an error when opening
613        pub fn with_open_errors(mut self, idx: Vec<usize>) -> Self {
614            self.opener.error_opening_idx = idx;
615            self
616        }
617
618        /// Specify the index of files in the stream which should
619        /// throw an error when scanning
620        pub fn with_scan_errors(mut self, idx: Vec<usize>) -> Self {
621            self.opener.error_scanning_idx = idx;
622            self
623        }
624
625        /// Specify the behavior of the stream when an error occurs
626        pub fn with_on_error(mut self, on_error: OnError) -> Self {
627            self.on_error = on_error;
628            self
629        }
630
631        /// Specify the record batches that should be returned from each
632        /// file that is successfully scanned
633        pub fn with_records(mut self, records: Vec<RecordBatch>) -> Self {
634            self.opener.records = records;
635            self
636        }
637
638        /// Collect the results of the `FileStream`
639        pub async fn result(self) -> Result<Vec<RecordBatch>> {
640            let file_schema = self
641                .opener
642                .records
643                .first()
644                .map(|batch| batch.schema())
645                .unwrap_or_else(|| Arc::new(Schema::empty()));
646
647            // let ctx = SessionContext::new();
648            let mock_files: Vec<(String, u64)> = (0..self.num_files)
649                .map(|idx| (format!("mock_file{idx}"), 10_u64))
650                .collect();
651
652            // let mock_files_ref: Vec<(&str, u64)> = mock_files
653            //     .iter()
654            //     .map(|(name, size)| (name.as_str(), *size))
655            //     .collect();
656
657            let file_group = mock_files
658                .into_iter()
659                .map(|(name, size)| PartitionedFile::new(name, size))
660                .collect();
661
662            let on_error = self.on_error;
663
664            let config = FileScanConfigBuilder::new(
665                ObjectStoreUrl::parse("test:///").unwrap(),
666                file_schema,
667                Arc::new(MockSource::default()),
668            )
669            .with_file_group(file_group)
670            .with_limit(self.limit)
671            .build();
672            let metrics_set = ExecutionPlanMetricsSet::new();
673            let file_stream =
674                FileStream::new(&config, 0, Arc::new(self.opener), &metrics_set)
675                    .unwrap()
676                    .with_on_error(on_error);
677
678            file_stream
679                .collect::<Vec<_>>()
680                .await
681                .into_iter()
682                .collect::<Result<Vec<_>>>()
683        }
684    }
685
686    /// helper that creates a stream of 2 files with the same pair of batches in each ([0,1,2] and [0,1])
687    async fn create_and_collect(limit: Option<usize>) -> Vec<RecordBatch> {
688        FileStreamTest::new()
689            .with_records(vec![make_partition(3), make_partition(2)])
690            .with_num_files(2)
691            .with_limit(limit)
692            .result()
693            .await
694            .expect("error executing stream")
695    }
696
697    #[tokio::test]
698    async fn on_error_opening() -> Result<()> {
699        let batches = FileStreamTest::new()
700            .with_records(vec![make_partition(3), make_partition(2)])
701            .with_num_files(2)
702            .with_on_error(OnError::Skip)
703            .with_open_errors(vec![0])
704            .result()
705            .await?;
706
707        #[rustfmt::skip]
708        assert_batches_eq!(&[
709            "+---+",
710            "| i |",
711            "+---+",
712            "| 0 |",
713            "| 1 |",
714            "| 2 |",
715            "| 0 |",
716            "| 1 |",
717            "+---+",
718        ], &batches);
719
720        let batches = FileStreamTest::new()
721            .with_records(vec![make_partition(3), make_partition(2)])
722            .with_num_files(2)
723            .with_on_error(OnError::Skip)
724            .with_open_errors(vec![1])
725            .result()
726            .await?;
727
728        #[rustfmt::skip]
729        assert_batches_eq!(&[
730            "+---+",
731            "| i |",
732            "+---+",
733            "| 0 |",
734            "| 1 |",
735            "| 2 |",
736            "| 0 |",
737            "| 1 |",
738            "+---+",
739        ], &batches);
740
741        let batches = FileStreamTest::new()
742            .with_records(vec![make_partition(3), make_partition(2)])
743            .with_num_files(2)
744            .with_on_error(OnError::Skip)
745            .with_open_errors(vec![0, 1])
746            .result()
747            .await?;
748
749        #[rustfmt::skip]
750        assert_batches_eq!(&[
751            "++",
752            "++",
753        ], &batches);
754
755        Ok(())
756    }
757
758    #[tokio::test]
759    async fn on_error_scanning_fail() -> Result<()> {
760        let result = FileStreamTest::new()
761            .with_records(vec![make_partition(3), make_partition(2)])
762            .with_num_files(2)
763            .with_on_error(OnError::Fail)
764            .with_scan_errors(vec![1])
765            .result()
766            .await;
767
768        assert!(result.is_err());
769
770        Ok(())
771    }
772
773    #[tokio::test]
774    async fn on_error_opening_fail() -> Result<()> {
775        let result = FileStreamTest::new()
776            .with_records(vec![make_partition(3), make_partition(2)])
777            .with_num_files(2)
778            .with_on_error(OnError::Fail)
779            .with_open_errors(vec![1])
780            .result()
781            .await;
782
783        assert!(result.is_err());
784
785        Ok(())
786    }
787
788    #[tokio::test]
789    async fn on_error_scanning() -> Result<()> {
790        let batches = FileStreamTest::new()
791            .with_records(vec![make_partition(3), make_partition(2)])
792            .with_num_files(2)
793            .with_on_error(OnError::Skip)
794            .with_scan_errors(vec![0])
795            .result()
796            .await?;
797
798        #[rustfmt::skip]
799        assert_batches_eq!(&[
800            "+---+",
801            "| i |",
802            "+---+",
803            "| 0 |",
804            "| 1 |",
805            "| 2 |",
806            "| 0 |",
807            "| 1 |",
808            "+---+",
809        ], &batches);
810
811        let batches = FileStreamTest::new()
812            .with_records(vec![make_partition(3), make_partition(2)])
813            .with_num_files(2)
814            .with_on_error(OnError::Skip)
815            .with_scan_errors(vec![1])
816            .result()
817            .await?;
818
819        #[rustfmt::skip]
820        assert_batches_eq!(&[
821            "+---+",
822            "| i |",
823            "+---+",
824            "| 0 |",
825            "| 1 |",
826            "| 2 |",
827            "| 0 |",
828            "| 1 |",
829            "+---+",
830        ], &batches);
831
832        let batches = FileStreamTest::new()
833            .with_records(vec![make_partition(3), make_partition(2)])
834            .with_num_files(2)
835            .with_on_error(OnError::Skip)
836            .with_scan_errors(vec![0, 1])
837            .result()
838            .await?;
839
840        #[rustfmt::skip]
841        assert_batches_eq!(&[
842            "++",
843            "++",
844        ], &batches);
845
846        Ok(())
847    }
848
849    #[tokio::test]
850    async fn on_error_mixed() -> Result<()> {
851        let batches = FileStreamTest::new()
852            .with_records(vec![make_partition(3), make_partition(2)])
853            .with_num_files(3)
854            .with_on_error(OnError::Skip)
855            .with_open_errors(vec![1])
856            .with_scan_errors(vec![0])
857            .result()
858            .await?;
859
860        #[rustfmt::skip]
861        assert_batches_eq!(&[
862            "+---+",
863            "| i |",
864            "+---+",
865            "| 0 |",
866            "| 1 |",
867            "| 2 |",
868            "| 0 |",
869            "| 1 |",
870            "+---+",
871        ], &batches);
872
873        let batches = FileStreamTest::new()
874            .with_records(vec![make_partition(3), make_partition(2)])
875            .with_num_files(3)
876            .with_on_error(OnError::Skip)
877            .with_open_errors(vec![0])
878            .with_scan_errors(vec![1])
879            .result()
880            .await?;
881
882        #[rustfmt::skip]
883        assert_batches_eq!(&[
884            "+---+",
885            "| i |",
886            "+---+",
887            "| 0 |",
888            "| 1 |",
889            "| 2 |",
890            "| 0 |",
891            "| 1 |",
892            "+---+",
893        ], &batches);
894
895        let batches = FileStreamTest::new()
896            .with_records(vec![make_partition(3), make_partition(2)])
897            .with_num_files(3)
898            .with_on_error(OnError::Skip)
899            .with_open_errors(vec![2])
900            .with_scan_errors(vec![0, 1])
901            .result()
902            .await?;
903
904        #[rustfmt::skip]
905        assert_batches_eq!(&[
906            "++",
907            "++",
908        ], &batches);
909
910        let batches = FileStreamTest::new()
911            .with_records(vec![make_partition(3), make_partition(2)])
912            .with_num_files(3)
913            .with_on_error(OnError::Skip)
914            .with_open_errors(vec![0, 2])
915            .with_scan_errors(vec![1])
916            .result()
917            .await?;
918
919        #[rustfmt::skip]
920        assert_batches_eq!(&[
921            "++",
922            "++",
923        ], &batches);
924
925        Ok(())
926    }
927
928    #[tokio::test]
929    async fn without_limit() -> Result<()> {
930        let batches = create_and_collect(None).await;
931
932        #[rustfmt::skip]
933        assert_batches_eq!(&[
934            "+---+",
935            "| i |",
936            "+---+",
937            "| 0 |",
938            "| 1 |",
939            "| 2 |",
940            "| 0 |",
941            "| 1 |",
942            "| 0 |",
943            "| 1 |",
944            "| 2 |",
945            "| 0 |",
946            "| 1 |",
947            "+---+",
948        ], &batches);
949
950        Ok(())
951    }
952
953    #[tokio::test]
954    async fn with_limit_between_files() -> Result<()> {
955        let batches = create_and_collect(Some(5)).await;
956        #[rustfmt::skip]
957        assert_batches_eq!(&[
958            "+---+",
959            "| i |",
960            "+---+",
961            "| 0 |",
962            "| 1 |",
963            "| 2 |",
964            "| 0 |",
965            "| 1 |",
966            "+---+",
967        ], &batches);
968
969        Ok(())
970    }
971
972    #[tokio::test]
973    async fn with_limit_at_middle_of_batch() -> Result<()> {
974        let batches = create_and_collect(Some(6)).await;
975        #[rustfmt::skip]
976        assert_batches_eq!(&[
977            "+---+",
978            "| i |",
979            "+---+",
980            "| 0 |",
981            "| 1 |",
982            "| 2 |",
983            "| 0 |",
984            "| 1 |",
985            "| 0 |",
986            "+---+",
987        ], &batches);
988
989        Ok(())
990    }
991}