//! `CoreMailbox` — the `Send + Sync` bridge between autonomous async
//! producers (timer tasks) and an owned, relocatable [`crate::node::Core`].
//!
//! # Why this exists (D223 / D225 / D227 / D230)
//!
//! Under the actor / work-stealing model (D221) a `Core` owns its state
//! cell **by value** and moves between workers — so a long-lived async
//! task (e.g. a `tokio::spawn`-ed timer loop) can no longer hold
//! `&Core` / `Arc<C>` / `Weak<C>` to call `Core::emit` directly (that
//! was the deleted `WeakCore` path; D223 forbids `Weak<C>` back-refs).
//!
//! Instead the task holds an `Arc<CoreMailbox>` (`Send + Sync`) and
//! **posts** a `(NodeId, HandleId)` emit request. The mailbox is drained
//! **owner-side** by the synchronous [`crate::node::Core::drain_mailbox`]
//! (applied via the existing sync `Core::emit` — *no async in Core*,
//! honoring the locked "Core never async" invariant). The drain point is
//! the embedder's existing advance/pump site (test harness `TestRuntime`
//! advance helper, napi pump): timer tasks already require the host
//! runtime to be advanced before they fire, so draining there is
//! **behaviour-identical** to the old autonomous `Weak::upgrade →
//! core.emit` (D230).
//!
//! # Shape (D227 full)
//!
//! - **Op queue** — FIFO of [`MailboxOp`]s, applied owner-side.
//! - **`closed`** — set when the owning `Core` drops; a timer task that
//!   observes it releases its pending handle and bails (mirrors the old
//!   `Weak::upgrade() == None` teardown path exactly).
//! - **`runnable`** — the per-group "this Core has work" wake bit. S2b
//!   lands the field + sets/clears it (D227 builds the *full* mailbox
//!   shape now); S4 wires it to wave drain-scoping + finalize, and the
//!   host-executor (M6) reads it to schedule the owning worker. The
//!   in-wave drain (`BatchGuard::drain_and_flush`) already gates on it
//!   so the no-producer §7 floor pays one atomic load, not a mutex.
//!
//! # This is ONE mechanism (not six)
//!
//! The S2b design history records six producer "forks" (D227–D233);
//! that is *design-question* count, not runtime surface. What actually
//! exists is **one** owner-side re-entry mechanism:
//!
//! > A `Send + Sync` FIFO of deferred [`MailboxOp`]s, drained on the
//! > owner thread — **in-wave to quiescence** for producer sinks
//! > (`BatchGuard::drain_and_flush`) and at the embedder pump point for
//! > autonomous timer tasks ([`crate::node::Core::drain_mailbox`]) —
//! > each op applied via the one object-safe owner re-entry surface
//! > [`crate::node::CoreFull`].
//!
//! `MailboxOp` keeps a **typed fast path** (`Emit`/`Complete`/`Error`:
//! zero-alloc, and a deterministic `Core`-gone handle-release contract —
//! see the per-kind `post_*`) plus a **`Defer` escape hatch** (a boxed
//! `FnOnce(&dyn CoreFull)` for value-returning topology mutation:
//! windowing / higher-order inner subscribe). Collapsing the enum to
//! pure `Defer` was considered and rejected — it would heap-allocate per
//! timer/producer emit AND lose the typed `Core`-gone release affordance
//! (a dropped `FnOnce` can't run an else-branch). `ProducerEmitter`
//! (`graphrefly-operators`) is thin sugar over `post_*`; the producer
//! *build* closure isn't a mechanism at all — it uses the `&Core` its
//! `ProducerCtx` already lends it (D231). One queue, one drain loop, one
//! `match`, one re-entry trait.

use std::cell::{Cell, RefCell};
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};

use crate::handle::{HandleId, NodeId};

/// **`Send`** cross-thread deferred closure (D233; D249/S2c). Posted
/// by any cross-thread `Send` producer (canonically an autonomous
/// timer task — `temporal.rs` `window_time`/etc., a `tokio::spawn`ed
/// cross-thread task whose defer closure captures only `Send` state:
/// `Arc<Mutex<NodeId>>` / `Arc<dyn BindingBoundary>` / ids — but the
/// API admits any future cross-thread producer with the same `Send`
/// capture discipline, e.g. a napi/pyo3 binding-layer pump) and
/// applied owner-side. Rides the `Send + Sync` [`CoreMailbox`]
/// (cross-thread post side, drained owner-side via `drain_mailbox`).
pub type SendDeferFn = Box<dyn FnOnce(&dyn crate::node::CoreFull) + Send>;

/// **`!Send`** owner-side deferred closure (D248/D249/S2c). Posted by
/// an owner-side in-wave producer/graph sink whose closure captures
/// `!Send` state (a relaxed `Sink` / `Rc<RefCell<GraphInner>>` —
/// D248). Lives in the owner-only [`DeferQueue`], **never** the
/// cross-thread [`CoreMailbox`].
pub type DeferFn = Box<dyn FnOnce(&dyn crate::node::CoreFull)>;

/// A re-entry request posted to the [`CoreMailbox`] by an autonomous
/// async producer (timer task → `Emit`) or by a producer-operator sink
/// (D232-AMEND/A′ → `Emit`/`Complete`/`Error`). Applied owner-side via
/// the sync `Core::{emit,complete,error}` by [`crate::node::Core::drain_mailbox`]
/// — drained **in-wave to quiescence** by the `BatchGuard` drain loop
/// for producer sinks (immediate, cascade-ordering-preserving), and at
/// the embedder pump point for timer tasks (D230).
pub enum MailboxOp {
    /// `Core::emit(node, handle)`. Posted by timer tasks + producer sinks.
    Emit(NodeId, HandleId),
    /// `Core::complete(node)`. Posted by producer sinks.
    Complete(NodeId),
    /// `Core::error(node, handle)`. Posted by producer sinks.
    Error(NodeId, HandleId),
    /// **`Send`** owner-side closure (D233; D249/S2c). Posted by a
    /// cross-thread timer task (`temporal.rs` `window_time`/etc.) whose
    /// closure captures only `Send` state; applied **in-wave** by the
    /// drain loop (the owner holds `&Core`). The `!Send` owner-side
    /// sink defers (graph describe/observe, control/higher-order
    /// dynamic-inner) go to the separate owner-only [`DeferQueue`]
    /// instead — D248/D249.
    Defer(SendDeferFn),
}

impl std::fmt::Debug for MailboxOp {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Emit(n, h) => write!(f, "Emit({n:?}, {h:?})"),
            Self::Complete(n) => write!(f, "Complete({n:?})"),
            Self::Error(n, h) => write!(f, "Error({n:?}, {h:?})"),
            Self::Defer(_) => write!(f, "Defer(<send closure>)"),
        }
    }
}

/// `Send + Sync` mailbox bridging autonomous async producers to an owned,
/// relocatable [`crate::node::Core`]. Held behind an `Arc`; the `Core`
/// owns one clone, each timer task another. See the module docs.
pub struct CoreMailbox {
    /// FIFO of [`MailboxOp`]s posted by timer tasks (`Emit`) and
    /// producer-operator sinks (`Emit`/`Complete`/`Error`), applied
    /// owner-side by [`crate::node::Core::drain_mailbox`] via the sync
    /// `Core::{emit,complete,error}`.
    ops: parking_lot::Mutex<VecDeque<MailboxOp>>,
    /// Set by [`Self::close`] when the owning `Core` drops. A timer task
    /// that then gets `false` back from [`Self::post_emit`] must release
    /// its pending handle and bail (the old `Weak::upgrade() == None` path).
    closed: AtomicBool,
    /// "This Core has queued work" wake bit (D227 full shape;
    /// **finalized at S4**). Set on every successful [`Self::post_op`];
    /// cleared by [`Self::drain_into`]/[`Self::take_all`] once the queue
    /// empties (under the `ops` lock — QA F-#4 lost-wakeup discipline).
    ///
    /// **Actor-model granularity (S4, D246/D248/D249).** `Core` is
    /// single-owner `!Send + !Sync`; in the actor model one worker owns
    /// exactly one `Core`, so this Core-wide bit **is** that worker's
    /// per-group runnable-wake (the worker's Core hosts its declared
    /// `SchedulingGroupId`(s); a finer per-`SchedulingGroupId` sub-bit
    /// has no consumer — M6's per-binding group executor schedules at
    /// the Core/worker grain — so adding one would be speculative
    /// substrate surface, D196/D246-ignore-legacy, cf. D250). It is the
    /// only cross-thread bridge into a `!Send` Core: timer/producer
    /// tasks `post_*` + signal; the owner drains
    /// ([`crate::node::Core::drain_mailbox`] / the in-wave
    /// `BatchGuard`). M6 (deferred) reads [`Self::is_runnable`] from the
    /// host executor to decide when to poll a worker's Core.
    ///
    /// QA F12 (2026-05-19): if a future per-`SchedulingGroupId` sub-bit
    /// is ever added, it MUST be split in lockstep across BOTH this
    /// `CoreMailbox.runnable` AND `DeferQueue.runnable` — the in-wave
    /// drain gate (`BatchGuard::drain_and_flush`) ORs the two, and a
    /// half-split would silently lose wakeups for the unsplit queue.
    runnable: AtomicBool,
}

impl CoreMailbox {
    /// A fresh, open, empty mailbox.
    #[must_use]
    pub fn new() -> Self {
        Self {
            ops: parking_lot::Mutex::new(VecDeque::new()),
            closed: AtomicBool::new(false),
            runnable: AtomicBool::new(false),
        }
    }

    /// Post a timer-fired emit request. Returns `false` iff the owning
    /// `Core` has already dropped ([`Self::close`] was called) — the
    /// caller MUST then release `handle` and stop (mirrors the old
    /// `WeakCore::upgrade() == None` teardown branch in `timer.rs`).
    /// Returns `true` when queued (and sets the `runnable` wake bit).
    #[must_use]
    pub fn post_emit(&self, node_id: NodeId, handle: HandleId) -> bool {
        self.post_op(MailboxOp::Emit(node_id, handle))
    }

    /// Post a producer-sink `Complete` (D232-AMEND/A′). Returns `false`
    /// iff the owning `Core` is gone (caller stops; nothing to release).
    #[must_use]
    pub fn post_complete(&self, node_id: NodeId) -> bool {
        self.post_op(MailboxOp::Complete(node_id))
    }

    /// Post a producer-sink `Error` (D232-AMEND/A′). Returns `false` iff
    /// the owning `Core` is gone — the caller MUST then release
    /// `handle` (it owned a retain for the would-be `error` payload).
    #[must_use]
    pub fn post_error(&self, node_id: NodeId, handle: HandleId) -> bool {
        self.post_op(MailboxOp::Error(node_id, handle))
    }

    /// Post a **`Send`** cross-thread `Defer` (D249/S2c). For an
    /// autonomous timer task whose closure captures only `Send` state
    /// (`temporal.rs` `window_time`/etc.). Returns `false` iff the
    /// owning `Core` is gone — the closure is dropped unrun. The
    /// `!Send` owner-side sink defers use [`DeferQueue::post`] instead.
    #[must_use]
    pub fn post_defer(&self, f: SendDeferFn) -> bool {
        self.post_op(MailboxOp::Defer(f))
    }

    /// Post a [`MailboxOp`]. Returns `false` iff the owning `Core` has
    /// already dropped ([`Self::close`]) — see the per-kind wrappers for
    /// the caller's handle-release obligation.
    ///
    /// QA F-A (2026-05-18): the `closed` check and the `push_back` are
    /// performed **in one `ops`-lock critical section** so a concurrent
    /// owner-thread `close()` (which also takes `ops`) cannot interleave
    /// between "observed not-closed" and "enqueued" — that TOCTOU would
    /// strand the op (with its retained `HandleId`) in a queue
    /// `Drop for Core` already walked → leak. `close()` takes the same
    /// lock, so the two are mutually exclusive.
    #[must_use]
    pub fn post_op(&self, op: MailboxOp) -> bool {
        let mut q = self.ops.lock();
        if self.closed.load(Ordering::Acquire) {
            return false;
        }
        q.push_back(op);
        self.runnable.store(true, Ordering::Release);
        true
    }

    /// Owner-side drain. Pops every queued [`MailboxOp`] in FIFO order
    /// and hands each to `apply` (the caller passes a closure over the
    /// sync `Core::{emit,complete,error}`). Re-entrancy: `apply` may
    /// itself cascade and a concurrent timer task / re-entrant sink may
    /// post again — a fresh post re-sets `runnable`, so the enclosing
    /// drain-to-quiescence loop (or a later drain) picks it up.
    ///
    /// QA F-#4 (2026-05-18): the empty observation and the
    /// `runnable = false` store happen **in the same `ops`-lock critical
    /// section**. Previously the empty `pop_front` released the lock before
    /// storing `false`, so a concurrent `post_op` (which takes `ops`)
    /// could push and set `runnable=true` in between, then our `false`
    /// store would clobber it — a lost wakeup invisible to the
    /// `is_runnable`-gated in-wave drain and the S4/M6 scheduler.
    /// Clearing `runnable` while still holding the lock every `post_op`
    /// must take orders them: a post is either popped here or runs
    /// strictly after the `false` store and re-sets `true`.
    ///
    /// `max_ops` bounds a single drain (QA P3, 2026-05-18): `apply` may
    /// re-post (a `Defer` that re-`defer`s, a producer re-subscribing),
    /// which this loop correctly drains in the *same* call — but a
    /// closure that re-posts itself on *every* application is an
    /// unbounded mailbox livelock (the producer-authoring analogue of a
    /// fn that emits to itself). That livelock lives HERE (the inner
    /// drain loop), not in `drain_and_flush`'s fire-cascade `cap`, so it
    /// is bounded + panics here — decoupled from the fire counter so a
    /// legitimately large finite producer drain never false-trips the
    /// fire `cap`.
    ///
    /// /qa M3 (2026-05-19): a panic from `apply(f)` mid-drain previously
    /// unwound out of this loop with `runnable` still `true` — under the
    /// `parking_lot::Mutex` shape that was self-correcting (the next
    /// drain pop would re-observe + clear), but downstream
    /// `is_runnable()` gates would spuriously return `true` until then.
    /// The drain now holds a `MailboxRunnableClearOnPanic` guard so an
    /// `apply` panic still clears `runnable` on unwind: if the queue is
    /// empty at unwind time the bit is reset; if the queue is non-empty
    /// it stays `true` so the next drain picks up the remaining work.
    /// Tightens the scheduler-wakeup contract under panic.
    ///
    /// # Panics
    ///
    /// Panics when a single drain pops its `max_ops`-th op (the check is
    /// `applied < max_ops`, so at most `max_ops - 1` ops are applied per
    /// drain). That indicates a producer / `Defer` op re-posting itself
    /// on every application (livelock guard). The default cap is sized
    /// for realistic cascades; raise it via
    /// `Core::set_max_batch_drain_iterations` (the setter named in the
    /// panic message) only if your workload has evidence it needs more.
    pub fn drain_into(&self, max_ops: u32, mut apply: impl FnMut(MailboxOp)) {
        let mut applied = 0u32;
        // /qa M3 RAII: on panic unwind, clear `runnable` if the queue is
        // empty at unwind time. No-op on the normal exit path, where the
        // explicit `runnable.store(false)` on the empty `pop_front` below
        // is race-free under the `ops` lock (QA F-#4).
        let _runnable_clear = MailboxRunnableClearOnPanic {
            ops: &self.ops,
            runnable: &self.runnable,
        };
        loop {
            let op = {
                let mut q = self.ops.lock();
                let Some(op) = q.pop_front() else {
                    self.runnable.store(false, Ordering::Release);
                    return;
                };
                op
            };
            applied += 1;
            assert!(
                applied < max_ops,
                "mailbox drain exceeded {max_ops} ops in one drain — a \
                 producer/Defer op is re-posting itself every application \
                 (mailbox livelock). Tune via \
                 Core::set_max_batch_drain_iterations only with concrete \
                 evidence the workload needs more."
            );
            apply(op);
        }
    }

    /// Whether the mailbox currently holds queued work (the wake bit).
    /// Advisory pre-M6 (the embedder pump drains unconditionally); S4/M6
    /// consumers gate scheduling on it. `#[must_use]` (/qa m4) — a
    /// discarded result silently loses the scheduling signal.
    #[must_use = "is_runnable is the scheduling wake-bit; ignoring it loses the signal"]
    pub fn is_runnable(&self) -> bool {
        self.runnable.load(Ordering::Acquire)
    }

    /// Mark the owning `Core` gone. Idempotent. Takes the `ops` lock so
    /// it is mutually exclusive with [`Self::post_op`]'s under-lock
    /// `closed` check (QA F-A): after this returns, no further `post_op`
    /// can enqueue. Callers MUST then [`Self::take_all`] and release any
    /// `Emit`/`Error` payload handles still queued (a TOCTOU-enqueued op
    /// posted just before `close` won the lock).
    pub fn close(&self) {
        let _q = self.ops.lock();
        self.closed.store(true, Ordering::Release);
    }

    /// Drain and return every still-queued [`MailboxOp`] without
    /// applying it — for `Drop for Core` teardown (QA F-A / Blind #2).
    /// `Emit`/`Error` ops carry a retained `HandleId` the caller must
    /// release; `Defer` closures are dropped unrun (running `CoreFull`
    /// on a half-dropped `Core` is unsound — user-locked QA decision A,
    /// 2026-05-18). Clears `runnable` under the lock (same race
    /// discipline as `drain_into`).
    #[must_use]
    pub fn take_all(&self) -> VecDeque<MailboxOp> {
        let mut q = self.ops.lock();
        // QA F11 (2026-05-19): contract gate — `take_all` MUST be
        // preceded by `close()` so no further `post_op` can enqueue
        // after this returns (a post racing a standalone `take_all`
        // would otherwise strand an op with its retained `HandleId`
        // in a mailbox the caller assumes is empty). The one in-tree
        // caller (`Drop for Core`) sequences `close()` first.
        debug_assert!(
            self.closed.load(Ordering::Acquire),
            "CoreMailbox::take_all must be called after close() — see \
             docstring contract"
        );
        self.runnable.store(false, Ordering::Release);
        std::mem::take(&mut *q)
    }

    /// Whether [`Self::close`] has been called.
    #[must_use]
    pub fn is_closed(&self) -> bool {
        self.closed.load(Ordering::Acquire)
    }
}

impl Default for CoreMailbox {
    fn default() -> Self {
        Self::new()
    }
}

// `CoreMailbox` is `Send + Sync` by construction (parking_lot::Mutex +
// atomics over the id-only `MailboxOp`). Asserted so a future field
// that breaks it fails here, not at the `Arc<CoreMailbox>` share site
// in `timer.rs`. D249/S2c: this MUST hold — the `!Send` owner-side
// `Defer` payload now lives in [`DeferQueue`], NOT here.
const _: fn() = || {
    fn assert_send_sync<T: Send + Sync>() {}
    assert_send_sync::<CoreMailbox>();
};

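// ---------------------------------------------------------------------------
// Illustrative sketch, NOT part of the original module. It is a std-only
// model of the wake-bit discipline documented on `CoreMailbox::post_op`,
// `drain_into`, `close` and `take_all` above. Every name here
// (`wake_bit_sketch`, `ModelMailbox`, `close_and_take`, `demo`, the `u64`
// payload) is hypothetical and exists only for illustration. Assumptions:
// `std::sync::Mutex` stands in for `parking_lot::Mutex`, and the `max_ops`
// livelock bound and the panic guard are left out. What it tries to show:
// `post` checks `closed`, pushes, and sets `runnable` inside one critical
// section, while `drain` clears `runnable` inside the same critical section
// in which it observes the queue empty, so a racing post cannot have its
// wake-up overwritten.
#[allow(dead_code)]
mod wake_bit_sketch {
    use std::collections::VecDeque;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::{Arc, Mutex};

    /// Hypothetical stand-in for `CoreMailbox`, carrying plain `u64` ops.
    pub struct ModelMailbox {
        ops: Mutex<VecDeque<u64>>,
        closed: AtomicBool,
        runnable: AtomicBool,
    }

    impl ModelMailbox {
        pub fn new() -> Self {
            Self {
                ops: Mutex::new(VecDeque::new()),
                closed: AtomicBool::new(false),
                runnable: AtomicBool::new(false),
            }
        }

        /// Enqueue `op`; returns `false` once the model is closed.
        pub fn post(&self, op: u64) -> bool {
            let mut q = self.ops.lock().unwrap();
            // The `closed` check and the push share one critical section.
            if self.closed.load(Ordering::Acquire) {
                return false;
            }
            q.push_back(op);
            self.runnable.store(true, Ordering::Release);
            true
        }

        /// Pop FIFO, applying each op with the lock released.
        pub fn drain(&self, mut apply: impl FnMut(u64)) {
            loop {
                let op = {
                    let mut q = self.ops.lock().unwrap();
                    let Some(op) = q.pop_front() else {
                        // Cleared while still holding the lock `post` needs.
                        self.runnable.store(false, Ordering::Release);
                        return;
                    };
                    op
                };
                apply(op);
            }
        }

        /// Models `close()` followed by `take_all()`: returns the ops that
        /// were still queued so the caller can release what they carry.
        pub fn close_and_take(&self) -> VecDeque<u64> {
            let mut q = self.ops.lock().unwrap();
            self.closed.store(true, Ordering::Release);
            self.runnable.store(false, Ordering::Release);
            std::mem::take(&mut *q)
        }

        pub fn is_runnable(&self) -> bool {
            self.runnable.load(Ordering::Acquire)
        }
    }

    /// Posts from `producers` threads, then drains on the calling thread.
    /// Returns the number of ops applied and the final wake bit.
    pub fn demo(producers: u64, per_producer: u64) -> (u64, bool) {
        let mb = Arc::new(ModelMailbox::new());
        let handles: Vec<_> = (0..producers)
            .map(|p| {
                let mb = Arc::clone(&mb);
                std::thread::spawn(move || {
                    for i in 0..per_producer {
                        assert!(mb.post(p * per_producer + i));
                    }
                })
            })
            .collect();
        for h in handles {
            h.join().unwrap();
        }
        let mut applied = 0;
        mb.drain(|_| applied += 1);
        (applied, mb.is_runnable())
    }
}
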
/// Owner-only deferred-closure queue (D249/S2c — the minimal Defer
/// split pulled out of [`CoreMailbox`]; D254 (S5) relaxed off cross-
/// thread primitives).
///
/// Under D248 full single-owner the substrate `Sink`/`TopologySink`
/// dropped `Send + Sync`, so a [`DeferFn`] (which captures relaxed
/// `Sink`s / `Rc<RefCell<GraphInner>>`) is `!Send`. It cannot ride the
/// `Send + Sync` [`CoreMailbox`] (that bridges the `timer.rs`
/// cross-thread `Arc<CoreMailbox>` post side). This queue is therefore
/// **owner-only and `!Send`**: held behind an `Rc` shared between
/// [`crate::node::Core`] and `graphrefly-operators`' `ProducerEmitter`
/// on the one owner thread. Drained owner-side by
/// [`crate::node::Core::drain_mailbox`] (after the id-mailbox) and the
/// in-wave `BatchGuard` drain.
///
/// **Owner-thread-only by construction (D254 (S5), 2026-05-19).** The
/// `Rc<DeferQueue>` is never sent across threads (the closures it holds
/// are `!Send`), so the cross-thread primitives it used to carry —
/// `parking_lot::Mutex<VecDeque<DeferFn>>` for `q` and `AtomicBool` for
/// `runnable` — were unused capacity. D254 collapses them to
/// `RefCell<VecDeque<DeferFn>>` + `Cell<bool>`, dropping the
/// `parking_lot::Mutex::lock` acquire on every owner-side post/drain
/// (the hottest D251 path). `closed` similarly drops to `Cell<bool>` —
/// it is set by `Drop for Core` on the owner thread (the only `close()`
/// call site) and read by `post`/`drain_into` on the same thread.
///
/// S4 still does the per-group-wake + typed snapshot/prune `MailboxOp`
/// reshape (D246 rule 8) — D249 is the acknowledged minimal first
/// touch that lets the D248 single-owner Sink relaxation land in S2c.
pub struct DeferQueue {
    q: RefCell<VecDeque<DeferFn>>,
    /// Set by [`Self::close`] (owner-side, called from `Drop for Core`).
    /// `Cell<bool>` (D254): owner-thread-only by D248/D249 construction,
    /// no cross-thread reader.
    closed: Cell<bool>,
    /// "Has queued work" bit — mirrors `CoreMailbox::runnable` so the
    /// in-wave `BatchGuard` drain gate (`is_runnable()`) also fires
    /// when only a `Defer` (no id-`MailboxOp`) was posted mid-wave
    /// (the reactive describe/observe `DepsChanged` path, D246 r6).
    /// `Cell<bool>` (D254): owner-thread-only — see the struct doc.
    runnable: Cell<bool>,
}

impl DeferQueue {
    /// A fresh, open, empty owner-side defer queue.
    #[must_use]
    pub fn new() -> Self {
        Self {
            q: RefCell::new(VecDeque::new()),
            closed: Cell::new(false),
            runnable: Cell::new(false),
        }
    }

    /// Whether the queue currently holds work (the in-wave drain gate).
    /// `#[must_use]` (/qa m4) — a discarded result silently loses the
    /// drain-gate signal.
    #[must_use = "is_runnable is the in-wave drain gate; ignoring it loses the signal"]
    pub fn is_runnable(&self) -> bool {
        self.runnable.get()
    }

    /// Enqueue an owner-side deferred closure. Returns `false` iff the
    /// owning `Core` has dropped ([`Self::close`]) — the caller's
    /// closure is dropped unrun (same contract as the old
    /// `CoreMailbox::post_defer`). Owner-thread-only by D248/D249
    /// construction — see the struct doc; no cross-thread TOCTOU
    /// against `close` is possible because both run on the one owner
    /// thread and never overlap.
    ///
    /// `#[must_use]` (/qa m3) — `false` is the load-bearing signal that
    /// the caller's closure was dropped unrun; ignoring it can mask
    /// teardown races where work was silently discarded.
    #[must_use = "false means the queue is closed and the closure was dropped unrun"]
    pub fn post(&self, f: DeferFn) -> bool {
        if self.closed.get() {
            return false;
        }
        self.q.borrow_mut().push_back(f);
        self.runnable.set(true);
        true
    }

    /// Owner-side drain. Pops every queued closure FIFO and applies it
    /// (re-entrancy-safe: a closure may re-`post`; a later drain picks
    /// it up). `max_ops` bounds one drain against a self-re-posting
    /// livelock (mirrors `CoreMailbox::drain_into`).
    ///
    /// **Re-entry note (/qa M6).** Under the pre-D254
    /// `parking_lot::Mutex<VecDeque<DeferFn>>` shape, an owner-thread
    /// re-entrant `drain_into` from inside `apply(f)` would have
    /// **deadlocked** at the inner lock acquire. Under the D254
    /// `RefCell<VecDeque<DeferFn>>` shape it would **panic with
    /// `BorrowMutError`** instead — different failure mode, same
    /// fail-loud outcome. In practice the pre-D254 design never had any
    /// in-tree re-entrant drain caller (the embedder pump's
    /// `drain_mailbox` is never invoked recursively from within an
    /// `apply` closure body — `drain_mailbox` is the embedder seam, not
    /// a `DeferFn` capture target), so neither failure mode is
    /// reachable from in-tree code. Documented for any future binding
    /// that might construct one.
    ///
    /// # Panics
    ///
    /// Panics when a single drain pops its `max_ops`-th closure (the
    /// check is `applied < max_ops`). That indicates a `Defer` closure
    /// re-posting itself on every application (owner-side livelock),
    /// the defer-queue analogue of a fn that emits to itself.
    pub fn drain_into(&self, max_ops: u32, mut apply: impl FnMut(DeferFn)) {
        let mut applied = 0u32;
        // /qa M3 RAII: clear `runnable` on panic unwind. Mirrors
        // `CoreMailbox::drain_into`'s panic-aware clear shape, except
        // the cell-based discipline lets the guard `Cell::set(false)`
        // directly with no lock to acquire.
        let _runnable_clear = DeferRunnableClearOnPanic {
            q: &self.q,
            runnable: &self.runnable,
        };
        loop {
            // Borrow `q` only to pop (closures must run with the borrow
            // released — `apply(f)` may itself re-`post` on this same
            // queue, requiring `borrow_mut()` again; nested mut borrow
            // would panic). Mirrors the lock-release shape of the old
            // `parking_lot::Mutex` version exactly.
            let f = {
                let mut q = self.q.borrow_mut();
                let Some(f) = q.pop_front() else {
                    // Clear `runnable` while we hold the borrow (owner-
                    // thread-only ⇒ no concurrent `post` to race; this
                    // pairs with `is_runnable` post-drain).
                    self.runnable.set(false);
                    return;
                };
                f
            };
            applied += 1;
            assert!(
                applied < max_ops,
                "defer-queue drain exceeded {max_ops} closures in one \
                 drain — a Defer closure is re-posting itself every \
                 application (owner-side livelock)."
            );
            apply(f);
        }
    }

    /// Mark the owning `Core` gone. Idempotent.
    ///
    /// **Drop timing of queued closures (QA, 2026-05-19).** This method
    /// only flips `closed` so subsequent `post` calls drop their
    /// closures unrun (`closed == true` short-circuits enqueue) and
    /// running `CoreFull` on a half-dropped `Core` is structurally
    /// impossible. The *already-queued* closures are NOT drained here;
    /// they remain in `q` and are dropped when the last `Rc<DeferQueue>`
    /// clone drops (every `ProducerEmitter` / graph reactive handle
    /// holds one — D249). Since closures by contract capture no
    /// `HandleId` (D235 P8 / D246 r8 sites), this is leak-free for
    /// refcount purposes; callers MUST honour the no-`HandleId`-in-
    /// `DeferFn` discipline (see [`DeferFn`]'s doc) for this to hold.
    pub fn close(&self) {
        self.closed.set(true);
    }
}

impl Default for DeferQueue {
    fn default() -> Self {
        Self::new()
    }
}

// /qa M4 (2026-05-19): `DeferQueue` MUST stay `!Send + !Sync` by
// construction (owner-thread-only by D248/D249; the `Rc<DeferQueue>`
// is never sent across threads, the closures it holds are `!Send`).
// Lock the invariant in the type system so a future field that
// silently widens the trait bounds (e.g., swapping `RefCell` for
// `Mutex` "for safety") breaks the build instead of the actor-model
// invariant.
static_assertions::assert_not_impl_any!(DeferQueue: Send, Sync);

/// RAII guard that clears [`CoreMailbox::runnable`] on panic unwind if
/// the queue is empty at unwind time (/qa M3). Outside of unwind, the
/// explicit `runnable.store(false, Release)` on the empty-pop path of
/// [`CoreMailbox::drain_into`] handles the normal-exit clear under the
/// `ops` lock (QA F-#4 lost-wakeup discipline). On panic unwind from
/// inside `apply(f)`, this guard's `Drop` re-acquires the `ops` lock,
/// checks whether the queue is empty, and clears `runnable` if so —
/// if non-empty, leaves `runnable=true` so the next drain picks up the
/// remaining work, matching the QA F-#4 race discipline.
struct MailboxRunnableClearOnPanic<'a> {
    ops: &'a parking_lot::Mutex<VecDeque<MailboxOp>>,
    runnable: &'a AtomicBool,
}

impl Drop for MailboxRunnableClearOnPanic<'_> {
    fn drop(&mut self) {
        if std::thread::panicking() {
            let q = self.ops.lock();
            if q.is_empty() {
                self.runnable.store(false, Ordering::Release);
            }
        }
    }
}

/// RAII guard that clears [`DeferQueue::runnable`] on panic unwind if
/// the queue is empty at unwind time (/qa M3). Owner-thread-only by
/// D248/D249/D254 construction — no lock to acquire; the cell-based
/// discipline lets the guard `Cell::set(false)` directly. Mirrors
/// [`MailboxRunnableClearOnPanic`]'s shape against the relaxed
/// `RefCell` + `Cell` primitives. If the queue is non-empty at unwind
/// time, leaves `runnable=true` so the next drain picks up the work.
struct DeferRunnableClearOnPanic<'a> {
    q: &'a RefCell<VecDeque<DeferFn>>,
    runnable: &'a Cell<bool>,
}

impl Drop for DeferRunnableClearOnPanic<'_> {
    fn drop(&mut self) {
        if std::thread::panicking() {
            // `try_borrow` is safe even mid-panic: the only `borrow_mut`
            // sites are the drain loop (which holds the borrow only for
            // the pop) and `post` (which runs on the same owner thread,
            // so a single-thread `RefCell` sees no concurrent borrow).
            // If the panic started while a `borrow_mut` was still live,
            // `try_borrow` returns `Err` and we skip the clear, which is
            // safe (next post re-sets runnable; next drain pops +
            // re-observes).
            if let Ok(q) = self.q.try_borrow() {
                if q.is_empty() {
                    self.runnable.set(false);
                }
            }
        }
    }
}
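
// ---------------------------------------------------------------------------
// Illustrative sketch, NOT part of the original module. It is a std-only
// model of the borrow-release shape used by `DeferQueue::drain_into` above.
// Every name here (`defer_reentry_sketch`, `ModelQueue`, `Job`, `demo`) is
// hypothetical, and the closures take `&ModelQueue` where the real `DeferFn`
// takes `&dyn CoreFull`. Assumptions: the `closed` flag, the `max_ops`
// livelock bound and the panic guard are left out to keep the model small.
// What it tries to show: the `RefCell` borrow is held only for the pop, so a
// closure that re-posts onto the same queue while it runs does not hit a
// nested `borrow_mut`, and the re-posted closure is drained in the same call.
#[allow(dead_code)]
mod defer_reentry_sketch {
    use std::cell::{Cell, RefCell};
    use std::collections::VecDeque;

    /// Hypothetical stand-in for `DeferFn`.
    pub type Job = Box<dyn FnOnce(&ModelQueue)>;

    /// Hypothetical stand-in for `DeferQueue`.
    pub struct ModelQueue {
        q: RefCell<VecDeque<Job>>,
        runnable: Cell<bool>,
    }

    impl ModelQueue {
        pub fn new() -> Self {
            Self {
                q: RefCell::new(VecDeque::new()),
                runnable: Cell::new(false),
            }
        }

        /// Enqueue a job and set the "has work" bit.
        pub fn post(&self, job: Job) {
            self.q.borrow_mut().push_back(job);
            self.runnable.set(true);
        }

        pub fn is_runnable(&self) -> bool {
            self.runnable.get()
        }

        /// Drain FIFO to quiescence; returns how many jobs were applied.
        pub fn drain(&self) -> u32 {
            let mut applied = 0u32;
            loop {
                let job = {
                    let mut q = self.q.borrow_mut();
                    let Some(job) = q.pop_front() else {
                        // Queue observed empty: clear the bit and stop.
                        self.runnable.set(false);
                        return applied;
                    };
                    job
                }; // the `borrow_mut` guard is dropped here, before the job runs
                applied += 1;
                job(self);
            }
        }
    }

    /// One job that re-posts a second job while it runs. Returns the number
    /// of jobs applied by a single `drain` call and the final "has work" bit.
    pub fn demo() -> (u32, bool) {
        let q = ModelQueue::new();
        q.post(Box::new(|q: &ModelQueue| {
            // Re-entrant post from inside a running job.
            q.post(Box::new(|_: &ModelQueue| {}));
        }));
        let applied = q.drain();
        (applied, q.is_runnable())
    }
}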