1use std::sync::{Arc, Mutex};
22
23use crate::api::{RedDBError, RedDBResult};
24use crate::replication::lease::{LeaseError, LeaseStore, WriterLease};
25use crate::runtime::audit_log::{
26 AuditAuthSource, AuditEvent, AuditFieldEscaper, AuditLogger, Outcome,
27};
28use crate::runtime::write_gate::{LeaseGateState, WriteGate};
29
/// Callback a [`LeaseLifecycle`] invokes after a failed refresh has cleared
/// its writer lease.
///
/// The closure is shared behind an `Arc`, so cloning the hook hands out
/// another handle to the same closure. It must be `Send + Sync` so it can be
/// called from whichever thread observes the lease loss.
pub type MarkDraining = Arc<dyn Fn() + Send + Sync + 'static>;
34
35pub struct LeaseLifecycle {
42 store: Arc<LeaseStore>,
43 write_gate: Arc<WriteGate>,
44 audit_log: Arc<AuditLogger>,
45 mark_draining: MarkDraining,
46 holder_id: String,
47 database_key: String,
48 ttl_ms: u64,
49 current: Mutex<Option<WriterLease>>,
50}
51
52impl LeaseLifecycle {
53 pub fn new(
54 store: Arc<LeaseStore>,
55 write_gate: Arc<WriteGate>,
56 audit_log: Arc<AuditLogger>,
57 mark_draining: MarkDraining,
58 holder_id: String,
59 database_key: String,
60 ttl_ms: u64,
61 ) -> Self {
62 Self {
63 store,
64 write_gate,
65 audit_log,
66 mark_draining,
67 holder_id,
68 database_key,
69 ttl_ms,
70 current: Mutex::new(None),
71 }
72 }
73
74 pub fn holder_id(&self) -> &str {
75 &self.holder_id
76 }
77
78 pub fn database_key(&self) -> &str {
79 &self.database_key
80 }
81
82 pub fn ttl_ms(&self) -> u64 {
83 self.ttl_ms
84 }
85
86 pub fn current_lease(&self) -> Option<WriterLease> {
87 self.current.lock().expect("poisoned lease mutex").clone()
88 }
89
90 pub fn try_acquire(&self) -> RedDBResult<()> {
93 match self
94 .store
95 .try_acquire(&self.database_key, &self.holder_id, self.ttl_ms)
96 {
97 Ok(lease) => {
98 *self.current.lock().expect("poisoned lease mutex") = Some(lease.clone());
99 self.write_gate.set_lease_state(LeaseGateState::Held);
100 self.audit_log.record_event(
101 AuditEvent::builder("lease/acquire")
102 .principal(self.holder_id.clone())
103 .source(AuditAuthSource::System)
104 .resource(self.database_key.clone())
105 .outcome(Outcome::Success)
106 .field(AuditFieldEscaper::field(
107 "generation",
108 lease.generation as i64,
109 ))
110 .field(AuditFieldEscaper::field("ttl_ms", self.ttl_ms))
111 .build(),
112 );
113 Ok(())
114 }
115 Err(err) => {
116 self.audit_log.record_event(
117 AuditEvent::builder("lease/acquire")
118 .principal(self.holder_id.clone())
119 .source(AuditAuthSource::System)
120 .resource(self.database_key.clone())
121 .outcome(Outcome::Error)
122 .field(AuditFieldEscaper::field("error", err.to_string()))
123 .build(),
124 );
125 Err(RedDBError::Internal(format!("acquire writer lease: {err}")))
126 }
127 }
128 }
129
130 pub fn refresh(&self) -> RedDBResult<()> {
139 let snapshot = match self.current.lock().expect("poisoned lease mutex").clone() {
140 Some(lease) => lease,
141 None => {
142 return Err(RedDBError::Internal(
143 "refresh called without an acquired lease".to_string(),
144 ));
145 }
146 };
147 match self.store.refresh(&snapshot, self.ttl_ms) {
148 Ok(updated) => {
149 *self.current.lock().expect("poisoned lease mutex") = Some(updated);
150 Ok(())
151 }
152 Err(err) => {
153 self.on_refresh_lost(err);
154 Err(RedDBError::Internal("writer lease lost".to_string()))
155 }
156 }
157 }
158
159 pub fn release(&self) -> RedDBResult<()> {
163 let snapshot = match self.current.lock().expect("poisoned lease mutex").take() {
164 Some(lease) => lease,
165 None => {
166 self.write_gate.set_lease_state(LeaseGateState::NotHeld);
169 return Ok(());
170 }
171 };
172 let result = self.store.release(&snapshot);
173 self.write_gate.set_lease_state(LeaseGateState::NotHeld);
174 match result {
175 Ok(()) => {
176 self.audit_log.record_event(
177 AuditEvent::builder("lease/release")
178 .principal(self.holder_id.clone())
179 .source(AuditAuthSource::System)
180 .resource(self.database_key.clone())
181 .outcome(Outcome::Success)
182 .build(),
183 );
184 Ok(())
185 }
186 Err(err) => {
187 self.audit_log.record_event(
188 AuditEvent::builder("lease/release")
189 .principal(self.holder_id.clone())
190 .source(AuditAuthSource::System)
191 .resource(self.database_key.clone())
192 .outcome(Outcome::Error)
193 .field(AuditFieldEscaper::field("error", err.to_string()))
194 .build(),
195 );
196 tracing::warn!(
197 target: "reddb::serverless::lease",
198 error = %err,
199 "lease release on shutdown failed"
200 );
201 Ok(())
202 }
203 }
204 }
205
206 fn on_refresh_lost(&self, err: LeaseError) {
207 tracing::error!(
208 target: "reddb::serverless::lease",
209 error = %err,
210 holder = %self.holder_id,
211 database_key = %self.database_key,
212 "lease refresh failed; flipping to NotHeld + drain"
213 );
214 *self.current.lock().expect("poisoned lease mutex") = None;
215 self.write_gate.set_lease_state(LeaseGateState::NotHeld);
216 self.audit_log.record_event(
217 AuditEvent::builder("lease/lost")
218 .principal(self.holder_id.clone())
219 .source(AuditAuthSource::System)
220 .resource(self.database_key.clone())
221 .outcome(Outcome::Error)
222 .field(AuditFieldEscaper::field("error", err.to_string()))
223 .build(),
224 );
225 (self.mark_draining)();
226 }
227}
228
229pub fn admin_promote_lease(
237 store: &LeaseStore,
238 audit_log: &AuditLogger,
239 database_key: &str,
240 holder_id: &str,
241 ttl_ms: u64,
242) -> Result<WriterLease, LeaseError> {
243 match store.try_acquire(database_key, holder_id, ttl_ms) {
244 Ok(lease) => {
245 audit_log.record_event(
246 AuditEvent::builder("admin/failover/promote")
247 .principal(lease.holder_id.clone())
248 .source(AuditAuthSource::System)
249 .resource(database_key.to_string())
250 .outcome(Outcome::Success)
251 .field(AuditFieldEscaper::field(
252 "holder_id",
253 lease.holder_id.clone(),
254 ))
255 .field(AuditFieldEscaper::field(
256 "generation",
257 lease.generation as i64,
258 ))
259 .field(AuditFieldEscaper::field("ttl_ms", ttl_ms))
260 .build(),
261 );
262 Ok(lease)
263 }
264 Err(err) => {
265 audit_log.record_event(
266 AuditEvent::builder("admin/failover/promote")
267 .principal(holder_id.to_string())
268 .source(AuditAuthSource::System)
269 .resource(database_key.to_string())
270 .outcome(Outcome::Error)
271 .field(AuditFieldEscaper::field("error", err.to_string()))
272 .build(),
273 );
274 Err(err)
275 }
276 }
277}
278
#[cfg(test)]
mod tests {
    use super::*;
    use crate::api::RedDBOptions;
    use crate::storage::backend::LocalBackend;
    use std::path::{Path, PathBuf};
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::Duration;

    /// Upper bound passed to `AuditLogger::wait_idle` before reading the audit file.
    const AUDIT_IDLE_TIMEOUT: Duration = Duration::from_secs(2);

    /// Unique scratch directory removed on drop, so cleanup also happens when
    /// an assertion panics partway through a test.
    struct ScratchDir(PathBuf);

    impl ScratchDir {
        fn new(tag: &str) -> Self {
            let path = std::env::temp_dir().join(format!(
                "reddb-lease-lifecycle-{tag}-{}-{}",
                std::process::id(),
                crate::utils::now_unix_nanos(),
            ));
            std::fs::create_dir_all(&path).unwrap();
            Self(path)
        }

        fn path(&self) -> &Path {
            &self.0
        }
    }

    impl Drop for ScratchDir {
        fn drop(&mut self) {
            let _ = std::fs::remove_dir_all(&self.0);
        }
    }

    fn new_store(dir: &ScratchDir) -> LeaseStore {
        LeaseStore::new(Arc::new(LocalBackend))
            .with_prefix(dir.path().to_string_lossy().to_string())
    }

    fn new_audit(dir: &ScratchDir) -> AuditLogger {
        AuditLogger::for_data_path(&dir.path().join("data.rdb"))
    }

    fn writable_gate() -> WriteGate {
        let mut opts = RedDBOptions::default();
        opts.read_only = false;
        WriteGate::from_options(&opts)
    }

    /// Waits for the audit logger to go idle and returns the audit file contents.
    fn audit_body(audit: &AuditLogger) -> String {
        assert!(audit.wait_idle(AUDIT_IDLE_TIMEOUT));
        std::fs::read_to_string(audit.path()).unwrap()
    }

    /// Everything a lifecycle test inspects. Fields drop in declaration
    /// order, so the scratch directory is removed last.
    struct Fixture {
        lifecycle: Arc<LeaseLifecycle>,
        gate: Arc<WriteGate>,
        audit: Arc<AuditLogger>,
        drains: Arc<AtomicUsize>,
        _dir: ScratchDir,
    }

    fn build_lifecycle(tag: &str) -> Fixture {
        let dir = ScratchDir::new(tag);
        let store = Arc::new(new_store(&dir));
        let gate = Arc::new(writable_gate());
        let audit = Arc::new(new_audit(&dir));
        let drains = Arc::new(AtomicUsize::new(0));
        let drains_hook = Arc::clone(&drains);
        let mark_draining: MarkDraining = Arc::new(move || {
            drains_hook.fetch_add(1, Ordering::SeqCst);
        });
        let lifecycle = Arc::new(LeaseLifecycle::new(
            store,
            Arc::clone(&gate),
            Arc::clone(&audit),
            mark_draining,
            "writer-1".to_string(),
            "main".to_string(),
            60_000,
        ));
        Fixture {
            lifecycle,
            gate,
            audit,
            drains,
            _dir: dir,
        }
    }

    #[test]
    fn acquire_flips_gate_to_held_and_records_audit() {
        let fx = build_lifecycle("acquire");
        assert!(fx.lifecycle.try_acquire().is_ok());
        assert_eq!(fx.gate.lease_state(), LeaseGateState::Held);
        assert!(fx.lifecycle.current_lease().is_some());
        assert_eq!(fx.drains.load(Ordering::SeqCst), 0);
        let body = audit_body(&fx.audit);
        assert!(body.contains("lease/acquire"));
        assert!(body.contains("\"outcome\":\"success\""));
    }

    #[test]
    fn release_flips_gate_to_not_held_and_clears_inner_state() {
        let fx = build_lifecycle("release");
        fx.lifecycle.try_acquire().unwrap();
        assert!(fx.lifecycle.release().is_ok());
        assert_eq!(fx.gate.lease_state(), LeaseGateState::NotHeld);
        assert!(fx.lifecycle.current_lease().is_none());
        let body = audit_body(&fx.audit);
        assert!(body.contains("lease/release"));
    }

    #[test]
    fn release_is_idempotent_when_no_lease_held() {
        let fx = build_lifecycle("release-idem");
        assert!(fx.lifecycle.release().is_ok());
        assert_eq!(fx.gate.lease_state(), LeaseGateState::NotHeld);
    }

    #[test]
    fn refresh_without_acquire_returns_error_without_touching_gate() {
        let fx = build_lifecycle("refresh-noop");
        match fx.lifecycle.refresh().unwrap_err() {
            RedDBError::Internal(msg) => assert!(msg.contains("without an acquired lease")),
            other => panic!("expected Internal, got {other:?}"),
        }
        assert_eq!(fx.gate.lease_state(), LeaseGateState::NotRequired);
        assert_eq!(fx.drains.load(Ordering::SeqCst), 0);
    }

    #[test]
    fn refresh_after_release_errors_without_draining() {
        let fx = build_lifecycle("refresh-after-release");
        fx.lifecycle.try_acquire().unwrap();
        fx.lifecycle.release().unwrap();
        match fx.lifecycle.refresh().unwrap_err() {
            RedDBError::Internal(msg) => assert!(msg.contains("without an acquired lease")),
            other => panic!("expected Internal, got {other:?}"),
        }
        assert_eq!(fx.gate.lease_state(), LeaseGateState::NotHeld);
        assert_eq!(fx.drains.load(Ordering::SeqCst), 0);
    }

    #[test]
    fn admin_promote_lease_audits_success() {
        let dir = ScratchDir::new("admin-ok");
        let store = new_store(&dir);
        let audit = new_audit(&dir);
        let lease = admin_promote_lease(&store, &audit, "main", "promoter-1", 30_000).unwrap();
        assert_eq!(lease.holder_id, "promoter-1");
        let body = audit_body(&audit);
        assert!(body.contains("admin/failover/promote"));
        assert!(body.contains("\"outcome\":\"success\""));
    }

    #[test]
    fn admin_promote_lease_does_not_flip_a_separate_gate() {
        let dir = ScratchDir::new("admin-no-gate");
        let store = new_store(&dir);
        let audit = new_audit(&dir);
        let gate = writable_gate();
        let _lease = admin_promote_lease(&store, &audit, "main", "promoter-2", 30_000).unwrap();
        assert_eq!(gate.lease_state(), LeaseGateState::NotRequired);
    }
}