Skip to main content

reddb_server/replication/
lease.rs

1//! Serverless writer lease (PLAN.md Phase 5 / W6).
2//!
3//! Multiple RedDB instances can attach to the same remote-backed
4//! database key. To prevent two of them from concurrently mutating the
5//! same remote artifacts (snapshots, WAL segments, head manifest),
6//! exactly one of them must hold a *writer lease*. Other instances may
7//! still attach as read-only replicas without acquiring a lease.
8//!
9//! ## Wire format
10//!
11//! The lease is serialized as JSON under
12//! `leases/{database_key}.lease.json` on the remote backend:
13//!
14//! ```json
15//! {
16//!   "database_key": "main",
17//!   "holder_id": "instance-uuid",
18//!   "generation": 7,
19//!   "acquired_at_ms": 1730000000000,
20//!   "expires_at_ms":  1730000060000
21//! }
22//! ```
23//!
//! - `generation` increments every time `try_acquire` replaces an
//!   existing lease record (poaching an expired lease, or the same
//!   holder re-acquiring) and restarts at 1 once a release has deleted
//!   the record. The holder stamps its uploads with the generation so
//!   a stale writer (whose lease was poached because it expired) can
//!   be detected during reclaim by reading the current lease and
//!   comparing.
29//! - `expires_at_ms` is wall-clock millis since UNIX epoch. A holder
30//!   refreshes it periodically; a contender treats anything past it as
31//!   poachable.
32//!
33//! ## Atomicity contract
34//!
35//! Lease mutation requires backend-side compare-and-swap. Backends
36//! advertise this through `RemoteBackend::supports_conditional_writes`
37//! and implement object version tokens + conditional writes/deletes.
38//! A backend that cannot enforce `IfAbsent` / `IfVersion` fails
39//! closed before the instance is allowed to write. This keeps
40//! serverless fencing out of "last writer wins" territory.
41
42use std::sync::atomic::{AtomicU64, Ordering};
43use std::sync::Arc;
44
45use crate::serde_json::{self, Value as JsonValue};
46use crate::storage::backend::{
47    AtomicRemoteBackend, BackendError, BackendObjectVersion, ConditionalDelete, ConditionalPut,
48};
49use serde_json::Map;
50
51/// Per-process monotonic counter that disambiguates lease temp files
52/// when multiple threads sample `now_unix_nanos()` within the same
53/// nanosecond (observed on virtualised CI runners with coarse
54/// clocks). Without this, two writers could share the same temp path,
55/// and the CAS holder would publish the *other* writer's bytes,
56/// producing a spurious `LostRace` for the apparent winner.
57static LEASE_TEMP_COUNTER: AtomicU64 = AtomicU64::new(0);
58
59fn lease_temp_path(kind: &str) -> std::path::PathBuf {
60    let unique = LEASE_TEMP_COUNTER.fetch_add(1, Ordering::Relaxed);
61    std::env::temp_dir().join(format!(
62        "reddb-lease-{kind}-{}-{}-{unique}.json",
63        std::process::id(),
64        crate::utils::now_unix_nanos()
65    ))
66}
67
/// A point-in-time record of who holds the writer lease for one
/// database key. Field names match the JSON wire format one-to-one.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct WriterLease {
    /// Database key the lease guards.
    pub database_key: String,
    /// Identifier of the instance that owns the lease.
    pub holder_id: String,
    /// Fencing counter: 1 for the first record, previous + 1 whenever
    /// an existing record is replaced by an acquire.
    pub generation: u64,
    /// Timestamp (ms) taken when the lease was acquired.
    pub acquired_at_ms: u64,
    /// Timestamp (ms) from which the lease counts as expired.
    pub expires_at_ms: u64,
}
77
78impl WriterLease {
79    pub fn is_expired(&self, now_ms: u64) -> bool {
80        self.expires_at_ms <= now_ms
81    }
82
83    fn to_json(&self) -> JsonValue {
84        let mut object = Map::new();
85        object.insert(
86            "database_key".to_string(),
87            JsonValue::String(self.database_key.clone()),
88        );
89        object.insert(
90            "holder_id".to_string(),
91            JsonValue::String(self.holder_id.clone()),
92        );
93        object.insert(
94            "generation".to_string(),
95            JsonValue::Number(self.generation as f64),
96        );
97        object.insert(
98            "acquired_at_ms".to_string(),
99            JsonValue::Number(self.acquired_at_ms as f64),
100        );
101        object.insert(
102            "expires_at_ms".to_string(),
103            JsonValue::Number(self.expires_at_ms as f64),
104        );
105        JsonValue::Object(object)
106    }
107
108    fn from_json(value: &JsonValue) -> Result<Self, LeaseError> {
109        let obj = value
110            .as_object()
111            .ok_or_else(|| LeaseError::InvalidFormat("lease json is not an object".into()))?;
112        Ok(Self {
113            database_key: obj
114                .get("database_key")
115                .and_then(JsonValue::as_str)
116                .ok_or_else(|| LeaseError::InvalidFormat("missing database_key".into()))?
117                .to_string(),
118            holder_id: obj
119                .get("holder_id")
120                .and_then(JsonValue::as_str)
121                .ok_or_else(|| LeaseError::InvalidFormat("missing holder_id".into()))?
122                .to_string(),
123            generation: obj
124                .get("generation")
125                .and_then(JsonValue::as_u64)
126                .ok_or_else(|| LeaseError::InvalidFormat("missing generation".into()))?,
127            acquired_at_ms: obj
128                .get("acquired_at_ms")
129                .and_then(JsonValue::as_u64)
130                .ok_or_else(|| LeaseError::InvalidFormat("missing acquired_at_ms".into()))?,
131            expires_at_ms: obj
132                .get("expires_at_ms")
133                .and_then(JsonValue::as_u64)
134                .ok_or_else(|| LeaseError::InvalidFormat("missing expires_at_ms".into()))?,
135        })
136    }
137}
138
139#[derive(Debug)]
140pub enum LeaseError {
141    Backend(BackendError),
142    /// A different holder owns a non-expired lease.
143    Held {
144        current: WriterLease,
145        now_ms: u64,
146    },
147    /// We uploaded a fresh lease but a re-read shows a different holder
148    /// or generation, so we lost a concurrent acquire race.
149    LostRace {
150        attempted_holder: String,
151        observed: WriterLease,
152    },
153    InvalidFormat(String),
154    /// The release/refresh target no longer matches what's on the
155    /// backend (lease was already poached or removed).
156    Stale {
157        attempted_holder: String,
158        attempted_generation: u64,
159        observed: Option<WriterLease>,
160    },
161}
162
163impl std::fmt::Display for LeaseError {
164    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
165        match self {
166            Self::Backend(err) => write!(f, "lease backend error: {err}"),
167            Self::Held { current, now_ms } => {
168                write!(
169                    f,
170                    "lease for '{}' held by '{}' (gen {}, expires in {} ms)",
171                    current.database_key,
172                    current.holder_id,
173                    current.generation,
174                    current.expires_at_ms.saturating_sub(*now_ms)
175                )
176            }
177            Self::LostRace {
178                attempted_holder,
179                observed,
180            } => write!(
181                f,
182                "lost lease acquire race: '{}' tried to take '{}' but '{}' (gen {}) won",
183                attempted_holder, observed.database_key, observed.holder_id, observed.generation
184            ),
185            Self::InvalidFormat(msg) => write!(f, "invalid lease format: {msg}"),
186            Self::Stale {
187                attempted_holder,
188                attempted_generation,
189                observed,
190            } => match observed {
191                Some(o) => write!(
192                    f,
193                    "stale lease op: '{}' (gen {}) tried to act, but current is '{}' (gen {})",
194                    attempted_holder, attempted_generation, o.holder_id, o.generation
195                ),
196                None => write!(
197                    f,
198                    "stale lease op: '{}' (gen {}) tried to act, but no lease present",
199                    attempted_holder, attempted_generation
200                ),
201            },
202        }
203    }
204}
205
206impl std::error::Error for LeaseError {}
207
208impl From<BackendError> for LeaseError {
209    fn from(value: BackendError) -> Self {
210        Self::Backend(value)
211    }
212}
213
214struct VersionedLease {
215    lease: WriterLease,
216    version: BackendObjectVersion,
217}
218
219/// Wraps an `AtomicRemoteBackend` with lease primitives. The lease
220/// object is stored under a deterministic key derived from
221/// `database_key`; the store reads/writes that one key.
222///
223/// The trait bound `AtomicRemoteBackend` is the type-system version
224/// of "this backend can enforce CAS" — backends that cannot
225/// (Turso, D1, plain HTTP without ETag) deliberately do not
226/// implement the trait, so wiring them into a `LeaseStore` becomes
227/// a compile error rather than a runtime fail-closed.
228pub struct LeaseStore {
229    backend: Arc<dyn AtomicRemoteBackend>,
230    prefix: String,
231}
232
233impl LeaseStore {
234    pub fn new(backend: Arc<dyn AtomicRemoteBackend>) -> Self {
235        Self {
236            backend,
237            prefix: "leases/".to_string(),
238        }
239    }
240
241    pub fn with_prefix(mut self, prefix: impl Into<String>) -> Self {
242        let p = prefix.into();
243        self.prefix = if p.ends_with('/') { p } else { format!("{p}/") };
244        self
245    }
246
247    fn key_for(&self, database_key: &str) -> String {
248        format!("{}{}.lease.json", self.prefix, database_key)
249    }
250
251    /// Read whatever lease object is currently published. `None` means
252    /// no lease has ever been written for this key.
253    pub fn current(&self, database_key: &str) -> Result<Option<WriterLease>, LeaseError> {
254        self.read_lease(database_key)
255    }
256
257    fn read_lease(&self, database_key: &str) -> Result<Option<WriterLease>, LeaseError> {
258        let key = self.key_for(database_key);
259        let temp = lease_temp_path("read");
260        let downloaded = self.backend.download(&key, &temp)?;
261        if !downloaded {
262            return Ok(None);
263        }
264        let bytes = std::fs::read(&temp)
265            .map_err(|err| LeaseError::Backend(BackendError::Transport(err.to_string())))?;
266        let _ = std::fs::remove_file(&temp);
267        let json: JsonValue = serde_json::from_slice(&bytes)
268            .map_err(|err| LeaseError::InvalidFormat(format!("lease json parse: {err}")))?;
269        WriterLease::from_json(&json).map(Some)
270    }
271
272    fn current_versioned(&self, database_key: &str) -> Result<Option<VersionedLease>, LeaseError> {
273        let key = self.key_for(database_key);
274        let before = match self.backend.object_version(&key)? {
275            Some(version) => version,
276            None => return Ok(None),
277        };
278        let temp = lease_temp_path("read");
279        let downloaded = self.backend.download(&key, &temp)?;
280        if !downloaded {
281            return Ok(None);
282        }
283        let after = self.backend.object_version(&key)?;
284        if after.as_ref() != Some(&before) {
285            let _ = std::fs::remove_file(&temp);
286            return Err(LeaseError::Backend(BackendError::PreconditionFailed(
287                "lease object changed while being read".to_string(),
288            )));
289        }
290        let bytes = std::fs::read(&temp)
291            .map_err(|err| LeaseError::Backend(BackendError::Transport(err.to_string())))?;
292        let _ = std::fs::remove_file(&temp);
293        let json: JsonValue = serde_json::from_slice(&bytes)
294            .map_err(|err| LeaseError::InvalidFormat(format!("lease json parse: {err}")))?;
295        Ok(Some(VersionedLease {
296            lease: WriterLease::from_json(&json)?,
297            version: before,
298        }))
299    }
300
301    /// Try to acquire the lease for `database_key` on behalf of
302    /// `holder_id`, valid for `ttl_ms`. Returns the `WriterLease` we
303    /// own on success. Errors:
304    /// - `LeaseError::Held` if a different holder owns a non-expired
305    ///   lease.
306    /// - `LeaseError::LostRace` if a concurrent contender beat us.
307    pub fn try_acquire(
308        &self,
309        database_key: &str,
310        holder_id: &str,
311        ttl_ms: u64,
312    ) -> Result<WriterLease, LeaseError> {
313        let now_ms = crate::utils::now_unix_millis();
314
315        let current = self.current_versioned(database_key)?;
316        // If a healthy lease exists held by someone else, refuse
317        // immediately. Two cases collapse: either the current holder
318        // is us (refresh) or it's somebody else with time left.
319        let next_generation = match &current {
320            Some(c) if !c.lease.is_expired(now_ms) && c.lease.holder_id != holder_id => {
321                return Err(LeaseError::Held {
322                    current: c.lease.clone(),
323                    now_ms,
324                });
325            }
326            Some(c) => c.lease.generation.saturating_add(1),
327            None => 1,
328        };
329
330        let new_lease = WriterLease {
331            database_key: database_key.to_string(),
332            holder_id: holder_id.to_string(),
333            generation: next_generation,
334            acquired_at_ms: now_ms,
335            expires_at_ms: now_ms.saturating_add(ttl_ms),
336        };
337        let condition = match current {
338            Some(c) => ConditionalPut::IfVersion(c.version),
339            None => ConditionalPut::IfAbsent,
340        };
341        if let Err(err) = self.publish_conditional(&new_lease, condition) {
342            if matches!(
343                err,
344                LeaseError::Backend(BackendError::PreconditionFailed(_))
345            ) {
346                return self.acquire_race_error(database_key, holder_id, now_ms);
347            }
348            return Err(err);
349        }
350
351        // Re-read and verify nobody else won the same gap.
352        match self.current(database_key)? {
353            Some(observed)
354                if observed.holder_id == holder_id
355                    && observed.generation == new_lease.generation =>
356            {
357                Ok(new_lease)
358            }
359            Some(observed) => Err(LeaseError::LostRace {
360                attempted_holder: holder_id.to_string(),
361                observed,
362            }),
363            None => Err(LeaseError::LostRace {
364                attempted_holder: holder_id.to_string(),
365                observed: WriterLease {
366                    database_key: database_key.to_string(),
367                    holder_id: "<missing>".to_string(),
368                    generation: 0,
369                    acquired_at_ms: 0,
370                    expires_at_ms: 0,
371                },
372            }),
373        }
374    }
375
376    fn acquire_race_error(
377        &self,
378        database_key: &str,
379        holder_id: &str,
380        now_ms: u64,
381    ) -> Result<WriterLease, LeaseError> {
382        match self.current(database_key)? {
383            Some(observed) if !observed.is_expired(now_ms) && observed.holder_id != holder_id => {
384                Err(LeaseError::Held {
385                    current: observed,
386                    now_ms,
387                })
388            }
389            Some(observed) => Err(LeaseError::LostRace {
390                attempted_holder: holder_id.to_string(),
391                observed,
392            }),
393            None => Err(LeaseError::LostRace {
394                attempted_holder: holder_id.to_string(),
395                observed: WriterLease {
396                    database_key: database_key.to_string(),
397                    holder_id: "<missing>".to_string(),
398                    generation: 0,
399                    acquired_at_ms: 0,
400                    expires_at_ms: 0,
401                },
402            }),
403        }
404    }
405
406    /// Refresh `lease.expires_at_ms` to `now + ttl_ms`. Fails with
407    /// `Stale` if the holder/generation no longer matches what's
408    /// currently published. The returned lease is the new
409    /// in-effect record.
410    pub fn refresh(&self, lease: &WriterLease, ttl_ms: u64) -> Result<WriterLease, LeaseError> {
411        let now_ms = crate::utils::now_unix_millis();
412        let observed = self.current_versioned(&lease.database_key)?;
413        match observed {
414            Some(o)
415                if o.lease.holder_id == lease.holder_id
416                    && o.lease.generation == lease.generation =>
417            {
418                let mut next = lease.clone();
419                next.expires_at_ms = now_ms.saturating_add(ttl_ms);
420                if let Err(err) =
421                    self.publish_conditional(&next, ConditionalPut::IfVersion(o.version))
422                {
423                    if matches!(
424                        err,
425                        LeaseError::Backend(BackendError::PreconditionFailed(_))
426                    ) {
427                        return Err(LeaseError::Stale {
428                            attempted_holder: lease.holder_id.clone(),
429                            attempted_generation: lease.generation,
430                            observed: self.current(&lease.database_key)?,
431                        });
432                    }
433                    return Err(err);
434                }
435                Ok(next)
436            }
437            other => Err(LeaseError::Stale {
438                attempted_holder: lease.holder_id.clone(),
439                attempted_generation: lease.generation,
440                observed: other.map(|v| v.lease),
441            }),
442        }
443    }
444
445    /// Release the lease. Only succeeds when the published lease
446    /// matches `lease.holder_id + lease.generation`. A stolen or
447    /// already-replaced lease returns `Stale`.
448    pub fn release(&self, lease: &WriterLease) -> Result<(), LeaseError> {
449        let observed = self.current_versioned(&lease.database_key)?;
450        match observed {
451            Some(o)
452                if o.lease.holder_id == lease.holder_id
453                    && o.lease.generation == lease.generation =>
454            {
455                let key = self.key_for(&lease.database_key);
456                if let Err(err) = self
457                    .backend
458                    .delete_conditional(&key, ConditionalDelete::IfVersion(o.version))
459                {
460                    if matches!(err, BackendError::PreconditionFailed(_)) {
461                        return Err(LeaseError::Stale {
462                            attempted_holder: lease.holder_id.clone(),
463                            attempted_generation: lease.generation,
464                            observed: self.current(&lease.database_key)?,
465                        });
466                    }
467                    return Err(err.into());
468                }
469                Ok(())
470            }
471            other => Err(LeaseError::Stale {
472                attempted_holder: lease.holder_id.clone(),
473                attempted_generation: lease.generation,
474                observed: other.map(|v| v.lease),
475            }),
476        }
477    }
478
479    fn publish_conditional(
480        &self,
481        lease: &WriterLease,
482        condition: ConditionalPut,
483    ) -> Result<BackendObjectVersion, LeaseError> {
484        let key = self.key_for(&lease.database_key);
485        let json = lease.to_json();
486        let bytes = serde_json::to_vec(&json).map_err(|err| {
487            LeaseError::Backend(BackendError::Internal(format!("serialize lease: {err}")))
488        })?;
489        let temp = lease_temp_path("write");
490        std::fs::write(&temp, &bytes)
491            .map_err(|err| LeaseError::Backend(BackendError::Transport(err.to_string())))?;
492        let res = self.backend.upload_conditional(&temp, &key, condition);
493        let _ = std::fs::remove_file(&temp);
494        Ok(res?)
495    }
496}
497
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::backend::LocalBackend;

    /// Builds a `LeaseStore` over `LocalBackend` rooted in a fresh
    /// directory under the system temp dir.
    ///
    /// The directory name mixes the process id and a per-process
    /// counter into the timestamp: tests run on parallel threads, and
    /// on a coarse clock two of them can sample the same nanosecond,
    /// which would make them share (and fight over) one lease file.
    fn store() -> LeaseStore {
        static NEXT_STORE: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);
        let unique = NEXT_STORE.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        LeaseStore::new(Arc::new(LocalBackend)).with_prefix(format!(
            "{}/leases-test-{}-{}-{}",
            std::env::temp_dir().to_string_lossy(),
            std::process::id(),
            nanos,
            unique
        ))
    }

    #[test]
    fn first_acquire_assigns_generation_one() {
        let s = store();
        let lease = s.try_acquire("db", "writer-a", 60_000).unwrap();
        assert_eq!(lease.generation, 1);
        assert_eq!(lease.holder_id, "writer-a");
    }

    #[test]
    fn second_holder_rejected_while_first_alive() {
        let s = store();
        let _ = s.try_acquire("db", "writer-a", 60_000).unwrap();
        let err = s.try_acquire("db", "writer-b", 60_000).unwrap_err();
        match err {
            LeaseError::Held { current, .. } => {
                assert_eq!(current.holder_id, "writer-a");
                assert_eq!(current.generation, 1);
            }
            other => panic!("expected Held, got {other:?}"),
        }
    }

    #[test]
    fn expired_lease_is_poachable() {
        let s = store();
        let _ = s.try_acquire("db", "writer-a", 1).unwrap();
        std::thread::sleep(std::time::Duration::from_millis(10));
        let lease = s.try_acquire("db", "writer-b", 60_000).unwrap();
        assert_eq!(lease.holder_id, "writer-b");
        assert_eq!(
            lease.generation, 2,
            "generation must increment when poaching"
        );
    }

    #[test]
    fn release_clears_so_anyone_can_take_again() {
        let s = store();
        let lease = s.try_acquire("db", "writer-a", 60_000).unwrap();
        s.release(&lease).unwrap();
        // After release the slot is empty — generation resets to 1
        // because the previous record is gone.
        let next = s.try_acquire("db", "writer-b", 60_000).unwrap();
        assert_eq!(next.holder_id, "writer-b");
        assert_eq!(next.generation, 1);
    }

    #[test]
    fn refresh_extends_expiration_for_same_holder() {
        let s = store();
        let lease = s.try_acquire("db", "writer-a", 1_000).unwrap();
        std::thread::sleep(std::time::Duration::from_millis(20));
        let refreshed = s.refresh(&lease, 60_000).unwrap();
        assert_eq!(refreshed.generation, lease.generation);
        assert!(refreshed.expires_at_ms > lease.expires_at_ms);
    }

    #[test]
    fn refresh_fails_when_someone_else_owns() {
        let s = store();
        let lease = s.try_acquire("db", "writer-a", 1).unwrap();
        std::thread::sleep(std::time::Duration::from_millis(10));
        let _ = s.try_acquire("db", "writer-b", 60_000).unwrap();
        let err = s.refresh(&lease, 60_000).unwrap_err();
        assert!(matches!(err, LeaseError::Stale { .. }));
    }

    // The legacy `acquire_fails_closed_without_backend_conditional_writes`
    // test was deleted with the trait split: `LeaseStore::new` now requires
    // `Arc<dyn AtomicRemoteBackend>`, so a non-CAS backend cannot even be
    // wired into the constructor — the test is enforced at compile time
    // (see tests/lease_atomic_http_opt_in.rs for the runtime-config branch).
}