Skip to main content

sim_kernel/catalog/
overlay.rs

1use std::collections::{BTreeMap, BTreeSet};
2
3use crate::Symbol;
4
5use super::{CatalogEvent, CatalogOp, CatalogRow, CatalogStore, CatalogTx};
6
7/// Buffered, uncommitted catalog edits layered over a base [`CatalogStore`].
8///
9/// An overlay snapshots the parent's rows, sequences, and journal, accumulates
10/// local puts, deletes, and sequence bumps, and is either committed back as a
11/// single transaction or discarded.
12#[derive(Clone, Debug)]
13pub struct CatalogOverlay {
14    parent_epoch: u64,
15    local_rows: BTreeMap<Symbol, BTreeMap<Symbol, CatalogRow>>,
16    local_deletes: BTreeMap<Symbol, BTreeSet<Symbol>>,
17    local_sequences: BTreeMap<Symbol, u64>,
18    rows: BTreeMap<Symbol, BTreeMap<Symbol, CatalogRow>>,
19    sequences: BTreeMap<Symbol, u64>,
20    journal: Vec<CatalogEvent>,
21    epoch: u64,
22}
23
24impl CatalogOverlay {
25    pub(crate) fn from_parent(store: &CatalogStore) -> Self {
26        Self {
27            parent_epoch: store.epoch,
28            local_rows: BTreeMap::new(),
29            local_deletes: BTreeMap::new(),
30            local_sequences: BTreeMap::new(),
31            rows: store.rows.clone(),
32            sequences: store.sequences.clone(),
33            journal: store.journal.clone(),
34            epoch: store.epoch,
35        }
36    }
37
38    /// Returns the parent store epoch the overlay was created from.
39    pub fn parent_epoch(&self) -> u64 {
40        self.parent_epoch
41    }
42
43    pub(crate) fn row(&self, table: &Symbol, key: &Symbol) -> Option<&CatalogRow> {
44        self.rows.get(table).and_then(|rows| rows.get(key))
45    }
46
47    pub(crate) fn rows(&self, table: &Symbol) -> Option<&BTreeMap<Symbol, CatalogRow>> {
48        self.rows.get(table)
49    }
50
51    pub(crate) fn all_rows(&self) -> &BTreeMap<Symbol, BTreeMap<Symbol, CatalogRow>> {
52        &self.rows
53    }
54
55    pub(crate) fn all_sequences(&self) -> &BTreeMap<Symbol, u64> {
56        &self.sequences
57    }
58
59    pub(crate) fn sequence(&self, name: &Symbol) -> Option<u64> {
60        self.sequences.get(name).copied()
61    }
62
63    pub(crate) fn journal(&self) -> &[CatalogEvent] {
64        &self.journal
65    }
66
67    pub(crate) fn epoch(&self) -> u64 {
68        self.epoch
69    }
70
71    pub(crate) fn bump_epoch(&mut self) -> u64 {
72        self.epoch += 1;
73        self.epoch
74    }
75
76    pub(crate) fn put_row(&mut self, row: CatalogRow) {
77        let table = row.table.clone();
78        let key = row.key.clone();
79        if let Some(deleted) = self.local_deletes.get_mut(&table) {
80            deleted.remove(&key);
81        }
82        if self
83            .local_deletes
84            .get(&table)
85            .is_some_and(BTreeSet::is_empty)
86        {
87            self.local_deletes.remove(&table);
88        }
89        self.rows
90            .entry(table.clone())
91            .or_default()
92            .insert(key.clone(), row.clone());
93        self.local_rows.entry(table).or_default().insert(key, row);
94    }
95
96    pub(crate) fn delete_row(&mut self, table: &Symbol, key: &Symbol) {
97        if let Some(rows) = self.rows.get_mut(table) {
98            rows.remove(key);
99            if rows.is_empty() {
100                self.rows.remove(table);
101            }
102        }
103        if let Some(rows) = self.local_rows.get_mut(table) {
104            rows.remove(key);
105            if rows.is_empty() {
106                self.local_rows.remove(table);
107            }
108        }
109        self.local_deletes
110            .entry(table.clone())
111            .or_default()
112            .insert(key.clone());
113    }
114
115    pub(crate) fn bump_sequence(&mut self, name: Symbol, reserved: u64) -> u64 {
116        let visible = self.sequences.entry(name.clone()).or_default();
117        *visible = (*visible).max(reserved);
118        let local = self.local_sequences.entry(name).or_default();
119        *local = (*local).max(*visible);
120        *visible
121    }
122
123    pub(crate) fn push_event(&mut self, event: CatalogEvent) {
124        self.journal.push(event);
125    }
126
127    pub(crate) fn is_empty(&self) -> bool {
128        self.local_rows.is_empty()
129            && self.local_deletes.is_empty()
130            && self.local_sequences.is_empty()
131    }
132
133    pub(crate) fn into_tx(self) -> CatalogTx {
134        let mut tx = CatalogTx::new();
135        for (table, keys) in self.local_deletes {
136            for key in keys {
137                tx.push(CatalogOp::DeleteRow {
138                    table: table.clone(),
139                    key,
140                });
141            }
142        }
143        for rows in self.local_rows.into_values() {
144            for row in rows.into_values() {
145                tx.put_row(row);
146            }
147        }
148        for (name, reserved) in self.local_sequences {
149            tx.bump_sequence(name, reserved);
150        }
151        tx
152    }
153}