use std::collections::VecDeque;
use std::io;
use std::net::SocketAddr;
use std::time::Duration;
use futures::{Async, Future, Poll};
use tokio_core::net::{TcpStream, TcpStreamNew};
use tokio_core::reactor::Timeout;
use trust_dns_resolver::ResolverFuture;
use trust_dns_resolver::config::{ResolverConfig, ResolverOpts};
use trust_dns_resolver::lookup_ip::LookupIpFuture;
use prelude::*;
#[derive(Eq, PartialEq, Debug)]
pub struct Resolve {
name: String,
port: Option<u16>,
}
impl Resolve {
pub fn host<T: AsRef<str>>(host: T) -> Resolve {
Resolve {
name: host.as_ref().to_owned(),
port: None,
}
}
pub fn host_and_port<T: AsRef<str>>(host: T, port: u16) -> Resolve {
Resolve {
name: host.as_ref().to_owned(),
port: Some(port),
}
}
}
impl Message for Resolve {
type Result = Result<VecDeque<SocketAddr>, ConnectorError>;
}
#[derive(Eq, PartialEq, Debug)]
pub struct Connect {
name: String,
port: Option<u16>,
timeout: Duration,
}
impl Connect {
pub fn host<T: AsRef<str>>(host: T) -> Connect {
Connect {
name: host.as_ref().to_owned(),
port: None,
timeout: Duration::from_secs(1),
}
}
pub fn host_and_port<T: AsRef<str>>(host: T, port: u16) -> Connect {
Connect {
name: host.as_ref().to_owned(),
port: Some(port),
timeout: Duration::from_secs(1),
}
}
pub fn timeout(mut self, timeout: Duration) -> Connect {
self.timeout = timeout;
self
}
}
impl Message for Connect {
type Result = Result<TcpStream, ConnectorError>;
}
#[derive(Eq, PartialEq, Debug)]
pub struct ConnectAddr(pub SocketAddr);
impl Message for ConnectAddr {
type Result = Result<TcpStream, ConnectorError>;
}
#[derive(Fail, Debug)]
pub enum ConnectorError {
#[fail(display = "Failed resolving hostname: {}", _0)]
Resolver(String),
#[fail(display = "Invalid input: {}", _0)]
InvalidInput(&'static str),
#[fail(display = "Timeout out while establishing connection")]
Timeout,
#[fail(display = "{}", _0)]
IoError(io::Error),
}
pub struct Connector {
resolver: ResolverFuture,
}
impl Actor for Connector {
type Context = Context<Self>;
}
impl Supervised for Connector {}
impl actix::ArbiterService for Connector {}
impl Default for Connector {
#[cfg(unix)]
fn default() -> Connector {
let resolver = match ResolverFuture::from_system_conf(Arbiter::handle()) {
Ok(resolver) => resolver,
Err(err) => {
warn!("Can not create system dns resolver: {}", err);
ResolverFuture::new(
ResolverConfig::default(),
ResolverOpts::default(),
Arbiter::handle(),
)
}
};
Connector { resolver }
}
#[cfg(not(unix))]
fn default() -> Connector {
let resolver = ResolverFuture::new(
ResolverConfig::default(),
ResolverOpts::default(),
Arbiter::handle(),
);
Connector {
resolver: resolver,
}
}
}
impl Handler<Resolve> for Connector {
type Result = ResponseActFuture<Self, VecDeque<SocketAddr>, ConnectorError>;
fn handle(&mut self, msg: Resolve, _: &mut Self::Context) -> Self::Result {
Box::new(Resolver::new(
msg.name,
msg.port.unwrap_or(0),
&self.resolver,
))
}
}
impl Handler<Connect> for Connector {
type Result = ResponseActFuture<Self, TcpStream, ConnectorError>;
fn handle(&mut self, msg: Connect, _: &mut Self::Context) -> Self::Result {
let timeout = msg.timeout;
Box::new(
Resolver::new(msg.name, msg.port.unwrap_or(0), &self.resolver)
.and_then(move |addrs, _, _| TcpConnector::with_timeout(addrs, timeout)),
)
}
}
impl Handler<ConnectAddr> for Connector {
type Result = ResponseActFuture<Self, TcpStream, ConnectorError>;
fn handle(&mut self, msg: ConnectAddr, _: &mut Self::Context) -> Self::Result {
let mut v = VecDeque::new();
v.push_back(msg.0);
Box::new(TcpConnector::new(v))
}
}
struct Resolver {
lookup: Option<LookupIpFuture>,
port: u16,
addrs: Option<VecDeque<SocketAddr>>,
error: Option<ConnectorError>,
}
impl Resolver {
pub fn new<S: AsRef<str>>(
addr: S, port: u16, resolver: &ResolverFuture
) -> Resolver {
if let Ok(addr) = addr.as_ref().parse() {
let mut addrs = VecDeque::new();
addrs.push_back(addr);
Resolver {
port,
lookup: None,
addrs: Some(addrs),
error: None,
}
} else {
match Resolver::parse(addr.as_ref(), port) {
Ok((host, port)) => Resolver {
port,
lookup: Some(resolver.lookup_ip(host)),
addrs: None,
error: None,
},
Err(err) => Resolver {
port,
lookup: None,
addrs: None,
error: Some(err),
},
}
}
}
fn parse(addr: &str, port: u16) -> Result<(&str, u16), ConnectorError> {
macro_rules! try_opt {
($e:expr, $msg:expr) => {
match $e {
Some(r) => r,
None => return Err(ConnectorError::InvalidInput($msg)),
}
};
}
let mut parts_iter = addr.splitn(2, ':');
let host = try_opt!(parts_iter.next(), "invalid socket address");
let port_str = parts_iter.next().unwrap_or("");
let port: u16 = port_str.parse().unwrap_or(port);
Ok((host, port))
}
}
impl ActorFuture for Resolver {
type Item = VecDeque<SocketAddr>;
type Error = ConnectorError;
type Actor = Connector;
fn poll(
&mut self, _: &mut Connector, _: &mut Context<Connector>
) -> Poll<Self::Item, Self::Error> {
if let Some(err) = self.error.take() {
Err(err)
} else if let Some(addrs) = self.addrs.take() {
Ok(Async::Ready(addrs))
} else {
match self.lookup.as_mut().unwrap().poll() {
Ok(Async::NotReady) => Ok(Async::NotReady),
Ok(Async::Ready(ips)) => {
let addrs: VecDeque<_> = ips.iter()
.map(|ip| SocketAddr::new(ip, self.port))
.collect();
if addrs.is_empty() {
Err(ConnectorError::Resolver(
"Expect at least one A dns record".to_owned(),
))
} else {
Ok(Async::Ready(addrs))
}
}
Err(err) => Err(ConnectorError::Resolver(format!("{}", err))),
}
}
}
}
pub struct TcpConnector {
addrs: VecDeque<SocketAddr>,
timeout: Timeout,
stream: Option<TcpStreamNew>,
}
impl TcpConnector {
pub fn new(addrs: VecDeque<SocketAddr>) -> TcpConnector {
TcpConnector::with_timeout(addrs, Duration::from_secs(1))
}
pub fn with_timeout(addrs: VecDeque<SocketAddr>, timeout: Duration) -> TcpConnector {
TcpConnector {
addrs,
stream: None,
timeout: Timeout::new(timeout, Arbiter::handle()).unwrap(),
}
}
}
impl ActorFuture for TcpConnector {
type Item = TcpStream;
type Error = ConnectorError;
type Actor = Connector;
fn poll(
&mut self, _: &mut Connector, _: &mut Context<Connector>
) -> Poll<Self::Item, Self::Error> {
if let Ok(Async::Ready(_)) = self.timeout.poll() {
return Err(ConnectorError::Timeout);
}
loop {
if let Some(new) = self.stream.as_mut() {
match new.poll() {
Ok(Async::Ready(sock)) => return Ok(Async::Ready(sock)),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(err) => {
if self.addrs.is_empty() {
return Err(ConnectorError::IoError(err));
}
}
}
}
let addr = self.addrs.pop_front().unwrap();
self.stream = Some(TcpStream::connect(&addr, Arbiter::handle()));
}
}
}