1use std::{cell::RefCell, collections::HashMap, net::SocketAddr, rc::Rc, time::Duration};
2
3use mio::net::TcpStream;
4use sozu_command::{
5 proto::command::{Event, EventKind, LoadBalancingAlgorithms, LoadBalancingParams, LoadMetric},
6 state::ClusterId,
7};
8
9use crate::{
10 PeakEWMA,
11 load_balancing::{LeastLoaded, LoadBalancingAlgorithm, PowerOfTwo, Random, RoundRobin},
12 retry::{self, RetryPolicy},
13 server::{self, push_event},
14};
15
/// Errors returned when picking a backend for a cluster or connecting to it.
#[derive(thiserror::Error, Debug)]
pub enum BackendError {
    /// No backend could be selected for the cluster whose id is carried by the variant.
    #[error("No backend found for cluster {0}")]
    NoBackendForCluster(String),
    /// `mio::net::TcpStream::connect` returned an I/O error.
    #[error("Failed to connect to socket with MIO: {0}")]
    MioConnection(std::io::Error),
    /// A connection was attempted on a backend whose status is not `Normal`.
    #[error("This backend is not in a normal status: status={0:?}")]
    Status(BackendStatus),
    /// Connecting to the backend selected for a cluster failed.
    #[error("could not connect {cluster_id} to {backend_address:?} ({failures} failures): {error}")]
    ConnectionFailures {
        /// Cluster the connection was requested for.
        cluster_id: String,
        /// Address of the backend that could not be reached.
        backend_address: SocketAddr,
        /// Value of the backend's failure counter when the error was built.
        failures: usize,
        /// Display form of the underlying `BackendError`.
        error: String,
    },
}
32
/// Lifecycle status of a [`Backend`].
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum BackendStatus {
    /// Initial status; the only one in which `try_connect` and `inc_connections` succeed.
    Normal,
    /// Set by `Backend::set_closing`; `dec_connections` switches to `Closed`
    /// once the active connection count reaches zero.
    Closing,
    /// Reached from `Closing` when no active connection remains.
    Closed,
}
39
/// A backend server of a cluster, along with its connection bookkeeping.
#[derive(Debug, PartialEq, Clone)]
pub struct Backend {
    /// Sticky session identifier, matched against the requested session by
    /// `BackendList::find_sticky`.
    pub sticky_id: Option<String>,
    /// Identifier of the backend; together with `address` it identifies an
    /// already-registered backend in `BackendList::add_backend`.
    pub backend_id: String,
    /// Socket address that `try_connect` connects to.
    pub address: SocketAddr,
    /// Current lifecycle status.
    pub status: BackendStatus,
    /// Retry policy consulted by `can_open` and notified of failures by `try_connect`.
    pub retry_policy: retry::RetryPolicyWrapper,
    /// Connection counter maintained by `inc_connections` / `dec_connections`.
    pub active_connections: usize,
    /// Initialized to 0 by `new`; not updated anywhere in this file
    /// (presumably maintained by callers — TODO confirm).
    pub active_requests: usize,
    /// Number of failed connection attempts, incremented by `try_connect`.
    pub failures: usize,
    /// Optional load balancing parameters, stored as provided.
    pub load_balancing_parameters: Option<LoadBalancingParams>,
    /// When `true`, the backend is only considered by
    /// `BackendList::next_available_backend` if no non-backup backend can be opened.
    pub backup: bool,
    /// Connection time estimator, fed in nanoseconds by `set_connection_time`.
    pub connection_time: PeakEWMA,
}
54
55impl Backend {
56 pub fn new(
57 backend_id: &str,
58 address: SocketAddr,
59 sticky_id: Option<String>,
60 load_balancing_parameters: Option<LoadBalancingParams>,
61 backup: Option<bool>,
62 ) -> Backend {
63 let desired_policy = retry::ExponentialBackoffPolicy::new(6);
64 Backend {
65 sticky_id,
66 backend_id: backend_id.to_string(),
67 address,
68 status: BackendStatus::Normal,
69 retry_policy: desired_policy.into(),
70 active_connections: 0,
71 active_requests: 0,
72 failures: 0,
73 load_balancing_parameters,
74 backup: backup.unwrap_or(false),
75 connection_time: PeakEWMA::new(),
76 }
77 }
78
79 pub fn set_closing(&mut self) {
80 self.status = BackendStatus::Closing;
81 }
82
83 pub fn retry_policy(&mut self) -> &mut retry::RetryPolicyWrapper {
84 &mut self.retry_policy
85 }
86
87 pub fn can_open(&self) -> bool {
88 if let Some(action) = self.retry_policy.can_try() {
89 self.status == BackendStatus::Normal && action == retry::RetryAction::OKAY
90 } else {
91 false
92 }
93 }
94
95 pub fn inc_connections(&mut self) -> Option<usize> {
96 if self.status == BackendStatus::Normal {
97 self.active_connections += 1;
98 Some(self.active_connections)
99 } else {
100 None
101 }
102 }
103
104 pub fn dec_connections(&mut self) -> Option<usize> {
106 match self.status {
107 BackendStatus::Normal => {
108 if self.active_connections > 0 {
109 self.active_connections -= 1;
110 }
111 Some(self.active_connections)
112 }
113 BackendStatus::Closed => None,
114 BackendStatus::Closing => {
115 if self.active_connections > 0 {
116 self.active_connections -= 1;
117 }
118 if self.active_connections == 0 {
119 self.status = BackendStatus::Closed;
120 None
121 } else {
122 Some(self.active_connections)
123 }
124 }
125 }
126 }
127
128 pub fn set_connection_time(&mut self, dur: Duration) {
129 self.connection_time.observe(dur.as_nanos() as f64);
130 }
131
132 pub fn peak_ewma_connection(&mut self) -> f64 {
133 self.connection_time.get(self.active_connections)
134 }
135
136 pub fn try_connect(&mut self) -> Result<mio::net::TcpStream, BackendError> {
137 if self.status != BackendStatus::Normal {
138 return Err(BackendError::Status(self.status.to_owned()));
139 }
140
141 match mio::net::TcpStream::connect(self.address) {
142 Ok(tcp_stream) => {
143 self.inc_connections();
145 Ok(tcp_stream)
146 }
147 Err(io_error) => {
148 self.retry_policy.fail();
149 self.failures += 1;
150 Err(BackendError::MioConnection(io_error))
155 }
156 }
157 }
158}
159
160impl std::ops::Drop for Backend {
164 fn drop(&mut self) {
165 server::push_event(Event {
166 kind: EventKind::RemovedBackendHasNoConnections as i32,
167 backend_id: Some(self.backend_id.clone()),
168 address: Some(self.address.into()),
169 cluster_id: None,
170 });
171 }
172}
173
/// Backends of every cluster, indexed by cluster id.
#[derive(Debug)]
pub struct BackendMap {
    /// Backend list of each known cluster.
    pub backends: HashMap<ClusterId, BackendList>,
    /// Initialized to 3 by `new`; not read anywhere in this file.
    pub max_failures: usize,
    /// Outcome of the last backend selection in `backend_from_cluster_id`:
    /// `false` after a failed selection, `true` after a successful connection.
    /// It ensures `NoAvailableBackends` is pushed only on the first failure.
    pub available: bool,
}
180
181impl Default for BackendMap {
182 fn default() -> Self {
183 Self::new()
184 }
185}
186
187impl BackendMap {
188 pub fn new() -> BackendMap {
189 BackendMap {
190 backends: HashMap::new(),
191 max_failures: 3,
192 available: true,
193 }
194 }
195
196 pub fn import_configuration_state(
197 &mut self,
198 backends: &HashMap<ClusterId, Vec<sozu_command::response::Backend>>,
199 ) {
200 self.backends
201 .extend(backends.iter().map(|(cluster_id, backend_vec)| {
202 (
203 cluster_id.to_string(),
204 BackendList::import_configuration_state(backend_vec),
205 )
206 }));
207 }
208
209 pub fn add_backend(&mut self, cluster_id: &str, backend: Backend) {
210 self.backends
211 .entry(cluster_id.to_string())
212 .or_default()
213 .add_backend(backend);
214 }
215
216 pub fn remove_backend(&mut self, cluster_id: &str, backend_address: &SocketAddr) {
218 if let Some(backends) = self.backends.get_mut(cluster_id) {
219 backends.remove_backend(backend_address);
220 } else {
221 error!(
222 "Backend was already removed: cluster id {}, address {:?}",
223 cluster_id, backend_address
224 );
225 }
226 }
227
228 pub fn close_backend_connection(&mut self, cluster_id: &str, addr: &SocketAddr) {
230 if let Some(cluster_backends) = self.backends.get_mut(cluster_id) {
231 if let Some(ref mut backend) = cluster_backends.find_backend(addr) {
232 backend.borrow_mut().dec_connections();
233 }
234 }
235 }
236
237 pub fn has_backend(&self, cluster_id: &str, backend: &Backend) -> bool {
238 self.backends
239 .get(cluster_id)
240 .map(|backends| backends.has_backend(&backend.address))
241 .unwrap_or(false)
242 }
243
244 pub fn backend_from_cluster_id(
245 &mut self,
246 cluster_id: &str,
247 ) -> Result<(Rc<RefCell<Backend>>, TcpStream), BackendError> {
248 let cluster_backends = self
249 .backends
250 .get_mut(cluster_id)
251 .ok_or(BackendError::NoBackendForCluster(cluster_id.to_owned()))?;
252
253 if cluster_backends.backends.is_empty() {
254 self.available = false;
255 return Err(BackendError::NoBackendForCluster(cluster_id.to_owned()));
256 }
257
258 let next_backend = match cluster_backends.next_available_backend() {
259 Some(nb) => nb,
260 None => {
261 if self.available {
262 self.available = false;
263
264 push_event(Event {
265 kind: EventKind::NoAvailableBackends as i32,
266 cluster_id: Some(cluster_id.to_owned()),
267 backend_id: None,
268 address: None,
269 });
270 }
271 return Err(BackendError::NoBackendForCluster(cluster_id.to_owned()));
272 }
273 };
274
275 let mut borrowed_backend = next_backend.borrow_mut();
276
277 debug!(
278 "Connecting {} -> {:?}",
279 cluster_id,
280 (
281 borrowed_backend.address,
282 borrowed_backend.active_connections,
283 borrowed_backend.failures
284 )
285 );
286
287 let tcp_stream = borrowed_backend.try_connect().map_err(|backend_error| {
288 BackendError::ConnectionFailures {
289 cluster_id: cluster_id.to_owned(),
290 backend_address: borrowed_backend.address,
291 failures: borrowed_backend.failures,
292 error: backend_error.to_string(),
293 }
294 })?;
295 self.available = true;
296
297 Ok((next_backend.clone(), tcp_stream))
298 }
299
300 pub fn backend_from_sticky_session(
301 &mut self,
302 cluster_id: &str,
303 sticky_session: &str,
304 ) -> Result<(Rc<RefCell<Backend>>, TcpStream), BackendError> {
305 let sticky_conn = self
306 .backends
307 .get_mut(cluster_id)
308 .and_then(|cluster_backends| cluster_backends.find_sticky(sticky_session))
309 .map(|backend| {
310 let mut borrowed = backend.borrow_mut();
311 let conn = borrowed.try_connect();
312
313 conn.map(|tcp_stream| (backend.clone(), tcp_stream))
314 .inspect_err(|_| {
315 error!(
316 "could not connect {} to {:?} using session {} ({} failures)",
317 cluster_id, borrowed.address, sticky_session, borrowed.failures
318 )
319 })
320 });
321
322 match sticky_conn {
323 Some(backend_and_stream) => backend_and_stream,
324 None => {
325 debug!(
326 "Couldn't find a backend corresponding to sticky_session {} for cluster {}",
327 sticky_session, cluster_id
328 );
329 self.backend_from_cluster_id(cluster_id)
330 }
331 }
332 }
333
334 pub fn set_load_balancing_policy_for_cluster(
335 &mut self,
336 cluster_id: &str,
337 lb_algo: LoadBalancingAlgorithms,
338 metric: Option<LoadMetric>,
339 ) {
340 let cluster_backends = self.get_or_create_backend_list_for_cluster(cluster_id);
343 cluster_backends.set_load_balancing_policy(lb_algo, metric);
344 }
345
346 pub fn get_or_create_backend_list_for_cluster(&mut self, cluster_id: &str) -> &mut BackendList {
347 self.backends.entry(cluster_id.to_string()).or_default()
348 }
349}
350
/// Backends of a single cluster and the algorithm used to choose among them.
#[derive(Debug)]
pub struct BackendList {
    /// Backends of the cluster, shared through `Rc<RefCell<_>>` handles.
    pub backends: Vec<Rc<RefCell<Backend>>>,
    /// Counter incremented each time `add_backend` appends a new backend.
    pub next_id: u32,
    /// Algorithm used by `next_available_backend`; `Random` by default.
    pub load_balancing: Box<dyn LoadBalancingAlgorithm>,
}
357
358impl Default for BackendList {
359 fn default() -> Self {
360 Self::new()
361 }
362}
363
364impl BackendList {
365 pub fn new() -> BackendList {
366 BackendList {
367 backends: Vec::new(),
368 next_id: 0,
369 load_balancing: Box::new(Random),
370 }
371 }
372
373 pub fn import_configuration_state(
374 backend_vec: &[sozu_command_lib::response::Backend],
375 ) -> BackendList {
376 let mut list = BackendList::new();
377 for backend in backend_vec {
378 let backend = Backend::new(
379 &backend.backend_id,
380 backend.address,
381 backend.sticky_id.clone(),
382 backend.load_balancing_parameters,
383 backend.backup,
384 );
385 list.add_backend(backend);
386 }
387
388 list
389 }
390
391 pub fn add_backend(&mut self, backend: Backend) {
392 match self.backends.iter_mut().find(|b| {
393 b.borrow().address == backend.address && b.borrow().backend_id == backend.backend_id
394 }) {
395 None => {
396 let backend = Rc::new(RefCell::new(backend));
397 self.backends.push(backend);
398 self.next_id += 1;
399 }
400 Some(old_backend) => {
403 let mut b = old_backend.borrow_mut();
404 b.sticky_id.clone_from(&backend.sticky_id);
405 b.load_balancing_parameters
406 .clone_from(&backend.load_balancing_parameters);
407 b.backup = backend.backup;
408 }
409 }
410 }
411
412 pub fn remove_backend(&mut self, backend_address: &SocketAddr) {
413 self.backends
414 .retain(|backend| &backend.borrow().address != backend_address);
415 }
416
417 pub fn has_backend(&self, backend_address: &SocketAddr) -> bool {
418 self.backends
419 .iter()
420 .any(|backend| backend.borrow().address == *backend_address)
421 }
422
423 pub fn find_backend(
424 &mut self,
425 backend_address: &SocketAddr,
426 ) -> Option<&mut Rc<RefCell<Backend>>> {
427 self.backends
428 .iter_mut()
429 .find(|backend| backend.borrow().address == *backend_address)
430 }
431
432 pub fn find_sticky(&mut self, sticky_session: &str) -> Option<&mut Rc<RefCell<Backend>>> {
433 self.backends
434 .iter_mut()
435 .find(|b| b.borrow().sticky_id.as_deref() == Some(sticky_session))
436 .and_then(|b| if b.borrow().can_open() { Some(b) } else { None })
437 }
438
439 pub fn available_backends(&mut self, backup: bool) -> Vec<Rc<RefCell<Backend>>> {
440 self.backends
441 .iter()
442 .filter(|backend| {
443 let owned = backend.borrow();
444 owned.backup == backup && owned.can_open()
445 })
446 .map(Clone::clone)
447 .collect()
448 }
449
450 pub fn next_available_backend(&mut self) -> Option<Rc<RefCell<Backend>>> {
451 let mut backends = self.available_backends(false);
452
453 if backends.is_empty() {
454 backends = self.available_backends(true);
455 }
456
457 if backends.is_empty() {
458 return None;
459 }
460
461 self.load_balancing.next_available_backend(&mut backends)
462 }
463
464 pub fn set_load_balancing_policy(
465 &mut self,
466 load_balancing_policy: LoadBalancingAlgorithms,
467 metric: Option<LoadMetric>,
468 ) {
469 match load_balancing_policy {
470 LoadBalancingAlgorithms::RoundRobin => {
471 self.load_balancing = Box::new(RoundRobin::new())
472 }
473 LoadBalancingAlgorithms::Random => self.load_balancing = Box::new(Random {}),
474 LoadBalancingAlgorithms::LeastLoaded => {
475 self.load_balancing = Box::new(LeastLoaded {
476 metric: metric.unwrap_or(LoadMetric::Connections),
477 })
478 }
479 LoadBalancingAlgorithms::PowerOfTwo => {
480 self.load_balancing = Box::new(PowerOfTwo {
481 metric: metric.unwrap_or(LoadMetric::Connections),
482 })
483 }
484 }
485 }
486}
487
#[cfg(test)]
mod backends_test {

    use std::{net::TcpListener, sync::mpsc::*, thread};

    use super::*;

    /// Binds a TCP listener on `addr` and accepts connections in a background
    /// thread until a message arrives on `stopper` or its sender is dropped.
    ///
    /// The stop condition is only checked after a connection is accepted.
    fn run_mock_tcp_server(addr: &str, stopper: Receiver<()>) {
        let listener = TcpListener::bind(addr).unwrap();

        thread::spawn(move || {
            // `incoming()` never yields `None`, so the loop has to be left
            // explicitly for the thread to terminate.
            for _stream in listener.incoming() {
                match stopper.try_recv() {
                    Ok(()) | Err(TryRecvError::Disconnected) => break,
                    Err(TryRecvError::Empty) => {}
                }
            }
        });
    }

    #[test]
    fn it_should_retrieve_a_backend_from_cluster_id_when_backends_have_been_recorded() {
        let mut backend_map = BackendMap::new();
        let cluster_id = "mycluster";

        let backend_addr = "127.0.0.1:1236";
        let (sender, receiver) = channel();
        run_mock_tcp_server(backend_addr, receiver);

        backend_map.add_backend(
            cluster_id,
            Backend::new(
                &format!("{cluster_id}-1"),
                backend_addr.parse().unwrap(),
                None,
                None,
                None,
            ),
        );

        assert!(backend_map.backend_from_cluster_id(cluster_id).is_ok());
        sender.send(()).unwrap();
    }

    #[test]
    fn it_should_not_retrieve_a_backend_from_cluster_id_when_backend_has_not_been_recorded() {
        let mut backend_map = BackendMap::new();
        let cluster_not_recorded = "not";
        backend_map.add_backend(
            "foo",
            Backend::new("foo-1", "127.0.0.1:9001".parse().unwrap(), None, None, None),
        );

        assert!(
            backend_map
                .backend_from_cluster_id(cluster_not_recorded)
                .is_err()
        );
    }

    #[test]
    fn it_should_not_retrieve_a_backend_from_cluster_id_when_backend_list_is_empty() {
        let mut backend_map = BackendMap::new();

        assert!(backend_map.backend_from_cluster_id("dumb").is_err());
    }

    #[test]
    fn it_should_retrieve_a_backend_from_sticky_session_when_the_backend_has_been_recorded() {
        let mut backend_map = BackendMap::new();
        let cluster_id = "mycluster";
        let sticky_session = "server-2";

        let backend_addr = "127.0.0.1:3456";
        let (sender, receiver) = channel();
        run_mock_tcp_server(backend_addr, receiver);

        backend_map.add_backend(
            cluster_id,
            Backend::new(
                &format!("{cluster_id}-1"),
                "127.0.0.1:9001".parse().unwrap(),
                Some("server-1".to_string()),
                None,
                None,
            ),
        );
        backend_map.add_backend(
            cluster_id,
            Backend::new(
                &format!("{cluster_id}-2"),
                "127.0.0.1:9000".parse().unwrap(),
                Some("server-2".to_string()),
                None,
                None,
            ),
        );
        backend_map.add_backend(
            cluster_id,
            Backend::new(
                &format!("{cluster_id}-3"),
                backend_addr.parse().unwrap(),
                Some("server-3".to_string()),
                None,
                None,
            ),
        );

        assert!(
            backend_map
                .backend_from_sticky_session(cluster_id, sticky_session)
                .is_ok()
        );
        sender.send(()).unwrap();
    }

    #[test]
    fn it_should_not_retrieve_a_backend_from_sticky_session_when_the_backend_has_not_been_recorded()
     {
        let mut backend_map = BackendMap::new();
        let cluster_id = "mycluster";
        let sticky_session = "test";

        assert!(
            backend_map
                .backend_from_sticky_session(cluster_id, sticky_session)
                .is_err()
        );
    }

    #[test]
    fn it_should_not_retrieve_a_backend_from_sticky_session_when_the_backend_list_is_empty() {
        let mut backend_map = BackendMap::new();
        let mycluster_not_recorded = "mycluster";
        let sticky_session = "test";

        assert!(
            backend_map
                .backend_from_sticky_session(mycluster_not_recorded, sticky_session)
                .is_err()
        );
    }

    #[test]
    fn it_should_add_a_backend_when_he_doesnt_already_exist() {
        let backend_id = "myback";
        let mut backends_list = BackendList::new();
        backends_list.add_backend(Backend::new(
            backend_id,
            "127.0.0.1:80".parse().unwrap(),
            None,
            None,
            None,
        ));

        assert_eq!(1, backends_list.backends.len());
    }

    #[test]
    fn it_should_not_add_a_backend_when_he_already_exist() {
        let backend_id = "myback";
        let mut backends_list = BackendList::new();
        backends_list.add_backend(Backend::new(
            backend_id,
            "127.0.0.1:80".parse().unwrap(),
            None,
            None,
            None,
        ));

        // Same id and address: the existing entry is updated, not duplicated.
        backends_list.add_backend(Backend::new(
            backend_id,
            "127.0.0.1:80".parse().unwrap(),
            None,
            None,
            None,
        ));

        assert_eq!(1, backends_list.backends.len());
    }
}