Skip to main content

ibverbs_rs/ibverbs/
completion_queue.rs

1//! Completion queue — polling for completed work requests.
2//!
3//! A [`CompletionQueue`] (CQ) is the mechanism used to receive notifications about completed
4//! Work Requests (WR) from a Queue Pair (QP). When a Send or Receive operation finishes,
5//! the hardware writes a "Work Completion" (WC) entry into the CQ.
6//!
7//! # Polling
8//!
9//! This library uses direct polling for maximum performance. You must manually call
10//! [`CompletionQueue::poll`] to check for completions. This operation bypasses the
11//! kernel and reads directly from the hardware queue, ensuring minimal latency.
12//!
13//! # Example: Polling for Completions
14//!
15//! ```no_run
16//! use ibverbs_rs::ibverbs;
17//! use ibverbs_rs::ibverbs::completion_queue::PollSlot;
18//!
19//! let context = ibverbs::open_device("mlx5_0")?;
20//! let cq = context.create_cq(16)?;
21//!
22//! // Pre-allocate a buffer for polling multiple completions at once
23//! let mut slots = [PollSlot::default(); 16];
24//!
25//! // Poll for completions (non-blocking)
26//! let completions = cq.poll(&mut slots)?;
27//!
28//! for wc in completions {
29//!     println!("Work completion result: {:?}", wc.result());
30//! }
31//! # Ok::<(), Box<dyn std::error::Error>>(())
32//! ```
33
34use crate::ibverbs::device::Context;
35use crate::ibverbs::error::{IbvError, IbvResult};
36use crate::ibverbs::work::WorkCompletion;
37use ibverbs_sys::*;
38use std::sync::Arc;
39use std::{io, ptr};
40
41/// A shared handle to a Completion Queue (CQ).
42///
43/// This struct is thread-safe ([`Sync`]) and reference-counted ([`Arc`]). It holds a strong reference
44/// to the [`Context`] that created it, ensuring the device remains open.
45#[derive(Debug, Clone)]
46#[doc(alias = "ibv_cq")]
47#[doc(alias = "ibv_create_cq")]
48pub struct CompletionQueue {
49    pub(super) inner: Arc<CompletionQueueInner>,
50}
51
52impl CompletionQueue {
53    /// Creates a new Completion Queue.
54    ///
55    /// # Arguments
56    ///
57    /// * `context` — The device context on which to create the CQ.
58    /// * `min_capacity` — The minimum number of completion entries this CQ must hold.
59    ///   The hardware may allocate a larger queue.
60    ///
61    /// # Errors
62    ///
63    /// * Returns [`IbvError::InvalidInput`] if `min_capacity` is too large (exceeds device limits)
64    ///   or cannot fit in an `i32`.
65    /// * Returns [`IbvError::Resource`] if the system cannot allocate the necessary resources.
66    pub fn create(context: &Context, min_capacity: u32) -> IbvResult<CompletionQueue> {
67        let min_cq_entries = min_capacity.try_into().map_err(|_| {
68            IbvError::InvalidInput("Completion queue min_cq_entries must fit in an i32".to_string())
69        })?;
70
71        let cq = unsafe {
72            ibv_create_cq(
73                context.inner.ctx,
74                min_cq_entries,
75                ptr::null_mut(), // cq_context (user data), unused
76                ptr::null::<ibv_comp_channel>() as *mut _, // comp_channel (NULL = polling only)
77                0,               // comp_vector (CPU affinity, unused w/o channel)
78            )
79        };
80
81        if cq.is_null() {
82            return Err(IbvError::from_errno_with_msg(
83                io::Error::last_os_error().raw_os_error().unwrap_or(0),
84                format!("Failed to create completion queue with size {min_cq_entries}"),
85            ));
86        }
87
88        log::debug!("CompletionQueue created with capacity {}", min_capacity);
89        Ok(CompletionQueue {
90            inner: Arc::new(CompletionQueueInner {
91                context: context.clone(),
92                cq,
93                min_capacity,
94            }),
95        })
96    }
97
98    /// Polls the CQ for completed work requests.
99    ///
100    /// This method checks the hardware queue for completions. It is non-blocking: if no completions
101    /// are available, it returns an empty iterator immediately.
102    ///
103    /// # Arguments
104    ///
105    /// * `completions` — A mutable slice of [`PollSlot`]s. This buffer serves as the destination
106    ///   where the NIC/driver will write the completion data. By requiring the caller to provide
107    ///   this buffer, the library avoids internal heap allocations during the hot polling loop.
108    ///   If the buffer length exceeds `i32::MAX`, only `i32::MAX` entries will be polled and
109    ///   the remaining slots will be unused; a warning is logged in that case.
110    ///
111    /// # Returns
112    ///
113    /// Returns a [`PolledCompletions`] iterator wrapper. This iterator yields owned
114    /// [`WorkCompletion`] values constructed from the
115    /// data copied by the NIC into the provided `completions` buffer.
116    pub fn poll<'poll_buff>(
117        &self,
118        completions: &'poll_buff mut [PollSlot],
119    ) -> IbvResult<PolledCompletions<'poll_buff>> {
120        let ne = i32::try_from(completions.len()).unwrap_or_else(|_| {
121            log::warn!(
122                "poll buffer length {} exceeds i32::MAX; only {} entries will be polled",
123                completions.len(),
124                i32::MAX
125            );
126            i32::MAX
127        });
128
129        let ctx: *mut ibv_context = unsafe { (*self.inner.cq).context };
130        let poll_cq = unsafe {
131            (*ctx)
132                .ops
133                .poll_cq
134                .expect("poll_cq function pointer should be set by driver")
135        };
136        let num_polled =
137            unsafe { poll_cq(self.inner.cq, ne, completions.as_mut_ptr() as *mut ibv_wc) };
138
139        if num_polled < 0 {
140            Err(IbvError::from_errno_with_msg(
141                num_polled.abs(),
142                "Failed to poll completion queue",
143            ))
144        } else {
145            // num_polled is non-negative after the check above
146            #[allow(clippy::cast_sign_loss)]
147            Ok(PolledCompletions {
148                wcs: &mut completions[0..num_polled as usize],
149            })
150        }
151    }
152
153    /// Returns the minimum capacity of the Completion Queue.
154    pub fn min_capacity(&self) -> u32 {
155        self.inner.min_capacity
156    }
157
158    /// Returns a reference to the Context associated with this CQ.
159    pub fn context(&self) -> &Context {
160        &self.inner.context
161    }
162}
163
164/// A pre-allocated slot for receiving a work completion.
165///
166/// This struct is a transparent wrapper around `ibv_wc`. Users should allocate an array
167/// of these slots to pass to [`CompletionQueue::poll`].
168#[derive(Copy, Clone, Debug, Default)]
169#[repr(transparent)]
170pub struct PollSlot {
171    wc: ibv_wc,
172}
173
174/// An iterator over completions retrieved from a poll operation.
175///
176/// This struct is returned by [`CompletionQueue::poll`]. It borrows the underlying
177/// [`PollSlot`] buffer and yields [`WorkCompletion`] objects.
178pub struct PolledCompletions<'a> {
179    wcs: &'a mut [PollSlot],
180}
181
182impl PolledCompletions<'_> {
183    /// Returns the number of completions actually polled.
184    pub fn len(&self) -> usize {
185        self.wcs.len()
186    }
187
188    /// Returns true if no completions were polled.
189    pub fn is_empty(&self) -> bool {
190        self.wcs.is_empty()
191    }
192}
193
194impl<'a> IntoIterator for PolledCompletions<'a> {
195    type Item = WorkCompletion;
196    type IntoIter = std::iter::Map<std::slice::Iter<'a, PollSlot>, fn(&PollSlot) -> WorkCompletion>;
197
198    fn into_iter(self) -> Self::IntoIter {
199        self.wcs
200            .iter()
201            .map(|wc_slot| WorkCompletion::new(wc_slot.wc))
202    }
203}
204
205/// Inner wrapper managing the raw CQ pointer.
206pub(super) struct CompletionQueueInner {
207    pub(super) context: Context,
208    pub(super) cq: *mut ibv_cq,
209    pub(super) min_capacity: u32,
210}
211
212/// SAFETY: libibverbs components are thread safe.
213unsafe impl Send for CompletionQueueInner {}
214/// SAFETY: libibverbs components are thread safe.
215unsafe impl Sync for CompletionQueueInner {}
216
217impl Drop for CompletionQueueInner {
218    fn drop(&mut self) {
219        log::debug!("CompletionQueue destroyed");
220
221        // SAFETY: self.cq is valid for the lifetime of Inner.
222        let errno = unsafe { ibv_destroy_cq(self.cq) };
223        if errno != 0 {
224            let error = IbvError::from_errno_with_msg(errno, "Failed to destroy completion queue");
225            log::error!("{error}");
226        }
227    }
228}
229
230impl std::fmt::Debug for CompletionQueueInner {
231    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
232        // SAFETY: Dereferencing self.cq to read fields (handle, cqe) is safe
233        // because the pointer is valid for the lifetime of Inner.
234        f.debug_struct("CompletionQueueInner")
235            .field("handle", &(unsafe { *self.cq }).handle)
236            .field("capacity", &(unsafe { *self.cq }).cqe)
237            .field("context", &self.context)
238            .finish()
239    }
240}