Skip to main content

rdma_io/
async_cq.rs

//! Async Completion Queue poller.
//!
//! `AsyncCq` wraps a `CompletionQueue` + `CompletionChannel` + runtime `CqNotifier`
//! to provide async CQ polling without spin loops. Uses the standard drain-after-arm
//! pattern to avoid the race condition between arming and blocking.
6
7use std::future::Future;
8use std::io;
9use std::pin::Pin;
10use std::sync::Arc;
11use std::sync::atomic::{AtomicU32, Ordering};
12use std::task::{Context, Poll};
13
14use rdma_io_sys::ibverbs::*;
15
16use crate::Result;
17use crate::comp_channel::CompletionChannel;
18use crate::cq::CompletionQueue;
19use crate::wc::WorkCompletion;
20
/// Number of received CQ events to accumulate before acknowledging them.
///
/// Events are acked in batches of this size to amortize the mutex cost of
/// acknowledging CQ events.
const ACK_BATCH_SIZE: u32 = 16;
23
/// Runtime-agnostic readiness source for the completion channel fd.
///
/// Each async runtime supplies an implementation that registers the
/// comp_channel fd with its reactor and reports when it becomes readable.
///
/// # Edge-triggered semantics
///
/// Implementations use edge-triggered epoll (EPOLLET). After readiness is
/// signaled, callers must drain ALL events from the fd; an event left behind
/// does not produce a new edge and its notification would be lost.
pub trait CqNotifier: Send + Sync {
    /// Wait until the comp_channel fd is readable.
    ///
    /// The returned future resolves once the fd has data (a CQ event
    /// notification). The caller must then drain all events and re-arm.
    fn readable(&self) -> Pin<Box<dyn Future<Output = io::Result<()>> + Send + '_>>;

    /// Poll-based readiness check for the comp_channel fd.
    ///
    /// Returns `Ready(Ok(()))` when the fd is (or was recently) readable.
    /// The caller must drain all events from the comp_channel to avoid
    /// lost wakeups with edge-triggered epoll.
    fn poll_readable(&self, cx: &mut Context<'_>) -> Poll<io::Result<()>>;
}
48
/// Resumption state for the poll-based completion drain.
///
/// Records where `poll_completions()` left off in its drain-after-arm loop
/// so the next call can continue from the right step.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum CqPollState {
    /// Start a fresh drain-after-arm cycle.
    #[default]
    Idle,
    /// CQ was armed and polled empty; waiting for fd readiness.
    WaitingFd,
}
60
61/// Async completion queue poller.
62///
63/// Uses the drain-after-arm pattern:
64/// 1. `req_notify_cq()` — arm CQ notification
65/// 2. `poll_cq()` — drain any completions (catches arm/await race)
66/// 3. If completions found → return them
67/// 4. If empty → `notifier.readable().await` (sleep until fd fires)
68/// 5. `get_cq_event()` + periodic `ack_cq_events()` — consume notification
69/// 6. Loop back to 1
70pub struct AsyncCq {
71    cq: Arc<CompletionQueue>,
72    channel: CompletionChannel,
73    notifier: Box<dyn CqNotifier>,
74    unacked_events: AtomicU32,
75}
76
77impl AsyncCq {
78    /// Create a new async CQ poller.
79    ///
80    /// The `cq` must have been created with `CompletionQueue::with_comp_channel`
81    /// using the same `channel`.
82    pub fn new(
83        cq: Arc<CompletionQueue>,
84        channel: CompletionChannel,
85        notifier: Box<dyn CqNotifier>,
86    ) -> Self {
87        Self {
88            cq,
89            channel,
90            notifier,
91            unacked_events: AtomicU32::new(0),
92        }
93    }
94
95    /// Create an `AsyncCq` with a tokio-backed notifier.
96    ///
97    /// Convenience factory that creates a `CompletionChannel`,
98    /// `CompletionQueue`, and `TokioCqNotifier` in one call.
99    #[cfg(feature = "tokio")]
100    pub fn create_tokio(ctx: Arc<crate::device::Context>, depth: i32) -> crate::Result<Self> {
101        let ch = CompletionChannel::new(&ctx)?;
102        let cq = CompletionQueue::with_comp_channel(ctx, depth, &ch)?;
103        let notifier =
104            crate::tokio_notifier::TokioCqNotifier::new(ch.fd()).map_err(crate::Error::Verbs)?;
105        Ok(Self::new(cq, ch, Box::new(notifier)))
106    }
107
108    /// Poll for up to `wc_buf.len()` completions asynchronously.
109    ///
110    /// Returns when at least one completion is available.
111    pub async fn poll(&self, wc_buf: &mut [WorkCompletion]) -> Result<usize> {
112        loop {
113            // 1. Arm notification
114            self.cq.req_notify(false)?;
115
116            // 2. Drain any completions (catches race between arm and await)
117            let n = self.cq.poll(wc_buf)?;
118            if n > 0 {
119                return Ok(n);
120            }
121
122            // 3. No completions — wait for fd readiness
123            self.notifier
124                .readable()
125                .await
126                .map_err(crate::Error::Verbs)?;
127
128            // 4. Drain all CQ events (EPOLLET safety)
129            self.drain_channel_events()?;
130
131            // 5. Loop back — poll will find completions now
132        }
133    }
134
135    /// Wait for a specific WR ID completion.
136    ///
137    /// Any non-matching completions encountered are discarded.
138    /// For production use with multiple in-flight WRs, use `poll()` directly
139    /// and implement your own dispatch.
140    pub async fn poll_wr_id(&self, expected: u64) -> Result<WorkCompletion> {
141        let mut wc = [WorkCompletion::default(); 4];
142        loop {
143            let n = self.poll(&mut wc).await?;
144            for item in &wc[..n] {
145                if item.wr_id() == expected {
146                    return Ok(*item);
147                }
148            }
149        }
150    }
151
152    /// Access the underlying CQ.
153    pub fn cq(&self) -> &Arc<CompletionQueue> {
154        &self.cq
155    }
156
157    /// Poll-based completion drain using the drain-after-arm pattern.
158    ///
159    /// External `state` tracks where we are in the arm → drain → wait loop.
160    /// Used by `AsyncRead`/`AsyncWrite` trait impls that need `Poll`-based APIs.
161    ///
162    /// # Edge-triggered safety
163    ///
164    /// After `poll_readable` returns Ready (which clears tokio's readiness
165    /// flag), we drain ALL events from the comp_channel. This ensures
166    /// EPOLLET correctly fires for the next new event.
167    pub fn poll_completions(
168        &self,
169        cx: &mut Context<'_>,
170        state: &mut CqPollState,
171        wc_buf: &mut [WorkCompletion],
172    ) -> Poll<Result<usize>> {
173        loop {
174            if *state == CqPollState::WaitingFd {
175                match self.notifier.poll_readable(cx) {
176                    Poll::Pending => return Poll::Pending,
177                    Poll::Ready(Err(e)) => return Poll::Ready(Err(crate::Error::Verbs(e))),
178                    Poll::Ready(Ok(())) => {
179                        // Drain ALL comp_channel events to stay safe with EPOLLET.
180                        // poll_readable cleared tokio's readiness flag, so we must
181                        // empty the fd completely — any leftover event won't trigger
182                        // a new edge and would be lost.
183                        self.drain_channel_events()?;
184                        *state = CqPollState::Idle;
185                    }
186                }
187            }
188
189            // Arm + drain
190            self.cq.req_notify(false)?;
191            let n = self.cq.poll(wc_buf)?;
192            if n > 0 {
193                return Poll::Ready(Ok(n));
194            }
195
196            // No completions — wait for fd
197            match self.notifier.poll_readable(cx) {
198                Poll::Pending => {
199                    *state = CqPollState::WaitingFd;
200                    return Poll::Pending;
201                }
202                Poll::Ready(Err(e)) => return Poll::Ready(Err(crate::Error::Verbs(e))),
203                Poll::Ready(Ok(())) => {
204                    self.drain_channel_events()?;
205                    // Loop back to arm+drain
206                }
207            }
208        }
209    }
210
211    /// Drain all pending events from the comp_channel.
212    ///
213    /// Required after `poll_readable` (which clears tokio's edge-triggered
214    /// readiness flag) to ensure the fd is truly empty. Any leftover event
215    /// would not trigger a new EPOLLET notification.
216    fn drain_channel_events(&self) -> Result<()> {
217        loop {
218            match self.channel.get_cq_event() {
219                Ok(_) => self.ack_event(),
220                Err(crate::Error::Verbs(ref e)) if e.kind() == io::ErrorKind::WouldBlock => {
221                    return Ok(());
222                }
223                Err(e) => return Err(e),
224            }
225        }
226    }
227
228    /// Ack one event, batching to amortize mutex cost.
229    fn ack_event(&self) {
230        let prev = self.unacked_events.fetch_add(1, Ordering::Relaxed);
231        if prev + 1 >= ACK_BATCH_SIZE {
232            let unacked = self.unacked_events.swap(0, Ordering::Relaxed);
233            if unacked > 0 {
234                unsafe {
235                    ibv_ack_cq_events(self.cq.as_raw(), unacked);
236                }
237            }
238        }
239    }
240}
241
242impl Drop for AsyncCq {
243    fn drop(&mut self) {
244        // Drain any pending comp_channel events (from arm-before-poll races)
245        // before acking and destroying the CQ.
246        while self.channel.get_cq_event().is_ok() {
247            self.unacked_events
248                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
249        }
250        // Ack all remaining events before CQ destruction
251        let unacked = self.unacked_events.load(Ordering::Relaxed);
252        if unacked > 0 {
253            unsafe {
254                ibv_ack_cq_events(self.cq.as_raw(), unacked);
255            }
256        }
257    }
258}