Skip to main content

ibverbs_rs/channel/
polling_scope.rs

1use crate::channel::pending_work::PendingWork;
2use crate::channel::{Channel, TransportError, TransportResult};
3use crate::ibverbs::error::IbvResult;
4use crate::ibverbs::protection_domain::ProtectionDomain;
5use crate::ibverbs::work::{
6    ReadWorkRequest, ReceiveWorkRequest, SendWorkRequest, WorkSuccess, WriteWorkRequest,
7};
8use std::cell::RefCell;
9use std::marker::PhantomData;
10use std::panic::{AssertUnwindSafe, catch_unwind, resume_unwind};
11use std::rc::Rc;
12use thiserror::Error;
13
14impl Channel {
15    /// Opens a polling scope that automatically polls all outstanding work requests when it ends.
16    ///
17    /// This is the primary safe way to perform RDMA operations. The closure receives a
18    /// [`PollingScope`] through which operations can be posted. When the closure returns,
19    /// any work requests that were not manually polled are automatically polled to completion
20    /// before this method returns.
21    ///
22    /// See [`manual_scope`](Self::manual_scope) for a variant that enforces manual polling
23    /// and returns the user's error type directly.
24    ///
25    /// # Error handling
26    ///
27    /// * If the closure returns `Err(E)`, the scope still auto-polls all outstanding work,
28    ///   then returns `ScopeError::ClosureError(E)`.
29    /// * If the closure returns `Ok(T)` but auto-polling encounters transport errors,
30    ///   returns `ScopeError::AutoPollError(...)`.
31    /// * If the closure panics, outstanding work is still polled for cleanup before
32    ///   the panic is resumed.
33    pub fn scope<'env, F, T, E>(&'env mut self, f: F) -> Result<T, ScopeError<E>>
34    where
35        F: for<'scope> FnOnce(&mut PollingScope<'scope, 'env, Channel>) -> Result<T, E>,
36    {
37        PollingScope::run(self, f)
38    }
39
40    /// Opens a polling scope that enforces manual polling of all work requests.
41    ///
42    /// Both [`scope`](Self::scope) and `manual_scope` allow the user to poll work manually.
43    /// The difference is that `manual_scope` makes this the **contract**: it returns
44    /// `Result<T, E>` directly instead of wrapping it in [`ScopeError`], avoiding
45    /// unnecessary error handling. If the closure succeeds but leaves work unpolled,
46    /// this method panics as a safety net.
47    ///
48    /// If the closure returns an error, outstanding work is still cleaned up without panicking.
49    pub fn manual_scope<'env, F, T, E>(&'env mut self, f: F) -> Result<T, E>
50    where
51        F: for<'scope> FnOnce(&mut PollingScope<'scope, 'env, Channel>) -> Result<T, E>,
52    {
53        PollingScope::run_manual(self, f)
54    }
55}
56
57impl<'scope, 'env> PollingScope<'scope, 'env, Channel> {
58    /// Returns a reference to the [`ProtectionDomain`] of the underlying channel.
59    pub fn pd(&self) -> &ProtectionDomain {
60        self.inner.pd()
61    }
62}
63
64/// Convenience alias for a [`Result`] with [`ScopeError`] as the default error type.
65pub type ScopeResult<T> = Result<T, ScopeError>;
66
/// Error returned by [`Channel::scope`].
///
/// Distinguishes between errors originating from the user's closure and errors
/// discovered during automatic polling of outstanding work requests.
///
/// The type parameter `E` is the closure's error type; it defaults to [`TransportError`].
#[derive(Debug, Error)]
pub enum ScopeError<E = TransportError> {
    /// The user's closure returned an error.
    ///
    /// `#[from]` generates `From<E>`, so `?` can convert a bare `E` into this variant.
    #[error("Closure error: {0}")]
    ClosureError(#[from] E),
    /// The closure succeeded, but one or more auto-polled work requests failed.
    ///
    /// Holds every transport error collected while auto-polling. The `Display` message
    /// renders the list with its `Debug` formatting.
    #[error("Auto poll error: {0:?}")]
    AutoPollError(Vec<TransportError>),
}
80
81impl<'a, 'b, C> PollingScope<'a, 'b, C> {
82    /// Runs a closure inside an auto-polling scope, similar to [`std::thread::scope`].
83    ///
84    /// Work requests created inside the closure are tracked internally. When the closure
85    /// returns, any that were not manually polled are automatically polled to completion.
86    /// The user cannot leak work requests to escape the lifetime, because the handles are
87    /// stored in a private structure owned by the scope.
88    ///
89    /// # Lifetimes
90    ///
91    /// * `'scope` — The lifetime of the scope itself. New operations may be posted and may
92    ///   still be running during this period. It begins before the closure runs and ends
93    ///   after all outstanding work has been polled, but before this method returns.
94    /// * `'env` — The lifetime of data borrowed by the operations. Must outlive `'scope`,
95    ///   meaning anything alive at the call site (e.g. local variables) can be borrowed.
96    pub(crate) fn run<'env, F, T, E>(inner: &'env mut C, f: F) -> Result<T, ScopeError<E>>
97    where
98        F: for<'scope> FnOnce(&mut PollingScope<'scope, 'env, C>) -> Result<T, E>,
99    {
100        let mut scope = PollingScope::new(inner);
101        // The user's closure may panic after issuing work requests.
102        // The panic has to be caught to ensure clean up for exception safety.
103        let scope_result = catch_unwind(AssertUnwindSafe(|| f(&mut scope)));
104        let auto_poll_result = scope.auto_poll();
105
106        match scope_result {
107            Ok(closure_result) => match closure_result {
108                Err(closure_error) => Err(ScopeError::ClosureError(closure_error)),
109                Ok(closure_output) => match auto_poll_result {
110                    Ok(_) => Ok(closure_output),
111                    Err(error) => Err(ScopeError::AutoPollError(error)),
112                },
113            },
114            Err(panic) => resume_unwind(panic),
115        }
116    }
117
118    /// Runs a closure inside a strict polling scope.
119    ///
120    /// If the closure succeeds but any work requests were left unpolled, this method panics.
121    /// If the closure fails, outstanding work is still cleaned up without panicking.
122    pub(crate) fn run_manual<'env, F, T, E>(inner: &'env mut C, f: F) -> Result<T, E>
123    where
124        F: for<'scope> FnOnce(&mut PollingScope<'scope, 'env, C>) -> Result<T, E>,
125    {
126        let mut scope = PollingScope::new(inner);
127        // The user's closure may panic after issuing work requests.
128        // The panic has to be caught to ensure clean up for exception safety.
129        let scope_result = catch_unwind(AssertUnwindSafe(|| f(&mut scope)));
130        let auto_poll_result = scope.auto_poll();
131
132        match scope_result {
133            Ok(closure_result) => {
134                let closure_output = closure_result?;
135                match auto_poll_result {
136                    Ok(AutoPollSuccess::NoPendingWorks) => Ok(closure_output),
137                    Ok(AutoPollSuccess::PendingWorksSucceeded) | Err(_) => {
138                        panic!("Unpolled wrs in PollingScope::run_manual")
139                    }
140                }
141            }
142            Err(panic) => resume_unwind(panic),
143        }
144    }
145}
146
/// A scoped context for posting RDMA operations with automatic lifetime safety.
///
/// Created by [`Channel::scope`] or [`Channel::manual_scope`]. Operations posted through
/// a `PollingScope` return [`ScopedPendingWork`] handles that borrow the data buffers for
/// `'scope`, preventing aliasing while the hardware is accessing them.
///
/// When the scope ends, all unpolled work is automatically polled to completion, ensuring
/// that buffers are not released while the NIC may still be performing DMA.
///
/// This design mirrors [`std::thread::scope`] — the scope owns the work request handles
/// internally, so the user cannot leak them via [`std::mem::forget`].
pub struct PollingScope<'scope, 'env: 'scope, C> {
    /// The resource operations are posted through. The `channel_post_*` methods pick a
    /// [`Channel`] out of it using a caller-supplied selector closure.
    pub(crate) inner: &'env mut C,
    /// Every work request posted through this scope, in posting order. Auto-poll consumes
    /// this list and polls the entries the user has not polled to completion.
    wrs: Vec<ScopedPendingWork<'scope>>,
    // for invariance of lifetimes, see `std::thread::scope`
    scope: PhantomData<&'scope mut &'scope ()>,
    env: PhantomData<&'env mut &'env ()>,
}
165
166impl<'scope, 'env, C> PollingScope<'scope, 'env, C> {
167    pub(super) fn new(inner: &'env mut C) -> Self {
168        PollingScope {
169            inner,
170            wrs: vec![],
171            scope: PhantomData,
172            env: PhantomData,
173        }
174    }
175
176    // Important to notice. *Auto-poll does not fail*. The returned result represents the outcome
177    // of the polled work requests during clean up. If it errors, it means some of the work
178    // requests failed.
179    // Auto polls all non manually polled work requests issued during the closure.
180    fn auto_poll(self) -> AutoPollResult {
181        let mut auto_polled = false;
182        let mut transport_errors = Vec::new();
183
184        for wr in self.wrs {
185            let mut wr = wr.inner.borrow_mut();
186            // Only raise error into the auto polled if not polled by the user
187            if !wr.user_polled_to_completion {
188                auto_polled = true; // Mark that user left some wrs unpolled
189                if let Err(transport_error) = wr.wr.spin_poll() {
190                    transport_errors.push(transport_error);
191                }
192            }
193        }
194
195        // If everything goes well, no heap allocation
196        if !auto_polled {
197            Ok(AutoPollSuccess::NoPendingWorks)
198        } else {
199            if transport_errors.is_empty() {
200                Ok(AutoPollSuccess::PendingWorksSucceeded)
201            } else {
202                Err(transport_errors)
203            }
204        }
205    }
206}
207
/// Outcome of `PollingScope::auto_poll`.
///
/// It is `Ok` when there was nothing to poll or every auto-polled request succeeded. It is
/// `Err` with all collected transport errors otherwise.
type AutoPollResult = Result<AutoPollSuccess, Vec<TransportError>>;

/// Successful outcomes of auto-polling. `run_manual` uses the distinction to detect work
/// the user left unpolled.
enum AutoPollSuccess {
    /// Every work request had already been polled to completion by the user.
    NoPendingWorks,
    /// Some work requests were left unpolled, and all of them completed successfully.
    PendingWorksSucceeded,
}
214
215impl<'scope, 'env, C> PollingScope<'scope, 'env, C> {
216    // The slice cannot be used again until the work request is consumed,
217    // so no overlapping operations can be done concurrently
218    pub(crate) fn channel_post_send<F>(
219        &mut self,
220        channel_selector: F,
221        wr: SendWorkRequest<'_, 'env>,
222    ) -> IbvResult<ScopedPendingWork<'scope>>
223    where
224        F: FnOnce(&mut C) -> IbvResult<&mut Channel>,
225    {
226        let channel = channel_selector(self.inner)?;
227        let wr = ScopedPendingWork::new(unsafe { channel.send_unpolled(wr)? });
228        self.wrs.push(wr.clone());
229        Ok(wr)
230    }
231
232    // The slice cannot be used again until the work request is consumed,
233    // so no overlapping operations can be done concurrently
234    pub(crate) fn channel_post_receive<F>(
235        &mut self,
236        channel_selector: F,
237        wr: ReceiveWorkRequest<'_, 'env>,
238    ) -> IbvResult<ScopedPendingWork<'scope>>
239    where
240        F: FnOnce(&mut C) -> IbvResult<&mut Channel>,
241    {
242        let channel = channel_selector(self.inner)?;
243        let wr = ScopedPendingWork::new(unsafe { channel.receive_unpolled(wr)? });
244        self.wrs.push(wr.clone());
245        Ok(wr)
246    }
247
248    pub(crate) fn channel_post_write<F>(
249        &mut self,
250        channel_selector: F,
251        wr: WriteWorkRequest<'_, 'env>,
252    ) -> IbvResult<ScopedPendingWork<'scope>>
253    where
254        F: FnOnce(&mut C) -> IbvResult<&mut Channel>,
255    {
256        let channel = channel_selector(self.inner)?;
257        let wr = ScopedPendingWork::new(unsafe { channel.write_unpolled(wr)? });
258        self.wrs.push(wr.clone());
259        Ok(wr)
260    }
261
262    pub(crate) fn channel_post_read<F>(
263        &mut self,
264        channel_selector: F,
265        wr: ReadWorkRequest<'_, 'env>,
266    ) -> IbvResult<ScopedPendingWork<'scope>>
267    where
268        F: FnOnce(&mut C) -> IbvResult<&mut Channel>,
269    {
270        let channel = channel_selector(self.inner)?;
271        let wr = ScopedPendingWork::new(unsafe { channel.read_unpolled(wr)? });
272        self.wrs.push(wr.clone());
273        Ok(wr)
274    }
275}
276
/// A handle to a pending RDMA operation within a [`PollingScope`].
///
/// This handle can be used to manually poll for completion. If not polled by the time
/// the scope ends, the operation will be auto-polled.
///
/// Cloning is cheap. Clones share the same underlying state through an `Rc`, so polling
/// any clone to completion marks the operation as polled for all of them, including the
/// clone kept by the scope.
#[derive(Debug, Clone)]
pub struct ScopedPendingWork<'scope> {
    /// Shared state; the owning scope keeps its own clone for auto-polling.
    inner: Rc<RefCell<ScopedPendingWorkInner<'scope>>>,
    // Invariance marker over `'scope`, in the same form as the markers on `PollingScope`.
    // NOTE(review): the field is named `env` but is parameterised by `'scope`.
    env: PhantomData<&'scope mut &'scope ()>,
}
286
/// Shared state behind a [`ScopedPendingWork`] handle.
#[derive(Debug)]
struct ScopedPendingWorkInner<'scope> {
    /// Set once the user has observed completion, either through `poll` returning `Some`
    /// or through `spin_poll`. Auto-poll skips work with this flag set.
    user_polled_to_completion: bool,
    /// The underlying posted work request.
    wr: PendingWork<'scope>,
}
292
293impl<'scope> ScopedPendingWork<'scope> {
294    fn new(wr: PendingWork<'scope>) -> Self {
295        ScopedPendingWork {
296            inner: Rc::new(RefCell::new(ScopedPendingWorkInner {
297                user_polled_to_completion: false,
298                wr,
299            })),
300            env: PhantomData,
301        }
302    }
303
304    /// Checks if the operation has completed.
305    ///
306    /// Returns `None` if the operation is still in progress, or `Some(result)` once complete.
307    pub fn poll(&self) -> Option<TransportResult<WorkSuccess>> {
308        let mut wr = self.inner.borrow_mut();
309        let poll = wr.wr.poll()?;
310        wr.user_polled_to_completion = true;
311        Some(poll)
312    }
313
314    /// Busy-waits until the operation completes and returns the result.
315    pub fn spin_poll(&self) -> TransportResult<WorkSuccess> {
316        let mut wr = self.inner.borrow_mut();
317        let poll = wr.wr.spin_poll();
318        wr.user_polled_to_completion = true;
319        poll
320    }
321}