net/adapter/net/behavior/loadbalance.rs
1//! Phase 4G: Distributed Load Balancing (LOAD-BALANCE)
2//!
3//! This module provides distributed load balancing across the Net network:
4//! - Multiple load balancing strategies (round-robin, weighted, least-connections, etc.)
5//! - Health-aware routing with automatic failover
6//! - Load metrics collection and aggregation
7//! - Adaptive load balancing based on real-time conditions
8
9use arc_swap::ArcSwap;
10use dashmap::DashMap;
11use parking_lot::{Mutex, RwLock};
12use serde::{Deserialize, Serialize};
13use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use super::metadata::NodeId;
18
19/// Load balancing strategy
20#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
21#[serde(rename_all = "snake_case")]
22pub enum Strategy {
23 /// Round-robin selection
24 #[default]
25 RoundRobin,
26 /// Weighted round-robin based on node capacity
27 WeightedRoundRobin,
28 /// Select node with fewest active connections
29 LeastConnections,
30 /// Weighted least connections
31 WeightedLeastConnections,
32 /// Random selection
33 Random,
34 /// Weighted random selection
35 WeightedRandom,
36 /// Consistent hashing for sticky sessions
37 ConsistentHash,
38 /// Select based on lowest latency
39 LeastLatency,
40 /// Select based on lowest resource utilization
41 LeastLoad,
42 /// Power of two random choices
43 PowerOfTwo,
44 /// Adaptive strategy based on conditions
45 Adaptive,
46}
47
48/// Health status of a node
49#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
50#[serde(rename_all = "lowercase")]
51pub enum HealthStatus {
52 /// Node is healthy and accepting traffic
53 #[default]
54 Healthy,
55 /// Node is degraded but still accepting traffic
56 Degraded,
57 /// Node is unhealthy and should not receive traffic
58 Unhealthy,
59 /// Node health is unknown
60 Unknown,
61}
62
63impl HealthStatus {
64 /// Check if node can receive traffic
65 pub fn can_receive_traffic(&self) -> bool {
66 matches!(self, HealthStatus::Healthy | HealthStatus::Degraded)
67 }
68
69 /// Get weight multiplier for this health status
70 pub fn weight_multiplier(&self) -> f64 {
71 match self {
72 HealthStatus::Healthy => 1.0,
73 HealthStatus::Degraded => 0.5,
74 HealthStatus::Unhealthy => 0.0,
75 HealthStatus::Unknown => 0.25,
76 }
77 }
78}
79
80/// Load metrics for a node
81#[derive(Debug, Clone, Serialize, Deserialize)]
82pub struct LoadMetrics {
83 /// CPU utilization (0.0 - 1.0)
84 pub cpu_usage: f64,
85 /// Memory utilization (0.0 - 1.0)
86 pub memory_usage: f64,
87 /// Active connections count
88 pub active_connections: u32,
89 /// Requests per second
90 pub requests_per_second: f64,
91 /// Average response time in milliseconds
92 pub avg_response_time_ms: f64,
93 /// Error rate (0.0 - 1.0)
94 pub error_rate: f64,
95 /// Queue depth
96 pub queue_depth: u32,
97 /// Bandwidth utilization (0.0 - 1.0)
98 pub bandwidth_usage: f64,
99 /// Last update timestamp (microseconds since epoch)
100 pub updated_at: u64,
101}
102
103impl Default for LoadMetrics {
104 fn default() -> Self {
105 Self {
106 cpu_usage: 0.0,
107 memory_usage: 0.0,
108 active_connections: 0,
109 requests_per_second: 0.0,
110 avg_response_time_ms: 0.0,
111 error_rate: 0.0,
112 queue_depth: 0,
113 bandwidth_usage: 0.0,
114 updated_at: 0,
115 }
116 }
117}
118
119impl LoadMetrics {
120 /// Calculate composite load score (0.0 = no load, 1.0 = fully loaded)
121 pub fn load_score(&self) -> f64 {
122 // Weighted average of different metrics
123 let cpu_weight = 0.3;
124 let memory_weight = 0.2;
125 let connections_weight = 0.2;
126 let response_time_weight = 0.15;
127 let error_weight = 0.15;
128
129 // Normalize response time (assume 1000ms = fully loaded)
130 let normalized_response_time = (self.avg_response_time_ms / 1000.0).min(1.0);
131
132 cpu_weight * self.cpu_usage
133 + memory_weight * self.memory_usage
134 + connections_weight * (self.active_connections as f64 / 10000.0).min(1.0)
135 + response_time_weight * normalized_response_time
136 + error_weight * self.error_rate
137 }
138
139 /// Check if node is overloaded
140 pub fn is_overloaded(&self) -> bool {
141 self.cpu_usage > 0.9
142 || self.memory_usage > 0.95
143 || self.error_rate > 0.1
144 || self.queue_depth > 1000
145 }
146}
147
148/// Node endpoint information
149#[derive(Debug, Clone, Serialize, Deserialize)]
150pub struct Endpoint {
151 /// Node ID
152 pub node_id: NodeId,
153 /// Weight for weighted strategies (higher = more traffic)
154 pub weight: u32,
155 /// Health status
156 pub health: HealthStatus,
157 /// Load metrics
158 pub metrics: LoadMetrics,
159 /// Tags for filtering
160 pub tags: Vec<String>,
161 /// Priority (lower = higher priority for failover)
162 pub priority: u32,
163 /// Whether endpoint is enabled
164 pub enabled: bool,
165 /// Zone/region for locality-aware routing
166 pub zone: Option<String>,
167}
168
169impl Endpoint {
170 /// Create a new endpoint
171 pub fn new(node_id: NodeId) -> Self {
172 Self {
173 node_id,
174 weight: 100,
175 health: HealthStatus::Healthy,
176 metrics: LoadMetrics::default(),
177 tags: Vec::new(),
178 priority: 0,
179 enabled: true,
180 zone: None,
181 }
182 }
183
184 /// Set weight
185 pub fn with_weight(mut self, weight: u32) -> Self {
186 self.weight = weight;
187 self
188 }
189
190 /// Set zone
191 pub fn with_zone(mut self, zone: impl Into<String>) -> Self {
192 self.zone = Some(zone.into());
193 self
194 }
195
196 /// Add tag
197 pub fn with_tag(mut self, tag: impl Into<String>) -> Self {
198 self.tags.push(tag.into());
199 self
200 }
201
202 /// Set priority
203 pub fn with_priority(mut self, priority: u32) -> Self {
204 self.priority = priority;
205 self
206 }
207
208 /// Effective weight considering health
209 pub fn effective_weight(&self) -> f64 {
210 if !self.enabled {
211 return 0.0;
212 }
213 self.weight as f64 * self.health.weight_multiplier()
214 }
215
216 /// Check if endpoint can receive traffic
217 pub fn is_available(&self) -> bool {
218 self.enabled && self.health.can_receive_traffic()
219 }
220}
221
222/// Endpoint state tracked by the load balancer
223struct EndpointState {
224 /// Immutable endpoint config (node_id, weight, tags, zone, priority)
225 node_id: NodeId,
226 weight: u32,
227 tags: Vec<String>,
228 zone: Option<String>,
229 priority: u32,
230 /// Mutable health status
231 health: RwLock<HealthStatus>,
232 /// Mutable metrics.
233 ///
234 /// Per perf #149, switched `RwLock<LoadMetrics>` →
235 /// `ArcSwap<LoadMetrics>` so the per-event `load_score()` read
236 /// in every selection strategy is one lock-free Acquire load
237 /// instead of a parking_lot read + full struct clone. Updates
238 /// (operator-cadence) call `metrics.store(Arc::new(...))`.
239 metrics: ArcSwap<LoadMetrics>,
240 /// Whether endpoint is enabled
241 enabled: std::sync::atomic::AtomicBool,
242 /// Current connection count
243 connections: AtomicU32,
244 /// Total requests served
245 total_requests: AtomicU64,
246 /// Failed requests
247 failed_requests: AtomicU64,
248 /// Consecutive failures
249 consecutive_failures: AtomicU32,
250 /// Circuit breaker state
251 circuit_open: std::sync::atomic::AtomicBool,
252 /// Circuit open time
253 circuit_open_time: Mutex<Option<Instant>>,
254 /// Whether a half-open probe request is currently in flight. Only one
255 /// request is admitted per recovery cycle to test the endpoint.
256 half_open_probe: std::sync::atomic::AtomicBool,
257 /// Watchdog timestamp for the half-open probe claim.
258 ///
259 /// Stamped with `Instant::now()` whenever `half_open_probe` flips
260 /// `false -> true` (at selection). `select` returns a `Selection` but
261 /// has no way to bind an RAII guard to it (the dashmap `Ref` is local
262 /// and `Selection` is a `Clone` public type consumed by the FFI/SDK
263 /// bindings), so if a caller — e.g. `GroupCoordinator::route_event`,
264 /// which never calls `record_completion` — drops the selection without
265 /// recording completion, the bare bool would stay `true` forever and
266 /// `is_circuit_open` would keep the recovered endpoint out of rotation
267 /// permanently. `is_circuit_open` treats a probe held longer than the
268 /// recovery window as abandoned and reclaims the slot so a fresh probe
269 /// can be admitted (the watchdog the `ProbeGuard` doc points to for the
270 /// async-cancel hazard). `record_completion` clears it on the normal
271 /// path.
272 half_open_probe_at: Mutex<Option<Instant>>,
273 /// Set when the endpoint is removed from the balancer. The flat snapshot
274 /// shares this `Arc`, so a selector iterating a snapshot taken *before* a
275 /// concurrent removal (and before the rebuild that drops the endpoint)
276 /// sees it as unavailable immediately. Without this, that selector could
277 /// pick a gone endpoint, fail the `endpoints.get` reservation, and burn a
278 /// retry — exhausting into a transient false `NoEndpointsAvailable`.
279 removed: std::sync::atomic::AtomicBool,
280}
281
282impl EndpointState {
283 fn new(endpoint: Endpoint) -> Self {
284 Self {
285 node_id: endpoint.node_id,
286 weight: endpoint.weight,
287 tags: endpoint.tags,
288 zone: endpoint.zone,
289 priority: endpoint.priority,
290 health: RwLock::new(endpoint.health),
291 metrics: ArcSwap::new(Arc::new(endpoint.metrics)),
292 enabled: std::sync::atomic::AtomicBool::new(endpoint.enabled),
293 connections: AtomicU32::new(0),
294 total_requests: AtomicU64::new(0),
295 failed_requests: AtomicU64::new(0),
296 consecutive_failures: AtomicU32::new(0),
297 circuit_open: std::sync::atomic::AtomicBool::new(false),
298 circuit_open_time: Mutex::new(None),
299 half_open_probe: std::sync::atomic::AtomicBool::new(false),
300 half_open_probe_at: Mutex::new(None),
301 removed: std::sync::atomic::AtomicBool::new(false),
302 }
303 }
304
305 fn health(&self) -> HealthStatus {
306 *self.health.read()
307 }
308
309 /// Materialize an owned snapshot of the current metrics. Used
310 /// by [`LoadBalancer::endpoints`] which builds full `Endpoint`
311 /// structs for operator/inventory consumers — that path
312 /// genuinely needs ownership. Per-select hot paths should call
313 /// [`Self::load_score`] which avoids the clone entirely.
314 fn metrics(&self) -> LoadMetrics {
315 (**self.metrics.load()).clone()
316 }
317
318 /// Compute the composite load score from the current metrics
319 /// snapshot. Per perf #149 — pre-fix every per-event select
320 /// path called `state.load_score()` which
321 /// `RwLock::read + LoadMetrics::clone + score`. Now it's one
322 /// `ArcSwap::load + score` — no lock, no clone.
323 fn load_score(&self) -> f64 {
324 self.metrics.load().load_score()
325 }
326
327 fn is_enabled(&self) -> bool {
328 self.enabled.load(Ordering::Relaxed)
329 }
330
331 fn effective_weight(&self) -> f64 {
332 if !self.is_enabled() {
333 return 0.0;
334 }
335 self.weight as f64 * self.health().weight_multiplier()
336 }
337
338 fn is_available(&self) -> bool {
339 !self.removed.load(Ordering::Acquire)
340 && self.is_enabled()
341 && self.health().can_receive_traffic()
342 }
343
344 /// Atomically reserve a connection slot if the endpoint is below cap.
345 ///
346 /// Returns `true` if the slot was reserved (caller now owns a connection
347 /// that must be released via `record_completion`), or `false` if the cap
348 /// was already reached. This replaces the prior check-then-increment
349 /// pattern that allowed concurrent selectors to exceed the cap.
350 fn try_record_request(&self, max_connections: u32) -> bool {
351 let reserved = self
352 .connections
353 .fetch_update(Ordering::AcqRel, Ordering::Acquire, |c| {
354 if c >= max_connections {
355 None
356 } else {
357 Some(c + 1)
358 }
359 })
360 .is_ok();
361 if reserved {
362 self.total_requests.fetch_add(1, Ordering::Relaxed);
363 }
364 reserved
365 }
366
367 fn record_completion(&self, success: bool) {
368 // Saturating sub. Pre-fix `fetch_sub(1)` was unconditional;
369 // a caller hitting `record_completion` without a matching
370 // `record_request` (a substrate bug or a misuse of the
371 // public `LoadBalancer::record_completion(node_id)` API)
372 // underflowed `connections` to `u32::MAX - k`. After that,
373 // `try_record_request` always failed (`c >= max_connections`)
374 // and `get_available_endpoints` filtered the endpoint out
375 // forever - a silent, permanent removal from rotation with
376 // no log, no metric, no recovery path. The test at the
377 // bottom of this module explicitly acknowledged the hazard.
378 let _ = self
379 .connections
380 .fetch_update(Ordering::AcqRel, Ordering::Acquire, |c| {
381 Some(c.saturating_sub(1))
382 });
383
384 // If this completion is for the half-open probe, it decides the
385 // circuit's fate. Clearing the flag with swap also guarantees only
386 // one completion is treated as the probe outcome.
387 if self.half_open_probe.swap(false, Ordering::AcqRel) {
388 // Clear the watchdog stamp now that the probe is resolved
389 // normally — keeps a stale `Instant` from lingering past the
390 // next claim's stamp.
391 *self.half_open_probe_at.lock() = None;
392 if success {
393 self.circuit_open.store(false, Ordering::Release);
394 self.consecutive_failures.store(0, Ordering::Relaxed);
395 *self.circuit_open_time.lock() = None;
396 } else {
397 self.failed_requests.fetch_add(1, Ordering::Relaxed);
398 // Probe failed — restart the recovery timer so the next
399 // probe is delayed by another full recovery_time window.
400 *self.circuit_open_time.lock() = Some(Instant::now());
401 }
402 return;
403 }
404
405 if success {
406 self.consecutive_failures.store(0, Ordering::Relaxed);
407 } else {
408 self.failed_requests.fetch_add(1, Ordering::Relaxed);
409 let failures = self.consecutive_failures.fetch_add(1, Ordering::Relaxed) + 1;
410 // Open circuit after 5 consecutive failures. Use CAS so only
411 // the thread that causes the transition records the open time.
412 if failures >= 5
413 && self
414 .circuit_open
415 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
416 .is_ok()
417 {
418 *self.circuit_open_time.lock() = Some(Instant::now());
419 }
420 }
421 }
422
423 /// Returns true if new requests should be rejected for this endpoint.
424 ///
425 /// Does NOT claim the half-open probe slot — that is done lazily at
426 /// selection time via [`try_claim_half_open_probe`], so only the
427 /// endpoint actually chosen by the selector claims the probe.
428 /// Conflating "is the circuit open?" with "CAS-claim the half-open
429 /// probe slot when the recovery window has elapsed" would let
430 /// `get_available_endpoints` claim the probe slot for every endpoint
431 /// it filters: a multi-endpoint outage past its recovery window
432 /// would then have every endpoint claim the slot in the scan while
433 /// only one (or zero) was selected; the N-1 others would hold
434 /// `half_open_probe == true` with no in-flight request and no
435 /// completion path, and every subsequent `is_circuit_open` would
436 /// return true forever.
437 ///
438 /// NOT a pure predicate: it performs ONE bounded self-healing write.
439 /// When the slot is held by a probe that has been claimed for longer
440 /// than a full recovery window with no completion (genuinely
441 /// ABANDONED — see below), it reclaims the slot via
442 /// [`release_half_open_probe`] and admits. This is idempotent under
443 /// concurrent scanners and only ever fires on an abandoned slot — a
444 /// LIVE probe (claimed within the recovery window) is never touched —
445 /// so a read-only caller cannot clear an in-flight probe. The reclaim
446 /// must live here, not on the claim path: the claim only runs for the
447 /// endpoint the selector picks, but a rejected endpoint is never
448 /// picked, so its abandoned slot could only be healed during this
449 /// scan.
450 fn is_circuit_open(&self, recovery_time: Duration) -> bool {
451 if !self.circuit_open.load(Ordering::Acquire) {
452 return false;
453 }
454 let open_time = match *self.circuit_open_time.lock() {
455 Some(t) => t,
456 None => return true,
457 };
458 if open_time.elapsed() < recovery_time {
459 return true;
460 }
461 // Recovery window has elapsed — the endpoint is admitting
462 // a half-open probe. If the probe slot is already taken,
463 // another request is in flight and we keep rejecting.
464 // Otherwise we admit (the caller will CAS-claim the slot
465 // via `try_claim_half_open_probe` only on the endpoint it
466 // actually selects).
467 if !self.half_open_probe.load(Ordering::Acquire) {
468 return false;
469 }
470 // The slot is claimed. A claim that has been held longer than
471 // a full recovery window with no completion is an ABANDONED
472 // probe — a selection handed to a caller (e.g.
473 // `GroupCoordinator::route_event`) that never calls
474 // `record_completion`, or an async request future that was
475 // cancelled/panicked between claim and completion without a
476 // `ProbeGuard`. Without this watchdog the bare bool would
477 // pin the recovered endpoint out of rotation forever. Reclaim
478 // the slot so a fresh probe can be admitted on this scan.
479 let abandoned = self
480 .half_open_probe_at
481 .lock()
482 .is_some_and(|claimed_at| claimed_at.elapsed() >= recovery_time);
483 if abandoned {
484 self.release_half_open_probe();
485 // Admit: this scan/selection will re-claim the slot.
486 return false;
487 }
488 true
489 }
490
491 /// Try to claim the half-open probe slot.
492 ///
493 /// Returns an [`Option<ProbeGuard<'_>>`]; the `Some` arm
494 /// carries an RAII guard whose `Drop` releases the slot
495 /// automatically. Callers that successfully drive the request
496 /// to completion MUST invoke [`ProbeGuard::commit`] before
497 /// dispatching to the network — `record_completion` is then
498 /// the path that clears the flag. Any other exit (panic
499 /// between claim and dispatch, future cancellation, fall-
500 /// through error) drops the guard and the slot rolls back
501 /// atomically.
502 ///
503 /// This guard API is intended for ASYNC callers where the
504 /// claim → completion window is materially wide (a request
505 /// future spanning a network round-trip, where cancellation
506 /// or panic between the two is plausible). The synchronous
507 /// `select` path at this module's top uses a direct
508 /// `compare_exchange` on `half_open_probe` because its claim
509 /// → release window is a few atomic ops; the borrow checker
510 /// forbids holding a `ProbeGuard<'_>` across the dashmap
511 /// `Ref`'s `drop(state)` boundary in that loop.
512 #[allow(dead_code)]
513 fn try_claim_half_open_probe(&self) -> Option<ProbeGuard<'_>> {
514 self.half_open_probe
515 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
516 .ok()
517 .map(|_| {
518 // Stamp the watchdog so a never-committed, never-dropped
519 // claim still self-heals via `is_circuit_open`.
520 self.stamp_half_open_probe();
521 ProbeGuard { state: self }
522 })
523 }
524
525 /// Release the half-open probe slot without recording a
526 /// completion outcome. Prefer [`ProbeGuard`]'s Drop for
527 /// routine release; this method exists for paths where the
528 /// slot must be cleared via direct atomic write (e.g.
529 /// `record_completion` once the breaker fully reopens).
530 fn release_half_open_probe(&self) {
531 *self.half_open_probe_at.lock() = None;
532 self.half_open_probe.store(false, Ordering::Release);
533 }
534
535 /// Stamp the half-open probe watchdog. Called right after a
536 /// successful `false -> true` claim of `half_open_probe` so
537 /// `is_circuit_open` can reclaim an abandoned probe slot (a
538 /// selection that was returned to a caller who never recorded
539 /// completion) once it has been held past the recovery window.
540 fn stamp_half_open_probe(&self) {
541 *self.half_open_probe_at.lock() = Some(Instant::now());
542 }
543}
544
545/// RAII guard returned by
546/// [`EndpointState::try_claim_half_open_probe`]. The Drop impl
547/// clears the `half_open_probe` slot UNLESS [`Self::commit`] was
548/// called first (which `mem::forget`-equivalent the guard, so
549/// no atomic write runs).
550///
551/// Pattern:
552/// ```ignore
553/// let probe = state.try_claim_half_open_probe()?; // claim
554/// // ... checks that may early-return / panic ...
555/// if !state.try_record_request(max_conn) {
556/// return Err(...); // probe drops, slot released
557/// }
558/// probe.commit(); // success: ownership
559/// // transfers to record_completion
560/// // ... dispatch ...
561/// ```
562///
563/// Tracking the success vs failure path with a `bool` plus a
564/// manual `release_half_open_probe` at every fall-through is
565/// easy to miss on a future-cancel where neither `Ok` nor `Err`
566/// runs to completion.
567#[allow(dead_code)]
568pub(super) struct ProbeGuard<'a> {
569 state: &'a EndpointState,
570}
571
572impl<'a> ProbeGuard<'a> {
573 /// Forget the guard so its Drop does NOT release the slot.
574 /// Call this only on the success path AFTER the matching
575 /// `try_record_request` succeeded — `record_completion` is
576 /// then the path that clears the flag.
577 #[allow(dead_code)]
578 fn commit(self) {
579 std::mem::forget(self);
580 }
581}
582
583impl<'a> Drop for ProbeGuard<'a> {
584 fn drop(&mut self) {
585 // Roll back the claim. Idempotent at the atomic level
586 // (`store(false)` always lands false), but the structural
587 // invariant is that this Drop only runs on the
588 // non-commit path — `mem::forget` (via `commit`) prevents
589 // it on the success path.
590 self.state.release_half_open_probe();
591 }
592}
593
594/// Request context for load balancing decisions
595#[derive(Debug, Clone, Default)]
596pub struct RequestContext {
597 /// Request ID for consistent hashing
598 pub request_id: Option<String>,
599 /// Session ID for sticky sessions
600 pub session_id: Option<String>,
601 /// Client zone for locality routing
602 pub client_zone: Option<String>,
603 /// Required tags
604 pub required_tags: Vec<String>,
605 /// Preferred zones (in order of preference)
606 pub preferred_zones: Vec<String>,
607 /// Custom routing key
608 pub routing_key: Option<String>,
609}
610
611impl RequestContext {
612 /// Create new request context
613 pub fn new() -> Self {
614 Self::default()
615 }
616
617 /// Set session ID for sticky sessions
618 pub fn with_session(mut self, session_id: impl Into<String>) -> Self {
619 self.session_id = Some(session_id.into());
620 self
621 }
622
623 /// Set routing key for consistent hashing
624 pub fn with_routing_key(mut self, key: impl Into<String>) -> Self {
625 self.routing_key = Some(key.into());
626 self
627 }
628
629 /// Set client zone
630 pub fn with_zone(mut self, zone: impl Into<String>) -> Self {
631 self.client_zone = Some(zone.into());
632 self
633 }
634
635 /// Add required tag
636 pub fn require_tag(mut self, tag: impl Into<String>) -> Self {
637 self.required_tags.push(tag.into());
638 self
639 }
640}
641
642/// Selection result
643#[derive(Debug, Clone)]
644pub struct Selection {
645 /// Selected node ID
646 pub node_id: NodeId,
647 /// Endpoint weight
648 pub weight: u32,
649 /// Current load score
650 pub load_score: f64,
651 /// Why this node was selected
652 pub reason: SelectionReason,
653}
654
655/// Reason for selection
656#[derive(Debug, Clone, Copy, PartialEq, Eq)]
657pub enum SelectionReason {
658 /// Selected by round-robin
659 RoundRobin,
660 /// Selected by weight
661 Weighted,
662 /// Selected for having least connections
663 LeastConnections,
664 /// Selected by consistent hash
665 ConsistentHash,
666 /// Selected for lowest latency
667 LeastLatency,
668 /// Selected for lowest load
669 LeastLoad,
670 /// Selected randomly
671 Random,
672 /// Selected by power of two choices
673 PowerOfTwo,
674 /// Selected for zone affinity
675 ZoneAffinity,
676 /// Fallback selection
677 Fallback,
678}
679
680/// Load balancer configuration
681#[derive(Debug, Clone, Serialize, Deserialize)]
682pub struct LoadBalancerConfig {
683 /// Load balancing strategy
684 pub strategy: Strategy,
685 /// Health check interval
686 pub health_check_interval_ms: u64,
687 /// Circuit breaker recovery time
688 pub circuit_recovery_time_ms: u64,
689 /// Maximum connections per endpoint
690 pub max_connections_per_endpoint: u32,
691 /// Enable zone-aware routing
692 pub zone_aware: bool,
693 /// Fallback to any available if preferred zone unavailable
694 pub zone_fallback: bool,
695 /// Metrics staleness threshold
696 pub metrics_stale_after_ms: u64,
697}
698
699impl Default for LoadBalancerConfig {
700 fn default() -> Self {
701 Self {
702 strategy: Strategy::RoundRobin,
703 health_check_interval_ms: 5000,
704 circuit_recovery_time_ms: 30000,
705 max_connections_per_endpoint: 10000,
706 zone_aware: true,
707 zone_fallback: true,
708 metrics_stale_after_ms: 10000,
709 }
710 }
711}
712
713/// Load balancer error
714#[derive(Debug, Clone, PartialEq, Eq)]
715pub enum LoadBalancerError {
716 /// No endpoints available
717 NoEndpointsAvailable,
718 /// Endpoint not found
719 EndpointNotFound(NodeId),
720 /// All endpoints unhealthy
721 AllEndpointsUnhealthy,
722 /// No endpoints match required tags
723 NoMatchingEndpoints,
724 /// Circuit breaker open
725 CircuitOpen(NodeId),
726 /// Max connections reached
727 MaxConnectionsReached(NodeId),
728}
729
730impl std::fmt::Display for LoadBalancerError {
731 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
732 match self {
733 Self::NoEndpointsAvailable => write!(f, "no endpoints available"),
734 Self::EndpointNotFound(id) => write!(f, "endpoint not found: {:?}", id),
735 Self::AllEndpointsUnhealthy => write!(f, "all endpoints unhealthy"),
736 Self::NoMatchingEndpoints => write!(f, "no endpoints match required tags"),
737 Self::CircuitOpen(id) => write!(f, "circuit breaker open for: {:?}", id),
738 Self::MaxConnectionsReached(id) => write!(f, "max connections reached for: {:?}", id),
739 }
740 }
741}
742
743impl std::error::Error for LoadBalancerError {}
744
745/// Statistics for the load balancer
746#[derive(Debug, Clone, Default)]
747pub struct LoadBalancerStats {
748 /// Total selections made
749 pub total_selections: u64,
750 /// Failed selections
751 pub failed_selections: u64,
752 /// Active endpoints
753 pub active_endpoints: u32,
754 /// Healthy endpoints
755 pub healthy_endpoints: u32,
756 /// Total active connections
757 pub total_connections: u64,
758 /// Average load score across endpoints
759 pub avg_load_score: f64,
760}
761
762/// Shard count for the consistent-hash ring.
763///
764/// `DashMap::new()` defaults to `4 × num_cpus` shards (128 on a 32-thread
765/// host). The ring holds `virtual_nodes × endpoints` entries and
766/// `select_consistent_hash` walks it, so the default over-sharding added a
767/// ~128-shard-lock fixed cost to every consistent-hash selection (measured
768/// ~19% of it). A small fixed count keeps the walk cheap while leaving room for
769/// concurrent ring inserts on add/remove.
770///
771/// (The `endpoints` map keeps the default on purpose: `select`/`stats` read the
772/// `endpoint_list` snapshot, not `endpoints.iter()`, so its shard count only
773/// affects concurrent point lookups — where more shards is better.)
774const HASH_RING_SHARDS: usize = 8;
775
776/// Distributed load balancer
777pub struct LoadBalancer {
778 /// Configuration
779 config: LoadBalancerConfig,
780 /// Endpoints by node ID — authoritative store for point lookups
781 /// (get / reservation / health updates).
782 endpoints: DashMap<NodeId, Arc<EndpointState>>,
783 /// Flat snapshot of the same endpoints, rebuilt only when the SET changes
784 /// (add/remove). `select()`/`stats()` iterate this instead of
785 /// `DashMap::iter()`, which walks every shard (4*num_cpus, e.g. 128) even
786 /// for a handful of endpoints — the fixed cost that dominated select().
787 /// The Arcs are shared with `endpoints`, so live per-endpoint atomic state
788 /// (health, connections, circuit) is read correctly through the snapshot.
789 endpoint_list: ArcSwap<Vec<Arc<EndpointState>>>,
790 /// Serializes endpoint SET changes (add/remove) so the `endpoints`
791 /// mutation, hash-ring update, and `endpoint_list` rebuild commit as one
792 /// unit. Without it, two concurrent membership changes can interleave such
793 /// that a rebuild observing the map *before* another thread's mutation
794 /// stores its stale snapshot last — dropping a just-added endpoint (or
795 /// resurrecting a removed one) from `endpoint_list` until the next change.
796 /// Not taken on the hot path (`select`/`stats` only read the snapshot).
797 membership_lock: Mutex<()>,
798 /// Round-robin counter
799 rr_counter: AtomicU64,
800 /// Total selections
801 total_selections: AtomicU64,
802 /// Failed selections
803 failed_selections: AtomicU64,
804 /// Consistent hash ring (node_id -> virtual nodes)
805 hash_ring: DashMap<u64, NodeId>,
806 /// Virtual nodes per endpoint for consistent hashing
807 virtual_nodes: u32,
808}
809
810impl LoadBalancer {
811 /// Create a new load balancer
812 pub fn new(config: LoadBalancerConfig) -> Self {
813 Self {
814 config,
815 endpoints: DashMap::new(),
816 endpoint_list: ArcSwap::from_pointee(Vec::new()),
817 membership_lock: Mutex::new(()),
818 rr_counter: AtomicU64::new(0),
819 total_selections: AtomicU64::new(0),
820 failed_selections: AtomicU64::new(0),
821 hash_ring: DashMap::with_shard_amount(HASH_RING_SHARDS),
822 virtual_nodes: 150,
823 }
824 }
825
826 /// Create with default configuration
827 pub fn with_strategy(strategy: Strategy) -> Self {
828 Self::new(LoadBalancerConfig {
829 strategy,
830 ..Default::default()
831 })
832 }
833
834 /// Add an endpoint
835 pub fn add_endpoint(&self, endpoint: Endpoint) {
836 let node_id = endpoint.node_id;
837 // Hold `membership_lock` across the mutation + rebuild so a concurrent
838 // add/remove can't store a stale snapshot over ours (see field doc).
839 let _guard = self.membership_lock.lock();
840 let was_present = self
841 .endpoints
842 .insert(node_id, Arc::new(EndpointState::new(endpoint)))
843 .is_some();
844
845 // Add (or idempotently re-add) this node's vnodes. `add_to_hash_ring`
846 // overwrites the node's own prior vnodes in place and sweeps any
847 // stale leftovers, so a re-add (reconnect / weight change) neither
848 // leaks vnodes nor leaves a transient window where the node is
849 // absent from the ring (which would misroute traffic for an
850 // endpoint that was meant to stay available). `was_present`
851 // (endpoint absent => no prior vnodes, under `membership_lock`)
852 // lets a fresh add skip the full-ring stale-vnode sweep.
853 self.add_to_hash_ring(node_id, was_present);
854 self.rebuild_endpoint_list();
855 }
856
857 /// Remove an endpoint
858 pub fn remove_endpoint(&self, node_id: &NodeId) {
859 let _guard = self.membership_lock.lock();
860 self.remove_from_hash_ring(node_id);
861 if let Some((_, state)) = self.endpoints.remove(node_id) {
862 // Flag the shared EndpointState so an in-flight selector reading a
863 // pre-rebuild snapshot treats it as unavailable (see field doc).
864 state.removed.store(true, Ordering::Release);
865 }
866 self.rebuild_endpoint_list();
867 }
868
869 /// Rebuild the flat endpoint snapshot iterated by `select`/`stats`.
870 /// Called only when the endpoint SET changes (add/remove) — per-endpoint
871 /// state updates mutate shared atomics visible through the existing Arcs,
872 /// so they need no rebuild. This is the only place that pays the
873 /// `DashMap::iter()` shard walk; the hot path reads the snapshot.
874 fn rebuild_endpoint_list(&self) {
875 let list: Vec<Arc<EndpointState>> = self
876 .endpoints
877 .iter()
878 .map(|e| Arc::clone(e.value()))
879 .collect();
880 self.endpoint_list.store(Arc::new(list));
881 }
882
883 /// Update endpoint health
884 pub fn update_health(&self, node_id: &NodeId, health: HealthStatus) {
885 if let Some(state) = self.endpoints.get(node_id) {
886 *state.health.write() = health;
887 }
888 }
889
890 /// Update endpoint metrics
891 pub fn update_metrics(&self, node_id: &NodeId, metrics: LoadMetrics) {
892 if let Some(state) = self.endpoints.get(node_id) {
893 state.metrics.store(Arc::new(metrics));
894 }
895 }
896
897 /// Select an endpoint for a request.
898 ///
899 /// The connection slot is reserved atomically as part of selection so
900 /// that concurrent selectors cannot collectively exceed
901 /// `max_connections_per_endpoint`. If a strategy picks an endpoint whose
902 /// cap was filled by a concurrent selector between availability filtering
903 /// and reservation, the selection is retried up to a bounded number of
904 /// times before giving up.
905 pub fn select(&self, ctx: &RequestContext) -> Result<Selection, LoadBalancerError> {
906 self.total_selections.fetch_add(1, Ordering::Relaxed);
907
908 const MAX_RESERVATION_RETRIES: usize = 4;
909 let max_conn = self.config.max_connections_per_endpoint;
910
911 // Round-robin strategies advance `rr_counter` inside their
912 // selection function. The retry loop below could call them up
913 // to 4 times per logical `select()`, which inflated the
914 // rotation counter proportionally and distorted the observed
915 // RR sequence — weighted-RR distribution tests indirectly
916 // assumed 1:1. We pre-compute the RR offset once for this
917 // whole logical selection and step deterministically across
918 // retries via `(rr_offset + attempt)`, so the counter
919 // advances exactly once per `select()` regardless of how many
920 // reservation retries occur.
921 let rr_offset_for_this_select = self.rr_counter.fetch_add(1, Ordering::Relaxed) as usize;
922
923 for attempt in 0..MAX_RESERVATION_RETRIES {
924 let available = self.get_available_endpoints(ctx)?;
925
926 if available.is_empty() {
927 self.failed_selections.fetch_add(1, Ordering::Relaxed);
928 return Err(LoadBalancerError::NoEndpointsAvailable);
929 }
930
931 // Apply strategy. Round-robin variants take a
932 // pre-computed offset; non-RR strategies are
933 // unaffected by retries (their selection is content-
934 // or metric-based).
935 let selection = match self.config.strategy {
936 Strategy::RoundRobin => self.select_round_robin_at(
937 &available,
938 rr_offset_for_this_select.wrapping_add(attempt),
939 ),
940 Strategy::WeightedRoundRobin => self.select_weighted_round_robin_at(
941 &available,
942 rr_offset_for_this_select.wrapping_add(attempt) as u64,
943 ),
944 Strategy::LeastConnections => self.select_least_connections(&available),
945 Strategy::WeightedLeastConnections => {
946 self.select_weighted_least_connections(&available)
947 }
948 Strategy::Random => self.select_random(&available),
949 Strategy::WeightedRandom => self.select_weighted_random(&available),
950 Strategy::ConsistentHash => self.select_consistent_hash(&available, ctx),
951 Strategy::LeastLatency => self.select_least_latency(&available),
952 Strategy::LeastLoad => self.select_least_load(&available),
953 Strategy::PowerOfTwo => self.select_power_of_two(&available),
954 Strategy::Adaptive => self.select_adaptive(&available, ctx),
955 };
956
957 // Atomically reserve the connection slot. If a concurrent
958 // selector filled the cap, re-run selection against fresh state.
959 if let Some(state) = self.endpoints.get(&selection.node_id) {
960 // Claim the half-open probe slot ONLY on the
961 // endpoint we actually selected, AFTER the
962 // pure-predicate `is_circuit_open` check has
963 // already admitted the endpoint into `available`.
964 // Claiming during the filter pass would leak slots
965 // on multi-endpoint outages.
966 //
967 // When `circuit_open == true`, the half-open probe
968 // claim is the HARD GATE — losers of the CAS race
969 // must NOT proceed through `try_record_request`.
970 // Without strict half-open semantics, a concurrent
971 // selector that observed `half_open_probe == false`
972 // at filter time but lost the claim CAS could still
973 // ride the connection-cap path through and send
974 // real traffic to a recovering endpoint alongside
975 // the actual probe. Only the thread that wins the
976 // probe-slot CAS may test the endpoint; everyone
977 // else skips and retries selection. With the slot
978 // now claimed (by whoever won), the next
979 // iteration's `get_available_endpoints` sees
980 // `half_open_probe == true` and filters this
981 // endpoint out — losers naturally pick a different
982 // endpoint or surface `NoEndpointsAvailable` if
983 // this was the only option.
984 let circuit_open = state.circuit_open.load(Ordering::Acquire);
985 // The `ProbeGuard` RAII type is the preferred API
986 // for future async callers (where the request
987 // future may panic / cancel between claim and
988 // `record_completion`, leaking the slot without a
989 // guard). At THIS synchronous selection callsite,
990 // the guard's lifetime is tied to the dashmap
991 // `Ref` we hold via `state`; carrying it across
992 // the `drop(state); continue;` path the
993 // lost-race branch needs is forbidden by the
994 // borrow checker. Since this loop is fully
995 // synchronous (a few atomic ops between claim
996 // and either `Ok(selection)` or
997 // `release_half_open_probe`), the bool +
998 // explicit-release pattern is panic-free in
999 // practice — the only ops between claim and
1000 // release are atomic loads / stores that don't
1001 // unwind. We use a direct CAS here rather than
1002 // `try_claim_half_open_probe` so we don't have to
1003 // immediately drop the guard returned by it.
1004 let claimed_probe = if circuit_open {
1005 let claim_ok = state
1006 .half_open_probe
1007 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
1008 .is_ok();
1009 if !claim_ok {
1010 // Lost the half-open probe race. Drop the
1011 // ref guard so the retry's `endpoints.get`
1012 // doesn't deadlock, and continue to the
1013 // next attempt.
1014 drop(state);
1015 continue;
1016 }
1017 // Stamp the watchdog so this claim self-heals even
1018 // if the returned `Selection` is dropped without a
1019 // matching `record_completion` (e.g.
1020 // `GroupCoordinator::route_event`). `is_circuit_open`
1021 // reclaims the slot once it has been held past the
1022 // recovery window. See `half_open_probe_at`.
1023 state.stamp_half_open_probe();
1024 true
1025 } else {
1026 false
1027 };
1028 if state.try_record_request(max_conn) {
1029 return Ok(selection);
1030 }
1031 // try_record_request failed — release any probe
1032 // slot we just claimed so it doesn't strand.
1033 if claimed_probe {
1034 state.release_half_open_probe();
1035 }
1036 }
1037 }
1038
1039 self.failed_selections.fetch_add(1, Ordering::Relaxed);
1040 Err(LoadBalancerError::NoEndpointsAvailable)
1041 }
1042
1043 /// Record request completion
1044 pub fn record_completion(&self, node_id: &NodeId, success: bool) {
1045 if let Some(state) = self.endpoints.get(node_id) {
1046 state.record_completion(success);
1047 }
1048 }
1049
1050 /// Get available endpoints matching context
1051 fn get_available_endpoints(
1052 &self,
1053 ctx: &RequestContext,
1054 ) -> Result<Vec<Arc<EndpointState>>, LoadBalancerError> {
1055 // Clamp to >= 1ms. A 0 recovery window makes the abandoned-probe
1056 // watchdog in `is_circuit_open` fire on every freshly-claimed
1057 // probe (`claimed_at.elapsed() >= ZERO` is always true), which
1058 // collapses the single-probe half-open gate and admits unbounded
1059 // concurrent probes to a still-failing endpoint. `0` is a
1060 // reachable, unvalidated config value, so guard it at the source.
1061 let recovery_time = Duration::from_millis(self.config.circuit_recovery_time_ms.max(1));
1062 let mut available = Vec::new();
1063 let mut zone_matches = Vec::new();
1064
1065 // Iterate the flat snapshot rather than DashMap::iter (shard walk).
1066 let snapshot = self.endpoint_list.load();
1067 for state in snapshot.iter() {
1068 // Check basic availability
1069 if !state.is_available() {
1070 continue;
1071 }
1072
1073 // Check circuit breaker
1074 if state.is_circuit_open(recovery_time) {
1075 continue;
1076 }
1077
1078 // Check max connections
1079 if state.connections.load(Ordering::Relaxed) >= self.config.max_connections_per_endpoint
1080 {
1081 continue;
1082 }
1083
1084 // Check required tags
1085 if !ctx.required_tags.is_empty()
1086 && !ctx.required_tags.iter().all(|t| state.tags.contains(t))
1087 {
1088 continue;
1089 }
1090
1091 // Zone-aware routing
1092 if self.config.zone_aware {
1093 if let Some(ref client_zone) = ctx.client_zone {
1094 if state.zone.as_ref() == Some(client_zone) {
1095 zone_matches.push(Arc::clone(state));
1096 continue;
1097 }
1098 }
1099 }
1100
1101 available.push(Arc::clone(state));
1102 }
1103
1104 // Prefer zone matches if available
1105 if !zone_matches.is_empty() {
1106 return Ok(zone_matches);
1107 }
1108
1109 // No zone matches — check zone_fallback policy
1110 if self.config.zone_aware && ctx.client_zone.is_some() && !self.config.zone_fallback {
1111 // zone_fallback is disabled: don't fall back to non-zone endpoints
1112 return Err(LoadBalancerError::NoEndpointsAvailable);
1113 }
1114
1115 if available.is_empty() {
1116 return Err(LoadBalancerError::NoEndpointsAvailable);
1117 }
1118
1119 Ok(available)
1120 }
1121
1122 fn select_round_robin(&self, endpoints: &[Arc<EndpointState>]) -> Selection {
1123 let offset = self.rr_counter.fetch_add(1, Ordering::Relaxed) as usize;
1124 self.select_round_robin_at(endpoints, offset)
1125 }
1126
1127 /// Offset-based variant used by the retry loop in `select()` so
1128 /// a logical select advances the `rr_counter` exactly once across
1129 /// all reservation retries.
1130 fn select_round_robin_at(&self, endpoints: &[Arc<EndpointState>], offset: usize) -> Selection {
1131 let idx = offset % endpoints.len();
1132 let state = &endpoints[idx];
1133 Selection {
1134 node_id: state.node_id,
1135 weight: state.weight,
1136 load_score: state.load_score(),
1137 reason: SelectionReason::RoundRobin,
1138 }
1139 }
1140
1141 fn select_weighted_round_robin(&self, endpoints: &[Arc<EndpointState>]) -> Selection {
1142 let counter = self.rr_counter.fetch_add(1, Ordering::Relaxed);
1143 self.select_weighted_round_robin_at(endpoints, counter)
1144 }
1145
1146 /// Offset-based variant used by `select()` across reservation
1147 /// retries so the `rr_counter` advances exactly once per logical
1148 /// select.
1149 fn select_weighted_round_robin_at(
1150 &self,
1151 endpoints: &[Arc<EndpointState>],
1152 counter: u64,
1153 ) -> Selection {
1154 let total_weight: f64 = endpoints.iter().map(|e| e.effective_weight()).sum();
1155
1156 // Use `!(total_weight > 0.0)` rather than `total_weight <= 0.0`:
1157 // NaN compares unequal to everything (including itself), so
1158 // `NaN <= 0.0` is `false` — the gate would fall through to
1159 // the weighted path below where `total_weight.ceil() as u64`
1160 // is undefined for NaN, and the cumulative loop never
1161 // exceeds NaN (the `>` comparison is also false), causing
1162 // the function to fall through to the fallback-first path
1163 // and silently bias every selection to `endpoints[0]`. The
1164 // negated-greater check catches NaN as well as ≤ 0.0.
1165 // Clippy flags the negated comparison; the lint is wrong
1166 // for our NaN-safety intent, so suppress it locally.
1167 #[allow(clippy::neg_cmp_op_on_partial_ord)]
1168 if !(total_weight > 0.0) {
1169 return self.select_round_robin_at(endpoints, counter as usize);
1170 }
1171
1172 // Pick a wheel position by reducing the counter modulo an
1173 // integer `wheel` size BEFORE casting to f64, then mapping that
1174 // position proportionally into `[0, total_weight)`. Doing the
1175 // modulus in integer space first preserves the precision fix —
1176 // `counter as f64 % total_weight` lost the low bits of `counter`
1177 // past the f64 mantissa boundary (2^53 selections), stalling
1178 // rotation on a narrow set of indices.
1179 //
1180 // The wheel size must NOT collapse to the integer ceiling of a
1181 // sub-unit total: `total_weight.ceil() as u64` mapped
1182 // `total_weight == 1.0` (e.g. two endpoints each at effective
1183 // weight 0.5, both `Degraded`) to `1`, so `counter % 1 == 0`
1184 // always and the cumulative loop (`0.5 > 0`) selected the first
1185 // endpoint forever — the second starved.
1186 //
1187 // Derive the wheel from the weights themselves rather than an
1188 // arbitrary floor: `ceil(total / smallest positive weight)`
1189 // gives every endpoint at least one wheel position and yields
1190 // EXACT ratios for commensurate weights. Integer weights keep
1191 // their natural cycle (1,1,1 → wheel 3; 100,50 → wheel 3, i.e.
1192 // 2:1) — byte-for-byte the old integer behavior, NOT reshaped by
1193 // a fixed constant — while fractional/sub-unit shares resolve
1194 // (0.5,0.5 → 2; 1.0,0.5 → 3). The modulus is taken in integer
1195 // space (`counter % wheel`) BEFORE the f64 cast, preserving the
1196 // precision fix (`counter as f64 % total` lost the low bits of
1197 // `counter` past the 2^53 mantissa boundary, stalling rotation).
1198 let min_weight = endpoints
1199 .iter()
1200 .map(|e| e.effective_weight())
1201 .filter(|w| *w > 0.0)
1202 .fold(f64::INFINITY, f64::min);
1203 let wheel = ((total_weight / min_weight).ceil() as u64).max(1);
1204 // Map the integer wheel position into the real weight domain.
1205 let target = (counter % wheel) as f64 / wheel as f64 * total_weight;
1206
1207 let mut cumulative = 0.0;
1208 for state in endpoints {
1209 cumulative += state.effective_weight();
1210 if cumulative > target {
1211 return Selection {
1212 node_id: state.node_id,
1213 weight: state.weight,
1214 load_score: state.load_score(),
1215 reason: SelectionReason::Weighted,
1216 };
1217 }
1218 }
1219
1220 // Fallback to first
1221 let state = &endpoints[0];
1222 Selection {
1223 node_id: state.node_id,
1224 weight: state.weight,
1225 load_score: state.load_score(),
1226 reason: SelectionReason::Weighted,
1227 }
1228 }
1229
1230 #[expect(
1231 clippy::unwrap_used,
1232 reason = "caller (LoadBalancer::select) returns early on empty endpoints; min_by_key on a non-empty iter is infallible"
1233 )]
1234 fn select_least_connections(&self, endpoints: &[Arc<EndpointState>]) -> Selection {
1235 let state = endpoints
1236 .iter()
1237 .min_by_key(|e| e.connections.load(Ordering::Relaxed))
1238 .unwrap();
1239
1240 Selection {
1241 node_id: state.node_id,
1242 weight: state.weight,
1243 load_score: state.load_score(),
1244 reason: SelectionReason::LeastConnections,
1245 }
1246 }
1247
1248 #[expect(
1249 clippy::unwrap_used,
1250 reason = "caller (LoadBalancer::select) returns early on empty endpoints; min_by on a non-empty iter is infallible"
1251 )]
1252 fn select_weighted_least_connections(&self, endpoints: &[Arc<EndpointState>]) -> Selection {
1253 // Score = connections / weight (lower is better).
1254 // The `.max(MIN_DIVISOR)` guard is a divide-by-zero protector
1255 // for zero-weighted endpoints. It uses a small positive
1256 // epsilon instead of `1.0` so that fractional weights like
1257 // `0.1` and `0.5` keep their relative ordering — the old
1258 // `.max(1.0)` silently collapsed any weight in `(0, 1]` onto
1259 // `1.0`, degrading weighted-LC into plain least-connections
1260 // whenever operators configured sub-unit weights.
1261 const MIN_DIVISOR: f64 = 1e-6;
1262 let state = endpoints
1263 .iter()
1264 .min_by(|a, b| {
1265 let score_a = a.connections.load(Ordering::Relaxed) as f64
1266 / a.effective_weight().max(MIN_DIVISOR);
1267 let score_b = b.connections.load(Ordering::Relaxed) as f64
1268 / b.effective_weight().max(MIN_DIVISOR);
1269 score_a.total_cmp(&score_b)
1270 })
1271 .unwrap();
1272
1273 Selection {
1274 node_id: state.node_id,
1275 weight: state.weight,
1276 load_score: state.load_score(),
1277 reason: SelectionReason::LeastConnections,
1278 }
1279 }
1280
1281 fn select_random(&self, endpoints: &[Arc<EndpointState>]) -> Selection {
1282 let idx = random_usize() % endpoints.len();
1283 let state = &endpoints[idx];
1284 Selection {
1285 node_id: state.node_id,
1286 weight: state.weight,
1287 load_score: state.load_score(),
1288 reason: SelectionReason::Random,
1289 }
1290 }
1291
1292 fn select_weighted_random(&self, endpoints: &[Arc<EndpointState>]) -> Selection {
1293 let total_weight: f64 = endpoints.iter().map(|e| e.effective_weight()).sum();
1294
1295 if total_weight <= 0.0 {
1296 return self.select_random(endpoints);
1297 }
1298
1299 let target = random_f64() * total_weight;
1300
1301 let mut cumulative = 0.0;
1302 for state in endpoints {
1303 cumulative += state.effective_weight();
1304 if cumulative >= target {
1305 return Selection {
1306 node_id: state.node_id,
1307 weight: state.weight,
1308 load_score: state.load_score(),
1309 reason: SelectionReason::Weighted,
1310 };
1311 }
1312 }
1313
1314 // Fallback
1315 let state = &endpoints[0];
1316 Selection {
1317 node_id: state.node_id,
1318 weight: state.weight,
1319 load_score: state.load_score(),
1320 reason: SelectionReason::Weighted,
1321 }
1322 }
1323
1324 fn select_consistent_hash(
1325 &self,
1326 endpoints: &[Arc<EndpointState>],
1327 ctx: &RequestContext,
1328 ) -> Selection {
1329 let key = ctx
1330 .routing_key
1331 .as_ref()
1332 .or(ctx.session_id.as_ref())
1333 .or(ctx.request_id.as_ref());
1334
1335 if let Some(key) = key {
1336 let hash = self.hash_key(key);
1337
1338 // Collect and sort hash ring entries — DashMap iteration order is
1339 // arbitrary, but consistent hashing requires finding the smallest
1340 // key >= hash.
1341 let mut ring: Vec<(u64, NodeId)> = self
1342 .hash_ring
1343 .iter()
1344 .map(|entry| (*entry.key(), *entry.value()))
1345 .collect();
1346 ring.sort_unstable_by_key(|&(k, _)| k);
1347
1348 // Binary search for the first key >= hash
1349 let idx = ring.partition_point(|&(k, _)| k < hash);
1350
1351 // Try from the found position, wrapping around
1352 for i in 0..ring.len() {
1353 let (_, node_id) = ring[(idx + i) % ring.len()];
1354 if let Some(state) = endpoints.iter().find(|e| e.node_id == node_id) {
1355 return Selection {
1356 node_id: state.node_id,
1357 weight: state.weight,
1358 load_score: state.load_score(),
1359 reason: SelectionReason::ConsistentHash,
1360 };
1361 }
1362 }
1363 }
1364
1365 // Fallback to round-robin
1366 self.select_round_robin(endpoints)
1367 }
1368
1369 #[expect(
1370 clippy::unwrap_used,
1371 reason = "caller (LoadBalancer::select) returns early on empty endpoints; min_by on a non-empty iter is infallible"
1372 )]
1373 fn select_least_latency(&self, endpoints: &[Arc<EndpointState>]) -> Selection {
1374 let state = endpoints
1375 .iter()
1376 .min_by(|a, b| {
1377 a.metrics()
1378 .avg_response_time_ms
1379 .total_cmp(&b.metrics().avg_response_time_ms)
1380 })
1381 .unwrap();
1382
1383 Selection {
1384 node_id: state.node_id,
1385 weight: state.weight,
1386 load_score: state.load_score(),
1387 reason: SelectionReason::LeastLatency,
1388 }
1389 }
1390
1391 #[expect(
1392 clippy::unwrap_used,
1393 reason = "caller (LoadBalancer::select) returns early on empty endpoints; min_by on a non-empty iter is infallible"
1394 )]
1395 fn select_least_load(&self, endpoints: &[Arc<EndpointState>]) -> Selection {
1396 let state = endpoints
1397 .iter()
1398 .min_by(|a, b| {
1399 a.metrics()
1400 .load_score()
1401 .total_cmp(&b.metrics().load_score())
1402 })
1403 .unwrap();
1404
1405 Selection {
1406 node_id: state.node_id,
1407 weight: state.weight,
1408 load_score: state.load_score(),
1409 reason: SelectionReason::LeastLoad,
1410 }
1411 }
1412
1413 fn select_power_of_two(&self, endpoints: &[Arc<EndpointState>]) -> Selection {
1414 if endpoints.len() < 2 {
1415 return self.select_round_robin(endpoints);
1416 }
1417
1418 // Pick two random endpoints
1419 let idx1 = random_usize() % endpoints.len();
1420 let mut idx2 = random_usize() % endpoints.len();
1421 if idx2 == idx1 {
1422 idx2 = (idx1 + 1) % endpoints.len();
1423 }
1424
1425 let state1 = &endpoints[idx1];
1426 let state2 = &endpoints[idx2];
1427
1428 // Choose the one with fewer connections
1429 let state = if state1.connections.load(Ordering::Relaxed)
1430 <= state2.connections.load(Ordering::Relaxed)
1431 {
1432 state1
1433 } else {
1434 state2
1435 };
1436
1437 Selection {
1438 node_id: state.node_id,
1439 weight: state.weight,
1440 load_score: state.load_score(),
1441 reason: SelectionReason::PowerOfTwo,
1442 }
1443 }
1444
1445 fn select_adaptive(&self, endpoints: &[Arc<EndpointState>], ctx: &RequestContext) -> Selection {
1446 // Use different strategies based on conditions
1447 let avg_load: f64 = endpoints
1448 .iter()
1449 .map(|e| e.metrics().load_score())
1450 .sum::<f64>()
1451 / endpoints.len() as f64;
1452
1453 // If high load, use least connections
1454 if avg_load > 0.7 {
1455 return self.select_least_connections(endpoints);
1456 }
1457
1458 // If session ID present, use consistent hash
1459 if ctx.session_id.is_some() || ctx.routing_key.is_some() {
1460 return self.select_consistent_hash(endpoints, ctx);
1461 }
1462
1463 // Otherwise use weighted round-robin
1464 self.select_weighted_round_robin(endpoints)
1465 }
1466
1467 /// Place (or idempotently re-place) `node_id`'s vnodes on the ring.
1468 /// `was_present` is whether the node already had an endpoint entry
1469 /// (and therefore prior vnodes): on a fresh add it is false and the
1470 /// stale-vnode sweep is skipped, avoiding a full-ring scan per join.
1471 fn add_to_hash_ring(&self, node_id: NodeId, was_present: bool) {
1472 // Slots this node ends up occupying in THIS call — used both to
1473 // keep the node's own intra-call collisions distinct and to
1474 // sweep any stale vnodes left by a prior add (drift cleanup).
1475 let mut placed: std::collections::HashSet<u64> =
1476 std::collections::HashSet::with_capacity(self.virtual_nodes as usize);
1477 for i in 0..self.virtual_nodes {
1478 let key = format!("{:?}-{}", node_id, i);
1479 let mut hash = self.hash_key(&key);
1480 // Linear-probe past collisions, NON-DESTRUCTIVELY (a plain
1481 // `insert` would clobber another node's vnode and skew the
1482 // ring). The probe stops when the slot is:
1483 // * free, OR
1484 // * already held by THIS node from a *prior* add (not one
1485 // we placed this call) — overwrite it in place.
1486 // The in-place overwrite is what makes a re-add (reconnect /
1487 // weight change) idempotent WITHOUT first removing the
1488 // node's vnodes: clearing first (the previous fix) left a
1489 // transient window where the node had no ring presence and
1490 // traffic could misroute. A slot held by a DIFFERENT node,
1491 // or by one of this node's vnodes we ALREADY placed this
1492 // call (an intra-node hash collision), is a true collision —
1493 // probe on so every vnode stays distinct.
1494 loop {
1495 match self.hash_ring.get(&hash).map(|r| *r) {
1496 None => break,
1497 Some(occupant) if occupant == node_id && !placed.contains(&hash) => break,
1498 Some(_) => hash = hash.wrapping_add(1),
1499 }
1500 }
1501 self.hash_ring.insert(hash, node_id);
1502 placed.insert(hash);
1503 }
1504 // Sweep any vnodes still tagged with this node that we did NOT
1505 // (re)place this call — stale entries from an earlier add whose
1506 // probe path changed (e.g. a collision partner was since
1507 // removed). The node's freshly-placed vnodes are all in `placed`
1508 // and inserted above, so it is never absent from the ring; this
1509 // only drops leftovers. Skipped entirely on a fresh add: a node
1510 // with no prior endpoint entry has no prior vnodes to leave
1511 // behind, so the O(ring) retain scan would never remove anything
1512 // — important when many nodes join a large ring.
1513 if was_present {
1514 self.hash_ring
1515 .retain(|k, v| *v != node_id || placed.contains(k));
1516 }
1517 }
1518
1519 fn remove_from_hash_ring(&self, node_id: &NodeId) {
1520 self.hash_ring.retain(|_, v| v != node_id);
1521 }
1522
1523 fn hash_key(&self, key: &str) -> u64 {
1524 // Simple FNV-1a hash
1525 let mut hash: u64 = 0xcbf29ce484222325;
1526 for byte in key.bytes() {
1527 hash ^= byte as u64;
1528 hash = hash.wrapping_mul(0x100000001b3);
1529 }
1530 hash
1531 }
1532
1533 /// Get statistics
1534 pub fn stats(&self) -> LoadBalancerStats {
1535 let mut healthy = 0u32;
1536 let mut total_connections = 0u64;
1537 let mut total_load = 0.0;
1538
1539 let snapshot = self.endpoint_list.load();
1540 for state in snapshot.iter() {
1541 if state.health() == HealthStatus::Healthy {
1542 healthy += 1;
1543 }
1544 total_connections += state.connections.load(Ordering::Relaxed) as u64;
1545 total_load += state.load_score();
1546 }
1547
1548 let endpoint_count = snapshot.len() as u32;
1549
1550 LoadBalancerStats {
1551 total_selections: self.total_selections.load(Ordering::Relaxed),
1552 failed_selections: self.failed_selections.load(Ordering::Relaxed),
1553 active_endpoints: endpoint_count,
1554 healthy_endpoints: healthy,
1555 total_connections,
1556 avg_load_score: if endpoint_count > 0 {
1557 total_load / endpoint_count as f64
1558 } else {
1559 0.0
1560 },
1561 }
1562 }
1563
1564 /// Get all endpoints as snapshots
1565 pub fn endpoints(&self) -> Vec<Endpoint> {
1566 self.endpoint_list
1567 .load()
1568 .iter()
1569 .map(|state| Endpoint {
1570 node_id: state.node_id,
1571 weight: state.weight,
1572 health: state.health(),
1573 metrics: state.metrics(),
1574 tags: state.tags.clone(),
1575 priority: state.priority,
1576 enabled: state.is_enabled(),
1577 zone: state.zone.clone(),
1578 })
1579 .collect()
1580 }
1581
1582 /// Get endpoint count
1583 pub fn endpoint_count(&self) -> usize {
1584 self.endpoint_list.load().len()
1585 }
1586}
1587
1588/// Generate random usize.
1589///
1590/// Aborts on `getrandom` failure rather than panic-unwinding
1591/// through the FFI boundary. Load-balance random numbers are not
1592/// directly auth-bearing, but this function is reachable from hot
1593/// paths called by `extern "C"` FFI consumers (Python / Node / Go
1594/// bindings) — a `getrandom` failure that unwound across the C
1595/// ABI would be undefined behaviour. `process::abort` is
1596/// `extern "C"`-safe (terminates rather than unwinds) and
1597/// loss-of-availability is the only safe response when the system
1598/// can't produce randomness.
1599fn random_usize() -> usize {
1600 let mut bytes = [0u8; 8];
1601 if let Err(e) = getrandom::fill(&mut bytes) {
1602 eprintln!(
1603 "FATAL: behavior::loadbalance::random_usize getrandom failure ({e:?}); \
1604 aborting to avoid panic across the FFI boundary"
1605 );
1606 std::process::abort();
1607 }
1608 usize::from_le_bytes(bytes)
1609}
1610
1611/// Generate random f64 uniformly in the half-open interval [0.0, 1.0).
1612///
1613/// Uses the top 53 bits of entropy (the f64 mantissa width) divided by
1614/// `2^53`, which guarantees the result is strictly less than 1.0. The naive
1615/// `r as f64 / u64::MAX as f64` approach can round up to exactly 1.0 because
1616/// `u64::MAX as f64` itself rounds to `2^64`.
1617fn random_f64() -> f64 {
1618 let r = random_usize() as u64;
1619 (r >> 11) as f64 / ((1u64 << 53) as f64)
1620}
1621
1622#[cfg(test)]
1623mod tests {
1624 use super::*;
1625
1626 fn make_node_id(n: u8) -> NodeId {
1627 let mut id = [0u8; 32];
1628 id[0] = n;
1629 id
1630 }
1631
1632 /// Pin perf #149: `update_metrics` followed by `load_score`
1633 /// observes the new metrics value lock-free via ArcSwap. A
1634 /// regression that reverts to `RwLock<LoadMetrics>` would
1635 /// still pass this functional test but would re-introduce the
1636 /// per-event lock-acquire + struct clone. The pointer-identity
1637 /// check on `Arc::ptr_eq` distinguishes ArcSwap (writer's Arc
1638 /// is the reader's Arc) from any swap-via-clone alternative.
1639 #[test]
1640 fn endpoint_state_metrics_arc_swap_visibility_and_no_clone_on_read() {
1641 let lb = LoadBalancer::with_strategy(Strategy::LeastLoad);
1642 let node = make_node_id(7);
1643 lb.add_endpoint(Endpoint::new(node));
1644
1645 // Initial: defaulted LoadMetrics; score is small but
1646 // computed lock-free.
1647 {
1648 let state_ref = lb.endpoints.get(&node).expect("endpoint registered");
1649 let initial_load = state_ref.load_score();
1650 assert!(
1651 initial_load >= 0.0,
1652 "load_score must compute from current ArcSwap snapshot"
1653 );
1654 // The internal Arc backing metrics — readers should
1655 // observe the SAME Arc identity across two loads with
1656 // no intervening write.
1657 let arc1 = state_ref.metrics.load_full();
1658 let arc2 = state_ref.metrics.load_full();
1659 assert!(
1660 Arc::ptr_eq(&arc1, &arc2),
1661 "two reads with no writer in between must share the Arc — \
1662 confirms ArcSwap (not RwLock<T>) backing"
1663 );
1664 }
1665
1666 // Update to a heavily-loaded snapshot.
1667 let busy = LoadMetrics {
1668 cpu_usage: 0.95,
1669 error_rate: 0.5,
1670 ..Default::default()
1671 };
1672 let busy_score = busy.load_score();
1673 lb.update_metrics(&node, busy);
1674
1675 let state_ref = lb.endpoints.get(&node).expect("endpoint still here");
1676 assert!(
1677 (state_ref.load_score() - busy_score).abs() < f64::EPSILON,
1678 "post-update load_score must reflect the new snapshot"
1679 );
1680 }
1681
1682 #[test]
1683 fn test_health_status() {
1684 assert!(HealthStatus::Healthy.can_receive_traffic());
1685 assert!(HealthStatus::Degraded.can_receive_traffic());
1686 assert!(!HealthStatus::Unhealthy.can_receive_traffic());
1687 assert!(!HealthStatus::Unknown.can_receive_traffic());
1688
1689 assert_eq!(HealthStatus::Healthy.weight_multiplier(), 1.0);
1690 assert_eq!(HealthStatus::Degraded.weight_multiplier(), 0.5);
1691 assert_eq!(HealthStatus::Unhealthy.weight_multiplier(), 0.0);
1692 }
1693
1694 #[test]
1695 fn test_load_metrics() {
1696 let metrics = LoadMetrics {
1697 cpu_usage: 0.5,
1698 memory_usage: 0.3,
1699 active_connections: 100,
1700 requests_per_second: 1000.0,
1701 avg_response_time_ms: 50.0,
1702 error_rate: 0.01,
1703 queue_depth: 10,
1704 bandwidth_usage: 0.2,
1705 updated_at: 0,
1706 };
1707
1708 let score = metrics.load_score();
1709 assert!(score > 0.0 && score < 1.0);
1710 assert!(!metrics.is_overloaded());
1711
1712 let overloaded = LoadMetrics {
1713 cpu_usage: 0.95,
1714 ..Default::default()
1715 };
1716 assert!(overloaded.is_overloaded());
1717 }
1718
1719 #[test]
1720 fn test_endpoint() {
1721 let node_id = make_node_id(1);
1722 let endpoint = Endpoint::new(node_id)
1723 .with_weight(200)
1724 .with_zone("us-east-1")
1725 .with_tag("gpu");
1726
1727 assert_eq!(endpoint.weight, 200);
1728 assert_eq!(endpoint.zone, Some("us-east-1".to_string()));
1729 assert!(endpoint.tags.contains(&"gpu".to_string()));
1730 assert!(endpoint.is_available());
1731 assert_eq!(endpoint.effective_weight(), 200.0);
1732 }
1733
1734 #[test]
1735 fn test_load_balancer_round_robin() {
1736 let lb = LoadBalancer::with_strategy(Strategy::RoundRobin);
1737
1738 lb.add_endpoint(Endpoint::new(make_node_id(1)));
1739 lb.add_endpoint(Endpoint::new(make_node_id(2)));
1740 lb.add_endpoint(Endpoint::new(make_node_id(3)));
1741
1742 let ctx = RequestContext::new();
1743
1744 // Should cycle through endpoints
1745 let mut selected = Vec::new();
1746 for _ in 0..6 {
1747 let selection = lb.select(&ctx).unwrap();
1748 selected.push(selection.node_id[0]);
1749 }
1750
1751 // Should have selected each endpoint twice
1752 assert_eq!(selected.iter().filter(|&&x| x == 1).count(), 2);
1753 assert_eq!(selected.iter().filter(|&&x| x == 2).count(), 2);
1754 assert_eq!(selected.iter().filter(|&&x| x == 3).count(), 2);
1755 }
1756
1757 #[test]
1758 fn test_load_balancer_least_connections() {
1759 let lb = LoadBalancer::with_strategy(Strategy::LeastConnections);
1760
1761 lb.add_endpoint(Endpoint::new(make_node_id(1)));
1762 lb.add_endpoint(Endpoint::new(make_node_id(2)));
1763 lb.add_endpoint(Endpoint::new(make_node_id(3)));
1764
1765 let ctx = RequestContext::new();
1766
1767 // First selection - all have 0 connections
1768 let s1 = lb.select(&ctx).unwrap();
1769 // Don't record completion, so connection stays
1770
1771 // Second selection should pick a different node
1772 let s2 = lb.select(&ctx).unwrap();
1773 assert_ne!(s1.node_id, s2.node_id);
1774 }
1775
1776 #[test]
1777 fn test_load_balancer_weighted() {
1778 let lb = LoadBalancer::with_strategy(Strategy::WeightedRoundRobin);
1779
1780 lb.add_endpoint(Endpoint::new(make_node_id(1)).with_weight(100));
1781 lb.add_endpoint(Endpoint::new(make_node_id(2)).with_weight(200));
1782 lb.add_endpoint(Endpoint::new(make_node_id(3)).with_weight(300));
1783
1784 let ctx = RequestContext::new();
1785
1786 let mut counts = std::collections::HashMap::new();
1787 for _ in 0..600 {
1788 let selection = lb.select(&ctx).unwrap();
1789 lb.record_completion(&selection.node_id, true);
1790 *counts.entry(selection.node_id[0]).or_insert(0) += 1;
1791 }
1792
1793 // Node 3 should get most traffic, node 1 least
1794 assert!(counts.get(&3).unwrap() > counts.get(&2).unwrap());
1795 assert!(counts.get(&2).unwrap() > counts.get(&1).unwrap());
1796 }
1797
1798 #[test]
1799 fn test_regression_weighted_lc_preserves_fractional_weights() {
1800 // Regression (LOW, BUGS.md): `select_weighted_least_connections`
1801 // used `.max(1.0)` as a divide-by-zero guard, which also
1802 // collapsed every weight in `(0, 1]` onto `1.0`. An endpoint
1803 // with weight `0.1` was scored identically to one with
1804 // `1.0`, silently degrading weighted-LC into plain LC
1805 // whenever operators configured sub-unit weights.
1806 //
1807 // Fix: use a small positive epsilon instead, so fractional
1808 // weights keep their relative ordering.
1809 let lb = LoadBalancer::with_strategy(Strategy::WeightedLeastConnections);
1810
1811 // Two endpoints with identical connection counts but very
1812 // different fractional weights.
1813 lb.add_endpoint(Endpoint::new(make_node_id(1)).with_weight(10));
1814 lb.add_endpoint(Endpoint::new(make_node_id(2)).with_weight(1));
1815
1816 let ctx = RequestContext::new();
1817 let mut counts = std::collections::HashMap::new();
1818 for _ in 0..600 {
1819 let selection = lb.select(&ctx).unwrap();
1820 // Don't record completion so connections stay matched.
1821 *counts.entry(selection.node_id[0]).or_insert(0_u32) += 1;
1822 }
1823
1824 // The 10x-weighted endpoint should overwhelmingly win the
1825 // "connections/weight" tiebreak when connection counts are
1826 // comparable. With the old `.max(1.0)` collapse, the two
1827 // endpoints would score identically and a later tiebreaker
1828 // would pick one consistently — distribution would be either
1829 // 50/50 or 100/0 depending on ordering.
1830 let high = *counts.get(&1).unwrap_or(&0);
1831 let low = *counts.get(&2).unwrap_or(&0);
1832 assert!(
1833 high > low * 2,
1834 "weight=10 endpoint must get >2x more traffic than weight=1 \
1835 (got {high} vs {low})",
1836 );
1837 }
1838
1839 #[test]
1840 fn test_regression_weighted_rr_precision_past_f64_mantissa() {
1841 // Regression (LOW, BUGS.md): `select_weighted_round_robin`
1842 // used `counter as f64 % total_weight`. Past 2^53 selections
1843 // the `as f64` cast dropped the low bits and rotation stalled
1844 // on a narrow set of indices. The fix scales weights to
1845 // integers and does the modulus in u64 space.
1846 let lb = LoadBalancer::with_strategy(Strategy::WeightedRoundRobin);
1847 lb.add_endpoint(Endpoint::new(make_node_id(1)).with_weight(1));
1848 lb.add_endpoint(Endpoint::new(make_node_id(2)).with_weight(1));
1849 lb.add_endpoint(Endpoint::new(make_node_id(3)).with_weight(1));
1850
1851 // Jump the counter past the f64 mantissa boundary. The raw
1852 // `AtomicU64` is private but `select` starts from the internal
1853 // counter; we simulate a long-running process by selecting
1854 // once (to warm up) and then seeding the rr_counter via the
1855 // backing atomic through a public helper.
1856 //
1857 // Without direct access we exercise ordinary rotation; the
1858 // real precision gain is covered by the unit-level property
1859 // that `(counter % scaled_total)` is exact for all u64 inputs.
1860 let ctx = RequestContext::new();
1861 let mut counts = std::collections::HashMap::new();
1862 for _ in 0..300 {
1863 let sel = lb.select(&ctx).unwrap();
1864 *counts.entry(sel.node_id[0]).or_insert(0) += 1;
1865 }
1866
1867 // Uniform weights → each of three endpoints gets ~100 hits.
1868 // This is a basic sanity test; the u64 exactness is verified
1869 // by construction (integer math has no rounding).
1870 for id in 1..=3u8 {
1871 let got = counts.get(&id).copied().unwrap_or(0);
1872 assert!(
1873 (80..=120).contains(&got),
1874 "endpoint {id} should get ~100 hits, got {got}",
1875 );
1876 }
1877 }
1878
1879 #[test]
1880 fn test_load_balancer_health() {
1881 let lb = LoadBalancer::with_strategy(Strategy::RoundRobin);
1882
1883 lb.add_endpoint(Endpoint::new(make_node_id(1)));
1884 lb.add_endpoint(Endpoint::new(make_node_id(2)));
1885
1886 let ctx = RequestContext::new();
1887
1888 // Mark node 1 as unhealthy
1889 lb.update_health(&make_node_id(1), HealthStatus::Unhealthy);
1890
1891 // All selections should go to node 2
1892 for _ in 0..10 {
1893 let selection = lb.select(&ctx).unwrap();
1894 assert_eq!(selection.node_id[0], 2);
1895 }
1896 }
1897
1898 #[test]
1899 fn test_load_balancer_zone_affinity() {
1900 let config = LoadBalancerConfig {
1901 strategy: Strategy::RoundRobin,
1902 zone_aware: true,
1903 ..Default::default()
1904 };
1905 let lb = LoadBalancer::new(config);
1906
1907 lb.add_endpoint(Endpoint::new(make_node_id(1)).with_zone("us-east"));
1908 lb.add_endpoint(Endpoint::new(make_node_id(2)).with_zone("us-west"));
1909
1910 let ctx = RequestContext::new().with_zone("us-east");
1911
1912 // Should prefer us-east node
1913 for _ in 0..10 {
1914 let selection = lb.select(&ctx).unwrap();
1915 assert_eq!(selection.node_id[0], 1);
1916 }
1917 }
1918
1919 #[test]
1920 fn test_load_balancer_consistent_hash() {
1921 let lb = LoadBalancer::with_strategy(Strategy::ConsistentHash);
1922
1923 lb.add_endpoint(Endpoint::new(make_node_id(1)));
1924 lb.add_endpoint(Endpoint::new(make_node_id(2)));
1925 lb.add_endpoint(Endpoint::new(make_node_id(3)));
1926
1927 // Same session should always go to same node
1928 let ctx = RequestContext::new().with_session("user-123");
1929
1930 let first = lb.select(&ctx).unwrap();
1931 for _ in 0..10 {
1932 let selection = lb.select(&ctx).unwrap();
1933 assert_eq!(selection.node_id, first.node_id);
1934 }
1935 }
1936
1937 #[test]
1938 fn test_load_balancer_circuit_breaker() {
1939 let config = LoadBalancerConfig {
1940 strategy: Strategy::RoundRobin,
1941 circuit_recovery_time_ms: 100,
1942 ..Default::default()
1943 };
1944 let lb = LoadBalancer::new(config);
1945
1946 lb.add_endpoint(Endpoint::new(make_node_id(1)));
1947 lb.add_endpoint(Endpoint::new(make_node_id(2)));
1948
1949 let ctx = RequestContext::new();
1950
1951 // Simulate 5 consecutive failures on node 1
1952 for _ in 0..5 {
1953 lb.record_completion(&make_node_id(1), false);
1954 }
1955
1956 // Node 1's circuit should be open, all traffic to node 2
1957 for _ in 0..10 {
1958 let selection = lb.select(&ctx).unwrap();
1959 assert_eq!(selection.node_id[0], 2);
1960 }
1961 }
1962
1963 #[test]
1964 fn test_load_balancer_stats() {
1965 let lb = LoadBalancer::with_strategy(Strategy::RoundRobin);
1966
1967 lb.add_endpoint(Endpoint::new(make_node_id(1)));
1968 lb.add_endpoint(Endpoint::new(make_node_id(2)));
1969
1970 let ctx = RequestContext::new();
1971
1972 for _ in 0..10 {
1973 let selection = lb.select(&ctx).unwrap();
1974 lb.record_completion(&selection.node_id, true);
1975 }
1976
1977 let stats = lb.stats();
1978 assert_eq!(stats.total_selections, 10);
1979 assert_eq!(stats.active_endpoints, 2);
1980 assert_eq!(stats.healthy_endpoints, 2);
1981 }
1982
1983 #[test]
1984 fn test_no_endpoints_error() {
1985 let lb = LoadBalancer::with_strategy(Strategy::RoundRobin);
1986 let ctx = RequestContext::new();
1987
1988 let result = lb.select(&ctx);
1989 assert!(matches!(
1990 result,
1991 Err(LoadBalancerError::NoEndpointsAvailable)
1992 ));
1993 }
1994
1995 #[test]
1996 fn test_required_tags() {
1997 let lb = LoadBalancer::with_strategy(Strategy::RoundRobin);
1998
1999 lb.add_endpoint(Endpoint::new(make_node_id(1)).with_tag("gpu"));
2000 lb.add_endpoint(Endpoint::new(make_node_id(2)).with_tag("cpu"));
2001
2002 let ctx = RequestContext::new().require_tag("gpu");
2003
2004 // Should only select gpu-tagged node
2005 for _ in 0..10 {
2006 let selection = lb.select(&ctx).unwrap();
2007 assert_eq!(selection.node_id[0], 1);
2008 }
2009 }
2010
2011 // ---- Regression tests ----
2012
2013 #[test]
2014 fn test_regression_consistent_hash_deterministic() {
2015 // Regression: consistent hash iterated DashMap in arbitrary order
2016 // instead of sorted order, so the same key could map to different
2017 // nodes across calls. Now uses sorted ring + binary search.
2018 let lb = LoadBalancer::with_strategy(Strategy::ConsistentHash);
2019
2020 lb.add_endpoint(Endpoint::new(make_node_id(1)));
2021 lb.add_endpoint(Endpoint::new(make_node_id(2)));
2022 lb.add_endpoint(Endpoint::new(make_node_id(3)));
2023 lb.add_endpoint(Endpoint::new(make_node_id(4)));
2024
2025 // Many different keys should each consistently map to the same node
2026 for i in 0..50 {
2027 let key = format!("session-{}", i);
2028 let ctx = RequestContext::new().with_routing_key(&key);
2029
2030 let first = lb.select(&ctx).unwrap().node_id;
2031 for _ in 0..20 {
2032 let again = lb.select(&ctx).unwrap().node_id;
2033 assert_eq!(
2034 first, again,
2035 "consistent hash must return same node for key '{}'",
2036 key
2037 );
2038 }
2039 }
2040 }
2041
2042 #[test]
2043 fn test_regression_nan_metrics_no_panic() {
2044 // Regression: partial_cmp().unwrap() panicked when metrics
2045 // contained NaN. Now uses total_cmp() which handles NaN.
2046 let lb = LoadBalancer::with_strategy(Strategy::LeastLatency);
2047
2048 let mut ep1 = Endpoint::new(make_node_id(1));
2049 ep1.metrics.avg_response_time_ms = f64::NAN;
2050 lb.add_endpoint(ep1);
2051
2052 let mut ep2 = Endpoint::new(make_node_id(2));
2053 ep2.metrics.avg_response_time_ms = 50.0;
2054 lb.add_endpoint(ep2);
2055
2056 let ctx = RequestContext::new();
2057 // Must not panic
2058 let result = lb.select(&ctx);
2059 assert!(result.is_ok(), "NaN metrics must not panic");
2060 }
2061
2062 #[test]
2063 fn test_regression_nan_load_score_no_panic() {
2064 // Same NaN regression for LeastLoad strategy.
2065 let lb = LoadBalancer::with_strategy(Strategy::LeastLoad);
2066
2067 let mut ep1 = Endpoint::new(make_node_id(1));
2068 ep1.metrics.cpu_usage = f64::NAN;
2069 lb.add_endpoint(ep1);
2070
2071 lb.add_endpoint(Endpoint::new(make_node_id(2)));
2072
2073 let ctx = RequestContext::new();
2074 let result = lb.select(&ctx);
2075 assert!(result.is_ok(), "NaN load score must not panic");
2076 }
2077
2078 #[test]
2079 fn test_regression_zone_fallback_respected() {
2080 // Regression: zone_fallback config was never read. When set to
2081 // false, requests with a client_zone that matches no endpoint
2082 // should fail, not silently fall back to non-zone endpoints.
2083 let config = LoadBalancerConfig {
2084 strategy: Strategy::RoundRobin,
2085 zone_aware: true,
2086 zone_fallback: false, // <-- this was previously ignored
2087 ..Default::default()
2088 };
2089 let lb = LoadBalancer::new(config);
2090
2091 lb.add_endpoint(Endpoint::new(make_node_id(1)).with_zone("us-west"));
2092 lb.add_endpoint(Endpoint::new(make_node_id(2)).with_zone("us-west"));
2093
2094 // Client is in eu-central — no endpoints match
2095 let ctx = RequestContext::new().with_zone("eu-central");
2096 let result = lb.select(&ctx);
2097
2098 assert!(
2099 result.is_err(),
2100 "with zone_fallback=false, mismatched zone must return error"
2101 );
2102 }
2103
2104 #[test]
2105 fn test_zone_fallback_true_allows_cross_zone() {
2106 // Verify that zone_fallback=true (default) still works correctly.
2107 let config = LoadBalancerConfig {
2108 strategy: Strategy::RoundRobin,
2109 zone_aware: true,
2110 zone_fallback: true,
2111 ..Default::default()
2112 };
2113 let lb = LoadBalancer::new(config);
2114
2115 lb.add_endpoint(Endpoint::new(make_node_id(1)).with_zone("us-west"));
2116
2117 let ctx = RequestContext::new().with_zone("eu-central");
2118 let result = lb.select(&ctx);
2119
2120 assert!(
2121 result.is_ok(),
2122 "with zone_fallback=true, cross-zone should succeed"
2123 );
2124 }
2125
2126 #[test]
2127 fn test_regression_random_f64_never_reaches_one() {
2128 // Regression: `r as f64 / u64::MAX as f64` could return exactly 1.0
2129 // because `u64::MAX as f64` rounds to 2^64. Now uses the 53-bit
2130 // mantissa / 2^53 pattern which is strictly in [0, 1).
2131 for _ in 0..10_000 {
2132 let r = random_f64();
2133 assert!((0.0..1.0).contains(&r), "random_f64 out of [0,1): {}", r);
2134 }
2135 }
2136
2137 #[test]
2138 fn test_regression_max_connections_cap_enforced_concurrently() {
2139 // Regression: the select() path loaded `connections` with Relaxed
2140 // then incremented in record_request, allowing N concurrent
2141 // selectors to all pass the check and collectively exceed the cap.
2142 // Now reservation is atomic via fetch_update.
2143 use std::sync::Arc;
2144 use std::thread;
2145
2146 const CAP: u32 = 5;
2147 const THREADS: u32 = 16;
2148
2149 let config = LoadBalancerConfig {
2150 strategy: Strategy::RoundRobin,
2151 max_connections_per_endpoint: CAP,
2152 ..Default::default()
2153 };
2154 let lb = Arc::new(LoadBalancer::new(config));
2155 // Single endpoint so every selection contends for the same cap.
2156 lb.add_endpoint(Endpoint::new(make_node_id(1)));
2157
2158 let mut handles = Vec::new();
2159 for _ in 0..THREADS {
2160 let lb = Arc::clone(&lb);
2161 handles.push(thread::spawn(move || {
2162 // Each thread tries to select one connection and holds it.
2163 let ctx = RequestContext::new();
2164 lb.select(&ctx).ok()
2165 }));
2166 }
2167
2168 let successes = handles
2169 .into_iter()
2170 .filter_map(|h| h.join().unwrap())
2171 .count();
2172
2173 // At most CAP threads may have been granted a connection.
2174 assert!(
2175 successes <= CAP as usize,
2176 "concurrent selectors exceeded cap: {} > {}",
2177 successes,
2178 CAP
2179 );
2180 // And the endpoint's connection count must equal successes.
2181 let state = lb.endpoints.get(&make_node_id(1)).unwrap();
2182 assert_eq!(
2183 state.connections.load(Ordering::Acquire),
2184 successes as u32,
2185 "connection counter must match granted selections"
2186 );
2187 }
2188
2189 #[test]
2190 fn test_regression_circuit_breaker_half_open_single_probe() {
2191 // Regression: on recovery expiry, `is_circuit_open` fully closed
2192 // the breaker, letting every concurrent request hit a possibly
2193 // still-broken endpoint. Now exactly one probe is admitted and
2194 // subsequent callers continue to see the breaker as open until the
2195 // probe's outcome is recorded.
2196 let config = LoadBalancerConfig {
2197 strategy: Strategy::RoundRobin,
2198 circuit_recovery_time_ms: 50,
2199 ..Default::default()
2200 };
2201 let lb = LoadBalancer::new(config);
2202 lb.add_endpoint(Endpoint::new(make_node_id(1)));
2203 let ctx = RequestContext::new();
2204
2205 // Trip the breaker by driving 5 real selections that all fail. Going
2206 // through select() keeps the connection counter consistent — calling
2207 // record_completion() without a matching record_request() would
2208 // underflow.
2209 for _ in 0..5 {
2210 let sel = lb.select(&ctx).expect("admitted before trip");
2211 lb.record_completion(&sel.node_id, false);
2212 }
2213
2214 // Before recovery: all requests rejected.
2215 assert!(lb.select(&ctx).is_err(), "open breaker must reject");
2216
2217 // Wait past the recovery window.
2218 std::thread::sleep(Duration::from_millis(75));
2219
2220 // First request after recovery: admitted as the probe.
2221 let probe = lb.select(&ctx);
2222 assert!(probe.is_ok(), "first request after recovery is the probe");
2223
2224 // Second request while probe is still in flight: must be rejected.
2225 let second = lb.select(&ctx);
2226 assert!(
2227 second.is_err(),
2228 "while probe is in flight, other requests must still be rejected"
2229 );
2230
2231 // Probe reports failure → breaker re-opens and recovery timer resets.
2232 lb.record_completion(&probe.unwrap().node_id, false);
2233 assert!(
2234 lb.select(&ctx).is_err(),
2235 "failed probe must keep breaker open"
2236 );
2237
2238 // After another recovery window, the next probe succeeds and closes
2239 // the breaker.
2240 std::thread::sleep(Duration::from_millis(75));
2241 let probe2 = lb.select(&ctx).expect("second probe admitted");
2242 lb.record_completion(&probe2.node_id, true);
2243
2244 // Breaker is now fully closed — subsequent requests go through.
2245 assert!(lb.select(&ctx).is_ok(), "successful probe closes breaker");
2246 }
2247
2248 /// Regression for BUG_AUDIT_2026_04_30_CORE.md #101: pre-fix
2249 /// `is_circuit_open` was both a predicate AND CAS-claimed
2250 /// the half-open probe slot when called past the recovery
2251 /// window. `get_available_endpoints` calls it for every
2252 /// endpoint being filtered; with N circuit-open endpoints
2253 /// past their recovery window, all N got the probe slot
2254 /// claimed but only one was actually selected. The N-1
2255 /// others permanently held `half_open_probe == true` with
2256 /// no in-flight request — every subsequent
2257 /// `is_circuit_open` then returned true forever, and the
2258 /// breaker never recovered until process restart.
2259 ///
2260 /// We pin the fix by:
2261 /// 1. Building a load balancer with 3 endpoints.
2262 /// 2. Tripping each endpoint's breaker.
2263 /// 3. Waiting past the recovery window.
2264 /// 4. Calling `select` once — this triggers
2265 /// `get_available_endpoints`, which scans all 3
2266 /// endpoints. Only the SELECTED endpoint should claim
2267 /// the probe slot; the other 2 must NOT.
2268 /// 5. Asserting the unselected endpoints have
2269 /// `half_open_probe == false`.
2270 #[test]
2271 fn circuit_breaker_does_not_leak_probe_slot_on_multi_endpoint_scan() {
2272 let config = LoadBalancerConfig {
2273 strategy: Strategy::RoundRobin,
2274 circuit_recovery_time_ms: 50,
2275 ..Default::default()
2276 };
2277 let lb = LoadBalancer::new(config);
2278 for i in 1..=3 {
2279 lb.add_endpoint(Endpoint::new(make_node_id(i)));
2280 }
2281 let ctx = RequestContext::new();
2282
2283 // Trip every endpoint's breaker. Default failure
2284 // threshold is 5 consecutive failures.
2285 for _ in 0..5 {
2286 for i in 1..=3 {
2287 let nid = make_node_id(i);
2288 // Manually trip via record_completion(false). We
2289 // use a dummy connection-counter via select() to
2290 // keep the connection counter consistent; if no
2291 // endpoint is selectable, force it.
2292 if let Some(ep) = lb.endpoints.get(&nid) {
2293 // Simulate a request lifecycle.
2294 ep.try_record_request(u32::MAX);
2295 }
2296 lb.record_completion(&nid, false);
2297 }
2298 }
2299
2300 // All breakers should be open. select() rejects pre-recovery.
2301 assert!(
2302 lb.select(&ctx).is_err(),
2303 "all breakers open pre-recovery — select must fail"
2304 );
2305
2306 // Wait past recovery window.
2307 std::thread::sleep(Duration::from_millis(75));
2308
2309 // First select: scans all 3 endpoints. Selects ONE. The
2310 // other 2 must NOT have their probe slots claimed.
2311 let probe = lb.select(&ctx).expect("recovery elapsed → probe admitted");
2312
2313 // Audit the half_open_probe state on each endpoint:
2314 // exactly one (the selected) should be true; the other
2315 // two MUST be false. Pre-fix all three would be true.
2316 let mut claimed = 0u32;
2317 for i in 1..=3 {
2318 let nid = make_node_id(i);
2319 let ep = lb.endpoints.get(&nid).unwrap();
2320 if ep.half_open_probe.load(Ordering::Acquire) {
2321 claimed += 1;
2322 // The claimed slot must be on the selected endpoint.
2323 assert_eq!(
2324 nid, probe.node_id,
2325 "only the selected endpoint may have its probe slot claimed"
2326 );
2327 }
2328 }
2329 assert_eq!(
2330 claimed, 1,
2331 "exactly one endpoint should have a claimed probe slot — \
2332 pre-fix this was 3 (the filter-time scan claimed all)"
2333 );
2334
2335 // Probe success → selected endpoint's breaker closes;
2336 // the other two are still in their post-recovery state.
2337 lb.record_completion(&probe.node_id, true);
2338 }
2339
2340 /// Cubic-ai P1: with `N` selectors racing concurrently against
2341 /// a circuit-open endpoint that just exited its recovery window,
2342 /// the strict half-open contract says EXACTLY one selector
2343 /// admits the probe — every other selector that lost the
2344 /// `try_claim_half_open_probe` CAS must skip the endpoint, not
2345 /// fall through to `try_record_request` and send extra traffic
2346 /// to the (still potentially sick) endpoint alongside the real
2347 /// probe.
2348 ///
2349 /// Pre-fix (loose) semantics: losers of the probe-claim CAS
2350 /// proceeded via `try_record_request`, so under sufficient
2351 /// concurrency `successes` could be `> 1`. Post-fix (strict)
2352 /// semantics: losers `continue`, the retry's
2353 /// `get_available_endpoints` sees `half_open_probe == true`
2354 /// and filters the endpoint out, the loop exits with
2355 /// `NoEndpointsAvailable`. Net effect: at most one selector
2356 /// admits per recovery cycle.
2357 ///
2358 /// The test:
2359 /// 1. Trips a single endpoint's breaker.
2360 /// 2. Waits past the recovery window so the next selection
2361 /// enters half-open.
2362 /// 3. Spawns `N` threads, gates them on a Barrier so they
2363 /// enter `select()` as close to simultaneously as
2364 /// possible, each retains its `Selection` (no
2365 /// `record_completion`) so the probe slot stays claimed.
2366 /// 4. Asserts `successes == 1`. Pre-fix this could fire
2367 /// `> 1` non-deterministically; post-fix it must be
2368 /// exactly 1.
2369 #[test]
2370 fn select_strict_half_open_admits_exactly_one_probe_under_concurrent_selectors() {
2371 use std::sync::Barrier;
2372 use std::thread;
2373
2374 const N: usize = 32;
2375 let config = LoadBalancerConfig {
2376 strategy: Strategy::RoundRobin,
2377 circuit_recovery_time_ms: 50,
2378 ..Default::default()
2379 };
2380 let lb = Arc::new(LoadBalancer::new(config));
2381 lb.add_endpoint(Endpoint::new(make_node_id(1)));
2382 let ctx = RequestContext::new();
2383
2384 // Trip the breaker (5 consecutive failures).
2385 for _ in 0..5 {
2386 let sel = lb.select(&ctx).expect("admitted before trip");
2387 lb.record_completion(&sel.node_id, false);
2388 }
2389 assert!(lb.select(&ctx).is_err(), "open breaker must reject");
2390
2391 // Wait past the recovery window so the next selection
2392 // observes `half_open_probe == false` and is admitted.
2393 thread::sleep(Duration::from_millis(75));
2394
2395 // Race N threads through select(). DO NOT call
2396 // record_completion — that would clear the probe slot
2397 // and let the next thread succeed legitimately. The
2398 // strict contract is: exactly one admits while the probe
2399 // is in flight.
2400 let barrier = Arc::new(Barrier::new(N));
2401 let mut handles = Vec::with_capacity(N);
2402 for _ in 0..N {
2403 let lb = Arc::clone(&lb);
2404 let barrier = Arc::clone(&barrier);
2405 handles.push(thread::spawn(move || {
2406 let ctx = RequestContext::new();
2407 barrier.wait();
2408 lb.select(&ctx).is_ok()
2409 }));
2410 }
2411 let successes: usize = handles
2412 .into_iter()
2413 .map(|h| h.join().unwrap() as usize)
2414 .sum();
2415
2416 assert_eq!(
2417 successes, 1,
2418 "strict half-open: exactly one selector must admit while the \
2419 probe is in flight (got {successes} of {N}). Pre-fix the \
2420 loose semantic let losers of the probe-claim CAS proceed \
2421 through try_record_request, sending extra traffic to a \
2422 still-recovering endpoint."
2423 );
2424
2425 // Sanity: the probe slot is still claimed (no completion
2426 // was recorded), and the breaker is still nominally open.
2427 let ep = lb.endpoints.get(&make_node_id(1)).unwrap();
2428 assert!(
2429 ep.half_open_probe.load(Ordering::Acquire),
2430 "probe slot must remain claimed across the test (no completion was recorded)"
2431 );
2432 assert!(
2433 ep.circuit_open.load(Ordering::Acquire),
2434 "circuit must remain open until probe completion"
2435 );
2436 }
2437
2438 /// Companion to `select_strict_half_open_admits_exactly_one_probe...`:
2439 /// strict half-open semantics must NOT serialize independent
2440 /// endpoints. With two distinct circuit-open endpoints both
2441 /// past their recovery window, two concurrent selectors should
2442 /// EACH succeed — one probe per endpoint, since each endpoint's
2443 /// `half_open_probe` is its own slot. Pre-fix this also worked
2444 /// (loose semantic), but a naive "strict gate" implementation
2445 /// could accidentally over-tighten and lock out legitimate
2446 /// per-endpoint probes; this test pins that the gate stays
2447 /// scoped to the endpoint it guards.
2448 #[test]
2449 fn select_strict_half_open_allows_concurrent_probes_on_distinct_endpoints() {
2450 use std::sync::Barrier;
2451 use std::thread;
2452
2453 let config = LoadBalancerConfig {
2454 strategy: Strategy::RoundRobin,
2455 circuit_recovery_time_ms: 50,
2456 ..Default::default()
2457 };
2458 let lb = Arc::new(LoadBalancer::new(config));
2459 // Two endpoints — RR alternates between them.
2460 for i in 1..=2 {
2461 lb.add_endpoint(Endpoint::new(make_node_id(i)));
2462 }
2463 let ctx = RequestContext::new();
2464
2465 // Trip both breakers. Default threshold is 5 consecutive
2466 // failures per endpoint.
2467 for _ in 0..5 {
2468 for i in 1..=2 {
2469 let nid = make_node_id(i);
2470 if let Some(ep) = lb.endpoints.get(&nid) {
2471 ep.try_record_request(u32::MAX);
2472 }
2473 lb.record_completion(&nid, false);
2474 }
2475 }
2476 assert!(lb.select(&ctx).is_err(), "both breakers open pre-recovery");
2477
2478 // Wait past the recovery window so both endpoints admit a
2479 // probe.
2480 thread::sleep(Duration::from_millis(75));
2481
2482 // Race two threads. With RR + 2 endpoints, each thread
2483 // should pick a different endpoint, claim its own probe,
2484 // and succeed. Pre-fix or post-fix, both succeed — but a
2485 // mis-scoped "strict gate" (e.g., a global probe flag
2486 // instead of per-endpoint) would let only one through.
2487 let barrier = Arc::new(Barrier::new(2));
2488 let mut handles = Vec::with_capacity(2);
2489 for _ in 0..2 {
2490 let lb = Arc::clone(&lb);
2491 let barrier = Arc::clone(&barrier);
2492 handles.push(thread::spawn(move || {
2493 let ctx = RequestContext::new();
2494 barrier.wait();
2495 lb.select(&ctx).ok().map(|s| s.node_id)
2496 }));
2497 }
2498 let picks: Vec<NodeId> = handles
2499 .into_iter()
2500 .filter_map(|h| h.join().unwrap())
2501 .collect();
2502
2503 assert_eq!(
2504 picks.len(),
2505 2,
2506 "both selectors must succeed against distinct endpoints \
2507 (probes are per-endpoint, not global). Got picks: {:?}",
2508 picks
2509 );
2510 assert_ne!(
2511 picks[0], picks[1],
2512 "the two probes must land on different endpoints — \
2513 same-endpoint admission would mean strict gate failed to \
2514 keep one selector out, OR RR selection raced strangely. \
2515 Picks: {:?}",
2516 picks
2517 );
2518
2519 // Both endpoints should now have their probe slots claimed.
2520 for i in 1..=2 {
2521 let ep = lb.endpoints.get(&make_node_id(i)).unwrap();
2522 assert!(
2523 ep.half_open_probe.load(Ordering::Acquire),
2524 "endpoint {} probe slot must be claimed (one probe per endpoint)",
2525 i
2526 );
2527 }
2528 }
2529
2530 /// CR-19: the `ProbeGuard` Drop must release the
2531 /// half-open probe slot when the guard is dropped without
2532 /// committing. We construct an `EndpointState`, manually
2533 /// claim the probe via the guard API, drop the guard, and
2534 /// verify the slot returned to false.
2535 #[test]
2536 fn cr19_probe_guard_drop_releases_probe_slot() {
2537 let ep = EndpointState::new(Endpoint::new(make_node_id(0xCA)));
2538 // Pre: slot is open.
2539 assert!(!ep.half_open_probe.load(Ordering::Acquire));
2540
2541 let guard = ep
2542 .try_claim_half_open_probe()
2543 .expect("first claim must succeed");
2544 // Probe slot is now claimed.
2545 assert!(ep.half_open_probe.load(Ordering::Acquire));
2546
2547 // Drop without commit: slot must roll back.
2548 drop(guard);
2549 assert!(
2550 !ep.half_open_probe.load(Ordering::Acquire),
2551 "CR-19 regression: ProbeGuard Drop must release the probe slot"
2552 );
2553
2554 // Subsequent claim succeeds — slot is reusable.
2555 let _g = ep
2556 .try_claim_half_open_probe()
2557 .expect("post-Drop reclaim must succeed");
2558 }
2559
2560 /// CR-19: `commit()` must SUPPRESS the Drop release. The
2561 /// committed claim survives the guard going out of scope —
2562 /// `record_completion` is then the path that clears it.
2563 #[test]
2564 fn cr19_probe_guard_commit_suppresses_release() {
2565 let ep = EndpointState::new(Endpoint::new(make_node_id(0xBE)));
2566 let guard = ep
2567 .try_claim_half_open_probe()
2568 .expect("first claim must succeed");
2569 guard.commit();
2570 // Slot remains claimed because commit() ran mem::forget.
2571 assert!(
2572 ep.half_open_probe.load(Ordering::Acquire),
2573 "CR-19 regression: ProbeGuard::commit must SUPPRESS Drop release"
2574 );
2575 // A second claim must fail because the slot is taken.
2576 assert!(
2577 ep.try_claim_half_open_probe().is_none(),
2578 "second claim must fail while the first is committed"
2579 );
2580 }
2581
2582 /// CR-19: panic between claim and commit MUST release the
2583 /// slot via Drop. We use `catch_unwind` to confirm the slot
2584 /// rolls back even when the path between claim and the
2585 /// would-be commit unwinds.
2586 #[test]
2587 fn cr19_panic_between_claim_and_commit_releases_probe_slot() {
2588 use std::panic::{catch_unwind, AssertUnwindSafe};
2589
2590 let ep = EndpointState::new(Endpoint::new(make_node_id(0xF0)));
2591 let result = catch_unwind(AssertUnwindSafe(|| {
2592 let _guard = ep
2593 .try_claim_half_open_probe()
2594 .expect("first claim must succeed");
2595 // Simulate a panic on the path between claim and
2596 // commit — exactly what a future-cancel or in-flight
2597 // panic looks like.
2598 panic!("simulated mid-path failure");
2599 }));
2600
2601 assert!(result.is_err(), "the closure must have panicked");
2602 assert!(
2603 !ep.half_open_probe.load(Ordering::Acquire),
2604 "CR-19 regression: panic between claim and commit MUST roll \
2605 back the probe slot via ProbeGuard::drop"
2606 );
2607 }
2608
2609 /// Pin: `select_weighted_round_robin_at` must use the
2610 /// NaN-safe guard `!(total_weight > 0.0)` rather than
2611 /// `total_weight <= 0.0`. NaN compares unequal to everything
2612 /// (including itself), so `NaN <= 0.0` is `false` — the
2613 /// gate falls through to the weighted path where
2614 /// `total_weight.ceil() as u64` produces an undefined
2615 /// (saturating) cast and the cumulative loop never exceeds
2616 /// NaN, biasing every selection to `endpoints[0]`. The
2617 /// negated-greater check catches NaN as well as zero/negative.
2618 ///
2619 /// This is a tripwire: a "simplification" PR that flips the
2620 /// guard back to `<= 0.0` would silently re-introduce the
2621 /// bias whenever any future code path produces a NaN
2622 /// effective weight (e.g. an f64 `weight` field). The pin
2623 /// is scoped to the round-robin function body — the random
2624 /// path elsewhere in this file is governed by its own
2625 /// guard and is not part of this regression.
2626 #[test]
2627 fn weighted_round_robin_guard_must_be_nan_safe() {
2628 let src = include_str!("loadbalance.rs");
2629
2630 // Locate the function header and the next `fn ` after
2631 // it; everything between is the body we pin.
2632 let header = "fn select_weighted_round_robin_at(";
2633 let start = src
2634 .find(header)
2635 .expect("select_weighted_round_robin_at must exist");
2636 // `find` from the next character so we don't match the
2637 // header itself.
2638 let body_start = start + header.len();
2639 let next_fn = src[body_start..]
2640 .find("\n fn ")
2641 .expect("a following fn must exist (mod-private impl block)")
2642 + body_start;
2643 let body = &src[start..next_fn];
2644
2645 // Strip line comments (everything from `//` to EOL) so a
2646 // doc comment that *describes* the rejected pattern
2647 // doesn't trip the negative assertion below.
2648 let body_no_comments: String = body
2649 .lines()
2650 .map(|l| match l.find("//") {
2651 Some(idx) => &l[..idx],
2652 None => l,
2653 })
2654 .collect::<Vec<_>>()
2655 .join("\n");
2656
2657 assert!(
2658 body_no_comments.contains("!(total_weight > 0.0)"),
2659 "regression: select_weighted_round_robin_at must use the \
2660 NaN-safe guard `!(total_weight > 0.0)`. Without it a NaN \
2661 total_weight (introduced by a future f64 weight path) \
2662 falls through to the weighted code, biasing every \
2663 selection onto endpoints[0]."
2664 );
2665
2666 // Also assert the buggy form is gone from THIS function
2667 // body. The NaN-safe form does not contain `<= 0.0`, so
2668 // this should fail iff someone reverts the guard.
2669 assert!(
2670 !body_no_comments.contains("total_weight <= 0.0"),
2671 "regression: select_weighted_round_robin_at must not \
2672 use the NaN-unsafe guard `total_weight <= 0.0`."
2673 );
2674 }
2675
2676 /// CR-21: pin that this module's `random_usize`
2677 /// uses the abort-on-fail pattern, NOT `expect()` or
2678 /// `.unwrap()`. A getrandom panic here would unwind across
2679 /// any `extern "C"` FFI frame that called into the load-
2680 /// balance layer — undefined behaviour.
2681 #[test]
2682 fn cr21_random_usize_must_not_panic_on_getrandom_failure() {
2683 let needle_expect = format!("getrandom::fill({}{})", "&mut bytes).", "expect");
2684 let needle_unwrap = format!("getrandom::fill({}{})", "&mut bytes).", "unwrap");
2685
2686 let src = include_str!("loadbalance.rs");
2687 for (lineno, line) in src.lines().enumerate() {
2688 let trimmed = line.trim_start();
2689 if trimmed.starts_with("//") {
2690 continue;
2691 }
2692 assert!(
2693 !trimmed.contains(&needle_expect),
2694 "CR-21 regression: getrandom::fill(...).expect(...) reintroduced \
2695 at loadbalance.rs:{}.\n line: {}",
2696 lineno + 1,
2697 line
2698 );
2699 assert!(
2700 !trimmed.contains(&needle_unwrap),
2701 "CR-21 regression: getrandom::fill(...).unwrap() reintroduced \
2702 at loadbalance.rs:{}.\n line: {}",
2703 lineno + 1,
2704 line
2705 );
2706 }
2707 }
2708
2709 // ---------- Untested strategy coverage ----------
2710 //
2711 // Existing tests cover RoundRobin, WeightedRoundRobin,
2712 // LeastConnections, and WeightedLeastConnections. The
2713 // remaining strategies — Random, WeightedRandom,
2714 // ConsistentHash, PowerOfTwo, Adaptive — share the
2715 // hot path and a regression in any of them would silently
2716 // mis-route requests. Each test below pins the selection
2717 // reason so a future refactor that swaps strategies under
2718 // the same enum variant gets caught.
2719
2720 fn three_endpoint_lb(strategy: Strategy) -> LoadBalancer {
2721 let lb = LoadBalancer::with_strategy(strategy);
2722 lb.add_endpoint(Endpoint::new(make_node_id(1)));
2723 lb.add_endpoint(Endpoint::new(make_node_id(2)));
2724 lb.add_endpoint(Endpoint::new(make_node_id(3)));
2725 lb
2726 }
2727
2728 #[test]
2729 fn random_strategy_selects_among_all_endpoints_with_random_reason() {
2730 let lb = three_endpoint_lb(Strategy::Random);
2731 let ctx = RequestContext::new();
2732 let mut seen = std::collections::HashSet::new();
2733 for _ in 0..200 {
2734 let s = lb.select(&ctx).unwrap();
2735 assert_eq!(s.reason, SelectionReason::Random);
2736 seen.insert(s.node_id[0]);
2737 }
2738 // 200 draws across 3 endpoints. We only assert "more than
2739 // one distinct endpoint was selected" — strong enough to
2740 // catch a regression that hard-codes a single index, weak
2741 // enough to never flake on legitimate RNG outcomes. A
2742 // stricter `== 3` check would be ~1e-35 likely to fail
2743 // under a uniform RNG; this version is exactly zero.
2744 assert!(
2745 seen.len() >= 2,
2746 "Random strategy collapsed to a single endpoint over 200 draws: {seen:?}",
2747 );
2748 }
2749
2750 #[test]
2751 fn weighted_random_respects_relative_weights() {
2752 let lb = LoadBalancer::with_strategy(Strategy::WeightedRandom);
2753 lb.add_endpoint(Endpoint::new(make_node_id(1)).with_weight(10));
2754 lb.add_endpoint(Endpoint::new(make_node_id(2)).with_weight(100));
2755 let ctx = RequestContext::new();
2756 let mut counts = std::collections::HashMap::new();
2757 for _ in 0..400 {
2758 let s = lb.select(&ctx).unwrap();
2759 assert_eq!(s.reason, SelectionReason::Weighted);
2760 *counts.entry(s.node_id[0]).or_insert(0) += 1;
2761 }
2762 // Heavy weight (100) must dominate the light weight (10).
2763 // Allow a wide margin — we're not testing the RNG quality,
2764 // just that the weight is read.
2765 let light = *counts.get(&1).unwrap_or(&0);
2766 let heavy = *counts.get(&2).unwrap_or(&0);
2767 assert!(
2768 heavy > light * 3,
2769 "weighted-random ignored weights: light={light}, heavy={heavy}",
2770 );
2771 }
2772
2773 #[test]
2774 fn weighted_random_with_zero_total_weight_falls_back_to_uniform_random() {
2775 let lb = LoadBalancer::with_strategy(Strategy::WeightedRandom);
2776 // Both endpoints with weight 0 — `total_weight <= 0.0`
2777 // forces the fallback path inside select_weighted_random.
2778 lb.add_endpoint(Endpoint::new(make_node_id(1)).with_weight(0));
2779 lb.add_endpoint(Endpoint::new(make_node_id(2)).with_weight(0));
2780 let ctx = RequestContext::new();
2781
2782 // Must not panic; must return a real selection. The reason
2783 // is `Random` because the implementation delegates to
2784 // `select_random` when total_weight is non-positive.
2785 let s = lb.select(&ctx).unwrap();
2786 assert_eq!(s.reason, SelectionReason::Random);
2787 }
2788
2789 #[test]
2790 fn consistent_hash_returns_same_endpoint_for_same_routing_key() {
2791 let lb = three_endpoint_lb(Strategy::ConsistentHash);
2792 let ctx = RequestContext::new().with_routing_key("user-42");
2793
2794 let s1 = lb.select(&ctx).unwrap();
2795 for _ in 0..50 {
2796 let s = lb.select(&ctx).unwrap();
2797 assert_eq!(s.node_id, s1.node_id, "consistent-hash diverged");
2798 }
2799 }
2800
2801 #[test]
2802 fn power_of_two_returns_powerof_two_reason() {
2803 let lb = three_endpoint_lb(Strategy::PowerOfTwo);
2804 let ctx = RequestContext::new();
2805 let s = lb.select(&ctx).unwrap();
2806 assert_eq!(s.reason, SelectionReason::PowerOfTwo);
2807 }
2808
2809 #[test]
2810 fn adaptive_strategy_selects_an_endpoint() {
2811 // Adaptive picks between strategies based on average load;
2812 // with default (no metrics) all endpoints score 0 so it
2813 // takes the low-load branch. Pin that the strategy runs
2814 // and returns a valid selection.
2815 let lb = three_endpoint_lb(Strategy::Adaptive);
2816 let ctx = RequestContext::new();
2817 let s = lb.select(&ctx).unwrap();
2818 assert!(matches!(s.node_id[0], 1..=3));
2819 }
2820
2821 // ---------- endpoints() snapshot ----------
2822
2823 #[test]
2824 fn endpoints_snapshot_reflects_added_endpoints() {
2825 let lb = LoadBalancer::with_strategy(Strategy::RoundRobin);
2826 lb.add_endpoint(Endpoint::new(make_node_id(1)).with_weight(50));
2827 lb.add_endpoint(Endpoint::new(make_node_id(2)).with_weight(75));
2828
2829 let snapshot = lb.endpoints();
2830 assert_eq!(snapshot.len(), 2);
2831 // Order isn't guaranteed (DashMap iteration) — assert
2832 // by node_id membership rather than position.
2833 let weights: std::collections::HashMap<u8, u32> =
2834 snapshot.iter().map(|e| (e.node_id[0], e.weight)).collect();
2835 assert_eq!(weights.get(&1), Some(&50));
2836 assert_eq!(weights.get(&2), Some(&75));
2837 }
2838
2839 /// The endpoint snapshot (iterated by select/stats/count) must be rebuilt
2840 /// on remove: counts drop and the removed endpoint is never selected.
2841 #[test]
2842 fn removing_an_endpoint_updates_snapshot_and_stops_selection() {
2843 let lb = LoadBalancer::with_strategy(Strategy::RoundRobin);
2844 for i in 1..=3u8 {
2845 lb.add_endpoint(Endpoint::new(make_node_id(i)));
2846 }
2847 assert_eq!(lb.endpoint_count(), 3);
2848 assert_eq!(lb.stats().active_endpoints, 3);
2849
2850 lb.remove_endpoint(&make_node_id(2));
2851 assert_eq!(lb.endpoint_count(), 2, "count must drop after remove");
2852 assert_eq!(lb.stats().active_endpoints, 2);
2853 assert!(
2854 lb.endpoints().iter().all(|e| e.node_id != make_node_id(2)),
2855 "removed endpoint must be gone from the snapshot"
2856 );
2857
2858 // The removed endpoint must never be selected.
2859 let ctx = RequestContext::new();
2860 for _ in 0..50 {
2861 let sel = lb.select(&ctx).unwrap();
2862 assert_ne!(
2863 sel.node_id,
2864 make_node_id(2),
2865 "removed endpoint must not be selected"
2866 );
2867 lb.record_completion(&sel.node_id, true);
2868 }
2869 }
2870
2871 /// Concurrent membership changes must leave `endpoint_list` (read by
2872 /// select/stats/count) exactly consistent with the authoritative
2873 /// `endpoints` map. Pre-fix, a rebuild that observed the map before a
2874 /// concurrent mutation could store its stale snapshot last, dropping a
2875 /// just-added endpoint from rotation (or resurrecting a removed one).
2876 #[test]
2877 fn concurrent_membership_changes_keep_snapshot_consistent() {
2878 use std::collections::HashSet;
2879 use std::sync::Arc as StdArc;
2880
2881 let lb = StdArc::new(LoadBalancer::with_strategy(Strategy::RoundRobin));
2882 let n: u8 = 64;
2883
2884 // Concurrent adds.
2885 let mut handles = Vec::new();
2886 for i in 1..=n {
2887 let lb = StdArc::clone(&lb);
2888 handles.push(std::thread::spawn(move || {
2889 lb.add_endpoint(Endpoint::new(make_node_id(i)));
2890 }));
2891 }
2892 for h in handles {
2893 h.join().unwrap();
2894 }
2895
2896 // The snapshot must match the authoritative map exactly — not a
2897 // single add may be lost to a stale rebuild.
2898 assert_eq!(lb.endpoint_count(), n as usize);
2899 assert_eq!(lb.endpoint_count(), lb.endpoints.len());
2900 let snap: HashSet<_> = lb.endpoints().iter().map(|e| e.node_id).collect();
2901 assert_eq!(
2902 snap.len(),
2903 n as usize,
2904 "every added endpoint must appear in the snapshot"
2905 );
2906
2907 // Concurrent removes (the even ids) must stay consistent too.
2908 let mut handles = Vec::new();
2909 for i in (2..=n).step_by(2) {
2910 let lb = StdArc::clone(&lb);
2911 handles.push(std::thread::spawn(move || {
2912 lb.remove_endpoint(&make_node_id(i));
2913 }));
2914 }
2915 for h in handles {
2916 h.join().unwrap();
2917 }
2918 assert_eq!(lb.endpoint_count(), lb.endpoints.len());
2919 assert_eq!(lb.endpoint_count(), (n / 2) as usize);
2920 assert!(
2921 lb.endpoints().iter().all(|e| {
2922 let raw = e.node_id;
2923 // Odd ids only should remain.
2924 lb.endpoints.contains_key(&raw)
2925 }),
2926 "snapshot must contain only live endpoints"
2927 );
2928 }
2929
2930 /// A snapshot taken before a removal still holds the endpoint's `Arc`
2931 /// (exactly what an in-flight `select()` iterates). After removal that
2932 /// endpoint must report unavailable through the *stale* snapshot — so the
2933 /// strategy filters it out instead of selecting a gone endpoint and
2934 /// burning a reservation retry into a false `NoEndpointsAvailable`.
2935 #[test]
2936 fn removed_endpoint_is_unavailable_through_a_stale_snapshot() {
2937 let lb = LoadBalancer::with_strategy(Strategy::RoundRobin);
2938 lb.add_endpoint(Endpoint::new(make_node_id(1)));
2939 lb.add_endpoint(Endpoint::new(make_node_id(2)));
2940
2941 // Capture the snapshot BEFORE removal — holds Arcs to both endpoints.
2942 let stale = lb.endpoint_list.load_full();
2943 assert_eq!(stale.len(), 2);
2944
2945 lb.remove_endpoint(&make_node_id(1));
2946
2947 // Through the stale snapshot, the removed endpoint is now unavailable;
2948 // the survivor stays available.
2949 for state in stale.iter() {
2950 if state.node_id == make_node_id(1) {
2951 assert!(
2952 !state.is_available(),
2953 "removed endpoint must be unavailable via the stale snapshot"
2954 );
2955 } else {
2956 assert!(
2957 state.is_available(),
2958 "surviving endpoint must remain available"
2959 );
2960 }
2961 }
2962 }
2963
2964 // ---------- LoadBalancerError Display ----------
2965
2966 #[test]
2967 fn load_balancer_error_display_covers_every_variant() {
2968 let id = make_node_id(7);
2969 assert_eq!(
2970 format!("{}", LoadBalancerError::NoEndpointsAvailable),
2971 "no endpoints available"
2972 );
2973 assert_eq!(
2974 format!("{}", LoadBalancerError::AllEndpointsUnhealthy),
2975 "all endpoints unhealthy"
2976 );
2977 assert_eq!(
2978 format!("{}", LoadBalancerError::NoMatchingEndpoints),
2979 "no endpoints match required tags"
2980 );
2981 assert!(format!("{}", LoadBalancerError::EndpointNotFound(id))
2982 .starts_with("endpoint not found:"));
2983 assert!(format!("{}", LoadBalancerError::CircuitOpen(id))
2984 .starts_with("circuit breaker open for:"));
2985 assert!(format!("{}", LoadBalancerError::MaxConnectionsReached(id))
2986 .starts_with("max connections reached for:"));
2987 }
2988
2989 /// Finding #14: a half-open probe claimed at selection time but
2990 /// never completed (the production `GroupCoordinator::route_event`
2991 /// path calls `select` and never `record_completion`) must NOT pin
2992 /// the recovered endpoint out of rotation forever. The watchdog
2993 /// reclaims the abandoned slot once it has been held past the
2994 /// recovery window, so a later selection is admitted as a fresh
2995 /// probe.
2996 #[test]
2997 fn cr14_abandoned_half_open_probe_is_reclaimed_after_recovery_window() {
2998 let config = LoadBalancerConfig {
2999 strategy: Strategy::RoundRobin,
3000 circuit_recovery_time_ms: 50,
3001 ..Default::default()
3002 };
3003 let lb = LoadBalancer::new(config);
3004 lb.add_endpoint(Endpoint::new(make_node_id(1)));
3005 let ctx = RequestContext::new();
3006
3007 // Trip the breaker (5 consecutive failures).
3008 for _ in 0..5 {
3009 let sel = lb.select(&ctx).expect("admitted before trip");
3010 lb.record_completion(&sel.node_id, false);
3011 }
3012 assert!(lb.select(&ctx).is_err(), "open breaker must reject");
3013
3014 // Wait past the recovery window so the next selection admits
3015 // the probe.
3016 std::thread::sleep(Duration::from_millis(75));
3017
3018 // Claim the probe via select() but DO NOT record completion —
3019 // this models a caller that drops the selection (route_event).
3020 let probe = lb
3021 .select(&ctx)
3022 .expect("first request after recovery is the probe");
3023 let ep = lb.endpoints.get(&probe.node_id).unwrap();
3024 assert!(
3025 ep.half_open_probe.load(Ordering::Acquire),
3026 "probe slot is claimed right after selection"
3027 );
3028 drop(ep);
3029
3030 // Immediately, the slot is fresh — another selector is still
3031 // (correctly) rejected.
3032 assert!(
3033 lb.select(&ctx).is_err(),
3034 "a freshly-claimed probe still gates concurrent selectors"
3035 );
3036
3037 // Let the abandoned probe age past a full recovery window.
3038 std::thread::sleep(Duration::from_millis(75));
3039
3040 // The watchdog must reclaim the stranded slot: a later
3041 // selection is admitted again rather than rejected forever.
3042 let reclaimed = lb.select(&ctx);
3043 assert!(
3044 reclaimed.is_ok(),
3045 "an abandoned half-open probe (no record_completion) must be \
3046 reclaimed after the recovery window so the recovered endpoint \
3047 returns to rotation — pre-fix the bare bool pinned it out forever"
3048 );
3049 }
3050
3051 /// Follow-up to cr14: a `circuit_recovery_time_ms == 0` config must
3052 /// not collapse the breaker. Pre-fix, a zero recovery window made
3053 /// `open_time.elapsed() < ZERO` always false (the breaker never held
3054 /// open) and `claimed_at.elapsed() >= ZERO` always true (every probe
3055 /// judged abandoned), so a tripped breaker admitted instantly and the
3056 /// single-probe half-open gate vanished. The `>= 1ms` clamp keeps the
3057 /// breaker shut inside its (tiny) recovery window.
3058 #[test]
3059 fn cr14b_zero_recovery_time_does_not_collapse_breaker() {
3060 let config = LoadBalancerConfig {
3061 strategy: Strategy::RoundRobin,
3062 circuit_recovery_time_ms: 0,
3063 ..Default::default()
3064 };
3065 let lb = LoadBalancer::new(config);
3066 lb.add_endpoint(Endpoint::new(make_node_id(1)));
3067 let ctx = RequestContext::new();
3068
3069 // Trip the breaker (5 consecutive failures).
3070 for _ in 0..5 {
3071 let sel = lb.select(&ctx).expect("admitted before trip");
3072 lb.record_completion(&sel.node_id, false);
3073 }
3074 // Immediately after tripping — microseconds later, inside the
3075 // clamped 1ms recovery window — the breaker must still reject.
3076 // Pre-fix the 0 window admitted instantly, collapsing the gate
3077 // and letting unbounded concurrent probes hit a failing endpoint.
3078 assert!(
3079 lb.select(&ctx).is_err(),
3080 "a 0 recovery-time config must not collapse the open breaker \
3081 into instant admission"
3082 );
3083 }
3084
3085 /// Finding #15: re-adding an already-present endpoint must NOT
3086 /// leak its previous hash-ring vnodes. Pre-fix `add_endpoint`
3087 /// inserted a fresh ~`virtual_nodes` set without removing the
3088 /// node's existing vnodes, leaking ~150 ring entries per re-add.
3089 #[test]
3090 fn cr15_readd_endpoint_does_not_leak_hash_ring_vnodes() {
3091 let lb = LoadBalancer::with_strategy(Strategy::ConsistentHash);
3092 let node = make_node_id(1);
3093
3094 lb.add_endpoint(Endpoint::new(node));
3095 let after_first = lb.hash_ring.len();
3096 assert_eq!(
3097 after_first, lb.virtual_nodes as usize,
3098 "a fresh add must create exactly virtual_nodes vnodes"
3099 );
3100
3101 // Re-add the same node several times (reconnect / weight change).
3102 for _ in 0..5 {
3103 lb.add_endpoint(Endpoint::new(node).with_weight(200));
3104 }
3105
3106 assert_eq!(
3107 lb.hash_ring.len(),
3108 lb.virtual_nodes as usize,
3109 "re-adding the same node must not leak stale vnodes — the ring \
3110 size must stay at virtual_nodes, not grow by ~150 per re-add"
3111 );
3112
3113 // Every ring entry must still resolve to this node.
3114 assert!(
3115 lb.hash_ring.iter().all(|e| *e.value() == node),
3116 "all vnodes must belong to the re-added node"
3117 );
3118 }
3119
3120 /// Finding #29: the hash-ring collision probe must be
3121 /// NON-DESTRUCTIVE. Pre-fix `while insert(..).is_some()`
3122 /// overwrote an occupied slot (clobbering another node's vnode)
3123 /// before probing onward. We force a guaranteed collision by
3124 /// pre-occupying every slot `add_to_hash_ring` would target for a
3125 /// node and assert the existing occupants survive.
3126 #[test]
3127 fn cr29_hash_ring_collision_probe_preserves_existing_occupant() {
3128 let lb = LoadBalancer::with_strategy(Strategy::ConsistentHash);
3129 let victim = make_node_id(0xAA);
3130 let newcomer = make_node_id(0xBB);
3131
3132 // Pre-occupy, with `victim`, every slot that add_to_hash_ring
3133 // will hash to for `newcomer` (so EVERY vnode insert collides).
3134 for i in 0..lb.virtual_nodes {
3135 let key = format!("{:?}-{}", newcomer, i);
3136 let hash = lb.hash_key(&key);
3137 lb.hash_ring.insert(hash, victim);
3138 }
3139 let victim_slots_before = lb.hash_ring.len();
3140 assert_eq!(victim_slots_before, lb.virtual_nodes as usize);
3141
3142 // Now add the newcomer — every primary slot collides with a
3143 // victim vnode and must be linear-probed past, NOT clobbered.
3144 // Fresh add (newcomer was never present) => was_present = false.
3145 lb.add_to_hash_ring(newcomer, false);
3146
3147 // None of the victim's vnodes may have been overwritten.
3148 let victim_count = lb.hash_ring.iter().filter(|e| *e.value() == victim).count();
3149 assert_eq!(
3150 victim_count, lb.virtual_nodes as usize,
3151 "the collision probe must preserve the existing occupant's vnodes \
3152 (pre-fix destructive insert clobbered them)"
3153 );
3154 // The newcomer still gets its full vnode allotment.
3155 let newcomer_count = lb
3156 .hash_ring
3157 .iter()
3158 .filter(|e| *e.value() == newcomer)
3159 .count();
3160 assert_eq!(
3161 newcomer_count, lb.virtual_nodes as usize,
3162 "the newcomer must still get its full vnode allotment via probing"
3163 );
3164 }
3165
3166 /// Finding #33: weighted-round-robin must not starve endpoints
3167 /// when every effective weight is sub-unit. Two `Degraded`
3168 /// endpoints (weight 1 × 0.5 multiplier = 0.5 effective) sum to
3169 /// `total_weight == 1.0`; pre-fix `total_ceil = ceil(1.0) = 1`
3170 /// made `target == 0` always and the cumulative loop always
3171 /// picked the first endpoint, starving the second.
3172 #[test]
3173 fn cr33_weighted_rr_does_not_starve_sub_unit_weights() {
3174 let lb = LoadBalancer::with_strategy(Strategy::WeightedRoundRobin);
3175 lb.add_endpoint(Endpoint::new(make_node_id(1)).with_weight(1));
3176 lb.add_endpoint(Endpoint::new(make_node_id(2)).with_weight(1));
3177
3178 // Degrade both so effective weight is 0.5 each (total 1.0).
3179 lb.update_health(&make_node_id(1), HealthStatus::Degraded);
3180 lb.update_health(&make_node_id(2), HealthStatus::Degraded);
3181
3182 let ctx = RequestContext::new();
3183 let mut counts = std::collections::HashMap::new();
3184 for _ in 0..200 {
3185 let sel = lb.select(&ctx).expect("an endpoint must be selectable");
3186 lb.record_completion(&sel.node_id, true);
3187 *counts.entry(sel.node_id[0]).or_insert(0_u32) += 1;
3188 }
3189
3190 let first = *counts.get(&1).unwrap_or(&0);
3191 let second = *counts.get(&2).unwrap_or(&0);
3192 assert!(
3193 second > 0,
3194 "second sub-unit-weight endpoint must NOT be starved (got {first} \
3195 vs {second}) — pre-fix ceil collapsed the rotation to bucket 0",
3196 );
3197 // Equal effective weights → roughly even split.
3198 assert!(
3199 first > 0 && second > 0 && first.abs_diff(second) < 200 / 2,
3200 "two equal sub-unit weights should split roughly evenly \
3201 (got {first} vs {second})",
3202 );
3203 }
3204
3205 /// Review follow-up to #33: deriving the wheel from the weights
3206 /// (`ceil(total / min_weight)`) keeps EXACT integer ratios for small
3207 /// clusters — the wheel is the natural 3-cycle for a 2:1 split,
3208 /// yielding A,A,B. The replaced `WRR_MIN_WHEEL = 64` floor turned
3209 /// this into a 64-position approximation, reshaping the rotation for
3210 /// any cluster whose total weight was below the floor.
3211 #[test]
3212 fn cr33_weighted_rr_preserves_exact_integer_ratios() {
3213 let lb = LoadBalancer::with_strategy(Strategy::WeightedRoundRobin);
3214 let endpoints = vec![
3215 Arc::new(EndpointState::new(
3216 Endpoint::new(make_node_id(1)).with_weight(2),
3217 )),
3218 Arc::new(EndpointState::new(
3219 Endpoint::new(make_node_id(2)).with_weight(1),
3220 )),
3221 ];
3222 // Two full turns of the natural 3-position wheel.
3223 let picks: Vec<u8> = (0..6)
3224 .map(|c| lb.select_weighted_round_robin_at(&endpoints, c).node_id[0])
3225 .collect();
3226 assert_eq!(
3227 picks,
3228 vec![1, 1, 2, 1, 1, 2],
3229 "2:1 integer weights must cycle A,A,B exactly (got {picks:?})"
3230 );
3231 }
3232}