use crate::options::ListingOptions;
use arrow::datatypes::{DataType, Schema, SchemaRef};
use datafusion_catalog::Session;
use datafusion_common::{config_err, internal_err};
use datafusion_datasource::ListingTableUrl;
use datafusion_datasource::file_compression_type::FileCompressionType;
#[expect(deprecated)]
use datafusion_datasource::schema_adapter::SchemaAdapterFactory;
use datafusion_physical_expr_adapter::PhysicalExprAdapterFactory;
use std::str::FromStr;
use std::sync::Arc;
/// Records how the schema held by a [`ListingTableConfig`] was obtained.
///
/// The enum is fieldless, so equality is total and `Eq` is derived alongside
/// `PartialEq`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SchemaSource {
    /// No schema has been recorded yet; this is the default value.
    #[default]
    Unset,
    /// Set by [`ListingTableConfig::infer_schema`] when it had to produce a
    /// schema itself.
    Inferred,
    /// Set by [`ListingTableConfig::with_schema`] when a schema is supplied
    /// explicitly.
    Specified,
}
/// Builder-style configuration for a listing table.
///
/// Start with `new` or `new_with_multi_paths`, then attach options and a
/// schema through the consuming `with_*` / `infer_*` methods, each of which
/// returns the updated configuration.
#[derive(Debug, Clone, Default)]
pub struct ListingTableConfig {
/// Locations the table reads from. `infer_schema` and
/// `infer_partitions_from_path` only consult the first entry.
pub table_paths: Vec<ListingTableUrl>,
/// Schema of the files, once it has been specified or inferred.
pub file_schema: Option<SchemaRef>,
/// Listing options. `infer_schema` and `infer_partitions_from_path` return
/// an error while this is `None`.
pub options: Option<ListingOptions>,
/// How `file_schema` was obtained; see `SchemaSource`.
pub(crate) schema_source: SchemaSource,
/// Optional expression adapter factory, set via `with_expr_adapter_factory`.
pub(crate) expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
}
impl ListingTableConfig {
pub fn new(table_path: ListingTableUrl) -> Self {
Self {
table_paths: vec![table_path],
..Default::default()
}
}
pub fn new_with_multi_paths(table_paths: Vec<ListingTableUrl>) -> Self {
Self {
table_paths,
..Default::default()
}
}
pub fn schema_source(&self) -> SchemaSource {
self.schema_source
}
pub fn with_schema(self, schema: SchemaRef) -> Self {
debug_assert!(
self.options.is_some() || cfg!(test),
"ListingTableConfig::with_schema called without options set. \
Consider calling with_listing_options() or infer_options() first to avoid panics in downstream code."
);
Self {
file_schema: Some(schema),
schema_source: SchemaSource::Specified,
..self
}
}
pub fn with_listing_options(self, listing_options: ListingOptions) -> Self {
debug_assert!(
!self.table_paths.is_empty() || cfg!(test),
"ListingTableConfig::with_listing_options called without table_paths set. \
Consider calling new() or new_with_multi_paths() first to establish table paths."
);
Self {
options: Some(listing_options),
..self
}
}
pub fn infer_file_extension_and_compression_type(
path: &str,
) -> datafusion_common::Result<(String, Option<String>)> {
let mut exts = path.rsplit('.');
let split = exts.next().unwrap_or("");
let file_compression_type = FileCompressionType::from_str(split)
.unwrap_or(FileCompressionType::UNCOMPRESSED);
if file_compression_type.is_compressed() {
let split2 = exts.next().unwrap_or("");
Ok((split2.to_string(), Some(split.to_string())))
} else {
Ok((split.to_string(), None))
}
}
pub async fn infer_schema(
self,
state: &dyn Session,
) -> datafusion_common::Result<Self> {
match self.options {
Some(options) => {
let ListingTableConfig {
table_paths,
file_schema,
options: _,
schema_source,
expr_adapter_factory,
} = self;
let (schema, new_schema_source) = match file_schema {
Some(schema) => (schema, schema_source), None => {
if let Some(url) = table_paths.first() {
(
options.infer_schema(state, url).await?,
SchemaSource::Inferred,
)
} else {
(Arc::new(Schema::empty()), SchemaSource::Inferred)
}
}
};
Ok(Self {
table_paths,
file_schema: Some(schema),
options: Some(options),
schema_source: new_schema_source,
expr_adapter_factory,
})
}
None => internal_err!("No `ListingOptions` set for inferring schema"),
}
}
pub async fn infer_partitions_from_path(
self,
state: &dyn Session,
) -> datafusion_common::Result<Self> {
match self.options {
Some(options) => {
let Some(url) = self.table_paths.first() else {
return config_err!("No table path found");
};
let partitions = options
.infer_partitions(state, url)
.await?
.into_iter()
.map(|col_name| {
(
col_name,
DataType::Dictionary(
Box::new(DataType::UInt16),
Box::new(DataType::Utf8),
),
)
})
.collect::<Vec<_>>();
let options = options.with_table_partition_cols(partitions);
Ok(Self {
table_paths: self.table_paths,
file_schema: self.file_schema,
options: Some(options),
schema_source: self.schema_source,
expr_adapter_factory: self.expr_adapter_factory,
})
}
None => config_err!("No `ListingOptions` set for inferring schema"),
}
}
pub fn with_expr_adapter_factory(
self,
expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory>,
) -> Self {
Self {
expr_adapter_factory: Some(expr_adapter_factory),
..self
}
}
#[deprecated(
since = "52.0.0",
note = "SchemaAdapterFactory has been removed. Use with_expr_adapter_factory and PhysicalExprAdapterFactory instead. See upgrading.md for more details."
)]
#[expect(deprecated)]
pub fn with_schema_adapter_factory(
self,
_schema_adapter_factory: Arc<dyn SchemaAdapterFactory>,
) -> Self {
self
}
}