// reddb_server/cluster/ownership_lease.rs

//! Ownership leases and owner self-fence behavior (issue #997, PRD #987,
//! ADR 0037).
//!
//! The [`ShardOwnershipCatalog`] (issue #989) records *who* owns a range and the
//! [ownership-transition machine](super::ownership_transition) is the only
//! sanctioned way to *move* that authority. But catalog ownership alone is not
//! enough to make a durable write safe: a node that the catalog still names as
//! owner may have been partitioned away from the Cluster Supervisor, in which
//! case the rest of the cluster may already have moved on and the node has no
//! way to learn of it. The ownership *lease* closes that gap.
//!
//! Per the glossary an **ownership lease** is *"time-bounded authority for a range
//! owner to accept durable writes, issued under the current Cluster Supervisor
//! term and ownership epoch. If the Supervisor loses majority, owners may continue
//! only until their valid lease expires."* The lease is the owner's *positive*
//! permission to write — without a currently-valid one it must stop, even if the
//! catalog still names it owner and nothing has explicitly deposed it.
//!
//! ## What a lease binds together
//!
//! An [`OwnershipLease`] ties four identities together, matching ADR 0037's
//! "expected term and ownership epoch" fencing inputs plus the range and owner the
//! authority is *for*:
//!
//! * [`SupervisorTerm`] — the Cluster Supervisor term the lease was granted under.
//!   When the Supervisor term advances (a new Supervisor leader), an old lease no
//!   longer matches and the owner self-fences.
//! * [`CollectionId`] + [`RangeId`] — the single range this authority covers. A
//!   lease is per-range, exactly like [`RangeRole`].
//! * `owner` ([`NodeIdentity`]) — the node the authority was issued to.
//! * [`OwnershipEpoch`] — the ownership epoch in force when the lease was granted.
//!   If ownership moves (the epoch bumps via a [transition](super::ownership_transition)),
//!   the lease no longer matches the catalog and the owner self-fences.
//!
//! plus a `[granted_at_ms, expires_at_ms)` validity window — the *time-bounded*
//! part. Time is passed in explicitly (`now_ms`) so the whole module stays a pure,
//! deterministic data model with no clock I/O, just like its siblings.
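/*!
The window arithmetic above can be sketched in isolation. This is a minimal,
self-contained illustration rather than this module's code: `LeaseWindow` is a
hypothetical stand-in that keeps only the two timestamps, and the caller passes
`now_ms` explicitly so no clock is read.

```rust
/// Hypothetical stand-in: only the validity window of a lease.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct LeaseWindow {
    granted_at_ms: u64,
    expires_at_ms: u64,
}

impl LeaseWindow {
    /// Open a window of `ttl_ms` starting at `granted_at_ms`. The end
    /// saturates at `u64::MAX` instead of overflowing for a huge TTL.
    pub fn grant(granted_at_ms: u64, ttl_ms: u64) -> Self {
        Self {
            granted_at_ms,
            expires_at_ms: granted_at_ms.saturating_add(ttl_ms),
        }
    }

    /// The instant the window opened.
    pub fn granted_at_ms(&self) -> u64 {
        self.granted_at_ms
    }

    /// Half-open check: `now_ms == expires_at_ms` already counts as expired.
    pub fn is_expired(&self, now_ms: u64) -> bool {
        now_ms >= self.expires_at_ms
    }

    /// Milliseconds of authority left, saturating to zero once expired.
    pub fn remaining_ms(&self, now_ms: u64) -> u64 {
        self.expires_at_ms.saturating_sub(now_ms)
    }
}
```

For a window granted at `0` with a TTL of `1_000`, `is_expired(999)` is `false`
and `is_expired(1_000)` is `true`, so authority never reaches the stated end.
*/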
//!
//! ## Self-fence and read-only mode
//!
//! [`LeasedOwner`] is the owner's local view of its own lease. It answers one
//! question — *may I take a durable write right now?* — by [`evaluate`] against the
//! current Supervisor term, the range's current ownership epoch, and the current
//! time. The answer is an [`OwnerWriteMode`]: either [`Durable`] (a valid lease
//! covers the write) or [`Fenced`] with the [`FenceReason`] that revoked authority.
//! An owner self-fences — per the glossary's **owner self-fence** — when its lease
//! *expires*, is *revoked*, or no longer matches the current *Supervisor term* or
//! *ownership epoch*; it does not wait for clients to stop routing to it.
//!
//! A fenced owner is not dead: per the glossary's **self-fenced read mode** it
//! *"may continue serving explicitly stale/read-only requests and replication
//! catch-up, while rejecting durable writes until quorum/lease authority is
//! restored."* [`admit_request`](LeasedOwner::admit_request) encodes exactly that —
//! a [`DurableWrite`](RangeRequest::DurableWrite) is rejected once fenced, while a
//! [`StaleRead`](RangeRequest::StaleRead) and
//! [`ReplicationCatchUp`](RangeRequest::ReplicationCatchUp) are still served.
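/*!
The decision order and the read-only mode described above can be sketched with
simplified stand-ins. Everything in this block is hypothetical and exists only
for illustration: terms and epochs are plain `u64`s, and `Fence`, `Request`,
`Lease` and `Owner` are reduced versions of this module's types, not the types
themselves.

```rust
/// Hypothetical fence causes, listed in the order they are checked.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Fence {
    Revoked,
    Unleased,
    TermSuperseded,
    EpochSuperseded,
    Expired,
}

/// Hypothetical request kinds.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Request {
    DurableWrite,
    StaleRead,
    ReplicationCatchUp,
}

#[derive(Debug, Clone, Copy)]
pub struct Lease {
    pub term: u64,
    pub epoch: u64,
    pub expires_at_ms: u64,
}

#[derive(Debug, Clone, Copy, Default)]
pub struct Owner {
    pub lease: Option<Lease>,
    pub revoked: bool,
}

impl Owner {
    /// `Ok(())` authorises durable writes; `Err` carries the first fence
    /// cause that holds.
    pub fn evaluate(&self, term: u64, epoch: u64, now_ms: u64) -> Result<(), Fence> {
        if self.revoked {
            return Err(Fence::Revoked);
        }
        let lease = self.lease.ok_or(Fence::Unleased)?;
        if lease.term != term {
            return Err(Fence::TermSuperseded);
        }
        if lease.epoch != epoch {
            return Err(Fence::EpochSuperseded);
        }
        if now_ms >= lease.expires_at_ms {
            return Err(Fence::Expired);
        }
        Ok(())
    }

    /// Only a durable write is refused while fenced; stale reads and
    /// replication catch-up are always admitted.
    pub fn admit(&self, req: Request, term: u64, epoch: u64, now_ms: u64) -> Result<(), Fence> {
        match (self.evaluate(term, epoch, now_ms), req) {
            (Err(cause), Request::DurableWrite) => Err(cause),
            _ => Ok(()),
        }
    }
}
```

With a lease for term `1`, epoch `1`, expiring at `1_000`, a `DurableWrite` at
`now_ms = 1_500` is refused with `Fence::Expired`, while a `StaleRead` at the
same instant is still admitted.
*/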
//!
//! ## Lease *in addition to* ownership
//!
//! [`admit_durable_write`] is the combined gate the public write path calls: it
//! first routes the key and checks catalog ownership (the [`RangeRole`] gate from
//! issue #990), then requires a valid lease on top. A node that is the catalog
//! owner but holds no current lease is rejected — "durable writes require a valid
//! current ownership lease *in addition to* matching range ownership".
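/*!
The two-layer gate can be sketched as follows. This is an illustrative
reduction, not this module's implementation: `Range`, `Lease`, `Reject` and
`admit_write` are hypothetical names, node identities are plain strings, and
key routing and the Supervisor-term check are left out to keep the layering
visible.

```rust
#[derive(Debug, PartialEq, Eq)]
pub enum Reject {
    NotOwner,
    Unleased,
    StaleEpoch,
    Expired,
}

/// Hypothetical catalog entry for one range.
pub struct Range {
    pub id: u32,
    pub owner: &'static str,
    pub epoch: u64,
}

/// Hypothetical lease: per-range and per-owner.
pub struct Lease {
    pub range_id: u32,
    pub owner: &'static str,
    pub epoch: u64,
    pub expires_at_ms: u64,
}

pub fn admit_write(
    range: &Range,
    node: &str,
    lease: Option<&Lease>,
    now_ms: u64,
) -> Result<(), Reject> {
    // Layer 1: the catalog must name this node as owner.
    if range.owner != node {
        return Err(Reject::NotOwner);
    }
    // Layer 2: a lease for another range or owner counts as no lease.
    let lease = lease
        .filter(|l| l.range_id == range.id && l.owner == node)
        .ok_or(Reject::Unleased)?;
    // The lease is checked against the catalog's current epoch and the clock.
    if lease.epoch != range.epoch {
        return Err(Reject::StaleEpoch);
    }
    if now_ms >= lease.expires_at_ms {
        return Err(Reject::Expired);
    }
    Ok(())
}
```

A node that the catalog names as owner but that holds a lease for a different
range gets `Reject::Unleased`: catalog ownership is necessary but not
sufficient.
*/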
//!
//! [`evaluate`]: LeasedOwner::evaluate
//! [`Durable`]: OwnerWriteMode::Durable
//! [`Fenced`]: OwnerWriteMode::Fenced

use super::identity::NodeIdentity;
use super::ownership::{
    CollectionId, OwnershipEpoch, RangeId, RangeOwnership, RangeRole, ShardOwnershipCatalog,
};

/// The Cluster Supervisor term an ownership lease is granted under.
///
/// A lease is authority *"issued under the current Cluster Supervisor term"*: when
/// a new Supervisor leader is elected the term advances, and a lease stamped with
/// an older term no longer matches — its holder self-fences
/// ([`FenceReason::TermSuperseded`]). This is the control-plane analogue of the
/// replication term that fences a deposed primary (ADR 0030).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct SupervisorTerm(u64);

impl SupervisorTerm {
    /// The term a freshly-bootstrapped Supervisor starts at.
    pub fn genesis() -> Self {
        Self(1)
    }

    pub fn new(value: u64) -> Self {
        Self(value)
    }

    pub fn value(self) -> u64 {
        self.0
    }

    /// The next term, as minted when a new Supervisor leader is elected.
    pub fn next(self) -> Self {
        Self(self.0 + 1)
    }
}

impl std::fmt::Display for SupervisorTerm {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}

/// Time-bounded write authority for one range owner, issued under a Supervisor
/// term and ownership epoch.
///
/// A lease is the owner's *positive* permission to take durable writes. It is
/// per-range (it names its [`CollectionId`] + [`RangeId`]), bound to the owner it
/// was issued to, and valid only on the `[granted_at_ms, expires_at_ms)` window
/// and only while the Supervisor term and ownership epoch it carries still match
/// the live cluster. The owner re-validates it on every durable write through
/// [`LeasedOwner::evaluate`]; once any binding no longer holds, the owner
/// self-fences.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OwnershipLease {
    supervisor_term: SupervisorTerm,
    collection: CollectionId,
    range_id: RangeId,
    owner: NodeIdentity,
    epoch: OwnershipEpoch,
    granted_at_ms: u64,
    expires_at_ms: u64,
}

impl OwnershipLease {
    /// Grant a lease valid for `ttl_ms` from `granted_at_ms`, under
    /// `supervisor_term` and ownership `epoch`, for `owner`'s authority over
    /// `(collection, range_id)`.
    #[allow(clippy::too_many_arguments)]
    pub fn grant(
        supervisor_term: SupervisorTerm,
        collection: CollectionId,
        range_id: RangeId,
        owner: NodeIdentity,
        epoch: OwnershipEpoch,
        granted_at_ms: u64,
        ttl_ms: u64,
    ) -> Self {
        Self {
            supervisor_term,
            collection,
            range_id,
            owner,
            epoch,
            granted_at_ms,
            expires_at_ms: granted_at_ms.saturating_add(ttl_ms),
        }
    }

    pub fn supervisor_term(&self) -> SupervisorTerm {
        self.supervisor_term
    }

    pub fn collection(&self) -> &CollectionId {
        &self.collection
    }

    pub fn range_id(&self) -> RangeId {
        self.range_id
    }

    pub fn owner(&self) -> &NodeIdentity {
        &self.owner
    }

    pub fn epoch(&self) -> OwnershipEpoch {
        self.epoch
    }

    pub fn granted_at_ms(&self) -> u64 {
        self.granted_at_ms
    }

    pub fn expires_at_ms(&self) -> u64 {
        self.expires_at_ms
    }

    /// Has the lease's validity window closed at `now_ms`? The window is
    /// half-open: the instant `now_ms == expires_at_ms` is already expired, so a
    /// lease never grants authority at or past its stated end.
    pub fn is_expired(&self, now_ms: u64) -> bool {
        now_ms >= self.expires_at_ms
    }

    /// Milliseconds of authority left at `now_ms`, saturating to zero once
    /// expired. The owner's keep-alive uses this to decide when to renew.
    pub fn remaining_ms(&self, now_ms: u64) -> u64 {
        self.expires_at_ms.saturating_sub(now_ms)
    }

    /// Does this lease cover the range `(collection, range_id)` for `owner`? A
    /// lease is per-range and per-owner, so a held lease for a *different* range
    /// or issued to a *different* owner does not authorise this one.
    fn covers(&self, collection: &CollectionId, range_id: RangeId, owner: &NodeIdentity) -> bool {
        self.collection == *collection && self.range_id == range_id && self.owner == *owner
    }
}

/// Why a range owner is self-fenced — the cause that revoked its durable-write
/// authority. Reported by [`LeasedOwner::evaluate`] and carried in every
/// durable-write rejection so an operator (or the owner's own logs) can see
/// *which* binding lapsed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FenceReason {
    /// The owner holds no lease at all (never granted one, or it was dropped).
    /// Catalog ownership without a lease is not authority to write.
    Unleased,
    /// The Supervisor explicitly revoked the lease before its window closed —
    /// e.g. ahead of a planned ownership handoff.
    Revoked,
    /// The lease was granted under an older Supervisor term than the current one:
    /// a new Supervisor leader has been elected, so the lease no longer matches.
    TermSuperseded {
        lease_term: SupervisorTerm,
        current_term: SupervisorTerm,
    },
    /// The lease's ownership epoch no longer matches the range's current epoch:
    /// ownership has moved via a transition, fencing this (now stale) owner.
    EpochSuperseded {
        lease_epoch: OwnershipEpoch,
        current_epoch: OwnershipEpoch,
    },
    /// The lease's validity window has closed at the current time. With the
    /// Supervisor partitioned away the owner cannot renew, so it stops writing
    /// the moment the lease lapses.
    Expired { now_ms: u64, expires_at_ms: u64 },
}

impl std::fmt::Display for FenceReason {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Unleased => write!(f, "owner holds no ownership lease"),
            Self::Revoked => write!(f, "ownership lease was revoked"),
            Self::TermSuperseded {
                lease_term,
                current_term,
            } => write!(
                f,
                "ownership lease granted under supervisor term {lease_term} is behind current term {current_term}"
            ),
            Self::EpochSuperseded {
                lease_epoch,
                current_epoch,
            } => write!(
                f,
                "ownership lease epoch {lease_epoch} no longer matches current ownership epoch {current_epoch}"
            ),
            Self::Expired {
                now_ms,
                expires_at_ms,
            } => write!(
                f,
                "ownership lease expired at {expires_at_ms} ms (now {now_ms} ms)"
            ),
        }
    }
}

impl std::error::Error for FenceReason {}

/// The owner's durable-write authority after evaluating its lease.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OwnerWriteMode {
    /// A valid lease covers the write — durable writes are authorised.
    Durable,
    /// The owner is self-fenced: durable writes are rejected, but
    /// [`self-fenced read mode`](LeasedOwner::admit_request) still serves stale
    /// reads and replication catch-up. Carries the [`FenceReason`].
    Fenced(FenceReason),
}

impl OwnerWriteMode {
    /// Whether durable writes are authorised in this mode.
    pub fn may_write_durable(&self) -> bool {
        matches!(self, OwnerWriteMode::Durable)
    }

    /// Whether the owner is self-fenced.
    pub fn is_fenced(&self) -> bool {
        matches!(self, OwnerWriteMode::Fenced(_))
    }
}

/// A request kind a (possibly self-fenced) range owner may be asked to serve.
///
/// The distinction drives [`self-fenced read mode`](LeasedOwner::admit_request):
/// a fenced owner rejects [`DurableWrite`](Self::DurableWrite) but still serves
/// [`StaleRead`](Self::StaleRead) and [`ReplicationCatchUp`](Self::ReplicationCatchUp).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RangeRequest {
    /// A durable mutation. Requires a currently-valid lease.
    DurableWrite,
    /// An explicitly stale / read-only request. Served even while self-fenced.
    StaleRead,
    /// Replication catch-up (a replica streaming the range's log forward).
    /// Served even while self-fenced — it is the very mechanism by which the
    /// member rejoins under a newer ownership epoch.
    ReplicationCatchUp,
}

/// Why a request was refused while the owner is self-fenced.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LeaseFenceRejection {
    /// The fence cause that was in effect when the request was refused.
    pub reason: FenceReason,
}

impl std::fmt::Display for LeaseFenceRejection {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "durable write rejected: owner is self-fenced ({})",
            self.reason
        )
    }
}

impl std::error::Error for LeaseFenceRejection {}

/// A range owner's local view of its own ownership lease — the thing that decides
/// whether it may take a durable write *right now*.
///
/// This is the home of owner self-fence behavior. It holds at most one lease (a
/// lease is per-range, so one [`LeasedOwner`] tracks one owned range) plus a
/// `revoked` flag the Supervisor can trip. [`evaluate`](Self::evaluate) folds the
/// lease, the revoke flag, the current Supervisor term, the range's current
/// ownership epoch, and the current time into an [`OwnerWriteMode`]; everything
/// else is built on that one decision.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct LeasedOwner {
    lease: Option<OwnershipLease>,
    revoked: bool,
}

impl LeasedOwner {
    /// An owner holding no lease — self-fenced until one is granted.
    pub fn unleased() -> Self {
        Self {
            lease: None,
            revoked: false,
        }
    }

    /// An owner holding `lease`.
    pub fn with_lease(lease: OwnershipLease) -> Self {
        Self {
            lease: Some(lease),
            revoked: false,
        }
    }

    /// Install a freshly-granted (or renewed) lease, clearing any prior revoke.
    /// Renewing is how the owner extends its window before the old one expires.
    pub fn grant(&mut self, lease: OwnershipLease) {
        self.lease = Some(lease);
        self.revoked = false;
    }

    /// Revoke the current lease. The owner self-fences immediately on its next
    /// [`evaluate`](Self::evaluate), without waiting for the window to close —
    /// this is the Supervisor's explicit "stop writing now" ahead of a handoff.
    pub fn revoke(&mut self) {
        self.revoked = true;
    }

    /// The lease currently held, if any. `None` only when no lease has been
    /// installed; [`revoke`](Self::revoke) sets a flag and keeps the lease, so
    /// a *held-but-invalid* lease (revoked, expired, stale term/epoch) still
    /// returns `Some` here. Validity is [`evaluate`](Self::evaluate)'s job.
    pub fn lease(&self) -> Option<&OwnershipLease> {
        self.lease.as_ref()
    }

    /// Decide the owner's durable-write authority against the current control
    /// plane. The lease must be present and un-revoked, granted under
    /// `current_term`, carry the range's `current_epoch`, and still be inside
    /// its validity window at `now_ms`. Any failure self-fences with the
    /// corresponding [`FenceReason`]. Term and epoch are compared for equality,
    /// so a lease that differs from the caller's view in either direction is
    /// fenced.
    ///
    /// Checks run fail-closed in order of authority: an explicit revoke first,
    /// then absence of a lease, then Supervisor-term supersession, then ownership
    /// epoch supersession, then time expiry. The first cause that holds is the
    /// one reported.
    pub fn evaluate(
        &self,
        current_term: SupervisorTerm,
        current_epoch: OwnershipEpoch,
        now_ms: u64,
    ) -> OwnerWriteMode {
        if self.revoked {
            return OwnerWriteMode::Fenced(FenceReason::Revoked);
        }
        let Some(lease) = &self.lease else {
            return OwnerWriteMode::Fenced(FenceReason::Unleased);
        };
        if lease.supervisor_term != current_term {
            return OwnerWriteMode::Fenced(FenceReason::TermSuperseded {
                lease_term: lease.supervisor_term,
                current_term,
            });
        }
        if lease.epoch != current_epoch {
            return OwnerWriteMode::Fenced(FenceReason::EpochSuperseded {
                lease_epoch: lease.epoch,
                current_epoch,
            });
        }
        if lease.is_expired(now_ms) {
            return OwnerWriteMode::Fenced(FenceReason::Expired {
                now_ms,
                expires_at_ms: lease.expires_at_ms,
            });
        }
        OwnerWriteMode::Durable
    }

    /// Admit (or refuse) a request in light of the owner's current mode — the
    /// encoding of **self-fenced read mode**. A [`DurableWrite`] needs a valid
    /// lease; a [`StaleRead`] and [`ReplicationCatchUp`] are served regardless,
    /// so a fenced owner keeps answering reads and catching up replicas while it
    /// rejects durable writes.
    ///
    /// [`DurableWrite`]: RangeRequest::DurableWrite
    /// [`StaleRead`]: RangeRequest::StaleRead
    /// [`ReplicationCatchUp`]: RangeRequest::ReplicationCatchUp
    pub fn admit_request(
        &self,
        request: RangeRequest,
        current_term: SupervisorTerm,
        current_epoch: OwnershipEpoch,
        now_ms: u64,
    ) -> Result<(), LeaseFenceRejection> {
        match self.evaluate(current_term, current_epoch, now_ms) {
            OwnerWriteMode::Durable => Ok(()),
            OwnerWriteMode::Fenced(reason) => match request {
                RangeRequest::StaleRead | RangeRequest::ReplicationCatchUp => Ok(()),
                RangeRequest::DurableWrite => Err(LeaseFenceRejection { reason }),
            },
        }
    }
}

/// Why a lease-gated durable write was rejected — either the catalog ownership
/// gate refused it (routing / not-owner), or the owner is self-fenced.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DurableWriteReject {
    /// No range of the collection covers the routed key — re-resolve routing.
    NoRange { collection: CollectionId },
    /// This node is not the catalog owner of the routed range (it is a replica or
    /// holds no copy). The write must be routed to `owner`.
    NotOwner {
        collection: CollectionId,
        range_id: RangeId,
        role: RangeRole,
        owner: NodeIdentity,
    },
    /// This node *is* the catalog owner, but it is self-fenced: it holds no valid
    /// lease for the range. Carries the [`FenceReason`].
    Fenced {
        collection: CollectionId,
        range_id: RangeId,
        reason: FenceReason,
    },
}

impl std::fmt::Display for DurableWriteReject {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::NoRange { collection } => write!(
                f,
                "no range of collection {collection} covers the routed key — re-resolve routing"
            ),
            Self::NotOwner {
                collection,
                range_id,
                owner,
                ..
            } => write!(
                f,
                "this node does not own {collection}/{range_id} — route the durable write to {owner}"
            ),
            Self::Fenced {
                collection,
                range_id,
                reason,
            } => write!(
                f,
                "owner of {collection}/{range_id} is self-fenced and rejects the durable write: {reason}"
            ),
        }
    }
}

impl std::error::Error for DurableWriteReject {}

/// The combined durable-write gate: catalog ownership **and** a valid lease.
///
/// Routes `key` to its range, requires `node` to be the range's current
/// [`Owner`](RangeRole::Owner) (the issue #990 gate), then requires `holder` to
/// hold a lease that covers this range and is valid at `current_term` /
/// `now_ms` against the range's current ownership epoch. On success returns the
/// owned [`RangeOwnership`]; otherwise the [`DurableWriteReject`] explaining which
/// layer refused.
///
/// This is the literal encoding of the acceptance criterion *"durable writes
/// require a valid current ownership lease in addition to matching range
/// ownership"*: catalog ownership is necessary but not sufficient, and the lease
/// is checked against the catalog's *current* epoch, so an owner whose lease epoch
/// has been superseded by a transition is fenced here too.
pub fn admit_durable_write<'c>(
    catalog: &'c ShardOwnershipCatalog,
    holder: &LeasedOwner,
    node: &NodeIdentity,
    collection: &CollectionId,
    key: &[u8],
    current_term: SupervisorTerm,
    now_ms: u64,
) -> Result<&'c RangeOwnership, DurableWriteReject> {
    let range =
        catalog
            .route_shard_key(collection, key)
            .ok_or_else(|| DurableWriteReject::NoRange {
                collection: collection.clone(),
            })?;

    let role = range.role_of(node);
    if !role.may_write_public() {
        return Err(DurableWriteReject::NotOwner {
            collection: collection.clone(),
            range_id: range.range_id(),
            role,
            owner: range.owner().clone(),
        });
    }

    // The lease must cover *this* range and *this* owner; a held lease for a
    // different range or owner does not authorise the write (treated as unleased
    // for this range).
    let covered = holder
        .lease()
        .is_some_and(|lease| lease.covers(collection, range.range_id(), node));

    let mode = if covered {
        holder.evaluate(current_term, range.epoch(), now_ms)
    } else {
        OwnerWriteMode::Fenced(FenceReason::Unleased)
    };

    match mode {
        OwnerWriteMode::Durable => Ok(range),
        OwnerWriteMode::Fenced(reason) => Err(DurableWriteReject::Fenced {
            collection: collection.clone(),
            range_id: range.range_id(),
            reason,
        }),
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::cluster::ownership::{PlacementMetadata, RangeBounds, ShardKeyMode};

    fn collection(name: &str) -> CollectionId {
        CollectionId::new(name).unwrap()
    }

    fn ident(cn: &str) -> NodeIdentity {
        NodeIdentity::from_certificate_subject(cn).unwrap()
    }

    /// A catalog holding one full-keyspace range owned by `owner` with `replicas`.
    fn catalog_with(owner: &str, replicas: &[&str]) -> (ShardOwnershipCatalog, CollectionId) {
        let orders = collection("orders");
        let mut catalog = ShardOwnershipCatalog::new();
        catalog
            .apply_update(RangeOwnership::establish(
                orders.clone(),
                RangeId::new(1),
                ShardKeyMode::Hash,
                RangeBounds::full(),
                ident(owner),
                replicas.iter().map(|r| ident(r)).collect::<Vec<_>>(),
                PlacementMetadata::with_replication_factor(3),
            ))
            .unwrap();
        (catalog, orders)
    }

    /// The ownership epoch reached after a single `transfer_to` from the initial
    /// one (value 2), obtained without the crate-private `OwnershipEpoch::next`.
    fn next_epoch() -> OwnershipEpoch {
        RangeOwnership::establish(
            collection("orders"),
            RangeId::new(1),
            ShardKeyMode::Hash,
            RangeBounds::full(),
            ident("CN=node-a"),
            [ident("CN=node-b")],
            PlacementMetadata::with_replication_factor(3),
        )
        .transfer_to(ident("CN=node-b"), [])
        .epoch()
    }

    /// A lease for `owner` over orders/1 under term 1, epoch 1, granted at t=0 for
    /// `ttl_ms`.
    fn lease_for(orders: &CollectionId, owner: &str, ttl_ms: u64) -> OwnershipLease {
        OwnershipLease::grant(
            SupervisorTerm::genesis(),
            orders.clone(),
            RangeId::new(1),
            ident(owner),
            OwnershipEpoch::initial(),
            0,
            ttl_ms,
        )
    }

    // ---------------------------------------------------------------
    // Lease validity window & accessors.
    // ---------------------------------------------------------------

    #[test]
    fn lease_window_is_half_open() {
        let orders = collection("orders");
        let lease = lease_for(&orders, "CN=node-a", 1_000);
        assert_eq!(lease.granted_at_ms(), 0);
        assert_eq!(lease.expires_at_ms(), 1_000);
        assert!(!lease.is_expired(0));
        assert!(!lease.is_expired(999));
        // The boundary instant is already expired — authority never extends to or
        // past the stated end.
        assert!(lease.is_expired(1_000));
        assert!(lease.is_expired(1_001));
        assert_eq!(lease.remaining_ms(250), 750);
        assert_eq!(lease.remaining_ms(1_000), 0);
        assert_eq!(lease.remaining_ms(5_000), 0);
    }

    #[test]
    fn lease_binds_term_range_owner_and_epoch() {
        let orders = collection("orders");
        let lease = lease_for(&orders, "CN=node-a", 1_000);
        assert_eq!(lease.supervisor_term(), SupervisorTerm::genesis());
        assert_eq!(lease.collection(), &orders);
        assert_eq!(lease.range_id(), RangeId::new(1));
        assert_eq!(lease.owner(), &ident("CN=node-a"));
        assert_eq!(lease.epoch(), OwnershipEpoch::initial());
    }

    // ---------------------------------------------------------------
    // evaluate(): the self-fence decision.
    // ---------------------------------------------------------------

    #[test]
    fn valid_lease_authorises_durable_writes() {
        let orders = collection("orders");
        let owner = LeasedOwner::with_lease(lease_for(&orders, "CN=node-a", 1_000));
        let mode = owner.evaluate(SupervisorTerm::genesis(), OwnershipEpoch::initial(), 500);
        assert_eq!(mode, OwnerWriteMode::Durable);
        assert!(mode.may_write_durable());
        assert!(!mode.is_fenced());
    }

    #[test]
    fn unleased_owner_is_fenced() {
        let owner = LeasedOwner::unleased();
        let mode = owner.evaluate(SupervisorTerm::genesis(), OwnershipEpoch::initial(), 0);
        assert_eq!(mode, OwnerWriteMode::Fenced(FenceReason::Unleased));
    }

    #[test]
    fn expired_lease_self_fences() {
        let orders = collection("orders");
        let owner = LeasedOwner::with_lease(lease_for(&orders, "CN=node-a", 1_000));
        // At t=1_500 the lease (window [0, 1_000)) has lapsed: the owner cannot
        // renew (Supervisor unreachable) so it self-fences.
        let mode = owner.evaluate(SupervisorTerm::genesis(), OwnershipEpoch::initial(), 1_500);
        match mode {
            OwnerWriteMode::Fenced(FenceReason::Expired {
                now_ms,
                expires_at_ms,
            }) => {
                assert_eq!(now_ms, 1_500);
                assert_eq!(expires_at_ms, 1_000);
            }
            other => panic!("expected Expired fence, got {other:?}"),
        }
    }

    #[test]
    fn epoch_mismatch_self_fences() {
        let orders = collection("orders");
        // Lease granted under epoch 1, but ownership has since moved to epoch 2.
        let owner = LeasedOwner::with_lease(lease_for(&orders, "CN=node-a", 1_000));
        let current_epoch = next_epoch();
        let mode = owner.evaluate(SupervisorTerm::genesis(), current_epoch, 500);
        match mode {
            OwnerWriteMode::Fenced(FenceReason::EpochSuperseded {
                lease_epoch,
                current_epoch: reported,
            }) => {
                assert_eq!(lease_epoch, OwnershipEpoch::initial());
                assert_eq!(reported, current_epoch);
            }
            other => panic!("expected EpochSuperseded fence, got {other:?}"),
        }
    }

    #[test]
    fn supervisor_term_advance_self_fences() {
        let orders = collection("orders");
        let owner = LeasedOwner::with_lease(lease_for(&orders, "CN=node-a", 1_000));
        // A new Supervisor leader bumped the term; the lease under the old term no
        // longer matches even though it has not expired.
        let current_term = SupervisorTerm::genesis().next();
        let mode = owner.evaluate(current_term, OwnershipEpoch::initial(), 500);
        match mode {
            OwnerWriteMode::Fenced(FenceReason::TermSuperseded {
                lease_term,
                current_term: reported,
            }) => {
                assert_eq!(lease_term, SupervisorTerm::genesis());
                assert_eq!(reported, current_term);
            }
            other => panic!("expected TermSuperseded fence, got {other:?}"),
        }
    }

    #[test]
    fn revoked_lease_self_fences_before_expiry() {
        let orders = collection("orders");
        let mut owner = LeasedOwner::with_lease(lease_for(&orders, "CN=node-a", 1_000));
        owner.revoke();
        // Still inside the window and matching term/epoch, but explicitly revoked.
        let mode = owner.evaluate(SupervisorTerm::genesis(), OwnershipEpoch::initial(), 100);
        assert_eq!(mode, OwnerWriteMode::Fenced(FenceReason::Revoked));
    }

    #[test]
    fn revoke_takes_precedence_over_other_causes() {
        // Fail-closed ordering: an explicit revoke is reported even when the lease
        // is also expired and on a stale term/epoch.
        let orders = collection("orders");
        let mut owner = LeasedOwner::with_lease(lease_for(&orders, "CN=node-a", 1_000));
        owner.revoke();
        let mode = owner.evaluate(SupervisorTerm::genesis().next(), next_epoch(), 10_000);
        assert_eq!(mode, OwnerWriteMode::Fenced(FenceReason::Revoked));
    }

    #[test]
    fn renewing_a_lease_clears_a_prior_revoke_and_extends_window() {
760        let orders = collection("orders");
761        let mut owner = LeasedOwner::with_lease(lease_for(&orders, "CN=node-a", 1_000));
762        owner.revoke();
763        assert!(owner
764            .evaluate(SupervisorTerm::genesis(), OwnershipEpoch::initial(), 100)
765            .is_fenced());
766        // The Supervisor re-grants a fresh lease (e.g. a renewal at t=900 for
767        // another 1_000 ms): authority is restored.
768        owner.grant(OwnershipLease::grant(
769            SupervisorTerm::genesis(),
770            orders.clone(),
771            RangeId::new(1),
772            ident("CN=node-a"),
773            OwnershipEpoch::initial(),
774            900,
775            1_000,
776        ));
777        let mode = owner.evaluate(SupervisorTerm::genesis(), OwnershipEpoch::initial(), 1_500);
778        assert_eq!(mode, OwnerWriteMode::Durable);
779    }

    // ---------------------------------------------------------------
    // admit_request(): self-fenced read mode.
    // ---------------------------------------------------------------

    #[test]
    fn valid_lease_admits_every_request_kind() {
        let orders = collection("orders");
        let owner = LeasedOwner::with_lease(lease_for(&orders, "CN=node-a", 1_000));
        for req in [
            RangeRequest::DurableWrite,
            RangeRequest::StaleRead,
            RangeRequest::ReplicationCatchUp,
        ] {
            assert!(owner
                .admit_request(
                    req,
                    SupervisorTerm::genesis(),
                    OwnershipEpoch::initial(),
                    500
                )
                .is_ok());
        }
    }

    #[test]
    fn self_fenced_read_mode_serves_reads_and_catch_up_but_rejects_durable_writes() {
        let orders = collection("orders");
        let owner = LeasedOwner::with_lease(lease_for(&orders, "CN=node-a", 1_000));
        // Past expiry: the owner is self-fenced.
        let now = 2_000;
        let term = SupervisorTerm::genesis();
        let epoch = OwnershipEpoch::initial();

        // Stale reads and replication catch-up are still served.
        assert!(owner
            .admit_request(RangeRequest::StaleRead, term, epoch, now)
            .is_ok());
        assert!(owner
            .admit_request(RangeRequest::ReplicationCatchUp, term, epoch, now)
            .is_ok());

        // Durable writes are rejected with the fence reason.
        let err = owner
            .admit_request(RangeRequest::DurableWrite, term, epoch, now)
            .unwrap_err();
        assert!(matches!(err.reason, FenceReason::Expired { .. }));
        assert!(err.to_string().contains("self-fenced"));
    }

    #[test]
    fn unleased_owner_rejects_durable_write_but_still_catches_up() {
        let owner = LeasedOwner::unleased();
        let term = SupervisorTerm::genesis();
        let epoch = OwnershipEpoch::initial();
        assert_eq!(
            owner
                .admit_request(RangeRequest::DurableWrite, term, epoch, 0)
                .unwrap_err()
                .reason,
            FenceReason::Unleased
        );
        // A brand-new member with no lease must still be allowed to catch up so it
        // can eventually become a valid owner.
        assert!(owner
            .admit_request(RangeRequest::ReplicationCatchUp, term, epoch, 0)
            .is_ok());
    }

    // ---------------------------------------------------------------
    // admit_durable_write(): lease in addition to catalog ownership.
    // ---------------------------------------------------------------

    #[test]
    fn durable_write_admitted_for_leased_owner() {
        let (catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        let owner = LeasedOwner::with_lease(lease_for(&orders, "CN=node-a", 1_000));
        let range = admit_durable_write(
            &catalog,
            &owner,
            &ident("CN=node-a"),
            &orders,
            b"k",
            SupervisorTerm::genesis(),
            500,
        )
        .expect("leased owner at current term/epoch may write");
        assert_eq!(range.owner(), &ident("CN=node-a"));
        assert_eq!(range.range_id(), RangeId::new(1));
    }

    #[test]
    fn durable_write_rejected_for_catalog_owner_without_a_lease() {
        // node-a IS the catalog owner, but holds no lease: ownership alone is not
        // authority to write.
        let (catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        let owner = LeasedOwner::unleased();
        let err = admit_durable_write(
            &catalog,
            &owner,
            &ident("CN=node-a"),
            &orders,
            b"k",
            SupervisorTerm::genesis(),
            0,
        )
        .unwrap_err();
        match err {
            DurableWriteReject::Fenced { reason, .. } => assert_eq!(reason, FenceReason::Unleased),
            other => panic!("expected Fenced(Unleased), got {other:?}"),
        }
    }

    #[test]
    fn durable_write_rejected_for_non_owner_before_lease_is_even_consulted() {
        let (catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        // node-b is a replica. Even if it somehow held a lease, the catalog
        // ownership gate refuses it first.
        let owner = LeasedOwner::with_lease(lease_for(&orders, "CN=node-b", 1_000));
        let err = admit_durable_write(
            &catalog,
            &owner,
            &ident("CN=node-b"),
            &orders,
            b"k",
            SupervisorTerm::genesis(),
            500,
        )
        .unwrap_err();
        match err {
            DurableWriteReject::NotOwner { role, owner, .. } => {
                assert_eq!(role, RangeRole::Replica);
                assert_eq!(owner, ident("CN=node-a"));
            }
            other => panic!("expected NotOwner, got {other:?}"),
        }
    }

    #[test]
    fn durable_write_rejected_when_no_range_covers_the_key() {
        let catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        let owner = LeasedOwner::with_lease(lease_for(&orders, "CN=node-a", 1_000));
        let err = admit_durable_write(
            &catalog,
            &owner,
            &ident("CN=node-a"),
            &orders,
            b"k",
            SupervisorTerm::genesis(),
            500,
        )
        .unwrap_err();
        assert!(matches!(err, DurableWriteReject::NoRange { .. }));
    }

    #[test]
    fn durable_write_fenced_when_lease_epoch_trails_the_catalog() {
        // The catalog moves ownership a -> b -> a, so the live epoch is 3, but
        // node-a still holds its original epoch-1 lease. Catalog ownership matches
        // (node-a is owner again) yet the stale lease epoch fences the write.
        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        let stale_lease = lease_for(&orders, "CN=node-a", 100_000);

        let v1 = catalog.range(&orders, RangeId::new(1)).unwrap().clone();
        let v2 = v1.transfer_to(ident("CN=node-b"), [ident("CN=node-a")]);
        catalog.apply_update(v2.clone()).unwrap();
        let v3 = v2.transfer_to(ident("CN=node-a"), [ident("CN=node-b")]);
        catalog.apply_update(v3).unwrap();

        let owner = LeasedOwner::with_lease(stale_lease);
        let current_epoch = catalog.range(&orders, RangeId::new(1)).unwrap().epoch();
        assert_eq!(current_epoch.value(), 3);

        let err = admit_durable_write(
            &catalog,
            &owner,
            &ident("CN=node-a"),
            &orders,
            b"k",
            SupervisorTerm::genesis(),
            500,
        )
        .unwrap_err();
        match err {
            DurableWriteReject::Fenced {
                reason: FenceReason::EpochSuperseded { lease_epoch, .. },
                ..
            } => assert_eq!(lease_epoch, OwnershipEpoch::initial()),
            other => panic!("expected Fenced(EpochSuperseded), got {other:?}"),
        }
    }

    #[test]
    fn durable_write_fenced_when_lease_is_for_a_different_range() {
        let (catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        // node-a is the owner of range 1, but its lease names range 2.
        let wrong_range_lease = OwnershipLease::grant(
            SupervisorTerm::genesis(),
            orders.clone(),
            RangeId::new(2),
            ident("CN=node-a"),
            OwnershipEpoch::initial(),
            0,
            1_000,
        );
        let owner = LeasedOwner::with_lease(wrong_range_lease);
        let err = admit_durable_write(
            &catalog,
            &owner,
            &ident("CN=node-a"),
            &orders,
            b"k",
            SupervisorTerm::genesis(),
            500,
        )
        .unwrap_err();
        // A lease that does not cover this range is no authority for it.
        match err {
            DurableWriteReject::Fenced { reason, .. } => assert_eq!(reason, FenceReason::Unleased),
            other => panic!("expected Fenced(Unleased), got {other:?}"),
        }
    }

    #[test]
    fn durable_write_rejected_after_self_fence_then_restored_on_renewal() {
        // End-to-end: a leased owner writes, its lease lapses (fenced), and a
        // renewal restores durable writes. The lease, not catalog ownership, is
        // what gates the write here.
        let (catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        let mut owner = LeasedOwner::with_lease(lease_for(&orders, "CN=node-a", 1_000));
        let term = SupervisorTerm::genesis();

        // t=500: valid.
        assert!(admit_durable_write(
            &catalog,
            &owner,
            &ident("CN=node-a"),
            &orders,
            b"k",
            term,
            500
        )
        .is_ok());
        // t=2_000: lapsed -> fenced.
        let err = admit_durable_write(
            &catalog,
            &owner,
            &ident("CN=node-a"),
            &orders,
            b"k",
            term,
            2_000,
        )
        .unwrap_err();
        assert!(matches!(
            err,
            DurableWriteReject::Fenced {
                reason: FenceReason::Expired { .. },
                ..
            }
        ));
        // Renew under the same term/epoch from t=2_000: durable writes resume.
        owner.grant(OwnershipLease::grant(
            term,
            orders.clone(),
            RangeId::new(1),
            ident("CN=node-a"),
            OwnershipEpoch::initial(),
            2_000,
            1_000,
        ));
        assert!(admit_durable_write(
            &catalog,
            &owner,
            &ident("CN=node-a"),
            &orders,
            b"k",
            term,
            2_500
        )
        .is_ok());
    }
}
1059            2_500
1060        )
1061        .is_ok());
1062    }
1063}