rdma_io/async_cq.rs
1//! Async Completion Queue poller.
2//!
3//! `AsyncCq` wraps a `CompletionQueue` + `CompletionChannel` + runtime `CqNotifier`
4//! to provide async CQ polling without spin loops. Uses the standard drain-after-arm
5//! pattern to avoid the race condition between arming and blocking.
6
7use std::future::Future;
8use std::io;
9use std::pin::Pin;
10use std::sync::Arc;
11use std::sync::atomic::{AtomicU32, Ordering};
12use std::task::{Context, Poll};
13
14use rdma_io_sys::ibverbs::*;
15
16use crate::Result;
17use crate::comp_channel::CompletionChannel;
18use crate::cq::CompletionQueue;
19use crate::wc::WorkCompletion;
20
/// Number of consumed CQ events to accumulate before acknowledging them in one batch, which amortizes the mutex cost of each ack.
22const ACK_BATCH_SIZE: u32 = 16;
23
/// Trait abstracting over async runtimes for CQ fd readiness.
///
/// Each runtime provides an implementation that registers the
/// comp_channel fd with its reactor and awaits readiness.
/// `AsyncCq` stores the implementation as a `Box<dyn CqNotifier>`, which is
/// why `readable` returns a boxed future rather than being an `async fn`.
///
/// # Edge-triggered semantics
///
/// Implementations use edge-triggered epoll (EPOLLET). Callers must
/// drain ALL events from the fd after readiness is signaled to avoid
/// lost notifications.
pub trait CqNotifier: Send + Sync {
    /// Wait until the comp_channel fd is readable.
    ///
    /// Returns when the fd has data (a CQ event notification).
    /// The caller must then drain all events and re-arm.
    ///
    /// The returned future borrows `self` and resolves to an `io::Result`, so
    /// reactor failures are surfaced to the caller instead of being swallowed.
    fn readable(&self) -> Pin<Box<dyn Future<Output = io::Result<()>> + Send + '_>>;

    /// Poll-based readiness check for the comp_channel fd.
    ///
    /// Returns `Ready(Ok(()))` when the fd is (or was recently) readable.
    /// The caller must drain all events from the comp_channel to avoid
    /// lost wakeups with edge-triggered epoll.
    ///
    /// `AsyncCq::poll_completions` forwards a `Pending` result straight to its
    /// own caller, so an implementation returning `Pending` must arrange for
    /// the waker in `cx` to be woken once the fd becomes readable.
    fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>;
}
48
/// State for poll-based CQ completion drain.
///
/// Tracks position in the drain-after-arm loop for `poll_completions()`.
/// The value lives outside `AsyncCq`: the caller owns it and passes it by
/// `&mut` on every call, so the same value has to be reused across polls of
/// one logical operation. A fresh value (`Default`) is `Idle`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum CqPollState {
    /// Start a fresh drain-after-arm cycle.
    ///
    /// `poll_completions()` arms the CQ and polls it before looking at the fd.
    #[default]
    Idle,
    /// CQ was armed and polled empty; waiting for fd readiness.
    ///
    /// Set when the notifier reported `Pending`; the next `poll_completions()`
    /// call re-checks fd readiness first and only then re-arms and polls.
    WaitingFd,
}
60
/// Async completion queue poller.
///
/// Uses the drain-after-arm pattern:
/// 1. `req_notify_cq()` — arm CQ notification
/// 2. `poll_cq()` — drain any completions (catches arm/await race)
/// 3. If completions found → return them
/// 4. If empty → `notifier.readable().await` (sleep until fd fires)
/// 5. `get_cq_event()` + periodic `ack_cq_events()` — consume notification
/// 6. Loop back to 1
pub struct AsyncCq {
    /// Completion queue that gets armed and polled; shared through an `Arc`
    /// and handed out by `cq()`.
    cq: Arc<CompletionQueue>,
    /// Completion channel from which CQ event notifications are consumed
    /// (`get_cq_event`).
    channel: CompletionChannel,
    /// Runtime-specific readiness source for the channel's fd.
    // NOTE(review): fields drop in declaration order, so `channel` is dropped
    // before `notifier`. Presumably dropping the channel closes its fd; confirm
    // the notifier copes with deregistering an fd that is already closed.
    notifier: Box<dyn CqNotifier>,
    /// Number of events consumed from `channel` that have not yet been
    /// acknowledged with `ibv_ack_cq_events`; flushed in batches and on drop.
    unacked_events: AtomicU32,
}
76
77impl AsyncCq {
78 /// Create a new async CQ poller.
79 ///
80 /// The `cq` must have been created with `CompletionQueue::with_comp_channel`
81 /// using the same `channel`.
82 pub fn new(
83 cq: Arc<CompletionQueue>,
84 channel: CompletionChannel,
85 notifier: Box<dyn CqNotifier>,
86 ) -> Self {
87 Self {
88 cq,
89 channel,
90 notifier,
91 unacked_events: AtomicU32::new(0),
92 }
93 }
94
95 /// Create an `AsyncCq` with a tokio-backed notifier.
96 ///
97 /// Convenience factory that creates a `CompletionChannel`,
98 /// `CompletionQueue`, and `TokioCqNotifier` in one call.
99 #[cfg(feature = "tokio")]
100 pub fn create_tokio(ctx: Arc<crate::device::Context>, depth: i32) -> crate::Result<Self> {
101 let ch = CompletionChannel::new(&ctx)?;
102 let cq = CompletionQueue::with_comp_channel(ctx, depth, &ch)?;
103 let notifier =
104 crate::tokio_notifier::TokioCqNotifier::new(ch.fd()).map_err(crate::Error::Verbs)?;
105 Ok(Self::new(cq, ch, Box::new(notifier)))
106 }
107
108 /// Poll for up to `wc_buf.len()` completions asynchronously.
109 ///
110 /// Returns when at least one completion is available.
111 pub async fn poll(&self, wc_buf: &mut [WorkCompletion]) -> Result<usize> {
112 loop {
113 // 1. Arm notification
114 self.cq.req_notify(false)?;
115
116 // 2. Drain any completions (catches race between arm and await)
117 let n = self.cq.poll(wc_buf)?;
118 if n > 0 {
119 return Ok(n);
120 }
121
122 // 3. No completions — wait for fd readiness
123 self.notifier
124 .readable()
125 .await
126 .map_err(crate::Error::Verbs)?;
127
128 // 4. Drain all CQ events (EPOLLET safety)
129 self.drain_channel_events()?;
130
131 // 5. Loop back — poll will find completions now
132 }
133 }
134
135 /// Wait for a specific WR ID completion.
136 ///
137 /// Any non-matching completions encountered are discarded.
138 /// For production use with multiple in-flight WRs, use `poll()` directly
139 /// and implement your own dispatch.
140 pub async fn poll_wr_id(&self, expected: u64) -> Result<WorkCompletion> {
141 let mut wc = [WorkCompletion::default(); 4];
142 loop {
143 let n = self.poll(&mut wc).await?;
144 for item in &wc[..n] {
145 if item.wr_id() == expected {
146 return Ok(*item);
147 }
148 }
149 }
150 }
151
152 /// Access the underlying CQ.
153 pub fn cq(&self) -> &Arc<CompletionQueue> {
154 &self.cq
155 }
156
157 /// Poll-based completion drain using the drain-after-arm pattern.
158 ///
159 /// External `state` tracks where we are in the arm → drain → wait loop.
160 /// Used by `AsyncRead`/`AsyncWrite` trait impls that need `Poll`-based APIs.
161 ///
162 /// # Edge-triggered safety
163 ///
164 /// After `poll_readable` returns Ready (which clears tokio's readiness
165 /// flag), we drain ALL events from the comp_channel. This ensures
166 /// EPOLLET correctly fires for the next new event.
167 pub fn poll_completions(
168 &self,
169 cx: &mut Context<'_>,
170 state: &mut CqPollState,
171 wc_buf: &mut [WorkCompletion],
172 ) -> Poll<Result<usize>> {
173 loop {
174 if *state == CqPollState::WaitingFd {
175 match self.notifier.poll_readable(cx) {
176 Poll::Pending => return Poll::Pending,
177 Poll::Ready(Err(e)) => return Poll::Ready(Err(crate::Error::Verbs(e))),
178 Poll::Ready(Ok(())) => {
179 // Drain ALL comp_channel events to stay safe with EPOLLET.
180 // poll_readable cleared tokio's readiness flag, so we must
181 // empty the fd completely — any leftover event won't trigger
182 // a new edge and would be lost.
183 self.drain_channel_events()?;
184 *state = CqPollState::Idle;
185 }
186 }
187 }
188
189 // Arm + drain
190 self.cq.req_notify(false)?;
191 let n = self.cq.poll(wc_buf)?;
192 if n > 0 {
193 return Poll::Ready(Ok(n));
194 }
195
196 // No completions — wait for fd
197 match self.notifier.poll_readable(cx) {
198 Poll::Pending => {
199 *state = CqPollState::WaitingFd;
200 return Poll::Pending;
201 }
202 Poll::Ready(Err(e)) => return Poll::Ready(Err(crate::Error::Verbs(e))),
203 Poll::Ready(Ok(())) => {
204 self.drain_channel_events()?;
205 // Loop back to arm+drain
206 }
207 }
208 }
209 }
210
211 /// Drain all pending events from the comp_channel.
212 ///
213 /// Required after `poll_readable` (which clears tokio's edge-triggered
214 /// readiness flag) to ensure the fd is truly empty. Any leftover event
215 /// would not trigger a new EPOLLET notification.
216 fn drain_channel_events(&self) -> Result<()> {
217 loop {
218 match self.channel.get_cq_event() {
219 Ok(_) => self.ack_event(),
220 Err(crate::Error::Verbs(ref e)) if e.kind() == io::ErrorKind::WouldBlock => {
221 return Ok(());
222 }
223 Err(e) => return Err(e),
224 }
225 }
226 }
227
228 /// Ack one event, batching to amortize mutex cost.
229 fn ack_event(&self) {
230 let prev = self.unacked_events.fetch_add(1, Ordering::Relaxed);
231 if prev + 1 >= ACK_BATCH_SIZE {
232 let unacked = self.unacked_events.swap(0, Ordering::Relaxed);
233 if unacked > 0 {
234 unsafe {
235 ibv_ack_cq_events(self.cq.as_raw(), unacked);
236 }
237 }
238 }
239 }
240}
241
242impl Drop for AsyncCq {
243 fn drop(&mut self) {
244 // Drain any pending comp_channel events (from arm-before-poll races)
245 // before acking and destroying the CQ.
246 while self.channel.get_cq_event().is_ok() {
247 self.unacked_events
248 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
249 }
250 // Ack all remaining events before CQ destruction
251 let unacked = self.unacked_events.load(Ordering::Relaxed);
252 if unacked > 0 {
253 unsafe {
254 ibv_ack_cq_events(self.cq.as_raw(), unacked);
255 }
256 }
257 }
258}