Skip to main content

sim_kernel/catalog/
delta.rs

1use std::collections::BTreeMap;
2
3use crate::{Error, Result, Symbol};
4
5use super::{
6    CatalogEvent, CatalogEventOp, CatalogRow, CatalogSnapshot, CatalogSnapshotRow, CatalogStore,
7    CatalogTableSpec, CatalogWritePolicy,
8};
9
10/// The change set between two catalog epochs: table specs, changed and deleted
11/// rows, and sequence changes.
12///
13/// Produced by [`CatalogStore::delta_since`] and applied by
14/// [`CatalogStore::apply_delta`]. See the README section "Snapshots and deltas".
15#[derive(Clone, Debug, PartialEq, Eq)]
16pub struct CatalogDelta {
17    /// Epoch the delta starts from (exclusive, or 0 for a full delta).
18    pub from_epoch: u64,
19    /// Epoch the delta brings the store to.
20    pub to_epoch: u64,
21    /// Table specs that must exist or match in the target store.
22    pub table_specs: BTreeMap<Symbol, CatalogTableSpec>,
23    /// Rows inserted or replaced, by table and key.
24    pub rows_changed: BTreeMap<Symbol, BTreeMap<Symbol, CatalogSnapshotRow>>,
25    /// Rows deleted, by table and key.
26    pub rows_deleted: BTreeMap<Symbol, BTreeMap<Symbol, CatalogDeletedRow>>,
27    /// Sequence advances, by sequence name.
28    pub sequence_changes: BTreeMap<Symbol, CatalogSequenceChange>,
29}
30
31/// A row removed by a [`CatalogDelta`].
32#[derive(Clone, Debug, PartialEq, Eq)]
33pub struct CatalogDeletedRow {
34    /// Epoch at which the deletion occurred.
35    pub epoch: u64,
36    /// Table the row was deleted from.
37    pub table: Symbol,
38    /// Key of the deleted row.
39    pub key: Symbol,
40}
41
42/// A sequence advance carried by a [`CatalogDelta`].
43#[derive(Clone, Debug, PartialEq, Eq)]
44pub struct CatalogSequenceChange {
45    /// Epoch at which the sequence advanced.
46    pub epoch: u64,
47    /// Sequence name.
48    pub name: Symbol,
49    /// New sequence value.
50    pub next: u64,
51}
52
53impl CatalogStore {
54    /// Computes the delta carrying this store from `from_epoch` to its current
55    /// epoch; `from_epoch == 0` yields a full delta.
56    pub fn delta_since(&self, from_epoch: u64) -> Result<CatalogDelta> {
57        let to_epoch = self.epoch();
58        if from_epoch > to_epoch {
59            return Err(delta_error(
60                "delta source epoch is newer than catalog epoch",
61            ));
62        }
63
64        let snapshot = CatalogSnapshot::from_store(self);
65        let mut rows_changed = BTreeMap::new();
66        let mut rows_deleted = BTreeMap::new();
67        let mut sequence_changes = BTreeMap::new();
68
69        if from_epoch == 0 {
70            rows_changed = snapshot.rows.clone();
71            for (name, next) in &snapshot.sequences {
72                sequence_changes.insert(
73                    name.clone(),
74                    CatalogSequenceChange {
75                        epoch: to_epoch,
76                        name: name.clone(),
77                        next: *next,
78                    },
79                );
80            }
81        }
82
83        for event in self
84            .journal()
85            .iter()
86            .filter(|event| event.epoch > from_epoch)
87        {
88            match &event.op {
89                CatalogEventOp::PutRow { table, key } => {
90                    remove_deleted_row(&mut rows_deleted, table, key);
91                    if let Some(row) = snapshot.rows(table).and_then(|rows| rows.get(key)) {
92                        rows_changed
93                            .entry(table.clone())
94                            .or_default()
95                            .insert(key.clone(), row.clone());
96                    }
97                }
98                CatalogEventOp::DeleteRow { table, key } => {
99                    remove_changed_row(&mut rows_changed, table, key);
100                    rows_deleted.entry(table.clone()).or_default().insert(
101                        key.clone(),
102                        CatalogDeletedRow {
103                            epoch: event.epoch,
104                            table: table.clone(),
105                            key: key.clone(),
106                        },
107                    );
108                }
109                CatalogEventOp::Sequence { name, next } => {
110                    sequence_changes.insert(
111                        name.clone(),
112                        CatalogSequenceChange {
113                            epoch: event.epoch,
114                            name: name.clone(),
115                            next: *next,
116                        },
117                    );
118                }
119            }
120        }
121
122        Ok(CatalogDelta {
123            from_epoch,
124            to_epoch,
125            table_specs: snapshot.tables,
126            rows_changed,
127            rows_deleted,
128            sequence_changes,
129        })
130    }
131
132    /// Applies `delta` atomically after validating source epoch, table
133    /// compatibility, sealed-row conflicts, change epochs, and target epoch;
134    /// the store is left unchanged on error.
135    pub fn apply_delta(&mut self, delta: CatalogDelta) -> Result<()> {
136        let mut next = self.clone();
137        next.apply_delta_in_place(delta)?;
138        *self = next;
139        Ok(())
140    }
141
142    fn apply_delta_in_place(&mut self, delta: CatalogDelta) -> Result<()> {
143        if self.overlay.is_some() {
144            return Err(delta_error(
145                "cannot apply catalog delta while overlay is active",
146            ));
147        }
148        if self.epoch != delta.from_epoch {
149            return Err(delta_error("catalog delta source epoch mismatch"));
150        }
151        if delta.to_epoch < delta.from_epoch {
152            return Err(delta_error(
153                "catalog delta target epoch precedes source epoch",
154            ));
155        }
156
157        for spec in delta.table_specs.values() {
158            self.install_or_validate_delta_table(spec)?;
159        }
160        self.validate_delta_rows(&delta)?;
161        self.validate_delta_deletes(&delta)?;
162        validate_delta_sequences(&delta)?;
163
164        for rows in delta.rows_changed.values() {
165            for row in rows.values() {
166                self.apply_delta_row(row.clone());
167            }
168        }
169        for rows in delta.rows_deleted.values() {
170            for deleted in rows.values() {
171                self.apply_delta_delete(deleted);
172            }
173        }
174        for change in delta.sequence_changes.values() {
175            self.sequences.insert(change.name.clone(), change.next);
176            self.journal.push(CatalogEvent {
177                epoch: change.epoch,
178                op: CatalogEventOp::Sequence {
179                    name: change.name.clone(),
180                    next: change.next,
181                },
182            });
183        }
184
185        self.epoch = delta.to_epoch;
186        if self.epoch == delta.to_epoch {
187            Ok(())
188        } else {
189            Err(delta_error("catalog delta target epoch was not reached"))
190        }
191    }
192
193    fn install_or_validate_delta_table(&mut self, spec: &CatalogTableSpec) -> Result<()> {
194        match self.tables.get(&spec.name) {
195            Some(existing) if existing == spec => Ok(()),
196            Some(_) => Err(Error::CatalogSchema {
197                table: spec.name.clone(),
198                message: "incompatible catalog table spec".to_owned(),
199            }),
200            None => {
201                self.tables.insert(spec.name.clone(), spec.clone());
202                Ok(())
203            }
204        }
205    }
206
207    fn validate_delta_rows(&self, delta: &CatalogDelta) -> Result<()> {
208        for (table, rows) in &delta.rows_changed {
209            let spec = self.table(table).ok_or_else(|| Error::CatalogSchema {
210                table: table.clone(),
211                message: "unknown catalog table".to_owned(),
212            })?;
213            for (key, row) in rows {
214                validate_row_key(table, key, row)?;
215                validate_change_epoch(row.epoch, delta, table)?;
216                validate_required_fields(spec, row)?;
217                if spec.policy == CatalogWritePolicy::Sealed && self.row(table, key).is_some() {
218                    return Err(Error::CatalogConflict {
219                        table: table.clone(),
220                        key: key.clone(),
221                    });
222                }
223            }
224        }
225        Ok(())
226    }
227
228    fn validate_delta_deletes(&self, delta: &CatalogDelta) -> Result<()> {
229        for (table, rows) in &delta.rows_deleted {
230            if self.table(table).is_none() {
231                return Err(Error::CatalogSchema {
232                    table: table.clone(),
233                    message: "unknown catalog table".to_owned(),
234                });
235            }
236            for (key, deleted) in rows {
237                if &deleted.table != table || &deleted.key != key {
238                    return Err(Error::CatalogSchema {
239                        table: table.clone(),
240                        message: "deleted row key does not match row data".to_owned(),
241                    });
242                }
243                validate_change_epoch(deleted.epoch, delta, table)?;
244            }
245        }
246        Ok(())
247    }
248
249    fn apply_delta_row(&mut self, snapshot_row: CatalogSnapshotRow) {
250        let mut row = CatalogRow::new(snapshot_row.table.clone(), snapshot_row.key.clone());
251        row.data = snapshot_row.data;
252        row.set_epoch(snapshot_row.epoch);
253        self.rows
254            .entry(row.table.clone())
255            .or_default()
256            .insert(row.key.clone(), row);
257        self.journal.push(CatalogEvent {
258            epoch: snapshot_row.epoch,
259            op: CatalogEventOp::PutRow {
260                table: snapshot_row.table,
261                key: snapshot_row.key,
262            },
263        });
264    }
265
266    fn apply_delta_delete(&mut self, deleted: &CatalogDeletedRow) {
267        if let Some(rows) = self.rows.get_mut(&deleted.table) {
268            rows.remove(&deleted.key);
269            if rows.is_empty() {
270                self.rows.remove(&deleted.table);
271            }
272        }
273        self.journal.push(CatalogEvent {
274            epoch: deleted.epoch,
275            op: CatalogEventOp::DeleteRow {
276                table: deleted.table.clone(),
277                key: deleted.key.clone(),
278            },
279        });
280    }
281}
282
283fn validate_row_key(table: &Symbol, key: &Symbol, row: &CatalogSnapshotRow) -> Result<()> {
284    if &row.table == table && &row.key == key {
285        Ok(())
286    } else {
287        Err(Error::CatalogSchema {
288            table: table.clone(),
289            message: "changed row key does not match row data".to_owned(),
290        })
291    }
292}
293
294fn validate_required_fields(spec: &CatalogTableSpec, row: &CatalogSnapshotRow) -> Result<()> {
295    for field in &spec.required_fields {
296        if !row.data.contains_key(field) {
297            return Err(Error::CatalogSchema {
298                table: row.table.clone(),
299                message: format!("missing required catalog field {field}"),
300            });
301        }
302    }
303    Ok(())
304}
305
306fn validate_delta_sequences(delta: &CatalogDelta) -> Result<()> {
307    for change in delta.sequence_changes.values() {
308        validate_change_epoch(change.epoch, delta, &change.name)?;
309    }
310    Ok(())
311}
312
313fn validate_change_epoch(epoch: u64, delta: &CatalogDelta, table: &Symbol) -> Result<()> {
314    if epoch <= delta.to_epoch && (epoch > delta.from_epoch || delta.from_epoch == 0) {
315        Ok(())
316    } else {
317        Err(Error::CatalogSchema {
318            table: table.clone(),
319            message: "catalog delta change epoch is outside delta bounds".to_owned(),
320        })
321    }
322}
323
324fn remove_changed_row(
325    rows_changed: &mut BTreeMap<Symbol, BTreeMap<Symbol, CatalogSnapshotRow>>,
326    table: &Symbol,
327    key: &Symbol,
328) {
329    if let Some(rows) = rows_changed.get_mut(table) {
330        rows.remove(key);
331        if rows.is_empty() {
332            rows_changed.remove(table);
333        }
334    }
335}
336
337fn remove_deleted_row(
338    rows_deleted: &mut BTreeMap<Symbol, BTreeMap<Symbol, CatalogDeletedRow>>,
339    table: &Symbol,
340    key: &Symbol,
341) {
342    if let Some(rows) = rows_deleted.get_mut(table) {
343        rows.remove(key);
344        if rows.is_empty() {
345            rows_deleted.remove(table);
346        }
347    }
348}
349
350fn delta_error(message: &'static str) -> Error {
351    Error::CatalogSchema {
352        table: Symbol::qualified("catalog", "delta"),
353        message: message.to_owned(),
354    }
355}