use std::collections::BTreeMap;
use std::time::Duration;
use super::identity::NodeIdentity;
use super::membership::MembershipCatalog;
use super::ownership::{CollectionId, RangeId, ShardOwnershipCatalog};
use super::ownership_transition::{
run_transition, CatchUpEvidence, CommitWatermark, TransitionError, TransitionKind,
TransitionOutcome, TransitionRequest,
};
/// Raw health observations for a single cluster member.
///
/// [`HealthPolicy::evaluate`] turns the heartbeat age, replication lag and
/// error count into a score; [`HealthPolicy::failover_eligible`] compares
/// `unhealthy_for` against the policy's grace period.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MemberSignals {
    /// Time elapsed since the member's last heartbeat.
    pub since_last_heartbeat: Duration,
    /// Replication lag (presumably counted in log sequence numbers).
    pub replication_lag_lsn: u64,
    /// Number of recent errors; the counting window is decided by the producer.
    pub recent_errors: u32,
    /// How long the member has been considered unhealthy.
    pub unhealthy_for: Duration,
}

impl MemberSignals {
    /// Returns the best possible signals: every duration and counter is zero.
    pub fn healthy() -> Self {
        let zero = Duration::ZERO;
        Self {
            unhealthy_for: zero,
            recent_errors: 0,
            replication_lag_lsn: 0,
            since_last_heartbeat: zero,
        }
    }
}
/// Coarse health classification carried by a [`HealthScore`].
///
/// Assigned by [`HealthPolicy::evaluate`], which compares the overall score
/// first against `failover_threshold` and then against `degraded_threshold`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HealthClass {
/// Overall score is above both policy thresholds.
Healthy,
/// Overall score is at or below `degraded_threshold` but not `Failed`.
Degraded,
/// Overall score is at or below `failover_threshold`.
Failed,
}
/// Outcome of scoring one member's signals, as built by
/// [`HealthPolicy::evaluate`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct HealthScore {
    /// Weighted combination of the three component scores.
    pub overall: u8,
    /// Component score derived from the heartbeat age.
    pub liveness: u8,
    /// Component score derived from the replication lag.
    pub lag: u8,
    /// Component score derived from the recent error count.
    pub errors: u8,
    /// Classification obtained by thresholding `overall`.
    pub class: HealthClass,
}

impl HealthScore {
    /// Returns `true` when the score is classified as [`HealthClass::Healthy`].
    pub fn is_healthy(&self) -> bool {
        matches!(self.class, HealthClass::Healthy)
    }

    /// Returns `true` when the score is classified as [`HealthClass::Failed`].
    pub fn is_failed(&self) -> bool {
        matches!(self.class, HealthClass::Failed)
    }
}
/// Limits and thresholds used to score members and gate failover.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct HealthPolicy {
    /// Heartbeat age at which the liveness component reaches zero.
    pub heartbeat_timeout: Duration,
    /// Replication lag at which the lag component reaches zero.
    pub max_replication_lag: u64,
    /// Error count at which the errors component reaches zero.
    pub max_recent_errors: u32,
    /// Overall score at or below which a member is classified as failed.
    pub failover_threshold: u8,
    /// Overall score at or below which a non-failed member is degraded.
    pub degraded_threshold: u8,
    /// Minimum `unhealthy_for` before a failed member is failover-eligible.
    pub grace_period: Duration,
}

impl Default for HealthPolicy {
    /// Defaults: 10 s heartbeat timeout, 10 000 max lag, 20 max errors,
    /// failover at a score of 30 or less, degraded at 70 or less, 30 s grace.
    fn default() -> Self {
        let heartbeat_timeout = Duration::from_secs(10);
        let grace_period = Duration::from_secs(30);
        Self {
            failover_threshold: 30,
            degraded_threshold: 70,
            max_replication_lag: 10_000,
            max_recent_errors: 20,
            heartbeat_timeout,
            grace_period,
        }
    }
}
/// Maps `value` onto a 0..=100 score that falls linearly as `value`
/// approaches `limit`.
///
/// * `value <= 0` scores 100 and `value >= limit` scores 0.
/// * A non-positive `limit` degenerates to a step: 100 when `value <= 0`,
///   otherwise 0.
/// * A NaN `value` scores 0, i.e. it is treated as the worst case.
fn ramp_down(value: f64, limit: f64) -> u8 {
    if limit <= 0.0 {
        return if value <= 0.0 { 100 } else { 0 };
    }
    // Cap at `limit` first so a NaN `value` collapses to `limit` (score 0),
    // then floor at zero so a negative `value` cannot push the score past 100.
    let clamped = value.min(limit).max(0.0);
    (100.0 * (1.0 - clamped / limit)).round() as u8
}
impl HealthPolicy {
    /// Scores `signals` against this policy.
    ///
    /// Each signal is ramped down against its limit into a 0..=100 component;
    /// the overall score weights liveness at 0.7, lag at 0.2 and errors at
    /// 0.1. The class is `Failed` at or below `failover_threshold`, `Degraded`
    /// at or below `degraded_threshold`, and `Healthy` otherwise.
    pub fn evaluate(&self, signals: &MemberSignals) -> HealthScore {
        let heartbeat_age = signals.since_last_heartbeat.as_secs_f64();
        let liveness = ramp_down(heartbeat_age, self.heartbeat_timeout.as_secs_f64());
        let lag = ramp_down(
            signals.replication_lag_lsn as f64,
            self.max_replication_lag as f64,
        );
        let errors = ramp_down(
            f64::from(signals.recent_errors),
            f64::from(self.max_recent_errors),
        );

        let weighted = f64::from(liveness) * 0.7 + f64::from(lag) * 0.2 + f64::from(errors) * 0.1;
        let overall = weighted.round() as u8;

        // The failover threshold is checked first, so it wins if both match.
        let class = match overall {
            score if score <= self.failover_threshold => HealthClass::Failed,
            score if score <= self.degraded_threshold => HealthClass::Degraded,
            _ => HealthClass::Healthy,
        };

        HealthScore {
            class,
            overall,
            liveness,
            lag,
            errors,
        }
    }

    /// Returns `true` when `score` is classified as failed and the member has
    /// been unhealthy for at least the grace period.
    pub fn failover_eligible(&self, score: &HealthScore, signals: &MemberSignals) -> bool {
        if !score.is_failed() {
            return false;
        }
        signals.unhealthy_for >= self.grace_period
    }
}
/// Source of the runtime observations the supervisor reads when scoring
/// members and planning failovers.
pub trait ClusterSignals {
/// Returns the current health signals for `member`.
fn member_signals(&self, member: &NodeIdentity) -> MemberSignals;
/// Returns the commit watermark for the given range of `collection`.
/// Failover planning only accepts candidates whose catch-up evidence covers it.
fn commit_watermark(&self, collection: &CollectionId, range_id: RangeId) -> CommitWatermark;
/// Returns catch-up evidence for `candidate` on the given range, or `None`
/// when there is none; failover planning skips candidates that yield `None`.
fn catch_up(
&self,
collection: &CollectionId,
range_id: RangeId,
candidate: &NodeIdentity,
) -> Option<CatchUpEvidence>;
}
/// A failover that `ClusterSupervisor::plan_failovers` found a safe candidate for.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PlannedPromotion {
/// Collection the affected range belongs to.
pub collection: CollectionId,
/// Identifier of the affected range.
pub range_id: RangeId,
/// Current owner whose score made it failover-eligible.
pub failed_owner: NodeIdentity,
/// Replica selected to become the new owner.
pub candidate: NodeIdentity,
/// Health score of the selected candidate at planning time.
pub candidate_score: HealthScore,
/// Health score of the failed owner at planning time.
pub owner_score: HealthScore,
/// Ready-to-run promote request built for this range.
pub request: TransitionRequest,
}
/// Why a failover could not be planned for a range.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BlockedReason {
/// No replica passed every candidate check during planning: a data-holding
/// member, not classified as failed, with catch-up evidence covering the
/// commit watermark.
NoSafeCandidate,
}
/// A range whose owner is failover-eligible but for which no promotion was planned.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BlockedFailover {
/// Collection the affected range belongs to.
pub collection: CollectionId,
/// Identifier of the affected range.
pub range_id: RangeId,
/// Current owner whose score made it failover-eligible.
pub failed_owner: NodeIdentity,
/// Health score of the failed owner at planning time.
pub owner_score: HealthScore,
/// Why no promotion could be planned.
pub reason: BlockedReason,
}
/// Result of a planning pass: the promotions to run and the failovers that
/// could not be planned.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct FailoverPlan {
    /// Failovers for which a safe candidate was found.
    pub promotions: Vec<PlannedPromotion>,
    /// Failovers that were needed but have no safe candidate.
    pub blocked: Vec<BlockedFailover>,
}

impl FailoverPlan {
    /// Returns `true` when the plan holds neither promotions nor blocked entries.
    pub fn is_empty(&self) -> bool {
        if !self.promotions.is_empty() {
            return false;
        }
        self.blocked.is_empty()
    }
}
/// Scores cluster members against a [`HealthPolicy`] and plans or runs owner
/// failovers for shard ranges.
#[derive(Debug, Clone, Default)]
pub struct ClusterSupervisor {
    /// Limits and thresholds applied to every assessment.
    policy: HealthPolicy,
}

impl ClusterSupervisor {
    /// Creates a supervisor that applies `policy`.
    pub fn new(policy: HealthPolicy) -> Self {
        Self { policy }
    }

    /// Returns the policy this supervisor applies.
    pub fn policy(&self) -> &HealthPolicy {
        &self.policy
    }

    /// Scores a single set of member signals with the supervisor's policy.
    pub fn assess(&self, signals: &MemberSignals) -> HealthScore {
        self.policy.evaluate(signals)
    }

    /// Scores every member listed in `membership`, keyed by node identity.
    pub fn assess_members(
        &self,
        membership: &MembershipCatalog,
        signals: &impl ClusterSignals,
    ) -> BTreeMap<NodeIdentity, HealthScore> {
        let mut scores = BTreeMap::new();
        for member in membership.members() {
            let identity = member.identity().clone();
            let observed = signals.member_signals(&identity);
            scores.insert(identity, self.policy.evaluate(&observed));
        }
        scores
    }

    /// Builds a failover plan without modifying `ownership`.
    ///
    /// For each range whose owner is failover-eligible, the replicas are
    /// filtered down to data-holding members that are not failed and whose
    /// catch-up evidence covers the range's commit watermark. The survivor
    /// with the highest overall score wins, with ties going to the smaller
    /// identity. Ranges with no surviving replica are reported as blocked.
    pub fn plan_failovers(
        &self,
        membership: &MembershipCatalog,
        ownership: &ShardOwnershipCatalog,
        signals: &impl ClusterSignals,
    ) -> FailoverPlan {
        let mut plan = FailoverPlan::default();

        for range in ownership.entries() {
            let failed_owner = range.owner().clone();
            let owner_signals = signals.member_signals(&failed_owner);
            let owner_score = self.policy.evaluate(&owner_signals);
            if !self.policy.failover_eligible(&owner_score, &owner_signals) {
                continue;
            }

            let collection = range.collection().clone();
            let range_id = range.range_id();
            let watermark = signals.commit_watermark(&collection, range_id);

            // The pipeline is lazy, so each replica is checked in the same
            // order as a hand-written loop: membership, health, catch-up.
            let winner = range
                .replicas()
                .iter()
                .filter_map(|node| {
                    let holds_data = membership
                        .member(node)
                        .is_some_and(|m| m.kind().holds_data());
                    if !holds_data {
                        return None;
                    }
                    let score = self.policy.evaluate(&signals.member_signals(node));
                    if score.is_failed() {
                        return None;
                    }
                    let evidence = signals.catch_up(&collection, range_id, node)?;
                    if evidence.covers(watermark) {
                        Some((score, evidence, node))
                    } else {
                        None
                    }
                })
                .reduce(|best, challenger| {
                    let best_overall = best.0.overall;
                    let challenger_overall = challenger.0.overall;
                    // A challenger must be strictly better to displace the
                    // current best; equal scores fall back to identity order.
                    let wins = challenger_overall > best_overall
                        || (challenger_overall == best_overall && challenger.2 < best.2);
                    if wins {
                        challenger
                    } else {
                        best
                    }
                })
                .map(|(score, evidence, node)| (score, evidence, node.clone()));

            let Some((candidate_score, evidence, candidate)) = winner else {
                plan.blocked.push(BlockedFailover {
                    collection,
                    range_id,
                    failed_owner,
                    owner_score,
                    reason: BlockedReason::NoSafeCandidate,
                });
                continue;
            };

            let request = TransitionRequest::new(
                TransitionKind::Promote,
                collection.clone(),
                range_id,
                failed_owner.clone(),
                range.epoch(),
                range.version(),
                candidate.clone(),
                watermark,
            )
            .with_evidence(evidence)
            .with_replicas(remaining_replicas(range.replicas(), &candidate));

            plan.promotions.push(PlannedPromotion {
                collection,
                range_id,
                failed_owner,
                candidate,
                candidate_score,
                owner_score,
                request,
            });
        }

        plan
    }

    /// Plans failovers, then runs each planned promotion against `ownership`.
    ///
    /// Returns one transition result per planned promotion, in plan order,
    /// together with the plan itself.
    pub fn run_failovers(
        &self,
        membership: &MembershipCatalog,
        ownership: &mut ShardOwnershipCatalog,
        signals: &impl ClusterSignals,
    ) -> (
        Vec<Result<TransitionOutcome, TransitionError>>,
        FailoverPlan,
    ) {
        let plan = self.plan_failovers(membership, ownership, signals);
        let mut outcomes = Vec::with_capacity(plan.promotions.len());
        for promotion in &plan.promotions {
            outcomes.push(run_transition(ownership, &promotion.request));
        }
        (outcomes, plan)
    }
}
/// Returns a copy of `replicas` with every entry equal to `promoted` removed,
/// keeping the original order.
fn remaining_replicas(replicas: &[NodeIdentity], promoted: &NodeIdentity) -> Vec<NodeIdentity> {
    let mut kept = Vec::new();
    for replica in replicas {
        if replica != promoted {
            kept.push(replica.clone());
        }
    }
    kept
}
// Unit tests for health scoring and failover planning. Cluster observations
// come from the in-memory `FakeSignals` defined below.
#[cfg(test)]
mod tests {
use super::*;
use crate::cluster::membership::{ClusterId, ClusterMember, MemberKind};
use crate::cluster::ownership::{CatalogVersion, OwnershipEpoch};
use crate::cluster::ownership::{
PlacementMetadata, RangeBounds, RangeOwnership, RangeRole, RangeWriteReject, ShardKeyMode,
};
use std::collections::HashMap;
// Builds a node identity from a certificate subject; panics if it is rejected.
fn ident(cn: &str) -> NodeIdentity {
NodeIdentity::from_certificate_subject(cn).unwrap()
}
// Builds a collection id from `name`; panics if it is rejected.
fn collection(name: &str) -> CollectionId {
CollectionId::new(name).unwrap()
}
// Builds a `MemberKind::Data` member for the given subject.
fn data_member(cn: &str) -> ClusterMember {
ClusterMember::joined_empty(ident(cn), MemberKind::Data)
}
// Builds a membership catalog for "cluster-x" in which every listed subject
// is a data member.
fn membership(members: &[&str]) -> MembershipCatalog {
MembershipCatalog::new(
ClusterId::new("cluster-x").unwrap(),
members.iter().map(|m| data_member(m)),
)
}
// Builds an ownership catalog holding one range (id 1) of the "orders"
// collection with the given owner and replicas; returns it with the collection id.
fn catalog_with(owner: &str, replicas: &[&str]) -> (ShardOwnershipCatalog, CollectionId) {
let orders = collection("orders");
let mut catalog = ShardOwnershipCatalog::new();
catalog
.apply_update(RangeOwnership::establish(
orders.clone(),
RangeId::new(1),
ShardKeyMode::Hash,
RangeBounds::full(),
ident(owner),
replicas.iter().map(|r| ident(r)).collect::<Vec<_>>(),
PlacementMetadata::with_replication_factor(3),
))
.unwrap();
(catalog, orders)
}
// In-memory `ClusterSignals`: per-member signals, one watermark shared by
// every range, and per-candidate catch-up evidence.
struct FakeSignals {
members: HashMap<NodeIdentity, MemberSignals>,
watermark: CommitWatermark,
catch_up: HashMap<NodeIdentity, CatchUpEvidence>,
}
impl FakeSignals {
// Starts with no member overrides and no catch-up evidence.
fn new(watermark: CommitWatermark) -> Self {
Self {
members: HashMap::new(),
watermark,
catch_up: HashMap::new(),
}
}
// Overrides the signals reported for `cn`.
fn with_member(mut self, cn: &str, signals: MemberSignals) -> Self {
self.members.insert(ident(cn), signals);
self
}
// Registers catch-up evidence for `cn` at the given applied term and LSN.
fn with_catch_up(mut self, cn: &str, applied_term: u64, applied_lsn: u64) -> Self {
self.catch_up.insert(
ident(cn),
CatchUpEvidence::new(ident(cn), applied_term, applied_lsn),
);
self
}
}
impl ClusterSignals for FakeSignals {
// Members without an override report perfectly healthy signals.
fn member_signals(&self, member: &NodeIdentity) -> MemberSignals {
self.members
.get(member)
.copied()
.unwrap_or_else(MemberSignals::healthy)
}
// The same watermark is returned for every collection and range.
fn commit_watermark(
&self,
_collection: &CollectionId,
_range_id: RangeId,
) -> CommitWatermark {
self.watermark
}
// Evidence is looked up by candidate only; collection and range are ignored.
fn catch_up(
&self,
_collection: &CollectionId,
_range_id: RangeId,
candidate: &NodeIdentity,
) -> Option<CatchUpEvidence> {
self.catch_up.get(candidate).cloned()
}
}
// Signals that exceed every default policy limit and have lasted 60 s,
// longer than the default 30 s grace period.
fn failed_signals() -> MemberSignals {
MemberSignals {
since_last_heartbeat: Duration::from_secs(60),
replication_lag_lsn: 50_000,
recent_errors: 100,
unhealthy_for: Duration::from_secs(60),
}
}
// All-zero signals score 100 and classify as Healthy.
#[test]
fn fresh_member_scores_perfectly_healthy() {
let policy = HealthPolicy::default();
let score = policy.evaluate(&MemberSignals::healthy());
assert_eq!(score.overall, 100);
assert_eq!(score.class, HealthClass::Healthy);
}
// A 2 s heartbeat age against the 10 s timeout gives liveness 80; with
// perfect lag and errors the weighted total is 86.
#[test]
fn score_combines_signals_not_just_a_timeout() {
let policy = HealthPolicy::default();
let signals = MemberSignals {
since_last_heartbeat: Duration::from_secs(2), replication_lag_lsn: 0,
recent_errors: 0,
unhealthy_for: Duration::ZERO,
};
let score = policy.evaluate(&signals);
assert_eq!(score.liveness, 80, "heartbeat at 1/5 of the timeout");
assert_eq!(score.lag, 100);
assert_eq!(score.errors, 100);
assert_eq!(score.overall, 86);
assert_eq!(score.class, HealthClass::Healthy);
}
// Lag and errors at their limits zero both components; liveness alone
// contributes 70, which sits exactly on the degraded threshold.
#[test]
fn lag_and_errors_pull_a_live_member_into_degraded() {
let policy = HealthPolicy::default();
let signals = MemberSignals {
since_last_heartbeat: Duration::ZERO,
replication_lag_lsn: 10_000, recent_errors: 20, unhealthy_for: Duration::ZERO,
};
let score = policy.evaluate(&signals);
assert_eq!(score.overall, 70);
assert_eq!(score.class, HealthClass::Degraded);
}
// Zero liveness with perfect lag and errors totals 30, which sits exactly
// on the failover threshold and therefore classifies as Failed.
#[test]
fn dead_heartbeat_alone_reaches_failed() {
let policy = HealthPolicy::default();
let signals = MemberSignals {
since_last_heartbeat: Duration::from_secs(30),
replication_lag_lsn: 0,
recent_errors: 0,
unhealthy_for: Duration::from_secs(30),
};
let score = policy.evaluate(&signals);
assert_eq!(score.liveness, 0);
assert_eq!(score.overall, 30);
assert_eq!(score.class, HealthClass::Failed);
}
// Every signal beyond its limit yields an overall score of 0.
#[test]
fn totally_unreachable_member_is_failed() {
let policy = HealthPolicy::default();
let signals = MemberSignals {
since_last_heartbeat: Duration::from_secs(30),
replication_lag_lsn: 50_000,
recent_errors: 100,
unhealthy_for: Duration::from_secs(30),
};
let score = policy.evaluate(&signals);
assert_eq!(score.overall, 0);
assert_eq!(score.class, HealthClass::Failed);
}
// With no signal overrides every owner is healthy, so the plan is empty.
#[test]
fn healthy_cluster_is_a_no_op() {
let supervisor = ClusterSupervisor::default();
let members = membership(&["CN=node-a", "CN=node-b", "CN=node-c"]);
let (catalog, _orders) = catalog_with("CN=node-a", &["CN=node-b", "CN=node-c"]);
let signals = FakeSignals::new(CommitWatermark::new(1, 10));
let plan = supervisor.plan_failovers(&members, &catalog, &signals);
assert!(plan.is_empty(), "no failover when every owner is healthy");
}
// A Degraded owner shows up in the score but produces no plan entries.
#[test]
fn degraded_owner_is_detected_but_not_failed_over() {
let supervisor = ClusterSupervisor::default();
let members = membership(&["CN=node-a", "CN=node-b"]);
let (catalog, _orders) = catalog_with("CN=node-a", &["CN=node-b"]);
let signals = FakeSignals::new(CommitWatermark::new(1, 10)).with_member(
"CN=node-a",
MemberSignals {
since_last_heartbeat: Duration::ZERO,
replication_lag_lsn: 10_000,
recent_errors: 20,
unhealthy_for: Duration::ZERO,
},
);
let score = supervisor.assess(&signals.member_signals(&ident("CN=node-a")));
assert_eq!(score.class, HealthClass::Degraded, "detected as degraded");
let plan = supervisor.plan_failovers(&members, &catalog, &signals);
assert!(plan.is_empty(), "a degraded owner is not failed over");
}
// End-to-end failover: node-b wins the tie on identity, becomes owner, and a
// write from the old owner at the initial epoch is rejected.
#[test]
fn safe_candidate_is_promoted_and_old_owner_is_fenced() {
let supervisor = ClusterSupervisor::default();
let members = membership(&["CN=node-a", "CN=node-b", "CN=node-c"]);
let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b", "CN=node-c"]);
let signals = FakeSignals::new(CommitWatermark::new(1, 10))
.with_member("CN=node-a", failed_signals())
.with_catch_up("CN=node-b", 1, 10)
.with_catch_up("CN=node-c", 1, 10);
let (outcomes, plan) = supervisor.run_failovers(&members, &mut catalog, &signals);
assert_eq!(plan.promotions.len(), 1);
assert!(plan.blocked.is_empty());
let promotion = &plan.promotions[0];
assert_eq!(promotion.failed_owner, ident("CN=node-a"));
assert_eq!(
promotion.candidate,
ident("CN=node-b"),
"healthiest, tie -> lowest id"
);
let outcome = outcomes[0].as_ref().expect("promotion should activate");
assert_eq!(outcome.kind, TransitionKind::Promote);
assert!(
outcome.fenced_old_owner(),
"epoch bumped to fence old owner"
);
assert_eq!(outcome.new_owner, ident("CN=node-b"));
let range = catalog.range(&orders, RangeId::new(1)).unwrap();
assert_eq!(range.owner(), &ident("CN=node-b"));
assert_eq!(range.role_of(&ident("CN=node-b")), RangeRole::Owner);
let err = catalog
.admit_public_write(
&ident("CN=node-a"),
&orders,
b"k",
OwnershipEpoch::initial(),
)
.unwrap_err();
assert!(matches!(
err,
RangeWriteReject::NotOwner { .. } | RangeWriteReject::StaleEpoch { .. }
));
}
// The only replica is one LSN behind the watermark, so the failover is
// blocked and the catalog is left untouched.
#[test]
fn unsafe_candidate_behind_watermark_is_rejected() {
let supervisor = ClusterSupervisor::default();
let members = membership(&["CN=node-a", "CN=node-b"]);
let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
let signals = FakeSignals::new(CommitWatermark::new(2, 50))
.with_member("CN=node-a", failed_signals())
.with_catch_up("CN=node-b", 2, 49);
let (outcomes, plan) = supervisor.run_failovers(&members, &mut catalog, &signals);
assert!(plan.promotions.is_empty(), "no safe promotion");
assert!(outcomes.is_empty());
assert_eq!(plan.blocked.len(), 1);
assert_eq!(plan.blocked[0].reason, BlockedReason::NoSafeCandidate);
assert_eq!(plan.blocked[0].failed_owner, ident("CN=node-a"));
let range = catalog.range(&orders, RangeId::new(1)).unwrap();
assert_eq!(range.owner(), &ident("CN=node-a"));
assert_eq!(range.epoch(), OwnershipEpoch::initial());
assert_eq!(range.version(), CatalogVersion::initial());
}
// The owner scores as Failed but has been unhealthy for only 2 s, well
// inside the 30 s grace period, so nothing is planned.
#[test]
fn flapping_owner_within_grace_period_is_not_failed_over() {
let supervisor = ClusterSupervisor::default();
let members = membership(&["CN=node-a", "CN=node-b"]);
let (catalog, _orders) = catalog_with("CN=node-a", &["CN=node-b"]);
let signals = FakeSignals::new(CommitWatermark::new(1, 10))
.with_member(
"CN=node-a",
MemberSignals {
since_last_heartbeat: Duration::from_secs(30),
replication_lag_lsn: 50_000,
recent_errors: 100,
unhealthy_for: Duration::from_secs(2), },
)
.with_catch_up("CN=node-b", 1, 10);
let score = supervisor.assess(&signals.member_signals(&ident("CN=node-a")));
assert_eq!(score.class, HealthClass::Failed);
let plan = supervisor.plan_failovers(&members, &catalog, &signals);
assert!(plan.is_empty(), "flap inside grace period is damped");
}
// A replica with no catch-up evidence at all cannot be promoted.
#[test]
fn unknown_candidate_progress_blocks_failover() {
let supervisor = ClusterSupervisor::default();
let members = membership(&["CN=node-a", "CN=node-b"]);
let (catalog, _orders) = catalog_with("CN=node-a", &["CN=node-b"]);
let signals = FakeSignals::new(CommitWatermark::new(1, 10))
.with_member("CN=node-a", failed_signals());
let plan = supervisor.plan_failovers(&members, &catalog, &signals);
assert_eq!(plan.blocked.len(), 1);
assert_eq!(plan.blocked[0].reason, BlockedReason::NoSafeCandidate);
}
// node-c is a cluster member with ample catch-up evidence but is not a
// replica of the range, so it is never considered.
#[test]
fn non_replica_node_is_never_a_candidate() {
let supervisor = ClusterSupervisor::default();
let members = membership(&["CN=node-a", "CN=node-c"]);
let (catalog, _orders) = catalog_with("CN=node-a", &[]);
let signals = FakeSignals::new(CommitWatermark::new(1, 10))
.with_member("CN=node-a", failed_signals())
.with_catch_up("CN=node-c", 9, 999);
let plan = supervisor.plan_failovers(&members, &catalog, &signals);
assert_eq!(plan.blocked.len(), 1, "no replica -> no safe candidate");
assert!(plan.promotions.is_empty());
}
// A caught-up replica that itself scores as Failed is skipped.
#[test]
fn failed_replica_is_not_promoted() {
let supervisor = ClusterSupervisor::default();
let members = membership(&["CN=node-a", "CN=node-b"]);
let (catalog, _orders) = catalog_with("CN=node-a", &["CN=node-b"]);
let signals = FakeSignals::new(CommitWatermark::new(1, 10))
.with_member("CN=node-a", failed_signals())
.with_member("CN=node-b", failed_signals())
.with_catch_up("CN=node-b", 1, 10);
let plan = supervisor.plan_failovers(&members, &catalog, &signals);
assert_eq!(plan.blocked.len(), 1);
assert_eq!(plan.blocked[0].reason, BlockedReason::NoSafeCandidate);
}
// node-b has a 4 s heartbeat age and so a lower score than node-c; the
// higher score beats the identity tie-break.
#[test]
fn healthiest_caught_up_candidate_is_preferred() {
let supervisor = ClusterSupervisor::default();
let members = membership(&["CN=node-a", "CN=node-b", "CN=node-c"]);
let (catalog, _orders) = catalog_with("CN=node-a", &["CN=node-b", "CN=node-c"]);
let signals = FakeSignals::new(CommitWatermark::new(1, 10))
.with_member("CN=node-a", failed_signals())
.with_member(
"CN=node-b",
MemberSignals {
since_last_heartbeat: Duration::from_secs(4),
replication_lag_lsn: 0,
recent_errors: 0,
unhealthy_for: Duration::ZERO,
},
) .with_member("CN=node-c", MemberSignals::healthy())
.with_catch_up("CN=node-b", 1, 10)
.with_catch_up("CN=node-c", 1, 10);
let plan = supervisor.plan_failovers(&members, &catalog, &signals);
assert_eq!(plan.promotions.len(), 1);
assert_eq!(
plan.promotions[0].candidate,
ident("CN=node-c"),
"healthier candidate preferred over identity tie-break",
);
}
// After promotion the new owner is no longer listed as a replica, the other
// replica stays, and the failed owner is not added to the replica set.
#[test]
fn promoted_owner_drops_itself_from_the_replica_set() {
let supervisor = ClusterSupervisor::default();
let members = membership(&["CN=node-a", "CN=node-b", "CN=node-c"]);
let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b", "CN=node-c"]);
let signals = FakeSignals::new(CommitWatermark::new(1, 10))
.with_member("CN=node-a", failed_signals())
.with_catch_up("CN=node-b", 1, 10)
.with_catch_up("CN=node-c", 1, 10);
supervisor.run_failovers(&members, &mut catalog, &signals);
let range = catalog.range(&orders, RangeId::new(1)).unwrap();
assert_eq!(range.owner(), &ident("CN=node-b"));
assert!(!range.replicas().contains(&ident("CN=node-b")));
assert!(range.replicas().contains(&ident("CN=node-c")));
assert!(!range.replicas().contains(&ident("CN=node-a")));
}
// `assess_members` returns one score per catalog member.
#[test]
fn assess_members_scores_every_authorized_member() {
let supervisor = ClusterSupervisor::default();
let members = membership(&["CN=node-a", "CN=node-b"]);
let signals = FakeSignals::new(CommitWatermark::new(1, 10))
.with_member("CN=node-a", failed_signals());
let scores = supervisor.assess_members(&members, &signals);
assert_eq!(scores.len(), 2);
assert_eq!(scores[&ident("CN=node-a")].class, HealthClass::Failed);
assert_eq!(scores[&ident("CN=node-b")].class, HealthClass::Healthy);
}
}