1use std::sync::Arc;
40
41use crate::storage::backend::{
42 AtomicRemoteBackend, BackendError, BackendObjectVersion, ConditionalDelete, ConditionalPut,
43};
44
45pub use reddb_file::ServerlessWriterLease as WriterLease;
46
47#[derive(Debug)]
48pub enum LeaseError {
49 Backend(BackendError),
50 Held {
52 current: WriterLease,
53 now_ms: u64,
54 },
55 LostRace {
58 attempted_holder: String,
59 observed: WriterLease,
60 },
61 InvalidFormat(String),
62 Stale {
65 attempted_holder: String,
66 attempted_generation: u64,
67 observed: Option<WriterLease>,
68 },
69 Fenced {
74 attempted_holder: String,
75 attempted_term: u64,
76 current_term: u64,
77 },
78}
79
80impl std::fmt::Display for LeaseError {
81 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
82 match self {
83 Self::Backend(err) => write!(f, "lease backend error: {err}"),
84 Self::Held { current, now_ms } => {
85 write!(
86 f,
87 "lease for '{}' held by '{}' (gen {}, expires in {} ms)",
88 current.database_key,
89 current.holder_id,
90 current.generation,
91 current.expires_at_ms.saturating_sub(*now_ms)
92 )
93 }
94 Self::LostRace {
95 attempted_holder,
96 observed,
97 } => write!(
98 f,
99 "lost lease acquire race: '{}' tried to take '{}' but '{}' (gen {}) won",
100 attempted_holder, observed.database_key, observed.holder_id, observed.generation
101 ),
102 Self::InvalidFormat(msg) => write!(f, "invalid lease format: {msg}"),
103 Self::Stale {
104 attempted_holder,
105 attempted_generation,
106 observed,
107 } => match observed {
108 Some(o) => write!(
109 f,
110 "stale lease op: '{}' (gen {}) tried to act, but current is '{}' (gen {})",
111 attempted_holder, attempted_generation, o.holder_id, o.generation
112 ),
113 None => write!(
114 f,
115 "stale lease op: '{}' (gen {}) tried to act, but no lease present",
116 attempted_holder, attempted_generation
117 ),
118 },
119 Self::Fenced {
120 attempted_holder,
121 attempted_term,
122 current_term,
123 } => write!(
124 f,
125 "fenced lease op: '{attempted_holder}' on stale term {attempted_term} \
126 is behind current term {current_term}"
127 ),
128 }
129 }
130}
131
132impl std::error::Error for LeaseError {}
133
134impl From<BackendError> for LeaseError {
135 fn from(value: BackendError) -> Self {
136 Self::Backend(value)
137 }
138}
139
140struct VersionedLease {
141 lease: WriterLease,
142 version: BackendObjectVersion,
143}
144
145pub struct LeaseStore {
155 backend: Arc<dyn AtomicRemoteBackend>,
156 prefix: String,
157}
158
159impl LeaseStore {
160 pub fn new(backend: Arc<dyn AtomicRemoteBackend>) -> Self {
161 Self {
162 backend,
163 prefix: "leases/".to_string(),
164 }
165 }
166
167 pub fn with_prefix(mut self, prefix: impl Into<String>) -> Self {
168 let p = prefix.into();
169 self.prefix = if p.ends_with('/') { p } else { format!("{p}/") };
170 self
171 }
172
173 fn key_for(&self, database_key: &str) -> String {
174 reddb_file::serverless_writer_lease_key(&self.prefix, database_key)
175 }
176
177 pub fn current(&self, database_key: &str) -> Result<Option<WriterLease>, LeaseError> {
180 self.read_lease(database_key)
181 }
182
183 fn read_lease(&self, database_key: &str) -> Result<Option<WriterLease>, LeaseError> {
184 let key = self.key_for(database_key);
185 let temp = reddb_file::ServerlessWriterLeaseTempFile::new("read");
186 let downloaded = self.backend.download(&key, temp.path())?;
187 if !downloaded {
188 return Ok(None);
189 }
190 let bytes = temp
191 .read_bytes()
192 .map_err(|err| LeaseError::Backend(BackendError::Transport(err.to_string())))?;
193 decode_writer_lease(&bytes).map(Some)
194 }
195
196 fn current_versioned(&self, database_key: &str) -> Result<Option<VersionedLease>, LeaseError> {
197 let key = self.key_for(database_key);
198 let before = match self.backend.object_version(&key)? {
199 Some(version) => version,
200 None => return Ok(None),
201 };
202 let temp = reddb_file::ServerlessWriterLeaseTempFile::new("read");
203 let downloaded = self.backend.download(&key, temp.path())?;
204 if !downloaded {
205 return Ok(None);
206 }
207 let after = self.backend.object_version(&key)?;
208 if after.as_ref() != Some(&before) {
209 return Err(LeaseError::Backend(BackendError::PreconditionFailed(
210 "lease object changed while being read".to_string(),
211 )));
212 }
213 let bytes = temp
214 .read_bytes()
215 .map_err(|err| LeaseError::Backend(BackendError::Transport(err.to_string())))?;
216 Ok(Some(VersionedLease {
217 lease: decode_writer_lease(&bytes)?,
218 version: before,
219 }))
220 }
221
222 pub fn try_acquire(
233 &self,
234 database_key: &str,
235 holder_id: &str,
236 ttl_ms: u64,
237 ) -> Result<WriterLease, LeaseError> {
238 self.try_acquire_for_term(
239 database_key,
240 holder_id,
241 ttl_ms,
242 crate::replication::DEFAULT_REPLICATION_TERM,
243 )
244 }
245
246 pub fn try_acquire_for_term(
258 &self,
259 database_key: &str,
260 holder_id: &str,
261 ttl_ms: u64,
262 term: u64,
263 ) -> Result<WriterLease, LeaseError> {
264 let now_ms = crate::utils::now_unix_millis();
265
266 let current = self.current_versioned(database_key)?;
267 if let Some(c) = ¤t {
270 if term < c.lease.term {
271 return Err(LeaseError::Fenced {
272 attempted_holder: holder_id.to_string(),
273 attempted_term: term,
274 current_term: c.lease.term,
275 });
276 }
277 }
278 let next_generation = match ¤t {
282 Some(c) if !c.lease.is_expired(now_ms) && c.lease.holder_id != holder_id => {
283 return Err(LeaseError::Held {
284 current: c.lease.clone(),
285 now_ms,
286 });
287 }
288 Some(c) => c.lease.generation.saturating_add(1),
289 None => 1,
290 };
291
292 let new_lease = WriterLease {
293 database_key: database_key.to_string(),
294 holder_id: holder_id.to_string(),
295 term,
296 generation: next_generation,
297 acquired_at_ms: now_ms,
298 expires_at_ms: now_ms.saturating_add(ttl_ms),
299 };
300 let condition = match current {
301 Some(c) => ConditionalPut::IfVersion(c.version),
302 None => ConditionalPut::IfAbsent,
303 };
304 if let Err(err) = self.publish_conditional(&new_lease, condition) {
305 if matches!(
306 err,
307 LeaseError::Backend(BackendError::PreconditionFailed(_))
308 ) {
309 return self.acquire_race_error(database_key, holder_id, now_ms);
310 }
311 return Err(err);
312 }
313
314 match self.current(database_key)? {
316 Some(observed)
317 if observed.holder_id == holder_id
318 && observed.generation == new_lease.generation =>
319 {
320 Ok(new_lease)
321 }
322 Some(observed) => Err(LeaseError::LostRace {
323 attempted_holder: holder_id.to_string(),
324 observed,
325 }),
326 None => Err(LeaseError::LostRace {
327 attempted_holder: holder_id.to_string(),
328 observed: WriterLease {
329 database_key: database_key.to_string(),
330 holder_id: "<missing>".to_string(),
331 term: 0,
332 generation: 0,
333 acquired_at_ms: 0,
334 expires_at_ms: 0,
335 },
336 }),
337 }
338 }
339
340 fn acquire_race_error(
341 &self,
342 database_key: &str,
343 holder_id: &str,
344 now_ms: u64,
345 ) -> Result<WriterLease, LeaseError> {
346 match self.current(database_key)? {
347 Some(observed) if !observed.is_expired(now_ms) && observed.holder_id != holder_id => {
348 Err(LeaseError::Held {
349 current: observed,
350 now_ms,
351 })
352 }
353 Some(observed) => Err(LeaseError::LostRace {
354 attempted_holder: holder_id.to_string(),
355 observed,
356 }),
357 None => Err(LeaseError::LostRace {
358 attempted_holder: holder_id.to_string(),
359 observed: WriterLease {
360 database_key: database_key.to_string(),
361 holder_id: "<missing>".to_string(),
362 term: 0,
363 generation: 0,
364 acquired_at_ms: 0,
365 expires_at_ms: 0,
366 },
367 }),
368 }
369 }
370
371 pub fn refresh(&self, lease: &WriterLease, ttl_ms: u64) -> Result<WriterLease, LeaseError> {
376 let now_ms = crate::utils::now_unix_millis();
377 let observed = self.current_versioned(&lease.database_key)?;
378 match observed {
379 Some(o)
380 if o.lease.holder_id == lease.holder_id
381 && o.lease.generation == lease.generation =>
382 {
383 let mut next = lease.clone();
384 next.expires_at_ms = now_ms.saturating_add(ttl_ms);
385 if let Err(err) =
386 self.publish_conditional(&next, ConditionalPut::IfVersion(o.version))
387 {
388 if matches!(
389 err,
390 LeaseError::Backend(BackendError::PreconditionFailed(_))
391 ) {
392 return Err(LeaseError::Stale {
393 attempted_holder: lease.holder_id.clone(),
394 attempted_generation: lease.generation,
395 observed: self.current(&lease.database_key)?,
396 });
397 }
398 return Err(err);
399 }
400 Ok(next)
401 }
402 other => Err(LeaseError::Stale {
403 attempted_holder: lease.holder_id.clone(),
404 attempted_generation: lease.generation,
405 observed: other.map(|v| v.lease),
406 }),
407 }
408 }
409
410 pub fn refresh_for_term(
420 &self,
421 lease: &WriterLease,
422 ttl_ms: u64,
423 current_term: u64,
424 ) -> Result<WriterLease, LeaseError> {
425 if lease.fenced_by_term(current_term) {
426 return Err(LeaseError::Fenced {
427 attempted_holder: lease.holder_id.clone(),
428 attempted_term: lease.term,
429 current_term,
430 });
431 }
432 self.refresh(lease, ttl_ms)
433 }
434
435 pub fn release(&self, lease: &WriterLease) -> Result<(), LeaseError> {
439 let observed = self.current_versioned(&lease.database_key)?;
440 match observed {
441 Some(o)
442 if o.lease.holder_id == lease.holder_id
443 && o.lease.generation == lease.generation =>
444 {
445 let key = self.key_for(&lease.database_key);
446 if let Err(err) = self
447 .backend
448 .delete_conditional(&key, ConditionalDelete::IfVersion(o.version))
449 {
450 if matches!(err, BackendError::PreconditionFailed(_)) {
451 return Err(LeaseError::Stale {
452 attempted_holder: lease.holder_id.clone(),
453 attempted_generation: lease.generation,
454 observed: self.current(&lease.database_key)?,
455 });
456 }
457 return Err(err.into());
458 }
459 Ok(())
460 }
461 other => Err(LeaseError::Stale {
462 attempted_holder: lease.holder_id.clone(),
463 attempted_generation: lease.generation,
464 observed: other.map(|v| v.lease),
465 }),
466 }
467 }
468
469 fn publish_conditional(
470 &self,
471 lease: &WriterLease,
472 condition: ConditionalPut,
473 ) -> Result<BackendObjectVersion, LeaseError> {
474 let key = self.key_for(&lease.database_key);
475 let bytes = reddb_file::encode_serverless_writer_lease_json(lease)
476 .map_err(|err| LeaseError::Backend(BackendError::Internal(err.to_string())))?;
477 let temp = reddb_file::ServerlessWriterLeaseTempFile::new("write");
478 temp.write_bytes(&bytes)
479 .map_err(|err| LeaseError::Backend(BackendError::Transport(err.to_string())))?;
480 Ok(self
481 .backend
482 .upload_conditional(temp.path(), &key, condition)?)
483 }
484}
485
486fn decode_writer_lease(bytes: &[u8]) -> Result<WriterLease, LeaseError> {
487 reddb_file::decode_serverless_writer_lease_json(bytes)
488 .map_err(|err| LeaseError::InvalidFormat(err.to_string()))
489}
490
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::backend::LocalBackend;
    use std::path::Path;

    /// Builds a `LeaseStore` over `LocalBackend` with a key prefix under the system
    /// temp directory that is unique to this call.
    ///
    /// Every test uses the key "db", so two stores sharing a prefix would interfere.
    /// A wall-clock reading alone can repeat when tests run in parallel on a coarse
    /// clock, so the prefix also includes the process id and a process-wide sequence.
    fn store() -> LeaseStore {
        use std::sync::atomic::{AtomicU64, Ordering};

        static NEXT_STORE_ID: AtomicU64 = AtomicU64::new(0);
        // Relaxed is enough: only the uniqueness of the returned value matters.
        let sequence = NEXT_STORE_ID.fetch_add(1, Ordering::Relaxed);
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        LeaseStore::new(Arc::new(LocalBackend)).with_prefix(format!(
            "{}/leases-test-{}-{}-{}",
            std::env::temp_dir().to_string_lossy(),
            nanos,
            std::process::id(),
            sequence
        ))
    }

    #[test]
    fn first_acquire_assigns_generation_one() {
        let s = store();
        let lease = s.try_acquire("db", "writer-a", 60_000).unwrap();
        assert_eq!(lease.generation, 1);
        assert_eq!(lease.holder_id, "writer-a");
    }

    #[test]
    fn second_holder_rejected_while_first_alive() {
        let s = store();
        let _ = s.try_acquire("db", "writer-a", 60_000).unwrap();
        let err = s.try_acquire("db", "writer-b", 60_000).unwrap_err();
        match err {
            LeaseError::Held { current, .. } => {
                assert_eq!(current.holder_id, "writer-a");
                assert_eq!(current.generation, 1);
            }
            other => panic!("expected Held, got {other:?}"),
        }
    }

    #[test]
    fn expired_lease_is_poachable() {
        let s = store();
        // 1 ms TTL plus a 10 ms sleep guarantees the first lease has expired.
        let _ = s.try_acquire("db", "writer-a", 1).unwrap();
        std::thread::sleep(std::time::Duration::from_millis(10));
        let lease = s.try_acquire("db", "writer-b", 60_000).unwrap();
        assert_eq!(lease.holder_id, "writer-b");
        assert_eq!(
            lease.generation, 2,
            "generation must increment when poaching"
        );
    }

    #[test]
    fn release_clears_so_anyone_can_take_again() {
        let s = store();
        let lease = s.try_acquire("db", "writer-a", 60_000).unwrap();
        s.release(&lease).unwrap();
        let next = s.try_acquire("db", "writer-b", 60_000).unwrap();
        // The lease object was deleted, so the generation starts over at 1.
        assert_eq!(next.holder_id, "writer-b");
        assert_eq!(next.generation, 1);
    }

    #[test]
    fn refresh_extends_expiration_for_same_holder() {
        let s = store();
        let lease = s.try_acquire("db", "writer-a", 1_000).unwrap();
        std::thread::sleep(std::time::Duration::from_millis(20));
        let refreshed = s.refresh(&lease, 60_000).unwrap();
        assert_eq!(refreshed.generation, lease.generation);
        assert!(refreshed.expires_at_ms > lease.expires_at_ms);
    }

    #[test]
    fn refresh_fails_when_someone_else_owns() {
        let s = store();
        let lease = s.try_acquire("db", "writer-a", 1).unwrap();
        std::thread::sleep(std::time::Duration::from_millis(10));
        let _ = s.try_acquire("db", "writer-b", 60_000).unwrap();
        let err = s.refresh(&lease, 60_000).unwrap_err();
        assert!(matches!(err, LeaseError::Stale { .. }));
    }

    #[test]
    fn acquire_stamps_term_onto_lease() {
        let s = store();
        let lease = s.try_acquire_for_term("db", "writer-a", 60_000, 7).unwrap();
        assert_eq!(lease.term, 7);
        assert_eq!(lease.fencing_token(), (7, 1));
    }

    #[test]
    fn legacy_lease_defaults_to_base_term() {
        let s = store();
        // `try_acquire` takes no term and must stamp the default one.
        let lease = s.try_acquire("db", "writer-a", 60_000).unwrap();
        assert_eq!(lease.term, crate::replication::DEFAULT_REPLICATION_TERM);
        assert!(!lease.fenced_by_term(crate::replication::DEFAULT_REPLICATION_TERM));
    }

    #[test]
    fn stale_term_contender_is_fenced_even_when_lease_expired() {
        let s = store();
        // The term-5 lease expires, but a term-4 contender must still be refused.
        let _new_primary = s.try_acquire_for_term("db", "new-primary", 1, 5).unwrap();
        std::thread::sleep(std::time::Duration::from_millis(10));
        let err = s
            .try_acquire_for_term("db", "ex-primary", 60_000, 4)
            .unwrap_err();
        match err {
            LeaseError::Fenced {
                attempted_term,
                current_term,
                ..
            } => {
                assert_eq!(attempted_term, 4);
                assert_eq!(current_term, 5);
            }
            other => panic!("expected Fenced, got {other:?}"),
        }
    }

    #[test]
    fn same_or_higher_term_contender_may_poach_expired_lease() {
        let s = store();
        let _ = s.try_acquire_for_term("db", "old", 1, 5).unwrap();
        std::thread::sleep(std::time::Duration::from_millis(10));
        let lease = s.try_acquire_for_term("db", "new", 60_000, 6).unwrap();
        assert_eq!(lease.holder_id, "new");
        assert_eq!(lease.term, 6);
        assert_eq!(lease.generation, 2, "poaching advances the generation");
    }

    #[test]
    fn refresh_for_term_fails_closed_once_term_advances() {
        let s = store();
        // The lease is still unexpired; only the advanced term blocks the refresh.
        let lease = s.try_acquire_for_term("db", "deposed", 60_000, 4).unwrap();
        let err = s.refresh_for_term(&lease, 60_000, 5).unwrap_err();
        match err {
            LeaseError::Fenced {
                attempted_holder,
                attempted_term,
                current_term,
            } => {
                assert_eq!(attempted_holder, "deposed");
                assert_eq!(attempted_term, 4);
                assert_eq!(current_term, 5);
            }
            other => panic!("expected Fenced, got {other:?}"),
        }
    }

    #[test]
    fn refresh_for_term_succeeds_while_term_holds() {
        let s = store();
        let lease = s.try_acquire_for_term("db", "primary", 1_000, 5).unwrap();
        std::thread::sleep(std::time::Duration::from_millis(20));
        let refreshed = s.refresh_for_term(&lease, 60_000, 5).unwrap();
        assert_eq!(refreshed.term, 5);
        assert!(refreshed.expires_at_ms > lease.expires_at_ms);
    }
}