1use std::marker::PhantomData;
2
3use ntex_bytes::ByteString;
4use ntex_error::Error;
5use ntex_http::uri::Scheme;
6use ntex_io::IoBoxed;
7use ntex_net::connect::{Address, Connect, ConnectError, Connector2 as DefaultConnector};
8use ntex_service::cfg::{Cfg, SharedCfg};
9use ntex_service::{IntoServiceFactory, Service, ServiceCtx, ServiceFactory};
10use ntex_util::{channel::pool, time::timeout_checked};
11
12use crate::client::{SimpleClient, stream::InflightStorage};
13use crate::{client::ClientError, config::ServiceConfig};
14
15#[derive(Debug)]
16pub struct Connector<A: Address, T> {
18 svc: T,
19 scheme: Scheme,
20 pool: pool::Pool<()>,
21
22 _t: PhantomData<A>,
23}
24
25impl<A, T> Connector<A, T>
26where
27 A: Address,
28 T: ServiceFactory<Connect<A>, SharedCfg, Error = Error<ConnectError>>,
29 IoBoxed: From<T::Response>,
30{
31 pub fn new<F>(svc: F) -> Connector<A, T>
33 where
34 F: IntoServiceFactory<T, Connect<A>, SharedCfg>,
35 {
36 Connector {
37 svc: svc.into_factory(),
38 scheme: Scheme::HTTP,
39 pool: pool::new(),
40 _t: PhantomData,
41 }
42 }
43}
44
45impl<A> Default for Connector<A, DefaultConnector<A>>
46where
47 A: Address,
48{
49 fn default() -> Self {
51 Self::new(DefaultConnector::default())
52 }
53}
54
55impl<A, T> Connector<A, T>
56where
57 A: Address,
58{
59 #[inline]
60 pub fn scheme(&mut self, scheme: Scheme) -> &mut Self {
62 self.scheme = scheme;
63 self
64 }
65
66 pub fn connector<U, F>(&self, svc: F) -> Connector<A, U>
68 where
69 F: IntoServiceFactory<U, Connect<A>, SharedCfg>,
70 U: ServiceFactory<Connect<A>, SharedCfg, Error = Error<ConnectError>>,
71 IoBoxed: From<U::Response>,
72 {
73 Connector {
74 svc: svc.into_factory(),
75 scheme: self.scheme.clone(),
76 pool: self.pool.clone(),
77 _t: PhantomData,
78 }
79 }
80}
81
82impl<A, T> ServiceFactory<A, SharedCfg> for Connector<A, T>
83where
84 A: Address,
85 T: ServiceFactory<Connect<A>, SharedCfg, Error = Error<ConnectError>>,
86 IoBoxed: From<T::Response>,
87{
88 type Response = SimpleClient;
89 type Error = Error<ClientError>;
90 type InitError = T::InitError;
91 type Service = ConnectorService<A, T::Service>;
92
93 async fn create(&self, cfg: SharedCfg) -> Result<Self::Service, Self::InitError> {
94 let config = cfg.get();
95 let svc = self.svc.create(cfg).await?;
96 Ok(ConnectorService {
97 svc,
98 config,
99 scheme: self.scheme.clone(),
100 pool: self.pool.clone(),
101 _t: PhantomData,
102 })
103 }
104}
105
106#[derive(Debug)]
107pub struct ConnectorService<A, T> {
108 svc: T,
109 scheme: Scheme,
110 config: Cfg<ServiceConfig>,
111 pool: pool::Pool<()>,
112 _t: PhantomData<A>,
113}
114
115impl<A, T> Service<A> for ConnectorService<A, T>
116where
117 A: Address,
118 T: Service<Connect<A>, Error = Error<ConnectError>>,
119 IoBoxed: From<T::Response>,
120{
121 type Response = SimpleClient;
122 type Error = Error<ClientError>;
123
124 async fn call(&self, req: A, ctx: ServiceCtx<'_, Self>) -> Result<SimpleClient, Self::Error> {
126 let authority = ByteString::from(req.host());
127
128 let fut = async {
129 let io = ctx
130 .call(&self.svc, Connect::new(req))
131 .await
132 .map_err(|e| e.map(ClientError::from))?;
133
134 Ok::<_, Error<ClientError>>(SimpleClient::with_params(
135 io.into(),
136 self.config.clone(),
137 &self.scheme,
138 authority,
139 false,
140 InflightStorage::default(),
141 self.pool.clone(),
142 ))
143 };
144
145 timeout_checked(self.config.handshake_timeout, fut)
146 .await
147 .map_err(|()| {
148 Error::from(ClientError::HandshakeTimeout).set_service(self.config.service())
149 })
150 .and_then(|item| item)
151 }
152
153 ntex_service::forward_ready!(svc, |e| e.map(ClientError::from));
154 ntex_service::forward_poll!(svc, |e| e.map(ClientError::from));
155 ntex_service::forward_shutdown!(svc);
156}