reddb_server/cluster/supervisor.rs
1//! Member health scoring and automatic range failover (issue #998, PRD #987,
2//! ADR 0037).
3//!
4//! The **Cluster Supervisor** is the control-plane component that watches the
5//! authorized members ([`MembershipCatalog`]), decides when an owner has failed,
6//! and drives a *safe, fenced* range failover through the one sanctioned path —
7//! the ownership transition state machine ([`super::ownership_transition`]). It
8//! never edits ownership directly: every promotion it proposes is a
9//! [`TransitionRequest`] that the transition machine re-validates (three-part
10//! CAS + commit-watermark safety gate) before it touches the catalog.
11//!
12//! ## Health scoring, not a single short timeout
13//!
14//! A naive supervisor declares a member dead the instant one heartbeat is late.
15//! That is brittle: a single dropped packet, a GC pause, or a brief network
16//! hiccup triggers a needless, disruptive failover (and, under load, a *storm*
//! of them). Instead the supervisor weighs four signals: the first three are
//! combined into a [`HealthScore`], and the fourth gates the failover action:
19//!
20//! * **Liveness** — time since the last heartbeat. The dominant signal, but not
21//! the only one.
22//! * **Replication lag** — how far behind the range commit watermark the member
23//! is. A live-but-far-behind owner is a poor owner.
24//! * **Recent errors** — observed failures in the recent window.
25//! * **Grace period** — how long the member has been continuously below the
26//! failover threshold. This is the flapping damper: a member that dips and
27//! recovers inside the grace window is never failed over.
28//!
29//! The first three combine into a 0..=100 score (weighted, liveness-heavy);
30//! the score classifies the member as [`Healthy`](HealthClass::Healthy),
31//! [`Degraded`](HealthClass::Degraded), or [`Failed`](HealthClass::Failed). The
32//! grace period then gates the *action*: only a `Failed` owner that has stayed
33//! failed for at least the grace period is eligible for automatic failover.
34//! Together the score and the grace period damp false positives and flapping
35//! (acceptance criteria 1 and 4).
36//!
37//! ## Safe candidate selection
38//!
//! When an owner is eligible for failover, the supervisor considers **only**
//! candidates that are (a) current replicas of the range, (b) still authorized
//! data members, (c) not themselves scored [`Failed`](HealthClass::Failed)
//! (promoting a failed replica would only move the outage), and (d) backed by
//! catch-up evidence that covers the range commit watermark — exactly the bar
//! the transition machine enforces. An
43//! arbitrary node, a witness, or a replica that has not caught up is never a
44//! promotion target (acceptance criterion 2). Among the safe candidates the
45//! supervisor prefers the healthiest, breaking ties by stable identity order so
46//! the plan is deterministic.
47//!
48//! The selected promotion is a [`TransitionKind::Promote`] request; activating
49//! it bumps the ownership epoch, which fences the failed owner — any write it
50//! still attempts under the old epoch is rejected by
51//! [`admit_public_write`](super::ownership::ShardOwnershipCatalog::admit_public_write)
52//! (acceptance criterion 3).
53//!
54//! ## Purity
55//!
56//! All state the supervisor needs from the running cluster — heartbeat,
57//! lag, error counts, grace tracking, per-range commit watermarks, and
58//! per-candidate catch-up progress — is read through the [`ClusterSignals`]
59//! trait, injected by the caller. The supervisor itself is a pure policy over
60//! the membership and ownership catalogs plus those signals, so the whole
61//! scoring/selection/fencing story is exercised deterministically with a
62//! scripted fake — no clock, no network, no engine.
63
64use std::collections::BTreeMap;
65use std::time::Duration;
66
67use super::identity::NodeIdentity;
68use super::membership::MembershipCatalog;
69use super::ownership::{CollectionId, RangeId, ShardOwnershipCatalog};
70use super::ownership_transition::{
71 run_transition, CatchUpEvidence, CommitWatermark, TransitionError, TransitionKind,
72 TransitionOutcome, TransitionRequest,
73};
74
75/// Raw, point-in-time health signals for one member, read from the running
76/// cluster through [`ClusterSignals::member_signals`]. The supervisor turns
77/// these into a [`HealthScore`]; it owns no clock or counters itself.
78#[derive(Debug, Clone, Copy, PartialEq, Eq)]
79pub struct MemberSignals {
80 /// Time since the member's last heartbeat was received (liveness).
81 pub since_last_heartbeat: Duration,
82 /// How many WAL LSNs the member trails the range commit watermark by, as an
83 /// aggregate liveness-of-replication signal. Zero means fully caught up.
84 pub replication_lag_lsn: u64,
85 /// Observed errors from the member in the recent observation window.
86 pub recent_errors: u32,
87 /// How long the member has been *continuously* below the failover
88 /// threshold. Zero for a healthy member; the caller resets it the moment the
89 /// member recovers. This is the grace-period input that damps flapping —
90 /// the supervisor refuses to fail over until it reaches the policy's grace
91 /// period.
92 pub unhealthy_for: Duration,
93}
94
95impl MemberSignals {
96 /// A perfectly healthy member: fresh heartbeat, no lag, no errors, never
97 /// unhealthy. Handy as a test/observation baseline.
98 pub fn healthy() -> Self {
99 Self {
100 since_last_heartbeat: Duration::ZERO,
101 replication_lag_lsn: 0,
102 recent_errors: 0,
103 unhealthy_for: Duration::ZERO,
104 }
105 }
106}
107
108/// How a member's [`HealthScore`] classifies against the policy thresholds.
109#[derive(Debug, Clone, Copy, PartialEq, Eq)]
110pub enum HealthClass {
111 /// Above the degraded threshold — a fully serving member.
112 Healthy,
113 /// Below the degraded threshold but above the failover threshold — observed
114 /// as impaired, but **not** failed over. Surfacing this is what lets an
115 /// operator see trouble building before it becomes an outage.
116 Degraded,
117 /// At or below the failover threshold — a failover candidate *once the grace
118 /// period has elapsed*.
119 Failed,
120}
121
122/// A member's combined health, with the per-axis sub-scores kept visible so an
123/// operator (or a test) can see *why* a member scored as it did rather than
124/// just the verdict.
125#[derive(Debug, Clone, Copy, PartialEq, Eq)]
126pub struct HealthScore {
127 /// Combined 0..=100 score (higher is healthier).
128 pub overall: u8,
129 /// Liveness sub-score from the heartbeat age (0..=100).
130 pub liveness: u8,
131 /// Replication-lag sub-score (0..=100).
132 pub lag: u8,
133 /// Recent-error sub-score (0..=100).
134 pub errors: u8,
135 /// The classification the combined score falls into.
136 pub class: HealthClass,
137}
138
139impl HealthScore {
140 pub fn is_healthy(&self) -> bool {
141 self.class == HealthClass::Healthy
142 }
143
144 pub fn is_failed(&self) -> bool {
145 self.class == HealthClass::Failed
146 }
147}
148
149/// The tunables that turn raw signals into a [`HealthScore`] and gate failover.
150///
151/// The defaults ([`HealthPolicy::default`]) are deliberately conservative:
152/// generous enough that an ordinary hiccup does not trip a failover, with a
153/// grace period long enough to ride out a transient blip.
154#[derive(Debug, Clone, Copy, PartialEq, Eq)]
155pub struct HealthPolicy {
156 /// Heartbeat age at or beyond which the liveness sub-score bottoms out at 0.
157 pub heartbeat_timeout: Duration,
158 /// Replication lag (LSNs) at or beyond which the lag sub-score bottoms out.
159 pub max_replication_lag: u64,
160 /// Recent-error count at or beyond which the error sub-score bottoms out.
161 pub max_recent_errors: u32,
162 /// Score (inclusive) at or below which a member is [`Failed`](HealthClass::Failed).
163 pub failover_threshold: u8,
164 /// Score (inclusive) at or below which a member is at least
165 /// [`Degraded`](HealthClass::Degraded). Must be `>= failover_threshold`.
166 pub degraded_threshold: u8,
167 /// How long a member must stay continuously `Failed` before the supervisor
168 /// will fail it over. The flapping damper: a shorter blip never triggers a
169 /// transition.
170 pub grace_period: Duration,
171}
172
173impl Default for HealthPolicy {
174 fn default() -> Self {
175 Self {
176 heartbeat_timeout: Duration::from_secs(10),
177 max_replication_lag: 10_000,
178 max_recent_errors: 20,
179 failover_threshold: 30,
180 degraded_threshold: 70,
181 grace_period: Duration::from_secs(30),
182 }
183 }
184}
185
186/// Linear sub-score in 0..=100: full marks at `0`, zero at/after `limit`.
187fn ramp_down(value: f64, limit: f64) -> u8 {
188 if limit <= 0.0 {
189 // No tolerance configured: anything non-zero is a total failure on this
190 // axis, zero is perfect.
191 return if value <= 0.0 { 100 } else { 0 };
192 }
193 let clamped = value.min(limit);
194 (100.0 * (1.0 - clamped / limit)).round() as u8
195}
196
197impl HealthPolicy {
198 /// Combine raw `signals` into a [`HealthScore`] under this policy.
199 ///
200 /// The three serving signals fold into the overall score with a
201 /// liveness-heavy weighting (liveness 70%, lag 20%, errors 10%): a member
202 /// whose heartbeat has fully lapsed must be able to reach the failover
203 /// threshold on liveness alone — a crashed node stops heartbeating but its
204 /// *last-known* lag and error counts may still look fine, so trusting them
205 /// would wedge failover shut. At the same time a live owner that is far
206 /// behind or erroring is penalised rather than trusted blindly, and a
207 /// *brief* heartbeat gap with good lag/errors stays out of the failover band
208 /// (which a single short fixed timeout could not express). The grace-period
209 /// signal (`unhealthy_for`) is *not* part of the score — it gates the
210 /// failover action in [`failover_eligible`](Self::failover_eligible).
211 pub fn evaluate(&self, signals: &MemberSignals) -> HealthScore {
212 let liveness = ramp_down(
213 signals.since_last_heartbeat.as_secs_f64(),
214 self.heartbeat_timeout.as_secs_f64(),
215 );
216 let lag = ramp_down(
217 signals.replication_lag_lsn as f64,
218 self.max_replication_lag as f64,
219 );
220 let errors = ramp_down(signals.recent_errors as f64, self.max_recent_errors as f64);
221
222 let overall =
223 (liveness as f64 * 0.7 + lag as f64 * 0.2 + errors as f64 * 0.1).round() as u8;
224 let class = if overall <= self.failover_threshold {
225 HealthClass::Failed
226 } else if overall <= self.degraded_threshold {
227 HealthClass::Degraded
228 } else {
229 HealthClass::Healthy
230 };
231
232 HealthScore {
233 overall,
234 liveness,
235 lag,
236 errors,
237 class,
238 }
239 }
240
241 /// Whether a member with this `score` and these `signals` is eligible for
242 /// automatic failover: it must be [`Failed`](HealthClass::Failed) **and**
243 /// have stayed unhealthy for at least the grace period. A `Failed` member
244 /// still inside the grace window is held back — the flapping damper.
245 pub fn failover_eligible(&self, score: &HealthScore, signals: &MemberSignals) -> bool {
246 score.is_failed() && signals.unhealthy_for >= self.grace_period
247 }
248}
249
250/// The cluster state the supervisor reads but does not own: per-member health
251/// signals, per-range commit watermarks, and per-candidate catch-up evidence.
252///
253/// Production backs this onto the heartbeat tracker, the replica registry, and
254/// the per-range stream progress (issue #992); tests back it onto a scripted
255/// fake. Keeping it behind a trait is what makes the supervisor a pure policy.
256pub trait ClusterSignals {
257 /// Current raw health signals for `member`.
258 fn member_signals(&self, member: &NodeIdentity) -> MemberSignals;
259
260 /// The range commit watermark a promotion candidate must cover for
261 /// `(collection, range_id)` — the highest `(term, lsn)` known durable under
262 /// the range's commit policy.
263 fn commit_watermark(&self, collection: &CollectionId, range_id: RangeId) -> CommitWatermark;
264
265 /// The catch-up evidence the supervisor has for `candidate` on the range, or
266 /// `None` if the candidate's progress is unknown (in which case it cannot be
267 /// promoted — fail closed).
268 fn catch_up(
269 &self,
270 collection: &CollectionId,
271 range_id: RangeId,
272 candidate: &NodeIdentity,
273 ) -> Option<CatchUpEvidence>;
274}
275
276/// A safe, validated promotion the supervisor proposes for one range: the failed
277/// owner, the chosen caught-up candidate, the owner's health at decision time,
278/// and the [`TransitionRequest`] (already carrying the three-part CAS,
279/// watermark, and catch-up evidence) to run through the transition machine.
280#[derive(Debug, Clone, PartialEq, Eq)]
281pub struct PlannedPromotion {
282 pub collection: CollectionId,
283 pub range_id: RangeId,
284 pub failed_owner: NodeIdentity,
285 pub candidate: NodeIdentity,
286 pub candidate_score: HealthScore,
287 pub owner_score: HealthScore,
288 pub request: TransitionRequest,
289}
290
291/// Why a failing owner's range could **not** be failed over. Surfaced rather
292/// than silently skipped, so an operator can see a range that needs attention.
293#[derive(Debug, Clone, PartialEq, Eq)]
294pub enum BlockedReason {
295 /// The owner is failing but no replica is a safe candidate — none is an
296 /// authorized data member with catch-up evidence covering the commit
297 /// watermark. Failing over here could lose committed writes.
298 NoSafeCandidate,
299}
300
301/// A failing owner's range with no safe failover target.
302#[derive(Debug, Clone, PartialEq, Eq)]
303pub struct BlockedFailover {
304 pub collection: CollectionId,
305 pub range_id: RangeId,
306 pub failed_owner: NodeIdentity,
307 pub owner_score: HealthScore,
308 pub reason: BlockedReason,
309}
310
311/// The supervisor's decision for one scan: the safe promotions to run and the
312/// failing ranges with no safe target. A cluster with all owners healthy yields
313/// an empty plan ([`is_empty`](Self::is_empty)) — the healthy no-op.
314#[derive(Debug, Clone, Default, PartialEq, Eq)]
315pub struct FailoverPlan {
316 /// Safe, ready-to-run promotions, in `(collection, range_id)` order.
317 pub promotions: Vec<PlannedPromotion>,
318 /// Failing ranges that have no safe candidate.
319 pub blocked: Vec<BlockedFailover>,
320}
321
322impl FailoverPlan {
323 /// Nothing to do — every owner is healthy (or degraded-but-not-failed, or
324 /// within its grace period). The healthy no-op the supervisor must produce
325 /// for a stable cluster.
326 pub fn is_empty(&self) -> bool {
327 self.promotions.is_empty() && self.blocked.is_empty()
328 }
329}
330
331/// The cluster supervisor: health scoring + automatic range failover planning.
332///
333/// Holds only the [`HealthPolicy`]; all live state is read through
334/// [`ClusterSignals`] at scan time, so one supervisor instance serves the whole
335/// cluster lifetime.
336#[derive(Debug, Clone, Default)]
337pub struct ClusterSupervisor {
338 policy: HealthPolicy,
339}
340
341impl ClusterSupervisor {
342 /// A supervisor with the given health policy.
343 pub fn new(policy: HealthPolicy) -> Self {
344 Self { policy }
345 }
346
347 pub fn policy(&self) -> &HealthPolicy {
348 &self.policy
349 }
350
351 /// Score a single member's health under the policy. The building block of
352 /// degraded-member detection: an operator surface calls this for every
353 /// authorized member to render a health view.
354 pub fn assess(&self, signals: &MemberSignals) -> HealthScore {
355 self.policy.evaluate(signals)
356 }
357
358 /// Score every authorized member of `membership`, in stable identity order.
359 /// Includes healthy, degraded, and failed members alike — the input to a
360 /// cluster health dashboard.
361 pub fn assess_members(
362 &self,
363 membership: &MembershipCatalog,
364 signals: &impl ClusterSignals,
365 ) -> BTreeMap<NodeIdentity, HealthScore> {
366 membership
367 .members()
368 .map(|m| {
369 let id = m.identity().clone();
370 let score = self.policy.evaluate(&signals.member_signals(&id));
371 (id, score)
372 })
373 .collect()
374 }
375
376 /// Plan automatic failovers across the whole ownership catalog **without**
377 /// mutating it. For each range whose owner is failover-eligible (Failed and
378 /// past the grace period), pick the safest caught-up replica candidate and
379 /// produce a [`PlannedPromotion`]; if no replica is safe, record a
380 /// [`BlockedFailover`]. Owners that are healthy, merely degraded, or still
381 /// inside their grace period produce nothing.
382 pub fn plan_failovers(
383 &self,
384 membership: &MembershipCatalog,
385 ownership: &ShardOwnershipCatalog,
386 signals: &impl ClusterSignals,
387 ) -> FailoverPlan {
388 // entries() yields ranges in (collection, range_id) order, so the plan
389 // is deterministic.
390 let mut plan = FailoverPlan::default();
391
392 for range in ownership.entries() {
393 let owner = range.owner().clone();
394 let owner_signals = signals.member_signals(&owner);
395 let owner_score = self.policy.evaluate(&owner_signals);
396
397 // Healthy or degraded-but-not-failed owners are left alone; a failed
398 // owner still inside its grace period is held back (flapping damper).
399 if !self.policy.failover_eligible(&owner_score, &owner_signals) {
400 continue;
401 }
402
403 let collection = range.collection().clone();
404 let range_id = range.range_id();
405 let watermark = signals.commit_watermark(&collection, range_id);
406
407 // Consider only safe candidates: a current replica, still an
408 // authorized data member, not itself failed, with catch-up evidence
409 // covering the commit watermark.
410 let mut best: Option<(HealthScore, CatchUpEvidence, NodeIdentity)> = None;
411 for candidate in range.replicas() {
412 if !membership
413 .member(candidate)
414 .is_some_and(|m| m.kind().holds_data())
415 {
416 continue;
417 }
418 let cand_score = self.policy.evaluate(&signals.member_signals(candidate));
419 if cand_score.is_failed() {
420 // Promoting a failed replica just moves the outage; skip it.
421 continue;
422 }
423 let Some(evidence) = signals.catch_up(&collection, range_id, candidate) else {
424 continue;
425 };
426 if !evidence.covers(watermark) {
427 // Replica is a copy of the range but has not caught up to the
428 // commit watermark — promoting it could lose committed
429 // writes. This is the unsafe-candidate rejection.
430 continue;
431 }
432
433 // Prefer the healthiest candidate; break ties by stable identity
434 // order for determinism.
435 let better = match &best {
436 None => true,
437 Some((best_score, _, best_id)) => {
438 cand_score.overall > best_score.overall
439 || (cand_score.overall == best_score.overall && candidate < best_id)
440 }
441 };
442 if better {
443 best = Some((cand_score, evidence, candidate.clone()));
444 }
445 }
446
447 match best {
448 Some((candidate_score, evidence, candidate)) => {
449 let request = TransitionRequest::new(
450 TransitionKind::Promote,
451 collection.clone(),
452 range_id,
453 owner.clone(),
454 range.epoch(),
455 range.version(),
456 candidate.clone(),
457 watermark,
458 )
459 .with_evidence(evidence)
460 .with_replicas(remaining_replicas(range.replicas(), &candidate));
461 plan.promotions.push(PlannedPromotion {
462 collection,
463 range_id,
464 failed_owner: owner,
465 candidate,
466 candidate_score,
467 owner_score,
468 request,
469 });
470 }
471 None => plan.blocked.push(BlockedFailover {
472 collection,
473 range_id,
474 failed_owner: owner,
475 owner_score,
476 reason: BlockedReason::NoSafeCandidate,
477 }),
478 }
479 }
480
481 plan
482 }
483
484 /// Plan failovers and immediately run the safe promotions through the
485 /// ownership transition machine, fencing each failed owner via the epoch
486 /// bump. Returns the activated [`TransitionOutcome`]s and the surviving
487 /// [`FailoverPlan`] (whose `blocked` entries still need attention; its
488 /// `promotions` are the requests that were run).
489 ///
490 /// Each promotion is an independent catalog entry, so running them in
491 /// sequence never invalidates another's CAS. A promotion whose CAS lost a
492 /// race (the catalog moved between planning and activation) surfaces as a
493 /// [`TransitionError`] in the returned vector rather than aborting the rest.
494 pub fn run_failovers(
495 &self,
496 membership: &MembershipCatalog,
497 ownership: &mut ShardOwnershipCatalog,
498 signals: &impl ClusterSignals,
499 ) -> (
500 Vec<Result<TransitionOutcome, TransitionError>>,
501 FailoverPlan,
502 ) {
503 let plan = self.plan_failovers(membership, ownership, signals);
504 let outcomes = plan
505 .promotions
506 .iter()
507 .map(|p| run_transition(ownership, &p.request))
508 .collect();
509 (outcomes, plan)
510 }
511}
512
513/// The replica set the new owner carries after promotion: the old replica set
514/// minus the promoted candidate (it becomes owner, not its own replica). The
515/// failed owner is intentionally *not* added back as a replica — it is fenced
516/// and presumed down; the rebalancer re-replicates once it returns or is
517/// replaced.
518fn remaining_replicas(replicas: &[NodeIdentity], promoted: &NodeIdentity) -> Vec<NodeIdentity> {
519 replicas
520 .iter()
521 .filter(|r| *r != promoted)
522 .cloned()
523 .collect()
524}
525
526#[cfg(test)]
527mod tests {
528 use super::*;
529 use crate::cluster::membership::{ClusterId, ClusterMember, MemberKind};
530 use crate::cluster::ownership::{CatalogVersion, OwnershipEpoch};
531 use crate::cluster::ownership::{
532 PlacementMetadata, RangeBounds, RangeOwnership, RangeRole, RangeWriteReject, ShardKeyMode,
533 };
534 use std::collections::HashMap;
535
536 fn ident(cn: &str) -> NodeIdentity {
537 NodeIdentity::from_certificate_subject(cn).unwrap()
538 }
539
540 fn collection(name: &str) -> CollectionId {
541 CollectionId::new(name).unwrap()
542 }
543
544 fn data_member(cn: &str) -> ClusterMember {
545 ClusterMember::joined_empty(ident(cn), MemberKind::Data)
546 }
547
548 fn membership(members: &[&str]) -> MembershipCatalog {
549 MembershipCatalog::new(
550 ClusterId::new("cluster-x").unwrap(),
551 members.iter().map(|m| data_member(m)),
552 )
553 }
554
555 /// A catalog with one full-keyspace range `orders/1` owned by `owner` with
556 /// `replicas`, at the initial epoch/version.
557 fn catalog_with(owner: &str, replicas: &[&str]) -> (ShardOwnershipCatalog, CollectionId) {
558 let orders = collection("orders");
559 let mut catalog = ShardOwnershipCatalog::new();
560 catalog
561 .apply_update(RangeOwnership::establish(
562 orders.clone(),
563 RangeId::new(1),
564 ShardKeyMode::Hash,
565 RangeBounds::full(),
566 ident(owner),
567 replicas.iter().map(|r| ident(r)).collect::<Vec<_>>(),
568 PlacementMetadata::with_replication_factor(3),
569 ))
570 .unwrap();
571 (catalog, orders)
572 }
573
574 /// A scripted [`ClusterSignals`]: per-member signals, one shared watermark,
575 /// and per-(range,candidate) catch-up evidence keyed by candidate CN.
576 struct FakeSignals {
577 members: HashMap<NodeIdentity, MemberSignals>,
578 watermark: CommitWatermark,
579 catch_up: HashMap<NodeIdentity, CatchUpEvidence>,
580 }
581
582 impl FakeSignals {
583 fn new(watermark: CommitWatermark) -> Self {
584 Self {
585 members: HashMap::new(),
586 watermark,
587 catch_up: HashMap::new(),
588 }
589 }
590
591 fn with_member(mut self, cn: &str, signals: MemberSignals) -> Self {
592 self.members.insert(ident(cn), signals);
593 self
594 }
595
596 fn with_catch_up(mut self, cn: &str, applied_term: u64, applied_lsn: u64) -> Self {
597 self.catch_up.insert(
598 ident(cn),
599 CatchUpEvidence::new(ident(cn), applied_term, applied_lsn),
600 );
601 self
602 }
603 }
604
605 impl ClusterSignals for FakeSignals {
606 fn member_signals(&self, member: &NodeIdentity) -> MemberSignals {
607 self.members
608 .get(member)
609 .copied()
610 .unwrap_or_else(MemberSignals::healthy)
611 }
612
613 fn commit_watermark(
614 &self,
615 _collection: &CollectionId,
616 _range_id: RangeId,
617 ) -> CommitWatermark {
618 self.watermark
619 }
620
621 fn catch_up(
622 &self,
623 _collection: &CollectionId,
624 _range_id: RangeId,
625 candidate: &NodeIdentity,
626 ) -> Option<CatchUpEvidence> {
627 self.catch_up.get(candidate).cloned()
628 }
629 }
630
631 /// Signals for a failed-and-past-grace owner: no heartbeat for a long time,
632 /// well over the default grace period.
633 fn failed_signals() -> MemberSignals {
634 MemberSignals {
635 since_last_heartbeat: Duration::from_secs(60),
636 replication_lag_lsn: 50_000,
637 recent_errors: 100,
638 unhealthy_for: Duration::from_secs(60),
639 }
640 }
641
642 // --- health scoring ---------------------------------------------------
643
644 #[test]
645 fn fresh_member_scores_perfectly_healthy() {
646 let policy = HealthPolicy::default();
647 let score = policy.evaluate(&MemberSignals::healthy());
648 assert_eq!(score.overall, 100);
649 assert_eq!(score.class, HealthClass::Healthy);
650 }
651
652 #[test]
653 fn score_combines_signals_not_just_a_timeout() {
654 // A member with a *brief* heartbeat gap but good lag and no errors should
655 // not be treated as dead the way a short fixed timeout would. Its liveness
656 // sub-score dips, but lag/errors keep the overall in the Healthy band.
657 let policy = HealthPolicy::default();
658 let signals = MemberSignals {
659 since_last_heartbeat: Duration::from_secs(2), // 1/5 of the 10s timeout
660 replication_lag_lsn: 0,
661 recent_errors: 0,
662 unhealthy_for: Duration::ZERO,
663 };
664 let score = policy.evaluate(&signals);
665 assert_eq!(score.liveness, 80, "heartbeat at 1/5 of the timeout");
666 assert_eq!(score.lag, 100);
667 assert_eq!(score.errors, 100);
668 // overall = 0.7*80 + 0.2*100 + 0.1*100 = 56 + 20 + 10 = 86 -> Healthy.
669 // A 2s fixed timeout would have declared this member dead.
670 assert_eq!(score.overall, 86);
671 assert_eq!(score.class, HealthClass::Healthy);
672 }
673
674 #[test]
675 fn lag_and_errors_pull_a_live_member_into_degraded() {
676 // A member that is heartbeating fine but far behind and erroring is
677 // penalised — a single timeout would have called it perfectly healthy.
678 let policy = HealthPolicy::default();
679 let signals = MemberSignals {
680 since_last_heartbeat: Duration::ZERO,
681 replication_lag_lsn: 10_000, // at the cap -> lag sub-score 0
682 recent_errors: 20, // at the cap -> error sub-score 0
683 unhealthy_for: Duration::ZERO,
684 };
685 let score = policy.evaluate(&signals);
686 // overall = 0.7*100 + 0.2*0 + 0.1*0 = 70 -> Degraded (<= degraded threshold).
687 assert_eq!(score.overall, 70);
688 assert_eq!(score.class, HealthClass::Degraded);
689 }
690
691 #[test]
692 fn dead_heartbeat_alone_reaches_failed() {
693 // A crashed node stops heartbeating; even if its last-known lag/errors
694 // look perfect, liveness alone must carry it to the failover band — else
695 // the most common failure (a clean crash) would never fail over.
696 let policy = HealthPolicy::default();
697 let signals = MemberSignals {
698 since_last_heartbeat: Duration::from_secs(30),
699 replication_lag_lsn: 0,
700 recent_errors: 0,
701 unhealthy_for: Duration::from_secs(30),
702 };
703 let score = policy.evaluate(&signals);
704 assert_eq!(score.liveness, 0);
705 // overall = 0.7*0 + 0.2*100 + 0.1*100 = 30 -> Failed (<= failover threshold).
706 assert_eq!(score.overall, 30);
707 assert_eq!(score.class, HealthClass::Failed);
708 }
709
710 #[test]
711 fn totally_unreachable_member_is_failed() {
712 // A member we cannot reach reports a dead heartbeat *and* growing lag and
713 // errors — every axis bottoms out, so it lands well under the failover
714 // threshold.
715 let policy = HealthPolicy::default();
716 let signals = MemberSignals {
717 since_last_heartbeat: Duration::from_secs(30),
718 replication_lag_lsn: 50_000,
719 recent_errors: 100,
720 unhealthy_for: Duration::from_secs(30),
721 };
722 let score = policy.evaluate(&signals);
723 assert_eq!(score.overall, 0);
724 assert_eq!(score.class, HealthClass::Failed);
725 }
726
727 // --- failover planning: the five acceptance scenarios -----------------
728
729 #[test]
730 fn healthy_cluster_is_a_no_op() {
731 let supervisor = ClusterSupervisor::default();
732 let members = membership(&["CN=node-a", "CN=node-b", "CN=node-c"]);
733 let (catalog, _orders) = catalog_with("CN=node-a", &["CN=node-b", "CN=node-c"]);
734 // All members healthy by default.
735 let signals = FakeSignals::new(CommitWatermark::new(1, 10));
736
737 let plan = supervisor.plan_failovers(&members, &catalog, &signals);
738 assert!(plan.is_empty(), "no failover when every owner is healthy");
739 }
740
741 #[test]
742 fn degraded_owner_is_detected_but_not_failed_over() {
743 // node-a is degraded (live, but lagging+erroring) — observable, but the
744 // supervisor must not move ownership for a merely-degraded owner.
745 let supervisor = ClusterSupervisor::default();
746 let members = membership(&["CN=node-a", "CN=node-b"]);
747 let (catalog, _orders) = catalog_with("CN=node-a", &["CN=node-b"]);
748 let signals = FakeSignals::new(CommitWatermark::new(1, 10)).with_member(
749 "CN=node-a",
750 MemberSignals {
751 since_last_heartbeat: Duration::ZERO,
752 replication_lag_lsn: 10_000,
753 recent_errors: 20,
754 unhealthy_for: Duration::ZERO,
755 },
756 );
757
758 let score = supervisor.assess(&signals.member_signals(&ident("CN=node-a")));
759 assert_eq!(score.class, HealthClass::Degraded, "detected as degraded");
760
761 let plan = supervisor.plan_failovers(&members, &catalog, &signals);
762 assert!(plan.is_empty(), "a degraded owner is not failed over");
763 }
764
765 #[test]
766 fn safe_candidate_is_promoted_and_old_owner_is_fenced() {
767 let supervisor = ClusterSupervisor::default();
768 let members = membership(&["CN=node-a", "CN=node-b", "CN=node-c"]);
769 let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b", "CN=node-c"]);
770 let signals = FakeSignals::new(CommitWatermark::new(1, 10))
771 .with_member("CN=node-a", failed_signals())
772 // node-b is healthy and fully caught up; node-c is caught up too but
773 // we expect node-b chosen on identity tie-break.
774 .with_catch_up("CN=node-b", 1, 10)
775 .with_catch_up("CN=node-c", 1, 10);
776
777 let (outcomes, plan) = supervisor.run_failovers(&members, &mut catalog, &signals);
778 assert_eq!(plan.promotions.len(), 1);
779 assert!(plan.blocked.is_empty());
780 let promotion = &plan.promotions[0];
781 assert_eq!(promotion.failed_owner, ident("CN=node-a"));
782 assert_eq!(
783 promotion.candidate,
784 ident("CN=node-b"),
785 "healthiest, tie -> lowest id"
786 );
787
788 let outcome = outcomes[0].as_ref().expect("promotion should activate");
789 assert_eq!(outcome.kind, TransitionKind::Promote);
790 assert!(
791 outcome.fenced_old_owner(),
792 "epoch bumped to fence old owner"
793 );
794 assert_eq!(outcome.new_owner, ident("CN=node-b"));
795
796 // The catalog now makes node-b the owner at the bumped epoch, and the old
797 // owner node-a is fenced from public writes under the old epoch.
798 let range = catalog.range(&orders, RangeId::new(1)).unwrap();
799 assert_eq!(range.owner(), &ident("CN=node-b"));
800 assert_eq!(range.role_of(&ident("CN=node-b")), RangeRole::Owner);
801 let err = catalog
802 .admit_public_write(
803 &ident("CN=node-a"),
804 &orders,
805 b"k",
806 OwnershipEpoch::initial(),
807 )
808 .unwrap_err();
809 assert!(matches!(
810 err,
811 RangeWriteReject::NotOwner { .. } | RangeWriteReject::StaleEpoch { .. }
812 ));
813 }
814
/// node-a has failed and its sole replica node-b trails the commit watermark
/// by a single LSN (applied term 2 / lsn 49 against a watermark of 2 / 50).
/// Promoting it could lose committed writes, so the failover must be blocked
/// and the catalog left exactly as it was.
#[test]
fn unsafe_candidate_behind_watermark_is_rejected() {
    let sup = ClusterSupervisor::default();
    let authorized = membership(&["CN=node-a", "CN=node-b"]);
    let (mut cat, orders_table) = catalog_with("CN=node-a", &["CN=node-b"]);
    let watermark = CommitWatermark::new(2, 50);
    let observed = FakeSignals::new(watermark)
        .with_member("CN=node-a", failed_signals())
        .with_catch_up("CN=node-b", 2, 49); // one LSN short

    let (activated, plan) = sup.run_failovers(&authorized, &mut cat, &observed);

    // Nothing was safe to promote, so nothing was activated.
    assert!(plan.promotions.is_empty(), "no safe promotion");
    assert!(activated.is_empty());

    // Exactly one blocked failover, attributed to the failed owner node-a.
    assert_eq!(plan.blocked.len(), 1);
    let blocked = &plan.blocked[0];
    assert_eq!(blocked.reason, BlockedReason::NoSafeCandidate);
    assert_eq!(blocked.failed_owner, ident("CN=node-a"));

    // The catalog is untouched: node-a still owns the range at the initial
    // epoch and catalog version.
    let untouched = cat.range(&orders_table, RangeId::new(1)).unwrap();
    assert_eq!(untouched.owner(), &ident("CN=node-a"));
    assert_eq!(untouched.epoch(), OwnershipEpoch::initial());
    assert_eq!(untouched.version(), CatalogVersion::initial());
}
841
/// A flapping owner must keep its range. node-a scores `Failed`, but it has
/// only been unhealthy for 2s — well inside the default 30s grace period — so
/// the grace period must damp the failover.
#[test]
fn flapping_owner_within_grace_period_is_not_failed_over() {
    let sup = ClusterSupervisor::default();
    let authorized = membership(&["CN=node-a", "CN=node-b"]);
    let (cat, _orders) = catalog_with("CN=node-a", &["CN=node-b"]);
    let flapping = MemberSignals {
        since_last_heartbeat: Duration::from_secs(30),
        replication_lag_lsn: 50_000,
        recent_errors: 100,
        unhealthy_for: Duration::from_secs(2), // inside grace
    };
    let observed = FakeSignals::new(CommitWatermark::new(1, 10))
        .with_member("CN=node-a", flapping)
        .with_catch_up("CN=node-b", 1, 10);

    // Precondition: the owner really does score as Failed...
    let owner_signals = observed.member_signals(&ident("CN=node-a"));
    let owner_score = sup.assess(&owner_signals);
    assert_eq!(owner_score.class, HealthClass::Failed);

    // ...yet the grace period holds the failover back.
    let plan = sup.plan_failovers(&authorized, &cat, &observed);
    assert!(plan.is_empty(), "flap inside grace period is damped");
}
868
/// node-a failed; node-b is both a member and a replica, but the supervisor
/// has no catch-up evidence for it. The supervisor must fail closed: the
/// failover is blocked and nothing is promoted.
#[test]
fn unknown_candidate_progress_blocks_failover() {
    let supervisor = ClusterSupervisor::default();
    let members = membership(&["CN=node-a", "CN=node-b"]);
    let (catalog, _orders) = catalog_with("CN=node-a", &["CN=node-b"]);
    // Deliberately no `with_catch_up` for node-b: its progress is unknown.
    let signals = FakeSignals::new(CommitWatermark::new(1, 10))
        .with_member("CN=node-a", failed_signals());

    let plan = supervisor.plan_failovers(&members, &catalog, &signals);
    // Failing closed means *no* promotion, not merely "a blocked entry was
    // recorded" — a plan that blocked and promoted at once must fail here.
    assert!(
        plan.promotions.is_empty(),
        "unknown candidate progress must not be promoted"
    );
    assert_eq!(plan.blocked.len(), 1);
    assert_eq!(plan.blocked[0].reason, BlockedReason::NoSafeCandidate);
}
883
/// A member that holds no copy of the range is never a promotion candidate,
/// however healthy it looks. node-a fails and has no replicas; node-c is an
/// authorized member whose reported progress (term 9 / lsn 999) is past the
/// watermark, but it is not a replica, so the failover is blocked.
#[test]
fn non_replica_node_is_never_a_candidate() {
    let sup = ClusterSupervisor::default();
    let authorized = membership(&["CN=node-a", "CN=node-c"]);
    let (cat, _orders) = catalog_with("CN=node-a", &[]);
    let observed = FakeSignals::new(CommitWatermark::new(1, 10))
        .with_member("CN=node-a", failed_signals())
        .with_catch_up("CN=node-c", 9, 999);

    let plan = sup.plan_failovers(&authorized, &cat, &observed);

    // No promotion may be planned, and the one failed range is reported blocked.
    assert!(plan.promotions.is_empty());
    assert_eq!(plan.blocked.len(), 1, "no replica -> no safe candidate");
}
899
/// node-a failed; node-b is a caught-up replica but is itself failed.
/// Promoting it would just move the outage, so it must not be selected and
/// the failover is blocked instead.
#[test]
fn failed_replica_is_not_promoted() {
    let supervisor = ClusterSupervisor::default();
    let members = membership(&["CN=node-a", "CN=node-b"]);
    let (catalog, _orders) = catalog_with("CN=node-a", &["CN=node-b"]);
    let signals = FakeSignals::new(CommitWatermark::new(1, 10))
        .with_member("CN=node-a", failed_signals())
        .with_member("CN=node-b", failed_signals())
        .with_catch_up("CN=node-b", 1, 10);

    let plan = supervisor.plan_failovers(&members, &catalog, &signals);
    // The point of the test is that node-b is *not promoted*; assert that
    // directly rather than inferring it from the blocked entry alone.
    assert!(
        plan.promotions.is_empty(),
        "a failed replica must not be promoted"
    );
    assert_eq!(plan.blocked.len(), 1);
    assert_eq!(plan.blocked[0].reason, BlockedReason::NoSafeCandidate);
}
916
/// With two caught-up replicas, the healthier one wins even when the other
/// sorts first by identity. node-b's heartbeat is 4s old, which gives it a
/// lower score. node-c reports fully healthy signals.
#[test]
fn healthiest_caught_up_candidate_is_preferred() {
    let sup = ClusterSupervisor::default();
    let authorized = membership(&["CN=node-a", "CN=node-b", "CN=node-c"]);
    let (cat, _orders) = catalog_with("CN=node-a", &["CN=node-b", "CN=node-c"]);
    // Caught up but with a stale-ish heartbeat: degraded-ish liveness, lower score.
    let stale_heartbeat = MemberSignals {
        since_last_heartbeat: Duration::from_secs(4),
        replication_lag_lsn: 0,
        recent_errors: 0,
        unhealthy_for: Duration::ZERO,
    };
    let observed = FakeSignals::new(CommitWatermark::new(1, 10))
        .with_member("CN=node-a", failed_signals())
        .with_member("CN=node-b", stale_heartbeat)
        .with_member("CN=node-c", MemberSignals::healthy())
        .with_catch_up("CN=node-b", 1, 10)
        .with_catch_up("CN=node-c", 1, 10);

    let plan = sup.plan_failovers(&authorized, &cat, &observed);
    assert_eq!(plan.promotions.len(), 1);
    let winner = &plan.promotions[0].candidate;
    assert_eq!(
        winner,
        &ident("CN=node-c"),
        "healthier candidate preferred over identity tie-break",
    );
}
947
/// After node-a fails, node-b is promoted (identity tie-break between the two
/// caught-up replicas). The new owner must no longer be listed as its own
/// replica. The surviving replica node-c stays, and the fenced old owner node-a
/// is not re-added.
#[test]
fn promoted_owner_drops_itself_from_the_replica_set() {
    let supervisor = ClusterSupervisor::default();
    let members = membership(&["CN=node-a", "CN=node-b", "CN=node-c"]);
    let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b", "CN=node-c"]);
    let signals = FakeSignals::new(CommitWatermark::new(1, 10))
        .with_member("CN=node-a", failed_signals())
        .with_catch_up("CN=node-b", 1, 10)
        .with_catch_up("CN=node-c", 1, 10);

    // Confirm the failover actually activated before inspecting the catalog.
    // Otherwise a blocked or rejected promotion would surface only as a
    // confusing owner/replica-set mismatch below.
    let (outcomes, plan) = supervisor.run_failovers(&members, &mut catalog, &signals);
    assert_eq!(plan.promotions.len(), 1);
    assert_eq!(outcomes.len(), 1);
    let outcome = outcomes[0].as_ref().expect("promotion should activate");
    assert_eq!(outcome.new_owner, ident("CN=node-b"));

    let range = catalog.range(&orders, RangeId::new(1)).unwrap();
    assert_eq!(range.owner(), &ident("CN=node-b"));
    // node-b is no longer in its own replica set; node-c remains; the fenced
    // old owner node-a is not re-added.
    assert!(!range.replicas().contains(&ident("CN=node-b")));
    assert!(range.replicas().contains(&ident("CN=node-c")));
    assert!(!range.replicas().contains(&ident("CN=node-a")));
}
967
/// `assess_members` yields one score per authorized member. node-a is fed
/// failed signals and is classed `Failed`. node-b has no explicit signals
/// registered and comes back `Healthy`.
#[test]
fn assess_members_scores_every_authorized_member() {
    let sup = ClusterSupervisor::default();
    let authorized = membership(&["CN=node-a", "CN=node-b"]);
    let observed = FakeSignals::new(CommitWatermark::new(1, 10))
        .with_member("CN=node-a", failed_signals());

    let by_member = sup.assess_members(&authorized, &observed);
    assert_eq!(by_member.len(), 2);
    for (who, expected) in [
        ("CN=node-a", HealthClass::Failed),
        ("CN=node-b", HealthClass::Healthy),
    ] {
        assert_eq!(by_member[&ident(who)].class, expected);
    }
}
980}