sim_kernel/catalog/
replay.rs1use std::collections::BTreeMap;
2
3use crate::{Error, Result, Symbol};
4
5use super::{CatalogSnapshot, CatalogSnapshotRow, CatalogStore, CatalogTableSpec};
6
7#[derive(Clone, Debug, PartialEq, Eq)]
10pub struct CatalogReplay {
11 pub tables: BTreeMap<Symbol, CatalogTableSpec>,
13 pub events: Vec<CatalogReplayEvent>,
15 pub epoch: u64,
17}
18
19#[derive(Clone, Debug, PartialEq, Eq)]
21pub enum CatalogReplayEvent {
22 PutRow(CatalogSnapshotRow),
24 DeleteRow {
26 epoch: u64,
28 table: Symbol,
30 key: Symbol,
32 },
33 Sequence {
35 epoch: u64,
37 name: Symbol,
39 next: u64,
41 },
42}
43
44impl CatalogReplay {
45 pub fn from_snapshot(snapshot: &CatalogSnapshot) -> Self {
47 let row_events = snapshot
48 .rows
49 .values()
50 .flat_map(BTreeMap::values)
51 .cloned()
52 .map(CatalogReplayEvent::PutRow);
53 let sequence_events =
54 snapshot
55 .sequences
56 .iter()
57 .map(|(name, next)| CatalogReplayEvent::Sequence {
58 epoch: snapshot.epoch,
59 name: name.clone(),
60 next: *next,
61 });
62
63 Self {
64 tables: snapshot.tables.clone(),
65 events: row_events.chain(sequence_events).collect(),
66 epoch: snapshot.epoch,
67 }
68 }
69}
70
71impl CatalogStore {
72 pub fn replay(replay: CatalogReplay) -> Result<Self> {
75 let mut snapshot = CatalogSnapshot {
76 tables: replay.tables,
77 rows: BTreeMap::new(),
78 sequences: BTreeMap::new(),
79 epoch: replay.epoch,
80 };
81
82 for event in replay.events {
83 match event {
84 CatalogReplayEvent::PutRow(row) => {
85 validate_event_epoch(row.epoch, snapshot.epoch, &row.table)?;
86 snapshot
87 .rows
88 .entry(row.table.clone())
89 .or_default()
90 .insert(row.key.clone(), row);
91 }
92 CatalogReplayEvent::DeleteRow { epoch, table, key } => {
93 validate_event_epoch(epoch, snapshot.epoch, &table)?;
94 if let Some(rows) = snapshot.rows.get_mut(&table) {
95 rows.remove(&key);
96 if rows.is_empty() {
97 snapshot.rows.remove(&table);
98 }
99 }
100 }
101 CatalogReplayEvent::Sequence { epoch, name, next } => {
102 validate_event_epoch(epoch, snapshot.epoch, &name)?;
103 snapshot.sequences.insert(name, next);
104 }
105 }
106 }
107
108 Self::from_snapshot(snapshot)
109 }
110}
111
112fn validate_event_epoch(epoch: u64, replay_epoch: u64, table: &Symbol) -> Result<()> {
113 if epoch <= replay_epoch {
114 Ok(())
115 } else {
116 Err(Error::CatalogSchema {
117 table: table.clone(),
118 message: "replay event epoch is newer than replay epoch".to_owned(),
119 })
120 }
121}