sim_kernel/catalog/
overlay.rs1use std::collections::{BTreeMap, BTreeSet};
2
3use crate::Symbol;
4
5use super::{CatalogEvent, CatalogOp, CatalogRow, CatalogStore, CatalogTx};
6
7#[derive(Clone, Debug)]
13pub struct CatalogOverlay {
14 parent_epoch: u64,
15 local_rows: BTreeMap<Symbol, BTreeMap<Symbol, CatalogRow>>,
16 local_deletes: BTreeMap<Symbol, BTreeSet<Symbol>>,
17 local_sequences: BTreeMap<Symbol, u64>,
18 rows: BTreeMap<Symbol, BTreeMap<Symbol, CatalogRow>>,
19 sequences: BTreeMap<Symbol, u64>,
20 journal: Vec<CatalogEvent>,
21 epoch: u64,
22}
23
24impl CatalogOverlay {
25 pub(crate) fn from_parent(store: &CatalogStore) -> Self {
26 Self {
27 parent_epoch: store.epoch,
28 local_rows: BTreeMap::new(),
29 local_deletes: BTreeMap::new(),
30 local_sequences: BTreeMap::new(),
31 rows: store.rows.clone(),
32 sequences: store.sequences.clone(),
33 journal: store.journal.clone(),
34 epoch: store.epoch,
35 }
36 }
37
38 pub fn parent_epoch(&self) -> u64 {
40 self.parent_epoch
41 }
42
43 pub(crate) fn row(&self, table: &Symbol, key: &Symbol) -> Option<&CatalogRow> {
44 self.rows.get(table).and_then(|rows| rows.get(key))
45 }
46
47 pub(crate) fn rows(&self, table: &Symbol) -> Option<&BTreeMap<Symbol, CatalogRow>> {
48 self.rows.get(table)
49 }
50
51 pub(crate) fn all_rows(&self) -> &BTreeMap<Symbol, BTreeMap<Symbol, CatalogRow>> {
52 &self.rows
53 }
54
55 pub(crate) fn all_sequences(&self) -> &BTreeMap<Symbol, u64> {
56 &self.sequences
57 }
58
59 pub(crate) fn sequence(&self, name: &Symbol) -> Option<u64> {
60 self.sequences.get(name).copied()
61 }
62
63 pub(crate) fn journal(&self) -> &[CatalogEvent] {
64 &self.journal
65 }
66
67 pub(crate) fn epoch(&self) -> u64 {
68 self.epoch
69 }
70
71 pub(crate) fn bump_epoch(&mut self) -> u64 {
72 self.epoch += 1;
73 self.epoch
74 }
75
76 pub(crate) fn put_row(&mut self, row: CatalogRow) {
77 let table = row.table.clone();
78 let key = row.key.clone();
79 if let Some(deleted) = self.local_deletes.get_mut(&table) {
80 deleted.remove(&key);
81 }
82 if self
83 .local_deletes
84 .get(&table)
85 .is_some_and(BTreeSet::is_empty)
86 {
87 self.local_deletes.remove(&table);
88 }
89 self.rows
90 .entry(table.clone())
91 .or_default()
92 .insert(key.clone(), row.clone());
93 self.local_rows.entry(table).or_default().insert(key, row);
94 }
95
96 pub(crate) fn delete_row(&mut self, table: &Symbol, key: &Symbol) {
97 if let Some(rows) = self.rows.get_mut(table) {
98 rows.remove(key);
99 if rows.is_empty() {
100 self.rows.remove(table);
101 }
102 }
103 if let Some(rows) = self.local_rows.get_mut(table) {
104 rows.remove(key);
105 if rows.is_empty() {
106 self.local_rows.remove(table);
107 }
108 }
109 self.local_deletes
110 .entry(table.clone())
111 .or_default()
112 .insert(key.clone());
113 }
114
115 pub(crate) fn bump_sequence(&mut self, name: Symbol, reserved: u64) -> u64 {
116 let visible = self.sequences.entry(name.clone()).or_default();
117 *visible = (*visible).max(reserved);
118 let local = self.local_sequences.entry(name).or_default();
119 *local = (*local).max(*visible);
120 *visible
121 }
122
123 pub(crate) fn push_event(&mut self, event: CatalogEvent) {
124 self.journal.push(event);
125 }
126
127 pub(crate) fn is_empty(&self) -> bool {
128 self.local_rows.is_empty()
129 && self.local_deletes.is_empty()
130 && self.local_sequences.is_empty()
131 }
132
133 pub(crate) fn into_tx(self) -> CatalogTx {
134 let mut tx = CatalogTx::new();
135 for (table, keys) in self.local_deletes {
136 for key in keys {
137 tx.push(CatalogOp::DeleteRow {
138 table: table.clone(),
139 key,
140 });
141 }
142 }
143 for rows in self.local_rows.into_values() {
144 for row in rows.into_values() {
145 tx.put_row(row);
146 }
147 }
148 for (name, reserved) in self.local_sequences {
149 tx.bump_sequence(name, reserved);
150 }
151 tx
152 }
153}