use std::sync::{Arc, Mutex};
use crate::api::{RedDBError, RedDBResult};
use crate::replication::lease::{LeaseError, LeaseStore, WriterLease};
use crate::runtime::audit_log::{
AuditAuthSource, AuditEvent, AuditFieldEscaper, AuditLogger, Outcome,
};
use crate::runtime::write_gate::{LeaseGateState, WriteGate};
/// Shared, thread-safe callback with no arguments and no return value.
///
/// [`LeaseLifecycle`] stores one of these and calls it once each time a lease
/// refresh fails, after the write gate has been set to `NotHeld`.
pub type MarkDraining = Arc<dyn Fn() + Send + Sync>;
/// Couples a writer lease kept in a [`LeaseStore`] with the local
/// [`WriteGate`] and the audit log.
///
/// The methods on this type acquire, refresh and release the lease, mirror the
/// outcome into the gate (`Held` / `NotHeld`) and record an audit event for
/// acquire, release and lost-lease transitions.
pub struct LeaseLifecycle {
/// Store the lease is acquired from, refreshed in and released to.
store: Arc<LeaseStore>,
/// Gate whose lease state is set to `Held` on acquire and `NotHeld` on
/// release or on a failed refresh.
write_gate: Arc<WriteGate>,
/// Destination for the `lease/acquire`, `lease/release` and `lease/lost`
/// audit events.
audit_log: Arc<AuditLogger>,
/// Callback invoked when a refresh fails.
mark_draining: MarkDraining,
/// Holder identity passed to the store; also used as the audit principal.
holder_id: String,
/// Key passed to the store; also used as the audit resource.
database_key: String,
/// TTL in milliseconds passed to the store on acquire and on refresh.
ttl_ms: u64,
/// Most recent lease returned by the store, or `None` when no lease is
/// currently cached (never acquired, released, or lost).
current: Mutex<Option<WriterLease>>,
}
impl LeaseLifecycle {
    /// Creates a lifecycle with no lease cached.
    ///
    /// Construction only stores its arguments; the store, gate and audit log
    /// are first touched by later calls such as [`Self::try_acquire`].
    pub fn new(
        store: Arc<LeaseStore>,
        write_gate: Arc<WriteGate>,
        audit_log: Arc<AuditLogger>,
        mark_draining: MarkDraining,
        holder_id: String,
        database_key: String,
        ttl_ms: u64,
    ) -> Self {
        Self {
            store,
            write_gate,
            audit_log,
            mark_draining,
            holder_id,
            database_key,
            ttl_ms,
            current: Mutex::new(None),
        }
    }

    /// Holder identity this instance presents to the lease store.
    pub fn holder_id(&self) -> &str {
        &self.holder_id
    }

    /// Key of the database whose writer lease is managed.
    pub fn database_key(&self) -> &str {
        &self.database_key
    }

    /// TTL, in milliseconds, requested on every acquire and refresh.
    pub fn ttl_ms(&self) -> u64 {
        self.ttl_ms
    }

    /// Returns a copy of the cached lease, or `None` when none is cached.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn current_lease(&self) -> Option<WriterLease> {
        self.current.lock().expect("poisoned lease mutex").clone()
    }

    /// Tries to acquire the writer lease from the store.
    ///
    /// On success the lease is cached, the write gate is set to `Held` and a
    /// successful `lease/acquire` audit event (with `generation` and `ttl_ms`
    /// fields) is recorded.
    ///
    /// # Errors
    ///
    /// If the store refuses or fails, a `lease/acquire` audit event with an
    /// error outcome is recorded and [`RedDBError::Internal`] is returned. The
    /// cached lease and the write gate are left untouched in that case.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn try_acquire(&self) -> RedDBResult<()> {
        match self
            .store
            .try_acquire(&self.database_key, &self.holder_id, self.ttl_ms)
        {
            Ok(lease) => {
                // Read the generation before the lease moves into the cache so
                // the whole lease does not have to be cloned for the audit
                // record.
                let generation = lease.generation as i64;
                *self.current.lock().expect("poisoned lease mutex") = Some(lease);
                self.write_gate.set_lease_state(LeaseGateState::Held);
                self.audit_log.record_event(
                    AuditEvent::builder("lease/acquire")
                        .principal(self.holder_id.clone())
                        .source(AuditAuthSource::System)
                        .resource(self.database_key.clone())
                        .outcome(Outcome::Success)
                        .field(AuditFieldEscaper::field("generation", generation))
                        .field(AuditFieldEscaper::field("ttl_ms", self.ttl_ms))
                        .build(),
                );
                Ok(())
            }
            Err(err) => {
                self.audit_log.record_event(
                    AuditEvent::builder("lease/acquire")
                        .principal(self.holder_id.clone())
                        .source(AuditAuthSource::System)
                        .resource(self.database_key.clone())
                        .outcome(Outcome::Error)
                        .field(AuditFieldEscaper::field("error", err.to_string()))
                        .build(),
                );
                Err(RedDBError::Internal(format!("acquire writer lease: {err}")))
            }
        }
    }

    /// Refreshes the cached lease in the store using the configured TTL.
    ///
    /// On success the cache is replaced with the lease returned by the store.
    /// Any store error is treated as a lost lease: the cache is cleared, the
    /// write gate is set to `NotHeld`, a `lease/lost` audit event is recorded
    /// and the draining callback is invoked.
    ///
    /// # Errors
    ///
    /// Returns [`RedDBError::Internal`] when no lease is cached (nothing else
    /// is touched in that case) or when the store refresh fails.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn refresh(&self) -> RedDBResult<()> {
        let snapshot = self.current_lease().ok_or_else(|| {
            RedDBError::Internal("refresh called without an acquired lease".to_string())
        })?;

        // NOTE(review): the mutex is not held across the store call, so a
        // `release` running concurrently with this method can have its cleared
        // cache overwritten by the write-back below. Confirm that callers
        // serialize refresh and release.
        match self.store.refresh(&snapshot, self.ttl_ms) {
            Ok(updated) => {
                *self.current.lock().expect("poisoned lease mutex") = Some(updated);
                Ok(())
            }
            Err(err) => {
                self.on_refresh_lost(err);
                Err(RedDBError::Internal("writer lease lost".to_string()))
            }
        }
    }

    /// Gives the lease back to the store and closes the write gate.
    ///
    /// The call is best-effort and always returns `Ok(())`: a store failure is
    /// audited and logged as a warning but not propagated. When no lease is
    /// cached, only the gate is set to `NotHeld` and nothing is audited.
    ///
    /// # Panics
    ///
    /// Panics if the internal mutex is poisoned.
    pub fn release(&self) -> RedDBResult<()> {
        let taken = self.current.lock().expect("poisoned lease mutex").take();

        // Close the gate before talking to the store. Once the store has
        // released the lease another holder may acquire it, so the local gate
        // must not still report `Held` at that point.
        self.write_gate.set_lease_state(LeaseGateState::NotHeld);

        let snapshot = match taken {
            Some(lease) => lease,
            None => return Ok(()),
        };

        match self.store.release(&snapshot) {
            Ok(()) => {
                self.audit_log.record_event(
                    AuditEvent::builder("lease/release")
                        .principal(self.holder_id.clone())
                        .source(AuditAuthSource::System)
                        .resource(self.database_key.clone())
                        .outcome(Outcome::Success)
                        .build(),
                );
            }
            Err(err) => {
                self.audit_log.record_event(
                    AuditEvent::builder("lease/release")
                        .principal(self.holder_id.clone())
                        .source(AuditAuthSource::System)
                        .resource(self.database_key.clone())
                        .outcome(Outcome::Error)
                        .field(AuditFieldEscaper::field("error", err.to_string()))
                        .build(),
                );
                tracing::warn!(
                    target: "reddb::serverless::lease",
                    error = %err,
                    "lease release on shutdown failed"
                );
            }
        }
        Ok(())
    }

    /// Handles a failed refresh: closes the gate, drops the cached lease,
    /// audits `lease/lost` and finally invokes the draining callback.
    fn on_refresh_lost(&self, err: LeaseError) {
        // Close the gate first so logging and auditing below do not extend
        // the time the gate reports `Held` after the refresh has failed.
        self.write_gate.set_lease_state(LeaseGateState::NotHeld);
        *self.current.lock().expect("poisoned lease mutex") = None;

        tracing::error!(
            target: "reddb::serverless::lease",
            error = %err,
            holder = %self.holder_id,
            database_key = %self.database_key,
            "lease refresh failed; flipping to NotHeld + drain"
        );
        self.audit_log.record_event(
            AuditEvent::builder("lease/lost")
                .principal(self.holder_id.clone())
                .source(AuditAuthSource::System)
                .resource(self.database_key.clone())
                .outcome(Outcome::Error)
                .field(AuditFieldEscaper::field("error", err.to_string()))
                .build(),
        );
        (self.mark_draining)();
    }
}
/// Tries to acquire the writer lease for `database_key` on behalf of
/// `holder_id` and audits the attempt as `admin/failover/promote`.
///
/// This function receives no write gate and therefore changes no gate state;
/// it only talks to `store` and `audit_log`.
///
/// A successful attempt is audited with the lease's `holder_id` as principal
/// plus `holder_id`, `generation` and `ttl_ms` fields. A failed attempt is
/// audited with the requested `holder_id` as principal and an `error` field.
///
/// # Errors
///
/// Returns the store's [`LeaseError`] unchanged when the acquire fails.
pub fn admin_promote_lease(
    store: &LeaseStore,
    audit_log: &AuditLogger,
    database_key: &str,
    holder_id: &str,
    ttl_ms: u64,
) -> Result<WriterLease, LeaseError> {
    let attempt = store.try_acquire(database_key, holder_id, ttl_ms);

    // Build the audit record from a borrow so the result itself can be handed
    // back to the caller untouched.
    let event = match &attempt {
        Ok(lease) => AuditEvent::builder("admin/failover/promote")
            .principal(lease.holder_id.clone())
            .source(AuditAuthSource::System)
            .resource(database_key.to_string())
            .outcome(Outcome::Success)
            .field(AuditFieldEscaper::field(
                "holder_id",
                lease.holder_id.clone(),
            ))
            .field(AuditFieldEscaper::field(
                "generation",
                lease.generation as i64,
            ))
            .field(AuditFieldEscaper::field("ttl_ms", ttl_ms))
            .build(),
        Err(err) => AuditEvent::builder("admin/failover/promote")
            .principal(holder_id.to_string())
            .source(AuditAuthSource::System)
            .resource(database_key.to_string())
            .outcome(Outcome::Error)
            .field(AuditFieldEscaper::field("error", err.to_string()))
            .build(),
    };
    audit_log.record_event(event);

    attempt
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::RedDBOptions;
    use crate::storage::backend::LocalBackend;
    use std::path::{Path, PathBuf};
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::Duration;

    /// Upper bound the tests wait for the audit logger to become idle.
    const AUDIT_IDLE_TIMEOUT: Duration = Duration::from_secs(2);

    /// Scratch directory owned by one test.
    ///
    /// The directory is removed when the guard is dropped, so it is cleaned
    /// up even when an assertion fails and the test unwinds.
    struct TempPrefix(PathBuf);

    impl TempPrefix {
        /// Location of the scratch directory.
        fn path(&self) -> &Path {
            &self.0
        }
    }

    impl Drop for TempPrefix {
        fn drop(&mut self) {
            // Best-effort cleanup; a leftover directory must not fail a test.
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    /// Creates a scratch directory under the system temp dir whose name
    /// combines `tag`, the process id and a nanosecond timestamp.
    fn temp_prefix(tag: &str) -> TempPrefix {
        let mut dir = std::env::temp_dir();
        dir.push(format!(
            "reddb-lease-lifecycle-{tag}-{}-{}",
            std::process::id(),
            crate::utils::now_unix_nanos(),
        ));
        std::fs::create_dir_all(&dir).unwrap();
        TempPrefix(dir)
    }

    /// Lease store on the local backend, rooted at `prefix`.
    fn local_store(prefix: &Path) -> LeaseStore {
        LeaseStore::new(Arc::new(LocalBackend)).with_prefix(prefix.to_string_lossy().to_string())
    }

    /// Builds a lifecycle for holder `writer-1` on database `main` (60 s TTL)
    /// together with the collaborators the tests inspect. The returned counter
    /// is incremented each time the draining callback runs.
    ///
    /// Callers must bind the returned `TempPrefix` to a named variable such
    /// as `_prefix`; binding it to `_` would drop it, and delete the
    /// directory, immediately.
    fn build_lifecycle(
        tag: &str,
    ) -> (
        Arc<LeaseLifecycle>,
        Arc<WriteGate>,
        Arc<AuditLogger>,
        Arc<AtomicUsize>,
        TempPrefix,
    ) {
        let prefix = temp_prefix(tag);
        let store = Arc::new(local_store(prefix.path()));
        let mut opts = RedDBOptions::default();
        opts.read_only = false;
        let write_gate = Arc::new(WriteGate::from_options(&opts));
        let audit_log = Arc::new(AuditLogger::for_data_path(&prefix.path().join("data.rdb")));
        let drain_counter = Arc::new(AtomicUsize::new(0));
        let drain_counter_clone = Arc::clone(&drain_counter);
        let mark_draining: MarkDraining = Arc::new(move || {
            drain_counter_clone.fetch_add(1, Ordering::SeqCst);
        });
        let lifecycle = Arc::new(LeaseLifecycle::new(
            store,
            Arc::clone(&write_gate),
            Arc::clone(&audit_log),
            mark_draining,
            "writer-1".to_string(),
            "main".to_string(),
            60_000,
        ));
        (lifecycle, write_gate, audit_log, drain_counter, prefix)
    }

    #[test]
    fn acquire_flips_gate_to_held_and_records_audit() {
        let (lifecycle, gate, audit, drain, _prefix) = build_lifecycle("acquire");
        assert!(lifecycle.try_acquire().is_ok());
        assert_eq!(gate.lease_state(), LeaseGateState::Held);
        assert!(lifecycle.current_lease().is_some());
        assert_eq!(drain.load(Ordering::SeqCst), 0);
        assert!(audit.wait_idle(AUDIT_IDLE_TIMEOUT));
        let body = std::fs::read_to_string(audit.path()).unwrap();
        assert!(body.contains("lease/acquire"));
        assert!(body.contains("\"outcome\":\"success\""));
    }

    #[test]
    fn release_flips_gate_to_not_held_and_clears_inner_state() {
        let (lifecycle, gate, audit, _drain, _prefix) = build_lifecycle("release");
        lifecycle.try_acquire().unwrap();
        assert!(lifecycle.release().is_ok());
        assert_eq!(gate.lease_state(), LeaseGateState::NotHeld);
        assert!(lifecycle.current_lease().is_none());
        assert!(audit.wait_idle(AUDIT_IDLE_TIMEOUT));
        let body = std::fs::read_to_string(audit.path()).unwrap();
        assert!(body.contains("lease/release"));
    }

    #[test]
    fn release_is_idempotent_when_no_lease_held() {
        let (lifecycle, gate, _audit, _drain, _prefix) = build_lifecycle("release-idem");
        assert!(lifecycle.release().is_ok());
        assert_eq!(gate.lease_state(), LeaseGateState::NotHeld);
    }

    #[test]
    fn refresh_without_acquire_returns_error_without_touching_gate() {
        let (lifecycle, gate, _audit, drain, _prefix) = build_lifecycle("refresh-noop");
        let err = lifecycle.refresh().unwrap_err();
        match err {
            RedDBError::Internal(msg) => assert!(msg.contains("without an acquired lease")),
            other => panic!("expected Internal, got {other:?}"),
        }
        assert_eq!(gate.lease_state(), LeaseGateState::NotRequired);
        assert_eq!(drain.load(Ordering::SeqCst), 0);
    }

    #[test]
    fn admin_promote_lease_audits_success() {
        let prefix = temp_prefix("admin-ok");
        let store = local_store(prefix.path());
        let audit = AuditLogger::for_data_path(&prefix.path().join("data.rdb"));
        let lease = admin_promote_lease(&store, &audit, "main", "promoter-1", 30_000).unwrap();
        assert_eq!(lease.holder_id, "promoter-1");
        assert!(audit.wait_idle(AUDIT_IDLE_TIMEOUT));
        let body = std::fs::read_to_string(audit.path()).unwrap();
        assert!(body.contains("admin/failover/promote"));
        assert!(body.contains("\"outcome\":\"success\""));
    }

    #[test]
    fn admin_promote_lease_does_not_flip_a_separate_gate() {
        let prefix = temp_prefix("admin-no-gate");
        let store = local_store(prefix.path());
        let audit = AuditLogger::for_data_path(&prefix.path().join("data.rdb"));
        let mut opts = RedDBOptions::default();
        opts.read_only = false;
        let gate = WriteGate::from_options(&opts);
        let _ = admin_promote_lease(&store, &audit, "main", "promoter-2", 30_000).unwrap();
        assert_eq!(gate.lease_state(), LeaseGateState::NotRequired);
    }
}