actix 0.5.7

Actor framework for Rust
Documentation
//! DNS resolver and connector utility actor
//!
//! ## Example
//!
//! ```rust
//! # extern crate actix;
//! # extern crate futures;
//! # use futures::{future, Future};
//! use actix::prelude::*;
//! use actix::actors;
//!
//! fn main() {
//!     let sys = System::new("test");
//!
//!     Arbiter::handle().spawn({
//!         let resolver: Addr<Unsync, _> = actors::Connector::from_registry();
//!
//!         resolver.send(
//!             actors::Resolve::host("localhost"))       // <- resolve "localhost"
//!                 .then(|addrs| {
//!                     println!("RESULT: {:?}", addrs);
//! #                   Arbiter::system().do_send(actix::msgs::SystemExit(0));
//!                     Ok::<_, ()>(())
//!                 })
//!    });
//!
//!     Arbiter::handle().spawn({
//!         let resolver: Addr<Unsync, _> = actors::Connector::from_registry();
//!
//!         resolver.send(
//!             actors::Connect::host("localhost:5000"))  // <- connect to a "localhost"
//!                 .then(|stream| {
//!                     println!("RESULT: {:?}", stream);
//!                     Ok::<_, ()>(())
//!                 })
//!    });
//!
//!    sys.run();
//! }
//! ```
use std::collections::VecDeque;
use std::io;
use std::net::SocketAddr;
use std::time::Duration;

use futures::{Async, Future, Poll};
use tokio_core::net::{TcpStream, TcpStreamNew};
use tokio_core::reactor::Timeout;
use trust_dns_resolver::ResolverFuture;
use trust_dns_resolver::config::{ResolverConfig, ResolverOpts};
use trust_dns_resolver::lookup_ip::LookupIpFuture;

use prelude::*;

#[derive(Eq, PartialEq, Debug)]
pub struct Resolve {
    name: String,
    port: Option<u16>,
}

impl Resolve {
    pub fn host<T: AsRef<str>>(host: T) -> Resolve {
        Resolve {
            name: host.as_ref().to_owned(),
            port: None,
        }
    }

    pub fn host_and_port<T: AsRef<str>>(host: T, port: u16) -> Resolve {
        Resolve {
            name: host.as_ref().to_owned(),
            port: Some(port),
        }
    }
}

impl Message for Resolve {
    type Result = Result<VecDeque<SocketAddr>, ConnectorError>;
}

#[derive(Eq, PartialEq, Debug)]
pub struct Connect {
    name: String,
    port: Option<u16>,
    timeout: Duration,
}

impl Connect {
    pub fn host<T: AsRef<str>>(host: T) -> Connect {
        Connect {
            name: host.as_ref().to_owned(),
            port: None,
            timeout: Duration::from_secs(1),
        }
    }

    pub fn host_and_port<T: AsRef<str>>(host: T, port: u16) -> Connect {
        Connect {
            name: host.as_ref().to_owned(),
            port: Some(port),
            timeout: Duration::from_secs(1),
        }
    }

    /// Set connect timeout
    ///
    /// By default timeout is set to a 1 second.
    pub fn timeout(mut self, timeout: Duration) -> Connect {
        self.timeout = timeout;
        self
    }
}

impl Message for Connect {
    type Result = Result<TcpStream, ConnectorError>;
}

#[derive(Eq, PartialEq, Debug)]
pub struct ConnectAddr(pub SocketAddr);

impl Message for ConnectAddr {
    type Result = Result<TcpStream, ConnectorError>;
}

#[derive(Fail, Debug)]
pub enum ConnectorError {
    /// Failed to resolve the hostname
    #[fail(display = "Failed resolving hostname: {}", _0)]
    Resolver(String),

    /// Address is invalid
    #[fail(display = "Invalid input: {}", _0)]
    InvalidInput(&'static str),

    /// Connecting took too long
    #[fail(display = "Timeout out while establishing connection")]
    Timeout,

    /// Connection io error
    #[fail(display = "{}", _0)]
    IoError(io::Error),
}

pub struct Connector {
    resolver: ResolverFuture,
}

impl Actor for Connector {
    type Context = Context<Self>;
}

impl Supervised for Connector {}

impl actix::ArbiterService for Connector {}

impl Default for Connector {
    #[cfg(unix)]
    fn default() -> Connector {
        let resolver = match ResolverFuture::from_system_conf(Arbiter::handle()) {
            Ok(resolver) => resolver,
            Err(err) => {
                warn!("Can not create system dns resolver: {}", err);
                ResolverFuture::new(
                    ResolverConfig::default(),
                    ResolverOpts::default(),
                    Arbiter::handle(),
                )
            }
        };
        Connector { resolver }
    }

    #[cfg(not(unix))]
    fn default() -> Connector {
        let resolver = ResolverFuture::new(
            ResolverConfig::default(),
            ResolverOpts::default(),
            Arbiter::handle(),
        );
        Connector {
            resolver: resolver,
        }
    }
}

impl Handler<Resolve> for Connector {
    type Result = ResponseActFuture<Self, VecDeque<SocketAddr>, ConnectorError>;

    fn handle(&mut self, msg: Resolve, _: &mut Self::Context) -> Self::Result {
        Box::new(Resolver::new(
            msg.name,
            msg.port.unwrap_or(0),
            &self.resolver,
        ))
    }
}

impl Handler<Connect> for Connector {
    type Result = ResponseActFuture<Self, TcpStream, ConnectorError>;

    fn handle(&mut self, msg: Connect, _: &mut Self::Context) -> Self::Result {
        let timeout = msg.timeout;
        Box::new(
            Resolver::new(msg.name, msg.port.unwrap_or(0), &self.resolver)
                .and_then(move |addrs, _, _| TcpConnector::with_timeout(addrs, timeout)),
        )
    }
}

impl Handler<ConnectAddr> for Connector {
    type Result = ResponseActFuture<Self, TcpStream, ConnectorError>;

    fn handle(&mut self, msg: ConnectAddr, _: &mut Self::Context) -> Self::Result {
        let mut v = VecDeque::new();
        v.push_back(msg.0);
        Box::new(TcpConnector::new(v))
    }
}

/// Resolver future
struct Resolver {
    lookup: Option<LookupIpFuture>,
    port: u16,
    addrs: Option<VecDeque<SocketAddr>>,
    error: Option<ConnectorError>,
}

impl Resolver {
    pub fn new<S: AsRef<str>>(
        addr: S, port: u16, resolver: &ResolverFuture
    ) -> Resolver {
        // try to parse as a regular SocketAddr first
        if let Ok(addr) = addr.as_ref().parse() {
            let mut addrs = VecDeque::new();
            addrs.push_back(addr);

            Resolver {
                port,
                lookup: None,
                addrs: Some(addrs),
                error: None,
            }
        } else {
            // we need to do dns resolution
            match Resolver::parse(addr.as_ref(), port) {
                Ok((host, port)) => Resolver {
                    port,
                    lookup: Some(resolver.lookup_ip(host)),
                    addrs: None,
                    error: None,
                },
                Err(err) => Resolver {
                    port,
                    lookup: None,
                    addrs: None,
                    error: Some(err),
                },
            }
        }
    }

    fn parse(addr: &str, port: u16) -> Result<(&str, u16), ConnectorError> {
        macro_rules! try_opt {
            ($e:expr, $msg:expr) => {
                match $e {
                    Some(r) => r,
                    None => return Err(ConnectorError::InvalidInput($msg)),
                }
            };
        }

        // split the string by ':' and convert the second part to u16
        let mut parts_iter = addr.splitn(2, ':');
        let host = try_opt!(parts_iter.next(), "invalid socket address");
        let port_str = parts_iter.next().unwrap_or("");
        let port: u16 = port_str.parse().unwrap_or(port);

        Ok((host, port))
    }
}

impl ActorFuture for Resolver {
    type Item = VecDeque<SocketAddr>;
    type Error = ConnectorError;
    type Actor = Connector;

    fn poll(
        &mut self, _: &mut Connector, _: &mut Context<Connector>
    ) -> Poll<Self::Item, Self::Error> {
        if let Some(err) = self.error.take() {
            Err(err)
        } else if let Some(addrs) = self.addrs.take() {
            Ok(Async::Ready(addrs))
        } else {
            match self.lookup.as_mut().unwrap().poll() {
                Ok(Async::NotReady) => Ok(Async::NotReady),
                Ok(Async::Ready(ips)) => {
                    let addrs: VecDeque<_> = ips.iter()
                        .map(|ip| SocketAddr::new(ip, self.port))
                        .collect();
                    if addrs.is_empty() {
                        Err(ConnectorError::Resolver(
                            "Expect at least one A dns record".to_owned(),
                        ))
                    } else {
                        Ok(Async::Ready(addrs))
                    }
                }
                Err(err) => Err(ConnectorError::Resolver(format!("{}", err))),
            }
        }
    }
}

/// Tcp stream connector
pub struct TcpConnector {
    addrs: VecDeque<SocketAddr>,
    timeout: Timeout,
    stream: Option<TcpStreamNew>,
}

impl TcpConnector {
    pub fn new(addrs: VecDeque<SocketAddr>) -> TcpConnector {
        TcpConnector::with_timeout(addrs, Duration::from_secs(1))
    }

    pub fn with_timeout(addrs: VecDeque<SocketAddr>, timeout: Duration) -> TcpConnector {
        TcpConnector {
            addrs,
            stream: None,
            timeout: Timeout::new(timeout, Arbiter::handle()).unwrap(),
        }
    }
}

impl ActorFuture for TcpConnector {
    type Item = TcpStream;
    type Error = ConnectorError;
    type Actor = Connector;

    fn poll(
        &mut self, _: &mut Connector, _: &mut Context<Connector>
    ) -> Poll<Self::Item, Self::Error> {
        // timeout
        if let Ok(Async::Ready(_)) = self.timeout.poll() {
            return Err(ConnectorError::Timeout);
        }

        // connect
        loop {
            if let Some(new) = self.stream.as_mut() {
                match new.poll() {
                    Ok(Async::Ready(sock)) => return Ok(Async::Ready(sock)),
                    Ok(Async::NotReady) => return Ok(Async::NotReady),
                    Err(err) => {
                        if self.addrs.is_empty() {
                            return Err(ConnectorError::IoError(err));
                        }
                    }
                }
            }

            // try to connect
            let addr = self.addrs.pop_front().unwrap();
            self.stream = Some(TcpStream::connect(&addr, Arbiter::handle()));
        }
    }
}