linux-aio-tokio 0.3.0

Tokio bindings for Linux kernel AIO
Documentation
#![allow(clippy::unneeded_field_pattern)]

use std::marker::PhantomData;
use std::os::unix::prelude::*;
use std::{io, mem};

use futures::channel::oneshot;
use intrusive_collections::linked_list::LinkedListOps;
use intrusive_collections::{DefaultLinkOps, LinkedList};
use lock_api::{Mutex, RawMutex};

use crate::locked_buf::LifetimeExtender;
pub use crate::requests::atomic_link::AtomicLink;
use crate::{aio, AioResult, RawCommand};

pub use self::intrusive_adapter::{IntrusiveAdapter, LocalRequestAdapter, SyncRequestAdapter};

mod atomic_link;
mod intrusive_adapter;

#[derive(Debug)]
pub(crate) struct RequestInner {
    pub aio_req: aio::iocb,
    pub completed_tx: Option<oneshot::Sender<AioResult>>,
    pub buf_lifetime_extender: Option<LifetimeExtender>,
}

impl RequestInner {
    pub(crate) fn take_buf_lifetime_extender(&mut self) -> Option<LifetimeExtender> {
        self.buf_lifetime_extender.take()
    }
}

#[derive(Debug)]
pub struct Request<M: RawMutex, L: DefaultLinkOps + Default> {
    link: L,
    pub(crate) inner: Mutex<M, RequestInner>,
}

impl<M: RawMutex, L: DefaultLinkOps + Default> Default for Request<M, L> {
    fn default() -> Self {
        Request {
            link: Default::default(),
            inner: Mutex::new(RequestInner {
                aio_req: unsafe { mem::zeroed() },
                completed_tx: None,
                buf_lifetime_extender: None,
            }),
        }
    }
}
impl<M: RawMutex, L: DefaultLinkOps + Default> Request<M, L> {
    pub fn aio_addr(&self) -> u64 {
        (unsafe { mem::transmute::<_, usize>(self as *const Self) }) as u64
    }

    pub fn send_to_waiter(&self, data: AioResult) -> bool {
        self.inner
            .lock()
            .completed_tx
            .take()
            .expect("no completed_tx in received AIO request")
            .send(data)
            .is_ok()
    }

    pub fn set_payload(
        &mut self,
        request_ptr_array: &mut [*mut aio::iocb; 1],
        request_addr: u64,
        eventfd: RawFd,
        fd: RawFd,
        command: &mut RawCommand,
        tx: oneshot::Sender<AioResult>,
    ) {
        let inner = &mut *self.inner.lock();

        let (addr, buf_len) = command.buffer_addr().unwrap_or((0, 0));
        let len = command.len().unwrap_or(0);

        assert!(len <= buf_len as u64, "len should be <= buffer.size()");

        inner.aio_req.aio_data = request_addr;
        inner.aio_req.aio_resfd = eventfd as u32;
        inner.aio_req.aio_flags = aio::IOCB_FLAG_RESFD | command.flags().unwrap_or(0);
        inner.aio_req.aio_fildes = fd as u32;
        inner.aio_req.aio_offset = command.offset().unwrap_or(0) as i64;
        inner.aio_req.aio_buf = addr;
        inner.aio_req.aio_nbytes = len;
        inner.aio_req.aio_lio_opcode = command.opcode() as u16;

        inner.buf_lifetime_extender = command.buffer_lifetime_extender();
        inner.completed_tx = Some(tx);

        request_ptr_array[0] = &mut inner.aio_req as *mut aio::iocb;
    }
}

pub struct Requests<
    M: RawMutex,
    A: crate::IntrusiveAdapter<M, L>,
    L: DefaultLinkOps<Ops = A::LinkOps> + Default,
> where
    A::LinkOps: LinkedListOps + Default,
{
    ready_pool: LinkedList<A>,
    outstanding: LinkedList<A>,
    _request_mutex: PhantomData<M>,
    _link_ops: PhantomData<L>,
}

impl<M, A, L> Requests<M, A, L>
where
    M: RawMutex,
    A: crate::IntrusiveAdapter<M, L>,
    A::LinkOps: LinkedListOps + Default,
    L: DefaultLinkOps<Ops = A::LinkOps> + Default,
{
    pub fn new(nr: usize) -> Result<Self, io::Error> {
        let outstanding = LinkedList::new(A::new());
        let mut ready_pool = LinkedList::new(A::new());

        for _ in 0..nr {
            ready_pool.push_back(Box::new(Request::default()));
        }

        Ok(Requests {
            ready_pool,
            outstanding,
            _request_mutex: Default::default(),
            _link_ops: Default::default(),
        })
    }

    pub fn move_to_outstanding(&mut self, ptr: Box<Request<M, L>>) {
        self.outstanding.push_back(ptr);
    }

    pub fn return_outstanding_to_ready(&mut self, request: *const Request<M, L>) {
        let mut cursor = unsafe { self.outstanding.cursor_mut_from_ptr(request) };

        self.ready_pool.push_back(
            cursor.remove().expect(
                "Could not find item in outstanding list while trying to move to ready_pool",
            ),
        );
    }

    pub fn return_in_flight_to_ready(&mut self, req: Box<Request<M, L>>) {
        self.ready_pool.push_back(req);
    }

    pub fn take(&mut self) -> Option<Box<Request<M, L>>> {
        self.ready_pool.pop_front()
    }
}