1use std::sync::{Arc, Mutex};
22
23use crate::api::{RedDBError, RedDBResult};
24use crate::replication::lease::{LeaseError, LeaseStore, WriterLease};
25use crate::runtime::audit_log::{
26 AuditAuthSource, AuditEvent, AuditFieldEscaper, AuditLogger, Outcome,
27};
28use crate::runtime::write_gate::{LeaseGateState, WriteGate};
29
/// Shareable, thread-safe callback taking no arguments and returning nothing.
///
/// `LeaseLifecycle` invokes it after a lease refresh fails.
pub type MarkDraining = Arc<dyn Fn() + Send + Sync>;
/// Shareable, thread-safe callback that yields a `u64`.
///
/// `LeaseLifecycle` calls it before every acquire and refresh and passes the
/// result to the lease store as the term.
pub type CurrentTerm = Arc<dyn Fn() -> u64 + Send + Sync>;
35
/// Couples a writer lease kept in a [`LeaseStore`] with the local
/// [`WriteGate`] and [`AuditLogger`].
///
/// `try_acquire`, `refresh`, and `release` talk to the store and then update
/// the gate's lease state and the audit log to match the outcome.
pub struct LeaseLifecycle {
    /// Store the lease is acquired from, refreshed in, and released to.
    store: Arc<LeaseStore>,
    /// Gate whose lease state is set to `Held` / `NotHeld` as the lease changes.
    write_gate: Arc<WriteGate>,
    /// Sink for the `lease/acquire`, `lease/release`, and `lease/lost` events.
    audit_log: Arc<AuditLogger>,
    /// Invoked once each time a refresh fails.
    mark_draining: MarkDraining,
    /// Sampled on every acquire and refresh; the result is passed to the
    /// store as the term.
    current_term: CurrentTerm,
    /// Identity under which the lease is requested; also the audit principal.
    holder_id: String,
    /// Key of the lease in the store; also the audit resource.
    database_key: String,
    /// TTL value handed to the store on acquire and refresh.
    ttl_ms: u64,
    /// Most recent lease returned by the store, or `None` when none is held.
    current: Mutex<Option<WriterLease>>,
}
53
54impl LeaseLifecycle {
55 pub fn new(
56 store: Arc<LeaseStore>,
57 write_gate: Arc<WriteGate>,
58 audit_log: Arc<AuditLogger>,
59 mark_draining: MarkDraining,
60 current_term: CurrentTerm,
61 holder_id: String,
62 database_key: String,
63 ttl_ms: u64,
64 ) -> Self {
65 Self {
66 store,
67 write_gate,
68 audit_log,
69 mark_draining,
70 current_term,
71 holder_id,
72 database_key,
73 ttl_ms,
74 current: Mutex::new(None),
75 }
76 }
77
78 pub fn holder_id(&self) -> &str {
79 &self.holder_id
80 }
81
82 pub fn database_key(&self) -> &str {
83 &self.database_key
84 }
85
    /// Returns the `ttl_ms` value this lifecycle was constructed with; the
    /// same value is passed to the store on every acquire and refresh.
    pub fn ttl_ms(&self) -> u64 {
        self.ttl_ms
    }
89
90 pub fn current_lease(&self) -> Option<WriterLease> {
91 self.current.lock().expect("poisoned lease mutex").clone()
92 }
93
94 pub fn try_acquire(&self) -> RedDBResult<()> {
97 let term = (self.current_term)();
98 match self.store.try_acquire_for_term(
99 &self.database_key,
100 &self.holder_id,
101 self.ttl_ms,
102 term,
103 ) {
104 Ok(lease) => {
105 *self.current.lock().expect("poisoned lease mutex") = Some(lease.clone());
106 self.write_gate.set_lease_state(LeaseGateState::Held);
107 self.audit_log.record_event(
108 AuditEvent::builder("lease/acquire")
109 .principal(self.holder_id.clone())
110 .source(AuditAuthSource::System)
111 .resource(self.database_key.clone())
112 .outcome(Outcome::Success)
113 .field(AuditFieldEscaper::field(
114 "generation",
115 lease.generation as i64,
116 ))
117 .field(AuditFieldEscaper::field("term", lease.term as i64))
118 .field(AuditFieldEscaper::field("ttl_ms", self.ttl_ms))
119 .build(),
120 );
121 Ok(())
122 }
123 Err(err) => {
124 self.audit_log.record_event(
125 AuditEvent::builder("lease/acquire")
126 .principal(self.holder_id.clone())
127 .source(AuditAuthSource::System)
128 .resource(self.database_key.clone())
129 .outcome(Outcome::Error)
130 .field(AuditFieldEscaper::field("error", err.to_string()))
131 .build(),
132 );
133 Err(RedDBError::Internal(format!("acquire writer lease: {err}")))
134 }
135 }
136 }
137
138 pub fn refresh(&self) -> RedDBResult<()> {
147 let snapshot = match self.current.lock().expect("poisoned lease mutex").clone() {
148 Some(lease) => lease,
149 None => {
150 return Err(RedDBError::Internal(
151 "refresh called without an acquired lease".to_string(),
152 ));
153 }
154 };
155 let term = (self.current_term)();
156 match self.store.refresh_for_term(&snapshot, self.ttl_ms, term) {
157 Ok(updated) => {
158 *self.current.lock().expect("poisoned lease mutex") = Some(updated);
159 Ok(())
160 }
161 Err(err) => {
162 self.on_refresh_lost(err);
163 Err(RedDBError::Internal("writer lease lost".to_string()))
164 }
165 }
166 }
167
168 pub fn release(&self) -> RedDBResult<()> {
172 let snapshot = match self.current.lock().expect("poisoned lease mutex").take() {
173 Some(lease) => lease,
174 None => {
175 self.write_gate.set_lease_state(LeaseGateState::NotHeld);
178 return Ok(());
179 }
180 };
181 let result = self.store.release(&snapshot);
182 self.write_gate.set_lease_state(LeaseGateState::NotHeld);
183 match result {
184 Ok(()) => {
185 self.audit_log.record_event(
186 AuditEvent::builder("lease/release")
187 .principal(self.holder_id.clone())
188 .source(AuditAuthSource::System)
189 .resource(self.database_key.clone())
190 .outcome(Outcome::Success)
191 .build(),
192 );
193 Ok(())
194 }
195 Err(err) => {
196 self.audit_log.record_event(
197 AuditEvent::builder("lease/release")
198 .principal(self.holder_id.clone())
199 .source(AuditAuthSource::System)
200 .resource(self.database_key.clone())
201 .outcome(Outcome::Error)
202 .field(AuditFieldEscaper::field("error", err.to_string()))
203 .build(),
204 );
205 tracing::warn!(
206 target: "reddb::serverless::lease",
207 error = %err,
208 "lease release on shutdown failed"
209 );
210 Ok(())
211 }
212 }
213 }
214
215 fn on_refresh_lost(&self, err: LeaseError) {
216 tracing::error!(
217 target: "reddb::serverless::lease",
218 error = %err,
219 holder = %self.holder_id,
220 database_key = %self.database_key,
221 "lease refresh failed; flipping to NotHeld + drain"
222 );
223 *self.current.lock().expect("poisoned lease mutex") = None;
224 self.write_gate.set_lease_state(LeaseGateState::NotHeld);
225 self.audit_log.record_event(
226 AuditEvent::builder("lease/lost")
227 .principal(self.holder_id.clone())
228 .source(AuditAuthSource::System)
229 .resource(self.database_key.clone())
230 .outcome(Outcome::Error)
231 .field(AuditFieldEscaper::field("error", err.to_string()))
232 .build(),
233 );
234 (self.mark_draining)();
235 }
236}
237
238pub fn admin_promote_lease(
246 store: &LeaseStore,
247 audit_log: &AuditLogger,
248 database_key: &str,
249 holder_id: &str,
250 ttl_ms: u64,
251 term: u64,
252) -> Result<WriterLease, LeaseError> {
253 match store.try_acquire_for_term(database_key, holder_id, ttl_ms, term) {
254 Ok(lease) => {
255 audit_log.record_event(
256 AuditEvent::builder("admin/failover/promote")
257 .principal(lease.holder_id.clone())
258 .source(AuditAuthSource::System)
259 .resource(database_key.to_string())
260 .outcome(Outcome::Success)
261 .field(AuditFieldEscaper::field(
262 "holder_id",
263 lease.holder_id.clone(),
264 ))
265 .field(AuditFieldEscaper::field(
266 "generation",
267 lease.generation as i64,
268 ))
269 .field(AuditFieldEscaper::field("term", lease.term as i64))
270 .field(AuditFieldEscaper::field("ttl_ms", ttl_ms))
271 .build(),
272 );
273 Ok(lease)
274 }
275 Err(err) => {
276 audit_log.record_event(
277 AuditEvent::builder("admin/failover/promote")
278 .principal(holder_id.to_string())
279 .source(AuditAuthSource::System)
280 .resource(database_key.to_string())
281 .outcome(Outcome::Error)
282 .field(AuditFieldEscaper::field("error", err.to_string()))
283 .build(),
284 );
285 Err(err)
286 }
287 }
288}
289
290#[cfg(test)]
291mod tests {
292 use super::*;
293 use crate::api::RedDBOptions;
294 use crate::storage::backend::LocalBackend;
295 use std::path::PathBuf;
296 use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
297
298 fn temp_prefix(tag: &str) -> PathBuf {
299 let mut p = PathBuf::from(std::env::temp_dir());
300 p.push(format!(
301 "reddb-lease-lifecycle-{tag}-{}-{}",
302 std::process::id(),
303 crate::utils::now_unix_nanos(),
304 ));
305 std::fs::create_dir_all(&p).unwrap();
306 p
307 }
308
309 fn build_lifecycle(
310 tag: &str,
311 ) -> (
312 Arc<LeaseLifecycle>,
313 Arc<WriteGate>,
314 Arc<AuditLogger>,
315 Arc<AtomicUsize>,
316 PathBuf,
317 ) {
318 let (lifecycle, write_gate, audit_log, drain_counter, _current_term, prefix) =
319 build_lifecycle_with_term(tag, 1);
320 (lifecycle, write_gate, audit_log, drain_counter, prefix)
321 }
322
323 fn build_lifecycle_with_term(
324 tag: &str,
325 initial_term: u64,
326 ) -> (
327 Arc<LeaseLifecycle>,
328 Arc<WriteGate>,
329 Arc<AuditLogger>,
330 Arc<AtomicUsize>,
331 Arc<AtomicU64>,
332 PathBuf,
333 ) {
334 let prefix = temp_prefix(tag);
335 let store = Arc::new(
336 LeaseStore::new(Arc::new(LocalBackend))
337 .with_prefix(prefix.to_string_lossy().to_string()),
338 );
339 let mut opts = RedDBOptions::default();
340 opts.read_only = false;
341 let write_gate = Arc::new(WriteGate::from_options(&opts));
342 let audit_log = Arc::new(AuditLogger::for_data_path(&prefix.join("data.rdb")));
343 let drain_counter = Arc::new(AtomicUsize::new(0));
344 let drain_counter_clone = Arc::clone(&drain_counter);
345 let mark_draining: MarkDraining = Arc::new(move || {
346 drain_counter_clone.fetch_add(1, Ordering::SeqCst);
347 });
348 let current_term_value = Arc::new(AtomicU64::new(initial_term));
349 let current_term_clone = Arc::clone(¤t_term_value);
350 let current_term: CurrentTerm = Arc::new(move || current_term_clone.load(Ordering::SeqCst));
351 let lifecycle = Arc::new(LeaseLifecycle::new(
352 store,
353 Arc::clone(&write_gate),
354 Arc::clone(&audit_log),
355 mark_draining,
356 current_term,
357 "writer-1".to_string(),
358 "main".to_string(),
359 60_000,
360 ));
361 (
362 lifecycle,
363 write_gate,
364 audit_log,
365 drain_counter,
366 current_term_value,
367 prefix,
368 )
369 }
370
371 #[test]
372 fn acquire_flips_gate_to_held_and_records_audit() {
373 let (lifecycle, gate, audit, drain, prefix) = build_lifecycle("acquire");
374 assert!(lifecycle.try_acquire().is_ok());
375 assert_eq!(gate.lease_state(), LeaseGateState::Held);
376 assert!(lifecycle.current_lease().is_some());
377 assert_eq!(drain.load(Ordering::SeqCst), 0);
378 assert!(audit.wait_idle(std::time::Duration::from_secs(2)));
380 let body = std::fs::read_to_string(audit.path()).unwrap();
381 assert!(body.contains("lease/acquire"));
382 assert!(body.contains("\"outcome\":\"success\""));
383 let _ = std::fs::remove_dir_all(&prefix);
384 }
385
386 #[test]
387 fn acquire_stamps_current_replication_term() {
388 let (lifecycle, _gate, _audit, _drain, _term, prefix) =
389 build_lifecycle_with_term("acquire-term", 7);
390 lifecycle.try_acquire().unwrap();
391 assert_eq!(lifecycle.current_lease().unwrap().term, 7);
392 let _ = std::fs::remove_dir_all(&prefix);
393 }
394
395 #[test]
396 fn release_flips_gate_to_not_held_and_clears_inner_state() {
397 let (lifecycle, gate, audit, _drain, prefix) = build_lifecycle("release");
398 lifecycle.try_acquire().unwrap();
399 assert!(lifecycle.release().is_ok());
400 assert_eq!(gate.lease_state(), LeaseGateState::NotHeld);
401 assert!(lifecycle.current_lease().is_none());
402 assert!(audit.wait_idle(std::time::Duration::from_secs(2)));
403 let body = std::fs::read_to_string(audit.path()).unwrap();
404 assert!(body.contains("lease/release"));
405 let _ = std::fs::remove_dir_all(&prefix);
406 }
407
408 #[test]
409 fn release_is_idempotent_when_no_lease_held() {
410 let (lifecycle, gate, _audit, _drain, prefix) = build_lifecycle("release-idem");
411 assert!(lifecycle.release().is_ok());
413 assert_eq!(gate.lease_state(), LeaseGateState::NotHeld);
414 let _ = std::fs::remove_dir_all(&prefix);
415 }
416
417 #[test]
418 fn refresh_without_acquire_returns_error_without_touching_gate() {
419 let (lifecycle, gate, _audit, drain, prefix) = build_lifecycle("refresh-noop");
420 let err = lifecycle.refresh().unwrap_err();
421 match err {
422 RedDBError::Internal(msg) => assert!(msg.contains("without an acquired lease")),
423 other => panic!("expected Internal, got {other:?}"),
424 }
425 assert_eq!(gate.lease_state(), LeaseGateState::NotRequired);
426 assert_eq!(drain.load(Ordering::SeqCst), 0);
427 let _ = std::fs::remove_dir_all(&prefix);
428 }
429
430 #[test]
431 fn refresh_fences_when_current_term_advances() {
432 let (lifecycle, gate, _audit, drain, current_term, prefix) =
433 build_lifecycle_with_term("refresh-stale-term", 4);
434 lifecycle.try_acquire().unwrap();
435 current_term.store(5, Ordering::SeqCst);
436
437 let err = lifecycle.refresh().unwrap_err();
438
439 match err {
440 RedDBError::Internal(msg) => assert_eq!(msg, "writer lease lost"),
441 other => panic!("expected Internal, got {other:?}"),
442 }
443 assert_eq!(gate.lease_state(), LeaseGateState::NotHeld);
444 assert_eq!(drain.load(Ordering::SeqCst), 1);
445 assert!(lifecycle.current_lease().is_none());
446 let _ = std::fs::remove_dir_all(&prefix);
447 }
448
449 #[test]
450 fn admin_promote_lease_audits_success() {
451 let prefix = temp_prefix("admin-ok");
452 let store = LeaseStore::new(Arc::new(LocalBackend))
453 .with_prefix(prefix.to_string_lossy().to_string());
454 let audit = AuditLogger::for_data_path(&prefix.join("data.rdb"));
455 let lease = admin_promote_lease(&store, &audit, "main", "promoter-1", 30_000, 9).unwrap();
456 assert_eq!(lease.holder_id, "promoter-1");
457 assert_eq!(lease.term, 9);
458 assert!(audit.wait_idle(std::time::Duration::from_secs(2)));
459 let body = std::fs::read_to_string(audit.path()).unwrap();
460 assert!(body.contains("admin/failover/promote"));
461 assert!(body.contains("\"outcome\":\"success\""));
462 let _ = std::fs::remove_dir_all(&prefix);
463 }
464
465 #[test]
466 fn admin_promote_lease_does_not_flip_a_separate_gate() {
467 let prefix = temp_prefix("admin-no-gate");
473 let store = LeaseStore::new(Arc::new(LocalBackend))
474 .with_prefix(prefix.to_string_lossy().to_string());
475 let audit = AuditLogger::for_data_path(&prefix.join("data.rdb"));
476 let mut opts = RedDBOptions::default();
477 opts.read_only = false;
478 let gate = WriteGate::from_options(&opts);
479 let _ = admin_promote_lease(&store, &audit, "main", "promoter-2", 30_000, 1).unwrap();
480 assert_eq!(gate.lease_state(), LeaseGateState::NotRequired);
481 let _ = std::fs::remove_dir_all(&prefix);
482 }
483}