ntex_net/connect/
service.rs1use std::{collections::VecDeque, io, marker::PhantomData, net::SocketAddr};
2
3use ntex_io::{Io, IoConfig, types};
4use ntex_service::cfg::{Cfg, SharedCfg};
5use ntex_service::{Service, ServiceCtx, ServiceFactory};
6use ntex_util::{future::Either, time::timeout_checked};
7
8use super::{Address, Connect, ConnectError, ConnectServiceError, resolve};
9
10#[derive(Copy, Clone, Debug)]
11pub struct Connector<T>(PhantomData<T>);
13
14#[derive(Copy, Clone, Debug)]
15pub struct ConnectorService<T> {
17 cfg: Cfg<IoConfig>,
18 shared: SharedCfg,
19 _t: PhantomData<T>,
20}
21
22impl<T> Connector<T> {
23 pub fn new() -> Self {
25 Connector(PhantomData)
26 }
27}
28
29impl<T> Default for Connector<T> {
30 fn default() -> Self {
31 Self::new()
32 }
33}
34
35impl<T> ConnectorService<T> {
36 #[inline]
37 pub fn new() -> Self {
39 ConnectorService::with(Default::default())
40 }
41
42 #[inline]
43 pub fn with(cfg: SharedCfg) -> Self {
45 ConnectorService {
46 cfg: cfg.get(),
47 shared: cfg,
48 _t: PhantomData,
49 }
50 }
51}
52
53impl<T> Default for ConnectorService<T> {
54 fn default() -> Self {
55 ConnectorService::new()
56 }
57}
58
59impl<T: Address> ConnectorService<T> {
60 pub async fn connect<U>(&self, message: U) -> Result<Io, ConnectError>
62 where
63 Connect<T>: From<U>,
64 {
65 timeout_checked(self.cfg.connect_timeout(), async {
66 let msg = resolve::lookup(message.into(), self.shared.tag()).await?;
68
69 let port = msg.port();
70 let Connect { req, addr, .. } = msg;
71
72 if let Some(addr) = addr {
73 connect(req, port, addr, self.shared).await
74 } else if let Some(addr) = req.addr() {
75 connect(req, addr.port(), Either::Left(addr), self.shared).await
76 } else {
77 log::error!("{}: TCP connector: got unresolved address", self.cfg.tag());
78 Err(ConnectError::Unresolved)
79 }
80 })
81 .await
82 .map_err(|_| {
83 ConnectError::Io(io::Error::new(io::ErrorKind::TimedOut, "Connect timeout"))
84 })
85 .and_then(|item| item)
86 }
87}
88
89impl<T: Address> ServiceFactory<Connect<T>, SharedCfg> for Connector<T> {
90 type Response = Io;
91 type Error = ConnectError;
92 type Service = ConnectorService<T>;
93 type InitError = ConnectServiceError;
94
95 async fn create(&self, cfg: SharedCfg) -> Result<Self::Service, Self::InitError> {
96 Ok(ConnectorService::with(cfg))
97 }
98}
99
100impl<T: Address> Service<Connect<T>> for ConnectorService<T> {
101 type Response = Io;
102 type Error = ConnectError;
103
104 async fn call(
105 &self,
106 req: Connect<T>,
107 _: ServiceCtx<'_, Self>,
108 ) -> Result<Self::Response, Self::Error> {
109 self.connect(req).await
110 }
111}
112
113async fn connect<T: Address>(
115 req: T,
116 port: u16,
117 addr: Either<SocketAddr, VecDeque<SocketAddr>>,
118 cfg: SharedCfg,
119) -> Result<Io, ConnectError> {
120 log::trace!(
121 "{}: TCP connector - connecting to {:?} addr:{addr:?} port:{port}",
122 cfg.tag(),
123 req.host(),
124 );
125
126 let io = match addr {
127 Either::Left(addr) => crate::tcp_connect(addr, cfg).await?,
128 Either::Right(mut addrs) => loop {
129 let addr = addrs.pop_front().unwrap();
130
131 match crate::tcp_connect(addr, cfg).await {
132 Ok(io) => break io,
133 Err(err) => {
134 log::trace!(
135 "{}: TCP connector - failed to connect to {:?} port: {port} err: {err:?}",
136 cfg.tag(),
137 req.host(),
138 );
139 if addrs.is_empty() {
140 return Err(err.into());
141 }
142 }
143 }
144 },
145 };
146
147 log::trace!(
148 "{}: TCP connector - successfully connected to {:?} - {:?}",
149 cfg.tag(),
150 req.host(),
151 io.query::<types::PeerAddr>().get()
152 );
153 Ok(io)
154}
155
156#[cfg(test)]
157mod tests {
158 use super::*;
159
160 use ntex_util::time::Millis;
161
162 #[ntex::test]
163 async fn test_connect() {
164 let server = ntex::server::test_server(async || {
165 ntex_service::fn_service(|_| async { Ok::<_, ()>(()) })
166 });
167
168 let srv = Connector::default()
169 .create(
170 SharedCfg::new("T")
171 .add(IoConfig::new().set_connect_timeout(Millis(5000)))
172 .into(),
173 )
174 .await
175 .unwrap();
176 let result = srv.connect("").await;
177 assert!(result.is_err());
178 let result = srv.connect("localhost:99999").await;
179 assert!(result.is_err());
180 assert!(format!("{srv:?}").contains("Connector"));
181
182 let srv = ConnectorService::default();
183 let result = srv.connect(format!("{}", server.addr())).await;
184 assert!(result.is_ok());
185
186 let msg = Connect::new(format!("{}", server.addr())).set_addrs(vec![
187 format!("127.0.0.1:{}", server.addr().port() - 1)
188 .parse()
189 .unwrap(),
190 server.addr(),
191 ]);
192 let result = crate::connect::connect(msg).await;
193 assert!(result.is_ok());
194
195 let msg = Connect::new(server.addr());
196 let result = crate::connect::connect(msg).await;
197 assert!(result.is_ok());
198 }
199}