async-rdma 0.4.0

A rust async wrapper for RDMA ibvers lib
Documentation
//! RDMA high-level abstraction, providing several useful APIs.
//!
//! Async-rdma is a framework for writing asynchronous rdma applications with the Rust
//! programing language. At a high level, it provides a few major components:
//!
//! * Tools for establishing connections with rdma endpoints such as `RdmaBuilder`.
//!
//! *  High-level APIs for data transmission between endpoints including `read`,
//! `write`, `send`, `receive`.
//!
//! *  High-level APIs for rdma memory region management including `alloc_local_mr`,
//! `request_remote_mr`, `send_mr`, `receive_local_mr`, `receive_remote_mr`.
//!
//! *  A framework including `agent` and `event_listener` working behind APIs for memory
//! region management and executing rdma requests such as `post_send` and `poll`.
//!
//! #### Example
//! A simple example: client request a remote memory region and put data into this remote
//! memory region by rdma `write`.
//! And finally client `send_mr` to make server aware of this memory region.
//! Server `receive_local_mr`, and then get data from this mr.
//!
//! ```
//! use async_rdma::{LocalMrReadAccess, LocalMrWriteAccess, Rdma, RdmaListener};
//! use portpicker::pick_unused_port;
//! use std::{
//!     alloc::Layout,
//!     io,
//!     net::{Ipv4Addr, SocketAddrV4},
//!     time::Duration,
//! };
//!
//! struct Data(String);
//!
//! async fn client(addr: SocketAddrV4) -> io::Result<()> {
//!     let rdma = Rdma::connect(addr, 1, 1, 512).await?;
//!     let mut lmr = rdma.alloc_local_mr(Layout::new::<Data>())?;
//!     let mut rmr = rdma.request_remote_mr(Layout::new::<Data>()).await?;
//!     // load data into lmr
//!     unsafe { *(*lmr.as_mut_ptr() as *mut Data) = Data("hello world".to_string()) };
//!     // write the content of local mr into remote mr
//!     rdma.write(&lmr, &mut rmr).await?;
//!     // then send rmr's metadata to server to make server aware of it
//!     rdma.send_remote_mr(rmr).await?;
//!     Ok(())
//! }
//!
//! #[tokio::main]
//! async fn server(addr: SocketAddrV4) -> io::Result<()> {
//!     let rdma_listener = RdmaListener::bind(addr).await?;
//!     let rdma = rdma_listener.accept(1, 1, 512).await?;
//!     // receive the metadata of the mr sent by client
//!     let lmr = rdma.receive_local_mr().await?;
//!     // print the content of lmr, which was `write` by client
//!     unsafe { println!("{}", &*(*(*lmr.as_ptr() as *const Data)).0) };
//!     // wait for the agent thread to send all reponses to the remote.
//!     tokio::time::sleep(Duration::from_secs(1)).await;
//!     Ok(())
//! }
//! #[tokio::main]
//! async fn main() {
//!     let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
//!     std::thread::spawn(move || server(addr));
//!     tokio::time::sleep(Duration::from_secs(3)).await;
//!     client(addr)
//!         .await
//!         .map_err(|err| println!("{}", err))
//!         .unwrap();
//! }
//! ```
//!
//!
#![deny(
    // The following are allowed by default lints according to
    // https://doc.rust-lang.org/rustc/lints/listing/allowed-by-default.html
    anonymous_parameters,
    bare_trait_objects,
    // box_pointers, // use box pointer to allocate on heap
    // elided_lifetimes_in_paths, // allow anonymous lifetime
    missing_copy_implementations,
    missing_debug_implementations,
    missing_docs, // TODO: add documents
    single_use_lifetimes, // TODO: fix lifetime names only used once
    trivial_casts, // TODO: remove trivial casts in code
    trivial_numeric_casts,
    // unreachable_pub, allow clippy::redundant_pub_crate lint instead
    // unsafe_code,
    unstable_features,
    unused_extern_crates,
    unused_import_braces,
    unused_qualifications,
    unused_results,
    variant_size_differences,

    warnings, // treat all wanings as errors

    clippy::all,
    clippy::restriction,
    clippy::pedantic,
    // clippy::nursery, // It's still under development
    clippy::cargo,
    unreachable_pub,
)]
#![allow(
    // Some explicitly allowed Clippy lints, must have clear reason to allow
    clippy::blanket_clippy_restriction_lints, // allow clippy::restriction
    clippy::implicit_return, // actually omitting the return keyword is idiomatic Rust code
    clippy::module_name_repetitions, // repeation of module name in a struct name is not big deal
    clippy::multiple_crate_versions, // multi-version dependency crates is not able to fix
    clippy::missing_errors_doc, // TODO: add error docs
    clippy::missing_panics_doc, // TODO: add panic docs
    clippy::panic_in_result_fn,
    clippy::shadow_same, // Not too much bad
    clippy::shadow_reuse, // Not too much bad
    clippy::exhaustive_enums,
    clippy::exhaustive_structs,
    clippy::indexing_slicing,
    clippy::separated_literal_suffix, // conflicts with clippy::unseparated_literal_suffix
    clippy::single_char_lifetime_names, // TODO: change lifetime names
)]

/// The agent that handles async events in the background
mod agent;
/// The completion queue that handles the completion event
mod completion_queue;
/// The rmda device context
mod context;
/// The rmda device
pub mod device;

/// Access of `QP` and `MR`
mod access;
/// Error handling utilities
mod error_utilities;
/// The event channel that notifies the completion or error of a request
mod event_channel;
/// The driver to poll the completion queue
mod event_listener;
/// Gid for device
mod gid;
/// `HashMap` extension
mod hashmap_extension;
/// id utils
mod id;
/// Lock utilities
mod lock_utilities;
/// Memory region abstraction
mod memory_region;
/// Memory window abstraction
mod memory_window;
/// Memory Region allocator
mod mr_allocator;
/// Protection Domain
mod protection_domain;
/// Queue Pair
mod queue_pair;
/// Remote memory region manager
mod rmr_manager;
/// Work Request wrapper
mod work_request;

use access::flags_into_ibv_access;
pub use access::AccessFlag;
use agent::{Agent, MAX_MSG_LEN};
use clippy_utilities::Cast;
use completion_queue::{DEFAULT_CQ_SIZE, DEFAULT_MAX_CQE};
use context::Context;
use enumflags2::BitFlags;
use error_utilities::log_ret_last_os_err;
use event_listener::EventListener;
pub use memory_region::{
    local::{LocalMr, LocalMrReadAccess, LocalMrWriteAccess},
    remote::{RemoteMr, RemoteMrReadAccess, RemoteMrWriteAccess},
    MrAccess,
};
pub use mr_allocator::MRManageStrategy;
use mr_allocator::MrAllocator;
use protection_domain::ProtectionDomain;
use queue_pair::{
    QueuePair, QueuePairEndpoint, MAX_RECV_SGE, MAX_RECV_WR, MAX_SEND_SGE, MAX_SEND_WR,
};
use rdma_sys::ibv_access_flags;
#[cfg(feature = "cm")]
use rdma_sys::{
    rdma_addrinfo, rdma_cm_id, rdma_connect, rdma_create_ep, rdma_disconnect, rdma_freeaddrinfo,
    rdma_getaddrinfo, rdma_port_space,
};
use rmr_manager::DEFAULT_RMR_TIMEOUT;
#[cfg(feature = "cm")]
use std::ptr::null_mut;
use std::{alloc::Layout, fmt::Debug, io, ptr::NonNull, sync::Arc, time::Duration};
use tokio::{
    io::{AsyncReadExt, AsyncWriteExt},
    net::{TcpListener, TcpStream, ToSocketAddrs},
    sync::Mutex,
};
use tracing::debug;

#[macro_use]
extern crate lazy_static;

/// initial device attributes
#[derive(Debug)]
pub struct DeviceInitAttr {
    /// Rdma device name
    dev_name: Option<String>,
    /// Device port number
    port_num: u8,
    /// Gid index
    gid_index: usize,
}

impl Default for DeviceInitAttr {
    #[inline]
    fn default() -> Self {
        Self {
            dev_name: None,
            port_num: 1,
            gid_index: 1,
        }
    }
}

/// initial CQ attributes
#[derive(Debug, Clone, Copy)]
pub struct CQInitAttr {
    /// Complete queue size
    cq_size: u32,
    /// Maximum number of completion queue entries (CQE) to poll at a time.
    /// The higher the concurrency, the bigger this value should be and more memory allocated at a time.
    max_cqe: i32,
}

impl Default for CQInitAttr {
    #[inline]
    fn default() -> Self {
        Self {
            cq_size: DEFAULT_CQ_SIZE,
            max_cqe: DEFAULT_MAX_CQE,
        }
    }
}

/// initial QP attributes
#[derive(Debug, Clone, Copy)]
pub struct QPInitAttr {
    /// Access flag
    access: ibv_access_flags,
    /// Connection type
    conn_type: ConnectionType,
    /// If send/recv raw data
    raw: bool,
    /// Maximum number of outstanding send requests in the send queue
    max_send_wr: u32,
    /// Maximum number of outstanding receive requests in the receive queue
    max_recv_wr: u32,
    /// Maximum number of scatter/gather elements (SGE) in a WR on the send queue
    max_send_sge: u32,
    /// Maximum number of scatter/gather elements (SGE) in a WR on the receive queue
    max_recv_sge: u32,
}

lazy_static! {
    /// Default `ibv_access_flags`
    static ref DEFAULT_ACCESS:ibv_access_flags = ibv_access_flags::IBV_ACCESS_LOCAL_WRITE
    | ibv_access_flags::IBV_ACCESS_REMOTE_WRITE
    | ibv_access_flags::IBV_ACCESS_REMOTE_READ
    | ibv_access_flags::IBV_ACCESS_REMOTE_ATOMIC;
}

impl Default for QPInitAttr {
    #[inline]
    fn default() -> Self {
        Self {
            max_send_wr: MAX_SEND_WR,
            max_recv_wr: MAX_RECV_WR,
            max_send_sge: MAX_SEND_SGE,
            max_recv_sge: MAX_RECV_SGE,
            access: *DEFAULT_ACCESS,
            conn_type: ConnectionType::RCSocket,
            raw: false,
        }
    }
}

/// Initial `MR` attributes
#[derive(Debug, Clone, Copy)]
pub struct MRInitAttr {
    /// Access flag
    access: ibv_access_flags,
    /// Strategy to manage `MR`s
    strategy: MRManageStrategy,
}

impl Default for MRInitAttr {
    #[inline]
    fn default() -> Self {
        Self {
            access: *DEFAULT_ACCESS,
            strategy: MRManageStrategy::Jemalloc,
        }
    }
}

/// initial Agent attributes
#[derive(Debug, Clone, Copy)]
pub struct AgentInitAttr {
    /// Max length of message send/recv by Agent
    max_message_length: usize,
    /// Max access permission for remote mr requests
    max_rmr_access: ibv_access_flags,
}

impl Default for AgentInitAttr {
    #[inline]
    fn default() -> Self {
        Self {
            max_message_length: MAX_MSG_LEN,
            max_rmr_access: *DEFAULT_ACCESS,
        }
    }
}

/// The builder for the `Rdma`, it follows the builder pattern.
#[derive(Default)]
pub struct RdmaBuilder {
    /// Rdma device name
    dev_attr: DeviceInitAttr,
    /// initial CQ attributes
    cq_attr: CQInitAttr,
    /// initial QP attributes
    qp_attr: QPInitAttr,
    /// initial MR attributes
    mr_attr: MRInitAttr,
    /// initial Agent attributes
    agent_attr: AgentInitAttr,
}

impl RdmaBuilder {
    /// Create a default builder
    /// The default settings are:
    ///     dev name: None
    ///     access right: `LocalWrite` | `RemoteRead` | `RemoteWrite` | `RemoteAtomic`
    ///     complete queue size: 16
    ///     port number: 1
    ///     gid index: 1
    ///
    /// Note: We highly recommend setting the port number and the gid index.
    #[must_use]
    #[inline]
    pub fn new() -> Self {
        Self::default()
    }

    /// Create a `Rdma` from this builder
    #[inline]
    pub fn build(&self) -> io::Result<Rdma> {
        Rdma::new(&self.dev_attr, self.cq_attr, self.qp_attr, self.mr_attr)
    }

    /// Establish connection with RDMA server
    ///
    /// Used with `listen`
    ///
    /// # Examples
    ///
    /// ```
    /// use async_rdma::RdmaBuilder;
    /// use portpicker::pick_unused_port;
    /// use std::{
    ///     io,
    ///     net::{Ipv4Addr, SocketAddrV4},
    ///     time::Duration,
    /// };
    ///
    /// async fn client(addr: SocketAddrV4) -> io::Result<()> {
    ///     let _rdma = RdmaBuilder::default().connect(addr).await?;
    ///     Ok(())
    /// }
    ///
    /// #[tokio::main]
    /// async fn server(addr: SocketAddrV4) -> io::Result<()> {
    ///     let _rdma = RdmaBuilder::default().listen(addr).await?;
    ///     Ok(())
    /// }
    /// #[tokio::main]
    /// async fn main() {
    ///     let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    ///     std::thread::spawn(move || server(addr));
    ///     tokio::time::sleep(Duration::from_secs(3)).await;
    ///     client(addr)
    ///         .await
    ///         .map_err(|err| println!("{}", err))
    ///         .unwrap();
    /// }
    /// ```
    #[inline]
    pub async fn connect<A: ToSocketAddrs>(self, addr: A) -> io::Result<Rdma> {
        match self.qp_attr.conn_type {
            ConnectionType::RCSocket => {
                let mut rdma = self.build()?;
                let remote = tcp_connect_helper(addr, &rdma.endpoint()).await?;
                rdma.qp_handshake(remote)?;
                rdma.init_agent(
                    self.agent_attr.max_message_length,
                    self.agent_attr.max_rmr_access,
                )
                .await?;
                Ok(rdma)
            }
            ConnectionType::RCCM => Err(io::Error::new(
                io::ErrorKind::Other,
                "ConnectionType should be XXSocket",
            )),
        }
    }

    /// Establish connection with RDMA CM server
    ///
    /// Application scenario can be seen in `[/example/cm_client.rs]`
    ///
    /// # Examples
    ///
    /// ```
    /// use async_rdma::{ConnectionType, RdmaBuilder};
    /// use local_ip_address::local_ip;
    /// use portpicker::pick_unused_port;
    /// use rdma_sys::*;
    /// use std::{io, ptr::null_mut, time::Duration};
    ///
    /// static SERVER_NODE: &str = "0.0.0.0\0";
    ///
    /// async fn client(node: &str, service: &str) -> io::Result<()> {
    ///     let _rdma = RdmaBuilder::default()
    ///         .set_conn_type(ConnectionType::RCCM)
    ///         .set_raw(true)
    ///         .cm_connect(node, service)
    ///         .await?;
    ///     Ok(())
    /// }
    ///
    /// #[tokio::main]
    /// async fn server(node: &str, service: &str) -> io::Result<()> {
    ///     let mut hints = unsafe { std::mem::zeroed::<rdma_addrinfo>() };
    ///     let mut res: *mut rdma_addrinfo = null_mut();
    ///     hints.ai_flags = RAI_PASSIVE.try_into().unwrap();
    ///     hints.ai_port_space = rdma_port_space::RDMA_PS_TCP.try_into().unwrap();
    ///     let mut ret = unsafe {
    ///         rdma_getaddrinfo(
    ///             node.as_ptr().cast(),
    ///             service.as_ptr().cast(),
    ///             &hints,
    ///             &mut res,
    ///         )
    ///     };
    ///     if ret != 0 {
    ///         println!("rdma_getaddrinfo");
    ///         return Err(io::Error::last_os_error());
    ///     }
    ///
    ///     let mut listen_id = null_mut();
    ///     let mut id = null_mut();
    ///     let mut init_attr = unsafe { std::mem::zeroed::<ibv_qp_init_attr>() };
    ///     init_attr.cap.max_send_wr = 1;
    ///     init_attr.cap.max_recv_wr = 1;
    ///     ret = unsafe { rdma_create_ep(&mut listen_id, res, null_mut(), &mut init_attr) };
    ///     if ret != 0 {
    ///         println!("rdma_create_ep");
    ///         return Err(io::Error::last_os_error());
    ///     }
    ///
    ///     ret = unsafe { rdma_listen(listen_id, 0) };
    ///     if ret != 0 {
    ///         println!("rdma_listen");
    ///         return Err(io::Error::last_os_error());
    ///     }
    ///
    ///     ret = unsafe { rdma_get_request(listen_id, &mut id) };
    ///     if ret != 0 {
    ///         println!("rdma_get_request");
    ///         return Err(io::Error::last_os_error());
    ///     }
    ///
    ///     ret = unsafe { rdma_accept(id, null_mut()) };
    ///     if ret != 0 {
    ///         println!("rdma_get_request");
    ///         return Err(io::Error::last_os_error());
    ///     }
    ///     Ok(())
    /// }
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let port = pick_unused_port().unwrap();
    ///     let server_service = port.to_string() + "\0";
    ///     let client_service = server_service.clone();
    ///     std::thread::spawn(move || server(SERVER_NODE, &server_service));
    ///     tokio::time::sleep(Duration::from_secs(1)).await;
    ///     let node = local_ip().unwrap().to_string() + "\0";
    ///     client(&node, &client_service)
    ///         .await
    ///         .map_err(|err| println!("{}", err))
    ///         .unwrap();
    /// }
    /// ```
    #[inline]
    #[cfg(feature = "cm")]
    pub async fn cm_connect(self, node: &str, service: &str) -> io::Result<Rdma> {
        match self.qp_attr.conn_type {
            ConnectionType::RCSocket => Err(io::Error::new(
                io::ErrorKind::Other,
                "ConnectionType should be XXSocket",
            )),
            ConnectionType::RCCM => {
                let max_message_length = self.agent_attr.max_message_length;
                let max_rmr_access = self.agent_attr.max_rmr_access;
                let mut rdma = self.build()?;
                cm_connect_helper(&mut rdma, node, service)?;
                rdma.init_agent(max_message_length, max_rmr_access).await?;
                Ok(rdma)
            }
        }
    }

    /// Listen to the address to wait for a connection to be established
    ///
    /// Used with `connect`
    ///
    /// # Examples
    ///
    /// ```
    /// use async_rdma::RdmaBuilder;
    /// use portpicker::pick_unused_port;
    /// use std::{
    ///     io,
    ///     net::{Ipv4Addr, SocketAddrV4},
    ///     time::Duration,
    /// };
    ///
    /// async fn client(addr: SocketAddrV4) -> io::Result<()> {
    ///     let _rdma = RdmaBuilder::default().connect(addr).await?;
    ///     Ok(())
    /// }
    ///
    /// #[tokio::main]
    /// async fn server(addr: SocketAddrV4) -> io::Result<()> {
    ///     let _rdma = RdmaBuilder::default().listen(addr).await?;
    ///     Ok(())
    /// }
    /// #[tokio::main]
    /// async fn main() {
    ///     let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    ///     std::thread::spawn(move || server(addr));
    ///     tokio::time::sleep(Duration::from_secs(3)).await;
    ///     client(addr)
    ///         .await
    ///         .map_err(|err| println!("{}", err))
    ///         .unwrap();
    /// }
    /// ```
    #[inline]
    pub async fn listen<A: ToSocketAddrs>(self, addr: A) -> io::Result<Rdma> {
        match self.qp_attr.conn_type {
            ConnectionType::RCSocket => {
                let mut rdma = self.build()?;
                let tcp_listener = TcpListener::bind(addr).await?;
                let remote = tcp_listen(&tcp_listener, &rdma.endpoint()).await?;
                rdma.qp_handshake(remote)?;
                debug!("handshake done");
                rdma.init_agent(
                    self.agent_attr.max_message_length,
                    self.agent_attr.max_rmr_access,
                )
                .await?;
                rdma.clone_attr = CloneAttr::default().set_tcp_listener(tcp_listener);
                Ok(rdma)
            }
            ConnectionType::RCCM => Err(io::Error::new(
                io::ErrorKind::Other,
                "ConnectionType should be XXSocket",
            )),
        }
    }

    /// Set device name
    #[inline]
    #[must_use]
    pub fn set_dev(mut self, dev: &str) -> Self {
        self.dev_attr.dev_name = Some(dev.to_owned());
        self
    }

    /// Set the complete queue size
    #[inline]
    #[must_use]
    pub fn set_cq_size(mut self, cq_size: u32) -> Self {
        self.cq_attr.cq_size = cq_size;
        self
    }

    /// Set the gid index
    #[inline]
    #[must_use]
    pub fn set_gid_index(mut self, gid_index: usize) -> Self {
        self.dev_attr.gid_index = gid_index;
        self
    }

    /// Set the port number
    #[inline]
    #[must_use]
    pub fn set_port_num(mut self, port_num: u8) -> Self {
        self.dev_attr.port_num = port_num;
        self
    }

    /// Set the connection type
    #[inline]
    #[must_use]
    pub fn set_conn_type(mut self, conn_type: ConnectionType) -> Self {
        self.qp_attr.conn_type = conn_type;
        self
    }

    /// Set if send/recv raw data
    #[inline]
    #[must_use]
    pub fn set_raw(mut self, raw: bool) -> Self {
        self.qp_attr.raw = raw;
        self
    }

    /// Set maximum number of outstanding send requests in the send queue
    #[inline]
    #[must_use]
    pub fn set_qp_max_send_wr(mut self, max_send_wr: u32) -> Self {
        self.qp_attr.max_send_wr = max_send_wr;
        self
    }

    /// Set maximum number of outstanding receive requests in the receive queue
    #[inline]
    #[must_use]
    pub fn set_qp_max_recv_wr(mut self, max_recv_wr: u32) -> Self {
        self.qp_attr.max_recv_wr = max_recv_wr;
        self
    }

    /// Set maximum number of scatter/gather elements (SGE) in a WR on the send queue
    #[inline]
    #[must_use]
    pub fn set_qp_max_send_sge(mut self, max_send_sge: u32) -> Self {
        self.qp_attr.max_send_sge = max_send_sge;
        self
    }

    /// Set maximum number of scatter/gather elements (SGE) in a WR on the receive queue
    #[inline]
    #[must_use]
    pub fn set_qp_max_recv_sge(mut self, max_recv_sge: u32) -> Self {
        self.qp_attr.max_recv_sge = max_recv_sge;
        self
    }

    /// Set default `QP` access
    #[inline]
    #[must_use]
    pub fn set_qp_access(mut self, flags: BitFlags<AccessFlag>) -> Self {
        self.qp_attr.access = flags_into_ibv_access(flags);
        self
    }

    /// Set default `MR` access
    #[inline]
    #[must_use]
    pub fn set_mr_access(mut self, flags: BitFlags<AccessFlag>) -> Self {
        self.mr_attr.access = flags_into_ibv_access(flags);
        self
    }

    /// Set the stragety to manage `MR`s
    #[inline]
    #[must_use]
    pub fn set_mr_strategy(mut self, strategy: MRManageStrategy) -> Self {
        self.mr_attr.strategy = strategy;
        self
    }

    /// Set max length of message send/recv by Agent
    #[inline]
    #[must_use]
    pub fn set_max_message_length(mut self, max_msg_len: usize) -> Self {
        self.agent_attr.max_message_length = max_msg_len;
        self
    }

    /// Set max access permission for remote mr requests
    #[inline]
    #[must_use]
    pub fn set_max_rmr_access(mut self, flags: BitFlags<AccessFlag>) -> Self {
        self.agent_attr.max_rmr_access = flags_into_ibv_access(flags);
        self
    }
}

impl Debug for RdmaBuilder {
    #[inline]
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RdmaBuilder")
            .field("dev_name", &self.dev_attr.dev_name)
            .field("cq_size", &self.cq_attr.cq_size)
            .finish()
    }
}

/// Exchange metadata through tcp
async fn tcp_connect_helper<A: ToSocketAddrs>(
    addr: A,
    ep: &QueuePairEndpoint,
) -> io::Result<QueuePairEndpoint> {
    let mut stream = TcpStream::connect(addr).await?;
    let mut endpoint = bincode::serialize(ep).map_err(|e| {
        io::Error::new(
            io::ErrorKind::InvalidInput,
            format!("failed to serailize the endpoint, {:?}", e),
        )
    })?;
    stream.write_all(&endpoint).await?;
    // the byte number is not important, as read_exact will fill the buffer
    let _ = stream.read_exact(endpoint.as_mut()).await?;
    bincode::deserialize(&endpoint).map_err(|e| {
        io::Error::new(
            io::ErrorKind::InvalidInput,
            format!("failed to deserailize the endpoint, {:?}", e),
        )
    })
}

/// Listen for exchanging metadata through tcp
async fn tcp_listen(
    tcp_listener: &TcpListener,
    ep: &QueuePairEndpoint,
) -> io::Result<QueuePairEndpoint> {
    let (mut stream, _) = tcp_listener.accept().await?;

    let endpoint_size = bincode::serialized_size(ep).map_err(|e| {
        io::Error::new(
            io::ErrorKind::Other,
            format!("Endpoint serialization failed, {:?}", e),
        )
    })?;
    let mut remote = vec![0_u8; endpoint_size.cast()];
    // the byte number is not important, as read_exact will fill the buffer
    let _ = stream.read_exact(remote.as_mut()).await?;
    let remote: QueuePairEndpoint = bincode::deserialize(&remote).map_err(|e| {
        io::Error::new(
            io::ErrorKind::Other,
            format!("failed to deserialize remote endpoint, {:?}", e),
        )
    })?;
    let local = bincode::serialize(ep).map_err(|e| {
        io::Error::new(
            io::ErrorKind::Other,
            format!("failed to deserialize remote endpoint, {:?}", e),
        )
    })?;
    stream.write_all(&local).await?;
    Ok(remote)
}

/// Exchange metadata and setup connection through cm
#[inline]
#[cfg(feature = "cm")]
fn cm_connect_helper(rdma: &mut Rdma, node: &str, service: &str) -> io::Result<()> {
    // SAFETY: POD FFI type
    let mut hints = unsafe { std::mem::zeroed::<rdma_addrinfo>() };
    let mut info: *mut rdma_addrinfo = null_mut();
    hints.ai_port_space = rdma_port_space::RDMA_PS_TCP.cast();
    // Safety: ffi
    let mut ret = unsafe {
        rdma_getaddrinfo(
            node.as_ptr().cast(),
            service.as_ptr().cast(),
            &hints,
            &mut info,
        )
    };
    if ret != 0_i32 {
        return Err(log_ret_last_os_err());
    }

    let mut id: *mut rdma_cm_id = null_mut();
    // Safety: ffi
    ret = unsafe { rdma_create_ep(&mut id, info, rdma.pd.as_ptr(), null_mut()) };
    if ret != 0_i32 {
        // Safety: ffi
        unsafe {
            rdma_freeaddrinfo(info);
        }
        return Err(log_ret_last_os_err());
    }

    // Safety: id was initialized by `rdma_create_ep`
    unsafe {
        debug!(
            "cm_id: {:?},{:?},{:?},{:?},{:?},{:?},{:?}",
            (*id).qp,
            (*id).pd,
            (*id).verbs,
            (*id).recv_cq_channel,
            (*id).send_cq_channel,
            (*id).recv_cq,
            (*id).send_cq
        );
        (*id).qp = rdma.qp.as_ptr();
        (*id).pd = rdma.pd.as_ptr();
        (*id).verbs = rdma.ctx.as_ptr();
        (*id).recv_cq_channel = rdma.qp.event_listener.cq.event_channel().as_ptr();
        (*id).recv_cq_channel = rdma.qp.event_listener.cq.event_channel().as_ptr();
        (*id).recv_cq = rdma.qp.event_listener.cq.as_ptr();
        (*id).send_cq = rdma.qp.event_listener.cq.as_ptr();
        debug!(
            "cm_id: {:?},{:?},{:?},{:?},{:?},{:?},{:?}",
            (*id).qp,
            (*id).pd,
            (*id).verbs,
            (*id).recv_cq_channel,
            (*id).send_cq_channel,
            (*id).recv_cq,
            (*id).send_cq
        );
    }
    // Safety: ffi
    ret = unsafe { rdma_connect(id, null_mut()) };
    if ret != 0_i32 {
        // Safety: ffi
        unsafe {
            let _ = rdma_disconnect(id);
        }
        return Err(log_ret_last_os_err());
    }

    Ok(())
}

/// Method of establishing a connection
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionType {
    /// Establish reliable connection through `Socket` APIs.
    RCSocket,
    /// Establish reliable connection through `CM` APIs.
    RCCM,
}

/// Attributes for creating new `Rdma`s through `clone`
#[derive(Debug, Clone, Default)]
pub(crate) struct CloneAttr {
    /// Tcp listener used for new connections
    pub(crate) tcp_listener: Option<Arc<Mutex<TcpListener>>>,
    /// Clone `Rdma` with new `ProtectionDomain`
    pub(crate) pd: Option<Arc<ProtectionDomain>>,
    /// Clone `Rdma` with new `Port number`
    pub(crate) port_num: Option<u8>,
    /// Clone `Rdma` with new qp access
    pub(crate) qp_access: Option<ibv_access_flags>,
    /// Clone `Rdma` with new max access permission for remote mr requests
    pub(crate) max_rmr_access: Option<ibv_access_flags>,
}

impl CloneAttr {
    /// Set `TcpListener`
    fn set_tcp_listener(mut self, tcp_listener: TcpListener) -> Self {
        self.tcp_listener = Some(Arc::new(Mutex::new(tcp_listener)));
        self
    }

    /// Set `ProtectionDomain`
    #[allow(dead_code)] // not yet fully implemented
    fn set_pd(mut self, pd: ProtectionDomain) -> Self {
        self.pd = Some(Arc::new(pd));
        self
    }

    /// Set port number
    fn set_port_num(mut self, port_num: u8) -> Self {
        self.port_num = Some(port_num);
        self
    }

    /// Set qp access
    fn set_qp_access(mut self, access: ibv_access_flags) -> Self {
        self.qp_access = Some(access);
        self
    }

    /// Set max access permission for remote mr requests with new agent
    fn set_max_rmr_access(mut self, access: ibv_access_flags) -> Self {
        self.max_rmr_access = Some(access);
        self
    }
}

/// Rdma handler, the only interface that the users deal with rdma
#[derive(Debug)]
pub struct Rdma {
    /// device context
    ctx: Arc<Context>,
    /// protection domain
    pd: Arc<ProtectionDomain>,
    /// Memory region allocator
    allocator: Arc<MrAllocator>,
    /// Queue pair
    qp: Arc<QueuePair>,
    /// Background agent
    agent: Option<Arc<Agent>>,
    /// Connection type
    conn_type: ConnectionType,
    /// If send/recv raw data
    raw: bool,
    /// Attributes for creating new `Rdma`s through `clone`
    clone_attr: CloneAttr,
}

impl Rdma {
    /// create a new `Rdma` instance
    fn new(
        dev_attr: &DeviceInitAttr,
        cq_attr: CQInitAttr,
        qp_attr: QPInitAttr,
        mr_attr: MRInitAttr,
    ) -> io::Result<Self> {
        let ctx = Arc::new(Context::open(
            dev_attr.dev_name.as_deref(),
            dev_attr.port_num,
            dev_attr.gid_index,
        )?);
        let ec = ctx.create_event_channel()?;
        let cq = Arc::new(ctx.create_completion_queue(cq_attr.cq_size, ec, cq_attr.max_cqe)?);
        let event_listener = EventListener::new(cq);
        let pd = Arc::new(ctx.create_protection_domain()?);
        let allocator = Arc::new(MrAllocator::new(
            Arc::<ProtectionDomain>::clone(&pd),
            mr_attr,
        ));
        let mut qp = pd
            .create_queue_pair_builder()
            .set_event_listener(event_listener)
            .set_port_num(dev_attr.port_num)
            .set_gid_index(dev_attr.gid_index)
            .set_max_send_wr(qp_attr.max_send_wr)
            .set_max_send_sge(qp_attr.max_send_sge)
            .set_max_recv_wr(qp_attr.max_recv_wr)
            .set_max_recv_sge(qp_attr.max_recv_sge)
            .build()?;
        qp.modify_to_init(qp_attr.access, dev_attr.port_num)?;
        Ok(Self {
            ctx,
            pd,
            qp: Arc::new(qp),
            agent: None,
            allocator,
            conn_type: qp_attr.conn_type,
            raw: qp_attr.raw,
            clone_attr: CloneAttr::default(),
        })
    }

    /// Create a new `Rdma` that has the same `mr_allocator` and `event_listener` as parent.
    fn clone(&self) -> io::Result<Self> {
        let qp_access = self.clone_attr.qp_access.map_or_else(
            || {
                self.qp.access.map_or_else(
                    || {
                        Err(io::Error::new(
                            io::ErrorKind::Other,
                            "parent qp access is none",
                        ))
                    },
                    Ok,
                )
            },
            Ok,
        )?;
        let port_num = self.clone_attr.port_num.unwrap_or(self.qp.port_num);
        let pd = self
            .clone_attr
            .pd
            .as_ref()
            .map_or_else(|| &self.pd, |pd| pd);

        let mut qp_init_attr = self.qp.qp_init_attr.clone();
        // SAFETY: ffi
        let inner_qp = NonNull::new(unsafe {
            rdma_sys::ibv_create_qp(self.pd.as_ptr(), &mut qp_init_attr.qp_init_attr_inner)
        })
        .ok_or_else(log_ret_last_os_err)?;
        let mut qp = QueuePair {
            pd: Arc::clone(pd),
            event_listener: Arc::clone(&self.qp.event_listener),
            inner_qp,
            port_num,
            gid_index: self.qp.gid_index,
            qp_init_attr,
            access: Some(qp_access),
        };
        qp.modify_to_init(qp_access, self.qp.port_num)?;
        Ok(Self {
            ctx: Arc::clone(&self.ctx),
            pd: Arc::clone(pd),
            qp: Arc::new(qp),
            agent: None,
            allocator: Arc::clone(&self.allocator),
            conn_type: self.conn_type,
            raw: self.raw,
            clone_attr: self.clone_attr.clone(),
        })
    }

    /// get the queue pair endpoint information
    fn endpoint(&self) -> QueuePairEndpoint {
        self.qp.endpoint()
    }

    /// to hand shake the qp so that it works
    fn qp_handshake(&mut self, remote: QueuePairEndpoint) -> io::Result<()> {
        self.qp.modify_to_rtr(remote, 0, 1, 0x12)?;
        debug!("rtr");
        self.qp.modify_to_rts(0x12, 6, 6, 0, 1)?;
        debug!("rts");
        Ok(())
    }

    /// Agent init helper
    async fn init_agent(
        &mut self,
        max_message_length: usize,
        max_rmr_access: ibv_access_flags,
    ) -> io::Result<()> {
        if !self.raw {
            let agent = Arc::new(Agent::new(
                Arc::<QueuePair>::clone(&self.qp),
                Arc::<MrAllocator>::clone(&self.allocator),
                max_message_length,
                max_rmr_access,
            )?);
            self.agent = Some(agent);
            // wait for the remote agent to prepare
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
        Ok(())
    }

    /// Listen for new connections using the same `mr_allocator` and `event_listener` as parent `Rdma`
    ///
    /// Used with `connect` and `new_connect`
    ///
    /// # Examples
    ///
    /// ```
    /// use async_rdma::RdmaBuilder;
    /// use portpicker::pick_unused_port;
    /// use std::{
    ///     io,
    ///     net::{Ipv4Addr, SocketAddrV4},
    ///     time::Duration,
    /// };
    ///
    /// async fn client(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma = RdmaBuilder::default().connect(addr).await?;
    ///     for _ in 0..3 {
    ///         let _new_rdma = rdma.new_connect(addr).await?;
    ///     }
    ///     Ok(())
    /// }
    ///
    /// #[tokio::main]
    /// async fn server(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma = RdmaBuilder::default().listen(addr).await?;
    ///     for _ in 0..3 {
    ///         let _new_rdma = rdma.listen().await?;
    ///     }
    ///     Ok(())
    /// }
    /// #[tokio::main]
    /// async fn main() {
    ///     let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    ///     std::thread::spawn(move || server(addr));
    ///     tokio::time::sleep(Duration::from_secs(3)).await;
    ///     client(addr)
    ///         .await
    ///         .map_err(|err| println!("{}", err))
    ///         .unwrap();
    /// }
    /// ```
    #[inline]
    pub async fn listen(&self) -> io::Result<Self> {
        match self.conn_type {
            ConnectionType::RCSocket => {
                let mut rdma = self.clone()?;
                let remote = self
                    .clone_attr
                    .tcp_listener
                    .as_ref()
                    .map_or_else(
                        || Err(io::Error::new(io::ErrorKind::Other, "tcp_listener is None")),
                        |tcp_listener| {
                            Ok(async {
                                let tcp_listener = tcp_listener.lock().await;
                                tcp_listen(&tcp_listener, &rdma.endpoint()).await
                            })
                        },
                    )?
                    .await?;
                rdma.qp_handshake(remote)?;
                debug!("handshake done");
                #[allow(clippy::unreachable)]
                let (max_message_length, max_rmr_access) = self.agent.as_ref().map_or_else(
                    || {
                        unreachable!("agent of parent rdma is None");
                    },
                    |agent| (agent.max_msg_len(), agent.max_rmr_access()),
                );
                let max_rmr_access = self
                    .clone_attr
                    .max_rmr_access
                    .map_or(max_rmr_access, |new_access| new_access);
                rdma.init_agent(max_message_length, max_rmr_access).await?;
                Ok(rdma)
            }
            ConnectionType::RCCM => Err(io::Error::new(
                io::ErrorKind::Other,
                "ConnectionType should be XXSocket",
            )),
        }
    }

    /// Establish new connections with RDMA server using the same `mr_allocator` and `event_listener` as parent `Rdma`
    ///
    /// Used with `listen`
    ///
    /// # Examples
    ///
    /// ```
    /// use async_rdma::RdmaBuilder;
    /// use portpicker::pick_unused_port;
    /// use std::{
    ///     io,
    ///     net::{Ipv4Addr, SocketAddrV4},
    ///     time::Duration,
    /// };
    ///
    /// async fn client(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma = RdmaBuilder::default().connect(addr).await?;
    ///     for _ in 0..3 {
    ///         let _new_rdma = rdma.new_connect(addr).await?;
    ///     }
    ///     Ok(())
    /// }
    ///
    /// #[tokio::main]
    /// async fn server(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma = RdmaBuilder::default().listen(addr).await?;
    ///     for _ in 0..3 {
    ///         let _new_rdma = rdma.listen().await?;
    ///     }
    ///     Ok(())
    /// }
    /// #[tokio::main]
    /// async fn main() {
    ///     let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    ///     std::thread::spawn(move || server(addr));
    ///     tokio::time::sleep(Duration::from_secs(3)).await;
    ///     client(addr)
    ///         .await
    ///         .map_err(|err| println!("{}", err))
    ///         .unwrap();
    /// }
    /// ```
    #[inline]
    pub async fn new_connect<A: ToSocketAddrs>(&self, addr: A) -> io::Result<Self> {
        match self.conn_type {
            ConnectionType::RCSocket => {
                let mut rdma = self.clone()?;
                let remote = tcp_connect_helper(addr, &rdma.endpoint()).await?;
                rdma.qp_handshake(remote)?;
                #[allow(clippy::unreachable)]
                let (max_message_length, max_rmr_access) = self.agent.as_ref().map_or_else(
                    || {
                        unreachable!("agent of parent rdma is None");
                    },
                    |agent| (agent.max_msg_len(), agent.max_rmr_access()),
                );
                let max_rmr_access = self
                    .clone_attr
                    .max_rmr_access
                    .map_or(max_rmr_access, |new_access| new_access);
                rdma.init_agent(max_message_length, max_rmr_access).await?;
                Ok(rdma)
            }
            ConnectionType::RCCM => Err(io::Error::new(
                io::ErrorKind::Other,
                "ConnectionType should be XXSocket",
            )),
        }
    }

    /// Send the content in the `lm`
    ///
    /// Used with `receive`.
    /// Application scenario such as: client put data into a local mr and `send` to server.
    /// Server `receive` the mr sent by client and process data in it.
    ///
    /// Application scenario can be seen in `[/example/rpc.rs]`
    ///
    /// # Examples
    /// ```
    /// use async_rdma::{LocalMrReadAccess, LocalMrWriteAccess, Rdma, RdmaListener};
    /// use portpicker::pick_unused_port;
    /// use std::{
    ///     alloc::Layout,
    ///     io,
    ///     net::{Ipv4Addr, SocketAddrV4},
    ///     time::Duration,
    /// };
    ///
    /// struct Data(String);
    ///
    /// async fn client(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma = Rdma::connect(addr, 1, 1, 512).await?;
    ///     let mut lmr = rdma.alloc_local_mr(Layout::new::<Data>())?;
    ///     // put data into lmr
    ///     unsafe { *(*lmr.as_mut_ptr() as *mut Data) = Data("hello world".to_string()) };
    ///     // send the content of lmr to server
    ///     rdma.send(&lmr).await?;
    ///     Ok(())
    /// }
    ///
    /// #[tokio::main]
    /// async fn server(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma_listener = RdmaListener::bind(addr).await?;
    ///     let rdma = rdma_listener.accept(1, 1, 512).await?;
    ///     // receive the data sent by client and put it into an mr
    ///     let lmr = rdma.receive().await?;
    ///     // read data from mr
    ///     unsafe {
    ///         assert_eq!(
    ///             "hello world".to_string(),
    ///             *(*(*lmr.as_ptr() as *const Data)).0
    ///         )
    ///     };
    ///     // wait for the agent thread to send all reponses to the remote.
    ///     tokio::time::sleep(Duration::from_secs(1)).await;
    ///     Ok(())
    /// }
    /// #[tokio::main]
    /// async fn main() {
    ///     let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    ///     std::thread::spawn(move || server(addr));
    ///     tokio::time::sleep(Duration::from_secs(3)).await;
    ///     client(addr)
    ///         .await
    ///         .map_err(|err| println!("{}", err))
    ///         .unwrap();
    /// }
    /// ```
    #[inline]
    pub async fn send(&self, lm: &LocalMr) -> io::Result<()> {
        self.agent
            .as_ref()
            .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "Agent is not ready"))?
            .send_data(lm, None)
            .await
    }

    /// Send raw data in the lm
    ///
    /// Used with `receive_raw`.
    ///
    /// # Examples
    ///
    ///  ```
    /// use async_rdma::{LocalMrReadAccess, LocalMrWriteAccess, RdmaBuilder};
    /// use portpicker::pick_unused_port;
    /// use std::{
    ///     alloc::Layout,
    ///     io::{self, Write},
    ///     net::{Ipv4Addr, SocketAddrV4},
    ///     time::Duration,
    /// };
    ///
    /// const RAW_DATA: [u8; 8] = [1_u8; 8];
    ///
    /// async fn client(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma = RdmaBuilder::default().set_raw(true).connect(addr).await?;
    ///     let mut lmr = rdma.alloc_local_mr(Layout::for_value(&RAW_DATA))?;
    ///     // put data into lmr
    ///     let _num = lmr.as_mut_slice().write(&RAW_DATA)?;
    ///     // wait for serer to receive first
    ///     tokio::time::sleep(Duration::from_millis(100)).await;
    ///     // send the content of lmr to server
    ///     rdma.send_raw(&lmr).await?;
    ///     Ok(())
    /// }
    ///
    /// #[tokio::main]
    /// async fn server(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma = RdmaBuilder::default().set_raw(true).listen(addr).await?;
    ///     // receive the data sent by client and put it into an mr
    ///     let lmr = rdma.receive_raw(Layout::for_value(&RAW_DATA)).await?;
    ///     // read data from mr
    ///     assert_eq!(*lmr.as_slice(), RAW_DATA);
    ///     // wait for the agent thread to send all reponses to the remote.
    ///     tokio::time::sleep(Duration::from_secs(1)).await;
    ///     Ok(())
    /// }
    /// #[tokio::main]
    /// async fn main() {
    ///     let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    ///     std::thread::spawn(move || server(addr));
    ///     tokio::time::sleep(Duration::from_secs(3)).await;
    ///     client(addr)
    ///         .await
    ///         .map_err(|err| println!("{}", err))
    ///         .unwrap();
    /// }
    /// ```
    #[inline]
    #[cfg(feature = "raw")]
    pub async fn send_raw(&self, lm: &LocalMr) -> io::Result<()> {
        self.qp.send_sge_raw(&[lm], None).await
    }

    /// Send raw data in the lm with imm
    ///
    /// Used with `receive_raw_with_imm`
    ///
    /// # Examples
    ///
    ///  ```
    /// use async_rdma::{LocalMrReadAccess, LocalMrWriteAccess, RdmaBuilder};
    /// use portpicker::pick_unused_port;
    /// use std::{
    ///     alloc::Layout,
    ///     io::{self, Write},
    ///     net::{Ipv4Addr, SocketAddrV4},
    ///     time::Duration,
    /// };
    ///
    /// const RAW_DATA: [u8; 8] = [1_u8; 8];
    /// const IMM: u32 = 1_u32;
    ///
    /// async fn client(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma = RdmaBuilder::default().set_raw(true).connect(addr).await?;
    ///     let mut lmr = rdma.alloc_local_mr(Layout::for_value(&RAW_DATA))?;
    ///     // put data into lmr
    ///     let _num = lmr.as_mut_slice().write(&RAW_DATA)?;
    ///     // wait for serer to receive first
    ///     tokio::time::sleep(Duration::from_millis(100)).await;
    ///     // send the content of lmr to server
    ///     rdma.send_raw_with_imm(&lmr, IMM).await?;
    ///     Ok(())
    /// }
    ///
    /// #[tokio::main]
    /// async fn server(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma = RdmaBuilder::default().set_raw(true).listen(addr).await?;
    ///     // receive the data sent by client and put it into an mr
    ///     let (lmr, imm) = rdma
    ///         .receive_raw_with_imm(Layout::for_value(&RAW_DATA))
    ///         .await?;
    ///     // read data from mr
    ///     assert_eq!(*lmr.as_slice(), RAW_DATA);
    ///     assert_eq!(imm, Some(IMM));
    ///     // wait for the agent thread to send all reponses to the remote.
    ///     tokio::time::sleep(Duration::from_secs(1)).await;
    ///     Ok(())
    /// }
    /// #[tokio::main]
    /// async fn main() {
    ///     let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    ///     std::thread::spawn(move || server(addr));
    ///     tokio::time::sleep(Duration::from_secs(3)).await;
    ///     client(addr)
    ///         .await
    ///         .map_err(|err| println!("{}", err))
    ///         .unwrap();
    /// }
    /// ```
    #[inline]
    #[cfg(feature = "raw")]
    pub async fn send_raw_with_imm(&self, lm: &LocalMr, imm: u32) -> io::Result<()> {
        self.qp.send_sge_raw(&[lm], Some(imm)).await
    }

    /// Send the content in the `lm` with immediate date.
    ///
    /// Used with `receive_with_imm`.
    ///
    /// # Examples
    /// ```rust
    /// use async_rdma::{LocalMrReadAccess, LocalMrWriteAccess, Rdma, RdmaListener};
    /// use portpicker::pick_unused_port;
    /// use std::{
    ///     alloc::Layout,
    ///     io,
    ///     net::{Ipv4Addr, SocketAddrV4},
    ///     time::Duration,
    /// };
    ///
    /// struct Data(String);
    /// static IMM_NUM: u32 = 123;
    /// static MSG: &str = "hello world";
    ///
    /// async fn client(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma = Rdma::connect(addr, 1, 1, 512).await?;
    ///     let mut lmr = rdma.alloc_local_mr(Layout::new::<Data>())?;
    ///     // put data into lmr
    ///     unsafe { std::ptr::write(*lmr.as_mut_ptr() as *mut Data, Data(MSG.to_string())) };
    ///     // send the content of lmr and imm data to server
    ///     rdma.send_with_imm(&lmr, IMM_NUM).await?;
    ///     rdma.send_with_imm(&lmr, IMM_NUM).await?;
    ///     rdma.send(&lmr).await?;
    ///     rdma.send(&lmr).await?;
    ///     Ok(())
    /// }
    ///
    /// #[tokio::main]
    /// async fn server(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma_listener = RdmaListener::bind(addr).await?;
    ///     let rdma = rdma_listener.accept(1, 1, 512).await?;
    ///     // receive the data and imm sent by the client
    ///     let (lmr, imm) = rdma.receive_with_imm().await?;
    ///     assert_eq!(imm, Some(IMM_NUM));
    ///     unsafe { assert_eq!(MSG.to_string(), *(*(*lmr.as_ptr() as *const Data)).0) };
    ///     // receive the data in mr while avoiding the immediate data is ok.
    ///     let lmr = rdma.receive().await?;
    ///     unsafe { assert_eq!(MSG.to_string(), *(*(*lmr.as_ptr() as *const Data)).0) };
    ///     // `receive_with_imm` works well even if the client didn't send any immediate data.
    ///     // the imm received will be a `None`.
    ///     let (lmr, imm) = rdma.receive_with_imm().await?;
    ///     assert_eq!(imm, None);
    ///     unsafe { assert_eq!(MSG.to_string(), *(*(*lmr.as_ptr() as *const Data)).0) };
    ///     // compared to the above, using `receive` is a better choice.
    ///     let lmr = rdma.receive().await?;
    ///     unsafe { assert_eq!(MSG.to_string(), *(*(*lmr.as_ptr() as *const Data)).0) };
    ///     // wait for the agent thread to send all reponses to the remote.
    ///     tokio::time::sleep(Duration::from_secs(1)).await;
    ///     Ok(())
    /// }
    /// #[tokio::main]
    /// async fn main() {
    ///     let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    ///     let server_handle = std::thread::spawn(move || server(addr));
    ///     tokio::time::sleep(Duration::from_secs(3)).await;
    ///     client(addr)
    ///         .await
    ///         .map_err(|err| println!("{}", err))
    ///         .unwrap();
    ///     server_handle.join().unwrap().unwrap();
    /// }
    /// ```
    #[inline]
    pub async fn send_with_imm(&self, lm: &LocalMr, imm: u32) -> io::Result<()> {
        self.agent
            .as_ref()
            .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "Agent is not ready"))?
            .send_data(lm, Some(imm))
            .await
    }

    /// Receive the content and stored in the returned memory region
    ///
    /// Used with `send`.
    /// Application scenario such as: client put data into a local mr and `send` to server.
    /// Server `receive` the mr sent by client and process data in it.
    ///
    /// Application scenario can be seen in `[/example/rpc.rs]`
    ///
    /// # Examples
    /// ```
    /// use async_rdma::{LocalMrReadAccess, LocalMrWriteAccess, Rdma, RdmaListener};
    /// use portpicker::pick_unused_port;
    /// use std::{
    ///     alloc::Layout,
    ///     io,
    ///     net::{Ipv4Addr, SocketAddrV4},
    ///     time::Duration,
    /// };
    ///
    /// struct Data(String);
    ///
    /// async fn client(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma = Rdma::connect(addr, 1, 1, 512).await?;
    ///     let mut lmr = rdma.alloc_local_mr(Layout::new::<Data>())?;
    ///     // put data into lmr
    ///     unsafe { *(*lmr.as_mut_ptr() as *mut Data) = Data("hello world".to_string()) };
    ///     // send the content of lmr to server
    ///     rdma.send(&lmr).await?;
    ///     Ok(())
    /// }
    ///
    /// #[tokio::main]
    /// async fn server(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma_listener = RdmaListener::bind(addr).await?;
    ///     let rdma = rdma_listener.accept(1, 1, 512).await?;
    ///     // receive the data sent by client and put it into an mr
    ///     let lmr = rdma.receive().await?;
    ///     // read data from mr
    ///     unsafe {
    ///         assert_eq!(
    ///             "hello world".to_string(),
    ///             *(*(*lmr.as_ptr() as *const Data)).0
    ///         )
    ///     };
    ///     // wait for the agent thread to send all reponses to the remote.
    ///     tokio::time::sleep(Duration::from_secs(1)).await;
    ///     Ok(())
    /// }
    /// #[tokio::main]
    /// async fn main() {
    ///     let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    ///     std::thread::spawn(move || server(addr));
    ///     tokio::time::sleep(Duration::from_secs(3)).await;
    ///     client(addr)
    ///         .await
    ///         .map_err(|err| println!("{}", err))
    ///         .unwrap();
    /// }
    /// ```
    #[inline]
    pub async fn receive(&self) -> io::Result<LocalMr> {
        let (lmr, _) = self.receive_with_imm().await?;
        Ok(lmr)
    }

    /// Receive raw data
    ///
    /// Used with `send_raw`.
    ///
    /// # Examples
    ///
    ///  ```
    /// use async_rdma::{LocalMrReadAccess, LocalMrWriteAccess, RdmaBuilder};
    /// use portpicker::pick_unused_port;
    /// use std::{
    ///     alloc::Layout,
    ///     io::{self, Write},
    ///     net::{Ipv4Addr, SocketAddrV4},
    ///     time::Duration,
    /// };
    ///
    /// const RAW_DATA: [u8; 8] = [1_u8; 8];
    ///
    /// async fn client(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma = RdmaBuilder::default().set_raw(true).connect(addr).await?;
    ///     let mut lmr = rdma.alloc_local_mr(Layout::for_value(&RAW_DATA))?;
    ///     // put data into lmr
    ///     let _num = lmr.as_mut_slice().write(&RAW_DATA)?;
    ///     // wait for serer to receive first
    ///     tokio::time::sleep(Duration::from_millis(100)).await;
    ///     // send the content of lmr to server
    ///     rdma.send_raw(&lmr).await?;
    ///     Ok(())
    /// }
    ///
    /// #[tokio::main]
    /// async fn server(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma = RdmaBuilder::default().set_raw(true).listen(addr).await?;
    ///     // receive the data sent by client and put it into an mr
    ///     let lmr = rdma.receive_raw(Layout::for_value(&RAW_DATA)).await?;
    ///     // read data from mr
    ///     assert_eq!(*lmr.as_slice(), RAW_DATA);
    ///     // wait for the agent thread to send all reponses to the remote.
    ///     tokio::time::sleep(Duration::from_secs(1)).await;
    ///     Ok(())
    /// }
    /// #[tokio::main]
    /// async fn main() {
    ///     let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    ///     std::thread::spawn(move || server(addr));
    ///     tokio::time::sleep(Duration::from_secs(3)).await;
    ///     client(addr)
    ///         .await
    ///         .map_err(|err| println!("{}", err))
    ///         .unwrap();
    /// }
    /// ```
    #[inline]
    #[cfg(feature = "raw")]
    pub async fn receive_raw(&self, layout: Layout) -> io::Result<LocalMr> {
        let mut lmr = self.alloc_local_mr(layout)?;
        let _imm = self.qp.receive_sge_raw(&[&mut lmr]).await?;
        Ok(lmr)
    }

    /// Receive raw data with imm
    ///
    /// Used with `send_raw_with_imm`
    ///
    /// # Examples
    ///
    ///  ```
    /// use async_rdma::{LocalMrReadAccess, LocalMrWriteAccess, RdmaBuilder};
    /// use portpicker::pick_unused_port;
    /// use std::{
    ///     alloc::Layout,
    ///     io::{self, Write},
    ///     net::{Ipv4Addr, SocketAddrV4},
    ///     time::Duration,
    /// };
    ///
    /// const RAW_DATA: [u8; 8] = [1_u8; 8];
    /// const IMM: u32 = 1_u32;
    ///
    /// async fn client(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma = RdmaBuilder::default().set_raw(true).connect(addr).await?;
    ///     let mut lmr = rdma.alloc_local_mr(Layout::for_value(&RAW_DATA))?;
    ///     // put data into lmr
    ///     let _num = lmr.as_mut_slice().write(&RAW_DATA)?;
    ///     // wait for serer to receive first
    ///     tokio::time::sleep(Duration::from_millis(100)).await;
    ///     // send the content of lmr to server
    ///     rdma.send_raw_with_imm(&lmr, IMM).await?;
    ///     Ok(())
    /// }
    ///
    /// #[tokio::main]
    /// async fn server(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma = RdmaBuilder::default().set_raw(true).listen(addr).await?;
    ///     // receive the data sent by client and put it into an mr
    ///     let (lmr, imm) = rdma
    ///         .receive_raw_with_imm(Layout::for_value(&RAW_DATA))
    ///         .await?;
    ///     // read data from mr
    ///     assert_eq!(*lmr.as_slice(), RAW_DATA);
    ///     assert_eq!(imm, Some(IMM));
    ///     // wait for the agent thread to send all reponses to the remote.
    ///     tokio::time::sleep(Duration::from_secs(1)).await;
    ///     Ok(())
    /// }
    /// #[tokio::main]
    /// async fn main() {
    ///     let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    ///     std::thread::spawn(move || server(addr));
    ///     tokio::time::sleep(Duration::from_secs(3)).await;
    ///     client(addr)
    ///         .await
    ///         .map_err(|err| println!("{}", err))
    ///         .unwrap();
    /// }
    /// ```
    #[inline]
    #[cfg(feature = "raw")]
    pub async fn receive_raw_with_imm(&self, layout: Layout) -> io::Result<(LocalMr, Option<u32>)> {
        let mut lmr = self.alloc_local_mr(layout)?;
        let imm = self.qp.receive_sge_raw(&[&mut lmr]).await?;
        Ok((lmr, imm))
    }

    /// Receive the content and stored in the returned memory region.
    ///
    /// Used with `send_with_imm`.
    ///
    /// # Examples
    /// ```rust
    /// use async_rdma::{LocalMrReadAccess, LocalMrWriteAccess, Rdma, RdmaListener};
    /// use portpicker::pick_unused_port;
    /// use std::{
    ///     alloc::Layout,
    ///     io,
    ///     net::{Ipv4Addr, SocketAddrV4},
    ///     time::Duration,
    /// };
    ///
    /// struct Data(String);
    /// static IMM_NUM: u32 = 123;
    /// static MSG: &str = "hello world";
    ///
    /// async fn client(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma = Rdma::connect(addr, 1, 1, 512).await?;
    ///     let mut lmr = rdma.alloc_local_mr(Layout::new::<Data>())?;
    ///     // put data into lmr
    ///     unsafe { std::ptr::write(*lmr.as_mut_ptr() as *mut Data, Data(MSG.to_string())) };
    ///     // send the content of lmr and imm data to server
    ///     rdma.send_with_imm(&lmr, IMM_NUM).await?;
    ///     rdma.send_with_imm(&lmr, IMM_NUM).await?;
    ///     rdma.send(&lmr).await?;
    ///     rdma.send(&lmr).await?;
    ///     Ok(())
    /// }
    ///
    /// #[tokio::main]
    /// async fn server(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma_listener = RdmaListener::bind(addr).await?;
    ///     let rdma = rdma_listener.accept(1, 1, 512).await?;
    ///     // receive the data and imm sent by the client
    ///     let (lmr, imm) = rdma.receive_with_imm().await?;
    ///     assert_eq!(imm, Some(IMM_NUM));
    ///     unsafe { assert_eq!(MSG.to_string(), *(*(*lmr.as_ptr() as *const Data)).0) };
    ///     // receive the data in mr while avoiding the immediate data is ok.
    ///     let lmr = rdma.receive().await?;
    ///     unsafe { assert_eq!(MSG.to_string(), *(*(*lmr.as_ptr() as *const Data)).0) };
    ///     // `receive_with_imm` works well even if the client didn't send any immediate data.
    ///     // the imm received will be a `None`.
    ///     let (lmr, imm) = rdma.receive_with_imm().await?;
    ///     assert_eq!(imm, None);
    ///     unsafe { assert_eq!(MSG.to_string(), *(*(*lmr.as_ptr() as *const Data)).0) };
    ///     // compared to the above, using `receive` is a better choice.
    ///     let lmr = rdma.receive().await?;
    ///     unsafe { assert_eq!(MSG.to_string(), *(*(*lmr.as_ptr() as *const Data)).0) };
    ///     // wait for the agent thread to send all reponses to the remote.
    ///     tokio::time::sleep(Duration::from_secs(1)).await;
    ///     Ok(())
    /// }
    /// #[tokio::main]
    /// async fn main() {
    ///     let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    ///     let server_handle = std::thread::spawn(move || server(addr));
    ///     tokio::time::sleep(Duration::from_secs(3)).await;
    ///     client(addr)
    ///         .await
    ///         .map_err(|err| println!("{}", err))
    ///         .unwrap();
    ///     server_handle.join().unwrap().unwrap();
    /// }
    /// ```
    #[inline]
    pub async fn receive_with_imm(&self) -> io::Result<(LocalMr, Option<u32>)> {
        self.agent
            .as_ref()
            .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "Agent is not ready"))?
            .receive_data()
            .await
    }

    /// Receive the immediate data sent by `write_with_imm`.
    ///
    /// Used with `write_with_imm`.
    ///
    /// # Examples
    /// ```rust
    /// use async_rdma::{LocalMrReadAccess, LocalMrWriteAccess, Rdma, RdmaListener};
    /// use portpicker::pick_unused_port;
    /// use std::{
    ///     alloc::Layout,
    ///     io,
    ///     net::{Ipv4Addr, SocketAddrV4},
    ///     time::Duration,
    /// };
    ///
    /// static IMM_NUM: u32 = 123;
    /// struct Data(String);
    ///
    /// static MSG: &str = "hello world";
    ///
    /// async fn client(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma = Rdma::connect(addr, 1, 1, 512).await?;
    ///     let mut lmr = rdma.alloc_local_mr(Layout::new::<Data>())?;
    ///     let mut rmr = rdma.request_remote_mr(Layout::new::<Data>()).await?;
    ///     let data = Data(MSG.to_string());
    ///     unsafe { *(*lmr.as_mut_ptr() as *mut Data) = data };
    ///     // write the content of lmr to remote mr with immediate data.
    ///     rdma.write_with_imm(&lmr, &mut rmr, IMM_NUM).await?;
    ///     // then send the metadata of rmr to server to make server aware of this mr.
    ///     rdma.send_remote_mr(rmr).await?;
    ///     Ok(())
    /// }
    ///
    /// #[tokio::main]
    /// async fn server(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma_listener = RdmaListener::bind(addr).await?;
    ///     let rdma = rdma_listener.accept(1, 1, 512).await?;
    ///     // receive the immediate data sent by `write_with_imm`
    ///     let imm = rdma.receive_write_imm().await?;
    ///     assert_eq!(imm, IMM_NUM);
    ///     // receive the metadata of the lmr that had been requested by client
    ///     let lmr = rdma.receive_local_mr().await?;
    ///     // assert the content of lmr, which was `write` by client
    ///     unsafe { assert_eq!(MSG.to_string(), *(*(*lmr.as_ptr() as *const Data)).0) };
    ///     // wait for the agent thread to send all reponses to the remote.
    ///     tokio::time::sleep(Duration::from_secs(1)).await;
    ///     Ok(())
    /// }
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    ///     let server_handle = std::thread::spawn(move || server(addr));
    ///     tokio::time::sleep(Duration::from_secs(3)).await;
    ///     client(addr)
    ///         .await
    ///         .map_err(|err| println!("{}", err))
    ///         .unwrap();
    ///     server_handle.join().unwrap().unwrap();
    /// }
    /// ```
    #[inline]
    pub async fn receive_write_imm(&self) -> io::Result<u32> {
        self.agent
            .as_ref()
            .ok_or_else(|| io::Error::new(io::ErrorKind::Other, "Agent is not ready"))?
            .receive_imm()
            .await
    }

    /// Read content in the `rm` and store the content in the `lm`
    ///
    /// Application scenario such as: client put data into a local mr and `send_mr` to server.
    /// Server get a remote mr by `receive_remote_mr`, and then get data from this rmr by rdma `read`.
    ///
    /// Application scenario can be seen in `[/example/rpc.rs]`
    ///
    /// # Examples
    /// ```
    /// use async_rdma::{LocalMrReadAccess, LocalMrWriteAccess, Rdma, RdmaListener};
    /// use portpicker::pick_unused_port;
    /// use std::{
    ///     alloc::Layout,
    ///     io,
    ///     net::{Ipv4Addr, SocketAddrV4},
    ///     time::Duration,
    /// };
    ///
    /// struct Data(String);
    ///
    /// async fn client(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma = Rdma::connect(addr, 1, 1, 512).await?;
    ///     let mut lmr = rdma.alloc_local_mr(Layout::new::<Data>())?;
    ///     // put data into lmr
    ///     unsafe { *(*lmr.as_mut_ptr() as *mut Data) = Data("hello world".to_string()) };
    ///     // then send the metadata of this lmr to server to make server aware of this mr.
    ///     rdma.send_local_mr(lmr).await?;
    ///     Ok(())
    /// }
    ///
    /// #[tokio::main]
    /// async fn server(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma_listener = RdmaListener::bind(addr).await?;
    ///     let rdma = rdma_listener.accept(1, 1, 512).await?;
    ///     let mut lmr = rdma.alloc_local_mr(Layout::new::<Data>())?;
    ///     // receive the metadata of rmr sent by client
    ///     let rmr = rdma.receive_remote_mr().await?;
    ///     // `read` data from rmr to lmr
    ///     rdma.read(&mut lmr, &rmr).await?;
    ///     // assert the content of lmr, which was get from rmr by rdma `read`
    ///     unsafe {
    ///         assert_eq!(
    ///             "hello world".to_string(),
    ///             *(*(*lmr.as_ptr() as *const Data)).0
    ///         )
    ///     };
    ///     Ok(())
    /// }
    /// #[tokio::main]
    /// async fn main() {
    ///     let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    ///     std::thread::spawn(move || server(addr));
    ///     tokio::time::sleep(Duration::from_secs(3)).await;
    ///     client(addr)
    ///         .await
    ///         .map_err(|err| println!("{}", err))
    ///         .unwrap();
    /// }
    /// ```
    #[inline]
    pub async fn read<LW, RR>(&self, lm: &mut LW, rm: &RR) -> io::Result<()>
    where
        LW: LocalMrWriteAccess,
        RR: RemoteMrReadAccess,
    {
        self.qp.read(lm, rm).await
    }

    /// Write content in the `lm` to `rm`
    ///
    /// Application scenario such as: client request a remote mr through `request_remote_mr`,
    /// and then put data into this rmr by rdma `write`. After all client `send_mr` to make
    /// server aware of this mr.
    /// After client `send_mr`, server `receive_local_mr`, and then get data from this mr.
    ///
    /// Application scenario can be seen in `[/example/rpc.rs]`
    ///
    /// # Examples
    /// ```
    /// use async_rdma::{LocalMrReadAccess, LocalMrWriteAccess, Rdma, RdmaListener};
    /// use portpicker::pick_unused_port;
    /// use std::{
    ///     alloc::Layout,
    ///     io,
    ///     net::{Ipv4Addr, SocketAddrV4},
    ///     time::Duration,
    /// };
    ///
    /// struct Data(String);
    ///
    /// async fn client(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma = Rdma::connect(addr, 1, 1, 512).await?;
    ///     let mut lmr = rdma.alloc_local_mr(Layout::new::<Data>())?;
    ///     let mut rmr = rdma.request_remote_mr(Layout::new::<Data>()).await?;
    ///     // put data into lmr
    ///     unsafe { *(*lmr.as_mut_ptr() as *mut Data) = Data("hello world".to_string()) };
    ///     // write the content of local mr into remote mr
    ///     rdma.write(&lmr, &mut rmr).await?;
    ///     // then send the metadata of rmr to server to make server aware of this mr.
    ///     rdma.send_remote_mr(rmr).await?;
    ///     Ok(())
    /// }
    ///
    /// #[tokio::main]
    /// async fn server(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma_listener = RdmaListener::bind(addr).await?;
    ///     let rdma = rdma_listener.accept(1, 1, 512).await?;
    ///     // receive the metadata of the lmr that had been requested by client
    ///     let lmr = rdma.receive_local_mr().await?;
    ///     // assert the content of lmr, which was `write` by client
    ///     unsafe {
    ///         assert_eq!(
    ///             "hello world".to_string(),
    ///             *(*(*lmr.as_ptr() as *const Data)).0
    ///         )
    ///     };
    ///     Ok(())
    /// }
    /// #[tokio::main]
    /// async fn main() {
    ///     let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    ///     std::thread::spawn(move || server(addr));
    ///     tokio::time::sleep(Duration::from_secs(3)).await;
    ///     client(addr)
    ///         .await
    ///         .map_err(|err| println!("{}", err))
    ///         .unwrap();
    /// }
    /// ```
    #[inline]
    pub async fn write<LR, RW>(&self, lm: &LR, rm: &mut RW) -> io::Result<()>
    where
        LR: LocalMrReadAccess,
        RW: RemoteMrWriteAccess,
    {
        self.qp.write(lm, rm, None).await
    }

    /// Write content in the `lm` to `rm` and send a immediate data which
    /// will consume a `rdma receive work request` in the receiver's `receive queue`.
    /// The receiver can receive this immediate data by using `receive_write_imm`.
    ///
    /// Used with `receive_write_imm`.
    ///
    /// # Examples
    /// ```rust
    /// use async_rdma::{LocalMrReadAccess, LocalMrWriteAccess, Rdma, RdmaListener};
    /// use portpicker::pick_unused_port;
    /// use std::{
    ///     alloc::Layout,
    ///     io,
    ///     net::{Ipv4Addr, SocketAddrV4},
    ///     time::Duration,
    /// };
    ///
    /// static IMM_NUM: u32 = 123;
    /// struct Data(String);
    ///
    /// static MSG: &str = "hello world";
    ///
    /// async fn client(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma = Rdma::connect(addr, 1, 1, 512).await?;
    ///     let mut lmr = rdma.alloc_local_mr(Layout::new::<Data>())?;
    ///     let mut rmr = rdma.request_remote_mr(Layout::new::<Data>()).await?;
    ///     let data = Data(MSG.to_string());
    ///     unsafe { *(*lmr.as_mut_ptr() as *mut Data) = data };
    ///     // write the content of lmr to server with immediate data.
    ///     rdma.write_with_imm(&lmr, &mut rmr, IMM_NUM).await?;
    ///     // then send the metadata of rmr to server to make server aware of this mr.
    ///     rdma.send_remote_mr(rmr).await?;
    ///     Ok(())
    /// }
    ///
    /// #[tokio::main]
    /// async fn server(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma_listener = RdmaListener::bind(addr).await?;
    ///     let rdma = rdma_listener.accept(1, 1, 512).await?;
    ///     // receive the immediate data sent by `write_with_imm`
    ///     let imm = rdma.receive_write_imm().await?;
    ///     assert_eq!(imm, IMM_NUM);
    ///     // receive the metadata of the lmr that had been requested by client
    ///     let lmr = rdma.receive_local_mr().await?;
    ///     // assert the content of lmr, which was `write` by client
    ///     unsafe { assert_eq!(MSG.to_string(), *(*(*lmr.as_ptr() as *const Data)).0) };
    ///     // wait for the agent thread to send all reponses to the remote.
    ///     tokio::time::sleep(Duration::from_secs(1)).await;
    ///     Ok(())
    /// }
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    ///     let server_handle = std::thread::spawn(move || server(addr));
    ///     tokio::time::sleep(Duration::from_secs(3)).await;
    ///     client(addr)
    ///         .await
    ///         .map_err(|err| println!("{}", err))
    ///         .unwrap();
    ///     server_handle.join().unwrap().unwrap();
    /// }
    /// ```
    #[inline]
    pub async fn write_with_imm<LR, RW>(&self, lm: &LR, rm: &mut RW, imm: u32) -> io::Result<()>
    where
        LR: LocalMrReadAccess,
        RW: RemoteMrWriteAccess,
    {
        self.qp.write(lm, rm, Some(imm)).await
    }

    /// Connect the remote endpoint and build rmda queue pair by TCP connection
    ///
    /// `gid_index`: 0:ipv6, 1:ipv4
    /// `max_message_length`: max length of msg used in `send`&`receive`.
    ///
    /// # Examples
    /// ```
    /// use async_rdma::{Rdma, RdmaListener};
    /// use portpicker::pick_unused_port;
    /// use std::{
    ///     io,
    ///     net::{Ipv4Addr, SocketAddrV4},
    ///     time::Duration,
    /// };
    ///
    /// async fn client(addr: SocketAddrV4) -> io::Result<()> {
    ///     let _rdma = Rdma::connect(addr, 1, 1, 512).await?;
    ///     Ok(())
    /// }
    ///
    /// #[tokio::main]
    /// async fn server(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma_listener = RdmaListener::bind(addr).await?;
    ///     let _rdma = rdma_listener.accept(1, 1, 512).await?;
    ///     // run here after client connect
    ///     Ok(())
    /// }
    /// #[tokio::main]
    /// async fn main() {
    ///     let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    ///     std::thread::spawn(move || server(addr));
    ///     tokio::time::sleep(Duration::from_secs(3)).await;
    ///     client(addr)
    ///         .await
    ///         .map_err(|err| println!("{}", err))
    ///         .unwrap();
    /// }
    /// ```
    #[inline]
    pub async fn connect<A: ToSocketAddrs>(
        addr: A,
        port_num: u8,
        gid_index: usize,
        max_message_length: usize,
    ) -> io::Result<Self> {
        let mut rdma = RdmaBuilder::default()
            .set_port_num(port_num)
            .set_gid_index(gid_index)
            .build()?;
        assert!(
            rdma.conn_type == ConnectionType::RCSocket,
            "should set connection type to RCSocket"
        );
        let remote = tcp_connect_helper(addr, &rdma.endpoint()).await?;
        rdma.qp_handshake(remote)?;
        rdma.init_agent(max_message_length, *DEFAULT_ACCESS).await?;
        // wait for server to initialize
        tokio::time::sleep(Duration::from_secs(1)).await;
        Ok(rdma)
    }

    /// Establish connection with RDMA CM server
    ///
    /// Application scenario can be seen in `[/example/cm_client.rs]`
    ///
    /// # Examples
    ///
    /// ```
    /// use async_rdma::Rdma;
    /// use local_ip_address::local_ip;
    /// use portpicker::pick_unused_port;
    /// use rdma_sys::*;
    /// use std::{io, ptr::null_mut, time::Duration};
    ///
    /// static SERVER_NODE: &str = "0.0.0.0\0";
    ///
    /// async fn client(node: &str, service: &str) -> io::Result<()> {
    ///     let _rdma = Rdma::cm_connect(node, service, 1, 1, 0).await?;
    ///     Ok(())
    /// }
    ///
    /// #[tokio::main]
    /// async fn server(node: &str, service: &str) -> io::Result<()> {
    ///     let mut hints = unsafe { std::mem::zeroed::<rdma_addrinfo>() };
    ///     let mut res: *mut rdma_addrinfo = null_mut();
    ///     hints.ai_flags = RAI_PASSIVE.try_into().unwrap();
    ///     hints.ai_port_space = rdma_port_space::RDMA_PS_TCP.try_into().unwrap();
    ///     let mut ret = unsafe {
    ///         rdma_getaddrinfo(
    ///             node.as_ptr().cast(),
    ///             service.as_ptr().cast(),
    ///             &hints,
    ///             &mut res,
    ///         )
    ///     };
    ///     if ret != 0 {
    ///         println!("rdma_getaddrinfo");
    ///         return Err(io::Error::last_os_error());
    ///     }
    ///
    ///     let mut listen_id = null_mut();
    ///     let mut id = null_mut();
    ///     let mut init_attr = unsafe { std::mem::zeroed::<ibv_qp_init_attr>() };
    ///     init_attr.cap.max_send_wr = 1;
    ///     init_attr.cap.max_recv_wr = 1;
    ///     ret = unsafe { rdma_create_ep(&mut listen_id, res, null_mut(), &mut init_attr) };
    ///     if ret != 0 {
    ///         println!("rdma_create_ep");
    ///         return Err(io::Error::last_os_error());
    ///     }
    ///
    ///     ret = unsafe { rdma_listen(listen_id, 0) };
    ///     if ret != 0 {
    ///         println!("rdma_listen");
    ///         return Err(io::Error::last_os_error());
    ///     }
    ///
    ///     ret = unsafe { rdma_get_request(listen_id, &mut id) };
    ///     if ret != 0 {
    ///         println!("rdma_get_request");
    ///         return Err(io::Error::last_os_error());
    ///     }
    ///
    ///     ret = unsafe { rdma_accept(id, null_mut()) };
    ///     if ret != 0 {
    ///         println!("rdma_get_request");
    ///         return Err(io::Error::last_os_error());
    ///     }
    ///     Ok(())
    /// }
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let port = pick_unused_port().unwrap();
    ///     let server_service = port.to_string() + "\0";
    ///     let client_service = server_service.clone();
    ///     std::thread::spawn(move || server(SERVER_NODE, &server_service));
    ///     tokio::time::sleep(Duration::from_secs(1)).await;
    ///     let node = local_ip().unwrap().to_string() + "\0";
    ///     client(&node, &client_service)
    ///         .await
    ///         .map_err(|err| println!("{}", err))
    ///         .unwrap();
    /// }
    /// ```
    #[inline]
    #[cfg(feature = "cm")]
    pub async fn cm_connect(
        node: &str,
        service: &str,
        port_num: u8,
        gid_index: usize,
        max_message_length: usize,
    ) -> io::Result<Self> {
        let mut rdma = RdmaBuilder::default()
            .set_port_num(port_num)
            .set_gid_index(gid_index)
            .set_raw(true)
            .set_conn_type(ConnectionType::RCCM)
            .build()?;
        assert!(
            rdma.conn_type == ConnectionType::RCCM,
            "should set connection type to RCSocket"
        );
        cm_connect_helper(&mut rdma, node, service)?;
        rdma.init_agent(max_message_length, *DEFAULT_ACCESS).await?;
        // wait for server to initialize
        tokio::time::sleep(Duration::from_secs(1)).await;
        Ok(rdma)
    }

    /// Allocate a local memory region
    ///
    /// You can use local mr to `send`&`receive` or `read`&`write` with a remote mr.
    /// The parameter `layout` can be obtained by `Layout::new::<Data>()`.
    /// You can learn the way to write or read data in mr in the following example.
    ///
    /// # Examples
    /// ```
    /// use async_rdma::{LocalMrReadAccess, LocalMrWriteAccess, Rdma, RdmaListener};
    /// use portpicker::pick_unused_port;
    /// use std::{
    ///     alloc::Layout,
    ///     io,
    ///     net::{Ipv4Addr, SocketAddrV4},
    ///     time::Duration,
    /// };
    ///
    /// struct Data(String);
    ///
    /// async fn client(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma = Rdma::connect(addr, 1, 1, 512).await?;
    ///     let mut lmr = rdma.alloc_local_mr(Layout::new::<Data>())?;
    ///     // put data into lmr
    ///     unsafe { *(*lmr.as_mut_ptr() as *mut Data) = Data("hello world".to_string()) };
    ///     // send the content of lmr to server
    ///     rdma.send(&lmr).await?;
    ///     Ok(())
    /// }
    ///
    /// #[tokio::main]
    /// async fn server(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma_listener = RdmaListener::bind(addr).await?;
    ///     let rdma = rdma_listener.accept(1, 1, 512).await?;
    ///     // receive the data sent by client and put it into an mr
    ///     let lmr = rdma.receive().await?;
    ///     // assert data in the lmr
    ///     unsafe {
    ///         assert_eq!(
    ///             "hello world".to_string(),
    ///             *(*(*lmr.as_ptr() as *const Data)).0
    ///         )
    ///     };
    ///     Ok(())
    /// }
    /// #[tokio::main]
    /// async fn main() {
    ///     let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    ///     std::thread::spawn(move || server(addr));
    ///     tokio::time::sleep(Duration::from_secs(3)).await;
    ///     client(addr)
    ///         .await
    ///         .map_err(|err| println!("{}", err))
    ///         .unwrap();
    /// }
    /// ```
    #[inline]
    pub fn alloc_local_mr(&self, layout: Layout) -> io::Result<LocalMr> {
        self.allocator
            .alloc_zeroed_default_access(&layout, &self.pd)
    }

    /// Allocate a local memory region that has not been initialized
    ///
    /// You can use local mr to `send`&`receive` or `read`&`write` with a remote mr.
    /// The parameter `layout` can be obtained by `Layout::new::<Data>()`.
    /// You can learn the way to write or read data in mr in the following example.
    ///
    /// # Safety
    ///
    /// The newly allocated memory in this `LocalMr` is uninitialized.
    /// Initialize it before using to make it safe.
    ///
    /// # Examples
    /// ```
    /// use async_rdma::{LocalMrReadAccess, LocalMrWriteAccess, Rdma, RdmaListener};
    /// use portpicker::pick_unused_port;
    /// use std::{
    ///     alloc::Layout,
    ///     io,
    ///     net::{Ipv4Addr, SocketAddrV4},
    ///     time::Duration,
    /// };
    ///
    /// struct Data(String);
    ///
    /// async fn client(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma = Rdma::connect(addr, 1, 1, 512).await?;
    ///     let mut lmr = unsafe { rdma.alloc_local_mr_uninit(Layout::new::<Data>())? };
    ///     // put data into lmr
    ///     unsafe { *(*lmr.as_mut_ptr() as *mut Data) = Data("hello world".to_string()) };
    ///     // send the content of lmr to server
    ///     rdma.send(&lmr).await?;
    ///     Ok(())
    /// }
    ///
    /// #[tokio::main]
    /// async fn server(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma_listener = RdmaListener::bind(addr).await?;
    ///     let rdma = rdma_listener.accept(1, 1, 512).await?;
    ///     // receive the data sent by client and put it into an mr
    ///     let lmr = rdma.receive().await?;
    ///     // assert data in the lmr
    ///     unsafe {
    ///         assert_eq!(
    ///             "hello world".to_string(),
    ///             *(*(*lmr.as_ptr() as *const Data)).0
    ///         )
    ///     };
    ///     Ok(())
    /// }
    /// #[tokio::main]
    /// async fn main() {
    ///     let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    ///     std::thread::spawn(move || server(addr));
    ///     tokio::time::sleep(Duration::from_secs(3)).await;
    ///     client(addr)
    ///         .await
    ///         .map_err(|err| println!("{}", err))
    ///         .unwrap();
    /// }
    /// ```
    #[inline]
    pub unsafe fn alloc_local_mr_uninit(&self, layout: Layout) -> io::Result<LocalMr> {
        self.allocator.alloc_default_access(&layout, &self.pd)
    }

    /// Allocate a local memory region with specified access
    ///
    /// Use `alloc_local_mr` if you want to alloc memory region with default access.
    ///
    /// If you want more information, please check the documentation and examples
    /// of `alloc_local_mr`.
    ///
    /// # Example
    ///
    /// ```
    /// use async_rdma::{AccessFlag, MrAccess, RdmaBuilder};
    /// use std::alloc::Layout;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let rdma = RdmaBuilder::default().build().unwrap();
    ///     let layout = Layout::new::<[u8; 4096]>();
    ///     let access = AccessFlag::LocalWrite | AccessFlag::RemoteRead;
    ///     let mr = rdma.alloc_local_mr_with_access(layout, access).unwrap();
    ///     assert_eq!(mr.access(), access);
    /// }
    ///
    /// ```
    #[inline]
    pub fn alloc_local_mr_with_access(
        &self,
        layout: Layout,
        access: BitFlags<AccessFlag>,
    ) -> io::Result<LocalMr> {
        self.allocator
            .alloc_zeroed(&layout, flags_into_ibv_access(access), &self.pd)
    }

    /// Allocate a local memory region with specified access that has not been initialized
    ///
    /// Use `alloc_local_mr_uninit` if you want to alloc memory region with default access.
    ///
    /// If you want more information, please check the documentation and examples
    /// of `alloc_local_mr_uninit`.
    ///
    /// # Safety
    ///
    /// The newly allocated memory in this `LocalMr` is uninitialized.
    /// Initialize it before using to make it safe.
    ///
    /// # Example
    ///
    /// ```
    /// use async_rdma::{AccessFlag, MrAccess, RdmaBuilder};
    /// use std::alloc::Layout;
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let rdma = RdmaBuilder::default().build().unwrap();
    ///     let layout = Layout::new::<[u8; 4096]>();
    ///     let access = AccessFlag::LocalWrite | AccessFlag::RemoteRead;
    ///     let mr = unsafe {
    ///         rdma.alloc_local_mr_uninit_with_access(layout, access)
    ///             .unwrap()
    ///     };
    ///     assert_eq!(mr.access(), access);
    /// }
    ///
    /// ```
    #[inline]
    pub unsafe fn alloc_local_mr_uninit_with_access(
        &self,
        layout: Layout,
        access: BitFlags<AccessFlag>,
    ) -> io::Result<LocalMr> {
        self.allocator
            .alloc(&layout, flags_into_ibv_access(access), &self.pd)
    }

    /// Request a remote memory region with default timeout value.
    ///
    /// **Note**: The operation of this memory region will fail after timeout.
    ///
    /// Used with `send_mr`, `receive_local_mr`, `read` and `write`.
    /// Application scenario such as: client uses `request_remote_mr` to apply for
    /// a remote mr from server, and makes server aware of this mr by `send_mr` to server.
    /// For server, this mr is a local mr, which can be received through `receive_local_mr`.
    ///
    /// Application scenario can be seen in `[/example/rpc.rs]`
    ///
    /// # Examples
    /// ```
    /// use async_rdma::{Rdma, RdmaListener};
    /// use portpicker::pick_unused_port;
    /// use std::{
    ///     alloc::Layout,
    ///     io,
    ///     net::{Ipv4Addr, SocketAddrV4},
    ///     time::Duration,
    /// };
    ///
    /// struct Data(String);
    ///
    /// async fn client(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma = Rdma::connect(addr, 1, 1, 512).await?;
    ///     // request a mr located in server.
    ///     let rmr = rdma.request_remote_mr(Layout::new::<Data>()).await?;
    ///     // do something with rmr like `write` data into it.
    ///     // then send the metadata of rmr to server to make server aware of this mr.
    ///     rdma.send_remote_mr(rmr).await?;
    ///     Ok(())
    /// }
    ///
    /// #[tokio::main]
    /// async fn server(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma_listener = RdmaListener::bind(addr).await?;
    ///     let rdma = rdma_listener.accept(1, 1, 512).await?;
    ///     // receive the metadata of the lmr that had been requested by client
    ///     let _lmr = rdma.receive_local_mr().await?;
    ///     // do something with lmr like getting data from it.
    ///     Ok(())
    /// }
    /// #[tokio::main]
    /// async fn main() {
    ///     let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    ///     std::thread::spawn(move || server(addr));
    ///     tokio::time::sleep(Duration::from_secs(3)).await;
    ///     client(addr)
    ///         .await
    ///         .map_err(|err| println!("{}", err))
    ///         .unwrap();
    /// }
    /// ```
    #[inline]
    pub async fn request_remote_mr(&self, layout: Layout) -> io::Result<RemoteMr> {
        self.request_remote_mr_with_timeout(layout, DEFAULT_RMR_TIMEOUT)
            .await
    }

    /// Request a remote memory region with customized timeout value.
    /// The rest is consistent with `request_remote_mr`.
    ///
    /// **Note**: The operation of this memory region will fail after timeout.
    ///
    /// Used with `send_mr`, `receive_local_mr`, `read` and `write`.
    /// Application scenario such as: client uses `request_remote_mr` to apply for
    /// a remote mr from server, and makes server aware of this mr by `send_mr` to server.
    /// For server, this mr is a local mr, which can be received through `receive_local_mr`.
    ///
    /// # Examples
    /// ```
    /// use async_rdma::{Rdma, RdmaListener, RemoteMrReadAccess};
    /// use portpicker::pick_unused_port;
    /// use std::{
    ///     alloc::Layout,
    ///     io,
    ///     net::{Ipv4Addr, SocketAddrV4},
    ///     time::Duration,
    /// };
    ///
    /// struct Data(String);
    ///
    /// async fn client(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma = Rdma::connect(addr, 1, 1, 512).await?;
    ///     // request a mr located in server.
    ///     let rmr = rdma
    ///         .request_remote_mr_with_timeout(Layout::new::<Data>(), Duration::from_secs(10))
    ///         .await?;
    ///     assert!(!rmr.timeout_check());
    ///     // do something with rmr like `write` data into it.
    ///     // then send the metadata of rmr to server to make server aware of this mr.
    ///     rdma.send_remote_mr(rmr).await?;
    ///     Ok(())
    /// }
    ///
    /// #[tokio::main]
    /// async fn server(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma_listener = RdmaListener::bind(addr).await?;
    ///     let rdma = rdma_listener.accept(1, 1, 512).await?;
    ///     // receive the metadata of the lmr that had been requested by client
    ///     let _lmr = rdma.receive_local_mr().await?;
    ///     // do something with lmr like getting data from it.
    ///     Ok(())
    /// }
    /// #[tokio::main]
    /// async fn main() {
    ///     let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    ///     std::thread::spawn(move || server(addr));
    ///     tokio::time::sleep(Duration::from_secs(3)).await;
    ///     client(addr)
    ///         .await
    ///         .map_err(|err| println!("{}", err))
    ///         .unwrap();
    /// }
    /// ```
    #[inline]
    pub async fn request_remote_mr_with_timeout(
        &self,
        layout: Layout,
        timeout: Duration,
    ) -> io::Result<RemoteMr> {
        if let Some(ref agent) = self.agent {
            agent.request_remote_mr_with_timeout(layout, timeout).await
        } else {
            Err(io::Error::new(
                io::ErrorKind::Other,
                "Agent is not ready, please wait a while",
            ))
        }
    }

    /// Send a local memory region metadata to remote with default timeout value
    ///
    /// **Note**: The operation of this memory region will fail after timeout.
    ///
    /// Used with `receive_remote_mr`
    ///
    /// Application scenario such as: client uses `alloc_local_mr` to alloc a local mr, and
    /// makes server aware of this mr by `send_local_mr` to server.
    /// For server, this mr is a remote mr, which can be received through `receive_remote_mr`.
    ///
    /// Application scenario can be seen in `[/example/rpc.rs]`
    ///
    /// # Examples
    /// ```
    /// use async_rdma::{Rdma, RdmaListener};
    /// use portpicker::pick_unused_port;
    /// use std::{
    ///     alloc::Layout,
    ///     io,
    ///     net::{Ipv4Addr, SocketAddrV4},
    ///     time::Duration,
    /// };
    ///
    /// struct Data(String);
    ///
    /// async fn client(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma = Rdma::connect(addr, 1, 1, 512).await?;
    ///     // request a mr located in server.
    ///     let lmr = rdma.alloc_local_mr(Layout::new::<Data>())?;
    ///     // do something with rmr like `write` data into it.
    ///     // then send the metadata of this lmr to server to make server aware of this mr.
    ///     rdma.send_local_mr(lmr).await?;
    ///     Ok(())
    /// }
    ///
    /// #[tokio::main]
    /// async fn server(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma_listener = RdmaListener::bind(addr).await?;
    ///     let rdma = rdma_listener.accept(1, 1, 512).await?;
    ///     // receive the metadata of rmr sent by client
    ///     let _rmr = rdma.receive_remote_mr().await?;
    ///     // do something with lmr like getting data from it.
    ///     // wait for the agent thread to send all reponses to the remote.
    ///     tokio::time::sleep(Duration::from_secs(1)).await;
    ///     Ok(())
    /// }
    /// #[tokio::main]
    /// async fn main() {
    ///     let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    ///     std::thread::spawn(move || server(addr));
    ///     tokio::time::sleep(Duration::from_secs(3)).await;
    ///     client(addr)
    ///         .await
    ///         .map_err(|err| println!("{}", err))
    ///         .unwrap();
    /// }
    /// ```
    #[inline]
    pub async fn send_local_mr(&self, mr: LocalMr) -> io::Result<()> {
        self.send_local_mr_with_timeout(mr, DEFAULT_RMR_TIMEOUT)
            .await
    }

    /// Send a local memory region metadata with timeout to remote with customized timeout value.
    ///
    /// **Note**: The operation of this memory region will fail after timeout.
    ///
    /// Used with `receive_remote_mr`
    ///
    /// Application scenario such as: client uses `alloc_local_mr` to alloc a local mr, and
    /// makes server aware of this mr by `send_local_mr` to server.
    /// For server, this mr is a remote mr, which can be received through `receive_remote_mr`.
    ///
    /// # Examples
    /// ```
    /// use async_rdma::{Rdma, RdmaListener, RemoteMrReadAccess};
    /// use portpicker::pick_unused_port;
    /// use std::{
    ///     alloc::Layout,
    ///     io,
    ///     net::{Ipv4Addr, SocketAddrV4},
    ///     time::Duration,
    /// };
    ///
    /// struct Data(String);
    ///
    /// async fn client(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma = Rdma::connect(addr, 1, 1, 512).await?;
    ///     // request a mr located in server.
    ///     let lmr = rdma.alloc_local_mr(Layout::new::<Data>())?;
    ///     // do something with rmr like `write` data into it.
    ///     // then send the metadata of this lmr to server to make server aware of this mr.
    ///     rdma.send_local_mr_with_timeout(lmr, Duration::from_secs(1))
    ///         .await?;
    ///     Ok(())
    /// }
    ///
    /// #[tokio::main]
    /// async fn server(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma_listener = RdmaListener::bind(addr).await?;
    ///     let rdma = rdma_listener.accept(1, 1, 512).await?;
    ///     // receive the metadata of rmr sent by client
    ///     let rmr = rdma.receive_remote_mr().await?;
    ///     assert!(!rmr.timeout_check());
    ///     // do something with lmr like getting data from it.
    ///     // wait for the agent thread to send all reponses to the remote.
    ///     tokio::time::sleep(Duration::from_secs(1)).await;
    ///     Ok(())
    /// }
    /// #[tokio::main]
    /// async fn main() {
    ///     let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    ///     std::thread::spawn(move || server(addr));
    ///     tokio::time::sleep(Duration::from_secs(3)).await;
    ///     client(addr)
    ///         .await
    ///         .map_err(|err| println!("{}", err))
    ///         .unwrap();
    /// }
    /// ```
    #[inline]
    pub async fn send_local_mr_with_timeout(
        &self,
        mr: LocalMr,
        timeout: Duration,
    ) -> io::Result<()> {
        if let Some(ref agent) = self.agent {
            agent.send_local_mr_with_timeout(mr, timeout).await
        } else {
            Err(io::Error::new(
                io::ErrorKind::Other,
                "Agent is not ready, please wait a while",
            ))
        }
    }

    /// Send a remote memory region metadata to remote
    ///
    /// Used with `receive_local_mr`.
    ///
    /// Application scenario such as: client uses `request_remote_mr` to apply for
    /// a remote mr from server, and makes server aware of this mr by `send_remote_mr` to server.
    /// For server, this mr is a local mr, which can be received through `receive_local_mr`.
    ///
    /// Application scenario can be seen in `[/example/rpc.rs]`
    ///
    /// # Examples
    /// ```
    /// use async_rdma::{Rdma, RdmaListener};
    /// use portpicker::pick_unused_port;
    /// use std::{
    ///     alloc::Layout,
    ///     io,
    ///     net::{Ipv4Addr, SocketAddrV4},
    ///     time::Duration,
    /// };
    ///
    /// struct Data(String);
    ///
    /// async fn client(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma = Rdma::connect(addr, 1, 1, 512).await?;
    ///     // request a mr located in server.
    ///     let rmr = rdma.request_remote_mr(Layout::new::<Data>()).await?;
    ///     // do something with rmr like `write` data into it.
    ///     // then send the metadata of rmr to server to make server aware of this mr.
    ///     rdma.send_remote_mr(rmr).await?;
    ///     Ok(())
    /// }
    ///
    /// #[tokio::main]
    /// async fn server(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma_listener = RdmaListener::bind(addr).await?;
    ///     let rdma = rdma_listener.accept(1, 1, 512).await?;
    ///     // receive the metadata of the lmr that had been requested by client
    ///     let _lmr = rdma.receive_local_mr().await?;
    ///     // do something with lmr like getting data from it.
    ///     // wait for the agent thread to send all reponses to the remote.
    ///     tokio::time::sleep(Duration::from_secs(1)).await;
    ///     Ok(())
    /// }
    /// #[tokio::main]
    /// async fn main() {
    ///     let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    ///     std::thread::spawn(move || server(addr));
    ///     tokio::time::sleep(Duration::from_secs(3)).await;
    ///     client(addr)
    ///         .await
    ///         .map_err(|err| println!("{}", err))
    ///         .unwrap();
    /// }
    /// ```
    #[inline]
    pub async fn send_remote_mr(&self, mr: RemoteMr) -> io::Result<()> {
        if let Some(ref agent) = self.agent {
            agent.send_remote_mr(mr).await
        } else {
            Err(io::Error::new(
                io::ErrorKind::Other,
                "Agent is not ready, please wait a while",
            ))
        }
    }

    /// Receive a local memory region
    ///
    /// Used with `send_mr`.
    /// Application scenario such as: client uses `request_remote_mr` to apply for
    /// a remote mr from server, and makes server aware of this mr by `send_mr` to server.
    /// For server, this mr is a local mr, which can be received through `receive_local_mr`.
    ///
    /// Application scenario can be seen in `[/example/rpc.rs]`
    ///
    /// # Examples
    /// Application scenario such as: client request a remote mr through `request_remote_mr`,
    /// and then put data into this rmr by rdma `write`. After all client `send_mr` to make
    /// server aware of this mr.
    /// After client `send_mr`, server `receive_local_mr`, and then get data from this mr.
    ///
    /// Application scenario can be seen in `[/example/rpc.rs]`
    ///
    /// # Examples
    /// ```
    /// use async_rdma::{LocalMrReadAccess, LocalMrWriteAccess, Rdma, RdmaListener};
    /// use portpicker::pick_unused_port;
    /// use std::{
    ///     alloc::Layout,
    ///     io,
    ///     net::{Ipv4Addr, SocketAddrV4},
    ///     time::Duration,
    /// };
    ///
    /// struct Data(String);
    ///
    /// async fn client(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma = Rdma::connect(addr, 1, 1, 512).await?;
    ///     let mut lmr = rdma.alloc_local_mr(Layout::new::<Data>())?;
    ///     let mut rmr = rdma.request_remote_mr(Layout::new::<Data>()).await?;
    ///     // put data into lmr
    ///     unsafe { *(*lmr.as_mut_ptr() as *mut Data) = Data("hello world".to_string()) };
    ///     // write the content of local mr into remote mr
    ///     rdma.write(&lmr, &mut rmr).await?;
    ///     // then send the metadata of rmr to server to make server aware of this mr.
    ///     rdma.send_remote_mr(rmr).await?;
    ///     Ok(())
    /// }
    ///
    /// #[tokio::main]
    /// async fn server(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma_listener = RdmaListener::bind(addr).await?;
    ///     let rdma = rdma_listener.accept(1, 1, 512).await?;
    ///     // receive the metadata of the lmr that had been requested by client
    ///     let lmr = rdma.receive_local_mr().await?;
    ///     // assert the content of lmr, which was `write` by client
    ///     unsafe {
    ///         assert_eq!(
    ///         "hello world".to_string(),
    ///         *(*(*lmr.as_ptr() as *const Data)).0
    ///     )
    ///     };
    ///     // wait for the agent thread to send all reponses to the remote.
    ///     tokio::time::sleep(Duration::from_secs(1)).await;
    ///     Ok(())
    /// }
    /// #[tokio::main]
    /// async fn main() {
    ///     let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    ///     std::thread::spawn(move || server(addr));
    ///     tokio::time::sleep(Duration::from_secs(3)).await;
    ///     client(addr)
    ///         .await
    ///         .map_err(|err| println!("{}", err))
    ///         .unwrap();
    /// }
    /// ```
    #[inline]
    pub async fn receive_local_mr(&self) -> io::Result<LocalMr> {
        if let Some(ref agent) = self.agent {
            agent.receive_local_mr().await
        } else {
            Err(io::Error::new(
                io::ErrorKind::Other,
                "Agent is not ready, please wait a while",
            ))
        }
    }

    /// Receive a remote memory region
    ///
    /// Used with `send_mr`.
    /// Application scenario such as: server alloc a local mr and put data into it and let
    /// client know about this mr through `send_mr`. For client, this is a remote mr located
    /// in server.Client receive the metadata of this mr by `receive_remote_mr`.
    ///
    /// Application scenario can be seen in `[/example/rpc.rs]`
    ///
    /// # Examples
    /// ```
    /// use async_rdma::{Rdma, RdmaListener};
    /// use portpicker::pick_unused_port;
    /// use std::{
    ///     alloc::Layout,
    ///     io,
    ///     net::{Ipv4Addr, SocketAddrV4},
    ///     time::Duration,
    /// };
    ///
    /// struct Data(String);
    ///
    /// async fn client(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma = Rdma::connect(addr, 1, 1, 512).await?;
    ///     // receive the metadata of rmr sent by client
    ///     let _rmr = rdma.receive_remote_mr().await?;
    ///     // do something with rmr like `read` data from it.
    ///     Ok(())
    /// }
    ///
    /// #[tokio::main]
    /// async fn server(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma_listener = RdmaListener::bind(addr).await?;
    ///     let rdma = rdma_listener.accept(1, 1, 512).await?;
    ///     let lmr = rdma.alloc_local_mr(Layout::new::<Data>())?;
    ///     // do something with lmr like put data into it.
    ///     // then send the metadata of this lmr to server to make server aware of this mr.
    ///     rdma.send_local_mr(lmr).await?;
    ///     Ok(())
    /// }
    /// #[tokio::main]
    /// async fn main() {
    ///     let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    ///     std::thread::spawn(move || server(addr));
    ///     tokio::time::sleep(Duration::from_secs(3)).await;
    ///     client(addr)
    ///         .await
    ///         .map_err(|err| println!("{}", err))
    ///         .unwrap();
    /// }
    /// ```
    #[inline]
    pub async fn receive_remote_mr(&self) -> io::Result<RemoteMr> {
        if let Some(ref agent) = self.agent {
            agent.receive_remote_mr().await
        } else {
            Err(io::Error::new(
                io::ErrorKind::Other,
                "Agent is not ready, please wait a while",
            ))
        }
    }

    /// Set qp access for new `Rdma` that created by `clone`
    ///
    /// Used with `listen`, `new_connect`
    ///
    /// # Examples
    ///
    /// ```
    /// use async_rdma::{AccessFlag, RdmaBuilder};
    /// use portpicker::pick_unused_port;
    /// use std::{
    ///     io,
    ///     net::{Ipv4Addr, SocketAddrV4},
    ///     time::Duration,
    /// };
    ///
    /// async fn client(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma = RdmaBuilder::default().connect(addr).await?;
    ///     let access = AccessFlag::LocalWrite | AccessFlag::RemoteRead;
    ///     let rdma = rdma.set_new_qp_access(access);
    ///     let _new_rdma = rdma.new_connect(addr).await?;
    ///     Ok(())
    /// }
    ///
    /// #[tokio::main]
    /// async fn server(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma = RdmaBuilder::default().listen(addr).await?;
    ///         let _new_rdma = rdma.listen().await?;
    ///     Ok(())
    /// }
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    ///     std::thread::spawn(move || server(addr));
    ///     tokio::time::sleep(Duration::from_secs(3)).await;
    ///     client(addr)
    ///         .await
    ///         .map_err(|err| println!("{}", err))
    ///         .unwrap();
    /// }
    /// ```
    #[inline]
    #[must_use]
    pub fn set_new_qp_access(mut self, qp_access: BitFlags<AccessFlag>) -> Self {
        self.clone_attr = self
            .clone_attr
            .set_qp_access(flags_into_ibv_access(qp_access));
        self
    }

    /// Set max access permission for remote mr requests for new `Rdma` that created by `clone`
    ///
    /// Used with `listen`, `new_connect`
    ///
    /// # Examples
    ///
    /// ```
    /// use async_rdma::{AccessFlag, RdmaBuilder, MrAccess};
    /// use portpicker::pick_unused_port;
    /// use std::{
    ///     alloc::Layout,
    ///     io,
    ///     net::{Ipv4Addr, SocketAddrV4},
    ///     time::Duration,
    /// };
    ///
    /// async fn client(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma = RdmaBuilder::default().connect(addr).await?;
    ///     let rmr = rdma.request_remote_mr(Layout::new::<char>()).await?;
    ///     let new_rdma = rdma.new_connect(addr).await?;
    ///     let new_rmr = new_rdma.request_remote_mr(Layout::new::<char>()).await?;
    ///     let access = AccessFlag::LocalWrite | AccessFlag::RemoteRead;
    ///     assert_eq!(new_rmr.access(), access);
    ///     assert_ne!(rmr.access(), new_rmr.access());
    ///     new_rdma.send_remote_mr(new_rmr).await?;
    ///     Ok(())
    /// }
    ///
    /// #[tokio::main]
    /// async fn server(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma = RdmaBuilder::default().listen(addr).await?;
    ///     let access = AccessFlag::LocalWrite | AccessFlag::RemoteRead;
    ///     let rdma = rdma.set_new_max_rmr_access(access);
    ///     let new_rdma = rdma.listen().await?;
    ///     // receive the metadata of the lmr that had been requested by client
    ///     let _lmr = new_rdma.receive_local_mr().await?;
    ///     // wait for the agent thread to send all reponses to the remote.
    ///     tokio::time::sleep(Duration::from_secs(1)).await;
    ///     Ok(())
    /// }
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    ///     std::thread::spawn(move || server(addr));
    ///     tokio::time::sleep(Duration::from_secs(3)).await;
    ///     client(addr)
    ///         .await
    ///         .map_err(|err| println!("{}", err))
    ///         .unwrap();
    /// }
    /// ```
    #[inline]
    #[must_use]
    pub fn set_new_max_rmr_access(mut self, max_rmr_access: BitFlags<AccessFlag>) -> Self {
        self.clone_attr = self
            .clone_attr
            .set_max_rmr_access(flags_into_ibv_access(max_rmr_access));
        self
    }

    /// Set qp access for new `Rdma` that created by `clone`
    ///
    /// Used with `listen`, `new_connect`
    ///
    /// # Examples
    ///
    /// ```
    /// use async_rdma::RdmaBuilder;
    /// use portpicker::pick_unused_port;
    /// use std::{
    ///     io,
    ///     net::{Ipv4Addr, SocketAddrV4},
    ///     time::Duration,
    /// };
    ///
    /// async fn client(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma = RdmaBuilder::default().connect(addr).await?;
    ///     let rdma = rdma.set_new_port_num(1_u8);
    ///     let _new_rdma = rdma.new_connect(addr).await?;
    ///     Ok(())
    /// }
    ///
    /// #[tokio::main]
    /// async fn server(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma = RdmaBuilder::default().listen(addr).await?;
    ///     let _new_rdma = rdma.listen().await?;
    ///     Ok(())
    /// }
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    ///     std::thread::spawn(move || server(addr));
    ///     tokio::time::sleep(Duration::from_secs(3)).await;
    ///     client(addr)
    ///         .await
    ///         .map_err(|err| println!("{}", err))
    ///         .unwrap();
    /// }
    /// ```
    #[inline]
    #[must_use]
    pub fn set_new_port_num(mut self, port_num: u8) -> Self {
        self.clone_attr = self.clone_attr.set_port_num(port_num);
        self
    }

    /// Set new `ProtectionDomain` for new `Rdma` that created by `clone` to provide isolation.
    ///
    /// Used with `listen`, `new_connect`
    ///
    /// # Examples
    ///
    /// ```
    /// use async_rdma::RdmaBuilder;
    /// use portpicker::pick_unused_port;
    /// use std::{
    ///     io,
    ///     net::{Ipv4Addr, SocketAddrV4},
    ///     time::Duration,
    /// };
    ///
    /// async fn client(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma = RdmaBuilder::default().connect(addr).await?;
    ///     let rdma = rdma.set_new_pd()?;
    ///     // then the `Rdma`s created by `new_connect` will have a new `ProtectionDomain`
    ///     let _new_rdma = rdma.new_connect(addr).await?;
    ///     Ok(())
    /// }
    ///
    /// #[tokio::main]
    /// async fn server(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma = RdmaBuilder::default().listen(addr).await?;
    ///     let _new_rdma = rdma.listen().await?;
    ///     Ok(())
    /// }
    ///
    /// #[tokio::main]
    /// async fn main() {
    ///     let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    ///     std::thread::spawn(move || server(addr));
    ///     tokio::time::sleep(Duration::from_secs(3)).await;
    ///     client(addr)
    ///         .await
    ///         .map_err(|err| println!("{}", err))
    ///         .unwrap();
    /// }
    /// ```
    #[inline]
    pub fn set_new_pd(mut self) -> io::Result<Self> {
        let new_pd = self.ctx.create_protection_domain()?;
        self.clone_attr = self.clone_attr.set_pd(new_pd);
        Ok(self)
    }
}

/// Rdma Listener is the wrapper of a `TcpListener`, which is used to
/// build the rdma queue pair.
#[derive(Debug)]
pub struct RdmaListener {
    /// Tcp listener to establish the queue pair
    tcp_listener: TcpListener,
}

impl RdmaListener {
    /// Bind the address and wait for a connection
    /// # Examples
    /// ```
    /// use async_rdma::{Rdma, RdmaListener};
    /// use portpicker::pick_unused_port;
    /// use std::{
    ///     io,
    ///     net::{Ipv4Addr, SocketAddrV4},
    ///     time::Duration,
    /// };
    ///
    /// async fn client(addr: SocketAddrV4) -> io::Result<()> {
    ///     let _rdma = Rdma::connect(addr, 1, 1, 512).await?;
    ///     Ok(())
    /// }
    ///
    /// #[tokio::main]
    /// async fn server(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma_listener = RdmaListener::bind(addr).await?;
    ///     let _rdma = rdma_listener.accept(1, 1, 512).await?;
    ///     // run here after client connect
    ///     Ok(())
    /// }
    /// #[tokio::main]
    /// async fn main() {
    ///     let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    ///     std::thread::spawn(move || server(addr));
    ///     tokio::time::sleep(Duration::from_secs(3)).await;
    ///     client(addr)
    ///         .await
    ///         .map_err(|err| println!("{}", err))
    ///         .unwrap();
    /// }
    /// ```
    #[inline]
    pub async fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<Self> {
        let tcp_listener = TcpListener::bind(addr).await?;
        Ok(Self { tcp_listener })
    }

    /// Wait for a connection from a remote host
    /// # Examples
    /// ```
    /// use async_rdma::{Rdma, RdmaListener};
    /// use portpicker::pick_unused_port;
    /// use std::{
    ///     io,
    ///     net::{Ipv4Addr, SocketAddrV4},
    ///     time::Duration,
    /// };
    ///
    /// async fn client(addr: SocketAddrV4) -> io::Result<()> {
    ///     let _rdma = Rdma::connect(addr, 1, 1, 512).await?;
    ///     Ok(())
    /// }
    ///
    /// #[tokio::main]
    /// async fn server(addr: SocketAddrV4) -> io::Result<()> {
    ///     let rdma_listener = RdmaListener::bind(addr).await?;
    ///     let _rdma = rdma_listener.accept(1, 1, 512).await?;
    ///     // run here after client connect
    ///     Ok(())
    /// }
    /// #[tokio::main]
    /// async fn main() {
    ///     let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    ///     std::thread::spawn(move || server(addr));
    ///     tokio::time::sleep(Duration::from_secs(3)).await;
    ///     client(addr)
    ///         .await
    ///         .map_err(|err| println!("{}", err))
    ///         .unwrap();
    /// }
    /// ```
    #[inline]
    pub async fn accept(
        &self,
        port_num: u8,
        gid_index: usize,
        max_message_length: usize,
    ) -> io::Result<Rdma> {
        let (mut stream, _) = self.tcp_listener.accept().await?;
        let mut rdma = RdmaBuilder::default()
            .set_port_num(port_num)
            .set_gid_index(gid_index)
            .build()?;
        assert!(
            rdma.conn_type == ConnectionType::RCSocket,
            "should set connection type to RCSocket"
        );
        let endpoint_size = bincode::serialized_size(&rdma.endpoint()).map_err(|e| {
            io::Error::new(
                io::ErrorKind::Other,
                format!("Endpoint serialization failed, {:?}", e),
            )
        })?;
        let mut remote = vec![0_u8; endpoint_size.cast()];
        // the byte number is not important, as read_exact will fill the buffer
        let _ = stream.read_exact(remote.as_mut()).await?;
        let remote: QueuePairEndpoint = bincode::deserialize(&remote).map_err(|e| {
            io::Error::new(
                io::ErrorKind::Other,
                format!("failed to deserialize remote endpoint, {:?}", e),
            )
        })?;
        let local = bincode::serialize(&rdma.endpoint()).map_err(|e| {
            io::Error::new(
                io::ErrorKind::Other,
                format!("failed to deserialize remote endpoint, {:?}", e),
            )
        })?;
        stream.write_all(&local).await?;
        rdma.qp_handshake(remote)?;
        debug!("handshake done");
        rdma.init_agent(max_message_length, *DEFAULT_ACCESS).await?;
        Ok(rdma)
    }
}