Skip to main content

reqwest_lb/load_balancer/
mod.rs

1mod policy;
2mod registry;
3mod weight;
4
5pub use policy::{LoadBalancerPolicy, LoadBalancerPolicyTrait};
6pub use registry::LoadBalancerRegistry;
7pub use weight::WeightProvider;
8
9use crate::supplier::Supplier;
10use futures::future::BoxFuture;
11use futures::ready;
12use http::Extensions;
13use pin_project_lite::pin_project;
14use std::future::Future;
15use std::pin::Pin;
16use std::sync::atomic::AtomicU64;
17use std::sync::Arc;
18use std::task::{Context, Poll};
19use std::{fmt::Debug, sync::atomic::Ordering};
20
21pub type BoxLoadBalancer<I, E> = Box<
22    dyn LoadBalancerTrait<Element = I, Error = E, Future = BoxFuture<'static, Result<Option<I>, E>>>
23        + Send
24        + Sync,
25>;
26
27pub trait LoadBalancerTrait {
28    ///
29    /// load balancer element type
30    ///
31    type Element;
32
33    ///
34    /// load balancer choose element maybe error type
35    ///
36    type Error;
37
38    ///
39    /// load balancer choose element future type
40    ///
41    type Future: Future<Output = Result<Option<Self::Element>, Self::Error>>;
42
43    ///
44    /// load balancer choose a effect element
45    ///
46    fn choose(&self, extensions: &mut Extensions) -> Self::Future;
47
48    ///
49    /// Wrap to boxed load balancer
50    ///
51    fn boxed(self) -> BoxLoadBalancer<Self::Element, Self::Error>
52    where
53        Self: Sized + Send + Sync + 'static,
54        Self::Future: Send + 'static,
55    {
56        Box::new(MapFuture::new(self))
57    }
58}
59
/// Adapter around a load balancer `L`.
///
/// Its [`LoadBalancerTrait`] implementation boxes the future produced by the
/// wrapped balancer so that the future type no longer depends on `L`.
struct MapFuture<L> {
    // The wrapped load balancer that does the actual choosing.
    inner: L,
}

impl<L> MapFuture<L> {
    /// Wraps `inner` in a new adapter.
    pub fn new(inner: L) -> Self {
        MapFuture { inner }
    }
}
69
70impl<L> LoadBalancerTrait for MapFuture<L>
71where
72    L: LoadBalancerTrait,
73    L::Future: Send + 'static,
74{
75    type Element = L::Element;
76    type Error = L::Error;
77    type Future = BoxFuture<'static, Result<Option<Self::Element>, Self::Error>>;
78
79    fn choose(&self, extensions: &mut Extensions) -> Self::Future {
80        Box::pin(self.inner.choose(extensions))
81    }
82}
83
/// Call statistics published by a load balancer.
///
/// The counter lives behind an [`Arc`], so every clone of a `Statistic`
/// observes and updates the same value.
#[derive(Clone, Debug, Default)]
pub struct Statistic {
    /// Shared atomic counter; [`LoadBalancer`] increments it once per
    /// `choose` call.
    pub count: Arc<AtomicU64>,
}
88
89pub struct LoadBalancer<S: Supplier> {
90    supplier: S,
91    policy: LoadBalancerPolicy<S::Element>,
92    statistic: Statistic,
93}
94
95impl<S: Supplier> LoadBalancer<S> {
96    pub fn new(supplier: S, policy: LoadBalancerPolicy<S::Element>) -> Self {
97        Self {
98            supplier,
99            policy,
100            statistic: Statistic::default(),
101        }
102    }
103}
104
105impl<S> LoadBalancerTrait for LoadBalancer<S>
106where
107    S: Supplier,
108{
109    type Element = S::Element;
110    type Error = S::Error;
111    type Future = ChooseFuture<S::Element, S::Future>;
112
113    fn choose(&self, extensions: &mut Extensions) -> Self::Future {
114        // touch statistic
115        self.statistic.count.fetch_add(1, Ordering::SeqCst);
116        extensions.insert(self.statistic.clone());
117        let extensions = extensions.clone();
118        let future = self.supplier.get();
119        let policy = self.policy.clone();
120        ChooseFuture {
121            extensions,
122            policy,
123            future,
124        }
125    }
126}
127
128pin_project! {
129    pub struct ChooseFuture<I, F> {
130        extensions: Extensions,
131        policy: LoadBalancerPolicy<I>,
132        #[pin]
133        future: F,
134    }
135}
136
137impl<I, E, F> Future for ChooseFuture<I, F>
138where
139    F: Future<Output = Result<Vec<I>, E>>,
140{
141    type Output = Result<Option<I>, E>;
142
143    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
144        let project = self.project();
145        match ready!(project.future.poll(cx)) {
146            Ok(mut elements) => {
147                let size = elements.len();
148                Poll::Ready(match size {
149                    0 => Ok(None),
150                    1 => Ok(Some(elements.remove(0))),
151                    _ => {
152                        // use policy choose and return the index
153                        let index = project.policy.choose(&elements, project.extensions);
154                        Ok(Some(elements.remove(index)))
155                    }
156                })
157            }
158            Err(e) => Poll::Ready(Err(e)),
159        }
160    }
161}