Skip to main content

citadel_sql/
schema.rs

1//! Schema manager: in-memory cache of table schemas.
2
3use std::sync::Arc;
4
5use rustc_hash::FxHashMap;
6
7use citadel::Database;
8
9use crate::error::{Result, SqlError};
10use crate::system_tables::{self, VirtualTable};
11use crate::types::{ForeignKeySchemaEntry, TableSchema, ViewDef};
12
/// Name of the storage table that holds serialized table schemas.
const SCHEMA_TABLE: &[u8] = b"_schema";
/// Name of the storage table that holds serialized view definitions.
const VIEWS_TABLE: &[u8] = b"_views";
15
16/// Manages table schemas in memory, backed by the `_schema` table.
17pub struct SchemaManager {
18    tables: FxHashMap<String, TableSchema>,
19    views: FxHashMap<String, ViewDef>,
20    virtual_tables: FxHashMap<String, Arc<dyn VirtualTable>>,
21    generation: u64,
22}
23
24#[derive(Clone)]
25pub struct SchemaSnapshot {
26    tables: FxHashMap<String, TableSchema>,
27    views: FxHashMap<String, ViewDef>,
28    generation: u64,
29}
30
31impl SchemaManager {
32    /// Load all schemas from the database's `_schema` table.
33    pub fn load(db: &Database) -> Result<Self> {
34        let mut tables = FxHashMap::default();
35
36        let mut rtx = db.begin_read();
37        let mut parse_err: Option<crate::error::SqlError> = None;
38        let scan_result = rtx.table_for_each(SCHEMA_TABLE, |_key, value| {
39            match TableSchema::deserialize(value) {
40                Ok(schema) => {
41                    tables.insert(schema.name.clone(), schema);
42                }
43                Err(e) => {
44                    parse_err = Some(e);
45                }
46            }
47            Ok(())
48        });
49
50        match scan_result {
51            Ok(()) => {}
52            Err(citadel_core::Error::TableNotFound(_)) => {}
53            Err(e) => return Err(e.into()),
54        }
55        if let Some(e) = parse_err {
56            return Err(e);
57        }
58
59        let mut views = FxHashMap::default();
60        let mut rtx2 = db.begin_read();
61        let mut view_err: Option<crate::error::SqlError> = None;
62        let view_scan = rtx2.table_for_each(VIEWS_TABLE, |_key, value| {
63            match ViewDef::deserialize(value) {
64                Ok(vd) => {
65                    views.insert(vd.name.clone(), vd);
66                }
67                Err(e) => {
68                    view_err = Some(e);
69                }
70            }
71            Ok(())
72        });
73
74        match view_scan {
75            Ok(()) => {}
76            Err(citadel_core::Error::TableNotFound(_)) => {}
77            Err(e) => return Err(e.into()),
78        }
79        if let Some(e) = view_err {
80            return Err(e);
81        }
82
83        let mut mgr = Self {
84            tables,
85            views,
86            virtual_tables: FxHashMap::default(),
87            generation: 0,
88        };
89        system_tables::register_builtins(&mut mgr);
90        Ok(mgr)
91    }
92
93    pub fn get_virtual(&self, name: &str) -> Option<&Arc<dyn VirtualTable>> {
94        self.virtual_tables.get(name)
95    }
96
97    pub fn register_virtual(&mut self, vt: Arc<dyn VirtualTable>) {
98        let name = vt.name().to_ascii_lowercase();
99        self.virtual_tables.insert(name, vt);
100    }
101
102    pub fn get(&self, name: &str) -> Option<&TableSchema> {
103        if let Some(s) = self.tables.get(name) {
104            return Some(s);
105        }
106        if name.bytes().any(|b| b.is_ascii_uppercase()) {
107            self.tables.get(&name.to_ascii_lowercase())
108        } else {
109            None
110        }
111    }
112
113    pub fn contains(&self, name: &str) -> bool {
114        if self.tables.contains_key(name) {
115            return true;
116        }
117        if name.bytes().any(|b| b.is_ascii_uppercase()) {
118            self.tables.contains_key(&name.to_ascii_lowercase())
119        } else {
120            false
121        }
122    }
123
124    pub fn generation(&self) -> u64 {
125        self.generation
126    }
127
128    pub fn register(&mut self, schema: TableSchema) {
129        let lower = schema.name.to_ascii_lowercase();
130        self.tables.insert(lower, schema);
131        self.generation += 1;
132    }
133
134    pub fn remove(&mut self, name: &str) -> Option<TableSchema> {
135        let lower = name.to_ascii_lowercase();
136        let result = self.tables.remove(&lower);
137        if result.is_some() {
138            self.generation += 1;
139        }
140        result
141    }
142
143    pub fn table_names(&self) -> Vec<&str> {
144        self.tables.keys().map(|s| s.as_str()).collect()
145    }
146
147    /// Returns all table schemas.
148    pub fn all_schemas(&self) -> impl Iterator<Item = &TableSchema> {
149        self.tables.values()
150    }
151
152    pub fn get_view(&self, name: &str) -> Option<&ViewDef> {
153        if let Some(v) = self.views.get(name) {
154            return Some(v);
155        }
156        if name.bytes().any(|b| b.is_ascii_uppercase()) {
157            self.views.get(&name.to_ascii_lowercase())
158        } else {
159            None
160        }
161    }
162
163    pub fn register_view(&mut self, view: ViewDef) {
164        let lower = view.name.to_ascii_lowercase();
165        self.views.insert(lower, view);
166        self.generation += 1;
167    }
168
169    pub fn remove_view(&mut self, name: &str) -> Option<ViewDef> {
170        let lower = name.to_ascii_lowercase();
171        let result = self.views.remove(&lower);
172        if result.is_some() {
173            self.generation += 1;
174        }
175        result
176    }
177
178    pub fn view_names(&self) -> Vec<&str> {
179        self.views.keys().map(|s| s.as_str()).collect()
180    }
181
182    pub fn save_view(wtx: &mut citadel_txn::write_txn::WriteTxn<'_>, view: &ViewDef) -> Result<()> {
183        let lower = view.name.to_ascii_lowercase();
184        let data = view.serialize();
185        wtx.table_insert(VIEWS_TABLE, lower.as_bytes(), &data)?;
186        Ok(())
187    }
188
189    pub fn delete_view(wtx: &mut citadel_txn::write_txn::WriteTxn<'_>, name: &str) -> Result<()> {
190        let lower = name.to_ascii_lowercase();
191        wtx.table_delete(VIEWS_TABLE, lower.as_bytes())
192            .map_err(|e| match e {
193                citadel_core::Error::TableNotFound(_) => SqlError::ViewNotFound(name.into()),
194                other => SqlError::Storage(other),
195            })?;
196        Ok(())
197    }
198
199    pub fn ensure_views_table(wtx: &mut citadel_txn::write_txn::WriteTxn<'_>) -> Result<()> {
200        match wtx.create_table(VIEWS_TABLE) {
201            Ok(()) => Ok(()),
202            Err(citadel_core::Error::TableAlreadyExists(_)) => Ok(()),
203            Err(e) => Err(e.into()),
204        }
205    }
206
207    /// Find all FKs in other tables that reference `parent` table.
208    pub fn child_fks_for(&self, parent: &str) -> Vec<(&str, &ForeignKeySchemaEntry)> {
209        self.tables
210            .iter()
211            .flat_map(|(name, schema)| {
212                schema
213                    .foreign_keys
214                    .iter()
215                    .filter(|fk| fk.foreign_table == parent)
216                    .map(move |fk| (name.as_str(), fk))
217            })
218            .collect()
219    }
220
221    /// Persist a schema to the _schema table (called within a write txn).
222    pub fn save_schema(
223        wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
224        schema: &TableSchema,
225    ) -> Result<()> {
226        let lower = schema.name.to_ascii_lowercase();
227        let data = schema.serialize();
228        wtx.table_insert(SCHEMA_TABLE, lower.as_bytes(), &data)?;
229        Ok(())
230    }
231
232    /// Remove a schema from the _schema table (called within a write txn).
233    pub fn delete_schema(wtx: &mut citadel_txn::write_txn::WriteTxn<'_>, name: &str) -> Result<()> {
234        let lower = name.to_ascii_lowercase();
235        wtx.table_delete(SCHEMA_TABLE, lower.as_bytes())
236            .map_err(|e| match e {
237                citadel_core::Error::TableNotFound(_) => SqlError::TableNotFound(name.into()),
238                other => SqlError::Storage(other),
239            })?;
240        Ok(())
241    }
242
243    /// Ensure the _schema table exists (called once per write).
244    pub fn ensure_schema_table(wtx: &mut citadel_txn::write_txn::WriteTxn<'_>) -> Result<()> {
245        match wtx.create_table(SCHEMA_TABLE) {
246            Ok(()) => Ok(()),
247            Err(citadel_core::Error::TableAlreadyExists(_)) => Ok(()),
248            Err(e) => Err(e.into()),
249        }
250    }
251
252    pub fn save_snapshot(&self) -> SchemaSnapshot {
253        SchemaSnapshot {
254            tables: self.tables.clone(),
255            views: self.views.clone(),
256            generation: self.generation,
257        }
258    }
259
260    pub fn restore_snapshot(&mut self, snap: SchemaSnapshot) {
261        self.tables = snap.tables;
262        self.views = snap.views;
263        self.generation = snap.generation;
264    }
265}