Skip to main content

reddb_server/cluster/
drain.rs

1//! Cluster drain and force-remove flows (issue #1000, PRD #987, ADR 0037).
2//!
3//! Removing a data member from a multi-writer cluster is not a single catalog
4//! edit — a member may own ranges (it is the sole writer) and replicate others (it
5//! holds required copies). Dropping it from membership while a range still depends
6//! on it would orphan write authority or lose a copy. This module is the policy
7//! that moves those dependencies off a member *first*, in the two shapes the
8//! glossary names:
9//!
10//! * **Cluster drain** — the *planned* removal of a live, cooperating member. The
11//!   member is marked [`Draining`](super::membership::MemberState::Draining) so it
12//!   stops receiving new range placements, then every range it owns is handed off
13//!   to a caught-up replica through the ordinary fenced transition machine
14//!   ([`super::ownership_transition`]) and every range it replicates is evacuated
15//!   to another member. Only once it owns and replicates nothing is membership
16//!   removal allowed — a removal that still has a dependency is *refused*, not
17//!   forced.
18//! * **Force remove** — the *unplanned* removal of a dead or unrecoverable member.
19//!   The member cannot cooperate (it is gone), so ordinary safety checks cannot be
20//!   satisfied. Under the ADR 0037 forced-ownership rules a `FORCE` order — a
21//!   special administrative capability plus an explicit operator reason — promotes
22//!   the most-caught-up surviving replica even when it cannot prove it covers the
23//!   commit watermark, recording the possible committed-write loss as durable
24//!   audit evidence, and bumps the ownership epoch so the dead owner is fenced if
25//!   it ever reappears. A range with no surviving replica at all is surfaced as
26//!   *unrecoverable* rather than silently dropped.
27//!
28//! ## Purity
29//!
30//! Like the supervisor ([`super::supervisor`]), everything here is a pure policy
31//! over the membership and ownership catalogs plus the [`ClusterSignals`] the
32//! caller injects (per-range commit watermarks and per-candidate catch-up
33//! evidence). There is no clock, network, or engine, so the whole drain /
34//! force-remove / refusal / audit story is exercised deterministically.
35
36use super::identity::NodeIdentity;
37use super::membership::{ClusterMember, MembershipCatalog};
38use super::ownership::{CollectionId, RangeId, RangeOwnership, ShardOwnershipCatalog};
39use super::ownership_transition::{
40    run_transition, CatchUpEvidence, CommitWatermark, TransitionError, TransitionKind,
41    TransitionOutcome, TransitionRequest,
42};
43use super::supervisor::ClusterSignals;
44
45// =============================================================================
46// Planned drain
47// =============================================================================
48
49/// One scheduled step that moves a single range's dependency off the draining
50/// member. A complete [`DrainPlan`] is a list of these plus any [`DrainBlock`]s
51/// that could not be scheduled.
52#[derive(Debug, Clone, PartialEq, Eq)]
53pub enum DrainStep {
54    /// The draining member *owns* this range: hand write authority off to a
55    /// caught-up replica through a fenced [`Handoff`](TransitionKind::Handoff)
56    /// transition.
57    Handoff(OwnedHandoff),
58    /// The draining member *replicates* this range: drop its copy from the owner's
59    /// replica set, moving the copy to a `replacement` member if one is needed to
60    /// keep the range at its replication factor.
61    Evacuate(ReplicaEvacuation),
62}
63
64/// A scheduled hand-off of an owned range away from the draining member to a
65/// safe, caught-up replica. The [`TransitionRequest`] already carries the
66/// three-part CAS, the commit watermark, and the target's catch-up evidence, so
67/// it runs through [`run_transition`] unchanged.
68#[derive(Debug, Clone, PartialEq, Eq)]
69pub struct OwnedHandoff {
70    pub collection: CollectionId,
71    pub range_id: RangeId,
72    pub target: NodeIdentity,
73    pub request: TransitionRequest,
74}
75
76/// A scheduled evacuation of the draining member's *replica* copy of a range.
77/// `replacement` is `Some` when a new host was assigned to preserve the range's
78/// replication factor, or `None` when the range is already replicated enough to
79/// drop the copy outright. `next` is the catalog entry the evacuation installs
80/// (a replica-set change — no epoch bump, since write authority does not move).
81#[derive(Debug, Clone, PartialEq, Eq)]
82pub struct ReplicaEvacuation {
83    pub collection: CollectionId,
84    pub range_id: RangeId,
85    pub replacement: Option<NodeIdentity>,
86    pub next: RangeOwnership,
87}
88
/// The reason a range could not be moved off the draining member. Blocked ranges
/// are reported instead of skipped, so the operator can see which range keeps the
/// drain open and the removal refused.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DrainBlockReason {
    /// An *owned* range has no replica that is safe to hand off to: none is an
    /// active data member whose catch-up evidence covers the commit watermark.
    /// Promoting a lagging node could lose committed writes.
    NoSafeHandoffTarget,
    /// A *replicated* range would fall below its replication factor if the
    /// draining member's copy were dropped, and no eligible member is available
    /// to host a replacement copy.
    NoReplacementReplica,
}

impl DrainBlockReason {
    /// Short human-readable description of the reason.
    fn label(self) -> &'static str {
        match self {
            Self::NoSafeHandoffTarget => "no caught-up replica is a safe hand-off target",
            Self::NoReplacementReplica => "no eligible member can host a replacement replica",
        }
    }
}
116
117/// A range that blocks the drain: it still depends on the draining member and
118/// could not be moved.
119#[derive(Debug, Clone, PartialEq, Eq)]
120pub struct DrainBlock {
121    pub collection: CollectionId,
122    pub range_id: RangeId,
123    pub reason: DrainBlockReason,
124}
125
126/// The planned drain of one member: the steps that move its ranges off it, and
127/// the ranges that could not be moved. A member that owns and replicates nothing
128/// yields an empty plan ([`is_empty`](Self::is_empty)); a plan with no
129/// [`blocked`](Self::blocked) entries is [`complete`](Self::is_complete) and the
130/// member becomes removable once the steps are applied.
131#[derive(Debug, Clone, PartialEq, Eq)]
132pub struct DrainPlan {
133    pub member: NodeIdentity,
134    pub steps: Vec<DrainStep>,
135    pub blocked: Vec<DrainBlock>,
136}
137
138impl DrainPlan {
139    /// Nothing to move and nothing blocked — the member already holds no ranges.
140    pub fn is_empty(&self) -> bool {
141        self.steps.is_empty() && self.blocked.is_empty()
142    }
143
144    /// Every dependency could be scheduled (no [`blocked`](Self::blocked) ranges),
145    /// so applying the steps fully drains the member and removal will be allowed.
146    pub fn is_complete(&self) -> bool {
147        self.blocked.is_empty()
148    }
149}
150
151/// Plan a member's drain **without** mutating either catalog. For every range the
152/// member owns, schedule a fenced hand-off to the safest caught-up replica; for
153/// every range it replicates, schedule an evacuation (with a replacement host
154/// when the replication factor requires one). Ranges that cannot be moved become
155/// [`DrainBlock`]s.
156pub fn plan_drain(
157    member: &NodeIdentity,
158    membership: &MembershipCatalog,
159    ownership: &ShardOwnershipCatalog,
160    signals: &impl ClusterSignals,
161) -> DrainPlan {
162    let mut steps = Vec::new();
163    let mut blocked = Vec::new();
164
165    for range in ownership.entries() {
166        let collection = range.collection().clone();
167        let range_id = range.range_id();
168
169        if range.owner() == member {
170            // Owned range: hand authority off to a caught-up replica.
171            let watermark = signals.commit_watermark(&collection, range_id);
172            match select_handoff_target(range, member, membership, watermark, signals) {
173                Some((target, evidence)) => {
174                    let request = TransitionRequest::new(
175                        TransitionKind::Handoff,
176                        collection.clone(),
177                        range_id,
178                        member.clone(),
179                        range.epoch(),
180                        range.version(),
181                        target.clone(),
182                        watermark,
183                    )
184                    .with_evidence(evidence)
185                    .with_replicas(without(range.replicas(), &target));
186                    steps.push(DrainStep::Handoff(OwnedHandoff {
187                        collection,
188                        range_id,
189                        target,
190                        request,
191                    }));
192                }
193                None => blocked.push(DrainBlock {
194                    collection,
195                    range_id,
196                    reason: DrainBlockReason::NoSafeHandoffTarget,
197                }),
198            }
199        } else if range.replicas().contains(member) {
200            // Replicated range: drop the member's copy, adding a replacement host
201            // if dropping it would take the range below its replication factor.
202            let remaining = without(range.replicas(), member);
203            // copies after dropping = owner (1) + remaining replicas.
204            let copies_after = 1 + remaining.len();
205            let required = range.placement().replication_factor();
206            if copies_after >= required {
207                let next = range.update_replicas(remaining);
208                steps.push(DrainStep::Evacuate(ReplicaEvacuation {
209                    collection,
210                    range_id,
211                    replacement: None,
212                    next,
213                }));
214            } else if let Some(replacement) = select_replacement_replica(range, member, membership)
215            {
216                let mut replicas = remaining;
217                replicas.push(replacement.clone());
218                let next = range.update_replicas(replicas);
219                steps.push(DrainStep::Evacuate(ReplicaEvacuation {
220                    collection,
221                    range_id,
222                    replacement: Some(replacement),
223                    next,
224                }));
225            } else {
226                blocked.push(DrainBlock {
227                    collection,
228                    range_id,
229                    reason: DrainBlockReason::NoReplacementReplica,
230                });
231            }
232        }
233    }
234
235    DrainPlan {
236        member: member.clone(),
237        steps,
238        blocked,
239    }
240}
241
242/// The outcome of running a drain: the result of each scheduled step (a hand-off
243/// outcome or an evacuation), and the ranges that stayed blocked. The membership
244/// catalog is untouched — removal is the separate, gated
245/// [`commit_drain_removal`] step.
246#[derive(Debug, Clone, PartialEq, Eq)]
247pub struct DrainOutcome {
248    pub member: NodeIdentity,
249    /// Each owned-range hand-off's transition result, in plan order.
250    pub handoffs: Vec<Result<TransitionOutcome, TransitionError>>,
251    /// Each replica evacuation that was applied, in plan order.
252    pub evacuations: Vec<ReplicaEvacuation>,
253    /// Ranges that could not be scheduled and still block the drain.
254    pub blocked: Vec<DrainBlock>,
255}
256
257impl DrainOutcome {
258    /// Did every scheduled step apply cleanly and nothing stay blocked? When true,
259    /// the member now owns and replicates nothing and [`commit_drain_removal`]
260    /// will succeed.
261    pub fn is_drained(&self) -> bool {
262        self.blocked.is_empty() && self.handoffs.iter().all(Result::is_ok)
263    }
264}
265
266/// Plan a drain and apply its steps to the ownership catalog. Owned-range
267/// hand-offs run through the fenced transition machine; replica evacuations are
268/// applied as replica-set updates. Membership is **not** changed here — finishing
269/// the removal is the gated [`commit_drain_removal`].
270pub fn run_drain(
271    member: &NodeIdentity,
272    membership: &MembershipCatalog,
273    ownership: &mut ShardOwnershipCatalog,
274    signals: &impl ClusterSignals,
275) -> DrainOutcome {
276    let plan = plan_drain(member, membership, ownership, signals);
277    let mut handoffs = Vec::new();
278    let mut evacuations = Vec::new();
279
280    for step in plan.steps {
281        match step {
282            DrainStep::Handoff(handoff) => {
283                handoffs.push(run_transition(ownership, &handoff.request));
284            }
285            DrainStep::Evacuate(evac) => {
286                // A replica-set update strictly advances the entry version, so it
287                // cannot fail the catalog's monotonicity check; surface any
288                // catalog error by leaving the catalog untouched is unnecessary —
289                // apply and record.
290                if ownership.apply_update(evac.next.clone()).is_ok() {
291                    evacuations.push(evac);
292                }
293            }
294        }
295    }
296
297    DrainOutcome {
298        member: member.clone(),
299        handoffs,
300        evacuations,
301        blocked: plan.blocked,
302    }
303}
304
305/// Why a membership removal was refused. Every variant leaves both catalogs
306/// untouched — a planned removal fails closed while any dependency remains.
307#[derive(Debug, Clone, PartialEq, Eq)]
308pub enum RemovalRejection {
309    /// The node is not a member of this cluster.
310    NotAMember { member: NodeIdentity },
311    /// The member has not been marked draining. Planned removal must mark a member
312    /// [`Draining`](super::membership::MemberState::Draining) first (use
313    /// [`MembershipCatalog::begin_drain`]); force-remove is the path for a member
314    /// that was never drained.
315    NotDraining { member: NodeIdentity },
316    /// The member still owns these ranges — removing it would orphan their write
317    /// authority. Drain must hand them off first.
318    StillOwnsRanges {
319        member: NodeIdentity,
320        ranges: Vec<(CollectionId, RangeId)>,
321    },
322    /// The member still holds replica copies of these ranges — removing it would
323    /// drop required copies. Drain must evacuate them first.
324    StillReplicaFor {
325        member: NodeIdentity,
326        ranges: Vec<(CollectionId, RangeId)>,
327    },
328}
329
330impl std::fmt::Display for RemovalRejection {
331    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
332        match self {
333            Self::NotAMember { member } => write!(f, "{member} is not a cluster member"),
334            Self::NotDraining { member } => {
335                write!(f, "{member} must be marked draining before planned removal")
336            }
337            Self::StillOwnsRanges { member, ranges } => write!(
338                f,
339                "{member} cannot be removed: still owns {} range(s)",
340                ranges.len()
341            ),
342            Self::StillReplicaFor { member, ranges } => write!(
343                f,
344                "{member} cannot be removed: still replicates {} range(s)",
345                ranges.len()
346            ),
347        }
348    }
349}
350
351impl std::error::Error for RemovalRejection {}
352
353/// Finish a planned drain by removing the member from the catalog — but only if
354/// it is a draining member that no longer owns or replicates any range. Refuses
355/// (leaving membership untouched) on any remaining dependency, so a member is
356/// never removed out from under a range that still needs it.
357pub fn commit_drain_removal(
358    member: &NodeIdentity,
359    membership: &mut MembershipCatalog,
360    ownership: &ShardOwnershipCatalog,
361) -> Result<ClusterMember, RemovalRejection> {
362    match membership.member(member) {
363        None => {
364            return Err(RemovalRejection::NotAMember {
365                member: member.clone(),
366            })
367        }
368        Some(m) if !m.is_draining() => {
369            return Err(RemovalRejection::NotDraining {
370                member: member.clone(),
371            })
372        }
373        Some(_) => {}
374    }
375
376    let (owned, replicated) = range_dependencies(member, ownership);
377    if !owned.is_empty() {
378        return Err(RemovalRejection::StillOwnsRanges {
379            member: member.clone(),
380            ranges: owned,
381        });
382    }
383    if !replicated.is_empty() {
384        return Err(RemovalRejection::StillReplicaFor {
385            member: member.clone(),
386            ranges: replicated,
387        });
388    }
389
390    Ok(membership
391        .remove(member)
392        .expect("membership presence checked above"))
393}
394
395// =============================================================================
396// Force remove (dead / unrecoverable member)
397// =============================================================================
398
/// Evidence that the caller holds the administrative `FORCE` capability ADR 0037
/// demands for forced ownership transitions.
///
/// The only field is private, so a value can be obtained solely through
/// [`ForceCapability::granted`]; the force-remove policy takes one as input and
/// therefore cannot be invoked without it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ForceCapability {
    holder: String,
}

impl ForceCapability {
    /// Create a capability attributed to `holder` — the operator or identity the
    /// audit trail names as the authority behind the forced removal.
    pub fn granted(holder: impl Into<String>) -> Self {
        let holder: String = holder.into();
        Self { holder }
    }

    /// The identity this capability was granted to.
    pub fn holder(&self) -> &str {
        self.holder.as_str()
    }
}
421
/// The ways building a [`ForceRemoveOrder`] can fail.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ForceRemoveOrderError {
    /// The operator reason was blank. ADR 0037 requires an *explicit* reason, so
    /// the audit trail never records an unexplained forced removal.
    EmptyReason,
}

impl std::fmt::Display for ForceRemoveOrderError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::EmptyReason => "a forced removal requires an explicit operator reason",
        };
        f.write_str(message)
    }
}

impl std::error::Error for ForceRemoveOrderError {}
439
440/// A fully authorized order to force-remove a dead/unrecoverable member: the
441/// administrative capability, the target member, and the explicit operator
442/// reason. Built with [`ForceRemoveOrder::new`], which enforces the non-empty
443/// reason.
444#[derive(Debug, Clone, PartialEq, Eq)]
445pub struct ForceRemoveOrder {
446    capability: ForceCapability,
447    member: NodeIdentity,
448    reason: String,
449}
450
451impl ForceRemoveOrder {
452    pub fn new(
453        capability: ForceCapability,
454        member: NodeIdentity,
455        reason: impl Into<String>,
456    ) -> Result<Self, ForceRemoveOrderError> {
457        let reason = reason.into();
458        if reason.trim().is_empty() {
459            return Err(ForceRemoveOrderError::EmptyReason);
460        }
461        Ok(Self {
462            capability,
463            member,
464            reason,
465        })
466    }
467
468    pub fn member(&self) -> &NodeIdentity {
469        &self.member
470    }
471
472    pub fn reason(&self) -> &str {
473        &self.reason
474    }
475
476    pub fn capability(&self) -> &ForceCapability {
477        &self.capability
478    }
479}
480
481/// A forced promotion of one owned range away from the dead member to the
482/// best-available surviving replica. Unlike a planned hand-off this proceeds even
483/// when the target cannot prove it covers the commit watermark — `covers_watermark`
484/// records whether it could, so the audit trail captures any possible
485/// committed-write loss. `next` is the fenced catalog entry (epoch bumped) that
486/// activation installs.
487#[derive(Debug, Clone, PartialEq, Eq)]
488pub struct ForcedPromotion {
489    pub collection: CollectionId,
490    pub range_id: RangeId,
491    pub dead_owner: NodeIdentity,
492    pub new_owner: NodeIdentity,
493    /// Whether the promoted replica's applied log covers the commit watermark. When
494    /// `false`, writes past the replica's applied point may be lost — the price of
495    /// recovering a range whose owner is gone.
496    pub covers_watermark: bool,
497    /// The promoted replica's catch-up evidence, if any was known.
498    pub evidence: Option<CatchUpEvidence>,
499    pub next: RangeOwnership,
500}
501
502/// An owned range that could **not** be force-recovered: the dead member was its
503/// owner and no surviving replica exists to promote. The range's data is lost
504/// with the member — recorded so the operator sees exactly what was unrecoverable.
505#[derive(Debug, Clone, PartialEq, Eq)]
506pub struct ForcedBlock {
507    pub collection: CollectionId,
508    pub range_id: RangeId,
509    pub dead_owner: NodeIdentity,
510}
511
512/// The plan for a forced removal: the owned ranges to force-promote, the
513/// replicated ranges whose dead copy is dropped, and the owned ranges that are
514/// unrecoverable. Produced by [`plan_force_remove`] without mutating any catalog.
515#[derive(Debug, Clone, PartialEq, Eq)]
516pub struct ForceRemovePlan {
517    pub member: NodeIdentity,
518    pub reason: String,
519    pub capability_holder: String,
520    pub promotions: Vec<ForcedPromotion>,
521    /// Ranges where the dead member was only a replica: its copy is dropped from
522    /// the live owner's replica set (no epoch bump). Each entry is the catalog
523    /// update to apply.
524    pub replica_drops: Vec<RangeOwnership>,
525    pub unrecoverable: Vec<ForcedBlock>,
526}
527
528/// Plan the forced removal of a dead/unrecoverable member **without** mutating any
529/// catalog, under the ADR 0037 forced-ownership rules. For each owned range, pick
530/// the best surviving replica (preferring one that covers the commit watermark,
531/// then the furthest-applied) and force-promote it; a range with no surviving
532/// replica is recorded as unrecoverable. For each replicated range, drop the dead
533/// member's copy.
534pub fn plan_force_remove(
535    order: &ForceRemoveOrder,
536    membership: &MembershipCatalog,
537    ownership: &ShardOwnershipCatalog,
538    signals: &impl ClusterSignals,
539) -> ForceRemovePlan {
540    let member = order.member();
541    let mut promotions = Vec::new();
542    let mut replica_drops = Vec::new();
543    let mut unrecoverable = Vec::new();
544
545    for range in ownership.entries() {
546        let collection = range.collection().clone();
547        let range_id = range.range_id();
548
549        if range.owner() == member {
550            let watermark = signals.commit_watermark(&collection, range_id);
551            match select_force_target(range, member, membership, watermark, signals) {
552                Some((target, covers_watermark, evidence)) => {
553                    let next =
554                        range.transfer_to(target.clone(), without(range.replicas(), &target));
555                    promotions.push(ForcedPromotion {
556                        collection,
557                        range_id,
558                        dead_owner: member.clone(),
559                        new_owner: target,
560                        covers_watermark,
561                        evidence,
562                        next,
563                    });
564                }
565                None => unrecoverable.push(ForcedBlock {
566                    collection,
567                    range_id,
568                    dead_owner: member.clone(),
569                }),
570            }
571        } else if range.replicas().contains(member) {
572            replica_drops.push(range.update_replicas(without(range.replicas(), member)));
573        }
574    }
575
576    ForceRemovePlan {
577        member: member.clone(),
578        reason: order.reason().to_string(),
579        capability_holder: order.capability().holder().to_string(),
580        promotions,
581        replica_drops,
582        unrecoverable,
583    }
584}
585
586/// The durable audit evidence of a forced removal: who authorized it, why, which
587/// ranges moved (and whether each may have lost writes), which were unrecoverable,
588/// and how many stale replica copies were dropped. ADR 0037 requires a forced
589/// transition to leave exactly this trail; its [`Display`](std::fmt::Display) is a
590/// single audit line.
591#[derive(Debug, Clone, PartialEq, Eq)]
592pub struct ForceRemoveAudit {
593    pub member: NodeIdentity,
594    pub capability_holder: String,
595    pub reason: String,
596    /// `(collection, range_id, new_owner, covers_watermark)` for each forced
597    /// promotion. A `false` flag marks a range that may have lost committed writes.
598    pub promotions: Vec<(CollectionId, RangeId, NodeIdentity, bool)>,
599    pub unrecoverable: Vec<(CollectionId, RangeId)>,
600    pub replica_copies_dropped: usize,
601}
602
603impl ForceRemoveAudit {
604    /// Whether any forced promotion could not prove it covered the commit
605    /// watermark — i.e. the forced removal may have lost committed writes.
606    pub fn has_potential_write_loss(&self) -> bool {
607        self.promotions.iter().any(|(_, _, _, covers)| !covers) || !self.unrecoverable.is_empty()
608    }
609}
610
611impl std::fmt::Display for ForceRemoveAudit {
612    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
613        write!(
614            f,
615            "FORCE remove {} by {} (reason: {}): {} range(s) force-promoted, {} unrecoverable, {} stale replica copies dropped",
616            self.member,
617            self.capability_holder,
618            self.reason,
619            self.promotions.len(),
620            self.unrecoverable.len(),
621            self.replica_copies_dropped,
622        )?;
623        if self.has_potential_write_loss() {
624            write!(f, "; POTENTIAL WRITE LOSS")?;
625        }
626        Ok(())
627    }
628}
629
630/// The result of running a forced removal: the audit evidence, the activated
631/// promotion outcomes, the unrecoverable ranges, and the removed member (if it was
632/// a member). The dead member is removed from the catalog regardless of
633/// unrecoverable ranges — it is gone; the unrecoverable ranges are recorded for
634/// the operator, not a reason to keep a dead member listed.
635#[derive(Debug, Clone, PartialEq, Eq)]
636pub struct ForceRemoveResult {
637    pub audit: ForceRemoveAudit,
638    pub promotions: Vec<TransitionOutcome>,
639    pub unrecoverable: Vec<ForcedBlock>,
640    pub removed: Option<ClusterMember>,
641}
642
643/// Plan and execute a forced removal: force-promote each owned range's surviving
644/// replica (fencing the dead owner via the epoch bump), drop the dead member's
645/// stale replica copies, then remove it from membership. Returns the audit
646/// evidence and outcomes. Unlike a planned drain, this never refuses — a dead
647/// member is removed even when some of its ranges are unrecoverable.
648pub fn run_force_remove(
649    order: &ForceRemoveOrder,
650    membership: &mut MembershipCatalog,
651    ownership: &mut ShardOwnershipCatalog,
652    signals: &impl ClusterSignals,
653) -> ForceRemoveResult {
654    let plan = plan_force_remove(order, membership, ownership, signals);
655
656    let mut promotion_outcomes = Vec::new();
657    let mut audit_promotions = Vec::new();
658    for promotion in &plan.promotions {
659        let previous_owner = promotion.dead_owner.clone();
660        let new_epoch = promotion.next.epoch();
661        let previous_epoch = ownership
662            .range(&promotion.collection, promotion.range_id)
663            .map(RangeOwnership::epoch)
664            .unwrap_or(new_epoch);
665        let new_version = promotion.next.version();
666        let previous_version = ownership
667            .range(&promotion.collection, promotion.range_id)
668            .map(RangeOwnership::version)
669            .unwrap_or(new_version);
670        let watermark = signals.commit_watermark(&promotion.collection, promotion.range_id);
671        if ownership.apply_update(promotion.next.clone()).is_ok() {
672            audit_promotions.push((
673                promotion.collection.clone(),
674                promotion.range_id,
675                promotion.new_owner.clone(),
676                promotion.covers_watermark,
677            ));
678            promotion_outcomes.push(TransitionOutcome {
679                kind: TransitionKind::Promote,
680                collection: promotion.collection.clone(),
681                range_id: promotion.range_id,
682                previous_owner,
683                new_owner: promotion.new_owner.clone(),
684                previous_epoch,
685                new_epoch,
686                previous_version,
687                new_version,
688                watermark,
689            });
690        }
691    }
692
693    let mut replica_copies_dropped = 0;
694    for drop in &plan.replica_drops {
695        if ownership.apply_update(drop.clone()).is_ok() {
696            replica_copies_dropped += 1;
697        }
698    }
699
700    let removed = membership.remove(order.member());
701
702    let audit = ForceRemoveAudit {
703        member: order.member().clone(),
704        capability_holder: plan.capability_holder,
705        reason: plan.reason,
706        promotions: audit_promotions,
707        unrecoverable: plan
708            .unrecoverable
709            .iter()
710            .map(|b| (b.collection.clone(), b.range_id))
711            .collect(),
712        replica_copies_dropped,
713    };
714
715    ForceRemoveResult {
716        audit,
717        promotions: promotion_outcomes,
718        unrecoverable: plan.unrecoverable,
719        removed,
720    }
721}
722
723// =============================================================================
724// Status reporting
725// =============================================================================
726
727/// A snapshot of one member's drain posture for operator/status reporting: its
728/// draining flag, the ranges it still owns and replicates, the count of steps a
729/// drain would schedule, the ranges currently blocking it, and whether it is
730/// removable right now.
731#[derive(Debug, Clone, PartialEq, Eq)]
732pub struct DrainStatus {
733    pub member: NodeIdentity,
734    pub is_member: bool,
735    pub is_draining: bool,
736    pub owned_ranges: Vec<(CollectionId, RangeId)>,
737    pub replicated_ranges: Vec<(CollectionId, RangeId)>,
738    pub planned_steps: usize,
739    pub blocked: Vec<DrainBlock>,
740    /// True when the member is draining and depends on no range — a
741    /// [`commit_drain_removal`] would succeed.
742    pub removable: bool,
743}
744
745/// Report a member's current drain status without mutating anything — the read
746/// side of the drain flow an operator surface renders.
747pub fn drain_status(
748    member: &NodeIdentity,
749    membership: &MembershipCatalog,
750    ownership: &ShardOwnershipCatalog,
751    signals: &impl ClusterSignals,
752) -> DrainStatus {
753    let member_entry = membership.member(member);
754    let is_member = member_entry.is_some();
755    let is_draining = member_entry.is_some_and(ClusterMember::is_draining);
756    let (owned_ranges, replicated_ranges) = range_dependencies(member, ownership);
757    let plan = plan_drain(member, membership, ownership, signals);
758    let removable = is_draining && owned_ranges.is_empty() && replicated_ranges.is_empty();
759
760    DrainStatus {
761        member: member.clone(),
762        is_member,
763        is_draining,
764        owned_ranges,
765        replicated_ranges,
766        planned_steps: plan.steps.len(),
767        blocked: plan.blocked,
768        removable,
769    }
770}
771
772// =============================================================================
773// Internal helpers
774// =============================================================================
775
776/// `(owned, replicated)` ranges for `member`, each in `(collection, range_id)`
777/// order — the dependencies that must be cleared before removal.
778fn range_dependencies(
779    member: &NodeIdentity,
780    ownership: &ShardOwnershipCatalog,
781) -> (Vec<(CollectionId, RangeId)>, Vec<(CollectionId, RangeId)>) {
782    let mut owned = Vec::new();
783    let mut replicated = Vec::new();
784    for range in ownership.entries() {
785        if range.owner() == member {
786            owned.push((range.collection().clone(), range.range_id()));
787        } else if range.replicas().contains(member) {
788            replicated.push((range.collection().clone(), range.range_id()));
789        }
790    }
791    (owned, replicated)
792}
793
794/// `replicas` without `node`, preserving order.
795fn without(replicas: &[NodeIdentity], node: &NodeIdentity) -> Vec<NodeIdentity> {
796    replicas.iter().filter(|r| *r != node).cloned().collect()
797}
798
799/// The safest caught-up hand-off target for an owned range: a current replica that
800/// is an active data member (so not the draining member, not a witness, not
801/// another draining member) and whose catch-up evidence covers the commit
802/// watermark. Prefers the furthest-applied candidate, breaking ties by stable
803/// identity order. `None` when no replica is a safe target.
804fn select_handoff_target(
805    range: &RangeOwnership,
806    member: &NodeIdentity,
807    membership: &MembershipCatalog,
808    watermark: CommitWatermark,
809    signals: &impl ClusterSignals,
810) -> Option<(NodeIdentity, CatchUpEvidence)> {
811    let mut best: Option<(CatchUpEvidence, NodeIdentity)> = None;
812    for candidate in range.replicas() {
813        if candidate == member {
814            continue;
815        }
816        if !membership
817            .member(candidate)
818            .is_some_and(ClusterMember::is_placement_eligible)
819        {
820            continue;
821        }
822        let Some(evidence) = signals.catch_up(range.collection(), range.range_id(), candidate)
823        else {
824            continue;
825        };
826        if !evidence.covers(watermark) {
827            continue;
828        }
829        let applied = (evidence.applied_term, evidence.applied_lsn);
830        let better = match &best {
831            None => true,
832            Some((best_ev, best_id)) => {
833                applied > (best_ev.applied_term, best_ev.applied_lsn)
834                    || (applied == (best_ev.applied_term, best_ev.applied_lsn)
835                        && candidate < best_id)
836            }
837        };
838        if better {
839            best = Some((evidence, candidate.clone()));
840        }
841    }
842    best.map(|(evidence, id)| (id, evidence))
843}
844
845/// The best surviving replica to force-promote for a dead owner's range, under the
846/// forced-ownership rules: any current replica that is still an active data member
847/// (other than the dead member). Prefers a replica that covers the commit
848/// watermark, then the furthest-applied, then stable identity order — so the
849/// forced promotion minimises loss even though it does not *require* coverage.
850/// Returns `(target, covers_watermark, evidence)`, or `None` when no replica
851/// survives.
852fn select_force_target(
853    range: &RangeOwnership,
854    member: &NodeIdentity,
855    membership: &MembershipCatalog,
856    watermark: CommitWatermark,
857    signals: &impl ClusterSignals,
858) -> Option<(NodeIdentity, bool, Option<CatchUpEvidence>)> {
859    // Rank key: (covers_watermark, (applied_term, applied_lsn)). A replica with no
860    // evidence ranks at (false, (0, 0)) — still eligible (the owner is dead), just
861    // least preferred.
862    let mut best: Option<(bool, (u64, u64), NodeIdentity, Option<CatchUpEvidence>)> = None;
863    for candidate in range.replicas() {
864        if candidate == member {
865            continue;
866        }
867        if !membership
868            .member(candidate)
869            .is_some_and(ClusterMember::is_placement_eligible)
870        {
871            continue;
872        }
873        let evidence = signals.catch_up(range.collection(), range.range_id(), candidate);
874        let covers = evidence.as_ref().is_some_and(|e| e.covers(watermark));
875        let applied = evidence
876            .as_ref()
877            .map(|e| (e.applied_term, e.applied_lsn))
878            .unwrap_or((0, 0));
879        let better = match &best {
880            None => true,
881            Some((best_covers, best_applied, best_id, _)) => {
882                (covers, applied) > (*best_covers, *best_applied)
883                    || ((covers, applied) == (*best_covers, *best_applied) && candidate < best_id)
884            }
885        };
886        if better {
887            best = Some((covers, applied, candidate.clone(), evidence));
888        }
889    }
890    best.map(|(covers, _, id, evidence)| (id, covers, evidence))
891}
892
893/// An eligible host for a replacement replica copy when evacuating the draining
894/// member would otherwise drop a range below its replication factor: an active
895/// data member that is not the draining member and does not already hold the range
896/// (as owner or replica). Lowest stable identity wins, for determinism. `None`
897/// when no member is free to take a copy.
898fn select_replacement_replica(
899    range: &RangeOwnership,
900    member: &NodeIdentity,
901    membership: &MembershipCatalog,
902) -> Option<NodeIdentity> {
903    membership
904        .placement_eligible_members()
905        .map(ClusterMember::identity)
906        .find(|id| *id != member && range.owner() != *id && !range.replicas().contains(id))
907        .cloned()
908}
909
910#[cfg(test)]
911mod tests {
912    use super::*;
913    use crate::cluster::membership::{ClusterId, MemberKind};
914    use crate::cluster::ownership::{
915        OwnershipEpoch, PlacementMetadata, RangeBounds, RangeRole, RangeWriteReject, ShardKeyMode,
916    };
917    use std::collections::HashMap;
918
919    fn ident(cn: &str) -> NodeIdentity {
920        NodeIdentity::from_certificate_subject(cn).unwrap()
921    }
922
923    fn collection(name: &str) -> CollectionId {
924        CollectionId::new(name).unwrap()
925    }
926
927    fn data_member(cn: &str) -> ClusterMember {
928        ClusterMember::joined_empty(ident(cn), MemberKind::Data)
929    }
930
931    fn membership(members: &[&str]) -> MembershipCatalog {
932        MembershipCatalog::new(
933            ClusterId::new("cluster-x").unwrap(),
934            members.iter().map(|m| data_member(m)),
935        )
936    }
937
938    /// A single full-keyspace range `orders/1` owned by `owner` with `replicas` and
939    /// the given replication factor.
940    fn catalog_with_rf(
941        owner: &str,
942        replicas: &[&str],
943        rf: usize,
944    ) -> (ShardOwnershipCatalog, CollectionId) {
945        let orders = collection("orders");
946        let mut catalog = ShardOwnershipCatalog::new();
947        catalog
948            .apply_update(RangeOwnership::establish(
949                orders.clone(),
950                RangeId::new(1),
951                ShardKeyMode::Hash,
952                RangeBounds::full(),
953                ident(owner),
954                replicas.iter().map(|r| ident(r)).collect::<Vec<_>>(),
955                PlacementMetadata::with_replication_factor(rf),
956            ))
957            .unwrap();
958        (catalog, orders)
959    }
960
961    fn catalog_with(owner: &str, replicas: &[&str]) -> (ShardOwnershipCatalog, CollectionId) {
962        catalog_with_rf(owner, replicas, 3)
963    }
964
965    /// A scripted [`ClusterSignals`]: one shared watermark and per-candidate
966    /// catch-up evidence keyed by CN. Health signals are not consulted by the drain
967    /// flows, so only the watermark/catch-up surface is scripted.
968    struct FakeSignals {
969        watermark: CommitWatermark,
970        catch_up: HashMap<NodeIdentity, CatchUpEvidence>,
971    }
972
973    impl FakeSignals {
974        fn new(watermark: CommitWatermark) -> Self {
975            Self {
976                watermark,
977                catch_up: HashMap::new(),
978            }
979        }
980
981        fn with_catch_up(mut self, cn: &str, applied_term: u64, applied_lsn: u64) -> Self {
982            self.catch_up.insert(
983                ident(cn),
984                CatchUpEvidence::new(ident(cn), applied_term, applied_lsn),
985            );
986            self
987        }
988    }
989
990    impl ClusterSignals for FakeSignals {
991        fn member_signals(
992            &self,
993            _member: &NodeIdentity,
994        ) -> crate::cluster::supervisor::MemberSignals {
995            crate::cluster::supervisor::MemberSignals::healthy()
996        }
997
998        fn commit_watermark(
999            &self,
1000            _collection: &CollectionId,
1001            _range_id: RangeId,
1002        ) -> CommitWatermark {
1003            self.watermark
1004        }
1005
1006        fn catch_up(
1007            &self,
1008            _collection: &CollectionId,
1009            _range_id: RangeId,
1010            candidate: &NodeIdentity,
1011        ) -> Option<CatchUpEvidence> {
1012            self.catch_up.get(candidate).cloned()
1013        }
1014    }
1015
1016    // --- membership drain state -------------------------------------------
1017
1018    #[test]
1019    fn begin_drain_marks_member_and_excludes_from_placement() {
1020        let mut members = membership(&["CN=node-a", "CN=node-b"]);
1021        assert!(members
1022            .member(&ident("CN=node-a"))
1023            .unwrap()
1024            .is_placement_eligible());
1025
1026        let changed = members.begin_drain(&ident("CN=node-a"));
1027        assert_eq!(changed, Some(true));
1028        assert!(members.member(&ident("CN=node-a")).unwrap().is_draining());
1029        // Idempotent: marking again reports no change.
1030        assert_eq!(members.begin_drain(&ident("CN=node-a")), Some(false));
1031        // A draining member is no longer a placement target.
1032        assert!(!members
1033            .member(&ident("CN=node-a"))
1034            .unwrap()
1035            .is_placement_eligible());
1036        let eligible: Vec<_> = members
1037            .placement_eligible_members()
1038            .map(|m| m.identity().clone())
1039            .collect();
1040        assert_eq!(eligible, vec![ident("CN=node-b")]);
1041        // A non-member cannot be drained.
1042        assert_eq!(members.begin_drain(&ident("CN=ghost")), None);
1043    }
1044
1045    // --- successful drain --------------------------------------------------
1046
1047    #[test]
1048    fn successful_drain_moves_all_ranges_then_allows_removal() {
1049        // node-a owns orders/1 and replicates a second range; both must move before
1050        // it can be removed.
1051        let mut members = membership(&["CN=node-a", "CN=node-b", "CN=node-c"]);
1052        let orders = collection("orders");
1053        let mut catalog = ShardOwnershipCatalog::new();
1054        // orders/1 owned by node-a, replicated by node-b and node-c.
1055        catalog
1056            .apply_update(RangeOwnership::establish(
1057                orders.clone(),
1058                RangeId::new(1),
1059                ShardKeyMode::Hash,
1060                RangeBounds::full(),
1061                ident("CN=node-a"),
1062                vec![ident("CN=node-b"), ident("CN=node-c")],
1063                PlacementMetadata::with_replication_factor(2),
1064            ))
1065            .unwrap();
1066        // events/1 owned by node-b, replicated by node-a (over-replicated: rf 1).
1067        let events = collection("events");
1068        catalog
1069            .apply_update(RangeOwnership::establish(
1070                events.clone(),
1071                RangeId::new(1),
1072                ShardKeyMode::Hash,
1073                RangeBounds::full(),
1074                ident("CN=node-b"),
1075                vec![ident("CN=node-a")],
1076                PlacementMetadata::with_replication_factor(1),
1077            ))
1078            .unwrap();
1079
1080        members.begin_drain(&ident("CN=node-a")).unwrap();
1081        let signals = FakeSignals::new(CommitWatermark::new(1, 10))
1082            .with_catch_up("CN=node-b", 1, 10)
1083            .with_catch_up("CN=node-c", 1, 10);
1084
1085        let outcome = run_drain(&ident("CN=node-a"), &members, &mut catalog, &signals);
1086        assert!(outcome.is_drained(), "every range moved off node-a");
1087        assert_eq!(outcome.handoffs.len(), 1);
1088        assert!(outcome.handoffs[0].is_ok());
1089        assert_eq!(outcome.evacuations.len(), 1);
1090
1091        // orders/1 is now owned by a caught-up replica (node-b, identity tie-break),
1092        // and node-a is fenced from public writes.
1093        let r1 = catalog.range(&orders, RangeId::new(1)).unwrap();
1094        assert_eq!(r1.owner(), &ident("CN=node-b"));
1095        assert!(r1.epoch().value() > 1, "epoch bumped to fence old owner");
1096        // events/1 no longer lists node-a as a replica.
1097        let r2 = catalog.range(&events, RangeId::new(1)).unwrap();
1098        assert!(!r2.replicas().contains(&ident("CN=node-a")));
1099
1100        // node-a now depends on no range: removal is allowed.
1101        let removed = commit_drain_removal(&ident("CN=node-a"), &mut members, &catalog)
1102            .expect("drained member is removable");
1103        assert_eq!(removed.identity(), &ident("CN=node-a"));
1104        assert!(!members.is_authorized(&ident("CN=node-a")));
1105
1106        // The fenced old owner is rejected if it still tries to write orders/1.
1107        let err = catalog
1108            .admit_public_write(
1109                &ident("CN=node-a"),
1110                &orders,
1111                b"k",
1112                OwnershipEpoch::initial(),
1113            )
1114            .unwrap_err();
1115        assert!(matches!(
1116            err,
1117            RangeWriteReject::NotOwner { .. } | RangeWriteReject::StaleEpoch { .. }
1118        ));
1119    }
1120
1121    // --- drain blocked by an unmoved range --------------------------------
1122
1123    #[test]
1124    fn drain_blocked_by_unmoved_range_refuses_removal() {
1125        // node-a owns orders/1 whose only replica node-b has NOT caught up to the
1126        // watermark, so there is no safe hand-off target. The range stays, and
1127        // removal is refused.
1128        let mut members = membership(&["CN=node-a", "CN=node-b"]);
1129        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
1130        members.begin_drain(&ident("CN=node-a")).unwrap();
1131        let signals =
1132            FakeSignals::new(CommitWatermark::new(2, 50)).with_catch_up("CN=node-b", 2, 49); // one LSN short
1133
1134        let outcome = run_drain(&ident("CN=node-a"), &members, &mut catalog, &signals);
1135        assert!(!outcome.is_drained());
1136        assert!(outcome.handoffs.is_empty());
1137        assert_eq!(outcome.blocked.len(), 1);
1138        assert_eq!(
1139            outcome.blocked[0].reason,
1140            DrainBlockReason::NoSafeHandoffTarget
1141        );
1142
1143        // Ownership is untouched — node-a still owns orders/1.
1144        let r1 = catalog.range(&orders, RangeId::new(1)).unwrap();
1145        assert_eq!(r1.owner(), &ident("CN=node-a"));
1146        assert_eq!(r1.epoch(), OwnershipEpoch::initial());
1147
1148        // Removal is refused while the range still depends on node-a.
1149        let err = commit_drain_removal(&ident("CN=node-a"), &mut members, &catalog).unwrap_err();
1150        match err {
1151            RemovalRejection::StillOwnsRanges { ranges, .. } => {
1152                assert_eq!(ranges, vec![(orders.clone(), RangeId::new(1))]);
1153            }
1154            other => panic!("expected StillOwnsRanges, got {other:?}"),
1155        }
1156        assert!(members.is_authorized(&ident("CN=node-a")), "still a member");
1157    }
1158
1159    #[test]
1160    fn drain_blocked_when_replica_evac_would_drop_below_rf() {
1161        // node-a replicates orders/1 (owner node-b) at exactly rf 2, and there is no
1162        // free member to host a replacement copy — evacuation is blocked.
1163        let mut members = membership(&["CN=node-a", "CN=node-b"]);
1164        let (mut catalog, _orders) = catalog_with_rf("CN=node-b", &["CN=node-a"], 2);
1165        members.begin_drain(&ident("CN=node-a")).unwrap();
1166        let signals = FakeSignals::new(CommitWatermark::new(1, 10));
1167
1168        let outcome = run_drain(&ident("CN=node-a"), &members, &mut catalog, &signals);
1169        assert_eq!(outcome.blocked.len(), 1);
1170        assert_eq!(
1171            outcome.blocked[0].reason,
1172            DrainBlockReason::NoReplacementReplica
1173        );
1174    }
1175
1176    #[test]
1177    fn replica_evac_assigns_replacement_to_preserve_rf() {
1178        // node-a replicates orders/1 (owner node-b) at rf 2; node-c is free to take
1179        // a replacement copy, so the evacuation moves the copy rather than blocking.
1180        let mut members = membership(&["CN=node-a", "CN=node-b", "CN=node-c"]);
1181        let (mut catalog, orders) = catalog_with_rf("CN=node-b", &["CN=node-a"], 2);
1182        members.begin_drain(&ident("CN=node-a")).unwrap();
1183        let signals = FakeSignals::new(CommitWatermark::new(1, 10));
1184
1185        let plan = plan_drain(&ident("CN=node-a"), &members, &catalog, &signals);
1186        assert_eq!(plan.steps.len(), 1);
1187        match &plan.steps[0] {
1188            DrainStep::Evacuate(evac) => {
1189                assert_eq!(evac.replacement, Some(ident("CN=node-c")));
1190            }
1191            other => panic!("expected Evacuate, got {other:?}"),
1192        }
1193
1194        run_drain(&ident("CN=node-a"), &members, &mut catalog, &signals);
1195        let r1 = catalog.range(&orders, RangeId::new(1)).unwrap();
1196        assert!(!r1.replicas().contains(&ident("CN=node-a")));
1197        assert!(r1.replicas().contains(&ident("CN=node-c")));
1198        // Owner unchanged and epoch not bumped — only the replica set moved.
1199        assert_eq!(r1.owner(), &ident("CN=node-b"));
1200        assert_eq!(r1.epoch(), OwnershipEpoch::initial());
1201    }
1202
1203    // --- no new placements to a draining member ---------------------------
1204
1205    #[test]
1206    fn draining_member_is_never_a_handoff_or_replacement_target() {
1207        // node-a (draining) owns orders/1 with replicas node-b (also draining) and
1208        // node-c. The hand-off must skip the draining node-b and choose node-c.
1209        let mut members = membership(&["CN=node-a", "CN=node-b", "CN=node-c"]);
1210        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b", "CN=node-c"]);
1211        members.begin_drain(&ident("CN=node-a")).unwrap();
1212        members.begin_drain(&ident("CN=node-b")).unwrap();
1213        let signals = FakeSignals::new(CommitWatermark::new(1, 10))
1214            .with_catch_up("CN=node-b", 1, 10)
1215            .with_catch_up("CN=node-c", 1, 10);
1216
1217        let plan = plan_drain(&ident("CN=node-a"), &members, &catalog, &signals);
1218        match &plan.steps[0] {
1219            DrainStep::Handoff(h) => assert_eq!(
1220                h.target,
1221                ident("CN=node-c"),
1222                "draining node-b is not a placement target"
1223            ),
1224            other => panic!("expected Handoff, got {other:?}"),
1225        }
1226
1227        run_drain(&ident("CN=node-a"), &members, &mut catalog, &signals);
1228        let r1 = catalog.range(&orders, RangeId::new(1)).unwrap();
1229        assert_eq!(r1.owner(), &ident("CN=node-c"));
1230    }
1231
1232    // --- force remove recovery --------------------------------------------
1233
1234    #[test]
1235    fn force_remove_promotes_surviving_replica_and_fences_dead_owner() {
1236        // node-a is dead; it owned orders/1 with a caught-up replica node-b.
1237        let mut members = membership(&["CN=node-a", "CN=node-b"]);
1238        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
1239        let signals =
1240            FakeSignals::new(CommitWatermark::new(1, 10)).with_catch_up("CN=node-b", 1, 10);
1241        let order = ForceRemoveOrder::new(
1242            ForceCapability::granted("ops:alice"),
1243            ident("CN=node-a"),
1244            "node-a hardware failure, unrecoverable",
1245        )
1246        .unwrap();
1247
1248        let result = run_force_remove(&order, &mut members, &mut catalog, &signals);
1249        assert_eq!(result.promotions.len(), 1);
1250        assert_eq!(result.promotions[0].new_owner, ident("CN=node-b"));
1251        assert!(result.promotions[0].fenced_old_owner());
1252        assert!(result.unrecoverable.is_empty());
1253        // The dead member is removed.
1254        assert!(result.removed.is_some());
1255        assert!(!members.is_authorized(&ident("CN=node-a")));
1256
1257        // node-b owns orders/1 at a bumped epoch; node-a is fenced.
1258        let r1 = catalog.range(&orders, RangeId::new(1)).unwrap();
1259        assert_eq!(r1.owner(), &ident("CN=node-b"));
1260        assert_eq!(r1.role_of(&ident("CN=node-b")), RangeRole::Owner);
1261        assert!(r1.epoch().value() > 1);
1262
1263        // The audit covers the watermark — no write loss here.
1264        assert!(!result.audit.has_potential_write_loss());
1265        let line = result.audit.to_string();
1266        assert!(line.contains("FORCE remove"));
1267        assert!(line.contains("ops:alice"));
1268        assert!(line.contains("hardware failure"));
1269    }
1270
1271    #[test]
1272    fn force_remove_proceeds_with_behind_replica_and_records_write_loss() {
1273        // node-a is dead; its only replica node-b is BEHIND the watermark. Ordinary
1274        // failover would block, but a forced removal promotes it anyway and records
1275        // the potential committed-write loss as audit evidence.
1276        let mut members = membership(&["CN=node-a", "CN=node-b"]);
1277        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
1278        let signals =
1279            FakeSignals::new(CommitWatermark::new(2, 50)).with_catch_up("CN=node-b", 2, 49); // one LSN short
1280        let order = ForceRemoveOrder::new(
1281            ForceCapability::granted("ops:bob"),
1282            ident("CN=node-a"),
1283            "node-a disk destroyed",
1284        )
1285        .unwrap();
1286
1287        let result = run_force_remove(&order, &mut members, &mut catalog, &signals);
1288        assert_eq!(result.promotions.len(), 1);
1289        assert!(!result.audit.promotions[0].3, "does not cover watermark");
1290        assert!(result.audit.has_potential_write_loss());
1291        assert!(result.audit.to_string().contains("POTENTIAL WRITE LOSS"));
1292
1293        // It still moved authority and fenced the dead owner.
1294        let r1 = catalog.range(&orders, RangeId::new(1)).unwrap();
1295        assert_eq!(r1.owner(), &ident("CN=node-b"));
1296    }
1297
1298    #[test]
1299    fn force_remove_records_unrecoverable_range_with_no_replica() {
1300        // node-a is dead and owned orders/1 with NO replica — the range is
1301        // unrecoverable, recorded in the audit, but node-a is still removed.
1302        let mut members = membership(&["CN=node-a", "CN=node-b"]);
1303        let (mut catalog, orders) = catalog_with("CN=node-a", &[]);
1304        let signals = FakeSignals::new(CommitWatermark::new(1, 10));
1305        let order = ForceRemoveOrder::new(
1306            ForceCapability::granted("ops:carol"),
1307            ident("CN=node-a"),
1308            "node-a lost, no replicas existed",
1309        )
1310        .unwrap();
1311
1312        let result = run_force_remove(&order, &mut members, &mut catalog, &signals);
1313        assert!(result.promotions.is_empty());
1314        assert_eq!(result.unrecoverable.len(), 1);
1315        assert_eq!(result.unrecoverable[0].range_id, RangeId::new(1));
1316        assert!(result.audit.has_potential_write_loss());
1317        assert!(result.removed.is_some());
1318        assert!(!members.is_authorized(&ident("CN=node-a")));
1319
1320        // The orphaned range still names node-a (it is unrecoverable, not silently
1321        // reassigned).
1322        let r1 = catalog.range(&orders, RangeId::new(1)).unwrap();
1323        assert_eq!(r1.owner(), &ident("CN=node-a"));
1324    }
1325
1326    #[test]
1327    fn force_remove_drops_dead_members_stale_replica_copies() {
1328        // node-a is dead and was only a replica of orders/1 (owner node-b). Force
1329        // remove drops its stale copy and removes it; the live owner is untouched.
1330        let mut members = membership(&["CN=node-a", "CN=node-b"]);
1331        let (mut catalog, orders) = catalog_with("CN=node-b", &["CN=node-a"]);
1332        let signals = FakeSignals::new(CommitWatermark::new(1, 10));
1333        let order = ForceRemoveOrder::new(
1334            ForceCapability::granted("ops:dan"),
1335            ident("CN=node-a"),
1336            "node-a gone",
1337        )
1338        .unwrap();
1339
1340        let result = run_force_remove(&order, &mut members, &mut catalog, &signals);
1341        assert!(result.promotions.is_empty());
1342        assert_eq!(result.audit.replica_copies_dropped, 1);
1343        let r1 = catalog.range(&orders, RangeId::new(1)).unwrap();
1344        assert_eq!(r1.owner(), &ident("CN=node-b"));
1345        assert!(!r1.replicas().contains(&ident("CN=node-a")));
1346        assert_eq!(r1.epoch(), OwnershipEpoch::initial(), "owner unchanged");
1347    }
1348
1349    #[test]
1350    fn force_remove_order_requires_explicit_reason() {
1351        let err = ForceRemoveOrder::new(
1352            ForceCapability::granted("ops:eve"),
1353            ident("CN=node-a"),
1354            "   ",
1355        )
1356        .unwrap_err();
1357        assert_eq!(err, ForceRemoveOrderError::EmptyReason);
1358    }
1359
1360    // --- audit / status reporting -----------------------------------------
1361
1362    #[test]
1363    fn drain_status_reports_dependencies_and_removability() {
1364        let mut members = membership(&["CN=node-a", "CN=node-b", "CN=node-c"]);
1365        let (mut catalog, _orders) = catalog_with("CN=node-a", &["CN=node-b", "CN=node-c"]);
1366        let signals = FakeSignals::new(CommitWatermark::new(1, 10))
1367            .with_catch_up("CN=node-b", 1, 10)
1368            .with_catch_up("CN=node-c", 1, 10);
1369
1370        // Before draining: a member, owns one range, not draining, not removable.
1371        let status = drain_status(&ident("CN=node-a"), &members, &catalog, &signals);
1372        assert!(status.is_member);
1373        assert!(!status.is_draining);
1374        assert_eq!(status.owned_ranges.len(), 1);
1375        assert!(status.replicated_ranges.is_empty());
1376        assert_eq!(status.planned_steps, 1);
1377        assert!(!status.removable);
1378
1379        // After marking draining and running the drain: no dependencies, removable.
1380        members.begin_drain(&ident("CN=node-a")).unwrap();
1381        run_drain(&ident("CN=node-a"), &members, &mut catalog, &signals);
1382        let status = drain_status(&ident("CN=node-a"), &members, &catalog, &signals);
1383        assert!(status.is_draining);
1384        assert!(status.owned_ranges.is_empty());
1385        assert!(status.replicated_ranges.is_empty());
1386        assert!(status.removable);
1387    }
1388
1389    #[test]
1390    fn removing_a_non_member_or_non_draining_member_is_refused() {
1391        let mut members = membership(&["CN=node-a"]);
1392        let catalog = ShardOwnershipCatalog::new();
1393
1394        // Not a member.
1395        let err = commit_drain_removal(&ident("CN=ghost"), &mut members, &catalog).unwrap_err();
1396        assert!(matches!(err, RemovalRejection::NotAMember { .. }));
1397
1398        // A member that was never marked draining cannot be removed via the planned
1399        // path (force-remove is the path for that).
1400        let err = commit_drain_removal(&ident("CN=node-a"), &mut members, &catalog).unwrap_err();
1401        assert!(matches!(err, RemovalRejection::NotDraining { .. }));
1402    }
1403}