datafusion_catalog/memory/
schema.rs1use crate::{SchemaProvider, TableProvider};
21use async_trait::async_trait;
22use dashmap::DashMap;
23use datafusion_common::{DataFusionError, exec_err};
24use std::sync::Arc;
25
26#[derive(Debug)]
28pub struct MemorySchemaProvider {
29 tables: DashMap<String, Arc<dyn TableProvider>>,
30}
31
32impl MemorySchemaProvider {
33 pub fn new() -> Self {
35 Self {
36 tables: DashMap::new(),
37 }
38 }
39}
40
41impl Default for MemorySchemaProvider {
42 fn default() -> Self {
43 Self::new()
44 }
45}
46
47#[async_trait]
48impl SchemaProvider for MemorySchemaProvider {
49 fn table_names(&self) -> Vec<String> {
50 self.tables
51 .iter()
52 .map(|table| table.key().clone())
53 .collect()
54 }
55
56 async fn table(
57 &self,
58 name: &str,
59 ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>, DataFusionError> {
60 Ok(self.tables.get(name).map(|table| Arc::clone(table.value())))
61 }
62
63 fn register_table(
64 &self,
65 name: String,
66 table: Arc<dyn TableProvider>,
67 ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
68 if self.table_exist(name.as_str()) {
69 return exec_err!("The table {name} already exists");
70 }
71 Ok(self.tables.insert(name, table))
72 }
73
74 fn deregister_table(
75 &self,
76 name: &str,
77 ) -> datafusion_common::Result<Option<Arc<dyn TableProvider>>> {
78 Ok(self.tables.remove(name).map(|(_, table)| table))
79 }
80
81 fn table_exist(&self, name: &str) -> bool {
82 self.tables.contains_key(name)
83 }
84}