use std::any::Any;
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
use crate::file::FileSource;
use crate::file_compression_type::FileCompressionType;
use crate::file_scan_config::FileScanConfig;
use crate::file_sink_config::FileSinkConfig;
use arrow::datatypes::SchemaRef;
use datafusion_common::file_options::file_type::FileType;
use datafusion_common::{GetExt, Result, Statistics, internal_err, not_impl_err};
use datafusion_physical_expr::LexRequirement;
use datafusion_physical_expr_common::sort_expr::LexOrdering;
use datafusion_physical_plan::ExecutionPlan;
use datafusion_session::Session;
use async_trait::async_trait;
use object_store::{ObjectMeta, ObjectStore};
/// Default cap on the number of records sampled when inferring a schema
/// (per the name; the actual use sites live in the concrete format
/// implementations, outside this file).
pub const DEFAULT_SCHEMA_INFER_MAX_RECORD: usize = 1000;
/// Per-file metadata produced by [`FileFormat::infer_stats_and_ordering`]:
/// the file's inferred statistics plus an optional sort ordering.
///
/// Marked `#[non_exhaustive]` so fields can be added without breaking
/// downstream constructors; build it via [`FileMeta::new`].
#[derive(Debug, Clone)]
#[non_exhaustive]
pub struct FileMeta {
/// Statistics inferred for this file.
pub statistics: Statistics,
/// Sort ordering of the file's rows, if one could be determined
/// (`None` when unknown — see [`FileFormat::infer_ordering`]).
pub ordering: Option<LexOrdering>,
}
impl FileMeta {
    /// Creates a `FileMeta` holding `statistics` and no known ordering.
    pub fn new(statistics: Statistics) -> Self {
        Self {
            statistics,
            ordering: None,
        }
    }

    /// Builder-style setter: returns `self` with its ordering replaced
    /// by `ordering`.
    pub fn with_ordering(self, ordering: Option<LexOrdering>) -> Self {
        Self { ordering, ..self }
    }
}
#[async_trait]
pub trait FileFormat: Send + Sync + fmt::Debug {
fn as_any(&self) -> &dyn Any;
fn get_ext(&self) -> String;
fn get_ext_with_compression(
&self,
_file_compression_type: &FileCompressionType,
) -> Result<String>;
fn compression_type(&self) -> Option<FileCompressionType>;
async fn infer_schema(
&self,
state: &dyn Session,
store: &Arc<dyn ObjectStore>,
objects: &[ObjectMeta],
) -> Result<SchemaRef>;
async fn infer_stats(
&self,
state: &dyn Session,
store: &Arc<dyn ObjectStore>,
table_schema: SchemaRef,
object: &ObjectMeta,
) -> Result<Statistics>;
async fn infer_ordering(
&self,
_state: &dyn Session,
_store: &Arc<dyn ObjectStore>,
_table_schema: SchemaRef,
_object: &ObjectMeta,
) -> Result<Option<LexOrdering>> {
Ok(None)
}
async fn infer_stats_and_ordering(
&self,
state: &dyn Session,
store: &Arc<dyn ObjectStore>,
table_schema: SchemaRef,
object: &ObjectMeta,
) -> Result<FileMeta> {
let statistics = self
.infer_stats(state, store, Arc::clone(&table_schema), object)
.await?;
let ordering = self
.infer_ordering(state, store, table_schema, object)
.await?;
Ok(FileMeta {
statistics,
ordering,
})
}
async fn create_physical_plan(
&self,
state: &dyn Session,
conf: FileScanConfig,
) -> Result<Arc<dyn ExecutionPlan>>;
async fn create_writer_physical_plan(
&self,
_input: Arc<dyn ExecutionPlan>,
_state: &dyn Session,
_conf: FileSinkConfig,
_order_requirements: Option<LexRequirement>,
) -> Result<Arc<dyn ExecutionPlan>> {
not_impl_err!("Writer not implemented for this format")
}
fn file_source(&self, table_schema: crate::TableSchema) -> Arc<dyn FileSource>;
}
/// Factory that builds [`FileFormat`] instances, optionally configured by
/// string key/value options. Implementors also supply the file extension
/// via the [`GetExt`] supertrait.
pub trait FileFormatFactory: Sync + Send + GetExt + fmt::Debug {
/// Builds a [`FileFormat`] configured from `format_options`.
fn create(
&self,
state: &dyn Session,
format_options: &HashMap<String, String>,
) -> Result<Arc<dyn FileFormat>>;
/// Returns a [`FileFormat`] with this factory's default configuration.
fn default(&self) -> Arc<dyn FileFormat>;
/// Returns `self` as [`Any`] so callers can downcast to the concrete
/// factory type.
fn as_any(&self) -> &dyn Any;
}
/// Adapter that exposes a [`FileFormatFactory`] through the [`FileType`]
/// interface (see [`format_as_file_type`] / [`file_type_to_format`]).
#[derive(Debug)]
pub struct DefaultFileType {
// The wrapped factory; recovered by downcasting in `file_type_to_format`.
file_format_factory: Arc<dyn FileFormatFactory>,
}
impl DefaultFileType {
    /// Wraps `file_format_factory` in a new `DefaultFileType`.
    pub fn new(file_format_factory: Arc<dyn FileFormatFactory>) -> Self {
        Self { file_format_factory }
    }

    /// Borrows the wrapped [`FileFormatFactory`].
    pub fn as_format_factory(&self) -> &Arc<dyn FileFormatFactory> {
        &self.file_format_factory
    }
}
impl FileType for DefaultFileType {
// Enables downcasting back to `DefaultFileType` (used by
// `file_type_to_format`).
fn as_any(&self) -> &dyn Any {
self
}
}
impl fmt::Display for DefaultFileType {
    /// Displays as the wrapped factory's `Debug` representation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // `{:?}` dispatches to `Debug::fmt`; delegate directly.
        fmt::Debug::fmt(&self.file_format_factory, f)
    }
}
impl GetExt for DefaultFileType {
// Delegates the extension to the wrapped factory.
fn get_ext(&self) -> String {
self.file_format_factory.get_ext()
}
}
pub fn format_as_file_type(
file_format_factory: Arc<dyn FileFormatFactory>,
) -> Arc<dyn FileType> {
Arc::new(DefaultFileType {
file_format_factory,
})
}
/// Recovers the [`FileFormatFactory`] from a [`FileType`] previously
/// produced by [`format_as_file_type`].
///
/// # Errors
/// Returns an internal error if `file_type` is not a [`DefaultFileType`].
pub fn file_type_to_format(
    file_type: &Arc<dyn FileType>,
) -> Result<Arc<dyn FileFormatFactory>> {
    let Some(default_type) = file_type
        .as_ref()
        .as_any()
        .downcast_ref::<DefaultFileType>()
    else {
        return internal_err!("FileType was not DefaultFileType");
    };
    Ok(Arc::clone(&default_type.file_format_factory))
}