use std::any::Any;
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
use crate::file::FileSource;
use crate::file_compression_type::FileCompressionType;
use crate::file_scan_config::FileScanConfig;
use crate::file_sink_config::FileSinkConfig;
use arrow::datatypes::SchemaRef;
use datafusion_common::file_options::file_type::FileType;
use datafusion_common::{internal_err, not_impl_err, GetExt, Result, Statistics};
use datafusion_physical_expr::LexRequirement;
use datafusion_physical_plan::ExecutionPlan;
use datafusion_session::Session;
use async_trait::async_trait;
use object_store::{ObjectMeta, ObjectStore};
/// Default record-count limit of 1000.
///
/// NOTE(review): judging by the name, this is presumably the default upper
/// bound on how many records are examined during schema inference. Nothing in
/// this part of the file reads it, so confirm against its users.
pub const DEFAULT_SCHEMA_INFER_MAX_RECORD: usize = 1_000;
/// Abstraction over a single file format.
///
/// An implementor supplies extension lookup, schema and statistics inference,
/// construction of a read plan, optionally a write plan, and a `FileSource`.
/// Implementors must be `Send + Sync` and implement `fmt::Debug`. The `async`
/// methods are enabled by the `#[async_trait]` attribute.
#[async_trait]
pub trait FileFormat: Send + Sync + fmt::Debug {
/// Exposes this value as `&dyn Any`, which lets a caller attempt a downcast
/// to a concrete implementing type.
fn as_any(&self) -> &dyn Any;
/// Returns an extension string for this format.
///
/// NOTE(review): presumably the bare file extension; whether a leading dot
/// is included is not visible here — confirm with implementors.
fn get_ext(&self) -> String;
/// Returns an extension string that takes the given compression type into
/// account, or an error.
///
/// NOTE(review): the exact composition of the result (and which
/// compression types are rejected) is implementor-defined — confirm.
fn get_ext_with_compression(
&self,
_file_compression_type: &FileCompressionType,
) -> Result<String>;
/// Produces a schema (`SchemaRef`) for the supplied `objects`, which are
/// described by `ObjectMeta` and read through `store`.
///
/// NOTE(review): how the per-object schemas are combined is not visible
/// here — confirm with implementors.
async fn infer_schema(
&self,
state: &dyn Session,
store: &Arc<dyn ObjectStore>,
objects: &[ObjectMeta],
) -> Result<SchemaRef>;
/// Produces `Statistics` for a single `object`, given the table's schema.
///
/// NOTE(review): whether the statistics are exact or estimated is
/// implementor-defined — confirm.
async fn infer_stats(
&self,
state: &dyn Session,
store: &Arc<dyn ObjectStore>,
table_schema: SchemaRef,
object: &ObjectMeta,
) -> Result<Statistics>;
/// Builds an `ExecutionPlan` from the scan configuration `conf`.
async fn create_physical_plan(
&self,
state: &dyn Session,
conf: FileScanConfig,
) -> Result<Arc<dyn ExecutionPlan>>;
/// Builds an `ExecutionPlan` that writes `_input` according to `_conf`.
///
/// The default implementation ignores every argument and evaluates
/// `not_impl_err!` with the message "Writer not implemented for this
/// format"; formats that support writing must override this method.
async fn create_writer_physical_plan(
&self,
_input: Arc<dyn ExecutionPlan>,
_state: &dyn Session,
_conf: FileSinkConfig,
_order_requirements: Option<LexRequirement>,
) -> Result<Arc<dyn ExecutionPlan>> {
not_impl_err!("Writer not implemented for this format")
}
/// Returns the `FileSource` associated with this format.
fn file_source(&self) -> Arc<dyn FileSource>;
}
/// Factory that produces `FileFormat` instances.
///
/// Implementors must be `Sync + Send`, implement `fmt::Debug`, and implement
/// `GetExt` so the factory itself can report an extension.
pub trait FileFormatFactory: Sync + Send + GetExt + fmt::Debug {
/// Creates a `FileFormat` from the session `state` and a string-keyed map
/// of `format_options`; returns an error if creation fails.
///
/// NOTE(review): which option keys are recognised, and how unknown keys
/// are treated, is implementor-defined — confirm.
fn create(
&self,
state: &dyn Session,
format_options: &HashMap<String, String>,
) -> Result<Arc<dyn FileFormat>>;
/// Returns a `FileFormat` without consulting any session or options.
///
/// This is a trait method taking `&self`, unrelated to `Default::default`.
fn default(&self) -> Arc<dyn FileFormat>;
/// Exposes this value as `&dyn Any`, which lets a caller attempt a downcast
/// to a concrete factory type.
fn as_any(&self) -> &dyn Any;
}
/// Adapter that holds an `Arc<dyn FileFormatFactory>`.
///
/// Elsewhere in this file it implements `FileType`, `fmt::Display` and
/// `GetExt`, which is what allows a factory to be passed around as a
/// `dyn FileType` and recovered later via `file_type_to_format`.
#[derive(Debug)]
pub struct DefaultFileType {
// The wrapped factory; private, borrowed out through `as_format_factory`.
file_format_factory: Arc<dyn FileFormatFactory>,
}
impl DefaultFileType {
pub fn new(file_format_factory: Arc<dyn FileFormatFactory>) -> Self {
Self {
file_format_factory,
}
}
pub fn as_format_factory(&self) -> &Arc<dyn FileFormatFactory> {
&self.file_format_factory
}
}
impl FileType for DefaultFileType {
    /// Exposes this `DefaultFileType` as `&dyn Any` so that code holding a
    /// `dyn FileType` can downcast back to it (see `file_type_to_format`).
    fn as_any(&self) -> &dyn Any {
        let this: &dyn Any = self;
        this
    }
}
impl fmt::Display for DefaultFileType {
    /// Writes the `Debug` representation of the wrapped factory.
    ///
    /// The factory is rendered through a fresh `{:?}` format string, so any
    /// width/alternate flags given to this `Display` call are not forwarded.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_fmt(format_args!("{:?}", self.file_format_factory))
    }
}
impl GetExt for DefaultFileType {
    /// Reports the extension of the wrapped factory, unchanged.
    fn get_ext(&self) -> String {
        let factory = &self.file_format_factory;
        factory.get_ext()
    }
}
pub fn format_as_file_type(
file_format_factory: Arc<dyn FileFormatFactory>,
) -> Arc<dyn FileType> {
Arc::new(DefaultFileType {
file_format_factory,
})
}
/// Recovers the `FileFormatFactory` held inside a `FileType` trait object.
///
/// # Errors
///
/// Evaluates `internal_err!` with "FileType was not DefaultFileType" when the
/// concrete type behind `file_type` is anything other than `DefaultFileType`.
pub fn file_type_to_format(
    file_type: &Arc<dyn FileType>,
) -> Result<Arc<dyn FileFormatFactory>> {
    let erased = file_type.as_ref().as_any();
    if let Some(default_type) = erased.downcast_ref::<DefaultFileType>() {
        // Only the `Arc` handle is cloned; the factory itself is shared.
        return Ok(Arc::clone(&default_type.file_format_factory));
    }
    internal_err!("FileType was not DefaultFileType")
}