reqwest_lb/load_balancer/
mod.rs1mod policy;
2mod registry;
3mod weight;
4
5pub use policy::{LoadBalancerPolicy, LoadBalancerPolicyTrait};
6pub use registry::LoadBalancerRegistry;
7pub use weight::WeightProvider;
8
9use crate::supplier::Supplier;
10use futures::future::BoxFuture;
11use futures::ready;
12use http::Extensions;
13use pin_project_lite::pin_project;
14use std::future::Future;
15use std::pin::Pin;
16use std::sync::atomic::AtomicU64;
17use std::sync::Arc;
18use std::task::{Context, Poll};
19use std::{fmt::Debug, sync::atomic::Ordering};
20
21pub type BoxLoadBalancer<I, E> = Box<
22 dyn LoadBalancerTrait<Element = I, Error = E, Future = BoxFuture<'static, Result<Option<I>, E>>>
23 + Send
24 + Sync,
25>;
26
27pub trait LoadBalancerTrait {
28 type Element;
32
33 type Error;
37
38 type Future: Future<Output = Result<Option<Self::Element>, Self::Error>>;
42
43 fn choose(&self, extensions: &mut Extensions) -> Self::Future;
47
48 fn boxed(self) -> BoxLoadBalancer<Self::Element, Self::Error>
52 where
53 Self: Sized + Send + Sync + 'static,
54 Self::Future: Send + 'static,
55 {
56 Box::new(MapFuture::new(self))
57 }
58}
59
/// Private adapter around a load balancer `L`.
///
/// It exists so that the wrapped balancer's future can be exposed as a boxed
/// future (see the `LoadBalancerTrait` implementation for this type).
struct MapFuture<L> {
    // The balancer every call is forwarded to.
    inner: L,
}

impl<L> MapFuture<L> {
    /// Wraps `inner` without modifying it.
    pub fn new(inner: L) -> Self {
        MapFuture { inner }
    }
}
69
70impl<L> LoadBalancerTrait for MapFuture<L>
71where
72 L: LoadBalancerTrait,
73 L::Future: Send + 'static,
74{
75 type Element = L::Element;
76 type Error = L::Error;
77 type Future = BoxFuture<'static, Result<Option<Self::Element>, Self::Error>>;
78
79 fn choose(&self, extensions: &mut Extensions) -> Self::Future {
80 Box::pin(self.inner.choose(extensions))
81 }
82}
83
/// A shareable counter handle.
///
/// Cloning a `Statistic` clones the [`Arc`], so every clone observes and
/// updates the same underlying counter. A default instance starts at zero.
#[derive(Clone, Debug, Default)]
pub struct Statistic {
    /// The shared counter value.
    pub count: Arc<AtomicU64>,
}
88
89pub struct LoadBalancer<S: Supplier> {
90 supplier: S,
91 policy: LoadBalancerPolicy<S::Element>,
92 statistic: Statistic,
93}
94
95impl<S: Supplier> LoadBalancer<S> {
96 pub fn new(supplier: S, policy: LoadBalancerPolicy<S::Element>) -> Self {
97 Self {
98 supplier,
99 policy,
100 statistic: Statistic::default(),
101 }
102 }
103}
104
105impl<S> LoadBalancerTrait for LoadBalancer<S>
106where
107 S: Supplier,
108{
109 type Element = S::Element;
110 type Error = S::Error;
111 type Future = ChooseFuture<S::Element, S::Future>;
112
113 fn choose(&self, extensions: &mut Extensions) -> Self::Future {
114 self.statistic.count.fetch_add(1, Ordering::SeqCst);
116 extensions.insert(self.statistic.clone());
117 let extensions = extensions.clone();
118 let future = self.supplier.get();
119 let policy = self.policy.clone();
120 ChooseFuture {
121 extensions,
122 policy,
123 future,
124 }
125 }
126}
127
128pin_project! {
129 pub struct ChooseFuture<I, F> {
130 extensions: Extensions,
131 policy: LoadBalancerPolicy<I>,
132 #[pin]
133 future: F,
134 }
135}
136
137impl<I, E, F> Future for ChooseFuture<I, F>
138where
139 F: Future<Output = Result<Vec<I>, E>>,
140{
141 type Output = Result<Option<I>, E>;
142
143 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
144 let project = self.project();
145 match ready!(project.future.poll(cx)) {
146 Ok(mut elements) => {
147 let size = elements.len();
148 Poll::Ready(match size {
149 0 => Ok(None),
150 1 => Ok(Some(elements.remove(0))),
151 _ => {
152 let index = project.policy.choose(&elements, project.extensions);
154 Ok(Some(elements.remove(index)))
155 }
156 })
157 }
158 Err(e) => Poll::Ready(Err(e)),
159 }
160 }
161}