use crate::ibverbs::device::Context;
use crate::ibverbs::error::{IbvError, IbvResult};
use crate::ibverbs::work::WorkCompletion;
use ibverbs_sys::*;
use std::sync::Arc;
use std::{io, ptr};
/// Shared handle to a verbs completion queue (`ibv_cq`).
///
/// The handle only holds an [`Arc`], so cloning it is cheap and every clone refers to the
/// same underlying queue state.
#[doc(alias = "ibv_cq")]
#[doc(alias = "ibv_create_cq")]
#[derive(Clone, Debug)]
pub struct CompletionQueue {
    /// Reference-counted state shared by all clones of this handle.
    pub(super) inner: Arc<CompletionQueueInner>,
}
impl CompletionQueue {
    /// Creates a completion queue on `context`, requesting room for `min_capacity` entries.
    ///
    /// The queue is created without a completion channel, without a user `cq_context`, and
    /// with completion vector `0`.
    ///
    /// # Errors
    ///
    /// * [`IbvError::InvalidInput`] if `min_capacity` does not fit in an `i32`.
    /// * An errno-derived [`IbvError`] if `ibv_create_cq` returns a null pointer.
    pub fn create(context: &Context, min_capacity: u32) -> IbvResult<CompletionQueue> {
        // The C API takes the entry count as a signed int, so reject values it cannot express.
        let min_cq_entries = i32::try_from(min_capacity).map_err(|_| {
            IbvError::InvalidInput("Completion queue min_cq_entries must fit in an i32".to_string())
        })?;

        // SAFETY: assumes `context.inner.ctx` is a live `ibv_context` pointer for as long as
        // `context` is borrowed (TODO confirm against `Context`). The two null arguments are
        // the optional user context and completion channel.
        let cq = unsafe {
            ibv_create_cq(
                context.inner.ctx,
                min_cq_entries,
                ptr::null_mut(),
                ptr::null_mut(),
                0,
            )
        };

        if cq.is_null() {
            // Capture errno first, before building the message can disturb it.
            let errno = io::Error::last_os_error().raw_os_error().unwrap_or(0);
            return Err(IbvError::from_errno_with_msg(
                errno,
                format!("Failed to create completion queue with size {min_cq_entries}"),
            ));
        }

        log::debug!("CompletionQueue created with capacity {}", min_capacity);

        let inner = CompletionQueueInner {
            context: context.clone(),
            cq,
            min_capacity,
        };
        Ok(Self {
            inner: Arc::new(inner),
        })
    }

    /// Polls the queue for up to `completions.len()` work completions.
    ///
    /// The driver writes its results into the front of `completions`; the returned
    /// [`PolledCompletions`] borrows exactly the slots that were filled. If the buffer is
    /// longer than `i32::MAX`, a warning is logged and at most `i32::MAX` entries are
    /// requested.
    ///
    /// # Errors
    ///
    /// Returns an [`IbvError`] built from the absolute value of the driver's return code when
    /// that code is negative.
    ///
    /// # Panics
    ///
    /// Panics if the device context has no `poll_cq` function pointer installed.
    pub fn poll<'poll_buff>(
        &self,
        completions: &'poll_buff mut [PollSlot],
    ) -> IbvResult<PolledCompletions<'poll_buff>> {
        let capacity = completions.len();
        // The driver entry point takes the entry count as an `i32`; clamp oversized buffers.
        let ne = match i32::try_from(capacity) {
            Ok(ne) => ne,
            Err(_) => {
                log::warn!(
                    "poll buffer length {} exceeds i32::MAX; only {} entries will be polled",
                    capacity,
                    i32::MAX
                );
                i32::MAX
            }
        };

        let cq = self.inner.cq;
        // SAFETY: assumes `cq` is the live queue created in `create`, whose `context` field
        // points at the device context it was created on.
        let poll_cq = unsafe {
            let ctx: *mut ibv_context = (*cq).context;
            (*ctx)
                .ops
                .poll_cq
                .expect("poll_cq function pointer should be set by driver")
        };

        // SAFETY: `PollSlot` is `#[repr(transparent)]` over `ibv_wc`, so the buffer can be
        // handed over as an `ibv_wc` array, and `ne` never exceeds the buffer length.
        let num_polled = unsafe { poll_cq(cq, ne, completions.as_mut_ptr().cast::<ibv_wc>()) };

        // A negative return code signals failure; anything else is the number of filled slots.
        match usize::try_from(num_polled) {
            Ok(filled) => Ok(PolledCompletions {
                wcs: &mut completions[..filled],
            }),
            Err(_) => Err(IbvError::from_errno_with_msg(
                num_polled.abs(),
                "Failed to poll completion queue",
            )),
        }
    }

    /// Returns the `min_capacity` value that was passed to [`CompletionQueue::create`].
    pub fn min_capacity(&self) -> u32 {
        self.inner.min_capacity
    }

    /// Returns the device context this queue was created on.
    pub fn context(&self) -> &Context {
        &self.inner.context
    }
}
/// One entry of a caller-provided poll buffer.
///
/// `#[repr(transparent)]` gives the slot the same layout as the raw `ibv_wc` it wraps, which
/// is what allows a `[PollSlot]` buffer to be passed to the driver as an `ibv_wc` array.
#[derive(Clone, Copy, Debug, Default)]
#[repr(transparent)]
pub struct PollSlot {
    /// Raw work-completion record stored in this slot.
    wc: ibv_wc,
}
/// Completions produced by a single call to [`CompletionQueue::poll`].
///
/// Borrows the leading part of the caller's poll buffer that the poll call filled in.
/// Consume it with `into_iter()` to obtain `WorkCompletion` values.
pub struct PolledCompletions<'a> {
/// The slots of the poll buffer that hold polled completions.
wcs: &'a mut [PollSlot],
}
impl PolledCompletions<'_> {
pub fn len(&self) -> usize {
self.wcs.len()
}
pub fn is_empty(&self) -> bool {
self.wcs.is_empty()
}
}
impl<'a> IntoIterator for PolledCompletions<'a> {
    type Item = WorkCompletion;
    type IntoIter = std::iter::Map<std::slice::Iter<'a, PollSlot>, fn(&PollSlot) -> WorkCompletion>;

    /// Consumes the batch and yields one [`WorkCompletion`] per polled slot, in buffer order.
    fn into_iter(self) -> Self::IntoIter {
        // Name the conversion as a plain fn pointer so the iterator type stays nameable.
        let to_completion: fn(&PollSlot) -> WorkCompletion =
            |slot| WorkCompletion::new(slot.wc);
        let Self { wcs } = self;
        wcs.iter().map(to_completion)
    }
}
/// State behind a [`CompletionQueue`] handle: the raw queue pointer plus what is needed to
/// keep it usable.
///
/// The raw queue is destroyed by this type's `Drop` implementation.
pub(super) struct CompletionQueueInner {
/// Device context the queue was created on; a clone is stored alongside the queue.
pub(super) context: Context,
/// Raw `ibv_cq` pointer returned by `ibv_create_cq`.
pub(super) cq: *mut ibv_cq,
/// The `min_capacity` value requested when the queue was created.
pub(super) min_capacity: u32,
}
// The raw `*mut ibv_cq` field makes this type `!Send`/`!Sync` by default, so both traits are
// asserted manually.
// NOTE(review): soundness relies on the verbs library permitting a completion queue to be
// used and destroyed from threads other than the one that created it — confirm.
unsafe impl Send for CompletionQueueInner {}
unsafe impl Sync for CompletionQueueInner {}
impl Drop for CompletionQueueInner {
    /// Destroys the underlying `ibv_cq`.
    ///
    /// `drop` cannot return an error, so the outcome is reported through the `log` crate:
    /// a debug message on success, an error message carrying the errno on failure.
    fn drop(&mut self) {
        // SAFETY: assumes `self.cq` is the non-null pointer obtained from `ibv_create_cq`
        // when this value was built; `drop` runs at most once, so it is not destroyed twice.
        let errno = unsafe { ibv_destroy_cq(self.cq) };
        if errno == 0 {
            // Only claim the queue is gone once the destroy call has actually succeeded.
            log::debug!("CompletionQueue destroyed");
        } else {
            let error = IbvError::from_errno_with_msg(errno, "Failed to destroy completion queue");
            log::error!("{error}");
        }
    }
}
impl std::fmt::Debug for CompletionQueueInner {
    /// Formats the queue as `CompletionQueueInner { handle, capacity, context }`, where
    /// `handle` and `capacity` are the `handle` and `cqe` fields of the raw `ibv_cq`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Read just the two scalar fields in place rather than copying the whole `ibv_cq`
        // struct out from behind the pointer for each of them.
        // SAFETY: assumes `self.cq` points at the live queue obtained from `ibv_create_cq`,
        // which stays valid until this value is dropped.
        let (handle, cqe) = unsafe { ((*self.cq).handle, (*self.cq).cqe) };
        f.debug_struct("CompletionQueueInner")
            .field("handle", &handle)
            .field("capacity", &cqe)
            .field("context", &self.context)
            .finish()
    }
}