Skip to main content

net/ffi/
handle_guard.rs

1//! Per-FFI-handle quiescing protocol.
2//!
3//! Cortex and mesh FFI handles are typically `extern "C" fn(*mut Handle, ...)`.
4//! Without explicit synchronization between in-flight ops and the
5//! handle's `_free` entry point, a foreign caller (Go cgo / Python
6//! threads / Node.js workers) racing a `_free` against an active op
7//! produces:
8//!
9//! 1. **Use-after-free on the inner.** `_free` does
10//!    `Box::from_raw(handle); drop(...)`; a concurrent op that already
11//!    dereferenced `&*handle` keeps reading freed memory.
12//!
13//! 2. **Use-after-free on the handle box itself.** Even with the
14//!    inner held alive via an `Arc<Inner>` clone (e.g.
15//!    `MeshStreamHandle._node` keeps the node alive but not the
16//!    handle box), a concurrent `_free` can deallocate the outer Box
17//!    while the op is still doing pointer-equality / handle-matching
18//!    checks via `&*handle`.
19//!
20//! [`crate::ffi::handle_guard::HandleGuard`] is the shared building
21//! block. Each handle struct embeds one inline; every `extern "C"` op
22//! gates on [`crate::ffi::handle_guard::HandleGuard::try_enter`];
23//! every `_free` drives
24//! [`crate::ffi::handle_guard::HandleGuard::begin_free`].
25//!
26//! ## Soundness: the box must outlive `try_enter`'s `fetch_add`
27//!
28//! The Dekker-style "set freeing, check active_ops" handshake orders
29//! only the atomic operations — `Box::from_raw` is a non-atomic
30//! deallocation and can interleave between an op's
31//! `&*handle` and the op's `fetch_add(active_ops)`, producing UAF on
32//! the freed atomic. The same hazard the parent
33//! [`crate::ffi::NetHandle`] addresses by intentionally leaking its
34//! box. We adopt the same rule: **never deallocate the handle box
35//! once it has been handed to C.** `_free` instead takes the inner
36//! out via [`std::mem::ManuallyDrop`] and drops it; the outer box
37//! (carrying `HandleGuard`'s atomics + the now-empty
38//! `ManuallyDrop`) is leaked permanently. Concurrent ops doing
39//! `try_enter` after free safely fetch_add on still-valid memory,
40//! observe `freeing=true`, decrement, and bail.
41//!
42//! The cost is `size_of::<Box<Handle>>()` per `_free` call. Handle
43//! types are small (a few pointers + atomics), so total leak grows
44//! with cumulative `open + free` cycles — acceptable for the
45//! soundness gain.
46
47use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
48use std::time::{Duration, Instant};
49
50/// How long [`HandleGuard::begin_free`] will wait for in-flight ops
51/// to drain before giving up. On timeout, the caller must NOT take
52/// or drop the inner — concurrent ops may still be reading it. The
53/// inner is leaked along with the box.
54///
55/// Five seconds matches the parent [`crate::ffi::NetHandle`]'s
56/// `FFI_SHUTDOWN_DEADLINE`; well above any normal op latency
57/// (ingest, append, snapshot etc. are all sub-second), large enough
58/// to absorb a wedged adapter without reflexively leaking on a
59/// transient stall.
60pub const FFI_HANDLE_FREE_DEADLINE: Duration = Duration::from_secs(5);
61
62/// Per-handle quiescing core. Lives inline inside each handle
63/// struct. `try_enter` returns a guard that prevents `_free` from
64/// completing until dropped; `begin_free` quiesces in-flight ops
65/// and prevents new ones.
66pub struct HandleGuard {
67    /// Set to `true` once `_free` has been called for this handle.
68    /// All future `try_enter` calls observe this and bail. Stored
69    /// as `AtomicBool` (not a generation counter) because we never
70    /// re-use the handle after free — once flipped, never reset.
71    freeing: AtomicBool,
72    /// Number of in-flight ops currently inside `try_enter`'s guard.
73    /// `_free` waits for this to reach zero (with a deadline) before
74    /// taking the inner.
75    active_ops: AtomicU32,
76}
77
78impl HandleGuard {
79    /// Construct an empty guard. Use as a `const` initializer when
80    /// possible.
81    pub const fn new() -> Self {
82        Self {
83            freeing: AtomicBool::new(false),
84            active_ops: AtomicU32::new(0),
85        }
86    }
87
88    /// Try to enter an FFI operation against this handle.
89    ///
90    /// Increments `active_ops` first so a concurrent `begin_free`
91    /// is forced to observe the increment OR to set `freeing` first
92    /// (they synchronize via SeqCst). After the increment, we
93    /// re-check `freeing`: if free is in progress, the op cannot
94    /// proceed and we decrement back out. Otherwise we return a
95    /// guard whose `Drop` decrements.
96    ///
97    /// Returns `None` if `_free` has already started — the caller
98    /// must surface a typed "shutting down / freed" error code and
99    /// MUST NOT touch any fields of the handle except this
100    /// `HandleGuard` (which lives in still-valid leaked memory).
101    pub fn try_enter(&self) -> Option<HandleOp<'_>> {
102        // SeqCst: pairs with `begin_free`'s SeqCst freeing-store.
103        // The total order ensures every (try_enter, begin_free)
104        // pair agrees on which side won — either we observe
105        // `freeing=true` (and bail), or `begin_free` observes
106        // `active_ops > 0` (and waits).
107        self.active_ops.fetch_add(1, Ordering::SeqCst);
108        if self.freeing.load(Ordering::SeqCst) {
109            self.active_ops.fetch_sub(1, Ordering::AcqRel);
110            None
111        } else {
112            Some(HandleOp { core: self })
113        }
114    }
115
116    /// Mark the handle as freeing and wait for in-flight ops to
117    /// drain. Returns `true` if THIS call won the race to flip
118    /// `freeing` AND in-flight ops drained within
119    /// [`FFI_HANDLE_FREE_DEADLINE`]. Returns `false` on timeout
120    /// OR if a prior caller already flipped `freeing`.
121    ///
122    /// **Single-winner contract.** Only ONE caller across the
123    /// lifetime of this guard ever sees `true`. That winning
124    /// caller is the one that owns the right to take the inner
125    /// out of `ManuallyDrop` exactly once. Subsequent callers
126    /// (whether concurrent or strictly after) see `false` and
127    /// MUST NOT touch the inner — the winner has it (or had it,
128    /// and dropped it).
129    ///
130    /// This is what makes `_free` idempotent: a second `_free`
131    /// call gates the `ManuallyDrop::take` behind this method's
132    /// `true` return, so it bails before the double-take that
133    /// would UAF the inner allocation.
134    ///
135    /// On timeout (winner observed `freeing=false→true` but
136    /// drain didn't complete), the caller must NOT take the
137    /// inner — concurrent ops may still be holding it. Leak
138    /// inner along with the box.
139    ///
140    /// Future `try_enter` calls will see `freeing=true` and bail,
141    /// regardless of whether the winner's drain succeeded, timed
142    /// out, or this caller is the loser. "No NEW ops will start"
143    /// is set as soon as the winner flips the flag.
144    pub fn begin_free(&self, deadline: Duration) -> bool {
145        // compare_exchange so only one caller wins the right to
146        // flip false→true. Losers (whether racing concurrently
147        // or strictly after) get an Err and bail without ever
148        // entering the drain loop. SeqCst pairs with
149        // `try_enter`'s SeqCst load and matches the rest of the
150        // protocol's ordering. Pre-fix this was a `store(true)`
151        // which made every caller "win" — the cortex / mesh
152        // `_free` then double-took the inner via `ManuallyDrop::
153        // take`, UAF on the second call.
154        if self
155            .freeing
156            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
157            .is_err()
158        {
159            return false;
160        }
161        let start = Instant::now();
162        // Spin-with-sleep is appropriate: ops are sub-second; the
163        // deadline catches pathological wedge cases. We don't have
164        // an OS-level wait primitive on the atomic without
165        // platform-specific atomic_wait (stable in Rust 1.89+ but
166        // a larger refactor); the 1ms sleep keeps CPU low while
167        // the deadline is large enough to absorb normal jitter.
168        while self.active_ops.load(Ordering::SeqCst) > 0 {
169            if start.elapsed() >= deadline {
170                return false;
171            }
172            std::thread::sleep(Duration::from_millis(1));
173        }
174        true
175    }
176
177    /// True if `begin_free` has been called for this handle.
178    /// Useful for assertions in tests; production paths should use
179    /// `try_enter` (which already gates on this).
180    #[cfg(test)]
181    pub fn is_freeing(&self) -> bool {
182        self.freeing.load(Ordering::SeqCst)
183    }
184}
185
186impl Default for HandleGuard {
187    fn default() -> Self {
188        Self::new()
189    }
190}
191
192/// RAII guard returned by [`HandleGuard::try_enter`]. While alive,
193/// `begin_free` is forced to wait — the in-flight count seen by
194/// `begin_free` includes this op.
195///
196/// Holds only a borrow of the [`HandleGuard`] (which lives in the
197/// leaked handle box, so the borrow is sound for any duration the
198/// op chooses). No public methods — drop is the only operation.
199pub struct HandleOp<'a> {
200    core: &'a HandleGuard,
201}
202
203impl Drop for HandleOp<'_> {
204    fn drop(&mut self) {
205        self.core.active_ops.fetch_sub(1, Ordering::AcqRel);
206    }
207}
208
209#[cfg(test)]
210mod tests {
211    use super::*;
212    use std::sync::Arc;
213
214    /// Pin: `try_enter` succeeds on a fresh guard; `Drop`
215    /// decrements `active_ops` so a subsequent `begin_free`
216    /// drains immediately.
217    #[test]
218    fn try_enter_succeeds_and_drop_decrements() {
219        let g = HandleGuard::new();
220        {
221            let _op = g.try_enter().expect("fresh guard must accept ops");
222            assert_eq!(g.active_ops.load(Ordering::SeqCst), 1);
223        }
224        assert_eq!(g.active_ops.load(Ordering::SeqCst), 0);
225        assert!(g.begin_free(Duration::from_millis(50)));
226    }
227
228    /// Pin: `begin_free` flips `freeing` so subsequent `try_enter`
229    /// calls bail with `None`.
230    #[test]
231    fn try_enter_after_free_returns_none() {
232        let g = HandleGuard::new();
233        assert!(g.begin_free(Duration::from_millis(50)));
234        assert!(g.try_enter().is_none());
235        // No-op leak: active_ops was already 0 + nothing increments
236        // it on a None return path.
237        assert_eq!(g.active_ops.load(Ordering::SeqCst), 0);
238    }
239
240    /// A `_free` racing an in-flight op must wait for the op to
241    /// finish before returning success. Without the guard, `_free`
242    /// would be an unconditional `Box::from_raw` and the op's
243    /// subsequent dereference would UAF.
244    #[test]
245    fn begin_free_waits_for_inflight_op() {
246        let g = Arc::new(HandleGuard::new());
247
248        // Spawn a worker that holds an op for ~50ms.
249        let g_op = g.clone();
250        let started = Arc::new(AtomicBool::new(false));
251        let started_op = started.clone();
252        let worker = std::thread::spawn(move || {
253            let op = g_op.try_enter().expect("op must enter before free");
254            started_op.store(true, Ordering::SeqCst);
255            std::thread::sleep(Duration::from_millis(50));
256            drop(op);
257        });
258
259        // Wait for the worker to enter the op so we don't race the
260        // try_enter itself.
261        while !started.load(Ordering::SeqCst) {
262            std::thread::yield_now();
263        }
264
265        // begin_free MUST block until the op drops. A pre-fix free
266        // would return immediately with the op still running →
267        // subsequent inner-drop UAFs the op.
268        let t0 = Instant::now();
269        let drained = g.begin_free(Duration::from_secs(2));
270        let elapsed = t0.elapsed();
271        assert!(drained, "begin_free must drain within deadline");
272        assert!(
273            elapsed >= Duration::from_millis(40),
274            "begin_free returned in {:?} — must have waited for the in-flight op",
275            elapsed,
276        );
277        worker.join().unwrap();
278    }
279
280    /// Pin: `begin_free` returns `false` on timeout when an op
281    /// holds the guard past the deadline. Callers MUST leak the
282    /// inner in this case rather than dropping it.
283    #[test]
284    fn begin_free_times_out_when_op_outlasts_deadline() {
285        let g = Arc::new(HandleGuard::new());
286        let g_op = g.clone();
287        let release = Arc::new(AtomicBool::new(false));
288        let release_op = release.clone();
289        let worker = std::thread::spawn(move || {
290            let op = g_op.try_enter().expect("op must enter");
291            while !release_op.load(Ordering::SeqCst) {
292                std::thread::sleep(Duration::from_millis(1));
293            }
294            drop(op);
295        });
296
297        // Brief sleep to let the worker enter; deadline is shorter
298        // than the worker's hold time.
299        std::thread::sleep(Duration::from_millis(20));
300        let drained = g.begin_free(Duration::from_millis(50));
301        assert!(!drained, "deadline expired with op still in flight");
302        // freeing is still set even on timeout — future ops bail.
303        assert!(g.is_freeing());
304        assert!(g.try_enter().is_none());
305
306        // Let the worker finish so the test thread can join.
307        release.store(true, Ordering::SeqCst);
308        worker.join().unwrap();
309    }
310
311    /// Pin: exactly ONE caller wins the `begin_free` race, even
312    /// under concurrent invocation. The single-winner contract
313    /// is what makes the per-handle `_free` (which gates
314    /// `ManuallyDrop::take` on `begin_free` returning `true`)
315    /// idempotent — a second caller that also returned `true`
316    /// would double-take the inner and UAF.
317    ///
318    /// Pre-fix `begin_free` did a plain `store(true)` so every
319    /// caller saw `true` and every `_free` re-took the inner.
320    /// The post-fix `compare_exchange(false, true)` flips the
321    /// flag exactly once and subsequent callers return `false`.
322    #[test]
323    fn begin_free_has_exactly_one_winner_under_concurrency() {
324        const ROUNDS: usize = 32;
325        for _ in 0..ROUNDS {
326            let g = Arc::new(HandleGuard::new());
327            let g1 = g.clone();
328            let g2 = g.clone();
329            let t1 = std::thread::spawn(move || g1.begin_free(Duration::from_millis(50)));
330            let t2 = std::thread::spawn(move || g2.begin_free(Duration::from_millis(50)));
331            let r1 = t1.join().unwrap();
332            let r2 = t2.join().unwrap();
333            assert!(
334                r1 ^ r2,
335                "exactly one caller must win begin_free; got r1={r1} r2={r2}",
336            );
337        }
338    }
339
340    /// Pin: a strictly-sequential second `begin_free` call after
341    /// a successful first call returns `false`. This is the path
342    /// every `_free` takes on a second invocation — the second
343    /// caller must skip the `ManuallyDrop::take` branch.
344    #[test]
345    fn begin_free_returns_false_on_second_sequential_call() {
346        let g = HandleGuard::new();
347        assert!(g.begin_free(Duration::from_millis(50)));
348        assert!(
349            !g.begin_free(Duration::from_millis(50)),
350            "second begin_free must bail — only the first caller \
351             owns the right to take the inner",
352        );
353    }
354
355    /// Pin: a second `begin_free` after a TIMED-OUT first call
356    /// also returns `false`. The first caller's
357    /// `compare_exchange` already flipped `freeing=true`, so the
358    /// second caller observes the flag and bails — the
359    /// already-taken inner (or inner that the timed-out caller
360    /// left in place to be leaked) must not be re-taken.
361    #[test]
362    fn begin_free_returns_false_after_timed_out_first_call() {
363        let g = Arc::new(HandleGuard::new());
364        let g_op = g.clone();
365        let release = Arc::new(AtomicBool::new(false));
366        let release_op = release.clone();
367        let worker = std::thread::spawn(move || {
368            let op = g_op.try_enter().expect("op must enter");
369            while !release_op.load(Ordering::SeqCst) {
370                std::thread::sleep(Duration::from_millis(1));
371            }
372            drop(op);
373        });
374
375        std::thread::sleep(Duration::from_millis(20));
376        // First call times out (op still in flight) — returns false
377        // but freeing is set.
378        assert!(!g.begin_free(Duration::from_millis(40)));
379
380        // Let the op drain.
381        release.store(true, Ordering::SeqCst);
382        worker.join().unwrap();
383
384        // Second call must still bail — the first call won the
385        // freeing flag even though it timed out, so no second
386        // caller may claim the right to take the inner.
387        assert!(
388            !g.begin_free(Duration::from_millis(50)),
389            "second begin_free after a timed-out first call must bail",
390        );
391    }
392}