1use std::sync::atomic::{AtomicU64, Ordering};
43use std::sync::Arc;
44
45use crate::serde_json::{self, Value as JsonValue};
46use crate::storage::backend::{
47 AtomicRemoteBackend, BackendError, BackendObjectVersion, ConditionalDelete, ConditionalPut,
48};
49use serde_json::Map;
50
51static LEASE_TEMP_COUNTER: AtomicU64 = AtomicU64::new(0);
58
59fn lease_temp_path(kind: &str) -> std::path::PathBuf {
60 let unique = LEASE_TEMP_COUNTER.fetch_add(1, Ordering::Relaxed);
61 std::env::temp_dir().join(format!(
62 "reddb-lease-{kind}-{}-{}-{unique}.json",
63 std::process::id(),
64 crate::utils::now_unix_nanos()
65 ))
66}
67
/// A writer lease record for one database key, as persisted by `LeaseStore`.
///
/// Timestamps are Unix milliseconds (they are filled from
/// `crate::utils::now_unix_millis()` by the store).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WriterLease {
    /// Database this lease guards; also used to derive the backend object key.
    pub database_key: String,
    /// Identifier of the writer that holds the lease.
    pub holder_id: String,
    /// Acquisition counter. The store assigns 1 when no lease object exists and
    /// bumps it on every later acquisition; a refresh keeps it unchanged.
    pub generation: u64,
    /// When this generation was acquired (Unix ms).
    pub acquired_at_ms: u64,
    /// Instant (Unix ms) at or after which the lease counts as expired.
    pub expires_at_ms: u64,
}
77
78impl WriterLease {
79 pub fn is_expired(&self, now_ms: u64) -> bool {
80 self.expires_at_ms <= now_ms
81 }
82
83 fn to_json(&self) -> JsonValue {
84 let mut object = Map::new();
85 object.insert(
86 "database_key".to_string(),
87 JsonValue::String(self.database_key.clone()),
88 );
89 object.insert(
90 "holder_id".to_string(),
91 JsonValue::String(self.holder_id.clone()),
92 );
93 object.insert(
94 "generation".to_string(),
95 JsonValue::Number(self.generation as f64),
96 );
97 object.insert(
98 "acquired_at_ms".to_string(),
99 JsonValue::Number(self.acquired_at_ms as f64),
100 );
101 object.insert(
102 "expires_at_ms".to_string(),
103 JsonValue::Number(self.expires_at_ms as f64),
104 );
105 JsonValue::Object(object)
106 }
107
108 fn from_json(value: &JsonValue) -> Result<Self, LeaseError> {
109 let obj = value
110 .as_object()
111 .ok_or_else(|| LeaseError::InvalidFormat("lease json is not an object".into()))?;
112 Ok(Self {
113 database_key: obj
114 .get("database_key")
115 .and_then(JsonValue::as_str)
116 .ok_or_else(|| LeaseError::InvalidFormat("missing database_key".into()))?
117 .to_string(),
118 holder_id: obj
119 .get("holder_id")
120 .and_then(JsonValue::as_str)
121 .ok_or_else(|| LeaseError::InvalidFormat("missing holder_id".into()))?
122 .to_string(),
123 generation: obj
124 .get("generation")
125 .and_then(JsonValue::as_u64)
126 .ok_or_else(|| LeaseError::InvalidFormat("missing generation".into()))?,
127 acquired_at_ms: obj
128 .get("acquired_at_ms")
129 .and_then(JsonValue::as_u64)
130 .ok_or_else(|| LeaseError::InvalidFormat("missing acquired_at_ms".into()))?,
131 expires_at_ms: obj
132 .get("expires_at_ms")
133 .and_then(JsonValue::as_u64)
134 .ok_or_else(|| LeaseError::InvalidFormat("missing expires_at_ms".into()))?,
135 })
136 }
137}
138
/// Errors returned by `LeaseStore` operations.
#[derive(Debug)]
pub enum LeaseError {
    /// The storage backend (or local scratch-file I/O / lease serialization,
    /// which the store also maps into `BackendError`) failed.
    Backend(BackendError),
    /// Another holder owns a lease that has not expired yet.
    Held {
        /// The live lease that blocked the acquire.
        current: WriterLease,
        /// Clock reading (Unix ms) used for the expiry check.
        now_ms: u64,
    },
    /// An acquire attempt did not end with the caller's lease in place: the
    /// conditional write was rejected, or the lease read back afterwards
    /// belonged to someone else (or was gone).
    LostRace {
        /// Holder id that tried to acquire.
        attempted_holder: String,
        /// Lease observed after the attempt; the store uses a placeholder with
        /// holder `"<missing>"` and zeroed numbers when no lease object exists.
        observed: WriterLease,
    },
    /// A stored lease object could not be parsed.
    InvalidFormat(String),
    /// A refresh or release was attempted with a lease (holder + generation)
    /// that is no longer the stored one.
    Stale {
        /// Holder id of the lease the caller presented.
        attempted_holder: String,
        /// Generation of the lease the caller presented.
        attempted_generation: u64,
        /// The lease actually stored at the time of the check, if any.
        observed: Option<WriterLease>,
    },
}
162
163impl std::fmt::Display for LeaseError {
164 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
165 match self {
166 Self::Backend(err) => write!(f, "lease backend error: {err}"),
167 Self::Held { current, now_ms } => {
168 write!(
169 f,
170 "lease for '{}' held by '{}' (gen {}, expires in {} ms)",
171 current.database_key,
172 current.holder_id,
173 current.generation,
174 current.expires_at_ms.saturating_sub(*now_ms)
175 )
176 }
177 Self::LostRace {
178 attempted_holder,
179 observed,
180 } => write!(
181 f,
182 "lost lease acquire race: '{}' tried to take '{}' but '{}' (gen {}) won",
183 attempted_holder, observed.database_key, observed.holder_id, observed.generation
184 ),
185 Self::InvalidFormat(msg) => write!(f, "invalid lease format: {msg}"),
186 Self::Stale {
187 attempted_holder,
188 attempted_generation,
189 observed,
190 } => match observed {
191 Some(o) => write!(
192 f,
193 "stale lease op: '{}' (gen {}) tried to act, but current is '{}' (gen {})",
194 attempted_holder, attempted_generation, o.holder_id, o.generation
195 ),
196 None => write!(
197 f,
198 "stale lease op: '{}' (gen {}) tried to act, but no lease present",
199 attempted_holder, attempted_generation
200 ),
201 },
202 }
203 }
204}
205
206impl std::error::Error for LeaseError {}
207
208impl From<BackendError> for LeaseError {
209 fn from(value: BackendError) -> Self {
210 Self::Backend(value)
211 }
212}
213
/// A lease paired with the backend object version it was read at; the version
/// is passed as the `IfVersion` precondition for the follow-up conditional
/// put or delete.
struct VersionedLease {
    /// The parsed lease contents.
    lease: WriterLease,
    /// Backend version of the lease object observed when `lease` was read.
    version: BackendObjectVersion,
}
218
/// Stores writer leases as JSON objects on an `AtomicRemoteBackend`.
///
/// Every state change goes through a conditional put or delete (`IfAbsent` /
/// `IfVersion`), so a write based on a stale read is rejected by the backend
/// instead of overwriting another writer's lease.
pub struct LeaseStore {
    /// Backend holding the lease objects.
    backend: Arc<dyn AtomicRemoteBackend>,
    /// Key prefix prepended to `<database_key>.lease.json`; `"leases/"` unless
    /// overridden with `with_prefix`.
    prefix: String,
}
232
233impl LeaseStore {
234 pub fn new(backend: Arc<dyn AtomicRemoteBackend>) -> Self {
235 Self {
236 backend,
237 prefix: "leases/".to_string(),
238 }
239 }
240
241 pub fn with_prefix(mut self, prefix: impl Into<String>) -> Self {
242 let p = prefix.into();
243 self.prefix = if p.ends_with('/') { p } else { format!("{p}/") };
244 self
245 }
246
247 fn key_for(&self, database_key: &str) -> String {
248 format!("{}{}.lease.json", self.prefix, database_key)
249 }
250
251 pub fn current(&self, database_key: &str) -> Result<Option<WriterLease>, LeaseError> {
254 self.read_lease(database_key)
255 }
256
257 fn read_lease(&self, database_key: &str) -> Result<Option<WriterLease>, LeaseError> {
258 let key = self.key_for(database_key);
259 let temp = lease_temp_path("read");
260 let downloaded = self.backend.download(&key, &temp)?;
261 if !downloaded {
262 return Ok(None);
263 }
264 let bytes = std::fs::read(&temp)
265 .map_err(|err| LeaseError::Backend(BackendError::Transport(err.to_string())))?;
266 let _ = std::fs::remove_file(&temp);
267 let json: JsonValue = serde_json::from_slice(&bytes)
268 .map_err(|err| LeaseError::InvalidFormat(format!("lease json parse: {err}")))?;
269 WriterLease::from_json(&json).map(Some)
270 }
271
272 fn current_versioned(&self, database_key: &str) -> Result<Option<VersionedLease>, LeaseError> {
273 let key = self.key_for(database_key);
274 let before = match self.backend.object_version(&key)? {
275 Some(version) => version,
276 None => return Ok(None),
277 };
278 let temp = lease_temp_path("read");
279 let downloaded = self.backend.download(&key, &temp)?;
280 if !downloaded {
281 return Ok(None);
282 }
283 let after = self.backend.object_version(&key)?;
284 if after.as_ref() != Some(&before) {
285 let _ = std::fs::remove_file(&temp);
286 return Err(LeaseError::Backend(BackendError::PreconditionFailed(
287 "lease object changed while being read".to_string(),
288 )));
289 }
290 let bytes = std::fs::read(&temp)
291 .map_err(|err| LeaseError::Backend(BackendError::Transport(err.to_string())))?;
292 let _ = std::fs::remove_file(&temp);
293 let json: JsonValue = serde_json::from_slice(&bytes)
294 .map_err(|err| LeaseError::InvalidFormat(format!("lease json parse: {err}")))?;
295 Ok(Some(VersionedLease {
296 lease: WriterLease::from_json(&json)?,
297 version: before,
298 }))
299 }
300
301 pub fn try_acquire(
308 &self,
309 database_key: &str,
310 holder_id: &str,
311 ttl_ms: u64,
312 ) -> Result<WriterLease, LeaseError> {
313 let now_ms = crate::utils::now_unix_millis();
314
315 let current = self.current_versioned(database_key)?;
316 let next_generation = match ¤t {
320 Some(c) if !c.lease.is_expired(now_ms) && c.lease.holder_id != holder_id => {
321 return Err(LeaseError::Held {
322 current: c.lease.clone(),
323 now_ms,
324 });
325 }
326 Some(c) => c.lease.generation.saturating_add(1),
327 None => 1,
328 };
329
330 let new_lease = WriterLease {
331 database_key: database_key.to_string(),
332 holder_id: holder_id.to_string(),
333 generation: next_generation,
334 acquired_at_ms: now_ms,
335 expires_at_ms: now_ms.saturating_add(ttl_ms),
336 };
337 let condition = match current {
338 Some(c) => ConditionalPut::IfVersion(c.version),
339 None => ConditionalPut::IfAbsent,
340 };
341 if let Err(err) = self.publish_conditional(&new_lease, condition) {
342 if matches!(
343 err,
344 LeaseError::Backend(BackendError::PreconditionFailed(_))
345 ) {
346 return self.acquire_race_error(database_key, holder_id, now_ms);
347 }
348 return Err(err);
349 }
350
351 match self.current(database_key)? {
353 Some(observed)
354 if observed.holder_id == holder_id
355 && observed.generation == new_lease.generation =>
356 {
357 Ok(new_lease)
358 }
359 Some(observed) => Err(LeaseError::LostRace {
360 attempted_holder: holder_id.to_string(),
361 observed,
362 }),
363 None => Err(LeaseError::LostRace {
364 attempted_holder: holder_id.to_string(),
365 observed: WriterLease {
366 database_key: database_key.to_string(),
367 holder_id: "<missing>".to_string(),
368 generation: 0,
369 acquired_at_ms: 0,
370 expires_at_ms: 0,
371 },
372 }),
373 }
374 }
375
376 fn acquire_race_error(
377 &self,
378 database_key: &str,
379 holder_id: &str,
380 now_ms: u64,
381 ) -> Result<WriterLease, LeaseError> {
382 match self.current(database_key)? {
383 Some(observed) if !observed.is_expired(now_ms) && observed.holder_id != holder_id => {
384 Err(LeaseError::Held {
385 current: observed,
386 now_ms,
387 })
388 }
389 Some(observed) => Err(LeaseError::LostRace {
390 attempted_holder: holder_id.to_string(),
391 observed,
392 }),
393 None => Err(LeaseError::LostRace {
394 attempted_holder: holder_id.to_string(),
395 observed: WriterLease {
396 database_key: database_key.to_string(),
397 holder_id: "<missing>".to_string(),
398 generation: 0,
399 acquired_at_ms: 0,
400 expires_at_ms: 0,
401 },
402 }),
403 }
404 }
405
406 pub fn refresh(&self, lease: &WriterLease, ttl_ms: u64) -> Result<WriterLease, LeaseError> {
411 let now_ms = crate::utils::now_unix_millis();
412 let observed = self.current_versioned(&lease.database_key)?;
413 match observed {
414 Some(o)
415 if o.lease.holder_id == lease.holder_id
416 && o.lease.generation == lease.generation =>
417 {
418 let mut next = lease.clone();
419 next.expires_at_ms = now_ms.saturating_add(ttl_ms);
420 if let Err(err) =
421 self.publish_conditional(&next, ConditionalPut::IfVersion(o.version))
422 {
423 if matches!(
424 err,
425 LeaseError::Backend(BackendError::PreconditionFailed(_))
426 ) {
427 return Err(LeaseError::Stale {
428 attempted_holder: lease.holder_id.clone(),
429 attempted_generation: lease.generation,
430 observed: self.current(&lease.database_key)?,
431 });
432 }
433 return Err(err);
434 }
435 Ok(next)
436 }
437 other => Err(LeaseError::Stale {
438 attempted_holder: lease.holder_id.clone(),
439 attempted_generation: lease.generation,
440 observed: other.map(|v| v.lease),
441 }),
442 }
443 }
444
445 pub fn release(&self, lease: &WriterLease) -> Result<(), LeaseError> {
449 let observed = self.current_versioned(&lease.database_key)?;
450 match observed {
451 Some(o)
452 if o.lease.holder_id == lease.holder_id
453 && o.lease.generation == lease.generation =>
454 {
455 let key = self.key_for(&lease.database_key);
456 if let Err(err) = self
457 .backend
458 .delete_conditional(&key, ConditionalDelete::IfVersion(o.version))
459 {
460 if matches!(err, BackendError::PreconditionFailed(_)) {
461 return Err(LeaseError::Stale {
462 attempted_holder: lease.holder_id.clone(),
463 attempted_generation: lease.generation,
464 observed: self.current(&lease.database_key)?,
465 });
466 }
467 return Err(err.into());
468 }
469 Ok(())
470 }
471 other => Err(LeaseError::Stale {
472 attempted_holder: lease.holder_id.clone(),
473 attempted_generation: lease.generation,
474 observed: other.map(|v| v.lease),
475 }),
476 }
477 }
478
479 fn publish_conditional(
480 &self,
481 lease: &WriterLease,
482 condition: ConditionalPut,
483 ) -> Result<BackendObjectVersion, LeaseError> {
484 let key = self.key_for(&lease.database_key);
485 let json = lease.to_json();
486 let bytes = serde_json::to_vec(&json).map_err(|err| {
487 LeaseError::Backend(BackendError::Internal(format!("serialize lease: {err}")))
488 })?;
489 let temp = lease_temp_path("write");
490 std::fs::write(&temp, &bytes)
491 .map_err(|err| LeaseError::Backend(BackendError::Transport(err.to_string())))?;
492 let res = self.backend.upload_conditional(&temp, &key, condition);
493 let _ = std::fs::remove_file(&temp);
494 Ok(res?)
495 }
496}
497
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::backend::LocalBackend;
    // NOTE(review): `Path` appears unused in the visible tests.
    use std::path::Path;

    /// Builds a store backed by `LocalBackend` whose prefix is a directory under
    /// the system temp dir named with the current wall-clock nanoseconds,
    /// presumably so each test sees its own lease files.
    // NOTE(review): uniqueness relies on the nanosecond timestamp only, and the
    // directories are never cleaned up — confirm this is acceptable.
    fn store() -> LeaseStore {
        LeaseStore::new(Arc::new(LocalBackend)).with_prefix(format!(
            "{}/leases-test-{}",
            std::env::temp_dir().to_string_lossy(),
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos()
        ))
    }

    /// With no lease object present, the first acquire gets generation 1.
    #[test]
    fn first_acquire_assigns_generation_one() {
        let s = store();
        let lease = s.try_acquire("db", "writer-a", 60_000).unwrap();
        assert_eq!(lease.generation, 1);
        assert_eq!(lease.holder_id, "writer-a");
    }

    /// A live (60 s) lease held by writer-a makes writer-b's acquire fail with `Held`.
    #[test]
    fn second_holder_rejected_while_first_alive() {
        let s = store();
        let _ = s.try_acquire("db", "writer-a", 60_000).unwrap();
        let err = s.try_acquire("db", "writer-b", 60_000).unwrap_err();
        match err {
            LeaseError::Held { current, .. } => {
                assert_eq!(current.holder_id, "writer-a");
                assert_eq!(current.generation, 1);
            }
            other => panic!("expected Held, got {other:?}"),
        }
    }

    /// A 1 ms lease has expired after a 10 ms sleep, so writer-b can take it
    /// over and the generation is bumped.
    #[test]
    fn expired_lease_is_poachable() {
        let s = store();
        let _ = s.try_acquire("db", "writer-a", 1).unwrap();
        std::thread::sleep(std::time::Duration::from_millis(10));
        let lease = s.try_acquire("db", "writer-b", 60_000).unwrap();
        assert_eq!(lease.holder_id, "writer-b");
        assert_eq!(
            lease.generation, 2,
            "generation must increment when poaching"
        );
    }

    /// Releasing deletes the lease object, so the next acquire (by anyone)
    /// starts again from generation 1.
    #[test]
    fn release_clears_so_anyone_can_take_again() {
        let s = store();
        let lease = s.try_acquire("db", "writer-a", 60_000).unwrap();
        s.release(&lease).unwrap();
        let next = s.try_acquire("db", "writer-b", 60_000).unwrap();
        assert_eq!(next.holder_id, "writer-b");
        assert_eq!(next.generation, 1);
    }

    /// Refreshing keeps the generation and moves `expires_at_ms` forward.
    #[test]
    fn refresh_extends_expiration_for_same_holder() {
        let s = store();
        let lease = s.try_acquire("db", "writer-a", 1_000).unwrap();
        std::thread::sleep(std::time::Duration::from_millis(20));
        let refreshed = s.refresh(&lease, 60_000).unwrap();
        assert_eq!(refreshed.generation, lease.generation);
        assert!(refreshed.expires_at_ms > lease.expires_at_ms);
    }

    /// Once writer-b has taken over the expired lease, writer-a's refresh of
    /// its old generation is rejected as `Stale`.
    #[test]
    fn refresh_fails_when_someone_else_owns() {
        let s = store();
        let lease = s.try_acquire("db", "writer-a", 1).unwrap();
        std::thread::sleep(std::time::Duration::from_millis(10));
        let _ = s.try_acquire("db", "writer-b", 60_000).unwrap();
        let err = s.refresh(&lease, 60_000).unwrap_err();
        assert!(matches!(err, LeaseError::Stale { .. }));
    }

}