Skip to main content

reddb_server/cluster/
ownership_force.rs

1//! Forced ownership transitions for disaster recovery (issue #999, PRD #987,
2//! ADR 0037).
3//!
4//! The [ordinary transition machine](super::ownership_transition) moves range
5//! authority *safely*: it demands a three-part compare-and-swap on the current
6//! owner/epoch/version and proof that the candidate's applied log covers the range
7//! commit watermark. That is exactly right when the cluster is healthy — but a
8//! disaster (the owner and a quorum of replicas are simultaneously lost) can leave
9//! a range with *no* candidate that can satisfy those checks, and therefore no way
10//! to recover through the ordinary path. This module is the deliberately
11//! dangerous escape hatch the ADR reserves for that case.
12//!
13//! Per ADR 0037: *"Forced transitions are reserved for disaster recovery. Normal
14//! ownership transitions require the ordinary cluster safety checks. A `FORCE`
15//! transition may proceed without ordinary quorum only with a special
16//! administrative capability, explicit operator reason, durable audit evidence,
17//! and an ownership epoch bump that fences any old owner that later reappears."*
18//! Each of those four conditions is a structural part of this module:
19//!
20//! 1. **Distinct administrative capability.** A forced transition is authorised by
21//!    a [`ForceTransitionCapability`], a privilege *separate* from the authority to
22//!    run an ordinary transition. The ordinary [`run_transition`] path never
23//!    consults it, and this path refuses outright
24//!    ([`ForceDenial::MissingCapability`]) when it is absent — there is no way to
25//!    force a transition by accident.
26//! 2. **Explicit operator reason.** The operator must attach a non-empty
27//!    [`OperatorReason`]; a forced transition with no stated justification is
28//!    refused ([`ForceDenial::MissingReason`]). The reason is recorded in the audit
29//!    evidence so a later reviewer learns *why* quorum was bypassed.
30//! 3. **Durable audit evidence for every attempt.** [`force_transition`] *always*
31//!    returns a [`ForcedTransitionAudit`] — for allowed, denied, *and* failed
32//!    attempts alike. A privileged operation that can bypass quorum must leave a
33//!    trail whether or not it succeeded, so the evidence is the function's return
34//!    value, not a side effect a caller can forget to capture.
35//! 4. **Epoch bump that fences the old owner.** A successful force installs a new
36//!    catalog entry via [`RangeOwnership::transfer_to`], which bumps the ownership
37//!    epoch. From that instant the old owner's epoch is stale: should it reappear
38//!    (the partition that "killed" it heals), [`admit_public_write`] rejects its
39//!    writes exactly as it would after an ordinary transition.
40//!
41//! ## What force bypasses — and what it does not
42//!
43//! Force exists *because* ordinary safety cannot be met, so it skips the CAS, the
44//! catch-up safety evidence, and the replica-membership check: the operator may
45//! install **any** target node, even one the catalog does not list as a replica,
46//! because in a true disaster the surviving copy may be exactly such a node. What
47//! force does **not** skip is the catalog's own structural integrity: the range
48//! must exist, and the epoch/version still advance monotonically through
49//! [`apply_update`]. A force against an unknown range is a
50//! [`ForceFailure::UnknownRange`], audited like any other failed attempt.
51//!
52//! Like its siblings this module is a pure, deterministic data model over the
53//! catalog — time (`now_ms`) is passed in for the audit timestamp rather than read
54//! from a clock — so the capability, reason, fencing, and audit story is exercised
55//! without any I/O.
56//!
57//! [`run_transition`]: super::ownership_transition::run_transition
58//! [`admit_public_write`]: super::ownership::ShardOwnershipCatalog::admit_public_write
59
60use super::identity::NodeIdentity;
61use super::ownership::{
62    CatalogError, CatalogVersion, CollectionId, OwnershipEpoch, RangeId, RangeOwnership,
63    ShardOwnershipCatalog,
64};
65
66/// A distinct administrative capability authorising forced ownership transitions.
67///
68/// This is the *special administrative capability* ADR 0037 requires for a `FORCE`
69/// transition — deliberately a separate privilege from ordinary transition
70/// authority, so that holding the power to rebalance or fail over does **not**
71/// confer the power to bypass quorum. Possessing one is the operator's proof of
72/// that privilege; it names the operator principal so the audit trail records *who*
73/// forced the transition.
74#[derive(Debug, Clone, PartialEq, Eq)]
75pub struct ForceTransitionCapability {
76    operator: NodeIdentity,
77}
78
79impl ForceTransitionCapability {
80    /// A capability granted to `operator` — the principal that will be recorded as
81    /// having exercised it in the audit evidence.
82    pub fn granted_to(operator: NodeIdentity) -> Self {
83        Self { operator }
84    }
85
86    /// The operator principal this capability was granted to.
87    pub fn operator(&self) -> &NodeIdentity {
88        &self.operator
89    }
90}
91
/// An operator-written justification for a forced transition that is guaranteed
/// to be non-blank.
///
/// ADR 0037 asks for an *explicit operator reason*. This newtype is what makes
/// "explicit" checkable: the only constructor rejects blank text, so a forced
/// transition either carries real words or is refused for lacking them. The text
/// is stored with surrounding whitespace removed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OperatorReason(String);

/// Error returned when the supplied operator reason was empty or consisted only
/// of whitespace — that is not an explicit justification, so it cannot authorise
/// a forced transition.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EmptyOperatorReason;

impl std::fmt::Display for EmptyOperatorReason {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("forced transition operator reason must not be empty")
    }
}

impl std::error::Error for EmptyOperatorReason {}

impl OperatorReason {
    /// Construct a reason from `text`.
    ///
    /// Leading and trailing whitespace is stripped before the value is stored.
    ///
    /// # Errors
    ///
    /// Returns [`EmptyOperatorReason`] when nothing remains after trimming, i.e.
    /// the input was empty or whitespace-only.
    pub fn new(text: impl Into<String>) -> Result<Self, EmptyOperatorReason> {
        let raw: String = text.into();
        match raw.trim() {
            "" => Err(EmptyOperatorReason),
            justification => Ok(Self(justification.to_owned())),
        }
    }

    /// The trimmed justification text.
    pub fn as_str(&self) -> &str {
        self.0.as_str()
    }
}

impl std::fmt::Display for OperatorReason {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.as_str())
    }
}
136
137/// A request to force ownership of one range to `target` for disaster recovery.
138///
139/// Unlike a [`TransitionRequest`](super::ownership_transition::TransitionRequest)
140/// it carries no CAS expectations and no catch-up evidence — force exists precisely
141/// for the case where those cannot be satisfied. What it carries instead are the
142/// two authorisation inputs ADR 0037 requires: an optional
143/// [`ForceTransitionCapability`] and an optional [`OperatorReason`]. They are
144/// *optional* on the request so the authorisation gate can observe — and audit —
145/// their absence; [`force_transition`] refuses any request missing either.
146#[derive(Debug, Clone, PartialEq, Eq)]
147pub struct ForcedTransitionRequest {
148    collection: CollectionId,
149    range_id: RangeId,
150    target: NodeIdentity,
151    new_replicas: Vec<NodeIdentity>,
152    capability: Option<ForceTransitionCapability>,
153    reason: Option<OperatorReason>,
154}
155
156impl ForcedTransitionRequest {
157    /// A forced-transition request for `(collection, range_id)` installing `target`
158    /// as the new owner, with **no** capability or reason attached yet. As built it
159    /// will be denied; attach authorisation with
160    /// [`with_capability`](Self::with_capability) and [`with_reason`](Self::with_reason).
161    pub fn new(collection: CollectionId, range_id: RangeId, target: NodeIdentity) -> Self {
162        Self {
163            collection,
164            range_id,
165            target,
166            new_replicas: Vec::new(),
167            capability: None,
168            reason: None,
169        }
170    }
171
172    /// Attach the administrative capability authorising the force.
173    pub fn with_capability(mut self, capability: ForceTransitionCapability) -> Self {
174        self.capability = Some(capability);
175        self
176    }
177
178    /// Attach the operator's explicit justification.
179    pub fn with_reason(mut self, reason: OperatorReason) -> Self {
180        self.reason = Some(reason);
181        self
182    }
183
184    /// Set the replica set the forced new owner will carry. Defaults to empty — in
185    /// a disaster the operator often installs a sole survivor with no replicas.
186    pub fn with_replicas(mut self, replicas: impl IntoIterator<Item = NodeIdentity>) -> Self {
187        self.new_replicas = replicas.into_iter().collect();
188        self
189    }
190
191    pub fn collection(&self) -> &CollectionId {
192        &self.collection
193    }
194
195    pub fn range_id(&self) -> RangeId {
196        self.range_id
197    }
198
199    pub fn target(&self) -> &NodeIdentity {
200        &self.target
201    }
202}
203
/// The reason a forced transition was turned away at the **authorisation** gate,
/// i.e. before the catalog was looked at. Compare [`ForceFailure`], which covers
/// refusals that happen *after* authorisation succeeded.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ForceDenial {
    /// The request presented no [`ForceTransitionCapability`]. Forcing needs that
    /// distinct administrative privilege, so the request is refused outright.
    MissingCapability,
    /// The request attached no (non-empty) [`OperatorReason`]. A forced
    /// transition has to say why quorum is being bypassed.
    MissingReason,
}

impl ForceDenial {
    /// Fixed human-readable description of the denial, used by `Display`.
    fn label(self) -> &'static str {
        match self {
            Self::MissingReason => "no explicit operator reason supplied",
            Self::MissingCapability => "no administrative force capability presented",
        }
    }
}

impl std::fmt::Display for ForceDenial {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = Self::label(*self);
        f.write_str(description)
    }
}
232
233/// Why an *authorised* forced transition could not be applied to the catalog. The
234/// operator held the capability and stated a reason, but the catalog refused the
235/// write.
236#[derive(Debug, Clone, PartialEq, Eq)]
237pub enum ForceFailure {
238    /// No range with this `(collection, range_id)` exists — there is nothing to
239    /// take ownership of.
240    UnknownRange,
241    /// The catalog rejected the activation write (e.g. a concurrent edit advanced
242    /// the version first). The forced entry was not installed.
243    Catalog(CatalogError),
244}
245
246impl std::fmt::Display for ForceFailure {
247    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
248        match self {
249            Self::UnknownRange => write!(f, "no such range in the catalog"),
250            Self::Catalog(err) => write!(f, "{err}"),
251        }
252    }
253}
254
/// The disposition of a forced-transition attempt — the verdict recorded in the
/// [`ForcedTransitionAudit`].
///
/// Exactly one variant describes each attempt: applied (`Allowed`), refused
/// before the catalog was consulted (`Denied`), or authorised but not applied
/// (`Failed`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ForcedTransitionDisposition {
    /// The force was authorised and applied. Carries the before/after owner,
    /// epoch, and version so the audit record fully describes what moved.
    Allowed {
        /// Owner recorded in the catalog entry before the force.
        previous_owner: NodeIdentity,
        /// Owner installed by the force (the request's target).
        new_owner: NodeIdentity,
        /// Ownership epoch of the entry before the force.
        previous_epoch: OwnershipEpoch,
        /// Ownership epoch of the entry the force installed.
        new_epoch: OwnershipEpoch,
        /// Catalog version of the entry before the force.
        previous_version: CatalogVersion,
        /// Catalog version of the entry the force installed.
        new_version: CatalogVersion,
    },
    /// The force was refused at the authorisation gate; the catalog was never
    /// touched.
    Denied(ForceDenial),
    /// The force was authorised but the catalog write failed; nothing moved.
    Failed(ForceFailure),
}
275
276/// Durable audit evidence for one forced-transition attempt — emitted for
277/// **allowed, denied, and failed** attempts alike.
278///
279/// This is the *durable audit evidence* ADR 0037 requires. Because a forced
280/// transition can bypass quorum, every attempt — including the ones that were
281/// refused — must leave a trail; making the audit the return value of
282/// [`force_transition`] means a caller cannot perform a force without also
283/// receiving its evidence. The record captures *who* (operator), *why* (reason),
284/// *what* (collection/range/target), *when* (`attempted_at_ms`), and the
285/// [`disposition`](Self::disposition) (the outcome, with the full epoch/version
286/// boundary on success).
287#[derive(Debug, Clone, PartialEq, Eq)]
288pub struct ForcedTransitionAudit {
289    attempted_at_ms: u64,
290    operator: Option<NodeIdentity>,
291    reason: Option<String>,
292    collection: CollectionId,
293    range_id: RangeId,
294    target: NodeIdentity,
295    disposition: ForcedTransitionDisposition,
296}
297
298impl ForcedTransitionAudit {
299    /// The wall-clock time (ms) the attempt was evaluated, as supplied by the
300    /// caller.
301    pub fn attempted_at_ms(&self) -> u64 {
302        self.attempted_at_ms
303    }
304
305    /// The operator principal that exercised the capability, if one was presented.
306    /// `None` on an attempt denied for [`MissingCapability`](ForceDenial::MissingCapability).
307    pub fn operator(&self) -> Option<&NodeIdentity> {
308        self.operator.as_ref()
309    }
310
311    /// The operator's stated reason, if one was attached. `None` on an attempt
312    /// denied for [`MissingReason`](ForceDenial::MissingReason) (or missing
313    /// capability).
314    pub fn reason(&self) -> Option<&str> {
315        self.reason.as_deref()
316    }
317
318    pub fn collection(&self) -> &CollectionId {
319        &self.collection
320    }
321
322    pub fn range_id(&self) -> RangeId {
323        self.range_id
324    }
325
326    /// The node the force tried to install as owner.
327    pub fn target(&self) -> &NodeIdentity {
328        &self.target
329    }
330
331    pub fn disposition(&self) -> &ForcedTransitionDisposition {
332        &self.disposition
333    }
334
335    /// Whether the force was authorised and applied.
336    pub fn is_allowed(&self) -> bool {
337        matches!(
338            self.disposition,
339            ForcedTransitionDisposition::Allowed { .. }
340        )
341    }
342
343    /// Whether a successful force bumped the ownership epoch — true for every
344    /// allowed force, since installing a new owner always advances the epoch and so
345    /// fences any old owner that later reappears. `false` for denied/failed
346    /// attempts, where nothing moved.
347    pub fn fenced_old_owner(&self) -> bool {
348        matches!(
349            self.disposition,
350            ForcedTransitionDisposition::Allowed {
351                previous_epoch,
352                new_epoch,
353                ..
354            } if new_epoch > previous_epoch
355        )
356    }
357}
358
359impl std::fmt::Display for ForcedTransitionAudit {
360    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
361        let operator = self
362            .operator
363            .as_ref()
364            .map(|o| o.to_string())
365            .unwrap_or_else(|| "<none>".to_string());
366        let reason = self.reason.as_deref().unwrap_or("<none>");
367        write!(
368            f,
369            "forced ownership transition @ {} ms by operator {} for {}/{} -> {} (reason: {}): ",
370            self.attempted_at_ms, operator, self.collection, self.range_id, self.target, reason,
371        )?;
372        match &self.disposition {
373            ForcedTransitionDisposition::Allowed {
374                previous_owner,
375                new_owner,
376                previous_epoch,
377                new_epoch,
378                previous_version,
379                new_version,
380            } => write!(
381                f,
382                "ALLOWED: {} (epoch {}, version {}) -> {} (epoch {}, version {})",
383                previous_owner, previous_epoch, previous_version, new_owner, new_epoch, new_version,
384            ),
385            ForcedTransitionDisposition::Denied(reason) => write!(f, "DENIED: {reason}"),
386            ForcedTransitionDisposition::Failed(failure) => write!(f, "FAILED: {failure}"),
387        }
388    }
389}
390
391/// Evaluate and, if authorised, apply a forced ownership transition — returning the
392/// durable audit evidence in **every** case.
393///
394/// The authorisation gate is fail-closed and runs before the catalog is consulted:
395/// the request must carry a [`ForceTransitionCapability`]
396/// ([`MissingCapability`](ForceDenial::MissingCapability) otherwise) and a non-empty
397/// [`OperatorReason`] ([`MissingReason`](ForceDenial::MissingReason) otherwise).
398/// Only once both hold is the range looked up; a missing range is a
399/// [`ForceFailure::UnknownRange`]. On success the new owner is installed via
400/// [`RangeOwnership::transfer_to`], bumping the ownership epoch so the old owner is
401/// fenced if it reappears.
402///
403/// Whatever the outcome, the returned [`ForcedTransitionAudit`] records it — the
404/// caller persists it as the operation's audit trail. Denied and failed attempts
405/// leave the catalog untouched.
406pub fn force_transition(
407    catalog: &mut ShardOwnershipCatalog,
408    request: &ForcedTransitionRequest,
409    now_ms: u64,
410) -> ForcedTransitionAudit {
411    let operator = request.capability.as_ref().map(|c| c.operator().clone());
412    let reason = request.reason.as_ref().map(|r| r.as_str().to_string());
413
414    let audit = |disposition| ForcedTransitionAudit {
415        attempted_at_ms: now_ms,
416        operator: operator.clone(),
417        reason: reason.clone(),
418        collection: request.collection.clone(),
419        range_id: request.range_id,
420        target: request.target.clone(),
421        disposition,
422    };
423
424    // Authorisation gate (fail-closed): the distinct capability first, then the
425    // explicit operator reason. Both are required before the catalog is touched.
426    if request.capability.is_none() {
427        return audit(ForcedTransitionDisposition::Denied(
428            ForceDenial::MissingCapability,
429        ));
430    }
431    if request.reason.is_none() {
432        return audit(ForcedTransitionDisposition::Denied(
433            ForceDenial::MissingReason,
434        ));
435    }
436
437    // Authorised. Force bypasses CAS / catch-up evidence / replica membership, but
438    // the range must exist for there to be ownership to move.
439    let Some(current) = catalog.range(&request.collection, request.range_id) else {
440        return audit(ForcedTransitionDisposition::Failed(
441            ForceFailure::UnknownRange,
442        ));
443    };
444
445    let previous_owner = current.owner().clone();
446    let previous_epoch = current.epoch();
447    let previous_version = current.version();
448    // transfer_to bumps both epoch (fencing the old owner) and version.
449    let next = current.transfer_to(request.target.clone(), request.new_replicas.clone());
450    let new_epoch = next.epoch();
451    let new_version = next.version();
452
453    match catalog.apply_update(next) {
454        Ok(_) => audit(ForcedTransitionDisposition::Allowed {
455            previous_owner,
456            new_owner: request.target.clone(),
457            previous_epoch,
458            new_epoch,
459            previous_version,
460            new_version,
461        }),
462        Err(err) => audit(ForcedTransitionDisposition::Failed(ForceFailure::Catalog(
463            err,
464        ))),
465    }
466}
467
468#[cfg(test)]
469mod tests {
470    use super::*;
471    use crate::cluster::ownership::{
472        OwnershipEpoch, PlacementMetadata, RangeBounds, RangeWriteReject, ShardKeyMode,
473    };
474
    /// Test helper: builds a `CollectionId` from `name`, panicking if
    /// `CollectionId::new` rejects it.
    fn collection(name: &str) -> CollectionId {
        CollectionId::new(name).unwrap()
    }
478
    /// Test helper: builds a `NodeIdentity` from a certificate subject string
    /// (the tests pass values like `"CN=node-a"`), panicking if it is rejected.
    fn ident(cn: &str) -> NodeIdentity {
        NodeIdentity::from_certificate_subject(cn).unwrap()
    }
482
483    /// A catalog holding one full-keyspace range owned by `owner` with `replicas`.
484    fn catalog_with(owner: &str, replicas: &[&str]) -> (ShardOwnershipCatalog, CollectionId) {
485        let orders = collection("orders");
486        let mut catalog = ShardOwnershipCatalog::new();
487        catalog
488            .apply_update(RangeOwnership::establish(
489                orders.clone(),
490                RangeId::new(1),
491                ShardKeyMode::Hash,
492                RangeBounds::full(),
493                ident(owner),
494                replicas.iter().map(|r| ident(r)).collect::<Vec<_>>(),
495                PlacementMetadata::with_replication_factor(3),
496            ))
497            .unwrap();
498        (catalog, orders)
499    }
500
    /// Test helper: a force capability granted to the principal named by
    /// `operator`.
    fn capability(operator: &str) -> ForceTransitionCapability {
        ForceTransitionCapability::granted_to(ident(operator))
    }
504
    /// Test helper: a fixed, valid operator reason shared by the authorised
    /// requests in these tests.
    fn reason() -> OperatorReason {
        OperatorReason::new("primary AZ lost, promoting surviving copy").unwrap()
    }
508
509    /// A fully-authorised forced request: capability + reason attached.
510    fn authorised_request(orders: &CollectionId, target: &str) -> ForcedTransitionRequest {
511        ForcedTransitionRequest::new(orders.clone(), RangeId::new(1), ident(target))
512            .with_capability(capability("CN=operator-root"))
513            .with_reason(reason())
514    }
515
516    // ---------------------------------------------------------------
517    // OperatorReason: "explicit" must be enforceable.
518    // ---------------------------------------------------------------
519
520    #[test]
521    fn operator_reason_rejects_blank_input() {
522        assert_eq!(OperatorReason::new(""), Err(EmptyOperatorReason));
523        assert_eq!(OperatorReason::new("   "), Err(EmptyOperatorReason));
524        assert_eq!(OperatorReason::new("\t\n "), Err(EmptyOperatorReason));
525    }
526
527    #[test]
528    fn operator_reason_trims_surrounding_whitespace() {
529        let r = OperatorReason::new("  recover orders/1  ").unwrap();
530        assert_eq!(r.as_str(), "recover orders/1");
531    }
532
533    // ---------------------------------------------------------------
534    // Authorisation gate: capability and reason are both required.
535    // ---------------------------------------------------------------
536
537    #[test]
538    fn force_denied_without_capability() {
539        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
540        // Reason present, but no capability.
541        let req = ForcedTransitionRequest::new(orders.clone(), RangeId::new(1), ident("CN=node-b"))
542            .with_reason(reason());
543
544        let audit = force_transition(&mut catalog, &req, 1_000);
545
546        assert!(!audit.is_allowed());
547        assert_eq!(
548            audit.disposition(),
549            &ForcedTransitionDisposition::Denied(ForceDenial::MissingCapability)
550        );
551        // The catalog is untouched: node-a is still owner at the initial epoch.
552        let range = catalog.range(&orders, RangeId::new(1)).unwrap();
553        assert_eq!(range.owner(), &ident("CN=node-a"));
554        assert_eq!(range.epoch(), OwnershipEpoch::initial());
555        // Audit evidence is still emitted for the denied attempt.
556        assert!(audit.to_string().contains("DENIED"));
557        assert_eq!(audit.attempted_at_ms(), 1_000);
558    }
559
560    #[test]
561    fn force_denied_without_reason() {
562        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
563        // Capability present, but no operator reason.
564        let req = ForcedTransitionRequest::new(orders.clone(), RangeId::new(1), ident("CN=node-b"))
565            .with_capability(capability("CN=operator-root"));
566
567        let audit = force_transition(&mut catalog, &req, 2_000);
568
569        assert!(!audit.is_allowed());
570        assert_eq!(
571            audit.disposition(),
572            &ForcedTransitionDisposition::Denied(ForceDenial::MissingReason)
573        );
574        // Operator is recorded (capability was presented) but reason is absent.
575        assert_eq!(audit.operator(), Some(&ident("CN=operator-root")));
576        assert_eq!(audit.reason(), None);
577        // Catalog untouched.
578        let range = catalog.range(&orders, RangeId::new(1)).unwrap();
579        assert_eq!(range.owner(), &ident("CN=node-a"));
580    }
581
582    #[test]
583    fn missing_capability_is_reported_before_missing_reason() {
584        // Fail-closed ordering: with neither present, the capability denial wins.
585        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
586        let req = ForcedTransitionRequest::new(orders, RangeId::new(1), ident("CN=node-b"));
587        let audit = force_transition(&mut catalog, &req, 0);
588        assert_eq!(
589            audit.disposition(),
590            &ForcedTransitionDisposition::Denied(ForceDenial::MissingCapability)
591        );
592    }
593
594    // ---------------------------------------------------------------
595    // Successful forced transition: epoch bump + audit evidence.
596    // ---------------------------------------------------------------
597
598    #[test]
599    fn authorised_force_bumps_epoch_and_moves_owner() {
600        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
601        // node-b is not even a replica — force can still install it.
602        let req = ForcedTransitionRequest::new(orders.clone(), RangeId::new(1), ident("CN=node-z"))
603            .with_capability(capability("CN=operator-root"))
604            .with_reason(reason());
605
606        let audit = force_transition(&mut catalog, &req, 5_000);
607
608        assert!(audit.is_allowed());
609        assert!(audit.fenced_old_owner());
610        match audit.disposition() {
611            ForcedTransitionDisposition::Allowed {
612                previous_owner,
613                new_owner,
614                previous_epoch,
615                new_epoch,
616                previous_version,
617                new_version,
618            } => {
619                assert_eq!(previous_owner, &ident("CN=node-a"));
620                assert_eq!(new_owner, &ident("CN=node-z"));
621                assert_eq!(*previous_epoch, OwnershipEpoch::initial());
622                assert_eq!(new_epoch.value(), 2);
623                assert_eq!(previous_version.value(), 1);
624                assert_eq!(new_version.value(), 2);
625            }
626            other => panic!("expected Allowed, got {other:?}"),
627        }
628
629        // The catalog now makes node-z authoritative at the bumped epoch.
630        let range = catalog.range(&orders, RangeId::new(1)).unwrap();
631        assert_eq!(range.owner(), &ident("CN=node-z"));
632        assert_eq!(range.epoch().value(), 2);
633    }
634
635    #[test]
636    fn audit_evidence_records_operator_reason_and_boundary() {
637        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
638        let req = authorised_request(&orders, "CN=node-b");
639
640        let audit = force_transition(&mut catalog, &req, 7_777);
641
642        assert_eq!(audit.operator(), Some(&ident("CN=operator-root")));
643        assert_eq!(
644            audit.reason(),
645            Some("primary AZ lost, promoting surviving copy")
646        );
647        assert_eq!(audit.attempted_at_ms(), 7_777);
648        assert_eq!(audit.target(), &ident("CN=node-b"));
649        let line = audit.to_string();
650        assert!(line.contains("ALLOWED"));
651        assert!(line.contains("CN=operator-root"));
652        assert!(line.contains("primary AZ lost"));
653        assert!(line.contains("CN=node-a"));
654        assert!(line.contains("CN=node-b"));
655    }
656
657    // ---------------------------------------------------------------
658    // Old owner is fenced once it reappears after a force.
659    // ---------------------------------------------------------------
660
661    #[test]
662    fn reappearing_old_owner_is_fenced_after_force() {
663        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
664        // Before the force node-a (the owner) is admitted at the initial epoch.
665        assert!(catalog
666            .admit_public_write(
667                &ident("CN=node-a"),
668                &orders,
669                b"k",
670                OwnershipEpoch::initial()
671            )
672            .is_ok());
673
674        // Force ownership to node-b, demoting node-a to a replica so role alone
675        // would not fence it — only the epoch bump does.
676        let req = authorised_request(&orders, "CN=node-b").with_replicas([ident("CN=node-a")]);
677        let audit = force_transition(&mut catalog, &req, 1_000);
678        assert!(audit.is_allowed());
679
680        // node-a reappears (partition healed) still believing epoch 1. Its write is
681        // fenced: as a replica it is no longer the owner...
682        let err = catalog
683            .admit_public_write(
684                &ident("CN=node-a"),
685                &orders,
686                b"k",
687                OwnershipEpoch::initial(),
688            )
689            .unwrap_err();
690        assert!(matches!(err, RangeWriteReject::NotOwner { .. }));
691
692        // ...and even an old owner that still believed itself owner would carry the
693        // stale epoch; node-b at the bumped epoch is the one now admitted.
694        let current_epoch = catalog.range(&orders, RangeId::new(1)).unwrap().epoch();
695        assert!(catalog
696            .admit_public_write(&ident("CN=node-b"), &orders, b"k", current_epoch)
697            .is_ok());
698    }
699
700    #[test]
701    fn force_against_unknown_range_fails_and_is_audited() {
702        let mut catalog = ShardOwnershipCatalog::new();
703        let orders = collection("orders");
704        // Authorised, but the range does not exist.
705        let req = authorised_request(&orders, "CN=node-b");
706
707        let audit = force_transition(&mut catalog, &req, 3_000);
708
709        assert!(!audit.is_allowed());
710        assert_eq!(
711            audit.disposition(),
712            &ForcedTransitionDisposition::Failed(ForceFailure::UnknownRange)
713        );
714        // Failed attempts still carry full operator/reason evidence.
715        assert_eq!(audit.operator(), Some(&ident("CN=operator-root")));
716        assert!(audit.reason().is_some());
717        assert!(audit.to_string().contains("FAILED"));
718    }
719
720    #[test]
721    fn ordinary_safety_checks_are_untouched_by_the_force_path() {
722        // The force path neither weakens nor invokes the ordinary transition gate:
723        // an ordinary transition with a stale CAS still loses. This guards the
724        // acceptance criterion that non-force operations keep their safety checks.
725        use crate::cluster::ownership_transition::{
726            run_transition, CommitWatermark, TransitionError, TransitionKind, TransitionRejection,
727            TransitionRequest,
728        };
729        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
730
731        // First, a legitimate force moves authority to node-b (epoch -> 2).
732        let forced = force_transition(&mut catalog, &authorised_request(&orders, "CN=node-b"), 10);
733        assert!(forced.is_allowed());
734
735        // An ordinary transition planner still holding the pre-force CAS (node-a at
736        // epoch 1) is rejected by the ordinary gate — force did not disable it.
737        let stale = TransitionRequest::new(
738            TransitionKind::Promote,
739            orders.clone(),
740            RangeId::new(1),
741            ident("CN=node-a"),
742            OwnershipEpoch::initial(),
743            CatalogVersion::initial(),
744            ident("CN=node-b"),
745            CommitWatermark::new(1, 10),
746        )
747        .with_evidence(crate::cluster::ownership_transition::CatchUpEvidence::new(
748            ident("CN=node-b"),
749            1,
750            10,
751        ));
752        let err = run_transition(&mut catalog, &stale).unwrap_err();
753        assert!(matches!(
754            err,
755            TransitionError::Rejected(TransitionRejection::OwnerMismatch { .. })
756        ));
757    }
758}