Skip to main content

citadel_sql/
schema.rs

1//! Schema manager: in-memory cache of table schemas.
2
3use std::sync::Arc;
4
5use rustc_hash::{FxHashMap, FxHashSet};
6
7use citadel::{Database, SqlCacheHandle};
8use parking_lot::Mutex;
9
10use crate::error::{Result, SqlError};
11use crate::system_tables::{self, VirtualTable};
12use crate::types::{ForeignKeySchemaEntry, TableSchema, ViewDef};
13
14/// Reverse FK index (parent -> [(child table, fk idx)]), tagged with its generation.
15type FkChildrenCache = std::cell::RefCell<Option<(u64, FxHashMap<String, Vec<(String, usize)>>)>>;
16
17const SCHEMA_TABLE: &[u8] = b"_schema";
18const VIEWS_TABLE: &[u8] = b"_views";
19const TRIGGERS_TABLE: &[u8] = b"_triggers";
20const MATVIEWS_TABLE: &[u8] = b"_matviews";
21
22thread_local! {
23    /// Stack of `(alias → storage_name)` frames pushed by FOR EACH STATEMENT trigger
24    /// firings so `REFERENCING NEW TABLE AS new_t` resolves while the body runs.
25    static TRANSITION_TABLES: std::cell::RefCell<Vec<FxHashMap<String, String>>> =
26        const { std::cell::RefCell::new(Vec::new()) };
27}
28
29/// Borrow already-lowercase names (the common hot-path case), allocate otherwise.
30fn lower_cow(name: &str) -> std::borrow::Cow<'_, str> {
31    if name.bytes().any(|b| b.is_ascii_uppercase()) {
32        std::borrow::Cow::Owned(name.to_ascii_lowercase())
33    } else {
34        std::borrow::Cow::Borrowed(name)
35    }
36}
37
38fn transition_table_lookup(name_lower: &str) -> Option<String> {
39    TRANSITION_TABLES.with(|cell| {
40        let stack = cell.borrow();
41        for frame in stack.iter().rev() {
42            if let Some(storage) = frame.get(name_lower) {
43                return Some(storage.clone());
44            }
45        }
46        None
47    })
48}
49
50pub(crate) fn push_transition_tables(aliases: FxHashMap<String, String>) -> TransitionGuard {
51    TRANSITION_TABLES.with(|cell| cell.borrow_mut().push(aliases));
52    TransitionGuard
53}
54
55pub(crate) struct TransitionGuard;
56impl Drop for TransitionGuard {
57    fn drop(&mut self) {
58        TRANSITION_TABLES.with(|cell| {
59            cell.borrow_mut().pop();
60        });
61    }
62}
63
64/// Manages table schemas in memory, backed by the `_schema` table.
65pub struct SchemaManager {
66    tables: FxHashMap<String, TableSchema>,
67    views: FxHashMap<String, ViewDef>,
68    virtual_tables: FxHashMap<String, Arc<dyn VirtualTable>>,
69    /// Within a `(target, timing, event)` group, triggers fire in name order.
70    triggers: FxHashMap<String, Vec<crate::types::TriggerDef>>,
71    /// Matview catalog. Backing table shares the matview's name in `tables`; this map
72    /// also gates DML rejection (matviews are read-only outside REFRESH).
73    matviews: FxHashMap<String, crate::types::MatviewDef>,
74    /// Maps user-typed TEMP name to prefixed storage name (`__temp_<conn_id>_<name>`).
75    temp_aliases: FxHashMap<String, String>,
76    /// Each entry is leaked once via `Box::leak` so `get()` can hand out a `&TableSchema`
77    /// from inside `&self` methods. Bounded by `(active triggers × transition aliases)`.
78    transition_schemas: std::cell::RefCell<FxHashMap<String, &'static TableSchema>>,
79    generation: u64,
80    /// Per-Database shared cache (e.g. ANN indexes). Cloned from the Database
81    /// when the Connection opens; all Connections to the same DB share entries.
82    /// Tests created via `empty()` get their own isolated cache.
83    pub sql_caches: SqlCacheHandle,
84    /// Tables mutated (UPDATE/DELETE/upsert/DDL) since the last commit; their
85    /// shared caches are hard-invalidated on commit.
86    dml_dirty_tables: std::cell::RefCell<FxHashSet<String>>,
87    /// Tables touched only by pure appends, mapped to the min inserted pk; an
88    /// append above the index snapshot tail-merges instead of hard-invalidating.
89    dml_append_tables: std::cell::RefCell<FxHashMap<String, i64>>,
90    /// Reverse FK index, rebuilt lazily whenever the schema generation changes.
91    fk_children_cache: FkChildrenCache,
92}
93
94/// DML since the last commit, classified for cache invalidation.
95pub struct DmlDirty {
96    pub mutating: Vec<String>,
97    pub appends: Vec<(String, i64)>,
98}
99
100#[derive(Clone)]
101pub struct SchemaSnapshot {
102    tables: FxHashMap<String, TableSchema>,
103    views: FxHashMap<String, ViewDef>,
104    generation: u64,
105}
106
107impl SchemaManager {
108    pub fn empty() -> Self {
109        Self {
110            tables: FxHashMap::default(),
111            views: FxHashMap::default(),
112            virtual_tables: FxHashMap::default(),
113            triggers: FxHashMap::default(),
114            matviews: FxHashMap::default(),
115            temp_aliases: FxHashMap::default(),
116            transition_schemas: std::cell::RefCell::new(FxHashMap::default()),
117            generation: 0,
118            sql_caches: Arc::new(Mutex::new(FxHashMap::default())),
119            dml_dirty_tables: std::cell::RefCell::new(FxHashSet::default()),
120            dml_append_tables: std::cell::RefCell::new(FxHashMap::default()),
121            fk_children_cache: std::cell::RefCell::new(None),
122        }
123    }
124
125    /// Mark a table mutated (UPDATE/DELETE/upsert/DDL); supersedes a pending append.
126    pub fn mark_dml(&self, table_name: &str) {
127        let key = lower_cow(table_name);
128        // Dirty implies no pending append (mark_dml_append checks dirty first;
129        // this fn removes the append entry before inserting dirty).
130        if self.dml_dirty_tables.borrow().contains(key.as_ref()) {
131            return;
132        }
133        self.dml_append_tables.borrow_mut().remove(key.as_ref());
134        self.dml_dirty_tables.borrow_mut().insert(key.into_owned());
135    }
136
137    /// Mark a pure append with the smallest inserted pk; no-op if already mutating.
138    pub fn mark_dml_append(&self, table_name: &str, min_pk: i64) {
139        let key = lower_cow(table_name);
140        if self.dml_dirty_tables.borrow().contains(key.as_ref()) {
141            return;
142        }
143        let mut appends = self.dml_append_tables.borrow_mut();
144        match appends.get_mut(key.as_ref()) {
145            Some(m) => *m = (*m).min(min_pk),
146            None => {
147                appends.insert(key.into_owned(), min_pk);
148            }
149        }
150    }
151
152    /// Take the touched tables, classified into mutating vs pure-append.
153    pub fn drain_dml_dirty(&self) -> DmlDirty {
154        DmlDirty {
155            mutating: self.dml_dirty_tables.borrow_mut().drain().collect(),
156            appends: self.dml_append_tables.borrow_mut().drain().collect(),
157        }
158    }
159
160    pub fn has_dml_dirty(&self) -> bool {
161        !self.dml_dirty_tables.borrow().is_empty() || !self.dml_append_tables.borrow().is_empty()
162    }
163
164    /// Forget pending DML markers without invalidating downstream caches.
165    /// Used on rollback (uncommitted writes leave no caches stale).
166    pub fn clear_dml_dirty(&self) {
167        self.dml_dirty_tables.borrow_mut().clear();
168        self.dml_append_tables.borrow_mut().clear();
169    }
170
171    pub fn register_temp_alias(&mut self, user_name: &str, prefixed_name: String) {
172        self.temp_aliases
173            .insert(user_name.to_ascii_lowercase(), prefixed_name);
174        self.generation += 1;
175    }
176
177    pub fn unregister_temp_alias(&mut self, user_name: &str) -> Option<String> {
178        let lower = user_name.to_ascii_lowercase();
179        let removed = self.temp_aliases.remove(&lower);
180        if removed.is_some() {
181            self.generation += 1;
182        }
183        removed
184    }
185
186    pub fn temp_alias_iter(&self) -> impl Iterator<Item = (&str, &str)> + '_ {
187        self.temp_aliases
188            .iter()
189            .map(|(k, v)| (k.as_str(), v.as_str()))
190    }
191
192    pub fn resolve_temp(&self, name: &str) -> String {
193        let lower = name.to_ascii_lowercase();
194        if let Some(prefixed) = self.temp_aliases.get(&lower) {
195            return prefixed.clone();
196        }
197        name.to_string()
198    }
199
200    pub fn load(db: &Database) -> Result<Self> {
201        let mut tables = FxHashMap::default();
202
203        let mut rtx = db.begin_read();
204        let mut parse_err: Option<crate::error::SqlError> = None;
205        let scan_result = rtx.table_for_each(SCHEMA_TABLE, |_key, value| {
206            match TableSchema::deserialize(value) {
207                Ok(schema) => {
208                    tables.insert(schema.name.clone(), schema);
209                }
210                Err(e) => {
211                    parse_err = Some(e);
212                }
213            }
214            Ok(())
215        });
216
217        match scan_result {
218            Ok(()) => {}
219            Err(citadel_core::Error::TableNotFound(_)) => {}
220            Err(e) => return Err(e.into()),
221        }
222        if let Some(e) = parse_err {
223            return Err(e);
224        }
225
226        let mut views = FxHashMap::default();
227        let mut rtx2 = db.begin_read();
228        let mut view_err: Option<crate::error::SqlError> = None;
229        let view_scan = rtx2.table_for_each(VIEWS_TABLE, |_key, value| {
230            match ViewDef::deserialize(value) {
231                Ok(vd) => {
232                    views.insert(vd.name.clone(), vd);
233                }
234                Err(e) => {
235                    view_err = Some(e);
236                }
237            }
238            Ok(())
239        });
240
241        match view_scan {
242            Ok(()) => {}
243            Err(citadel_core::Error::TableNotFound(_)) => {}
244            Err(e) => return Err(e.into()),
245        }
246        if let Some(e) = view_err {
247            return Err(e);
248        }
249
250        let mut triggers: FxHashMap<String, Vec<crate::types::TriggerDef>> = FxHashMap::default();
251        let mut rtx3 = db.begin_read();
252        let mut trig_err: Option<crate::error::SqlError> = None;
253        let trig_scan = rtx3.table_for_each(TRIGGERS_TABLE, |_key, value| {
254            match crate::types::TriggerDef::deserialize(value) {
255                Ok(td) => {
256                    triggers
257                        .entry(td.target.to_ascii_lowercase())
258                        .or_default()
259                        .push(td);
260                }
261                Err(e) => {
262                    trig_err = Some(e);
263                }
264            }
265            Ok(())
266        });
267        match trig_scan {
268            Ok(()) => {}
269            Err(citadel_core::Error::TableNotFound(_)) => {}
270            Err(e) => return Err(e.into()),
271        }
272        if let Some(e) = trig_err {
273            return Err(e);
274        }
275        // PG-faithful: triggers fire in name order within a (target, timing, event) group.
276        for v in triggers.values_mut() {
277            v.sort_by(|a, b| a.name.cmp(&b.name));
278        }
279
280        let mut matviews: FxHashMap<String, crate::types::MatviewDef> = FxHashMap::default();
281        let mut rtx4 = db.begin_read();
282        let mut mv_err: Option<crate::error::SqlError> = None;
283        let mv_scan = rtx4.table_for_each(MATVIEWS_TABLE, |_key, value| {
284            match crate::types::MatviewDef::deserialize(value) {
285                Ok(mv) => {
286                    matviews.insert(mv.name.to_ascii_lowercase(), mv);
287                }
288                Err(e) => {
289                    mv_err = Some(e);
290                }
291            }
292            Ok(())
293        });
294        match mv_scan {
295            Ok(()) => {}
296            Err(citadel_core::Error::TableNotFound(_)) => {}
297            Err(e) => return Err(e.into()),
298        }
299        if let Some(e) = mv_err {
300            return Err(e);
301        }
302
303        let mut mgr = Self {
304            tables,
305            views,
306            virtual_tables: FxHashMap::default(),
307            triggers,
308            matviews,
309            temp_aliases: FxHashMap::default(),
310            transition_schemas: std::cell::RefCell::new(FxHashMap::default()),
311            generation: 0,
312            sql_caches: db.sql_cache_handle(),
313            dml_dirty_tables: std::cell::RefCell::new(FxHashSet::default()),
314            dml_append_tables: std::cell::RefCell::new(FxHashMap::default()),
315            fk_children_cache: std::cell::RefCell::new(None),
316        };
317        system_tables::register_builtins(&mut mgr);
318        Ok(mgr)
319    }
320
321    pub fn get_virtual(&self, name: &str) -> Option<&Arc<dyn VirtualTable>> {
322        self.virtual_tables.get(name)
323    }
324
325    pub fn register_virtual(&mut self, vt: Arc<dyn VirtualTable>) {
326        let name = vt.name().to_ascii_lowercase();
327        self.virtual_tables.insert(name, vt);
328    }
329
330    pub fn get(&self, name: &str) -> Option<&TableSchema> {
331        // Hot callers pass pre-lowered names; those skip the String alloc.
332        if name.bytes().any(|b| b.is_ascii_uppercase()) {
333            self.get_lower(&name.to_ascii_lowercase())
334        } else {
335            self.get_lower(name)
336        }
337    }
338
339    /// Resolution precedence: transition > matview > temp alias > base table.
340    fn get_lower(&self, lower: &str) -> Option<&TableSchema> {
341        if let Some(prefixed) = transition_table_lookup(lower) {
342            if let Some(s) = self.tables.get(&prefixed) {
343                return Some(s);
344            }
345            if let Some(&leaked) = self.transition_schemas.borrow().get(&prefixed) {
346                return Some(leaked);
347            }
348        }
349        if !self.matviews.is_empty() {
350            if let Some(mv) = self.matviews.get(lower) {
351                return self.tables.get(&mv.backing_table);
352            }
353        }
354        if !self.temp_aliases.is_empty() {
355            if let Some(prefixed) = self.temp_aliases.get(lower) {
356                return self.tables.get(prefixed);
357            }
358        }
359        // Table keys are registered lowercase; one canonical probe suffices.
360        self.tables.get(lower)
361    }
362
363    pub fn register_transition_schema(&self, storage_name: String, schema: TableSchema) {
364        let leaked: &'static TableSchema = Box::leak(Box::new(schema));
365        self.transition_schemas
366            .borrow_mut()
367            .insert(storage_name, leaked);
368    }
369
370    pub fn unregister_transition_schema(&self, storage_name: &str) {
371        self.transition_schemas.borrow_mut().remove(storage_name);
372    }
373
374    pub fn contains(&self, name: &str) -> bool {
375        if name.bytes().any(|b| b.is_ascii_uppercase()) {
376            self.contains_lower(&name.to_ascii_lowercase())
377        } else {
378            self.contains_lower(name)
379        }
380    }
381
382    fn contains_lower(&self, lower: &str) -> bool {
383        transition_table_lookup(lower).is_some()
384            || (!self.matviews.is_empty() && self.matviews.contains_key(lower))
385            || (!self.temp_aliases.is_empty() && self.temp_aliases.contains_key(lower))
386            || self.tables.contains_key(lower)
387    }
388
389    pub fn generation(&self) -> u64 {
390        self.generation
391    }
392
393    pub fn register(&mut self, schema: TableSchema) {
394        let lower = schema.name.to_ascii_lowercase();
395        self.tables.insert(lower, schema);
396        self.generation += 1;
397    }
398
399    pub fn remove(&mut self, name: &str) -> Option<TableSchema> {
400        let lower = name.to_ascii_lowercase();
401        let result = self.tables.remove(&lower);
402        if result.is_some() {
403            self.generation += 1;
404        }
405        result
406    }
407
408    pub fn table_names(&self) -> Vec<&str> {
409        self.tables.keys().map(|s| s.as_str()).collect()
410    }
411
412    pub fn all_schemas(&self) -> impl Iterator<Item = &TableSchema> {
413        self.tables.values()
414    }
415
416    pub fn get_view(&self, name: &str) -> Option<&ViewDef> {
417        if let Some(v) = self.views.get(name) {
418            return Some(v);
419        }
420        if name.bytes().any(|b| b.is_ascii_uppercase()) {
421            self.views.get(&name.to_ascii_lowercase())
422        } else {
423            None
424        }
425    }
426
427    pub fn register_view(&mut self, view: ViewDef) {
428        let lower = view.name.to_ascii_lowercase();
429        self.views.insert(lower, view);
430        self.generation += 1;
431    }
432
433    pub fn remove_view(&mut self, name: &str) -> Option<ViewDef> {
434        let lower = name.to_ascii_lowercase();
435        let result = self.views.remove(&lower);
436        if result.is_some() {
437            self.generation += 1;
438        }
439        result
440    }
441
442    pub fn view_names(&self) -> Vec<&str> {
443        self.views.keys().map(|s| s.as_str()).collect()
444    }
445
446    pub fn triggers_for(&self, target: &str) -> &[crate::types::TriggerDef] {
447        if self.triggers.is_empty() {
448            return &[];
449        }
450        // Keys are stored lowercased; skip the alloc when target already is.
451        if !target.bytes().any(|b| b.is_ascii_uppercase()) {
452            return self.triggers.get(target).map_or(&[], |v| v.as_slice());
453        }
454        let key = target.to_ascii_lowercase();
455        self.triggers.get(&key).map_or(&[], |v| v.as_slice())
456    }
457
458    pub fn all_triggers(&self) -> impl Iterator<Item = &crate::types::TriggerDef> + '_ {
459        self.triggers.values().flatten()
460    }
461
462    pub fn register_trigger(&mut self, trig: crate::types::TriggerDef) {
463        let target = trig.target.to_ascii_lowercase();
464        let bucket = self.triggers.entry(target).or_default();
465        bucket.push(trig);
466        bucket.sort_by(|a, b| a.name.cmp(&b.name));
467        self.generation += 1;
468    }
469
470    pub fn remove_trigger(&mut self, name: &str) -> Option<crate::types::TriggerDef> {
471        let lower = name.to_ascii_lowercase();
472        let mut result = None;
473        for bucket in self.triggers.values_mut() {
474            if let Some(pos) = bucket
475                .iter()
476                .position(|t| t.name.eq_ignore_ascii_case(&lower))
477            {
478                result = Some(bucket.remove(pos));
479                break;
480            }
481        }
482        if result.is_some() {
483            self.generation += 1;
484        }
485        result
486    }
487
488    /// Caller is responsible for dropping the returned triggers' on-disk catalog rows.
489    pub fn remove_triggers_for(&mut self, target: &str) -> Vec<crate::types::TriggerDef> {
490        let key = target.to_ascii_lowercase();
491        let removed = self.triggers.remove(&key).unwrap_or_default();
492        if !removed.is_empty() {
493            self.generation += 1;
494        }
495        removed
496    }
497
498    pub fn find_trigger(&self, name: &str) -> Option<(&str, &crate::types::TriggerDef)> {
499        let lower = name.to_ascii_lowercase();
500        for (target, bucket) in &self.triggers {
501            if let Some(t) = bucket.iter().find(|t| t.name.eq_ignore_ascii_case(&lower)) {
502                return Some((target.as_str(), t));
503            }
504        }
505        None
506    }
507
508    pub fn set_trigger_enabled(&mut self, name: &str, enabled: bool) -> bool {
509        let lower = name.to_ascii_lowercase();
510        for bucket in self.triggers.values_mut() {
511            if let Some(t) = bucket
512                .iter_mut()
513                .find(|t| t.name.eq_ignore_ascii_case(&lower))
514            {
515                t.enabled = enabled;
516                self.generation += 1;
517                return true;
518            }
519        }
520        false
521    }
522
523    pub fn set_all_triggers_enabled(&mut self, target: &str, enabled: bool) -> usize {
524        let key = target.to_ascii_lowercase();
525        let bucket = match self.triggers.get_mut(&key) {
526            Some(b) => b,
527            None => return 0,
528        };
529        let count = bucket.len();
530        for t in bucket {
531            t.enabled = enabled;
532        }
533        if count > 0 {
534            self.generation += 1;
535        }
536        count
537    }
538
539    pub fn ensure_triggers_table(wtx: &mut citadel_txn::write_txn::WriteTxn<'_>) -> Result<()> {
540        match wtx.create_table(TRIGGERS_TABLE) {
541            Ok(()) => Ok(()),
542            Err(citadel_core::Error::TableAlreadyExists(_)) => Ok(()),
543            Err(e) => Err(e.into()),
544        }
545    }
546
547    pub fn save_trigger(
548        wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
549        trig: &crate::types::TriggerDef,
550    ) -> Result<()> {
551        Self::ensure_triggers_table(wtx)?;
552        let data = trig.serialize();
553        let lower = trig.name.to_ascii_lowercase();
554        wtx.table_insert(TRIGGERS_TABLE, lower.as_bytes(), &data)
555            .map_err(crate::error::SqlError::from)?;
556        Ok(())
557    }
558
559    pub fn delete_trigger(
560        wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
561        name: &str,
562    ) -> Result<()> {
563        Self::ensure_triggers_table(wtx)?;
564        let lower = name.to_ascii_lowercase();
565        wtx.table_delete(TRIGGERS_TABLE, lower.as_bytes())
566            .map_err(crate::error::SqlError::from)?;
567        Ok(())
568    }
569
570    pub fn save_view(wtx: &mut citadel_txn::write_txn::WriteTxn<'_>, view: &ViewDef) -> Result<()> {
571        let lower = view.name.to_ascii_lowercase();
572        let data = view.serialize();
573        wtx.table_insert(VIEWS_TABLE, lower.as_bytes(), &data)?;
574        Ok(())
575    }
576
577    pub fn delete_view(wtx: &mut citadel_txn::write_txn::WriteTxn<'_>, name: &str) -> Result<()> {
578        let lower = name.to_ascii_lowercase();
579        wtx.table_delete(VIEWS_TABLE, lower.as_bytes())
580            .map_err(|e| match e {
581                citadel_core::Error::TableNotFound(_) => SqlError::ViewNotFound(name.into()),
582                other => SqlError::Storage(other),
583            })?;
584        Ok(())
585    }
586
587    pub fn ensure_views_table(wtx: &mut citadel_txn::write_txn::WriteTxn<'_>) -> Result<()> {
588        match wtx.create_table(VIEWS_TABLE) {
589            Ok(()) => Ok(()),
590            Err(citadel_core::Error::TableAlreadyExists(_)) => Ok(()),
591            Err(e) => Err(e.into()),
592        }
593    }
594
595    pub fn get_matview(&self, name: &str) -> Option<&crate::types::MatviewDef> {
596        let lower = name.to_ascii_lowercase();
597        self.matviews.get(&lower)
598    }
599
600    pub fn matview_names(&self) -> Vec<&str> {
601        self.matviews.keys().map(|s| s.as_str()).collect()
602    }
603
604    pub fn all_matviews(&self) -> impl Iterator<Item = &crate::types::MatviewDef> + '_ {
605        self.matviews.values()
606    }
607
608    pub fn register_matview(&mut self, mv: crate::types::MatviewDef) {
609        let lower = mv.name.to_ascii_lowercase();
610        self.matviews.insert(lower, mv);
611        self.generation += 1;
612    }
613
614    pub fn remove_matview(&mut self, name: &str) -> Option<crate::types::MatviewDef> {
615        let lower = name.to_ascii_lowercase();
616        let removed = self.matviews.remove(&lower);
617        if removed.is_some() {
618            self.generation += 1;
619        }
620        removed
621    }
622
623    pub fn ensure_matviews_table(wtx: &mut citadel_txn::write_txn::WriteTxn<'_>) -> Result<()> {
624        match wtx.create_table(MATVIEWS_TABLE) {
625            Ok(()) => Ok(()),
626            Err(citadel_core::Error::TableAlreadyExists(_)) => Ok(()),
627            Err(e) => Err(e.into()),
628        }
629    }
630
631    pub fn save_matview(
632        wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
633        mv: &crate::types::MatviewDef,
634    ) -> Result<()> {
635        Self::ensure_matviews_table(wtx)?;
636        let lower = mv.name.to_ascii_lowercase();
637        let data = mv.serialize();
638        wtx.table_insert(MATVIEWS_TABLE, lower.as_bytes(), &data)?;
639        Ok(())
640    }
641
642    pub fn delete_matview(
643        wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
644        name: &str,
645    ) -> Result<()> {
646        Self::ensure_matviews_table(wtx)?;
647        let lower = name.to_ascii_lowercase();
648        wtx.table_delete(MATVIEWS_TABLE, lower.as_bytes())
649            .map_err(crate::error::SqlError::from)?;
650        Ok(())
651    }
652
653    pub fn child_fks_for(&self, parent: &str) -> Vec<(&str, &ForeignKeySchemaEntry)> {
654        self.ensure_fk_children_cache();
655        let cache = self.fk_children_cache.borrow();
656        let Some((_, map)) = cache.as_ref() else {
657            return Vec::new();
658        };
659        let Some(children) = map.get(parent) else {
660            return Vec::new();
661        };
662        children
663            .iter()
664            .map(|(child, fk_idx)| {
665                let schema = self.tables.get(child).expect("cached child table exists");
666                (schema.name.as_str(), &schema.foreign_keys[*fk_idx])
667            })
668            .collect()
669    }
670
671    /// Rebuild the reverse FK index iff it is stale for the current generation.
672    fn ensure_fk_children_cache(&self) {
673        if matches!(self.fk_children_cache.borrow().as_ref(), Some((g, _)) if *g == self.generation)
674        {
675            return;
676        }
677        let mut map: FxHashMap<String, Vec<(String, usize)>> = FxHashMap::default();
678        for (child_name, schema) in &self.tables {
679            for (fk_idx, fk) in schema.foreign_keys.iter().enumerate() {
680                map.entry(fk.foreign_table.clone())
681                    .or_default()
682                    .push((child_name.clone(), fk_idx));
683            }
684        }
685        *self.fk_children_cache.borrow_mut() = Some((self.generation, map));
686    }
687
688    pub fn save_schema(
689        wtx: &mut citadel_txn::write_txn::WriteTxn<'_>,
690        schema: &TableSchema,
691    ) -> Result<()> {
692        let lower = schema.name.to_ascii_lowercase();
693        let data = schema.serialize();
694        wtx.table_insert(SCHEMA_TABLE, lower.as_bytes(), &data)?;
695        Ok(())
696    }
697
698    pub fn delete_schema(wtx: &mut citadel_txn::write_txn::WriteTxn<'_>, name: &str) -> Result<()> {
699        let lower = name.to_ascii_lowercase();
700        wtx.table_delete(SCHEMA_TABLE, lower.as_bytes())
701            .map_err(|e| match e {
702                citadel_core::Error::TableNotFound(_) => SqlError::TableNotFound(name.into()),
703                other => SqlError::Storage(other),
704            })?;
705        Ok(())
706    }
707
708    pub fn ensure_schema_table(wtx: &mut citadel_txn::write_txn::WriteTxn<'_>) -> Result<()> {
709        match wtx.create_table(SCHEMA_TABLE) {
710            Ok(()) => Ok(()),
711            Err(citadel_core::Error::TableAlreadyExists(_)) => Ok(()),
712            Err(e) => Err(e.into()),
713        }
714    }
715
716    pub fn save_snapshot(&self) -> SchemaSnapshot {
717        SchemaSnapshot {
718            tables: self.tables.clone(),
719            views: self.views.clone(),
720            generation: self.generation,
721        }
722    }
723
724    pub fn restore_snapshot(&mut self, snap: SchemaSnapshot) {
725        // Equal generations prove nothing changed since the snapshot: restore
726        // is a no-op and plans stay valid. Otherwise never rewind - a plan
727        // compiled after the snapshot must not revalidate against it.
728        if self.generation != snap.generation {
729            self.generation = self.generation.max(snap.generation) + 1;
730        }
731        self.tables = snap.tables;
732        self.views = snap.views;
733    }
734
735    /// Advance the generation strictly past `prior` so plans compiled against
736    /// a predecessor manager can never revalidate against this one.
737    pub fn bump_generation_past(&mut self, prior: u64) {
738        if self.generation <= prior {
739            self.generation = prior + 1;
740        }
741    }
742}
743
744#[cfg(test)]
745#[path = "schema_tests.rs"]
746mod tests;