pub mod information_schema;
pub mod listing_schema;
pub mod schema;
pub use datafusion_sql::{ResolvedTableReference, TableReference};
use crate::catalog::schema::SchemaProvider;
use dashmap::DashMap;
use datafusion_common::{exec_err, not_impl_err, DataFusionError, Result};
use std::any::Any;
use std::sync::Arc;
/// A named collection of catalogs that can be shared across threads
/// (the trait requires `Sync + Send`).
pub trait CatalogList: Sync + Send {
    /// Exposes the list as [`Any`] so that it can be downcast to a concrete
    /// implementation.
    fn as_any(&self) -> &dyn Any;

    /// Adds `catalog` to the list under `name`.
    ///
    /// The meaning of the returned `Option` is up to the implementor; the
    /// in-memory implementation in this module hands back the catalog that
    /// was previously stored under the same name, if there was one.
    fn register_catalog(
        &self,
        name: String,
        catalog: Arc<dyn CatalogProvider>,
    ) -> Option<Arc<dyn CatalogProvider>>;

    /// Returns the names of the catalogs held by this list.
    fn catalog_names(&self) -> Vec<String>;

    /// Looks up a catalog by `name`, returning it as an `Option`.
    fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>>;
}
/// A [`CatalogList`] that keeps its catalogs in memory, keyed by name in a
/// [`DashMap`].
pub struct MemoryCatalogList {
    /// Registered catalogs, keyed by catalog name.
    pub catalogs: DashMap<String, Arc<dyn CatalogProvider>>,
}

impl MemoryCatalogList {
    /// Creates a list with no catalogs registered.
    pub fn new() -> Self {
        Self::default()
    }
}

impl Default for MemoryCatalogList {
    /// Builds an empty list; equivalent to [`MemoryCatalogList::new`].
    fn default() -> Self {
        let catalogs = DashMap::new();
        Self { catalogs }
    }
}
impl CatalogList for MemoryCatalogList {
    /// Returns `self` as [`Any`] for downcasting.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Stores `catalog` under `name` and returns whatever `DashMap::insert`
    /// reports for that key (the entry it replaced, if any).
    fn register_catalog(
        &self,
        name: String,
        catalog: Arc<dyn CatalogProvider>,
    ) -> Option<Arc<dyn CatalogProvider>> {
        let previous = self.catalogs.insert(name, catalog);
        previous
    }

    /// Collects an owned copy of every key currently in the map.
    fn catalog_names(&self) -> Vec<String> {
        self.catalogs
            .iter()
            .map(|entry| entry.key().to_owned())
            .collect()
    }

    /// Returns a new handle to the catalog stored under `name`, or `None`
    /// when the map has no such key.
    fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
        let entry = self.catalogs.get(name)?;
        Some(Arc::clone(entry.value()))
    }
}
/// A catalog: a named collection of schemas that can be shared across
/// threads (the trait requires `Sync + Send`).
pub trait CatalogProvider: Sync + Send {
    /// Exposes the catalog as [`Any`] so that it can be downcast to a
    /// concrete implementation.
    fn as_any(&self) -> &dyn Any;

    /// Returns the names of the schemas available in this catalog.
    fn schema_names(&self) -> Vec<String>;

    /// Looks up a schema by `name`, returning it as an `Option`.
    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>>;

    /// Adds `schema` to this catalog under `name`.
    ///
    /// # Errors
    ///
    /// The default implementation ignores both arguments and always returns
    /// a "not implemented" error; implementors that support registration
    /// must override it.
    fn register_schema(
        &self,
        name: &str,
        schema: Arc<dyn SchemaProvider>,
    ) -> Result<Option<Arc<dyn SchemaProvider>>> {
        // The parameters keep their descriptive names for the signature;
        // mention them once so they do not trigger unused-variable warnings.
        let _ = (name, &schema);
        not_impl_err!("Registering new schemas is not supported")
    }

    /// Removes the schema called `_name` from this catalog.
    ///
    /// # Errors
    ///
    /// The default implementation ignores both arguments and always returns
    /// a "not implemented" error; implementors that support removal must
    /// override it.
    fn deregister_schema(
        &self,
        _name: &str,
        _cascade: bool,
    ) -> Result<Option<Arc<dyn SchemaProvider>>> {
        not_impl_err!("Deregistering new schemas is not supported")
    }
}
/// A [`CatalogProvider`] that keeps its schemas in memory, keyed by name in
/// a [`DashMap`].
pub struct MemoryCatalogProvider {
    /// Registered schemas, keyed by schema name.
    schemas: DashMap<String, Arc<dyn SchemaProvider>>,
}

impl MemoryCatalogProvider {
    /// Creates a catalog with no schemas registered.
    pub fn new() -> Self {
        Self::default()
    }
}

impl Default for MemoryCatalogProvider {
    /// Builds an empty catalog; equivalent to [`MemoryCatalogProvider::new`].
    fn default() -> Self {
        let schemas = DashMap::new();
        Self { schemas }
    }
}
impl CatalogProvider for MemoryCatalogProvider {
    /// Returns `self` as [`Any`] for downcasting.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Collects an owned copy of every schema name currently in the map.
    fn schema_names(&self) -> Vec<String> {
        self.schemas
            .iter()
            .map(|entry| entry.key().clone())
            .collect()
    }

    /// Returns a new handle to the schema stored under `name`, or `None`
    /// when the map has no such key.
    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
        self.schemas
            .get(name)
            .map(|entry| Arc::clone(entry.value()))
    }

    /// Stores `schema` under `name`.
    ///
    /// Always succeeds; the `Ok` value is whatever `DashMap::insert` reports
    /// for that key (the entry it replaced, if any).
    fn register_schema(
        &self,
        name: &str,
        schema: Arc<dyn SchemaProvider>,
    ) -> Result<Option<Arc<dyn SchemaProvider>>> {
        Ok(self.schemas.insert(name.into(), schema))
    }

    /// Removes the schema stored under `name`.
    ///
    /// Returns `Ok(Some(schema))` with the removed schema, or `Ok(None)` when
    /// no schema with that name is registered (including the case where it
    /// was removed concurrently while this call was in progress).
    ///
    /// # Errors
    ///
    /// Returns an execution error when the schema still reports tables and
    /// `cascade` is `false`.
    fn deregister_schema(
        &self,
        name: &str,
        cascade: bool,
    ) -> Result<Option<Arc<dyn SchemaProvider>>> {
        // `self.schema` clones the handle out of the map, so no map guard is
        // held while the schema's tables are inspected below.
        let schema = match self.schema(name) {
            Some(schema) => schema,
            None => return Ok(None),
        };

        let table_names = schema.table_names();
        if !cascade && !table_names.is_empty() {
            return exec_err!(
                "Cannot drop schema {} because other tables depend on it: {}",
                name,
                itertools::join(table_names.iter(), ", ")
            );
        }

        // The entry may have been removed by another caller between the
        // lookup above and this point; report that as "nothing removed"
        // rather than unwrapping and panicking.
        Ok(self.schemas.remove(name).map(|(_, removed)| removed))
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::catalog::schema::MemorySchemaProvider;
    use crate::datasource::empty::EmptyTable;
    use crate::datasource::TableProvider;
    use arrow::datatypes::Schema;

    /// A provider that does not override `register_schema` must surface the
    /// trait's default "not implemented" error.
    #[test]
    fn default_register_schema_not_supported() {
        // Minimal provider relying on the default `register_schema`; the
        // other methods are never reached by this test.
        struct TestProvider {}
        impl CatalogProvider for TestProvider {
            fn as_any(&self) -> &dyn Any {
                self
            }

            fn schema_names(&self) -> Vec<String> {
                unimplemented!()
            }

            fn schema(&self, _name: &str) -> Option<Arc<dyn SchemaProvider>> {
                unimplemented!()
            }
        }

        let schema = Arc::new(MemorySchemaProvider::new()) as _;
        let catalog = Arc::new(TestProvider {});

        match catalog.register_schema("foo", schema) {
            Ok(_) => panic!("unexpected OK"),
            Err(e) => assert_eq!(
                e.strip_backtrace(),
                "This feature is not implemented: Registering new schemas is not supported"
            ),
        };
    }

    /// A schema that still contains a table can only be dropped with
    /// `cascade = true`.
    #[test]
    fn memory_catalog_dereg_nonempty_schema() {
        let cat = Arc::new(MemoryCatalogProvider::new()) as Arc<dyn CatalogProvider>;

        let schema = Arc::new(MemorySchemaProvider::new()) as Arc<dyn SchemaProvider>;
        let test_table = Arc::new(EmptyTable::new(Arc::new(Schema::empty())))
            as Arc<dyn TableProvider>;
        schema.register_table("t".into(), test_table).unwrap();

        cat.register_schema("foo", schema.clone()).unwrap();

        assert!(
            cat.deregister_schema("foo", false).is_err(),
            "dropping non-empty schema without cascade should error"
        );
        assert!(cat.deregister_schema("foo", true).unwrap().is_some());
    }

    /// A schema without tables can be dropped even when `cascade` is false.
    #[test]
    fn memory_catalog_dereg_empty_schema() {
        let cat = Arc::new(MemoryCatalogProvider::new()) as Arc<dyn CatalogProvider>;

        let schema = Arc::new(MemorySchemaProvider::new()) as Arc<dyn SchemaProvider>;
        cat.register_schema("foo", schema.clone()).unwrap();

        assert!(cat.deregister_schema("foo", false).unwrap().is_some());
    }

    /// Dropping a schema that was never registered succeeds with `None`.
    #[test]
    fn memory_catalog_dereg_missing() {
        let cat = Arc::new(MemoryCatalogProvider::new()) as Arc<dyn CatalogProvider>;
        assert!(cat.deregister_schema("foo", false).unwrap().is_none());
    }
}