Skip to main content

net/ffi/
handle_guard.rs

1//! Per-FFI-handle quiescing protocol.
2//!
3//! Cortex and mesh FFI handles are typically `extern "C" fn(*mut Handle, ...)`.
4//! Without explicit synchronization between in-flight ops and the
5//! handle's `_free` entry point, a foreign caller (Go cgo / Python
6//! threads / Node.js workers) racing a `_free` against an active op
7//! produces:
8//!
9//! 1. **Use-after-free on the inner.** `_free` does
10//!    `Box::from_raw(handle); drop(...)`; a concurrent op that already
11//!    dereferenced `&*handle` keeps reading freed memory.
12//!
13//! 2. **Use-after-free on the handle box itself.** Even with the
14//!    inner held alive via an `Arc<Inner>` clone (e.g.
15//!    `MeshStreamHandle._node` keeps the node alive but not the
16//!    handle box), a concurrent `_free` can deallocate the outer Box
17//!    while the op is still doing pointer-equality / handle-matching
18//!    checks via `&*handle`.
19//!
20//! [`crate::ffi::handle_guard::HandleGuard`] is the shared building
21//! block. Each handle struct embeds one inline; every `extern "C"` op
22//! gates on [`crate::ffi::handle_guard::HandleGuard::try_enter`];
23//! every `_free` drives
24//! [`crate::ffi::handle_guard::HandleGuard::begin_free`].
25//!
26//! ## Soundness: the box must outlive `try_enter`'s `fetch_add`
27//!
28//! The Dekker-style "set freeing, check active_ops" handshake orders
29//! only the atomic operations — `Box::from_raw` is a non-atomic
30//! deallocation and can interleave between an op's
31//! `&*handle` and the op's `fetch_add(active_ops)`, producing UAF on
32//! the freed atomic. The same hazard the parent
33//! [`crate::ffi::NetHandle`] addresses by intentionally leaking its
34//! box. We adopt the same rule: **never deallocate the handle box
35//! once it has been handed to C.** `_free` instead takes the inner
36//! out via [`std::mem::ManuallyDrop`] and drops it; the outer box
37//! (carrying `HandleGuard`'s atomics + the now-empty
38//! `ManuallyDrop`) is leaked permanently. Concurrent ops doing
39//! `try_enter` after free safely fetch_add on still-valid memory,
40//! observe `freeing=true`, decrement, and bail.
41//!
42//! The cost is `size_of::<Box<Handle>>()` per `_free` call. Handle
43//! types are small (a few pointers + atomics), so total leak grows
44//! with cumulative `open + free` cycles — acceptable for the
45//! soundness gain.
46//!
47//! ## Adopting the guard on a NEW handle (checklist)
48//!
49//! The protocol is applied by convention at each call site rather
50//! than via a wrapper type (the inline form is uniform across the
51//! cortex / mesh / redis-dedup / aggregator handles). Miss any step
52//! and the UAF this module exists to prevent comes back, so when you
53//! add a handle:
54//!
55//! 1. Wrap every inner field in [`std::mem::ManuallyDrop`] and embed
56//!    one [`crate::ffi::handle_guard::HandleGuard`] (not in
57//!    `ManuallyDrop` — it must outlive the inner so post-free
58//!    `try_enter` calls land on valid memory).
59//! 2. In every `extern "C"` op, gate on
60//!    [`crate::ffi::handle_guard::HandleGuard::try_enter`] *before*
61//!    touching any inner field, and return the handle's
62//!    "invalid/shutting down" sentinel (NULL / error code) on `None`.
63//! 3. Don't hold the returned
64//!    [`crate::ffi::handle_guard::HandleOp`] across a long blocking
65//!    call — clone the (Arc-backed) inner out, drop the guard, then
66//!    re-enter only to write back, so a concurrent `_free` never
67//!    waits on a caller-set deadline (see `ffi::aggregator`).
68//! 4. In `_free`, gate the `ManuallyDrop::drop` of each inner field
69//!    on [`crate::ffi::handle_guard::HandleGuard::begin_free`]
70//!    returning `true`; never `Box::from_raw` — leak the box.
71//! 5. Any pointer handed back to C must be an owned copy, never a
72//!    borrow into a `ManuallyDrop` field (which `_free` can drop).
73
74use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
75use std::time::{Duration, Instant};
76
77/// How long [`HandleGuard::begin_free`] will wait for in-flight ops
78/// to drain before giving up. On timeout, the caller must NOT take
79/// or drop the inner — concurrent ops may still be reading it. The
80/// inner is leaked along with the box.
81///
82/// Five seconds matches the parent [`crate::ffi::NetHandle`]'s
83/// `FFI_SHUTDOWN_DEADLINE`; well above any normal op latency
84/// (ingest, append, snapshot etc. are all sub-second), large enough
85/// to absorb a wedged adapter without reflexively leaking on a
86/// transient stall.
87pub const FFI_HANDLE_FREE_DEADLINE: Duration = Duration::from_secs(5);
88
89/// Outcome of [`HandleGuard::begin_free_detailed`] — lets a `_free`
90/// log accurately instead of treating every non-success the same.
91#[derive(Debug, Clone, Copy, PartialEq, Eq)]
92pub enum BeginFree {
93    /// This call won the free race AND in-flight ops drained: the
94    /// caller owns the right to take/drop the inner exactly once.
95    Drained,
96    /// A prior `_free` already won for this handle (benign repeat
97    /// free). The caller must NOT touch the inner — the winner has
98    /// it — and nothing is leaked by this call, so don't warn.
99    AlreadyFreeing,
100    /// This call won the free race but in-flight ops did not drain
101    /// within the deadline. The caller must leak the inner (a
102    /// concurrent op may still be reading it) — worth a warning.
103    TimedOut,
104}
105
106/// Per-handle quiescing core. Lives inline inside each handle
107/// struct. `try_enter` returns a guard that prevents `_free` from
108/// completing until dropped; `begin_free` quiesces in-flight ops
109/// and prevents new ones.
110pub struct HandleGuard {
111    /// Set to `true` once `_free` has been called for this handle.
112    /// All future `try_enter` calls observe this and bail. Stored
113    /// as `AtomicBool` (not a generation counter) because we never
114    /// re-use the handle after free — once flipped, never reset.
115    freeing: AtomicBool,
116    /// Number of in-flight ops currently inside `try_enter`'s guard.
117    /// `_free` waits for this to reach zero (with a deadline) before
118    /// taking the inner.
119    active_ops: AtomicU32,
120}
121
122impl HandleGuard {
123    /// Construct an empty guard. Use as a `const` initializer when
124    /// possible.
125    pub const fn new() -> Self {
126        Self {
127            freeing: AtomicBool::new(false),
128            active_ops: AtomicU32::new(0),
129        }
130    }
131
132    /// Try to enter an FFI operation against this handle.
133    ///
134    /// Increments `active_ops` first so a concurrent `begin_free`
135    /// is forced to observe the increment OR to set `freeing` first
136    /// (they synchronize via SeqCst). After the increment, we
137    /// re-check `freeing`: if free is in progress, the op cannot
138    /// proceed and we decrement back out. Otherwise we return a
139    /// guard whose `Drop` decrements.
140    ///
141    /// Returns `None` if `_free` has already started — the caller
142    /// must surface a typed "shutting down / freed" error code and
143    /// MUST NOT touch any fields of the handle except this
144    /// `HandleGuard` (which lives in still-valid leaked memory).
145    pub fn try_enter(&self) -> Option<HandleOp<'_>> {
146        // SeqCst: pairs with `begin_free`'s SeqCst freeing-store.
147        // The total order ensures every (try_enter, begin_free)
148        // pair agrees on which side won — either we observe
149        // `freeing=true` (and bail), or `begin_free` observes
150        // `active_ops > 0` (and waits).
151        self.active_ops.fetch_add(1, Ordering::SeqCst);
152        if self.freeing.load(Ordering::SeqCst) {
153            self.active_ops.fetch_sub(1, Ordering::AcqRel);
154            None
155        } else {
156            Some(HandleOp { core: self })
157        }
158    }
159
160    /// Mark the handle as freeing and wait for in-flight ops to
161    /// drain. Returns `true` if THIS call won the race to flip
162    /// `freeing` AND in-flight ops drained within
163    /// [`FFI_HANDLE_FREE_DEADLINE`]. Returns `false` on timeout
164    /// OR if a prior caller already flipped `freeing`.
165    ///
166    /// **Single-winner contract.** Only ONE caller across the
167    /// lifetime of this guard ever sees `true`. That winning
168    /// caller is the one that owns the right to take the inner
169    /// out of `ManuallyDrop` exactly once. Subsequent callers
170    /// (whether concurrent or strictly after) see `false` and
171    /// MUST NOT touch the inner — the winner has it (or had it,
172    /// and dropped it).
173    ///
174    /// This is what makes `_free` idempotent: a second `_free`
175    /// call gates the `ManuallyDrop::take` behind this method's
176    /// `true` return, so it bails before the double-take that
177    /// would UAF the inner allocation.
178    ///
179    /// On timeout (winner observed `freeing=false→true` but
180    /// drain didn't complete), the caller must NOT take the
181    /// inner — concurrent ops may still be holding it. Leak
182    /// inner along with the box.
183    ///
184    /// Future `try_enter` calls will see `freeing=true` and bail,
185    /// regardless of whether the winner's drain succeeded, timed
186    /// out, or this caller is the loser. "No NEW ops will start"
187    /// is set as soon as the winner flips the flag.
188    pub fn begin_free(&self, deadline: Duration) -> bool {
189        matches!(self.begin_free_detailed(deadline), BeginFree::Drained)
190    }
191
192    /// Same protocol as [`Self::begin_free`] but distinguishes the
193    /// two `false` outcomes so a caller can log accurately: a benign
194    /// repeat `_free` ([`BeginFree::AlreadyFreeing`]) leaks nothing
195    /// and shouldn't warn, whereas a genuine drain timeout
196    /// ([`BeginFree::TimedOut`]) deliberately leaks the inner and is
197    /// worth a warning. The single-winner contract is unchanged:
198    /// exactly one caller ever sees [`BeginFree::Drained`].
199    pub fn begin_free_detailed(&self, deadline: Duration) -> BeginFree {
200        // compare_exchange so only one caller wins the right to
201        // flip false→true. Losers (whether racing concurrently
202        // or strictly after) get an Err and bail without ever
203        // entering the drain loop. SeqCst pairs with
204        // `try_enter`'s SeqCst load and matches the rest of the
205        // protocol's ordering. Pre-fix this was a `store(true)`
206        // which made every caller "win" — the cortex / mesh
207        // `_free` then double-took the inner via `ManuallyDrop::
208        // take`, UAF on the second call.
209        if self
210            .freeing
211            .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
212            .is_err()
213        {
214            return BeginFree::AlreadyFreeing;
215        }
216        let start = Instant::now();
217        // Spin-with-sleep is appropriate: ops are sub-second; the
218        // deadline catches pathological wedge cases. We don't have
219        // an OS-level wait primitive on the atomic without
220        // platform-specific atomic_wait (stable in Rust 1.89+ but
221        // a larger refactor); the 1ms sleep keeps CPU low while
222        // the deadline is large enough to absorb normal jitter.
223        while self.active_ops.load(Ordering::SeqCst) > 0 {
224            if start.elapsed() >= deadline {
225                return BeginFree::TimedOut;
226            }
227            std::thread::sleep(Duration::from_millis(1));
228        }
229        BeginFree::Drained
230    }
231
232    /// True if `begin_free` has been called for this handle.
233    /// Useful for assertions in tests; production paths should use
234    /// `try_enter` (which already gates on this).
235    #[cfg(test)]
236    pub fn is_freeing(&self) -> bool {
237        self.freeing.load(Ordering::SeqCst)
238    }
239}
240
241impl Default for HandleGuard {
242    fn default() -> Self {
243        Self::new()
244    }
245}
246
247/// RAII guard returned by [`HandleGuard::try_enter`]. While alive,
248/// `begin_free` is forced to wait — the in-flight count seen by
249/// `begin_free` includes this op.
250///
251/// Holds only a borrow of the [`HandleGuard`] (which lives in the
252/// leaked handle box, so the borrow is sound for any duration the
253/// op chooses). No public methods — drop is the only operation.
254pub struct HandleOp<'a> {
255    core: &'a HandleGuard,
256}
257
258impl Drop for HandleOp<'_> {
259    fn drop(&mut self) {
260        self.core.active_ops.fetch_sub(1, Ordering::AcqRel);
261    }
262}
263
264#[cfg(test)]
265mod tests {
266    use super::*;
267    use std::sync::Arc;
268
269    /// Pin: `try_enter` succeeds on a fresh guard; `Drop`
270    /// decrements `active_ops` so a subsequent `begin_free`
271    /// drains immediately.
272    #[test]
273    fn try_enter_succeeds_and_drop_decrements() {
274        let g = HandleGuard::new();
275        {
276            let _op = g.try_enter().expect("fresh guard must accept ops");
277            assert_eq!(g.active_ops.load(Ordering::SeqCst), 1);
278        }
279        assert_eq!(g.active_ops.load(Ordering::SeqCst), 0);
280        assert!(g.begin_free(Duration::from_millis(50)));
281    }
282
283    /// Pin: `begin_free` flips `freeing` so subsequent `try_enter`
284    /// calls bail with `None`.
285    #[test]
286    fn try_enter_after_free_returns_none() {
287        let g = HandleGuard::new();
288        assert!(g.begin_free(Duration::from_millis(50)));
289        assert!(g.try_enter().is_none());
290        // No-op leak: active_ops was already 0 + nothing increments
291        // it on a None return path.
292        assert_eq!(g.active_ops.load(Ordering::SeqCst), 0);
293    }
294
295    /// A `_free` racing an in-flight op must wait for the op to
296    /// finish before returning success. Without the guard, `_free`
297    /// would be an unconditional `Box::from_raw` and the op's
298    /// subsequent dereference would UAF.
299    #[test]
300    fn begin_free_waits_for_inflight_op() {
301        let g = Arc::new(HandleGuard::new());
302
303        // Spawn a worker that holds an op for ~50ms.
304        let g_op = g.clone();
305        let started = Arc::new(AtomicBool::new(false));
306        let started_op = started.clone();
307        let worker = std::thread::spawn(move || {
308            let op = g_op.try_enter().expect("op must enter before free");
309            started_op.store(true, Ordering::SeqCst);
310            std::thread::sleep(Duration::from_millis(50));
311            drop(op);
312        });
313
314        // Wait for the worker to enter the op so we don't race the
315        // try_enter itself.
316        while !started.load(Ordering::SeqCst) {
317            std::thread::yield_now();
318        }
319
320        // begin_free MUST block until the op drops. A pre-fix free
321        // would return immediately with the op still running →
322        // subsequent inner-drop UAFs the op.
323        let t0 = Instant::now();
324        let drained = g.begin_free(Duration::from_secs(2));
325        let elapsed = t0.elapsed();
326        assert!(drained, "begin_free must drain within deadline");
327        assert!(
328            elapsed >= Duration::from_millis(40),
329            "begin_free returned in {:?} — must have waited for the in-flight op",
330            elapsed,
331        );
332        worker.join().unwrap();
333    }
334
335    /// Pin: `begin_free` returns `false` on timeout when an op
336    /// holds the guard past the deadline. Callers MUST leak the
337    /// inner in this case rather than dropping it.
338    #[test]
339    fn begin_free_times_out_when_op_outlasts_deadline() {
340        let g = Arc::new(HandleGuard::new());
341        let g_op = g.clone();
342        let release = Arc::new(AtomicBool::new(false));
343        let release_op = release.clone();
344        let worker = std::thread::spawn(move || {
345            let op = g_op.try_enter().expect("op must enter");
346            while !release_op.load(Ordering::SeqCst) {
347                std::thread::sleep(Duration::from_millis(1));
348            }
349            drop(op);
350        });
351
352        // Brief sleep to let the worker enter; deadline is shorter
353        // than the worker's hold time.
354        std::thread::sleep(Duration::from_millis(20));
355        let drained = g.begin_free(Duration::from_millis(50));
356        assert!(!drained, "deadline expired with op still in flight");
357        // freeing is still set even on timeout — future ops bail.
358        assert!(g.is_freeing());
359        assert!(g.try_enter().is_none());
360
361        // Let the worker finish so the test thread can join.
362        release.store(true, Ordering::SeqCst);
363        worker.join().unwrap();
364    }
365
366    /// Pin: exactly ONE caller wins the `begin_free` race, even
367    /// under concurrent invocation. The single-winner contract
368    /// is what makes the per-handle `_free` (which gates
369    /// `ManuallyDrop::take` on `begin_free` returning `true`)
370    /// idempotent — a second caller that also returned `true`
371    /// would double-take the inner and UAF.
372    ///
373    /// Pre-fix `begin_free` did a plain `store(true)` so every
374    /// caller saw `true` and every `_free` re-took the inner.
375    /// The post-fix `compare_exchange(false, true)` flips the
376    /// flag exactly once and subsequent callers return `false`.
377    #[test]
378    fn begin_free_has_exactly_one_winner_under_concurrency() {
379        const ROUNDS: usize = 32;
380        for _ in 0..ROUNDS {
381            let g = Arc::new(HandleGuard::new());
382            let g1 = g.clone();
383            let g2 = g.clone();
384            let t1 = std::thread::spawn(move || g1.begin_free(Duration::from_millis(50)));
385            let t2 = std::thread::spawn(move || g2.begin_free(Duration::from_millis(50)));
386            let r1 = t1.join().unwrap();
387            let r2 = t2.join().unwrap();
388            assert!(
389                r1 ^ r2,
390                "exactly one caller must win begin_free; got r1={r1} r2={r2}",
391            );
392        }
393    }
394
395    /// Pin: a strictly-sequential second `begin_free` call after
396    /// a successful first call returns `false`. This is the path
397    /// every `_free` takes on a second invocation — the second
398    /// caller must skip the `ManuallyDrop::take` branch.
399    #[test]
400    fn begin_free_returns_false_on_second_sequential_call() {
401        let g = HandleGuard::new();
402        assert!(g.begin_free(Duration::from_millis(50)));
403        assert!(
404            !g.begin_free(Duration::from_millis(50)),
405            "second begin_free must bail — only the first caller \
406             owns the right to take the inner",
407        );
408    }
409
410    /// Pin: `begin_free_detailed` distinguishes the three outcomes,
411    /// so `_free` can warn only on a genuine drain timeout and stay
412    /// quiet on a benign repeat free.
413    #[test]
414    fn begin_free_detailed_distinguishes_outcomes() {
415        // Fresh guard, no ops → Drained (the winner).
416        let g = HandleGuard::new();
417        assert_eq!(
418            g.begin_free_detailed(Duration::from_millis(50)),
419            BeginFree::Drained
420        );
421        // Second call → AlreadyFreeing (benign repeat, not a timeout).
422        assert_eq!(
423            g.begin_free_detailed(Duration::from_millis(50)),
424            BeginFree::AlreadyFreeing
425        );
426
427        // Op held past the deadline → TimedOut for the winner.
428        let g2 = Arc::new(HandleGuard::new());
429        let g2_op = g2.clone();
430        let release = Arc::new(AtomicBool::new(false));
431        let release_op = release.clone();
432        let worker = std::thread::spawn(move || {
433            let op = g2_op.try_enter().expect("op must enter");
434            while !release_op.load(Ordering::SeqCst) {
435                std::thread::sleep(Duration::from_millis(1));
436            }
437            drop(op);
438        });
439        std::thread::sleep(Duration::from_millis(20));
440        assert_eq!(
441            g2.begin_free_detailed(Duration::from_millis(40)),
442            BeginFree::TimedOut
443        );
444        // After a timed-out winner, a later call is still AlreadyFreeing.
445        assert_eq!(
446            g2.begin_free_detailed(Duration::from_millis(40)),
447            BeginFree::AlreadyFreeing
448        );
449        release.store(true, Ordering::SeqCst);
450        worker.join().unwrap();
451    }
452
453    /// Pin: a second `begin_free` after a TIMED-OUT first call
454    /// also returns `false`. The first caller's
455    /// `compare_exchange` already flipped `freeing=true`, so the
456    /// second caller observes the flag and bails — the
457    /// already-taken inner (or inner that the timed-out caller
458    /// left in place to be leaked) must not be re-taken.
459    #[test]
460    fn begin_free_returns_false_after_timed_out_first_call() {
461        let g = Arc::new(HandleGuard::new());
462        let g_op = g.clone();
463        let release = Arc::new(AtomicBool::new(false));
464        let release_op = release.clone();
465        let worker = std::thread::spawn(move || {
466            let op = g_op.try_enter().expect("op must enter");
467            while !release_op.load(Ordering::SeqCst) {
468                std::thread::sleep(Duration::from_millis(1));
469            }
470            drop(op);
471        });
472
473        std::thread::sleep(Duration::from_millis(20));
474        // First call times out (op still in flight) — returns false
475        // but freeing is set.
476        assert!(!g.begin_free(Duration::from_millis(40)));
477
478        // Let the op drain.
479        release.store(true, Ordering::SeqCst);
480        worker.join().unwrap();
481
482        // Second call must still bail — the first call won the
483        // freeing flag even though it timed out, so no second
484        // caller may claim the right to take the inner.
485        assert!(
486            !g.begin_free(Duration::from_millis(50)),
487            "second begin_free after a timed-out first call must bail",
488        );
489    }
490}