Skip to main content

ntex_h2/client/
connector.rs

1use std::marker::PhantomData;
2
3use ntex_bytes::ByteString;
4use ntex_error::Error;
5use ntex_http::uri::Scheme;
6use ntex_io::IoBoxed;
7use ntex_net::connect::{Address, Connect, ConnectError, Connector2 as DefaultConnector};
8use ntex_service::cfg::{Cfg, SharedCfg};
9use ntex_service::{IntoServiceFactory, Service, ServiceCtx, ServiceFactory};
10use ntex_util::{channel::pool, time::timeout_checked};
11
12use crate::client::{SimpleClient, stream::InflightStorage};
13use crate::{client::ClientError, config::ServiceConfig};
14
15#[derive(Debug)]
16/// Http2 client connector
17pub struct Connector<A: Address, T> {
18    svc: T,
19    scheme: Scheme,
20    pool: pool::Pool<()>,
21
22    _t: PhantomData<A>,
23}
24
25impl<A, T> Connector<A, T>
26where
27    A: Address,
28    T: ServiceFactory<Connect<A>, SharedCfg, Error = Error<ConnectError>>,
29    IoBoxed: From<T::Response>,
30{
31    /// Create new http2 connector
32    pub fn new<F>(svc: F) -> Connector<A, T>
33    where
34        F: IntoServiceFactory<T, Connect<A>, SharedCfg>,
35    {
36        Connector {
37            svc: svc.into_factory(),
38            scheme: Scheme::HTTP,
39            pool: pool::new(),
40            _t: PhantomData,
41        }
42    }
43}
44
45impl<A> Default for Connector<A, DefaultConnector<A>>
46where
47    A: Address,
48{
49    /// Create new h2 connector
50    fn default() -> Self {
51        Self::new(DefaultConnector::default())
52    }
53}
54
55impl<A, T> Connector<A, T>
56where
57    A: Address,
58{
59    #[inline]
60    /// Set scheme
61    pub fn scheme(&mut self, scheme: Scheme) -> &mut Self {
62        self.scheme = scheme;
63        self
64    }
65
66    /// Use custom connector
67    pub fn connector<U, F>(&self, svc: F) -> Connector<A, U>
68    where
69        F: IntoServiceFactory<U, Connect<A>, SharedCfg>,
70        U: ServiceFactory<Connect<A>, SharedCfg, Error = Error<ConnectError>>,
71        IoBoxed: From<U::Response>,
72    {
73        Connector {
74            svc: svc.into_factory(),
75            scheme: self.scheme.clone(),
76            pool: self.pool.clone(),
77            _t: PhantomData,
78        }
79    }
80}
81
82impl<A, T> ServiceFactory<A, SharedCfg> for Connector<A, T>
83where
84    A: Address,
85    T: ServiceFactory<Connect<A>, SharedCfg, Error = Error<ConnectError>>,
86    IoBoxed: From<T::Response>,
87{
88    type Response = SimpleClient;
89    type Error = Error<ClientError>;
90    type InitError = T::InitError;
91    type Service = ConnectorService<A, T::Service>;
92
93    async fn create(&self, cfg: SharedCfg) -> Result<Self::Service, Self::InitError> {
94        let config = cfg.get();
95        let svc = self.svc.create(cfg).await?;
96        Ok(ConnectorService {
97            svc,
98            config,
99            scheme: self.scheme.clone(),
100            pool: self.pool.clone(),
101            _t: PhantomData,
102        })
103    }
104}
105
106#[derive(Debug)]
107pub struct ConnectorService<A, T> {
108    svc: T,
109    scheme: Scheme,
110    config: Cfg<ServiceConfig>,
111    pool: pool::Pool<()>,
112    _t: PhantomData<A>,
113}
114
115impl<A, T> Service<A> for ConnectorService<A, T>
116where
117    A: Address,
118    T: Service<Connect<A>, Error = Error<ConnectError>>,
119    IoBoxed: From<T::Response>,
120{
121    type Response = SimpleClient;
122    type Error = Error<ClientError>;
123
124    /// Connect to http2 server
125    async fn call(&self, req: A, ctx: ServiceCtx<'_, Self>) -> Result<SimpleClient, Self::Error> {
126        let authority = ByteString::from(req.host());
127
128        let fut = async {
129            let io = ctx
130                .call(&self.svc, Connect::new(req))
131                .await
132                .map_err(|e| e.map(ClientError::from))?;
133
134            Ok::<_, Error<ClientError>>(SimpleClient::with_params(
135                io.into(),
136                self.config.clone(),
137                &self.scheme,
138                authority,
139                false,
140                InflightStorage::default(),
141                self.pool.clone(),
142            ))
143        };
144
145        timeout_checked(self.config.handshake_timeout, fut)
146            .await
147            .map_err(|()| {
148                Error::from(ClientError::HandshakeTimeout).set_service(self.config.service())
149            })
150            .and_then(|item| item)
151    }
152
153    ntex_service::forward_ready!(svc, |e| e.map(ClientError::from));
154    ntex_service::forward_poll!(svc, |e| e.map(ClientError::from));
155    ntex_service::forward_shutdown!(svc);
156}