1use std::marker::PhantomData;
2
3use ntex_bytes::ByteString;
4use ntex_http::uri::Scheme;
5use ntex_io::IoBoxed;
6use ntex_net::connect::{Address, Connect, ConnectError, Connector as DefaultConnector};
7use ntex_service::cfg::{Cfg, SharedCfg};
8use ntex_service::{IntoServiceFactory, Service, ServiceCtx, ServiceFactory};
9use ntex_util::{channel::pool, time::timeout_checked};
10
11use crate::client::{SimpleClient, stream::InflightStorage};
12use crate::{client::ClientError, config::ServiceConfig};
13
14#[derive(Debug)]
15pub struct Connector<A: Address, T> {
17 connector: T,
18 scheme: Scheme,
19 pool: pool::Pool<()>,
20
21 _t: PhantomData<A>,
22}
23
24impl<A, T> Connector<A, T>
25where
26 A: Address,
27 T: ServiceFactory<Connect<A>, SharedCfg, Error = ConnectError>,
28 IoBoxed: From<T::Response>,
29{
30 pub fn new<F>(connector: F) -> Connector<A, T>
32 where
33 F: IntoServiceFactory<T, Connect<A>, SharedCfg>,
34 {
35 Connector {
36 connector: connector.into_factory(),
37 scheme: Scheme::HTTP,
38 pool: pool::new(),
39 _t: PhantomData,
40 }
41 }
42}
43
44impl<A> Default for Connector<A, DefaultConnector<A>>
45where
46 A: Address,
47{
48 fn default() -> Self {
50 Self::new(DefaultConnector::default())
51 }
52}
53
54impl<A, T> Connector<A, T>
55where
56 A: Address,
57{
58 #[inline]
59 pub fn scheme(&mut self, scheme: Scheme) -> &mut Self {
61 self.scheme = scheme;
62 self
63 }
64
65 pub fn connector<U, F>(&self, connector: F) -> Connector<A, U>
67 where
68 F: IntoServiceFactory<U, Connect<A>, SharedCfg>,
69 U: ServiceFactory<Connect<A>, SharedCfg, Error = ConnectError>,
70 IoBoxed: From<U::Response>,
71 {
72 Connector {
73 connector: connector.into_factory(),
74 scheme: self.scheme.clone(),
75 pool: self.pool.clone(),
76 _t: PhantomData,
77 }
78 }
79}
80
81impl<A, T> ServiceFactory<A, SharedCfg> for Connector<A, T>
82where
83 A: Address,
84 T: ServiceFactory<Connect<A>, SharedCfg, Error = ConnectError>,
85 IoBoxed: From<T::Response>,
86{
87 type Response = SimpleClient;
88 type Error = ClientError;
89 type InitError = T::InitError;
90 type Service = ConnectorService<A, T::Service>;
91
92 async fn create(&self, cfg: SharedCfg) -> Result<Self::Service, Self::InitError> {
93 let svc = self.connector.create(cfg).await?;
94 Ok(ConnectorService {
95 svc,
96 scheme: self.scheme.clone(),
97 config: cfg.get(),
98 pool: self.pool.clone(),
99 _t: PhantomData,
100 })
101 }
102}
103
104#[derive(Debug)]
105pub struct ConnectorService<A, T> {
106 svc: T,
107 scheme: Scheme,
108 config: Cfg<ServiceConfig>,
109 pool: pool::Pool<()>,
110 _t: PhantomData<A>,
111}
112
113impl<A, T> Service<A> for ConnectorService<A, T>
114where
115 A: Address,
116 T: Service<Connect<A>, Error = ConnectError>,
117 IoBoxed: From<T::Response>,
118{
119 type Response = SimpleClient;
120 type Error = ClientError;
121
122 async fn call(&self, req: A, ctx: ServiceCtx<'_, Self>) -> Result<SimpleClient, ClientError> {
124 let authority = ByteString::from(req.host());
125
126 let fut = async {
127 Ok::<_, ClientError>(SimpleClient::with_params(
128 ctx.call(&self.svc, Connect::new(req)).await?.into(),
129 self.config,
130 self.scheme.clone(),
131 authority,
132 false,
133 InflightStorage::default(),
134 self.pool.clone(),
135 ))
136 };
137
138 timeout_checked(self.config.handshake_timeout, fut)
139 .await
140 .map_err(|_| ClientError::HandshakeTimeout)
141 .and_then(|item| item)
142 }
143
144 ntex_service::forward_ready!(svc);
145 ntex_service::forward_poll!(svc);
146 ntex_service::forward_shutdown!(svc);
147}