Skip to main content

rdma_io/
cq.rs

1//! Completion Queue.
2
3use std::sync::Arc;
4
5use rdma_io_sys::ibverbs::*;
6use rdma_io_sys::wrapper::*;
7
8use crate::Result;
9use crate::comp_channel::CompletionChannel;
10use crate::device::Context;
11use crate::error::{from_ptr, from_ret};
12use crate::wc::WorkCompletion;
13
14/// An RDMA Completion Queue (`ibv_cq`).
15pub struct CompletionQueue {
16    pub(crate) inner: *mut ibv_cq,
17    _ctx: Arc<Context>,
18}
19
20// Safety: ibv_cq is thread-safe (polling serialized by caller or lock).
21unsafe impl Send for CompletionQueue {}
22unsafe impl Sync for CompletionQueue {}
23
24impl Drop for CompletionQueue {
25    fn drop(&mut self) {
26        let ret = unsafe { ibv_destroy_cq(self.inner) };
27        if ret != 0 {
28            tracing::error!(
29                "ibv_destroy_cq failed: {}",
30                std::io::Error::from_raw_os_error(-ret)
31            );
32        }
33    }
34}
35
36impl CompletionQueue {
37    /// Create a new CQ with `cqe` entries.
38    pub fn new(ctx: Arc<Context>, cqe: i32) -> Result<Arc<Self>> {
39        let cq = from_ptr(unsafe {
40            ibv_create_cq(
41                ctx.inner,
42                cqe,
43                std::ptr::null_mut(), // cq_context
44                std::ptr::null_mut(), // comp_channel
45                0,                    // comp_vector
46            )
47        })?;
48        Ok(Arc::new(Self {
49            inner: cq,
50            _ctx: ctx,
51        }))
52    }
53
54    /// Poll up to `wc_buf.len()` completions.
55    ///
56    /// Returns the number of completions written to `wc_buf`.
57    pub fn poll(&self, wc_buf: &mut [WorkCompletion]) -> Result<usize> {
58        let n = unsafe {
59            rdma_wrap_ibv_poll_cq(self.inner, wc_buf.len() as i32, wc_buf.as_mut_ptr().cast())
60        };
61        if n < 0 {
62            Err(crate::Error::Verbs(std::io::Error::from_raw_os_error(-n)))
63        } else {
64            Ok(n as usize)
65        }
66    }
67
68    /// Request notification for the next completion.
69    pub fn req_notify(&self, solicited_only: bool) -> Result<()> {
70        from_ret(unsafe { rdma_wrap_ibv_req_notify_cq(self.inner, i32::from(solicited_only)) })
71    }
72
73    /// Raw pointer (for advanced/FFI use).
74    pub fn as_raw(&self) -> *mut ibv_cq {
75        self.inner
76    }
77
78    /// Create a CQ associated with a completion channel.
79    ///
80    /// When completions arrive, the channel's fd becomes readable,
81    /// enabling async notification via `epoll`/`kqueue`.
82    pub fn with_comp_channel(
83        ctx: Arc<Context>,
84        cqe: i32,
85        channel: &CompletionChannel,
86    ) -> Result<Arc<Self>> {
87        let cq = from_ptr(unsafe {
88            ibv_create_cq(
89                ctx.inner,
90                cqe,
91                std::ptr::null_mut(), // cq_context
92                channel.as_raw(),     // comp_channel
93                0,                    // comp_vector
94            )
95        })?;
96        Ok(Arc::new(Self {
97            inner: cq,
98            _ctx: ctx,
99        }))
100    }
101}
102
103impl Context {
104    /// Create a Completion Queue.
105    pub fn create_cq(self: &Arc<Self>, cqe: i32) -> Result<Arc<CompletionQueue>> {
106        CompletionQueue::new(Arc::clone(self), cqe)
107    }
108}