Skip to main content

graphrefly_core/
batch.rs

1//! Wave engine — drain loop, fire selection, emission commit, sink dispatch.
2//!
3//! Ports the wave-engine portion of the handle-protocol prototype
4//! (`~/src/graphrefly-ts/src/__experiments__/handle-core/core.ts`).
5//! Sibling to [`super::node`]; the dispatcher's other concerns
6//! (registration, subscription, pause/resume, terminal cascade,
7//! `set_deps`) live there.
8//!
9//! # Wave engine entry points
10//!
11//! - [`Core::run_wave`] — wave entry. Claims `in_tick` under the state lock,
12//!   runs `op` lock-released, then drains all transitive fn-fires and
13//!   flushes per-subscriber notifications. Each fn-fire iteration drops
14//!   the state lock around `BindingBoundary::invoke_fn` so user fn callbacks
15//!   can re-enter Core safely.
16//! - [`Core::drain_and_flush`] — drain phase + flush phase. Acquires/drops
17//!   the state lock per iteration around `invoke_fn`.
18//! - [`Core::commit_emission`] — equals-substitution + DIRTY/DATA/RESOLVED
19//!   queueing + child propagation. `&self`-only; bracket-fires
20//!   `BindingBoundary::custom_equals` lock-released.
21//! - [`Core::queue_notify`] — per-subscriber message queueing with
22//!   pause-buffer routing. Snapshots the subscriber list at first-touch-
23//!   per-wave so late subscribers (installed mid-wave between drain
24//!   iterations) don't receive duplicate deliveries from messages already
25//!   queued before they subscribed.
26//! - [`Core::deliver_data_to_consumer`] — single-edge propagation; marks
27//!   the consumer for fn-fire if its tracked-deps set is satisfied.
28//!   Called from `commit_emission`, plus `activate_derived` and
29//!   `set_deps` in [`super::node`].
30//!
31//! # Re-entrance discipline (Slice A close — M1 fully lock-released)
32//!
33//! - **Wave-end sink fires** drop the state lock first (Slice A-bigger
34//!   discipline).
35//! - **`BindingBoundary::invoke_fn`** in `fire_fn` fires lock-released —
36//!   user fn callbacks may re-enter `Core::emit` / `pause` / `resume` /
37//!   `invalidate` / `complete` / `error` / `teardown` and run a nested
38//!   wave (the existing `in_tick` re-entrance gate composes
39//!   transparently).
40//! - **`BindingBoundary::custom_equals`** in `commit_emission`'s equals
41//!   check fires lock-released.
42//! - **Subscribe-time handshake** is the one remaining lock-held callback.
43//!   It now fires per-tier (`[Start]`, `[Data(v)]`, `[Complete|Error]`,
44//!   `[Teardown]`) as separate sink calls, matching the canonical R1.3.5.a
45//!   tier-split. Re-entrance from a handshake sink callback panics with
46//!   the [`reentrance_guard`] diagnostic.
47
48use std::cell::RefCell;
49use std::collections::HashMap;
50use std::sync::Arc;
51
52use ahash::AHashSet;
53use indexmap::map::Entry;
54use indexmap::IndexMap;
55
56use smallvec::SmallVec;
57
58use crate::boundary::{DepBatch, FnEmission, FnResult};
59use crate::handle::{FnId, HandleId, NodeId, NO_HANDLE};
60use crate::message::Message;
61use crate::node::{Core, CoreState, EqualsMode, OperatorOp, Sink, TerminalKind};
62
63// Slice G (R1.3.2.d / R1.3.3.a) per-thread tier-3-emit tracker.
64//
65// **Wave scope = thread-local.** GraphReFly's wave-engine guarantees
66// that every emit at a given node within a single wave runs on the
67// same thread (the thread that holds the partition's `wave_owner`
68// `parking_lot::ReentrantMutex` — cross-thread emits at a node BLOCK
69// on that mutex and so always land in the OTHER thread's wave). A
70// wave is bounded above by the outermost `BatchGuard` drop on its
71// originating thread. Together this means a per-thread
72// `AHashSet<NodeId>` is the natural placement for "has node X already
73// emitted a tier-3 message in this wave?" — the set's lifetime
74// exactly matches the wave's, with no cross-thread or cross-wave
75// contamination.
76//
77// **History (D1 patch, 2026-05-09):** previously placed on
78// `crate::subgraph::SubgraphLockBox::state` per-partition (Q3 v1).
79// That placement was robust to per-partition wave parallelism but
80// vulnerable to mid-wave cross-thread `set_deps` partition splits:
81// thread A is mid-wave on partition P (wave_owner held) but between
82// fn fires (`currently_firing` empty); thread B's `set_deps` acquires
83// the state lock, P13's `currently_firing.is_empty()` check
84// short-circuits, the split proceeds, and X migrates from P to a
85// fresh orphan-side partition with an empty
86// `tier3_emitted_this_wave`. Thread A's subsequent emit at X then
87// mis-detects "first emit" and queues a Resolved alongside the prior
88// Data — R1.3.3.a violation. Thread-local placement is immune to this
89// hazard: thread B's split doesn't touch thread A's thread-local at
90// all.
91//
92// **Lifecycle:** populated by `Core::commit_emission` /
93// `Core::commit_emission_verbatim`; cleared at the OUTERMOST
94// `BatchGuard` drop on this thread (both success and panic-discard
95// paths). Re-entrant nested waves on the same thread share the set —
96// inner-wave emits add to the same set; the outermost drop is the
97// canonical clear point. Cross-thread emits NEVER touch this thread's
98// set (they serialize on the partition wave_owner; the cross-thread
99// emit happens in the OTHER thread's emit-loop and uses the OTHER
100// thread's tier3 thread-local).
101thread_local! {
102    static TIER3_EMITTED_THIS_WAVE: RefCell<AHashSet<NodeId>> = RefCell::new(AHashSet::new());
103}
104
105// Q-beyond Sub-slice 1 (D108 / 2026-05-09): per-thread wave-scoped state.
106//
107// **Design rationale (bench-driven, see `benches/lock_strategy.rs`):**
108// - S1 showed parking_lot Mutex same-thread re-acquire is ~14 ns/op,
109//   identical to thread_local borrow_mut. The "mutex hop is slow" intuition
110//   is wrong UNCONTENDED.
111// - S3 showed shared mutex on disjoint cross-thread keys is 2.7× slower
112//   than per-partition mutex / thread_local (35.9 vs 13.0 ns/op) — pure
113//   cache-line bouncing on the lock state itself.
114// - Conclusion: the cost of the prior `Core::cross_partition` mutex was
115//   dominated by cache-line bouncing across cores, NOT by single-thread
116//   mutex acquire overhead. Moving the four wave-scoped fields to a
117//   per-thread thread_local eliminates the bounce point entirely.
118//
119// **Wave scope = thread, same as `TIER3_EMITTED_THIS_WAVE`:** every emit
120// in a wave runs on the thread that holds the partition wave_owner;
121// cross-thread emits BLOCK on wave_owner and so always land in the OTHER
122// thread's wave context with the OTHER thread's WAVE_STATE. Mid-wave
123// cross-thread `set_deps` partition splits don't touch this thread's
124// thread-local at all (D1 lesson, applied here).
125//
126// **Lifecycle:** populated by `Core::commit_emission` /
127// `Core::queue_notify` / etc.; mostly drained mid-wave by the auto-resolve
128// sweep + cache snapshot commit/restore. Outermost `BatchGuard::drop`
129// releases any retained handles still in `wave_cache_snapshots` /
130// `deferred_handle_releases`. Defensive wave-start clear at outermost
131// owning BatchGuard entry guards against cargo's thread-reuse propagating
132// stale entries from a prior panicked-mid-wave test.
133thread_local! {
134    static WAVE_STATE: RefCell<WaveState> = RefCell::new(WaveState::new());
135}
136
137// Wave-ownership flag, keyed per-(Core, thread). Membership of a
138// `Core::generation` value means "this thread is currently inside an
139// OWNING wave on that Core" — i.e. the outermost `BatchGuard` whose drop
140// must run the drain. Replaces the former Core-global `CoreState::in_tick`
141// bool.
142//
143// **Why per-(Core, thread).** The flag must jointly satisfy three
144// constraints that no single-scope placement can:
145//   - **Cross-Core isolation (/qa F1).** A thread holding a live
146//     `BatchGuard` on Core-A and then entering a wave on Core-B must not
147//     see Core-A's flag. Keying by `Core::generation` (a process-monotonic
148//     id, never reused) gives each Core a distinct slot. A purely
149//     thread-local bool failed here (Core-A's `in_tick` leaked to Core-B
150//     on the same OS thread → Core-B non-owning → its writes drained by
151//     Core-A's binding).
152//   - **Disjoint-partition drain correctness.** Two threads running waves
153//     on disjoint partitions of ONE Core run truly parallel (separate
154//     `wave_owner`s — they do not block each other). A Core-global flag
155//     made thread B observe thread A's `in_tick=true`, wrongly classify
156//     its independent wave as nested, and no-op its drop — so B's wave
157//     never drained (leaked payload retains, undelivered sink batches;
158//     caught late by `wave_state_clear_outermost`). A per-thread slot
159//     makes each thread own and drain its own disjoint wave.
160//   - **Nested same-(Core, thread) re-entry (/qa EC#3).** A nested
161//     `run_wave` / `actions.up` on the same Core and thread MUST observe
162//     ownership so its drop no-ops and the outer wave drains. A shared
163//     slot within one (Core, thread) preserves this.
164//
165// **No lock required.** `in_tick` is only ever read or written by the
166// wave-owner thread: cross-thread same-partition emits BLOCK on the
167// partition `wave_owner`, and cross-partition cascades acquire every
168// touched partition upfront. So — unlike `currently_firing`, which the
169// cross-thread P13 set_deps check (/qa F2) requires be Core-global and
170// cross-thread-visible — `in_tick` has no cross-thread read requirement
171// and needs no shared-state lock. (`currently_firing` deliberately stays
172// on `CoreState`; see `node.rs`.)
173//
174// Stale slots: the owning `BatchGuard::drop` releases the generation on
175// every exit path — normal return, the closure-body-panic branch, AND
176// the drain-phase-panic `catch_unwind` arm (before `resume_unwind`). So
177// a slot can only be left interned if `Drop` itself never runs:
178// `std::mem::forget(guard)` or a process abort without unwinding — both
179// out of contract (`BatchGuard` is `#[must_use]` + `!Send`). Such a
180// leaked key is inert: generations are never reused, so it can never
181// false-match a future Core (no correctness impact), and the leak is
182// bounded by the number of distinct Cores a thread ever forgets a guard
183// on — not an unbounded leak under any normal or panic-recovery path.
184//
185// History: this flag lived briefly per-thread (Q-beyond sub-slice 3),
186// was reverted to Core-global (/qa F1+F2), and is now keyed per-(Core,
187// thread) — the placement that satisfies all three constraints at once.
188// See `docs/rust-port-decisions.md` and the `docs/porting-deferred.md`
189// Phase-J / "in_tick Core-global→per-(Core,thread)" entry (2026-05-15).
190thread_local! {
191    static IN_TICK_OWNED: RefCell<AHashSet<u64>> = RefCell::new(AHashSet::new());
192}
193
194/// Wave-scoped state previously held under [`Core::cross_partition`]'s
195/// `parking_lot::Mutex<CrossPartitionState>`. Now per-thread (Q-beyond
196/// Sub-slice 1, 2026-05-09; Sub-slice 2 added `pending_fires` +
197/// `pending_notify`, 2026-05-09; Sub-slice 3 added `currently_firing`,
198/// `in_tick`, `deferred_flush_jobs`, `deferred_cleanup_hooks`,
199/// `pending_wipes`, `invalidate_hooks_fired_this_wave`, 2026-05-09).
200///
201/// All fields are populated and drained within one wave on one thread.
202/// Cross-thread access is structurally impossible — cross-thread emits
203/// block on partition `wave_owner` and land in the OTHER thread's wave
204/// context.
205///
206/// **Refcount discipline (load-bearing):** `wave_cache_snapshots`,
207/// `deferred_handle_releases`, and `pending_notify` hold binding-side
208/// handle retains. They MUST be drained (and released through
209/// `Core::binding.release_handle`) by the outermost `BatchGuard::drop`
210/// on success and panic paths. `pending_notify` holds one retain per
211/// payload-bearing message (one per `Message::payload_handle()`); the
212/// retains are taken in `Core::queue_notify` and balanced either by
213/// `flush_notifications` (success path: pushed into
214/// `deferred_handle_releases`) or directly in the panic-discard path of
215/// `BatchGuard::drop` (taken from `pending_notify` and released).
216///
217/// The thread_local has no `Drop` hook with access to a binding — a
218/// panic that bypasses `BatchGuard::drop` (e.g. panic OUTSIDE any batch)
219/// would leak retains until the thread exits OR the next outermost
220/// wave-start clear runs (which for safety we don't fire — clearing
221/// without releasing would double-leak by losing the retain). The
222/// defensive wave-start clear in `BatchGuard::begin_batch_with_guards`
223/// clears `pending_auto_resolve` + `pending_pause_overflow` +
224/// `pending_fires` (no retains) + `currently_firing` +
225/// `invalidate_hooks_fired_this_wave` (also no retains) but NOT the
226/// retain-holding fields — those must be empty by construction at
227/// outermost wave start (a prior wave's panic-discard path drained them,
228/// or a prior wave's success path drained them).
229pub(crate) struct WaveState {
230    /// Payload-handle releases owed for messages that landed in
231    /// `pending_notify` during this wave (one per `payload_handle()`).
232    /// `BatchGuard::drop` releases these after sinks fire and the lock
233    /// is dropped, balancing the retain done in `queue_notify`.
234    pub(crate) deferred_handle_releases: Vec<HandleId>,
235    /// Pre-wave cache snapshots used to restore state if the wave aborts
236    /// mid-flight (e.g., a `Core::batch` closure panics). Each entry is
237    /// `(node_id → old_cache_handle)` — the handle the node held BEFORE
238    /// the wave started writing to it. The snapshotted handle holds a
239    /// retain (taken when the snapshot was inserted) so it stays alive
240    /// for restoration. On wave success, snapshots are drained and their
241    /// retains released. On wave abort, each cache slot is restored from
242    /// the snapshot and the original retain transfers to the cache slot.
243    pub(crate) wave_cache_snapshots: HashMap<NodeId, HandleId>,
244    /// Nodes that need an auto-Resolved at wave end if they don't receive
245    /// a tier-3+ message from their own commit_emission. Populated by
246    /// the RESOLVED child propagation in `commit_emission`. Drained by
247    /// the auto-resolve sweep in `drain_and_flush`.
248    pub(crate) pending_auto_resolve: AHashSet<NodeId>,
249    /// R1.3.8.c pause-overflow ERROR synthesis queue. Recorded by
250    /// [`Core::queue_notify`] when the pause buffer first overflows;
251    /// drained at wave-end after the lock-released call to
252    /// `BindingBoundary::synthesize_pause_overflow_error`.
253    pub(crate) pending_pause_overflow: Vec<crate::node::PendingPauseOverflow>,
254    /// Nodes whose fn we owe a fire to — drained by [`Core::run_wave`].
255    ///
256    /// Q-beyond Sub-slice 2 (D108, 2026-05-09): moved from
257    /// `CoreState::pending_fires` to per-thread `WaveState`. Wave-scoped
258    /// — populated by `deliver_data_to_consumer`, `terminate_node`'s
259    /// child-cascade `QueueFire` branch, `activate_derived`'s producer
260    /// queueing, `resume`'s pending-wave consolidation, and operator
261    /// re-arm paths; drained by `pick_next_fire` / `fire_fn` /
262    /// `fire_regular` / `fire_operator` (each removes the firing node
263    /// before invoking).
264    pub(crate) pending_fires: AHashSet<NodeId>,
265    /// Per-node outgoing message buffer; flushed at wave end. Insertion-
266    /// ordered so flush order is deterministic — load-bearing for
267    /// R1.3.9.d meta-TEARDOWN ordering: when a parent and its meta
268    /// companion both have queued messages in the same wave, the meta
269    /// (queued first via `teardown_inner`'s recursion order) flushes
270    /// first.
271    ///
272    /// Each entry carries the per-wave subscriber snapshot taken at first
273    /// touch (Slice A close, M1: lock-released drain). Late subscribers
274    /// installed mid-wave between fn-fire iterations don't appear in
275    /// already-snapshotted entries; this is the load-bearing fix that
276    /// prevents duplicate-Data delivery when a handshake delivers the
277    /// post-commit cache and the wave's flush would otherwise also fire
278    /// to the same sink.
279    ///
280    /// Q-beyond Sub-slice 2 (D108, 2026-05-09): moved from
281    /// `CoreState::pending_notify` to per-thread `WaveState`. The map
282    /// holds a payload-handle retain per payload-bearing message
283    /// (`Message::payload_handle()`); these MUST be released by the
284    /// outermost `BatchGuard::drop` (success path: through
285    /// `flush_notifications` → `deferred_handle_releases`; panic path:
286    /// directly in `BatchGuard::drop`'s panic branch).
287    pub(crate) pending_notify: IndexMap<NodeId, PendingPerNode>,
288    // Q-beyond Sub-slice 3 (D108, 2026-05-09) moved `in_tick` and
289    // `currently_firing` from `CoreState` to per-thread `WaveState`;
290    // /qa F1+F2 (2026-05-10) reverted both to `CoreState`; the in_tick
291    // placement was finalized 2026-05-15 (see below). The two fields have
292    // *different* scope requirements:
293    //
294    // - **`in_tick` — per-(Core, thread).** Pure thread-local broke
295    //   cross-Core isolation (Core-A's flag leaked to Core-B on the same
296    //   OS thread → Core-B non-owning → its writes drained by Core-A's
297    //   binding, /qa F1). Pure Core-global broke disjoint-partition drain
298    //   ownership (thread B's independent disjoint wave saw thread A's
299    //   flag → non-owning → never drained). The (Core, thread) key —
300    //   `crate::batch::IN_TICK_OWNED`, keyed by `Core::generation` —
301    //   satisfies both, plus same-(Core, thread) nested re-entry
302    //   (/qa EC#3). NOT a `CoreState` field.
303    //
304    // - **`currently_firing` — Core-global (stays on `CoreState`).**
305    //   Per-thread placement silently bypassed the cross-thread P13
306    //   partition-migration check in `Core::set_deps`: thread B's set_deps
307    //   must observe thread A's firing pushes. Per-Core (cross-thread
308    //   visible) placement restores the D091 safety check (/qa F2).
309    //
310    // The other 11 wave-scoped fields stay per-thread because they're
311    // accessed only by the wave-owner thread under `wave_owner`
312    // discipline (cross-thread emits BLOCK on partition wave_owner).
313    /// Slice E2 (R1.3.9.b strict per D057): per-wave-per-node dedup
314    /// for `OnInvalidate` cleanup hook firing. A node already in this
315    /// set this wave has already had its `OnInvalidate` queued into
316    /// `deferred_cleanup_hooks` and MUST NOT queue again, even if
317    /// `invalidate_inner` re-encounters it.
318    ///
319    /// Q-beyond Sub-slice 3 (D108, 2026-05-09): moved from
320    /// `CoreState::invalidate_hooks_fired_this_wave` to per-thread
321    /// `WaveState`. Wave-scoped — populated by `invalidate_inner` and
322    /// cleared by `WaveState::clear_wave_state`.
323    pub(crate) invalidate_hooks_fired_this_wave: AHashSet<NodeId>,
324    /// Deferred sink-fire jobs collected by `flush_notifications`.
325    /// `flush_notifications` populates this from `pending_notify`;
326    /// `Core::drain_deferred` takes it and `Core::fire_deferred` fires
327    /// each entry lock-released. Each tuple is
328    /// `(sinks_for_one_node_one_phase, phase_messages)`. Empty between
329    /// waves.
330    ///
331    /// Q-beyond Sub-slice 3 (D108, 2026-05-09): moved from
332    /// `CoreState::deferred_flush_jobs` to per-thread `WaveState`. No
333    /// retains held — the `Vec<Sink>` clones own Arcs that drop
334    /// naturally; the `Vec<Message>` payload retains were already moved
335    /// into `deferred_handle_releases` by `flush_notifications`.
336    pub(crate) deferred_flush_jobs: DeferredJobs,
337    /// Slice E2 (per D060/D061): lock-released drain queue for
338    /// `OnInvalidate` cleanup hooks. Populated by `Core::invalidate_inner`
339    /// when a node's cache transitions `!= NO_HANDLE → NO_HANDLE`;
340    /// drained after the lock drops at wave boundary by
341    /// `Core::fire_deferred` (each call wrapped in `catch_unwind` per
342    /// D060). Panic-discarded silently per D061.
343    ///
344    /// Q-beyond Sub-slice 3 (D108, 2026-05-09): moved from
345    /// `CoreState::deferred_cleanup_hooks` to per-thread `WaveState`.
346    pub(crate) deferred_cleanup_hooks: Vec<(NodeId, crate::boundary::CleanupTrigger)>,
347    /// Slice E2 /qa Q2(b) (D069): lock-released drain queue for
348    /// `BindingBoundary::wipe_ctx` calls fired eagerly from
349    /// `Core::terminate_node` when a resubscribable node terminates with
350    /// no live subscribers. Drained alongside `deferred_cleanup_hooks`
351    /// at wave boundary; same `catch_unwind` discipline. Panic-discarded
352    /// silently.
353    ///
354    /// Q-beyond Sub-slice 3 (D108, 2026-05-09): moved from
355    /// `CoreState::pending_wipes` to per-thread `WaveState`.
356    pub(crate) pending_wipes: Vec<NodeId>,
357}
358
359impl WaveState {
360    fn new() -> Self {
361        Self {
362            deferred_handle_releases: Vec::new(),
363            wave_cache_snapshots: HashMap::new(),
364            pending_auto_resolve: AHashSet::new(),
365            pending_pause_overflow: Vec::new(),
366            pending_fires: AHashSet::new(),
367            pending_notify: IndexMap::new(),
368            invalidate_hooks_fired_this_wave: AHashSet::new(),
369            deferred_flush_jobs: Vec::new(),
370            deferred_cleanup_hooks: Vec::new(),
371            pending_wipes: Vec::new(),
372        }
373    }
374
375    /// Wave-end clear of the non-retain-holding fields. Called from
376    /// [`Core::drain_and_flush`]'s wave-end path. Fields holding retains
377    /// (`wave_cache_snapshots`, `deferred_handle_releases`,
378    /// `pending_notify`) are NOT cleared here — they follow the
379    /// success/panic paths' explicit drain discipline in
380    /// `BatchGuard::drop`.
381    pub(crate) fn clear_wave_state(&mut self) {
382        self.pending_auto_resolve.clear();
383        // pending_pause_overflow is normally drained by drain_and_flush
384        // via the synthesis loop. If a wave is panic-discarded BEFORE
385        // synthesis runs, BatchGuard::drop's panic path also clears it
386        // explicitly. Pre-wave defensive clear in
387        // `begin_batch_with_guards` makes this idempotent.
388        self.pending_pause_overflow.clear();
389        // Sub-slice 2: pending_fires is intentionally NOT cleared
390        // here. Two reasons:
391        //   1. Wave-success drain empties it by construction: every
392        //      `pick_next_fire` selection is removed by
393        //      `fire_regular` / `fire_operator` before invocation,
394        //      and `drain_and_flush` only exits when the set is empty.
395        //   2. The `Core::resume` default-mode consolidated-fire
396        //      pattern stages an entry OUTSIDE any in-tick wave and
397        //      then enters a new wave to drain it; clearing here
398        //      would erase that pre-staged entry. The panic-discard
399        //      path in `BatchGuard::drop` clears it explicitly.
400
401        // /qa F2 reverted (2026-05-10): currently_firing moved BACK to
402        // CoreState::currently_firing — defensive clear there.
403        // Slice E2 (D057): per-wave-per-node OnInvalidate dedup is
404        // wave-scoped — cleared so the next wave can fire cleanups
405        // again.
406        self.invalidate_hooks_fired_this_wave.clear();
407        // `deferred_flush_jobs`, `deferred_cleanup_hooks`, and
408        // `pending_wipes` are intentionally NOT cleared here. They
409        // follow the same discipline as `deferred_handle_releases` /
410        // `pending_notify`:
411        //   - SUCCESS path (`BatchGuard::drop` non-panic): drained by
412        //     `Core::drain_deferred` AFTER `clear_wave_state` runs,
413        //     then fired lock-released by `Core::fire_deferred`.
414        //   - PANIC-DISCARD path (`BatchGuard::drop` panic): explicitly
415        //     `std::mem::take`-and-dropped AFTER `clear_wave_state`
416        //     runs (silently per D061 / D069).
417        // Clearing here would race the success path: queued sink fires
418        // / cleanup hooks / wipes would be erased BEFORE
419        // `drain_deferred` could take them.
420    }
421}
422
423/// Run a closure with mutable access to this thread's [`WaveState`].
424///
425/// Convention: prefer this helper over inline `WAVE_STATE.with(...)`
426/// for sites that touch ONE field. For sites that interleave state lock
427/// access with wave-state mutation, inline `WAVE_STATE.with(...)` keeps
428/// the lock-acquire / wave-state-borrow scopes visible (mirrors the
429/// pre-Q-beyond `let mut s = self.lock_state(); let mut cps = self.lock_cross_partition();`
430/// pattern).
431///
432/// **Re-entrance:** the closure MUST NOT re-enter Core in a way that
433/// would call back into `with_wave_state` — `RefCell::borrow_mut` panics
434/// on nested borrow. The same discipline that the prior
435/// `parking_lot::Mutex<CrossPartitionState>` enforced (no re-entry
436/// holding cross_partition) carries over.
437pub(crate) fn with_wave_state<R>(f: impl FnOnce(&mut WaveState) -> R) -> R {
438    WAVE_STATE.with(|cell| f(&mut cell.borrow_mut()))
439}
440
441/// Outermost-wave defensive clear of [`WaveState`]'s non-retain-holding
442/// fields. Called from [`BatchGuard::begin_batch_with_guards`] on
443/// outermost owning entry. Mirrors the pre-existing tier3 defensive
444/// clear (D1 patch, 2026-05-09) — guards against cargo's thread-reuse
445/// propagating stale entries from a prior panicked-mid-wave test.
446///
447/// The retain-holding fields (`wave_cache_snapshots` /
448/// `deferred_handle_releases`) MUST already be empty by construction at
449/// outermost wave entry — outermost `BatchGuard::drop` always drains
450/// them on both success and panic paths. If they're non-empty here it
451/// indicates a prior wave bypassed `BatchGuard::drop`; in that case
452/// the next BatchGuard's outermost drop will eventually drain them.
453fn wave_state_clear_outermost() {
454    with_wave_state(|ws| {
455        // /qa F4 (2026-05-10): debug_assert that retain-holding fields
456        // are empty at outermost wave start. The invariant claim is
457        // "outermost BatchGuard::drop drains them on both success and
458        // panic paths, so they're empty before the next wave starts."
459        // If a panic path EVER bypasses the drain (today: not reachable
460        // because BatchGuard::drop is robust against panicking sinks via
461        // catch_unwind), this assert catches it in tests immediately
462        // rather than letting stale entries leak into the next wave's
463        // drain (which would release Core-A's HandleIds via Core-B's
464        // binding under cross-Core same-thread sequential use).
465        debug_assert!(
466            ws.wave_cache_snapshots.is_empty(),
467            "wave_state_clear_outermost: wave_cache_snapshots non-empty at \
468             outermost wave start ({} entries) — prior BatchGuard::drop \
469             bypassed the drain (would leak retains into next wave's \
470             binding). See /qa F4 (2026-05-10).",
471            ws.wave_cache_snapshots.len()
472        );
473        debug_assert!(
474            ws.deferred_handle_releases.is_empty(),
475            "wave_state_clear_outermost: deferred_handle_releases non-empty \
476             at outermost wave start ({} entries) — prior BatchGuard::drop \
477             bypassed the drain. See /qa F4 (2026-05-10).",
478            ws.deferred_handle_releases.len()
479        );
480        debug_assert!(
481            ws.pending_notify.is_empty(),
482            "wave_state_clear_outermost: pending_notify non-empty at \
483             outermost wave start ({} entries) — prior BatchGuard::drop \
484             bypassed the drain. See /qa F4 (2026-05-10).",
485            ws.pending_notify.len()
486        );
487        ws.pending_auto_resolve.clear();
488        ws.pending_pause_overflow.clear();
489        // Sub-slice 2: pending_fires is intentionally NOT cleared here.
490        // Pre-Sub-slice-2 it lived on CoreState and survived between
491        // waves; load-bearing for `Core::resume`'s default-mode
492        // consolidated-fire pattern, which inserts into pending_fires
493        // OUTSIDE any in-tick wave (Phase 1, lock-held but `in_tick`
494        // false at that moment) and then calls `run_wave_for(node_id)`
495        // — `run_wave_for` enters a NEW outermost wave whose drain must
496        // pick up that pre-staged pending_fires entry. Clearing here
497        // would erase it.
498        //
499        // pending_fires holds no retains, so a stale entry from a
500        // prior panicked-mid-wave test that bypassed BatchGuard::drop
501        // would leak as a spurious fire on the next wave on the same
502        // thread (no refcount damage). The panic-discard path in
503        // BatchGuard::drop and the wave-success drain together
504        // guarantee pending_fires is empty by wave end; relying on
505        // that invariant matches the pre-refactor lifecycle.
506        //
507        // Intentionally NOT clearing wave_cache_snapshots /
508        // deferred_handle_releases / pending_notify here — those hold
509        // retains and need a binding to release. Documented invariant:
510        // they're empty by outermost wave start.
511
512        // Sub-slice 3 (2026-05-09; /qa F2 partially reverted 2026-05-10):
513        // defensively clear the OnInvalidate dedup set on outermost-wave
514        // entry. Holds no retains; a stale entry from a prior
515        // panicked-mid-wave test that bypassed BatchGuard::drop would
516        // only suppress the OnInvalidate cleanup hook for that node on
517        // the next wave (no refcount damage). Clearing matches the
518        // tier3 defensive-clear precedent.
519        //
520        // `currently_firing` was reverted to CoreState (per /qa F2 — the
521        // per-thread placement silently bypassed the cross-thread P13
522        // partition-migration check); its defensive clear lives in
523        // `CoreState::clear_wave_state` (which BatchGuard::drop runs
524        // wave-end on both success and panic paths).
525        ws.invalidate_hooks_fired_this_wave.clear();
526        // Intentionally NOT clearing deferred_flush_jobs /
527        // deferred_cleanup_hooks / pending_wipes here — by invariant
528        // they're empty at outermost wave start (drained on success
529        // by drain_deferred → fire_deferred; drained on panic by
530        // BatchGuard::drop's panic branch). Pre-clearing would race a
531        // hypothetical wave that staged into them OUTSIDE in_tick
532        // (none does today, but matching the deferred_handle_releases
533        // / pending_notify discipline keeps the invariant uniform).
534    });
535}
536
537// Profile-driven optimization (2026-05-10): per-thread partition cache for
538// `begin_batch_for`. The common hot-loop pattern is repeated emits to the
539// same seed node (e.g., state node in a tight emit loop). Each emit calls
540// `begin_batch_for(seed)` which calls `compute_touched_partitions(seed)` —
541// a BFS that acquires state + registry locks and allocates a HashSet +
542// SmallVec. Since the topology doesn't change between emits (registry epoch
543// is stable), we cache the BFS result per-thread and skip the BFS on hit.
544//
545// Cache validity: keyed on (core_generation, seed, epoch). Any registry mutation
546// (register/union/split) bumps epoch → invalidates. The post-acquire epoch
547// recheck in `begin_batch_for` catches the (rare) case where a concurrent
548// mutation happens between cache read and lock acquisition.
549struct PartitionCache {
550    /// Monotonic generation from [`crate::node::CORE_GENERATION`]. Avoids
551    /// ABA false-hits that `Arc::as_ptr` would suffer after Core drop +
552    /// allocator address reuse (/qa F1, 2026-05-10).
553    core_generation: u64,
554    seed: NodeId,
555    epoch: u64,
556    partitions: SmallVec<[crate::subgraph::SubgraphId; 4]>,
557}
558
559thread_local! {
560    static PARTITION_CACHE: RefCell<Option<PartitionCache>> = const { RefCell::new(None) };
561}
562
563/// Has `node` emitted a tier-3 (DATA / RESOLVED) message in the current
564/// wave on this thread? See [`TIER3_EMITTED_THIS_WAVE`] for the per-thread
565/// wave-scope rationale.
566fn tier3_check(node: NodeId) -> bool {
567    TIER3_EMITTED_THIS_WAVE.with(|s| s.borrow().contains(&node))
568}
569
570/// Mark `node` as having emitted a tier-3 message in the current wave on
571/// this thread. Idempotent. See [`TIER3_EMITTED_THIS_WAVE`].
572fn tier3_mark(node: NodeId) {
573    TIER3_EMITTED_THIS_WAVE.with(|s| {
574        s.borrow_mut().insert(node);
575    });
576}
577
578/// Wave-end clear of the per-thread tier3 tracker. Called from the
579/// OUTERMOST [`BatchGuard::drop`] on this thread (both success and
580/// panic-discard paths). Inner non-owning BatchGuard drops MUST NOT
581/// invoke this — the outer wave is still in flight and inner-wave marks
582/// are part of the outer wave's Slice G coalescing state.
583fn tier3_clear() {
584    TIER3_EMITTED_THIS_WAVE.with(|s| {
585        s.borrow_mut().clear();
586    });
587}
588
589/// Deferred sink-fire jobs collected during `flush_notifications`. Each
590/// entry pairs a snapshot of the sink Arcs to fire with the messages to
591/// deliver to them — one entry per (node × phase) cell with non-empty
592/// content. Drained from `CoreState` and fired lock-released.
593pub(crate) type DeferredJobs = Vec<(Vec<Sink>, Vec<Message>)>;
594
595/// Lock-released drain payload of the wave's BatchGuard:
596/// `(sink_jobs, handle_releases, OnInvalidate cleanup hooks, pending wipe_ctx fires)`.
597/// Returned by [`Core::drain_deferred`], consumed by [`Core::fire_deferred`].
598/// Sliced into a type alias to satisfy `clippy::type_complexity`.
599pub(crate) type WaveDeferred = (
600    DeferredJobs,
601    Vec<HandleId>,
602    Vec<(crate::handle::NodeId, crate::boundary::CleanupTrigger)>,
603    Vec<crate::handle::NodeId>,
604);
605
606/// One subscriber-snapshot epoch within a node's wave-end notification
607/// queue. A `PendingBatch` is opened the first time `queue_notify` runs
608/// for the node in a wave, and a fresh batch is opened whenever the node's
609/// `subscribers_revision` advances mid-wave (a new sink subscribes, an
610/// existing sink unsubscribes, or a handshake-time panic evicts an
611/// orphaned sink). All messages within one batch flush to the same sink
612/// list — the snapshot taken when the batch opened, frozen against
613/// subsequent revision bumps.
614pub(crate) struct PendingBatch {
615    /// `NodeRecord::subscribers_revision` value at the moment this batch
616    /// opened. Used by `queue_notify` to decide append-to-last-batch vs
617    /// open-fresh-batch on every push.
618    pub(crate) snapshot_revision: u64,
619    /// Subscriber snapshot frozen at batch-open time. SmallVec<[_; 1]>
620    /// inlines the common single-subscriber case (avoids heap alloc for
621    /// the dominant 1-sink-per-node pattern in most reactive graphs).
622    pub(crate) sinks: SmallVec<[Sink; 1]>,
623    /// Messages queued to this batch. SmallVec<[_; 3]> inlines the
624    /// common per-node-per-wave message set (DIRTY + DATA + optional
625    /// RESOLVED) without heap allocation.
626    pub(crate) messages: SmallVec<[Message; 3]>,
627}
628
629/// Per-node wave-end notification queue, structured as one or more
630/// subscriber-snapshot epochs (`batches`). The common case (no
631/// mid-wave subscribe / unsubscribe at this node) keeps a single
632/// inline batch — `SmallVec<[_; 1]>` keeps that allocation-free.
633///
634/// **Slice X4 / D2 (2026-05-08):** the prior shape was a single
635/// `(sinks, messages)` pair per node — the snapshot froze on first
636/// `queue_notify` and was reused for every subsequent emit to the same
637/// node in the wave. That caused the documented late-subscriber +
638/// multi-emit-per-wave gap (R1.3.5.a divergence): a sub installed
639/// between two emits to the same node was invisible to the second
640/// emit's flush slice. The revision-tracked batch list resolves it —
641/// late subs land in a fresh batch that frozenly carries them, while
642/// pre-subscribe batches retain their original snapshot so the new
643/// sub doesn't double-receive earlier emits via flush AND handshake.
644pub(crate) struct PendingPerNode {
645    pub(crate) batches: SmallVec<[PendingBatch; 1]>,
646}
647
648impl PendingPerNode {
649    /// Iterate every queued message for this node across all batches in
650    /// arrival order. Used by R1.3.3.a invariant assertions and the
651    /// auto-resolve / Slice-G coalescing tier-3-presence checks, which
652    /// reason about wave-content per node, not per batch.
653    pub(crate) fn iter_messages(&self) -> impl Iterator<Item = &Message> + '_ {
654        self.batches.iter().flat_map(|b| b.messages.iter())
655    }
656
657    /// Mutable counterpart for `iter_messages`. Used by
658    /// `rewrite_prior_resolved_to_data` to in-place rewrite Resolved
659    /// entries to Data when a wave detects a multi-emit case after the
660    /// fact.
661    pub(crate) fn iter_messages_mut(&mut self) -> impl Iterator<Item = &mut Message> + '_ {
662        self.batches.iter_mut().flat_map(|b| b.messages.iter_mut())
663    }
664}
665
666/// RAII helper for the A6 reentrancy guard (Slice F, 2026-05-07).
667///
668/// Pushes `node_id` onto [`WaveState::currently_firing`] on construction,
669/// pops it on Drop. [`Core::set_deps`] consults the stack and rejects
670/// `set_deps(N, ...)` from inside N's own fn-fire with
671/// [`crate::node::SetDepsError::ReentrantOnFiringNode`] — closing the
672/// D1 hazard where Phase-1's snapshot of `dep_handles` would refer to
673/// a different dep ordering than Phase-3's `tracked` storage.
674///
675/// Wraps the lock-released `invoke_fn` (and operator-equivalent FFI
676/// callbacks like `project_each` / `predicate_each`). Drop fires even
677/// on panic, so the stack stays balanced under user-fn unwinds.
678///
679/// Membership semantics (NOT strict LIFO): the only consumer of
680/// `currently_firing` is `Core::set_deps`'s reentrancy check, which uses
681/// `contains(&n)` — a set-membership test. Drop pops the right-most
682/// matching `node_id` via `rposition` + `swap_remove`. For a stack like
683/// `[A, B, A]` (A's fn re-enters B, B's fn re-enters A), B's drop pops
684/// the SECOND A (index 1) via swap_remove, leaving `[A, A]` — the
685/// physical order of the remaining As may not match construction order,
686/// but membership is preserved. If a future call site needs strict LIFO
687/// (e.g. "pop the most recently fired node"), switch to `pop()` + assert
688/// the popped value equals `self.node_id`. (QA A6, 2026-05-07)
689pub(crate) struct FiringGuard {
690    core: Core,
691    node_id: NodeId,
692}
693
694impl FiringGuard {
695    pub(crate) fn new(core: &Core, node_id: NodeId) -> Self {
696        // /qa F2 reverted (2026-05-10): currently_firing moved BACK to
697        // CoreState (cross-thread visible, restoring the D091 P13 check).
698        // Push under the state lock scope.
699        {
700            let mut s = core.lock_state();
701            s.currently_firing.push(node_id);
702        }
703        Self {
704            core: core.clone(),
705            node_id,
706        }
707    }
708}
709
710impl Drop for FiringGuard {
711    fn drop(&mut self) {
712        // /qa F2 reverted (2026-05-10): currently_firing moved BACK to
713        // CoreState. Pop under state lock.
714        {
715            let mut s = self.core.lock_state();
716            // Pop the right-most matching node_id (membership semantics —
717            // not strict LIFO). If absent, an external rebalance already
718            // popped — silent no-op (panic-in-Drop is poison).
719            if let Some(pos) = s.currently_firing.iter().rposition(|n| *n == self.node_id) {
720                s.currently_firing.swap_remove(pos);
721            }
722        }
723    }
724}
725
726/// Borrow the per-operator scratch slot as `&T`. Panics if the slot is
727/// uninitialized or the contained type doesn't match `T` — both are
728/// invariant violations for any `fire_op_*` helper that should only be
729/// called from `fire_operator`'s match arm for the matching variant.
730fn scratch_ref<T: crate::op_state::OperatorScratch>(s: &CoreState, node_id: NodeId) -> &T {
731    s.require_node(node_id)
732        .op_scratch
733        .as_ref()
734        .expect("op_scratch slot uninitialized for operator node")
735        .as_any_ref()
736        .downcast_ref::<T>()
737        .expect("op_scratch type mismatch")
738}
739
740/// Mutable borrow of the per-operator scratch slot. Same invariants as
741/// [`scratch_ref`].
742fn scratch_mut<T: crate::op_state::OperatorScratch>(s: &mut CoreState, node_id: NodeId) -> &mut T {
743    s.require_node_mut(node_id)
744        .op_scratch
745        .as_mut()
746        .expect("op_scratch slot uninitialized for operator node")
747        .as_any_mut()
748        .downcast_mut::<T>()
749        .expect("op_scratch type mismatch")
750}
751
752impl Core {
753    // -------------------------------------------------------------------
754    // Wave entry + drain
755    // -------------------------------------------------------------------
756
757    /// Wave entry. The caller passes a closure that performs the wave's
758    /// triggering operation (`commit_emission`, `terminate_node`, etc.).
759    /// The closure runs lock-released; closure-internal Core methods
760    /// acquire the state lock as they go.
761    ///
762    /// **Implementation:** delegates to [`Self::begin_batch`] for the
763    /// wave's RAII lifecycle. The returned `BatchGuard` holds the
764    /// `wave_owner` re-entrant mutex for the wave's duration (cross-thread
765    /// emits block; same-thread re-entry passes through), claims `in_tick`,
766    /// and on drop runs the drain + flush + sink-fire phases — OR, if the
767    /// closure panicked, the panic-discard path that restores cache
768    /// snapshots and clears in_tick. This unification gives `run_wave` the
769    /// same panic-safety guarantee as the user-facing `Core::batch`.
770    ///
771    /// **Re-entrance:** a closure invoked from inside another wave — the
772    /// inner `run_wave`'s `begin_batch` observes `in_tick=true`, the
773    /// returned guard is non-owning (`owns_tick=false`), drop is a no-op.
774    /// The outer wave's drain picks up the inner closure's queued work.
775    ///
776    /// **Lock-release discipline (Slice A close, M1):** all binding-side
777    /// callbacks except the subscribe-time handshake fire lock-released.
778    /// Sinks that re-enter Core run a nested wave; user fns that re-enter
779    /// Core run a nested wave; custom-equals oracles that re-enter Core
780    /// run a nested wave. Cross-thread emits block at `wave_owner` until
781    /// the in-flight wave's drain completes — preserving the user-facing
782    /// "emit returning means subscribers have observed" contract.
783    /// Wave entry with a known `seed` node. Acquires only the partitions
784    /// transitively touched from `seed` (downstream cascade via
785    /// `s.children` + R1.3.9.d meta-companion cascade) instead of every
786    /// current partition. The canonical Y1 parallelism win for per-seed
787    /// entry points (`Core::emit`, `Core::subscribe`'s activation,
788    /// `Core::pause` / `Core::resume` / `Core::invalidate` / `Core::complete`
789    /// / `Core::error` / `Core::teardown` / `Core::set_deps`'s
790    /// push-on-subscribe).
791    ///
792    /// Two threads with disjoint touched-partition sets run truly
793    /// parallel — they don't block each other on Core-global locks.
794    /// Same-thread re-entry passes through each partition's
795    /// `ReentrantMutex` transparently. Cross-thread emits on the SAME
796    /// partition (or any overlapping touched-partition set) serialize
797    /// per the per-partition `wave_owner` mutex, preserving the
798    /// "emit returning means subscribers have observed" contract.
799    ///
800    /// Slice Y1 / Phase E (2026-05-08).
801    pub(crate) fn run_wave_for<F>(&self, seed: crate::handle::NodeId, op: F)
802    where
803        F: FnOnce(&Self),
804    {
805        let _guard = self.begin_batch_for(seed);
806        op(self);
807    }
808
809    /// Fallible wave entry. Returns `Err` if partition acquire violates
810    /// ascending order (Phase H+ STRICT, D115). Used by `try_emit` /
811    /// `try_complete` / `try_error`; the public `run_wave_for` calls
812    /// `begin_batch_for` which panics on violation.
813    pub(crate) fn try_run_wave_for<F>(
814        &self,
815        seed: crate::handle::NodeId,
816        op: F,
817    ) -> Result<(), crate::node::PartitionOrderViolation>
818    where
819        F: FnOnce(&Self),
820    {
821        let _guard = self.try_begin_batch_for(seed)?;
822        op(self);
823        Ok(())
824    }
825
826    /// Drain retains held by `wave_cache_snapshots` and return them so
827    /// the caller can release them lock-released. Called from the
828    /// wave-success path in [`BatchGuard::drop`].
829    ///
830    /// Q-beyond Sub-slice 1 (D108, 2026-05-09): the snapshots map moved
831    /// to per-thread `WaveState`; signature takes `&mut WaveState`. The
832    /// drain-and-release-lock-released discipline (introduced as /qa A1
833    /// fix 2026-05-09 against the prior cross_partition mutex) carries
834    /// over: caller drains under WaveState borrow + state lock, releases
835    /// after both are dropped — `release_handle` may re-enter Core via
836    /// finalizers and re-entry under either guard would deadlock /
837    /// double-borrow.
838    #[must_use]
839    pub(crate) fn drain_wave_cache_snapshots(ws: &mut WaveState) -> Vec<HandleId> {
840        if ws.wave_cache_snapshots.is_empty() {
841            return Vec::new();
842        }
843        std::mem::take(&mut ws.wave_cache_snapshots)
844            .into_values()
845            .collect()
846    }
847
848    /// Restore cache slots from `wave_cache_snapshots` and clear the map.
849    /// Called from the wave-abort path in `BatchGuard::drop` (panic).
850    ///
851    /// For each snapshotted node:
852    ///
853    /// 1. Read the current cache (the in-flight new value).
854    /// 2. Set `cache = old_handle` (the snapshot's retained value).
855    /// 3. Release the now-unowned current cache handle.
856    ///
857    /// Returns the list of "current" handles to release outside the lock.
858    /// Q-beyond Sub-slice 1 (D108, 2026-05-09): the snapshots map moved
859    /// to per-thread `WaveState`; signature takes both `s` (for cache
860    /// slots) and `ws` (for the snapshots map).
861    pub(crate) fn restore_wave_cache_snapshots(
862        &self,
863        s: &mut CoreState,
864        ws: &mut WaveState,
865    ) -> Vec<HandleId> {
866        if ws.wave_cache_snapshots.is_empty() {
867            return Vec::new();
868        }
869        let snapshots = std::mem::take(&mut ws.wave_cache_snapshots);
870        let mut releases = Vec::with_capacity(snapshots.len());
871        for (node_id, old_handle) in snapshots {
872            let Some(rec) = s.nodes.get_mut(&node_id) else {
873                releases.push(old_handle);
874                continue;
875            };
876            let current = std::mem::replace(&mut rec.cache, old_handle);
877            if current != NO_HANDLE {
878                releases.push(current);
879            }
880        }
881        releases
882    }
883
884    /// Drain pending fires until quiescent, then flush wave-end notifications
885    /// to subscribers. Each fire iteration drops the state lock around the
886    /// binding's `invoke_fn` callback so user fns may re-enter Core safely.
887    ///
888    /// `&self`-only — manages its own locking. Called from [`Self::run_wave`]
889    /// and [`super::node::Core::activate_derived`] (via `run_wave`).
890    pub(crate) fn drain_and_flush(&self) {
891        let mut guard = 0u32;
892        loop {
893            // R1.3.8.c (Slice F, A3): if no fires are pending but there are
894            // queued pause-overflow ERRORs, synthesize them now. The
895            // resulting ERROR cascade may add to pending_fires (children
896            // settling their terminal state), so we loop back to drain.
897            //
898            // Q-beyond Sub-slice 1 + 2 (D108, 2026-05-09): pending_fires
899            // and pending_pause_overflow both live on per-thread
900            // WaveState. State lock no longer required for either read.
901            let synth_pending = with_wave_state(|ws| {
902                if ws.pending_fires.is_empty() && !ws.pending_pause_overflow.is_empty() {
903                    std::mem::take(&mut ws.pending_pause_overflow)
904                } else {
905                    Vec::new()
906                }
907            });
908            for entry in synth_pending {
909                // Lock-released call to the binding hook. Default impl
910                // returns None — the binding has opted out of R1.3.8.c
911                // and we fall back to silent-drop + ResumeReport.dropped.
912                let handle = self.binding.synthesize_pause_overflow_error(
913                    entry.node_id,
914                    entry.dropped_count,
915                    entry.configured_max,
916                    entry.lock_held_ns / 1_000_000,
917                );
918                if let Some(h) = handle {
919                    // Re-enter Core::error to terminate the node and
920                    // cascade. We're inside a wave (`in_tick = true`),
921                    // so error() gets a non-owning batch guard — it
922                    // doesn't try to start its own drain. The cascade
923                    // queues into our outer drain via pending_fires
924                    // and pending_notify.
925                    self.error(entry.node_id, h);
926                }
927            }
928
929            // Pick next fire under a short lock. Also re-read the configured
930            // drain cap so callers can tune via `Core::set_max_batch_drain_iterations`
931            // without restarting waves mid-flight.
932            //
933            // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_fires lives
934            // on per-thread WaveState; pick_next_fire takes both state and
935            // WaveState. The pending_size diagnostic and emptiness check
936            // also read WaveState. Borrow scopes are split: WaveState
937            // borrow drops before fire_fn runs (which re-borrows WaveState
938            // via fire_regular / fire_operator).
939            let (next, cap, pending_size) = {
940                let s = self.lock_state();
941                let cap = s.max_batch_drain_iterations;
942                let (next, pending_size) = with_wave_state(|ws| {
943                    if ws.pending_fires.is_empty() {
944                        return (None, 0);
945                    }
946                    let size = ws.pending_fires.len();
947                    let next = Self::pick_next_fire(&s, ws);
948                    (next, size)
949                });
950                if pending_size == 0 {
951                    break;
952                }
953                (next, cap, pending_size)
954            };
955            guard += 1;
956            assert!(
957                guard < cap,
958                "wave drain exceeded {cap} iterations \
959                 (pending_fires={pending_size}). Most likely cause: a runtime \
960                 cycle introduced by an operator that re-arms its own pending_fires \
961                 slot from inside `invoke_fn` (e.g. a producer that subscribes to \
962                 itself, or a fn that calls Core::emit on a node whose fn fires \
963                 the original node again). Structural cycles via set_deps are \
964                 rejected at edge-mutation time. Tune via Core::set_max_batch_drain_iterations \
965                 only with concrete evidence the workload needs more iterations."
966            );
967            let Some(next) = next else { break };
968            // fire_fn manages its own locking around invoke_fn.
969            self.fire_fn(next);
970        }
971        // Auto-resolve sweep: nodes registered in pending_auto_resolve
972        // by the RESOLVED child propagation need a Resolved if they didn't
973        // fire and settle via their own commit_emission. Check pending_notify
974        // for each candidate — if it has Dirty but no tier-3+ message, the
975        // node never settled and needs auto-Resolved. Route through
976        // queue_notify so paused nodes get the Resolved into their pause
977        // buffer.
978        let mut s = self.lock_state();
979        // Q-beyond Sub-slice 1 + 2 (D108, 2026-05-09): pending_auto_resolve
980        // and pending_notify both live on per-thread WaveState. /qa A5
981        // fix (2026-05-09): explicit scope for the WaveState borrow so
982        // it drops BEFORE the for-loop. Inside the loop, `queue_notify`
983        // re-borrows WaveState for `pending_pause_overflow.push` /
984        // `pending_notify` writes — re-entrance on RefCell::borrow_mut
985        // would panic. Explicit scope makes the lifetime load-bearing.
986        let candidates = with_wave_state(|ws| std::mem::take(&mut ws.pending_auto_resolve));
987        for node_id in candidates {
988            let needs_resolve = with_wave_state(|ws| {
989                ws.pending_notify
990                    .get(&node_id)
991                    .is_some_and(|entry| !entry.iter_messages().any(|m| m.tier() >= 3))
992            });
993            if needs_resolve {
994                self.queue_notify(&mut s, node_id, Message::Resolved);
995            }
996        }
997        // Final flush phase — populates deferred_flush_jobs
998        // from pending_notify (already carries per-node sink snapshots).
999        self.flush_notifications(&mut s);
1000    }
1001
1002    /// Pick the pending node with the lowest topological rank.
1003    ///
1004    /// Nodes with lower `topo_rank` have no transitive upstream in
1005    /// `pending_fires` (by construction — `topo_rank = 1 + max dep rank`).
1006    /// This is O(|pending_fires|) instead of the prior O(N·V) BFS.
1007    /// §10 perf optimization (D047, Slice U).
1008    ///
1009    /// Q-beyond Sub-slice 2 (D108, 2026-05-09): `pending_fires` lives on
1010    /// per-thread `WaveState`. Caller passes `&WaveState` alongside
1011    /// `&CoreState` so the borrow scopes stay disjoint and visible.
1012    fn pick_next_fire(s: &CoreState, ws: &WaveState) -> Option<NodeId> {
1013        ws.pending_fires
1014            .iter()
1015            .copied()
1016            .min_by_key(|&id| s.nodes.get(&id).map_or(0, |r| r.topo_rank))
1017    }
1018
1019    /// Wave drain entry point. Dispatches via `rec.op` to either the
1020    /// regular fn-fire path ([`Self::fire_regular`]) or the operator
1021    /// dispatch ([`Self::fire_operator`]).
1022    pub(crate) fn fire_fn(&self, node_id: NodeId) {
1023        let op = {
1024            let s = self.lock_state();
1025            s.nodes.get(&node_id).and_then(|r| r.op)
1026        };
1027        match op {
1028            Some(operator_op) => self.fire_operator(node_id, operator_op),
1029            None => {
1030                // State / Derived / Dynamic / Producer all dispatch via fn_id.
1031                self.fire_regular(node_id);
1032            }
1033        }
1034    }
1035
1036    /// Fire a node's fn lock-released around `invoke_fn`.
1037    ///
1038    /// Phase 1 (lock-held): remove from pending_fires, snapshot fn_id +
1039    /// dep_records → DepBatch + kind. Skip if terminal, first-run-gate-closed,
1040    /// or stateless.
1041    ///
1042    /// Phase 2 (lock-released): call `BindingBoundary::invoke_fn`. User fn
1043    /// callbacks may re-enter Core (`emit`, `pause`, etc.) and run a nested
1044    /// wave — the in_tick gate composes naturally because nested calls
1045    /// observe `in_tick = true` and skip their own drain.
1046    ///
1047    /// Phase 3 (lock-held): mark `has_fired_once`, store dynamic-tracked,
1048    /// decide between Noop+RESOLVED, single Data, or Batch.
1049    ///
1050    /// Phase 4: commit emissions. Single Data goes through
1051    /// `commit_emission` (with equals substitution). Batch emissions are
1052    /// processed in sequence — Data via `commit_emission_verbatim` (no
1053    /// equals substitution per R1.3.2.d / R1.3.3.c), Complete/Error via
1054    /// terminal cascade.
1055    #[allow(clippy::too_many_lines)] // Slice G added Noop / Batch tier-3 guards
1056    fn fire_regular(&self, node_id: NodeId) {
1057        enum FireAction {
1058            None,
1059            SingleData(HandleId),
1060            Batch(SmallVec<[FnEmission; 2]>),
1061        }
1062
1063        // Phase 1: snapshot inputs — build DepBatch per dep from dep_records.
1064        // `has_fired_once` is captured here for the Slice E2 OnRerun gate
1065        // (Phase 1.5 below): the cleanup hook only fires when the fn has
1066        // run at least once already in this activation cycle.
1067        let prep: Option<(crate::handle::FnId, Vec<DepBatch>, bool, bool)> = {
1068            let s = self.lock_state();
1069            // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_fires lives
1070            // on per-thread WaveState. Removed via with_wave_state — no
1071            // re-entry concern because only the immediate remove happens
1072            // under the borrow.
1073            with_wave_state(|ws| {
1074                ws.pending_fires.remove(&node_id);
1075            });
1076            let rec = s.require_node(node_id);
1077            // Skip: terminal, first-run-gate-closed (R2.5.3 / R5.4 — partial
1078            // mode opts out of the gate per D011), or stateless.
1079            if rec.terminal.is_some() || (!rec.partial && rec.has_sentinel_deps()) {
1080                None
1081            } else {
1082                rec.fn_id.map(|fn_id| {
1083                    let use_mask = rec.dep_records.len() <= 64;
1084                    let mask = rec.involved_mask;
1085                    let dep_batches: Vec<DepBatch> = rec
1086                        .dep_records
1087                        .iter()
1088                        .enumerate()
1089                        .map(|(i, dr)| DepBatch {
1090                            data: dr.data_batch.clone(),
1091                            prev_data: dr.prev_data,
1092                            // §10.3 perf (Slice V1): derive from bitmask
1093                            // for ≤64 deps; fall back to per-dep field.
1094                            involved: if use_mask {
1095                                (mask >> i) & 1 != 0
1096                            } else {
1097                                dr.involved_this_wave
1098                            },
1099                        })
1100                        .collect();
1101                    (fn_id, dep_batches, rec.is_dynamic, rec.has_fired_once)
1102                })
1103            }
1104        };
1105        let Some((fn_id, dep_batches, is_dynamic, has_fired_once)) = prep else {
1106            return;
1107        };
1108
1109        // Phase 1.5 (Slice E2 — R2.4.5 OnRerun, lock-released per D045): if
1110        // the fn has fired at least once in this activation cycle, fire its
1111        // OnRerun cleanup hook BEFORE the next invoke_fn re-allocates fn-
1112        // local resources. First-fire is intentionally skipped — there is
1113        // no prior run to clean up. Fires OUTSIDE `FiringGuard` because
1114        // cleanup re-entrance is not the A6 reentrancy concern (which
1115        // protects against `set_deps(self, ...)` from inside the in-flight
1116        // invoke_fn). Operator nodes never reach this path (`fire_regular`
1117        // is the fn-id branch of `fire_fn`; operators dispatch via
1118        // `fire_operator`), so cleanup hooks correctly only fire for fn-
1119        // shaped nodes (state / derived / dynamic / producer).
1120        if has_fired_once {
1121            self.binding
1122                .cleanup_for(node_id, crate::boundary::CleanupTrigger::OnRerun);
1123        }
1124
1125        // Phase 2: invoke fn lock-released. A6 reentrancy guard is scoped to
1126        // the FFI call only — Phase 3's lock-held state mutation is not part
1127        // of "currently firing" because set_deps would already block on the
1128        // state lock by then. Drop on the guard pops the stack even if
1129        // invoke_fn panics, keeping `currently_firing` balanced.
1130        let result = {
1131            let _firing = FiringGuard::new(self, node_id);
1132            self.binding.invoke_fn(node_id, fn_id, &dep_batches)
1133        };
1134
1135        // Phase 3: apply result under the lock — defensive terminal check
1136        // (a sibling cascade may have terminated this node during phase 2).
1137        let action: FireAction = {
1138            let mut s = self.lock_state();
1139            // Defensive: node may have terminated mid-phase-2 via a sibling
1140            // cascade (a fn that re-entered `Core::error` on a path that
1141            // cascaded here). If so, release any payload handles and no-op.
1142            if s.require_node(node_id).terminal.is_some() {
1143                match &result {
1144                    FnResult::Data { handle, .. } => {
1145                        self.binding.release_handle(*handle);
1146                    }
1147                    FnResult::Batch { emissions, .. } => {
1148                        for em in emissions {
1149                            match em {
1150                                FnEmission::Data(h) | FnEmission::Error(h) => {
1151                                    self.binding.release_handle(*h);
1152                                }
1153                                FnEmission::Complete => {}
1154                            }
1155                        }
1156                    }
1157                    FnResult::Noop { .. } => {}
1158                }
1159                return;
1160            }
1161            let rec = s.require_node_mut(node_id);
1162            rec.has_fired_once = true;
1163            if is_dynamic {
1164                let tracked = match &result {
1165                    FnResult::Data { tracked, .. }
1166                    | FnResult::Noop { tracked }
1167                    | FnResult::Batch { tracked, .. } => tracked.clone(),
1168                };
1169                if let Some(t) = tracked {
1170                    rec.tracked = t.into_iter().collect();
1171                }
1172            }
1173            match result {
1174                FnResult::Noop { .. } => {
1175                    // Slice G: skip Resolved if a prior emission in the same
1176                    // wave already queued tier-3 (would violate R1.3.3.a).
1177                    // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_notify
1178                    // lives on per-thread WaveState. Borrow scoped to the
1179                    // tier3 read so queue_notify (which re-borrows
1180                    // WaveState) doesn't double-borrow.
1181                    let already_dirty = s.require_node(node_id).dirty;
1182                    let already_tier3 = with_wave_state(|ws| {
1183                        ws.pending_notify
1184                            .get(&node_id)
1185                            .is_some_and(|entry| entry.iter_messages().any(|m| m.tier() == 3))
1186                    });
1187                    if already_dirty && !already_tier3 {
1188                        self.queue_notify(&mut s, node_id, Message::Resolved);
1189                    }
1190                    FireAction::None
1191                }
1192                FnResult::Data { handle, .. } => FireAction::SingleData(handle),
1193                FnResult::Batch { emissions, .. } if emissions.is_empty() => {
1194                    // Empty Batch is equivalent to Noop — settle with
1195                    // RESOLVED if the node was dirty (R1.3.1.a). Slice G:
1196                    // skip if a prior emission already queued tier-3.
1197                    // Q-beyond Sub-slice 2 (D108, 2026-05-09): see Noop
1198                    // arm above for the WaveState borrow scope rationale.
1199                    let already_dirty = s.require_node(node_id).dirty;
1200                    let already_tier3 = with_wave_state(|ws| {
1201                        ws.pending_notify
1202                            .get(&node_id)
1203                            .is_some_and(|entry| entry.iter_messages().any(|m| m.tier() == 3))
1204                    });
1205                    if already_dirty && !already_tier3 {
1206                        self.queue_notify(&mut s, node_id, Message::Resolved);
1207                    }
1208                    FireAction::None
1209                }
1210                FnResult::Batch { emissions, .. } => FireAction::Batch(emissions),
1211            }
1212        };
1213
1214        // Phase 4: commit emissions.
1215        match action {
1216            FireAction::None => {}
1217            // Single Data — equals substitution applies (R1.3.2).
1218            FireAction::SingleData(handle) => {
1219                self.commit_emission(node_id, handle);
1220            }
1221            // Batch — process in sequence. No equals substitution
1222            // (R1.3.2.d / R1.3.3.c: multi-message waves pass verbatim).
1223            FireAction::Batch(emissions) => {
1224                self.commit_batch(node_id, emissions);
1225            }
1226        }
1227    }
1228
1229    /// Process a `FnResult::Batch` emissions sequence. Each `Data` goes
1230    /// through `commit_emission_verbatim` (no equals substitution per
1231    /// R1.3.2.d / R1.3.3.c). Terminal emissions (`Complete` / `Error`)
1232    /// cascade per R1.3.4; processing stops at the first terminal and
1233    /// remaining handles are released (R1.3.4.a: no further messages
1234    /// after terminal).
1235    fn commit_batch(&self, node_id: NodeId, emissions: SmallVec<[FnEmission; 2]>) {
1236        let mut iter = emissions.into_iter();
1237        for em in iter.by_ref() {
1238            match em {
1239                FnEmission::Data(handle) => {
1240                    self.commit_emission_verbatim(node_id, handle);
1241                }
1242                FnEmission::Complete => {
1243                    self.complete(node_id);
1244                    break;
1245                }
1246                FnEmission::Error(handle) => {
1247                    self.error(node_id, handle);
1248                    break;
1249                }
1250            }
1251        }
1252        // Release handles from any emissions after the terminal break.
1253        for em in iter {
1254            match em {
1255                FnEmission::Data(h) | FnEmission::Error(h) => {
1256                    self.binding.release_handle(h);
1257                }
1258                FnEmission::Complete => {}
1259            }
1260        }
1261    }
1262
1263    // -------------------------------------------------------------------
1264    // Emission commit — equals-substitution lives here
1265    // -------------------------------------------------------------------
1266
1267    /// Apply a node's emission. `&self`-only; brackets the equals check
1268    /// around a lock release so `BindingBoundary::custom_equals` can re-enter
1269    /// Core safely.
1270    ///
1271    /// Phase 1 (lock-held): defensive terminal short-circuit; snapshot
1272    /// equals_mode + old cache handle.
1273    ///
1274    /// Phase 2 (lock-released): call `handles_equal` — `EqualsMode::Identity`
1275    /// is a pure `u64` compare with no boundary call; `EqualsMode::Custom`
1276    /// crosses to the binding's `custom_equals` oracle, which may re-enter
1277    /// Core.
1278    ///
1279    /// Phase 3 (lock-held): set cache, queue Dirty + Data/Resolved into
1280    /// pending_notify (which snapshots subscribers on first touch),
1281    /// propagate to children.
1282    // Q2 / Q3 (2026-05-09) tipped past clippy's 100-line threshold; the
1283    // function is already a multi-phase wave-engine routine and breaking
1284    // out the four phases would obscure the lock-discipline.
1285    #[allow(clippy::too_many_lines)]
1286    pub(crate) fn commit_emission(&self, node_id: NodeId, new_handle: HandleId) {
1287        assert!(
1288            new_handle != NO_HANDLE,
1289            "NO_HANDLE is not a valid DATA payload (R1.2.4) for node {node_id:?}",
1290        );
1291
1292        // Phase 1: terminal short-circuit + snapshot equals/cache.
1293        let snapshot = {
1294            let s = self.lock_state();
1295            let rec = s.require_node(node_id);
1296            if rec.terminal.is_some() {
1297                drop(s);
1298                self.binding.release_handle(new_handle);
1299                return;
1300            }
1301            (rec.cache, rec.equals)
1302        };
1303        let (old_handle, equals_mode) = snapshot;
1304
1305        // Slice G (2026-05-07): R1.3.2.d says equals substitution only
1306        // fires for SINGLE-DATA waves at one node. Detect "this is a
1307        // subsequent emit in the same wave at this node" via the
1308        // per-thread `TIER3_EMITTED_THIS_WAVE` thread-local
1309        // (D1 patch, 2026-05-09 — moved off per-partition state to be
1310        // robust against mid-wave cross-thread `set_deps` partition
1311        // splits). If set → multi-emit wave: skip equals, queue Data
1312        // verbatim, retroactively rewrite any prior Resolved (queued by
1313        // an earlier same-value emit's equals match) to Data using the
1314        // wave-start cache snapshot. Outside batch / first emit:
1315        // standard per-emit equals path. Thread-local lookup is
1316        // ~5ns and lock-free.
1317        let is_subsequent_emit_in_wave = tier3_check(node_id);
1318
1319        if is_subsequent_emit_in_wave {
1320            // Multi-emit wave detected. Skip equals, queue Data verbatim.
1321            // Also rewrite any prior Resolved entries to Data using the
1322            // wave-start cache snapshot.
1323            self.rewrite_prior_resolved_to_data(node_id);
1324            self.commit_emission_verbatim(node_id, new_handle);
1325            return;
1326        }
1327
1328        // Phase 2: equals check (lock-released for Custom).
1329        let is_data = !self.handles_equal_lock_released(equals_mode, old_handle, new_handle);
1330
1331        // Phase 3: apply emission under the lock. Defensive terminal
1332        // re-check — a concurrent cascade between phase 2 and phase 3
1333        // could have terminated the node.
1334        let mut s = self.lock_state();
1335        if s.require_node(node_id).terminal.is_some() {
1336            drop(s);
1337            self.binding.release_handle(new_handle);
1338            return;
1339        }
1340
1341        // R1.3.1.a condition (b): synthesize DIRTY only if node not already
1342        // dirty from an earlier emission in the same wave.
1343        let already_dirty = s.require_node(node_id).dirty;
1344        s.require_node_mut(node_id).dirty = true;
1345        if !already_dirty {
1346            self.queue_notify(&mut s, node_id, Message::Dirty);
1347        }
1348
1349        if is_data {
1350            // P3 (Slice A close /qa): re-read CURRENT cache. Same-thread
1351            // re-entry from a `custom_equals` oracle that called back into
1352            // `Core::emit` on this same node during phase 2's lock-released
1353            // equals check could have advanced the cache between phase 1's
1354            // snapshot (`old_handle`) and this point.
1355            let current_cache = s.require_node(node_id).cache;
1356            // Q-beyond Sub-slice 1 (D108, 2026-05-09): wave_cache_snapshots
1357            // lives on per-thread WaveState. `in_tick` is per-(Core,
1358            // thread) (`IN_TICK_OWNED`); this read is on the wave-owner
1359            // thread, so it observes this thread's own ownership.
1360            let in_tick = self.in_tick();
1361            let snapshot_taken = if in_tick && current_cache != NO_HANDLE {
1362                use std::collections::hash_map::Entry;
1363                with_wave_state(|ws| match ws.wave_cache_snapshots.entry(node_id) {
1364                    Entry::Vacant(slot) => {
1365                        slot.insert(current_cache);
1366                        true
1367                    }
1368                    Entry::Occupied(_) => false,
1369                })
1370            } else {
1371                false
1372            };
1373            s.require_node_mut(node_id).cache = new_handle;
1374            if current_cache != NO_HANDLE && !snapshot_taken {
1375                self.binding.release_handle(current_cache);
1376            }
1377            // Slice E1 (R2.6.5 / Lock 6.G): push DATA into the replay
1378            // buffer if the node opted in. RESOLVED entries are NOT
1379            // buffered (canonical "DATA only").
1380            self.push_replay_buffer(&mut s, node_id, new_handle);
1381            // Slice G (D1 patch, 2026-05-09): mark this node as having
1382            // emitted tier-3 in this wave on the per-thread tracker.
1383            tier3_mark(node_id);
1384            self.queue_notify(&mut s, node_id, Message::Data(new_handle));
1385            // Propagate to children
1386            let child_ids: Vec<NodeId> = s
1387                .children
1388                .get(&node_id)
1389                .map(|c| c.iter().copied().collect())
1390                .unwrap_or_default();
1391            for child_id in child_ids {
1392                let dep_idx = s.require_node(child_id).dep_index_of(node_id);
1393                if let Some(idx) = dep_idx {
1394                    self.deliver_data_to_consumer(&mut s, child_id, idx, new_handle);
1395                }
1396            }
1397        } else {
1398            // RESOLVED: handle unchanged. Don't release; old still in use.
1399            // Slice G: snapshot cache so a subsequent same-wave emit can
1400            // rewrite this Resolved to Data using the snapshot.
1401            // Q-beyond Sub-slice 1 (D108, 2026-05-09): wave_cache_snapshots
1402            // lives on per-thread WaveState. /qa F1 reverted (2026-05-10):
1403            // `in_tick` is per-(Core, thread); read on the wave-owner
1404            // thread (observes this thread's own ownership).
1405            let current_cache = s.require_node(node_id).cache;
1406            if self.in_tick() && current_cache != NO_HANDLE {
1407                use std::collections::hash_map::Entry;
1408                with_wave_state(|ws| {
1409                    if let Entry::Vacant(slot) = ws.wave_cache_snapshots.entry(node_id) {
1410                        self.binding.retain_handle(current_cache);
1411                        slot.insert(current_cache);
1412                    }
1413                });
1414            }
1415            // Slice G (D1 patch, 2026-05-09): mark this node as having
1416            // emitted tier-3 in this wave on the per-thread tracker.
1417            tier3_mark(node_id);
1418            self.queue_notify(&mut s, node_id, Message::Resolved);
1419            let child_ids: Vec<NodeId> = s
1420                .children
1421                .get(&node_id)
1422                .map(|c| c.iter().copied().collect())
1423                .unwrap_or_default();
1424            // /qa A7 fix (2026-05-09): collect auto-resolve inserts
1425            // during the loop and bulk-insert into pending_auto_resolve
1426            // under a SINGLE cross_partition acquire after the loop.
1427            // Pre-fix the loop acquired `cross_partition` once per
1428            // child via `self.lock_cross_partition().pending_auto_resolve.insert(...)`,
1429            // which is N mutex hops for an N-child cascade. Cannot
1430            // hoist to acquire-cps-before-loop because `queue_notify`
1431            // (called inside the loop) also acquires cross_partition
1432            // for `pending_pause_overflow.push` in the rare overflow
1433            // case — re-entrance on the non-reentrant Mutex would
1434            // self-deadlock.
1435            let mut auto_resolve_inserts: SmallVec<[NodeId; 4]> = SmallVec::new();
1436            for child_id in child_ids {
1437                let already_involved = s.require_node(child_id).involved_this_wave;
1438                if !already_involved {
1439                    {
1440                        let child = s.require_node_mut(child_id);
1441                        child.involved_this_wave = true;
1442                        child.dirty = true;
1443                    }
1444                    self.queue_notify(&mut s, child_id, Message::Dirty);
1445                    // Q2 (2026-05-09): pending_auto_resolve lives on
1446                    // CrossPartitionState. Deferred to after-loop
1447                    // bulk insert per the /qa A7 fix above.
1448                    auto_resolve_inserts.push(child_id);
1449                }
1450            }
1451            // /qa A7 (2026-05-09) — preserved post-Sub-slice-1 (D108):
1452            // single WaveState borrow for the bulk-insert. queue_notify
1453            // above no longer holds the WaveState borrow by the time we
1454            // reach here, so this borrow is uncontested.
1455            if !auto_resolve_inserts.is_empty() {
1456                with_wave_state(|ws| ws.pending_auto_resolve.extend(auto_resolve_inserts));
1457            }
1458        }
1459    }
1460
1461    /// Slice G: when a multi-emit wave is detected at `node_id` (a second
1462    /// emit arrives while a prior tier-3 message is still pending), rewrite
1463    /// any `Resolved` entries from earlier emits to `Data(snapshot_cache)`
1464    /// so the wave conforms to R1.3.3.a (≥1 DATA OR exactly 1 RESOLVED).
1465    /// Touches both `pending_notify` (immediate-flush path) and the per-node
1466    /// pause buffer (paused path).
1467    fn rewrite_prior_resolved_to_data(&self, node_id: NodeId) {
1468        let mut s = self.lock_state();
1469        // Q-beyond Sub-slice 1 + 2 (D108, 2026-05-09): wave_cache_snapshots
1470        // and pending_notify both live on per-thread WaveState. Single
1471        // WaveState borrow handles both the snapshot lookup and the
1472        // pending_notify rewrite; the pause-buffer path uses the state
1473        // lock and is independent of WaveState.
1474        let snapshot = match with_wave_state(|ws| ws.wave_cache_snapshots.get(&node_id).copied()) {
1475            Some(h) if h != NO_HANDLE => h,
1476            // No snapshot available — the prior Resolved was queued without
1477            // a cache (sentinel pre-emit). Nothing to rewrite to; the
1478            // multi-emit case from sentinel is fine (verbatim Data path).
1479            _ => return,
1480        };
1481        let mut retains_needed = 0u32;
1482        // Pending_notify path. Walk all batches' messages — Slice-G
1483        // coalescing reasons about wave-content per node, not per-batch.
1484        with_wave_state(|ws| {
1485            if let Some(entry) = ws.pending_notify.get_mut(&node_id) {
1486                for msg in entry.iter_messages_mut() {
1487                    if matches!(msg, Message::Resolved) {
1488                        *msg = Message::Data(snapshot);
1489                        retains_needed += 1;
1490                    }
1491                }
1492            }
1493        });
1494        // Pause-buffer path.
1495        if let Some(rec) = s.nodes.get_mut(&node_id) {
1496            if let crate::node::PauseState::Paused { buffer, .. } = &mut rec.pause_state {
1497                for msg in &mut *buffer {
1498                    if matches!(msg, Message::Resolved) {
1499                        *msg = Message::Data(snapshot);
1500                        retains_needed += 1;
1501                    }
1502                }
1503            }
1504        }
1505        drop(s);
1506        // Each rewritten Resolved → Data adds a payload retain that
1507        // queue_notify would otherwise have taken at emit time. The
1508        // snapshot already owns one retain (taken when cache was
1509        // snapshotted); we need one fresh retain per rewrite.
1510        for _ in 0..retains_needed {
1511            self.binding.retain_handle(snapshot);
1512        }
1513    }
1514
1515    /// Equals check that crosses the binding boundary lock-released for
1516    /// `EqualsMode::Custom`. Caller must NOT hold the state lock.
1517    fn handles_equal_lock_released(&self, mode: EqualsMode, a: HandleId, b: HandleId) -> bool {
1518        if a == b {
1519            return true; // identity-on-handles always sufficient
1520        }
1521        if a == NO_HANDLE || b == NO_HANDLE {
1522            return false;
1523        }
1524        match mode {
1525            EqualsMode::Identity => false,
1526            EqualsMode::Custom(handle) => self.binding.custom_equals(handle, a, b),
1527        }
1528    }
1529
1530    /// Commit a DATA emission **without** equals substitution — used by
1531    /// `FnResult::Batch` processing where multi-message waves pass through
1532    /// verbatim per R1.3.2.d / R1.3.3.c. DIRTY auto-prefix respects
1533    /// R1.3.1.a condition (b): only queued if node not already dirty.
1534    ///
1535    /// Structurally identical to the DATA branch of [`Self::commit_emission`]
1536    /// but skips the Phase 2 equals check entirely.
1537    fn commit_emission_verbatim(&self, node_id: NodeId, new_handle: HandleId) {
1538        assert!(
1539            new_handle != NO_HANDLE,
1540            "NO_HANDLE is not a valid DATA payload (R1.2.4) for node {node_id:?}",
1541        );
1542
1543        let mut s = self.lock_state();
1544        let rec = s.require_node(node_id);
1545        if rec.terminal.is_some() {
1546            drop(s);
1547            self.binding.release_handle(new_handle);
1548            return;
1549        }
1550
1551        // R1.3.1.a condition (b): DIRTY only if not already dirty.
1552        let already_dirty = s.require_node(node_id).dirty;
1553        s.require_node_mut(node_id).dirty = true;
1554        if !already_dirty {
1555            self.queue_notify(&mut s, node_id, Message::Dirty);
1556        }
1557
1558        // Always DATA — no equals substitution for Batch emissions.
1559        // Q-beyond Sub-slice 1 (D108, 2026-05-09): wave_cache_snapshots
1560        // lives on per-thread WaveState. /qa F1 reverted (2026-05-10):
1561        // `in_tick` is per-(Core, thread); read on the wave-owner thread.
1562        let current_cache = s.require_node(node_id).cache;
1563        let snapshot_taken = if self.in_tick() && current_cache != NO_HANDLE {
1564            use std::collections::hash_map::Entry;
1565            with_wave_state(|ws| match ws.wave_cache_snapshots.entry(node_id) {
1566                Entry::Vacant(slot) => {
1567                    slot.insert(current_cache);
1568                    true
1569                }
1570                Entry::Occupied(_) => false,
1571            })
1572        } else {
1573            false
1574        };
1575        s.require_node_mut(node_id).cache = new_handle;
1576        if current_cache != NO_HANDLE && !snapshot_taken {
1577            self.binding.release_handle(current_cache);
1578        }
1579        // Slice E1: replay buffer push (R2.6.5 / Lock 6.G).
1580        self.push_replay_buffer(&mut s, node_id, new_handle);
1581        // Slice G QA fix (A2, 2026-05-07) / D1 patch (2026-05-09): mark
1582        // tier3_emitted_this_wave on the per-thread tracker even on the
1583        // verbatim path. A subsequent commit_emission at the same node
1584        // in the same wave needs this flag to detect multi-emit and
1585        // skip equals substitution; without it, a Batch-then-standard
1586        // sequence would queue Resolved into a wave that already has
1587        // Data — violating R1.3.3.a. The Batch path itself still
1588        // passes verbatim per R1.3.3.c (we don't re-run equals here);
1589        // we just record that "this node has emitted tier-3 in this
1590        // wave."
1591        tier3_mark(node_id);
1592        self.queue_notify(&mut s, node_id, Message::Data(new_handle));
1593        // Propagate to children
1594        let child_ids: Vec<NodeId> = s
1595            .children
1596            .get(&node_id)
1597            .map(|c| c.iter().copied().collect())
1598            .unwrap_or_default();
1599        for child_id in child_ids {
1600            let dep_idx = s.require_node(child_id).dep_index_of(node_id);
1601            if let Some(idx) = dep_idx {
1602                self.deliver_data_to_consumer(&mut s, child_id, idx, new_handle);
1603            }
1604        }
1605    }
1606
1607    /// Slice E1 (R2.6.5 / Lock 6.G): push a DATA handle into the node's
1608    /// replay buffer if opted in. Evicts oldest if cap exceeded; takes a
1609    /// fresh retain on push. RESOLVED is NOT buffered per canonical
1610    /// "DATA only" — call sites only invoke this for Data emissions.
1611    ///
1612    /// Evicted handle is queued into `cps.deferred_handle_releases`
1613    /// (released lock-released at flush time) per the binding-boundary
1614    /// lock-release discipline — `release_handle` may re-enter Core via
1615    /// finalizers and must not run while the state lock is held
1616    /// (QA A3, 2026-05-07). Q2 (2026-05-09): the queue moved to
1617    /// CrossPartitionState; this fn acquires `cross_partition` only
1618    /// when an eviction actually happens (the common case is no
1619    /// eviction → no second-mutex acquire).
1620    fn push_replay_buffer(&self, s: &mut CoreState, node_id: NodeId, new_handle: HandleId) {
1621        let rec = s.require_node_mut(node_id);
1622        let cap = match rec.replay_buffer_cap {
1623            Some(c) if c > 0 => c,
1624            _ => return,
1625        };
1626        self.binding.retain_handle(new_handle);
1627        rec.replay_buffer.push_back(new_handle);
1628        let evicted = if rec.replay_buffer.len() > cap {
1629            rec.replay_buffer.pop_front()
1630        } else {
1631            None
1632        };
1633        if let Some(h) = evicted {
1634            with_wave_state(|ws| ws.deferred_handle_releases.push(h));
1635        }
1636    }
1637
1638    // ===================================================================
1639    // Operator dispatch (Slice C-1, D009).
1640    //
1641    // `fire_operator` is the entry point for nodes whose `kind` is
1642    // `NodeKind::Operator(_)`. It branches on the `OperatorOp` discriminant
1643    // to per-operator helpers that snapshot inputs under the lock, drop the
1644    // lock to call the binding's bulk projection FFI, and reacquire to
1645    // apply emissions via `commit_emission_verbatim` (no per-item equals
1646    // dedup at the wire — operator output passes verbatim per the same
1647    // R1.3.2.d / R1.3.3.c rule that governs `FnResult::Batch`).
1648    //
1649    // **Refcount discipline:** inputs sourced from `dep_records[i].data_batch`
1650    // share retains owned by the wave's data-batch slot (released at
1651    // wave-end rotation in `clear_wave_state`). Operators that emit those
1652    // handles unchanged (`Filter`, `DistinctUntilChanged`, `Pairwise`'s
1653    // `prev` carry-over) take an additional retain via `retain_handle`
1654    // before passing to `commit_emission_verbatim` — the cache slot owns
1655    // its own share, independent of the data-batch slot's. Operators that
1656    // produce fresh handles (`Map` / `Scan` / `Reduce` / `Pairwise`'s
1657    // packed tuples) receive retains pre-bumped by the binding's bulk-
1658    // projection method.
1659    // ===================================================================
1660
1661    /// Operator dispatch entry. Pre-checks (terminal short-circuit, first-
1662    /// run gate accounting for `partial`, terminal-aware fire for `Reduce`)
1663    /// happen here; per-operator behavior lives in the `fire_op_*` helpers.
1664    fn fire_operator(&self, node_id: NodeId, op: OperatorOp) {
1665        // Phase 1 (lock-held): remove from pending_fires, evaluate skip.
1666        // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_fires lives on
1667        // per-thread WaveState; state lock + WaveState borrow are
1668        // independent.
1669        let proceed = {
1670            let s = self.lock_state();
1671            with_wave_state(|ws| {
1672                ws.pending_fires.remove(&node_id);
1673            });
1674            let rec = s.require_node(node_id);
1675            if rec.terminal.is_some() {
1676                false
1677            } else {
1678                // First-run gate (R2.5.3 / R5.4). Partial-mode operators
1679                // (D011) opt out of the gate; otherwise we wait for every
1680                // dep to have delivered at least one real handle. Terminal-
1681                // aware operators (currently `Reduce`) additionally count a
1682                // dep terminal as "real input" so they can fire on
1683                // upstream COMPLETE-without-DATA and emit the seed.
1684                let has_real_input = !rec.has_sentinel_deps()
1685                    || rec.dep_records.iter().any(|dr| dr.terminal.is_some());
1686                rec.partial || has_real_input
1687            }
1688        };
1689        if !proceed {
1690            return;
1691        }
1692
1693        // A6 (Slice F, 2026-05-07): track operator fire on the
1694        // `currently_firing` stack so a binding-side project/predicate/fold
1695        // FFI callback that re-enters `Core::set_deps(node_id, ...)` is
1696        // rejected with `SetDepsError::ReentrantOnFiringNode`. Drop pops
1697        // the stack on panic too.
1698        let _firing = FiringGuard::new(self, node_id);
1699
1700        match op {
1701            OperatorOp::Map { fn_id } => self.fire_op_map(node_id, fn_id),
1702            OperatorOp::Filter { fn_id } => self.fire_op_filter(node_id, fn_id),
1703            OperatorOp::Scan { fn_id, .. } => self.fire_op_scan(node_id, fn_id),
1704            OperatorOp::Reduce { fn_id, .. } => self.fire_op_reduce(node_id, fn_id),
1705            OperatorOp::DistinctUntilChanged { equals_fn_id } => {
1706                self.fire_op_distinct(node_id, equals_fn_id);
1707            }
1708            OperatorOp::Pairwise { fn_id } => self.fire_op_pairwise(node_id, fn_id),
1709            OperatorOp::Combine { pack_fn } => self.fire_op_combine(node_id, pack_fn),
1710            OperatorOp::WithLatestFrom { pack_fn } => {
1711                self.fire_op_with_latest_from(node_id, pack_fn);
1712            }
1713            OperatorOp::Merge => self.fire_op_merge(node_id),
1714            OperatorOp::Take { count } => self.fire_op_take(node_id, count),
1715            OperatorOp::Skip { count } => self.fire_op_skip(node_id, count),
1716            OperatorOp::TakeWhile { fn_id } => self.fire_op_take_while(node_id, fn_id),
1717            // The variant carries `default` for `register_operator`'s
1718            // `make_op_scratch` path; once registered, the live default
1719            // is read from `LastState::default` inside `fire_op_last`.
1720            OperatorOp::Last { .. } => self.fire_op_last(node_id),
1721            OperatorOp::Tap { fn_id } => self.fire_op_tap(node_id, fn_id),
1722            OperatorOp::TapFirst { fn_id } => self.fire_op_tap_first(node_id, fn_id),
1723            OperatorOp::Valve => self.fire_op_valve(node_id),
1724            OperatorOp::Settle {
1725                quiet_waves,
1726                max_waves,
1727            } => self.fire_op_settle(node_id, quiet_waves, max_waves),
1728        }
1729    }
1730
1731    /// Snapshot the operator's single dep batch (transform constraint —
1732    /// R5.7 single-dep). Returns `(inputs, terminal)` where `inputs` is a
1733    /// fresh `Vec<HandleId>` (no retains) and `terminal` reflects
1734    /// `dep_records[0].terminal` at snapshot time.
1735    fn snapshot_op_dep0(&self, node_id: NodeId) -> (Vec<HandleId>, Option<TerminalKind>) {
1736        let s = self.lock_state();
1737        let rec = s.require_node(node_id);
1738        debug_assert!(
1739            !rec.dep_records.is_empty(),
1740            "transform operator must have ≥1 dep"
1741        );
1742        let dr = &rec.dep_records[0];
1743        (dr.data_batch.iter().copied().collect(), dr.terminal)
1744    }
1745
1746    /// Emit DIRTY (if not already dirty) followed by RESOLVED. Used by
1747    /// silent-drop operators (Filter / DistinctUntilChanged / Pairwise)
1748    /// when a wave's inputs all suppress and the operator needs to settle
1749    /// the wave for its subscribers (D018 — let DIRTY ride; queue RESOLVED
1750    /// on full-reject).
1751    fn settle_dirty_resolved(&self, node_id: NodeId) {
1752        let mut s = self.lock_state();
1753        if s.require_node(node_id).terminal.is_some() {
1754            return;
1755        }
1756        let already_dirty = s.require_node(node_id).dirty;
1757        s.require_node_mut(node_id).dirty = true;
1758        if !already_dirty {
1759            self.queue_notify(&mut s, node_id, Message::Dirty);
1760        }
1761        // Slice G: skip Resolved if pending_notify already has a tier-3
1762        // message — adding Resolved would violate R1.3.3.a.
1763        // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_notify lives
1764        // on per-thread WaveState; borrow scoped so queue_notify can
1765        // re-borrow.
1766        let already_tier3 = with_wave_state(|ws| {
1767            ws.pending_notify
1768                .get(&node_id)
1769                .is_some_and(|entry| entry.iter_messages().any(|m| m.tier() == 3))
1770        });
1771        if !already_tier3 {
1772            self.queue_notify(&mut s, node_id, Message::Resolved);
1773        }
1774    }
1775
1776    /// `OperatorOp::Map` dispatch.
1777    fn fire_op_map(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
1778        let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
1779        // Mark fired regardless of input count (activation gate already
1780        // satisfied or partial-mode).
1781        {
1782            let mut s = self.lock_state();
1783            s.require_node_mut(node_id).has_fired_once = true;
1784        }
1785        if inputs.is_empty() {
1786            return;
1787        }
1788        // Phase 2 (lock-released): bulk project. Binding returns one
1789        // handle per input, each with a retain share already taken.
1790        let outputs = self.binding.project_each(fn_id, &inputs);
1791        // Phase 3: emit each output. `commit_emission_verbatim` consumes
1792        // the retain into the cache slot (and releases the prior cache
1793        // handle internally).
1794        for h in outputs {
1795            self.commit_emission_verbatim(node_id, h);
1796        }
1797    }
1798
1799    /// `OperatorOp::Filter` dispatch (D012/D018).
1800    fn fire_op_filter(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
1801        let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
1802        {
1803            let mut s = self.lock_state();
1804            s.require_node_mut(node_id).has_fired_once = true;
1805        }
1806        if inputs.is_empty() {
1807            return;
1808        }
1809        // Phase 2: predicate per input.
1810        let pass = self.binding.predicate_each(fn_id, &inputs);
1811        // Slice V2: promoted from debug_assert! — binding contract violation
1812        // should fail loud in release builds too.
1813        assert!(
1814            pass.len() == inputs.len(),
1815            "predicate_each returned {} bools for {} inputs",
1816            pass.len(),
1817            inputs.len()
1818        );
1819        // Phase 3: emit passing items verbatim. Take a fresh retain for
1820        // each — the data_batch slot still owns its retain (released at
1821        // wave-end rotation), and the cache slot needs its own.
1822        let mut emitted = 0usize;
1823        for (i, &h) in inputs.iter().enumerate() {
1824            if pass.get(i).copied().unwrap_or(false) {
1825                self.binding.retain_handle(h);
1826                self.commit_emission_verbatim(node_id, h);
1827                emitted += 1;
1828            }
1829        }
1830        // D018: full-reject settles with DIRTY+RESOLVED.
1831        if emitted == 0 {
1832            self.settle_dirty_resolved(node_id);
1833        }
1834    }
1835
1836    /// `OperatorOp::Scan` dispatch — left-fold emitting each new acc.
1837    fn fire_op_scan(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
1838        use crate::op_state::ScanState;
1839        let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
1840        let acc = {
1841            let s = self.lock_state();
1842            scratch_ref::<ScanState>(&s, node_id).acc
1843        };
1844        {
1845            let mut s = self.lock_state();
1846            s.require_node_mut(node_id).has_fired_once = true;
1847        }
1848        if inputs.is_empty() {
1849            return;
1850        }
1851        // Phase 2: fold each input through. Returns N new handles, each
1852        // with a fresh retain.
1853        let new_states = self.binding.fold_each(fn_id, acc, &inputs);
1854        // Slice V2: promoted from debug_assert! — binding contract violation.
1855        assert!(
1856            new_states.len() == inputs.len(),
1857            "fold_each returned {} accs for {} inputs",
1858            new_states.len(),
1859            inputs.len()
1860        );
1861        // Phase 3a: update ScanState.acc to the LAST new acc. Take an
1862        // extra retain for the slot; release the prior acc's slot retain.
1863        let last_acc = new_states.last().copied();
1864        if let Some(last) = last_acc {
1865            let prev_acc = {
1866                let mut s = self.lock_state();
1867                let scratch = scratch_mut::<ScanState>(&mut s, node_id);
1868                let prev = scratch.acc;
1869                scratch.acc = last;
1870                prev
1871            };
1872            // Take the slot's retain on the new acc.
1873            self.binding.retain_handle(last);
1874            // Release the prior slot's retain (post-lock to keep binding
1875            // free to re-enter Core safely).
1876            if prev_acc != crate::handle::NO_HANDLE {
1877                self.binding.release_handle(prev_acc);
1878            }
1879        }
1880        // Phase 3b: emit each intermediate acc verbatim. `new_states`
1881        // entries each carry one retain from `fold_each`; that retain is
1882        // consumed by `commit_emission_verbatim` into the cache slot.
1883        for h in new_states {
1884            self.commit_emission_verbatim(node_id, h);
1885        }
1886    }
1887
1888    /// `OperatorOp::Reduce` dispatch — accumulates silently; emits acc on
1889    /// upstream COMPLETE (cascades ERROR verbatim).
1890    fn fire_op_reduce(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
1891        use crate::op_state::ReduceState;
1892        let (inputs, terminal) = self.snapshot_op_dep0(node_id);
1893        let acc = {
1894            let s = self.lock_state();
1895            scratch_ref::<ReduceState>(&s, node_id).acc
1896        };
1897        {
1898            let mut s = self.lock_state();
1899            s.require_node_mut(node_id).has_fired_once = true;
1900        }
1901        // Phase 2: accumulate (silent — no per-input emit).
1902        let new_states = if inputs.is_empty() {
1903            SmallVec::<[HandleId; 1]>::new()
1904        } else {
1905            self.binding.fold_each(fn_id, acc, &inputs)
1906        };
1907        // Slice V2: promoted from debug_assert! — binding contract violation.
1908        assert!(
1909            new_states.len() == inputs.len(),
1910            "fold_each returned {} accs for {} inputs",
1911            new_states.len(),
1912            inputs.len()
1913        );
1914        // Update ReduceState.acc to last new acc; release intermediate
1915        // states (we don't emit them) and the prior acc's slot retain.
1916        let last_acc = new_states.last().copied();
1917        let intermediates_to_release: Vec<HandleId> = if new_states.len() > 1 {
1918            new_states[..new_states.len() - 1].to_vec()
1919        } else {
1920            Vec::new()
1921        };
1922        let prev_acc_to_release = if let Some(last) = last_acc {
1923            let prev_acc = {
1924                let mut s = self.lock_state();
1925                let scratch = scratch_mut::<ReduceState>(&mut s, node_id);
1926                let prev = scratch.acc;
1927                scratch.acc = last;
1928                prev
1929            };
1930            self.binding.retain_handle(last);
1931            if prev_acc == crate::handle::NO_HANDLE {
1932                None
1933            } else {
1934                Some(prev_acc)
1935            }
1936        } else {
1937            None
1938        };
1939        // Release intermediate fold results (Reduce only emits the LAST,
1940        // but only on terminal). Each was retained by `fold_each`.
1941        for h in intermediates_to_release {
1942            self.binding.release_handle(h);
1943        }
1944        if let Some(h) = prev_acc_to_release {
1945            self.binding.release_handle(h);
1946        }
1947
1948        // Phase 3: emit on terminal.
1949        match terminal {
1950            None => {
1951                // Still accumulating; no emit. Subscribers see no message
1952                // for this wave (silent accumulation). The first wave that
1953                // pushes Reduce to fire produces a Dirty entry on the
1954                // upstream's commit, but Reduce itself doesn't queue any
1955                // tier-3 since R5 silently absorbs. v1: leave the
1956                // post-drain auto-resolve sweep to settle nothing —
1957                // pending_notify has no entry for Reduce so the sweep is
1958                // a no-op.
1959            }
1960            Some(TerminalKind::Complete) => {
1961                // Read the live acc (may be the seed if no DATA arrived)
1962                // and emit Data(acc) + Complete.
1963                let final_acc = {
1964                    let s = self.lock_state();
1965                    scratch_ref::<ReduceState>(&s, node_id).acc
1966                };
1967                if final_acc != crate::handle::NO_HANDLE {
1968                    // Emission needs its own retain (slot's retain is
1969                    // owned by ReduceState.acc until reset/Drop).
1970                    self.binding.retain_handle(final_acc);
1971                    self.commit_emission_verbatim(node_id, final_acc);
1972                }
1973                self.complete(node_id);
1974            }
1975            Some(TerminalKind::Error(h)) => {
1976                // Core::error transfers the caller's share into the
1977                // cascade (node.terminal + per-child dep_terminal slots);
1978                // no release at the error() boundary. Take a fresh share
1979                // here so the cascade owns it independently of the
1980                // dep_records[0].terminal slot's share.
1981                self.binding.retain_handle(h);
1982                self.error(node_id, h);
1983            }
1984        }
1985    }
1986
1987    /// `OperatorOp::DistinctUntilChanged` dispatch.
1988    fn fire_op_distinct(&self, node_id: NodeId, equals_fn_id: crate::handle::FnId) {
1989        use crate::op_state::DistinctState;
1990        let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
1991        let mut prev = {
1992            let s = self.lock_state();
1993            scratch_ref::<DistinctState>(&s, node_id).prev
1994        };
1995        {
1996            let mut s = self.lock_state();
1997            s.require_node_mut(node_id).has_fired_once = true;
1998        }
1999        if inputs.is_empty() {
2000            return;
2001        }
2002        // Take a working-copy retain on the initial prev so both the loop
2003        // (which releases old_prev on each non-equal item) and phase 3
2004        // (which releases the slot's original handle) each have their own
2005        // share. Without this, the loop's release of old_prev (== original
2006        // DistinctState.prev) double-releases against phase 3's stale_slot
2007        // release.
2008        if prev != crate::handle::NO_HANDLE {
2009            self.binding.retain_handle(prev);
2010        }
2011        // Phase 2: per-input equals(prev, current). Each non-equal input
2012        // is emitted and becomes the new prev. Equals fn_id reuses
2013        // `BindingBoundary::custom_equals`.
2014        let mut emitted = 0usize;
2015        for &h in &inputs {
2016            let equal = if prev == crate::handle::NO_HANDLE {
2017                false
2018            } else if prev == h {
2019                true
2020            } else {
2021                self.binding.custom_equals(equals_fn_id, prev, h)
2022            };
2023            if !equal {
2024                // Emit this input verbatim.
2025                self.binding.retain_handle(h);
2026                self.commit_emission_verbatim(node_id, h);
2027                // Update prev: take retain on new prev, release old
2028                // (working-copy retain from above or from prior iteration).
2029                self.binding.retain_handle(h);
2030                let old_prev = prev;
2031                prev = h;
2032                if old_prev != crate::handle::NO_HANDLE {
2033                    self.binding.release_handle(old_prev);
2034                }
2035                emitted += 1;
2036            }
2037        }
2038        // Phase 3: persist prev into DistinctState.prev slot. Release the
2039        // slot's original retain (stale_slot) — this is the slot-owned
2040        // share, independent of the working-copy share released in the
2041        // loop above.
2042        {
2043            let mut s = self.lock_state();
2044            let scratch = scratch_mut::<DistinctState>(&mut s, node_id);
2045            let stale_slot = scratch.prev;
2046            scratch.prev = prev;
2047            if stale_slot != prev && stale_slot != crate::handle::NO_HANDLE {
2048                drop(s);
2049                self.binding.release_handle(stale_slot);
2050            }
2051        }
2052        // Release the working-copy retain on the final prev if it was
2053        // never released in the loop (i.e. no non-equal items passed,
2054        // prev == original). In that case stale_slot == prev, so phase 3
2055        // didn't release it either — but the working-copy retain is still
2056        // outstanding. Release it now.
2057        if emitted == 0 && prev != crate::handle::NO_HANDLE {
2058            self.binding.release_handle(prev);
2059        }
2060        if emitted == 0 {
2061            self.settle_dirty_resolved(node_id);
2062        }
2063    }
2064
2065    /// `OperatorOp::Pairwise` dispatch — emits `(prev, current)` tuples
2066    /// starting after the second value (first input swallowed, sets `prev`).
2067    fn fire_op_pairwise(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
2068        use crate::op_state::PairwiseState;
2069        let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
2070        let mut prev = {
2071            let s = self.lock_state();
2072            scratch_ref::<PairwiseState>(&s, node_id).prev
2073        };
2074        {
2075            let mut s = self.lock_state();
2076            s.require_node_mut(node_id).has_fired_once = true;
2077        }
2078        if inputs.is_empty() {
2079            return;
2080        }
2081        let mut emitted = 0usize;
2082        for &h in &inputs {
2083            if prev == crate::handle::NO_HANDLE {
2084                // First-ever value — swallow, set prev. Retain for the
2085                // PairwiseState.prev slot (persisted in phase 3 below).
2086                self.binding.retain_handle(h);
2087                prev = h;
2088                continue;
2089            }
2090            // Pack (prev, current) into a tuple handle. Binding returns a
2091            // fresh retain on the packed handle.
2092            let packed = self.binding.pairwise_pack(fn_id, prev, h);
2093            self.commit_emission_verbatim(node_id, packed);
2094            // Advance prev: take retain on h, release old prev.
2095            self.binding.retain_handle(h);
2096            let old_prev = prev;
2097            prev = h;
2098            self.binding.release_handle(old_prev);
2099            emitted += 1;
2100        }
2101        // Persist prev into PairwiseState.prev slot.
2102        {
2103            let mut s = self.lock_state();
2104            let scratch = scratch_mut::<PairwiseState>(&mut s, node_id);
2105            let stale_slot = scratch.prev;
2106            scratch.prev = prev;
2107            if stale_slot != prev && stale_slot != crate::handle::NO_HANDLE {
2108                drop(s);
2109                self.binding.release_handle(stale_slot);
2110            }
2111        }
2112        if emitted == 0 {
2113            self.settle_dirty_resolved(node_id);
2114        }
2115    }
2116
2117    // =================================================================
2118    // Slice C-2: multi-dep combinator operators (D020)
2119    // =================================================================
2120
2121    /// Snapshot all deps' "latest" handle for multi-dep combinators.
2122    /// For each dep: returns `data_batch.last()` if non-empty (dep fired
2123    /// this wave), else `prev_data` (last handle from previous wave).
2124    /// Also returns whether dep[0] (primary) had DATA this wave —
2125    /// needed by `fire_op_with_latest_from`.
2126    fn snapshot_op_all_latest(&self, node_id: NodeId) -> (SmallVec<[HandleId; 4]>, bool) {
2127        let s = self.lock_state();
2128        let rec = s.require_node(node_id);
2129        let primary_fired = rec
2130            .dep_records
2131            .first()
2132            .is_some_and(|dr| !dr.data_batch.is_empty());
2133        let latest: SmallVec<[HandleId; 4]> = rec
2134            .dep_records
2135            .iter()
2136            .map(|dr| dr.data_batch.last().copied().unwrap_or(dr.prev_data))
2137            .collect();
2138        (latest, primary_fired)
2139    }
2140
2141    /// `OperatorOp::Combine` dispatch — N-dep combineLatest. Packs the
2142    /// latest handle per dep into a tuple via `pack_tuple`, emits on
2143    /// any dep fire. First-run gate (R2.5.3, partial: false) guarantees
2144    /// all deps have a real handle on first fire. Post-warmup INVALIDATE
2145    /// guard: if any dep's prev_data was cleared, settles with RESOLVED
2146    /// instead of packing a NO_HANDLE into the tuple.
2147    fn fire_op_combine(&self, node_id: NodeId, pack_fn: crate::handle::FnId) {
2148        let (latest, _primary_fired) = self.snapshot_op_all_latest(node_id);
2149        {
2150            let mut s = self.lock_state();
2151            s.require_node_mut(node_id).has_fired_once = true;
2152        }
2153        // Post-warmup INVALIDATE guard: a dep may have been invalidated
2154        // (prev_data cleared to NO_HANDLE) and not yet re-delivered.
2155        if latest.contains(&crate::handle::NO_HANDLE) {
2156            self.settle_dirty_resolved(node_id);
2157            return;
2158        }
2159        let tuple_handle = self.binding.pack_tuple(pack_fn, &latest);
2160        self.commit_emission_verbatim(node_id, tuple_handle);
2161    }
2162
2163    /// `OperatorOp::WithLatestFrom` dispatch — 2-dep, fire-on-primary-only
2164    /// (D021 / Phase 10.5). Emits `[primary, secondary]` pair only when
2165    /// dep[0] (primary) has DATA in the wave. If only dep[1] fires →
2166    /// RESOLVED. Post-warmup INVALIDATE guard: if secondary latest is
2167    /// `NO_HANDLE` (INVALIDATE cleared it), settles with RESOLVED.
2168    fn fire_op_with_latest_from(&self, node_id: NodeId, pack_fn: crate::handle::FnId) {
2169        let (latest, primary_fired) = self.snapshot_op_all_latest(node_id);
2170        let first_fire = {
2171            let mut s = self.lock_state();
2172            let rec = s.require_node_mut(node_id);
2173            let was_first = !rec.has_fired_once;
2174            rec.has_fired_once = true;
2175            was_first
2176        };
2177        // On first fire (gate release), always emit — the first-run gate
2178        // guarantees both deps have values (via prev_data fallback in
2179        // snapshot). On subsequent fires, only emit when primary fires.
2180        if !first_fire && !primary_fired {
2181            // Secondary-only update — no downstream DATA.
2182            self.settle_dirty_resolved(node_id);
2183            return;
2184        }
2185        // Post-warmup INVALIDATE guard: secondary may have been invalidated
2186        // (prev_data cleared to NO_HANDLE) and not yet re-delivered.
2187        debug_assert!(latest.len() == 2, "withLatestFrom requires exactly 2 deps");
2188        if latest[1] == crate::handle::NO_HANDLE {
2189            self.settle_dirty_resolved(node_id);
2190            return;
2191        }
2192        let tuple_handle = self.binding.pack_tuple(pack_fn, &latest);
2193        self.commit_emission_verbatim(node_id, tuple_handle);
2194    }
2195
2196    /// `OperatorOp::Merge` dispatch — N-dep, forward all DATA handles
2197    /// verbatim (D022). Zero FFI on fire: no transformation. Each dep's
2198    /// batch handles are collected, retained, and emitted individually.
2199    fn fire_op_merge(&self, node_id: NodeId) {
2200        // Collect all batch handles from all deps (flat).
2201        let all_handles: Vec<HandleId> = {
2202            let s = self.lock_state();
2203            let rec = s.require_node(node_id);
2204            rec.dep_records
2205                .iter()
2206                .flat_map(|dr| dr.data_batch.iter().copied())
2207                .collect()
2208        };
2209        {
2210            let mut s = self.lock_state();
2211            s.require_node_mut(node_id).has_fired_once = true;
2212        }
2213        if all_handles.is_empty() {
2214            // All deps settled RESOLVED this wave — no DATA to forward.
2215            self.settle_dirty_resolved(node_id);
2216            return;
2217        }
2218        // Emit each handle verbatim. Take a fresh retain per handle
2219        // (independent of the dep batch's retain which gets released at
2220        // wave-end). Matches Filter's discipline for passing inputs.
2221        for &h in &all_handles {
2222            self.binding.retain_handle(h);
2223            self.commit_emission_verbatim(node_id, h);
2224        }
2225    }
2226
2227    // =================================================================
2228    // Slice C-3: flow operators (D024)
2229    // =================================================================
2230
2231    /// `OperatorOp::Take` dispatch — emits the first `count` DATA values
2232    /// then self-completes via `Core::complete`. When `count == 0`, the
2233    /// first fire emits zero items then immediately self-completes
2234    /// (D027). Cross-wave counter lives in
2235    /// [`TakeState::count_emitted`](super::op_state::TakeState::count_emitted).
2236    fn fire_op_take(&self, node_id: NodeId, count: u32) {
2237        use crate::op_state::TakeState;
2238        let (inputs, terminal) = self.snapshot_op_dep0(node_id);
2239        // Snapshot current counter; mark fired regardless of input count
2240        // (activation gate already satisfied or partial-mode).
2241        let mut count_emitted = {
2242            let s = self.lock_state();
2243            scratch_ref::<TakeState>(&s, node_id).count_emitted
2244        };
2245        {
2246            let mut s = self.lock_state();
2247            s.require_node_mut(node_id).has_fired_once = true;
2248        }
2249        // Already at quota before any input this wave — self-complete
2250        // immediately. Covers `count == 0` (first-fire short-circuit) and
2251        // any defensive re-entry after the terminal-skip in `fire_operator`
2252        // already guards against double-complete.
2253        if count_emitted >= count {
2254            self.complete(node_id);
2255            return;
2256        }
2257        // Per-input emission loop. Each pass takes a fresh retain for the
2258        // cache slot; data_batch slot's retain is released at wave-end
2259        // rotation independently.
2260        for &h in &inputs {
2261            self.binding.retain_handle(h);
2262            self.commit_emission_verbatim(node_id, h);
2263            count_emitted = count_emitted.saturating_add(1);
2264            if count_emitted >= count {
2265                break;
2266            }
2267        }
2268        // Persist the updated counter.
2269        {
2270            let mut s = self.lock_state();
2271            scratch_mut::<TakeState>(&mut s, node_id).count_emitted = count_emitted;
2272        }
2273        // Self-complete if we hit the quota this wave. Upstream COMPLETE
2274        // (terminal == Some(Complete)) without us hitting the count
2275        // propagates via the standard auto-cascade — we don't intercept it.
2276        if count_emitted >= count {
2277            self.complete(node_id);
2278            return;
2279        }
2280        // If upstream is already Errored and we haven't hit count, the
2281        // standard cascade will propagate it. If the wave delivered no
2282        // inputs (e.g. RESOLVED from upstream), settle DIRTY+RESOLVED so
2283        // subscribers see the wave close.
2284        if inputs.is_empty() && terminal.is_none() {
2285            self.settle_dirty_resolved(node_id);
2286        }
2287    }
2288
2289    /// `OperatorOp::Skip` dispatch — drops the first `count` DATA values,
2290    /// then forwards the rest. Cross-wave counter lives in
2291    /// [`SkipState::count_skipped`](super::op_state::SkipState::count_skipped).
2292    /// On a wave where every input is still in the skip window, settles
2293    /// DIRTY+RESOLVED (D018 pattern) so subscribers see the wave close.
2294    fn fire_op_skip(&self, node_id: NodeId, count: u32) {
2295        use crate::op_state::SkipState;
2296        let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
2297        let mut count_skipped = {
2298            let s = self.lock_state();
2299            scratch_ref::<SkipState>(&s, node_id).count_skipped
2300        };
2301        {
2302            let mut s = self.lock_state();
2303            s.require_node_mut(node_id).has_fired_once = true;
2304        }
2305        // No early-return on empty inputs: the post-loop `emitted == 0`
2306        // settle handles the empty-inputs case identically to the
2307        // all-swallowed-by-skip-window case (Slice C-3 /qa P6 — symmetry
2308        // with `fire_op_take`).
2309        let mut emitted = 0usize;
2310        for &h in &inputs {
2311            if count_skipped < count {
2312                count_skipped = count_skipped.saturating_add(1);
2313                // Drop this input — the data_batch slot still owns its
2314                // retain (released at wave-end rotation). No emission.
2315                continue;
2316            }
2317            // Past the skip window — emit verbatim. Take a fresh retain
2318            // for the cache slot.
2319            self.binding.retain_handle(h);
2320            self.commit_emission_verbatim(node_id, h);
2321            emitted += 1;
2322        }
2323        // Persist the updated counter.
2324        {
2325            let mut s = self.lock_state();
2326            scratch_mut::<SkipState>(&mut s, node_id).count_skipped = count_skipped;
2327        }
2328        if emitted == 0 {
2329            self.settle_dirty_resolved(node_id);
2330        }
2331    }
2332
2333    /// `OperatorOp::TakeWhile` dispatch — emits while the predicate
2334    /// holds; on the first `false`, emits any preceding passes from the
2335    /// same batch then self-completes via `Core::complete`. Reuses
2336    /// [`BindingBoundary::predicate_each`] (D029).
2337    fn fire_op_take_while(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
2338        let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
2339        {
2340            let mut s = self.lock_state();
2341            s.require_node_mut(node_id).has_fired_once = true;
2342        }
2343        if inputs.is_empty() {
2344            return;
2345        }
2346        // Phase 2: predicate per input.
2347        let pass = self.binding.predicate_each(fn_id, &inputs);
2348        // Slice V2: promoted from debug_assert! — binding contract violation
2349        // should fail loud in release builds too.
2350        assert!(
2351            pass.len() == inputs.len(),
2352            "predicate_each returned {} bools for {} inputs",
2353            pass.len(),
2354            inputs.len()
2355        );
2356        // Phase 3: emit each input until the first false; then
2357        // self-complete. `fire_operator`'s `terminal.is_some()`
2358        // short-circuit gates re-entry after the self-complete cascade
2359        // installs the terminal slot — no extra `done` flag needed.
2360        let mut emitted = 0usize;
2361        let mut first_false_seen = false;
2362        for (i, &h) in inputs.iter().enumerate() {
2363            if pass.get(i).copied().unwrap_or(false) {
2364                self.binding.retain_handle(h);
2365                self.commit_emission_verbatim(node_id, h);
2366                emitted += 1;
2367            } else {
2368                first_false_seen = true;
2369                break;
2370            }
2371        }
2372        if first_false_seen {
2373            self.complete(node_id);
2374            return;
2375        }
2376        if emitted == 0 {
2377            // Whole batch passed but was empty (impossible here since
2378            // inputs.is_empty() returned early above) — defensive only.
2379            self.settle_dirty_resolved(node_id);
2380        }
2381    }
2382
2383    /// `OperatorOp::Last` dispatch — buffers the latest DATA; emits
2384    /// `Data(latest)` (or `Data(default)` if no DATA arrived and a
2385    /// default was registered) then `Complete` on upstream COMPLETE.
2386    /// On upstream ERROR, propagates verbatim. Storage:
2387    /// [`LastState`](super::op_state::LastState).
2388    ///
2389    /// **Silent-buffer semantics (mirrors Reduce):** on a non-terminal
2390    /// wave (`terminal == None`), `fire_op_last` updates the buffered
2391    /// `latest` handle but produces NO downstream wire message —
2392    /// subscribers observe the operator only when upstream
2393    /// COMPLETE/ERROR triggers the terminal branch. Intermediate
2394    /// inputs from the dep's batch are dropped on the floor (their
2395    /// `data_batch` retains release at wave-end rotation
2396    /// independently). Per-wave settlement on intermediate waves is
2397    /// the canonical behavior for terminal-aware operators.
2398    fn fire_op_last(&self, node_id: NodeId) {
2399        use crate::op_state::LastState;
2400        let (inputs, terminal) = self.snapshot_op_dep0(node_id);
2401        {
2402            let mut s = self.lock_state();
2403            s.require_node_mut(node_id).has_fired_once = true;
2404        }
2405
2406        // Phase 2: buffer the latest input handle (if any). Retain new,
2407        // release old. data_batch slot's retain is released at wave-end
2408        // rotation independently — the LastState slot keeps its own
2409        // share so the value survives across waves.
2410        if let Some(&new_latest) = inputs.last() {
2411            let prev_latest = {
2412                let mut s = self.lock_state();
2413                let scratch = scratch_mut::<LastState>(&mut s, node_id);
2414                let prev = scratch.latest;
2415                scratch.latest = new_latest;
2416                prev
2417            };
2418            self.binding.retain_handle(new_latest);
2419            if prev_latest != crate::handle::NO_HANDLE {
2420                self.binding.release_handle(prev_latest);
2421            }
2422        }
2423
2424        // Phase 3: emit on terminal. Buffer-only fires (no terminal yet)
2425        // produce no downstream message — Reduce-style silent
2426        // accumulation. The post-drain auto-resolve sweep is a no-op
2427        // because pending_notify has no entry for Last.
2428        match terminal {
2429            None => {}
2430            Some(TerminalKind::Complete) => {
2431                // Read the live latest + default. If latest != NO_HANDLE,
2432                // emit it. Otherwise, if default != NO_HANDLE, emit default.
2433                // Otherwise, emit only Complete (empty stream, no default).
2434                let (latest, default) = {
2435                    let s = self.lock_state();
2436                    let scratch = scratch_ref::<LastState>(&s, node_id);
2437                    (scratch.latest, scratch.default)
2438                };
2439                let to_emit = if latest != crate::handle::NO_HANDLE {
2440                    Some(latest)
2441                } else if default != crate::handle::NO_HANDLE {
2442                    Some(default)
2443                } else {
2444                    None
2445                };
2446                if let Some(h) = to_emit {
2447                    // Emission needs its own retain — the LastState slot
2448                    // keeps its share until reset/Drop.
2449                    self.binding.retain_handle(h);
2450                    self.commit_emission_verbatim(node_id, h);
2451                }
2452                self.complete(node_id);
2453            }
2454            Some(TerminalKind::Error(h)) => {
2455                // Take a fresh share for the error cascade — the
2456                // dep_records[0].terminal slot keeps its own share
2457                // (released by reset_for_fresh_lifecycle / Drop).
2458                self.binding.retain_handle(h);
2459                self.error(node_id, h);
2460            }
2461        }
2462    }
2463
2464    // -----------------------------------------------------------------
2465    // Slice U: control operators — fire_op impls
2466    // -----------------------------------------------------------------
2467
2468    /// Tap — side-effect passthrough. Invoke tap fn on each DATA, then
2469    /// emit each input handle unchanged (zero allocation).
2470    fn fire_op_tap(&self, node_id: NodeId, fn_id: FnId) {
2471        let (inputs, terminal) = self.snapshot_op_dep0(node_id);
2472        {
2473            let mut s = self.lock_state();
2474            s.require_node_mut(node_id).has_fired_once = true;
2475        }
2476        if inputs.is_empty() {
2477            if terminal.is_none() {
2478                self.settle_dirty_resolved(node_id);
2479            }
2480        } else {
2481            for &h in &inputs {
2482                self.binding.invoke_tap_fn(fn_id, h);
2483                self.binding.retain_handle(h);
2484                self.commit_emission_verbatim(node_id, h);
2485            }
2486        }
2487        // Terminal forwarding.
2488        match terminal {
2489            None => {}
2490            Some(TerminalKind::Complete) => {
2491                self.binding.invoke_tap_complete_fn(fn_id);
2492                self.complete(node_id);
2493            }
2494            Some(TerminalKind::Error(h)) => {
2495                self.binding.invoke_tap_error_fn(fn_id, h);
2496                self.binding.retain_handle(h);
2497                self.error(node_id, h);
2498            }
2499        }
2500    }
2501
2502    /// TapFirst — one-shot side-effect on first DATA. After the first
2503    /// qualifying DATA, acts as pure passthrough.
2504    fn fire_op_tap_first(&self, node_id: NodeId, fn_id: FnId) {
2505        use crate::op_state::TapFirstState;
2506        let (inputs, terminal) = self.snapshot_op_dep0(node_id);
2507        {
2508            let mut s = self.lock_state();
2509            s.require_node_mut(node_id).has_fired_once = true;
2510        }
2511        if inputs.is_empty() {
2512            if terminal.is_none() {
2513                self.settle_dirty_resolved(node_id);
2514            }
2515        } else {
2516            let fired = {
2517                let s = self.lock_state();
2518                scratch_ref::<TapFirstState>(&s, node_id).fired
2519            };
2520            for &h in &inputs {
2521                if !fired {
2522                    self.binding.invoke_tap_fn(fn_id, h);
2523                    let mut s = self.lock_state();
2524                    scratch_mut::<TapFirstState>(&mut s, node_id).fired = true;
2525                }
2526                self.binding.retain_handle(h);
2527                self.commit_emission_verbatim(node_id, h);
2528            }
2529        }
2530        if let Some(TerminalKind::Complete) = terminal {
2531            self.complete(node_id);
2532        } else if let Some(TerminalKind::Error(h)) = terminal {
2533            self.binding.retain_handle(h);
2534            self.error(node_id, h);
2535        }
2536    }
2537
2538    /// Valve — conditional forward. dep[0]=source, dep[1]=control.
2539    /// When control is truthy, forwards source DATA; else RESOLVED.
2540    fn fire_op_valve(&self, node_id: NodeId) {
2541        // Snapshot both deps.
2542        let (src_inputs, src_terminal, ctrl_latest) = {
2543            let s = self.lock_state();
2544            let rec = s.require_node(node_id);
2545            debug_assert!(rec.dep_records.len() == 2, "valve must have exactly 2 deps");
2546            let dr0 = &rec.dep_records[0];
2547            let dr1 = &rec.dep_records[1];
2548            let src_inputs: Vec<HandleId> = dr0.data_batch.iter().copied().collect();
2549            let src_term = dr0.terminal;
2550            // Latest control: last of this wave's batch, or prev_data.
2551            let ctrl = dr1.data_batch.last().copied().unwrap_or(dr1.prev_data);
2552            (src_inputs, src_term, ctrl)
2553        };
2554        {
2555            let mut s = self.lock_state();
2556            s.require_node_mut(node_id).has_fired_once = true;
2557        }
2558
2559        // Source terminal forwarding (D3).
2560        if let Some(TerminalKind::Complete) = src_terminal {
2561            self.complete(node_id);
2562            return;
2563        }
2564        if let Some(TerminalKind::Error(h)) = src_terminal {
2565            self.binding.retain_handle(h);
2566            self.error(node_id, h);
2567            return;
2568        }
2569
2570        // Gate: NO_HANDLE means "gate closed" (control never sent DATA);
2571        // any real handle means "gate open". Proper value-level truthiness
2572        // would require BindingBoundary::is_truthy (deferred — D048).
2573        let gate_open = ctrl_latest != crate::handle::NO_HANDLE;
2574
2575        if !gate_open {
2576            self.settle_dirty_resolved(node_id);
2577            return;
2578        }
2579
2580        if src_inputs.is_empty() {
2581            // Control opened but no source DATA this wave. Re-emit
2582            // prev source value if available.
2583            let prev_src = {
2584                let s = self.lock_state();
2585                s.require_node(node_id).dep_records[0].prev_data
2586            };
2587            if prev_src == crate::handle::NO_HANDLE {
2588                self.settle_dirty_resolved(node_id);
2589            } else {
2590                self.binding.retain_handle(prev_src);
2591                self.commit_emission_verbatim(node_id, prev_src);
2592            }
2593        } else {
2594            for &h in &src_inputs {
2595                self.binding.retain_handle(h);
2596                self.commit_emission_verbatim(node_id, h);
2597            }
2598        }
2599    }
2600
2601    /// Settle — convergence detector. Forwards DATA, counts quiet waves,
2602    /// self-completes when converged.
2603    fn fire_op_settle(&self, node_id: NodeId, quiet_waves: u32, max_waves: Option<u32>) {
2604        use crate::op_state::SettleState;
2605        let (inputs, terminal) = self.snapshot_op_dep0(node_id);
2606        {
2607            let mut s = self.lock_state();
2608            s.require_node_mut(node_id).has_fired_once = true;
2609        }
2610
2611        // Terminal forwarding.
2612        if let Some(TerminalKind::Complete) = terminal {
2613            self.complete(node_id);
2614            return;
2615        }
2616        if let Some(TerminalKind::Error(h)) = terminal {
2617            self.binding.retain_handle(h);
2618            self.error(node_id, h);
2619            return;
2620        }
2621
2622        let saw_data = !inputs.is_empty();
2623
2624        // Forward all DATA.
2625        for &h in &inputs {
2626            self.binding.retain_handle(h);
2627            self.commit_emission_verbatim(node_id, h);
2628        }
2629
2630        // Update counters.
2631        let should_complete = {
2632            let mut s = self.lock_state();
2633            let scratch = scratch_mut::<SettleState>(&mut s, node_id);
2634            scratch.wave_count += 1;
2635            if saw_data {
2636                scratch.has_value = true;
2637                scratch.quiet_count = 0;
2638            } else {
2639                scratch.quiet_count += 1;
2640            }
2641            let settled = scratch.has_value && scratch.quiet_count >= quiet_waves;
2642            let exhausted = max_waves.is_some_and(|max| scratch.wave_count >= max);
2643            settled || exhausted
2644        };
2645
2646        if should_complete {
2647            self.complete(node_id);
2648        } else if !saw_data {
2649            self.settle_dirty_resolved(node_id);
2650        }
2651    }
2652
2653    pub(crate) fn deliver_data_to_consumer(
2654        &self,
2655        s: &mut CoreState,
2656        consumer_id: NodeId,
2657        dep_idx: usize,
2658        handle: HandleId,
2659    ) {
2660        // Retain the handle for the batch accumulation slot — each DATA
2661        // handle in `data_batch` owns a retain share, released at wave-end
2662        // rotation in `clear_wave_state`.
2663        self.binding.retain_handle(handle);
2664
2665        let is_dynamic;
2666        let is_state;
2667        let tracked_or_first_fire;
2668        // Slice F audit close (2026-05-07): default-mode pause suppression.
2669        // If the consumer is paused with `PausableMode::Default`, the
2670        // canonical-spec §2.6 behavior is to suppress fn-fire and consolidate
2671        // pause-window dep deliveries into one fn execution on RESUME.
2672        // Mark `pending_wave` on the pause state instead of adding to
2673        // `pending_fires`. The dep state still advances (the data_batch push
2674        // above is unchanged), and clear_wave_state still rotates the latest
2675        // dep DATA into prev_data — so when the fn ultimately fires on
2676        // RESUME, it sees the consolidated post-pause state.
2677        let suppressed_for_default_pause;
2678        {
2679            let consumer = s.require_node_mut(consumer_id);
2680            consumer.dep_records[dep_idx].data_batch.push(handle);
2681            consumer.dep_records[dep_idx].involved_this_wave = true;
2682            consumer.involved_this_wave = true;
2683            // §10.13 perf (D047): set received_mask bit on first DATA
2684            // delivery for this dep.
2685            if dep_idx < 64 {
2686                consumer.received_mask |= 1u64 << dep_idx;
2687                // §10.3 perf (Slice V1): set involved_mask bit for
2688                // O(1) per-dep involvement query during fire.
2689                consumer.involved_mask |= 1u64 << dep_idx;
2690            }
2691            is_dynamic = consumer.is_dynamic;
2692            is_state = consumer.is_state();
2693            tracked_or_first_fire = !consumer.has_fired_once || consumer.tracked.contains(&dep_idx);
2694            suppressed_for_default_pause = consumer.pause_state.is_paused()
2695                && consumer.pausable == crate::node::PausableMode::Default;
2696            if suppressed_for_default_pause {
2697                consumer.pause_state.mark_pending_wave();
2698            }
2699        }
2700        if suppressed_for_default_pause {
2701            // Default-mode pause: don't add to pending_fires; RESUME will
2702            // schedule one consolidated fire.
2703            return;
2704        }
2705        // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_fires lives on
2706        // per-thread WaveState. State lock + WaveState borrow are
2707        // independent.
2708        if is_state {
2709            // State nodes don't have deps; unreachable in practice.
2710        } else if is_dynamic {
2711            if tracked_or_first_fire {
2712                with_wave_state(|ws| {
2713                    ws.pending_fires.insert(consumer_id);
2714                });
2715            }
2716        } else {
2717            // Derived / Operator / Producer (Producer has no deps so won't
2718            // reach here, but the predicate-based dispatch handles it
2719            // uniformly).
2720            with_wave_state(|ws| {
2721                ws.pending_fires.insert(consumer_id);
2722            });
2723        }
2724    }
2725
2726    // -------------------------------------------------------------------
2727    // Subscriber notification
2728    // -------------------------------------------------------------------
2729
2730    /// Queue a wave-end message for `node_id`'s subscribers.
2731    ///
2732    /// **Revision-tracked sink-snapshot batches (Slice X4 / D2,
2733    /// 2026-05-08):** each push for a given node either appends the
2734    /// message to the open batch (if `NodeRecord::subscribers_revision`
2735    /// hasn't advanced since that batch opened — the common case — no
2736    /// extra allocation), or opens a fresh batch with a current sink
2737    /// snapshot frozen at the new revision. A sub installed mid-wave
2738    /// bumps `subscribers_revision`; the next `queue_notify` for the
2739    /// same node observes the bump and starts a new batch that includes
2740    /// the new sub. Pre-subscribe batches retain their original snapshot,
2741    /// so earlier emits flush to their original sink list — the new sub
2742    /// does NOT double-receive them via flush AND handshake replay,
2743    /// closing the late-subscriber + multi-emit-per-wave R1.3.5.a gap.
2744    ///
2745    /// Pause routing decision (R1.3.7.b tier table, §10.2 buffering):
2746    ///   Tier 3 (DATA / RESOLVED) and Tier 4 (INVALIDATE) buffer while
2747    ///   paused; all other tiers (DIRTY tier 1, PAUSE/RESUME tier 2,
2748    ///   COMPLETE/ERROR tier 5, TEARDOWN tier 6) bypass the buffer and
2749    ///   flush immediately. START (tier 0) is per-subscription and never
2750    ///   transits queue_notify.
2751    pub(crate) fn queue_notify(&self, s: &mut CoreState, node_id: NodeId, msg: Message) {
2752        // R1.3.3.a / R1.3.3.d (Slice G — re-added 2026-05-07): dev-mode
2753        // wave-content invariant assertion. The tier-3 slot at one node in
2754        // one wave is either ≥1 DATA or exactly 1 RESOLVED — never mixed,
2755        // never multiple RESOLVED. Slice G moved equals substitution from
2756        // per-emit to wave-end coalescing; this assert pins that the
2757        // dispatcher itself never queues a violating combination at the
2758        // queue_notify granularity. Resolved arrivals come from:
2759        //   1. The auto-resolve sweep in `drain_and_flush` (gates on
2760        //      `!any tier-3` so it can't add to a wave with Data).
2761        //   2. The wave-end equals-substitution pass (rewrites in place,
2762        //      doesn't go through queue_notify).
2763        // Both honor R1.3.3.a by construction post-Slice-G.
2764        // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_notify lives
2765        // on per-thread WaveState. The dev-mode invariant assertion
2766        // borrows WaveState briefly and drops before the rest of
2767        // queue_notify proceeds.
2768        #[cfg(debug_assertions)]
2769        if matches!(msg.tier(), 3) {
2770            with_wave_state(|ws| {
2771                if let Some(entry) = ws.pending_notify.get(&node_id) {
2772                    // Walk all batches' messages — R1.3.3.a is a per-node
2773                    // wave-content invariant, not per-batch (the X4 batches
2774                    // are subscriber-snapshot epochs; the protocol-level
2775                    // tier-3 invariant spans the whole wave for the node).
2776                    let has_data = entry.iter_messages().any(|m| matches!(m, Message::Data(_)));
2777                    let resolved_count = entry
2778                        .iter_messages()
2779                        .filter(|m| matches!(m, Message::Resolved))
2780                        .count();
2781                    let incoming_is_data = matches!(msg, Message::Data(_));
2782                    if incoming_is_data {
2783                        debug_assert!(
2784                            resolved_count == 0,
2785                            "R1.3.3.a violation at {node_id:?}: queueing Data into a \
2786                             wave that already contains Resolved — Slice G should have \
2787                             prevented this via wave-end coalescing"
2788                        );
2789                    } else {
2790                        debug_assert!(
2791                            !has_data,
2792                            "R1.3.3.a violation at {node_id:?}: queueing Resolved into a \
2793                             wave that already contains Data"
2794                        );
2795                        debug_assert!(
2796                            resolved_count == 0,
2797                            "R1.3.3.a violation at {node_id:?}: multiple Resolved in one \
2798                             wave at one node"
2799                        );
2800                    }
2801                }
2802            });
2803        }
2804
2805        let buffered_tier = matches!(msg.tier(), 3 | 4);
2806        let cap = s.pause_buffer_cap;
2807
2808        // Pause-routing branch — handles its own retain/release and returns
2809        // before we touch `pending_notify`, so the rec borrow is contained.
2810        {
2811            let rec = s.require_node_mut(node_id);
2812            if rec.subscribers.is_empty() {
2813                return;
2814            }
2815            // Slice F audit close (2026-05-07): pause routing depends on mode.
2816            //   - `ResumeAll`: buffer tier-3/4 for verbatim replay on RESUME.
2817            //   - `Default` + STATE node: state nodes have no fn-fire to
2818            //     suppress, so buffer like resumeAll (collapse-to-latest is
2819            //     a future enhancement; v1 keeps verbatim).
2820            //   - `Default` + COMPUTE node: suppression happens upstream at
2821            //     fn-fire scheduling (see `deliver_data_to_consumer`); no
2822            //     outgoing tier-3 is produced from this node while paused,
2823            //     so this branch is unreachable for compute-default-paused.
2824            //     Fallthrough to the non-paused queue path is fine.
2825            //   - `Off`: pause is ignored entirely — tier-3 flushes
2826            //     immediately. Fallthrough.
2827            let mode_buffers_tier3 = match rec.pausable {
2828                crate::node::PausableMode::ResumeAll => true,
2829                crate::node::PausableMode::Default => rec.is_state(),
2830                crate::node::PausableMode::Off => false,
2831            };
2832            if buffered_tier && mode_buffers_tier3 && rec.pause_state.is_paused() {
2833                if let Some(h) = msg.payload_handle() {
2834                    self.binding.retain_handle(h);
2835                }
2836                let push_result = rec.pause_state.push_buffered(msg, cap);
2837                for dm in push_result.dropped_msgs {
2838                    if let Some(h) = dm.payload_handle() {
2839                        self.binding.release_handle(h);
2840                    }
2841                }
2842                // R1.3.8.c (Slice F, A3): on first overflow this cycle,
2843                // schedule a synthesized ERROR for wave-end emission.
2844                // `cap` is `Some` here (an overflow can only happen with a
2845                // configured cap), so `unwrap` is safe.
2846                if push_result.first_overflow_this_cycle {
2847                    if let Some((dropped_count, lock_held_ns)) =
2848                        rec.pause_state.overflow_diagnostic()
2849                    {
2850                        // Q-beyond Sub-slice 1 (D108, 2026-05-09):
2851                        // pending_pause_overflow lives on per-thread WaveState.
2852                        with_wave_state(|ws| {
2853                            ws.pending_pause_overflow
2854                                .push(crate::node::PendingPauseOverflow {
2855                                    node_id,
2856                                    dropped_count,
2857                                    configured_max: cap.unwrap_or(0),
2858                                    lock_held_ns,
2859                                });
2860                        });
2861                    }
2862                }
2863                return;
2864            }
2865        }
2866
2867        // Non-paused queue path: retain payload handle and queue into
2868        // pending_notify. Released in `flush_notifications` after sinks
2869        // fire.
2870        if let Some(h) = msg.payload_handle() {
2871            self.binding.retain_handle(h);
2872        }
2873        Self::push_into_pending_notify(s, node_id, msg);
2874    }
2875
2876    /// Slice X4 / D2: revision-tracked batch decision for `queue_notify`'s
2877    /// non-paused path. Either appends `msg` to the open batch (if
2878    /// `subscribers_revision` hasn't advanced since it opened — common
2879    /// case, no extra allocation) or opens a fresh batch with a current
2880    /// sink snapshot frozen at the new revision.
2881    ///
2882    /// Borrow discipline: reads `subscribers_revision` and the snapshot
2883    /// from `s.nodes` BEFORE borrowing WaveState's `pending_notify` to
2884    /// keep the two scopes disjoint.
2885    ///
2886    /// Q-beyond Sub-slice 2 (D108, 2026-05-09): `pending_notify` moved
2887    /// to per-thread WaveState. The state-side read of
2888    /// `subscribers_revision` / `subscribers` happens before the
2889    /// `with_wave_state` block opens, then the WaveState borrow
2890    /// performs the entry insertion / append. State lock + WaveState
2891    /// borrow remain independent.
2892    ///
2893    /// Lock-discipline assumption: this read of `subscribers_revision`
2894    /// is safe because both the subscribe install path
2895    /// ([`crate::node::Core::subscribe`]) and `queue_notify` hold
2896    /// `CoreState`'s mutex when they bump / read the revision —
2897    /// concurrent subscribe/unsubscribe cannot interleave. **If
2898    /// `Core::subscribe` ever moves the sink-install lock-released
2899    /// (mirroring the lock-released drain refactor), the revision read
2900    /// here must re-validate post-borrow — otherwise a fresh batch
2901    /// could open with a stale snapshot.**
2902    fn push_into_pending_notify(s: &mut CoreState, node_id: NodeId, msg: Message) {
2903        let current_rev = s.require_node(node_id).subscribers_revision;
2904        let needs_new_batch = with_wave_state(|ws| {
2905            ws.pending_notify.get(&node_id).is_none_or(|entry| {
2906                entry
2907                    .batches
2908                    .last()
2909                    .is_none_or(|b| b.snapshot_revision != current_rev)
2910            })
2911        });
2912        let sinks_snapshot: SmallVec<[Sink; 1]> = if needs_new_batch {
2913            s.require_node(node_id)
2914                .subscribers
2915                .values()
2916                .cloned()
2917                .collect()
2918        } else {
2919            SmallVec::new()
2920        };
2921        with_wave_state(|ws| match ws.pending_notify.entry(node_id) {
2922            Entry::Vacant(slot) => {
2923                let mut batches: SmallVec<[PendingBatch; 1]> = SmallVec::new();
2924                batches.push(PendingBatch {
2925                    snapshot_revision: current_rev,
2926                    sinks: sinks_snapshot,
2927                    messages: smallvec::smallvec![msg],
2928                });
2929                slot.insert(PendingPerNode { batches });
2930            }
2931            Entry::Occupied(mut slot) => {
2932                let entry = slot.get_mut();
2933                if needs_new_batch {
2934                    entry.batches.push(PendingBatch {
2935                        snapshot_revision: current_rev,
2936                        sinks: sinks_snapshot,
2937                        messages: smallvec::smallvec![msg],
2938                    });
2939                } else {
2940                    entry
2941                        .batches
2942                        .last_mut()
2943                        .expect("non-empty by construction (entry exists implies batch exists)")
2944                        .messages
2945                        .push(msg);
2946                }
2947            }
2948        });
2949    }
2950
2951    /// Collect wave-end sink-fire jobs into `ws.deferred_flush_jobs` and the
2952    /// payload-handle releases owed for `pending_notify` into
2953    /// `ws.deferred_handle_releases`. The actual sink fires + handle releases
2954    /// run **after** the state lock is dropped — see [`Core::run_wave`].
2955    ///
2956    /// R1.3.1.b two-phase propagation: phase 1 (DIRTY) propagates through
2957    /// the entire graph before phase 2 (DATA / RESOLVED) begins. Implemented
2958    /// here as cross-node tier-then-node collect — phase 1's jobs sit before
2959    /// phase 2's in `deferred_flush_jobs`, so when `run_wave` drains the
2960    /// queue lock-released, multi-node subscribers see all DIRTYs before any
2961    /// settle. Matches TS's drainPhase model without the per-tier queue
2962    /// indirection.
2963    ///
2964    /// Phase ordering:
2965    ///   1 → tier 1   (DIRTY)
2966    ///   2 → tier 3+4 (DATA/RESOLVED + INVALIDATE — the "settle slice")
2967    ///   3 → tier 5   (COMPLETE/ERROR)
2968    ///   4 → tier 6   (TEARDOWN)
2969    ///
2970    /// Tier 0 (START) is per-subscription (never enters pending_notify) and
2971    /// tier 2 (PAUSE/RESUME) is delivered through dedicated paths, also
2972    /// bypassing pending_notify; both are absent from this enumeration.
2973    ///
2974    /// Within a single phase, per-node insertion order (IndexMap iteration)
2975    /// is preserved — an emit on A before B → A's phase-2 messages flush
2976    /// before B's. Within a single node, message order is preserved.
2977    fn flush_notifications(&self, s: &mut CoreState) {
2978        const PHASES: &[&[u8]] = &[
2979            &[1],    // DIRTY
2980            &[3, 4], // DATA/RESOLVED + INVALIDATE
2981            &[5],    // COMPLETE/ERROR
2982            &[6],    // TEARDOWN
2983        ];
2984        // Q-beyond Sub-slice 1 + 2 + 3 (D108, 2026-05-09): pending_notify,
2985        // deferred_handle_releases, and deferred_flush_jobs all live on
2986        // per-thread WaveState. Take pending_notify under the WaveState
2987        // borrow, drop the borrow, run the per-phase loop (no WaveState
2988        // access in the loop body), then re-borrow WaveState at the end
2989        // to push the collected jobs and payload-handle releases.
2990        //
2991        // /qa F7 (2026-05-10): the `s: &mut CoreState` parameter is
2992        // currently unused inside the per-phase loop — `pending` was
2993        // moved off `s` to WaveState by sub-slice 2, and the per-batch
2994        // sink snapshot is already on the PendingBatch. Kept as a
2995        // parameter to preserve the caller's `let mut s = lock_state();
2996        // self.flush_notifications(&mut s);` invocation shape (caller
2997        // holds the state lock around this call — load-bearing for
2998        // R1.3.5.a per-tier handshake-vs-flush ordering). NOT a "lock
2999        // released" marker; the lock guard belongs to the caller and
3000        // is held throughout this function. A future change that adds
3001        // an in-loop state read should remove the discard below;
3002        // removing the parameter would break the caller's ability to
3003        // express the lock-discipline contract at the call site.
3004        let _ = &*s; // explicit no-op acknowledgement; lock held by caller.
3005        let pending = with_wave_state(|ws| std::mem::take(&mut ws.pending_notify));
3006        let mut jobs: DeferredJobs = Vec::new();
3007        for &phase_tiers in PHASES {
3008            for (_node_id, entry) in &pending {
3009                // Slice X4 / D2: iterate batches in arrival order. Each
3010                // batch carries its own sink snapshot frozen at open-time;
3011                // a batch's messages flush to ITS sinks only. Within a
3012                // single (phase, node), batches stay in arrival order so
3013                // emit-order semantics are preserved across batches.
3014                for batch in &entry.batches {
3015                    if batch.sinks.is_empty() {
3016                        continue;
3017                    }
3018                    let phase_msgs: Vec<Message> = batch
3019                        .messages
3020                        .iter()
3021                        .copied()
3022                        .filter(|m| phase_tiers.contains(&m.tier()))
3023                        .collect();
3024                    if phase_msgs.is_empty() {
3025                        continue;
3026                    }
3027                    let sinks_clone: Vec<Sink> = batch.sinks.iter().map(Arc::clone).collect();
3028                    jobs.push((sinks_clone, phase_msgs));
3029                }
3030            }
3031        }
3032        // Single WaveState borrow at the end: push the collected jobs
3033        // and the payload-handle releases. Refcount release balances the
3034        // retain done in `queue_notify` for every payload-bearing message
3035        // that landed in pending_notify (across ALL batches per node);
3036        // deferred to post-lock-drop so the binding's release path can't
3037        // re-enter Core under our lock.
3038        with_wave_state(|ws| {
3039            ws.deferred_flush_jobs.append(&mut jobs);
3040            for entry in pending.values() {
3041                for msg in entry.iter_messages() {
3042                    if let Some(h) = msg.payload_handle() {
3043                        ws.deferred_handle_releases.push(h);
3044                    }
3045                }
3046            }
3047        });
3048    }
3049
3050    /// Take the deferred sink-fire jobs, payload-handle releases,
3051    /// cleanup-hook fire queue, and pending-wipe queue from `WaveState`.
3052    /// Callers pair this with `drop(state_guard)` and a subsequent
3053    /// [`Self::fire_deferred`] call to deliver the wave's sinks, handle
3054    /// releases, Slice E2 OnInvalidate cleanup hooks, and Slice E2 /qa
3055    /// Q2(b) eager wipe_ctx fires lock-released.
3056    ///
3057    /// Q-beyond Sub-slice 1 (D108, 2026-05-09): `deferred_handle_releases`
3058    /// source moved to per-thread WaveState — signature takes `&mut WaveState`.
3059    /// Q-beyond Sub-slice 3 (D108, 2026-05-09): `deferred_flush_jobs`,
3060    /// `deferred_cleanup_hooks`, and `pending_wipes` all moved to
3061    /// WaveState. The `_s: &mut CoreState` parameter is now unused but
3062    /// kept to preserve the call-site lock-discipline ordering (caller
3063    /// holds the state lock around this call to interleave with prior
3064    /// `clear_wave_state` per-NodeRecord work).
3065    pub(crate) fn drain_deferred(_s: &mut CoreState, ws: &mut WaveState) -> WaveDeferred {
3066        (
3067            std::mem::take(&mut ws.deferred_flush_jobs),
3068            std::mem::take(&mut ws.deferred_handle_releases),
3069            std::mem::take(&mut ws.deferred_cleanup_hooks),
3070            std::mem::take(&mut ws.pending_wipes),
3071        )
3072    }
3073
3074    /// Fire deferred sink-fire jobs in collected order, then release the
3075    /// payload handles owed for messages that landed in `pending_notify`
3076    /// during the wave, then fire any queued Slice E2 OnInvalidate cleanup
3077    /// hooks. All three phases run lock-released so:
3078    /// - Sinks that call back into Core (emit, pause, etc.) re-acquire the
3079    ///   state lock cleanly and run their own nested wave.
3080    /// - The binding's `release_handle` path can't deadlock against a
3081    ///   binding-side mutex held by Core.
3082    /// - User cleanup closures (invoked via `BindingBoundary::cleanup_for`)
3083    ///   may safely re-enter Core for unrelated nodes.
3084    ///
3085    /// **Cleanup-drain panic discipline (D060):** each `cleanup_for` call
3086    /// is wrapped in `catch_unwind` so a single binding panic doesn't
3087    /// short-circuit the per-wave drain. All queued cleanup attempts run;
3088    /// if any panicked, the LAST panic re-raises after the loop completes
3089    /// (preserving wave-end discipline while still surfacing failures).
3090    /// Per D060, Core stays panic-naive about user code — bindings own
3091    /// their host-language panic policy inside `cleanup_for`; this
3092    /// `catch_unwind` is purely about drain-don't-short-circuit.
3093    pub(crate) fn fire_deferred(
3094        &self,
3095        jobs: DeferredJobs,
3096        releases: Vec<HandleId>,
3097        cleanup_hooks: Vec<(crate::handle::NodeId, crate::boundary::CleanupTrigger)>,
3098        pending_wipes: Vec<crate::handle::NodeId>,
3099    ) {
3100        // Slice E2 /qa P1 (2026-05-07): wrap each sink-fire in
3101        // `catch_unwind` so a panicking sink doesn't unwind out of
3102        // `fire_deferred` and drop the queued `releases` +
3103        // `cleanup_hooks`. Mirrors Slice F audit fix A7's per-tier
3104        // handshake-fire discipline. Without this guard, a sink panic
3105        // here would silently leak handle retains AND silently drop
3106        // OnInvalidate cleanup hooks. AssertUnwindSafe is safe because
3107        // we re-raise the last panic at the end after running every
3108        // queued fire — drain ordering is preserved.
3109        let mut last_panic: Option<Box<dyn std::any::Any + Send>> = None;
3110        for (sinks, msgs) in jobs {
3111            for sink in &sinks {
3112                let sink = sink.clone();
3113                let msgs_ref = &msgs;
3114                let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
3115                    sink(msgs_ref);
3116                }));
3117                if let Err(payload) = result {
3118                    last_panic = Some(payload);
3119                }
3120            }
3121        }
3122        for h in releases {
3123            self.binding.release_handle(h);
3124        }
3125        // Slice E2 (D060): drain cleanup hooks with per-item panic
3126        // isolation so the loop always completes. AssertUnwindSafe is
3127        // safe here because we don't rely on logical state being valid
3128        // post-panic — the panic propagates anyway after the drain ends.
3129        for (node_id, trigger) in cleanup_hooks {
3130            let binding = &self.binding;
3131            let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
3132                binding.cleanup_for(node_id, trigger);
3133            }));
3134            if let Err(payload) = result {
3135                last_panic = Some(payload);
3136            }
3137        }
3138        // Slice E2 /qa Q2(b) (D069): drain eager wipe_ctx queue with the
3139        // same per-item panic isolation. Fires AFTER cleanup hooks so a
3140        // resubscribable node's OnInvalidate (or any tier-3+ cleanup that
3141        // fires in the same wave) sees pre-wipe binding state if it
3142        // landed in the same wave as the terminal cascade. Mutually
3143        // exclusive with `Subscription::Drop`'s direct-fire site, but
3144        // even concurrent fires are idempotent (binding's `wipe_ctx`
3145        // calls `HashMap::remove` which is a no-op on absent keys).
3146        for node_id in pending_wipes {
3147            let binding = &self.binding;
3148            let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
3149                binding.wipe_ctx(node_id);
3150            }));
3151            if let Err(payload) = result {
3152                last_panic = Some(payload);
3153            }
3154        }
3155        if let Some(payload) = last_panic {
3156            std::panic::resume_unwind(payload);
3157        }
3158    }
3159
3160    // -------------------------------------------------------------------
3161    // User-facing batch — coalesce multiple emits into one wave
3162    // -------------------------------------------------------------------
3163
3164    /// Coalesce multiple emissions into a single wave. Every `emit` /
3165    /// `complete` / `error` / `teardown` / `invalidate` call inside `f`
3166    /// queues its downstream work; the wave drains when `f` returns.
3167    ///
3168    /// **R1.3.6.a** — DIRTY still propagates immediately (tier 1 isn't
3169    /// deferred); only tier-3+ delivery is held until scope exit. **R1.3.6.b**
3170    /// — repeated emits on the same node coalesce into a single multi-message
3171    /// delivery (one [`Message::Dirty`] for the wave + one [`Message::Data`]
3172    /// per emit, all delivered together in the per-node phase-2 pass).
3173    ///
3174    /// Nested `batch()` calls share the outer wave; only the outermost call
3175    /// drives the drain. Re-entrant calls from inside an `emit`/fn (the wave
3176    /// engine's own `in_tick` re-entrance) compose with this method
3177    /// transparently — they observe `in_tick = true` and skip drain just
3178    /// like nested `batch()`.
3179    ///
3180    /// On panic inside `f`, the `BatchGuard` returned by the internal
3181    /// `begin_batch` call drops normally and discards pending tier-3+ work
3182    /// (subscribers do not observe the half-built wave). See
3183    /// [`Core::begin_batch`] for the RAII variant if you need explicit control
3184    /// over the scope boundary.
3185    pub fn batch<F>(&self, f: F)
3186    where
3187        F: FnOnce(),
3188    {
3189        let _guard = self.begin_batch();
3190        f();
3191    }
3192
3193    /// RAII batch handle — opens a wave when constructed, drains on drop.
3194    ///
3195    /// Mirrors the closure-based [`Self::batch`] but exposes the scope
3196    /// boundary so callers can compose batches with non-`FnOnce` control
3197    /// flow (e.g. async-state-machine code paths, or splitting setup and
3198    /// drain across helper functions).
3199    ///
3200    /// ```
3201    /// use graphrefly_core::{Core, BindingBoundary, NodeRegistration, NodeOpts,
3202    ///     HandleId, NodeId, FnId, FnResult, DepBatch};
3203    /// use std::sync::Arc;
3204    ///
3205    /// struct Stub;
3206    /// impl BindingBoundary for Stub {
3207    ///     fn invoke_fn(&self, _: NodeId, _: FnId, _: &[DepBatch]) -> FnResult {
3208    ///         FnResult::Noop { tracked: None }
3209    ///     }
3210    ///     fn custom_equals(&self, _: FnId, _: HandleId, _: HandleId) -> bool { false }
3211    ///     fn release_handle(&self, _: HandleId) {}
3212    /// }
3213    ///
3214    /// let core = Core::new(Arc::new(Stub) as Arc<dyn BindingBoundary>);
3215    /// let state_a = core.register(NodeRegistration {
3216    ///     deps: vec![], fn_or_op: None,
3217    ///     opts: NodeOpts { initial: HandleId::new(1), ..Default::default() },
3218    /// }).unwrap();
3219    /// let state_b = core.register(NodeRegistration {
3220    ///     deps: vec![], fn_or_op: None,
3221    ///     opts: NodeOpts { initial: HandleId::new(2), ..Default::default() },
3222    /// }).unwrap();
3223    ///
3224    /// let g = core.begin_batch();
3225    /// core.emit(state_a, HandleId::new(10));
3226    /// core.emit(state_b, HandleId::new(20));
3227    /// drop(g); // wave drains here
3228    /// ```
3229    ///
3230    /// Like the closure form, nested `begin_batch` calls share the outer
3231    /// wave (only the outermost guard drains).
3232    ///
3233    /// # Panics
3234    ///
3235    /// Panics if the registry-epoch retry-validate loop exceeds
3236    /// [`crate::subgraph::MAX_LOCK_RETRIES`] iterations — pathological
3237    /// concurrent `register` / `set_deps` activity racing with
3238    /// closure-form batch entry. Unreachable in correct call paths.
3239    #[must_use = "BatchGuard drains the wave on drop; assign to a named binding"]
3240    pub fn begin_batch(&self) -> BatchGuard {
3241        // Slice Y1 / Phase E (2026-05-08): closure-form batch has no known
3242        // seed; per session-doc Q7 / D092 it MUST serialize against every
3243        // currently-existing partition. Acquire each partition's
3244        // `wave_owner` in ascending [`SubgraphId`] order via the retry-
3245        // validate primitive. Same-thread re-entry passes through each
3246        // ReentrantMutex transparently; cross-thread waves on any of the
3247        // touched partitions block until our `wave_guards` drop.
3248        //
3249        // **QA-fix #2 (2026-05-09) — registry epoch retry-validate:** a
3250        // concurrent `register` / `set_deps`-driven union/split between
3251        // our `all_partitions_lock_boxes()` snapshot and the post-
3252        // acquire epoch read changes the partition set. We then retry
3253        // the whole acquire with the new snapshot. Without this, a
3254        // partition added after our snapshot would not be held by our
3255        // batch — breaking the closure-form's "all-partitions
3256        // serialization" contract.
3257        //
3258        // Trade-off (documented v1 contract): closure-form batch is the
3259        // serialization point under per-partition parallelism. Per-seed
3260        // entry points (`Core::subscribe`, [`Self::begin_batch_for`])
3261        // acquire only the touched partitions and run truly parallel
3262        // for disjoint partitions.
3263        for _ in 0..crate::subgraph::MAX_LOCK_RETRIES {
3264            let epoch_before = self.registry.lock().epoch();
3265            let partition_boxes = self.all_partitions_lock_boxes();
3266            let mut wave_guards: SmallVec<[crate::node::WaveOwnerGuard; 4]> = SmallVec::new();
3267            for (sid, _box) in &partition_boxes {
3268                // Use the partition's root NodeId as the lock_for retry
3269                // seed. SubgraphId.raw() == root NodeId.raw(); the root
3270                // is always registered in the X5 / Phase-E substrate
3271                // (cleanup_node is gated, Phase G activates).
3272                let representative = crate::handle::NodeId::new(sid.raw());
3273                wave_guards.push(
3274                    self.partition_wave_owner_lock_arc(representative)
3275                        .unwrap_or_else(|e| panic!("{e}")),
3276                );
3277            }
3278            // Post-acquire epoch read. If unchanged, our snapshot is
3279            // still authoritative — every existing partition was held
3280            // throughout. If changed, drop guards and retry.
3281            let epoch_after = self.registry.lock().epoch();
3282            if epoch_after == epoch_before {
3283                return self.begin_batch_with_guards(wave_guards);
3284            }
3285            // Drop guards lock-released so retries don't accumulate.
3286            drop(wave_guards);
3287            std::thread::yield_now();
3288        }
3289        panic!(
3290            "Core::begin_batch: exceeded {} retries — pathological concurrent \
3291             register/union/split activity racing with closure-form batch entry",
3292            crate::subgraph::MAX_LOCK_RETRIES
3293        );
3294    }
3295
3296    /// Begin a batch scoped to the partitions transitively touched from
3297    /// `seed`. Walks `s.children` (downstream cascade) + `meta_companions`
3298    /// (R1.3.9.d TEARDOWN cascade) starting at `seed`, collects every
3299    /// reachable partition, and acquires each in ascending
3300    /// [`crate::subgraph::SubgraphId`] order via
3301    /// [`Core::partition_wave_owner_lock_arc`].
3302    ///
3303    /// Two threads with disjoint touched-partition sets run truly
3304    /// parallel — the per-partition `wave_owner` mutexes don't block
3305    /// each other. This is the canonical Y1 parallelism win for
3306    /// per-seed wave-driving entry points (subscribe, emit, pause,
3307    /// resume, invalidate, complete, error, teardown,
3308    /// set_deps push-on-subscribe).
3309    ///
3310    /// **QA-fix #2 (2026-05-09):** retry-validate the touched-partition
3311    /// set against the registry epoch — same protection as
3312    /// [`Self::begin_batch`] but scoped to a per-seed touched set
3313    /// rather than every partition. Conservative: any registry
3314    /// mutation (even on a partition unrelated to seed's touched set)
3315    /// triggers a retry. This avoids a precise "did MY touched set
3316    /// change?" check at the cost of occasional spurious retries.
3317    ///
3318    /// # Panics
3319    ///
3320    /// Panics if the registry-epoch retry-validate loop exceeds
3321    /// [`crate::subgraph::MAX_LOCK_RETRIES`] iterations, OR if
3322    /// [`Core::partition_wave_owner_lock_arc`] panics on an
3323    /// unregistered seed. Both are unreachable in correct call paths
3324    /// (P12 invariant guarantees registry membership matches
3325    /// `s.nodes`).
3326    ///
3327    /// Slice Y1 / Phase E (2026-05-08); QA-fix #2 (2026-05-09).
3328    #[must_use = "BatchGuard drains the wave on drop; assign to a named binding"]
3329    pub fn begin_batch_for(&self, seed: crate::handle::NodeId) -> BatchGuard {
3330        match self.try_begin_batch_for(seed) {
3331            Ok(guard) => guard,
3332            Err(e) => panic!("{e}"),
3333        }
3334    }
3335
3336    /// Fallible variant of `begin_batch_for`. Returns `Err` if any
3337    /// partition acquire violates ascending order (Phase H+ STRICT,
3338    /// D115). Used by `try_run_wave_for`; the public `begin_batch_for`
3339    /// calls this and unwraps.
3340    pub(crate) fn try_begin_batch_for(
3341        &self,
3342        seed: crate::handle::NodeId,
3343    ) -> Result<BatchGuard, crate::node::PartitionOrderViolation> {
3344        let core_generation = self.generation;
3345        for _ in 0..crate::subgraph::MAX_LOCK_RETRIES {
3346            let epoch_before = self.registry.lock().epoch();
3347            // Fast-path: per-thread partition cache. On repeated emits to
3348            // the same seed (the dominant hot-loop pattern), skip the BFS
3349            // in compute_touched_partitions — it acquires state + registry
3350            // locks and allocates a HashSet + SmallVec per call. The cache
3351            // is valid as long as the registry epoch hasn't changed (no
3352            // register/union/split since the cache was populated).
3353            let touched = PARTITION_CACHE
3354                .with(|cell| {
3355                    let cache = cell.borrow();
3356                    if let Some(ref c) = *cache {
3357                        if c.core_generation == core_generation
3358                            && c.seed == seed
3359                            && c.epoch == epoch_before
3360                        {
3361                            return Some(c.partitions.clone());
3362                        }
3363                    }
3364                    None
3365                })
3366                .unwrap_or_else(|| {
3367                    let result = self.compute_touched_partitions(seed);
3368                    PARTITION_CACHE.with(|cell| {
3369                        *cell.borrow_mut() = Some(PartitionCache {
3370                            core_generation,
3371                            seed,
3372                            epoch: epoch_before,
3373                            partitions: result.clone(),
3374                        });
3375                    });
3376                    result
3377                });
3378            let mut wave_guards: SmallVec<[crate::node::WaveOwnerGuard; 4]> = SmallVec::new();
3379            let mut partition_err = None;
3380            for sid in &touched {
3381                let representative = crate::handle::NodeId::new(sid.raw());
3382                match self.partition_wave_owner_lock_arc(representative) {
3383                    Ok(guard) => wave_guards.push(guard),
3384                    Err(e) => {
3385                        partition_err = Some(e);
3386                        break;
3387                    }
3388                }
3389            }
3390            // Drop wave_guards on error — release any already-acquired partitions.
3391            if let Some(e) = partition_err {
3392                drop(wave_guards);
3393                return Err(e);
3394            }
3395            let epoch_after = self.registry.lock().epoch();
3396            if epoch_after == epoch_before {
3397                return Ok(self.begin_batch_with_guards(wave_guards));
3398            }
3399            // Epoch changed — invalidate cache and retry.
3400            PARTITION_CACHE.with(|cell| {
3401                *cell.borrow_mut() = None;
3402            });
3403            drop(wave_guards);
3404            std::thread::yield_now();
3405        }
3406        panic!(
3407            "Core::begin_batch_for(seed={seed:?}): exceeded {} retries — \
3408             pathological concurrent register/union/split activity racing \
3409             with per-seed batch entry",
3410            crate::subgraph::MAX_LOCK_RETRIES
3411        );
3412    }
3413
3414    /// Is this thread currently inside an owning wave on this Core?
3415    /// Per-(Core, thread) — see [`IN_TICK_OWNED`]. Read on the wave-owner
3416    /// thread (e.g. by `commit_emission` to decide cache-snapshot taking).
3417    /// `#[must_use]`: a discarded result silently loses the
3418    /// ownership/nesting decision (a classic predicate-misuse bug).
3419    #[must_use]
3420    fn in_tick(&self) -> bool {
3421        IN_TICK_OWNED.with(|s| s.borrow().contains(&self.generation))
3422    }
3423
3424    /// Claim wave ownership for this (Core, thread). Returns `true` iff
3425    /// this call is the outermost entry (slot was absent) — i.e.
3426    /// `owns_tick`; `false` for nested same-(Core, thread) re-entry.
3427    /// `AHashSet::insert` returns `true` exactly when the value was newly
3428    /// inserted, which is precisely the `owns_tick` semantics.
3429    fn claim_in_tick(&self) -> bool {
3430        IN_TICK_OWNED.with(|s| s.borrow_mut().insert(self.generation))
3431    }
3432
3433    /// Release wave ownership for this (Core, thread). Called by the
3434    /// owning [`BatchGuard::drop`] only — after the `!owns_tick`
3435    /// early-return, so a nested guard never releases — explicitly at
3436    /// each of the three exit points, always AFTER the wave drain +
3437    /// WaveState cleanup and BEFORE `fire_deferred` (so a re-entrant sink
3438    /// emit runs as a fresh owning wave): (1) the closure-body-panic
3439    /// branch, (2) the drain-phase-panic `catch_unwind` arm (before
3440    /// `resume_unwind`), (3) the success path's locked cleanup block.
3441    /// Released exactly once per (Core, thread, wave); idempotent
3442    /// regardless (`AHashSet::remove` of an absent key is a no-op).
3443    fn clear_in_tick(&self) {
3444        IN_TICK_OWNED.with(|s| {
3445            s.borrow_mut().remove(&self.generation);
3446        });
3447    }
3448
3449    /// Internal helper: claim `in_tick` and assemble a [`BatchGuard`]
3450    /// with the supplied (already-acquired) partition wave-owner guards.
3451    /// `wave_guards` MUST be in ascending [`crate::subgraph::SubgraphId`]
3452    /// order (the canonical lock-acquisition order) — both
3453    /// [`Self::begin_batch`] (all-partitions) and
3454    /// [`Self::begin_batch_for`] (touched-partitions) construct the
3455    /// vector in that order before calling here.
3456    fn begin_batch_with_guards(
3457        &self,
3458        wave_guards: SmallVec<[crate::node::WaveOwnerGuard; 4]>,
3459    ) -> BatchGuard {
3460        // Claim wave ownership for this (Core, thread). Keyed per-(Core,
3461        // thread) in the `IN_TICK_OWNED` thread_local (see its doc for
3462        // the cross-Core / disjoint-partition / nested-re-entry rationale)
3463        // — no state lock needed, since `in_tick` has no cross-thread read
3464        // requirement.
3465        let owns_tick = self.claim_in_tick();
3466        // D1 patch (2026-05-09): defensive wave-start clear of the
3467        // per-thread Slice G tier3 tracker on outermost owning entry.
3468        // The thread-local is cleared at outermost BatchGuard drop on
3469        // both success + panic paths; this start-clear is belt-and-
3470        // suspenders against panic paths that bypass Drop (catch_unwind
3471        // can interleave with thread reuse — e.g. cargo's test-runner
3472        // thread pool — and propagate stale entries from a prior
3473        // panicked test's wave that didn't fully unwind through
3474        // BatchGuard::drop).
3475        if owns_tick {
3476            tier3_clear();
3477            // Q-beyond Sub-slice 1 (D108, 2026-05-09): defensive wave-start
3478            // clear of WaveState's non-retain-holding fields. Mirrors the
3479            // tier3 defensive-clear above. Retain-holding fields
3480            // (wave_cache_snapshots / deferred_handle_releases) MUST be
3481            // empty here — outermost BatchGuard::drop drains them on both
3482            // success + panic paths.
3483            wave_state_clear_outermost();
3484        }
3485        BatchGuard {
3486            core: self.clone(),
3487            owns_tick,
3488            wave_guards,
3489            _not_send: std::marker::PhantomData,
3490        }
3491    }
3492}
3493
3494/// RAII guard returned by [`Core::begin_batch`].
3495///
3496/// While alive, suppresses per-emit wave drains — multiple `emit` /
3497/// `complete` / `error` / `teardown` / `invalidate` calls coalesce into one
3498/// wave. On drop:
3499/// - Outermost guard: drains the wave (fires sinks, runs cleanup, clears
3500///   in-tick).
3501/// - Nested guard (an outer `BatchGuard` or an in-progress wave already owns
3502///   the in-tick flag): silently no-ops.
3503///
3504/// On thread panic during the closure body, the drop path discards pending
3505/// tier-3+ delivery rather than firing sinks (avoids cascading panics).
3506/// Subscribers observe **no tier-3+ delivery for the panicked wave**.
3507/// State-node cache writes that already executed inside the closure are
3508/// rolled back via wave-cache snapshots — `cache_of(s)` returns the pre-
3509/// panic value. The atomicity guarantee covers both sink-observability and
3510/// cache state.
3511///
3512/// # Thread safety
3513///
3514/// `BatchGuard` is **`!Send`** by design. `begin_batch` claims the
3515/// per-(Core, thread) `in_tick` ownership slot AND the per-partition
3516/// `wave_owner` re-entrant mutex(es) on the calling thread; sending the
3517/// guard to another thread and dropping it there would clear `in_tick`
3518/// against the wrong thread's slot and release the wave-owner guards
3519/// from a different thread than the one that acquired them, breaking
3520/// both the per-(Core, thread) "I own the wave scope" semantic and
3521/// `parking_lot::ReentrantMutex`'s ownership invariant. The `wave_guards` field is a `SmallVec` of
3522/// `!Send` `ArcReentrantMutexGuard<()>`; the `PhantomData<*const ()>`
3523/// marker is belt-and-suspenders.
3524///
3525/// Slice Y1 / Phase E (2026-05-08): the field migrated from a single
3526/// `ArcReentrantMutexGuard` (legacy Core-global `wave_owner`) to a
3527/// `SmallVec` of partition wave-owner guards. Closure-form
3528/// `begin_batch` acquires every current partition (serialization
3529/// point); `begin_batch_for(seed)` acquires only the transitively-
3530/// touched partitions (parallel for disjoint sets).
3531///
3532/// ```compile_fail
3533/// use graphrefly_core::{BatchGuard, BindingBoundary, Core, DepBatch, FnId, FnResult, HandleId, NodeId};
3534/// use std::sync::Arc;
3535///
3536/// struct Stub;
3537/// impl BindingBoundary for Stub {
3538///     fn invoke_fn(&self, _: NodeId, _: FnId, _: &[DepBatch]) -> FnResult {
3539///         FnResult::Noop { tracked: None }
3540///     }
3541///     fn custom_equals(&self, _: FnId, _: HandleId, _: HandleId) -> bool { false }
3542///     fn release_handle(&self, _: HandleId) {}
3543/// }
3544/// fn requires_send<T: Send>(_: T) {}
3545/// let core = Core::new(Arc::new(Stub) as Arc<dyn BindingBoundary>);
3546/// let guard = core.begin_batch();
3547/// requires_send(guard); // <- compile_fail: BatchGuard is !Send.
3548/// ```
3549#[must_use = "BatchGuard drains the wave on drop; assign to a named binding"]
3550pub struct BatchGuard {
3551    core: Core,
3552    owns_tick: bool,
3553    /// Re-entrant mutex guards held for the wave's duration. One entry
3554    /// per touched partition's `wave_owner`, in ascending
3555    /// [`crate::subgraph::SubgraphId`] order. Drop releases each guard
3556    /// (any order — `parking_lot::ReentrantMutex` doesn't care since all
3557    /// are held by the same thread). Cross-thread waves on any of the
3558    /// held partitions block until our scope ends; cross-thread waves
3559    /// on partitions NOT in this vector run truly parallel — the
3560    /// canonical Y1 parallelism property.
3561    ///
3562    /// Each `ArcReentrantMutexGuard<()>` is `!Send`, so the `SmallVec`
3563    /// (and thus `BatchGuard`) is `!Send` at the type level — sending
3564    /// across threads would violate `parking_lot::ReentrantMutex`'s
3565    /// thread-ownership invariant.
3566    wave_guards: SmallVec<[crate::node::WaveOwnerGuard; 4]>,
3567    _not_send: std::marker::PhantomData<*const ()>,
3568}
3569
3570impl BatchGuard {
3571    /// Panic-discard cleanup for the owning guard: drop pending wave
3572    /// work, release queued payload + handle retains lock-released,
3573    /// restore pre-wave cache snapshots, clear per-thread `WaveState` +
3574    /// the Slice-G tier3 tracker, and discard deferred producer ops.
3575    ///
3576    /// Shared by BOTH panic origins so a drain-phase fn/sink panic gets
3577    /// the identical `BatchGuard` atomicity guarantee as a closure-body
3578    /// panic: (1) the `std::thread::panicking()` branch (panic propagated
3579    /// from the wave's *closure body* — drop runs during that unwind),
3580    /// and (2) the success-path `catch_unwind` around `drain_and_flush()`
3581    /// (a fn/sink panic that escaped the inner per-call `catch_unwind`
3582    /// isolation while drop was NOT already unwinding). /qa D047.
3583    ///
3584    /// Does NOT release `in_tick` — each `BatchGuard::drop` exit path
3585    /// calls `clear_in_tick()` explicitly, after this cleanup and before
3586    /// `fire_deferred` (so a re-entrant sink emit runs as a fresh owning
3587    /// wave).
3588    fn discard_wave_cleanup(&self) {
3589        let (pending, deferred_releases, restored_releases) = {
3590            let mut s = self.core.lock_state();
3591            // WaveState borrowed alongside state for panic-discard
3592            // cleanup. The WaveState borrow is per-thread, independent of
3593            // state. `in_tick` is per-(Core, thread) (`IN_TICK_OWNED`),
3594            // released separately by the explicit `clear_in_tick` on each
3595            // exit path; this method only drains/cleans the per-thread
3596            // WaveState retain-fields.
3597            with_wave_state(|ws| {
3598                let pending = std::mem::take(&mut ws.pending_notify);
3599                let _: DeferredJobs = std::mem::take(&mut ws.deferred_flush_jobs);
3600                ws.pending_fires.clear();
3601                let restored = self.core.restore_wave_cache_snapshots(&mut s, ws);
3602                // clear_wave_state pushes batch-handle releases into
3603                // ws.deferred_handle_releases, so take ws's queue AFTER
3604                // the clear.
3605                s.clear_wave_state(ws);
3606                ws.clear_wave_state();
3607                let deferred_releases = std::mem::take(&mut ws.deferred_handle_releases);
3608                // Slice E2 (D061): panic-discard wave drops queued
3609                // OnInvalidate cleanup hooks SILENTLY. Bindings using
3610                // OnInvalidate for external-resource cleanup MUST
3611                // idempotent-cleanup at process exit / next successful
3612                // invalidate. Mirrors A3 `pending_pause_overflow`
3613                // panic-discard precedent.
3614                let _: Vec<(crate::handle::NodeId, crate::boundary::CleanupTrigger)> =
3615                    std::mem::take(&mut ws.deferred_cleanup_hooks);
3616                // Slice E2 /qa Q2(b) (D069): same panic-discard discipline
3617                // for the eager-wipe queue. A panic-discarded wave drops
3618                // queued `wipe_ctx` fires silently; the binding-side
3619                // `NodeCtxState` entry remains until the next successful
3620                // terminate-with-no-subs cycle (or until `Core` drops).
3621                // This mirrors D061's external-resource-cleanup gap and
3622                // is documented similarly.
3623                let _: Vec<crate::handle::NodeId> = std::mem::take(&mut ws.pending_wipes);
3624                (pending, deferred_releases, restored)
3625            })
3626        };
3627        // Lock dropped — release retains lock-released so the binding
3628        // can't deadlock against an internal binding mutex.
3629        for entry in pending.values() {
3630            for msg in entry.iter_messages() {
3631                if let Some(h) = msg.payload_handle() {
3632                    self.core.binding.release_handle(h);
3633                }
3634            }
3635        }
3636        for h in deferred_releases {
3637            self.core.binding.release_handle(h);
3638        }
3639        for h in restored_releases {
3640            self.core.binding.release_handle(h);
3641        }
3642        // D1 patch (2026-05-09): clear the per-thread Slice G tier3
3643        // tracker on outermost wave-end (panic-discard path). The
3644        // thread-local outlives the BatchGuard otherwise — cargo's
3645        // thread reuse across tests would propagate stale entries.
3646        tier3_clear();
3647        // Phase H+ STRICT (D115): discard deferred producer ops on
3648        // panic. Release handle retains without firing.
3649        {
3650            let mut ops = self.core.deferred_producer_ops.lock();
3651            let discarded = std::mem::take(&mut *ops);
3652            for op in discarded {
3653                match op {
3654                    crate::node::DeferredProducerOp::Emit { handle, .. }
3655                    | crate::node::DeferredProducerOp::Error { handle, .. } => {
3656                        self.core.binding.release_handle(handle);
3657                    }
3658                    _ => {} // Complete has no handle; Callback drops naturally
3659                }
3660            }
3661        }
3662    }
3663}
3664
3665impl Drop for BatchGuard {
3666    fn drop(&mut self) {
3667        if !self.owns_tick {
3668            // Nested / non-owning guard: never claimed ownership, so it
3669            // must never release it. The owning guard's RAII releaser
3670            // (below) is the single clear site.
3671            return;
3672        }
3673        // Wave-ownership (`in_tick`) release discipline. `clear_in_tick`
3674        // must run AFTER the wave's drain + WaveState cleanup but BEFORE
3675        // `fire_deferred` (sinks), on every exit path:
3676        //
3677        // - **Before `fire_deferred` (load-bearing):** a sink re-entering
3678        //   `Core::emit` / `complete` from a flush callback must run as a
3679        //   fresh OWNING wave (so its own emissions drain + deliver). If
3680        //   `in_tick` were still owned during `fire_deferred`, that
3681        //   re-entrant emit would be a non-owning no-op and its data
3682        //   silently lost (regression caught by
3683        //   `lock_discipline::sink_can_reenter_core_via_emit`). This is
3684        //   why each path clears explicitly at the right point — NOT via
3685        //   an end-of-`drop` RAII guard (which would clear *after*
3686        //   `fire_deferred`).
3687        // - **/qa hardening (D047):** a fn/sink panic in the drain phase
3688        //   can escape the per-call `catch_unwind` isolation (e.g. a
3689        //   derived fn panicking when fired). Drop is NOT already
3690        //   unwinding, so it would otherwise skip BOTH the WaveState
3691        //   drain (→ next owning wave trips `wave_state_clear_outermost`)
3692        //   AND the `in_tick` clear (pre-D047 the explicit clear had this
3693        //   same window). Catching the drain panic, running the shared
3694        //   discard cleanup + `clear_in_tick`, then `resume_unwind` gives
3695        //   a drain-phase panic the identical atomicity as a
3696        //   closure-body panic.
3697        if std::thread::panicking() {
3698            // Closure-body panic — drop runs during that unwind. Discard
3699            // pending wave work (don't fire sinks mid-unwind — a sink
3700            // panic then aborts the process), release queued retains,
3701            // restore caches, then release ownership.
3702            self.discard_wave_cleanup();
3703            self.core.clear_in_tick();
3704            return;
3705        }
3706        if let Err(payload) =
3707            std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| self.core.drain_and_flush()))
3708        {
3709            self.discard_wave_cleanup();
3710            self.core.clear_in_tick();
3711            std::panic::resume_unwind(payload);
3712        }
3713        // Wave cleanup + extract deferred jobs under the lock.
3714        let (jobs, releases, cleanup_hooks, pending_wipes, snapshot_releases) = {
3715            let mut s = self.core.lock_state();
3716            // Q-beyond Sub-slice 1 + 3 (D108, 2026-05-09): WaveState
3717            // borrowed alongside state for wave-end cleanup. Per-thread;
3718            // independent of state. Sub-slice 3 moved deferred_* drains
3719            // into WaveState. /qa F1+F2 (2026-05-10) reverted in_tick +
3720            // currently_firing back to CoreState — clear via
3721            // CoreState::clear_wave_state under the held state lock.
3722            let result = with_wave_state(|ws| {
3723                s.clear_wave_state(ws);
3724                ws.clear_wave_state();
3725                // /qa A1 (2026-05-09) discipline preserved: drain snapshot
3726                // retains under lock, release lock-released below to avoid
3727                // binding re-entrance under held mutex / borrow.
3728                let snapshot_releases = Core::drain_wave_cache_snapshots(ws);
3729                // `drain_deferred` takes `deferred_flush_jobs` +
3730                // `deferred_handle_releases` (incl. rotation releases pushed
3731                // by `clear_wave_state` above) + Slice E2
3732                // `deferred_cleanup_hooks` + Slice E2 /qa Q2(b)
3733                // `pending_wipes` — all from WaveState post-Sub-slice-3.
3734                let (jobs, releases, hooks, wipes) = Core::drain_deferred(&mut s, ws);
3735                (jobs, releases, hooks, wipes, snapshot_releases)
3736            });
3737            // Release wave ownership now — AFTER drain + WaveState
3738            // cleanup, BEFORE `fire_deferred` below. Load-bearing: a sink
3739            // re-entering Core from a flush callback must observe
3740            // `in_tick` clear so its emit runs as a fresh owning wave.
3741            // (Mirrors the placement of the pre-D047 `s.in_tick = false`;
3742            // the drain-phase-panic window that placement had is closed
3743            // by the `catch_unwind` above.)
3744            self.core.clear_in_tick();
3745            result
3746        };
3747        // Lock dropped — fire deferred sinks + release retains + fire
3748        // cleanup hooks (Slice E2 OnInvalidate, D060 catch_unwind drain)
3749        // + fire eager wipes (D069).
3750        self.core
3751            .fire_deferred(jobs, releases, cleanup_hooks, pending_wipes);
3752        // /qa A1 fix (2026-05-09): release wave_cache_snapshots retains
3753        // lock-released. Pre-A1 these were released inside the held
3754        // state + cross_partition locks; binding finalizers re-entering
3755        // Core would deadlock against either mutex. Drained earlier
3756        // under the lock; released here after both mutexes dropped and
3757        // sinks have fired.
3758        for h in snapshot_releases {
3759            self.core.binding.release_handle(h);
3760        }
3761        // D1 patch (2026-05-09): clear the per-thread Slice G tier3
3762        // tracker at outermost wave-end (success path). Mirrors the
3763        // panic-discard branch above. Thread-local outlives BatchGuard
3764        // by default; cargo's thread-reuse across tests would propagate
3765        // stale entries. Cleared after sinks fire (sink callbacks may
3766        // re-enter Core via emit and could read the tier3 set
3767        // mid-wave; the wave is over here so clearing is safe).
3768        tier3_clear();
3769        // QA-fix group 2 (2026-05-09): explicitly drop the wave guards
3770        // in REVERSE acquisition order. `parking_lot::ReentrantMutex`
3771        // doesn't care about release order for same-thread holders, but
3772        // a future migration to a non-reentrant lock (or one with a
3773        // Drop side-effect tied to ordering) would silently break if we
3774        // relied on `SmallVec`'s default forward-iteration drop. The
3775        // ascending-acquire / descending-release pattern is the
3776        // canonical lock-discipline shape.
3777        while let Some(guard) = self.wave_guards.pop() {
3778            drop(guard);
3779        }
3780        // Phase H+ STRICT (D115): drain deferred producer ops now that
3781        // THIS BatchGuard's wave_guards are released. However, if an
3782        // outer scope still holds partitions (e.g., try_subscribe's
3783        // _wave_guard), draining here would re-enter Core::subscribe /
3784        // emit while those partitions are still in held_partitions,
3785        // triggering the ascending-order check. In that case, leave the
3786        // ops in the queue — the outermost BatchGuard (whose drop runs
3787        // with no outer partitions held) will drain them.
3788        self.core.drain_deferred_producer_ops();
3789    }
3790}