1use std::sync::Arc;
4
5use rdma_io_sys::ibverbs::*;
6use rdma_io_sys::wrapper::*;
7
8use crate::Result;
9use crate::comp_channel::CompletionChannel;
10use crate::device::Context;
11use crate::error::{from_ptr, from_ret};
12use crate::wc::WorkCompletion;
13
14pub struct CompletionQueue {
16 pub(crate) inner: *mut ibv_cq,
17 _ctx: Arc<Context>,
18}
19
20unsafe impl Send for CompletionQueue {}
22unsafe impl Sync for CompletionQueue {}
23
24impl Drop for CompletionQueue {
25 fn drop(&mut self) {
26 let ret = unsafe { ibv_destroy_cq(self.inner) };
27 if ret != 0 {
28 tracing::error!(
29 "ibv_destroy_cq failed: {}",
30 std::io::Error::from_raw_os_error(-ret)
31 );
32 }
33 }
34}
35
36impl CompletionQueue {
37 pub fn new(ctx: Arc<Context>, cqe: i32) -> Result<Arc<Self>> {
39 let cq = from_ptr(unsafe {
40 ibv_create_cq(
41 ctx.inner,
42 cqe,
43 std::ptr::null_mut(), std::ptr::null_mut(), 0, )
47 })?;
48 Ok(Arc::new(Self {
49 inner: cq,
50 _ctx: ctx,
51 }))
52 }
53
54 pub fn poll(&self, wc_buf: &mut [WorkCompletion]) -> Result<usize> {
58 let n = unsafe {
59 rdma_wrap_ibv_poll_cq(self.inner, wc_buf.len() as i32, wc_buf.as_mut_ptr().cast())
60 };
61 if n < 0 {
62 Err(crate::Error::Verbs(std::io::Error::from_raw_os_error(-n)))
63 } else {
64 Ok(n as usize)
65 }
66 }
67
68 pub fn req_notify(&self, solicited_only: bool) -> Result<()> {
70 from_ret(unsafe { rdma_wrap_ibv_req_notify_cq(self.inner, i32::from(solicited_only)) })
71 }
72
73 pub fn as_raw(&self) -> *mut ibv_cq {
75 self.inner
76 }
77
78 pub fn with_comp_channel(
83 ctx: Arc<Context>,
84 cqe: i32,
85 channel: &CompletionChannel,
86 ) -> Result<Arc<Self>> {
87 let cq = from_ptr(unsafe {
88 ibv_create_cq(
89 ctx.inner,
90 cqe,
91 std::ptr::null_mut(), channel.as_raw(), 0, )
95 })?;
96 Ok(Arc::new(Self {
97 inner: cq,
98 _ctx: ctx,
99 }))
100 }
101}
102
103impl Context {
104 pub fn create_cq(self: &Arc<Self>, cqe: i32) -> Result<Arc<CompletionQueue>> {
106 CompletionQueue::new(Arc::clone(self), cqe)
107 }
108}