use rdma_io_sys::ibverbs::*;
use rdma_io_sys::wrapper::*;
use std::task::{Context, Poll};
use crate::Result;
use crate::async_cq::{AsyncCq, CqPollState};
use crate::cm::CmQueuePair;
use crate::error::from_ret;
use crate::mr::{OwnedMemoryRegion, RemoteMr};
use crate::wc::WorkCompletion;
use crate::wr::{RecvWr, SendFlags, SendWr, Sge, WrOpcode};
/// A queue pair bundled with the completion queues used to observe its send-side and
/// receive-side work completions.
///
/// NOTE: Rust drops struct fields in declaration order, so `qp` is dropped before
/// `send_cq` and `recv_cq`. Presumably this ordering is intentional (QP torn down
/// before its CQs) -- confirm against the `Drop` impls of `CmQueuePair` / `AsyncCq`
/// before reordering fields.
pub struct AsyncQp {
/// The queue pair that work requests are posted to.
qp: CmQueuePair,
/// Completion queue awaited by the send-side operations (send, RDMA, atomics).
send_cq: AsyncCq,
/// Completion queue awaited by the receive-side operations.
recv_cq: AsyncCq,
}
impl AsyncQp {
pub fn new(qp: CmQueuePair, send_cq: AsyncCq, recv_cq: AsyncCq) -> Self {
Self {
qp,
send_cq,
recv_cq,
}
}
pub fn as_raw(&self) -> *mut ibv_qp {
self.qp.as_raw()
}
pub fn send_cq(&self) -> &AsyncCq {
&self.send_cq
}
pub fn recv_cq(&self) -> &AsyncCq {
&self.recv_cq
}
/// Posts a single send work request to the underlying queue pair.
///
/// The raw `ibv_send_wr` is produced by `SendWr::build_raw` and handed to
/// `rdma_wrap_ibv_post_send`; the integer return code is converted to a
/// `Result` by `from_ret`. The "bad WR" out-pointer is supplied to the call
/// but is not inspected afterwards.
pub(crate) fn post_send_wr(&self, wr: &mut SendWr) -> Result<()> {
    let mut raw_wr = wr.build_raw();
    let mut rejected: *mut ibv_send_wr = std::ptr::null_mut();
    // SAFETY: `raw_wr` and `rejected` are locals that stay alive for the whole call.
    // Assumes `self.qp.as_raw()` is a valid QP while `self.qp` is alive and that any
    // pointers embedded in `raw_wr` remain valid while `wr` is borrowed -- TODO confirm
    // against `CmQueuePair::as_raw` and `SendWr::build_raw`.
    let rc = unsafe { rdma_wrap_ibv_post_send(self.qp.as_raw(), &mut raw_wr, &mut rejected) };
    from_ret(rc)
}
/// Posts a single receive work request to the underlying queue pair.
///
/// The raw `ibv_recv_wr` is produced by `RecvWr::build_raw` and handed to
/// `rdma_wrap_ibv_post_recv`; the integer return code is converted to a
/// `Result` by `from_ret`. The "bad WR" out-pointer is supplied to the call
/// but is not inspected afterwards.
pub(crate) fn post_recv_wr(&self, wr: &mut RecvWr) -> Result<()> {
    let mut raw_wr = wr.build_raw();
    let mut rejected: *mut ibv_recv_wr = std::ptr::null_mut();
    // SAFETY: `raw_wr` and `rejected` are locals that stay alive for the whole call.
    // Assumes `self.qp.as_raw()` is a valid QP while `self.qp` is alive and that any
    // pointers embedded in `raw_wr` remain valid while `wr` is borrowed -- TODO confirm
    // against `CmQueuePair::as_raw` and `RecvWr::build_raw`.
    let rc = unsafe { rdma_wrap_ibv_post_recv(self.qp.as_raw(), &mut raw_wr, &mut rejected) };
    from_ret(rc)
}
/// Posts a signaled SEND of `length` bytes starting `offset` bytes into `mr`,
/// without waiting for its completion.
///
/// The caller is expected to reap the completion (tagged with `wr_id`) from the
/// send CQ, e.g. via [`AsyncQp::poll_send_cq`].
///
/// The base address is taken from the safe `mr.addr()` accessor, the same way the
/// RDMA read/write and atomic methods of this type do, instead of dereferencing
/// the raw `ibv_mr` pointer in an `unsafe` block.
///
/// No bounds check against the region is performed here, and `length` is narrowed
/// with `as u32`, so values above `u32::MAX` are silently truncated.
///
/// # Errors
///
/// Returns the error produced by `post_send_wr` if the post fails.
pub fn post_send_signaled(
    &self,
    mr: &OwnedMemoryRegion,
    offset: usize,
    length: usize,
    wr_id: u64,
) -> Result<()> {
    let sge = Sge::new(mr.addr() + offset as u64, length as u32, mr.lkey());
    let mut wr = SendWr::new(wr_id, WrOpcode::Send)
        .flags(SendFlags::SIGNALED)
        .sg(sge);
    self.post_send_wr(&mut wr)
}
/// Posts a receive covering the whole of `mr`, without waiting for its completion.
///
/// The scatter/gather entry starts at the region's base address and spans
/// `mr.as_slice().len()` bytes. The caller is expected to reap the completion
/// (tagged with `wr_id`) from the receive CQ, e.g. via [`AsyncQp::poll_recv_cq`].
///
/// The base address is taken from the safe `mr.addr()` accessor, the same way the
/// RDMA read/write and atomic methods of this type do, instead of dereferencing
/// the raw `ibv_mr` pointer in an `unsafe` block.
///
/// The slice length is narrowed with `as u32`, so a region larger than `u32::MAX`
/// bytes is silently truncated.
///
/// # Errors
///
/// Returns the error produced by `post_recv_wr` if the post fails.
pub fn post_recv_buffer(&self, mr: &OwnedMemoryRegion, wr_id: u64) -> Result<()> {
    let sge = Sge::new(mr.addr(), mr.as_slice().len() as u32, mr.lkey());
    let mut wr = RecvWr::new(wr_id).sg(sge);
    self.post_recv_wr(&mut wr)
}
/// Polls the send completion queue.
///
/// Thin forwarder to `AsyncCq::poll_completions` on the send CQ; `cx`, `state`
/// and `wc_buf` are passed through unchanged and the result is returned as-is.
#[inline]
pub fn poll_send_cq(
    &self,
    cx: &mut Context<'_>,
    state: &mut CqPollState,
    wc_buf: &mut [WorkCompletion],
) -> Poll<Result<usize>> {
    let cq = &self.send_cq;
    cq.poll_completions(cx, state, wc_buf)
}
/// Polls the receive completion queue.
///
/// Thin forwarder to `AsyncCq::poll_completions` on the receive CQ; `cx`, `state`
/// and `wc_buf` are passed through unchanged and the result is returned as-is.
#[inline]
pub fn poll_recv_cq(
    &self,
    cx: &mut Context<'_>,
    state: &mut CqPollState,
    wc_buf: &mut [WorkCompletion],
) -> Poll<Result<usize>> {
    let cq = &self.recv_cq;
    cq.poll_completions(cx, state, wc_buf)
}
/// Posts a signaled SEND of `length` bytes starting `offset` bytes into `mr` and
/// awaits the completion tagged with `wr_id` on the send CQ.
///
/// The base address is taken from the safe `mr.addr()` accessor, the same way the
/// RDMA read/write and atomic methods of this type do, instead of dereferencing
/// the raw `ibv_mr` pointer in an `unsafe` block.
///
/// No bounds check against the region is performed here, and `length` is narrowed
/// with `as u32`, so values above `u32::MAX` are silently truncated.
///
/// # Errors
///
/// Returns the error from `post_send_wr` if posting fails; otherwise returns
/// whatever `AsyncCq::poll_wr_id` yields for `wr_id`.
pub async fn send(
    &self,
    mr: &OwnedMemoryRegion,
    offset: usize,
    length: usize,
    wr_id: u64,
) -> Result<WorkCompletion> {
    let sge = Sge::new(mr.addr() + offset as u64, length as u32, mr.lkey());
    let mut wr = SendWr::new(wr_id, WrOpcode::Send)
        .flags(SendFlags::SIGNALED)
        .sg(sge);
    self.post_send_wr(&mut wr)?;
    self.send_cq.poll_wr_id(wr_id).await
}
/// Posts a receive into `length` bytes starting `offset` bytes into `mr` and
/// awaits the completion tagged with `wr_id` on the receive CQ.
///
/// The base address is taken from the safe `mr.addr()` accessor, the same way the
/// RDMA read/write and atomic methods of this type do, instead of dereferencing
/// the raw `ibv_mr` pointer in an `unsafe` block.
///
/// No bounds check against the region is performed here, and `length` is narrowed
/// with `as u32`, so values above `u32::MAX` are silently truncated.
///
/// # Errors
///
/// Returns the error from `post_recv_wr` if posting fails; otherwise returns
/// whatever `AsyncCq::poll_wr_id` yields for `wr_id`.
pub async fn recv(
    &self,
    mr: &OwnedMemoryRegion,
    offset: usize,
    length: usize,
    wr_id: u64,
) -> Result<WorkCompletion> {
    let sge = Sge::new(mr.addr() + offset as u64, length as u32, mr.lkey());
    let mut wr = RecvWr::new(wr_id).sg(sge);
    self.post_recv_wr(&mut wr)?;
    self.recv_cq.poll_wr_id(wr_id).await
}
/// Posts a signaled SEND-with-immediate of `length` bytes starting `offset` bytes
/// into `mr`, carrying `imm_data`, and awaits the completion tagged with `wr_id`
/// on the send CQ.
///
/// The base address is taken from the safe `mr.addr()` accessor, the same way the
/// RDMA read/write and atomic methods of this type do, instead of dereferencing
/// the raw `ibv_mr` pointer in an `unsafe` block.
///
/// No bounds check against the region is performed here, and `length` is narrowed
/// with `as u32`, so values above `u32::MAX` are silently truncated.
///
/// # Errors
///
/// Returns the error from `post_send_wr` if posting fails; otherwise returns
/// whatever `AsyncCq::poll_wr_id` yields for `wr_id`.
pub async fn send_with_imm(
    &self,
    mr: &OwnedMemoryRegion,
    offset: usize,
    length: usize,
    imm_data: u32,
    wr_id: u64,
) -> Result<WorkCompletion> {
    let sge = Sge::new(mr.addr() + offset as u64, length as u32, mr.lkey());
    let mut wr = SendWr::new(wr_id, WrOpcode::SendWithImm(imm_data))
        .flags(SendFlags::SIGNALED)
        .sg(sge);
    self.post_send_wr(&mut wr)?;
    self.send_cq.poll_wr_id(wr_id).await
}
/// Posts a signaled RDMA READ and awaits its completion on the send CQ.
///
/// The local scatter/gather entry covers `length` bytes starting `local_offset`
/// bytes into `mr`; the remote side is addressed as `remote.addr + remote_offset`
/// with `remote.rkey`. `length` is narrowed with `as u32` (values above
/// `u32::MAX` are truncated) and no bounds checks are performed here.
///
/// # Errors
///
/// Returns the error from `post_send_wr` if posting fails; otherwise returns
/// whatever `AsyncCq::poll_wr_id` yields for `wr_id`.
pub async fn read_remote(
    &self,
    mr: &OwnedMemoryRegion,
    local_offset: usize,
    length: usize,
    remote: &RemoteMr,
    remote_offset: u64,
    wr_id: u64,
) -> Result<WorkCompletion> {
    let local_addr = mr.addr() + local_offset as u64;
    let local_sge = Sge::new(local_addr, length as u32, mr.lkey());
    let remote_addr = remote.addr + remote_offset;
    let mut request = SendWr::new(wr_id, WrOpcode::RdmaRead)
        .flags(SendFlags::SIGNALED)
        .sg(local_sge)
        .rdma(remote_addr, remote.rkey);
    self.post_send_wr(&mut request)?;
    self.send_cq.poll_wr_id(wr_id).await
}
/// Posts a signaled RDMA WRITE and awaits its completion on the send CQ.
///
/// The local scatter/gather entry covers `length` bytes starting `local_offset`
/// bytes into `mr`; the remote side is addressed as `remote.addr + remote_offset`
/// with `remote.rkey`. `length` is narrowed with `as u32` (values above
/// `u32::MAX` are truncated) and no bounds checks are performed here.
///
/// # Errors
///
/// Returns the error from `post_send_wr` if posting fails; otherwise returns
/// whatever `AsyncCq::poll_wr_id` yields for `wr_id`.
pub async fn write_remote(
    &self,
    mr: &OwnedMemoryRegion,
    local_offset: usize,
    length: usize,
    remote: &RemoteMr,
    remote_offset: u64,
    wr_id: u64,
) -> Result<WorkCompletion> {
    let local_addr = mr.addr() + local_offset as u64;
    let local_sge = Sge::new(local_addr, length as u32, mr.lkey());
    let remote_addr = remote.addr + remote_offset;
    let mut request = SendWr::new(wr_id, WrOpcode::RdmaWrite)
        .flags(SendFlags::SIGNALED)
        .sg(local_sge)
        .rdma(remote_addr, remote.rkey);
    self.post_send_wr(&mut request)?;
    self.send_cq.poll_wr_id(wr_id).await
}
/// Posts a signaled RDMA WRITE-with-immediate carrying `imm_data` and awaits its
/// completion on the send CQ.
///
/// The local scatter/gather entry covers `length` bytes starting `local_offset`
/// bytes into `mr`; the remote side is addressed as `remote.addr + remote_offset`
/// with `remote.rkey`. `length` is narrowed with `as u32` (values above
/// `u32::MAX` are truncated) and no bounds checks are performed here.
///
/// # Errors
///
/// Returns the error from `post_send_wr` if posting fails; otherwise returns
/// whatever `AsyncCq::poll_wr_id` yields for `wr_id`.
// Local buffer, remote target, immediate and wr_id are all independent inputs,
// so the argument count exceeds clippy's default limit.
#[allow(clippy::too_many_arguments)]
pub async fn write_remote_with_imm(
    &self,
    mr: &OwnedMemoryRegion,
    local_offset: usize,
    length: usize,
    remote: &RemoteMr,
    remote_offset: u64,
    imm_data: u32,
    wr_id: u64,
) -> Result<WorkCompletion> {
    let local_addr = mr.addr() + local_offset as u64;
    let local_sge = Sge::new(local_addr, length as u32, mr.lkey());
    let remote_addr = remote.addr + remote_offset;
    let mut request = SendWr::new(wr_id, WrOpcode::RdmaWriteWithImm(imm_data))
        .flags(SendFlags::SIGNALED)
        .sg(local_sge)
        .rdma(remote_addr, remote.rkey);
    self.post_send_wr(&mut request)?;
    self.send_cq.poll_wr_id(wr_id).await
}
/// Posts a signaled atomic compare-and-swap against remote memory and awaits its
/// completion on the send CQ.
///
/// The local scatter/gather entry is 8 bytes long and starts `result_offset`
/// bytes into `result_mr`. The remote target is `remote.addr + remote_offset`
/// with `remote.rkey`; `compare` and `swap` are forwarded to `SendWr::atomic`
/// in that order. No alignment or bounds checks are performed here.
///
/// # Errors
///
/// Returns the error from `post_send_wr` if posting fails; otherwise returns
/// whatever `AsyncCq::poll_wr_id` yields for `wr_id`.
// Result buffer, remote target, both operands and wr_id are all independent
// inputs, so the argument count exceeds clippy's default limit.
#[allow(clippy::too_many_arguments)]
pub async fn compare_and_swap(
    &self,
    result_mr: &OwnedMemoryRegion,
    result_offset: usize,
    remote: &RemoteMr,
    remote_offset: u64,
    compare: u64,
    swap: u64,
    wr_id: u64,
) -> Result<WorkCompletion> {
    // Length of the local scatter/gather entry, in bytes.
    const RESULT_LEN: u32 = 8;

    let result_addr = result_mr.addr() + result_offset as u64;
    let result_sge = Sge::new(result_addr, RESULT_LEN, result_mr.lkey());
    let target_addr = remote.addr + remote_offset;
    let mut request = SendWr::new(wr_id, WrOpcode::AtomicCmpAndSwp)
        .flags(SendFlags::SIGNALED)
        .sg(result_sge)
        .atomic(target_addr, remote.rkey, compare, swap);
    self.post_send_wr(&mut request)?;
    self.send_cq.poll_wr_id(wr_id).await
}
/// Posts a signaled atomic fetch-and-add against remote memory and awaits its
/// completion on the send CQ.
///
/// The local scatter/gather entry is 8 bytes long and starts `result_offset`
/// bytes into `result_mr`. The remote target is `remote.addr + remote_offset`
/// with `remote.rkey`. `add_value` is passed as the third argument of
/// `SendWr::atomic` and `0` as the fourth. No alignment or bounds checks are
/// performed here.
///
/// # Errors
///
/// Returns the error from `post_send_wr` if posting fails; otherwise returns
/// whatever `AsyncCq::poll_wr_id` yields for `wr_id`.
pub async fn fetch_and_add(
    &self,
    result_mr: &OwnedMemoryRegion,
    result_offset: usize,
    remote: &RemoteMr,
    remote_offset: u64,
    add_value: u64,
    wr_id: u64,
) -> Result<WorkCompletion> {
    // Length of the local scatter/gather entry, in bytes.
    const RESULT_LEN: u32 = 8;

    let result_addr = result_mr.addr() + result_offset as u64;
    let result_sge = Sge::new(result_addr, RESULT_LEN, result_mr.lkey());
    let target_addr = remote.addr + remote_offset;
    let mut request = SendWr::new(wr_id, WrOpcode::AtomicFetchAndAdd)
        .flags(SendFlags::SIGNALED)
        .sg(result_sge)
        .atomic(target_addr, remote.rkey, add_value, 0);
    self.post_send_wr(&mut request)?;
    self.send_cq.poll_wr_id(wr_id).await
}
}