datafusion_datasource/
file_stream.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! A generic stream over file format readers that can be used by
//! any file format that reads its files from start to end.
20//!
21//! Note: Most traits here need to be marked `Sync + Send` to be
22//! compliant with the `SendableRecordBatchStream` trait.
23
24use std::collections::VecDeque;
25use std::mem;
26use std::pin::Pin;
27use std::sync::Arc;
28use std::task::{Context, Poll};
29
30use crate::file_meta::FileMeta;
31use crate::file_scan_config::{FileScanConfig, PartitionColumnProjector};
32use crate::PartitionedFile;
33use arrow::datatypes::SchemaRef;
34use datafusion_common::error::Result;
35use datafusion_execution::RecordBatchStream;
36use datafusion_physical_plan::metrics::{
37    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, Time,
38};
39
40use arrow::error::ArrowError;
41use arrow::record_batch::RecordBatch;
42use datafusion_common::instant::Instant;
43use datafusion_common::ScalarValue;
44
45use futures::future::BoxFuture;
46use futures::stream::BoxStream;
47use futures::{ready, FutureExt as _, Stream, StreamExt as _};
48
/// A stream that iterates record batch by record batch, file over file.
///
/// Files are taken from the front of `file_iter` one at a time, opened through
/// `file_opener`, and the batches they produce are passed through
/// `pc_projector` before being yielded. While one file is being scanned, the
/// open future of the following file is polled as well so that opening
/// overlaps with decoding.
pub struct FileStream {
    /// The files that have not been opened yet, consumed front to back.
    file_iter: VecDeque<PartitionedFile>,
    /// The stream schema (file schema including partition columns and after
    /// projection).
    projected_schema: SchemaRef,
    /// The remaining number of records to emit, `None` if there is no limit
    remain: Option<usize>,
    /// A dynamic [`FileOpener`]. Calling `open()` returns a [`FileOpenFuture`],
    /// which can be resolved to a stream of `RecordBatch`.
    file_opener: Arc<dyn FileOpener>,
    /// The partition column projector, applied to every batch read from a file
    pc_projector: PartitionColumnProjector,
    /// The current state of the open/scan state machine
    state: FileStreamState,
    /// File stream specific metrics
    file_stream_metrics: FileStreamMetrics,
    /// Runtime baseline metrics, updated on every `poll_next`
    baseline_metrics: BaselineMetrics,
    /// Describes the behavior of the `FileStream` if file opening or scanning fails
    on_error: OnError,
}
72
73impl FileStream {
74    /// Create a new `FileStream` using the give `FileOpener` to scan underlying files
75    pub fn new(
76        config: &FileScanConfig,
77        partition: usize,
78        file_opener: Arc<dyn FileOpener>,
79        metrics: &ExecutionPlanMetricsSet,
80    ) -> Result<Self> {
81        let (projected_schema, ..) = config.project();
82        let pc_projector = PartitionColumnProjector::new(
83            Arc::clone(&projected_schema),
84            &config
85                .table_partition_cols
86                .iter()
87                .map(|x| x.name().clone())
88                .collect::<Vec<_>>(),
89        );
90
91        let files = config.file_groups[partition].clone();
92
93        Ok(Self {
94            file_iter: files.into(),
95            projected_schema,
96            remain: config.limit,
97            file_opener,
98            pc_projector,
99            state: FileStreamState::Idle,
100            file_stream_metrics: FileStreamMetrics::new(metrics, partition),
101            baseline_metrics: BaselineMetrics::new(metrics, partition),
102            on_error: OnError::Fail,
103        })
104    }
105
106    /// Specify the behavior when an error occurs opening or scanning a file
107    ///
108    /// If `OnError::Skip` the stream will skip files which encounter an error and continue
109    /// If `OnError:Fail` (default) the stream will fail and stop processing when an error occurs
110    pub fn with_on_error(mut self, on_error: OnError) -> Self {
111        self.on_error = on_error;
112        self
113    }
114
115    /// Begin opening the next file in parallel while decoding the current file in FileStream.
116    ///
117    /// Since file opening is mostly IO (and may involve a
118    /// bunch of sequential IO), it can be parallelized with decoding.
119    fn start_next_file(&mut self) -> Option<Result<(FileOpenFuture, Vec<ScalarValue>)>> {
120        let part_file = self.file_iter.pop_front()?;
121
122        let file_meta = FileMeta {
123            object_meta: part_file.object_meta,
124            range: part_file.range,
125            extensions: part_file.extensions,
126            metadata_size_hint: part_file.metadata_size_hint,
127        };
128
129        Some(
130            self.file_opener
131                .open(file_meta)
132                .map(|future| (future, part_file.partition_values)),
133        )
134    }
135
136    fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<RecordBatch>>> {
137        loop {
138            match &mut self.state {
139                FileStreamState::Idle => {
140                    self.file_stream_metrics.time_opening.start();
141
142                    match self.start_next_file().transpose() {
143                        Ok(Some((future, partition_values))) => {
144                            self.state = FileStreamState::Open {
145                                future,
146                                partition_values,
147                            }
148                        }
149                        Ok(None) => return Poll::Ready(None),
150                        Err(e) => {
151                            self.state = FileStreamState::Error;
152                            return Poll::Ready(Some(Err(e)));
153                        }
154                    }
155                }
156                FileStreamState::Open {
157                    future,
158                    partition_values,
159                } => match ready!(future.poll_unpin(cx)) {
160                    Ok(reader) => {
161                        let partition_values = mem::take(partition_values);
162
163                        // include time needed to start opening in `start_next_file`
164                        self.file_stream_metrics.time_opening.stop();
165                        let next = self.start_next_file().transpose();
166                        self.file_stream_metrics.time_scanning_until_data.start();
167                        self.file_stream_metrics.time_scanning_total.start();
168
169                        match next {
170                            Ok(Some((next_future, next_partition_values))) => {
171                                self.state = FileStreamState::Scan {
172                                    partition_values,
173                                    reader,
174                                    next: Some((
175                                        NextOpen::Pending(next_future),
176                                        next_partition_values,
177                                    )),
178                                };
179                            }
180                            Ok(None) => {
181                                self.state = FileStreamState::Scan {
182                                    reader,
183                                    partition_values,
184                                    next: None,
185                                };
186                            }
187                            Err(e) => {
188                                self.state = FileStreamState::Error;
189                                return Poll::Ready(Some(Err(e)));
190                            }
191                        }
192                    }
193                    Err(e) => {
194                        self.file_stream_metrics.file_open_errors.add(1);
195                        match self.on_error {
196                            OnError::Skip => {
197                                self.file_stream_metrics.time_opening.stop();
198                                self.state = FileStreamState::Idle
199                            }
200                            OnError::Fail => {
201                                self.state = FileStreamState::Error;
202                                return Poll::Ready(Some(Err(e)));
203                            }
204                        }
205                    }
206                },
207                FileStreamState::Scan {
208                    reader,
209                    partition_values,
210                    next,
211                } => {
212                    // We need to poll the next `FileOpenFuture` here to drive it forward
213                    if let Some((next_open_future, _)) = next {
214                        if let NextOpen::Pending(f) = next_open_future {
215                            if let Poll::Ready(reader) = f.as_mut().poll(cx) {
216                                *next_open_future = NextOpen::Ready(reader);
217                            }
218                        }
219                    }
220                    match ready!(reader.poll_next_unpin(cx)) {
221                        Some(Ok(batch)) => {
222                            self.file_stream_metrics.time_scanning_until_data.stop();
223                            self.file_stream_metrics.time_scanning_total.stop();
224                            let result = self
225                                .pc_projector
226                                .project(batch, partition_values)
227                                .map_err(|e| ArrowError::ExternalError(e.into()))
228                                .map(|batch| match &mut self.remain {
229                                    Some(remain) => {
230                                        if *remain > batch.num_rows() {
231                                            *remain -= batch.num_rows();
232                                            batch
233                                        } else {
234                                            let batch = batch.slice(0, *remain);
235                                            self.state = FileStreamState::Limit;
236                                            *remain = 0;
237                                            batch
238                                        }
239                                    }
240                                    None => batch,
241                                });
242
243                            if result.is_err() {
244                                // If the partition value projection fails, this is not governed by
245                                // the `OnError` behavior
246                                self.state = FileStreamState::Error
247                            }
248                            self.file_stream_metrics.time_scanning_total.start();
249                            return Poll::Ready(Some(result.map_err(Into::into)));
250                        }
251                        Some(Err(err)) => {
252                            self.file_stream_metrics.file_scan_errors.add(1);
253                            self.file_stream_metrics.time_scanning_until_data.stop();
254                            self.file_stream_metrics.time_scanning_total.stop();
255
256                            match self.on_error {
257                                // If `OnError::Skip` we skip the file as soon as we hit the first error
258                                OnError::Skip => match mem::take(next) {
259                                    Some((future, partition_values)) => {
260                                        self.file_stream_metrics.time_opening.start();
261
262                                        match future {
263                                            NextOpen::Pending(future) => {
264                                                self.state = FileStreamState::Open {
265                                                    future,
266                                                    partition_values,
267                                                }
268                                            }
269                                            NextOpen::Ready(reader) => {
270                                                self.state = FileStreamState::Open {
271                                                    future: Box::pin(std::future::ready(
272                                                        reader,
273                                                    )),
274                                                    partition_values,
275                                                }
276                                            }
277                                        }
278                                    }
279                                    None => return Poll::Ready(None),
280                                },
281                                OnError::Fail => {
282                                    self.state = FileStreamState::Error;
283                                    return Poll::Ready(Some(Err(err.into())));
284                                }
285                            }
286                        }
287                        None => {
288                            self.file_stream_metrics.time_scanning_until_data.stop();
289                            self.file_stream_metrics.time_scanning_total.stop();
290
291                            match mem::take(next) {
292                                Some((future, partition_values)) => {
293                                    self.file_stream_metrics.time_opening.start();
294
295                                    match future {
296                                        NextOpen::Pending(future) => {
297                                            self.state = FileStreamState::Open {
298                                                future,
299                                                partition_values,
300                                            }
301                                        }
302                                        NextOpen::Ready(reader) => {
303                                            self.state = FileStreamState::Open {
304                                                future: Box::pin(std::future::ready(
305                                                    reader,
306                                                )),
307                                                partition_values,
308                                            }
309                                        }
310                                    }
311                                }
312                                None => return Poll::Ready(None),
313                            }
314                        }
315                    }
316                }
317                FileStreamState::Error | FileStreamState::Limit => {
318                    return Poll::Ready(None)
319                }
320            }
321        }
322    }
323}
324
325impl Stream for FileStream {
326    type Item = Result<RecordBatch>;
327
328    fn poll_next(
329        mut self: Pin<&mut Self>,
330        cx: &mut Context<'_>,
331    ) -> Poll<Option<Self::Item>> {
332        self.file_stream_metrics.time_processing.start();
333        let result = self.poll_inner(cx);
334        self.file_stream_metrics.time_processing.stop();
335        self.baseline_metrics.record_poll(result)
336    }
337}
338
339impl RecordBatchStream for FileStream {
340    fn schema(&self) -> SchemaRef {
341        Arc::clone(&self.projected_schema)
342    }
343}
344
/// A fallible future that resolves to a stream of [`RecordBatch`]
///
/// The future itself fails with the crate's `Result` error type, while the
/// individual items of the resolved stream fail with [`ArrowError`].
pub type FileOpenFuture =
    BoxFuture<'static, Result<BoxStream<'static, Result<RecordBatch, ArrowError>>>>;
348
/// Describes the behavior of the `FileStream` if file opening or scanning fails
///
/// The default is [`OnError::Fail`].
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum OnError {
    /// Fail the entire stream and return the underlying error
    #[default]
    Fail,
    /// Continue scanning, ignoring the failed file
    Skip,
}
362
/// Generic API for opening a file using an [`ObjectStore`] and resolving to a
/// stream of [`RecordBatch`]
///
/// Implementations are shared behind an `Arc<dyn FileOpener>` by the
/// `FileStream`, hence the `Send + Sync` bounds.
///
/// [`ObjectStore`]: object_store::ObjectStore
pub trait FileOpener: Unpin + Send + Sync {
    /// Asynchronously open the specified file and return a stream
    /// of [`RecordBatch`]
    ///
    /// An `Err` returned directly from this call is never skipped by the
    /// `FileStream`; only failures of the returned future and of the stream
    /// it resolves to are subject to [`OnError`].
    fn open(&self, file_meta: FileMeta) -> Result<FileOpenFuture>;
}
372
/// Represents the state of the next `FileOpenFuture`. Since we need to poll
/// this future while scanning the current file, we need to store the result if it
/// is ready
pub enum NextOpen {
    /// The open future has not completed yet and must still be polled
    Pending(FileOpenFuture),
    /// The open future completed; this is the result it produced
    Ready(Result<BoxStream<'static, Result<RecordBatch, ArrowError>>>),
}
380
/// The states of the `FileStream` state machine
pub enum FileStreamState {
    /// The idle state, no file is currently being read
    Idle,
    /// Currently performing asynchronous IO to obtain a stream of RecordBatch
    /// for a given file
    Open {
        /// A [`FileOpenFuture`] returned by [`FileOpener::open`]
        future: FileOpenFuture,
        /// The partition values for this file
        partition_values: Vec<ScalarValue>,
    },
    /// Scanning the [`BoxStream`] returned by the completion of a [`FileOpenFuture`]
    /// returned by [`FileOpener::open`]
    Scan {
        /// Partitioning column values for the file currently being scanned
        partition_values: Vec<ScalarValue>,
        /// The reader instance
        reader: BoxStream<'static, Result<RecordBatch, ArrowError>>,
        /// A [`FileOpenFuture`] for the next file to be processed,
        /// and its corresponding partition column values, if any.
        /// This allows the next file to be opened in parallel while the
        /// current file is read.
        next: Option<(NextOpen, Vec<ScalarValue>)>,
    },
    /// Encountered an error; the stream yields nothing further
    Error,
    /// Reached the row limit; the stream yields nothing further
    Limit,
}
410
/// A timer that can be started and stopped.
///
/// Each start/stop pair adds the elapsed time to `metrics`.
pub struct StartableTime {
    /// The metric that accumulates the elapsed time of all completed intervals
    pub metrics: Time,
    /// When the currently running interval began, or `None` while the timer
    /// is stopped. The interval is added to `metrics` when the timer stops.
    pub start: Option<Instant>,
}
417
418impl StartableTime {
419    pub fn start(&mut self) {
420        assert!(self.start.is_none());
421        self.start = Some(Instant::now());
422    }
423
424    pub fn stop(&mut self) {
425        if let Some(start) = self.start.take() {
426            self.metrics.add_elapsed(start);
427        }
428    }
429}
430
// NOTE(review): presumably the lint is allowed because some links in the docs
// below do not resolve from this crate — confirm it is still needed.
#[allow(rustdoc::broken_intra_doc_links)]
/// Metrics for [`FileStream`]
///
/// Note that all of these metrics are in terms of wall clock time
/// (not cpu time) so they include time spent waiting on I/O as well
/// as other operators.
///
/// [`FileStream`]: <https://github.com/apache/datafusion/blob/main/datafusion/core/src/datasource/physical_plan/file_stream.rs>
pub struct FileStreamMetrics {
    /// Wall clock time elapsed for file opening.
    ///
    /// Time between when [`FileOpener::open`] is called and when the
    /// [`FileStream`] receives a stream for reading.
    ///
    /// If there are multiple files being scanned, the stream
    /// will open the next file in the background while scanning the
    /// current file. This metric will only capture time spent opening
    /// while not also scanning.
    /// [`FileStream`]: <https://github.com/apache/datafusion/blob/main/datafusion/core/src/datasource/physical_plan/file_stream.rs>
    pub time_opening: StartableTime,
    /// Wall clock time elapsed for file scanning + first record batch of decompression + decoding
    ///
    /// Time between when the [`FileStream`] requests data from the
    /// stream and when the first [`RecordBatch`] is produced.
    /// [`FileStream`]: <https://github.com/apache/datafusion/blob/main/datafusion/core/src/datasource/physical_plan/file_stream.rs>
    pub time_scanning_until_data: StartableTime,
    /// Total elapsed wall clock time for scanning + record batch decompression / decoding
    ///
    /// Sum of time between when the [`FileStream`] requests data from
    /// the stream and when a [`RecordBatch`] is produced for all
    /// record batches in the stream. Note that this metric also
    /// includes the time of the parent operator's execution.
    pub time_scanning_total: StartableTime,
    /// Wall clock time spent inside the stream's `poll_next`
    ///
    /// The timer is started before and stopped after every call that drives
    /// the stream's state machine, so this is the accumulated duration of all
    /// polls of the [`FileStream`].
    pub time_processing: StartableTime,
    /// Count of errors opening file.
    ///
    /// If using `OnError::Skip` this will provide a count of the number of files
    /// which were skipped and will not be included in the scan results.
    pub file_open_errors: Count,
    /// Count of errors scanning file
    ///
    /// If using `OnError::Skip` this will provide a count of the number of files
    /// which were skipped and will not be included in the scan results.
    pub file_scan_errors: Count,
}
479
480impl FileStreamMetrics {
481    pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
482        let time_opening = StartableTime {
483            metrics: MetricBuilder::new(metrics)
484                .subset_time("time_elapsed_opening", partition),
485            start: None,
486        };
487
488        let time_scanning_until_data = StartableTime {
489            metrics: MetricBuilder::new(metrics)
490                .subset_time("time_elapsed_scanning_until_data", partition),
491            start: None,
492        };
493
494        let time_scanning_total = StartableTime {
495            metrics: MetricBuilder::new(metrics)
496                .subset_time("time_elapsed_scanning_total", partition),
497            start: None,
498        };
499
500        let time_processing = StartableTime {
501            metrics: MetricBuilder::new(metrics)
502                .subset_time("time_elapsed_processing", partition),
503            start: None,
504        };
505
506        let file_open_errors =
507            MetricBuilder::new(metrics).counter("file_open_errors", partition);
508
509        let file_scan_errors =
510            MetricBuilder::new(metrics).counter("file_scan_errors", partition);
511
512        Self {
513            time_opening,
514            time_scanning_until_data,
515            time_scanning_total,
516            time_processing,
517            file_open_errors,
518            file_scan_errors,
519        }
520    }
521}
522
523#[cfg(test)]
524mod tests {
525    use crate::file_scan_config::FileScanConfig;
526    use crate::tests::make_partition;
527    use crate::PartitionedFile;
528    use arrow::error::ArrowError;
529    use datafusion_common::error::Result;
530    use datafusion_execution::object_store::ObjectStoreUrl;
531    use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
532    use futures::{FutureExt as _, StreamExt as _};
533    use std::sync::atomic::{AtomicUsize, Ordering};
534    use std::sync::Arc;
535
536    use crate::file_meta::FileMeta;
537    use crate::file_stream::{FileOpenFuture, FileOpener, FileStream, OnError};
538    use crate::test_util::MockSource;
539    use arrow::array::RecordBatch;
540    use arrow::datatypes::Schema;
541
542    use datafusion_common::{assert_batches_eq, internal_err};
543
    /// Test `FileOpener` which will simulate errors during file opening or scanning
    ///
    /// Files are identified by the order in which `open` is called, starting at 0.
    #[derive(Default)]
    struct TestOpener {
        /// Index in stream of files which should throw an error while opening
        error_opening_idx: Vec<usize>,
        /// Index in stream of files which should throw an error while scanning
        error_scanning_idx: Vec<usize>,
        /// Number of `open` calls so far, i.e. the index of the next file
        current_idx: AtomicUsize,
        /// `RecordBatch`es to return from every file that scans successfully
        records: Vec<RecordBatch>,
    }
556
557    impl FileOpener for TestOpener {
558        fn open(&self, _file_meta: FileMeta) -> Result<FileOpenFuture> {
559            let idx = self.current_idx.fetch_add(1, Ordering::SeqCst);
560
561            if self.error_opening_idx.contains(&idx) {
562                Ok(futures::future::ready(internal_err!("error opening")).boxed())
563            } else if self.error_scanning_idx.contains(&idx) {
564                let error = futures::future::ready(Err(ArrowError::IpcError(
565                    "error scanning".to_owned(),
566                )));
567                let stream = futures::stream::once(error).boxed();
568                Ok(futures::future::ready(Ok(stream)).boxed())
569            } else {
570                let iterator = self.records.clone().into_iter().map(Ok);
571                let stream = futures::stream::iter(iterator).boxed();
572                Ok(futures::future::ready(Ok(stream)).boxed())
573            }
574        }
575    }
576
    /// Builder that describes a `FileStream` scenario and collects its output
    #[derive(Default)]
    struct FileStreamTest {
        /// Number of files in the stream
        num_files: usize,
        /// Global limit of records emitted by the stream
        limit: Option<usize>,
        /// Error-handling behavior of the stream
        on_error: OnError,
        /// Mock `FileOpener`
        opener: TestOpener,
    }
588
589    impl FileStreamTest {
590        pub fn new() -> Self {
591            Self::default()
592        }
593
594        /// Specify the number of files in the stream
595        pub fn with_num_files(mut self, num_files: usize) -> Self {
596            self.num_files = num_files;
597            self
598        }
599
600        /// Specify the limit
601        pub fn with_limit(mut self, limit: Option<usize>) -> Self {
602            self.limit = limit;
603            self
604        }
605
606        /// Specify the index of files in the stream which should
607        /// throw an error when opening
608        pub fn with_open_errors(mut self, idx: Vec<usize>) -> Self {
609            self.opener.error_opening_idx = idx;
610            self
611        }
612
613        /// Specify the index of files in the stream which should
614        /// throw an error when scanning
615        pub fn with_scan_errors(mut self, idx: Vec<usize>) -> Self {
616            self.opener.error_scanning_idx = idx;
617            self
618        }
619
620        /// Specify the behavior of the stream when an error occurs
621        pub fn with_on_error(mut self, on_error: OnError) -> Self {
622            self.on_error = on_error;
623            self
624        }
625
626        /// Specify the record batches that should be returned from each
627        /// file that is successfully scanned
628        pub fn with_records(mut self, records: Vec<RecordBatch>) -> Self {
629            self.opener.records = records;
630            self
631        }
632
633        /// Collect the results of the `FileStream`
634        pub async fn result(self) -> Result<Vec<RecordBatch>> {
635            let file_schema = self
636                .opener
637                .records
638                .first()
639                .map(|batch| batch.schema())
640                .unwrap_or_else(|| Arc::new(Schema::empty()));
641
642            // let ctx = SessionContext::new();
643            let mock_files: Vec<(String, u64)> = (0..self.num_files)
644                .map(|idx| (format!("mock_file{idx}"), 10_u64))
645                .collect();
646
647            // let mock_files_ref: Vec<(&str, u64)> = mock_files
648            //     .iter()
649            //     .map(|(name, size)| (name.as_str(), *size))
650            //     .collect();
651
652            let file_group = mock_files
653                .into_iter()
654                .map(|(name, size)| PartitionedFile::new(name, size))
655                .collect();
656
657            let on_error = self.on_error;
658
659            let config = FileScanConfig::new(
660                ObjectStoreUrl::parse("test:///").unwrap(),
661                file_schema,
662                Arc::new(MockSource::default()),
663            )
664            .with_file_group(file_group)
665            .with_limit(self.limit);
666            let metrics_set = ExecutionPlanMetricsSet::new();
667            let file_stream =
668                FileStream::new(&config, 0, Arc::new(self.opener), &metrics_set)
669                    .unwrap()
670                    .with_on_error(on_error);
671
672            file_stream
673                .collect::<Vec<_>>()
674                .await
675                .into_iter()
676                .collect::<Result<Vec<_>>>()
677        }
678    }
679
680    /// helper that creates a stream of 2 files with the same pair of batches in each ([0,1,2] and [0,1])
681    async fn create_and_collect(limit: Option<usize>) -> Vec<RecordBatch> {
682        FileStreamTest::new()
683            .with_records(vec![make_partition(3), make_partition(2)])
684            .with_num_files(2)
685            .with_limit(limit)
686            .result()
687            .await
688            .expect("error executing stream")
689    }
690
691    #[tokio::test]
692    async fn on_error_opening() -> Result<()> {
693        let batches = FileStreamTest::new()
694            .with_records(vec![make_partition(3), make_partition(2)])
695            .with_num_files(2)
696            .with_on_error(OnError::Skip)
697            .with_open_errors(vec![0])
698            .result()
699            .await?;
700
701        #[rustfmt::skip]
702        assert_batches_eq!(&[
703            "+---+",
704            "| i |",
705            "+---+",
706            "| 0 |",
707            "| 1 |",
708            "| 2 |",
709            "| 0 |",
710            "| 1 |",
711            "+---+",
712        ], &batches);
713
714        let batches = FileStreamTest::new()
715            .with_records(vec![make_partition(3), make_partition(2)])
716            .with_num_files(2)
717            .with_on_error(OnError::Skip)
718            .with_open_errors(vec![1])
719            .result()
720            .await?;
721
722        #[rustfmt::skip]
723        assert_batches_eq!(&[
724            "+---+",
725            "| i |",
726            "+---+",
727            "| 0 |",
728            "| 1 |",
729            "| 2 |",
730            "| 0 |",
731            "| 1 |",
732            "+---+",
733        ], &batches);
734
735        let batches = FileStreamTest::new()
736            .with_records(vec![make_partition(3), make_partition(2)])
737            .with_num_files(2)
738            .with_on_error(OnError::Skip)
739            .with_open_errors(vec![0, 1])
740            .result()
741            .await?;
742
743        #[rustfmt::skip]
744        assert_batches_eq!(&[
745            "++",
746            "++",
747        ], &batches);
748
749        Ok(())
750    }
751
752    #[tokio::test]
753    async fn on_error_scanning_fail() -> Result<()> {
754        let result = FileStreamTest::new()
755            .with_records(vec![make_partition(3), make_partition(2)])
756            .with_num_files(2)
757            .with_on_error(OnError::Fail)
758            .with_scan_errors(vec![1])
759            .result()
760            .await;
761
762        assert!(result.is_err());
763
764        Ok(())
765    }
766
767    #[tokio::test]
768    async fn on_error_opening_fail() -> Result<()> {
769        let result = FileStreamTest::new()
770            .with_records(vec![make_partition(3), make_partition(2)])
771            .with_num_files(2)
772            .with_on_error(OnError::Fail)
773            .with_open_errors(vec![1])
774            .result()
775            .await;
776
777        assert!(result.is_err());
778
779        Ok(())
780    }
781
782    #[tokio::test]
783    async fn on_error_scanning() -> Result<()> {
784        let batches = FileStreamTest::new()
785            .with_records(vec![make_partition(3), make_partition(2)])
786            .with_num_files(2)
787            .with_on_error(OnError::Skip)
788            .with_scan_errors(vec![0])
789            .result()
790            .await?;
791
792        #[rustfmt::skip]
793        assert_batches_eq!(&[
794            "+---+",
795            "| i |",
796            "+---+",
797            "| 0 |",
798            "| 1 |",
799            "| 2 |",
800            "| 0 |",
801            "| 1 |",
802            "+---+",
803        ], &batches);
804
805        let batches = FileStreamTest::new()
806            .with_records(vec![make_partition(3), make_partition(2)])
807            .with_num_files(2)
808            .with_on_error(OnError::Skip)
809            .with_scan_errors(vec![1])
810            .result()
811            .await?;
812
813        #[rustfmt::skip]
814        assert_batches_eq!(&[
815            "+---+",
816            "| i |",
817            "+---+",
818            "| 0 |",
819            "| 1 |",
820            "| 2 |",
821            "| 0 |",
822            "| 1 |",
823            "+---+",
824        ], &batches);
825
826        let batches = FileStreamTest::new()
827            .with_records(vec![make_partition(3), make_partition(2)])
828            .with_num_files(2)
829            .with_on_error(OnError::Skip)
830            .with_scan_errors(vec![0, 1])
831            .result()
832            .await?;
833
834        #[rustfmt::skip]
835        assert_batches_eq!(&[
836            "++",
837            "++",
838        ], &batches);
839
840        Ok(())
841    }
842
843    #[tokio::test]
844    async fn on_error_mixed() -> Result<()> {
845        let batches = FileStreamTest::new()
846            .with_records(vec![make_partition(3), make_partition(2)])
847            .with_num_files(3)
848            .with_on_error(OnError::Skip)
849            .with_open_errors(vec![1])
850            .with_scan_errors(vec![0])
851            .result()
852            .await?;
853
854        #[rustfmt::skip]
855        assert_batches_eq!(&[
856            "+---+",
857            "| i |",
858            "+---+",
859            "| 0 |",
860            "| 1 |",
861            "| 2 |",
862            "| 0 |",
863            "| 1 |",
864            "+---+",
865        ], &batches);
866
867        let batches = FileStreamTest::new()
868            .with_records(vec![make_partition(3), make_partition(2)])
869            .with_num_files(3)
870            .with_on_error(OnError::Skip)
871            .with_open_errors(vec![0])
872            .with_scan_errors(vec![1])
873            .result()
874            .await?;
875
876        #[rustfmt::skip]
877        assert_batches_eq!(&[
878            "+---+",
879            "| i |",
880            "+---+",
881            "| 0 |",
882            "| 1 |",
883            "| 2 |",
884            "| 0 |",
885            "| 1 |",
886            "+---+",
887        ], &batches);
888
889        let batches = FileStreamTest::new()
890            .with_records(vec![make_partition(3), make_partition(2)])
891            .with_num_files(3)
892            .with_on_error(OnError::Skip)
893            .with_open_errors(vec![2])
894            .with_scan_errors(vec![0, 1])
895            .result()
896            .await?;
897
898        #[rustfmt::skip]
899        assert_batches_eq!(&[
900            "++",
901            "++",
902        ], &batches);
903
904        let batches = FileStreamTest::new()
905            .with_records(vec![make_partition(3), make_partition(2)])
906            .with_num_files(3)
907            .with_on_error(OnError::Skip)
908            .with_open_errors(vec![0, 2])
909            .with_scan_errors(vec![1])
910            .result()
911            .await?;
912
913        #[rustfmt::skip]
914        assert_batches_eq!(&[
915            "++",
916            "++",
917        ], &batches);
918
919        Ok(())
920    }
921
922    #[tokio::test]
923    async fn without_limit() -> Result<()> {
924        let batches = create_and_collect(None).await;
925
926        #[rustfmt::skip]
927        assert_batches_eq!(&[
928            "+---+",
929            "| i |",
930            "+---+",
931            "| 0 |",
932            "| 1 |",
933            "| 2 |",
934            "| 0 |",
935            "| 1 |",
936            "| 0 |",
937            "| 1 |",
938            "| 2 |",
939            "| 0 |",
940            "| 1 |",
941            "+---+",
942        ], &batches);
943
944        Ok(())
945    }
946
947    #[tokio::test]
948    async fn with_limit_between_files() -> Result<()> {
949        let batches = create_and_collect(Some(5)).await;
950        #[rustfmt::skip]
951        assert_batches_eq!(&[
952            "+---+",
953            "| i |",
954            "+---+",
955            "| 0 |",
956            "| 1 |",
957            "| 2 |",
958            "| 0 |",
959            "| 1 |",
960            "+---+",
961        ], &batches);
962
963        Ok(())
964    }
965
966    #[tokio::test]
967    async fn with_limit_at_middle_of_batch() -> Result<()> {
968        let batches = create_and_collect(Some(6)).await;
969        #[rustfmt::skip]
970        assert_batches_eq!(&[
971            "+---+",
972            "| i |",
973            "+---+",
974            "| 0 |",
975            "| 1 |",
976            "| 2 |",
977            "| 0 |",
978            "| 1 |",
979            "| 0 |",
980            "+---+",
981        ], &batches);
982
983        Ok(())
984    }
985}