tokio_wasi 1.23.0

An event-driven, non-blocking I/O platform for writing asynchronous I/O backed applications.
#![warn(rust_2018_idioms)]
#![cfg(all(target_os = "freebsd", feature = "net"))]

use mio_aio::{AioCb, AioFsyncMode, LioCb};
use std::{
    future::Future,
    mem,
    os::unix::io::{AsRawFd, RawFd},
    pin::Pin,
    task::{Context, Poll},
};
use tempfile::tempfile;
use tokio::io::bsd::{Aio, AioSource};
use tokio_test::assert_pending;

mod aio {
    use super::*;

    /// Adapts mio_aio::AioCb (which implements mio::event::Source) to AioSource
    struct WrappedAioCb<'a>(AioCb<'a>);
    impl<'a> AioSource for WrappedAioCb<'a> {
        fn register(&mut self, kq: RawFd, token: usize) {
            self.0.register_raw(kq, token)
        }
        fn deregister(&mut self) {
            self.0.deregister_raw()
        }
    }

    /// A very crude implementation of an AIO-based future
    struct FsyncFut(Aio<WrappedAioCb<'static>>);

    impl Future for FsyncFut {
        type Output = std::io::Result<()>;

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let poll_result = self.0.poll_ready(cx);
            match poll_result {
                Poll::Pending => Poll::Pending,
                Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
                Poll::Ready(Ok(_ev)) => {
                    // At this point, we could clear readiness.  But there's no
                    // point, since we're about to drop the Aio.
                    let result = (*self.0).0.aio_return();
                    match result {
                        Ok(_) => Poll::Ready(Ok(())),
                        Err(e) => Poll::Ready(Err(e.into())),
                    }
                }
            }
        }
    }

    /// Low-level AIO Source
    ///
    /// An example bypassing mio_aio and Nix to demonstrate how the kevent
    /// registration actually works, under the hood.
    struct LlSource(Pin<Box<libc::aiocb>>);

    impl AioSource for LlSource {
        fn register(&mut self, kq: RawFd, token: usize) {
            let mut sev: libc::sigevent = unsafe { mem::MaybeUninit::zeroed().assume_init() };
            sev.sigev_notify = libc::SIGEV_KEVENT;
            sev.sigev_signo = kq;
            sev.sigev_value = libc::sigval {
                sival_ptr: token as *mut libc::c_void,
            };
            self.0.aio_sigevent = sev;
        }

        fn deregister(&mut self) {
            unsafe {
                self.0.aio_sigevent = mem::zeroed();
            }
        }
    }

    struct LlFut(Aio<LlSource>);

    impl Future for LlFut {
        type Output = std::io::Result<()>;

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let poll_result = self.0.poll_ready(cx);
            match poll_result {
                Poll::Pending => Poll::Pending,
                Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
                Poll::Ready(Ok(_ev)) => {
                    let r = unsafe { libc::aio_return(self.0 .0.as_mut().get_unchecked_mut()) };
                    assert_eq!(0, r);
                    Poll::Ready(Ok(()))
                }
            }
        }
    }

    /// A very simple object that can implement AioSource and can be reused.
    ///
    /// mio_aio normally assumes that each AioCb will be consumed on completion.
    /// This somewhat contrived example shows how an Aio object can be reused
    /// anyway.
    struct ReusableFsyncSource {
        aiocb: Pin<Box<AioCb<'static>>>,
        fd: RawFd,
        token: usize,
    }
    impl ReusableFsyncSource {
        fn fsync(&mut self) {
            self.aiocb.register_raw(self.fd, self.token);
            self.aiocb.fsync(AioFsyncMode::O_SYNC).unwrap();
        }
        fn new(aiocb: AioCb<'static>) -> Self {
            ReusableFsyncSource {
                aiocb: Box::pin(aiocb),
                fd: 0,
                token: 0,
            }
        }
        fn reset(&mut self, aiocb: AioCb<'static>) {
            self.aiocb = Box::pin(aiocb);
        }
    }
    impl AioSource for ReusableFsyncSource {
        fn register(&mut self, kq: RawFd, token: usize) {
            self.fd = kq;
            self.token = token;
        }
        fn deregister(&mut self) {
            self.fd = 0;
        }
    }

    struct ReusableFsyncFut<'a>(&'a mut Aio<ReusableFsyncSource>);
    impl<'a> Future for ReusableFsyncFut<'a> {
        type Output = std::io::Result<()>;

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let poll_result = self.0.poll_ready(cx);
            match poll_result {
                Poll::Pending => Poll::Pending,
                Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
                Poll::Ready(Ok(ev)) => {
                    // Since this future uses a reusable Aio, we must clear
                    // its readiness here.  That makes the future
                    // non-idempotent; the caller can't poll it repeatedly after
                    // it has already returned Ready.  But that's ok; most
                    // futures behave this way.
                    self.0.clear_ready(ev);
                    let result = (*self.0).aiocb.aio_return();
                    match result {
                        Ok(_) => Poll::Ready(Ok(())),
                        Err(e) => Poll::Ready(Err(e.into())),
                    }
                }
            }
        }
    }

    #[tokio::test]
    async fn fsync() {
        let f = tempfile().unwrap();
        let fd = f.as_raw_fd();
        let aiocb = AioCb::from_fd(fd, 0);
        let source = WrappedAioCb(aiocb);
        let mut poll_aio = Aio::new_for_aio(source).unwrap();
        (*poll_aio).0.fsync(AioFsyncMode::O_SYNC).unwrap();
        let fut = FsyncFut(poll_aio);
        fut.await.unwrap();
    }

    #[tokio::test]
    async fn ll_fsync() {
        let f = tempfile().unwrap();
        let fd = f.as_raw_fd();
        let mut aiocb: libc::aiocb = unsafe { mem::MaybeUninit::zeroed().assume_init() };
        aiocb.aio_fildes = fd;
        let source = LlSource(Box::pin(aiocb));
        let mut poll_aio = Aio::new_for_aio(source).unwrap();
        let r = unsafe {
            let p = (*poll_aio).0.as_mut().get_unchecked_mut();
            libc::aio_fsync(libc::O_SYNC, p)
        };
        assert_eq!(0, r);
        let fut = LlFut(poll_aio);
        fut.await.unwrap();
    }

    /// A suitably crafted future type can reuse an Aio object
    #[tokio::test]
    async fn reuse() {
        let f = tempfile().unwrap();
        let fd = f.as_raw_fd();
        let aiocb0 = AioCb::from_fd(fd, 0);
        let source = ReusableFsyncSource::new(aiocb0);
        let mut poll_aio = Aio::new_for_aio(source).unwrap();
        poll_aio.fsync();
        let fut0 = ReusableFsyncFut(&mut poll_aio);
        fut0.await.unwrap();

        let aiocb1 = AioCb::from_fd(fd, 0);
        poll_aio.reset(aiocb1);
        let mut ctx = Context::from_waker(futures::task::noop_waker_ref());
        assert_pending!(poll_aio.poll_ready(&mut ctx));
        poll_aio.fsync();
        let fut1 = ReusableFsyncFut(&mut poll_aio);
        fut1.await.unwrap();
    }
}

mod lio {
    use super::*;

    struct WrappedLioCb<'a>(LioCb<'a>);
    impl<'a> AioSource for WrappedLioCb<'a> {
        fn register(&mut self, kq: RawFd, token: usize) {
            self.0.register_raw(kq, token)
        }
        fn deregister(&mut self) {
            self.0.deregister_raw()
        }
    }

    /// A very crude lio_listio-based Future
    struct LioFut(Option<Aio<WrappedLioCb<'static>>>);

    impl Future for LioFut {
        type Output = std::io::Result<Vec<isize>>;

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let poll_result = self.0.as_mut().unwrap().poll_ready(cx);
            match poll_result {
                Poll::Pending => Poll::Pending,
                Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
                Poll::Ready(Ok(_ev)) => {
                    // At this point, we could clear readiness.  But there's no
                    // point, since we're about to drop the Aio.
                    let r = self.0.take().unwrap().into_inner().0.into_results(|iter| {
                        iter.map(|lr| lr.result.unwrap()).collect::<Vec<isize>>()
                    });
                    Poll::Ready(Ok(r))
                }
            }
        }
    }

    /// Minimal example demonstrating reuse of an Aio object with lio
    /// readiness.  mio_aio::LioCb actually does something similar under the
    /// hood.
    struct ReusableLioSource {
        liocb: Option<LioCb<'static>>,
        fd: RawFd,
        token: usize,
    }
    impl ReusableLioSource {
        fn new(liocb: LioCb<'static>) -> Self {
            ReusableLioSource {
                liocb: Some(liocb),
                fd: 0,
                token: 0,
            }
        }
        fn reset(&mut self, liocb: LioCb<'static>) {
            self.liocb = Some(liocb);
        }
        fn submit(&mut self) {
            self.liocb
                .as_mut()
                .unwrap()
                .register_raw(self.fd, self.token);
            self.liocb.as_mut().unwrap().submit().unwrap();
        }
    }
    impl AioSource for ReusableLioSource {
        fn register(&mut self, kq: RawFd, token: usize) {
            self.fd = kq;
            self.token = token;
        }
        fn deregister(&mut self) {
            self.fd = 0;
        }
    }
    struct ReusableLioFut<'a>(&'a mut Aio<ReusableLioSource>);
    impl<'a> Future for ReusableLioFut<'a> {
        type Output = std::io::Result<Vec<isize>>;

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let poll_result = self.0.poll_ready(cx);
            match poll_result {
                Poll::Pending => Poll::Pending,
                Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
                Poll::Ready(Ok(ev)) => {
                    // Since this future uses a reusable Aio, we must clear
                    // its readiness here.  That makes the future
                    // non-idempotent; the caller can't poll it repeatedly after
                    // it has already returned Ready.  But that's ok; most
                    // futures behave this way.
                    self.0.clear_ready(ev);
                    let r = (*self.0).liocb.take().unwrap().into_results(|iter| {
                        iter.map(|lr| lr.result.unwrap()).collect::<Vec<isize>>()
                    });
                    Poll::Ready(Ok(r))
                }
            }
        }
    }

    /// An lio_listio operation with one write element
    #[tokio::test]
    async fn onewrite() {
        const WBUF: &[u8] = b"abcdef";
        let f = tempfile().unwrap();

        let mut builder = mio_aio::LioCbBuilder::with_capacity(1);
        builder = builder.emplace_slice(
            f.as_raw_fd(),
            0,
            &WBUF[..],
            0,
            mio_aio::LioOpcode::LIO_WRITE,
        );
        let liocb = builder.finish();
        let source = WrappedLioCb(liocb);
        let mut poll_aio = Aio::new_for_lio(source).unwrap();

        // Send the operation to the kernel
        (*poll_aio).0.submit().unwrap();
        let fut = LioFut(Some(poll_aio));
        let v = fut.await.unwrap();
        assert_eq!(v.len(), 1);
        assert_eq!(v[0] as usize, WBUF.len());
    }

    /// A suitably crafted future type can reuse an Aio object
    #[tokio::test]
    async fn reuse() {
        const WBUF: &[u8] = b"abcdef";
        let f = tempfile().unwrap();

        let mut builder0 = mio_aio::LioCbBuilder::with_capacity(1);
        builder0 = builder0.emplace_slice(
            f.as_raw_fd(),
            0,
            &WBUF[..],
            0,
            mio_aio::LioOpcode::LIO_WRITE,
        );
        let liocb0 = builder0.finish();
        let source = ReusableLioSource::new(liocb0);
        let mut poll_aio = Aio::new_for_aio(source).unwrap();
        poll_aio.submit();
        let fut0 = ReusableLioFut(&mut poll_aio);
        let v = fut0.await.unwrap();
        assert_eq!(v.len(), 1);
        assert_eq!(v[0] as usize, WBUF.len());

        // Now reuse the same Aio
        let mut builder1 = mio_aio::LioCbBuilder::with_capacity(1);
        builder1 = builder1.emplace_slice(
            f.as_raw_fd(),
            0,
            &WBUF[..],
            0,
            mio_aio::LioOpcode::LIO_WRITE,
        );
        let liocb1 = builder1.finish();
        poll_aio.reset(liocb1);
        let mut ctx = Context::from_waker(futures::task::noop_waker_ref());
        assert_pending!(poll_aio.poll_ready(&mut ctx));
        poll_aio.submit();
        let fut1 = ReusableLioFut(&mut poll_aio);
        let v = fut1.await.unwrap();
        assert_eq!(v.len(), 1);
        assert_eq!(v[0] as usize, WBUF.len());
    }
}