Skip to main content

sozu_lib/
load_balancing.rs

1use std::{cell::RefCell, fmt::Debug, rc::Rc};
2
3use rand::{
4    RngExt,
5    distr::{Distribution, weighted::WeightedIndex},
6    prelude::IndexedRandom,
7    rng,
8};
9
10use crate::{backends::Backend, sozu_command::proto::command::LoadMetric};
11
12pub trait LoadBalancingAlgorithm: Debug {
13    fn next_available_backend(
14        &mut self,
15        backends: &mut Vec<Rc<RefCell<Backend>>>,
16    ) -> Option<Rc<RefCell<Backend>>>;
17}
18
/// Load balancer that hands out backends one after the other, in list order.
#[derive(Debug)]
pub struct RoundRobin {
    /// Cursor into the backend list: position of the backend to return on the
    /// next call. It is reduced modulo the list length when it is used.
    pub next_backend: u32,
}
23
24impl LoadBalancingAlgorithm for RoundRobin {
25    fn next_available_backend(
26        &mut self,
27        backends: &mut Vec<Rc<RefCell<Backend>>>,
28    ) -> Option<Rc<RefCell<Backend>>> {
29        let res = backends
30            .get(self.next_backend as usize % backends.len())
31            .map(|backend| (*backend).clone());
32
33        self.next_backend = (self.next_backend + 1) % backends.len() as u32;
34        res
35    }
36}
37
38impl Default for RoundRobin {
39    fn default() -> Self {
40        Self::new()
41    }
42}
43
44impl RoundRobin {
45    pub fn new() -> Self {
46        Self { next_backend: 0 }
47    }
48}
49
/// Stateless load balancer that draws a backend at random, honouring the
/// per-backend weights when they are usable.
#[derive(Debug)]
pub struct Random;
52
53impl LoadBalancingAlgorithm for Random {
54    fn next_available_backend(
55        &mut self,
56        backends: &mut Vec<Rc<RefCell<Backend>>>,
57    ) -> Option<Rc<RefCell<Backend>>> {
58        let mut rng = rng();
59        let weights: Vec<i32> = backends
60            .iter()
61            .map(|b| {
62                b.borrow()
63                    .load_balancing_parameters
64                    .as_ref()
65                    .map(|p| p.weight)
66                    .unwrap_or(100)
67            })
68            .collect();
69
70        if let Ok(dist) = WeightedIndex::new(weights) {
71            let index = dist.sample(&mut rng);
72            backends.get(index).cloned()
73        } else {
74            (*backends)
75                .choose(&mut rng)
76                .map(|backend| (*backend).clone())
77        }
78    }
79}
80
81#[derive(Debug)]
82pub struct LeastLoaded {
83    pub metric: LoadMetric,
84}
85
86impl LoadBalancingAlgorithm for LeastLoaded {
87    fn next_available_backend(
88        &mut self,
89        backends: &mut Vec<Rc<RefCell<Backend>>>,
90    ) -> Option<Rc<RefCell<Backend>>> {
91        let opt_b = match self.metric {
92            LoadMetric::Connections => backends
93                .iter_mut()
94                .min_by_key(|backend| backend.borrow().active_connections),
95            LoadMetric::Requests => backends
96                .iter_mut()
97                .min_by_key(|backend| backend.borrow().active_requests),
98            LoadMetric::ConnectionTime => {
99                let mut b = None;
100                for backend in backends.iter_mut() {
101                    let cost2 = backend.borrow_mut().peak_ewma_connection();
102
103                    match b.take() {
104                        None => b = Some((cost2, backend)),
105                        Some((cost1, back1)) => {
106                            if cost1 <= cost2 {
107                                b = Some((cost1, back1));
108                            } else {
109                                b = Some((cost2, backend));
110                            }
111                        }
112                    }
113                }
114
115                b.map(|(_cost, backend)| backend)
116            }
117        };
118        opt_b.map(|backend| (*backend).clone())
119    }
120}
121
122#[derive(Debug)]
123pub struct PowerOfTwo {
124    pub metric: LoadMetric,
125}
126
127impl LoadBalancingAlgorithm for PowerOfTwo {
128    fn next_available_backend(
129        &mut self,
130        backends: &mut Vec<Rc<RefCell<Backend>>>,
131    ) -> Option<Rc<RefCell<Backend>>> {
132        let mut first = None;
133        let mut second = None;
134
135        for backend in backends.iter_mut() {
136            let measure = match self.metric {
137                LoadMetric::Connections => backend.borrow().active_connections as f64,
138                LoadMetric::Requests => backend.borrow().active_requests as f64,
139                LoadMetric::ConnectionTime => backend.borrow_mut().peak_ewma_connection(),
140            };
141
142            if first.is_none() {
143                first = Some((measure, backend));
144            } else if second.is_none() {
145                if first.as_ref().unwrap().0 <= measure {
146                    second = Some((measure, backend));
147                } else {
148                    second = first.take();
149                    first = Some((measure, backend));
150                }
151            } else if first.as_ref().unwrap().0 <= measure && measure < second.as_ref().unwrap().0 {
152                second = Some((measure, backend));
153                // other case: we don't change anything
154            } else {
155                second = first.take();
156                first = Some((measure, backend));
157            }
158        }
159
160        match (first, second) {
161            (None, None) => None,
162            (Some((_, b)), None) => Some(b.clone()),
163            // should not happen, but let's be exhaustive
164            (None, Some((_, b))) => Some(b.clone()),
165            (Some((_, b1)), Some((_, b2))) => {
166                if rng().random_bool(0.5) {
167                    Some(b1.clone())
168                } else {
169                    Some(b2.clone())
170                }
171            }
172        }
173    }
174}
175
#[cfg(test)]
mod test {
    use std::net::{IpAddr, Ipv4Addr, SocketAddr};

    use super::*;
    use crate::{
        PeakEWMA,
        backends::{BackendStatus, HealthState},
        retry::{ExponentialBackoffPolicy, RetryPolicyWrapper},
        sozu_command::proto::command::LoadMetric,
    };

    /// Builds a backend listening on 127.0.0.1:8080 in `Normal` status.
    ///
    /// `connections` sets `active_connections`; `None` means zero.
    fn create_backend(id: String, connections: Option<usize>) -> Backend {
        let localhost = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));

        Backend {
            sticky_id: None,
            backend_id: id,
            address: SocketAddr::new(localhost, 8080),
            status: BackendStatus::Normal,
            retry_policy: RetryPolicyWrapper::ExponentialBackoff(ExponentialBackoffPolicy::new(1)),
            active_connections: connections.unwrap_or(0),
            active_requests: 0,
            failures: 0,
            load_balancing_parameters: None,
            backup: false,
            connection_time: PeakEWMA::new(),
            health: HealthState::default(),
        }
    }

    /// Wraps a freshly created backend in the shared handle the algorithms
    /// operate on.
    fn shared(id: &str, connections: Option<usize>) -> Rc<RefCell<Backend>> {
        Rc::new(RefCell::new(create_backend(id.to_string(), connections)))
    }

    #[test]
    fn it_should_find_the_backend_with_least_connections() {
        let expected = shared("yolo", Some(1));

        let mut backends = vec![
            shared("nolo", Some(10)),
            shared("philo", Some(20)),
            expected.clone(),
        ];

        let mut algorithm = LeastLoaded {
            metric: LoadMetric::Connections,
        };

        let picked = algorithm.next_available_backend(&mut backends).unwrap();

        assert!(*picked.borrow() == *expected.borrow());
    }

    #[test]
    fn it_shouldnt_find_backend_with_least_connections_when_list_is_empty() {
        let mut backends = Vec::new();

        let mut algorithm = LeastLoaded {
            metric: LoadMetric::Connections,
        };

        let picked = algorithm.next_available_backend(&mut backends);
        assert!(picked.is_none());
    }

    #[test]
    fn it_should_find_backend_with_roundrobin_when_some_backends_were_removed() {
        let mut backends = vec![
            shared("toto", None),
            shared("voto", None),
            shared("yoto", None),
        ];

        let mut roundrobin = RoundRobin { next_backend: 1 };

        let before_removal = roundrobin.next_available_backend(&mut backends);
        assert_eq!(before_removal.as_ref(), backends.get(1));

        // Shrink the list so the stored cursor no longer fits and must wrap.
        backends.remove(1);

        let after_removal = roundrobin.next_available_backend(&mut backends);
        assert_eq!(after_removal.as_ref(), backends.first());
    }
}