//! sozu_lib/backends.rs — backend pool management for sozu_lib

1use std::{cell::RefCell, collections::HashMap, net::SocketAddr, rc::Rc, time::Duration};
2
3use mio::net::TcpStream;
4use sozu_command::{
5    proto::command::{Event, EventKind, LoadBalancingAlgorithms, LoadBalancingParams, LoadMetric},
6    state::ClusterId,
7};
8
9use crate::{
10    PeakEWMA,
11    load_balancing::{LeastLoaded, LoadBalancingAlgorithm, PowerOfTwo, Random, RoundRobin},
12    retry::{self, RetryPolicy},
13    server::{self, push_event},
14};
15
/// Errors that can occur when selecting or connecting to a backend server.
#[derive(thiserror::Error, Debug)]
pub enum BackendError {
    /// The cluster has no registered (or no currently usable) backend.
    #[error("No backend found for cluster {0}")]
    NoBackendForCluster(String),
    /// The non-blocking TCP connect through MIO failed immediately.
    #[error("Failed to connect to socket with MIO: {0}")]
    MioConnection(std::io::Error),
    /// The backend is `Closing` or `Closed`, so no new connection may be opened.
    #[error("This backend is not in a normal status: status={0:?}")]
    Status(BackendStatus),
    /// A connection attempt failed; carries context for logging upstream.
    #[error("could not connect {cluster_id} to {backend_address:?} ({failures} failures): {error}")]
    ConnectionFailures {
        cluster_id: String,
        backend_address: SocketAddr,
        failures: usize,
        error: String,
    },
}
32
/// Lifecycle state of a backend within the pool.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum BackendStatus {
    /// Accepting new connections.
    Normal,
    /// Draining: no new connections; transitions to `Closed` when the
    /// active connection count reaches zero (see `Backend::dec_connections`).
    Closing,
    /// Fully drained; no active connections remain.
    Closed,
}
39
/// A single upstream server instance, with its connection accounting,
/// retry state and load-balancing metadata.
#[derive(Debug, PartialEq, Clone)]
pub struct Backend {
    /// Sticky-session identifier used to pin clients to this backend.
    pub sticky_id: Option<String>,
    pub backend_id: String,
    pub address: SocketAddr,
    pub status: BackendStatus,
    /// Tracks connect failures and decides when a retry is allowed.
    pub retry_policy: retry::RetryPolicyWrapper,
    /// Number of currently open connections to this backend.
    pub active_connections: usize,
    // NOTE(review): not read or written in this file — presumably maintained
    // by callers; verify before relying on it.
    pub active_requests: usize,
    /// Total number of failed connection attempts.
    pub failures: usize,
    pub load_balancing_parameters: Option<LoadBalancingParams>,
    /// Backup backends are only tried when no primary backend is available.
    pub backup: bool,
    /// Peak-EWMA estimate of connection time, used by load-balancing metrics.
    pub connection_time: PeakEWMA,
}
54
55impl Backend {
56    pub fn new(
57        backend_id: &str,
58        address: SocketAddr,
59        sticky_id: Option<String>,
60        load_balancing_parameters: Option<LoadBalancingParams>,
61        backup: Option<bool>,
62    ) -> Backend {
63        let desired_policy = retry::ExponentialBackoffPolicy::new(6);
64        Backend {
65            sticky_id,
66            backend_id: backend_id.to_string(),
67            address,
68            status: BackendStatus::Normal,
69            retry_policy: desired_policy.into(),
70            active_connections: 0,
71            active_requests: 0,
72            failures: 0,
73            load_balancing_parameters,
74            backup: backup.unwrap_or(false),
75            connection_time: PeakEWMA::new(),
76        }
77    }
78
79    pub fn set_closing(&mut self) {
80        self.status = BackendStatus::Closing;
81    }
82
83    pub fn retry_policy(&mut self) -> &mut retry::RetryPolicyWrapper {
84        &mut self.retry_policy
85    }
86
87    pub fn can_open(&self) -> bool {
88        if let Some(action) = self.retry_policy.can_try() {
89            self.status == BackendStatus::Normal && action == retry::RetryAction::OKAY
90        } else {
91            false
92        }
93    }
94
95    pub fn inc_connections(&mut self) -> Option<usize> {
96        if self.status == BackendStatus::Normal {
97            self.active_connections += 1;
98            Some(self.active_connections)
99        } else {
100            None
101        }
102    }
103
104    /// TODO: normalize with saturating_sub()
105    pub fn dec_connections(&mut self) -> Option<usize> {
106        match self.status {
107            BackendStatus::Normal => {
108                if self.active_connections > 0 {
109                    self.active_connections -= 1;
110                }
111                Some(self.active_connections)
112            }
113            BackendStatus::Closed => None,
114            BackendStatus::Closing => {
115                if self.active_connections > 0 {
116                    self.active_connections -= 1;
117                }
118                if self.active_connections == 0 {
119                    self.status = BackendStatus::Closed;
120                    None
121                } else {
122                    Some(self.active_connections)
123                }
124            }
125        }
126    }
127
128    pub fn set_connection_time(&mut self, dur: Duration) {
129        self.connection_time.observe(dur.as_nanos() as f64);
130    }
131
132    pub fn peak_ewma_connection(&mut self) -> f64 {
133        self.connection_time.get(self.active_connections)
134    }
135
136    pub fn try_connect(&mut self) -> Result<mio::net::TcpStream, BackendError> {
137        if self.status != BackendStatus::Normal {
138            return Err(BackendError::Status(self.status.to_owned()));
139        }
140
141        match mio::net::TcpStream::connect(self.address) {
142            Ok(tcp_stream) => {
143                //self.retry_policy.succeed();
144                self.inc_connections();
145                Ok(tcp_stream)
146            }
147            Err(io_error) => {
148                self.retry_policy.fail();
149                self.failures += 1;
150                // TODO: handle EINPROGRESS. It is difficult. It is discussed here:
151                // https://docs.rs/mio/latest/mio/net/struct.TcpStream.html#method.connect
152                // with an example code here:
153                // https://github.com/Thomasdezeeuw/heph/blob/0c4f1ab3eaf08bea1d65776528bfd6114c9f8374/src/net/tcp/stream.rs#L560-L622
154                Err(BackendError::MioConnection(io_error))
155            }
156        }
157    }
158}
159
160// when a backend has been removed from configuration and the last connection to
161// it has stopped, it will be dropped, so we can notify that the backend server
162// can be safely stopped
impl std::ops::Drop for Backend {
    /// Emits a `RemovedBackendHasNoConnections` event so the operator knows
    /// this backend server can be safely stopped.
    fn drop(&mut self) {
        server::push_event(Event {
            kind: EventKind::RemovedBackendHasNoConnections as i32,
            backend_id: Some(self.backend_id.clone()),
            address: Some(self.address.into()),
            cluster_id: None,
        });
    }
}
173
/// All backend lists, indexed by cluster id, with pool-wide availability state.
#[derive(Debug)]
pub struct BackendMap {
    pub backends: HashMap<ClusterId, BackendList>,
    // NOTE(review): not read in this file — presumably consulted by callers;
    // verify before changing the default of 3.
    pub max_failures: usize,
    /// Set to `false` when a cluster runs out of usable backends (used to
    /// emit the `NoAvailableBackends` event only once), back to `true`
    /// after a successful connection.
    pub available: bool,
}
180
181impl Default for BackendMap {
182    fn default() -> Self {
183        Self::new()
184    }
185}
186
187impl BackendMap {
188    pub fn new() -> BackendMap {
189        BackendMap {
190            backends: HashMap::new(),
191            max_failures: 3,
192            available: true,
193        }
194    }
195
196    pub fn import_configuration_state(
197        &mut self,
198        backends: &HashMap<ClusterId, Vec<sozu_command::response::Backend>>,
199    ) {
200        self.backends
201            .extend(backends.iter().map(|(cluster_id, backend_vec)| {
202                (
203                    cluster_id.to_string(),
204                    BackendList::import_configuration_state(backend_vec),
205                )
206            }));
207    }
208
209    pub fn add_backend(&mut self, cluster_id: &str, backend: Backend) {
210        self.backends
211            .entry(cluster_id.to_string())
212            .or_default()
213            .add_backend(backend);
214    }
215
216    // TODO: return <Result, BackendError>, log the error downstream
217    pub fn remove_backend(&mut self, cluster_id: &str, backend_address: &SocketAddr) {
218        if let Some(backends) = self.backends.get_mut(cluster_id) {
219            backends.remove_backend(backend_address);
220        } else {
221            error!(
222                "Backend was already removed: cluster id {}, address {:?}",
223                cluster_id, backend_address
224            );
225        }
226    }
227
228    // TODO: return <Result, BackendError>, log the error downstream
229    pub fn close_backend_connection(&mut self, cluster_id: &str, addr: &SocketAddr) {
230        if let Some(cluster_backends) = self.backends.get_mut(cluster_id) {
231            if let Some(ref mut backend) = cluster_backends.find_backend(addr) {
232                backend.borrow_mut().dec_connections();
233            }
234        }
235    }
236
237    pub fn has_backend(&self, cluster_id: &str, backend: &Backend) -> bool {
238        self.backends
239            .get(cluster_id)
240            .map(|backends| backends.has_backend(&backend.address))
241            .unwrap_or(false)
242    }
243
244    pub fn backend_from_cluster_id(
245        &mut self,
246        cluster_id: &str,
247    ) -> Result<(Rc<RefCell<Backend>>, TcpStream), BackendError> {
248        let cluster_backends = self
249            .backends
250            .get_mut(cluster_id)
251            .ok_or(BackendError::NoBackendForCluster(cluster_id.to_owned()))?;
252
253        if cluster_backends.backends.is_empty() {
254            self.available = false;
255            return Err(BackendError::NoBackendForCluster(cluster_id.to_owned()));
256        }
257
258        let next_backend = match cluster_backends.next_available_backend() {
259            Some(nb) => nb,
260            None => {
261                if self.available {
262                    self.available = false;
263
264                    push_event(Event {
265                        kind: EventKind::NoAvailableBackends as i32,
266                        cluster_id: Some(cluster_id.to_owned()),
267                        backend_id: None,
268                        address: None,
269                    });
270                }
271                return Err(BackendError::NoBackendForCluster(cluster_id.to_owned()));
272            }
273        };
274
275        let mut borrowed_backend = next_backend.borrow_mut();
276
277        debug!(
278            "Connecting {} -> {:?}",
279            cluster_id,
280            (
281                borrowed_backend.address,
282                borrowed_backend.active_connections,
283                borrowed_backend.failures
284            )
285        );
286
287        let tcp_stream = borrowed_backend.try_connect().map_err(|backend_error| {
288            BackendError::ConnectionFailures {
289                cluster_id: cluster_id.to_owned(),
290                backend_address: borrowed_backend.address,
291                failures: borrowed_backend.failures,
292                error: backend_error.to_string(),
293            }
294        })?;
295        self.available = true;
296
297        Ok((next_backend.clone(), tcp_stream))
298    }
299
300    pub fn backend_from_sticky_session(
301        &mut self,
302        cluster_id: &str,
303        sticky_session: &str,
304    ) -> Result<(Rc<RefCell<Backend>>, TcpStream), BackendError> {
305        let sticky_conn = self
306            .backends
307            .get_mut(cluster_id)
308            .and_then(|cluster_backends| cluster_backends.find_sticky(sticky_session))
309            .map(|backend| {
310                let mut borrowed = backend.borrow_mut();
311                let conn = borrowed.try_connect();
312
313                conn.map(|tcp_stream| (backend.clone(), tcp_stream))
314                    .inspect_err(|_| {
315                        error!(
316                            "could not connect {} to {:?} using session {} ({} failures)",
317                            cluster_id, borrowed.address, sticky_session, borrowed.failures
318                        )
319                    })
320            });
321
322        match sticky_conn {
323            Some(backend_and_stream) => backend_and_stream,
324            None => {
325                debug!(
326                    "Couldn't find a backend corresponding to sticky_session {} for cluster {}",
327                    sticky_session, cluster_id
328                );
329                self.backend_from_cluster_id(cluster_id)
330            }
331        }
332    }
333
334    pub fn set_load_balancing_policy_for_cluster(
335        &mut self,
336        cluster_id: &str,
337        lb_algo: LoadBalancingAlgorithms,
338        metric: Option<LoadMetric>,
339    ) {
340        // The cluster can be created before the backends were registered because of the async config messages.
341        // So when we set the load balancing policy, we have to create the backend list if if it doesn't exist yet.
342        let cluster_backends = self.get_or_create_backend_list_for_cluster(cluster_id);
343        cluster_backends.set_load_balancing_policy(lb_algo, metric);
344    }
345
346    pub fn get_or_create_backend_list_for_cluster(&mut self, cluster_id: &str) -> &mut BackendList {
347        self.backends.entry(cluster_id.to_string()).or_default()
348    }
349}
350
/// The backends of one cluster, plus the policy used to pick among them.
#[derive(Debug)]
pub struct BackendList {
    /// Shared, mutable backends (shared with active sessions via `Rc`).
    pub backends: Vec<Rc<RefCell<Backend>>>,
    /// Incremented for each newly inserted backend.
    // NOTE(review): never read in this file — confirm whether callers use it.
    pub next_id: u32,
    /// Load-balancing strategy; `Random` by default.
    pub load_balancing: Box<dyn LoadBalancingAlgorithm>,
}
357
358impl Default for BackendList {
359    fn default() -> Self {
360        Self::new()
361    }
362}
363
364impl BackendList {
365    pub fn new() -> BackendList {
366        BackendList {
367            backends: Vec::new(),
368            next_id: 0,
369            load_balancing: Box::new(Random),
370        }
371    }
372
373    pub fn import_configuration_state(
374        backend_vec: &[sozu_command_lib::response::Backend],
375    ) -> BackendList {
376        let mut list = BackendList::new();
377        for backend in backend_vec {
378            let backend = Backend::new(
379                &backend.backend_id,
380                backend.address,
381                backend.sticky_id.clone(),
382                backend.load_balancing_parameters,
383                backend.backup,
384            );
385            list.add_backend(backend);
386        }
387
388        list
389    }
390
391    pub fn add_backend(&mut self, backend: Backend) {
392        match self.backends.iter_mut().find(|b| {
393            b.borrow().address == backend.address && b.borrow().backend_id == backend.backend_id
394        }) {
395            None => {
396                let backend = Rc::new(RefCell::new(backend));
397                self.backends.push(backend);
398                self.next_id += 1;
399            }
400            // the backend already exists, update the configuration while
401            // keeping connection retry state
402            Some(old_backend) => {
403                let mut b = old_backend.borrow_mut();
404                b.sticky_id.clone_from(&backend.sticky_id);
405                b.load_balancing_parameters
406                    .clone_from(&backend.load_balancing_parameters);
407                b.backup = backend.backup;
408            }
409        }
410    }
411
412    pub fn remove_backend(&mut self, backend_address: &SocketAddr) {
413        self.backends
414            .retain(|backend| &backend.borrow().address != backend_address);
415    }
416
417    pub fn has_backend(&self, backend_address: &SocketAddr) -> bool {
418        self.backends
419            .iter()
420            .any(|backend| backend.borrow().address == *backend_address)
421    }
422
423    pub fn find_backend(
424        &mut self,
425        backend_address: &SocketAddr,
426    ) -> Option<&mut Rc<RefCell<Backend>>> {
427        self.backends
428            .iter_mut()
429            .find(|backend| backend.borrow().address == *backend_address)
430    }
431
432    pub fn find_sticky(&mut self, sticky_session: &str) -> Option<&mut Rc<RefCell<Backend>>> {
433        self.backends
434            .iter_mut()
435            .find(|b| b.borrow().sticky_id.as_deref() == Some(sticky_session))
436            .and_then(|b| if b.borrow().can_open() { Some(b) } else { None })
437    }
438
439    pub fn available_backends(&mut self, backup: bool) -> Vec<Rc<RefCell<Backend>>> {
440        self.backends
441            .iter()
442            .filter(|backend| {
443                let owned = backend.borrow();
444                owned.backup == backup && owned.can_open()
445            })
446            .map(Clone::clone)
447            .collect()
448    }
449
450    pub fn next_available_backend(&mut self) -> Option<Rc<RefCell<Backend>>> {
451        let mut backends = self.available_backends(false);
452
453        if backends.is_empty() {
454            backends = self.available_backends(true);
455        }
456
457        if backends.is_empty() {
458            return None;
459        }
460
461        self.load_balancing.next_available_backend(&mut backends)
462    }
463
464    pub fn set_load_balancing_policy(
465        &mut self,
466        load_balancing_policy: LoadBalancingAlgorithms,
467        metric: Option<LoadMetric>,
468    ) {
469        match load_balancing_policy {
470            LoadBalancingAlgorithms::RoundRobin => {
471                self.load_balancing = Box::new(RoundRobin::new())
472            }
473            LoadBalancingAlgorithms::Random => self.load_balancing = Box::new(Random {}),
474            LoadBalancingAlgorithms::LeastLoaded => {
475                self.load_balancing = Box::new(LeastLoaded {
476                    metric: metric.unwrap_or(LoadMetric::Connections),
477                })
478            }
479            LoadBalancingAlgorithms::PowerOfTwo => {
480                self.load_balancing = Box::new(PowerOfTwo {
481                    metric: metric.unwrap_or(LoadMetric::Connections),
482                })
483            }
484        }
485    }
486}
487
#[cfg(test)]
mod backends_test {

    use std::{net::TcpListener, sync::mpsc::*, thread};

    use super::*;

    // Spawns a thread that accepts TCP connections on `addr` until a message
    // arrives on `stopper`.
    // NOTE(review): `listener.incoming()` never yields `None`, so the stop
    // signal is only checked after each accepted connection and the thread can
    // outlive the test; ports are also hard-coded per test, which may collide
    // in a shared environment — confirm this is acceptable here.
    fn run_mock_tcp_server(addr: &str, stopper: Receiver<()>) {
        let mut run = true;
        let listener = TcpListener::bind(addr).unwrap();

        thread::spawn(move || {
            while run {
                for _stream in listener.incoming() {
                    // accept connections
                    if let Ok(()) = stopper.try_recv() {
                        run = false;
                    }
                }
            }
        });
    }

    // A recorded backend that accepts connections should be returned.
    #[test]
    fn it_should_retrieve_a_backend_from_cluster_id_when_backends_have_been_recorded() {
        let mut backend_map = BackendMap::new();
        let cluster_id = "mycluster";

        let backend_addr = "127.0.0.1:1236";
        let (sender, receiver) = channel();
        run_mock_tcp_server(backend_addr, receiver);

        backend_map.add_backend(
            cluster_id,
            Backend::new(
                &format!("{cluster_id}-1"),
                backend_addr.parse().unwrap(),
                None,
                None,
                None,
            ),
        );

        assert!(backend_map.backend_from_cluster_id(cluster_id).is_ok());
        sender.send(()).unwrap();
    }

    // Looking up a cluster id that was never registered must fail.
    #[test]
    fn it_should_not_retrieve_a_backend_from_cluster_id_when_backend_has_not_been_recorded() {
        let mut backend_map = BackendMap::new();
        let cluster_not_recorded = "not";
        backend_map.add_backend(
            "foo",
            Backend::new("foo-1", "127.0.0.1:9001".parse().unwrap(), None, None, None),
        );

        assert!(
            backend_map
                .backend_from_cluster_id(cluster_not_recorded)
                .is_err()
        );
    }

    // An entirely empty map must fail the lookup as well.
    #[test]
    fn it_should_not_retrieve_a_backend_from_cluster_id_when_backend_list_is_empty() {
        let mut backend_map = BackendMap::new();

        assert!(backend_map.backend_from_cluster_id("dumb").is_err());
    }

    // The sticky lookup should pick the backend whose sticky_id matches the
    // session ("server-2" here); only the mock-served backend accepts
    // connections, so the call may also fall back to load balancing.
    #[test]
    fn it_should_retrieve_a_backend_from_sticky_session_when_the_backend_has_been_recorded() {
        let mut backend_map = BackendMap::new();
        let cluster_id = "mycluster";
        let sticky_session = "server-2";

        let backend_addr = "127.0.0.1:3456";
        let (sender, receiver) = channel();
        run_mock_tcp_server(backend_addr, receiver);

        backend_map.add_backend(
            cluster_id,
            Backend::new(
                &format!("{cluster_id}-1"),
                "127.0.0.1:9001".parse().unwrap(),
                Some("server-1".to_string()),
                None,
                None,
            ),
        );
        backend_map.add_backend(
            cluster_id,
            Backend::new(
                &format!("{cluster_id}-2"),
                "127.0.0.1:9000".parse().unwrap(),
                Some("server-2".to_string()),
                None,
                None,
            ),
        );
        // sticky backend
        backend_map.add_backend(
            cluster_id,
            Backend::new(
                &format!("{cluster_id}-3"),
                backend_addr.parse().unwrap(),
                Some("server-3".to_string()),
                None,
                None,
            ),
        );

        assert!(
            backend_map
                .backend_from_sticky_session(cluster_id, sticky_session)
                .is_ok()
        );
        sender.send(()).unwrap();
    }

    // Sticky lookup on an unknown cluster must fail (fallback also fails).
    #[test]
    fn it_should_not_retrieve_a_backend_from_sticky_session_when_the_backend_has_not_been_recorded()
    {
        let mut backend_map = BackendMap::new();
        let cluster_id = "mycluster";
        let sticky_session = "test";

        assert!(
            backend_map
                .backend_from_sticky_session(cluster_id, sticky_session)
                .is_err()
        );
    }

    // Same as above with an empty map.
    #[test]
    fn it_should_not_retrieve_a_backend_from_sticky_session_when_the_backend_list_is_empty() {
        let mut backend_map = BackendMap::new();
        let mycluster_not_recorded = "mycluster";
        let sticky_session = "test";

        assert!(
            backend_map
                .backend_from_sticky_session(mycluster_not_recorded, sticky_session)
                .is_err()
        );
    }

    // Inserting a backend with a fresh (id, address) pair grows the list.
    #[test]
    fn it_should_add_a_backend_when_he_doesnt_already_exist() {
        let backend_id = "myback";
        let mut backends_list = BackendList::new();
        backends_list.add_backend(Backend::new(
            backend_id,
            "127.0.0.1:80".parse().unwrap(),
            None,
            None,
            None,
        ));

        assert_eq!(1, backends_list.backends.len());
    }

    // Re-inserting the same (id, address) pair updates in place, not append.
    #[test]
    fn it_should_not_add_a_backend_when_he_already_exist() {
        let backend_id = "myback";
        let mut backends_list = BackendList::new();
        backends_list.add_backend(Backend::new(
            backend_id,
            "127.0.0.1:80".parse().unwrap(),
            None,
            None,
            None,
        ));

        //same backend id
        backends_list.add_backend(Backend::new(
            backend_id,
            "127.0.0.1:80".parse().unwrap(),
            None,
            None,
            None,
        ));

        assert_eq!(1, backends_list.backends.len());
    }
}