Skip to main content

datafusion_datasource/file_stream/
builder.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use crate::file_scan_config::FileScanConfig;
21use crate::file_stream::scan_state::ScanState;
22use crate::file_stream::work_source::{SharedWorkSource, WorkSource};
23use crate::morsel::{FileOpenerMorselizer, Morselizer};
24use datafusion_common::{Result, internal_err};
25use datafusion_physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
26
27use super::metrics::FileStreamMetrics;
28use super::{FileOpener, FileStream, FileStreamState, OnError};
29
30/// Builder for constructing a [`FileStream`].
31pub struct FileStreamBuilder<'a> {
32    config: &'a FileScanConfig,
33    partition: Option<usize>,
34    morselizer: Option<Box<dyn Morselizer>>,
35    metrics: Option<&'a ExecutionPlanMetricsSet>,
36    on_error: OnError,
37    shared_work_source: Option<SharedWorkSource>,
38}
39
40impl<'a> FileStreamBuilder<'a> {
41    /// Create a new builder for [`FileStream`].
42    pub fn new(config: &'a FileScanConfig) -> Self {
43        Self {
44            config,
45            partition: None,
46            morselizer: None,
47            metrics: None,
48            on_error: OnError::Fail,
49            shared_work_source: None,
50        }
51    }
52
53    /// Configure the partition to scan.
54    pub fn with_partition(mut self, partition: usize) -> Self {
55        self.partition = Some(partition);
56        self
57    }
58
59    /// Configure the [`FileOpener`] used to open files.
60    ///
61    /// This will overwrite any setting from [`Self::with_morselizer`]
62    pub fn with_file_opener(mut self, file_opener: Arc<dyn FileOpener>) -> Self {
63        self.morselizer = Some(Box::new(FileOpenerMorselizer::new(file_opener)));
64        self
65    }
66
67    /// Configure the [`Morselizer`] used to open files.
68    ///
69    /// This will overwrite any setting from [`Self::with_file_opener`]
70    pub fn with_morselizer(mut self, morselizer: Box<dyn Morselizer>) -> Self {
71        self.morselizer = Some(morselizer);
72        self
73    }
74
75    /// Configure the metrics set used by the stream.
76    pub fn with_metrics(mut self, metrics: &'a ExecutionPlanMetricsSet) -> Self {
77        self.metrics = Some(metrics);
78        self
79    }
80
81    /// Configure the behavior when opening or scanning a file fails.
82    pub fn with_on_error(mut self, on_error: OnError) -> Self {
83        self.on_error = on_error;
84        self
85    }
86
87    /// Configure the [`SharedWorkSource`] for sibling work stealing.
88    pub(crate) fn with_shared_work_source(
89        mut self,
90        shared_work_source: Option<SharedWorkSource>,
91    ) -> Self {
92        self.shared_work_source = shared_work_source;
93        self
94    }
95
96    /// Build the configured [`FileStream`].
97    pub fn build(self) -> Result<FileStream> {
98        let Self {
99            config,
100            partition,
101            morselizer,
102            metrics,
103            on_error,
104            shared_work_source,
105        } = self;
106
107        let Some(partition) = partition else {
108            return internal_err!("FileStreamBuilder missing required partition");
109        };
110        let Some(morselizer) = morselizer else {
111            return internal_err!("FileStreamBuilder missing required morselizer");
112        };
113        let Some(metrics) = metrics else {
114            return internal_err!("FileStreamBuilder missing required metrics");
115        };
116        let projected_schema = config.projected_schema()?;
117        let Some(file_group) = config.file_groups.get(partition).cloned() else {
118            return internal_err!(
119                "FileStreamBuilder invalid partition index: {partition}"
120            );
121        };
122        let work_source = match shared_work_source {
123            Some(shared) => WorkSource::Shared(shared),
124            None => WorkSource::Local(file_group.into_inner().into()),
125        };
126
127        let file_stream_metrics = FileStreamMetrics::new(metrics, partition);
128        let scan_state = Box::new(ScanState::new(
129            work_source,
130            config.limit,
131            morselizer,
132            on_error,
133            file_stream_metrics,
134        ));
135
136        Ok(FileStream {
137            projected_schema,
138            state: FileStreamState::Scan { scan_state },
139            baseline_metrics: BaselineMetrics::new(metrics, partition),
140        })
141    }
142}