use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
use datafusion::catalog::{CatalogProvider, SchemaProvider};
use futures::future::try_join_all;
use iceberg::{Catalog, NamespaceIdent, Result};
use crate::schema::IcebergSchemaProvider;
/// A DataFusion [`CatalogProvider`] whose schemas come from an Iceberg [`Catalog`].
///
/// The schema providers are built eagerly by [`IcebergCatalogProvider::try_new`] from the
/// namespaces the catalog reports. The resulting map is what `schema_names` and `schema` serve.
#[derive(Debug)]
pub struct IcebergCatalogProvider {
/// Schema providers keyed by schema (namespace) name.
///
/// NOTE(review): this map is filled once at construction, and nothing in this file mutates
/// it afterwards. Namespaces created later in the Iceberg catalog therefore will not show up
/// here. Confirm this is acceptable.
schemas: HashMap<String, Arc<dyn SchemaProvider>>,
}
impl IcebergCatalogProvider {
    /// Builds a catalog provider by discovering the namespaces exposed by `client`.
    ///
    /// The steps are:
    /// - list the top-level namespaces once, via `list_namespaces(None)`;
    /// - turn each namespace name component into a schema name;
    /// - initialise one [`IcebergSchemaProvider`] per schema name, concurrently.
    ///
    /// # Errors
    ///
    /// Returns an error in either of these cases:
    /// - listing the namespaces fails;
    /// - any schema provider fails to initialise. In that case the first error produced by
    ///   [`try_join_all`] is returned and no provider is kept.
    pub async fn try_new(client: Arc<dyn Catalog>) -> Result<Self> {
        // Flatten every namespace identifier into its individual name components.
        // NOTE(review): a multi-level namespace would yield one single-level schema per
        // level. Confirm that is the intended mapping.
        let schema_names: Vec<String> = client
            .list_namespaces(None)
            .await?
            .iter()
            .flat_map(|ns| ns.as_ref().clone())
            .collect();

        // `try_join_all` accepts any `IntoIterator` of futures, so no intermediate `Vec`
        // is needed. Its output keeps the input order. The `zip` below relies on that to
        // pair each provider with its schema name.
        let providers = try_join_all(schema_names.iter().map(|name| {
            IcebergSchemaProvider::try_new(
                Arc::clone(&client),
                NamespaceIdent::new(name.clone()),
            )
        }))
        .await?;

        // `zip` takes any `IntoIterator`, so `providers` can be passed directly.
        let schemas: HashMap<String, Arc<dyn SchemaProvider>> = schema_names
            .into_iter()
            .zip(providers)
            .map(|(name, provider)| (name, Arc::new(provider) as Arc<dyn SchemaProvider>))
            .collect();

        Ok(IcebergCatalogProvider { schemas })
    }
}
/// Serves the pre-built schema map to DataFusion.
impl CatalogProvider for IcebergCatalogProvider {
    /// Returns this provider as `&dyn Any` so callers can downcast it to the concrete type.
    fn as_any(&self) -> &dyn Any {
        self as &dyn Any
    }

    /// Returns an owned copy of every schema name, in the map's iteration order.
    fn schema_names(&self) -> Vec<String> {
        let mut names = Vec::with_capacity(self.schemas.len());
        names.extend(self.schemas.keys().cloned());
        names
    }

    /// Looks up a schema by its exact name.
    ///
    /// Returns a new shared handle to the stored provider, or `None` when the name is unknown.
    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
        let provider = self.schemas.get(name)?;
        Some(Arc::clone(provider))
    }
}