ntex_net/connect/
service.rs

1use std::{collections::VecDeque, io, marker::PhantomData, net::SocketAddr};
2
3use ntex_io::{Io, IoConfig, types};
4use ntex_service::cfg::{Cfg, SharedCfg};
5use ntex_service::{Service, ServiceCtx, ServiceFactory};
6use ntex_util::{future::Either, time::timeout_checked};
7
8use super::{Address, Connect, ConnectError, ConnectServiceError, resolve};
9
10#[derive(Copy, Clone, Debug)]
11/// Basic tcp stream connector
12pub struct Connector<T>(PhantomData<T>);
13
14#[derive(Copy, Clone, Debug)]
15/// Basic tcp stream connector
16pub struct ConnectorService<T> {
17    cfg: Cfg<IoConfig>,
18    shared: SharedCfg,
19    _t: PhantomData<T>,
20}
21
22impl<T> Connector<T> {
23    /// Construct new connect service with default configuration
24    pub fn new() -> Self {
25        Connector(PhantomData)
26    }
27}
28
29impl<T> Default for Connector<T> {
30    fn default() -> Self {
31        Self::new()
32    }
33}
34
35impl<T> ConnectorService<T> {
36    #[inline]
37    /// Construct new connect service with default configuration
38    pub fn new() -> Self {
39        ConnectorService::with(Default::default())
40    }
41
42    #[inline]
43    /// Construct new connect service with custom configuration
44    pub fn with(cfg: SharedCfg) -> Self {
45        ConnectorService {
46            cfg: cfg.get(),
47            shared: cfg,
48            _t: PhantomData,
49        }
50    }
51}
52
53impl<T> Default for ConnectorService<T> {
54    fn default() -> Self {
55        ConnectorService::new()
56    }
57}
58
59impl<T: Address> ConnectorService<T> {
60    /// Resolve and connect to remote host
61    pub async fn connect<U>(&self, message: U) -> Result<Io, ConnectError>
62    where
63        Connect<T>: From<U>,
64    {
65        timeout_checked(self.cfg.connect_timeout(), async {
66            // resolve first
67            let msg = resolve::lookup(message.into(), self.shared.tag()).await?;
68
69            let port = msg.port();
70            let Connect { req, addr, .. } = msg;
71
72            if let Some(addr) = addr {
73                connect(req, port, addr, self.shared).await
74            } else if let Some(addr) = req.addr() {
75                connect(req, addr.port(), Either::Left(addr), self.shared).await
76            } else {
77                log::error!("{}: TCP connector: got unresolved address", self.cfg.tag());
78                Err(ConnectError::Unresolved)
79            }
80        })
81        .await
82        .map_err(|_| {
83            ConnectError::Io(io::Error::new(io::ErrorKind::TimedOut, "Connect timeout"))
84        })
85        .and_then(|item| item)
86    }
87}
88
89impl<T: Address> ServiceFactory<Connect<T>, SharedCfg> for Connector<T> {
90    type Response = Io;
91    type Error = ConnectError;
92    type Service = ConnectorService<T>;
93    type InitError = ConnectServiceError;
94
95    async fn create(&self, cfg: SharedCfg) -> Result<Self::Service, Self::InitError> {
96        Ok(ConnectorService::with(cfg))
97    }
98}
99
100impl<T: Address> Service<Connect<T>> for ConnectorService<T> {
101    type Response = Io;
102    type Error = ConnectError;
103
104    async fn call(
105        &self,
106        req: Connect<T>,
107        _: ServiceCtx<'_, Self>,
108    ) -> Result<Self::Response, Self::Error> {
109        self.connect(req).await
110    }
111}
112
113/// Tcp stream connector
114async fn connect<T: Address>(
115    req: T,
116    port: u16,
117    addr: Either<SocketAddr, VecDeque<SocketAddr>>,
118    cfg: SharedCfg,
119) -> Result<Io, ConnectError> {
120    log::trace!(
121        "{}: TCP connector - connecting to {:?} addr:{addr:?} port:{port}",
122        cfg.tag(),
123        req.host(),
124    );
125
126    let io = match addr {
127        Either::Left(addr) => crate::tcp_connect(addr, cfg).await?,
128        Either::Right(mut addrs) => loop {
129            let addr = addrs.pop_front().unwrap();
130
131            match crate::tcp_connect(addr, cfg).await {
132                Ok(io) => break io,
133                Err(err) => {
134                    log::trace!(
135                        "{}: TCP connector - failed to connect to {:?} port: {port} err: {err:?}",
136                        cfg.tag(),
137                        req.host(),
138                    );
139                    if addrs.is_empty() {
140                        return Err(err.into());
141                    }
142                }
143            }
144        },
145    };
146
147    log::trace!(
148        "{}: TCP connector - successfully connected to {:?} - {:?}",
149        cfg.tag(),
150        req.host(),
151        io.query::<types::PeerAddr>().get()
152    );
153    Ok(io)
154}
155
156#[cfg(test)]
157mod tests {
158    use super::*;
159
160    use ntex_util::time::Millis;
161
162    #[ntex::test]
163    async fn test_connect() {
164        let server = ntex::server::test_server(async || {
165            ntex_service::fn_service(|_| async { Ok::<_, ()>(()) })
166        });
167
168        let srv = Connector::default()
169            .create(
170                SharedCfg::new("T")
171                    .add(IoConfig::new().set_connect_timeout(Millis(5000)))
172                    .into(),
173            )
174            .await
175            .unwrap();
176        let result = srv.connect("").await;
177        assert!(result.is_err());
178        let result = srv.connect("localhost:99999").await;
179        assert!(result.is_err());
180        assert!(format!("{srv:?}").contains("Connector"));
181
182        let srv = ConnectorService::default();
183        let result = srv.connect(format!("{}", server.addr())).await;
184        assert!(result.is_ok());
185
186        let msg = Connect::new(format!("{}", server.addr())).set_addrs(vec![
187            format!("127.0.0.1:{}", server.addr().port() - 1)
188                .parse()
189                .unwrap(),
190            server.addr(),
191        ]);
192        let result = crate::connect::connect(msg).await;
193        assert!(result.is_ok());
194
195        let msg = Connect::new(server.addr());
196        let result = crate::connect::connect(msg).await;
197        assert!(result.is_ok());
198    }
199}