use super::identity::NodeIdentity;
use super::membership::{ClusterMember, MembershipCatalog};
use super::ownership::{CollectionId, RangeId, RangeOwnership, ShardOwnershipCatalog};
use super::ownership_transition::{
run_transition, CatchUpEvidence, CommitWatermark, TransitionError, TransitionKind,
TransitionOutcome, TransitionRequest,
};
use super::supervisor::ClusterSignals;
/// One planned action for moving a single range off a draining member.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DrainStep {
/// Hand ownership of a range the member currently owns to another node.
Handoff(OwnedHandoff),
/// Take the member out of the replica list of a range it does not own.
Evacuate(ReplicaEvacuation),
}
/// A planned ownership hand-off for a range owned by the draining member.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OwnedHandoff {
/// Collection the range belongs to.
pub collection: CollectionId,
/// Identifier of the range within the collection.
pub range_id: RangeId,
/// Replica selected to become the new owner.
pub target: NodeIdentity,
/// Transition request that `run_drain` passes to `run_transition`.
pub request: TransitionRequest,
}
/// A planned removal of the draining member from a range's replica list.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReplicaEvacuation {
/// Collection the range belongs to.
pub collection: CollectionId,
/// Identifier of the range within the collection.
pub range_id: RangeId,
/// Node added to the replica list in place of the drained member, if one
/// was needed to keep the replication factor satisfied.
pub replacement: Option<NodeIdentity>,
/// Ownership record to apply: the range with its updated replica list.
pub next: RangeOwnership,
}
/// Why a range could not be moved off a draining member.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DrainBlockReason {
    /// No replica qualified as a hand-off target for an owned range.
    NoSafeHandoffTarget,
    /// No member qualified to host a replacement replica.
    NoReplacementReplica,
}

impl DrainBlockReason {
    /// Returns a short human-readable description of the reason.
    fn label(self) -> &'static str {
        match self {
            Self::NoSafeHandoffTarget => "no caught-up replica is a safe hand-off target",
            Self::NoReplacementReplica => "no eligible member can host a replacement replica",
        }
    }
}
/// A range for which no drain step could be planned, and why.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DrainBlock {
/// Collection the range belongs to.
pub collection: CollectionId,
/// Identifier of the range within the collection.
pub range_id: RangeId,
/// What prevented a step from being planned.
pub reason: DrainBlockReason,
}
/// The outcome of planning a drain: steps to execute plus ranges that block.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DrainPlan {
    /// Member the plan was computed for.
    pub member: NodeIdentity,
    /// Hand-offs and evacuations that can be carried out.
    pub steps: Vec<DrainStep>,
    /// Ranges for which no step could be planned.
    pub blocked: Vec<DrainBlock>,
}

impl DrainPlan {
    /// True when the plan has neither steps nor blocked ranges.
    pub fn is_empty(&self) -> bool {
        self.is_complete() && self.steps.is_empty()
    }

    /// True when no range is blocked.
    pub fn is_complete(&self) -> bool {
        self.blocked.is_empty()
    }
}
pub fn plan_drain(
member: &NodeIdentity,
membership: &MembershipCatalog,
ownership: &ShardOwnershipCatalog,
signals: &impl ClusterSignals,
) -> DrainPlan {
let mut steps = Vec::new();
let mut blocked = Vec::new();
for range in ownership.entries() {
let collection = range.collection().clone();
let range_id = range.range_id();
if range.owner() == member {
let watermark = signals.commit_watermark(&collection, range_id);
match select_handoff_target(range, member, membership, watermark, signals) {
Some((target, evidence)) => {
let request = TransitionRequest::new(
TransitionKind::Handoff,
collection.clone(),
range_id,
member.clone(),
range.epoch(),
range.version(),
target.clone(),
watermark,
)
.with_evidence(evidence)
.with_replicas(without(range.replicas(), &target));
steps.push(DrainStep::Handoff(OwnedHandoff {
collection,
range_id,
target,
request,
}));
}
None => blocked.push(DrainBlock {
collection,
range_id,
reason: DrainBlockReason::NoSafeHandoffTarget,
}),
}
} else if range.replicas().contains(member) {
let remaining = without(range.replicas(), member);
let copies_after = 1 + remaining.len();
let required = range.placement().replication_factor();
if copies_after >= required {
let next = range.update_replicas(remaining);
steps.push(DrainStep::Evacuate(ReplicaEvacuation {
collection,
range_id,
replacement: None,
next,
}));
} else if let Some(replacement) = select_replacement_replica(range, member, membership)
{
let mut replicas = remaining;
replicas.push(replacement.clone());
let next = range.update_replicas(replicas);
steps.push(DrainStep::Evacuate(ReplicaEvacuation {
collection,
range_id,
replacement: Some(replacement),
next,
}));
} else {
blocked.push(DrainBlock {
collection,
range_id,
reason: DrainBlockReason::NoReplacementReplica,
});
}
}
}
DrainPlan {
member: member.clone(),
steps,
blocked,
}
}
/// What happened when a drain plan was executed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DrainOutcome {
    /// Member that was being drained.
    pub member: NodeIdentity,
    /// Result of each attempted ownership hand-off, in plan order.
    pub handoffs: Vec<Result<TransitionOutcome, TransitionError>>,
    /// Evacuations whose ownership update was applied.
    pub evacuations: Vec<ReplicaEvacuation>,
    /// Ranges the plan could not produce a step for.
    pub blocked: Vec<DrainBlock>,
}

impl DrainOutcome {
    /// True when nothing was blocked and no hand-off failed.
    pub fn is_drained(&self) -> bool {
        let any_handoff_failed = self.handoffs.iter().any(Result::is_err);
        !any_handoff_failed && self.blocked.is_empty()
    }
}
pub fn run_drain(
member: &NodeIdentity,
membership: &MembershipCatalog,
ownership: &mut ShardOwnershipCatalog,
signals: &impl ClusterSignals,
) -> DrainOutcome {
let plan = plan_drain(member, membership, ownership, signals);
let mut handoffs = Vec::new();
let mut evacuations = Vec::new();
for step in plan.steps {
match step {
DrainStep::Handoff(handoff) => {
handoffs.push(run_transition(ownership, &handoff.request));
}
DrainStep::Evacuate(evac) => {
if ownership.apply_update(evac.next.clone()).is_ok() {
evacuations.push(evac);
}
}
}
}
DrainOutcome {
member: member.clone(),
handoffs,
evacuations,
blocked: plan.blocked,
}
}
/// Why a planned (drain-based) removal of a member was refused.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RemovalRejection {
    /// The identity is not present in the membership catalog.
    NotAMember { member: NodeIdentity },
    /// The member exists but has not been marked as draining.
    NotDraining { member: NodeIdentity },
    /// The member is still the owner of the listed ranges.
    StillOwnsRanges {
        member: NodeIdentity,
        ranges: Vec<(CollectionId, RangeId)>,
    },
    /// The member is still a replica for the listed ranges.
    StillReplicaFor {
        member: NodeIdentity,
        ranges: Vec<(CollectionId, RangeId)>,
    },
}

impl std::fmt::Display for RemovalRejection {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            RemovalRejection::NotAMember { member } => {
                write!(f, "{member} is not a cluster member")
            }
            RemovalRejection::NotDraining { member } => {
                write!(f, "{member} must be marked draining before planned removal")
            }
            RemovalRejection::StillOwnsRanges { member, ranges } => {
                let count = ranges.len();
                write!(f, "{member} cannot be removed: still owns {} range(s)", count)
            }
            RemovalRejection::StillReplicaFor { member, ranges } => {
                let count = ranges.len();
                write!(
                    f,
                    "{member} cannot be removed: still replicates {} range(s)",
                    count
                )
            }
        }
    }
}

impl std::error::Error for RemovalRejection {}
/// Removes a fully drained member from the membership catalog.
///
/// # Errors
///
/// Returns a [`RemovalRejection`] when `member` is unknown, is not marked as
/// draining, or still owns or replicates any range in `ownership`. The
/// checks run in that order and the first failure is returned.
///
/// # Panics
///
/// Panics if the catalog's `remove` returns `None` for a member that was
/// found by `member` a moment earlier.
pub fn commit_drain_removal(
    member: &NodeIdentity,
    membership: &mut MembershipCatalog,
    ownership: &ShardOwnershipCatalog,
) -> Result<ClusterMember, RemovalRejection> {
    let Some(entry) = membership.member(member) else {
        return Err(RemovalRejection::NotAMember {
            member: member.clone(),
        });
    };
    if !entry.is_draining() {
        return Err(RemovalRejection::NotDraining {
            member: member.clone(),
        });
    }

    let (owned, replicated) = range_dependencies(member, ownership);
    if !owned.is_empty() {
        return Err(RemovalRejection::StillOwnsRanges {
            member: member.clone(),
            ranges: owned,
        });
    }
    if !replicated.is_empty() {
        return Err(RemovalRejection::StillReplicaFor {
            member: member.clone(),
            ranges: replicated,
        });
    }

    let removed = membership
        .remove(member)
        .expect("membership presence checked above");
    Ok(removed)
}
/// Token naming who is issuing a forced removal.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ForceCapability {
    /// Name of the holder, as supplied to [`ForceCapability::granted`].
    holder: String,
}

impl ForceCapability {
    /// Creates a capability attributed to `holder`.
    pub fn granted(holder: impl Into<String>) -> Self {
        let holder: String = holder.into();
        ForceCapability { holder }
    }

    /// Returns the holder name this capability was created with.
    pub fn holder(&self) -> &str {
        self.holder.as_str()
    }
}
/// Why a [`ForceRemoveOrder`] could not be constructed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ForceRemoveOrderError {
    /// The supplied reason was empty or whitespace-only.
    EmptyReason,
}

impl std::fmt::Display for ForceRemoveOrderError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            ForceRemoveOrderError::EmptyReason => {
                "a forced removal requires an explicit operator reason"
            }
        };
        f.write_str(message)
    }
}

impl std::error::Error for ForceRemoveOrderError {}
/// An operator's instruction to forcibly remove a member.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ForceRemoveOrder {
    /// Capability identifying who issued the order.
    capability: ForceCapability,
    /// Member to remove.
    member: NodeIdentity,
    /// Operator-supplied justification; never blank.
    reason: String,
}

impl ForceRemoveOrder {
    /// Builds an order to force-remove `member`.
    ///
    /// # Errors
    ///
    /// Returns [`ForceRemoveOrderError::EmptyReason`] when `reason` is empty
    /// or consists only of whitespace. The reason is stored as given, untrimmed.
    pub fn new(
        capability: ForceCapability,
        member: NodeIdentity,
        reason: impl Into<String>,
    ) -> Result<Self, ForceRemoveOrderError> {
        let reason: String = reason.into();
        if reason.trim().is_empty() {
            Err(ForceRemoveOrderError::EmptyReason)
        } else {
            Ok(ForceRemoveOrder {
                capability,
                member,
                reason,
            })
        }
    }

    /// Member this order targets.
    pub fn member(&self) -> &NodeIdentity {
        &self.member
    }

    /// Operator-supplied justification.
    pub fn reason(&self) -> &str {
        self.reason.as_str()
    }

    /// Capability under which the order was issued.
    pub fn capability(&self) -> &ForceCapability {
        &self.capability
    }
}
/// A planned forced promotion of a surviving replica to owner of a range
/// whose current owner is being force-removed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ForcedPromotion {
/// Collection the range belongs to.
pub collection: CollectionId,
/// Identifier of the range within the collection.
pub range_id: RangeId,
/// The owner being force-removed.
pub dead_owner: NodeIdentity,
/// Replica selected to take over ownership.
pub new_owner: NodeIdentity,
/// Whether the new owner's catch-up evidence covered the commit watermark
/// read at planning time; `false` also when no evidence was available.
pub covers_watermark: bool,
/// Catch-up evidence reported for the new owner, if any.
pub evidence: Option<CatchUpEvidence>,
/// Ownership record to apply: the range transferred to `new_owner`.
pub next: RangeOwnership,
}
/// A range owned by the force-removed member that could not be given a new owner.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ForcedBlock {
/// Collection the range belongs to.
pub collection: CollectionId,
/// Identifier of the range within the collection.
pub range_id: RangeId,
/// The owner being force-removed.
pub dead_owner: NodeIdentity,
}
/// Everything a forced removal intends to do, computed without mutating state.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ForceRemovePlan {
/// Member to be removed.
pub member: NodeIdentity,
/// Operator-supplied reason copied from the order.
pub reason: String,
/// Holder name copied from the order's capability.
pub capability_holder: String,
/// Owned ranges for which a replacement owner was selected.
pub promotions: Vec<ForcedPromotion>,
/// Updated ownership records that drop the member from replica lists.
pub replica_drops: Vec<RangeOwnership>,
/// Owned ranges for which no replacement owner could be selected.
pub unrecoverable: Vec<ForcedBlock>,
}
/// Computes, without mutating anything, what a forced removal of the
/// order's member would do to every range that involves it.
///
/// * Ranges it owns get a [`ForcedPromotion`] when `select_force_target`
///   finds a surviving replica, otherwise a [`ForcedBlock`].
/// * Ranges that only list it as a replica get an updated record without it.
pub fn plan_force_remove(
    order: &ForceRemoveOrder,
    membership: &MembershipCatalog,
    ownership: &ShardOwnershipCatalog,
    signals: &impl ClusterSignals,
) -> ForceRemovePlan {
    let member = order.member();
    let mut promotions = Vec::new();
    let mut replica_drops = Vec::new();
    let mut unrecoverable = Vec::new();

    for range in ownership.entries() {
        if range.owner() != member {
            // Not the owner: at most a stale replica copy to drop.
            if range.replicas().contains(member) {
                let survivors = without(range.replicas(), member);
                replica_drops.push(range.update_replicas(survivors));
            }
            continue;
        }

        let collection = range.collection().clone();
        let range_id = range.range_id();
        let watermark = signals.commit_watermark(&collection, range_id);
        let Some((new_owner, covers_watermark, evidence)) =
            select_force_target(range, member, membership, watermark, signals)
        else {
            unrecoverable.push(ForcedBlock {
                collection,
                range_id,
                dead_owner: member.clone(),
            });
            continue;
        };

        // The promoted node leaves the replica list.
        let followers = without(range.replicas(), &new_owner);
        let next = range.transfer_to(new_owner.clone(), followers);
        promotions.push(ForcedPromotion {
            collection,
            range_id,
            dead_owner: member.clone(),
            new_owner,
            covers_watermark,
            evidence,
            next,
        });
    }

    ForceRemovePlan {
        member: member.clone(),
        reason: order.reason().to_string(),
        capability_holder: order.capability().holder().to_string(),
        promotions,
        replica_drops,
        unrecoverable,
    }
}
/// Audit record describing a forced removal that was carried out.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ForceRemoveAudit {
    /// Member that was force-removed.
    pub member: NodeIdentity,
    /// Holder of the capability under which the removal ran.
    pub capability_holder: String,
    /// Operator-supplied reason.
    pub reason: String,
    /// Applied promotions: collection, range, new owner, and whether the new
    /// owner covered the commit watermark.
    pub promotions: Vec<(CollectionId, RangeId, NodeIdentity, bool)>,
    /// Ranges that could not be given a new owner.
    pub unrecoverable: Vec<(CollectionId, RangeId)>,
    /// Number of replica-list updates that dropped the member.
    pub replica_copies_dropped: usize,
}

impl ForceRemoveAudit {
    /// True when any promotion did not cover the watermark or any range was
    /// left unrecoverable.
    pub fn has_potential_write_loss(&self) -> bool {
        let promoted_behind_watermark = self
            .promotions
            .iter()
            .any(|(_, _, _, covers_watermark)| !*covers_watermark);
        promoted_behind_watermark || !self.unrecoverable.is_empty()
    }
}

impl std::fmt::Display for ForceRemoveAudit {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let promoted = self.promotions.len();
        let lost = self.unrecoverable.len();
        write!(
            f,
            "FORCE remove {} by {} (reason: {}): {} range(s) force-promoted, {} unrecoverable, {} stale replica copies dropped",
            self.member,
            self.capability_holder,
            self.reason,
            promoted,
            lost,
            self.replica_copies_dropped,
        )?;
        if !self.has_potential_write_loss() {
            return Ok(());
        }
        f.write_str("; POTENTIAL WRITE LOSS")
    }
}
/// Result of executing a forced removal.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ForceRemoveResult {
/// Audit record summarising what was done.
pub audit: ForceRemoveAudit,
/// Transition outcomes for the promotions that were applied.
pub promotions: Vec<TransitionOutcome>,
/// Ranges that were not given a new owner.
pub unrecoverable: Vec<ForcedBlock>,
/// The membership entry that was removed, if the member was present.
pub removed: Option<ClusterMember>,
}
pub fn run_force_remove(
order: &ForceRemoveOrder,
membership: &mut MembershipCatalog,
ownership: &mut ShardOwnershipCatalog,
signals: &impl ClusterSignals,
) -> ForceRemoveResult {
let plan = plan_force_remove(order, membership, ownership, signals);
let mut promotion_outcomes = Vec::new();
let mut audit_promotions = Vec::new();
for promotion in &plan.promotions {
let previous_owner = promotion.dead_owner.clone();
let new_epoch = promotion.next.epoch();
let previous_epoch = ownership
.range(&promotion.collection, promotion.range_id)
.map(RangeOwnership::epoch)
.unwrap_or(new_epoch);
let new_version = promotion.next.version();
let previous_version = ownership
.range(&promotion.collection, promotion.range_id)
.map(RangeOwnership::version)
.unwrap_or(new_version);
let watermark = signals.commit_watermark(&promotion.collection, promotion.range_id);
if ownership.apply_update(promotion.next.clone()).is_ok() {
audit_promotions.push((
promotion.collection.clone(),
promotion.range_id,
promotion.new_owner.clone(),
promotion.covers_watermark,
));
promotion_outcomes.push(TransitionOutcome {
kind: TransitionKind::Promote,
collection: promotion.collection.clone(),
range_id: promotion.range_id,
previous_owner,
new_owner: promotion.new_owner.clone(),
previous_epoch,
new_epoch,
previous_version,
new_version,
watermark,
});
}
}
let mut replica_copies_dropped = 0;
for drop in &plan.replica_drops {
if ownership.apply_update(drop.clone()).is_ok() {
replica_copies_dropped += 1;
}
}
let removed = membership.remove(order.member());
let audit = ForceRemoveAudit {
member: order.member().clone(),
capability_holder: plan.capability_holder,
reason: plan.reason,
promotions: audit_promotions,
unrecoverable: plan
.unrecoverable
.iter()
.map(|b| (b.collection.clone(), b.range_id))
.collect(),
replica_copies_dropped,
};
ForceRemoveResult {
audit,
promotions: promotion_outcomes,
unrecoverable: plan.unrecoverable,
removed,
}
}
/// Read-only snapshot of how far a member is from being removable.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DrainStatus {
/// Member the status describes.
pub member: NodeIdentity,
/// Whether the member is present in the membership catalog.
pub is_member: bool,
/// Whether the member is present and marked as draining.
pub is_draining: bool,
/// Ranges the member currently owns.
pub owned_ranges: Vec<(CollectionId, RangeId)>,
/// Ranges that list the member as a replica (and that it does not own).
pub replicated_ranges: Vec<(CollectionId, RangeId)>,
/// Number of steps a drain plan computed now would contain.
pub planned_steps: usize,
/// Ranges a drain plan computed now could not produce a step for.
pub blocked: Vec<DrainBlock>,
/// True when the member is draining and no range depends on it.
pub removable: bool,
}
pub fn drain_status(
member: &NodeIdentity,
membership: &MembershipCatalog,
ownership: &ShardOwnershipCatalog,
signals: &impl ClusterSignals,
) -> DrainStatus {
let member_entry = membership.member(member);
let is_member = member_entry.is_some();
let is_draining = member_entry.is_some_and(ClusterMember::is_draining);
let (owned_ranges, replicated_ranges) = range_dependencies(member, ownership);
let plan = plan_drain(member, membership, ownership, signals);
let removable = is_draining && owned_ranges.is_empty() && replicated_ranges.is_empty();
DrainStatus {
member: member.clone(),
is_member,
is_draining,
owned_ranges,
replicated_ranges,
planned_steps: plan.steps.len(),
blocked: plan.blocked,
removable,
}
}
/// Lists the ranges `member` owns and the ranges it only replicates.
///
/// Returns `(owned, replicated)`, each in catalog iteration order. A range
/// the member owns is never also listed as replicated.
fn range_dependencies(
    member: &NodeIdentity,
    ownership: &ShardOwnershipCatalog,
) -> (Vec<(CollectionId, RangeId)>, Vec<(CollectionId, RangeId)>) {
    let mut owned: Vec<(CollectionId, RangeId)> = Vec::new();
    let mut replicated: Vec<(CollectionId, RangeId)> = Vec::new();
    for range in ownership.entries() {
        let bucket = if range.owner() == member {
            &mut owned
        } else if range.replicas().contains(member) {
            &mut replicated
        } else {
            continue;
        };
        bucket.push((range.collection().clone(), range.range_id()));
    }
    (owned, replicated)
}
/// Returns a copy of `replicas` with every occurrence of `node` removed,
/// keeping the original order.
fn without(replicas: &[NodeIdentity], node: &NodeIdentity) -> Vec<NodeIdentity> {
    let mut kept = Vec::new();
    for replica in replicas {
        if replica != node {
            kept.push(replica.clone());
        }
    }
    kept
}
/// Picks the replica that should take over ownership during a drain.
///
/// A replica qualifies when it is not `member`, is a placement-eligible
/// cluster member, and has catch-up evidence that covers `watermark`.
/// Among those, the one with the highest `(applied_term, applied_lsn)` wins;
/// ties go to the smaller identity. Returns `None` when nobody qualifies.
fn select_handoff_target(
    range: &RangeOwnership,
    member: &NodeIdentity,
    membership: &MembershipCatalog,
    watermark: CommitWatermark,
    signals: &impl ClusterSignals,
) -> Option<(NodeIdentity, CatchUpEvidence)> {
    let mut winner: Option<(NodeIdentity, CatchUpEvidence)> = None;
    for node in range.replicas() {
        let placeable = node != member
            && membership
                .member(node)
                .is_some_and(ClusterMember::is_placement_eligible);
        if !placeable {
            continue;
        }
        let caught_up = signals
            .catch_up(range.collection(), range.range_id(), node)
            .filter(|evidence| evidence.covers(watermark));
        let Some(evidence) = caught_up else {
            continue;
        };

        let progress = (evidence.applied_term, evidence.applied_lsn);
        let replaces_winner = match &winner {
            Some((held_id, held_evidence)) => {
                let held = (held_evidence.applied_term, held_evidence.applied_lsn);
                progress > held || (progress == held && node < held_id)
            }
            None => true,
        };
        if replaces_winner {
            winner = Some((node.clone(), evidence));
        }
    }
    winner
}
/// Picks the replica to promote when the owner is being force-removed.
///
/// Unlike a drain hand-off, a replica qualifies even without catch-up
/// evidence or when its evidence does not cover `watermark`; it only has to
/// differ from `member` and be placement-eligible. Candidates are ranked by
/// (covers watermark, `(applied_term, applied_lsn)`), with missing evidence
/// treated as `(false, (0, 0))`; ties go to the smaller identity.
///
/// Returns the chosen node, whether it covers the watermark, and its evidence.
fn select_force_target(
    range: &RangeOwnership,
    member: &NodeIdentity,
    membership: &MembershipCatalog,
    watermark: CommitWatermark,
    signals: &impl ClusterSignals,
) -> Option<(NodeIdentity, bool, Option<CatchUpEvidence>)> {
    type Rank = (bool, (u64, u64));
    let mut winner: Option<(Rank, NodeIdentity, Option<CatchUpEvidence>)> = None;
    for node in range.replicas() {
        let placeable = node != member
            && membership
                .member(node)
                .is_some_and(ClusterMember::is_placement_eligible);
        if !placeable {
            continue;
        }

        let evidence = signals.catch_up(range.collection(), range.range_id(), node);
        let rank: Rank = match &evidence {
            Some(ev) => (ev.covers(watermark), (ev.applied_term, ev.applied_lsn)),
            None => (false, (0, 0)),
        };
        let replaces_winner = match &winner {
            Some((held_rank, held_id, _)) => {
                rank > *held_rank || (rank == *held_rank && node < held_id)
            }
            None => true,
        };
        if replaces_winner {
            winner = Some((rank, node.clone(), evidence));
        }
    }
    winner.map(|((covers, _), id, evidence)| (id, covers, evidence))
}
fn select_replacement_replica(
range: &RangeOwnership,
member: &NodeIdentity,
membership: &MembershipCatalog,
) -> Option<NodeIdentity> {
membership
.placement_eligible_members()
.map(ClusterMember::identity)
.find(|id| *id != member && range.owner() != *id && !range.replicas().contains(id))
.cloned()
}
// Unit tests for drain planning/execution, planned removal, and forced removal.
#[cfg(test)]
mod tests {
use super::*;
use crate::cluster::membership::{ClusterId, MemberKind};
use crate::cluster::ownership::{
OwnershipEpoch, PlacementMetadata, RangeBounds, RangeRole, RangeWriteReject, ShardKeyMode,
};
use std::collections::HashMap;
// Builds a node identity from a certificate subject; panics if it is rejected.
fn ident(cn: &str) -> NodeIdentity {
NodeIdentity::from_certificate_subject(cn).unwrap()
}
// Builds a collection id; panics if the name is rejected.
fn collection(name: &str) -> CollectionId {
CollectionId::new(name).unwrap()
}
// A data-kind member created via `joined_empty`.
fn data_member(cn: &str) -> ClusterMember {
ClusterMember::joined_empty(ident(cn), MemberKind::Data)
}
// Membership catalog for "cluster-x" containing the given data members.
fn membership(members: &[&str]) -> MembershipCatalog {
MembershipCatalog::new(
ClusterId::new("cluster-x").unwrap(),
members.iter().map(|m| data_member(m)),
)
}
// Catalog with a single full-range "orders" range (id 1) with the given
// owner, replicas, and replication factor.
fn catalog_with_rf(
owner: &str,
replicas: &[&str],
rf: usize,
) -> (ShardOwnershipCatalog, CollectionId) {
let orders = collection("orders");
let mut catalog = ShardOwnershipCatalog::new();
catalog
.apply_update(RangeOwnership::establish(
orders.clone(),
RangeId::new(1),
ShardKeyMode::Hash,
RangeBounds::full(),
ident(owner),
replicas.iter().map(|r| ident(r)).collect::<Vec<_>>(),
PlacementMetadata::with_replication_factor(rf),
))
.unwrap();
(catalog, orders)
}
// Same as `catalog_with_rf` with a replication factor of 3.
fn catalog_with(owner: &str, replicas: &[&str]) -> (ShardOwnershipCatalog, CollectionId) {
catalog_with_rf(owner, replicas, 3)
}
// Signal source returning one fixed watermark for every range and
// per-node catch-up evidence from a map.
struct FakeSignals {
watermark: CommitWatermark,
catch_up: HashMap<NodeIdentity, CatchUpEvidence>,
}
impl FakeSignals {
fn new(watermark: CommitWatermark) -> Self {
Self {
watermark,
catch_up: HashMap::new(),
}
}
// Registers catch-up evidence for `cn` at the given term/LSN.
fn with_catch_up(mut self, cn: &str, applied_term: u64, applied_lsn: u64) -> Self {
self.catch_up.insert(
ident(cn),
CatchUpEvidence::new(ident(cn), applied_term, applied_lsn),
);
self
}
}
impl ClusterSignals for FakeSignals {
fn member_signals(
&self,
_member: &NodeIdentity,
) -> crate::cluster::supervisor::MemberSignals {
crate::cluster::supervisor::MemberSignals::healthy()
}
fn commit_watermark(
&self,
_collection: &CollectionId,
_range_id: RangeId,
) -> CommitWatermark {
self.watermark
}
fn catch_up(
&self,
_collection: &CollectionId,
_range_id: RangeId,
candidate: &NodeIdentity,
) -> Option<CatchUpEvidence> {
self.catch_up.get(candidate).cloned()
}
}
// Marking a member as draining is reported once, is idempotent, and removes
// it from the placement-eligible set.
#[test]
fn begin_drain_marks_member_and_excludes_from_placement() {
let mut members = membership(&["CN=node-a", "CN=node-b"]);
assert!(members
.member(&ident("CN=node-a"))
.unwrap()
.is_placement_eligible());
let changed = members.begin_drain(&ident("CN=node-a"));
assert_eq!(changed, Some(true));
assert!(members.member(&ident("CN=node-a")).unwrap().is_draining());
assert_eq!(members.begin_drain(&ident("CN=node-a")), Some(false));
assert!(!members
.member(&ident("CN=node-a"))
.unwrap()
.is_placement_eligible());
let eligible: Vec<_> = members
.placement_eligible_members()
.map(|m| m.identity().clone())
.collect();
assert_eq!(eligible, vec![ident("CN=node-b")]);
assert_eq!(members.begin_drain(&ident("CN=ghost")), None);
}
// One owned range is handed off and one replica slot is evacuated; the
// member can then be removed and its writes are rejected.
#[test]
fn successful_drain_moves_all_ranges_then_allows_removal() {
let mut members = membership(&["CN=node-a", "CN=node-b", "CN=node-c"]);
let orders = collection("orders");
let mut catalog = ShardOwnershipCatalog::new();
catalog
.apply_update(RangeOwnership::establish(
orders.clone(),
RangeId::new(1),
ShardKeyMode::Hash,
RangeBounds::full(),
ident("CN=node-a"),
vec![ident("CN=node-b"), ident("CN=node-c")],
PlacementMetadata::with_replication_factor(2),
))
.unwrap();
let events = collection("events");
catalog
.apply_update(RangeOwnership::establish(
events.clone(),
RangeId::new(1),
ShardKeyMode::Hash,
RangeBounds::full(),
ident("CN=node-b"),
vec![ident("CN=node-a")],
PlacementMetadata::with_replication_factor(1),
))
.unwrap();
members.begin_drain(&ident("CN=node-a")).unwrap();
let signals = FakeSignals::new(CommitWatermark::new(1, 10))
.with_catch_up("CN=node-b", 1, 10)
.with_catch_up("CN=node-c", 1, 10);
let outcome = run_drain(&ident("CN=node-a"), &members, &mut catalog, &signals);
assert!(outcome.is_drained(), "every range moved off node-a");
assert_eq!(outcome.handoffs.len(), 1);
assert!(outcome.handoffs[0].is_ok());
assert_eq!(outcome.evacuations.len(), 1);
let r1 = catalog.range(&orders, RangeId::new(1)).unwrap();
assert_eq!(r1.owner(), &ident("CN=node-b"));
assert!(r1.epoch().value() > 1, "epoch bumped to fence old owner");
let r2 = catalog.range(&events, RangeId::new(1)).unwrap();
assert!(!r2.replicas().contains(&ident("CN=node-a")));
let removed = commit_drain_removal(&ident("CN=node-a"), &mut members, &catalog)
.expect("drained member is removable");
assert_eq!(removed.identity(), &ident("CN=node-a"));
assert!(!members.is_authorized(&ident("CN=node-a")));
let err = catalog
.admit_public_write(
&ident("CN=node-a"),
&orders,
b"k",
OwnershipEpoch::initial(),
)
.unwrap_err();
assert!(matches!(
err,
RangeWriteReject::NotOwner { .. } | RangeWriteReject::StaleEpoch { .. }
));
}
// The only replica is one LSN behind the watermark, so the hand-off is
// blocked and removal is refused.
#[test]
fn drain_blocked_by_unmoved_range_refuses_removal() {
let mut members = membership(&["CN=node-a", "CN=node-b"]);
let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
members.begin_drain(&ident("CN=node-a")).unwrap();
let signals =
FakeSignals::new(CommitWatermark::new(2, 50)).with_catch_up("CN=node-b", 2, 49);
let outcome = run_drain(&ident("CN=node-a"), &members, &mut catalog, &signals);
assert!(!outcome.is_drained());
assert!(outcome.handoffs.is_empty());
assert_eq!(outcome.blocked.len(), 1);
assert_eq!(
outcome.blocked[0].reason,
DrainBlockReason::NoSafeHandoffTarget
);
let r1 = catalog.range(&orders, RangeId::new(1)).unwrap();
assert_eq!(r1.owner(), &ident("CN=node-a"));
assert_eq!(r1.epoch(), OwnershipEpoch::initial());
let err = commit_drain_removal(&ident("CN=node-a"), &mut members, &catalog).unwrap_err();
match err {
RemovalRejection::StillOwnsRanges { ranges, .. } => {
assert_eq!(ranges, vec![(orders.clone(), RangeId::new(1))]);
}
other => panic!("expected StillOwnsRanges, got {other:?}"),
}
assert!(members.is_authorized(&ident("CN=node-a")), "still a member");
}
// With rf=2 and no third member, evacuating the replica has no replacement.
#[test]
fn drain_blocked_when_replica_evac_would_drop_below_rf() {
let mut members = membership(&["CN=node-a", "CN=node-b"]);
let (mut catalog, _orders) = catalog_with_rf("CN=node-b", &["CN=node-a"], 2);
members.begin_drain(&ident("CN=node-a")).unwrap();
let signals = FakeSignals::new(CommitWatermark::new(1, 10));
let outcome = run_drain(&ident("CN=node-a"), &members, &mut catalog, &signals);
assert_eq!(outcome.blocked.len(), 1);
assert_eq!(
outcome.blocked[0].reason,
DrainBlockReason::NoReplacementReplica
);
}
// With a third member available, it replaces the drained replica and the
// owner/epoch stay unchanged.
#[test]
fn replica_evac_assigns_replacement_to_preserve_rf() {
let mut members = membership(&["CN=node-a", "CN=node-b", "CN=node-c"]);
let (mut catalog, orders) = catalog_with_rf("CN=node-b", &["CN=node-a"], 2);
members.begin_drain(&ident("CN=node-a")).unwrap();
let signals = FakeSignals::new(CommitWatermark::new(1, 10));
let plan = plan_drain(&ident("CN=node-a"), &members, &catalog, &signals);
assert_eq!(plan.steps.len(), 1);
match &plan.steps[0] {
DrainStep::Evacuate(evac) => {
assert_eq!(evac.replacement, Some(ident("CN=node-c")));
}
other => panic!("expected Evacuate, got {other:?}"),
}
run_drain(&ident("CN=node-a"), &members, &mut catalog, &signals);
let r1 = catalog.range(&orders, RangeId::new(1)).unwrap();
assert!(!r1.replicas().contains(&ident("CN=node-a")));
assert!(r1.replicas().contains(&ident("CN=node-c")));
assert_eq!(r1.owner(), &ident("CN=node-b"));
assert_eq!(r1.epoch(), OwnershipEpoch::initial());
}
// A replica that is itself draining is skipped when choosing the hand-off target.
#[test]
fn draining_member_is_never_a_handoff_or_replacement_target() {
let mut members = membership(&["CN=node-a", "CN=node-b", "CN=node-c"]);
let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b", "CN=node-c"]);
members.begin_drain(&ident("CN=node-a")).unwrap();
members.begin_drain(&ident("CN=node-b")).unwrap();
let signals = FakeSignals::new(CommitWatermark::new(1, 10))
.with_catch_up("CN=node-b", 1, 10)
.with_catch_up("CN=node-c", 1, 10);
let plan = plan_drain(&ident("CN=node-a"), &members, &catalog, &signals);
match &plan.steps[0] {
DrainStep::Handoff(h) => assert_eq!(
h.target,
ident("CN=node-c"),
"draining node-b is not a placement target"
),
other => panic!("expected Handoff, got {other:?}"),
}
run_drain(&ident("CN=node-a"), &members, &mut catalog, &signals);
let r1 = catalog.range(&orders, RangeId::new(1)).unwrap();
assert_eq!(r1.owner(), &ident("CN=node-c"));
}
// A caught-up replica is promoted, the epoch advances, and the audit line
// names the operator and reason without flagging write loss.
#[test]
fn force_remove_promotes_surviving_replica_and_fences_dead_owner() {
let mut members = membership(&["CN=node-a", "CN=node-b"]);
let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
let signals =
FakeSignals::new(CommitWatermark::new(1, 10)).with_catch_up("CN=node-b", 1, 10);
let order = ForceRemoveOrder::new(
ForceCapability::granted("ops:alice"),
ident("CN=node-a"),
"node-a hardware failure, unrecoverable",
)
.unwrap();
let result = run_force_remove(&order, &mut members, &mut catalog, &signals);
assert_eq!(result.promotions.len(), 1);
assert_eq!(result.promotions[0].new_owner, ident("CN=node-b"));
assert!(result.promotions[0].fenced_old_owner());
assert!(result.unrecoverable.is_empty());
assert!(result.removed.is_some());
assert!(!members.is_authorized(&ident("CN=node-a")));
let r1 = catalog.range(&orders, RangeId::new(1)).unwrap();
assert_eq!(r1.owner(), &ident("CN=node-b"));
assert_eq!(r1.role_of(&ident("CN=node-b")), RangeRole::Owner);
assert!(r1.epoch().value() > 1);
assert!(!result.audit.has_potential_write_loss());
let line = result.audit.to_string();
assert!(line.contains("FORCE remove"));
assert!(line.contains("ops:alice"));
assert!(line.contains("hardware failure"));
}
// A replica one LSN behind the watermark is still promoted, but the audit
// flags potential write loss.
#[test]
fn force_remove_proceeds_with_behind_replica_and_records_write_loss() {
let mut members = membership(&["CN=node-a", "CN=node-b"]);
let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
let signals =
FakeSignals::new(CommitWatermark::new(2, 50)).with_catch_up("CN=node-b", 2, 49); let order = ForceRemoveOrder::new(
ForceCapability::granted("ops:bob"),
ident("CN=node-a"),
"node-a disk destroyed",
)
.unwrap();
let result = run_force_remove(&order, &mut members, &mut catalog, &signals);
assert_eq!(result.promotions.len(), 1);
assert!(!result.audit.promotions[0].3, "does not cover watermark");
assert!(result.audit.has_potential_write_loss());
assert!(result.audit.to_string().contains("POTENTIAL WRITE LOSS"));
let r1 = catalog.range(&orders, RangeId::new(1)).unwrap();
assert_eq!(r1.owner(), &ident("CN=node-b"));
}
// With no replicas the range is reported unrecoverable; the member is
// removed anyway and the catalog still names it as owner.
#[test]
fn force_remove_records_unrecoverable_range_with_no_replica() {
let mut members = membership(&["CN=node-a", "CN=node-b"]);
let (mut catalog, orders) = catalog_with("CN=node-a", &[]);
let signals = FakeSignals::new(CommitWatermark::new(1, 10));
let order = ForceRemoveOrder::new(
ForceCapability::granted("ops:carol"),
ident("CN=node-a"),
"node-a lost, no replicas existed",
)
.unwrap();
let result = run_force_remove(&order, &mut members, &mut catalog, &signals);
assert!(result.promotions.is_empty());
assert_eq!(result.unrecoverable.len(), 1);
assert_eq!(result.unrecoverable[0].range_id, RangeId::new(1));
assert!(result.audit.has_potential_write_loss());
assert!(result.removed.is_some());
assert!(!members.is_authorized(&ident("CN=node-a")));
let r1 = catalog.range(&orders, RangeId::new(1)).unwrap();
assert_eq!(r1.owner(), &ident("CN=node-a"));
}
// When the removed member is only a replica, it is dropped from the replica
// list and the owner and epoch are untouched.
#[test]
fn force_remove_drops_dead_members_stale_replica_copies() {
let mut members = membership(&["CN=node-a", "CN=node-b"]);
let (mut catalog, orders) = catalog_with("CN=node-b", &["CN=node-a"]);
let signals = FakeSignals::new(CommitWatermark::new(1, 10));
let order = ForceRemoveOrder::new(
ForceCapability::granted("ops:dan"),
ident("CN=node-a"),
"node-a gone",
)
.unwrap();
let result = run_force_remove(&order, &mut members, &mut catalog, &signals);
assert!(result.promotions.is_empty());
assert_eq!(result.audit.replica_copies_dropped, 1);
let r1 = catalog.range(&orders, RangeId::new(1)).unwrap();
assert_eq!(r1.owner(), &ident("CN=node-b"));
assert!(!r1.replicas().contains(&ident("CN=node-a")));
assert_eq!(r1.epoch(), OwnershipEpoch::initial(), "owner unchanged");
}
// A whitespace-only reason is rejected.
#[test]
fn force_remove_order_requires_explicit_reason() {
let err = ForceRemoveOrder::new(
ForceCapability::granted("ops:eve"),
ident("CN=node-a"),
" ",
)
.unwrap_err();
assert_eq!(err, ForceRemoveOrderError::EmptyReason);
}
// Status before draining shows the owned range and is not removable; after
// draining and running the drain it is removable.
#[test]
fn drain_status_reports_dependencies_and_removability() {
let mut members = membership(&["CN=node-a", "CN=node-b", "CN=node-c"]);
let (mut catalog, _orders) = catalog_with("CN=node-a", &["CN=node-b", "CN=node-c"]);
let signals = FakeSignals::new(CommitWatermark::new(1, 10))
.with_catch_up("CN=node-b", 1, 10)
.with_catch_up("CN=node-c", 1, 10);
let status = drain_status(&ident("CN=node-a"), &members, &catalog, &signals);
assert!(status.is_member);
assert!(!status.is_draining);
assert_eq!(status.owned_ranges.len(), 1);
assert!(status.replicated_ranges.is_empty());
assert_eq!(status.planned_steps, 1);
assert!(!status.removable);
members.begin_drain(&ident("CN=node-a")).unwrap();
run_drain(&ident("CN=node-a"), &members, &mut catalog, &signals);
let status = drain_status(&ident("CN=node-a"), &members, &catalog, &signals);
assert!(status.is_draining);
assert!(status.owned_ranges.is_empty());
assert!(status.replicated_ranges.is_empty());
assert!(status.removable);
}
// Planned removal rejects unknown identities and members not marked draining.
#[test]
fn removing_a_non_member_or_non_draining_member_is_refused() {
let mut members = membership(&["CN=node-a"]);
let catalog = ShardOwnershipCatalog::new();
let err = commit_drain_removal(&ident("CN=ghost"), &mut members, &catalog).unwrap_err();
assert!(matches!(err, RemovalRejection::NotAMember { .. }));
let err = commit_drain_removal(&ident("CN=node-a"), &mut members, &catalog).unwrap_err();
assert!(matches!(err, RemovalRejection::NotDraining { .. }));
}
}