datafusion_datasource/
file_stream.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! A generic stream over file format readers that can be used by
//! any file format that reads its files from start to end.
20//!
21//! Note: Most traits here need to be marked `Sync + Send` to be
22//! compliant with the `SendableRecordBatchStream` trait.
23
24use std::collections::VecDeque;
25use std::mem;
26use std::pin::Pin;
27use std::sync::Arc;
28use std::task::{Context, Poll};
29
30use crate::file_scan_config::{FileScanConfig, PartitionColumnProjector};
31use crate::PartitionedFile;
32use arrow::datatypes::SchemaRef;
33use datafusion_common::error::Result;
34use datafusion_execution::RecordBatchStream;
35use datafusion_physical_plan::metrics::{
36    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, Time,
37};
38
39use arrow::record_batch::RecordBatch;
40use datafusion_common::instant::Instant;
41use datafusion_common::ScalarValue;
42
43use futures::future::BoxFuture;
44use futures::stream::BoxStream;
45use futures::{ready, FutureExt as _, Stream, StreamExt as _};
46
/// A stream that iterates record batch by record batch, file over file.
///
/// The files of one partition are consumed in order. Each file is opened
/// through the configured [`FileOpener`], its batches are extended with the
/// file's partition column values and, if a limit is set, truncated so that
/// no more than the requested number of rows is emitted in total.
pub struct FileStream {
    /// Queue of the files that have not been opened yet, in scan order.
    file_iter: VecDeque<PartitionedFile>,
    /// The stream schema (file schema including partition columns and after
    /// projection).
    projected_schema: SchemaRef,
    /// The remaining number of records to parse, None if no limit
    remain: Option<usize>,
    /// A dynamic [`FileOpener`]. Calling `open()` returns a [`FileOpenFuture`],
    /// which can be resolved to a stream of `RecordBatch`.
    file_opener: Arc<dyn FileOpener>,
    /// The partition column projector, used to add the partition column
    /// values of the current file to every batch read from it
    pc_projector: PartitionColumnProjector,
    /// The stream state
    state: FileStreamState,
    /// File stream specific metrics
    file_stream_metrics: FileStreamMetrics,
    /// runtime baseline metrics
    baseline_metrics: BaselineMetrics,
    /// Describes the behavior of the `FileStream` if file opening or scanning fails
    on_error: OnError,
}
70
71impl FileStream {
72    /// Create a new `FileStream` using the give `FileOpener` to scan underlying files
73    pub fn new(
74        config: &FileScanConfig,
75        partition: usize,
76        file_opener: Arc<dyn FileOpener>,
77        metrics: &ExecutionPlanMetricsSet,
78    ) -> Result<Self> {
79        let projected_schema = config.projected_schema();
80        let pc_projector = PartitionColumnProjector::new(
81            Arc::clone(&projected_schema),
82            &config
83                .table_partition_cols()
84                .iter()
85                .map(|x| x.name().clone())
86                .collect::<Vec<_>>(),
87        );
88
89        let file_group = config.file_groups[partition].clone();
90
91        Ok(Self {
92            file_iter: file_group.into_inner().into_iter().collect(),
93            projected_schema,
94            remain: config.limit,
95            file_opener,
96            pc_projector,
97            state: FileStreamState::Idle,
98            file_stream_metrics: FileStreamMetrics::new(metrics, partition),
99            baseline_metrics: BaselineMetrics::new(metrics, partition),
100            on_error: OnError::Fail,
101        })
102    }
103
104    /// Specify the behavior when an error occurs opening or scanning a file
105    ///
106    /// If `OnError::Skip` the stream will skip files which encounter an error and continue
107    /// If `OnError:Fail` (default) the stream will fail and stop processing when an error occurs
108    pub fn with_on_error(mut self, on_error: OnError) -> Self {
109        self.on_error = on_error;
110        self
111    }
112
113    /// Begin opening the next file in parallel while decoding the current file in FileStream.
114    ///
115    /// Since file opening is mostly IO (and may involve a
116    /// bunch of sequential IO), it can be parallelized with decoding.
117    fn start_next_file(&mut self) -> Option<Result<(FileOpenFuture, Vec<ScalarValue>)>> {
118        let part_file = self.file_iter.pop_front()?;
119
120        let partition_values = part_file.partition_values.clone();
121        Some(
122            self.file_opener
123                .open(part_file)
124                .map(|future| (future, partition_values)),
125        )
126    }
127
128    fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<RecordBatch>>> {
129        loop {
130            match &mut self.state {
131                FileStreamState::Idle => {
132                    self.file_stream_metrics.time_opening.start();
133
134                    match self.start_next_file().transpose() {
135                        Ok(Some((future, partition_values))) => {
136                            self.state = FileStreamState::Open {
137                                future,
138                                partition_values,
139                            }
140                        }
141                        Ok(None) => return Poll::Ready(None),
142                        Err(e) => {
143                            self.state = FileStreamState::Error;
144                            return Poll::Ready(Some(Err(e)));
145                        }
146                    }
147                }
148                FileStreamState::Open {
149                    future,
150                    partition_values,
151                } => match ready!(future.poll_unpin(cx)) {
152                    Ok(reader) => {
153                        let partition_values = mem::take(partition_values);
154
155                        // include time needed to start opening in `start_next_file`
156                        self.file_stream_metrics.time_opening.stop();
157                        let next = self.start_next_file().transpose();
158                        self.file_stream_metrics.time_scanning_until_data.start();
159                        self.file_stream_metrics.time_scanning_total.start();
160
161                        match next {
162                            Ok(Some((next_future, next_partition_values))) => {
163                                self.state = FileStreamState::Scan {
164                                    partition_values,
165                                    reader,
166                                    next: Some((
167                                        NextOpen::Pending(next_future),
168                                        next_partition_values,
169                                    )),
170                                };
171                            }
172                            Ok(None) => {
173                                self.state = FileStreamState::Scan {
174                                    reader,
175                                    partition_values,
176                                    next: None,
177                                };
178                            }
179                            Err(e) => {
180                                self.state = FileStreamState::Error;
181                                return Poll::Ready(Some(Err(e)));
182                            }
183                        }
184                    }
185                    Err(e) => {
186                        self.file_stream_metrics.file_open_errors.add(1);
187                        match self.on_error {
188                            OnError::Skip => {
189                                self.file_stream_metrics.time_opening.stop();
190                                self.state = FileStreamState::Idle
191                            }
192                            OnError::Fail => {
193                                self.state = FileStreamState::Error;
194                                return Poll::Ready(Some(Err(e)));
195                            }
196                        }
197                    }
198                },
199                FileStreamState::Scan {
200                    reader,
201                    partition_values,
202                    next,
203                } => {
204                    // We need to poll the next `FileOpenFuture` here to drive it forward
205                    if let Some((next_open_future, _)) = next {
206                        if let NextOpen::Pending(f) = next_open_future {
207                            if let Poll::Ready(reader) = f.as_mut().poll(cx) {
208                                *next_open_future = NextOpen::Ready(reader);
209                            }
210                        }
211                    }
212                    match ready!(reader.poll_next_unpin(cx)) {
213                        Some(Ok(batch)) => {
214                            self.file_stream_metrics.time_scanning_until_data.stop();
215                            self.file_stream_metrics.time_scanning_total.stop();
216                            let result = self
217                                .pc_projector
218                                .project(batch, partition_values)
219                                .map(|batch| match &mut self.remain {
220                                    Some(remain) => {
221                                        if *remain > batch.num_rows() {
222                                            *remain -= batch.num_rows();
223                                            batch
224                                        } else {
225                                            let batch = batch.slice(0, *remain);
226                                            self.state = FileStreamState::Limit;
227                                            *remain = 0;
228                                            batch
229                                        }
230                                    }
231                                    None => batch,
232                                });
233
234                            if result.is_err() {
235                                // If the partition value projection fails, this is not governed by
236                                // the `OnError` behavior
237                                self.state = FileStreamState::Error
238                            }
239                            self.file_stream_metrics.time_scanning_total.start();
240                            return Poll::Ready(Some(result));
241                        }
242                        Some(Err(err)) => {
243                            self.file_stream_metrics.file_scan_errors.add(1);
244                            self.file_stream_metrics.time_scanning_until_data.stop();
245                            self.file_stream_metrics.time_scanning_total.stop();
246
247                            match self.on_error {
248                                // If `OnError::Skip` we skip the file as soon as we hit the first error
249                                OnError::Skip => match mem::take(next) {
250                                    Some((future, partition_values)) => {
251                                        self.file_stream_metrics.time_opening.start();
252
253                                        match future {
254                                            NextOpen::Pending(future) => {
255                                                self.state = FileStreamState::Open {
256                                                    future,
257                                                    partition_values,
258                                                }
259                                            }
260                                            NextOpen::Ready(reader) => {
261                                                self.state = FileStreamState::Open {
262                                                    future: Box::pin(std::future::ready(
263                                                        reader,
264                                                    )),
265                                                    partition_values,
266                                                }
267                                            }
268                                        }
269                                    }
270                                    None => return Poll::Ready(None),
271                                },
272                                OnError::Fail => {
273                                    self.state = FileStreamState::Error;
274                                    return Poll::Ready(Some(Err(err)));
275                                }
276                            }
277                        }
278                        None => {
279                            self.file_stream_metrics.time_scanning_until_data.stop();
280                            self.file_stream_metrics.time_scanning_total.stop();
281
282                            match mem::take(next) {
283                                Some((future, partition_values)) => {
284                                    self.file_stream_metrics.time_opening.start();
285
286                                    match future {
287                                        NextOpen::Pending(future) => {
288                                            self.state = FileStreamState::Open {
289                                                future,
290                                                partition_values,
291                                            }
292                                        }
293                                        NextOpen::Ready(reader) => {
294                                            self.state = FileStreamState::Open {
295                                                future: Box::pin(std::future::ready(
296                                                    reader,
297                                                )),
298                                                partition_values,
299                                            }
300                                        }
301                                    }
302                                }
303                                None => return Poll::Ready(None),
304                            }
305                        }
306                    }
307                }
308                FileStreamState::Error | FileStreamState::Limit => {
309                    return Poll::Ready(None)
310                }
311            }
312        }
313    }
314}
315
316impl Stream for FileStream {
317    type Item = Result<RecordBatch>;
318
319    fn poll_next(
320        mut self: Pin<&mut Self>,
321        cx: &mut Context<'_>,
322    ) -> Poll<Option<Self::Item>> {
323        self.file_stream_metrics.time_processing.start();
324        let result = self.poll_inner(cx);
325        self.file_stream_metrics.time_processing.stop();
326        self.baseline_metrics.record_poll(result)
327    }
328}
329
330impl RecordBatchStream for FileStream {
331    fn schema(&self) -> SchemaRef {
332        Arc::clone(&self.projected_schema)
333    }
334}
335
/// A fallible future that resolves to a stream of [`RecordBatch`]
///
/// The outer `Result` reports a failure to open the file; each inner
/// `Result` reports a failure while reading an individual batch.
pub type FileOpenFuture =
    BoxFuture<'static, Result<BoxStream<'static, Result<RecordBatch>>>>;
339
/// Describes the behavior of the `FileStream` if file opening or scanning fails
///
/// The default is [`OnError::Fail`].
#[derive(Default)]
pub enum OnError {
    /// Fail the entire stream and return the underlying error
    #[default]
    Fail,
    /// Continue scanning, ignoring the failed file
    Skip,
}
349
/// Generic API for opening a file using an [`ObjectStore`] and resolving to a
/// stream of [`RecordBatch`]
///
/// [`ObjectStore`]: object_store::ObjectStore
pub trait FileOpener: Unpin + Send + Sync {
    /// Asynchronously open the specified file and return a stream
    /// of [`RecordBatch`]
    ///
    /// An `Err` returned from this call itself (as opposed to an error the
    /// returned future resolves to) ends a `FileStream` regardless of its
    /// `OnError` setting.
    fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture>;
}
359
/// Represents the state of the next `FileOpenFuture`. Since we need to poll
/// this future while scanning the current file, we need to store the result if it
/// is ready
pub enum NextOpen {
    /// The open is still in progress
    Pending(FileOpenFuture),
    /// The open has completed; holds the reader or the error it produced
    Ready(Result<BoxStream<'static, Result<RecordBatch>>>),
}
367
/// The states a `FileStream` moves through while reading its files
pub enum FileStreamState {
    /// The idle state, no file is currently being read
    Idle,
    /// Currently performing asynchronous IO to obtain a stream of RecordBatch
    /// for a given file
    Open {
        /// A [`FileOpenFuture`] returned by [`FileOpener::open`]
        future: FileOpenFuture,
        /// The partition values for this file
        partition_values: Vec<ScalarValue>,
    },
    /// Scanning the [`BoxStream`] returned by the completion of a [`FileOpenFuture`]
    /// returned by [`FileOpener::open`]
    Scan {
        /// Partitioning column values for the file currently being scanned
        partition_values: Vec<ScalarValue>,
        /// The reader instance
        reader: BoxStream<'static, Result<RecordBatch>>,
        /// A [`FileOpenFuture`] for the next file to be processed,
        /// and its corresponding partition column values, if any.
        /// This allows the next file to be opened in parallel while the
        /// current file is read.
        next: Option<(NextOpen, Vec<ScalarValue>)>,
    },
    /// Encountered an error; the stream yields nothing further
    Error,
    /// Reached the row limit; the stream yields nothing further
    Limit,
}
397
/// A timer that can be started and stopped.
///
/// Each start/stop pair adds the time elapsed in between to `metrics`.
pub struct StartableTime {
    /// The metric that accumulates the elapsed time
    pub metrics: Time,
    // Start of the interval currently being measured, `None` while the timer
    // is stopped. The interval is added to `metrics` when the timer is stopped.
    pub start: Option<Instant>,
}
404
405impl StartableTime {
406    pub fn start(&mut self) {
407        assert!(self.start.is_none());
408        self.start = Some(Instant::now());
409    }
410
411    pub fn stop(&mut self) {
412        if let Some(start) = self.start.take() {
413            self.metrics.add_elapsed(start);
414        }
415    }
416}
417
#[allow(rustdoc::broken_intra_doc_links)]
/// Metrics for [`FileStream`]
///
/// Note that all of these metrics are in terms of wall clock time
/// (not cpu time) so they include time spent waiting on I/O as well
/// as other operators.
///
/// [`FileStream`]: <https://github.com/apache/datafusion/blob/main/datafusion/datasource/src/file_stream.rs>
pub struct FileStreamMetrics {
    /// Wall clock time elapsed for file opening.
    ///
    /// Time between when [`FileOpener::open`] is called and when the
    /// [`FileStream`] receives a stream for reading.
    ///
    /// If there are multiple files being scanned, the stream
    /// will open the next file in the background while scanning the
    /// current file. This metric will only capture time spent opening
    /// while not also scanning.
    ///
    /// [`FileStream`]: <https://github.com/apache/datafusion/blob/main/datafusion/datasource/src/file_stream.rs>
    pub time_opening: StartableTime,
    /// Wall clock time elapsed for file scanning + first record batch of decompression + decoding
    ///
    /// Time between when the [`FileStream`] requests data from the
    /// stream and when the first [`RecordBatch`] is produced.
    ///
    /// [`FileStream`]: <https://github.com/apache/datafusion/blob/main/datafusion/datasource/src/file_stream.rs>
    pub time_scanning_until_data: StartableTime,
    /// Total elapsed wall clock time for scanning + record batch decompression / decoding
    ///
    /// Sum of time between when the [`FileStream`] requests data from
    /// the stream and when a [`RecordBatch`] is produced for all
    /// record batches in the stream. Note that this metric also
    /// includes the time of the parent operator's execution.
    ///
    /// [`FileStream`]: <https://github.com/apache/datafusion/blob/main/datafusion/datasource/src/file_stream.rs>
    pub time_scanning_total: StartableTime,
    /// Wall clock time spent inside polls of the [`FileStream`]
    ///
    /// The timer runs from the moment the stream is polled until that poll
    /// returns, so time between polls is not included.
    ///
    /// [`FileStream`]: <https://github.com/apache/datafusion/blob/main/datafusion/datasource/src/file_stream.rs>
    pub time_processing: StartableTime,
    /// Count of errors opening file.
    ///
    /// If using `OnError::Skip` this will provide a count of the number of files
    /// which were skipped and will not be included in the scan results.
    pub file_open_errors: Count,
    /// Count of errors scanning file
    ///
    /// If using `OnError::Skip` this will provide a count of the number of files
    /// which were skipped and will not be included in the scan results.
    pub file_scan_errors: Count,
}
466
467impl FileStreamMetrics {
468    pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
469        let time_opening = StartableTime {
470            metrics: MetricBuilder::new(metrics)
471                .subset_time("time_elapsed_opening", partition),
472            start: None,
473        };
474
475        let time_scanning_until_data = StartableTime {
476            metrics: MetricBuilder::new(metrics)
477                .subset_time("time_elapsed_scanning_until_data", partition),
478            start: None,
479        };
480
481        let time_scanning_total = StartableTime {
482            metrics: MetricBuilder::new(metrics)
483                .subset_time("time_elapsed_scanning_total", partition),
484            start: None,
485        };
486
487        let time_processing = StartableTime {
488            metrics: MetricBuilder::new(metrics)
489                .subset_time("time_elapsed_processing", partition),
490            start: None,
491        };
492
493        let file_open_errors =
494            MetricBuilder::new(metrics).counter("file_open_errors", partition);
495
496        let file_scan_errors =
497            MetricBuilder::new(metrics).counter("file_scan_errors", partition);
498
499        Self {
500            time_opening,
501            time_scanning_until_data,
502            time_scanning_total,
503            time_processing,
504            file_open_errors,
505            file_scan_errors,
506        }
507    }
508}
509
510#[cfg(test)]
511mod tests {
512    use crate::file_scan_config::FileScanConfigBuilder;
513    use crate::tests::make_partition;
514    use crate::PartitionedFile;
515    use datafusion_common::error::Result;
516    use datafusion_execution::object_store::ObjectStoreUrl;
517    use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
518    use futures::{FutureExt as _, StreamExt as _};
519    use std::sync::atomic::{AtomicUsize, Ordering};
520    use std::sync::Arc;
521
522    use crate::file_stream::{FileOpenFuture, FileOpener, FileStream, OnError};
523    use crate::test_util::MockSource;
524    use arrow::array::RecordBatch;
525    use arrow::datatypes::Schema;
526
527    use datafusion_common::{assert_batches_eq, exec_err, internal_err};
528
    /// Test `FileOpener` which will simulate errors during file opening or scanning
    #[derive(Default)]
    struct TestOpener {
        /// Index in stream of files which should throw an error while opening
        error_opening_idx: Vec<usize>,
        /// Index in stream of files which should throw an error while scanning
        error_scanning_idx: Vec<usize>,
        /// Number of `open` calls made so far; the value before each call is
        /// used as the index of the file being opened
        current_idx: AtomicUsize,
        /// `RecordBatch` to return
        records: Vec<RecordBatch>,
    }
541
542    impl FileOpener for TestOpener {
543        fn open(&self, _partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
544            let idx = self.current_idx.fetch_add(1, Ordering::SeqCst);
545
546            if self.error_opening_idx.contains(&idx) {
547                Ok(futures::future::ready(internal_err!("error opening")).boxed())
548            } else if self.error_scanning_idx.contains(&idx) {
549                let error = futures::future::ready(exec_err!("error scanning"));
550                let stream = futures::stream::once(error).boxed();
551                Ok(futures::future::ready(Ok(stream)).boxed())
552            } else {
553                let iterator = self.records.clone().into_iter().map(Ok);
554                let stream = futures::stream::iter(iterator).boxed();
555                Ok(futures::future::ready(Ok(stream)).boxed())
556            }
557        }
558    }
559
    /// Builder for a `FileStream` over mock files, used to collect the
    /// stream's output in tests
    #[derive(Default)]
    struct FileStreamTest {
        /// Number of files in the stream
        num_files: usize,
        /// Global limit of records emitted by the stream
        limit: Option<usize>,
        /// Error-handling behavior of the stream
        on_error: OnError,
        /// Mock `FileOpener`
        opener: TestOpener,
    }
571
572    impl FileStreamTest {
573        pub fn new() -> Self {
574            Self::default()
575        }
576
577        /// Specify the number of files in the stream
578        pub fn with_num_files(mut self, num_files: usize) -> Self {
579            self.num_files = num_files;
580            self
581        }
582
583        /// Specify the limit
584        pub fn with_limit(mut self, limit: Option<usize>) -> Self {
585            self.limit = limit;
586            self
587        }
588
589        /// Specify the index of files in the stream which should
590        /// throw an error when opening
591        pub fn with_open_errors(mut self, idx: Vec<usize>) -> Self {
592            self.opener.error_opening_idx = idx;
593            self
594        }
595
596        /// Specify the index of files in the stream which should
597        /// throw an error when scanning
598        pub fn with_scan_errors(mut self, idx: Vec<usize>) -> Self {
599            self.opener.error_scanning_idx = idx;
600            self
601        }
602
603        /// Specify the behavior of the stream when an error occurs
604        pub fn with_on_error(mut self, on_error: OnError) -> Self {
605            self.on_error = on_error;
606            self
607        }
608
609        /// Specify the record batches that should be returned from each
610        /// file that is successfully scanned
611        pub fn with_records(mut self, records: Vec<RecordBatch>) -> Self {
612            self.opener.records = records;
613            self
614        }
615
616        /// Collect the results of the `FileStream`
617        pub async fn result(self) -> Result<Vec<RecordBatch>> {
618            let file_schema = self
619                .opener
620                .records
621                .first()
622                .map(|batch| batch.schema())
623                .unwrap_or_else(|| Arc::new(Schema::empty()));
624
625            // let ctx = SessionContext::new();
626            let mock_files: Vec<(String, u64)> = (0..self.num_files)
627                .map(|idx| (format!("mock_file{idx}"), 10_u64))
628                .collect();
629
630            // let mock_files_ref: Vec<(&str, u64)> = mock_files
631            //     .iter()
632            //     .map(|(name, size)| (name.as_str(), *size))
633            //     .collect();
634
635            let file_group = mock_files
636                .into_iter()
637                .map(|(name, size)| PartitionedFile::new(name, size))
638                .collect();
639
640            let on_error = self.on_error;
641
642            let config = FileScanConfigBuilder::new(
643                ObjectStoreUrl::parse("test:///").unwrap(),
644                file_schema,
645                Arc::new(MockSource::default()),
646            )
647            .with_file_group(file_group)
648            .with_limit(self.limit)
649            .build();
650            let metrics_set = ExecutionPlanMetricsSet::new();
651            let file_stream =
652                FileStream::new(&config, 0, Arc::new(self.opener), &metrics_set)
653                    .unwrap()
654                    .with_on_error(on_error);
655
656            file_stream
657                .collect::<Vec<_>>()
658                .await
659                .into_iter()
660                .collect::<Result<Vec<_>>>()
661        }
662    }
663
664    /// helper that creates a stream of 2 files with the same pair of batches in each ([0,1,2] and [0,1])
665    async fn create_and_collect(limit: Option<usize>) -> Vec<RecordBatch> {
666        FileStreamTest::new()
667            .with_records(vec![make_partition(3), make_partition(2)])
668            .with_num_files(2)
669            .with_limit(limit)
670            .result()
671            .await
672            .expect("error executing stream")
673    }
674
675    #[tokio::test]
676    async fn on_error_opening() -> Result<()> {
677        let batches = FileStreamTest::new()
678            .with_records(vec![make_partition(3), make_partition(2)])
679            .with_num_files(2)
680            .with_on_error(OnError::Skip)
681            .with_open_errors(vec![0])
682            .result()
683            .await?;
684
685        #[rustfmt::skip]
686        assert_batches_eq!(&[
687            "+---+",
688            "| i |",
689            "+---+",
690            "| 0 |",
691            "| 1 |",
692            "| 2 |",
693            "| 0 |",
694            "| 1 |",
695            "+---+",
696        ], &batches);
697
698        let batches = FileStreamTest::new()
699            .with_records(vec![make_partition(3), make_partition(2)])
700            .with_num_files(2)
701            .with_on_error(OnError::Skip)
702            .with_open_errors(vec![1])
703            .result()
704            .await?;
705
706        #[rustfmt::skip]
707        assert_batches_eq!(&[
708            "+---+",
709            "| i |",
710            "+---+",
711            "| 0 |",
712            "| 1 |",
713            "| 2 |",
714            "| 0 |",
715            "| 1 |",
716            "+---+",
717        ], &batches);
718
719        let batches = FileStreamTest::new()
720            .with_records(vec![make_partition(3), make_partition(2)])
721            .with_num_files(2)
722            .with_on_error(OnError::Skip)
723            .with_open_errors(vec![0, 1])
724            .result()
725            .await?;
726
727        #[rustfmt::skip]
728        assert_batches_eq!(&[
729            "++",
730            "++",
731        ], &batches);
732
733        Ok(())
734    }
735
736    #[tokio::test]
737    async fn on_error_scanning_fail() -> Result<()> {
738        let result = FileStreamTest::new()
739            .with_records(vec![make_partition(3), make_partition(2)])
740            .with_num_files(2)
741            .with_on_error(OnError::Fail)
742            .with_scan_errors(vec![1])
743            .result()
744            .await;
745
746        assert!(result.is_err());
747
748        Ok(())
749    }
750
751    #[tokio::test]
752    async fn on_error_opening_fail() -> Result<()> {
753        let result = FileStreamTest::new()
754            .with_records(vec![make_partition(3), make_partition(2)])
755            .with_num_files(2)
756            .with_on_error(OnError::Fail)
757            .with_open_errors(vec![1])
758            .result()
759            .await;
760
761        assert!(result.is_err());
762
763        Ok(())
764    }
765
766    #[tokio::test]
767    async fn on_error_scanning() -> Result<()> {
768        let batches = FileStreamTest::new()
769            .with_records(vec![make_partition(3), make_partition(2)])
770            .with_num_files(2)
771            .with_on_error(OnError::Skip)
772            .with_scan_errors(vec![0])
773            .result()
774            .await?;
775
776        #[rustfmt::skip]
777        assert_batches_eq!(&[
778            "+---+",
779            "| i |",
780            "+---+",
781            "| 0 |",
782            "| 1 |",
783            "| 2 |",
784            "| 0 |",
785            "| 1 |",
786            "+---+",
787        ], &batches);
788
789        let batches = FileStreamTest::new()
790            .with_records(vec![make_partition(3), make_partition(2)])
791            .with_num_files(2)
792            .with_on_error(OnError::Skip)
793            .with_scan_errors(vec![1])
794            .result()
795            .await?;
796
797        #[rustfmt::skip]
798        assert_batches_eq!(&[
799            "+---+",
800            "| i |",
801            "+---+",
802            "| 0 |",
803            "| 1 |",
804            "| 2 |",
805            "| 0 |",
806            "| 1 |",
807            "+---+",
808        ], &batches);
809
810        let batches = FileStreamTest::new()
811            .with_records(vec![make_partition(3), make_partition(2)])
812            .with_num_files(2)
813            .with_on_error(OnError::Skip)
814            .with_scan_errors(vec![0, 1])
815            .result()
816            .await?;
817
818        #[rustfmt::skip]
819        assert_batches_eq!(&[
820            "++",
821            "++",
822        ], &batches);
823
824        Ok(())
825    }
826
827    #[tokio::test]
828    async fn on_error_mixed() -> Result<()> {
829        let batches = FileStreamTest::new()
830            .with_records(vec![make_partition(3), make_partition(2)])
831            .with_num_files(3)
832            .with_on_error(OnError::Skip)
833            .with_open_errors(vec![1])
834            .with_scan_errors(vec![0])
835            .result()
836            .await?;
837
838        #[rustfmt::skip]
839        assert_batches_eq!(&[
840            "+---+",
841            "| i |",
842            "+---+",
843            "| 0 |",
844            "| 1 |",
845            "| 2 |",
846            "| 0 |",
847            "| 1 |",
848            "+---+",
849        ], &batches);
850
851        let batches = FileStreamTest::new()
852            .with_records(vec![make_partition(3), make_partition(2)])
853            .with_num_files(3)
854            .with_on_error(OnError::Skip)
855            .with_open_errors(vec![0])
856            .with_scan_errors(vec![1])
857            .result()
858            .await?;
859
860        #[rustfmt::skip]
861        assert_batches_eq!(&[
862            "+---+",
863            "| i |",
864            "+---+",
865            "| 0 |",
866            "| 1 |",
867            "| 2 |",
868            "| 0 |",
869            "| 1 |",
870            "+---+",
871        ], &batches);
872
873        let batches = FileStreamTest::new()
874            .with_records(vec![make_partition(3), make_partition(2)])
875            .with_num_files(3)
876            .with_on_error(OnError::Skip)
877            .with_open_errors(vec![2])
878            .with_scan_errors(vec![0, 1])
879            .result()
880            .await?;
881
882        #[rustfmt::skip]
883        assert_batches_eq!(&[
884            "++",
885            "++",
886        ], &batches);
887
888        let batches = FileStreamTest::new()
889            .with_records(vec![make_partition(3), make_partition(2)])
890            .with_num_files(3)
891            .with_on_error(OnError::Skip)
892            .with_open_errors(vec![0, 2])
893            .with_scan_errors(vec![1])
894            .result()
895            .await?;
896
897        #[rustfmt::skip]
898        assert_batches_eq!(&[
899            "++",
900            "++",
901        ], &batches);
902
903        Ok(())
904    }
905
906    #[tokio::test]
907    async fn without_limit() -> Result<()> {
908        let batches = create_and_collect(None).await;
909
910        #[rustfmt::skip]
911        assert_batches_eq!(&[
912            "+---+",
913            "| i |",
914            "+---+",
915            "| 0 |",
916            "| 1 |",
917            "| 2 |",
918            "| 0 |",
919            "| 1 |",
920            "| 0 |",
921            "| 1 |",
922            "| 2 |",
923            "| 0 |",
924            "| 1 |",
925            "+---+",
926        ], &batches);
927
928        Ok(())
929    }
930
931    #[tokio::test]
932    async fn with_limit_between_files() -> Result<()> {
933        let batches = create_and_collect(Some(5)).await;
934        #[rustfmt::skip]
935        assert_batches_eq!(&[
936            "+---+",
937            "| i |",
938            "+---+",
939            "| 0 |",
940            "| 1 |",
941            "| 2 |",
942            "| 0 |",
943            "| 1 |",
944            "+---+",
945        ], &batches);
946
947        Ok(())
948    }
949
950    #[tokio::test]
951    async fn with_limit_at_middle_of_batch() -> Result<()> {
952        let batches = create_and_collect(Some(6)).await;
953        #[rustfmt::skip]
954        assert_batches_eq!(&[
955            "+---+",
956            "| i |",
957            "+---+",
958            "| 0 |",
959            "| 1 |",
960            "| 2 |",
961            "| 0 |",
962            "| 1 |",
963            "| 0 |",
964            "+---+",
965        ], &batches);
966
967        Ok(())
968    }
969}