use crate::{CatalogProvider, CatalogProviderList, SchemaProvider, TableProvider};
use async_trait::async_trait;
use std::any::Any;
use std::fmt::Debug;
use std::sync::Arc;
#[derive(Debug)]
pub struct DynamicFileCatalog {
inner: Arc<dyn CatalogProviderList>,
factory: Arc<dyn UrlTableFactory>,
}
impl DynamicFileCatalog {
pub fn new(
inner: Arc<dyn CatalogProviderList>,
factory: Arc<dyn UrlTableFactory>,
) -> Self {
Self { inner, factory }
}
}
impl CatalogProviderList for DynamicFileCatalog {
fn as_any(&self) -> &dyn Any {
self
}
fn register_catalog(
&self,
name: String,
catalog: Arc<dyn CatalogProvider>,
) -> Option<Arc<dyn CatalogProvider>> {
self.inner.register_catalog(name, catalog)
}
fn catalog_names(&self) -> Vec<String> {
self.inner.catalog_names()
}
fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
self.inner.catalog(name).map(|catalog| {
Arc::new(DynamicFileCatalogProvider::new(
catalog,
Arc::clone(&self.factory),
)) as _
})
}
}
#[derive(Debug)]
struct DynamicFileCatalogProvider {
inner: Arc<dyn CatalogProvider>,
factory: Arc<dyn UrlTableFactory>,
}
impl DynamicFileCatalogProvider {
pub fn new(
inner: Arc<dyn CatalogProvider>,
factory: Arc<dyn UrlTableFactory>,
) -> Self {
Self { inner, factory }
}
}
impl CatalogProvider for DynamicFileCatalogProvider {
fn as_any(&self) -> &dyn Any {
self
}
fn schema_names(&self) -> Vec<String> {
self.inner.schema_names()
}
fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
self.inner.schema(name).map(|schema| {
Arc::new(DynamicFileSchemaProvider::new(
schema,
Arc::clone(&self.factory),
)) as _
})
}
fn register_schema(
&self,
name: &str,
schema: Arc<dyn SchemaProvider>,
) -> datafusion_common::Result<Option<Arc<dyn SchemaProvider>>> {
self.inner.register_schema(name, schema)
}
}
#[derive(Debug)]
pub struct DynamicFileSchemaProvider {
inner: Arc<dyn SchemaProvider>,
factory: Arc<dyn UrlTableFactory>,
}
impl DynamicFileSchemaProvider {
pub fn new(
inner: Arc<dyn SchemaProvider>,
factory: Arc<dyn UrlTableFactory>,
) -> Self {
Self { inner, factory }
}
}
#[async_trait]
impl SchemaProvider for DynamicFileSchemaProvider {
fn as_any(&self) -> &dyn Any {
self
}
fn table_names(&self) -> Vec<String> {
self.inner.table_names()
}
async fn table(
&self,
name: &str,
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
if let Some(table) = self.inner.table(name).await? {
return Ok(Some(table));
};
self.factory.try_new(name).await
}
fn register_table(
&self,
name: String,
table: Arc<dyn TableProvider>,
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
self.inner.register_table(name, table)
}
fn deregister_table(
&self,
name: &str,
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
self.inner.deregister_table(name)
}
fn table_exist(&self, name: &str) -> bool {
self.inner.table_exist(name)
}
}
#[async_trait]
pub trait UrlTableFactory: Debug + Sync + Send {
async fn try_new(
&self,
url: &str,
) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>>;
}