reddb-io-server 1.12.0

RedDB server-side engine: storage, runtime, replication, MCP, AI, and the gRPC/HTTP/RedWire/PG-wire dispatchers. Re-exported by the umbrella `reddb` crate.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
//! Member health scoring and automatic range failover (issue #998, PRD #987,
//! ADR 0037).
//!
//! The **Cluster Supervisor** is the control-plane component that watches the
//! authorized members ([`MembershipCatalog`]), decides when an owner has failed,
//! and drives a *safe, fenced* range failover through the one sanctioned path —
//! the ownership transition state machine ([`super::ownership_transition`]). It
//! never edits ownership directly: every promotion it proposes is a
//! [`TransitionRequest`] that the transition machine re-validates (three-part
//! CAS + commit-watermark safety gate) before it touches the catalog.
//!
//! ## Health scoring, not a single short timeout
//!
//! A naive supervisor declares a member dead the instant one heartbeat is late.
//! That is brittle: a single dropped packet, a GC pause, or a brief network
//! hiccup triggers a needless, disruptive failover (and, under load, a *storm*
//! of them). Instead the supervisor combines four signals into a
//! [`HealthScore`]:
//!
//! * **Liveness** — time since the last heartbeat. The dominant signal, but not
//!   the only one.
//! * **Replication lag** — how far behind the range commit watermark the member
//!   is. A live-but-far-behind owner is a poor owner.
//! * **Recent errors** — observed failures in the recent window.
//! * **Grace period** — how long the member has been continuously below the
//!   failover threshold. This is the flapping damper: a member that dips and
//!   recovers inside the grace window is never failed over.
//!
//! The first three combine into a 0..=100 score (weighted, liveness-heavy);
//! the score classifies the member as [`Healthy`](HealthClass::Healthy),
//! [`Degraded`](HealthClass::Degraded), or [`Failed`](HealthClass::Failed). The
//! grace period then gates the *action*: only a `Failed` owner that has stayed
//! failed for at least the grace period is eligible for automatic failover.
//! Together the score and the grace period damp false positives and flapping
//! (acceptance criteria 1 and 4).
//!
//! ## Safe candidate selection
//!
//! When an owner is eligible for failover, the supervisor considers **only**
//! candidates that are (a) current replicas of the range, (b) still authorized
//! data members, and (c) backed by catch-up evidence that covers the range
//! commit watermark — exactly the bar the transition machine enforces. An
//! arbitrary node, a witness, or a replica that has not caught up is never a
//! promotion target (acceptance criterion 2). Among the safe candidates the
//! supervisor prefers the healthiest, breaking ties by stable identity order so
//! the plan is deterministic.
//!
//! The selected promotion is a [`TransitionKind::Promote`] request; activating
//! it bumps the ownership epoch, which fences the failed owner — any write it
//! still attempts under the old epoch is rejected by
//! [`admit_public_write`](super::ownership::ShardOwnershipCatalog::admit_public_write)
//! (acceptance criterion 3).
//!
//! ## Purity
//!
//! All state the supervisor needs from the running cluster — heartbeat,
//! lag, error counts, grace tracking, per-range commit watermarks, and
//! per-candidate catch-up progress — is read through the [`ClusterSignals`]
//! trait, injected by the caller. The supervisor itself is a pure policy over
//! the membership and ownership catalogs plus those signals, so the whole
//! scoring/selection/fencing story is exercised deterministically with a
//! scripted fake — no clock, no network, no engine.

use std::collections::BTreeMap;
use std::time::Duration;

use super::identity::NodeIdentity;
use super::membership::MembershipCatalog;
use super::ownership::{CollectionId, RangeId, ShardOwnershipCatalog};
use super::ownership_transition::{
    run_transition, CatchUpEvidence, CommitWatermark, TransitionError, TransitionKind,
    TransitionOutcome, TransitionRequest,
};

/// Raw, point-in-time health signals for one member, read from the running
/// cluster through [`ClusterSignals::member_signals`]. The supervisor turns
/// these into a [`HealthScore`]; it owns no clock or counters itself.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MemberSignals {
    /// Time since the member's last heartbeat was received (liveness).
    pub since_last_heartbeat: Duration,
    /// How many WAL LSNs the member trails the range commit watermark by, as an
    /// aggregate liveness-of-replication signal. Zero means fully caught up.
    pub replication_lag_lsn: u64,
    /// Observed errors from the member in the recent observation window.
    pub recent_errors: u32,
    /// How long the member has been *continuously* below the failover
    /// threshold. Zero for a healthy member; the caller resets it the moment the
    /// member recovers. This is the grace-period input that damps flapping —
    /// the supervisor refuses to fail over until it reaches the policy's grace
    /// period.
    pub unhealthy_for: Duration,
}

impl MemberSignals {
    /// A perfectly healthy member: fresh heartbeat, no lag, no errors, never
    /// unhealthy. Handy as a test/observation baseline.
    pub fn healthy() -> Self {
        Self {
            since_last_heartbeat: Duration::ZERO,
            replication_lag_lsn: 0,
            recent_errors: 0,
            unhealthy_for: Duration::ZERO,
        }
    }
}

/// How a member's [`HealthScore`] classifies against the policy thresholds.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HealthClass {
    /// Above the degraded threshold — a fully serving member.
    Healthy,
    /// Below the degraded threshold but above the failover threshold — observed
    /// as impaired, but **not** failed over. Surfacing this is what lets an
    /// operator see trouble building before it becomes an outage.
    Degraded,
    /// At or below the failover threshold — a failover candidate *once the grace
    /// period has elapsed*.
    Failed,
}

/// A member's combined health, with the per-axis sub-scores kept visible so an
/// operator (or a test) can see *why* a member scored as it did rather than
/// just the verdict.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct HealthScore {
    /// Combined 0..=100 score (higher is healthier).
    pub overall: u8,
    /// Liveness sub-score from the heartbeat age (0..=100).
    pub liveness: u8,
    /// Replication-lag sub-score (0..=100).
    pub lag: u8,
    /// Recent-error sub-score (0..=100).
    pub errors: u8,
    /// The classification the combined score falls into.
    pub class: HealthClass,
}

impl HealthScore {
    pub fn is_healthy(&self) -> bool {
        self.class == HealthClass::Healthy
    }

    pub fn is_failed(&self) -> bool {
        self.class == HealthClass::Failed
    }
}

/// The tunables that turn raw signals into a [`HealthScore`] and gate failover.
///
/// The defaults ([`HealthPolicy::default`]) are deliberately conservative:
/// generous enough that an ordinary hiccup does not trip a failover, with a
/// grace period long enough to ride out a transient blip.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct HealthPolicy {
    /// Heartbeat age at or beyond which the liveness sub-score bottoms out at 0.
    pub heartbeat_timeout: Duration,
    /// Replication lag (LSNs) at or beyond which the lag sub-score bottoms out.
    pub max_replication_lag: u64,
    /// Recent-error count at or beyond which the error sub-score bottoms out.
    pub max_recent_errors: u32,
    /// Score (inclusive) at or below which a member is [`Failed`](HealthClass::Failed).
    pub failover_threshold: u8,
    /// Score (inclusive) at or below which a member is at least
    /// [`Degraded`](HealthClass::Degraded). Must be `>= failover_threshold`.
    pub degraded_threshold: u8,
    /// How long a member must stay continuously `Failed` before the supervisor
    /// will fail it over. The flapping damper: a shorter blip never triggers a
    /// transition.
    pub grace_period: Duration,
}

impl Default for HealthPolicy {
    fn default() -> Self {
        Self {
            heartbeat_timeout: Duration::from_secs(10),
            max_replication_lag: 10_000,
            max_recent_errors: 20,
            failover_threshold: 30,
            degraded_threshold: 70,
            grace_period: Duration::from_secs(30),
        }
    }
}

/// Linear sub-score in 0..=100: full marks at `0`, zero at/after `limit`.
fn ramp_down(value: f64, limit: f64) -> u8 {
    if limit <= 0.0 {
        // No tolerance configured: anything non-zero is a total failure on this
        // axis, zero is perfect.
        return if value <= 0.0 { 100 } else { 0 };
    }
    let clamped = value.min(limit);
    (100.0 * (1.0 - clamped / limit)).round() as u8
}

impl HealthPolicy {
    /// Combine raw `signals` into a [`HealthScore`] under this policy.
    ///
    /// The three serving signals fold into the overall score with a
    /// liveness-heavy weighting (liveness 70%, lag 20%, errors 10%): a member
    /// whose heartbeat has fully lapsed must be able to reach the failover
    /// threshold on liveness alone — a crashed node stops heartbeating but its
    /// *last-known* lag and error counts may still look fine, so trusting them
    /// would wedge failover shut. At the same time a live owner that is far
    /// behind or erroring is penalised rather than trusted blindly, and a
    /// *brief* heartbeat gap with good lag/errors stays out of the failover band
    /// (which a single short fixed timeout could not express). The grace-period
    /// signal (`unhealthy_for`) is *not* part of the score — it gates the
    /// failover action in [`failover_eligible`](Self::failover_eligible).
    pub fn evaluate(&self, signals: &MemberSignals) -> HealthScore {
        let liveness = ramp_down(
            signals.since_last_heartbeat.as_secs_f64(),
            self.heartbeat_timeout.as_secs_f64(),
        );
        let lag = ramp_down(
            signals.replication_lag_lsn as f64,
            self.max_replication_lag as f64,
        );
        let errors = ramp_down(signals.recent_errors as f64, self.max_recent_errors as f64);

        let overall =
            (liveness as f64 * 0.7 + lag as f64 * 0.2 + errors as f64 * 0.1).round() as u8;
        let class = if overall <= self.failover_threshold {
            HealthClass::Failed
        } else if overall <= self.degraded_threshold {
            HealthClass::Degraded
        } else {
            HealthClass::Healthy
        };

        HealthScore {
            overall,
            liveness,
            lag,
            errors,
            class,
        }
    }

    /// Whether a member with this `score` and these `signals` is eligible for
    /// automatic failover: it must be [`Failed`](HealthClass::Failed) **and**
    /// have stayed unhealthy for at least the grace period. A `Failed` member
    /// still inside the grace window is held back — the flapping damper.
    pub fn failover_eligible(&self, score: &HealthScore, signals: &MemberSignals) -> bool {
        score.is_failed() && signals.unhealthy_for >= self.grace_period
    }
}

/// The cluster state the supervisor reads but does not own: per-member health
/// signals, per-range commit watermarks, and per-candidate catch-up evidence.
///
/// Production backs this onto the heartbeat tracker, the replica registry, and
/// the per-range stream progress (issue #992); tests back it onto a scripted
/// fake. Keeping it behind a trait is what makes the supervisor a pure policy.
pub trait ClusterSignals {
    /// Current raw health signals for `member`.
    fn member_signals(&self, member: &NodeIdentity) -> MemberSignals;

    /// The range commit watermark a promotion candidate must cover for
    /// `(collection, range_id)` — the highest `(term, lsn)` known durable under
    /// the range's commit policy.
    fn commit_watermark(&self, collection: &CollectionId, range_id: RangeId) -> CommitWatermark;

    /// The catch-up evidence the supervisor has for `candidate` on the range, or
    /// `None` if the candidate's progress is unknown (in which case it cannot be
    /// promoted — fail closed).
    fn catch_up(
        &self,
        collection: &CollectionId,
        range_id: RangeId,
        candidate: &NodeIdentity,
    ) -> Option<CatchUpEvidence>;
}

/// A safe, validated promotion the supervisor proposes for one range: the failed
/// owner, the chosen caught-up candidate, the owner's health at decision time,
/// and the [`TransitionRequest`] (already carrying the three-part CAS,
/// watermark, and catch-up evidence) to run through the transition machine.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PlannedPromotion {
    pub collection: CollectionId,
    pub range_id: RangeId,
    pub failed_owner: NodeIdentity,
    pub candidate: NodeIdentity,
    pub candidate_score: HealthScore,
    pub owner_score: HealthScore,
    pub request: TransitionRequest,
}

/// Why a failing owner's range could **not** be failed over. Surfaced rather
/// than silently skipped, so an operator can see a range that needs attention.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BlockedReason {
    /// The owner is failing but no replica is a safe candidate — none is an
    /// authorized data member with catch-up evidence covering the commit
    /// watermark. Failing over here could lose committed writes.
    NoSafeCandidate,
}

/// A failing owner's range with no safe failover target.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BlockedFailover {
    pub collection: CollectionId,
    pub range_id: RangeId,
    pub failed_owner: NodeIdentity,
    pub owner_score: HealthScore,
    pub reason: BlockedReason,
}

/// The supervisor's decision for one scan: the safe promotions to run and the
/// failing ranges with no safe target. A cluster with all owners healthy yields
/// an empty plan ([`is_empty`](Self::is_empty)) — the healthy no-op.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct FailoverPlan {
    /// Safe, ready-to-run promotions, in `(collection, range_id)` order.
    pub promotions: Vec<PlannedPromotion>,
    /// Failing ranges that have no safe candidate.
    pub blocked: Vec<BlockedFailover>,
}

impl FailoverPlan {
    /// Nothing to do — every owner is healthy (or degraded-but-not-failed, or
    /// within its grace period). The healthy no-op the supervisor must produce
    /// for a stable cluster.
    pub fn is_empty(&self) -> bool {
        self.promotions.is_empty() && self.blocked.is_empty()
    }
}

/// The cluster supervisor: health scoring + automatic range failover planning.
///
/// Holds only the [`HealthPolicy`]; all live state is read through
/// [`ClusterSignals`] at scan time, so one supervisor instance serves the whole
/// cluster lifetime.
#[derive(Debug, Clone, Default)]
pub struct ClusterSupervisor {
    policy: HealthPolicy,
}

impl ClusterSupervisor {
    /// A supervisor with the given health policy.
    pub fn new(policy: HealthPolicy) -> Self {
        Self { policy }
    }

    pub fn policy(&self) -> &HealthPolicy {
        &self.policy
    }

    /// Score a single member's health under the policy. The building block of
    /// degraded-member detection: an operator surface calls this for every
    /// authorized member to render a health view.
    pub fn assess(&self, signals: &MemberSignals) -> HealthScore {
        self.policy.evaluate(signals)
    }

    /// Score every authorized member of `membership`, in stable identity order.
    /// Includes healthy, degraded, and failed members alike — the input to a
    /// cluster health dashboard.
    pub fn assess_members(
        &self,
        membership: &MembershipCatalog,
        signals: &impl ClusterSignals,
    ) -> BTreeMap<NodeIdentity, HealthScore> {
        membership
            .members()
            .map(|m| {
                let id = m.identity().clone();
                let score = self.policy.evaluate(&signals.member_signals(&id));
                (id, score)
            })
            .collect()
    }

    /// Plan automatic failovers across the whole ownership catalog **without**
    /// mutating it. For each range whose owner is failover-eligible (Failed and
    /// past the grace period), pick the safest caught-up replica candidate and
    /// produce a [`PlannedPromotion`]; if no replica is safe, record a
    /// [`BlockedFailover`]. Owners that are healthy, merely degraded, or still
    /// inside their grace period produce nothing.
    pub fn plan_failovers(
        &self,
        membership: &MembershipCatalog,
        ownership: &ShardOwnershipCatalog,
        signals: &impl ClusterSignals,
    ) -> FailoverPlan {
        // entries() yields ranges in (collection, range_id) order, so the plan
        // is deterministic.
        let mut plan = FailoverPlan::default();

        for range in ownership.entries() {
            let owner = range.owner().clone();
            let owner_signals = signals.member_signals(&owner);
            let owner_score = self.policy.evaluate(&owner_signals);

            // Healthy or degraded-but-not-failed owners are left alone; a failed
            // owner still inside its grace period is held back (flapping damper).
            if !self.policy.failover_eligible(&owner_score, &owner_signals) {
                continue;
            }

            let collection = range.collection().clone();
            let range_id = range.range_id();
            let watermark = signals.commit_watermark(&collection, range_id);

            // Consider only safe candidates: a current replica, still an
            // authorized data member, not itself failed, with catch-up evidence
            // covering the commit watermark.
            let mut best: Option<(HealthScore, CatchUpEvidence, NodeIdentity)> = None;
            for candidate in range.replicas() {
                if !membership
                    .member(candidate)
                    .is_some_and(|m| m.kind().holds_data())
                {
                    continue;
                }
                let cand_score = self.policy.evaluate(&signals.member_signals(candidate));
                if cand_score.is_failed() {
                    // Promoting a failed replica just moves the outage; skip it.
                    continue;
                }
                let Some(evidence) = signals.catch_up(&collection, range_id, candidate) else {
                    continue;
                };
                if !evidence.covers(watermark) {
                    // Replica is a copy of the range but has not caught up to the
                    // commit watermark — promoting it could lose committed
                    // writes. This is the unsafe-candidate rejection.
                    continue;
                }

                // Prefer the healthiest candidate; break ties by stable identity
                // order for determinism.
                let better = match &best {
                    None => true,
                    Some((best_score, _, best_id)) => {
                        cand_score.overall > best_score.overall
                            || (cand_score.overall == best_score.overall && candidate < best_id)
                    }
                };
                if better {
                    best = Some((cand_score, evidence, candidate.clone()));
                }
            }

            match best {
                Some((candidate_score, evidence, candidate)) => {
                    let request = TransitionRequest::new(
                        TransitionKind::Promote,
                        collection.clone(),
                        range_id,
                        owner.clone(),
                        range.epoch(),
                        range.version(),
                        candidate.clone(),
                        watermark,
                    )
                    .with_evidence(evidence)
                    .with_replicas(remaining_replicas(range.replicas(), &candidate));
                    plan.promotions.push(PlannedPromotion {
                        collection,
                        range_id,
                        failed_owner: owner,
                        candidate,
                        candidate_score,
                        owner_score,
                        request,
                    });
                }
                None => plan.blocked.push(BlockedFailover {
                    collection,
                    range_id,
                    failed_owner: owner,
                    owner_score,
                    reason: BlockedReason::NoSafeCandidate,
                }),
            }
        }

        plan
    }

    /// Plan failovers and immediately run the safe promotions through the
    /// ownership transition machine, fencing each failed owner via the epoch
    /// bump. Returns the activated [`TransitionOutcome`]s and the surviving
    /// [`FailoverPlan`] (whose `blocked` entries still need attention; its
    /// `promotions` are the requests that were run).
    ///
    /// Each promotion is an independent catalog entry, so running them in
    /// sequence never invalidates another's CAS. A promotion whose CAS lost a
    /// race (the catalog moved between planning and activation) surfaces as a
    /// [`TransitionError`] in the returned vector rather than aborting the rest.
    pub fn run_failovers(
        &self,
        membership: &MembershipCatalog,
        ownership: &mut ShardOwnershipCatalog,
        signals: &impl ClusterSignals,
    ) -> (
        Vec<Result<TransitionOutcome, TransitionError>>,
        FailoverPlan,
    ) {
        let plan = self.plan_failovers(membership, ownership, signals);
        let outcomes = plan
            .promotions
            .iter()
            .map(|p| run_transition(ownership, &p.request))
            .collect();
        (outcomes, plan)
    }
}

/// The replica set the new owner carries after promotion: the old replica set
/// minus the promoted candidate (it becomes owner, not its own replica). The
/// failed owner is intentionally *not* added back as a replica — it is fenced
/// and presumed down; the rebalancer re-replicates once it returns or is
/// replaced.
fn remaining_replicas(replicas: &[NodeIdentity], promoted: &NodeIdentity) -> Vec<NodeIdentity> {
    replicas
        .iter()
        .filter(|r| *r != promoted)
        .cloned()
        .collect()
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::cluster::membership::{ClusterId, ClusterMember, MemberKind};
    use crate::cluster::ownership::{CatalogVersion, OwnershipEpoch};
    use crate::cluster::ownership::{
        PlacementMetadata, RangeBounds, RangeOwnership, RangeRole, RangeWriteReject, ShardKeyMode,
    };
    use std::collections::HashMap;

    fn ident(cn: &str) -> NodeIdentity {
        NodeIdentity::from_certificate_subject(cn).unwrap()
    }

    fn collection(name: &str) -> CollectionId {
        CollectionId::new(name).unwrap()
    }

    fn data_member(cn: &str) -> ClusterMember {
        ClusterMember::joined_empty(ident(cn), MemberKind::Data)
    }

    fn membership(members: &[&str]) -> MembershipCatalog {
        MembershipCatalog::new(
            ClusterId::new("cluster-x").unwrap(),
            members.iter().map(|m| data_member(m)),
        )
    }

    /// A catalog with one full-keyspace range `orders/1` owned by `owner` with
    /// `replicas`, at the initial epoch/version.
    fn catalog_with(owner: &str, replicas: &[&str]) -> (ShardOwnershipCatalog, CollectionId) {
        let orders = collection("orders");
        let mut catalog = ShardOwnershipCatalog::new();
        catalog
            .apply_update(RangeOwnership::establish(
                orders.clone(),
                RangeId::new(1),
                ShardKeyMode::Hash,
                RangeBounds::full(),
                ident(owner),
                replicas.iter().map(|r| ident(r)).collect::<Vec<_>>(),
                PlacementMetadata::with_replication_factor(3),
            ))
            .unwrap();
        (catalog, orders)
    }

    /// A scripted [`ClusterSignals`]: per-member signals, one shared watermark,
    /// and per-(range,candidate) catch-up evidence keyed by candidate CN.
    struct FakeSignals {
        members: HashMap<NodeIdentity, MemberSignals>,
        watermark: CommitWatermark,
        catch_up: HashMap<NodeIdentity, CatchUpEvidence>,
    }

    impl FakeSignals {
        fn new(watermark: CommitWatermark) -> Self {
            Self {
                members: HashMap::new(),
                watermark,
                catch_up: HashMap::new(),
            }
        }

        fn with_member(mut self, cn: &str, signals: MemberSignals) -> Self {
            self.members.insert(ident(cn), signals);
            self
        }

        fn with_catch_up(mut self, cn: &str, applied_term: u64, applied_lsn: u64) -> Self {
            self.catch_up.insert(
                ident(cn),
                CatchUpEvidence::new(ident(cn), applied_term, applied_lsn),
            );
            self
        }
    }

    impl ClusterSignals for FakeSignals {
        fn member_signals(&self, member: &NodeIdentity) -> MemberSignals {
            self.members
                .get(member)
                .copied()
                .unwrap_or_else(MemberSignals::healthy)
        }

        fn commit_watermark(
            &self,
            _collection: &CollectionId,
            _range_id: RangeId,
        ) -> CommitWatermark {
            self.watermark
        }

        fn catch_up(
            &self,
            _collection: &CollectionId,
            _range_id: RangeId,
            candidate: &NodeIdentity,
        ) -> Option<CatchUpEvidence> {
            self.catch_up.get(candidate).cloned()
        }
    }

    /// Signals for a failed-and-past-grace owner: no heartbeat for a long time,
    /// well over the default grace period.
    fn failed_signals() -> MemberSignals {
        MemberSignals {
            since_last_heartbeat: Duration::from_secs(60),
            replication_lag_lsn: 50_000,
            recent_errors: 100,
            unhealthy_for: Duration::from_secs(60),
        }
    }

    // --- health scoring ---------------------------------------------------

    #[test]
    fn fresh_member_scores_perfectly_healthy() {
        let policy = HealthPolicy::default();
        let score = policy.evaluate(&MemberSignals::healthy());
        assert_eq!(score.overall, 100);
        assert_eq!(score.class, HealthClass::Healthy);
    }

    #[test]
    fn score_combines_signals_not_just_a_timeout() {
        // A member with a *brief* heartbeat gap but good lag and no errors should
        // not be treated as dead the way a short fixed timeout would. Its liveness
        // sub-score dips, but lag/errors keep the overall in the Healthy band.
        let policy = HealthPolicy::default();
        let signals = MemberSignals {
            since_last_heartbeat: Duration::from_secs(2), // 1/5 of the 10s timeout
            replication_lag_lsn: 0,
            recent_errors: 0,
            unhealthy_for: Duration::ZERO,
        };
        let score = policy.evaluate(&signals);
        assert_eq!(score.liveness, 80, "heartbeat at 1/5 of the timeout");
        assert_eq!(score.lag, 100);
        assert_eq!(score.errors, 100);
        // overall = 0.7*80 + 0.2*100 + 0.1*100 = 56 + 20 + 10 = 86 -> Healthy.
        // A 2s fixed timeout would have declared this member dead.
        assert_eq!(score.overall, 86);
        assert_eq!(score.class, HealthClass::Healthy);
    }

    #[test]
    fn lag_and_errors_pull_a_live_member_into_degraded() {
        // A member that is heartbeating fine but far behind and erroring is
        // penalised — a single timeout would have called it perfectly healthy.
        let policy = HealthPolicy::default();
        let signals = MemberSignals {
            since_last_heartbeat: Duration::ZERO,
            replication_lag_lsn: 10_000, // at the cap -> lag sub-score 0
            recent_errors: 20,           // at the cap -> error sub-score 0
            unhealthy_for: Duration::ZERO,
        };
        let score = policy.evaluate(&signals);
        // overall = 0.7*100 + 0.2*0 + 0.1*0 = 70 -> Degraded (<= degraded threshold).
        assert_eq!(score.overall, 70);
        assert_eq!(score.class, HealthClass::Degraded);
    }

    #[test]
    fn dead_heartbeat_alone_reaches_failed() {
        // A crashed node stops heartbeating; even if its last-known lag/errors
        // look perfect, liveness alone must carry it to the failover band — else
        // the most common failure (a clean crash) would never fail over.
        let policy = HealthPolicy::default();
        let signals = MemberSignals {
            since_last_heartbeat: Duration::from_secs(30),
            replication_lag_lsn: 0,
            recent_errors: 0,
            unhealthy_for: Duration::from_secs(30),
        };
        let score = policy.evaluate(&signals);
        assert_eq!(score.liveness, 0);
        // overall = 0.7*0 + 0.2*100 + 0.1*100 = 30 -> Failed (<= failover threshold).
        assert_eq!(score.overall, 30);
        assert_eq!(score.class, HealthClass::Failed);
    }

    #[test]
    fn totally_unreachable_member_is_failed() {
        // A member we cannot reach reports a dead heartbeat *and* growing lag and
        // errors — every axis bottoms out, so it lands well under the failover
        // threshold.
        let policy = HealthPolicy::default();
        let signals = MemberSignals {
            since_last_heartbeat: Duration::from_secs(30),
            replication_lag_lsn: 50_000,
            recent_errors: 100,
            unhealthy_for: Duration::from_secs(30),
        };
        let score = policy.evaluate(&signals);
        assert_eq!(score.overall, 0);
        assert_eq!(score.class, HealthClass::Failed);
    }

    // --- failover planning: the five acceptance scenarios -----------------

    #[test]
    fn healthy_cluster_is_a_no_op() {
        let supervisor = ClusterSupervisor::default();
        let members = membership(&["CN=node-a", "CN=node-b", "CN=node-c"]);
        let (catalog, _orders) = catalog_with("CN=node-a", &["CN=node-b", "CN=node-c"]);
        // All members healthy by default.
        let signals = FakeSignals::new(CommitWatermark::new(1, 10));

        let plan = supervisor.plan_failovers(&members, &catalog, &signals);
        assert!(plan.is_empty(), "no failover when every owner is healthy");
    }

    #[test]
    fn degraded_owner_is_detected_but_not_failed_over() {
        // node-a is degraded (live, but lagging+erroring) — observable, but the
        // supervisor must not move ownership for a merely-degraded owner.
        let supervisor = ClusterSupervisor::default();
        let members = membership(&["CN=node-a", "CN=node-b"]);
        let (catalog, _orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        let signals = FakeSignals::new(CommitWatermark::new(1, 10)).with_member(
            "CN=node-a",
            MemberSignals {
                since_last_heartbeat: Duration::ZERO,
                replication_lag_lsn: 10_000,
                recent_errors: 20,
                unhealthy_for: Duration::ZERO,
            },
        );

        let score = supervisor.assess(&signals.member_signals(&ident("CN=node-a")));
        assert_eq!(score.class, HealthClass::Degraded, "detected as degraded");

        let plan = supervisor.plan_failovers(&members, &catalog, &signals);
        assert!(plan.is_empty(), "a degraded owner is not failed over");
    }

    #[test]
    fn safe_candidate_is_promoted_and_old_owner_is_fenced() {
        let supervisor = ClusterSupervisor::default();
        let members = membership(&["CN=node-a", "CN=node-b", "CN=node-c"]);
        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b", "CN=node-c"]);
        let signals = FakeSignals::new(CommitWatermark::new(1, 10))
            .with_member("CN=node-a", failed_signals())
            // node-b is healthy and fully caught up; node-c is caught up too but
            // we expect node-b chosen on identity tie-break.
            .with_catch_up("CN=node-b", 1, 10)
            .with_catch_up("CN=node-c", 1, 10);

        let (outcomes, plan) = supervisor.run_failovers(&members, &mut catalog, &signals);
        assert_eq!(plan.promotions.len(), 1);
        assert!(plan.blocked.is_empty());
        let promotion = &plan.promotions[0];
        assert_eq!(promotion.failed_owner, ident("CN=node-a"));
        assert_eq!(
            promotion.candidate,
            ident("CN=node-b"),
            "healthiest, tie -> lowest id"
        );

        let outcome = outcomes[0].as_ref().expect("promotion should activate");
        assert_eq!(outcome.kind, TransitionKind::Promote);
        assert!(
            outcome.fenced_old_owner(),
            "epoch bumped to fence old owner"
        );
        assert_eq!(outcome.new_owner, ident("CN=node-b"));

        // The catalog now makes node-b the owner at the bumped epoch, and the old
        // owner node-a is fenced from public writes under the old epoch.
        let range = catalog.range(&orders, RangeId::new(1)).unwrap();
        assert_eq!(range.owner(), &ident("CN=node-b"));
        assert_eq!(range.role_of(&ident("CN=node-b")), RangeRole::Owner);
        let err = catalog
            .admit_public_write(
                &ident("CN=node-a"),
                &orders,
                b"k",
                OwnershipEpoch::initial(),
            )
            .unwrap_err();
        assert!(matches!(
            err,
            RangeWriteReject::NotOwner { .. } | RangeWriteReject::StaleEpoch { .. }
        ));
    }

    #[test]
    fn unsafe_candidate_behind_watermark_is_rejected() {
        // node-a failed; its only replica node-b is a copy of the range but has
        // NOT caught up to the commit watermark (term 2 lsn 50 vs applied 2/49).
        // Promoting it could lose committed writes, so failover is blocked and
        // the catalog is untouched.
        let supervisor = ClusterSupervisor::default();
        let members = membership(&["CN=node-a", "CN=node-b"]);
        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        let signals = FakeSignals::new(CommitWatermark::new(2, 50))
            .with_member("CN=node-a", failed_signals())
            .with_catch_up("CN=node-b", 2, 49); // one LSN short

        let (outcomes, plan) = supervisor.run_failovers(&members, &mut catalog, &signals);
        assert!(plan.promotions.is_empty(), "no safe promotion");
        assert!(outcomes.is_empty());
        assert_eq!(plan.blocked.len(), 1);
        assert_eq!(plan.blocked[0].reason, BlockedReason::NoSafeCandidate);
        assert_eq!(plan.blocked[0].failed_owner, ident("CN=node-a"));

        // Catalog is unchanged — node-a still owner at the initial epoch.
        let range = catalog.range(&orders, RangeId::new(1)).unwrap();
        assert_eq!(range.owner(), &ident("CN=node-a"));
        assert_eq!(range.epoch(), OwnershipEpoch::initial());
        assert_eq!(range.version(), CatalogVersion::initial());
    }

    #[test]
    fn flapping_owner_within_grace_period_is_not_failed_over() {
        // node-a's score is Failed, but it has only been unhealthy for 2s — well
        // inside the default 30s grace period. A flap must not move ownership.
        let supervisor = ClusterSupervisor::default();
        let members = membership(&["CN=node-a", "CN=node-b"]);
        let (catalog, _orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        let signals = FakeSignals::new(CommitWatermark::new(1, 10))
            .with_member(
                "CN=node-a",
                MemberSignals {
                    since_last_heartbeat: Duration::from_secs(30),
                    replication_lag_lsn: 50_000,
                    recent_errors: 100,
                    unhealthy_for: Duration::from_secs(2), // inside grace
                },
            )
            .with_catch_up("CN=node-b", 1, 10);

        // The owner *is* scored Failed...
        let score = supervisor.assess(&signals.member_signals(&ident("CN=node-a")));
        assert_eq!(score.class, HealthClass::Failed);
        // ...but the grace period holds the failover back.
        let plan = supervisor.plan_failovers(&members, &catalog, &signals);
        assert!(plan.is_empty(), "flap inside grace period is damped");
    }

    #[test]
    fn unknown_candidate_progress_blocks_failover() {
        // node-a failed; node-b is a replica and a member, but the supervisor has
        // no catch-up evidence for it. Fail closed: blocked, not promoted.
        let supervisor = ClusterSupervisor::default();
        let members = membership(&["CN=node-a", "CN=node-b"]);
        let (catalog, _orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        let signals = FakeSignals::new(CommitWatermark::new(1, 10))
            .with_member("CN=node-a", failed_signals());

        let plan = supervisor.plan_failovers(&members, &catalog, &signals);
        assert_eq!(plan.blocked.len(), 1);
        assert_eq!(plan.blocked[0].reason, BlockedReason::NoSafeCandidate);
    }

    #[test]
    fn non_replica_node_is_never_a_candidate() {
        // node-a failed and has NO replicas for the range. node-c is a healthy,
        // caught-up member — but it is not a replica, so it is never considered.
        let supervisor = ClusterSupervisor::default();
        let members = membership(&["CN=node-a", "CN=node-c"]);
        let (catalog, _orders) = catalog_with("CN=node-a", &[]);
        let signals = FakeSignals::new(CommitWatermark::new(1, 10))
            .with_member("CN=node-a", failed_signals())
            .with_catch_up("CN=node-c", 9, 999);

        let plan = supervisor.plan_failovers(&members, &catalog, &signals);
        assert_eq!(plan.blocked.len(), 1, "no replica -> no safe candidate");
        assert!(plan.promotions.is_empty());
    }

    #[test]
    fn failed_replica_is_not_promoted() {
        // node-a failed; node-b is a caught-up replica but is itself failed.
        // Promoting it would just move the outage, so it is not selected.
        let supervisor = ClusterSupervisor::default();
        let members = membership(&["CN=node-a", "CN=node-b"]);
        let (catalog, _orders) = catalog_with("CN=node-a", &["CN=node-b"]);
        let signals = FakeSignals::new(CommitWatermark::new(1, 10))
            .with_member("CN=node-a", failed_signals())
            .with_member("CN=node-b", failed_signals())
            .with_catch_up("CN=node-b", 1, 10);

        let plan = supervisor.plan_failovers(&members, &catalog, &signals);
        assert_eq!(plan.blocked.len(), 1);
        assert_eq!(plan.blocked[0].reason, BlockedReason::NoSafeCandidate);
    }

    #[test]
    fn healthiest_caught_up_candidate_is_preferred() {
        // Both replicas are caught up, but node-c is healthier than node-b, so it
        // wins despite node-b sorting first by identity.
        let supervisor = ClusterSupervisor::default();
        let members = membership(&["CN=node-a", "CN=node-b", "CN=node-c"]);
        let (catalog, _orders) = catalog_with("CN=node-a", &["CN=node-b", "CN=node-c"]);
        let signals = FakeSignals::new(CommitWatermark::new(1, 10))
            .with_member("CN=node-a", failed_signals())
            .with_member(
                "CN=node-b",
                MemberSignals {
                    since_last_heartbeat: Duration::from_secs(4),
                    replication_lag_lsn: 0,
                    recent_errors: 0,
                    unhealthy_for: Duration::ZERO,
                },
            ) // degraded-ish liveness, lower score
            .with_member("CN=node-c", MemberSignals::healthy())
            .with_catch_up("CN=node-b", 1, 10)
            .with_catch_up("CN=node-c", 1, 10);

        let plan = supervisor.plan_failovers(&members, &catalog, &signals);
        assert_eq!(plan.promotions.len(), 1);
        assert_eq!(
            plan.promotions[0].candidate,
            ident("CN=node-c"),
            "healthier candidate preferred over identity tie-break",
        );
    }

    #[test]
    fn promoted_owner_drops_itself_from_the_replica_set() {
        let supervisor = ClusterSupervisor::default();
        let members = membership(&["CN=node-a", "CN=node-b", "CN=node-c"]);
        let (mut catalog, orders) = catalog_with("CN=node-a", &["CN=node-b", "CN=node-c"]);
        let signals = FakeSignals::new(CommitWatermark::new(1, 10))
            .with_member("CN=node-a", failed_signals())
            .with_catch_up("CN=node-b", 1, 10)
            .with_catch_up("CN=node-c", 1, 10);

        supervisor.run_failovers(&members, &mut catalog, &signals);
        let range = catalog.range(&orders, RangeId::new(1)).unwrap();
        assert_eq!(range.owner(), &ident("CN=node-b"));
        // node-b is no longer in its own replica set; node-c remains; the fenced
        // old owner node-a is not re-added.
        assert!(!range.replicas().contains(&ident("CN=node-b")));
        assert!(range.replicas().contains(&ident("CN=node-c")));
        assert!(!range.replicas().contains(&ident("CN=node-a")));
    }

    #[test]
    fn assess_members_scores_every_authorized_member() {
        let supervisor = ClusterSupervisor::default();
        let members = membership(&["CN=node-a", "CN=node-b"]);
        let signals = FakeSignals::new(CommitWatermark::new(1, 10))
            .with_member("CN=node-a", failed_signals());

        let scores = supervisor.assess_members(&members, &signals);
        assert_eq!(scores.len(), 2);
        assert_eq!(scores[&ident("CN=node-a")].class, HealthClass::Failed);
        assert_eq!(scores[&ident("CN=node-b")].class, HealthClass::Healthy);
    }
}