use std::any::Any;
use std::collections::HashMap;
use std::fmt::Debug;
use std::sync::Arc;
use async_trait::async_trait;
use datafusion::catalog::{CatalogProvider, SchemaProvider};
use datafusion::datasource::TableProvider;
use datafusion::error::Result as DFResult;
use paimon::catalog::{Catalog, Identifier};
use crate::error::to_datafusion_error;
use crate::runtime::{await_with_runtime, block_on_with_runtime};
use crate::system_tables;
use crate::table::PaimonTableProvider;
/// DataFusion [`CatalogProvider`] backed by a Paimon [`Catalog`].
///
/// Each Paimon database is surfaced as a DataFusion schema via
/// [`PaimonSchemaProvider`].
pub struct PaimonCatalogProvider {
    /// Shared Paimon catalog used for every database lookup and DDL call.
    catalog: Arc<dyn Catalog>,
}
impl Debug for PaimonCatalogProvider {
    /// Renders only the type name; the wrapped catalog is not printed
    /// (`dyn Catalog` carries no `Debug` bound here).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("PaimonCatalogProvider")
    }
}
impl PaimonCatalogProvider {
    /// Creates a catalog provider that serves schemas from `catalog`.
    pub fn new(catalog: Arc<dyn Catalog>) -> Self {
        Self { catalog }
    }
}
impl CatalogProvider for PaimonCatalogProvider {
fn as_any(&self) -> &dyn Any {
self
}
fn schema_names(&self) -> Vec<String> {
let catalog = Arc::clone(&self.catalog);
block_on_with_runtime(
async move { catalog.list_databases().await.unwrap_or_default() },
"paimon catalog access thread panicked",
)
}
fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
let catalog = Arc::clone(&self.catalog);
let name = name.to_string();
block_on_with_runtime(
async move {
match catalog.get_database(&name).await {
Ok(_) => Some(
Arc::new(PaimonSchemaProvider::new(Arc::clone(&catalog), name))
as Arc<dyn SchemaProvider>,
),
Err(paimon::Error::DatabaseNotExist { .. }) => None,
Err(_) => None,
}
},
"paimon catalog access thread panicked",
)
}
fn register_schema(
&self,
name: &str,
_schema: Arc<dyn SchemaProvider>,
) -> DFResult<Option<Arc<dyn SchemaProvider>>> {
let catalog = Arc::clone(&self.catalog);
let name = name.to_string();
block_on_with_runtime(
async move {
catalog
.create_database(&name, false, HashMap::new())
.await
.map_err(to_datafusion_error)?;
Ok(Some(
Arc::new(PaimonSchemaProvider::new(Arc::clone(&catalog), name))
as Arc<dyn SchemaProvider>,
))
},
"paimon catalog access thread panicked",
)
}
fn deregister_schema(
&self,
name: &str,
cascade: bool,
) -> DFResult<Option<Arc<dyn SchemaProvider>>> {
let catalog = Arc::clone(&self.catalog);
let name = name.to_string();
block_on_with_runtime(
async move {
catalog
.drop_database(&name, false, cascade)
.await
.map_err(to_datafusion_error)?;
Ok(Some(
Arc::new(PaimonSchemaProvider::new(Arc::clone(&catalog), name))
as Arc<dyn SchemaProvider>,
))
},
"paimon catalog access thread panicked",
)
}
}
/// DataFusion [`SchemaProvider`] exposing the tables of a single Paimon database.
pub struct PaimonSchemaProvider {
    /// Shared Paimon catalog used for table lookups and DDL calls.
    catalog: Arc<dyn Catalog>,
    /// Name of the Paimon database this schema maps to.
    database: String,
}
impl Debug for PaimonSchemaProvider {
    /// Renders the database name only; the wrapped catalog is omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("PaimonSchemaProvider");
        out.field("database", &self.database);
        out.finish()
    }
}
impl PaimonSchemaProvider {
    /// Creates a schema provider for the Paimon database `database` in `catalog`.
    ///
    /// The database's existence is not checked here.
    pub fn new(catalog: Arc<dyn Catalog>, database: String) -> Self {
        Self { database, catalog }
    }
}
#[async_trait]
impl SchemaProvider for PaimonSchemaProvider {
fn as_any(&self) -> &dyn Any {
self
}
fn table_names(&self) -> Vec<String> {
let catalog = Arc::clone(&self.catalog);
let database = self.database.clone();
block_on_with_runtime(
async move { catalog.list_tables(&database).await.unwrap_or_default() },
"paimon catalog access thread panicked",
)
}
async fn table(&self, name: &str) -> DFResult<Option<Arc<dyn TableProvider>>> {
let (base, system_name) = system_tables::split_object_name(name);
if let Some(system_name) = system_name {
return await_with_runtime(system_tables::load(
Arc::clone(&self.catalog),
self.database.clone(),
base.to_string(),
system_name.to_string(),
))
.await;
}
let catalog = Arc::clone(&self.catalog);
let identifier = Identifier::new(self.database.clone(), base);
await_with_runtime(async move {
match catalog.get_table(&identifier).await {
Ok(table) => {
let provider = PaimonTableProvider::try_new(table)?;
Ok(Some(Arc::new(provider) as Arc<dyn TableProvider>))
}
Err(paimon::Error::TableNotExist { .. }) => Ok(None),
Err(e) => Err(to_datafusion_error(e)),
}
})
.await
}
fn table_exist(&self, name: &str) -> bool {
let (base, system_name) = system_tables::split_object_name(name);
if let Some(system_name) = system_name {
if !system_tables::is_registered(system_name) {
return false;
}
}
let catalog = Arc::clone(&self.catalog);
let identifier = Identifier::new(self.database.clone(), base.to_string());
block_on_with_runtime(
async move {
match catalog.get_table(&identifier).await {
Ok(_) => true,
Err(paimon::Error::TableNotExist { .. }) => false,
Err(_) => false,
}
},
"paimon catalog access thread panicked",
)
}
fn register_table(
&self,
_name: String,
table: Arc<dyn TableProvider>,
) -> DFResult<Option<Arc<dyn TableProvider>>> {
Ok(Some(table))
}
fn deregister_table(&self, name: &str) -> DFResult<Option<Arc<dyn TableProvider>>> {
let catalog = Arc::clone(&self.catalog);
let identifier = Identifier::new(self.database.clone(), name);
block_on_with_runtime(
async move {
let table = match catalog.get_table(&identifier).await {
Ok(t) => t,
Err(paimon::Error::TableNotExist { .. }) => return Ok(None),
Err(e) => return Err(to_datafusion_error(e)),
};
let provider = PaimonTableProvider::try_new(table)?;
catalog
.drop_table(&identifier, false)
.await
.map_err(to_datafusion_error)?;
Ok(Some(Arc::new(provider) as Arc<dyn TableProvider>))
},
"paimon catalog access thread panicked",
)
}
}