sim_kernel/catalog/
snapshot.rs1use std::collections::BTreeMap;
2
3use crate::{Error, Expr, Result, Symbol};
4
5use super::{
6 CatalogRow, CatalogStore, CatalogTableSpec,
7 snapshot_expr::{snapshot_from_expr, snapshot_to_expr, unresolved_live_expr},
8};
9
10#[derive(Clone, Debug, PartialEq, Eq)]
28pub struct CatalogSnapshot {
29 pub tables: BTreeMap<Symbol, CatalogTableSpec>,
31 pub rows: BTreeMap<Symbol, BTreeMap<Symbol, CatalogSnapshotRow>>,
33 pub sequences: BTreeMap<Symbol, u64>,
35 pub epoch: u64,
37}
38
39impl CatalogSnapshot {
40 pub fn from_store(store: &CatalogStore) -> Self {
43 let rows = visible_rows(store)
44 .iter()
45 .map(|(table, rows)| {
46 let rows = rows
47 .iter()
48 .map(|(key, row)| {
49 (
50 key.clone(),
51 CatalogSnapshotRow {
52 table: row.table.clone(),
53 key: row.key.clone(),
54 epoch: row.epoch,
55 data: snapshot_row_data(row),
56 },
57 )
58 })
59 .collect();
60 (table.clone(), rows)
61 })
62 .collect();
63
64 Self {
65 tables: store.tables.clone(),
66 rows,
67 sequences: visible_sequences(store).clone(),
68 epoch: store.epoch(),
69 }
70 }
71
72 pub fn to_expr(&self) -> Expr {
74 snapshot_to_expr(self)
75 }
76
77 pub fn from_expr(expr: Expr) -> Result<Self> {
79 snapshot_from_expr(expr)
80 }
81
82 pub fn rows(&self, table: &Symbol) -> Option<&BTreeMap<Symbol, CatalogSnapshotRow>> {
84 self.rows.get(table)
85 }
86}
87
88#[derive(Clone, Debug, PartialEq, Eq)]
91pub struct CatalogSnapshotRow {
92 pub table: Symbol,
94 pub key: Symbol,
96 pub epoch: u64,
98 pub data: BTreeMap<Symbol, Expr>,
100}
101
102impl CatalogStore {
103 pub fn from_snapshot(snapshot: CatalogSnapshot) -> Result<Self> {
106 validate_table_keys(&snapshot.tables)?;
107 let mut store = CatalogStore {
108 tables: snapshot.tables,
109 sequences: snapshot.sequences,
110 epoch: snapshot.epoch,
111 ..Self::default()
112 };
113
114 for (table, rows) in snapshot.rows {
115 let spec = store
116 .table(&table)
117 .cloned()
118 .ok_or_else(|| Error::CatalogSchema {
119 table: table.clone(),
120 message: "unknown catalog table".to_owned(),
121 })?;
122 for (key, snapshot_row) in rows {
123 validate_snapshot_row(&table, &key, &snapshot_row, &spec, store.epoch)?;
124 let mut row = CatalogRow::new(snapshot_row.table, snapshot_row.key);
125 row.data = snapshot_row.data;
126 row.set_epoch(snapshot_row.epoch);
127 store
128 .rows
129 .entry(table.clone())
130 .or_default()
131 .insert(key, row);
132 }
133 }
134
135 Ok(store)
136 }
137}
138
139fn visible_rows(store: &CatalogStore) -> &BTreeMap<Symbol, BTreeMap<Symbol, CatalogRow>> {
140 store
141 .overlay
142 .as_ref()
143 .map_or(&store.rows, |overlay| overlay.all_rows())
144}
145
146fn visible_sequences(store: &CatalogStore) -> &BTreeMap<Symbol, u64> {
147 store
148 .overlay
149 .as_ref()
150 .map_or(&store.sequences, |overlay| overlay.all_sequences())
151}
152
153fn snapshot_row_data(row: &CatalogRow) -> BTreeMap<Symbol, Expr> {
154 let mut data = row.data.clone();
155 for field in row.live.keys() {
156 data.entry(field.clone())
157 .or_insert_with(|| unresolved_live_expr(row, field));
158 }
159 data
160}
161
162fn validate_table_keys(tables: &BTreeMap<Symbol, CatalogTableSpec>) -> Result<()> {
163 for (name, spec) in tables {
164 if name != &spec.name {
165 return Err(Error::CatalogSchema {
166 table: name.clone(),
167 message: "table spec key does not match table name".to_owned(),
168 });
169 }
170 }
171 Ok(())
172}
173
174fn validate_snapshot_row(
175 table: &Symbol,
176 key: &Symbol,
177 row: &CatalogSnapshotRow,
178 spec: &CatalogTableSpec,
179 snapshot_epoch: u64,
180) -> Result<()> {
181 if &row.table != table || &row.key != key {
182 return Err(Error::CatalogSchema {
183 table: table.clone(),
184 message: "snapshot row key does not match row data".to_owned(),
185 });
186 }
187 if row.epoch > snapshot_epoch {
188 return Err(Error::CatalogSchema {
189 table: table.clone(),
190 message: "row epoch is newer than snapshot epoch".to_owned(),
191 });
192 }
193 for field in &spec.required_fields {
194 if !row.data.contains_key(field) {
195 return Err(Error::CatalogSchema {
196 table: table.clone(),
197 message: format!("missing required catalog field {field}"),
198 });
199 }
200 }
201 Ok(())
202}