Skip to main content

reqwest_lb/load_balancer/
mod.rs

1mod policy;
2mod registry;
3mod weight;
4
5pub use policy::{LoadBalancerPolicy, LoadBalancerPolicyTrait};
6pub use registry::LoadBalancerRegistry;
7pub use weight::WeightProvider;
8
9use crate::supplier::Supplier;
10use async_trait::async_trait;
11use http::Extensions;
12use std::sync::atomic::AtomicU64;
13use std::sync::Arc;
14use std::{fmt::Debug, sync::atomic::Ordering};
15
16pub type BoxLoadBalancer<I, E> = Box<dyn LoadBalancerTrait<Element = I, Error = E> + Send + Sync>;
17
18#[async_trait]
19pub trait LoadBalancerTrait {
20    ///
21    /// load balancer element type
22    ///
23    type Element;
24
25    ///
26    /// load balancer choose element maybe error type
27    ///
28    type Error;
29
30    ///
31    /// load balancer choose a effect element
32    ///
33    async fn choose(
34        &self,
35        extensions: &mut Extensions,
36    ) -> Result<Option<Self::Element>, Self::Error>;
37
38    ///
39    /// Wrap to boxed load balancer
40    ///
41    fn boxed(self) -> BoxLoadBalancer<Self::Element, Self::Error>
42    where
43        Self: Sized + Send + Sync + 'static,
44    {
45        Box::new(MapFuture::new(self))
46    }
47}
48
49struct MapFuture<L> {
50    inner: L,
51}
52
53impl<L> MapFuture<L> {
54    pub fn new(inner: L) -> Self {
55        Self { inner }
56    }
57}
58
59#[async_trait]
60impl<L> LoadBalancerTrait for MapFuture<L>
61where
62    L: LoadBalancerTrait + Sync,
63{
64    type Element = L::Element;
65    type Error = L::Error;
66
67    async fn choose(
68        &self,
69        extensions: &mut Extensions,
70    ) -> Result<Option<Self::Element>, Self::Error> {
71        self.inner.choose(extensions).await
72    }
73}
74
75#[derive(Debug, Clone, Default)]
76pub struct Statistic {
77    pub count: Arc<AtomicU64>,
78}
79
80pub struct LoadBalancer<S: Supplier> {
81    supplier: S,
82    policy: LoadBalancerPolicy<S::Element>,
83    statistic: Statistic,
84}
85
86impl<S: Supplier> LoadBalancer<S> {
87    pub fn new(supplier: S, policy: LoadBalancerPolicy<S::Element>) -> Self {
88        Self {
89            supplier,
90            policy,
91            statistic: Statistic::default(),
92        }
93    }
94}
95
96#[async_trait]
97impl<S> LoadBalancerTrait for LoadBalancer<S>
98where
99    S: Supplier + Sync,
100    S::Future: Send,
101{
102    type Element = S::Element;
103    type Error = S::Error;
104
105    async fn choose(
106        &self,
107        extensions: &mut Extensions,
108    ) -> Result<Option<Self::Element>, Self::Error> {
109        // touch statistic
110        self.statistic.count.fetch_add(1, Ordering::SeqCst);
111        extensions.insert(self.statistic.clone());
112        let mut elements = self.supplier.get().await?;
113        let size = elements.len();
114        match size {
115            0 => Ok(None),
116            1 => Ok(Some(elements.remove(0))),
117            _ => {
118                // use policy choose and return the index
119                let index = self.policy.choose(&elements, extensions);
120                Ok(Some(elements.remove(index)))
121            }
122        }
123    }
124}