// datafusion_datasource/file.rs
use std::any::Any;
21use std::fmt;
22use std::fmt::Formatter;
23use std::sync::Arc;
24
25use crate::file_groups::FileGroupPartitioner;
26use crate::file_scan_config::FileScanConfig;
27use crate::file_stream::FileOpener;
28use arrow::datatypes::SchemaRef;
29use datafusion_common::Statistics;
30use datafusion_physical_expr::LexOrdering;
31use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
32use datafusion_physical_plan::DisplayFormatType;
33
34use object_store::ObjectStore;
35
36pub trait FileSource: Send + Sync {
40 fn create_file_opener(
42 &self,
43 object_store: Arc<dyn ObjectStore>,
44 base_config: &FileScanConfig,
45 partition: usize,
46 ) -> Arc<dyn FileOpener>;
47 fn as_any(&self) -> &dyn Any;
49 fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource>;
51 fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource>;
53 fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource>;
55 fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource>;
57 fn metrics(&self) -> &ExecutionPlanMetricsSet;
59 fn statistics(&self) -> datafusion_common::Result<Statistics>;
61 fn file_type(&self) -> &str;
63 fn fmt_extra(&self, _t: DisplayFormatType, _f: &mut Formatter) -> fmt::Result {
65 Ok(())
66 }
67
68 fn repartitioned(
73 &self,
74 target_partitions: usize,
75 repartition_file_min_size: usize,
76 output_ordering: Option<LexOrdering>,
77 config: &FileScanConfig,
78 ) -> datafusion_common::Result<Option<FileScanConfig>> {
79 if config.file_compression_type.is_compressed() || config.new_lines_in_values {
80 return Ok(None);
81 }
82
83 let repartitioned_file_groups_option = FileGroupPartitioner::new()
84 .with_target_partitions(target_partitions)
85 .with_repartition_file_min_size(repartition_file_min_size)
86 .with_preserve_order_within_groups(output_ordering.is_some())
87 .repartition_file_groups(&config.file_groups);
88
89 if let Some(repartitioned_file_groups) = repartitioned_file_groups_option {
90 let mut source = config.clone();
91 source.file_groups = repartitioned_file_groups;
92 return Ok(Some(source));
93 }
94 Ok(None)
95 }
96}