Skip to main content

reddb_server/cluster/
ownership_transition.rs

1//! Ownership transition state machine for promote and fenced handoff
2//! (issue #995, PRD #987, ADR 0037).
3//!
4//! [`ShardOwnershipCatalog`] (issue #989) is the source of truth for *where* a
5//! range lives; this module is the only sanctioned way to *change* that. Per
6//! ADR 0037, "ownership changes are transitions, not arbitrary row edits" and
7//! "rebalancing, failover, and administrative recovery become catalog
8//! transitions" — so failover-promote and move-range handoff both funnel through
9//! the one fenced, versioned, audited machine here.
10//!
11//! ## Why a state machine rather than a `transfer_to` call
12//!
13//! [`RangeOwnership::transfer_to`] already bumps the epoch and version, but it is
14//! an *unconditional* edit: hand it any owner and it produces a new entry. That
15//! is unsafe as a control-plane operation, because a transition that races a
16//! concurrent one, names the wrong current owner, or activates a candidate that
17//! has not caught up to the range commit watermark must **fail closed**, not
18//! silently move authority. This machine adds the compare-and-swap and the safety
19//! gate around that edit:
20//!
21//! 1. **Prepare** ([`prepare`]) — a pure check against the catalog. The request
22//!    must name the *expected current owner*, *expected epoch*, and *expected
23//!    catalog version* (a three-part CAS, so a stale planner loses), a *valid
24//!    target candidate* (a current replica of the range — only a replica that
25//!    covers the range commit watermark may be promoted, per the glossary), and
26//!    *safety evidence* that the candidate's applied log covers the range commit
27//!    watermark. Any failure yields a [`TransitionRejection`] and the catalog is
28//!    never touched.
29//! 2. **Activate** ([`PreparedTransition::activate`]) — only reachable *after*
30//!    prepare succeeds, this applies the fenced transition to the catalog. The
//!    epoch bump in the new entry is what fences the old owner: from this point
//!    its writes carry a stale epoch and
//!    [`ShardOwnershipCatalog::admit_public_write`] rejects them.
33//!
34//! Splitting prepare from activate is the literal encoding of the acceptance
35//! criterion "activate new owners only after safety checks": you cannot obtain a
36//! [`PreparedTransition`] without passing every check, and activation is a
37//! distinct, explicit second step.
38//!
39//! ## Promote vs. handoff
40//!
41//! Both kinds run the identical safety gate — the difference is *intent and
42//! audit*, recorded in [`TransitionKind`]:
43//!
44//! * [`Promote`](TransitionKind::Promote) — failover. The recorded owner is gone
45//!   (or being deposed); a caught-up replica takes authority. The old owner is
46//!   fenced by the epoch bump.
47//! * [`Handoff`](TransitionKind::Handoff) — move-range cutover. The current owner
48//!   keeps serving until the target has copied and caught up; only then does this
49//!   transition move the epoch, fencing the old owner at the cutover instant.
50//!
51//! Forced (disaster-recovery) transitions, which may proceed without ordinary
52//! safety checks, are out of scope here: ADR 0037 reserves them for a separate
53//! `FORCE` capability path, implemented in
54//! [`ownership_force`](super::ownership_force).
55//!
56//! Everything is a pure data model over the catalog, with no I/O, so the CAS,
57//! fencing, and audit story is exercised deterministically.
58
59use super::identity::NodeIdentity;
60use super::ownership::{
61    CatalogError, CatalogVersion, CollectionId, OwnershipEpoch, RangeId, RangeOwnership,
62    ShardOwnershipCatalog,
63};
64
/// The *range commit watermark*: the highest `(term, lsn)` pair known to be
/// durable for a range under its commit policy. The glossary states that
/// "failover and interrupted move-range recovery may promote only a candidate
/// whose log covers this watermark", which makes it the threshold every
/// transition's safety evidence has to meet.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct CommitWatermark {
    /// Owning term recorded at the watermark. A candidate still on an earlier
    /// term has not seen the newest authority and is not promotable.
    pub term: u64,
    /// Highest WAL LSN that is durable for the range.
    pub lsn: u64,
}

impl CommitWatermark {
    /// Builds a watermark from its `term` and `lsn` components.
    pub fn new(term: u64, lsn: u64) -> Self {
        CommitWatermark { lsn, term }
    }
}
83
84/// Evidence that a candidate has caught up enough to take ownership safely: the
85/// `(term, lsn)` its log has durably applied **for the range**. The supervisor
86/// collects this from the candidate's per-range stream progress (issue #992)
87/// before requesting a transition; the machine admits the candidate only if this
88/// covers the [`CommitWatermark`].
89#[derive(Debug, Clone, PartialEq, Eq)]
90pub struct CatchUpEvidence {
91    /// The node this evidence describes. Must match the transition target, so a
92    /// planner cannot present one node's progress to promote another.
93    pub candidate: NodeIdentity,
94    /// The highest term the candidate has applied for the range.
95    pub applied_term: u64,
96    /// The highest WAL LSN the candidate has applied for the range.
97    pub applied_lsn: u64,
98}
99
100impl CatchUpEvidence {
101    pub fn new(candidate: NodeIdentity, applied_term: u64, applied_lsn: u64) -> Self {
102        Self {
103            candidate,
104            applied_term,
105            applied_lsn,
106        }
107    }
108
109    /// Does this evidence cover the watermark? The candidate must be on at least
110    /// the watermark term **and**, on that term, have applied at least its LSN. A
111    /// candidate behind on either axis is fenced out of promotion.
112    pub fn covers(&self, watermark: CommitWatermark) -> bool {
113        self.applied_term > watermark.term
114            || (self.applied_term == watermark.term && self.applied_lsn >= watermark.lsn)
115    }
116}
117
/// Distinguishes a failover promote from a move-range handoff. The safety gate
/// is identical for both; the kind is kept for intent and for the audit trail.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransitionKind {
    /// Failover promote: a caught-up replica assumes authority from an owner
    /// that is gone or being deposed.
    Promote,
    /// Move-range fenced handoff: authority cuts over from a live owner to a
    /// caught-up target.
    Handoff,
}

impl TransitionKind {
    /// Lowercase name of the kind, used when rendering audit output.
    fn label(self) -> &'static str {
        match self {
            Self::Handoff => "handoff",
            Self::Promote => "promote",
        }
    }
}
138
139/// A request to move ownership of one range. Carries the three-part CAS
140/// (expected owner / epoch / catalog version), the target candidate, the range
141/// commit watermark the candidate must cover, and the candidate's catch-up
142/// evidence. Built with [`TransitionRequest::new`]; the replica set the new owner
143/// will carry defaults to empty and is set with
144/// [`with_replicas`](TransitionRequest::with_replicas).
145#[derive(Debug, Clone, PartialEq, Eq)]
146pub struct TransitionRequest {
147    kind: TransitionKind,
148    collection: CollectionId,
149    range_id: RangeId,
150    expected_owner: NodeIdentity,
151    expected_epoch: OwnershipEpoch,
152    expected_version: CatalogVersion,
153    target: NodeIdentity,
154    watermark: CommitWatermark,
155    evidence: Option<CatchUpEvidence>,
156    new_replicas: Vec<NodeIdentity>,
157}
158
159impl TransitionRequest {
160    /// A transition request with no safety evidence yet and an empty post-cutover
161    /// replica set. Evidence must be attached with
162    /// [`with_evidence`](Self::with_evidence) before the transition can be
163    /// admitted — a request without it fails closed
164    /// ([`MissingSafetyEvidence`](TransitionRejection::MissingSafetyEvidence)).
165    #[allow(clippy::too_many_arguments)]
166    pub fn new(
167        kind: TransitionKind,
168        collection: CollectionId,
169        range_id: RangeId,
170        expected_owner: NodeIdentity,
171        expected_epoch: OwnershipEpoch,
172        expected_version: CatalogVersion,
173        target: NodeIdentity,
174        watermark: CommitWatermark,
175    ) -> Self {
176        Self {
177            kind,
178            collection,
179            range_id,
180            expected_owner,
181            expected_epoch,
182            expected_version,
183            target,
184            watermark,
185            evidence: None,
186            new_replicas: Vec::new(),
187        }
188    }
189
190    /// Attach the candidate's catch-up evidence for the safety gate.
191    pub fn with_evidence(mut self, evidence: CatchUpEvidence) -> Self {
192        self.evidence = Some(evidence);
193        self
194    }
195
196    /// Set the replica set the new owner will carry after cutover. Defaults to
197    /// empty. A handoff that demotes the old owner to a replica passes it here.
198    pub fn with_replicas(mut self, replicas: impl IntoIterator<Item = NodeIdentity>) -> Self {
199        self.new_replicas = replicas.into_iter().collect();
200        self
201    }
202
203    pub fn kind(&self) -> TransitionKind {
204        self.kind
205    }
206
207    pub fn collection(&self) -> &CollectionId {
208        &self.collection
209    }
210
211    pub fn range_id(&self) -> RangeId {
212        self.range_id
213    }
214
215    pub fn target(&self) -> &NodeIdentity {
216        &self.target
217    }
218}
219
/// The reason a target candidate cannot take ownership.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum InvalidCandidateReason {
    /// The target is not a current replica of the range. Only a replica that
    /// has been receiving the range's stream can cover the commit watermark, so
    /// an arbitrary node is never a valid promotion target.
    NotAReplica,
    /// The target already owns the range. Moving ownership to the incumbent is
    /// a no-op and almost always a planner bug, so it is refused.
    AlreadyOwner,
}

impl InvalidCandidateReason {
    /// Human-readable explanation, embedded in rejection messages.
    fn label(self) -> &'static str {
        match self {
            Self::AlreadyOwner => "candidate is already the current owner",
            Self::NotAReplica => "candidate is not a replica of the range",
        }
    }
}
240
/// Why an ownership transition was refused by [`prepare`]. Every variant leaves
/// the catalog untouched — transitions fail closed. `prepare` only ever reads
/// the catalog, so a rejection never follows a partial write.
///
/// Every variant records the `(collection, range_id)` the request targeted, so
/// a rejection can be reported without the original request at hand.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TransitionRejection {
    /// No range with this `(collection, range_id)` exists in the catalog.
    UnknownRange {
        collection: CollectionId,
        range_id: RangeId,
    },
    /// The request's expected current owner does not match the catalog. The
    /// planner is working from a stale view of who owns the range.
    OwnerMismatch {
        collection: CollectionId,
        range_id: RangeId,
        /// Owner named in the request.
        expected: NodeIdentity,
        /// Owner recorded in the catalog.
        current: NodeIdentity,
    },
    /// The request's expected ownership epoch does not match the catalog —
    /// authority has already moved since the planner read it.
    StaleEpoch {
        collection: CollectionId,
        range_id: RangeId,
        /// Epoch named in the request.
        expected: OwnershipEpoch,
        /// Epoch recorded in the catalog.
        current: OwnershipEpoch,
    },
    /// The request's expected catalog version does not match the catalog — the
    /// entry has been edited since the planner read it (CAS failure).
    StaleCatalogVersion {
        collection: CollectionId,
        range_id: RangeId,
        /// Catalog version named in the request.
        expected: CatalogVersion,
        /// Catalog version recorded for the entry.
        current: CatalogVersion,
    },
    /// The target candidate is not eligible to take ownership; `reason` says
    /// which eligibility rule it broke.
    InvalidCandidate {
        collection: CollectionId,
        range_id: RangeId,
        candidate: NodeIdentity,
        reason: InvalidCandidateReason,
    },
    /// No safety evidence was supplied for the candidate, so the safety gate
    /// cannot be evaluated — fail closed.
    MissingSafetyEvidence {
        collection: CollectionId,
        range_id: RangeId,
        candidate: NodeIdentity,
    },
    /// Safety evidence was supplied but describes a different node than the
    /// target — it cannot vouch for the candidate being promoted.
    EvidenceForWrongCandidate {
        collection: CollectionId,
        range_id: RangeId,
        /// Node the request wants to promote.
        target: NodeIdentity,
        /// Node the supplied evidence actually describes.
        evidence_for: NodeIdentity,
    },
    /// The candidate's applied log does not cover the range commit watermark —
    /// promoting it could lose committed writes, so the transition is refused.
    SafetyCheckFailed {
        collection: CollectionId,
        range_id: RangeId,
        candidate: NodeIdentity,
        /// Watermark the candidate needed to cover.
        watermark: CommitWatermark,
        /// Term the candidate's evidence reported as applied.
        applied_term: u64,
        /// LSN the candidate's evidence reported as applied.
        applied_lsn: u64,
    },
}
307
308impl std::fmt::Display for TransitionRejection {
309    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
310        match self {
311            Self::UnknownRange {
312                collection,
313                range_id,
314            } => write!(f, "no range {collection}/{range_id} in the catalog"),
315            Self::OwnerMismatch {
316                collection,
317                range_id,
318                expected,
319                current,
320            } => write!(
321                f,
322                "ownership transition for {collection}/{range_id} expected current owner {expected}, but catalog owner is {current}"
323            ),
324            Self::StaleEpoch {
325                collection,
326                range_id,
327                expected,
328                current,
329            } => write!(
330                f,
331                "ownership transition for {collection}/{range_id} expected epoch {expected}, but catalog epoch is {current}"
332            ),
333            Self::StaleCatalogVersion {
334                collection,
335                range_id,
336                expected,
337                current,
338            } => write!(
339                f,
340                "ownership transition for {collection}/{range_id} expected catalog version {expected}, but catalog version is {current}"
341            ),
342            Self::InvalidCandidate {
343                collection,
344                range_id,
345                candidate,
346                reason,
347            } => write!(
348                f,
349                "invalid ownership transition candidate {candidate} for {collection}/{range_id}: {}",
350                reason.label()
351            ),
352            Self::MissingSafetyEvidence {
353                collection,
354                range_id,
355                candidate,
356            } => write!(
357                f,
358                "ownership transition for {collection}/{range_id} carries no safety evidence for candidate {candidate}"
359            ),
360            Self::EvidenceForWrongCandidate {
361                collection,
362                range_id,
363                target,
364                evidence_for,
365            } => write!(
366                f,
367                "ownership transition for {collection}/{range_id} targets {target} but its safety evidence describes {evidence_for}"
368            ),
369            Self::SafetyCheckFailed {
370                collection,
371                range_id,
372                candidate,
373                watermark,
374                applied_term,
375                applied_lsn,
376            } => write!(
377                f,
378                "candidate {candidate} for {collection}/{range_id} is behind the commit watermark (term {}, lsn {}): applied term {applied_term}, lsn {applied_lsn}",
379                watermark.term, watermark.lsn
380            ),
381        }
382    }
383}
384
385impl std::error::Error for TransitionRejection {}
386
387/// A validated, not-yet-applied ownership transition. Holding one is proof that
388/// every CAS and safety check passed; the only thing left is to
389/// [`activate`](Self::activate) it against the catalog. It carries the full
390/// before/after picture so it doubles as the audit record source.
391#[derive(Debug, Clone, PartialEq, Eq)]
392pub struct PreparedTransition {
393    kind: TransitionKind,
394    collection: CollectionId,
395    range_id: RangeId,
396    previous_owner: NodeIdentity,
397    new_owner: NodeIdentity,
398    previous_epoch: OwnershipEpoch,
399    previous_version: CatalogVersion,
400    watermark: CommitWatermark,
401    next: RangeOwnership,
402}
403
404impl PreparedTransition {
405    /// The new ownership entry that activation will install — epoch and version
406    /// already advanced past the current entry.
407    pub fn next_entry(&self) -> &RangeOwnership {
408        &self.next
409    }
410
411    /// The ownership epoch the new owner becomes authoritative under. Any write
412    /// the old owner attempts under the previous epoch is fenced once this is
413    /// installed.
414    pub fn new_epoch(&self) -> OwnershipEpoch {
415        self.next.epoch()
416    }
417
418    /// Apply the transition to the catalog, making the new owner authoritative
419    /// and fencing the old owner via the epoch bump. Returns the audit-ready
420    /// [`TransitionOutcome`]. Errors only on a catalog-level inconsistency
421    /// (e.g. the entry changed between prepare and activate so the version no
422    /// longer strictly advances) — the safety decision itself was already made.
423    pub fn activate(
424        self,
425        catalog: &mut ShardOwnershipCatalog,
426    ) -> Result<TransitionOutcome, CatalogError> {
427        let new_epoch = self.next.epoch();
428        let new_version = self.next.version();
429        catalog.apply_update(self.next)?;
430        Ok(TransitionOutcome {
431            kind: self.kind,
432            collection: self.collection,
433            range_id: self.range_id,
434            previous_owner: self.previous_owner,
435            new_owner: self.new_owner,
436            previous_epoch: self.previous_epoch,
437            new_epoch,
438            previous_version: self.previous_version,
439            new_version,
440            watermark: self.watermark,
441        })
442    }
443}
444
445/// The audit-ready record of an activated ownership transition. Every field a
446/// reviewer needs to reconstruct *what moved, from whom to whom, and across which
447/// epoch/version boundary* — the fenced before/after the ADR's audit requirement
448/// asks for.
449#[derive(Debug, Clone, PartialEq, Eq)]
450pub struct TransitionOutcome {
451    pub kind: TransitionKind,
452    pub collection: CollectionId,
453    pub range_id: RangeId,
454    pub previous_owner: NodeIdentity,
455    pub new_owner: NodeIdentity,
456    pub previous_epoch: OwnershipEpoch,
457    pub new_epoch: OwnershipEpoch,
458    pub previous_version: CatalogVersion,
459    pub new_version: CatalogVersion,
460    pub watermark: CommitWatermark,
461}
462
463impl TransitionOutcome {
464    /// Whether the epoch advanced — true for every accepted transition, since
465    /// moving write authority always fences the old owner. A handy invariant for
466    /// audit assertions.
467    pub fn fenced_old_owner(&self) -> bool {
468        self.new_epoch > self.previous_epoch
469    }
470}
471
472impl std::fmt::Display for TransitionOutcome {
473    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
474        write!(
475            f,
476            "{} {}/{}: {} (epoch {}, version {}) -> {} (epoch {}, version {}) over watermark term {} lsn {}",
477            self.kind.label(),
478            self.collection,
479            self.range_id,
480            self.previous_owner,
481            self.previous_epoch,
482            self.previous_version,
483            self.new_owner,
484            self.new_epoch,
485            self.new_version,
486            self.watermark.term,
487            self.watermark.lsn,
488        )
489    }
490}
491
492/// Validate `request` against `catalog` without mutating anything. On success
493/// returns a [`PreparedTransition`] ready to [`activate`]; on any failed CAS or
494/// safety check returns a [`TransitionRejection`] and leaves the catalog
495/// untouched.
496///
497/// Checks run in fail-closed order: the range must exist; the expected owner,
498/// epoch, and catalog version must all match (CAS); the target must be a current
499/// replica that is not already the owner; and the candidate's safety evidence
500/// must be present, describe the target, and cover the commit watermark.
501///
502/// [`activate`]: PreparedTransition::activate
503pub fn prepare(
504    catalog: &ShardOwnershipCatalog,
505    request: &TransitionRequest,
506) -> Result<PreparedTransition, TransitionRejection> {
507    let current = catalog.range(&request.collection, request.range_id).ok_or(
508        TransitionRejection::UnknownRange {
509            collection: request.collection.clone(),
510            range_id: request.range_id,
511        },
512    )?;
513
514    // Three-part compare-and-swap: a planner working from a stale catalog view
515    // loses on whichever axis drifted first.
516    if *current.owner() != request.expected_owner {
517        return Err(TransitionRejection::OwnerMismatch {
518            collection: request.collection.clone(),
519            range_id: request.range_id,
520            expected: request.expected_owner.clone(),
521            current: current.owner().clone(),
522        });
523    }
524    if current.epoch() != request.expected_epoch {
525        return Err(TransitionRejection::StaleEpoch {
526            collection: request.collection.clone(),
527            range_id: request.range_id,
528            expected: request.expected_epoch,
529            current: current.epoch(),
530        });
531    }
532    if current.version() != request.expected_version {
533        return Err(TransitionRejection::StaleCatalogVersion {
534            collection: request.collection.clone(),
535            range_id: request.range_id,
536            expected: request.expected_version,
537            current: current.version(),
538        });
539    }
540
541    // Candidate eligibility: a valid target is a current replica of the range
542    // and not the incumbent owner.
543    if request.target == *current.owner() {
544        return Err(TransitionRejection::InvalidCandidate {
545            collection: request.collection.clone(),
546            range_id: request.range_id,
547            candidate: request.target.clone(),
548            reason: InvalidCandidateReason::AlreadyOwner,
549        });
550    }
551    if !current.replicas().contains(&request.target) {
552        return Err(TransitionRejection::InvalidCandidate {
553            collection: request.collection.clone(),
554            range_id: request.range_id,
555            candidate: request.target.clone(),
556            reason: InvalidCandidateReason::NotAReplica,
557        });
558    }
559
560    // Safety gate: evidence must exist, vouch for the target, and cover the
561    // range commit watermark.
562    let evidence =
563        request
564            .evidence
565            .as_ref()
566            .ok_or_else(|| TransitionRejection::MissingSafetyEvidence {
567                collection: request.collection.clone(),
568                range_id: request.range_id,
569                candidate: request.target.clone(),
570            })?;
571    if evidence.candidate != request.target {
572        return Err(TransitionRejection::EvidenceForWrongCandidate {
573            collection: request.collection.clone(),
574            range_id: request.range_id,
575            target: request.target.clone(),
576            evidence_for: evidence.candidate.clone(),
577        });
578    }
579    if !evidence.covers(request.watermark) {
580        return Err(TransitionRejection::SafetyCheckFailed {
581            collection: request.collection.clone(),
582            range_id: request.range_id,
583            candidate: request.target.clone(),
584            watermark: request.watermark,
585            applied_term: evidence.applied_term,
586            applied_lsn: evidence.applied_lsn,
587        });
588    }
589
590    // All checks passed — build the fenced transition (epoch + version bumped).
591    let next = current.transfer_to(request.target.clone(), request.new_replicas.clone());
592    Ok(PreparedTransition {
593        kind: request.kind,
594        collection: request.collection.clone(),
595        range_id: request.range_id,
596        previous_owner: current.owner().clone(),
597        new_owner: request.target.clone(),
598        previous_epoch: current.epoch(),
599        previous_version: current.version(),
600        watermark: request.watermark,
601        next,
602    })
603}
604
605/// Prepare and activate a transition in one step — the common path when the
606/// caller does not need to inspect the [`PreparedTransition`] between the safety
607/// check and the catalog write. A [`TransitionRejection`] from prepare is mapped
608/// into the returned error.
609pub fn run_transition(
610    catalog: &mut ShardOwnershipCatalog,
611    request: &TransitionRequest,
612) -> Result<TransitionOutcome, TransitionError> {
613    let prepared = prepare(catalog, request)?;
614    prepared.activate(catalog).map_err(TransitionError::Catalog)
615}
616
617/// The error of an end-to-end [`run_transition`]: either the safety gate refused
618/// the transition, or the catalog rejected the activation write.
619#[derive(Debug, Clone, PartialEq, Eq)]
620pub enum TransitionError {
621    /// A CAS or safety check failed during prepare.
622    Rejected(TransitionRejection),
623    /// The catalog refused the activation write.
624    Catalog(CatalogError),
625}
626
627impl From<TransitionRejection> for TransitionError {
628    fn from(value: TransitionRejection) -> Self {
629        TransitionError::Rejected(value)
630    }
631}
632
633impl std::fmt::Display for TransitionError {
634    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
635        match self {
636            Self::Rejected(err) => write!(f, "{err}"),
637            Self::Catalog(err) => write!(f, "{err}"),
638        }
639    }
640}
641
642impl std::error::Error for TransitionError {}
643
644#[cfg(test)]
645mod tests {
646    use super::*;
647    use crate::cluster::ownership::{
648        OwnershipEpoch, PlacementMetadata, RangeBounds, RangeRole, RangeWriteReject, ShardKeyMode,
649    };
650
651    fn collection(name: &str) -> CollectionId {
652        CollectionId::new(name).unwrap()
653    }
654
655    fn ident(cn: &str) -> NodeIdentity {
656        NodeIdentity::from_certificate_subject(cn).unwrap()
657    }
658
659    /// A catalog holding one full-keyspace range owned by `owner` with `replicas`.
660    fn catalog_with(owner: &str, replicas: &[&str]) -> (ShardOwnershipCatalog, CollectionId) {
661        let orders = collection("orders");
662        let mut catalog = ShardOwnershipCatalog::new();
663        catalog
664            .apply_update(RangeOwnership::establish(
665                orders.clone(),
666                RangeId::new(1),
667                ShardKeyMode::Hash,
668                RangeBounds::full(),
669                ident(owner),
670                replicas.iter().map(|r| ident(r)).collect::<Vec<_>>(),
671                PlacementMetadata::with_replication_factor(3),
672            ))
673            .unwrap();
674        (catalog, orders)
675    }
676
677    /// A request that, by default, names the catalog's current owner/epoch/version
678    /// and a watermark/evidence the candidate covers.
679    fn request(
680        kind: TransitionKind,
681        orders: &CollectionId,
682        expected_owner: &str,
683        target: &str,
684    ) -> TransitionRequest {
685        TransitionRequest::new(
686            kind,
687            orders.clone(),
688            RangeId::new(1),
689            ident(expected_owner),
690            OwnershipEpoch::initial(),
691            CatalogVersion::initial(),
692            ident(target),
693            CommitWatermark::new(1, 10),
694        )
695        .with_evidence(CatchUpEvidence::new(ident(target), 1, 10))
696    }
697
698    #[test]
699    fn successful_promote_moves_authority_and_bumps_epoch() {
700        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
701        let req = request(TransitionKind::Promote, &orders, "CN=node-a", "CN=node-b");
702
703        let outcome = run_transition(&mut catalog, &req).expect("promote should succeed");
704
705        assert_eq!(outcome.kind, TransitionKind::Promote);
706        assert_eq!(outcome.previous_owner, ident("CN=node-a"));
707        assert_eq!(outcome.new_owner, ident("CN=node-b"));
708        assert_eq!(outcome.previous_epoch, OwnershipEpoch::initial());
709        assert_eq!(outcome.new_epoch.value(), 2);
710        assert_eq!(outcome.new_version.value(), 2);
711        assert!(outcome.fenced_old_owner());
712
713        // The catalog now makes node-b authoritative for the range.
714        let range = catalog.range(&orders, RangeId::new(1)).unwrap();
715        assert_eq!(range.owner(), &ident("CN=node-b"));
716        assert_eq!(range.epoch().value(), 2);
717        // Audit string mentions both owners and the kind.
718        let audit = outcome.to_string();
719        assert!(audit.contains("promote"));
720        assert!(audit.contains("CN=node-a"));
721        assert!(audit.contains("CN=node-b"));
722    }
723
724    #[test]
725    fn successful_handoff_demotes_old_owner_to_replica() {
726        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
727        // Move-range cutover: hand off to node-b, keep node-a as a replica.
728        let req = request(TransitionKind::Handoff, &orders, "CN=node-a", "CN=node-b")
729            .with_replicas([ident("CN=node-a")]);
730
731        let outcome = run_transition(&mut catalog, &req).expect("handoff should succeed");
732        assert_eq!(outcome.kind, TransitionKind::Handoff);
733
734        let range = catalog.range(&orders, RangeId::new(1)).unwrap();
735        assert_eq!(range.owner(), &ident("CN=node-b"));
736        assert_eq!(range.role_of(&ident("CN=node-a")), RangeRole::Replica);
737        assert_eq!(range.role_of(&ident("CN=node-b")), RangeRole::Owner);
738    }
739
740    #[test]
741    fn old_owner_is_fenced_from_durable_writes_after_transition() {
742        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
743        // Before the transition node-a is admitted at the initial epoch.
744        assert!(catalog
745            .admit_public_write(
746                &ident("CN=node-a"),
747                &orders,
748                b"k",
749                OwnershipEpoch::initial()
750            )
751            .is_ok());
752
753        let req = request(TransitionKind::Promote, &orders, "CN=node-a", "CN=node-b")
754            .with_replicas([ident("CN=node-a")]);
755        run_transition(&mut catalog, &req).unwrap();
756
757        // node-a, now a replica at the *old* epoch, is fenced: it is no longer the
758        // owner, so a public durable write is rejected.
759        let err = catalog
760            .admit_public_write(
761                &ident("CN=node-a"),
762                &orders,
763                b"k",
764                OwnershipEpoch::initial(),
765            )
766            .unwrap_err();
767        assert!(matches!(err, RangeWriteReject::NotOwner { .. }));
768
769        // Even if node-a still believed it was owner, its old epoch is stale.
770        // node-b at the new epoch is the one admitted.
771        assert!(catalog
772            .admit_public_write(
773                &ident("CN=node-b"),
774                &orders,
775                b"k",
776                catalog.range(&orders, RangeId::new(1)).unwrap().epoch()
777            )
778            .is_ok());
779    }
780
781    #[test]
782    fn prepare_does_not_mutate_catalog() {
783        let (catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
784        let req = request(TransitionKind::Promote, &orders, "CN=node-a", "CN=node-b");
785        let _prepared = prepare(&catalog, &req).expect("prepare ok");
786        // Catalog still has node-a as owner at the initial epoch — activation is a
787        // separate step.
788        let range = catalog.range(&orders, RangeId::new(1)).unwrap();
789        assert_eq!(range.owner(), &ident("CN=node-a"));
790        assert_eq!(range.epoch(), OwnershipEpoch::initial());
791    }
792
793    /// The version/epoch a single unapplied `transfer_to` would advance the
794    /// current range to — used to obtain non-initial values without the private
795    /// `next()` constructor.
796    fn bumped(catalog: &ShardOwnershipCatalog, orders: &CollectionId) -> RangeOwnership {
797        catalog
798            .range(orders, RangeId::new(1))
799            .unwrap()
800            .transfer_to(ident("CN=tmp"), [])
801    }
802
803    #[test]
804    fn stale_catalog_version_is_rejected() {
805        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
806        // Advance the entry to version 2 without moving the owner or epoch
807        // (a replica-set change), so only the planner's version is stale.
808        let v2_entry = catalog
809            .range(&orders, RangeId::new(1))
810            .unwrap()
811            .update_replicas([ident("CN=node-b")]);
812        catalog.apply_update(v2_entry).unwrap();
813
814        let req = request(TransitionKind::Promote, &orders, "CN=node-a", "CN=node-b");
815        // The default request still carries the initial (stale) catalog version.
816        let err = prepare(&catalog, &req).unwrap_err();
817        match err {
818            TransitionRejection::StaleCatalogVersion {
819                expected, current, ..
820            } => {
821                assert_eq!(expected, CatalogVersion::initial());
822                assert_eq!(current.value(), 2);
823            }
824            other => panic!("expected StaleCatalogVersion, got {other:?}"),
825        }
826    }
827
828    #[test]
829    fn stale_expected_owner_is_rejected() {
830        let (catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
831        // Planner believes node-x owns the range.
832        let req = request(TransitionKind::Promote, &orders, "CN=node-x", "CN=node-b");
833        let err = prepare(&catalog, &req).unwrap_err();
834        match err {
835            TransitionRejection::OwnerMismatch {
836                expected, current, ..
837            } => {
838                assert_eq!(expected, ident("CN=node-x"));
839                assert_eq!(current, ident("CN=node-a"));
840            }
841            other => panic!("expected OwnerMismatch, got {other:?}"),
842        }
843    }
844
845    #[test]
846    fn stale_expected_epoch_is_rejected() {
847        let (catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
848        // A non-initial epoch value (2), obtained without the private `next()`.
849        let wrong_epoch = bumped(&catalog, &orders).epoch();
850        assert_eq!(wrong_epoch.value(), 2);
851        let req = TransitionRequest::new(
852            TransitionKind::Promote,
853            orders.clone(),
854            RangeId::new(1),
855            ident("CN=node-a"),
856            wrong_epoch,
857            CatalogVersion::initial(),
858            ident("CN=node-b"),
859            CommitWatermark::new(1, 10),
860        )
861        .with_evidence(CatchUpEvidence::new(ident("CN=node-b"), 1, 10));
862        let err = prepare(&catalog, &req).unwrap_err();
863        assert!(matches!(err, TransitionRejection::StaleEpoch { .. }));
864    }
865
866    #[test]
867    fn invalid_candidate_not_a_replica_is_rejected() {
868        let (catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
869        // node-z holds no copy of the range.
870        let req = request(TransitionKind::Promote, &orders, "CN=node-a", "CN=node-z");
871        let err = prepare(&catalog, &req).unwrap_err();
872        match err {
873            TransitionRejection::InvalidCandidate { reason, .. } => {
874                assert_eq!(reason, InvalidCandidateReason::NotAReplica);
875            }
876            other => panic!("expected InvalidCandidate(NotAReplica), got {other:?}"),
877        }
878    }
879
880    #[test]
881    fn invalid_candidate_already_owner_is_rejected() {
882        let (catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
883        // Targeting the incumbent owner is a no-op transition.
884        let req = request(TransitionKind::Promote, &orders, "CN=node-a", "CN=node-a")
885            .with_evidence(CatchUpEvidence::new(ident("CN=node-a"), 1, 10));
886        let err = prepare(&catalog, &req).unwrap_err();
887        match err {
888            TransitionRejection::InvalidCandidate { reason, .. } => {
889                assert_eq!(reason, InvalidCandidateReason::AlreadyOwner);
890            }
891            other => panic!("expected InvalidCandidate(AlreadyOwner), got {other:?}"),
892        }
893    }
894
895    #[test]
896    fn missing_safety_evidence_fails_closed() {
897        let (catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
898        let req = TransitionRequest::new(
899            TransitionKind::Promote,
900            orders.clone(),
901            RangeId::new(1),
902            ident("CN=node-a"),
903            OwnershipEpoch::initial(),
904            CatalogVersion::initial(),
905            ident("CN=node-b"),
906            CommitWatermark::new(1, 10),
907        ); // no evidence attached
908        let err = prepare(&catalog, &req).unwrap_err();
909        assert!(matches!(
910            err,
911            TransitionRejection::MissingSafetyEvidence { .. }
912        ));
913    }
914
915    #[test]
916    fn evidence_for_a_different_candidate_is_rejected() {
917        let (catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
918        // Target node-b but present node-c's progress as the evidence.
919        let req = request(TransitionKind::Promote, &orders, "CN=node-a", "CN=node-b")
920            .with_evidence(CatchUpEvidence::new(ident("CN=node-c"), 9, 99));
921        let err = prepare(&catalog, &req).unwrap_err();
922        assert!(matches!(
923            err,
924            TransitionRejection::EvidenceForWrongCandidate { .. }
925        ));
926    }
927
928    #[test]
929    fn candidate_behind_commit_watermark_fails_safety_check() {
930        let (catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
931        // Watermark is term 2 / lsn 50, but the candidate only applied term 2 lsn 49.
932        let req = TransitionRequest::new(
933            TransitionKind::Promote,
934            orders.clone(),
935            RangeId::new(1),
936            ident("CN=node-a"),
937            OwnershipEpoch::initial(),
938            CatalogVersion::initial(),
939            ident("CN=node-b"),
940            CommitWatermark::new(2, 50),
941        )
942        .with_evidence(CatchUpEvidence::new(ident("CN=node-b"), 2, 49));
943        let err = prepare(&catalog, &req).unwrap_err();
944        match err {
945            TransitionRejection::SafetyCheckFailed {
946                applied_lsn,
947                watermark,
948                ..
949            } => {
950                assert_eq!(applied_lsn, 49);
951                assert_eq!(watermark, CommitWatermark::new(2, 50));
952            }
953            other => panic!("expected SafetyCheckFailed, got {other:?}"),
954        }
955    }
956
957    #[test]
958    fn candidate_on_older_term_fails_even_with_higher_lsn() {
959        let (catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
960        // A higher LSN on a stale term does not cover the watermark.
961        let req = TransitionRequest::new(
962            TransitionKind::Promote,
963            orders.clone(),
964            RangeId::new(1),
965            ident("CN=node-a"),
966            OwnershipEpoch::initial(),
967            CatalogVersion::initial(),
968            ident("CN=node-b"),
969            CommitWatermark::new(3, 10),
970        )
971        .with_evidence(CatchUpEvidence::new(ident("CN=node-b"), 2, 9999));
972        let err = prepare(&catalog, &req).unwrap_err();
973        assert!(matches!(err, TransitionRejection::SafetyCheckFailed { .. }));
974    }
975
976    #[test]
977    fn evidence_on_newer_term_covers_watermark() {
978        // A candidate ahead on term covers the watermark regardless of LSN.
979        let evidence = CatchUpEvidence::new(ident("CN=node-b"), 5, 0);
980        assert!(evidence.covers(CommitWatermark::new(4, 9999)));
981    }
982
983    #[test]
984    fn rejected_transition_leaves_catalog_unchanged() {
985        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
986        let req = request(TransitionKind::Promote, &orders, "CN=node-a", "CN=node-z");
987        assert!(run_transition(&mut catalog, &req).is_err());
988        // Ownership and epoch are exactly as before.
989        let range = catalog.range(&orders, RangeId::new(1)).unwrap();
990        assert_eq!(range.owner(), &ident("CN=node-a"));
991        assert_eq!(range.epoch(), OwnershipEpoch::initial());
992        assert_eq!(range.version(), CatalogVersion::initial());
993    }
994
995    #[test]
996    fn second_transition_with_stale_cas_loses() {
997        // Two planners race: the first promote wins and moves to v2/epoch2; the
998        // second, still holding the v1 CAS, is rejected.
999        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b", "CN=node-c"]);
1000        let first = request(TransitionKind::Promote, &orders, "CN=node-a", "CN=node-b");
1001        run_transition(&mut catalog, &first).unwrap();
1002
1003        // Second planner still thinks node-a owns it at the initial epoch/version.
1004        let second = request(TransitionKind::Promote, &orders, "CN=node-a", "CN=node-c");
1005        let err = run_transition(&mut catalog, &second).unwrap_err();
1006        assert!(matches!(
1007            err,
1008            TransitionError::Rejected(TransitionRejection::OwnerMismatch { .. })
1009        ));
1010    }
1011}