1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
use crate::{CmsgIter, Frame, Timestamping};
use futures::future::poll_fn;
use futures::ready;
use mio::event::Evented;
use mio::unix::{EventedFd, UnixReady};
use mio::Ready;
use mio::{PollOpt, Token};
use std::ffi::CStr;
use std::io::{ErrorKind, Result};
use std::os::unix::io::{AsRawFd, RawFd};
use std::task::Poll;
use tokio::io::PollEvented;

pub struct Socket(PollEvented<Inner>);

impl Socket {
    pub fn bind<I>(ifname: I) -> Result<Self>
    where
        I: AsRef<CStr>,
    {
        let socket = crate::Socket::bind(ifname)?;
        socket.set_nonblocking(true)?;
        Ok(Self(PollEvented::new(Inner(socket))?))
    }

    pub fn set_timestamping(&self, timestamping: Timestamping) -> Result<()> {
        self.0.get_ref().0.set_timestamping(timestamping)
    }

    pub fn set_recv_own_msgs(&self, enable: bool) -> Result<()> {
        self.0.get_ref().0.set_recv_own_msgs(enable)
    }

    pub fn set_fd_frames(&self, enable: bool) -> Result<()> {
        self.0.get_ref().0.set_fd_frames(enable)
    }

    pub async fn recv(&mut self) -> Result<Frame> {
        let ready = Ready::readable() | Ready::from(UnixReady::error());
        poll_fn(|cx| {
            ready!(self.0.poll_read_ready(cx, ready))?;
            match self.0.get_ref().0.recv() {
                Err(e) if e.kind() == ErrorKind::WouldBlock => {
                    self.0.clear_read_ready(cx, ready)?;
                    Poll::Pending
                }
                r => Poll::Ready(r),
            }
        })
        .await
    }

    #[allow(clippy::needless_lifetimes)]
    pub async fn recv_msg<'a>(
        &mut self,
        cmsg_buf: &'a mut [u8],
    ) -> Result<(Frame, Option<CmsgIter<'a>>)> {
        let ready = Ready::readable() | Ready::from(UnixReady::error());
        let mut cmsg_buf = Some(cmsg_buf);
        poll_fn(|cx| {
            ready!(self.0.poll_read_ready(cx, ready))?;
            match self.0.get_ref().0._recv_msg(cmsg_buf.take().unwrap()) {
                Err((e, b)) if e.kind() == ErrorKind::WouldBlock => {
                    cmsg_buf = Some(b);
                    self.0.clear_read_ready(cx, ready)?;
                    Poll::Pending
                }
                r => Poll::Ready(r.map_err(|(e, _)| e)),
            }
        })
        .await
    }

    pub async fn send(&mut self, frame: &Frame) -> Result<()> {
        poll_fn(|cx| {
            ready!(self.0.poll_write_ready(cx))?;
            match self.0.get_ref().0.send(frame) {
                Err(e) if e.kind() == ErrorKind::WouldBlock => {
                    self.0.clear_write_ready(cx)?;
                    Poll::Pending
                }
                r => Poll::Ready(r),
            }
        })
        .await
    }
}

impl AsRawFd for Socket {
    fn as_raw_fd(&self) -> RawFd {
        self.0.get_ref().0.as_raw_fd()
    }
}

struct Inner(crate::Socket);

impl Evented for Inner {
    fn register(
        &self,
        poll: &mio::Poll,
        token: Token,
        interest: Ready,
        opts: PollOpt,
    ) -> Result<()> {
        EventedFd(&self.0.as_raw_fd()).register(poll, token, interest, opts)
    }

    fn reregister(
        &self,
        poll: &mio::Poll,
        token: Token,
        interest: Ready,
        opts: PollOpt,
    ) -> Result<()> {
        EventedFd(&self.0.as_raw_fd()).reregister(poll, token, interest, opts)
    }

    fn deregister(&self, poll: &mio::Poll) -> Result<()> {
        EventedFd(&self.0.as_raw_fd()).deregister(poll)
    }
}

#[cfg(test)]
mod tests;