use std::sync::Arc;
use crate::datasource::TableProvider;
use crate::datasource::listing::ListingTableConfigExt;
use crate::datasource::listing::{ListingTable, ListingTableConfig, ListingTableUrl};
use crate::error::Result;
use crate::execution::context::SessionState;
use datafusion_catalog::UrlTableFactory;
use datafusion_common::plan_datafusion_err;
use datafusion_session::SessionStore;
use async_trait::async_trait;
/// A [`UrlTableFactory`] that resolves URL strings to [`ListingTable`]s on
/// demand.
///
/// The factory holds a [`SessionStore`]; the session obtained from it is what
/// `try_new` uses when inferring the options, partitions and schema of the
/// table behind a URL.
#[derive(Debug, Default)]
pub struct DynamicListTableFactory {
    /// Store from which the current session is fetched each time a table is
    /// resolved.
    session_store: SessionStore,
}
impl DynamicListTableFactory {
pub fn new(session_store: SessionStore) -> Self {
Self { session_store }
}
pub fn session_store(&self) -> &SessionStore {
&self.session_store
}
}
#[async_trait]
impl UrlTableFactory for DynamicListTableFactory {
    /// Tries to build a [`ListingTable`] provider for `url`.
    ///
    /// Returns `Ok(None)` when `url` cannot be parsed as a [`ListingTableUrl`]
    /// or when listing options cannot be inferred for it, so a string that
    /// does not name a readable location is reported as "no table" rather
    /// than as a failure.
    ///
    /// # Errors
    ///
    /// Returns an error when:
    /// - the session cannot be obtained from the [`SessionStore`], or it is
    ///   not a [`SessionState`];
    /// - partition or schema inference fails after options were inferred;
    /// - [`ListingTable::try_new`] rejects the resulting configuration.
    async fn try_new(&self, url: &str) -> Result<Option<Arc<dyn TableProvider>>> {
        let Ok(table_url) = ListingTableUrl::parse(url) else {
            return Ok(None);
        };

        // Take an owned copy of the concrete `SessionState`, so the inference
        // steps below work on a value that is independent of the store.
        let state = self
            .session_store()
            .get_session()
            .upgrade()
            .and_then(|session| {
                session
                    .read()
                    .as_any()
                    .downcast_ref::<SessionState>()
                    .cloned()
            })
            .ok_or_else(|| plan_datafusion_err!("get current SessionStore error"))?;

        // `table_url` is not needed afterwards, so it is moved into the config
        // instead of being cloned. A failure to infer options is deliberately
        // mapped to `Ok(None)`.
        let Ok(cfg) = ListingTableConfig::new(table_url)
            .infer_options(&state)
            .await
        else {
            return Ok(None);
        };

        // Once options are known, any further failure is a real error.
        let cfg = cfg
            .infer_partitions_from_path(&state)
            .await?
            .infer_schema(&state)
            .await?;

        let table: Arc<dyn TableProvider> = Arc::new(ListingTable::try_new(cfg)?);
        Ok(Some(table))
    }
}