1use std::{
2 cell::{Cell, RefCell},
3 collections::HashMap,
4 net::SocketAddr,
5 rc::Rc,
6 time::Duration,
7};
8
9use mio::net::TcpStream;
10use sozu_command::{
11 proto::command::{
12 Event, EventKind, HealthCheckConfig, LoadBalancingAlgorithms, LoadBalancingParams,
13 LoadMetric,
14 },
15 state::ClusterId,
16};
17
18use crate::metrics::names;
19use crate::{
20 PeakEWMA,
21 load_balancing::{LeastLoaded, LoadBalancingAlgorithm, PowerOfTwo, Random, RoundRobin},
22 retry::{self, RetryPolicy},
23 server::{self, push_event},
24};
25
/// Errors produced while selecting a backend or opening a connection to it.
#[derive(thiserror::Error, Debug)]
pub enum BackendError {
    /// No usable backend could be returned for the cluster id carried in the variant.
    #[error("No backend found for cluster {0}")]
    NoBackendForCluster(String),
    /// `mio::net::TcpStream::connect` returned an I/O error.
    #[error("Failed to connect to socket with MIO: {0}")]
    MioConnection(std::io::Error),
    /// A connection was attempted on a backend whose status is not `Normal`.
    #[error("This backend is not in a normal status: status={0:?}")]
    Status(BackendStatus),
    /// A connection attempt made on behalf of a cluster failed; carries the
    /// backend's failure counter and the stringified underlying error.
    #[error("could not connect {cluster_id} to {backend_address:?} ({failures} failures): {error}")]
    ConnectionFailures {
        cluster_id: String,
        backend_address: SocketAddr,
        failures: usize,
        error: String,
    },
}
42
/// Lifecycle status of a [`Backend`].
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum BackendStatus {
    /// The backend accepts new connections.
    Normal,
    /// Set by `Backend::set_closing`; no new connection is counted, and the
    /// backend moves to `Closed` once its connection count reaches zero.
    Closing,
    /// Terminal state reached from `Closing` in `Backend::dec_connections`.
    Closed,
}
49
/// Health verdict stored in [`HealthState`].
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum HealthStatus {
    /// Initial state; the backend may be selected by the regular path.
    Healthy,
    /// Reached after enough consecutive failures; `Backend::can_open` and
    /// `Backend::is_available` both return `false` in this state.
    Unhealthy,
}
55
56#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
63pub(crate) enum ClusterAvailability {
64 #[default]
65 Available,
66 AllDown,
67}
68
/// Health verdict of a backend together with the streak counters that drive it.
#[derive(Debug, Clone, PartialEq)]
pub struct HealthState {
    /// Current verdict.
    pub status: HealthStatus,
    /// Successes recorded since the last failure.
    pub consecutive_successes: u32,
    /// Failures recorded since the last success.
    pub consecutive_failures: u32,
}
75
76impl Default for HealthState {
77 fn default() -> Self {
78 HealthState {
79 status: HealthStatus::Healthy,
80 consecutive_successes: 0,
81 consecutive_failures: 0,
82 }
83 }
84}
85
86impl HealthState {
87 pub fn record_success(&mut self, healthy_threshold: u32) -> bool {
89 self.consecutive_failures = 0;
90 self.consecutive_successes += 1;
91
92 if self.status == HealthStatus::Unhealthy && self.consecutive_successes >= healthy_threshold
93 {
94 self.status = HealthStatus::Healthy;
95 return true;
96 }
97 false
98 }
99
100 pub fn record_failure(&mut self, unhealthy_threshold: u32) -> bool {
102 self.consecutive_successes = 0;
103 self.consecutive_failures += 1;
104
105 if self.status == HealthStatus::Healthy && self.consecutive_failures >= unhealthy_threshold
106 {
107 self.status = HealthStatus::Unhealthy;
108 return true;
109 }
110 false
111 }
112
113 pub fn is_healthy(&self) -> bool {
114 self.status == HealthStatus::Healthy
115 }
116}
117
/// One backend server of a cluster, with its connection bookkeeping.
#[derive(Debug, PartialEq, Clone)]
pub struct Backend {
    /// Value compared against the sticky session in `BackendList::find_sticky`.
    pub sticky_id: Option<String>,
    /// Identifier of the backend, reported in events.
    pub backend_id: String,
    /// Address that `try_connect` connects to.
    pub address: SocketAddr,
    /// Lifecycle status; only `Normal` backends accept new connections.
    pub status: BackendStatus,
    /// Retry/backoff policy, notified of failed connection attempts.
    pub retry_policy: retry::RetryPolicyWrapper,
    /// Count maintained by `inc_connections` / `dec_connections`.
    pub active_connections: usize,
    /// Initialised to zero by `new`; not otherwise updated in this file.
    pub active_requests: usize,
    /// Number of failed connection attempts counted by `try_connect`.
    pub failures: usize,
    /// Optional load-balancing parameters for this backend.
    pub load_balancing_parameters: Option<LoadBalancingParams>,
    /// Backup backends are only considered when no non-backup backend can open.
    pub backup: bool,
    /// Connection-time estimator fed by `set_connection_time`.
    pub connection_time: PeakEWMA,
    /// Health verdict and streak counters.
    pub health: HealthState,
}
133
134impl Backend {
135 pub fn new(
136 backend_id: &str,
137 address: SocketAddr,
138 sticky_id: Option<String>,
139 load_balancing_parameters: Option<LoadBalancingParams>,
140 backup: Option<bool>,
141 ) -> Backend {
142 let desired_policy = retry::ExponentialBackoffPolicy::new(6);
143 Backend {
144 sticky_id,
145 backend_id: backend_id.to_owned(),
146 address,
147 status: BackendStatus::Normal,
148 retry_policy: desired_policy.into(),
149 active_connections: 0,
150 active_requests: 0,
151 failures: 0,
152 load_balancing_parameters,
153 backup: backup.unwrap_or(false),
154 connection_time: PeakEWMA::new(),
155 health: HealthState::default(),
156 }
157 }
158
    /// Marks the backend as `Closing`; it becomes `Closed` later, when
    /// `dec_connections` brings the connection count to zero.
    pub fn set_closing(&mut self) {
        self.status = BackendStatus::Closing;
    }
162
    /// Gives mutable access to the backend's retry policy.
    pub fn retry_policy(&mut self) -> &mut retry::RetryPolicyWrapper {
        &mut self.retry_policy
    }
166
167 pub fn can_open(&self) -> bool {
168 if !self.health.is_healthy() {
169 return false;
170 }
171 if let Some(action) = self.retry_policy.can_try() {
172 self.status == BackendStatus::Normal && action == retry::RetryAction::OKAY
173 } else {
174 false
175 }
176 }
177
178 pub fn is_available(&self) -> bool {
189 self.health.is_healthy()
190 && self.status == BackendStatus::Normal
191 && !self.retry_policy.is_down()
192 }
193
194 pub fn inc_connections(&mut self) -> Option<usize> {
195 if self.status == BackendStatus::Normal {
196 self.active_connections += 1;
197 Some(self.active_connections)
198 } else {
199 None
200 }
201 }
202
203 pub fn dec_connections(&mut self) -> Option<usize> {
205 match self.status {
206 BackendStatus::Normal => {
207 if self.active_connections > 0 {
208 self.active_connections -= 1;
209 }
210 Some(self.active_connections)
211 }
212 BackendStatus::Closed => None,
213 BackendStatus::Closing => {
214 if self.active_connections > 0 {
215 self.active_connections -= 1;
216 }
217 if self.active_connections == 0 {
218 self.status = BackendStatus::Closed;
219 None
220 } else {
221 Some(self.active_connections)
222 }
223 }
224 }
225 }
226
    /// Feeds a measured duration, expressed in nanoseconds, to the
    /// connection-time estimator.
    pub fn set_connection_time(&mut self, dur: Duration) {
        self.connection_time.observe(dur.as_nanos() as f64);
    }
230
    /// Queries the connection-time estimator, passing it the current number
    /// of active connections.
    pub fn peak_ewma_connection(&mut self) -> f64 {
        self.connection_time.get(self.active_connections)
    }
234
235 pub fn try_connect(&mut self) -> Result<mio::net::TcpStream, BackendError> {
236 if self.status != BackendStatus::Normal {
237 return Err(BackendError::Status(self.status.to_owned()));
238 }
239
240 match mio::net::TcpStream::connect(self.address) {
241 Ok(tcp_stream) => {
242 self.inc_connections();
244 Ok(tcp_stream)
245 }
246 Err(io_error) => {
247 self.retry_policy.fail();
248 self.failures += 1;
249 Err(BackendError::MioConnection(io_error))
254 }
255 }
256 }
257}
258
259impl std::ops::Drop for Backend {
263 fn drop(&mut self) {
264 server::push_event(Event {
265 kind: EventKind::RemovedBackendHasNoConnections as i32,
266 backend_id: Some(self.backend_id.to_owned()),
267 address: Some(self.address.into()),
268 cluster_id: None,
269 metric_detail: None,
270 });
271 }
272}
273
/// Backends of every cluster, plus per-cluster settings.
#[derive(Debug)]
pub struct BackendMap {
    /// Backend list of each cluster.
    pub backends: HashMap<ClusterId, BackendList>,
    /// Initialised to 3 by `new`; not read anywhere in this file.
    pub max_failures: usize,
    /// Health-check configuration of the clusters that have one.
    pub health_check_configs: HashMap<ClusterId, HealthCheckConfig>,
    /// Holds a `true` entry for each cluster flagged through
    /// `set_cluster_http2`; the entry is removed when the flag is `false`.
    pub cluster_http2: HashMap<ClusterId, bool>,
}
287
impl Default for BackendMap {
    /// Same as [`BackendMap::new`].
    fn default() -> Self {
        Self::new()
    }
}
293
294impl BackendMap {
295 pub fn new() -> BackendMap {
296 BackendMap {
297 backends: HashMap::new(),
298 max_failures: 3,
299 health_check_configs: HashMap::new(),
300 cluster_http2: HashMap::new(),
301 }
302 }
303
304 pub(crate) fn record_cluster_availability(&self, cluster_id: &str) {
319 let Some(list) = self.backends.get(cluster_id) else {
320 return;
321 };
322
323 let (available, total) = list.evaluate_availability();
324 gauge!(
325 names::cluster::AVAILABLE_BACKENDS,
326 available,
327 Some(cluster_id),
328 None
329 );
330 gauge!(
331 names::cluster::TOTAL_BACKENDS,
332 total,
333 Some(cluster_id),
334 None
335 );
336
337 let new_state = if total > 0 && available == 0 {
338 ClusterAvailability::AllDown
339 } else {
340 ClusterAvailability::Available
341 };
342
343 let prev = list.availability.replace(new_state);
344 if prev == new_state {
345 return;
346 }
347 match (prev, new_state) {
348 (ClusterAvailability::Available, ClusterAvailability::AllDown) => {
349 error!("cluster {}: all {} backends are down", cluster_id, total);
350 incr!(
351 names::cluster::NO_AVAILABLE_BACKENDS,
352 Some(cluster_id),
353 None
354 );
355 push_event(Event {
356 kind: EventKind::NoAvailableBackends as i32,
357 cluster_id: Some(cluster_id.to_owned()),
358 backend_id: None,
359 address: None,
360 metric_detail: None,
361 });
362 }
363 (ClusterAvailability::AllDown, ClusterAvailability::Available) => {
364 info!(
365 "cluster {}: backends recovered ({}/{} available)",
366 cluster_id, available, total
367 );
368 incr!(names::cluster::AVAILABLE_RECOVERED, Some(cluster_id), None);
369 push_event(Event {
370 kind: EventKind::ClusterRecovered as i32,
371 cluster_id: Some(cluster_id.to_owned()),
372 backend_id: None,
373 address: None,
374 metric_detail: None,
375 });
376 }
377 _ => {}
378 }
379 }
380
381 pub fn set_cluster_http2(&mut self, cluster_id: &str, http2: bool) {
386 if http2 {
387 self.cluster_http2.insert(cluster_id.to_owned(), true);
388 } else {
389 self.cluster_http2.remove(cluster_id);
390 }
391 }
392
393 pub fn set_health_check_config(&mut self, cluster_id: &str, config: Option<HealthCheckConfig>) {
394 match config {
395 Some(c) => {
396 self.health_check_configs.insert(cluster_id.to_owned(), c);
397 }
398 None => {
399 self.health_check_configs.remove(cluster_id);
400 if let Some(backend_list) = self.backends.get(cluster_id) {
408 for backend in &backend_list.backends {
409 backend.borrow_mut().health = HealthState::default();
410 }
411 }
412 self.record_cluster_availability(cluster_id);
416 }
417 }
418 }
419
420 pub fn import_configuration_state(
421 &mut self,
422 backends: &HashMap<ClusterId, Vec<sozu_command::response::Backend>>,
423 ) {
424 self.backends
425 .extend(backends.iter().map(|(cluster_id, backend_vec)| {
426 (
427 cluster_id.to_string(),
428 BackendList::import_configuration_state(backend_vec),
429 )
430 }));
431 for cluster_id in backends.keys() {
438 self.record_cluster_availability(cluster_id);
439 }
440 }
441
442 pub fn add_backend(&mut self, cluster_id: &str, backend: Backend) {
443 self.backends
444 .entry(cluster_id.to_string())
445 .or_default()
446 .add_backend(backend);
447 self.record_cluster_availability(cluster_id);
452 }
453
454 pub fn remove_backend(
462 &mut self,
463 cluster_id: &str,
464 backend_address: &SocketAddr,
465 ) -> Vec<String> {
466 let removed = if let Some(backends) = self.backends.get_mut(cluster_id) {
467 backends.remove_backend(backend_address)
468 } else {
469 error!(
470 "Backend was already removed: cluster id {}, address {:?}",
471 cluster_id, backend_address
472 );
473 return Vec::new();
474 };
475 self.record_cluster_availability(cluster_id);
479 removed
480 }
481
482 pub fn close_backend_connection(&mut self, cluster_id: &str, addr: &SocketAddr) {
484 if let Some(cluster_backends) = self.backends.get_mut(cluster_id) {
485 if let Some(ref mut backend) = cluster_backends.find_backend(addr) {
486 backend.borrow_mut().dec_connections();
487 }
488 }
489 }
490
491 pub fn has_backend(&self, cluster_id: &str, backend: &Backend) -> bool {
492 self.backends
493 .get(cluster_id)
494 .map(|backends| backends.has_backend(&backend.address))
495 .unwrap_or(false)
496 }
497
498 pub fn backend_from_cluster_id(
499 &mut self,
500 cluster_id: &str,
501 ) -> Result<(Rc<RefCell<Backend>>, TcpStream), BackendError> {
502 let cluster_backends = self
503 .backends
504 .get_mut(cluster_id)
505 .ok_or(BackendError::NoBackendForCluster(cluster_id.to_owned()))?;
506
507 if cluster_backends.backends.is_empty() {
508 let _ = cluster_backends;
513 self.record_cluster_availability(cluster_id);
514 return Err(BackendError::NoBackendForCluster(cluster_id.to_owned()));
515 }
516
517 let next_backend = match cluster_backends.next_available_backend() {
518 Some(nb) => nb,
519 None => {
520 let _ = cluster_backends;
526 self.record_cluster_availability(cluster_id);
527 return Err(BackendError::NoBackendForCluster(cluster_id.to_owned()));
528 }
529 };
530
531 let tcp_stream = {
532 let mut borrowed_backend = next_backend.borrow_mut();
533
534 debug!(
535 "Connecting {} -> {:?}",
536 cluster_id,
537 (
538 borrowed_backend.address,
539 borrowed_backend.active_connections,
540 borrowed_backend.failures
541 )
542 );
543
544 borrowed_backend.try_connect().map_err(|backend_error| {
545 BackendError::ConnectionFailures {
546 cluster_id: cluster_id.to_owned(),
547 backend_address: borrowed_backend.address,
548 failures: borrowed_backend.failures,
549 error: backend_error.to_string(),
550 }
551 })?
552 };
553
554 let _ = cluster_backends;
561 self.record_cluster_availability(cluster_id);
562
563 Ok((next_backend.clone(), tcp_stream))
564 }
565
566 pub fn backend_from_sticky_session(
567 &mut self,
568 cluster_id: &str,
569 sticky_session: &str,
570 ) -> Result<(Rc<RefCell<Backend>>, TcpStream), BackendError> {
571 let sticky_conn = self
572 .backends
573 .get_mut(cluster_id)
574 .and_then(|cluster_backends| cluster_backends.find_sticky(sticky_session))
575 .map(|backend| {
576 let mut borrowed = backend.borrow_mut();
577 let conn = borrowed.try_connect();
578
579 conn.map(|tcp_stream| (backend.clone(), tcp_stream))
580 .inspect_err(|_| {
581 error!(
582 "could not connect {} to {:?} using session {} ({} failures)",
583 cluster_id, borrowed.address, sticky_session, borrowed.failures
584 )
585 })
586 });
587
588 match sticky_conn {
589 Some(backend_and_stream) => backend_and_stream,
590 None => {
591 debug!(
592 "Couldn't find a backend corresponding to sticky_session {} for cluster {}",
593 sticky_session, cluster_id
594 );
595 self.backend_from_cluster_id(cluster_id)
596 }
597 }
598 }
599
600 pub fn set_load_balancing_policy_for_cluster(
601 &mut self,
602 cluster_id: &str,
603 lb_algo: LoadBalancingAlgorithms,
604 metric: Option<LoadMetric>,
605 ) {
606 let cluster_backends = self.get_or_create_backend_list_for_cluster(cluster_id);
609 cluster_backends.set_load_balancing_policy(lb_algo, metric);
610 }
611
    /// Returns the backend list of `cluster_id`, inserting a default (empty)
    /// list first when the cluster is not known yet.
    pub fn get_or_create_backend_list_for_cluster(&mut self, cluster_id: &str) -> &mut BackendList {
        self.backends.entry(cluster_id.to_string()).or_default()
    }
615}
616
/// Backends of one cluster together with their selection state.
#[derive(Debug)]
pub struct BackendList {
    /// Backends, shared through `Rc<RefCell<_>>` with whoever holds a connection.
    pub backends: Vec<Rc<RefCell<Backend>>>,
    /// Incremented each time a new backend is pushed by `add_backend`.
    pub next_id: u32,
    /// Algorithm that picks one backend among the candidates.
    pub load_balancing: Box<dyn LoadBalancingAlgorithm>,
    /// Latch set when fail-open routing starts and cleared when regular
    /// selection succeeds again, so each episode is logged once.
    fail_open_warned: bool,
    /// Latched availability rollup; a `Cell` so it can be updated through
    /// `&self` by `BackendMap::record_cluster_availability`.
    pub(crate) availability: Cell<ClusterAvailability>,
}
636
impl Default for BackendList {
    /// Same as [`BackendList::new`].
    fn default() -> Self {
        Self::new()
    }
}
642
643impl BackendList {
644 pub fn new() -> BackendList {
645 BackendList {
646 backends: Vec::new(),
647 next_id: 0,
648 load_balancing: Box::new(Random),
649 fail_open_warned: false,
650 availability: Cell::new(ClusterAvailability::Available),
651 }
652 }
653
654 pub(crate) fn evaluate_availability(&self) -> (usize, usize) {
658 let total = self.backends.len();
659 let available = self
660 .backends
661 .iter()
662 .filter(|b| b.borrow().is_available())
663 .count();
664 (available, total)
665 }
666
667 pub fn import_configuration_state(
668 backend_vec: &[sozu_command_lib::response::Backend],
669 ) -> BackendList {
670 let mut list = BackendList::new();
671 for backend in backend_vec {
672 let backend = Backend::new(
673 &backend.backend_id,
674 backend.address,
675 backend.sticky_id.clone(),
676 backend.load_balancing_parameters,
677 backend.backup,
678 );
679 list.add_backend(backend);
680 }
681
682 list
683 }
684
685 pub fn add_backend(&mut self, backend: Backend) {
686 match self.backends.iter_mut().find(|b| {
687 b.borrow().address == backend.address && b.borrow().backend_id == backend.backend_id
688 }) {
689 None => {
690 let backend = Rc::new(RefCell::new(backend));
691 self.backends.push(backend);
692 self.next_id += 1;
693 }
694 Some(old_backend) => {
697 let mut b = old_backend.borrow_mut();
698 b.sticky_id.clone_from(&backend.sticky_id);
699 b.load_balancing_parameters
700 .clone_from(&backend.load_balancing_parameters);
701 b.backup = backend.backup;
702 }
703 }
704 }
705
706 pub fn remove_backend(&mut self, backend_address: &SocketAddr) -> Vec<String> {
714 let mut removed = Vec::new();
715 self.backends.retain(|backend| {
716 let b = backend.borrow();
717 if &b.address == backend_address {
718 removed.push(b.backend_id.clone());
719 false
720 } else {
721 true
722 }
723 });
724 removed
725 }
726
727 pub fn has_backend(&self, backend_address: &SocketAddr) -> bool {
728 self.backends
729 .iter()
730 .any(|backend| backend.borrow().address == *backend_address)
731 }
732
733 pub fn find_backend(
734 &mut self,
735 backend_address: &SocketAddr,
736 ) -> Option<&mut Rc<RefCell<Backend>>> {
737 self.backends
738 .iter_mut()
739 .find(|backend| backend.borrow().address == *backend_address)
740 }
741
742 pub fn find_sticky(&mut self, sticky_session: &str) -> Option<&mut Rc<RefCell<Backend>>> {
743 self.backends
744 .iter_mut()
745 .find(|b| b.borrow().sticky_id.as_deref() == Some(sticky_session))
746 .and_then(|b| if b.borrow().can_open() { Some(b) } else { None })
747 }
748
749 pub fn available_backends(&mut self, backup: bool) -> Vec<Rc<RefCell<Backend>>> {
750 self.backends
751 .iter()
752 .filter(|backend| {
753 let owned = backend.borrow();
754 owned.backup == backup && owned.can_open()
755 })
756 .map(Clone::clone)
757 .collect()
758 }
759
760 pub fn next_available_backend(&mut self) -> Option<Rc<RefCell<Backend>>> {
761 let mut backends = self.available_backends(false);
762
763 if backends.is_empty() {
764 backends = self.available_backends(true);
765 }
766
767 if !backends.is_empty() {
768 if self.fail_open_warned {
770 info!(
771 "fail-open: cluster recovered, {} backends now healthy",
772 backends.len()
773 );
774 self.fail_open_warned = false;
775 }
776 return self.load_balancing.next_available_backend(&mut backends);
777 }
778
779 backends = self
788 .backends
789 .iter()
790 .filter(|b| {
791 let owned = b.borrow();
792 owned.status == BackendStatus::Normal
793 && matches!(owned.retry_policy.can_try(), Some(retry::RetryAction::OKAY))
794 })
795 .map(Clone::clone)
796 .collect();
797
798 if backends.is_empty() {
799 return None;
800 }
801
802 if !self.fail_open_warned {
806 warn!(
807 "fail-open: all backends unhealthy, routing to {} normal backends with retry-policy OKAY",
808 backends.len()
809 );
810 self.fail_open_warned = true;
811 }
812 count!(names::backend::FAIL_OPEN, 1);
813
814 self.load_balancing.next_available_backend(&mut backends)
815 }
816
817 pub fn set_load_balancing_policy(
818 &mut self,
819 load_balancing_policy: LoadBalancingAlgorithms,
820 metric: Option<LoadMetric>,
821 ) {
822 match load_balancing_policy {
823 LoadBalancingAlgorithms::RoundRobin => {
824 self.load_balancing = Box::new(RoundRobin::new())
825 }
826 LoadBalancingAlgorithms::Random => self.load_balancing = Box::new(Random {}),
827 LoadBalancingAlgorithms::LeastLoaded => {
828 self.load_balancing = Box::new(LeastLoaded {
829 metric: metric.unwrap_or(LoadMetric::Connections),
830 })
831 }
832 LoadBalancingAlgorithms::PowerOfTwo => {
833 self.load_balancing = Box::new(PowerOfTwo {
834 metric: metric.unwrap_or(LoadMetric::Connections),
835 })
836 }
837 }
838 }
839}
840
841#[cfg(test)]
842mod backends_test {
843
844 use std::{net::TcpListener, sync::mpsc::*, thread};
845
846 use super::*;
847
848 fn run_mock_tcp_server(addr: &str, stopper: Receiver<()>) {
849 let mut run = true;
850 let listener = TcpListener::bind(addr).unwrap();
851
852 thread::spawn(move || {
853 while run {
854 for _stream in listener.incoming() {
855 if let Ok(()) = stopper.try_recv() {
857 run = false;
858 }
859 }
860 }
861 });
862 }
863
864 #[test]
865 fn it_should_retrieve_a_backend_from_cluster_id_when_backends_have_been_recorded() {
866 let mut backend_map = BackendMap::new();
867 let cluster_id = "mycluster";
868
869 let backend_addr = "127.0.0.1:1236";
870 let (sender, receiver) = channel();
871 run_mock_tcp_server(backend_addr, receiver);
872
873 backend_map.add_backend(
874 cluster_id,
875 Backend::new(
876 &format!("{cluster_id}-1"),
877 backend_addr.parse().unwrap(),
878 None,
879 None,
880 None,
881 ),
882 );
883
884 assert!(backend_map.backend_from_cluster_id(cluster_id).is_ok());
885 sender.send(()).unwrap();
886 }
887
888 #[test]
889 fn it_should_not_retrieve_a_backend_from_cluster_id_when_backend_has_not_been_recorded() {
890 let mut backend_map = BackendMap::new();
891 let cluster_not_recorded = "not";
892 backend_map.add_backend(
893 "foo",
894 Backend::new("foo-1", "127.0.0.1:9001".parse().unwrap(), None, None, None),
895 );
896
897 assert!(
898 backend_map
899 .backend_from_cluster_id(cluster_not_recorded)
900 .is_err()
901 );
902 }
903
904 #[test]
905 fn it_should_not_retrieve_a_backend_from_cluster_id_when_backend_list_is_empty() {
906 let mut backend_map = BackendMap::new();
907
908 assert!(backend_map.backend_from_cluster_id("dumb").is_err());
909 }
910
911 #[test]
912 fn it_should_retrieve_a_backend_from_sticky_session_when_the_backend_has_been_recorded() {
913 let mut backend_map = BackendMap::new();
914 let cluster_id = "mycluster";
915 let sticky_session = "server-2";
916
917 let backend_addr = "127.0.0.1:3456";
918 let (sender, receiver) = channel();
919 run_mock_tcp_server(backend_addr, receiver);
920
921 backend_map.add_backend(
922 cluster_id,
923 Backend::new(
924 &format!("{cluster_id}-1"),
925 "127.0.0.1:9001".parse().unwrap(),
926 Some("server-1".to_string()),
927 None,
928 None,
929 ),
930 );
931 backend_map.add_backend(
932 cluster_id,
933 Backend::new(
934 &format!("{cluster_id}-2"),
935 "127.0.0.1:9000".parse().unwrap(),
936 Some("server-2".to_string()),
937 None,
938 None,
939 ),
940 );
941 backend_map.add_backend(
943 cluster_id,
944 Backend::new(
945 &format!("{cluster_id}-3"),
946 backend_addr.parse().unwrap(),
947 Some("server-3".to_string()),
948 None,
949 None,
950 ),
951 );
952
953 assert!(
954 backend_map
955 .backend_from_sticky_session(cluster_id, sticky_session)
956 .is_ok()
957 );
958 sender.send(()).unwrap();
959 }
960
961 #[test]
962 fn it_should_not_retrieve_a_backend_from_sticky_session_when_the_backend_has_not_been_recorded()
963 {
964 let mut backend_map = BackendMap::new();
965 let cluster_id = "mycluster";
966 let sticky_session = "test";
967
968 assert!(
969 backend_map
970 .backend_from_sticky_session(cluster_id, sticky_session)
971 .is_err()
972 );
973 }
974
975 #[test]
976 fn it_should_not_retrieve_a_backend_from_sticky_session_when_the_backend_list_is_empty() {
977 let mut backend_map = BackendMap::new();
978 let mycluster_not_recorded = "mycluster";
979 let sticky_session = "test";
980
981 assert!(
982 backend_map
983 .backend_from_sticky_session(mycluster_not_recorded, sticky_session)
984 .is_err()
985 );
986 }
987
988 #[test]
989 fn it_should_add_a_backend_when_he_doesnt_already_exist() {
990 let backend_id = "myback";
991 let mut backends_list = BackendList::new();
992 backends_list.add_backend(Backend::new(
993 backend_id,
994 "127.0.0.1:80".parse().unwrap(),
995 None,
996 None,
997 None,
998 ));
999
1000 assert_eq!(1, backends_list.backends.len());
1001 }
1002
1003 #[test]
1004 fn it_should_not_add_a_backend_when_he_already_exist() {
1005 let backend_id = "myback";
1006 let mut backends_list = BackendList::new();
1007 backends_list.add_backend(Backend::new(
1008 backend_id,
1009 "127.0.0.1:80".parse().unwrap(),
1010 None,
1011 None,
1012 None,
1013 ));
1014
1015 backends_list.add_backend(Backend::new(
1017 backend_id,
1018 "127.0.0.1:80".parse().unwrap(),
1019 None,
1020 None,
1021 None,
1022 ));
1023
1024 assert_eq!(1, backends_list.backends.len());
1025 }
1026
1027 fn unhealthy_backend(id: &str, port: u16) -> Backend {
1030 let mut backend = Backend::new(
1031 id,
1032 format!("127.0.0.1:{port}").parse().unwrap(),
1033 None,
1034 None,
1035 None,
1036 );
1037 backend.health.record_failure(1);
1039 assert!(!backend.health.is_healthy());
1040 backend
1041 }
1042
1043 #[test]
1044 fn fail_open_picks_normal_backend_in_retry_policy_okay() {
1045 let mut list = BackendList::new();
1050 list.add_backend(unhealthy_backend("b1", 9001));
1051 list.add_backend(unhealthy_backend("b2", 9002));
1052
1053 assert!(list.available_backends(false).is_empty());
1055 assert!(list.available_backends(true).is_empty());
1056
1057 let picked = list.next_available_backend();
1058 assert!(
1059 picked.is_some(),
1060 "fail-open must pick a Normal+OKAY backend"
1061 );
1062 assert!(list.fail_open_warned, "regime entry must latch the warn!");
1063 }
1064
1065 #[test]
1066 fn fail_open_skips_backend_in_retry_backoff() {
1067 let mut list = BackendList::new();
1073 list.add_backend(unhealthy_backend("b1", 9011));
1074 list.add_backend(unhealthy_backend("b2", 9012));
1075 for backend_rc in &list.backends {
1076 backend_rc.borrow_mut().retry_policy().fail();
1077 assert_eq!(
1078 Some(retry::RetryAction::WAIT),
1079 backend_rc.borrow().retry_policy.can_try(),
1080 "test fixture must place retry policy in WAIT"
1081 );
1082 }
1083
1084 let picked = list.next_available_backend();
1085 assert!(
1086 picked.is_none(),
1087 "fail-open must skip backends whose retry policy is in WAIT"
1088 );
1089 assert!(
1090 !list.fail_open_warned,
1091 "no candidate backends, no regime entry"
1092 );
1093 }
1094
1095 #[test]
1096 fn fail_open_warn_latched() {
1097 let mut list = BackendList::new();
1101 list.add_backend(unhealthy_backend("b1", 9021));
1102 list.add_backend(unhealthy_backend("b2", 9022));
1103
1104 assert!(list.next_available_backend().is_some());
1105 assert!(list.fail_open_warned, "first fail-open must latch");
1106
1107 assert!(list.next_available_backend().is_some());
1108 assert!(
1109 list.fail_open_warned,
1110 "subsequent fail-open routing keeps the latch"
1111 );
1112
1113 list.backends[0].borrow_mut().health.status = HealthStatus::Healthy;
1116 let picked = list.next_available_backend();
1117 assert!(
1118 picked.is_some(),
1119 "regular path must select the healed backend"
1120 );
1121 assert!(
1122 !list.fail_open_warned,
1123 "regime exit must clear the latch so the next entry is logged again"
1124 );
1125 }
1126
1127 fn healthy_backend(id: &str, port: u16) -> Backend {
1135 Backend::new(
1136 id,
1137 format!("127.0.0.1:{port}").parse().unwrap(),
1138 None,
1139 None,
1140 None,
1141 )
1142 }
1143
    /// Walks `Backend::is_available` through each of its three conditions in
    /// turn: health, retry policy, lifecycle status.
    #[test]
    fn is_available_requires_health_status_and_retry_policy() {
        let mut backend = Backend::new("b", "127.0.0.1:9050".parse().unwrap(), None, None, None);
        assert!(backend.is_available(), "fresh backend must be available");

        // 1. Health: one failure against a threshold of 1 marks it unhealthy.
        backend.health.record_failure(1);
        assert!(!backend.is_available(), "unhealthy must not be available");

        // 2. Retry policy: restore health first so only the policy is tested.
        backend.health.status = HealthStatus::Healthy;
        assert!(backend.is_available());
        backend.retry_policy.force_down();
        assert!(
            backend.retry_policy.is_down(),
            "test setup: retry policy budget must be exhausted",
        );
        assert!(
            !backend.is_available(),
            "retry-policy backoff must fail the predicate"
        );

        // 3. Lifecycle: report a success to the retry policy, then close.
        backend.retry_policy.succeed();
        backend.set_closing();
        assert!(
            !backend.is_available(),
            "Closing lifecycle status must fail the predicate"
        );
    }
1179
1180 #[test]
1181 fn evaluate_availability_empty_list_returns_zero_zero() {
1182 let list = BackendList::new();
1183 assert_eq!((0, 0), list.evaluate_availability());
1184 }
1185
1186 #[test]
1187 fn evaluate_availability_counts_only_healthy_normal_not_in_backoff() {
1188 let mut list = BackendList::new();
1189 list.add_backend(healthy_backend("b-ok-1", 9101));
1190 list.add_backend(healthy_backend("b-ok-2", 9102));
1191 list.add_backend(unhealthy_backend("b-bad", 9103));
1192 let (available, total) = list.evaluate_availability();
1193 assert_eq!(3, total, "every configured backend counts toward total");
1194 assert_eq!(
1195 2, available,
1196 "only the two healthy backends pass the predicate"
1197 );
1198 }
1199
1200 #[test]
1201 fn evaluate_availability_excludes_retry_policy_down() {
1202 let mut list = BackendList::new();
1203 list.add_backend(healthy_backend("b-fresh", 9111));
1204 list.add_backend(healthy_backend("b-fail", 9112));
1205 list.backends[1].borrow_mut().retry_policy.force_down();
1210 let (available, total) = list.evaluate_availability();
1211 assert_eq!(2, total);
1212 assert_eq!(
1213 1, available,
1214 "retry-policy is_down() backend must be excluded even when health.is_healthy()"
1215 );
1216 }
1217
1218 #[test]
1219 fn record_cluster_availability_flips_to_alldown_then_idempotent() {
1220 let mut map = BackendMap::new();
1221 let cluster_id = "c-flap";
1222 map.add_backend(cluster_id, unhealthy_backend("b1", 9201));
1223 let list = map.backends.get(cluster_id).expect("cluster present");
1226 assert_eq!(
1227 ClusterAvailability::AllDown,
1228 list.availability.get(),
1229 "single unhealthy backend must drive the cell to AllDown"
1230 );
1231 map.record_cluster_availability(cluster_id);
1233 let list = map.backends.get(cluster_id).expect("cluster present");
1234 assert_eq!(
1235 ClusterAvailability::AllDown,
1236 list.availability.get(),
1237 "repeat call must keep the cell at AllDown without flipping"
1238 );
1239 }
1240
1241 #[test]
1242 fn record_cluster_availability_recovers_to_available() {
1243 let mut map = BackendMap::new();
1244 let cluster_id = "c-recover";
1245 map.add_backend(cluster_id, unhealthy_backend("b1", 9301));
1246 assert_eq!(
1247 ClusterAvailability::AllDown,
1248 map.backends.get(cluster_id).unwrap().availability.get()
1249 );
1250 map.backends.get_mut(cluster_id).unwrap().backends[0]
1254 .borrow_mut()
1255 .health
1256 .status = HealthStatus::Healthy;
1257 map.record_cluster_availability(cluster_id);
1258 assert_eq!(
1259 ClusterAvailability::Available,
1260 map.backends.get(cluster_id).unwrap().availability.get(),
1261 "healed backend must flip the cell back to Available"
1262 );
1263 }
1264
1265 #[test]
1266 fn record_cluster_availability_empty_cluster_stays_available() {
1267 let mut map = BackendMap::new();
1268 let cluster_id = "c-empty";
1269 map.backends
1270 .insert(cluster_id.to_owned(), BackendList::new());
1271 map.record_cluster_availability(cluster_id);
1274 assert_eq!(
1275 ClusterAvailability::Available,
1276 map.backends.get(cluster_id).unwrap().availability.get(),
1277 "empty cluster must keep the cell at the default Available"
1278 );
1279 }
1280
1281 #[test]
1282 fn record_cluster_availability_missing_cluster_is_noop() {
1283 let map = BackendMap::new();
1284 map.record_cluster_availability("c-absent");
1286 assert!(
1287 !map.backends.contains_key("c-absent"),
1288 "helper must not insert a BackendList for an unknown cluster_id"
1289 );
1290 }
1291
    /// Importing a configuration state must publish the per-cluster
    /// available/total backend gauges, checked here through a metrics query.
    #[test]
    fn import_configuration_state_latches_cluster_rollup_gauges() {
        use crate::metrics::METRICS;
        use sozu_command_lib::proto::command::QueryMetricsOptions;
        let cluster_id = "c-import-rollup-9701";
        let mut map = BackendMap::new();
        let mut input = HashMap::new();
        input.insert(
            cluster_id.to_owned(),
            vec![sozu_command_lib::response::Backend {
                cluster_id: cluster_id.to_owned(),
                backend_id: "b1".to_owned(),
                address: "127.0.0.1:9701".parse().unwrap(),
                sticky_id: None,
                load_balancing_parameters: None,
                backup: None,
            }],
        );
        map.import_configuration_state(&input);
        // Ask for the two rollup gauges of this cluster only.
        let response = METRICS
            .with(|m| {
                m.borrow_mut().query(&QueryMetricsOptions {
                    metric_names: vec![
                        names::cluster::AVAILABLE_BACKENDS.to_owned(),
                        names::cluster::TOTAL_BACKENDS.to_owned(),
                    ],
                    cluster_ids: vec![cluster_id.to_owned()],
                    backend_ids: vec![],
                    list: false,
                    no_clusters: false,
                    workers: false,
                })
            })
            .expect("metrics query succeeds");
        let cluster_metrics = match response.content_type {
            Some(
                sozu_command_lib::proto::command::response_content::ContentType::WorkerMetrics(wm),
            ) => wm,
            other => panic!("expected WorkerMetrics, got {other:?}"),
        };
        let cm = cluster_metrics
            .clusters
            .get(cluster_id)
            .expect("imported cluster must have a ClusterMetrics entry");
        assert!(
            cm.cluster.contains_key(names::cluster::AVAILABLE_BACKENDS),
            "cluster.available_backends gauge must be latched at import time"
        );
        assert!(
            cm.cluster.contains_key(names::cluster::TOTAL_BACKENDS),
            "cluster.total_backends gauge must be latched at import time"
        );
    }
1350
1351 #[test]
1352 fn set_health_check_config_none_re_emits_rollup_after_reset() {
1353 let mut map = BackendMap::new();
1354 let cluster_id = "c-hc-reset";
1355 map.add_backend(cluster_id, unhealthy_backend("b1", 9801));
1358 assert_eq!(
1359 ClusterAvailability::AllDown,
1360 map.backends.get(cluster_id).unwrap().availability.get(),
1361 "test setup: unhealthy backend must register the cell at AllDown"
1362 );
1363 map.set_health_check_config(cluster_id, None);
1367 assert_eq!(
1368 ClusterAvailability::Available,
1369 map.backends.get(cluster_id).unwrap().availability.get(),
1370 "set_health_check_config(None) must re-emit the rollup after \
1371 resetting backend health, otherwise dashboards stay stuck at AllDown"
1372 );
1373 }
1374}