reqwest_lb/supplier/
lb.rs

1use crate::lb::{LoadBalancerPolicy, LoadBalancerPolicyTrait, Statistic};
2use crate::supplier::Supplier;
3use crate::LoadBalancerTrait;
4use http::Extensions;
5use pin_project_lite::pin_project;
6use std::future::Future;
7use std::pin::Pin;
8use std::sync::atomic::Ordering;
9use std::task::{ready, Context, Poll};
10
11pub struct LoadBalancer<S: Supplier> {
12    supplier: S,
13    policy: LoadBalancerPolicy<S::Element>,
14    statistic: Statistic,
15}
16
17impl<S: Supplier> LoadBalancer<S> {
18    pub fn new(supplier: S, policy: LoadBalancerPolicy<S::Element>) -> Self {
19        Self {
20            supplier,
21            policy,
22            statistic: Statistic::default(),
23        }
24    }
25}
26
27impl<S> LoadBalancerTrait for LoadBalancer<S>
28where
29    S: Supplier,
30{
31    type Element = S::Element;
32    type Error = S::Error;
33    type Future = ChooseFuture<S::Element, S::Future>;
34
35    fn choose(&self, extensions: &mut Extensions) -> Self::Future {
36        // touch statistic
37        self.statistic.count.fetch_add(1, Ordering::SeqCst);
38        extensions.insert(self.statistic.clone());
39        let extensions = extensions.clone();
40        let future = self.supplier.get();
41        let policy = self.policy.clone();
42        ChooseFuture {
43            extensions,
44            policy,
45            future,
46        }
47    }
48}
49
50pin_project! {
51    pub struct ChooseFuture<I, F> {
52        extensions: Extensions,
53        policy: LoadBalancerPolicy<I>,
54        #[pin]
55        future: F,
56    }
57}
58
59impl<I, E, F> Future for ChooseFuture<I, F>
60where
61    F: Future<Output = Result<Vec<I>, E>>,
62{
63    type Output = Result<Option<I>, E>;
64
65    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
66        let project = self.project();
67        match ready!(project.future.poll(cx)) {
68            Ok(mut elements) => {
69                let size = elements.len();
70                Poll::Ready(match size {
71                    0 => Ok(None),
72                    1 => Ok(Some(elements.remove(0))),
73                    _ => {
74                        // use policy choose and return the index
75                        let index = project.policy.choose(&elements, project.extensions);
76                        Ok(Some(elements.remove(index)))
77                    }
78                })
79            }
80            Err(e) => Poll::Ready(Err(e)),
81        }
82    }
83}