reddb_server/cluster/ownership_force.rs
//! Forced ownership transitions for disaster recovery (issue #999, PRD #987,
//! ADR 0037).
//!
//! The [ordinary transition machine](super::ownership_transition) moves range
//! authority *safely*: it demands a three-part compare-and-swap on the current
//! owner/epoch/version and proof that the candidate's applied log covers the range
//! commit watermark. That is exactly right when the cluster is healthy, but a
//! disaster (the owner and a quorum of replicas are simultaneously lost) can leave
//! a range with *no* candidate that can satisfy those checks, and therefore no way
//! to recover through the ordinary path. This module is the deliberately
//! dangerous escape hatch the ADR reserves for that case.
//!
//! Per ADR 0037: *"Forced transitions are reserved for disaster recovery. Normal
//! ownership transitions require the ordinary cluster safety checks. A `FORCE`
//! transition may proceed without ordinary quorum only with a special
//! administrative capability, explicit operator reason, durable audit evidence,
//! and an ownership epoch bump that fences any old owner that later reappears."*
//! Each of those four conditions is a structural part of this module:
//!
//! 1. **Distinct administrative capability.** A forced transition is authorised by
//!    a [`ForceTransitionCapability`], a privilege *separate* from the authority to
//!    run an ordinary transition. The ordinary [`run_transition`] path never
//!    consults it, and this path refuses outright
//!    ([`ForceDenial::MissingCapability`]) when it is absent, so there is no way to
//!    force a transition by accident.
//! 2. **Explicit operator reason.** The operator must attach a non-empty
//!    [`OperatorReason`]; a forced transition with no stated justification is
//!    refused ([`ForceDenial::MissingReason`]). The reason is recorded in the audit
//!    evidence so a later reviewer learns *why* quorum was bypassed.
//! 3. **Durable audit evidence for every attempt.** [`force_transition`] *always*
//!    returns a [`ForcedTransitionAudit`], for allowed, denied, *and* failed
//!    attempts alike. A privileged operation that can bypass quorum must leave a
//!    trail whether or not it succeeded, so the evidence is the function's return
//!    value, not a side effect a caller can forget to capture.
//! 4. **Epoch bump that fences the old owner.** A successful force installs a new
//!    catalog entry via [`RangeOwnership::transfer_to`], which bumps the ownership
//!    epoch. From that instant the old owner's epoch is stale: should it reappear
//!    (the partition that "killed" it heals), [`admit_public_write`] rejects its
//!    writes exactly as it would after an ordinary transition.
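/*!
The fencing in point 4 can be sketched in isolation. This is a minimal,
self-contained illustration rather than this module's implementation: `Range`,
`Reject`, and `admit` are hypothetical stand-ins for the catalog entry and its
admission check, and the order of the two checks is a choice made for the sketch.

```rust
// Hypothetical stand-in for a catalog entry: who owns the range, at which epoch.
#[derive(Debug, Clone, PartialEq, Eq)]
struct Range {
    owner: String,
    epoch: u64,
}

// Hypothetical rejection reasons; the real ones live in the ownership module.
#[derive(Debug, PartialEq, Eq)]
enum Reject {
    NotOwner,
    StaleEpoch { presented: u64, current: u64 },
}

impl Range {
    // Installing a new owner always advances the epoch.
    fn transfer_to(&self, target: &str) -> Range {
        Range {
            owner: target.to_string(),
            epoch: self.epoch + 1,
        }
    }

    // Admit a write only from the current owner presenting the current epoch.
    // Checking the epoch first is an assumption of this sketch.
    fn admit(&self, node: &str, presented: u64) -> Result<(), Reject> {
        if presented != self.epoch {
            return Err(Reject::StaleEpoch {
                presented,
                current: self.epoch,
            });
        }
        if node != self.owner {
            return Err(Reject::NotOwner);
        }
        Ok(())
    }
}

fn main() {
    let before = Range {
        owner: "node-a".to_string(),
        epoch: 1,
    };
    assert_eq!(before.admit("node-a", 1), Ok(()));

    // The force installs node-z and bumps the epoch from 1 to 2.
    let after = before.transfer_to("node-z");

    // node-a reappears still presenting epoch 1, so its write is rejected.
    assert_eq!(
        after.admit("node-a", 1),
        Err(Reject::StaleEpoch {
            presented: 1,
            current: 2
        })
    );
    // A non-owner at the current epoch is rejected too.
    assert_eq!(after.admit("node-a", 2), Err(Reject::NotOwner));
    // Only the new owner at the bumped epoch is admitted.
    assert_eq!(after.admit("node-z", 2), Ok(()));
}
```

The sketch only shows that a bumped epoch makes every pre-force epoch stale; the
real rejection variants are defined alongside the catalog in `super::ownership`.
*/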
//!
//! ## What force bypasses, and what it does not
//!
//! Force exists *because* ordinary safety cannot be met, so it skips the CAS, the
//! catch-up safety evidence, and the replica-membership check: the operator may
//! install **any** target node, even one the catalog does not list as a replica,
//! because in a true disaster the surviving copy may be exactly such a node. What
//! force does **not** skip is the catalog's own structural integrity: the range
//! must exist, and the epoch/version still advance monotonically through
//! [`apply_update`]. A force against an unknown range is a
//! [`ForceFailure::UnknownRange`], audited like any other failed attempt.
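/*!
The split between what force bypasses and what it keeps can be sketched with
simplified stand-ins. Everything named here (`Request`, `Catalog`, `Disposition`,
`Audit`, `force`) is hypothetical and much smaller than the real types; the sketch
assumes a catalog that maps a range id to an `(owner, epoch)` pair.

```rust
use std::collections::HashMap;

// Hypothetical, simplified request: authorisation inputs are optional so the
// gate can observe their absence.
struct Request {
    range_id: u32,
    target: String,
    operator: Option<String>,
    reason: Option<String>,
}

#[derive(Debug, PartialEq, Eq)]
enum Disposition {
    Allowed { previous_epoch: u64, new_epoch: u64 },
    DeniedMissingCapability,
    DeniedMissingReason,
    FailedUnknownRange,
}

#[derive(Debug, PartialEq, Eq)]
struct Audit {
    attempted_at_ms: u64,
    operator: Option<String>,
    reason: Option<String>,
    disposition: Disposition,
}

// Catalog stand-in: range id -> (owner, epoch).
type Catalog = HashMap<u32, (String, u64)>;

fn force(catalog: &mut Catalog, req: &Request, now_ms: u64) -> Audit {
    let audit = |disposition| Audit {
        attempted_at_ms: now_ms,
        operator: req.operator.clone(),
        reason: req.reason.clone(),
        disposition,
    };
    // Fail-closed gate: capability first, then reason, before any catalog access.
    if req.operator.is_none() {
        return audit(Disposition::DeniedMissingCapability);
    }
    if req.reason.is_none() {
        return audit(Disposition::DeniedMissingReason);
    }
    // Not bypassed: the range must exist.
    let Some(entry) = catalog.get_mut(&req.range_id) else {
        return audit(Disposition::FailedUnknownRange);
    };
    // Bypassed: no CAS, no catch-up evidence, no replica-membership check.
    let previous_epoch = entry.1;
    let new_epoch = previous_epoch + 1;
    *entry = (req.target.clone(), new_epoch);
    audit(Disposition::Allowed { previous_epoch, new_epoch })
}

fn main() {
    let mut catalog: Catalog = HashMap::from([(1, ("node-a".to_string(), 1))]);
    let mut req = Request {
        range_id: 1,
        target: "node-z".to_string(),
        operator: None,
        reason: Some("primary AZ lost".to_string()),
    };

    // Denied, yet an audit record still comes back and the catalog is untouched.
    let denied = force(&mut catalog, &req, 1_000);
    assert_eq!(denied.disposition, Disposition::DeniedMissingCapability);
    assert_eq!(denied.reason.as_deref(), Some("primary AZ lost"));
    assert_eq!(catalog[&1], ("node-a".to_string(), 1));

    // Authorised: node-z is installed although it was never listed as a replica.
    req.operator = Some("operator-root".to_string());
    let allowed = force(&mut catalog, &req, 2_000);
    assert_eq!(allowed.operator.as_deref(), Some("operator-root"));
    assert_eq!(
        allowed.disposition,
        Disposition::Allowed { previous_epoch: 1, new_epoch: 2 }
    );
    assert_eq!(catalog[&1], ("node-z".to_string(), 2));
}
```
*/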
//!
//! Like its siblings, this module is a pure, deterministic data model over the
//! catalog: time (`now_ms`) is passed in for the audit timestamp rather than read
//! from a clock, so the capability, reason, fencing, and audit story is exercised
//! without any I/O.
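/*!
As a small illustration of why the timestamp is a parameter, the sketch below
uses a hypothetical `evaluate` function and `Audit` record (neither is part of
this module) to show that equal inputs produce equal records, and that a caller
with a real clock reads it once and passes the value in.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

// Hypothetical audit record, far smaller than the real one.
#[derive(Debug, Clone, PartialEq, Eq)]
struct Audit {
    attempted_at_ms: u64,
    outcome: &'static str,
}

// The timestamp is an argument, so this function never reads a clock.
fn evaluate(authorised: bool, now_ms: u64) -> Audit {
    Audit {
        attempted_at_ms: now_ms,
        outcome: if authorised { "ALLOWED" } else { "DENIED" },
    }
}

fn main() {
    // Same inputs, same record: a test can assert on the timestamp exactly.
    assert_eq!(evaluate(false, 1_000), evaluate(false, 1_000));
    assert_eq!(evaluate(true, 7_777).attempted_at_ms, 7_777);

    // A production caller would read its own clock and hand the value over.
    let now_ms = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_millis() as u64)
        .unwrap_or(0);
    let audit = evaluate(true, now_ms);
    assert_eq!(audit.outcome, "ALLOWED");
    assert_eq!(audit.attempted_at_ms, now_ms);
}
```
*/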
//!
//! [`run_transition`]: super::ownership_transition::run_transition
//! [`admit_public_write`]: super::ownership::ShardOwnershipCatalog::admit_public_write
//! [`apply_update`]: super::ownership::ShardOwnershipCatalog::apply_update

use super::identity::NodeIdentity;
use super::ownership::{
    CatalogError, CatalogVersion, CollectionId, OwnershipEpoch, RangeId, RangeOwnership,
    ShardOwnershipCatalog,
};

/// A distinct administrative capability authorising forced ownership transitions.
///
/// This is the *special administrative capability* ADR 0037 requires for a `FORCE`
/// transition, deliberately a separate privilege from ordinary transition
/// authority, so that holding the power to rebalance or fail over does **not**
/// confer the power to bypass quorum. Possessing one is the operator's proof of
/// that privilege; it names the operator principal so the audit trail records *who*
/// forced the transition.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ForceTransitionCapability {
    operator: NodeIdentity,
}

impl ForceTransitionCapability {
    /// A capability granted to `operator`, the principal that will be recorded as
    /// having exercised it in the audit evidence.
    pub fn granted_to(operator: NodeIdentity) -> Self {
        Self { operator }
    }

    /// The operator principal this capability was granted to.
    pub fn operator(&self) -> &NodeIdentity {
        &self.operator
    }
}

/// A non-empty, operator-supplied justification for a forced transition.
///
/// ADR 0037 requires an *explicit operator reason*; this newtype makes "explicit"
/// enforceable: it cannot be constructed from blank text, so a forced transition
/// either carries a real justification or is refused for the lack of one. The
/// stored text is trimmed of surrounding whitespace.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OperatorReason(String);

/// The operator reason was empty or whitespace-only, which is not an explicit
/// justification, so it cannot authorise a forced transition.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EmptyOperatorReason;

impl std::fmt::Display for EmptyOperatorReason {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "forced transition operator reason must not be empty")
    }
}

impl std::error::Error for EmptyOperatorReason {}

impl OperatorReason {
    /// Build a reason from `text`, rejecting blank (empty or whitespace-only)
    /// input. Surrounding whitespace is trimmed from the stored value.
    pub fn new(text: impl Into<String>) -> Result<Self, EmptyOperatorReason> {
        let text = text.into();
        let trimmed = text.trim();
        if trimmed.is_empty() {
            return Err(EmptyOperatorReason);
        }
        Ok(Self(trimmed.to_string()))
    }

    /// The trimmed justification text.
    pub fn as_str(&self) -> &str {
        &self.0
    }
}

impl std::fmt::Display for OperatorReason {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.0)
    }
}

/// A request to force ownership of one range to `target` for disaster recovery.
///
/// Unlike a [`TransitionRequest`](super::ownership_transition::TransitionRequest)
/// it carries no CAS expectations and no catch-up evidence: force exists precisely
/// for the case where those cannot be satisfied. What it carries instead are the
/// two authorisation inputs ADR 0037 requires: an optional
/// [`ForceTransitionCapability`] and an optional [`OperatorReason`]. They are
/// *optional* on the request so the authorisation gate can observe (and audit)
/// their absence; [`force_transition`] refuses any request missing either.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ForcedTransitionRequest {
    collection: CollectionId,
    range_id: RangeId,
    target: NodeIdentity,
    new_replicas: Vec<NodeIdentity>,
    capability: Option<ForceTransitionCapability>,
    reason: Option<OperatorReason>,
}

impl ForcedTransitionRequest {
    /// A forced-transition request for `(collection, range_id)` installing `target`
    /// as the new owner, with **no** capability or reason attached yet. As built it
    /// will be denied; attach authorisation with
    /// [`with_capability`](Self::with_capability) and [`with_reason`](Self::with_reason).
    pub fn new(collection: CollectionId, range_id: RangeId, target: NodeIdentity) -> Self {
        Self {
            collection,
            range_id,
            target,
            new_replicas: Vec::new(),
            capability: None,
            reason: None,
        }
    }

    /// Attach the administrative capability authorising the force.
    pub fn with_capability(mut self, capability: ForceTransitionCapability) -> Self {
        self.capability = Some(capability);
        self
    }

    /// Attach the operator's explicit justification.
    pub fn with_reason(mut self, reason: OperatorReason) -> Self {
        self.reason = Some(reason);
        self
    }

    /// Set the replica set the forced new owner will carry. Defaults to empty: in
    /// a disaster the operator often installs a sole survivor with no replicas.
    pub fn with_replicas(mut self, replicas: impl IntoIterator<Item = NodeIdentity>) -> Self {
        self.new_replicas = replicas.into_iter().collect();
        self
    }

    /// The collection whose range is to be forced.
    pub fn collection(&self) -> &CollectionId {
        &self.collection
    }

    /// The range whose ownership is to be forced.
    pub fn range_id(&self) -> RangeId {
        self.range_id
    }

    /// The node to install as the new owner.
    pub fn target(&self) -> &NodeIdentity {
        &self.target
    }
}

/// Why a forced transition was refused at the **authorisation** gate, before the
/// catalog was consulted. Distinct from [`ForceFailure`], which is a refusal
/// *after* authorisation passed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ForceDenial {
    /// No [`ForceTransitionCapability`] was presented. Forcing a transition
    /// requires the distinct administrative privilege; without it the request is
    /// refused outright.
    MissingCapability,
    /// No (non-empty) [`OperatorReason`] was attached. A forced transition must
    /// state why quorum is being bypassed.
    MissingReason,
}

impl ForceDenial {
    fn label(self) -> &'static str {
        match self {
            ForceDenial::MissingCapability => "no administrative force capability presented",
            ForceDenial::MissingReason => "no explicit operator reason supplied",
        }
    }
}

impl std::fmt::Display for ForceDenial {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.label())
    }
}

/// Why an *authorised* forced transition could not be applied to the catalog. The
/// operator held the capability and stated a reason, but the catalog refused the
/// write.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ForceFailure {
    /// No range with this `(collection, range_id)` exists, so there is nothing to
    /// take ownership of.
    UnknownRange,
    /// The catalog rejected the activation write (e.g. a concurrent edit advanced
    /// the version first). The forced entry was not installed.
    Catalog(CatalogError),
}

impl std::fmt::Display for ForceFailure {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::UnknownRange => write!(f, "no such range in the catalog"),
            Self::Catalog(err) => write!(f, "{err}"),
        }
    }
}

/// The disposition of a forced-transition attempt: the verdict recorded in the
/// [`ForcedTransitionAudit`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ForcedTransitionDisposition {
    /// The force was authorised and applied. Carries the before/after owner,
    /// epoch, and version so the audit record fully describes what moved.
    Allowed {
        previous_owner: NodeIdentity,
        new_owner: NodeIdentity,
        previous_epoch: OwnershipEpoch,
        new_epoch: OwnershipEpoch,
        previous_version: CatalogVersion,
        new_version: CatalogVersion,
    },
    /// The force was refused at the authorisation gate; the catalog was never
    /// touched.
    Denied(ForceDenial),
    /// The force was authorised but the catalog write failed; nothing moved.
    Failed(ForceFailure),
}

/// Durable audit evidence for one forced-transition attempt, emitted for
/// **allowed, denied, and failed** attempts alike.
///
/// This is the *durable audit evidence* ADR 0037 requires. Because a forced
/// transition can bypass quorum, every attempt, including the ones that were
/// refused, must leave a trail; making the audit the return value of
/// [`force_transition`] means a caller cannot perform a force without also
/// receiving its evidence. The record captures *who* (operator), *why* (reason),
/// *what* (collection/range/target), *when* (`attempted_at_ms`), and the
/// [`disposition`](Self::disposition) (the outcome, with the full epoch/version
/// boundary on success).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ForcedTransitionAudit {
    attempted_at_ms: u64,
    operator: Option<NodeIdentity>,
    reason: Option<String>,
    collection: CollectionId,
    range_id: RangeId,
    target: NodeIdentity,
    disposition: ForcedTransitionDisposition,
}

impl ForcedTransitionAudit {
    /// The wall-clock time (ms) the attempt was evaluated, as supplied by the
    /// caller.
    pub fn attempted_at_ms(&self) -> u64 {
        self.attempted_at_ms
    }

    /// The operator principal that exercised the capability, if one was presented.
    /// `None` on an attempt denied for [`MissingCapability`](ForceDenial::MissingCapability).
    pub fn operator(&self) -> Option<&NodeIdentity> {
        self.operator.as_ref()
    }

    /// The operator's stated reason, if one was attached. `None` on an attempt
    /// denied for [`MissingReason`](ForceDenial::MissingReason). An attempt denied
    /// for [`MissingCapability`](ForceDenial::MissingCapability) still records the
    /// reason when the request carried one.
    pub fn reason(&self) -> Option<&str> {
        self.reason.as_deref()
    }

    /// The collection the attempt targeted.
    pub fn collection(&self) -> &CollectionId {
        &self.collection
    }

    /// The range the attempt targeted.
    pub fn range_id(&self) -> RangeId {
        self.range_id
    }

    /// The node the force tried to install as owner.
    pub fn target(&self) -> &NodeIdentity {
        &self.target
    }

    /// The outcome of the attempt: allowed, denied, or failed.
    pub fn disposition(&self) -> &ForcedTransitionDisposition {
        &self.disposition
    }

    /// Whether the force was authorised and applied.
    pub fn is_allowed(&self) -> bool {
        matches!(
            self.disposition,
            ForcedTransitionDisposition::Allowed { .. }
        )
    }

    /// Whether a successful force bumped the ownership epoch. `true` for every
    /// allowed force, since installing a new owner always advances the epoch and so
    /// fences any old owner that later reappears. `false` for denied/failed
    /// attempts, where nothing moved.
    pub fn fenced_old_owner(&self) -> bool {
        matches!(
            self.disposition,
            ForcedTransitionDisposition::Allowed {
                previous_epoch,
                new_epoch,
                ..
            } if new_epoch > previous_epoch
        )
    }
}

impl std::fmt::Display for ForcedTransitionAudit {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let operator = self
            .operator
            .as_ref()
            .map(|o| o.to_string())
            .unwrap_or_else(|| "<none>".to_string());
        let reason = self.reason.as_deref().unwrap_or("<none>");
        write!(
            f,
            "forced ownership transition @ {} ms by operator {} for {}/{} -> {} (reason: {}): ",
            self.attempted_at_ms, operator, self.collection, self.range_id, self.target, reason,
        )?;
        match &self.disposition {
            ForcedTransitionDisposition::Allowed {
                previous_owner,
                new_owner,
                previous_epoch,
                new_epoch,
                previous_version,
                new_version,
            } => write!(
                f,
                "ALLOWED: {} (epoch {}, version {}) -> {} (epoch {}, version {})",
                previous_owner, previous_epoch, previous_version, new_owner, new_epoch, new_version,
            ),
            ForcedTransitionDisposition::Denied(reason) => write!(f, "DENIED: {reason}"),
            ForcedTransitionDisposition::Failed(failure) => write!(f, "FAILED: {failure}"),
        }
    }
}

/// Evaluate and, if authorised, apply a forced ownership transition, returning the
/// durable audit evidence in **every** case.
///
/// The authorisation gate is fail-closed and runs before the catalog is consulted:
/// the request must carry a [`ForceTransitionCapability`]
/// ([`MissingCapability`](ForceDenial::MissingCapability) otherwise) and a non-empty
/// [`OperatorReason`] ([`MissingReason`](ForceDenial::MissingReason) otherwise).
/// Only once both hold is the range looked up; a missing range is a
/// [`ForceFailure::UnknownRange`]. On success the new owner is installed via
/// [`RangeOwnership::transfer_to`], bumping the ownership epoch so the old owner is
/// fenced if it reappears.
///
/// Whatever the outcome, the returned [`ForcedTransitionAudit`] records it; the
/// caller persists it as the operation's audit trail. Denied and failed attempts
/// leave the catalog untouched.
pub fn force_transition(
    catalog: &mut ShardOwnershipCatalog,
    request: &ForcedTransitionRequest,
    now_ms: u64,
) -> ForcedTransitionAudit {
    let operator = request.capability.as_ref().map(|c| c.operator().clone());
    let reason = request.reason.as_ref().map(|r| r.as_str().to_string());

    let audit = |disposition| ForcedTransitionAudit {
        attempted_at_ms: now_ms,
        operator: operator.clone(),
        reason: reason.clone(),
        collection: request.collection.clone(),
        range_id: request.range_id,
        target: request.target.clone(),
        disposition,
    };

    // Authorisation gate (fail-closed): the distinct capability first, then the
    // explicit operator reason. Both are required before the catalog is touched.
    if request.capability.is_none() {
        return audit(ForcedTransitionDisposition::Denied(
            ForceDenial::MissingCapability,
        ));
    }
    if request.reason.is_none() {
        return audit(ForcedTransitionDisposition::Denied(
            ForceDenial::MissingReason,
        ));
    }

    // Authorised. Force bypasses CAS / catch-up evidence / replica membership, but
    // the range must exist for there to be ownership to move.
    let Some(current) = catalog.range(&request.collection, request.range_id) else {
        return audit(ForcedTransitionDisposition::Failed(
            ForceFailure::UnknownRange,
        ));
    };

    let previous_owner = current.owner().clone();
    let previous_epoch = current.epoch();
    let previous_version = current.version();
    // transfer_to bumps both epoch (fencing the old owner) and version.
    let next = current.transfer_to(request.target.clone(), request.new_replicas.clone());
    let new_epoch = next.epoch();
    let new_version = next.version();

    match catalog.apply_update(next) {
        Ok(_) => audit(ForcedTransitionDisposition::Allowed {
            previous_owner,
            new_owner: request.target.clone(),
            previous_epoch,
            new_epoch,
            previous_version,
            new_version,
        }),
        Err(err) => audit(ForcedTransitionDisposition::Failed(ForceFailure::Catalog(
            err,
        ))),
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::cluster::ownership::{
        OwnershipEpoch, PlacementMetadata, RangeBounds, RangeWriteReject, ShardKeyMode,
    };

    fn collection(name: &str) -> CollectionId {
        CollectionId::new(name).unwrap()
    }

    fn ident(cn: &str) -> NodeIdentity {
        NodeIdentity::from_certificate_subject(cn).unwrap()
    }

    /// A catalog holding one full-keyspace range owned by `owner` with `replicas`.
    fn catalog_with(owner: &str, replicas: &[&str]) -> (ShardOwnershipCatalog, CollectionId) {
        let orders = collection("orders");
        let mut catalog = ShardOwnershipCatalog::new();
        catalog
            .apply_update(RangeOwnership::establish(
                orders.clone(),
                RangeId::new(1),
                ShardKeyMode::Hash,
                RangeBounds::full(),
                ident(owner),
                replicas.iter().map(|r| ident(r)).collect::<Vec<_>>(),
                PlacementMetadata::with_replication_factor(3),
            ))
            .unwrap();
        (catalog, orders)
    }

    fn capability(operator: &str) -> ForceTransitionCapability {
        ForceTransitionCapability::granted_to(ident(operator))
    }

    fn reason() -> OperatorReason {
        OperatorReason::new("primary AZ lost, promoting surviving copy").unwrap()
    }

    /// A fully-authorised forced request: capability + reason attached.
    fn authorised_request(orders: &CollectionId, target: &str) -> ForcedTransitionRequest {
        ForcedTransitionRequest::new(orders.clone(), RangeId::new(1), ident(target))
            .with_capability(capability("CN=operator-root"))
            .with_reason(reason())
    }

    // ---------------------------------------------------------------
    // OperatorReason: "explicit" must be enforceable.
    // ---------------------------------------------------------------

    #[test]
    fn operator_reason_rejects_blank_input() {
        assert_eq!(OperatorReason::new(""), Err(EmptyOperatorReason));
        assert_eq!(OperatorReason::new(" "), Err(EmptyOperatorReason));
        assert_eq!(OperatorReason::new("\t\n "), Err(EmptyOperatorReason));
    }

    #[test]
    fn operator_reason_trims_surrounding_whitespace() {
        let r = OperatorReason::new(" recover orders/1 ").unwrap();
        assert_eq!(r.as_str(), "recover orders/1");
    }

    // ---------------------------------------------------------------
    // Authorisation gate: capability and reason are both required.
    // ---------------------------------------------------------------

    #[test]
    fn force_denied_without_capability() {
        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        // Reason present, but no capability.
        let req = ForcedTransitionRequest::new(orders.clone(), RangeId::new(1), ident("CN=node-b"))
            .with_reason(reason());

        let audit = force_transition(&mut catalog, &req, 1_000);

        assert!(!audit.is_allowed());
        assert_eq!(
            audit.disposition(),
            &ForcedTransitionDisposition::Denied(ForceDenial::MissingCapability)
        );
        // The catalog is untouched: node-a is still owner at the initial epoch.
        let range = catalog.range(&orders, RangeId::new(1)).unwrap();
        assert_eq!(range.owner(), &ident("CN=node-a"));
        assert_eq!(range.epoch(), OwnershipEpoch::initial());
        // Audit evidence is still emitted for the denied attempt.
        assert!(audit.to_string().contains("DENIED"));
        assert_eq!(audit.attempted_at_ms(), 1_000);
    }

    #[test]
    fn force_denied_without_reason() {
        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        // Capability present, but no operator reason.
        let req = ForcedTransitionRequest::new(orders.clone(), RangeId::new(1), ident("CN=node-b"))
            .with_capability(capability("CN=operator-root"));

        let audit = force_transition(&mut catalog, &req, 2_000);

        assert!(!audit.is_allowed());
        assert_eq!(
            audit.disposition(),
            &ForcedTransitionDisposition::Denied(ForceDenial::MissingReason)
        );
        // Operator is recorded (capability was presented) but reason is absent.
        assert_eq!(audit.operator(), Some(&ident("CN=operator-root")));
        assert_eq!(audit.reason(), None);
        // Catalog untouched.
        let range = catalog.range(&orders, RangeId::new(1)).unwrap();
        assert_eq!(range.owner(), &ident("CN=node-a"));
    }

    #[test]
    fn missing_capability_is_reported_before_missing_reason() {
        // Fail-closed ordering: with neither present, the capability denial wins.
        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        let req = ForcedTransitionRequest::new(orders, RangeId::new(1), ident("CN=node-b"));
        let audit = force_transition(&mut catalog, &req, 0);
        assert_eq!(
            audit.disposition(),
            &ForcedTransitionDisposition::Denied(ForceDenial::MissingCapability)
        );
    }

    // ---------------------------------------------------------------
    // Successful forced transition: epoch bump + audit evidence.
    // ---------------------------------------------------------------

    #[test]
    fn authorised_force_bumps_epoch_and_moves_owner() {
        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        // node-z is not even a replica, yet force can still install it.
        let req = ForcedTransitionRequest::new(orders.clone(), RangeId::new(1), ident("CN=node-z"))
            .with_capability(capability("CN=operator-root"))
            .with_reason(reason());

        let audit = force_transition(&mut catalog, &req, 5_000);

        assert!(audit.is_allowed());
        assert!(audit.fenced_old_owner());
        match audit.disposition() {
            ForcedTransitionDisposition::Allowed {
                previous_owner,
                new_owner,
                previous_epoch,
                new_epoch,
                previous_version,
                new_version,
            } => {
                assert_eq!(previous_owner, &ident("CN=node-a"));
                assert_eq!(new_owner, &ident("CN=node-z"));
                assert_eq!(*previous_epoch, OwnershipEpoch::initial());
                assert_eq!(new_epoch.value(), 2);
                assert_eq!(previous_version.value(), 1);
                assert_eq!(new_version.value(), 2);
            }
            other => panic!("expected Allowed, got {other:?}"),
        }

        // The catalog now makes node-z authoritative at the bumped epoch.
        let range = catalog.range(&orders, RangeId::new(1)).unwrap();
        assert_eq!(range.owner(), &ident("CN=node-z"));
        assert_eq!(range.epoch().value(), 2);
    }

    #[test]
    fn audit_evidence_records_operator_reason_and_boundary() {
        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        let req = authorised_request(&orders, "CN=node-b");

        let audit = force_transition(&mut catalog, &req, 7_777);

        assert_eq!(audit.operator(), Some(&ident("CN=operator-root")));
        assert_eq!(
            audit.reason(),
            Some("primary AZ lost, promoting surviving copy")
        );
        assert_eq!(audit.attempted_at_ms(), 7_777);
        assert_eq!(audit.target(), &ident("CN=node-b"));
        let line = audit.to_string();
        assert!(line.contains("ALLOWED"));
        assert!(line.contains("CN=operator-root"));
        assert!(line.contains("primary AZ lost"));
        assert!(line.contains("CN=node-a"));
        assert!(line.contains("CN=node-b"));
    }

    // ---------------------------------------------------------------
    // Old owner is fenced once it reappears after a force.
    // ---------------------------------------------------------------

    #[test]
    fn reappearing_old_owner_is_fenced_after_force() {
        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        // Before the force node-a (the owner) is admitted at the initial epoch.
        assert!(catalog
            .admit_public_write(
                &ident("CN=node-a"),
                &orders,
                b"k",
                OwnershipEpoch::initial()
            )
            .is_ok());

        // Force ownership to node-b, demoting node-a to a replica.
        let req = authorised_request(&orders, "CN=node-b").with_replicas([ident("CN=node-a")]);
        let audit = force_transition(&mut catalog, &req, 1_000);
        assert!(audit.is_allowed());

        // node-a reappears (partition healed) still believing epoch 1. Its write is
        // fenced: as a replica it is no longer the owner...
        let err = catalog
            .admit_public_write(
                &ident("CN=node-a"),
                &orders,
                b"k",
                OwnershipEpoch::initial(),
            )
            .unwrap_err();
        assert!(matches!(err, RangeWriteReject::NotOwner { .. }));

        // ...and even an old owner that still believed itself owner would carry the
        // stale epoch; node-b at the bumped epoch is the one now admitted.
        let current_epoch = catalog.range(&orders, RangeId::new(1)).unwrap().epoch();
        assert!(catalog
            .admit_public_write(&ident("CN=node-b"), &orders, b"k", current_epoch)
            .is_ok());
    }

    #[test]
    fn force_against_unknown_range_fails_and_is_audited() {
        let mut catalog = ShardOwnershipCatalog::new();
        let orders = collection("orders");
        // Authorised, but the range does not exist.
        let req = authorised_request(&orders, "CN=node-b");

        let audit = force_transition(&mut catalog, &req, 3_000);

        assert!(!audit.is_allowed());
        assert_eq!(
            audit.disposition(),
            &ForcedTransitionDisposition::Failed(ForceFailure::UnknownRange)
        );
        // Failed attempts still carry full operator/reason evidence.
        assert_eq!(audit.operator(), Some(&ident("CN=operator-root")));
        assert!(audit.reason().is_some());
        assert!(audit.to_string().contains("FAILED"));
    }

    #[test]
    fn ordinary_safety_checks_are_untouched_by_the_force_path() {
        // The force path neither weakens nor invokes the ordinary transition gate:
        // an ordinary transition with a stale CAS still loses. This guards the
        // acceptance criterion that non-force operations keep their safety checks.
        use crate::cluster::ownership_transition::{
            run_transition, CommitWatermark, TransitionError, TransitionKind, TransitionRejection,
            TransitionRequest,
        };
        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);

        // First, a legitimate force moves authority to node-b (epoch -> 2).
        let forced = force_transition(&mut catalog, &authorised_request(&orders, "CN=node-b"), 10);
        assert!(forced.is_allowed());

        // An ordinary transition planner still holding the pre-force CAS (node-a at
        // epoch 1) is rejected by the ordinary gate; force did not disable it.
        let stale = TransitionRequest::new(
            TransitionKind::Promote,
            orders.clone(),
            RangeId::new(1),
            ident("CN=node-a"),
            OwnershipEpoch::initial(),
            CatalogVersion::initial(),
            ident("CN=node-b"),
            CommitWatermark::new(1, 10),
        )
        .with_evidence(crate::cluster::ownership_transition::CatchUpEvidence::new(
            ident("CN=node-b"),
            1,
            10,
        ));
        let err = run_transition(&mut catalog, &stale).unwrap_err();
        assert!(matches!(
            err,
            TransitionError::Rejected(TransitionRejection::OwnerMismatch { .. })
        ));
    }
}
758}