use std::collections::BTreeMap;
use crate::{Error, Expr, Result, Symbol};
use super::{
CatalogRow, CatalogStore, CatalogTableSpec,
snapshot_expr::{snapshot_from_expr, snapshot_to_expr, unresolved_live_expr},
};
/// A detached, owned copy of a catalog's contents.
///
/// Built from a live store with [`CatalogSnapshot::from_store`] and turned
/// back into a store with `CatalogStore::from_snapshot`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CatalogSnapshot {
    /// Table specs by map key. On restore each key must equal the `name`
    /// of the spec stored under it.
    pub tables: BTreeMap<Symbol, CatalogTableSpec>,
    /// Captured rows, keyed first by table and then by row key. On restore
    /// the outer and inner keys must agree with each row's own `table` and
    /// `key` fields.
    pub rows: BTreeMap<Symbol, BTreeMap<Symbol, CatalogSnapshotRow>>,
    /// Sequence values by symbol, copied verbatim from the store's visible
    /// sequences.
    pub sequences: BTreeMap<Symbol, u64>,
    /// The store's epoch at capture time. On restore no row may carry an
    /// epoch greater than this value.
    pub epoch: u64,
}
impl CatalogSnapshot {
    /// Captures the currently visible contents of `store`.
    ///
    /// Rows and sequences are read through the store's overlay when one is
    /// present, otherwise from the store's own maps. Table specs are always
    /// copied from `store.tables`. Each row's data is produced by
    /// `snapshot_row_data`, so live fields without a stored value are
    /// represented by an unresolved-live expression.
    pub fn from_store(store: &CatalogStore) -> Self {
        let mut rows: BTreeMap<Symbol, BTreeMap<Symbol, CatalogSnapshotRow>> = BTreeMap::new();
        for (table_name, live_rows) in visible_rows(store) {
            let mut captured = BTreeMap::new();
            for (row_key, live_row) in live_rows {
                let frozen = CatalogSnapshotRow {
                    table: live_row.table.clone(),
                    key: live_row.key.clone(),
                    epoch: live_row.epoch,
                    data: snapshot_row_data(live_row),
                };
                captured.insert(row_key.clone(), frozen);
            }
            // Tables with no rows are still recorded, with an empty map.
            rows.insert(table_name.clone(), captured);
        }

        let tables = store.tables.clone();
        let sequences = visible_sequences(store).clone();
        let epoch = store.epoch();
        Self {
            tables,
            rows,
            sequences,
            epoch,
        }
    }

    /// Serializes this snapshot into an [`Expr`] via `snapshot_to_expr`.
    pub fn to_expr(&self) -> Expr {
        snapshot_to_expr(self)
    }

    /// Rebuilds a snapshot from an [`Expr`] via `snapshot_from_expr`.
    ///
    /// # Errors
    ///
    /// Returns whatever error `snapshot_from_expr` reports for `expr`.
    pub fn from_expr(expr: Expr) -> Result<Self> {
        snapshot_from_expr(expr)
    }

    /// Returns the captured rows of `table`, or `None` when the snapshot has
    /// no entry for that table.
    pub fn rows(&self, table: &Symbol) -> Option<&BTreeMap<Symbol, CatalogSnapshotRow>> {
        self.rows.get(table)
    }
}
/// One catalog row as stored inside a [`CatalogSnapshot`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CatalogSnapshotRow {
    /// Table the row belongs to, copied from the source row.
    pub table: Symbol,
    /// Key of the row, copied from the source row.
    pub key: Symbol,
    /// Epoch recorded on the source row.
    pub epoch: u64,
    /// Field values by field symbol. When captured from a store, this also
    /// holds a placeholder expression for every live field that had no
    /// stored value.
    pub data: BTreeMap<Symbol, Expr>,
}
impl CatalogStore {
pub fn from_snapshot(snapshot: CatalogSnapshot) -> Result<Self> {
validate_table_keys(&snapshot.tables)?;
let mut store = CatalogStore {
tables: snapshot.tables,
sequences: snapshot.sequences,
epoch: snapshot.epoch,
..Self::default()
};
for (table, rows) in snapshot.rows {
let spec = store
.table(&table)
.cloned()
.ok_or_else(|| Error::CatalogSchema {
table: table.clone(),
message: "unknown catalog table".to_owned(),
})?;
for (key, snapshot_row) in rows {
validate_snapshot_row(&table, &key, &snapshot_row, &spec, store.epoch)?;
let mut row = CatalogRow::new(snapshot_row.table, snapshot_row.key);
row.data = snapshot_row.data;
row.set_epoch(snapshot_row.epoch);
store
.rows
.entry(table.clone())
.or_default()
.insert(key, row);
}
}
Ok(store)
}
}
/// Returns the row map a reader of `store` should see: the overlay's
/// `all_rows()` when an overlay is present, otherwise the store's own rows.
fn visible_rows(store: &CatalogStore) -> &BTreeMap<Symbol, BTreeMap<Symbol, CatalogRow>> {
    match store.overlay.as_ref() {
        Some(overlay) => overlay.all_rows(),
        None => &store.rows,
    }
}
/// Returns the sequence map a reader of `store` should see: the overlay's
/// `all_sequences()` when an overlay is present, otherwise the store's own
/// sequences.
fn visible_sequences(store: &CatalogStore) -> &BTreeMap<Symbol, u64> {
    if let Some(overlay) = store.overlay.as_ref() {
        overlay.all_sequences()
    } else {
        &store.sequences
    }
}
fn snapshot_row_data(row: &CatalogRow) -> BTreeMap<Symbol, Expr> {
let mut data = row.data.clone();
for field in row.live.keys() {
data.entry(field.clone())
.or_insert_with(|| unresolved_live_expr(row, field));
}
data
}
fn validate_table_keys(tables: &BTreeMap<Symbol, CatalogTableSpec>) -> Result<()> {
for (name, spec) in tables {
if name != &spec.name {
return Err(Error::CatalogSchema {
table: name.clone(),
message: "table spec key does not match table name".to_owned(),
});
}
}
Ok(())
}
fn validate_snapshot_row(
table: &Symbol,
key: &Symbol,
row: &CatalogSnapshotRow,
spec: &CatalogTableSpec,
snapshot_epoch: u64,
) -> Result<()> {
if &row.table != table || &row.key != key {
return Err(Error::CatalogSchema {
table: table.clone(),
message: "snapshot row key does not match row data".to_owned(),
});
}
if row.epoch > snapshot_epoch {
return Err(Error::CatalogSchema {
table: table.clone(),
message: "row epoch is newer than snapshot epoch".to_owned(),
});
}
for field in &spec.required_fields {
if !row.data.contains_key(field) {
return Err(Error::CatalogSchema {
table: table.clone(),
message: format!("missing required catalog field {field}"),
});
}
}
Ok(())
}