Skip to main content

citadel_sql/
schema.rs

1//! Schema manager: in-memory cache of table schemas.
2
3use std::sync::Arc;
4
5use rustc_hash::FxHashMap;
6
7use citadel::Database;
8
9use crate::error::{Result, SqlError};
10use crate::system_tables::{self, VirtualTable};
11use crate::types::{ForeignKeySchemaEntry, TableSchema, ViewDef};
12
/// Storage table holding serialized `TableSchema` rows (scanned by `SchemaManager::load`,
/// written by `SchemaManager::save_schema` under the lowercased table name).
const SCHEMA_TABLE: &[u8] = b"_schema";
/// Storage table holding serialized `ViewDef` rows, keyed by lowercased view name.
const VIEWS_TABLE: &[u8] = b"_views";
/// Storage table holding serialized `TriggerDef` rows, keyed by lowercased trigger name.
const TRIGGERS_TABLE: &[u8] = b"_triggers";
/// Storage table holding serialized `MatviewDef` rows, keyed by lowercased matview name.
const MATVIEWS_TABLE: &[u8] = b"_matviews";
17
thread_local! {
    /// Per-thread stack of `(alias → storage_name)` frames pushed by FOR EACH STATEMENT
    /// trigger firings so `REFERENCING NEW TABLE AS new_t` resolves while the body runs.
    ///
    /// Frames are appended by `push_transition_tables` and the top frame is popped when a
    /// `TransitionGuard` is dropped; lookups scan from the most recently pushed frame
    /// towards the oldest, so inner firings shadow outer ones.
    static TRANSITION_TABLES: std::cell::RefCell<Vec<FxHashMap<String, String>>> =
        const { std::cell::RefCell::new(Vec::new()) };
}
24
25fn transition_table_lookup(name_lower: &str) -> Option<String> {
26    TRANSITION_TABLES.with(|cell| {
27        let stack = cell.borrow();
28        for frame in stack.iter().rev() {
29            if let Some(storage) = frame.get(name_lower) {
30                return Some(storage.clone());
31            }
32        }
33        None
34    })
35}
36
37pub(crate) fn push_transition_tables(aliases: FxHashMap<String, String>) -> TransitionGuard {
38    TRANSITION_TABLES.with(|cell| cell.borrow_mut().push(aliases));
39    TransitionGuard
40}
41
42pub(crate) struct TransitionGuard;
43impl Drop for TransitionGuard {
44    fn drop(&mut self) {
45        TRANSITION_TABLES.with(|cell| {
46            cell.borrow_mut().pop();
47        });
48    }
49}
50
/// Manages table schemas in memory, backed by the `_schema` table.
///
/// Also caches views, triggers, materialized views, virtual tables and TEMP-table
/// aliases. Most mutating methods bump `generation`.
pub struct SchemaManager {
    /// Table schemas; `register` keys entries by the lowercased table name.
    tables: FxHashMap<String, TableSchema>,
    /// View definitions; `register_view` keys entries by the lowercased view name.
    views: FxHashMap<String, ViewDef>,
    /// Virtual tables; `register_virtual` keys entries by the lowercased `VirtualTable::name()`.
    virtual_tables: FxHashMap<String, Arc<dyn VirtualTable>>,
    /// Triggers bucketed by lowercased target name, each bucket kept sorted by trigger name.
    /// Within a `(target, timing, event)` group, triggers fire in name order.
    triggers: FxHashMap<String, Vec<crate::types::TriggerDef>>,
    /// Matview catalog. Backing table shares the matview's name in `tables`; this map
    /// also gates DML rejection (matviews are read-only outside REFRESH).
    matviews: FxHashMap<String, crate::types::MatviewDef>,
    /// Maps user-typed TEMP name to prefixed storage name (`__temp_<conn_id>_<name>`).
    temp_aliases: FxHashMap<String, String>,
    /// Each entry is leaked once via `Box::leak` so `get()` can hand out a `&TableSchema`
    /// from inside `&self` methods. Bounded by `(active triggers × transition aliases)`.
    ///
    /// NOTE(review): `unregister_transition_schema` only removes the map entry; the leaked
    /// allocation itself is never freed, so leaked memory grows with every registration.
    /// Confirm that this is acceptable for long-lived connections.
    transition_schemas: std::cell::RefCell<FxHashMap<String, &'static TableSchema>>,
    /// Change counter incremented by most mutating methods; read through `generation()`.
    generation: u64,
}
68
/// Copy of the restorable part of a `SchemaManager`, produced by
/// `SchemaManager::save_snapshot` and consumed by `SchemaManager::restore_snapshot`.
///
/// NOTE(review): only tables, views and the generation counter are captured; triggers,
/// matviews and temp aliases are not part of the snapshot — confirm this is intended.
#[derive(Clone)]
pub struct SchemaSnapshot {
    /// Clone of the manager's table map at snapshot time.
    tables: FxHashMap<String, TableSchema>,
    /// Clone of the manager's view map at snapshot time.
    views: FxHashMap<String, ViewDef>,
    /// Generation counter value at snapshot time.
    generation: u64,
}
75
76impl SchemaManager {
77    pub fn empty() -> Self {
78        Self {
79            tables: FxHashMap::default(),
80            views: FxHashMap::default(),
81            virtual_tables: FxHashMap::default(),
82            triggers: FxHashMap::default(),
83            matviews: FxHashMap::default(),
84            temp_aliases: FxHashMap::default(),
85            transition_schemas: std::cell::RefCell::new(FxHashMap::default()),
86            generation: 0,
87        }
88    }
89
90    pub fn register_temp_alias(&mut self, user_name: &str, prefixed_name: String) {
91        self.temp_aliases
92            .insert(user_name.to_ascii_lowercase(), prefixed_name);
93        self.generation += 1;
94    }
95
96    pub fn unregister_temp_alias(&mut self, user_name: &str) -> Option<String> {
97        let lower = user_name.to_ascii_lowercase();
98        let removed = self.temp_aliases.remove(&lower);
99        if removed.is_some() {
100            self.generation += 1;
101        }
102        removed
103    }
104
105    pub fn temp_alias_iter(&self) -> impl Iterator<Item = (&str, &str)> + '_ {
106        self.temp_aliases
107            .iter()
108            .map(|(k, v)| (k.as_str(), v.as_str()))
109    }
110
111    pub fn resolve_temp(&self, name: &str) -> String {
112        let lower = name.to_ascii_lowercase();
113        if let Some(prefixed) = self.temp_aliases.get(&lower) {
114            return prefixed.clone();
115        }
116        name.to_string()
117    }
118
119    pub fn load(db: &Database) -> Result<Self> {
120        let mut tables = FxHashMap::default();
121
122        let mut rtx = db.begin_read();
123        let mut parse_err: Option<crate::error::SqlError> = None;
124        let scan_result = rtx.table_for_each(SCHEMA_TABLE, |_key, value| {
125            match TableSchema::deserialize(value) {
126                Ok(schema) => {
127                    tables.insert(schema.name.clone(), schema);
128                }
129                Err(e) => {
130                    parse_err = Some(e);
131                }
132            }
133            Ok(())
134        });
135
136        match scan_result {
137            Ok(()) => {}
138            Err(citadel_core::Error::TableNotFound(_)) => {}
139            Err(e) => return Err(e.into()),
140        }
141        if let Some(e) = parse_err {
142            return Err(e);
143        }
144
145        let mut views = FxHashMap::default();
146        let mut rtx2 = db.begin_read();
147        let mut view_err: Option<crate::error::SqlError> = None;
148        let view_scan = rtx2.table_for_each(VIEWS_TABLE, |_key, value| {
149            match ViewDef::deserialize(value) {
150                Ok(vd) => {
151                    views.insert(vd.name.clone(), vd);
152                }
153                Err(e) => {
154                    view_err = Some(e);
155                }
156            }
157            Ok(())
158        });
159
160        match view_scan {
161            Ok(()) => {}
162            Err(citadel_core::Error::TableNotFound(_)) => {}
163            Err(e) => return Err(e.into()),
164        }
165        if let Some(e) = view_err {
166            return Err(e);
167        }
168
169        let mut triggers: FxHashMap<String, Vec<crate::types::TriggerDef>> = FxHashMap::default();
170        let mut rtx3 = db.begin_read();
171        let mut trig_err: Option<crate::error::SqlError> = None;
172        let trig_scan = rtx3.table_for_each(TRIGGERS_TABLE, |_key, value| {
173            match crate::types::TriggerDef::deserialize(value) {
174                Ok(td) => {
175                    triggers
176                        .entry(td.target.to_ascii_lowercase())
177                        .or_default()
178                        .push(td);
179                }
180                Err(e) => {
181                    trig_err = Some(e);
182                }
183            }
184            Ok(())
185        });
186        match trig_scan {
187            Ok(()) => {}
188            Err(citadel_core::Error::TableNotFound(_)) => {}
189            Err(e) => return Err(e.into()),
190        }
191        if let Some(e) = trig_err {
192            return Err(e);
193        }
194        // PG-faithful: triggers fire in name order within a (target, timing, event) group.
195        for v in triggers.values_mut() {
196            v.sort_by(|a, b| a.name.cmp(&b.name));
197        }
198
199        let mut matviews: FxHashMap<String, crate::types::MatviewDef> = FxHashMap::default();
200        let mut rtx4 = db.begin_read();
201        let mut mv_err: Option<crate::error::SqlError> = None;
202        let mv_scan = rtx4.table_for_each(MATVIEWS_TABLE, |_key, value| {
203            match crate::types::MatviewDef::deserialize(value) {
204                Ok(mv) => {
205                    matviews.insert(mv.name.to_ascii_lowercase(), mv);
206                }
207                Err(e) => {
208                    mv_err = Some(e);
209                }
210            }
211            Ok(())
212        });
213        match mv_scan {
214            Ok(()) => {}
215            Err(citadel_core::Error::TableNotFound(_)) => {}
216            Err(e) => return Err(e.into()),
217        }
218        if let Some(e) = mv_err {
219            return Err(e);
220        }
221
222        let mut mgr = Self {
223            tables,
224            views,
225            virtual_tables: FxHashMap::default(),
226            triggers,
227            matviews,
228            temp_aliases: FxHashMap::default(),
229            transition_schemas: std::cell::RefCell::new(FxHashMap::default()),
230            generation: 0,
231        };
232        system_tables::register_builtins(&mut mgr);
233        Ok(mgr)
234    }
235
236    pub fn get_virtual(&self, name: &str) -> Option<&Arc<dyn VirtualTable>> {
237        self.virtual_tables.get(name)
238    }
239
240    pub fn register_virtual(&mut self, vt: Arc<dyn VirtualTable>) {
241        let name = vt.name().to_ascii_lowercase();
242        self.virtual_tables.insert(name, vt);
243    }
244
245    pub fn get(&self, name: &str) -> Option<&TableSchema> {
246        let lower = name.to_ascii_lowercase();
247        if let Some(prefixed) = transition_table_lookup(&lower) {
248            if let Some(s) = self.tables.get(&prefixed) {
249                return Some(s);
250            }
251            if let Some(&leaked) = self.transition_schemas.borrow().get(&prefixed) {
252                return Some(leaked);
253            }
254        }
255        if let Some(mv) = self.matviews.get(&lower) {
256            return self.tables.get(&mv.backing_table);
257        }
258        if let Some(prefixed) = self.temp_aliases.get(&lower) {
259            return self.tables.get(prefixed);
260        }
261        if let Some(s) = self.tables.get(name) {
262            return Some(s);
263        }
264        if name.bytes().any(|b| b.is_ascii_uppercase()) {
265            self.tables.get(&lower)
266        } else {
267            None
268        }
269    }
270
271    pub fn register_transition_schema(&self, storage_name: String, schema: TableSchema) {
272        let leaked: &'static TableSchema = Box::leak(Box::new(schema));
273        self.transition_schemas
274            .borrow_mut()
275            .insert(storage_name, leaked);
276    }
277
278    pub fn unregister_transition_schema(&self, storage_name: &str) {
279        self.transition_schemas.borrow_mut().remove(storage_name);
280    }
281
282    pub fn contains(&self, name: &str) -> bool {
283        let lower = name.to_ascii_lowercase();
284        if transition_table_lookup(&lower).is_some() {
285            return true;
286        }
287        if self.matviews.contains_key(&lower) {
288            return true;
289        }
290        if self.temp_aliases.contains_key(&lower) {
291            return true;
292        }
293        if self.tables.contains_key(name) {
294            return true;
295        }
296        if name.bytes().any(|b| b.is_ascii_uppercase()) {
297            self.tables.contains_key(&lower)
298        } else {
299            false
300        }
301    }
302
303    pub fn generation(&self) -> u64 {
304        self.generation
305    }
306
307    pub fn register(&mut self, schema: TableSchema) {
308        let lower = schema.name.to_ascii_lowercase();
309        self.tables.insert(lower, schema);
310        self.generation += 1;
311    }
312
313    pub fn remove(&mut self, name: &str) -> Option<TableSchema> {
314        let lower = name.to_ascii_lowercase();
315        let result = self.tables.remove(&lower);
316        if result.is_some() {
317            self.generation += 1;
318        }
319        result
320    }
321
322    pub fn table_names(&self) -> Vec<&str> {
323        self.tables.keys().map(|s| s.as_str()).collect()
324    }
325
326    pub fn all_schemas(&self) -> impl Iterator<Item = &TableSchema> {
327        self.tables.values()
328    }
329
330    pub fn get_view(&self, name: &str) -> Option<&ViewDef> {
331        if let Some(v) = self.views.get(name) {
332            return Some(v);
333        }
334        if name.bytes().any(|b| b.is_ascii_uppercase()) {
335            self.views.get(&name.to_ascii_lowercase())
336        } else {
337            None
338        }
339    }
340
341    pub fn register_view(&mut self, view: ViewDef) {
342        let lower = view.name.to_ascii_lowercase();
343        self.views.insert(lower, view);
344        self.generation += 1;
345    }
346
347    pub fn remove_view(&mut self, name: &str) -> Option<ViewDef> {
348        let lower = name.to_ascii_lowercase();
349        let result = self.views.remove(&lower);
350        if result.is_some() {
351            self.generation += 1;
352        }
353        result
354    }
355
356    pub fn view_names(&self) -> Vec<&str> {
357        self.views.keys().map(|s| s.as_str()).collect()
358    }
359
360    pub fn triggers_for(&self, target: &str) -> &[crate::types::TriggerDef] {
361        let key = target.to_ascii_lowercase();
362        self.triggers.get(&key).map(|v| v.as_slice()).unwrap_or(&[])
363    }
364
365    pub fn all_triggers(&self) -> impl Iterator<Item = &crate::types::TriggerDef> + '_ {
366        self.triggers.values().flatten()
367    }
368
369    pub fn register_trigger(&mut self, trig: crate::types::TriggerDef) {
370        let target = trig.target.to_ascii_lowercase();
371        let bucket = self.triggers.entry(target).or_default();
372        bucket.push(trig);
373        bucket.sort_by(|a, b| a.name.cmp(&b.name));
374        self.generation += 1;
375    }
376
377    pub fn remove_trigger(&mut self, name: &str) -> Option<crate::types::TriggerDef> {
378        let lower = name.to_ascii_lowercase();
379        let mut result = None;
380        for bucket in self.triggers.values_mut() {
381            if let Some(pos) = bucket
382                .iter()
383                .position(|t| t.name.eq_ignore_ascii_case(&lower))
384            {
385                result = Some(bucket.remove(pos));
386                break;
387            }
388        }
389        if result.is_some() {
390            self.generation += 1;
391        }
392        result
393    }
394
395    /// Caller is responsible for dropping the returned triggers' on-disk catalog rows.
396    pub fn remove_triggers_for(&mut self, target: &str) -> Vec<crate::types::TriggerDef> {
397        let key = target.to_ascii_lowercase();
398        let removed = self.triggers.remove(&key).unwrap_or_default();
399        if !removed.is_empty() {
400            self.generation += 1;
401        }
402        removed
403    }
404
405    pub fn find_trigger(&self, name: &str) -> Option<(&str, &crate::types::TriggerDef)> {
406        let lower = name.to_ascii_lowercase();
407        for (target, bucket) in &self.triggers {
408            if let Some(t) = bucket.iter().find(|t| t.name.eq_ignore_ascii_case(&lower)) {
409                return Some((target.as_str(), t));
410            }
411        }
412        None
413    }
414
415    pub fn set_trigger_enabled(&mut self, name: &str, enabled: bool) -> bool {
416        let lower = name.to_ascii_lowercase();
417        for bucket in self.triggers.values_mut() {
418            if let Some(t) = bucket
419                .iter_mut()
420                .find(|t| t.name.eq_ignore_ascii_case(&lower))
421            {
422                t.enabled = enabled;
423                self.generation += 1;
424                return true;
425            }
426        }
427        false
428    }
429
430    pub fn set_all_triggers_enabled(&mut self, target: &str, enabled: bool) -> usize {
431        let key = target.to_ascii_lowercase();
432        let bucket = match self.triggers.get_mut(&key) {
433            Some(b) => b,
434            None => return 0,
435        };
436        let count = bucket.len();
437        for t in bucket {
438            t.enabled = enabled;
439        }
440        if count > 0 {
441            self.generation += 1;
442        }
443        count
444    }
445
446    pub fn ensure_triggers_table(wtx: &mut citadel_txn::write_txn::WriteTxn<'_>) -> Result<()> {
447        match wtx.create_table(TRIGGERS_TABLE) {
448            Ok(()) => Ok(()),
449            Err(citadel_core::Error::TableAlreadyExists(_)) => Ok(()),
450            Err(e) => Err(e.into()),
451        }
452    }
453
454    pub fn save_trigger(
455        wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
456        trig: &crate::types::TriggerDef,
457    ) -> Result<()> {
458        Self::ensure_triggers_table(wtx)?;
459        let data = trig.serialize();
460        let lower = trig.name.to_ascii_lowercase();
461        wtx.table_insert(TRIGGERS_TABLE, lower.as_bytes(), &data)
462            .map_err(crate::error::SqlError::from)?;
463        Ok(())
464    }
465
466    pub fn delete_trigger(
467        wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
468        name: &str,
469    ) -> Result<()> {
470        Self::ensure_triggers_table(wtx)?;
471        let lower = name.to_ascii_lowercase();
472        wtx.table_delete(TRIGGERS_TABLE, lower.as_bytes())
473            .map_err(crate::error::SqlError::from)?;
474        Ok(())
475    }
476
477    pub fn save_view(wtx: &mut citadel_txn::write_txn::WriteTxn<'_>, view: &ViewDef) -> Result<()> {
478        let lower = view.name.to_ascii_lowercase();
479        let data = view.serialize();
480        wtx.table_insert(VIEWS_TABLE, lower.as_bytes(), &data)?;
481        Ok(())
482    }
483
484    pub fn delete_view(wtx: &mut citadel_txn::write_txn::WriteTxn<'_>, name: &str) -> Result<()> {
485        let lower = name.to_ascii_lowercase();
486        wtx.table_delete(VIEWS_TABLE, lower.as_bytes())
487            .map_err(|e| match e {
488                citadel_core::Error::TableNotFound(_) => SqlError::ViewNotFound(name.into()),
489                other => SqlError::Storage(other),
490            })?;
491        Ok(())
492    }
493
494    pub fn ensure_views_table(wtx: &mut citadel_txn::write_txn::WriteTxn<'_>) -> Result<()> {
495        match wtx.create_table(VIEWS_TABLE) {
496            Ok(()) => Ok(()),
497            Err(citadel_core::Error::TableAlreadyExists(_)) => Ok(()),
498            Err(e) => Err(e.into()),
499        }
500    }
501
502    pub fn get_matview(&self, name: &str) -> Option<&crate::types::MatviewDef> {
503        let lower = name.to_ascii_lowercase();
504        self.matviews.get(&lower)
505    }
506
507    pub fn matview_names(&self) -> Vec<&str> {
508        self.matviews.keys().map(|s| s.as_str()).collect()
509    }
510
511    pub fn all_matviews(&self) -> impl Iterator<Item = &crate::types::MatviewDef> + '_ {
512        self.matviews.values()
513    }
514
515    pub fn register_matview(&mut self, mv: crate::types::MatviewDef) {
516        let lower = mv.name.to_ascii_lowercase();
517        self.matviews.insert(lower, mv);
518        self.generation += 1;
519    }
520
521    pub fn remove_matview(&mut self, name: &str) -> Option<crate::types::MatviewDef> {
522        let lower = name.to_ascii_lowercase();
523        let removed = self.matviews.remove(&lower);
524        if removed.is_some() {
525            self.generation += 1;
526        }
527        removed
528    }
529
530    pub fn ensure_matviews_table(wtx: &mut citadel_txn::write_txn::WriteTxn<'_>) -> Result<()> {
531        match wtx.create_table(MATVIEWS_TABLE) {
532            Ok(()) => Ok(()),
533            Err(citadel_core::Error::TableAlreadyExists(_)) => Ok(()),
534            Err(e) => Err(e.into()),
535        }
536    }
537
538    pub fn save_matview(
539        wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
540        mv: &crate::types::MatviewDef,
541    ) -> Result<()> {
542        Self::ensure_matviews_table(wtx)?;
543        let lower = mv.name.to_ascii_lowercase();
544        let data = mv.serialize();
545        wtx.table_insert(MATVIEWS_TABLE, lower.as_bytes(), &data)?;
546        Ok(())
547    }
548
549    pub fn delete_matview(
550        wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
551        name: &str,
552    ) -> Result<()> {
553        Self::ensure_matviews_table(wtx)?;
554        let lower = name.to_ascii_lowercase();
555        wtx.table_delete(MATVIEWS_TABLE, lower.as_bytes())
556            .map_err(crate::error::SqlError::from)?;
557        Ok(())
558    }
559
560    pub fn child_fks_for(&self, parent: &str) -> Vec<(&str, &ForeignKeySchemaEntry)> {
561        self.tables
562            .iter()
563            .flat_map(|(name, schema)| {
564                schema
565                    .foreign_keys
566                    .iter()
567                    .filter(|fk| fk.foreign_table == parent)
568                    .map(move |fk| (name.as_str(), fk))
569            })
570            .collect()
571    }
572
573    pub fn save_schema(
574        wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
575        schema: &TableSchema,
576    ) -> Result<()> {
577        let lower = schema.name.to_ascii_lowercase();
578        let data = schema.serialize();
579        wtx.table_insert(SCHEMA_TABLE, lower.as_bytes(), &data)?;
580        Ok(())
581    }
582
583    pub fn delete_schema(wtx: &mut citadel_txn::write_txn::WriteTxn<'_>, name: &str) -> Result<()> {
584        let lower = name.to_ascii_lowercase();
585        wtx.table_delete(SCHEMA_TABLE, lower.as_bytes())
586            .map_err(|e| match e {
587                citadel_core::Error::TableNotFound(_) => SqlError::TableNotFound(name.into()),
588                other => SqlError::Storage(other),
589            })?;
590        Ok(())
591    }
592
593    pub fn ensure_schema_table(wtx: &mut citadel_txn::write_txn::WriteTxn<'_>) -> Result<()> {
594        match wtx.create_table(SCHEMA_TABLE) {
595            Ok(()) => Ok(()),
596            Err(citadel_core::Error::TableAlreadyExists(_)) => Ok(()),
597            Err(e) => Err(e.into()),
598        }
599    }
600
601    pub fn save_snapshot(&self) -> SchemaSnapshot {
602        SchemaSnapshot {
603            tables: self.tables.clone(),
604            views: self.views.clone(),
605            generation: self.generation,
606        }
607    }
608
609    pub fn restore_snapshot(&mut self, snap: SchemaSnapshot) {
610        self.tables = snap.tables;
611        self.views = snap.views;
612        self.generation = snap.generation;
613    }
614}
615
616#[cfg(test)]
617#[path = "schema_tests.rs"]
618mod tests;