1use std::sync::atomic::{AtomicU64, Ordering};
50use std::sync::Arc;
51
52use crate::serde_json::{self, Value as JsonValue};
53use crate::storage::backend::{
54 AtomicRemoteBackend, BackendError, BackendObjectVersion, ConditionalDelete, ConditionalPut,
55};
56use serde_json::Map;
57
/// Process-wide sequence number mixed into the scratch-file names produced by
/// `lease_temp_path`; every call takes the next value via `fetch_add`.
static LEASE_TEMP_COUNTER: AtomicU64 = AtomicU64::new(0);
65
66fn lease_temp_path(kind: &str) -> std::path::PathBuf {
67 let unique = LEASE_TEMP_COUNTER.fetch_add(1, Ordering::Relaxed);
68 std::env::temp_dir().join(format!(
69 "reddb-lease-{kind}-{}-{}-{unique}.json",
70 std::process::id(),
71 crate::utils::now_unix_nanos()
72 ))
73}
74
/// A writer lease as persisted by `LeaseStore`: who holds the write right for
/// one database, under which term/generation, and until when.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WriterLease {
    /// Key of the database this lease covers; also used to derive the
    /// backend object key.
    pub database_key: String,
    /// Identifier of the writer that holds the lease.
    pub holder_id: String,
    /// Term stamped onto the lease at acquire time; compared against the
    /// caller's term to fence stale writers.
    pub term: u64,
    /// Grant counter: `1` for the first lease object, previous value plus one
    /// for every acquire that replaces an existing lease object.
    pub generation: u64,
    /// Timestamp (milliseconds, from `crate::utils::now_unix_millis`) at which
    /// the lease was acquired.
    pub acquired_at_ms: u64,
    /// Timestamp (milliseconds, same clock as `acquired_at_ms`) at which the
    /// lease stops being valid.
    pub expires_at_ms: u64,
}
93
94impl WriterLease {
95 pub fn is_expired(&self, now_ms: u64) -> bool {
96 self.expires_at_ms <= now_ms
97 }
98
99 pub fn fenced_by_term(&self, current_term: u64) -> bool {
104 self.term < current_term
105 }
106
107 pub fn fencing_token(&self) -> (u64, u64) {
112 (self.term, self.generation)
113 }
114
115 fn to_json(&self) -> JsonValue {
116 let mut object = Map::new();
117 object.insert(
118 "database_key".to_string(),
119 JsonValue::String(self.database_key.clone()),
120 );
121 object.insert(
122 "holder_id".to_string(),
123 JsonValue::String(self.holder_id.clone()),
124 );
125 object.insert("term".to_string(), JsonValue::Number(self.term as f64));
126 object.insert(
127 "generation".to_string(),
128 JsonValue::Number(self.generation as f64),
129 );
130 object.insert(
131 "acquired_at_ms".to_string(),
132 JsonValue::Number(self.acquired_at_ms as f64),
133 );
134 object.insert(
135 "expires_at_ms".to_string(),
136 JsonValue::Number(self.expires_at_ms as f64),
137 );
138 JsonValue::Object(object)
139 }
140
141 fn from_json(value: &JsonValue) -> Result<Self, LeaseError> {
142 let obj = value
143 .as_object()
144 .ok_or_else(|| LeaseError::InvalidFormat("lease json is not an object".into()))?;
145 Ok(Self {
146 database_key: obj
147 .get("database_key")
148 .and_then(JsonValue::as_str)
149 .ok_or_else(|| LeaseError::InvalidFormat("missing database_key".into()))?
150 .to_string(),
151 holder_id: obj
152 .get("holder_id")
153 .and_then(JsonValue::as_str)
154 .ok_or_else(|| LeaseError::InvalidFormat("missing holder_id".into()))?
155 .to_string(),
156 term: obj
160 .get("term")
161 .and_then(JsonValue::as_u64)
162 .unwrap_or(crate::replication::DEFAULT_REPLICATION_TERM),
163 generation: obj
164 .get("generation")
165 .and_then(JsonValue::as_u64)
166 .ok_or_else(|| LeaseError::InvalidFormat("missing generation".into()))?,
167 acquired_at_ms: obj
168 .get("acquired_at_ms")
169 .and_then(JsonValue::as_u64)
170 .ok_or_else(|| LeaseError::InvalidFormat("missing acquired_at_ms".into()))?,
171 expires_at_ms: obj
172 .get("expires_at_ms")
173 .and_then(JsonValue::as_u64)
174 .ok_or_else(|| LeaseError::InvalidFormat("missing expires_at_ms".into()))?,
175 })
176 }
177}
178
/// Failures reported by `LeaseStore` operations.
#[derive(Debug)]
pub enum LeaseError {
    /// The storage backend (or local scratch-file I/O) failed.
    Backend(BackendError),
    /// An acquire found an unexpired lease owned by a different holder.
    Held {
        /// The lease that is currently in place.
        current: WriterLease,
        /// The clock reading used for the expiry check.
        now_ms: u64,
    },
    /// An acquire lost to a concurrent writer: either the conditional publish
    /// was rejected or the read-back after publishing showed another lease.
    LostRace {
        /// Holder that attempted the acquire.
        attempted_holder: String,
        /// Lease seen afterwards (a `<missing>` placeholder if none was found).
        observed: WriterLease,
    },
    /// The stored lease object could not be parsed.
    InvalidFormat(String),
    /// A refresh or release was attempted with a lease that no longer matches
    /// the stored holder and generation.
    Stale {
        /// Holder that attempted the operation.
        attempted_holder: String,
        /// Generation of the lease the caller presented.
        attempted_generation: u64,
        /// Lease found in the store, if any.
        observed: Option<WriterLease>,
    },
    /// The operation was attempted under a term lower than the current one.
    Fenced {
        /// Holder that attempted the operation.
        attempted_holder: String,
        /// Term the caller presented.
        attempted_term: u64,
        /// Term the caller was compared against.
        current_term: u64,
    },
}
211
212impl std::fmt::Display for LeaseError {
213 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
214 match self {
215 Self::Backend(err) => write!(f, "lease backend error: {err}"),
216 Self::Held { current, now_ms } => {
217 write!(
218 f,
219 "lease for '{}' held by '{}' (gen {}, expires in {} ms)",
220 current.database_key,
221 current.holder_id,
222 current.generation,
223 current.expires_at_ms.saturating_sub(*now_ms)
224 )
225 }
226 Self::LostRace {
227 attempted_holder,
228 observed,
229 } => write!(
230 f,
231 "lost lease acquire race: '{}' tried to take '{}' but '{}' (gen {}) won",
232 attempted_holder, observed.database_key, observed.holder_id, observed.generation
233 ),
234 Self::InvalidFormat(msg) => write!(f, "invalid lease format: {msg}"),
235 Self::Stale {
236 attempted_holder,
237 attempted_generation,
238 observed,
239 } => match observed {
240 Some(o) => write!(
241 f,
242 "stale lease op: '{}' (gen {}) tried to act, but current is '{}' (gen {})",
243 attempted_holder, attempted_generation, o.holder_id, o.generation
244 ),
245 None => write!(
246 f,
247 "stale lease op: '{}' (gen {}) tried to act, but no lease present",
248 attempted_holder, attempted_generation
249 ),
250 },
251 Self::Fenced {
252 attempted_holder,
253 attempted_term,
254 current_term,
255 } => write!(
256 f,
257 "fenced lease op: '{attempted_holder}' on stale term {attempted_term} \
258 is behind current term {current_term}"
259 ),
260 }
261 }
262}
263
// Marker impl: relies on the `Debug` and `Display` impls of `LeaseError` and
// keeps the default `source()`.
impl std::error::Error for LeaseError {}
265
266impl From<BackendError> for LeaseError {
267 fn from(value: BackendError) -> Self {
268 Self::Backend(value)
269 }
270}
271
/// A lease together with the backend object version it was read at. The
/// version is later used as the `IfVersion` condition for conditional
/// uploads and deletes.
struct VersionedLease {
    /// The parsed lease contents.
    lease: WriterLease,
    /// Object version observed before (and re-checked after) the download.
    version: BackendObjectVersion,
}
276
/// Stores one lease object per database key in an `AtomicRemoteBackend`.
pub struct LeaseStore {
    /// Backend used for downloads, version lookups and conditional
    /// uploads/deletes.
    backend: Arc<dyn AtomicRemoteBackend>,
    /// Key prefix for lease objects; `new` and `with_prefix` keep it ending
    /// in `/`.
    prefix: String,
}
290
291impl LeaseStore {
292 pub fn new(backend: Arc<dyn AtomicRemoteBackend>) -> Self {
293 Self {
294 backend,
295 prefix: "leases/".to_string(),
296 }
297 }
298
299 pub fn with_prefix(mut self, prefix: impl Into<String>) -> Self {
300 let p = prefix.into();
301 self.prefix = if p.ends_with('/') { p } else { format!("{p}/") };
302 self
303 }
304
305 fn key_for(&self, database_key: &str) -> String {
306 format!("{}{}.lease.json", self.prefix, database_key)
307 }
308
309 pub fn current(&self, database_key: &str) -> Result<Option<WriterLease>, LeaseError> {
312 self.read_lease(database_key)
313 }
314
315 fn read_lease(&self, database_key: &str) -> Result<Option<WriterLease>, LeaseError> {
316 let key = self.key_for(database_key);
317 let temp = lease_temp_path("read");
318 let downloaded = self.backend.download(&key, &temp)?;
319 if !downloaded {
320 return Ok(None);
321 }
322 let bytes = std::fs::read(&temp)
323 .map_err(|err| LeaseError::Backend(BackendError::Transport(err.to_string())))?;
324 let _ = std::fs::remove_file(&temp);
325 let json: JsonValue = serde_json::from_slice(&bytes)
326 .map_err(|err| LeaseError::InvalidFormat(format!("lease json parse: {err}")))?;
327 WriterLease::from_json(&json).map(Some)
328 }
329
330 fn current_versioned(&self, database_key: &str) -> Result<Option<VersionedLease>, LeaseError> {
331 let key = self.key_for(database_key);
332 let before = match self.backend.object_version(&key)? {
333 Some(version) => version,
334 None => return Ok(None),
335 };
336 let temp = lease_temp_path("read");
337 let downloaded = self.backend.download(&key, &temp)?;
338 if !downloaded {
339 return Ok(None);
340 }
341 let after = self.backend.object_version(&key)?;
342 if after.as_ref() != Some(&before) {
343 let _ = std::fs::remove_file(&temp);
344 return Err(LeaseError::Backend(BackendError::PreconditionFailed(
345 "lease object changed while being read".to_string(),
346 )));
347 }
348 let bytes = std::fs::read(&temp)
349 .map_err(|err| LeaseError::Backend(BackendError::Transport(err.to_string())))?;
350 let _ = std::fs::remove_file(&temp);
351 let json: JsonValue = serde_json::from_slice(&bytes)
352 .map_err(|err| LeaseError::InvalidFormat(format!("lease json parse: {err}")))?;
353 Ok(Some(VersionedLease {
354 lease: WriterLease::from_json(&json)?,
355 version: before,
356 }))
357 }
358
359 pub fn try_acquire(
370 &self,
371 database_key: &str,
372 holder_id: &str,
373 ttl_ms: u64,
374 ) -> Result<WriterLease, LeaseError> {
375 self.try_acquire_for_term(
376 database_key,
377 holder_id,
378 ttl_ms,
379 crate::replication::DEFAULT_REPLICATION_TERM,
380 )
381 }
382
383 pub fn try_acquire_for_term(
395 &self,
396 database_key: &str,
397 holder_id: &str,
398 ttl_ms: u64,
399 term: u64,
400 ) -> Result<WriterLease, LeaseError> {
401 let now_ms = crate::utils::now_unix_millis();
402
403 let current = self.current_versioned(database_key)?;
404 if let Some(c) = ¤t {
407 if term < c.lease.term {
408 return Err(LeaseError::Fenced {
409 attempted_holder: holder_id.to_string(),
410 attempted_term: term,
411 current_term: c.lease.term,
412 });
413 }
414 }
415 let next_generation = match ¤t {
419 Some(c) if !c.lease.is_expired(now_ms) && c.lease.holder_id != holder_id => {
420 return Err(LeaseError::Held {
421 current: c.lease.clone(),
422 now_ms,
423 });
424 }
425 Some(c) => c.lease.generation.saturating_add(1),
426 None => 1,
427 };
428
429 let new_lease = WriterLease {
430 database_key: database_key.to_string(),
431 holder_id: holder_id.to_string(),
432 term,
433 generation: next_generation,
434 acquired_at_ms: now_ms,
435 expires_at_ms: now_ms.saturating_add(ttl_ms),
436 };
437 let condition = match current {
438 Some(c) => ConditionalPut::IfVersion(c.version),
439 None => ConditionalPut::IfAbsent,
440 };
441 if let Err(err) = self.publish_conditional(&new_lease, condition) {
442 if matches!(
443 err,
444 LeaseError::Backend(BackendError::PreconditionFailed(_))
445 ) {
446 return self.acquire_race_error(database_key, holder_id, now_ms);
447 }
448 return Err(err);
449 }
450
451 match self.current(database_key)? {
453 Some(observed)
454 if observed.holder_id == holder_id
455 && observed.generation == new_lease.generation =>
456 {
457 Ok(new_lease)
458 }
459 Some(observed) => Err(LeaseError::LostRace {
460 attempted_holder: holder_id.to_string(),
461 observed,
462 }),
463 None => Err(LeaseError::LostRace {
464 attempted_holder: holder_id.to_string(),
465 observed: WriterLease {
466 database_key: database_key.to_string(),
467 holder_id: "<missing>".to_string(),
468 term: 0,
469 generation: 0,
470 acquired_at_ms: 0,
471 expires_at_ms: 0,
472 },
473 }),
474 }
475 }
476
477 fn acquire_race_error(
478 &self,
479 database_key: &str,
480 holder_id: &str,
481 now_ms: u64,
482 ) -> Result<WriterLease, LeaseError> {
483 match self.current(database_key)? {
484 Some(observed) if !observed.is_expired(now_ms) && observed.holder_id != holder_id => {
485 Err(LeaseError::Held {
486 current: observed,
487 now_ms,
488 })
489 }
490 Some(observed) => Err(LeaseError::LostRace {
491 attempted_holder: holder_id.to_string(),
492 observed,
493 }),
494 None => Err(LeaseError::LostRace {
495 attempted_holder: holder_id.to_string(),
496 observed: WriterLease {
497 database_key: database_key.to_string(),
498 holder_id: "<missing>".to_string(),
499 term: 0,
500 generation: 0,
501 acquired_at_ms: 0,
502 expires_at_ms: 0,
503 },
504 }),
505 }
506 }
507
508 pub fn refresh(&self, lease: &WriterLease, ttl_ms: u64) -> Result<WriterLease, LeaseError> {
513 let now_ms = crate::utils::now_unix_millis();
514 let observed = self.current_versioned(&lease.database_key)?;
515 match observed {
516 Some(o)
517 if o.lease.holder_id == lease.holder_id
518 && o.lease.generation == lease.generation =>
519 {
520 let mut next = lease.clone();
521 next.expires_at_ms = now_ms.saturating_add(ttl_ms);
522 if let Err(err) =
523 self.publish_conditional(&next, ConditionalPut::IfVersion(o.version))
524 {
525 if matches!(
526 err,
527 LeaseError::Backend(BackendError::PreconditionFailed(_))
528 ) {
529 return Err(LeaseError::Stale {
530 attempted_holder: lease.holder_id.clone(),
531 attempted_generation: lease.generation,
532 observed: self.current(&lease.database_key)?,
533 });
534 }
535 return Err(err);
536 }
537 Ok(next)
538 }
539 other => Err(LeaseError::Stale {
540 attempted_holder: lease.holder_id.clone(),
541 attempted_generation: lease.generation,
542 observed: other.map(|v| v.lease),
543 }),
544 }
545 }
546
547 pub fn refresh_for_term(
557 &self,
558 lease: &WriterLease,
559 ttl_ms: u64,
560 current_term: u64,
561 ) -> Result<WriterLease, LeaseError> {
562 if lease.fenced_by_term(current_term) {
563 return Err(LeaseError::Fenced {
564 attempted_holder: lease.holder_id.clone(),
565 attempted_term: lease.term,
566 current_term,
567 });
568 }
569 self.refresh(lease, ttl_ms)
570 }
571
572 pub fn release(&self, lease: &WriterLease) -> Result<(), LeaseError> {
576 let observed = self.current_versioned(&lease.database_key)?;
577 match observed {
578 Some(o)
579 if o.lease.holder_id == lease.holder_id
580 && o.lease.generation == lease.generation =>
581 {
582 let key = self.key_for(&lease.database_key);
583 if let Err(err) = self
584 .backend
585 .delete_conditional(&key, ConditionalDelete::IfVersion(o.version))
586 {
587 if matches!(err, BackendError::PreconditionFailed(_)) {
588 return Err(LeaseError::Stale {
589 attempted_holder: lease.holder_id.clone(),
590 attempted_generation: lease.generation,
591 observed: self.current(&lease.database_key)?,
592 });
593 }
594 return Err(err.into());
595 }
596 Ok(())
597 }
598 other => Err(LeaseError::Stale {
599 attempted_holder: lease.holder_id.clone(),
600 attempted_generation: lease.generation,
601 observed: other.map(|v| v.lease),
602 }),
603 }
604 }
605
606 fn publish_conditional(
607 &self,
608 lease: &WriterLease,
609 condition: ConditionalPut,
610 ) -> Result<BackendObjectVersion, LeaseError> {
611 let key = self.key_for(&lease.database_key);
612 let json = lease.to_json();
613 let bytes = serde_json::to_vec(&json).map_err(|err| {
614 LeaseError::Backend(BackendError::Internal(format!("serialize lease: {err}")))
615 })?;
616 let temp = lease_temp_path("write");
617 std::fs::write(&temp, &bytes)
618 .map_err(|err| LeaseError::Backend(BackendError::Transport(err.to_string())))?;
619 let res = self.backend.upload_conditional(&temp, &key, condition);
620 let _ = std::fs::remove_file(&temp);
621 Ok(res?)
622 }
623}
624
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::backend::LocalBackend;

    /// Sequence number that keeps store prefixes distinct even when two tests
    /// read the same clock value.
    static TEST_STORE_COUNTER: std::sync::atomic::AtomicU64 =
        std::sync::atomic::AtomicU64::new(0);

    /// Creates a `LeaseStore` over `LocalBackend` rooted at a per-call unique
    /// directory under the OS temp dir, so tests running in parallel never
    /// share the same `db` lease object.
    fn store() -> LeaseStore {
        let unique = TEST_STORE_COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        LeaseStore::new(Arc::new(LocalBackend)).with_prefix(format!(
            "{}/leases-test-{}-{}-{unique}",
            std::env::temp_dir().to_string_lossy(),
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos(),
            std::process::id()
        ))
    }

    /// The first lease ever written for a key starts at generation 1.
    #[test]
    fn first_acquire_assigns_generation_one() {
        let s = store();
        let lease = s.try_acquire("db", "writer-a", 60_000).unwrap();
        assert_eq!(lease.generation, 1);
        assert_eq!(lease.holder_id, "writer-a");
    }

    /// A different holder gets `Held` while the first lease is unexpired.
    #[test]
    fn second_holder_rejected_while_first_alive() {
        let s = store();
        let _ = s.try_acquire("db", "writer-a", 60_000).unwrap();
        let err = s.try_acquire("db", "writer-b", 60_000).unwrap_err();
        match err {
            LeaseError::Held { current, .. } => {
                assert_eq!(current.holder_id, "writer-a");
                assert_eq!(current.generation, 1);
            }
            other => panic!("expected Held, got {other:?}"),
        }
    }

    /// Once the TTL has elapsed another holder may take over, bumping the
    /// generation.
    #[test]
    fn expired_lease_is_poachable() {
        let s = store();
        let _ = s.try_acquire("db", "writer-a", 1).unwrap();
        std::thread::sleep(std::time::Duration::from_millis(10));
        let lease = s.try_acquire("db", "writer-b", 60_000).unwrap();
        assert_eq!(lease.holder_id, "writer-b");
        assert_eq!(
            lease.generation, 2,
            "generation must increment when poaching"
        );
    }

    /// Releasing deletes the lease object, so the next acquire starts over at
    /// generation 1.
    #[test]
    fn release_clears_so_anyone_can_take_again() {
        let s = store();
        let lease = s.try_acquire("db", "writer-a", 60_000).unwrap();
        s.release(&lease).unwrap();
        let next = s.try_acquire("db", "writer-b", 60_000).unwrap();
        assert_eq!(next.holder_id, "writer-b");
        assert_eq!(next.generation, 1);
    }

    /// Refreshing keeps the generation and pushes the expiry out.
    #[test]
    fn refresh_extends_expiration_for_same_holder() {
        let s = store();
        let lease = s.try_acquire("db", "writer-a", 1_000).unwrap();
        std::thread::sleep(std::time::Duration::from_millis(20));
        let refreshed = s.refresh(&lease, 60_000).unwrap();
        assert_eq!(refreshed.generation, lease.generation);
        assert!(refreshed.expires_at_ms > lease.expires_at_ms);
    }

    /// A holder whose lease was poached gets `Stale` on refresh.
    #[test]
    fn refresh_fails_when_someone_else_owns() {
        let s = store();
        let lease = s.try_acquire("db", "writer-a", 1).unwrap();
        std::thread::sleep(std::time::Duration::from_millis(10));
        let _ = s.try_acquire("db", "writer-b", 60_000).unwrap();
        let err = s.refresh(&lease, 60_000).unwrap_err();
        assert!(matches!(err, LeaseError::Stale { .. }));
    }

    /// The requested term is recorded on the lease and in its fencing token.
    #[test]
    fn acquire_stamps_term_onto_lease() {
        let s = store();
        let lease = s.try_acquire_for_term("db", "writer-a", 60_000, 7).unwrap();
        assert_eq!(lease.term, 7);
        assert_eq!(lease.fencing_token(), (7, 1));
    }

    /// `try_acquire` without an explicit term uses the default term.
    #[test]
    fn legacy_lease_defaults_to_base_term() {
        let s = store();
        let lease = s.try_acquire("db", "writer-a", 60_000).unwrap();
        assert_eq!(lease.term, crate::replication::DEFAULT_REPLICATION_TERM);
        assert!(!lease.fenced_by_term(crate::replication::DEFAULT_REPLICATION_TERM));
    }

    /// A contender on a lower term is fenced even after the stored lease has
    /// expired.
    #[test]
    fn stale_term_contender_is_fenced_even_when_lease_expired() {
        let s = store();
        let _new_primary = s.try_acquire_for_term("db", "new-primary", 1, 5).unwrap();
        std::thread::sleep(std::time::Duration::from_millis(10));
        let err = s
            .try_acquire_for_term("db", "ex-primary", 60_000, 4)
            .unwrap_err();
        match err {
            LeaseError::Fenced {
                attempted_term,
                current_term,
                ..
            } => {
                assert_eq!(attempted_term, 4);
                assert_eq!(current_term, 5);
            }
            other => panic!("expected Fenced, got {other:?}"),
        }
    }

    /// A contender on a higher term may take over an expired lease.
    #[test]
    fn same_or_higher_term_contender_may_poach_expired_lease() {
        let s = store();
        let _ = s.try_acquire_for_term("db", "old", 1, 5).unwrap();
        std::thread::sleep(std::time::Duration::from_millis(10));
        let lease = s.try_acquire_for_term("db", "new", 60_000, 6).unwrap();
        assert_eq!(lease.holder_id, "new");
        assert_eq!(lease.term, 6);
        assert_eq!(lease.generation, 2, "poaching advances the generation");
    }

    /// `refresh_for_term` refuses to extend a lease whose term is behind.
    #[test]
    fn refresh_for_term_fails_closed_once_term_advances() {
        let s = store();
        let lease = s.try_acquire_for_term("db", "deposed", 60_000, 4).unwrap();
        let err = s.refresh_for_term(&lease, 60_000, 5).unwrap_err();
        match err {
            LeaseError::Fenced {
                attempted_holder,
                attempted_term,
                current_term,
            } => {
                assert_eq!(attempted_holder, "deposed");
                assert_eq!(attempted_term, 4);
                assert_eq!(current_term, 5);
            }
            other => panic!("expected Fenced, got {other:?}"),
        }
    }

    /// `refresh_for_term` behaves like `refresh` while the term is unchanged.
    #[test]
    fn refresh_for_term_succeeds_while_term_holds() {
        let s = store();
        let lease = s.try_acquire_for_term("db", "primary", 1_000, 5).unwrap();
        std::thread::sleep(std::time::Duration::from_millis(20));
        let refreshed = s.refresh_for_term(&lease, 60_000, 5).unwrap();
        assert_eq!(refreshed.term, 5);
        assert!(refreshed.expires_at_ms > lease.expires_at_ms);
    }
}