ibverbs_rs/ibverbs/completion_queue.rs
1//! Completion queue — polling for completed work requests.
2//!
3//! A [`CompletionQueue`] (CQ) is the mechanism used to receive notifications about completed
4//! Work Requests (WR) from a Queue Pair (QP). When a Send or Receive operation finishes,
5//! the hardware writes a "Work Completion" (WC) entry into the CQ.
6//!
7//! # Polling
8//!
9//! This library uses direct polling for maximum performance. You must manually call
10//! [`CompletionQueue::poll`] to check for completions. This operation bypasses the
11//! kernel and reads directly from the hardware queue, ensuring minimal latency.
12//!
13//! # Example: Polling for Completions
14//!
15//! ```no_run
16//! use ibverbs_rs::ibverbs;
17//! use ibverbs_rs::ibverbs::completion_queue::PollSlot;
18//!
19//! let context = ibverbs::open_device("mlx5_0")?;
20//! let cq = context.create_cq(16)?;
21//!
22//! // Pre-allocate a buffer for polling multiple completions at once
23//! let mut slots = [PollSlot::default(); 16];
24//!
25//! // Poll for completions (non-blocking)
26//! let completions = cq.poll(&mut slots)?;
27//!
28//! for wc in completions {
29//! println!("Work completion result: {:?}", wc.result());
30//! }
31//! # Ok::<(), Box<dyn std::error::Error>>(())
32//! ```
33
34use crate::ibverbs::device::Context;
35use crate::ibverbs::error::{IbvError, IbvResult};
36use crate::ibverbs::work::WorkCompletion;
37use ibverbs_sys::*;
38use std::sync::Arc;
39use std::{io, ptr};
40
/// A shared handle to a Completion Queue (CQ).
///
/// This struct is thread-safe ([`Sync`]) and reference-counted ([`Arc`]). It holds a strong reference
/// to the [`Context`] that created it, ensuring the device remains open.
///
/// Cloning is cheap: a clone points at the same underlying queue instead of creating a new
/// one. The queue itself is destroyed when the last reference to the shared inner state is
/// dropped.
#[derive(Debug, Clone)]
#[doc(alias = "ibv_cq")]
#[doc(alias = "ibv_create_cq")]
pub struct CompletionQueue {
    // Shared state: the raw `ibv_cq` pointer, the owning `Context` and the requested
    // capacity. Visible to the parent module only.
    pub(super) inner: Arc<CompletionQueueInner>,
}
51
52impl CompletionQueue {
53 /// Creates a new Completion Queue.
54 ///
55 /// # Arguments
56 ///
57 /// * `context` — The device context on which to create the CQ.
58 /// * `min_capacity` — The minimum number of completion entries this CQ must hold.
59 /// The hardware may allocate a larger queue.
60 ///
61 /// # Errors
62 ///
63 /// * Returns [`IbvError::InvalidInput`] if `min_capacity` is too large (exceeds device limits)
64 /// or cannot fit in an `i32`.
65 /// * Returns [`IbvError::Resource`] if the system cannot allocate the necessary resources.
66 pub fn create(context: &Context, min_capacity: u32) -> IbvResult<CompletionQueue> {
67 let min_cq_entries = min_capacity.try_into().map_err(|_| {
68 IbvError::InvalidInput("Completion queue min_cq_entries must fit in an i32".to_string())
69 })?;
70
71 let cq = unsafe {
72 ibv_create_cq(
73 context.inner.ctx,
74 min_cq_entries,
75 ptr::null_mut(), // cq_context (user data), unused
76 ptr::null::<ibv_comp_channel>() as *mut _, // comp_channel (NULL = polling only)
77 0, // comp_vector (CPU affinity, unused w/o channel)
78 )
79 };
80
81 if cq.is_null() {
82 return Err(IbvError::from_errno_with_msg(
83 io::Error::last_os_error().raw_os_error().unwrap_or(0),
84 format!("Failed to create completion queue with size {min_cq_entries}"),
85 ));
86 }
87
88 log::debug!("CompletionQueue created with capacity {}", min_capacity);
89 Ok(CompletionQueue {
90 inner: Arc::new(CompletionQueueInner {
91 context: context.clone(),
92 cq,
93 min_capacity,
94 }),
95 })
96 }
97
98 /// Polls the CQ for completed work requests.
99 ///
100 /// This method checks the hardware queue for completions. It is non-blocking: if no completions
101 /// are available, it returns an empty iterator immediately.
102 ///
103 /// # Arguments
104 ///
105 /// * `completions` — A mutable slice of [`PollSlot`]s. This buffer serves as the destination
106 /// where the NIC/driver will write the completion data. By requiring the caller to provide
107 /// this buffer, the library avoids internal heap allocations during the hot polling loop.
108 /// If the buffer length exceeds `i32::MAX`, only `i32::MAX` entries will be polled and
109 /// the remaining slots will be unused; a warning is logged in that case.
110 ///
111 /// # Returns
112 ///
113 /// Returns a [`PolledCompletions`] iterator wrapper. This iterator yields owned
114 /// [`WorkCompletion`] values constructed from the
115 /// data copied by the NIC into the provided `completions` buffer.
116 pub fn poll<'poll_buff>(
117 &self,
118 completions: &'poll_buff mut [PollSlot],
119 ) -> IbvResult<PolledCompletions<'poll_buff>> {
120 let ne = i32::try_from(completions.len()).unwrap_or_else(|_| {
121 log::warn!(
122 "poll buffer length {} exceeds i32::MAX; only {} entries will be polled",
123 completions.len(),
124 i32::MAX
125 );
126 i32::MAX
127 });
128
129 let ctx: *mut ibv_context = unsafe { (*self.inner.cq).context };
130 let poll_cq = unsafe {
131 (*ctx)
132 .ops
133 .poll_cq
134 .expect("poll_cq function pointer should be set by driver")
135 };
136 let num_polled =
137 unsafe { poll_cq(self.inner.cq, ne, completions.as_mut_ptr() as *mut ibv_wc) };
138
139 if num_polled < 0 {
140 Err(IbvError::from_errno_with_msg(
141 num_polled.abs(),
142 "Failed to poll completion queue",
143 ))
144 } else {
145 // num_polled is non-negative after the check above
146 #[allow(clippy::cast_sign_loss)]
147 Ok(PolledCompletions {
148 wcs: &mut completions[0..num_polled as usize],
149 })
150 }
151 }
152
153 /// Returns the minimum capacity of the Completion Queue.
154 pub fn min_capacity(&self) -> u32 {
155 self.inner.min_capacity
156 }
157
158 /// Returns a reference to the Context associated with this CQ.
159 pub fn context(&self) -> &Context {
160 &self.inner.context
161 }
162}
163
/// A pre-allocated slot for receiving a work completion.
///
/// This struct is a transparent wrapper around `ibv_wc`. Users should allocate an array
/// of these slots to pass to [`CompletionQueue::poll`].
///
/// Slots are [`Copy`] and [`Default`], so a buffer can be created with
/// `[PollSlot::default(); N]`.
#[derive(Copy, Clone, Debug, Default)]
// `repr(transparent)` gives `PollSlot` the same layout as `ibv_wc`; `poll` relies on this
// when it passes the slot buffer to the driver as a `*mut ibv_wc`.
#[repr(transparent)]
pub struct PollSlot {
    // Raw completion entry written by the driver during a poll.
    wc: ibv_wc,
}
173
174/// An iterator over completions retrieved from a poll operation.
175///
176/// This struct is returned by [`CompletionQueue::poll`]. It borrows the underlying
177/// [`PollSlot`] buffer and yields [`WorkCompletion`] objects.
178pub struct PolledCompletions<'a> {
179 wcs: &'a mut [PollSlot],
180}
181
182impl PolledCompletions<'_> {
183 /// Returns the number of completions actually polled.
184 pub fn len(&self) -> usize {
185 self.wcs.len()
186 }
187
188 /// Returns true if no completions were polled.
189 pub fn is_empty(&self) -> bool {
190 self.wcs.is_empty()
191 }
192}
193
194impl<'a> IntoIterator for PolledCompletions<'a> {
195 type Item = WorkCompletion;
196 type IntoIter = std::iter::Map<std::slice::Iter<'a, PollSlot>, fn(&PollSlot) -> WorkCompletion>;
197
198 fn into_iter(self) -> Self::IntoIter {
199 self.wcs
200 .iter()
201 .map(|wc_slot| WorkCompletion::new(wc_slot.wc))
202 }
203}
204
/// Inner wrapper managing the raw CQ pointer.
///
/// Shared between all clones of a [`CompletionQueue`] through an [`Arc`]; its `Drop`
/// implementation destroys the underlying queue.
pub(super) struct CompletionQueueInner {
    // Handle to the device context the CQ was created on; a clone is stored so the
    // context outlives the queue.
    pub(super) context: Context,
    // Raw queue pointer returned by `ibv_create_cq`; checked to be non-null at creation.
    pub(super) cq: *mut ibv_cq,
    // Capacity requested by the caller at creation time (not the hardware-reported size).
    pub(super) min_capacity: u32,
}
211
/// SAFETY: libibverbs components are thread safe.
///
/// The raw `*mut ibv_cq` field makes this type `!Send` by default, so the impl opts back in.
/// NOTE(review): soundness rests on the libibverbs thread-safety guarantee — confirm it
/// covers every verb called through this pointer.
unsafe impl Send for CompletionQueueInner {}
/// SAFETY: libibverbs components are thread safe.
///
/// Needed so that `Arc<CompletionQueueInner>` (and therefore `CompletionQueue`) can be
/// shared across threads; the same thread-safety assumption as for `Send` applies.
unsafe impl Sync for CompletionQueueInner {}
216
217impl Drop for CompletionQueueInner {
218 fn drop(&mut self) {
219 log::debug!("CompletionQueue destroyed");
220
221 // SAFETY: self.cq is valid for the lifetime of Inner.
222 let errno = unsafe { ibv_destroy_cq(self.cq) };
223 if errno != 0 {
224 let error = IbvError::from_errno_with_msg(errno, "Failed to destroy completion queue");
225 log::error!("{error}");
226 }
227 }
228}
229
230impl std::fmt::Debug for CompletionQueueInner {
231 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
232 // SAFETY: Dereferencing self.cq to read fields (handle, cqe) is safe
233 // because the pointer is valid for the lifetime of Inner.
234 f.debug_struct("CompletionQueueInner")
235 .field("handle", &(unsafe { *self.cq }).handle)
236 .field("capacity", &(unsafe { *self.cq }).cqe)
237 .field("context", &self.context)
238 .finish()
239 }
240}