Skip to main content

reddb_server/replication/
lease.rs

1//! Serverless writer lease (PLAN.md Phase 5 / W6).
2//!
3//! Multiple RedDB instances can attach to the same remote-backed
4//! database key. To prevent two of them from concurrently mutating the
5//! same remote artifacts (snapshots, WAL segments, head manifest),
6//! exactly one of them must hold a *writer lease*. Other instances may
7//! still attach as read-only replicas without acquiring a lease.
8//!
9//! ## Wire format
10//!
11//! The lease is serialized as JSON under
12//! `leases/{database_key}.lease.json` on the remote backend:
13//!
14//! ```json
15//! {
16//!   "database_key": "main",
17//!   "holder_id": "instance-uuid",
18//!   "term": 3,
19//!   "generation": 7,
20//!   "acquired_at_ms": 1730000000000,
21//!   "expires_at_ms":  1730000060000
22//! }
23//! ```
24//!
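//! - `generation` is bumped on every successful acquire over an existing
//!   lease record: when a contender poaches an expired lease, and also
//!   when the current holder re-acquires its own lease. Releasing the
//!   lease deletes the record, so the next acquire starts again at 1.
//!   The holder stamps its uploads with the generation so a stale writer
//!   (whose lease was poached because it expired) can be detected during
//!   reclaim by reading the current lease and comparing.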
30//! - `term` is the replication term the holder acquired under (issue
31//!   #835, ADR 0030). A contender on a term *behind* the published
32//!   lease's term is a deposed primary and is fenced
33//!   (`LeaseError::Fenced`) — even if the lease has expired and would
34//!   otherwise be poachable. Legacy lease objects without a `term`
35//!   field decode as `DEFAULT_REPLICATION_TERM`.
36//! - `expires_at_ms` is wall-clock millis since UNIX epoch. A holder
37//!   refreshes it periodically; a contender treats anything past it as
38//!   poachable.
39//!
40//! ## Atomicity contract
41//!
//! Lease mutation requires backend-side compare-and-swap: object version
//! tokens plus conditional writes/deletes (`IfAbsent` / `IfVersion`).
//! `LeaseStore` only accepts an `AtomicRemoteBackend`, so a backend that
//! cannot enforce these preconditions cannot be wired into a lease store
//! in the first place. The guarantee is checked at compile time rather
//! than by a runtime capability flag. This keeps serverless fencing out of
//! "last writer wins" territory.
48
49use std::sync::atomic::{AtomicU64, Ordering};
50use std::sync::Arc;
51
52use crate::serde_json::{self, Value as JsonValue};
53use crate::storage::backend::{
54    AtomicRemoteBackend, BackendError, BackendObjectVersion, ConditionalDelete, ConditionalPut,
55};
56use serde_json::Map;
57
58/// Per-process monotonic counter that disambiguates lease temp files
59/// when multiple threads sample `now_unix_nanos()` within the same
60/// nanosecond (observed on virtualised CI runners with coarse
61/// clocks). Without this, two writers could share the same temp path,
62/// and the CAS holder would publish the *other* writer's bytes,
63/// producing a spurious `LostRace` for the apparent winner.
64static LEASE_TEMP_COUNTER: AtomicU64 = AtomicU64::new(0);
65
66fn lease_temp_path(kind: &str) -> std::path::PathBuf {
67    let unique = LEASE_TEMP_COUNTER.fetch_add(1, Ordering::Relaxed);
68    std::env::temp_dir().join(format!(
69        "reddb-lease-{kind}-{}-{}-{unique}.json",
70        std::process::id(),
71        crate::utils::now_unix_nanos()
72    ))
73}
74
75/// One snapshot of who owns the writer lease for a database key.
76#[derive(Debug, Clone, PartialEq, Eq)]
77pub struct WriterLease {
78    pub database_key: String,
79    pub holder_id: String,
80    /// Replication term the holder acquired the lease under (issue #835,
81    /// ADR 0030). Tying the lease to the term is what makes a deposed
82    /// primary fail closed: once a new primary acquires the lease at a
83    /// higher term, the old holder's term is behind and every term-gated
84    /// op it attempts is fenced. Defaults to
85    /// [`DEFAULT_REPLICATION_TERM`](crate::replication::DEFAULT_REPLICATION_TERM)
86    /// when read from a legacy (pre-#835) lease object that did not carry
87    /// a term, so older lease files stay readable.
88    pub term: u64,
89    pub generation: u64,
90    pub acquired_at_ms: u64,
91    pub expires_at_ms: u64,
92}
93
94impl WriterLease {
95    pub fn is_expired(&self, now_ms: u64) -> bool {
96        self.expires_at_ms <= now_ms
97    }
98
99    /// Is this lease fenced by `current_term`? A holder whose lease was
100    /// stamped under a term *behind* the cluster's current term is a
101    /// stale writer from a superseded timeline (issue #835) — it must
102    /// fail closed rather than keep mutating the new timeline.
103    pub fn fenced_by_term(&self, current_term: u64) -> bool {
104        self.term < current_term
105    }
106
107    /// The monotonic fencing token `(term, generation)`. Both components
108    /// advance forward across a legitimate handover (a new primary wins a
109    /// higher term *and* takes a fresh lease generation), so a stale
110    /// holder is ordered strictly behind on both axes (ADR 0030).
111    pub fn fencing_token(&self) -> (u64, u64) {
112        (self.term, self.generation)
113    }
114
115    fn to_json(&self) -> JsonValue {
116        let mut object = Map::new();
117        object.insert(
118            "database_key".to_string(),
119            JsonValue::String(self.database_key.clone()),
120        );
121        object.insert(
122            "holder_id".to_string(),
123            JsonValue::String(self.holder_id.clone()),
124        );
125        object.insert("term".to_string(), JsonValue::Number(self.term as f64));
126        object.insert(
127            "generation".to_string(),
128            JsonValue::Number(self.generation as f64),
129        );
130        object.insert(
131            "acquired_at_ms".to_string(),
132            JsonValue::Number(self.acquired_at_ms as f64),
133        );
134        object.insert(
135            "expires_at_ms".to_string(),
136            JsonValue::Number(self.expires_at_ms as f64),
137        );
138        JsonValue::Object(object)
139    }
140
141    fn from_json(value: &JsonValue) -> Result<Self, LeaseError> {
142        let obj = value
143            .as_object()
144            .ok_or_else(|| LeaseError::InvalidFormat("lease json is not an object".into()))?;
145        Ok(Self {
146            database_key: obj
147                .get("database_key")
148                .and_then(JsonValue::as_str)
149                .ok_or_else(|| LeaseError::InvalidFormat("missing database_key".into()))?
150                .to_string(),
151            holder_id: obj
152                .get("holder_id")
153                .and_then(JsonValue::as_str)
154                .ok_or_else(|| LeaseError::InvalidFormat("missing holder_id".into()))?
155                .to_string(),
156            // Legacy lease objects (pre-#835) carry no term — default to the
157            // base replication term so they stay readable and act as "never
158            // fenced" until a termed primary re-stamps them.
159            term: obj
160                .get("term")
161                .and_then(JsonValue::as_u64)
162                .unwrap_or(crate::replication::DEFAULT_REPLICATION_TERM),
163            generation: obj
164                .get("generation")
165                .and_then(JsonValue::as_u64)
166                .ok_or_else(|| LeaseError::InvalidFormat("missing generation".into()))?,
167            acquired_at_ms: obj
168                .get("acquired_at_ms")
169                .and_then(JsonValue::as_u64)
170                .ok_or_else(|| LeaseError::InvalidFormat("missing acquired_at_ms".into()))?,
171            expires_at_ms: obj
172                .get("expires_at_ms")
173                .and_then(JsonValue::as_u64)
174                .ok_or_else(|| LeaseError::InvalidFormat("missing expires_at_ms".into()))?,
175        })
176    }
177}
178
179#[derive(Debug)]
180pub enum LeaseError {
181    Backend(BackendError),
182    /// A different holder owns a non-expired lease.
183    Held {
184        current: WriterLease,
185        now_ms: u64,
186    },
187    /// We uploaded a fresh lease but a re-read shows a different holder
188    /// or generation, so we lost a concurrent acquire race.
189    LostRace {
190        attempted_holder: String,
191        observed: WriterLease,
192    },
193    InvalidFormat(String),
194    /// The release/refresh target no longer matches what's on the
195    /// backend (lease was already poached or removed).
196    Stale {
197        attempted_holder: String,
198        attempted_generation: u64,
199        observed: Option<WriterLease>,
200    },
201    /// A holder on a term *behind* the current term tried to take or keep
202    /// the lease (issue #835). The deposed primary is fenced: a newer term
203    /// already owns the timeline, so the stale holder fails closed rather
204    /// than mutate it.
205    Fenced {
206        attempted_holder: String,
207        attempted_term: u64,
208        current_term: u64,
209    },
210}
211
212impl std::fmt::Display for LeaseError {
213    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
214        match self {
215            Self::Backend(err) => write!(f, "lease backend error: {err}"),
216            Self::Held { current, now_ms } => {
217                write!(
218                    f,
219                    "lease for '{}' held by '{}' (gen {}, expires in {} ms)",
220                    current.database_key,
221                    current.holder_id,
222                    current.generation,
223                    current.expires_at_ms.saturating_sub(*now_ms)
224                )
225            }
226            Self::LostRace {
227                attempted_holder,
228                observed,
229            } => write!(
230                f,
231                "lost lease acquire race: '{}' tried to take '{}' but '{}' (gen {}) won",
232                attempted_holder, observed.database_key, observed.holder_id, observed.generation
233            ),
234            Self::InvalidFormat(msg) => write!(f, "invalid lease format: {msg}"),
235            Self::Stale {
236                attempted_holder,
237                attempted_generation,
238                observed,
239            } => match observed {
240                Some(o) => write!(
241                    f,
242                    "stale lease op: '{}' (gen {}) tried to act, but current is '{}' (gen {})",
243                    attempted_holder, attempted_generation, o.holder_id, o.generation
244                ),
245                None => write!(
246                    f,
247                    "stale lease op: '{}' (gen {}) tried to act, but no lease present",
248                    attempted_holder, attempted_generation
249                ),
250            },
251            Self::Fenced {
252                attempted_holder,
253                attempted_term,
254                current_term,
255            } => write!(
256                f,
257                "fenced lease op: '{attempted_holder}' on stale term {attempted_term} \
258                 is behind current term {current_term}"
259            ),
260        }
261    }
262}
263
264impl std::error::Error for LeaseError {}
265
266impl From<BackendError> for LeaseError {
267    fn from(value: BackendError) -> Self {
268        Self::Backend(value)
269    }
270}
271
272struct VersionedLease {
273    lease: WriterLease,
274    version: BackendObjectVersion,
275}
276
277/// Wraps an `AtomicRemoteBackend` with lease primitives. The lease
278/// object is stored under a deterministic key derived from
279/// `database_key`; the store reads/writes that one key.
280///
281/// The trait bound `AtomicRemoteBackend` is the type-system version
282/// of "this backend can enforce CAS" — backends that cannot
283/// (Turso, D1, plain HTTP without ETag) deliberately do not
284/// implement the trait, so wiring them into a `LeaseStore` becomes
285/// a compile error rather than a runtime fail-closed.
286pub struct LeaseStore {
287    backend: Arc<dyn AtomicRemoteBackend>,
288    prefix: String,
289}
290
291impl LeaseStore {
292    pub fn new(backend: Arc<dyn AtomicRemoteBackend>) -> Self {
293        Self {
294            backend,
295            prefix: "leases/".to_string(),
296        }
297    }
298
299    pub fn with_prefix(mut self, prefix: impl Into<String>) -> Self {
300        let p = prefix.into();
301        self.prefix = if p.ends_with('/') { p } else { format!("{p}/") };
302        self
303    }
304
305    fn key_for(&self, database_key: &str) -> String {
306        format!("{}{}.lease.json", self.prefix, database_key)
307    }
308
309    /// Read whatever lease object is currently published. `None` means
310    /// no lease has ever been written for this key.
311    pub fn current(&self, database_key: &str) -> Result<Option<WriterLease>, LeaseError> {
312        self.read_lease(database_key)
313    }
314
315    fn read_lease(&self, database_key: &str) -> Result<Option<WriterLease>, LeaseError> {
316        let key = self.key_for(database_key);
317        let temp = lease_temp_path("read");
318        let downloaded = self.backend.download(&key, &temp)?;
319        if !downloaded {
320            return Ok(None);
321        }
322        let bytes = std::fs::read(&temp)
323            .map_err(|err| LeaseError::Backend(BackendError::Transport(err.to_string())))?;
324        let _ = std::fs::remove_file(&temp);
325        let json: JsonValue = serde_json::from_slice(&bytes)
326            .map_err(|err| LeaseError::InvalidFormat(format!("lease json parse: {err}")))?;
327        WriterLease::from_json(&json).map(Some)
328    }
329
330    fn current_versioned(&self, database_key: &str) -> Result<Option<VersionedLease>, LeaseError> {
331        let key = self.key_for(database_key);
332        let before = match self.backend.object_version(&key)? {
333            Some(version) => version,
334            None => return Ok(None),
335        };
336        let temp = lease_temp_path("read");
337        let downloaded = self.backend.download(&key, &temp)?;
338        if !downloaded {
339            return Ok(None);
340        }
341        let after = self.backend.object_version(&key)?;
342        if after.as_ref() != Some(&before) {
343            let _ = std::fs::remove_file(&temp);
344            return Err(LeaseError::Backend(BackendError::PreconditionFailed(
345                "lease object changed while being read".to_string(),
346            )));
347        }
348        let bytes = std::fs::read(&temp)
349            .map_err(|err| LeaseError::Backend(BackendError::Transport(err.to_string())))?;
350        let _ = std::fs::remove_file(&temp);
351        let json: JsonValue = serde_json::from_slice(&bytes)
352            .map_err(|err| LeaseError::InvalidFormat(format!("lease json parse: {err}")))?;
353        Ok(Some(VersionedLease {
354            lease: WriterLease::from_json(&json)?,
355            version: before,
356        }))
357    }
358
359    /// Try to acquire the lease for `database_key` on behalf of
360    /// `holder_id`, valid for `ttl_ms`. Returns the `WriterLease` we
361    /// own on success. Errors:
362    /// - `LeaseError::Held` if a different holder owns a non-expired
363    ///   lease.
364    /// - `LeaseError::LostRace` if a concurrent contender beat us.
365    ///
366    /// Acquires under the base replication term; use
367    /// [`LeaseStore::try_acquire_for_term`] to stamp a specific term and
368    /// fence stale-term contenders (issue #835).
369    pub fn try_acquire(
370        &self,
371        database_key: &str,
372        holder_id: &str,
373        ttl_ms: u64,
374    ) -> Result<WriterLease, LeaseError> {
375        self.try_acquire_for_term(
376            database_key,
377            holder_id,
378            ttl_ms,
379            crate::replication::DEFAULT_REPLICATION_TERM,
380        )
381    }
382
383    /// Like [`LeaseStore::try_acquire`] but stamps `term` onto the lease
384    /// and **fences** any contender whose `term` is behind the published
385    /// lease's term (issue #835, ADR 0030).
386    ///
387    /// The term tie is what makes a deposed primary fail closed: a new
388    /// primary that won a higher term takes the lease under that term, so
389    /// a returning ex-primary on the old term sees a published lease whose
390    /// term is *ahead* of its own and is refused with `LeaseError::Fenced`
391    /// — even when the lease has since expired and would otherwise be
392    /// poachable. A stale holder can never re-take the writer slot until
393    /// it adopts the new term.
394    pub fn try_acquire_for_term(
395        &self,
396        database_key: &str,
397        holder_id: &str,
398        ttl_ms: u64,
399        term: u64,
400    ) -> Result<WriterLease, LeaseError> {
401        let now_ms = crate::utils::now_unix_millis();
402
403        let current = self.current_versioned(database_key)?;
404        // Term fence first — a contender behind the published term is a
405        // deposed writer and fails closed regardless of expiry or holder.
406        if let Some(c) = &current {
407            if term < c.lease.term {
408                return Err(LeaseError::Fenced {
409                    attempted_holder: holder_id.to_string(),
410                    attempted_term: term,
411                    current_term: c.lease.term,
412                });
413            }
414        }
415        // If a healthy lease exists held by someone else, refuse
416        // immediately. Two cases collapse: either the current holder
417        // is us (refresh) or it's somebody else with time left.
418        let next_generation = match &current {
419            Some(c) if !c.lease.is_expired(now_ms) && c.lease.holder_id != holder_id => {
420                return Err(LeaseError::Held {
421                    current: c.lease.clone(),
422                    now_ms,
423                });
424            }
425            Some(c) => c.lease.generation.saturating_add(1),
426            None => 1,
427        };
428
429        let new_lease = WriterLease {
430            database_key: database_key.to_string(),
431            holder_id: holder_id.to_string(),
432            term,
433            generation: next_generation,
434            acquired_at_ms: now_ms,
435            expires_at_ms: now_ms.saturating_add(ttl_ms),
436        };
437        let condition = match current {
438            Some(c) => ConditionalPut::IfVersion(c.version),
439            None => ConditionalPut::IfAbsent,
440        };
441        if let Err(err) = self.publish_conditional(&new_lease, condition) {
442            if matches!(
443                err,
444                LeaseError::Backend(BackendError::PreconditionFailed(_))
445            ) {
446                return self.acquire_race_error(database_key, holder_id, now_ms);
447            }
448            return Err(err);
449        }
450
451        // Re-read and verify nobody else won the same gap.
452        match self.current(database_key)? {
453            Some(observed)
454                if observed.holder_id == holder_id
455                    && observed.generation == new_lease.generation =>
456            {
457                Ok(new_lease)
458            }
459            Some(observed) => Err(LeaseError::LostRace {
460                attempted_holder: holder_id.to_string(),
461                observed,
462            }),
463            None => Err(LeaseError::LostRace {
464                attempted_holder: holder_id.to_string(),
465                observed: WriterLease {
466                    database_key: database_key.to_string(),
467                    holder_id: "<missing>".to_string(),
468                    term: 0,
469                    generation: 0,
470                    acquired_at_ms: 0,
471                    expires_at_ms: 0,
472                },
473            }),
474        }
475    }
476
477    fn acquire_race_error(
478        &self,
479        database_key: &str,
480        holder_id: &str,
481        now_ms: u64,
482    ) -> Result<WriterLease, LeaseError> {
483        match self.current(database_key)? {
484            Some(observed) if !observed.is_expired(now_ms) && observed.holder_id != holder_id => {
485                Err(LeaseError::Held {
486                    current: observed,
487                    now_ms,
488                })
489            }
490            Some(observed) => Err(LeaseError::LostRace {
491                attempted_holder: holder_id.to_string(),
492                observed,
493            }),
494            None => Err(LeaseError::LostRace {
495                attempted_holder: holder_id.to_string(),
496                observed: WriterLease {
497                    database_key: database_key.to_string(),
498                    holder_id: "<missing>".to_string(),
499                    term: 0,
500                    generation: 0,
501                    acquired_at_ms: 0,
502                    expires_at_ms: 0,
503                },
504            }),
505        }
506    }
507
508    /// Refresh `lease.expires_at_ms` to `now + ttl_ms`. Fails with
509    /// `Stale` if the holder/generation no longer matches what's
510    /// currently published. The returned lease is the new
511    /// in-effect record.
512    pub fn refresh(&self, lease: &WriterLease, ttl_ms: u64) -> Result<WriterLease, LeaseError> {
513        let now_ms = crate::utils::now_unix_millis();
514        let observed = self.current_versioned(&lease.database_key)?;
515        match observed {
516            Some(o)
517                if o.lease.holder_id == lease.holder_id
518                    && o.lease.generation == lease.generation =>
519            {
520                let mut next = lease.clone();
521                next.expires_at_ms = now_ms.saturating_add(ttl_ms);
522                if let Err(err) =
523                    self.publish_conditional(&next, ConditionalPut::IfVersion(o.version))
524                {
525                    if matches!(
526                        err,
527                        LeaseError::Backend(BackendError::PreconditionFailed(_))
528                    ) {
529                        return Err(LeaseError::Stale {
530                            attempted_holder: lease.holder_id.clone(),
531                            attempted_generation: lease.generation,
532                            observed: self.current(&lease.database_key)?,
533                        });
534                    }
535                    return Err(err);
536                }
537                Ok(next)
538            }
539            other => Err(LeaseError::Stale {
540                attempted_holder: lease.holder_id.clone(),
541                attempted_generation: lease.generation,
542                observed: other.map(|v| v.lease),
543            }),
544        }
545    }
546
547    /// Refresh the lease, but **fail closed** if the holder's term has
548    /// fallen behind `current_term` (issue #835). This is the keep-alive
549    /// counterpart to the acquire fence: a primary that was deposed while
550    /// holding a live lease cannot keep extending it once the cluster has
551    /// moved to a newer term — its next refresh is fenced before it ever
552    /// touches the backend, so it stops mutating and drains.
553    ///
554    /// When the holder's own term still matches or leads `current_term`,
555    /// this is exactly [`LeaseStore::refresh`].
556    pub fn refresh_for_term(
557        &self,
558        lease: &WriterLease,
559        ttl_ms: u64,
560        current_term: u64,
561    ) -> Result<WriterLease, LeaseError> {
562        if lease.fenced_by_term(current_term) {
563            return Err(LeaseError::Fenced {
564                attempted_holder: lease.holder_id.clone(),
565                attempted_term: lease.term,
566                current_term,
567            });
568        }
569        self.refresh(lease, ttl_ms)
570    }
571
572    /// Release the lease. Only succeeds when the published lease
573    /// matches `lease.holder_id + lease.generation`. A stolen or
574    /// already-replaced lease returns `Stale`.
575    pub fn release(&self, lease: &WriterLease) -> Result<(), LeaseError> {
576        let observed = self.current_versioned(&lease.database_key)?;
577        match observed {
578            Some(o)
579                if o.lease.holder_id == lease.holder_id
580                    && o.lease.generation == lease.generation =>
581            {
582                let key = self.key_for(&lease.database_key);
583                if let Err(err) = self
584                    .backend
585                    .delete_conditional(&key, ConditionalDelete::IfVersion(o.version))
586                {
587                    if matches!(err, BackendError::PreconditionFailed(_)) {
588                        return Err(LeaseError::Stale {
589                            attempted_holder: lease.holder_id.clone(),
590                            attempted_generation: lease.generation,
591                            observed: self.current(&lease.database_key)?,
592                        });
593                    }
594                    return Err(err.into());
595                }
596                Ok(())
597            }
598            other => Err(LeaseError::Stale {
599                attempted_holder: lease.holder_id.clone(),
600                attempted_generation: lease.generation,
601                observed: other.map(|v| v.lease),
602            }),
603        }
604    }
605
606    fn publish_conditional(
607        &self,
608        lease: &WriterLease,
609        condition: ConditionalPut,
610    ) -> Result<BackendObjectVersion, LeaseError> {
611        let key = self.key_for(&lease.database_key);
612        let json = lease.to_json();
613        let bytes = serde_json::to_vec(&json).map_err(|err| {
614            LeaseError::Backend(BackendError::Internal(format!("serialize lease: {err}")))
615        })?;
616        let temp = lease_temp_path("write");
617        std::fs::write(&temp, &bytes)
618            .map_err(|err| LeaseError::Backend(BackendError::Transport(err.to_string())))?;
619        let res = self.backend.upload_conditional(&temp, &key, condition);
620        let _ = std::fs::remove_file(&temp);
621        Ok(res?)
622    }
623}
624
625#[cfg(test)]
626mod tests {
627    use super::*;
628    use crate::storage::backend::LocalBackend;
629    use std::path::Path;
630
631    fn store() -> LeaseStore {
632        LeaseStore::new(Arc::new(LocalBackend)).with_prefix(format!(
633            "{}/leases-test-{}",
634            std::env::temp_dir().to_string_lossy(),
635            std::time::SystemTime::now()
636                .duration_since(std::time::UNIX_EPOCH)
637                .unwrap()
638                .as_nanos()
639        ))
640    }
641
642    #[test]
643    fn first_acquire_assigns_generation_one() {
644        let s = store();
645        let lease = s.try_acquire("db", "writer-a", 60_000).unwrap();
646        assert_eq!(lease.generation, 1);
647        assert_eq!(lease.holder_id, "writer-a");
648    }
649
650    #[test]
651    fn second_holder_rejected_while_first_alive() {
652        let s = store();
653        let _ = s.try_acquire("db", "writer-a", 60_000).unwrap();
654        let err = s.try_acquire("db", "writer-b", 60_000).unwrap_err();
655        match err {
656            LeaseError::Held { current, .. } => {
657                assert_eq!(current.holder_id, "writer-a");
658                assert_eq!(current.generation, 1);
659            }
660            other => panic!("expected Held, got {other:?}"),
661        }
662    }
663
664    #[test]
665    fn expired_lease_is_poachable() {
666        let s = store();
667        let _ = s.try_acquire("db", "writer-a", 1).unwrap();
668        std::thread::sleep(std::time::Duration::from_millis(10));
669        let lease = s.try_acquire("db", "writer-b", 60_000).unwrap();
670        assert_eq!(lease.holder_id, "writer-b");
671        assert_eq!(
672            lease.generation, 2,
673            "generation must increment when poaching"
674        );
675    }
676
677    #[test]
678    fn release_clears_so_anyone_can_take_again() {
679        let s = store();
680        let lease = s.try_acquire("db", "writer-a", 60_000).unwrap();
681        s.release(&lease).unwrap();
682        // After release the slot is empty — generation resets to 1
683        // because the previous record is gone.
684        let next = s.try_acquire("db", "writer-b", 60_000).unwrap();
685        assert_eq!(next.holder_id, "writer-b");
686        assert_eq!(next.generation, 1);
687    }
688
689    #[test]
690    fn refresh_extends_expiration_for_same_holder() {
691        let s = store();
692        let lease = s.try_acquire("db", "writer-a", 1_000).unwrap();
693        std::thread::sleep(std::time::Duration::from_millis(20));
694        let refreshed = s.refresh(&lease, 60_000).unwrap();
695        assert_eq!(refreshed.generation, lease.generation);
696        assert!(refreshed.expires_at_ms > lease.expires_at_ms);
697    }
698
699    #[test]
700    fn refresh_fails_when_someone_else_owns() {
701        let s = store();
702        let lease = s.try_acquire("db", "writer-a", 1).unwrap();
703        std::thread::sleep(std::time::Duration::from_millis(10));
704        let _ = s.try_acquire("db", "writer-b", 60_000).unwrap();
705        let err = s.refresh(&lease, 60_000).unwrap_err();
706        assert!(matches!(err, LeaseError::Stale { .. }));
707    }
708
709    #[test]
710    fn acquire_stamps_term_onto_lease() {
711        let s = store();
712        let lease = s.try_acquire_for_term("db", "writer-a", 60_000, 7).unwrap();
713        assert_eq!(lease.term, 7);
714        assert_eq!(lease.fencing_token(), (7, 1));
715    }
716
717    #[test]
718    fn legacy_lease_defaults_to_base_term() {
719        // A lease acquired through the term-agnostic API carries the base
720        // replication term, so it is never fenced until a termed primary
721        // re-stamps it.
722        let s = store();
723        let lease = s.try_acquire("db", "writer-a", 60_000).unwrap();
724        assert_eq!(lease.term, crate::replication::DEFAULT_REPLICATION_TERM);
725        assert!(!lease.fenced_by_term(crate::replication::DEFAULT_REPLICATION_TERM));
726    }
727
728    #[test]
729    fn stale_term_contender_is_fenced_even_when_lease_expired() {
730        // New primary holds the lease at term 5. The lease then expires,
731        // but a returning ex-primary on the old term 4 still cannot poach
732        // it — the term fence refuses before expiry is even consulted.
733        let s = store();
734        let _new_primary = s.try_acquire_for_term("db", "new-primary", 1, 5).unwrap();
735        std::thread::sleep(std::time::Duration::from_millis(10));
736        let err = s
737            .try_acquire_for_term("db", "ex-primary", 60_000, 4)
738            .unwrap_err();
739        match err {
740            LeaseError::Fenced {
741                attempted_term,
742                current_term,
743                ..
744            } => {
745                assert_eq!(attempted_term, 4);
746                assert_eq!(current_term, 5);
747            }
748            other => panic!("expected Fenced, got {other:?}"),
749        }
750    }
751
752    #[test]
753    fn same_or_higher_term_contender_may_poach_expired_lease() {
754        // The fence only bites a *behind* term. A contender at the same or
755        // a higher term takes an expired lease normally, and the generation
756        // advances with the handover.
757        let s = store();
758        let _ = s.try_acquire_for_term("db", "old", 1, 5).unwrap();
759        std::thread::sleep(std::time::Duration::from_millis(10));
760        let lease = s.try_acquire_for_term("db", "new", 60_000, 6).unwrap();
761        assert_eq!(lease.holder_id, "new");
762        assert_eq!(lease.term, 6);
763        assert_eq!(lease.generation, 2, "poaching advances the generation");
764    }
765
766    #[test]
767    fn refresh_for_term_fails_closed_once_term_advances() {
768        // A primary holds a live lease at term 4, then the cluster moves to
769        // term 5 underneath it. Its keep-alive refresh is fenced before it
770        // touches the backend — the deposed holder stops mutating.
771        let s = store();
772        let lease = s.try_acquire_for_term("db", "deposed", 60_000, 4).unwrap();
773        let err = s.refresh_for_term(&lease, 60_000, 5).unwrap_err();
774        match err {
775            LeaseError::Fenced {
776                attempted_holder,
777                attempted_term,
778                current_term,
779            } => {
780                assert_eq!(attempted_holder, "deposed");
781                assert_eq!(attempted_term, 4);
782                assert_eq!(current_term, 5);
783            }
784            other => panic!("expected Fenced, got {other:?}"),
785        }
786    }
787
788    #[test]
789    fn refresh_for_term_succeeds_while_term_holds() {
790        let s = store();
791        let lease = s.try_acquire_for_term("db", "primary", 1_000, 5).unwrap();
792        std::thread::sleep(std::time::Duration::from_millis(20));
793        let refreshed = s.refresh_for_term(&lease, 60_000, 5).unwrap();
794        assert_eq!(refreshed.term, 5);
795        assert!(refreshed.expires_at_ms > lease.expires_at_ms);
796    }
797
798    // The legacy `acquire_fails_closed_without_backend_conditional_writes`
799    // test was deleted with the trait split: `LeaseStore::new` now requires
800    // `Arc<dyn AtomicRemoteBackend>`, so a non-CAS backend cannot even be
801    // wired into the constructor — the test is enforced at compile time
802    // (see tests/lease_atomic_http_opt_in.rs for the runtime-config branch).
803}