Skip to main content

sozu_lib/
backends.rs

1use std::{
2    cell::{Cell, RefCell},
3    collections::HashMap,
4    net::SocketAddr,
5    rc::Rc,
6    time::Duration,
7};
8
9use mio::net::TcpStream;
10use sozu_command::{
11    proto::command::{
12        Event, EventKind, HealthCheckConfig, LoadBalancingAlgorithms, LoadBalancingParams,
13        LoadMetric,
14    },
15    state::ClusterId,
16};
17
18use crate::metrics::names;
19use crate::{
20    PeakEWMA,
21    load_balancing::{LeastLoaded, LoadBalancingAlgorithm, PowerOfTwo, Random, RoundRobin},
22    retry::{self, RetryPolicy},
23    server::{self, push_event},
24};
25
26#[derive(thiserror::Error, Debug)]
27pub enum BackendError {
28    #[error("No backend found for cluster {0}")]
29    NoBackendForCluster(String),
30    #[error("Failed to connect to socket with MIO: {0}")]
31    MioConnection(std::io::Error),
32    #[error("This backend is not in a normal status: status={0:?}")]
33    Status(BackendStatus),
34    #[error("could not connect {cluster_id} to {backend_address:?} ({failures} failures): {error}")]
35    ConnectionFailures {
36        cluster_id: String,
37        backend_address: SocketAddr,
38        failures: usize,
39        error: String,
40    },
41}
42
/// Administrative lifecycle of a backend.
///
/// A backend starts `Normal`. `Backend::set_closing` moves it to `Closing`,
/// and it becomes `Closed` once `Backend::dec_connections` releases its
/// last active connection.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum BackendStatus {
    /// Accepting new connections.
    Normal,
    /// Draining: no new connections, existing ones still counted.
    Closing,
    /// Drained: no active connections remain.
    Closed,
}
49
/// Outcome of active health checking for a single backend, as tracked by
/// `HealthState`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum HealthStatus {
    /// Eligible for routing (the default state).
    Healthy,
    /// Failed enough consecutive probes to be excluded from normal routing.
    Unhealthy,
}
55
/// Latched availability of a whole cluster, stored on `BackendList`.
///
/// `BackendMap::record_cluster_availability` recomputes it on every call:
/// the cluster is `AllDown` only when it has at least one backend and none
/// of them is available (healthy, `Normal`, retry policy not down).
/// Otherwise it is `Available`. A cluster with zero backends therefore
/// stays `Available`, which keeps bootstrap (backends not yet registered)
/// from producing "all backends down" noise.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub(crate) enum ClusterAvailability {
    /// At least one backend can serve traffic, or the cluster is empty.
    #[default]
    Available,
    /// Every configured backend is currently unavailable.
    AllDown,
}
68
69#[derive(Debug, Clone, PartialEq)]
70pub struct HealthState {
71    pub status: HealthStatus,
72    pub consecutive_successes: u32,
73    pub consecutive_failures: u32,
74}
75
76impl Default for HealthState {
77    fn default() -> Self {
78        HealthState {
79            status: HealthStatus::Healthy,
80            consecutive_successes: 0,
81            consecutive_failures: 0,
82        }
83    }
84}
85
86impl HealthState {
87    /// Record a successful health check. Returns true if the backend transitioned to healthy.
88    pub fn record_success(&mut self, healthy_threshold: u32) -> bool {
89        self.consecutive_failures = 0;
90        self.consecutive_successes += 1;
91
92        if self.status == HealthStatus::Unhealthy && self.consecutive_successes >= healthy_threshold
93        {
94            self.status = HealthStatus::Healthy;
95            return true;
96        }
97        false
98    }
99
100    /// Record a failed health check. Returns true if the backend transitioned to unhealthy.
101    pub fn record_failure(&mut self, unhealthy_threshold: u32) -> bool {
102        self.consecutive_successes = 0;
103        self.consecutive_failures += 1;
104
105        if self.status == HealthStatus::Healthy && self.consecutive_failures >= unhealthy_threshold
106        {
107            self.status = HealthStatus::Unhealthy;
108            return true;
109        }
110        false
111    }
112
113    pub fn is_healthy(&self) -> bool {
114        self.status == HealthStatus::Healthy
115    }
116}
117
118#[derive(Debug, PartialEq, Clone)]
119pub struct Backend {
120    pub sticky_id: Option<String>,
121    pub backend_id: String,
122    pub address: SocketAddr,
123    pub status: BackendStatus,
124    pub retry_policy: retry::RetryPolicyWrapper,
125    pub active_connections: usize,
126    pub active_requests: usize,
127    pub failures: usize,
128    pub load_balancing_parameters: Option<LoadBalancingParams>,
129    pub backup: bool,
130    pub connection_time: PeakEWMA,
131    pub health: HealthState,
132}
133
134impl Backend {
135    pub fn new(
136        backend_id: &str,
137        address: SocketAddr,
138        sticky_id: Option<String>,
139        load_balancing_parameters: Option<LoadBalancingParams>,
140        backup: Option<bool>,
141    ) -> Backend {
142        let desired_policy = retry::ExponentialBackoffPolicy::new(6);
143        Backend {
144            sticky_id,
145            backend_id: backend_id.to_owned(),
146            address,
147            status: BackendStatus::Normal,
148            retry_policy: desired_policy.into(),
149            active_connections: 0,
150            active_requests: 0,
151            failures: 0,
152            load_balancing_parameters,
153            backup: backup.unwrap_or(false),
154            connection_time: PeakEWMA::new(),
155            health: HealthState::default(),
156        }
157    }
158
159    pub fn set_closing(&mut self) {
160        self.status = BackendStatus::Closing;
161    }
162
163    pub fn retry_policy(&mut self) -> &mut retry::RetryPolicyWrapper {
164        &mut self.retry_policy
165    }
166
167    pub fn can_open(&self) -> bool {
168        if !self.health.is_healthy() {
169            return false;
170        }
171        if let Some(action) = self.retry_policy.can_try() {
172            self.status == BackendStatus::Normal && action == retry::RetryAction::OKAY
173        } else {
174            false
175        }
176    }
177
178    /// Canonical "available" check used by per-backend metrics and cluster
179    /// availability accounting. Slightly more permissive than `can_open()`:
180    /// a backend currently in an exponential-backoff *wait window* still
181    /// counts as available because the next call after the window ends
182    /// will route to it without operator intervention. The dashboard
183    /// reading must reflect "operationally up, not exhausted" rather than
184    /// "ready to receive *this* request" — flicking the gauge to 0 on
185    /// every transient backoff would drown out genuine `is_down()`
186    /// transitions. Pairs with `BackendList::evaluate_availability`,
187    /// which applies the same predicate cluster-wide.
188    pub fn is_available(&self) -> bool {
189        self.health.is_healthy()
190            && self.status == BackendStatus::Normal
191            && !self.retry_policy.is_down()
192    }
193
194    pub fn inc_connections(&mut self) -> Option<usize> {
195        if self.status == BackendStatus::Normal {
196            self.active_connections += 1;
197            Some(self.active_connections)
198        } else {
199            None
200        }
201    }
202
203    /// TODO: normalize with saturating_sub()
204    pub fn dec_connections(&mut self) -> Option<usize> {
205        match self.status {
206            BackendStatus::Normal => {
207                if self.active_connections > 0 {
208                    self.active_connections -= 1;
209                }
210                Some(self.active_connections)
211            }
212            BackendStatus::Closed => None,
213            BackendStatus::Closing => {
214                if self.active_connections > 0 {
215                    self.active_connections -= 1;
216                }
217                if self.active_connections == 0 {
218                    self.status = BackendStatus::Closed;
219                    None
220                } else {
221                    Some(self.active_connections)
222                }
223            }
224        }
225    }
226
227    pub fn set_connection_time(&mut self, dur: Duration) {
228        self.connection_time.observe(dur.as_nanos() as f64);
229    }
230
231    pub fn peak_ewma_connection(&mut self) -> f64 {
232        self.connection_time.get(self.active_connections)
233    }
234
235    pub fn try_connect(&mut self) -> Result<mio::net::TcpStream, BackendError> {
236        if self.status != BackendStatus::Normal {
237            return Err(BackendError::Status(self.status.to_owned()));
238        }
239
240        match mio::net::TcpStream::connect(self.address) {
241            Ok(tcp_stream) => {
242                //self.retry_policy.succeed();
243                self.inc_connections();
244                Ok(tcp_stream)
245            }
246            Err(io_error) => {
247                self.retry_policy.fail();
248                self.failures += 1;
249                // TODO: handle EINPROGRESS. It is difficult. It is discussed here:
250                // https://docs.rs/mio/latest/mio/net/struct.TcpStream.html#method.connect
251                // with an example code here:
252                // https://github.com/Thomasdezeeuw/heph/blob/0c4f1ab3eaf08bea1d65776528bfd6114c9f8374/src/net/tcp/stream.rs#L560-L622
253                Err(BackendError::MioConnection(io_error))
254            }
255        }
256    }
257}
258
259// when a backend has been removed from configuration and the last connection to
260// it has stopped, it will be dropped, so we can notify that the backend server
261// can be safely stopped
262impl std::ops::Drop for Backend {
263    fn drop(&mut self) {
264        server::push_event(Event {
265            kind: EventKind::RemovedBackendHasNoConnections as i32,
266            backend_id: Some(self.backend_id.to_owned()),
267            address: Some(self.address.into()),
268            cluster_id: None,
269            metric_detail: None,
270        });
271    }
272}
273
274#[derive(Debug)]
275pub struct BackendMap {
276    pub backends: HashMap<ClusterId, BackendList>,
277    pub max_failures: usize,
278    pub health_check_configs: HashMap<ClusterId, HealthCheckConfig>,
279    /// Whether the cluster's backends speak HTTP/2 (cluster.http2 = true).
280    /// Mirrors the same backend-capability hint the mux router reads at
281    /// `protocol/mux/router.rs::Router::connect`. The health checker uses
282    /// it to switch the probe wire format from HTTP/1.1 to h2c so an
283    /// h2c-only backend is not probed with an HTTP/1.1 preface that
284    /// would always fail.
285    pub cluster_http2: HashMap<ClusterId, bool>,
286}
287
288impl Default for BackendMap {
289    fn default() -> Self {
290        Self::new()
291    }
292}
293
294impl BackendMap {
295    pub fn new() -> BackendMap {
296        BackendMap {
297            backends: HashMap::new(),
298            max_failures: 3,
299            health_check_configs: HashMap::new(),
300            cluster_http2: HashMap::new(),
301        }
302    }
303
304    /// Re-evaluate the availability of `cluster_id`, publish the
305    /// `cluster.available_backends` / `cluster.total_backends` gauges,
306    /// and emit the transition log + counter + `Event` exactly when the
307    /// per-cluster state flips between `Available` and `AllDown`.
308    ///
309    /// Empty clusters (`total == 0`) never report `AllDown` — avoids log
310    /// spam during cluster bootstrap when backends are still being
311    /// registered. The (0, 0) gauges are still published so dashboards
312    /// see "cluster exists, zero backends configured" as a distinct
313    /// state from "cluster doesn't exist".
314    ///
315    /// Takes `&self` so callers that already hold `&mut BackendMap`
316    /// can drop their `&mut BackendList` borrow before invoking it
317    /// without re-borrowing.
318    pub(crate) fn record_cluster_availability(&self, cluster_id: &str) {
319        let Some(list) = self.backends.get(cluster_id) else {
320            return;
321        };
322
323        let (available, total) = list.evaluate_availability();
324        gauge!(
325            names::cluster::AVAILABLE_BACKENDS,
326            available,
327            Some(cluster_id),
328            None
329        );
330        gauge!(
331            names::cluster::TOTAL_BACKENDS,
332            total,
333            Some(cluster_id),
334            None
335        );
336
337        let new_state = if total > 0 && available == 0 {
338            ClusterAvailability::AllDown
339        } else {
340            ClusterAvailability::Available
341        };
342
343        let prev = list.availability.replace(new_state);
344        if prev == new_state {
345            return;
346        }
347        match (prev, new_state) {
348            (ClusterAvailability::Available, ClusterAvailability::AllDown) => {
349                error!("cluster {}: all {} backends are down", cluster_id, total);
350                incr!(
351                    names::cluster::NO_AVAILABLE_BACKENDS,
352                    Some(cluster_id),
353                    None
354                );
355                push_event(Event {
356                    kind: EventKind::NoAvailableBackends as i32,
357                    cluster_id: Some(cluster_id.to_owned()),
358                    backend_id: None,
359                    address: None,
360                    metric_detail: None,
361                });
362            }
363            (ClusterAvailability::AllDown, ClusterAvailability::Available) => {
364                info!(
365                    "cluster {}: backends recovered ({}/{} available)",
366                    cluster_id, available, total
367                );
368                incr!(names::cluster::AVAILABLE_RECOVERED, Some(cluster_id), None);
369                push_event(Event {
370                    kind: EventKind::ClusterRecovered as i32,
371                    cluster_id: Some(cluster_id.to_owned()),
372                    backend_id: None,
373                    address: None,
374                    metric_detail: None,
375                });
376            }
377            _ => {}
378        }
379    }
380
381    /// Record (or clear) the `cluster.http2` backend-capability hint for
382    /// `cluster_id`. The health checker reads the resulting map at probe
383    /// time so the wire format follows what the mux router will use to
384    /// connect to the same backends.
385    pub fn set_cluster_http2(&mut self, cluster_id: &str, http2: bool) {
386        if http2 {
387            self.cluster_http2.insert(cluster_id.to_owned(), true);
388        } else {
389            self.cluster_http2.remove(cluster_id);
390        }
391    }
392
393    pub fn set_health_check_config(&mut self, cluster_id: &str, config: Option<HealthCheckConfig>) {
394        match config {
395            Some(c) => {
396                self.health_check_configs.insert(cluster_id.to_owned(), c);
397            }
398            None => {
399                self.health_check_configs.remove(cluster_id);
400                // When the operator drops the health check, any
401                // previously-recorded `HealthState::Unhealthy` would
402                // otherwise stick — `next_available_backend` keeps
403                // skipping the backend even though we have stopped
404                // probing it. Reset every backend in the cluster to a
405                // pristine healthy state so the load balancer can
406                // route again.
407                if let Some(backend_list) = self.backends.get(cluster_id) {
408                    for backend in &backend_list.backends {
409                        backend.borrow_mut().health = HealthState::default();
410                    }
411                }
412                // Re-emit the rollup gauges so dashboards reflect the
413                // post-reset availability instead of holding the last
414                // health-check value indefinitely.
415                self.record_cluster_availability(cluster_id);
416            }
417        }
418    }
419
420    pub fn import_configuration_state(
421        &mut self,
422        backends: &HashMap<ClusterId, Vec<sozu_command::response::Backend>>,
423    ) {
424        self.backends
425            .extend(backends.iter().map(|(cluster_id, backend_vec)| {
426                (
427                    cluster_id.to_string(),
428                    BackendList::import_configuration_state(backend_vec),
429                )
430            }));
431        // Replay path inserts every cluster's backend list without
432        // touching the gauge emission sites used by add/remove/health.
433        // Latch `cluster.available_backends` and `.total_backends` here
434        // so a freshly-loaded worker reports correct values on the very
435        // first `QueryMetrics` instead of zero until something else
436        // mutates each cluster.
437        for cluster_id in backends.keys() {
438            self.record_cluster_availability(cluster_id);
439        }
440    }
441
442    pub fn add_backend(&mut self, cluster_id: &str, backend: Backend) {
443        self.backends
444            .entry(cluster_id.to_string())
445            .or_default()
446            .add_backend(backend);
447        // Publish initial gauges and surface the corner case where a fresh
448        // cluster's first backend is already down (e.g. registered with a
449        // pre-existing failed retry policy). For an `Available` initial
450        // backend this is just a (1, 1) gauge emission with no transition.
451        self.record_cluster_availability(cluster_id);
452    }
453
454    // TODO: return <Result, BackendError>, log the error downstream
455    /// Remove every backend at `backend_address` from `cluster_id` and
456    /// return the list of `backend_id`s that were dropped. Callers (e.g.
457    /// `Server::remove_backend`) iterate over the returned ids to tear
458    /// down per-backend metrics so the identity used by the runtime
459    /// (address-keyed) matches the identity used by the metrics layer
460    /// (id-keyed) — see PR #1252 follow-up review MEDIUM-3.
461    pub fn remove_backend(
462        &mut self,
463        cluster_id: &str,
464        backend_address: &SocketAddr,
465    ) -> Vec<String> {
466        let removed = if let Some(backends) = self.backends.get_mut(cluster_id) {
467            backends.remove_backend(backend_address)
468        } else {
469            error!(
470                "Backend was already removed: cluster id {}, address {:?}",
471                cluster_id, backend_address
472            );
473            return Vec::new();
474        };
475        // Re-evaluate so removing the last backend logs an explicit
476        // `AllDown` transition (or, with `total == 0`, drops back to
477        // silent gauges).
478        self.record_cluster_availability(cluster_id);
479        removed
480    }
481
482    // TODO: return <Result, BackendError>, log the error downstream
483    pub fn close_backend_connection(&mut self, cluster_id: &str, addr: &SocketAddr) {
484        if let Some(cluster_backends) = self.backends.get_mut(cluster_id) {
485            if let Some(ref mut backend) = cluster_backends.find_backend(addr) {
486                backend.borrow_mut().dec_connections();
487            }
488        }
489    }
490
491    pub fn has_backend(&self, cluster_id: &str, backend: &Backend) -> bool {
492        self.backends
493            .get(cluster_id)
494            .map(|backends| backends.has_backend(&backend.address))
495            .unwrap_or(false)
496    }
497
498    pub fn backend_from_cluster_id(
499        &mut self,
500        cluster_id: &str,
501    ) -> Result<(Rc<RefCell<Backend>>, TcpStream), BackendError> {
502        let cluster_backends = self
503            .backends
504            .get_mut(cluster_id)
505            .ok_or(BackendError::NoBackendForCluster(cluster_id.to_owned()))?;
506
507        if cluster_backends.backends.is_empty() {
508            // Drop the &mut BackendList borrow before the &self helper call.
509            // `total == 0` falls into the "never report AllDown" branch in
510            // record_cluster_availability, so this just publishes the (0, 0)
511            // gauges.
512            let _ = cluster_backends;
513            self.record_cluster_availability(cluster_id);
514            return Err(BackendError::NoBackendForCluster(cluster_id.to_owned()));
515        }
516
517        let next_backend = match cluster_backends.next_available_backend() {
518            Some(nb) => nb,
519            None => {
520                // Drop the &mut BackendList before the &self helper call.
521                // The helper observes (available=0, total>0) and emits the
522                // Available -> AllDown transition (log + counter + Event)
523                // exactly once per regime entry. Subsequent calls in the
524                // same AllDown regime are no-ops.
525                let _ = cluster_backends;
526                self.record_cluster_availability(cluster_id);
527                return Err(BackendError::NoBackendForCluster(cluster_id.to_owned()));
528            }
529        };
530
531        let tcp_stream = {
532            let mut borrowed_backend = next_backend.borrow_mut();
533
534            debug!(
535                "Connecting {} -> {:?}",
536                cluster_id,
537                (
538                    borrowed_backend.address,
539                    borrowed_backend.active_connections,
540                    borrowed_backend.failures
541                )
542            );
543
544            borrowed_backend.try_connect().map_err(|backend_error| {
545                BackendError::ConnectionFailures {
546                    cluster_id: cluster_id.to_owned(),
547                    backend_address: borrowed_backend.address,
548                    failures: borrowed_backend.failures,
549                    error: backend_error.to_string(),
550                }
551            })?
552        };
553
554        // Connection succeeded: re-evaluate so we capture an
555        // AllDown -> Available recovery transition the moment a request
556        // first hits a healthy backend after an outage. `next_backend` is
557        // not borrowed here (the inner block dropped `borrowed_backend`),
558        // so the helper's `BackendList::evaluate_availability` walk is
559        // free to call `borrow()` on every backend.
560        let _ = cluster_backends;
561        self.record_cluster_availability(cluster_id);
562
563        Ok((next_backend.clone(), tcp_stream))
564    }
565
566    pub fn backend_from_sticky_session(
567        &mut self,
568        cluster_id: &str,
569        sticky_session: &str,
570    ) -> Result<(Rc<RefCell<Backend>>, TcpStream), BackendError> {
571        let sticky_conn = self
572            .backends
573            .get_mut(cluster_id)
574            .and_then(|cluster_backends| cluster_backends.find_sticky(sticky_session))
575            .map(|backend| {
576                let mut borrowed = backend.borrow_mut();
577                let conn = borrowed.try_connect();
578
579                conn.map(|tcp_stream| (backend.clone(), tcp_stream))
580                    .inspect_err(|_| {
581                        error!(
582                            "could not connect {} to {:?} using session {} ({} failures)",
583                            cluster_id, borrowed.address, sticky_session, borrowed.failures
584                        )
585                    })
586            });
587
588        match sticky_conn {
589            Some(backend_and_stream) => backend_and_stream,
590            None => {
591                debug!(
592                    "Couldn't find a backend corresponding to sticky_session {} for cluster {}",
593                    sticky_session, cluster_id
594                );
595                self.backend_from_cluster_id(cluster_id)
596            }
597        }
598    }
599
600    pub fn set_load_balancing_policy_for_cluster(
601        &mut self,
602        cluster_id: &str,
603        lb_algo: LoadBalancingAlgorithms,
604        metric: Option<LoadMetric>,
605    ) {
606        // The cluster can be created before the backends were registered because of the async config messages.
607        // So when we set the load balancing policy, we have to create the backend list if if it doesn't exist yet.
608        let cluster_backends = self.get_or_create_backend_list_for_cluster(cluster_id);
609        cluster_backends.set_load_balancing_policy(lb_algo, metric);
610    }
611
612    pub fn get_or_create_backend_list_for_cluster(&mut self, cluster_id: &str) -> &mut BackendList {
613        self.backends.entry(cluster_id.to_string()).or_default()
614    }
615}
616
617#[derive(Debug)]
618pub struct BackendList {
619    pub backends: Vec<Rc<RefCell<Backend>>>,
620    pub next_id: u32,
621    pub load_balancing: Box<dyn LoadBalancingAlgorithm>,
622    /// Latches the fail-open `warn!`. Set to `true` when fail-open routing
623    /// emits its entry warning so subsequent routing decisions in the same
624    /// regime stay quiet; reset to `false` when a healthy backend is
625    /// available again, so the regime-exit transition is logged exactly once.
626    /// Without this latch the warning fired per request, which under a
627    /// universal outage (the exact scenario fail-open targets) is also the
628    /// highest request-rate scenario — log volume would become catastrophic.
629    fail_open_warned: bool,
630    /// Per-cluster availability latched by `BackendMap::record_cluster_availability`.
631    /// `Cell` (not `RefCell`) because the receiver is `&self` and the
632    /// state is `Copy`. Worker runtime is single-threaded, so a `Cell` is
633    /// sound — no synchronisation needed.
634    pub(crate) availability: Cell<ClusterAvailability>,
635}
636
637impl Default for BackendList {
638    fn default() -> Self {
639        Self::new()
640    }
641}
642
643impl BackendList {
644    pub fn new() -> BackendList {
645        BackendList {
646            backends: Vec::new(),
647            next_id: 0,
648            load_balancing: Box::new(Random),
649            fail_open_warned: false,
650            availability: Cell::new(ClusterAvailability::Available),
651        }
652    }
653
654    /// Count `(available, total)` for this cluster. Delegates to
655    /// `Backend::is_available` so the per-cluster aggregate and the
656    /// per-backend `backend.available` gauge stay in lock-step.
657    pub(crate) fn evaluate_availability(&self) -> (usize, usize) {
658        let total = self.backends.len();
659        let available = self
660            .backends
661            .iter()
662            .filter(|b| b.borrow().is_available())
663            .count();
664        (available, total)
665    }
666
667    pub fn import_configuration_state(
668        backend_vec: &[sozu_command_lib::response::Backend],
669    ) -> BackendList {
670        let mut list = BackendList::new();
671        for backend in backend_vec {
672            let backend = Backend::new(
673                &backend.backend_id,
674                backend.address,
675                backend.sticky_id.clone(),
676                backend.load_balancing_parameters,
677                backend.backup,
678            );
679            list.add_backend(backend);
680        }
681
682        list
683    }
684
685    pub fn add_backend(&mut self, backend: Backend) {
686        match self.backends.iter_mut().find(|b| {
687            b.borrow().address == backend.address && b.borrow().backend_id == backend.backend_id
688        }) {
689            None => {
690                let backend = Rc::new(RefCell::new(backend));
691                self.backends.push(backend);
692                self.next_id += 1;
693            }
694            // the backend already exists, update the configuration while
695            // keeping connection retry state
696            Some(old_backend) => {
697                let mut b = old_backend.borrow_mut();
698                b.sticky_id.clone_from(&backend.sticky_id);
699                b.load_balancing_parameters
700                    .clone_from(&backend.load_balancing_parameters);
701                b.backup = backend.backup;
702            }
703        }
704    }
705
706    /// Remove every backend at `backend_address` and return the list of
707    /// `backend_id`s that were dropped. Two backends with the same address
708    /// but distinct ids (A/B test, weighted variant, dedup race) are both
709    /// removed here; the caller relies on the returned ids to tear down
710    /// matching per-backend state (metrics, health-check). Returning the
711    /// ids closes the identity drift between runtime-removal-by-address
712    /// and metrics-removal-by-id.
713    pub fn remove_backend(&mut self, backend_address: &SocketAddr) -> Vec<String> {
714        let mut removed = Vec::new();
715        self.backends.retain(|backend| {
716            let b = backend.borrow();
717            if &b.address == backend_address {
718                removed.push(b.backend_id.clone());
719                false
720            } else {
721                true
722            }
723        });
724        removed
725    }
726
727    pub fn has_backend(&self, backend_address: &SocketAddr) -> bool {
728        self.backends
729            .iter()
730            .any(|backend| backend.borrow().address == *backend_address)
731    }
732
733    pub fn find_backend(
734        &mut self,
735        backend_address: &SocketAddr,
736    ) -> Option<&mut Rc<RefCell<Backend>>> {
737        self.backends
738            .iter_mut()
739            .find(|backend| backend.borrow().address == *backend_address)
740    }
741
742    pub fn find_sticky(&mut self, sticky_session: &str) -> Option<&mut Rc<RefCell<Backend>>> {
743        self.backends
744            .iter_mut()
745            .find(|b| b.borrow().sticky_id.as_deref() == Some(sticky_session))
746            .and_then(|b| if b.borrow().can_open() { Some(b) } else { None })
747    }
748
749    pub fn available_backends(&mut self, backup: bool) -> Vec<Rc<RefCell<Backend>>> {
750        self.backends
751            .iter()
752            .filter(|backend| {
753                let owned = backend.borrow();
754                owned.backup == backup && owned.can_open()
755            })
756            .map(Clone::clone)
757            .collect()
758    }
759
760    pub fn next_available_backend(&mut self) -> Option<Rc<RefCell<Backend>>> {
761        let mut backends = self.available_backends(false);
762
763        if backends.is_empty() {
764            backends = self.available_backends(true);
765        }
766
767        if !backends.is_empty() {
768            // Healthy regime: log the fail-open exit transition exactly once.
769            if self.fail_open_warned {
770                info!(
771                    "fail-open: cluster recovered, {} backends now healthy",
772                    backends.len()
773                );
774                self.fail_open_warned = false;
775            }
776            return self.load_balancing.next_available_backend(&mut backends);
777        }
778
779        // Fail-open: when no backend passes the full `can_open()` gate,
780        // route to backends that are administratively `Normal` AND whose
781        // retry policy reports `OKAY` (i.e., not currently in
782        // exponential-backoff). This prevents a shared dependency outage
783        // (e.g., database) from making the entire cluster unavailable while
784        // still respecting the per-backend back-off window — hammering a
785        // backend at line rate during its back-off would defeat the back-off
786        // itself. Ref: Amazon "Implementing Health Checks".
787        backends = self
788            .backends
789            .iter()
790            .filter(|b| {
791                let owned = b.borrow();
792                owned.status == BackendStatus::Normal
793                    && matches!(owned.retry_policy.can_try(), Some(retry::RetryAction::OKAY))
794            })
795            .map(Clone::clone)
796            .collect();
797
798        if backends.is_empty() {
799            return None;
800        }
801
802        // Latched warning + per-decision counter: the warn! fires once on
803        // regime entry; the counter is the operator-visible per-request
804        // signal that does not drown logs under universal outage.
805        if !self.fail_open_warned {
806            warn!(
807                "fail-open: all backends unhealthy, routing to {} normal backends with retry-policy OKAY",
808                backends.len()
809            );
810            self.fail_open_warned = true;
811        }
812        count!(names::backend::FAIL_OPEN, 1);
813
814        self.load_balancing.next_available_backend(&mut backends)
815    }
816
817    pub fn set_load_balancing_policy(
818        &mut self,
819        load_balancing_policy: LoadBalancingAlgorithms,
820        metric: Option<LoadMetric>,
821    ) {
822        match load_balancing_policy {
823            LoadBalancingAlgorithms::RoundRobin => {
824                self.load_balancing = Box::new(RoundRobin::new())
825            }
826            LoadBalancingAlgorithms::Random => self.load_balancing = Box::new(Random {}),
827            LoadBalancingAlgorithms::LeastLoaded => {
828                self.load_balancing = Box::new(LeastLoaded {
829                    metric: metric.unwrap_or(LoadMetric::Connections),
830                })
831            }
832            LoadBalancingAlgorithms::PowerOfTwo => {
833                self.load_balancing = Box::new(PowerOfTwo {
834                    metric: metric.unwrap_or(LoadMetric::Connections),
835                })
836            }
837        }
838    }
839}
840
#[cfg(test)]
mod backends_test {

    use std::{net::TcpListener, sync::mpsc::*, thread};

    use super::*;

    /// Spawn a detached thread that accepts (and immediately drops) TCP
    /// connections on `addr`, so backend connection attempts have a peer.
    ///
    /// The listener is bound before this function returns. The thread stops
    /// once a message has been sent on `stopper` or its sender is dropped.
    /// Because `accept` blocks, the stop condition is only observed after
    /// the next incoming connection.
    fn run_mock_tcp_server(addr: &str, stopper: Receiver<()>) {
        let listener = TcpListener::bind(addr).unwrap();

        thread::spawn(move || {
            for _stream in listener.incoming() {
                // Accept connections. Leave the loop on an explicit stop
                // signal or a disconnected sender. Merely flagging the stop
                // would never end this `incoming()` loop.
                if !matches!(stopper.try_recv(), Err(TryRecvError::Empty)) {
                    break;
                }
            }
        });
    }

    #[test]
    fn it_should_retrieve_a_backend_from_cluster_id_when_backends_have_been_recorded() {
        let mut backend_map = BackendMap::new();
        let cluster_id = "mycluster";

        let backend_addr = "127.0.0.1:1236";
        let (sender, receiver) = channel();
        run_mock_tcp_server(backend_addr, receiver);

        backend_map.add_backend(
            cluster_id,
            Backend::new(
                &format!("{cluster_id}-1"),
                backend_addr.parse().unwrap(),
                None,
                None,
                None,
            ),
        );

        assert!(backend_map.backend_from_cluster_id(cluster_id).is_ok());
        sender.send(()).unwrap();
    }

    #[test]
    fn it_should_not_retrieve_a_backend_from_cluster_id_when_backend_has_not_been_recorded() {
        let mut backend_map = BackendMap::new();
        let cluster_not_recorded = "not";
        backend_map.add_backend(
            "foo",
            Backend::new("foo-1", "127.0.0.1:9001".parse().unwrap(), None, None, None),
        );

        assert!(
            backend_map
                .backend_from_cluster_id(cluster_not_recorded)
                .is_err()
        );
    }

    #[test]
    fn it_should_not_retrieve_a_backend_from_cluster_id_when_backend_list_is_empty() {
        let mut backend_map = BackendMap::new();

        assert!(backend_map.backend_from_cluster_id("dumb").is_err());
    }

    #[test]
    fn it_should_retrieve_a_backend_from_sticky_session_when_the_backend_has_been_recorded() {
        let mut backend_map = BackendMap::new();
        let cluster_id = "mycluster";
        // Must match the sticky id of the backend served by the mock server
        // below ("server-3"), so the lookup exercises the listening backend.
        let sticky_session = "server-3";

        let backend_addr = "127.0.0.1:3456";
        let (sender, receiver) = channel();
        run_mock_tcp_server(backend_addr, receiver);

        backend_map.add_backend(
            cluster_id,
            Backend::new(
                &format!("{cluster_id}-1"),
                "127.0.0.1:9001".parse().unwrap(),
                Some("server-1".to_string()),
                None,
                None,
            ),
        );
        backend_map.add_backend(
            cluster_id,
            Backend::new(
                &format!("{cluster_id}-2"),
                "127.0.0.1:9000".parse().unwrap(),
                Some("server-2".to_string()),
                None,
                None,
            ),
        );
        // sticky backend
        backend_map.add_backend(
            cluster_id,
            Backend::new(
                &format!("{cluster_id}-3"),
                backend_addr.parse().unwrap(),
                Some("server-3".to_string()),
                None,
                None,
            ),
        );

        assert!(
            backend_map
                .backend_from_sticky_session(cluster_id, sticky_session)
                .is_ok()
        );
        sender.send(()).unwrap();
    }

    #[test]
    fn it_should_not_retrieve_a_backend_from_sticky_session_when_the_backend_has_not_been_recorded()
    {
        let mut backend_map = BackendMap::new();
        let cluster_id = "mycluster";
        let sticky_session = "test";

        assert!(
            backend_map
                .backend_from_sticky_session(cluster_id, sticky_session)
                .is_err()
        );
    }

    #[test]
    fn it_should_not_retrieve_a_backend_from_sticky_session_when_the_backend_list_is_empty() {
        let mut backend_map = BackendMap::new();
        let mycluster_not_recorded = "mycluster";
        let sticky_session = "test";

        assert!(
            backend_map
                .backend_from_sticky_session(mycluster_not_recorded, sticky_session)
                .is_err()
        );
    }

    #[test]
    fn it_should_add_a_backend_when_he_doesnt_already_exist() {
        let backend_id = "myback";
        let mut backends_list = BackendList::new();
        backends_list.add_backend(Backend::new(
            backend_id,
            "127.0.0.1:80".parse().unwrap(),
            None,
            None,
            None,
        ));

        assert_eq!(1, backends_list.backends.len());
    }

    #[test]
    fn it_should_not_add_a_backend_when_he_already_exist() {
        let backend_id = "myback";
        let mut backends_list = BackendList::new();
        backends_list.add_backend(Backend::new(
            backend_id,
            "127.0.0.1:80".parse().unwrap(),
            None,
            None,
            None,
        ));

        //same backend id
        backends_list.add_backend(Backend::new(
            backend_id,
            "127.0.0.1:80".parse().unwrap(),
            None,
            None,
            None,
        ));

        assert_eq!(1, backends_list.backends.len());
    }

    /// Build a backend addressed at 127.0.0.1:port and force it Unhealthy
    /// without going through the health-check loop.
    fn unhealthy_backend(id: &str, port: u16) -> Backend {
        let mut backend = Backend::new(
            id,
            format!("127.0.0.1:{port}").parse().unwrap(),
            None,
            None,
            None,
        );
        // Threshold = 1 transitions on the first failure.
        backend.health.record_failure(1);
        assert!(!backend.health.is_healthy());
        backend
    }

    #[test]
    fn fail_open_picks_normal_backend_in_retry_policy_okay() {
        // All backends are unhealthy but their retry policy is fresh (OKAY),
        // so fail-open must select one. A fresh ExponentialBackoffPolicy
        // returns OKAY on `can_try()` until the first `fail()` arms a wait
        // window.
        let mut list = BackendList::new();
        list.add_backend(unhealthy_backend("b1", 9001));
        list.add_backend(unhealthy_backend("b2", 9002));

        // Sanity: `available_backends` returns nothing (the regular path).
        assert!(list.available_backends(false).is_empty());
        assert!(list.available_backends(true).is_empty());

        let picked = list.next_available_backend();
        assert!(
            picked.is_some(),
            "fail-open must pick a Normal+OKAY backend"
        );
        assert!(list.fail_open_warned, "regime entry must latch the warn!");
    }

    #[test]
    fn fail_open_skips_backend_in_retry_backoff() {
        // Same shape as above, but each backend's retry policy is in the
        // WAIT window after a recorded failure. Fail-open must NOT pick any
        // of them — hammering a backend at line rate during its back-off
        // window is exactly what the back-off is protecting against — and
        // the regime-entry warn! must NOT latch (no log spam either).
        let mut list = BackendList::new();
        list.add_backend(unhealthy_backend("b1", 9011));
        list.add_backend(unhealthy_backend("b2", 9012));
        for backend_rc in &list.backends {
            backend_rc.borrow_mut().retry_policy().fail();
            assert_eq!(
                Some(retry::RetryAction::WAIT),
                backend_rc.borrow().retry_policy.can_try(),
                "test fixture must place retry policy in WAIT"
            );
        }

        let picked = list.next_available_backend();
        assert!(
            picked.is_none(),
            "fail-open must skip backends whose retry policy is in WAIT"
        );
        assert!(
            !list.fail_open_warned,
            "no candidate backends, no regime entry"
        );
    }

    #[test]
    fn fail_open_warn_latched() {
        // First call enters the regime → latch flips, warn! fires.
        // Second call stays in the regime → latch stays, no second warn!.
        // Recovering one backend → latch clears on the next routing call.
        let mut list = BackendList::new();
        list.add_backend(unhealthy_backend("b1", 9021));
        list.add_backend(unhealthy_backend("b2", 9022));

        assert!(list.next_available_backend().is_some());
        assert!(list.fail_open_warned, "first fail-open must latch");

        assert!(list.next_available_backend().is_some());
        assert!(
            list.fail_open_warned,
            "subsequent fail-open routing keeps the latch"
        );

        // Heal one backend — the next routing call takes the healthy path
        // and must clear the latch (regime exit logged once).
        list.backends[0].borrow_mut().health.status = HealthStatus::Healthy;
        let picked = list.next_available_backend();
        assert!(
            picked.is_some(),
            "regular path must select the healed backend"
        );
        assert!(
            !list.fail_open_warned,
            "regime exit must clear the latch so the next entry is logged again"
        );
    }

    // ── #892: per-cluster availability tracker ──────────────────────────

    /// Build a backend that passes the `evaluate_availability` predicate
    /// (`status == Normal && health.is_healthy() && !retry_policy.is_down()`).
    /// A `Backend::new` returns Normal/Healthy with a fresh
    /// ExponentialBackoffPolicy that reports `is_down() == false` until the
    /// first `fail()`, so this is just `Backend::new` with a stable address.
    fn healthy_backend(id: &str, port: u16) -> Backend {
        Backend::new(
            id,
            format!("127.0.0.1:{port}").parse().unwrap(),
            None,
            None,
            None,
        )
    }

    #[test]
    fn is_available_requires_health_status_and_retry_policy() {
        // Fresh backend: Healthy + Normal + retry_policy fresh (OKAY).
        let mut backend = Backend::new("b", "127.0.0.1:9050".parse().unwrap(), None, None, None);
        assert!(backend.is_available(), "fresh backend must be available");

        // Unhealthy fails the predicate even with everything else OK.
        backend.health.record_failure(1);
        assert!(!backend.is_available(), "unhealthy must not be available");

        // Restore health, then drive retry policy into the exhausted-budget
        // state via the test-only helper. Calling `fail()` in a tight loop
        // would early-return on the second invocation because the
        // exponential-backoff window has not elapsed yet, so the natural
        // path needs real-time sleeps the unit test cannot afford.
        backend.health.status = HealthStatus::Healthy;
        assert!(backend.is_available());
        backend.retry_policy.force_down();
        assert!(
            backend.retry_policy.is_down(),
            "test setup: retry policy budget must be exhausted",
        );
        assert!(
            !backend.is_available(),
            "retry-policy backoff must fail the predicate"
        );

        // Reset retry, switch lifecycle to Closing.
        backend.retry_policy.succeed();
        backend.set_closing();
        assert!(
            !backend.is_available(),
            "Closing lifecycle status must fail the predicate"
        );
    }

    #[test]
    fn evaluate_availability_empty_list_returns_zero_zero() {
        let list = BackendList::new();
        assert_eq!((0, 0), list.evaluate_availability());
    }

    #[test]
    fn evaluate_availability_counts_only_healthy_normal_not_in_backoff() {
        let mut list = BackendList::new();
        list.add_backend(healthy_backend("b-ok-1", 9101));
        list.add_backend(healthy_backend("b-ok-2", 9102));
        list.add_backend(unhealthy_backend("b-bad", 9103));
        let (available, total) = list.evaluate_availability();
        assert_eq!(3, total, "every configured backend counts toward total");
        assert_eq!(
            2, available,
            "only the two healthy backends pass the predicate"
        );
    }

    #[test]
    fn evaluate_availability_excludes_retry_policy_down() {
        let mut list = BackendList::new();
        list.add_backend(healthy_backend("b-fresh", 9111));
        list.add_backend(healthy_backend("b-fail", 9112));
        // Drive the second backend's retry policy into is_down() via the
        // test-only force_down() helper. A natural exhaustion would
        // require waiting through the exponential-backoff windows
        // (`fail()` early-returns when called inside one).
        list.backends[1].borrow_mut().retry_policy.force_down();
        let (available, total) = list.evaluate_availability();
        assert_eq!(2, total);
        assert_eq!(
            1, available,
            "retry-policy is_down() backend must be excluded even when health.is_healthy()"
        );
    }

    #[test]
    fn record_cluster_availability_flips_to_alldown_then_idempotent() {
        let mut map = BackendMap::new();
        let cluster_id = "c-flap";
        map.add_backend(cluster_id, unhealthy_backend("b1", 9201));
        // After add_backend the helper has run; total=1, available=0,
        // so the cell must already be AllDown.
        let list = map.backends.get(cluster_id).expect("cluster present");
        assert_eq!(
            ClusterAvailability::AllDown,
            list.availability.get(),
            "single unhealthy backend must drive the cell to AllDown"
        );
        // Calling again in the same regime is a no-op (Cell already AllDown).
        map.record_cluster_availability(cluster_id);
        let list = map.backends.get(cluster_id).expect("cluster present");
        assert_eq!(
            ClusterAvailability::AllDown,
            list.availability.get(),
            "repeat call must keep the cell at AllDown without flipping"
        );
    }

    #[test]
    fn record_cluster_availability_recovers_to_available() {
        let mut map = BackendMap::new();
        let cluster_id = "c-recover";
        map.add_backend(cluster_id, unhealthy_backend("b1", 9301));
        assert_eq!(
            ClusterAvailability::AllDown,
            map.backends.get(cluster_id).unwrap().availability.get()
        );
        // Heal the backend in place and re-evaluate. Writing
        // `health.status` directly bypasses the add_backend / health-check
        // paths that normally invoke the helper, so drive it manually here.
        map.backends.get_mut(cluster_id).unwrap().backends[0]
            .borrow_mut()
            .health
            .status = HealthStatus::Healthy;
        map.record_cluster_availability(cluster_id);
        assert_eq!(
            ClusterAvailability::Available,
            map.backends.get(cluster_id).unwrap().availability.get(),
            "healed backend must flip the cell back to Available"
        );
    }

    #[test]
    fn record_cluster_availability_empty_cluster_stays_available() {
        let mut map = BackendMap::new();
        let cluster_id = "c-empty";
        map.backends
            .insert(cluster_id.to_owned(), BackendList::new());
        // total == 0 path: never report AllDown — avoids log spam during
        // cluster bootstrap when backends are still being registered.
        map.record_cluster_availability(cluster_id);
        assert_eq!(
            ClusterAvailability::Available,
            map.backends.get(cluster_id).unwrap().availability.get(),
            "empty cluster must keep the cell at the default Available"
        );
    }

    #[test]
    fn record_cluster_availability_missing_cluster_is_noop() {
        let map = BackendMap::new();
        // No panic, no insert — just an early return.
        map.record_cluster_availability("c-absent");
        assert!(
            !map.backends.contains_key("c-absent"),
            "helper must not insert a BackendList for an unknown cluster_id"
        );
    }

    #[test]
    fn import_configuration_state_latches_cluster_rollup_gauges() {
        use crate::metrics::METRICS;
        use sozu_command_lib::proto::command::QueryMetricsOptions;
        // Unique cluster id so the assertion is not perturbed by gauges
        // left in the thread-local METRICS aggregator by sibling tests.
        let cluster_id = "c-import-rollup-9701";
        let mut map = BackendMap::new();
        let mut input = HashMap::new();
        input.insert(
            cluster_id.to_owned(),
            vec![sozu_command_lib::response::Backend {
                cluster_id: cluster_id.to_owned(),
                backend_id: "b1".to_owned(),
                address: "127.0.0.1:9701".parse().unwrap(),
                sticky_id: None,
                load_balancing_parameters: None,
                backup: None,
            }],
        );
        map.import_configuration_state(&input);
        let response = METRICS
            .with(|m| {
                m.borrow_mut().query(&QueryMetricsOptions {
                    metric_names: vec![
                        names::cluster::AVAILABLE_BACKENDS.to_owned(),
                        names::cluster::TOTAL_BACKENDS.to_owned(),
                    ],
                    cluster_ids: vec![cluster_id.to_owned()],
                    backend_ids: vec![],
                    list: false,
                    no_clusters: false,
                    workers: false,
                })
            })
            .expect("metrics query succeeds");
        let cluster_metrics = match response.content_type {
            Some(
                sozu_command_lib::proto::command::response_content::ContentType::WorkerMetrics(wm),
            ) => wm,
            other => panic!("expected WorkerMetrics, got {other:?}"),
        };
        let cm = cluster_metrics
            .clusters
            .get(cluster_id)
            .expect("imported cluster must have a ClusterMetrics entry");
        // Without the import-time `record_cluster_availability` call the
        // two rollup gauges would be absent here. The fix guarantees the
        // pair lands without waiting for any follow-up backend mutation.
        assert!(
            cm.cluster.contains_key(names::cluster::AVAILABLE_BACKENDS),
            "cluster.available_backends gauge must be latched at import time"
        );
        assert!(
            cm.cluster.contains_key(names::cluster::TOTAL_BACKENDS),
            "cluster.total_backends gauge must be latched at import time"
        );
    }

    #[test]
    fn set_health_check_config_none_re_emits_rollup_after_reset() {
        let mut map = BackendMap::new();
        let cluster_id = "c-hc-reset";
        // Seed the cluster with an unhealthy backend so `add_backend`
        // drives the `availability` cell to AllDown.
        map.add_backend(cluster_id, unhealthy_backend("b1", 9801));
        assert_eq!(
            ClusterAvailability::AllDown,
            map.backends.get(cluster_id).unwrap().availability.get(),
            "test setup: unhealthy backend must register the cell at AllDown"
        );
        // Disabling the health check resets backend health to the default
        // pristine state AND must re-emit the rollup so the cell reflects
        // the post-reset availability instead of the stale AllDown.
        map.set_health_check_config(cluster_id, None);
        assert_eq!(
            ClusterAvailability::Available,
            map.backends.get(cluster_id).unwrap().availability.get(),
            "set_health_check_config(None) must re-emit the rollup after \
             resetting backend health, otherwise dashboards stay stuck at AllDown"
        );
    }
}