use std::collections::BTreeMap;
use crate::{Error, Result, Symbol};
use super::{CatalogSnapshot, CatalogSnapshotRow, CatalogStore, CatalogTableSpec};
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct CatalogReplay {
pub tables: BTreeMap<Symbol, CatalogTableSpec>,
pub events: Vec<CatalogReplayEvent>,
pub epoch: u64,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum CatalogReplayEvent {
PutRow(CatalogSnapshotRow),
DeleteRow {
epoch: u64,
table: Symbol,
key: Symbol,
},
Sequence {
epoch: u64,
name: Symbol,
next: u64,
},
}
impl CatalogReplay {
pub fn from_snapshot(snapshot: &CatalogSnapshot) -> Self {
let row_events = snapshot
.rows
.values()
.flat_map(BTreeMap::values)
.cloned()
.map(CatalogReplayEvent::PutRow);
let sequence_events =
snapshot
.sequences
.iter()
.map(|(name, next)| CatalogReplayEvent::Sequence {
epoch: snapshot.epoch,
name: name.clone(),
next: *next,
});
Self {
tables: snapshot.tables.clone(),
events: row_events.chain(sequence_events).collect(),
epoch: snapshot.epoch,
}
}
}
impl CatalogStore {
pub fn replay(replay: CatalogReplay) -> Result<Self> {
let mut snapshot = CatalogSnapshot {
tables: replay.tables,
rows: BTreeMap::new(),
sequences: BTreeMap::new(),
epoch: replay.epoch,
};
for event in replay.events {
match event {
CatalogReplayEvent::PutRow(row) => {
validate_event_epoch(row.epoch, snapshot.epoch, &row.table)?;
snapshot
.rows
.entry(row.table.clone())
.or_default()
.insert(row.key.clone(), row);
}
CatalogReplayEvent::DeleteRow { epoch, table, key } => {
validate_event_epoch(epoch, snapshot.epoch, &table)?;
if let Some(rows) = snapshot.rows.get_mut(&table) {
rows.remove(&key);
if rows.is_empty() {
snapshot.rows.remove(&table);
}
}
}
CatalogReplayEvent::Sequence { epoch, name, next } => {
validate_event_epoch(epoch, snapshot.epoch, &name)?;
snapshot.sequences.insert(name, next);
}
}
}
Self::from_snapshot(snapshot)
}
}
fn validate_event_epoch(epoch: u64, replay_epoch: u64, table: &Symbol) -> Result<()> {
if epoch <= replay_epoch {
Ok(())
} else {
Err(Error::CatalogSchema {
table: table.clone(),
message: "replay event epoch is newer than replay epoch".to_owned(),
})
}
}