1use std::{cell::RefCell, collections::HashMap, net::SocketAddr, rc::Rc, time::Duration};
2
3use mio::net::TcpStream;
4
5use sozu_command::{
6 proto::command::{Event, EventKind, LoadBalancingAlgorithms, LoadBalancingParams, LoadMetric},
7 state::ClusterId,
8};
9
10use crate::{
11 load_balancing::{LeastLoaded, LoadBalancingAlgorithm, PowerOfTwo, Random, RoundRobin},
12 retry::{self, RetryPolicy},
13 server::{self, push_event},
14 PeakEWMA,
15};
16
17#[derive(thiserror::Error, Debug)]
18pub enum BackendError {
19 #[error("No backend found for cluster {0}")]
20 NoBackendForCluster(String),
21 #[error("Failed to connect to socket with MIO: {0}")]
22 MioConnection(std::io::Error),
23 #[error("This backend is not in a normal status: status={0:?}")]
24 Status(BackendStatus),
25 #[error(
26 "could not connect {cluster_id} to {backend_address:?} ({failures} failures): {error}"
27 )]
28 ConnectionFailures {
29 cluster_id: String,
30 backend_address: SocketAddr,
31 failures: usize,
32 error: String,
33 },
34}
35
/// Lifecycle state of a [`Backend`].
///
/// A backend is created `Normal`. `Backend::set_closing` moves it to
/// `Closing`, and `Backend::dec_connections` turns a `Closing` backend into
/// `Closed` once its last active connection is released.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum BackendStatus {
    /// Accepts new connections.
    Normal,
    /// Refuses new connections while existing ones drain.
    Closing,
    /// Drained; no longer tracks connections.
    Closed,
}
42
43#[derive(Debug, PartialEq, Clone)]
44pub struct Backend {
45 pub sticky_id: Option<String>,
46 pub backend_id: String,
47 pub address: SocketAddr,
48 pub status: BackendStatus,
49 pub retry_policy: retry::RetryPolicyWrapper,
50 pub active_connections: usize,
51 pub active_requests: usize,
52 pub failures: usize,
53 pub load_balancing_parameters: Option<LoadBalancingParams>,
54 pub backup: bool,
55 pub connection_time: PeakEWMA,
56}
57
58impl Backend {
59 pub fn new(
60 backend_id: &str,
61 address: SocketAddr,
62 sticky_id: Option<String>,
63 load_balancing_parameters: Option<LoadBalancingParams>,
64 backup: Option<bool>,
65 ) -> Backend {
66 let desired_policy = retry::ExponentialBackoffPolicy::new(6);
67 Backend {
68 sticky_id,
69 backend_id: backend_id.to_string(),
70 address,
71 status: BackendStatus::Normal,
72 retry_policy: desired_policy.into(),
73 active_connections: 0,
74 active_requests: 0,
75 failures: 0,
76 load_balancing_parameters,
77 backup: backup.unwrap_or(false),
78 connection_time: PeakEWMA::new(),
79 }
80 }
81
82 pub fn set_closing(&mut self) {
83 self.status = BackendStatus::Closing;
84 }
85
86 pub fn retry_policy(&mut self) -> &mut retry::RetryPolicyWrapper {
87 &mut self.retry_policy
88 }
89
90 pub fn can_open(&self) -> bool {
91 if let Some(action) = self.retry_policy.can_try() {
92 self.status == BackendStatus::Normal && action == retry::RetryAction::OKAY
93 } else {
94 false
95 }
96 }
97
98 pub fn inc_connections(&mut self) -> Option<usize> {
99 if self.status == BackendStatus::Normal {
100 self.active_connections += 1;
101 Some(self.active_connections)
102 } else {
103 None
104 }
105 }
106
107 pub fn dec_connections(&mut self) -> Option<usize> {
109 match self.status {
110 BackendStatus::Normal => {
111 if self.active_connections > 0 {
112 self.active_connections -= 1;
113 }
114 Some(self.active_connections)
115 }
116 BackendStatus::Closed => None,
117 BackendStatus::Closing => {
118 if self.active_connections > 0 {
119 self.active_connections -= 1;
120 }
121 if self.active_connections == 0 {
122 self.status = BackendStatus::Closed;
123 None
124 } else {
125 Some(self.active_connections)
126 }
127 }
128 }
129 }
130
131 pub fn set_connection_time(&mut self, dur: Duration) {
132 self.connection_time.observe(dur.as_nanos() as f64);
133 }
134
135 pub fn peak_ewma_connection(&mut self) -> f64 {
136 self.connection_time.get(self.active_connections)
137 }
138
139 pub fn try_connect(&mut self) -> Result<mio::net::TcpStream, BackendError> {
140 if self.status != BackendStatus::Normal {
141 return Err(BackendError::Status(self.status.to_owned()));
142 }
143
144 match mio::net::TcpStream::connect(self.address) {
145 Ok(tcp_stream) => {
146 self.inc_connections();
148 Ok(tcp_stream)
149 }
150 Err(io_error) => {
151 self.retry_policy.fail();
152 self.failures += 1;
153 Err(BackendError::MioConnection(io_error))
158 }
159 }
160 }
161}
162
163impl std::ops::Drop for Backend {
167 fn drop(&mut self) {
168 server::push_event(Event {
169 kind: EventKind::RemovedBackendHasNoConnections as i32,
170 backend_id: Some(self.backend_id.clone()),
171 address: Some(self.address.into()),
172 cluster_id: None,
173 });
174 }
175}
176
177#[derive(Debug)]
178pub struct BackendMap {
179 pub backends: HashMap<ClusterId, BackendList>,
180 pub max_failures: usize,
181 pub available: bool,
182}
183
184impl Default for BackendMap {
185 fn default() -> Self {
186 Self::new()
187 }
188}
189
190impl BackendMap {
191 pub fn new() -> BackendMap {
192 BackendMap {
193 backends: HashMap::new(),
194 max_failures: 3,
195 available: true,
196 }
197 }
198
199 pub fn import_configuration_state(
200 &mut self,
201 backends: &HashMap<ClusterId, Vec<sozu_command::response::Backend>>,
202 ) {
203 self.backends
204 .extend(backends.iter().map(|(cluster_id, backend_vec)| {
205 (
206 cluster_id.to_string(),
207 BackendList::import_configuration_state(backend_vec),
208 )
209 }));
210 }
211
212 pub fn add_backend(&mut self, cluster_id: &str, backend: Backend) {
213 self.backends
214 .entry(cluster_id.to_string())
215 .or_default()
216 .add_backend(backend);
217 }
218
219 pub fn remove_backend(&mut self, cluster_id: &str, backend_address: &SocketAddr) {
221 if let Some(backends) = self.backends.get_mut(cluster_id) {
222 backends.remove_backend(backend_address);
223 } else {
224 error!(
225 "Backend was already removed: cluster id {}, address {:?}",
226 cluster_id, backend_address
227 );
228 }
229 }
230
231 pub fn close_backend_connection(&mut self, cluster_id: &str, addr: &SocketAddr) {
233 if let Some(cluster_backends) = self.backends.get_mut(cluster_id) {
234 if let Some(ref mut backend) = cluster_backends.find_backend(addr) {
235 backend.borrow_mut().dec_connections();
236 }
237 }
238 }
239
240 pub fn has_backend(&self, cluster_id: &str, backend: &Backend) -> bool {
241 self.backends
242 .get(cluster_id)
243 .map(|backends| backends.has_backend(&backend.address))
244 .unwrap_or(false)
245 }
246
247 pub fn backend_from_cluster_id(
248 &mut self,
249 cluster_id: &str,
250 ) -> Result<(Rc<RefCell<Backend>>, TcpStream), BackendError> {
251 let cluster_backends = self
252 .backends
253 .get_mut(cluster_id)
254 .ok_or(BackendError::NoBackendForCluster(cluster_id.to_owned()))?;
255
256 if cluster_backends.backends.is_empty() {
257 self.available = false;
258 return Err(BackendError::NoBackendForCluster(cluster_id.to_owned()));
259 }
260
261 let next_backend = match cluster_backends.next_available_backend() {
262 Some(nb) => nb,
263 None => {
264 if self.available {
265 self.available = false;
266
267 push_event(Event {
268 kind: EventKind::NoAvailableBackends as i32,
269 cluster_id: Some(cluster_id.to_owned()),
270 backend_id: None,
271 address: None,
272 });
273 }
274 return Err(BackendError::NoBackendForCluster(cluster_id.to_owned()));
275 }
276 };
277
278 let mut borrowed_backend = next_backend.borrow_mut();
279
280 debug!(
281 "Connecting {} -> {:?}",
282 cluster_id,
283 (
284 borrowed_backend.address,
285 borrowed_backend.active_connections,
286 borrowed_backend.failures
287 )
288 );
289
290 let tcp_stream = borrowed_backend.try_connect().map_err(|backend_error| {
291 BackendError::ConnectionFailures {
292 cluster_id: cluster_id.to_owned(),
293 backend_address: borrowed_backend.address,
294 failures: borrowed_backend.failures,
295 error: backend_error.to_string(),
296 }
297 })?;
298 self.available = true;
299
300 Ok((next_backend.clone(), tcp_stream))
301 }
302
303 pub fn backend_from_sticky_session(
304 &mut self,
305 cluster_id: &str,
306 sticky_session: &str,
307 ) -> Result<(Rc<RefCell<Backend>>, TcpStream), BackendError> {
308 let sticky_conn = self
309 .backends
310 .get_mut(cluster_id)
311 .and_then(|cluster_backends| cluster_backends.find_sticky(sticky_session))
312 .map(|backend| {
313 let mut borrowed = backend.borrow_mut();
314 let conn = borrowed.try_connect();
315
316 conn.map(|tcp_stream| (backend.clone(), tcp_stream))
317 .map_err(|e| {
318 error!(
319 "could not connect {} to {:?} using session {} ({} failures)",
320 cluster_id, borrowed.address, sticky_session, borrowed.failures
321 );
322 e
323 })
324 });
325
326 match sticky_conn {
327 Some(backend_and_stream) => backend_and_stream,
328 None => {
329 debug!(
330 "Couldn't find a backend corresponding to sticky_session {} for cluster {}",
331 sticky_session, cluster_id
332 );
333 self.backend_from_cluster_id(cluster_id)
334 }
335 }
336 }
337
338 pub fn set_load_balancing_policy_for_cluster(
339 &mut self,
340 cluster_id: &str,
341 lb_algo: LoadBalancingAlgorithms,
342 metric: Option<LoadMetric>,
343 ) {
344 let cluster_backends = self.get_or_create_backend_list_for_cluster(cluster_id);
347 cluster_backends.set_load_balancing_policy(lb_algo, metric);
348 }
349
350 pub fn get_or_create_backend_list_for_cluster(&mut self, cluster_id: &str) -> &mut BackendList {
351 self.backends.entry(cluster_id.to_string()).or_default()
352 }
353}
354
355#[derive(Debug)]
356pub struct BackendList {
357 pub backends: Vec<Rc<RefCell<Backend>>>,
358 pub next_id: u32,
359 pub load_balancing: Box<dyn LoadBalancingAlgorithm>,
360}
361
362impl Default for BackendList {
363 fn default() -> Self {
364 Self::new()
365 }
366}
367
368impl BackendList {
369 pub fn new() -> BackendList {
370 BackendList {
371 backends: Vec::new(),
372 next_id: 0,
373 load_balancing: Box::new(Random),
374 }
375 }
376
377 pub fn import_configuration_state(
378 backend_vec: &[sozu_command_lib::response::Backend],
379 ) -> BackendList {
380 let mut list = BackendList::new();
381 for backend in backend_vec {
382 let backend = Backend::new(
383 &backend.backend_id,
384 backend.address,
385 backend.sticky_id.clone(),
386 backend.load_balancing_parameters,
387 backend.backup,
388 );
389 list.add_backend(backend);
390 }
391
392 list
393 }
394
395 pub fn add_backend(&mut self, backend: Backend) {
396 match self.backends.iter_mut().find(|b| {
397 b.borrow().address == backend.address && b.borrow().backend_id == backend.backend_id
398 }) {
399 None => {
400 let backend = Rc::new(RefCell::new(backend));
401 self.backends.push(backend);
402 self.next_id += 1;
403 }
404 Some(old_backend) => {
407 let mut b = old_backend.borrow_mut();
408 b.sticky_id.clone_from(&backend.sticky_id);
409 b.load_balancing_parameters
410 .clone_from(&backend.load_balancing_parameters);
411 b.backup = backend.backup;
412 }
413 }
414 }
415
416 pub fn remove_backend(&mut self, backend_address: &SocketAddr) {
417 self.backends
418 .retain(|backend| &backend.borrow().address != backend_address);
419 }
420
421 pub fn has_backend(&self, backend_address: &SocketAddr) -> bool {
422 self.backends
423 .iter()
424 .any(|backend| backend.borrow().address == *backend_address)
425 }
426
427 pub fn find_backend(
428 &mut self,
429 backend_address: &SocketAddr,
430 ) -> Option<&mut Rc<RefCell<Backend>>> {
431 self.backends
432 .iter_mut()
433 .find(|backend| backend.borrow().address == *backend_address)
434 }
435
436 pub fn find_sticky(&mut self, sticky_session: &str) -> Option<&mut Rc<RefCell<Backend>>> {
437 self.backends
438 .iter_mut()
439 .find(|b| b.borrow().sticky_id.as_deref() == Some(sticky_session))
440 .and_then(|b| if b.borrow().can_open() { Some(b) } else { None })
441 }
442
443 pub fn available_backends(&mut self, backup: bool) -> Vec<Rc<RefCell<Backend>>> {
444 self.backends
445 .iter()
446 .filter(|backend| {
447 let owned = backend.borrow();
448 owned.backup == backup && owned.can_open()
449 })
450 .map(Clone::clone)
451 .collect()
452 }
453
454 pub fn next_available_backend(&mut self) -> Option<Rc<RefCell<Backend>>> {
455 let mut backends = self.available_backends(false);
456
457 if backends.is_empty() {
458 backends = self.available_backends(true);
459 }
460
461 if backends.is_empty() {
462 return None;
463 }
464
465 self.load_balancing.next_available_backend(&mut backends)
466 }
467
468 pub fn set_load_balancing_policy(
469 &mut self,
470 load_balancing_policy: LoadBalancingAlgorithms,
471 metric: Option<LoadMetric>,
472 ) {
473 match load_balancing_policy {
474 LoadBalancingAlgorithms::RoundRobin => {
475 self.load_balancing = Box::new(RoundRobin::new())
476 }
477 LoadBalancingAlgorithms::Random => self.load_balancing = Box::new(Random {}),
478 LoadBalancingAlgorithms::LeastLoaded => {
479 self.load_balancing = Box::new(LeastLoaded {
480 metric: metric.unwrap_or(LoadMetric::Connections),
481 })
482 }
483 LoadBalancingAlgorithms::PowerOfTwo => {
484 self.load_balancing = Box::new(PowerOfTwo {
485 metric: metric.unwrap_or(LoadMetric::Connections),
486 })
487 }
488 }
489 }
490}
491
#[cfg(test)]
mod backends_test {

    use super::*;
    use std::{
        net::{SocketAddr, TcpListener},
        sync::mpsc::{channel, Receiver},
        thread,
    };

    /// Binds a TCP listener on an ephemeral loopback port, accepts
    /// connections on a background thread, and returns the bound address.
    ///
    /// The listener is bound before this returns, so a connection to the
    /// returned address cannot race the thread start. The accept loop exits
    /// at the first accepted connection after a stop message was sent; until
    /// then it stays blocked in `accept`, which is harmless in a test process.
    fn run_mock_tcp_server(stopper: Receiver<()>) -> SocketAddr {
        let listener = TcpListener::bind("127.0.0.1:0").expect("could not bind mock server");
        let address = listener
            .local_addr()
            .expect("mock server has no local address");

        thread::spawn(move || {
            for _stream in listener.incoming() {
                if stopper.try_recv().is_ok() {
                    break;
                }
            }
        });

        address
    }

    #[test]
    fn it_should_retrieve_a_backend_from_cluster_id_when_backends_have_been_recorded() {
        let mut backend_map = BackendMap::new();
        let cluster_id = "mycluster";

        let (sender, receiver) = channel();
        let backend_addr = run_mock_tcp_server(receiver);

        backend_map.add_backend(
            cluster_id,
            Backend::new(&format!("{cluster_id}-1"), backend_addr, None, None, None),
        );

        let (backend, _stream) = backend_map
            .backend_from_cluster_id(cluster_id)
            .expect("the recorded backend should be returned");
        assert_eq!(backend.borrow().address, backend_addr);
        assert_eq!(backend.borrow().active_connections, 1);
        sender.send(()).unwrap();
    }

    #[test]
    fn it_should_not_retrieve_a_backend_from_cluster_id_when_backend_has_not_been_recorded() {
        let mut backend_map = BackendMap::new();
        let cluster_not_recorded = "not";
        backend_map.add_backend(
            "foo",
            Backend::new("foo-1", "127.0.0.1:9001".parse().unwrap(), None, None, None),
        );

        assert!(backend_map
            .backend_from_cluster_id(cluster_not_recorded)
            .is_err());
    }

    #[test]
    fn it_should_not_retrieve_a_backend_from_cluster_id_when_backend_list_is_empty() {
        let mut backend_map = BackendMap::new();
        // The cluster exists but holds no backend at all.
        backend_map.get_or_create_backend_list_for_cluster("dumb");

        assert!(backend_map.backend_from_cluster_id("dumb").is_err());
        assert!(!backend_map.available);
    }

    #[test]
    fn it_should_retrieve_a_backend_from_sticky_session_when_the_backend_has_been_recorded() {
        let mut backend_map = BackendMap::new();
        let cluster_id = "mycluster";
        // Target the backend that has a real listener behind it, so the test
        // exercises the sticky lookup rather than a lucky non-blocking connect.
        let sticky_session = "server-3";

        let (sender, receiver) = channel();
        let backend_addr = run_mock_tcp_server(receiver);

        backend_map.add_backend(
            cluster_id,
            Backend::new(
                &format!("{cluster_id}-1"),
                "127.0.0.1:9001".parse().unwrap(),
                Some("server-1".to_string()),
                None,
                None,
            ),
        );
        backend_map.add_backend(
            cluster_id,
            Backend::new(
                &format!("{cluster_id}-2"),
                "127.0.0.1:9000".parse().unwrap(),
                Some("server-2".to_string()),
                None,
                None,
            ),
        );
        // sticky backend
        backend_map.add_backend(
            cluster_id,
            Backend::new(
                &format!("{cluster_id}-3"),
                backend_addr,
                Some("server-3".to_string()),
                None,
                None,
            ),
        );

        let (backend, _stream) = backend_map
            .backend_from_sticky_session(cluster_id, sticky_session)
            .expect("the sticky backend should be returned");
        assert_eq!(backend.borrow().address, backend_addr);
        assert_eq!(backend.borrow().sticky_id.as_deref(), Some(sticky_session));
        sender.send(()).unwrap();
    }

    #[test]
    fn it_should_not_retrieve_a_backend_from_sticky_session_when_the_backend_has_not_been_recorded()
    {
        let mut backend_map = BackendMap::new();
        let cluster_id = "mycluster";
        let sticky_session = "test";

        assert!(backend_map
            .backend_from_sticky_session(cluster_id, sticky_session)
            .is_err());
    }

    #[test]
    fn it_should_not_retrieve_a_backend_from_sticky_session_when_the_backend_list_is_empty() {
        let mut backend_map = BackendMap::new();
        let mycluster_not_recorded = "mycluster";
        let sticky_session = "test";

        assert!(backend_map
            .backend_from_sticky_session(mycluster_not_recorded, sticky_session)
            .is_err());
    }

    #[test]
    fn it_should_add_a_backend_when_he_doesnt_already_exist() {
        let backend_id = "myback";
        let mut backends_list = BackendList::new();
        backends_list.add_backend(Backend::new(
            backend_id,
            "127.0.0.1:80".parse().unwrap(),
            None,
            None,
            None,
        ));

        assert_eq!(1, backends_list.backends.len());
    }

    #[test]
    fn it_should_not_add_a_backend_when_he_already_exist() {
        let backend_id = "myback";
        let mut backends_list = BackendList::new();
        backends_list.add_backend(Backend::new(
            backend_id,
            "127.0.0.1:80".parse().unwrap(),
            None,
            None,
            None,
        ));

        // Same address and id: the existing entry is updated, not duplicated.
        backends_list.add_backend(Backend::new(
            backend_id,
            "127.0.0.1:80".parse().unwrap(),
            Some("sticky".to_string()),
            None,
            Some(true),
        ));

        assert_eq!(1, backends_list.backends.len());
        let backend = backends_list.backends[0].borrow();
        assert_eq!(backend.sticky_id.as_deref(), Some("sticky"));
        assert!(backend.backup);
    }
}