datafusion_datasource/file_stream.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! A generic stream over file format readers that can be used by
//! any file format that reads its files from start to end.
//!
//! Note: Most traits here need to be marked `Sync + Send` to be
//! compliant with the `SendableRecordBatchStream` trait.

use std::collections::VecDeque;
use std::mem;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use crate::PartitionedFile;
use crate::file_scan_config::FileScanConfig;
use arrow::datatypes::SchemaRef;
use datafusion_common::error::Result;
use datafusion_execution::RecordBatchStream;
use datafusion_physical_plan::metrics::{
    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, Time,
};

use arrow::record_batch::RecordBatch;
use datafusion_common::instant::Instant;

use futures::future::BoxFuture;
use futures::stream::BoxStream;
use futures::{FutureExt as _, Stream, StreamExt as _, ready};
/// A stream that iterates record batch by record batch, file over file.
pub struct FileStream {
    /// An iterator over input files.
    file_iter: VecDeque<PartitionedFile>,
    /// The stream schema (file schema including partition columns and after
    /// projection).
    projected_schema: SchemaRef,
    /// The remaining number of records to parse, `None` if no limit
    remain: Option<usize>,
    /// A dynamic [`FileOpener`]. Calling `open()` returns a [`FileOpenFuture`],
    /// which can be resolved to a stream of `RecordBatch`.
    file_opener: Arc<dyn FileOpener>,
    /// The stream state
    state: FileStreamState,
    /// File stream specific metrics
    file_stream_metrics: FileStreamMetrics,
    /// Runtime baseline metrics
    baseline_metrics: BaselineMetrics,
    /// Describes the behavior of the `FileStream` if file opening or scanning fails
    on_error: OnError,
}

impl FileStream {
    /// Create a new `FileStream` using the given `FileOpener` to scan underlying files
    pub fn new(
        config: &FileScanConfig,
        partition: usize,
        file_opener: Arc<dyn FileOpener>,
        metrics: &ExecutionPlanMetricsSet,
    ) -> Result<Self> {
        let projected_schema = config.projected_schema()?;

        let file_group = config.file_groups[partition].clone();

        Ok(Self {
            file_iter: file_group.into_inner().into_iter().collect(),
            projected_schema,
            remain: config.limit,
            file_opener,
            state: FileStreamState::Idle,
            file_stream_metrics: FileStreamMetrics::new(metrics, partition),
            baseline_metrics: BaselineMetrics::new(metrics, partition),
            on_error: OnError::Fail,
        })
    }

    /// Specify the behavior when an error occurs opening or scanning a file
    ///
    /// If `OnError::Skip`, the stream will skip files which encounter an error and continue.
    /// If `OnError::Fail` (default), the stream will fail and stop processing when an error occurs.
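    ///
    /// # Example
    ///
    /// A minimal sketch (assumes `config`, `opener`, and `metrics` have
    /// already been built, as in [`FileStream::new`]):
    ///
    /// ```ignore
    /// let stream = FileStream::new(&config, 0, opener, &metrics)?
    ///     .with_on_error(OnError::Skip);
    /// ```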
    pub fn with_on_error(mut self, on_error: OnError) -> Self {
        self.on_error = on_error;
        self
    }

    /// Begin opening the next file in parallel while decoding the current file in FileStream.
    ///
    /// Since file opening is mostly IO (and may involve a
    /// number of sequential IO operations), it can be parallelized with decoding.
    fn start_next_file(&mut self) -> Option<Result<FileOpenFuture>> {
        let part_file = self.file_iter.pop_front()?;
        Some(self.file_opener.open(part_file))
    }

    fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<RecordBatch>>> {
        loop {
            match &mut self.state {
                FileStreamState::Idle => {
                    self.file_stream_metrics.time_opening.start();

                    match self.start_next_file().transpose() {
                        Ok(Some(future)) => self.state = FileStreamState::Open { future },
                        Ok(None) => return Poll::Ready(None),
                        Err(e) => {
                            self.state = FileStreamState::Error;
                            return Poll::Ready(Some(Err(e)));
                        }
                    }
                }
                FileStreamState::Open { future } => match ready!(future.poll_unpin(cx)) {
                    Ok(reader) => {
                        // include time needed to start opening in `start_next_file`
                        self.file_stream_metrics.time_opening.stop();
                        let next = self.start_next_file().transpose();
                        self.file_stream_metrics.time_scanning_until_data.start();
                        self.file_stream_metrics.time_scanning_total.start();

                        match next {
                            Ok(Some(next_future)) => {
                                self.state = FileStreamState::Scan {
                                    reader,
                                    next: Some(NextOpen::Pending(next_future)),
                                };
                            }
                            Ok(None) => {
                                self.state = FileStreamState::Scan { reader, next: None };
                            }
                            Err(e) => {
                                self.state = FileStreamState::Error;
                                return Poll::Ready(Some(Err(e)));
                            }
                        }
                    }
                    Err(e) => {
                        self.file_stream_metrics.file_open_errors.add(1);
                        match self.on_error {
                            OnError::Skip => {
                                self.file_stream_metrics.time_opening.stop();
                                self.state = FileStreamState::Idle
                            }
                            OnError::Fail => {
                                self.state = FileStreamState::Error;
                                return Poll::Ready(Some(Err(e)));
                            }
                        }
                    }
                },
                FileStreamState::Scan { reader, next } => {
                    // We need to poll the next `FileOpenFuture` here to drive it forward
                    if let Some(next_open_future) = next
                        && let NextOpen::Pending(f) = next_open_future
                        && let Poll::Ready(reader) = f.as_mut().poll(cx)
                    {
                        *next_open_future = NextOpen::Ready(reader);
                    }
                    match ready!(reader.poll_next_unpin(cx)) {
                        Some(Ok(batch)) => {
                            self.file_stream_metrics.time_scanning_until_data.stop();
                            self.file_stream_metrics.time_scanning_total.stop();
                            let batch = match &mut self.remain {
                                Some(remain) => {
                                    if *remain > batch.num_rows() {
                                        *remain -= batch.num_rows();
                                        batch
                                    } else {
                                        let batch = batch.slice(0, *remain);
                                        self.state = FileStreamState::Limit;
                                        *remain = 0;
                                        batch
                                    }
                                }
                                None => batch,
                            };
                            self.file_stream_metrics.time_scanning_total.start();
                            return Poll::Ready(Some(Ok(batch)));
                        }
                        Some(Err(err)) => {
                            self.file_stream_metrics.file_scan_errors.add(1);
                            self.file_stream_metrics.time_scanning_until_data.stop();
                            self.file_stream_metrics.time_scanning_total.stop();

                            match self.on_error {
                                // If `OnError::Skip` we skip the file as soon as we hit the first error
                                OnError::Skip => match mem::take(next) {
                                    Some(future) => {
                                        self.file_stream_metrics.time_opening.start();

                                        match future {
                                            NextOpen::Pending(future) => {
                                                self.state =
                                                    FileStreamState::Open { future }
                                            }
                                            NextOpen::Ready(reader) => {
                                                self.state = FileStreamState::Open {
                                                    future: Box::pin(std::future::ready(
                                                        reader,
                                                    )),
                                                }
                                            }
                                        }
                                    }
                                    None => return Poll::Ready(None),
                                },
                                OnError::Fail => {
                                    self.state = FileStreamState::Error;
                                    return Poll::Ready(Some(Err(err)));
                                }
                            }
                        }
                        None => {
                            self.file_stream_metrics.time_scanning_until_data.stop();
                            self.file_stream_metrics.time_scanning_total.stop();

                            match mem::take(next) {
                                Some(future) => {
                                    self.file_stream_metrics.time_opening.start();

                                    match future {
                                        NextOpen::Pending(future) => {
                                            self.state = FileStreamState::Open { future }
                                        }
                                        NextOpen::Ready(reader) => {
                                            self.state = FileStreamState::Open {
                                                future: Box::pin(std::future::ready(
                                                    reader,
                                                )),
                                            }
                                        }
                                    }
                                }
                                None => return Poll::Ready(None),
                            }
                        }
                    }
                }
                FileStreamState::Error | FileStreamState::Limit => {
                    return Poll::Ready(None);
                }
            }
        }
    }
}

impl Stream for FileStream {
    type Item = Result<RecordBatch>;

    fn poll_next(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        self.file_stream_metrics.time_processing.start();
        let result = self.poll_inner(cx);
        self.file_stream_metrics.time_processing.stop();
        self.baseline_metrics.record_poll(result)
    }
}

impl RecordBatchStream for FileStream {
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.projected_schema)
    }
}

/// A fallible future that resolves to a stream of [`RecordBatch`]
pub type FileOpenFuture =
    BoxFuture<'static, Result<BoxStream<'static, Result<RecordBatch>>>>;

/// Describes the behavior of the `FileStream` if file opening or scanning fails
#[derive(Default)]
pub enum OnError {
    /// Fail the entire stream and return the underlying error
    #[default]
    Fail,
    /// Continue scanning, ignoring the failed file
    Skip,
}

/// Generic API for opening a file using an [`ObjectStore`] and resolving to a
/// stream of [`RecordBatch`]
///
/// [`ObjectStore`]: object_store::ObjectStore
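///
/// # Example
///
/// A minimal sketch of an opener whose stream yields no batches; a real
/// implementation would read from the object store. The construction
/// mirrors the `TestOpener` in this module's tests (which is also where
/// the required `FutureExt`/`StreamExt` imports can be seen):
///
/// ```ignore
/// struct EmptyOpener;
///
/// impl FileOpener for EmptyOpener {
///     fn open(&self, _file: PartitionedFile) -> Result<FileOpenFuture> {
///         // An empty stream of `RecordBatch`, wrapped in an immediately
///         // ready future, as `FileOpenFuture` requires.
///         let stream = futures::stream::empty().boxed();
///         Ok(futures::future::ready(Ok(stream)).boxed())
///     }
/// }
/// ```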
pub trait FileOpener: Unpin + Send + Sync {
    /// Asynchronously open the specified file and return a stream
    /// of [`RecordBatch`]
    fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture>;
}

/// Represents the state of the next `FileOpenFuture`. Since we need to poll
/// this future while scanning the current file, we need to store the result if it
/// is ready
pub enum NextOpen {
    Pending(FileOpenFuture),
    Ready(Result<BoxStream<'static, Result<RecordBatch>>>),
}

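/// The state machine that drives `FileStream`: `Idle` transitions to `Open`
/// when the next file starts opening, `Open` to `Scan` once a stream is
/// available, and `Scan` loops back to `Open` while files remain; `Error`
/// and `Limit` are terminal.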
pub enum FileStreamState {
    /// The idle state, no file is currently being read
    Idle,
    /// Currently performing asynchronous IO to obtain a stream of RecordBatch
    /// for a given file
    Open {
        /// A [`FileOpenFuture`] returned by [`FileOpener::open`]
        future: FileOpenFuture,
    },
    /// Scanning the [`BoxStream`] returned by the completion of a [`FileOpenFuture`]
    /// returned by [`FileOpener::open`]
    Scan {
        /// The reader instance
        reader: BoxStream<'static, Result<RecordBatch>>,
        /// A [`FileOpenFuture`] for the next file to be processed.
        /// This allows the next file to be opened in parallel while the
        /// current file is read.
        next: Option<NextOpen>,
    },
    /// Encountered an error
    Error,
    /// Reached the row limit
    Limit,
}

/// A timer that can be started and stopped.
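///
/// # Example
///
/// A minimal sketch; the metric construction mirrors
/// [`FileStreamMetrics::new`], and `metrics` is assumed to be an
/// [`ExecutionPlanMetricsSet`]:
///
/// ```ignore
/// let mut timer = StartableTime {
///     // "my_time" is an arbitrary metric name for illustration.
///     metrics: MetricBuilder::new(&metrics).subset_time("my_time", 0),
///     start: None,
/// };
/// timer.start();
/// // ... timed work ...
/// timer.stop(); // the elapsed time is added into `metrics`
/// ```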
pub struct StartableTime {
    pub metrics: Time,
    // Start of the in-progress measurement; the elapsed time is added to
    // `metrics` when `stop` is called.
    pub start: Option<Instant>,
}

impl StartableTime {
    pub fn start(&mut self) {
        assert!(self.start.is_none());
        self.start = Some(Instant::now());
    }

    pub fn stop(&mut self) {
        if let Some(start) = self.start.take() {
            self.metrics.add_elapsed(start);
        }
    }
}

/// Metrics for [`FileStream`]
///
/// Note that all of these metrics are in terms of wall clock time
/// (not cpu time) so they include time spent waiting on I/O as well
/// as other operators.
///
/// [`FileStream`]: <https://github.com/apache/datafusion/blob/main/datafusion/datasource/src/file_stream.rs>
pub struct FileStreamMetrics {
    /// Wall clock time elapsed for file opening.
    ///
    /// Time between when [`FileOpener::open`] is called and when the
    /// [`FileStream`] receives a stream for reading.
    ///
    /// If there are multiple files being scanned, the stream
    /// will open the next file in the background while scanning the
    /// current file. This metric will only capture time spent opening
    /// while not also scanning.
    ///
    /// [`FileStream`]: <https://github.com/apache/datafusion/blob/main/datafusion/datasource/src/file_stream.rs>
    pub time_opening: StartableTime,
    /// Wall clock time elapsed for file scanning + first record batch of decompression + decoding
    ///
    /// Time between when the [`FileStream`] requests data from the
    /// stream and when the first [`RecordBatch`] is produced.
    ///
    /// [`FileStream`]: <https://github.com/apache/datafusion/blob/main/datafusion/datasource/src/file_stream.rs>
    pub time_scanning_until_data: StartableTime,
    /// Total elapsed wall clock time for scanning + record batch decompression / decoding
    ///
    /// Sum of time between when the [`FileStream`] requests data from
    /// the stream and when a [`RecordBatch`] is produced for all
    /// record batches in the stream. Note that this metric also
    /// includes the time of the parent operator's execution.
    pub time_scanning_total: StartableTime,
    /// Wall clock time elapsed for data decompression + decoding
    ///
    /// Time spent waiting for the FileStream's input.
    pub time_processing: StartableTime,
    /// Count of errors opening file.
    ///
    /// If using `OnError::Skip` this will provide a count of the number of files
    /// which were skipped and will not be included in the scan results.
    pub file_open_errors: Count,
    /// Count of errors scanning file
    ///
    /// If using `OnError::Skip` this will provide a count of the number of files
    /// which were skipped and will not be included in the scan results.
    pub file_scan_errors: Count,
}

impl FileStreamMetrics {
    pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
        let time_opening = StartableTime {
            metrics: MetricBuilder::new(metrics)
                .subset_time("time_elapsed_opening", partition),
            start: None,
        };

        let time_scanning_until_data = StartableTime {
            metrics: MetricBuilder::new(metrics)
                .subset_time("time_elapsed_scanning_until_data", partition),
            start: None,
        };

        let time_scanning_total = StartableTime {
            metrics: MetricBuilder::new(metrics)
                .subset_time("time_elapsed_scanning_total", partition),
            start: None,
        };

        let time_processing = StartableTime {
            metrics: MetricBuilder::new(metrics)
                .subset_time("time_elapsed_processing", partition),
            start: None,
        };

        let file_open_errors =
            MetricBuilder::new(metrics).counter("file_open_errors", partition);

        let file_scan_errors =
            MetricBuilder::new(metrics).counter("file_scan_errors", partition);

        Self {
            time_opening,
            time_scanning_until_data,
            time_scanning_total,
            time_processing,
            file_open_errors,
            file_scan_errors,
        }
    }
}

#[cfg(test)]
mod tests {
    use crate::PartitionedFile;
    use crate::file_scan_config::FileScanConfigBuilder;
    use crate::tests::make_partition;
    use datafusion_common::error::Result;
    use datafusion_execution::object_store::ObjectStoreUrl;
    use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
    use futures::{FutureExt as _, StreamExt as _};
    use std::sync::Arc;
    use std::sync::atomic::{AtomicUsize, Ordering};

    use crate::file_stream::{FileOpenFuture, FileOpener, FileStream, OnError};
    use crate::test_util::MockSource;
    use arrow::array::RecordBatch;
    use arrow::datatypes::Schema;

    use datafusion_common::{assert_batches_eq, exec_err, internal_err};

    /// Test `FileOpener` which will simulate errors during file opening or scanning
    #[derive(Default)]
    struct TestOpener {
        /// Indices in the stream of files which should throw an error while opening
        error_opening_idx: Vec<usize>,
        /// Indices in the stream of files which should throw an error while scanning
        error_scanning_idx: Vec<usize>,
        /// Index of the file currently being opened (incremented on each `open` call)
        current_idx: AtomicUsize,
        /// `RecordBatch`es to return from each successfully scanned file
        records: Vec<RecordBatch>,
    }

    impl FileOpener for TestOpener {
        fn open(&self, _partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
            let idx = self.current_idx.fetch_add(1, Ordering::SeqCst);

            if self.error_opening_idx.contains(&idx) {
                Ok(futures::future::ready(internal_err!("error opening")).boxed())
            } else if self.error_scanning_idx.contains(&idx) {
                let error = futures::future::ready(exec_err!("error scanning"));
                let stream = futures::stream::once(error).boxed();
                Ok(futures::future::ready(Ok(stream)).boxed())
            } else {
                let iterator = self.records.clone().into_iter().map(Ok);
                let stream = futures::stream::iter(iterator).boxed();
                Ok(futures::future::ready(Ok(stream)).boxed())
            }
        }
    }

    #[derive(Default)]
    struct FileStreamTest {
        /// Number of files in the stream
        num_files: usize,
        /// Global limit of records emitted by the stream
        limit: Option<usize>,
        /// Error-handling behavior of the stream
        on_error: OnError,
        /// Mock `FileOpener`
        opener: TestOpener,
    }

    impl FileStreamTest {
        pub fn new() -> Self {
            Self::default()
        }

        /// Specify the number of files in the stream
        pub fn with_num_files(mut self, num_files: usize) -> Self {
            self.num_files = num_files;
            self
        }

        /// Specify the limit
        pub fn with_limit(mut self, limit: Option<usize>) -> Self {
            self.limit = limit;
            self
        }

        /// Specify the indices of files in the stream which should
        /// throw an error when opening
        pub fn with_open_errors(mut self, idx: Vec<usize>) -> Self {
            self.opener.error_opening_idx = idx;
            self
        }

        /// Specify the indices of files in the stream which should
        /// throw an error when scanning
        pub fn with_scan_errors(mut self, idx: Vec<usize>) -> Self {
            self.opener.error_scanning_idx = idx;
            self
        }

        /// Specify the behavior of the stream when an error occurs
        pub fn with_on_error(mut self, on_error: OnError) -> Self {
            self.on_error = on_error;
            self
        }

        /// Specify the record batches that should be returned from each
        /// file that is successfully scanned
        pub fn with_records(mut self, records: Vec<RecordBatch>) -> Self {
            self.opener.records = records;
            self
        }

        /// Collect the results of the `FileStream`
        pub async fn result(self) -> Result<Vec<RecordBatch>> {
            let file_schema = self
                .opener
                .records
                .first()
                .map(|batch| batch.schema())
                .unwrap_or_else(|| Arc::new(Schema::empty()));

            let mock_files: Vec<(String, u64)> = (0..self.num_files)
                .map(|idx| (format!("mock_file{idx}"), 10_u64))
                .collect();

            let file_group = mock_files
                .into_iter()
                .map(|(name, size)| PartitionedFile::new(name, size))
                .collect();

            let on_error = self.on_error;

            let table_schema = crate::table_schema::TableSchema::new(file_schema, vec![]);
            let config = FileScanConfigBuilder::new(
                ObjectStoreUrl::parse("test:///").unwrap(),
                Arc::new(MockSource::new(table_schema)),
            )
            .with_file_group(file_group)
            .with_limit(self.limit)
            .build();
            let metrics_set = ExecutionPlanMetricsSet::new();
            let file_stream =
                FileStream::new(&config, 0, Arc::new(self.opener), &metrics_set)
                    .unwrap()
                    .with_on_error(on_error);

            file_stream
                .collect::<Vec<_>>()
                .await
                .into_iter()
                .collect::<Result<Vec<_>>>()
        }
    }

    /// Helper that creates a stream of 2 files with the same pair of batches in each ([0,1,2] and [0,1])
    async fn create_and_collect(limit: Option<usize>) -> Vec<RecordBatch> {
        FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_limit(limit)
            .result()
            .await
            .expect("error executing stream")
    }

    #[tokio::test]
    async fn on_error_opening() -> Result<()> {
        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![0])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![0, 1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "++",
            "++",
        ], &batches);

        Ok(())
    }

    #[tokio::test]
    async fn on_error_scanning_fail() -> Result<()> {
        let result = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Fail)
            .with_scan_errors(vec![1])
            .result()
            .await;

        assert!(result.is_err());

        Ok(())
    }

    #[tokio::test]
    async fn on_error_opening_fail() -> Result<()> {
        let result = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Fail)
            .with_open_errors(vec![1])
            .result()
            .await;

        assert!(result.is_err());

        Ok(())
    }

    #[tokio::test]
    async fn on_error_scanning() -> Result<()> {
        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Skip)
            .with_scan_errors(vec![0])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Skip)
            .with_scan_errors(vec![1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(2)
            .with_on_error(OnError::Skip)
            .with_scan_errors(vec![0, 1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "++",
            "++",
        ], &batches);

        Ok(())
    }

    #[tokio::test]
    async fn on_error_mixed() -> Result<()> {
        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(3)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![1])
            .with_scan_errors(vec![0])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(3)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![0])
            .with_scan_errors(vec![1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(3)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![2])
            .with_scan_errors(vec![0, 1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "++",
            "++",
        ], &batches);

        let batches = FileStreamTest::new()
            .with_records(vec![make_partition(3), make_partition(2)])
            .with_num_files(3)
            .with_on_error(OnError::Skip)
            .with_open_errors(vec![0, 2])
            .with_scan_errors(vec![1])
            .result()
            .await?;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "++",
            "++",
        ], &batches);

        Ok(())
    }

    #[tokio::test]
    async fn without_limit() -> Result<()> {
        let batches = create_and_collect(None).await;

        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        Ok(())
    }

    #[tokio::test]
    async fn with_limit_between_files() -> Result<()> {
        let batches = create_and_collect(Some(5)).await;
        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "+---+",
        ], &batches);

        Ok(())
    }

    #[tokio::test]
    async fn with_limit_at_middle_of_batch() -> Result<()> {
        let batches = create_and_collect(Some(6)).await;
        #[rustfmt::skip]
        assert_batches_eq!(&[
            "+---+",
            "| i |",
            "+---+",
            "| 0 |",
            "| 1 |",
            "| 2 |",
            "| 0 |",
            "| 1 |",
            "| 0 |",
            "+---+",
        ], &batches);

        Ok(())
    }
}
906}