net/ffi/handle_guard.rs
1//! Per-FFI-handle quiescing protocol.
2//!
//! Cortex and mesh FFI entry points typically have the shape
//! `extern "C" fn(*mut Handle, ...)`.
4//! Without explicit synchronization between in-flight ops and the
5//! handle's `_free` entry point, a foreign caller (Go cgo / Python
6//! threads / Node.js workers) racing a `_free` against an active op
7//! produces:
8//!
9//! 1. **Use-after-free on the inner.** `_free` does
10//! `Box::from_raw(handle); drop(...)`; a concurrent op that already
11//! dereferenced `&*handle` keeps reading freed memory.
12//!
13//! 2. **Use-after-free on the handle box itself.** Even with the
14//! inner held alive via an `Arc<Inner>` clone (e.g.
15//! `MeshStreamHandle._node` keeps the node alive but not the
16//! handle box), a concurrent `_free` can deallocate the outer Box
17//! while the op is still doing pointer-equality / handle-matching
18//! checks via `&*handle`.
19//!
20//! [`crate::ffi::handle_guard::HandleGuard`] is the shared building
21//! block. Each handle struct embeds one inline; every `extern "C"` op
22//! gates on [`crate::ffi::handle_guard::HandleGuard::try_enter`];
23//! every `_free` drives
24//! [`crate::ffi::handle_guard::HandleGuard::begin_free`].
25//!
26//! ## Soundness: the box must outlive `try_enter`'s `fetch_add`
27//!
28//! The Dekker-style "set freeing, check active_ops" handshake orders
29//! only the atomic operations — `Box::from_raw` is a non-atomic
30//! deallocation and can interleave between an op's
31//! `&*handle` and the op's `fetch_add(active_ops)`, producing UAF on
//! the freed atomic. This is the same hazard that the parent
33//! [`crate::ffi::NetHandle`] addresses by intentionally leaking its
34//! box. We adopt the same rule: **never deallocate the handle box
35//! once it has been handed to C.** `_free` instead takes the inner
36//! out via [`std::mem::ManuallyDrop`] and drops it; the outer box
37//! (carrying `HandleGuard`'s atomics + the now-empty
38//! `ManuallyDrop`) is leaked permanently. Concurrent ops doing
39//! `try_enter` after free safely fetch_add on still-valid memory,
40//! observe `freeing=true`, decrement, and bail.
41//!
//! The cost is one leaked handle allocation — `size_of::<Handle>()`
//! bytes, not merely the pointer-sized `Box` — per `_free` call. Handle
//! types are small (a few pointers + atomics), so the total leak grows
//! linearly with the number of cumulative `open + free` cycles —
//! acceptable for the soundness gain.
46
47use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
48use std::time::{Duration, Instant};
49
/// Upper bound on how long [`HandleGuard::begin_free`] is meant to
/// wait for in-flight ops to finish before giving up.
///
/// If the wait expires, the caller must NOT take or drop the inner
/// value: a concurrent op may still be reading it. In that case the
/// inner is leaked together with the handle box.
///
/// The five-second value mirrors `FFI_SHUTDOWN_DEADLINE` of the
/// parent [`crate::ffi::NetHandle`]. It sits far above normal op
/// latency (ingest, append, snapshot etc. are all sub-second), yet is
/// long enough that a transient stall in a wedged adapter does not
/// immediately force a leak.
pub const FFI_HANDLE_FREE_DEADLINE: Duration = Duration::from_millis(5_000);
61
/// Per-handle quiescing core, embedded inline in each handle struct.
///
/// `try_enter` hands out a guard that keeps `_free` from completing
/// until the guard is dropped; `begin_free` stops new ops from
/// starting and waits for the in-flight ones to finish.
///
/// `Debug` is derived so that handle structs embedding this guard can
/// derive `Debug` themselves; the output shows the current values of
/// both atomics.
#[derive(Debug)]
pub struct HandleGuard {
    /// Becomes `true` once `_free` has been called for this handle.
    /// Every later `try_enter` observes it and bails. A plain flag
    /// (rather than a generation counter) suffices because a handle
    /// is never re-used after free — once set, it is never cleared.
    freeing: AtomicBool,
    /// Count of ops currently holding a guard from `try_enter`.
    /// `_free` waits (up to a deadline) for this to drop to zero
    /// before it takes the inner.
    active_ops: AtomicU32,
}
77
78impl HandleGuard {
79 /// Construct an empty guard. Use as a `const` initializer when
80 /// possible.
81 pub const fn new() -> Self {
82 Self {
83 freeing: AtomicBool::new(false),
84 active_ops: AtomicU32::new(0),
85 }
86 }
87
88 /// Try to enter an FFI operation against this handle.
89 ///
90 /// Increments `active_ops` first so a concurrent `begin_free`
91 /// is forced to observe the increment OR to set `freeing` first
92 /// (they synchronize via SeqCst). After the increment, we
93 /// re-check `freeing`: if free is in progress, the op cannot
94 /// proceed and we decrement back out. Otherwise we return a
95 /// guard whose `Drop` decrements.
96 ///
97 /// Returns `None` if `_free` has already started — the caller
98 /// must surface a typed "shutting down / freed" error code and
99 /// MUST NOT touch any fields of the handle except this
100 /// `HandleGuard` (which lives in still-valid leaked memory).
101 pub fn try_enter(&self) -> Option<HandleOp<'_>> {
102 // SeqCst: pairs with `begin_free`'s SeqCst freeing-store.
103 // The total order ensures every (try_enter, begin_free)
104 // pair agrees on which side won — either we observe
105 // `freeing=true` (and bail), or `begin_free` observes
106 // `active_ops > 0` (and waits).
107 self.active_ops.fetch_add(1, Ordering::SeqCst);
108 if self.freeing.load(Ordering::SeqCst) {
109 self.active_ops.fetch_sub(1, Ordering::AcqRel);
110 None
111 } else {
112 Some(HandleOp { core: self })
113 }
114 }
115
116 /// Mark the handle as freeing and wait for in-flight ops to
117 /// drain. Returns `true` if THIS call won the race to flip
118 /// `freeing` AND in-flight ops drained within
119 /// [`FFI_HANDLE_FREE_DEADLINE`]. Returns `false` on timeout
120 /// OR if a prior caller already flipped `freeing`.
121 ///
122 /// **Single-winner contract.** Only ONE caller across the
123 /// lifetime of this guard ever sees `true`. That winning
124 /// caller is the one that owns the right to take the inner
125 /// out of `ManuallyDrop` exactly once. Subsequent callers
126 /// (whether concurrent or strictly after) see `false` and
127 /// MUST NOT touch the inner — the winner has it (or had it,
128 /// and dropped it).
129 ///
130 /// This is what makes `_free` idempotent: a second `_free`
131 /// call gates the `ManuallyDrop::take` behind this method's
132 /// `true` return, so it bails before the double-take that
133 /// would UAF the inner allocation.
134 ///
135 /// On timeout (winner observed `freeing=false→true` but
136 /// drain didn't complete), the caller must NOT take the
137 /// inner — concurrent ops may still be holding it. Leak
138 /// inner along with the box.
139 ///
140 /// Future `try_enter` calls will see `freeing=true` and bail,
141 /// regardless of whether the winner's drain succeeded, timed
142 /// out, or this caller is the loser. "No NEW ops will start"
143 /// is set as soon as the winner flips the flag.
144 pub fn begin_free(&self, deadline: Duration) -> bool {
145 // compare_exchange so only one caller wins the right to
146 // flip false→true. Losers (whether racing concurrently
147 // or strictly after) get an Err and bail without ever
148 // entering the drain loop. SeqCst pairs with
149 // `try_enter`'s SeqCst load and matches the rest of the
150 // protocol's ordering. Pre-fix this was a `store(true)`
151 // which made every caller "win" — the cortex / mesh
152 // `_free` then double-took the inner via `ManuallyDrop::
153 // take`, UAF on the second call.
154 if self
155 .freeing
156 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
157 .is_err()
158 {
159 return false;
160 }
161 let start = Instant::now();
162 // Spin-with-sleep is appropriate: ops are sub-second; the
163 // deadline catches pathological wedge cases. We don't have
164 // an OS-level wait primitive on the atomic without
165 // platform-specific atomic_wait (stable in Rust 1.89+ but
166 // a larger refactor); the 1ms sleep keeps CPU low while
167 // the deadline is large enough to absorb normal jitter.
168 while self.active_ops.load(Ordering::SeqCst) > 0 {
169 if start.elapsed() >= deadline {
170 return false;
171 }
172 std::thread::sleep(Duration::from_millis(1));
173 }
174 true
175 }
176
177 /// True if `begin_free` has been called for this handle.
178 /// Useful for assertions in tests; production paths should use
179 /// `try_enter` (which already gates on this).
180 #[cfg(test)]
181 pub fn is_freeing(&self) -> bool {
182 self.freeing.load(Ordering::SeqCst)
183 }
184}
185
186impl Default for HandleGuard {
187 fn default() -> Self {
188 Self::new()
189 }
190}
191
192/// RAII guard returned by [`HandleGuard::try_enter`]. While alive,
193/// `begin_free` is forced to wait — the in-flight count seen by
194/// `begin_free` includes this op.
195///
196/// Holds only a borrow of the [`HandleGuard`] (which lives in the
197/// leaked handle box, so the borrow is sound for any duration the
198/// op chooses). No public methods — drop is the only operation.
199pub struct HandleOp<'a> {
200 core: &'a HandleGuard,
201}
202
203impl Drop for HandleOp<'_> {
204 fn drop(&mut self) {
205 self.core.active_ops.fetch_sub(1, Ordering::AcqRel);
206 }
207}
208
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    /// Spawns a worker that enters an op on `guard` and holds it until
    /// `release` is set to `true`.
    ///
    /// Does not return until the worker has actually entered the op,
    /// so a `begin_free` issued afterwards is guaranteed to find the
    /// op in flight. This replaces "sleep a bit and hope the worker
    /// got scheduled", which is flaky on a loaded machine.
    fn spawn_held_op(
        guard: &Arc<HandleGuard>,
        release: &Arc<AtomicBool>,
    ) -> thread::JoinHandle<()> {
        let entered = Arc::new(AtomicBool::new(false));
        let worker = {
            let guard = Arc::clone(guard);
            let release = Arc::clone(release);
            let entered = Arc::clone(&entered);
            thread::spawn(move || {
                let op = guard.try_enter().expect("op must enter");
                entered.store(true, Ordering::SeqCst);
                while !release.load(Ordering::SeqCst) {
                    thread::sleep(Duration::from_millis(1));
                }
                drop(op);
            })
        };
        while !entered.load(Ordering::SeqCst) {
            thread::yield_now();
        }
        worker
    }

    /// Pin: `try_enter` succeeds on a fresh guard; `Drop`
    /// decrements `active_ops` so a subsequent `begin_free`
    /// drains immediately.
    #[test]
    fn try_enter_succeeds_and_drop_decrements() {
        let g = HandleGuard::new();
        {
            let _op = g.try_enter().expect("fresh guard must accept ops");
            assert_eq!(g.active_ops.load(Ordering::SeqCst), 1);
        }
        assert_eq!(g.active_ops.load(Ordering::SeqCst), 0);
        assert!(g.begin_free(Duration::from_millis(50)));
    }

    /// Pin: `begin_free` flips `freeing` so subsequent `try_enter`
    /// calls bail with `None`.
    #[test]
    fn try_enter_after_free_returns_none() {
        let g = HandleGuard::new();
        assert!(g.begin_free(Duration::from_millis(50)));
        assert!(g.try_enter().is_none());
        // A rejected `try_enter` increments `active_ops` and then
        // decrements it again before returning `None`, so the count
        // must be back at 0 — a rejected op leaves nothing behind.
        assert_eq!(g.active_ops.load(Ordering::SeqCst), 0);
    }

    /// A `_free` racing an in-flight op must wait for the op to
    /// finish before returning success. Without the guard, `_free`
    /// would be an unconditional `Box::from_raw` and the op's
    /// subsequent dereference would UAF.
    #[test]
    fn begin_free_waits_for_inflight_op() {
        let g = Arc::new(HandleGuard::new());

        // The worker holds an op for ~50ms and reports an instant at
        // which the op was provably still held.
        let g_op = Arc::clone(&g);
        let started = Arc::new(AtomicBool::new(false));
        let started_op = Arc::clone(&started);
        let worker = thread::spawn(move || {
            let op = g_op.try_enter().expect("op must enter before free");
            started_op.store(true, Ordering::SeqCst);
            thread::sleep(Duration::from_millis(50));
            // Taken while the op is still held: a correct
            // `begin_free` cannot return before this instant.
            let still_held_at = Instant::now();
            drop(op);
            still_held_at
        });

        // Wait for the worker to enter the op so we don't race the
        // try_enter itself.
        while !started.load(Ordering::SeqCst) {
            thread::yield_now();
        }

        // begin_free MUST block until the op drops. A free that
        // returned immediately would leave the op running while the
        // inner is dropped → UAF.
        let drained = g.begin_free(Duration::from_secs(2));
        let returned_at = Instant::now();
        let still_held_at = worker.join().unwrap();

        assert!(drained, "begin_free must drain within deadline");
        // `Instant` is monotonic, so this ordering check does not
        // depend on how the scheduler interleaves the two threads
        // (unlike comparing an elapsed time against a threshold).
        assert!(
            returned_at >= still_held_at,
            "begin_free returned {:?} before the in-flight op was released",
            still_held_at.duration_since(returned_at),
        );
    }

    /// Pin: `begin_free` returns `false` on timeout when an op
    /// holds the guard past the deadline. Callers MUST leak the
    /// inner in this case rather than dropping it.
    #[test]
    fn begin_free_times_out_when_op_outlasts_deadline() {
        let g = Arc::new(HandleGuard::new());
        let release = Arc::new(AtomicBool::new(false));
        // The op is guaranteed to be in flight once this returns and
        // stays in flight until `release` is set below.
        let worker = spawn_held_op(&g, &release);

        let drained = g.begin_free(Duration::from_millis(50));
        assert!(!drained, "deadline expired with op still in flight");
        // freeing is still set even on timeout — future ops bail.
        assert!(g.is_freeing());
        assert!(g.try_enter().is_none());

        // Let the worker finish so the test thread can join.
        release.store(true, Ordering::SeqCst);
        worker.join().unwrap();
    }

    /// Pin: exactly ONE caller wins the `begin_free` race, even
    /// under concurrent invocation. The single-winner contract
    /// is what makes the per-handle `_free` (which gates
    /// `ManuallyDrop::take` on `begin_free` returning `true`)
    /// idempotent — a second caller that also returned `true`
    /// would double-take the inner and UAF.
    ///
    /// A plain `store(true)` would let every caller see `true`;
    /// `compare_exchange(false, true)` flips the flag exactly once
    /// and all other callers return `false`.
    #[test]
    fn begin_free_has_exactly_one_winner_under_concurrency() {
        const ROUNDS: usize = 32;
        for _ in 0..ROUNDS {
            let g = Arc::new(HandleGuard::new());
            let g1 = Arc::clone(&g);
            let g2 = Arc::clone(&g);
            let t1 = thread::spawn(move || g1.begin_free(Duration::from_millis(50)));
            let t2 = thread::spawn(move || g2.begin_free(Duration::from_millis(50)));
            let r1 = t1.join().unwrap();
            let r2 = t2.join().unwrap();
            assert!(
                r1 ^ r2,
                "exactly one caller must win begin_free; got r1={r1} r2={r2}",
            );
        }
    }

    /// Pin: a strictly-sequential second `begin_free` call after
    /// a successful first call returns `false`. This is the path
    /// every `_free` takes on a second invocation — the second
    /// caller must skip the `ManuallyDrop::take` branch.
    #[test]
    fn begin_free_returns_false_on_second_sequential_call() {
        let g = HandleGuard::new();
        assert!(g.begin_free(Duration::from_millis(50)));
        assert!(
            !g.begin_free(Duration::from_millis(50)),
            "second begin_free must bail — only the first caller \
             owns the right to take the inner",
        );
    }

    /// Pin: a second `begin_free` after a TIMED-OUT first call
    /// also returns `false`. The first caller's
    /// `compare_exchange` already flipped `freeing=true`, so the
    /// second caller observes the flag and bails — the inner that
    /// the timed-out caller left in place to be leaked must not be
    /// taken by anyone else.
    #[test]
    fn begin_free_returns_false_after_timed_out_first_call() {
        let g = Arc::new(HandleGuard::new());
        let release = Arc::new(AtomicBool::new(false));
        let worker = spawn_held_op(&g, &release);

        // First call times out (op still in flight) — returns false
        // but freeing is set.
        assert!(!g.begin_free(Duration::from_millis(40)));

        // Let the op drain.
        release.store(true, Ordering::SeqCst);
        worker.join().unwrap();

        // Second call must still bail — the first call won the
        // freeing flag even though it timed out, so no second
        // caller may claim the right to take the inner.
        assert!(
            !g.begin_free(Duration::from_millis(50)),
            "second begin_free after a timed-out first call must bail",
        );
    }
}
392}