actix_connector/
connector.rs

1use std::collections::VecDeque;
2use std::marker::PhantomData;
3use std::net::{IpAddr, SocketAddr};
4use std::time::Duration;
5use std::{fmt, io};
6
7use actix_service::{fn_factory, NewService, Service};
8use futures::future::{ok, Either};
9use futures::{try_ready, Async, Future, Poll};
10use tokio_tcp::{ConnectFuture, TcpStream};
11use trust_dns_resolver::config::{ResolverConfig, ResolverOpts};
12use trust_dns_resolver::system_conf::read_system_conf;
13
14use super::resolver::{RequestHost, ResolveError, Resolver, ResolverFuture};
15
/// Exposes the TCP port that a connect request targets.
pub trait RequestPort {
    /// Returns the port number carried by this request.
    fn port(&self) -> u16;
}
20
21// #[derive(Fail, Debug)]
22#[derive(Debug)]
23pub enum ConnectorError {
24    /// Failed to resolve the hostname
25    // #[fail(display = "Failed resolving hostname: {}", _0)]
26    Resolver(ResolveError),
27
28    /// No dns records
29    // #[fail(display = "No dns records found for the input")]
30    NoRecords,
31
32    /// Connecting took too long
33    // #[fail(display = "Timeout out while establishing connection")]
34    Timeout,
35
36    /// Invalid input
37    InvalidInput,
38
39    /// Connection io error
40    // #[fail(display = "{}", _0)]
41    IoError(io::Error),
42}
43
44impl From<ResolveError> for ConnectorError {
45    fn from(err: ResolveError) -> Self {
46        ConnectorError::Resolver(err)
47    }
48}
49
50impl From<io::Error> for ConnectorError {
51    fn from(err: io::Error) -> Self {
52        ConnectorError::IoError(err)
53    }
54}
55
56/// Connect request
57#[derive(Eq, PartialEq, Debug, Hash)]
58pub struct Connect {
59    pub kind: ConnectKind,
60    pub timeout: Duration,
61}
62
/// The target of a `Connect` request.
#[derive(Eq, PartialEq, Debug, Hash)]
pub enum ConnectKind {
    /// A host name and port.
    Host { host: String, port: u16 },
    /// A host name together with an already known socket address.
    Addr { host: String, addr: SocketAddr },
}
68
69impl Connect {
70    /// Create new `Connect` instance.
71    pub fn new<T: AsRef<str>>(host: T, port: u16) -> Connect {
72        Connect {
73            kind: ConnectKind::Host {
74                host: host.as_ref().to_owned(),
75                port,
76            },
77            timeout: Duration::from_secs(1),
78        }
79    }
80
81    /// Create `Connect` instance by spliting the string by ':' and convert the second part to u16
82    pub fn with<T: AsRef<str>>(host: T) -> Result<Connect, ConnectorError> {
83        let mut parts_iter = host.as_ref().splitn(2, ':');
84        let host = parts_iter.next().ok_or(ConnectorError::InvalidInput)?;
85        let port_str = parts_iter.next().unwrap_or("");
86        let port = port_str
87            .parse::<u16>()
88            .map_err(|_| ConnectorError::InvalidInput)?;
89        Ok(Connect {
90            kind: ConnectKind::Host {
91                host: host.to_owned(),
92                port,
93            },
94            timeout: Duration::from_secs(1),
95        })
96    }
97
98    /// Create new `Connect` instance from host and address. Connector skips name resolution stage for such connect messages.
99    pub fn with_address<T: Into<String>>(host: T, addr: SocketAddr) -> Connect {
100        Connect {
101            kind: ConnectKind::Addr {
102                addr,
103                host: host.into(),
104            },
105            timeout: Duration::from_secs(1),
106        }
107    }
108
109    /// Set connect timeout
110    ///
111    /// By default timeout is set to a 1 second.
112    pub fn timeout(mut self, timeout: Duration) -> Connect {
113        self.timeout = timeout;
114        self
115    }
116}
117
118impl RequestHost for Connect {
119    fn host(&self) -> &str {
120        match self.kind {
121            ConnectKind::Host { ref host, .. } => host,
122            ConnectKind::Addr { ref host, .. } => host,
123        }
124    }
125}
126
127impl RequestPort for Connect {
128    fn port(&self) -> u16 {
129        match self.kind {
130            ConnectKind::Host { port, .. } => port,
131            ConnectKind::Addr { addr, .. } => addr.port(),
132        }
133    }
134}
135
136impl fmt::Display for Connect {
137    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
138        write!(f, "{}:{}", self.host(), self.port())
139    }
140}
141
142/// Tcp connector
143pub struct Connector {
144    resolver: Resolver<Connect>,
145}
146
147impl Default for Connector {
148    fn default() -> Self {
149        let (cfg, opts) = if let Ok((cfg, opts)) = read_system_conf() {
150            (cfg, opts)
151        } else {
152            (ResolverConfig::default(), ResolverOpts::default())
153        };
154
155        Connector::new(cfg, opts)
156    }
157}
158
159impl Connector {
160    /// Create new connector with resolver configuration
161    pub fn new(cfg: ResolverConfig, opts: ResolverOpts) -> Self {
162        Connector {
163            resolver: Resolver::new(cfg, opts),
164        }
165    }
166
167    /// Create new connector with custom resolver
168    pub fn with_resolver(
169        resolver: Resolver<Connect>,
170    ) -> impl Service<Request = Connect, Response = (Connect, TcpStream), Error = ConnectorError>
171                 + Clone {
172        Connector { resolver }
173    }
174
175    /// Create new default connector service
176    pub fn new_service_with_config<E>(
177        cfg: ResolverConfig,
178        opts: ResolverOpts,
179    ) -> impl NewService<
180        (),
181        Request = Connect,
182        Response = (Connect, TcpStream),
183        Error = ConnectorError,
184        InitError = E,
185    > + Clone {
186        fn_factory(move || ok(Connector::new(cfg.clone(), opts)))
187    }
188}
189
190impl Clone for Connector {
191    fn clone(&self) -> Self {
192        Connector {
193            resolver: self.resolver.clone(),
194        }
195    }
196}
197
198impl Service for Connector {
199    type Request = Connect;
200    type Response = (Connect, TcpStream);
201    type Error = ConnectorError;
202    type Future = Either<ConnectorFuture, ConnectorTcpFuture>;
203
204    fn poll_ready(&mut self) -> Poll<(), Self::Error> {
205        Ok(Async::Ready(()))
206    }
207
208    fn call(&mut self, req: Connect) -> Self::Future {
209        match req.kind {
210            ConnectKind::Host { .. } => Either::A(ConnectorFuture {
211                fut: self.resolver.call(req),
212                fut2: None,
213            }),
214            ConnectKind::Addr { addr, .. } => {
215                let mut addrs = VecDeque::new();
216                addrs.push_back(addr.ip());
217                Either::B(ConnectorTcpFuture {
218                    fut: TcpConnectorResponse::new(req, addrs),
219                })
220            }
221        }
222    }
223}
224
225#[doc(hidden)]
226pub struct ConnectorFuture {
227    fut: ResolverFuture<Connect>,
228    fut2: Option<TcpConnectorResponse<Connect>>,
229}
230
231impl Future for ConnectorFuture {
232    type Item = (Connect, TcpStream);
233    type Error = ConnectorError;
234
235    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
236        if let Some(ref mut fut) = self.fut2 {
237            return fut.poll().map_err(ConnectorError::from);
238        }
239        match self.fut.poll().map_err(ConnectorError::from)? {
240            Async::Ready((req, addrs)) => {
241                if addrs.is_empty() {
242                    Err(ConnectorError::NoRecords)
243                } else {
244                    self.fut2 = Some(TcpConnectorResponse::new(req, addrs));
245                    self.poll()
246                }
247            }
248            Async::NotReady => Ok(Async::NotReady),
249        }
250    }
251}
252
253#[doc(hidden)]
254pub struct ConnectorTcpFuture {
255    fut: TcpConnectorResponse<Connect>,
256}
257
258impl Future for ConnectorTcpFuture {
259    type Item = (Connect, TcpStream);
260    type Error = ConnectorError;
261
262    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
263        self.fut.poll().map_err(ConnectorError::IoError)
264    }
265}
266
267/// Tcp stream connector service
268pub struct TcpConnector<T: RequestPort>(PhantomData<T>);
269
270impl<T: RequestPort> Default for TcpConnector<T> {
271    fn default() -> TcpConnector<T> {
272        TcpConnector(PhantomData)
273    }
274}
275
276impl<T: RequestPort> Service for TcpConnector<T> {
277    type Request = (T, VecDeque<IpAddr>);
278    type Response = (T, TcpStream);
279    type Error = io::Error;
280    type Future = TcpConnectorResponse<T>;
281
282    fn poll_ready(&mut self) -> Poll<(), Self::Error> {
283        Ok(Async::Ready(()))
284    }
285
286    fn call(&mut self, (req, addrs): (T, VecDeque<IpAddr>)) -> Self::Future {
287        TcpConnectorResponse::new(req, addrs)
288    }
289}
290
291#[doc(hidden)]
292/// Tcp stream connector response future
293pub struct TcpConnectorResponse<T: RequestPort> {
294    port: u16,
295    req: Option<T>,
296    addr: Option<SocketAddr>,
297    addrs: VecDeque<IpAddr>,
298    stream: Option<ConnectFuture>,
299}
300
301impl<T: RequestPort> TcpConnectorResponse<T> {
302    pub fn new(req: T, addrs: VecDeque<IpAddr>) -> TcpConnectorResponse<T> {
303        TcpConnectorResponse {
304            addrs,
305            port: req.port(),
306            req: Some(req),
307            addr: None,
308            stream: None,
309        }
310    }
311}
312
313impl<T: RequestPort> Future for TcpConnectorResponse<T> {
314    type Item = (T, TcpStream);
315    type Error = io::Error;
316
317    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
318        // connect
319        loop {
320            if let Some(new) = self.stream.as_mut() {
321                match new.poll() {
322                    Ok(Async::Ready(sock)) => {
323                        return Ok(Async::Ready((self.req.take().unwrap(), sock)));
324                    }
325                    Ok(Async::NotReady) => return Ok(Async::NotReady),
326                    Err(err) => {
327                        if self.addrs.is_empty() {
328                            return Err(err);
329                        }
330                    }
331                }
332            }
333
334            // try to connect
335            let addr = SocketAddr::new(self.addrs.pop_front().unwrap(), self.port);
336            self.stream = Some(TcpStream::connect(&addr));
337            self.addr = Some(addr)
338        }
339    }
340}
341
342#[derive(Clone)]
343pub struct DefaultConnector(Connector);
344
345impl Default for DefaultConnector {
346    fn default() -> Self {
347        DefaultConnector(Connector::default())
348    }
349}
350
351impl DefaultConnector {
352    pub fn new(cfg: ResolverConfig, opts: ResolverOpts) -> Self {
353        DefaultConnector(Connector::new(cfg, opts))
354    }
355}
356
357impl Service for DefaultConnector {
358    type Request = Connect;
359    type Response = TcpStream;
360    type Error = ConnectorError;
361    type Future = DefaultConnectorFuture;
362
363    fn poll_ready(&mut self) -> Poll<(), Self::Error> {
364        self.0.poll_ready()
365    }
366
367    fn call(&mut self, req: Connect) -> Self::Future {
368        DefaultConnectorFuture {
369            fut: self.0.call(req),
370        }
371    }
372}
373
374#[doc(hidden)]
375pub struct DefaultConnectorFuture {
376    fut: Either<ConnectorFuture, ConnectorTcpFuture>,
377}
378
379impl Future for DefaultConnectorFuture {
380    type Item = TcpStream;
381    type Error = ConnectorError;
382
383    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
384        Ok(Async::Ready(try_ready!(self.fut.poll()).1))
385    }
386}