datafusion_catalog/memory/
catalog.rs1use crate::{CatalogProvider, CatalogProviderList, SchemaProvider};
22use dashmap::DashMap;
23use datafusion_common::exec_err;
24use std::sync::Arc;
25
26#[derive(Debug)]
28pub struct MemoryCatalogProviderList {
29 pub catalogs: DashMap<String, Arc<dyn CatalogProvider>>,
31}
32
33impl MemoryCatalogProviderList {
34 pub fn new() -> Self {
36 Self {
37 catalogs: DashMap::new(),
38 }
39 }
40}
41
42impl Default for MemoryCatalogProviderList {
43 fn default() -> Self {
44 Self::new()
45 }
46}
47
48impl CatalogProviderList for MemoryCatalogProviderList {
49 fn register_catalog(
50 &self,
51 name: String,
52 catalog: Arc<dyn CatalogProvider>,
53 ) -> Option<Arc<dyn CatalogProvider>> {
54 self.catalogs.insert(name, catalog)
55 }
56
57 fn catalog_names(&self) -> Vec<String> {
58 self.catalogs.iter().map(|c| c.key().clone()).collect()
59 }
60
61 fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
62 self.catalogs.get(name).map(|c| Arc::clone(c.value()))
63 }
64}
65
66#[derive(Debug)]
68pub struct MemoryCatalogProvider {
69 schemas: DashMap<String, Arc<dyn SchemaProvider>>,
70}
71
72impl MemoryCatalogProvider {
73 pub fn new() -> Self {
75 Self {
76 schemas: DashMap::new(),
77 }
78 }
79}
80
81impl Default for MemoryCatalogProvider {
82 fn default() -> Self {
83 Self::new()
84 }
85}
86
87impl CatalogProvider for MemoryCatalogProvider {
88 fn schema_names(&self) -> Vec<String> {
89 self.schemas.iter().map(|s| s.key().clone()).collect()
90 }
91
92 fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
93 self.schemas.get(name).map(|s| Arc::clone(s.value()))
94 }
95
96 fn register_schema(
97 &self,
98 name: &str,
99 schema: Arc<dyn SchemaProvider>,
100 ) -> datafusion_common::Result<Option<Arc<dyn SchemaProvider>>> {
101 Ok(self.schemas.insert(name.into(), schema))
102 }
103
104 fn deregister_schema(
105 &self,
106 name: &str,
107 cascade: bool,
108 ) -> datafusion_common::Result<Option<Arc<dyn SchemaProvider>>> {
109 if let Some(schema) = self.schema(name) {
110 let table_names = schema.table_names();
111 match (table_names.is_empty(), cascade) {
112 (true, _) | (false, true) => {
113 let (_, removed) = self.schemas.remove(name).unwrap();
114 Ok(Some(removed))
115 }
116 (false, false) => exec_err!(
117 "Cannot drop schema {} because other tables depend on it: {}",
118 name,
119 itertools::join(table_names.iter(), ", ")
120 ),
121 }
122 } else {
123 Ok(None)
124 }
125 }
126}