sozu_lib/
load_balancing.rs

1use std::{cell::RefCell, fmt::Debug, rc::Rc};
2
3use rand::{
4    Rng,
5    distr::{Distribution, weighted::WeightedIndex},
6    prelude::IndexedRandom,
7    rng,
8};
9
10use crate::{backends::Backend, sozu_command::proto::command::LoadMetric};
11
12pub trait LoadBalancingAlgorithm: Debug {
13    fn next_available_backend(
14        &mut self,
15        backends: &mut Vec<Rc<RefCell<Backend>>>,
16    ) -> Option<Rc<RefCell<Backend>>>;
17}
18
19#[derive(Debug)]
20pub struct RoundRobin {
21    pub next_backend: u32,
22}
23
24impl LoadBalancingAlgorithm for RoundRobin {
25    fn next_available_backend(
26        &mut self,
27        backends: &mut Vec<Rc<RefCell<Backend>>>,
28    ) -> Option<Rc<RefCell<Backend>>> {
29        let res = backends
30            .get(self.next_backend as usize % backends.len())
31            .map(|backend| (*backend).clone());
32
33        self.next_backend = (self.next_backend + 1) % backends.len() as u32;
34        res
35    }
36}
37
38impl Default for RoundRobin {
39    fn default() -> Self {
40        Self::new()
41    }
42}
43
44impl RoundRobin {
45    pub fn new() -> Self {
46        Self { next_backend: 0 }
47    }
48}
49
50#[derive(Debug)]
51pub struct Random;
52
53impl LoadBalancingAlgorithm for Random {
54    fn next_available_backend(
55        &mut self,
56        backends: &mut Vec<Rc<RefCell<Backend>>>,
57    ) -> Option<Rc<RefCell<Backend>>> {
58        let mut rng = rng();
59        let weights: Vec<i32> = backends
60            .iter()
61            .map(|b| {
62                b.borrow()
63                    .load_balancing_parameters
64                    .as_ref()
65                    .map(|p| p.weight)
66                    .unwrap_or(100)
67            })
68            .collect();
69
70        if let Ok(dist) = WeightedIndex::new(weights) {
71            let index = dist.sample(&mut rng);
72            backends.get(index).cloned()
73        } else {
74            (*backends)
75                .choose(&mut rng)
76                .map(|backend| (*backend).clone())
77        }
78    }
79}
80
81#[derive(Debug)]
82pub struct LeastLoaded {
83    pub metric: LoadMetric,
84}
85
86impl LoadBalancingAlgorithm for LeastLoaded {
87    fn next_available_backend(
88        &mut self,
89        backends: &mut Vec<Rc<RefCell<Backend>>>,
90    ) -> Option<Rc<RefCell<Backend>>> {
91        let opt_b = match self.metric {
92            LoadMetric::Connections => backends
93                .iter_mut()
94                .min_by_key(|backend| backend.borrow().active_connections),
95            LoadMetric::Requests => backends
96                .iter_mut()
97                .min_by_key(|backend| backend.borrow().active_requests),
98            LoadMetric::ConnectionTime => {
99                let mut b = None;
100                for backend in backends.iter_mut() {
101                    let cost2 = backend.borrow_mut().peak_ewma_connection();
102
103                    match b.take() {
104                        None => b = Some((cost2, backend)),
105                        Some((cost1, back1)) => {
106                            if cost1 <= cost2 {
107                                b = Some((cost1, back1));
108                            } else {
109                                b = Some((cost2, backend));
110                            }
111                        }
112                    }
113                }
114
115                b.map(|(_cost, backend)| backend)
116            }
117        };
118        opt_b.map(|backend| (*backend).clone())
119    }
120}
121
122#[derive(Debug)]
123pub struct PowerOfTwo {
124    pub metric: LoadMetric,
125}
126
127impl LoadBalancingAlgorithm for PowerOfTwo {
128    fn next_available_backend(
129        &mut self,
130        backends: &mut Vec<Rc<RefCell<Backend>>>,
131    ) -> Option<Rc<RefCell<Backend>>> {
132        let mut first = None;
133        let mut second = None;
134
135        for backend in backends.iter_mut() {
136            let measure = match self.metric {
137                LoadMetric::Connections => backend.borrow().active_connections as f64,
138                LoadMetric::Requests => backend.borrow().active_requests as f64,
139                LoadMetric::ConnectionTime => backend.borrow_mut().peak_ewma_connection(),
140            };
141
142            if first.is_none() {
143                first = Some((measure, backend));
144            } else if second.is_none() {
145                if first.as_ref().unwrap().0 <= measure {
146                    second = Some((measure, backend));
147                } else {
148                    second = first.take();
149                    first = Some((measure, backend));
150                }
151            } else if first.as_ref().unwrap().0 <= measure && measure < second.as_ref().unwrap().0 {
152                second = Some((measure, backend));
153                // other case: we don't change anything
154            } else {
155                second = first.take();
156                first = Some((measure, backend));
157            }
158        }
159
160        match (first, second) {
161            (None, None) => None,
162            (Some((_, b)), None) => Some(b.clone()),
163            // should not happen, but let's be exhaustive
164            (None, Some((_, b))) => Some(b.clone()),
165            (Some((_, b1)), Some((_, b2))) => {
166                if rng().random_bool(0.5) {
167                    Some(b1.clone())
168                } else {
169                    Some(b2.clone())
170                }
171            }
172        }
173    }
174}
175
176#[cfg(test)]
177mod test {
178    use std::net::{IpAddr, Ipv4Addr, SocketAddr};
179
180    use super::*;
181    use crate::{
182        PeakEWMA,
183        backends::BackendStatus,
184        retry::{ExponentialBackoffPolicy, RetryPolicyWrapper},
185        sozu_command::proto::command::LoadMetric,
186    };
187
188    fn create_backend(id: String, connections: Option<usize>) -> Backend {
189        Backend {
190            sticky_id: None,
191            backend_id: id,
192            address: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080),
193            status: BackendStatus::Normal,
194            retry_policy: RetryPolicyWrapper::ExponentialBackoff(ExponentialBackoffPolicy::new(1)),
195            active_connections: connections.unwrap_or(0),
196            active_requests: 0,
197            failures: 0,
198            load_balancing_parameters: None,
199            backup: false,
200            connection_time: PeakEWMA::new(),
201        }
202    }
203
204    #[test]
205    fn it_should_find_the_backend_with_least_connections() {
206        let backend_with_least_connection =
207            Rc::new(RefCell::new(create_backend("yolo".to_string(), Some(1))));
208
209        let mut backends = vec![
210            Rc::new(RefCell::new(create_backend("nolo".to_string(), Some(10)))),
211            Rc::new(RefCell::new(create_backend("philo".to_string(), Some(20)))),
212            backend_with_least_connection.clone(),
213        ];
214
215        let mut least_connection_algorithm = LeastLoaded {
216            metric: LoadMetric::Connections,
217        };
218
219        let backend_res = least_connection_algorithm
220            .next_available_backend(&mut backends)
221            .unwrap();
222        let backend = backend_res.borrow();
223
224        assert!(*backend == *backend_with_least_connection.borrow());
225    }
226
227    #[test]
228    fn it_shouldnt_find_backend_with_least_connections_when_list_is_empty() {
229        let mut backends = vec![];
230
231        let mut least_connection_algorithm = LeastLoaded {
232            metric: LoadMetric::Connections,
233        };
234
235        let backend = least_connection_algorithm.next_available_backend(&mut backends);
236        assert!(backend.is_none());
237    }
238
239    #[test]
240    fn it_should_find_backend_with_roundrobin_when_some_backends_were_removed() {
241        let mut backends = vec![
242            Rc::new(RefCell::new(create_backend("toto".to_string(), None))),
243            Rc::new(RefCell::new(create_backend("voto".to_string(), None))),
244            Rc::new(RefCell::new(create_backend("yoto".to_string(), None))),
245        ];
246
247        let mut roundrobin = RoundRobin { next_backend: 1 };
248        let backend = roundrobin.next_available_backend(&mut backends);
249        assert_eq!(backend.as_ref(), backends.get(1));
250
251        backends.remove(1);
252
253        let backend2 = roundrobin.next_available_backend(&mut backends);
254        assert_eq!(backend2.as_ref(), backends.first());
255    }
256}