Skip to main content

reddb_server/runtime/
lease_lifecycle.rs

1//! Serverless writer-lease state machine.
2//!
3//! Single owner of the `{acquire, refresh, refresh_failed, lost_race,
4//! release}` transitions. Centralises the side-effects that must
5//! happen together so the `WriteGate` lease state and the
6//! `AuditLogger` lease/* records can never drift out of sync.
7//!
8//! Before this module those transitions lived inline in
9//! `lease_loop::spawn_refresh_thread` and in
10//! `handlers_admin::handle_admin_failover_promote`, with each
11//! caller manually pairing `write_gate.set_lease_state(...)` and
12//! `audit_log.record("lease/...")`. Four call sites, four chances
13//! for drift.
14//!
15//! Test surface:
16//!   * Construct with stand-alone `WriteGate` + `AuditLogger`.
17//!   * Inject a `MarkDraining` callback (production wires it to
18//!     `Lifecycle::mark_draining`).
19//!   * Drive transitions; assert gate state + audit lines together.
20
21use std::sync::{Arc, Mutex};
22
23use crate::api::{RedDBError, RedDBResult};
24use crate::replication::lease::{LeaseError, LeaseStore, WriterLease};
25use crate::runtime::audit_log::{
26    AuditAuthSource, AuditEvent, AuditFieldEscaper, AuditLogger, Outcome,
27};
28use crate::runtime::write_gate::{LeaseGateState, WriteGate};
29
/// Drain hook invoked by the lifecycle when the writer lease is lost.
///
/// Production wires it to `Lifecycle::mark_draining`; tests pass a
/// recorder that counts how often it fires.
pub type MarkDraining = Arc<dyn Fn() + Sync + Send>;

/// Supplier of the current replication term. It is sampled on every
/// acquire and refresh and handed to the store's `*_for_term` calls.
pub type CurrentTerm = Arc<dyn Fn() -> u64 + Sync + Send>;
35
36/// Drives the serverless writer lease for one database key.
37///
38/// Owns the `WriterLease` snapshot, the `LeaseStore` it talks to,
39/// and the side-effect channels (`WriteGate`, `AuditLogger`,
40/// `MarkDraining`). All transitions go through methods on this
41/// struct so the gate/audit pair stays consistent.
42pub struct LeaseLifecycle {
43    store: Arc<LeaseStore>,
44    write_gate: Arc<WriteGate>,
45    audit_log: Arc<AuditLogger>,
46    mark_draining: MarkDraining,
47    current_term: CurrentTerm,
48    holder_id: String,
49    database_key: String,
50    ttl_ms: u64,
51    current: Mutex<Option<WriterLease>>,
52}
53
54impl LeaseLifecycle {
55    pub fn new(
56        store: Arc<LeaseStore>,
57        write_gate: Arc<WriteGate>,
58        audit_log: Arc<AuditLogger>,
59        mark_draining: MarkDraining,
60        current_term: CurrentTerm,
61        holder_id: String,
62        database_key: String,
63        ttl_ms: u64,
64    ) -> Self {
65        Self {
66            store,
67            write_gate,
68            audit_log,
69            mark_draining,
70            current_term,
71            holder_id,
72            database_key,
73            ttl_ms,
74            current: Mutex::new(None),
75        }
76    }
77
78    pub fn holder_id(&self) -> &str {
79        &self.holder_id
80    }
81
82    pub fn database_key(&self) -> &str {
83        &self.database_key
84    }
85
86    pub fn ttl_ms(&self) -> u64 {
87        self.ttl_ms
88    }
89
90    pub fn current_lease(&self) -> Option<WriterLease> {
91        self.current.lock().expect("poisoned lease mutex").clone()
92    }
93
94    /// Acquire the writer lease and flip the gate to `Held`. Audit
95    /// line records the outcome (ok / err) either way.
96    pub fn try_acquire(&self) -> RedDBResult<()> {
97        let term = (self.current_term)();
98        match self.store.try_acquire_for_term(
99            &self.database_key,
100            &self.holder_id,
101            self.ttl_ms,
102            term,
103        ) {
104            Ok(lease) => {
105                *self.current.lock().expect("poisoned lease mutex") = Some(lease.clone());
106                self.write_gate.set_lease_state(LeaseGateState::Held);
107                self.audit_log.record_event(
108                    AuditEvent::builder("lease/acquire")
109                        .principal(self.holder_id.clone())
110                        .source(AuditAuthSource::System)
111                        .resource(self.database_key.clone())
112                        .outcome(Outcome::Success)
113                        .field(AuditFieldEscaper::field(
114                            "generation",
115                            lease.generation as i64,
116                        ))
117                        .field(AuditFieldEscaper::field("term", lease.term as i64))
118                        .field(AuditFieldEscaper::field("ttl_ms", self.ttl_ms))
119                        .build(),
120                );
121                Ok(())
122            }
123            Err(err) => {
124                self.audit_log.record_event(
125                    AuditEvent::builder("lease/acquire")
126                        .principal(self.holder_id.clone())
127                        .source(AuditAuthSource::System)
128                        .resource(self.database_key.clone())
129                        .outcome(Outcome::Error)
130                        .field(AuditFieldEscaper::field("error", err.to_string()))
131                        .build(),
132                );
133                Err(RedDBError::Internal(format!("acquire writer lease: {err}")))
134            }
135        }
136    }
137
138    /// Refresh the held lease. On success, updates the in-memory
139    /// snapshot and returns `Ok(())`. On any backend error, flips
140    /// the gate to `NotHeld`, audits `lease/lost`, asks the runtime
141    /// to drain, and returns `Err`.
142    ///
143    /// The refresh thread should treat `Err` as terminal and exit
144    /// — re-acquiring after a loss could race a freshly promoted
145    /// writer.
146    pub fn refresh(&self) -> RedDBResult<()> {
147        let snapshot = match self.current.lock().expect("poisoned lease mutex").clone() {
148            Some(lease) => lease,
149            None => {
150                return Err(RedDBError::Internal(
151                    "refresh called without an acquired lease".to_string(),
152                ));
153            }
154        };
155        let term = (self.current_term)();
156        match self.store.refresh_for_term(&snapshot, self.ttl_ms, term) {
157            Ok(updated) => {
158                *self.current.lock().expect("poisoned lease mutex") = Some(updated);
159                Ok(())
160            }
161            Err(err) => {
162                self.on_refresh_lost(err);
163                Err(RedDBError::Internal("writer lease lost".to_string()))
164            }
165        }
166    }
167
168    /// Release the held lease and flip the gate to `NotHeld`. Best-
169    /// effort: a backend failure is logged but does not propagate
170    /// — shutdown should not block on a remote release.
171    pub fn release(&self) -> RedDBResult<()> {
172        let snapshot = match self.current.lock().expect("poisoned lease mutex").take() {
173            Some(lease) => lease,
174            None => {
175                // Idempotent: already released. Make sure the gate is
176                // closed in case a previous transition didn't run.
177                self.write_gate.set_lease_state(LeaseGateState::NotHeld);
178                return Ok(());
179            }
180        };
181        let result = self.store.release(&snapshot);
182        self.write_gate.set_lease_state(LeaseGateState::NotHeld);
183        match result {
184            Ok(()) => {
185                self.audit_log.record_event(
186                    AuditEvent::builder("lease/release")
187                        .principal(self.holder_id.clone())
188                        .source(AuditAuthSource::System)
189                        .resource(self.database_key.clone())
190                        .outcome(Outcome::Success)
191                        .build(),
192                );
193                Ok(())
194            }
195            Err(err) => {
196                self.audit_log.record_event(
197                    AuditEvent::builder("lease/release")
198                        .principal(self.holder_id.clone())
199                        .source(AuditAuthSource::System)
200                        .resource(self.database_key.clone())
201                        .outcome(Outcome::Error)
202                        .field(AuditFieldEscaper::field("error", err.to_string()))
203                        .build(),
204                );
205                tracing::warn!(
206                    target: "reddb::serverless::lease",
207                    error = %err,
208                    "lease release on shutdown failed"
209                );
210                Ok(())
211            }
212        }
213    }
214
215    fn on_refresh_lost(&self, err: LeaseError) {
216        tracing::error!(
217            target: "reddb::serverless::lease",
218            error = %err,
219            holder = %self.holder_id,
220            database_key = %self.database_key,
221            "lease refresh failed; flipping to NotHeld + drain"
222        );
223        *self.current.lock().expect("poisoned lease mutex") = None;
224        self.write_gate.set_lease_state(LeaseGateState::NotHeld);
225        self.audit_log.record_event(
226            AuditEvent::builder("lease/lost")
227                .principal(self.holder_id.clone())
228                .source(AuditAuthSource::System)
229                .resource(self.database_key.clone())
230                .outcome(Outcome::Error)
231                .field(AuditFieldEscaper::field("error", err.to_string()))
232                .build(),
233        );
234        (self.mark_draining)();
235    }
236}
237
238/// Admin-driven failover promotion: acquire the writer lease as a
239/// stand-alone action without flipping the local gate. The instance
240/// stays a `Replica` until the operator restarts it as primary; the
241/// gate flip is deliberately left out so an unintended promotion
242/// can't accept writes mid-process.
243///
244/// Audited under `admin/failover/promote` regardless of outcome.
245pub fn admin_promote_lease(
246    store: &LeaseStore,
247    audit_log: &AuditLogger,
248    database_key: &str,
249    holder_id: &str,
250    ttl_ms: u64,
251    term: u64,
252) -> Result<WriterLease, LeaseError> {
253    match store.try_acquire_for_term(database_key, holder_id, ttl_ms, term) {
254        Ok(lease) => {
255            audit_log.record_event(
256                AuditEvent::builder("admin/failover/promote")
257                    .principal(lease.holder_id.clone())
258                    .source(AuditAuthSource::System)
259                    .resource(database_key.to_string())
260                    .outcome(Outcome::Success)
261                    .field(AuditFieldEscaper::field(
262                        "holder_id",
263                        lease.holder_id.clone(),
264                    ))
265                    .field(AuditFieldEscaper::field(
266                        "generation",
267                        lease.generation as i64,
268                    ))
269                    .field(AuditFieldEscaper::field("term", lease.term as i64))
270                    .field(AuditFieldEscaper::field("ttl_ms", ttl_ms))
271                    .build(),
272            );
273            Ok(lease)
274        }
275        Err(err) => {
276            audit_log.record_event(
277                AuditEvent::builder("admin/failover/promote")
278                    .principal(holder_id.to_string())
279                    .source(AuditAuthSource::System)
280                    .resource(database_key.to_string())
281                    .outcome(Outcome::Error)
282                    .field(AuditFieldEscaper::field("error", err.to_string()))
283                    .build(),
284            );
285            Err(err)
286        }
287    }
288}
289
#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::RedDBOptions;
    use crate::storage::backend::LocalBackend;
    use std::path::PathBuf;
    use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
    use std::time::Duration;

    /// How long a test waits for the audit logger to go idle.
    const AUDIT_IDLE_TIMEOUT: Duration = Duration::from_secs(2);

    /// Creates a fresh, uniquely named scratch directory for one test.
    fn temp_prefix(tag: &str) -> PathBuf {
        // `temp_dir()` already returns a `PathBuf`; no conversion needed.
        let p = std::env::temp_dir().join(format!(
            "reddb-lease-lifecycle-{tag}-{}-{}",
            std::process::id(),
            crate::utils::now_unix_nanos(),
        ));
        std::fs::create_dir_all(&p).unwrap();
        p
    }

    /// Waits for the audit logger to go idle and returns the log body.
    fn read_audit(audit: &AuditLogger) -> String {
        assert!(audit.wait_idle(AUDIT_IDLE_TIMEOUT));
        std::fs::read_to_string(audit.path()).unwrap()
    }

    fn build_lifecycle(
        tag: &str,
    ) -> (
        Arc<LeaseLifecycle>,
        Arc<WriteGate>,
        Arc<AuditLogger>,
        Arc<AtomicUsize>,
        PathBuf,
    ) {
        let (lifecycle, write_gate, audit_log, drain_counter, _current_term, prefix) =
            build_lifecycle_with_term(tag, 1);
        (lifecycle, write_gate, audit_log, drain_counter, prefix)
    }

    fn build_lifecycle_with_term(
        tag: &str,
        initial_term: u64,
    ) -> (
        Arc<LeaseLifecycle>,
        Arc<WriteGate>,
        Arc<AuditLogger>,
        Arc<AtomicUsize>,
        Arc<AtomicU64>,
        PathBuf,
    ) {
        let prefix = temp_prefix(tag);
        let store = Arc::new(
            LeaseStore::new(Arc::new(LocalBackend))
                .with_prefix(prefix.to_string_lossy().to_string()),
        );
        let mut opts = RedDBOptions::default();
        opts.read_only = false;
        let write_gate = Arc::new(WriteGate::from_options(&opts));
        let audit_log = Arc::new(AuditLogger::for_data_path(&prefix.join("data.rdb")));
        let drain_counter = Arc::new(AtomicUsize::new(0));
        let drain_counter_clone = Arc::clone(&drain_counter);
        let mark_draining: MarkDraining = Arc::new(move || {
            drain_counter_clone.fetch_add(1, Ordering::SeqCst);
        });
        let current_term_value = Arc::new(AtomicU64::new(initial_term));
        let current_term_clone = Arc::clone(&current_term_value);
        let current_term: CurrentTerm =
            Arc::new(move || current_term_clone.load(Ordering::SeqCst));
        let lifecycle = Arc::new(LeaseLifecycle::new(
            store,
            Arc::clone(&write_gate),
            Arc::clone(&audit_log),
            mark_draining,
            current_term,
            "writer-1".to_string(),
            "main".to_string(),
            60_000,
        ));
        (
            lifecycle,
            write_gate,
            audit_log,
            drain_counter,
            current_term_value,
            prefix,
        )
    }

    #[test]
    fn acquire_flips_gate_to_held_and_records_audit() {
        let (lifecycle, gate, audit, drain, prefix) = build_lifecycle("acquire");
        assert!(lifecycle.try_acquire().is_ok());
        assert_eq!(gate.lease_state(), LeaseGateState::Held);
        assert!(lifecycle.current_lease().is_some());
        assert_eq!(drain.load(Ordering::SeqCst), 0);
        // Audit log file should contain one acquire success line.
        let body = read_audit(&audit);
        assert!(body.contains("lease/acquire"));
        assert!(body.contains("\"outcome\":\"success\""));
        let _ = std::fs::remove_dir_all(&prefix);
    }

    #[test]
    fn acquire_stamps_current_replication_term() {
        let (lifecycle, _gate, _audit, _drain, _term, prefix) =
            build_lifecycle_with_term("acquire-term", 7);
        lifecycle.try_acquire().unwrap();
        assert_eq!(lifecycle.current_lease().unwrap().term, 7);
        let _ = std::fs::remove_dir_all(&prefix);
    }

    #[test]
    fn refresh_with_unchanged_term_keeps_gate_held() {
        let (lifecycle, gate, _audit, drain, prefix) = build_lifecycle("refresh-ok");
        lifecycle.try_acquire().unwrap();
        assert!(lifecycle.refresh().is_ok());
        assert_eq!(gate.lease_state(), LeaseGateState::Held);
        assert!(lifecycle.current_lease().is_some());
        assert_eq!(drain.load(Ordering::SeqCst), 0);
        let _ = std::fs::remove_dir_all(&prefix);
    }

    #[test]
    fn release_flips_gate_to_not_held_and_clears_inner_state() {
        let (lifecycle, gate, audit, _drain, prefix) = build_lifecycle("release");
        lifecycle.try_acquire().unwrap();
        assert!(lifecycle.release().is_ok());
        assert_eq!(gate.lease_state(), LeaseGateState::NotHeld);
        assert!(lifecycle.current_lease().is_none());
        let body = read_audit(&audit);
        assert!(body.contains("lease/release"));
        let _ = std::fs::remove_dir_all(&prefix);
    }

    #[test]
    fn release_is_idempotent_when_no_lease_held() {
        let (lifecycle, gate, _audit, _drain, prefix) = build_lifecycle("release-idem");
        // No prior acquire; release should still close the gate.
        assert!(lifecycle.release().is_ok());
        assert_eq!(gate.lease_state(), LeaseGateState::NotHeld);
        let _ = std::fs::remove_dir_all(&prefix);
    }

    #[test]
    fn release_after_lost_lease_is_idempotent() {
        let (lifecycle, gate, _audit, drain, current_term, prefix) =
            build_lifecycle_with_term("release-after-lost", 2);
        lifecycle.try_acquire().unwrap();
        current_term.store(3, Ordering::SeqCst);
        assert!(lifecycle.refresh().is_err());
        // The lost transition already cleared the snapshot; release
        // must still succeed and leave the gate closed.
        assert!(lifecycle.release().is_ok());
        assert_eq!(gate.lease_state(), LeaseGateState::NotHeld);
        assert!(lifecycle.current_lease().is_none());
        assert_eq!(drain.load(Ordering::SeqCst), 1);
        let _ = std::fs::remove_dir_all(&prefix);
    }

    #[test]
    fn refresh_without_acquire_returns_error_without_touching_gate() {
        let (lifecycle, gate, _audit, drain, prefix) = build_lifecycle("refresh-noop");
        let err = lifecycle.refresh().unwrap_err();
        match err {
            RedDBError::Internal(msg) => assert!(msg.contains("without an acquired lease")),
            other => panic!("expected Internal, got {other:?}"),
        }
        assert_eq!(gate.lease_state(), LeaseGateState::NotRequired);
        assert_eq!(drain.load(Ordering::SeqCst), 0);
        let _ = std::fs::remove_dir_all(&prefix);
    }

    #[test]
    fn refresh_fences_when_current_term_advances() {
        let (lifecycle, gate, _audit, drain, current_term, prefix) =
            build_lifecycle_with_term("refresh-stale-term", 4);
        lifecycle.try_acquire().unwrap();
        current_term.store(5, Ordering::SeqCst);

        let err = lifecycle.refresh().unwrap_err();

        match err {
            RedDBError::Internal(msg) => assert_eq!(msg, "writer lease lost"),
            other => panic!("expected Internal, got {other:?}"),
        }
        assert_eq!(gate.lease_state(), LeaseGateState::NotHeld);
        assert_eq!(drain.load(Ordering::SeqCst), 1);
        assert!(lifecycle.current_lease().is_none());
        let _ = std::fs::remove_dir_all(&prefix);
    }

    #[test]
    fn admin_promote_lease_audits_success() {
        let prefix = temp_prefix("admin-ok");
        let store = LeaseStore::new(Arc::new(LocalBackend))
            .with_prefix(prefix.to_string_lossy().to_string());
        let audit = AuditLogger::for_data_path(&prefix.join("data.rdb"));
        let lease = admin_promote_lease(&store, &audit, "main", "promoter-1", 30_000, 9).unwrap();
        assert_eq!(lease.holder_id, "promoter-1");
        assert_eq!(lease.term, 9);
        let body = read_audit(&audit);
        assert!(body.contains("admin/failover/promote"));
        assert!(body.contains("\"outcome\":\"success\""));
        let _ = std::fs::remove_dir_all(&prefix);
    }

    #[test]
    fn admin_promote_lease_does_not_flip_a_separate_gate() {
        // The promote helper takes no gate; verify the documentation
        // contract by constructing a gate and confirming nothing
        // touches it. (Trivial, but locks the contract in the test
        // suite so a future refactor that adds a gate-flip side
        // effect breaks this case.)
        let prefix = temp_prefix("admin-no-gate");
        let store = LeaseStore::new(Arc::new(LocalBackend))
            .with_prefix(prefix.to_string_lossy().to_string());
        let audit = AuditLogger::for_data_path(&prefix.join("data.rdb"));
        let mut opts = RedDBOptions::default();
        opts.read_only = false;
        let gate = WriteGate::from_options(&opts);
        let _ = admin_promote_lease(&store, &audit, "main", "promoter-2", 30_000, 1).unwrap();
        assert_eq!(gate.lease_state(), LeaseGateState::NotRequired);
        let _ = std::fs::remove_dir_all(&prefix);
    }
}
479        let _ = admin_promote_lease(&store, &audit, "main", "promoter-2", 30_000, 1).unwrap();
480        assert_eq!(gate.lease_state(), LeaseGateState::NotRequired);
481        let _ = std::fs::remove_dir_all(&prefix);
482    }
483}