reqwest_lb/lb/
policy.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
use crate::lb::weight::WeightProvider;
use crate::lb::Statistic;
use crate::with::With;
use http::Extensions;
use rand::Rng;
use std::sync::atomic::Ordering;
use std::sync::Arc;

#[derive(Default)]
pub enum LoadBalancerPolicy<I> {
    #[default]
    RoundRobin,
    Random,
    First,
    Last,
    Weight(Arc<dyn WeightProvider<I> + Send + Sync>),
    Dynamic(Arc<dyn LoadBalancerPolicyTrait<I> + Send + Sync>),
}

impl<I> Clone for LoadBalancerPolicy<I> {
    fn clone(&self) -> Self {
        match self {
            LoadBalancerPolicy::RoundRobin => LoadBalancerPolicy::RoundRobin,
            LoadBalancerPolicy::Random => LoadBalancerPolicy::Random,
            LoadBalancerPolicy::First => LoadBalancerPolicy::First,
            LoadBalancerPolicy::Last => LoadBalancerPolicy::Last,
            LoadBalancerPolicy::Weight(f) => LoadBalancerPolicy::Weight(f.clone()),
            LoadBalancerPolicy::Dynamic(f) => LoadBalancerPolicy::Dynamic(f.clone()),
        }
    }
}

impl<I> LoadBalancerPolicy<I> {
    pub fn weight<F: Fn(&I) -> usize + Send + Sync + 'static>(f: F) -> Self {
        Self::Weight(Arc::new(f))
    }

    pub fn dynamic<F: Fn(&[I], &Extensions) -> usize + Send + Sync + 'static>(f: F) -> Self {
        Self::Dynamic(Arc::new(f))
    }
}

pub trait LoadBalancerPolicyTrait<I>: sealed::Sealed<I> {
    fn choose(&self, items: &[I], extensions: &mut Extensions) -> usize;
}

impl<I> sealed::Sealed<I> for LoadBalancerPolicy<I> {}

impl<I> LoadBalancerPolicyTrait<I> for LoadBalancerPolicy<I> {
    fn choose(&self, items: &[I], extensions: &mut Extensions) -> usize {
        let len = items.len();
        assert!(len > 1);
        match self {
            LoadBalancerPolicy::RoundRobin => match extensions.get::<Statistic>() {
                Some(statistic) => {
                    let count = statistic.count.load(Ordering::Relaxed).saturating_sub(1);
                    (count % (len as u64)) as usize
                }
                None => 0,
            },
            LoadBalancerPolicy::Random => rand::thread_rng().gen_range(0..len),
            LoadBalancerPolicy::First => 0,
            LoadBalancerPolicy::Last => items.len() - 1,
            LoadBalancerPolicy::Weight(f) => {
                let indexes = items
                    .iter()
                    .enumerate()
                    .map(|(index, item)| (index, f.weight(item)))
                    .flat_map(|(index, len)| {
                        Vec::with_capacity(len).with(|c| {
                            for _ in 0..len {
                                c.push(index);
                            }
                        })
                    })
                    .collect::<Vec<_>>();
                let index = rand::thread_rng().gen_range(0..indexes.len());
                indexes[index]
            }
            LoadBalancerPolicy::Dynamic(f) => f.choose(items, extensions),
        }
    }
}

impl<I, F> sealed::Sealed<I> for F where F: Fn(&[I], &Extensions) -> usize {}

impl<I, F> LoadBalancerPolicyTrait<I> for F
where
    F: Fn(&[I], &Extensions) -> usize,
{
    fn choose(&self, items: &[I], extensions: &mut Extensions) -> usize {
        self(items, extensions)
    }
}

mod sealed {
    pub trait Sealed<I> {}
}