Skip to main content

net/adapter/net/behavior/
loadbalance.rs

1//! Phase 4G: Distributed Load Balancing (LOAD-BALANCE)
2//!
3//! This module provides distributed load balancing across the Net network:
4//! - Multiple load balancing strategies (round-robin, weighted, least-connections, etc.)
5//! - Health-aware routing with automatic failover
6//! - Load metrics collection and aggregation
7//! - Adaptive load balancing based on real-time conditions
8
9use arc_swap::ArcSwap;
10use dashmap::DashMap;
11use parking_lot::{Mutex, RwLock};
12use serde::{Deserialize, Serialize};
13use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use super::metadata::NodeId;
18
/// Load balancing strategy.
///
/// Names the algorithm used to pick an endpoint. Variants serialize with
/// `snake_case` names (per the `serde(rename_all)` attribute below), and
/// `RoundRobin` is the `Default`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum Strategy {
    /// Round-robin selection (the default strategy)
    #[default]
    RoundRobin,
    /// Weighted round-robin based on node capacity
    WeightedRoundRobin,
    /// Select the node with the fewest active connections
    LeastConnections,
    /// Least connections, adjusted by weight
    WeightedLeastConnections,
    /// Random selection
    Random,
    /// Weighted random selection
    WeightedRandom,
    /// Consistent hashing for sticky sessions
    ConsistentHash,
    /// Select based on lowest latency
    LeastLatency,
    /// Select based on lowest resource utilization
    LeastLoad,
    /// Power of two random choices
    PowerOfTwo,
    /// Adaptive strategy based on conditions
    Adaptive,
}
47
/// Health status of a node.
///
/// Variants serialize with lowercase names (per the `serde(rename_all)`
/// attribute below); `Healthy` is the `Default`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
#[serde(rename_all = "lowercase")]
pub enum HealthStatus {
    /// Node is healthy and accepting traffic
    #[default]
    Healthy,
    /// Node is degraded but still accepting traffic
    Degraded,
    /// Node is unhealthy and should not receive traffic
    Unhealthy,
    /// Node health is unknown
    Unknown,
}
62
63impl HealthStatus {
64    /// Check if node can receive traffic
65    pub fn can_receive_traffic(&self) -> bool {
66        matches!(self, HealthStatus::Healthy | HealthStatus::Degraded)
67    }
68
69    /// Get weight multiplier for this health status
70    pub fn weight_multiplier(&self) -> f64 {
71        match self {
72            HealthStatus::Healthy => 1.0,
73            HealthStatus::Degraded => 0.5,
74            HealthStatus::Unhealthy => 0.0,
75            HealthStatus::Unknown => 0.25,
76        }
77    }
78}
79
/// Load metrics for a node.
///
/// The ranges quoted on the fields are the documented expectations; the
/// types themselves (`f64` / `u32`) do not enforce them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LoadMetrics {
    /// CPU utilization (0.0 - 1.0)
    pub cpu_usage: f64,
    /// Memory utilization (0.0 - 1.0)
    pub memory_usage: f64,
    /// Active connections count
    pub active_connections: u32,
    /// Requests per second
    pub requests_per_second: f64,
    /// Average response time in milliseconds
    pub avg_response_time_ms: f64,
    /// Error rate (0.0 - 1.0)
    pub error_rate: f64,
    /// Queue depth
    pub queue_depth: u32,
    /// Bandwidth utilization (0.0 - 1.0)
    pub bandwidth_usage: f64,
    /// Last update timestamp (microseconds since epoch)
    pub updated_at: u64,
}
102
103impl Default for LoadMetrics {
104    fn default() -> Self {
105        Self {
106            cpu_usage: 0.0,
107            memory_usage: 0.0,
108            active_connections: 0,
109            requests_per_second: 0.0,
110            avg_response_time_ms: 0.0,
111            error_rate: 0.0,
112            queue_depth: 0,
113            bandwidth_usage: 0.0,
114            updated_at: 0,
115        }
116    }
117}
118
119impl LoadMetrics {
120    /// Calculate composite load score (0.0 = no load, 1.0 = fully loaded)
121    pub fn load_score(&self) -> f64 {
122        // Weighted average of different metrics
123        let cpu_weight = 0.3;
124        let memory_weight = 0.2;
125        let connections_weight = 0.2;
126        let response_time_weight = 0.15;
127        let error_weight = 0.15;
128
129        // Normalize response time (assume 1000ms = fully loaded)
130        let normalized_response_time = (self.avg_response_time_ms / 1000.0).min(1.0);
131
132        cpu_weight * self.cpu_usage
133            + memory_weight * self.memory_usage
134            + connections_weight * (self.active_connections as f64 / 10000.0).min(1.0)
135            + response_time_weight * normalized_response_time
136            + error_weight * self.error_rate
137    }
138
139    /// Check if node is overloaded
140    pub fn is_overloaded(&self) -> bool {
141        self.cpu_usage > 0.9
142            || self.memory_usage > 0.95
143            || self.error_rate > 0.1
144            || self.queue_depth > 1000
145    }
146}
147
/// Node endpoint information.
///
/// Public, serializable description of one endpoint. `Endpoint::new`
/// supplies the defaults and the `with_*` builders adjust individual
/// fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Endpoint {
    /// Node ID
    pub node_id: NodeId,
    /// Weight for weighted strategies (higher = more traffic)
    pub weight: u32,
    /// Health status
    pub health: HealthStatus,
    /// Load metrics
    pub metrics: LoadMetrics,
    /// Tags for filtering
    pub tags: Vec<String>,
    /// Priority (lower = higher priority for failover)
    pub priority: u32,
    /// Whether endpoint is enabled
    pub enabled: bool,
    /// Zone/region for locality-aware routing
    pub zone: Option<String>,
}
168
169impl Endpoint {
170    /// Create a new endpoint
171    pub fn new(node_id: NodeId) -> Self {
172        Self {
173            node_id,
174            weight: 100,
175            health: HealthStatus::Healthy,
176            metrics: LoadMetrics::default(),
177            tags: Vec::new(),
178            priority: 0,
179            enabled: true,
180            zone: None,
181        }
182    }
183
184    /// Set weight
185    pub fn with_weight(mut self, weight: u32) -> Self {
186        self.weight = weight;
187        self
188    }
189
190    /// Set zone
191    pub fn with_zone(mut self, zone: impl Into<String>) -> Self {
192        self.zone = Some(zone.into());
193        self
194    }
195
196    /// Add tag
197    pub fn with_tag(mut self, tag: impl Into<String>) -> Self {
198        self.tags.push(tag.into());
199        self
200    }
201
202    /// Set priority
203    pub fn with_priority(mut self, priority: u32) -> Self {
204        self.priority = priority;
205        self
206    }
207
208    /// Effective weight considering health
209    pub fn effective_weight(&self) -> f64 {
210        if !self.enabled {
211            return 0.0;
212        }
213        self.weight as f64 * self.health.weight_multiplier()
214    }
215
216    /// Check if endpoint can receive traffic
217    pub fn is_available(&self) -> bool {
218        self.enabled && self.health.can_receive_traffic()
219    }
220}
221
/// Endpoint state tracked by the load balancer.
///
/// Built from an [`Endpoint`] by `EndpointState::new`. The configuration
/// fields are plain values; everything that changes after construction sits
/// behind a lock, an `ArcSwap`, or an atomic so it can be updated through
/// `&self`.
struct EndpointState {
    /// Node ID. First of the immutable endpoint config fields
    /// (node_id, weight, tags, zone, priority).
    node_id: NodeId,
    /// Configured weight (immutable config).
    weight: u32,
    /// Tags (immutable config).
    tags: Vec<String>,
    /// Zone (immutable config).
    zone: Option<String>,
    /// Priority (immutable config).
    priority: u32,
    /// Mutable health status
    health: RwLock<HealthStatus>,
    /// Mutable metrics.
    ///
    /// Per perf #149, switched `RwLock<LoadMetrics>` →
    /// `ArcSwap<LoadMetrics>` so the per-event `load_score()` read
    /// in every selection strategy is one lock-free Acquire load
    /// instead of a parking_lot read + full struct clone. Updates
    /// (operator-cadence) call `metrics.store(Arc::new(...))`.
    metrics: ArcSwap<LoadMetrics>,
    /// Whether endpoint is enabled
    enabled: std::sync::atomic::AtomicBool,
    /// Current connection count
    connections: AtomicU32,
    /// Total requests served
    total_requests: AtomicU64,
    /// Failed requests
    failed_requests: AtomicU64,
    /// Consecutive failures
    consecutive_failures: AtomicU32,
    /// Circuit breaker state (`true` = open)
    circuit_open: std::sync::atomic::AtomicBool,
    /// When the circuit was opened, or when its recovery timer was last
    /// restarted; `None` while the circuit is closed.
    circuit_open_time: Mutex<Option<Instant>>,
    /// Whether a half-open probe request is currently in flight. Only one
    /// request is admitted per recovery cycle to test the endpoint.
    half_open_probe: std::sync::atomic::AtomicBool,
    /// Watchdog timestamp for the half-open probe claim.
    ///
    /// Stamped with `Instant::now()` whenever `half_open_probe` flips
    /// `false -> true` (at selection). `select` returns a `Selection` but
    /// has no way to bind an RAII guard to it (the dashmap `Ref` is local
    /// and `Selection` is a `Clone` public type consumed by the FFI/SDK
    /// bindings). If a caller drops the selection without recording
    /// completion (e.g. `GroupCoordinator::route_event`, which never calls
    /// `record_completion`), the bare bool would stay `true` forever and
    /// `is_circuit_open` would keep the recovered endpoint out of rotation
    /// permanently. To prevent that, `is_circuit_open` treats a probe held
    /// longer than the recovery window as abandoned and reclaims the slot so
    /// a fresh probe can be admitted. This is the watchdog the `ProbeGuard`
    /// documentation relies on for the async-cancel hazard.
    /// `record_completion` clears the stamp on the normal path.
    half_open_probe_at: Mutex<Option<Instant>>,
    /// Set when the endpoint is removed from the balancer. The flat snapshot
    /// shares this `Arc`, so a selector iterating a snapshot taken *before* a
    /// concurrent removal (and before the rebuild that drops the endpoint)
    /// sees it as unavailable immediately. Without this, that selector could
    /// pick a gone endpoint, fail the `endpoints.get` reservation, and burn a
    /// retry, eventually exhausting its retries and returning a transient,
    /// false `NoEndpointsAvailable`.
    removed: std::sync::atomic::AtomicBool,
}
281
282impl EndpointState {
283    fn new(endpoint: Endpoint) -> Self {
284        Self {
285            node_id: endpoint.node_id,
286            weight: endpoint.weight,
287            tags: endpoint.tags,
288            zone: endpoint.zone,
289            priority: endpoint.priority,
290            health: RwLock::new(endpoint.health),
291            metrics: ArcSwap::new(Arc::new(endpoint.metrics)),
292            enabled: std::sync::atomic::AtomicBool::new(endpoint.enabled),
293            connections: AtomicU32::new(0),
294            total_requests: AtomicU64::new(0),
295            failed_requests: AtomicU64::new(0),
296            consecutive_failures: AtomicU32::new(0),
297            circuit_open: std::sync::atomic::AtomicBool::new(false),
298            circuit_open_time: Mutex::new(None),
299            half_open_probe: std::sync::atomic::AtomicBool::new(false),
300            half_open_probe_at: Mutex::new(None),
301            removed: std::sync::atomic::AtomicBool::new(false),
302        }
303    }
304
305    fn health(&self) -> HealthStatus {
306        *self.health.read()
307    }
308
309    /// Materialize an owned snapshot of the current metrics. Used
310    /// by [`LoadBalancer::endpoints`] which builds full `Endpoint`
311    /// structs for operator/inventory consumers — that path
312    /// genuinely needs ownership. Per-select hot paths should call
313    /// [`Self::load_score`] which avoids the clone entirely.
314    fn metrics(&self) -> LoadMetrics {
315        (**self.metrics.load()).clone()
316    }
317
318    /// Compute the composite load score from the current metrics
319    /// snapshot. Per perf #149 — pre-fix every per-event select
320    /// path called `state.load_score()` which
321    /// `RwLock::read + LoadMetrics::clone + score`. Now it's one
322    /// `ArcSwap::load + score` — no lock, no clone.
323    fn load_score(&self) -> f64 {
324        self.metrics.load().load_score()
325    }
326
327    fn is_enabled(&self) -> bool {
328        self.enabled.load(Ordering::Relaxed)
329    }
330
331    fn effective_weight(&self) -> f64 {
332        if !self.is_enabled() {
333            return 0.0;
334        }
335        self.weight as f64 * self.health().weight_multiplier()
336    }
337
338    fn is_available(&self) -> bool {
339        !self.removed.load(Ordering::Acquire)
340            && self.is_enabled()
341            && self.health().can_receive_traffic()
342    }
343
344    /// Atomically reserve a connection slot if the endpoint is below cap.
345    ///
346    /// Returns `true` if the slot was reserved (caller now owns a connection
347    /// that must be released via `record_completion`), or `false` if the cap
348    /// was already reached. This replaces the prior check-then-increment
349    /// pattern that allowed concurrent selectors to exceed the cap.
350    fn try_record_request(&self, max_connections: u32) -> bool {
351        let reserved = self
352            .connections
353            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |c| {
354                if c >= max_connections {
355                    None
356                } else {
357                    Some(c + 1)
358                }
359            })
360            .is_ok();
361        if reserved {
362            self.total_requests.fetch_add(1, Ordering::Relaxed);
363        }
364        reserved
365    }
366
367    fn record_completion(&self, success: bool) {
368        // Saturating sub. Pre-fix `fetch_sub(1)` was unconditional;
369        // a caller hitting `record_completion` without a matching
370        // `record_request` (a substrate bug or a misuse of the
371        // public `LoadBalancer::record_completion(node_id)` API)
372        // underflowed `connections` to `u32::MAX - k`. After that,
373        // `try_record_request` always failed (`c >= max_connections`)
374        // and `get_available_endpoints` filtered the endpoint out
375        // forever - a silent, permanent removal from rotation with
376        // no log, no metric, no recovery path. The test at the
377        // bottom of this module explicitly acknowledged the hazard.
378        let _ = self
379            .connections
380            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |c| {
381                Some(c.saturating_sub(1))
382            });
383
384        // If this completion is for the half-open probe, it decides the
385        // circuit's fate. Clearing the flag with swap also guarantees only
386        // one completion is treated as the probe outcome.
387        if self.half_open_probe.swap(false, Ordering::AcqRel) {
388            // Clear the watchdog stamp now that the probe is resolved
389            // normally — keeps a stale `Instant` from lingering past the
390            // next claim's stamp.
391            *self.half_open_probe_at.lock() = None;
392            if success {
393                self.circuit_open.store(false, Ordering::Release);
394                self.consecutive_failures.store(0, Ordering::Relaxed);
395                *self.circuit_open_time.lock() = None;
396            } else {
397                self.failed_requests.fetch_add(1, Ordering::Relaxed);
398                // Probe failed — restart the recovery timer so the next
399                // probe is delayed by another full recovery_time window.
400                *self.circuit_open_time.lock() = Some(Instant::now());
401            }
402            return;
403        }
404
405        if success {
406            self.consecutive_failures.store(0, Ordering::Relaxed);
407        } else {
408            self.failed_requests.fetch_add(1, Ordering::Relaxed);
409            let failures = self.consecutive_failures.fetch_add(1, Ordering::Relaxed) + 1;
410            // Open circuit after 5 consecutive failures. Use CAS so only
411            // the thread that causes the transition records the open time.
412            if failures >= 5
413                && self
414                    .circuit_open
415                    .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
416                    .is_ok()
417            {
418                *self.circuit_open_time.lock() = Some(Instant::now());
419            }
420        }
421    }
422
423    /// Returns true if new requests should be rejected for this endpoint.
424    ///
425    /// Does NOT claim the half-open probe slot — that is done lazily at
426    /// selection time via [`try_claim_half_open_probe`], so only the
427    /// endpoint actually chosen by the selector claims the probe.
428    /// Conflating "is the circuit open?" with "CAS-claim the half-open
429    /// probe slot when the recovery window has elapsed" would let
430    /// `get_available_endpoints` claim the probe slot for every endpoint
431    /// it filters: a multi-endpoint outage past its recovery window
432    /// would then have every endpoint claim the slot in the scan while
433    /// only one (or zero) was selected; the N-1 others would hold
434    /// `half_open_probe == true` with no in-flight request and no
435    /// completion path, and every subsequent `is_circuit_open` would
436    /// return true forever.
437    ///
438    /// NOT a pure predicate: it performs ONE bounded self-healing write.
439    /// When the slot is held by a probe that has been claimed for longer
440    /// than a full recovery window with no completion (genuinely
441    /// ABANDONED — see below), it reclaims the slot via
442    /// [`release_half_open_probe`] and admits. This is idempotent under
443    /// concurrent scanners and only ever fires on an abandoned slot — a
444    /// LIVE probe (claimed within the recovery window) is never touched —
445    /// so a read-only caller cannot clear an in-flight probe. The reclaim
446    /// must live here, not on the claim path: the claim only runs for the
447    /// endpoint the selector picks, but a rejected endpoint is never
448    /// picked, so its abandoned slot could only be healed during this
449    /// scan.
450    fn is_circuit_open(&self, recovery_time: Duration) -> bool {
451        if !self.circuit_open.load(Ordering::Acquire) {
452            return false;
453        }
454        let open_time = match *self.circuit_open_time.lock() {
455            Some(t) => t,
456            None => return true,
457        };
458        if open_time.elapsed() < recovery_time {
459            return true;
460        }
461        // Recovery window has elapsed — the endpoint is admitting
462        // a half-open probe. If the probe slot is already taken,
463        // another request is in flight and we keep rejecting.
464        // Otherwise we admit (the caller will CAS-claim the slot
465        // via `try_claim_half_open_probe` only on the endpoint it
466        // actually selects).
467        if !self.half_open_probe.load(Ordering::Acquire) {
468            return false;
469        }
470        // The slot is claimed. A claim that has been held longer than
471        // a full recovery window with no completion is an ABANDONED
472        // probe — a selection handed to a caller (e.g.
473        // `GroupCoordinator::route_event`) that never calls
474        // `record_completion`, or an async request future that was
475        // cancelled/panicked between claim and completion without a
476        // `ProbeGuard`. Without this watchdog the bare bool would
477        // pin the recovered endpoint out of rotation forever. Reclaim
478        // the slot so a fresh probe can be admitted on this scan.
479        let abandoned = self
480            .half_open_probe_at
481            .lock()
482            .is_some_and(|claimed_at| claimed_at.elapsed() >= recovery_time);
483        if abandoned {
484            self.release_half_open_probe();
485            // Admit: this scan/selection will re-claim the slot.
486            return false;
487        }
488        true
489    }
490
491    /// Try to claim the half-open probe slot.
492    ///
493    /// Returns an [`Option<ProbeGuard<'_>>`]; the `Some` arm
494    /// carries an RAII guard whose `Drop` releases the slot
495    /// automatically. Callers that successfully drive the request
496    /// to completion MUST invoke [`ProbeGuard::commit`] before
497    /// dispatching to the network — `record_completion` is then
498    /// the path that clears the flag. Any other exit (panic
499    /// between claim and dispatch, future cancellation, fall-
500    /// through error) drops the guard and the slot rolls back
501    /// atomically.
502    ///
503    /// This guard API is intended for ASYNC callers where the
504    /// claim → completion window is materially wide (a request
505    /// future spanning a network round-trip, where cancellation
506    /// or panic between the two is plausible). The synchronous
507    /// `select` path at this module's top uses a direct
508    /// `compare_exchange` on `half_open_probe` because its claim
509    /// → release window is a few atomic ops; the borrow checker
510    /// forbids holding a `ProbeGuard<'_>` across the dashmap
511    /// `Ref`'s `drop(state)` boundary in that loop.
512    #[allow(dead_code)]
513    fn try_claim_half_open_probe(&self) -> Option<ProbeGuard<'_>> {
514        self.half_open_probe
515            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
516            .ok()
517            .map(|_| {
518                // Stamp the watchdog so a never-committed, never-dropped
519                // claim still self-heals via `is_circuit_open`.
520                self.stamp_half_open_probe();
521                ProbeGuard { state: self }
522            })
523    }
524
525    /// Release the half-open probe slot without recording a
526    /// completion outcome. Prefer [`ProbeGuard`]'s Drop for
527    /// routine release; this method exists for paths where the
528    /// slot must be cleared via direct atomic write (e.g.
529    /// `record_completion` once the breaker fully reopens).
530    fn release_half_open_probe(&self) {
531        *self.half_open_probe_at.lock() = None;
532        self.half_open_probe.store(false, Ordering::Release);
533    }
534
535    /// Stamp the half-open probe watchdog. Called right after a
536    /// successful `false -> true` claim of `half_open_probe` so
537    /// `is_circuit_open` can reclaim an abandoned probe slot (a
538    /// selection that was returned to a caller who never recorded
539    /// completion) once it has been held past the recovery window.
540    fn stamp_half_open_probe(&self) {
541        *self.half_open_probe_at.lock() = Some(Instant::now());
542    }
543}
544
/// RAII guard returned by
/// [`EndpointState::try_claim_half_open_probe`]. The Drop impl
/// clears the `half_open_probe` slot UNLESS [`Self::commit`] was
/// called first. `commit` consumes the guard with `mem::forget`, so
/// its Drop (and the atomic write it performs) never runs.
///
/// Pattern:
/// ```ignore
/// let probe = state.try_claim_half_open_probe()?;   // claim
/// // ... checks that may early-return / panic ...
/// if !state.try_record_request(max_conn) {
///     return Err(...);                                // probe drops, slot released
/// }
/// probe.commit();                                     // success: ownership
///                                                     //   transfers to record_completion
/// // ... dispatch ...
/// ```
///
/// The alternative is to track success vs failure with a `bool` and
/// call `release_half_open_probe` manually at every fall-through.
/// That is easy to miss on a future-cancel, where neither the `Ok`
/// nor the `Err` path runs to completion.
#[allow(dead_code)]
pub(super) struct ProbeGuard<'a> {
    /// Endpoint whose probe slot this guard currently owns.
    state: &'a EndpointState,
}
571
572impl<'a> ProbeGuard<'a> {
573    /// Forget the guard so its Drop does NOT release the slot.
574    /// Call this only on the success path AFTER the matching
575    /// `try_record_request` succeeded — `record_completion` is
576    /// then the path that clears the flag.
577    #[allow(dead_code)]
578    fn commit(self) {
579        std::mem::forget(self);
580    }
581}
582
583impl<'a> Drop for ProbeGuard<'a> {
584    fn drop(&mut self) {
585        // Roll back the claim. Idempotent at the atomic level
586        // (`store(false)` always lands false), but the structural
587        // invariant is that this Drop only runs on the
588        // non-commit path — `mem::forget` (via `commit`) prevents
589        // it on the success path.
590        self.state.release_half_open_probe();
591    }
592}
593
/// Request context for load balancing decisions.
///
/// Every field is optional or may be empty; `Default` yields a context with
/// nothing set.
#[derive(Debug, Clone, Default)]
pub struct RequestContext {
    /// Request ID for consistent hashing
    pub request_id: Option<String>,
    /// Session ID for sticky sessions
    pub session_id: Option<String>,
    /// Client zone for locality routing
    pub client_zone: Option<String>,
    /// Required tags
    pub required_tags: Vec<String>,
    /// Preferred zones (in order of preference)
    pub preferred_zones: Vec<String>,
    /// Custom routing key
    pub routing_key: Option<String>,
}
610
611impl RequestContext {
612    /// Create new request context
613    pub fn new() -> Self {
614        Self::default()
615    }
616
617    /// Set session ID for sticky sessions
618    pub fn with_session(mut self, session_id: impl Into<String>) -> Self {
619        self.session_id = Some(session_id.into());
620        self
621    }
622
623    /// Set routing key for consistent hashing
624    pub fn with_routing_key(mut self, key: impl Into<String>) -> Self {
625        self.routing_key = Some(key.into());
626        self
627    }
628
629    /// Set client zone
630    pub fn with_zone(mut self, zone: impl Into<String>) -> Self {
631        self.client_zone = Some(zone.into());
632        self
633    }
634
635    /// Add required tag
636    pub fn require_tag(mut self, tag: impl Into<String>) -> Self {
637        self.required_tags.push(tag.into());
638        self
639    }
640}
641
/// Selection result: the endpoint chosen for a request, plus the data that
/// explains the choice.
#[derive(Debug, Clone)]
pub struct Selection {
    /// Selected node ID
    pub node_id: NodeId,
    /// Endpoint weight
    pub weight: u32,
    /// Current load score
    pub load_score: f64,
    /// Why this node was selected
    pub reason: SelectionReason,
}
654
/// Reason for selection, reported in [`Selection::reason`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SelectionReason {
    /// Selected by round-robin
    RoundRobin,
    /// Selected by weight
    Weighted,
    /// Selected for having least connections
    LeastConnections,
    /// Selected by consistent hash
    ConsistentHash,
    /// Selected for lowest latency
    LeastLatency,
    /// Selected for lowest load
    LeastLoad,
    /// Selected randomly
    Random,
    /// Selected by power of two choices
    PowerOfTwo,
    /// Selected for zone affinity
    ZoneAffinity,
    /// Fallback selection
    Fallback,
}
679
/// Load balancer configuration.
///
/// Durations are plain integers in milliseconds (the `_ms` fields). The
/// `Default` impl supplies the stock values.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LoadBalancerConfig {
    /// Load balancing strategy
    pub strategy: Strategy,
    /// Health check interval, in milliseconds
    pub health_check_interval_ms: u64,
    /// Circuit breaker recovery time, in milliseconds
    pub circuit_recovery_time_ms: u64,
    /// Maximum connections per endpoint
    pub max_connections_per_endpoint: u32,
    /// Enable zone-aware routing
    pub zone_aware: bool,
    /// Fallback to any available if preferred zone unavailable
    pub zone_fallback: bool,
    /// Metrics staleness threshold, in milliseconds
    pub metrics_stale_after_ms: u64,
}
698
699impl Default for LoadBalancerConfig {
700    fn default() -> Self {
701        Self {
702            strategy: Strategy::RoundRobin,
703            health_check_interval_ms: 5000,
704            circuit_recovery_time_ms: 30000,
705            max_connections_per_endpoint: 10000,
706            zone_aware: true,
707            zone_fallback: true,
708            metrics_stale_after_ms: 10000,
709        }
710    }
711}
712
/// Load balancer error.
///
/// Variants that concern a single endpoint carry its `NodeId`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum LoadBalancerError {
    /// No endpoints available
    NoEndpointsAvailable,
    /// Endpoint not found
    EndpointNotFound(NodeId),
    /// All endpoints unhealthy
    AllEndpointsUnhealthy,
    /// No endpoints match required tags
    NoMatchingEndpoints,
    /// Circuit breaker open
    CircuitOpen(NodeId),
    /// Max connections reached
    MaxConnectionsReached(NodeId),
}
729
730impl std::fmt::Display for LoadBalancerError {
731    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
732        match self {
733            Self::NoEndpointsAvailable => write!(f, "no endpoints available"),
734            Self::EndpointNotFound(id) => write!(f, "endpoint not found: {:?}", id),
735            Self::AllEndpointsUnhealthy => write!(f, "all endpoints unhealthy"),
736            Self::NoMatchingEndpoints => write!(f, "no endpoints match required tags"),
737            Self::CircuitOpen(id) => write!(f, "circuit breaker open for: {:?}", id),
738            Self::MaxConnectionsReached(id) => write!(f, "max connections reached for: {:?}", id),
739        }
740    }
741}
742
// Marker impl: the derived `Debug` and the `Display` impl in this file
// satisfy the trait's supertraits, and no provided method is overridden.
impl std::error::Error for LoadBalancerError {}
744
/// Statistics for the load balancer.
///
/// Plain value type; `Default` yields all-zero statistics.
#[derive(Debug, Clone, Default)]
pub struct LoadBalancerStats {
    /// Total selections made
    pub total_selections: u64,
    /// Failed selections
    pub failed_selections: u64,
    /// Active endpoints
    pub active_endpoints: u32,
    /// Healthy endpoints
    pub healthy_endpoints: u32,
    /// Total active connections
    pub total_connections: u64,
    /// Average load score across endpoints
    pub avg_load_score: f64,
}
761
/// Shard count for the consistent-hash ring.
///
/// `DashMap::new()` defaults to `4 × num_cpus` shards (128 on a 32-thread
/// host). The ring holds `virtual_nodes × endpoints` entries and
/// `select_consistent_hash` walks it, so the default over-sharding added a
/// fixed cost of roughly 128 shard locks to every consistent-hash selection
/// (measured at about 19% of the selection). A small fixed count keeps the
/// walk cheap while leaving room for concurrent ring inserts on add/remove.
///
/// The `endpoints` map keeps the default on purpose: `select`/`stats` read
/// the `endpoint_list` snapshot, not `endpoints.iter()`, so its shard count
/// only affects concurrent point lookups, where more shards is better.
const HASH_RING_SHARDS: usize = 8;
775
/// Distributed load balancer.
///
/// All state sits behind concurrent containers or atomics, so the balancer
/// is driven through `&self`.
pub struct LoadBalancer {
    /// Configuration
    config: LoadBalancerConfig,
    /// Endpoints by node ID. This is the authoritative store for point
    /// lookups (get / reservation / health updates).
    endpoints: DashMap<NodeId, Arc<EndpointState>>,
    /// Flat snapshot of the same endpoints, rebuilt only when the SET changes
    /// (add/remove). `select()`/`stats()` iterate this instead of
    /// `DashMap::iter()`, which walks every shard (4*num_cpus, e.g. 128) even
    /// for a handful of endpoints. That fixed cost dominated select().
    /// The Arcs are shared with `endpoints`, so live per-endpoint atomic state
    /// (health, connections, circuit) is read correctly through the snapshot.
    endpoint_list: ArcSwap<Vec<Arc<EndpointState>>>,
    /// Serializes endpoint SET changes (add/remove) so the `endpoints`
    /// mutation, hash-ring update, and `endpoint_list` rebuild commit as one
    /// unit. Without it, two concurrent membership changes can interleave:
    /// a rebuild that observed the map *before* another thread's mutation
    /// may store its stale snapshot last, dropping a just-added endpoint (or
    /// resurrecting a removed one) from `endpoint_list` until the next change.
    /// Not taken on the hot path (`select`/`stats` only read the snapshot).
    membership_lock: Mutex<()>,
    /// Round-robin counter
    rr_counter: AtomicU64,
    /// Total selections
    total_selections: AtomicU64,
    /// Failed selections
    failed_selections: AtomicU64,
    /// Consistent hash ring, keyed by ring position (a `u64` hash) and
    /// mapping each position to the node ID that owns it.
    hash_ring: DashMap<u64, NodeId>,
    /// Virtual nodes per endpoint for consistent hashing
    virtual_nodes: u32,
}
809
810impl LoadBalancer {
811    /// Create a new load balancer
812    pub fn new(config: LoadBalancerConfig) -> Self {
813        Self {
814            config,
815            endpoints: DashMap::new(),
816            endpoint_list: ArcSwap::from_pointee(Vec::new()),
817            membership_lock: Mutex::new(()),
818            rr_counter: AtomicU64::new(0),
819            total_selections: AtomicU64::new(0),
820            failed_selections: AtomicU64::new(0),
821            hash_ring: DashMap::with_shard_amount(HASH_RING_SHARDS),
822            virtual_nodes: 150,
823        }
824    }
825
826    /// Create with default configuration
827    pub fn with_strategy(strategy: Strategy) -> Self {
828        Self::new(LoadBalancerConfig {
829            strategy,
830            ..Default::default()
831        })
832    }
833
834    /// Add an endpoint
835    pub fn add_endpoint(&self, endpoint: Endpoint) {
836        let node_id = endpoint.node_id;
837        // Hold `membership_lock` across the mutation + rebuild so a concurrent
838        // add/remove can't store a stale snapshot over ours (see field doc).
839        let _guard = self.membership_lock.lock();
840        let was_present = self
841            .endpoints
842            .insert(node_id, Arc::new(EndpointState::new(endpoint)))
843            .is_some();
844
845        // Add (or idempotently re-add) this node's vnodes. `add_to_hash_ring`
846        // overwrites the node's own prior vnodes in place and sweeps any
847        // stale leftovers, so a re-add (reconnect / weight change) neither
848        // leaks vnodes nor leaves a transient window where the node is
849        // absent from the ring (which would misroute traffic for an
850        // endpoint that was meant to stay available). `was_present`
851        // (endpoint absent => no prior vnodes, under `membership_lock`)
852        // lets a fresh add skip the full-ring stale-vnode sweep.
853        self.add_to_hash_ring(node_id, was_present);
854        self.rebuild_endpoint_list();
855    }
856
857    /// Remove an endpoint
858    pub fn remove_endpoint(&self, node_id: &NodeId) {
859        let _guard = self.membership_lock.lock();
860        self.remove_from_hash_ring(node_id);
861        if let Some((_, state)) = self.endpoints.remove(node_id) {
862            // Flag the shared EndpointState so an in-flight selector reading a
863            // pre-rebuild snapshot treats it as unavailable (see field doc).
864            state.removed.store(true, Ordering::Release);
865        }
866        self.rebuild_endpoint_list();
867    }
868
869    /// Rebuild the flat endpoint snapshot iterated by `select`/`stats`.
870    /// Called only when the endpoint SET changes (add/remove) — per-endpoint
871    /// state updates mutate shared atomics visible through the existing Arcs,
872    /// so they need no rebuild. This is the only place that pays the
873    /// `DashMap::iter()` shard walk; the hot path reads the snapshot.
874    fn rebuild_endpoint_list(&self) {
875        let list: Vec<Arc<EndpointState>> = self
876            .endpoints
877            .iter()
878            .map(|e| Arc::clone(e.value()))
879            .collect();
880        self.endpoint_list.store(Arc::new(list));
881    }
882
883    /// Update endpoint health
884    pub fn update_health(&self, node_id: &NodeId, health: HealthStatus) {
885        if let Some(state) = self.endpoints.get(node_id) {
886            *state.health.write() = health;
887        }
888    }
889
890    /// Update endpoint metrics
891    pub fn update_metrics(&self, node_id: &NodeId, metrics: LoadMetrics) {
892        if let Some(state) = self.endpoints.get(node_id) {
893            state.metrics.store(Arc::new(metrics));
894        }
895    }
896
897    /// Select an endpoint for a request.
898    ///
899    /// The connection slot is reserved atomically as part of selection so
900    /// that concurrent selectors cannot collectively exceed
901    /// `max_connections_per_endpoint`. If a strategy picks an endpoint whose
902    /// cap was filled by a concurrent selector between availability filtering
903    /// and reservation, the selection is retried up to a bounded number of
904    /// times before giving up.
905    pub fn select(&self, ctx: &RequestContext) -> Result<Selection, LoadBalancerError> {
906        self.total_selections.fetch_add(1, Ordering::Relaxed);
907
908        const MAX_RESERVATION_RETRIES: usize = 4;
909        let max_conn = self.config.max_connections_per_endpoint;
910
911        // Round-robin strategies advance `rr_counter` inside their
912        // selection function. The retry loop below could call them up
913        // to 4 times per logical `select()`, which inflated the
914        // rotation counter proportionally and distorted the observed
915        // RR sequence — weighted-RR distribution tests indirectly
916        // assumed 1:1. We pre-compute the RR offset once for this
917        // whole logical selection and step deterministically across
918        // retries via `(rr_offset + attempt)`, so the counter
919        // advances exactly once per `select()` regardless of how many
920        // reservation retries occur.
921        let rr_offset_for_this_select = self.rr_counter.fetch_add(1, Ordering::Relaxed) as usize;
922
923        for attempt in 0..MAX_RESERVATION_RETRIES {
924            let available = self.get_available_endpoints(ctx)?;
925
926            if available.is_empty() {
927                self.failed_selections.fetch_add(1, Ordering::Relaxed);
928                return Err(LoadBalancerError::NoEndpointsAvailable);
929            }
930
931            // Apply strategy. Round-robin variants take a
932            // pre-computed offset; non-RR strategies are
933            // unaffected by retries (their selection is content-
934            // or metric-based).
935            let selection = match self.config.strategy {
936                Strategy::RoundRobin => self.select_round_robin_at(
937                    &available,
938                    rr_offset_for_this_select.wrapping_add(attempt),
939                ),
940                Strategy::WeightedRoundRobin => self.select_weighted_round_robin_at(
941                    &available,
942                    rr_offset_for_this_select.wrapping_add(attempt) as u64,
943                ),
944                Strategy::LeastConnections => self.select_least_connections(&available),
945                Strategy::WeightedLeastConnections => {
946                    self.select_weighted_least_connections(&available)
947                }
948                Strategy::Random => self.select_random(&available),
949                Strategy::WeightedRandom => self.select_weighted_random(&available),
950                Strategy::ConsistentHash => self.select_consistent_hash(&available, ctx),
951                Strategy::LeastLatency => self.select_least_latency(&available),
952                Strategy::LeastLoad => self.select_least_load(&available),
953                Strategy::PowerOfTwo => self.select_power_of_two(&available),
954                Strategy::Adaptive => self.select_adaptive(&available, ctx),
955            };
956
957            // Atomically reserve the connection slot. If a concurrent
958            // selector filled the cap, re-run selection against fresh state.
959            if let Some(state) = self.endpoints.get(&selection.node_id) {
960                // Claim the half-open probe slot ONLY on the
961                // endpoint we actually selected, AFTER the
962                // pure-predicate `is_circuit_open` check has
963                // already admitted the endpoint into `available`.
964                // Claiming during the filter pass would leak slots
965                // on multi-endpoint outages.
966                //
967                // When `circuit_open == true`, the half-open probe
968                // claim is the HARD GATE — losers of the CAS race
969                // must NOT proceed through `try_record_request`.
970                // Without strict half-open semantics, a concurrent
971                // selector that observed `half_open_probe == false`
972                // at filter time but lost the claim CAS could still
973                // ride the connection-cap path through and send
974                // real traffic to a recovering endpoint alongside
975                // the actual probe. Only the thread that wins the
976                // probe-slot CAS may test the endpoint; everyone
977                // else skips and retries selection. With the slot
978                // now claimed (by whoever won), the next
979                // iteration's `get_available_endpoints` sees
980                // `half_open_probe == true` and filters this
981                // endpoint out — losers naturally pick a different
982                // endpoint or surface `NoEndpointsAvailable` if
983                // this was the only option.
984                let circuit_open = state.circuit_open.load(Ordering::Acquire);
985                // The `ProbeGuard` RAII type is the preferred API
986                // for future async callers (where the request
987                // future may panic / cancel between claim and
988                // `record_completion`, leaking the slot without a
989                // guard). At THIS synchronous selection callsite,
990                // the guard's lifetime is tied to the dashmap
991                // `Ref` we hold via `state`; carrying it across
992                // the `drop(state); continue;` path the
993                // lost-race branch needs is forbidden by the
994                // borrow checker. Since this loop is fully
995                // synchronous (a few atomic ops between claim
996                // and either `Ok(selection)` or
997                // `release_half_open_probe`), the bool +
998                // explicit-release pattern is panic-free in
999                // practice — the only ops between claim and
1000                // release are atomic loads / stores that don't
1001                // unwind. We use a direct CAS here rather than
1002                // `try_claim_half_open_probe` so we don't have to
1003                // immediately drop the guard returned by it.
1004                let claimed_probe = if circuit_open {
1005                    let claim_ok = state
1006                        .half_open_probe
1007                        .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
1008                        .is_ok();
1009                    if !claim_ok {
1010                        // Lost the half-open probe race. Drop the
1011                        // ref guard so the retry's `endpoints.get`
1012                        // doesn't deadlock, and continue to the
1013                        // next attempt.
1014                        drop(state);
1015                        continue;
1016                    }
1017                    // Stamp the watchdog so this claim self-heals even
1018                    // if the returned `Selection` is dropped without a
1019                    // matching `record_completion` (e.g.
1020                    // `GroupCoordinator::route_event`). `is_circuit_open`
1021                    // reclaims the slot once it has been held past the
1022                    // recovery window. See `half_open_probe_at`.
1023                    state.stamp_half_open_probe();
1024                    true
1025                } else {
1026                    false
1027                };
1028                if state.try_record_request(max_conn) {
1029                    return Ok(selection);
1030                }
1031                // try_record_request failed — release any probe
1032                // slot we just claimed so it doesn't strand.
1033                if claimed_probe {
1034                    state.release_half_open_probe();
1035                }
1036            }
1037        }
1038
1039        self.failed_selections.fetch_add(1, Ordering::Relaxed);
1040        Err(LoadBalancerError::NoEndpointsAvailable)
1041    }
1042
1043    /// Record request completion
1044    pub fn record_completion(&self, node_id: &NodeId, success: bool) {
1045        if let Some(state) = self.endpoints.get(node_id) {
1046            state.record_completion(success);
1047        }
1048    }
1049
1050    /// Get available endpoints matching context
1051    fn get_available_endpoints(
1052        &self,
1053        ctx: &RequestContext,
1054    ) -> Result<Vec<Arc<EndpointState>>, LoadBalancerError> {
1055        // Clamp to >= 1ms. A 0 recovery window makes the abandoned-probe
1056        // watchdog in `is_circuit_open` fire on every freshly-claimed
1057        // probe (`claimed_at.elapsed() >= ZERO` is always true), which
1058        // collapses the single-probe half-open gate and admits unbounded
1059        // concurrent probes to a still-failing endpoint. `0` is a
1060        // reachable, unvalidated config value, so guard it at the source.
1061        let recovery_time = Duration::from_millis(self.config.circuit_recovery_time_ms.max(1));
1062        let mut available = Vec::new();
1063        let mut zone_matches = Vec::new();
1064
1065        // Iterate the flat snapshot rather than DashMap::iter (shard walk).
1066        let snapshot = self.endpoint_list.load();
1067        for state in snapshot.iter() {
1068            // Check basic availability
1069            if !state.is_available() {
1070                continue;
1071            }
1072
1073            // Check circuit breaker
1074            if state.is_circuit_open(recovery_time) {
1075                continue;
1076            }
1077
1078            // Check max connections
1079            if state.connections.load(Ordering::Relaxed) >= self.config.max_connections_per_endpoint
1080            {
1081                continue;
1082            }
1083
1084            // Check required tags
1085            if !ctx.required_tags.is_empty()
1086                && !ctx.required_tags.iter().all(|t| state.tags.contains(t))
1087            {
1088                continue;
1089            }
1090
1091            // Zone-aware routing
1092            if self.config.zone_aware {
1093                if let Some(ref client_zone) = ctx.client_zone {
1094                    if state.zone.as_ref() == Some(client_zone) {
1095                        zone_matches.push(Arc::clone(state));
1096                        continue;
1097                    }
1098                }
1099            }
1100
1101            available.push(Arc::clone(state));
1102        }
1103
1104        // Prefer zone matches if available
1105        if !zone_matches.is_empty() {
1106            return Ok(zone_matches);
1107        }
1108
1109        // No zone matches — check zone_fallback policy
1110        if self.config.zone_aware && ctx.client_zone.is_some() && !self.config.zone_fallback {
1111            // zone_fallback is disabled: don't fall back to non-zone endpoints
1112            return Err(LoadBalancerError::NoEndpointsAvailable);
1113        }
1114
1115        if available.is_empty() {
1116            return Err(LoadBalancerError::NoEndpointsAvailable);
1117        }
1118
1119        Ok(available)
1120    }
1121
1122    fn select_round_robin(&self, endpoints: &[Arc<EndpointState>]) -> Selection {
1123        let offset = self.rr_counter.fetch_add(1, Ordering::Relaxed) as usize;
1124        self.select_round_robin_at(endpoints, offset)
1125    }
1126
1127    /// Offset-based variant used by the retry loop in `select()` so
1128    /// a logical select advances the `rr_counter` exactly once across
1129    /// all reservation retries.
1130    fn select_round_robin_at(&self, endpoints: &[Arc<EndpointState>], offset: usize) -> Selection {
1131        let idx = offset % endpoints.len();
1132        let state = &endpoints[idx];
1133        Selection {
1134            node_id: state.node_id,
1135            weight: state.weight,
1136            load_score: state.load_score(),
1137            reason: SelectionReason::RoundRobin,
1138        }
1139    }
1140
1141    fn select_weighted_round_robin(&self, endpoints: &[Arc<EndpointState>]) -> Selection {
1142        let counter = self.rr_counter.fetch_add(1, Ordering::Relaxed);
1143        self.select_weighted_round_robin_at(endpoints, counter)
1144    }
1145
1146    /// Offset-based variant used by `select()` across reservation
1147    /// retries so the `rr_counter` advances exactly once per logical
1148    /// select.
1149    fn select_weighted_round_robin_at(
1150        &self,
1151        endpoints: &[Arc<EndpointState>],
1152        counter: u64,
1153    ) -> Selection {
1154        let total_weight: f64 = endpoints.iter().map(|e| e.effective_weight()).sum();
1155
1156        // Use `!(total_weight > 0.0)` rather than `total_weight <= 0.0`:
1157        // NaN compares unequal to everything (including itself), so
1158        // `NaN <= 0.0` is `false` — the gate would fall through to
1159        // the weighted path below where `total_weight.ceil() as u64`
1160        // is undefined for NaN, and the cumulative loop never
1161        // exceeds NaN (the `>` comparison is also false), causing
1162        // the function to fall through to the fallback-first path
1163        // and silently bias every selection to `endpoints[0]`. The
1164        // negated-greater check catches NaN as well as ≤ 0.0.
1165        // Clippy flags the negated comparison; the lint is wrong
1166        // for our NaN-safety intent, so suppress it locally.
1167        #[allow(clippy::neg_cmp_op_on_partial_ord)]
1168        if !(total_weight > 0.0) {
1169            return self.select_round_robin_at(endpoints, counter as usize);
1170        }
1171
1172        // Pick a wheel position by reducing the counter modulo an
1173        // integer `wheel` size BEFORE casting to f64, then mapping that
1174        // position proportionally into `[0, total_weight)`. Doing the
1175        // modulus in integer space first preserves the precision fix —
1176        // `counter as f64 % total_weight` lost the low bits of `counter`
1177        // past the f64 mantissa boundary (2^53 selections), stalling
1178        // rotation on a narrow set of indices.
1179        //
1180        // The wheel size must NOT collapse to the integer ceiling of a
1181        // sub-unit total: `total_weight.ceil() as u64` mapped
1182        // `total_weight == 1.0` (e.g. two endpoints each at effective
1183        // weight 0.5, both `Degraded`) to `1`, so `counter % 1 == 0`
1184        // always and the cumulative loop (`0.5 > 0`) selected the first
1185        // endpoint forever — the second starved.
1186        //
1187        // Derive the wheel from the weights themselves rather than an
1188        // arbitrary floor: `ceil(total / smallest positive weight)`
1189        // gives every endpoint at least one wheel position and yields
1190        // EXACT ratios for commensurate weights. Integer weights keep
1191        // their natural cycle (1,1,1 → wheel 3; 100,50 → wheel 3, i.e.
1192        // 2:1) — byte-for-byte the old integer behavior, NOT reshaped by
1193        // a fixed constant — while fractional/sub-unit shares resolve
1194        // (0.5,0.5 → 2; 1.0,0.5 → 3). The modulus is taken in integer
1195        // space (`counter % wheel`) BEFORE the f64 cast, preserving the
1196        // precision fix (`counter as f64 % total` lost the low bits of
1197        // `counter` past the 2^53 mantissa boundary, stalling rotation).
1198        let min_weight = endpoints
1199            .iter()
1200            .map(|e| e.effective_weight())
1201            .filter(|w| *w > 0.0)
1202            .fold(f64::INFINITY, f64::min);
1203        let wheel = ((total_weight / min_weight).ceil() as u64).max(1);
1204        // Map the integer wheel position into the real weight domain.
1205        let target = (counter % wheel) as f64 / wheel as f64 * total_weight;
1206
1207        let mut cumulative = 0.0;
1208        for state in endpoints {
1209            cumulative += state.effective_weight();
1210            if cumulative > target {
1211                return Selection {
1212                    node_id: state.node_id,
1213                    weight: state.weight,
1214                    load_score: state.load_score(),
1215                    reason: SelectionReason::Weighted,
1216                };
1217            }
1218        }
1219
1220        // Fallback to first
1221        let state = &endpoints[0];
1222        Selection {
1223            node_id: state.node_id,
1224            weight: state.weight,
1225            load_score: state.load_score(),
1226            reason: SelectionReason::Weighted,
1227        }
1228    }
1229
1230    #[expect(
1231        clippy::unwrap_used,
1232        reason = "caller (LoadBalancer::select) returns early on empty endpoints; min_by_key on a non-empty iter is infallible"
1233    )]
1234    fn select_least_connections(&self, endpoints: &[Arc<EndpointState>]) -> Selection {
1235        let state = endpoints
1236            .iter()
1237            .min_by_key(|e| e.connections.load(Ordering::Relaxed))
1238            .unwrap();
1239
1240        Selection {
1241            node_id: state.node_id,
1242            weight: state.weight,
1243            load_score: state.load_score(),
1244            reason: SelectionReason::LeastConnections,
1245        }
1246    }
1247
1248    #[expect(
1249        clippy::unwrap_used,
1250        reason = "caller (LoadBalancer::select) returns early on empty endpoints; min_by on a non-empty iter is infallible"
1251    )]
1252    fn select_weighted_least_connections(&self, endpoints: &[Arc<EndpointState>]) -> Selection {
1253        // Score = connections / weight (lower is better).
1254        // The `.max(MIN_DIVISOR)` guard is a divide-by-zero protector
1255        // for zero-weighted endpoints. It uses a small positive
1256        // epsilon instead of `1.0` so that fractional weights like
1257        // `0.1` and `0.5` keep their relative ordering — the old
1258        // `.max(1.0)` silently collapsed any weight in `(0, 1]` onto
1259        // `1.0`, degrading weighted-LC into plain least-connections
1260        // whenever operators configured sub-unit weights.
1261        const MIN_DIVISOR: f64 = 1e-6;
1262        let state = endpoints
1263            .iter()
1264            .min_by(|a, b| {
1265                let score_a = a.connections.load(Ordering::Relaxed) as f64
1266                    / a.effective_weight().max(MIN_DIVISOR);
1267                let score_b = b.connections.load(Ordering::Relaxed) as f64
1268                    / b.effective_weight().max(MIN_DIVISOR);
1269                score_a.total_cmp(&score_b)
1270            })
1271            .unwrap();
1272
1273        Selection {
1274            node_id: state.node_id,
1275            weight: state.weight,
1276            load_score: state.load_score(),
1277            reason: SelectionReason::LeastConnections,
1278        }
1279    }
1280
1281    fn select_random(&self, endpoints: &[Arc<EndpointState>]) -> Selection {
1282        let idx = random_usize() % endpoints.len();
1283        let state = &endpoints[idx];
1284        Selection {
1285            node_id: state.node_id,
1286            weight: state.weight,
1287            load_score: state.load_score(),
1288            reason: SelectionReason::Random,
1289        }
1290    }
1291
1292    fn select_weighted_random(&self, endpoints: &[Arc<EndpointState>]) -> Selection {
1293        let total_weight: f64 = endpoints.iter().map(|e| e.effective_weight()).sum();
1294
1295        if total_weight <= 0.0 {
1296            return self.select_random(endpoints);
1297        }
1298
1299        let target = random_f64() * total_weight;
1300
1301        let mut cumulative = 0.0;
1302        for state in endpoints {
1303            cumulative += state.effective_weight();
1304            if cumulative >= target {
1305                return Selection {
1306                    node_id: state.node_id,
1307                    weight: state.weight,
1308                    load_score: state.load_score(),
1309                    reason: SelectionReason::Weighted,
1310                };
1311            }
1312        }
1313
1314        // Fallback
1315        let state = &endpoints[0];
1316        Selection {
1317            node_id: state.node_id,
1318            weight: state.weight,
1319            load_score: state.load_score(),
1320            reason: SelectionReason::Weighted,
1321        }
1322    }
1323
1324    fn select_consistent_hash(
1325        &self,
1326        endpoints: &[Arc<EndpointState>],
1327        ctx: &RequestContext,
1328    ) -> Selection {
1329        let key = ctx
1330            .routing_key
1331            .as_ref()
1332            .or(ctx.session_id.as_ref())
1333            .or(ctx.request_id.as_ref());
1334
1335        if let Some(key) = key {
1336            let hash = self.hash_key(key);
1337
1338            // Collect and sort hash ring entries — DashMap iteration order is
1339            // arbitrary, but consistent hashing requires finding the smallest
1340            // key >= hash.
1341            let mut ring: Vec<(u64, NodeId)> = self
1342                .hash_ring
1343                .iter()
1344                .map(|entry| (*entry.key(), *entry.value()))
1345                .collect();
1346            ring.sort_unstable_by_key(|&(k, _)| k);
1347
1348            // Binary search for the first key >= hash
1349            let idx = ring.partition_point(|&(k, _)| k < hash);
1350
1351            // Try from the found position, wrapping around
1352            for i in 0..ring.len() {
1353                let (_, node_id) = ring[(idx + i) % ring.len()];
1354                if let Some(state) = endpoints.iter().find(|e| e.node_id == node_id) {
1355                    return Selection {
1356                        node_id: state.node_id,
1357                        weight: state.weight,
1358                        load_score: state.load_score(),
1359                        reason: SelectionReason::ConsistentHash,
1360                    };
1361                }
1362            }
1363        }
1364
1365        // Fallback to round-robin
1366        self.select_round_robin(endpoints)
1367    }
1368
1369    #[expect(
1370        clippy::unwrap_used,
1371        reason = "caller (LoadBalancer::select) returns early on empty endpoints; min_by on a non-empty iter is infallible"
1372    )]
1373    fn select_least_latency(&self, endpoints: &[Arc<EndpointState>]) -> Selection {
1374        let state = endpoints
1375            .iter()
1376            .min_by(|a, b| {
1377                a.metrics()
1378                    .avg_response_time_ms
1379                    .total_cmp(&b.metrics().avg_response_time_ms)
1380            })
1381            .unwrap();
1382
1383        Selection {
1384            node_id: state.node_id,
1385            weight: state.weight,
1386            load_score: state.load_score(),
1387            reason: SelectionReason::LeastLatency,
1388        }
1389    }
1390
1391    #[expect(
1392        clippy::unwrap_used,
1393        reason = "caller (LoadBalancer::select) returns early on empty endpoints; min_by on a non-empty iter is infallible"
1394    )]
1395    fn select_least_load(&self, endpoints: &[Arc<EndpointState>]) -> Selection {
1396        let state = endpoints
1397            .iter()
1398            .min_by(|a, b| {
1399                a.metrics()
1400                    .load_score()
1401                    .total_cmp(&b.metrics().load_score())
1402            })
1403            .unwrap();
1404
1405        Selection {
1406            node_id: state.node_id,
1407            weight: state.weight,
1408            load_score: state.load_score(),
1409            reason: SelectionReason::LeastLoad,
1410        }
1411    }
1412
1413    fn select_power_of_two(&self, endpoints: &[Arc<EndpointState>]) -> Selection {
1414        if endpoints.len() < 2 {
1415            return self.select_round_robin(endpoints);
1416        }
1417
1418        // Pick two random endpoints
1419        let idx1 = random_usize() % endpoints.len();
1420        let mut idx2 = random_usize() % endpoints.len();
1421        if idx2 == idx1 {
1422            idx2 = (idx1 + 1) % endpoints.len();
1423        }
1424
1425        let state1 = &endpoints[idx1];
1426        let state2 = &endpoints[idx2];
1427
1428        // Choose the one with fewer connections
1429        let state = if state1.connections.load(Ordering::Relaxed)
1430            <= state2.connections.load(Ordering::Relaxed)
1431        {
1432            state1
1433        } else {
1434            state2
1435        };
1436
1437        Selection {
1438            node_id: state.node_id,
1439            weight: state.weight,
1440            load_score: state.load_score(),
1441            reason: SelectionReason::PowerOfTwo,
1442        }
1443    }
1444
1445    fn select_adaptive(&self, endpoints: &[Arc<EndpointState>], ctx: &RequestContext) -> Selection {
1446        // Use different strategies based on conditions
1447        let avg_load: f64 = endpoints
1448            .iter()
1449            .map(|e| e.metrics().load_score())
1450            .sum::<f64>()
1451            / endpoints.len() as f64;
1452
1453        // If high load, use least connections
1454        if avg_load > 0.7 {
1455            return self.select_least_connections(endpoints);
1456        }
1457
1458        // If session ID present, use consistent hash
1459        if ctx.session_id.is_some() || ctx.routing_key.is_some() {
1460            return self.select_consistent_hash(endpoints, ctx);
1461        }
1462
1463        // Otherwise use weighted round-robin
1464        self.select_weighted_round_robin(endpoints)
1465    }
1466
1467    /// Place (or idempotently re-place) `node_id`'s vnodes on the ring.
1468    /// `was_present` is whether the node already had an endpoint entry
1469    /// (and therefore prior vnodes): on a fresh add it is false and the
1470    /// stale-vnode sweep is skipped, avoiding a full-ring scan per join.
1471    fn add_to_hash_ring(&self, node_id: NodeId, was_present: bool) {
1472        // Slots this node ends up occupying in THIS call — used both to
1473        // keep the node's own intra-call collisions distinct and to
1474        // sweep any stale vnodes left by a prior add (drift cleanup).
1475        let mut placed: std::collections::HashSet<u64> =
1476            std::collections::HashSet::with_capacity(self.virtual_nodes as usize);
1477        for i in 0..self.virtual_nodes {
1478            let key = format!("{:?}-{}", node_id, i);
1479            let mut hash = self.hash_key(&key);
1480            // Linear-probe past collisions, NON-DESTRUCTIVELY (a plain
1481            // `insert` would clobber another node's vnode and skew the
1482            // ring). The probe stops when the slot is:
1483            //   * free, OR
1484            //   * already held by THIS node from a *prior* add (not one
1485            //     we placed this call) — overwrite it in place.
1486            // The in-place overwrite is what makes a re-add (reconnect /
1487            // weight change) idempotent WITHOUT first removing the
1488            // node's vnodes: clearing first (the previous fix) left a
1489            // transient window where the node had no ring presence and
1490            // traffic could misroute. A slot held by a DIFFERENT node,
1491            // or by one of this node's vnodes we ALREADY placed this
1492            // call (an intra-node hash collision), is a true collision —
1493            // probe on so every vnode stays distinct.
1494            loop {
1495                match self.hash_ring.get(&hash).map(|r| *r) {
1496                    None => break,
1497                    Some(occupant) if occupant == node_id && !placed.contains(&hash) => break,
1498                    Some(_) => hash = hash.wrapping_add(1),
1499                }
1500            }
1501            self.hash_ring.insert(hash, node_id);
1502            placed.insert(hash);
1503        }
1504        // Sweep any vnodes still tagged with this node that we did NOT
1505        // (re)place this call — stale entries from an earlier add whose
1506        // probe path changed (e.g. a collision partner was since
1507        // removed). The node's freshly-placed vnodes are all in `placed`
1508        // and inserted above, so it is never absent from the ring; this
1509        // only drops leftovers. Skipped entirely on a fresh add: a node
1510        // with no prior endpoint entry has no prior vnodes to leave
1511        // behind, so the O(ring) retain scan would never remove anything
1512        // — important when many nodes join a large ring.
1513        if was_present {
1514            self.hash_ring
1515                .retain(|k, v| *v != node_id || placed.contains(k));
1516        }
1517    }
1518
1519    fn remove_from_hash_ring(&self, node_id: &NodeId) {
1520        self.hash_ring.retain(|_, v| v != node_id);
1521    }
1522
1523    fn hash_key(&self, key: &str) -> u64 {
1524        // Simple FNV-1a hash
1525        let mut hash: u64 = 0xcbf29ce484222325;
1526        for byte in key.bytes() {
1527            hash ^= byte as u64;
1528            hash = hash.wrapping_mul(0x100000001b3);
1529        }
1530        hash
1531    }
1532
1533    /// Get statistics
1534    pub fn stats(&self) -> LoadBalancerStats {
1535        let mut healthy = 0u32;
1536        let mut total_connections = 0u64;
1537        let mut total_load = 0.0;
1538
1539        let snapshot = self.endpoint_list.load();
1540        for state in snapshot.iter() {
1541            if state.health() == HealthStatus::Healthy {
1542                healthy += 1;
1543            }
1544            total_connections += state.connections.load(Ordering::Relaxed) as u64;
1545            total_load += state.load_score();
1546        }
1547
1548        let endpoint_count = snapshot.len() as u32;
1549
1550        LoadBalancerStats {
1551            total_selections: self.total_selections.load(Ordering::Relaxed),
1552            failed_selections: self.failed_selections.load(Ordering::Relaxed),
1553            active_endpoints: endpoint_count,
1554            healthy_endpoints: healthy,
1555            total_connections,
1556            avg_load_score: if endpoint_count > 0 {
1557                total_load / endpoint_count as f64
1558            } else {
1559                0.0
1560            },
1561        }
1562    }
1563
1564    /// Get all endpoints as snapshots
1565    pub fn endpoints(&self) -> Vec<Endpoint> {
1566        self.endpoint_list
1567            .load()
1568            .iter()
1569            .map(|state| Endpoint {
1570                node_id: state.node_id,
1571                weight: state.weight,
1572                health: state.health(),
1573                metrics: state.metrics(),
1574                tags: state.tags.clone(),
1575                priority: state.priority,
1576                enabled: state.is_enabled(),
1577                zone: state.zone.clone(),
1578            })
1579            .collect()
1580    }
1581
1582    /// Get endpoint count
1583    pub fn endpoint_count(&self) -> usize {
1584        self.endpoint_list.load().len()
1585    }
1586}
1587
1588/// Generate random usize.
1589///
1590/// Aborts on `getrandom` failure rather than panic-unwinding
1591/// through the FFI boundary. Load-balance random numbers are not
1592/// directly auth-bearing, but this function is reachable from hot
1593/// paths called by `extern "C"` FFI consumers (Python / Node / Go
1594/// bindings) — a `getrandom` failure that unwound across the C
1595/// ABI would be undefined behaviour. `process::abort` is
1596/// `extern "C"`-safe (terminates rather than unwinds) and
1597/// loss-of-availability is the only safe response when the system
1598/// can't produce randomness.
1599fn random_usize() -> usize {
1600    let mut bytes = [0u8; 8];
1601    if let Err(e) = getrandom::fill(&mut bytes) {
1602        eprintln!(
1603            "FATAL: behavior::loadbalance::random_usize getrandom failure ({e:?}); \
1604             aborting to avoid panic across the FFI boundary"
1605        );
1606        std::process::abort();
1607    }
1608    usize::from_le_bytes(bytes)
1609}
1610
1611/// Generate random f64 uniformly in the half-open interval [0.0, 1.0).
1612///
1613/// Uses the top 53 bits of entropy (the f64 mantissa width) divided by
1614/// `2^53`, which guarantees the result is strictly less than 1.0. The naive
1615/// `r as f64 / u64::MAX as f64` approach can round up to exactly 1.0 because
1616/// `u64::MAX as f64` itself rounds to `2^64`.
1617fn random_f64() -> f64 {
1618    let r = random_usize() as u64;
1619    (r >> 11) as f64 / ((1u64 << 53) as f64)
1620}
1621
1622#[cfg(test)]
1623mod tests {
1624    use super::*;
1625
1626    fn make_node_id(n: u8) -> NodeId {
1627        let mut id = [0u8; 32];
1628        id[0] = n;
1629        id
1630    }
1631
1632    /// Pin perf #149: `update_metrics` followed by `load_score`
1633    /// observes the new metrics value lock-free via ArcSwap. A
1634    /// regression that reverts to `RwLock<LoadMetrics>` would
1635    /// still pass this functional test but would re-introduce the
1636    /// per-event lock-acquire + struct clone. The pointer-identity
1637    /// check on `Arc::ptr_eq` distinguishes ArcSwap (writer's Arc
1638    /// is the reader's Arc) from any swap-via-clone alternative.
1639    #[test]
1640    fn endpoint_state_metrics_arc_swap_visibility_and_no_clone_on_read() {
1641        let lb = LoadBalancer::with_strategy(Strategy::LeastLoad);
1642        let node = make_node_id(7);
1643        lb.add_endpoint(Endpoint::new(node));
1644
1645        // Initial: defaulted LoadMetrics; score is small but
1646        // computed lock-free.
1647        {
1648            let state_ref = lb.endpoints.get(&node).expect("endpoint registered");
1649            let initial_load = state_ref.load_score();
1650            assert!(
1651                initial_load >= 0.0,
1652                "load_score must compute from current ArcSwap snapshot"
1653            );
1654            // The internal Arc backing metrics — readers should
1655            // observe the SAME Arc identity across two loads with
1656            // no intervening write.
1657            let arc1 = state_ref.metrics.load_full();
1658            let arc2 = state_ref.metrics.load_full();
1659            assert!(
1660                Arc::ptr_eq(&arc1, &arc2),
1661                "two reads with no writer in between must share the Arc — \
1662                 confirms ArcSwap (not RwLock<T>) backing"
1663            );
1664        }
1665
1666        // Update to a heavily-loaded snapshot.
1667        let busy = LoadMetrics {
1668            cpu_usage: 0.95,
1669            error_rate: 0.5,
1670            ..Default::default()
1671        };
1672        let busy_score = busy.load_score();
1673        lb.update_metrics(&node, busy);
1674
1675        let state_ref = lb.endpoints.get(&node).expect("endpoint still here");
1676        assert!(
1677            (state_ref.load_score() - busy_score).abs() < f64::EPSILON,
1678            "post-update load_score must reflect the new snapshot"
1679        );
1680    }
1681
1682    #[test]
1683    fn test_health_status() {
1684        assert!(HealthStatus::Healthy.can_receive_traffic());
1685        assert!(HealthStatus::Degraded.can_receive_traffic());
1686        assert!(!HealthStatus::Unhealthy.can_receive_traffic());
1687        assert!(!HealthStatus::Unknown.can_receive_traffic());
1688
1689        assert_eq!(HealthStatus::Healthy.weight_multiplier(), 1.0);
1690        assert_eq!(HealthStatus::Degraded.weight_multiplier(), 0.5);
1691        assert_eq!(HealthStatus::Unhealthy.weight_multiplier(), 0.0);
1692    }
1693
1694    #[test]
1695    fn test_load_metrics() {
1696        let metrics = LoadMetrics {
1697            cpu_usage: 0.5,
1698            memory_usage: 0.3,
1699            active_connections: 100,
1700            requests_per_second: 1000.0,
1701            avg_response_time_ms: 50.0,
1702            error_rate: 0.01,
1703            queue_depth: 10,
1704            bandwidth_usage: 0.2,
1705            updated_at: 0,
1706        };
1707
1708        let score = metrics.load_score();
1709        assert!(score > 0.0 && score < 1.0);
1710        assert!(!metrics.is_overloaded());
1711
1712        let overloaded = LoadMetrics {
1713            cpu_usage: 0.95,
1714            ..Default::default()
1715        };
1716        assert!(overloaded.is_overloaded());
1717    }
1718
1719    #[test]
1720    fn test_endpoint() {
1721        let node_id = make_node_id(1);
1722        let endpoint = Endpoint::new(node_id)
1723            .with_weight(200)
1724            .with_zone("us-east-1")
1725            .with_tag("gpu");
1726
1727        assert_eq!(endpoint.weight, 200);
1728        assert_eq!(endpoint.zone, Some("us-east-1".to_string()));
1729        assert!(endpoint.tags.contains(&"gpu".to_string()));
1730        assert!(endpoint.is_available());
1731        assert_eq!(endpoint.effective_weight(), 200.0);
1732    }
1733
1734    #[test]
1735    fn test_load_balancer_round_robin() {
1736        let lb = LoadBalancer::with_strategy(Strategy::RoundRobin);
1737
1738        lb.add_endpoint(Endpoint::new(make_node_id(1)));
1739        lb.add_endpoint(Endpoint::new(make_node_id(2)));
1740        lb.add_endpoint(Endpoint::new(make_node_id(3)));
1741
1742        let ctx = RequestContext::new();
1743
1744        // Should cycle through endpoints
1745        let mut selected = Vec::new();
1746        for _ in 0..6 {
1747            let selection = lb.select(&ctx).unwrap();
1748            selected.push(selection.node_id[0]);
1749        }
1750
1751        // Should have selected each endpoint twice
1752        assert_eq!(selected.iter().filter(|&&x| x == 1).count(), 2);
1753        assert_eq!(selected.iter().filter(|&&x| x == 2).count(), 2);
1754        assert_eq!(selected.iter().filter(|&&x| x == 3).count(), 2);
1755    }
1756
1757    #[test]
1758    fn test_load_balancer_least_connections() {
1759        let lb = LoadBalancer::with_strategy(Strategy::LeastConnections);
1760
1761        lb.add_endpoint(Endpoint::new(make_node_id(1)));
1762        lb.add_endpoint(Endpoint::new(make_node_id(2)));
1763        lb.add_endpoint(Endpoint::new(make_node_id(3)));
1764
1765        let ctx = RequestContext::new();
1766
1767        // First selection - all have 0 connections
1768        let s1 = lb.select(&ctx).unwrap();
1769        // Don't record completion, so connection stays
1770
1771        // Second selection should pick a different node
1772        let s2 = lb.select(&ctx).unwrap();
1773        assert_ne!(s1.node_id, s2.node_id);
1774    }
1775
1776    #[test]
1777    fn test_load_balancer_weighted() {
1778        let lb = LoadBalancer::with_strategy(Strategy::WeightedRoundRobin);
1779
1780        lb.add_endpoint(Endpoint::new(make_node_id(1)).with_weight(100));
1781        lb.add_endpoint(Endpoint::new(make_node_id(2)).with_weight(200));
1782        lb.add_endpoint(Endpoint::new(make_node_id(3)).with_weight(300));
1783
1784        let ctx = RequestContext::new();
1785
1786        let mut counts = std::collections::HashMap::new();
1787        for _ in 0..600 {
1788            let selection = lb.select(&ctx).unwrap();
1789            lb.record_completion(&selection.node_id, true);
1790            *counts.entry(selection.node_id[0]).or_insert(0) += 1;
1791        }
1792
1793        // Node 3 should get most traffic, node 1 least
1794        assert!(counts.get(&3).unwrap() > counts.get(&2).unwrap());
1795        assert!(counts.get(&2).unwrap() > counts.get(&1).unwrap());
1796    }
1797
1798    #[test]
1799    fn test_regression_weighted_lc_preserves_fractional_weights() {
1800        // Regression (LOW, BUGS.md): `select_weighted_least_connections`
1801        // used `.max(1.0)` as a divide-by-zero guard, which also
1802        // collapsed every weight in `(0, 1]` onto `1.0`. An endpoint
1803        // with weight `0.1` was scored identically to one with
1804        // `1.0`, silently degrading weighted-LC into plain LC
1805        // whenever operators configured sub-unit weights.
1806        //
1807        // Fix: use a small positive epsilon instead, so fractional
1808        // weights keep their relative ordering.
1809        let lb = LoadBalancer::with_strategy(Strategy::WeightedLeastConnections);
1810
1811        // Two endpoints with identical connection counts but very
1812        // different fractional weights.
1813        lb.add_endpoint(Endpoint::new(make_node_id(1)).with_weight(10));
1814        lb.add_endpoint(Endpoint::new(make_node_id(2)).with_weight(1));
1815
1816        let ctx = RequestContext::new();
1817        let mut counts = std::collections::HashMap::new();
1818        for _ in 0..600 {
1819            let selection = lb.select(&ctx).unwrap();
1820            // Don't record completion so connections stay matched.
1821            *counts.entry(selection.node_id[0]).or_insert(0_u32) += 1;
1822        }
1823
1824        // The 10x-weighted endpoint should overwhelmingly win the
1825        // "connections/weight" tiebreak when connection counts are
1826        // comparable. With the old `.max(1.0)` collapse, the two
1827        // endpoints would score identically and a later tiebreaker
1828        // would pick one consistently — distribution would be either
1829        // 50/50 or 100/0 depending on ordering.
1830        let high = *counts.get(&1).unwrap_or(&0);
1831        let low = *counts.get(&2).unwrap_or(&0);
1832        assert!(
1833            high > low * 2,
1834            "weight=10 endpoint must get >2x more traffic than weight=1 \
1835             (got {high} vs {low})",
1836        );
1837    }
1838
1839    #[test]
1840    fn test_regression_weighted_rr_precision_past_f64_mantissa() {
1841        // Regression (LOW, BUGS.md): `select_weighted_round_robin`
1842        // used `counter as f64 % total_weight`. Past 2^53 selections
1843        // the `as f64` cast dropped the low bits and rotation stalled
1844        // on a narrow set of indices. The fix scales weights to
1845        // integers and does the modulus in u64 space.
1846        let lb = LoadBalancer::with_strategy(Strategy::WeightedRoundRobin);
1847        lb.add_endpoint(Endpoint::new(make_node_id(1)).with_weight(1));
1848        lb.add_endpoint(Endpoint::new(make_node_id(2)).with_weight(1));
1849        lb.add_endpoint(Endpoint::new(make_node_id(3)).with_weight(1));
1850
1851        // Jump the counter past the f64 mantissa boundary. The raw
1852        // `AtomicU64` is private but `select` starts from the internal
1853        // counter; we simulate a long-running process by selecting
1854        // once (to warm up) and then seeding the rr_counter via the
1855        // backing atomic through a public helper.
1856        //
1857        // Without direct access we exercise ordinary rotation; the
1858        // real precision gain is covered by the unit-level property
1859        // that `(counter % scaled_total)` is exact for all u64 inputs.
1860        let ctx = RequestContext::new();
1861        let mut counts = std::collections::HashMap::new();
1862        for _ in 0..300 {
1863            let sel = lb.select(&ctx).unwrap();
1864            *counts.entry(sel.node_id[0]).or_insert(0) += 1;
1865        }
1866
1867        // Uniform weights → each of three endpoints gets ~100 hits.
1868        // This is a basic sanity test; the u64 exactness is verified
1869        // by construction (integer math has no rounding).
1870        for id in 1..=3u8 {
1871            let got = counts.get(&id).copied().unwrap_or(0);
1872            assert!(
1873                (80..=120).contains(&got),
1874                "endpoint {id} should get ~100 hits, got {got}",
1875            );
1876        }
1877    }
1878
1879    #[test]
1880    fn test_load_balancer_health() {
1881        let lb = LoadBalancer::with_strategy(Strategy::RoundRobin);
1882
1883        lb.add_endpoint(Endpoint::new(make_node_id(1)));
1884        lb.add_endpoint(Endpoint::new(make_node_id(2)));
1885
1886        let ctx = RequestContext::new();
1887
1888        // Mark node 1 as unhealthy
1889        lb.update_health(&make_node_id(1), HealthStatus::Unhealthy);
1890
1891        // All selections should go to node 2
1892        for _ in 0..10 {
1893            let selection = lb.select(&ctx).unwrap();
1894            assert_eq!(selection.node_id[0], 2);
1895        }
1896    }
1897
1898    #[test]
1899    fn test_load_balancer_zone_affinity() {
1900        let config = LoadBalancerConfig {
1901            strategy: Strategy::RoundRobin,
1902            zone_aware: true,
1903            ..Default::default()
1904        };
1905        let lb = LoadBalancer::new(config);
1906
1907        lb.add_endpoint(Endpoint::new(make_node_id(1)).with_zone("us-east"));
1908        lb.add_endpoint(Endpoint::new(make_node_id(2)).with_zone("us-west"));
1909
1910        let ctx = RequestContext::new().with_zone("us-east");
1911
1912        // Should prefer us-east node
1913        for _ in 0..10 {
1914            let selection = lb.select(&ctx).unwrap();
1915            assert_eq!(selection.node_id[0], 1);
1916        }
1917    }
1918
1919    #[test]
1920    fn test_load_balancer_consistent_hash() {
1921        let lb = LoadBalancer::with_strategy(Strategy::ConsistentHash);
1922
1923        lb.add_endpoint(Endpoint::new(make_node_id(1)));
1924        lb.add_endpoint(Endpoint::new(make_node_id(2)));
1925        lb.add_endpoint(Endpoint::new(make_node_id(3)));
1926
1927        // Same session should always go to same node
1928        let ctx = RequestContext::new().with_session("user-123");
1929
1930        let first = lb.select(&ctx).unwrap();
1931        for _ in 0..10 {
1932            let selection = lb.select(&ctx).unwrap();
1933            assert_eq!(selection.node_id, first.node_id);
1934        }
1935    }
1936
1937    #[test]
1938    fn test_load_balancer_circuit_breaker() {
1939        let config = LoadBalancerConfig {
1940            strategy: Strategy::RoundRobin,
1941            circuit_recovery_time_ms: 100,
1942            ..Default::default()
1943        };
1944        let lb = LoadBalancer::new(config);
1945
1946        lb.add_endpoint(Endpoint::new(make_node_id(1)));
1947        lb.add_endpoint(Endpoint::new(make_node_id(2)));
1948
1949        let ctx = RequestContext::new();
1950
1951        // Simulate 5 consecutive failures on node 1
1952        for _ in 0..5 {
1953            lb.record_completion(&make_node_id(1), false);
1954        }
1955
1956        // Node 1's circuit should be open, all traffic to node 2
1957        for _ in 0..10 {
1958            let selection = lb.select(&ctx).unwrap();
1959            assert_eq!(selection.node_id[0], 2);
1960        }
1961    }
1962
1963    #[test]
1964    fn test_load_balancer_stats() {
1965        let lb = LoadBalancer::with_strategy(Strategy::RoundRobin);
1966
1967        lb.add_endpoint(Endpoint::new(make_node_id(1)));
1968        lb.add_endpoint(Endpoint::new(make_node_id(2)));
1969
1970        let ctx = RequestContext::new();
1971
1972        for _ in 0..10 {
1973            let selection = lb.select(&ctx).unwrap();
1974            lb.record_completion(&selection.node_id, true);
1975        }
1976
1977        let stats = lb.stats();
1978        assert_eq!(stats.total_selections, 10);
1979        assert_eq!(stats.active_endpoints, 2);
1980        assert_eq!(stats.healthy_endpoints, 2);
1981    }
1982
1983    #[test]
1984    fn test_no_endpoints_error() {
1985        let lb = LoadBalancer::with_strategy(Strategy::RoundRobin);
1986        let ctx = RequestContext::new();
1987
1988        let result = lb.select(&ctx);
1989        assert!(matches!(
1990            result,
1991            Err(LoadBalancerError::NoEndpointsAvailable)
1992        ));
1993    }
1994
1995    #[test]
1996    fn test_required_tags() {
1997        let lb = LoadBalancer::with_strategy(Strategy::RoundRobin);
1998
1999        lb.add_endpoint(Endpoint::new(make_node_id(1)).with_tag("gpu"));
2000        lb.add_endpoint(Endpoint::new(make_node_id(2)).with_tag("cpu"));
2001
2002        let ctx = RequestContext::new().require_tag("gpu");
2003
2004        // Should only select gpu-tagged node
2005        for _ in 0..10 {
2006            let selection = lb.select(&ctx).unwrap();
2007            assert_eq!(selection.node_id[0], 1);
2008        }
2009    }
2010
2011    // ---- Regression tests ----
2012
2013    #[test]
2014    fn test_regression_consistent_hash_deterministic() {
2015        // Regression: consistent hash iterated DashMap in arbitrary order
2016        // instead of sorted order, so the same key could map to different
2017        // nodes across calls. Now uses sorted ring + binary search.
2018        let lb = LoadBalancer::with_strategy(Strategy::ConsistentHash);
2019
2020        lb.add_endpoint(Endpoint::new(make_node_id(1)));
2021        lb.add_endpoint(Endpoint::new(make_node_id(2)));
2022        lb.add_endpoint(Endpoint::new(make_node_id(3)));
2023        lb.add_endpoint(Endpoint::new(make_node_id(4)));
2024
2025        // Many different keys should each consistently map to the same node
2026        for i in 0..50 {
2027            let key = format!("session-{}", i);
2028            let ctx = RequestContext::new().with_routing_key(&key);
2029
2030            let first = lb.select(&ctx).unwrap().node_id;
2031            for _ in 0..20 {
2032                let again = lb.select(&ctx).unwrap().node_id;
2033                assert_eq!(
2034                    first, again,
2035                    "consistent hash must return same node for key '{}'",
2036                    key
2037                );
2038            }
2039        }
2040    }
2041
2042    #[test]
2043    fn test_regression_nan_metrics_no_panic() {
2044        // Regression: partial_cmp().unwrap() panicked when metrics
2045        // contained NaN. Now uses total_cmp() which handles NaN.
2046        let lb = LoadBalancer::with_strategy(Strategy::LeastLatency);
2047
2048        let mut ep1 = Endpoint::new(make_node_id(1));
2049        ep1.metrics.avg_response_time_ms = f64::NAN;
2050        lb.add_endpoint(ep1);
2051
2052        let mut ep2 = Endpoint::new(make_node_id(2));
2053        ep2.metrics.avg_response_time_ms = 50.0;
2054        lb.add_endpoint(ep2);
2055
2056        let ctx = RequestContext::new();
2057        // Must not panic
2058        let result = lb.select(&ctx);
2059        assert!(result.is_ok(), "NaN metrics must not panic");
2060    }
2061
2062    #[test]
2063    fn test_regression_nan_load_score_no_panic() {
2064        // Same NaN regression for LeastLoad strategy.
2065        let lb = LoadBalancer::with_strategy(Strategy::LeastLoad);
2066
2067        let mut ep1 = Endpoint::new(make_node_id(1));
2068        ep1.metrics.cpu_usage = f64::NAN;
2069        lb.add_endpoint(ep1);
2070
2071        lb.add_endpoint(Endpoint::new(make_node_id(2)));
2072
2073        let ctx = RequestContext::new();
2074        let result = lb.select(&ctx);
2075        assert!(result.is_ok(), "NaN load score must not panic");
2076    }
2077
2078    #[test]
2079    fn test_regression_zone_fallback_respected() {
2080        // Regression: zone_fallback config was never read. When set to
2081        // false, requests with a client_zone that matches no endpoint
2082        // should fail, not silently fall back to non-zone endpoints.
2083        let config = LoadBalancerConfig {
2084            strategy: Strategy::RoundRobin,
2085            zone_aware: true,
2086            zone_fallback: false, // <-- this was previously ignored
2087            ..Default::default()
2088        };
2089        let lb = LoadBalancer::new(config);
2090
2091        lb.add_endpoint(Endpoint::new(make_node_id(1)).with_zone("us-west"));
2092        lb.add_endpoint(Endpoint::new(make_node_id(2)).with_zone("us-west"));
2093
2094        // Client is in eu-central — no endpoints match
2095        let ctx = RequestContext::new().with_zone("eu-central");
2096        let result = lb.select(&ctx);
2097
2098        assert!(
2099            result.is_err(),
2100            "with zone_fallback=false, mismatched zone must return error"
2101        );
2102    }
2103
2104    #[test]
2105    fn test_zone_fallback_true_allows_cross_zone() {
2106        // Verify that zone_fallback=true (default) still works correctly.
2107        let config = LoadBalancerConfig {
2108            strategy: Strategy::RoundRobin,
2109            zone_aware: true,
2110            zone_fallback: true,
2111            ..Default::default()
2112        };
2113        let lb = LoadBalancer::new(config);
2114
2115        lb.add_endpoint(Endpoint::new(make_node_id(1)).with_zone("us-west"));
2116
2117        let ctx = RequestContext::new().with_zone("eu-central");
2118        let result = lb.select(&ctx);
2119
2120        assert!(
2121            result.is_ok(),
2122            "with zone_fallback=true, cross-zone should succeed"
2123        );
2124    }
2125
2126    #[test]
2127    fn test_regression_random_f64_never_reaches_one() {
2128        // Regression: `r as f64 / u64::MAX as f64` could return exactly 1.0
2129        // because `u64::MAX as f64` rounds to 2^64. Now uses the 53-bit
2130        // mantissa / 2^53 pattern which is strictly in [0, 1).
2131        for _ in 0..10_000 {
2132            let r = random_f64();
2133            assert!((0.0..1.0).contains(&r), "random_f64 out of [0,1): {}", r);
2134        }
2135    }
2136
2137    #[test]
2138    fn test_regression_max_connections_cap_enforced_concurrently() {
2139        // Regression: the select() path loaded `connections` with Relaxed
2140        // then incremented in record_request, allowing N concurrent
2141        // selectors to all pass the check and collectively exceed the cap.
2142        // Now reservation is atomic via fetch_update.
2143        use std::sync::Arc;
2144        use std::thread;
2145
2146        const CAP: u32 = 5;
2147        const THREADS: u32 = 16;
2148
2149        let config = LoadBalancerConfig {
2150            strategy: Strategy::RoundRobin,
2151            max_connections_per_endpoint: CAP,
2152            ..Default::default()
2153        };
2154        let lb = Arc::new(LoadBalancer::new(config));
2155        // Single endpoint so every selection contends for the same cap.
2156        lb.add_endpoint(Endpoint::new(make_node_id(1)));
2157
2158        let mut handles = Vec::new();
2159        for _ in 0..THREADS {
2160            let lb = Arc::clone(&lb);
2161            handles.push(thread::spawn(move || {
2162                // Each thread tries to select one connection and holds it.
2163                let ctx = RequestContext::new();
2164                lb.select(&ctx).ok()
2165            }));
2166        }
2167
2168        let successes = handles
2169            .into_iter()
2170            .filter_map(|h| h.join().unwrap())
2171            .count();
2172
2173        // At most CAP threads may have been granted a connection.
2174        assert!(
2175            successes <= CAP as usize,
2176            "concurrent selectors exceeded cap: {} > {}",
2177            successes,
2178            CAP
2179        );
2180        // And the endpoint's connection count must equal successes.
2181        let state = lb.endpoints.get(&make_node_id(1)).unwrap();
2182        assert_eq!(
2183            state.connections.load(Ordering::Acquire),
2184            successes as u32,
2185            "connection counter must match granted selections"
2186        );
2187    }
2188
2189    #[test]
2190    fn test_regression_circuit_breaker_half_open_single_probe() {
2191        // Regression: on recovery expiry, `is_circuit_open` fully closed
2192        // the breaker, letting every concurrent request hit a possibly
2193        // still-broken endpoint. Now exactly one probe is admitted and
2194        // subsequent callers continue to see the breaker as open until the
2195        // probe's outcome is recorded.
2196        let config = LoadBalancerConfig {
2197            strategy: Strategy::RoundRobin,
2198            circuit_recovery_time_ms: 50,
2199            ..Default::default()
2200        };
2201        let lb = LoadBalancer::new(config);
2202        lb.add_endpoint(Endpoint::new(make_node_id(1)));
2203        let ctx = RequestContext::new();
2204
2205        // Trip the breaker by driving 5 real selections that all fail. Going
2206        // through select() keeps the connection counter consistent — calling
2207        // record_completion() without a matching record_request() would
2208        // underflow.
2209        for _ in 0..5 {
2210            let sel = lb.select(&ctx).expect("admitted before trip");
2211            lb.record_completion(&sel.node_id, false);
2212        }
2213
2214        // Before recovery: all requests rejected.
2215        assert!(lb.select(&ctx).is_err(), "open breaker must reject");
2216
2217        // Wait past the recovery window.
2218        std::thread::sleep(Duration::from_millis(75));
2219
2220        // First request after recovery: admitted as the probe.
2221        let probe = lb.select(&ctx);
2222        assert!(probe.is_ok(), "first request after recovery is the probe");
2223
2224        // Second request while probe is still in flight: must be rejected.
2225        let second = lb.select(&ctx);
2226        assert!(
2227            second.is_err(),
2228            "while probe is in flight, other requests must still be rejected"
2229        );
2230
2231        // Probe reports failure → breaker re-opens and recovery timer resets.
2232        lb.record_completion(&probe.unwrap().node_id, false);
2233        assert!(
2234            lb.select(&ctx).is_err(),
2235            "failed probe must keep breaker open"
2236        );
2237
2238        // After another recovery window, the next probe succeeds and closes
2239        // the breaker.
2240        std::thread::sleep(Duration::from_millis(75));
2241        let probe2 = lb.select(&ctx).expect("second probe admitted");
2242        lb.record_completion(&probe2.node_id, true);
2243
2244        // Breaker is now fully closed — subsequent requests go through.
2245        assert!(lb.select(&ctx).is_ok(), "successful probe closes breaker");
2246    }
2247
2248    /// Regression for BUG_AUDIT_2026_04_30_CORE.md #101: pre-fix
2249    /// `is_circuit_open` was both a predicate AND CAS-claimed
2250    /// the half-open probe slot when called past the recovery
2251    /// window. `get_available_endpoints` calls it for every
2252    /// endpoint being filtered; with N circuit-open endpoints
2253    /// past their recovery window, all N got the probe slot
2254    /// claimed but only one was actually selected. The N-1
2255    /// others permanently held `half_open_probe == true` with
2256    /// no in-flight request — every subsequent
2257    /// `is_circuit_open` then returned true forever, and the
2258    /// breaker never recovered until process restart.
2259    ///
2260    /// We pin the fix by:
2261    ///   1. Building a load balancer with 3 endpoints.
2262    ///   2. Tripping each endpoint's breaker.
2263    ///   3. Waiting past the recovery window.
2264    ///   4. Calling `select` once — this triggers
2265    ///      `get_available_endpoints`, which scans all 3
2266    ///      endpoints. Only the SELECTED endpoint should claim
2267    ///      the probe slot; the other 2 must NOT.
2268    ///   5. Asserting the unselected endpoints have
2269    ///      `half_open_probe == false`.
2270    #[test]
2271    fn circuit_breaker_does_not_leak_probe_slot_on_multi_endpoint_scan() {
2272        let config = LoadBalancerConfig {
2273            strategy: Strategy::RoundRobin,
2274            circuit_recovery_time_ms: 50,
2275            ..Default::default()
2276        };
2277        let lb = LoadBalancer::new(config);
2278        for i in 1..=3 {
2279            lb.add_endpoint(Endpoint::new(make_node_id(i)));
2280        }
2281        let ctx = RequestContext::new();
2282
2283        // Trip every endpoint's breaker. Default failure
2284        // threshold is 5 consecutive failures.
2285        for _ in 0..5 {
2286            for i in 1..=3 {
2287                let nid = make_node_id(i);
2288                // Manually trip via record_completion(false). We
2289                // use a dummy connection-counter via select() to
2290                // keep the connection counter consistent; if no
2291                // endpoint is selectable, force it.
2292                if let Some(ep) = lb.endpoints.get(&nid) {
2293                    // Simulate a request lifecycle.
2294                    ep.try_record_request(u32::MAX);
2295                }
2296                lb.record_completion(&nid, false);
2297            }
2298        }
2299
2300        // All breakers should be open. select() rejects pre-recovery.
2301        assert!(
2302            lb.select(&ctx).is_err(),
2303            "all breakers open pre-recovery — select must fail"
2304        );
2305
2306        // Wait past recovery window.
2307        std::thread::sleep(Duration::from_millis(75));
2308
2309        // First select: scans all 3 endpoints. Selects ONE. The
2310        // other 2 must NOT have their probe slots claimed.
2311        let probe = lb.select(&ctx).expect("recovery elapsed → probe admitted");
2312
2313        // Audit the half_open_probe state on each endpoint:
2314        // exactly one (the selected) should be true; the other
2315        // two MUST be false. Pre-fix all three would be true.
2316        let mut claimed = 0u32;
2317        for i in 1..=3 {
2318            let nid = make_node_id(i);
2319            let ep = lb.endpoints.get(&nid).unwrap();
2320            if ep.half_open_probe.load(Ordering::Acquire) {
2321                claimed += 1;
2322                // The claimed slot must be on the selected endpoint.
2323                assert_eq!(
2324                    nid, probe.node_id,
2325                    "only the selected endpoint may have its probe slot claimed"
2326                );
2327            }
2328        }
2329        assert_eq!(
2330            claimed, 1,
2331            "exactly one endpoint should have a claimed probe slot — \
2332             pre-fix this was 3 (the filter-time scan claimed all)"
2333        );
2334
2335        // Probe success → selected endpoint's breaker closes;
2336        // the other two are still in their post-recovery state.
2337        lb.record_completion(&probe.node_id, true);
2338    }
2339
2340    /// Cubic-ai P1: with `N` selectors racing concurrently against
2341    /// a circuit-open endpoint that just exited its recovery window,
2342    /// the strict half-open contract says EXACTLY one selector
2343    /// admits the probe — every other selector that lost the
2344    /// `try_claim_half_open_probe` CAS must skip the endpoint, not
2345    /// fall through to `try_record_request` and send extra traffic
2346    /// to the (still potentially sick) endpoint alongside the real
2347    /// probe.
2348    ///
2349    /// Pre-fix (loose) semantics: losers of the probe-claim CAS
2350    /// proceeded via `try_record_request`, so under sufficient
2351    /// concurrency `successes` could be `> 1`. Post-fix (strict)
2352    /// semantics: losers `continue`, the retry's
2353    /// `get_available_endpoints` sees `half_open_probe == true`
2354    /// and filters the endpoint out, the loop exits with
2355    /// `NoEndpointsAvailable`. Net effect: at most one selector
2356    /// admits per recovery cycle.
2357    ///
2358    /// The test:
2359    ///   1. Trips a single endpoint's breaker.
2360    ///   2. Waits past the recovery window so the next selection
2361    ///      enters half-open.
2362    ///   3. Spawns `N` threads, gates them on a Barrier so they
2363    ///      enter `select()` as close to simultaneously as
2364    ///      possible, each retains its `Selection` (no
2365    ///      `record_completion`) so the probe slot stays claimed.
2366    ///   4. Asserts `successes == 1`. Pre-fix this could fire
2367    ///      `> 1` non-deterministically; post-fix it must be
2368    ///      exactly 1.
2369    #[test]
2370    fn select_strict_half_open_admits_exactly_one_probe_under_concurrent_selectors() {
2371        use std::sync::Barrier;
2372        use std::thread;
2373
2374        const N: usize = 32;
2375        let config = LoadBalancerConfig {
2376            strategy: Strategy::RoundRobin,
2377            circuit_recovery_time_ms: 50,
2378            ..Default::default()
2379        };
2380        let lb = Arc::new(LoadBalancer::new(config));
2381        lb.add_endpoint(Endpoint::new(make_node_id(1)));
2382        let ctx = RequestContext::new();
2383
2384        // Trip the breaker (5 consecutive failures).
2385        for _ in 0..5 {
2386            let sel = lb.select(&ctx).expect("admitted before trip");
2387            lb.record_completion(&sel.node_id, false);
2388        }
2389        assert!(lb.select(&ctx).is_err(), "open breaker must reject");
2390
2391        // Wait past the recovery window so the next selection
2392        // observes `half_open_probe == false` and is admitted.
2393        thread::sleep(Duration::from_millis(75));
2394
2395        // Race N threads through select(). DO NOT call
2396        // record_completion — that would clear the probe slot
2397        // and let the next thread succeed legitimately. The
2398        // strict contract is: exactly one admits while the probe
2399        // is in flight.
2400        let barrier = Arc::new(Barrier::new(N));
2401        let mut handles = Vec::with_capacity(N);
2402        for _ in 0..N {
2403            let lb = Arc::clone(&lb);
2404            let barrier = Arc::clone(&barrier);
2405            handles.push(thread::spawn(move || {
2406                let ctx = RequestContext::new();
2407                barrier.wait();
2408                lb.select(&ctx).is_ok()
2409            }));
2410        }
2411        let successes: usize = handles
2412            .into_iter()
2413            .map(|h| h.join().unwrap() as usize)
2414            .sum();
2415
2416        assert_eq!(
2417            successes, 1,
2418            "strict half-open: exactly one selector must admit while the \
2419             probe is in flight (got {successes} of {N}). Pre-fix the \
2420             loose semantic let losers of the probe-claim CAS proceed \
2421             through try_record_request, sending extra traffic to a \
2422             still-recovering endpoint."
2423        );
2424
2425        // Sanity: the probe slot is still claimed (no completion
2426        // was recorded), and the breaker is still nominally open.
2427        let ep = lb.endpoints.get(&make_node_id(1)).unwrap();
2428        assert!(
2429            ep.half_open_probe.load(Ordering::Acquire),
2430            "probe slot must remain claimed across the test (no completion was recorded)"
2431        );
2432        assert!(
2433            ep.circuit_open.load(Ordering::Acquire),
2434            "circuit must remain open until probe completion"
2435        );
2436    }
2437
2438    /// Companion to `select_strict_half_open_admits_exactly_one_probe...`:
2439    /// strict half-open semantics must NOT serialize independent
2440    /// endpoints. With two distinct circuit-open endpoints both
2441    /// past their recovery window, two concurrent selectors should
2442    /// EACH succeed — one probe per endpoint, since each endpoint's
2443    /// `half_open_probe` is its own slot. Pre-fix this also worked
2444    /// (loose semantic), but a naive "strict gate" implementation
2445    /// could accidentally over-tighten and lock out legitimate
2446    /// per-endpoint probes; this test pins that the gate stays
2447    /// scoped to the endpoint it guards.
2448    #[test]
2449    fn select_strict_half_open_allows_concurrent_probes_on_distinct_endpoints() {
2450        use std::sync::Barrier;
2451        use std::thread;
2452
2453        let config = LoadBalancerConfig {
2454            strategy: Strategy::RoundRobin,
2455            circuit_recovery_time_ms: 50,
2456            ..Default::default()
2457        };
2458        let lb = Arc::new(LoadBalancer::new(config));
2459        // Two endpoints — RR alternates between them.
2460        for i in 1..=2 {
2461            lb.add_endpoint(Endpoint::new(make_node_id(i)));
2462        }
2463        let ctx = RequestContext::new();
2464
2465        // Trip both breakers. Default threshold is 5 consecutive
2466        // failures per endpoint.
2467        for _ in 0..5 {
2468            for i in 1..=2 {
2469                let nid = make_node_id(i);
2470                if let Some(ep) = lb.endpoints.get(&nid) {
2471                    ep.try_record_request(u32::MAX);
2472                }
2473                lb.record_completion(&nid, false);
2474            }
2475        }
2476        assert!(lb.select(&ctx).is_err(), "both breakers open pre-recovery");
2477
2478        // Wait past the recovery window so both endpoints admit a
2479        // probe.
2480        thread::sleep(Duration::from_millis(75));
2481
2482        // Race two threads. With RR + 2 endpoints, each thread
2483        // should pick a different endpoint, claim its own probe,
2484        // and succeed. Pre-fix or post-fix, both succeed — but a
2485        // mis-scoped "strict gate" (e.g., a global probe flag
2486        // instead of per-endpoint) would let only one through.
2487        let barrier = Arc::new(Barrier::new(2));
2488        let mut handles = Vec::with_capacity(2);
2489        for _ in 0..2 {
2490            let lb = Arc::clone(&lb);
2491            let barrier = Arc::clone(&barrier);
2492            handles.push(thread::spawn(move || {
2493                let ctx = RequestContext::new();
2494                barrier.wait();
2495                lb.select(&ctx).ok().map(|s| s.node_id)
2496            }));
2497        }
2498        let picks: Vec<NodeId> = handles
2499            .into_iter()
2500            .filter_map(|h| h.join().unwrap())
2501            .collect();
2502
2503        assert_eq!(
2504            picks.len(),
2505            2,
2506            "both selectors must succeed against distinct endpoints \
2507             (probes are per-endpoint, not global). Got picks: {:?}",
2508            picks
2509        );
2510        assert_ne!(
2511            picks[0], picks[1],
2512            "the two probes must land on different endpoints — \
2513             same-endpoint admission would mean strict gate failed to \
2514             keep one selector out, OR RR selection raced strangely. \
2515             Picks: {:?}",
2516            picks
2517        );
2518
2519        // Both endpoints should now have their probe slots claimed.
2520        for i in 1..=2 {
2521            let ep = lb.endpoints.get(&make_node_id(i)).unwrap();
2522            assert!(
2523                ep.half_open_probe.load(Ordering::Acquire),
2524                "endpoint {} probe slot must be claimed (one probe per endpoint)",
2525                i
2526            );
2527        }
2528    }
2529
2530    /// CR-19: the `ProbeGuard` Drop must release the
2531    /// half-open probe slot when the guard is dropped without
2532    /// committing. We construct an `EndpointState`, manually
2533    /// claim the probe via the guard API, drop the guard, and
2534    /// verify the slot returned to false.
2535    #[test]
2536    fn cr19_probe_guard_drop_releases_probe_slot() {
2537        let ep = EndpointState::new(Endpoint::new(make_node_id(0xCA)));
2538        // Pre: slot is open.
2539        assert!(!ep.half_open_probe.load(Ordering::Acquire));
2540
2541        let guard = ep
2542            .try_claim_half_open_probe()
2543            .expect("first claim must succeed");
2544        // Probe slot is now claimed.
2545        assert!(ep.half_open_probe.load(Ordering::Acquire));
2546
2547        // Drop without commit: slot must roll back.
2548        drop(guard);
2549        assert!(
2550            !ep.half_open_probe.load(Ordering::Acquire),
2551            "CR-19 regression: ProbeGuard Drop must release the probe slot"
2552        );
2553
2554        // Subsequent claim succeeds — slot is reusable.
2555        let _g = ep
2556            .try_claim_half_open_probe()
2557            .expect("post-Drop reclaim must succeed");
2558    }
2559
2560    /// CR-19: `commit()` must SUPPRESS the Drop release. The
2561    /// committed claim survives the guard going out of scope —
2562    /// `record_completion` is then the path that clears it.
2563    #[test]
2564    fn cr19_probe_guard_commit_suppresses_release() {
2565        let ep = EndpointState::new(Endpoint::new(make_node_id(0xBE)));
2566        let guard = ep
2567            .try_claim_half_open_probe()
2568            .expect("first claim must succeed");
2569        guard.commit();
2570        // Slot remains claimed because commit() ran mem::forget.
2571        assert!(
2572            ep.half_open_probe.load(Ordering::Acquire),
2573            "CR-19 regression: ProbeGuard::commit must SUPPRESS Drop release"
2574        );
2575        // A second claim must fail because the slot is taken.
2576        assert!(
2577            ep.try_claim_half_open_probe().is_none(),
2578            "second claim must fail while the first is committed"
2579        );
2580    }
2581
2582    /// CR-19: panic between claim and commit MUST release the
2583    /// slot via Drop. We use `catch_unwind` to confirm the slot
2584    /// rolls back even when the path between claim and the
2585    /// would-be commit unwinds.
2586    #[test]
2587    fn cr19_panic_between_claim_and_commit_releases_probe_slot() {
2588        use std::panic::{catch_unwind, AssertUnwindSafe};
2589
2590        let ep = EndpointState::new(Endpoint::new(make_node_id(0xF0)));
2591        let result = catch_unwind(AssertUnwindSafe(|| {
2592            let _guard = ep
2593                .try_claim_half_open_probe()
2594                .expect("first claim must succeed");
2595            // Simulate a panic on the path between claim and
2596            // commit — exactly what a future-cancel or in-flight
2597            // panic looks like.
2598            panic!("simulated mid-path failure");
2599        }));
2600
2601        assert!(result.is_err(), "the closure must have panicked");
2602        assert!(
2603            !ep.half_open_probe.load(Ordering::Acquire),
2604            "CR-19 regression: panic between claim and commit MUST roll \
2605             back the probe slot via ProbeGuard::drop"
2606        );
2607    }
2608
2609    /// Pin: `select_weighted_round_robin_at` must use the
2610    /// NaN-safe guard `!(total_weight > 0.0)` rather than
2611    /// `total_weight <= 0.0`. NaN compares unequal to everything
2612    /// (including itself), so `NaN <= 0.0` is `false` — the
2613    /// gate falls through to the weighted path where
2614    /// `total_weight.ceil() as u64` produces an undefined
2615    /// (saturating) cast and the cumulative loop never exceeds
2616    /// NaN, biasing every selection to `endpoints[0]`. The
2617    /// negated-greater check catches NaN as well as zero/negative.
2618    ///
2619    /// This is a tripwire: a "simplification" PR that flips the
2620    /// guard back to `<= 0.0` would silently re-introduce the
2621    /// bias whenever any future code path produces a NaN
2622    /// effective weight (e.g. an f64 `weight` field). The pin
2623    /// is scoped to the round-robin function body — the random
2624    /// path elsewhere in this file is governed by its own
2625    /// guard and is not part of this regression.
2626    #[test]
2627    fn weighted_round_robin_guard_must_be_nan_safe() {
2628        let src = include_str!("loadbalance.rs");
2629
2630        // Locate the function header and the next `fn ` after
2631        // it; everything between is the body we pin.
2632        let header = "fn select_weighted_round_robin_at(";
2633        let start = src
2634            .find(header)
2635            .expect("select_weighted_round_robin_at must exist");
2636        // `find` from the next character so we don't match the
2637        // header itself.
2638        let body_start = start + header.len();
2639        let next_fn = src[body_start..]
2640            .find("\n    fn ")
2641            .expect("a following fn must exist (mod-private impl block)")
2642            + body_start;
2643        let body = &src[start..next_fn];
2644
2645        // Strip line comments (everything from `//` to EOL) so a
2646        // doc comment that *describes* the rejected pattern
2647        // doesn't trip the negative assertion below.
2648        let body_no_comments: String = body
2649            .lines()
2650            .map(|l| match l.find("//") {
2651                Some(idx) => &l[..idx],
2652                None => l,
2653            })
2654            .collect::<Vec<_>>()
2655            .join("\n");
2656
2657        assert!(
2658            body_no_comments.contains("!(total_weight > 0.0)"),
2659            "regression: select_weighted_round_robin_at must use the \
2660             NaN-safe guard `!(total_weight > 0.0)`. Without it a NaN \
2661             total_weight (introduced by a future f64 weight path) \
2662             falls through to the weighted code, biasing every \
2663             selection onto endpoints[0]."
2664        );
2665
2666        // Also assert the buggy form is gone from THIS function
2667        // body. The NaN-safe form does not contain `<= 0.0`, so
2668        // this should fail iff someone reverts the guard.
2669        assert!(
2670            !body_no_comments.contains("total_weight <= 0.0"),
2671            "regression: select_weighted_round_robin_at must not \
2672             use the NaN-unsafe guard `total_weight <= 0.0`."
2673        );
2674    }
2675
2676    /// CR-21: pin that this module's `random_usize`
2677    /// uses the abort-on-fail pattern, NOT `expect()` or
2678    /// `.unwrap()`. A getrandom panic here would unwind across
2679    /// any `extern "C"` FFI frame that called into the load-
2680    /// balance layer — undefined behaviour.
2681    #[test]
2682    fn cr21_random_usize_must_not_panic_on_getrandom_failure() {
2683        let needle_expect = format!("getrandom::fill({}{})", "&mut bytes).", "expect");
2684        let needle_unwrap = format!("getrandom::fill({}{})", "&mut bytes).", "unwrap");
2685
2686        let src = include_str!("loadbalance.rs");
2687        for (lineno, line) in src.lines().enumerate() {
2688            let trimmed = line.trim_start();
2689            if trimmed.starts_with("//") {
2690                continue;
2691            }
2692            assert!(
2693                !trimmed.contains(&needle_expect),
2694                "CR-21 regression: getrandom::fill(...).expect(...) reintroduced \
2695                 at loadbalance.rs:{}.\n  line: {}",
2696                lineno + 1,
2697                line
2698            );
2699            assert!(
2700                !trimmed.contains(&needle_unwrap),
2701                "CR-21 regression: getrandom::fill(...).unwrap() reintroduced \
2702                 at loadbalance.rs:{}.\n  line: {}",
2703                lineno + 1,
2704                line
2705            );
2706        }
2707    }
2708
2709    // ---------- Untested strategy coverage ----------
2710    //
2711    // Existing tests cover RoundRobin, WeightedRoundRobin,
2712    // LeastConnections, and WeightedLeastConnections. The
2713    // remaining strategies — Random, WeightedRandom,
2714    // ConsistentHash, PowerOfTwo, Adaptive — share the
2715    // hot path and a regression in any of them would silently
2716    // mis-route requests. Each test below pins the selection
2717    // reason so a future refactor that swaps strategies under
2718    // the same enum variant gets caught.
2719
2720    fn three_endpoint_lb(strategy: Strategy) -> LoadBalancer {
2721        let lb = LoadBalancer::with_strategy(strategy);
2722        lb.add_endpoint(Endpoint::new(make_node_id(1)));
2723        lb.add_endpoint(Endpoint::new(make_node_id(2)));
2724        lb.add_endpoint(Endpoint::new(make_node_id(3)));
2725        lb
2726    }
2727
2728    #[test]
2729    fn random_strategy_selects_among_all_endpoints_with_random_reason() {
2730        let lb = three_endpoint_lb(Strategy::Random);
2731        let ctx = RequestContext::new();
2732        let mut seen = std::collections::HashSet::new();
2733        for _ in 0..200 {
2734            let s = lb.select(&ctx).unwrap();
2735            assert_eq!(s.reason, SelectionReason::Random);
2736            seen.insert(s.node_id[0]);
2737        }
2738        // 200 draws across 3 endpoints. We only assert "more than
2739        // one distinct endpoint was selected" — strong enough to
2740        // catch a regression that hard-codes a single index, weak
2741        // enough to never flake on legitimate RNG outcomes. A
2742        // stricter `== 3` check would be ~1e-35 likely to fail
2743        // under a uniform RNG; this version is exactly zero.
2744        assert!(
2745            seen.len() >= 2,
2746            "Random strategy collapsed to a single endpoint over 200 draws: {seen:?}",
2747        );
2748    }
2749
2750    #[test]
2751    fn weighted_random_respects_relative_weights() {
2752        let lb = LoadBalancer::with_strategy(Strategy::WeightedRandom);
2753        lb.add_endpoint(Endpoint::new(make_node_id(1)).with_weight(10));
2754        lb.add_endpoint(Endpoint::new(make_node_id(2)).with_weight(100));
2755        let ctx = RequestContext::new();
2756        let mut counts = std::collections::HashMap::new();
2757        for _ in 0..400 {
2758            let s = lb.select(&ctx).unwrap();
2759            assert_eq!(s.reason, SelectionReason::Weighted);
2760            *counts.entry(s.node_id[0]).or_insert(0) += 1;
2761        }
2762        // Heavy weight (100) must dominate the light weight (10).
2763        // Allow a wide margin — we're not testing the RNG quality,
2764        // just that the weight is read.
2765        let light = *counts.get(&1).unwrap_or(&0);
2766        let heavy = *counts.get(&2).unwrap_or(&0);
2767        assert!(
2768            heavy > light * 3,
2769            "weighted-random ignored weights: light={light}, heavy={heavy}",
2770        );
2771    }
2772
2773    #[test]
2774    fn weighted_random_with_zero_total_weight_falls_back_to_uniform_random() {
2775        let lb = LoadBalancer::with_strategy(Strategy::WeightedRandom);
2776        // Both endpoints with weight 0 — `total_weight <= 0.0`
2777        // forces the fallback path inside select_weighted_random.
2778        lb.add_endpoint(Endpoint::new(make_node_id(1)).with_weight(0));
2779        lb.add_endpoint(Endpoint::new(make_node_id(2)).with_weight(0));
2780        let ctx = RequestContext::new();
2781
2782        // Must not panic; must return a real selection. The reason
2783        // is `Random` because the implementation delegates to
2784        // `select_random` when total_weight is non-positive.
2785        let s = lb.select(&ctx).unwrap();
2786        assert_eq!(s.reason, SelectionReason::Random);
2787    }
2788
2789    #[test]
2790    fn consistent_hash_returns_same_endpoint_for_same_routing_key() {
2791        let lb = three_endpoint_lb(Strategy::ConsistentHash);
2792        let ctx = RequestContext::new().with_routing_key("user-42");
2793
2794        let s1 = lb.select(&ctx).unwrap();
2795        for _ in 0..50 {
2796            let s = lb.select(&ctx).unwrap();
2797            assert_eq!(s.node_id, s1.node_id, "consistent-hash diverged");
2798        }
2799    }
2800
2801    #[test]
2802    fn power_of_two_returns_powerof_two_reason() {
2803        let lb = three_endpoint_lb(Strategy::PowerOfTwo);
2804        let ctx = RequestContext::new();
2805        let s = lb.select(&ctx).unwrap();
2806        assert_eq!(s.reason, SelectionReason::PowerOfTwo);
2807    }
2808
2809    #[test]
2810    fn adaptive_strategy_selects_an_endpoint() {
2811        // Adaptive picks between strategies based on average load;
2812        // with default (no metrics) all endpoints score 0 so it
2813        // takes the low-load branch. Pin that the strategy runs
2814        // and returns a valid selection.
2815        let lb = three_endpoint_lb(Strategy::Adaptive);
2816        let ctx = RequestContext::new();
2817        let s = lb.select(&ctx).unwrap();
2818        assert!(matches!(s.node_id[0], 1..=3));
2819    }
2820
2821    // ---------- endpoints() snapshot ----------
2822
2823    #[test]
2824    fn endpoints_snapshot_reflects_added_endpoints() {
2825        let lb = LoadBalancer::with_strategy(Strategy::RoundRobin);
2826        lb.add_endpoint(Endpoint::new(make_node_id(1)).with_weight(50));
2827        lb.add_endpoint(Endpoint::new(make_node_id(2)).with_weight(75));
2828
2829        let snapshot = lb.endpoints();
2830        assert_eq!(snapshot.len(), 2);
2831        // Order isn't guaranteed (DashMap iteration) — assert
2832        // by node_id membership rather than position.
2833        let weights: std::collections::HashMap<u8, u32> =
2834            snapshot.iter().map(|e| (e.node_id[0], e.weight)).collect();
2835        assert_eq!(weights.get(&1), Some(&50));
2836        assert_eq!(weights.get(&2), Some(&75));
2837    }
2838
2839    /// The endpoint snapshot (iterated by select/stats/count) must be rebuilt
2840    /// on remove: counts drop and the removed endpoint is never selected.
2841    #[test]
2842    fn removing_an_endpoint_updates_snapshot_and_stops_selection() {
2843        let lb = LoadBalancer::with_strategy(Strategy::RoundRobin);
2844        for i in 1..=3u8 {
2845            lb.add_endpoint(Endpoint::new(make_node_id(i)));
2846        }
2847        assert_eq!(lb.endpoint_count(), 3);
2848        assert_eq!(lb.stats().active_endpoints, 3);
2849
2850        lb.remove_endpoint(&make_node_id(2));
2851        assert_eq!(lb.endpoint_count(), 2, "count must drop after remove");
2852        assert_eq!(lb.stats().active_endpoints, 2);
2853        assert!(
2854            lb.endpoints().iter().all(|e| e.node_id != make_node_id(2)),
2855            "removed endpoint must be gone from the snapshot"
2856        );
2857
2858        // The removed endpoint must never be selected.
2859        let ctx = RequestContext::new();
2860        for _ in 0..50 {
2861            let sel = lb.select(&ctx).unwrap();
2862            assert_ne!(
2863                sel.node_id,
2864                make_node_id(2),
2865                "removed endpoint must not be selected"
2866            );
2867            lb.record_completion(&sel.node_id, true);
2868        }
2869    }
2870
2871    /// Concurrent membership changes must leave `endpoint_list` (read by
2872    /// select/stats/count) exactly consistent with the authoritative
2873    /// `endpoints` map. Pre-fix, a rebuild that observed the map before a
2874    /// concurrent mutation could store its stale snapshot last, dropping a
2875    /// just-added endpoint from rotation (or resurrecting a removed one).
2876    #[test]
2877    fn concurrent_membership_changes_keep_snapshot_consistent() {
2878        use std::collections::HashSet;
2879        use std::sync::Arc as StdArc;
2880
2881        let lb = StdArc::new(LoadBalancer::with_strategy(Strategy::RoundRobin));
2882        let n: u8 = 64;
2883
2884        // Concurrent adds.
2885        let mut handles = Vec::new();
2886        for i in 1..=n {
2887            let lb = StdArc::clone(&lb);
2888            handles.push(std::thread::spawn(move || {
2889                lb.add_endpoint(Endpoint::new(make_node_id(i)));
2890            }));
2891        }
2892        for h in handles {
2893            h.join().unwrap();
2894        }
2895
2896        // The snapshot must match the authoritative map exactly — not a
2897        // single add may be lost to a stale rebuild.
2898        assert_eq!(lb.endpoint_count(), n as usize);
2899        assert_eq!(lb.endpoint_count(), lb.endpoints.len());
2900        let snap: HashSet<_> = lb.endpoints().iter().map(|e| e.node_id).collect();
2901        assert_eq!(
2902            snap.len(),
2903            n as usize,
2904            "every added endpoint must appear in the snapshot"
2905        );
2906
2907        // Concurrent removes (the even ids) must stay consistent too.
2908        let mut handles = Vec::new();
2909        for i in (2..=n).step_by(2) {
2910            let lb = StdArc::clone(&lb);
2911            handles.push(std::thread::spawn(move || {
2912                lb.remove_endpoint(&make_node_id(i));
2913            }));
2914        }
2915        for h in handles {
2916            h.join().unwrap();
2917        }
2918        assert_eq!(lb.endpoint_count(), lb.endpoints.len());
2919        assert_eq!(lb.endpoint_count(), (n / 2) as usize);
2920        assert!(
2921            lb.endpoints().iter().all(|e| {
2922                let raw = e.node_id;
2923                // Odd ids only should remain.
2924                lb.endpoints.contains_key(&raw)
2925            }),
2926            "snapshot must contain only live endpoints"
2927        );
2928    }
2929
2930    /// A snapshot taken before a removal still holds the endpoint's `Arc`
2931    /// (exactly what an in-flight `select()` iterates). After removal that
2932    /// endpoint must report unavailable through the *stale* snapshot — so the
2933    /// strategy filters it out instead of selecting a gone endpoint and
2934    /// burning a reservation retry into a false `NoEndpointsAvailable`.
2935    #[test]
2936    fn removed_endpoint_is_unavailable_through_a_stale_snapshot() {
2937        let lb = LoadBalancer::with_strategy(Strategy::RoundRobin);
2938        lb.add_endpoint(Endpoint::new(make_node_id(1)));
2939        lb.add_endpoint(Endpoint::new(make_node_id(2)));
2940
2941        // Capture the snapshot BEFORE removal — holds Arcs to both endpoints.
2942        let stale = lb.endpoint_list.load_full();
2943        assert_eq!(stale.len(), 2);
2944
2945        lb.remove_endpoint(&make_node_id(1));
2946
2947        // Through the stale snapshot, the removed endpoint is now unavailable;
2948        // the survivor stays available.
2949        for state in stale.iter() {
2950            if state.node_id == make_node_id(1) {
2951                assert!(
2952                    !state.is_available(),
2953                    "removed endpoint must be unavailable via the stale snapshot"
2954                );
2955            } else {
2956                assert!(
2957                    state.is_available(),
2958                    "surviving endpoint must remain available"
2959                );
2960            }
2961        }
2962    }
2963
2964    // ---------- LoadBalancerError Display ----------
2965
2966    #[test]
2967    fn load_balancer_error_display_covers_every_variant() {
2968        let id = make_node_id(7);
2969        assert_eq!(
2970            format!("{}", LoadBalancerError::NoEndpointsAvailable),
2971            "no endpoints available"
2972        );
2973        assert_eq!(
2974            format!("{}", LoadBalancerError::AllEndpointsUnhealthy),
2975            "all endpoints unhealthy"
2976        );
2977        assert_eq!(
2978            format!("{}", LoadBalancerError::NoMatchingEndpoints),
2979            "no endpoints match required tags"
2980        );
2981        assert!(format!("{}", LoadBalancerError::EndpointNotFound(id))
2982            .starts_with("endpoint not found:"));
2983        assert!(format!("{}", LoadBalancerError::CircuitOpen(id))
2984            .starts_with("circuit breaker open for:"));
2985        assert!(format!("{}", LoadBalancerError::MaxConnectionsReached(id))
2986            .starts_with("max connections reached for:"));
2987    }
2988
2989    /// Finding #14: a half-open probe claimed at selection time but
2990    /// never completed (the production `GroupCoordinator::route_event`
2991    /// path calls `select` and never `record_completion`) must NOT pin
2992    /// the recovered endpoint out of rotation forever. The watchdog
2993    /// reclaims the abandoned slot once it has been held past the
2994    /// recovery window, so a later selection is admitted as a fresh
2995    /// probe.
2996    #[test]
2997    fn cr14_abandoned_half_open_probe_is_reclaimed_after_recovery_window() {
2998        let config = LoadBalancerConfig {
2999            strategy: Strategy::RoundRobin,
3000            circuit_recovery_time_ms: 50,
3001            ..Default::default()
3002        };
3003        let lb = LoadBalancer::new(config);
3004        lb.add_endpoint(Endpoint::new(make_node_id(1)));
3005        let ctx = RequestContext::new();
3006
3007        // Trip the breaker (5 consecutive failures).
3008        for _ in 0..5 {
3009            let sel = lb.select(&ctx).expect("admitted before trip");
3010            lb.record_completion(&sel.node_id, false);
3011        }
3012        assert!(lb.select(&ctx).is_err(), "open breaker must reject");
3013
3014        // Wait past the recovery window so the next selection admits
3015        // the probe.
3016        std::thread::sleep(Duration::from_millis(75));
3017
3018        // Claim the probe via select() but DO NOT record completion —
3019        // this models a caller that drops the selection (route_event).
3020        let probe = lb
3021            .select(&ctx)
3022            .expect("first request after recovery is the probe");
3023        let ep = lb.endpoints.get(&probe.node_id).unwrap();
3024        assert!(
3025            ep.half_open_probe.load(Ordering::Acquire),
3026            "probe slot is claimed right after selection"
3027        );
3028        drop(ep);
3029
3030        // Immediately, the slot is fresh — another selector is still
3031        // (correctly) rejected.
3032        assert!(
3033            lb.select(&ctx).is_err(),
3034            "a freshly-claimed probe still gates concurrent selectors"
3035        );
3036
3037        // Let the abandoned probe age past a full recovery window.
3038        std::thread::sleep(Duration::from_millis(75));
3039
3040        // The watchdog must reclaim the stranded slot: a later
3041        // selection is admitted again rather than rejected forever.
3042        let reclaimed = lb.select(&ctx);
3043        assert!(
3044            reclaimed.is_ok(),
3045            "an abandoned half-open probe (no record_completion) must be \
3046             reclaimed after the recovery window so the recovered endpoint \
3047             returns to rotation — pre-fix the bare bool pinned it out forever"
3048        );
3049    }
3050
3051    /// Follow-up to cr14: a `circuit_recovery_time_ms == 0` config must
3052    /// not collapse the breaker. Pre-fix, a zero recovery window made
3053    /// `open_time.elapsed() < ZERO` always false (the breaker never held
3054    /// open) and `claimed_at.elapsed() >= ZERO` always true (every probe
3055    /// judged abandoned), so a tripped breaker admitted instantly and the
3056    /// single-probe half-open gate vanished. The `>= 1ms` clamp keeps the
3057    /// breaker shut inside its (tiny) recovery window.
3058    #[test]
3059    fn cr14b_zero_recovery_time_does_not_collapse_breaker() {
3060        let config = LoadBalancerConfig {
3061            strategy: Strategy::RoundRobin,
3062            circuit_recovery_time_ms: 0,
3063            ..Default::default()
3064        };
3065        let lb = LoadBalancer::new(config);
3066        lb.add_endpoint(Endpoint::new(make_node_id(1)));
3067        let ctx = RequestContext::new();
3068
3069        // Trip the breaker (5 consecutive failures).
3070        for _ in 0..5 {
3071            let sel = lb.select(&ctx).expect("admitted before trip");
3072            lb.record_completion(&sel.node_id, false);
3073        }
3074        // Immediately after tripping — microseconds later, inside the
3075        // clamped 1ms recovery window — the breaker must still reject.
3076        // Pre-fix the 0 window admitted instantly, collapsing the gate
3077        // and letting unbounded concurrent probes hit a failing endpoint.
3078        assert!(
3079            lb.select(&ctx).is_err(),
3080            "a 0 recovery-time config must not collapse the open breaker \
3081             into instant admission"
3082        );
3083    }
3084
3085    /// Finding #15: re-adding an already-present endpoint must NOT
3086    /// leak its previous hash-ring vnodes. Pre-fix `add_endpoint`
3087    /// inserted a fresh ~`virtual_nodes` set without removing the
3088    /// node's existing vnodes, leaking ~150 ring entries per re-add.
3089    #[test]
3090    fn cr15_readd_endpoint_does_not_leak_hash_ring_vnodes() {
3091        let lb = LoadBalancer::with_strategy(Strategy::ConsistentHash);
3092        let node = make_node_id(1);
3093
3094        lb.add_endpoint(Endpoint::new(node));
3095        let after_first = lb.hash_ring.len();
3096        assert_eq!(
3097            after_first, lb.virtual_nodes as usize,
3098            "a fresh add must create exactly virtual_nodes vnodes"
3099        );
3100
3101        // Re-add the same node several times (reconnect / weight change).
3102        for _ in 0..5 {
3103            lb.add_endpoint(Endpoint::new(node).with_weight(200));
3104        }
3105
3106        assert_eq!(
3107            lb.hash_ring.len(),
3108            lb.virtual_nodes as usize,
3109            "re-adding the same node must not leak stale vnodes — the ring \
3110             size must stay at virtual_nodes, not grow by ~150 per re-add"
3111        );
3112
3113        // Every ring entry must still resolve to this node.
3114        assert!(
3115            lb.hash_ring.iter().all(|e| *e.value() == node),
3116            "all vnodes must belong to the re-added node"
3117        );
3118    }
3119
3120    /// Finding #29: the hash-ring collision probe must be
3121    /// NON-DESTRUCTIVE. Pre-fix `while insert(..).is_some()`
3122    /// overwrote an occupied slot (clobbering another node's vnode)
3123    /// before probing onward. We force a guaranteed collision by
3124    /// pre-occupying every slot `add_to_hash_ring` would target for a
3125    /// node and assert the existing occupants survive.
3126    #[test]
3127    fn cr29_hash_ring_collision_probe_preserves_existing_occupant() {
3128        let lb = LoadBalancer::with_strategy(Strategy::ConsistentHash);
3129        let victim = make_node_id(0xAA);
3130        let newcomer = make_node_id(0xBB);
3131
3132        // Pre-occupy, with `victim`, every slot that add_to_hash_ring
3133        // will hash to for `newcomer` (so EVERY vnode insert collides).
3134        for i in 0..lb.virtual_nodes {
3135            let key = format!("{:?}-{}", newcomer, i);
3136            let hash = lb.hash_key(&key);
3137            lb.hash_ring.insert(hash, victim);
3138        }
3139        let victim_slots_before = lb.hash_ring.len();
3140        assert_eq!(victim_slots_before, lb.virtual_nodes as usize);
3141
3142        // Now add the newcomer — every primary slot collides with a
3143        // victim vnode and must be linear-probed past, NOT clobbered.
3144        // Fresh add (newcomer was never present) => was_present = false.
3145        lb.add_to_hash_ring(newcomer, false);
3146
3147        // None of the victim's vnodes may have been overwritten.
3148        let victim_count = lb.hash_ring.iter().filter(|e| *e.value() == victim).count();
3149        assert_eq!(
3150            victim_count, lb.virtual_nodes as usize,
3151            "the collision probe must preserve the existing occupant's vnodes \
3152             (pre-fix destructive insert clobbered them)"
3153        );
3154        // The newcomer still gets its full vnode allotment.
3155        let newcomer_count = lb
3156            .hash_ring
3157            .iter()
3158            .filter(|e| *e.value() == newcomer)
3159            .count();
3160        assert_eq!(
3161            newcomer_count, lb.virtual_nodes as usize,
3162            "the newcomer must still get its full vnode allotment via probing"
3163        );
3164    }
3165
3166    /// Finding #33: weighted-round-robin must not starve endpoints
3167    /// when every effective weight is sub-unit. Two `Degraded`
3168    /// endpoints (weight 1 × 0.5 multiplier = 0.5 effective) sum to
3169    /// `total_weight == 1.0`; pre-fix `total_ceil = ceil(1.0) = 1`
3170    /// made `target == 0` always and the cumulative loop always
3171    /// picked the first endpoint, starving the second.
3172    #[test]
3173    fn cr33_weighted_rr_does_not_starve_sub_unit_weights() {
3174        let lb = LoadBalancer::with_strategy(Strategy::WeightedRoundRobin);
3175        lb.add_endpoint(Endpoint::new(make_node_id(1)).with_weight(1));
3176        lb.add_endpoint(Endpoint::new(make_node_id(2)).with_weight(1));
3177
3178        // Degrade both so effective weight is 0.5 each (total 1.0).
3179        lb.update_health(&make_node_id(1), HealthStatus::Degraded);
3180        lb.update_health(&make_node_id(2), HealthStatus::Degraded);
3181
3182        let ctx = RequestContext::new();
3183        let mut counts = std::collections::HashMap::new();
3184        for _ in 0..200 {
3185            let sel = lb.select(&ctx).expect("an endpoint must be selectable");
3186            lb.record_completion(&sel.node_id, true);
3187            *counts.entry(sel.node_id[0]).or_insert(0_u32) += 1;
3188        }
3189
3190        let first = *counts.get(&1).unwrap_or(&0);
3191        let second = *counts.get(&2).unwrap_or(&0);
3192        assert!(
3193            second > 0,
3194            "second sub-unit-weight endpoint must NOT be starved (got {first} \
3195             vs {second}) — pre-fix ceil collapsed the rotation to bucket 0",
3196        );
3197        // Equal effective weights → roughly even split.
3198        assert!(
3199            first > 0 && second > 0 && first.abs_diff(second) < 200 / 2,
3200            "two equal sub-unit weights should split roughly evenly \
3201             (got {first} vs {second})",
3202        );
3203    }
3204
3205    /// Review follow-up to #33: deriving the wheel from the weights
3206    /// (`ceil(total / min_weight)`) keeps EXACT integer ratios for small
3207    /// clusters — the wheel is the natural 3-cycle for a 2:1 split,
3208    /// yielding A,A,B. The replaced `WRR_MIN_WHEEL = 64` floor turned
3209    /// this into a 64-position approximation, reshaping the rotation for
3210    /// any cluster whose total weight was below the floor.
3211    #[test]
3212    fn cr33_weighted_rr_preserves_exact_integer_ratios() {
3213        let lb = LoadBalancer::with_strategy(Strategy::WeightedRoundRobin);
3214        let endpoints = vec![
3215            Arc::new(EndpointState::new(
3216                Endpoint::new(make_node_id(1)).with_weight(2),
3217            )),
3218            Arc::new(EndpointState::new(
3219                Endpoint::new(make_node_id(2)).with_weight(1),
3220            )),
3221        ];
3222        // Two full turns of the natural 3-position wheel.
3223        let picks: Vec<u8> = (0..6)
3224            .map(|c| lb.select_weighted_round_robin_at(&endpoints, c).node_id[0])
3225            .collect();
3226        assert_eq!(
3227            picks,
3228            vec![1, 1, 2, 1, 1, 2],
3229            "2:1 integer weights must cycle A,A,B exactly (got {picks:?})"
3230        );
3231    }
3232}