//! `CoreMailbox` — the `Send + Sync` bridge between autonomous async
//! producers (timer tasks) and an owned, relocatable [`crate::node::Core`].
//!
//! # Why this exists (D223 / D225 / D227 / D230)
//!
//! Under the actor / work-stealing model (D221) a `Core` owns its state
//! cell **by value** and moves between workers — so a long-lived async
//! task (e.g. a `tokio::spawn`-ed timer loop) can no longer hold
//! `&Core` / `Arc<C>` / `Weak<C>` to call `Core::emit` directly (that
//! was the deleted `WeakCore` path; D223 forbids `Weak<C>` back-refs).
//!
//! Instead the task holds an `Arc<CoreMailbox>` (`Send + Sync`) and
//! **posts** a `(NodeId, HandleId)` emit request. The mailbox is drained
//! **owner-side** by the synchronous [`crate::node::Core::drain_mailbox`]
//! (applied via the existing sync `Core::emit` — *no async in Core*,
//! honoring the locked "Core never async" invariant). The drain point is
//! the embedder's existing advance/pump site (test harness `TestRuntime`
//! advance helper, napi pump): timer tasks already require the host
//! runtime to be advanced before they fire, so draining there is
//! **behaviour-identical** to the old autonomous `Weak::upgrade →
//! core.emit` (D230).
//!
//! # Shape (D227 full)
//!
//! - **Op queue** — FIFO of [`MailboxOp`]s, applied owner-side.
//! - **`closed`** — set when the owning `Core` drops; a timer task that
//!   observes it releases its pending handle and bails (mirrors the old
//!   `Weak::upgrade() == None` teardown path exactly).
//! - **`runnable`** — the per-Core "this Core has work" wake bit. S2b
//!   lands the field + sets/clears it (D227 builds the *full* mailbox
//!   shape now); S4 wires it to wave drain-scoping + finalize, and the
//!   host-executor (M6) reads it to schedule the owning worker. The
//!   in-wave drain (`BatchGuard::drain_and_flush`) already gates on it,
//!   so the no-producer §7 floor pays one atomic load, not a mutex.
//!
//! # This is ONE mechanism (not six)
//!
//! The S2b design history records six producer "forks" (D227–D233);
//! that is a *design-question* count, not runtime surface. What actually
//! exists is **one** owner-side re-entry mechanism:
//!
//! > A `Send + Sync` FIFO of deferred [`MailboxOp`]s, drained on the
//! > owner thread — **in-wave to quiescence** for producer sinks
//! > (`BatchGuard::drain_and_flush`) and at the embedder pump point for
//! > autonomous timer tasks ([`crate::node::Core::drain_mailbox`]) —
//! > each op applied via the one object-safe owner re-entry surface
//! > [`crate::node::CoreFull`].
//!
//! `MailboxOp` keeps a **typed fast path** (`Emit`/`Complete`/`Error`:
//! zero-alloc, and a deterministic `Core`-gone handle-release contract —
//! see the per-kind `post_*`) plus a **`Defer` escape hatch** (a boxed
//! `FnOnce(&dyn CoreFull)` for value-returning topology mutation:
//! windowing / higher-order inner subscribe). Collapsing the enum to
//! pure `Defer` was considered and rejected — it would heap-allocate per
//! timer/producer emit AND lose the typed `Core`-gone release affordance
//! (a dropped `FnOnce` can't run an else-branch). `ProducerEmitter`
//! (`graphrefly-operators`) is thin sugar over `post_*`; the producer
//! *build* closure isn't a mechanism at all — it uses the `&Core` its
//! `ProducerCtx` already lends it (D231). One queue, one drain loop, one
//! `match`, one re-entry trait. Since D249/S2c the `!Send` owner-side
//! defers ride a separate owner-only [`DeferQueue`], drained right after
//! this mailbox at the same owner-side drain points. That split follows
//! `Send`-ness and does not add a second mechanism.
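/*!
# Example

This is a minimal, std-only sketch of the post/drain shape described above. It is for illustration only and is not this crate's API. `Mailbox` and `Op` are hypothetical stand-ins for [`CoreMailbox`] and [`MailboxOp`]. The sketch uses `std::sync::Mutex` in place of `parking_lot::Mutex`, and plain `u32` ids in place of `NodeId`/`HandleId`. A producer thread posts ops. The owner then drains them FIFO, with the lock released around each `apply`. After `close`, any further post is refused.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical stand-in for `MailboxOp` (ids only, no real handles).
#[derive(Debug, PartialEq)]
enum Op {
    Emit(u32, u32),
    Complete(u32),
}

// Hypothetical stand-in for `CoreMailbox`.
#[derive(Default)]
struct Mailbox {
    ops: Mutex<VecDeque<Op>>,
    closed: AtomicBool,
    runnable: AtomicBool,
}

impl Mailbox {
    // Producer side: refuse once closed, else enqueue and raise the wake bit.
    fn post(&self, op: Op) -> bool {
        let mut q = self.ops.lock().unwrap();
        if self.closed.load(Ordering::Acquire) {
            return false;
        }
        q.push_back(op);
        self.runnable.store(true, Ordering::Release);
        true
    }

    // Owner side: pop FIFO; clear the wake bit under the lock once empty.
    fn drain(&self, mut apply: impl FnMut(Op)) {
        loop {
            let op = {
                let mut q = self.ops.lock().unwrap();
                match q.pop_front() {
                    Some(op) => op,
                    None => {
                        self.runnable.store(false, Ordering::Release);
                        return;
                    }
                }
            };
            apply(op); // the lock is not held while applying
        }
    }

    fn close(&self) {
        let _q = self.ops.lock().unwrap();
        self.closed.store(true, Ordering::Release);
    }
}

// A producer thread posts three ops; the owner drains, then closes.
// Returns (applied ops, wake bit after drain, post accepted after close).
fn demo() -> (Vec<Op>, bool, bool) {
    let mb = Arc::new(Mailbox::default());
    let producer = Arc::clone(&mb);
    thread::spawn(move || {
        assert!(producer.post(Op::Emit(1, 10)));
        assert!(producer.post(Op::Emit(1, 11)));
        assert!(producer.post(Op::Complete(1)));
    })
    .join()
    .unwrap();

    let mut applied = Vec::new();
    mb.drain(|op| applied.push(op));
    let runnable_after_drain = mb.runnable.load(Ordering::Acquire);
    mb.close();
    let accepted_after_close = mb.post(Op::Complete(2));
    (applied, runnable_after_drain, accepted_after_close)
}
```

`demo()` returns three values. The first is the three ops in posting order. The second is `false`, for the wake bit after the drain. The third is `false`, for the post attempted after `close`.
*/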

use std::cell::{Cell, RefCell};
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};

use crate::handle::{HandleId, NodeId};

/// **`Send`** cross-thread deferred closure (D233; D249/S2c). Posted by
/// any cross-thread `Send` producer and applied owner-side. The canonical
/// poster is an autonomous timer task (`temporal.rs` `window_time`/etc.):
/// a `tokio::spawn`ed cross-thread task whose defer closure captures only
/// `Send` state (`Arc<Mutex<NodeId>>` / `Arc<dyn BindingBoundary>` / ids).
/// The API also admits any future cross-thread producer with the same
/// `Send` capture discipline, e.g. a napi/pyo3 binding-layer pump. Rides
/// the `Send + Sync` [`CoreMailbox`] (posted cross-thread, drained
/// owner-side via `drain_mailbox`).
pub type SendDeferFn = Box<dyn FnOnce(&dyn crate::node::CoreFull) + Send>;

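/// **`!Send`** owner-side deferred closure (D248/D249/S2c). Posted by
/// an owner-side in-wave producer/graph sink whose closure captures
/// `!Send` state (a relaxed `Sink` / `Rc<RefCell<GraphInner>>` —
/// D248). Lives in the owner-only [`DeferQueue`], **never** the
/// cross-thread [`CoreMailbox`].
///
/// By contract a `DeferFn` captures no retained `HandleId` (D235 P8 /
/// D246 r8). Closures still queued at teardown are dropped unrun (see
/// [`DeferQueue::close`]), so a captured retain would leak.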
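/**
# Example

This sketch shows why the two aliases are split. It assumes a hypothetical `Owner` struct in place of `dyn CoreFull`, and it is not the crate's API.

A closure that captures `Arc<Mutex<_>>` satisfies the `+ Send` bound, as a [`SendDeferFn`] does. It can therefore be built on a spawned thread and handed to the owner.

A closure that captures `Rc<RefCell<_>>` is `!Send` and only fits the plain alias, as a `DeferFn` does. It has to be created and run on the owner thread.

```rust
use std::cell::RefCell;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical stand-in for `dyn CoreFull`, the owner-side re-entry surface.
struct Owner {
    log: RefCell<Vec<String>>,
}

// Mirror the two aliases: only the first carries `+ Send`.
type SendFn = Box<dyn FnOnce(&Owner) + Send>;
type LocalFn = Box<dyn FnOnce(&Owner)>;

fn demo() -> Vec<String> {
    // Cross-thread producer: captures only `Send` state, so the boxed
    // closure can be built on a spawned thread and returned to the owner.
    let shared = Arc::new(Mutex::new(7u32));
    let remote: SendFn = thread::spawn(move || {
        let f: SendFn = Box::new(move |o: &Owner| {
            let v = *shared.lock().unwrap();
            o.log.borrow_mut().push(format!("send:{v}"));
        });
        f
    })
    .join()
    .unwrap();

    // Owner-side sink: captures `Rc<RefCell<_>>`, which is `!Send`, so it
    // only type-checks as the non-`Send` alias.
    let graph = Rc::new(RefCell::new(String::from("graph")));
    let g = Rc::clone(&graph);
    let local: LocalFn = Box::new(move |o: &Owner| {
        o.log.borrow_mut().push(format!("local:{}", g.borrow()));
    });

    // The owner thread applies both.
    let owner = Owner { log: RefCell::new(Vec::new()) };
    remote(&owner);
    local(&owner);
    owner.log.into_inner()
}
```

`demo()` returns `["send:7", "local:graph"]`. Annotating `local` with the `Send` alias instead fails to compile, because `Rc` is not `Send`.
*/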
pub type DeferFn = Box<dyn FnOnce(&dyn crate::node::CoreFull)>;

/// A re-entry request posted to the [`CoreMailbox`] by an autonomous
/// async producer (timer task → `Emit`, or `Defer` for a `Send`
/// closure) or by a producer-operator sink (D232-AMEND/A′ →
/// `Emit`/`Complete`/`Error`). Applied owner-side by
/// [`crate::node::Core::drain_mailbox`] via the sync
/// `Core::{emit,complete,error}` (a `Defer` runs its closure instead).
/// Producer-sink ops are drained **in-wave to quiescence** by the
/// `BatchGuard` drain loop (immediate, cascade-ordering-preserving).
/// Timer-task ops are drained at the embedder pump point (D230).
pub enum MailboxOp {
    /// `Core::emit(node, handle)`. Posted by timer tasks + producer sinks.
    Emit(NodeId, HandleId),
    /// `Core::complete(node)`. Posted by producer sinks.
    Complete(NodeId),
    /// `Core::error(node, handle)`. Posted by producer sinks.
    Error(NodeId, HandleId),
    /// **`Send`** closure (D233; D249/S2c). Posted by a cross-thread
    /// timer task (`temporal.rs` `window_time`/etc.) whose closure
    /// captures only `Send` state. Applied owner-side **in-wave** by
    /// the drain loop (the owner holds `&Core`). The `!Send` owner-side
    /// sink defers (graph describe/observe, control/higher-order
    /// dynamic-inner) go to the separate owner-only [`DeferQueue`]
    /// instead — D248/D249.
    Defer(SendDeferFn),
}

impl std::fmt::Debug for MailboxOp {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Emit(n, h) => write!(f, "Emit({n:?}, {h:?})"),
            Self::Complete(n) => write!(f, "Complete({n:?})"),
            Self::Error(n, h) => write!(f, "Error({n:?}, {h:?})"),
            Self::Defer(_) => write!(f, "Defer(<send closure>)"),
        }
    }
}

/// `Send + Sync` mailbox bridging autonomous async producers to an owned,
/// relocatable [`crate::node::Core`]. Held behind an `Arc`; the `Core`
/// owns one clone, each timer task another. See the module docs.
pub struct CoreMailbox {
    /// FIFO of [`MailboxOp`]s posted by timer tasks (`Emit`) and
    /// producer-operator sinks (`Emit`/`Complete`/`Error`), applied
    /// owner-side by [`crate::node::Core::drain_mailbox`] via the sync
    /// `Core::{emit,complete,error}`.
    ops: parking_lot::Mutex<VecDeque<MailboxOp>>,
    /// Set by [`Self::close`] when the owning `Core` drops. A timer task
    /// observing `false` from [`Self::post_emit`] is told to release its
    /// pending handle and bail (the old `Weak::upgrade() == None` path).
    closed: AtomicBool,
    /// "This Core has queued work" wake bit (D227 full shape;
    /// **finalized at S4**). Set on every successful [`Self::post_op`];
    /// cleared by [`Self::drain_into`]/[`Self::take_all`] once the queue
    /// empties (under the `ops` lock — QA F-#4 lost-wakeup discipline).
    ///
    /// **Actor-model granularity (S4, D246/D248/D249).** `Core` is
    /// single-owner `!Send + !Sync`; in the actor model one worker owns
    /// exactly one `Core`, so this Core-wide bit **is** that worker's
    /// per-Core runnable-wake. It is the only cross-thread bridge into
    /// a `!Send` Core: timer/producer tasks `post_*` + signal; the
    /// owner drains ([`crate::node::Core::drain_mailbox`] / the in-wave
    /// `BatchGuard`). M6 (deferred) reads [`Self::is_runnable`] from the
    /// host executor to decide when to poll a worker's Core. The M6
    /// scheduling-grain (per-Core vs finer) is TBD per the M6 design
    /// pass; D253 deleted the `SchedulingGroupId` surface that the
    /// pre-actor framing hung on.
    ///
    /// QA F12 (2026-05-19, amended D274 doc-hygiene 2026-05-21): if a
    /// future per-Core sub-wake bit is ever split out (M6 design pass),
    /// it MUST be split in lockstep across BOTH this `CoreMailbox.runnable`
    /// AND `DeferQueue.runnable` — the in-wave drain gate
    /// (`BatchGuard::drain_and_flush`) ORs the two, and a half-split
    /// would silently lose wakeups for the unsplit queue.
    runnable: AtomicBool,
}

impl CoreMailbox {
    /// A fresh, open, empty mailbox.
    #[must_use]
    pub fn new() -> Self {
        Self {
            ops: parking_lot::Mutex::new(VecDeque::new()),
            closed: AtomicBool::new(false),
            runnable: AtomicBool::new(false),
        }
    }

    /// Post a timer-fired emit request. Returns `false` iff the owning
    /// `Core` has already dropped ([`Self::close`] was called) — the
    /// caller MUST then release `handle` and stop (mirrors the old
    /// `WeakCore::upgrade() == None` teardown branch in `timer.rs`).
    /// Returns `true` when queued (and sets the `runnable` wake bit).
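    /**
# Examples

This sketch shows the caller-side contract. It assumes a hypothetical `Handles` refcount registry in place of the real retain/release path, and a trimmed `Mailbox` stand-in with `u32` ids. Neither is this crate's API.

The timer task retains the payload before posting:

- On `true`, the queued op owns that retain.
- On `false`, the task releases the retain itself and stops.

As a result, a closed mailbox never leaks a retain.

```rust
use std::collections::{HashMap, VecDeque};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;

// Hypothetical refcount registry standing in for handle retain/release.
#[derive(Default)]
struct Handles {
    refs: Mutex<HashMap<u32, u32>>,
}

impl Handles {
    fn retain(&self, h: u32) {
        *self.refs.lock().unwrap().entry(h).or_insert(0) += 1;
    }

    fn release(&self, h: u32) {
        let mut refs = self.refs.lock().unwrap();
        let n = refs.get_mut(&h).expect("release of an unretained handle");
        *n -= 1;
        if *n == 0 {
            refs.remove(&h);
        }
    }

    // Number of handles with at least one outstanding retain.
    fn live(&self) -> usize {
        self.refs.lock().unwrap().len()
    }
}

// Hypothetical trimmed stand-in for `CoreMailbox` (`u32` ids).
#[derive(Default)]
struct Mailbox {
    ops: Mutex<VecDeque<(u32, u32)>>,
    closed: AtomicBool,
}

impl Mailbox {
    fn post_emit(&self, node: u32, handle: u32) -> bool {
        let mut q = self.ops.lock().unwrap();
        if self.closed.load(Ordering::Acquire) {
            return false;
        }
        q.push_back((node, handle));
        true
    }
}

// One timer tick: retain the payload, post it, and release the retain
// ourselves if the owning Core is gone. Returns whether to keep running.
fn timer_tick(mb: &Mailbox, handles: &Handles, node: u32, handle: u32) -> bool {
    handles.retain(handle);
    if mb.post_emit(node, handle) {
        true // the queued op now owns the retain
    } else {
        handles.release(handle); // Core gone: release, then bail
        false
    }
}
```

After the refused tick, `handles.live()` is still `1`. Only the retain owned by the queued op remains.
    */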
    #[must_use]
    pub fn post_emit(&self, node_id: NodeId, handle: HandleId) -> bool {
        self.post_op(MailboxOp::Emit(node_id, handle))
    }

    /// Post a producer-sink `Complete` (D232-AMEND/A′). Returns `false`
    /// iff the owning `Core` is gone (caller stops; nothing to release).
    #[must_use]
    pub fn post_complete(&self, node_id: NodeId) -> bool {
        self.post_op(MailboxOp::Complete(node_id))
    }

    /// Post a producer-sink `Error` (D232-AMEND/A′). Returns `false` iff
    /// the owning `Core` is gone — the caller MUST then release
    /// `handle` (it owned a retain for the would-be `error` payload).
    #[must_use]
    pub fn post_error(&self, node_id: NodeId, handle: HandleId) -> bool {
        self.post_op(MailboxOp::Error(node_id, handle))
    }

    /// Post a **`Send`** cross-thread `Defer` (D249/S2c). For an
    /// autonomous timer task whose closure captures only `Send` state
    /// (`temporal.rs` `window_time`/etc.). Returns `false` iff the
    /// owning `Core` is gone — the closure is dropped unrun. The
    /// `!Send` owner-side sink defers use [`DeferQueue::post`] instead.
    #[must_use]
    pub fn post_defer(&self, f: SendDeferFn) -> bool {
        self.post_op(MailboxOp::Defer(f))
    }

    /// Post a [`MailboxOp`]. Returns `false` iff the owning `Core` has
    /// already dropped ([`Self::close`]) — see the per-kind wrappers for
    /// the caller's handle-release obligation.
    ///
    /// QA F-A (2026-05-18): the `closed` check and the `push_back` are
    /// performed **in one `ops`-lock critical section** so a concurrent
    /// owner-thread `close()` (which also takes `ops`) cannot interleave
    /// between "observed not-closed" and "enqueued" — that TOCTOU would
    /// strand the op (with its retained `HandleId`) in a queue
    /// `Drop for Core` already walked → leak. `close()` takes the same
    /// lock, so the two are mutually exclusive.
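    /**
# Examples

This is a stress-style sketch of why the `closed` check and the `push_back` share one critical section. It assumes `std::sync::Mutex` in place of `parking_lot::Mutex`, and a hypothetical `Mailbox` with `u32` ops.

Producers post until refused. Meanwhile the owner closes and immediately walks the queue, as `Drop for Core` does with `take_all`. Because `post_op` and `close` take the same lock, every accepted post is already queued when `take_all` runs. Nothing is stranded afterwards.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical stand-in for `CoreMailbox` with `u32` ops.
#[derive(Default)]
struct Mailbox {
    ops: Mutex<VecDeque<u32>>,
    closed: AtomicBool,
}

impl Mailbox {
    // The `closed` check and the push share the lock that `close` takes.
    fn post_op(&self, op: u32) -> bool {
        let mut q = self.ops.lock().unwrap();
        if self.closed.load(Ordering::Acquire) {
            return false;
        }
        q.push_back(op);
        true
    }

    fn close(&self) {
        let _q = self.ops.lock().unwrap();
        self.closed.store(true, Ordering::Release);
    }

    fn take_all(&self) -> VecDeque<u32> {
        let mut q = self.ops.lock().unwrap();
        debug_assert!(self.closed.load(Ordering::Acquire), "take_all before close");
        std::mem::take(&mut *q)
    }
}

// Four producers post until refused; the owner closes and immediately
// walks the queue, as a teardown would, while producers may still run.
// Returns (posts accepted, ops taken at teardown, ops stranded afterwards).
fn race() -> (usize, usize, usize) {
    let mb = Arc::new(Mailbox::default());
    let accepted = Arc::new(AtomicUsize::new(0));
    let producers: Vec<_> = (0..4)
        .map(|_| {
            let (mb, accepted) = (Arc::clone(&mb), Arc::clone(&accepted));
            thread::spawn(move || {
                let mut i = 0;
                while mb.post_op(i) {
                    accepted.fetch_add(1, Ordering::Relaxed);
                    i += 1;
                }
            })
        })
        .collect();
    thread::yield_now();
    mb.close();
    let taken = mb.take_all().len();
    for p in producers {
        p.join().unwrap();
    }
    let stranded = mb.ops.lock().unwrap().len();
    (accepted.load(Ordering::Relaxed), taken, stranded)
}
```

`race()` should report `accepted == taken` and zero stranded ops on every run. Moving the `closed` check ahead of the lock reintroduces the TOCTOU window. In that window a post can land after the teardown walk and be stranded.
    */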
    #[must_use]
    pub fn post_op(&self, op: MailboxOp) -> bool {
        let mut q = self.ops.lock();
        if self.closed.load(Ordering::Acquire) {
            return false;
        }
        q.push_back(op);
        self.runnable.store(true, Ordering::Release);
        true
    }

    /// Owner-side drain. Pops every queued [`MailboxOp`] in FIFO order
    /// and hands each to `apply` (the caller passes a closure over the
    /// sync `Core::{emit,complete,error}`). Re-entrancy: `apply` may
    /// itself cascade and a concurrent timer task / re-entrant sink may
    /// post again — a fresh post re-sets `runnable`, so the enclosing
    /// drain-to-quiescence loop (or a later drain) picks it up.
    ///
    /// QA F-#4 (2026-05-18): the empty observation and the
    /// `runnable = false` store happen **in the same `ops`-lock critical
    /// section**. Previously the empty `pop_front` released the lock
    /// before storing `false`, so a concurrent `post_op` (which takes
    /// `ops`) could push and set `runnable=true` in between, then our
    /// `false` store would clobber it — a lost wakeup invisible to the
    /// `is_runnable`-gated in-wave drain and the S4/M6 scheduler.
    /// Clearing `runnable` while still holding the lock that every
    /// `post_op` must take orders the two: a post is either popped here
    /// or runs strictly after the `false` store and re-sets `true`.
    ///
    /// `max_ops` bounds a single drain (QA P3, 2026-05-18): `apply` may
    /// re-post (a `Defer` that re-`defer`s, a producer re-subscribing),
    /// which this loop correctly drains in the *same* call — but a
    /// closure that re-posts itself on *every* application is an
    /// unbounded mailbox livelock (the producer-authoring analogue of a
    /// fn that emits to itself). That livelock lives HERE (the inner
    /// drain loop), not in `drain_and_flush`'s fire-cascade `cap`, so it
    /// is bounded + panics here — decoupled from the fire counter so a
    /// legitimately large finite producer drain never false-trips the
    /// fire `cap`.
    ///
    /// /qa M3 (2026-05-19): a panic from `apply(f)` mid-drain previously
    /// unwound out of this loop with `runnable` still `true`. Under the
    /// `parking_lot::Mutex` shape that was self-correcting (the next
    /// drain pop would re-observe + clear), but downstream
    /// `is_runnable()` gates would spuriously return `true` until then.
    /// The drain therefore holds a `MailboxRunnableClearOnPanic` guard,
    /// so an `apply` panic still clears `runnable` on unwind:
    ///
    /// - If the queue is empty at unwind time, the bit is reset.
    /// - If the queue is non-empty, `runnable` stays `true`, because
    ///   work remains for the next drain.
    ///
    /// This tightens the scheduler-wakeup contract under panic.
    ///
    /// # Panics
    ///
    /// Panics when a single drain pops its `max_ops`-th op. The assert
    /// fires before that op is applied, so at most `max_ops - 1` ops are
    /// applied per drain. Hitting the cap indicates a producer / `Defer`
    /// op re-posting itself every application (livelock guard). The
    /// default cap is sized for realistic cascades; raise it via
    /// `Core::set_max_batch_drain_iterations` only if your workload has
    /// evidence it needs more.
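    /**
# Examples

This is a std-only sketch of the drain discipline above. It assumes `std::sync::Mutex` in place of `parking_lot::Mutex`. The op is a hypothetical `u32` whose `apply` may re-post. `Mailbox` and `countdown` are illustrative stand-ins, not this crate's API.

The sketch shows three behaviours:

- The wake bit is cleared under the same lock that observed the empty queue.
- A re-post made inside `apply` is drained in the same call.
- The livelock assert uses the same `applied < max_ops` check as the real drain.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;

// Hypothetical stand-in for `CoreMailbox` with `u32` ops.
#[derive(Default)]
struct Mailbox {
    ops: Mutex<VecDeque<u32>>,
    runnable: AtomicBool,
}

impl Mailbox {
    fn post(&self, op: u32) {
        let mut q = self.ops.lock().unwrap();
        q.push_back(op);
        self.runnable.store(true, Ordering::Release);
    }

    fn drain_into(&self, max_ops: u32, mut apply: impl FnMut(u32)) {
        let mut applied = 0u32;
        loop {
            let op = {
                let mut q = self.ops.lock().unwrap();
                let Some(op) = q.pop_front() else {
                    // Cleared while the lock is still held: a racing `post`
                    // either finished before this lock was taken (so the
                    // queue was not empty) or runs after it is released
                    // (and re-sets `runnable`).
                    self.runnable.store(false, Ordering::Release);
                    return;
                };
                op
            };
            applied += 1;
            assert!(applied < max_ops, "mailbox livelock");
            apply(op); // lock released, so `apply` may post again
        }
    }
}

// Each applied `n > 0` re-posts `n - 1`; the whole chain drains in one call.
fn countdown(mb: &Mailbox, start: u32, max_ops: u32) -> Vec<u32> {
    let mut seen = Vec::new();
    mb.post(start);
    mb.drain_into(max_ops, |n| {
        seen.push(n);
        if n > 0 {
            mb.post(n - 1);
        }
    });
    seen
}
```

`countdown(&mb, 3, 100)` applies `[3, 2, 1, 0]` in one call and leaves `runnable` clear. With `max_ops = 5`, a countdown from 10 applies four ops and panics when it pops the fifth.
    */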
    pub fn drain_into(&self, max_ops: u32, mut apply: impl FnMut(MailboxOp)) {
        let mut applied = 0u32;
        // /qa M3 RAII: clear `runnable` on panic unwind if the queue is
        // empty at unwind time. No-op on the normal exit path: the
        // explicit `runnable.store(false)` below the empty `pop_front`
        // is still race-free under the `ops` lock (QA F-#4).
        let _runnable_clear = MailboxRunnableClearOnPanic {
            ops: &self.ops,
            runnable: &self.runnable,
        };
        loop {
            let op = {
                let mut q = self.ops.lock();
                let Some(op) = q.pop_front() else {
                    self.runnable.store(false, Ordering::Release);
                    return;
                };
                op
            };
            applied += 1;
            assert!(
                applied < max_ops,
                "mailbox drain exceeded {max_ops} ops in one drain — a \
                 producer/Defer op is re-posting itself every application \
                 (mailbox livelock). Tune via \
                 Core::set_max_batch_drain_iterations only with concrete \
                 evidence the workload needs more."
            );
            apply(op);
        }
    }

    /// Whether the mailbox currently holds queued work (the wake bit).
    /// Advisory pre-M6 (the embedder pump drains unconditionally); S4/M6
    /// consumers gate scheduling on it. `#[must_use]` (/qa m4) — a
    /// discarded result silently loses the scheduling signal.
    #[must_use = "is_runnable is the scheduling wake-bit; ignoring it loses the signal"]
    pub fn is_runnable(&self) -> bool {
        self.runnable.load(Ordering::Acquire)
    }

    /// Mark the owning `Core` gone. Idempotent. Takes the `ops` lock so
    /// it is mutually exclusive with [`Self::post_op`]'s under-lock
    /// `closed` check (QA F-A): after this returns, no further `post_op`
    /// can enqueue. Callers MUST then call [`Self::take_all`] and release
    /// any `Emit`/`Error` payload handles still queued (ops whose
    /// `post_op` won the lock before `close` did).
    pub fn close(&self) {
        let _q = self.ops.lock();
        self.closed.store(true, Ordering::Release);
    }

    /// Drain and return every still-queued [`MailboxOp`] without
    /// applying it — for `Drop for Core` teardown (QA F-A / Blind #2).
    /// `Emit`/`Error` ops carry a retained `HandleId` the caller must
    /// release; `Defer` closures are dropped unrun (running `CoreFull`
    /// on a half-dropped `Core` is unsound — user-locked QA decision A,
    /// 2026-05-18). Clears `runnable` under the lock (same race
    /// discipline as `drain_into`).
    #[must_use]
    pub fn take_all(&self) -> VecDeque<MailboxOp> {
        let mut q = self.ops.lock();
        // QA F11 (2026-05-19): contract gate — `take_all` MUST be
        // preceded by `close()` so no further `post_op` can enqueue
        // after this returns (a post racing a standalone `take_all`
        // would otherwise strand an op with its retained `HandleId`
        // in a mailbox the caller assumes is empty). The one in-tree
        // caller (`Drop for Core`) sequences `close()` first.
        debug_assert!(
            self.closed.load(Ordering::Acquire),
            "CoreMailbox::take_all must be called after close() — see \
             docstring contract"
        );
        self.runnable.store(false, Ordering::Release);
        std::mem::take(&mut *q)
    }

    /// Whether [`Self::close`] has been called.
    #[must_use]
    pub fn is_closed(&self) -> bool {
        self.closed.load(Ordering::Acquire)
    }
}

impl Default for CoreMailbox {
    fn default() -> Self {
        Self::new()
    }
}

// `CoreMailbox` is `Send + Sync` by construction: it holds atomics plus
// a `parking_lot::Mutex` over `MailboxOp`. Every `MailboxOp` payload
// (ids, or a `SendDeferFn` closure) is `Send`, which is what makes the
// mutex `Sync`. Asserted so a future field that breaks it fails here,
// not at the `Arc<CoreMailbox>` share site in `timer.rs`. D249/S2c:
// this MUST hold — the `!Send` owner-side `Defer` payload now lives in
// [`DeferQueue`], NOT here.
const _: fn() = || {
    fn assert_send_sync<T: Send + Sync>() {}
    assert_send_sync::<CoreMailbox>();
};

/// Owner-only deferred-closure queue (D249/S2c — the minimal Defer
/// split pulled out of [`CoreMailbox`]; D254 (S5) relaxed it off
/// cross-thread primitives).
///
/// Under D248 full single-owner the substrate `Sink`/`TopologySink`
/// dropped `Send + Sync`, so a [`DeferFn`] (which captures relaxed
/// `Sink`s / `Rc<RefCell<GraphInner>>`) is `!Send`. It cannot ride the
/// `Send + Sync` [`CoreMailbox`] (that bridges the `timer.rs`
/// cross-thread `Arc<CoreMailbox>` post side). This queue is therefore
/// **owner-only and `!Send`**: held behind an `Rc` shared between
/// [`crate::node::Core`] and `graphrefly-operators`' `ProducerEmitter`
/// on the one owner thread. Drained owner-side by
/// [`crate::node::Core::drain_mailbox`] (after the id-mailbox) and the
/// in-wave `BatchGuard` drain.
///
/// **Owner-thread-only by construction (D254 (S5), 2026-05-19).** The
/// `Rc<DeferQueue>` is never sent across threads (the closures it holds
/// are `!Send`), so the cross-thread primitives it used to carry —
/// `parking_lot::Mutex<VecDeque<DeferFn>>` for `q` and `AtomicBool` for
/// `runnable` — were unused capacity. D254 collapses them to
/// `RefCell<VecDeque<DeferFn>>` + `Cell<bool>`, dropping the
/// `parking_lot::Mutex::lock` acquire on every owner-side post/drain
/// (the hottest D251 path). `closed` similarly drops to `Cell<bool>` —
/// it is set by `Drop for Core` on the owner thread (the only `close()`
/// call site) and read only by `post`, on the same thread.
///
/// S4 still does the per-group-wake + typed snapshot/prune `MailboxOp`
/// reshape (D246 rule 8) — D249 is the acknowledged minimal first
/// touch that lets the D248 single-owner Sink relaxation land in S2c.
pub struct DeferQueue {
    q: RefCell<VecDeque<DeferFn>>,
    /// Set by [`Self::close`] (owner-side, called from `Drop for Core`).
    /// `Cell<bool>` (D254): owner-thread-only by D248/D249 construction,
    /// no cross-thread reader.
    closed: Cell<bool>,
    /// "Has queued work" bit — mirrors `CoreMailbox::runnable` so the
    /// in-wave `BatchGuard` drain gate (`is_runnable()`) also fires
    /// when only a `Defer` (no id-`MailboxOp`) was posted mid-wave
    /// (the reactive describe/observe `DepsChanged` path, D246 r6).
    /// `Cell<bool>` (D254): owner-thread-only — see the struct doc.
    runnable: Cell<bool>,
}

impl DeferQueue {
    /// A fresh, open, empty owner-side defer queue.
    #[must_use]
    pub fn new() -> Self {
        Self {
            q: RefCell::new(VecDeque::new()),
            closed: Cell::new(false),
            runnable: Cell::new(false),
        }
    }

    /// Whether the queue currently holds work (the in-wave drain gate).
    /// `#[must_use]` (/qa m4) — a discarded result silently loses the
    /// drain-gate signal.
    #[must_use = "is_runnable is the in-wave drain gate; ignoring it loses the signal"]
    pub fn is_runnable(&self) -> bool {
        self.runnable.get()
    }

    /// Enqueue an owner-side deferred closure. Returns `false` iff the
    /// owning `Core` has dropped ([`Self::close`]) — the caller's
    /// closure is dropped unrun (same contract as the old
    /// `CoreMailbox::post_defer`). Owner-thread-only by D248/D249
    /// construction — see the struct doc; no cross-thread TOCTOU
    /// against `close` is possible because both run on the one owner
    /// thread and never overlap.
    ///
    /// `#[must_use]` (/qa m3) — `false` is the load-bearing signal that
    /// the caller's closure was dropped unrun; ignoring it can mask
    /// teardown races where work was silently discarded.
    #[must_use = "false means the queue is closed and the closure was dropped unrun"]
    pub fn post(&self, f: DeferFn) -> bool {
        if self.closed.get() {
            return false;
        }
        self.q.borrow_mut().push_back(f);
        self.runnable.set(true);
        true
    }

    /// Owner-side drain. Pops every queued closure FIFO and applies it
    /// (re-entrancy-safe: a closure may re-`post`, and this same drain
    /// loop picks it up before returning). `max_ops` bounds one drain
    /// against a self-re-posting livelock (mirrors
    /// `CoreMailbox::drain_into`).
    ///
    /// **Re-entry note (/qa M6).** Both the pre-D254
    /// `parking_lot::Mutex<VecDeque<DeferFn>>` shape and the D254
    /// `RefCell<VecDeque<DeferFn>>` shape release the lock/borrow before
    /// `apply(f)` runs (the pop block in the loop below). An owner-thread
    /// re-entrant `drain_into` from inside `apply(f)` therefore neither
    /// deadlocks nor panics with `BorrowMutError`. The nested call drains
    /// the remaining closures under its own `max_ops` budget, and the
    /// outer loop then observes an empty queue. It would fail loudly
    /// (deadlock or `BorrowMutError`) only if a future change held the
    /// lock/borrow across `apply(f)`.
    ///
    /// In practice there is no in-tree re-entrant drain caller. The
    /// embedder pump's `drain_mailbox` is never invoked recursively from
    /// within an `apply` closure body; `drain_mailbox` is the embedder
    /// seam, not a `DeferFn` capture target. Documented for any future
    /// binding that might construct one.
    ///
    /// # Panics
    ///
    /// Panics when a single drain pops its `max_ops`-th closure. The
    /// assert fires before that closure is applied, so at most
    /// `max_ops - 1` run per drain. The cause is a `Defer` closure
    /// re-posting itself every application (owner-side livelock), the
    /// defer-queue analogue of a fn that emits to itself.
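    /**
# Examples

This sketch shows the borrow scoping. It assumes hypothetical `Queue` and `Job` types in place of [`DeferQueue`] and [`DeferFn`]. Jobs receive the queue itself, standing in for re-entry through `&dyn CoreFull`.

The `RefCell` borrow ends with the pop. A job can therefore `post` onto the same queue, and the re-posted job runs in the same drain.

```rust
use std::cell::{Cell, RefCell};
use std::collections::VecDeque;
use std::rc::Rc;

// Hypothetical job type: receives the queue so it can re-post, standing in
// for re-entry through `&dyn CoreFull`.
type Job = Box<dyn FnOnce(&Queue)>;

// Hypothetical owner-only stand-in for `DeferQueue`.
#[derive(Default)]
struct Queue {
    q: RefCell<VecDeque<Job>>,
    runnable: Cell<bool>,
}

impl Queue {
    fn post(&self, f: Job) {
        self.q.borrow_mut().push_back(f);
        self.runnable.set(true);
    }

    fn drain_into(&self, max_ops: u32) {
        let mut applied = 0u32;
        loop {
            // The mutable borrow lives only for the pop.
            let f = {
                let mut q = self.q.borrow_mut();
                let Some(f) = q.pop_front() else {
                    self.runnable.set(false);
                    return;
                };
                f
            };
            applied += 1;
            assert!(applied < max_ops, "defer-queue livelock");
            f(self); // no borrow is held here, so `f` may call `post`
        }
    }
}

// One job re-posts a follow-up; both run in the same drain.
fn demo() -> (Vec<&'static str>, bool) {
    let log = Rc::new(RefCell::new(Vec::new()));
    let queue = Queue::default();
    let first = Rc::clone(&log);
    queue.post(Box::new(move |q: &Queue| {
        first.borrow_mut().push("first");
        let next = Rc::clone(&first);
        q.post(Box::new(move |_: &Queue| next.borrow_mut().push("follow-up")));
    }));
    queue.drain_into(100);
    let entries = log.borrow().clone();
    (entries, queue.runnable.get())
}
```

`demo()` returns `(["first", "follow-up"], false)`. If the `borrow_mut` were held across `f(self)` instead, the inner `post` would panic with a `BorrowMutError`.
    */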
    pub fn drain_into(&self, max_ops: u32, mut apply: impl FnMut(DeferFn)) {
        let mut applied = 0u32;
        // /qa M3 RAII: clear `runnable` on panic unwind. Mirrors
        // `CoreMailbox::drain_into`'s panic-aware clear shape, except
        // the cell-based discipline lets the guard `Cell::set(false)`
        // directly with no lock to acquire.
        let _runnable_clear = DeferRunnableClearOnPanic {
            q: &self.q,
            runnable: &self.runnable,
        };
        loop {
            // Borrow `q` only to pop (closures must run with the borrow
            // released — `apply(f)` may itself re-`post` on this same
            // queue, requiring `borrow_mut()` again; nested mut borrow
            // would panic). Mirrors the lock-release shape of the old
            // `parking_lot::Mutex` version exactly.
            let f = {
                let mut q = self.q.borrow_mut();
                let Some(f) = q.pop_front() else {
                    // Clear `runnable` while we hold the borrow
                    // (owner-thread-only ⇒ no concurrent `post` to race;
                    // this pairs with `is_runnable` post-drain).
                    self.runnable.set(false);
                    return;
                };
                f
            };
            applied += 1;
            assert!(
                applied < max_ops,
                "defer-queue drain exceeded {max_ops} closures in one \
                 drain — a Defer closure is re-posting itself every \
                 application (owner-side livelock)."
            );
            apply(f);
        }
    }

    /// Mark the owning `Core` gone. Idempotent.
    ///
    /// **Drop timing of queued closures (QA, 2026-05-19).** This method
    /// only flips `closed`, so subsequent `post` calls drop their
    /// closures unrun (`closed == true` short-circuits the enqueue), and
    /// running `CoreFull` on a half-dropped `Core` is structurally
    /// impossible. The *already-queued* closures are NOT drained here;
    /// they remain in `q` and are dropped when the last `Rc<DeferQueue>`
    /// clone drops (every `ProducerEmitter` / graph reactive handle
    /// holds one — D249). Since closures by contract capture no
    /// `HandleId` (D235 P8 / D246 r8 sites), this is leak-free for
    /// refcount purposes; callers MUST honour the
    /// no-`HandleId`-in-`DeferFn` discipline (see [`DeferFn`]'s doc) for
    /// this to hold.
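    /**
# Examples

This sketch shows that drop timing. It assumes hypothetical `Queue` and `DropFlag` types. The queued closure captures a `DropFlag`, which counts when the closure is dropped.

`close` refuses new posts but neither runs nor drops the queued closure. The closure is dropped, unrun, only when the last `Rc` clone of the queue goes away.

```rust
use std::cell::{Cell, RefCell};
use std::collections::VecDeque;
use std::rc::Rc;

// Hypothetical owner-only stand-in for `DeferQueue` (argument-less closures).
#[derive(Default)]
struct Queue {
    q: RefCell<VecDeque<Box<dyn FnOnce()>>>,
    closed: Cell<bool>,
}

impl Queue {
    fn post(&self, f: Box<dyn FnOnce()>) -> bool {
        if self.closed.get() {
            return false; // `f` is dropped unrun here
        }
        self.q.borrow_mut().push_back(f);
        true
    }

    fn close(&self) {
        self.closed.set(true); // already-queued closures stay in `q`
    }
}

// Bumps a shared counter when dropped, so we can see when a closure dies.
struct DropFlag(Rc<Cell<u32>>);

impl Drop for DropFlag {
    fn drop(&mut self) {
        self.0.set(self.0.get() + 1);
    }
}

// Returns (drops right after `close`, drops after the last `Rc` clone
// goes away, whether the queued closure ever ran).
fn demo() -> (u32, u32, bool) {
    let drops = Rc::new(Cell::new(0));
    let ran = Rc::new(Cell::new(false));
    let queue = Rc::new(Queue::default());
    let emitter = Rc::clone(&queue); // stands in for a producer's clone

    let flag = DropFlag(Rc::clone(&drops));
    let ran_in = Rc::clone(&ran);
    let accepted = queue.post(Box::new(move || {
        let _keep = &flag; // moves `flag` into the closure
        ran_in.set(true);
    }));
    assert!(accepted);
    queue.close();
    let after_close = drops.get();

    drop(queue);
    drop(emitter); // last clone: the queued closure is dropped, unrun
    (after_close, drops.get(), ran.get())
}
```

`demo()` returns `(0, 1, false)`:

- There is no drop at `close`.
- There is one drop when the last clone goes.
- The closure never runs.
    */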
    pub fn close(&self) {
        self.closed.set(true);
    }
}

impl Default for DeferQueue {
    fn default() -> Self {
        Self::new()
    }
}

// /qa M4 (2026-05-19): `DeferQueue` MUST stay `!Send + !Sync` by
// construction (owner-thread-only by D248/D249; the `Rc<DeferQueue>`
// is never sent across threads, the closures it holds are `!Send`).
// Lock the invariant in the type system so a future field that
// silently widens the trait bounds (e.g., swapping `RefCell` for
// `Mutex` "for safety") breaks the build instead of the actor-model
// invariant.
static_assertions::assert_not_impl_any!(DeferQueue: Send, Sync);

/// RAII guard that clears [`CoreMailbox::runnable`] on panic unwind if
/// the queue is empty at unwind time (/qa M3). Outside of unwind, the
/// explicit `runnable.store(false, Release)` on the empty-pop path of
/// [`CoreMailbox::drain_into`] handles the normal-exit clear under the
/// `ops` lock (QA F-#4 lost-wakeup discipline). On panic unwind from
/// inside `apply(f)`, this guard's `Drop` re-acquires the `ops` lock,
/// checks whether the queue is empty, and clears `runnable` if so —
/// if non-empty, leaves `runnable=true` so the next drain picks up the
/// remaining work, matching the QA F-#4 race discipline.
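/**
# Example

This is a std-only sketch of this guard pattern. It assumes `std::sync::Mutex` in place of `parking_lot::Mutex`. Because std's mutex can poison, the sketch recovers the guard with `PoisonError::into_inner`, although no lock is held when `apply` panics here. `Mailbox`, `ClearOnPanic` and `runnable_after_panic` are hypothetical names.

The guard acts only while `std::thread::panicking()` is true. It clears the wake bit only if nothing is left to drain.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Mutex, PoisonError};

// Hypothetical stand-in for `CoreMailbox` with `u32` ops.
struct Mailbox {
    ops: Mutex<VecDeque<u32>>,
    runnable: AtomicBool,
}

// Clears `runnable` during unwind, but only if the queue is empty then.
struct ClearOnPanic<'a> {
    ops: &'a Mutex<VecDeque<u32>>,
    runnable: &'a AtomicBool,
}

impl Drop for ClearOnPanic<'_> {
    fn drop(&mut self) {
        if std::thread::panicking() {
            // std's Mutex can be poisoned; recover the guard either way.
            let q = self.ops.lock().unwrap_or_else(PoisonError::into_inner);
            if q.is_empty() {
                self.runnable.store(false, Ordering::Release);
            }
        }
    }
}

impl Mailbox {
    fn with_ops(ops: &[u32]) -> Self {
        Mailbox {
            ops: Mutex::new(ops.iter().copied().collect()),
            runnable: AtomicBool::new(!ops.is_empty()),
        }
    }

    fn drain_into(&self, mut apply: impl FnMut(u32)) {
        let _guard = ClearOnPanic { ops: &self.ops, runnable: &self.runnable };
        loop {
            let op = {
                let mut q = self.ops.lock().unwrap_or_else(PoisonError::into_inner);
                let Some(op) = q.pop_front() else {
                    self.runnable.store(false, Ordering::Release);
                    return;
                };
                op
            };
            apply(op); // may panic; `_guard` then runs during unwind
        }
    }
}

// Drains `ops`, panicking when `bad` is applied; reports the wake bit after.
fn runnable_after_panic(ops: &[u32], bad: u32) -> bool {
    let mb = Mailbox::with_ops(ops);
    let result = std::panic::catch_unwind(|| {
        mb.drain_into(|op| assert_ne!(op, bad, "apply panicked"));
    });
    assert!(result.is_err(), "`bad` must be one of `ops`");
    mb.runnable.load(Ordering::Acquire)
}
```

`runnable_after_panic(&[1, 2], 2)` returns `false`, because nothing is left to drain. `runnable_after_panic(&[1, 2, 3], 2)` returns `true`, because op `3` is still queued.
*/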
struct MailboxRunnableClearOnPanic<'a> {
    ops: &'a parking_lot::Mutex<VecDeque<MailboxOp>>,
    runnable: &'a AtomicBool,
}

impl Drop for MailboxRunnableClearOnPanic<'_> {
    fn drop(&mut self) {
        if std::thread::panicking() {
            let q = self.ops.lock();
            if q.is_empty() {
                self.runnable.store(false, Ordering::Release);
            }
        }
    }
}

/// RAII guard that clears [`DeferQueue::runnable`] on panic unwind if
/// the queue is empty at unwind time (/qa M3). Owner-thread-only by
/// D248/D249/D254 construction — no lock to acquire; the cell-based
/// discipline lets the guard `Cell::set(false)` directly. Mirrors
/// [`MailboxRunnableClearOnPanic`]'s shape against the relaxed
/// `RefCell` + `Cell` primitives. If the queue is non-empty at unwind
/// time, leaves `runnable=true` so the next drain picks up the work.
struct DeferRunnableClearOnPanic<'a> {
    q: &'a RefCell<VecDeque<DeferFn>>,
    runnable: &'a Cell<bool>,
}

impl Drop for DeferRunnableClearOnPanic<'_> {
    fn drop(&mut self) {
        if std::thread::panicking() {
            // `try_borrow` rather than `borrow`. The only `borrow_mut`
            // sites are this drain loop (which holds the borrow only for
            // the pop, never across `apply(f)`) and `post` (same owner
            // thread; single-thread `RefCell` ⇒ no concurrent borrow).
            // The borrow is therefore normally free by the time this
            // guard drops. If it is somehow still held, `try_borrow`
            // returns Err and we skip the clear, which is safe (next post
            // re-sets `runnable`; next drain pops + re-observes).
            if let Ok(q) = self.q.try_borrow() {
                if q.is_empty() {
                    self.runnable.set(false);
                }
            }
        }
    }
}