Skip to main content

net/adapter/net/compute/
group_coord.rs

1//! Group coordinator — shared coordination logic for daemon groups.
2//!
3//! Extracted from `ReplicaGroup` so that both `ReplicaGroup` and `ForkGroup`
4//! can reuse the same load balancing, health tracking, member management,
5//! and scaling mechanics without duplication.
6
7use std::collections::{HashMap, HashSet};
8
9use crate::adapter::net::behavior::capability::CapabilityFilter;
10use crate::adapter::net::behavior::loadbalance::{
11    Endpoint, HealthStatus, LoadBalancer, RequestContext, Strategy,
12};
13use crate::adapter::net::behavior::metadata::NodeId;
14use crate::adapter::net::behavior::placement::{Artifact, PlacementFilter, TieBreakContext};
15use crate::adapter::net::compute::daemon::DaemonError;
16use crate::adapter::net::compute::scheduler::{
17    PlacementDecision, PlacementReason, Scheduler, SchedulerError,
18};
19
20// ── Member info ──────────────────────────────────────────────────────────────
21
22/// Per-member metadata within a group.
23#[derive(Debug, Clone)]
24pub struct MemberInfo {
25    /// Member index (0-based).
26    pub index: u8,
27    /// The member's origin_hash (from its keypair).
28    pub origin_hash: u64,
29    /// Node where this member is placed.
30    pub node_id: u64,
31    /// The member's entity ID bytes (used as LoadBalancer NodeId).
32    pub entity_id_bytes: NodeId,
33    /// Whether this member is currently healthy.
34    pub healthy: bool,
35}
36
37// ── Group health ─────────────────────────────────────────────────────────────
38
/// Roll-up health state for an entire daemon group.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum GroupHealth {
    /// Every member is healthy.
    Healthy,
    /// At least one member is healthy and at least one is not.
    Degraded {
        /// How many members are healthy.
        healthy: u8,
        /// How many members the group has, healthy or not.
        total: u8,
    },
    /// No member is healthy (also the state of a group with no members).
    Dead,
}
54
55// ── Errors ───────────────────────────────────────────────────────────────────
56
/// Failure modes of group-level operations.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum GroupError {
    /// Routing found no healthy member to deliver to.
    NoHealthyMember,
    /// A member could not be placed; carries the reason.
    PlacementFailed(String),
    /// A registry call failed; carries the underlying message.
    RegistryFailed(String),
    /// The group was configured with invalid parameters.
    InvalidConfig(String),
}

impl std::fmt::Display for GroupError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        use GroupError::*;

        match self {
            NoHealthyMember => write!(f, "no healthy member available"),
            PlacementFailed(reason) => write!(f, "placement failed: {}", reason),
            RegistryFailed(reason) => write!(f, "registry operation failed: {}", reason),
            InvalidConfig(reason) => write!(f, "invalid config: {}", reason),
        }
    }
}

impl std::error::Error for GroupError {}
82
83impl From<SchedulerError> for GroupError {
84    fn from(e: SchedulerError) -> Self {
85        Self::PlacementFailed(e.to_string())
86    }
87}
88
89impl From<DaemonError> for GroupError {
90    fn from(e: DaemonError) -> Self {
91        Self::RegistryFailed(e.to_string())
92    }
93}
94
95// ── Group coordinator ────────────────────────────────────────────────────────
96
97/// Shared coordination logic for daemon groups.
98///
99/// Manages the `LoadBalancer`, member tracking, health aggregation,
100/// and routing. Both `ReplicaGroup` and `ForkGroup` own a coordinator
101/// and delegate group-level operations to it.
102pub struct GroupCoordinator {
103    /// Per-member state, indexed by member index.
104    pub members: Vec<MemberInfo>,
105    /// Reverse index: `entity_id_bytes -> origin_hash`.
106    ///
107    /// Pre-fix `origin_hash_for_entity_id` (called per routed event
108    /// after the load balancer picks an endpoint) ran a linear scan
109    /// over `members`, comparing 32-byte `NodeId`s. For a 100-member
110    /// group at 100K events/sec, that's 10M × 32-byte equality checks
111    /// per second just to translate an LB-returned entity id back
112    /// into the origin_hash the registry needs.
113    ///
114    /// Maintained alongside `members` on `add_member` / `remove_last`
115    /// / `update_member_placement` so the per-event lookup becomes a
116    /// single O(1) HashMap probe. Per perf #152.
117    origin_hash_by_entity_id: HashMap<NodeId, u64>,
118    /// Load balancer for routing events to healthy members.
119    pub lb: LoadBalancer,
120}
121
122impl GroupCoordinator {
123    /// Create a new empty coordinator with the given LB strategy.
124    pub fn new(strategy: Strategy) -> Self {
125        Self {
126            members: Vec::new(),
127            origin_hash_by_entity_id: HashMap::new(),
128            lb: LoadBalancer::with_strategy(strategy),
129        }
130    }
131
132    /// Add a member that has already been registered in the DaemonRegistry.
133    pub fn add_member(&mut self, info: MemberInfo) {
134        self.lb.add_endpoint(Endpoint::new(info.entity_id_bytes));
135        self.origin_hash_by_entity_id
136            .insert(info.entity_id_bytes, info.origin_hash);
137        self.members.push(info);
138    }
139
140    /// Remove and return the highest-index member.
141    ///
142    /// Also removes from the LoadBalancer. Caller is responsible for
143    /// unregistering from DaemonRegistry.
144    pub fn remove_last(&mut self) -> Option<MemberInfo> {
145        let info = self.members.pop()?;
146        self.lb.remove_endpoint(&info.entity_id_bytes);
147        self.origin_hash_by_entity_id.remove(&info.entity_id_bytes);
148        Some(info)
149    }
150
151    /// Route an event to the best available member.
152    ///
153    /// Returns the `origin_hash` for delivery via `DaemonRegistry::deliver()`.
154    pub fn route_event(&self, ctx: &RequestContext) -> Result<u64, GroupError> {
155        let selection = self
156            .lb
157            .select(ctx)
158            .map_err(|_| GroupError::NoHealthyMember)?;
159
160        self.origin_hash_for_entity_id(&selection.node_id)
161            .ok_or(GroupError::NoHealthyMember)
162    }
163
164    /// Mark a member unhealthy in the LoadBalancer.
165    pub fn mark_unhealthy(&mut self, index: u8) {
166        // Members are pushed in dense `0..n` order at construction
167        // and scale-up time, so `member.index` equals its position in
168        // `self.members`. Direct array access is O(1) instead of the
169        // legacy linear `find` over `member.index == index`. Per
170        // perf #158. The `index == idx` re-check guards a future
171        // change that violates the dense-index invariant — if the
172        // assumption ever breaks, the slow path of "do nothing" is
173        // strictly safer than acting on the wrong member.
174        if let Some(member) = self.members.get_mut(index as usize) {
175            if member.index == index {
176                member.healthy = false;
177                self.lb
178                    .update_health(&member.entity_id_bytes, HealthStatus::Unhealthy);
179            }
180        }
181    }
182
183    /// Mark a member healthy in the LoadBalancer.
184    pub fn mark_healthy(&mut self, index: u8) {
185        // See `mark_unhealthy` for the dense-index invariant.
186        // Per perf #158.
187        if let Some(member) = self.members.get_mut(index as usize) {
188            if member.index == index {
189                member.healthy = true;
190                self.lb
191                    .update_health(&member.entity_id_bytes, HealthStatus::Healthy);
192            }
193        }
194    }
195
196    /// Update a member's placement after failure recovery.
197    ///
198    /// Updates the node_id, re-marks healthy, and updates the LB endpoint.
199    pub fn update_member_placement(
200        &mut self,
201        index: u8,
202        new_node_id: u64,
203        new_entity_id_bytes: NodeId,
204    ) {
205        // Same dense-index invariant as `mark_healthy`. Per perf #158.
206        if let Some(member) = self.members.get_mut(index as usize) {
207            if member.index != index {
208                return;
209            }
210            // Remove old endpoint, add new one
211            self.lb.remove_endpoint(&member.entity_id_bytes);
212            let old_entity_id = member.entity_id_bytes;
213            let origin_hash = member.origin_hash;
214            member.node_id = new_node_id;
215            member.entity_id_bytes = new_entity_id_bytes;
216            member.healthy = true;
217            self.lb.add_endpoint(Endpoint::new(new_entity_id_bytes));
218            // Keep `origin_hash_by_entity_id` consistent with the
219            // member's new `entity_id_bytes` so the perf #152 O(1)
220            // route_event lookup stays valid after recovery.
221            self.origin_hash_by_entity_id.remove(&old_entity_id);
222            self.origin_hash_by_entity_id
223                .insert(new_entity_id_bytes, origin_hash);
224        }
225    }
226
227    /// Re-mark members on a recovered node as healthy, but only if they
228    /// are still registered in the `DaemonRegistry`. If `on_node_failure()`
229    /// unregistered a member and replacement failed, marking it healthy
230    /// would route events to an origin_hash that no longer exists.
231    pub fn on_node_recovery(
232        &mut self,
233        recovered_node_id: u64,
234        registry: &crate::adapter::net::compute::registry::DaemonRegistry,
235    ) {
236        for member in &mut self.members {
237            if member.node_id == recovered_node_id
238                && !member.healthy
239                && registry.contains(member.origin_hash)
240            {
241                member.healthy = true;
242                self.lb
243                    .update_health(&member.entity_id_bytes, HealthStatus::Healthy);
244            }
245        }
246    }
247
248    /// Aggregate health of the group.
249    pub fn health(&self) -> GroupHealth {
250        let healthy_count = self.members.iter().filter(|m| m.healthy).count();
251        let total_count = self.members.len();
252        // Compare at full precision before saturating to u8 for the return value.
253        let healthy = healthy_count.min(u8::MAX as usize) as u8;
254        let total = total_count.min(u8::MAX as usize) as u8;
255        if healthy_count == 0 {
256            GroupHealth::Dead
257        } else if healthy_count == total_count {
258            GroupHealth::Healthy
259        } else {
260            GroupHealth::Degraded { healthy, total }
261        }
262    }
263
264    /// Get all member info.
265    pub fn members(&self) -> &[MemberInfo] {
266        &self.members
267    }
268
269    /// Number of members.
270    pub fn member_count(&self) -> u8 {
271        self.members.len().min(u8::MAX as usize) as u8
272    }
273
274    /// Number of healthy members.
275    pub fn healthy_count(&self) -> u8 {
276        self.members
277            .iter()
278            .filter(|m| m.healthy)
279            .count()
280            .min(u8::MAX as usize) as u8
281    }
282
283    /// Indices of members on a given node.
284    pub fn members_on_node(&self, node_id: u64) -> Vec<u8> {
285        self.members
286            .iter()
287            .filter(|m| m.node_id == node_id)
288            .map(|m| m.index)
289            .collect()
290    }
291
292    /// Look up origin_hash from a LoadBalancer entity ID.
293    ///
294    /// O(1) via `origin_hash_by_entity_id` instead of the legacy
295    /// linear scan over `members` (32-byte `NodeId` equality per
296    /// candidate). Per perf #152.
297    fn origin_hash_for_entity_id(&self, entity_id: &NodeId) -> Option<u64> {
298        self.origin_hash_by_entity_id.get(entity_id).copied()
299    }
300
301    /// Place a daemon with best-effort spread across nodes.
302    pub fn place_with_spread(
303        scheduler: &Scheduler,
304        requirements: &CapabilityFilter,
305        exclude: &HashSet<u64>,
306    ) -> Result<PlacementDecision, GroupError> {
307        let placement = scheduler.place(requirements)?;
308        if !exclude.contains(&placement.node_id) {
309            return Ok(placement);
310        }
311        // Primary placement is excluded; query candidates and pick the first non-excluded.
312        let candidates = scheduler.query_candidates(requirements);
313        for node_id in candidates {
314            if !exclude.contains(&node_id) {
315                return Ok(PlacementDecision {
316                    node_id,
317                    reason: PlacementReason::FirstMatch,
318                });
319            }
320        }
321        // All candidates are in the exclusion set — no valid placement exists.
322        Err(GroupError::PlacementFailed(
323            "all candidate nodes are excluded by spread constraint".into(),
324        ))
325    }
326
327    /// Phase G slice 4 — score-based v2 of [`Self::place_with_spread`].
328    /// Delegates to [`Scheduler::select_member_node`] so the same
329    /// scoring + §7-LOCKED tie-breaker that backs migration placement
330    /// also drives replica / fork / standby member placement.
331    ///
332    /// The returned `PlacementDecision` carries
333    /// [`PlacementReason::BestScore`] so observers can distinguish
334    /// the v2 path from the legacy first-match path.
335    ///
336    /// `requirements` narrows the candidate pool via the existing
337    /// `CapabilityFilter`; `placement` scores survivors; `exclude`
338    /// drops already-placed members (for spread); `tie_break`
339    /// resolves equal scores. Returns
340    /// [`GroupError::PlacementFailed`] when every candidate is
341    /// excluded or filter-vetoed — same observable failure mode as
342    /// `place_with_spread`'s "all candidates excluded" branch.
343    ///
344    /// Additive — does not change `place_with_spread`'s behavior.
345    /// Group modules opt in by calling this helper from their
346    /// `*_with_placement` variants in subsequent slices.
347    pub fn place_member(
348        scheduler: &Scheduler,
349        artifact: &Artifact<'_>,
350        requirements: &CapabilityFilter,
351        exclude: &HashSet<u64>,
352        placement: &dyn PlacementFilter,
353        tie_break: &TieBreakContext<'_>,
354    ) -> Result<PlacementDecision, GroupError> {
355        let node_id = scheduler
356            .select_member_node(artifact, requirements, exclude, placement, tie_break)
357            .ok_or_else(|| {
358                GroupError::PlacementFailed(
359                    "no candidate satisfied placement filter (every node excluded or vetoed)"
360                        .into(),
361                )
362            })?;
363        Ok(PlacementDecision {
364            node_id,
365            reason: PlacementReason::BestScore,
366        })
367    }
368}
369
370impl std::fmt::Debug for GroupCoordinator {
371    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
372        f.debug_struct("GroupCoordinator")
373            .field("members", &self.members.len())
374            .field("healthy", &self.healthy_count())
375            .finish()
376    }
377}
378
379// ── Tests ────────────────────────────────────────────────────────────────────
380
#[cfg(test)]
mod tests {
    use super::*;
    use crate::adapter::net::behavior::capability::{CapabilityAnnouncement, CapabilitySet};
    use crate::adapter::net::behavior::placement::{NodeId as PlacementNodeId, ResourceAxis};
    use std::sync::Arc;

    // ── Placement fixtures ───────────────────────────────────────────────────

    /// The three-node cluster that most placement tests run against.
    const THREE_NODES: [u64; 3] = [0x1111, 0x2222, 0x3333];

    /// Build a `Scheduler` whose capability fold has seen one legacy
    /// announcement (with an empty `CapabilitySet`) per id in `node_ids`.
    ///
    /// The first id becomes the scheduler's local node; `0xFFFF` is used
    /// when the slice is empty.
    fn make_scheduler(node_ids: &[u64]) -> Scheduler {
        use crate::adapter::net::behavior::fold::{capability_bridge, CapabilityFold, Fold};

        let fold: Arc<Fold<CapabilityFold>> =
            Arc::new(Fold::with_sweep_interval(std::time::Duration::ZERO));
        let announcer = crate::adapter::net::identity::EntityId::from_bytes([0u8; 32]);
        for node in node_ids.iter().copied() {
            let announcement =
                CapabilityAnnouncement::new(node, announcer.clone(), 1, CapabilitySet::new());
            capability_bridge::apply_legacy_announcement(&fold, announcement)
                .expect("apply legacy announcement in fixture");
        }
        let local = match node_ids {
            [first, ..] => *first,
            [] => 0xFFFF,
        };
        Scheduler::new(fold, local, CapabilitySet::new())
    }

    /// Synthetic placement filter. It gives every candidate the same
    /// `score`, except the ids in `veto`, which are hard-vetoed (`None`).
    struct FixedScore {
        score: f32,
        veto: Vec<u64>,
    }

    impl FixedScore {
        /// A filter that scores every node `score` and vetoes nothing.
        fn uniform(score: f32) -> Self {
            Self {
                score,
                veto: Vec::new(),
            }
        }
    }

    impl PlacementFilter for FixedScore {
        fn placement_score(
            &self,
            target: &PlacementNodeId,
            _artifact: &Artifact<'_>,
        ) -> Option<f32> {
            if self.veto.contains(target) {
                return None;
            }
            Some(self.score)
        }
    }

    fn empty_caps() -> CapabilitySet {
        CapabilitySet::new()
    }

    fn daemon_artifact<'a>(
        required: &'a CapabilitySet,
        optional: &'a CapabilitySet,
    ) -> Artifact<'a> {
        Artifact::Daemon {
            daemon_id: [0u8; 32],
            required,
            optional,
        }
    }

    /// Tie-break context with no RTT data that resolves ties on the compute axis.
    fn compute_tie_break<'a>() -> TieBreakContext<'a> {
        TieBreakContext {
            rtt_lookup: None,
            resource_axis: ResourceAxis::Compute,
        }
    }

    /// Run `GroupCoordinator::place_member` with the shared test setup:
    /// - a daemon artifact with empty required/optional capability sets;
    /// - the default `CapabilityFilter`;
    /// - the compute-axis tie-breaker.
    fn place_daemon(
        sched: &Scheduler,
        exclude: &HashSet<u64>,
        filter: &dyn PlacementFilter,
    ) -> Result<PlacementDecision, GroupError> {
        let required = empty_caps();
        let optional = empty_caps();
        let artifact = daemon_artifact(&required, &optional);
        GroupCoordinator::place_member(
            sched,
            &artifact,
            &CapabilityFilter::default(),
            exclude,
            filter,
            &compute_tie_break(),
        )
    }

    // ── place_member (v2) ────────────────────────────────────────────────────

    /// Every successful `place_member` selection is stamped `BestScore`.
    /// This pins the observability contract that separates v2 from v1.
    #[test]
    fn place_member_stamps_best_score_reason() {
        let sched = make_scheduler(&THREE_NODES);

        let decision = place_daemon(&sched, &HashSet::new(), &FixedScore::uniform(0.7))
            .expect("placement should succeed with three eligible candidates");

        assert_eq!(decision.reason, PlacementReason::BestScore);
        // Every node scores 0.7, so the lex-NodeId tie-breaker picks the lowest.
        assert_eq!(decision.node_id, 0x1111);
    }

    /// The exclusion set is honored: an already-placed node is never
    /// returned, even when it would otherwise win. Replica / fork / standby
    /// spread depends on this.
    #[test]
    fn place_member_excludes_already_placed_nodes() {
        let sched = make_scheduler(&THREE_NODES);
        let exclude: HashSet<u64> = [0x1111u64].iter().copied().collect();

        let decision = place_daemon(&sched, &exclude, &FixedScore::uniform(0.5))
            .expect("placement should succeed with two remaining candidates");

        assert_eq!(decision.reason, PlacementReason::BestScore);
        assert_eq!(decision.node_id, 0x2222);
    }

    /// Veto and exclusion compose: when every node is either excluded or
    /// vetoed, the result is `PlacementFailed`. That is the same failure
    /// mode as `place_with_spread`'s "all candidates excluded" branch.
    #[test]
    fn place_member_returns_placement_failed_when_all_vetoed_or_excluded() {
        let sched = make_scheduler(&THREE_NODES);
        // 0x2222 is vetoed by the filter; 0x1111 and 0x3333 are excluded.
        let filter = FixedScore {
            score: 1.0,
            veto: vec![0x2222],
        };
        let exclude: HashSet<u64> = [0x1111u64, 0x3333].iter().copied().collect();

        let err = place_daemon(&sched, &exclude, &filter)
            .expect_err("every candidate is filtered out");

        match err {
            GroupError::PlacementFailed(msg) => assert!(
                msg.contains("excluded or vetoed"),
                "error message should explain why placement failed: {msg}"
            ),
            other => panic!("expected PlacementFailed, got {other:?}"),
        }
    }

    /// Candidates are ranked by score. A higher-scoring node beats a
    /// lower-scoring one that comes first in lex order. This is the v2
    /// contract: v1 (`place_with_spread`) returns the first match in index
    /// order, which can differ from the best-scoring pick.
    #[test]
    fn place_member_picks_highest_scoring_over_lex_order() {
        let sched = make_scheduler(&THREE_NODES);

        // Scores: 0x1111 → 0.1, 0x2222 → 0.9, 0x3333 → 0.5.
        struct ScoredFilter;
        impl PlacementFilter for ScoredFilter {
            fn placement_score(&self, target: &PlacementNodeId, _: &Artifact<'_>) -> Option<f32> {
                let score = match *target {
                    0x1111 => 0.1,
                    0x2222 => 0.9,
                    0x3333 => 0.5,
                    _ => 0.0,
                };
                Some(score)
            }
        }

        let decision =
            place_daemon(&sched, &HashSet::new(), &ScoredFilter).expect("placement should succeed");

        assert_eq!(decision.reason, PlacementReason::BestScore);
        assert_eq!(
            decision.node_id, 0x2222,
            "highest scorer wins, NOT the lex-lowest"
        );
    }

    /// With no nodes in the index at all, placement fails with
    /// `PlacementFailed`.
    #[test]
    fn place_member_returns_placement_failed_for_empty_index() {
        let sched = make_scheduler(&[]);

        let err = place_daemon(&sched, &HashSet::new(), &FixedScore::uniform(1.0))
            .expect_err("empty index → placement failure");

        assert!(matches!(err, GroupError::PlacementFailed(_)));
    }

    /// Adding `place_member` is additive: the legacy `place_with_spread`
    /// still works on the same scheduler. This covers slice 4's "no callers
    /// changed yet" guarantee.
    #[test]
    fn place_with_spread_unchanged_after_place_member_added() {
        let sched = make_scheduler(&THREE_NODES);

        let decision = GroupCoordinator::place_with_spread(
            &sched,
            &CapabilityFilter::default(),
            &HashSet::new(),
        )
        .expect("legacy path still works");

        // The local node (first in the fixture list) is preferred.
        assert_eq!(decision.node_id, 0x1111);
        assert_eq!(decision.reason, PlacementReason::LocalPreferred);
    }

    // ── Membership / routing fixtures ────────────────────────────────────────

    fn make_member(index: u8, origin_hash: u64) -> MemberInfo {
        // `entity_id_bytes` is the routing key the LB hands back. Putting the
        // index in byte 0 gives each member a distinct 32-byte NodeId.
        // `origin_hash` is what `origin_hash_for_entity_id` must return.
        let mut entity_id_bytes = [0u8; 32];
        entity_id_bytes[0] = index;
        MemberInfo {
            index,
            origin_hash,
            node_id: 0xA000 | u64::from(index),
            entity_id_bytes,
            healthy: true,
        }
    }

    /// A round-robin coordinator holding members 0, 1 and 2, with origin
    /// hashes 0xA1, 0xA2 and 0xA3. Returns the coordinator together with
    /// copies of the members that were added.
    fn three_member_coord() -> (GroupCoordinator, Vec<MemberInfo>) {
        let mut coord = GroupCoordinator::new(Strategy::RoundRobin);
        let members: Vec<MemberInfo> = [0xA1u64, 0xA2, 0xA3]
            .iter()
            .zip(0u8..)
            .map(|(&hash, idx)| make_member(idx, hash))
            .collect();
        for member in &members {
            coord.add_member(member.clone());
        }
        (coord, members)
    }

    /// Pin #152: `origin_hash_for_entity_id` resolves through the O(1)
    /// reverse index. It must stay correct across `add_member`,
    /// `remove_last` and `update_member_placement`. This guards against
    /// reintroducing the linear `members.iter().find()`, and against the
    /// index drifting out of sync with `members`.
    #[test]
    fn origin_hash_lookup_uses_reverse_index_across_mutations() {
        let (mut coord, members) = three_member_coord();

        for member in &members {
            assert_eq!(
                coord.origin_hash_for_entity_id(&member.entity_id_bytes),
                Some(member.origin_hash)
            );
        }
        // An entity id that was never added resolves to None.
        let mut stranger = [0u8; 32];
        stranger[0] = 0xFE;
        assert_eq!(coord.origin_hash_for_entity_id(&stranger), None);

        // Popping the tail must also drop its reverse-index entry.
        let removed = coord.remove_last().expect("pop returns the tail");
        assert_eq!(removed.origin_hash, 0xA3);
        assert_eq!(
            coord.origin_hash_for_entity_id(&members[2].entity_id_bytes),
            None,
            "remove_last must clear the reverse-index entry"
        );

        // Re-placing member 0 under a fresh entity id must evict the old key
        // and index the new one.
        let mut relocated = [0u8; 32];
        relocated[0] = 0xC0;
        relocated[1] = 0x01;
        coord.update_member_placement(0, 0xBEEF, relocated);
        assert_eq!(
            coord.origin_hash_for_entity_id(&members[0].entity_id_bytes),
            None,
            "stale entity_id must be evicted from the reverse index"
        );
        assert_eq!(
            coord.origin_hash_for_entity_id(&relocated),
            Some(0xA1),
            "new entity_id must resolve to the same origin_hash"
        );
    }

    /// Pin #158: `mark_healthy` / `mark_unhealthy` act on `members[index]`.
    /// This relies on the dense-index invariant from the `0..n` construction
    /// loops in replica_group / fork_group / standby_group. An `index` past
    /// the end of the vec is ignored rather than falling back to a linear
    /// `find`.
    #[test]
    fn mark_health_resolves_via_direct_index() {
        let (mut coord, _) = three_member_coord();

        // Members start out healthy.
        assert!(coord.members[1].healthy);

        coord.mark_unhealthy(1);
        assert!(!coord.members[1].healthy);

        coord.mark_healthy(1);
        assert!(coord.members[1].healthy);

        // An out-of-range index is a no-op. It must not panic, and it must
        // not silently touch member 0 or any other member.
        coord.mark_unhealthy(99);
        assert!(coord.members.iter().all(|member| member.healthy));
    }
}