Skip to main content

reddb_server/runtime/
lease_lifecycle.rs

//! Serverless writer-lease state machine.
//!
//! Single owner of the `{acquire, refresh, refresh_failed, lost_race,
//! release}` transitions. Centralises the side-effects that must
//! happen together so the `WriteGate` lease state and the
//! `AuditLogger` `lease/*` records can never drift out of sync.
//!
//! Before this module, those transitions lived inline in
//! `lease_loop::spawn_refresh_thread` and in
//! `handlers_admin::handle_admin_failover_promote`, with each
//! caller manually pairing `write_gate.set_lease_state(...)` and
//! `audit_log.record("lease/...")`. Four call sites, four chances
//! for drift.
//!
//! Test surface:
//!   * Construct with stand-alone `WriteGate` + `AuditLogger`.
//!   * Inject a `MarkDraining` callback (production wires it to
//!     `Lifecycle::mark_draining`).
//!   * Drive transitions; assert gate state + audit lines together.

21use std::sync::{Arc, Mutex};
22
23use crate::api::{RedDBError, RedDBResult};
24use crate::replication::lease::{LeaseError, LeaseStore, WriterLease};
25use crate::runtime::audit_log::{
26    AuditAuthSource, AuditEvent, AuditFieldEscaper, AuditLogger, Outcome,
27};
28use crate::runtime::write_gate::{LeaseGateState, WriteGate};
29
/// Hook invoked by the lifecycle when the writer lease is lost, so the
/// surrounding runtime can start draining.
///
/// In production this points at `Lifecycle::mark_draining`; tests plug
/// in a closure that merely counts invocations. It must be shareable
/// across threads because the refresh loop runs on its own thread.
pub type MarkDraining = Arc<dyn Fn() + Sync + Send>;
34
35/// Drives the serverless writer lease for one database key.
36///
37/// Owns the `WriterLease` snapshot, the `LeaseStore` it talks to,
38/// and the side-effect channels (`WriteGate`, `AuditLogger`,
39/// `MarkDraining`). All transitions go through methods on this
40/// struct so the gate/audit pair stays consistent.
41pub struct LeaseLifecycle {
42    store: Arc<LeaseStore>,
43    write_gate: Arc<WriteGate>,
44    audit_log: Arc<AuditLogger>,
45    mark_draining: MarkDraining,
46    holder_id: String,
47    database_key: String,
48    ttl_ms: u64,
49    current: Mutex<Option<WriterLease>>,
50}
51
52impl LeaseLifecycle {
53    pub fn new(
54        store: Arc<LeaseStore>,
55        write_gate: Arc<WriteGate>,
56        audit_log: Arc<AuditLogger>,
57        mark_draining: MarkDraining,
58        holder_id: String,
59        database_key: String,
60        ttl_ms: u64,
61    ) -> Self {
62        Self {
63            store,
64            write_gate,
65            audit_log,
66            mark_draining,
67            holder_id,
68            database_key,
69            ttl_ms,
70            current: Mutex::new(None),
71        }
72    }
73
74    pub fn holder_id(&self) -> &str {
75        &self.holder_id
76    }
77
78    pub fn database_key(&self) -> &str {
79        &self.database_key
80    }
81
82    pub fn ttl_ms(&self) -> u64 {
83        self.ttl_ms
84    }
85
86    pub fn current_lease(&self) -> Option<WriterLease> {
87        self.current.lock().expect("poisoned lease mutex").clone()
88    }
89
90    /// Acquire the writer lease and flip the gate to `Held`. Audit
91    /// line records the outcome (ok / err) either way.
92    pub fn try_acquire(&self) -> RedDBResult<()> {
93        match self
94            .store
95            .try_acquire(&self.database_key, &self.holder_id, self.ttl_ms)
96        {
97            Ok(lease) => {
98                *self.current.lock().expect("poisoned lease mutex") = Some(lease.clone());
99                self.write_gate.set_lease_state(LeaseGateState::Held);
100                self.audit_log.record_event(
101                    AuditEvent::builder("lease/acquire")
102                        .principal(self.holder_id.clone())
103                        .source(AuditAuthSource::System)
104                        .resource(self.database_key.clone())
105                        .outcome(Outcome::Success)
106                        .field(AuditFieldEscaper::field(
107                            "generation",
108                            lease.generation as i64,
109                        ))
110                        .field(AuditFieldEscaper::field("ttl_ms", self.ttl_ms))
111                        .build(),
112                );
113                Ok(())
114            }
115            Err(err) => {
116                self.audit_log.record_event(
117                    AuditEvent::builder("lease/acquire")
118                        .principal(self.holder_id.clone())
119                        .source(AuditAuthSource::System)
120                        .resource(self.database_key.clone())
121                        .outcome(Outcome::Error)
122                        .field(AuditFieldEscaper::field("error", err.to_string()))
123                        .build(),
124                );
125                Err(RedDBError::Internal(format!("acquire writer lease: {err}")))
126            }
127        }
128    }
129
130    /// Refresh the held lease. On success, updates the in-memory
131    /// snapshot and returns `Ok(())`. On any backend error, flips
132    /// the gate to `NotHeld`, audits `lease/lost`, asks the runtime
133    /// to drain, and returns `Err`.
134    ///
135    /// The refresh thread should treat `Err` as terminal and exit
136    /// — re-acquiring after a loss could race a freshly promoted
137    /// writer.
138    pub fn refresh(&self) -> RedDBResult<()> {
139        let snapshot = match self.current.lock().expect("poisoned lease mutex").clone() {
140            Some(lease) => lease,
141            None => {
142                return Err(RedDBError::Internal(
143                    "refresh called without an acquired lease".to_string(),
144                ));
145            }
146        };
147        match self.store.refresh(&snapshot, self.ttl_ms) {
148            Ok(updated) => {
149                *self.current.lock().expect("poisoned lease mutex") = Some(updated);
150                Ok(())
151            }
152            Err(err) => {
153                self.on_refresh_lost(err);
154                Err(RedDBError::Internal("writer lease lost".to_string()))
155            }
156        }
157    }
158
159    /// Release the held lease and flip the gate to `NotHeld`. Best-
160    /// effort: a backend failure is logged but does not propagate
161    /// — shutdown should not block on a remote release.
162    pub fn release(&self) -> RedDBResult<()> {
163        let snapshot = match self.current.lock().expect("poisoned lease mutex").take() {
164            Some(lease) => lease,
165            None => {
166                // Idempotent: already released. Make sure the gate is
167                // closed in case a previous transition didn't run.
168                self.write_gate.set_lease_state(LeaseGateState::NotHeld);
169                return Ok(());
170            }
171        };
172        let result = self.store.release(&snapshot);
173        self.write_gate.set_lease_state(LeaseGateState::NotHeld);
174        match result {
175            Ok(()) => {
176                self.audit_log.record_event(
177                    AuditEvent::builder("lease/release")
178                        .principal(self.holder_id.clone())
179                        .source(AuditAuthSource::System)
180                        .resource(self.database_key.clone())
181                        .outcome(Outcome::Success)
182                        .build(),
183                );
184                Ok(())
185            }
186            Err(err) => {
187                self.audit_log.record_event(
188                    AuditEvent::builder("lease/release")
189                        .principal(self.holder_id.clone())
190                        .source(AuditAuthSource::System)
191                        .resource(self.database_key.clone())
192                        .outcome(Outcome::Error)
193                        .field(AuditFieldEscaper::field("error", err.to_string()))
194                        .build(),
195                );
196                tracing::warn!(
197                    target: "reddb::serverless::lease",
198                    error = %err,
199                    "lease release on shutdown failed"
200                );
201                Ok(())
202            }
203        }
204    }
205
206    fn on_refresh_lost(&self, err: LeaseError) {
207        tracing::error!(
208            target: "reddb::serverless::lease",
209            error = %err,
210            holder = %self.holder_id,
211            database_key = %self.database_key,
212            "lease refresh failed; flipping to NotHeld + drain"
213        );
214        *self.current.lock().expect("poisoned lease mutex") = None;
215        self.write_gate.set_lease_state(LeaseGateState::NotHeld);
216        self.audit_log.record_event(
217            AuditEvent::builder("lease/lost")
218                .principal(self.holder_id.clone())
219                .source(AuditAuthSource::System)
220                .resource(self.database_key.clone())
221                .outcome(Outcome::Error)
222                .field(AuditFieldEscaper::field("error", err.to_string()))
223                .build(),
224        );
225        (self.mark_draining)();
226    }
227}
228
229/// Admin-driven failover promotion: acquire the writer lease as a
230/// stand-alone action without flipping the local gate. The instance
231/// stays a `Replica` until the operator restarts it as primary; the
232/// gate flip is deliberately left out so an unintended promotion
233/// can't accept writes mid-process.
234///
235/// Audited under `admin/failover/promote` regardless of outcome.
236pub fn admin_promote_lease(
237    store: &LeaseStore,
238    audit_log: &AuditLogger,
239    database_key: &str,
240    holder_id: &str,
241    ttl_ms: u64,
242) -> Result<WriterLease, LeaseError> {
243    match store.try_acquire(database_key, holder_id, ttl_ms) {
244        Ok(lease) => {
245            audit_log.record_event(
246                AuditEvent::builder("admin/failover/promote")
247                    .principal(lease.holder_id.clone())
248                    .source(AuditAuthSource::System)
249                    .resource(database_key.to_string())
250                    .outcome(Outcome::Success)
251                    .field(AuditFieldEscaper::field(
252                        "holder_id",
253                        lease.holder_id.clone(),
254                    ))
255                    .field(AuditFieldEscaper::field(
256                        "generation",
257                        lease.generation as i64,
258                    ))
259                    .field(AuditFieldEscaper::field("ttl_ms", ttl_ms))
260                    .build(),
261            );
262            Ok(lease)
263        }
264        Err(err) => {
265            audit_log.record_event(
266                AuditEvent::builder("admin/failover/promote")
267                    .principal(holder_id.to_string())
268                    .source(AuditAuthSource::System)
269                    .resource(database_key.to_string())
270                    .outcome(Outcome::Error)
271                    .field(AuditFieldEscaper::field("error", err.to_string()))
272                    .build(),
273            );
274            Err(err)
275        }
276    }
277}
278
#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::RedDBOptions;
    use crate::storage::backend::LocalBackend;
    use std::path::PathBuf;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Fresh, unique scratch directory for one test case.
    fn temp_prefix(tag: &str) -> PathBuf {
        // `temp_dir()` already returns an owned `PathBuf`.
        let mut p = std::env::temp_dir();
        p.push(format!(
            "reddb-lease-lifecycle-{tag}-{}-{}",
            std::process::id(),
            crate::utils::now_unix_nanos(),
        ));
        std::fs::create_dir_all(&p).unwrap();
        p
    }

    /// Wire a lifecycle against a local lease store, a writable gate, an
    /// on-disk audit log and a drain counter. Returns all of them plus the
    /// scratch directory so the test can clean up.
    fn build_lifecycle(
        tag: &str,
    ) -> (
        Arc<LeaseLifecycle>,
        Arc<WriteGate>,
        Arc<AuditLogger>,
        Arc<AtomicUsize>,
        PathBuf,
    ) {
        let prefix = temp_prefix(tag);
        let store = Arc::new(
            LeaseStore::new(Arc::new(LocalBackend))
                .with_prefix(prefix.to_string_lossy().to_string()),
        );
        let mut opts = RedDBOptions::default();
        opts.read_only = false;
        let write_gate = Arc::new(WriteGate::from_options(&opts));
        let audit_log = Arc::new(AuditLogger::for_data_path(&prefix.join("data.rdb")));
        let drain_counter = Arc::new(AtomicUsize::new(0));
        let drain_counter_clone = Arc::clone(&drain_counter);
        let mark_draining: MarkDraining = Arc::new(move || {
            drain_counter_clone.fetch_add(1, Ordering::SeqCst);
        });
        let lifecycle = Arc::new(LeaseLifecycle::new(
            store,
            Arc::clone(&write_gate),
            Arc::clone(&audit_log),
            mark_draining,
            "writer-1".to_string(),
            "main".to_string(),
            60_000,
        ));
        (lifecycle, write_gate, audit_log, drain_counter, prefix)
    }

    #[test]
    fn acquire_flips_gate_to_held_and_records_audit() {
        let (lifecycle, gate, audit, drain, prefix) = build_lifecycle("acquire");
        assert!(lifecycle.try_acquire().is_ok());
        assert_eq!(gate.lease_state(), LeaseGateState::Held);
        assert!(lifecycle.current_lease().is_some());
        assert_eq!(drain.load(Ordering::SeqCst), 0);
        // Audit log file should contain one acquire success line.
        assert!(audit.wait_idle(std::time::Duration::from_secs(2)));
        let body = std::fs::read_to_string(audit.path()).unwrap();
        assert!(body.contains("lease/acquire"));
        assert!(body.contains("\"outcome\":\"success\""));
        let _ = std::fs::remove_dir_all(&prefix);
    }

    #[test]
    fn refresh_keeps_gate_held_and_lease_snapshot() {
        let (lifecycle, gate, _audit, drain, prefix) = build_lifecycle("refresh-ok");
        lifecycle.try_acquire().unwrap();
        let before = lifecycle.current_lease().expect("acquired lease");
        assert!(lifecycle.refresh().is_ok());
        assert_eq!(gate.lease_state(), LeaseGateState::Held);
        let after = lifecycle.current_lease().expect("refresh keeps the lease");
        assert_eq!(after.holder_id, before.holder_id);
        assert_eq!(drain.load(Ordering::SeqCst), 0);
        let _ = std::fs::remove_dir_all(&prefix);
    }

    #[test]
    fn release_flips_gate_to_not_held_and_clears_inner_state() {
        let (lifecycle, gate, audit, _drain, prefix) = build_lifecycle("release");
        lifecycle.try_acquire().unwrap();
        assert!(lifecycle.release().is_ok());
        assert_eq!(gate.lease_state(), LeaseGateState::NotHeld);
        assert!(lifecycle.current_lease().is_none());
        assert!(audit.wait_idle(std::time::Duration::from_secs(2)));
        let body = std::fs::read_to_string(audit.path()).unwrap();
        assert!(body.contains("lease/release"));
        let _ = std::fs::remove_dir_all(&prefix);
    }

    #[test]
    fn refresh_after_release_is_rejected_without_draining() {
        let (lifecycle, gate, _audit, drain, prefix) = build_lifecycle("refresh-after-release");
        lifecycle.try_acquire().unwrap();
        lifecycle.release().unwrap();
        // A refresh racing shutdown must not be reported as a lease loss.
        assert!(lifecycle.refresh().is_err());
        assert_eq!(gate.lease_state(), LeaseGateState::NotHeld);
        assert!(lifecycle.current_lease().is_none());
        assert_eq!(drain.load(Ordering::SeqCst), 0);
        let _ = std::fs::remove_dir_all(&prefix);
    }

    #[test]
    fn release_is_idempotent_when_no_lease_held() {
        let (lifecycle, gate, _audit, _drain, prefix) = build_lifecycle("release-idem");
        // No prior acquire; release should still close the gate.
        assert!(lifecycle.release().is_ok());
        assert_eq!(gate.lease_state(), LeaseGateState::NotHeld);
        let _ = std::fs::remove_dir_all(&prefix);
    }

    #[test]
    fn refresh_without_acquire_returns_error_without_touching_gate() {
        let (lifecycle, gate, _audit, drain, prefix) = build_lifecycle("refresh-noop");
        let err = lifecycle.refresh().unwrap_err();
        match err {
            RedDBError::Internal(msg) => assert!(msg.contains("without an acquired lease")),
            other => panic!("expected Internal, got {other:?}"),
        }
        assert_eq!(gate.lease_state(), LeaseGateState::NotRequired);
        assert_eq!(drain.load(Ordering::SeqCst), 0);
        let _ = std::fs::remove_dir_all(&prefix);
    }

    #[test]
    fn admin_promote_lease_audits_success() {
        let prefix = temp_prefix("admin-ok");
        let store = LeaseStore::new(Arc::new(LocalBackend))
            .with_prefix(prefix.to_string_lossy().to_string());
        let audit = AuditLogger::for_data_path(&prefix.join("data.rdb"));
        let lease = admin_promote_lease(&store, &audit, "main", "promoter-1", 30_000).unwrap();
        assert_eq!(lease.holder_id, "promoter-1");
        assert!(audit.wait_idle(std::time::Duration::from_secs(2)));
        let body = std::fs::read_to_string(audit.path()).unwrap();
        assert!(body.contains("admin/failover/promote"));
        assert!(body.contains("\"outcome\":\"success\""));
        let _ = std::fs::remove_dir_all(&prefix);
    }

    #[test]
    fn admin_promote_lease_does_not_flip_a_separate_gate() {
        // The promote helper takes no gate; verify the documentation
        // contract by constructing a gate and confirming nothing
        // touches it. (Trivial, but locks the contract in the test
        // suite so a future refactor that adds a gate-flip side
        // effect breaks this case.)
        let prefix = temp_prefix("admin-no-gate");
        let store = LeaseStore::new(Arc::new(LocalBackend))
            .with_prefix(prefix.to_string_lossy().to_string());
        let audit = AuditLogger::for_data_path(&prefix.join("data.rdb"));
        let mut opts = RedDBOptions::default();
        opts.read_only = false;
        let gate = WriteGate::from_options(&opts);
        let _ = admin_promote_lease(&store, &audit, "main", "promoter-2", 30_000).unwrap();
        assert_eq!(gate.lease_state(), LeaseGateState::NotRequired);
        let _ = std::fs::remove_dir_all(&prefix);
    }
}