ibverbs_rs/channel/polling_scope.rs
1use crate::channel::pending_work::PendingWork;
2use crate::channel::{Channel, TransportError, TransportResult};
3use crate::ibverbs::error::IbvResult;
4use crate::ibverbs::protection_domain::ProtectionDomain;
5use crate::ibverbs::work::{
6 ReadWorkRequest, ReceiveWorkRequest, SendWorkRequest, WorkSuccess, WriteWorkRequest,
7};
8use std::cell::RefCell;
9use std::marker::PhantomData;
10use std::panic::{AssertUnwindSafe, catch_unwind, resume_unwind};
11use std::rc::Rc;
12use thiserror::Error;
13
14impl Channel {
15 /// Opens a polling scope that automatically polls all outstanding work requests when it ends.
16 ///
17 /// This is the primary safe way to perform RDMA operations. The closure receives a
18 /// [`PollingScope`] through which operations can be posted. When the closure returns,
19 /// any work requests that were not manually polled are automatically polled to completion
20 /// before this method returns.
21 ///
22 /// See [`manual_scope`](Self::manual_scope) for a variant that enforces manual polling
23 /// and returns the user's error type directly.
24 ///
25 /// # Error handling
26 ///
27 /// * If the closure returns `Err(E)`, the scope still auto-polls all outstanding work,
28 /// then returns `ScopeError::ClosureError(E)`.
29 /// * If the closure returns `Ok(T)` but auto-polling encounters transport errors,
30 /// returns `ScopeError::AutoPollError(...)`.
31 /// * If the closure panics, outstanding work is still polled for cleanup before
32 /// the panic is resumed.
33 pub fn scope<'env, F, T, E>(&'env mut self, f: F) -> Result<T, ScopeError<E>>
34 where
35 F: for<'scope> FnOnce(&mut PollingScope<'scope, 'env, Channel>) -> Result<T, E>,
36 {
37 PollingScope::run(self, f)
38 }
39
40 /// Opens a polling scope that enforces manual polling of all work requests.
41 ///
42 /// Both [`scope`](Self::scope) and `manual_scope` allow the user to poll work manually.
43 /// The difference is that `manual_scope` makes this the **contract**: it returns
44 /// `Result<T, E>` directly instead of wrapping it in [`ScopeError`], avoiding
45 /// unnecessary error handling. If the closure succeeds but leaves work unpolled,
46 /// this method panics as a safety net.
47 ///
48 /// If the closure returns an error, outstanding work is still cleaned up without panicking.
49 pub fn manual_scope<'env, F, T, E>(&'env mut self, f: F) -> Result<T, E>
50 where
51 F: for<'scope> FnOnce(&mut PollingScope<'scope, 'env, Channel>) -> Result<T, E>,
52 {
53 PollingScope::run_manual(self, f)
54 }
55}
56
57impl<'scope, 'env> PollingScope<'scope, 'env, Channel> {
58 /// Returns a reference to the [`ProtectionDomain`] of the underlying channel.
59 pub fn pd(&self) -> &ProtectionDomain {
60 self.inner.pd()
61 }
62}
63
64/// Convenience alias for a [`Result`] with [`ScopeError`] as the default error type.
65pub type ScopeResult<T> = Result<T, ScopeError>;
66
67/// Error returned by [`Channel::scope`].
68///
69/// Distinguishes between errors originating from the user's closure and errors
70/// discovered during automatic polling of outstanding work requests.
71#[derive(Debug, Error)]
72pub enum ScopeError<E = TransportError> {
73 /// The user's closure returned an error.
74 #[error("Closure error: {0}")]
75 ClosureError(#[from] E),
76 /// The closure succeeded, but one or more auto-polled work requests failed.
77 #[error("Auto poll error: {0:?}")]
78 AutoPollError(Vec<TransportError>),
79}
80
81impl<'a, 'b, C> PollingScope<'a, 'b, C> {
82 /// Runs a closure inside an auto-polling scope, similar to [`std::thread::scope`].
83 ///
84 /// Work requests created inside the closure are tracked internally. When the closure
85 /// returns, any that were not manually polled are automatically polled to completion.
86 /// The user cannot leak work requests to escape the lifetime, because the handles are
87 /// stored in a private structure owned by the scope.
88 ///
89 /// # Lifetimes
90 ///
91 /// * `'scope` — The lifetime of the scope itself. New operations may be posted and may
92 /// still be running during this period. It begins before the closure runs and ends
93 /// after all outstanding work has been polled, but before this method returns.
94 /// * `'env` — The lifetime of data borrowed by the operations. Must outlive `'scope`,
95 /// meaning anything alive at the call site (e.g. local variables) can be borrowed.
96 pub(crate) fn run<'env, F, T, E>(inner: &'env mut C, f: F) -> Result<T, ScopeError<E>>
97 where
98 F: for<'scope> FnOnce(&mut PollingScope<'scope, 'env, C>) -> Result<T, E>,
99 {
100 let mut scope = PollingScope::new(inner);
101 // The user's closure may panic after issuing work requests.
102 // The panic has to be caught to ensure clean up for exception safety.
103 let scope_result = catch_unwind(AssertUnwindSafe(|| f(&mut scope)));
104 let auto_poll_result = scope.auto_poll();
105
106 match scope_result {
107 Ok(closure_result) => match closure_result {
108 Err(closure_error) => Err(ScopeError::ClosureError(closure_error)),
109 Ok(closure_output) => match auto_poll_result {
110 Ok(_) => Ok(closure_output),
111 Err(error) => Err(ScopeError::AutoPollError(error)),
112 },
113 },
114 Err(panic) => resume_unwind(panic),
115 }
116 }
117
118 /// Runs a closure inside a strict polling scope.
119 ///
120 /// If the closure succeeds but any work requests were left unpolled, this method panics.
121 /// If the closure fails, outstanding work is still cleaned up without panicking.
122 pub(crate) fn run_manual<'env, F, T, E>(inner: &'env mut C, f: F) -> Result<T, E>
123 where
124 F: for<'scope> FnOnce(&mut PollingScope<'scope, 'env, C>) -> Result<T, E>,
125 {
126 let mut scope = PollingScope::new(inner);
127 // The user's closure may panic after issuing work requests.
128 // The panic has to be caught to ensure clean up for exception safety.
129 let scope_result = catch_unwind(AssertUnwindSafe(|| f(&mut scope)));
130 let auto_poll_result = scope.auto_poll();
131
132 match scope_result {
133 Ok(closure_result) => {
134 let closure_output = closure_result?;
135 match auto_poll_result {
136 Ok(AutoPollSuccess::NoPendingWorks) => Ok(closure_output),
137 Ok(AutoPollSuccess::PendingWorksSucceeded) | Err(_) => {
138 panic!("Unpolled wrs in PollingScope::run_manual")
139 }
140 }
141 }
142 Err(panic) => resume_unwind(panic),
143 }
144 }
145}
146
147/// A scoped context for posting RDMA operations with automatic lifetime safety.
148///
149/// Created by [`Channel::scope`] or [`Channel::manual_scope`]. Operations posted through
150/// a `PollingScope` return [`ScopedPendingWork`] handles that borrow the data buffers for
151/// `'scope`, preventing aliasing while the hardware is accessing them.
152///
153/// When the scope ends, all unpolled work is automatically polled to completion, ensuring
154/// that buffers are not released while the NIC may still be performing DMA.
155///
156/// This design mirrors [`std::thread::scope`] — the scope owns the work request handles
157/// internally, so the user cannot leak them via [`std::mem::forget`].
158pub struct PollingScope<'scope, 'env: 'scope, C> {
159 pub(crate) inner: &'env mut C,
160 wrs: Vec<ScopedPendingWork<'scope>>,
161 // for invariance of lifetimes, see `std::thread::scope`
162 scope: PhantomData<&'scope mut &'scope ()>,
163 env: PhantomData<&'env mut &'env ()>,
164}
165
166impl<'scope, 'env, C> PollingScope<'scope, 'env, C> {
167 pub(super) fn new(inner: &'env mut C) -> Self {
168 PollingScope {
169 inner,
170 wrs: vec![],
171 scope: PhantomData,
172 env: PhantomData,
173 }
174 }
175
176 // Important to notice. *Auto-poll does not fail*. The returned result represents the outcome
177 // of the polled work requests during clean up. If it errors, it means some of the work
178 // requests failed.
179 // Auto polls all non manually polled work requests issued during the closure.
180 fn auto_poll(self) -> AutoPollResult {
181 let mut auto_polled = false;
182 let mut transport_errors = Vec::new();
183
184 for wr in self.wrs {
185 let mut wr = wr.inner.borrow_mut();
186 // Only raise error into the auto polled if not polled by the user
187 if !wr.user_polled_to_completion {
188 auto_polled = true; // Mark that user left some wrs unpolled
189 if let Err(transport_error) = wr.wr.spin_poll() {
190 transport_errors.push(transport_error);
191 }
192 }
193 }
194
195 // If everything goes well, no heap allocation
196 if !auto_polled {
197 Ok(AutoPollSuccess::NoPendingWorks)
198 } else {
199 if transport_errors.is_empty() {
200 Ok(AutoPollSuccess::PendingWorksSucceeded)
201 } else {
202 Err(transport_errors)
203 }
204 }
205 }
206}
207
208type AutoPollResult = Result<AutoPollSuccess, Vec<TransportError>>;
209
/// Successful outcomes of auto-polling a scope.
enum AutoPollSuccess {
    /// Every work request had already been polled to completion by the user, so nothing
    /// was auto-polled.
    NoPendingWorks,
    /// Some work requests were left unpolled, and all of them succeeded when auto-polled.
    PendingWorksSucceeded,
}
214
215impl<'scope, 'env, C> PollingScope<'scope, 'env, C> {
216 // The slice cannot be used again until the work request is consumed,
217 // so no overlapping operations can be done concurrently
218 pub(crate) fn channel_post_send<F>(
219 &mut self,
220 channel_selector: F,
221 wr: SendWorkRequest<'_, 'env>,
222 ) -> IbvResult<ScopedPendingWork<'scope>>
223 where
224 F: FnOnce(&mut C) -> IbvResult<&mut Channel>,
225 {
226 let channel = channel_selector(self.inner)?;
227 let wr = ScopedPendingWork::new(unsafe { channel.send_unpolled(wr)? });
228 self.wrs.push(wr.clone());
229 Ok(wr)
230 }
231
232 // The slice cannot be used again until the work request is consumed,
233 // so no overlapping operations can be done concurrently
234 pub(crate) fn channel_post_receive<F>(
235 &mut self,
236 channel_selector: F,
237 wr: ReceiveWorkRequest<'_, 'env>,
238 ) -> IbvResult<ScopedPendingWork<'scope>>
239 where
240 F: FnOnce(&mut C) -> IbvResult<&mut Channel>,
241 {
242 let channel = channel_selector(self.inner)?;
243 let wr = ScopedPendingWork::new(unsafe { channel.receive_unpolled(wr)? });
244 self.wrs.push(wr.clone());
245 Ok(wr)
246 }
247
248 pub(crate) fn channel_post_write<F>(
249 &mut self,
250 channel_selector: F,
251 wr: WriteWorkRequest<'_, 'env>,
252 ) -> IbvResult<ScopedPendingWork<'scope>>
253 where
254 F: FnOnce(&mut C) -> IbvResult<&mut Channel>,
255 {
256 let channel = channel_selector(self.inner)?;
257 let wr = ScopedPendingWork::new(unsafe { channel.write_unpolled(wr)? });
258 self.wrs.push(wr.clone());
259 Ok(wr)
260 }
261
262 pub(crate) fn channel_post_read<F>(
263 &mut self,
264 channel_selector: F,
265 wr: ReadWorkRequest<'_, 'env>,
266 ) -> IbvResult<ScopedPendingWork<'scope>>
267 where
268 F: FnOnce(&mut C) -> IbvResult<&mut Channel>,
269 {
270 let channel = channel_selector(self.inner)?;
271 let wr = ScopedPendingWork::new(unsafe { channel.read_unpolled(wr)? });
272 self.wrs.push(wr.clone());
273 Ok(wr)
274 }
275}
276
277/// A handle to a pending RDMA operation within a [`PollingScope`].
278///
279/// This handle can be used to manually poll for completion. If not polled by the time
280/// the scope ends, the operation will be auto-polled.
281#[derive(Debug, Clone)]
282pub struct ScopedPendingWork<'scope> {
283 inner: Rc<RefCell<ScopedPendingWorkInner<'scope>>>,
284 env: PhantomData<&'scope mut &'scope ()>,
285}
286
287#[derive(Debug)]
288struct ScopedPendingWorkInner<'scope> {
289 user_polled_to_completion: bool,
290 wr: PendingWork<'scope>,
291}
292
293impl<'scope> ScopedPendingWork<'scope> {
294 fn new(wr: PendingWork<'scope>) -> Self {
295 ScopedPendingWork {
296 inner: Rc::new(RefCell::new(ScopedPendingWorkInner {
297 user_polled_to_completion: false,
298 wr,
299 })),
300 env: PhantomData,
301 }
302 }
303
304 /// Checks if the operation has completed.
305 ///
306 /// Returns `None` if the operation is still in progress, or `Some(result)` once complete.
307 pub fn poll(&self) -> Option<TransportResult<WorkSuccess>> {
308 let mut wr = self.inner.borrow_mut();
309 let poll = wr.wr.poll()?;
310 wr.user_polled_to_completion = true;
311 Some(poll)
312 }
313
314 /// Busy-waits until the operation completes and returns the result.
315 pub fn spin_poll(&self) -> TransportResult<WorkSuccess> {
316 let mut wr = self.inner.borrow_mut();
317 let poll = wr.wr.spin_poll();
318 wr.user_polled_to_completion = true;
319 poll
320 }
321}