ntex_h2/client/
connector.rs

1use std::marker::PhantomData;
2
3use ntex_bytes::ByteString;
4use ntex_http::uri::Scheme;
5use ntex_io::IoBoxed;
6use ntex_net::connect::{Address, Connect, ConnectError, Connector as DefaultConnector};
7use ntex_service::cfg::{Cfg, SharedCfg};
8use ntex_service::{IntoServiceFactory, Service, ServiceCtx, ServiceFactory};
9use ntex_util::{channel::pool, time::timeout_checked};
10
11use crate::client::{SimpleClient, stream::InflightStorage};
12use crate::{client::ClientError, config::ServiceConfig};
13
14#[derive(Debug)]
15/// Http2 client connector
16pub struct Connector<A: Address, T> {
17    connector: T,
18    scheme: Scheme,
19    pool: pool::Pool<()>,
20
21    _t: PhantomData<A>,
22}
23
24impl<A, T> Connector<A, T>
25where
26    A: Address,
27    T: ServiceFactory<Connect<A>, SharedCfg, Error = ConnectError>,
28    IoBoxed: From<T::Response>,
29{
30    /// Create new http2 connector
31    pub fn new<F>(connector: F) -> Connector<A, T>
32    where
33        F: IntoServiceFactory<T, Connect<A>, SharedCfg>,
34    {
35        Connector {
36            connector: connector.into_factory(),
37            scheme: Scheme::HTTP,
38            pool: pool::new(),
39            _t: PhantomData,
40        }
41    }
42}
43
44impl<A> Default for Connector<A, DefaultConnector<A>>
45where
46    A: Address,
47{
48    /// Create new h2 connector
49    fn default() -> Self {
50        Self::new(DefaultConnector::default())
51    }
52}
53
54impl<A, T> Connector<A, T>
55where
56    A: Address,
57{
58    #[inline]
59    /// Set scheme
60    pub fn scheme(&mut self, scheme: Scheme) -> &mut Self {
61        self.scheme = scheme;
62        self
63    }
64
65    /// Use custom connector
66    pub fn connector<U, F>(&self, connector: F) -> Connector<A, U>
67    where
68        F: IntoServiceFactory<U, Connect<A>, SharedCfg>,
69        U: ServiceFactory<Connect<A>, SharedCfg, Error = ConnectError>,
70        IoBoxed: From<U::Response>,
71    {
72        Connector {
73            connector: connector.into_factory(),
74            scheme: self.scheme.clone(),
75            pool: self.pool.clone(),
76            _t: PhantomData,
77        }
78    }
79}
80
81impl<A, T> ServiceFactory<A, SharedCfg> for Connector<A, T>
82where
83    A: Address,
84    T: ServiceFactory<Connect<A>, SharedCfg, Error = ConnectError>,
85    IoBoxed: From<T::Response>,
86{
87    type Response = SimpleClient;
88    type Error = ClientError;
89    type InitError = T::InitError;
90    type Service = ConnectorService<A, T::Service>;
91
92    async fn create(&self, cfg: SharedCfg) -> Result<Self::Service, Self::InitError> {
93        let svc = self.connector.create(cfg).await?;
94        Ok(ConnectorService {
95            svc,
96            scheme: self.scheme.clone(),
97            config: cfg.get(),
98            pool: self.pool.clone(),
99            _t: PhantomData,
100        })
101    }
102}
103
104#[derive(Debug)]
105pub struct ConnectorService<A, T> {
106    svc: T,
107    scheme: Scheme,
108    config: Cfg<ServiceConfig>,
109    pool: pool::Pool<()>,
110    _t: PhantomData<A>,
111}
112
113impl<A, T> Service<A> for ConnectorService<A, T>
114where
115    A: Address,
116    T: Service<Connect<A>, Error = ConnectError>,
117    IoBoxed: From<T::Response>,
118{
119    type Response = SimpleClient;
120    type Error = ClientError;
121
122    /// Connect to http2 server
123    async fn call(&self, req: A, ctx: ServiceCtx<'_, Self>) -> Result<SimpleClient, ClientError> {
124        let authority = ByteString::from(req.host());
125
126        let fut = async {
127            Ok::<_, ClientError>(SimpleClient::with_params(
128                ctx.call(&self.svc, Connect::new(req)).await?.into(),
129                self.config,
130                self.scheme.clone(),
131                authority,
132                false,
133                InflightStorage::default(),
134                self.pool.clone(),
135            ))
136        };
137
138        timeout_checked(self.config.handshake_timeout, fut)
139            .await
140            .map_err(|_| ClientError::HandshakeTimeout)
141            .and_then(|item| item)
142    }
143
144    ntex_service::forward_ready!(svc);
145    ntex_service::forward_poll!(svc);
146    ntex_service::forward_shutdown!(svc);
147}