reqwest_lb/supplier/
lb.rs1use crate::lb::{LoadBalancerPolicy, LoadBalancerPolicyTrait, Statistic};
2use crate::supplier::Supplier;
3use crate::LoadBalancerTrait;
4use http::Extensions;
5use pin_project_lite::pin_project;
6use std::future::Future;
7use std::pin::Pin;
8use std::sync::atomic::Ordering;
9use std::task::{ready, Context, Poll};
10
11pub struct LoadBalancer<S: Supplier> {
12 supplier: S,
13 policy: LoadBalancerPolicy<S::Element>,
14 statistic: Statistic,
15}
16
17impl<S: Supplier> LoadBalancer<S> {
18 pub fn new(supplier: S, policy: LoadBalancerPolicy<S::Element>) -> Self {
19 Self {
20 supplier,
21 policy,
22 statistic: Statistic::default(),
23 }
24 }
25}
26
27impl<S> LoadBalancerTrait for LoadBalancer<S>
28where
29 S: Supplier,
30{
31 type Element = S::Element;
32 type Error = S::Error;
33 type Future = ChooseFuture<S::Element, S::Future>;
34
35 fn choose(&self, extensions: &mut Extensions) -> Self::Future {
36 self.statistic.count.fetch_add(1, Ordering::SeqCst);
38 extensions.insert(self.statistic.clone());
39 let extensions = extensions.clone();
40 let future = self.supplier.get();
41 let policy = self.policy.clone();
42 ChooseFuture {
43 extensions,
44 policy,
45 future,
46 }
47 }
48}
49
50pin_project! {
51 pub struct ChooseFuture<I, F> {
52 extensions: Extensions,
53 policy: LoadBalancerPolicy<I>,
54 #[pin]
55 future: F,
56 }
57}
58
59impl<I, E, F> Future for ChooseFuture<I, F>
60where
61 F: Future<Output = Result<Vec<I>, E>>,
62{
63 type Output = Result<Option<I>, E>;
64
65 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
66 let project = self.project();
67 match ready!(project.future.poll(cx)) {
68 Ok(mut elements) => {
69 let size = elements.len();
70 Poll::Ready(match size {
71 0 => Ok(None),
72 1 => Ok(Some(elements.remove(0))),
73 _ => {
74 let index = project.policy.choose(&elements, project.extensions);
76 Ok(Some(elements.remove(index)))
77 }
78 })
79 }
80 Err(e) => Poll::Ready(Err(e)),
81 }
82 }
83}