tokio 1.37.0

An event-driven, non-blocking I/O platform for writing asynchronous I/O backed applications.
Documentation
#![warn(rust_2018_idioms)]
#![cfg(all(target_os = "freebsd", feature = "net"))]

use mio_aio::{AioFsyncMode, SourceApi};
use std::{
    future::Future,
    io, mem,
    os::unix::io::{AsRawFd, RawFd},
    pin::{pin, Pin},
    task::{Context, Poll},
};
use tempfile::tempfile;
use tokio::io::bsd::{Aio, AioSource};
use tokio_test::assert_pending;

mod aio {
    use super::*;

    #[derive(Debug)]
    struct TokioSource(mio_aio::Source<nix::sys::aio::AioFsync>);

    impl AioSource for TokioSource {
        fn register(&mut self, kq: RawFd, token: usize) {
            self.0.register_raw(kq, token)
        }
        fn deregister(&mut self) {
            self.0.deregister_raw()
        }
    }

    /// A very crude implementation of an AIO-based future
    struct FsyncFut(Aio<TokioSource>);

    impl FsyncFut {
        pub fn submit(self: Pin<&mut Self>) -> io::Result<()> {
            let p = unsafe { self.map_unchecked_mut(|s| &mut s.0 .0) };
            match p.submit() {
                Ok(()) => Ok(()),
                Err(e) => Err(io::Error::from_raw_os_error(e as i32)),
            }
        }
    }

    impl Future for FsyncFut {
        type Output = io::Result<()>;

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let poll_result = self.0.poll_ready(cx);
            match poll_result {
                Poll::Pending => Poll::Pending,
                Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
                Poll::Ready(Ok(_ev)) => {
                    // At this point, we could clear readiness.  But there's no
                    // point, since we're about to drop the Aio.
                    let p = unsafe { self.map_unchecked_mut(|s| &mut s.0 .0) };
                    let result = p.aio_return();
                    match result {
                        Ok(r) => Poll::Ready(Ok(r)),
                        Err(e) => Poll::Ready(Err(io::Error::from_raw_os_error(e as i32))),
                    }
                }
            }
        }
    }

    /// Low-level AIO Source
    ///
    /// An example bypassing mio_aio and Nix to demonstrate how the kevent
    /// registration actually works, under the hood.
    struct LlSource(Pin<Box<libc::aiocb>>);

    impl LlSource {
        fn fsync(mut self: Pin<&mut Self>) {
            let r = unsafe {
                let p = self.0.as_mut().get_unchecked_mut();
                libc::aio_fsync(libc::O_SYNC, p)
            };
            assert_eq!(0, r);
        }
    }

    impl AioSource for LlSource {
        fn register(&mut self, kq: RawFd, token: usize) {
            let mut sev: libc::sigevent = unsafe { mem::MaybeUninit::zeroed().assume_init() };
            sev.sigev_notify = libc::SIGEV_KEVENT;
            sev.sigev_signo = kq;
            sev.sigev_value = libc::sigval {
                sival_ptr: token as *mut libc::c_void,
            };
            self.0.aio_sigevent = sev;
        }

        fn deregister(&mut self) {
            unsafe {
                self.0.aio_sigevent = mem::zeroed();
            }
        }
    }

    struct LlFut(Aio<LlSource>);

    impl LlFut {
        pub fn fsync(self: Pin<&mut Self>) {
            let p = unsafe { self.map_unchecked_mut(|s| &mut *(s.0)) };
            p.fsync();
        }
    }

    impl Future for LlFut {
        type Output = std::io::Result<usize>;

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let poll_result = self.0.poll_ready(cx);
            match poll_result {
                Poll::Pending => Poll::Pending,
                Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
                Poll::Ready(Ok(ev)) => {
                    // Clearing readiness makes the future non-idempotent; the
                    // caller can't poll it repeatedly after it has already
                    // returned Ready.  But that's ok; most futures behave this
                    // way.
                    self.0.clear_ready(ev);
                    let r = unsafe { libc::aio_return(self.0 .0.as_mut().get_unchecked_mut()) };
                    if r >= 0 {
                        Poll::Ready(Ok(r as usize))
                    } else {
                        Poll::Ready(Err(io::Error::last_os_error()))
                    }
                }
            }
        }
    }

    #[tokio::test]
    async fn fsync() {
        let f = tempfile().unwrap();
        let fd = f.as_raw_fd();
        let mode = AioFsyncMode::O_SYNC;
        let source = TokioSource(mio_aio::Fsync::fsync(fd, mode, 0));
        let poll_aio = Aio::new_for_aio(source).unwrap();
        let mut fut = pin!(FsyncFut(poll_aio));
        fut.as_mut().submit().unwrap();
        fut.await.unwrap();
    }

    #[tokio::test]
    async fn ll_fsync() {
        let f = tempfile().unwrap();
        let fd = f.as_raw_fd();
        let mut aiocb: libc::aiocb = unsafe { mem::MaybeUninit::zeroed().assume_init() };
        aiocb.aio_fildes = fd;
        let source = LlSource(Box::pin(aiocb));
        let mut poll_aio = Aio::new_for_aio(source).unwrap();
        let r = unsafe {
            let p = poll_aio.0.as_mut().get_unchecked_mut();
            libc::aio_fsync(libc::O_SYNC, p)
        };
        assert_eq!(0, r);
        let fut = LlFut(poll_aio);
        fut.await.unwrap();
    }

    /// A suitably crafted future type can reuse an Aio object
    #[tokio::test]
    async fn reuse() {
        let f = tempfile().unwrap();
        let fd = f.as_raw_fd();
        let mut aiocb: libc::aiocb = unsafe { mem::MaybeUninit::zeroed().assume_init() };
        aiocb.aio_fildes = fd;
        let source = LlSource(Box::pin(aiocb));
        let poll_aio = Aio::new_for_aio(source).unwrap();

        // Send the operation to the kernel the first time
        let mut fut = LlFut(poll_aio);
        {
            let mut pfut = Pin::new(&mut fut);
            pfut.as_mut().fsync();
            pfut.as_mut().await.unwrap();
        }

        // Check that readiness was cleared
        let mut ctx = Context::from_waker(futures::task::noop_waker_ref());
        assert_pending!(fut.0.poll_ready(&mut ctx));

        // and reuse the future and its Aio object
        {
            let mut pfut = Pin::new(&mut fut);
            pfut.as_mut().fsync();
            pfut.as_mut().await.unwrap();
        }
    }
}

mod lio {
    use super::*;

    /// Low-level source based on lio_listio
    ///
    /// An example demonstrating using AIO with `Interest::Lio`.  mio_aio 0.8
    /// doesn't include any bindings for lio_listio, so we've got to go
    /// low-level.
    struct LioSource<'a> {
        aiocb: Pin<&'a mut [&'a mut libc::aiocb; 1]>,
        sev: libc::sigevent,
    }

    impl<'a> LioSource<'a> {
        fn new(aiocb: Pin<&'a mut [&'a mut libc::aiocb; 1]>) -> Self {
            LioSource {
                aiocb,
                sev: unsafe { mem::zeroed() },
            }
        }

        fn submit(mut self: Pin<&mut Self>) {
            let p: *const *mut libc::aiocb =
                unsafe { self.aiocb.as_mut().get_unchecked_mut() } as *const _ as *const *mut _;
            let r = unsafe { libc::lio_listio(libc::LIO_NOWAIT, p, 1, &mut self.sev) };
            assert_eq!(r, 0);
        }
    }

    impl<'a> AioSource for LioSource<'a> {
        fn register(&mut self, kq: RawFd, token: usize) {
            let mut sev: libc::sigevent = unsafe { mem::MaybeUninit::zeroed().assume_init() };
            sev.sigev_notify = libc::SIGEV_KEVENT;
            sev.sigev_signo = kq;
            sev.sigev_value = libc::sigval {
                sival_ptr: token as *mut libc::c_void,
            };
            self.sev = sev;
        }

        fn deregister(&mut self) {
            unsafe {
                self.sev = mem::zeroed();
            }
        }
    }

    struct LioFut<'a>(Aio<LioSource<'a>>);

    impl<'a> LioFut<'a> {
        pub fn submit(self: Pin<&mut Self>) {
            let p = unsafe { self.map_unchecked_mut(|s| &mut *(s.0)) };
            p.submit();
        }
    }

    impl<'a> Future for LioFut<'a> {
        type Output = std::io::Result<usize>;

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let poll_result = self.0.poll_ready(cx);
            match poll_result {
                Poll::Pending => Poll::Pending,
                Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
                Poll::Ready(Ok(ev)) => {
                    // Clearing readiness makes the future non-idempotent; the
                    // caller can't poll it repeatedly after it has already
                    // returned Ready.  But that's ok; most futures behave this
                    // way.  Clearing readiness is especially useful for
                    // lio_listio, because sometimes some operations will be
                    // ready but not all.
                    self.0.clear_ready(ev);
                    let r = unsafe {
                        let p1 = self.get_unchecked_mut();
                        let p2: &mut [&mut libc::aiocb; 1] =
                            p1.0.aiocb.as_mut().get_unchecked_mut();
                        let p3: &mut libc::aiocb = p2[0];
                        libc::aio_return(p3)
                    };
                    if r >= 0 {
                        Poll::Ready(Ok(r as usize))
                    } else {
                        Poll::Ready(Err(io::Error::last_os_error()))
                    }
                }
            }
        }
    }

    /// An lio_listio operation with one fsync element
    #[tokio::test]
    async fn onewrite() {
        const WBUF: &[u8] = b"abcdef";
        let f = tempfile().unwrap();

        let mut aiocb: libc::aiocb = unsafe { mem::zeroed() };
        aiocb.aio_fildes = f.as_raw_fd();
        aiocb.aio_lio_opcode = libc::LIO_WRITE;
        aiocb.aio_nbytes = WBUF.len();
        aiocb.aio_buf = WBUF.as_ptr() as *mut _;
        let aiocb = pin!([&mut aiocb]);
        let source = LioSource::new(aiocb);
        let poll_aio = Aio::new_for_lio(source).unwrap();

        // Send the operation to the kernel
        let mut fut = pin!(LioFut(poll_aio));
        fut.as_mut().submit();
        fut.await.unwrap();
    }

    /// A suitably crafted future type can reuse an Aio object
    #[tokio::test]
    async fn reuse() {
        const WBUF: &[u8] = b"abcdef";
        let f = tempfile().unwrap();

        let mut aiocb: libc::aiocb = unsafe { mem::zeroed() };
        aiocb.aio_fildes = f.as_raw_fd();
        aiocb.aio_lio_opcode = libc::LIO_WRITE;
        aiocb.aio_nbytes = WBUF.len();
        aiocb.aio_buf = WBUF.as_ptr() as *mut _;
        let aiocb = pin!([&mut aiocb]);
        let source = LioSource::new(aiocb);
        let poll_aio = Aio::new_for_lio(source).unwrap();

        // Send the operation to the kernel the first time
        let mut fut = LioFut(poll_aio);
        {
            let mut pfut = Pin::new(&mut fut);
            pfut.as_mut().submit();
            pfut.as_mut().await.unwrap();
        }

        // Check that readiness was cleared
        let mut ctx = Context::from_waker(futures::task::noop_waker_ref());
        assert_pending!(fut.0.poll_ready(&mut ctx));

        // and reuse the future and its Aio object
        {
            let mut pfut = Pin::new(&mut fut);
            pfut.as_mut().submit();
            pfut.as_mut().await.unwrap();
        }
    }
}