1use std::collections::VecDeque;
2use std::marker::PhantomData;
3use std::net::{IpAddr, SocketAddr};
4use std::time::Duration;
5use std::{fmt, io};
6
7use actix_service::{fn_factory, NewService, Service};
8use futures::future::{ok, Either};
9use futures::{try_ready, Async, Future, Poll};
10use tokio_tcp::{ConnectFuture, TcpStream};
11use trust_dns_resolver::config::{ResolverConfig, ResolverOpts};
12use trust_dns_resolver::system_conf::read_system_conf;
13
14use super::resolver::{RequestHost, ResolveError, Resolver, ResolverFuture};
15
/// Implemented by request types that know which TCP port they target.
///
/// `TcpConnectorResponse::new` calls [`RequestPort::port`] once and combines the
/// result with each candidate IP address to form the socket address to dial.
pub trait RequestPort {
    /// Returns the port number the connection should be opened on.
    fn port(&self) -> u16;
}
20
21#[derive(Debug)]
23pub enum ConnectorError {
24 Resolver(ResolveError),
27
28 NoRecords,
31
32 Timeout,
35
36 InvalidInput,
38
39 IoError(io::Error),
42}
43
44impl From<ResolveError> for ConnectorError {
45 fn from(err: ResolveError) -> Self {
46 ConnectorError::Resolver(err)
47 }
48}
49
50impl From<io::Error> for ConnectorError {
51 fn from(err: io::Error) -> Self {
52 ConnectorError::IoError(err)
53 }
54}
55
56#[derive(Eq, PartialEq, Debug, Hash)]
58pub struct Connect {
59 pub kind: ConnectKind,
60 pub timeout: Duration,
61}
62
/// The two ways a connection target can be described.
#[derive(Eq, PartialEq, Debug, Hash)]
pub enum ConnectKind {
    /// A host name and port; the name must be resolved before connecting.
    Host { host: String, port: u16 },
    /// A host name together with a concrete socket address, so no resolution
    /// is required.
    Addr { host: String, addr: SocketAddr },
}
68
69impl Connect {
70 pub fn new<T: AsRef<str>>(host: T, port: u16) -> Connect {
72 Connect {
73 kind: ConnectKind::Host {
74 host: host.as_ref().to_owned(),
75 port,
76 },
77 timeout: Duration::from_secs(1),
78 }
79 }
80
81 pub fn with<T: AsRef<str>>(host: T) -> Result<Connect, ConnectorError> {
83 let mut parts_iter = host.as_ref().splitn(2, ':');
84 let host = parts_iter.next().ok_or(ConnectorError::InvalidInput)?;
85 let port_str = parts_iter.next().unwrap_or("");
86 let port = port_str
87 .parse::<u16>()
88 .map_err(|_| ConnectorError::InvalidInput)?;
89 Ok(Connect {
90 kind: ConnectKind::Host {
91 host: host.to_owned(),
92 port,
93 },
94 timeout: Duration::from_secs(1),
95 })
96 }
97
98 pub fn with_address<T: Into<String>>(host: T, addr: SocketAddr) -> Connect {
100 Connect {
101 kind: ConnectKind::Addr {
102 addr,
103 host: host.into(),
104 },
105 timeout: Duration::from_secs(1),
106 }
107 }
108
109 pub fn timeout(mut self, timeout: Duration) -> Connect {
113 self.timeout = timeout;
114 self
115 }
116}
117
118impl RequestHost for Connect {
119 fn host(&self) -> &str {
120 match self.kind {
121 ConnectKind::Host { ref host, .. } => host,
122 ConnectKind::Addr { ref host, .. } => host,
123 }
124 }
125}
126
127impl RequestPort for Connect {
128 fn port(&self) -> u16 {
129 match self.kind {
130 ConnectKind::Host { port, .. } => port,
131 ConnectKind::Addr { addr, .. } => addr.port(),
132 }
133 }
134}
135
136impl fmt::Display for Connect {
137 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
138 write!(f, "{}:{}", self.host(), self.port())
139 }
140}
141
142pub struct Connector {
144 resolver: Resolver<Connect>,
145}
146
147impl Default for Connector {
148 fn default() -> Self {
149 let (cfg, opts) = if let Ok((cfg, opts)) = read_system_conf() {
150 (cfg, opts)
151 } else {
152 (ResolverConfig::default(), ResolverOpts::default())
153 };
154
155 Connector::new(cfg, opts)
156 }
157}
158
159impl Connector {
160 pub fn new(cfg: ResolverConfig, opts: ResolverOpts) -> Self {
162 Connector {
163 resolver: Resolver::new(cfg, opts),
164 }
165 }
166
167 pub fn with_resolver(
169 resolver: Resolver<Connect>,
170 ) -> impl Service<Request = Connect, Response = (Connect, TcpStream), Error = ConnectorError>
171 + Clone {
172 Connector { resolver }
173 }
174
175 pub fn new_service_with_config<E>(
177 cfg: ResolverConfig,
178 opts: ResolverOpts,
179 ) -> impl NewService<
180 (),
181 Request = Connect,
182 Response = (Connect, TcpStream),
183 Error = ConnectorError,
184 InitError = E,
185 > + Clone {
186 fn_factory(move || ok(Connector::new(cfg.clone(), opts)))
187 }
188}
189
190impl Clone for Connector {
191 fn clone(&self) -> Self {
192 Connector {
193 resolver: self.resolver.clone(),
194 }
195 }
196}
197
198impl Service for Connector {
199 type Request = Connect;
200 type Response = (Connect, TcpStream);
201 type Error = ConnectorError;
202 type Future = Either<ConnectorFuture, ConnectorTcpFuture>;
203
204 fn poll_ready(&mut self) -> Poll<(), Self::Error> {
205 Ok(Async::Ready(()))
206 }
207
208 fn call(&mut self, req: Connect) -> Self::Future {
209 match req.kind {
210 ConnectKind::Host { .. } => Either::A(ConnectorFuture {
211 fut: self.resolver.call(req),
212 fut2: None,
213 }),
214 ConnectKind::Addr { addr, .. } => {
215 let mut addrs = VecDeque::new();
216 addrs.push_back(addr.ip());
217 Either::B(ConnectorTcpFuture {
218 fut: TcpConnectorResponse::new(req, addrs),
219 })
220 }
221 }
222 }
223}
224
225#[doc(hidden)]
226pub struct ConnectorFuture {
227 fut: ResolverFuture<Connect>,
228 fut2: Option<TcpConnectorResponse<Connect>>,
229}
230
231impl Future for ConnectorFuture {
232 type Item = (Connect, TcpStream);
233 type Error = ConnectorError;
234
235 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
236 if let Some(ref mut fut) = self.fut2 {
237 return fut.poll().map_err(ConnectorError::from);
238 }
239 match self.fut.poll().map_err(ConnectorError::from)? {
240 Async::Ready((req, addrs)) => {
241 if addrs.is_empty() {
242 Err(ConnectorError::NoRecords)
243 } else {
244 self.fut2 = Some(TcpConnectorResponse::new(req, addrs));
245 self.poll()
246 }
247 }
248 Async::NotReady => Ok(Async::NotReady),
249 }
250 }
251}
252
253#[doc(hidden)]
254pub struct ConnectorTcpFuture {
255 fut: TcpConnectorResponse<Connect>,
256}
257
258impl Future for ConnectorTcpFuture {
259 type Item = (Connect, TcpStream);
260 type Error = ConnectorError;
261
262 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
263 self.fut.poll().map_err(ConnectorError::IoError)
264 }
265}
266
267pub struct TcpConnector<T: RequestPort>(PhantomData<T>);
269
270impl<T: RequestPort> Default for TcpConnector<T> {
271 fn default() -> TcpConnector<T> {
272 TcpConnector(PhantomData)
273 }
274}
275
276impl<T: RequestPort> Service for TcpConnector<T> {
277 type Request = (T, VecDeque<IpAddr>);
278 type Response = (T, TcpStream);
279 type Error = io::Error;
280 type Future = TcpConnectorResponse<T>;
281
282 fn poll_ready(&mut self) -> Poll<(), Self::Error> {
283 Ok(Async::Ready(()))
284 }
285
286 fn call(&mut self, (req, addrs): (T, VecDeque<IpAddr>)) -> Self::Future {
287 TcpConnectorResponse::new(req, addrs)
288 }
289}
290
291#[doc(hidden)]
292pub struct TcpConnectorResponse<T: RequestPort> {
294 port: u16,
295 req: Option<T>,
296 addr: Option<SocketAddr>,
297 addrs: VecDeque<IpAddr>,
298 stream: Option<ConnectFuture>,
299}
300
301impl<T: RequestPort> TcpConnectorResponse<T> {
302 pub fn new(req: T, addrs: VecDeque<IpAddr>) -> TcpConnectorResponse<T> {
303 TcpConnectorResponse {
304 addrs,
305 port: req.port(),
306 req: Some(req),
307 addr: None,
308 stream: None,
309 }
310 }
311}
312
313impl<T: RequestPort> Future for TcpConnectorResponse<T> {
314 type Item = (T, TcpStream);
315 type Error = io::Error;
316
317 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
318 loop {
320 if let Some(new) = self.stream.as_mut() {
321 match new.poll() {
322 Ok(Async::Ready(sock)) => {
323 return Ok(Async::Ready((self.req.take().unwrap(), sock)));
324 }
325 Ok(Async::NotReady) => return Ok(Async::NotReady),
326 Err(err) => {
327 if self.addrs.is_empty() {
328 return Err(err);
329 }
330 }
331 }
332 }
333
334 let addr = SocketAddr::new(self.addrs.pop_front().unwrap(), self.port);
336 self.stream = Some(TcpStream::connect(&addr));
337 self.addr = Some(addr)
338 }
339 }
340}
341
342#[derive(Clone)]
343pub struct DefaultConnector(Connector);
344
345impl Default for DefaultConnector {
346 fn default() -> Self {
347 DefaultConnector(Connector::default())
348 }
349}
350
351impl DefaultConnector {
352 pub fn new(cfg: ResolverConfig, opts: ResolverOpts) -> Self {
353 DefaultConnector(Connector::new(cfg, opts))
354 }
355}
356
357impl Service for DefaultConnector {
358 type Request = Connect;
359 type Response = TcpStream;
360 type Error = ConnectorError;
361 type Future = DefaultConnectorFuture;
362
363 fn poll_ready(&mut self) -> Poll<(), Self::Error> {
364 self.0.poll_ready()
365 }
366
367 fn call(&mut self, req: Connect) -> Self::Future {
368 DefaultConnectorFuture {
369 fut: self.0.call(req),
370 }
371 }
372}
373
374#[doc(hidden)]
375pub struct DefaultConnectorFuture {
376 fut: Either<ConnectorFuture, ConnectorTcpFuture>,
377}
378
379impl Future for DefaultConnectorFuture {
380 type Item = TcpStream;
381 type Error = ConnectorError;
382
383 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
384 Ok(Async::Ready(try_ready!(self.fut.poll()).1))
385 }
386}