1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124
use crate::{CmsgIter, Frame, Timestamping};
use futures::future::poll_fn;
use futures::ready;
use mio::event::Evented;
use mio::unix::{EventedFd, UnixReady};
use mio::Ready;
use mio::{PollOpt, Token};
use std::ffi::CStr;
use std::io::{ErrorKind, Result};
use std::os::unix::io::{AsRawFd, RawFd};
use std::task::Poll;
use tokio::io::PollEvented;

/// Asynchronous socket: a non-blocking [`crate::Socket`] wrapped in a tokio
/// [`PollEvented`] so that receive and send operations can be awaited.
pub struct Socket(PollEvented<Inner>);

impl Socket {
    /// Binds a socket to the interface named `ifname`.
    ///
    /// The socket is created with [`crate::Socket::bind`], switched to
    /// non-blocking mode and then wrapped in a [`PollEvented`].
    ///
    /// # Errors
    ///
    /// Returns the error of whichever of those three steps fails first.
    pub fn bind<I>(ifname: I) -> Result<Self>
    where
        I: AsRef<CStr>,
    {
        let raw = crate::Socket::bind(ifname)?;
        raw.set_nonblocking(true)?;
        let evented = PollEvented::new(Inner(raw))?;
        Ok(Self(evented))
    }

    /// Forwards `timestamping` to the underlying socket's `set_timestamping`.
    pub fn set_timestamping(&self, timestamping: Timestamping) -> Result<()> {
        self.raw().set_timestamping(timestamping)
    }

    /// Forwards `enable` to the underlying socket's `set_recv_own_msgs`.
    pub fn set_recv_own_msgs(&self, enable: bool) -> Result<()> {
        self.raw().set_recv_own_msgs(enable)
    }

    /// Forwards `enable` to the underlying socket's `set_fd_frames`.
    pub fn set_fd_frames(&self, enable: bool) -> Result<()> {
        self.raw().set_fd_frames(enable)
    }

    /// Receives one frame, waiting until the socket reports read readiness.
    ///
    /// # Errors
    ///
    /// Any error other than `WouldBlock` from the readiness poll or from the
    /// underlying `recv` is returned to the caller.
    pub async fn recv(&mut self) -> Result<Frame> {
        let interest = Self::read_interest();
        poll_fn(|cx| {
            ready!(self.0.poll_read_ready(cx, interest))?;
            match self.raw().recv() {
                Ok(frame) => Poll::Ready(Ok(frame)),
                Err(err) => {
                    if err.kind() == ErrorKind::WouldBlock {
                        // Readiness was stale: clear it so the task is woken
                        // on the next readiness event, then retry.
                        self.0.clear_read_ready(cx, interest)?;
                        Poll::Pending
                    } else {
                        Poll::Ready(Err(err))
                    }
                }
            }
        })
        .await
    }

    /// Receives one frame together with the control messages written into
    /// `cmsg_buf`, waiting until the socket reports read readiness.
    ///
    /// # Errors
    ///
    /// Any error other than `WouldBlock` from the readiness poll or from the
    /// underlying `_recv_msg` is returned to the caller.
    #[allow(clippy::needless_lifetimes)]
    pub async fn recv_msg<'a>(
        &mut self,
        cmsg_buf: &'a mut [u8],
    ) -> Result<(Frame, Option<CmsgIter<'a>>)> {
        let interest = Self::read_interest();
        // `_recv_msg` consumes the buffer and hands it back on failure, so it
        // is parked in an `Option` between polls.
        let mut parked_buf = Some(cmsg_buf);
        poll_fn(|cx| {
            ready!(self.0.poll_read_ready(cx, interest))?;
            // The buffer is restored below before every `Pending`, so it is
            // present on each retry.
            let buf = parked_buf.take().unwrap();
            match self.raw()._recv_msg(buf) {
                Ok(received) => Poll::Ready(Ok(received)),
                Err((err, returned_buf)) => {
                    if err.kind() == ErrorKind::WouldBlock {
                        parked_buf = Some(returned_buf);
                        self.0.clear_read_ready(cx, interest)?;
                        Poll::Pending
                    } else {
                        Poll::Ready(Err(err))
                    }
                }
            }
        })
        .await
    }

    /// Sends `frame`, waiting until the socket reports write readiness.
    ///
    /// # Errors
    ///
    /// Any error other than `WouldBlock` from the readiness poll or from the
    /// underlying `send` is returned to the caller.
    pub async fn send(&mut self, frame: &Frame) -> Result<()> {
        poll_fn(|cx| {
            ready!(self.0.poll_write_ready(cx))?;
            match self.raw().send(frame) {
                Ok(sent) => Poll::Ready(Ok(sent)),
                Err(err) => {
                    if err.kind() == ErrorKind::WouldBlock {
                        // Readiness was stale: clear it and wait for the next
                        // write-readiness event.
                        self.0.clear_write_ready(cx)?;
                        Poll::Pending
                    } else {
                        Poll::Ready(Err(err))
                    }
                }
            }
        })
        .await
    }

    /// Borrows the wrapped blocking-API socket.
    fn raw(&self) -> &crate::Socket {
        let Inner(raw) = self.0.get_ref();
        raw
    }

    /// Readiness mask used by the receive paths: readable or error.
    fn read_interest() -> Ready {
        Ready::readable() | Ready::from(UnixReady::error())
    }
}

impl AsRawFd for Socket {
    /// Returns the file descriptor of the wrapped socket.
    fn as_raw_fd(&self) -> RawFd {
        self.raw().as_raw_fd()
    }
}

/// Newtype around [`crate::Socket`] that implements [`Evented`] by
/// delegating to [`EventedFd`] on the socket's raw file descriptor.
struct Inner(crate::Socket);

impl Evented for Inner {
    fn register(
        &self,
        poll: &mio::Poll,
        token: Token,
        interest: Ready,
        opts: PollOpt,
    ) -> Result<()> {
        let fd = self.0.as_raw_fd();
        EventedFd(&fd).register(poll, token, interest, opts)
    }

    fn reregister(
        &self,
        poll: &mio::Poll,
        token: Token,
        interest: Ready,
        opts: PollOpt,
    ) -> Result<()> {
        let fd = self.0.as_raw_fd();
        EventedFd(&fd).reregister(poll, token, interest, opts)
    }

    fn deregister(&self, poll: &mio::Poll) -> Result<()> {
        let fd = self.0.as_raw_fd();
        EventedFd(&fd).deregister(poll)
    }
}

#[cfg(test)]
mod tests;