use nodedb_types::{DatabaseId, Lsn};
use crate::control::security::catalog::SystemCatalog;
/// The kind of DDL change carried by a replicated entry that is being applied
/// to a mirror database.
///
/// The type is a plain fieldless tag, so it derives `Copy` and `Hash` in
/// addition to the comparison traits: callers can pass it by value repeatedly
/// and use it as a map/set key without cloning.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum MirrorDdlKind {
    /// A collection was created on the source. `apply_mirror_ddl_entry` records
    /// it with a local name equal to the source collection name.
    CreateCollection,
    /// A collection was altered on the source. `apply_mirror_ddl_entry` handles
    /// it exactly like [`MirrorDdlKind::CreateCollection`].
    AlterCollection,
    /// A collection was dropped on the source. `apply_mirror_ddl_entry` records
    /// it with an empty-string local name.
    DropCollection,
}
/// Applies a single replicated DDL entry to a mirror database through the
/// system catalog.
///
/// The entry is forwarded to `SystemCatalog::apply_ddl_entry_atomic` with a
/// local collection name derived from `kind`:
///
/// - [`MirrorDdlKind::CreateCollection`] / [`MirrorDdlKind::AlterCollection`]:
///   the local name is `source_collection_name` itself.
/// - [`MirrorDdlKind::DropCollection`]: the local name is the empty string.
///
/// # Parameters
/// - `catalog`: catalog the entry is applied to.
/// - `mirror_db_id`: mirror database the entry belongs to.
/// - `entry_lsn`: LSN of the entry, passed through unchanged.
/// - `entry_apply_ms`: apply timestamp, passed through unchanged
///   (presumably milliseconds, judging by the name — confirm with the caller).
/// - `source_collection_name`: collection name as known on the source.
/// - `kind`: which DDL change the entry represents.
///
/// # Returns
/// Whatever `apply_ddl_entry_atomic` returns. The tests in this module expect
/// `true` for a newly applied entry and `false` when the same LSN is replayed.
///
/// # Errors
/// Any error from the catalog call is returned unchanged.
pub fn apply_mirror_ddl_entry(
    catalog: &SystemCatalog,
    mirror_db_id: DatabaseId,
    entry_lsn: Lsn,
    entry_apply_ms: u64,
    source_collection_name: &str,
    kind: MirrorDdlKind,
) -> crate::Result<bool> {
    // Only the local name differs between DDL kinds, so pick it first and
    // issue a single catalog call afterwards.
    let local_name = match kind {
        MirrorDdlKind::CreateCollection | MirrorDdlKind::AlterCollection => {
            source_collection_name
        }
        // NOTE(review): the empty string appears to act as the "dropped"
        // marker for the catalog — confirm against `apply_ddl_entry_atomic`.
        MirrorDdlKind::DropCollection => "",
    };

    catalog.apply_ddl_entry_atomic(
        mirror_db_id,
        entry_lsn,
        entry_apply_ms,
        source_collection_name,
        local_name,
    )
}
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use nodedb_types::{DatabaseId, Lsn};
    use tempfile::TempDir;

    use super::*;
    use crate::control::security::catalog::SystemCatalog;

    /// Opens a catalog stored as `system.redb` inside the given temp directory.
    ///
    /// The caller keeps ownership of the `TempDir`, so the directory lives for
    /// as long as the test needs the catalog.
    fn open_tmp_catalog(tmp: &TempDir) -> SystemCatalog {
        let db_file: PathBuf = tmp.path().join("system.redb");
        SystemCatalog::open(&db_file).expect("open catalog")
    }

    /// Applies one mirror DDL entry and unwraps the result, panicking if the
    /// catalog call fails.
    fn apply_ok(
        catalog: &SystemCatalog,
        db_id: DatabaseId,
        lsn: Lsn,
        apply_ms: u64,
        collection: &str,
        kind: MirrorDdlKind,
    ) -> bool {
        apply_mirror_ddl_entry(catalog, db_id, lsn, apply_ms, collection, kind).unwrap()
    }

    /// A create entry is reported as applied, updates the lag record, and
    /// stores an identity source-to-local name mapping.
    #[test]
    fn mirror_ddl_create_applies_and_updates_lag() {
        let scratch = TempDir::new().unwrap();
        let catalog = open_tmp_catalog(&scratch);
        let mirror = DatabaseId::new(1024);

        let applied = apply_ok(
            &catalog,
            mirror,
            Lsn::new(5),
            1000,
            "orders",
            MirrorDdlKind::CreateCollection,
        );
        assert!(applied, "first apply must return true");

        let lag = catalog.get_mirror_lag(mirror).unwrap().unwrap();
        assert_eq!(lag.last_applied_lsn, Lsn::new(5));
        assert_eq!(lag.last_apply_ms, 1000);

        let local_name = catalog
            .get_mirror_collection_mapping(mirror, "orders")
            .unwrap();
        assert_eq!(local_name, Some("orders".to_string()));
    }

    /// Replaying an entry with the same LSN is reported as not applied and
    /// leaves the recorded LSN unchanged.
    ///
    /// NOTE(review): both applies go through the same catalog handle; the
    /// catalog is not reopened, so the "restart" is simulated only by the replay.
    #[test]
    fn mirror_ddl_is_idempotent_across_simulated_restart() {
        let scratch = TempDir::new().unwrap();
        let catalog = open_tmp_catalog(&scratch);
        let mirror = DatabaseId::new(1025);
        let replayed_lsn = Lsn::new(10);

        let first = apply_ok(
            &catalog,
            mirror,
            replayed_lsn,
            500,
            "events",
            MirrorDdlKind::CreateCollection,
        );
        assert!(first);

        let second = apply_ok(
            &catalog,
            mirror,
            replayed_lsn,
            500,
            "events",
            MirrorDdlKind::CreateCollection,
        );
        assert!(!second, "idempotent replay must return false");

        let lag = catalog.get_mirror_lag(mirror).unwrap().unwrap();
        assert_eq!(lag.last_applied_lsn, replayed_lsn);
    }

    /// A drop entry applied after a create moves the recorded LSN forward to
    /// the drop's LSN.
    #[test]
    fn mirror_ddl_drop_advances_lsn() {
        let scratch = TempDir::new().unwrap();
        let catalog = open_tmp_catalog(&scratch);
        let mirror = DatabaseId::new(1026);

        // Only the final lag record is asserted; the per-call outcome is ignored.
        apply_ok(
            &catalog,
            mirror,
            Lsn::new(3),
            300,
            "tmp_table",
            MirrorDdlKind::CreateCollection,
        );
        apply_ok(
            &catalog,
            mirror,
            Lsn::new(7),
            700,
            "tmp_table",
            MirrorDdlKind::DropCollection,
        );

        let lag = catalog.get_mirror_lag(mirror).unwrap().unwrap();
        assert_eq!(lag.last_applied_lsn, Lsn::new(7));
    }
}