hyper_client_pool/pool/
builder.rs

1use crate::deliverable::Deliverable;
2use hyper::client::{
3    connect::{
4        dns::{GaiResolver, Name},
5        Connect,
6    },
7    HttpConnector,
8};
9use hyper_tls::HttpsConnector;
10use std::sync::{Arc, RwLock};
11use std::{marker::PhantomData, net::SocketAddr};
12use tower_service::Service;
13
14use super::Pool;
15use crate::config::Config;
16use crate::error::SpawnError;
17use crate::executor::TransactionCounter;
18
19pub type PoolConnector<R> = HttpsConnector<HttpConnector<R>>;
20
21/// A trait used to wrap the PoolConnector used by default into a
22/// different type that implements Connect
23pub trait ConnectorAdaptor<R> {
24    type Connect: Connect;
25
26    fn wrap(connector: PoolConnector<R>) -> Self::Connect;
27}
28
29/// A trait used to create a DNS resolver. DefaultResolver uses the default DNS
30/// resolver built into hyper.
31pub trait CreateResolver {
32    type Resolver: 'static
33        + Clone
34        + Send
35        + Sync
36        + Service<Name, Error = Self::Error, Future = Self::Future, Response = Self::Response>;
37
38    type Error: 'static + Send + Sync + std::error::Error;
39    type Future: Send + std::future::Future<Output = Result<Self::Response, Self::Error>>;
40    type Response: Iterator<Item = SocketAddr>;
41
42    fn create_resolver() -> Self::Resolver;
43}
44
45struct DefaultResolver;
46
47impl CreateResolver for DefaultResolver {
48    type Resolver = GaiResolver;
49
50    type Error = <Self::Resolver as Service<Name>>::Error;
51    type Future = <Self::Resolver as Service<Name>>::Future;
52    type Response = <Self::Resolver as Service<Name>>::Response;
53
54    fn create_resolver() -> Self::Resolver {
55        GaiResolver::new()
56    }
57}
58
59/// Default type that implemented ConnectorAdaptor, just passes through the connector
60pub struct DefaultConnectorAdapator;
61
62pub struct PoolBuilder<D: Deliverable> {
63    pub(in crate::pool) config: Config,
64    pub(in crate::pool) transaction_counters: Option<Arc<RwLock<Vec<TransactionCounter>>>>,
65
66    _d: PhantomData<D>,
67}
68
69impl<D: Deliverable> PoolBuilder<D> {
70    pub(in crate::pool) fn new(config: Config) -> PoolBuilder<D> {
71        PoolBuilder {
72            config,
73            transaction_counters: None,
74
75            _d: PhantomData,
76        }
77    }
78
79    pub fn build(self) -> Result<Pool<D>, SpawnError> {
80        self.build_with_adaptor::<DefaultConnectorAdapator>()
81    }
82
83    /// Create the pool with a ConnectorAdaptor, a type that is used to
84    /// wrap the hyper::Client's connector
85    pub fn build_with_adaptor<A>(self) -> Result<Pool<D>, SpawnError>
86    where
87        A: ConnectorAdaptor<GaiResolver>,
88        A::Connect: 'static + Clone + Send + Sync,
89    {
90        self.build_with_adaptor_and_resolver::<A, DefaultResolver>()
91    }
92
93    /// Create the pool with a ConnectorAdaptor, a type that is used to
94    /// wrap the hyper::Client's connector
95    pub fn build_with_adaptor_and_resolver<A, CR>(self) -> Result<Pool<D>, SpawnError>
96    where
97        A: ConnectorAdaptor<CR::Resolver>,
98        A::Connect: 'static + Clone + Send + Sync,
99        CR: CreateResolver,
100        CR::Resolver: 'static + Clone + Send + Sync + Service<Name>,
101        CR::Error: 'static + Send + Sync + std::error::Error,
102        CR::Future: Send + std::future::Future<Output = Result<CR::Response, CR::Error>>,
103        CR::Response: Iterator<Item = SocketAddr>,
104    {
105        Pool::new::<A, CR>(self)
106    }
107
108    /// Pass in an synchronized Vec<Weak<WeakCounter>> that will be populated
109    /// with transaction counters for each of the workers spawned.
110    ///
111    /// You can check that the WeakCounter is still valid by ensuring that the
112    /// Arc::strong_count on the Weak reference
113    pub fn transaction_counters(mut self, value: Arc<RwLock<Vec<TransactionCounter>>>) -> Self {
114        self.transaction_counters = Some(value);
115        self
116    }
117}
118
119impl<R> ConnectorAdaptor<R> for DefaultConnectorAdapator
120where
121    R: 'static + Clone + Send + Sync + Service<Name>,
122    R::Error: 'static + Send + Sync + std::error::Error,
123    R::Future: Send + std::future::Future<Output = Result<R::Response, R::Error>>,
124    R::Response: Iterator<Item = SocketAddr>,
125{
126    type Connect = PoolConnector<R>;
127
128    fn wrap(connector: PoolConnector<R>) -> Self::Connect {
129        connector
130    }
131}