datafusion_catalog/
memory.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
//! [`MemoryCatalogProviderList`], [`MemoryCatalogProvider`], [`MemorySchemaProvider`]:
//! In-memory implementations of [`CatalogProviderList`], [`CatalogProvider`] and
//! [`SchemaProvider`].
20
use crate::{CatalogProvider, CatalogProviderList, SchemaProvider, TableProvider};
use async_trait::async_trait;
use dashmap::mapref::entry::Entry;
use dashmap::DashMap;
use datafusion_common::{exec_err, DataFusionError};
use std::any::Any;
use std::sync::Arc;
27
28/// Simple in-memory list of catalogs
29#[derive(Debug)]
30pub struct MemoryCatalogProviderList {
31    /// Collection of catalogs containing schemas and ultimately TableProviders
32    pub catalogs: DashMap<String, Arc<dyn CatalogProvider>>,
33}
34
35impl MemoryCatalogProviderList {
36    /// Instantiates a new `MemoryCatalogProviderList` with an empty collection of catalogs
37    pub fn new() -> Self {
38        Self {
39            catalogs: DashMap::new(),
40        }
41    }
42}
43
44impl Default for MemoryCatalogProviderList {
45    fn default() -> Self {
46        Self::new()
47    }
48}
49
50impl CatalogProviderList for MemoryCatalogProviderList {
51    fn as_any(&self) -> &dyn Any {
52        self
53    }
54
55    fn register_catalog(
56        &self,
57        name: String,
58        catalog: Arc<dyn CatalogProvider>,
59    ) -> Option<Arc<dyn CatalogProvider>> {
60        self.catalogs.insert(name, catalog)
61    }
62
63    fn catalog_names(&self) -> Vec<String> {
64        self.catalogs.iter().map(|c| c.key().clone()).collect()
65    }
66
67    fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
68        self.catalogs.get(name).map(|c| Arc::clone(c.value()))
69    }
70}
71
72/// Simple in-memory implementation of a catalog.
73#[derive(Debug)]
74pub struct MemoryCatalogProvider {
75    schemas: DashMap<String, Arc<dyn SchemaProvider>>,
76}
77
78impl MemoryCatalogProvider {
79    /// Instantiates a new MemoryCatalogProvider with an empty collection of schemas.
80    pub fn new() -> Self {
81        Self {
82            schemas: DashMap::new(),
83        }
84    }
85}
86
87impl Default for MemoryCatalogProvider {
88    fn default() -> Self {
89        Self::new()
90    }
91}
92
93impl CatalogProvider for MemoryCatalogProvider {
94    fn as_any(&self) -> &dyn Any {
95        self
96    }
97
98    fn schema_names(&self) -> Vec<String> {
99        self.schemas.iter().map(|s| s.key().clone()).collect()
100    }
101
102    fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
103        self.schemas.get(name).map(|s| Arc::clone(s.value()))
104    }
105
106    fn register_schema(
107        &self,
108        name: &str,
109        schema: Arc<dyn SchemaProvider>,
110    ) -> datafusion_common::Result<Option<Arc<dyn SchemaProvider>>> {
111        Ok(self.schemas.insert(name.into(), schema))
112    }
113
114    fn deregister_schema(
115        &self,
116        name: &str,
117        cascade: bool,
118    ) -> datafusion_common::Result<Option<Arc<dyn SchemaProvider>>> {
119        if let Some(schema) = self.schema(name) {
120            let table_names = schema.table_names();
121            match (table_names.is_empty(), cascade) {
122                (true, _) | (false, true) => {
123                    let (_, removed) = self.schemas.remove(name).unwrap();
124                    Ok(Some(removed))
125                }
126                (false, false) => exec_err!(
127                    "Cannot drop schema {} because other tables depend on it: {}",
128                    name,
129                    itertools::join(table_names.iter(), ", ")
130                ),
131            }
132        } else {
133            Ok(None)
134        }
135    }
136}
137
138/// Simple in-memory implementation of a schema.
139#[derive(Debug)]
140pub struct MemorySchemaProvider {
141    tables: DashMap<String, Arc<dyn TableProvider>>,
142}
143
144impl MemorySchemaProvider {
145    /// Instantiates a new MemorySchemaProvider with an empty collection of tables.
146    pub fn new() -> Self {
147        Self {
148            tables: DashMap::new(),
149        }
150    }
151}
152
153impl Default for MemorySchemaProvider {
154    fn default() -> Self {
155        Self::new()
156    }
157}
158
159#[async_trait]
160impl SchemaProvider for MemorySchemaProvider {
161    fn as_any(&self) -> &dyn Any {
162        self
163    }
164
165    fn table_names(&self) -> Vec<String> {
166        self.tables
167            .iter()
168            .map(|table| table.key().clone())
169            .collect()
170    }
171
172    async fn table(
173        &self,
174        name: &str,
175    ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
176        Ok(self.tables.get(name).map(|table| Arc::clone(table.value())))
177    }
178
179    fn register_table(
180        &self,
181        name: String,
182        table: Arc<dyn TableProvider>,
183    ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
184        if self.table_exist(name.as_str()) {
185            return exec_err!("The table {name} already exists");
186        }
187        Ok(self.tables.insert(name, table))
188    }
189
190    fn deregister_table(
191        &self,
192        name: &str,
193    ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
194        Ok(self.tables.remove(name).map(|(_, table)| table))
195    }
196
197    fn table_exist(&self, name: &str) -> bool {
198        self.tables.contains_key(name)
199    }
200}