Skip to main content

reddb_server/cluster/
supervisor.rs

//! Member health scoring and automatic range failover (issue #998, PRD #987,
//! ADR 0037).
//!
//! The **Cluster Supervisor** is the control-plane component that watches the
//! authorized members ([`MembershipCatalog`]), decides when an owner has failed,
//! and drives a *safe, fenced* range failover through the one sanctioned path —
//! the ownership transition state machine ([`super::ownership_transition`]). It
//! never edits ownership directly: every promotion it proposes is a
//! [`TransitionRequest`] that the transition machine re-validates (three-part
//! CAS + commit-watermark safety gate) before it touches the catalog.
//!
//! ## Health scoring, not a single short timeout
//!
//! A naive supervisor declares a member dead the instant one heartbeat is late.
//! That is brittle: a single dropped packet, a GC pause, or a brief network
//! hiccup triggers a needless, disruptive failover (and, under load, a *storm*
//! of them). Instead the supervisor considers four signals, the first three of
//! which combine into a [`HealthScore`]:
//!
//! * **Liveness** — time since the last heartbeat. The dominant signal, but not
//!   the only one.
//! * **Replication lag** — how far behind the range commit watermark the member
//!   is. A live-but-far-behind owner is a poor owner.
//! * **Recent errors** — observed failures in the recent window.
//! * **Grace period** — how long the member has been continuously below the
//!   failover threshold. This is the flapping damper: a member that dips and
//!   recovers inside the grace window is never failed over.
//!
//! The first three combine into a 0..=100 score (weighted, liveness-heavy);
//! the score classifies the member as [`Healthy`](HealthClass::Healthy),
//! [`Degraded`](HealthClass::Degraded), or [`Failed`](HealthClass::Failed). The
//! grace period then gates the *action*: only a `Failed` owner that has stayed
//! failed for at least the grace period is eligible for automatic failover.
//! Together the score and the grace period damp false positives and flapping
//! (acceptance criteria 1 and 4).
//!
//! ## Safe candidate selection
//!
//! When an owner is eligible for failover, the supervisor considers **only**
//! candidates that are (a) current replicas of the range, (b) still authorized
//! data members, (c) not themselves `Failed`, and (d) backed by catch-up
//! evidence that covers the range commit watermark — exactly the bar the
//! transition machine enforces. An arbitrary node, a witness, or a replica that
//! has not caught up is never a promotion target (acceptance criterion 2).
//! Among the safe candidates the supervisor prefers the healthiest, breaking
//! ties by stable identity order so the plan is deterministic.
//!
//! The selected promotion is a [`TransitionKind::Promote`] request; activating
//! it bumps the ownership epoch, which fences the failed owner — any write it
//! still attempts under the old epoch is rejected by
//! [`admit_public_write`](super::ownership::ShardOwnershipCatalog::admit_public_write)
//! (acceptance criterion 3).
//!
//! ## Purity
//!
//! All state the supervisor needs from the running cluster — heartbeat age,
//! lag, error counts, grace tracking, per-range commit watermarks, and
//! per-candidate catch-up progress — is read through the [`ClusterSignals`]
//! trait, injected by the caller. The supervisor itself is a pure policy over
//! the membership and ownership catalogs plus those signals, so the whole
//! scoring/selection/fencing story is exercised deterministically with a
//! scripted fake — no clock, no network, no engine.
63
64use std::collections::BTreeMap;
65use std::time::Duration;
66
67use super::identity::NodeIdentity;
68use super::membership::MembershipCatalog;
69use super::ownership::{CollectionId, RangeId, ShardOwnershipCatalog};
70use super::ownership_transition::{
71    run_transition, CatchUpEvidence, CommitWatermark, TransitionError, TransitionKind,
72    TransitionOutcome, TransitionRequest,
73};
74
/// A snapshot of the raw health inputs for a single cluster member.
///
/// Values are sampled from the live cluster via
/// [`ClusterSignals::member_signals`] and folded into a [`HealthScore`] by
/// [`HealthPolicy::evaluate`]. The struct is plain data: it keeps no clock or
/// counters of its own.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct MemberSignals {
    /// Elapsed time since the most recent heartbeat from this member arrived.
    /// Feeds the liveness sub-score.
    pub since_last_heartbeat: Duration,
    /// Number of WAL LSNs by which the member trails the range commit
    /// watermark, used as an aggregate replication-progress signal. `0` means
    /// fully caught up.
    pub replication_lag_lsn: u64,
    /// Count of errors observed from this member in the recent observation
    /// window.
    pub recent_errors: u32,
    /// For how long the member has been *continuously* below the failover
    /// threshold. `Duration::ZERO` for a healthy member; the caller resets it
    /// as soon as the member recovers. Failover is refused until this reaches
    /// the policy's grace period, which is what damps flapping.
    pub unhealthy_for: Duration,
}
94
95impl MemberSignals {
96    /// A perfectly healthy member: fresh heartbeat, no lag, no errors, never
97    /// unhealthy. Handy as a test/observation baseline.
98    pub fn healthy() -> Self {
99        Self {
100            since_last_heartbeat: Duration::ZERO,
101            replication_lag_lsn: 0,
102            recent_errors: 0,
103            unhealthy_for: Duration::ZERO,
104        }
105    }
106}
107
/// The band a [`HealthScore`] falls into once compared against the
/// [`HealthPolicy`] thresholds.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum HealthClass {
    /// Score strictly above the degraded threshold: the member is serving
    /// normally.
    Healthy,
    /// Score above the failover threshold but at or below the degraded
    /// threshold. The member is visibly impaired yet is **not** failed over;
    /// reporting this band lets an operator spot trouble before it turns into
    /// an outage.
    Degraded,
    /// Score at or below the failover threshold. The member becomes eligible
    /// for failover *once its grace period has elapsed*.
    Failed,
}
121
122/// A member's combined health, with the per-axis sub-scores kept visible so an
123/// operator (or a test) can see *why* a member scored as it did rather than
124/// just the verdict.
125#[derive(Debug, Clone, Copy, PartialEq, Eq)]
126pub struct HealthScore {
127    /// Combined 0..=100 score (higher is healthier).
128    pub overall: u8,
129    /// Liveness sub-score from the heartbeat age (0..=100).
130    pub liveness: u8,
131    /// Replication-lag sub-score (0..=100).
132    pub lag: u8,
133    /// Recent-error sub-score (0..=100).
134    pub errors: u8,
135    /// The classification the combined score falls into.
136    pub class: HealthClass,
137}
138
139impl HealthScore {
140    pub fn is_healthy(&self) -> bool {
141        self.class == HealthClass::Healthy
142    }
143
144    pub fn is_failed(&self) -> bool {
145        self.class == HealthClass::Failed
146    }
147}
148
/// Tunable knobs that map raw [`MemberSignals`] to a [`HealthScore`] and gate
/// automatic failover.
///
/// [`HealthPolicy::default`] is intentionally conservative. It tolerates an
/// ordinary hiccup without tripping a failover, and its grace period is long
/// enough to wait out a transient blip.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct HealthPolicy {
    /// Heartbeat age at or past which the liveness sub-score reaches 0.
    pub heartbeat_timeout: Duration,
    /// Replication lag, in LSNs, at or past which the lag sub-score reaches 0.
    pub max_replication_lag: u64,
    /// Recent-error count at or past which the error sub-score reaches 0.
    pub max_recent_errors: u32,
    /// Inclusive upper bound on the overall score of a
    /// [`Failed`](HealthClass::Failed) member.
    pub failover_threshold: u8,
    /// Inclusive upper bound on the overall score of a member that is at least
    /// [`Degraded`](HealthClass::Degraded). Expected to be
    /// `>= failover_threshold`; that ordering is not validated by this type.
    pub degraded_threshold: u8,
    /// Minimum time a member must remain continuously `Failed` before the
    /// supervisor fails it over. A shorter dip never triggers a transition,
    /// which is what keeps a flapping member in place.
    pub grace_period: Duration,
}
172
173impl Default for HealthPolicy {
174    fn default() -> Self {
175        Self {
176            heartbeat_timeout: Duration::from_secs(10),
177            max_replication_lag: 10_000,
178            max_recent_errors: 20,
179            failover_threshold: 30,
180            degraded_threshold: 70,
181            grace_period: Duration::from_secs(30),
182        }
183    }
184}
185
/// Linear sub-score in `0..=100`: full marks (100) at `value <= 0`, falling
/// linearly to 0 at `value >= limit`.
///
/// `value` is clamped into `[0, limit]` before scaling, so the result always
/// stays inside the documented range. A negative `value` scores 100 rather
/// than overshooting past it.
///
/// A non-positive `limit` means "no tolerance": any positive `value` scores 0,
/// and a non-positive one scores 100.
fn ramp_down(value: f64, limit: f64) -> u8 {
    if limit <= 0.0 {
        return if value <= 0.0 { 100 } else { 0 };
    }
    // Clamp into [0, limit]. `min` runs first so a NaN `value` maps to `limit`
    // (score 0, failing closed) exactly as before. `max` then lifts negative
    // inputs to 0 so the score cannot exceed 100. Plain `min`/`max` are used
    // instead of `f64::clamp`, which panics on a NaN bound.
    let clamped = value.min(limit).max(0.0);
    (100.0 * (1.0 - clamped / limit)).round() as u8
}
196
197impl HealthPolicy {
198    /// Combine raw `signals` into a [`HealthScore`] under this policy.
199    ///
200    /// The three serving signals fold into the overall score with a
201    /// liveness-heavy weighting (liveness 70%, lag 20%, errors 10%): a member
202    /// whose heartbeat has fully lapsed must be able to reach the failover
203    /// threshold on liveness alone — a crashed node stops heartbeating but its
204    /// *last-known* lag and error counts may still look fine, so trusting them
205    /// would wedge failover shut. At the same time a live owner that is far
206    /// behind or erroring is penalised rather than trusted blindly, and a
207    /// *brief* heartbeat gap with good lag/errors stays out of the failover band
208    /// (which a single short fixed timeout could not express). The grace-period
209    /// signal (`unhealthy_for`) is *not* part of the score — it gates the
210    /// failover action in [`failover_eligible`](Self::failover_eligible).
211    pub fn evaluate(&self, signals: &MemberSignals) -> HealthScore {
212        let liveness = ramp_down(
213            signals.since_last_heartbeat.as_secs_f64(),
214            self.heartbeat_timeout.as_secs_f64(),
215        );
216        let lag = ramp_down(
217            signals.replication_lag_lsn as f64,
218            self.max_replication_lag as f64,
219        );
220        let errors = ramp_down(signals.recent_errors as f64, self.max_recent_errors as f64);
221
222        let overall =
223            (liveness as f64 * 0.7 + lag as f64 * 0.2 + errors as f64 * 0.1).round() as u8;
224        let class = if overall <= self.failover_threshold {
225            HealthClass::Failed
226        } else if overall <= self.degraded_threshold {
227            HealthClass::Degraded
228        } else {
229            HealthClass::Healthy
230        };
231
232        HealthScore {
233            overall,
234            liveness,
235            lag,
236            errors,
237            class,
238        }
239    }
240
241    /// Whether a member with this `score` and these `signals` is eligible for
242    /// automatic failover: it must be [`Failed`](HealthClass::Failed) **and**
243    /// have stayed unhealthy for at least the grace period. A `Failed` member
244    /// still inside the grace window is held back — the flapping damper.
245    pub fn failover_eligible(&self, score: &HealthScore, signals: &MemberSignals) -> bool {
246        score.is_failed() && signals.unhealthy_for >= self.grace_period
247    }
248}
249
250/// The cluster state the supervisor reads but does not own: per-member health
251/// signals, per-range commit watermarks, and per-candidate catch-up evidence.
252///
253/// Production backs this onto the heartbeat tracker, the replica registry, and
254/// the per-range stream progress (issue #992); tests back it onto a scripted
255/// fake. Keeping it behind a trait is what makes the supervisor a pure policy.
256pub trait ClusterSignals {
257    /// Current raw health signals for `member`.
258    fn member_signals(&self, member: &NodeIdentity) -> MemberSignals;
259
260    /// The range commit watermark a promotion candidate must cover for
261    /// `(collection, range_id)` — the highest `(term, lsn)` known durable under
262    /// the range's commit policy.
263    fn commit_watermark(&self, collection: &CollectionId, range_id: RangeId) -> CommitWatermark;
264
265    /// The catch-up evidence the supervisor has for `candidate` on the range, or
266    /// `None` if the candidate's progress is unknown (in which case it cannot be
267    /// promoted — fail closed).
268    fn catch_up(
269        &self,
270        collection: &CollectionId,
271        range_id: RangeId,
272        candidate: &NodeIdentity,
273    ) -> Option<CatchUpEvidence>;
274}
275
276/// A safe, validated promotion the supervisor proposes for one range: the failed
277/// owner, the chosen caught-up candidate, the owner's health at decision time,
278/// and the [`TransitionRequest`] (already carrying the three-part CAS,
279/// watermark, and catch-up evidence) to run through the transition machine.
280#[derive(Debug, Clone, PartialEq, Eq)]
281pub struct PlannedPromotion {
282    pub collection: CollectionId,
283    pub range_id: RangeId,
284    pub failed_owner: NodeIdentity,
285    pub candidate: NodeIdentity,
286    pub candidate_score: HealthScore,
287    pub owner_score: HealthScore,
288    pub request: TransitionRequest,
289}
290
/// Why a failing owner's range could **not** be failed over.
///
/// Reported explicitly instead of being skipped silently, so an operator can
/// spot a range that needs attention.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum BlockedReason {
    /// The owner is failing, but none of its replicas is a safe target. No
    /// replica is an authorized data member that is not itself failed and
    /// has catch-up evidence covering the commit watermark. Failing over anyway
    /// could lose committed writes.
    NoSafeCandidate,
}
300
301/// A failing owner's range with no safe failover target.
302#[derive(Debug, Clone, PartialEq, Eq)]
303pub struct BlockedFailover {
304    pub collection: CollectionId,
305    pub range_id: RangeId,
306    pub failed_owner: NodeIdentity,
307    pub owner_score: HealthScore,
308    pub reason: BlockedReason,
309}
310
311/// The supervisor's decision for one scan: the safe promotions to run and the
312/// failing ranges with no safe target. A cluster with all owners healthy yields
313/// an empty plan ([`is_empty`](Self::is_empty)) — the healthy no-op.
314#[derive(Debug, Clone, Default, PartialEq, Eq)]
315pub struct FailoverPlan {
316    /// Safe, ready-to-run promotions, in `(collection, range_id)` order.
317    pub promotions: Vec<PlannedPromotion>,
318    /// Failing ranges that have no safe candidate.
319    pub blocked: Vec<BlockedFailover>,
320}
321
322impl FailoverPlan {
323    /// Nothing to do — every owner is healthy (or degraded-but-not-failed, or
324    /// within its grace period). The healthy no-op the supervisor must produce
325    /// for a stable cluster.
326    pub fn is_empty(&self) -> bool {
327        self.promotions.is_empty() && self.blocked.is_empty()
328    }
329}
330
331/// The cluster supervisor: health scoring + automatic range failover planning.
332///
333/// Holds only the [`HealthPolicy`]; all live state is read through
334/// [`ClusterSignals`] at scan time, so one supervisor instance serves the whole
335/// cluster lifetime.
336#[derive(Debug, Clone, Default)]
337pub struct ClusterSupervisor {
338    policy: HealthPolicy,
339}
340
341impl ClusterSupervisor {
342    /// A supervisor with the given health policy.
343    pub fn new(policy: HealthPolicy) -> Self {
344        Self { policy }
345    }
346
347    pub fn policy(&self) -> &HealthPolicy {
348        &self.policy
349    }
350
351    /// Score a single member's health under the policy. The building block of
352    /// degraded-member detection: an operator surface calls this for every
353    /// authorized member to render a health view.
354    pub fn assess(&self, signals: &MemberSignals) -> HealthScore {
355        self.policy.evaluate(signals)
356    }
357
358    /// Score every authorized member of `membership`, in stable identity order.
359    /// Includes healthy, degraded, and failed members alike — the input to a
360    /// cluster health dashboard.
361    pub fn assess_members(
362        &self,
363        membership: &MembershipCatalog,
364        signals: &impl ClusterSignals,
365    ) -> BTreeMap<NodeIdentity, HealthScore> {
366        membership
367            .members()
368            .map(|m| {
369                let id = m.identity().clone();
370                let score = self.policy.evaluate(&signals.member_signals(&id));
371                (id, score)
372            })
373            .collect()
374    }
375
376    /// Plan automatic failovers across the whole ownership catalog **without**
377    /// mutating it. For each range whose owner is failover-eligible (Failed and
378    /// past the grace period), pick the safest caught-up replica candidate and
379    /// produce a [`PlannedPromotion`]; if no replica is safe, record a
380    /// [`BlockedFailover`]. Owners that are healthy, merely degraded, or still
381    /// inside their grace period produce nothing.
382    pub fn plan_failovers(
383        &self,
384        membership: &MembershipCatalog,
385        ownership: &ShardOwnershipCatalog,
386        signals: &impl ClusterSignals,
387    ) -> FailoverPlan {
388        // entries() yields ranges in (collection, range_id) order, so the plan
389        // is deterministic.
390        let mut plan = FailoverPlan::default();
391
392        for range in ownership.entries() {
393            let owner = range.owner().clone();
394            let owner_signals = signals.member_signals(&owner);
395            let owner_score = self.policy.evaluate(&owner_signals);
396
397            // Healthy or degraded-but-not-failed owners are left alone; a failed
398            // owner still inside its grace period is held back (flapping damper).
399            if !self.policy.failover_eligible(&owner_score, &owner_signals) {
400                continue;
401            }
402
403            let collection = range.collection().clone();
404            let range_id = range.range_id();
405            let watermark = signals.commit_watermark(&collection, range_id);
406
407            // Consider only safe candidates: a current replica, still an
408            // authorized data member, not itself failed, with catch-up evidence
409            // covering the commit watermark.
410            let mut best: Option<(HealthScore, CatchUpEvidence, NodeIdentity)> = None;
411            for candidate in range.replicas() {
412                if !membership
413                    .member(candidate)
414                    .is_some_and(|m| m.kind().holds_data())
415                {
416                    continue;
417                }
418                let cand_score = self.policy.evaluate(&signals.member_signals(candidate));
419                if cand_score.is_failed() {
420                    // Promoting a failed replica just moves the outage; skip it.
421                    continue;
422                }
423                let Some(evidence) = signals.catch_up(&collection, range_id, candidate) else {
424                    continue;
425                };
426                if !evidence.covers(watermark) {
427                    // Replica is a copy of the range but has not caught up to the
428                    // commit watermark — promoting it could lose committed
429                    // writes. This is the unsafe-candidate rejection.
430                    continue;
431                }
432
433                // Prefer the healthiest candidate; break ties by stable identity
434                // order for determinism.
435                let better = match &best {
436                    None => true,
437                    Some((best_score, _, best_id)) => {
438                        cand_score.overall > best_score.overall
439                            || (cand_score.overall == best_score.overall && candidate < best_id)
440                    }
441                };
442                if better {
443                    best = Some((cand_score, evidence, candidate.clone()));
444                }
445            }
446
447            match best {
448                Some((candidate_score, evidence, candidate)) => {
449                    let request = TransitionRequest::new(
450                        TransitionKind::Promote,
451                        collection.clone(),
452                        range_id,
453                        owner.clone(),
454                        range.epoch(),
455                        range.version(),
456                        candidate.clone(),
457                        watermark,
458                    )
459                    .with_evidence(evidence)
460                    .with_replicas(remaining_replicas(range.replicas(), &candidate));
461                    plan.promotions.push(PlannedPromotion {
462                        collection,
463                        range_id,
464                        failed_owner: owner,
465                        candidate,
466                        candidate_score,
467                        owner_score,
468                        request,
469                    });
470                }
471                None => plan.blocked.push(BlockedFailover {
472                    collection,
473                    range_id,
474                    failed_owner: owner,
475                    owner_score,
476                    reason: BlockedReason::NoSafeCandidate,
477                }),
478            }
479        }
480
481        plan
482    }
483
484    /// Plan failovers and immediately run the safe promotions through the
485    /// ownership transition machine, fencing each failed owner via the epoch
486    /// bump. Returns the activated [`TransitionOutcome`]s and the surviving
487    /// [`FailoverPlan`] (whose `blocked` entries still need attention; its
488    /// `promotions` are the requests that were run).
489    ///
490    /// Each promotion is an independent catalog entry, so running them in
491    /// sequence never invalidates another's CAS. A promotion whose CAS lost a
492    /// race (the catalog moved between planning and activation) surfaces as a
493    /// [`TransitionError`] in the returned vector rather than aborting the rest.
494    pub fn run_failovers(
495        &self,
496        membership: &MembershipCatalog,
497        ownership: &mut ShardOwnershipCatalog,
498        signals: &impl ClusterSignals,
499    ) -> (
500        Vec<Result<TransitionOutcome, TransitionError>>,
501        FailoverPlan,
502    ) {
503        let plan = self.plan_failovers(membership, ownership, signals);
504        let outcomes = plan
505            .promotions
506            .iter()
507            .map(|p| run_transition(ownership, &p.request))
508            .collect();
509        (outcomes, plan)
510    }
511}
512
513/// The replica set the new owner carries after promotion: the old replica set
514/// minus the promoted candidate (it becomes owner, not its own replica). The
515/// failed owner is intentionally *not* added back as a replica — it is fenced
516/// and presumed down; the rebalancer re-replicates once it returns or is
517/// replaced.
518fn remaining_replicas(replicas: &[NodeIdentity], promoted: &NodeIdentity) -> Vec<NodeIdentity> {
519    replicas
520        .iter()
521        .filter(|r| *r != promoted)
522        .cloned()
523        .collect()
524}
525
526#[cfg(test)]
527mod tests {
528    use super::*;
529    use crate::cluster::membership::{ClusterId, ClusterMember, MemberKind};
530    use crate::cluster::ownership::{CatalogVersion, OwnershipEpoch};
531    use crate::cluster::ownership::{
532        PlacementMetadata, RangeBounds, RangeOwnership, RangeRole, RangeWriteReject, ShardKeyMode,
533    };
534    use std::collections::HashMap;
535
536    fn ident(cn: &str) -> NodeIdentity {
537        NodeIdentity::from_certificate_subject(cn).unwrap()
538    }
539
540    fn collection(name: &str) -> CollectionId {
541        CollectionId::new(name).unwrap()
542    }
543
544    fn data_member(cn: &str) -> ClusterMember {
545        ClusterMember::joined_empty(ident(cn), MemberKind::Data)
546    }
547
548    fn membership(members: &[&str]) -> MembershipCatalog {
549        MembershipCatalog::new(
550            ClusterId::new("cluster-x").unwrap(),
551            members.iter().map(|m| data_member(m)),
552        )
553    }
554
555    /// A catalog with one full-keyspace range `orders/1` owned by `owner` with
556    /// `replicas`, at the initial epoch/version.
557    fn catalog_with(owner: &str, replicas: &[&str]) -> (ShardOwnershipCatalog, CollectionId) {
558        let orders = collection("orders");
559        let mut catalog = ShardOwnershipCatalog::new();
560        catalog
561            .apply_update(RangeOwnership::establish(
562                orders.clone(),
563                RangeId::new(1),
564                ShardKeyMode::Hash,
565                RangeBounds::full(),
566                ident(owner),
567                replicas.iter().map(|r| ident(r)).collect::<Vec<_>>(),
568                PlacementMetadata::with_replication_factor(3),
569            ))
570            .unwrap();
571        (catalog, orders)
572    }
573
574    /// A scripted [`ClusterSignals`]: per-member signals, one shared watermark,
575    /// and per-(range,candidate) catch-up evidence keyed by candidate CN.
576    struct FakeSignals {
577        members: HashMap<NodeIdentity, MemberSignals>,
578        watermark: CommitWatermark,
579        catch_up: HashMap<NodeIdentity, CatchUpEvidence>,
580    }
581
582    impl FakeSignals {
583        fn new(watermark: CommitWatermark) -> Self {
584            Self {
585                members: HashMap::new(),
586                watermark,
587                catch_up: HashMap::new(),
588            }
589        }
590
591        fn with_member(mut self, cn: &str, signals: MemberSignals) -> Self {
592            self.members.insert(ident(cn), signals);
593            self
594        }
595
596        fn with_catch_up(mut self, cn: &str, applied_term: u64, applied_lsn: u64) -> Self {
597            self.catch_up.insert(
598                ident(cn),
599                CatchUpEvidence::new(ident(cn), applied_term, applied_lsn),
600            );
601            self
602        }
603    }
604
605    impl ClusterSignals for FakeSignals {
606        fn member_signals(&self, member: &NodeIdentity) -> MemberSignals {
607            self.members
608                .get(member)
609                .copied()
610                .unwrap_or_else(MemberSignals::healthy)
611        }
612
613        fn commit_watermark(
614            &self,
615            _collection: &CollectionId,
616            _range_id: RangeId,
617        ) -> CommitWatermark {
618            self.watermark
619        }
620
621        fn catch_up(
622            &self,
623            _collection: &CollectionId,
624            _range_id: RangeId,
625            candidate: &NodeIdentity,
626        ) -> Option<CatchUpEvidence> {
627            self.catch_up.get(candidate).cloned()
628        }
629    }
630
631    /// Signals for a failed-and-past-grace owner: no heartbeat for a long time,
632    /// well over the default grace period.
633    fn failed_signals() -> MemberSignals {
634        MemberSignals {
635            since_last_heartbeat: Duration::from_secs(60),
636            replication_lag_lsn: 50_000,
637            recent_errors: 100,
638            unhealthy_for: Duration::from_secs(60),
639        }
640    }
641
642    // --- health scoring ---------------------------------------------------
643
644    #[test]
645    fn fresh_member_scores_perfectly_healthy() {
646        let policy = HealthPolicy::default();
647        let score = policy.evaluate(&MemberSignals::healthy());
648        assert_eq!(score.overall, 100);
649        assert_eq!(score.class, HealthClass::Healthy);
650    }
651
652    #[test]
653    fn score_combines_signals_not_just_a_timeout() {
654        // A member with a *brief* heartbeat gap but good lag and no errors should
655        // not be treated as dead the way a short fixed timeout would. Its liveness
656        // sub-score dips, but lag/errors keep the overall in the Healthy band.
657        let policy = HealthPolicy::default();
658        let signals = MemberSignals {
659            since_last_heartbeat: Duration::from_secs(2), // 1/5 of the 10s timeout
660            replication_lag_lsn: 0,
661            recent_errors: 0,
662            unhealthy_for: Duration::ZERO,
663        };
664        let score = policy.evaluate(&signals);
665        assert_eq!(score.liveness, 80, "heartbeat at 1/5 of the timeout");
666        assert_eq!(score.lag, 100);
667        assert_eq!(score.errors, 100);
668        // overall = 0.7*80 + 0.2*100 + 0.1*100 = 56 + 20 + 10 = 86 -> Healthy.
669        // A 2s fixed timeout would have declared this member dead.
670        assert_eq!(score.overall, 86);
671        assert_eq!(score.class, HealthClass::Healthy);
672    }
673
674    #[test]
675    fn lag_and_errors_pull_a_live_member_into_degraded() {
676        // A member that is heartbeating fine but far behind and erroring is
677        // penalised — a single timeout would have called it perfectly healthy.
678        let policy = HealthPolicy::default();
679        let signals = MemberSignals {
680            since_last_heartbeat: Duration::ZERO,
681            replication_lag_lsn: 10_000, // at the cap -> lag sub-score 0
682            recent_errors: 20,           // at the cap -> error sub-score 0
683            unhealthy_for: Duration::ZERO,
684        };
685        let score = policy.evaluate(&signals);
686        // overall = 0.7*100 + 0.2*0 + 0.1*0 = 70 -> Degraded (<= degraded threshold).
687        assert_eq!(score.overall, 70);
688        assert_eq!(score.class, HealthClass::Degraded);
689    }
690
691    #[test]
692    fn dead_heartbeat_alone_reaches_failed() {
693        // A crashed node stops heartbeating; even if its last-known lag/errors
694        // look perfect, liveness alone must carry it to the failover band — else
695        // the most common failure (a clean crash) would never fail over.
696        let policy = HealthPolicy::default();
697        let signals = MemberSignals {
698            since_last_heartbeat: Duration::from_secs(30),
699            replication_lag_lsn: 0,
700            recent_errors: 0,
701            unhealthy_for: Duration::from_secs(30),
702        };
703        let score = policy.evaluate(&signals);
704        assert_eq!(score.liveness, 0);
705        // overall = 0.7*0 + 0.2*100 + 0.1*100 = 30 -> Failed (<= failover threshold).
706        assert_eq!(score.overall, 30);
707        assert_eq!(score.class, HealthClass::Failed);
708    }
709
710    #[test]
711    fn totally_unreachable_member_is_failed() {
712        // A member we cannot reach reports a dead heartbeat *and* growing lag and
713        // errors — every axis bottoms out, so it lands well under the failover
714        // threshold.
715        let policy = HealthPolicy::default();
716        let signals = MemberSignals {
717            since_last_heartbeat: Duration::from_secs(30),
718            replication_lag_lsn: 50_000,
719            recent_errors: 100,
720            unhealthy_for: Duration::from_secs(30),
721        };
722        let score = policy.evaluate(&signals);
723        assert_eq!(score.overall, 0);
724        assert_eq!(score.class, HealthClass::Failed);
725    }
726
727    // --- failover planning: the five acceptance scenarios -----------------
728
729    #[test]
730    fn healthy_cluster_is_a_no_op() {
731        let supervisor = ClusterSupervisor::default();
732        let members = membership(&["CN=node-a", "CN=node-b", "CN=node-c"]);
733        let (catalog, _orders) = catalog_with("CN=node-a", &["CN=node-b", "CN=node-c"]);
734        // All members healthy by default.
735        let signals = FakeSignals::new(CommitWatermark::new(1, 10));
736
737        let plan = supervisor.plan_failovers(&members, &catalog, &signals);
738        assert!(plan.is_empty(), "no failover when every owner is healthy");
739    }
740
741    #[test]
742    fn degraded_owner_is_detected_but_not_failed_over() {
743        // node-a is degraded (live, but lagging+erroring) — observable, but the
744        // supervisor must not move ownership for a merely-degraded owner.
745        let supervisor = ClusterSupervisor::default();
746        let members = membership(&["CN=node-a", "CN=node-b"]);
747        let (catalog, _orders) = catalog_with("CN=node-a", &["CN=node-b"]);
748        let signals = FakeSignals::new(CommitWatermark::new(1, 10)).with_member(
749            "CN=node-a",
750            MemberSignals {
751                since_last_heartbeat: Duration::ZERO,
752                replication_lag_lsn: 10_000,
753                recent_errors: 20,
754                unhealthy_for: Duration::ZERO,
755            },
756        );
757
758        let score = supervisor.assess(&signals.member_signals(&ident("CN=node-a")));
759        assert_eq!(score.class, HealthClass::Degraded, "detected as degraded");
760
761        let plan = supervisor.plan_failovers(&members, &catalog, &signals);
762        assert!(plan.is_empty(), "a degraded owner is not failed over");
763    }
764
765    #[test]
766    fn safe_candidate_is_promoted_and_old_owner_is_fenced() {
767        let supervisor = ClusterSupervisor::default();
768        let members = membership(&["CN=node-a", "CN=node-b", "CN=node-c"]);
769        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b", "CN=node-c"]);
770        let signals = FakeSignals::new(CommitWatermark::new(1, 10))
771            .with_member("CN=node-a", failed_signals())
772            // node-b is healthy and fully caught up; node-c is caught up too but
773            // we expect node-b chosen on identity tie-break.
774            .with_catch_up("CN=node-b", 1, 10)
775            .with_catch_up("CN=node-c", 1, 10);
776
777        let (outcomes, plan) = supervisor.run_failovers(&members, &mut catalog, &signals);
778        assert_eq!(plan.promotions.len(), 1);
779        assert!(plan.blocked.is_empty());
780        let promotion = &plan.promotions[0];
781        assert_eq!(promotion.failed_owner, ident("CN=node-a"));
782        assert_eq!(
783            promotion.candidate,
784            ident("CN=node-b"),
785            "healthiest, tie -> lowest id"
786        );
787
788        let outcome = outcomes[0].as_ref().expect("promotion should activate");
789        assert_eq!(outcome.kind, TransitionKind::Promote);
790        assert!(
791            outcome.fenced_old_owner(),
792            "epoch bumped to fence old owner"
793        );
794        assert_eq!(outcome.new_owner, ident("CN=node-b"));
795
796        // The catalog now makes node-b the owner at the bumped epoch, and the old
797        // owner node-a is fenced from public writes under the old epoch.
798        let range = catalog.range(&orders, RangeId::new(1)).unwrap();
799        assert_eq!(range.owner(), &ident("CN=node-b"));
800        assert_eq!(range.role_of(&ident("CN=node-b")), RangeRole::Owner);
801        let err = catalog
802            .admit_public_write(
803                &ident("CN=node-a"),
804                &orders,
805                b"k",
806                OwnershipEpoch::initial(),
807            )
808            .unwrap_err();
809        assert!(matches!(
810            err,
811            RangeWriteReject::NotOwner { .. } | RangeWriteReject::StaleEpoch { .. }
812        ));
813    }
814
815    #[test]
816    fn unsafe_candidate_behind_watermark_is_rejected() {
817        // node-a failed; its only replica node-b is a copy of the range but has
818        // NOT caught up to the commit watermark (term 2 lsn 50 vs applied 2/49).
819        // Promoting it could lose committed writes, so failover is blocked and
820        // the catalog is untouched.
821        let supervisor = ClusterSupervisor::default();
822        let members = membership(&["CN=node-a", "CN=node-b"]);
823        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
824        let signals = FakeSignals::new(CommitWatermark::new(2, 50))
825            .with_member("CN=node-a", failed_signals())
826            .with_catch_up("CN=node-b", 2, 49); // one LSN short
827
828        let (outcomes, plan) = supervisor.run_failovers(&members, &mut catalog, &signals);
829        assert!(plan.promotions.is_empty(), "no safe promotion");
830        assert!(outcomes.is_empty());
831        assert_eq!(plan.blocked.len(), 1);
832        assert_eq!(plan.blocked[0].reason, BlockedReason::NoSafeCandidate);
833        assert_eq!(plan.blocked[0].failed_owner, ident("CN=node-a"));
834
835        // Catalog is unchanged — node-a still owner at the initial epoch.
836        let range = catalog.range(&orders, RangeId::new(1)).unwrap();
837        assert_eq!(range.owner(), &ident("CN=node-a"));
838        assert_eq!(range.epoch(), OwnershipEpoch::initial());
839        assert_eq!(range.version(), CatalogVersion::initial());
840    }
841
842    #[test]
843    fn flapping_owner_within_grace_period_is_not_failed_over() {
844        // node-a's score is Failed, but it has only been unhealthy for 2s — well
845        // inside the default 30s grace period. A flap must not move ownership.
846        let supervisor = ClusterSupervisor::default();
847        let members = membership(&["CN=node-a", "CN=node-b"]);
848        let (catalog, _orders) = catalog_with("CN=node-a", &["CN=node-b"]);
849        let signals = FakeSignals::new(CommitWatermark::new(1, 10))
850            .with_member(
851                "CN=node-a",
852                MemberSignals {
853                    since_last_heartbeat: Duration::from_secs(30),
854                    replication_lag_lsn: 50_000,
855                    recent_errors: 100,
856                    unhealthy_for: Duration::from_secs(2), // inside grace
857                },
858            )
859            .with_catch_up("CN=node-b", 1, 10);
860
861        // The owner *is* scored Failed...
862        let score = supervisor.assess(&signals.member_signals(&ident("CN=node-a")));
863        assert_eq!(score.class, HealthClass::Failed);
864        // ...but the grace period holds the failover back.
865        let plan = supervisor.plan_failovers(&members, &catalog, &signals);
866        assert!(plan.is_empty(), "flap inside grace period is damped");
867    }
868
869    #[test]
870    fn unknown_candidate_progress_blocks_failover() {
871        // node-a failed; node-b is a replica and a member, but the supervisor has
872        // no catch-up evidence for it. Fail closed: blocked, not promoted.
873        let supervisor = ClusterSupervisor::default();
874        let members = membership(&["CN=node-a", "CN=node-b"]);
875        let (catalog, _orders) = catalog_with("CN=node-a", &["CN=node-b"]);
876        let signals = FakeSignals::new(CommitWatermark::new(1, 10))
877            .with_member("CN=node-a", failed_signals());
878
879        let plan = supervisor.plan_failovers(&members, &catalog, &signals);
880        assert_eq!(plan.blocked.len(), 1);
881        assert_eq!(plan.blocked[0].reason, BlockedReason::NoSafeCandidate);
882    }
883
884    #[test]
885    fn non_replica_node_is_never_a_candidate() {
886        // node-a failed and has NO replicas for the range. node-c is a healthy,
887        // caught-up member — but it is not a replica, so it is never considered.
888        let supervisor = ClusterSupervisor::default();
889        let members = membership(&["CN=node-a", "CN=node-c"]);
890        let (catalog, _orders) = catalog_with("CN=node-a", &[]);
891        let signals = FakeSignals::new(CommitWatermark::new(1, 10))
892            .with_member("CN=node-a", failed_signals())
893            .with_catch_up("CN=node-c", 9, 999);
894
895        let plan = supervisor.plan_failovers(&members, &catalog, &signals);
896        assert_eq!(plan.blocked.len(), 1, "no replica -> no safe candidate");
897        assert!(plan.promotions.is_empty());
898    }
899
900    #[test]
901    fn failed_replica_is_not_promoted() {
902        // node-a failed; node-b is a caught-up replica but is itself failed.
903        // Promoting it would just move the outage, so it is not selected.
904        let supervisor = ClusterSupervisor::default();
905        let members = membership(&["CN=node-a", "CN=node-b"]);
906        let (catalog, _orders) = catalog_with("CN=node-a", &["CN=node-b"]);
907        let signals = FakeSignals::new(CommitWatermark::new(1, 10))
908            .with_member("CN=node-a", failed_signals())
909            .with_member("CN=node-b", failed_signals())
910            .with_catch_up("CN=node-b", 1, 10);
911
912        let plan = supervisor.plan_failovers(&members, &catalog, &signals);
913        assert_eq!(plan.blocked.len(), 1);
914        assert_eq!(plan.blocked[0].reason, BlockedReason::NoSafeCandidate);
915    }
916
917    #[test]
918    fn healthiest_caught_up_candidate_is_preferred() {
919        // Both replicas are caught up, but node-c is healthier than node-b, so it
920        // wins despite node-b sorting first by identity.
921        let supervisor = ClusterSupervisor::default();
922        let members = membership(&["CN=node-a", "CN=node-b", "CN=node-c"]);
923        let (catalog, _orders) = catalog_with("CN=node-a", &["CN=node-b", "CN=node-c"]);
924        let signals = FakeSignals::new(CommitWatermark::new(1, 10))
925            .with_member("CN=node-a", failed_signals())
926            .with_member(
927                "CN=node-b",
928                MemberSignals {
929                    since_last_heartbeat: Duration::from_secs(4),
930                    replication_lag_lsn: 0,
931                    recent_errors: 0,
932                    unhealthy_for: Duration::ZERO,
933                },
934            ) // degraded-ish liveness, lower score
935            .with_member("CN=node-c", MemberSignals::healthy())
936            .with_catch_up("CN=node-b", 1, 10)
937            .with_catch_up("CN=node-c", 1, 10);
938
939        let plan = supervisor.plan_failovers(&members, &catalog, &signals);
940        assert_eq!(plan.promotions.len(), 1);
941        assert_eq!(
942            plan.promotions[0].candidate,
943            ident("CN=node-c"),
944            "healthier candidate preferred over identity tie-break",
945        );
946    }
947
948    #[test]
949    fn promoted_owner_drops_itself_from_the_replica_set() {
950        let supervisor = ClusterSupervisor::default();
951        let members = membership(&["CN=node-a", "CN=node-b", "CN=node-c"]);
952        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b", "CN=node-c"]);
953        let signals = FakeSignals::new(CommitWatermark::new(1, 10))
954            .with_member("CN=node-a", failed_signals())
955            .with_catch_up("CN=node-b", 1, 10)
956            .with_catch_up("CN=node-c", 1, 10);
957
958        supervisor.run_failovers(&members, &mut catalog, &signals);
959        let range = catalog.range(&orders, RangeId::new(1)).unwrap();
960        assert_eq!(range.owner(), &ident("CN=node-b"));
961        // node-b is no longer in its own replica set; node-c remains; the fenced
962        // old owner node-a is not re-added.
963        assert!(!range.replicas().contains(&ident("CN=node-b")));
964        assert!(range.replicas().contains(&ident("CN=node-c")));
965        assert!(!range.replicas().contains(&ident("CN=node-a")));
966    }
967
968    #[test]
969    fn assess_members_scores_every_authorized_member() {
970        let supervisor = ClusterSupervisor::default();
971        let members = membership(&["CN=node-a", "CN=node-b"]);
972        let signals = FakeSignals::new(CommitWatermark::new(1, 10))
973            .with_member("CN=node-a", failed_signals());
974
975        let scores = supervisor.assess_members(&members, &signals);
976        assert_eq!(scores.len(), 2);
977        assert_eq!(scores[&ident("CN=node-a")].class, HealthClass::Failed);
978        assert_eq!(scores[&ident("CN=node-b")].class, HealthClass::Healthy);
979    }
980}