Skip to main content

sim_kernel/catalog/
replay.rs

1use std::collections::BTreeMap;
2
3use crate::{Error, Result, Symbol};
4
5use super::{CatalogSnapshot, CatalogSnapshotRow, CatalogStore, CatalogTableSpec};
6
7/// A replayable log of catalog events plus the table specs and epoch needed to
8/// rebuild a [`CatalogStore`] deterministically.
9#[derive(Clone, Debug, PartialEq, Eq)]
10pub struct CatalogReplay {
11    /// Table specs the replay installs.
12    pub tables: BTreeMap<Symbol, CatalogTableSpec>,
13    /// Ordered events to replay.
14    pub events: Vec<CatalogReplayEvent>,
15    /// Final catalog epoch after replay.
16    pub epoch: u64,
17}
18
19/// A single event within a [`CatalogReplay`].
20#[derive(Clone, Debug, PartialEq, Eq)]
21pub enum CatalogReplayEvent {
22    /// Insert or replace a row.
23    PutRow(CatalogSnapshotRow),
24    /// Delete a row.
25    DeleteRow {
26        /// Epoch of the deletion.
27        epoch: u64,
28        /// Table the row was in.
29        table: Symbol,
30        /// Key of the deleted row.
31        key: Symbol,
32    },
33    /// Set a sequence value.
34    Sequence {
35        /// Epoch of the change.
36        epoch: u64,
37        /// Sequence name.
38        name: Symbol,
39        /// New sequence value.
40        next: u64,
41    },
42}
43
44impl CatalogReplay {
45    /// Builds a replay log from `snapshot`'s rows and sequences.
46    pub fn from_snapshot(snapshot: &CatalogSnapshot) -> Self {
47        let row_events = snapshot
48            .rows
49            .values()
50            .flat_map(BTreeMap::values)
51            .cloned()
52            .map(CatalogReplayEvent::PutRow);
53        let sequence_events =
54            snapshot
55                .sequences
56                .iter()
57                .map(|(name, next)| CatalogReplayEvent::Sequence {
58                    epoch: snapshot.epoch,
59                    name: name.clone(),
60                    next: *next,
61                });
62
63        Self {
64            tables: snapshot.tables.clone(),
65            events: row_events.chain(sequence_events).collect(),
66            epoch: snapshot.epoch,
67        }
68    }
69}
70
71impl CatalogStore {
72    /// Rebuilds a store by replaying `replay`'s events in order, validating each
73    /// event epoch against the replay epoch.
74    pub fn replay(replay: CatalogReplay) -> Result<Self> {
75        let mut snapshot = CatalogSnapshot {
76            tables: replay.tables,
77            rows: BTreeMap::new(),
78            sequences: BTreeMap::new(),
79            epoch: replay.epoch,
80        };
81
82        for event in replay.events {
83            match event {
84                CatalogReplayEvent::PutRow(row) => {
85                    validate_event_epoch(row.epoch, snapshot.epoch, &row.table)?;
86                    snapshot
87                        .rows
88                        .entry(row.table.clone())
89                        .or_default()
90                        .insert(row.key.clone(), row);
91                }
92                CatalogReplayEvent::DeleteRow { epoch, table, key } => {
93                    validate_event_epoch(epoch, snapshot.epoch, &table)?;
94                    if let Some(rows) = snapshot.rows.get_mut(&table) {
95                        rows.remove(&key);
96                        if rows.is_empty() {
97                            snapshot.rows.remove(&table);
98                        }
99                    }
100                }
101                CatalogReplayEvent::Sequence { epoch, name, next } => {
102                    validate_event_epoch(epoch, snapshot.epoch, &name)?;
103                    snapshot.sequences.insert(name, next);
104                }
105            }
106        }
107
108        Self::from_snapshot(snapshot)
109    }
110}
111
112fn validate_event_epoch(epoch: u64, replay_epoch: u64, table: &Symbol) -> Result<()> {
113    if epoch <= replay_epoch {
114        Ok(())
115    } else {
116        Err(Error::CatalogSchema {
117            table: table.clone(),
118            message: "replay event epoch is newer than replay epoch".to_owned(),
119        })
120    }
121}