Skip to main content

reddb_server/replication/
lease.rs

1//! Serverless writer lease (PLAN.md Phase 5 / W6).
2//!
3//! Multiple RedDB instances can attach to the same remote-backed
4//! database key. To prevent two of them from concurrently mutating the
5//! same remote artifacts (snapshots, WAL segments, head manifest),
6//! exactly one of them must hold a *writer lease*. Other instances may
7//! still attach as read-only replicas without acquiring a lease.
8//!
9//! ## Artifact format
10//!
11//! The lease is serialized as a serverless writer-lease artifact owned by
12//! `reddb-file`. This module only computes policy and performs backend
13//! compare-and-swap around that artifact.
14//!
//! - `generation` advances on every successful acquire over an existing
//!   lease record — a poach by a different holder or a re-acquire by the
//!   same holder — and restarts at 1 once the record has been released.
//!   The holder stamps its uploads with the generation so a stale writer
//!   (whose lease was poached because it expired) can be detected during
//!   reclaim by reading the current lease and comparing.
20//! - `term` is the replication term the holder acquired under (issue
21//!   #835, ADR 0030). A contender on a term *behind* the published
22//!   lease's term is a deposed primary and is fenced
23//!   (`LeaseError::Fenced`) — even if the lease has expired and would
24//!   otherwise be poachable. Legacy lease objects without a `term`
25//!   field decode as `DEFAULT_REPLICATION_TERM`.
26//! - `expires_at_ms` is wall-clock millis since UNIX epoch. A holder
27//!   refreshes it periodically; a contender treats anything past it as
28//!   poachable.
29//!
30//! ## Atomicity contract
31//!
32//! Lease mutation requires backend-side compare-and-swap. Backends
33//! advertise this through `RemoteBackend::supports_conditional_writes`
34//! and implement object version tokens + conditional writes/deletes.
35//! A backend that cannot enforce `IfAbsent` / `IfVersion` fails
36//! closed before the instance is allowed to write. This keeps
37//! serverless fencing out of "last writer wins" territory.
38
39use std::sync::Arc;
40
41use crate::storage::backend::{
42    AtomicRemoteBackend, BackendError, BackendObjectVersion, ConditionalDelete, ConditionalPut,
43};
44
45pub use reddb_file::ServerlessWriterLease as WriterLease;
46
47#[derive(Debug)]
48pub enum LeaseError {
49    Backend(BackendError),
50    /// A different holder owns a non-expired lease.
51    Held {
52        current: WriterLease,
53        now_ms: u64,
54    },
55    /// We uploaded a fresh lease but a re-read shows a different holder
56    /// or generation, so we lost a concurrent acquire race.
57    LostRace {
58        attempted_holder: String,
59        observed: WriterLease,
60    },
61    InvalidFormat(String),
62    /// The release/refresh target no longer matches what's on the
63    /// backend (lease was already poached or removed).
64    Stale {
65        attempted_holder: String,
66        attempted_generation: u64,
67        observed: Option<WriterLease>,
68    },
69    /// A holder on a term *behind* the current term tried to take or keep
70    /// the lease (issue #835). The deposed primary is fenced: a newer term
71    /// already owns the timeline, so the stale holder fails closed rather
72    /// than mutate it.
73    Fenced {
74        attempted_holder: String,
75        attempted_term: u64,
76        current_term: u64,
77    },
78}
79
80impl std::fmt::Display for LeaseError {
81    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
82        match self {
83            Self::Backend(err) => write!(f, "lease backend error: {err}"),
84            Self::Held { current, now_ms } => {
85                write!(
86                    f,
87                    "lease for '{}' held by '{}' (gen {}, expires in {} ms)",
88                    current.database_key,
89                    current.holder_id,
90                    current.generation,
91                    current.expires_at_ms.saturating_sub(*now_ms)
92                )
93            }
94            Self::LostRace {
95                attempted_holder,
96                observed,
97            } => write!(
98                f,
99                "lost lease acquire race: '{}' tried to take '{}' but '{}' (gen {}) won",
100                attempted_holder, observed.database_key, observed.holder_id, observed.generation
101            ),
102            Self::InvalidFormat(msg) => write!(f, "invalid lease format: {msg}"),
103            Self::Stale {
104                attempted_holder,
105                attempted_generation,
106                observed,
107            } => match observed {
108                Some(o) => write!(
109                    f,
110                    "stale lease op: '{}' (gen {}) tried to act, but current is '{}' (gen {})",
111                    attempted_holder, attempted_generation, o.holder_id, o.generation
112                ),
113                None => write!(
114                    f,
115                    "stale lease op: '{}' (gen {}) tried to act, but no lease present",
116                    attempted_holder, attempted_generation
117                ),
118            },
119            Self::Fenced {
120                attempted_holder,
121                attempted_term,
122                current_term,
123            } => write!(
124                f,
125                "fenced lease op: '{attempted_holder}' on stale term {attempted_term} \
126                 is behind current term {current_term}"
127            ),
128        }
129    }
130}
131
132impl std::error::Error for LeaseError {}
133
134impl From<BackendError> for LeaseError {
135    fn from(value: BackendError) -> Self {
136        Self::Backend(value)
137    }
138}
139
140struct VersionedLease {
141    lease: WriterLease,
142    version: BackendObjectVersion,
143}
144
145/// Wraps an `AtomicRemoteBackend` with lease primitives. The lease
146/// object is stored under a deterministic key derived from
147/// `database_key`; the store reads/writes that one key.
148///
149/// The trait bound `AtomicRemoteBackend` is the type-system version
150/// of "this backend can enforce CAS" — backends that cannot
151/// (Turso, D1, plain HTTP without ETag) deliberately do not
152/// implement the trait, so wiring them into a `LeaseStore` becomes
153/// a compile error rather than a runtime fail-closed.
154pub struct LeaseStore {
155    backend: Arc<dyn AtomicRemoteBackend>,
156    prefix: String,
157}
158
159impl LeaseStore {
160    pub fn new(backend: Arc<dyn AtomicRemoteBackend>) -> Self {
161        Self {
162            backend,
163            prefix: "leases/".to_string(),
164        }
165    }
166
167    pub fn with_prefix(mut self, prefix: impl Into<String>) -> Self {
168        let p = prefix.into();
169        self.prefix = if p.ends_with('/') { p } else { format!("{p}/") };
170        self
171    }
172
173    fn key_for(&self, database_key: &str) -> String {
174        reddb_file::serverless_writer_lease_key(&self.prefix, database_key)
175    }
176
177    /// Read whatever lease object is currently published. `None` means
178    /// no lease has ever been written for this key.
179    pub fn current(&self, database_key: &str) -> Result<Option<WriterLease>, LeaseError> {
180        self.read_lease(database_key)
181    }
182
183    fn read_lease(&self, database_key: &str) -> Result<Option<WriterLease>, LeaseError> {
184        let key = self.key_for(database_key);
185        let temp = reddb_file::ServerlessWriterLeaseTempFile::new("read");
186        let downloaded = self.backend.download(&key, temp.path())?;
187        if !downloaded {
188            return Ok(None);
189        }
190        let bytes = temp
191            .read_bytes()
192            .map_err(|err| LeaseError::Backend(BackendError::Transport(err.to_string())))?;
193        decode_writer_lease(&bytes).map(Some)
194    }
195
196    fn current_versioned(&self, database_key: &str) -> Result<Option<VersionedLease>, LeaseError> {
197        let key = self.key_for(database_key);
198        let before = match self.backend.object_version(&key)? {
199            Some(version) => version,
200            None => return Ok(None),
201        };
202        let temp = reddb_file::ServerlessWriterLeaseTempFile::new("read");
203        let downloaded = self.backend.download(&key, temp.path())?;
204        if !downloaded {
205            return Ok(None);
206        }
207        let after = self.backend.object_version(&key)?;
208        if after.as_ref() != Some(&before) {
209            return Err(LeaseError::Backend(BackendError::PreconditionFailed(
210                "lease object changed while being read".to_string(),
211            )));
212        }
213        let bytes = temp
214            .read_bytes()
215            .map_err(|err| LeaseError::Backend(BackendError::Transport(err.to_string())))?;
216        Ok(Some(VersionedLease {
217            lease: decode_writer_lease(&bytes)?,
218            version: before,
219        }))
220    }
221
222    /// Try to acquire the lease for `database_key` on behalf of
223    /// `holder_id`, valid for `ttl_ms`. Returns the `WriterLease` we
224    /// own on success. Errors:
225    /// - `LeaseError::Held` if a different holder owns a non-expired
226    ///   lease.
227    /// - `LeaseError::LostRace` if a concurrent contender beat us.
228    ///
229    /// Acquires under the base replication term; use
230    /// [`LeaseStore::try_acquire_for_term`] to stamp a specific term and
231    /// fence stale-term contenders (issue #835).
232    pub fn try_acquire(
233        &self,
234        database_key: &str,
235        holder_id: &str,
236        ttl_ms: u64,
237    ) -> Result<WriterLease, LeaseError> {
238        self.try_acquire_for_term(
239            database_key,
240            holder_id,
241            ttl_ms,
242            crate::replication::DEFAULT_REPLICATION_TERM,
243        )
244    }
245
246    /// Like [`LeaseStore::try_acquire`] but stamps `term` onto the lease
247    /// and **fences** any contender whose `term` is behind the published
248    /// lease's term (issue #835, ADR 0030).
249    ///
250    /// The term tie is what makes a deposed primary fail closed: a new
251    /// primary that won a higher term takes the lease under that term, so
252    /// a returning ex-primary on the old term sees a published lease whose
253    /// term is *ahead* of its own and is refused with `LeaseError::Fenced`
254    /// — even when the lease has since expired and would otherwise be
255    /// poachable. A stale holder can never re-take the writer slot until
256    /// it adopts the new term.
257    pub fn try_acquire_for_term(
258        &self,
259        database_key: &str,
260        holder_id: &str,
261        ttl_ms: u64,
262        term: u64,
263    ) -> Result<WriterLease, LeaseError> {
264        let now_ms = crate::utils::now_unix_millis();
265
266        let current = self.current_versioned(database_key)?;
267        // Term fence first — a contender behind the published term is a
268        // deposed writer and fails closed regardless of expiry or holder.
269        if let Some(c) = &current {
270            if term < c.lease.term {
271                return Err(LeaseError::Fenced {
272                    attempted_holder: holder_id.to_string(),
273                    attempted_term: term,
274                    current_term: c.lease.term,
275                });
276            }
277        }
278        // If a healthy lease exists held by someone else, refuse
279        // immediately. Two cases collapse: either the current holder
280        // is us (refresh) or it's somebody else with time left.
281        let next_generation = match &current {
282            Some(c) if !c.lease.is_expired(now_ms) && c.lease.holder_id != holder_id => {
283                return Err(LeaseError::Held {
284                    current: c.lease.clone(),
285                    now_ms,
286                });
287            }
288            Some(c) => c.lease.generation.saturating_add(1),
289            None => 1,
290        };
291
292        let new_lease = WriterLease {
293            database_key: database_key.to_string(),
294            holder_id: holder_id.to_string(),
295            term,
296            generation: next_generation,
297            acquired_at_ms: now_ms,
298            expires_at_ms: now_ms.saturating_add(ttl_ms),
299        };
300        let condition = match current {
301            Some(c) => ConditionalPut::IfVersion(c.version),
302            None => ConditionalPut::IfAbsent,
303        };
304        if let Err(err) = self.publish_conditional(&new_lease, condition) {
305            if matches!(
306                err,
307                LeaseError::Backend(BackendError::PreconditionFailed(_))
308            ) {
309                return self.acquire_race_error(database_key, holder_id, now_ms);
310            }
311            return Err(err);
312        }
313
314        // Re-read and verify nobody else won the same gap.
315        match self.current(database_key)? {
316            Some(observed)
317                if observed.holder_id == holder_id
318                    && observed.generation == new_lease.generation =>
319            {
320                Ok(new_lease)
321            }
322            Some(observed) => Err(LeaseError::LostRace {
323                attempted_holder: holder_id.to_string(),
324                observed,
325            }),
326            None => Err(LeaseError::LostRace {
327                attempted_holder: holder_id.to_string(),
328                observed: WriterLease {
329                    database_key: database_key.to_string(),
330                    holder_id: "<missing>".to_string(),
331                    term: 0,
332                    generation: 0,
333                    acquired_at_ms: 0,
334                    expires_at_ms: 0,
335                },
336            }),
337        }
338    }
339
340    fn acquire_race_error(
341        &self,
342        database_key: &str,
343        holder_id: &str,
344        now_ms: u64,
345    ) -> Result<WriterLease, LeaseError> {
346        match self.current(database_key)? {
347            Some(observed) if !observed.is_expired(now_ms) && observed.holder_id != holder_id => {
348                Err(LeaseError::Held {
349                    current: observed,
350                    now_ms,
351                })
352            }
353            Some(observed) => Err(LeaseError::LostRace {
354                attempted_holder: holder_id.to_string(),
355                observed,
356            }),
357            None => Err(LeaseError::LostRace {
358                attempted_holder: holder_id.to_string(),
359                observed: WriterLease {
360                    database_key: database_key.to_string(),
361                    holder_id: "<missing>".to_string(),
362                    term: 0,
363                    generation: 0,
364                    acquired_at_ms: 0,
365                    expires_at_ms: 0,
366                },
367            }),
368        }
369    }
370
371    /// Refresh `lease.expires_at_ms` to `now + ttl_ms`. Fails with
372    /// `Stale` if the holder/generation no longer matches what's
373    /// currently published. The returned lease is the new
374    /// in-effect record.
375    pub fn refresh(&self, lease: &WriterLease, ttl_ms: u64) -> Result<WriterLease, LeaseError> {
376        let now_ms = crate::utils::now_unix_millis();
377        let observed = self.current_versioned(&lease.database_key)?;
378        match observed {
379            Some(o)
380                if o.lease.holder_id == lease.holder_id
381                    && o.lease.generation == lease.generation =>
382            {
383                let mut next = lease.clone();
384                next.expires_at_ms = now_ms.saturating_add(ttl_ms);
385                if let Err(err) =
386                    self.publish_conditional(&next, ConditionalPut::IfVersion(o.version))
387                {
388                    if matches!(
389                        err,
390                        LeaseError::Backend(BackendError::PreconditionFailed(_))
391                    ) {
392                        return Err(LeaseError::Stale {
393                            attempted_holder: lease.holder_id.clone(),
394                            attempted_generation: lease.generation,
395                            observed: self.current(&lease.database_key)?,
396                        });
397                    }
398                    return Err(err);
399                }
400                Ok(next)
401            }
402            other => Err(LeaseError::Stale {
403                attempted_holder: lease.holder_id.clone(),
404                attempted_generation: lease.generation,
405                observed: other.map(|v| v.lease),
406            }),
407        }
408    }
409
410    /// Refresh the lease, but **fail closed** if the holder's term has
411    /// fallen behind `current_term` (issue #835). This is the keep-alive
412    /// counterpart to the acquire fence: a primary that was deposed while
413    /// holding a live lease cannot keep extending it once the cluster has
414    /// moved to a newer term — its next refresh is fenced before it ever
415    /// touches the backend, so it stops mutating and drains.
416    ///
417    /// When the holder's own term still matches or leads `current_term`,
418    /// this is exactly [`LeaseStore::refresh`].
419    pub fn refresh_for_term(
420        &self,
421        lease: &WriterLease,
422        ttl_ms: u64,
423        current_term: u64,
424    ) -> Result<WriterLease, LeaseError> {
425        if lease.fenced_by_term(current_term) {
426            return Err(LeaseError::Fenced {
427                attempted_holder: lease.holder_id.clone(),
428                attempted_term: lease.term,
429                current_term,
430            });
431        }
432        self.refresh(lease, ttl_ms)
433    }
434
435    /// Release the lease. Only succeeds when the published lease
436    /// matches `lease.holder_id + lease.generation`. A stolen or
437    /// already-replaced lease returns `Stale`.
438    pub fn release(&self, lease: &WriterLease) -> Result<(), LeaseError> {
439        let observed = self.current_versioned(&lease.database_key)?;
440        match observed {
441            Some(o)
442                if o.lease.holder_id == lease.holder_id
443                    && o.lease.generation == lease.generation =>
444            {
445                let key = self.key_for(&lease.database_key);
446                if let Err(err) = self
447                    .backend
448                    .delete_conditional(&key, ConditionalDelete::IfVersion(o.version))
449                {
450                    if matches!(err, BackendError::PreconditionFailed(_)) {
451                        return Err(LeaseError::Stale {
452                            attempted_holder: lease.holder_id.clone(),
453                            attempted_generation: lease.generation,
454                            observed: self.current(&lease.database_key)?,
455                        });
456                    }
457                    return Err(err.into());
458                }
459                Ok(())
460            }
461            other => Err(LeaseError::Stale {
462                attempted_holder: lease.holder_id.clone(),
463                attempted_generation: lease.generation,
464                observed: other.map(|v| v.lease),
465            }),
466        }
467    }
468
469    fn publish_conditional(
470        &self,
471        lease: &WriterLease,
472        condition: ConditionalPut,
473    ) -> Result<BackendObjectVersion, LeaseError> {
474        let key = self.key_for(&lease.database_key);
475        let bytes = reddb_file::encode_serverless_writer_lease_json(lease)
476            .map_err(|err| LeaseError::Backend(BackendError::Internal(err.to_string())))?;
477        let temp = reddb_file::ServerlessWriterLeaseTempFile::new("write");
478        temp.write_bytes(&bytes)
479            .map_err(|err| LeaseError::Backend(BackendError::Transport(err.to_string())))?;
480        Ok(self
481            .backend
482            .upload_conditional(temp.path(), &key, condition)?)
483    }
484}
485
486fn decode_writer_lease(bytes: &[u8]) -> Result<WriterLease, LeaseError> {
487    reddb_file::decode_serverless_writer_lease_json(bytes)
488        .map_err(|err| LeaseError::InvalidFormat(err.to_string()))
489}
490
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::backend::LocalBackend;

    /// Builds a store over `LocalBackend` whose prefix is a directory
    /// under the system temp dir that no other call shares.
    ///
    /// The wall-clock timestamp alone is not enough on platforms with a
    /// coarse clock when tests run in parallel, so the process id and a
    /// per-process counter are part of the name as well.
    fn store() -> LeaseStore {
        static NEXT_STORE_ID: std::sync::atomic::AtomicU64 =
            std::sync::atomic::AtomicU64::new(0);

        let store_id = NEXT_STORE_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .expect("system clock is set before the UNIX epoch")
            .as_nanos();
        let dir = std::env::temp_dir().join(format!(
            "leases-test-{}-{nanos}-{store_id}",
            std::process::id()
        ));
        LeaseStore::new(Arc::new(LocalBackend)).with_prefix(dir.to_string_lossy().into_owned())
    }

    #[test]
    fn first_acquire_assigns_generation_one() {
        let s = store();
        let lease = s.try_acquire("db", "writer-a", 60_000).unwrap();
        assert_eq!(lease.generation, 1);
        assert_eq!(lease.holder_id, "writer-a");
    }

    #[test]
    fn second_holder_rejected_while_first_alive() {
        let s = store();
        let _ = s.try_acquire("db", "writer-a", 60_000).unwrap();
        let err = s.try_acquire("db", "writer-b", 60_000).unwrap_err();
        match err {
            LeaseError::Held { current, .. } => {
                assert_eq!(current.holder_id, "writer-a");
                assert_eq!(current.generation, 1);
            }
            other => panic!("expected Held, got {other:?}"),
        }
    }

    #[test]
    fn expired_lease_is_poachable() {
        let s = store();
        // 1 ms TTL, then wait long enough for it to lapse.
        let _ = s.try_acquire("db", "writer-a", 1).unwrap();
        std::thread::sleep(std::time::Duration::from_millis(10));
        let lease = s.try_acquire("db", "writer-b", 60_000).unwrap();
        assert_eq!(lease.holder_id, "writer-b");
        assert_eq!(
            lease.generation, 2,
            "generation must increment when poaching"
        );
    }

    #[test]
    fn release_clears_so_anyone_can_take_again() {
        let s = store();
        let lease = s.try_acquire("db", "writer-a", 60_000).unwrap();
        s.release(&lease).unwrap();
        // After release the slot is empty — generation resets to 1
        // because the previous record is gone.
        let next = s.try_acquire("db", "writer-b", 60_000).unwrap();
        assert_eq!(next.holder_id, "writer-b");
        assert_eq!(next.generation, 1);
    }

    #[test]
    fn refresh_extends_expiration_for_same_holder() {
        let s = store();
        let lease = s.try_acquire("db", "writer-a", 1_000).unwrap();
        std::thread::sleep(std::time::Duration::from_millis(20));
        let refreshed = s.refresh(&lease, 60_000).unwrap();
        assert_eq!(refreshed.generation, lease.generation);
        assert!(refreshed.expires_at_ms > lease.expires_at_ms);
    }

    #[test]
    fn refresh_fails_when_someone_else_owns() {
        let s = store();
        let lease = s.try_acquire("db", "writer-a", 1).unwrap();
        std::thread::sleep(std::time::Duration::from_millis(10));
        let _ = s.try_acquire("db", "writer-b", 60_000).unwrap();
        let err = s.refresh(&lease, 60_000).unwrap_err();
        assert!(matches!(err, LeaseError::Stale { .. }));
    }

    #[test]
    fn acquire_stamps_term_onto_lease() {
        let s = store();
        let lease = s.try_acquire_for_term("db", "writer-a", 60_000, 7).unwrap();
        assert_eq!(lease.term, 7);
        assert_eq!(lease.fencing_token(), (7, 1));
    }

    #[test]
    fn legacy_lease_defaults_to_base_term() {
        // A lease acquired through the term-agnostic API carries the base
        // replication term, so it is never fenced until a termed primary
        // re-stamps it.
        let s = store();
        let lease = s.try_acquire("db", "writer-a", 60_000).unwrap();
        assert_eq!(lease.term, crate::replication::DEFAULT_REPLICATION_TERM);
        assert!(!lease.fenced_by_term(crate::replication::DEFAULT_REPLICATION_TERM));
    }

    #[test]
    fn stale_term_contender_is_fenced_even_when_lease_expired() {
        // New primary holds the lease at term 5. The lease then expires,
        // but a returning ex-primary on the old term 4 still cannot poach
        // it — the term fence refuses before expiry is even consulted.
        let s = store();
        let _new_primary = s.try_acquire_for_term("db", "new-primary", 1, 5).unwrap();
        std::thread::sleep(std::time::Duration::from_millis(10));
        let err = s
            .try_acquire_for_term("db", "ex-primary", 60_000, 4)
            .unwrap_err();
        match err {
            LeaseError::Fenced {
                attempted_term,
                current_term,
                ..
            } => {
                assert_eq!(attempted_term, 4);
                assert_eq!(current_term, 5);
            }
            other => panic!("expected Fenced, got {other:?}"),
        }
    }

    #[test]
    fn same_or_higher_term_contender_may_poach_expired_lease() {
        // The fence only bites a *behind* term. A contender at the same or
        // a higher term takes an expired lease normally, and the generation
        // advances with the handover.
        let s = store();
        let _ = s.try_acquire_for_term("db", "old", 1, 5).unwrap();
        std::thread::sleep(std::time::Duration::from_millis(10));
        let lease = s.try_acquire_for_term("db", "new", 60_000, 6).unwrap();
        assert_eq!(lease.holder_id, "new");
        assert_eq!(lease.term, 6);
        assert_eq!(lease.generation, 2, "poaching advances the generation");
    }

    #[test]
    fn refresh_for_term_fails_closed_once_term_advances() {
        // A primary holds a live lease at term 4, then the cluster moves to
        // term 5 underneath it. Its keep-alive refresh is fenced before it
        // touches the backend — the deposed holder stops mutating.
        let s = store();
        let lease = s.try_acquire_for_term("db", "deposed", 60_000, 4).unwrap();
        let err = s.refresh_for_term(&lease, 60_000, 5).unwrap_err();
        match err {
            LeaseError::Fenced {
                attempted_holder,
                attempted_term,
                current_term,
            } => {
                assert_eq!(attempted_holder, "deposed");
                assert_eq!(attempted_term, 4);
                assert_eq!(current_term, 5);
            }
            other => panic!("expected Fenced, got {other:?}"),
        }
    }

    #[test]
    fn refresh_for_term_succeeds_while_term_holds() {
        let s = store();
        let lease = s.try_acquire_for_term("db", "primary", 1_000, 5).unwrap();
        std::thread::sleep(std::time::Duration::from_millis(20));
        let refreshed = s.refresh_for_term(&lease, 60_000, 5).unwrap();
        assert_eq!(refreshed.term, 5);
        assert!(refreshed.expires_at_ms > lease.expires_at_ms);
    }

    // The legacy `acquire_fails_closed_without_backend_conditional_writes`
    // test was deleted with the trait split: `LeaseStore::new` now requires
    // `Arc<dyn AtomicRemoteBackend>`, so a non-CAS backend cannot even be
    // wired into the constructor — the test is enforced at compile time
    // (see tests/lease_atomic_http_opt_in.rs for the runtime-config branch).
}