tokio-ping 0.3.0

Asynchronous ICMP pinging library
Documentation
use std::io;
use std::sync::Arc;

use futures;
use std::net::SocketAddr;
use mio::Ready;
use tokio_reactor::{Handle, PollEvented};
use socket2::{Domain, Protocol, SockAddr, Type};

use socket::mio;

#[derive(Clone)]
pub struct Socket {
    socket: Arc<PollEvented<mio::Socket>>,
}

impl Socket {
    pub fn new(
        domain: Domain,
        type_: Type,
        protocol: Protocol,
        handle: &Handle,
    ) -> io::Result<Self> {
        let socket = mio::Socket::new(domain, type_, protocol)?;
        let socket = PollEvented::new_with_handle(socket, handle)?;
        Ok(Self {
            socket: Arc::new(socket),
        })
    }

    pub fn send_to<T>(&self, buf: T, target: &SocketAddr) -> Send<T>
    where
        T: AsRef<[u8]>,
    {
        Send {
            state: SendState::Writing {
                socket: self.socket.clone(),
                addr: target.clone().into(),
                buf: buf,
            },
        }
    }

    pub fn recv(&self, buffer: &mut [u8]) -> futures::Poll<usize, io::Error> {
        try_ready!(self.socket.poll_read_ready(Ready::readable()));

        match self.socket.get_ref().recv(buffer) {
            Ok(n) => Ok(n.into()),
            Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => {
                self.socket.clear_read_ready(Ready::readable())?;
                Ok(futures::Async::NotReady)
            }
            Err(e) => Err(e),
        }
    }
}

pub struct Send<T> {
    state: SendState<T>,
}

enum SendState<T> {
    Writing {
        socket: Arc<PollEvented<mio::Socket>>,
        buf: T,
        addr: SockAddr,
    },
    Empty,
}

fn send_to(
    socket: &Arc<PollEvented<mio::Socket>>,
    buf: &[u8],
    target: &SockAddr,
) -> futures::Poll<usize, io::Error> {
    try_ready!(socket.poll_write_ready());

    match socket.get_ref().send_to(buf, target) {
        Ok(n) => Ok(n.into()),
        Err(e) => {
            if e.kind() == io::ErrorKind::WouldBlock {
                socket.clear_write_ready()?;
                Ok(futures::Async::NotReady)
            } else {
                Err(e)
            }
        }
    }
}

impl<T> futures::Future for Send<T>
where
    T: AsRef<[u8]>,
{
    type Item = ();
    type Error = io::Error;

    fn poll(&mut self) -> futures::Poll<(), io::Error> {
        match self.state {
            SendState::Writing {
                ref socket,
                ref buf,
                ref addr,
            } => {
                let n = try_ready!(send_to(socket, buf.as_ref(), addr));
                if n != buf.as_ref().len() {
                    return Err(io::Error::new(
                        io::ErrorKind::Other,
                        "failed to send entire packet",
                    ));
                }
            }
            SendState::Empty => panic!("poll a Send after it's done"),
        }

        match ::std::mem::replace(&mut self.state, SendState::Empty) {
            SendState::Writing { .. } => Ok(futures::Async::Ready(())),
            SendState::Empty => unreachable!(),
        }
    }
}