use crate::catalog::schema::SchemaProvider;
use datafusion_common::{DataFusionError, Result};
use parking_lot::RwLock;
use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;
/// A collection of named catalogs that can be registered and looked up by
/// name.
///
/// Implementations must be `Sync + Send`.
pub trait CatalogList: Sync + Send {
/// Returns this catalog list as a `&dyn Any`.
///
/// NOTE(review): presumably intended for downcasting to the concrete
/// implementation; confirm against callers.
fn as_any(&self) -> &dyn Any;
/// Registers `catalog` under `name`.
///
/// Returns an optional catalog. For `MemoryCatalogList` in this file, that is
/// the catalog previously stored under `name`, if there was one.
fn register_catalog(
&self,
name: String,
catalog: Arc<dyn CatalogProvider>,
) -> Option<Arc<dyn CatalogProvider>>;
/// Returns the names of the catalogs held by this list.
fn catalog_names(&self) -> Vec<String>;
/// Looks up a catalog by `name`; `MemoryCatalogList` returns `None` when no
/// catalog is stored under that name.
fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>>;
}
/// A [`CatalogList`] implementation that keeps its catalogs in a `HashMap`
/// wrapped in an `RwLock`.
pub struct MemoryCatalogList {
/// Registered catalogs, keyed by the name they were registered under.
pub catalogs: RwLock<HashMap<String, Arc<dyn CatalogProvider>>>,
}
impl MemoryCatalogList {
    /// Creates a catalog list with no catalogs registered.
    pub fn new() -> Self {
        let catalogs = RwLock::new(HashMap::new());
        Self { catalogs }
    }
}
impl Default for MemoryCatalogList {
fn default() -> Self {
Self::new()
}
}
impl CatalogList for MemoryCatalogList {
    /// Exposes this list as `&dyn Any`.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Stores `catalog` under `name` while holding the write lock.
    ///
    /// Returns the catalog that was previously stored under `name`, if any.
    fn register_catalog(
        &self,
        name: String,
        catalog: Arc<dyn CatalogProvider>,
    ) -> Option<Arc<dyn CatalogProvider>> {
        self.catalogs.write().insert(name, catalog)
    }

    /// Returns an owned copy of every registered catalog name, in the map's
    /// iteration order.
    fn catalog_names(&self) -> Vec<String> {
        self.catalogs.read().keys().cloned().collect()
    }

    /// Returns a clone of the `Arc` stored under `name`, or `None` when the
    /// map has no entry for it.
    fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
        self.catalogs.read().get(name).cloned()
    }
}
impl Default for MemoryCatalogProvider {
fn default() -> Self {
Self::new()
}
}
/// A catalog: a set of named schemas that can be listed and looked up.
///
/// Implementations must be `Sync + Send`.
pub trait CatalogProvider: Sync + Send {
    /// Returns this provider as a `&dyn Any`.
    fn as_any(&self) -> &dyn Any;

    /// Returns the names of the schemas in this catalog.
    fn schema_names(&self) -> Vec<String>;

    /// Looks up a schema by `name`.
    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>>;

    /// Adds a schema to this catalog.
    ///
    /// # Errors
    ///
    /// The default implementation does not register anything and always
    /// returns `DataFusionError::NotImplemented`; implementors that support
    /// registration override this method.
    fn register_schema(
        &self,
        name: &str,
        schema: Arc<dyn SchemaProvider>,
    ) -> Result<Option<Arc<dyn SchemaProvider>>> {
        // Touch both parameters so the default body does not trigger
        // unused-variable warnings.
        let _ = name;
        let _ = schema;

        let message = String::from("Registering new schemas is not supported");
        Err(DataFusionError::NotImplemented(message))
    }
}
/// A [`CatalogProvider`] implementation that keeps its schemas in a `HashMap`
/// wrapped in an `RwLock`.
pub struct MemoryCatalogProvider {
/// Registered schemas, keyed by the name they were registered under.
schemas: RwLock<HashMap<String, Arc<dyn SchemaProvider>>>,
}
impl MemoryCatalogProvider {
    /// Creates a catalog provider with no schemas registered.
    pub fn new() -> Self {
        let schemas = RwLock::new(HashMap::new());
        Self { schemas }
    }
}
impl CatalogProvider for MemoryCatalogProvider {
    /// Exposes this provider as `&dyn Any`.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Returns an owned copy of every registered schema name, in the map's
    /// iteration order.
    fn schema_names(&self) -> Vec<String> {
        self.schemas.read().keys().cloned().collect()
    }

    /// Returns a clone of the `Arc` stored under `name`, or `None` when the
    /// map has no entry for it.
    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
        self.schemas.read().get(name).cloned()
    }

    /// Stores `schema` under `name` while holding the write lock.
    ///
    /// Always succeeds; the `Ok` value is the schema previously stored under
    /// `name`, if any.
    fn register_schema(
        &self,
        name: &str,
        schema: Arc<dyn SchemaProvider>,
    ) -> Result<Option<Arc<dyn SchemaProvider>>> {
        let previous = self.schemas.write().insert(name.to_owned(), schema);
        Ok(previous)
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::catalog::schema::MemorySchemaProvider;

    /// A provider that does not override `register_schema` must get the
    /// trait's default behaviour: a "not implemented" error.
    #[test]
    fn default_register_schema_not_supported() {
        // Minimal provider: only the required methods, none of which the
        // test ever calls.
        struct TestProvider {}

        impl CatalogProvider for TestProvider {
            fn as_any(&self) -> &dyn Any {
                self
            }

            fn schema_names(&self) -> Vec<String> {
                unimplemented!()
            }

            fn schema(&self, _name: &str) -> Option<Arc<dyn SchemaProvider>> {
                unimplemented!()
            }
        }

        let schema = Arc::new(MemorySchemaProvider::new()) as _;
        let catalog = Arc::new(TestProvider {});

        let outcome = catalog.register_schema("foo", schema);
        match outcome {
            Err(e) => assert_eq!(
                e.to_string(),
                "This feature is not implemented: Registering new schemas is not supported"
            ),
            Ok(_) => panic!("unexpected OK"),
        }
    }
}