sim_kernel/catalog/
store.rs1use std::collections::BTreeMap;
2
3use crate::{Error, Result, Symbol};
4
5use super::{CatalogEvent, CatalogOverlay, CatalogRow, CatalogTableSpec};
6
7#[derive(Clone, Debug, Default)]
32pub struct CatalogStore {
33 pub(crate) tables: BTreeMap<Symbol, CatalogTableSpec>,
34 pub(crate) rows: BTreeMap<Symbol, BTreeMap<Symbol, CatalogRow>>,
35 pub(crate) sequences: BTreeMap<Symbol, u64>,
36 pub(crate) journal: Vec<CatalogEvent>,
37 pub(crate) epoch: u64,
38 pub(crate) overlay: Option<CatalogOverlay>,
39}
40
41impl CatalogStore {
42 pub fn new() -> Self {
44 Self::default()
45 }
46
47 pub fn install_table(&mut self, spec: CatalogTableSpec) -> Result<()> {
49 if self.tables.contains_key(&spec.name) {
50 return Err(Error::CatalogConflict {
51 table: spec.name.clone(),
52 key: spec.name,
53 });
54 }
55 self.tables.insert(spec.name.clone(), spec);
56 Ok(())
57 }
58
59 pub fn table(&self, name: &Symbol) -> Option<&CatalogTableSpec> {
61 self.tables.get(name)
62 }
63
64 pub fn tables(&self) -> &BTreeMap<Symbol, CatalogTableSpec> {
66 &self.tables
67 }
68
69 pub fn row(&self, table: &Symbol, key: &Symbol) -> Option<&CatalogRow> {
71 if let Some(overlay) = &self.overlay {
72 return overlay.row(table, key);
73 }
74 self.rows.get(table).and_then(|rows| rows.get(key))
75 }
76
77 pub fn rows(&self, table: &Symbol) -> Option<&BTreeMap<Symbol, CatalogRow>> {
79 if let Some(overlay) = &self.overlay {
80 return overlay.rows(table);
81 }
82 self.rows.get(table)
83 }
84
85 pub fn sequence(&self, name: &Symbol) -> Option<u64> {
87 if let Some(overlay) = &self.overlay {
88 return overlay.sequence(name);
89 }
90 self.sequences.get(name).copied()
91 }
92
93 pub fn journal(&self) -> &[CatalogEvent] {
95 if let Some(overlay) = &self.overlay {
96 return overlay.journal();
97 }
98 &self.journal
99 }
100
101 pub fn epoch(&self) -> u64 {
103 if let Some(overlay) = &self.overlay {
104 return overlay.epoch();
105 }
106 self.epoch
107 }
108
109 pub fn with_overlay<F, R>(&mut self, f: F) -> Result<R>
112 where
113 F: FnOnce(&mut Self) -> Result<R>,
114 {
115 self.begin_overlay()?;
116 match f(self) {
117 Ok(value) => {
118 self.commit_overlay()?;
119 Ok(value)
120 }
121 Err(err) => {
122 self.rollback_overlay();
123 Err(err)
124 }
125 }
126 }
127
128 pub(crate) fn begin_overlay(&mut self) -> Result<()> {
129 if self.overlay.is_some() {
130 return Err(overlay_error("nested catalog overlays are not supported"));
131 }
132 self.overlay = Some(CatalogOverlay::from_parent(self));
133 Ok(())
134 }
135
136 pub(crate) fn rollback_overlay(&mut self) {
137 self.overlay = None;
138 }
139
140 pub(crate) fn commit_overlay(&mut self) -> Result<()> {
141 let Some(overlay) = self.overlay.take() else {
142 return Err(overlay_error("no catalog overlay is active"));
143 };
144 if overlay.parent_epoch() != self.epoch {
145 return Err(overlay_error("catalog overlay parent epoch changed"));
146 }
147 if overlay.is_empty() {
148 return Ok(());
149 }
150 overlay.into_tx().commit(self).map(|_| ())
151 }
152
153 pub(crate) fn bump_epoch(&mut self) -> u64 {
154 if let Some(overlay) = &mut self.overlay {
155 return overlay.bump_epoch();
156 }
157 self.epoch += 1;
158 self.epoch
159 }
160
161 pub(crate) fn put_row(&mut self, row: CatalogRow) {
162 if let Some(overlay) = &mut self.overlay {
163 overlay.put_row(row);
164 return;
165 }
166 self.rows
167 .entry(row.table.clone())
168 .or_default()
169 .insert(row.key.clone(), row);
170 }
171
172 pub(crate) fn delete_row(&mut self, table: &Symbol, key: &Symbol) {
173 if let Some(overlay) = &mut self.overlay {
174 overlay.delete_row(table, key);
175 return;
176 }
177 if let Some(rows) = self.rows.get_mut(table) {
178 rows.remove(key);
179 if rows.is_empty() {
180 self.rows.remove(table);
181 }
182 }
183 }
184
185 pub(crate) fn bump_sequence(&mut self, name: Symbol, reserved: u64) -> u64 {
186 if let Some(overlay) = &mut self.overlay {
187 return overlay.bump_sequence(name, reserved);
188 }
189 let slot = self.sequences.entry(name).or_default();
190 *slot = (*slot).max(reserved);
191 *slot
192 }
193
194 pub(crate) fn push_event(&mut self, event: CatalogEvent) {
195 if let Some(overlay) = &mut self.overlay {
196 overlay.push_event(event);
197 return;
198 }
199 self.journal.push(event);
200 }
201}
202
203fn overlay_error(message: &'static str) -> Error {
204 Error::CatalogSchema {
205 table: Symbol::qualified("catalog", "overlay"),
206 message: message.to_owned(),
207 }
208}