Struct async_rdma::Rdma

source ·
pub struct Rdma { /* private fields */ }
Expand description

RDMA handler: the only interface that users need to deal with in order to use RDMA.

Implementations§

source§

impl Rdma

source

pub async fn ibv_connect(&mut self, remote: QueuePairEndpoint) -> Result<()>

Connect to remote end by raw ibv information.

You can get the destination qp information in any way and use this interface to establish connection.

Examples
use async_rdma::{
    ConnectionType, LocalMrReadAccess, LocalMrWriteAccess, QueuePairEndpoint, Rdma, RdmaBuilder,
};
use std::{
    alloc::Layout,
    io::{self, Write},
    time::Duration,
};

async fn client(mut client_rdma: Rdma, server_info: QueuePairEndpoint) -> io::Result<()> {
    client_rdma.ibv_connect(server_info).await?;
    // alloc 8 bytes local memory
    let mut lmr = client_rdma.alloc_local_mr(Layout::new::<[u8; 8]>())?;
    // write data into lmr
    let _num = lmr.as_mut_slice().write(&[1_u8; 8])?;
    // send data in mr to the remote end
    client_rdma.send(&lmr).await?;
    Ok(())
}

#[tokio::main]
async fn server(mut server_rdma: Rdma, client_info: QueuePairEndpoint) -> io::Result<()> {
    server_rdma.ibv_connect(client_info).await?;
    // receive data
    let lmr = server_rdma.receive().await?;
    let data = *lmr.as_slice();
    assert_eq!(data, [1_u8; 8]);
    Ok(())
}

#[tokio::main]
async fn main() {
    let server_rdma = RdmaBuilder::default()
        .set_conn_type(ConnectionType::RCIBV)
        .build()
        .unwrap();
    let server_info = server_rdma.get_qp_endpoint();
    let client_rdma = RdmaBuilder::default()
        .set_conn_type(ConnectionType::RCIBV)
        .build()
        .unwrap();
    let client_info = client_rdma.get_qp_endpoint();
    std::thread::spawn(move || server(server_rdma, client_info));
    tokio::time::sleep(Duration::from_secs(3)).await;
    client(client_rdma, server_info)
        .await
        .map_err(|err| println!("{}", err))
        .unwrap();
}
source

pub async fn listen(&mut self) -> Result<Self>

Listen for new connections using the same mr_allocator and event_listener as parent Rdma

Used with connect and new_connect

Examples
use async_rdma::RdmaBuilder;
use portpicker::pick_unused_port;
use std::{
    io,
    net::{Ipv4Addr, SocketAddrV4},
    time::Duration,
};

async fn client(addr: SocketAddrV4) -> io::Result<()> {
    let mut rdma = RdmaBuilder::default().connect(addr).await?;
    for _ in 0..3 {
        let _new_rdma = rdma.new_connect(addr).await?;
    }
    Ok(())
}

#[tokio::main]
async fn server(addr: SocketAddrV4) -> io::Result<()> {
    let mut rdma = RdmaBuilder::default().listen(addr).await?;
    for _ in 0..3 {
        let _new_rdma = rdma.listen().await?;
    }
    Ok(())
}
#[tokio::main]
async fn main() {
    let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    std::thread::spawn(move || server(addr));
    tokio::time::sleep(Duration::from_secs(3)).await;
    client(addr)
        .await
        .map_err(|err| println!("{}", err))
        .unwrap();
}
source

pub async fn new_connect<A: ToSocketAddrs>(&mut self, addr: A) -> Result<Self>

Establish new connections with RDMA server using the same mr_allocator and event_listener as parent Rdma

Used with listen

Examples
use async_rdma::RdmaBuilder;
use portpicker::pick_unused_port;
use std::{
    io,
    net::{Ipv4Addr, SocketAddrV4},
    time::Duration,
};

async fn client(addr: SocketAddrV4) -> io::Result<()> {
    let mut rdma = RdmaBuilder::default().connect(addr).await?;
    for _ in 0..3 {
        let _new_rdma = rdma.new_connect(addr).await?;
    }
    Ok(())
}

#[tokio::main]
async fn server(addr: SocketAddrV4) -> io::Result<()> {
    let mut rdma = RdmaBuilder::default().listen(addr).await?;
    for _ in 0..3 {
        let _new_rdma = rdma.listen().await?;
    }
    Ok(())
}
#[tokio::main]
async fn main() {
    let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    std::thread::spawn(move || server(addr));
    tokio::time::sleep(Duration::from_secs(3)).await;
    client(addr)
        .await
        .map_err(|err| println!("{}", err))
        .unwrap();
}
source

pub async fn send(&self, lm: &LocalMr) -> Result<()>

Send the content in the lm

Used with receive. Example scenario: the client puts data into a local mr and sends it to the server; the server receives the mr sent by the client and processes the data in it.

Application scenario can be seen in [/example/rpc.rs]

Examples
use async_rdma::{LocalMrReadAccess, LocalMrWriteAccess, Rdma, RdmaListener};
use portpicker::pick_unused_port;
use std::{
    alloc::Layout,
    io,
    net::{Ipv4Addr, SocketAddrV4},
    time::Duration,
};

struct Data(String);

async fn client(addr: SocketAddrV4) -> io::Result<()> {
    let rdma = Rdma::connect(addr, 1, 1, 512).await?;
    let mut lmr = rdma.alloc_local_mr(Layout::new::<Data>())?;
    // put data into lmr
    unsafe { *(*lmr.as_mut_ptr() as *mut Data) = Data("hello world".to_string()) };
    // send the content of lmr to server
    rdma.send(&lmr).await?;
    Ok(())
}

#[tokio::main]
async fn server(addr: SocketAddrV4) -> io::Result<()> {
    let rdma_listener = RdmaListener::bind(addr).await?;
    let rdma = rdma_listener.accept(1, 1, 512).await?;
    // receive the data sent by client and put it into an mr
    let lmr = rdma.receive().await?;
    // read data from mr
    unsafe {
        assert_eq!(
            "hello world".to_string(),
            *(*(*lmr.as_ptr() as *const Data)).0
        )
    };
    // wait for the agent thread to send all responses to the remote.
    tokio::time::sleep(Duration::from_secs(1)).await;
    Ok(())
}
#[tokio::main]
async fn main() {
    let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    std::thread::spawn(move || server(addr));
    tokio::time::sleep(Duration::from_secs(3)).await;
    client(addr)
        .await
        .map_err(|err| println!("{}", err))
        .unwrap();
}
source

pub async fn atomic_cas<RW>(
    &self,
    old_value: u64,
    new_value: u64,
    rm: &mut RW
) -> Result<()>where
    RW: RemoteMrWriteAccess,

Atomically read a 64-bit value from a remote mr and compare it with old_value; if they are equal, new_value is written to the remote mr. The whole read-compare-write sequence is performed atomically.

Examples
use async_rdma::{LocalMrReadAccess, RdmaBuilder};
use portpicker::pick_unused_port;
use std::{
    alloc::Layout,
    io,
    net::{Ipv4Addr, SocketAddrV4},
    time::Duration,
};

async fn client(addr: SocketAddrV4) -> io::Result<()> {
    let rdma = RdmaBuilder::default().connect(addr).await?;
    // alloc 8 bytes remote memory
    let mut rmr = rdma.request_remote_mr(Layout::new::<[u8; 8]>()).await?;
    let new_value = u64::from_le_bytes([1_u8; 8]);
    // read, compare with rmr and swap `old_value` with `new_value`
    rdma.atomic_cas(0, new_value, &mut rmr).await?;
    // send rmr's meta data to the remote end
    rdma.send_remote_mr(rmr).await?;
    Ok(())
}

#[tokio::main]
async fn server(addr: SocketAddrV4) -> io::Result<()> {
    let rdma = RdmaBuilder::default().listen(addr).await?;
    // receive mr's meta data from client
    let lmr = rdma.receive_local_mr().await?;
    // assert the content of lmr, which was write by cas
    let data = *lmr.as_slice();
    assert_eq!(data, [1_u8; 8]);
    Ok(())
}

#[tokio::main]
async fn main() {
    let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    std::thread::spawn(move || server(addr));
    tokio::time::sleep(Duration::from_secs(3)).await;
    client(addr)
        .await
        .map_err(|err| println!("{}", err))
        .unwrap();
}
source

pub async fn send_with_imm(&self, lm: &LocalMr, imm: u32) -> Result<()>

Send the content in the lm with immediate data.

Used with receive_with_imm.

Examples
use async_rdma::{LocalMrReadAccess, LocalMrWriteAccess, Rdma, RdmaListener};
use portpicker::pick_unused_port;
use std::{
    alloc::Layout,
    io,
    net::{Ipv4Addr, SocketAddrV4},
    time::Duration,
};

struct Data(String);
static IMM_NUM: u32 = 123;
static MSG: &str = "hello world";

async fn client(addr: SocketAddrV4) -> io::Result<()> {
    let rdma = Rdma::connect(addr, 1, 1, 512).await?;
    let mut lmr = rdma.alloc_local_mr(Layout::new::<Data>())?;
    // put data into lmr
    unsafe { std::ptr::write(*lmr.as_mut_ptr() as *mut Data, Data(MSG.to_string())) };
    // send the content of lmr and imm data to server
    rdma.send_with_imm(&lmr, IMM_NUM).await?;
    rdma.send_with_imm(&lmr, IMM_NUM).await?;
    rdma.send(&lmr).await?;
    rdma.send(&lmr).await?;
    Ok(())
}

#[tokio::main]
async fn server(addr: SocketAddrV4) -> io::Result<()> {
    let rdma_listener = RdmaListener::bind(addr).await?;
    let rdma = rdma_listener.accept(1, 1, 512).await?;
    // receive the data and imm sent by the client
    let (lmr, imm) = rdma.receive_with_imm().await?;
    assert_eq!(imm, Some(IMM_NUM));
    unsafe { assert_eq!(MSG.to_string(), *(*(*lmr.as_ptr() as *const Data)).0) };
    // receive the data in mr while avoiding the immediate data is ok.
    let lmr = rdma.receive().await?;
    unsafe { assert_eq!(MSG.to_string(), *(*(*lmr.as_ptr() as *const Data)).0) };
    // `receive_with_imm` works well even if the client didn't send any immediate data.
    // the imm received will be a `None`.
    let (lmr, imm) = rdma.receive_with_imm().await?;
    assert_eq!(imm, None);
    unsafe { assert_eq!(MSG.to_string(), *(*(*lmr.as_ptr() as *const Data)).0) };
    // compared to the above, using `receive` is a better choice.
    let lmr = rdma.receive().await?;
    unsafe { assert_eq!(MSG.to_string(), *(*(*lmr.as_ptr() as *const Data)).0) };
    // wait for the agent thread to send all responses to the remote.
    tokio::time::sleep(Duration::from_secs(1)).await;
    Ok(())
}
#[tokio::main]
async fn main() {
    let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    let server_handle = std::thread::spawn(move || server(addr));
    tokio::time::sleep(Duration::from_secs(3)).await;
    client(addr)
        .await
        .map_err(|err| println!("{}", err))
        .unwrap();
    server_handle.join().unwrap().unwrap();
}
source

pub async fn receive(&self) -> Result<LocalMr>

Receive the content and store it in the returned memory region

Used with send. Example scenario: the client puts data into a local mr and sends it to the server; the server receives the mr sent by the client and processes the data in it.

Application scenario can be seen in [/example/rpc.rs]

Examples
use async_rdma::{LocalMrReadAccess, LocalMrWriteAccess, Rdma, RdmaListener};
use portpicker::pick_unused_port;
use std::{
    alloc::Layout,
    io,
    net::{Ipv4Addr, SocketAddrV4},
    time::Duration,
};

struct Data(String);

async fn client(addr: SocketAddrV4) -> io::Result<()> {
    let rdma = Rdma::connect(addr, 1, 1, 512).await?;
    let mut lmr = rdma.alloc_local_mr(Layout::new::<Data>())?;
    // put data into lmr
    unsafe { *(*lmr.as_mut_ptr() as *mut Data) = Data("hello world".to_string()) };
    // send the content of lmr to server
    rdma.send(&lmr).await?;
    Ok(())
}

#[tokio::main]
async fn server(addr: SocketAddrV4) -> io::Result<()> {
    let rdma_listener = RdmaListener::bind(addr).await?;
    let rdma = rdma_listener.accept(1, 1, 512).await?;
    // receive the data sent by client and put it into an mr
    let lmr = rdma.receive().await?;
    // read data from mr
    unsafe {
        assert_eq!(
            "hello world".to_string(),
            *(*(*lmr.as_ptr() as *const Data)).0
        )
    };
    // wait for the agent thread to send all responses to the remote.
    tokio::time::sleep(Duration::from_secs(1)).await;
    Ok(())
}
#[tokio::main]
async fn main() {
    let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    std::thread::spawn(move || server(addr));
    tokio::time::sleep(Duration::from_secs(3)).await;
    client(addr)
        .await
        .map_err(|err| println!("{}", err))
        .unwrap();
}
source

pub async fn receive_with_imm(&self) -> Result<(LocalMr, Option<u32>)>

Receive the content and store it in the returned memory region, together with the immediate data if the sender provided any.

Used with send_with_imm.

Examples
use async_rdma::{LocalMrReadAccess, LocalMrWriteAccess, Rdma, RdmaListener};
use portpicker::pick_unused_port;
use std::{
    alloc::Layout,
    io,
    net::{Ipv4Addr, SocketAddrV4},
    time::Duration,
};

struct Data(String);
static IMM_NUM: u32 = 123;
static MSG: &str = "hello world";

async fn client(addr: SocketAddrV4) -> io::Result<()> {
    let rdma = Rdma::connect(addr, 1, 1, 512).await?;
    let mut lmr = rdma.alloc_local_mr(Layout::new::<Data>())?;
    // put data into lmr
    unsafe { std::ptr::write(*lmr.as_mut_ptr() as *mut Data, Data(MSG.to_string())) };
    // send the content of lmr and imm data to server
    rdma.send_with_imm(&lmr, IMM_NUM).await?;
    rdma.send_with_imm(&lmr, IMM_NUM).await?;
    rdma.send(&lmr).await?;
    rdma.send(&lmr).await?;
    Ok(())
}

#[tokio::main]
async fn server(addr: SocketAddrV4) -> io::Result<()> {
    let rdma_listener = RdmaListener::bind(addr).await?;
    let rdma = rdma_listener.accept(1, 1, 512).await?;
    // receive the data and imm sent by the client
    let (lmr, imm) = rdma.receive_with_imm().await?;
    assert_eq!(imm, Some(IMM_NUM));
    unsafe { assert_eq!(MSG.to_string(), *(*(*lmr.as_ptr() as *const Data)).0) };
    // receive the data in mr while avoiding the immediate data is ok.
    let lmr = rdma.receive().await?;
    unsafe { assert_eq!(MSG.to_string(), *(*(*lmr.as_ptr() as *const Data)).0) };
    // `receive_with_imm` works well even if the client didn't send any immediate data.
    // the imm received will be a `None`.
    let (lmr, imm) = rdma.receive_with_imm().await?;
    assert_eq!(imm, None);
    unsafe { assert_eq!(MSG.to_string(), *(*(*lmr.as_ptr() as *const Data)).0) };
    // compared to the above, using `receive` is a better choice.
    let lmr = rdma.receive().await?;
    unsafe { assert_eq!(MSG.to_string(), *(*(*lmr.as_ptr() as *const Data)).0) };
    // wait for the agent thread to send all responses to the remote.
    tokio::time::sleep(Duration::from_secs(1)).await;
    Ok(())
}
#[tokio::main]
async fn main() {
    let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    let server_handle = std::thread::spawn(move || server(addr));
    tokio::time::sleep(Duration::from_secs(3)).await;
    client(addr)
        .await
        .map_err(|err| println!("{}", err))
        .unwrap();
    server_handle.join().unwrap().unwrap();
}
source

pub async fn receive_write_imm(&self) -> Result<u32>

Receive the immediate data sent by write_with_imm.

Used with write_with_imm.

Examples
use async_rdma::{LocalMrReadAccess, LocalMrWriteAccess, Rdma, RdmaListener};
use portpicker::pick_unused_port;
use std::{
    alloc::Layout,
    io,
    net::{Ipv4Addr, SocketAddrV4},
    time::Duration,
};

static IMM_NUM: u32 = 123;
struct Data(String);

static MSG: &str = "hello world";

async fn client(addr: SocketAddrV4) -> io::Result<()> {
    let rdma = Rdma::connect(addr, 1, 1, 512).await?;
    let mut lmr = rdma.alloc_local_mr(Layout::new::<Data>())?;
    let mut rmr = rdma.request_remote_mr(Layout::new::<Data>()).await?;
    let data = Data(MSG.to_string());
    unsafe { *(*lmr.as_mut_ptr() as *mut Data) = data };
    // write the content of lmr to remote mr with immediate data.
    rdma.write_with_imm(&lmr, &mut rmr, IMM_NUM).await?;
    // then send the metadata of rmr to server to make server aware of this mr.
    rdma.send_remote_mr(rmr).await?;
    Ok(())
}

#[tokio::main]
async fn server(addr: SocketAddrV4) -> io::Result<()> {
    let rdma_listener = RdmaListener::bind(addr).await?;
    let rdma = rdma_listener.accept(1, 1, 512).await?;
    // receive the immediate data sent by `write_with_imm`
    let imm = rdma.receive_write_imm().await?;
    assert_eq!(imm, IMM_NUM);
    // receive the metadata of the lmr that had been requested by client
    let lmr = rdma.receive_local_mr().await?;
    // assert the content of lmr, which was `write` by client
    unsafe { assert_eq!(MSG.to_string(), *(*(*lmr.as_ptr() as *const Data)).0) };
    // wait for the agent thread to send all responses to the remote.
    tokio::time::sleep(Duration::from_secs(1)).await;
    Ok(())
}

#[tokio::main]
async fn main() {
    let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    let server_handle = std::thread::spawn(move || server(addr));
    tokio::time::sleep(Duration::from_secs(3)).await;
    client(addr)
        .await
        .map_err(|err| println!("{}", err))
        .unwrap();
    server_handle.join().unwrap().unwrap();
}
source

pub async fn read<LW, RR>(&self, lm: &mut LW, rm: &RR) -> Result<()>where
    LW: LocalMrWriteAccess,
    RR: RemoteMrReadAccess,

Read content in the rm and store the content in the lm

Example scenario: the client puts data into a local mr and uses send_local_mr to send its metadata to the server. The server obtains a remote mr through receive_remote_mr, and then gets the data from this rmr by RDMA read.

Application scenario can be seen in [/example/rpc.rs]

Examples
use async_rdma::{LocalMrReadAccess, LocalMrWriteAccess, Rdma, RdmaListener};
use portpicker::pick_unused_port;
use std::{
    alloc::Layout,
    io,
    net::{Ipv4Addr, SocketAddrV4},
    time::Duration,
};

struct Data(String);

async fn client(addr: SocketAddrV4) -> io::Result<()> {
    let rdma = Rdma::connect(addr, 1, 1, 512).await?;
    let mut lmr = rdma.alloc_local_mr(Layout::new::<Data>())?;
    // put data into lmr
    unsafe { *(*lmr.as_mut_ptr() as *mut Data) = Data("hello world".to_string()) };
    // then send the metadata of this lmr to server to make server aware of this mr.
    rdma.send_local_mr(lmr).await?;
    Ok(())
}

#[tokio::main]
async fn server(addr: SocketAddrV4) -> io::Result<()> {
    let rdma_listener = RdmaListener::bind(addr).await?;
    let rdma = rdma_listener.accept(1, 1, 512).await?;
    let mut lmr = rdma.alloc_local_mr(Layout::new::<Data>())?;
    // receive the metadata of rmr sent by client
    let rmr = rdma.receive_remote_mr().await?;
    // `read` data from rmr to lmr
    rdma.read(&mut lmr, &rmr).await?;
    // assert the content of lmr, which was get from rmr by rdma `read`
    unsafe {
        assert_eq!(
            "hello world".to_string(),
            *(*(*lmr.as_ptr() as *const Data)).0
        )
    };
    Ok(())
}
#[tokio::main]
async fn main() {
    let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    std::thread::spawn(move || server(addr));
    tokio::time::sleep(Duration::from_secs(3)).await;
    client(addr)
        .await
        .map_err(|err| println!("{}", err))
        .unwrap();
}
source

pub async fn write<LR, RW>(&self, lm: &LR, rm: &mut RW) -> Result<()>where
    LR: LocalMrReadAccess,
    RW: RemoteMrWriteAccess,

Write content in the lm to rm

Example scenario: the client requests a remote mr through request_remote_mr, and then puts data into this rmr by RDMA write. Finally, the client calls send_remote_mr to make the server aware of this mr. After that, the server calls receive_local_mr and then gets the data from this mr.

Application scenario can be seen in [/example/rpc.rs]

Examples
use async_rdma::{LocalMrReadAccess, LocalMrWriteAccess, Rdma, RdmaListener};
use portpicker::pick_unused_port;
use std::{
    alloc::Layout,
    io,
    net::{Ipv4Addr, SocketAddrV4},
    time::Duration,
};

struct Data(String);

async fn client(addr: SocketAddrV4) -> io::Result<()> {
    let rdma = Rdma::connect(addr, 1, 1, 512).await?;
    let mut lmr = rdma.alloc_local_mr(Layout::new::<Data>())?;
    let mut rmr = rdma.request_remote_mr(Layout::new::<Data>()).await?;
    // put data into lmr
    unsafe { *(*lmr.as_mut_ptr() as *mut Data) = Data("hello world".to_string()) };
    // write the content of local mr into remote mr
    rdma.write(&lmr, &mut rmr).await?;
    // then send the metadata of rmr to server to make server aware of this mr.
    rdma.send_remote_mr(rmr).await?;
    Ok(())
}

#[tokio::main]
async fn server(addr: SocketAddrV4) -> io::Result<()> {
    let rdma_listener = RdmaListener::bind(addr).await?;
    let rdma = rdma_listener.accept(1, 1, 512).await?;
    // receive the metadata of the lmr that had been requested by client
    let lmr = rdma.receive_local_mr().await?;
    // assert the content of lmr, which was `write` by client
    unsafe {
        assert_eq!(
            "hello world".to_string(),
            *(*(*lmr.as_ptr() as *const Data)).0
        )
    };
    Ok(())
}
#[tokio::main]
async fn main() {
    let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    std::thread::spawn(move || server(addr));
    tokio::time::sleep(Duration::from_secs(3)).await;
    client(addr)
        .await
        .map_err(|err| println!("{}", err))
        .unwrap();
}
source

pub async fn write_with_imm<LR, RW>(
    &self,
    lm: &LR,
    rm: &mut RW,
    imm: u32
) -> Result<()>where
    LR: LocalMrReadAccess,
    RW: RemoteMrWriteAccess,

Write the content in the lm to the rm and send immediate data, which will consume an RDMA receive work request in the receiver’s receive queue. The receiver can receive this immediate data by using receive_write_imm.

Used with receive_write_imm.

Examples
use async_rdma::{LocalMrReadAccess, LocalMrWriteAccess, Rdma, RdmaListener};
use portpicker::pick_unused_port;
use std::{
    alloc::Layout,
    io,
    net::{Ipv4Addr, SocketAddrV4},
    time::Duration,
};

static IMM_NUM: u32 = 123;
struct Data(String);

static MSG: &str = "hello world";

async fn client(addr: SocketAddrV4) -> io::Result<()> {
    let rdma = Rdma::connect(addr, 1, 1, 512).await?;
    let mut lmr = rdma.alloc_local_mr(Layout::new::<Data>())?;
    let mut rmr = rdma.request_remote_mr(Layout::new::<Data>()).await?;
    let data = Data(MSG.to_string());
    unsafe { *(*lmr.as_mut_ptr() as *mut Data) = data };
    // write the content of lmr to server with immediate data.
    rdma.write_with_imm(&lmr, &mut rmr, IMM_NUM).await?;
    // then send the metadata of rmr to server to make server aware of this mr.
    rdma.send_remote_mr(rmr).await?;
    Ok(())
}

#[tokio::main]
async fn server(addr: SocketAddrV4) -> io::Result<()> {
    let rdma_listener = RdmaListener::bind(addr).await?;
    let rdma = rdma_listener.accept(1, 1, 512).await?;
    // receive the immediate data sent by `write_with_imm`
    let imm = rdma.receive_write_imm().await?;
    assert_eq!(imm, IMM_NUM);
    // receive the metadata of the lmr that had been requested by client
    let lmr = rdma.receive_local_mr().await?;
    // assert the content of lmr, which was `write` by client
    unsafe { assert_eq!(MSG.to_string(), *(*(*lmr.as_ptr() as *const Data)).0) };
    // wait for the agent thread to send all responses to the remote.
    tokio::time::sleep(Duration::from_secs(1)).await;
    Ok(())
}

#[tokio::main]
async fn main() {
    let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    let server_handle = std::thread::spawn(move || server(addr));
    tokio::time::sleep(Duration::from_secs(3)).await;
    client(addr)
        .await
        .map_err(|err| println!("{}", err))
        .unwrap();
    server_handle.join().unwrap().unwrap();
}
source

pub async fn connect<A: ToSocketAddrs>(
    addr: A,
    port_num: u8,
    gid_index: usize,
    max_message_length: usize
) -> Result<Self>

Connect to the remote endpoint and build an RDMA queue pair over a TCP connection

gid_index: 0 for IPv6, 1 for IPv4. max_message_length: the maximum length of a message used in send & receive.

Examples
use async_rdma::{Rdma, RdmaListener};
use portpicker::pick_unused_port;
use std::{
    io,
    net::{Ipv4Addr, SocketAddrV4},
    time::Duration,
};

async fn client(addr: SocketAddrV4) -> io::Result<()> {
    let _rdma = Rdma::connect(addr, 1, 1, 512).await?;
    Ok(())
}

#[tokio::main]
async fn server(addr: SocketAddrV4) -> io::Result<()> {
    let rdma_listener = RdmaListener::bind(addr).await?;
    let _rdma = rdma_listener.accept(1, 1, 512).await?;
    // run here after client connect
    Ok(())
}
#[tokio::main]
async fn main() {
    let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    std::thread::spawn(move || server(addr));
    tokio::time::sleep(Duration::from_secs(3)).await;
    client(addr)
        .await
        .map_err(|err| println!("{}", err))
        .unwrap();
}
source

pub fn alloc_local_mr(&self, layout: Layout) -> Result<LocalMr>

Allocate a local memory region

You can use a local mr to send & receive, or to read & write with a remote mr. The layout parameter can be obtained with Layout::new::<Data>(). The following example shows how to write data into and read data from an mr.

Examples
use async_rdma::{LocalMrReadAccess, LocalMrWriteAccess, Rdma, RdmaListener};
use portpicker::pick_unused_port;
use std::{
    alloc::Layout,
    io,
    net::{Ipv4Addr, SocketAddrV4},
    time::Duration,
};

struct Data(String);

async fn client(addr: SocketAddrV4) -> io::Result<()> {
    let rdma = Rdma::connect(addr, 1, 1, 512).await?;
    let mut lmr = rdma.alloc_local_mr(Layout::new::<Data>())?;
    // put data into lmr
    unsafe { *(*lmr.as_mut_ptr() as *mut Data) = Data("hello world".to_string()) };
    // send the content of lmr to server
    rdma.send(&lmr).await?;
    Ok(())
}

#[tokio::main]
async fn server(addr: SocketAddrV4) -> io::Result<()> {
    let rdma_listener = RdmaListener::bind(addr).await?;
    let rdma = rdma_listener.accept(1, 1, 512).await?;
    // receive the data sent by client and put it into an mr
    let lmr = rdma.receive().await?;
    // assert data in the lmr
    unsafe {
        assert_eq!(
            "hello world".to_string(),
            *(*(*lmr.as_ptr() as *const Data)).0
        )
    };
    Ok(())
}
#[tokio::main]
async fn main() {
    let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    std::thread::spawn(move || server(addr));
    tokio::time::sleep(Duration::from_secs(3)).await;
    client(addr)
        .await
        .map_err(|err| println!("{}", err))
        .unwrap();
}
source

pub unsafe fn alloc_local_mr_uninit(&self, layout: Layout) -> Result<LocalMr>

Allocate a local memory region that has not been initialized

You can use a local mr to send & receive, or to read & write with a remote mr. The layout parameter can be obtained with Layout::new::<Data>(). The following example shows how to write data into and read data from an mr.

Safety

The newly allocated memory in this LocalMr is uninitialized. Initialize it before using to make it safe.

Examples
use async_rdma::{LocalMrReadAccess, LocalMrWriteAccess, Rdma, RdmaListener};
use portpicker::pick_unused_port;
use std::{
    alloc::Layout,
    io,
    net::{Ipv4Addr, SocketAddrV4},
    time::Duration,
};

struct Data(String);

async fn client(addr: SocketAddrV4) -> io::Result<()> {
    let rdma = Rdma::connect(addr, 1, 1, 512).await?;
    let mut lmr = unsafe { rdma.alloc_local_mr_uninit(Layout::new::<Data>())? };
    // put data into lmr
    unsafe { *(*lmr.as_mut_ptr() as *mut Data) = Data("hello world".to_string()) };
    // send the content of lmr to server
    rdma.send(&lmr).await?;
    Ok(())
}

#[tokio::main]
async fn server(addr: SocketAddrV4) -> io::Result<()> {
    let rdma_listener = RdmaListener::bind(addr).await?;
    let rdma = rdma_listener.accept(1, 1, 512).await?;
    // receive the data sent by client and put it into an mr
    let lmr = rdma.receive().await?;
    // assert data in the lmr
    unsafe {
        assert_eq!(
            "hello world".to_string(),
            *(*(*lmr.as_ptr() as *const Data)).0
        )
    };
    Ok(())
}
#[tokio::main]
async fn main() {
    let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    std::thread::spawn(move || server(addr));
    tokio::time::sleep(Duration::from_secs(3)).await;
    client(addr)
        .await
        .map_err(|err| println!("{}", err))
        .unwrap();
}
source

pub fn alloc_local_mr_with_access(
    &self,
    layout: Layout,
    access: BitFlags<AccessFlag>
) -> Result<LocalMr>

Allocate a local memory region with specified access

Use alloc_local_mr if you want to alloc memory region with default access.

If you want more information, please check the documentation and examples of alloc_local_mr.

Example
use async_rdma::{AccessFlag, MrAccess, RdmaBuilder};
use std::alloc::Layout;

#[tokio::main]
async fn main() {
    let rdma = RdmaBuilder::default().build().unwrap();
    let layout = Layout::new::<[u8; 4096]>();
    let access = AccessFlag::LocalWrite | AccessFlag::RemoteRead;
    let mr = rdma.alloc_local_mr_with_access(layout, access).unwrap();
    assert_eq!(mr.access(), access);
}
source

pub unsafe fn alloc_local_mr_uninit_with_access(
    &self,
    layout: Layout,
    access: BitFlags<AccessFlag>
) -> Result<LocalMr>

Allocate a local memory region with specified access that has not been initialized

Use alloc_local_mr_uninit if you want to alloc memory region with default access.

If you want more information, please check the documentation and examples of alloc_local_mr_uninit.

Safety

The newly allocated memory in this LocalMr is uninitialized. Initialize it before using to make it safe.

Example
use async_rdma::{AccessFlag, MrAccess, RdmaBuilder};
use std::alloc::Layout;

#[tokio::main]
async fn main() {
    let rdma = RdmaBuilder::default().build().unwrap();
    let layout = Layout::new::<[u8; 4096]>();
    let access = AccessFlag::LocalWrite | AccessFlag::RemoteRead;
    let mr = unsafe {
        rdma.alloc_local_mr_uninit_with_access(layout, access)
            .unwrap()
    };
    assert_eq!(mr.access(), access);
}
source

pub async fn request_remote_mr(&self, layout: Layout) -> Result<RemoteMr>

Request a remote memory region with default timeout value.

Note: The operation of this memory region will fail after timeout.

Used with send_remote_mr, receive_local_mr, read and write. Example scenario: the client uses request_remote_mr to request a remote mr from the server, and makes the server aware of this mr by calling send_remote_mr. For the server, this mr is a local mr, which can be received through receive_local_mr.

Application scenario can be seen in [/example/rpc.rs]

Examples
use async_rdma::{Rdma, RdmaListener};
use portpicker::pick_unused_port;
use std::{
    alloc::Layout,
    io,
    net::{Ipv4Addr, SocketAddrV4},
    time::Duration,
};

struct Data(String);

async fn client(addr: SocketAddrV4) -> io::Result<()> {
    let rdma = Rdma::connect(addr, 1, 1, 512).await?;
    // request a mr located in server.
    let rmr = rdma.request_remote_mr(Layout::new::<Data>()).await?;
    // do something with rmr like `write` data into it.
    // then send the metadata of rmr to server to make server aware of this mr.
    rdma.send_remote_mr(rmr).await?;
    Ok(())
}

#[tokio::main]
async fn server(addr: SocketAddrV4) -> io::Result<()> {
    let rdma_listener = RdmaListener::bind(addr).await?;
    let rdma = rdma_listener.accept(1, 1, 512).await?;
    // receive the metadata of the lmr that had been requested by client
    let _lmr = rdma.receive_local_mr().await?;
    // do something with lmr like getting data from it.
    Ok(())
}
#[tokio::main]
async fn main() {
    let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    std::thread::spawn(move || server(addr));
    tokio::time::sleep(Duration::from_secs(3)).await;
    client(addr)
        .await
        .map_err(|err| println!("{}", err))
        .unwrap();
}
source

pub async fn request_remote_mr_with_timeout(
    &self,
    layout: Layout,
    timeout: Duration
) -> Result<RemoteMr>

Request a remote memory region with customized timeout value. The rest is consistent with request_remote_mr.

Note: The operation of this memory region will fail after timeout.

Used with send_remote_mr, receive_local_mr, read and write. Example scenario: the client uses request_remote_mr_with_timeout to request a remote mr from the server, and makes the server aware of this mr by calling send_remote_mr. For the server, this mr is a local mr, which can be received through receive_local_mr.

Examples
use async_rdma::{Rdma, RdmaListener, RemoteMrReadAccess};
use portpicker::pick_unused_port;
use std::{
    alloc::Layout,
    io,
    net::{Ipv4Addr, SocketAddrV4},
    time::Duration,
};

struct Data(String);

async fn client(addr: SocketAddrV4) -> io::Result<()> {
    let rdma = Rdma::connect(addr, 1, 1, 512).await?;
    // request a mr located in server.
    let rmr = rdma
        .request_remote_mr_with_timeout(Layout::new::<Data>(), Duration::from_secs(10))
        .await?;
    assert!(!rmr.timeout_check());
    // do something with rmr like `write` data into it.
    // then send the metadata of rmr to server to make server aware of this mr.
    rdma.send_remote_mr(rmr).await?;
    Ok(())
}

#[tokio::main]
async fn server(addr: SocketAddrV4) -> io::Result<()> {
    let rdma_listener = RdmaListener::bind(addr).await?;
    let rdma = rdma_listener.accept(1, 1, 512).await?;
    // receive the metadata of the lmr that had been requested by client
    let _lmr = rdma.receive_local_mr().await?;
    // do something with lmr like getting data from it.
    Ok(())
}
#[tokio::main]
async fn main() {
    let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    std::thread::spawn(move || server(addr));
    tokio::time::sleep(Duration::from_secs(3)).await;
    client(addr)
        .await
        .map_err(|err| println!("{}", err))
        .unwrap();
}
source

pub async fn send_local_mr(&self, mr: LocalMr) -> Result<()>

Send the metadata of a local memory region to the remote end with the default timeout value.

Note: The operation of this memory region will fail after timeout.

Used with receive_remote_mr

Application scenario such as: client uses alloc_local_mr to alloc a local mr, and makes server aware of this mr by send_local_mr to server. For server, this mr is a remote mr, which can be received through receive_remote_mr.

Application scenario can be seen in [/example/rpc.rs]

Examples
use async_rdma::{Rdma, RdmaListener};
use portpicker::pick_unused_port;
use std::{
    alloc::Layout,
    io,
    net::{Ipv4Addr, SocketAddrV4},
    time::Duration,
};

struct Data(String);

async fn client(addr: SocketAddrV4) -> io::Result<()> {
    let rdma = Rdma::connect(addr, 1, 1, 512).await?;
    // alloc a local mr in client.
    let lmr = rdma.alloc_local_mr(Layout::new::<Data>())?;
    // do something with lmr like `write` data into it.
    // then send the metadata of this lmr to server to make server aware of this mr.
    rdma.send_local_mr(lmr).await?;
    Ok(())
}

#[tokio::main]
async fn server(addr: SocketAddrV4) -> io::Result<()> {
    let rdma_listener = RdmaListener::bind(addr).await?;
    let rdma = rdma_listener.accept(1, 1, 512).await?;
    // receive the metadata of rmr sent by client
    let _rmr = rdma.receive_remote_mr().await?;
    // do something with rmr like getting data from it.
    // wait for the agent thread to send all responses to the remote.
    tokio::time::sleep(Duration::from_secs(1)).await;
    Ok(())
}
#[tokio::main]
async fn main() {
    let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    std::thread::spawn(move || server(addr));
    tokio::time::sleep(Duration::from_secs(3)).await;
    client(addr)
        .await
        .map_err(|err| println!("{}", err))
        .unwrap();
}
source

pub async fn send_local_mr_with_timeout(
    &self,
    mr: LocalMr,
    timeout: Duration
) -> Result<()>

Send the metadata of a local memory region to the remote end with a customized timeout value.

Note: The operation of this memory region will fail after timeout.

Used with receive_remote_mr

Application scenario such as: client uses alloc_local_mr to alloc a local mr, and makes server aware of this mr by send_local_mr to server. For server, this mr is a remote mr, which can be received through receive_remote_mr.

Examples
use async_rdma::{Rdma, RdmaListener, RemoteMrReadAccess};
use portpicker::pick_unused_port;
use std::{
    alloc::Layout,
    io,
    net::{Ipv4Addr, SocketAddrV4},
    time::Duration,
};

struct Data(String);

async fn client(addr: SocketAddrV4) -> io::Result<()> {
    let rdma = Rdma::connect(addr, 1, 1, 512).await?;
    // alloc a local mr in client.
    let lmr = rdma.alloc_local_mr(Layout::new::<Data>())?;
    // do something with lmr like `write` data into it.
    // then send the metadata of this lmr to server to make server aware of this mr.
    rdma.send_local_mr_with_timeout(lmr, Duration::from_secs(1))
        .await?;
    Ok(())
}

#[tokio::main]
async fn server(addr: SocketAddrV4) -> io::Result<()> {
    let rdma_listener = RdmaListener::bind(addr).await?;
    let rdma = rdma_listener.accept(1, 1, 512).await?;
    // receive the metadata of rmr sent by client
    let rmr = rdma.receive_remote_mr().await?;
    assert!(!rmr.timeout_check());
    // do something with rmr like getting data from it.
    // wait for the agent thread to send all responses to the remote.
    tokio::time::sleep(Duration::from_secs(1)).await;
    Ok(())
}
#[tokio::main]
async fn main() {
    let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    std::thread::spawn(move || server(addr));
    tokio::time::sleep(Duration::from_secs(3)).await;
    client(addr)
        .await
        .map_err(|err| println!("{}", err))
        .unwrap();
}
source

pub async fn send_remote_mr(&self, mr: RemoteMr) -> Result<()>

Send the metadata of a remote memory region to the remote end.

Used with receive_local_mr.

Application scenario such as: client uses request_remote_mr to apply for a remote mr from server, and makes server aware of this mr by send_remote_mr to server. For server, this mr is a local mr, which can be received through receive_local_mr.

Application scenario can be seen in [/example/rpc.rs]

Examples
use async_rdma::{Rdma, RdmaListener};
use portpicker::pick_unused_port;
use std::{
    alloc::Layout,
    io,
    net::{Ipv4Addr, SocketAddrV4},
    time::Duration,
};

struct Data(String);

async fn client(addr: SocketAddrV4) -> io::Result<()> {
    let rdma = Rdma::connect(addr, 1, 1, 512).await?;
    // request a mr located in server.
    let rmr = rdma.request_remote_mr(Layout::new::<Data>()).await?;
    // do something with rmr like `write` data into it.
    // then send the metadata of rmr to server to make server aware of this mr.
    rdma.send_remote_mr(rmr).await?;
    Ok(())
}

#[tokio::main]
async fn server(addr: SocketAddrV4) -> io::Result<()> {
    let rdma_listener = RdmaListener::bind(addr).await?;
    let rdma = rdma_listener.accept(1, 1, 512).await?;
    // receive the metadata of the lmr that had been requested by client
    let _lmr = rdma.receive_local_mr().await?;
    // do something with lmr like getting data from it.
    // wait for the agent thread to send all responses to the remote.
    tokio::time::sleep(Duration::from_secs(1)).await;
    Ok(())
}
#[tokio::main]
async fn main() {
    let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    std::thread::spawn(move || server(addr));
    tokio::time::sleep(Duration::from_secs(3)).await;
    client(addr)
        .await
        .map_err(|err| println!("{}", err))
        .unwrap();
}
source

pub async fn receive_local_mr(&self) -> Result<LocalMr>

Receive a local memory region

Used with send_remote_mr. Application scenario such as: client uses request_remote_mr to apply for a remote mr from server, and makes server aware of this mr by calling send_remote_mr. For server, this mr is a local mr, which can be received through receive_local_mr.

Application scenario can be seen in [/example/rpc.rs]

Examples

Application scenario such as: client requests a remote mr through request_remote_mr, and then puts data into this rmr by rdma write. After that, client calls send_remote_mr to make server aware of this mr. Then server calls receive_local_mr and gets data from this mr.

Application scenario can be seen in [/example/rpc.rs]

Examples
use async_rdma::{LocalMrReadAccess, LocalMrWriteAccess, Rdma, RdmaListener};
use portpicker::pick_unused_port;
use std::{
    alloc::Layout,
    io,
    net::{Ipv4Addr, SocketAddrV4},
    time::Duration,
};

struct Data(String);

async fn client(addr: SocketAddrV4) -> io::Result<()> {
    let rdma = Rdma::connect(addr, 1, 1, 512).await?;
    let mut lmr = rdma.alloc_local_mr(Layout::new::<Data>())?;
    let mut rmr = rdma.request_remote_mr(Layout::new::<Data>()).await?;
    // put data into lmr
    unsafe { *(*lmr.as_mut_ptr() as *mut Data) = Data("hello world".to_string()) };
    // write the content of local mr into remote mr
    rdma.write(&lmr, &mut rmr).await?;
    // then send the metadata of rmr to server to make server aware of this mr.
    rdma.send_remote_mr(rmr).await?;
    Ok(())
}

#[tokio::main]
async fn server(addr: SocketAddrV4) -> io::Result<()> {
    let rdma_listener = RdmaListener::bind(addr).await?;
    let rdma = rdma_listener.accept(1, 1, 512).await?;
    // receive the metadata of the lmr that had been requested by client
    let lmr = rdma.receive_local_mr().await?;
    // assert the content of lmr, which was `write` by client
    unsafe {
        assert_eq!(
        "hello world".to_string(),
        *(*(*lmr.as_ptr() as *const Data)).0
    )
    };
    // wait for the agent thread to send all responses to the remote.
    tokio::time::sleep(Duration::from_secs(1)).await;
    Ok(())
}
#[tokio::main]
async fn main() {
    let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    std::thread::spawn(move || server(addr));
    tokio::time::sleep(Duration::from_secs(3)).await;
    client(addr)
        .await
        .map_err(|err| println!("{}", err))
        .unwrap();
}
source

pub async fn receive_remote_mr(&self) -> Result<RemoteMr>

Receive a remote memory region

Used with send_local_mr. Application scenario such as: server allocs a local mr, puts data into it, and lets client know about this mr through send_local_mr. For client, this is a remote mr located in server. Client receives the metadata of this mr by receive_remote_mr.

Application scenario can be seen in [/example/rpc.rs]

Examples
use async_rdma::{Rdma, RdmaListener};
use portpicker::pick_unused_port;
use std::{
    alloc::Layout,
    io,
    net::{Ipv4Addr, SocketAddrV4},
    time::Duration,
};

struct Data(String);

async fn client(addr: SocketAddrV4) -> io::Result<()> {
    let rdma = Rdma::connect(addr, 1, 1, 512).await?;
    // receive the metadata of rmr sent by server
    let _rmr = rdma.receive_remote_mr().await?;
    // do something with rmr like `read` data from it.
    Ok(())
}

#[tokio::main]
async fn server(addr: SocketAddrV4) -> io::Result<()> {
    let rdma_listener = RdmaListener::bind(addr).await?;
    let rdma = rdma_listener.accept(1, 1, 512).await?;
    let lmr = rdma.alloc_local_mr(Layout::new::<Data>())?;
    // do something with lmr like put data into it.
    // then send the metadata of this lmr to client to make client aware of this mr.
    rdma.send_local_mr(lmr).await?;
    Ok(())
}
#[tokio::main]
async fn main() {
    let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    std::thread::spawn(move || server(addr));
    tokio::time::sleep(Duration::from_secs(3)).await;
    client(addr)
        .await
        .map_err(|err| println!("{}", err))
        .unwrap();
}
source

pub fn set_new_qp_access(self, qp_access: BitFlags<AccessFlag>) -> Self

Set qp access for new Rdma that is created by clone.

Used with listen, new_connect

Examples
use async_rdma::{AccessFlag, RdmaBuilder};
use portpicker::pick_unused_port;
use std::{
    io,
    net::{Ipv4Addr, SocketAddrV4},
    time::Duration,
};

async fn client(addr: SocketAddrV4) -> io::Result<()> {
    let rdma = RdmaBuilder::default().connect(addr).await?;
    let access = AccessFlag::LocalWrite | AccessFlag::RemoteRead;
    let mut rdma = rdma.set_new_qp_access(access);
    let _new_rdma = rdma.new_connect(addr).await?;
    Ok(())
}

#[tokio::main]
async fn server(addr: SocketAddrV4) -> io::Result<()> {
    let mut rdma = RdmaBuilder::default().listen(addr).await?;
        let _new_rdma = rdma.listen().await?;
    Ok(())
}

#[tokio::main]
async fn main() {
    let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    std::thread::spawn(move || server(addr));
    tokio::time::sleep(Duration::from_secs(3)).await;
    client(addr)
        .await
        .map_err(|err| println!("{}", err))
        .unwrap();
}
source

pub fn set_new_max_rmr_access(self, max_rmr_access: BitFlags<AccessFlag>) -> Self

Set max access permission for remote mr requests for new Rdma that is created by clone.

Used with listen, new_connect

Examples
use async_rdma::{AccessFlag, RdmaBuilder, MrAccess};
use portpicker::pick_unused_port;
use std::{
    alloc::Layout,
    io,
    net::{Ipv4Addr, SocketAddrV4},
    time::Duration,
};

async fn client(addr: SocketAddrV4) -> io::Result<()> {
    let mut rdma = RdmaBuilder::default().connect(addr).await?;
    let rmr = rdma.request_remote_mr(Layout::new::<char>()).await?;
    let new_rdma = rdma.new_connect(addr).await?;
    let new_rmr = new_rdma.request_remote_mr(Layout::new::<char>()).await?;
    let access = AccessFlag::LocalWrite | AccessFlag::RemoteRead;
    assert_eq!(new_rmr.access(), access);
    assert_ne!(rmr.access(), new_rmr.access());
    new_rdma.send_remote_mr(new_rmr).await?;
    Ok(())
}

#[tokio::main]
async fn server(addr: SocketAddrV4) -> io::Result<()> {
    let rdma = RdmaBuilder::default().listen(addr).await?;
    let access = AccessFlag::LocalWrite | AccessFlag::RemoteRead;
    let mut rdma = rdma.set_new_max_rmr_access(access);
    let new_rdma = rdma.listen().await?;
    // receive the metadata of the lmr that had been requested by client
    let _lmr = new_rdma.receive_local_mr().await?;
    // wait for the agent thread to send all responses to the remote.
    tokio::time::sleep(Duration::from_secs(1)).await;
    Ok(())
}

#[tokio::main]
async fn main() {
    let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    std::thread::spawn(move || server(addr));
    tokio::time::sleep(Duration::from_secs(3)).await;
    client(addr)
        .await
        .map_err(|err| println!("{}", err))
        .unwrap();
}
source

pub fn set_new_port_num(self, port_num: u8) -> Self

Set the port number (port_num) for new Rdma that is created by clone.

Used with listen, new_connect

Examples
use async_rdma::RdmaBuilder;
use portpicker::pick_unused_port;
use std::{
    io,
    net::{Ipv4Addr, SocketAddrV4},
    time::Duration,
};

async fn client(addr: SocketAddrV4) -> io::Result<()> {
    let rdma = RdmaBuilder::default().connect(addr).await?;
    let mut rdma = rdma.set_new_port_num(1_u8);
    let _new_rdma = rdma.new_connect(addr).await?;
    Ok(())
}

#[tokio::main]
async fn server(addr: SocketAddrV4) -> io::Result<()> {
    let mut rdma = RdmaBuilder::default().listen(addr).await?;
    let _new_rdma = rdma.listen().await?;
    Ok(())
}

#[tokio::main]
async fn main() {
    let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    std::thread::spawn(move || server(addr));
    tokio::time::sleep(Duration::from_secs(3)).await;
    client(addr)
        .await
        .map_err(|err| println!("{}", err))
        .unwrap();
}
source

pub fn set_new_pd(self) -> Result<Self>

Set new ProtectionDomain for new Rdma that is created by clone to provide isolation.

Used with listen, new_connect

Examples
use async_rdma::RdmaBuilder;
use portpicker::pick_unused_port;
use std::{
    io,
    net::{Ipv4Addr, SocketAddrV4},
    time::Duration,
};

async fn client(addr: SocketAddrV4) -> io::Result<()> {
    let rdma = RdmaBuilder::default().connect(addr).await?;
    let mut rdma = rdma.set_new_pd()?;
    // then the `Rdma`s created by `new_connect` will have a new `ProtectionDomain`
    let _new_rdma = rdma.new_connect(addr).await?;
    Ok(())
}

#[tokio::main]
async fn server(addr: SocketAddrV4) -> io::Result<()> {
    let mut rdma = RdmaBuilder::default().listen(addr).await?;
    let _new_rdma = rdma.listen().await?;
    Ok(())
}

#[tokio::main]
async fn main() {
    let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    std::thread::spawn(move || server(addr));
    tokio::time::sleep(Duration::from_secs(3)).await;
    client(addr)
        .await
        .map_err(|err| println!("{}", err))
        .unwrap();
}
source

pub fn get_cur_qp_state(&self) -> QueuePairState

Get the real state of qp by querying

Examples
use async_rdma::{RdmaBuilder, QueuePairState};
use portpicker::pick_unused_port;
use std::{
    io,
    net::{Ipv4Addr, SocketAddrV4},
    time::Duration,
};

async fn client(addr: SocketAddrV4) -> io::Result<()> {
    let rdma_init = RdmaBuilder::default().build()?;
    assert_eq!(rdma_init.get_cur_qp_state(), QueuePairState::Init);
    let rdma_send = RdmaBuilder::default().connect(addr).await?;
    assert_eq!(rdma_send.get_cur_qp_state(), QueuePairState::ReadyToSend);
    Ok(())
}

#[tokio::main]
async fn server(addr: SocketAddrV4) -> io::Result<()> {
    let rdma = RdmaBuilder::default().listen(addr).await?;
    assert_eq!(rdma.get_cur_qp_state(), QueuePairState::ReadyToSend);
    Ok(())
}

#[tokio::main]
async fn main() {
    let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    std::thread::spawn(move || server(addr));
    tokio::time::sleep(Duration::from_secs(3)).await;
    client(addr)
        .await
        .map_err(|err| println!("{}", err))
        .unwrap();
}
source

pub fn query_qp_state(&self) -> Result<QueuePairState>

Get the real state of qp by querying

Examples
use async_rdma::{RdmaBuilder, QueuePairState};
use portpicker::pick_unused_port;
use std::{
    io,
    net::{Ipv4Addr, SocketAddrV4},
    time::Duration,
};

async fn client(addr: SocketAddrV4) -> io::Result<()> {
    let rdma_init = RdmaBuilder::default().build()?;
    assert_eq!(rdma_init.query_qp_state()?, QueuePairState::Init);
    let rdma_send = RdmaBuilder::default().connect(addr).await?;
    assert_eq!(rdma_send.query_qp_state()?, QueuePairState::ReadyToSend);
    Ok(())
}

#[tokio::main]
async fn server(addr: SocketAddrV4) -> io::Result<()> {
    let rdma = RdmaBuilder::default().listen(addr).await?;
    assert_eq!(rdma.query_qp_state()?, QueuePairState::ReadyToSend);
    Ok(())
}

#[tokio::main]
async fn main() {
    let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    std::thread::spawn(move || server(addr));
    tokio::time::sleep(Duration::from_secs(3)).await;
    client(addr)
        .await
        .map_err(|err| println!("{}", err))
        .unwrap();
}
source

pub fn get_qp_endpoint(&self) -> QueuePairEndpoint

Get information of this qp for establishing a connection.

source

pub async fn poll(&self) -> Result<()>

User driven poll. Used with RdmaBuilder.set_polling_trigger(). See also Rdma.get_manual_trigger().

If you want to control CQ polling by yourself manually, you can set PollingTriggerType::Manual using RdmaBuilder.set_polling_trigger(). Then all async RDMA operations will not complete until you poll.

Examples
use async_rdma::{LocalMrReadAccess, LocalMrWriteAccess, RdmaBuilder};
use minstant::Instant;
use portpicker::pick_unused_port;
use std::{
    alloc::Layout,
    io::{self, Write},
    net::{Ipv4Addr, SocketAddrV4},
    sync::Arc,
    time::Duration,
};

async fn client(addr: SocketAddrV4) -> io::Result<()> {
    const POLLING_INTERVAL: Duration = Duration::from_millis(100);
    let rdma = Arc::new(RdmaBuilder::default()
        .set_polling_trigger(async_rdma::PollingTriggerType::Manual)
        .set_cc_evnet_timeout(Duration::from_secs(10))
        .connect(addr)
        .await
        .unwrap());

    let rdma_trigger = rdma.clone();
    // polling task
    let _trigger_handle = tokio::spawn(async move {
        loop {
            tokio::time::sleep(POLLING_INTERVAL).await;
            rdma_trigger.poll().await.unwrap();
        }
    });

    let mut lmr = rdma.alloc_local_mr(Layout::new::<[u8; 8]>())?;
    let _num = lmr.as_mut_slice().write(&[1_u8; 8])?;
    let instant = Instant::now();
    rdma.send(&lmr).await?;
    assert!(instant.elapsed() >= POLLING_INTERVAL);
    Ok(())
}

#[tokio::main]
async fn server(addr: SocketAddrV4) -> io::Result<()> {
    let rdma = RdmaBuilder::default().listen(addr).await?;
    let lmr = rdma.receive().await?;
    let data = *lmr.as_slice();
    assert_eq!(data, [1_u8; 8]);
    Ok(())
}

#[tokio::main]
async fn main() {
    let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    std::thread::spawn(move || server(addr));
    tokio::time::sleep(Duration::from_secs(3)).await;
    client(addr)
        .await
        .map_err(|err| println!("{}", err))
        .unwrap();
}
source

pub fn get_manual_trigger(&self) -> Result<ManualTrigger>

Get tx end of channel polling trigger.

You can send a () message through tx to trigger a polling, like Rdma.poll(). This API is more convenient and flexible than Rdma.poll(), because you can clone more than one tx and send them to other threads or tasks.

Examples
use async_rdma::{LocalMrReadAccess, LocalMrWriteAccess, RdmaBuilder};
use minstant::Instant;
use portpicker::pick_unused_port;
use std::{
    alloc::Layout,
    io::{self, Write},
    net::{Ipv4Addr, SocketAddrV4},
    time::Duration,
};

async fn client(addr: SocketAddrV4) -> io::Result<()> {
    const POLLING_INTERVAL: Duration = Duration::from_millis(100);
    let rdma = RdmaBuilder::default()
        .set_polling_trigger(async_rdma::PollingTriggerType::Manual)
        .set_cc_evnet_timeout(Duration::from_secs(10))
        .connect(addr)
        .await
        .unwrap();

    let trigger = rdma.get_manual_trigger().unwrap();
    // polling task
    let _trigger_handle = tokio::spawn(async move {
        loop {
            tokio::time::sleep(POLLING_INTERVAL).await;
            trigger.pull().await.unwrap();
        }
    });

    let mut lmr = rdma.alloc_local_mr(Layout::new::<[u8; 8]>())?;
    let _num = lmr.as_mut_slice().write(&[1_u8; 8])?;
    let instant = Instant::now();
    rdma.send(&lmr).await?;
    assert!(instant.elapsed() >= POLLING_INTERVAL);
    Ok(())
}

#[tokio::main]
async fn server(addr: SocketAddrV4) -> io::Result<()> {
    let rdma = RdmaBuilder::default().listen(addr).await?;
    let lmr = rdma.receive().await?;
    let data = *lmr.as_slice();
    assert_eq!(data, [1_u8; 8]);
    Ok(())
}

#[tokio::main]
async fn main() {
    let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    std::thread::spawn(move || server(addr));
    tokio::time::sleep(Duration::from_secs(3)).await;
    client(addr)
        .await
        .map_err(|err| println!("{}", err))
        .unwrap();
}
source

pub fn get_last_ibv_event_type(&self) -> Result<Option<IbvEventType>>

Get the last ibv async event type.

Examples

use async_rdma::{RdmaBuilder, IbvEventType};
use portpicker::pick_unused_port;
use std::{
    alloc::Layout,
    net::{Ipv4Addr, SocketAddrV4},
    time::Duration, sync::Arc, io,
};
const POLLING_INTERVAL: Duration = Duration::from_secs(2);

async fn client(addr: SocketAddrV4) -> io::Result<()> {
    let rdma = Arc::new(RdmaBuilder::default()
        .set_polling_trigger(async_rdma::PollingTriggerType::Manual)
        .set_cc_evnet_timeout(POLLING_INTERVAL)
        .set_cq_size(1)
        .set_max_cqe(1)
        .connect(addr)
        .await
        .unwrap());

    // waiting for rdma send requests
    tokio::time::sleep(POLLING_INTERVAL).await;
    //  cq is full
    assert_eq!(rdma.get_last_ibv_event_type().unwrap().unwrap(),IbvEventType::IBV_EVENT_CQ_ERR);
    Ok(())
}

#[tokio::main]
async fn server(addr: SocketAddrV4) -> io::Result<()> {
    let rdma = Arc::new(RdmaBuilder::default().listen(addr).await?);
    let layout = Layout::new::<[u8; 8]>();
    let mut handles = vec![];

    for _ in 0..10{
        let rdma_move = rdma.clone();
        let lmr = rdma_move.alloc_local_mr(layout)?;
        handles.push(tokio::spawn(async move {let _ = rdma_move.send(&lmr).await;}));
    }
    tokio::time::sleep(POLLING_INTERVAL).await;
    for handle in handles{
        handle.abort();
    }
    Ok(())
}

#[tokio::main]
async fn main() {
    let addr = SocketAddrV4::new(Ipv4Addr::new(127, 0, 0, 1), pick_unused_port().unwrap());
    std::thread::spawn(move || server(addr));
    tokio::time::sleep(Duration::from_secs(3)).await;
    client(addr)
        .await
        .map_err(|err| println!("{}", err))
        .unwrap();
}

Trait Implementations§

source§

impl Debug for Rdma

source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more

Auto Trait Implementations§

§

impl !RefUnwindSafe for Rdma

§

impl Send for Rdma

§

impl Sync for Rdma

§

impl Unpin for Rdma

§

impl !UnwindSafe for Rdma

Blanket Implementations§

source§

impl<T> Any for T where
    T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for T where
    T: ?Sized,

const: unstable · source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for T where
    T: ?Sized,

const: unstable · source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<U> Cast for U

source§

fn cast<T>(self) -> T where
    T: TryFrom<Self>,
    Self: Sized + Display + Copy,

Performs the conversion.
source§

impl<T> From<T> for T

const: unstable · source§

fn from(t: T) -> T

Returns the argument unchanged.

source§

impl<T> Instrument for T

source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
source§

impl<T, U> Into<U> for T where
    U: From<T>,

const: unstable · source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

§

impl<T> Pointable for T

§

const ALIGN: usize = mem::align_of::<T>()

The alignment of pointer.
§

type Init = T

The type for initializers.
§

unsafe fn init(init: <T as Pointable>::Init) -> usize

Initializes a with the given initializer. Read more
§

unsafe fn deref<'a>(ptr: usize) -> &'a T

Dereferences the given pointer. Read more
§

unsafe fn deref_mut<'a>(ptr: usize) -> &'a mut T

Mutably dereferences the given pointer. Read more
§

unsafe fn drop(ptr: usize)

Drops the object pointed to by the given pointer. Read more
source§

impl<T, U> TryFrom<U> for T where
    U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
const: unstable · source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T where
    U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
const: unstable · source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
§

impl<V, T> VZip<V> for T where
    V: MultiLane<T>,

§

fn vzip(self) -> V

source§

impl<T> WithSubscriber for T

source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self> where
    S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more