use std::collections::VecDeque;
use std::net::SocketAddr;
use std::os::unix::io::RawFd;
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::io::unix::AsyncFd;
use crate::async_cm::{AsyncCmId, AsyncCmListener};
use crate::async_cq::{AsyncCq, CqPollState};
use crate::async_qp::AsyncQp;
use crate::cm::{CmId, ConnParam, EventChannel, PortSpace};
use crate::mr::{AccessFlags, OwnedMemoryRegion};
use crate::mw::MemoryWindow;
use crate::pd::ProtectionDomain;
use crate::qp::QpInitAttr;
use crate::transport::{RecvCompletion, Transport, TransportBuilder};
use crate::transport_common::*;
use crate::wc::{WcOpcode, WorkCompletion};
use crate::wr::{QpType, RecvWr, SendFlags, SendWr, Sge, WrOpcode};
/// Tunables for a [`CreditRingTransport`] connection.
#[derive(Debug, Clone)]
pub struct CreditRingConfig {
/// Size in bytes of each locally registered ring buffer (one for sending, one for receiving).
pub ring_capacity: usize,
/// Upper bound on the payload of a single send. `ring_capacity / max_message_size` is also
/// used as the number of outstanding messages (credits, doorbell buffers, CQ/QP depth).
pub max_message_size: usize,
/// Timeout handed to the token exchange that runs while connecting or accepting.
pub token_timeout: Duration,
/// Requested inline-data size for the QP; raised to at least `RING_TOKEN_SIZE` at QP creation.
pub max_inline_data: u32,
}
impl CreditRingConfig {
    /// Preset used as the default configuration: a 64 KiB ring, 1500-byte messages,
    /// a five second token timeout and no extra inline data.
    pub fn datagram() -> Self {
        let ring_capacity = 64 * 1024;
        let max_message_size = 1500;
        let token_timeout = Duration::from_secs(5);
        let max_inline_data = 0;
        Self {
            ring_capacity,
            max_message_size,
            token_timeout,
            max_inline_data,
        }
    }
}
impl Default for CreditRingConfig {
fn default() -> Self {
Self::datagram()
}
}
/// Reliable-connected transport that moves messages with RDMA Write-with-Immediate into a
/// remote ring buffer and throttles the sender with credits returned by the receiver.
///
/// NOTE(review): Rust drops fields in declaration order, so the ordering below (QP before
/// rings and doorbell buffers, then the PD, then the CM objects) looks deliberate — confirm
/// before reordering any field.
pub struct CreditRingTransport {
// Poll state passed to `poll_send_cq` / `poll_recv_cq` on every call.
send_cq_state: CqPollState,
recv_cq_state: CqPollState,
// Set once the local side has successfully called `cm_id.disconnect()`.
disconnected: bool,
// Set when a Disconnected CM event, a failed completion or a posting error is observed.
peer_disconnected: bool,
// `virtual_idx_map[i]` holds (ring offset, length, arrival sequence) of a received message
// that was handed to the caller as `buf_idx == i` and not yet released via `repost_recv`.
// `next_virt_idx` is where the search for a free slot starts.
virtual_idx_map: Box<[Option<(usize, usize, usize)>]>, next_virt_idx: usize,
// `ring_capacity / max_message_size`; sizes the slot map, the tracker, the doorbell
// buffers and the initial credit count.
max_outstanding: usize,
// Number of Write-with-Immediate arrivals (data and padding) seen so far; its value
// modulo `max_outstanding` selects the tracker slot of an arrival.
recv_arrival_seq: usize,
// Completions that were polled but did not fit into the caller's output slice.
recv_stash: VecDeque<RecvCompletion>,
// Messages this side may still post; decremented per posted write, replenished by
// credit updates from the peer.
remote_credits: usize,
// `remote_freed_received`: last cumulative freed count received from the peer.
// `local_freed_credits`: cumulative freed count this side advertises to the peer.
remote_freed_received: u32, local_freed_credits: usize,
// Send work requests posted and not yet seen on the send CQ.
send_in_flight: usize,
// Memory window bound over the receive ring; stored only to keep it alive.
_recv_mw: Option<MemoryWindow>,
qp: AsyncQp,
// Local staging ring for outgoing payloads and the ring the peer writes into.
send_ring: RingBuffer,
recv_ring: RingBuffer,
// Address, rkey and capacity of the peer's receive ring, obtained from the token exchange.
remote_addr: u64,
remote_rkey: u32,
remote_capacity: usize,
// Offset in the peer's ring where the next write will land.
remote_write_tail: usize,
// 4-byte receive buffers backing the posted receive work requests, reposted round-robin
// starting at `doorbell_repost_idx`.
doorbell_bufs: Box<[OwnedMemoryRegion]>,
doorbell_repost_idx: usize,
// Tracks released arrival slots; `release` reports how many became contiguously free.
recv_tracker: CompletionTracker,
config: CreditRingConfig,
// Protection domain; stored only to keep it alive.
_pd: Arc<ProtectionDomain>,
// Async readiness wrapper around the CM event channel's file descriptor.
cm_async_fd: AsyncFd<RawFd>,
cm_id: CmId,
event_channel: EventChannel,
}
impl CreditRingTransport {
pub async fn connect(addr: &SocketAddr, config: CreditRingConfig) -> crate::Result<Self> {
if crate::device::any_device_is_iwarp() {
return Err(crate::Error::InvalidArg(
"ring transport requires InfiniBand/RoCE (iWARP detected)".into(),
));
}
let async_cm = AsyncCmId::new(PortSpace::Tcp)?;
async_cm.resolve_addr(None, addr, 2000).await?;
async_cm.resolve_route(2000).await?;
let ctx = async_cm
.verbs_context()
.ok_or(crate::Error::InvalidArg("no verbs context".into()))?;
let pd = async_cm.alloc_pd()?;
let max_outstanding = config.ring_capacity / config.max_message_size;
let send_cq_depth = (max_outstanding + 2) as i32;
let recv_cq_depth = (max_outstanding + 2) as i32;
let send_cq = AsyncCq::create_tokio(ctx.clone(), send_cq_depth)?;
let recv_cq = AsyncCq::create_tokio(ctx, recv_cq_depth)?;
let qp_attr = QpInitAttr {
qp_type: QpType::Rc,
max_send_wr: send_cq_depth as u32,
max_recv_wr: recv_cq_depth as u32,
max_send_sge: 1,
max_recv_sge: 1,
max_inline_data: config.max_inline_data.max(RING_TOKEN_SIZE as u32),
sq_sig_all: true,
};
let cmqp =
async_cm.create_qp_with_cq(&pd, &qp_attr, Some(send_cq.cq()), Some(recv_cq.cq()))?;
let send_mr = pd.reg_mr_owned(vec![0u8; config.ring_capacity], AccessFlags::LOCAL_WRITE)?;
let recv_mr = pd.reg_mr_owned(
vec![0u8; config.ring_capacity],
AccessFlags::LOCAL_WRITE | AccessFlags::REMOTE_WRITE | AccessFlags::MW_BIND,
)?;
let doorbell_bufs: Box<[OwnedMemoryRegion]> = (0..max_outstanding)
.map(|_| pd.reg_mr_owned(vec![0u8; 4], AccessFlags::LOCAL_WRITE))
.collect::<crate::Result<Vec<_>>>()?
.into_boxed_slice();
let qp = AsyncQp::new(cmqp, send_cq, recv_cq);
let token_recv_mr = post_token_recv(&qp, &pd)?;
async_cm.connect(&ConnParam::default()).await?;
let (event_channel, cm_id) = async_cm.into_parts();
let cm_async_fd = AsyncFd::new(event_channel.fd()).map_err(crate::Error::Verbs)?;
let (recv_mw, mw_rkey) = bind_recv_mw(&qp, &pd, &recv_mr, config.ring_capacity)?;
let (remote_addr, remote_rkey, remote_capacity) = complete_token_exchange(
&qp,
&pd,
&recv_mr,
mw_rkey,
&token_recv_mr,
config.token_timeout,
config.ring_capacity,
)
.await?;
drain_send_cq(&qp)?;
for (i, mr) in doorbell_bufs.iter().enumerate() {
let sge = Sge::new(mr.addr(), 4, mr.lkey());
let mut wr = RecvWr::new(i as u64).sg(sge);
qp.post_recv_wr(&mut wr)?;
}
Ok(Self::from_parts(
qp,
cm_async_fd,
cm_id,
event_channel,
pd,
send_mr,
recv_mr,
recv_mw,
doorbell_bufs,
remote_addr,
remote_rkey,
remote_capacity,
max_outstanding,
config,
))
}
pub async fn accept(
listener: &AsyncCmListener,
config: CreditRingConfig,
) -> crate::Result<Self> {
if crate::device::any_device_is_iwarp() {
return Err(crate::Error::InvalidArg(
"ring transport requires InfiniBand/RoCE (iWARP detected)".into(),
));
}
let conn_id = listener.get_request().await?;
let ctx = conn_id
.verbs_context()
.ok_or(crate::Error::InvalidArg("no verbs context".into()))?;
let pd = conn_id.alloc_pd()?;
let max_outstanding = config.ring_capacity / config.max_message_size;
let send_cq_depth = (max_outstanding + 2) as i32;
let recv_cq_depth = (max_outstanding + 2) as i32;
let send_cq = AsyncCq::create_tokio(ctx.clone(), send_cq_depth)?;
let recv_cq = AsyncCq::create_tokio(ctx, recv_cq_depth)?;
let qp_attr = QpInitAttr {
qp_type: QpType::Rc,
max_send_wr: send_cq_depth as u32,
max_recv_wr: recv_cq_depth as u32,
max_send_sge: 1,
max_recv_sge: 1,
max_inline_data: config.max_inline_data.max(RING_TOKEN_SIZE as u32),
sq_sig_all: true,
};
let cmqp =
conn_id.create_qp_with_cq(&pd, &qp_attr, Some(send_cq.cq()), Some(recv_cq.cq()))?;
let send_mr = pd.reg_mr_owned(vec![0u8; config.ring_capacity], AccessFlags::LOCAL_WRITE)?;
let recv_mr = pd.reg_mr_owned(
vec![0u8; config.ring_capacity],
AccessFlags::LOCAL_WRITE | AccessFlags::REMOTE_WRITE | AccessFlags::MW_BIND,
)?;
let doorbell_bufs: Box<[OwnedMemoryRegion]> = (0..max_outstanding)
.map(|_| pd.reg_mr_owned(vec![0u8; 4], AccessFlags::LOCAL_WRITE))
.collect::<crate::Result<Vec<_>>>()?
.into_boxed_slice();
let qp = AsyncQp::new(cmqp, send_cq, recv_cq);
let token_recv_mr = post_token_recv(&qp, &pd)?;
let async_cm = listener
.complete_accept(conn_id, &ConnParam::default())
.await?;
let (event_channel, cm_id) = async_cm.into_parts();
let cm_async_fd = AsyncFd::new(event_channel.fd()).map_err(crate::Error::Verbs)?;
let (recv_mw, mw_rkey) = bind_recv_mw(&qp, &pd, &recv_mr, config.ring_capacity)?;
let (remote_addr, remote_rkey, remote_capacity) = complete_token_exchange(
&qp,
&pd,
&recv_mr,
mw_rkey,
&token_recv_mr,
config.token_timeout,
config.ring_capacity,
)
.await?;
drain_send_cq(&qp)?;
for (i, mr) in doorbell_bufs.iter().enumerate() {
let sge = Sge::new(mr.addr(), 4, mr.lkey());
let mut wr = RecvWr::new(i as u64).sg(sge);
qp.post_recv_wr(&mut wr)?;
}
Ok(Self::from_parts(
qp,
cm_async_fd,
cm_id,
event_channel,
pd,
send_mr,
recv_mr,
recv_mw,
doorbell_bufs,
remote_addr,
remote_rkey,
remote_capacity,
max_outstanding,
config,
))
}
/// Assembles the transport from the resources created during connect/accept.
///
/// All counters start at zero, the peer is granted `max_outstanding` credits up front, and
/// both rings are wrapped with the configured `ring_capacity`.
#[allow(clippy::too_many_arguments)]
fn from_parts(
    qp: AsyncQp,
    cm_async_fd: AsyncFd<RawFd>,
    cm_id: CmId,
    event_channel: EventChannel,
    pd: Arc<ProtectionDomain>,
    send_mr: OwnedMemoryRegion,
    recv_mr: OwnedMemoryRegion,
    recv_mw: MemoryWindow,
    doorbell_bufs: Box<[OwnedMemoryRegion]>,
    remote_addr: u64,
    remote_rkey: u32,
    remote_capacity: usize,
    max_outstanding: usize,
    config: CreditRingConfig,
) -> Self {
    // Read the capacity before `config` is moved into the struct.
    let capacity = config.ring_capacity;
    let virtual_idx_map: Box<[Option<(usize, usize, usize)>]> =
        vec![None; max_outstanding].into_boxed_slice();
    let send_ring = RingBuffer::new(send_mr, capacity);
    let recv_ring = RingBuffer::new(recv_mr, capacity);
    let recv_tracker = CompletionTracker::new(max_outstanding);
    Self {
        send_cq_state: CqPollState::default(),
        recv_cq_state: CqPollState::default(),
        disconnected: false,
        peer_disconnected: false,
        virtual_idx_map,
        next_virt_idx: 0,
        max_outstanding,
        recv_arrival_seq: 0,
        recv_stash: VecDeque::new(),
        remote_credits: max_outstanding,
        remote_freed_received: 0,
        local_freed_credits: 0,
        send_in_flight: 0,
        _recv_mw: Some(recv_mw),
        qp,
        send_ring,
        recv_ring,
        remote_addr,
        remote_rkey,
        remote_capacity,
        remote_write_tail: 0,
        doorbell_bufs,
        doorbell_repost_idx: 0,
        recv_tracker,
        config,
        _pd: pd,
        cm_async_fd,
        cm_id,
        event_channel,
    }
}
fn check_cm_event(&mut self) -> bool {
match self.event_channel.try_get_event() {
Ok(ev) => {
let etype = ev.event_type();
ev.ack();
if etype == crate::cm::CmEventType::Disconnected {
self.peer_disconnected = true;
}
self.peer_disconnected
}
Err(crate::Error::WouldBlock) => false,
Err(_) => {
self.peer_disconnected = true;
true
}
}
}
/// Posts a receive for the next doorbell buffer in round-robin order.
///
/// The round-robin cursor advances before the post is attempted, so it moves on even when
/// posting fails. The work-request id is the index of the buffer being posted.
fn repost_doorbell(&mut self) -> crate::Result<()> {
    let slot = self.doorbell_repost_idx;
    let next_slot = (slot + 1) % self.doorbell_bufs.len();
    self.doorbell_repost_idx = next_slot;
    let buf = &self.doorbell_bufs[slot];
    let mut recv_wr = RecvWr::new(slot as u64).sg(Sge::new(buf.addr(), 4, buf.lkey()));
    self.qp.post_recv_wr(&mut recv_wr)
}
/// Polls the receive CQ once (up to eight completions) without waiting, so that credit
/// updates from the peer can be picked up from the send path.
///
/// Data arrivals found along the way are recorded in the slot map and pushed onto
/// `recv_stash`; padding arrivals are released immediately. Any failure — CQ error, failed
/// completion, posting error, an immediate value that points outside the receive ring, or an
/// exhausted slot map — sets `peer_disconnected` and stops processing. The last two cases
/// are treated the same way `poll_recv` treats them, instead of underflowing, deferring a
/// slice panic to `recv_buf`, or silently dropping the message.
fn drain_recv_credits(&mut self) {
    let mut wc_buf = [WorkCompletion::default(); 8];
    if self.qp.recv_cq().cq().req_notify(false).is_err() {
        self.peer_disconnected = true;
        return;
    }
    let n = match self.qp.recv_cq().cq().poll(&mut wc_buf) {
        Ok(n) => n,
        Err(_) => {
            self.peer_disconnected = true;
            return;
        }
    };
    for wc in &wc_buf[..n] {
        if !wc.is_success() {
            self.peer_disconnected = true;
            return;
        }
        match wc.opcode() {
            WcOpcode::RecvRdmaWithImm => {
                // Immediate layout: ring offset in the high 16 bits, length in the low 16.
                let imm = wc.imm_data();
                let offset = (imm >> 16) as usize;
                let length = (imm & 0xFFFF) as usize;
                if offset >= self.recv_ring.capacity
                    || (length > 0 && offset + length > self.recv_ring.capacity)
                {
                    self.peer_disconnected = true;
                    return;
                }
                if length == 0 {
                    // Zero length marks a padding write: nothing to deliver, so the
                    // doorbell and the arrival slot are released right away.
                    if self.repost_doorbell().is_err() {
                        self.peer_disconnected = true;
                        return;
                    }
                    let seq = self.recv_arrival_seq;
                    self.recv_arrival_seq += 1;
                    let slot = seq % self.max_outstanding;
                    let contiguous = self.recv_tracker.release(slot);
                    if contiguous > 0 {
                        // Advertise the new cumulative freed count to the peer.
                        self.local_freed_credits += contiguous;
                        let credit_imm = self.local_freed_credits as u32;
                        let mut credit_wr = SendWr::new(
                            WR_ID_CREDIT_FLAG | (self.local_freed_credits as u64),
                            WrOpcode::SendWithImm(credit_imm),
                        )
                        .flags(SendFlags::SIGNALED);
                        if self.qp.post_send_wr(&mut credit_wr).is_err() {
                            self.peer_disconnected = true;
                            return;
                        }
                        self.send_in_flight += 1;
                    }
                } else {
                    // Find the first free slot, scanning circularly from the cursor.
                    let start = self.next_virt_idx;
                    let free_slot = (0..self.max_outstanding)
                        .map(|step| (start + step) % self.max_outstanding)
                        .find(|&idx| self.virtual_idx_map[idx].is_none());
                    let Some(virt_idx) = free_slot else {
                        self.peer_disconnected = true;
                        return;
                    };
                    let seq = self.recv_arrival_seq;
                    self.recv_arrival_seq += 1;
                    self.virtual_idx_map[virt_idx] = Some((offset, length, seq));
                    self.next_virt_idx = (virt_idx + 1) % self.max_outstanding;
                    self.recv_stash.push_back(RecvCompletion {
                        buf_idx: virt_idx,
                        byte_len: length,
                    });
                }
            }
            WcOpcode::Recv => {
                if wc.wc_flags() & rdma_io_sys::ibverbs::IBV_WC_WITH_IMM != 0 {
                    // The immediate carries the peer's cumulative freed count; the
                    // wrapping difference to the last seen value is the new credit.
                    let freed_count = wc.imm_data();
                    let last = self.remote_freed_received;
                    let delta = freed_count.wrapping_sub(last) as usize;
                    if delta > 0 && delta <= self.max_outstanding {
                        self.remote_credits += delta;
                        self.remote_freed_received = freed_count;
                        tracing::debug!(
                            freed_count,
                            delta,
                            remote_credits = self.remote_credits,
                            "drain_recv_credits: credit update"
                        );
                    }
                }
                if self.repost_doorbell().is_err() {
                    self.peer_disconnected = true;
                    return;
                }
            }
            _ => {}
        }
    }
}
}
impl Transport for CreditRingTransport {
/// Copies a prefix of `data` into the send ring and posts it as one RDMA
/// Write-with-Immediate into the peer's ring.
///
/// Returns the number of bytes accepted. `Ok(0)` means nothing was sent: `data` is empty,
/// no credit is available, or the send ring has no room. When the local reservation needs
/// wrap-around padding, a zero-length padding write is posted first; that requires two
/// credits, otherwise the reservation is undone and `Ok(0)` is returned.
///
/// Fails with a flush-error work completion once the peer is known to be disconnected, and
/// propagates errors from posting work requests.
fn send_copy(&mut self, data: &[u8]) -> crate::Result<usize> {
    if self.peer_disconnected {
        return Err(crate::Error::WorkCompletion {
            status: rdma_io_sys::ibverbs::IBV_WC_WR_FLUSH_ERR,
            vendor_err: 0,
        });
    }
    if data.is_empty() {
        return Ok(0);
    }
    // Out of credits: look for pending credit updates once before giving up.
    if self.remote_credits == 0 {
        self.drain_recv_credits();
    }
    if self.remote_credits == 0 {
        return Ok(0);
    }
    // The length must also fit the 16-bit field of the immediate value.
    let msg_len = 0xFFFF_usize
        .min(self.remote_capacity)
        .min(self.config.max_message_size)
        .min(data.len());
    let Some((local_offset, padding)) = self.send_ring.reserve(msg_len) else {
        return Ok(0);
    };
    if padding > 0 {
        if self.remote_credits < 2 {
            // Padding plus payload needs two credits; put the ring tail back where it
            // was before the reservation.
            let restored_tail = match local_offset {
                0 => self.send_ring.capacity - padding,
                _ => local_offset,
            };
            self.send_ring.tail = restored_tail;
            return Ok(0);
        }
        let pad_offset = self.remote_write_tail;
        // Length bits left at zero mark this write as padding.
        let pad_imm = (pad_offset as u32) << 16;
        let pad_target = self.remote_addr + pad_offset as u64;
        let mut pad_wr =
            SendWr::new(WR_ID_PADDING_SENTINEL, WrOpcode::RdmaWriteWithImm(pad_imm))
                .flags(SendFlags::SIGNALED)
                .rdma(pad_target, self.remote_rkey);
        self.qp.post_send_wr(&mut pad_wr)?;
        self.remote_credits -= 1;
        self.send_in_flight += 1;
        self.remote_write_tail = 0;
    }
    let staging = &mut self.send_ring.mr.as_mut_slice()[local_offset..local_offset + msg_len];
    staging.copy_from_slice(&data[..msg_len]);
    let remote_offset = self.remote_write_tail;
    let imm = ((remote_offset as u32) << 16) | (msg_len as u32);
    let sge = Sge::new(
        self.send_ring.mr.addr() + local_offset as u64,
        msg_len as u32,
        self.send_ring.mr.lkey(),
    );
    // The work-request id carries the number of ring bytes to release on completion.
    let wr_id = (padding + msg_len) as u64;
    let target = self.remote_addr + remote_offset as u64;
    let mut write_wr = SendWr::new(wr_id, WrOpcode::RdmaWriteWithImm(imm))
        .flags(SendFlags::SIGNALED)
        .sg(sge)
        .rdma(target, self.remote_rkey);
    self.qp.post_send_wr(&mut write_wr)?;
    self.remote_write_tail = (remote_offset + msg_len) % self.remote_capacity;
    self.remote_credits -= 1;
    self.send_in_flight += 1;
    tracing::debug!(
        data_len = msg_len,
        remote_offset,
        remote_credits = self.remote_credits,
        send_in_flight = self.send_in_flight,
        "send_copy: posted Write+Imm"
    );
    Ok(msg_len)
}
/// Polls the send CQ (up to eight completions) and retires finished send work requests.
///
/// Credit and padding completions only decrement the in-flight counter. For a payload
/// write the work-request id is the number of send-ring bytes to release. The first failed
/// completion marks the peer disconnected and is returned as an error.
fn poll_send_completion(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
    let mut completions = [WorkCompletion::default(); 8];
    let polled = self
        .qp
        .poll_send_cq(cx, &mut self.send_cq_state, &mut completions);
    let count = match polled {
        Poll::Ready(Ok(count)) => count,
        Poll::Ready(Err(err)) => return Poll::Ready(Err(err)),
        Poll::Pending => return Poll::Pending,
    };
    for wc in &completions[..count] {
        if !wc.is_success() {
            self.peer_disconnected = true;
            return Poll::Ready(Err(crate::Error::WorkCompletion {
                status: wc.status_raw(),
                vendor_err: wc.vendor_err(),
            }));
        }
        let wr_id = wc.wr_id();
        let is_control =
            (wr_id & WR_ID_CREDIT_FLAG) != 0 || wr_id == WR_ID_PADDING_SENTINEL;
        if !is_control {
            let released = wr_id as usize;
            // Ignore ids that cannot be a valid release length.
            if (1..=self.send_ring.capacity).contains(&released) {
                self.send_ring.release(released);
            }
        }
        self.send_in_flight = self.send_in_flight.saturating_sub(1);
    }
    Poll::Ready(Ok(()))
}
/// Polls for received messages and writes their completions into `out`.
///
/// Stashed completions from earlier calls are delivered first; then the receive CQ is polled
/// in batches of up to eight. Returns `Ready(Ok(n))` with `n` entries of `out` filled.
/// `Ready(Ok(0))` is possible when `out` is empty or when a batch contained only credit
/// updates. Returns `Pending` only when nothing was filled and the CQ has nothing ready.
///
/// A failed completion, an immediate value pointing outside the receive ring, an exhausted
/// slot map, or a failure to post a doorbell/credit marks the peer disconnected and returns
/// an error.
fn poll_recv(
&mut self,
cx: &mut Context<'_>,
out: &mut [RecvCompletion],
) -> Poll<crate::Result<usize>> {
let mut filled = 0;
// Deliver completions stashed by an earlier call before touching the CQ.
while filled < out.len() {
if let Some(rc) = self.recv_stash.pop_front() {
out[filled] = rc;
filled += 1;
} else {
break;
}
}
if filled >= out.len() {
return Poll::Ready(Ok(filled));
}
loop {
let mut wc_buf = [WorkCompletion::default(); 8];
let n = match self
.qp
.poll_recv_cq(cx, &mut self.recv_cq_state, &mut wc_buf)
{
Poll::Pending => {
// Stashed entries already copied into `out` are returned instead of waiting.
if filled > 0 {
return Poll::Ready(Ok(filled));
}
return Poll::Pending;
}
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
Poll::Ready(Ok(n)) => n,
};
let mut got_data = false;
let mut got_credits = false;
for wc in &wc_buf[..n] {
if !wc.is_success() {
self.peer_disconnected = true;
return Poll::Ready(Err(crate::Error::WorkCompletion {
status: wc.status_raw(),
vendor_err: wc.vendor_err(),
}));
}
match wc.opcode() {
WcOpcode::RecvRdmaWithImm => {
// Immediate layout: ring offset in the high 16 bits, length in the low 16 bits.
let imm = wc.imm_data();
let offset = (imm >> 16) as usize;
let length = (imm & 0xFFFF) as usize;
if offset >= self.recv_ring.capacity
|| (length > 0 && offset + length > self.recv_ring.capacity)
{
self.peer_disconnected = true;
return Poll::Ready(Err(crate::Error::InvalidArg(
"recv ring offset/length out of bounds".into(),
)));
}
// Zero length marks a padding write: nothing is delivered to the caller, so the
// doorbell and the arrival slot are released immediately.
if length == 0 {
if self.repost_doorbell().is_err() {
self.peer_disconnected = true;
return Poll::Ready(Err(crate::Error::InvalidArg(
"repost_doorbell failed".into(),
)));
}
let seq = self.recv_arrival_seq;
self.recv_arrival_seq += 1;
let slot = seq % self.max_outstanding;
let contiguous = self.recv_tracker.release(slot);
if contiguous > 0 {
// Advertise the new cumulative freed count to the peer.
self.local_freed_credits += contiguous;
let credit_imm = self.local_freed_credits as u32;
let mut credit_wr = SendWr::new(
WR_ID_CREDIT_FLAG | (self.local_freed_credits as u64),
WrOpcode::SendWithImm(credit_imm),
)
.flags(SendFlags::SIGNALED);
if self.qp.post_send_wr(&mut credit_wr).is_err() {
self.peer_disconnected = true;
return Poll::Ready(Err(crate::Error::InvalidArg(
"credit post_send failed".into(),
)));
}
self.send_in_flight += 1;
}
// Move on to the next completion in this batch.
continue;
}
// Find the first free slot, scanning circularly from `next_virt_idx`.
let mut virt_idx = self.next_virt_idx;
let mut found = false;
for _ in 0..self.max_outstanding {
if self.virtual_idx_map[virt_idx].is_none() {
found = true;
break;
}
virt_idx = (virt_idx + 1) % self.max_outstanding;
}
if !found {
self.peer_disconnected = true;
return Poll::Ready(Err(crate::Error::InvalidArg(
"recv virtual index map exhausted".into(),
)));
}
let seq = self.recv_arrival_seq;
self.recv_arrival_seq += 1;
self.virtual_idx_map[virt_idx] = Some((offset, length, seq));
self.next_virt_idx = (virt_idx + 1) % self.max_outstanding;
let rc = RecvCompletion {
buf_idx: virt_idx,
byte_len: length,
};
// Overflow beyond the caller's slice is kept for the next call.
if filled < out.len() {
out[filled] = rc;
filled += 1;
} else {
self.recv_stash.push_back(rc);
}
got_data = true;
}
WcOpcode::Recv => {
if wc.wc_flags() & rdma_io_sys::ibverbs::IBV_WC_WITH_IMM != 0 {
// The immediate carries the peer's cumulative freed count; the wrapping difference
// to the last seen value is the number of newly granted credits.
let freed_count = wc.imm_data();
let last = self.remote_freed_received;
let delta = freed_count.wrapping_sub(last) as usize;
if delta > 0 && delta <= self.max_outstanding {
self.remote_credits += delta;
self.remote_freed_received = freed_count;
tracing::debug!(
freed_count,
delta,
remote_credits = self.remote_credits,
"credit update received"
);
}
got_credits = true;
}
if self.repost_doorbell().is_err() {
self.peer_disconnected = true;
return Poll::Ready(Err(crate::Error::InvalidArg(
"repost_doorbell failed".into(),
)));
}
}
_ => {
}
}
}
if filled > 0 || got_data {
tracing::debug!(filled, "poll_recv: returning data completions");
return Poll::Ready(Ok(filled));
}
// A credits-only batch still returns, with zero completions, rather than polling again.
if got_credits {
tracing::debug!(
remote_credits = self.remote_credits,
"poll_recv: processed credits only, returning Ok(0)"
);
return Poll::Ready(Ok(0));
}
}
}
/// Returns the bytes of the received message identified by `buf_idx`.
///
/// # Panics
///
/// Panics when `buf_idx` is out of range or does not refer to a delivered, unreleased
/// message.
fn recv_buf(&self, buf_idx: usize) -> &[u8] {
    let entry = self.virtual_idx_map[buf_idx];
    let (start, len, _) = entry.expect("recv_buf called with invalid buf_idx");
    let end = start + len;
    &self.recv_ring.mr.as_slice()[start..end]
}
/// Releases the message identified by `buf_idx` and, when that frees a contiguous run of
/// arrival slots, sends the updated cumulative credit count to the peer.
///
/// The slot is cleared before the doorbell is reposted, so it stays cleared even when the
/// repost fails.
///
/// # Panics
///
/// Panics when `buf_idx` is out of range or does not refer to a delivered, unreleased
/// message.
fn repost_recv(&mut self, buf_idx: usize) -> crate::Result<()> {
    let released = self.virtual_idx_map[buf_idx].take();
    let (_, _, arrival_seq) = released.expect("repost_recv called with invalid buf_idx");
    self.repost_doorbell()?;
    let freed = self
        .recv_tracker
        .release(arrival_seq % self.max_outstanding);
    if freed == 0 {
        tracing::debug!(
            arrival_seq,
            "repost_recv: non-contiguous release, credit deferred"
        );
        return Ok(());
    }
    self.local_freed_credits += freed;
    let total_freed = self.local_freed_credits;
    let mut credit_wr = SendWr::new(
        WR_ID_CREDIT_FLAG | (total_freed as u64),
        WrOpcode::SendWithImm(total_freed as u32),
    )
    .flags(SendFlags::SIGNALED);
    self.qp.post_send_wr(&mut credit_wr)?;
    self.send_in_flight += 1;
    tracing::debug!(
        contiguous = freed,
        local_freed_credits = self.local_freed_credits,
        send_in_flight = self.send_in_flight,
        "repost_recv: sent credit update (contiguous advance)"
    );
    Ok(())
}
/// Reports whether the peer has disconnected, registering `cx` for wake-up otherwise.
///
/// Returns `true` immediately when a disconnect was already recorded. Otherwise the CM
/// descriptor's readiness is polled: each time it is ready the readiness is cleared and the
/// CM events are checked; a readiness error is treated as a disconnect.
fn poll_disconnect(&mut self, cx: &mut Context<'_>) -> bool {
    if self.peer_disconnected {
        return true;
    }
    loop {
        let mut ready = match self.cm_async_fd.poll_read_ready(cx) {
            Poll::Pending => return false,
            Poll::Ready(Err(_)) => {
                self.peer_disconnected = true;
                return true;
            }
            Poll::Ready(Ok(guard)) => guard,
        };
        ready.clear_ready();
        if self.check_cm_event() {
            return true;
        }
    }
}
/// Disconnects the CM id once; later calls are no-ops.
///
/// The `disconnected` flag is set only after the CM call succeeds, so a failed attempt can
/// be retried.
fn disconnect(&mut self) -> crate::Result<()> {
    if self.disconnected {
        return Ok(());
    }
    self.cm_id.disconnect()?;
    self.disconnected = true;
    Ok(())
}
/// Local socket address of this connection, as reported by the CM id.
fn local_addr(&self) -> Option<SocketAddr> {
self.cm_id.local_addr()
}
/// Socket address of the remote peer, as reported by the CM id.
fn peer_addr(&self) -> Option<SocketAddr> {
self.cm_id.peer_addr()
}
}
/// On drop, disconnect (best effort) if that has not happened yet, then empty both
/// completion queues.
impl Drop for CreditRingTransport {
    fn drop(&mut self) {
        if !self.disconnected {
            // Best effort: a failure here cannot be reported from `drop`.
            let _ = self.cm_id.disconnect();
        }
        let mut scratch = [WorkCompletion::default(); 16];
        // Keep polling each CQ until it is empty or reports an error.
        while matches!(self.qp.send_cq().cq().poll(&mut scratch), Ok(n) if n != 0) {}
        while matches!(self.qp.recv_cq().cq().poll(&mut scratch), Ok(n) if n != 0) {}
    }
}
/// Lets a configuration value act as a factory for [`CreditRingTransport`] connections;
/// each call works on its own clone of the configuration.
impl TransportBuilder for CreditRingConfig {
    type Transport = CreditRingTransport;

    /// Connects to `addr` using a copy of this configuration.
    async fn connect(&self, addr: &SocketAddr) -> crate::Result<CreditRingTransport> {
        let config = self.clone();
        CreditRingTransport::connect(addr, config).await
    }

    /// Accepts the next request on `listener` using a copy of this configuration.
    async fn accept(&self, listener: &AsyncCmListener) -> crate::Result<CreditRingTransport> {
        let config = self.clone();
        CreditRingTransport::accept(listener, config).await
    }
}