Skip to main content

sim_kernel/catalog/
snapshot.rs

1use std::collections::BTreeMap;
2
3use crate::{Error, Expr, Result, Symbol};
4
5use super::{
6    CatalogRow, CatalogStore, CatalogTableSpec,
7    snapshot_expr::{snapshot_from_expr, snapshot_to_expr, unresolved_live_expr},
8};
9
10/// A deterministic, point-in-time copy of a [`CatalogStore`]'s data.
11///
12/// Snapshots carry table specs, row data, sequences, and the catalog epoch. Live
13/// host payloads (runtime values, tests) are not serialized; their fields become
14/// `catalog/unresolved-live` markers. See the README section "Snapshots and
15/// deltas".
16///
17/// # Examples
18///
19/// ```
20/// # use sim_kernel::catalog::{CatalogSnapshot, CatalogStore};
21/// let store = CatalogStore::new();
22/// let snapshot = CatalogSnapshot::from_store(&store);
23/// // Snapshots round-trip through a deterministic Expr form.
24/// let restored = CatalogSnapshot::from_expr(snapshot.to_expr()).unwrap();
25/// assert_eq!(restored, snapshot);
26/// ```
27#[derive(Clone, Debug, PartialEq, Eq)]
28pub struct CatalogSnapshot {
29    /// Installed table specs by name.
30    pub tables: BTreeMap<Symbol, CatalogTableSpec>,
31    /// Snapshot rows by table and key.
32    pub rows: BTreeMap<Symbol, BTreeMap<Symbol, CatalogSnapshotRow>>,
33    /// Sequence values by name.
34    pub sequences: BTreeMap<Symbol, u64>,
35    /// Catalog epoch the snapshot was taken at.
36    pub epoch: u64,
37}
38
39impl CatalogSnapshot {
40    /// Captures the visible data of `store`, replacing live payloads with
41    /// unresolved-live markers.
42    pub fn from_store(store: &CatalogStore) -> Self {
43        let rows = visible_rows(store)
44            .iter()
45            .map(|(table, rows)| {
46                let rows = rows
47                    .iter()
48                    .map(|(key, row)| {
49                        (
50                            key.clone(),
51                            CatalogSnapshotRow {
52                                table: row.table.clone(),
53                                key: row.key.clone(),
54                                epoch: row.epoch,
55                                data: snapshot_row_data(row),
56                            },
57                        )
58                    })
59                    .collect();
60                (table.clone(), rows)
61            })
62            .collect();
63
64        Self {
65            tables: store.tables.clone(),
66            rows,
67            sequences: visible_sequences(store).clone(),
68            epoch: store.epoch(),
69        }
70    }
71
72    /// Emits the deterministic [`Expr`] encoding of the snapshot.
73    pub fn to_expr(&self) -> Expr {
74        snapshot_to_expr(self)
75    }
76
77    /// Parses a snapshot from the [`Expr`] shape produced by [`Self::to_expr`].
78    pub fn from_expr(expr: Expr) -> Result<Self> {
79        snapshot_from_expr(expr)
80    }
81
82    /// Returns the snapshot rows of `table` by key, if any.
83    pub fn rows(&self, table: &Symbol) -> Option<&BTreeMap<Symbol, CatalogSnapshotRow>> {
84        self.rows.get(table)
85    }
86}
87
88/// One row within a [`CatalogSnapshot`]: data-only, with live payloads already
89/// projected to markers.
90#[derive(Clone, Debug, PartialEq, Eq)]
91pub struct CatalogSnapshotRow {
92    /// Table the row belongs to.
93    pub table: Symbol,
94    /// Key identifying the row.
95    pub key: Symbol,
96    /// Epoch at which the row was last written.
97    pub epoch: u64,
98    /// Serializable field data.
99    pub data: BTreeMap<Symbol, Expr>,
100}
101
102impl CatalogStore {
103    /// Restores a data-only store from `snapshot`, validating table keys, row
104    /// keys, required fields, and row epochs.
105    pub fn from_snapshot(snapshot: CatalogSnapshot) -> Result<Self> {
106        validate_table_keys(&snapshot.tables)?;
107        let mut store = CatalogStore {
108            tables: snapshot.tables,
109            sequences: snapshot.sequences,
110            epoch: snapshot.epoch,
111            ..Self::default()
112        };
113
114        for (table, rows) in snapshot.rows {
115            let spec = store
116                .table(&table)
117                .cloned()
118                .ok_or_else(|| Error::CatalogSchema {
119                    table: table.clone(),
120                    message: "unknown catalog table".to_owned(),
121                })?;
122            for (key, snapshot_row) in rows {
123                validate_snapshot_row(&table, &key, &snapshot_row, &spec, store.epoch)?;
124                let mut row = CatalogRow::new(snapshot_row.table, snapshot_row.key);
125                row.data = snapshot_row.data;
126                row.set_epoch(snapshot_row.epoch);
127                store
128                    .rows
129                    .entry(table.clone())
130                    .or_default()
131                    .insert(key, row);
132            }
133        }
134
135        Ok(store)
136    }
137}
138
139fn visible_rows(store: &CatalogStore) -> &BTreeMap<Symbol, BTreeMap<Symbol, CatalogRow>> {
140    store
141        .overlay
142        .as_ref()
143        .map_or(&store.rows, |overlay| overlay.all_rows())
144}
145
146fn visible_sequences(store: &CatalogStore) -> &BTreeMap<Symbol, u64> {
147    store
148        .overlay
149        .as_ref()
150        .map_or(&store.sequences, |overlay| overlay.all_sequences())
151}
152
153fn snapshot_row_data(row: &CatalogRow) -> BTreeMap<Symbol, Expr> {
154    let mut data = row.data.clone();
155    for field in row.live.keys() {
156        data.entry(field.clone())
157            .or_insert_with(|| unresolved_live_expr(row, field));
158    }
159    data
160}
161
162fn validate_table_keys(tables: &BTreeMap<Symbol, CatalogTableSpec>) -> Result<()> {
163    for (name, spec) in tables {
164        if name != &spec.name {
165            return Err(Error::CatalogSchema {
166                table: name.clone(),
167                message: "table spec key does not match table name".to_owned(),
168            });
169        }
170    }
171    Ok(())
172}
173
174fn validate_snapshot_row(
175    table: &Symbol,
176    key: &Symbol,
177    row: &CatalogSnapshotRow,
178    spec: &CatalogTableSpec,
179    snapshot_epoch: u64,
180) -> Result<()> {
181    if &row.table != table || &row.key != key {
182        return Err(Error::CatalogSchema {
183            table: table.clone(),
184            message: "snapshot row key does not match row data".to_owned(),
185        });
186    }
187    if row.epoch > snapshot_epoch {
188        return Err(Error::CatalogSchema {
189            table: table.clone(),
190            message: "row epoch is newer than snapshot epoch".to_owned(),
191        });
192    }
193    for field in &spec.required_fields {
194        if !row.data.contains_key(field) {
195            return Err(Error::CatalogSchema {
196                table: table.clone(),
197                message: format!("missing required catalog field {field}"),
198            });
199        }
200    }
201    Ok(())
202}