async-rdma 0.4.0

A Rust async wrapper for the RDMA ibverbs library
Documentation
use super::{raw::RawMemoryRegion, IbvAccess, MrAccess, MrToken};
#[cfg(test)]
use crate::protection_domain::ProtectionDomain;
use crate::{
    lock_utilities::{MappedRwLockReadGuard, MappedRwLockWriteGuard},
    MRManageStrategy,
};

use parking_lot::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use rdma_sys::ibv_access_flags;
use sealed::sealed;
use std::{
    alloc::{dealloc, Layout},
    fmt::Debug,
    ops::Range,
    slice,
    sync::Arc,
    time::{Duration, SystemTime},
};
use tracing::debug;

/// Local memory region trait
///
/// Read access to the memory of a local mr. The guarded accessors (`as_ptr`, `as_slice` and their
/// `try_` variants) hold the read lock of the `RwLocalMrInner` returned by `get_inner` for as long
/// as the returned guard is alive. The `_unchecked` accessors do not lock at all.
///
/// # Safety
///
/// For the `fn`s that have not been marked as `unsafe`, implementations must make sure all safety
/// requirements are met, for example that the memory of the mr is initialized.
///
/// For the `unsafe` `fn`s, implementations must not introduce any safety issue other than the
/// ones listed in the `Safety` section of each `fn`.
#[sealed]
pub unsafe trait LocalMrReadAccess: MrAccess {
    /// Get the start pointer, blocking until the mr is readable
    ///
    /// If the write lock of this mr is held (e.g. while it is used in RDMA ops), the thread may be
    /// blocked until it is released.
    #[allow(clippy::as_conversions)]
    #[inline]
    fn as_ptr(&self) -> MappedRwLockReadGuard<*const u8> {
        MappedRwLockReadGuard::new(self.get_inner().read(), self.addr() as *const u8)
    }

    /// Try to get the start pointer
    ///
    /// Return `None` without blocking the thread if the read lock cannot be acquired right now,
    /// e.g. because this mr is being used in RDMA ops.
    #[allow(clippy::as_conversions)]
    #[inline]
    fn try_as_ptr(&self) -> Option<MappedRwLockReadGuard<*const u8>> {
        self.get_inner()
            .try_read()
            .map(|guard| MappedRwLockReadGuard::new(guard, self.addr() as *const u8))
    }

    /// Get the start pointer without lock
    ///
    /// No lock is taken. Before dereferencing the returned pointer, the caller must make sure the
    /// mr is readable without cancel safety issue.
    #[inline]
    #[allow(clippy::as_conversions)]
    fn as_ptr_unchecked(&self) -> *const u8 {
        self.addr() as _
    }

    /// Get the memory region as slice, blocking until it is readable
    ///
    /// If the write lock of this mr is held (e.g. while it is used in RDMA ops), the thread may be
    /// blocked until it is released.
    #[inline]
    #[allow(clippy::as_conversions)]
    fn as_slice(&self) -> MappedRwLockReadGuard<&[u8]> {
        // SAFETY: memory of this mr should have been initialized
        MappedRwLockReadGuard::map(self.as_ptr(), |ptr| unsafe {
            slice::from_raw_parts(ptr, self.length())
        })
    }

    /// Try to get the memory region as slice
    ///
    /// Return `None` without blocking the thread if the read lock cannot be acquired right now,
    /// e.g. because this mr is being used in RDMA ops.
    #[allow(clippy::as_conversions)]
    #[inline]
    fn try_as_slice(&self) -> Option<MappedRwLockReadGuard<&[u8]>> {
        self.try_as_ptr().map(|guard| {
            // SAFETY: memory of this mr should have been initialized
            MappedRwLockReadGuard::map(guard, |ptr| unsafe {
                slice::from_raw_parts(ptr, self.length())
            })
        })
    }

    /// Get the memory region as slice without lock
    ///
    /// # Safety
    ///
    /// * Make sure the mr is readable without cancel safety issue.
    /// * The memory of this mr is initialized.
    /// * The total size of this mr of the slice must be no larger than `isize::MAX`.
    #[inline]
    unsafe fn as_slice_unchecked(&self) -> &[u8] {
        slice::from_raw_parts(self.as_ptr_unchecked(), self.length())
    }

    /// Get the local key
    fn lkey(&self) -> u32;

    /// Get the local key without lock
    ///
    /// # Safety
    ///
    /// Must ensure that there are no data races, for example:
    ///
    /// * The current thread logically owns a guard but that guard has been discarded using `mem::forget`.
    /// * The `lkey` of this mr is going to be changed.(It's not going to happen so far, because variable
    /// lkey has not been implemented yet.)
    #[inline]
    #[allow(clippy::unreachable)] // inner will not be null
    unsafe fn lkey_unchecked(&self) -> u32 {
        // `data_ptr` gives a raw pointer to the data inside the lock without locking it.
        let inner = self.get_inner().data_ptr();
        // SAFETY: `inner` points into the lock owned by the `Arc` returned from `get_inner`, which
        // outlives this call; the caller guarantees there is no concurrent write to it.
        <*const LocalMrInner>::as_ref(inner)
            .map_or_else(|| unreachable!("get null inner"), LocalMrInner::lkey)
    }

    /// Get the remote key without lock
    ///
    /// # Safety
    ///
    /// Must ensure that there are no data races, for example:
    /// * The current thread logically owns a guard but that guard has been discarded using `mem::forget`.
    /// * The `rkey` of this mr is going to be changed.(It's not going to happen so far, because variable
    /// rkey has not been implemented yet.)
    #[inline]
    #[allow(clippy::unreachable)] // inner will not be null
    unsafe fn rkey_unchecked(&self) -> u32 {
        // `data_ptr` gives a raw pointer to the data inside the lock without locking it.
        let inner = self.get_inner().data_ptr();
        // SAFETY: `inner` points into the lock owned by the `Arc` returned from `get_inner`, which
        // outlives this call; the caller guarantees there is no concurrent write to it.
        <*const LocalMrInner>::as_ref(inner)
            .map_or_else(|| unreachable!("get null inner"), LocalMrInner::rkey)
    }

    /// New a token with specified timeout
    ///
    /// Return `None` if `now + timeout` cannot be represented as a `SystemTime`.
    #[inline]
    fn token_with_timeout(&self, timeout: Duration) -> Option<MrToken> {
        SystemTime::now().checked_add(timeout).map(|ddl| MrToken {
            addr: self.addr(),
            len: self.length(),
            rkey: self.rkey(),
            ddl,
            access: self.ibv_access().0,
        })
    }

    /// New a token with specified timeout with `rkey_unchecked`
    ///
    /// Return `None` if `now + timeout` cannot be represented as a `SystemTime`.
    ///
    /// # Safety
    ///
    /// Must ensure that there are no data races about `rkey`, for example:
    /// * The current thread logically owns a guard but that guard has been discarded using `mem::forget`.
    /// * The `rkey` of this mr is going to be changed.(It's not going to happen so far, because variable
    /// rkey has not been implemented yet.)
    ///
    #[inline]
    unsafe fn token_with_timeout_unchecked(&self, timeout: Duration) -> Option<MrToken> {
        SystemTime::now().checked_add(timeout).map(|ddl| MrToken {
            addr: self.addr(),
            len: self.length(),
            rkey: self.rkey_unchecked(),
            ddl,
            access: self.ibv_access().0,
        })
    }

    /// Get the corresponding `RwLocalMrInner`
    fn get_inner(&self) -> &Arc<RwLocalMrInner>;

    /// Is the corresponding `RwLocalMrInner` readable?
    ///
    /// `true` when no write lock is held at the moment of the check. The state may change right
    /// after this returns.
    #[inline]
    fn is_readable(&self) -> bool {
        !self.get_inner().is_locked_exclusive()
    }

    /// Get read lock of `LocalMrInner`, blocking until it can be acquired
    #[inline]
    fn read_inner(&self) -> RwLockReadGuard<LocalMrInner> {
        self.get_inner().read()
    }
}

/// Writable local mr trait
///
/// Write access to the memory of a local mr. The guarded accessors (`as_mut_ptr`,
/// `as_mut_slice` and their `try_` variants) hold the write lock of the `RwLocalMrInner` returned
/// by `get_inner` for as long as the returned guard is alive. The `_unchecked` accessors do not
/// lock at all.
///
/// # Safety
///
/// For the `fn`s that have not been marked as `unsafe`, implementations must make sure all safety
/// requirements are met, for example that the memory is initialized.
///
/// For the `unsafe` `fn`s, implementations must not introduce any safety issue other than the
/// ones listed in the `Safety` section of each `fn`.
#[sealed]
pub unsafe trait LocalMrWriteAccess: MrAccess + LocalMrReadAccess {
    /// Get the mutable start pointer, blocking until it is writeable
    ///
    /// If this mr is locked (e.g. while it is used in RDMA ops), the thread may be blocked until
    /// the lock is released.
    #[inline]
    #[allow(clippy::as_conversions)]
    fn as_mut_ptr(&mut self) -> MappedRwLockWriteGuard<*mut u8> {
        MappedRwLockWriteGuard::new(self.get_inner().write(), self.addr() as *mut u8)
    }

    /// Try to get the mutable start pointer
    ///
    /// Return `None` without blocking the thread if the write lock cannot be acquired right now,
    /// e.g. because this mr is being used in RDMA ops. Unlike `as_mut_ptr`, this only needs
    /// `&self`.
    #[allow(clippy::as_conversions)]
    #[inline]
    fn try_as_mut_ptr(&self) -> Option<MappedRwLockWriteGuard<*mut u8>> {
        self.get_inner()
            .try_write()
            .map(|guard| MappedRwLockWriteGuard::new(guard, self.addr() as *mut u8))
    }

    /// Get the memory region start mut addr without lock
    ///
    /// No lock is taken. Before writing through the returned pointer, the caller must make sure
    /// the mr is writeable without cancel safety issue.
    #[inline]
    #[allow(clippy::as_conversions)]
    fn as_mut_ptr_unchecked(&mut self) -> *mut u8 {
        // const pointer to mut pointer is safe
        self.as_ptr_unchecked() as _
    }

    /// Get the memory region as mutable slice, blocking until it is writeable
    ///
    /// If this mr is locked (e.g. while it is used in RDMA ops), the thread may be blocked until
    /// the lock is released.
    #[inline]
    #[allow(clippy::as_conversions)]
    fn as_mut_slice(&mut self) -> MappedRwLockWriteGuard<&mut [u8]> {
        let len = self.length();
        // SAFETY: memory of this mr should have been initialized
        MappedRwLockWriteGuard::map(self.as_mut_ptr(), |ptr| unsafe {
            slice::from_raw_parts_mut(ptr, len)
        })
    }

    /// Try to get the memory region as mutable slice
    ///
    /// Return `None` without blocking the thread if the write lock cannot be acquired right now,
    /// e.g. because this mr is being used in RDMA ops.
    #[allow(clippy::as_conversions)]
    #[inline]
    fn try_as_mut_slice(&mut self) -> Option<MappedRwLockWriteGuard<&mut [u8]>> {
        // Read the length up front, as `as_mut_slice` does.
        let len = self.length();
        self.try_as_mut_ptr().map(|guard| {
            // SAFETY: memory of this mr should have been initialized
            MappedRwLockWriteGuard::map(guard, |ptr| unsafe {
                slice::from_raw_parts_mut(ptr, len)
            })
        })
    }

    /// Get the memory region as mut slice without lock
    ///
    /// # Safety
    ///
    /// * Make sure the mr is writeable without cancel safety issue.
    /// * The memory of this mr is initialized.
    /// * The total size of this mr of the slice must be no larger than `isize::MAX`.
    #[inline]
    unsafe fn as_mut_slice_unchecked(&mut self) -> &mut [u8] {
        slice::from_raw_parts_mut(self.as_mut_ptr_unchecked(), self.length())
    }

    /// Is the corresponding `RwLocalMrInner` writeable?
    ///
    /// `true` when neither a read nor a write lock is held at the moment of the check. The state
    /// may change right after this returns.
    #[inline]
    fn is_writeable(&self) -> bool {
        !self.get_inner().is_locked()
    }

    /// Get write lock of `LocalMrInner`, blocking until it can be acquired
    #[inline]
    fn write_inner(&self) -> RwLockWriteGuard<LocalMrInner> {
        self.get_inner().write()
    }
}

/// Local Memory Region
///
/// Describes the byte range `addr..addr + len` inside the memory owned by `inner`. The range can
/// be narrowed to a sub-range (see `take`), while `inner` keeps the whole allocation and the lock
/// that guards it.
#[derive(Debug)]
pub struct LocalMr {
    /// The corresponding `RwLocalMrInner`, which owns the memory and guards access to it.
    inner: Arc<RwLocalMrInner>,
    /// The start address of this mr
    addr: usize,
    /// The length of this mr in bytes
    len: usize,
}

impl MrAccess for LocalMr {
    /// Start address of the range covered by this mr (not necessarily the allocation start).
    #[inline]
    fn addr(&self) -> usize {
        self.addr
    }

    /// Number of bytes covered by this mr.
    #[inline]
    fn length(&self) -> usize {
        self.len
    }

    /// Remote key, read while holding the shared lock of the inner.
    #[inline]
    fn rkey(&self) -> u32 {
        let guard = self.inner.read();
        guard.rkey()
    }
}

impl IbvAccess for LocalMr {
    /// Access flags of the inner mr, read while holding its shared lock.
    #[inline]
    fn ibv_access(&self) -> ibv_access_flags {
        let guard = self.inner.read();
        guard.ibv_access()
    }
}

#[sealed]
unsafe impl LocalMrReadAccess for LocalMr {
    /// Shared handle to the lock-protected inner of this mr.
    #[inline]
    fn get_inner(&self) -> &Arc<RwLocalMrInner> {
        &self.inner
    }

    /// Local key, read while holding the shared lock of the inner.
    #[inline]
    fn lkey(&self) -> u32 {
        let guard = self.inner.read();
        guard.lkey()
    }
}

// Every writable accessor relies on the default methods of the trait.
#[sealed]
unsafe impl LocalMrWriteAccess for LocalMr {}

impl LocalMr {
    /// New Local Mr
    ///
    /// The new mr covers the whole allocation described by `inner`.
    pub(crate) fn new(inner: LocalMrInner) -> Self {
        let addr = inner.addr;
        let len = inner.layout.size();
        let inner = Arc::new(RwLock::new(inner));
        Self { inner, addr, len }
    }

    /// Whether `i` is a non-empty range that lies within `0..self.len`.
    fn is_valid_sub_range(&self, i: &Range<usize>) -> bool {
        i.start < i.end && i.end <= self.len
    }

    /// Get a local mr slice
    ///
    /// Return `None` if the input range is empty or exceeds the length of this mr.
    #[inline]
    #[must_use]
    pub fn get(&self, i: Range<usize>) -> Option<LocalMrSlice> {
        if !self.is_valid_sub_range(&i) {
            return None;
        }
        Some(LocalMrSlice::new(
            self,
            Arc::<RwLocalMrInner>::clone(&self.inner),
            self.addr().wrapping_add(i.start),
            i.len(),
        ))
    }

    /// Get an unchecked local mr slice
    ///
    /// # Safety
    ///
    /// Callers of this function are responsible that these preconditions are
    /// satisfied:
    ///
    /// * The starting index must not exceed the ending index;
    /// * Indexes must be within bounds of the original `LocalMr`.
    #[inline]
    #[must_use]
    pub unsafe fn get_unchecked(&self, i: Range<usize>) -> LocalMrSlice {
        LocalMrSlice::new(
            self,
            Arc::<RwLocalMrInner>::clone(&self.inner),
            self.addr().wrapping_add(i.start),
            i.len(),
        )
    }

    /// Get a mutable local mr slice
    ///
    /// Return `None` if the input range is empty or exceeds the length of this mr.
    #[inline]
    pub fn get_mut(&mut self, i: Range<usize>) -> Option<LocalMrSliceMut> {
        if !self.is_valid_sub_range(&i) {
            return None;
        }
        // Read everything needed from `self` before lending it out mutably to the slice.
        let inner = Arc::<RwLocalMrInner>::clone(&self.inner);
        let addr = self.addr().wrapping_add(i.start);
        Some(LocalMrSliceMut::new(self, inner, addr, i.len()))
    }

    /// Get an unchecked mutable local mr slice
    ///
    /// # Safety
    ///
    /// Callers of this function are responsible that these preconditions are
    /// satisfied:
    ///
    /// * The starting index must not exceed the ending index;
    /// * Indexes must be within bounds of the original `LocalMr`.
    #[inline]
    pub unsafe fn get_unchecked_mut(&mut self, i: Range<usize>) -> LocalMrSliceMut {
        // Read everything needed from `self` before lending it out mutably to the slice.
        let inner = Arc::<RwLocalMrInner>::clone(&self.inner);
        let addr = self.addr().wrapping_add(i.start);
        LocalMrSliceMut::new(self, inner, addr, i.len())
    }

    /// Take the ownership and return a sub local mr from self
    ///
    /// Return `None` if the input range is empty or exceeds the length of this mr; in that case
    /// `self` is dropped.
    #[inline]
    pub(crate) fn take(mut self, i: Range<usize>) -> Option<Self> {
        if !self.is_valid_sub_range(&i) {
            return None;
        }
        self.addr = self.addr.wrapping_add(i.start);
        // `i.start < i.end` holds here, so this is `i.end - i.start`.
        self.len = i.len();
        Some(self)
    }

    /// Take the ownership and return an unchecked sub local mr from self
    ///
    /// # Safety
    ///
    /// Callers of this function are responsible that these preconditions are
    /// satisfied:
    ///
    /// * The starting index must not exceed the ending index;
    /// * Indexes must be within bounds of the original `LocalMr`.
    #[inline]
    #[allow(dead_code)]
    pub(crate) unsafe fn take_unchecked(mut self, i: Range<usize>) -> Self {
        self.addr = self.addr.wrapping_add(i.start);
        self.len = i.end.wrapping_sub(i.start);
        self
    }
}

/// `LocalMrInner` in `RwLock`
///
/// The lock guards the memory of an mr. The guarded accessors of `LocalMrReadAccess` take it
/// shared, and those of `LocalMrWriteAccess` take it exclusively.
pub(crate) type RwLocalMrInner = RwLock<LocalMrInner>;
/// Local Memory Region inner
///
/// Owns the memory block starting at `addr`. The block is freed when this value is dropped, in
/// the way selected by `strategy`.
#[derive(Debug)]
pub struct LocalMrInner {
    /// The start address of this mr
    addr: usize,
    /// The layout of this mr; its size is the length of the mr in bytes
    layout: Layout,
    /// The raw mr where this local mr comes from; supplies the `lkey`, `rkey` and access flags.
    raw: Arc<RawMemoryRegion>,
    /// Strategy to manage this `MR`; selects how the memory is freed on drop
    strategy: MRManageStrategy,
}

impl Drop for LocalMrInner {
    #[inline]
    #[allow(clippy::as_conversions)]
    fn drop(&mut self) {
        debug!("drop LocalMr {:?}", self);
        match self.strategy {
            crate::MRManageStrategy::Jemalloc => {
                // SAFETY: ffi
                unsafe { tikv_jemalloc_sys::free(self.addr as _) }
            }
            crate::MRManageStrategy::Raw => {
                // SAFETY: The ptr is allocated via this allocator, and the layout is the same layout
                // that was used to allocate that block of memory.
                unsafe {
                    dealloc(self.addr as _, self.layout);
                }
            }
        }
    }
}

impl MrAccess for LocalMrInner {
    #[inline]
    fn addr(&self) -> usize {
        self.addr
    }

    #[inline]
    fn length(&self) -> usize {
        self.layout.size()
    }

    #[inline]
    fn rkey(&self) -> u32 {
        self.raw.rkey()
    }
}

impl IbvAccess for LocalMrInner {
    #[inline]
    fn ibv_access(&self) -> ibv_access_flags {
        self.raw.ibv_access()
    }
}

impl LocalMrInner {
    /// Create a new `LocalMrInner` from its parts
    ///
    /// * `addr` - start address of the memory block
    /// * `layout` - layout of the memory block
    /// * `raw` - raw mr the block comes from
    /// * `strategy` - decides how the block is freed on drop
    pub(crate) fn new(
        addr: usize,
        layout: Layout,
        raw: Arc<RawMemoryRegion>,
        strategy: MRManageStrategy,
    ) -> Self {
        Self { addr, layout, raw, strategy }
    }

    /// Local key of the raw mr this block comes from
    fn lkey(&self) -> u32 {
        let raw = &self.raw;
        raw.lkey()
    }

    /// Protection domain of the raw mr this block comes from
    #[cfg(test)]
    pub(crate) fn pd(&self) -> &Arc<ProtectionDomain> {
        let raw = &self.raw;
        raw.pd()
    }
}

// A shared reference to a `LocalMr` answers every query by forwarding to the `LocalMr` itself.
impl MrAccess for &LocalMr {
    #[inline]
    fn addr(&self) -> usize {
        <LocalMr as MrAccess>::addr(self)
    }

    #[inline]
    fn length(&self) -> usize {
        <LocalMr as MrAccess>::length(self)
    }

    #[inline]
    fn rkey(&self) -> u32 {
        <LocalMr as MrAccess>::rkey(self)
    }
}

impl IbvAccess for &LocalMr {
    #[inline]
    fn ibv_access(&self) -> ibv_access_flags {
        <LocalMr as IbvAccess>::ibv_access(self)
    }
}

#[sealed]
unsafe impl LocalMrReadAccess for &LocalMr {
    #[inline]
    fn lkey(&self) -> u32 {
        <LocalMr as LocalMrReadAccess>::lkey(self)
    }

    #[inline]
    fn get_inner(&self) -> &Arc<RwLocalMrInner> {
        <LocalMr as LocalMrReadAccess>::get_inner(self)
    }
}

/// A slice of `LocalMr`
///
/// Covers `len` bytes starting at `addr` and borrows the `LocalMr` it was taken from for `'a`.
/// `inner` is expected to be a clone of that mr's `Arc` (this is what `LocalMr::get` passes), so
/// guarded accesses through the slice use the same lock as the mr.
#[derive(Debug)]
pub struct LocalMrSlice<'a> {
    /// The local mr where this local mr slice comes from.
    lmr: &'a LocalMr,
    /// The corresponding `RwLocalMrInner`.
    inner: Arc<RwLocalMrInner>,
    /// The start address of this mr slice
    addr: usize,
    /// The length of this mr slice in bytes
    len: usize,
}

impl MrAccess for LocalMrSlice<'_> {
    /// Start address of this slice.
    #[inline]
    fn addr(&self) -> usize {
        self.addr
    }

    /// Number of bytes covered by this slice.
    #[inline]
    fn length(&self) -> usize {
        self.len
    }

    /// The `rkey` of the `LocalMr` this slice was taken from.
    #[inline]
    fn rkey(&self) -> u32 {
        self.lmr.rkey()
    }
}

impl IbvAccess for LocalMrSlice<'_> {
    /// Access flags, read while holding the shared lock of `inner`.
    #[inline]
    fn ibv_access(&self) -> ibv_access_flags {
        self.read_inner().ibv_access()
    }
}

#[sealed]
unsafe impl LocalMrReadAccess for LocalMrSlice<'_> {
    /// The `lkey` of the `LocalMr` this slice was taken from.
    #[inline]
    fn lkey(&self) -> u32 {
        self.lmr.lkey()
    }

    #[inline]
    fn get_inner(&self) -> &Arc<RwLocalMrInner> {
        &self.inner
    }
}

impl<'a> LocalMrSlice<'a> {
    /// New a local mr slice.
    ///
    /// The slice borrows `lmr`, shares the lock handle `inner`, and covers `len` bytes from
    /// `addr`. No bounds checking is done here.
    pub(crate) fn new(
        lmr: &'a LocalMr,
        inner: Arc<RwLocalMrInner>,
        addr: usize,
        len: usize,
    ) -> Self {
        Self { lmr, inner, addr, len }
    }
}

/// Mutable local mr slice
///
/// Covers `len` bytes starting at `addr` and mutably borrows the `LocalMr` it was taken from for
/// `'a`, so that mr cannot be used directly while the slice is alive. `inner` is expected to be a
/// clone of that mr's `Arc` (this is what `LocalMr::get_mut` passes).
#[derive(Debug)]
pub struct LocalMrSliceMut<'a> {
    /// The local mr where this local mr slice comes from.
    lmr: &'a mut LocalMr,
    /// The corresponding `RwLocalMrInner`.
    inner: Arc<RwLocalMrInner>,
    /// The start address of this mr slice
    addr: usize,
    /// The length of this mr slice in bytes
    len: usize,
}

impl<'a> LocalMrSliceMut<'a> {
    /// New a mutable local mr slice.
    ///
    /// The slice mutably borrows `lmr`, shares the lock handle `inner`, and covers `len` bytes
    /// from `addr`. No bounds checking is done here.
    pub(crate) fn new(
        lmr: &'a mut LocalMr,
        inner: Arc<RwLocalMrInner>,
        addr: usize,
        len: usize,
    ) -> Self {
        Self { lmr, inner, addr, len }
    }
}

impl MrAccess for LocalMrSliceMut<'_> {
    /// Start address of this slice.
    #[inline]
    fn addr(&self) -> usize {
        self.addr
    }

    /// Number of bytes covered by this slice.
    #[inline]
    fn length(&self) -> usize {
        self.len
    }

    /// The `rkey` of the `LocalMr` this slice was taken from.
    #[inline]
    fn rkey(&self) -> u32 {
        self.lmr.rkey()
    }
}

impl IbvAccess for LocalMrSliceMut<'_> {
    /// Access flags, read while holding the shared lock of `inner`.
    #[inline]
    fn ibv_access(&self) -> ibv_access_flags {
        self.read_inner().ibv_access()
    }
}

#[sealed]
unsafe impl LocalMrReadAccess for LocalMrSliceMut<'_> {
    /// The `lkey` of the `LocalMr` this slice was taken from.
    #[inline]
    fn lkey(&self) -> u32 {
        self.lmr.lkey()
    }

    #[inline]
    fn get_inner(&self) -> &Arc<RwLocalMrInner> {
        &self.inner
    }
}

// Every writable accessor relies on the default methods of the trait.
#[sealed]
unsafe impl LocalMrWriteAccess for LocalMrSliceMut<'_> {}