use nodedb_types::DatabaseId;
use std::sync::{Arc, RwLock, Weak};
use nodedb_types::Surrogate;
use super::persist::SurrogateHwmPersist;
use super::registry::SurrogateRegistry;
use super::wal_appender::SurrogateWalAppender;
use crate::control::security::catalog::SystemCatalog;
use crate::control::security::credential::CredentialStore;
use crate::control::state::SharedState;
pub type SurrogateRegistryHandle = Arc<RwLock<SurrogateRegistry>>;
pub struct SurrogateAssigner {
registry: SurrogateRegistryHandle,
credential_store: Arc<CredentialStore>,
wal_appender: Arc<dyn SurrogateWalAppender>,
shared: std::sync::OnceLock<Weak<SharedState>>,
}
impl SurrogateAssigner {
pub fn new(
registry: SurrogateRegistryHandle,
credential_store: Arc<CredentialStore>,
wal_appender: Arc<dyn SurrogateWalAppender>,
) -> Self {
Self {
registry,
credential_store,
wal_appender,
shared: std::sync::OnceLock::new(),
}
}
pub fn install_shared(&self, shared: Weak<SharedState>) {
let _ = self.shared.set(shared);
}
pub fn current_hwm(&self) -> u32 {
self.registry
.read()
.map(|reg| reg.current_hwm())
.unwrap_or_else(|p| p.into_inner().current_hwm())
}
pub fn assign(&self, collection: &str, pk_bytes: &[u8]) -> crate::Result<Surrogate> {
let catalog = match self.credential_store.catalog().as_ref() {
Some(c) => c,
None => return Ok(Surrogate::ZERO),
};
if let Some(s) = catalog.get_surrogate_for_pk(DatabaseId::DEFAULT, collection, pk_bytes)? {
return Ok(s);
}
let registry = self.registry.write().map_err(|_| crate::Error::Internal {
detail: "surrogate registry lock poisoned".into(),
})?;
if let Some(s) = catalog.get_surrogate_for_pk(DatabaseId::DEFAULT, collection, pk_bytes)? {
return Ok(s);
}
let surrogate = registry.alloc_one()?;
catalog.put_surrogate(DatabaseId::DEFAULT, collection, pk_bytes, surrogate)?;
self.wal_appender
.record_bind_to_wal(surrogate.as_u32(), collection, pk_bytes)?;
if registry.should_flush() {
let raft_shared = self.shared.get().and_then(|w| w.upgrade());
let combined = CombinedPersist {
catalog,
wal_appender: self.wal_appender.as_ref(),
raft_shared: raft_shared.as_deref(),
};
registry.flush(&combined)?;
}
Ok(surrogate)
}
pub fn lookup(&self, collection: &str, pk_bytes: &[u8]) -> crate::Result<Option<Surrogate>> {
let catalog = match self.credential_store.catalog().as_ref() {
Some(c) => c,
None => return Ok(Some(Surrogate::ZERO)),
};
catalog.get_surrogate_for_pk(DatabaseId::DEFAULT, collection, pk_bytes)
}
pub fn registry_handle(&self) -> &SurrogateRegistryHandle {
&self.registry
}
pub fn assign_anonymous(&self, collection: &str) -> crate::Result<Surrogate> {
let catalog = match self.credential_store.catalog().as_ref() {
Some(c) => c,
None => return Ok(Surrogate::ZERO),
};
let registry = self.registry.write().map_err(|_| crate::Error::Internal {
detail: "surrogate registry lock poisoned".into(),
})?;
let surrogate = registry.alloc_one()?;
let self_bytes = surrogate.as_u32().to_be_bytes();
catalog.put_surrogate(DatabaseId::DEFAULT, collection, &self_bytes, surrogate)?;
self.wal_appender
.record_bind_to_wal(surrogate.as_u32(), collection, &self_bytes)?;
if registry.should_flush() {
let raft_shared = self.shared.get().and_then(|w| w.upgrade());
let combined = CombinedPersist {
catalog,
wal_appender: self.wal_appender.as_ref(),
raft_shared: raft_shared.as_deref(),
};
registry.flush(&combined)?;
}
Ok(surrogate)
}
}
struct CombinedPersist<'a> {
catalog: &'a SystemCatalog,
wal_appender: &'a dyn SurrogateWalAppender,
raft_shared: Option<&'a SharedState>,
}
impl SurrogateHwmPersist for CombinedPersist<'_> {
fn checkpoint(&self, hwm: u32) -> crate::Result<()> {
self.catalog.put_surrogate_hwm(hwm)?;
self.wal_appender.record_alloc_to_wal(hwm)?;
if let Some(shared) = self.raft_shared
&& let Err(e) = crate::control::metadata_proposer::propose_surrogate_hwm(shared, hwm)
{
tracing::warn!(hwm, error = %e, "surrogate hwm raft propose failed; followers may lag");
}
Ok(())
}
fn load(&self) -> crate::Result<u32> {
self.catalog.get_surrogate_hwm()
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::control::security::credential::CredentialStore;
use crate::control::surrogate::wal_appender::NoopWalAppender;
fn open_test() -> (tempfile::TempDir, Arc<SurrogateAssigner>) {
let dir = tempfile::tempdir().unwrap();
let credentials = Arc::new(CredentialStore::open(&dir.path().join("system.redb")).unwrap());
let reg = Arc::new(RwLock::new(SurrogateRegistry::new()));
let wal: Arc<dyn SurrogateWalAppender> = Arc::new(NoopWalAppender);
let a = Arc::new(SurrogateAssigner::new(reg, credentials, wal));
(dir, a)
}
#[test]
fn assign_is_idempotent_for_same_pk() {
let (_dir, a) = open_test();
let s1 = a.assign("users", b"alice").unwrap();
let s2 = a.assign("users", b"alice").unwrap();
assert_eq!(s1, s2);
assert_eq!(s1, Surrogate::new(1));
}
#[test]
fn assign_distinct_pks_returns_distinct_surrogates() {
let (_dir, a) = open_test();
let s1 = a.assign("users", b"alice").unwrap();
let s2 = a.assign("users", b"bob").unwrap();
assert_ne!(s1, s2);
}
#[test]
fn assign_writes_reverse_binding() {
let (_dir, a) = open_test();
let s = a.assign("users", b"alice").unwrap();
let cat = a.credential_store.catalog().as_ref().unwrap();
assert_eq!(
cat.get_pk_for_surrogate(DatabaseId::DEFAULT, "users", s)
.unwrap(),
Some(b"alice".to_vec())
);
}
#[test]
fn assign_persists_hwm_at_flush_threshold() {
let (_dir, a) = open_test();
let n = super::super::registry::FLUSH_OPS_THRESHOLD as usize;
for i in 0..n {
let pk = format!("u{i}");
let _ = a.assign("users", pk.as_bytes()).unwrap();
}
let cat = a.credential_store.catalog().as_ref().unwrap();
let persisted = cat.get_surrogate_hwm().unwrap();
assert!(persisted > 0 && persisted <= n as u32);
}
}