Skip to main content

graphrefly_core/
batch.rs

1//! Wave engine — drain loop, fire selection, emission commit, sink dispatch.
2//!
3//! Ports the wave-engine portion of the handle-protocol prototype
4//! (`~/src/graphrefly-ts/src/__experiments__/handle-core/core.ts`).
5//! Sibling to [`super::node`]; the dispatcher's other concerns
6//! (registration, subscription, pause/resume, terminal cascade,
7//! `set_deps`) live there.
8//!
9//! # Wave engine entry points
10//!
11//! - [`Core::run_wave`] — wave entry. Claims `in_tick` under the state lock,
12//!   runs `op` lock-released, then drains all transitive fn-fires and
13//!   flushes per-subscriber notifications. Each fn-fire iteration drops
14//!   the state lock around `BindingBoundary::invoke_fn` so user fn callbacks
15//!   can re-enter Core safely.
16//! - [`Core::drain_and_flush`] — drain phase + flush phase. Acquires/drops
17//!   the state lock per iteration around `invoke_fn`.
18//! - [`Core::commit_emission`] — equals-substitution + DIRTY/DATA/RESOLVED
19//!   queueing + child propagation. `&self`-only; bracket-fires
20//!   `BindingBoundary::custom_equals` lock-released.
21//! - [`Core::queue_notify`] — per-subscriber message queueing with
22//!   pause-buffer routing. Snapshots the subscriber list at first-touch-
23//!   per-wave so late subscribers (installed mid-wave between drain
24//!   iterations) don't receive duplicate deliveries from messages already
25//!   queued before they subscribed.
26//! - [`Core::deliver_data_to_consumer`] — single-edge propagation; marks
27//!   the consumer for fn-fire if its tracked-deps set is satisfied.
28//!   Called from `commit_emission`, plus `activate_derived` and
29//!   `set_deps` in [`super::node`].
30//!
31//! # Re-entrance discipline (Slice A close — M1 fully lock-released)
32//!
33//! - **Wave-end sink fires** drop the state lock first (Slice A-bigger
34//!   discipline).
35//! - **`BindingBoundary::invoke_fn`** in `fire_fn` fires lock-released —
36//!   user fn callbacks may re-enter `Core::emit` / `pause` / `resume` /
37//!   `invalidate` / `complete` / `error` / `teardown` and run a nested
38//!   wave (the existing `in_tick` re-entrance gate composes
39//!   transparently).
40//! - **`BindingBoundary::custom_equals`** in `commit_emission`'s equals
41//!   check fires lock-released.
42//! - **Subscribe-time handshake** is the one remaining lock-held callback.
43//!   It now fires per-tier (`[Start]`, `[Data(v)]`, `[Complete|Error]`,
44//!   `[Teardown]`) as separate sink calls, matching the canonical R1.3.5.a
45//!   tier-split. Re-entrance from a handshake sink callback panics with
46//!   the [`reentrance_guard`] diagnostic.
47
48use std::cell::{Cell, RefCell};
49use std::collections::HashMap;
50use std::rc::Rc;
51
52use ahash::AHashSet;
53use indexmap::map::Entry;
54use indexmap::IndexMap;
55
56use smallvec::SmallVec;
57
58use crate::boundary::{DepBatch, FnEmission, FnResult};
59use crate::handle::{FnId, HandleId, NodeId, NO_HANDLE};
60use crate::message::Message;
61use crate::node::{Core, CoreState, EqualsMode, OperatorOp, Sink, TerminalKind};
62
63// Slice G (R1.3.2.d / R1.3.3.a) per-thread tier-3-emit tracker.
64//
65// **Wave scope = the owner thread (S4, D246/D248/D249).** `Core` is
66// single-owner `!Send + !Sync`: every emit in a wave runs on the one
67// owner thread, and a wave is one uninterrupted owner-side drain
68// bounded above by the outermost `BatchGuard` drop. A per-thread
69// `AHashSet<NodeId>` is therefore the natural placement for "has node
70// X already emitted a tier-3 message in this wave?" — its lifetime
71// matches the wave's, with no cross-thread or cross-wave contamination
72// (there is no other thread driving this Core; the deleted
73// `wave_owner` `ReentrantMutex` / cross-thread-BLOCK model is gone
74// with S2c's §7 machinery).
75//
76// **History:** placed per-partition on `SubgraphLockBox::state` (Q3
77// v1), moved to a per-thread thread-local by the D1 patch
78// (2026-05-09) to survive mid-wave cross-thread `set_deps` partition
79// splits — a hazard that only existed under the now-deleted
80// shared-Core cross-thread model. Post-S4 the thread-local placement
81// is simply the owner thread's wave-scoped set.
82//
83// **Lifecycle:** populated by `Core::commit_emission` /
84// `Core::commit_emission_verbatim`; cleared at the OUTERMOST
85// `BatchGuard` drop on the owner thread (both success and
86// panic-discard paths). Re-entrant nested waves on the same Core+
87// thread share the set — inner-wave emits add to the same set; the
88// outermost drop is the canonical clear point.
89thread_local! {
90    static TIER3_EMITTED_THIS_WAVE: RefCell<AHashSet<NodeId>> = RefCell::new(AHashSet::new());
91}
92
93// Q-beyond Sub-slice 1 (D108 / 2026-05-09): per-thread wave-scoped state.
94//
95// **Design rationale (bench-driven, see `benches/lock_strategy.rs`):**
96// - S1 showed parking_lot Mutex same-thread re-acquire is ~14 ns/op,
97//   identical to thread_local borrow_mut. The "mutex hop is slow" intuition
98//   is wrong UNCONTENDED.
99// - S3 showed shared mutex on disjoint cross-thread keys is 2.7× slower
100//   than per-partition mutex / thread_local (35.9 vs 13.0 ns/op) — pure
101//   cache-line bouncing on the lock state itself.
102// - Conclusion: the cost of the prior `Core::cross_partition` mutex was
103//   dominated by cache-line bouncing across cores, NOT by single-thread
104//   mutex acquire overhead. Moving the four wave-scoped fields to a
105//   per-thread thread_local eliminates the bounce point entirely.
106//
107// **Wave scope = the owner thread (S4, D246/D248/D249):** `Core` is
108// single-owner `!Send + !Sync` — exactly one owner thread drives a
109// given `Core`, and a wave is **one uninterrupted owner-side drain**
110// (the deleted cross-thread `wave_owner` `ReentrantMutex` / "cross-thread
111// emits BLOCK" model is gone with S2c's §7 machinery). There is no
112// cross-thread interleave to defend against; the only cross-`Core`
113// concurrency is host-native via *independent* per-worker Cores
114// (actor model), each with its own `WAVE_STATE`.
115//
116// **Lifecycle:** populated by `Core::commit_emission` /
117// `Core::queue_notify` / etc.; mostly drained mid-wave by the auto-resolve
118// sweep + cache snapshot commit/restore. Outermost `BatchGuard::drop`
119// releases any retained handles still in `wave_cache_snapshots` /
120// `deferred_handle_releases`. Defensive wave-start clear at outermost
121// owning BatchGuard entry guards against cargo's thread-reuse propagating
122// stale entries from a prior panicked-mid-wave test.
123thread_local! {
124    static WAVE_STATE: RefCell<WaveState> = RefCell::new(WaveState::new());
125}
126
127// Wave-ownership flag for the at-most-one Core this OS thread may own.
128// Stores the active `Core::generation` (nonzero ⇒ owning a wave on that
129// Core; `0` ⇒ no active Core). Membership-of-generation means "this
130// thread is currently inside an OWNING wave on that Core" — i.e. the
131// outermost `BatchGuard` whose drop must run the drain. Replaces the
132// former Core-global `CoreState::in_tick` bool.
133//
134// **Why a single-generation `Cell<u64>` (D252, S5, 2026-05-19).** Post-
135// D247/D248 `Core` is single-owner `!Send + !Sync`; one Core per OS
136// thread is the model. The earlier `AHashSet<u64>` keyed by
137// `Core::generation` defended against an owner thread holding a wave on
138// Core-A and *also* entering a wave on Core-B from a `DeferQueue`
139// closure (/qa F1, the cross-Core owner-side nesting case D047 fixed).
140// Under D248 single-owner that defense is theoretical — no in-tree call
141// path produces it (would require an owner-side `DeferFn` to capture &
142// drive a *second* `&Core`; the `Send` half of the seam is closed under
143// `Core: !Send`). Per D252 (user-locked 2026-05-19), the hashing +
144// allocation cost is replaced by a single `Cell<u64>` (1 word, no
145// alloc) and the "one Core per OS thread" model is locked in as a
146// **hard invariant** — `BatchGuard::claim_in_tick` panics fail-loud if
147// it observes a nonzero generation that doesn't match `self.generation`,
148// structurally rejecting cross-Core same-thread nesting rather than
149// relying on convention. Nested same-Core re-entry (/qa EC#3, LIVE) is
150// preserved: the matching `claim` returns `false` (slot already holds
151// our generation) so the nested guard's drop no-ops and the outer wave
152// drains. `0` is reserved as the sentinel and `Core::generation` is
153// `NonZeroU64` (the existing process-monotonic counter starts at 1) so
154// the sentinel cannot collide with a live Core.
155//
156// **No lock required.** `Cell` is `!Sync` and only the one owner thread
157// touches it — single-owner `!Sync` Core ⇒ no cross-thread reader.
158//
159// Stale slot: the owning `BatchGuard::drop` clears the cell on every
160// exit path — normal return, the closure-body-panic branch, AND the
161// drain-phase-panic `catch_unwind` arm (before `resume_unwind`). So
162// a slot can only be left stuck if `Drop` itself never runs:
163// `std::mem::forget(guard)` or a process abort without unwinding — both
164// out of contract (`BatchGuard` is `#[must_use]` + `!Send`). A stale
165// nonzero slot would trip the D252 panic-on-mismatch on the NEXT Core's
166// claim on this thread — surfaced loudly, not silently masked.
167//
168// History: this flag lived briefly per-thread (Q-beyond sub-slice 3),
169// was reverted to Core-global (/qa F1+F2), keyed per-(Core, thread) for
170// the deleted disjoint-cross-thread model (D047, 2026-05-15), and at S4
171// (D246/D248/D249) the disjoint-partition constraint was retired with
172// the §7 cross-thread machinery. D252/S5 (2026-05-19) collapses the
173// `AHashSet<u64>` to `Cell<u64>` and locks "one Core per OS thread" as
174// a hard invariant — reversing D047's per-(Core, thread) keying since
175// the cross-Core owner-side nesting case it defended is no in-tree path
176// under single-owner Core. See `docs/rust-port-decisions.md` D252 +
177// D047/D246 and `docs/migration-status.md` § "D246 S5".
178thread_local! {
179    static IN_TICK_OWNED: Cell<u64> = const { Cell::new(0) };
180}
181
182/// Wave-scoped state previously held under [`Core::cross_partition`]'s
183/// `parking_lot::Mutex<CrossPartitionState>`. Now per-thread (Q-beyond
184/// Sub-slice 1, 2026-05-09; Sub-slice 2 added `pending_fires` +
185/// `pending_notify`, 2026-05-09; Sub-slice 3 added `currently_firing`,
186/// `in_tick`, `deferred_flush_jobs`, `deferred_cleanup_hooks`,
187/// `pending_wipes`, `invalidate_hooks_fired_this_wave`, 2026-05-09).
188///
189/// All fields are populated and drained within one wave on the one
190/// owner thread. Cross-thread access is structurally impossible —
191/// `Core` is single-owner `!Send + !Sync` (S4/D248); there is no
192/// other thread driving this Core.
193///
194/// **Refcount discipline (load-bearing):** `wave_cache_snapshots`,
195/// `deferred_handle_releases`, and `pending_notify` hold binding-side
196/// handle retains. They MUST be drained (and released through
197/// `Core::binding.release_handle`) by the outermost `BatchGuard::drop`
198/// on success and panic paths. `pending_notify` holds one retain per
199/// payload-bearing message (one per `Message::payload_handle()`); the
200/// retains are taken in `Core::queue_notify` and balanced either by
201/// `flush_notifications` (success path: pushed into
202/// `deferred_handle_releases`) or directly in the panic-discard path of
203/// `BatchGuard::drop` (taken from `pending_notify` and released).
204///
205/// The thread_local has no `Drop` hook with access to a binding — a
206/// panic that bypasses `BatchGuard::drop` (e.g. panic OUTSIDE any batch)
207/// would leak retains until the thread exits OR the next outermost
208/// wave-start clear runs (which for safety we don't fire — clearing
209/// without releasing would double-leak by losing the retain). The
210/// defensive wave-start clear in `BatchGuard::begin_batch_with_guards`
211/// clears `pending_auto_resolve` + `pending_pause_overflow` +
212/// `pending_fires` (no retains) + `currently_firing` +
213/// `invalidate_hooks_fired_this_wave` (also no retains) but NOT the
214/// retain-holding fields — those must be empty by construction at
215/// outermost wave start (a prior wave's panic-discard path drained them,
216/// or a prior wave's success path drained them).
217pub(crate) struct WaveState {
218    /// Payload-handle releases owed for messages that landed in
219    /// `pending_notify` during this wave (one per `payload_handle()`).
220    /// `BatchGuard::drop` releases these after sinks fire and the lock
221    /// is dropped, balancing the retain done in `queue_notify`.
222    pub(crate) deferred_handle_releases: Vec<HandleId>,
223    /// Pre-wave cache snapshots used to restore state if the wave aborts
224    /// mid-flight (e.g., a `Core::batch` closure panics). Each entry is
225    /// `(node_id → old_cache_handle)` — the handle the node held BEFORE
226    /// the wave started writing to it. The snapshotted handle holds a
227    /// retain (taken when the snapshot was inserted) so it stays alive
228    /// for restoration. On wave success, snapshots are drained and their
229    /// retains released. On wave abort, each cache slot is restored from
230    /// the snapshot and the original retain transfers to the cache slot.
231    pub(crate) wave_cache_snapshots: HashMap<NodeId, HandleId>,
232    /// D291: nodes whose `rec.terminal` slot transitioned `None → Some(_)`
233    /// during this wave (via [`Core::terminate_node`]). The set is the
234    /// snapshot — no payload because the idempotent guard in `terminate_node`
235    /// (`if rec.terminal.is_some() { continue; }`) means the only valid
236    /// transition is `None → Some`, so restore is unconditionally
237    /// `rec.terminal = None`. ERROR-tier `TerminalKind::Error(h)` retains
238    /// are released lock-released on restore (matches the
239    /// `wave_cache_snapshots` discipline — slot owns the retain, restore
240    /// transfers ownership to the releases vec).
241    ///
242    /// Closes the R4.3.2 status-snapshot completeness gap for terminal
243    /// tiers (cross-track-ledger §1 D282 row, D290 follow-on "Case 5"):
244    /// pre-D291 a `ctx.down(src, [COMPLETE])` inside `batch()` + throw
245    /// would clear the wave's pending DATA/COMPLETE messages via
246    /// [`Self::discard_wave_cleanup`] but leave `rec.terminal = Some(_)`,
247    /// silently rejecting any post-rollback emit on the same node
248    /// (substrate's terminal-state guard).
249    ///
250    /// **Refcount discipline:** the set itself holds no handles. On
251    /// restore, the pre-transition `rec.terminal` slot's owned handle
252    /// (`Some(TerminalKind::Error(h))` only — `Complete` has no payload)
253    /// is taken out of the slot and pushed into the releases vec; the
254    /// caller releases it after dropping the state lock.
255    pub(crate) wave_terminal_snapshots: AHashSet<NodeId>,
256    /// D291: per-dep terminal slots that transitioned `None → Some(_)`
257    /// during this wave. Mirrors [`Self::wave_terminal_snapshots`] for
258    /// the cascade-to-children case in [`Core::terminate_node`]
259    /// (`child.dep_records[idx].terminal = Some(t)`, same idempotent
260    /// guard at the dep-record level).
261    ///
262    /// **D291 /qa D2 (2026-05-25): keyed on `(child_id, dep_node_id)`**,
263    /// NOT `(child_id, dep_idx)`. A mid-batch `Core::set_deps(child_id,
264    /// …)` regenerates `dep_records` and invalidates positional
265    /// indexes; re-keying on the dep's `NodeId` lets restore re-resolve
266    /// the (possibly-new) index via `child.dep_index_of(dep_node_id)`.
267    /// Pre-fix a set_deps-mid-batch + cascade-rollback would silently
268    /// zero the WRONG dep's terminal slot. Restore sets the slot back
269    /// to `None`; ERROR-tier handles taken from the slot are released
270    /// lock-released.
271    pub(crate) wave_dep_terminal_snapshots: AHashSet<(NodeId, NodeId)>,
272    /// D297: nodes that transitioned `has_received_teardown: false → true`
273    /// during this wave (via [`Core::teardown_inner`]'s `Action::Visit`
274    /// arm). Mirrors [`Self::wave_terminal_snapshots`] for the R2.6.4 /
275    /// Lock 6.F idempotency flag — restored to `false` on panic-discard so
276    /// a retry of `teardown(node)` post-rollback isn't silently no-op'd by
277    /// the idempotency guard at [`Core::teardown_inner`] `Action::Visit`.
278    ///
279    /// Closes the R2.2.7.a + R4.3.2 atomicity gap surfaced by the D297
280    /// HALT premise check (the original D291 deferred-scope Item 1 framing
281    /// of "academic" was wrong — post-rollback the node was left in a
282    /// contradictory state `terminal=None` ∧ `has_received_teardown=true`
283    /// that the public surface couldn't reset).
284    ///
285    /// Bilateral convergence direction: pure-ts has been conformant on
286    /// this field since D282 — `_preBatchSnapshot.teardownDone`
287    /// ([`packages/pure-ts/src/core/node.ts:3993`] +
288    /// `:4136` for restore). Rust catches up here.
289    ///
290    /// **Refcount discipline:** the set holds no handles. Restore on
291    /// panic-discard just sets the boolean back to `false` for each
292    /// snapshotted node; the wire `Teardown` message (no payload) was
293    /// already cleared by the existing `pending_notify` drop.
294    pub(crate) wave_teardown_snapshots: AHashSet<NodeId>,
295    /// Nodes that need an auto-Resolved at wave end if they don't receive
296    /// a tier-3+ message from their own commit_emission. Populated by
297    /// the RESOLVED child propagation in `commit_emission`. Drained by
298    /// the auto-resolve sweep in `drain_and_flush`.
299    pub(crate) pending_auto_resolve: AHashSet<NodeId>,
300    /// R1.3.8.c pause-overflow ERROR synthesis queue. Recorded by
301    /// [`Core::queue_notify`] when the pause buffer first overflows;
302    /// drained at wave-end after the lock-released call to
303    /// `BindingBoundary::synthesize_pause_overflow_error`.
304    ///
305    /// # Panic-discard trade-off (deliberate; D280 doc-lock, 2026-05-22)
306    ///
307    /// Spec R1.3.8.c specifies the ERROR-synthesis contract for the
308    /// success path; it is silent on panic-discard semantics. The
309    /// [`Core::drain_and_flush`] success path drains this queue and
310    /// fires synthesis. The [`BatchGuard::drop`] panic-discard path
311    /// clears the queue WITHOUT firing synthesis (via
312    /// [`Self::clear_wave_state`] below), so a queued overflow ERROR
313    /// diagnostic for a wave that also panicked is silently dropped.
314    /// The consumer-visible `ResumeReport.dropped` count IS preserved
315    /// (lives on `PauseState`, not here); only the synthesized ERROR
316    /// with `{ nodeId, droppedCount, configuredMax, lockHeldDurationMs }`
317    /// is lost for that specific wave.
318    ///
319    /// This is the load-bearing [`BatchGuard`] atomicity invariant —
320    /// "the wave didn't happen" — applied uniformly to every
321    /// retain-holding / refcount-discipline field across the panic
322    /// path. Adding an asymmetric ERROR-surfacing exception here
323    /// would weaken atomicity for every other invariant that relies
324    /// on it (cache restoration, dep-mask reset,
325    /// `pending_auto_resolve` clear, etc.). The fn panic itself is
326    /// also louder than the missing diagnostic — a process-level
327    /// signal (panic hook → stderr) survives the wave-discard.
328    ///
329    /// # Lift point (future consumer pressure only)
330    ///
331    /// If a consumer surfaces a real need for overflow diagnostics
332    /// across panic boundaries, the correct shape is a
333    /// **panic-survivable diagnostic side channel** (e.g., a
334    /// Core-level `on_panic_diagnostic` hook or a binding-layer
335    /// callback that the panic-discard path invokes BEFORE clearing
336    /// retain-holding state) — NOT bolting a surviving-ERROR
337    /// exception onto [`BatchGuard::drop`]'s atomicity contract. Mint
338    /// as a separate D-number under the D196 consumer-pressure gate
339    /// when the scenario materializes.
340    pub(crate) pending_pause_overflow: Vec<crate::node::PendingPauseOverflow>,
341    /// Nodes whose fn we owe a fire to — drained by [`Core::run_wave`].
342    ///
343    /// Q-beyond Sub-slice 2 (D108, 2026-05-09): moved from
344    /// `CoreState::pending_fires` to per-thread `WaveState`. Wave-scoped
345    /// — populated by `deliver_data_to_consumer`, `terminate_node`'s
346    /// child-cascade `QueueFire` branch, `activate_derived`'s producer
347    /// queueing, `resume`'s pending-wave consolidation, and operator
348    /// re-arm paths; drained by `pick_next_fire` / `fire_fn` /
349    /// `fire_regular` / `fire_operator` (each removes the firing node
350    /// before invoking).
351    pub(crate) pending_fires: AHashSet<NodeId>,
352    /// Per-node outgoing message buffer; flushed at wave end. Insertion-
353    /// ordered so flush order is deterministic — load-bearing for
354    /// R1.3.9.d meta-TEARDOWN ordering: when a parent and its meta
355    /// companion both have queued messages in the same wave, the meta
356    /// (queued first via `teardown_inner`'s recursion order) flushes
357    /// first.
358    ///
359    /// Each entry carries the per-wave subscriber snapshot taken at first
360    /// touch (Slice A close, M1: lock-released drain). Late subscribers
361    /// installed mid-wave between fn-fire iterations don't appear in
362    /// already-snapshotted entries; this is the load-bearing fix that
363    /// prevents duplicate-Data delivery when a handshake delivers the
364    /// post-commit cache and the wave's flush would otherwise also fire
365    /// to the same sink.
366    ///
367    /// Q-beyond Sub-slice 2 (D108, 2026-05-09): moved from
368    /// `CoreState::pending_notify` to per-thread `WaveState`. The map
369    /// holds a payload-handle retain per payload-bearing message
370    /// (`Message::payload_handle()`); these MUST be released by the
371    /// outermost `BatchGuard::drop` (success path: through
372    /// `flush_notifications` → `deferred_handle_releases`; panic path:
373    /// directly in `BatchGuard::drop`'s panic branch).
374    pub(crate) pending_notify: IndexMap<NodeId, PendingPerNode>,
375    /// D217-AMEND-2 (2026-05-16): persistent spare for `pending_notify`,
376    /// ping-ponged with it at wave end so a fresh `IndexMap::default()`
377    /// (a new `ahash::RandomState` via `gen_hasher_seed`/`from_keys`
378    /// PLUS RawVec realloc churn on the next wave's `queue_notify`) is
379    /// NEVER constructed after thread init. The empirical attribution
380    /// (`examples/profile_st_emit.rs` + macOS `sample`) put the old
381    /// per-wave `mem::take(&mut pending_notify)` at ~1250 of ~4767
382    /// hot-path samples — the dominant §7 floor tax (D217 lever-1
383    /// "slab store" falsified; the node store is minor). Holds NO
384    /// retains between waves: it is always empty (cleared, capacity +
385    /// hasher retained) outside `flush_notifications`.
386    pub(crate) pending_notify_recycle: IndexMap<NodeId, PendingPerNode>,
387    // Q-beyond Sub-slice 3 (D108, 2026-05-09) moved `in_tick` and
388    // `currently_firing` from `CoreState` to per-thread `WaveState`;
389    // /qa F1+F2 (2026-05-10) reverted both to `CoreState`; the in_tick
390    // placement was finalized 2026-05-15 (D047) and then collapsed by
391    // D252 (S5, 2026-05-19) — see below. The two fields have *different*
392    // scope requirements:
393    //
394    // - **`in_tick` — one-Core-per-OS-thread `Cell<u64>` (D252).** Pure
395    //   thread-local once broke cross-Core isolation (Core-A's flag
396    //   leaked to Core-B on the same OS thread, /qa F1); pure Core-
397    //   global broke the now-deleted disjoint-partition cross-thread
398    //   drain. The intermediate D047 `AHashSet<u64>` keyed by
399    //   `Core::generation` defended both. Under post-D248 single-owner
400    //   Core the cross-Core owner-side nesting case has no in-tree
401    //   consumer (would require an owner-side `DeferFn` to drive a
402    //   *second* `&Core`, structurally absent), so D252 collapses the
403    //   set to a single `Cell<u64>` slot per OS thread and panics
404    //   fail-loud on cross-Core nesting. Same-Core nested re-entry
405    //   (/qa EC#3) is preserved by the matching-generation branch in
406    //   `BatchGuard::claim_in_tick`. NOT a `CoreState` field.
407    //
408    // - **`currently_firing` — Core-global (stays on `CoreState`).**
409    //   Per-thread placement silently bypassed the cross-thread P13
410    //   partition-migration check in `Core::set_deps`: thread B's set_deps
411    //   must observe thread A's firing pushes. Per-Core (cross-thread
412    //   visible) placement restores the D091 safety check (/qa F2).
413    //
414    // The other 11 wave-scoped fields stay per-thread because they're
415    // accessed only by the one owner thread (single-owner `!Send` Core,
416    // S4/D248 — no cross-thread emitter exists).
417    /// Slice E2 (R1.3.9.b strict per D057): per-wave-per-node dedup
418    /// for `OnInvalidate` cleanup hook firing. A node already in this
419    /// set this wave has already had its `OnInvalidate` queued into
420    /// `deferred_cleanup_hooks` and MUST NOT queue again, even if
421    /// `invalidate_inner` re-encounters it.
422    ///
423    /// Q-beyond Sub-slice 3 (D108, 2026-05-09): moved from
424    /// `CoreState::invalidate_hooks_fired_this_wave` to per-thread
425    /// `WaveState`. Wave-scoped — populated by `invalidate_inner` and
426    /// cleared by `WaveState::clear_wave_state`.
427    pub(crate) invalidate_hooks_fired_this_wave: AHashSet<NodeId>,
428    /// Deferred sink-fire jobs collected by `flush_notifications`.
429    /// `flush_notifications` populates this from `pending_notify`;
430    /// `Core::drain_deferred` takes it and `Core::fire_deferred` fires
431    /// each entry lock-released. Each tuple is
432    /// `(sinks_for_one_node_one_phase, phase_messages)`. Empty between
433    /// waves.
434    ///
435    /// Q-beyond Sub-slice 3 (D108, 2026-05-09): moved from
436    /// `CoreState::deferred_flush_jobs` to per-thread `WaveState`. No
437    /// retains held — the `Vec<Sink>` clones own Arcs that drop
438    /// naturally; the `Vec<Message>` payload retains were already moved
439    /// into `deferred_handle_releases` by `flush_notifications`.
440    pub(crate) deferred_flush_jobs: DeferredJobs,
441    /// Slice E2 (per D060/D061): lock-released drain queue for
442    /// `OnInvalidate` cleanup hooks. Populated by `Core::invalidate_inner`
443    /// when a node's cache transitions `!= NO_HANDLE → NO_HANDLE`;
444    /// drained after the lock drops at wave boundary by
445    /// `Core::fire_deferred` (each call wrapped in `catch_unwind` per
446    /// D060). Panic-discarded silently per D061.
447    ///
448    /// Q-beyond Sub-slice 3 (D108, 2026-05-09): moved from
449    /// `CoreState::deferred_cleanup_hooks` to per-thread `WaveState`.
450    pub(crate) deferred_cleanup_hooks: Vec<(NodeId, crate::boundary::CleanupTrigger)>,
451    /// Slice E2 /qa Q2(b) (D069): lock-released drain queue for
452    /// `BindingBoundary::wipe_ctx` calls fired eagerly from
453    /// `Core::terminate_node` when a resubscribable node terminates with
454    /// no live subscribers. Drained alongside `deferred_cleanup_hooks`
455    /// at wave boundary; same `catch_unwind` discipline. Panic-discarded
456    /// silently.
457    ///
458    /// Q-beyond Sub-slice 3 (D108, 2026-05-09): moved from
459    /// `CoreState::pending_wipes` to per-thread `WaveState`.
460    pub(crate) pending_wipes: Vec<NodeId>,
461}
462
463impl WaveState {
464    fn new() -> Self {
465        Self {
466            deferred_handle_releases: Vec::new(),
467            wave_cache_snapshots: HashMap::new(),
468            // D291: empty by construction at outermost wave start; both
469            // success + panic-discard paths drain.
470            wave_terminal_snapshots: AHashSet::new(),
471            wave_dep_terminal_snapshots: AHashSet::new(),
472            // D297: empty by construction at outermost wave start; both
473            // success + panic-discard paths drain via
474            // `drain_wave_terminal_snapshots` /
475            // `restore_wave_teardown_snapshots`.
476            wave_teardown_snapshots: AHashSet::new(),
477            pending_auto_resolve: AHashSet::new(),
478            pending_pause_overflow: Vec::new(),
479            pending_fires: AHashSet::new(),
480            pending_notify: IndexMap::new(),
481            // D217-AMEND-2: one `IndexMap::default()` for the thread's
482            // life — its ahash seed + capacity are recycled forever.
483            pending_notify_recycle: IndexMap::new(),
484            invalidate_hooks_fired_this_wave: AHashSet::new(),
485            deferred_flush_jobs: Vec::new(),
486            deferred_cleanup_hooks: Vec::new(),
487            pending_wipes: Vec::new(),
488        }
489    }
490
491    /// Wave-end clear of the non-retain-holding fields. Called from
492    /// [`Core::drain_and_flush`]'s wave-end path. Fields holding retains
493    /// (`wave_cache_snapshots`, `deferred_handle_releases`,
494    /// `pending_notify`) are NOT cleared here — they follow the
495    /// success/panic paths' explicit drain discipline in
496    /// `BatchGuard::drop`.
497    pub(crate) fn clear_wave_state(&mut self) {
498        self.pending_auto_resolve.clear();
499        // pending_pause_overflow is normally drained by drain_and_flush
500        // via the synthesis loop. If a wave is panic-discarded BEFORE
501        // synthesis runs, BatchGuard::drop's panic path also clears it
502        // explicitly. Pre-wave defensive clear in
503        // `begin_batch_with_guards` makes this idempotent.
504        //
505        // D280 (2026-05-22): the deliberate trade-off — atomicity beats
506        // diagnostic on panic — is documented on the field declaration
507        // at `pending_pause_overflow`; lift-point (panic-survivable
508        // diagnostic side channel) gated on D196 consumer pressure.
509        self.pending_pause_overflow.clear();
510        // Sub-slice 2: pending_fires is intentionally NOT cleared
511        // here. Two reasons:
512        //   1. Wave-success drain empties it by construction: every
513        //      `pick_next_fire` selection is removed by
514        //      `fire_regular` / `fire_operator` before invocation,
515        //      and `drain_and_flush` only exits when the set is empty.
516        //   2. The `Core::resume` default-mode consolidated-fire
517        //      pattern stages an entry OUTSIDE any in-tick wave and
518        //      then enters a new wave to drain it; clearing here
519        //      would erase that pre-staged entry. The panic-discard
520        //      path in `BatchGuard::drop` clears it explicitly.
521
522        // /qa F2 reverted (2026-05-10): currently_firing moved BACK to
523        // CoreState::currently_firing — defensive clear there.
524        // Slice E2 (D057): per-wave-per-node OnInvalidate dedup is
525        // wave-scoped — cleared so the next wave can fire cleanups
526        // again.
527        self.invalidate_hooks_fired_this_wave.clear();
528        // `deferred_flush_jobs`, `deferred_cleanup_hooks`, and
529        // `pending_wipes` are intentionally NOT cleared here. They
530        // follow the same discipline as `deferred_handle_releases` /
531        // `pending_notify`:
532        //   - SUCCESS path (`BatchGuard::drop` non-panic): drained by
533        //     `Core::drain_deferred` AFTER `clear_wave_state` runs,
534        //     then fired lock-released by `Core::fire_deferred`.
535        //   - PANIC-DISCARD path (`BatchGuard::drop` panic): explicitly
536        //     `std::mem::take`-and-dropped AFTER `clear_wave_state`
537        //     runs (silently per D061 / D069).
538        // Clearing here would race the success path: queued sink fires
539        // / cleanup hooks / wipes would be erased BEFORE
540        // `drain_deferred` could take them.
541    }
542}
543
544/// Run a closure with mutable access to this thread's [`WaveState`].
545///
546/// Convention: prefer this helper over inline `WAVE_STATE.with(...)`
547/// for sites that touch ONE field. For sites that interleave state lock
548/// access with wave-state mutation, inline `WAVE_STATE.with(...)` keeps
549/// the lock-acquire / wave-state-borrow scopes visible (mirrors the
550/// pre-Q-beyond `let mut s = self.lock_state(); let mut cps = self.lock_cross_partition();`
551/// pattern).
552///
553/// **Re-entrance:** the closure MUST NOT re-enter Core in a way that
554/// would call back into `with_wave_state` — `RefCell::borrow_mut` panics
555/// on nested borrow. The same discipline that the prior
556/// `parking_lot::Mutex<CrossPartitionState>` enforced (no re-entry
557/// holding cross_partition) carries over.
558pub(crate) fn with_wave_state<R>(f: impl FnOnce(&mut WaveState) -> R) -> R {
559    WAVE_STATE.with(|cell| f(&mut cell.borrow_mut()))
560}
561
562/// Outermost-wave defensive clear of [`WaveState`]'s non-retain-holding
563/// fields. Called from [`BatchGuard::begin_batch_with_guards`] on
564/// outermost owning entry. Mirrors the pre-existing tier3 defensive
565/// clear (D1 patch, 2026-05-09) — guards against cargo's thread-reuse
566/// propagating stale entries from a prior panicked-mid-wave test.
567///
568/// The retain-holding fields (`wave_cache_snapshots` /
569/// `deferred_handle_releases`) MUST already be empty by construction at
570/// outermost wave entry — outermost `BatchGuard::drop` always drains
571/// them on both success and panic paths. If they're non-empty here it
572/// indicates a prior wave bypassed `BatchGuard::drop`; in that case
573/// the next BatchGuard's outermost drop will eventually drain them.
574fn wave_state_clear_outermost() {
575    with_wave_state(|ws| {
576        // /qa F4 (2026-05-10): debug_assert that retain-holding fields
577        // are empty at outermost wave start. The invariant claim is
578        // "outermost BatchGuard::drop drains them on both success and
579        // panic paths, so they're empty before the next wave starts."
580        // If a panic path EVER bypasses the drain (today: not reachable
581        // because BatchGuard::drop is robust against panicking sinks via
582        // catch_unwind), this assert catches it in tests immediately
583        // rather than letting stale entries leak into the next wave's
584        // drain (which would release Core-A's HandleIds via Core-B's
585        // binding under cross-Core same-thread sequential use).
586        debug_assert!(
587            ws.wave_cache_snapshots.is_empty(),
588            "wave_state_clear_outermost: wave_cache_snapshots non-empty at \
589             outermost wave start ({} entries) — prior BatchGuard::drop \
590             bypassed the drain (would leak retains into next wave's \
591             binding). See /qa F4 (2026-05-10).",
592            ws.wave_cache_snapshots.len()
593        );
594        // D291: same invariant for the terminal-slot snapshots. The set
595        // holds no handles directly, but its drain triggers ERROR-handle
596        // releases via `restore_wave_terminal_snapshots`; stale entries
597        // would either silently restore `rec.terminal = None` on an
598        // unrelated next wave (corrupting state) or skip a real release
599        // (leaking refcounts). Outermost `BatchGuard::drop` drains both
600        // on success (via [`Core::drain_wave_terminal_snapshots`]) and
601        // panic (via [`Core::restore_wave_terminal_snapshots`] from
602        // [`Self::discard_wave_cleanup`]).
603        debug_assert!(
604            ws.wave_terminal_snapshots.is_empty(),
605            "wave_state_clear_outermost: wave_terminal_snapshots non-empty \
606             at outermost wave start ({} entries) — prior BatchGuard::drop \
607             bypassed the drain (would corrupt next wave's terminal slots). \
608             See D291.",
609            ws.wave_terminal_snapshots.len()
610        );
611        debug_assert!(
612            ws.wave_dep_terminal_snapshots.is_empty(),
613            "wave_state_clear_outermost: wave_dep_terminal_snapshots non-empty \
614             at outermost wave start ({} entries). See D291.",
615            ws.wave_dep_terminal_snapshots.len()
616        );
617        // D297: same invariant for the teardown-flag snapshots. The set
618        // holds no handles, but its drain triggers
619        // `has_received_teardown = false` resets via
620        // `restore_wave_teardown_snapshots`; stale entries would either
621        // silently clear an unrelated node's flag on the next wave
622        // (corrupting R2.6.4 idempotency) or no-op (leaking the lift).
623        // Outermost `BatchGuard::drop` drains on success (via
624        // [`Core::drain_wave_terminal_snapshots`]) and panic (via
625        // [`Core::restore_wave_teardown_snapshots`] from
626        // [`Self::discard_wave_cleanup`]).
627        debug_assert!(
628            ws.wave_teardown_snapshots.is_empty(),
629            "wave_state_clear_outermost: wave_teardown_snapshots non-empty \
630             at outermost wave start ({} entries). See D297.",
631            ws.wave_teardown_snapshots.len()
632        );
633        debug_assert!(
634            ws.deferred_handle_releases.is_empty(),
635            "wave_state_clear_outermost: deferred_handle_releases non-empty \
636             at outermost wave start ({} entries) — prior BatchGuard::drop \
637             bypassed the drain. See /qa F4 (2026-05-10).",
638            ws.deferred_handle_releases.len()
639        );
640        debug_assert!(
641            ws.pending_notify.is_empty(),
642            "wave_state_clear_outermost: pending_notify non-empty at \
643             outermost wave start ({} entries) — prior BatchGuard::drop \
644             bypassed the drain. See /qa F4 (2026-05-10).",
645            ws.pending_notify.len()
646        );
647        // D217-AMEND-2 / QA: enforce the invariant the field's own doc
648        // claims ("always empty outside `flush_notifications`"). The
649        // recycle slot is cleared at the end of every `flush_notifications`
650        // and drained on the panic-discard path (`discard_wave_cleanup`);
651        // a non-empty slot here means a panic bypassed both — surface it
652        // loudly in tests rather than silently injecting a prior wave's
653        // stale entries into the next wave's `mem::swap`.
654        debug_assert!(
655            ws.pending_notify_recycle.is_empty(),
656            "wave_state_clear_outermost: pending_notify_recycle non-empty \
657             at outermost wave start ({} entries) — a panic bypassed both \
658             flush_notifications' clear AND discard_wave_cleanup's drain. \
659             See D217-AMEND-2 / QA (2026-05-16).",
660            ws.pending_notify_recycle.len()
661        );
662        ws.pending_auto_resolve.clear();
663        ws.pending_pause_overflow.clear();
664        // Sub-slice 2: pending_fires is intentionally NOT cleared here.
665        // Pre-Sub-slice-2 it lived on CoreState and survived between
666        // waves; load-bearing for `Core::resume`'s default-mode
667        // consolidated-fire pattern, which inserts into pending_fires
668        // OUTSIDE any in-tick wave (Phase 1, lock-held but `in_tick`
669        // false at that moment) and then calls `run_wave_for(node_id)`
670        // — `run_wave_for` enters a NEW outermost wave whose drain must
671        // pick up that pre-staged pending_fires entry. Clearing here
672        // would erase it.
673        //
674        // pending_fires holds no retains, so a stale entry from a
675        // prior panicked-mid-wave test that bypassed BatchGuard::drop
676        // would leak as a spurious fire on the next wave on the same
677        // thread (no refcount damage). The panic-discard path in
678        // BatchGuard::drop and the wave-success drain together
679        // guarantee pending_fires is empty by wave end; relying on
680        // that invariant matches the pre-refactor lifecycle.
681        //
682        // Intentionally NOT clearing wave_cache_snapshots /
683        // deferred_handle_releases / pending_notify here — those hold
684        // retains and need a binding to release. Documented invariant:
685        // they're empty by outermost wave start.
686
687        // Sub-slice 3 (2026-05-09; /qa F2 partially reverted 2026-05-10):
688        // defensively clear the OnInvalidate dedup set on outermost-wave
689        // entry. Holds no retains; a stale entry from a prior
690        // panicked-mid-wave test that bypassed BatchGuard::drop would
691        // only suppress the OnInvalidate cleanup hook for that node on
692        // the next wave (no refcount damage). Clearing matches the
693        // tier3 defensive-clear precedent.
694        //
695        // `currently_firing` was reverted to CoreState (per /qa F2 — the
696        // per-thread placement silently bypassed the cross-thread P13
697        // partition-migration check); its defensive clear lives in
698        // `CoreState::clear_wave_state` (which BatchGuard::drop runs
699        // wave-end on both success and panic paths).
700        ws.invalidate_hooks_fired_this_wave.clear();
701        // Intentionally NOT clearing deferred_flush_jobs /
702        // deferred_cleanup_hooks / pending_wipes here — by invariant
703        // they're empty at outermost wave start (drained on success
704        // by drain_deferred → fire_deferred; drained on panic by
705        // BatchGuard::drop's panic branch). Pre-clearing would race a
706        // hypothetical wave that staged into them OUTSIDE in_tick
707        // (none does today, but matching the deferred_handle_releases
708        // / pending_notify discipline keeps the invariant uniform).
709    });
710}
711
712// §7 (D208–D211): the per-thread `PARTITION_CACHE` is DELETED. It existed
713// to amortize the union-find `compute_touched_partitions` BFS + registry
714// epoch round-trips across repeated same-seed emits. With static
715// user-declared scheduling groups there is no epoch and the
716// touched-group walk resolves group `Arc`s under a single uncontended
717// `group_locks` lock (and is entirely skipped for the all-`None` floor),
718// so the cache (and its ABA-avoidance generation keying) is unnecessary.
719
720/// Has `node` emitted a tier-3 (DATA / RESOLVED) message in the current
721/// wave on this thread? See [`TIER3_EMITTED_THIS_WAVE`] for the per-thread
722/// wave-scope rationale.
723fn tier3_check(node: NodeId) -> bool {
724    TIER3_EMITTED_THIS_WAVE.with(|s| s.borrow().contains(&node))
725}
726
727/// Mark `node` as having emitted a tier-3 message in the current wave on
728/// this thread. Idempotent. See [`TIER3_EMITTED_THIS_WAVE`].
729fn tier3_mark(node: NodeId) {
730    TIER3_EMITTED_THIS_WAVE.with(|s| {
731        s.borrow_mut().insert(node);
732    });
733}
734
735/// Wave-end clear of the per-thread tier3 tracker. Called from the
736/// OUTERMOST [`BatchGuard::drop`] on this thread (both success and
737/// panic-discard paths). Inner non-owning BatchGuard drops MUST NOT
738/// invoke this — the outer wave is still in flight and inner-wave marks
739/// are part of the outer wave's Slice G coalescing state.
740fn tier3_clear() {
741    TIER3_EMITTED_THIS_WAVE.with(|s| {
742        s.borrow_mut().clear();
743    });
744}
745
746/// Deferred sink-fire jobs collected during `flush_notifications`. Each
747/// entry pairs a snapshot of the sink Arcs to fire with the messages to
748/// deliver to them — one entry per (node × phase) cell with non-empty
749/// content. Drained from `CoreState` and fired lock-released.
750pub(crate) type DeferredJobs = Vec<(Vec<Sink>, Vec<Message>)>;
751
752/// Lock-released drain payload of the wave's BatchGuard:
753/// `(sink_jobs, handle_releases, OnInvalidate cleanup hooks, pending wipe_ctx fires)`.
754/// Returned by [`Core::drain_deferred`], consumed by [`Core::fire_deferred`].
755/// Sliced into a type alias to satisfy `clippy::type_complexity`.
756pub(crate) type WaveDeferred = (
757    DeferredJobs,
758    Vec<HandleId>,
759    Vec<(crate::handle::NodeId, crate::boundary::CleanupTrigger)>,
760    Vec<crate::handle::NodeId>,
761);
762
763/// One subscriber-snapshot epoch within a node's wave-end notification
764/// queue. A `PendingBatch` is opened the first time `queue_notify` runs
765/// for the node in a wave, and a fresh batch is opened whenever the node's
766/// `subscribers_revision` advances mid-wave (a new sink subscribes, an
767/// existing sink unsubscribes, or a handshake-time panic evicts an
768/// orphaned sink). All messages within one batch flush to the same sink
769/// list — the snapshot taken when the batch opened, frozen against
770/// subsequent revision bumps.
771pub(crate) struct PendingBatch {
772    /// `NodeRecord::subscribers_revision` value at the moment this batch
773    /// opened. Used by `queue_notify` to decide append-to-last-batch vs
774    /// open-fresh-batch on every push.
775    pub(crate) snapshot_revision: u64,
776    /// Subscriber snapshot frozen at batch-open time. SmallVec<[_; 1]>
777    /// inlines the common single-subscriber case (avoids heap alloc for
778    /// the dominant 1-sink-per-node pattern in most reactive graphs).
779    pub(crate) sinks: SmallVec<[Sink; 1]>,
780    /// Messages queued to this batch. SmallVec<[_; 3]> inlines the
781    /// common per-node-per-wave message set (DIRTY + DATA + optional
782    /// RESOLVED) without heap allocation.
783    pub(crate) messages: SmallVec<[Message; 3]>,
784}
785
786/// Per-node wave-end notification queue, structured as one or more
787/// subscriber-snapshot epochs (`batches`). The common case (no
788/// mid-wave subscribe / unsubscribe at this node) keeps a single
789/// inline batch — `SmallVec<[_; 1]>` keeps that allocation-free.
790///
791/// **Slice X4 / D2 (2026-05-08):** the prior shape was a single
792/// `(sinks, messages)` pair per node — the snapshot froze on first
793/// `queue_notify` and was reused for every subsequent emit to the same
794/// node in the wave. That caused the documented late-subscriber +
795/// multi-emit-per-wave gap (R1.3.5.a divergence): a sub installed
796/// between two emits to the same node was invisible to the second
797/// emit's flush slice. The revision-tracked batch list resolves it —
798/// late subs land in a fresh batch that frozenly carries them, while
799/// pre-subscribe batches retain their original snapshot so the new
800/// sub doesn't double-receive earlier emits via flush AND handshake.
801pub(crate) struct PendingPerNode {
802    pub(crate) batches: SmallVec<[PendingBatch; 1]>,
803}
804
805impl PendingPerNode {
806    /// Iterate every queued message for this node across all batches in
807    /// arrival order. Used by R1.3.3.a invariant assertions and the
808    /// auto-resolve / Slice-G coalescing tier-3-presence checks, which
809    /// reason about wave-content per node, not per batch.
810    pub(crate) fn iter_messages(&self) -> impl Iterator<Item = &Message> + '_ {
811        self.batches.iter().flat_map(|b| b.messages.iter())
812    }
813
814    /// Mutable counterpart for `iter_messages`. Used by
815    /// `rewrite_prior_resolved_to_data` to in-place rewrite Resolved
816    /// entries to Data when a wave detects a multi-emit case after the
817    /// fact.
818    pub(crate) fn iter_messages_mut(&mut self) -> impl Iterator<Item = &mut Message> + '_ {
819        self.batches.iter_mut().flat_map(|b| b.messages.iter_mut())
820    }
821}
822
823/// RAII helper for the A6 reentrancy guard (Slice F, 2026-05-07).
824///
825/// Pushes `node_id` onto [`WaveState::currently_firing`] on construction,
826/// pops it on Drop. [`Core::set_deps`] consults the stack and rejects
827/// `set_deps(N, ...)` from inside N's own fn-fire with
828/// [`crate::node::SetDepsError::ReentrantOnFiringNode`] — closing the
829/// D1 hazard where Phase-1's snapshot of `dep_handles` would refer to
830/// a different dep ordering than Phase-3's `tracked` storage.
831///
832/// Wraps the lock-released `invoke_fn` (and operator-equivalent FFI
833/// callbacks like `project_each` / `predicate_each`). Drop fires even
834/// on panic, so the stack stays balanced under user-fn unwinds.
835///
836/// Membership semantics (NOT strict LIFO): the only consumer of
837/// `currently_firing` is `Core::set_deps`'s reentrancy check, which uses
838/// `contains(&n)` — a set-membership test. Drop pops the right-most
839/// matching `node_id` via `rposition` + `swap_remove`. For a stack like
840/// `[A, B, A]` (A's fn re-enters B, B's fn re-enters A), B's drop pops
841/// the SECOND A (index 1) via swap_remove, leaving `[A, A]` — the
842/// physical order of the remaining As may not match construction order,
843/// but membership is preserved. If a future call site needs strict LIFO
844/// (e.g. "pop the most recently fired node"), switch to `pop()` + assert
845/// the popped value equals `self.node_id`. (QA A6, 2026-05-07)
846// D221 (F-b) floor-hardening, 2026-05-17: holds `&'a Core`, NOT an
847// owned `core.clone()` (Core is no longer `Clone`). D246/S2c: the
848// `group_locks`/`global_wave` Arcs are deleted; the lock-free
849// single-owner `RefCell` floor pays zero per-fn-fire Arc tax. Both
850// construction sites
851// (`fire_regular` batch.rs:~1148, `fire_operator` batch.rs:~1719) are
852// locals in a `&self` method, so the `self: &Core` borrow strictly
853// outlives the guard — the lifetime is sound by construction (no
854// escape, dropped within the same method). Removing the clone here also
855// stops S2's `LockedCell` deletion from being able to reintroduce the
856// tax.
857pub(crate) struct FiringGuard<'a> {
858    core: &'a Core,
859    node_id: NodeId,
860}
861
862impl<'a> FiringGuard<'a> {
863    pub(crate) fn new(core: &'a Core, node_id: NodeId) -> Self {
864        // /qa F2 reverted (2026-05-10): currently_firing moved BACK to
865        // CoreState (cross-thread visible, restoring the D091 P13 check).
866        // Push under the state lock scope.
867        {
868            let mut s = core.lock_state();
869            s.shared.currently_firing.push(node_id);
870        }
871        Self { core, node_id }
872    }
873}
874
875impl Drop for FiringGuard<'_> {
876    fn drop(&mut self) {
877        // /qa F2 reverted (2026-05-10): currently_firing moved BACK to
878        // CoreState. Pop under state lock.
879        {
880            let mut s = self.core.lock_state();
881            // Pop the right-most matching node_id (membership semantics —
882            // not strict LIFO). If absent, an external rebalance already
883            // popped — silent no-op (panic-in-Drop is poison).
884            if let Some(pos) = s
885                .shared
886                .currently_firing
887                .iter()
888                .rposition(|n| *n == self.node_id)
889            {
890                s.shared.currently_firing.swap_remove(pos);
891            }
892        }
893    }
894}
895
896/// Borrow the per-operator scratch slot as `&T`. Panics if the slot is
897/// uninitialized or the contained type doesn't match `T` — both are
898/// invariant violations for any `fire_op_*` helper that should only be
899/// called from `fire_operator`'s match arm for the matching variant.
900fn scratch_ref<T: crate::op_state::OperatorScratch>(s: &CoreState, node_id: NodeId) -> &T {
901    s.require_node(node_id)
902        .op_scratch
903        .as_ref()
904        .expect("op_scratch slot uninitialized for operator node")
905        .as_any_ref()
906        .downcast_ref::<T>()
907        .expect("op_scratch type mismatch")
908}
909
910/// Mutable borrow of the per-operator scratch slot. Same invariants as
911/// [`scratch_ref`].
912fn scratch_mut<T: crate::op_state::OperatorScratch>(s: &mut CoreState, node_id: NodeId) -> &mut T {
913    s.require_node_mut(node_id)
914        .op_scratch
915        .as_mut()
916        .expect("op_scratch slot uninitialized for operator node")
917        .as_any_mut()
918        .downcast_mut::<T>()
919        .expect("op_scratch type mismatch")
920}
921
922impl Core {
923    // -------------------------------------------------------------------
924    // Wave entry + drain
925    // -------------------------------------------------------------------
926
927    /// Wave entry. The caller passes a closure that performs the wave's
928    /// triggering operation (`commit_emission`, `terminate_node`, etc.).
929    /// The closure runs lock-released; closure-internal Core methods
930    /// acquire the state lock as they go.
931    ///
932    /// **Implementation:** delegates to [`Self::begin_batch`] for the
933    /// wave's RAII lifecycle. The returned `BatchGuard` claims `in_tick`
934    /// (`Core::generation`-keyed) and on drop runs the drain + flush +
935    /// sink-fire phases — OR, if the closure panicked, the
936    /// panic-discard path that restores cache snapshots and clears
937    /// in_tick. (S4/D248: the `wave_owner` re-entrant mutex is deleted —
938    /// single-owner `!Send` Core, one uninterrupted owner-side drain.)
939    /// This unification gives `run_wave` the same panic-safety
940    /// guarantee as the user-facing `Core::batch`.
941    ///
942    /// **Re-entrance:** a closure invoked from inside another wave — the
943    /// inner `run_wave`'s `begin_batch` observes `in_tick=true`, the
944    /// returned guard is non-owning (`owns_tick=false`), drop is a no-op.
945    /// The outer wave's drain picks up the inner closure's queued work.
946    ///
947    /// **Lock-release discipline (Slice A close, M1):** all binding-side
948    /// callbacks except the subscribe-time handshake fire lock-released.
949    /// Sinks / user fns / custom-equals oracles that re-enter Core via
950    /// the owner-side mailbox/`DeferQueue` seam run a nested wave. The
951    /// one owner thread runs the in-flight drain to quiescence before
952    /// `emit` returns — preserving the user-facing "emit returning means
953    /// subscribers have observed" contract (no cross-thread lock needed;
954    /// there is no cross-thread emitter).
955    /// Wave entry with a known `seed` node. Acquires only the partitions
956    /// transitively touched from `seed` (downstream cascade via
957    /// `s.children` + R1.3.9.d meta-companion cascade) instead of every
958    /// current partition. The canonical Y1 parallelism win for per-seed
959    /// entry points (`Core::emit`, `Core::subscribe`'s activation,
960    /// `Core::pause` / `Core::resume` / `Core::invalidate` / `Core::complete`
961    /// / `Core::error` / `Core::teardown` / `Core::set_deps`'s
962    /// push-on-subscribe).
963    ///
964    /// S4/D248: single-owner `!Send + !Sync` Core — one uninterrupted
965    /// owner-side drain per wave; the deleted per-partition `wave_owner`
966    /// `ReentrantMutex` parallelism is replaced by host-native
967    /// concurrency across *independent per-worker Cores* (actor model).
968    /// The "emit returning means subscribers have observed" contract
969    /// holds because the one owner thread drains to quiescence before
970    /// returning.
971    ///
972    /// Slice Y1 / Phase E (2026-05-08).
973    pub(crate) fn run_wave_for<F>(&self, seed: crate::handle::NodeId, op: F)
974    where
975        F: FnOnce(&Self),
976    {
977        let _guard = self.begin_batch_for(seed);
978        op(self);
979    }
980
981    /// Wave entry helper. D274 (2026-05-21): the
982    /// `Result<(), PartitionOrderViolation>` wrapper was deleted — groups
983    /// are static identity only post-D248/D253, single-owner per Core per
984    /// D246, one Core per OS thread per D252; partition-order violations
985    /// cannot fire.
986    pub(crate) fn try_run_wave_for<F>(&self, seed: crate::handle::NodeId, op: F)
987    where
988        F: FnOnce(&Self),
989    {
990        let _guard = self.begin_batch_for(seed);
991        op(self);
992    }
993
994    /// Drain retains held by `wave_cache_snapshots` and return them so
995    /// the caller can release them lock-released. Called from the
996    /// wave-success path in [`BatchGuard::drop`].
997    ///
998    /// Q-beyond Sub-slice 1 (D108, 2026-05-09): the snapshots map moved
999    /// to per-thread `WaveState`; signature takes `&mut WaveState`. The
1000    /// drain-and-release-lock-released discipline (introduced as /qa A1
1001    /// fix 2026-05-09 against the prior cross_partition mutex) carries
1002    /// over: caller drains under WaveState borrow + state lock, releases
1003    /// after both are dropped — `release_handle` may re-enter Core via
1004    /// finalizers and re-entry under either guard would deadlock /
1005    /// double-borrow.
1006    #[must_use]
1007    pub(crate) fn drain_wave_cache_snapshots(ws: &mut WaveState) -> Vec<HandleId> {
1008        if ws.wave_cache_snapshots.is_empty() {
1009            return Vec::new();
1010        }
1011        std::mem::take(&mut ws.wave_cache_snapshots)
1012            .into_values()
1013            .collect()
1014    }
1015
1016    /// Restore cache slots from `wave_cache_snapshots` and clear the map.
1017    /// Called from the wave-abort path in `BatchGuard::drop` (panic).
1018    ///
1019    /// For each snapshotted node:
1020    ///
1021    /// 1. Read the current cache (the in-flight new value).
1022    /// 2. Set `cache = old_handle` (the snapshot's retained value).
1023    /// 3. Release the now-unowned current cache handle.
1024    ///
1025    /// Returns the list of "current" handles to release outside the lock.
1026    /// Q-beyond Sub-slice 1 (D108, 2026-05-09): the snapshots map moved
1027    /// to per-thread `WaveState`; signature takes both `s` (for cache
1028    /// slots) and `ws` (for the snapshots map).
1029    pub(crate) fn restore_wave_cache_snapshots(
1030        &self,
1031        s: &mut CoreState,
1032        ws: &mut WaveState,
1033    ) -> Vec<HandleId> {
1034        if ws.wave_cache_snapshots.is_empty() {
1035            return Vec::new();
1036        }
1037        let snapshots = std::mem::take(&mut ws.wave_cache_snapshots);
1038        let mut releases = Vec::with_capacity(snapshots.len());
1039        for (node_id, old_handle) in snapshots {
1040            let Some(rec) = s.nodes.get_mut(&node_id) else {
1041                releases.push(old_handle);
1042                continue;
1043            };
1044            let current = std::mem::replace(&mut rec.cache, old_handle);
1045            if current != NO_HANDLE {
1046                releases.push(current);
1047            }
1048        }
1049        releases
1050    }
1051
1052    /// D291: success-path drain for `wave_terminal_snapshots` +
1053    /// `wave_dep_terminal_snapshots`. Called from [`BatchGuard::drop`]'s
1054    /// success path (via the same wave-end drain that handles
1055    /// [`Self::drain_wave_cache_snapshots`]). On commit the wave's
1056    /// terminal mutations are kept, so this just clears the snapshot
1057    /// sets without touching `rec.terminal` / dep-record slots — those
1058    /// slots already own their ERROR-handle retains.
1059    pub(crate) fn drain_wave_terminal_snapshots(ws: &mut WaveState) {
1060        // D291 /qa A1 (2026-05-25): `AHashSet::clear()` on an empty set
1061        // is O(1) — the prior `is_empty()` guards were dead pre-checks.
1062        ws.wave_terminal_snapshots.clear();
1063        ws.wave_dep_terminal_snapshots.clear();
1064        // D297: same success-path clear for the teardown-flag snapshots.
1065        // On commit, the wave's `has_received_teardown = true` mutations
1066        // are kept (the R2.6.4 idempotency guard is the load-bearing
1067        // contract for committed teardowns); the snapshot set just
1068        // tracked "which nodes need rollback on panic," nothing on
1069        // commit.
1070        ws.wave_teardown_snapshots.clear();
1071    }
1072
1073    /// D291: panic-path restore for `wave_terminal_snapshots` +
1074    /// `wave_dep_terminal_snapshots`. Mirrors
1075    /// [`Self::restore_wave_cache_snapshots`]: drains each snapshot,
1076    /// resets the corresponding slot to `None`, and returns the
1077    /// ERROR-tier `HandleId`s the caller must release lock-released.
1078    ///
1079    /// For each snapshotted node / `(node, dep_idx)`:
1080    /// 1. Take the current `Option<TerminalKind>` value (expected
1081    ///    `Some(_)` because the snapshot was inserted at the
1082    ///    `None → Some(_)` transition).
1083    /// 2. Set the slot back to `None`.
1084    /// 3. If the taken value was `TerminalKind::Error(h)`, push `h`
1085    ///    into the releases vec so the caller can drop the slot's
1086    ///    retain after the state lock is released.
1087    ///
1088    /// A snapshotted entry whose `rec` no longer exists (orphaned by a
1089    /// torn-down node) is silently skipped — no slot to restore.
1090    pub(crate) fn restore_wave_terminal_snapshots(
1091        &self,
1092        s: &mut CoreState,
1093        ws: &mut WaveState,
1094    ) -> Vec<HandleId> {
1095        let mut releases: Vec<HandleId> = Vec::new();
1096        if !ws.wave_terminal_snapshots.is_empty() {
1097            let snapshots = std::mem::take(&mut ws.wave_terminal_snapshots);
1098            releases.reserve(snapshots.len());
1099            for node_id in snapshots {
1100                let Some(rec) = s.nodes.get_mut(&node_id) else {
1101                    // D291 /qa A10 (2026-05-25): fail-loud in debug
1102                    // builds — no in-tree code path tears down a node
1103                    // mid-wave after its terminal slot was snapshotted,
1104                    // so an orphaned snapshot indicates either a future
1105                    // in-wave node-drop seam (currently absent) or a
1106                    // missed refcount transfer in a future slice.
1107                    // Silent skip would leak the ERROR retain held by
1108                    // the (now-gone) slot.
1109                    debug_assert!(
1110                        false,
1111                        "D291 invariant: snapshotted node {node_id:?} was torn down \
1112                         mid-wave — restore can't release the ERROR retain held by \
1113                         the slot. /qa F6."
1114                    );
1115                    continue;
1116                };
1117                if let Some(TerminalKind::Error(h)) = rec.terminal.take() {
1118                    releases.push(h);
1119                }
1120                // R2.6.4 (Lock 6.F) auto-COMPLETE pre-pend semantics —
1121                // see [`Core::teardown_inner`]. The `has_received_teardown`
1122                // flag is rolled back by D297's sibling restore method
1123                // [`Core::restore_wave_teardown_snapshots`] — called from
1124                // the same `discard_wave_cleanup` site as this one.
1125            }
1126        }
1127        if !ws.wave_dep_terminal_snapshots.is_empty() {
1128            let snapshots = std::mem::take(&mut ws.wave_dep_terminal_snapshots);
1129            releases.reserve(snapshots.len());
1130            for (child_id, dep_node_id) in snapshots {
1131                // D291 /qa D2 (2026-05-25): re-resolve the dep index by
1132                // dep `NodeId` so a mid-batch `set_deps(child_id, …)`
1133                // that regenerated `dep_records` doesn't zero the wrong
1134                // slot. `dep_index_of` returns `None` if the dep has
1135                // been removed entirely (rewire dropped it before
1136                // rollback) — that's fine, no slot to restore;
1137                // `set_deps`'s own refcount discipline already released
1138                // the slot's retain when the dep was removed.
1139                let Some(rec) = s.nodes.get_mut(&child_id) else {
1140                    debug_assert!(
1141                        false,
1142                        "D291 invariant: snapshotted child {child_id:?} was torn down \
1143                         mid-wave (dep {dep_node_id:?}) — restore can't release the \
1144                         ERROR retain held by the slot. /qa F6."
1145                    );
1146                    continue;
1147                };
1148                let Some(idx) = rec.dep_index_of(dep_node_id) else {
1149                    continue;
1150                };
1151                if let Some(TerminalKind::Error(h)) = rec.dep_records[idx].terminal.take() {
1152                    releases.push(h);
1153                }
1154            }
1155        }
1156        releases
1157    }
1158
1159    /// D297: panic-path restore for `wave_teardown_snapshots`. Mirrors
1160    /// [`Self::restore_wave_terminal_snapshots`] for the R2.6.4 / Lock 6.F
1161    /// `has_received_teardown` idempotency flag. Drains the snapshot set,
1162    /// resets `rec.has_received_teardown = false` for each snapshotted
1163    /// node so a retry of `teardown(node)` post-rollback re-runs the
1164    /// auto-COMPLETE prepend + queue Teardown path (not silently no-op'd
1165    /// by [`Core::teardown_inner`]'s `Action::Visit` idempotency guard).
1166    ///
1167    /// The set holds no handles; this method takes nothing to release.
1168    /// Returns `()` (vs `restore_wave_terminal_snapshots`'s `Vec<HandleId>`)
1169    /// to make the contrast at call sites obvious — the existing wire
1170    /// `Teardown` message (no payload, tier 6) was already cleared by the
1171    /// `pending_notify` drop in the same `discard_wave_cleanup` block.
1172    ///
1173    /// A snapshotted entry whose `rec` no longer exists (orphaned by a
1174    /// torn-down node) is silently skipped (no flag to reset). Mirrors the
1175    /// D291 /qa F6 invariant — no in-tree code path tears down a node
1176    /// mid-wave after its teardown flag was snapshotted; a debug_assert
1177    /// guards against future seams.
1178    ///
1179    /// Source: D297 (2026-05-26) — closes D291 deferred-scope Item 1
1180    /// `has_received_teardown` flag rollback. Bilateral convergence on
1181    /// pure-ts's pre-existing `_preBatchSnapshot.teardownDone` capture
1182    /// (`packages/pure-ts/src/core/node.ts:3993` + `:4136`), landed via
1183    /// D282.
1184    pub(crate) fn restore_wave_teardown_snapshots(&self, s: &mut CoreState, ws: &mut WaveState) {
1185        if ws.wave_teardown_snapshots.is_empty() {
1186            return;
1187        }
1188        let snapshots = std::mem::take(&mut ws.wave_teardown_snapshots);
1189        for node_id in snapshots {
1190            let Some(rec) = s.nodes.get_mut(&node_id) else {
1191                debug_assert!(
1192                    false,
1193                    "D297 invariant: snapshotted node {node_id:?} was torn down \
1194                     mid-wave — `has_received_teardown` flag can't be reset. \
1195                     Mirrors D291 /qa F6."
1196                );
1197                continue;
1198            };
1199            rec.has_received_teardown = false;
1200        }
1201    }
1202
1203    /// Drain pending fires until quiescent, then flush wave-end notifications
1204    /// to subscribers. Each fire iteration drops the state lock around the
1205    /// binding's `invoke_fn` callback so user fns may re-enter Core safely.
1206    ///
1207    /// `&self`-only — manages its own locking. Called from [`Self::run_wave`]
1208    /// and [`super::node::Core::activate_derived`] (via `run_wave`).
1209    pub(crate) fn drain_and_flush(&self) {
1210        let mut guard = 0u32;
1211        loop {
1212            // A′ (D232-AMEND): apply any producer-sink / timer
1213            // `MailboxOp`s **in-wave** at the top of every drain
1214            // iteration. Each applied op re-enters `Core::{emit,
1215            // complete,error}` with `in_tick = true` (non-owning batch
1216            // guard → it does NOT start its own drain; it queues into
1217            // this wave's `pending_fires`), so a sink that posted during
1218            // the previous `fire_fn` is picked up on the very next
1219            // iteration — immediate, in-wave, cascade-ordering-preserving
1220            // (consistent with the pre-S2b deferred-producer-op
1221            // drained-within-the-wave behaviour). Quiescence requires
1222            // BOTH `pending_fires` AND the mailbox empty.
1223            //
1224            // §7-floor guard (D-reflect, 2026-05-17): gate on the cheap
1225            // `runnable` atomic (one `Acquire` load) BEFORE touching the
1226            // mailbox `parking_lot::Mutex`. A no-producer / no-timer wave
1227            // (the `identity_dedup` ≈508 ns floor path) never posts, so
1228            // `runnable` stays `false` and this costs one relaxed-ish
1229            // atomic load per drain iteration — NOT a mutex lock + deque
1230            // pop. **Sink-side** posts are same-thread, in-wave: the
1231            // `runnable` Release precedes this Acquire on the same
1232            // thread, so no in-wave op is missed. **Task-side** posts
1233            // (timer/temporal `tokio::spawn` tasks) are cross-thread:
1234            // their happens-before is the `ops` `parking_lot::Mutex`
1235            // (acquire/release), NOT this atomic — `runnable` is only an
1236            // advisory fast-path bit there, and a racing task post is
1237            // caught by the embedder pump's *unconditional*
1238            // `drain_mailbox()` (timer tasks drain at the pump, not this
1239            // in-wave gate). Do NOT remove the unconditional pump drain.
1240            // This is exactly the per-group wake bit S4 wires to the
1241            // host executor — doing it here now is coherent, not
1242            // throwaway.
1243            if self.mailbox.is_runnable() || self.deferred.is_runnable() {
1244                self.drain_mailbox();
1245            }
1246
1247            // R1.3.8.c (Slice F, A3): if no fires are pending but there are
1248            // queued pause-overflow ERRORs, synthesize them now. The
1249            // resulting ERROR cascade may add to pending_fires (children
1250            // settling their terminal state), so we loop back to drain.
1251            //
1252            // Q-beyond Sub-slice 1 + 2 (D108, 2026-05-09): pending_fires
1253            // and pending_pause_overflow both live on per-thread
1254            // WaveState. State lock no longer required for either read.
1255            let synth_pending = with_wave_state(|ws| {
1256                if ws.pending_fires.is_empty() && !ws.pending_pause_overflow.is_empty() {
1257                    std::mem::take(&mut ws.pending_pause_overflow)
1258                } else {
1259                    Vec::new()
1260                }
1261            });
1262            for entry in synth_pending {
1263                // Lock-released call to the binding hook. Default impl
1264                // returns None — the binding has opted out of R1.3.8.c
1265                // and we fall back to silent-drop + ResumeReport.dropped.
1266                let handle = self.binding.synthesize_pause_overflow_error(
1267                    entry.node_id,
1268                    entry.dropped_count,
1269                    entry.configured_max,
1270                    entry.lock_held_ns / 1_000_000,
1271                );
1272                if let Some(h) = handle {
1273                    // Re-enter Core::error to terminate the node and
1274                    // cascade. We're inside a wave (`in_tick = true`),
1275                    // so error() gets a non-owning batch guard — it
1276                    // doesn't try to start its own drain. The cascade
1277                    // queues into our outer drain via pending_fires
1278                    // and pending_notify.
1279                    self.error(entry.node_id, h);
1280                }
1281            }
1282
1283            // Pick next fire under a short lock. Also re-read the configured
1284            // drain cap so callers can tune via `Core::set_max_batch_drain_iterations`
1285            // without restarting waves mid-flight.
1286            //
1287            // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_fires lives
1288            // on per-thread WaveState; pick_next_fire takes both state and
1289            // WaveState. The pending_size diagnostic and emptiness check
1290            // also read WaveState. Borrow scopes are split: WaveState
1291            // borrow drops before fire_fn runs (which re-borrows WaveState
1292            // via fire_regular / fire_operator).
1293            let (next, cap, pending_size) = {
1294                let s = self.lock_state();
1295                let cap = s.shared.max_batch_drain_iterations;
1296                let (next, pending_size) = with_wave_state(|ws| {
1297                    if ws.pending_fires.is_empty() {
1298                        return (None, 0);
1299                    }
1300                    let size = ws.pending_fires.len();
1301                    let next = Self::pick_next_fire(&s, ws);
1302                    (next, size)
1303                });
1304                (next, cap, pending_size)
1305            };
1306            if pending_size == 0 {
1307                // QA F1 (2026-05-18): quiescence requires BOTH
1308                // `pending_fires` AND the mailbox empty — the asserted
1309                // invariant the old `if pending_size == 0 { break }`
1310                // did NOT enforce. If the mailbox still holds work,
1311                // `continue` so the next iteration's top-of-loop
1312                // `is_runnable()`-gated `drain_mailbox()` applies it
1313                // (an applied op either cascades into `pending_fires`
1314                // or, if terminal/no-op, leaves the mailbox empty so
1315                // the *next* check breaks). §7 floor: no producer/timer
1316                // ⇒ never posted ⇒ `is_runnable()` false ⇒ breaks on the
1317                // first empty `pending_fires` (one atomic load — already
1318                // in the floor budget).
1319                // QA P3 (2026-05-18): the mailbox-continue does NOT
1320                // count against the fire-cascade `cap`. `drain_mailbox`
1321                // already drains the FIFO to quiescence in ONE call
1322                // (re-posts during `apply` are popped by the same
1323                // `drain_into` loop), and the self-reposting-`Defer`
1324                // livelock is bounded INSIDE `drain_into` (its own
1325                // `max_ops`). Reaching here `is_runnable()` again means
1326                // genuinely-new cross-thread (timer) work — bounded by
1327                // external input, not a fire cycle — so counting it
1328                // against the fire `cap` would false-trip a production
1329                // panic on heavy producer/timer graphs.
1330                if self.mailbox.is_runnable() || self.deferred.is_runnable() {
1331                    continue;
1332                }
1333                break;
1334            }
1335            guard += 1;
1336            assert!(
1337                guard < cap,
1338                "wave drain exceeded {cap} iterations \
1339                 (pending_fires={pending_size}). Most likely cause: a runtime \
1340                 cycle introduced by an operator that re-arms its own pending_fires \
1341                 slot from inside `invoke_fn` (e.g. a producer that subscribes to \
1342                 itself, or a fn that calls Core::emit on a node whose fn fires \
1343                 the original node again). Structural cycles via set_deps are \
1344                 rejected at edge-mutation time. Tune via Core::set_max_batch_drain_iterations \
1345                 only with concrete evidence the workload needs more iterations."
1346            );
1347            let Some(next) = next else { break };
1348            // fire_fn manages its own locking around invoke_fn.
1349            self.fire_fn(next);
1350        }
1351        // Auto-resolve sweep: nodes registered in pending_auto_resolve
1352        // by the RESOLVED child propagation need a Resolved if they didn't
1353        // fire and settle via their own commit_emission. Check pending_notify
1354        // for each candidate — if it has Dirty but no tier-3+ message, the
1355        // node never settled and needs auto-Resolved. Route through
1356        // queue_notify so paused nodes get the Resolved into their pause
1357        // buffer.
1358        let mut s = self.lock_state();
1359        // Q-beyond Sub-slice 1 + 2 (D108, 2026-05-09): pending_auto_resolve
1360        // and pending_notify both live on per-thread WaveState. /qa A5
1361        // fix (2026-05-09): explicit scope for the WaveState borrow so
1362        // it drops BEFORE the for-loop. Inside the loop, `queue_notify`
1363        // re-borrows WaveState for `pending_pause_overflow.push` /
1364        // `pending_notify` writes — re-entrance on RefCell::borrow_mut
1365        // would panic. Explicit scope makes the lifetime load-bearing.
1366        let candidates = with_wave_state(|ws| std::mem::take(&mut ws.pending_auto_resolve));
1367        for node_id in candidates {
1368            let needs_resolve = with_wave_state(|ws| {
1369                ws.pending_notify
1370                    .get(&node_id)
1371                    .is_some_and(|entry| !entry.iter_messages().any(|m| m.tier() >= 3))
1372            });
1373            if needs_resolve {
1374                self.queue_notify(&mut s, node_id, Message::Resolved);
1375            }
1376        }
1377        // Final flush phase — populates deferred_flush_jobs
1378        // from pending_notify (already carries per-node sink snapshots).
1379        self.flush_notifications(&mut s);
1380    }
1381
1382    /// Pick the pending node with the lowest topological rank.
1383    ///
1384    /// Nodes with lower `topo_rank` have no transitive upstream in
1385    /// `pending_fires` (by construction — `topo_rank = 1 + max dep rank`).
1386    /// This is O(|pending_fires|) instead of the prior O(N·V) BFS.
1387    /// §10 perf optimization (D047, Slice U).
1388    ///
1389    /// Q-beyond Sub-slice 2 (D108, 2026-05-09): `pending_fires` lives on
1390    /// per-thread `WaveState`. Caller passes `&WaveState` alongside
1391    /// `&CoreState` so the borrow scopes stay disjoint and visible.
1392    fn pick_next_fire(s: &CoreState, ws: &WaveState) -> Option<NodeId> {
1393        ws.pending_fires
1394            .iter()
1395            .copied()
1396            .min_by_key(|&id| s.nodes.get(&id).map_or(0, |r| r.topo_rank))
1397    }
1398
1399    /// Wave drain entry point. Dispatches via `rec.op` to either the
1400    /// regular fn-fire path ([`Self::fire_regular`]) or the operator
1401    /// dispatch ([`Self::fire_operator`]).
1402    pub(crate) fn fire_fn(&self, node_id: NodeId) {
1403        let op = {
1404            let s = self.lock_state();
1405            s.nodes.get(&node_id).and_then(|r| r.op)
1406        };
1407        match op {
1408            Some(operator_op) => self.fire_operator(node_id, operator_op),
1409            None => {
1410                // State / Derived / Dynamic / Producer all dispatch via fn_id.
1411                self.fire_regular(node_id);
1412            }
1413        }
1414    }
1415
1416    /// Fire a node's fn lock-released around `invoke_fn`.
1417    ///
1418    /// Phase 1 (lock-held): remove from pending_fires, snapshot fn_id +
1419    /// dep_records → DepBatch + kind. Skip if terminal, first-run-gate-closed,
1420    /// or stateless.
1421    ///
1422    /// Phase 2 (lock-released): call `BindingBoundary::invoke_fn`. User fn
1423    /// callbacks may re-enter Core (`emit`, `pause`, etc.) and run a nested
1424    /// wave — the in_tick gate composes naturally because nested calls
1425    /// observe `in_tick = true` and skip their own drain.
1426    ///
1427    /// Phase 3 (lock-held): mark `has_fired_once`, store dynamic-tracked,
1428    /// decide between Noop+RESOLVED, single Data, or Batch.
1429    ///
1430    /// Phase 4: commit emissions. Single Data goes through
1431    /// `commit_emission` (with equals substitution). Batch emissions are
1432    /// processed in sequence — Data via `commit_emission_verbatim` (no
1433    /// equals substitution per R1.3.2.d / R1.3.3.c), Complete/Error via
1434    /// terminal cascade.
1435    #[allow(clippy::too_many_lines)] // Slice G added Noop / Batch tier-3 guards
1436    fn fire_regular(&self, node_id: NodeId) {
1437        enum FireAction {
1438            None,
1439            SingleData(HandleId),
1440            Batch(SmallVec<[FnEmission; 2]>),
1441        }
1442
1443        // Phase 1: snapshot inputs — build DepBatch per dep from dep_records.
1444        // `has_fired_once` is captured here for the Slice E2 OnRerun gate
1445        // (Phase 1.5 below): the cleanup hook only fires when the fn has
1446        // run at least once already in this activation cycle.
1447        let prep: Option<(crate::handle::FnId, Vec<DepBatch>, bool, bool, bool)> = {
1448            let s = self.lock_state();
1449            // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_fires lives
1450            // on per-thread WaveState. Removed via with_wave_state — no
1451            // re-entry concern because only the immediate remove happens
1452            // under the borrow.
1453            with_wave_state(|ws| {
1454                ws.pending_fires.remove(&node_id);
1455            });
1456            let rec = s.require_node(node_id);
1457            // Skip: terminal, first-run-gate-closed (R2.5.3 / R5.4 — partial
1458            // mode opts out of the gate per D011), or stateless.
1459            //
1460            // D263: when `terminal_as_real_input == true`, a terminal dep
1461            // counts as "real input" so the gate opens on COMPLETE-without-
1462            // DATA from any dep (mirrors `fire_operator`'s unconditional
1463            // terminal-aware clause; gated here per-node so the historical
1464            // sentinel-hold behaviour stays the default for `fire_fn`).
1465            let has_real_input = !rec.has_sentinel_deps()
1466                || (rec.terminal_as_real_input
1467                    && rec.dep_records.iter().any(|dr| dr.terminal.is_some()));
1468            if rec.terminal.is_some() || (!rec.partial && !has_real_input) {
1469                None
1470            } else {
1471                rec.fn_id.map(|fn_id| {
1472                    let use_mask = rec.dep_records.len() <= 64;
1473                    let mask = rec.involved_mask;
1474                    let dep_batches: Vec<DepBatch> = rec
1475                        .dep_records
1476                        .iter()
1477                        .enumerate()
1478                        .map(|(i, dr)| DepBatch {
1479                            data: dr.data_batch.clone(),
1480                            prev_data: dr.prev_data,
1481                            // §10.3 perf (Slice V1): derive from bitmask
1482                            // for ≤64 deps; fall back to per-dep field.
1483                            involved: if use_mask {
1484                                (mask >> i) & 1 != 0
1485                            } else {
1486                                dr.involved_this_wave
1487                            },
1488                        })
1489                        .collect();
1490                    (
1491                        fn_id,
1492                        dep_batches,
1493                        rec.is_dynamic,
1494                        rec.has_fired_once,
1495                        rec.is_producer(),
1496                    )
1497                })
1498            }
1499        };
1500        let Some((fn_id, dep_batches, is_dynamic, has_fired_once, is_producer)) = prep else {
1501            return;
1502        };
1503
1504        // Phase 1.5 (Slice E2 — R2.4.5 OnRerun, lock-released per D045): if
1505        // the fn has fired at least once in this activation cycle, fire its
1506        // OnRerun cleanup hook BEFORE the next invoke_fn re-allocates fn-
1507        // local resources. First-fire is intentionally skipped — there is
1508        // no prior run to clean up. Fires OUTSIDE `FiringGuard` because
1509        // cleanup re-entrance is not the A6 reentrancy concern (which
1510        // protects against `set_deps(self, ...)` from inside the in-flight
1511        // invoke_fn). Operator nodes never reach this path (`fire_regular`
1512        // is the fn-id branch of `fire_fn`; operators dispatch via
1513        // `fire_operator`), so cleanup hooks correctly only fire for fn-
1514        // shaped nodes (state / derived / dynamic / producer).
1515        if has_fired_once {
1516            self.binding
1517                .cleanup_for(node_id, crate::boundary::CleanupTrigger::OnRerun);
1518        }
1519
1520        // Phase 2: invoke fn lock-released. A6 reentrancy guard is scoped to
1521        // the FFI call only — Phase 3's lock-held state mutation is not part
1522        // of "currently firing" because set_deps would already block on the
1523        // state lock by then. Drop on the guard pops the stack even if
1524        // invoke_fn panics, keeping `currently_firing` balanced.
1525        //
1526        // D246 rule 5 / D245 (QA D1) — the owner-side full-`Core` facade
1527        // hand-off is needed ONLY by producer-building bindings (to
1528        // construct `ProducerCtx` from a real Core surface here without a
1529        // thread-local / `Core` clone / stored back-ref — all β-invalid
1530        // under the actor model). Branch on node kind so the hot
1531        // derived/dynamic/state path keeps the single parameterless
1532        // `invoke_fn` virtual call it always had (no `&dyn CoreFull`
1533        // fat-pointer coercion, no default-body re-dispatch) — byte-
1534        // identical to pre-D246, zero §7-floor regression. Only the rare
1535        // producer-build fire pays the facade hand-off. `self: &Core`
1536        // unsized-coerces to `&dyn CoreFull` (`Core: CoreFull`).
1537        let result = {
1538            let _firing = FiringGuard::new(self, node_id);
1539            if is_producer {
1540                self.binding
1541                    .invoke_fn_with_core(node_id, fn_id, &dep_batches, self)
1542            } else {
1543                self.binding.invoke_fn(node_id, fn_id, &dep_batches)
1544            }
1545        };
1546
1547        // Phase 3: apply result under the lock — defensive terminal check
1548        // (a sibling cascade may have terminated this node during phase 2).
1549        let action: FireAction = {
1550            let mut s = self.lock_state();
1551            // Defensive: node may have terminated mid-phase-2 via a sibling
1552            // cascade (a fn that re-entered `Core::error` on a path that
1553            // cascaded here). If so, release any payload handles and no-op.
1554            if s.require_node(node_id).terminal.is_some() {
1555                match &result {
1556                    FnResult::Data { handle, .. } => {
1557                        self.binding.release_handle(*handle);
1558                    }
1559                    FnResult::Batch { emissions, .. } => {
1560                        for em in emissions {
1561                            match em {
1562                                FnEmission::Data(h) | FnEmission::Error(h) => {
1563                                    self.binding.release_handle(*h);
1564                                }
1565                                FnEmission::Complete => {}
1566                            }
1567                        }
1568                    }
1569                    FnResult::Noop { .. } => {}
1570                }
1571                return;
1572            }
1573            let rec = s.require_node_mut(node_id);
1574            rec.has_fired_once = true;
1575            if is_dynamic {
1576                let tracked = match &result {
1577                    FnResult::Data { tracked, .. }
1578                    | FnResult::Noop { tracked }
1579                    | FnResult::Batch { tracked, .. } => tracked.clone(),
1580                };
1581                if let Some(t) = tracked {
1582                    rec.tracked = t.into_iter().collect();
1583                }
1584            }
1585            match result {
1586                FnResult::Noop { .. } => {
1587                    // Slice G: skip Resolved if a prior emission in the same
1588                    // wave already queued tier-3 (would violate R1.3.3.a).
1589                    // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_notify
1590                    // lives on per-thread WaveState. Borrow scoped to the
1591                    // tier3 read so queue_notify (which re-borrows
1592                    // WaveState) doesn't double-borrow.
1593                    let already_dirty = s.require_node(node_id).dirty;
1594                    let already_tier3 = with_wave_state(|ws| {
1595                        ws.pending_notify
1596                            .get(&node_id)
1597                            .is_some_and(|entry| entry.iter_messages().any(|m| m.tier() == 3))
1598                    });
1599                    if already_dirty && !already_tier3 {
1600                        self.queue_notify(&mut s, node_id, Message::Resolved);
1601                    }
1602                    FireAction::None
1603                }
1604                FnResult::Data { handle, .. } => FireAction::SingleData(handle),
1605                FnResult::Batch { emissions, .. } if emissions.is_empty() => {
1606                    // Empty Batch is equivalent to Noop — settle with
1607                    // RESOLVED if the node was dirty (R1.3.1.a). Slice G:
1608                    // skip if a prior emission already queued tier-3.
1609                    // Q-beyond Sub-slice 2 (D108, 2026-05-09): see Noop
1610                    // arm above for the WaveState borrow scope rationale.
1611                    let already_dirty = s.require_node(node_id).dirty;
1612                    let already_tier3 = with_wave_state(|ws| {
1613                        ws.pending_notify
1614                            .get(&node_id)
1615                            .is_some_and(|entry| entry.iter_messages().any(|m| m.tier() == 3))
1616                    });
1617                    if already_dirty && !already_tier3 {
1618                        self.queue_notify(&mut s, node_id, Message::Resolved);
1619                    }
1620                    FireAction::None
1621                }
1622                FnResult::Batch { emissions, .. } => FireAction::Batch(emissions),
1623            }
1624        };
1625
1626        // Phase 4: commit emissions.
1627        match action {
1628            FireAction::None => {}
1629            // Single Data — equals substitution applies (R1.3.2).
1630            FireAction::SingleData(handle) => {
1631                self.commit_emission(node_id, handle);
1632            }
1633            // Batch — process in sequence. No equals substitution
1634            // (R1.3.2.d / R1.3.3.c: multi-message waves pass verbatim).
1635            FireAction::Batch(emissions) => {
1636                self.commit_batch(node_id, emissions);
1637            }
1638        }
1639    }
1640
1641    /// Process a `FnResult::Batch` emissions sequence. Each `Data` goes
1642    /// through `commit_emission_verbatim` (no equals substitution per
1643    /// R1.3.2.d / R1.3.3.c). Terminal emissions (`Complete` / `Error`)
1644    /// cascade per R1.3.4; processing stops at the first terminal and
1645    /// remaining handles are released (R1.3.4.a: no further messages
1646    /// after terminal).
1647    fn commit_batch(&self, node_id: NodeId, emissions: SmallVec<[FnEmission; 2]>) {
1648        let mut iter = emissions.into_iter();
1649        for em in iter.by_ref() {
1650            match em {
1651                FnEmission::Data(handle) => {
1652                    self.commit_emission_verbatim(node_id, handle);
1653                }
1654                FnEmission::Complete => {
1655                    self.complete(node_id);
1656                    break;
1657                }
1658                FnEmission::Error(handle) => {
1659                    self.error(node_id, handle);
1660                    break;
1661                }
1662            }
1663        }
1664        // Release handles from any emissions after the terminal break.
1665        for em in iter {
1666            match em {
1667                FnEmission::Data(h) | FnEmission::Error(h) => {
1668                    self.binding.release_handle(h);
1669                }
1670                FnEmission::Complete => {}
1671            }
1672        }
1673    }
1674
1675    // -------------------------------------------------------------------
1676    // Emission commit — equals-substitution lives here
1677    // -------------------------------------------------------------------
1678
1679    /// Apply a node's emission. `&self`-only; brackets the equals check
1680    /// around a lock release so `BindingBoundary::custom_equals` can re-enter
1681    /// Core safely.
1682    ///
1683    /// Phase 1 (lock-held): defensive terminal short-circuit; snapshot
1684    /// equals_mode + old cache handle.
1685    ///
1686    /// Phase 2 (lock-released): call `handles_equal` — `EqualsMode::Identity`
1687    /// is a pure `u64` compare with no boundary call; `EqualsMode::Custom`
1688    /// crosses to the binding's `custom_equals` oracle, which may re-enter
1689    /// Core.
1690    ///
1691    /// Phase 3 (lock-held): set cache, queue Dirty + Data/Resolved into
1692    /// pending_notify (which snapshots subscribers on first touch),
1693    /// propagate to children.
1694    // Q2 / Q3 (2026-05-09) tipped past clippy's 100-line threshold; the
1695    // function is already a multi-phase wave-engine routine and breaking
1696    // out the four phases would obscure the lock-discipline.
1697    #[allow(clippy::too_many_lines)]
1698    pub(crate) fn commit_emission(&self, node_id: NodeId, new_handle: HandleId) {
1699        assert!(
1700            new_handle != NO_HANDLE,
1701            "NO_HANDLE is not a valid DATA payload (R1.2.4) for node {node_id:?}",
1702        );
1703
1704        // Phase 1: terminal short-circuit + snapshot equals/cache.
1705        let snapshot = {
1706            let s = self.lock_state();
1707            let rec = s.require_node(node_id);
1708            // (§7-D: the throwaway `bench_state_collapse` relocated
1709            // is_state/producer assert was removed — the normal-path
1710            // validation in `Core::emit` is retained; the
1711            // commit_emission single-pass collapse is deferred, §7-A.)
1712            if rec.terminal.is_some() {
1713                drop(s);
1714                self.binding.release_handle(new_handle);
1715                return;
1716            }
1717            (rec.cache, rec.equals)
1718        };
1719        let (old_handle, equals_mode) = snapshot;
1720
1721        // Slice G (2026-05-07): R1.3.2.d says equals substitution only
1722        // fires for SINGLE-DATA waves at one node. Detect "this is a
1723        // subsequent emit in the same wave at this node" via the
1724        // per-thread `TIER3_EMITTED_THIS_WAVE` thread-local
1725        // (D1 patch, 2026-05-09 — moved off per-partition state to be
1726        // robust against mid-wave cross-thread `set_deps` partition
1727        // splits). If set → multi-emit wave: skip equals, queue Data
1728        // verbatim, retroactively rewrite any prior Resolved (queued by
1729        // an earlier same-value emit's equals match) to Data using the
1730        // wave-start cache snapshot. Outside batch / first emit:
1731        // standard per-emit equals path. Thread-local lookup is
1732        // ~5ns and lock-free.
1733        let is_subsequent_emit_in_wave = tier3_check(node_id);
1734
1735        if is_subsequent_emit_in_wave {
1736            // Multi-emit wave detected. Skip equals, queue Data verbatim.
1737            // Also rewrite any prior Resolved entries to Data using the
1738            // wave-start cache snapshot.
1739            self.rewrite_prior_resolved_to_data(node_id);
1740            self.commit_emission_verbatim(node_id, new_handle);
1741            return;
1742        }
1743
1744        // Phase 2: equals check (lock-released for Custom).
1745        let is_data = !self.handles_equal_lock_released(equals_mode, old_handle, new_handle);
1746
1747        // Phase 3: apply emission under the lock. Defensive terminal
1748        // re-check — a concurrent cascade between phase 2 and phase 3
1749        // could have terminated the node.
1750        let mut s = self.lock_state();
1751        if s.require_node(node_id).terminal.is_some() {
1752            drop(s);
1753            self.binding.release_handle(new_handle);
1754            return;
1755        }
1756
1757        // R1.3.1.a condition (b): synthesize DIRTY only if node not already
1758        // dirty from an earlier emission in the same wave.
1759        let already_dirty = s.require_node(node_id).dirty;
1760        s.require_node_mut(node_id).dirty = true;
1761        if !already_dirty {
1762            self.queue_notify(&mut s, node_id, Message::Dirty);
1763        }
1764
1765        if is_data {
1766            // P3 (Slice A close /qa): re-read CURRENT cache. Same-thread
1767            // re-entry from a `custom_equals` oracle that called back into
1768            // `Core::emit` on this same node during phase 2's lock-released
1769            // equals check could have advanced the cache between phase 1's
1770            // snapshot (`old_handle`) and this point.
1771            let current_cache = s.require_node(node_id).cache;
1772            // Q-beyond Sub-slice 1 (D108, 2026-05-09): wave_cache_snapshots
1773            // lives on per-thread WaveState. `in_tick` is the one-Core-
1774            // per-OS-thread [`IN_TICK_OWNED`] slot (D252); this read is on
1775            // the wave-owner thread, so it observes this thread's own
1776            // ownership.
1777            let in_tick = self.in_tick();
1778            let snapshot_taken = if in_tick && current_cache != NO_HANDLE {
1779                use std::collections::hash_map::Entry;
1780                with_wave_state(|ws| match ws.wave_cache_snapshots.entry(node_id) {
1781                    Entry::Vacant(slot) => {
1782                        slot.insert(current_cache);
1783                        true
1784                    }
1785                    Entry::Occupied(_) => false,
1786                })
1787            } else {
1788                false
1789            };
1790            s.require_node_mut(node_id).cache = new_handle;
1791            if current_cache != NO_HANDLE && !snapshot_taken {
1792                self.binding.release_handle(current_cache);
1793            }
1794            // Slice E1 (R2.6.5 / Lock 6.G): push DATA into the replay
1795            // buffer if the node opted in. RESOLVED entries are NOT
1796            // buffered (canonical "DATA only").
1797            self.push_replay_buffer(&mut s, node_id, new_handle);
1798            // Slice G (D1 patch, 2026-05-09): mark this node as having
1799            // emitted tier-3 in this wave on the per-thread tracker.
1800            tier3_mark(node_id);
1801            self.queue_notify(&mut s, node_id, Message::Data(new_handle));
1802            // Propagate to children
1803            let child_ids: Vec<NodeId> = s
1804                .children
1805                .get(&node_id)
1806                .map(|c| c.iter().copied().collect())
1807                .unwrap_or_default();
1808            for child_id in child_ids {
1809                let dep_idx = s.require_node(child_id).dep_index_of(node_id);
1810                if let Some(idx) = dep_idx {
1811                    self.deliver_data_to_consumer(&mut s, child_id, idx, new_handle);
1812                }
1813            }
1814        } else {
1815            // RESOLVED: handle unchanged. Don't release; old still in use.
1816            // Slice G: snapshot cache so a subsequent same-wave emit can
1817            // rewrite this Resolved to Data using the snapshot.
1818            // Q-beyond Sub-slice 1 (D108, 2026-05-09): wave_cache_snapshots
1819            // lives on per-thread WaveState. /qa F1 reverted (2026-05-10);
1820            // D252 (S5) collapsed to one-Core-per-OS-thread `Cell<u64>` —
1821            // `in_tick` is read on the wave-owner
1822            // thread (observes this thread's own ownership).
1823            let current_cache = s.require_node(node_id).cache;
1824            if self.in_tick() && current_cache != NO_HANDLE {
1825                use std::collections::hash_map::Entry;
1826                with_wave_state(|ws| {
1827                    if let Entry::Vacant(slot) = ws.wave_cache_snapshots.entry(node_id) {
1828                        self.binding.retain_handle(current_cache);
1829                        slot.insert(current_cache);
1830                    }
1831                });
1832            }
1833            // Slice G (D1 patch, 2026-05-09): mark this node as having
1834            // emitted tier-3 in this wave on the per-thread tracker.
1835            tier3_mark(node_id);
1836            self.queue_notify(&mut s, node_id, Message::Resolved);
1837            let child_ids: Vec<NodeId> = s
1838                .children
1839                .get(&node_id)
1840                .map(|c| c.iter().copied().collect())
1841                .unwrap_or_default();
1842            // /qa A7 fix (2026-05-09): collect auto-resolve inserts
1843            // during the loop and bulk-insert into pending_auto_resolve
1844            // under a SINGLE cross_partition acquire after the loop.
1845            // Pre-fix the loop acquired `cross_partition` once per
1846            // child via `self.lock_cross_partition().pending_auto_resolve.insert(...)`,
1847            // which is N mutex hops for an N-child cascade. Cannot
1848            // hoist to acquire-cps-before-loop because `queue_notify`
1849            // (called inside the loop) also acquires cross_partition
1850            // for `pending_pause_overflow.push` in the rare overflow
1851            // case — re-entrance on the non-reentrant Mutex would
1852            // self-deadlock.
1853            let mut auto_resolve_inserts: SmallVec<[NodeId; 4]> = SmallVec::new();
1854            for child_id in child_ids {
1855                let already_involved = s.require_node(child_id).involved_this_wave;
1856                if !already_involved {
1857                    {
1858                        let child = s.require_node_mut(child_id);
1859                        child.involved_this_wave = true;
1860                        child.dirty = true;
1861                    }
1862                    self.queue_notify(&mut s, child_id, Message::Dirty);
1863                    // Q2 (2026-05-09): pending_auto_resolve lives on
1864                    // CrossPartitionState. Deferred to after-loop
1865                    // bulk insert per the /qa A7 fix above.
1866                    auto_resolve_inserts.push(child_id);
1867                }
1868            }
1869            // /qa A7 (2026-05-09) — preserved post-Sub-slice-1 (D108):
1870            // single WaveState borrow for the bulk-insert. queue_notify
1871            // above no longer holds the WaveState borrow by the time we
1872            // reach here, so this borrow is uncontested.
1873            if !auto_resolve_inserts.is_empty() {
1874                with_wave_state(|ws| ws.pending_auto_resolve.extend(auto_resolve_inserts));
1875            }
1876        }
1877    }
1878
1879    /// Slice G: when a multi-emit wave is detected at `node_id` (a second
1880    /// emit arrives while a prior tier-3 message is still pending), rewrite
1881    /// any `Resolved` entries from earlier emits to `Data(snapshot_cache)`
1882    /// so the wave conforms to R1.3.3.a (≥1 DATA OR exactly 1 RESOLVED).
1883    /// Touches both `pending_notify` (immediate-flush path) and the per-node
1884    /// pause buffer (paused path).
1885    fn rewrite_prior_resolved_to_data(&self, node_id: NodeId) {
1886        let mut s = self.lock_state();
1887        // Q-beyond Sub-slice 1 + 2 (D108, 2026-05-09): wave_cache_snapshots
1888        // and pending_notify both live on per-thread WaveState. Single
1889        // WaveState borrow handles both the snapshot lookup and the
1890        // pending_notify rewrite; the pause-buffer path uses the state
1891        // lock and is independent of WaveState.
1892        let snapshot = match with_wave_state(|ws| ws.wave_cache_snapshots.get(&node_id).copied()) {
1893            Some(h) if h != NO_HANDLE => h,
1894            // No snapshot available — the prior Resolved was queued without
1895            // a cache (sentinel pre-emit). Nothing to rewrite to; the
1896            // multi-emit case from sentinel is fine (verbatim Data path).
1897            _ => return,
1898        };
1899        let mut retains_needed = 0u32;
1900        // Pending_notify path. Walk all batches' messages — Slice-G
1901        // coalescing reasons about wave-content per node, not per-batch.
1902        with_wave_state(|ws| {
1903            if let Some(entry) = ws.pending_notify.get_mut(&node_id) {
1904                for msg in entry.iter_messages_mut() {
1905                    if matches!(msg, Message::Resolved) {
1906                        *msg = Message::Data(snapshot);
1907                        retains_needed += 1;
1908                    }
1909                }
1910            }
1911        });
1912        // Pause-buffer path.
1913        if let Some(rec) = s.nodes.get_mut(&node_id) {
1914            if let crate::node::PauseState::Paused { buffer, .. } = &mut rec.pause_state {
1915                for msg in &mut *buffer {
1916                    if matches!(msg, Message::Resolved) {
1917                        *msg = Message::Data(snapshot);
1918                        retains_needed += 1;
1919                    }
1920                }
1921            }
1922        }
1923        drop(s);
1924        // Each rewritten Resolved → Data adds a payload retain that
1925        // queue_notify would otherwise have taken at emit time. The
1926        // snapshot already owns one retain (taken when cache was
1927        // snapshotted); we need one fresh retain per rewrite.
1928        for _ in 0..retains_needed {
1929            self.binding.retain_handle(snapshot);
1930        }
1931    }
1932
1933    /// Equals check that crosses the binding boundary lock-released for
1934    /// `EqualsMode::Custom`. Caller must NOT hold the state lock.
1935    fn handles_equal_lock_released(&self, mode: EqualsMode, a: HandleId, b: HandleId) -> bool {
1936        if a == b {
1937            return true; // identity-on-handles always sufficient
1938        }
1939        if a == NO_HANDLE || b == NO_HANDLE {
1940            return false;
1941        }
1942        match mode {
1943            EqualsMode::Identity => false,
1944            EqualsMode::Custom(handle) => self.binding.custom_equals(handle, a, b),
1945        }
1946    }
1947
1948    /// Commit a DATA emission **without** equals substitution — used by
1949    /// `FnResult::Batch` processing where multi-message waves pass through
1950    /// verbatim per R1.3.2.d / R1.3.3.c. DIRTY auto-prefix respects
1951    /// R1.3.1.a condition (b): only queued if node not already dirty.
1952    ///
1953    /// Structurally identical to the DATA branch of [`Self::commit_emission`]
1954    /// but skips the Phase 2 equals check entirely.
1955    fn commit_emission_verbatim(&self, node_id: NodeId, new_handle: HandleId) {
1956        assert!(
1957            new_handle != NO_HANDLE,
1958            "NO_HANDLE is not a valid DATA payload (R1.2.4) for node {node_id:?}",
1959        );
1960
1961        let mut s = self.lock_state();
1962        let rec = s.require_node(node_id);
1963        if rec.terminal.is_some() {
1964            drop(s);
1965            self.binding.release_handle(new_handle);
1966            return;
1967        }
1968
1969        // R1.3.1.a condition (b): DIRTY only if not already dirty.
1970        let already_dirty = s.require_node(node_id).dirty;
1971        s.require_node_mut(node_id).dirty = true;
1972        if !already_dirty {
1973            self.queue_notify(&mut s, node_id, Message::Dirty);
1974        }
1975
1976        // Always DATA — no equals substitution for Batch emissions.
1977        // Q-beyond Sub-slice 1 (D108, 2026-05-09): wave_cache_snapshots
1978        // lives on per-thread WaveState. /qa F1 reverted (2026-05-10);
1979        // D252 (S5) collapsed to one-Core-per-OS-thread `Cell<u64>` —
1980        // `in_tick` is read on the wave-owner thread.
1981        let current_cache = s.require_node(node_id).cache;
1982        let snapshot_taken = if self.in_tick() && current_cache != NO_HANDLE {
1983            use std::collections::hash_map::Entry;
1984            with_wave_state(|ws| match ws.wave_cache_snapshots.entry(node_id) {
1985                Entry::Vacant(slot) => {
1986                    slot.insert(current_cache);
1987                    true
1988                }
1989                Entry::Occupied(_) => false,
1990            })
1991        } else {
1992            false
1993        };
1994        s.require_node_mut(node_id).cache = new_handle;
1995        if current_cache != NO_HANDLE && !snapshot_taken {
1996            self.binding.release_handle(current_cache);
1997        }
1998        // Slice E1: replay buffer push (R2.6.5 / Lock 6.G).
1999        self.push_replay_buffer(&mut s, node_id, new_handle);
2000        // Slice G QA fix (A2, 2026-05-07) / D1 patch (2026-05-09): mark
2001        // tier3_emitted_this_wave on the per-thread tracker even on the
2002        // verbatim path. A subsequent commit_emission at the same node
2003        // in the same wave needs this flag to detect multi-emit and
2004        // skip equals substitution; without it, a Batch-then-standard
2005        // sequence would queue Resolved into a wave that already has
2006        // Data — violating R1.3.3.a. The Batch path itself still
2007        // passes verbatim per R1.3.3.c (we don't re-run equals here);
2008        // we just record that "this node has emitted tier-3 in this
2009        // wave."
2010        tier3_mark(node_id);
2011        self.queue_notify(&mut s, node_id, Message::Data(new_handle));
2012        // Propagate to children
2013        let child_ids: Vec<NodeId> = s
2014            .children
2015            .get(&node_id)
2016            .map(|c| c.iter().copied().collect())
2017            .unwrap_or_default();
2018        for child_id in child_ids {
2019            let dep_idx = s.require_node(child_id).dep_index_of(node_id);
2020            if let Some(idx) = dep_idx {
2021                self.deliver_data_to_consumer(&mut s, child_id, idx, new_handle);
2022            }
2023        }
2024    }
2025
2026    /// Slice E1 (R2.6.5 / Lock 6.G): push a DATA handle into the node's
2027    /// replay buffer if opted in. Evicts oldest if cap exceeded; takes a
2028    /// fresh retain on push. RESOLVED is NOT buffered per canonical
2029    /// "DATA only" — call sites only invoke this for Data emissions.
2030    ///
2031    /// Evicted handle is queued into `cps.deferred_handle_releases`
2032    /// (released lock-released at flush time) per the binding-boundary
2033    /// lock-release discipline — `release_handle` may re-enter Core via
2034    /// finalizers and must not run while the state lock is held
2035    /// (QA A3, 2026-05-07). Q2 (2026-05-09): the queue moved to
2036    /// CrossPartitionState; this fn acquires `cross_partition` only
2037    /// when an eviction actually happens (the common case is no
2038    /// eviction → no second-mutex acquire).
2039    fn push_replay_buffer(&self, s: &mut CoreState, node_id: NodeId, new_handle: HandleId) {
2040        let rec = s.require_node_mut(node_id);
2041        let cap = match rec.replay_buffer_cap {
2042            Some(c) if c > 0 => c,
2043            _ => return,
2044        };
2045        self.binding.retain_handle(new_handle);
2046        rec.replay_buffer.push_back(new_handle);
2047        let evicted = if rec.replay_buffer.len() > cap {
2048            rec.replay_buffer.pop_front()
2049        } else {
2050            None
2051        };
2052        if let Some(h) = evicted {
2053            with_wave_state(|ws| ws.deferred_handle_releases.push(h));
2054        }
2055    }
2056
2057    // ===================================================================
2058    // Operator dispatch (Slice C-1, D009).
2059    //
2060    // `fire_operator` is the entry point for nodes whose `kind` is
2061    // `NodeKind::Operator(_)`. It branches on the `OperatorOp` discriminant
2062    // to per-operator helpers that snapshot inputs under the lock, drop the
2063    // lock to call the binding's bulk projection FFI, and reacquire to
2064    // apply emissions via `commit_emission_verbatim` (no per-item equals
2065    // dedup at the wire — operator output passes verbatim per the same
2066    // R1.3.2.d / R1.3.3.c rule that governs `FnResult::Batch`).
2067    //
2068    // **Refcount discipline:** inputs sourced from `dep_records[i].data_batch`
2069    // share retains owned by the wave's data-batch slot (released at
2070    // wave-end rotation in `clear_wave_state`). Operators that emit those
2071    // handles unchanged (`Filter`, `DistinctUntilChanged`, `Pairwise`'s
2072    // `prev` carry-over) take an additional retain via `retain_handle`
2073    // before passing to `commit_emission_verbatim` — the cache slot owns
2074    // its own share, independent of the data-batch slot's. Operators that
2075    // produce fresh handles (`Map` / `Scan` / `Reduce` / `Pairwise`'s
2076    // packed tuples) receive retains pre-bumped by the binding's bulk-
2077    // projection method.
2078    // ===================================================================
2079
2080    /// Operator dispatch entry. Pre-checks (terminal short-circuit, first-
2081    /// run gate accounting for `partial`, terminal-aware fire for `Reduce`)
2082    /// happen here; per-operator behavior lives in the `fire_op_*` helpers.
2083    fn fire_operator(&self, node_id: NodeId, op: OperatorOp) {
2084        // Phase 1 (lock-held): remove from pending_fires, evaluate skip.
2085        // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_fires lives on
2086        // per-thread WaveState; state lock + WaveState borrow are
2087        // independent.
2088        let proceed = {
2089            let s = self.lock_state();
2090            with_wave_state(|ws| {
2091                ws.pending_fires.remove(&node_id);
2092            });
2093            let rec = s.require_node(node_id);
2094            if rec.terminal.is_some() {
2095                false
2096            } else {
2097                // First-run gate (R2.5.3 / R5.4). Partial-mode operators
2098                // (D011) opt out of the gate; otherwise we wait for every
2099                // dep to have delivered at least one real handle. Terminal-
2100                // aware operators (currently `Reduce`) additionally count a
2101                // dep terminal as "real input" so they can fire on
2102                // upstream COMPLETE-without-DATA and emit the seed.
2103                let has_real_input = !rec.has_sentinel_deps()
2104                    || rec.dep_records.iter().any(|dr| dr.terminal.is_some());
2105                rec.partial || has_real_input
2106            }
2107        };
2108        if !proceed {
2109            return;
2110        }
2111
2112        // A6 (Slice F, 2026-05-07): track operator fire on the
2113        // `currently_firing` stack so a binding-side project/predicate/fold
2114        // FFI callback that re-enters `Core::set_deps(node_id, ...)` is
2115        // rejected with `SetDepsError::ReentrantOnFiringNode`. Drop pops
2116        // the stack on panic too.
2117        let _firing = FiringGuard::new(self, node_id);
2118
2119        match op {
2120            OperatorOp::Map { fn_id } => self.fire_op_map(node_id, fn_id),
2121            OperatorOp::Filter { fn_id } => self.fire_op_filter(node_id, fn_id),
2122            OperatorOp::Scan { fn_id, .. } => self.fire_op_scan(node_id, fn_id),
2123            OperatorOp::Reduce { fn_id, .. } => self.fire_op_reduce(node_id, fn_id),
2124            OperatorOp::DistinctUntilChanged { equals_fn_id } => {
2125                self.fire_op_distinct(node_id, equals_fn_id);
2126            }
2127            OperatorOp::Pairwise { fn_id } => self.fire_op_pairwise(node_id, fn_id),
2128            OperatorOp::Combine { pack_fn } => self.fire_op_combine(node_id, pack_fn),
2129            OperatorOp::WithLatestFrom { pack_fn } => {
2130                self.fire_op_with_latest_from(node_id, pack_fn);
2131            }
2132            OperatorOp::Merge => self.fire_op_merge(node_id),
2133            OperatorOp::Take { count } => self.fire_op_take(node_id, count),
2134            OperatorOp::Skip { count } => self.fire_op_skip(node_id, count),
2135            OperatorOp::TakeWhile { fn_id } => self.fire_op_take_while(node_id, fn_id),
2136            // The variant carries `default` for `register_operator`'s
2137            // `make_op_scratch` path; once registered, the live default
2138            // is read from `LastState::default` inside `fire_op_last`.
2139            OperatorOp::Last { .. } => self.fire_op_last(node_id),
2140            OperatorOp::Tap { fn_id } => self.fire_op_tap(node_id, fn_id),
2141            OperatorOp::TapFirst { fn_id } => self.fire_op_tap_first(node_id, fn_id),
2142            OperatorOp::Valve => self.fire_op_valve(node_id),
2143            OperatorOp::Settle {
2144                quiet_waves,
2145                max_waves,
2146            } => self.fire_op_settle(node_id, quiet_waves, max_waves),
2147        }
2148    }
2149
2150    /// Snapshot the operator's single dep batch (transform constraint —
2151    /// R5.7 single-dep). Returns `(inputs, terminal)` where `inputs` is a
2152    /// fresh `Vec<HandleId>` (no retains) and `terminal` reflects
2153    /// `dep_records[0].terminal` at snapshot time.
2154    fn snapshot_op_dep0(&self, node_id: NodeId) -> (Vec<HandleId>, Option<TerminalKind>) {
2155        let s = self.lock_state();
2156        let rec = s.require_node(node_id);
2157        debug_assert!(
2158            !rec.dep_records.is_empty(),
2159            "transform operator must have ≥1 dep"
2160        );
2161        let dr = &rec.dep_records[0];
2162        (dr.data_batch.iter().copied().collect(), dr.terminal)
2163    }
2164
2165    /// Emit DIRTY (if not already dirty) followed by RESOLVED. Used by
2166    /// silent-drop operators (Filter / DistinctUntilChanged / Pairwise)
2167    /// when a wave's inputs all suppress and the operator needs to settle
2168    /// the wave for its subscribers (D018 — let DIRTY ride; queue RESOLVED
2169    /// on full-reject).
2170    fn settle_dirty_resolved(&self, node_id: NodeId) {
2171        let mut s = self.lock_state();
2172        if s.require_node(node_id).terminal.is_some() {
2173            return;
2174        }
2175        let already_dirty = s.require_node(node_id).dirty;
2176        s.require_node_mut(node_id).dirty = true;
2177        if !already_dirty {
2178            self.queue_notify(&mut s, node_id, Message::Dirty);
2179        }
2180        // Slice G: skip Resolved if pending_notify already has a tier-3
2181        // message — adding Resolved would violate R1.3.3.a.
2182        // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_notify lives
2183        // on per-thread WaveState; borrow scoped so queue_notify can
2184        // re-borrow.
2185        let already_tier3 = with_wave_state(|ws| {
2186            ws.pending_notify
2187                .get(&node_id)
2188                .is_some_and(|entry| entry.iter_messages().any(|m| m.tier() == 3))
2189        });
2190        if !already_tier3 {
2191            self.queue_notify(&mut s, node_id, Message::Resolved);
2192        }
2193    }
2194
2195    /// `OperatorOp::Map` dispatch.
2196    fn fire_op_map(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
2197        let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
2198        // Mark fired regardless of input count (activation gate already
2199        // satisfied or partial-mode).
2200        {
2201            let mut s = self.lock_state();
2202            s.require_node_mut(node_id).has_fired_once = true;
2203        }
2204        if inputs.is_empty() {
2205            return;
2206        }
2207        // Phase 2 (lock-released): bulk project. Binding returns one
2208        // handle per input, each with a retain share already taken.
2209        let outputs = self.binding.project_each(fn_id, &inputs);
2210        // Phase 3: emit each output. `commit_emission_verbatim` consumes
2211        // the retain into the cache slot (and releases the prior cache
2212        // handle internally).
2213        for h in outputs {
2214            self.commit_emission_verbatim(node_id, h);
2215        }
2216    }
2217
2218    /// `OperatorOp::Filter` dispatch (D012/D018).
2219    fn fire_op_filter(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
2220        let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
2221        {
2222            let mut s = self.lock_state();
2223            s.require_node_mut(node_id).has_fired_once = true;
2224        }
2225        if inputs.is_empty() {
2226            return;
2227        }
2228        // Phase 2: predicate per input.
2229        let pass = self.binding.predicate_each(fn_id, &inputs);
2230        // Slice V2: promoted from debug_assert! — binding contract violation
2231        // should fail loud in release builds too.
2232        assert!(
2233            pass.len() == inputs.len(),
2234            "predicate_each returned {} bools for {} inputs",
2235            pass.len(),
2236            inputs.len()
2237        );
2238        // Phase 3: emit passing items verbatim. Take a fresh retain for
2239        // each — the data_batch slot still owns its retain (released at
2240        // wave-end rotation), and the cache slot needs its own.
2241        let mut emitted = 0usize;
2242        for (i, &h) in inputs.iter().enumerate() {
2243            if pass.get(i).copied().unwrap_or(false) {
2244                self.binding.retain_handle(h);
2245                self.commit_emission_verbatim(node_id, h);
2246                emitted += 1;
2247            }
2248        }
2249        // D018: full-reject settles with DIRTY+RESOLVED.
2250        if emitted == 0 {
2251            self.settle_dirty_resolved(node_id);
2252        }
2253    }
2254
2255    /// `OperatorOp::Scan` dispatch — left-fold emitting each new acc.
2256    fn fire_op_scan(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
2257        use crate::op_state::ScanState;
2258        let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
2259        let acc = {
2260            let s = self.lock_state();
2261            scratch_ref::<ScanState>(&s, node_id).acc
2262        };
2263        {
2264            let mut s = self.lock_state();
2265            s.require_node_mut(node_id).has_fired_once = true;
2266        }
2267        if inputs.is_empty() {
2268            return;
2269        }
2270        // Phase 2: fold each input through. Returns N new handles, each
2271        // with a fresh retain.
2272        let new_states = self.binding.fold_each(fn_id, acc, &inputs);
2273        // Slice V2: promoted from debug_assert! — binding contract violation.
2274        assert!(
2275            new_states.len() == inputs.len(),
2276            "fold_each returned {} accs for {} inputs",
2277            new_states.len(),
2278            inputs.len()
2279        );
2280        // Phase 3a: update ScanState.acc to the LAST new acc. Take an
2281        // extra retain for the slot; release the prior acc's slot retain.
2282        let last_acc = new_states.last().copied();
2283        if let Some(last) = last_acc {
2284            let prev_acc = {
2285                let mut s = self.lock_state();
2286                let scratch = scratch_mut::<ScanState>(&mut s, node_id);
2287                let prev = scratch.acc;
2288                scratch.acc = last;
2289                prev
2290            };
2291            // Take the slot's retain on the new acc.
2292            self.binding.retain_handle(last);
2293            // Release the prior slot's retain (post-lock to keep binding
2294            // free to re-enter Core safely).
2295            if prev_acc != crate::handle::NO_HANDLE {
2296                self.binding.release_handle(prev_acc);
2297            }
2298        }
2299        // Phase 3b: emit each intermediate acc verbatim. `new_states`
2300        // entries each carry one retain from `fold_each`; that retain is
2301        // consumed by `commit_emission_verbatim` into the cache slot.
2302        for h in new_states {
2303            self.commit_emission_verbatim(node_id, h);
2304        }
2305    }
2306
2307    /// `OperatorOp::Reduce` dispatch — accumulates silently; emits acc on
2308    /// upstream COMPLETE (cascades ERROR verbatim).
2309    fn fire_op_reduce(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
2310        use crate::op_state::ReduceState;
2311        let (inputs, terminal) = self.snapshot_op_dep0(node_id);
2312        let acc = {
2313            let s = self.lock_state();
2314            scratch_ref::<ReduceState>(&s, node_id).acc
2315        };
2316        {
2317            let mut s = self.lock_state();
2318            s.require_node_mut(node_id).has_fired_once = true;
2319        }
2320        // Phase 2: accumulate (silent — no per-input emit).
2321        let new_states = if inputs.is_empty() {
2322            SmallVec::<[HandleId; 1]>::new()
2323        } else {
2324            self.binding.fold_each(fn_id, acc, &inputs)
2325        };
2326        // Slice V2: promoted from debug_assert! — binding contract violation.
2327        assert!(
2328            new_states.len() == inputs.len(),
2329            "fold_each returned {} accs for {} inputs",
2330            new_states.len(),
2331            inputs.len()
2332        );
2333        // Update ReduceState.acc to last new acc; release intermediate
2334        // states (we don't emit them) and the prior acc's slot retain.
2335        let last_acc = new_states.last().copied();
2336        let intermediates_to_release: Vec<HandleId> = if new_states.len() > 1 {
2337            new_states[..new_states.len() - 1].to_vec()
2338        } else {
2339            Vec::new()
2340        };
2341        let prev_acc_to_release = if let Some(last) = last_acc {
2342            let prev_acc = {
2343                let mut s = self.lock_state();
2344                let scratch = scratch_mut::<ReduceState>(&mut s, node_id);
2345                let prev = scratch.acc;
2346                scratch.acc = last;
2347                prev
2348            };
2349            self.binding.retain_handle(last);
2350            if prev_acc == crate::handle::NO_HANDLE {
2351                None
2352            } else {
2353                Some(prev_acc)
2354            }
2355        } else {
2356            None
2357        };
2358        // Release intermediate fold results (Reduce only emits the LAST,
2359        // but only on terminal). Each was retained by `fold_each`.
2360        for h in intermediates_to_release {
2361            self.binding.release_handle(h);
2362        }
2363        if let Some(h) = prev_acc_to_release {
2364            self.binding.release_handle(h);
2365        }
2366
2367        // Phase 3: emit on terminal.
2368        match terminal {
2369            None => {
2370                // Still accumulating; no emit. Subscribers see no message
2371                // for this wave (silent accumulation). The first wave that
2372                // pushes Reduce to fire produces a Dirty entry on the
2373                // upstream's commit, but Reduce itself doesn't queue any
2374                // tier-3 since R5 silently absorbs. v1: leave the
2375                // post-drain auto-resolve sweep to settle nothing —
2376                // pending_notify has no entry for Reduce so the sweep is
2377                // a no-op.
2378            }
2379            Some(TerminalKind::Complete) => {
2380                // Read the live acc (may be the seed if no DATA arrived)
2381                // and emit Data(acc) + Complete.
2382                let final_acc = {
2383                    let s = self.lock_state();
2384                    scratch_ref::<ReduceState>(&s, node_id).acc
2385                };
2386                if final_acc != crate::handle::NO_HANDLE {
2387                    // Emission needs its own retain (slot's retain is
2388                    // owned by ReduceState.acc until reset/Drop).
2389                    self.binding.retain_handle(final_acc);
2390                    self.commit_emission_verbatim(node_id, final_acc);
2391                }
2392                self.complete(node_id);
2393            }
2394            Some(TerminalKind::Error(h)) => {
2395                // Core::error transfers the caller's share into the
2396                // cascade (node.terminal + per-child dep_terminal slots);
2397                // no release at the error() boundary. Take a fresh share
2398                // here so the cascade owns it independently of the
2399                // dep_records[0].terminal slot's share.
2400                self.binding.retain_handle(h);
2401                self.error(node_id, h);
2402            }
2403        }
2404    }
2405
2406    /// `OperatorOp::DistinctUntilChanged` dispatch.
2407    fn fire_op_distinct(&self, node_id: NodeId, equals_fn_id: crate::handle::FnId) {
2408        use crate::op_state::DistinctState;
2409        let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
2410        let mut prev = {
2411            let s = self.lock_state();
2412            scratch_ref::<DistinctState>(&s, node_id).prev
2413        };
2414        {
2415            let mut s = self.lock_state();
2416            s.require_node_mut(node_id).has_fired_once = true;
2417        }
2418        if inputs.is_empty() {
2419            return;
2420        }
2421        // Take a working-copy retain on the initial prev so both the loop
2422        // (which releases old_prev on each non-equal item) and phase 3
2423        // (which releases the slot's original handle) each have their own
2424        // share. Without this, the loop's release of old_prev (== original
2425        // DistinctState.prev) double-releases against phase 3's stale_slot
2426        // release.
2427        if prev != crate::handle::NO_HANDLE {
2428            self.binding.retain_handle(prev);
2429        }
2430        // Phase 2: per-input equals(prev, current). Each non-equal input
2431        // is emitted and becomes the new prev. Equals fn_id reuses
2432        // `BindingBoundary::custom_equals`.
2433        let mut emitted = 0usize;
2434        for &h in &inputs {
2435            let equal = if prev == crate::handle::NO_HANDLE {
2436                false
2437            } else if prev == h {
2438                true
2439            } else {
2440                self.binding.custom_equals(equals_fn_id, prev, h)
2441            };
2442            if !equal {
2443                // Emit this input verbatim.
2444                self.binding.retain_handle(h);
2445                self.commit_emission_verbatim(node_id, h);
2446                // Update prev: take retain on new prev, release old
2447                // (working-copy retain from above or from prior iteration).
2448                self.binding.retain_handle(h);
2449                let old_prev = prev;
2450                prev = h;
2451                if old_prev != crate::handle::NO_HANDLE {
2452                    self.binding.release_handle(old_prev);
2453                }
2454                emitted += 1;
2455            }
2456        }
2457        // Phase 3: persist prev into DistinctState.prev slot. Release the
2458        // slot's original retain (stale_slot) — this is the slot-owned
2459        // share, independent of the working-copy share released in the
2460        // loop above.
2461        {
2462            let mut s = self.lock_state();
2463            let scratch = scratch_mut::<DistinctState>(&mut s, node_id);
2464            let stale_slot = scratch.prev;
2465            scratch.prev = prev;
2466            if stale_slot != prev && stale_slot != crate::handle::NO_HANDLE {
2467                drop(s);
2468                self.binding.release_handle(stale_slot);
2469            }
2470        }
2471        // Release the working-copy retain on the final prev if it was
2472        // never released in the loop (i.e. no non-equal items passed,
2473        // prev == original). In that case stale_slot == prev, so phase 3
2474        // didn't release it either — but the working-copy retain is still
2475        // outstanding. Release it now.
2476        if emitted == 0 && prev != crate::handle::NO_HANDLE {
2477            self.binding.release_handle(prev);
2478        }
2479        if emitted == 0 {
2480            self.settle_dirty_resolved(node_id);
2481        }
2482    }
2483
2484    /// `OperatorOp::Pairwise` dispatch — emits `(prev, current)` tuples
2485    /// starting after the second value (first input swallowed, sets `prev`).
2486    fn fire_op_pairwise(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
2487        use crate::op_state::PairwiseState;
2488        let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
2489        let mut prev = {
2490            let s = self.lock_state();
2491            scratch_ref::<PairwiseState>(&s, node_id).prev
2492        };
2493        {
2494            let mut s = self.lock_state();
2495            s.require_node_mut(node_id).has_fired_once = true;
2496        }
2497        if inputs.is_empty() {
2498            return;
2499        }
2500        let mut emitted = 0usize;
2501        for &h in &inputs {
2502            if prev == crate::handle::NO_HANDLE {
2503                // First-ever value — swallow, set prev. Retain for the
2504                // PairwiseState.prev slot (persisted in phase 3 below).
2505                self.binding.retain_handle(h);
2506                prev = h;
2507                continue;
2508            }
2509            // Pack (prev, current) into a tuple handle. Binding returns a
2510            // fresh retain on the packed handle.
2511            let packed = self.binding.pairwise_pack(fn_id, prev, h);
2512            self.commit_emission_verbatim(node_id, packed);
2513            // Advance prev: take retain on h, release old prev.
2514            self.binding.retain_handle(h);
2515            let old_prev = prev;
2516            prev = h;
2517            self.binding.release_handle(old_prev);
2518            emitted += 1;
2519        }
2520        // Persist prev into PairwiseState.prev slot.
2521        {
2522            let mut s = self.lock_state();
2523            let scratch = scratch_mut::<PairwiseState>(&mut s, node_id);
2524            let stale_slot = scratch.prev;
2525            scratch.prev = prev;
2526            if stale_slot != prev && stale_slot != crate::handle::NO_HANDLE {
2527                drop(s);
2528                self.binding.release_handle(stale_slot);
2529            }
2530        }
2531        if emitted == 0 {
2532            self.settle_dirty_resolved(node_id);
2533        }
2534    }
2535
2536    // =================================================================
2537    // Slice C-2: multi-dep combinator operators (D020)
2538    // =================================================================
2539
2540    /// Snapshot all deps' "latest" handle for multi-dep combinators.
2541    /// For each dep: returns `data_batch.last()` if non-empty (dep fired
2542    /// this wave), else `prev_data` (last handle from previous wave).
2543    /// Also returns whether dep[0] (primary) had DATA this wave —
2544    /// needed by `fire_op_with_latest_from`.
2545    fn snapshot_op_all_latest(&self, node_id: NodeId) -> (SmallVec<[HandleId; 4]>, bool) {
2546        let s = self.lock_state();
2547        let rec = s.require_node(node_id);
2548        let primary_fired = rec
2549            .dep_records
2550            .first()
2551            .is_some_and(|dr| !dr.data_batch.is_empty());
2552        let latest: SmallVec<[HandleId; 4]> = rec
2553            .dep_records
2554            .iter()
2555            .map(|dr| dr.data_batch.last().copied().unwrap_or(dr.prev_data))
2556            .collect();
2557        (latest, primary_fired)
2558    }
2559
2560    /// `OperatorOp::Combine` dispatch — N-dep combineLatest. Packs the
2561    /// latest handle per dep into a tuple via `pack_tuple`, emits on
2562    /// any dep fire. First-run gate (R2.5.3, partial: false) guarantees
2563    /// all deps have a real handle on first fire. Post-warmup INVALIDATE
2564    /// guard: if any dep's prev_data was cleared, settles with RESOLVED
2565    /// instead of packing a NO_HANDLE into the tuple.
2566    fn fire_op_combine(&self, node_id: NodeId, pack_fn: crate::handle::FnId) {
2567        let (latest, _primary_fired) = self.snapshot_op_all_latest(node_id);
2568        {
2569            let mut s = self.lock_state();
2570            s.require_node_mut(node_id).has_fired_once = true;
2571        }
2572        // Post-warmup INVALIDATE guard: a dep may have been invalidated
2573        // (prev_data cleared to NO_HANDLE) and not yet re-delivered.
2574        if latest.contains(&crate::handle::NO_HANDLE) {
2575            self.settle_dirty_resolved(node_id);
2576            return;
2577        }
2578        let tuple_handle = self.binding.pack_tuple(pack_fn, &latest);
2579        self.commit_emission_verbatim(node_id, tuple_handle);
2580    }
2581
2582    /// `OperatorOp::WithLatestFrom` dispatch — 2-dep, fire-on-primary-only
2583    /// (D021 / Phase 10.5). Emits `[primary, secondary]` pair only when
2584    /// dep[0] (primary) has DATA in the wave. If only dep[1] fires →
2585    /// RESOLVED. Post-warmup INVALIDATE guard: if secondary latest is
2586    /// `NO_HANDLE` (INVALIDATE cleared it), settles with RESOLVED.
2587    fn fire_op_with_latest_from(&self, node_id: NodeId, pack_fn: crate::handle::FnId) {
2588        let (latest, primary_fired) = self.snapshot_op_all_latest(node_id);
2589        let first_fire = {
2590            let mut s = self.lock_state();
2591            let rec = s.require_node_mut(node_id);
2592            let was_first = !rec.has_fired_once;
2593            rec.has_fired_once = true;
2594            was_first
2595        };
2596        // On first fire (gate release), always emit — the first-run gate
2597        // guarantees both deps have values (via prev_data fallback in
2598        // snapshot). On subsequent fires, only emit when primary fires.
2599        if !first_fire && !primary_fired {
2600            // Secondary-only update — no downstream DATA.
2601            self.settle_dirty_resolved(node_id);
2602            return;
2603        }
2604        // Post-warmup INVALIDATE guard: secondary may have been invalidated
2605        // (prev_data cleared to NO_HANDLE) and not yet re-delivered.
2606        debug_assert!(latest.len() == 2, "withLatestFrom requires exactly 2 deps");
2607        if latest[1] == crate::handle::NO_HANDLE {
2608            self.settle_dirty_resolved(node_id);
2609            return;
2610        }
2611        let tuple_handle = self.binding.pack_tuple(pack_fn, &latest);
2612        self.commit_emission_verbatim(node_id, tuple_handle);
2613    }
2614
2615    /// `OperatorOp::Merge` dispatch — N-dep, forward all DATA handles
2616    /// verbatim (D022). Zero FFI on fire: no transformation. Each dep's
2617    /// batch handles are collected, retained, and emitted individually.
2618    fn fire_op_merge(&self, node_id: NodeId) {
2619        // Collect all batch handles from all deps (flat).
2620        let all_handles: Vec<HandleId> = {
2621            let s = self.lock_state();
2622            let rec = s.require_node(node_id);
2623            rec.dep_records
2624                .iter()
2625                .flat_map(|dr| dr.data_batch.iter().copied())
2626                .collect()
2627        };
2628        {
2629            let mut s = self.lock_state();
2630            s.require_node_mut(node_id).has_fired_once = true;
2631        }
2632        if all_handles.is_empty() {
2633            // All deps settled RESOLVED this wave — no DATA to forward.
2634            self.settle_dirty_resolved(node_id);
2635            return;
2636        }
2637        // Emit each handle verbatim. Take a fresh retain per handle
2638        // (independent of the dep batch's retain which gets released at
2639        // wave-end). Matches Filter's discipline for passing inputs.
2640        for &h in &all_handles {
2641            self.binding.retain_handle(h);
2642            self.commit_emission_verbatim(node_id, h);
2643        }
2644    }
2645
2646    // =================================================================
2647    // Slice C-3: flow operators (D024)
2648    // =================================================================
2649
2650    /// `OperatorOp::Take` dispatch — emits the first `count` DATA values
2651    /// then self-completes via `Core::complete`. When `count == 0`, the
2652    /// first fire emits zero items then immediately self-completes
2653    /// (D027). Cross-wave counter lives in
2654    /// [`TakeState::count_emitted`](super::op_state::TakeState::count_emitted).
2655    fn fire_op_take(&self, node_id: NodeId, count: u32) {
2656        use crate::op_state::TakeState;
2657        let (inputs, terminal) = self.snapshot_op_dep0(node_id);
2658        // Snapshot current counter; mark fired regardless of input count
2659        // (activation gate already satisfied or partial-mode).
2660        let mut count_emitted = {
2661            let s = self.lock_state();
2662            scratch_ref::<TakeState>(&s, node_id).count_emitted
2663        };
2664        {
2665            let mut s = self.lock_state();
2666            s.require_node_mut(node_id).has_fired_once = true;
2667        }
2668        // Already at quota before any input this wave — self-complete
2669        // immediately. Covers `count == 0` (first-fire short-circuit) and
2670        // any defensive re-entry after the terminal-skip in `fire_operator`
2671        // already guards against double-complete.
2672        if count_emitted >= count {
2673            self.complete(node_id);
2674            return;
2675        }
2676        // Per-input emission loop. Each pass takes a fresh retain for the
2677        // cache slot; data_batch slot's retain is released at wave-end
2678        // rotation independently.
2679        for &h in &inputs {
2680            self.binding.retain_handle(h);
2681            self.commit_emission_verbatim(node_id, h);
2682            count_emitted = count_emitted.saturating_add(1);
2683            if count_emitted >= count {
2684                break;
2685            }
2686        }
2687        // Persist the updated counter.
2688        {
2689            let mut s = self.lock_state();
2690            scratch_mut::<TakeState>(&mut s, node_id).count_emitted = count_emitted;
2691        }
2692        // Self-complete if we hit the quota this wave. Upstream COMPLETE
2693        // (terminal == Some(Complete)) without us hitting the count
2694        // propagates via the standard auto-cascade — we don't intercept it.
2695        if count_emitted >= count {
2696            self.complete(node_id);
2697            return;
2698        }
2699        // If upstream is already Errored and we haven't hit count, the
2700        // standard cascade will propagate it. If the wave delivered no
2701        // inputs (e.g. RESOLVED from upstream), settle DIRTY+RESOLVED so
2702        // subscribers see the wave close.
2703        if inputs.is_empty() && terminal.is_none() {
2704            self.settle_dirty_resolved(node_id);
2705        }
2706    }
2707
2708    /// `OperatorOp::Skip` dispatch — drops the first `count` DATA values,
2709    /// then forwards the rest. Cross-wave counter lives in
2710    /// [`SkipState::count_skipped`](super::op_state::SkipState::count_skipped).
2711    /// On a wave where every input is still in the skip window, settles
2712    /// DIRTY+RESOLVED (D018 pattern) so subscribers see the wave close.
2713    fn fire_op_skip(&self, node_id: NodeId, count: u32) {
2714        use crate::op_state::SkipState;
2715        let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
2716        let mut count_skipped = {
2717            let s = self.lock_state();
2718            scratch_ref::<SkipState>(&s, node_id).count_skipped
2719        };
2720        {
2721            let mut s = self.lock_state();
2722            s.require_node_mut(node_id).has_fired_once = true;
2723        }
2724        // No early-return on empty inputs: the post-loop `emitted == 0`
2725        // settle handles the empty-inputs case identically to the
2726        // all-swallowed-by-skip-window case (Slice C-3 /qa P6 — symmetry
2727        // with `fire_op_take`).
2728        let mut emitted = 0usize;
2729        for &h in &inputs {
2730            if count_skipped < count {
2731                count_skipped = count_skipped.saturating_add(1);
2732                // Drop this input — the data_batch slot still owns its
2733                // retain (released at wave-end rotation). No emission.
2734                continue;
2735            }
2736            // Past the skip window — emit verbatim. Take a fresh retain
2737            // for the cache slot.
2738            self.binding.retain_handle(h);
2739            self.commit_emission_verbatim(node_id, h);
2740            emitted += 1;
2741        }
2742        // Persist the updated counter.
2743        {
2744            let mut s = self.lock_state();
2745            scratch_mut::<SkipState>(&mut s, node_id).count_skipped = count_skipped;
2746        }
2747        if emitted == 0 {
2748            self.settle_dirty_resolved(node_id);
2749        }
2750    }
2751
2752    /// `OperatorOp::TakeWhile` dispatch — emits while the predicate
2753    /// holds; on the first `false`, emits any preceding passes from the
2754    /// same batch then self-completes via `Core::complete`. Reuses
2755    /// [`BindingBoundary::predicate_each`] (D029).
2756    fn fire_op_take_while(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
2757        let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
2758        {
2759            let mut s = self.lock_state();
2760            s.require_node_mut(node_id).has_fired_once = true;
2761        }
2762        if inputs.is_empty() {
2763            return;
2764        }
2765        // Phase 2: predicate per input.
2766        let pass = self.binding.predicate_each(fn_id, &inputs);
2767        // Slice V2: promoted from debug_assert! — binding contract violation
2768        // should fail loud in release builds too.
2769        assert!(
2770            pass.len() == inputs.len(),
2771            "predicate_each returned {} bools for {} inputs",
2772            pass.len(),
2773            inputs.len()
2774        );
2775        // Phase 3: emit each input until the first false; then
2776        // self-complete. `fire_operator`'s `terminal.is_some()`
2777        // short-circuit gates re-entry after the self-complete cascade
2778        // installs the terminal slot — no extra `done` flag needed.
2779        let mut emitted = 0usize;
2780        let mut first_false_seen = false;
2781        for (i, &h) in inputs.iter().enumerate() {
2782            if pass.get(i).copied().unwrap_or(false) {
2783                self.binding.retain_handle(h);
2784                self.commit_emission_verbatim(node_id, h);
2785                emitted += 1;
2786            } else {
2787                first_false_seen = true;
2788                break;
2789            }
2790        }
2791        if first_false_seen {
2792            self.complete(node_id);
2793            return;
2794        }
2795        if emitted == 0 {
2796            // Whole batch passed but was empty (impossible here since
2797            // inputs.is_empty() returned early above) — defensive only.
2798            self.settle_dirty_resolved(node_id);
2799        }
2800    }
2801
2802    /// `OperatorOp::Last` dispatch — buffers the latest DATA; emits
2803    /// `Data(latest)` (or `Data(default)` if no DATA arrived and a
2804    /// default was registered) then `Complete` on upstream COMPLETE.
2805    /// On upstream ERROR, propagates verbatim. Storage:
2806    /// [`LastState`](super::op_state::LastState).
2807    ///
2808    /// **Silent-buffer semantics (mirrors Reduce):** on a non-terminal
2809    /// wave (`terminal == None`), `fire_op_last` updates the buffered
2810    /// `latest` handle but produces NO downstream wire message —
2811    /// subscribers observe the operator only when upstream
2812    /// COMPLETE/ERROR triggers the terminal branch. Intermediate
2813    /// inputs from the dep's batch are dropped on the floor (their
2814    /// `data_batch` retains release at wave-end rotation
2815    /// independently). Per-wave settlement on intermediate waves is
2816    /// the canonical behavior for terminal-aware operators.
2817    fn fire_op_last(&self, node_id: NodeId) {
2818        use crate::op_state::LastState;
2819        let (inputs, terminal) = self.snapshot_op_dep0(node_id);
2820        {
2821            let mut s = self.lock_state();
2822            s.require_node_mut(node_id).has_fired_once = true;
2823        }
2824
2825        // Phase 2: buffer the latest input handle (if any). Retain new,
2826        // release old. data_batch slot's retain is released at wave-end
2827        // rotation independently — the LastState slot keeps its own
2828        // share so the value survives across waves.
2829        if let Some(&new_latest) = inputs.last() {
2830            let prev_latest = {
2831                let mut s = self.lock_state();
2832                let scratch = scratch_mut::<LastState>(&mut s, node_id);
2833                let prev = scratch.latest;
2834                scratch.latest = new_latest;
2835                prev
2836            };
2837            self.binding.retain_handle(new_latest);
2838            if prev_latest != crate::handle::NO_HANDLE {
2839                self.binding.release_handle(prev_latest);
2840            }
2841        }
2842
2843        // Phase 3: emit on terminal. Buffer-only fires (no terminal yet)
2844        // produce no downstream message — Reduce-style silent
2845        // accumulation. The post-drain auto-resolve sweep is a no-op
2846        // because pending_notify has no entry for Last.
2847        match terminal {
2848            None => {}
2849            Some(TerminalKind::Complete) => {
2850                // Read the live latest + default. If latest != NO_HANDLE,
2851                // emit it. Otherwise, if default != NO_HANDLE, emit default.
2852                // Otherwise, emit only Complete (empty stream, no default).
2853                let (latest, default) = {
2854                    let s = self.lock_state();
2855                    let scratch = scratch_ref::<LastState>(&s, node_id);
2856                    (scratch.latest, scratch.default)
2857                };
2858                let to_emit = if latest != crate::handle::NO_HANDLE {
2859                    Some(latest)
2860                } else if default != crate::handle::NO_HANDLE {
2861                    Some(default)
2862                } else {
2863                    None
2864                };
2865                if let Some(h) = to_emit {
2866                    // Emission needs its own retain — the LastState slot
2867                    // keeps its share until reset/Drop.
2868                    self.binding.retain_handle(h);
2869                    self.commit_emission_verbatim(node_id, h);
2870                }
2871                self.complete(node_id);
2872            }
2873            Some(TerminalKind::Error(h)) => {
2874                // Take a fresh share for the error cascade — the
2875                // dep_records[0].terminal slot keeps its own share
2876                // (released by reset_for_fresh_lifecycle / Drop).
2877                self.binding.retain_handle(h);
2878                self.error(node_id, h);
2879            }
2880        }
2881    }
2882
2883    // -----------------------------------------------------------------
2884    // Slice U: control operators — fire_op impls
2885    // -----------------------------------------------------------------
2886
2887    /// Tap — side-effect passthrough. Invoke tap fn on each DATA, then
2888    /// emit each input handle unchanged (zero allocation).
2889    fn fire_op_tap(&self, node_id: NodeId, fn_id: FnId) {
2890        let (inputs, terminal) = self.snapshot_op_dep0(node_id);
2891        {
2892            let mut s = self.lock_state();
2893            s.require_node_mut(node_id).has_fired_once = true;
2894        }
2895        if inputs.is_empty() {
2896            if terminal.is_none() {
2897                self.settle_dirty_resolved(node_id);
2898            }
2899        } else {
2900            for &h in &inputs {
2901                self.binding.invoke_tap_fn(fn_id, h);
2902                self.binding.retain_handle(h);
2903                self.commit_emission_verbatim(node_id, h);
2904            }
2905        }
2906        // Terminal forwarding.
2907        match terminal {
2908            None => {}
2909            Some(TerminalKind::Complete) => {
2910                self.binding.invoke_tap_complete_fn(fn_id);
2911                self.complete(node_id);
2912            }
2913            Some(TerminalKind::Error(h)) => {
2914                self.binding.invoke_tap_error_fn(fn_id, h);
2915                self.binding.retain_handle(h);
2916                self.error(node_id, h);
2917            }
2918        }
2919    }
2920
2921    /// TapFirst — one-shot side-effect on first DATA. After the first
2922    /// qualifying DATA, acts as pure passthrough.
2923    fn fire_op_tap_first(&self, node_id: NodeId, fn_id: FnId) {
2924        use crate::op_state::TapFirstState;
2925        let (inputs, terminal) = self.snapshot_op_dep0(node_id);
2926        {
2927            let mut s = self.lock_state();
2928            s.require_node_mut(node_id).has_fired_once = true;
2929        }
2930        if inputs.is_empty() {
2931            if terminal.is_none() {
2932                self.settle_dirty_resolved(node_id);
2933            }
2934        } else {
2935            let fired = {
2936                let s = self.lock_state();
2937                scratch_ref::<TapFirstState>(&s, node_id).fired
2938            };
2939            for &h in &inputs {
2940                if !fired {
2941                    self.binding.invoke_tap_fn(fn_id, h);
2942                    let mut s = self.lock_state();
2943                    scratch_mut::<TapFirstState>(&mut s, node_id).fired = true;
2944                }
2945                self.binding.retain_handle(h);
2946                self.commit_emission_verbatim(node_id, h);
2947            }
2948        }
2949        if let Some(TerminalKind::Complete) = terminal {
2950            self.complete(node_id);
2951        } else if let Some(TerminalKind::Error(h)) = terminal {
2952            self.binding.retain_handle(h);
2953            self.error(node_id, h);
2954        }
2955    }
2956
2957    /// Valve — conditional forward. dep[0]=source, dep[1]=control.
2958    /// When control is truthy, forwards source DATA; else RESOLVED.
2959    fn fire_op_valve(&self, node_id: NodeId) {
2960        // Snapshot both deps.
2961        let (src_inputs, src_terminal, ctrl_latest) = {
2962            let s = self.lock_state();
2963            let rec = s.require_node(node_id);
2964            debug_assert!(rec.dep_records.len() == 2, "valve must have exactly 2 deps");
2965            let dr0 = &rec.dep_records[0];
2966            let dr1 = &rec.dep_records[1];
2967            let src_inputs: Vec<HandleId> = dr0.data_batch.iter().copied().collect();
2968            let src_term = dr0.terminal;
2969            // Latest control: last of this wave's batch, or prev_data.
2970            let ctrl = dr1.data_batch.last().copied().unwrap_or(dr1.prev_data);
2971            (src_inputs, src_term, ctrl)
2972        };
2973        {
2974            let mut s = self.lock_state();
2975            s.require_node_mut(node_id).has_fired_once = true;
2976        }
2977
2978        // Source terminal forwarding (D3).
2979        if let Some(TerminalKind::Complete) = src_terminal {
2980            self.complete(node_id);
2981            return;
2982        }
2983        if let Some(TerminalKind::Error(h)) = src_terminal {
2984            self.binding.retain_handle(h);
2985            self.error(node_id, h);
2986            return;
2987        }
2988
2989        // Gate: NO_HANDLE means "gate closed" (control never sent DATA);
2990        // any real handle means "gate open". Proper value-level truthiness
2991        // would require BindingBoundary::is_truthy (deferred — D048).
2992        let gate_open = ctrl_latest != crate::handle::NO_HANDLE;
2993
2994        if !gate_open {
2995            self.settle_dirty_resolved(node_id);
2996            return;
2997        }
2998
2999        if src_inputs.is_empty() {
3000            // Control opened but no source DATA this wave. Re-emit
3001            // prev source value if available.
3002            let prev_src = {
3003                let s = self.lock_state();
3004                s.require_node(node_id).dep_records[0].prev_data
3005            };
3006            if prev_src == crate::handle::NO_HANDLE {
3007                self.settle_dirty_resolved(node_id);
3008            } else {
3009                self.binding.retain_handle(prev_src);
3010                self.commit_emission_verbatim(node_id, prev_src);
3011            }
3012        } else {
3013            for &h in &src_inputs {
3014                self.binding.retain_handle(h);
3015                self.commit_emission_verbatim(node_id, h);
3016            }
3017        }
3018    }
3019
3020    /// Settle — convergence detector. Forwards DATA, counts quiet waves,
3021    /// self-completes when converged.
3022    fn fire_op_settle(&self, node_id: NodeId, quiet_waves: u32, max_waves: Option<u32>) {
3023        use crate::op_state::SettleState;
3024        let (inputs, terminal) = self.snapshot_op_dep0(node_id);
3025        {
3026            let mut s = self.lock_state();
3027            s.require_node_mut(node_id).has_fired_once = true;
3028        }
3029
3030        // Terminal forwarding.
3031        if let Some(TerminalKind::Complete) = terminal {
3032            self.complete(node_id);
3033            return;
3034        }
3035        if let Some(TerminalKind::Error(h)) = terminal {
3036            self.binding.retain_handle(h);
3037            self.error(node_id, h);
3038            return;
3039        }
3040
3041        let saw_data = !inputs.is_empty();
3042
3043        // Forward all DATA.
3044        for &h in &inputs {
3045            self.binding.retain_handle(h);
3046            self.commit_emission_verbatim(node_id, h);
3047        }
3048
3049        // Update counters.
3050        let should_complete = {
3051            let mut s = self.lock_state();
3052            let scratch = scratch_mut::<SettleState>(&mut s, node_id);
3053            scratch.wave_count += 1;
3054            if saw_data {
3055                scratch.has_value = true;
3056                scratch.quiet_count = 0;
3057            } else {
3058                scratch.quiet_count += 1;
3059            }
3060            let settled = scratch.has_value && scratch.quiet_count >= quiet_waves;
3061            let exhausted = max_waves.is_some_and(|max| scratch.wave_count >= max);
3062            settled || exhausted
3063        };
3064
3065        if should_complete {
3066            self.complete(node_id);
3067        } else if !saw_data {
3068            self.settle_dirty_resolved(node_id);
3069        }
3070    }
3071
3072    pub(crate) fn deliver_data_to_consumer(
3073        &self,
3074        s: &mut CoreState,
3075        consumer_id: NodeId,
3076        dep_idx: usize,
3077        handle: HandleId,
3078    ) {
3079        // Retain the handle for the batch accumulation slot — each DATA
3080        // handle in `data_batch` owns a retain share, released at wave-end
3081        // rotation in `clear_wave_state`.
3082        self.binding.retain_handle(handle);
3083
3084        let is_dynamic;
3085        let is_state;
3086        let tracked_or_first_fire;
3087        // Slice F audit close (2026-05-07): default-mode pause suppression.
3088        // If the consumer is paused with `PausableMode::Default`, the
3089        // canonical-spec §2.6 behavior is to suppress fn-fire and consolidate
3090        // pause-window dep deliveries into one fn execution on RESUME.
3091        // Mark `pending_wave` on the pause state instead of adding to
3092        // `pending_fires`. The dep state still advances (the data_batch push
3093        // above is unchanged), and clear_wave_state still rotates the latest
3094        // dep DATA into prev_data — so when the fn ultimately fires on
3095        // RESUME, it sees the consolidated post-pause state.
3096        let suppressed_for_default_pause;
3097        {
3098            let consumer = s.require_node_mut(consumer_id);
3099            consumer.dep_records[dep_idx].data_batch.push(handle);
3100            consumer.dep_records[dep_idx].involved_this_wave = true;
3101            consumer.involved_this_wave = true;
3102            // §10.13 perf (D047): set received_mask bit on first DATA
3103            // delivery for this dep.
3104            if dep_idx < 64 {
3105                consumer.received_mask |= 1u64 << dep_idx;
3106                // §10.3 perf (Slice V1): set involved_mask bit for
3107                // O(1) per-dep involvement query during fire.
3108                consumer.involved_mask |= 1u64 << dep_idx;
3109            }
3110            is_dynamic = consumer.is_dynamic;
3111            is_state = consumer.is_state();
3112            tracked_or_first_fire = !consumer.has_fired_once || consumer.tracked.contains(&dep_idx);
3113            suppressed_for_default_pause = consumer.pause_state.is_paused()
3114                && consumer.pausable == crate::node::PausableMode::Default;
3115            if suppressed_for_default_pause {
3116                consumer.pause_state.mark_pending_wave();
3117            }
3118        }
3119        if suppressed_for_default_pause {
3120            // Default-mode pause: don't add to pending_fires; RESUME will
3121            // schedule one consolidated fire.
3122            return;
3123        }
3124        // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_fires lives on
3125        // per-thread WaveState. State lock + WaveState borrow are
3126        // independent.
3127        if is_state {
3128            // State nodes don't have deps; unreachable in practice.
3129        } else if is_dynamic {
3130            if tracked_or_first_fire {
3131                with_wave_state(|ws| {
3132                    ws.pending_fires.insert(consumer_id);
3133                });
3134            }
3135        } else {
3136            // Derived / Operator / Producer (Producer has no deps so won't
3137            // reach here, but the predicate-based dispatch handles it
3138            // uniformly).
3139            with_wave_state(|ws| {
3140                ws.pending_fires.insert(consumer_id);
3141            });
3142        }
3143    }
3144
3145    // -------------------------------------------------------------------
3146    // Subscriber notification
3147    // -------------------------------------------------------------------
3148
3149    /// Queue a wave-end message for `node_id`'s subscribers.
3150    ///
3151    /// **Revision-tracked sink-snapshot batches (Slice X4 / D2,
3152    /// 2026-05-08):** each push for a given node either appends the
3153    /// message to the open batch (if `NodeRecord::subscribers_revision`
3154    /// hasn't advanced since that batch opened — the common case — no
3155    /// extra allocation), or opens a fresh batch with a current sink
3156    /// snapshot frozen at the new revision. A sub installed mid-wave
3157    /// bumps `subscribers_revision`; the next `queue_notify` for the
3158    /// same node observes the bump and starts a new batch that includes
3159    /// the new sub. Pre-subscribe batches retain their original snapshot,
3160    /// so earlier emits flush to their original sink list — the new sub
3161    /// does NOT double-receive them via flush AND handshake replay,
3162    /// closing the late-subscriber + multi-emit-per-wave R1.3.5.a gap.
3163    ///
3164    /// Pause routing decision (R1.3.7.b tier table, §10.2 buffering):
3165    ///   Tier 3 (DATA / RESOLVED) and Tier 4 (INVALIDATE) buffer while
3166    ///   paused; all other tiers (DIRTY tier 1, PAUSE/RESUME tier 2,
3167    ///   COMPLETE/ERROR tier 5, TEARDOWN tier 6) bypass the buffer and
3168    ///   flush immediately. START (tier 0) is per-subscription and never
3169    ///   transits queue_notify.
3170    pub(crate) fn queue_notify(&self, s: &mut crate::node::St<'_>, node_id: NodeId, msg: Message) {
3171        // R1.3.3.a / R1.3.3.d (Slice G — re-added 2026-05-07): dev-mode
3172        // wave-content invariant assertion. The tier-3 slot at one node in
3173        // one wave is either ≥1 DATA or exactly 1 RESOLVED — never mixed,
3174        // never multiple RESOLVED. Slice G moved equals substitution from
3175        // per-emit to wave-end coalescing; this assert pins that the
3176        // dispatcher itself never queues a violating combination at the
3177        // queue_notify granularity. Resolved arrivals come from:
3178        //   1. The auto-resolve sweep in `drain_and_flush` (gates on
3179        //      `!any tier-3` so it can't add to a wave with Data).
3180        //   2. The wave-end equals-substitution pass (rewrites in place,
3181        //      doesn't go through queue_notify).
3182        // Both honor R1.3.3.a by construction post-Slice-G.
3183        // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_notify lives
3184        // on per-thread WaveState. The dev-mode invariant assertion
3185        // borrows WaveState briefly and drops before the rest of
3186        // queue_notify proceeds.
3187        #[cfg(debug_assertions)]
3188        if matches!(msg.tier(), 3) {
3189            with_wave_state(|ws| {
3190                if let Some(entry) = ws.pending_notify.get(&node_id) {
3191                    // Walk all batches' messages — R1.3.3.a is a per-node
3192                    // wave-content invariant, not per-batch (the X4 batches
3193                    // are subscriber-snapshot epochs; the protocol-level
3194                    // tier-3 invariant spans the whole wave for the node).
3195                    let has_data = entry.iter_messages().any(|m| matches!(m, Message::Data(_)));
3196                    let resolved_count = entry
3197                        .iter_messages()
3198                        .filter(|m| matches!(m, Message::Resolved))
3199                        .count();
3200                    let incoming_is_data = matches!(msg, Message::Data(_));
3201                    if incoming_is_data {
3202                        debug_assert!(
3203                            resolved_count == 0,
3204                            "R1.3.3.a violation at {node_id:?}: queueing Data into a \
3205                             wave that already contains Resolved — Slice G should have \
3206                             prevented this via wave-end coalescing"
3207                        );
3208                    } else {
3209                        debug_assert!(
3210                            !has_data,
3211                            "R1.3.3.a violation at {node_id:?}: queueing Resolved into a \
3212                             wave that already contains Data"
3213                        );
3214                        debug_assert!(
3215                            resolved_count == 0,
3216                            "R1.3.3.a violation at {node_id:?}: multiple Resolved in one \
3217                             wave at one node"
3218                        );
3219                    }
3220                }
3221            });
3222        }
3223
3224        let buffered_tier = matches!(msg.tier(), 3 | 4);
3225        let cap = s.shared.pause_buffer_cap;
3226
3227        // Pause-routing branch — handles its own retain/release and returns
3228        // before we touch `pending_notify`, so the rec borrow is contained.
3229        {
3230            let rec = s.require_node_mut(node_id);
3231            if rec.subscribers.is_empty() {
3232                return;
3233            }
3234            // Slice F audit close (2026-05-07); amended 2026-05-17 for
3235            // canonical §2.6 R2.6.0 ("Option A"). Pause routing depends on
3236            // mode:
3237            //   - `ResumeAll`: buffer tier-3/4 for verbatim replay on RESUME.
3238            //   - `Default` + STATE node (leaf source — no deps): a state
3239            //     node's value is intrinsic, NOT produced by an fn/dep
3240            //     settle pipeline. PAUSE/RESUME gating is fn/dep-pipeline-
3241            //     scoped only (R2.6.0). A leaf source that holds its own
3242            //     pause lock and then self-emits via a direct external
3243            //     `down([[DATA, v]])` is pushing OUTSIDE that pipeline, so
3244            //     the DATA MUST flush immediately (cache advances now, no
3245            //     PAUSE synthesized, nothing replayed on RESUME). Therefore
3246            //     Default-mode state nodes do NOT buffer — they fall
3247            //     through to the immediate queue path, matching the
3248            //     `@graphrefly/pure-ts` reference (only `pausable:
3249            //     "resumeAll"` buffers a leaf source's direct `down()`).
3250            //   - `Default` + COMPUTE node: suppression happens upstream at
3251            //     fn-fire scheduling (see `deliver_data_to_consumer`); no
3252            //     outgoing tier-3 is produced from this node while paused,
3253            //     so this branch is unreachable for compute-default-paused.
3254            //     Fallthrough to the non-paused queue path is fine.
3255            //   - `Off`: pause is ignored entirely — tier-3 flushes
3256            //     immediately. Fallthrough.
3257            let mode_buffers_tier3 = match rec.pausable {
3258                crate::node::PausableMode::ResumeAll => true,
3259                crate::node::PausableMode::Default | crate::node::PausableMode::Off => false,
3260            };
3261            if buffered_tier && mode_buffers_tier3 && rec.pause_state.is_paused() {
3262                if let Some(h) = msg.payload_handle() {
3263                    self.binding.retain_handle(h);
3264                }
3265                let push_result = rec.pause_state.push_buffered(msg, cap);
3266                for dm in push_result.dropped_msgs {
3267                    if let Some(h) = dm.payload_handle() {
3268                        self.binding.release_handle(h);
3269                    }
3270                }
3271                // R1.3.8.c (Slice F, A3): on first overflow this cycle,
3272                // schedule a synthesized ERROR for wave-end emission.
3273                // `cap` is `Some` here (an overflow can only happen with a
3274                // configured cap), so `unwrap` is safe.
3275                if push_result.first_overflow_this_cycle {
3276                    if let Some((dropped_count, lock_held_ns)) =
3277                        rec.pause_state.overflow_diagnostic()
3278                    {
3279                        // Q-beyond Sub-slice 1 (D108, 2026-05-09):
3280                        // pending_pause_overflow lives on per-thread WaveState.
3281                        with_wave_state(|ws| {
3282                            ws.pending_pause_overflow
3283                                .push(crate::node::PendingPauseOverflow {
3284                                    node_id,
3285                                    dropped_count,
3286                                    configured_max: cap.unwrap_or(0),
3287                                    lock_held_ns,
3288                                });
3289                        });
3290                    }
3291                }
3292                return;
3293            }
3294        }
3295
3296        // Non-paused queue path: retain payload handle and queue into
3297        // pending_notify. Released in `flush_notifications` after sinks
3298        // fire.
3299        if let Some(h) = msg.payload_handle() {
3300            self.binding.retain_handle(h);
3301        }
3302        Self::push_into_pending_notify(s, node_id, msg);
3303    }
3304
3305    /// Slice X4 / D2: revision-tracked batch decision for `queue_notify`'s
3306    /// non-paused path. Either appends `msg` to the open batch (if
3307    /// `subscribers_revision` hasn't advanced since it opened — common
3308    /// case, no extra allocation) or opens a fresh batch with a current
3309    /// sink snapshot frozen at the new revision.
3310    ///
3311    /// Borrow discipline: reads `subscribers_revision` and the snapshot
3312    /// from `s.nodes` BEFORE borrowing WaveState's `pending_notify` to
3313    /// keep the two scopes disjoint.
3314    ///
3315    /// Q-beyond Sub-slice 2 (D108, 2026-05-09): `pending_notify` moved
3316    /// to per-thread WaveState. The state-side read of
3317    /// `subscribers_revision` / `subscribers` happens before the
3318    /// `with_wave_state` block opens, then the WaveState borrow
3319    /// performs the entry insertion / append. State lock + WaveState
3320    /// borrow remain independent.
3321    ///
3322    /// Lock-discipline assumption: this read of `subscribers_revision`
3323    /// is safe because both the subscribe install path
3324    /// ([`crate::node::Core::subscribe`]) and `queue_notify` hold
3325    /// `CoreState`'s mutex when they bump / read the revision —
3326    /// concurrent subscribe/unsubscribe cannot interleave. **If
3327    /// `Core::subscribe` ever moves the sink-install lock-released
3328    /// (mirroring the lock-released drain refactor), the revision read
3329    /// here must re-validate post-borrow — otherwise a fresh batch
3330    /// could open with a stale snapshot.**
3331    fn push_into_pending_notify(s: &mut CoreState, node_id: NodeId, msg: Message) {
3332        let current_rev = s.require_node(node_id).subscribers_revision;
3333        let needs_new_batch = with_wave_state(|ws| {
3334            ws.pending_notify.get(&node_id).is_none_or(|entry| {
3335                entry
3336                    .batches
3337                    .last()
3338                    .is_none_or(|b| b.snapshot_revision != current_rev)
3339            })
3340        });
3341        let sinks_snapshot: SmallVec<[Sink; 1]> = if needs_new_batch {
3342            s.require_node(node_id)
3343                .subscribers
3344                .values()
3345                .cloned()
3346                .collect()
3347        } else {
3348            SmallVec::new()
3349        };
3350        with_wave_state(|ws| match ws.pending_notify.entry(node_id) {
3351            Entry::Vacant(slot) => {
3352                let mut batches: SmallVec<[PendingBatch; 1]> = SmallVec::new();
3353                batches.push(PendingBatch {
3354                    snapshot_revision: current_rev,
3355                    sinks: sinks_snapshot,
3356                    messages: smallvec::smallvec![msg],
3357                });
3358                slot.insert(PendingPerNode { batches });
3359            }
3360            Entry::Occupied(mut slot) => {
3361                let entry = slot.get_mut();
3362                if needs_new_batch {
3363                    entry.batches.push(PendingBatch {
3364                        snapshot_revision: current_rev,
3365                        sinks: sinks_snapshot,
3366                        messages: smallvec::smallvec![msg],
3367                    });
3368                } else {
3369                    entry
3370                        .batches
3371                        .last_mut()
3372                        .expect("non-empty by construction (entry exists implies batch exists)")
3373                        .messages
3374                        .push(msg);
3375                }
3376            }
3377        });
3378    }
3379
3380    /// Collect wave-end sink-fire jobs into `ws.deferred_flush_jobs` and the
3381    /// payload-handle releases owed for `pending_notify` into
3382    /// `ws.deferred_handle_releases`. The actual sink fires + handle releases
3383    /// run **after** the state lock is dropped — see [`Core::run_wave`].
3384    ///
3385    /// R1.3.1.b two-phase propagation: phase 1 (DIRTY) propagates through
3386    /// the entire graph before phase 2 (DATA / RESOLVED) begins. Implemented
3387    /// here as cross-node tier-then-node collect — phase 1's jobs sit before
3388    /// phase 2's in `deferred_flush_jobs`, so when `run_wave` drains the
3389    /// queue lock-released, multi-node subscribers see all DIRTYs before any
3390    /// settle. Matches TS's drainPhase model without the per-tier queue
3391    /// indirection.
3392    ///
3393    /// Phase ordering:
3394    ///   1 → tier 1   (DIRTY)
3395    ///   2 → tier 3+4 (DATA/RESOLVED + INVALIDATE — the "settle slice")
3396    ///   3 → tier 5   (COMPLETE/ERROR)
3397    ///   4 → tier 6   (TEARDOWN)
3398    ///
3399    /// Tier 0 (START) is per-subscription (never enters pending_notify) and
3400    /// tier 2 (PAUSE/RESUME) is delivered through dedicated paths, also
3401    /// bypassing pending_notify; both are absent from this enumeration.
3402    ///
3403    /// Within a single phase, per-node insertion order (IndexMap iteration)
3404    /// is preserved — an emit on A before B → A's phase-2 messages flush
3405    /// before B's. Within a single node, message order is preserved.
3406    fn flush_notifications(&self, s: &mut CoreState) {
3407        const PHASES: &[&[u8]] = &[
3408            &[1],    // DIRTY
3409            &[3, 4], // DATA/RESOLVED + INVALIDATE
3410            &[5],    // COMPLETE/ERROR
3411            &[6],    // TEARDOWN
3412        ];
3413        // Q-beyond Sub-slice 1 + 2 + 3 (D108, 2026-05-09): pending_notify,
3414        // deferred_handle_releases, and deferred_flush_jobs all live on
3415        // per-thread WaveState. Take pending_notify under the WaveState
3416        // borrow, drop the borrow, run the per-phase loop (no WaveState
3417        // access in the loop body), then re-borrow WaveState at the end
3418        // to push the collected jobs and payload-handle releases.
3419        //
3420        // /qa F7 (2026-05-10): the `s: &mut CoreState` parameter is
3421        // currently unused inside the per-phase loop — `pending` was
3422        // moved off `s` to WaveState by sub-slice 2, and the per-batch
3423        // sink snapshot is already on the PendingBatch. Kept as a
3424        // parameter to preserve the caller's `let mut s = lock_state();
3425        // self.flush_notifications(&mut s);` invocation shape (caller
3426        // holds the state lock around this call — load-bearing for
3427        // R1.3.5.a per-tier handshake-vs-flush ordering). NOT a "lock
3428        // released" marker; the lock guard belongs to the caller and
3429        // is held throughout this function. A future change that adds
3430        // an in-loop state read should remove the discard below;
3431        // removing the parameter would break the caller's ability to
3432        // express the lock-discipline contract at the call site.
3433        let _ = &*s; // explicit no-op acknowledgement; lock held by caller.
3434                     // D217-AMEND-2 (2026-05-16): recycle the per-wave `pending_notify`
3435                     // map instead of `mem::take`-ing it. `mem::take` installed a
3436                     // fresh `IndexMap::default()` every wave → a new `ahash::
3437                     // RandomState` (entropy via `gen_hasher_seed`/`from_keys`) PLUS
3438                     // RawVec realloc churn (`finish_grow`/`grow_one`) on the next
3439                     // wave's `queue_notify`. Empirical attribution
3440                     // (`examples/profile_st_emit.rs` + macOS `sample`): ~1250 of
3441                     // ~4767 hot-path samples — the dominant §7 floor tax (D217
3442                     // lever-1 "slab store" falsified; `require_node` was minor).
3443                     // Fix: swap the live map with a persistent spare so the next
3444                     // wave fills a capacity-retained, fixed-seed map; process this
3445                     // wave's full map from the spare slot and `.clear()` it (retain
3446                     // capacity + hasher; no drop, no realloc, no reseed). Zero
3447                     // `IndexMap::default()` after thread init.
3448                     //
3449                     // The jobs-building loop now runs INSIDE the single WaveState
3450                     // borrow. This is sound: the loop is pure (iteration +
3451                     // `Arc::clone` + `Vec` collect; no re-entrant `with_wave_state`
3452                     // / `lock_state`), and the caller holds the CoreState lock
3453                     // throughout regardless — so R1.3.5.a per-tier
3454                     // handshake-vs-flush ordering is unchanged. Refcount discipline
3455                     // is unchanged: payload-handle releases are still collected into
3456                     // `deferred_handle_releases` (released post-lock-drop by
3457                     // `BatchGuard::drop`); `.clear()` runs the same element drops as
3458                     // the old map-drop and the `PendingPerNode`/`Message` payloads
3459                     // carry no refcount-releasing `Drop`.
3460        with_wave_state(|ws| {
3461            core::mem::swap(&mut ws.pending_notify, &mut ws.pending_notify_recycle);
3462            // ws.pending_notify         = empty spare (next wave fills it)
3463            // ws.pending_notify_recycle = THIS wave's full map (below)
3464            let mut jobs: DeferredJobs = Vec::new();
3465            let mut releases: Vec<HandleId> = Vec::new();
3466            {
3467                let full = &ws.pending_notify_recycle;
3468                for &phase_tiers in PHASES {
3469                    for (_node_id, entry) in full {
3470                        // Slice X4 / D2: iterate batches in arrival
3471                        // order. Each batch carries its own sink
3472                        // snapshot frozen at open-time; a batch's
3473                        // messages flush to ITS sinks only. Within a
3474                        // single (phase, node), batches stay in arrival
3475                        // order so emit-order semantics are preserved.
3476                        for batch in &entry.batches {
3477                            if batch.sinks.is_empty() {
3478                                continue;
3479                            }
3480                            let phase_msgs: Vec<Message> = batch
3481                                .messages
3482                                .iter()
3483                                .copied()
3484                                .filter(|m| phase_tiers.contains(&m.tier()))
3485                                .collect();
3486                            if phase_msgs.is_empty() {
3487                                continue;
3488                            }
3489                            let sinks_clone: Vec<Sink> =
3490                                batch.sinks.iter().map(Rc::clone).collect();
3491                            jobs.push((sinks_clone, phase_msgs));
3492                        }
3493                    }
3494                }
3495                // Refcount release balances the retain done in
3496                // `queue_notify` for every payload-bearing message that
3497                // landed in pending_notify (across ALL batches per
3498                // node); deferred to post-lock-drop so the binding's
3499                // release path can't re-enter Core under our lock.
3500                for entry in full.values() {
3501                    for msg in entry.iter_messages() {
3502                        if let Some(h) = msg.payload_handle() {
3503                            releases.push(h);
3504                        }
3505                    }
3506                }
3507            }
3508            ws.deferred_flush_jobs.append(&mut jobs);
3509            ws.deferred_handle_releases.append(&mut releases);
3510            // Retain capacity + the existing ahash seed for next wave's
3511            // swap-in. No drop, no realloc, no reseed.
3512            ws.pending_notify_recycle.clear();
3513        });
3514    }
3515
3516    /// Take the deferred sink-fire jobs, payload-handle releases,
3517    /// cleanup-hook fire queue, and pending-wipe queue from `WaveState`.
3518    /// Callers pair this with `drop(state_guard)` and a subsequent
3519    /// [`Self::fire_deferred`] call to deliver the wave's sinks, handle
3520    /// releases, Slice E2 OnInvalidate cleanup hooks, and Slice E2 /qa
3521    /// Q2(b) eager wipe_ctx fires lock-released.
3522    ///
3523    /// Q-beyond Sub-slice 1 (D108, 2026-05-09): `deferred_handle_releases`
3524    /// source moved to per-thread WaveState — signature takes `&mut WaveState`.
3525    /// Q-beyond Sub-slice 3 (D108, 2026-05-09): `deferred_flush_jobs`,
3526    /// `deferred_cleanup_hooks`, and `pending_wipes` all moved to
3527    /// WaveState. The `_s: &mut CoreState` parameter is now unused but
3528    /// kept to preserve the call-site lock-discipline ordering (caller
3529    /// holds the state lock around this call to interleave with prior
3530    /// `clear_wave_state` per-NodeRecord work).
3531    pub(crate) fn drain_deferred(_s: &mut CoreState, ws: &mut WaveState) -> WaveDeferred {
3532        (
3533            std::mem::take(&mut ws.deferred_flush_jobs),
3534            std::mem::take(&mut ws.deferred_handle_releases),
3535            std::mem::take(&mut ws.deferred_cleanup_hooks),
3536            std::mem::take(&mut ws.pending_wipes),
3537        )
3538    }
3539
3540    /// Fire deferred sink-fire jobs in collected order, then release the
3541    /// payload handles owed for messages that landed in `pending_notify`
3542    /// during the wave, then fire any queued Slice E2 OnInvalidate cleanup
3543    /// hooks. All three phases run lock-released so:
3544    /// - Sinks that call back into Core (emit, pause, etc.) re-acquire the
3545    ///   state lock cleanly and run their own nested wave.
3546    /// - The binding's `release_handle` path can't deadlock against a
3547    ///   binding-side mutex held by Core.
3548    /// - User cleanup closures (invoked via `BindingBoundary::cleanup_for`)
3549    ///   may safely re-enter Core for unrelated nodes.
3550    ///
3551    /// **Cleanup-drain panic discipline (D060):** each `cleanup_for` call
3552    /// is wrapped in `catch_unwind` so a single binding panic doesn't
3553    /// short-circuit the per-wave drain. All queued cleanup attempts run;
3554    /// if any panicked, the LAST panic re-raises after the loop completes
3555    /// (preserving wave-end discipline while still surfacing failures).
3556    /// Per D060, Core stays panic-naive about user code — bindings own
3557    /// their host-language panic policy inside `cleanup_for`; this
3558    /// `catch_unwind` is purely about drain-don't-short-circuit.
3559    pub(crate) fn fire_deferred(
3560        &self,
3561        jobs: DeferredJobs,
3562        releases: Vec<HandleId>,
3563        cleanup_hooks: Vec<(crate::handle::NodeId, crate::boundary::CleanupTrigger)>,
3564        pending_wipes: Vec<crate::handle::NodeId>,
3565    ) {
3566        // Slice E2 /qa P1 (2026-05-07): wrap each sink-fire in
3567        // `catch_unwind` so a panicking sink doesn't unwind out of
3568        // `fire_deferred` and drop the queued `releases` +
3569        // `cleanup_hooks`. Mirrors Slice F audit fix A7's per-tier
3570        // handshake-fire discipline. Without this guard, a sink panic
3571        // here would silently leak handle retains AND silently drop
3572        // OnInvalidate cleanup hooks. AssertUnwindSafe is safe because
3573        // we re-raise the last panic at the end after running every
3574        // queued fire — drain ordering is preserved.
3575        let mut last_panic: Option<Box<dyn std::any::Any + Send>> = None;
3576        for (sinks, msgs) in jobs {
3577            for sink in &sinks {
3578                let sink = sink.clone();
3579                let msgs_ref = &msgs;
3580                let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
3581                    sink(msgs_ref);
3582                }));
3583                if let Err(payload) = result {
3584                    last_panic = Some(payload);
3585                }
3586            }
3587        }
3588        for h in releases {
3589            self.binding.release_handle(h);
3590        }
3591        // Slice E2 (D060): drain cleanup hooks with per-item panic
3592        // isolation so the loop always completes. AssertUnwindSafe is
3593        // safe here because we don't rely on logical state being valid
3594        // post-panic — the panic propagates anyway after the drain ends.
3595        for (node_id, trigger) in cleanup_hooks {
3596            let binding = &self.binding;
3597            let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
3598                binding.cleanup_for(node_id, trigger);
3599            }));
3600            if let Err(payload) = result {
3601                last_panic = Some(payload);
3602            }
3603        }
3604        // Slice E2 /qa Q2(b) (D069): drain eager wipe_ctx queue with the
3605        // same per-item panic isolation. Fires AFTER cleanup hooks so a
3606        // resubscribable node's OnInvalidate (or any tier-3+ cleanup that
3607        // fires in the same wave) sees pre-wipe binding state if it
3608        // landed in the same wave as the terminal cascade. Mutually
3609        // exclusive with `Subscription::Drop`'s direct-fire site, but
3610        // even concurrent fires are idempotent (binding's `wipe_ctx`
3611        // calls `HashMap::remove` which is a no-op on absent keys).
3612        for node_id in pending_wipes {
3613            let binding = &self.binding;
3614            let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
3615                binding.wipe_ctx(node_id);
3616            }));
3617            if let Err(payload) = result {
3618                last_panic = Some(payload);
3619            }
3620        }
3621        if let Some(payload) = last_panic {
3622            std::panic::resume_unwind(payload);
3623        }
3624    }
3625
3626    // -------------------------------------------------------------------
3627    // User-facing batch — coalesce multiple emits into one wave
3628    // -------------------------------------------------------------------
3629
3630    /// Coalesce multiple emissions into a single wave. Every `emit` /
3631    /// `complete` / `error` / `teardown` / `invalidate` call inside `f`
3632    /// queues its downstream work; the wave drains when `f` returns.
3633    ///
3634    /// **R1.3.6.a** — DIRTY still propagates immediately (tier 1 isn't
3635    /// deferred); only tier-3+ delivery is held until scope exit. **R1.3.6.b**
3636    /// — repeated emits on the same node coalesce into a single multi-message
3637    /// delivery (one [`Message::Dirty`] for the wave + one [`Message::Data`]
3638    /// per emit, all delivered together in the per-node phase-2 pass).
3639    ///
3640    /// Nested `batch()` calls share the outer wave; only the outermost call
3641    /// drives the drain. Re-entrant calls from inside an `emit`/fn (the wave
3642    /// engine's own `in_tick` re-entrance) compose with this method
3643    /// transparently — they observe `in_tick = true` and skip drain just
3644    /// like nested `batch()`.
3645    ///
3646    /// On panic inside `f`, the `BatchGuard` returned by the internal
3647    /// `begin_batch` call drops normally and discards pending tier-3+ work
3648    /// (subscribers do not observe the half-built wave). See
3649    /// [`Core::begin_batch`] for the RAII variant if you need explicit control
3650    /// over the scope boundary.
3651    pub fn batch<F>(&self, f: F)
3652    where
3653        F: FnOnce(),
3654    {
3655        let _guard = self.begin_batch();
3656        f();
3657    }
3658
3659    /// RAII batch handle — opens a wave when constructed, drains on drop.
3660    ///
3661    /// Mirrors the closure-based [`Self::batch`] but exposes the scope
3662    /// boundary so callers can compose batches with non-`FnOnce` control
3663    /// flow (e.g. async-state-machine code paths, or splitting setup and
3664    /// drain across helper functions).
3665    ///
3666    /// ```
3667    /// use graphrefly_core::{Core, BindingBoundary, NodeRegistration, NodeOpts,
3668    ///     HandleId, NodeId, FnId, FnResult, DepBatch};
3669    /// use std::sync::Arc;
3670    ///
3671    /// struct Stub;
3672    /// impl BindingBoundary for Stub {
3673    ///     fn invoke_fn(&self, _: NodeId, _: FnId, _: &[DepBatch]) -> FnResult {
3674    ///         FnResult::Noop { tracked: None }
3675    ///     }
3676    ///     fn custom_equals(&self, _: FnId, _: HandleId, _: HandleId) -> bool { false }
3677    ///     fn release_handle(&self, _: HandleId) {}
3678    /// }
3679    ///
3680    /// let core = Core::new(Arc::new(Stub) as Arc<dyn BindingBoundary>);
3681    /// let state_a = core.register(NodeRegistration {
3682    ///     deps: vec![], fn_or_op: None,
3683    ///     opts: NodeOpts { initial: HandleId::new(1), ..Default::default() },
3684    /// }).unwrap();
3685    /// let state_b = core.register(NodeRegistration {
3686    ///     deps: vec![], fn_or_op: None,
3687    ///     opts: NodeOpts { initial: HandleId::new(2), ..Default::default() },
3688    /// }).unwrap();
3689    ///
3690    /// let g = core.begin_batch();
3691    /// core.emit(state_a, HandleId::new(10));
3692    /// core.emit(state_b, HandleId::new(20));
3693    /// drop(g); // wave drains here
3694    /// ```
3695    ///
3696    /// Like the closure form, nested `begin_batch` calls share the outer
3697    /// wave (only the outermost guard drains).
3698    ///
3699    #[must_use = "BatchGuard drains the wave on drop; assign to a named binding"]
3700    pub fn begin_batch(&self) -> BatchGuard<'_> {
3701        // D246/S2c: single-owner ⇒ a `Core` is driven by exactly one
3702        // thread, so there is no cross-thread wave serialization to
3703        // acquire (the §7 group/global wave-locks are deleted) and no
3704        // shard to route to (single shard always).
3705        self.begin_batch_with_guards()
3706    }
3707
3708    /// Begin a batch for a wave seeded at `seed`. Historically this
3709    /// acquired the per-partition `wave_owner` `ReentrantMutex`es for
3710    /// every partition transitively touched from `seed` (the Slice Y1
3711    /// parallelism win). **S2c/D248 deleted that machinery:** `Core` is
3712    /// single-owner `!Send + !Sync`, so a wave is one uninterrupted
3713    /// owner-side drain with nothing to lock. This now delegates
3714    /// directly to [`Self::begin_batch_with_guards`] (the all-`None`
3715    /// single-owner floor acquires nothing); the `seed` parameter is
3716    /// retained for the declared-group identity routing only.
3717    /// Cross-`Core` parallelism is host-native via independent
3718    /// per-worker Cores (actor model), not per-partition locks. D274
3719    /// (2026-05-21) deleted the always-`Ok` `try_begin_batch_for`
3720    /// shim that this used to delegate through.
3721    ///
3722    /// Slice Y1 / Phase E (2026-05-08); infallible-since S2c (D246);
3723    /// shim-collapsed by D274 (2026-05-21).
3724    #[must_use = "BatchGuard drains the wave on drop; assign to a named binding"]
3725    pub fn begin_batch_for(&self, seed: crate::handle::NodeId) -> BatchGuard<'_> {
3726        // D246/S2c: single-owner ⇒ no cross-thread wave serialization
3727        // and no shard routing. `seed` is unused now — the §7 touched-
3728        // group walk is deleted. D274 (2026-05-21): direct call to
3729        // `begin_batch_with_guards`; the always-`Ok` `try_begin_batch_for`
3730        // shim was deleted.
3731        let _ = seed;
3732        self.begin_batch_with_guards()
3733    }
3734
3735    // D274 (2026-05-21): `try_begin_batch_for` was deleted (was an
3736    // always-`Ok` shim). `begin_batch_for` calls `begin_batch_with_guards`
3737    // directly now.
3738
3739    /// Is this thread currently inside an owning wave on this Core?
3740    /// Reads the one-Core-per-thread [`IN_TICK_OWNED`] slot (D252) —
3741    /// `true` iff the slot holds this Core's `generation`. Read on the
3742    /// wave-owner thread (e.g. by `commit_emission` to decide cache-
3743    /// snapshot taking). `#[must_use]`: a discarded result silently
3744    /// loses the ownership/nesting decision (a classic predicate-misuse
3745    /// bug).
3746    #[must_use]
3747    pub(crate) fn in_tick(&self) -> bool {
3748        IN_TICK_OWNED.with(|s| s.get() == self.generation)
3749    }
3750
3751    /// Claim wave ownership for this Core on this thread (D252 "one
3752    /// Core per OS thread" hard invariant). Returns `true` iff this call
3753    /// is the outermost entry (slot was `0`) — i.e. `owns_tick`; `false`
3754    /// for nested same-Core re-entry (slot already holds our
3755    /// `generation`). **Panics fail-loud** if the slot holds any *other*
3756    /// nonzero generation — that is the D252-forbidden cross-Core
3757    /// owner-side nesting on a single thread (one OS thread mid-wave on
3758    /// Core-A entering a wave on Core-B). Under D248 single-owner Core
3759    /// the only call path that could produce it is a `DeferFn` capturing
3760    /// & driving a *second* `&Core`; no in-tree consumer does this, and
3761    /// adding one would violate the actor-model "one worker = one Core"
3762    /// framing — surfaced loudly here rather than silently masking the
3763    /// foreign Core's ownership.
3764    fn claim_in_tick(&self) -> bool {
3765        IN_TICK_OWNED.with(|s| {
3766            let cur = s.get();
3767            if cur == 0 {
3768                s.set(self.generation);
3769                true
3770            } else if cur == self.generation {
3771                false
3772            } else {
3773                // D252 hard invariant: one Core per OS thread. A nonzero
3774                // mismatch means a foreign Core's wave is live on this
3775                // thread — structurally forbidden under D248 single-
3776                // owner (a `DeferFn` driving a second `&Core` on the
3777                // same owner thread). Panic before nesting silently.
3778                //
3779                // Audience: **Rust-dev clarity only.** The
3780                // `@graphrefly/native` JS consumer does NOT see this
3781                // text — `core_actor.rs` M1's `catch_unwind(...)`
3782                // swallows the panic value before it crosses napi (JS
3783                // callers observe a sync_channel disconnect). The
3784                // polished message reaches Rust's panic hook → stderr
3785                // only. The BenchCore line below is preserved as a
3786                // hint for future readers of the panic-hook log; a
3787                // JS-friendly framing is aspirational pending a
3788                // panic→JS bridge slice (NOT committed — see
3789                // porting-deferred §panic-bridge). D258 (S7,
3790                // 2026-05-20) / D262/P4 amend / AMEND-D 2026-05-21 /
3791                // D277 (2026-05-22) text-shape consolidation.
3792                panic!(
3793                    "GraphReFly invariant violated — cross-Core wave nesting \
3794                     on a single OS thread (this Core's generation {self_gen}, \
3795                     observed foreign generation {cur}). One Core per OS \
3796                     thread (D248/D252). \
3797                     \n\
3798                     Rust callers: a `DeferFn` or owner-side seam appears to \
3799                     be driving a *second* `&Core` mid-wave; the substrate \
3800                     does not support cross-Core same-thread nesting. \
3801                     (BenchCore: each instance MUST own its dedicated actor \
3802                     thread — the S6/D255 invariant.) \
3803                     \n\
3804                     Internal reference: `docs/rust-port-decisions.md` \
3805                     D248/D252/D255/D258.",
3806                    self_gen = self.generation,
3807                );
3808            }
3809        })
3810    }
3811
3812    /// Release wave ownership for this Core on this thread. Called by the
3813    /// owning [`BatchGuard::drop`] only — after the `!owns_tick`
3814    /// early-return, so a nested guard never releases — explicitly at
3815    /// each of the three exit points, always AFTER the wave drain +
3816    /// WaveState cleanup and BEFORE `fire_deferred` (so a re-entrant sink
3817    /// emit runs as a fresh owning wave): (1) the closure-body-panic
3818    /// branch, (2) the drain-phase-panic `catch_unwind` arm (before
3819    /// `resume_unwind`), (3) the success path's locked cleanup block.
3820    /// Clears the [`IN_TICK_OWNED`] slot back to `0` (D252). Released
3821    /// exactly once per (Core, wave) on this thread; idempotent (a
3822    /// double-clear of `0` is a no-op).
3823    fn clear_in_tick(&self) {
3824        IN_TICK_OWNED.with(|s| {
3825            debug_assert!(
3826                s.get() == 0 || s.get() == self.generation,
3827                "BatchGuard::clear_in_tick: slot holds foreign \
3828                 generation {observed} (this Core: {self_gen}) — \
3829                 D252 invariant violated",
3830                observed = s.get(),
3831                self_gen = self.generation,
3832            );
3833            s.set(0);
3834        });
3835    }
3836
3837    /// Internal helper: claim `in_tick` and assemble a [`BatchGuard`].
3838    /// D246/S2c: single-owner ⇒ no wave-owner / shard guards to carry.
3839    fn begin_batch_with_guards(&self) -> BatchGuard<'_> {
3840        // Claim wave ownership in the one-Core-per-OS-thread
3841        // [`IN_TICK_OWNED`] slot (D252; see its doc for the same-Core
3842        // nested-re-entry semantics and the cross-Core panic invariant)
3843        // — no state lock needed, since `in_tick` has no cross-thread
3844        // read requirement.
3845        let owns_tick = self.claim_in_tick();
3846        // D1 patch (2026-05-09): defensive wave-start clear of the
3847        // per-thread Slice G tier3 tracker on outermost owning entry.
3848        // The thread-local is cleared at outermost BatchGuard drop on
3849        // both success + panic paths; this start-clear is belt-and-
3850        // suspenders against panic paths that bypass Drop (catch_unwind
3851        // can interleave with thread reuse — e.g. cargo's test-runner
3852        // thread pool — and propagate stale entries from a prior
3853        // panicked test's wave that didn't fully unwind through
3854        // BatchGuard::drop).
3855        if owns_tick {
3856            tier3_clear();
3857            // Q-beyond Sub-slice 1 (D108, 2026-05-09): defensive wave-start
3858            // clear of WaveState's non-retain-holding fields. Mirrors the
3859            // tier3 defensive-clear above. Retain-holding fields
3860            // (wave_cache_snapshots / deferred_handle_releases) MUST be
3861            // empty here — outermost BatchGuard::drop drains them on both
3862            // success + panic paths.
3863            wave_state_clear_outermost();
3864        }
3865        BatchGuard {
3866            core: self,
3867            owns_tick,
3868            _not_send: std::marker::PhantomData,
3869        }
3870    }
3871}
3872
3873/// RAII guard returned by [`Core::begin_batch`].
3874///
3875/// While alive, suppresses per-emit wave drains — multiple `emit` /
3876/// `complete` / `error` / `teardown` / `invalidate` calls coalesce into one
3877/// wave. On drop:
3878/// - Outermost guard: drains the wave (fires sinks, runs cleanup, clears
3879///   in-tick).
3880/// - Nested guard (an outer `BatchGuard` or an in-progress wave already owns
3881///   the in-tick flag): silently no-ops.
3882///
3883/// On thread panic during the closure body, the drop path discards pending
3884/// tier-3+ delivery rather than firing sinks (avoids cascading panics).
3885/// Subscribers observe **no tier-3+ delivery for the panicked wave**.
3886/// State-node cache writes that already executed inside the closure are
3887/// rolled back via wave-cache snapshots — `cache_of(s)` returns the pre-
3888/// panic value. The atomicity guarantee covers both sink-observability and
3889/// cache state.
3890///
3891/// # Thread safety
3892///
3893/// `BatchGuard` is **`!Send`** by design. `begin_batch` claims the
3894/// one-Core-per-OS-thread `in_tick` ownership slot (D252) on the
3895/// calling thread; sending the guard to another thread and dropping it
3896/// there would clear `in_tick` against the wrong thread's slot,
3897/// breaking the "I own the wave scope" semantic. D246/S2c: single-owner
3898/// ⇒ the §7 per-partition `wave_owner` re-entrant mutex(es) are
3899/// deleted; `!Send` is now enforced solely by the
3900/// `PhantomData<*const ()>` marker.
3901///
3902/// ```compile_fail
3903/// use graphrefly_core::{BatchGuard, BindingBoundary, Core, DepBatch, FnId, FnResult, HandleId, NodeId};
3904/// use std::sync::Arc;
3905///
3906/// struct Stub;
3907/// impl BindingBoundary for Stub {
3908///     fn invoke_fn(&self, _: NodeId, _: FnId, _: &[DepBatch]) -> FnResult {
3909///         FnResult::Noop { tracked: None }
3910///     }
3911///     fn custom_equals(&self, _: FnId, _: HandleId, _: HandleId) -> bool { false }
3912///     fn release_handle(&self, _: HandleId) {}
3913/// }
3914/// fn requires_send<T: Send>(_: T) {}
3915/// let core = Core::new(Arc::new(Stub) as Arc<dyn BindingBoundary>);
3916/// let guard = core.begin_batch();
3917/// requires_send(guard); // <- compile_fail: BatchGuard is !Send.
3918/// ```
3919#[must_use = "BatchGuard drains the wave on drop; assign to a named binding"]
3920pub struct BatchGuard<'a> {
3921    // S2b (D223): borrows `&'a Core` — `Core` is no longer `Clone`
3922    // (owned-by-value, relocates between workers). The guard lives
3923    // entirely within the wave scope of the `&self` that produced it
3924    // (`begin_batch*` takes `&self`), so `self` strictly outlives the
3925    // guard — identical to S1's `FiringGuard<'a>`. `!Send` via
3926    // `_not_send`.
3927    core: &'a Core,
3928    owns_tick: bool,
3929    /// D246/S2c: single-owner ⇒ no per-partition wave-owner guards and
3930    /// no ambient shard-key guard (both were shared-Core-era §7
3931    /// machinery, now deleted). `BatchGuard` stays `!Send` purely via
3932    /// this `PhantomData<*const ()>` — a wave guard is wave-scoped to
3933    /// the one owner thread and must not cross threads.
3934    _not_send: std::marker::PhantomData<*const ()>,
3935}
3936
3937impl BatchGuard<'_> {
3938    /// Panic-discard cleanup for the owning guard: drop pending wave
3939    /// work, release queued payload + handle retains lock-released,
3940    /// restore pre-wave cache snapshots, clear per-thread `WaveState` +
3941    /// the Slice-G tier3 tracker, and discard deferred producer ops.
3942    ///
3943    /// Shared by BOTH panic origins so a drain-phase fn/sink panic gets
3944    /// the identical `BatchGuard` atomicity guarantee as a closure-body
3945    /// panic: (1) the `std::thread::panicking()` branch (panic propagated
3946    /// from the wave's *closure body* — drop runs during that unwind),
3947    /// and (2) the success-path `catch_unwind` around `drain_and_flush()`
3948    /// (a fn/sink panic that escaped the inner per-call `catch_unwind`
3949    /// isolation while drop was NOT already unwinding). /qa D047.
3950    ///
3951    /// Does NOT release `in_tick` — each `BatchGuard::drop` exit path
3952    /// calls `clear_in_tick()` explicitly, after this cleanup and before
3953    /// `fire_deferred` (so a re-entrant sink emit runs as a fresh owning
3954    /// wave).
3955    fn discard_wave_cleanup(&self) {
3956        let (
3957            pending,
3958            pending_recycle,
3959            deferred_releases,
3960            restored_releases,
3961            restored_terminal_releases,
3962        ) = {
3963            let mut s = self.core.lock_state();
3964            // WaveState borrowed alongside state for panic-discard
3965            // cleanup. The WaveState borrow is per-thread, independent of
3966            // state. `in_tick` is the one-Core-per-OS-thread
3967            // [`IN_TICK_OWNED`] slot (D252), released separately by the
3968            // explicit `clear_in_tick` on each
3969            // exit path; this method only drains/cleans the per-thread
3970            // WaveState retain-fields.
3971            with_wave_state(|ws| {
3972                let pending = std::mem::take(&mut ws.pending_notify);
3973                // D217-AMEND-2 / QA: `pending_notify_recycle` is the
3974                // ONE retain-capable WaveState field whose retains live
3975                // in a *persistent* slot (not an owned local that drops
3976                // on unwind). On the success path it is empty here
3977                // (`flush_notifications` cleared it). But if a wave
3978                // panic-discards mid-`flush_notifications` AFTER the
3979                // swap but before `.clear()`, this slot holds the full
3980                // wave's payload-retaining map while `pending_notify` is
3981                // the empty spare. Drain it symmetrically so the same
3982                // "every retain field released on the panic path"
3983                // invariant the sibling fields uphold also covers
3984                // recycle (closes the leak + the stale-entry-injected-
3985                // into-next-wave hazard; success path: this is a no-op
3986                // take of an already-empty map).
3987                let pending_recycle = std::mem::take(&mut ws.pending_notify_recycle);
3988                let _: DeferredJobs = std::mem::take(&mut ws.deferred_flush_jobs);
3989                ws.pending_fires.clear();
3990                let restored = self.core.restore_wave_cache_snapshots(&mut s, ws);
3991                // D291: restore terminal slots that transitioned during
3992                // this wave (closes Case 5 — R4.3.2 status-snapshot
3993                // completeness). Returns ERROR-tier handles for
3994                // lock-released release; the terminal slots own those
3995                // retains pre-restore, so restore_terminal transfers
3996                // ownership into this vec.
3997                let restored_terminal = self.core.restore_wave_terminal_snapshots(&mut s, ws);
3998                // D297: roll back `has_received_teardown` flags set during
3999                // this wave so a retry of `teardown(node)` post-rollback
4000                // re-runs the auto-COMPLETE prepend + queue Teardown path.
4001                // The set holds no handles; nothing to release lock-released
4002                // (the `Teardown` wire message — no payload — was already
4003                // cleared by the `pending_notify` drop above). Called
4004                // alongside `restore_wave_terminal_snapshots` since both
4005                // close R4.3.2 atomicity gaps for `teardown_inner`'s state
4006                // mutations.
4007                self.core.restore_wave_teardown_snapshots(&mut s, ws);
4008                // clear_wave_state pushes batch-handle releases into
4009                // ws.deferred_handle_releases, so take ws's queue AFTER
4010                // the clear.
4011                s.clear_wave_state(ws);
4012                // Step 2a (D220-EXEC): defensive `currently_firing`
4013                // clear, relocated here from `CoreState::clear_wave_state`
4014                // (the field moved to the separate `CoreShared` region;
4015                // `St`'s `.shared` reaches it, a `&mut CoreState` can't).
4016                // Same wave-end point; `FiringGuard` RAII already
4017                // balances push/pop — this is the belt-and-suspenders
4018                // net for a future guard-bypassing path.
4019                s.shared.currently_firing.clear();
4020                ws.clear_wave_state();
4021                let deferred_releases = std::mem::take(&mut ws.deferred_handle_releases);
4022                // Slice E2 (D061): panic-discard wave drops queued
4023                // OnInvalidate cleanup hooks SILENTLY. Bindings using
4024                // OnInvalidate for external-resource cleanup MUST
4025                // idempotent-cleanup at process exit / next successful
4026                // invalidate. Mirrors A3 `pending_pause_overflow`
4027                // panic-discard precedent.
4028                let _: Vec<(crate::handle::NodeId, crate::boundary::CleanupTrigger)> =
4029                    std::mem::take(&mut ws.deferred_cleanup_hooks);
4030                // Slice E2 /qa Q2(b) (D069): same panic-discard discipline
4031                // for the eager-wipe queue. A panic-discarded wave drops
4032                // queued `wipe_ctx` fires silently; the binding-side
4033                // `NodeCtxState` entry remains across the rolled-back
4034                // batch.
4035                //
4036                // **D296 (2026-05-26) — dropping the queue here is the
4037                // CORRECT behavior post-D291, NOT a leak.** When a
4038                // batch panics:
4039                //
4040                // 1. `restore_wave_terminal_snapshots` (above, D291)
4041                //    resets `rec.terminal = None` — the lifecycle did
4042                //    NOT end; the terminal was rolled back.
4043                // 2. Per **R2.4.6**, `wipe_ctx` fires "on resubscribable
4044                //    terminal RESET" — when a terminal lifecycle
4045                //    COMPLETES, not when one was attempted-and-aborted.
4046                // 3. Per **R4.3.2** atomicity (post-rollback observable
4047                //    state ≡ pre-batch observable state), the binding's
4048                //    `NodeCtxState` entry is the pre-batch lifecycle
4049                //    that CONTINUES post-rollback — firing `wipe_ctx`
4050                //    here would violate the cross-side R4.3.2 contract.
4051                //
4052                // The D291 deferred-scope's original framing of this as
4053                // a D061-style leak was reframed by the D296 HALT
4054                // premise check; the existing `mem::take` was already
4055                // honest under R2.4.6 + R4.3.2. D296 ships
4056                // [`tests/d296_pending_wipes_rollback.rs`] as a
4057                // regression pin so a future refactor of
4058                // `Core::terminate_node` / this discard path /
4059                // `wave_terminal_snapshots` can't silently break the
4060                // wipe-NOT-fired + ctx-preserved symmetry. See also
4061                // [`crates/graphrefly-core/src/boundary.rs`]
4062                // `BindingBoundary::wipe_ctx` (R2.4.6 contract).
4063                let _: Vec<crate::handle::NodeId> = std::mem::take(&mut ws.pending_wipes);
4064                (
4065                    pending,
4066                    pending_recycle,
4067                    deferred_releases,
4068                    restored,
4069                    restored_terminal,
4070                )
4071            })
4072        };
4073        // Lock dropped — release retains lock-released so the binding
4074        // can't deadlock against an internal binding mutex.
4075        for entry in pending.values() {
4076            for msg in entry.iter_messages() {
4077                if let Some(h) = msg.payload_handle() {
4078                    self.core.binding.release_handle(h);
4079                }
4080            }
4081        }
4082        // Symmetric with the `pending` loop above (D217-AMEND-2 / QA):
4083        // releases the full-map retains stranded in the recycle slot by
4084        // a panic between `flush_notifications`'s swap and clear. Empty
4085        // (no-op) on every non-panic path.
4086        for entry in pending_recycle.values() {
4087            for msg in entry.iter_messages() {
4088                if let Some(h) = msg.payload_handle() {
4089                    self.core.binding.release_handle(h);
4090                }
4091            }
4092        }
4093        for h in deferred_releases {
4094            self.core.binding.release_handle(h);
4095        }
4096        for h in restored_releases {
4097            self.core.binding.release_handle(h);
4098        }
4099        // D291: release ERROR-tier handles transferred out of restored
4100        // terminal slots. Same lock-released discipline as
4101        // `restored_releases` above.
4102        for h in restored_terminal_releases {
4103            self.core.binding.release_handle(h);
4104        }
4105        // D1 patch (2026-05-09): clear the per-thread Slice G tier3
4106        // tracker on outermost wave-end (panic-discard path). The
4107        // thread-local outlives the BatchGuard otherwise — cargo's
4108        // thread reuse across tests would propagate stale entries.
4109        tier3_clear();
4110        // §7 (D208–D211): the panic-path "discard deferred producer
4111        // ops" block is DELETED. There is no `deferred_producer_ops`
4112        // queue — producer ops execute immediately
4113        // (`Core::push_deferred_producer_op`), so on a panic-discard
4114        // there is nothing queued to release/drop.
4115    }
4116}
4117
4118/// D260: cap on the outer wave-end drain-to-quiescence loop. Realistic
4119/// legitimate cascades stay ≤3 passes; 32 gives 10× headroom and
4120/// panics loudly on a real emit-loop. See [`Drop for BatchGuard`] body.
4121const D260_MAX_REDRAIN_PASSES: u32 = 32;
4122
4123impl Drop for BatchGuard<'_> {
4124    fn drop(&mut self) {
4125        if !self.owns_tick {
4126            // Nested / non-owning guard: never claimed ownership, so it
4127            // must never release it. The owning guard's RAII releaser
4128            // (below) is the single clear site.
4129            return;
4130        }
4131        // Wave-ownership (`in_tick`) release discipline. `clear_in_tick`
4132        // must run AFTER the wave's drain + WaveState cleanup but BEFORE
4133        // `fire_deferred` (sinks), on every exit path:
4134        //
4135        // - **Before `fire_deferred` (load-bearing):** a sink re-entering
4136        //   `Core::emit` / `complete` from a flush callback must run as a
4137        //   fresh OWNING wave (so its own emissions drain + deliver). If
4138        //   `in_tick` were still owned during `fire_deferred`, that
4139        //   re-entrant emit would be a non-owning no-op and its data
4140        //   silently lost (regression caught by
4141        //   `lock_discipline::sink_can_reenter_core_via_emit`). This is
4142        //   why each path clears explicitly at the right point — NOT via
4143        //   an end-of-`drop` RAII guard (which would clear *after*
4144        //   `fire_deferred`).
4145        // - **/qa hardening (D047):** a fn/sink panic in the drain phase
4146        //   can escape the per-call `catch_unwind` isolation (e.g. a
4147        //   derived fn panicking when fired). Drop is NOT already
4148        //   unwinding, so it would otherwise skip BOTH the WaveState
4149        //   drain (→ next owning wave trips `wave_state_clear_outermost`)
4150        //   AND the `in_tick` clear (pre-D047 the explicit clear had this
4151        //   same window). Catching the drain panic, running the shared
4152        //   discard cleanup + `clear_in_tick`, then `resume_unwind` gives
4153        //   a drain-phase panic the identical atomicity as a
4154        //   closure-body panic.
4155        if std::thread::panicking() {
4156            // Closure-body panic — drop runs during that unwind. Discard
4157            // pending wave work (don't fire sinks mid-unwind — a sink
4158            // panic then aborts the process), release queued retains,
4159            // restore caches, then release ownership.
4160            self.discard_wave_cleanup();
4161            self.core.clear_in_tick();
4162            return;
4163        }
4164        // D260 / S7 (2026-05-20): wave-end drain-to-quiescence past
4165        // `fire_deferred`. The S2b/D232-AMEND contract is "the drain
4166        // loop applies mailbox/DeferQueue ops **in-wave, immediately**";
4167        // pre-D260 the BatchGuard ended after one `fire_deferred` pass,
4168        // so posts made by sinks firing **inside** `fire_deferred`
4169        // (post-`drain_and_flush`-exit) were stranded until the next
4170        // external wave entry. D260 completes the D232-AMEND promise:
4171        // loops the full drain → extract → clear-in_tick → fire_deferred
4172        // → release cycle until BOTH `mailbox` AND `deferred` quiesce.
4173        //
4174        // **Primary same-thread case (the actual bug surface).** Post-
4175        // S2b/S2c, producer-build sinks (e.g. `graphrefly_operators::
4176        // buffer::buffer`'s `notifier_sink`, the reactive-log `view`'s
4177        // internal sink) capture `MailboxEmitter` / `SinkEmitter` and
4178        // emit via `mailbox.post_emit(...)` instead of the pre-S2b
4179        // pre-S2b direct `Core::emit` (no mailbox). The post is supposed
4180        // to be drained by the wave's `drain_and_flush` loop (top-of-
4181        // iteration `is_runnable()` check), but `fire_deferred` runs
4182        // AFTER `drain_and_flush` exits. Same-thread same-wave: a
4183        // producer-build sink in `fire_deferred` posts to mailbox →
4184        // stranded → 35 parity failures (buffer / stratify / higher-
4185        // order / zip / control / messaging — all uniform "emissions
4186        // lost"). The cross-thread autonomous-timer-task case (D227/
4187        // D230) is the secondary motivation for `MailboxEmitter`'s
4188        // `Send + Sync` shape, but the in-tree bug surface was the
4189        // same-thread sink case D260 plugs.
4190        //
4191        // **Why iteration not nested recursion.** Pre-S2b, sinks
4192        // captured `Core` directly and synchronous emit ran a
4193        // *nested wave* per emit (recursive RAII; depth-first). D260's
4194        // iteration at the wave-end frontier coalesces all post-
4195        // `fire_deferred` mailbox ops into one outer wave (breadth-
4196        // first at the boundary; fewer nested-wave entries; identical
4197        // quiescence semantics).
4198        //
4199        // **Canonical timing convergence (A3, user-locked 2026-05-20).**
4200        // Rust IS the canonical for this drain-to-quiescence shape;
4201        // TS/PY MUST converge if/when they expose an equivalent
4202        // mailbox-posting sink seam. Pure-ts today implements the same
4203        // observable behavior via a direct-call sink path (no mailbox
4204        // indirection → no "stranded post" hazard → naturally quiescent
4205        // at wave end); cross-arm parity scenarios (buffer, view(slice),
4206        // stratify, ...) verify the observable agreement empirically.
4207        //
4208        // **Bounded-iteration:** D260's outer loop is capped at
4209        // `D260_MAX_REDRAIN_PASSES` (32) — realistic legitimate
4210        // cascades stay ≤3 (each iteration drains a fresh post produced
4211        // by the previous fire_deferred); 32 gives 10× headroom and
4212        // panics loudly on a real emit-loop. Per-iteration
4213        // `drain_and_flush` is independently capped by
4214        // `max_batch_drain_iterations` (configurable via
4215        // `Core::set_max_batch_drain_iterations`). A user sink that
4216        // emit-loops forever was a stack overflow pre-S2b — same
4217        // hazard, now iteration-shaped (no stack growth, bounded by
4218        // the cap, fail-loud).
4219        let mut redrain_passes: u32 = 0;
4220        loop {
4221            redrain_passes += 1;
4222            assert!(
4223                redrain_passes <= D260_MAX_REDRAIN_PASSES,
4224                "D260: wave-end drain-to-quiescence loop did not converge in \
4225                 {D260_MAX_REDRAIN_PASSES} passes (mailbox_runnable={mb}, \
4226                 deferred_runnable={df}). A producer-build sink is likely \
4227                 in an emit-loop: each `fire_deferred` pass posts a fresh \
4228                 `MailboxOp` that retriggers another `fire_deferred` pass. \
4229                 Pre-S2b this would have been a stack overflow; D260 \
4230                 iteration-shapes the hazard with this cap. Investigate \
4231                 the producer-build sink that consumes its own output.",
4232                mb = self.core.mailbox.is_runnable(),
4233                df = self.core.deferred.is_runnable(),
4234            );
4235            if let Err(payload) = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
4236                self.core.drain_and_flush();
4237            })) {
4238                self.discard_wave_cleanup();
4239                self.core.clear_in_tick();
4240                std::panic::resume_unwind(payload);
4241            }
4242            // Wave cleanup + extract deferred jobs under the lock.
4243            let (jobs, releases, cleanup_hooks, pending_wipes, snapshot_releases) = {
4244                let mut s = self.core.lock_state();
4245                // Q-beyond Sub-slice 1 + 3 (D108, 2026-05-09): WaveState
4246                // borrowed alongside state for wave-end cleanup. Per-thread;
4247                // independent of state. Sub-slice 3 moved deferred_* drains
4248                // into WaveState. /qa F1+F2 (2026-05-10) reverted in_tick +
4249                // currently_firing back to CoreState — clear via
4250                // CoreState::clear_wave_state under the held state lock.
4251                let result = with_wave_state(|ws| {
4252                    s.clear_wave_state(ws);
4253                    // Step 2a (D220-EXEC): defensive `currently_firing`
4254                    // clear, relocated here from `CoreState::clear_wave_state`
4255                    // (the field moved to the separate `CoreShared` region;
4256                    // `St`'s `.shared` reaches it, a `&mut CoreState` can't).
4257                    // Same wave-end point; `FiringGuard` RAII already
4258                    // balances push/pop — this is the belt-and-suspenders
4259                    // net for a future guard-bypassing path.
4260                    s.shared.currently_firing.clear();
4261                    ws.clear_wave_state();
4262                    // /qa A1 (2026-05-09) discipline preserved: drain snapshot
4263                    // retains under lock, release lock-released below to avoid
4264                    // binding re-entrance under held mutex / borrow.
4265                    let snapshot_releases = Core::drain_wave_cache_snapshots(ws);
4266                    // D291: success-path drain for terminal-slot snapshot
4267                    // sets. The snapshot sets hold NO retains (slot owns
4268                    // the ERROR-handle retain pre-snapshot AND post-commit
4269                    // — the slot keeps it across the wave on commit); we
4270                    // just clear the bookkeeping sets so the next wave
4271                    // starts clean. Mirrors the placement of
4272                    // `drain_wave_cache_snapshots`.
4273                    Core::drain_wave_terminal_snapshots(ws);
4274                    // `drain_deferred` takes `deferred_flush_jobs` +
4275                    // `deferred_handle_releases` (incl. rotation releases pushed
4276                    // by `clear_wave_state` above) + Slice E2
4277                    // `deferred_cleanup_hooks` + Slice E2 /qa Q2(b)
4278                    // `pending_wipes` — all from WaveState post-Sub-slice-3.
4279                    let (jobs, releases, hooks, wipes) = Core::drain_deferred(&mut s, ws);
4280                    (jobs, releases, hooks, wipes, snapshot_releases)
4281                });
4282                // Release wave ownership now — AFTER drain + WaveState
4283                // cleanup, BEFORE `fire_deferred` below. Load-bearing: a sink
4284                // re-entering Core from a flush callback must observe
4285                // `in_tick` clear so its emit runs as a fresh owning wave.
4286                // (Mirrors the placement of the pre-D047 `s.in_tick = false`;
4287                // the drain-phase-panic window that placement had is closed
4288                // by the `catch_unwind` above.)
4289                self.core.clear_in_tick();
4290                result
4291            };
4292            // Lock dropped — fire deferred sinks + release retains + fire
4293            // cleanup hooks (Slice E2 OnInvalidate, D060 catch_unwind drain)
4294            // + fire eager wipes (D069).
4295            //
4296            // D260 /qa P1 (2026-05-20): wrap `fire_deferred` in `catch_unwind`
4297            // so a sink panic inside this iteration (`fire_deferred`'s own
4298            // `last_panic` + `resume_unwind`-at-end discipline propagates a
4299            // panic out) doesn't bypass the per-iteration post-`fire_deferred`
4300            // cleanup (snapshot_releases at line ~3910, plus the outer
4301            // wave-end `tier3_clear` after the loop). Mirrors the L3844
4302            // drain-phase wrap. On panic: release the queued
4303            // `snapshot_releases` defensively lock-released, run the outer
4304            // wave-end finalization (`tier3_clear`), then `resume_unwind` so
4305            // the caller observes the panic. (D274 deleted the matching
4306            // `drain_deferred_producer_ops()` call that pre-D274 ran here as
4307            // a documented but no-op shim.)
4308            let fire_result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
4309                self.core
4310                    .fire_deferred(jobs, releases, cleanup_hooks, pending_wipes);
4311            }));
4312            // /qa A1 fix (2026-05-09): release wave_cache_snapshots retains
4313            // lock-released. Pre-A1 these were released inside the held
4314            // state + cross_partition locks; binding finalizers re-entering
4315            // Core would deadlock against either mutex. Drained earlier
4316            // under the lock; released here after both mutexes dropped and
4317            // sinks have fired. Runs even on `fire_deferred` panic — these
4318            // retains were lifted out of the wave state already and need
4319            // releasing regardless.
4320            for h in snapshot_releases {
4321                self.core.binding.release_handle(h);
4322            }
4323            if let Err(payload) = fire_result {
4324                // /qa P1: run wave-end finalization defensively before
4325                // propagating the panic. tier3_clear avoids stale-entry
4326                // leakage across cargo's thread-reuse (mirrors the
4327                // wave-start defensive clear in `begin_batch_with_guards`).
4328                // (D274 deleted `drain_deferred_producer_ops()` here — it
4329                // was a no-op shim per D211; no producer-deferred queue
4330                // remains to drain.)
4331                tier3_clear();
4332                std::panic::resume_unwind(payload);
4333            }
4334            // D260: check if `fire_deferred` posted new work to mailbox/
4335            // deferred. Quiescent ⇒ done. Else: re-claim `in_tick` (cleared
4336            // above, before `fire_deferred`) and loop the full sequence.
4337            if !self.core.mailbox.is_runnable() && !self.core.deferred.is_runnable() {
4338                break;
4339            }
4340            // Re-claim wave ownership for the secondary drain pass. The
4341            // previous iteration's `clear_in_tick` ran before its
4342            // `fire_deferred`, so the slot is `0` here. `claim_in_tick`
4343            // panics fail-loud on cross-Core mismatch (D252); a
4344            // same-thread same-Core re-claim must succeed-outermost.
4345            let owns = self.core.claim_in_tick();
4346            debug_assert!(
4347                owns,
4348                "D260: secondary drain pass should always succeed-outermost \
4349                 (in_tick was cleared by the previous iteration's pre-fire_deferred clear)"
4350            );
4351            // /qa P2 (release-build safety): if `claim_in_tick` silently
4352            // returned `false` (slot already held — should be impossible
4353            // by-construction under D255 single-owner / D248 actor model,
4354            // but defense-in-depth), break the loop cleanly rather than
4355            // running `drain_and_flush` without ownership. A subsequent
4356            // outer wave entry will catch the still-runnable mailbox.
4357            if !owns {
4358                break;
4359            }
4360        }
4361        // D1 patch (2026-05-09): clear the per-thread Slice G tier3
4362        // tracker at outermost wave-end (success path). Mirrors the
4363        // panic-discard branch above. Thread-local outlives BatchGuard
4364        // by default; cargo's thread-reuse across tests would propagate
4365        // stale entries. Cleared after sinks fire (sink callbacks may
4366        // re-enter Core via emit and could read the tier3 set
4367        // mid-wave; the wave is over here so clearing is safe).
4368        tier3_clear();
4369        // D246/S2c: no per-partition wave-owner guards to release
4370        // (single-owner ⇒ the §7 wave-locks are deleted).
4371        // Phase H+ STRICT (D115): drain deferred producer ops at
4372        // wave-end (now a no-op shim — §7 deleted the deferred-producer
4373        // queue; retained so call sites compile unchanged, D211).
4374    }
4375}