// sozu_lib/backends.rs

1use std::{cell::RefCell, collections::HashMap, net::SocketAddr, rc::Rc, time::Duration};
2
3use mio::net::TcpStream;
4
5use sozu_command::{
6    proto::command::{Event, EventKind, LoadBalancingAlgorithms, LoadBalancingParams, LoadMetric},
7    state::ClusterId,
8};
9
10use crate::{
11    load_balancing::{LeastLoaded, LoadBalancingAlgorithm, PowerOfTwo, Random, RoundRobin},
12    retry::{self, RetryPolicy},
13    server::{self, push_event},
14    PeakEWMA,
15};
16
/// Errors that can occur while selecting a backend for a cluster or
/// opening a TCP connection to it.
#[derive(thiserror::Error, Debug)]
pub enum BackendError {
    /// The cluster has no registered (or no currently usable) backend.
    #[error("No backend found for cluster {0}")]
    NoBackendForCluster(String),
    /// The non-blocking MIO connect call itself returned an I/O error.
    #[error("Failed to connect to socket with MIO: {0}")]
    MioConnection(std::io::Error),
    /// The backend is `Closing` or `Closed`, so no new connection is allowed.
    #[error("This backend is not in a normal status: status={0:?}")]
    Status(BackendStatus),
    /// A connect attempt failed; carries the backend's accumulated failure count.
    #[error(
        "could not connect {cluster_id} to {backend_address:?} ({failures} failures): {error}"
    )]
    ConnectionFailures {
        cluster_id: String,
        backend_address: SocketAddr,
        failures: usize,
        error: String,
    },
}
35
/// Lifecycle state of a backend server within the proxy.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum BackendStatus {
    /// Accepting new connections.
    Normal,
    /// Draining: no new connections, existing ones are allowed to finish.
    Closing,
    /// Fully drained: no active connections remain.
    Closed,
}
42
/// A single upstream server belonging to a cluster.
///
/// Tracks connection/request counters, failure statistics, and the retry
/// policy that decides whether new connection attempts are allowed.
#[derive(Debug, PartialEq, Clone)]
pub struct Backend {
    // sticky session identifier, if session affinity is configured for this backend
    pub sticky_id: Option<String>,
    pub backend_id: String,
    pub address: SocketAddr,
    pub status: BackendStatus,
    // backoff state gating new connection attempts (see can_open/try_connect)
    pub retry_policy: retry::RetryPolicyWrapper,
    pub active_connections: usize,
    pub active_requests: usize,
    // cumulative connect failures; incremented in try_connect
    pub failures: usize,
    pub load_balancing_parameters: Option<LoadBalancingParams>,
    // backup backends are only used when no primary backend is available
    pub backup: bool,
    // peak-EWMA estimate of connection time, exposed via peak_ewma_connection
    pub connection_time: PeakEWMA,
}
57
58impl Backend {
59    pub fn new(
60        backend_id: &str,
61        address: SocketAddr,
62        sticky_id: Option<String>,
63        load_balancing_parameters: Option<LoadBalancingParams>,
64        backup: Option<bool>,
65    ) -> Backend {
66        let desired_policy = retry::ExponentialBackoffPolicy::new(6);
67        Backend {
68            sticky_id,
69            backend_id: backend_id.to_string(),
70            address,
71            status: BackendStatus::Normal,
72            retry_policy: desired_policy.into(),
73            active_connections: 0,
74            active_requests: 0,
75            failures: 0,
76            load_balancing_parameters,
77            backup: backup.unwrap_or(false),
78            connection_time: PeakEWMA::new(),
79        }
80    }
81
82    pub fn set_closing(&mut self) {
83        self.status = BackendStatus::Closing;
84    }
85
86    pub fn retry_policy(&mut self) -> &mut retry::RetryPolicyWrapper {
87        &mut self.retry_policy
88    }
89
90    pub fn can_open(&self) -> bool {
91        if let Some(action) = self.retry_policy.can_try() {
92            self.status == BackendStatus::Normal && action == retry::RetryAction::OKAY
93        } else {
94            false
95        }
96    }
97
98    pub fn inc_connections(&mut self) -> Option<usize> {
99        if self.status == BackendStatus::Normal {
100            self.active_connections += 1;
101            Some(self.active_connections)
102        } else {
103            None
104        }
105    }
106
107    /// TODO: normalize with saturating_sub()
108    pub fn dec_connections(&mut self) -> Option<usize> {
109        match self.status {
110            BackendStatus::Normal => {
111                if self.active_connections > 0 {
112                    self.active_connections -= 1;
113                }
114                Some(self.active_connections)
115            }
116            BackendStatus::Closed => None,
117            BackendStatus::Closing => {
118                if self.active_connections > 0 {
119                    self.active_connections -= 1;
120                }
121                if self.active_connections == 0 {
122                    self.status = BackendStatus::Closed;
123                    None
124                } else {
125                    Some(self.active_connections)
126                }
127            }
128        }
129    }
130
131    pub fn set_connection_time(&mut self, dur: Duration) {
132        self.connection_time.observe(dur.as_nanos() as f64);
133    }
134
135    pub fn peak_ewma_connection(&mut self) -> f64 {
136        self.connection_time.get(self.active_connections)
137    }
138
139    pub fn try_connect(&mut self) -> Result<mio::net::TcpStream, BackendError> {
140        if self.status != BackendStatus::Normal {
141            return Err(BackendError::Status(self.status.to_owned()));
142        }
143
144        match mio::net::TcpStream::connect(self.address) {
145            Ok(tcp_stream) => {
146                //self.retry_policy.succeed();
147                self.inc_connections();
148                Ok(tcp_stream)
149            }
150            Err(io_error) => {
151                self.retry_policy.fail();
152                self.failures += 1;
153                // TODO: handle EINPROGRESS. It is difficult. It is discussed here:
154                // https://docs.rs/mio/latest/mio/net/struct.TcpStream.html#method.connect
155                // with an example code here:
156                // https://github.com/Thomasdezeeuw/heph/blob/0c4f1ab3eaf08bea1d65776528bfd6114c9f8374/src/net/tcp/stream.rs#L560-L622
157                Err(BackendError::MioConnection(io_error))
158            }
159        }
160    }
161}
162
// when a backend has been removed from configuration and the last connection to
// it has stopped, it will be dropped, so we can notify that the backend server
// can be safely stopped
impl std::ops::Drop for Backend {
    /// Emits a `RemovedBackendHasNoConnections` event so the operator
    /// knows this backend server can be shut down safely.
    fn drop(&mut self) {
        server::push_event(Event {
            kind: EventKind::RemovedBackendHasNoConnections as i32,
            backend_id: Some(self.backend_id.clone()),
            address: Some(self.address.into()),
            cluster_id: None,
        });
    }
}
176
/// All backends known to the proxy, grouped by cluster id.
#[derive(Debug)]
pub struct BackendMap {
    pub backends: HashMap<ClusterId, BackendList>,
    // maximum tolerated failures; currently fixed at construction
    pub max_failures: usize,
    // tracks whether the last backend lookup succeeded, so the
    // "no available backends" event is only emitted once per outage
    pub available: bool,
}
183
184impl Default for BackendMap {
185    fn default() -> Self {
186        Self::new()
187    }
188}
189
190impl BackendMap {
191    pub fn new() -> BackendMap {
192        BackendMap {
193            backends: HashMap::new(),
194            max_failures: 3,
195            available: true,
196        }
197    }
198
199    pub fn import_configuration_state(
200        &mut self,
201        backends: &HashMap<ClusterId, Vec<sozu_command::response::Backend>>,
202    ) {
203        self.backends
204            .extend(backends.iter().map(|(cluster_id, backend_vec)| {
205                (
206                    cluster_id.to_string(),
207                    BackendList::import_configuration_state(backend_vec),
208                )
209            }));
210    }
211
212    pub fn add_backend(&mut self, cluster_id: &str, backend: Backend) {
213        self.backends
214            .entry(cluster_id.to_string())
215            .or_default()
216            .add_backend(backend);
217    }
218
219    // TODO: return <Result, BackendError>, log the error downstream
220    pub fn remove_backend(&mut self, cluster_id: &str, backend_address: &SocketAddr) {
221        if let Some(backends) = self.backends.get_mut(cluster_id) {
222            backends.remove_backend(backend_address);
223        } else {
224            error!(
225                "Backend was already removed: cluster id {}, address {:?}",
226                cluster_id, backend_address
227            );
228        }
229    }
230
231    // TODO: return <Result, BackendError>, log the error downstream
232    pub fn close_backend_connection(&mut self, cluster_id: &str, addr: &SocketAddr) {
233        if let Some(cluster_backends) = self.backends.get_mut(cluster_id) {
234            if let Some(ref mut backend) = cluster_backends.find_backend(addr) {
235                backend.borrow_mut().dec_connections();
236            }
237        }
238    }
239
240    pub fn has_backend(&self, cluster_id: &str, backend: &Backend) -> bool {
241        self.backends
242            .get(cluster_id)
243            .map(|backends| backends.has_backend(&backend.address))
244            .unwrap_or(false)
245    }
246
247    pub fn backend_from_cluster_id(
248        &mut self,
249        cluster_id: &str,
250    ) -> Result<(Rc<RefCell<Backend>>, TcpStream), BackendError> {
251        let cluster_backends = self
252            .backends
253            .get_mut(cluster_id)
254            .ok_or(BackendError::NoBackendForCluster(cluster_id.to_owned()))?;
255
256        if cluster_backends.backends.is_empty() {
257            self.available = false;
258            return Err(BackendError::NoBackendForCluster(cluster_id.to_owned()));
259        }
260
261        let next_backend = match cluster_backends.next_available_backend() {
262            Some(nb) => nb,
263            None => {
264                if self.available {
265                    self.available = false;
266
267                    push_event(Event {
268                        kind: EventKind::NoAvailableBackends as i32,
269                        cluster_id: Some(cluster_id.to_owned()),
270                        backend_id: None,
271                        address: None,
272                    });
273                }
274                return Err(BackendError::NoBackendForCluster(cluster_id.to_owned()));
275            }
276        };
277
278        let mut borrowed_backend = next_backend.borrow_mut();
279
280        debug!(
281            "Connecting {} -> {:?}",
282            cluster_id,
283            (
284                borrowed_backend.address,
285                borrowed_backend.active_connections,
286                borrowed_backend.failures
287            )
288        );
289
290        let tcp_stream = borrowed_backend.try_connect().map_err(|backend_error| {
291            BackendError::ConnectionFailures {
292                cluster_id: cluster_id.to_owned(),
293                backend_address: borrowed_backend.address,
294                failures: borrowed_backend.failures,
295                error: backend_error.to_string(),
296            }
297        })?;
298        self.available = true;
299
300        Ok((next_backend.clone(), tcp_stream))
301    }
302
303    pub fn backend_from_sticky_session(
304        &mut self,
305        cluster_id: &str,
306        sticky_session: &str,
307    ) -> Result<(Rc<RefCell<Backend>>, TcpStream), BackendError> {
308        let sticky_conn = self
309            .backends
310            .get_mut(cluster_id)
311            .and_then(|cluster_backends| cluster_backends.find_sticky(sticky_session))
312            .map(|backend| {
313                let mut borrowed = backend.borrow_mut();
314                let conn = borrowed.try_connect();
315
316                conn.map(|tcp_stream| (backend.clone(), tcp_stream))
317                    .map_err(|e| {
318                        error!(
319                            "could not connect {} to {:?} using session {} ({} failures)",
320                            cluster_id, borrowed.address, sticky_session, borrowed.failures
321                        );
322                        e
323                    })
324            });
325
326        match sticky_conn {
327            Some(backend_and_stream) => backend_and_stream,
328            None => {
329                debug!(
330                    "Couldn't find a backend corresponding to sticky_session {} for cluster {}",
331                    sticky_session, cluster_id
332                );
333                self.backend_from_cluster_id(cluster_id)
334            }
335        }
336    }
337
338    pub fn set_load_balancing_policy_for_cluster(
339        &mut self,
340        cluster_id: &str,
341        lb_algo: LoadBalancingAlgorithms,
342        metric: Option<LoadMetric>,
343    ) {
344        // The cluster can be created before the backends were registered because of the async config messages.
345        // So when we set the load balancing policy, we have to create the backend list if if it doesn't exist yet.
346        let cluster_backends = self.get_or_create_backend_list_for_cluster(cluster_id);
347        cluster_backends.set_load_balancing_policy(lb_algo, metric);
348    }
349
350    pub fn get_or_create_backend_list_for_cluster(&mut self, cluster_id: &str) -> &mut BackendList {
351        self.backends.entry(cluster_id.to_string()).or_default()
352    }
353}
354
/// The backends of a single cluster, plus its load balancing algorithm.
#[derive(Debug)]
pub struct BackendList {
    // shared ownership: sessions hold clones of these Rc handles
    pub backends: Vec<Rc<RefCell<Backend>>>,
    // incremented on each insertion; NOTE(review): counts additions but is
    // not assigned to backends here — confirm intended use elsewhere
    pub next_id: u32,
    pub load_balancing: Box<dyn LoadBalancingAlgorithm>,
}
361
362impl Default for BackendList {
363    fn default() -> Self {
364        Self::new()
365    }
366}
367
368impl BackendList {
369    pub fn new() -> BackendList {
370        BackendList {
371            backends: Vec::new(),
372            next_id: 0,
373            load_balancing: Box::new(Random),
374        }
375    }
376
377    pub fn import_configuration_state(
378        backend_vec: &[sozu_command_lib::response::Backend],
379    ) -> BackendList {
380        let mut list = BackendList::new();
381        for backend in backend_vec {
382            let backend = Backend::new(
383                &backend.backend_id,
384                backend.address,
385                backend.sticky_id.clone(),
386                backend.load_balancing_parameters,
387                backend.backup,
388            );
389            list.add_backend(backend);
390        }
391
392        list
393    }
394
395    pub fn add_backend(&mut self, backend: Backend) {
396        match self.backends.iter_mut().find(|b| {
397            b.borrow().address == backend.address && b.borrow().backend_id == backend.backend_id
398        }) {
399            None => {
400                let backend = Rc::new(RefCell::new(backend));
401                self.backends.push(backend);
402                self.next_id += 1;
403            }
404            // the backend already exists, update the configuration while
405            // keeping connection retry state
406            Some(old_backend) => {
407                let mut b = old_backend.borrow_mut();
408                b.sticky_id.clone_from(&backend.sticky_id);
409                b.load_balancing_parameters
410                    .clone_from(&backend.load_balancing_parameters);
411                b.backup = backend.backup;
412            }
413        }
414    }
415
416    pub fn remove_backend(&mut self, backend_address: &SocketAddr) {
417        self.backends
418            .retain(|backend| &backend.borrow().address != backend_address);
419    }
420
421    pub fn has_backend(&self, backend_address: &SocketAddr) -> bool {
422        self.backends
423            .iter()
424            .any(|backend| backend.borrow().address == *backend_address)
425    }
426
427    pub fn find_backend(
428        &mut self,
429        backend_address: &SocketAddr,
430    ) -> Option<&mut Rc<RefCell<Backend>>> {
431        self.backends
432            .iter_mut()
433            .find(|backend| backend.borrow().address == *backend_address)
434    }
435
436    pub fn find_sticky(&mut self, sticky_session: &str) -> Option<&mut Rc<RefCell<Backend>>> {
437        self.backends
438            .iter_mut()
439            .find(|b| b.borrow().sticky_id.as_deref() == Some(sticky_session))
440            .and_then(|b| if b.borrow().can_open() { Some(b) } else { None })
441    }
442
443    pub fn available_backends(&mut self, backup: bool) -> Vec<Rc<RefCell<Backend>>> {
444        self.backends
445            .iter()
446            .filter(|backend| {
447                let owned = backend.borrow();
448                owned.backup == backup && owned.can_open()
449            })
450            .map(Clone::clone)
451            .collect()
452    }
453
454    pub fn next_available_backend(&mut self) -> Option<Rc<RefCell<Backend>>> {
455        let mut backends = self.available_backends(false);
456
457        if backends.is_empty() {
458            backends = self.available_backends(true);
459        }
460
461        if backends.is_empty() {
462            return None;
463        }
464
465        self.load_balancing.next_available_backend(&mut backends)
466    }
467
468    pub fn set_load_balancing_policy(
469        &mut self,
470        load_balancing_policy: LoadBalancingAlgorithms,
471        metric: Option<LoadMetric>,
472    ) {
473        match load_balancing_policy {
474            LoadBalancingAlgorithms::RoundRobin => {
475                self.load_balancing = Box::new(RoundRobin::new())
476            }
477            LoadBalancingAlgorithms::Random => self.load_balancing = Box::new(Random {}),
478            LoadBalancingAlgorithms::LeastLoaded => {
479                self.load_balancing = Box::new(LeastLoaded {
480                    metric: metric.unwrap_or(LoadMetric::Connections),
481                })
482            }
483            LoadBalancingAlgorithms::PowerOfTwo => {
484                self.load_balancing = Box::new(PowerOfTwo {
485                    metric: metric.unwrap_or(LoadMetric::Connections),
486                })
487            }
488        }
489    }
490}
491
#[cfg(test)]
mod backends_test {

    use super::*;
    use std::{net::TcpListener, sync::mpsc::*, thread};

    /// Spawns a thread that accepts TCP connections on `addr` until a
    /// message arrives on `stopper`.
    ///
    /// Fix: `listener.incoming()` is an infinite blocking iterator, so the
    /// previous `run` flag could never terminate the loop — the outer
    /// `while run` was dead code. We now `break` when the stop signal is
    /// observed (checked after each accepted connection).
    fn run_mock_tcp_server(addr: &str, stopper: Receiver<()>) {
        let listener = TcpListener::bind(addr).unwrap();

        thread::spawn(move || {
            for _stream in listener.incoming() {
                // accept connections until the test signals us to stop
                if stopper.try_recv().is_ok() {
                    break;
                }
            }
        });
    }

    #[test]
    fn it_should_retrieve_a_backend_from_cluster_id_when_backends_have_been_recorded() {
        let mut backend_map = BackendMap::new();
        let cluster_id = "mycluster";

        let backend_addr = "127.0.0.1:1236";
        let (sender, receiver) = channel();
        run_mock_tcp_server(backend_addr, receiver);

        backend_map.add_backend(
            cluster_id,
            Backend::new(
                &format!("{cluster_id}-1"),
                backend_addr.parse().unwrap(),
                None,
                None,
                None,
            ),
        );

        assert!(backend_map.backend_from_cluster_id(cluster_id).is_ok());
        sender.send(()).unwrap();
    }

    #[test]
    fn it_should_not_retrieve_a_backend_from_cluster_id_when_backend_has_not_been_recorded() {
        let mut backend_map = BackendMap::new();
        let cluster_not_recorded = "not";
        backend_map.add_backend(
            "foo",
            Backend::new("foo-1", "127.0.0.1:9001".parse().unwrap(), None, None, None),
        );

        assert!(backend_map
            .backend_from_cluster_id(cluster_not_recorded)
            .is_err());
    }

    #[test]
    fn it_should_not_retrieve_a_backend_from_cluster_id_when_backend_list_is_empty() {
        let mut backend_map = BackendMap::new();

        assert!(backend_map.backend_from_cluster_id("dumb").is_err());
    }

    #[test]
    fn it_should_retrieve_a_backend_from_sticky_session_when_the_backend_has_been_recorded() {
        let mut backend_map = BackendMap::new();
        let cluster_id = "mycluster";
        let sticky_session = "server-2";

        let backend_addr = "127.0.0.1:3456";
        let (sender, receiver) = channel();
        run_mock_tcp_server(backend_addr, receiver);

        backend_map.add_backend(
            cluster_id,
            Backend::new(
                &format!("{cluster_id}-1"),
                "127.0.0.1:9001".parse().unwrap(),
                Some("server-1".to_string()),
                None,
                None,
            ),
        );
        backend_map.add_backend(
            cluster_id,
            Backend::new(
                &format!("{cluster_id}-2"),
                "127.0.0.1:9000".parse().unwrap(),
                Some("server-2".to_string()),
                None,
                None,
            ),
        );
        // sticky backend
        backend_map.add_backend(
            cluster_id,
            Backend::new(
                &format!("{cluster_id}-3"),
                backend_addr.parse().unwrap(),
                Some("server-3".to_string()),
                None,
                None,
            ),
        );

        assert!(backend_map
            .backend_from_sticky_session(cluster_id, sticky_session)
            .is_ok());
        sender.send(()).unwrap();
    }

    #[test]
    fn it_should_not_retrieve_a_backend_from_sticky_session_when_the_backend_has_not_been_recorded()
    {
        let mut backend_map = BackendMap::new();
        let cluster_id = "mycluster";
        let sticky_session = "test";

        assert!(backend_map
            .backend_from_sticky_session(cluster_id, sticky_session)
            .is_err());
    }

    #[test]
    fn it_should_not_retrieve_a_backend_from_sticky_session_when_the_backend_list_is_empty() {
        let mut backend_map = BackendMap::new();
        let mycluster_not_recorded = "mycluster";
        let sticky_session = "test";

        assert!(backend_map
            .backend_from_sticky_session(mycluster_not_recorded, sticky_session)
            .is_err());
    }

    #[test]
    fn it_should_add_a_backend_when_he_doesnt_already_exist() {
        let backend_id = "myback";
        let mut backends_list = BackendList::new();
        backends_list.add_backend(Backend::new(
            backend_id,
            "127.0.0.1:80".parse().unwrap(),
            None,
            None,
            None,
        ));

        assert_eq!(1, backends_list.backends.len());
    }

    #[test]
    fn it_should_not_add_a_backend_when_he_already_exist() {
        let backend_id = "myback";
        let mut backends_list = BackendList::new();
        backends_list.add_backend(Backend::new(
            backend_id,
            "127.0.0.1:80".parse().unwrap(),
            None,
            None,
            None,
        ));

        //same backend id
        backends_list.add_backend(Backend::new(
            backend_id,
            "127.0.0.1:80".parse().unwrap(),
            None,
            None,
            None,
        ));

        assert_eq!(1, backends_list.backends.len());
    }
}