reddb_server/cluster/
ownership_lease.rs

//! Ownership leases and owner self-fence behavior (issue #997, PRD #987,
//! ADR 0037).
//!
//! The [`ShardOwnershipCatalog`] (issue #989) records *who* owns a range and the
//! [ownership-transition machine](super::ownership_transition) is the only
//! sanctioned way to *move* that authority. But catalog ownership alone is not
//! enough to make a durable write safe: a node that the catalog still names as
//! owner may have been partitioned away from the Cluster Supervisor, so the rest
//! of the cluster may already have moved on without that node learning of it. The
//! ownership *lease* closes that gap.
//!
//! Per the glossary an **ownership lease** is *"time-bounded authority for a range
//! owner to accept durable writes, issued under the current Cluster Supervisor
//! term and ownership epoch. If the Supervisor loses majority, owners may continue
//! only until their valid lease expires."* The lease is the owner's *positive*
//! permission to write — without a currently-valid one it must stop, even if the
//! catalog still names it owner and nothing has explicitly deposed it.
//!
//! ## What a lease binds together
//!
//! An [`OwnershipLease`] ties four identities together, matching ADR 0037's
//! "expected term and ownership epoch" fencing inputs plus the range and owner the
//! authority is *for*:
//!
//! * [`SupervisorTerm`] — the Cluster Supervisor term the lease was granted under.
//!   When the Supervisor term advances (a new Supervisor leader), an old lease no
//!   longer matches and the owner self-fences.
//! * [`CollectionId`] + [`RangeId`] — the single range this authority covers. A
//!   lease is per-range, exactly like [`RangeRole`].
//! * `owner` ([`NodeIdentity`]) — the node the authority was issued to.
//! * [`OwnershipEpoch`] — the ownership epoch in force when the lease was granted.
//!   If ownership moves (the epoch bumps via a [transition](super::ownership_transition)),
//!   the lease no longer matches the catalog and the owner self-fences.
//!
//! plus a `[granted_at_ms, expires_at_ms)` validity window — the *time-bounded*
//! part. Time is passed in explicitly (`now_ms`) so the whole module stays a pure,
//! deterministic data model with no clock I/O, just like its siblings.
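//!
//! The half-open window can be illustrated with a minimal standalone sketch. The
//! `Window` type below is hypothetical: a stand-in that mirrors only the expiry
//! arithmetic described above, not this module's real lease type. The clock is
//! supplied by the caller as `now_ms`, so the check stays deterministic:
//!
//! ```rust
//! // Hypothetical stand-in for the end of a lease's validity window.
//! struct Window {
//!     expires_at_ms: u64,
//! }
//!
//! impl Window {
//!     // The end of the window saturates instead of overflowing.
//!     fn grant(granted_at_ms: u64, ttl_ms: u64) -> Self {
//!         Self {
//!             expires_at_ms: granted_at_ms.saturating_add(ttl_ms),
//!         }
//!     }
//!
//!     // Half-open: the instant `now_ms == expires_at_ms` is already expired.
//!     fn is_expired(&self, now_ms: u64) -> bool {
//!         now_ms >= self.expires_at_ms
//!     }
//!
//!     // Authority left at `now_ms`, saturating to zero once expired.
//!     fn remaining_ms(&self, now_ms: u64) -> u64 {
//!         self.expires_at_ms.saturating_sub(now_ms)
//!     }
//! }
//!
//! fn main() {
//!     let window = Window::grant(0, 1_000);
//!     assert!(!window.is_expired(999));
//!     assert!(window.is_expired(1_000));
//!     assert_eq!(window.remaining_ms(250), 750);
//!     assert_eq!(window.remaining_ms(5_000), 0);
//! }
//! ```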
//!
//! ## Self-fence and read-only mode
//!
//! [`LeasedOwner`] is the owner's local view of its own lease. It answers one
//! question — *may I take a durable write right now?* — by [`evaluate`] against the
//! current Supervisor term, the range's current ownership epoch, and the current
//! time. The answer is an [`OwnerWriteMode`]: either [`Durable`] (a valid lease
//! covers the write) or [`Fenced`] with the [`FenceReason`] that revoked authority.
//! An owner self-fences — per the glossary's **owner self-fence** — when its lease
//! *expires*, is *revoked*, or no longer matches the current *Supervisor term* or
//! *ownership epoch*; it does not wait for clients to stop routing to it.
//!
//! A fenced owner is not dead: per the glossary's **self-fenced read mode** it
//! *"may continue serving explicitly stale/read-only requests and replication
//! catch-up, while rejecting durable writes until quorum/lease authority is
//! restored."* [`admit_request`](LeasedOwner::admit_request) encodes exactly that —
//! a [`DurableWrite`](RangeRequest::DurableWrite) is rejected once fenced, while a
//! [`StaleRead`](RangeRequest::StaleRead) and
//! [`ReplicationCatchUp`](RangeRequest::ReplicationCatchUp) are still served.
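//!
//! Self-fenced read mode boils down to a small decision table. The sketch below
//! is a standalone illustration under simplifying assumptions: the `Request`
//! enum and `admit` function are hypothetical, and the whole lease evaluation is
//! collapsed into a single `lease_valid` flag:
//!
//! ```rust
//! #[derive(Debug, Clone, Copy, PartialEq, Eq)]
//! enum Request {
//!     DurableWrite,
//!     StaleRead,
//!     ReplicationCatchUp,
//! }
//!
//! // With a valid lease everything is served; once fenced, only durable
//! // writes are refused.
//! fn admit(request: Request, lease_valid: bool) -> Result<(), &'static str> {
//!     match (lease_valid, request) {
//!         (true, _) => Ok(()),
//!         (false, Request::StaleRead | Request::ReplicationCatchUp) => Ok(()),
//!         (false, Request::DurableWrite) => Err("owner is self-fenced"),
//!     }
//! }
//!
//! fn main() {
//!     assert!(admit(Request::DurableWrite, true).is_ok());
//!     assert!(admit(Request::StaleRead, false).is_ok());
//!     assert!(admit(Request::ReplicationCatchUp, false).is_ok());
//!     assert!(admit(Request::DurableWrite, false).is_err());
//! }
//! ```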
//!
//! ## Lease *in addition to* ownership
//!
//! [`admit_durable_write`] is the combined gate the public write path calls: it
//! first routes the key and checks catalog ownership (the [`RangeRole`] gate from
//! issue #990), then requires a valid lease on top. A node that is the catalog
//! owner but holds no current lease is rejected — "durable writes require a valid
//! current ownership lease *in addition to* matching range ownership".
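//!
//! The layering can be sketched in isolation. This is a simplified illustration,
//! not the gate itself: `Reject` and `admit_write` are hypothetical names, and
//! routing, catalog lookup, and lease evaluation are reduced to two booleans so
//! that only the order of the checks remains:
//!
//! ```rust
//! #[derive(Debug, PartialEq, Eq)]
//! enum Reject {
//!     // The node is not the catalog owner of the routed range.
//!     NotOwner,
//!     // The node is the catalog owner but holds no valid lease.
//!     Fenced,
//! }
//!
//! // Ownership is checked first; the lease is required on top of it.
//! fn admit_write(is_catalog_owner: bool, lease_valid: bool) -> Result<(), Reject> {
//!     if !is_catalog_owner {
//!         return Err(Reject::NotOwner);
//!     }
//!     if !lease_valid {
//!         return Err(Reject::Fenced);
//!     }
//!     Ok(())
//! }
//!
//! fn main() {
//!     assert_eq!(admit_write(true, true), Ok(()));
//!     // Catalog ownership alone is not sufficient.
//!     assert_eq!(admit_write(true, false), Err(Reject::Fenced));
//!     // A lease without ownership is reported as a routing problem.
//!     assert_eq!(admit_write(false, true), Err(Reject::NotOwner));
//! }
//! ```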
//!
//! [`evaluate`]: LeasedOwner::evaluate
//! [`Durable`]: OwnerWriteMode::Durable
//! [`Fenced`]: OwnerWriteMode::Fenced

use super::identity::NodeIdentity;
use super::ownership::{
    CollectionId, OwnershipEpoch, RangeId, RangeOwnership, RangeRole, ShardOwnershipCatalog,
};

/// The Cluster Supervisor term an ownership lease is granted under.
///
/// A lease is authority *"issued under the current Cluster Supervisor term"*: when
/// a new Supervisor leader is elected the term advances, and a lease stamped with
/// an older term no longer matches — its holder self-fences
/// ([`FenceReason::TermSuperseded`]). This is the control-plane analogue of the
/// replication term that fences a deposed primary (ADR 0030).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub struct SupervisorTerm(u64);

impl SupervisorTerm {
    /// The term a freshly-bootstrapped Supervisor starts at.
    pub fn genesis() -> Self {
        Self(1)
    }

    /// Wrap a raw term number.
    pub fn new(value: u64) -> Self {
        Self(value)
    }

    /// The raw term number.
    pub fn value(self) -> u64 {
        self.0
    }

    /// The next term, as minted when a new Supervisor leader is elected.
    pub fn next(self) -> Self {
        Self(self.0 + 1)
    }
}

impl std::fmt::Display for SupervisorTerm {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}", self.0)
    }
}

/// Time-bounded write authority for one range owner, issued under a Supervisor
/// term and ownership epoch.
///
/// A lease is the owner's *positive* permission to take durable writes. It is
/// per-range (it names its [`CollectionId`] + [`RangeId`]), bound to the owner it
/// was issued to, and valid only on the `[granted_at_ms, expires_at_ms)` window
/// and only while the Supervisor term and ownership epoch it carries still match
/// the live cluster. The owner re-validates it on every durable write through
/// [`LeasedOwner::evaluate`]; once any binding no longer holds, the owner
/// self-fences.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OwnershipLease {
    supervisor_term: SupervisorTerm,
    collection: CollectionId,
    range_id: RangeId,
    owner: NodeIdentity,
    epoch: OwnershipEpoch,
    granted_at_ms: u64,
    expires_at_ms: u64,
}

impl OwnershipLease {
    /// Grant a lease valid for `ttl_ms` from `granted_at_ms`, under
    /// `supervisor_term` and ownership `epoch`, for `owner`'s authority over
    /// `(collection, range_id)`.
    #[allow(clippy::too_many_arguments)]
    pub fn grant(
        supervisor_term: SupervisorTerm,
        collection: CollectionId,
        range_id: RangeId,
        owner: NodeIdentity,
        epoch: OwnershipEpoch,
        granted_at_ms: u64,
        ttl_ms: u64,
    ) -> Self {
        Self {
            supervisor_term,
            collection,
            range_id,
            owner,
            epoch,
            granted_at_ms,
            expires_at_ms: granted_at_ms.saturating_add(ttl_ms),
        }
    }

    /// The Supervisor term the lease was granted under.
    pub fn supervisor_term(&self) -> SupervisorTerm {
        self.supervisor_term
    }

    /// The collection of the range this lease covers.
    pub fn collection(&self) -> &CollectionId {
        &self.collection
    }

    /// The range this lease covers.
    pub fn range_id(&self) -> RangeId {
        self.range_id
    }

    /// The node the lease was issued to.
    pub fn owner(&self) -> &NodeIdentity {
        &self.owner
    }

    /// The ownership epoch in force when the lease was granted.
    pub fn epoch(&self) -> OwnershipEpoch {
        self.epoch
    }

    /// Start of the validity window (inclusive), in milliseconds.
    pub fn granted_at_ms(&self) -> u64 {
        self.granted_at_ms
    }

    /// End of the validity window (exclusive), in milliseconds.
    pub fn expires_at_ms(&self) -> u64 {
        self.expires_at_ms
    }

    /// Has the lease's validity window closed at `now_ms`? The window is
    /// half-open: the instant `now_ms == expires_at_ms` is already expired, so a
    /// lease never grants authority at or past its stated end.
    pub fn is_expired(&self, now_ms: u64) -> bool {
        now_ms >= self.expires_at_ms
    }

    /// Milliseconds of authority left at `now_ms`, saturating to zero once
    /// expired. The owner's keep-alive uses this to decide when to renew.
    pub fn remaining_ms(&self, now_ms: u64) -> u64 {
        self.expires_at_ms.saturating_sub(now_ms)
    }

    /// Does this lease cover the range `(collection, range_id)` for `owner`? A
    /// lease is per-range and per-owner, so a held lease for a *different* range
    /// or issued to a *different* owner does not authorise this one.
    fn covers(&self, collection: &CollectionId, range_id: RangeId, owner: &NodeIdentity) -> bool {
        self.collection == *collection && self.range_id == range_id && self.owner == *owner
    }
}

/// Why a range owner is self-fenced — the cause that revoked its durable-write
/// authority. Reported by [`LeasedOwner::evaluate`] and carried in every
/// durable-write rejection so an operator (or the owner's own logs) can see
/// *which* binding lapsed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FenceReason {
    /// The owner holds no lease at all (never granted one, or it was dropped).
    /// Catalog ownership without a lease is not authority to write.
    Unleased,
    /// The Supervisor explicitly revoked the lease before its window closed —
    /// e.g. ahead of a planned ownership handoff.
    Revoked,
    /// The lease was granted under an older Supervisor term than the current one:
    /// a new Supervisor leader has been elected, so the lease no longer matches.
    TermSuperseded {
        lease_term: SupervisorTerm,
        current_term: SupervisorTerm,
    },
    /// The lease's ownership epoch no longer matches the range's current epoch:
    /// ownership has moved via a transition, fencing this (now stale) owner.
    EpochSuperseded {
        lease_epoch: OwnershipEpoch,
        current_epoch: OwnershipEpoch,
    },
    /// The lease's validity window has closed at the current time. With the
    /// Supervisor partitioned away the owner cannot renew, so it stops writing
    /// the moment the lease lapses.
    Expired { now_ms: u64, expires_at_ms: u64 },
}

impl std::fmt::Display for FenceReason {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Unleased => write!(f, "owner holds no ownership lease"),
            Self::Revoked => write!(f, "ownership lease was revoked"),
            Self::TermSuperseded {
                lease_term,
                current_term,
            } => write!(
                f,
                "ownership lease granted under supervisor term {lease_term} is behind current term {current_term}"
            ),
            Self::EpochSuperseded {
                lease_epoch,
                current_epoch,
            } => write!(
                f,
                "ownership lease epoch {lease_epoch} no longer matches current ownership epoch {current_epoch}"
            ),
            Self::Expired {
                now_ms,
                expires_at_ms,
            } => write!(
                f,
                "ownership lease expired at {expires_at_ms} ms (now {now_ms} ms)"
            ),
        }
    }
}

impl std::error::Error for FenceReason {}

/// The owner's durable-write authority after evaluating its lease.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OwnerWriteMode {
    /// A valid lease covers the write — durable writes are authorised.
    Durable,
    /// The owner is self-fenced: durable writes are rejected, but
    /// [`self-fenced read mode`](LeasedOwner::admit_request) still serves stale
    /// reads and replication catch-up. Carries the [`FenceReason`].
    Fenced(FenceReason),
}

impl OwnerWriteMode {
    /// Whether durable writes are authorised in this mode.
    pub fn may_write_durable(&self) -> bool {
        matches!(self, OwnerWriteMode::Durable)
    }

    /// Whether the owner is self-fenced.
    pub fn is_fenced(&self) -> bool {
        matches!(self, OwnerWriteMode::Fenced(_))
    }
}

/// A request kind a (possibly self-fenced) range owner may be asked to serve.
///
/// The distinction drives [`self-fenced read mode`](LeasedOwner::admit_request):
/// a fenced owner rejects [`DurableWrite`](Self::DurableWrite) but still serves
/// [`StaleRead`](Self::StaleRead) and [`ReplicationCatchUp`](Self::ReplicationCatchUp).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RangeRequest {
    /// A durable mutation. Requires a currently-valid lease.
    DurableWrite,
    /// An explicitly stale / read-only request. Served even while self-fenced.
    StaleRead,
    /// Replication catch-up (a replica streaming the range's log forward).
    /// Served even while self-fenced — it is the very mechanism by which the
    /// member rejoins under a newer ownership epoch.
    ReplicationCatchUp,
}

/// Why a request was refused while the owner is self-fenced.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LeaseFenceRejection {
    /// The fence cause that was in effect when the request was refused.
    pub reason: FenceReason,
}

impl std::fmt::Display for LeaseFenceRejection {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
            "durable write rejected: owner is self-fenced ({})",
            self.reason
        )
    }
}

impl std::error::Error for LeaseFenceRejection {}

/// A range owner's local view of its own ownership lease — the thing that decides
/// whether it may take a durable write *right now*.
///
/// This is the home of owner self-fence behavior. It holds at most one lease (a
/// lease is per-range, so one [`LeasedOwner`] tracks one owned range) plus a
/// `revoked` flag the Supervisor can trip. [`evaluate`](Self::evaluate) folds the
/// lease, the revoke flag, the current Supervisor term, the range's current
/// ownership epoch, and the current time into an [`OwnerWriteMode`]; everything
/// else is built on that one decision.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct LeasedOwner {
    lease: Option<OwnershipLease>,
    revoked: bool,
}

impl LeasedOwner {
    /// An owner holding no lease — self-fenced until one is granted.
    pub fn unleased() -> Self {
        Self {
            lease: None,
            revoked: false,
        }
    }

    /// An owner holding `lease`.
    pub fn with_lease(lease: OwnershipLease) -> Self {
        Self {
            lease: Some(lease),
            revoked: false,
        }
    }

    /// Install a freshly-granted (or renewed) lease, clearing any prior revoke.
    /// Renewing is how the owner extends its window before the old one expires.
    pub fn grant(&mut self, lease: OwnershipLease) {
        self.lease = Some(lease);
        self.revoked = false;
    }

    /// Revoke the current lease. From then on [`evaluate`](Self::evaluate)
    /// reports the owner as fenced, without waiting for the window to close,
    /// until a fresh lease is installed via [`grant`](Self::grant). This is the
    /// Supervisor's explicit "stop writing now" ahead of a handoff.
    pub fn revoke(&mut self) {
        self.revoked = true;
    }

    /// The lease currently held, if any. `None` only when no lease has been
    /// installed. A *held-but-invalid* lease (revoked, expired, or on a stale
    /// term/epoch) still returns `Some` here; validity is
    /// [`evaluate`](Self::evaluate)'s job.
    pub fn lease(&self) -> Option<&OwnershipLease> {
        self.lease.as_ref()
    }

    /// Decide the owner's durable-write authority against the current control
    /// plane. The lease must be present and un-revoked, granted under the current
    /// `current_term`, carry the range's current `current_epoch`, and still be
    /// inside its validity window at `now_ms`. Any failure self-fences with the
    /// corresponding [`FenceReason`].
    ///
    /// Checks run fail-closed in order of authority: an explicit revoke first,
    /// then absence of a lease, then Supervisor-term supersession, then ownership
    /// epoch supersession, then time expiry. The first cause that holds is the
    /// one reported.
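    ///
    /// The ordering can be seen in a standalone sketch. The `Fence` and `Lease`
    /// types and the `first_fence` function below are hypothetical
    /// simplifications (terms and epochs are plain `u64`s), meant only to show
    /// which cause wins when several hold at once:
    ///
    /// ```rust
    /// #[derive(Debug, PartialEq, Eq)]
    /// enum Fence {
    ///     Revoked,
    ///     Unleased,
    ///     TermSuperseded,
    ///     EpochSuperseded,
    ///     Expired,
    /// }
    ///
    /// struct Lease {
    ///     term: u64,
    ///     epoch: u64,
    ///     expires_at_ms: u64,
    /// }
    ///
    /// // Returns the first fence cause that holds, or `None` if writes are allowed.
    /// fn first_fence(
    ///     lease: Option<&Lease>,
    ///     revoked: bool,
    ///     current_term: u64,
    ///     current_epoch: u64,
    ///     now_ms: u64,
    /// ) -> Option<Fence> {
    ///     if revoked {
    ///         return Some(Fence::Revoked);
    ///     }
    ///     let Some(lease) = lease else {
    ///         return Some(Fence::Unleased);
    ///     };
    ///     if lease.term != current_term {
    ///         return Some(Fence::TermSuperseded);
    ///     }
    ///     if lease.epoch != current_epoch {
    ///         return Some(Fence::EpochSuperseded);
    ///     }
    ///     if now_ms >= lease.expires_at_ms {
    ///         return Some(Fence::Expired);
    ///     }
    ///     None
    /// }
    ///
    /// fn main() {
    ///     let lease = Lease { term: 1, epoch: 1, expires_at_ms: 1_000 };
    ///     assert_eq!(first_fence(Some(&lease), false, 1, 1, 500), None);
    ///     // Stale term, stale epoch, and expired all at once: the term is reported.
    ///     assert_eq!(
    ///         first_fence(Some(&lease), false, 2, 2, 5_000),
    ///         Some(Fence::TermSuperseded)
    ///     );
    ///     // An explicit revoke outranks every other cause.
    ///     assert_eq!(first_fence(Some(&lease), true, 2, 2, 5_000), Some(Fence::Revoked));
    /// }
    /// ```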
    pub fn evaluate(
        &self,
        current_term: SupervisorTerm,
        current_epoch: OwnershipEpoch,
        now_ms: u64,
    ) -> OwnerWriteMode {
        if self.revoked {
            return OwnerWriteMode::Fenced(FenceReason::Revoked);
        }
        let Some(lease) = &self.lease else {
            return OwnerWriteMode::Fenced(FenceReason::Unleased);
        };
        if lease.supervisor_term != current_term {
            return OwnerWriteMode::Fenced(FenceReason::TermSuperseded {
                lease_term: lease.supervisor_term,
                current_term,
            });
        }
        if lease.epoch != current_epoch {
            return OwnerWriteMode::Fenced(FenceReason::EpochSuperseded {
                lease_epoch: lease.epoch,
                current_epoch,
            });
        }
        if lease.is_expired(now_ms) {
            return OwnerWriteMode::Fenced(FenceReason::Expired {
                now_ms,
                expires_at_ms: lease.expires_at_ms,
            });
        }
        OwnerWriteMode::Durable
    }

    /// Admit (or refuse) a request in light of the owner's current mode — the
    /// encoding of **self-fenced read mode**. A [`DurableWrite`] needs a valid
    /// lease; a [`StaleRead`] and [`ReplicationCatchUp`] are served regardless,
    /// so a fenced owner keeps answering reads and catching up replicas while it
    /// rejects durable writes.
    ///
    /// [`DurableWrite`]: RangeRequest::DurableWrite
    /// [`StaleRead`]: RangeRequest::StaleRead
    /// [`ReplicationCatchUp`]: RangeRequest::ReplicationCatchUp
    pub fn admit_request(
        &self,
        request: RangeRequest,
        current_term: SupervisorTerm,
        current_epoch: OwnershipEpoch,
        now_ms: u64,
    ) -> Result<(), LeaseFenceRejection> {
        match self.evaluate(current_term, current_epoch, now_ms) {
            OwnerWriteMode::Durable => Ok(()),
            OwnerWriteMode::Fenced(reason) => match request {
                RangeRequest::StaleRead | RangeRequest::ReplicationCatchUp => Ok(()),
                RangeRequest::DurableWrite => Err(LeaseFenceRejection { reason }),
            },
        }
    }
}

/// Why a lease-gated durable write was rejected — either the catalog ownership
/// gate refused it (routing / not-owner), or the owner is self-fenced.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DurableWriteReject {
    /// No range of the collection covers the routed key — re-resolve routing.
    NoRange { collection: CollectionId },
    /// This node is not the catalog owner of the routed range (it is a replica or
    /// holds no copy). The write must be routed to `owner`.
    NotOwner {
        collection: CollectionId,
        range_id: RangeId,
        role: RangeRole,
        owner: NodeIdentity,
    },
    /// This node *is* the catalog owner, but it is self-fenced: it holds no valid
    /// lease for the range. Carries the [`FenceReason`].
    Fenced {
        collection: CollectionId,
        range_id: RangeId,
        reason: FenceReason,
    },
}

impl std::fmt::Display for DurableWriteReject {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::NoRange { collection } => write!(
                f,
                "no range of collection {collection} covers the routed key — re-resolve routing"
            ),
            Self::NotOwner {
                collection,
                range_id,
                owner,
                ..
            } => write!(
                f,
                "this node does not own {collection}/{range_id} — route the durable write to {owner}"
            ),
            Self::Fenced {
                collection,
                range_id,
                reason,
            } => write!(
                f,
                "owner of {collection}/{range_id} is self-fenced and rejects the durable write: {reason}"
            ),
        }
    }
}

impl std::error::Error for DurableWriteReject {}

/// The combined durable-write gate: catalog ownership **and** a valid lease.
///
/// Routes `key` to its range, requires `node` to be the range's current
/// [`Owner`](RangeRole::Owner) (the issue #990 gate), then requires `holder` to
/// hold a lease that covers this range and is valid at `current_term` /
/// `now_ms` against the range's current ownership epoch. On success returns the
/// owned [`RangeOwnership`]; otherwise the [`DurableWriteReject`] explaining which
/// layer refused.
///
/// This is the literal encoding of the acceptance criterion *"durable writes
/// require a valid current ownership lease in addition to matching range
/// ownership"*: catalog ownership is necessary but not sufficient, and the lease
/// is checked against the catalog's *current* epoch, so an owner whose lease epoch
/// has been superseded by a transition is fenced here too.
pub fn admit_durable_write<'c>(
    catalog: &'c ShardOwnershipCatalog,
    holder: &LeasedOwner,
    node: &NodeIdentity,
    collection: &CollectionId,
    key: &[u8],
    current_term: SupervisorTerm,
    now_ms: u64,
) -> Result<&'c RangeOwnership, DurableWriteReject> {
    let range = catalog
        .route(collection, key)
        .ok_or_else(|| DurableWriteReject::NoRange {
            collection: collection.clone(),
        })?;

    let role = range.role_of(node);
    if !role.may_write_public() {
        return Err(DurableWriteReject::NotOwner {
            collection: collection.clone(),
            range_id: range.range_id(),
            role,
            owner: range.owner().clone(),
        });
    }

    // The lease must cover *this* range and *this* owner; a held lease for a
    // different range or owner does not authorise the write (treated as unleased
    // for this range).
    let covered = holder
        .lease()
        .is_some_and(|lease| lease.covers(collection, range.range_id(), node));

    let mode = if covered {
        holder.evaluate(current_term, range.epoch(), now_ms)
    } else {
        OwnerWriteMode::Fenced(FenceReason::Unleased)
    };

    match mode {
        OwnerWriteMode::Durable => Ok(range),
        OwnerWriteMode::Fenced(reason) => Err(DurableWriteReject::Fenced {
            collection: collection.clone(),
            range_id: range.range_id(),
            reason,
        }),
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::cluster::ownership::{PlacementMetadata, RangeBounds, ShardKeyMode};

    fn collection(name: &str) -> CollectionId {
        CollectionId::new(name).unwrap()
    }

    fn ident(cn: &str) -> NodeIdentity {
        NodeIdentity::from_certificate_subject(cn).unwrap()
    }

    /// A catalog holding one full-keyspace range owned by `owner` with `replicas`.
    fn catalog_with(owner: &str, replicas: &[&str]) -> (ShardOwnershipCatalog, CollectionId) {
        let orders = collection("orders");
        let mut catalog = ShardOwnershipCatalog::new();
        catalog
            .apply_update(RangeOwnership::establish(
                orders.clone(),
                RangeId::new(1),
                ShardKeyMode::Hash,
                RangeBounds::full(),
                ident(owner),
                replicas.iter().map(|r| ident(r)).collect::<Vec<_>>(),
                PlacementMetadata::with_replication_factor(3),
            ))
            .unwrap();
        (catalog, orders)
    }

    /// The ownership epoch a single `transfer_to` advances past the initial one
    /// (value 2) — obtained without the crate-private `OwnershipEpoch::next`.
    fn next_epoch() -> OwnershipEpoch {
        RangeOwnership::establish(
            collection("orders"),
            RangeId::new(1),
            ShardKeyMode::Hash,
            RangeBounds::full(),
            ident("CN=node-a"),
            [ident("CN=node-b")],
            PlacementMetadata::with_replication_factor(3),
        )
        .transfer_to(ident("CN=node-b"), [])
        .epoch()
    }

    /// A lease for `owner` over orders/1 under term 1, epoch 1, granted at t=0 for
    /// `ttl_ms`.
    fn lease_for(orders: &CollectionId, owner: &str, ttl_ms: u64) -> OwnershipLease {
        OwnershipLease::grant(
            SupervisorTerm::genesis(),
            orders.clone(),
            RangeId::new(1),
            ident(owner),
            OwnershipEpoch::initial(),
            0,
            ttl_ms,
        )
    }

    // ---------------------------------------------------------------
    // Lease validity window & accessors.
    // ---------------------------------------------------------------

    #[test]
    fn lease_window_is_half_open() {
        let orders = collection("orders");
        let lease = lease_for(&orders, "CN=node-a", 1_000);
        assert_eq!(lease.granted_at_ms(), 0);
        assert_eq!(lease.expires_at_ms(), 1_000);
        assert!(!lease.is_expired(0));
        assert!(!lease.is_expired(999));
        // The boundary instant is already expired — authority never extends to or
        // past the stated end.
        assert!(lease.is_expired(1_000));
        assert!(lease.is_expired(1_001));
        assert_eq!(lease.remaining_ms(250), 750);
        assert_eq!(lease.remaining_ms(1_000), 0);
        assert_eq!(lease.remaining_ms(5_000), 0);
    }

    #[test]
    fn lease_binds_term_range_owner_and_epoch() {
        let orders = collection("orders");
        let lease = lease_for(&orders, "CN=node-a", 1_000);
        assert_eq!(lease.supervisor_term(), SupervisorTerm::genesis());
        assert_eq!(lease.collection(), &orders);
        assert_eq!(lease.range_id(), RangeId::new(1));
        assert_eq!(lease.owner(), &ident("CN=node-a"));
        assert_eq!(lease.epoch(), OwnershipEpoch::initial());
    }

    // ---------------------------------------------------------------
    // evaluate(): the self-fence decision.
    // ---------------------------------------------------------------

    #[test]
    fn valid_lease_authorises_durable_writes() {
663        let orders = collection("orders");
664        let owner = LeasedOwner::with_lease(lease_for(&orders, "CN=node-a", 1_000));
665        let mode = owner.evaluate(SupervisorTerm::genesis(), OwnershipEpoch::initial(), 500);
666        assert_eq!(mode, OwnerWriteMode::Durable);
667        assert!(mode.may_write_durable());
668        assert!(!mode.is_fenced());
669    }
670
671    #[test]
672    fn unleased_owner_is_fenced() {
673        let owner = LeasedOwner::unleased();
674        let mode = owner.evaluate(SupervisorTerm::genesis(), OwnershipEpoch::initial(), 0);
675        assert_eq!(mode, OwnerWriteMode::Fenced(FenceReason::Unleased));
676    }
677
678    #[test]
679    fn expired_lease_self_fences() {
680        let orders = collection("orders");
681        let owner = LeasedOwner::with_lease(lease_for(&orders, "CN=node-a", 1_000));
682        // At t=1_500 the lease (window [0, 1_000)) has lapsed: the owner cannot
683        // renew (Supervisor unreachable) so it self-fences.
684        let mode = owner.evaluate(SupervisorTerm::genesis(), OwnershipEpoch::initial(), 1_500);
685        match mode {
686            OwnerWriteMode::Fenced(FenceReason::Expired {
687                now_ms,
688                expires_at_ms,
689            }) => {
690                assert_eq!(now_ms, 1_500);
691                assert_eq!(expires_at_ms, 1_000);
692            }
693            other => panic!("expected Expired fence, got {other:?}"),
694        }
695    }
696
697    #[test]
698    fn epoch_mismatch_self_fences() {
699        let orders = collection("orders");
700        // Lease granted under epoch 1, but ownership has since moved to epoch 2.
701        let owner = LeasedOwner::with_lease(lease_for(&orders, "CN=node-a", 1_000));
702        let current_epoch = next_epoch();
703        let mode = owner.evaluate(SupervisorTerm::genesis(), current_epoch, 500);
704        match mode {
705            OwnerWriteMode::Fenced(FenceReason::EpochSuperseded {
706                lease_epoch,
707                current_epoch: reported,
708            }) => {
709                assert_eq!(lease_epoch, OwnershipEpoch::initial());
710                assert_eq!(reported, current_epoch);
711            }
712            other => panic!("expected EpochSuperseded fence, got {other:?}"),
713        }
714    }
715
716    #[test]
717    fn supervisor_term_advance_self_fences() {
718        let orders = collection("orders");
719        let owner = LeasedOwner::with_lease(lease_for(&orders, "CN=node-a", 1_000));
720        // A new Supervisor leader bumped the term; the lease under the old term no
721        // longer matches even though it has not expired.
722        let current_term = SupervisorTerm::genesis().next();
723        let mode = owner.evaluate(current_term, OwnershipEpoch::initial(), 500);
724        match mode {
725            OwnerWriteMode::Fenced(FenceReason::TermSuperseded {
726                lease_term,
727                current_term: reported,
728            }) => {
729                assert_eq!(lease_term, SupervisorTerm::genesis());
730                assert_eq!(reported, current_term);
731            }
732            other => panic!("expected TermSuperseded fence, got {other:?}"),
733        }
734    }
735
736    #[test]
737    fn revoked_lease_self_fences_before_expiry() {
738        let orders = collection("orders");
739        let mut owner = LeasedOwner::with_lease(lease_for(&orders, "CN=node-a", 1_000));
740        owner.revoke();
741        // Still inside the window and matching term/epoch, but explicitly revoked.
742        let mode = owner.evaluate(SupervisorTerm::genesis(), OwnershipEpoch::initial(), 100);
743        assert_eq!(mode, OwnerWriteMode::Fenced(FenceReason::Revoked));
744    }
745
746    #[test]
747    fn revoke_takes_precedence_over_other_causes() {
748        // Fail-closed ordering: an explicit revoke is reported even when the lease
749        // is also expired and on a stale term/epoch.
750        let orders = collection("orders");
751        let mut owner = LeasedOwner::with_lease(lease_for(&orders, "CN=node-a", 1_000));
752        owner.revoke();
753        let mode = owner.evaluate(SupervisorTerm::genesis().next(), next_epoch(), 10_000);
754        assert_eq!(mode, OwnerWriteMode::Fenced(FenceReason::Revoked));
755    }

    #[test]
    fn renewing_a_lease_clears_a_prior_revoke_and_extends_window() {
        let orders = collection("orders");
        let mut owner = LeasedOwner::with_lease(lease_for(&orders, "CN=node-a", 1_000));
        owner.revoke();
        assert!(owner
            .evaluate(SupervisorTerm::genesis(), OwnershipEpoch::initial(), 100)
            .is_fenced());
        // The Supervisor re-grants a fresh lease (e.g. a renewal at t=900 for
        // another 1_000 ms): authority is restored.
        owner.grant(OwnershipLease::grant(
            SupervisorTerm::genesis(),
            orders.clone(),
            RangeId::new(1),
            ident("CN=node-a"),
            OwnershipEpoch::initial(),
            900,
            1_000,
        ));
        let mode = owner.evaluate(SupervisorTerm::genesis(), OwnershipEpoch::initial(), 1_500);
        assert_eq!(mode, OwnerWriteMode::Durable);
    }

    // ---------------------------------------------------------------
    // admit_request(): self-fenced read mode.
    // ---------------------------------------------------------------

    #[test]
    fn valid_lease_admits_every_request_kind() {
        let orders = collection("orders");
        let owner = LeasedOwner::with_lease(lease_for(&orders, "CN=node-a", 1_000));
        for req in [
            RangeRequest::DurableWrite,
            RangeRequest::StaleRead,
            RangeRequest::ReplicationCatchUp,
        ] {
            assert!(owner
                .admit_request(
                    req,
                    SupervisorTerm::genesis(),
                    OwnershipEpoch::initial(),
                    500
                )
                .is_ok());
        }
    }

    #[test]
    fn self_fenced_read_mode_serves_reads_and_catch_up_but_rejects_durable_writes() {
        let orders = collection("orders");
        let owner = LeasedOwner::with_lease(lease_for(&orders, "CN=node-a", 1_000));
        // Past expiry: the owner is self-fenced.
        let now = 2_000;
        let term = SupervisorTerm::genesis();
        let epoch = OwnershipEpoch::initial();

        // Stale reads and replication catch-up are still served.
        assert!(owner
            .admit_request(RangeRequest::StaleRead, term, epoch, now)
            .is_ok());
        assert!(owner
            .admit_request(RangeRequest::ReplicationCatchUp, term, epoch, now)
            .is_ok());

        // Durable writes are rejected with the fence reason.
        let err = owner
            .admit_request(RangeRequest::DurableWrite, term, epoch, now)
            .unwrap_err();
        assert!(matches!(err.reason, FenceReason::Expired { .. }));
        assert!(err.to_string().contains("self-fenced"));
    }

    #[test]
    fn unleased_owner_rejects_durable_write_but_still_catches_up() {
        let owner = LeasedOwner::unleased();
        let term = SupervisorTerm::genesis();
        let epoch = OwnershipEpoch::initial();
        assert_eq!(
            owner
                .admit_request(RangeRequest::DurableWrite, term, epoch, 0)
                .unwrap_err()
                .reason,
            FenceReason::Unleased
        );
        // A brand-new member with no lease must still be allowed to catch up so it
        // can eventually become a valid owner.
        assert!(owner
            .admit_request(RangeRequest::ReplicationCatchUp, term, epoch, 0)
            .is_ok());
    }

    // ---------------------------------------------------------------
    // admit_durable_write(): lease in addition to catalog ownership.
    // ---------------------------------------------------------------

    #[test]
    fn durable_write_admitted_for_leased_owner() {
        let (catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        let owner = LeasedOwner::with_lease(lease_for(&orders, "CN=node-a", 1_000));
        let range = admit_durable_write(
            &catalog,
            &owner,
            &ident("CN=node-a"),
            &orders,
            b"k",
            SupervisorTerm::genesis(),
            500,
        )
        .expect("leased owner at current term/epoch may write");
        assert_eq!(range.owner(), &ident("CN=node-a"));
        assert_eq!(range.range_id(), RangeId::new(1));
    }

    #[test]
    fn durable_write_rejected_for_catalog_owner_without_a_lease() {
        // node-a IS the catalog owner, but holds no lease: ownership alone is not
        // authority to write.
        let (catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        let owner = LeasedOwner::unleased();
        let err = admit_durable_write(
            &catalog,
            &owner,
            &ident("CN=node-a"),
            &orders,
            b"k",
            SupervisorTerm::genesis(),
            0,
        )
        .unwrap_err();
        match err {
            DurableWriteReject::Fenced { reason, .. } => assert_eq!(reason, FenceReason::Unleased),
            other => panic!("expected Fenced(Unleased), got {other:?}"),
        }
    }

    #[test]
    fn durable_write_rejected_for_non_owner_before_lease_is_even_consulted() {
        let (catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        // node-b is a replica. Even if it somehow held a lease, the catalog
        // ownership gate refuses it first.
        let owner = LeasedOwner::with_lease(lease_for(&orders, "CN=node-b", 1_000));
        let err = admit_durable_write(
            &catalog,
            &owner,
            &ident("CN=node-b"),
            &orders,
            b"k",
            SupervisorTerm::genesis(),
            500,
        )
        .unwrap_err();
        match err {
            DurableWriteReject::NotOwner { role, owner, .. } => {
                assert_eq!(role, RangeRole::Replica);
                assert_eq!(owner, ident("CN=node-a"));
            }
            other => panic!("expected NotOwner, got {other:?}"),
        }
    }

    #[test]
    fn durable_write_rejected_when_no_range_covers_the_key() {
        let catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        let owner = LeasedOwner::with_lease(lease_for(&orders, "CN=node-a", 1_000));
        let err = admit_durable_write(
            &catalog,
            &owner,
            &ident("CN=node-a"),
            &orders,
            b"k",
            SupervisorTerm::genesis(),
            500,
        )
        .unwrap_err();
        assert!(matches!(err, DurableWriteReject::NoRange { .. }));
    }

    #[test]
    fn durable_write_fenced_when_lease_epoch_trails_the_catalog() {
        // The catalog moves ownership a -> b -> a, so the live epoch is 3, but
        // node-a still holds its original epoch-1 lease. Catalog ownership matches
        // (node-a is owner again) yet the stale lease epoch fences the write.
        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        let stale_lease = lease_for(&orders, "CN=node-a", 100_000);

        let v1 = catalog.range(&orders, RangeId::new(1)).unwrap().clone();
        let v2 = v1.transfer_to(ident("CN=node-b"), [ident("CN=node-a")]);
        catalog.apply_update(v2.clone()).unwrap();
        let v3 = v2.transfer_to(ident("CN=node-a"), [ident("CN=node-b")]);
        catalog.apply_update(v3).unwrap();

        let owner = LeasedOwner::with_lease(stale_lease);
        let current_epoch = catalog.range(&orders, RangeId::new(1)).unwrap().epoch();
        assert_eq!(current_epoch.value(), 3);

        let err = admit_durable_write(
            &catalog,
            &owner,
            &ident("CN=node-a"),
            &orders,
            b"k",
            SupervisorTerm::genesis(),
            500,
        )
        .unwrap_err();
        match err {
            DurableWriteReject::Fenced {
                reason: FenceReason::EpochSuperseded { lease_epoch, .. },
                ..
            } => assert_eq!(lease_epoch, OwnershipEpoch::initial()),
            other => panic!("expected Fenced(EpochSuperseded), got {other:?}"),
        }
    }

    #[test]
    fn durable_write_fenced_when_lease_is_for_a_different_range() {
        let (catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        // node-a is the owner of range 1, but its lease names range 2.
        let wrong_range_lease = OwnershipLease::grant(
            SupervisorTerm::genesis(),
            orders.clone(),
            RangeId::new(2),
            ident("CN=node-a"),
            OwnershipEpoch::initial(),
            0,
            1_000,
        );
        let owner = LeasedOwner::with_lease(wrong_range_lease);
        let err = admit_durable_write(
            &catalog,
            &owner,
            &ident("CN=node-a"),
            &orders,
            b"k",
            SupervisorTerm::genesis(),
            500,
        )
        .unwrap_err();
        // A lease that does not cover this range is no authority for it.
        match err {
            DurableWriteReject::Fenced { reason, .. } => assert_eq!(reason, FenceReason::Unleased),
            other => panic!("expected Fenced(Unleased), got {other:?}"),
        }
    }

    #[test]
    fn durable_write_rejected_after_self_fence_then_restored_on_renewal() {
        // End-to-end: a leased owner writes, its lease lapses (fenced), and a
        // renewal restores durable writes. The lease, not catalog ownership, is
        // what gates the write here.
        let (catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        let mut owner = LeasedOwner::with_lease(lease_for(&orders, "CN=node-a", 1_000));
        let term = SupervisorTerm::genesis();

        // t=500: valid.
        assert!(admit_durable_write(
            &catalog,
            &owner,
            &ident("CN=node-a"),
            &orders,
            b"k",
            term,
            500
        )
        .is_ok());
        // t=2_000: lapsed -> fenced.
        let err = admit_durable_write(
            &catalog,
            &owner,
            &ident("CN=node-a"),
            &orders,
            b"k",
            term,
            2_000,
        )
        .unwrap_err();
        assert!(matches!(
            err,
            DurableWriteReject::Fenced {
                reason: FenceReason::Expired { .. },
                ..
            }
        ));
        // Renew under the same term/epoch from t=2_000: durable writes resume.
        owner.grant(OwnershipLease::grant(
            term,
            orders.clone(),
            RangeId::new(1),
            ident("CN=node-a"),
            OwnershipEpoch::initial(),
            2_000,
            1_000,
        ));
        assert!(admit_durable_write(
            &catalog,
            &owner,
            &ident("CN=node-a"),
            &orders,
            b"k",
            term,
            2_500
        )
        .is_ok());
    }
}