use nodedb_types::HlcClock;
use crate::control::catalog_entry::CatalogEntry;
use crate::control::security::catalog::SystemCatalog;
pub fn stamp(entry: CatalogEntry, clock: &HlcClock, catalog: &SystemCatalog) -> CatalogEntry {
let hlc = clock.now();
match entry {
CatalogEntry::PutCollection(mut stored) => {
let prior = catalog
.get_collection(stored.database_id, stored.tenant_id, &stored.name)
.ok()
.flatten()
.map(|c| c.descriptor_version)
.unwrap_or(0);
stored.descriptor_version = prior.saturating_add(1);
stored.modification_hlc = hlc;
CatalogEntry::PutCollection(stored)
}
CatalogEntry::PutMaterializedView(mut stored) => {
let prior = catalog
.get_materialized_view(stored.tenant_id, &stored.name)
.ok()
.flatten()
.map(|v| v.descriptor_version)
.unwrap_or(0);
stored.descriptor_version = prior.saturating_add(1);
stored.modification_hlc = hlc;
CatalogEntry::PutMaterializedView(stored)
}
CatalogEntry::PutFunction(mut stored) => {
let prior = catalog
.get_function(stored.tenant_id, &stored.name)
.ok()
.flatten()
.map(|f| f.descriptor_version)
.unwrap_or(0);
stored.descriptor_version = prior.saturating_add(1);
stored.modification_hlc = hlc;
CatalogEntry::PutFunction(stored)
}
CatalogEntry::PutProcedure(mut stored) => {
let prior = catalog
.get_procedure(stored.tenant_id, &stored.name)
.ok()
.flatten()
.map(|p| p.descriptor_version)
.unwrap_or(0);
stored.descriptor_version = prior.saturating_add(1);
stored.modification_hlc = hlc;
CatalogEntry::PutProcedure(stored)
}
CatalogEntry::PutTrigger(mut stored) => {
let prior = catalog
.get_trigger(stored.tenant_id, &stored.name)
.ok()
.flatten()
.map(|t| t.descriptor_version)
.unwrap_or(0);
stored.descriptor_version = prior.saturating_add(1);
stored.modification_hlc = hlc;
CatalogEntry::PutTrigger(stored)
}
CatalogEntry::PutSequence(mut stored) => {
let prior = catalog
.get_sequence(stored.tenant_id, &stored.name)
.ok()
.flatten()
.map(|s| s.descriptor_version)
.unwrap_or(0);
stored.descriptor_version = prior.saturating_add(1);
stored.modification_hlc = hlc;
CatalogEntry::PutSequence(stored)
}
CatalogEntry::PutContinuousAggregate(mut stored) => {
let prior = catalog
.get_continuous_aggregate(stored.tenant_id, &stored.name)
.ok()
.flatten()
.map(|c| c.descriptor_version)
.unwrap_or(0);
stored.descriptor_version = prior.saturating_add(1);
stored.modification_hlc = hlc;
CatalogEntry::PutContinuousAggregate(stored)
}
entry @ (CatalogEntry::DeactivateCollection { .. }
| CatalogEntry::PurgeCollection { .. }
| CatalogEntry::DeleteFunction { .. }
| CatalogEntry::DeleteProcedure { .. }
| CatalogEntry::DeleteTrigger { .. }
| CatalogEntry::DeleteMaterializedView { .. }
| CatalogEntry::DeleteContinuousAggregate { .. }
| CatalogEntry::DeleteSequence { .. }
| CatalogEntry::PutSequenceState(_)
| CatalogEntry::PutSchedule(_)
| CatalogEntry::DeleteSchedule { .. }
| CatalogEntry::PutChangeStream(_)
| CatalogEntry::DeleteChangeStream { .. }
| CatalogEntry::PutUser(_)
| CatalogEntry::DropUser { .. }
| CatalogEntry::PutRole(_)
| CatalogEntry::DeleteRole { .. }
| CatalogEntry::PutApiKey(_)
| CatalogEntry::RevokeApiKey { .. }
| CatalogEntry::PutTenant(_)
| CatalogEntry::DeleteTenant { .. }
| CatalogEntry::PutRlsPolicy(_)
| CatalogEntry::DeleteRlsPolicy { .. }
| CatalogEntry::PutPermission(_)
| CatalogEntry::DeletePermission { .. }
| CatalogEntry::PutOwner(_)
| CatalogEntry::DeleteOwner { .. }
| CatalogEntry::PutSynonymGroup(_)
| CatalogEntry::DeleteSynonymGroup { .. }
| CatalogEntry::PutCustomType(_)
| CatalogEntry::DeleteCustomType { .. }
| CatalogEntry::PutDatabase(_)
| CatalogEntry::DeleteDatabase { .. }
| CatalogEntry::PutDatabaseGrant { .. }
| CatalogEntry::DeleteDatabaseGrant { .. }
| CatalogEntry::PutOidcProvider(_)
| CatalogEntry::DeleteOidcProvider { .. }
| CatalogEntry::CloneDatabase { .. }
| CatalogEntry::MoveTenantCutover { .. }) => entry,
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::control::security::catalog::StoredCollection;
    use crate::control::security::credential::CredentialStore;
    use nodedb_types::DatabaseId;
    use std::sync::Arc;

    /// Opens a `CredentialStore` on `system.redb` inside a fresh temp dir.
    ///
    /// The `TempDir` handle is returned alongside the store so each test can
    /// hold on to it for as long as it uses the store.
    fn make_catalog() -> (Arc<CredentialStore>, tempfile::TempDir) {
        let dir = tempfile::tempdir().expect("tmpdir");
        let db_path = dir.path().join("system.redb");
        let store = CredentialStore::open(&db_path).expect("open");
        (Arc::new(store), dir)
    }

    /// Builds the `PutCollection` entry shared by the stamping tests.
    fn orders_entry() -> CatalogEntry {
        CatalogEntry::PutCollection(Box::new(StoredCollection::new(1, "orders", "tester")))
    }

    /// With nothing stored yet, the first stamp yields version 1 and a
    /// non-zero modification HLC.
    #[test]
    fn stamp_on_create_assigns_version_one() {
        let (store, _tmp) = make_catalog();
        let clock = HlcClock::new();
        let catalog = store.catalog().as_ref().expect("catalog");

        let collection = match stamp(orders_entry(), &clock, catalog) {
            CatalogEntry::PutCollection(collection) => collection,
            _ => panic!("expected PutCollection"),
        };

        assert_eq!(collection.descriptor_version, 1);
        assert!(collection.modification_hlc > nodedb_types::Hlc::ZERO);
    }

    /// Stamping, persisting, and stamping again produces versions 1..=5 with
    /// a strictly increasing modification HLC on every round.
    #[test]
    fn stamp_monotonic_across_updates() {
        let (store, _tmp) = make_catalog();
        let clock = HlcClock::new();
        let catalog = store.catalog().as_ref().expect("catalog");

        let mut last_hlc = nodedb_types::Hlc::ZERO;
        for expected_version in 1u64..=5 {
            let collection = match stamp(orders_entry(), &clock, catalog) {
                CatalogEntry::PutCollection(collection) => collection,
                _ => panic!("expected PutCollection"),
            };

            assert_eq!(collection.descriptor_version, expected_version);
            assert!(collection.modification_hlc > last_hlc);
            last_hlc = collection.modification_hlc;

            // Persist the stamped descriptor so the next round sees it as prior.
            catalog
                .put_collection(DatabaseId::DEFAULT, &collection)
                .expect("put_collection");
        }
    }

    /// A non-stamped entry comes back as the same variant.
    #[test]
    fn stamp_ignores_deletes() {
        let (store, _tmp) = make_catalog();
        let clock = HlcClock::new();
        let catalog = store.catalog().as_ref().expect("catalog");

        let deactivate = CatalogEntry::DeactivateCollection {
            tenant_id: 1,
            name: "orders".into(),
        };
        let result = stamp(deactivate, &clock, catalog);

        assert!(matches!(result, CatalogEntry::DeactivateCollection { .. }));
    }
}