use std::any::Any;
use std::fmt;
use std::fmt::Formatter;
use std::sync::Arc;
use crate::file_groups::FileGroupPartitioner;
use crate::file_scan_config::FileScanConfig;
use crate::file_stream::FileOpener;
use arrow::datatypes::SchemaRef;
use datafusion_common::Statistics;
use datafusion_physical_expr::LexOrdering;
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion_physical_plan::DisplayFormatType;
use object_store::ObjectStore;
/// A file-format-specific source used by a file scan.
///
/// Implementors produce a [`FileOpener`] for each partition of a [`FileScanConfig`] and carry
/// per-source settings (batch size, schema, projection, statistics) that are applied through
/// the `with_*` builder-style methods below.
pub trait FileSource: Send + Sync {
    /// Builds the [`FileOpener`] used to read files for partition `partition` of
    /// `base_config`, reading through `object_store`.
    fn create_file_opener(
        &self,
        object_store: Arc<dyn ObjectStore>,
        base_config: &FileScanConfig,
        partition: usize,
    ) -> Arc<dyn FileOpener>;

    /// Returns `self` as [`Any`] so callers can downcast to the concrete source type.
    fn as_any(&self) -> &dyn Any;

    /// Returns a source configured with the given `batch_size`.
    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource>;

    /// Returns a source configured with the given `schema`.
    fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource>;

    /// Returns a source configured with the projection described by `config`.
    fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource>;

    /// Returns a source carrying the given `statistics`.
    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource>;

    /// The metrics set associated with this source.
    fn metrics(&self) -> &ExecutionPlanMetricsSet;

    /// The statistics associated with this source, or an error if they cannot be produced.
    fn statistics(&self) -> datafusion_common::Result<Statistics>;

    /// A string naming the file type handled by this source
    /// (presumably a short label such as a format name — confirm with implementors).
    fn file_type(&self) -> &str;

    /// Writes source-specific details when the plan is displayed.
    ///
    /// The default implementation writes nothing.
    fn fmt_extra(&self, _t: DisplayFormatType, _f: &mut Formatter) -> fmt::Result {
        Ok(())
    }

    /// Attempts to redistribute `config.file_groups` across `target_partitions` partitions.
    ///
    /// Returns `Ok(None)` when repartitioning is not attempted or the partitioner declines to
    /// change anything. Otherwise it returns a copy of `config` whose `file_groups` are replaced
    /// by the repartitioned groups. When `output_ordering` is `Some`, the partitioner is asked
    /// to preserve order within groups.
    ///
    /// The default implementation never returns `Err`.
    fn repartitioned(
        &self,
        target_partitions: usize,
        repartition_file_min_size: usize,
        output_ordering: Option<LexOrdering>,
        config: &FileScanConfig,
    ) -> datafusion_common::Result<Option<FileScanConfig>> {
        // Repartitioning is skipped for compressed files and for data whose values may contain
        // newlines — presumably because byte-range splitting is not safe in those cases.
        let splittable =
            !config.file_compression_type.is_compressed() && !config.new_lines_in_values;
        if !splittable {
            return Ok(None);
        }

        let preserve_order = output_ordering.is_some();
        let partitioner = FileGroupPartitioner::new()
            .with_target_partitions(target_partitions)
            .with_repartition_file_min_size(repartition_file_min_size)
            .with_preserve_order_within_groups(preserve_order);

        Ok(partitioner
            .repartition_file_groups(&config.file_groups)
            .map(|file_groups| {
                let mut repartitioned = config.clone();
                repartitioned.file_groups = file_groups;
                repartitioned
            }))
    }
}