use std::any::Any;
use std::fmt;
use std::fmt::Formatter;
use std::sync::Arc;
use crate::file_groups::FileGroupPartitioner;
use crate::file_scan_config::FileScanConfig;
use crate::file_stream::FileOpener;
use crate::schema_adapter::SchemaAdapterFactory;
use arrow::datatypes::SchemaRef;
use datafusion_common::config::ConfigOptions;
use datafusion_common::{not_impl_err, Result, Statistics};
use datafusion_physical_expr::{LexOrdering, PhysicalExpr};
use datafusion_physical_plan::filter_pushdown::FilterPushdownPropagation;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_plan::DisplayFormatType;
use object_store::ObjectStore;
/// Moves a concrete [`FileSource`] into an [`Arc`] and hands it back as a
/// shared `dyn FileSource` trait object.
///
/// `T` must be `'static` because the value is stored behind a type-erased
/// trait object.
pub fn as_file_source<T>(source: T) -> Arc<dyn FileSource>
where
    T: FileSource + 'static,
{
    // The annotated binding performs the unsized coercion from `Arc<T>`.
    let shared: Arc<dyn FileSource> = Arc::new(source);
    shared
}
/// File-format-specific behavior used together with a [`FileScanConfig`].
///
/// Implementors must be `Send + Sync`. The `with_*` methods take `&self` and
/// return an `Arc<dyn FileSource>`. Several methods ship with default
/// implementations (documented on each method) that implementors may override.
pub trait FileSource: Send + Sync {
    /// Produces the [`FileOpener`] for the given `partition`, using the
    /// supplied object store and scan configuration.
    fn create_file_opener(
        &self,
        object_store: Arc<dyn ObjectStore>,
        base_config: &FileScanConfig,
        partition: usize,
    ) -> Arc<dyn FileOpener>;

    /// Exposes this source as [`Any`].
    // NOTE(review): presumably used by callers to downcast to the concrete
    // source type — confirm against call sites.
    fn as_any(&self) -> &dyn Any;

    /// Returns a source for the given `batch_size`.
    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource>;

    /// Returns a source for the given `schema`.
    fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource>;

    /// Returns a source derived from the scan configuration `config`.
    // NOTE(review): the name suggests only the projection part of `config`
    // is consulted — verify in the implementors.
    fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource>;

    /// Returns a source carrying the given `statistics`.
    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource>;

    /// Borrows the metrics set associated with this source.
    fn metrics(&self) -> &ExecutionPlanMetricsSet;

    /// Returns the statistics of this source, or an error if they cannot be
    /// provided.
    fn statistics(&self) -> Result<Statistics>;

    /// Short string naming the file type of this source.
    ///
    /// The default [`Self::with_schema_adapter_factory`] embeds this value in
    /// its error message.
    fn file_type(&self) -> &str;

    /// Hook for writing additional, source-specific display output.
    ///
    /// The default implementation writes nothing and returns `Ok(())`.
    fn fmt_extra(&self, _t: DisplayFormatType, _f: &mut Formatter) -> fmt::Result {
        // Intentionally empty: nothing extra to display by default.
        Ok(())
    }

    /// Attempts to redistribute the file groups of `config` across
    /// `target_partitions` partitions.
    ///
    /// Returns `Ok(None)` when the configuration is compressed or has
    /// `new_lines_in_values` set, and also when the [`FileGroupPartitioner`]
    /// yields no new grouping. Otherwise returns a clone of `config` whose
    /// `file_groups` have been replaced by the repartitioned groups.
    ///
    /// Order within groups is preserved exactly when `output_ordering` is
    /// `Some`; the ordering value itself is not inspected here.
    fn repartitioned(
        &self,
        target_partitions: usize,
        repartition_file_min_size: usize,
        output_ordering: Option<LexOrdering>,
        config: &FileScanConfig,
    ) -> Result<Option<FileScanConfig>> {
        // NOTE(review): presumably these inputs cannot be split safely at
        // arbitrary offsets — confirm the rationale with the format owners.
        let compressed = config.file_compression_type.is_compressed();
        if compressed || config.new_lines_in_values {
            return Ok(None);
        }

        let regrouped = FileGroupPartitioner::new()
            .with_target_partitions(target_partitions)
            .with_repartition_file_min_size(repartition_file_min_size)
            .with_preserve_order_within_groups(output_ordering.is_some())
            .repartition_file_groups(&config.file_groups);

        // Only clone the configuration when there is a new grouping to install.
        Ok(regrouped.map(|file_groups| {
            let mut updated = config.clone();
            updated.file_groups = file_groups;
            updated
        }))
    }

    /// Attempts to push `filters` down into this source.
    ///
    /// The default implementation ignores `_config` and returns
    /// [`FilterPushdownPropagation::unsupported`] built from `filters`.
    fn try_pushdown_filters(
        &self,
        filters: Vec<Arc<dyn PhysicalExpr>>,
        _config: &ConfigOptions,
    ) -> Result<FilterPushdownPropagation<Arc<dyn FileSource>>> {
        let propagation = FilterPushdownPropagation::unsupported(filters);
        Ok(propagation)
    }

    /// Returns a source that uses the given schema adapter factory.
    ///
    /// # Errors
    ///
    /// The default implementation always returns the error produced by
    /// `not_impl_err!`, with a message that includes [`Self::file_type`].
    fn with_schema_adapter_factory(
        &self,
        _factory: Arc<dyn SchemaAdapterFactory>,
    ) -> Result<Arc<dyn FileSource>> {
        let kind = self.file_type();
        not_impl_err!(
            "FileSource {} does not support schema adapter factory",
            kind
        )
    }

    /// Returns the schema adapter factory held by this source, if any.
    ///
    /// The default implementation returns `None`.
    fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
        None
    }
}