use std::any::Any;
use std::collections::HashSet;
use std::sync::Arc;
use dashmap::DashMap;
use datafusion::catalog::{CatalogProvider, CatalogProviderList, SchemaProvider};
use datafusion::error::Result;
#[allow(unused_imports)]
use crate::SessionBuilder;
use crate::namespace_level::NamespaceLevel;
use crate::schema::LanceSchemaProvider;
/// Catalog list that maps catalog names to their [`CatalogProvider`]s.
///
/// The name-to-provider table lives in a [`DashMap`], so it can be read and
/// updated through a shared reference.
#[derive(Clone, Debug)]
pub struct LanceCatalogProviderList {
    /// Namespace level handed to the constructor. It is kept on the struct
    /// but not read by the code visible in this part of the file, which is
    /// why the dead-code lint is silenced.
    #[allow(dead_code)]
    ns_level: NamespaceLevel,
    /// Catalog providers keyed by catalog name.
    catalogs: DashMap<String, Arc<dyn CatalogProvider>>,
}
impl LanceCatalogProviderList {
    /// Builds a catalog list from the children of `namespace`.
    ///
    /// Every child returned by `namespace.children()` is turned into a
    /// [`LanceCatalogProvider`] and stored under the child's name. Children
    /// are processed one after another, in the order they are yielded.
    ///
    /// # Errors
    ///
    /// Propagates the first error from listing the children or from
    /// constructing any child's catalog provider.
    pub async fn try_new(namespace: NamespaceLevel) -> Result<Self> {
        let catalogs = DashMap::new();

        for child in namespace.children().await? {
            // Take the name before `child` is moved into the provider constructor.
            let name = child.name().to_string();
            let provider: Arc<dyn CatalogProvider> =
                Arc::new(LanceCatalogProvider::try_new(child).await?);
            catalogs.insert(name, provider);
        }

        Ok(Self {
            ns_level: namespace,
            catalogs,
        })
    }
}
impl CatalogProviderList for LanceCatalogProviderList {
    /// Returns `self` as [`Any`] so callers can downcast to the concrete type.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Stores `catalog` under `name`.
    ///
    /// Returns the provider that was previously stored under `name`, if any.
    fn register_catalog(
        &self,
        name: String,
        catalog: Arc<dyn CatalogProvider>,
    ) -> Option<Arc<dyn CatalogProvider>> {
        self.catalogs.insert(name, catalog)
    }

    /// Returns the names of all registered catalogs, in no particular order.
    fn catalog_names(&self) -> Vec<String> {
        // NOTE(review): map keys should already be distinct, so this set pass
        // looks redundant; it is kept to mirror the existing behavior.
        let mut unique_names: HashSet<String> = HashSet::new();
        for entry in self.catalogs.iter() {
            unique_names.insert(entry.key().clone());
        }
        unique_names.into_iter().collect()
    }

    /// Looks up the catalog registered under `name`.
    ///
    /// Returns a new handle to the shared provider, or `None` when no catalog
    /// has that name.
    fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
        let entry = self.catalogs.get(name)?;
        Some(Arc::clone(entry.value()))
    }
}
/// Catalog provider that maps schema names to their [`SchemaProvider`]s.
///
/// The name-to-provider table lives in a [`DashMap`], so it can be read and
/// updated through a shared reference.
#[derive(Clone, Debug)]
pub struct LanceCatalogProvider {
    /// Namespace level handed to the constructor. It is kept on the struct
    /// but not read by the code visible in this part of the file, which is
    /// why the dead-code lint is silenced.
    #[allow(dead_code)]
    ns_level: NamespaceLevel,
    /// Schema providers keyed by schema name.
    schemas: DashMap<String, Arc<dyn SchemaProvider>>,
}
impl LanceCatalogProvider {
    /// Builds a catalog provider from the children of `namespace`.
    ///
    /// Every child returned by `namespace.children()` is turned into a
    /// [`LanceSchemaProvider`] and stored under the child's name. Children
    /// are processed one after another, in the order they are yielded.
    ///
    /// # Errors
    ///
    /// Propagates the first error from listing the children or from
    /// constructing any child's schema provider.
    pub async fn try_new(namespace: NamespaceLevel) -> Result<Self> {
        let schemas = DashMap::new();

        for child in namespace.children().await? {
            // Take the name before `child` is moved into the provider constructor.
            let name = child.name().to_string();
            let provider: Arc<dyn SchemaProvider> =
                Arc::new(LanceSchemaProvider::try_new(child).await?);
            schemas.insert(name, provider);
        }

        Ok(Self {
            ns_level: namespace,
            schemas,
        })
    }
}
impl CatalogProvider for LanceCatalogProvider {
    /// Returns `self` as [`Any`] so callers can downcast to the concrete type.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Returns the names of all registered schemas, in no particular order.
    fn schema_names(&self) -> Vec<String> {
        // NOTE(review): map keys should already be distinct, so this set pass
        // looks redundant; it is kept to mirror the existing behavior.
        let mut unique_names: HashSet<String> = HashSet::new();
        for entry in self.schemas.iter() {
            unique_names.insert(entry.key().clone());
        }
        unique_names.into_iter().collect()
    }

    /// Looks up the schema registered under `schema_name`.
    ///
    /// Returns a new handle to the shared provider, or `None` when no schema
    /// has that name.
    fn schema(&self, schema_name: &str) -> Option<Arc<dyn SchemaProvider>> {
        let entry = self.schemas.get(schema_name)?;
        Some(Arc::clone(entry.value()))
    }

    /// Stores `schema` under `name`.
    ///
    /// Always succeeds; the `Ok` value carries the provider that was
    /// previously stored under `name`, if any.
    fn register_schema(
        &self,
        name: &str,
        schema: Arc<dyn SchemaProvider>,
    ) -> Result<Option<Arc<dyn SchemaProvider>>> {
        let replaced = self.schemas.insert(name.to_owned(), schema);
        Ok(replaced)
    }
}