net/ffi/handle_guard.rs
1//! Per-FFI-handle quiescing protocol.
2//!
3//! Cortex and mesh FFI handles are typically `extern "C" fn(*mut Handle, ...)`.
4//! Without explicit synchronization between in-flight ops and the
5//! handle's `_free` entry point, a foreign caller (Go cgo / Python
6//! threads / Node.js workers) racing a `_free` against an active op
7//! produces:
8//!
9//! 1. **Use-after-free on the inner.** `_free` does
10//! `Box::from_raw(handle); drop(...)`; a concurrent op that already
11//! dereferenced `&*handle` keeps reading freed memory.
12//!
13//! 2. **Use-after-free on the handle box itself.** Even with the
14//! inner held alive via an `Arc<Inner>` clone (e.g.
15//! `MeshStreamHandle._node` keeps the node alive but not the
16//! handle box), a concurrent `_free` can deallocate the outer Box
17//! while the op is still doing pointer-equality / handle-matching
18//! checks via `&*handle`.
19//!
20//! [`crate::ffi::handle_guard::HandleGuard`] is the shared building
21//! block. Each handle struct embeds one inline; every `extern "C"` op
22//! gates on [`crate::ffi::handle_guard::HandleGuard::try_enter`];
23//! every `_free` drives
24//! [`crate::ffi::handle_guard::HandleGuard::begin_free`].
25//!
26//! ## Soundness: the box must outlive `try_enter`'s `fetch_add`
27//!
28//! The Dekker-style "set freeing, check active_ops" handshake orders
29//! only the atomic operations — `Box::from_raw` is a non-atomic
30//! deallocation and can interleave between an op's
31//! `&*handle` and the op's `fetch_add(active_ops)`, producing UAF on
//! the freed atomic. This is the same hazard the parent
//! [`crate::ffi::NetHandle`] addresses by intentionally leaking its
//! box. We adopt the same rule: **never deallocate the handle box
35//! once it has been handed to C.** `_free` instead takes the inner
36//! out via [`std::mem::ManuallyDrop`] and drops it; the outer box
37//! (carrying `HandleGuard`'s atomics + the now-empty
38//! `ManuallyDrop`) is leaked permanently. Concurrent ops doing
39//! `try_enter` after free safely fetch_add on still-valid memory,
40//! observe `freeing=true`, decrement, and bail.
41//!
//! The cost is one leaked handle allocation — `size_of::<Handle>()`
//! bytes — per `_free` call. Handle types are small (a few pointers +
//! atomics), so total leak grows with cumulative `open + free`
//! cycles — acceptable for the soundness gain.
46//!
47//! ## Adopting the guard on a NEW handle (checklist)
48//!
49//! The protocol is applied by convention at each call site rather
50//! than via a wrapper type (the inline form is uniform across the
51//! cortex / mesh / redis-dedup / aggregator handles). Miss any step
52//! and the UAF this module exists to prevent comes back, so when you
53//! add a handle:
54//!
55//! 1. Wrap every inner field in [`std::mem::ManuallyDrop`] and embed
56//! one [`crate::ffi::handle_guard::HandleGuard`] (not in
57//! `ManuallyDrop` — it must outlive the inner so post-free
58//! `try_enter` calls land on valid memory).
59//! 2. In every `extern "C"` op, gate on
60//! [`crate::ffi::handle_guard::HandleGuard::try_enter`] *before*
61//! touching any inner field, and return the handle's
62//! "invalid/shutting down" sentinel (NULL / error code) on `None`.
63//! 3. Don't hold the returned
64//! [`crate::ffi::handle_guard::HandleOp`] across a long blocking
65//! call — clone the (Arc-backed) inner out, drop the guard, then
66//! re-enter only to write back, so a concurrent `_free` never
67//! waits on a caller-set deadline (see `ffi::aggregator`).
68//! 4. In `_free`, gate the `ManuallyDrop::drop` of each inner field
69//! on [`crate::ffi::handle_guard::HandleGuard::begin_free`]
70//! returning `true`; never `Box::from_raw` — leak the box.
71//! 5. Any pointer handed back to C must be an owned copy, never a
72//! borrow into a `ManuallyDrop` field (which `_free` can drop).
73
74use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
75use std::time::{Duration, Instant};
76
/// Drain deadline intended for [`HandleGuard::begin_free`]: the longest a
/// `_free` waits for in-flight ops to finish before giving up.
///
/// If the wait times out, the caller must NOT take or drop the inner: a
/// concurrent op may still be reading it, so the inner is leaked together
/// with the box.
///
/// The value mirrors `FFI_SHUTDOWN_DEADLINE` on the parent
/// [`crate::ffi::NetHandle`]. Normal ops (ingest, append, snapshot, ...)
/// complete in well under a second, so five seconds rides out a transient
/// stall instead of leaking reflexively, while still bounding the wait on
/// a wedged adapter.
pub const FFI_HANDLE_FREE_DEADLINE: Duration = Duration::from_secs(5);
88
/// Result of [`HandleGuard::begin_free_detailed`].
///
/// Splitting the non-success cases apart lets a `_free` entry point log
/// accurately rather than treating every failure alike.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum BeginFree {
    /// The caller won the free race and all in-flight ops finished in
    /// time. This caller — and only this caller — may take/drop the inner,
    /// exactly once.
    Drained,
    /// An earlier `_free` already won for this handle (a benign repeat
    /// free). The inner belongs to that winner, so the caller must NOT
    /// touch it. This call leaks nothing, so no warning is warranted.
    AlreadyFreeing,
    /// The caller won the free race, but in-flight ops were still running
    /// when the deadline expired. The inner must be leaked because a
    /// concurrent op may still be reading it — this one deserves a warning.
    TimedOut,
}
105
/// Per-handle quiescing core, embedded inline in each FFI handle struct.
///
/// `try_enter` hands out a guard that keeps `_free` from completing until
/// the guard is dropped; `begin_free` blocks new ops and waits for the
/// in-flight ones to finish.
///
/// `Debug` is derived so that handle structs embedding a guard can derive
/// `Debug` themselves; the output is the two raw atomic values.
#[derive(Debug)]
pub struct HandleGuard {
    /// Becomes `true` once `_free` has been called for this handle, after
    /// which every `try_enter` bails. A plain flag (not a generation
    /// counter) suffices because a handle is never reused after free:
    /// once set, it is never cleared.
    freeing: AtomicBool,
    /// Count of ops currently holding a guard obtained from `try_enter`.
    /// `_free` waits (up to a deadline) for this to reach zero before it
    /// takes the inner.
    active_ops: AtomicU32,
}
121
122impl HandleGuard {
123 /// Construct an empty guard. Use as a `const` initializer when
124 /// possible.
125 pub const fn new() -> Self {
126 Self {
127 freeing: AtomicBool::new(false),
128 active_ops: AtomicU32::new(0),
129 }
130 }
131
132 /// Try to enter an FFI operation against this handle.
133 ///
134 /// Increments `active_ops` first so a concurrent `begin_free`
135 /// is forced to observe the increment OR to set `freeing` first
136 /// (they synchronize via SeqCst). After the increment, we
137 /// re-check `freeing`: if free is in progress, the op cannot
138 /// proceed and we decrement back out. Otherwise we return a
139 /// guard whose `Drop` decrements.
140 ///
141 /// Returns `None` if `_free` has already started — the caller
142 /// must surface a typed "shutting down / freed" error code and
143 /// MUST NOT touch any fields of the handle except this
144 /// `HandleGuard` (which lives in still-valid leaked memory).
145 pub fn try_enter(&self) -> Option<HandleOp<'_>> {
146 // SeqCst: pairs with `begin_free`'s SeqCst freeing-store.
147 // The total order ensures every (try_enter, begin_free)
148 // pair agrees on which side won — either we observe
149 // `freeing=true` (and bail), or `begin_free` observes
150 // `active_ops > 0` (and waits).
151 self.active_ops.fetch_add(1, Ordering::SeqCst);
152 if self.freeing.load(Ordering::SeqCst) {
153 self.active_ops.fetch_sub(1, Ordering::AcqRel);
154 None
155 } else {
156 Some(HandleOp { core: self })
157 }
158 }
159
160 /// Mark the handle as freeing and wait for in-flight ops to
161 /// drain. Returns `true` if THIS call won the race to flip
162 /// `freeing` AND in-flight ops drained within
163 /// [`FFI_HANDLE_FREE_DEADLINE`]. Returns `false` on timeout
164 /// OR if a prior caller already flipped `freeing`.
165 ///
166 /// **Single-winner contract.** Only ONE caller across the
167 /// lifetime of this guard ever sees `true`. That winning
168 /// caller is the one that owns the right to take the inner
169 /// out of `ManuallyDrop` exactly once. Subsequent callers
170 /// (whether concurrent or strictly after) see `false` and
171 /// MUST NOT touch the inner — the winner has it (or had it,
172 /// and dropped it).
173 ///
174 /// This is what makes `_free` idempotent: a second `_free`
175 /// call gates the `ManuallyDrop::take` behind this method's
176 /// `true` return, so it bails before the double-take that
177 /// would UAF the inner allocation.
178 ///
179 /// On timeout (winner observed `freeing=false→true` but
180 /// drain didn't complete), the caller must NOT take the
181 /// inner — concurrent ops may still be holding it. Leak
182 /// inner along with the box.
183 ///
184 /// Future `try_enter` calls will see `freeing=true` and bail,
185 /// regardless of whether the winner's drain succeeded, timed
186 /// out, or this caller is the loser. "No NEW ops will start"
187 /// is set as soon as the winner flips the flag.
188 pub fn begin_free(&self, deadline: Duration) -> bool {
189 matches!(self.begin_free_detailed(deadline), BeginFree::Drained)
190 }
191
192 /// Same protocol as [`Self::begin_free`] but distinguishes the
193 /// two `false` outcomes so a caller can log accurately: a benign
194 /// repeat `_free` ([`BeginFree::AlreadyFreeing`]) leaks nothing
195 /// and shouldn't warn, whereas a genuine drain timeout
196 /// ([`BeginFree::TimedOut`]) deliberately leaks the inner and is
197 /// worth a warning. The single-winner contract is unchanged:
198 /// exactly one caller ever sees [`BeginFree::Drained`].
199 pub fn begin_free_detailed(&self, deadline: Duration) -> BeginFree {
200 // compare_exchange so only one caller wins the right to
201 // flip false→true. Losers (whether racing concurrently
202 // or strictly after) get an Err and bail without ever
203 // entering the drain loop. SeqCst pairs with
204 // `try_enter`'s SeqCst load and matches the rest of the
205 // protocol's ordering. Pre-fix this was a `store(true)`
206 // which made every caller "win" — the cortex / mesh
207 // `_free` then double-took the inner via `ManuallyDrop::
208 // take`, UAF on the second call.
209 if self
210 .freeing
211 .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
212 .is_err()
213 {
214 return BeginFree::AlreadyFreeing;
215 }
216 let start = Instant::now();
217 // Spin-with-sleep is appropriate: ops are sub-second; the
218 // deadline catches pathological wedge cases. We don't have
219 // an OS-level wait primitive on the atomic without
220 // platform-specific atomic_wait (stable in Rust 1.89+ but
221 // a larger refactor); the 1ms sleep keeps CPU low while
222 // the deadline is large enough to absorb normal jitter.
223 while self.active_ops.load(Ordering::SeqCst) > 0 {
224 if start.elapsed() >= deadline {
225 return BeginFree::TimedOut;
226 }
227 std::thread::sleep(Duration::from_millis(1));
228 }
229 BeginFree::Drained
230 }
231
232 /// True if `begin_free` has been called for this handle.
233 /// Useful for assertions in tests; production paths should use
234 /// `try_enter` (which already gates on this).
235 #[cfg(test)]
236 pub fn is_freeing(&self) -> bool {
237 self.freeing.load(Ordering::SeqCst)
238 }
239}
240
241impl Default for HandleGuard {
242 fn default() -> Self {
243 Self::new()
244 }
245}
246
247/// RAII guard returned by [`HandleGuard::try_enter`]. While alive,
248/// `begin_free` is forced to wait — the in-flight count seen by
249/// `begin_free` includes this op.
250///
251/// Holds only a borrow of the [`HandleGuard`] (which lives in the
252/// leaked handle box, so the borrow is sound for any duration the
253/// op chooses). No public methods — drop is the only operation.
254pub struct HandleOp<'a> {
255 core: &'a HandleGuard,
256}
257
258impl Drop for HandleOp<'_> {
259 fn drop(&mut self) {
260 self.core.active_ops.fetch_sub(1, Ordering::AcqRel);
261 }
262}
263
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread::JoinHandle;

    /// Spawn a worker that enters an op on `guard` and keeps it open until
    /// the returned flag is set to `true`.
    ///
    /// Does not return until the worker is inside the op, so a following
    /// `begin_free` is guaranteed to find it in flight. Waiting a fixed
    /// interval instead would let a slow-starting worker lose to
    /// `begin_free`, panic on its `expect`, and flip the outcome under
    /// test.
    fn hold_op_until_released(guard: &Arc<HandleGuard>) -> (Arc<AtomicBool>, JoinHandle<()>) {
        let entered = Arc::new(AtomicBool::new(false));
        let release = Arc::new(AtomicBool::new(false));
        let worker = {
            let guard = Arc::clone(guard);
            let entered = Arc::clone(&entered);
            let release = Arc::clone(&release);
            std::thread::spawn(move || {
                let op = guard.try_enter().expect("op must enter");
                entered.store(true, Ordering::SeqCst);
                while !release.load(Ordering::SeqCst) {
                    std::thread::sleep(Duration::from_millis(1));
                }
                drop(op);
            })
        };
        while !entered.load(Ordering::SeqCst) {
            std::thread::yield_now();
        }
        (release, worker)
    }

    /// Pin: `try_enter` succeeds on a fresh guard; `Drop` decrements
    /// `active_ops` so a subsequent `begin_free` drains immediately.
    #[test]
    fn try_enter_succeeds_and_drop_decrements() {
        let g = HandleGuard::new();
        {
            let _op = g.try_enter().expect("fresh guard must accept ops");
            assert_eq!(g.active_ops.load(Ordering::SeqCst), 1);
        }
        assert_eq!(g.active_ops.load(Ordering::SeqCst), 0);
        assert!(g.begin_free(Duration::from_millis(50)));
    }

    /// Pin: `begin_free` flips `freeing` so subsequent `try_enter` calls
    /// bail with `None`.
    #[test]
    fn try_enter_after_free_returns_none() {
        let g = HandleGuard::new();
        assert!(g.begin_free(Duration::from_millis(50)));
        assert!(g.try_enter().is_none());
        // The bail path increments `active_ops` and then undoes that
        // increment, so a refused op must leave the count at zero.
        assert_eq!(g.active_ops.load(Ordering::SeqCst), 0);
    }

    /// A `_free` racing an in-flight op must wait for the op to finish
    /// before returning success. Without the guard, `_free` would be an
    /// unconditional `Box::from_raw` and the op's subsequent dereference
    /// would UAF.
    #[test]
    fn begin_free_waits_for_inflight_op() {
        let g = Arc::new(HandleGuard::new());
        let started = Arc::new(AtomicBool::new(false));

        // Worker holds an op for ~50ms and reports an instant taken while
        // the op was still held.
        let worker = {
            let g = Arc::clone(&g);
            let started = Arc::clone(&started);
            std::thread::spawn(move || {
                let op = g.try_enter().expect("op must enter before free");
                started.store(true, Ordering::SeqCst);
                std::thread::sleep(Duration::from_millis(50));
                let still_held_at = Instant::now();
                drop(op);
                still_held_at
            })
        };

        // Wait for the worker to enter the op so we don't race the
        // try_enter itself.
        while !started.load(Ordering::SeqCst) {
            std::thread::yield_now();
        }

        // begin_free MUST block until the op drops. The op is released
        // only after `still_held_at` is captured, so a correct begin_free
        // cannot return before that instant. Comparing monotonic instants
        // avoids a wall-clock threshold that scheduling jitter could miss.
        let drained = g.begin_free(Duration::from_secs(2));
        let returned_at = Instant::now();
        let still_held_at = worker.join().unwrap();

        assert!(drained, "begin_free must drain within deadline");
        assert!(
            returned_at >= still_held_at,
            "begin_free returned {:?} before the in-flight op was released",
            still_held_at.saturating_duration_since(returned_at),
        );
    }

    /// Pin: `begin_free` returns `false` on timeout when an op holds the
    /// guard past the deadline. Callers MUST leak the inner in this case
    /// rather than dropping it.
    #[test]
    fn begin_free_times_out_when_op_outlasts_deadline() {
        let g = Arc::new(HandleGuard::new());
        let (release, worker) = hold_op_until_released(&g);

        let drained = g.begin_free(Duration::from_millis(50));
        assert!(!drained, "deadline expired with op still in flight");
        // freeing is still set even on timeout — future ops bail.
        assert!(g.is_freeing());
        assert!(g.try_enter().is_none());

        // Let the worker finish so the test thread can join.
        release.store(true, Ordering::SeqCst);
        worker.join().unwrap();
    }

    /// Pin: exactly ONE caller wins the `begin_free` race, even under
    /// concurrent invocation. The single-winner contract is what makes the
    /// per-handle `_free` (which gates `ManuallyDrop::take` on `begin_free`
    /// returning `true`) idempotent — a second caller that also returned
    /// `true` would double-take the inner and UAF.
    ///
    /// A plain `store(true)` would let every caller see `true`; the
    /// `compare_exchange(false, true)` flips the flag exactly once and
    /// subsequent callers return `false`.
    #[test]
    fn begin_free_has_exactly_one_winner_under_concurrency() {
        const ROUNDS: usize = 32;
        for _ in 0..ROUNDS {
            let g = Arc::new(HandleGuard::new());
            let g1 = Arc::clone(&g);
            let g2 = Arc::clone(&g);
            let t1 = std::thread::spawn(move || g1.begin_free(Duration::from_millis(50)));
            let t2 = std::thread::spawn(move || g2.begin_free(Duration::from_millis(50)));
            let r1 = t1.join().unwrap();
            let r2 = t2.join().unwrap();
            assert!(
                r1 ^ r2,
                "exactly one caller must win begin_free; got r1={r1} r2={r2}",
            );
        }
    }

    /// Pin: a strictly-sequential second `begin_free` call after a
    /// successful first call returns `false`. This is the path every
    /// `_free` takes on a second invocation — the second caller must skip
    /// the `ManuallyDrop::take` branch.
    #[test]
    fn begin_free_returns_false_on_second_sequential_call() {
        let g = HandleGuard::new();
        assert!(g.begin_free(Duration::from_millis(50)));
        assert!(
            !g.begin_free(Duration::from_millis(50)),
            "second begin_free must bail — only the first caller \
             owns the right to take the inner",
        );
    }

    /// Pin: `begin_free_detailed` distinguishes the three outcomes, so
    /// `_free` can warn only on a genuine drain timeout and stay quiet on
    /// a benign repeat free.
    #[test]
    fn begin_free_detailed_distinguishes_outcomes() {
        // Fresh guard, no ops → Drained (the winner).
        let g = HandleGuard::new();
        assert_eq!(
            g.begin_free_detailed(Duration::from_millis(50)),
            BeginFree::Drained
        );
        // Second call → AlreadyFreeing (benign repeat, not a timeout).
        assert_eq!(
            g.begin_free_detailed(Duration::from_millis(50)),
            BeginFree::AlreadyFreeing
        );

        // Op held past the deadline → TimedOut for the winner.
        let g2 = Arc::new(HandleGuard::new());
        let (release, worker) = hold_op_until_released(&g2);
        assert_eq!(
            g2.begin_free_detailed(Duration::from_millis(40)),
            BeginFree::TimedOut
        );
        // After a timed-out winner, a later call is still AlreadyFreeing.
        assert_eq!(
            g2.begin_free_detailed(Duration::from_millis(40)),
            BeginFree::AlreadyFreeing
        );
        release.store(true, Ordering::SeqCst);
        worker.join().unwrap();
    }

    /// Pin: a second `begin_free` after a TIMED-OUT first call also returns
    /// `false`. The first caller's `compare_exchange` already flipped
    /// `freeing=true`, so the second caller observes the flag and bails —
    /// the inner that the timed-out caller left in place to be leaked must
    /// not be taken.
    #[test]
    fn begin_free_returns_false_after_timed_out_first_call() {
        let g = Arc::new(HandleGuard::new());
        let (release, worker) = hold_op_until_released(&g);

        // First call times out (op still in flight) — returns false but
        // freeing is set.
        assert!(!g.begin_free(Duration::from_millis(40)));

        // Let the op drain.
        release.store(true, Ordering::SeqCst);
        worker.join().unwrap();

        // Second call must still bail — the first call won the freeing
        // flag even though it timed out, so no second caller may claim the
        // right to take the inner.
        assert!(
            !g.begin_free(Duration::from_millis(50)),
            "second begin_free after a timed-out first call must bail",
        );
    }
}