datafusion_catalog/
memory.rs1use crate::{CatalogProvider, CatalogProviderList, SchemaProvider, TableProvider};
22use async_trait::async_trait;
23use dashmap::DashMap;
24use datafusion_common::{exec_err, DataFusionError};
25use std::any::Any;
26use std::sync::Arc;
27
28#[derive(Debug)]
30pub struct MemoryCatalogProviderList {
31 pub catalogs: DashMap<String, Arc<dyn CatalogProvider>>,
33}
34
35impl MemoryCatalogProviderList {
36 pub fn new() -> Self {
38 Self {
39 catalogs: DashMap::new(),
40 }
41 }
42}
43
44impl Default for MemoryCatalogProviderList {
45 fn default() -> Self {
46 Self::new()
47 }
48}
49
50impl CatalogProviderList for MemoryCatalogProviderList {
51 fn as_any(&self) -> &dyn Any {
52 self
53 }
54
55 fn register_catalog(
56 &self,
57 name: String,
58 catalog: Arc<dyn CatalogProvider>,
59 ) -> Option<Arc<dyn CatalogProvider>> {
60 self.catalogs.insert(name, catalog)
61 }
62
63 fn catalog_names(&self) -> Vec<String> {
64 self.catalogs.iter().map(|c| c.key().clone()).collect()
65 }
66
67 fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
68 self.catalogs.get(name).map(|c| Arc::clone(c.value()))
69 }
70}
71
72#[derive(Debug)]
74pub struct MemoryCatalogProvider {
75 schemas: DashMap<String, Arc<dyn SchemaProvider>>,
76}
77
78impl MemoryCatalogProvider {
79 pub fn new() -> Self {
81 Self {
82 schemas: DashMap::new(),
83 }
84 }
85}
86
87impl Default for MemoryCatalogProvider {
88 fn default() -> Self {
89 Self::new()
90 }
91}
92
93impl CatalogProvider for MemoryCatalogProvider {
94 fn as_any(&self) -> &dyn Any {
95 self
96 }
97
98 fn schema_names(&self) -> Vec<String> {
99 self.schemas.iter().map(|s| s.key().clone()).collect()
100 }
101
102 fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
103 self.schemas.get(name).map(|s| Arc::clone(s.value()))
104 }
105
106 fn register_schema(
107 &self,
108 name: &str,
109 schema: Arc<dyn SchemaProvider>,
110 ) -> datafusion_common::Result<Option<Arc<dyn SchemaProvider>>> {
111 Ok(self.schemas.insert(name.into(), schema))
112 }
113
114 fn deregister_schema(
115 &self,
116 name: &str,
117 cascade: bool,
118 ) -> datafusion_common::Result<Option<Arc<dyn SchemaProvider>>> {
119 if let Some(schema) = self.schema(name) {
120 let table_names = schema.table_names();
121 match (table_names.is_empty(), cascade) {
122 (true, _) | (false, true) => {
123 let (_, removed) = self.schemas.remove(name).unwrap();
124 Ok(Some(removed))
125 }
126 (false, false) => exec_err!(
127 "Cannot drop schema {} because other tables depend on it: {}",
128 name,
129 itertools::join(table_names.iter(), ", ")
130 ),
131 }
132 } else {
133 Ok(None)
134 }
135 }
136}
137
138#[derive(Debug)]
140pub struct MemorySchemaProvider {
141 tables: DashMap<String, Arc<dyn TableProvider>>,
142}
143
144impl MemorySchemaProvider {
145 pub fn new() -> Self {
147 Self {
148 tables: DashMap::new(),
149 }
150 }
151}
152
153impl Default for MemorySchemaProvider {
154 fn default() -> Self {
155 Self::new()
156 }
157}
158
159#[async_trait]
160impl SchemaProvider for MemorySchemaProvider {
161 fn as_any(&self) -> &dyn Any {
162 self
163 }
164
165 fn table_names(&self) -> Vec<String> {
166 self.tables
167 .iter()
168 .map(|table| table.key().clone())
169 .collect()
170 }
171
172 async fn table(
173 &self,
174 name: &str,
175 ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
176 Ok(self.tables.get(name).map(|table| Arc::clone(table.value())))
177 }
178
179 fn register_table(
180 &self,
181 name: String,
182 table: Arc<dyn TableProvider>,
183 ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
184 if self.table_exist(name.as_str()) {
185 return exec_err!("The table {name} already exists");
186 }
187 Ok(self.tables.insert(name, table))
188 }
189
190 fn deregister_table(
191 &self,
192 name: &str,
193 ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
194 Ok(self.tables.remove(name).map(|(_, table)| table))
195 }
196
197 fn table_exist(&self, name: &str) -> bool {
198 self.tables.contains_key(name)
199 }
200}