graphrefly_core/mailbox.rs
1//! `CoreMailbox` — the `Send + Sync` bridge between autonomous async
2//! producers (timer tasks) and an owned, relocatable [`crate::node::Core`].
3//!
4//! # Why this exists (D223 / D225 / D227 / D230)
5//!
6//! Under the actor / work-stealing model (D221) a `Core` owns its state
7//! cell **by value** and moves between workers — so a long-lived async
8//! task (e.g. a `tokio::spawn`-ed timer loop) can no longer hold
9//! `&Core` / `Arc<C>` / `Weak<C>` to call `Core::emit` directly (that
10//! was the deleted `WeakCore` path; D223 forbids `Weak<C>` back-refs).
11//!
12//! Instead the task holds an `Arc<CoreMailbox>` (`Send + Sync`) and
13//! **posts** a `(NodeId, HandleId)` emit request. The mailbox is drained
14//! **owner-side** by the synchronous [`crate::node::Core::drain_mailbox`]
15//! (applied via the existing sync `Core::emit` — *no async in Core*,
16//! honoring the locked "Core never async" invariant). The drain point is
17//! the embedder's existing advance/pump site (test harness `TestRuntime`
18//! advance helper, napi pump): timer tasks already require the host
19//! runtime to be advanced before they fire, so draining there is
20//! **behaviour-identical** to the old autonomous `Weak::upgrade →
21//! core.emit` (D230).
22//!
23//! # Shape (D227 full)
24//!
25//! - **Op queue** — FIFO of [`MailboxOp`]s, applied owner-side.
26//! - **`closed`** — set when the owning `Core` drops; a timer task that
27//! observes it releases its pending handle and bails (mirrors the old
28//! `Weak::upgrade() == None` teardown path exactly).
29//! - **`runnable`** — the per-group "this Core has work" wake bit. S2b
30//! lands the field + sets/clears it (D227 builds the *full* mailbox
31//! shape now); S4 wires it to wave drain-scoping + finalize, and the
32//! host-executor (M6) reads it to schedule the owning worker. The
33//! in-wave drain (`BatchGuard::drain_and_flush`) already gates on it
34//! so the no-producer §7 floor pays one atomic load, not a mutex.
35//!
36//! # This is ONE mechanism (not six)
37//!
38//! The S2b design history records six producer "forks" (D227–D233);
39//! that is *design-question* count, not runtime surface. What actually
40//! exists is **one** owner-side re-entry mechanism:
41//!
42//! > A `Send + Sync` FIFO of deferred [`MailboxOp`]s, drained on the
43//! > owner thread — **in-wave to quiescence** for producer sinks
44//! > (`BatchGuard::drain_and_flush`) and at the embedder pump point for
45//! > autonomous timer tasks ([`crate::node::Core::drain_mailbox`]) —
46//! > each op applied via the one object-safe owner re-entry surface
47//! > [`crate::node::CoreFull`].
48//!
49//! `MailboxOp` keeps a **typed fast path** (`Emit`/`Complete`/`Error`:
50//! zero-alloc, and a deterministic `Core`-gone handle-release contract —
51//! see the per-kind `post_*`) plus a **`Defer` escape hatch** (a boxed
52//! `FnOnce(&dyn CoreFull)` for value-returning topology mutation:
53//! windowing / higher-order inner subscribe). Collapsing the enum to
54//! pure `Defer` was considered and rejected — it would heap-allocate per
55//! timer/producer emit AND lose the typed `Core`-gone release affordance
56//! (a dropped `FnOnce` can't run an else-branch). `ProducerEmitter`
57//! (`graphrefly-operators`) is thin sugar over `post_*`; the producer
58//! *build* closure isn't a mechanism at all — it uses the `&Core` its
59//! `ProducerCtx` already lends it (D231). One queue, one drain loop, one
60//! `match`, one re-entry trait.
61
62use std::cell::{Cell, RefCell};
63use std::collections::VecDeque;
64use std::sync::atomic::{AtomicBool, Ordering};
65
66use crate::handle::{HandleId, NodeId};
67
68/// **`Send`** cross-thread deferred closure (D233; D249/S2c). Posted
69/// by any cross-thread `Send` producer (canonically an autonomous
70/// timer task — `temporal.rs` `window_time`/etc., a `tokio::spawn`ed
71/// cross-thread task whose defer closure captures only `Send` state:
72/// `Arc<Mutex<NodeId>>` / `Arc<dyn BindingBoundary>` / ids — but the
73/// API admits any future cross-thread producer with the same `Send`
74/// capture discipline, e.g. a napi/pyo3 binding-layer pump) and
75/// applied owner-side. Rides the `Send + Sync` [`CoreMailbox`]
76/// (cross-thread post side, drained owner-side via `drain_mailbox`).
77pub type SendDeferFn = Box<dyn FnOnce(&dyn crate::node::CoreFull) + Send>;
78
79/// **`!Send`** owner-side deferred closure (D248/D249/S2c). Posted by
80/// an owner-side in-wave producer/graph sink whose closure captures
81/// `!Send` state (a relaxed `Sink` / `Rc<RefCell<GraphInner>>` —
82/// D248). Lives in the owner-only [`DeferQueue`], **never** the
83/// cross-thread [`CoreMailbox`].
84pub type DeferFn = Box<dyn FnOnce(&dyn crate::node::CoreFull)>;
85
86/// A re-entry request posted to the [`CoreMailbox`] by an autonomous
87/// async producer (timer task → `Emit`) or by a producer-operator sink
88/// (D232-AMEND/A′ → `Emit`/`Complete`/`Error`). Applied owner-side via
89/// the sync `Core::{emit,complete,error}` by [`crate::node::Core::drain_mailbox`]
90/// — drained **in-wave to quiescence** by the `BatchGuard` drain loop
91/// for producer sinks (immediate, cascade-ordering-preserving), and at
92/// the embedder pump point for timer tasks (D230).
93pub enum MailboxOp {
94 /// `Core::emit(node, handle)`. Posted by timer tasks + producer sinks.
95 Emit(NodeId, HandleId),
96 /// `Core::complete(node)`. Posted by producer sinks.
97 Complete(NodeId),
98 /// `Core::error(node, handle)`. Posted by producer sinks.
99 Error(NodeId, HandleId),
100 /// **`Send`** owner-side closure (D233; D249/S2c). Posted by a
101 /// cross-thread timer task (`temporal.rs` `window_time`/etc.) whose
102 /// closure captures only `Send` state; applied **in-wave** by the
103 /// drain loop (the owner holds `&Core`). The `!Send` owner-side
104 /// sink defers (graph describe/observe, control/higher-order
105 /// dynamic-inner) go to the separate owner-only [`DeferQueue`]
106 /// instead — D248/D249.
107 Defer(SendDeferFn),
108}
109
110impl std::fmt::Debug for MailboxOp {
111 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112 match self {
113 Self::Emit(n, h) => write!(f, "Emit({n:?}, {h:?})"),
114 Self::Complete(n) => write!(f, "Complete({n:?})"),
115 Self::Error(n, h) => write!(f, "Error({n:?}, {h:?})"),
116 Self::Defer(_) => write!(f, "Defer(<send closure>)"),
117 }
118 }
119}
120
121/// `Send + Sync` mailbox bridging autonomous async producers to an owned,
122/// relocatable [`crate::node::Core`]. Held behind an `Arc`; the `Core`
123/// owns one clone, each timer task another. See the module docs.
124pub struct CoreMailbox {
125 /// FIFO of [`MailboxOp`]s posted by timer tasks (`Emit`) and
126 /// producer-operator sinks (`Emit`/`Complete`/`Error`), applied
127 /// owner-side by [`crate::node::Core::drain_mailbox`] via the sync
128 /// `Core::{emit,complete,error}`.
129 ops: parking_lot::Mutex<VecDeque<MailboxOp>>,
130 /// Set by [`Self::close`] when the owning `Core` drops. A timer task
131 /// observing `true` from [`Self::post_emit`] is told to release its
132 /// pending handle and bail (the old `Weak::upgrade() == None` path).
133 closed: AtomicBool,
134 /// "This Core has queued work" wake bit (D227 full shape;
135 /// **finalized at S4**). Set on every successful [`Self::post_op`];
136 /// cleared by [`Self::drain_into`]/[`Self::take_all`] once the queue
137 /// empties (under the `ops` lock — QA F-#4 lost-wakeup discipline).
138 ///
139 /// **Actor-model granularity (S4, D246/D248/D249).** `Core` is
140 /// single-owner `!Send + !Sync`; in the actor model one worker owns
141 /// exactly one `Core`, so this Core-wide bit **is** that worker's
142 /// per-group runnable-wake (the worker's Core hosts its declared
143 /// `SchedulingGroupId`(s); a finer per-`SchedulingGroupId` sub-bit
144 /// has no consumer — M6's per-binding group executor schedules at
145 /// the Core/worker grain — so adding one would be speculative
146 /// substrate surface, D196/D246-ignore-legacy, cf. D250). It is the
147 /// only cross-thread bridge into a `!Send` Core: timer/producer
148 /// tasks `post_*` + signal; the owner drains
149 /// ([`crate::node::Core::drain_mailbox`] / the in-wave
150 /// `BatchGuard`). M6 (deferred) reads [`Self::is_runnable`] from the
151 /// host executor to decide when to poll a worker's Core.
152 ///
153 /// QA F12 (2026-05-19): if a future per-`SchedulingGroupId` sub-bit
154 /// is ever added, it MUST be split in lockstep across BOTH this
155 /// `CoreMailbox.runnable` AND `DeferQueue.runnable` — the in-wave
156 /// drain gate (`BatchGuard::drain_and_flush`) ORs the two, and a
157 /// half-split would silently lose wakeups for the unsplit queue.
158 runnable: AtomicBool,
159}
160
161impl CoreMailbox {
162 /// A fresh, open, empty mailbox.
163 #[must_use]
164 pub fn new() -> Self {
165 Self {
166 ops: parking_lot::Mutex::new(VecDeque::new()),
167 closed: AtomicBool::new(false),
168 runnable: AtomicBool::new(false),
169 }
170 }
171
172 /// Post a timer-fired emit request. Returns `false` iff the owning
173 /// `Core` has already dropped ([`Self::close`] was called) — the
174 /// caller MUST then release `handle` and stop (mirrors the old
175 /// `WeakCore::upgrade() == None` teardown branch in `timer.rs`).
176 /// Returns `true` when queued (and sets the `runnable` wake bit).
177 #[must_use]
178 pub fn post_emit(&self, node_id: NodeId, handle: HandleId) -> bool {
179 self.post_op(MailboxOp::Emit(node_id, handle))
180 }
181
182 /// Post a producer-sink `Complete` (D232-AMEND/A′). Returns `false`
183 /// iff the owning `Core` is gone (caller stops; nothing to release).
184 #[must_use]
185 pub fn post_complete(&self, node_id: NodeId) -> bool {
186 self.post_op(MailboxOp::Complete(node_id))
187 }
188
189 /// Post a producer-sink `Error` (D232-AMEND/A′). Returns `false` iff
190 /// the owning `Core` is gone — the caller MUST then release
191 /// `handle` (it owned a retain for the would-be `error` payload).
192 #[must_use]
193 pub fn post_error(&self, node_id: NodeId, handle: HandleId) -> bool {
194 self.post_op(MailboxOp::Error(node_id, handle))
195 }
196
197 /// Post a **`Send`** cross-thread `Defer` (D249/S2c). For an
198 /// autonomous timer task whose closure captures only `Send` state
199 /// (`temporal.rs` `window_time`/etc.). Returns `false` iff the
200 /// owning `Core` is gone — the closure is dropped unrun. The
201 /// `!Send` owner-side sink defers use [`DeferQueue::post`] instead.
202 #[must_use]
203 pub fn post_defer(&self, f: SendDeferFn) -> bool {
204 self.post_op(MailboxOp::Defer(f))
205 }
206
207 /// Post a [`MailboxOp`]. Returns `false` iff the owning `Core` has
208 /// already dropped ([`Self::close`]) — see the per-kind wrappers for
209 /// the caller's handle-release obligation.
210 ///
211 /// QA F-A (2026-05-18): the `closed` check and the `push_back` are
212 /// performed **in one `ops`-lock critical section** so a concurrent
213 /// owner-thread `close()` (which also takes `ops`) cannot interleave
214 /// between "observed not-closed" and "enqueued" — that TOCTOU would
215 /// strand the op (with its retained `HandleId`) in a queue
216 /// `Drop for Core` already walked → leak. `close()` takes the same
217 /// lock, so the two are mutually exclusive.
218 #[must_use]
219 pub fn post_op(&self, op: MailboxOp) -> bool {
220 let mut q = self.ops.lock();
221 if self.closed.load(Ordering::Acquire) {
222 return false;
223 }
224 q.push_back(op);
225 self.runnable.store(true, Ordering::Release);
226 true
227 }
228
229 /// Owner-side drain. Pops every queued [`MailboxOp`] in FIFO order
230 /// and hands each to `apply` (the caller passes a closure over the
231 /// sync `Core::{emit,complete,error}`). Re-entrancy: `apply` may
232 /// itself cascade and a concurrent timer task / re-entrant sink may
233 /// post again — a fresh post re-sets `runnable`, so the enclosing
234 /// drain-to-quiescence loop (or a later drain) picks it up.
235 ///
236 /// QA F-#4 (2026-05-18): the empty observation and the
237 /// `runnable = false` store happen **in the same `ops`-lock critical
238 /// section. Previously the empty `pop_front` released the lock before
239 /// storing `false`, so a concurrent `post_op` (which takes `ops`)
240 /// could push and set `runnable=true` in between, then our `false`
241 /// store would clobber it — a lost wakeup invisible to the
242 /// `is_runnable`-gated in-wave drain and the S4/M6 scheduler.
243 /// Clearing `runnable` while still holding the lock every `post_op`
244 /// must take orders them: a post is either popped here or runs
245 /// strictly after the `false` store and re-sets `true`.
246 /// `max_ops` bounds a single drain (QA P3, 2026-05-18): `apply` may
247 /// re-post (a `Defer` that re-`defer`s, a producer re-subscribing),
248 /// which this loop correctly drains in the *same* call — but a
249 /// closure that re-posts itself on *every* application is an
250 /// unbounded mailbox livelock (the producer-authoring analogue of a
251 /// fn that emits to itself). That livelock lives HERE (the inner
252 /// drain loop), not in `drain_and_flush`'s fire-cascade `cap`, so it
253 /// is bounded + panics here — decoupled from the fire counter so a
254 /// legitimately large finite producer drain never false-trips the
255 /// fire `cap`.
256 ///
257 /// /qa M3 (2026-05-19): a panic from `apply(f)` mid-drain previously
258 /// unwound out of this loop with `runnable` still `true` — under the
259 /// `parking_lot::Mutex` shape that was self-correcting (the next
260 /// drain pop would re-observe + clear), but downstream
261 /// `is_runnable()` gates would spuriously return `true` until then.
262 /// Wraps the empty-queue clear in a `RunnableClearGuard` so an
263 /// `apply` panic still clears `runnable` on unwind: if the queue is
264 /// empty at unwind time the cell is reset; if the queue is
265 /// non-empty the next post would re-set it anyway. Tightens the
266 /// scheduler-wakeup contract under panic.
267 ///
268 /// # Panics
269 ///
270 /// Panics if more than `max_ops` ops are applied in one drain — that
271 /// indicates a producer / `Defer` op re-posting itself every
272 /// application (livelock guard). The default cap is sized for
273 /// realistic cascades; bump via the corresponding setter if your
274 /// workload has evidence it needs more.
275 pub fn drain_into(&self, max_ops: u32, mut apply: impl FnMut(MailboxOp)) {
276 let mut applied = 0u32;
277 // /qa M3 RAII: clear `runnable` on panic unwind if the queue
278 // drains empty during unwind. No-op on the normal exit path —
279 // the explicit `runnable.store(false)` below the empty `pop_front`
280 // still races-free under the `ops` lock (QA F-#4).
281 let _runnable_clear = MailboxRunnableClearOnPanic {
282 ops: &self.ops,
283 runnable: &self.runnable,
284 };
285 loop {
286 let op = {
287 let mut q = self.ops.lock();
288 let Some(op) = q.pop_front() else {
289 self.runnable.store(false, Ordering::Release);
290 return;
291 };
292 op
293 };
294 applied += 1;
295 assert!(
296 applied < max_ops,
297 "mailbox drain exceeded {max_ops} ops in one drain — a \
298 producer/Defer op is re-posting itself every application \
299 (mailbox livelock). Tune via \
300 Core::set_max_batch_drain_iterations only with concrete \
301 evidence the workload needs more."
302 );
303 apply(op);
304 }
305 }
306
307 /// Whether the mailbox currently holds queued work (the wake bit).
308 /// Advisory pre-M6 (the embedder pump drains unconditionally); S4/M6
309 /// consumers gate scheduling on it. `#[must_use]` (/qa m4) — a
310 /// discarded result silently loses the scheduling signal.
311 #[must_use = "is_runnable is the scheduling wake-bit; ignoring it loses the signal"]
312 pub fn is_runnable(&self) -> bool {
313 self.runnable.load(Ordering::Acquire)
314 }
315
316 /// Mark the owning `Core` gone. Idempotent. Takes the `ops` lock so
317 /// it is mutually exclusive with [`Self::post_op`]'s under-lock
318 /// `closed` check (QA F-A): after this returns, no further `post_op`
319 /// can enqueue. Callers MUST then [`Self::take_all`] and release any
320 /// `Emit`/`Error` payload handles still queued (a TOCTOU-enqueued op
321 /// posted just before `close` won the lock).
322 pub fn close(&self) {
323 let _q = self.ops.lock();
324 self.closed.store(true, Ordering::Release);
325 }
326
327 /// Drain and return every still-queued [`MailboxOp`] without
328 /// applying it — for `Drop for Core` teardown (QA F-A / Blind #2).
329 /// `Emit`/`Error` ops carry a retained `HandleId` the caller must
330 /// release; `Defer` closures are dropped unrun (running `CoreFull`
331 /// on a half-dropped `Core` is unsound — user-locked QA decision A,
332 /// 2026-05-18). Clears `runnable` under the lock (same race
333 /// discipline as `drain_into`).
334 #[must_use]
335 pub fn take_all(&self) -> VecDeque<MailboxOp> {
336 let mut q = self.ops.lock();
337 // QA F11 (2026-05-19): contract gate — `take_all` MUST be
338 // preceded by `close()` so no further `post_op` can enqueue
339 // after this returns (a post racing a standalone `take_all`
340 // would otherwise strand an op with its retained `HandleId`
341 // in a mailbox the caller assumes is empty). The one in-tree
342 // caller (`Drop for Core`) sequences `close()` first.
343 debug_assert!(
344 self.closed.load(Ordering::Acquire),
345 "CoreMailbox::take_all must be called after close() — see \
346 docstring contract"
347 );
348 self.runnable.store(false, Ordering::Release);
349 std::mem::take(&mut *q)
350 }
351
352 /// Whether [`Self::close`] has been called.
353 #[must_use]
354 pub fn is_closed(&self) -> bool {
355 self.closed.load(Ordering::Acquire)
356 }
357}
358
359impl Default for CoreMailbox {
360 fn default() -> Self {
361 Self::new()
362 }
363}
364
365// `CoreMailbox` is `Send + Sync` by construction (parking_lot::Mutex +
366// atomics over the id-only `MailboxOp`). Asserted so a future field
367// that breaks it fails here, not at the `Arc<CoreMailbox>` share site
368// in `timer.rs`. D249/S2c: this MUST hold — the `!Send` owner-side
369// `Defer` payload now lives in [`DeferQueue`], NOT here.
370const _: fn() = || {
371 fn assert_send_sync<T: Send + Sync>() {}
372 assert_send_sync::<CoreMailbox>();
373};
374
375/// Owner-only deferred-closure queue (D249/S2c — the minimal Defer
376/// split pulled out of [`CoreMailbox`]; D254 (S5) relaxed off cross-
377/// thread primitives).
378///
379/// Under D248 full single-owner the substrate `Sink`/`TopologySink`
380/// dropped `Send + Sync`, so a [`DeferFn`] (which captures relaxed
381/// `Sink`s / `Rc<RefCell<GraphInner>>`) is `!Send`. It cannot ride the
382/// `Send + Sync` [`CoreMailbox`] (that bridges the `timer.rs`
383/// cross-thread `Arc<CoreMailbox>` post side). This queue is therefore
384/// **owner-only and `!Send`**: held behind an `Rc` shared between
385/// [`crate::node::Core`] and `graphrefly-operators`' `ProducerEmitter`
386/// on the one owner thread. Drained owner-side by
387/// [`crate::node::Core::drain_mailbox`] (after the id-mailbox) and the
388/// in-wave `BatchGuard` drain.
389///
390/// **Owner-thread-only by construction (D254 (S5), 2026-05-19).** The
391/// `Rc<DeferQueue>` is never sent across threads (the closures it holds
392/// are `!Send`), so the cross-thread primitives it used to carry —
393/// `parking_lot::Mutex<VecDeque<DeferFn>>` for `q` and `AtomicBool` for
394/// `runnable` — were unused capacity. D254 collapses them to
395/// `RefCell<VecDeque<DeferFn>>` + `Cell<bool>`, dropping the
396/// `parking_lot::Mutex::lock` acquire on every owner-side post/drain
397/// (the hottest D251 path). `closed` similarly drops to `Cell<bool>` —
398/// it is set by `Drop for Core` on the owner thread (the only `close()`
399/// call site) and read by `post`/`drain_into` on the same thread.
400///
401/// S4 still does the per-group-wake + typed snapshot/prune `MailboxOp`
402/// reshape (D246 rule 8) — D249 is the acknowledged minimal first
403/// touch that lets the D248 single-owner Sink relaxation land in S2c.
404pub struct DeferQueue {
405 q: RefCell<VecDeque<DeferFn>>,
406 /// Set by [`Self::close`] (owner-side, called from `Drop for Core`).
407 /// `Cell<bool>` (D254): owner-thread-only by D248/D249 construction,
408 /// no cross-thread reader.
409 closed: Cell<bool>,
410 /// "Has queued work" bit — mirrors `CoreMailbox::runnable` so the
411 /// in-wave `BatchGuard` drain gate (`is_runnable()`) also fires
412 /// when only a `Defer` (no id-`MailboxOp`) was posted mid-wave
413 /// (the reactive describe/observe `DepsChanged` path, D246 r6).
414 /// `Cell<bool>` (D254): owner-thread-only — see the struct doc.
415 runnable: Cell<bool>,
416}
417
418impl DeferQueue {
419 /// A fresh, open, empty owner-side defer queue.
420 #[must_use]
421 pub fn new() -> Self {
422 Self {
423 q: RefCell::new(VecDeque::new()),
424 closed: Cell::new(false),
425 runnable: Cell::new(false),
426 }
427 }
428
429 /// Whether the queue currently holds work (the in-wave drain gate).
430 /// `#[must_use]` (/qa m4) — a discarded result silently loses the
431 /// drain-gate signal.
432 #[must_use = "is_runnable is the in-wave drain gate; ignoring it loses the signal"]
433 pub fn is_runnable(&self) -> bool {
434 self.runnable.get()
435 }
436
437 /// Enqueue an owner-side deferred closure. Returns `false` iff the
438 /// owning `Core` has dropped ([`Self::close`]) — the caller's
439 /// closure is dropped unrun (same contract as the old
440 /// `CoreMailbox::post_defer`). Owner-thread-only by D248/D249
441 /// construction — see the struct doc; no cross-thread TOCTOU
442 /// against `close` is possible because both run on the one owner
443 /// thread and never overlap.
444 ///
445 /// `#[must_use]` (/qa m3) — `false` is the load-bearing signal that
446 /// the caller's closure was dropped unrun; ignoring it can mask
447 /// teardown races where work was silently discarded.
448 #[must_use = "false means the queue is closed and the closure was dropped unrun"]
449 pub fn post(&self, f: DeferFn) -> bool {
450 if self.closed.get() {
451 return false;
452 }
453 self.q.borrow_mut().push_back(f);
454 self.runnable.set(true);
455 true
456 }
457
458 /// Owner-side drain. Pops every queued closure FIFO and applies it
459 /// (re-entrancy-safe: a closure may re-`post`; a later drain picks
460 /// it up). `max_ops` bounds one drain against a self-re-posting
461 /// livelock (mirrors `CoreMailbox::drain_into`).
462 ///
463 /// **Re-entry note (/qa M6).** Under the pre-D254
464 /// `parking_lot::Mutex<VecDeque<DeferFn>>` shape, an owner-thread
465 /// re-entrant `drain_into` from inside `apply(f)` would have
466 /// **deadlocked** at the inner lock acquire. Under the D254
467 /// `RefCell<VecDeque<DeferFn>>` shape it would **panic with
468 /// `BorrowMutError`** instead — different failure mode, same
469 /// fail-loud outcome. In practice the pre-D254 design never had any
470 /// in-tree re-entrant drain caller (the embedder pump's
471 /// `drain_mailbox` is never invoked recursively from within an
472 /// `apply` closure body — `drain_mailbox` is the embedder seam, not
473 /// a `DeferFn` capture target), so neither failure mode is
474 /// reachable from in-tree code. Documented for any future binding
475 /// that might construct one.
476 ///
477 /// # Panics
478 /// Panics if a single drain applies `max_ops` closures — a `Defer`
479 /// closure re-posting itself every application (owner-side
480 /// livelock), the defer-queue analogue of a fn that emits to itself.
481 pub fn drain_into(&self, max_ops: u32, mut apply: impl FnMut(DeferFn)) {
482 let mut applied = 0u32;
483 // /qa M3 RAII: clear `runnable` on panic unwind. Mirrors
484 // `CoreMailbox::drain_into`'s panic-aware clear shape, except
485 // the cell-based discipline lets the guard `Cell::set(false)`
486 // directly with no lock to acquire.
487 let _runnable_clear = DeferRunnableClearOnPanic {
488 q: &self.q,
489 runnable: &self.runnable,
490 };
491 loop {
492 // Borrow `q` only to pop (closures must run with the borrow
493 // released — `apply(f)` may itself re-`post` on this same
494 // queue, requiring `borrow_mut()` again; nested mut borrow
495 // would panic). Mirrors the lock-release shape of the old
496 // `parking_lot::Mutex` version exactly.
497 let f = {
498 let mut q = self.q.borrow_mut();
499 let Some(f) = q.pop_front() else {
500 // Clear `runnable` while we hold the borrow (owner-
501 // thread-only ⇒ no concurrent `post` to race; this
502 // pairs with `is_runnable` post-drain).
503 self.runnable.set(false);
504 return;
505 };
506 f
507 };
508 applied += 1;
509 assert!(
510 applied < max_ops,
511 "defer-queue drain exceeded {max_ops} closures in one \
512 drain — a Defer closure is re-posting itself every \
513 application (owner-side livelock)."
514 );
515 apply(f);
516 }
517 }
518
519 /// Mark the owning `Core` gone. Idempotent.
520 ///
521 /// **Drop timing of queued closures (QA, 2026-05-19).** This method
522 /// only flips `closed` so subsequent `post` calls drop their
523 /// closures unrun (`closed == true` short-circuits enqueue) and
524 /// running `CoreFull` on a half-dropped `Core` is structurally
525 /// impossible. The *already-queued* closures are NOT drained here;
526 /// they remain in `q` and are dropped when the last `Rc<DeferQueue>`
527 /// clone drops (every `ProducerEmitter` / graph reactive handle
528 /// holds one — D249). Since closures by contract capture no
529 /// `HandleId` (D235 P8 / D246 r8 sites), this is leak-free for
530 /// refcount purposes; callers MUST honour the no-`HandleId`-in-
531 /// `DeferFn` discipline (see [`DeferFn`]'s doc) for this to hold.
532 pub fn close(&self) {
533 self.closed.set(true);
534 }
535}
536
537impl Default for DeferQueue {
538 fn default() -> Self {
539 Self::new()
540 }
541}
542
543// /qa M4 (2026-05-19): `DeferQueue` MUST stay `!Send + !Sync` by
544// construction (owner-thread-only by D248/D249; the `Rc<DeferQueue>`
545// is never sent across threads, the closures it holds are `!Send`).
546// Lock the invariant in the type system so a future field that
547// silently widens the trait bounds (e.g., swapping `RefCell` for
548// `Mutex` "for safety") breaks the build instead of the actor-model
549// invariant.
550static_assertions::assert_not_impl_any!(DeferQueue: Send, Sync);
551
552/// RAII guard that clears [`CoreMailbox::runnable`] on panic unwind if
553/// the queue is empty at unwind time (/qa M3). Outside of unwind, the
554/// explicit `runnable.store(false, Release)` on the empty-pop path of
555/// [`CoreMailbox::drain_into`] handles the normal-exit clear under the
556/// `ops` lock (QA F-#4 lost-wakeup discipline). On panic unwind from
557/// inside `apply(f)`, this guard's `Drop` re-acquires the `ops` lock,
558/// checks whether the queue is empty, and clears `runnable` if so —
559/// if non-empty, leaves `runnable=true` so the next drain picks up the
560/// remaining work, matching the QA F-#4 race discipline.
561struct MailboxRunnableClearOnPanic<'a> {
562 ops: &'a parking_lot::Mutex<VecDeque<MailboxOp>>,
563 runnable: &'a AtomicBool,
564}
565
566impl Drop for MailboxRunnableClearOnPanic<'_> {
567 fn drop(&mut self) {
568 if std::thread::panicking() {
569 let q = self.ops.lock();
570 if q.is_empty() {
571 self.runnable.store(false, Ordering::Release);
572 }
573 }
574 }
575}
576
577/// RAII guard that clears [`DeferQueue::runnable`] on panic unwind if
578/// the queue is empty at unwind time (/qa M3). Owner-thread-only by
579/// D248/D249/D254 construction — no lock to acquire; the cell-based
580/// discipline lets the guard `Cell::set(false)` directly. Mirrors
581/// [`MailboxRunnableClearOnPanic`]'s shape against the relaxed
582/// `RefCell` + `Cell` primitives. If the queue is non-empty at unwind
583/// time, leaves `runnable=true` so the next drain picks up the work.
584struct DeferRunnableClearOnPanic<'a> {
585 q: &'a std::cell::RefCell<VecDeque<DeferFn>>,
586 runnable: &'a Cell<bool>,
587}
588
589impl Drop for DeferRunnableClearOnPanic<'_> {
590 fn drop(&mut self) {
591 if std::thread::panicking() {
592 // `borrow` is safe even mid-panic — the only borrow_mut
593 // sites are this drain loop (which holds the borrow only
594 // for the pop), and `post` (which is on the same owner
595 // thread — single-thread `RefCell` ⇒ no concurrent borrow).
596 // If a closure body panicked DURING the `borrow_mut` scope
597 // (between `borrow_mut()` and the let-binding drop) the
598 // borrow is held; `try_borrow` returns Err and we skip the
599 // clear — safe (next post re-sets runnable; next drain
600 // pops + re-observes).
601 if let Ok(q) = self.q.try_borrow() {
602 if q.is_empty() {
603 self.runnable.set(false);
604 }
605 }
606 }
607 }
608}