use std::sync::{Arc, OnceLock, RwLock, Weak};
use tokio::sync::broadcast;
use tracing::{debug, warn};
use nodedb_cluster::{MetadataApplier, MetadataCache, MetadataEntry, decode_entry};
use crate::control::catalog_entry;
use crate::control::security::credential::CredentialStore;
use crate::control::state::SharedState;
use metadata_applier_audit::{apply_ca_trust_change, emit_ddl_audit};
#[path = "metadata_applier_audit.rs"]
mod metadata_applier_audit;
/// Capacity value associated with the catalog-change broadcast channel.
///
/// NOTE(review): presumably the buffer size handed to `broadcast::channel`
/// when the [`CatalogChangeEvent`] channel is created. The creation site is
/// not visible in this file section — confirm against the caller.
pub const CATALOG_CHANNEL_CAPACITY: usize = 64;
/// Notification broadcast by [`MetadataCommitApplier`] after it has processed
/// a batch of committed metadata entries.
#[derive(Debug, Clone)]
pub struct CatalogChangeEvent {
/// Index of the last entry in the batch that triggered this event.
pub applied_index: u64,
}
/// Applies committed metadata entries on the host: it updates the shared
/// [`MetadataCache`], runs the per-entry host-side effects, and broadcasts a
/// [`CatalogChangeEvent`] once a batch has been processed.
pub struct MetadataCommitApplier {
/// Metadata cache; every successfully decoded entry is applied to it under
/// the write lock.
cache: Arc<RwLock<MetadataCache>>,
/// Sender used to announce the last index of each processed batch.
catalog_change_tx: broadcast::Sender<CatalogChangeEvent>,
/// Credential store; its `catalog()` handle is where catalog entries and the
/// surrogate high-water mark are written.
credentials: Arc<CredentialStore>,
/// Weak handle to the host `SharedState`, set at most once through
/// `install_shared`. While it is unset, or once the state has been dropped,
/// side effects that need it are skipped.
shared: OnceLock<Weak<SharedState>>,
}
impl MetadataCommitApplier {
pub fn new(
cache: Arc<RwLock<MetadataCache>>,
catalog_change_tx: broadcast::Sender<CatalogChangeEvent>,
credentials: Arc<CredentialStore>,
) -> Self {
Self {
cache,
catalog_change_tx,
credentials,
shared: OnceLock::new(),
}
}
pub fn install_shared(&self, shared: Weak<SharedState>) {
let _ = self.shared.set(shared);
}
fn apply_host_side_effects(&self, entry: &MetadataEntry, raft_index: u64) {
if let MetadataEntry::Batch { entries } = entry {
for sub in entries {
self.apply_host_side_effects(sub, raft_index);
}
return;
}
match entry {
MetadataEntry::DescriptorDrainStart {
descriptor_id,
up_to_version,
expires_at,
} => {
if let Some(weak) = self.shared.get()
&& let Some(shared) = weak.upgrade()
{
shared.lease_drain.install_start(
descriptor_id.clone(),
*up_to_version,
*expires_at,
);
debug!(
descriptor = ?descriptor_id,
up_to_version,
"drain_start applied to host tracker"
);
}
return;
}
MetadataEntry::DescriptorDrainEnd { descriptor_id } => {
if let Some(weak) = self.shared.get()
&& let Some(shared) = weak.upgrade()
{
shared.lease_drain.install_end(descriptor_id);
debug!(
descriptor = ?descriptor_id,
"drain_end applied to host tracker"
);
}
return;
}
MetadataEntry::CaTrustChange {
add_ca_cert,
remove_ca_fingerprint,
} => {
if let Some(weak) = self.shared.get()
&& let Some(shared) = weak.upgrade()
{
apply_ca_trust_change(
&shared,
add_ca_cert.as_deref(),
remove_ca_fingerprint.as_ref(),
raft_index,
);
}
return;
}
MetadataEntry::SurrogateAlloc { hwm } => {
if let Some(weak) = self.shared.get()
&& let Some(shared) = weak.upgrade()
{
let reg = shared
.surrogate_assigner
.registry_handle()
.read()
.unwrap_or_else(|p| p.into_inner());
if let Err(e) = reg.restore_hwm(*hwm) {
warn!(hwm, error = %e, "surrogate_alloc apply: restore_hwm failed");
}
drop(reg);
if let Some(catalog) = self.credentials.catalog()
&& let Err(e) = catalog.put_surrogate_hwm(*hwm)
{
warn!(
hwm,
error = %e,
"surrogate_alloc apply: failed to persist hwm to catalog"
);
}
debug!(hwm, raft_index, "surrogate hwm advanced via raft");
}
return;
}
_ => {}
}
let Some(catalog) = self.credentials.catalog() else {
return;
};
let (payload, audit) = match entry {
MetadataEntry::CatalogDdl { payload } => (payload, None),
MetadataEntry::CatalogDdlAudited {
payload,
auth_user_id,
auth_user_name,
sql_text,
} => (
payload,
Some((
auth_user_id.clone(),
auth_user_name.clone(),
sql_text.clone(),
)),
),
_ => return,
};
let catalog_entry = match catalog_entry::decode(payload) {
Ok(e) => e,
Err(e) => {
warn!(error = %e, "metadata applier: failed to decode CatalogEntry payload");
return;
}
};
let stamped = if let Some(weak) = self.shared.get()
&& let Some(shared) = weak.upgrade()
{
let compat = !shared.cluster_version_view().can_activate_feature(
crate::control::rolling_upgrade::DESCRIPTOR_VERSIONING_VERSION,
);
if compat {
catalog_entry
} else {
catalog_entry::descriptor_stamp::stamp(catalog_entry, &shared.hlc_clock, catalog)
}
} else {
catalog_entry
};
debug!(kind = stamped.kind(), "catalog_entry: applying to redb");
catalog_entry::apply::apply_to(&stamped, catalog);
if let Some(weak) = self.shared.get()
&& let Some(shared) = weak.upgrade()
{
if let Some(drained_id) =
crate::control::lease::drain_propose::descriptor_id_for_implicit_clear(&stamped)
{
shared.lease_drain.install_end(&drained_id);
}
catalog_entry::post_apply::apply_post_apply_side_effects_sync(&stamped, &shared);
emit_ddl_audit(&shared, raft_index, &stamped, audit.as_ref());
catalog_entry::post_apply::spawn_post_apply_async_side_effects(
stamped, shared, raft_index,
);
}
}
}
impl MetadataApplier for MetadataCommitApplier {
    /// Applies a batch of committed `(index, bytes)` entries in order.
    ///
    /// Entries with an empty payload are skipped, and entries that fail to
    /// decode are logged and skipped. Each decoded entry is applied to the
    /// metadata cache and then to the host-side effects.
    ///
    /// Returns the index of the last entry in `entries`, whether or not that
    /// entry was skipped, or `0` for an empty slice. When the result is
    /// non-zero a [`CatalogChangeEvent`] carrying it is broadcast; a failed
    /// send is ignored.
    fn apply(&self, entries: &[(u64, Vec<u8>)]) -> u64 {
        for (index, data) in entries {
            if data.is_empty() {
                continue;
            }
            match decode_entry(data) {
                Ok(entry) => {
                    // The write guard is a temporary of this statement, so the
                    // cache lock is released before the side effects run. A
                    // poisoned lock is recovered rather than propagated.
                    self.cache
                        .write()
                        .unwrap_or_else(|poisoned| poisoned.into_inner())
                        .apply(*index, &entry);
                    self.apply_host_side_effects(&entry, *index);
                }
                Err(e) => {
                    warn!(index = *index, error = %e, "metadata decode failed");
                }
            }
        }

        let last = entries.last().map_or(0, |(index, _)| *index);
        if last > 0 {
            let event = CatalogChangeEvent {
                applied_index: last,
            };
            let _ = self.catalog_change_tx.send(event);
            debug!(
                applied_index = last,
                "metadata applier broadcast catalog-change event"
            );
        }
        last
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::control::catalog_entry::CatalogEntry;
    use crate::control::security::catalog::StoredCollection;
    use nodedb_cluster::encode_entry;
    use nodedb_types::DatabaseId;

    /// Builds an applier backed by a credential store in a fresh temp dir.
    ///
    /// The `TempDir` is returned so the caller keeps the directory alive for
    /// the duration of the test. No `SharedState` is installed.
    fn make_applier() -> (
        MetadataCommitApplier,
        Arc<RwLock<MetadataCache>>,
        Arc<CredentialStore>,
        tempfile::TempDir,
    ) {
        let tmp = tempfile::tempdir().expect("tmpdir");
        let db_path = tmp.path().join("system.redb");
        let credentials = Arc::new(CredentialStore::open(&db_path).expect("open"));
        let cache = Arc::new(RwLock::new(MetadataCache::new()));
        let (tx, _rx) = broadcast::channel(16);
        let applier =
            MetadataCommitApplier::new(Arc::clone(&cache), tx, Arc::clone(&credentials));
        (applier, cache, credentials, tmp)
    }

    /// Wraps a `PutCollection` for `name` in a `CatalogDdl` metadata entry.
    fn put_collection_entry(name: &str) -> MetadataEntry {
        let put = CatalogEntry::PutCollection(Box::new(StoredCollection::new(7, name, "tester")));
        let payload = catalog_entry::encode(&put).unwrap();
        MetadataEntry::CatalogDdl { payload }
    }

    /// A committed `PutCollection` updates the cache counters and is readable
    /// back from the catalog.
    #[test]
    fn apply_put_collection_writes_through_to_redb() {
        let (applier, cache, credentials, _tmp) = make_applier();
        let bytes = encode_entry(&put_collection_entry("orders")).unwrap();
        assert_eq!(applier.apply(&[(11, bytes)]), 11);

        {
            let cached = cache.read().unwrap();
            assert_eq!(cached.applied_index, 11);
            assert_eq!(cached.catalog_entries_applied, 1);
        }

        let loaded = credentials
            .catalog()
            .as_ref()
            .unwrap()
            .get_collection(DatabaseId::DEFAULT, 7, "orders")
            .unwrap()
            .expect("present");
        assert_eq!(loaded.name, "orders");
        assert_eq!(loaded.owner, "tester");
    }

    /// Deactivating a collection keeps its record but marks it inactive.
    #[test]
    fn apply_deactivate_preserves_record() {
        let (applier, _cache, credentials, _tmp) = make_applier();
        let put_bytes = encode_entry(&put_collection_entry("archived")).unwrap();
        applier.apply(&[(1, put_bytes)]);

        let deactivate = CatalogEntry::DeactivateCollection {
            tenant_id: 7,
            name: "archived".into(),
        };
        let deactivate_entry = MetadataEntry::CatalogDdl {
            payload: catalog_entry::encode(&deactivate).unwrap(),
        };
        let deactivate_bytes = encode_entry(&deactivate_entry).unwrap();
        applier.apply(&[(2, deactivate_bytes)]);

        let loaded = credentials
            .catalog()
            .as_ref()
            .unwrap()
            .get_collection(DatabaseId::DEFAULT, 7, "archived")
            .unwrap()
            .expect("preserved");
        assert!(!loaded.is_active);
    }

    /// An empty slice of entries yields index 0.
    #[test]
    fn apply_empty_batch_is_noop() {
        let (applier, _cache, _credentials, _tmp) = make_applier();
        assert_eq!(applier.apply(&[]), 0);
    }
}