sozu_lib/
load_balancing.rs

1use std::{cell::RefCell, fmt::Debug, rc::Rc};
2
3use rand::{
4    distributions::{Distribution, WeightedIndex},
5    seq::SliceRandom,
6    thread_rng, Rng,
7};
8
9use crate::{backends::Backend, sozu_command::proto::command::LoadMetric};
10
11pub trait LoadBalancingAlgorithm: Debug {
12    fn next_available_backend(
13        &mut self,
14        backends: &mut Vec<Rc<RefCell<Backend>>>,
15    ) -> Option<Rc<RefCell<Backend>>>;
16}
17
/// Load balancer that hands out backends one after the other, in list order.
#[derive(Debug)]
pub struct RoundRobin {
    /// Cursor into the backend list: position of the backend to hand out on
    /// the next selection.
    pub next_backend: u32,
}
22
23impl LoadBalancingAlgorithm for RoundRobin {
24    fn next_available_backend(
25        &mut self,
26        backends: &mut Vec<Rc<RefCell<Backend>>>,
27    ) -> Option<Rc<RefCell<Backend>>> {
28        let res = backends
29            .get(self.next_backend as usize % backends.len())
30            .map(|backend| (*backend).clone());
31
32        self.next_backend = (self.next_backend + 1) % backends.len() as u32;
33        res
34    }
35}
36
37impl Default for RoundRobin {
38    fn default() -> Self {
39        Self::new()
40    }
41}
42
43impl RoundRobin {
44    pub fn new() -> Self {
45        Self { next_backend: 0 }
46    }
47}
48
/// Load balancer that draws a backend at random.
///
/// The type is stateless: it carries no fields.
#[derive(Debug)]
pub struct Random;
51
52impl LoadBalancingAlgorithm for Random {
53    fn next_available_backend(
54        &mut self,
55        backends: &mut Vec<Rc<RefCell<Backend>>>,
56    ) -> Option<Rc<RefCell<Backend>>> {
57        let mut rng = thread_rng();
58        let weights: Vec<i32> = backends
59            .iter()
60            .map(|b| {
61                b.borrow()
62                    .load_balancing_parameters
63                    .as_ref()
64                    .map(|p| p.weight)
65                    .unwrap_or(100)
66            })
67            .collect();
68
69        if let Ok(dist) = WeightedIndex::new(weights) {
70            let index = dist.sample(&mut rng);
71            backends.get(index).cloned()
72        } else {
73            (*backends)
74                .choose(&mut rng)
75                .map(|backend| (*backend).clone())
76        }
77    }
78}
79
80#[derive(Debug)]
81pub struct LeastLoaded {
82    pub metric: LoadMetric,
83}
84
85impl LoadBalancingAlgorithm for LeastLoaded {
86    fn next_available_backend(
87        &mut self,
88        backends: &mut Vec<Rc<RefCell<Backend>>>,
89    ) -> Option<Rc<RefCell<Backend>>> {
90        let opt_b = match self.metric {
91            LoadMetric::Connections => backends
92                .iter_mut()
93                .min_by_key(|backend| backend.borrow().active_connections),
94            LoadMetric::Requests => backends
95                .iter_mut()
96                .min_by_key(|backend| backend.borrow().active_requests),
97            LoadMetric::ConnectionTime => {
98                let mut b = None;
99                for backend in backends.iter_mut() {
100                    let cost2 = backend.borrow_mut().peak_ewma_connection();
101
102                    match b.take() {
103                        None => b = Some((cost2, backend)),
104                        Some((cost1, back1)) => {
105                            if cost1 <= cost2 {
106                                b = Some((cost1, back1));
107                            } else {
108                                b = Some((cost2, backend));
109                            }
110                        }
111                    }
112                }
113
114                b.map(|(_cost, backend)| backend)
115            }
116        };
117        opt_b.map(|backend| (*backend).clone())
118    }
119}
120
121#[derive(Debug)]
122pub struct PowerOfTwo {
123    pub metric: LoadMetric,
124}
125
126impl LoadBalancingAlgorithm for PowerOfTwo {
127    fn next_available_backend(
128        &mut self,
129        backends: &mut Vec<Rc<RefCell<Backend>>>,
130    ) -> Option<Rc<RefCell<Backend>>> {
131        let mut first = None;
132        let mut second = None;
133
134        for backend in backends.iter_mut() {
135            let measure = match self.metric {
136                LoadMetric::Connections => backend.borrow().active_connections as f64,
137                LoadMetric::Requests => backend.borrow().active_requests as f64,
138                LoadMetric::ConnectionTime => backend.borrow_mut().peak_ewma_connection(),
139            };
140
141            if first.is_none() {
142                first = Some((measure, backend));
143            } else if second.is_none() {
144                if first.as_ref().unwrap().0 <= measure {
145                    second = Some((measure, backend));
146                } else {
147                    second = first.take();
148                    first = Some((measure, backend));
149                }
150            } else if first.as_ref().unwrap().0 <= measure && measure < second.as_ref().unwrap().0 {
151                second = Some((measure, backend));
152                // other case: we don't change anything
153            } else {
154                second = first.take();
155                first = Some((measure, backend));
156            }
157        }
158
159        match (first, second) {
160            (None, None) => None,
161            (Some((_, b)), None) => Some(b.clone()),
162            // should not happen, but let's be exhaustive
163            (None, Some((_, b))) => Some(b.clone()),
164            (Some((_, b1)), Some((_, b2))) => {
165                if thread_rng().gen_bool(0.5) {
166                    Some(b1.clone())
167                } else {
168                    Some(b2.clone())
169                }
170            }
171        }
172    }
173}
174
#[cfg(test)]
mod test {
    use std::net::{IpAddr, Ipv4Addr, SocketAddr};

    use super::*;
    use crate::{
        backends::BackendStatus,
        retry::{ExponentialBackoffPolicy, RetryPolicyWrapper},
        sozu_command::proto::command::LoadMetric,
        PeakEWMA,
    };

    /// Builds a backend in `BackendStatus::Normal` listening on 127.0.0.1:8080,
    /// with `connections` active connections (0 when `None`).
    fn create_backend(id: String, connections: Option<usize>) -> Backend {
        let address = SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 8080);
        let retry_policy =
            RetryPolicyWrapper::ExponentialBackoff(ExponentialBackoffPolicy::new(1));

        Backend {
            sticky_id: None,
            backend_id: id,
            address,
            status: BackendStatus::Normal,
            retry_policy,
            active_connections: connections.unwrap_or(0),
            active_requests: 0,
            failures: 0,
            load_balancing_parameters: None,
            backup: false,
            connection_time: PeakEWMA::new(),
        }
    }

    /// Wraps a freshly created backend in the shared handle the algorithms take.
    fn shared_backend(id: &str, connections: Option<usize>) -> Rc<RefCell<Backend>> {
        Rc::new(RefCell::new(create_backend(id.to_string(), connections)))
    }

    #[test]
    fn it_should_find_the_backend_with_least_connections() {
        let expected = shared_backend("yolo", Some(1));
        let mut backends = vec![
            shared_backend("nolo", Some(10)),
            shared_backend("philo", Some(20)),
            Rc::clone(&expected),
        ];
        let mut algorithm = LeastLoaded {
            metric: LoadMetric::Connections,
        };

        let selected = algorithm.next_available_backend(&mut backends).unwrap();

        assert!(*selected.borrow() == *expected.borrow());
    }

    #[test]
    fn it_shouldnt_find_backend_with_least_connections_when_list_is_empty() {
        let mut backends = Vec::new();
        let mut algorithm = LeastLoaded {
            metric: LoadMetric::Connections,
        };

        let selected = algorithm.next_available_backend(&mut backends);

        assert!(selected.is_none());
    }

    #[test]
    fn it_should_find_backend_with_roundrobin_when_some_backends_were_removed() {
        let mut backends = vec![
            shared_backend("toto", None),
            shared_backend("voto", None),
            shared_backend("yoto", None),
        ];
        let mut roundrobin = RoundRobin { next_backend: 1 };

        // The cursor points at the second backend of the full list.
        let before_removal = roundrobin.next_available_backend(&mut backends);
        assert_eq!(before_removal.as_ref(), backends.get(1));

        // After a removal the cursor wraps around to the start of the list.
        backends.remove(1);
        let after_removal = roundrobin.next_available_backend(&mut backends);
        assert_eq!(after_removal.as_ref(), backends.first());
    }
}