datafusion_datasource/file_stream/
builder.rs1use std::sync::Arc;
19
20use crate::file_scan_config::FileScanConfig;
21use crate::file_stream::scan_state::ScanState;
22use crate::file_stream::work_source::{SharedWorkSource, WorkSource};
23use crate::morsel::{FileOpenerMorselizer, Morselizer};
24use datafusion_common::{Result, internal_err};
25use datafusion_physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
26
27use super::metrics::FileStreamMetrics;
28use super::{FileOpener, FileStream, FileStreamState, OnError};
29
30pub struct FileStreamBuilder<'a> {
32 config: &'a FileScanConfig,
33 partition: Option<usize>,
34 morselizer: Option<Box<dyn Morselizer>>,
35 metrics: Option<&'a ExecutionPlanMetricsSet>,
36 on_error: OnError,
37 shared_work_source: Option<SharedWorkSource>,
38}
39
40impl<'a> FileStreamBuilder<'a> {
41 pub fn new(config: &'a FileScanConfig) -> Self {
43 Self {
44 config,
45 partition: None,
46 morselizer: None,
47 metrics: None,
48 on_error: OnError::Fail,
49 shared_work_source: None,
50 }
51 }
52
53 pub fn with_partition(mut self, partition: usize) -> Self {
55 self.partition = Some(partition);
56 self
57 }
58
59 pub fn with_file_opener(mut self, file_opener: Arc<dyn FileOpener>) -> Self {
63 self.morselizer = Some(Box::new(FileOpenerMorselizer::new(file_opener)));
64 self
65 }
66
67 pub fn with_morselizer(mut self, morselizer: Box<dyn Morselizer>) -> Self {
71 self.morselizer = Some(morselizer);
72 self
73 }
74
75 pub fn with_metrics(mut self, metrics: &'a ExecutionPlanMetricsSet) -> Self {
77 self.metrics = Some(metrics);
78 self
79 }
80
81 pub fn with_on_error(mut self, on_error: OnError) -> Self {
83 self.on_error = on_error;
84 self
85 }
86
87 pub(crate) fn with_shared_work_source(
89 mut self,
90 shared_work_source: Option<SharedWorkSource>,
91 ) -> Self {
92 self.shared_work_source = shared_work_source;
93 self
94 }
95
96 pub fn build(self) -> Result<FileStream> {
98 let Self {
99 config,
100 partition,
101 morselizer,
102 metrics,
103 on_error,
104 shared_work_source,
105 } = self;
106
107 let Some(partition) = partition else {
108 return internal_err!("FileStreamBuilder missing required partition");
109 };
110 let Some(morselizer) = morselizer else {
111 return internal_err!("FileStreamBuilder missing required morselizer");
112 };
113 let Some(metrics) = metrics else {
114 return internal_err!("FileStreamBuilder missing required metrics");
115 };
116 let projected_schema = config.projected_schema()?;
117 let Some(file_group) = config.file_groups.get(partition).cloned() else {
118 return internal_err!(
119 "FileStreamBuilder invalid partition index: {partition}"
120 );
121 };
122 let work_source = match shared_work_source {
123 Some(shared) => WorkSource::Shared(shared),
124 None => WorkSource::Local(file_group.into_inner().into()),
125 };
126
127 let file_stream_metrics = FileStreamMetrics::new(metrics, partition);
128 let scan_state = Box::new(ScanState::new(
129 work_source,
130 config.limit,
131 morselizer,
132 on_error,
133 file_stream_metrics,
134 ));
135
136 Ok(FileStream {
137 projected_schema,
138 state: FileStreamState::Scan { scan_state },
139 baseline_metrics: BaselineMetrics::new(metrics, partition),
140 })
141 }
142}