1use std::{
2 cell::{Cell, RefCell},
3 collections::HashMap,
4 net::SocketAddr,
5 rc::Rc,
6 time::Duration,
7};
8
9use mio::net::TcpStream;
10use sozu_command::{
11 proto::command::{
12 Event, EventKind, HealthCheckConfig, LoadBalancingAlgorithms, LoadBalancingParams,
13 LoadMetric,
14 },
15 state::ClusterId,
16};
17
18use crate::metrics::names;
19use crate::{
20 PeakEWMA,
21 load_balancing::{
22 LeastLoaded, LoadBalancingAlgorithm, Maglev, PowerOfTwo, Random, Rendezvous, RoundRobin,
23 },
24 retry::{self, RetryPolicy},
25 server::{self, push_event},
26};
27
/// Errors produced while selecting a backend or opening a connection to it.
#[derive(thiserror::Error, Debug)]
pub enum BackendError {
    /// The cluster is unknown, has no backends, or no backend could be selected
    /// for it.
    #[error("No backend found for cluster {0}")]
    NoBackendForCluster(String),
    /// `mio::net::TcpStream::connect` returned an error; wraps that I/O error.
    #[error("Failed to connect to socket with MIO: {0}")]
    MioConnection(std::io::Error),
    /// A connection was attempted on a backend whose status is not `Normal`.
    #[error("This backend is not in a normal status: status={0:?}")]
    Status(BackendStatus),
    /// Connecting to a selected backend failed.
    #[error("could not connect {cluster_id} to {backend_address:?} ({failures} failures): {error}")]
    ConnectionFailures {
        /// Cluster the backend was selected for.
        cluster_id: String,
        /// Address of the backend that refused the connection.
        backend_address: SocketAddr,
        /// The backend's cumulative failure counter after this attempt.
        failures: usize,
        /// Stringified underlying error.
        error: String,
    },
}
44
/// Lifecycle of a backend.
///
/// Every backend starts `Normal`. `Backend::set_closing` moves it to `Closing`,
/// and `Backend::dec_connections` turns a `Closing` backend into `Closed` once
/// its last active connection has been released.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum BackendStatus {
    /// Accepts new connections.
    Normal,
    /// Draining: refuses new connections while existing ones wind down.
    Closing,
    /// Fully drained.
    Closed,
}
51
/// Outcome of active health checking for one backend.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum HealthStatus {
    Healthy,
    Unhealthy,
}

/// Aggregate availability of a cluster.
///
/// `BackendMap::record_cluster_availability` latches this value so that
/// transitions are reported once per edge rather than on every evaluation.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub(crate) enum ClusterAvailability {
    /// At least one backend is available, or the cluster has no backends.
    #[default]
    Available,
    /// The cluster has backends, but none of them is available.
    AllDown,
}

/// Health-check state machine for a single backend.
///
/// The status flips only when a streak of identical results reaches the
/// caller-supplied threshold. Every result resets the opposite streak.
#[derive(Debug, Clone, PartialEq)]
pub struct HealthState {
    /// Current health verdict.
    pub status: HealthStatus,
    /// Length of the current run of successful checks (saturates at `u32::MAX`).
    pub consecutive_successes: u32,
    /// Length of the current run of failed checks (saturates at `u32::MAX`).
    pub consecutive_failures: u32,
}

impl Default for HealthState {
    /// A new backend is presumed healthy, with both streaks empty.
    fn default() -> Self {
        HealthState {
            status: HealthStatus::Healthy,
            consecutive_successes: 0,
            consecutive_failures: 0,
        }
    }
}

impl HealthState {
    /// Records a successful health check.
    ///
    /// Clears the failure streak and extends the success streak. Returns `true`
    /// only when this call moves the backend from `Unhealthy` back to `Healthy`,
    /// i.e. the success streak has reached `healthy_threshold`.
    pub fn record_success(&mut self, healthy_threshold: u32) -> bool {
        let was_unhealthy = self.status == HealthStatus::Unhealthy;
        let successes_before = self.consecutive_successes;
        self.consecutive_failures = 0;
        // Saturate instead of `+= 1`: a long-lived backend must neither panic
        // (debug builds) nor wrap its streak back to zero (release builds).
        self.consecutive_successes = successes_before.saturating_add(1);

        debug_assert_eq!(
            self.consecutive_failures, 0,
            "a success must clear the consecutive-failure streak"
        );
        debug_assert_eq!(
            self.consecutive_successes,
            successes_before.saturating_add(1),
            "a success must advance the success streak by one (saturating)"
        );

        if was_unhealthy && self.consecutive_successes >= healthy_threshold {
            self.status = HealthStatus::Healthy;
            debug_assert!(
                self.status == HealthStatus::Healthy,
                "a reported recovery must leave the status Healthy"
            );
            return true;
        }
        false
    }

    /// Records a failed health check.
    ///
    /// Clears the success streak and extends the failure streak. Returns `true`
    /// only when this call moves the backend from `Healthy` to `Unhealthy`,
    /// i.e. the failure streak has reached `unhealthy_threshold`.
    pub fn record_failure(&mut self, unhealthy_threshold: u32) -> bool {
        let was_healthy = self.status == HealthStatus::Healthy;
        let failures_before = self.consecutive_failures;
        self.consecutive_successes = 0;
        // Saturate for the same reason as in `record_success`.
        self.consecutive_failures = failures_before.saturating_add(1);

        debug_assert_eq!(
            self.consecutive_successes, 0,
            "a failure must clear the consecutive-success streak"
        );
        debug_assert_eq!(
            self.consecutive_failures,
            failures_before.saturating_add(1),
            "a failure must advance the failure streak by one (saturating)"
        );

        if was_healthy && self.consecutive_failures >= unhealthy_threshold {
            self.status = HealthStatus::Unhealthy;
            debug_assert!(
                self.status == HealthStatus::Unhealthy,
                "a reported drop must leave the status Unhealthy"
            );
            return true;
        }
        false
    }

    /// Whether the current verdict is `Healthy`.
    pub fn is_healthy(&self) -> bool {
        self.status == HealthStatus::Healthy
    }
}
155
/// One upstream server of a cluster, with its connection bookkeeping,
/// retry policy and health-check state.
#[derive(Debug, PartialEq, Clone)]
pub struct Backend {
    /// Sticky-session identifier matched by `BackendList::find_sticky`.
    pub sticky_id: Option<String>,
    /// Backend identifier.
    pub backend_id: String,
    /// Address `try_connect` connects to.
    pub address: SocketAddr,
    /// Lifecycle state; only `Normal` backends accept new connections.
    pub status: BackendStatus,
    /// Retry policy: `fail()` is called on every failed `try_connect`, and
    /// `can_try()` / `is_down()` gate `can_open` / `is_available`.
    pub retry_policy: retry::RetryPolicyWrapper,
    /// Connections counted by `inc_connections` / `dec_connections`.
    pub active_connections: usize,
    /// Not updated in this module. NOTE(review): presumably maintained by
    /// callers elsewhere; confirm.
    pub active_requests: usize,
    /// Cumulative count of failed `try_connect` calls.
    pub failures: usize,
    /// Per-backend parameters handed to the load-balancing algorithm.
    pub load_balancing_parameters: Option<LoadBalancingParams>,
    /// Backup backends are only selected when no primary backend can open.
    pub backup: bool,
    /// Peak-EWMA of connection times, fed by `set_connection_time`.
    pub connection_time: PeakEWMA,
    /// Active health-check state; an unhealthy backend is neither openable
    /// nor counted as available.
    pub health: HealthState,
}
171
172impl Backend {
173 pub fn new(
174 backend_id: &str,
175 address: SocketAddr,
176 sticky_id: Option<String>,
177 load_balancing_parameters: Option<LoadBalancingParams>,
178 backup: Option<bool>,
179 ) -> Backend {
180 let desired_policy = retry::ExponentialBackoffPolicy::new(6);
181 Backend {
182 sticky_id,
183 backend_id: backend_id.to_owned(),
184 address,
185 status: BackendStatus::Normal,
186 retry_policy: desired_policy.into(),
187 active_connections: 0,
188 active_requests: 0,
189 failures: 0,
190 load_balancing_parameters,
191 backup: backup.unwrap_or(false),
192 connection_time: PeakEWMA::new(),
193 health: HealthState::default(),
194 }
195 }
196
197 pub fn set_closing(&mut self) {
198 self.status = BackendStatus::Closing;
199 }
200
201 pub fn retry_policy(&mut self) -> &mut retry::RetryPolicyWrapper {
202 &mut self.retry_policy
203 }
204
205 pub fn can_open(&self) -> bool {
206 if !self.health.is_healthy() {
207 return false;
208 }
209 if let Some(action) = self.retry_policy.can_try() {
210 self.status == BackendStatus::Normal && action == retry::RetryAction::OKAY
211 } else {
212 false
213 }
214 }
215
216 pub fn is_available(&self) -> bool {
227 self.health.is_healthy()
228 && self.status == BackendStatus::Normal
229 && !self.retry_policy.is_down()
230 }
231
232 pub fn inc_connections(&mut self) -> Option<usize> {
233 let before = self.active_connections;
234 if self.status == BackendStatus::Normal {
235 self.active_connections += 1;
236 debug_assert_eq!(
239 self.active_connections,
240 before + 1,
241 "inc_connections must add exactly one active connection"
242 );
243 Some(self.active_connections)
244 } else {
245 debug_assert_eq!(
248 self.active_connections, before,
249 "inc_connections must not touch the count for a non-Normal backend"
250 );
251 None
252 }
253 }
254
255 pub fn dec_connections(&mut self) -> Option<usize> {
257 let before = self.active_connections;
258 match self.status {
259 BackendStatus::Normal => {
260 if self.active_connections > 0 {
261 self.active_connections -= 1;
262 }
263 debug_assert!(
266 self.active_connections <= before,
267 "dec_connections must never increase the active-connection count"
268 );
269 debug_assert_eq!(
270 self.active_connections,
271 before.saturating_sub(1),
272 "dec_connections must drop by exactly one (saturating at zero)"
273 );
274 Some(self.active_connections)
275 }
276 BackendStatus::Closed => {
277 debug_assert_eq!(
279 self.active_connections, before,
280 "dec_connections on a Closed backend must not mutate the count"
281 );
282 None
283 }
284 BackendStatus::Closing => {
285 if self.active_connections > 0 {
286 self.active_connections -= 1;
287 }
288 debug_assert_eq!(
289 self.active_connections,
290 before.saturating_sub(1),
291 "dec_connections must drop by exactly one (saturating at zero)"
292 );
293 if self.active_connections == 0 {
294 self.status = BackendStatus::Closed;
295 debug_assert_eq!(
298 self.status,
299 BackendStatus::Closed,
300 "a fully drained Closing backend must become Closed"
301 );
302 None
303 } else {
304 Some(self.active_connections)
305 }
306 }
307 }
308 }
309
310 pub fn set_connection_time(&mut self, dur: Duration) {
311 self.connection_time.observe(dur.as_nanos() as f64);
312 }
313
314 pub fn peak_ewma_connection(&mut self) -> f64 {
315 self.connection_time.get(self.active_connections)
316 }
317
318 pub fn try_connect(&mut self) -> Result<mio::net::TcpStream, BackendError> {
319 if self.status != BackendStatus::Normal {
320 return Err(BackendError::Status(self.status.to_owned()));
321 }
322 debug_assert_eq!(
325 self.status,
326 BackendStatus::Normal,
327 "try_connect only attempts a connection on a Normal backend"
328 );
329 let failures_before = self.failures;
330 let connections_before = self.active_connections;
331
332 match mio::net::TcpStream::connect(self.address) {
333 Ok(tcp_stream) => {
334 self.inc_connections();
336 debug_assert_eq!(
339 self.active_connections,
340 connections_before + 1,
341 "a successful connect must register exactly one active connection"
342 );
343 debug_assert_eq!(
344 self.failures, failures_before,
345 "a successful connect must not bump the failure counter"
346 );
347 Ok(tcp_stream)
348 }
349 Err(io_error) => {
350 self.retry_policy.fail();
351 self.failures += 1;
352 debug_assert_eq!(
356 self.failures,
357 failures_before + 1,
358 "a failed connect must advance the failure counter by exactly one"
359 );
360 debug_assert_eq!(
361 self.active_connections, connections_before,
362 "a failed connect must not register an active connection"
363 );
364 Err(BackendError::MioConnection(io_error))
369 }
370 }
371 }
372}
373
374impl std::ops::Drop for Backend {
378 fn drop(&mut self) {
379 server::push_event(Event {
380 kind: EventKind::RemovedBackendHasNoConnections as i32,
381 backend_id: Some(self.backend_id.to_owned()),
382 address: Some(self.address.into()),
383 cluster_id: None,
384 metric_detail: None,
385 });
386 }
387}
388
389#[derive(Debug)]
390pub struct BackendMap {
391 pub backends: HashMap<ClusterId, BackendList>,
392 pub max_failures: usize,
393 pub health_check_configs: HashMap<ClusterId, HealthCheckConfig>,
394 pub cluster_http2: HashMap<ClusterId, bool>,
401}
402
403impl Default for BackendMap {
404 fn default() -> Self {
405 Self::new()
406 }
407}
408
409impl BackendMap {
410 pub fn new() -> BackendMap {
411 BackendMap {
412 backends: HashMap::new(),
413 max_failures: 3,
414 health_check_configs: HashMap::new(),
415 cluster_http2: HashMap::new(),
416 }
417 }
418
419 pub(crate) fn record_cluster_availability(&self, cluster_id: &str) {
434 let Some(list) = self.backends.get(cluster_id) else {
435 return;
436 };
437
438 let (available, total) = list.evaluate_availability();
439 debug_assert!(
442 available <= total,
443 "available backends ({available}) cannot exceed total ({total})"
444 );
445 debug_assert_eq!(
446 total,
447 list.backends.len(),
448 "total must equal the number of registered backends"
449 );
450 gauge!(
451 names::cluster::AVAILABLE_BACKENDS,
452 available,
453 Some(cluster_id),
454 None
455 );
456 gauge!(
457 names::cluster::TOTAL_BACKENDS,
458 total,
459 Some(cluster_id),
460 None
461 );
462
463 let new_state = if total > 0 && available == 0 {
464 ClusterAvailability::AllDown
465 } else {
466 ClusterAvailability::Available
467 };
468 debug_assert!(
471 !(total == 0 && new_state == ClusterAvailability::AllDown),
472 "an empty cluster must never be reported AllDown"
473 );
474 debug_assert!(
475 !(available > 0 && new_state == ClusterAvailability::AllDown),
476 "a cluster with an available backend must not be AllDown"
477 );
478
479 let prev = list.availability.replace(new_state);
480 debug_assert_eq!(
482 list.availability.get(),
483 new_state,
484 "the availability cell must latch the newly computed state"
485 );
486 if prev == new_state {
487 return;
488 }
489 match (prev, new_state) {
490 (ClusterAvailability::Available, ClusterAvailability::AllDown) => {
491 error!("cluster {}: all {} backends are down", cluster_id, total);
492 incr!(
493 names::cluster::NO_AVAILABLE_BACKENDS,
494 Some(cluster_id),
495 None
496 );
497 push_event(Event {
498 kind: EventKind::NoAvailableBackends as i32,
499 cluster_id: Some(cluster_id.to_owned()),
500 backend_id: None,
501 address: None,
502 metric_detail: None,
503 });
504 }
505 (ClusterAvailability::AllDown, ClusterAvailability::Available) => {
506 info!(
507 "cluster {}: backends recovered ({}/{} available)",
508 cluster_id, available, total
509 );
510 incr!(names::cluster::AVAILABLE_RECOVERED, Some(cluster_id), None);
511 push_event(Event {
512 kind: EventKind::ClusterRecovered as i32,
513 cluster_id: Some(cluster_id.to_owned()),
514 backend_id: None,
515 address: None,
516 metric_detail: None,
517 });
518 }
519 _ => {}
520 }
521 }
522
523 pub fn set_cluster_http2(&mut self, cluster_id: &str, http2: bool) {
528 if http2 {
529 self.cluster_http2.insert(cluster_id.to_owned(), true);
530 } else {
531 self.cluster_http2.remove(cluster_id);
532 }
533 }
534
535 pub fn set_health_check_config(&mut self, cluster_id: &str, config: Option<HealthCheckConfig>) {
536 match config {
537 Some(c) => {
538 self.health_check_configs.insert(cluster_id.to_owned(), c);
539 }
540 None => {
541 self.health_check_configs.remove(cluster_id);
542 if let Some(backend_list) = self.backends.get(cluster_id) {
550 for backend in &backend_list.backends {
551 backend.borrow_mut().health = HealthState::default();
552 }
553 }
554 self.record_cluster_availability(cluster_id);
558 }
559 }
560 }
561
562 pub fn import_configuration_state(
563 &mut self,
564 backends: &HashMap<ClusterId, Vec<sozu_command::response::Backend>>,
565 ) {
566 self.backends
567 .extend(backends.iter().map(|(cluster_id, backend_vec)| {
568 (
569 cluster_id.to_string(),
570 BackendList::import_configuration_state(backend_vec),
571 )
572 }));
573 for cluster_id in backends.keys() {
580 self.record_cluster_availability(cluster_id);
581 }
582 }
583
584 pub fn add_backend(&mut self, cluster_id: &str, backend: Backend) {
585 let address = backend.address;
586 self.backends
587 .entry(cluster_id.to_string())
588 .or_default()
589 .add_backend(backend);
590 debug_assert!(
593 self.backends
594 .get(cluster_id)
595 .is_some_and(|list| list.has_backend(&address)),
596 "add_backend must leave the backend present in its cluster"
597 );
598 self.record_cluster_availability(cluster_id);
603 }
604
605 pub fn remove_backend(
613 &mut self,
614 cluster_id: &str,
615 backend_address: &SocketAddr,
616 ) -> Vec<String> {
617 let removed = if let Some(backends) = self.backends.get_mut(cluster_id) {
618 backends.remove_backend(backend_address)
619 } else {
620 error!(
621 "Backend was already removed: cluster id {}, address {:?}",
622 cluster_id, backend_address
623 );
624 return Vec::new();
625 };
626 debug_assert!(
629 self.backends
630 .get(cluster_id)
631 .is_none_or(|list| !list.has_backend(backend_address)),
632 "remove_backend must evict every backend at the address"
633 );
634 self.record_cluster_availability(cluster_id);
638 removed
639 }
640
641 pub fn close_backend_connection(&mut self, cluster_id: &str, addr: &SocketAddr) {
643 if let Some(cluster_backends) = self.backends.get_mut(cluster_id) {
644 if let Some(ref mut backend) = cluster_backends.find_backend(addr) {
645 backend.borrow_mut().dec_connections();
646 }
647 }
648 }
649
650 pub fn has_backend(&self, cluster_id: &str, backend: &Backend) -> bool {
651 self.backends
652 .get(cluster_id)
653 .map(|backends| backends.has_backend(&backend.address))
654 .unwrap_or(false)
655 }
656
657 pub fn backend_from_cluster_id(
658 &mut self,
659 cluster_id: &str,
660 ) -> Result<(Rc<RefCell<Backend>>, TcpStream), BackendError> {
661 let cluster_backends = self
662 .backends
663 .get_mut(cluster_id)
664 .ok_or(BackendError::NoBackendForCluster(cluster_id.to_owned()))?;
665
666 if cluster_backends.backends.is_empty() {
667 let _ = cluster_backends;
672 self.record_cluster_availability(cluster_id);
673 return Err(BackendError::NoBackendForCluster(cluster_id.to_owned()));
674 }
675 debug_assert!(
677 !cluster_backends.backends.is_empty(),
678 "selection runs only on a non-empty backend list"
679 );
680
681 let next_backend = match cluster_backends.next_available_backend() {
682 Some(nb) => nb,
683 None => {
684 let _ = cluster_backends;
690 self.record_cluster_availability(cluster_id);
691 return Err(BackendError::NoBackendForCluster(cluster_id.to_owned()));
692 }
693 };
694
695 let tcp_stream = {
696 let mut borrowed_backend = next_backend.borrow_mut();
697
698 debug!(
699 "Connecting {} -> {:?}",
700 cluster_id,
701 (
702 borrowed_backend.address,
703 borrowed_backend.active_connections,
704 borrowed_backend.failures
705 )
706 );
707
708 borrowed_backend.try_connect().map_err(|backend_error| {
709 BackendError::ConnectionFailures {
710 cluster_id: cluster_id.to_owned(),
711 backend_address: borrowed_backend.address,
712 failures: borrowed_backend.failures,
713 error: backend_error.to_string(),
714 }
715 })?
716 };
717
718 let _ = cluster_backends;
725 self.record_cluster_availability(cluster_id);
726
727 debug_assert!(
730 self.backends.get(cluster_id).is_some_and(|list| {
731 let picked = next_backend.borrow().address;
732 list.has_backend(&picked)
733 }),
734 "the selected backend must belong to the cluster's live set"
735 );
736
737 Ok((next_backend.clone(), tcp_stream))
738 }
739
740 pub fn backend_from_cluster_id_with_key(
754 &mut self,
755 cluster_id: &str,
756 key: Option<u64>,
757 ) -> Result<(String, SocketAddr), BackendError> {
758 let cluster_backends = self
759 .backends
760 .get_mut(cluster_id)
761 .ok_or(BackendError::NoBackendForCluster(cluster_id.to_owned()))?;
762
763 if cluster_backends.backends.is_empty() {
764 let _ = cluster_backends;
765 self.record_cluster_availability(cluster_id);
766 return Err(BackendError::NoBackendForCluster(cluster_id.to_owned()));
767 }
768 debug_assert!(
769 !cluster_backends.backends.is_empty(),
770 "keyed selection runs only on a non-empty backend list"
771 );
772
773 let next_backend = match cluster_backends.next_available_backend_with_key(key) {
774 Some(nb) => nb,
775 None => {
776 let _ = cluster_backends;
777 self.record_cluster_availability(cluster_id);
778 return Err(BackendError::NoBackendForCluster(cluster_id.to_owned()));
779 }
780 };
781
782 let (backend_id, address) = {
783 let borrowed = next_backend.borrow();
784 (borrowed.backend_id.to_owned(), borrowed.address)
785 };
786 debug_assert!(
789 cluster_backends.has_backend(&address),
790 "keyed selection must return a backend in the cluster's live set"
791 );
792 Ok((backend_id, address))
793 }
794
795 pub fn backend_from_sticky_session(
796 &mut self,
797 cluster_id: &str,
798 sticky_session: &str,
799 ) -> Result<(Rc<RefCell<Backend>>, TcpStream), BackendError> {
800 let sticky_conn = self
801 .backends
802 .get_mut(cluster_id)
803 .and_then(|cluster_backends| cluster_backends.find_sticky(sticky_session))
804 .map(|backend| {
805 let mut borrowed = backend.borrow_mut();
806 let conn = borrowed.try_connect();
807
808 conn.map(|tcp_stream| (backend.clone(), tcp_stream))
809 .inspect_err(|_| {
810 error!(
811 "could not connect {} to {:?} using session {} ({} failures)",
812 cluster_id, borrowed.address, sticky_session, borrowed.failures
813 )
814 })
815 });
816
817 match sticky_conn {
818 Some(backend_and_stream) => backend_and_stream,
819 None => {
820 debug!(
821 "Couldn't find a backend corresponding to sticky_session {} for cluster {}",
822 sticky_session, cluster_id
823 );
824 self.backend_from_cluster_id(cluster_id)
825 }
826 }
827 }
828
829 pub fn set_load_balancing_policy_for_cluster(
830 &mut self,
831 cluster_id: &str,
832 lb_algo: LoadBalancingAlgorithms,
833 metric: Option<LoadMetric>,
834 ) {
835 let cluster_backends = self.get_or_create_backend_list_for_cluster(cluster_id);
838 cluster_backends.set_load_balancing_policy(lb_algo, metric);
839 }
840
841 pub fn get_or_create_backend_list_for_cluster(&mut self, cluster_id: &str) -> &mut BackendList {
842 self.backends.entry(cluster_id.to_string()).or_default()
843 }
844}
845
846#[derive(Debug)]
847pub struct BackendList {
848 pub backends: Vec<Rc<RefCell<Backend>>>,
849 pub next_id: u32,
850 pub load_balancing: Box<dyn LoadBalancingAlgorithm>,
851 fail_open_warned: bool,
859 pub(crate) availability: Cell<ClusterAvailability>,
864}
865
866impl Default for BackendList {
867 fn default() -> Self {
868 Self::new()
869 }
870}
871
872impl BackendList {
873 pub fn new() -> BackendList {
874 BackendList {
875 backends: Vec::new(),
876 next_id: 0,
877 load_balancing: Box::new(Random),
878 fail_open_warned: false,
879 availability: Cell::new(ClusterAvailability::Available),
880 }
881 }
882
883 pub(crate) fn evaluate_availability(&self) -> (usize, usize) {
887 let total = self.backends.len();
888 let available = self
889 .backends
890 .iter()
891 .filter(|b| b.borrow().is_available())
892 .count();
893 debug_assert!(
896 available <= total,
897 "available ({available}) cannot exceed total ({total})"
898 );
899 debug_assert_eq!(total, self.backends.len(), "total must equal backend count");
900 (available, total)
901 }
902
903 #[cfg(debug_assertions)]
908 fn check_invariants(&self) {
909 debug_assert!(
913 self.next_id as usize >= self.backends.len(),
914 "next_id ({}) must be >= live backend count ({})",
915 self.next_id,
916 self.backends.len()
917 );
918 for (i, a) in self.backends.iter().enumerate() {
923 let a = a.borrow();
924 for b in self.backends.iter().skip(i + 1) {
925 let b = b.borrow();
926 debug_assert!(
927 a.address != b.address || a.backend_id != b.backend_id,
928 "duplicate (address, backend_id) in the live set: {:?} / {}",
929 a.address,
930 a.backend_id
931 );
932 }
933 }
934 }
935
936 pub fn import_configuration_state(
937 backend_vec: &[sozu_command_lib::response::Backend],
938 ) -> BackendList {
939 let mut list = BackendList::new();
940 for backend in backend_vec {
941 let backend = Backend::new(
942 &backend.backend_id,
943 backend.address,
944 backend.sticky_id.clone(),
945 backend.load_balancing_parameters,
946 backend.backup,
947 );
948 list.add_backend(backend);
949 }
950
951 list
952 }
953
954 pub fn add_backend(&mut self, backend: Backend) {
955 let address = backend.address;
956 let len_before = self.backends.len();
957 let next_id_before = self.next_id;
958 let existed = self.backends.iter().any(|b| {
959 b.borrow().address == backend.address && b.borrow().backend_id == backend.backend_id
960 });
961 match self.backends.iter_mut().find(|b| {
962 b.borrow().address == backend.address && b.borrow().backend_id == backend.backend_id
963 }) {
964 None => {
965 let backend = Rc::new(RefCell::new(backend));
966 self.backends.push(backend);
967 self.next_id += 1;
968 }
969 Some(old_backend) => {
972 let mut b = old_backend.borrow_mut();
973 b.sticky_id.clone_from(&backend.sticky_id);
974 b.load_balancing_parameters
975 .clone_from(&backend.load_balancing_parameters);
976 b.backup = backend.backup;
977 }
978 }
979 debug_assert_eq!(
982 self.backends.len(),
983 len_before + (!existed) as usize,
984 "add_backend grows the list by one only on a genuine insert"
985 );
986 debug_assert_eq!(
987 self.next_id,
988 next_id_before + (!existed) as u32,
989 "next_id advances by one only on a genuine insert"
990 );
991 debug_assert!(
992 self.has_backend(&address),
993 "add_backend must leave the backend present in the list"
994 );
995 self.load_balancing.rebuild(&self.backends);
1000 #[cfg(debug_assertions)]
1001 self.check_invariants();
1002 }
1003
1004 pub fn remove_backend(&mut self, backend_address: &SocketAddr) -> Vec<String> {
1012 let len_before = self.backends.len();
1013 let mut removed = Vec::new();
1014 self.backends.retain(|backend| {
1015 let b = backend.borrow();
1016 if &b.address == backend_address {
1017 removed.push(b.backend_id.clone());
1018 false
1019 } else {
1020 true
1021 }
1022 });
1023 debug_assert_eq!(
1026 self.backends.len(),
1027 len_before - removed.len(),
1028 "remove_backend must drop exactly the backends it reports"
1029 );
1030 debug_assert!(
1031 !self.has_backend(backend_address),
1032 "remove_backend must evict every backend at the address"
1033 );
1034 if !removed.is_empty() {
1038 self.load_balancing.rebuild(&self.backends);
1039 }
1040 #[cfg(debug_assertions)]
1041 self.check_invariants();
1042 removed
1043 }
1044
1045 pub fn has_backend(&self, backend_address: &SocketAddr) -> bool {
1046 self.backends
1047 .iter()
1048 .any(|backend| backend.borrow().address == *backend_address)
1049 }
1050
1051 pub fn find_backend(
1052 &mut self,
1053 backend_address: &SocketAddr,
1054 ) -> Option<&mut Rc<RefCell<Backend>>> {
1055 self.backends
1056 .iter_mut()
1057 .find(|backend| backend.borrow().address == *backend_address)
1058 }
1059
1060 pub fn find_sticky(&mut self, sticky_session: &str) -> Option<&mut Rc<RefCell<Backend>>> {
1061 self.backends
1062 .iter_mut()
1063 .find(|b| b.borrow().sticky_id.as_deref() == Some(sticky_session))
1064 .and_then(|b| if b.borrow().can_open() { Some(b) } else { None })
1065 }
1066
1067 pub fn available_backends(&mut self, backup: bool) -> Vec<Rc<RefCell<Backend>>> {
1068 self.backends
1069 .iter()
1070 .filter(|backend| {
1071 let owned = backend.borrow();
1072 owned.backup == backup && owned.can_open()
1073 })
1074 .map(Clone::clone)
1075 .collect()
1076 }
1077
1078 pub fn next_available_backend(&mut self) -> Option<Rc<RefCell<Backend>>> {
1079 self.next_available_backend_with_key(None)
1080 }
1081
1082 pub fn next_available_backend_with_key(
1089 &mut self,
1090 key: Option<u64>,
1091 ) -> Option<Rc<RefCell<Backend>>> {
1092 let mut backends = self.available_backends(false);
1093
1094 if backends.is_empty() {
1095 backends = self.available_backends(true);
1096 }
1097
1098 if !backends.is_empty() {
1099 if self.fail_open_warned {
1101 info!(
1102 "fail-open: cluster recovered, {} backends now healthy",
1103 backends.len()
1104 );
1105 self.fail_open_warned = false;
1106 }
1107 debug_assert!(
1110 backends.len() <= self.backends.len(),
1111 "candidate set cannot be larger than the full backend list"
1112 );
1113 let picked = self
1114 .load_balancing
1115 .next_available_backend(key, &mut backends);
1116 debug_assert!(
1117 picked.as_ref().is_none_or(|b| {
1118 let addr = b.borrow().address;
1119 self.backends.iter().any(|x| x.borrow().address == addr)
1120 }),
1121 "selection must return a backend present in the live list"
1122 );
1123 return picked;
1124 }
1125
1126 backends = self
1135 .backends
1136 .iter()
1137 .filter(|b| {
1138 let owned = b.borrow();
1139 owned.status == BackendStatus::Normal
1140 && matches!(owned.retry_policy.can_try(), Some(retry::RetryAction::OKAY))
1141 })
1142 .map(Clone::clone)
1143 .collect();
1144
1145 if backends.is_empty() {
1146 return None;
1147 }
1148
1149 if !self.fail_open_warned {
1153 warn!(
1154 "fail-open: all backends unhealthy, routing to {} normal backends with retry-policy OKAY",
1155 backends.len()
1156 );
1157 self.fail_open_warned = true;
1158 }
1159 count!(names::backend::FAIL_OPEN, 1);
1160
1161 self.load_balancing
1162 .next_available_backend(key, &mut backends)
1163 }
1164
1165 pub fn set_load_balancing_policy(
1166 &mut self,
1167 load_balancing_policy: LoadBalancingAlgorithms,
1168 metric: Option<LoadMetric>,
1169 ) {
1170 match load_balancing_policy {
1171 LoadBalancingAlgorithms::RoundRobin => {
1172 self.load_balancing = Box::new(RoundRobin::new())
1173 }
1174 LoadBalancingAlgorithms::Random => self.load_balancing = Box::new(Random {}),
1175 LoadBalancingAlgorithms::LeastLoaded => {
1176 self.load_balancing = Box::new(LeastLoaded {
1177 metric: metric.unwrap_or(LoadMetric::Connections),
1178 })
1179 }
1180 LoadBalancingAlgorithms::PowerOfTwo => {
1181 self.load_balancing = Box::new(PowerOfTwo {
1182 metric: metric.unwrap_or(LoadMetric::Connections),
1183 })
1184 }
1185 LoadBalancingAlgorithms::Hrw => self.load_balancing = Box::new(Rendezvous::new()),
1188 LoadBalancingAlgorithms::Maglev => {
1189 let mut maglev = Maglev::new();
1190 maglev.rebuild(&self.backends);
1193 self.load_balancing = Box::new(maglev);
1194 }
1195 }
1196 }
1197}
1198
1199#[cfg(test)]
1200mod backends_test {
1201
1202 use std::{net::TcpListener, sync::mpsc::*, thread};
1203
1204 use super::*;
1205
1206 fn run_mock_tcp_server(addr: &str, stopper: Receiver<()>) {
1207 let mut run = true;
1208 let listener = TcpListener::bind(addr).unwrap();
1209
1210 thread::spawn(move || {
1211 while run {
1212 for _stream in listener.incoming() {
1213 if let Ok(()) = stopper.try_recv() {
1215 run = false;
1216 }
1217 }
1218 }
1219 });
1220 }
1221
1222 #[test]
1223 fn it_should_retrieve_a_backend_from_cluster_id_when_backends_have_been_recorded() {
1224 let mut backend_map = BackendMap::new();
1225 let cluster_id = "mycluster";
1226
1227 let backend_addr = "127.0.0.1:1236";
1228 let (sender, receiver) = channel();
1229 run_mock_tcp_server(backend_addr, receiver);
1230
1231 backend_map.add_backend(
1232 cluster_id,
1233 Backend::new(
1234 &format!("{cluster_id}-1"),
1235 backend_addr.parse().unwrap(),
1236 None,
1237 None,
1238 None,
1239 ),
1240 );
1241
1242 assert!(backend_map.backend_from_cluster_id(cluster_id).is_ok());
1243 sender.send(()).unwrap();
1244 }
1245
1246 #[test]
1247 fn it_should_not_retrieve_a_backend_from_cluster_id_when_backend_has_not_been_recorded() {
1248 let mut backend_map = BackendMap::new();
1249 let cluster_not_recorded = "not";
1250 backend_map.add_backend(
1251 "foo",
1252 Backend::new("foo-1", "127.0.0.1:9001".parse().unwrap(), None, None, None),
1253 );
1254
1255 assert!(
1256 backend_map
1257 .backend_from_cluster_id(cluster_not_recorded)
1258 .is_err()
1259 );
1260 }
1261
1262 #[test]
1263 fn it_should_not_retrieve_a_backend_from_cluster_id_when_backend_list_is_empty() {
1264 let mut backend_map = BackendMap::new();
1265
1266 assert!(backend_map.backend_from_cluster_id("dumb").is_err());
1267 }
1268
1269 #[test]
1270 fn it_should_retrieve_a_backend_from_sticky_session_when_the_backend_has_been_recorded() {
1271 let mut backend_map = BackendMap::new();
1272 let cluster_id = "mycluster";
1273 let sticky_session = "server-2";
1274
1275 let backend_addr = "127.0.0.1:3456";
1276 let (sender, receiver) = channel();
1277 run_mock_tcp_server(backend_addr, receiver);
1278
1279 backend_map.add_backend(
1280 cluster_id,
1281 Backend::new(
1282 &format!("{cluster_id}-1"),
1283 "127.0.0.1:9001".parse().unwrap(),
1284 Some("server-1".to_string()),
1285 None,
1286 None,
1287 ),
1288 );
1289 backend_map.add_backend(
1290 cluster_id,
1291 Backend::new(
1292 &format!("{cluster_id}-2"),
1293 "127.0.0.1:9000".parse().unwrap(),
1294 Some("server-2".to_string()),
1295 None,
1296 None,
1297 ),
1298 );
1299 backend_map.add_backend(
1301 cluster_id,
1302 Backend::new(
1303 &format!("{cluster_id}-3"),
1304 backend_addr.parse().unwrap(),
1305 Some("server-3".to_string()),
1306 None,
1307 None,
1308 ),
1309 );
1310
1311 assert!(
1312 backend_map
1313 .backend_from_sticky_session(cluster_id, sticky_session)
1314 .is_ok()
1315 );
1316 sender.send(()).unwrap();
1317 }
1318
1319 #[test]
1320 fn it_should_not_retrieve_a_backend_from_sticky_session_when_the_backend_has_not_been_recorded()
1321 {
1322 let mut backend_map = BackendMap::new();
1323 let cluster_id = "mycluster";
1324 let sticky_session = "test";
1325
1326 assert!(
1327 backend_map
1328 .backend_from_sticky_session(cluster_id, sticky_session)
1329 .is_err()
1330 );
1331 }
1332
1333 #[test]
1334 fn it_should_not_retrieve_a_backend_from_sticky_session_when_the_backend_list_is_empty() {
1335 let mut backend_map = BackendMap::new();
1336 let mycluster_not_recorded = "mycluster";
1337 let sticky_session = "test";
1338
1339 assert!(
1340 backend_map
1341 .backend_from_sticky_session(mycluster_not_recorded, sticky_session)
1342 .is_err()
1343 );
1344 }
1345
1346 #[test]
1347 fn it_should_add_a_backend_when_he_doesnt_already_exist() {
1348 let backend_id = "myback";
1349 let mut backends_list = BackendList::new();
1350 backends_list.add_backend(Backend::new(
1351 backend_id,
1352 "127.0.0.1:80".parse().unwrap(),
1353 None,
1354 None,
1355 None,
1356 ));
1357
1358 assert_eq!(1, backends_list.backends.len());
1359 }
1360
1361 #[test]
1362 fn it_should_not_add_a_backend_when_he_already_exist() {
1363 let backend_id = "myback";
1364 let mut backends_list = BackendList::new();
1365 backends_list.add_backend(Backend::new(
1366 backend_id,
1367 "127.0.0.1:80".parse().unwrap(),
1368 None,
1369 None,
1370 None,
1371 ));
1372
1373 backends_list.add_backend(Backend::new(
1375 backend_id,
1376 "127.0.0.1:80".parse().unwrap(),
1377 None,
1378 None,
1379 None,
1380 ));
1381
1382 assert_eq!(1, backends_list.backends.len());
1383 }
1384
1385 fn unhealthy_backend(id: &str, port: u16) -> Backend {
1388 let mut backend = Backend::new(
1389 id,
1390 format!("127.0.0.1:{port}").parse().unwrap(),
1391 None,
1392 None,
1393 None,
1394 );
1395 backend.health.record_failure(1);
1397 assert!(!backend.health.is_healthy());
1398 backend
1399 }
1400
1401 #[test]
1402 fn fail_open_picks_normal_backend_in_retry_policy_okay() {
1403 let mut list = BackendList::new();
1408 list.add_backend(unhealthy_backend("b1", 9001));
1409 list.add_backend(unhealthy_backend("b2", 9002));
1410
1411 assert!(list.available_backends(false).is_empty());
1413 assert!(list.available_backends(true).is_empty());
1414
1415 let picked = list.next_available_backend();
1416 assert!(
1417 picked.is_some(),
1418 "fail-open must pick a Normal+OKAY backend"
1419 );
1420 assert!(list.fail_open_warned, "regime entry must latch the warn!");
1421 }
1422
1423 #[test]
1424 fn fail_open_skips_backend_in_retry_backoff() {
1425 let mut list = BackendList::new();
1431 list.add_backend(unhealthy_backend("b1", 9011));
1432 list.add_backend(unhealthy_backend("b2", 9012));
1433 for backend_rc in &list.backends {
1434 backend_rc.borrow_mut().retry_policy().fail();
1435 assert_eq!(
1436 Some(retry::RetryAction::WAIT),
1437 backend_rc.borrow().retry_policy.can_try(),
1438 "test fixture must place retry policy in WAIT"
1439 );
1440 }
1441
1442 let picked = list.next_available_backend();
1443 assert!(
1444 picked.is_none(),
1445 "fail-open must skip backends whose retry policy is in WAIT"
1446 );
1447 assert!(
1448 !list.fail_open_warned,
1449 "no candidate backends, no regime entry"
1450 );
1451 }
1452
1453 #[test]
1454 fn fail_open_warn_latched() {
1455 let mut list = BackendList::new();
1459 list.add_backend(unhealthy_backend("b1", 9021));
1460 list.add_backend(unhealthy_backend("b2", 9022));
1461
1462 assert!(list.next_available_backend().is_some());
1463 assert!(list.fail_open_warned, "first fail-open must latch");
1464
1465 assert!(list.next_available_backend().is_some());
1466 assert!(
1467 list.fail_open_warned,
1468 "subsequent fail-open routing keeps the latch"
1469 );
1470
1471 list.backends[0].borrow_mut().health.status = HealthStatus::Healthy;
1474 let picked = list.next_available_backend();
1475 assert!(
1476 picked.is_some(),
1477 "regular path must select the healed backend"
1478 );
1479 assert!(
1480 !list.fail_open_warned,
1481 "regime exit must clear the latch so the next entry is logged again"
1482 );
1483 }
1484
1485 fn healthy_backend(id: &str, port: u16) -> Backend {
1493 Backend::new(
1494 id,
1495 format!("127.0.0.1:{port}").parse().unwrap(),
1496 None,
1497 None,
1498 None,
1499 )
1500 }
1501
1502 #[test]
1503 fn is_available_requires_health_status_and_retry_policy() {
1504 let mut backend = Backend::new("b", "127.0.0.1:9050".parse().unwrap(), None, None, None);
1506 assert!(backend.is_available(), "fresh backend must be available");
1507
1508 backend.health.record_failure(1);
1510 assert!(!backend.is_available(), "unhealthy must not be available");
1511
1512 backend.health.status = HealthStatus::Healthy;
1518 assert!(backend.is_available());
1519 backend.retry_policy.force_down();
1520 assert!(
1521 backend.retry_policy.is_down(),
1522 "test setup: retry policy budget must be exhausted",
1523 );
1524 assert!(
1525 !backend.is_available(),
1526 "retry-policy backoff must fail the predicate"
1527 );
1528
1529 backend.retry_policy.succeed();
1531 backend.set_closing();
1532 assert!(
1533 !backend.is_available(),
1534 "Closing lifecycle status must fail the predicate"
1535 );
1536 }
1537
1538 #[test]
1539 fn evaluate_availability_empty_list_returns_zero_zero() {
1540 let list = BackendList::new();
1541 assert_eq!((0, 0), list.evaluate_availability());
1542 }
1543
1544 #[test]
1545 fn evaluate_availability_counts_only_healthy_normal_not_in_backoff() {
1546 let mut list = BackendList::new();
1547 list.add_backend(healthy_backend("b-ok-1", 9101));
1548 list.add_backend(healthy_backend("b-ok-2", 9102));
1549 list.add_backend(unhealthy_backend("b-bad", 9103));
1550 let (available, total) = list.evaluate_availability();
1551 assert_eq!(3, total, "every configured backend counts toward total");
1552 assert_eq!(
1553 2, available,
1554 "only the two healthy backends pass the predicate"
1555 );
1556 }
1557
1558 #[test]
1559 fn evaluate_availability_excludes_retry_policy_down() {
1560 let mut list = BackendList::new();
1561 list.add_backend(healthy_backend("b-fresh", 9111));
1562 list.add_backend(healthy_backend("b-fail", 9112));
1563 list.backends[1].borrow_mut().retry_policy.force_down();
1568 let (available, total) = list.evaluate_availability();
1569 assert_eq!(2, total);
1570 assert_eq!(
1571 1, available,
1572 "retry-policy is_down() backend must be excluded even when health.is_healthy()"
1573 );
1574 }
1575
1576 #[test]
1577 fn record_cluster_availability_flips_to_alldown_then_idempotent() {
1578 let mut map = BackendMap::new();
1579 let cluster_id = "c-flap";
1580 map.add_backend(cluster_id, unhealthy_backend("b1", 9201));
1581 let list = map.backends.get(cluster_id).expect("cluster present");
1584 assert_eq!(
1585 ClusterAvailability::AllDown,
1586 list.availability.get(),
1587 "single unhealthy backend must drive the cell to AllDown"
1588 );
1589 map.record_cluster_availability(cluster_id);
1591 let list = map.backends.get(cluster_id).expect("cluster present");
1592 assert_eq!(
1593 ClusterAvailability::AllDown,
1594 list.availability.get(),
1595 "repeat call must keep the cell at AllDown without flipping"
1596 );
1597 }
1598
1599 #[test]
1600 fn record_cluster_availability_recovers_to_available() {
1601 let mut map = BackendMap::new();
1602 let cluster_id = "c-recover";
1603 map.add_backend(cluster_id, unhealthy_backend("b1", 9301));
1604 assert_eq!(
1605 ClusterAvailability::AllDown,
1606 map.backends.get(cluster_id).unwrap().availability.get()
1607 );
1608 map.backends.get_mut(cluster_id).unwrap().backends[0]
1612 .borrow_mut()
1613 .health
1614 .status = HealthStatus::Healthy;
1615 map.record_cluster_availability(cluster_id);
1616 assert_eq!(
1617 ClusterAvailability::Available,
1618 map.backends.get(cluster_id).unwrap().availability.get(),
1619 "healed backend must flip the cell back to Available"
1620 );
1621 }
1622
1623 #[test]
1624 fn record_cluster_availability_empty_cluster_stays_available() {
1625 let mut map = BackendMap::new();
1626 let cluster_id = "c-empty";
1627 map.backends
1628 .insert(cluster_id.to_owned(), BackendList::new());
1629 map.record_cluster_availability(cluster_id);
1632 assert_eq!(
1633 ClusterAvailability::Available,
1634 map.backends.get(cluster_id).unwrap().availability.get(),
1635 "empty cluster must keep the cell at the default Available"
1636 );
1637 }
1638
1639 #[test]
1640 fn record_cluster_availability_missing_cluster_is_noop() {
1641 let map = BackendMap::new();
1642 map.record_cluster_availability("c-absent");
1644 assert!(
1645 !map.backends.contains_key("c-absent"),
1646 "helper must not insert a BackendList for an unknown cluster_id"
1647 );
1648 }
1649
1650 #[test]
1651 fn import_configuration_state_latches_cluster_rollup_gauges() {
1652 use crate::metrics::METRICS;
1653 use sozu_command_lib::proto::command::QueryMetricsOptions;
1654 let cluster_id = "c-import-rollup-9701";
1657 let mut map = BackendMap::new();
1658 let mut input = HashMap::new();
1659 input.insert(
1660 cluster_id.to_owned(),
1661 vec![sozu_command_lib::response::Backend {
1662 cluster_id: cluster_id.to_owned(),
1663 backend_id: "b1".to_owned(),
1664 address: "127.0.0.1:9701".parse().unwrap(),
1665 sticky_id: None,
1666 load_balancing_parameters: None,
1667 backup: None,
1668 }],
1669 );
1670 map.import_configuration_state(&input);
1671 let response = METRICS
1672 .with(|m| {
1673 m.borrow_mut().query(&QueryMetricsOptions {
1674 metric_names: vec![
1675 names::cluster::AVAILABLE_BACKENDS.to_owned(),
1676 names::cluster::TOTAL_BACKENDS.to_owned(),
1677 ],
1678 cluster_ids: vec![cluster_id.to_owned()],
1679 backend_ids: vec![],
1680 list: false,
1681 no_clusters: false,
1682 workers: false,
1683 })
1684 })
1685 .expect("metrics query succeeds");
1686 let cluster_metrics = match response.content_type {
1687 Some(
1688 sozu_command_lib::proto::command::response_content::ContentType::WorkerMetrics(wm),
1689 ) => wm,
1690 other => panic!("expected WorkerMetrics, got {other:?}"),
1691 };
1692 let cm = cluster_metrics
1693 .clusters
1694 .get(cluster_id)
1695 .expect("imported cluster must have a ClusterMetrics entry");
1696 assert!(
1700 cm.cluster.contains_key(names::cluster::AVAILABLE_BACKENDS),
1701 "cluster.available_backends gauge must be latched at import time"
1702 );
1703 assert!(
1704 cm.cluster.contains_key(names::cluster::TOTAL_BACKENDS),
1705 "cluster.total_backends gauge must be latched at import time"
1706 );
1707 }
1708
1709 #[test]
1710 fn set_health_check_config_none_re_emits_rollup_after_reset() {
1711 let mut map = BackendMap::new();
1712 let cluster_id = "c-hc-reset";
1713 map.add_backend(cluster_id, unhealthy_backend("b1", 9801));
1716 assert_eq!(
1717 ClusterAvailability::AllDown,
1718 map.backends.get(cluster_id).unwrap().availability.get(),
1719 "test setup: unhealthy backend must register the cell at AllDown"
1720 );
1721 map.set_health_check_config(cluster_id, None);
1725 assert_eq!(
1726 ClusterAvailability::Available,
1727 map.backends.get(cluster_id).unwrap().availability.get(),
1728 "set_health_check_config(None) must re-emit the rollup after \
1729 resetting backend health, otherwise dashboards stay stuck at AllDown"
1730 );
1731 }
1732}