1use super::identity::NodeIdentity;
37use super::membership::{ClusterMember, MembershipCatalog};
38use super::ownership::{CollectionId, RangeId, RangeOwnership, ShardOwnershipCatalog};
39use super::ownership_transition::{
40 run_transition, CatchUpEvidence, CommitWatermark, TransitionError, TransitionKind,
41 TransitionOutcome, TransitionRequest,
42};
43use super::supervisor::ClusterSignals;
44
/// One unit of work produced by `plan_drain` for a single range.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DrainStep {
    /// The draining member owns the range; ownership is handed to another node.
    Handoff(OwnedHandoff),
    /// The draining member is only a replica of the range; it is taken out of
    /// the replica set.
    Evacuate(ReplicaEvacuation),
}
63
/// Planned transfer of a range the draining member currently owns.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OwnedHandoff {
    /// Collection the range belongs to.
    pub collection: CollectionId,
    /// Range being handed off.
    pub range_id: RangeId,
    /// Replica chosen to become the new owner.
    pub target: NodeIdentity,
    /// Ready-to-run transition request (kind `Handoff`) describing the transfer.
    pub request: TransitionRequest,
}
75
/// Planned removal of the draining member from one range's replica set.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReplicaEvacuation {
    /// Collection the range belongs to.
    pub collection: CollectionId,
    /// Range whose replica set changes.
    pub range_id: RangeId,
    /// Member added to the replica set in exchange, or `None` when the range
    /// already has enough copies without one.
    pub replacement: Option<NodeIdentity>,
    /// Ownership record to apply to the catalog to carry out the evacuation.
    pub next: RangeOwnership,
}
88
/// Why a range could not be moved off a draining member.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DrainBlockReason {
    /// The member owns the range and no replica qualified as a hand-off target.
    NoSafeHandoffTarget,
    /// Dropping the member's replica would leave too few copies and no member
    /// is available to host a replacement.
    NoReplacementReplica,
}

impl DrainBlockReason {
    /// Returns a short, fixed, human-readable explanation of the block.
    fn label(self) -> &'static str {
        use DrainBlockReason::{NoReplacementReplica, NoSafeHandoffTarget};

        match self {
            NoSafeHandoffTarget => "no caught-up replica is a safe hand-off target",
            NoReplacementReplica => "no eligible member can host a replacement replica",
        }
    }
}
116
/// A range that the drain planner could not move, with the reason.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DrainBlock {
    /// Collection the blocked range belongs to.
    pub collection: CollectionId,
    /// The blocked range.
    pub range_id: RangeId,
    /// Why no step could be planned for it.
    pub reason: DrainBlockReason,
}
125
/// Result of planning a drain: the steps that can be taken and the ranges
/// that cannot currently be moved.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DrainPlan {
    /// Member being drained.
    pub member: NodeIdentity,
    /// Steps to execute, in catalog iteration order.
    pub steps: Vec<DrainStep>,
    /// Ranges for which no step could be planned.
    pub blocked: Vec<DrainBlock>,
}
137
138impl DrainPlan {
139 pub fn is_empty(&self) -> bool {
141 self.steps.is_empty() && self.blocked.is_empty()
142 }
143
144 pub fn is_complete(&self) -> bool {
147 self.blocked.is_empty()
148 }
149}
150
151pub fn plan_drain(
157 member: &NodeIdentity,
158 membership: &MembershipCatalog,
159 ownership: &ShardOwnershipCatalog,
160 signals: &impl ClusterSignals,
161) -> DrainPlan {
162 let mut steps = Vec::new();
163 let mut blocked = Vec::new();
164
165 for range in ownership.entries() {
166 let collection = range.collection().clone();
167 let range_id = range.range_id();
168
169 if range.owner() == member {
170 let watermark = signals.commit_watermark(&collection, range_id);
172 match select_handoff_target(range, member, membership, watermark, signals) {
173 Some((target, evidence)) => {
174 let request = TransitionRequest::new(
175 TransitionKind::Handoff,
176 collection.clone(),
177 range_id,
178 member.clone(),
179 range.epoch(),
180 range.version(),
181 target.clone(),
182 watermark,
183 )
184 .with_evidence(evidence)
185 .with_replicas(without(range.replicas(), &target));
186 steps.push(DrainStep::Handoff(OwnedHandoff {
187 collection,
188 range_id,
189 target,
190 request,
191 }));
192 }
193 None => blocked.push(DrainBlock {
194 collection,
195 range_id,
196 reason: DrainBlockReason::NoSafeHandoffTarget,
197 }),
198 }
199 } else if range.replicas().contains(member) {
200 let remaining = without(range.replicas(), member);
203 let copies_after = 1 + remaining.len();
205 let required = range.placement().replication_factor();
206 if copies_after >= required {
207 let next = range.update_replicas(remaining);
208 steps.push(DrainStep::Evacuate(ReplicaEvacuation {
209 collection,
210 range_id,
211 replacement: None,
212 next,
213 }));
214 } else if let Some(replacement) = select_replacement_replica(range, member, membership)
215 {
216 let mut replicas = remaining;
217 replicas.push(replacement.clone());
218 let next = range.update_replicas(replicas);
219 steps.push(DrainStep::Evacuate(ReplicaEvacuation {
220 collection,
221 range_id,
222 replacement: Some(replacement),
223 next,
224 }));
225 } else {
226 blocked.push(DrainBlock {
227 collection,
228 range_id,
229 reason: DrainBlockReason::NoReplacementReplica,
230 });
231 }
232 }
233 }
234
235 DrainPlan {
236 member: member.clone(),
237 steps,
238 blocked,
239 }
240}
241
/// What happened when a drain plan was executed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DrainOutcome {
    /// Member that was drained.
    pub member: NodeIdentity,
    /// Result of every attempted ownership hand-off, in plan order.
    pub handoffs: Vec<Result<TransitionOutcome, TransitionError>>,
    /// Replica evacuations that the catalog accepted.
    pub evacuations: Vec<ReplicaEvacuation>,
    /// Ranges the planner could not move.
    pub blocked: Vec<DrainBlock>,
}
256
257impl DrainOutcome {
258 pub fn is_drained(&self) -> bool {
262 self.blocked.is_empty() && self.handoffs.iter().all(Result::is_ok)
263 }
264}
265
266pub fn run_drain(
271 member: &NodeIdentity,
272 membership: &MembershipCatalog,
273 ownership: &mut ShardOwnershipCatalog,
274 signals: &impl ClusterSignals,
275) -> DrainOutcome {
276 let plan = plan_drain(member, membership, ownership, signals);
277 let mut handoffs = Vec::new();
278 let mut evacuations = Vec::new();
279
280 for step in plan.steps {
281 match step {
282 DrainStep::Handoff(handoff) => {
283 handoffs.push(run_transition(ownership, &handoff.request));
284 }
285 DrainStep::Evacuate(evac) => {
286 if ownership.apply_update(evac.next.clone()).is_ok() {
291 evacuations.push(evac);
292 }
293 }
294 }
295 }
296
297 DrainOutcome {
298 member: member.clone(),
299 handoffs,
300 evacuations,
301 blocked: plan.blocked,
302 }
303}
304
/// Reasons a planned (drain-based) removal is refused.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RemovalRejection {
    /// The identity is not present in the membership catalog.
    NotAMember { member: NodeIdentity },
    /// The member exists but has not been marked draining.
    NotDraining { member: NodeIdentity },
    /// The member still owns at least one range.
    StillOwnsRanges {
        /// Member whose removal was refused.
        member: NodeIdentity,
        /// Ranges it still owns.
        ranges: Vec<(CollectionId, RangeId)>,
    },
    /// The member owns nothing but is still listed as a replica.
    StillReplicaFor {
        /// Member whose removal was refused.
        member: NodeIdentity,
        /// Ranges it still replicates.
        ranges: Vec<(CollectionId, RangeId)>,
    },
}
329
330impl std::fmt::Display for RemovalRejection {
331 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
332 match self {
333 Self::NotAMember { member } => write!(f, "{member} is not a cluster member"),
334 Self::NotDraining { member } => {
335 write!(f, "{member} must be marked draining before planned removal")
336 }
337 Self::StillOwnsRanges { member, ranges } => write!(
338 f,
339 "{member} cannot be removed: still owns {} range(s)",
340 ranges.len()
341 ),
342 Self::StillReplicaFor { member, ranges } => write!(
343 f,
344 "{member} cannot be removed: still replicates {} range(s)",
345 ranges.len()
346 ),
347 }
348 }
349}
350
// Marker impl so a rejection can be used wherever `std::error::Error` is
// expected; it wraps no underlying source error.
impl std::error::Error for RemovalRejection {}
352
353pub fn commit_drain_removal(
358 member: &NodeIdentity,
359 membership: &mut MembershipCatalog,
360 ownership: &ShardOwnershipCatalog,
361) -> Result<ClusterMember, RemovalRejection> {
362 match membership.member(member) {
363 None => {
364 return Err(RemovalRejection::NotAMember {
365 member: member.clone(),
366 })
367 }
368 Some(m) if !m.is_draining() => {
369 return Err(RemovalRejection::NotDraining {
370 member: member.clone(),
371 })
372 }
373 Some(_) => {}
374 }
375
376 let (owned, replicated) = range_dependencies(member, ownership);
377 if !owned.is_empty() {
378 return Err(RemovalRejection::StillOwnsRanges {
379 member: member.clone(),
380 ranges: owned,
381 });
382 }
383 if !replicated.is_empty() {
384 return Err(RemovalRejection::StillReplicaFor {
385 member: member.clone(),
386 ranges: replicated,
387 });
388 }
389
390 Ok(membership
391 .remove(member)
392 .expect("membership presence checked above"))
393}
394
/// Names the principal on whose authority a forced removal is issued.
///
/// The holder string is carried into the force-remove plan and audit record.
/// This type performs no verification of its own.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ForceCapability {
    holder: String,
}

impl ForceCapability {
    /// Creates a capability attributed to `holder`.
    pub fn granted(holder: impl Into<String>) -> Self {
        let holder: String = holder.into();
        Self { holder }
    }

    /// The principal this capability was granted to.
    pub fn holder(&self) -> &str {
        self.holder.as_str()
    }
}
421
/// Reasons a force-remove order cannot be constructed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ForceRemoveOrderError {
    /// The supplied reason was empty or whitespace-only.
    EmptyReason,
}

impl std::fmt::Display for ForceRemoveOrderError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            ForceRemoveOrderError::EmptyReason => {
                "a forced removal requires an explicit operator reason"
            }
        };
        f.write_str(message)
    }
}

// No underlying source error; the defaults of `Error` are sufficient.
impl std::error::Error for ForceRemoveOrderError {}
439
/// An operator's instruction to remove a member without draining it first.
///
/// Fields are private; `ForceRemoveOrder::new` is the only constructor in this
/// file and it rejects a blank reason.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ForceRemoveOrder {
    // Authority under which the order is issued.
    capability: ForceCapability,
    // Member to remove.
    member: NodeIdentity,
    // Operator-supplied justification.
    reason: String,
}
450
451impl ForceRemoveOrder {
452 pub fn new(
453 capability: ForceCapability,
454 member: NodeIdentity,
455 reason: impl Into<String>,
456 ) -> Result<Self, ForceRemoveOrderError> {
457 let reason = reason.into();
458 if reason.trim().is_empty() {
459 return Err(ForceRemoveOrderError::EmptyReason);
460 }
461 Ok(Self {
462 capability,
463 member,
464 reason,
465 })
466 }
467
468 pub fn member(&self) -> &NodeIdentity {
469 &self.member
470 }
471
472 pub fn reason(&self) -> &str {
473 &self.reason
474 }
475
476 pub fn capability(&self) -> &ForceCapability {
477 &self.capability
478 }
479}
480
/// Planned promotion of a surviving replica to owner of a range whose owner
/// is being force-removed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ForcedPromotion {
    /// Collection the range belongs to.
    pub collection: CollectionId,
    /// Range being re-homed.
    pub range_id: RangeId,
    /// The member being force-removed (current owner).
    pub dead_owner: NodeIdentity,
    /// Replica chosen as the new owner.
    pub new_owner: NodeIdentity,
    /// Whether the new owner's catch-up evidence covered the commit watermark
    /// at planning time; `false` means writes may be lost.
    pub covers_watermark: bool,
    /// Catch-up evidence reported for the new owner, if any was available.
    pub evidence: Option<CatchUpEvidence>,
    /// Ownership record to apply to the catalog to carry out the promotion.
    pub next: RangeOwnership,
}
501
/// A range owned by the force-removed member for which no promotion target
/// could be found.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ForcedBlock {
    /// Collection the range belongs to.
    pub collection: CollectionId,
    /// The range left without a new owner.
    pub range_id: RangeId,
    /// The member being force-removed, which still owns the range.
    pub dead_owner: NodeIdentity,
}
511
/// Everything a forced removal intends to do, computed without mutation.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ForceRemovePlan {
    /// Member being force-removed.
    pub member: NodeIdentity,
    /// Operator-supplied justification copied from the order.
    pub reason: String,
    /// Holder of the capability copied from the order.
    pub capability_holder: String,
    /// Ranges owned by the member that will be promoted to a surviving replica.
    pub promotions: Vec<ForcedPromotion>,
    /// Updated ownership records for ranges where the member was only a
    /// replica, with the member removed from the replica set.
    pub replica_drops: Vec<RangeOwnership>,
    /// Ranges owned by the member that have no promotion target.
    pub unrecoverable: Vec<ForcedBlock>,
}
527
528pub fn plan_force_remove(
535 order: &ForceRemoveOrder,
536 membership: &MembershipCatalog,
537 ownership: &ShardOwnershipCatalog,
538 signals: &impl ClusterSignals,
539) -> ForceRemovePlan {
540 let member = order.member();
541 let mut promotions = Vec::new();
542 let mut replica_drops = Vec::new();
543 let mut unrecoverable = Vec::new();
544
545 for range in ownership.entries() {
546 let collection = range.collection().clone();
547 let range_id = range.range_id();
548
549 if range.owner() == member {
550 let watermark = signals.commit_watermark(&collection, range_id);
551 match select_force_target(range, member, membership, watermark, signals) {
552 Some((target, covers_watermark, evidence)) => {
553 let next =
554 range.transfer_to(target.clone(), without(range.replicas(), &target));
555 promotions.push(ForcedPromotion {
556 collection,
557 range_id,
558 dead_owner: member.clone(),
559 new_owner: target,
560 covers_watermark,
561 evidence,
562 next,
563 });
564 }
565 None => unrecoverable.push(ForcedBlock {
566 collection,
567 range_id,
568 dead_owner: member.clone(),
569 }),
570 }
571 } else if range.replicas().contains(member) {
572 replica_drops.push(range.update_replicas(without(range.replicas(), member)));
573 }
574 }
575
576 ForceRemovePlan {
577 member: member.clone(),
578 reason: order.reason().to_string(),
579 capability_holder: order.capability().holder().to_string(),
580 promotions,
581 replica_drops,
582 unrecoverable,
583 }
584}
585
/// Summary record of an executed forced removal.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ForceRemoveAudit {
    /// Member that was force-removed.
    pub member: NodeIdentity,
    /// Holder of the capability under which the removal ran.
    pub capability_holder: String,
    /// Operator-supplied justification.
    pub reason: String,
    /// Applied promotions as `(collection, range, new owner, covered watermark)`.
    pub promotions: Vec<(CollectionId, RangeId, NodeIdentity, bool)>,
    /// Ranges left without a new owner.
    pub unrecoverable: Vec<(CollectionId, RangeId)>,
    /// Number of replica-set updates that dropped the member and were accepted.
    pub replica_copies_dropped: usize,
}
602
603impl ForceRemoveAudit {
604 pub fn has_potential_write_loss(&self) -> bool {
607 self.promotions.iter().any(|(_, _, _, covers)| !covers) || !self.unrecoverable.is_empty()
608 }
609}
610
611impl std::fmt::Display for ForceRemoveAudit {
612 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
613 write!(
614 f,
615 "FORCE remove {} by {} (reason: {}): {} range(s) force-promoted, {} unrecoverable, {} stale replica copies dropped",
616 self.member,
617 self.capability_holder,
618 self.reason,
619 self.promotions.len(),
620 self.unrecoverable.len(),
621 self.replica_copies_dropped,
622 )?;
623 if self.has_potential_write_loss() {
624 write!(f, "; POTENTIAL WRITE LOSS")?;
625 }
626 Ok(())
627 }
628}
629
/// Full result of executing a forced removal.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ForceRemoveResult {
    /// Summary suitable for logging.
    pub audit: ForceRemoveAudit,
    /// Outcome of every promotion that was applied to the catalog.
    pub promotions: Vec<TransitionOutcome>,
    /// Ranges left owned by the removed member.
    pub unrecoverable: Vec<ForcedBlock>,
    /// The membership entry that was removed, or `None` if the identity was
    /// not a member.
    pub removed: Option<ClusterMember>,
}
642
643pub fn run_force_remove(
649 order: &ForceRemoveOrder,
650 membership: &mut MembershipCatalog,
651 ownership: &mut ShardOwnershipCatalog,
652 signals: &impl ClusterSignals,
653) -> ForceRemoveResult {
654 let plan = plan_force_remove(order, membership, ownership, signals);
655
656 let mut promotion_outcomes = Vec::new();
657 let mut audit_promotions = Vec::new();
658 for promotion in &plan.promotions {
659 let previous_owner = promotion.dead_owner.clone();
660 let new_epoch = promotion.next.epoch();
661 let previous_epoch = ownership
662 .range(&promotion.collection, promotion.range_id)
663 .map(RangeOwnership::epoch)
664 .unwrap_or(new_epoch);
665 let new_version = promotion.next.version();
666 let previous_version = ownership
667 .range(&promotion.collection, promotion.range_id)
668 .map(RangeOwnership::version)
669 .unwrap_or(new_version);
670 let watermark = signals.commit_watermark(&promotion.collection, promotion.range_id);
671 if ownership.apply_update(promotion.next.clone()).is_ok() {
672 audit_promotions.push((
673 promotion.collection.clone(),
674 promotion.range_id,
675 promotion.new_owner.clone(),
676 promotion.covers_watermark,
677 ));
678 promotion_outcomes.push(TransitionOutcome {
679 kind: TransitionKind::Promote,
680 collection: promotion.collection.clone(),
681 range_id: promotion.range_id,
682 previous_owner,
683 new_owner: promotion.new_owner.clone(),
684 previous_epoch,
685 new_epoch,
686 previous_version,
687 new_version,
688 watermark,
689 });
690 }
691 }
692
693 let mut replica_copies_dropped = 0;
694 for drop in &plan.replica_drops {
695 if ownership.apply_update(drop.clone()).is_ok() {
696 replica_copies_dropped += 1;
697 }
698 }
699
700 let removed = membership.remove(order.member());
701
702 let audit = ForceRemoveAudit {
703 member: order.member().clone(),
704 capability_holder: plan.capability_holder,
705 reason: plan.reason,
706 promotions: audit_promotions,
707 unrecoverable: plan
708 .unrecoverable
709 .iter()
710 .map(|b| (b.collection.clone(), b.range_id))
711 .collect(),
712 replica_copies_dropped,
713 };
714
715 ForceRemoveResult {
716 audit,
717 promotions: promotion_outcomes,
718 unrecoverable: plan.unrecoverable,
719 removed,
720 }
721}
722
/// Read-only snapshot of how far a member is from being removable.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DrainStatus {
    /// Member the status describes.
    pub member: NodeIdentity,
    /// Whether the identity is present in the membership catalog.
    pub is_member: bool,
    /// Whether the member is marked draining (always `false` for non-members).
    pub is_draining: bool,
    /// Ranges the member still owns.
    pub owned_ranges: Vec<(CollectionId, RangeId)>,
    /// Ranges the member still replicates without owning.
    pub replicated_ranges: Vec<(CollectionId, RangeId)>,
    /// Number of steps a drain would execute right now.
    pub planned_steps: usize,
    /// Ranges a drain could not move right now.
    pub blocked: Vec<DrainBlock>,
    /// `true` when the member is draining and no range references it.
    pub removable: bool,
}
744
745pub fn drain_status(
748 member: &NodeIdentity,
749 membership: &MembershipCatalog,
750 ownership: &ShardOwnershipCatalog,
751 signals: &impl ClusterSignals,
752) -> DrainStatus {
753 let member_entry = membership.member(member);
754 let is_member = member_entry.is_some();
755 let is_draining = member_entry.is_some_and(ClusterMember::is_draining);
756 let (owned_ranges, replicated_ranges) = range_dependencies(member, ownership);
757 let plan = plan_drain(member, membership, ownership, signals);
758 let removable = is_draining && owned_ranges.is_empty() && replicated_ranges.is_empty();
759
760 DrainStatus {
761 member: member.clone(),
762 is_member,
763 is_draining,
764 owned_ranges,
765 replicated_ranges,
766 planned_steps: plan.steps.len(),
767 blocked: plan.blocked,
768 removable,
769 }
770}
771
772fn range_dependencies(
779 member: &NodeIdentity,
780 ownership: &ShardOwnershipCatalog,
781) -> (Vec<(CollectionId, RangeId)>, Vec<(CollectionId, RangeId)>) {
782 let mut owned = Vec::new();
783 let mut replicated = Vec::new();
784 for range in ownership.entries() {
785 if range.owner() == member {
786 owned.push((range.collection().clone(), range.range_id()));
787 } else if range.replicas().contains(member) {
788 replicated.push((range.collection().clone(), range.range_id()));
789 }
790 }
791 (owned, replicated)
792}
793
794fn without(replicas: &[NodeIdentity], node: &NodeIdentity) -> Vec<NodeIdentity> {
796 replicas.iter().filter(|r| *r != node).cloned().collect()
797}
798
799fn select_handoff_target(
805 range: &RangeOwnership,
806 member: &NodeIdentity,
807 membership: &MembershipCatalog,
808 watermark: CommitWatermark,
809 signals: &impl ClusterSignals,
810) -> Option<(NodeIdentity, CatchUpEvidence)> {
811 let mut best: Option<(CatchUpEvidence, NodeIdentity)> = None;
812 for candidate in range.replicas() {
813 if candidate == member {
814 continue;
815 }
816 if !membership
817 .member(candidate)
818 .is_some_and(ClusterMember::is_placement_eligible)
819 {
820 continue;
821 }
822 let Some(evidence) = signals.catch_up(range.collection(), range.range_id(), candidate)
823 else {
824 continue;
825 };
826 if !evidence.covers(watermark) {
827 continue;
828 }
829 let applied = (evidence.applied_term, evidence.applied_lsn);
830 let better = match &best {
831 None => true,
832 Some((best_ev, best_id)) => {
833 applied > (best_ev.applied_term, best_ev.applied_lsn)
834 || (applied == (best_ev.applied_term, best_ev.applied_lsn)
835 && candidate < best_id)
836 }
837 };
838 if better {
839 best = Some((evidence, candidate.clone()));
840 }
841 }
842 best.map(|(evidence, id)| (id, evidence))
843}
844
845fn select_force_target(
853 range: &RangeOwnership,
854 member: &NodeIdentity,
855 membership: &MembershipCatalog,
856 watermark: CommitWatermark,
857 signals: &impl ClusterSignals,
858) -> Option<(NodeIdentity, bool, Option<CatchUpEvidence>)> {
859 let mut best: Option<(bool, (u64, u64), NodeIdentity, Option<CatchUpEvidence>)> = None;
863 for candidate in range.replicas() {
864 if candidate == member {
865 continue;
866 }
867 if !membership
868 .member(candidate)
869 .is_some_and(ClusterMember::is_placement_eligible)
870 {
871 continue;
872 }
873 let evidence = signals.catch_up(range.collection(), range.range_id(), candidate);
874 let covers = evidence.as_ref().is_some_and(|e| e.covers(watermark));
875 let applied = evidence
876 .as_ref()
877 .map(|e| (e.applied_term, e.applied_lsn))
878 .unwrap_or((0, 0));
879 let better = match &best {
880 None => true,
881 Some((best_covers, best_applied, best_id, _)) => {
882 (covers, applied) > (*best_covers, *best_applied)
883 || ((covers, applied) == (*best_covers, *best_applied) && candidate < best_id)
884 }
885 };
886 if better {
887 best = Some((covers, applied, candidate.clone(), evidence));
888 }
889 }
890 best.map(|(covers, _, id, evidence)| (id, covers, evidence))
891}
892
893fn select_replacement_replica(
899 range: &RangeOwnership,
900 member: &NodeIdentity,
901 membership: &MembershipCatalog,
902) -> Option<NodeIdentity> {
903 membership
904 .placement_eligible_members()
905 .map(ClusterMember::identity)
906 .find(|id| *id != member && range.owner() != *id && !range.replicas().contains(id))
907 .cloned()
908}
909
// Unit tests for drain planning/execution, planned removal, and forced
// removal, driven by in-memory catalogs and a fake signal source.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::cluster::membership::{ClusterId, MemberKind};
    use crate::cluster::ownership::{
        OwnershipEpoch, PlacementMetadata, RangeBounds, RangeRole, RangeWriteReject, ShardKeyMode,
    };
    use std::collections::HashMap;

    /// Builds a node identity from a certificate subject such as `CN=node-a`.
    fn ident(cn: &str) -> NodeIdentity {
        NodeIdentity::from_certificate_subject(cn).unwrap()
    }

    /// Builds a collection id, panicking on an invalid name.
    fn collection(name: &str) -> CollectionId {
        CollectionId::new(name).unwrap()
    }

    /// A freshly joined data member with the given identity.
    fn data_member(cn: &str) -> ClusterMember {
        ClusterMember::joined_empty(ident(cn), MemberKind::Data)
    }

    /// A membership catalog for cluster `cluster-x` containing `members`.
    fn membership(members: &[&str]) -> MembershipCatalog {
        MembershipCatalog::new(
            ClusterId::new("cluster-x").unwrap(),
            members.iter().map(|m| data_member(m)),
        )
    }

    /// A catalog with a single full-bounds range (`orders`, range 1) with the
    /// given owner, replicas and replication factor.
    fn catalog_with_rf(
        owner: &str,
        replicas: &[&str],
        rf: usize,
    ) -> (ShardOwnershipCatalog, CollectionId) {
        let orders = collection("orders");
        let mut catalog = ShardOwnershipCatalog::new();
        catalog
            .apply_update(RangeOwnership::establish(
                orders.clone(),
                RangeId::new(1),
                ShardKeyMode::Hash,
                RangeBounds::full(),
                ident(owner),
                replicas.iter().map(|r| ident(r)).collect::<Vec<_>>(),
                PlacementMetadata::with_replication_factor(rf),
            ))
            .unwrap();
        (catalog, orders)
    }

    /// Same as `catalog_with_rf` with a replication factor of 3.
    fn catalog_with(owner: &str, replicas: &[&str]) -> (ShardOwnershipCatalog, CollectionId) {
        catalog_with_rf(owner, replicas, 3)
    }

    /// Signal source returning one fixed watermark for every range and
    /// per-node catch-up evidence from a map.
    struct FakeSignals {
        watermark: CommitWatermark,
        catch_up: HashMap<NodeIdentity, CatchUpEvidence>,
    }

    impl FakeSignals {
        /// Starts with the given watermark and no catch-up evidence.
        fn new(watermark: CommitWatermark) -> Self {
            Self {
                watermark,
                catch_up: HashMap::new(),
            }
        }

        /// Registers catch-up evidence for node `cn`.
        fn with_catch_up(mut self, cn: &str, applied_term: u64, applied_lsn: u64) -> Self {
            self.catch_up.insert(
                ident(cn),
                CatchUpEvidence::new(ident(cn), applied_term, applied_lsn),
            );
            self
        }
    }

    impl ClusterSignals for FakeSignals {
        // Every member is reported healthy.
        fn member_signals(
            &self,
            _member: &NodeIdentity,
        ) -> crate::cluster::supervisor::MemberSignals {
            crate::cluster::supervisor::MemberSignals::healthy()
        }

        // The same watermark is returned for every collection and range.
        fn commit_watermark(
            &self,
            _collection: &CollectionId,
            _range_id: RangeId,
        ) -> CommitWatermark {
            self.watermark
        }

        // Evidence is keyed by node only; collection and range are ignored.
        fn catch_up(
            &self,
            _collection: &CollectionId,
            _range_id: RangeId,
            candidate: &NodeIdentity,
        ) -> Option<CatchUpEvidence> {
            self.catch_up.get(candidate).cloned()
        }
    }

    // Marking a member draining is idempotent, makes it placement-ineligible,
    // and returns `None` for an unknown identity.
    #[test]
    fn begin_drain_marks_member_and_excludes_from_placement() {
        let mut members = membership(&["CN=node-a", "CN=node-b"]);
        assert!(members
            .member(&ident("CN=node-a"))
            .unwrap()
            .is_placement_eligible());

        let changed = members.begin_drain(&ident("CN=node-a"));
        assert_eq!(changed, Some(true));
        assert!(members.member(&ident("CN=node-a")).unwrap().is_draining());
        // A second call reports that nothing changed.
        assert_eq!(members.begin_drain(&ident("CN=node-a")), Some(false));
        assert!(!members
            .member(&ident("CN=node-a"))
            .unwrap()
            .is_placement_eligible());
        let eligible: Vec<_> = members
            .placement_eligible_members()
            .map(|m| m.identity().clone())
            .collect();
        assert_eq!(eligible, vec![ident("CN=node-b")]);
        assert_eq!(members.begin_drain(&ident("CN=ghost")), None);
    }

    // node-a owns `orders` and replicates `events`; after a drain both are
    // moved and the member can be removed.
    #[test]
    fn successful_drain_moves_all_ranges_then_allows_removal() {
        let mut members = membership(&["CN=node-a", "CN=node-b", "CN=node-c"]);
        let orders = collection("orders");
        let mut catalog = ShardOwnershipCatalog::new();
        catalog
            .apply_update(RangeOwnership::establish(
                orders.clone(),
                RangeId::new(1),
                ShardKeyMode::Hash,
                RangeBounds::full(),
                ident("CN=node-a"),
                vec![ident("CN=node-b"), ident("CN=node-c")],
                PlacementMetadata::with_replication_factor(2),
            ))
            .unwrap();
        let events = collection("events");
        catalog
            .apply_update(RangeOwnership::establish(
                events.clone(),
                RangeId::new(1),
                ShardKeyMode::Hash,
                RangeBounds::full(),
                ident("CN=node-b"),
                vec![ident("CN=node-a")],
                PlacementMetadata::with_replication_factor(1),
            ))
            .unwrap();

        members.begin_drain(&ident("CN=node-a")).unwrap();
        let signals = FakeSignals::new(CommitWatermark::new(1, 10))
            .with_catch_up("CN=node-b", 1, 10)
            .with_catch_up("CN=node-c", 1, 10);

        let outcome = run_drain(&ident("CN=node-a"), &members, &mut catalog, &signals);
        assert!(outcome.is_drained(), "every range moved off node-a");
        assert_eq!(outcome.handoffs.len(), 1);
        assert!(outcome.handoffs[0].is_ok());
        assert_eq!(outcome.evacuations.len(), 1);

        // Both replicas are equally caught up, so the smaller identity wins.
        let r1 = catalog.range(&orders, RangeId::new(1)).unwrap();
        assert_eq!(r1.owner(), &ident("CN=node-b"));
        assert!(r1.epoch().value() > 1, "epoch bumped to fence old owner");
        let r2 = catalog.range(&events, RangeId::new(1)).unwrap();
        assert!(!r2.replicas().contains(&ident("CN=node-a")));

        let removed = commit_drain_removal(&ident("CN=node-a"), &mut members, &catalog)
            .expect("drained member is removable");
        assert_eq!(removed.identity(), &ident("CN=node-a"));
        assert!(!members.is_authorized(&ident("CN=node-a")));

        // A write from the old owner at the initial epoch is rejected.
        let err = catalog
            .admit_public_write(
                &ident("CN=node-a"),
                &orders,
                b"k",
                OwnershipEpoch::initial(),
            )
            .unwrap_err();
        assert!(matches!(
            err,
            RangeWriteReject::NotOwner { .. } | RangeWriteReject::StaleEpoch { .. }
        ));
    }

    // The only replica is one LSN behind the watermark, so no hand-off is
    // planned and removal is refused.
    #[test]
    fn drain_blocked_by_unmoved_range_refuses_removal() {
        let mut members = membership(&["CN=node-a", "CN=node-b"]);
        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        members.begin_drain(&ident("CN=node-a")).unwrap();
        let signals =
            FakeSignals::new(CommitWatermark::new(2, 50)).with_catch_up("CN=node-b", 2, 49);
        let outcome = run_drain(&ident("CN=node-a"), &members, &mut catalog, &signals);
        assert!(!outcome.is_drained());
        assert!(outcome.handoffs.is_empty());
        assert_eq!(outcome.blocked.len(), 1);
        assert_eq!(
            outcome.blocked[0].reason,
            DrainBlockReason::NoSafeHandoffTarget
        );

        // The catalog is untouched.
        let r1 = catalog.range(&orders, RangeId::new(1)).unwrap();
        assert_eq!(r1.owner(), &ident("CN=node-a"));
        assert_eq!(r1.epoch(), OwnershipEpoch::initial());

        let err = commit_drain_removal(&ident("CN=node-a"), &mut members, &catalog).unwrap_err();
        match err {
            RemovalRejection::StillOwnsRanges { ranges, .. } => {
                assert_eq!(ranges, vec![(orders.clone(), RangeId::new(1))]);
            }
            other => panic!("expected StillOwnsRanges, got {other:?}"),
        }
        assert!(members.is_authorized(&ident("CN=node-a")), "still a member");
    }

    // With RF 2 and only the owner left, dropping node-a's replica needs a
    // replacement, and no other eligible member exists.
    #[test]
    fn drain_blocked_when_replica_evac_would_drop_below_rf() {
        let mut members = membership(&["CN=node-a", "CN=node-b"]);
        let (mut catalog, _orders) = catalog_with_rf("CN=node-b", &["CN=node-a"], 2);
        members.begin_drain(&ident("CN=node-a")).unwrap();
        let signals = FakeSignals::new(CommitWatermark::new(1, 10));

        let outcome = run_drain(&ident("CN=node-a"), &members, &mut catalog, &signals);
        assert_eq!(outcome.blocked.len(), 1);
        assert_eq!(
            outcome.blocked[0].reason,
            DrainBlockReason::NoReplacementReplica
        );
    }

    // Same setup as above, but node-c is available to take the replica.
    #[test]
    fn replica_evac_assigns_replacement_to_preserve_rf() {
        let mut members = membership(&["CN=node-a", "CN=node-b", "CN=node-c"]);
        let (mut catalog, orders) = catalog_with_rf("CN=node-b", &["CN=node-a"], 2);
        members.begin_drain(&ident("CN=node-a")).unwrap();
        let signals = FakeSignals::new(CommitWatermark::new(1, 10));

        let plan = plan_drain(&ident("CN=node-a"), &members, &catalog, &signals);
        assert_eq!(plan.steps.len(), 1);
        match &plan.steps[0] {
            DrainStep::Evacuate(evac) => {
                assert_eq!(evac.replacement, Some(ident("CN=node-c")));
            }
            other => panic!("expected Evacuate, got {other:?}"),
        }

        run_drain(&ident("CN=node-a"), &members, &mut catalog, &signals);
        let r1 = catalog.range(&orders, RangeId::new(1)).unwrap();
        assert!(!r1.replicas().contains(&ident("CN=node-a")));
        assert!(r1.replicas().contains(&ident("CN=node-c")));
        assert_eq!(r1.owner(), &ident("CN=node-b"));
        // A replica-only change leaves the epoch at its initial value.
        assert_eq!(r1.epoch(), OwnershipEpoch::initial());
    }

    // node-b is caught up but draining itself, so node-c must be chosen.
    #[test]
    fn draining_member_is_never_a_handoff_or_replacement_target() {
        let mut members = membership(&["CN=node-a", "CN=node-b", "CN=node-c"]);
        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b", "CN=node-c"]);
        members.begin_drain(&ident("CN=node-a")).unwrap();
        members.begin_drain(&ident("CN=node-b")).unwrap();
        let signals = FakeSignals::new(CommitWatermark::new(1, 10))
            .with_catch_up("CN=node-b", 1, 10)
            .with_catch_up("CN=node-c", 1, 10);

        let plan = plan_drain(&ident("CN=node-a"), &members, &catalog, &signals);
        match &plan.steps[0] {
            DrainStep::Handoff(h) => assert_eq!(
                h.target,
                ident("CN=node-c"),
                "draining node-b is not a placement target"
            ),
            other => panic!("expected Handoff, got {other:?}"),
        }

        run_drain(&ident("CN=node-a"), &members, &mut catalog, &signals);
        let r1 = catalog.range(&orders, RangeId::new(1)).unwrap();
        assert_eq!(r1.owner(), &ident("CN=node-c"));
    }

    // A caught-up replica is promoted, the member is removed, and the audit
    // reports no write loss.
    #[test]
    fn force_remove_promotes_surviving_replica_and_fences_dead_owner() {
        let mut members = membership(&["CN=node-a", "CN=node-b"]);
        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        let signals =
            FakeSignals::new(CommitWatermark::new(1, 10)).with_catch_up("CN=node-b", 1, 10);
        let order = ForceRemoveOrder::new(
            ForceCapability::granted("ops:alice"),
            ident("CN=node-a"),
            "node-a hardware failure, unrecoverable",
        )
        .unwrap();

        let result = run_force_remove(&order, &mut members, &mut catalog, &signals);
        assert_eq!(result.promotions.len(), 1);
        assert_eq!(result.promotions[0].new_owner, ident("CN=node-b"));
        assert!(result.promotions[0].fenced_old_owner());
        assert!(result.unrecoverable.is_empty());
        assert!(result.removed.is_some());
        assert!(!members.is_authorized(&ident("CN=node-a")));

        let r1 = catalog.range(&orders, RangeId::new(1)).unwrap();
        assert_eq!(r1.owner(), &ident("CN=node-b"));
        assert_eq!(r1.role_of(&ident("CN=node-b")), RangeRole::Owner);
        assert!(r1.epoch().value() > 1);

        assert!(!result.audit.has_potential_write_loss());
        let line = result.audit.to_string();
        assert!(line.contains("FORCE remove"));
        assert!(line.contains("ops:alice"));
        assert!(line.contains("hardware failure"));
    }

    // The only replica is one LSN behind; the promotion still happens and the
    // audit flags potential write loss.
    #[test]
    fn force_remove_proceeds_with_behind_replica_and_records_write_loss() {
        let mut members = membership(&["CN=node-a", "CN=node-b"]);
        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        let signals =
            FakeSignals::new(CommitWatermark::new(2, 50)).with_catch_up("CN=node-b", 2, 49);
        let order = ForceRemoveOrder::new(
            ForceCapability::granted("ops:bob"),
            ident("CN=node-a"),
            "node-a disk destroyed",
        )
        .unwrap();

        let result = run_force_remove(&order, &mut members, &mut catalog, &signals);
        assert_eq!(result.promotions.len(), 1);
        assert!(!result.audit.promotions[0].3, "does not cover watermark");
        assert!(result.audit.has_potential_write_loss());
        assert!(result.audit.to_string().contains("POTENTIAL WRITE LOSS"));

        let r1 = catalog.range(&orders, RangeId::new(1)).unwrap();
        assert_eq!(r1.owner(), &ident("CN=node-b"));
    }

    // With no replicas at all the range is reported unrecoverable, the member
    // is still removed, and the catalog entry keeps naming it as owner.
    #[test]
    fn force_remove_records_unrecoverable_range_with_no_replica() {
        let mut members = membership(&["CN=node-a", "CN=node-b"]);
        let (mut catalog, orders) = catalog_with("CN=node-a", &[]);
        let signals = FakeSignals::new(CommitWatermark::new(1, 10));
        let order = ForceRemoveOrder::new(
            ForceCapability::granted("ops:carol"),
            ident("CN=node-a"),
            "node-a lost, no replicas existed",
        )
        .unwrap();

        let result = run_force_remove(&order, &mut members, &mut catalog, &signals);
        assert!(result.promotions.is_empty());
        assert_eq!(result.unrecoverable.len(), 1);
        assert_eq!(result.unrecoverable[0].range_id, RangeId::new(1));
        assert!(result.audit.has_potential_write_loss());
        assert!(result.removed.is_some());
        assert!(!members.is_authorized(&ident("CN=node-a")));

        let r1 = catalog.range(&orders, RangeId::new(1)).unwrap();
        assert_eq!(r1.owner(), &ident("CN=node-a"));
    }

    // When the removed member was only a replica, it is dropped from the
    // replica set without touching owner or epoch.
    #[test]
    fn force_remove_drops_dead_members_stale_replica_copies() {
        let mut members = membership(&["CN=node-a", "CN=node-b"]);
        let (mut catalog, orders) = catalog_with("CN=node-b", &["CN=node-a"]);
        let signals = FakeSignals::new(CommitWatermark::new(1, 10));
        let order = ForceRemoveOrder::new(
            ForceCapability::granted("ops:dan"),
            ident("CN=node-a"),
            "node-a gone",
        )
        .unwrap();

        let result = run_force_remove(&order, &mut members, &mut catalog, &signals);
        assert!(result.promotions.is_empty());
        assert_eq!(result.audit.replica_copies_dropped, 1);
        let r1 = catalog.range(&orders, RangeId::new(1)).unwrap();
        assert_eq!(r1.owner(), &ident("CN=node-b"));
        assert!(!r1.replicas().contains(&ident("CN=node-a")));
        assert_eq!(r1.epoch(), OwnershipEpoch::initial(), "owner unchanged");
    }

    // A whitespace-only reason is rejected.
    #[test]
    fn force_remove_order_requires_explicit_reason() {
        let err = ForceRemoveOrder::new(
            ForceCapability::granted("ops:eve"),
            ident("CN=node-a"),
            " ",
        )
        .unwrap_err();
        assert_eq!(err, ForceRemoveOrderError::EmptyReason);
    }

    // Status before draining shows the owned range and one planned step;
    // after a drain run it shows no dependencies and `removable`.
    #[test]
    fn drain_status_reports_dependencies_and_removability() {
        let mut members = membership(&["CN=node-a", "CN=node-b", "CN=node-c"]);
        let (mut catalog, _orders) = catalog_with("CN=node-a", &["CN=node-b", "CN=node-c"]);
        let signals = FakeSignals::new(CommitWatermark::new(1, 10))
            .with_catch_up("CN=node-b", 1, 10)
            .with_catch_up("CN=node-c", 1, 10);

        let status = drain_status(&ident("CN=node-a"), &members, &catalog, &signals);
        assert!(status.is_member);
        assert!(!status.is_draining);
        assert_eq!(status.owned_ranges.len(), 1);
        assert!(status.replicated_ranges.is_empty());
        assert_eq!(status.planned_steps, 1);
        assert!(!status.removable);

        members.begin_drain(&ident("CN=node-a")).unwrap();
        run_drain(&ident("CN=node-a"), &members, &mut catalog, &signals);
        let status = drain_status(&ident("CN=node-a"), &members, &catalog, &signals);
        assert!(status.is_draining);
        assert!(status.owned_ranges.is_empty());
        assert!(status.replicated_ranges.is_empty());
        assert!(status.removable);
    }

    // Unknown identities and members not marked draining are both refused.
    #[test]
    fn removing_a_non_member_or_non_draining_member_is_refused() {
        let mut members = membership(&["CN=node-a"]);
        let catalog = ShardOwnershipCatalog::new();

        let err = commit_drain_removal(&ident("CN=ghost"), &mut members, &catalog).unwrap_err();
        assert!(matches!(err, RemovalRejection::NotAMember { .. }));

        let err = commit_drain_removal(&ident("CN=node-a"), &mut members, &catalog).unwrap_err();
        assert!(matches!(err, RemovalRejection::NotDraining { .. }));
    }
}