1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107
use std::convert::TryFrom; use std::io::{Error, ErrorKind, Result}; use std::os::unix::io::{AsRawFd, RawFd}; use std::pin::Pin; use std::task::{Context, Poll, Poll::*}; use tokio::io::{unix, AsyncRead, AsyncWrite, ReadBuf}; pub struct AsyncFd(unix::AsyncFd<RawFd>); impl TryFrom<RawFd> for AsyncFd { type Error = Error; fn try_from(fd: RawFd) -> Result<Self> { set_nonblock(fd)?; Ok(Self(unix::AsyncFd::new(fd)?)) } } impl AsRawFd for AsyncFd { fn as_raw_fd(&self) -> RawFd { *self.0.get_ref() } } impl AsyncRead for AsyncFd { fn poll_read( self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>, ) -> Poll<Result<()>> { loop { let mut ready = match self.0.poll_read_ready(cx) { Ready(x) => x?, Pending => return Pending, }; let ret = unsafe { libc::read( self.as_raw_fd(), buf.unfilled_mut() as *mut _ as _, buf.remaining(), ) }; return if ret < 0 { let e = Error::last_os_error(); if e.kind() == ErrorKind::WouldBlock { ready.clear_ready(); continue; } else { Ready(Err(e)) } } else { let n = ret as usize; unsafe { buf.assume_init(n) }; buf.advance(n); Ready(Ok(())) }; } } } impl AsyncWrite for AsyncFd { fn poll_write(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &[u8]) -> Poll<Result<usize>> { loop { let mut ready = match self.0.poll_write_ready(cx) { Ready(x) => x?, Pending => return Pending, }; let ret = unsafe { libc::write(self.as_raw_fd(), buf.as_ptr() as _, buf.len()) }; return if ret < 0 { let e = Error::last_os_error(); if e.kind() == ErrorKind::WouldBlock { ready.clear_ready(); continue; } else { Ready(Err(e)) } } else { Ready(Ok(ret as usize)) }; } } fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<()>> { Ready(Ok(())) } fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<()>> { Ready(Ok(())) } } fn set_nonblock(fd: RawFd) -> Result<()> { let flags = unsafe { libc::fcntl(fd, libc::F_GETFL) }; if flags < 0 { return Err(Error::last_os_error()); } match unsafe { libc::fcntl(fd, libc::F_SETFL, flags | libc::O_NONBLOCK) } { 0 => Ok(()), _ => Err(Error::last_os_error()), } }