reqwest_lb/load_balancer/
mod.rs1mod policy;
2mod registry;
3mod weight;
4
5pub use policy::{LoadBalancerPolicy, LoadBalancerPolicyTrait};
6pub use registry::LoadBalancerRegistry;
7pub use weight::WeightProvider;
8
9use crate::supplier::Supplier;
10use async_trait::async_trait;
11use http::Extensions;
12use std::sync::atomic::AtomicU64;
13use std::sync::Arc;
14use std::{fmt::Debug, sync::atomic::Ordering};
15
16pub type BoxLoadBalancer<I, E> = Box<dyn LoadBalancerTrait<Element = I, Error = E> + Send + Sync>;
17
18#[async_trait]
19pub trait LoadBalancerTrait {
20 type Element;
24
25 type Error;
29
30 async fn choose(
34 &self,
35 extensions: &mut Extensions,
36 ) -> Result<Option<Self::Element>, Self::Error>;
37
38 fn boxed(self) -> BoxLoadBalancer<Self::Element, Self::Error>
42 where
43 Self: Sized + Send + Sync + 'static,
44 {
45 Box::new(MapFuture::new(self))
46 }
47}
48
/// Private adapter that owns a load balancer of type `L`.
///
/// It is what `LoadBalancerTrait::boxed` places inside the returned box.
struct MapFuture<L> {
    /// The wrapped load balancer.
    inner: L,
}

impl<L> MapFuture<L> {
    /// Wraps `inner` without modifying it.
    pub fn new(inner: L) -> Self {
        MapFuture { inner }
    }
}
58
59#[async_trait]
60impl<L> LoadBalancerTrait for MapFuture<L>
61where
62 L: LoadBalancerTrait + Sync,
63{
64 type Element = L::Element;
65 type Error = L::Error;
66
67 async fn choose(
68 &self,
69 extensions: &mut Extensions,
70 ) -> Result<Option<Self::Element>, Self::Error> {
71 self.inner.choose(extensions).await
72 }
73}
74
/// Usage counter attached to a load balancer.
///
/// The counter lives behind an [`Arc`], so cloning a `Statistic` yields a
/// handle to the same underlying value rather than a snapshot of it.
#[derive(Clone, Debug, Default)]
pub struct Statistic {
    /// Shared atomic counter; starts at zero for a default-constructed value.
    pub count: Arc<AtomicU64>,
}
79
80pub struct LoadBalancer<S: Supplier> {
81 supplier: S,
82 policy: LoadBalancerPolicy<S::Element>,
83 statistic: Statistic,
84}
85
86impl<S: Supplier> LoadBalancer<S> {
87 pub fn new(supplier: S, policy: LoadBalancerPolicy<S::Element>) -> Self {
88 Self {
89 supplier,
90 policy,
91 statistic: Statistic::default(),
92 }
93 }
94}
95
96#[async_trait]
97impl<S> LoadBalancerTrait for LoadBalancer<S>
98where
99 S: Supplier + Sync,
100 S::Future: Send,
101{
102 type Element = S::Element;
103 type Error = S::Error;
104
105 async fn choose(
106 &self,
107 extensions: &mut Extensions,
108 ) -> Result<Option<Self::Element>, Self::Error> {
109 self.statistic.count.fetch_add(1, Ordering::SeqCst);
111 extensions.insert(self.statistic.clone());
112 let mut elements = self.supplier.get().await?;
113 let size = elements.len();
114 match size {
115 0 => Ok(None),
116 1 => Ok(Some(elements.remove(0))),
117 _ => {
118 let index = self.policy.choose(&elements, extensions);
120 Ok(Some(elements.remove(index)))
121 }
122 }
123 }
124}