use std::sync::Arc;
use crate::file_scan_config::FileScanConfig;
use crate::file_stream::scan_state::ScanState;
use crate::file_stream::work_source::{SharedWorkSource, WorkSource};
use crate::morsel::{FileOpenerMorselizer, Morselizer};
use datafusion_common::{Result, internal_err};
use datafusion_physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet};
use super::metrics::FileStreamMetrics;
use super::{FileOpener, FileStream, FileStreamState, OnError};
/// Builder for a [`FileStream`] that reads one partition of a [`FileScanConfig`].
///
/// Obtain one with `FileStreamBuilder::new`. The partition index, a morselizer
/// (directly or via a file opener) and a metrics set must all be supplied
/// before `build` will succeed; the remaining settings are optional.
pub struct FileStreamBuilder<'a> {
    /// Scan configuration the stream is built from (file groups, projection, limit).
    config: &'a FileScanConfig,
    /// Index into `config.file_groups` selecting which group to scan. Required.
    partition: Option<usize>,
    /// Metrics set the per-partition stream metrics are registered in. Required.
    metrics: Option<&'a ExecutionPlanMetricsSet>,
    /// Morselizer handed to the scan state. Required; the last setter call wins.
    morselizer: Option<Box<dyn Morselizer>>,
    /// Optional externally supplied work source. When `None`, the stream
    /// scans the files of its own partition's file group.
    shared_work_source: Option<SharedWorkSource>,
    /// Error-handling policy forwarded to the scan state.
    on_error: OnError,
}
impl<'a> FileStreamBuilder<'a> {
    /// Creates a builder for a [`FileStream`] over `config`.
    ///
    /// The partition, morselizer and metrics start unset and must be supplied
    /// before [`Self::build`]. The error policy defaults to [`OnError::Fail`],
    /// and no shared work source is configured.
    pub fn new(config: &'a FileScanConfig) -> Self {
        Self {
            config,
            partition: None,
            morselizer: None,
            metrics: None,
            on_error: OnError::Fail,
            shared_work_source: None,
        }
    }

    /// Sets the partition index, i.e. which entry of `config.file_groups`
    /// this stream is associated with. Required.
    pub fn with_partition(mut self, partition: usize) -> Self {
        self.partition = Some(partition);
        self
    }

    /// Uses `file_opener` to open files, by wrapping it in a
    /// [`FileOpenerMorselizer`].
    ///
    /// This replaces any morselizer set earlier via this method or
    /// [`Self::with_morselizer`].
    pub fn with_file_opener(mut self, file_opener: Arc<dyn FileOpener>) -> Self {
        self.morselizer = Some(Box::new(FileOpenerMorselizer::new(file_opener)));
        self
    }

    /// Sets the morselizer handed to the scan state.
    ///
    /// This replaces any morselizer set earlier via this method or
    /// [`Self::with_file_opener`].
    pub fn with_morselizer(mut self, morselizer: Box<dyn Morselizer>) -> Self {
        self.morselizer = Some(morselizer);
        self
    }

    /// Sets the metrics set that the stream's per-partition metrics are
    /// created in. Required.
    pub fn with_metrics(mut self, metrics: &'a ExecutionPlanMetricsSet) -> Self {
        self.metrics = Some(metrics);
        self
    }

    /// Sets the error-handling policy forwarded to the scan state.
    /// Defaults to [`OnError::Fail`].
    pub fn with_on_error(mut self, on_error: OnError) -> Self {
        self.on_error = on_error;
        self
    }

    /// Sets (or, with `None`, clears) a shared work source.
    ///
    /// When present, the stream takes its work from this source instead of
    /// from its own partition's file group.
    pub(crate) fn with_shared_work_source(
        mut self,
        shared_work_source: Option<SharedWorkSource>,
    ) -> Self {
        self.shared_work_source = shared_work_source;
        self
    }

    /// Consumes the builder and constructs the [`FileStream`].
    ///
    /// # Errors
    ///
    /// Returns an internal error if the partition, morselizer or metrics were
    /// never set, or if the partition index is out of range for
    /// `config.file_groups`. Also propagates any error from computing the
    /// projected schema.
    pub fn build(self) -> Result<FileStream> {
        let Self {
            config,
            partition,
            morselizer,
            metrics,
            on_error,
            shared_work_source,
        } = self;

        let Some(partition) = partition else {
            return internal_err!("FileStreamBuilder missing required partition");
        };
        let Some(morselizer) = morselizer else {
            return internal_err!("FileStreamBuilder missing required morselizer");
        };
        let Some(metrics) = metrics else {
            return internal_err!("FileStreamBuilder missing required metrics");
        };

        let projected_schema = config.projected_schema()?;

        // Always validate the partition index, even when a shared work source
        // is supplied. Only borrow the group here, though: a deep copy is
        // needed only when this stream owns its files locally.
        let Some(file_group) = config.file_groups.get(partition) else {
            return internal_err!(
                "FileStreamBuilder invalid partition index: {partition}"
            );
        };
        let work_source = match shared_work_source {
            Some(shared) => WorkSource::Shared(shared),
            None => WorkSource::Local(file_group.clone().into_inner().into()),
        };

        let file_stream_metrics = FileStreamMetrics::new(metrics, partition);
        let scan_state = Box::new(ScanState::new(
            work_source,
            config.limit,
            morselizer,
            on_error,
            file_stream_metrics,
        ));

        Ok(FileStream {
            projected_schema,
            state: FileStreamState::Scan { scan_state },
            baseline_metrics: BaselineMetrics::new(metrics, partition),
        })
    }
}