Skip to main content

sim_kernel/catalog/
store.rs

1use std::collections::BTreeMap;
2
3use crate::{Error, Result, Symbol};
4
5use super::{CatalogEvent, CatalogOverlay, CatalogRow, CatalogTableSpec};
6
/// `Cx`-free, lock-free transactional table storage owned by the registry.
///
/// The store holds table specs, rows by table and key, named sequences, the
/// append-only journal, and the current epoch. It is the authoritative catalog
/// storage described in the README section "Contract: registry catalog
/// substrate"; mutation flows through [`CatalogTx`](super::CatalogTx) commits,
/// optionally buffered through an overlay.
///
/// While an overlay is active, row, sequence, journal, and epoch accessors
/// (and the matching crate-internal mutators) are routed to the overlay
/// instead of the fields below; table specs are always read from and written
/// to the store directly.
///
/// # Examples
///
/// ```
/// # use sim_kernel::catalog::{CatalogRow, CatalogStore, CatalogTableSpec, CatalogTx, CatalogWritePolicy};
/// # use sim_kernel::Symbol;
/// let mut store = CatalogStore::new();
/// let table = Symbol::new("registry/libs");
/// store.install_table(CatalogTableSpec::new(table.clone(), CatalogWritePolicy::Mutable)).unwrap();
///
/// let mut tx = CatalogTx::new();
/// tx.put_row(CatalogRow::new(table.clone(), Symbol::new("demo")));
/// let epoch = tx.commit(&mut store).unwrap();
///
/// assert_eq!(store.epoch(), epoch);
/// assert!(store.row(&table, &Symbol::new("demo")).is_some());
/// ```
#[derive(Clone, Debug, Default)]
pub struct CatalogStore {
    /// Installed table specs, keyed by table name.
    pub(crate) tables: BTreeMap<Symbol, CatalogTableSpec>,
    /// Committed rows, keyed first by table name and then by row key. A table
    /// with no rows has no entry here.
    pub(crate) rows: BTreeMap<Symbol, BTreeMap<Symbol, CatalogRow>>,
    /// Current value of each named sequence.
    pub(crate) sequences: BTreeMap<Symbol, u64>,
    /// Events appended in order; entries are only ever pushed by this type.
    pub(crate) journal: Vec<CatalogEvent>,
    /// Catalog epoch; starts at zero and is incremented by one per bump.
    pub(crate) epoch: u64,
    /// The active overlay, if any. At most one overlay exists at a time.
    pub(crate) overlay: Option<CatalogOverlay>,
}
40
41impl CatalogStore {
42    /// Creates an empty store with no tables, rows, or sequences.
43    pub fn new() -> Self {
44        Self::default()
45    }
46
47    /// Installs a table spec, erroring if a table of that name already exists.
48    pub fn install_table(&mut self, spec: CatalogTableSpec) -> Result<()> {
49        if self.tables.contains_key(&spec.name) {
50            return Err(Error::CatalogConflict {
51                table: spec.name.clone(),
52                key: spec.name,
53            });
54        }
55        self.tables.insert(spec.name.clone(), spec);
56        Ok(())
57    }
58
59    /// Returns the spec for `name`, if the table is installed.
60    pub fn table(&self, name: &Symbol) -> Option<&CatalogTableSpec> {
61        self.tables.get(name)
62    }
63
64    /// Returns all installed table specs by name.
65    pub fn tables(&self) -> &BTreeMap<Symbol, CatalogTableSpec> {
66        &self.tables
67    }
68
69    /// Returns the row at `table`/`key`, reading through any active overlay.
70    pub fn row(&self, table: &Symbol, key: &Symbol) -> Option<&CatalogRow> {
71        if let Some(overlay) = &self.overlay {
72            return overlay.row(table, key);
73        }
74        self.rows.get(table).and_then(|rows| rows.get(key))
75    }
76
77    /// Returns all rows of `table` by key, reading through any active overlay.
78    pub fn rows(&self, table: &Symbol) -> Option<&BTreeMap<Symbol, CatalogRow>> {
79        if let Some(overlay) = &self.overlay {
80            return overlay.rows(table);
81        }
82        self.rows.get(table)
83    }
84
85    /// Returns the current value of sequence `name`, if any.
86    pub fn sequence(&self, name: &Symbol) -> Option<u64> {
87        if let Some(overlay) = &self.overlay {
88            return overlay.sequence(name);
89        }
90        self.sequences.get(name).copied()
91    }
92
93    /// Returns the append-only audit journal, reading through any active overlay.
94    pub fn journal(&self) -> &[CatalogEvent] {
95        if let Some(overlay) = &self.overlay {
96            return overlay.journal();
97        }
98        &self.journal
99    }
100
101    /// Returns the current catalog epoch, reading through any active overlay.
102    pub fn epoch(&self) -> u64 {
103        if let Some(overlay) = &self.overlay {
104            return overlay.epoch();
105        }
106        self.epoch
107    }
108
109    /// Runs `f` against a buffered overlay, committing its edits on success and
110    /// rolling them back on error.
111    pub fn with_overlay<F, R>(&mut self, f: F) -> Result<R>
112    where
113        F: FnOnce(&mut Self) -> Result<R>,
114    {
115        self.begin_overlay()?;
116        match f(self) {
117            Ok(value) => {
118                self.commit_overlay()?;
119                Ok(value)
120            }
121            Err(err) => {
122                self.rollback_overlay();
123                Err(err)
124            }
125        }
126    }
127
128    pub(crate) fn begin_overlay(&mut self) -> Result<()> {
129        if self.overlay.is_some() {
130            return Err(overlay_error("nested catalog overlays are not supported"));
131        }
132        self.overlay = Some(CatalogOverlay::from_parent(self));
133        Ok(())
134    }
135
136    pub(crate) fn rollback_overlay(&mut self) {
137        self.overlay = None;
138    }
139
140    pub(crate) fn commit_overlay(&mut self) -> Result<()> {
141        let Some(overlay) = self.overlay.take() else {
142            return Err(overlay_error("no catalog overlay is active"));
143        };
144        if overlay.parent_epoch() != self.epoch {
145            return Err(overlay_error("catalog overlay parent epoch changed"));
146        }
147        if overlay.is_empty() {
148            return Ok(());
149        }
150        overlay.into_tx().commit(self).map(|_| ())
151    }
152
153    pub(crate) fn bump_epoch(&mut self) -> u64 {
154        if let Some(overlay) = &mut self.overlay {
155            return overlay.bump_epoch();
156        }
157        self.epoch += 1;
158        self.epoch
159    }
160
161    pub(crate) fn put_row(&mut self, row: CatalogRow) {
162        if let Some(overlay) = &mut self.overlay {
163            overlay.put_row(row);
164            return;
165        }
166        self.rows
167            .entry(row.table.clone())
168            .or_default()
169            .insert(row.key.clone(), row);
170    }
171
172    pub(crate) fn delete_row(&mut self, table: &Symbol, key: &Symbol) {
173        if let Some(overlay) = &mut self.overlay {
174            overlay.delete_row(table, key);
175            return;
176        }
177        if let Some(rows) = self.rows.get_mut(table) {
178            rows.remove(key);
179            if rows.is_empty() {
180                self.rows.remove(table);
181            }
182        }
183    }
184
185    pub(crate) fn bump_sequence(&mut self, name: Symbol, reserved: u64) -> u64 {
186        if let Some(overlay) = &mut self.overlay {
187            return overlay.bump_sequence(name, reserved);
188        }
189        let slot = self.sequences.entry(name).or_default();
190        *slot = (*slot).max(reserved);
191        *slot
192    }
193
194    pub(crate) fn push_event(&mut self, event: CatalogEvent) {
195        if let Some(overlay) = &mut self.overlay {
196            overlay.push_event(event);
197            return;
198        }
199        self.journal.push(event);
200    }
201}
202
203fn overlay_error(message: &'static str) -> Error {
204    Error::CatalogSchema {
205        table: Symbol::qualified("catalog", "overlay"),
206        message: message.to_owned(),
207    }
208}