Skip to main content

sozu_lib/
backends.rs

1use std::{
2    cell::{Cell, RefCell},
3    collections::HashMap,
4    net::SocketAddr,
5    rc::Rc,
6    time::Duration,
7};
8
9use mio::net::TcpStream;
10use sozu_command::{
11    proto::command::{
12        Event, EventKind, HealthCheckConfig, LoadBalancingAlgorithms, LoadBalancingParams,
13        LoadMetric,
14    },
15    state::ClusterId,
16};
17
18use crate::metrics::names;
19use crate::{
20    PeakEWMA,
21    load_balancing::{
22        LeastLoaded, LoadBalancingAlgorithm, Maglev, PowerOfTwo, Random, Rendezvous, RoundRobin,
23    },
24    retry::{self, RetryPolicy},
25    server::{self, push_event},
26};
27
/// Errors returned while selecting a backend for a cluster or opening a
/// connection to it.
#[derive(thiserror::Error, Debug)]
pub enum BackendError {
    /// The cluster is unknown, has no registered backends, or backend
    /// selection yielded no backend. Carries the cluster id.
    #[error("No backend found for cluster {0}")]
    NoBackendForCluster(String),
    /// `mio::net::TcpStream::connect` returned an I/O error.
    #[error("Failed to connect to socket with MIO: {0}")]
    MioConnection(std::io::Error),
    /// A connection was requested on a backend whose status is not `Normal`.
    #[error("This backend is not in a normal status: status={0:?}")]
    Status(BackendStatus),
    /// A connect attempt to the selected backend failed. It carries the
    /// cluster, the backend address, the backend's failure counter after this
    /// attempt, and the stringified underlying error.
    #[error("could not connect {cluster_id} to {backend_address:?} ({failures} failures): {error}")]
    ConnectionFailures {
        cluster_id: String,
        backend_address: SocketAddr,
        failures: usize,
        error: String,
    },
}
44
/// Lifecycle of a backend with respect to accepting new connections.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum BackendStatus {
    /// Accepts new connections (`inc_connections`, `try_connect`).
    Normal,
    /// Set by `set_closing`. The backend refuses new connections and moves to
    /// `Closed` once `dec_connections` drains its count to zero.
    Closing,
    /// Fully drained. `dec_connections` no longer touches the count.
    Closed,
}
51
/// Verdict of active health checking for one backend.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum HealthStatus {
    /// Eligible for traffic as far as health checks are concerned.
    Healthy,
    /// Failed enough consecutive checks: `Backend::can_open` refuses it and
    /// `Backend::is_available` reports it as unavailable.
    Unhealthy,
}
57
/// Per-cluster availability state, owned by `BackendList`.
///
/// It is re-evaluated every time `BackendMap::record_cluster_availability`
/// is invoked, but only flips when the freshly computed state differs from
/// the latched one:
/// - `Available`: at least one backend can serve traffic.
/// - `AllDown`: every backend fails the availability predicate.
///
/// Each flip is logged, counted and emitted as an `Event`.
///
/// NOTE(review): `Backend::is_available` checks `health.is_healthy()`,
/// `status == BackendStatus::Normal` and `!retry_policy.is_down()`. Confirm
/// that `BackendList::evaluate_availability` (not visible here) applies the
/// same predicate.
///
/// Empty clusters never report `AllDown` to avoid log spam during cluster
/// bootstrap.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub(crate) enum ClusterAvailability {
    /// At least one backend is available, or the cluster is empty.
    /// This is the initial state.
    #[default]
    Available,
    /// The cluster has backends and none of them is available.
    AllDown,
}
70
/// Active health-check state of one backend: the current verdict plus the
/// streak counters compared against the configured thresholds.
#[derive(Debug, Clone, PartialEq)]
pub struct HealthState {
    /// Current verdict. It starts `Healthy` (see the `Default` impl).
    pub status: HealthStatus,
    /// Consecutive successful checks; any failure resets it to 0.
    pub consecutive_successes: u32,
    /// Consecutive failed checks; any success resets it to 0.
    pub consecutive_failures: u32,
}
77
78impl Default for HealthState {
79    fn default() -> Self {
80        HealthState {
81            status: HealthStatus::Healthy,
82            consecutive_successes: 0,
83            consecutive_failures: 0,
84        }
85    }
86}
87
88impl HealthState {
89    /// Record a successful health check. Returns true if the backend transitioned to healthy.
90    pub fn record_success(&mut self, healthy_threshold: u32) -> bool {
91        let was_unhealthy = self.status == HealthStatus::Unhealthy;
92        let successes_before = self.consecutive_successes;
93        self.consecutive_failures = 0;
94        self.consecutive_successes += 1;
95
96        // A success resets the failure streak and advances the success streak by
97        // exactly one — the two counters are never both non-zero afterwards.
98        debug_assert_eq!(
99            self.consecutive_failures, 0,
100            "a success must clear the consecutive-failure streak"
101        );
102        debug_assert_eq!(
103            self.consecutive_successes,
104            successes_before + 1,
105            "a success must advance the success streak by exactly one"
106        );
107
108        if was_unhealthy && self.consecutive_successes >= healthy_threshold {
109            self.status = HealthStatus::Healthy;
110            // The transition is only reported when crossing the threshold from
111            // Unhealthy; a backend that was already Healthy never "transitions".
112            debug_assert!(
113                self.status == HealthStatus::Healthy,
114                "a reported recovery must leave the status Healthy"
115            );
116            return true;
117        }
118        false
119    }
120
121    /// Record a failed health check. Returns true if the backend transitioned to unhealthy.
122    pub fn record_failure(&mut self, unhealthy_threshold: u32) -> bool {
123        let was_healthy = self.status == HealthStatus::Healthy;
124        let failures_before = self.consecutive_failures;
125        self.consecutive_successes = 0;
126        self.consecutive_failures += 1;
127
128        // A failure resets the success streak and advances the failure streak by
129        // exactly one — symmetric with `record_success`.
130        debug_assert_eq!(
131            self.consecutive_successes, 0,
132            "a failure must clear the consecutive-success streak"
133        );
134        debug_assert_eq!(
135            self.consecutive_failures,
136            failures_before + 1,
137            "a failure must advance the failure streak by exactly one"
138        );
139
140        if was_healthy && self.consecutive_failures >= unhealthy_threshold {
141            self.status = HealthStatus::Unhealthy;
142            debug_assert!(
143                self.status == HealthStatus::Unhealthy,
144                "a reported drop must leave the status Unhealthy"
145            );
146            return true;
147        }
148        false
149    }
150
151    pub fn is_healthy(&self) -> bool {
152        self.status == HealthStatus::Healthy
153    }
154}
155
/// One upstream server of a cluster, together with its connection
/// accounting, retry/backoff state and health-check state.
#[derive(Debug, PartialEq, Clone)]
pub struct Backend {
    /// Optional sticky-session identifier.
    /// NOTE(review): presumably matched by the sticky-session lookup
    /// (`find_sticky`); confirm.
    pub sticky_id: Option<String>,
    /// Identifier reported in events (e.g. on drop).
    pub backend_id: String,
    /// Address that `try_connect` opens connections to.
    pub address: SocketAddr,
    /// Lifecycle status; only `Normal` accepts new connections.
    pub status: BackendStatus,
    /// Retry/backoff policy, armed by failed connects (`retry_policy.fail()`).
    pub retry_policy: retry::RetryPolicyWrapper,
    /// Open connections, maintained by `inc_connections` / `dec_connections`.
    pub active_connections: usize,
    /// NOTE(review): not updated by any method visible here; confirm where
    /// requests are counted.
    pub active_requests: usize,
    /// Number of failed connect attempts recorded by `try_connect`.
    pub failures: usize,
    /// Optional per-backend load-balancing parameters (presumably a weight;
    /// TODO confirm).
    pub load_balancing_parameters: Option<LoadBalancingParams>,
    /// Backup flag; `Backend::new` defaults it to `false`.
    pub backup: bool,
    /// Peak-EWMA estimator of connection time, fed in nanoseconds by
    /// `set_connection_time`.
    pub connection_time: PeakEWMA,
    /// Active health-check state; `can_open` refuses unhealthy backends.
    pub health: HealthState,
}
171
172impl Backend {
173    pub fn new(
174        backend_id: &str,
175        address: SocketAddr,
176        sticky_id: Option<String>,
177        load_balancing_parameters: Option<LoadBalancingParams>,
178        backup: Option<bool>,
179    ) -> Backend {
180        let desired_policy = retry::ExponentialBackoffPolicy::new(6);
181        Backend {
182            sticky_id,
183            backend_id: backend_id.to_owned(),
184            address,
185            status: BackendStatus::Normal,
186            retry_policy: desired_policy.into(),
187            active_connections: 0,
188            active_requests: 0,
189            failures: 0,
190            load_balancing_parameters,
191            backup: backup.unwrap_or(false),
192            connection_time: PeakEWMA::new(),
193            health: HealthState::default(),
194        }
195    }
196
197    pub fn set_closing(&mut self) {
198        self.status = BackendStatus::Closing;
199    }
200
201    pub fn retry_policy(&mut self) -> &mut retry::RetryPolicyWrapper {
202        &mut self.retry_policy
203    }
204
205    pub fn can_open(&self) -> bool {
206        if !self.health.is_healthy() {
207            return false;
208        }
209        if let Some(action) = self.retry_policy.can_try() {
210            self.status == BackendStatus::Normal && action == retry::RetryAction::OKAY
211        } else {
212            false
213        }
214    }
215
216    /// Canonical "available" check used by per-backend metrics and cluster
217    /// availability accounting. Slightly more permissive than `can_open()`:
218    /// a backend currently in an exponential-backoff *wait window* still
219    /// counts as available because the next call after the window ends
220    /// will route to it without operator intervention. The dashboard
221    /// reading must reflect "operationally up, not exhausted" rather than
222    /// "ready to receive *this* request" — flicking the gauge to 0 on
223    /// every transient backoff would drown out genuine `is_down()`
224    /// transitions. Pairs with `BackendList::evaluate_availability`,
225    /// which applies the same predicate cluster-wide.
226    pub fn is_available(&self) -> bool {
227        self.health.is_healthy()
228            && self.status == BackendStatus::Normal
229            && !self.retry_policy.is_down()
230    }
231
232    pub fn inc_connections(&mut self) -> Option<usize> {
233        let before = self.active_connections;
234        if self.status == BackendStatus::Normal {
235            self.active_connections += 1;
236            // A `Normal` backend always increments by exactly one and reports the
237            // post-increment count back to the caller.
238            debug_assert_eq!(
239                self.active_connections,
240                before + 1,
241                "inc_connections must add exactly one active connection"
242            );
243            Some(self.active_connections)
244        } else {
245            // Non-`Normal` backends refuse new connections and leave the gauge
246            // untouched (no silent increment on a Closing/Closed backend).
247            debug_assert_eq!(
248                self.active_connections, before,
249                "inc_connections must not touch the count for a non-Normal backend"
250            );
251            None
252        }
253    }
254
255    /// TODO: normalize with saturating_sub()
256    pub fn dec_connections(&mut self) -> Option<usize> {
257        let before = self.active_connections;
258        match self.status {
259            BackendStatus::Normal => {
260                if self.active_connections > 0 {
261                    self.active_connections -= 1;
262                }
263                // The count drops by one when positive, otherwise saturates at
264                // zero — it must never wrap below zero (usize underflow).
265                debug_assert!(
266                    self.active_connections <= before,
267                    "dec_connections must never increase the active-connection count"
268                );
269                debug_assert_eq!(
270                    self.active_connections,
271                    before.saturating_sub(1),
272                    "dec_connections must drop by exactly one (saturating at zero)"
273                );
274                Some(self.active_connections)
275            }
276            BackendStatus::Closed => {
277                // A Closed backend has already been retired: nothing to decrement.
278                debug_assert_eq!(
279                    self.active_connections, before,
280                    "dec_connections on a Closed backend must not mutate the count"
281                );
282                None
283            }
284            BackendStatus::Closing => {
285                if self.active_connections > 0 {
286                    self.active_connections -= 1;
287                }
288                debug_assert_eq!(
289                    self.active_connections,
290                    before.saturating_sub(1),
291                    "dec_connections must drop by exactly one (saturating at zero)"
292                );
293                if self.active_connections == 0 {
294                    self.status = BackendStatus::Closed;
295                    // Draining a Closing backend to zero retires it: the
296                    // lifecycle advances to Closed and we stop reporting a count.
297                    debug_assert_eq!(
298                        self.status,
299                        BackendStatus::Closed,
300                        "a fully drained Closing backend must become Closed"
301                    );
302                    None
303                } else {
304                    Some(self.active_connections)
305                }
306            }
307        }
308    }
309
310    pub fn set_connection_time(&mut self, dur: Duration) {
311        self.connection_time.observe(dur.as_nanos() as f64);
312    }
313
314    pub fn peak_ewma_connection(&mut self) -> f64 {
315        self.connection_time.get(self.active_connections)
316    }
317
318    pub fn try_connect(&mut self) -> Result<mio::net::TcpStream, BackendError> {
319        if self.status != BackendStatus::Normal {
320            return Err(BackendError::Status(self.status.to_owned()));
321        }
322        // Reaching the connect attempt implies we passed the status gate; the
323        // failure counter is whatever prior attempts accumulated.
324        debug_assert_eq!(
325            self.status,
326            BackendStatus::Normal,
327            "try_connect only attempts a connection on a Normal backend"
328        );
329        let failures_before = self.failures;
330        let connections_before = self.active_connections;
331
332        match mio::net::TcpStream::connect(self.address) {
333            Ok(tcp_stream) => {
334                //self.retry_policy.succeed();
335                self.inc_connections();
336                // Success registers exactly one new active connection and never
337                // touches the failure counter.
338                debug_assert_eq!(
339                    self.active_connections,
340                    connections_before + 1,
341                    "a successful connect must register exactly one active connection"
342                );
343                debug_assert_eq!(
344                    self.failures, failures_before,
345                    "a successful connect must not bump the failure counter"
346                );
347                Ok(tcp_stream)
348            }
349            Err(io_error) => {
350                self.retry_policy.fail();
351                self.failures += 1;
352                // A failed connect arms the retry policy and advances the
353                // failure counter by exactly one, leaving the connection gauge
354                // untouched (no connection was established).
355                debug_assert_eq!(
356                    self.failures,
357                    failures_before + 1,
358                    "a failed connect must advance the failure counter by exactly one"
359                );
360                debug_assert_eq!(
361                    self.active_connections, connections_before,
362                    "a failed connect must not register an active connection"
363                );
364                // TODO: handle EINPROGRESS. It is difficult. It is discussed here:
365                // https://docs.rs/mio/latest/mio/net/struct.TcpStream.html#method.connect
366                // with an example code here:
367                // https://github.com/Thomasdezeeuw/heph/blob/0c4f1ab3eaf08bea1d65776528bfd6114c9f8374/src/net/tcp/stream.rs#L560-L622
368                Err(BackendError::MioConnection(io_error))
369            }
370        }
371    }
372}
373
374// when a backend has been removed from configuration and the last connection to
375// it has stopped, it will be dropped, so we can notify that the backend server
376// can be safely stopped
377impl std::ops::Drop for Backend {
378    fn drop(&mut self) {
379        server::push_event(Event {
380            kind: EventKind::RemovedBackendHasNoConnections as i32,
381            backend_id: Some(self.backend_id.to_owned()),
382            address: Some(self.address.into()),
383            cluster_id: None,
384            metric_detail: None,
385        });
386    }
387}
388
/// Per-worker registry of every cluster's backends, plus the per-cluster
/// health-check and HTTP/2 settings used when selecting and probing them.
#[derive(Debug)]
pub struct BackendMap {
    /// Backend list of each cluster, keyed by cluster id.
    pub backends: HashMap<ClusterId, BackendList>,
    /// Initialised to 3 by `BackendMap::new`.
    /// NOTE(review): not read in the visible part of this file; confirm its
    /// consumer.
    pub max_failures: usize,
    /// Active health-check configuration per cluster. Clearing an entry with
    /// `set_health_check_config(.., None)` also resets that cluster's
    /// backends to a pristine healthy state.
    pub health_check_configs: HashMap<ClusterId, HealthCheckConfig>,
    /// Whether the cluster's backends speak HTTP/2 (cluster.http2 = true).
    /// Mirrors the same backend-capability hint the mux router reads at
    /// `protocol/mux/router.rs::Router::connect`. The health checker uses
    /// it to switch the probe wire format from HTTP/1.1 to h2c so an
    /// h2c-only backend is not probed with an HTTP/1.1 preface that
    /// would always fail.
    pub cluster_http2: HashMap<ClusterId, bool>,
}
402
403impl Default for BackendMap {
404    fn default() -> Self {
405        Self::new()
406    }
407}
408
409impl BackendMap {
410    pub fn new() -> BackendMap {
411        BackendMap {
412            backends: HashMap::new(),
413            max_failures: 3,
414            health_check_configs: HashMap::new(),
415            cluster_http2: HashMap::new(),
416        }
417    }
418
    /// Re-evaluate the availability of `cluster_id`, publish the
    /// `cluster.available_backends` / `cluster.total_backends` gauges,
    /// and emit the transition log + counter + `Event` exactly when the
    /// per-cluster state flips between `Available` and `AllDown`.
    ///
    /// Unknown clusters are ignored: no gauges are published and no
    /// transition is emitted.
    ///
    /// Empty clusters (`total == 0`) never report `AllDown`, which avoids log
    /// spam during cluster bootstrap while backends are still being
    /// registered. The (0, 0) gauges are still published, so dashboards see
    /// "cluster exists, zero backends configured" as a state distinct from
    /// "cluster doesn't exist".
    ///
    /// Transitions:
    /// - `Available -> AllDown`: `error!` log, `NO_AVAILABLE_BACKENDS`
    ///   counter, and an `EventKind::NoAvailableBackends` event.
    /// - `AllDown -> Available`: `info!` log, `AVAILABLE_RECOVERED` counter,
    ///   and an `EventKind::ClusterRecovered` event.
    ///
    /// NOTE(review): an empty cluster counts as `Available`. Removing every
    /// backend from an `AllDown` cluster therefore takes the recovery branch,
    /// logging "backends recovered (0/0 available)" and pushing a
    /// `ClusterRecovered` event. Confirm this is the intended signal for
    /// downstream alerting.
    ///
    /// Takes `&self` so callers that already hold `&mut BackendMap`
    /// can drop their `&mut BackendList` borrow before invoking it
    /// without re-borrowing.
    pub(crate) fn record_cluster_availability(&self, cluster_id: &str) {
        // Unknown cluster: nothing to publish.
        let Some(list) = self.backends.get(cluster_id) else {
            return;
        };

        let (available, total) = list.evaluate_availability();
        // A subset count can never exceed the whole, and it must match the
        // live backend vector length the helper just walked.
        debug_assert!(
            available <= total,
            "available backends ({available}) cannot exceed total ({total})"
        );
        debug_assert_eq!(
            total,
            list.backends.len(),
            "total must equal the number of registered backends"
        );
        // Gauges are published on every call, transition or not.
        gauge!(
            names::cluster::AVAILABLE_BACKENDS,
            available,
            Some(cluster_id),
            None
        );
        gauge!(
            names::cluster::TOTAL_BACKENDS,
            total,
            Some(cluster_id),
            None
        );

        let new_state = if total > 0 && available == 0 {
            ClusterAvailability::AllDown
        } else {
            ClusterAvailability::Available
        };
        // Empty clusters never report AllDown (avoids bootstrap log spam); a
        // cluster with at least one available backend is always Available.
        debug_assert!(
            !(total == 0 && new_state == ClusterAvailability::AllDown),
            "an empty cluster must never be reported AllDown"
        );
        debug_assert!(
            !(available > 0 && new_state == ClusterAvailability::AllDown),
            "a cluster with an available backend must not be AllDown"
        );

        // `availability` is updated through `&self` via `replace`, which
        // hands back the previously latched state for comparison.
        let prev = list.availability.replace(new_state);
        // The cell now holds exactly the freshly computed state.
        debug_assert_eq!(
            list.availability.get(),
            new_state,
            "the availability cell must latch the newly computed state"
        );
        // No flip: gauges were refreshed above; nothing else to emit.
        if prev == new_state {
            return;
        }
        match (prev, new_state) {
            (ClusterAvailability::Available, ClusterAvailability::AllDown) => {
                error!("cluster {}: all {} backends are down", cluster_id, total);
                incr!(
                    names::cluster::NO_AVAILABLE_BACKENDS,
                    Some(cluster_id),
                    None
                );
                push_event(Event {
                    kind: EventKind::NoAvailableBackends as i32,
                    cluster_id: Some(cluster_id.to_owned()),
                    backend_id: None,
                    address: None,
                    metric_detail: None,
                });
            }
            (ClusterAvailability::AllDown, ClusterAvailability::Available) => {
                info!(
                    "cluster {}: backends recovered ({}/{} available)",
                    cluster_id, available, total
                );
                incr!(names::cluster::AVAILABLE_RECOVERED, Some(cluster_id), None);
                push_event(Event {
                    kind: EventKind::ClusterRecovered as i32,
                    cluster_id: Some(cluster_id.to_owned()),
                    backend_id: None,
                    address: None,
                    metric_detail: None,
                });
            }
            // With only two states and equal states returned early above,
            // this arm is not reachable in practice.
            _ => {}
        }
    }
522
523    /// Record (or clear) the `cluster.http2` backend-capability hint for
524    /// `cluster_id`. The health checker reads the resulting map at probe
525    /// time so the wire format follows what the mux router will use to
526    /// connect to the same backends.
527    pub fn set_cluster_http2(&mut self, cluster_id: &str, http2: bool) {
528        if http2 {
529            self.cluster_http2.insert(cluster_id.to_owned(), true);
530        } else {
531            self.cluster_http2.remove(cluster_id);
532        }
533    }
534
535    pub fn set_health_check_config(&mut self, cluster_id: &str, config: Option<HealthCheckConfig>) {
536        match config {
537            Some(c) => {
538                self.health_check_configs.insert(cluster_id.to_owned(), c);
539            }
540            None => {
541                self.health_check_configs.remove(cluster_id);
542                // When the operator drops the health check, any
543                // previously-recorded `HealthState::Unhealthy` would
544                // otherwise stick — `next_available_backend` keeps
545                // skipping the backend even though we have stopped
546                // probing it. Reset every backend in the cluster to a
547                // pristine healthy state so the load balancer can
548                // route again.
549                if let Some(backend_list) = self.backends.get(cluster_id) {
550                    for backend in &backend_list.backends {
551                        backend.borrow_mut().health = HealthState::default();
552                    }
553                }
554                // Re-emit the rollup gauges so dashboards reflect the
555                // post-reset availability instead of holding the last
556                // health-check value indefinitely.
557                self.record_cluster_availability(cluster_id);
558            }
559        }
560    }
561
562    pub fn import_configuration_state(
563        &mut self,
564        backends: &HashMap<ClusterId, Vec<sozu_command::response::Backend>>,
565    ) {
566        self.backends
567            .extend(backends.iter().map(|(cluster_id, backend_vec)| {
568                (
569                    cluster_id.to_string(),
570                    BackendList::import_configuration_state(backend_vec),
571                )
572            }));
573        // Replay path inserts every cluster's backend list without
574        // touching the gauge emission sites used by add/remove/health.
575        // Latch `cluster.available_backends` and `.total_backends` here
576        // so a freshly-loaded worker reports correct values on the very
577        // first `QueryMetrics` instead of zero until something else
578        // mutates each cluster.
579        for cluster_id in backends.keys() {
580            self.record_cluster_availability(cluster_id);
581        }
582    }
583
584    pub fn add_backend(&mut self, cluster_id: &str, backend: Backend) {
585        let address = backend.address;
586        self.backends
587            .entry(cluster_id.to_string())
588            .or_default()
589            .add_backend(backend);
590        // Adding a backend must leave the cluster present and containing the
591        // just-added address (whether it created the entry or updated in place).
592        debug_assert!(
593            self.backends
594                .get(cluster_id)
595                .is_some_and(|list| list.has_backend(&address)),
596            "add_backend must leave the backend present in its cluster"
597        );
598        // Publish initial gauges and surface the corner case where a fresh
599        // cluster's first backend is already down (e.g. registered with a
600        // pre-existing failed retry policy). For an `Available` initial
601        // backend this is just a (1, 1) gauge emission with no transition.
602        self.record_cluster_availability(cluster_id);
603    }
604
605    // TODO: return <Result, BackendError>, log the error downstream
606    /// Remove every backend at `backend_address` from `cluster_id` and
607    /// return the list of `backend_id`s that were dropped. Callers (e.g.
608    /// `Server::remove_backend`) iterate over the returned ids to tear
609    /// down per-backend metrics so the identity used by the runtime
610    /// (address-keyed) matches the identity used by the metrics layer
611    /// (id-keyed) — see PR #1252 follow-up review MEDIUM-3.
612    pub fn remove_backend(
613        &mut self,
614        cluster_id: &str,
615        backend_address: &SocketAddr,
616    ) -> Vec<String> {
617        let removed = if let Some(backends) = self.backends.get_mut(cluster_id) {
618            backends.remove_backend(backend_address)
619        } else {
620            error!(
621                "Backend was already removed: cluster id {}, address {:?}",
622                cluster_id, backend_address
623            );
624            return Vec::new();
625        };
626        // Whatever ids came back, the address is now gone from the cluster's
627        // live set (remove_backend evicts every backend at that address).
628        debug_assert!(
629            self.backends
630                .get(cluster_id)
631                .is_none_or(|list| !list.has_backend(backend_address)),
632            "remove_backend must evict every backend at the address"
633        );
634        // Re-evaluate so removing the last backend logs an explicit
635        // `AllDown` transition (or, with `total == 0`, drops back to
636        // silent gauges).
637        self.record_cluster_availability(cluster_id);
638        removed
639    }
640
641    // TODO: return <Result, BackendError>, log the error downstream
642    pub fn close_backend_connection(&mut self, cluster_id: &str, addr: &SocketAddr) {
643        if let Some(cluster_backends) = self.backends.get_mut(cluster_id) {
644            if let Some(ref mut backend) = cluster_backends.find_backend(addr) {
645                backend.borrow_mut().dec_connections();
646            }
647        }
648    }
649
650    pub fn has_backend(&self, cluster_id: &str, backend: &Backend) -> bool {
651        self.backends
652            .get(cluster_id)
653            .map(|backends| backends.has_backend(&backend.address))
654            .unwrap_or(false)
655    }
656
657    pub fn backend_from_cluster_id(
658        &mut self,
659        cluster_id: &str,
660    ) -> Result<(Rc<RefCell<Backend>>, TcpStream), BackendError> {
661        let cluster_backends = self
662            .backends
663            .get_mut(cluster_id)
664            .ok_or(BackendError::NoBackendForCluster(cluster_id.to_owned()))?;
665
666        if cluster_backends.backends.is_empty() {
667            // Drop the &mut BackendList borrow before the &self helper call.
668            // `total == 0` falls into the "never report AllDown" branch in
669            // record_cluster_availability, so this just publishes the (0, 0)
670            // gauges.
671            let _ = cluster_backends;
672            self.record_cluster_availability(cluster_id);
673            return Err(BackendError::NoBackendForCluster(cluster_id.to_owned()));
674        }
675        // Past the empty guard there is at least one backend to pick from.
676        debug_assert!(
677            !cluster_backends.backends.is_empty(),
678            "selection runs only on a non-empty backend list"
679        );
680
681        let next_backend = match cluster_backends.next_available_backend() {
682            Some(nb) => nb,
683            None => {
684                // Drop the &mut BackendList before the &self helper call.
685                // The helper observes (available=0, total>0) and emits the
686                // Available -> AllDown transition (log + counter + Event)
687                // exactly once per regime entry. Subsequent calls in the
688                // same AllDown regime are no-ops.
689                let _ = cluster_backends;
690                self.record_cluster_availability(cluster_id);
691                return Err(BackendError::NoBackendForCluster(cluster_id.to_owned()));
692            }
693        };
694
695        let tcp_stream = {
696            let mut borrowed_backend = next_backend.borrow_mut();
697
698            debug!(
699                "Connecting {} -> {:?}",
700                cluster_id,
701                (
702                    borrowed_backend.address,
703                    borrowed_backend.active_connections,
704                    borrowed_backend.failures
705                )
706            );
707
708            borrowed_backend.try_connect().map_err(|backend_error| {
709                BackendError::ConnectionFailures {
710                    cluster_id: cluster_id.to_owned(),
711                    backend_address: borrowed_backend.address,
712                    failures: borrowed_backend.failures,
713                    error: backend_error.to_string(),
714                }
715            })?
716        };
717
718        // Connection succeeded: re-evaluate so we capture an
719        // AllDown -> Available recovery transition the moment a request
720        // first hits a healthy backend after an outage. `next_backend` is
721        // not borrowed here (the inner block dropped `borrowed_backend`),
722        // so the helper's `BackendList::evaluate_availability` walk is
723        // free to call `borrow()` on every backend.
724        let _ = cluster_backends;
725        self.record_cluster_availability(cluster_id);
726
727        // The selected backend is a live member of the cluster it was drawn
728        // from — selection never fabricates or returns a stale backend.
729        debug_assert!(
730            self.backends.get(cluster_id).is_some_and(|list| {
731                let picked = next_backend.borrow().address;
732                list.has_backend(&picked)
733            }),
734            "the selected backend must belong to the cluster's live set"
735        );
736
737        Ok((next_backend.clone(), tcp_stream))
738    }
739
740    /// Select a backend for `cluster_id`, optionally pinned by an affinity
741    /// `key`, and return its `(backend_id, address)` **without** opening any
742    /// connection.
743    ///
744    /// This is the UDP datapath's selection entry point. Unlike
745    /// [`backend_from_cluster_id`](Self::backend_from_cluster_id) — which is
746    /// TCP-specific because it calls `Backend::try_connect` and hands back a
747    /// `TcpStream` — UDP owns its own per-flow connected `UdpSocket` (created in
748    /// the shell via `socket::udp_connect`), so all the map needs to surface is
749    /// the chosen endpoint identity. `key` is `Some(flow_hash)` so HRW/Maglev
750    /// keep a client flow pinned to one backend; `None` behaves like the legacy
751    /// round-robin selection. Fail-open (all-unhealthy ⇒ LB over the full set)
752    /// is inherited from [`BackendList::next_available_backend_with_key`].
753    pub fn backend_from_cluster_id_with_key(
754        &mut self,
755        cluster_id: &str,
756        key: Option<u64>,
757    ) -> Result<(String, SocketAddr), BackendError> {
758        let cluster_backends = self
759            .backends
760            .get_mut(cluster_id)
761            .ok_or(BackendError::NoBackendForCluster(cluster_id.to_owned()))?;
762
763        if cluster_backends.backends.is_empty() {
764            let _ = cluster_backends;
765            self.record_cluster_availability(cluster_id);
766            return Err(BackendError::NoBackendForCluster(cluster_id.to_owned()));
767        }
768        debug_assert!(
769            !cluster_backends.backends.is_empty(),
770            "keyed selection runs only on a non-empty backend list"
771        );
772
773        let next_backend = match cluster_backends.next_available_backend_with_key(key) {
774            Some(nb) => nb,
775            None => {
776                let _ = cluster_backends;
777                self.record_cluster_availability(cluster_id);
778                return Err(BackendError::NoBackendForCluster(cluster_id.to_owned()));
779            }
780        };
781
782        let (backend_id, address) = {
783            let borrowed = next_backend.borrow();
784            (borrowed.backend_id.to_owned(), borrowed.address)
785        };
786        // The keyed selection returns a live member of the cluster — the
787        // surfaced identity (id, address) belongs to a registered backend.
788        debug_assert!(
789            cluster_backends.has_backend(&address),
790            "keyed selection must return a backend in the cluster's live set"
791        );
792        Ok((backend_id, address))
793    }
794
795    pub fn backend_from_sticky_session(
796        &mut self,
797        cluster_id: &str,
798        sticky_session: &str,
799    ) -> Result<(Rc<RefCell<Backend>>, TcpStream), BackendError> {
800        let sticky_conn = self
801            .backends
802            .get_mut(cluster_id)
803            .and_then(|cluster_backends| cluster_backends.find_sticky(sticky_session))
804            .map(|backend| {
805                let mut borrowed = backend.borrow_mut();
806                let conn = borrowed.try_connect();
807
808                conn.map(|tcp_stream| (backend.clone(), tcp_stream))
809                    .inspect_err(|_| {
810                        error!(
811                            "could not connect {} to {:?} using session {} ({} failures)",
812                            cluster_id, borrowed.address, sticky_session, borrowed.failures
813                        )
814                    })
815            });
816
817        match sticky_conn {
818            Some(backend_and_stream) => backend_and_stream,
819            None => {
820                debug!(
821                    "Couldn't find a backend corresponding to sticky_session {} for cluster {}",
822                    sticky_session, cluster_id
823                );
824                self.backend_from_cluster_id(cluster_id)
825            }
826        }
827    }
828
829    pub fn set_load_balancing_policy_for_cluster(
830        &mut self,
831        cluster_id: &str,
832        lb_algo: LoadBalancingAlgorithms,
833        metric: Option<LoadMetric>,
834    ) {
835        // The cluster can be created before the backends were registered because of the async config messages.
836        // So when we set the load balancing policy, we have to create the backend list if if it doesn't exist yet.
837        let cluster_backends = self.get_or_create_backend_list_for_cluster(cluster_id);
838        cluster_backends.set_load_balancing_policy(lb_algo, metric);
839    }
840
841    pub fn get_or_create_backend_list_for_cluster(&mut self, cluster_id: &str) -> &mut BackendList {
842        self.backends.entry(cluster_id.to_string()).or_default()
843    }
844}
845
846#[derive(Debug)]
847pub struct BackendList {
848    pub backends: Vec<Rc<RefCell<Backend>>>,
849    pub next_id: u32,
850    pub load_balancing: Box<dyn LoadBalancingAlgorithm>,
851    /// Latches the fail-open `warn!`. Set to `true` when fail-open routing
852    /// emits its entry warning so subsequent routing decisions in the same
853    /// regime stay quiet; reset to `false` when a healthy backend is
854    /// available again, so the regime-exit transition is logged exactly once.
855    /// Without this latch the warning fired per request, which under a
856    /// universal outage (the exact scenario fail-open targets) is also the
857    /// highest request-rate scenario — log volume would become catastrophic.
858    fail_open_warned: bool,
859    /// Per-cluster availability latched by `BackendMap::record_cluster_availability`.
860    /// `Cell` (not `RefCell`) because the receiver is `&self` and the
861    /// state is `Copy`. Worker runtime is single-threaded, so a `Cell` is
862    /// sound — no synchronisation needed.
863    pub(crate) availability: Cell<ClusterAvailability>,
864}
865
866impl Default for BackendList {
867    fn default() -> Self {
868        Self::new()
869    }
870}
871
872impl BackendList {
873    pub fn new() -> BackendList {
874        BackendList {
875            backends: Vec::new(),
876            next_id: 0,
877            load_balancing: Box::new(Random),
878            fail_open_warned: false,
879            availability: Cell::new(ClusterAvailability::Available),
880        }
881    }
882
883    /// Count `(available, total)` for this cluster. Delegates to
884    /// `Backend::is_available` so the per-cluster aggregate and the
885    /// per-backend `backend.available` gauge stay in lock-step.
886    pub(crate) fn evaluate_availability(&self) -> (usize, usize) {
887        let total = self.backends.len();
888        let available = self
889            .backends
890            .iter()
891            .filter(|b| b.borrow().is_available())
892            .count();
893        // The available count is a filtered subset of the total, so it can
894        // never exceed it, and `total` mirrors the backend vector length.
895        debug_assert!(
896            available <= total,
897            "available ({available}) cannot exceed total ({total})"
898        );
899        debug_assert_eq!(total, self.backends.len(), "total must equal backend count");
900        (available, total)
901    }
902
903    /// Full invariant sweep over the backend list. Called as a `debug_assert!`
904    /// postcondition by every mutating method so any cross-field corruption
905    /// (duplicate addresses, a `next_id` that underflowed below the registered
906    /// count) surfaces immediately under test/fuzz.
907    #[cfg(debug_assertions)]
908    fn check_invariants(&self) {
909        // `next_id` is a monotonically incremented registration counter: it is
910        // bumped once per *newly inserted* backend (never decremented, even on
911        // removal), so it is always at least the current live count.
912        debug_assert!(
913            self.next_id as usize >= self.backends.len(),
914            "next_id ({}) must be >= live backend count ({})",
915            self.next_id,
916            self.backends.len()
917        );
918        // Addresses are the routing-stable identity used by `has_backend` /
919        // `remove_backend`; two live backends may legitimately share an address
920        // (A/B variant) but must then differ by `backend_id`. The (address,
921        // backend_id) pair is therefore unique across the live set.
922        for (i, a) in self.backends.iter().enumerate() {
923            let a = a.borrow();
924            for b in self.backends.iter().skip(i + 1) {
925                let b = b.borrow();
926                debug_assert!(
927                    a.address != b.address || a.backend_id != b.backend_id,
928                    "duplicate (address, backend_id) in the live set: {:?} / {}",
929                    a.address,
930                    a.backend_id
931                );
932            }
933        }
934    }
935
936    pub fn import_configuration_state(
937        backend_vec: &[sozu_command_lib::response::Backend],
938    ) -> BackendList {
939        let mut list = BackendList::new();
940        for backend in backend_vec {
941            let backend = Backend::new(
942                &backend.backend_id,
943                backend.address,
944                backend.sticky_id.clone(),
945                backend.load_balancing_parameters,
946                backend.backup,
947            );
948            list.add_backend(backend);
949        }
950
951        list
952    }
953
954    pub fn add_backend(&mut self, backend: Backend) {
955        let address = backend.address;
956        let len_before = self.backends.len();
957        let next_id_before = self.next_id;
958        let existed = self.backends.iter().any(|b| {
959            b.borrow().address == backend.address && b.borrow().backend_id == backend.backend_id
960        });
961        match self.backends.iter_mut().find(|b| {
962            b.borrow().address == backend.address && b.borrow().backend_id == backend.backend_id
963        }) {
964            None => {
965                let backend = Rc::new(RefCell::new(backend));
966                self.backends.push(backend);
967                self.next_id += 1;
968            }
969            // the backend already exists, update the configuration while
970            // keeping connection retry state
971            Some(old_backend) => {
972                let mut b = old_backend.borrow_mut();
973                b.sticky_id.clone_from(&backend.sticky_id);
974                b.load_balancing_parameters
975                    .clone_from(&backend.load_balancing_parameters);
976                b.backup = backend.backup;
977            }
978        }
979        // Insert grows the list by exactly one and bumps `next_id`; an update
980        // in place leaves both untouched. The address is present either way.
981        debug_assert_eq!(
982            self.backends.len(),
983            len_before + (!existed) as usize,
984            "add_backend grows the list by one only on a genuine insert"
985        );
986        debug_assert_eq!(
987            self.next_id,
988            next_id_before + (!existed) as u32,
989            "next_id advances by one only on a genuine insert"
990        );
991        debug_assert!(
992            self.has_backend(&address),
993            "add_backend must leave the backend present in the list"
994        );
995        // Refresh table-based policies (Maglev) off the datapath whenever the
996        // full backend set or a weight changes. This is the ONLY place the
997        // table is rebuilt on mutation; selection never rebuilds. The default
998        // `rebuild` is a no-op for the stateless policies.
999        self.load_balancing.rebuild(&self.backends);
1000        #[cfg(debug_assertions)]
1001        self.check_invariants();
1002    }
1003
1004    /// Remove every backend at `backend_address` and return the list of
1005    /// `backend_id`s that were dropped. Two backends with the same address
1006    /// but distinct ids (A/B test, weighted variant, dedup race) are both
1007    /// removed here; the caller relies on the returned ids to tear down
1008    /// matching per-backend state (metrics, health-check). Returning the
1009    /// ids closes the identity drift between runtime-removal-by-address
1010    /// and metrics-removal-by-id.
1011    pub fn remove_backend(&mut self, backend_address: &SocketAddr) -> Vec<String> {
1012        let len_before = self.backends.len();
1013        let mut removed = Vec::new();
1014        self.backends.retain(|backend| {
1015            let b = backend.borrow();
1016            if &b.address == backend_address {
1017                removed.push(b.backend_id.clone());
1018                false
1019            } else {
1020                true
1021            }
1022        });
1023        // The list shrinks by exactly the number of ids reported removed, and
1024        // the address is fully evicted (no straggler left behind).
1025        debug_assert_eq!(
1026            self.backends.len(),
1027            len_before - removed.len(),
1028            "remove_backend must drop exactly the backends it reports"
1029        );
1030        debug_assert!(
1031            !self.has_backend(backend_address),
1032            "remove_backend must evict every backend at the address"
1033        );
1034        // Rebuild table-based policies (Maglev) off the datapath after the set
1035        // shrinks, only when something was actually removed. No-op for the
1036        // stateless policies.
1037        if !removed.is_empty() {
1038            self.load_balancing.rebuild(&self.backends);
1039        }
1040        #[cfg(debug_assertions)]
1041        self.check_invariants();
1042        removed
1043    }
1044
1045    pub fn has_backend(&self, backend_address: &SocketAddr) -> bool {
1046        self.backends
1047            .iter()
1048            .any(|backend| backend.borrow().address == *backend_address)
1049    }
1050
1051    pub fn find_backend(
1052        &mut self,
1053        backend_address: &SocketAddr,
1054    ) -> Option<&mut Rc<RefCell<Backend>>> {
1055        self.backends
1056            .iter_mut()
1057            .find(|backend| backend.borrow().address == *backend_address)
1058    }
1059
1060    pub fn find_sticky(&mut self, sticky_session: &str) -> Option<&mut Rc<RefCell<Backend>>> {
1061        self.backends
1062            .iter_mut()
1063            .find(|b| b.borrow().sticky_id.as_deref() == Some(sticky_session))
1064            .and_then(|b| if b.borrow().can_open() { Some(b) } else { None })
1065    }
1066
1067    pub fn available_backends(&mut self, backup: bool) -> Vec<Rc<RefCell<Backend>>> {
1068        self.backends
1069            .iter()
1070            .filter(|backend| {
1071                let owned = backend.borrow();
1072                owned.backup == backup && owned.can_open()
1073            })
1074            .map(Clone::clone)
1075            .collect()
1076    }
1077
1078    pub fn next_available_backend(&mut self) -> Option<Rc<RefCell<Backend>>> {
1079        self.next_available_backend_with_key(None)
1080    }
1081
1082    /// Pick the next available backend, optionally pinned by an affinity `key`.
1083    ///
1084    /// `key` is only consulted by consistent-hashing policies (HRW/Maglev);
1085    /// every other policy ignores it, so `next_available_backend_with_key(None)`
1086    /// is byte-for-byte the legacy behavior. The UDP datapath calls this with
1087    /// `Some(flow_hash)` to keep a client flow pinned to one backend.
1088    pub fn next_available_backend_with_key(
1089        &mut self,
1090        key: Option<u64>,
1091    ) -> Option<Rc<RefCell<Backend>>> {
1092        let mut backends = self.available_backends(false);
1093
1094        if backends.is_empty() {
1095            backends = self.available_backends(true);
1096        }
1097
1098        if !backends.is_empty() {
1099            // Healthy regime: log the fail-open exit transition exactly once.
1100            if self.fail_open_warned {
1101                info!(
1102                    "fail-open: cluster recovered, {} backends now healthy",
1103                    backends.len()
1104                );
1105                self.fail_open_warned = false;
1106            }
1107            // The candidate set is a subset of the live backend list, so a
1108            // chosen backend is always a live cluster member.
1109            debug_assert!(
1110                backends.len() <= self.backends.len(),
1111                "candidate set cannot be larger than the full backend list"
1112            );
1113            let picked = self
1114                .load_balancing
1115                .next_available_backend(key, &mut backends);
1116            debug_assert!(
1117                picked.as_ref().is_none_or(|b| {
1118                    let addr = b.borrow().address;
1119                    self.backends.iter().any(|x| x.borrow().address == addr)
1120                }),
1121                "selection must return a backend present in the live list"
1122            );
1123            return picked;
1124        }
1125
1126        // Fail-open: when no backend passes the full `can_open()` gate,
1127        // route to backends that are administratively `Normal` AND whose
1128        // retry policy reports `OKAY` (i.e., not currently in
1129        // exponential-backoff). This prevents a shared dependency outage
1130        // (e.g., database) from making the entire cluster unavailable while
1131        // still respecting the per-backend back-off window — hammering a
1132        // backend at line rate during its back-off would defeat the back-off
1133        // itself. Ref: Amazon "Implementing Health Checks".
1134        backends = self
1135            .backends
1136            .iter()
1137            .filter(|b| {
1138                let owned = b.borrow();
1139                owned.status == BackendStatus::Normal
1140                    && matches!(owned.retry_policy.can_try(), Some(retry::RetryAction::OKAY))
1141            })
1142            .map(Clone::clone)
1143            .collect();
1144
1145        if backends.is_empty() {
1146            return None;
1147        }
1148
1149        // Latched warning + per-decision counter: the warn! fires once on
1150        // regime entry; the counter is the operator-visible per-request
1151        // signal that does not drown logs under universal outage.
1152        if !self.fail_open_warned {
1153            warn!(
1154                "fail-open: all backends unhealthy, routing to {} normal backends with retry-policy OKAY",
1155                backends.len()
1156            );
1157            self.fail_open_warned = true;
1158        }
1159        count!(names::backend::FAIL_OPEN, 1);
1160
1161        self.load_balancing
1162            .next_available_backend(key, &mut backends)
1163    }
1164
1165    pub fn set_load_balancing_policy(
1166        &mut self,
1167        load_balancing_policy: LoadBalancingAlgorithms,
1168        metric: Option<LoadMetric>,
1169    ) {
1170        match load_balancing_policy {
1171            LoadBalancingAlgorithms::RoundRobin => {
1172                self.load_balancing = Box::new(RoundRobin::new())
1173            }
1174            LoadBalancingAlgorithms::Random => self.load_balancing = Box::new(Random {}),
1175            LoadBalancingAlgorithms::LeastLoaded => {
1176                self.load_balancing = Box::new(LeastLoaded {
1177                    metric: metric.unwrap_or(LoadMetric::Connections),
1178                })
1179            }
1180            LoadBalancingAlgorithms::PowerOfTwo => {
1181                self.load_balancing = Box::new(PowerOfTwo {
1182                    metric: metric.unwrap_or(LoadMetric::Connections),
1183                })
1184            }
1185            // Affinity policies (used by the UDP datapath). They consult the
1186            // optional hash key; with `None` they fall back to round-robin.
1187            LoadBalancingAlgorithms::Hrw => self.load_balancing = Box::new(Rendezvous::new()),
1188            LoadBalancingAlgorithms::Maglev => {
1189                let mut maglev = Maglev::new();
1190                // Seed the lookup table from the currently-known backends so
1191                // selection is correct before the first control-plane rebuild.
1192                maglev.rebuild(&self.backends);
1193                self.load_balancing = Box::new(maglev);
1194            }
1195        }
1196    }
1197}
1198
1199#[cfg(test)]
1200mod backends_test {
1201
1202    use std::{net::TcpListener, sync::mpsc::*, thread};
1203
1204    use super::*;
1205
/// Bind `addr` and spawn a thread that accepts connections, dropping each
/// one at the end of its loop iteration, until a message arrives on
/// `stopper`.
///
/// The stop signal is checked after every accepted connection, so the
/// listener is released on the first connection accepted after the signal
/// was sent. (Previously the flag flip could never end the infinite
/// `incoming()` loop, so the signal had no effect.)
///
/// Panics if `addr` cannot be bound.
fn run_mock_tcp_server(addr: &str, stopper: Receiver<()>) {
    let listener = TcpListener::bind(addr).unwrap();

    thread::spawn(move || {
        for _stream in listener.incoming() {
            // accept connections until asked to stop
            if stopper.try_recv().is_ok() {
                break;
            }
        }
    });
}
1221
#[test]
fn it_should_retrieve_a_backend_from_cluster_id_when_backends_have_been_recorded() {
    let mut backend_map = BackendMap::new();
    let cluster_id = "mycluster";
    let backend_addr = "127.0.0.1:1236";

    // Something must listen at the backend address for the connect.
    let (stop_tx, stop_rx) = channel();
    run_mock_tcp_server(backend_addr, stop_rx);

    let backend = Backend::new(
        &format!("{cluster_id}-1"),
        backend_addr.parse().unwrap(),
        None,
        None,
        None,
    );
    backend_map.add_backend(cluster_id, backend);

    assert!(backend_map.backend_from_cluster_id(cluster_id).is_ok());
    stop_tx.send(()).unwrap();
}
1245
#[test]
fn it_should_not_retrieve_a_backend_from_cluster_id_when_backend_has_not_been_recorded() {
    let mut backend_map = BackendMap::new();
    let registered = Backend::new("foo-1", "127.0.0.1:9001".parse().unwrap(), None, None, None);
    backend_map.add_backend("foo", registered);

    // Only "foo" is known: any other cluster id must be rejected.
    let lookup = backend_map.backend_from_cluster_id("not");
    assert!(lookup.is_err());
}
1261
#[test]
fn it_should_not_retrieve_a_backend_from_cluster_id_when_backend_list_is_empty() {
    let mut backend_map = BackendMap::new();
    // Register the cluster without any backend, so the lookup exercises the
    // empty-list path rather than duplicating the unknown-cluster test.
    backend_map.get_or_create_backend_list_for_cluster("dumb");

    assert!(backend_map.backend_from_cluster_id("dumb").is_err());
}
1268
#[test]
fn it_should_retrieve_a_backend_from_sticky_session_when_the_backend_has_been_recorded() {
    let mut backend_map = BackendMap::new();
    let cluster_id = "mycluster";
    // Must be the sticky id of the backend served by the mock server below.
    // mio connects are non-blocking, so targeting a backend with no listener
    // does not necessarily fail here and would hide a wrong selection.
    let sticky_session = "server-3";

    let backend_addr = "127.0.0.1:3456";
    let (sender, receiver) = channel();
    run_mock_tcp_server(backend_addr, receiver);

    backend_map.add_backend(
        cluster_id,
        Backend::new(
            &format!("{cluster_id}-1"),
            "127.0.0.1:9001".parse().unwrap(),
            Some("server-1".to_string()),
            None,
            None,
        ),
    );
    backend_map.add_backend(
        cluster_id,
        Backend::new(
            &format!("{cluster_id}-2"),
            "127.0.0.1:9000".parse().unwrap(),
            Some("server-2".to_string()),
            None,
            None,
        ),
    );
    // sticky backend
    backend_map.add_backend(
        cluster_id,
        Backend::new(
            &format!("{cluster_id}-3"),
            backend_addr.parse().unwrap(),
            Some("server-3".to_string()),
            None,
            None,
        ),
    );

    let (backend, _tcp_stream) = backend_map
        .backend_from_sticky_session(cluster_id, sticky_session)
        .expect("the sticky backend must be selected and connected");
    assert_eq!(
        backend.borrow().address,
        backend_addr.parse::<SocketAddr>().unwrap(),
        "the sticky session must route to the backend carrying its sticky id"
    );
    sender.send(()).unwrap();
}
1318
#[test]
fn it_should_not_retrieve_a_backend_from_sticky_session_when_the_backend_has_not_been_recorded()
{
    let mut backend_map = BackendMap::new();

    // Nothing is registered: the sticky lookup misses and the fallback
    // selection has no cluster to pick from.
    let outcome = backend_map.backend_from_sticky_session("mycluster", "test");
    assert!(outcome.is_err());
}
1332
#[test]
fn it_should_not_retrieve_a_backend_from_sticky_session_when_the_backend_list_is_empty() {
    let mut backend_map = BackendMap::new();
    let cluster_id = "mycluster";
    let sticky_session = "test";
    // Known cluster with no backend: neither the sticky lookup nor the
    // fallback selection can produce a backend. Registering the cluster
    // keeps this test distinct from the unknown-cluster one.
    backend_map.get_or_create_backend_list_for_cluster(cluster_id);

    assert!(
        backend_map
            .backend_from_sticky_session(cluster_id, sticky_session)
            .is_err()
    );
}
1345
#[test]
fn it_should_add_a_backend_when_he_doesnt_already_exist() {
    let mut backends_list = BackendList::new();
    let address = "127.0.0.1:80".parse().unwrap();

    backends_list.add_backend(Backend::new("myback", address, None, None, None));

    assert_eq!(backends_list.backends.len(), 1);
}
1360
#[test]
fn it_should_not_add_a_backend_when_he_already_exist() {
    let mut backends_list = BackendList::new();

    // The same (address, backend_id) pair is registered twice; the second
    // registration must update the existing entry, not append one.
    for _ in 0..2 {
        backends_list.add_backend(Backend::new(
            "myback",
            "127.0.0.1:80".parse().unwrap(),
            None,
            None,
            None,
        ));
    }

    assert_eq!(backends_list.backends.len(), 1);
}
1384
1385    /// Build a backend addressed at 127.0.0.1:port and force it Unhealthy
1386    /// without going through the health-check loop.
1387    fn unhealthy_backend(id: &str, port: u16) -> Backend {
1388        let mut backend = Backend::new(
1389            id,
1390            format!("127.0.0.1:{port}").parse().unwrap(),
1391            None,
1392            None,
1393            None,
1394        );
1395        // Threshold = 1 transitions on the first failure.
1396        backend.health.record_failure(1);
1397        assert!(!backend.health.is_healthy());
1398        backend
1399    }
1400
#[test]
fn fail_open_picks_normal_backend_in_retry_policy_okay() {
    // Every backend is unhealthy, yet each retry policy is still fresh: a
    // new ExponentialBackoffPolicy answers OKAY from `can_try()` until a
    // first `fail()` arms a wait window. Fail-open must therefore pick one.
    let mut list = BackendList::new();
    for (id, port) in [("b1", 9001), ("b2", 9002)] {
        list.add_backend(unhealthy_backend(id, port));
    }

    // The regular (non fail-open) candidate sets are both empty.
    for backup in [false, true] {
        assert!(list.available_backends(backup).is_empty());
    }

    let picked = list.next_available_backend();
    assert!(picked.is_some(), "fail-open must pick a Normal+OKAY backend");
    assert!(list.fail_open_warned, "regime entry must latch the warn!");
}
1422
#[test]
fn fail_open_skips_backend_in_retry_backoff() {
    // Same setup as the previous scenario, except every retry policy sits
    // in its WAIT window after one recorded failure. Fail-open must select
    // none of them (routing there would defeat the back-off), and since
    // nothing is routed the regime-entry warn! must not latch either.
    let mut list = BackendList::new();
    list.add_backend(unhealthy_backend("b1", 9011));
    list.add_backend(unhealthy_backend("b2", 9012));
    list.backends.iter().for_each(|backend_rc| {
        backend_rc.borrow_mut().retry_policy().fail();
        assert_eq!(
            Some(retry::RetryAction::WAIT),
            backend_rc.borrow().retry_policy.can_try(),
            "test fixture must place retry policy in WAIT"
        );
    });

    assert!(
        list.next_available_backend().is_none(),
        "fail-open must skip backends whose retry policy is in WAIT"
    );
    assert!(!list.fail_open_warned, "no candidate backends, no regime entry");
}
1452
#[test]
fn fail_open_warn_latched() {
    // Entering fail-open latches the warning, staying in the regime keeps
    // it latched, and the first healthy routing decision clears it again.
    let mut list = BackendList::new();
    list.add_backend(unhealthy_backend("b1", 9021));
    list.add_backend(unhealthy_backend("b2", 9022));

    // Two consecutive fail-open decisions.
    assert!(list.next_available_backend().is_some());
    assert!(list.fail_open_warned, "first fail-open must latch");
    assert!(list.next_available_backend().is_some());
    assert!(
        list.fail_open_warned,
        "subsequent fail-open routing keeps the latch"
    );

    // Heal the first backend: routing now takes the regular path, which
    // must clear the latch (regime exit logged once).
    {
        let mut first = list.backends[0].borrow_mut();
        first.health.status = HealthStatus::Healthy;
    }
    assert!(
        list.next_available_backend().is_some(),
        "regular path must select the healed backend"
    );
    assert!(
        !list.fail_open_warned,
        "regime exit must clear the latch so the next entry is logged again"
    );
}
1484
1485    // ── #892: per-cluster availability tracker ──────────────────────────
1486
1487    /// Build a backend that passes the `evaluate_availability` predicate
1488    /// (`status == Normal && health.is_healthy() && !retry_policy.is_down()`).
1489    /// A `Backend::new` returns Normal/Healthy with a fresh
1490    /// ExponentialBackoffPolicy that reports `is_down() == false` until the
1491    /// first `fail()`, so this is just `Backend::new` with a stable address.
1492    fn healthy_backend(id: &str, port: u16) -> Backend {
1493        Backend::new(
1494            id,
1495            format!("127.0.0.1:{port}").parse().unwrap(),
1496            None,
1497            None,
1498            None,
1499        )
1500    }
1501
#[test]
fn is_available_requires_health_status_and_retry_policy() {
    let address = "127.0.0.1:9050".parse().unwrap();
    let mut backend = Backend::new("b", address, None, None, None);

    // Baseline: Healthy, Normal, fresh (OKAY) retry policy.
    assert!(backend.is_available(), "fresh backend must be available");

    // Losing health alone is enough to fail the predicate.
    backend.health.record_failure(1);
    assert!(!backend.is_available(), "unhealthy must not be available");

    // Back to healthy, then exhaust the retry budget through the test-only
    // helper: repeated `fail()` calls would early-return inside the current
    // back-off window and need real-time sleeps to progress.
    backend.health.status = HealthStatus::Healthy;
    assert!(backend.is_available());
    backend.retry_policy.force_down();
    assert!(
        backend.retry_policy.is_down(),
        "test setup: retry policy budget must be exhausted"
    );
    assert!(
        !backend.is_available(),
        "retry-policy backoff must fail the predicate"
    );

    // Clear the retry state, then move the lifecycle to Closing.
    backend.retry_policy.succeed();
    backend.set_closing();
    assert!(
        !backend.is_available(),
        "Closing lifecycle status must fail the predicate"
    );
}
1537
1538    #[test]
1539    fn evaluate_availability_empty_list_returns_zero_zero() {
1540        let list = BackendList::new();
1541        assert_eq!((0, 0), list.evaluate_availability());
1542    }
1543
1544    #[test]
1545    fn evaluate_availability_counts_only_healthy_normal_not_in_backoff() {
1546        let mut list = BackendList::new();
1547        list.add_backend(healthy_backend("b-ok-1", 9101));
1548        list.add_backend(healthy_backend("b-ok-2", 9102));
1549        list.add_backend(unhealthy_backend("b-bad", 9103));
1550        let (available, total) = list.evaluate_availability();
1551        assert_eq!(3, total, "every configured backend counts toward total");
1552        assert_eq!(
1553            2, available,
1554            "only the two healthy backends pass the predicate"
1555        );
1556    }
1557
1558    #[test]
1559    fn evaluate_availability_excludes_retry_policy_down() {
1560        let mut list = BackendList::new();
1561        list.add_backend(healthy_backend("b-fresh", 9111));
1562        list.add_backend(healthy_backend("b-fail", 9112));
1563        // Drive the second backend's retry policy into is_down() via the
1564        // test-only force_down() helper. A natural exhaustion would
1565        // require waiting through the exponential-backoff windows
1566        // (`fail()` early-returns when called inside one).
1567        list.backends[1].borrow_mut().retry_policy.force_down();
1568        let (available, total) = list.evaluate_availability();
1569        assert_eq!(2, total);
1570        assert_eq!(
1571            1, available,
1572            "retry-policy is_down() backend must be excluded even when health.is_healthy()"
1573        );
1574    }
1575
1576    #[test]
1577    fn record_cluster_availability_flips_to_alldown_then_idempotent() {
1578        let mut map = BackendMap::new();
1579        let cluster_id = "c-flap";
1580        map.add_backend(cluster_id, unhealthy_backend("b1", 9201));
1581        // After add_backend the helper has run; total=1, available=0,
1582        // so the cell must already be AllDown.
1583        let list = map.backends.get(cluster_id).expect("cluster present");
1584        assert_eq!(
1585            ClusterAvailability::AllDown,
1586            list.availability.get(),
1587            "single unhealthy backend must drive the cell to AllDown"
1588        );
1589        // Calling again in the same regime is a no-op (Cell already AllDown).
1590        map.record_cluster_availability(cluster_id);
1591        let list = map.backends.get(cluster_id).expect("cluster present");
1592        assert_eq!(
1593            ClusterAvailability::AllDown,
1594            list.availability.get(),
1595            "repeat call must keep the cell at AllDown without flipping"
1596        );
1597    }
1598
1599    #[test]
1600    fn record_cluster_availability_recovers_to_available() {
1601        let mut map = BackendMap::new();
1602        let cluster_id = "c-recover";
1603        map.add_backend(cluster_id, unhealthy_backend("b1", 9301));
1604        assert_eq!(
1605            ClusterAvailability::AllDown,
1606            map.backends.get(cluster_id).unwrap().availability.get()
1607        );
1608        // Heal the backend in place and re-evaluate. Without going through
1609        // a routing call the helper still fires from add_backend / HC, so
1610        // here we drive it manually.
1611        map.backends.get_mut(cluster_id).unwrap().backends[0]
1612            .borrow_mut()
1613            .health
1614            .status = HealthStatus::Healthy;
1615        map.record_cluster_availability(cluster_id);
1616        assert_eq!(
1617            ClusterAvailability::Available,
1618            map.backends.get(cluster_id).unwrap().availability.get(),
1619            "healed backend must flip the cell back to Available"
1620        );
1621    }
1622
1623    #[test]
1624    fn record_cluster_availability_empty_cluster_stays_available() {
1625        let mut map = BackendMap::new();
1626        let cluster_id = "c-empty";
1627        map.backends
1628            .insert(cluster_id.to_owned(), BackendList::new());
1629        // total == 0 path: never report AllDown — avoids log spam during
1630        // cluster bootstrap when backends are still being registered.
1631        map.record_cluster_availability(cluster_id);
1632        assert_eq!(
1633            ClusterAvailability::Available,
1634            map.backends.get(cluster_id).unwrap().availability.get(),
1635            "empty cluster must keep the cell at the default Available"
1636        );
1637    }
1638
1639    #[test]
1640    fn record_cluster_availability_missing_cluster_is_noop() {
1641        let map = BackendMap::new();
1642        // No panic, no insert — just an early return.
1643        map.record_cluster_availability("c-absent");
1644        assert!(
1645            !map.backends.contains_key("c-absent"),
1646            "helper must not insert a BackendList for an unknown cluster_id"
1647        );
1648    }
1649
1650    #[test]
1651    fn import_configuration_state_latches_cluster_rollup_gauges() {
1652        use crate::metrics::METRICS;
1653        use sozu_command_lib::proto::command::QueryMetricsOptions;
1654        // Unique cluster id so the assertion is not perturbed by gauges
1655        // left in the thread-local METRICS aggregator by sibling tests.
1656        let cluster_id = "c-import-rollup-9701";
1657        let mut map = BackendMap::new();
1658        let mut input = HashMap::new();
1659        input.insert(
1660            cluster_id.to_owned(),
1661            vec![sozu_command_lib::response::Backend {
1662                cluster_id: cluster_id.to_owned(),
1663                backend_id: "b1".to_owned(),
1664                address: "127.0.0.1:9701".parse().unwrap(),
1665                sticky_id: None,
1666                load_balancing_parameters: None,
1667                backup: None,
1668            }],
1669        );
1670        map.import_configuration_state(&input);
1671        let response = METRICS
1672            .with(|m| {
1673                m.borrow_mut().query(&QueryMetricsOptions {
1674                    metric_names: vec![
1675                        names::cluster::AVAILABLE_BACKENDS.to_owned(),
1676                        names::cluster::TOTAL_BACKENDS.to_owned(),
1677                    ],
1678                    cluster_ids: vec![cluster_id.to_owned()],
1679                    backend_ids: vec![],
1680                    list: false,
1681                    no_clusters: false,
1682                    workers: false,
1683                })
1684            })
1685            .expect("metrics query succeeds");
1686        let cluster_metrics = match response.content_type {
1687            Some(
1688                sozu_command_lib::proto::command::response_content::ContentType::WorkerMetrics(wm),
1689            ) => wm,
1690            other => panic!("expected WorkerMetrics, got {other:?}"),
1691        };
1692        let cm = cluster_metrics
1693            .clusters
1694            .get(cluster_id)
1695            .expect("imported cluster must have a ClusterMetrics entry");
1696        // Without the import-time `record_cluster_availability` call the
1697        // two rollup gauges would be absent here. The fix guarantees the
1698        // pair lands without waiting for any follow-up backend mutation.
1699        assert!(
1700            cm.cluster.contains_key(names::cluster::AVAILABLE_BACKENDS),
1701            "cluster.available_backends gauge must be latched at import time"
1702        );
1703        assert!(
1704            cm.cluster.contains_key(names::cluster::TOTAL_BACKENDS),
1705            "cluster.total_backends gauge must be latched at import time"
1706        );
1707    }
1708
1709    #[test]
1710    fn set_health_check_config_none_re_emits_rollup_after_reset() {
1711        let mut map = BackendMap::new();
1712        let cluster_id = "c-hc-reset";
1713        // Seed the cluster with an unhealthy backend so `add_backend`
1714        // drives the `availability` cell to AllDown.
1715        map.add_backend(cluster_id, unhealthy_backend("b1", 9801));
1716        assert_eq!(
1717            ClusterAvailability::AllDown,
1718            map.backends.get(cluster_id).unwrap().availability.get(),
1719            "test setup: unhealthy backend must register the cell at AllDown"
1720        );
1721        // Disabling the health check resets backend health to the default
1722        // pristine state AND must re-emit the rollup so the cell reflects
1723        // the post-reset availability instead of the stale AllDown.
1724        map.set_health_check_config(cluster_id, None);
1725        assert_eq!(
1726            ClusterAvailability::Available,
1727            map.backends.get(cluster_id).unwrap().availability.get(),
1728            "set_health_check_config(None) must re-emit the rollup after \
1729             resetting backend health, otherwise dashboards stay stuck at AllDown"
1730        );
1731    }
1732}