graphrefly_core/batch.rs

//! Wave engine — drain loop, fire selection, emission commit, sink dispatch.
//!
//! Ports the wave-engine portion of the handle-protocol prototype
//! (`~/src/graphrefly-ts/src/__experiments__/handle-core/core.ts`).
//! Sibling to [`super::node`]; the dispatcher's other concerns
//! (registration, subscription, pause/resume, terminal cascade,
//! `set_deps`) live there.
//!
//! # Wave engine entry points
//!
//! - [`Core::run_wave`] — wave entry. Claims `in_tick` under the state lock,
//!   runs `op` lock-released, then drains all transitive fn-fires and
//!   flushes per-subscriber notifications. Each fn-fire iteration drops
//!   the state lock around `BindingBoundary::invoke_fn` so user fn callbacks
//!   can re-enter Core safely.
//! - [`Core::drain_and_flush`] — drain phase + flush phase. Acquires/drops
//!   the state lock per iteration around `invoke_fn`.
//! - [`Core::commit_emission`] — equals-substitution + DIRTY/DATA/RESOLVED
//!   queueing + child propagation. `&self`-only; bracket-fires
//!   `BindingBoundary::custom_equals` lock-released.
//! - [`Core::queue_notify`] — per-subscriber message queueing with
//!   pause-buffer routing. Snapshots the subscriber list on first touch
//!   per wave (and again whenever the node's subscriber set changes
//!   mid-wave; see [`PendingBatch`]), so late subscribers installed
//!   between drain iterations don't receive duplicate deliveries of
//!   messages queued before they subscribed.
//! - [`Core::deliver_data_to_consumer`] — single-edge propagation; marks
//!   the consumer for fn-fire if its tracked-deps set is satisfied.
//!   Called from `commit_emission`, plus `activate_derived` and
//!   `set_deps` in [`super::node`].
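//!
//! As a rough illustration of what "equals-substitution" means for
//! `commit_emission`, here is a minimal sketch. It assumes a single `i64`
//! cache slot, a hypothetical three-variant `Msg` type, plain `==` in place
//! of `EqualsMode` / `custom_equals`, and that DIRTY precedes the tier-3
//! message. It is not the crate's implementation.
//!
//! ```rust
//! // Hypothetical, simplified message type (not `crate::message::Message`).
//! #[derive(Debug, PartialEq)]
//! pub enum Msg {
//!     Dirty,
//!     Data(i64),
//!     Resolved,
//! }
//!
//! /// Commit `new` into `cache`. A changed value is emitted as DATA. A value
//! /// equal to the cached one is substituted by RESOLVED, and the cache is
//! /// left untouched.
//! pub fn commit_emission(cache: &mut Option<i64>, new: i64) -> Vec<Msg> {
//!     let mut out = vec![Msg::Dirty];
//!     if *cache == Some(new) {
//!         out.push(Msg::Resolved); // equals-substitution: no new payload
//!     } else {
//!         *cache = Some(new);
//!         out.push(Msg::Data(new));
//!     }
//!     out
//! }
//! ```
//!
//! Calling it twice with the same value yields `[Dirty, Data(1)]` and then
//! `[Dirty, Resolved]`. A consumer can therefore tell "unchanged" apart
//! from "new value" without comparing payloads itself.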
//!
//! # Re-entrance discipline (Slice A close — M1 fully lock-released)
//!
//! - **Wave-end sink fires** drop the state lock first (Slice A-bigger
//!   discipline).
//! - **`BindingBoundary::invoke_fn`** in `fire_fn` fires lock-released —
//!   user fn callbacks may re-enter `Core::emit` / `pause` / `resume` /
//!   `invalidate` / `complete` / `error` / `teardown` and run a nested
//!   wave (the existing `s.in_tick` re-entrance gate composes
//!   transparently).
//! - **`BindingBoundary::custom_equals`** in `commit_emission`'s equals
//!   check fires lock-released.
//! - **Subscribe-time handshake** is the one remaining lock-held callback.
//!   It now fires per-tier (`[Start]`, `[Data(v)]`, `[Complete|Error]`,
//!   `[Teardown]`) as separate sink calls, matching the canonical R1.3.5.a
//!   tier-split. Re-entrance from a handshake sink callback panics with
//!   the [`reentrance_guard`] diagnostic.
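//!
//! The lock-released pattern above can be sketched with a plain
//! `std::sync::Mutex`. `MiniCore` is a hypothetical stand-in, not `Core`.
//! The point is only that the guard is dropped before the user callback
//! runs, so the callback can re-enter and take the lock again.
//!
//! ```rust
//! use std::sync::Mutex;
//!
//! /// Hypothetical stand-in for `Core`: a lock-protected event log.
//! pub struct MiniCore {
//!     state: Mutex<Vec<&'static str>>,
//! }
//!
//! impl MiniCore {
//!     pub fn new() -> Self {
//!         MiniCore { state: Mutex::new(Vec::new()) }
//!     }
//!
//!     /// Re-entrant entry point: holds the lock only for one push.
//!     pub fn emit(&self, msg: &'static str) {
//!         self.state.lock().unwrap().push(msg);
//!     }
//!
//!     /// Bracket a user callback the way `invoke_fn` is bracketed: mutate
//!     /// under the lock, drop the guard, call out, then re-acquire.
//!     pub fn fire(&self, user_fn: impl FnOnce(&MiniCore)) {
//!         {
//!             let mut s = self.state.lock().unwrap();
//!             s.push("fire:start");
//!         } // guard dropped here, before the callback runs
//!         user_fn(self); // may call `self.emit(..)` without deadlocking
//!         self.state.lock().unwrap().push("fire:end");
//!     }
//!
//!     pub fn log(&self) -> Vec<&'static str> {
//!         self.state.lock().unwrap().clone()
//!     }
//! }
//! ```
//!
//! If the guard were kept alive across `user_fn(self)`, the nested `emit`
//! would re-lock a mutex its own thread already holds. std leaves that
//! unspecified: it may deadlock or panic.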

use std::cell::RefCell;
use std::collections::HashMap;
use std::sync::Arc;

use ahash::AHashSet;
use indexmap::map::Entry;
use indexmap::IndexMap;

use smallvec::SmallVec;

use crate::boundary::{DepBatch, FnEmission, FnResult};
use crate::handle::{HandleId, NodeId, NO_HANDLE};
use crate::message::Message;
use crate::node::{Core, CoreState, EqualsMode, OperatorOp, Sink, TerminalKind};

// Slice G (R1.3.2.d / R1.3.3.a) per-thread tier-3-emit tracker.
//
// **Wave scope = thread-local.** GraphReFly's wave-engine guarantees
// that every emit at a given node within a single wave runs on the
// same thread (the thread that holds the partition's `wave_owner`
// `parking_lot::ReentrantMutex` — cross-thread emits at a node BLOCK
// on that mutex and so always land in the OTHER thread's wave). A
// wave is bounded above by the outermost `BatchGuard` drop on its
// originating thread. Together this means a per-thread
// `AHashSet<NodeId>` is the natural placement for "has node X already
// emitted a tier-3 message in this wave?" — the set's lifetime
// exactly matches the wave's, with no cross-thread or cross-wave
// contamination.
//
// **History (D1 patch, 2026-05-09):** previously placed on
// `crate::subgraph::SubgraphLockBox::state` per-partition (Q3 v1).
// That placement was robust to per-partition wave parallelism but
// vulnerable to mid-wave cross-thread `set_deps` partition splits:
// thread A is mid-wave on partition P (wave_owner held) but between
// fn fires (`currently_firing` empty); thread B's `set_deps` acquires
// the state lock, P13's `currently_firing.is_empty()` check
// short-circuits, the split proceeds, and X migrates from P to a
// fresh orphan-side partition with an empty
// `tier3_emitted_this_wave`. Thread A's subsequent emit at X then
// mis-detects "first emit" and queues a Resolved alongside the prior
// Data — R1.3.3.a violation. Thread-local placement is immune to this
// hazard: thread B's split doesn't touch thread A's thread-local at
// all.
//
// **Lifecycle:** populated by `Core::commit_emission` /
// `Core::commit_emission_verbatim`; cleared at the OUTERMOST
// `BatchGuard` drop on this thread (both success and panic-discard
// paths). Re-entrant nested waves on the same thread share the set —
// inner-wave emits add to the same set; the outermost drop is the
// canonical clear point. Cross-thread emits NEVER touch this thread's
// set (they serialize on the partition wave_owner; the cross-thread
// emit happens in the OTHER thread's emit-loop and uses the OTHER
// thread's tier3 thread-local).
thread_local! {
    static TIER3_EMITTED_THIS_WAVE: RefCell<AHashSet<NodeId>> = RefCell::new(AHashSet::new());
}

// Q-beyond Sub-slice 1 (D108 / 2026-05-09): per-thread wave-scoped state.
//
// **Design rationale (bench-driven, see `benches/lock_strategy.rs`):**
// - S1 showed parking_lot Mutex same-thread re-acquire is ~14 ns/op,
//   identical to thread_local borrow_mut. The "mutex hop is slow" intuition
//   is wrong UNCONTENDED.
// - S3 showed shared mutex on disjoint cross-thread keys is 2.7× slower
//   than per-partition mutex / thread_local (35.9 vs 13.0 ns/op) — pure
//   cache-line bouncing on the lock state itself.
// - Conclusion: the cost of the prior `Core::cross_partition` mutex was
//   dominated by cache-line bouncing across cores, NOT by single-thread
//   mutex acquire overhead. Moving the four wave-scoped fields to a
//   per-thread thread_local eliminates the bounce point entirely.
//
// **Wave scope = thread, same as `TIER3_EMITTED_THIS_WAVE`:** every emit
// in a wave runs on the thread that holds the partition wave_owner;
// cross-thread emits BLOCK on wave_owner and so always land in the OTHER
// thread's wave context with the OTHER thread's WAVE_STATE. Mid-wave
// cross-thread `set_deps` partition splits don't touch this thread's
// thread-local at all (D1 lesson, applied here).
//
// **Lifecycle:** populated by `Core::commit_emission` /
// `Core::queue_notify` / etc.; mostly drained mid-wave by the auto-resolve
// sweep + cache snapshot commit/restore. Outermost `BatchGuard::drop`
// releases any retained handles still in `wave_cache_snapshots` /
// `deferred_handle_releases` / `pending_notify`. Defensive wave-start
// clear at outermost owning BatchGuard entry guards against cargo's
// thread-reuse propagating stale entries from a prior panicked-mid-wave
// test.
thread_local! {
    static WAVE_STATE: RefCell<WaveState> = RefCell::new(WaveState::new());
}

/// Wave-scoped state previously held under [`Core::cross_partition`]'s
/// `parking_lot::Mutex<CrossPartitionState>`. Now per-thread (Q-beyond
/// Sub-slice 1, 2026-05-09; Sub-slice 2 added `pending_fires` +
/// `pending_notify`, 2026-05-09; Sub-slice 3 added `currently_firing`,
/// `in_tick`, `deferred_flush_jobs`, `deferred_cleanup_hooks`,
/// `pending_wipes`, `invalidate_hooks_fired_this_wave`, 2026-05-09;
/// /qa F1+F2 moved `in_tick` and `currently_firing` back to `CoreState`,
/// 2026-05-10).
///
/// All fields are populated and drained within one wave on one thread.
/// Cross-thread access is structurally impossible — cross-thread emits
/// block on partition `wave_owner` and land in the OTHER thread's wave
/// context.
///
/// **Refcount discipline (load-bearing):** `wave_cache_snapshots`,
/// `deferred_handle_releases`, and `pending_notify` hold binding-side
/// handle retains. They MUST be drained (and released through
/// `Core::binding.release_handle`) by the outermost `BatchGuard::drop`
/// on success and panic paths. `pending_notify` holds one retain per
/// payload-bearing message (one per `Message::payload_handle()`); the
/// retains are taken in `Core::queue_notify` and balanced either by
/// `flush_notifications` (success path: pushed into
/// `deferred_handle_releases`) or directly in the panic-discard path of
/// `BatchGuard::drop` (taken from `pending_notify` and released).
///
/// The thread_local has no `Drop` hook with access to a binding, so a
/// panic that bypasses `BatchGuard::drop` (e.g. a panic OUTSIDE any
/// batch) leaks those retains until the thread exits or a later
/// outermost `BatchGuard::drop` drains them. The wave-start clear
/// deliberately leaves these fields alone: clearing without releasing
/// would lose the retains and make the leak permanent. The defensive
/// wave-start clear in `BatchGuard::begin_batch_with_guards` clears
/// `pending_auto_resolve`, `pending_pause_overflow`, and
/// `invalidate_hooks_fired_this_wave` (none hold retains). It does NOT
/// clear `pending_fires` (which `Core::resume` may pre-stage before the
/// wave starts) or the retain-holding fields; the latter must be empty
/// by construction at outermost wave start (a prior wave's success or
/// panic-discard path drained them).
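///
/// The retain/release balancing can be sketched with a toy refcount map.
/// `Binding`, `Wave`, and the `u64` handle ids are hypothetical stand-ins
/// for the binding, `WaveState`, and `HandleId`. Only the two exit paths
/// are meant to mirror the text above.
///
/// ```rust
/// use std::collections::HashMap;
///
/// /// Hypothetical binding-side refcounts: handle id -> live retains.
/// #[derive(Default)]
/// pub struct Binding {
///     pub refs: HashMap<u64, u32>,
/// }
///
/// impl Binding {
///     pub fn retain(&mut self, h: u64) {
///         *self.refs.entry(h).or_insert(0) += 1;
///     }
///
///     pub fn release(&mut self, h: u64) {
///         let count = self.refs.get_mut(&h).expect("release without retain");
///         *count -= 1;
///         if *count == 0 {
///             self.refs.remove(&h);
///         }
///     }
/// }
///
/// /// Hypothetical slice of `WaveState` holding only the two retain queues.
/// #[derive(Default)]
/// pub struct Wave {
///     pending_notify: Vec<u64>,           // one retain per payload handle
///     deferred_handle_releases: Vec<u64>, // retains owed after sinks fire
/// }
///
/// impl Wave {
///     /// Queueing a payload-bearing message takes one retain.
///     pub fn queue_notify(&mut self, b: &mut Binding, h: u64) {
///         b.retain(h);
///         self.pending_notify.push(h);
///     }
///
///     /// Success path: flushing moves the retains into the deferred list,
///     /// which is released once sinks have fired.
///     pub fn finish_success(&mut self, b: &mut Binding) {
///         self.deferred_handle_releases.append(&mut self.pending_notify);
///         // ... sinks would fire here, lock-released ...
///         for h in self.deferred_handle_releases.drain(..) {
///             b.release(h);
///         }
///     }
///
///     /// Panic-discard path: release the queued retains directly.
///     pub fn finish_panic(&mut self, b: &mut Binding) {
///         for h in self.pending_notify.drain(..) {
///             b.release(h);
///         }
///     }
/// }
/// ```
///
/// Either exit returns `Binding::refs` to its starting state. A wave that
/// takes neither exit is exactly the leak case described above.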
pub(crate) struct WaveState {
    /// Payload-handle releases owed for messages that landed in
    /// `pending_notify` during this wave (one per `payload_handle()`).
    /// `BatchGuard::drop` releases these after sinks fire and the lock
    /// is dropped, balancing the retain done in `queue_notify`.
    pub(crate) deferred_handle_releases: Vec<HandleId>,
    /// Pre-wave cache snapshots used to restore state if the wave aborts
    /// mid-flight (e.g., a `Core::batch` closure panics). Each entry is
    /// `(node_id → old_cache_handle)` — the handle the node held BEFORE
    /// the wave started writing to it. The snapshotted handle holds a
    /// retain (taken when the snapshot was inserted) so it stays alive
    /// for restoration. On wave success, snapshots are drained and their
    /// retains released. On wave abort, each cache slot is restored from
    /// the snapshot and the original retain transfers to the cache slot.
    pub(crate) wave_cache_snapshots: HashMap<NodeId, HandleId>,
    /// Nodes that need an auto-Resolved at wave end if they don't receive
    /// a tier-3+ message from their own commit_emission. Populated by
    /// the RESOLVED child propagation in `commit_emission`. Drained by
    /// the auto-resolve sweep in `drain_and_flush`.
    pub(crate) pending_auto_resolve: AHashSet<NodeId>,
    /// R1.3.8.c pause-overflow ERROR synthesis queue. Recorded by
    /// [`Core::queue_notify`] when the pause buffer first overflows;
    /// drained at wave-end after the lock-released call to
    /// `BindingBoundary::synthesize_pause_overflow_error`.
    pub(crate) pending_pause_overflow: Vec<crate::node::PendingPauseOverflow>,
    /// Nodes whose fn we owe a fire to — drained by [`Core::run_wave`].
    ///
    /// Q-beyond Sub-slice 2 (D108, 2026-05-09): moved from
    /// `CoreState::pending_fires` to per-thread `WaveState`. Wave-scoped
    /// — populated by `deliver_data_to_consumer`, `terminate_node`'s
    /// child-cascade `QueueFire` branch, `activate_derived`'s producer
    /// queueing, `resume`'s pending-wave consolidation, and operator
    /// re-arm paths; drained by `pick_next_fire` / `fire_fn` /
    /// `fire_regular` / `fire_operator` (each removes the firing node
    /// before invoking).
    pub(crate) pending_fires: AHashSet<NodeId>,
    /// Per-node outgoing message buffer; flushed at wave end. Insertion-
    /// ordered so flush order is deterministic — load-bearing for
    /// R1.3.9.d meta-TEARDOWN ordering: when a parent and its meta
    /// companion both have queued messages in the same wave, the meta
    /// (queued first via `teardown_inner`'s recursion order) flushes
    /// first.
    ///
    /// Each entry carries the subscriber snapshot taken at first touch
    /// (Slice A close, M1: lock-released drain), plus a fresh snapshot
    /// for every mid-wave subscriber-revision bump (see
    /// [`PendingPerNode`]). Late subscribers installed mid-wave between
    /// fn-fire iterations don't appear in already-snapshotted batches;
    /// this is the load-bearing fix that prevents duplicate-Data delivery
    /// when a handshake delivers the post-commit cache and the wave's
    /// flush would otherwise also fire to the same sink.
    ///
    /// Q-beyond Sub-slice 2 (D108, 2026-05-09): moved from
    /// `CoreState::pending_notify` to per-thread `WaveState`. The map
    /// holds a payload-handle retain per payload-bearing message
    /// (`Message::payload_handle()`); these MUST be released by the
    /// outermost `BatchGuard::drop` (success path: through
    /// `flush_notifications` → `deferred_handle_releases`; panic path:
    /// directly in `BatchGuard::drop`'s panic branch).
    pub(crate) pending_notify: IndexMap<NodeId, PendingPerNode>,
    // Q-beyond Sub-slice 3 (D108, 2026-05-09 → /qa F1+F2 reverted
    // 2026-05-10): `in_tick` and `currently_firing` were moved from
    // `CoreState` to per-thread `WaveState` in sub-slice 3, then moved
    // BACK to `CoreState` after /qa surfaced two architectural
    // regressions:
    //
    // - **F1 (in_tick):** per-thread placement broke cross-Core
    //   same-thread BatchGuard isolation. A thread holding a live
    //   BatchGuard on Core-A and starting a wave on Core-B would
    //   read `ws.in_tick = true` (set by Core-A) → Core-B becomes
    //   non-owning → Core-B's writes get drained by Core-A's
    //   binding (silent corruption). Per-Core placement on
    //   `CoreState::in_tick` restores cross-Core isolation.
    //
    // - **F2 (currently_firing):** per-thread placement silently
    //   bypassed the cross-thread P13 partition-migration check in
    //   `Core::set_deps`. Pre-sub-slice-3 the shared `s.currently_firing`
    //   was readable cross-thread; thread B's set_deps could observe
    //   thread A's firing pushes. Per-thread placement made thread B
    //   read its own empty stack → P13 silently bypassed for
    //   cross-thread set_deps. Per-Core placement restores the D091
    //   safety check.
    //
    // The remaining 10 wave-scoped fields stay per-thread because they're
    // accessed only by the wave-owner thread under `wave_owner`
    // discipline (cross-thread emits BLOCK on partition wave_owner).
    /// Slice E2 (R1.3.9.b strict per D057): per-wave-per-node dedup
    /// for `OnInvalidate` cleanup hook firing. A node already in this
    /// set this wave has already had its `OnInvalidate` queued into
    /// `deferred_cleanup_hooks` and MUST NOT queue again, even if
    /// `invalidate_inner` re-encounters it.
    ///
    /// Q-beyond Sub-slice 3 (D108, 2026-05-09): moved from
    /// `CoreState::invalidate_hooks_fired_this_wave` to per-thread
    /// `WaveState`. Wave-scoped — populated by `invalidate_inner` and
    /// cleared by `WaveState::clear_wave_state`.
    pub(crate) invalidate_hooks_fired_this_wave: AHashSet<NodeId>,
    /// Deferred sink-fire jobs collected by `flush_notifications`.
    /// `flush_notifications` populates this from `pending_notify`;
    /// `Core::drain_deferred` takes it and `Core::fire_deferred` fires
    /// each entry lock-released. Each tuple is
    /// `(sinks_for_one_node_one_phase, phase_messages)`. Empty between
    /// waves.
    ///
    /// Q-beyond Sub-slice 3 (D108, 2026-05-09): moved from
    /// `CoreState::deferred_flush_jobs` to per-thread `WaveState`. No
    /// retains held — the `Vec<Sink>` clones own Arcs that drop
    /// naturally; the `Vec<Message>` payload retains were already moved
    /// into `deferred_handle_releases` by `flush_notifications`.
    pub(crate) deferred_flush_jobs: DeferredJobs,
    /// Slice E2 (per D060/D061): lock-released drain queue for
    /// `OnInvalidate` cleanup hooks. Populated by `Core::invalidate_inner`
    /// when a node's cache transitions `!= NO_HANDLE → NO_HANDLE`;
    /// drained after the lock drops at wave boundary by
    /// `Core::fire_deferred` (each call wrapped in `catch_unwind` per
    /// D060). Panic-discarded silently per D061.
    ///
    /// Q-beyond Sub-slice 3 (D108, 2026-05-09): moved from
    /// `CoreState::deferred_cleanup_hooks` to per-thread `WaveState`.
    pub(crate) deferred_cleanup_hooks: Vec<(NodeId, crate::boundary::CleanupTrigger)>,
    /// Slice E2 /qa Q2(b) (D069): lock-released drain queue for
    /// `BindingBoundary::wipe_ctx` calls fired eagerly from
    /// `Core::terminate_node` when a resubscribable node terminates with
    /// no live subscribers. Drained alongside `deferred_cleanup_hooks`
    /// at wave boundary; same `catch_unwind` discipline. Panic-discarded
    /// silently.
    ///
    /// Q-beyond Sub-slice 3 (D108, 2026-05-09): moved from
    /// `CoreState::pending_wipes` to per-thread `WaveState`.
    pub(crate) pending_wipes: Vec<NodeId>,
}

impl WaveState {
    fn new() -> Self {
        Self {
            deferred_handle_releases: Vec::new(),
            wave_cache_snapshots: HashMap::new(),
            pending_auto_resolve: AHashSet::new(),
            pending_pause_overflow: Vec::new(),
            pending_fires: AHashSet::new(),
            pending_notify: IndexMap::new(),
            invalidate_hooks_fired_this_wave: AHashSet::new(),
            deferred_flush_jobs: Vec::new(),
            deferred_cleanup_hooks: Vec::new(),
            pending_wipes: Vec::new(),
        }
    }

    /// Wave-end clear of the non-retain-holding fields. Called from
    /// [`Core::drain_and_flush`]'s wave-end path. Fields holding retains
    /// (`wave_cache_snapshots`, `deferred_handle_releases`,
    /// `pending_notify`) are NOT cleared here — they follow the
    /// success/panic paths' explicit drain discipline in
    /// `BatchGuard::drop`.
    pub(crate) fn clear_wave_state(&mut self) {
        self.pending_auto_resolve.clear();
        // pending_pause_overflow is normally drained by drain_and_flush
        // via the synthesis loop. If a wave is panic-discarded BEFORE
        // synthesis runs, BatchGuard::drop's panic path also clears it
        // explicitly. Pre-wave defensive clear in
        // `begin_batch_with_guards` makes this idempotent.
        self.pending_pause_overflow.clear();
        // Sub-slice 2: pending_fires is intentionally NOT cleared
        // here. Two reasons:
        //   1. Wave-success drain empties it by construction: every
        //      `pick_next_fire` selection is removed by
        //      `fire_regular` / `fire_operator` before invocation,
        //      and `drain_and_flush` only exits when the set is empty.
        //   2. The `Core::resume` default-mode consolidated-fire
        //      pattern stages an entry OUTSIDE any in-tick wave and
        //      then enters a new wave to drain it; clearing here
        //      would erase that pre-staged entry. The panic-discard
        //      path in `BatchGuard::drop` clears it explicitly.

        // /qa F2 reverted (2026-05-10): currently_firing moved BACK to
        // CoreState::currently_firing — defensive clear there.
        // Slice E2 (D057): per-wave-per-node OnInvalidate dedup is
        // wave-scoped — cleared so the next wave can fire cleanups
        // again.
        self.invalidate_hooks_fired_this_wave.clear();
        // `deferred_flush_jobs`, `deferred_cleanup_hooks`, and
        // `pending_wipes` are intentionally NOT cleared here. They
        // follow the same discipline as `deferred_handle_releases` /
        // `pending_notify`:
        //   - SUCCESS path (`BatchGuard::drop` non-panic): drained by
        //     `Core::drain_deferred` AFTER `clear_wave_state` runs,
        //     then fired lock-released by `Core::fire_deferred`.
        //   - PANIC-DISCARD path (`BatchGuard::drop` panic): explicitly
        //     `std::mem::take`-and-dropped AFTER `clear_wave_state`
        //     runs (silently per D061 / D069).
        // Clearing here would break the success path: queued sink fires
        // / cleanup hooks / wipes would be erased BEFORE
        // `drain_deferred` could take them.
    }
}

/// Run a closure with mutable access to this thread's [`WaveState`].
///
/// Convention: prefer this helper over inline `WAVE_STATE.with(...)`
/// for sites that touch ONE field. For sites that interleave state lock
/// access with wave-state mutation, inline `WAVE_STATE.with(...)` keeps
/// the lock-acquire / wave-state-borrow scopes visible (mirrors the
/// pre-Q-beyond `let mut s = self.lock_state(); let mut cps = self.lock_cross_partition();`
/// pattern).
///
/// **Re-entrance:** the closure MUST NOT re-enter Core in a way that
/// would call back into `with_wave_state` — `RefCell::borrow_mut` panics
/// on nested borrow. The same discipline that the prior
/// `parking_lot::Mutex<CrossPartitionState>` enforced (no re-entry
/// holding cross_partition) carries over.
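///
/// A minimal sketch of the helper and of the re-entrance hazard. It
/// assumes a hypothetical one-field stand-in for `WaveState`, with std
/// `HashSet` in place of `AHashSet`:
///
/// ```rust
/// use std::cell::RefCell;
/// use std::collections::HashSet;
/// use std::panic;
///
/// /// Hypothetical one-field stand-in for `WaveState`.
/// #[derive(Default)]
/// pub struct WaveState {
///     pub pending_fires: HashSet<u64>,
/// }
///
/// thread_local! {
///     static WAVE_STATE: RefCell<WaveState> = RefCell::new(WaveState::default());
/// }
///
/// pub fn with_wave_state<R>(f: impl FnOnce(&mut WaveState) -> R) -> R {
///     WAVE_STATE.with(|cell| f(&mut cell.borrow_mut()))
/// }
///
/// /// Returns `true` when a nested `with_wave_state` call panics. It does,
/// /// because the outer closure still holds the `borrow_mut`.
/// pub fn nested_access_panics() -> bool {
///     panic::catch_unwind(|| {
///         with_wave_state(|_outer| with_wave_state(|inner| inner.pending_fires.len()))
///     })
///     .is_err()
/// }
/// ```
///
/// The outer `RefMut` is dropped during unwinding, so the cell is usable
/// again once the panic has been caught.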
pub(crate) fn with_wave_state<R>(f: impl FnOnce(&mut WaveState) -> R) -> R {
    WAVE_STATE.with(|cell| f(&mut cell.borrow_mut()))
}

/// Outermost-wave defensive clear of [`WaveState`]'s non-retain-holding
/// fields. Called from [`BatchGuard::begin_batch_with_guards`] on
/// outermost owning entry. Mirrors the pre-existing tier3 defensive
/// clear (D1 patch, 2026-05-09) — guards against cargo's thread-reuse
/// propagating stale entries from a prior panicked-mid-wave test.
///
/// The retain-holding fields (`wave_cache_snapshots`,
/// `deferred_handle_releases`, `pending_notify`) MUST already be empty
/// by construction at outermost wave entry — outermost
/// `BatchGuard::drop` always drains them on both success and panic
/// paths. Debug builds assert this below. In release builds, non-empty
/// fields here mean a prior wave bypassed `BatchGuard::drop`, and the
/// next outermost `BatchGuard::drop` will eventually drain them.
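///
/// The split between fields that are cleared and fields that must already
/// be empty can be sketched as follows. The three-field `WaveState` and
/// its `clear_outermost` method are hypothetical simplifications:
///
/// ```rust
/// use std::collections::HashSet;
///
/// /// Hypothetical trimmed `WaveState`: two retain-free sets and one
/// /// retain-holding queue.
/// #[derive(Default)]
/// pub struct WaveState {
///     pub pending_auto_resolve: HashSet<u64>,
///     pub pending_fires: HashSet<u64>,
///     pub deferred_handle_releases: Vec<u64>, // holds retains
/// }
///
/// impl WaveState {
///     /// Outermost-entry defensive clear. Retain-holding fields are only
///     /// asserted empty, and `pending_fires` is kept so that a fire staged
///     /// before the wave started still runs.
///     pub fn clear_outermost(&mut self) {
///         debug_assert!(
///             self.deferred_handle_releases.is_empty(),
///             "retain-holding field non-empty at outermost wave start"
///         );
///         self.pending_auto_resolve.clear();
///     }
/// }
/// ```
///
/// Clearing `pending_fires` as well would silently drop a fire staged by a
/// `resume`-style caller, which is the hazard the comments in the body
/// below describe.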
fn wave_state_clear_outermost() {
    with_wave_state(|ws| {
        // /qa F4 (2026-05-10): debug_assert that retain-holding fields
        // are empty at outermost wave start. The invariant claim is
        // "outermost BatchGuard::drop drains them on both success and
        // panic paths, so they're empty before the next wave starts."
        // If a panic path EVER bypasses the drain (today: not reachable
        // because BatchGuard::drop is robust against panicking sinks via
        // catch_unwind), this assert catches it in tests immediately
        // rather than letting stale entries leak into the next wave's
        // drain (which would release Core-A's HandleIds via Core-B's
        // binding under cross-Core same-thread sequential use).
        debug_assert!(
            ws.wave_cache_snapshots.is_empty(),
            "wave_state_clear_outermost: wave_cache_snapshots non-empty at \
             outermost wave start ({} entries) — prior BatchGuard::drop \
             bypassed the drain (would leak retains into next wave's \
             binding). See /qa F4 (2026-05-10).",
            ws.wave_cache_snapshots.len()
        );
        debug_assert!(
            ws.deferred_handle_releases.is_empty(),
            "wave_state_clear_outermost: deferred_handle_releases non-empty \
             at outermost wave start ({} entries) — prior BatchGuard::drop \
             bypassed the drain. See /qa F4 (2026-05-10).",
            ws.deferred_handle_releases.len()
        );
        debug_assert!(
            ws.pending_notify.is_empty(),
            "wave_state_clear_outermost: pending_notify non-empty at \
             outermost wave start ({} entries) — prior BatchGuard::drop \
             bypassed the drain. See /qa F4 (2026-05-10).",
            ws.pending_notify.len()
        );
        ws.pending_auto_resolve.clear();
        ws.pending_pause_overflow.clear();
        // Sub-slice 2: pending_fires is intentionally NOT cleared here.
        // Pre-Sub-slice-2 it lived on CoreState and survived between
        // waves; load-bearing for `Core::resume`'s default-mode
        // consolidated-fire pattern, which inserts into pending_fires
        // OUTSIDE any in-tick wave (Phase 1, lock-held but `in_tick`
        // false at that moment) and then calls `run_wave_for(node_id)`
        // — `run_wave_for` enters a NEW outermost wave whose drain must
        // pick up that pre-staged pending_fires entry. Clearing here
        // would erase it.
        //
        // pending_fires holds no retains, so a stale entry from a
        // prior panicked-mid-wave test that bypassed BatchGuard::drop
        // would leak as a spurious fire on the next wave on the same
        // thread (no refcount damage). The panic-discard path in
        // BatchGuard::drop and the wave-success drain together
        // guarantee pending_fires is empty by wave end; relying on
        // that invariant matches the pre-refactor lifecycle.
        //
        // Intentionally NOT clearing wave_cache_snapshots /
        // deferred_handle_releases / pending_notify here — those hold
        // retains and need a binding to release. Documented invariant:
        // they're empty by outermost wave start.

        // Sub-slice 3 (2026-05-09; /qa F2 partially reverted 2026-05-10):
        // defensively clear the OnInvalidate dedup set on outermost-wave
        // entry. Holds no retains; a stale entry from a prior
        // panicked-mid-wave test that bypassed BatchGuard::drop would
        // only suppress the OnInvalidate cleanup hook for that node on
        // the next wave (no refcount damage). Clearing matches the
        // tier3 defensive-clear precedent.
        //
        // `currently_firing` was reverted to CoreState (per /qa F2 — the
        // per-thread placement silently bypassed the cross-thread P13
        // partition-migration check); its defensive clear lives in
        // `CoreState::clear_wave_state` (which BatchGuard::drop runs
        // wave-end on both success and panic paths).
        ws.invalidate_hooks_fired_this_wave.clear();
        // Intentionally NOT clearing deferred_flush_jobs /
        // deferred_cleanup_hooks / pending_wipes here — by invariant
        // they're empty at outermost wave start (drained on success
        // by drain_deferred → fire_deferred; drained on panic by
        // BatchGuard::drop's panic branch). Pre-clearing would race a
        // hypothetical wave that staged into them OUTSIDE in_tick
        // (none does today, but matching the deferred_handle_releases
        // / pending_notify discipline keeps the invariant uniform).
    });
}

/// Has `node` emitted a tier-3 (DATA / RESOLVED) message in the current
/// wave on this thread? See [`TIER3_EMITTED_THIS_WAVE`] for the per-thread
/// wave-scope rationale.
fn tier3_check(node: NodeId) -> bool {
    TIER3_EMITTED_THIS_WAVE.with(|s| s.borrow().contains(&node))
}

/// Mark `node` as having emitted a tier-3 message in the current wave on
/// this thread. Idempotent. See [`TIER3_EMITTED_THIS_WAVE`].
fn tier3_mark(node: NodeId) {
    TIER3_EMITTED_THIS_WAVE.with(|s| {
        s.borrow_mut().insert(node);
    });
}

/// Wave-end clear of the per-thread tier3 tracker. Called from the
/// OUTERMOST [`BatchGuard::drop`] on this thread (both success and
/// panic-discard paths). Inner non-owning BatchGuard drops MUST NOT
/// invoke this — the outer wave is still in flight and inner-wave marks
/// are part of the outer wave's Slice G coalescing state.
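///
/// A minimal sketch of the per-thread tracker with outermost-only
/// clearing. The `DEPTH` counter is a hypothetical stand-in for
/// `BatchGuard` nesting, and `u64` / std `HashSet` stand in for
/// `NodeId` / `AHashSet`:
///
/// ```rust
/// use std::cell::{Cell, RefCell};
/// use std::collections::HashSet;
///
/// thread_local! {
///     static TIER3: RefCell<HashSet<u64>> = RefCell::new(HashSet::new());
///     static DEPTH: Cell<u32> = Cell::new(0);
/// }
///
/// pub fn tier3_check(node: u64) -> bool {
///     TIER3.with(|s| s.borrow().contains(&node))
/// }
///
/// pub fn tier3_mark(node: u64) {
///     TIER3.with(|s| {
///         s.borrow_mut().insert(node);
///     });
/// }
///
/// /// Enter a (possibly nested) wave on this thread.
/// pub fn enter_wave() {
///     DEPTH.with(|d| d.set(d.get() + 1));
/// }
///
/// /// Leave a wave. Only the outermost exit clears the tracker, so marks
/// /// made inside a nested wave stay visible to the enclosing wave.
/// pub fn exit_wave() {
///     let depth = DEPTH.with(|d| {
///         d.set(d.get() - 1);
///         d.get()
///     });
///     if depth == 0 {
///         TIER3.with(|s| s.borrow_mut().clear());
///     }
/// }
///
/// /// Whether `node` is marked from the point of view of a fresh thread.
/// pub fn marked_on_other_thread(node: u64) -> bool {
///     std::thread::spawn(move || tier3_check(node)).join().unwrap()
/// }
/// ```
///
/// A mark made on one thread is invisible to another thread, and it
/// survives the inner `exit_wave` until the outermost one runs.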
fn tier3_clear() {
    TIER3_EMITTED_THIS_WAVE.with(|s| {
        s.borrow_mut().clear();
    });
}

/// Deferred sink-fire jobs collected during `flush_notifications`. Each
/// entry pairs a snapshot of the sink Arcs to fire with the messages to
/// deliver to them — one entry per (node × phase) cell with non-empty
/// content. Drained from [`WaveState::deferred_flush_jobs`] and fired
/// lock-released.
pub(crate) type DeferredJobs = Vec<(Vec<Sink>, Vec<Message>)>;

/// Lock-released drain payload of the wave's BatchGuard:
/// `(sink_jobs, handle_releases, OnInvalidate cleanup hooks, pending wipe_ctx fires)`.
/// Returned by [`Core::drain_deferred`], consumed by [`Core::fire_deferred`].
/// Factored into a type alias to satisfy `clippy::type_complexity`.
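///
/// A hedged sketch of the take-then-fire split. It assumes simplified
/// element types: sinks are boxed `Fn(&str)` closures, and handle ids,
/// cleanup hooks, and wipes are plain `u64`s. The `drain_deferred` and
/// `fire_jobs` functions here are illustrations, not the crate's `Core`
/// methods.
///
/// ```rust
/// use std::mem;
///
/// pub type Sink = Box<dyn Fn(&str)>;
/// pub type DeferredJobs = Vec<(Vec<Sink>, Vec<&'static str>)>;
/// pub type WaveDeferred = (DeferredJobs, Vec<u64>, Vec<u64>, Vec<u64>);
///
/// /// Hypothetical slice of `WaveState` holding the four deferred queues.
/// #[derive(Default)]
/// pub struct WaveState {
///     pub deferred_flush_jobs: DeferredJobs,
///     pub deferred_handle_releases: Vec<u64>,
///     pub deferred_cleanup_hooks: Vec<u64>,
///     pub pending_wipes: Vec<u64>,
/// }
///
/// /// Take every deferred queue in one step (leaving empty `Vec`s behind)
/// /// so the caller can fire them after dropping the lock.
/// pub fn drain_deferred(ws: &mut WaveState) -> WaveDeferred {
///     (
///         mem::take(&mut ws.deferred_flush_jobs),
///         mem::take(&mut ws.deferred_handle_releases),
///         mem::take(&mut ws.deferred_cleanup_hooks),
///         mem::take(&mut ws.pending_wipes),
///     )
/// }
///
/// /// Deliver each job's messages to each of its sinks and return the
/// /// number of sink calls made.
/// pub fn fire_jobs(jobs: DeferredJobs) -> usize {
///     let mut calls = 0;
///     for (sinks, messages) in jobs {
///         for sink in &sinks {
///             for m in &messages {
///                 sink(*m);
///                 calls += 1;
///             }
///         }
///     }
///     calls
/// }
/// ```
///
/// Taking all four queues at once leaves the wave state empty before any
/// user code runs, so a sink that re-enters Core starts from a clean slate.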
pub(crate) type WaveDeferred = (
    DeferredJobs,
    Vec<HandleId>,
    Vec<(crate::handle::NodeId, crate::boundary::CleanupTrigger)>,
    Vec<crate::handle::NodeId>,
);

/// One subscriber-snapshot epoch within a node's wave-end notification
/// queue. A `PendingBatch` is opened the first time `queue_notify` runs
/// for the node in a wave, and a fresh batch is opened whenever the node's
/// `subscribers_revision` advances mid-wave (a new sink subscribes, an
/// existing sink unsubscribes, or a handshake-time panic evicts an
/// orphaned sink). All messages within one batch flush to the same sink
/// list — the snapshot taken when the batch opened, frozen against
/// subsequent revision bumps.
pub(crate) struct PendingBatch {
    /// `NodeRecord::subscribers_revision` value at the moment this batch
    /// opened. Used by `queue_notify` to decide append-to-last-batch vs
    /// open-fresh-batch on every push.
    pub(crate) snapshot_revision: u64,
    /// Subscriber snapshot frozen when this batch opened.
    pub(crate) sinks: Vec<Sink>,
    /// Messages queued into this batch, in arrival order.
    pub(crate) messages: Vec<Message>,
}

/// Per-node wave-end notification queue, structured as one or more
/// subscriber-snapshot epochs (`batches`). The common case (no
/// mid-wave subscribe / unsubscribe at this node) keeps a single
/// inline batch — `SmallVec<[_; 1]>` keeps that allocation-free.
///
/// **Slice X4 / D2 (2026-05-08):** the prior shape was a single
/// `(sinks, messages)` pair per node — the snapshot froze on first
/// `queue_notify` and was reused for every subsequent emit to the same
/// node in the wave. That caused the documented late-subscriber +
/// multi-emit-per-wave gap (R1.3.5.a divergence): a sub installed
/// between two emits to the same node was invisible to the second
/// emit's flush slice. The revision-tracked batch list resolves it —
/// late subscribers land in a fresh batch whose frozen snapshot
/// includes them, while pre-subscribe batches retain their original
/// snapshot so the new sub doesn't double-receive earlier emits via
/// flush AND handshake.
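///
/// The revision rule can be sketched with simplified types: sinks are
/// plain `u32` ids, messages are `&'static str`, and `Vec` stands in for
/// `SmallVec<[_; 1]>`. `push` is a hypothetical condensation of the
/// batching decision in `queue_notify`, not its real signature.
///
/// ```rust
/// pub struct PendingBatch {
///     pub snapshot_revision: u64,
///     pub sinks: Vec<u32>,
///     pub messages: Vec<&'static str>,
/// }
///
/// #[derive(Default)]
/// pub struct PendingPerNode {
///     pub batches: Vec<PendingBatch>,
/// }
///
/// impl PendingPerNode {
///     /// Append `msg` to the last batch if the subscriber revision is
///     /// unchanged; otherwise open a fresh batch with a frozen copy of
///     /// the current sink list.
///     pub fn push(&mut self, revision: u64, live_sinks: &[u32], msg: &'static str) {
///         if let Some(last) = self.batches.last_mut() {
///             if last.snapshot_revision == revision {
///                 last.messages.push(msg);
///                 return;
///             }
///         }
///         self.batches.push(PendingBatch {
///             snapshot_revision: revision,
///             sinks: live_sinks.to_vec(),
///             messages: vec![msg],
///         });
///     }
///
///     /// All queued messages across batches, in arrival order.
///     pub fn iter_messages(&self) -> impl Iterator<Item = &&'static str> + '_ {
///         self.batches.iter().flat_map(|b| b.messages.iter())
///     }
/// }
/// ```
///
/// Suppose a subscriber is added between the second and third emit. The
/// first batch still targets only the original sink, and the second batch
/// targets both. The late subscriber therefore never receives the first
/// two messages via flush.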
556pub(crate) struct PendingPerNode {
557    pub(crate) batches: SmallVec<[PendingBatch; 1]>,
558}

impl PendingPerNode {
    /// Iterate every queued message for this node across all batches in
    /// arrival order. Used by R1.3.3.a invariant assertions and the
    /// auto-resolve / Slice-G coalescing tier-3-presence checks, which
    /// reason about wave content per node, not per batch.
    pub(crate) fn iter_messages(&self) -> impl Iterator<Item = &Message> + '_ {
        self.batches.iter().flat_map(|b| b.messages.iter())
    }

    /// Mutable counterpart of `iter_messages`. Used by
    /// `rewrite_prior_resolved_to_data` to rewrite Resolved entries to
    /// Data in place when a wave detects a multi-emit case after the
    /// fact.
    pub(crate) fn iter_messages_mut(&mut self) -> impl Iterator<Item = &mut Message> + '_ {
        self.batches.iter_mut().flat_map(|b| b.messages.iter_mut())
    }
}
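
// Illustrative sketch (not used by Core; all names hypothetical): a
// minimal model of the revision-tracked batch routing described on
// `PendingBatch` and `PendingPerNode`. It assumes `queue_notify` appends
// to the last batch when that batch's `snapshot_revision` still equals
// the node's current `subscribers_revision`, and otherwise opens a fresh
// batch frozen against the current sink list. Sinks are `u32` ids,
// messages are string labels, and `Vec` stands in for `SmallVec`. With
// sink 1 subscribed, queueing "a", subscribing sink 2, then queueing "b"
// and "c" yields two batches, and sink 2 receives only "b" and "c".
#[allow(dead_code)]
mod pending_batch_sketch {
    pub type Sink = u32;
    pub type Message = &'static str;

    /// Stand-in for `PendingBatch`.
    pub struct Batch {
        pub snapshot_revision: u64,
        pub sinks: Vec<Sink>,
        pub messages: Vec<Message>,
    }

    /// Stand-in for a node's live subscriber list and its revision counter.
    pub struct NodeSubs {
        pub revision: u64,
        pub sinks: Vec<Sink>,
    }

    impl NodeSubs {
        /// Subscribing bumps the revision, invalidating the last batch's snapshot.
        pub fn subscribe(&mut self, sink: Sink) {
            self.sinks.push(sink);
            self.revision += 1;
        }
    }

    /// Append to the last batch if its snapshot is still current; otherwise
    /// open a fresh batch carrying a frozen copy of the current sinks.
    pub fn queue(batches: &mut Vec<Batch>, subs: &NodeSubs, msg: Message) {
        if let Some(last) = batches.last_mut() {
            if last.snapshot_revision == subs.revision {
                last.messages.push(msg);
                return;
            }
        }
        batches.push(Batch {
            snapshot_revision: subs.revision,
            sinks: subs.sinks.clone(),
            messages: vec![msg],
        });
    }

    /// Wave-end flush: each batch delivers only to its own frozen sinks.
    pub fn flush(batches: &[Batch]) -> Vec<(Sink, Message)> {
        let mut delivered = Vec::new();
        for batch in batches {
            for &msg in &batch.messages {
                for &sink in &batch.sinks {
                    delivered.push((sink, msg));
                }
            }
        }
        delivered
    }
}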

/// RAII helper for the A6 reentrancy guard (Slice F, 2026-05-07).
///
/// Pushes `node_id` onto [`CoreState::currently_firing`] on construction
/// and pops it on Drop. [`Core::set_deps`] consults the stack and rejects
/// `set_deps(N, ...)` from inside N's own fn-fire with
/// [`crate::node::SetDepsError::ReentrantOnFiringNode`] — closing the
/// D1 hazard where Phase-1's snapshot of `dep_handles` would refer to
/// a different dep ordering than Phase-3's `tracked` storage.
///
/// Wraps the lock-released `invoke_fn` (and operator-equivalent FFI
/// callbacks like `project_each` / `predicate_each`). Drop fires even
/// on panic, so the stack stays balanced under user-fn unwinds.
///
/// Membership semantics (NOT strict LIFO): the only consumer of
/// `currently_firing` is `Core::set_deps`'s reentrancy check, which uses
/// `contains(&n)`, a set-membership test. Drop removes the right-most
/// matching `node_id` via `rposition` + `swap_remove`. Properly nested
/// guards drop innermost-first, so each drop removes the last entry. If
/// drops arrive out of order (for instance, guards from different
/// threads interleaving on the shared stack), `swap_remove` can reorder
/// what remains: for a stack `[A, B, A]`, B's drop finds B at index 1
/// and `swap_remove` moves the trailing A into that slot, leaving
/// `[A, A]`. The physical order of the remaining entries may then differ
/// from construction order, but membership is preserved. If a future
/// call site needs strict LIFO (e.g. "pop the most recently fired
/// node"), switch to `pop()` + assert the popped value equals
/// `self.node_id`. (QA A6, 2026-05-07)
pub(crate) struct FiringGuard {
    core: Core,
    node_id: NodeId,
    /// Phase H+ option (d) /qa N1(a) widened variant (2026-05-09):
    /// whether this guard participates in the per-thread
    /// `IN_PRODUCER_BUILD` accounting. Producer-pattern operator
    /// activations (`zip` / `concat` / `race` / `take_until` /
    /// `switch_map` / `exhaust_map` / `concat_map` / `merge_map`
    /// — all `is_producer()`) DO participate: they SUPPRESS the H+
    /// check during their build/project closure because those
    /// closures legitimately subscribe to upstream sources
    /// cross-partition (operator-internal activation-time setup,
    /// not user-fn re-entry). Refactoring those operators to defer
    /// inner subscribes to wave-end is the broader Phase H+ STRICT
    /// variant scope; the limited variant carves them out via this
    /// flag. Derived / dynamic / state user-fn fires do NOT
    /// participate — the H+ check applies to them under the
    /// "held non-empty AND not in producer build" gate (see
    /// `crate::node::held_partitions` module docstring).
    ///
    /// **INVARIANT:** the value is captured at `FiringGuard::new`
    /// from `NodeRecord::is_producer()` for the firing node and
    /// must NEVER be re-derived at `Drop` time. `is_producer` is
    /// stable for a node's lifetime per the current registration
    /// API (a node's kind cannot change once registered), but a
    /// future contributor adding mutability MUST honor this snapshot
    /// to keep the `producer_build_enter` / `producer_build_exit`
    /// pair balanced. A debug_assert in Drop verifies the snapshot
    /// still matches when assertions are enabled.
    is_producer_build: bool,
}

impl FiringGuard {
    pub(crate) fn new(core: &Core, node_id: NodeId) -> Self {
        // /qa F2 reverted (2026-05-10): currently_firing moved BACK to
        // CoreState (cross-thread visible, restoring the D091 P13 check).
        // The push and the is_producer read run under ONE state-lock
        // scope, so they are atomic with respect to other threads. This
        // also addresses /qa F3: the push no longer happens in a separate
        // `with_wave_state` call ahead of Self construction, and a panic
        // inside `lock_state` itself cannot leak the push because no push
        // has run yet.
        let is_producer = {
            let mut s = core.lock_state();
            s.currently_firing.push(node_id);
            s.nodes
                .get(&node_id)
                .is_some_and(crate::node::NodeRecord::is_producer)
        };
        // Construct Self FIRST (capturing the `is_producer_build`
        // snapshot in the struct), then call `producer_build_enter()` as
        // a separate step. Only the infallible `if is_producer` check sits
        // between the two, so no unwind can reach Drop in that window.
        // Keep it that way: once `guard` is bound, a panic runs Drop,
        // which would call `producer_build_exit` without a matching
        // enter and unbalance the counter. This is the panic-safe
        // ordering per /qa A4.
        //
        // **Cleanup-on-panic path:** a panic AFTER `guard` is bound runs
        // `Drop for FiringGuard`, which pops `currently_firing` under the
        // state lock. A panic between the `lock_state` block above and
        // Self construction would leak the push; in practice `lock_state`
        // uses parking_lot (whose locks do not poison) and `core.clone()`
        // is infallible, so that window is empty for current code. The
        // defensive clear in `CoreState::clear_wave_state` (called from
        // `BatchGuard::drop` at wave end) catches any hypothetical leak.
        let guard = Self {
            core: core.clone(),
            node_id,
            is_producer_build: is_producer,
        };
        if is_producer {
            crate::node::producer_build_enter();
        }
        guard
    }
}

impl Drop for FiringGuard {
    fn drop(&mut self) {
        // /qa F2 reverted (2026-05-10): currently_firing moved BACK to
        // CoreState. Pop under the SAME state lock scope as the debug
        // invariant check, atomic with the construction-side push.
        {
            let mut s = self.core.lock_state();
            #[cfg(debug_assertions)]
            {
                // INVARIANT: `is_producer_build` must match the node's
                // current `is_producer()` at Drop time. If a future
                // refactor introduces post-construction node-kind
                // mutation, this fails loudly under debug builds — the
                // `producer_build_enter` / `producer_build_exit` pair
                // would otherwise become unbalanced. Per /qa A5.
                let now_producer = s
                    .nodes
                    .get(&self.node_id)
                    .is_some_and(crate::node::NodeRecord::is_producer);
                // Allow node-removed-mid-fire (now `is_some_and(...)` is
                // false) — that's a benign asymmetry. Real concern: a
                // node that was non-producer at construction is now
                // reported as producer (or vice versa for an existing
                // node).
                if s.nodes.contains_key(&self.node_id) {
                    debug_assert_eq!(
                        self.is_producer_build,
                        now_producer,
                        "FiringGuard invariant violation: node {:?} was {} at \
                         construction but is {} at Drop. The is_producer flag \
                         must be stable for a node's lifetime; see FiringGuard \
                         struct docstring.",
                        self.node_id,
                        if self.is_producer_build {
                            "is_producer=true"
                        } else {
                            "is_producer=false"
                        },
                        if now_producer {
                            "is_producer=true"
                        } else {
                            "is_producer=false"
                        },
                    );
                }
            }
            // Pop the right-most matching node_id (membership semantics,
            // not strict LIFO). If absent, an external rebalance already
            // popped it, so this is a silent no-op: panicking in Drop
            // during an unwind would abort the process.
            if let Some(pos) = s.currently_firing.iter().rposition(|n| *n == self.node_id) {
                s.currently_firing.swap_remove(pos);
            }
        }
        // Phase H+ pair: decrement IN_PRODUCER_BUILD IFF we incremented
        // in `new`. Done after the state-lock scope above closes, so the
        // per-thread counter update does not lengthen the lock's
        // critical section.
        if self.is_producer_build {
            crate::node::producer_build_exit();
        }
    }
}
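
// Illustrative sketch (not used by Core; all names hypothetical): a
// single-threaded model of `FiringGuard`'s two bookkeeping duties. It
// assumes an `Rc<RefCell<Vec<_>>>` in place of the locked
// `CoreState::currently_firing` stack, and a thread-local `Cell<u32>` in
// place of the `IN_PRODUCER_BUILD` accounting behind
// `producer_build_enter` / `producer_build_exit`. Drop removes the
// right-most matching id (`rposition` + `swap_remove`, membership
// semantics) and decrements the counter only if the flag captured at
// construction says it was incremented. Because Drop runs during
// unwinding, a panicking "user fn" still leaves both balanced.
#[allow(dead_code)]
mod firing_guard_sketch {
    use std::cell::{Cell, RefCell};
    use std::rc::Rc;

    pub type NodeId = u32;
    pub type Stack = Rc<RefCell<Vec<NodeId>>>;

    thread_local! {
        // Stand-in for the per-thread `IN_PRODUCER_BUILD` counter.
        pub static PRODUCER_DEPTH: Cell<u32> = Cell::new(0);
    }

    /// Remove the right-most `node`; `swap_remove` may reorder what remains.
    pub fn remove_member(stack: &mut Vec<NodeId>, node: NodeId) {
        if let Some(pos) = stack.iter().rposition(|n| *n == node) {
            stack.swap_remove(pos);
        }
    }

    pub struct Guard {
        stack: Stack,
        node: NodeId,
        // Snapshot taken in `new`; never re-derived in `drop`.
        is_producer_build: bool,
    }

    impl Guard {
        pub fn new(stack: &Stack, node: NodeId, is_producer: bool) -> Self {
            stack.borrow_mut().push(node);
            let guard = Guard { stack: Rc::clone(stack), node, is_producer_build: is_producer };
            if is_producer {
                PRODUCER_DEPTH.with(|d| d.set(d.get() + 1));
            }
            guard
        }
    }

    impl Drop for Guard {
        fn drop(&mut self) {
            remove_member(&mut self.stack.borrow_mut(), self.node);
            if self.is_producer_build {
                PRODUCER_DEPTH.with(|d| d.set(d.get() - 1));
            }
        }
    }
}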

/// Borrow the per-operator scratch slot as `&T`. Panics if the slot is
/// uninitialized or the contained type doesn't match `T`. Both are
/// invariant violations, because each `fire_op_*` helper is meant to be
/// called only from `fire_operator`'s match arm for the matching variant.
fn scratch_ref<T: crate::op_state::OperatorScratch>(s: &CoreState, node_id: NodeId) -> &T {
    s.require_node(node_id)
        .op_scratch
        .as_ref()
        .expect("op_scratch slot uninitialized for operator node")
        .as_any_ref()
        .downcast_ref::<T>()
        .expect("op_scratch type mismatch")
}
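
// Illustrative sketch (not used by Core; all names hypothetical): the
// type-erased scratch-slot pattern behind `scratch_ref` / `scratch_mut`.
// It assumes an `OperatorScratch`-like trait that exposes `as_any_ref`
// / `as_any_mut`, so a `Box<dyn Trait>` can be recovered as its concrete
// type with `std::any::Any`'s `downcast_ref` / `downcast_mut`. `Scratch`,
// `TakeScratch`, `SkipScratch` and `Slot` stand in for the real
// `crate::op_state` / `NodeRecord` items. Asking for the wrong `T`
// panics with "op_scratch type mismatch", mirroring the invariant above.
#[allow(dead_code)]
mod scratch_slot_sketch {
    use std::any::Any;

    /// The `Any` supertrait lets every implementor hand out `&dyn Any`.
    pub trait Scratch: Any {
        fn as_any_ref(&self) -> &dyn Any;
        fn as_any_mut(&mut self) -> &mut dyn Any;
    }

    /// Scratch state for a hypothetical `take(n)` operator.
    pub struct TakeScratch {
        pub remaining: u32,
    }

    /// Scratch state for a hypothetical `skip(n)` operator.
    pub struct SkipScratch {
        pub skipped: u32,
    }

    impl Scratch for TakeScratch {
        fn as_any_ref(&self) -> &dyn Any {
            self
        }
        fn as_any_mut(&mut self) -> &mut dyn Any {
            self
        }
    }

    impl Scratch for SkipScratch {
        fn as_any_ref(&self) -> &dyn Any {
            self
        }
        fn as_any_mut(&mut self) -> &mut dyn Any {
            self
        }
    }

    /// Stand-in for the `op_scratch` field on a node record.
    pub struct Slot {
        pub op_scratch: Option<Box<dyn Scratch>>,
    }

    pub fn scratch_ref<T: Scratch>(slot: &Slot) -> &T {
        slot.op_scratch
            .as_ref()
            .expect("op_scratch slot uninitialized for operator node")
            .as_any_ref()
            .downcast_ref::<T>()
            .expect("op_scratch type mismatch")
    }

    pub fn scratch_mut<T: Scratch>(slot: &mut Slot) -> &mut T {
        slot.op_scratch
            .as_mut()
            .expect("op_scratch slot uninitialized for operator node")
            .as_any_mut()
            .downcast_mut::<T>()
            .expect("op_scratch type mismatch")
    }
}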

/// Mutable borrow of the per-operator scratch slot. Same invariants as
/// [`scratch_ref`].
fn scratch_mut<T: crate::op_state::OperatorScratch>(s: &mut CoreState, node_id: NodeId) -> &mut T {
    s.require_node_mut(node_id)
        .op_scratch
        .as_mut()
        .expect("op_scratch slot uninitialized for operator node")
        .as_any_mut()
        .downcast_mut::<T>()
        .expect("op_scratch type mismatch")
}
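
// Illustrative sketch (not used by Core; all names hypothetical): the
// transitive-readiness rule documented on `Core::pick_next_fire` in the
// `impl Core` block below, replayed on that docstring's diamond
// `A -> {B, C, E}; D = combine(B, C); F = combine(D, E)`. Plain maps
// stand in for `CoreState` / `WaveState`: `deps` maps each node to its
// direct deps, and a `BTreeSet` (chosen for deterministic iteration)
// stands in for `pending_fires`. With `B`, `C` and `F` pending, the
// immediate-only check would wrongly report `F` ready, while the
// transitive walk reaches the pending `B`/`C` through `D` and defers it.
#[allow(dead_code)]
mod readiness_sketch {
    use std::collections::{BTreeSet, HashMap, HashSet};

    pub type NodeId = char;
    pub type Deps = HashMap<NodeId, Vec<NodeId>>;

    /// The diamond from the docstring: each entry lists direct deps.
    pub fn diamond() -> Deps {
        HashMap::from([
            ('A', vec![]),
            ('B', vec!['A']),
            ('C', vec!['A']),
            ('E', vec!['A']),
            ('D', vec!['B', 'C']),
            ('F', vec!['D', 'E']),
        ])
    }

    /// Immediate-only check, shown for contrast: direct deps only.
    pub fn direct_deps_settled(deps: &Deps, pending: &BTreeSet<NodeId>, node: NodeId) -> bool {
        deps.get(&node).into_iter().flatten().all(|d| !pending.contains(d))
    }

    /// Transitive check: iterative DFS that fails as soon as any ancestor is pending.
    pub fn upstream_settled(deps: &Deps, pending: &BTreeSet<NodeId>, node: NodeId) -> bool {
        let mut visited = HashSet::new();
        let mut stack: Vec<NodeId> = deps.get(&node).cloned().unwrap_or_default();
        while let Some(id) = stack.pop() {
            if !visited.insert(id) {
                continue;
            }
            if pending.contains(&id) {
                return false;
            }
            if let Some(ds) = deps.get(&id) {
                stack.extend(ds.iter().copied().filter(|d| !visited.contains(d)));
            }
        }
        true
    }

    /// First pending node whose whole upstream is settled; otherwise any
    /// pending node (the cycle fallback), so a drain loop can still advance.
    pub fn pick(deps: &Deps, pending: &BTreeSet<NodeId>) -> Option<NodeId> {
        pending
            .iter()
            .copied()
            .find(|&id| upstream_settled(deps, pending, id))
            .or_else(|| pending.iter().copied().next())
    }
}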

impl Core {
    // -------------------------------------------------------------------
    // Wave entry + drain
    // -------------------------------------------------------------------

    /// Wave entry with a known `seed` node. The caller passes a closure
    /// that performs the wave's triggering operation (`commit_emission`,
    /// `terminate_node`, etc.). The closure runs lock-released;
    /// closure-internal Core methods acquire the state lock as they go.
    ///
    /// Acquires only the partitions transitively touched from `seed`
    /// (downstream cascade via `s.children` + R1.3.9.d meta-companion
    /// cascade) instead of every current partition. This is the canonical
    /// Y1 parallelism win for per-seed entry points (`Core::emit`,
    /// `Core::subscribe`'s activation, `Core::pause` / `Core::resume` /
    /// `Core::invalidate` / `Core::complete` / `Core::error` /
    /// `Core::teardown` / `Core::set_deps`'s push-on-subscribe).
    ///
    /// Two threads with disjoint touched-partition sets run truly in
    /// parallel; they don't block each other on Core-global locks.
    /// Same-thread re-entry passes through each partition's
    /// `ReentrantMutex` transparently. Cross-thread emits on the SAME
    /// partition (or any overlapping touched-partition set) serialize
    /// per the per-partition `wave_owner` mutex, preserving the
    /// "emit returning means subscribers have observed" contract.
    ///
    /// **Implementation:** delegates to [`Self::begin_batch_for`], the
    /// per-seed counterpart of the [`Self::begin_batch`] path used by
    /// [`Self::run_wave`], for the wave's RAII lifecycle. The returned
    /// `BatchGuard` holds the touched partitions' `wave_owner` re-entrant
    /// mutexes for the wave's duration (cross-thread emits block;
    /// same-thread re-entry passes through), claims `in_tick`, and on
    /// drop runs the drain + flush + sink-fire phases. If the closure
    /// panicked, drop instead takes the panic-discard path that restores
    /// cache snapshots and clears `in_tick`. This gives the wave the same
    /// panic-safety guarantee as the user-facing `Core::batch`.
    ///
    /// **Re-entrance:** when the closure is invoked from inside another
    /// wave, the inner call observes `in_tick=true` and receives a
    /// non-owning guard (`owns_tick=false`) whose drop is a no-op. The
    /// outer wave's drain picks up the inner closure's queued work.
    ///
    /// **Lock-release discipline (Slice A close, M1):** all binding-side
    /// callbacks except the subscribe-time handshake fire lock-released.
    /// Sinks, user fns, and custom-equals oracles that re-enter Core each
    /// run a nested wave. Cross-thread emits block at `wave_owner` until
    /// the in-flight wave's drain completes, preserving the user-facing
    /// "emit returning means subscribers have observed" contract.
    ///
    /// Slice Y1 / Phase E (2026-05-08).
    pub(crate) fn run_wave_for<F>(&self, seed: crate::handle::NodeId, op: F)
    where
        F: FnOnce(&Self),
    {
        let _guard = self.begin_batch_for(seed);
        op(self);
    }

    /// Drain retains held by `wave_cache_snapshots` and return them so
    /// the caller can release them with no lock held. Called from the
    /// wave-success path in [`BatchGuard::drop`].
    ///
    /// Q-beyond Sub-slice 1 (D108, 2026-05-09): the snapshots map moved
    /// to per-thread `WaveState`; signature takes `&mut WaveState`. The
    /// drain-then-release-lock-released discipline (introduced as the
    /// /qa A1 fix on 2026-05-09 against the prior cross_partition mutex)
    /// carries over: the caller drains under the WaveState borrow + state
    /// lock and releases only after both are dropped, because
    /// `release_handle` may re-enter Core via finalizers, and re-entry
    /// under either guard would deadlock or double-borrow.
    #[must_use]
    pub(crate) fn drain_wave_cache_snapshots(ws: &mut WaveState) -> Vec<HandleId> {
        if ws.wave_cache_snapshots.is_empty() {
            return Vec::new();
        }
        std::mem::take(&mut ws.wave_cache_snapshots)
            .into_values()
            .collect()
    }

    /// Restore cache slots from `wave_cache_snapshots` and clear the map.
    /// Called from the wave-abort path in `BatchGuard::drop` (panic).
    ///
    /// For each snapshotted node:
    ///
    /// 1. Read the current cache (the in-flight new value).
    /// 2. Set `cache = old_handle` (the snapshot's retained value).
    /// 3. Hand the now-unowned current cache handle back for release.
    ///
    /// Returns the handles to release outside the lock: every displaced
    /// in-flight cache handle other than `NO_HANDLE`, plus the snapshot
    /// handle of any node that is no longer registered.
    ///
    /// Q-beyond Sub-slice 1 (D108, 2026-05-09): the snapshots map moved
    /// to per-thread `WaveState`; signature takes both `s` (for cache
    /// slots) and `ws` (for the snapshots map).
    pub(crate) fn restore_wave_cache_snapshots(
        &self,
        s: &mut CoreState,
        ws: &mut WaveState,
    ) -> Vec<HandleId> {
        if ws.wave_cache_snapshots.is_empty() {
            return Vec::new();
        }
        let snapshots = std::mem::take(&mut ws.wave_cache_snapshots);
        let mut releases = Vec::with_capacity(snapshots.len());
        for (node_id, old_handle) in snapshots {
            let Some(rec) = s.nodes.get_mut(&node_id) else {
                // Node no longer registered: nothing to restore into, so
                // the snapshot's retained handle is released instead.
                releases.push(old_handle);
                continue;
            };
            let current = std::mem::replace(&mut rec.cache, old_handle);
            if current != NO_HANDLE {
                releases.push(current);
            }
        }
        releases
    }

    /// Drain pending fires until quiescent, then flush wave-end notifications
    /// to subscribers. Each fire iteration drops the state lock around the
    /// binding's `invoke_fn` callback so user fns may re-enter Core safely.
    ///
    /// `&self`-only — manages its own locking. Called from [`Self::run_wave`],
    /// including the waves that [`super::node::Core::activate_derived`]
    /// opens through `run_wave`.
    pub(crate) fn drain_and_flush(&self) {
        // Drain-iteration counter, checked against the configured cap below.
        let mut guard = 0u32;
        loop {
            // R1.3.8.c (Slice F, A3): if no fires are pending but there are
            // queued pause-overflow ERRORs, synthesize them now. The
            // resulting ERROR cascade may add to pending_fires (children
            // settling their terminal state), so we loop back to drain.
            //
            // Q-beyond Sub-slice 1 + 2 (D108, 2026-05-09): pending_fires
            // and pending_pause_overflow both live on per-thread
            // WaveState. State lock no longer required for either read.
            let synth_pending = with_wave_state(|ws| {
                if ws.pending_fires.is_empty() && !ws.pending_pause_overflow.is_empty() {
                    std::mem::take(&mut ws.pending_pause_overflow)
                } else {
                    Vec::new()
                }
            });
            for entry in synth_pending {
                // Lock-released call to the binding hook. Default impl
                // returns None — the binding has opted out of R1.3.8.c
                // and we fall back to silent-drop + ResumeReport.dropped.
                let handle = self.binding.synthesize_pause_overflow_error(
                    entry.node_id,
                    entry.dropped_count,
                    entry.configured_max,
                    entry.lock_held_ns / 1_000_000, // nanoseconds to milliseconds
                );
                if let Some(h) = handle {
                    // Re-enter Core::error to terminate the node and
                    // cascade. We're inside a wave (`in_tick = true`),
                    // so error() gets a non-owning batch guard — it
                    // doesn't try to start its own drain. The cascade
                    // queues into our outer drain via pending_fires
                    // and pending_notify.
                    self.error(entry.node_id, h);
                }
            }

            // Pick next fire under a short lock. Also re-read the configured
            // drain cap so callers can tune via `Core::set_max_batch_drain_iterations`
            // without restarting waves mid-flight.
            //
            // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_fires lives
            // on per-thread WaveState; pick_next_fire takes both state and
            // WaveState. The pending_size diagnostic and emptiness check
            // also read WaveState. Borrow scopes are split: WaveState
            // borrow drops before fire_fn runs (which re-borrows WaveState
            // via fire_regular / fire_operator).
            let (next, cap, pending_size) = {
                let s = self.lock_state();
                let cap = s.max_batch_drain_iterations;
                let (next, pending_size) = with_wave_state(|ws| {
                    if ws.pending_fires.is_empty() {
                        return (None, 0);
                    }
                    let size = ws.pending_fires.len();
                    let next = Self::pick_next_fire(&s, ws);
                    (next, size)
                });
                if pending_size == 0 {
                    break;
                }
                (next, cap, pending_size)
            };
            guard += 1;
            assert!(
                guard < cap,
                "wave drain exceeded {cap} iterations \
                 (pending_fires={pending_size}). Most likely cause: a runtime \
                 cycle introduced by an operator that re-arms its own pending_fires \
                 slot from inside `invoke_fn` (e.g. a producer that subscribes to \
                 itself, or a fn that calls Core::emit on a node whose fn fires \
                 the original node again). Structural cycles via set_deps are \
                 rejected at edge-mutation time. Tune via Core::set_max_batch_drain_iterations \
                 only with concrete evidence the workload needs more iterations."
            );
            let Some(next) = next else { break };
            // fire_fn manages its own locking around invoke_fn.
            self.fire_fn(next);
        }
        // Auto-resolve sweep: nodes registered in pending_auto_resolve
        // by the RESOLVED child propagation need a Resolved if they didn't
        // fire and settle via their own commit_emission. Check pending_notify
        // for each candidate — if it has Dirty but no tier-3+ message, the
        // node never settled and needs an auto-Resolved. Route through
        // queue_notify so paused nodes get the Resolved into their pause
        // buffer.
        let mut s = self.lock_state();
        // Q-beyond Sub-slice 1 + 2 (D108, 2026-05-09): pending_auto_resolve
        // and pending_notify both live on per-thread WaveState. /qa A5
        // fix (2026-05-09): each WaveState borrow is confined to its own
        // `with_wave_state` closure, so it ends BEFORE `queue_notify` runs.
        // Inside the loop, `queue_notify` re-borrows WaveState for
        // `pending_pause_overflow.push` / `pending_notify` writes, and
        // re-entering RefCell::borrow_mut while a borrow is live would
        // panic, so these closure scopes are load-bearing.
        let candidates = with_wave_state(|ws| std::mem::take(&mut ws.pending_auto_resolve));
        for node_id in candidates {
            let needs_resolve = with_wave_state(|ws| {
                ws.pending_notify
                    .get(&node_id)
                    .is_some_and(|entry| !entry.iter_messages().any(|m| m.tier() >= 3))
            });
            if needs_resolve {
                self.queue_notify(&mut s, node_id, Message::Resolved);
            }
        }
        // Final flush phase — populates deferred_flush_jobs from
        // pending_notify (which already carries per-node sink snapshots).
        self.flush_notifications(&mut s);
    }

    /// Pick a node whose **transitive** upstream is fully settled.
    ///
    /// A node `id` is ready to fire iff none of its transitive ancestors
    /// (via the `deps` chain) is currently in `pending_fires`. Diamond
    /// glitch prevention requires transitive (not immediate-only)
    /// reasoning. For graph `A → {B, C, E}; D = combine(B, C); F = combine(D, E)`,
    /// a wave that fires `E` first adds `F` to `pending_fires` before `D`
    /// is added. An immediate-only readiness check would mistakenly pick
    /// `F` because neither `D` nor `E` is in `pending_fires` at that
    /// moment, firing `F` against the stale activation-time `D` cache and
    /// again later when `D` actually settles. The transitive upstream walk
    /// sees `B`/`C` still pending and correctly defers `F`.
    ///
    /// Cost: O(V) per candidate; worst case O(N·V) per pick. A
    /// porting-deferred entry on `pick_next_fire` performance tracks a
    /// future refactor to a per-node `unresolved_dep_count`.
    ///
    /// Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_fires lives on
    /// per-thread WaveState. Caller passes `&WaveState` alongside
    /// `&CoreState` so the borrow scopes stay disjoint and visible.
    fn pick_next_fire(s: &CoreState, ws: &WaveState) -> Option<NodeId> {
        for &id in &ws.pending_fires {
            if Self::transitive_upstream_settled(s, ws, id) {
                return Some(id);
            }
        }
        // Cycle / no eligible candidate (every node has an upstream pending,
        // possibly via a cycle path): pick any so the drain guard advances.
        // The drain-iteration cap will catch genuine cycles.
        ws.pending_fires.iter().copied().next()
    }

    /// Iterative DFS over `node_id`'s transitive deps. Returns `false` as
    /// soon as any ancestor is found in `pending_fires`, and `true` once
    /// the whole upstream has been visited without a hit (or immediately
    /// when the node has no deps).
    fn transitive_upstream_settled(s: &CoreState, ws: &WaveState, node_id: NodeId) -> bool {
        let rec = s.require_node(node_id);
        if rec.dep_count() == 0 {
            return true;
        }
        let mut visited: ahash::AHashSet<NodeId> = ahash::AHashSet::new();
        let mut stack: Vec<NodeId> = rec.dep_ids_vec();
        while let Some(id) = stack.pop() {
            if !visited.insert(id) {
                continue;
            }
            if ws.pending_fires.contains(&id) {
                return false;
            }
            if let Some(r) = s.nodes.get(&id) {
                for dep_id in r.dep_ids() {
                    if !visited.contains(&dep_id) {
                        stack.push(dep_id);
                    }
                }
            }
        }
        true
    }

    /// Wave drain entry point. Dispatches via `rec.op` to either the
    /// regular fn-fire path ([`Self::fire_regular`]) or the operator
    /// dispatch ([`Self::fire_operator`]).
    pub(crate) fn fire_fn(&self, node_id: NodeId) {
        let op = {
            let s = self.lock_state();
            s.nodes.get(&node_id).and_then(|r| r.op)
        };
        match op {
            Some(operator_op) => self.fire_operator(node_id, operator_op),
            None => {
                // State / Derived / Dynamic / Producer all dispatch via fn_id.
                self.fire_regular(node_id);
            }
        }
    }

    /// Fire a node's fn lock-released around `invoke_fn`.
    ///
    /// Phase 1 (lock-held): remove from pending_fires, snapshot fn_id +
    /// dep_records → DepBatch + kind. Skip if terminal, first-run-gate-closed,
    /// or stateless.
    ///
    /// Phase 2 (lock-released): call `BindingBoundary::invoke_fn`. User fn
    /// callbacks may re-enter Core (`emit`, `pause`, etc.) and run a nested
    /// wave — the in_tick gate composes naturally because nested calls
    /// observe `in_tick = true` and skip their own drain.
    ///
    /// Phase 3 (lock-held): mark `has_fired_once`, store dynamic-tracked,
    /// decide between Noop+RESOLVED, single Data, or Batch.
    ///
    /// Phase 4: commit emissions. Single Data goes through
    /// `commit_emission` (with equals substitution). Batch emissions are
    /// processed in sequence — Data via `commit_emission_verbatim` (no
    /// equals substitution per R1.3.2.d / R1.3.3.c), Complete/Error via
    /// terminal cascade.
    #[allow(clippy::too_many_lines)] // Slice G added Noop / Batch tier-3 guards
    fn fire_regular(&self, node_id: NodeId) {
        enum FireAction {
            None,
            SingleData(HandleId),
            Batch(SmallVec<[FnEmission; 2]>),
        }

        // Phase 1: snapshot inputs — build DepBatch per dep from dep_records.
        // `has_fired_once` is captured here for the Slice E2 OnRerun gate
        // (Phase 1.5 below): the cleanup hook only fires when the fn has
        // run at least once already in this activation cycle.
        let prep: Option<(crate::handle::FnId, Vec<DepBatch>, bool, bool)> = {
            let s = self.lock_state();
            // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_fires lives
            // on per-thread WaveState. Removed via with_wave_state — no
            // re-entry concern because only the immediate remove happens
            // under the borrow.
            with_wave_state(|ws| {
                ws.pending_fires.remove(&node_id);
            });
            let rec = s.require_node(node_id);
            // Skip: terminal, first-run-gate-closed (R2.5.3 / R5.4 — partial
            // mode opts out of the gate per D011), or stateless.
            if rec.terminal.is_some() || (!rec.partial && rec.has_sentinel_deps()) {
                None
            } else {
                rec.fn_id.map(|fn_id| {
                    let dep_batches: Vec<DepBatch> = rec
                        .dep_records
                        .iter()
                        .map(|dr| DepBatch {
                            data: dr.data_batch.clone(),
                            prev_data: dr.prev_data,
                            involved: dr.involved_this_wave,
                        })
                        .collect();
                    (fn_id, dep_batches, rec.is_dynamic, rec.has_fired_once)
                })
            }
        };
        let Some((fn_id, dep_batches, is_dynamic, has_fired_once)) = prep else {
            return;
        };

        // Phase 1.5 (Slice E2 — R2.4.5 OnRerun, lock-released per D045): if
        // the fn has fired at least once in this activation cycle, fire its
        // OnRerun cleanup hook BEFORE the next invoke_fn re-allocates fn-
        // local resources. First-fire is intentionally skipped — there is
        // no prior run to clean up. Fires OUTSIDE `FiringGuard` because
        // cleanup re-entrance is not the A6 reentrancy concern (which
        // protects against `set_deps(self, ...)` from inside the in-flight
        // invoke_fn). Operator nodes never reach this path (`fire_regular`
        // is the fn-id branch of `fire_fn`; operators dispatch via
        // `fire_operator`), so cleanup hooks correctly only fire for fn-
        // shaped nodes (state / derived / dynamic / producer).
        if has_fired_once {
            self.binding
                .cleanup_for(node_id, crate::boundary::CleanupTrigger::OnRerun);
        }

        // Phase 2: invoke fn lock-released. A6 reentrancy guard is scoped to
        // the FFI call only — Phase 3's lock-held state mutation is not part
        // of "currently firing" because set_deps would already block on the
        // state lock by then. Drop on the guard pops the stack even if
        // invoke_fn panics, keeping `currently_firing` balanced.
        let result = {
            let _firing = FiringGuard::new(self, node_id);
            self.binding.invoke_fn(node_id, fn_id, &dep_batches)
        };

        // Phase 3: apply result under the lock — defensive terminal check
        // (a sibling cascade may have terminated this node during phase 2).
        let action: FireAction = {
            let mut s = self.lock_state();
            // Defensive: node may have terminated mid-phase-2 via a sibling
            // cascade (a fn that re-entered `Core::error` on a path that
            // cascaded here). If so, release any payload handles and no-op.
            if s.require_node(node_id).terminal.is_some() {
                match &result {
                    FnResult::Data { handle, .. } => {
                        self.binding.release_handle(*handle);
                    }
                    FnResult::Batch { emissions, .. } => {
                        for em in emissions {
                            match em {
                                FnEmission::Data(h) | FnEmission::Error(h) => {
                                    self.binding.release_handle(*h);
                                }
                                FnEmission::Complete => {}
                            }
                        }
                    }
                    FnResult::Noop { .. } => {}
                }
                return;
            }
            let rec = s.require_node_mut(node_id);
            rec.has_fired_once = true;
            if is_dynamic {
                let tracked = match &result {
                    FnResult::Data { tracked, .. }
                    | FnResult::Noop { tracked }
                    | FnResult::Batch { tracked, .. } => tracked.clone(),
                };
                if let Some(t) = tracked {
                    rec.tracked = t.into_iter().collect();
                }
            }
            match result {
                FnResult::Noop { .. } => {
                    // Slice G: skip Resolved if a prior emission in the same
                    // wave already queued tier-3 (would violate R1.3.3.a).
                    // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_notify
                    // lives on per-thread WaveState. Borrow scoped to the
1215                    // tier3 read so queue_notify (which re-borrows
1216                    // WaveState) doesn't double-borrow.
1217                    let already_dirty = s.require_node(node_id).dirty;
1218                    let already_tier3 = with_wave_state(|ws| {
1219                        ws.pending_notify
1220                            .get(&node_id)
1221                            .is_some_and(|entry| entry.iter_messages().any(|m| m.tier() == 3))
1222                    });
1223                    if already_dirty && !already_tier3 {
1224                        self.queue_notify(&mut s, node_id, Message::Resolved);
1225                    }
1226                    FireAction::None
1227                }
1228                FnResult::Data { handle, .. } => FireAction::SingleData(handle),
1229                FnResult::Batch { emissions, .. } if emissions.is_empty() => {
1230                    // Empty Batch is equivalent to Noop — settle with
1231                    // RESOLVED if the node was dirty (R1.3.1.a). Slice G:
1232                    // skip if a prior emission already queued tier-3.
1233                    // Q-beyond Sub-slice 2 (D108, 2026-05-09): see Noop
1234                    // arm above for the WaveState borrow scope rationale.
1235                    let already_dirty = s.require_node(node_id).dirty;
1236                    let already_tier3 = with_wave_state(|ws| {
1237                        ws.pending_notify
1238                            .get(&node_id)
1239                            .is_some_and(|entry| entry.iter_messages().any(|m| m.tier() == 3))
1240                    });
1241                    if already_dirty && !already_tier3 {
1242                        self.queue_notify(&mut s, node_id, Message::Resolved);
1243                    }
1244                    FireAction::None
1245                }
1246                FnResult::Batch { emissions, .. } => FireAction::Batch(emissions),
1247            }
1248        };
1249
1250        // Phase 4: commit emissions.
1251        match action {
1252            FireAction::None => {}
1253            // Single Data — equals substitution applies (R1.3.2).
1254            FireAction::SingleData(handle) => {
1255                self.commit_emission(node_id, handle);
1256            }
1257            // Batch — process in sequence. No equals substitution
1258            // (R1.3.2.d / R1.3.3.c: multi-message waves pass verbatim).
1259            FireAction::Batch(emissions) => {
1260                self.commit_batch(node_id, emissions);
1261            }
1262        }
1263    }
1264
1265    /// Process a `FnResult::Batch` emissions sequence. Each `Data` goes
1266    /// through `commit_emission_verbatim` (no equals substitution per
1267    /// R1.3.2.d / R1.3.3.c). Terminal emissions (`Complete` / `Error`)
1268    /// cascade per R1.3.4; processing stops at the first terminal and
1269    /// remaining handles are released (R1.3.4.a: no further messages
1270    /// after terminal).
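    /**
    A minimal standalone sketch of the terminal-break rule above. It
    assumes plain `u64` handles and a hypothetical `Emission` enum in
    place of `FnEmission`. Instead of calling into a binding, it returns
    the committed prefix and the handles that would be released afterward.

    ```rust
    // Hypothetical stand-in for `FnEmission`; handles are plain `u64`s.
    #[derive(Debug, Clone, Copy, PartialEq)]
    enum Emission {
        Data(u64),
        Complete,
        Error(u64),
    }

    // Returns (messages committed, payload handles released after the terminal).
    fn commit_batch_sketch(emissions: Vec<Emission>) -> (Vec<Emission>, Vec<u64>) {
        let mut committed = Vec::new();
        let mut iter = emissions.into_iter();
        // `by_ref` lets the loop stop early without consuming the rest.
        for em in iter.by_ref() {
            committed.push(em);
            if !matches!(em, Emission::Data(_)) {
                break; // Complete / Error: nothing may follow a terminal.
            }
        }
        // Anything left after the terminal only needs its payload released.
        let released = iter
            .filter_map(|em| match em {
                Emission::Data(h) | Emission::Error(h) => Some(h),
                Emission::Complete => None,
            })
            .collect();
        (committed, released)
    }
    ```

    The commit pass and the cleanup pass share one iterator, as in
    `commit_batch`, so no index bookkeeping is needed.
    */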
    fn commit_batch(&self, node_id: NodeId, emissions: SmallVec<[FnEmission; 2]>) {
        let mut iter = emissions.into_iter();
        for em in iter.by_ref() {
            match em {
                FnEmission::Data(handle) => {
                    self.commit_emission_verbatim(node_id, handle);
                }
                FnEmission::Complete => {
                    self.complete(node_id);
                    break;
                }
                FnEmission::Error(handle) => {
                    self.error(node_id, handle);
                    break;
                }
            }
        }
        // Release handles from any emissions after the terminal break.
        for em in iter {
            match em {
                FnEmission::Data(h) | FnEmission::Error(h) => {
                    self.binding.release_handle(h);
                }
                FnEmission::Complete => {}
            }
        }
    }

    // -------------------------------------------------------------------
    // Emission commit — equals-substitution lives here
    // -------------------------------------------------------------------

    /// Apply a node's emission. `&self`-only; brackets the equals check
    /// around a lock release so `BindingBoundary::custom_equals` can re-enter
    /// Core safely.
    ///
    /// Phase 1 (lock-held): defensive terminal short-circuit; snapshot
    /// equals_mode + old cache handle.
    ///
    /// Phase 2 (lock-released): call `handles_equal` — `EqualsMode::Identity`
    /// is a pure `u64` compare with no boundary call; `EqualsMode::Custom`
    /// crosses to the binding's `custom_equals` oracle, which may re-enter
    /// Core.
    ///
    /// Phase 3 (lock-held): set cache, queue Dirty + Data/Resolved into
    /// pending_notify (which snapshots subscribers on first touch),
    /// propagate to children.
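    /**
    A simplified single-node model of the DIRTY / DATA / RESOLVED decision
    above. It assumes identity equality on `u64` handles. `NodeSketch` and
    `Msg` are hypothetical. Only the first emit of a wave is covered;
    locking, the multi-emit rewrite path, and child propagation are
    omitted.

    ```rust
    // Hypothetical single-node model; equality is handle identity
    // (the `EqualsMode::Identity` case).
    #[derive(Debug, Clone, Copy, PartialEq)]
    enum Msg {
        Dirty,
        Data(u64),
        Resolved,
    }

    struct NodeSketch {
        cache: u64,
        dirty: bool,
        queued: Vec<Msg>,
    }

    impl NodeSketch {
        // First emit of a wave only.
        fn commit(&mut self, new_handle: u64) {
            // Equals substitution: an identical handle settles as RESOLVED.
            let is_data = self.cache != new_handle;
            // DIRTY is synthesized only if the node is not already dirty.
            if !self.dirty {
                self.dirty = true;
                self.queued.push(Msg::Dirty);
            }
            if is_data {
                self.cache = new_handle; // the cache advances only on DATA
                self.queued.push(Msg::Data(new_handle));
            } else {
                self.queued.push(Msg::Resolved);
            }
        }
    }
    ```
    */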
    // Q2 / Q3 (2026-05-09) tipped past clippy's 100-line threshold; the
    // function is already a multi-phase wave-engine routine and breaking
    // out its phases would obscure the lock-discipline.
    #[allow(clippy::too_many_lines)]
    pub(crate) fn commit_emission(&self, node_id: NodeId, new_handle: HandleId) {
        assert!(
            new_handle != NO_HANDLE,
            "NO_HANDLE is not a valid DATA payload (R1.2.4) for node {node_id:?}",
        );

        // Phase 1: terminal short-circuit + snapshot equals/cache.
        let snapshot = {
            let s = self.lock_state();
            let rec = s.require_node(node_id);
            if rec.terminal.is_some() {
                drop(s);
                self.binding.release_handle(new_handle);
                return;
            }
            (rec.cache, rec.equals)
        };
        let (old_handle, equals_mode) = snapshot;

        // Slice G (2026-05-07): R1.3.2.d says equals substitution only
        // fires for SINGLE-DATA waves at one node. Detect "this is a
        // subsequent emit in the same wave at this node" via the
        // per-thread `TIER3_EMITTED_THIS_WAVE` thread-local
        // (D1 patch, 2026-05-09 — moved off per-partition state to be
        // robust against mid-wave cross-thread `set_deps` partition
        // splits). If set → multi-emit wave: skip equals, queue Data
        // verbatim, retroactively rewrite any prior Resolved (queued by
        // an earlier same-value emit's equals match) to Data using the
        // wave-start cache snapshot. Outside batch / first emit:
        // standard per-emit equals path. Thread-local lookup is
        // ~5ns and lock-free.
        let is_subsequent_emit_in_wave = tier3_check(node_id);

        if is_subsequent_emit_in_wave {
            // Multi-emit wave detected. Skip equals, queue Data verbatim.
            // Also rewrite any prior Resolved entries to Data using the
            // wave-start cache snapshot.
            self.rewrite_prior_resolved_to_data(node_id);
            self.commit_emission_verbatim(node_id, new_handle);
            return;
        }

        // Phase 2: equals check (lock-released for Custom).
        let is_data = !self.handles_equal_lock_released(equals_mode, old_handle, new_handle);

        // Phase 3: apply emission under the lock. Defensive terminal
        // re-check — a concurrent cascade between phase 2 and phase 3
        // could have terminated the node.
        let mut s = self.lock_state();
        if s.require_node(node_id).terminal.is_some() {
            drop(s);
            self.binding.release_handle(new_handle);
            return;
        }

        // R1.3.1.a condition (b): synthesize DIRTY only if node not already
        // dirty from an earlier emission in the same wave.
        let already_dirty = s.require_node(node_id).dirty;
        s.require_node_mut(node_id).dirty = true;
        if !already_dirty {
            self.queue_notify(&mut s, node_id, Message::Dirty);
        }

        if is_data {
            // P3 (Slice A close /qa): re-read CURRENT cache. Same-thread
            // re-entry from a `custom_equals` oracle that called back into
            // `Core::emit` on this same node during phase 2's lock-released
            // equals check could have advanced the cache between phase 1's
            // snapshot (`old_handle`) and this point.
            let current_cache = s.require_node(node_id).cache;
            // Q-beyond Sub-slice 1 (D108, 2026-05-09): wave_cache_snapshots
            // lives on per-thread WaveState. /qa F1 reverted (2026-05-10):
            // in_tick lives on CoreState (cross-Core isolation requires
            // per-Core placement, not per-thread). Read in_tick under the
            // already-held state lock; snapshot-take via with_wave_state.
            let in_tick = s.in_tick;
            let snapshot_taken = if in_tick && current_cache != NO_HANDLE {
                use std::collections::hash_map::Entry;
                with_wave_state(|ws| match ws.wave_cache_snapshots.entry(node_id) {
                    Entry::Vacant(slot) => {
                        slot.insert(current_cache);
                        true
                    }
                    Entry::Occupied(_) => false,
                })
            } else {
                false
            };
            s.require_node_mut(node_id).cache = new_handle;
            if current_cache != NO_HANDLE && !snapshot_taken {
                self.binding.release_handle(current_cache);
            }
            // Slice E1 (R2.6.5 / Lock 6.G): push DATA into the replay
            // buffer if the node opted in. RESOLVED entries are NOT
            // buffered (canonical "DATA only").
            self.push_replay_buffer(&mut s, node_id, new_handle);
            // Slice G (D1 patch, 2026-05-09): mark this node as having
            // emitted tier-3 in this wave on the per-thread tracker.
            tier3_mark(node_id);
            self.queue_notify(&mut s, node_id, Message::Data(new_handle));
            // Propagate to children
            let child_ids: Vec<NodeId> = s
                .children
                .get(&node_id)
                .map(|c| c.iter().copied().collect())
                .unwrap_or_default();
            for child_id in child_ids {
                let dep_idx = s.require_node(child_id).dep_index_of(node_id);
                if let Some(idx) = dep_idx {
                    self.deliver_data_to_consumer(&mut s, child_id, idx, new_handle);
                }
            }
        } else {
            // RESOLVED: handle unchanged. Don't release; old still in use.
            // Slice G: snapshot cache so a subsequent same-wave emit can
            // rewrite this Resolved to Data using the snapshot.
            // Q-beyond Sub-slice 1 (D108, 2026-05-09): wave_cache_snapshots
            // lives on per-thread WaveState. /qa F1 reverted (2026-05-10):
            // in_tick lives on CoreState — read under held state lock.
            let current_cache = s.require_node(node_id).cache;
            if s.in_tick && current_cache != NO_HANDLE {
                use std::collections::hash_map::Entry;
                with_wave_state(|ws| {
                    if let Entry::Vacant(slot) = ws.wave_cache_snapshots.entry(node_id) {
                        self.binding.retain_handle(current_cache);
                        slot.insert(current_cache);
                    }
                });
            }
            // Slice G (D1 patch, 2026-05-09): mark this node as having
            // emitted tier-3 in this wave on the per-thread tracker.
            tier3_mark(node_id);
            self.queue_notify(&mut s, node_id, Message::Resolved);
            let child_ids: Vec<NodeId> = s
                .children
                .get(&node_id)
                .map(|c| c.iter().copied().collect())
                .unwrap_or_default();
            // /qa A7 fix (2026-05-09): collect auto-resolve inserts
            // during the loop and bulk-insert into pending_auto_resolve
            // under a SINGLE cross_partition acquire after the loop.
            // Pre-fix the loop acquired `cross_partition` once per
            // child via `self.lock_cross_partition().pending_auto_resolve.insert(...)`,
            // which is N mutex hops for an N-child cascade. Cannot
            // hoist to acquire-cps-before-loop because `queue_notify`
            // (called inside the loop) also acquires cross_partition
            // for `pending_pause_overflow.push` in the rare overflow
            // case — re-entrance on the non-reentrant Mutex would
            // self-deadlock.
            let mut auto_resolve_inserts: SmallVec<[NodeId; 4]> = SmallVec::new();
            for child_id in child_ids {
                let already_involved = s.require_node(child_id).involved_this_wave;
                if !already_involved {
                    {
                        let child = s.require_node_mut(child_id);
                        child.involved_this_wave = true;
                        child.dirty = true;
                    }
                    self.queue_notify(&mut s, child_id, Message::Dirty);
                    // Q2 (2026-05-09) placed pending_auto_resolve on
                    // `CrossPartitionState`; it is now reached through the
                    // per-thread `WaveState` (see the bulk insert below).
                    // Deferred to that after-loop insert per the /qa A7 fix.
                    auto_resolve_inserts.push(child_id);
                }
            }
            // /qa A7 (2026-05-09) — preserved post-Sub-slice-1 (D108):
            // single WaveState borrow for the bulk-insert. queue_notify
            // above no longer holds the WaveState borrow by the time we
            // reach here, so this borrow is uncontested.
            if !auto_resolve_inserts.is_empty() {
                with_wave_state(|ws| ws.pending_auto_resolve.extend(auto_resolve_inserts));
            }
        }
    }

    /// Slice G: when a multi-emit wave is detected at `node_id` (a second
    /// emit arrives while a prior tier-3 message is still pending), rewrite
    /// any `Resolved` entries from earlier emits to `Data(snapshot_cache)`
    /// so the wave conforms to R1.3.3.a (≥1 DATA OR exactly 1 RESOLVED).
    /// Touches both `pending_notify` (immediate-flush path) and the per-node
    /// pause buffer (paused path).
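    /**
    A sketch of the rewrite and its refcount bookkeeping. It assumes a
    hypothetical `Vec<Msg>` queue in place of `pending_notify` and a
    `HashMap` of refcounts in place of the binding. Each rewritten message
    gains one fresh retain on the snapshot handle, because every queued
    DATA owns its own share.

    ```rust
    use std::collections::HashMap;

    // Hypothetical message type; handles are plain `u64`s.
    #[derive(Debug, Clone, Copy, PartialEq)]
    enum Msg {
        Dirty,
        Data(u64),
        Resolved,
    }

    fn rewrite_resolved_sketch(queue: &mut [Msg], snapshot: u64, refcounts: &mut HashMap<u64, u32>) {
        let mut retains_needed = 0u32;
        for msg in queue.iter_mut() {
            if *msg == Msg::Resolved {
                *msg = Msg::Data(snapshot);
                retains_needed += 1;
            }
        }
        // One fresh retain per rewrite, taken after the queue walk.
        *refcounts.entry(snapshot).or_insert(0) += retains_needed;
    }
    ```
    */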
    fn rewrite_prior_resolved_to_data(&self, node_id: NodeId) {
        let mut s = self.lock_state();
        // Q-beyond Sub-slice 1 + 2 (D108, 2026-05-09): wave_cache_snapshots
        // and pending_notify both live on per-thread WaveState. Two short
        // WaveState borrows handle the snapshot lookup and the
        // pending_notify rewrite; the pause-buffer path uses the state
        // lock and is independent of WaveState.
        let snapshot = match with_wave_state(|ws| ws.wave_cache_snapshots.get(&node_id).copied()) {
            Some(h) if h != NO_HANDLE => h,
            // No snapshot available — the prior Resolved was queued without
            // a cache (sentinel pre-emit). Nothing to rewrite to; the
            // multi-emit case from sentinel is fine (verbatim Data path).
            _ => return,
        };
        let mut retains_needed = 0u32;
        // `pending_notify` path. Walk all batches' messages — Slice-G
        // coalescing reasons about wave-content per node, not per-batch.
        with_wave_state(|ws| {
            if let Some(entry) = ws.pending_notify.get_mut(&node_id) {
                for msg in entry.iter_messages_mut() {
                    if matches!(msg, Message::Resolved) {
                        *msg = Message::Data(snapshot);
                        retains_needed += 1;
                    }
                }
            }
        });
        // Pause-buffer path.
        if let Some(rec) = s.nodes.get_mut(&node_id) {
            if let crate::node::PauseState::Paused { buffer, .. } = &mut rec.pause_state {
                for msg in &mut *buffer {
                    if matches!(msg, Message::Resolved) {
                        *msg = Message::Data(snapshot);
                        retains_needed += 1;
                    }
                }
            }
        }
        drop(s);
        // Each rewritten Resolved → Data adds a payload retain that
        // queue_notify would otherwise have taken at emit time. The
        // snapshot already owns one retain (taken when cache was
        // snapshotted); we need one fresh retain per rewrite.
        for _ in 0..retains_needed {
            self.binding.retain_handle(snapshot);
        }
    }

    /// Equals check that crosses the binding boundary lock-released for
    /// `EqualsMode::Custom`. Caller must NOT hold the state lock.
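    /**
    A standalone sketch of the equality ladder below. It assumes `u64`
    handles with `0` standing in for `NO_HANDLE`. The hypothetical `Mode`
    enum uses a closure where the real code crosses to the binding's
    `custom_equals`. Note that the oracle is never consulted for
    identical handles or for the sentinel.

    ```rust
    const NO_HANDLE: u64 = 0; // hypothetical sentinel value

    // Hypothetical stand-in for `EqualsMode`.
    enum Mode<F: Fn(u64, u64) -> bool> {
        Identity,
        Custom(F),
    }

    fn handles_equal<F: Fn(u64, u64) -> bool>(mode: &Mode<F>, a: u64, b: u64) -> bool {
        if a == b {
            return true; // same handle: always equal, no oracle call
        }
        if a == NO_HANDLE || b == NO_HANDLE {
            return false; // the sentinel never equals a real value
        }
        match mode {
            Mode::Identity => false,
            Mode::Custom(eq) => eq(a, b),
        }
    }
    ```
    */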
    fn handles_equal_lock_released(&self, mode: EqualsMode, a: HandleId, b: HandleId) -> bool {
        if a == b {
            return true; // identity-on-handles always sufficient
        }
        if a == NO_HANDLE || b == NO_HANDLE {
            return false;
        }
        match mode {
            EqualsMode::Identity => false,
            EqualsMode::Custom(handle) => self.binding.custom_equals(handle, a, b),
        }
    }

    /// Commit a DATA emission **without** equals substitution — used by
    /// `FnResult::Batch` processing where multi-message waves pass through
    /// verbatim per R1.3.2.d / R1.3.3.c. DIRTY auto-prefix respects
    /// R1.3.1.a condition (b): only queued if node not already dirty.
    ///
    /// Structurally identical to the DATA branch of [`Self::commit_emission`]
    /// but skips the Phase 2 equals check entirely.
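    /**
    A sketch of the first-touch cache snapshot handoff used below. It
    assumes `u64` handles, `u32` node ids, `0` as `NO_HANDLE`, and a plain
    `HashMap` for `wave_cache_snapshots` (all hypothetical stand-ins). On
    the first replacement within a tick, the old cache's share moves into
    the snapshot instead of being released. Later replacements release
    the displaced handle as usual.

    ```rust
    use std::collections::hash_map::Entry;
    use std::collections::HashMap;

    const NO_HANDLE: u64 = 0; // hypothetical sentinel value

    // Returns the handle the caller must release, if any.
    fn replace_cache_sketch(
        snapshots: &mut HashMap<u32, u64>,
        in_tick: bool,
        node: u32,
        cache: &mut u64,
        new_handle: u64,
    ) -> Option<u64> {
        let current = *cache;
        let snapshot_taken = in_tick
            && current != NO_HANDLE
            && match snapshots.entry(node) {
                Entry::Vacant(slot) => {
                    slot.insert(current); // snapshot inherits the cache's share
                    true
                }
                Entry::Occupied(_) => false,
            };
        *cache = new_handle;
        (current != NO_HANDLE && !snapshot_taken).then_some(current)
    }
    ```
    */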
    fn commit_emission_verbatim(&self, node_id: NodeId, new_handle: HandleId) {
        assert!(
            new_handle != NO_HANDLE,
            "NO_HANDLE is not a valid DATA payload (R1.2.4) for node {node_id:?}",
        );

        let mut s = self.lock_state();
        let rec = s.require_node(node_id);
        if rec.terminal.is_some() {
            drop(s);
            self.binding.release_handle(new_handle);
            return;
        }

        // R1.3.1.a condition (b): DIRTY only if not already dirty.
        let already_dirty = s.require_node(node_id).dirty;
        s.require_node_mut(node_id).dirty = true;
        if !already_dirty {
            self.queue_notify(&mut s, node_id, Message::Dirty);
        }

        // Always DATA — no equals substitution for Batch emissions.
        // Q-beyond Sub-slice 1 (D108, 2026-05-09): wave_cache_snapshots
        // lives on per-thread WaveState. /qa F1 reverted (2026-05-10):
        // in_tick lives on CoreState — read under held state lock.
        let current_cache = s.require_node(node_id).cache;
        let snapshot_taken = if s.in_tick && current_cache != NO_HANDLE {
            use std::collections::hash_map::Entry;
            with_wave_state(|ws| match ws.wave_cache_snapshots.entry(node_id) {
                Entry::Vacant(slot) => {
                    slot.insert(current_cache);
                    true
                }
                Entry::Occupied(_) => false,
            })
        } else {
            false
        };
        s.require_node_mut(node_id).cache = new_handle;
        if current_cache != NO_HANDLE && !snapshot_taken {
            self.binding.release_handle(current_cache);
        }
        // Slice E1: replay buffer push (R2.6.5 / Lock 6.G).
        self.push_replay_buffer(&mut s, node_id, new_handle);
        // Slice G QA fix (A2, 2026-05-07) / D1 patch (2026-05-09): mark
        // tier3_emitted_this_wave on the per-thread tracker even on the
        // verbatim path. A subsequent commit_emission at the same node
        // in the same wave needs this flag to detect multi-emit and
        // skip equals substitution; without it, a Batch-then-standard
        // sequence would queue Resolved into a wave that already has
        // Data — violating R1.3.3.a. The Batch path itself still
        // passes verbatim per R1.3.3.c (we don't re-run equals here);
        // we just record that "this node has emitted tier-3 in this
        // wave."
        tier3_mark(node_id);
        self.queue_notify(&mut s, node_id, Message::Data(new_handle));
        // Propagate to children
        let child_ids: Vec<NodeId> = s
            .children
            .get(&node_id)
            .map(|c| c.iter().copied().collect())
            .unwrap_or_default();
        for child_id in child_ids {
            let dep_idx = s.require_node(child_id).dep_index_of(node_id);
            if let Some(idx) = dep_idx {
                self.deliver_data_to_consumer(&mut s, child_id, idx, new_handle);
            }
        }
    }

    /// Slice E1 (R2.6.5 / Lock 6.G): push a DATA handle into the node's
    /// replay buffer if opted in. Evicts the oldest entry if the cap is
    /// exceeded; takes a fresh retain on push. RESOLVED is NOT buffered
    /// (canonical "DATA only"); call sites only invoke this for Data
    /// emissions.
    ///
    /// The evicted handle is queued into the per-thread `WaveState`'s
    /// `deferred_handle_releases` and released at flush time, after the
    /// state lock is dropped. This follows the binding-boundary
    /// lock-release discipline: `release_handle` may re-enter Core via
    /// finalizers and must not run while the state lock is held (QA A3,
    /// 2026-05-07). Q2 (2026-05-09) had moved the queue to
    /// `CrossPartitionState`; it is now reached through `with_wave_state`,
    /// and only when an eviction actually happens, so the common
    /// no-eviction case takes no extra borrow.
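    /**
    A sketch of the bounded replay buffer. It assumes `u64` handles, a
    `HashMap` refcount table in place of the binding, and a `Vec` for the
    deferred-release queue. `ReplaySketch` is hypothetical. The buffer
    retains each handle it stores. An evicted handle is only queued, so
    its release can happen later, outside the lock.

    ```rust
    use std::collections::{HashMap, VecDeque};

    struct ReplaySketch {
        cap: Option<usize>,
        buffer: VecDeque<u64>,
        deferred_releases: Vec<u64>,
    }

    impl ReplaySketch {
        fn push(&mut self, h: u64, refcounts: &mut HashMap<u64, u32>) {
            let cap = match self.cap {
                Some(c) if c > 0 => c,
                _ => return, // node did not opt in
            };
            *refcounts.entry(h).or_insert(0) += 1; // the buffer owns a share
            self.buffer.push_back(h);
            if self.buffer.len() > cap {
                if let Some(evicted) = self.buffer.pop_front() {
                    // Defer the release instead of running it here.
                    self.deferred_releases.push(evicted);
                }
            }
        }
    }
    ```
    */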
    fn push_replay_buffer(&self, s: &mut CoreState, node_id: NodeId, new_handle: HandleId) {
        let rec = s.require_node_mut(node_id);
        let cap = match rec.replay_buffer_cap {
            Some(c) if c > 0 => c,
            _ => return,
        };
        self.binding.retain_handle(new_handle);
        rec.replay_buffer.push_back(new_handle);
        let evicted = if rec.replay_buffer.len() > cap {
            rec.replay_buffer.pop_front()
        } else {
            None
        };
        if let Some(h) = evicted {
            with_wave_state(|ws| ws.deferred_handle_releases.push(h));
        }
    }

    // ===================================================================
    // Operator dispatch (Slice C-1, D009).
    //
    // `fire_operator` is the entry point for nodes whose `kind` is
    // `NodeKind::Operator(_)`. It branches on the `OperatorOp` discriminant
    // to per-operator helpers that snapshot inputs under the lock, drop the
    // lock to call the binding's bulk projection FFI, and reacquire to
    // apply emissions via `commit_emission_verbatim` (no per-item equals
    // dedup at the wire — operator output passes verbatim per the same
    // R1.3.2.d / R1.3.3.c rule that governs `FnResult::Batch`).
    //
    // **Refcount discipline:** inputs sourced from `dep_records[i].data_batch`
    // share retains owned by the wave's data-batch slot (released at
    // wave-end rotation in `clear_wave_state`). Operators that emit those
    // handles unchanged (`Filter`, `DistinctUntilChanged`, `Pairwise`'s
    // `prev` carry-over) take an additional retain via `retain_handle`
    // before passing to `commit_emission_verbatim` — the cache slot owns
    // its own share, independent of the data-batch slot's. Operators that
    // produce fresh handles (`Map` / `Scan` / `Reduce` / `Pairwise`'s
    // packed tuples) receive retains pre-bumped by the binding's bulk-
    // projection method.
    // ===================================================================

    /// Operator dispatch entry. Pre-checks (terminal short-circuit, first-
    /// run gate accounting for `partial`, terminal-aware fire for `Reduce`)
    /// happen here; per-operator behavior lives in the `fire_op_*` helpers.
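    /**
    A sketch of the first-run gate as a pure predicate. `DepSketch` and its
    fields are hypothetical. The sketch also assumes that
    `has_sentinel_deps` means at least one dep has not yet delivered a
    real handle. A terminal dep opens the gate even when no DATA ever
    arrived. That is what lets a `Reduce`-style operator emit its seed.

    ```rust
    // Hypothetical per-dep state.
    struct DepSketch {
        has_value: bool, // delivered at least one real handle
        terminal: bool,  // dep has completed or errored
    }

    fn should_fire(partial: bool, node_terminal: bool, deps: &[DepSketch]) -> bool {
        if node_terminal {
            return false; // terminal nodes never fire
        }
        let has_sentinel_deps = deps.iter().any(|d| !d.has_value);
        let has_real_input = !has_sentinel_deps || deps.iter().any(|d| d.terminal);
        partial || has_real_input
    }
    ```
    */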
    fn fire_operator(&self, node_id: NodeId, op: OperatorOp) {
        // Phase 1 (lock-held): remove from pending_fires, evaluate skip.
        // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_fires lives on
        // per-thread WaveState; state lock + WaveState borrow are
        // independent.
        let proceed = {
            let s = self.lock_state();
            with_wave_state(|ws| {
                ws.pending_fires.remove(&node_id);
            });
            let rec = s.require_node(node_id);
            if rec.terminal.is_some() {
                false
            } else {
                // First-run gate (R2.5.3 / R5.4). Partial-mode operators
                // (D011) opt out of the gate; otherwise we wait for every
                // dep to have delivered at least one real handle. A dep
                // terminal also counts as "real input" for every operator;
                // this exists so terminal-aware operators (currently
                // `Reduce`) can fire on upstream COMPLETE-without-DATA and
                // emit the seed.
                let has_real_input = !rec.has_sentinel_deps()
                    || rec.dep_records.iter().any(|dr| dr.terminal.is_some());
                rec.partial || has_real_input
            }
        };
        if !proceed {
            return;
        }

        // A6 (Slice F, 2026-05-07): track operator fire on the
        // `currently_firing` stack so a binding-side project/predicate/fold
        // FFI callback that re-enters `Core::set_deps(node_id, ...)` is
        // rejected with `SetDepsError::ReentrantOnFiringNode`. Drop pops
        // the stack on panic too.
        let _firing = FiringGuard::new(self, node_id);

        match op {
            OperatorOp::Map { fn_id } => self.fire_op_map(node_id, fn_id),
            OperatorOp::Filter { fn_id } => self.fire_op_filter(node_id, fn_id),
            OperatorOp::Scan { fn_id, .. } => self.fire_op_scan(node_id, fn_id),
            OperatorOp::Reduce { fn_id, .. } => self.fire_op_reduce(node_id, fn_id),
            OperatorOp::DistinctUntilChanged { equals_fn_id } => {
                self.fire_op_distinct(node_id, equals_fn_id);
            }
            OperatorOp::Pairwise { fn_id } => self.fire_op_pairwise(node_id, fn_id),
            OperatorOp::Combine { pack_fn } => self.fire_op_combine(node_id, pack_fn),
            OperatorOp::WithLatestFrom { pack_fn } => {
                self.fire_op_with_latest_from(node_id, pack_fn);
            }
            OperatorOp::Merge => self.fire_op_merge(node_id),
            OperatorOp::Take { count } => self.fire_op_take(node_id, count),
            OperatorOp::Skip { count } => self.fire_op_skip(node_id, count),
            OperatorOp::TakeWhile { fn_id } => self.fire_op_take_while(node_id, fn_id),
            // The variant carries `default` for `register_operator`'s
            // `make_op_scratch` path; once registered, the live default
            // is read from `LastState::default` inside `fire_op_last`.
            OperatorOp::Last { .. } => self.fire_op_last(node_id),
        }
    }

    /// Snapshot the operator's single dep batch (transform constraint —
    /// R5.7 single-dep). Returns `(inputs, terminal)` where `inputs` is a
    /// fresh `Vec<HandleId>` (no retains) and `terminal` reflects
    /// `dep_records[0].terminal` at snapshot time.
    fn snapshot_op_dep0(&self, node_id: NodeId) -> (Vec<HandleId>, Option<TerminalKind>) {
        let s = self.lock_state();
        let rec = s.require_node(node_id);
        debug_assert!(
            !rec.dep_records.is_empty(),
            "transform operator must have ≥1 dep"
        );
        let dr = &rec.dep_records[0];
        (dr.data_batch.iter().copied().collect(), dr.terminal)
    }

    /// Emit DIRTY (if not already dirty) followed by RESOLVED. Used by
    /// silent-drop operators (Filter / DistinctUntilChanged / Pairwise)
    /// when a wave's inputs all suppress and the operator needs to settle
    /// the wave for its subscribers (D018 — let DIRTY ride; queue RESOLVED
    /// on full-reject).
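    /**
    A sketch of the settle rule, using a hypothetical `Msg` type. It
    follows this file's comments in treating DATA and RESOLVED as tier-3
    messages; DIRTY's tier value here is an assumed placeholder. The
    terminal check and pause-buffer routing are omitted. RESOLVED is
    appended only when the wave has no tier-3 message yet, which keeps
    R1.3.3.a intact.

    ```rust
    #[derive(Debug, Clone, Copy, PartialEq)]
    enum Msg {
        Dirty,
        Data(u64),
        Resolved,
    }

    impl Msg {
        fn tier(self) -> u8 {
            match self {
                Msg::Dirty => 1, // assumed placeholder tier
                Msg::Data(_) | Msg::Resolved => 3,
            }
        }
    }

    fn settle(dirty: &mut bool, pending: &mut Vec<Msg>) {
        if !*dirty {
            *dirty = true;
            pending.push(Msg::Dirty);
        }
        // A wave already carrying DATA or RESOLVED must not gain a RESOLVED.
        if !pending.iter().any(|m| m.tier() == 3) {
            pending.push(Msg::Resolved);
        }
    }
    ```
    */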
    fn settle_dirty_resolved(&self, node_id: NodeId) {
        let mut s = self.lock_state();
        if s.require_node(node_id).terminal.is_some() {
            return;
        }
        let already_dirty = s.require_node(node_id).dirty;
        s.require_node_mut(node_id).dirty = true;
        if !already_dirty {
            self.queue_notify(&mut s, node_id, Message::Dirty);
        }
        // Slice G: skip Resolved if pending_notify already has a tier-3
        // message — adding Resolved would violate R1.3.3.a.
        // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_notify lives
        // on per-thread WaveState; borrow scoped so queue_notify can
        // re-borrow.
        let already_tier3 = with_wave_state(|ws| {
            ws.pending_notify
                .get(&node_id)
                .is_some_and(|entry| entry.iter_messages().any(|m| m.tier() == 3))
        });
        if !already_tier3 {
            self.queue_notify(&mut s, node_id, Message::Resolved);
        }
    }

    /// `OperatorOp::Map` dispatch.
1806    fn fire_op_map(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
1807        let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
1808        // Mark fired regardless of input count (activation gate already
1809        // satisfied or partial-mode).
1810        {
1811            let mut s = self.lock_state();
1812            s.require_node_mut(node_id).has_fired_once = true;
1813        }
1814        if inputs.is_empty() {
1815            return;
1816        }
1817        // Phase 2 (lock-released): bulk project. Binding returns one
1818        // handle per input, each with a retain share already taken.
1819        let outputs = self.binding.project_each(fn_id, &inputs);
1820        // Phase 3: emit each output. `commit_emission_verbatim` consumes
1821        // the retain into the cache slot (and releases the prior cache
1822        // handle internally).
1823        for h in outputs {
1824            self.commit_emission_verbatim(node_id, h);
1825        }
1826    }
1827
1828    /// `OperatorOp::Filter` dispatch (D012/D018).
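    ///
    /// A standalone sketch of the per-wave outcome. It assumes `i32`
    /// values in place of handles, a closure in place of
    /// `BindingBoundary::predicate_each`, and a hypothetical `Wave` enum
    /// naming the three possible results.
    ///
    /// ```rust
    /// #[derive(Debug, PartialEq)]
    /// enum Wave {
    ///     Skipped,
    ///     Emitted(Vec<i32>),
    ///     Settled,
    /// }
    ///
    /// // Hypothetical model of `fire_op_filter`'s control flow.
    /// fn filter_wave(inputs: &[i32], pred: impl Fn(i32) -> bool) -> Wave {
    ///     if inputs.is_empty() {
    ///         return Wave::Skipped; // early return: no emit, no settle
    ///     }
    ///     let passed: Vec<i32> = inputs.iter().copied().filter(|&x| pred(x)).collect();
    ///     if passed.is_empty() {
    ///         Wave::Settled // D018 full-reject: DIRTY+RESOLVED
    ///     } else {
    ///         Wave::Emitted(passed) // forwarded in input order
    ///     }
    /// }
    /// ```
    ///
    /// With an "is even" predicate, `[1, 2, 3, 4]` yields
    /// `Emitted(vec![2, 4])` and `[1, 3]` yields `Settled`.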
    fn fire_op_filter(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
        let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
        {
            let mut s = self.lock_state();
            s.require_node_mut(node_id).has_fired_once = true;
        }
        if inputs.is_empty() {
            return;
        }
        // Phase 2: predicate per input.
        let pass = self.binding.predicate_each(fn_id, &inputs);
        debug_assert!(
            pass.len() == inputs.len(),
            "predicate_each returned {} bools for {} inputs",
            pass.len(),
            inputs.len()
        );
        // Phase 3: emit passing items verbatim. Take a fresh retain for
        // each — the data_batch slot still owns its retain (released at
        // wave-end rotation), and the cache slot needs its own.
        let mut emitted = 0usize;
        for (i, &h) in inputs.iter().enumerate() {
            if pass.get(i).copied().unwrap_or(false) {
                self.binding.retain_handle(h);
                self.commit_emission_verbatim(node_id, h);
                emitted += 1;
            }
        }
        // D018: full-reject settles with DIRTY+RESOLVED.
        if emitted == 0 {
            self.settle_dirty_resolved(node_id);
        }
    }

    /// `OperatorOp::Scan` dispatch — left-fold emitting each new acc.
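    ///
    /// Scan's value semantics in isolation. The sketch assumes `i64`
    /// values in place of handles and a closure in place of `fold_each`.
    /// Every intermediate accumulator is emitted, and only the last one is
    /// stored back into the slot for the next wave.
    ///
    /// ```rust
    /// // Hypothetical model: returns (emitted accs, new slot acc).
    /// fn scan_wave(acc: i64, inputs: &[i64], f: impl Fn(i64, i64) -> i64) -> (Vec<i64>, i64) {
    ///     let mut cur = acc;
    ///     let emitted: Vec<i64> = inputs
    ///         .iter()
    ///         .map(|&x| {
    ///             cur = f(cur, x); // fold one input
    ///             cur // and emit the new acc
    ///         })
    ///         .collect();
    ///     (emitted, cur) // `cur` is what ScanState.acc keeps
    /// }
    /// ```
    ///
    /// Folding `[1, 2, 3]` with `+` from a seed of `0` emits `[1, 3, 6]`
    /// and leaves `6` in the slot, so a following wave `[4]` emits `[10]`.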
    fn fire_op_scan(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
        use crate::op_state::ScanState;
        let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
        let acc = {
            let s = self.lock_state();
            scratch_ref::<ScanState>(&s, node_id).acc
        };
        {
            let mut s = self.lock_state();
            s.require_node_mut(node_id).has_fired_once = true;
        }
        if inputs.is_empty() {
            return;
        }
        // Phase 2: fold each input through. Returns N new handles, each
        // with a fresh retain.
        let new_states = self.binding.fold_each(fn_id, acc, &inputs);
        debug_assert!(
            new_states.len() == inputs.len(),
            "fold_each returned {} accs for {} inputs",
            new_states.len(),
            inputs.len()
        );
        // Phase 3a: update ScanState.acc to the LAST new acc. Take an
        // extra retain for the slot; release the prior acc's slot retain.
        let last_acc = new_states.last().copied();
        if let Some(last) = last_acc {
            let prev_acc = {
                let mut s = self.lock_state();
                let scratch = scratch_mut::<ScanState>(&mut s, node_id);
                let prev = scratch.acc;
                scratch.acc = last;
                prev
            };
            // Take the slot's retain on the new acc.
            self.binding.retain_handle(last);
            // Release the prior slot's retain (post-lock to keep binding
            // free to re-enter Core safely).
            if prev_acc != crate::handle::NO_HANDLE {
                self.binding.release_handle(prev_acc);
            }
        }
        // Phase 3b: emit each intermediate acc verbatim. `new_states`
        // entries each carry one retain from `fold_each`; that retain is
        // consumed by `commit_emission_verbatim` into the cache slot.
        for h in new_states {
            self.commit_emission_verbatim(node_id, h);
        }
    }

    /// `OperatorOp::Reduce` dispatch — accumulates silently; emits acc on
    /// upstream COMPLETE (cascades ERROR verbatim).
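    ///
    /// A value-level sketch of that lifecycle. It assumes `i64` values in
    /// place of handles, summation as the fold, and hypothetical `Term` /
    /// `Out` enums in place of `TerminalKind` and the wire messages. The
    /// model always has a seed, so it skips the `NO_HANDLE` seed case.
    /// Non-terminal waves fold silently. COMPLETE emits the accumulator and
    /// then completes. ERROR is forwarded without a DATA.
    ///
    /// ```rust
    /// #[derive(Debug, Clone, Copy, PartialEq)]
    /// enum Term {
    ///     Complete,
    ///     Error(u32),
    /// }
    ///
    /// #[derive(Debug, PartialEq)]
    /// enum Out {
    ///     Data(i64),
    ///     Complete,
    ///     Error(u32),
    /// }
    ///
    /// struct Reduce {
    ///     acc: i64,
    /// }
    ///
    /// impl Reduce {
    ///     // Hypothetical model of `fire_op_reduce` with `+` as the fold.
    ///     fn fire(&mut self, inputs: &[i64], terminal: Option<Term>) -> Vec<Out> {
    ///         for &x in inputs {
    ///             self.acc += x; // silent: no per-input emit
    ///         }
    ///         match terminal {
    ///             None => Vec::new(),
    ///             Some(Term::Complete) => vec![Out::Data(self.acc), Out::Complete],
    ///             Some(Term::Error(e)) => vec![Out::Error(e)],
    ///         }
    ///     }
    /// }
    /// ```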
    fn fire_op_reduce(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
        use crate::op_state::ReduceState;
        let (inputs, terminal) = self.snapshot_op_dep0(node_id);
        let acc = {
            let s = self.lock_state();
            scratch_ref::<ReduceState>(&s, node_id).acc
        };
        {
            let mut s = self.lock_state();
            s.require_node_mut(node_id).has_fired_once = true;
        }
        // Phase 2: accumulate (silent — no per-input emit).
        let new_states = if inputs.is_empty() {
            SmallVec::<[HandleId; 1]>::new()
        } else {
            self.binding.fold_each(fn_id, acc, &inputs)
        };
        debug_assert!(
            new_states.len() == inputs.len(),
            "fold_each returned {} accs for {} inputs",
            new_states.len(),
            inputs.len()
        );
        // Update ReduceState.acc to last new acc; release intermediate
        // states (we don't emit them) and the prior acc's slot retain.
        let last_acc = new_states.last().copied();
        let intermediates_to_release: Vec<HandleId> = if new_states.len() > 1 {
            new_states[..new_states.len() - 1].to_vec()
        } else {
            Vec::new()
        };
        let prev_acc_to_release = if let Some(last) = last_acc {
            let prev_acc = {
                let mut s = self.lock_state();
                let scratch = scratch_mut::<ReduceState>(&mut s, node_id);
                let prev = scratch.acc;
                scratch.acc = last;
                prev
            };
            // The retain `fold_each` took on `last` becomes the slot's
            // share. Unlike Scan, Reduce does not emit `last` here, so
            // nothing else consumes that retain. Taking another one would
            // leak a share on every wave that delivers DATA.
            if prev_acc == crate::handle::NO_HANDLE {
                None
            } else {
                Some(prev_acc)
            }
        } else {
            None
        };
        // Release intermediate fold results (Reduce only emits the LAST,
        // and only on terminal). Each was retained by `fold_each`.
        for h in intermediates_to_release {
            self.binding.release_handle(h);
        }
        if let Some(h) = prev_acc_to_release {
            self.binding.release_handle(h);
        }

        // Phase 3: emit on terminal.
        match terminal {
            None => {
                // Still accumulating; nothing to emit this wave (silent
                // accumulation). The upstream's commit queues its own
                // DIRTY, but Reduce itself queues no tier-3 message
                // because R5 absorbs silently. pending_notify therefore has
                // no entry for Reduce, so the v1 post-drain auto-resolve
                // sweep is a no-op for it.
            }
            Some(TerminalKind::Complete) => {
                // Read the live acc (may be the seed if no DATA arrived)
                // and emit Data(acc), when there is one, then Complete.
                let final_acc = {
                    let s = self.lock_state();
                    scratch_ref::<ReduceState>(&s, node_id).acc
                };
                if final_acc != crate::handle::NO_HANDLE {
                    // Emission needs its own retain (slot's retain is
                    // owned by ReduceState.acc until reset/Drop).
                    self.binding.retain_handle(final_acc);
                    self.commit_emission_verbatim(node_id, final_acc);
                }
                self.complete(node_id);
            }
            Some(TerminalKind::Error(h)) => {
                // Core::error transfers the caller's share into the
                // cascade (node.terminal + per-child dep_terminal slots);
                // no release at the error() boundary. Take a fresh share
                // here so the cascade owns it independently of the
                // dep_records[0].terminal slot's share.
                self.binding.retain_handle(h);
                self.error(node_id, h);
            }
        }
    }

    /// `OperatorOp::DistinctUntilChanged` dispatch. Emits each input that
    /// differs from the last emitted value, compared via `equals_fn_id`
    /// (identical handles short-circuit to equal). `prev` persists across
    /// waves in `DistinctState`. A non-empty batch with nothing emitted
    /// settles with DIRTY+RESOLVED.
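    ///
    /// The retain/release bookkeeping here is easy to get wrong, so the
    /// following is a standalone ledger model of it. It assumes a
    /// hypothetical `Ledger` that counts shares per `u32` handle (with `0`
    /// as `NO_HANDLE`) and treats equal ids as equal values. The real code
    /// calls `BindingBoundary::retain_handle` / `release_handle` and
    /// `custom_equals`. Emission shares are left out because each one is
    /// taken and consumed separately. After every wave, the only
    /// outstanding share should be the slot's share on the final `prev`,
    /// including when `prev` returns to the handle it started on.
    ///
    /// ```rust
    /// use std::collections::HashMap;
    ///
    /// #[derive(Default)]
    /// struct Ledger(HashMap<u32, i32>);
    ///
    /// impl Ledger {
    ///     fn retain(&mut self, h: u32) {
    ///         *self.0.entry(h).or_insert(0) += 1;
    ///     }
    ///     fn release(&mut self, h: u32) {
    ///         *self.0.entry(h).or_insert(0) -= 1;
    ///     }
    ///     fn count(&self, h: u32) -> i32 {
    ///         self.0.get(&h).copied().unwrap_or(0)
    ///     }
    /// }
    ///
    /// const NO_HANDLE: u32 = 0;
    ///
    /// // Hypothetical model of one wave; returns the emitted handles.
    /// fn distinct_wave(ledger: &mut Ledger, slot: &mut u32, inputs: &[u32]) -> Vec<u32> {
    ///     let mut prev = *slot;
    ///     if prev != NO_HANDLE {
    ///         ledger.retain(prev); // working copy, separate from the slot's share
    ///     }
    ///     let mut out = Vec::new();
    ///     for &h in inputs {
    ///         if h == prev {
    ///             continue; // equal to prev: suppressed
    ///         }
    ///         out.push(h);
    ///         ledger.retain(h); // new working copy
    ///         if prev != NO_HANDLE {
    ///             ledger.release(prev); // drop the old working copy
    ///         }
    ///         prev = h;
    ///     }
    ///     // Move the working copy into the slot; release the slot's old share.
    ///     let stale = std::mem::replace(slot, prev);
    ///     if stale != NO_HANDLE {
    ///         ledger.release(stale);
    ///     }
    ///     out
    /// }
    /// ```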
    fn fire_op_distinct(&self, node_id: NodeId, equals_fn_id: crate::handle::FnId) {
        use crate::op_state::DistinctState;
        let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
        let mut prev = {
            let s = self.lock_state();
            scratch_ref::<DistinctState>(&s, node_id).prev
        };
        {
            let mut s = self.lock_state();
            s.require_node_mut(node_id).has_fired_once = true;
        }
        if inputs.is_empty() {
            return;
        }
        // Take a working-copy retain on the initial prev. The loop releases
        // the working copy each time prev advances, while phase 3 releases
        // the slot's own share, so each release has a share to consume.
        // Without this, the loop's release of old_prev (== original
        // DistinctState.prev) would double-release against phase 3's
        // stale_slot release.
        if prev != crate::handle::NO_HANDLE {
            self.binding.retain_handle(prev);
        }
        // Phase 2: per-input equals(prev, current). Each non-equal input
        // is emitted and becomes the new prev. Equals fn_id reuses
        // `BindingBoundary::custom_equals`.
        let mut emitted = 0usize;
        for &h in &inputs {
            let equal = if prev == crate::handle::NO_HANDLE {
                false
            } else if prev == h {
                true
            } else {
                self.binding.custom_equals(equals_fn_id, prev, h)
            };
            if !equal {
                // Emit this input verbatim.
                self.binding.retain_handle(h);
                self.commit_emission_verbatim(node_id, h);
                // Update prev: take retain on new prev, release old
                // (working-copy retain from above or from prior iteration).
                self.binding.retain_handle(h);
                let old_prev = prev;
                prev = h;
                if old_prev != crate::handle::NO_HANDLE {
                    self.binding.release_handle(old_prev);
                }
                emitted += 1;
            }
        }
        // Phase 3: move the working-copy share on the final prev into the
        // DistinctState.prev slot and release the slot's original share
        // (stale_slot). The release is unconditional. Even when prev never
        // advanced, or advanced back to the handle it started on (possible
        // when handle ids repeat), the slot must end up holding exactly
        // one share. Gating the release on `stale_slot != prev` leaks a
        // share in the second case.
        {
            let mut s = self.lock_state();
            let scratch = scratch_mut::<DistinctState>(&mut s, node_id);
            let stale_slot = scratch.prev;
            scratch.prev = prev;
            drop(s);
            if stale_slot != crate::handle::NO_HANDLE {
                self.binding.release_handle(stale_slot);
            }
        }
        if emitted == 0 {
            self.settle_dirty_resolved(node_id);
        }
    }

    /// `OperatorOp::Pairwise` dispatch — emits `(prev, current)` tuples
    /// starting with the second value (the first-ever input is swallowed
    /// and only sets `prev`).
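    ///
    /// A value-level sketch across waves. It assumes `u32` values in place
    /// of handles, plain tuples in place of `pairwise_pack`, and `Option`
    /// in place of the `NO_HANDLE` sentinel. Because `prev` persists
    /// between waves, a pair can straddle a wave boundary.
    ///
    /// ```rust
    /// // Hypothetical model of `fire_op_pairwise`'s value semantics.
    /// fn pairwise_wave(slot: &mut Option<u32>, inputs: &[u32]) -> Vec<(u32, u32)> {
    ///     let mut out = Vec::new();
    ///     for &h in inputs {
    ///         if let Some(p) = *slot {
    ///             out.push((p, h)); // pair with the persisted prev
    ///         }
    ///         *slot = Some(h); // the first-ever value only sets prev
    ///     }
    ///     out
    /// }
    /// ```
    ///
    /// A first wave `[1]` emits nothing. A second wave `[2, 3]` emits
    /// `[(1, 2), (2, 3)]` and leaves `3` as `prev`.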
    fn fire_op_pairwise(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
        use crate::op_state::PairwiseState;
        let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
        let mut prev = {
            let s = self.lock_state();
            scratch_ref::<PairwiseState>(&s, node_id).prev
        };
        {
            let mut s = self.lock_state();
            s.require_node_mut(node_id).has_fired_once = true;
        }
        if inputs.is_empty() {
            return;
        }
        // Take a working-copy retain on the initial prev (the same
        // discipline as `fire_op_distinct`). The loop releases the working
        // copy when prev advances; phase 3 releases the slot's own share.
        // Without it, on any wave that starts with a non-empty prev, the
        // loop's first release of old_prev (== PairwiseState.prev) and
        // phase 3's stale_slot release would both consume the slot's
        // single share.
        if prev != crate::handle::NO_HANDLE {
            self.binding.retain_handle(prev);
        }
        let mut emitted = 0usize;
        for &h in &inputs {
            if prev == crate::handle::NO_HANDLE {
                // First-ever value — swallow, set prev. This retain is the
                // working copy that phase 3 moves into PairwiseState.prev.
                self.binding.retain_handle(h);
                prev = h;
                continue;
            }
            // Pack (prev, current) into a tuple handle. Binding returns a
            // fresh retain on the packed handle.
            let packed = self.binding.pairwise_pack(fn_id, prev, h);
            self.commit_emission_verbatim(node_id, packed);
            // Advance prev: take retain on h, release old prev.
            self.binding.retain_handle(h);
            let old_prev = prev;
            prev = h;
            self.binding.release_handle(old_prev);
            emitted += 1;
        }
        // Phase 3: move the working copy into the PairwiseState.prev slot
        // and release the slot's original share. The release is
        // unconditional: the slot's share is independent of the working
        // copy even when the final prev is the same handle id.
        {
            let mut s = self.lock_state();
            let scratch = scratch_mut::<PairwiseState>(&mut s, node_id);
            let stale_slot = scratch.prev;
            scratch.prev = prev;
            drop(s);
            if stale_slot != crate::handle::NO_HANDLE {
                self.binding.release_handle(stale_slot);
            }
        }
        if emitted == 0 {
            self.settle_dirty_resolved(node_id);
        }
    }

    // =================================================================
    // Slice C-2: multi-dep combinator operators (D020)
    // =================================================================

    /// Snapshot all deps' "latest" handle for multi-dep combinators.
    /// For each dep: returns `data_batch.last()` if non-empty (dep fired
    /// this wave), else `prev_data` (last handle from previous wave).
    /// Also returns whether dep[0] (primary) had DATA this wave —
    /// needed by `fire_op_with_latest_from`.
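    ///
    /// A sketch of the per-dep fallback. It assumes a hypothetical `Dep`
    /// struct holding only the two fields the snapshot reads, with `u32`
    /// handles and `0` as `NO_HANDLE`.
    ///
    /// ```rust
    /// struct Dep {
    ///     data_batch: Vec<u32>,
    ///     prev_data: u32,
    /// }
    ///
    /// // Hypothetical mirror of `snapshot_op_all_latest`.
    /// fn snapshot(deps: &[Dep]) -> (Vec<u32>, bool) {
    ///     let primary_fired = deps.first().is_some_and(|d| !d.data_batch.is_empty());
    ///     let latest: Vec<u32> = deps
    ///         .iter()
    ///         // This wave's last DATA if the dep fired, else its previous value.
    ///         .map(|d| d.data_batch.last().copied().unwrap_or(d.prev_data))
    ///         .collect();
    ///     (latest, primary_fired)
    /// }
    /// ```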
    fn snapshot_op_all_latest(&self, node_id: NodeId) -> (SmallVec<[HandleId; 4]>, bool) {
        let s = self.lock_state();
        let rec = s.require_node(node_id);
        let primary_fired = rec
            .dep_records
            .first()
            .is_some_and(|dr| !dr.data_batch.is_empty());
        let latest: SmallVec<[HandleId; 4]> = rec
            .dep_records
            .iter()
            .map(|dr| dr.data_batch.last().copied().unwrap_or(dr.prev_data))
            .collect();
        (latest, primary_fired)
    }

    /// `OperatorOp::Combine` dispatch — N-dep combineLatest. Packs the
    /// latest handle per dep into a tuple via `pack_tuple`, emits on
    /// any dep fire. First-run gate (R2.5.3, partial: false) guarantees
    /// all deps have a real handle on first fire. Post-warmup INVALIDATE
    /// guard: if any dep's prev_data was cleared, settles with
    /// DIRTY+RESOLVED instead of packing a NO_HANDLE into the tuple.
    fn fire_op_combine(&self, node_id: NodeId, pack_fn: crate::handle::FnId) {
        let (latest, _primary_fired) = self.snapshot_op_all_latest(node_id);
        {
            let mut s = self.lock_state();
            s.require_node_mut(node_id).has_fired_once = true;
        }
        // Post-warmup INVALIDATE guard: a dep may have been invalidated
        // (prev_data cleared to NO_HANDLE) and not yet re-delivered.
        if latest.contains(&crate::handle::NO_HANDLE) {
            self.settle_dirty_resolved(node_id);
            return;
        }
        let tuple_handle = self.binding.pack_tuple(pack_fn, &latest);
        self.commit_emission_verbatim(node_id, tuple_handle);
    }

    /// `OperatorOp::WithLatestFrom` dispatch — 2-dep, fire-on-primary-only
    /// (D021 / Phase 10.5). Emits a `[primary, secondary]` pair on the
    /// first (gate-releasing) fire, and afterwards only when dep[0]
    /// (primary) has DATA in the wave. A wave where only dep[1] fires
    /// settles with DIRTY+RESOLVED. Post-warmup INVALIDATE guard: if the
    /// secondary's latest is `NO_HANDLE` (INVALIDATE cleared it), the wave
    /// also settles instead of emitting.
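    ///
    /// The emit/settle decision as a pure function. It assumes `u32`
    /// handles with `0` as `NO_HANDLE` and a hypothetical `Decision` enum
    /// standing in for the `pack_tuple` + `commit_emission_verbatim` path
    /// and the `settle_dirty_resolved` path.
    ///
    /// ```rust
    /// #[derive(Debug, PartialEq)]
    /// enum Decision {
    ///     Emit(u32, u32),
    ///     Settle,
    /// }
    ///
    /// const NO_HANDLE: u32 = 0;
    ///
    /// // Hypothetical model of `fire_op_with_latest_from`'s branching.
    /// fn decide(latest: [u32; 2], primary_fired: bool, first_fire: bool) -> Decision {
    ///     if !first_fire && !primary_fired {
    ///         return Decision::Settle; // secondary-only update
    ///     }
    ///     if latest[1] == NO_HANDLE {
    ///         return Decision::Settle; // secondary invalidated
    ///     }
    ///     Decision::Emit(latest[0], latest[1])
    /// }
    /// ```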
    fn fire_op_with_latest_from(&self, node_id: NodeId, pack_fn: crate::handle::FnId) {
        let (latest, primary_fired) = self.snapshot_op_all_latest(node_id);
        let first_fire = {
            let mut s = self.lock_state();
            let rec = s.require_node_mut(node_id);
            let was_first = !rec.has_fired_once;
            rec.has_fired_once = true;
            was_first
        };
        // On first fire (gate release), always emit — the first-run gate
        // guarantees both deps have values (via prev_data fallback in
        // snapshot). On subsequent fires, only emit when primary fires.
        if !first_fire && !primary_fired {
            // Secondary-only update — no downstream DATA.
            self.settle_dirty_resolved(node_id);
            return;
        }
        // Post-warmup INVALIDATE guard: secondary may have been invalidated
        // (prev_data cleared to NO_HANDLE) and not yet re-delivered.
        debug_assert!(latest.len() == 2, "withLatestFrom requires exactly 2 deps");
        if latest[1] == crate::handle::NO_HANDLE {
            self.settle_dirty_resolved(node_id);
            return;
        }
        let tuple_handle = self.binding.pack_tuple(pack_fn, &latest);
        self.commit_emission_verbatim(node_id, tuple_handle);
    }

    /// `OperatorOp::Merge` dispatch — N-dep, forward all DATA handles
    /// verbatim (D022). No fn is invoked on fire (no transformation):
    /// each dep's batch handles are collected in dep order, retained, and
    /// emitted individually.
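    ///
    /// A sketch of the forwarding order, with each dep's batch modeled as a
    /// hypothetical `Vec<u32>` and `None` standing for the DIRTY+RESOLVED
    /// settle. Handles come out dep by dep, with batch order preserved
    /// inside each dep.
    ///
    /// ```rust
    /// // Hypothetical model of `fire_op_merge`.
    /// fn merge_wave(batches: &[Vec<u32>]) -> Option<Vec<u32>> {
    ///     let all: Vec<u32> = batches.iter().flat_map(|b| b.iter().copied()).collect();
    ///     if all.is_empty() {
    ///         None // every dep settled without DATA
    ///     } else {
    ///         Some(all)
    ///     }
    /// }
    /// ```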
    fn fire_op_merge(&self, node_id: NodeId) {
        // Collect all batch handles from all deps (flat).
        let all_handles: Vec<HandleId> = {
            let s = self.lock_state();
            let rec = s.require_node(node_id);
            rec.dep_records
                .iter()
                .flat_map(|dr| dr.data_batch.iter().copied())
                .collect()
        };
        {
            let mut s = self.lock_state();
            s.require_node_mut(node_id).has_fired_once = true;
        }
        if all_handles.is_empty() {
            // All deps settled RESOLVED this wave — no DATA to forward.
            self.settle_dirty_resolved(node_id);
            return;
        }
        // Emit each handle verbatim. Take a fresh retain per handle
        // (independent of the dep batch's retain which gets released at
        // wave-end). Matches Filter's discipline for passing inputs.
        for &h in &all_handles {
            self.binding.retain_handle(h);
            self.commit_emission_verbatim(node_id, h);
        }
    }

    // =================================================================
    // Slice C-3: flow operators (D024)
    // =================================================================

    /// `OperatorOp::Take` dispatch — emits the first `count` DATA values
    /// then self-completes via `Core::complete`. When `count == 0`, the
    /// first fire emits zero items then immediately self-completes
    /// (D027). Cross-wave counter lives in
    /// [`TakeState::count_emitted`](super::op_state::TakeState::count_emitted).
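    ///
    /// A standalone model of the cross-wave counter, for illustration
    /// only. It assumes `u32` values in place of handles and a hypothetical
    /// `TakeOut` struct standing in for the emissions plus the
    /// `Core::complete` call. Upstream terminals and the empty-wave settle
    /// are not modeled.
    ///
    /// ```rust
    /// #[derive(Debug, PartialEq)]
    /// struct TakeOut {
    ///     emitted: Vec<u32>,
    ///     completed: bool,
    /// }
    ///
    /// // Hypothetical model of `fire_op_take`.
    /// fn take_wave(count_emitted: &mut u32, count: u32, inputs: &[u32]) -> TakeOut {
    ///     let mut emitted = Vec::new();
    ///     if *count_emitted >= count {
    ///         // Already at quota (covers count == 0): complete immediately.
    ///         return TakeOut { emitted, completed: true };
    ///     }
    ///     for &h in inputs {
    ///         emitted.push(h);
    ///         *count_emitted += 1;
    ///         if *count_emitted >= count {
    ///             break; // rest of the batch is dropped
    ///         }
    ///     }
    ///     TakeOut { completed: *count_emitted >= count, emitted }
    /// }
    /// ```
    ///
    /// With `count == 3`, a wave `[10, 20]` emits both without completing,
    /// and a following wave `[30, 40]` emits only `30` and completes.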
    fn fire_op_take(&self, node_id: NodeId, count: u32) {
        use crate::op_state::TakeState;
        let (inputs, terminal) = self.snapshot_op_dep0(node_id);
        // Snapshot current counter; mark fired regardless of input count
        // (activation gate already satisfied or partial-mode).
        let mut count_emitted = {
            let s = self.lock_state();
            scratch_ref::<TakeState>(&s, node_id).count_emitted
        };
        {
            let mut s = self.lock_state();
            s.require_node_mut(node_id).has_fired_once = true;
        }
        // Already at quota before any input this wave: self-complete
        // immediately. This covers `count == 0` (first-fire short-circuit)
        // and acts as a defensive guard against re-entry; the terminal-skip
        // in `fire_operator` already prevents a double complete.
        if count_emitted >= count {
            self.complete(node_id);
            return;
        }
        // Per-input emission loop. Each pass takes a fresh retain for the
        // cache slot; data_batch slot's retain is released at wave-end
        // rotation independently.
        for &h in &inputs {
            self.binding.retain_handle(h);
            self.commit_emission_verbatim(node_id, h);
            count_emitted = count_emitted.saturating_add(1);
            if count_emitted >= count {
                break;
            }
        }
        // Persist the updated counter.
        {
            let mut s = self.lock_state();
            scratch_mut::<TakeState>(&mut s, node_id).count_emitted = count_emitted;
        }
        // Self-complete if we hit the quota this wave. Upstream COMPLETE
        // (terminal == Some(Complete)) without us hitting the count
        // propagates via the standard auto-cascade — we don't intercept it.
        if count_emitted >= count {
            self.complete(node_id);
            return;
        }
        // If upstream is already Errored and we haven't hit count, the
        // standard cascade will propagate it. If the wave delivered no
        // inputs (e.g. RESOLVED from upstream), settle DIRTY+RESOLVED so
        // subscribers see the wave close.
        if inputs.is_empty() && terminal.is_none() {
            self.settle_dirty_resolved(node_id);
        }
    }

    /// `OperatorOp::Skip` dispatch — drops the first `count` DATA values,
    /// then forwards the rest. Cross-wave counter lives in
    /// [`SkipState::count_skipped`](super::op_state::SkipState::count_skipped).
    /// On a wave where every input is still in the skip window, settles
    /// DIRTY+RESOLVED (D018 pattern) so subscribers see the wave close.
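    ///
    /// The skip window as a standalone sketch. It assumes `u32` values in
    /// place of handles and `None` standing for the DIRTY+RESOLVED settle.
    /// The counter persists across waves, so the window can span several
    /// waves.
    ///
    /// ```rust
    /// // Hypothetical model of `fire_op_skip`.
    /// fn skip_wave(count_skipped: &mut u32, count: u32, inputs: &[u32]) -> Option<Vec<u32>> {
    ///     let mut out = Vec::new();
    ///     for &h in inputs {
    ///         if *count_skipped < count {
    ///             *count_skipped += 1; // swallowed by the window
    ///             continue;
    ///         }
    ///         out.push(h); // past the window: forwarded verbatim
    ///     }
    ///     if out.is_empty() {
    ///         None
    ///     } else {
    ///         Some(out)
    ///     }
    /// }
    /// ```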
    fn fire_op_skip(&self, node_id: NodeId, count: u32) {
        use crate::op_state::SkipState;
        let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
        let mut count_skipped = {
            let s = self.lock_state();
            scratch_ref::<SkipState>(&s, node_id).count_skipped
        };
        {
            let mut s = self.lock_state();
            s.require_node_mut(node_id).has_fired_once = true;
        }
        // No early-return on empty inputs: the post-loop `emitted == 0`
        // settle handles the empty-inputs case identically to the
        // all-swallowed-by-skip-window case (Slice C-3 /qa P6 — symmetry
        // with `fire_op_take`).
        let mut emitted = 0usize;
        for &h in &inputs {
            if count_skipped < count {
                count_skipped = count_skipped.saturating_add(1);
                // Drop this input — the data_batch slot still owns its
                // retain (released at wave-end rotation). No emission.
                continue;
            }
            // Past the skip window — emit verbatim. Take a fresh retain
            // for the cache slot.
            self.binding.retain_handle(h);
            self.commit_emission_verbatim(node_id, h);
            emitted += 1;
        }
        // Persist the updated counter.
        {
            let mut s = self.lock_state();
            scratch_mut::<SkipState>(&mut s, node_id).count_skipped = count_skipped;
        }
        if emitted == 0 {
            self.settle_dirty_resolved(node_id);
        }
    }

    /// `OperatorOp::TakeWhile` dispatch — emits while the predicate
    /// holds; on the first `false`, emits any preceding passes from the
    /// same batch then self-completes via `Core::complete`. Reuses
    /// [`BindingBoundary::predicate_each`] (D029).
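    ///
    /// The batch-level behavior as a standalone sketch. It assumes `i32`
    /// values in place of handles and a closure in place of
    /// `predicate_each`. Every input before the first `false` is emitted,
    /// the rest of the batch is dropped, and the operator completes.
    ///
    /// ```rust
    /// // Hypothetical model: returns (emitted, completed).
    /// fn take_while_wave(inputs: &[i32], pred: impl Fn(i32) -> bool) -> (Vec<i32>, bool) {
    ///     let emitted: Vec<i32> = inputs.iter().copied().take_while(|&x| pred(x)).collect();
    ///     // Completion happens only if some input failed the predicate.
    ///     let completed = emitted.len() < inputs.len();
    ///     (emitted, completed)
    /// }
    /// ```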
    fn fire_op_take_while(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
        let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
        {
            let mut s = self.lock_state();
            s.require_node_mut(node_id).has_fired_once = true;
        }
        if inputs.is_empty() {
            return;
        }
        // Phase 2: predicate per input.
        let pass = self.binding.predicate_each(fn_id, &inputs);
        debug_assert!(
            pass.len() == inputs.len(),
            "predicate_each returned {} bools for {} inputs",
            pass.len(),
            inputs.len()
        );
        // Phase 3: emit each input until the first false; then
        // self-complete. `fire_operator`'s `terminal.is_some()`
        // short-circuit gates re-entry after the self-complete cascade
        // installs the terminal slot — no extra `done` flag needed.
        let mut emitted = 0usize;
        let mut first_false_seen = false;
        for (i, &h) in inputs.iter().enumerate() {
            if pass.get(i).copied().unwrap_or(false) {
                self.binding.retain_handle(h);
                self.commit_emission_verbatim(node_id, h);
                emitted += 1;
            } else {
                first_false_seen = true;
                break;
            }
        }
        if first_false_seen {
            self.complete(node_id);
            return;
        }
        if emitted == 0 {
            // Defensive only: with no `false` seen, every input passed, so
            // `emitted == 0` would mean an empty batch, which already
            // returned early above.
            self.settle_dirty_resolved(node_id);
        }
    }

    /// `OperatorOp::Last` dispatch — buffers the latest DATA; emits
    /// `Data(latest)` (or `Data(default)` if no DATA arrived and a
    /// default was registered) then `Complete` on upstream COMPLETE.
    /// On upstream ERROR, propagates verbatim. Storage:
    /// [`LastState`](super::op_state::LastState).
    ///
    /// **Silent-buffer semantics (mirrors Reduce):** on a non-terminal
    /// wave (`terminal == None`), `fire_op_last` updates the buffered
    /// `latest` handle but produces NO downstream wire message;
    /// subscribers observe the operator only when upstream
    /// COMPLETE/ERROR triggers the terminal branch. Only the last input
    /// of each batch is buffered. The others are dropped, and their
    /// `data_batch` retains release at wave-end rotation independently.
    /// Skipping per-wave settlement on intermediate waves is the intended
    /// behavior for terminal-aware operators.
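    ///
    /// A value-level sketch of the buffering and terminal branches. It
    /// assumes `Option<u32>` in place of the `NO_HANDLE`-sentinel handles
    /// and hypothetical `Term` / `Out` enums in place of `TerminalKind` and
    /// the wire messages.
    ///
    /// ```rust
    /// #[derive(Debug, PartialEq)]
    /// enum Out {
    ///     Data(u32),
    ///     Complete,
    ///     Error(u32),
    /// }
    ///
    /// #[derive(Debug, Clone, Copy)]
    /// enum Term {
    ///     Complete,
    ///     Error(u32),
    /// }
    ///
    /// struct Last {
    ///     latest: Option<u32>,
    ///     default: Option<u32>,
    /// }
    ///
    /// impl Last {
    ///     // Hypothetical model of `fire_op_last`.
    ///     fn fire(&mut self, inputs: &[u32], terminal: Option<Term>) -> Vec<Out> {
    ///         if let Some(&h) = inputs.last() {
    ///             self.latest = Some(h); // only the batch's last input is kept
    ///         }
    ///         match terminal {
    ///             None => Vec::new(), // silent buffer: no wire message
    ///             Some(Term::Complete) => {
    ///                 // latest, else default, else nothing; then Complete.
    ///                 let mut out: Vec<Out> =
    ///                     self.latest.or(self.default).map(Out::Data).into_iter().collect();
    ///                 out.push(Out::Complete);
    ///                 out
    ///             }
    ///             Some(Term::Error(e)) => vec![Out::Error(e)],
    ///         }
    ///     }
    /// }
    /// ```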
2416    /// COMPLETE/ERROR triggers the terminal branch. Intermediate
2417    /// inputs from the dep's batch are dropped on the floor (their
2418    /// `data_batch` retains release at wave-end rotation
2419    /// independently). Per-wave settlement on intermediate waves is
2420    /// the canonical behavior for terminal-aware operators.
2421    fn fire_op_last(&self, node_id: NodeId) {
2422        use crate::op_state::LastState;
2423        let (inputs, terminal) = self.snapshot_op_dep0(node_id);
2424        {
2425            let mut s = self.lock_state();
2426            s.require_node_mut(node_id).has_fired_once = true;
2427        }
2428
2429        // Phase 2: buffer the latest input handle (if any). Retain new,
2430        // release old. data_batch slot's retain is released at wave-end
2431        // rotation independently — the LastState slot keeps its own
2432        // share so the value survives across waves.
2433        if let Some(&new_latest) = inputs.last() {
2434            let prev_latest = {
2435                let mut s = self.lock_state();
2436                let scratch = scratch_mut::<LastState>(&mut s, node_id);
2437                let prev = scratch.latest;
2438                scratch.latest = new_latest;
2439                prev
2440            };
2441            self.binding.retain_handle(new_latest);
2442            if prev_latest != crate::handle::NO_HANDLE {
2443                self.binding.release_handle(prev_latest);
2444            }
2445        }

        // Phase 3: emit on terminal. Buffer-only fires (no terminal yet)
        // produce no downstream message — Reduce-style silent
        // accumulation. The post-drain auto-resolve sweep is a no-op
        // because pending_notify has no entry for Last.
        match terminal {
            None => {}
            Some(TerminalKind::Complete) => {
                // Read the live latest + default. If latest != NO_HANDLE,
                // emit it. Otherwise, if default != NO_HANDLE, emit default.
                // Otherwise, emit only Complete (empty stream, no default).
                let (latest, default) = {
                    let s = self.lock_state();
                    let scratch = scratch_ref::<LastState>(&s, node_id);
                    (scratch.latest, scratch.default)
                };
                let to_emit = if latest != crate::handle::NO_HANDLE {
                    Some(latest)
                } else if default != crate::handle::NO_HANDLE {
                    Some(default)
                } else {
                    None
                };
                if let Some(h) = to_emit {
                    // Emission needs its own retain — the LastState slot
                    // keeps its share until reset/Drop.
                    self.binding.retain_handle(h);
                    self.commit_emission_verbatim(node_id, h);
                }
                self.complete(node_id);
            }
            Some(TerminalKind::Error(h)) => {
                // Take a fresh share for the error cascade — the
                // dep_records[0].terminal slot keeps its own share
                // (released by reset_for_fresh_lifecycle / Drop).
                self.binding.retain_handle(h);
                self.error(node_id, h);
            }
        }
    }

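    /// Single-edge propagation: records one DATA `handle` in the consumer's
    /// `data_batch` for `dep_idx` and decides whether the consumer needs an
    /// fn-fire this wave.
    ///
    /// The fire decision is sketched below as a standalone model. It is
    /// illustrative only, under stated assumptions: `Kind`, `FireDecision`,
    /// and `decide` are hypothetical names, and the boolean parameters stand
    /// in for the `NodeRecord` fields this function reads.
    ///
    /// ```rust
    /// // Hypothetical model of the fire decision; not Core's real types.
    /// #[derive(Clone, Copy)]
    /// enum Kind {
    ///     State,
    ///     Dynamic,
    ///     Compute, // Derived / Operator / Producer
    /// }
    ///
    /// #[derive(Debug, PartialEq)]
    /// enum FireDecision {
    ///     DeferToResume, // paused in Default mode: mark pending_wave only
    ///     Schedule,      // insert into pending_fires
    ///     Skip,          // leave pending_fires untouched
    /// }
    ///
    /// fn decide(kind: Kind, paused_default: bool, has_fired_once: bool, dep_tracked: bool) -> FireDecision {
    ///     // Default-mode pause wins: RESUME later schedules one consolidated fire.
    ///     if paused_default {
    ///         return FireDecision::DeferToResume;
    ///     }
    ///     match kind {
    ///         // State nodes have no deps, so this arm is unreachable in practice.
    ///         Kind::State => FireDecision::Skip,
    ///         // Dynamic nodes fire on their first run or when the dep is tracked.
    ///         Kind::Dynamic if !has_fired_once || dep_tracked => FireDecision::Schedule,
    ///         Kind::Dynamic => FireDecision::Skip,
    ///         // Static-dep compute nodes fire on any dep delivery.
    ///         Kind::Compute => FireDecision::Schedule,
    ///     }
    /// }
    /// ```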
    pub(crate) fn deliver_data_to_consumer(
        &self,
        s: &mut CoreState,
        consumer_id: NodeId,
        dep_idx: usize,
        handle: HandleId,
    ) {
        // Retain the handle for the batch accumulation slot — each DATA
        // handle in `data_batch` owns a retain share, released at wave-end
        // rotation in `clear_wave_state`.
        self.binding.retain_handle(handle);

        let is_dynamic;
        let is_state;
        let tracked_or_first_fire;
        // Slice F audit close (2026-05-07): default-mode pause suppression.
        // If the consumer is paused with `PausableMode::Default`, the
        // canonical-spec §2.6 behavior is to suppress fn-fire and consolidate
        // pause-window dep deliveries into one fn execution on RESUME.
        // Mark `pending_wave` on the pause state instead of adding to
        // `pending_fires`. The dep state still advances (the `data_batch`
        // push below runs whether or not the consumer is paused), and
        // clear_wave_state still rotates the latest dep DATA into
        // prev_data, so when the fn ultimately fires on RESUME, it sees the
        // consolidated post-pause state.
        let suppressed_for_default_pause;
        {
            let consumer = s.require_node_mut(consumer_id);
            consumer.dep_records[dep_idx].data_batch.push(handle);
            consumer.dep_records[dep_idx].involved_this_wave = true;
            consumer.involved_this_wave = true;
            is_dynamic = consumer.is_dynamic;
            is_state = consumer.is_state();
            tracked_or_first_fire = !consumer.has_fired_once || consumer.tracked.contains(&dep_idx);
            suppressed_for_default_pause = consumer.pause_state.is_paused()
                && consumer.pausable == crate::node::PausableMode::Default;
            if suppressed_for_default_pause {
                consumer.pause_state.mark_pending_wave();
            }
        }
        if suppressed_for_default_pause {
            // Default-mode pause: don't add to pending_fires; RESUME will
            // schedule one consolidated fire.
            return;
        }
        // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_fires lives on
        // per-thread WaveState. State lock + WaveState borrow are
        // independent.
        if is_state {
            // State nodes don't have deps; unreachable in practice.
        } else if is_dynamic {
            if tracked_or_first_fire {
                with_wave_state(|ws| {
                    ws.pending_fires.insert(consumer_id);
                });
            }
        } else {
            // Derived / Operator / Producer (Producer has no deps so won't
            // reach here, but the predicate-based dispatch handles it
            // uniformly).
            with_wave_state(|ws| {
                ws.pending_fires.insert(consumer_id);
            });
        }
    }

    // -------------------------------------------------------------------
    // Subscriber notification
    // -------------------------------------------------------------------

    /// Queue a wave-end message for `node_id`'s subscribers.
    ///
    /// **Revision-tracked sink-snapshot batches (Slice X4 / D2,
    /// 2026-05-08):** each push for a given node either appends the
    /// message to the open batch or opens a fresh batch with a current
    /// sink snapshot frozen at the new revision. It appends when
    /// `NodeRecord::subscribers_revision` has not advanced since that
    /// batch opened, which is the common case and needs no extra
    /// allocation. A subscriber installed mid-wave bumps
    /// `subscribers_revision`; the next `queue_notify` for the same node
    /// observes the bump and starts a new batch that includes the new
    /// subscriber. Pre-subscribe batches retain their original snapshot,
    /// so earlier emits flush to their original sink list. The new
    /// subscriber therefore does NOT receive them twice (once via flush
    /// and once via handshake replay), which closes the late-subscriber +
    /// multi-emit-per-wave R1.3.5.a gap.
    ///
    /// Pause routing decision (R1.3.7.b tier table, §10.2 buffering):
    ///   Tier 3 (DATA / RESOLVED) and tier 4 (INVALIDATE) buffer while
    ///   paused when the node's `PausableMode` buffers (`ResumeAll`, or
    ///   `Default` on a state node); all other tiers (DIRTY tier 1,
    ///   PAUSE/RESUME tier 2, COMPLETE/ERROR tier 5, TEARDOWN tier 6)
    ///   bypass the pause buffer and are never held until RESUME. START
    ///   (tier 0) is per-subscription and never transits queue_notify.
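    ///
    /// The routing decision can be modeled as a pure function. This is a
    /// minimal sketch under a simplified model. `Mode` mirrors
    /// `PausableMode`, while `Route` and `route` are hypothetical names.
    /// Payload retain/release and overflow bookkeeping are omitted.
    ///
    /// ```rust
    /// // Hypothetical stand-in for PausableMode.
    /// #[derive(Clone, Copy)]
    /// enum Mode { ResumeAll, Default, Off }
    ///
    /// #[derive(Debug, PartialEq)]
    /// enum Route {
    ///     Discard, // node has no subscribers: nothing is queued
    ///     Buffer,  // held in the pause buffer until RESUME
    ///     Queue,   // normal pending_notify path, flushed at wave end
    /// }
    ///
    /// // Hypothetical helper mirroring queue_notify's branch order.
    /// fn route(tier: u8, mode: Mode, is_state: bool, paused: bool, has_subscribers: bool) -> Route {
    ///     if !has_subscribers {
    ///         return Route::Discard;
    ///     }
    ///     // Only tier 3 (DATA/RESOLVED) and tier 4 (INVALIDATE) are bufferable.
    ///     let buffered_tier = matches!(tier, 3 | 4);
    ///     // Default mode buffers only on state nodes; compute nodes are
    ///     // suppressed upstream at fn-fire scheduling instead.
    ///     let mode_buffers = match mode {
    ///         Mode::ResumeAll => true,
    ///         Mode::Default => is_state,
    ///         Mode::Off => false,
    ///     };
    ///     if buffered_tier && mode_buffers && paused {
    ///         Route::Buffer
    ///     } else {
    ///         Route::Queue
    ///     }
    /// }
    /// ```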
    pub(crate) fn queue_notify(&self, s: &mut CoreState, node_id: NodeId, msg: Message) {
        // R1.3.3.a / R1.3.3.d (Slice G — re-added 2026-05-07): dev-mode
        // wave-content invariant assertion. The tier-3 slot at one node in
        // one wave is either ≥1 DATA or exactly 1 RESOLVED — never mixed,
        // never multiple RESOLVED. Slice G moved equals substitution from
        // per-emit to wave-end coalescing; this assert pins that the
        // dispatcher itself never queues a violating combination at the
        // queue_notify granularity. Resolved arrivals come from:
        //   1. The auto-resolve sweep in `drain_and_flush` (gates on
        //      `!any tier-3` so it can't add to a wave with Data).
        //   2. The wave-end equals-substitution pass (rewrites in place,
        //      doesn't go through queue_notify).
        // Both honor R1.3.3.a by construction post-Slice-G.
        // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_notify lives
        // on per-thread WaveState. The dev-mode invariant assertion
        // borrows WaveState briefly and drops before the rest of
        // queue_notify proceeds.
        #[cfg(debug_assertions)]
        if msg.tier() == 3 {
            with_wave_state(|ws| {
                if let Some(entry) = ws.pending_notify.get(&node_id) {
                    // Walk all batches' messages — R1.3.3.a is a per-node
                    // wave-content invariant, not per-batch (the X4 batches
                    // are subscriber-snapshot epochs; the protocol-level
                    // tier-3 invariant spans the whole wave for the node).
                    let has_data = entry.iter_messages().any(|m| matches!(m, Message::Data(_)));
                    let resolved_count = entry
                        .iter_messages()
                        .filter(|m| matches!(m, Message::Resolved))
                        .count();
                    let incoming_is_data = matches!(msg, Message::Data(_));
                    if incoming_is_data {
                        debug_assert!(
                            resolved_count == 0,
                            "R1.3.3.a violation at {node_id:?}: queueing Data into a \
                             wave that already contains Resolved — Slice G should have \
                             prevented this via wave-end coalescing"
                        );
                    } else {
                        debug_assert!(
                            !has_data,
                            "R1.3.3.a violation at {node_id:?}: queueing Resolved into a \
                             wave that already contains Data"
                        );
                        debug_assert!(
                            resolved_count == 0,
                            "R1.3.3.a violation at {node_id:?}: multiple Resolved in one \
                             wave at one node"
                        );
                    }
                }
            });
        }

        let buffered_tier = matches!(msg.tier(), 3 | 4);
        let cap = s.pause_buffer_cap;

        // Pause-routing branch — handles its own retain/release and returns
        // before we touch `pending_notify`, so the rec borrow is contained.
        {
            let rec = s.require_node_mut(node_id);
            if rec.subscribers.is_empty() {
                return;
            }
            // Slice F audit close (2026-05-07): pause routing depends on mode.
            //   - `ResumeAll`: buffer tier-3/4 for verbatim replay on RESUME.
            //   - `Default` + STATE node: state nodes have no fn-fire to
            //     suppress, so buffer like resumeAll (collapse-to-latest is
            //     a future enhancement; v1 keeps verbatim).
            //   - `Default` + COMPUTE node: suppression happens upstream at
            //     fn-fire scheduling (see `deliver_data_to_consumer`); no
            //     outgoing tier-3 is produced from this node while paused,
            //     so this branch is unreachable for compute-default-paused.
            //     Fallthrough to the non-paused queue path is fine.
            //   - `Off`: pause is ignored entirely — tier-3 flushes
            //     immediately. Fallthrough.
            let mode_buffers_tier3 = match rec.pausable {
                crate::node::PausableMode::ResumeAll => true,
                crate::node::PausableMode::Default => rec.is_state(),
                crate::node::PausableMode::Off => false,
            };
            if buffered_tier && mode_buffers_tier3 && rec.pause_state.is_paused() {
                if let Some(h) = msg.payload_handle() {
                    self.binding.retain_handle(h);
                }
                let push_result = rec.pause_state.push_buffered(msg, cap);
                for dm in push_result.dropped_msgs {
                    if let Some(h) = dm.payload_handle() {
                        self.binding.release_handle(h);
                    }
                }
                // R1.3.8.c (Slice F, A3): on first overflow this cycle,
                // schedule a synthesized ERROR for wave-end emission.
                // `cap` is `Some` here (an overflow can only happen with a
                // configured cap), so the `unwrap_or(0)` below is only a
                // defensive fallback.
                if push_result.first_overflow_this_cycle {
                    if let Some((dropped_count, lock_held_ns)) =
                        rec.pause_state.overflow_diagnostic()
                    {
                        // Q-beyond Sub-slice 1 (D108, 2026-05-09):
                        // pending_pause_overflow lives on per-thread WaveState.
                        with_wave_state(|ws| {
                            ws.pending_pause_overflow
                                .push(crate::node::PendingPauseOverflow {
                                    node_id,
                                    dropped_count,
                                    configured_max: cap.unwrap_or(0),
                                    lock_held_ns,
                                });
                        });
                    }
                }
                return;
            }
        }

        // Queue path (not paused, or this tier/mode is not buffered):
        // retain the payload handle and queue into pending_notify. The
        // matching release is collected by `flush_notifications` and
        // performed after sinks fire.
        if let Some(h) = msg.payload_handle() {
            self.binding.retain_handle(h);
        }
        Self::push_into_pending_notify(s, node_id, msg);
    }

    /// Slice X4 / D2: revision-tracked batch decision for `queue_notify`'s
    /// non-paused path. Either appends `msg` to the open batch (when
    /// `subscribers_revision` has not advanced since it opened; the common
    /// case, with no extra allocation) or opens a fresh batch with a
    /// current sink snapshot frozen at the new revision.
    ///
    /// Borrow discipline: reads `subscribers_revision` and (when needed)
    /// the sink snapshot from `s.nodes` outside any borrow of WaveState's
    /// `pending_notify`, so the two scopes never overlap.
    ///
    /// Q-beyond Sub-slice 2 (D108, 2026-05-09): `pending_notify` moved
    /// to per-thread WaveState. The state-side reads of
    /// `subscribers_revision` and `subscribers` each happen outside a
    /// `with_wave_state` block; the final WaveState borrow performs the
    /// entry insertion / append. State lock + WaveState borrow remain
    /// independent.
    ///
    /// Lock-discipline assumption: this read of `subscribers_revision`
    /// is safe because both the subscribe install path
    /// ([`crate::node::Core::subscribe`]) and `queue_notify` hold
    /// `CoreState`'s mutex when they bump / read the revision, so
    /// concurrent subscribe/unsubscribe cannot interleave. **If
    /// `Core::subscribe` ever moves the sink install outside the lock
    /// (mirroring the lock-released drain refactor), the revision read
    /// here must re-validate post-borrow; otherwise a fresh batch
    /// could open with a stale snapshot.**
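    ///
    /// The batch decision can be modeled as follows. This is a hedged
    /// sketch, not the real types. `Batch` and `push` are hypothetical,
    /// sinks are plain `u32` ids, and messages are string labels.
    ///
    /// ```rust
    /// // Hypothetical stand-in for PendingBatch.
    /// struct Batch {
    ///     snapshot_revision: u64,
    ///     sinks: Vec<u32>,
    ///     messages: Vec<&'static str>,
    /// }
    ///
    /// fn push(batches: &mut Vec<Batch>, current_rev: u64, live_sinks: &[u32], msg: &'static str) {
    ///     // Open a new batch if there is none yet or the subscriber set changed.
    ///     let needs_new_batch = batches
    ///         .last()
    ///         .map_or(true, |b| b.snapshot_revision != current_rev);
    ///     if needs_new_batch {
    ///         // Freeze the sink list as of `current_rev`.
    ///         batches.push(Batch {
    ///             snapshot_revision: current_rev,
    ///             sinks: live_sinks.to_vec(),
    ///             messages: vec![msg],
    ///         });
    ///     } else {
    ///         // Same revision: append without re-snapshotting the sinks.
    ///         batches.last_mut().expect("checked above").messages.push(msg);
    ///     }
    /// }
    /// ```
    ///
    /// Take one subscriber at revision 1. Two emits at that revision share
    /// a batch. A subscriber installed mid-wave bumps the revision to 2,
    /// so a third emit opens a second batch that includes it. The first
    /// two messages still flush only to the original sink.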
    fn push_into_pending_notify(s: &mut CoreState, node_id: NodeId, msg: Message) {
        let current_rev = s.require_node(node_id).subscribers_revision;
        let needs_new_batch = with_wave_state(|ws| {
            ws.pending_notify.get(&node_id).is_none_or(|entry| {
                entry
                    .batches
                    .last()
                    .is_none_or(|b| b.snapshot_revision != current_rev)
            })
        });
        let sinks_snapshot: Vec<Sink> = if needs_new_batch {
            s.require_node(node_id)
                .subscribers
                .values()
                .cloned()
                .collect()
        } else {
            Vec::new()
        };
        with_wave_state(|ws| match ws.pending_notify.entry(node_id) {
            Entry::Vacant(slot) => {
                let mut batches: SmallVec<[PendingBatch; 1]> = SmallVec::new();
                batches.push(PendingBatch {
                    snapshot_revision: current_rev,
                    sinks: sinks_snapshot,
                    messages: vec![msg],
                });
                slot.insert(PendingPerNode { batches });
            }
            Entry::Occupied(mut slot) => {
                let entry = slot.get_mut();
                if needs_new_batch {
                    entry.batches.push(PendingBatch {
                        snapshot_revision: current_rev,
                        sinks: sinks_snapshot,
                        messages: vec![msg],
                    });
                } else {
                    entry
                        .batches
                        .last_mut()
                        .expect("non-empty by construction (entry exists implies batch exists)")
                        .messages
                        .push(msg);
                }
            }
        });
    }

    /// Collect wave-end sink-fire jobs into `ws.deferred_flush_jobs` and the
    /// payload-handle releases owed for `pending_notify` into
    /// `ws.deferred_handle_releases`. The actual sink fires + handle releases
    /// run **after** the state lock is dropped — see [`Core::run_wave`].
    ///
    /// R1.3.1.b two-phase propagation: phase 1 (DIRTY) propagates through
    /// the entire graph before phase 2 (DATA / RESOLVED) begins. This is
    /// implemented here by collecting jobs phase-major across nodes (for
    /// each phase, walk every node). Phase 1's jobs therefore sit before
    /// phase 2's in `deferred_flush_jobs`, so when `run_wave` drains the
    /// queue lock-released, multi-node subscribers see all DIRTYs before
    /// any settle. This matches TS's drainPhase model without the per-tier
    /// queue indirection.
    ///
    /// Phase ordering:
    ///   1 → tier 1   (DIRTY)
    ///   2 → tier 3+4 (DATA/RESOLVED + INVALIDATE — the "settle slice")
    ///   3 → tier 5   (COMPLETE/ERROR)
    ///   4 → tier 6   (TEARDOWN)
    ///
    /// Tier 0 (START) is per-subscription (never enters pending_notify) and
    /// tier 2 (PAUSE/RESUME) is delivered through dedicated paths, also
    /// bypassing pending_notify; both are absent from this enumeration.
    ///
    /// Within a single phase, per-node insertion order (IndexMap iteration)
    /// is preserved: if A emits before B, A's phase-2 messages flush before
    /// B's. Within a single node, message order is preserved.
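    ///
    /// The phase-major collect can be sketched on a toy model. Each
    /// pending node is a label plus the tiers of its queued messages. This
    /// is a hypothetical simplification with no sinks, batches, or
    /// handles, and `collect` is an illustrative name.
    ///
    /// ```rust
    /// const PHASES: &[&[u8]] = &[&[1], &[3, 4], &[5], &[6]];
    ///
    /// // Hypothetical: returns (node, tiers) jobs in the order they would fire.
    /// fn collect(pending: &[(&'static str, Vec<u8>)]) -> Vec<(&'static str, Vec<u8>)> {
    ///     let mut jobs = Vec::new();
    ///     // Outer loop over phases, inner loop over nodes in insertion order,
    ///     // so every node's DIRTY job precedes any node's settle job.
    ///     for &phase_tiers in PHASES {
    ///         for (node, tiers) in pending {
    ///             let phase_msgs: Vec<u8> = tiers
    ///                 .iter()
    ///                 .copied()
    ///                 .filter(|t| phase_tiers.contains(t))
    ///                 .collect();
    ///             if !phase_msgs.is_empty() {
    ///                 jobs.push((*node, phase_msgs));
    ///             }
    ///         }
    ///     }
    ///     jobs
    /// }
    /// ```
    ///
    /// For two nodes `a` (tiers 1, 3) and `b` (tiers 1, 3, 5), this yields
    /// `a:[1], b:[1], a:[3], b:[3], b:[5]`.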
    fn flush_notifications(&self, s: &mut CoreState) {
        const PHASES: &[&[u8]] = &[
            &[1],    // DIRTY
            &[3, 4], // DATA/RESOLVED + INVALIDATE
            &[5],    // COMPLETE/ERROR
            &[6],    // TEARDOWN
        ];
        // Q-beyond Sub-slice 1 + 2 + 3 (D108, 2026-05-09): pending_notify,
        // deferred_handle_releases, and deferred_flush_jobs all live on
        // per-thread WaveState. Take pending_notify under the WaveState
        // borrow, drop the borrow, run the per-phase loop (no WaveState
        // access in the loop body), then re-borrow WaveState at the end
        // to push the collected jobs and payload-handle releases.
        //
        // /qa F7 (2026-05-10): the `s: &mut CoreState` parameter is
        // currently unused inside the per-phase loop — `pending` was
        // moved off `s` to WaveState by sub-slice 2, and the per-batch
        // sink snapshot is already on the PendingBatch. Kept as a
        // parameter to preserve the caller's `let mut s = lock_state();
        // self.flush_notifications(&mut s);` invocation shape (caller
        // holds the state lock around this call — load-bearing for
        // R1.3.5.a per-tier handshake-vs-flush ordering). NOT a "lock
        // released" marker; the lock guard belongs to the caller and
        // is held throughout this function. A future change that adds
        // an in-loop state read should remove the discard below;
        // removing the parameter would break the caller's ability to
        // express the lock-discipline contract at the call site.
        let _ = &*s; // explicit no-op acknowledgement; lock held by caller.
        let pending = with_wave_state(|ws| std::mem::take(&mut ws.pending_notify));
        let mut jobs: DeferredJobs = Vec::new();
        for &phase_tiers in PHASES {
            for (_node_id, entry) in &pending {
                // Slice X4 / D2: iterate batches in arrival order. Each
                // batch carries its own sink snapshot frozen at open-time;
                // a batch's messages flush to ITS sinks only. Within a
                // single (phase, node), batches stay in arrival order so
                // emit-order semantics are preserved across batches.
                for batch in &entry.batches {
                    if batch.sinks.is_empty() {
                        continue;
                    }
                    let phase_msgs: Vec<Message> = batch
                        .messages
                        .iter()
                        .copied()
                        .filter(|m| phase_tiers.contains(&m.tier()))
                        .collect();
                    if phase_msgs.is_empty() {
                        continue;
                    }
                    let sinks_clone: Vec<Sink> = batch.sinks.iter().map(Arc::clone).collect();
                    jobs.push((sinks_clone, phase_msgs));
                }
            }
        }
        // Single WaveState borrow at the end: push the collected jobs
        // and the payload-handle releases. Refcount release balances the
        // retain done in `queue_notify` for every payload-bearing message
        // that landed in pending_notify (across ALL batches per node);
        // deferred to post-lock-drop so the binding's release path can't
        // re-enter Core under our lock.
        with_wave_state(|ws| {
            ws.deferred_flush_jobs.append(&mut jobs);
            for entry in pending.values() {
                for msg in entry.iter_messages() {
                    if let Some(h) = msg.payload_handle() {
                        ws.deferred_handle_releases.push(h);
                    }
                }
            }
        });
    }

    /// Take the deferred sink-fire jobs, payload-handle releases,
    /// cleanup-hook fire queue, and pending-wipe queue from `WaveState`.
    /// Callers pair this with `drop(state_guard)` and a subsequent
    /// [`Self::fire_deferred`] call to deliver the wave's sinks, handle
    /// releases, Slice E2 OnInvalidate cleanup hooks, and Slice E2 /qa
    /// Q2(b) eager wipe_ctx fires lock-released.
    ///
    /// Q-beyond Sub-slice 1 (D108, 2026-05-09): `deferred_handle_releases`
    /// source moved to per-thread WaveState — signature takes `&mut WaveState`.
    /// Q-beyond Sub-slice 3 (D108, 2026-05-09): `deferred_flush_jobs`,
    /// `deferred_cleanup_hooks`, and `pending_wipes` all moved to
    /// WaveState. The `_s: &mut CoreState` parameter is now unused but
    /// kept to preserve the call-site lock-discipline ordering (caller
    /// holds the state lock around this call to interleave with prior
    /// `clear_wave_state` per-NodeRecord work).
    pub(crate) fn drain_deferred(_s: &mut CoreState, ws: &mut WaveState) -> WaveDeferred {
        (
            std::mem::take(&mut ws.deferred_flush_jobs),
            std::mem::take(&mut ws.deferred_handle_releases),
            std::mem::take(&mut ws.deferred_cleanup_hooks),
            std::mem::take(&mut ws.pending_wipes),
        )
    }

    /// Fire deferred sink-fire jobs in collected order, then release the
    /// payload handles owed for messages that landed in `pending_notify`
    /// during the wave, then fire any queued Slice E2 OnInvalidate cleanup
    /// hooks, and finally run any queued eager `wipe_ctx` calls. All four
    /// phases run lock-released so:
    /// - Sinks that call back into Core (emit, pause, etc.) re-acquire the
    ///   state lock cleanly and run their own nested wave.
    /// - The binding's `release_handle` path can't deadlock against a
    ///   binding-side mutex held by Core.
    /// - User cleanup closures (invoked via `BindingBoundary::cleanup_for`)
    ///   may safely re-enter Core for unrelated nodes.
    ///
    /// **Cleanup-drain panic discipline (D060):** each sink fire,
    /// `cleanup_for` call, and `wipe_ctx` call is wrapped in
    /// `catch_unwind`, so a single panic doesn't short-circuit the
    /// per-wave drain. All queued attempts run; if any panicked, the LAST
    /// panic is re-raised after every phase completes (preserving
    /// wave-end discipline while still surfacing failures). Per D060, Core
    /// stays panic-naive about user code: bindings own their host-language
    /// panic policy inside `cleanup_for`, and this `catch_unwind` is purely
    /// about drain-don't-short-circuit.
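    ///
    /// The drain-don't-short-circuit pattern can be reduced to a standalone
    /// sketch. `drain_all` is a hypothetical helper, and boxed closures
    /// stand in for sink fires, `cleanup_for`, and `wipe_ctx` calls. Only
    /// `std::panic` APIs are used.
    ///
    /// ```rust
    /// use std::panic::{self, AssertUnwindSafe};
    ///
    /// // Hypothetical helper: runs every job even if some panic, then
    /// // re-raises the LAST panic. Returns the number of jobs attempted
    /// // when none panicked.
    /// fn drain_all(jobs: Vec<Box<dyn FnOnce()>>) -> usize {
    ///     let mut attempted = 0;
    ///     let mut last_panic = None;
    ///     for job in jobs {
    ///         attempted += 1;
    ///         // AssertUnwindSafe: the panic still propagates once the
    ///         // drain ends, so nothing relies on post-panic state here.
    ///         if let Err(payload) = panic::catch_unwind(AssertUnwindSafe(job)) {
    ///             last_panic = Some(payload);
    ///         }
    ///     }
    ///     if let Some(payload) = last_panic {
    ///         panic::resume_unwind(payload);
    ///     }
    ///     attempted
    /// }
    /// ```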
    pub(crate) fn fire_deferred(
        &self,
        jobs: DeferredJobs,
        releases: Vec<HandleId>,
        cleanup_hooks: Vec<(crate::handle::NodeId, crate::boundary::CleanupTrigger)>,
        pending_wipes: Vec<crate::handle::NodeId>,
    ) {
        // Slice E2 /qa P1 (2026-05-07): wrap each sink-fire in
        // `catch_unwind` so a panicking sink doesn't unwind out of
        // `fire_deferred` and drop the queued `releases` +
        // `cleanup_hooks`. Mirrors Slice F audit fix A7's per-tier
        // handshake-fire discipline. Without this guard, a sink panic
        // here would silently leak handle retains AND silently drop
        // OnInvalidate cleanup hooks. AssertUnwindSafe is safe because
        // we re-raise the last panic at the end after running every
        // queued fire — drain ordering is preserved.
        let mut last_panic: Option<Box<dyn std::any::Any + Send>> = None;
        for (sinks, msgs) in jobs {
            for sink in &sinks {
                let sink = sink.clone();
                let msgs_ref = &msgs;
                let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
                    sink(msgs_ref);
                }));
                if let Err(payload) = result {
                    last_panic = Some(payload);
                }
            }
        }
        for h in releases {
            self.binding.release_handle(h);
        }
        // Slice E2 (D060): drain cleanup hooks with per-item panic
        // isolation so the loop always completes. AssertUnwindSafe is
        // safe here because we don't rely on logical state being valid
        // post-panic — the panic propagates anyway after the drain ends.
        for (node_id, trigger) in cleanup_hooks {
            let binding = &self.binding;
            let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
                binding.cleanup_for(node_id, trigger);
            }));
            if let Err(payload) = result {
                last_panic = Some(payload);
            }
        }
        // Slice E2 /qa Q2(b) (D069): drain eager wipe_ctx queue with the
        // same per-item panic isolation. Fires AFTER cleanup hooks so a
        // resubscribable node's OnInvalidate (or any tier-3+ cleanup that
        // fires in the same wave) sees pre-wipe binding state if it
        // landed in the same wave as the terminal cascade. Mutually
        // exclusive with `Subscription::Drop`'s direct-fire site, but
        // even concurrent fires are idempotent (binding's `wipe_ctx`
        // calls `HashMap::remove` which is a no-op on absent keys).
        for node_id in pending_wipes {
            let binding = &self.binding;
            let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
                binding.wipe_ctx(node_id);
            }));
            if let Err(payload) = result {
                last_panic = Some(payload);
            }
        }
        if let Some(payload) = last_panic {
            std::panic::resume_unwind(payload);
        }
    }

    // -------------------------------------------------------------------
    // User-facing batch — coalesce multiple emits into one wave
    // -------------------------------------------------------------------

    /// Coalesce multiple emissions into a single wave. Every `emit` /
    /// `complete` / `error` / `teardown` / `invalidate` call inside `f`
    /// queues its downstream work; the wave drains when `f` returns.
    ///
    /// **R1.3.6.a** — DIRTY still propagates immediately (tier 1 isn't
    /// deferred); only tier-3+ delivery is held until scope exit. **R1.3.6.b**
    /// — repeated emits on the same node coalesce into a single multi-message
    /// delivery: one [`Message::Dirty`] for the wave, plus one
    /// [`Message::Data`] per emit, with the DATA messages delivered together
    /// in the node's phase-2 pass.
    ///
    /// Nested `batch()` calls share the outer wave; only the outermost call
    /// drives the drain. Re-entrant calls from inside an `emit`/fn (the wave
    /// engine's own `s.in_tick` re-entrance) compose with this method
    /// transparently — they observe `in_tick = true` and skip drain just
    /// like nested `batch()`.
    ///
    /// On panic inside `f`, the `BatchGuard` returned by the internal
    /// `begin_batch` call is dropped during unwinding and discards pending
    /// tier-3+ work (subscribers do not observe the half-built wave). See
    /// [`Core::begin_batch`] for the RAII variant if you need explicit
    /// control over the scope boundary.
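    ///
    /// Nested-scope sharing can be modeled with a depth counter. The sketch
    /// is single-threaded and hypothetical: `Engine`, `Guard`, and the
    /// string payloads are illustrative names only. Detecting the panic
    /// path via `std::thread::panicking` is an assumption about how a guard
    /// could tell normal exit from unwinding. It is not necessarily how
    /// `BatchGuard` does it.
    ///
    /// ```rust
    /// use std::cell::{Cell, RefCell};
    ///
    /// // Hypothetical engine: `depth` counts open batch scopes and `queued`
    /// // holds deliveries deferred until the outermost scope closes.
    /// struct Engine {
    ///     depth: Cell<u32>,
    ///     queued: RefCell<Vec<&'static str>>,
    ///     delivered: RefCell<Vec<&'static str>>,
    /// }
    ///
    /// struct Guard<'a> {
    ///     engine: &'a Engine,
    /// }
    ///
    /// impl Engine {
    ///     fn new() -> Self {
    ///         Engine {
    ///             depth: Cell::new(0),
    ///             queued: RefCell::new(Vec::new()),
    ///             delivered: RefCell::new(Vec::new()),
    ///         }
    ///     }
    ///     fn begin_batch(&self) -> Guard<'_> {
    ///         self.depth.set(self.depth.get() + 1);
    ///         Guard { engine: self }
    ///     }
    ///     fn batch(&self, f: impl FnOnce()) {
    ///         let _guard = self.begin_batch();
    ///         f();
    ///     }
    ///     fn emit(&self, msg: &'static str) {
    ///         self.queued.borrow_mut().push(msg);
    ///     }
    /// }
    ///
    /// impl Drop for Guard<'_> {
    ///     fn drop(&mut self) {
    ///         let depth = self.engine.depth.get() - 1;
    ///         self.engine.depth.set(depth);
    ///         if depth > 0 {
    ///             return; // nested scope: the outermost guard drains
    ///         }
    ///         let drained: Vec<_> = self.engine.queued.borrow_mut().drain(..).collect();
    ///         if !std::thread::panicking() {
    ///             // Outermost normal exit: deliver the whole wave at once.
    ///             self.engine.delivered.borrow_mut().extend(drained);
    ///         } // else: unwinding, so the half-built wave is discarded
    ///     }
    /// }
    /// ```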
    pub fn batch<F>(&self, f: F)
    where
        F: FnOnce(),
    {
        let _guard = self.begin_batch();
        f();
    }

    /// RAII batch handle — opens a wave when constructed, drains on drop.
    ///
    /// Mirrors the closure-based [`Self::batch`] but exposes the scope
    /// boundary so callers can compose batches with non-`FnOnce` control
    /// flow (e.g. async-state-machine code paths, or splitting setup and
    /// drain across helper functions).
    ///
    /// ```ignore
    /// let g = core.begin_batch();
    /// core.emit(state_a, h1);
    /// core.emit(state_b, h2);
    /// drop(g); // wave drains here
    /// ```
    ///
    /// Like the closure form, nested `begin_batch` calls share the outer
    /// wave (only the outermost guard drains).
    ///
    /// # Panics
    ///
    /// Panics if the registry-epoch retry-validate loop exceeds
    /// [`crate::subgraph::MAX_LOCK_RETRIES`] iterations, which requires
    /// pathological concurrent `register` / `set_deps` activity racing
    /// with closure-form batch entry. Unreachable in correct call paths.
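    ///
    /// # Retry-validate sketch
    ///
    /// The acquire loop in this function follows an epoch-check-then-retry
    /// pattern, shown below as a hedged, standalone model. All names are
    /// hypothetical:
    /// - `acquire_stable` is an illustrative helper.
    /// - `epoch` stands in for `registry.lock().epoch()`.
    /// - `acquire` stands in for snapshotting the partition set and taking
    ///   every `wave_owner` in ascending order.
    /// - Returning `None` replaces the real panic.
    ///
    /// ```rust
    /// // Hypothetical helper: returns the held guards plus the attempt
    /// // number that succeeded, or None once retries are exhausted.
    /// fn acquire_stable<G>(
    ///     max_retries: usize,
    ///     mut epoch: impl FnMut() -> u64,
    ///     mut acquire: impl FnMut() -> G,
    /// ) -> Option<(G, usize)> {
    ///     for attempt in 1..=max_retries {
    ///         let before = epoch();
    ///         let guards = acquire();
    ///         // Unchanged epoch: no register / union / split raced the
    ///         // acquire, so the snapshot covered every partition.
    ///         if epoch() == before {
    ///             return Some((guards, attempt));
    ///         }
    ///         // Partition set changed: release everything, then retry.
    ///         drop(guards);
    ///         std::thread::yield_now();
    ///     }
    ///     None
    /// }
    /// ```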
    #[must_use = "BatchGuard drains the wave on drop; assign to a named binding"]
    pub fn begin_batch(&self) -> BatchGuard {
        // Slice Y1 / Phase E (2026-05-08): closure-form batch has no known
        // seed; per session-doc Q7 / D092 it MUST serialize against every
        // currently-existing partition. Acquire each partition's
        // `wave_owner` in ascending [`SubgraphId`] order via the retry-
        // validate primitive. Same-thread re-entry passes through each
        // ReentrantMutex transparently; cross-thread waves on any of the
        // touched partitions block until our `wave_guards` drop.
        //
        // **QA-fix #2 (2026-05-09): registry epoch retry-validate.** A
        // concurrent `register` / `set_deps`-driven union/split landing
        // between our pre-snapshot epoch read and the post-acquire epoch
        // read changes the partition set, so we retry the whole acquire
        // with a fresh `all_partitions_lock_boxes()` snapshot. Without
        // this, a partition added after our snapshot would not be held by
        // our batch, breaking the closure form's "all-partitions
        // serialization" contract.
        //
        // Trade-off (documented v1 contract): closure-form batch is the
        // serialization point under per-partition parallelism. Per-seed
        // entry points (`Core::subscribe`, [`Self::begin_batch_for`])
        // acquire only the touched partitions and run truly parallel
        // for disjoint partitions.
        for _ in 0..crate::subgraph::MAX_LOCK_RETRIES {
            let epoch_before = self.registry.lock().epoch();
            let partition_boxes = self.all_partitions_lock_boxes();
            let mut wave_guards: SmallVec<[crate::node::WaveOwnerGuard; 4]> = SmallVec::new();
            for (sid, _box) in &partition_boxes {
                // Use the partition's root NodeId as the lock_for retry
                // seed. SubgraphId.raw() == root NodeId.raw(); the root
                // is always registered in the X5 / Phase-E substrate
                // (cleanup_node stays gated until Phase G activates it).
                let representative = crate::handle::NodeId::new(sid.raw());
                wave_guards.push(self.partition_wave_owner_lock_arc(representative));
            }
            // Post-acquire epoch read. If unchanged, our snapshot is
            // still authoritative: every existing partition was held
            // throughout. If changed, drop guards and retry.
            let epoch_after = self.registry.lock().epoch();
            if epoch_after == epoch_before {
                return self.begin_batch_with_guards(wave_guards);
            }
            // Release every guard before retrying so held locks don't
            // accumulate across attempts.
            drop(wave_guards);
            std::thread::yield_now();
        }
        panic!(
            "Core::begin_batch: exceeded {} retries (pathological concurrent \
             register/union/split activity racing with closure-form batch entry)",
            crate::subgraph::MAX_LOCK_RETRIES
        );
    }

    /// Begin a batch scoped to the partitions transitively touched from
    /// `seed`. Walks `s.children` (downstream cascade) + `meta_companions`
    /// (R1.3.9.d TEARDOWN cascade) starting at `seed`, collects every
    /// reachable partition, and acquires each in ascending
    /// [`crate::subgraph::SubgraphId`] order via
    /// [`Core::partition_wave_owner_lock_arc`].
    ///
    /// Two threads with disjoint touched-partition sets run truly
    /// parallel: the per-partition `wave_owner` mutexes don't block
    /// each other. This is the canonical Y1 parallelism win for
    /// per-seed wave-driving entry points (subscribe, emit, pause,
    /// resume, invalidate, complete, error, teardown,
    /// set_deps push-on-subscribe).
    ///
    /// **QA-fix #2 (2026-05-09):** retry-validate the touched-partition
    /// set against the registry epoch, the same protection as
    /// [`Self::begin_batch`] but scoped to a per-seed touched set
    /// rather than every partition. Conservative: any registry
    /// mutation (even on a partition unrelated to seed's touched set)
    /// triggers a retry. This avoids a precise "did MY touched set
    /// change?" check at the cost of occasional spurious retries.
    ///
    /// # Panics
    ///
    /// Panics if the registry-epoch retry-validate loop exceeds
    /// [`crate::subgraph::MAX_LOCK_RETRIES`] iterations, OR if
    /// [`Core::partition_wave_owner_lock_arc`] panics on an
    /// unregistered seed. Both are unreachable in correct call paths
    /// (P12 invariant guarantees registry membership matches
    /// `s.nodes`).
    ///
    /// Slice Y1 / Phase E (2026-05-08); QA-fix #2 (2026-05-09).
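    ///
    /// The touched-set walk can be pictured with a small `std`-only sketch.
    /// It is hypothetical: `Graph`, its three maps, and `touched_partitions`
    /// are illustrative stand-ins for `s.children`, `meta_companions`, and
    /// node-to-partition membership, not the crate's
    /// `compute_touched_partitions`. Collecting into a `BTreeSet` yields the
    /// partition ids already in the ascending order lock acquisition needs.
    ///
    /// ```rust
    /// use std::collections::{BTreeSet, HashMap, HashSet, VecDeque};
    ///
    /// // Hypothetical graph view keyed by raw node id.
    /// struct Graph {
    ///     children: HashMap<u32, Vec<u32>>,        // downstream cascade
    ///     meta_companions: HashMap<u32, Vec<u32>>, // TEARDOWN cascade
    ///     partition_of: HashMap<u32, u32>,         // node -> partition id
    /// }
    ///
    /// // Breadth-first walk from `seed`; returns partition ids ascending.
    /// fn touched_partitions(g: &Graph, seed: u32) -> Vec<u32> {
    ///     let mut seen = HashSet::new();
    ///     let mut queue = VecDeque::from([seed]);
    ///     let mut parts = BTreeSet::new();
    ///     while let Some(node) = queue.pop_front() {
    ///         if !seen.insert(node) {
    ///             continue; // already visited (diamonds, cycles)
    ///         }
    ///         if let Some(&p) = g.partition_of.get(&node) {
    ///             parts.insert(p);
    ///         }
    ///         let next = g.children.get(&node).into_iter().flatten()
    ///             .chain(g.meta_companions.get(&node).into_iter().flatten());
    ///         queue.extend(next.copied());
    ///     }
    ///     parts.into_iter().collect()
    /// }
    /// ```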
    #[must_use = "BatchGuard drains the wave on drop; assign to a named binding"]
    pub fn begin_batch_for(&self, seed: crate::handle::NodeId) -> BatchGuard {
        for _ in 0..crate::subgraph::MAX_LOCK_RETRIES {
            let epoch_before = self.registry.lock().epoch();
            let touched = self.compute_touched_partitions(seed);
            let mut wave_guards: SmallVec<[crate::node::WaveOwnerGuard; 4]> = SmallVec::new();
            for sid in &touched {
                let representative = crate::handle::NodeId::new(sid.raw());
                wave_guards.push(self.partition_wave_owner_lock_arc(representative));
            }
            let epoch_after = self.registry.lock().epoch();
            if epoch_after == epoch_before {
                return self.begin_batch_with_guards(wave_guards);
            }
            drop(wave_guards);
            std::thread::yield_now();
        }
        panic!(
            "Core::begin_batch_for(seed={seed:?}): exceeded {} retries \
             (pathological concurrent register/union/split activity racing \
             with per-seed batch entry)",
            crate::subgraph::MAX_LOCK_RETRIES
        );
    }

    /// Internal helper: claim `in_tick` and assemble a [`BatchGuard`]
    /// with the supplied (already-acquired) partition wave-owner guards.
    /// `wave_guards` MUST be in ascending [`crate::subgraph::SubgraphId`]
    /// order (the canonical lock-acquisition order); both
    /// [`Self::begin_batch`] (all-partitions) and
    /// [`Self::begin_batch_for`] (touched-partitions) construct the
    /// vector in that order before calling here.
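    ///
    /// The `in_tick` claim this helper performs reduces to "the first
    /// claimant on a given Core owns the wave". A hypothetical `std`-only
    /// sketch (`MiniCore`, `claim_tick`, `release_tick`, and `is_in_tick`
    /// are illustrative names, not this crate's API) shows why keeping the
    /// flag per Core isolates two Cores driven from the same thread:
    ///
    /// ```rust
    /// use std::sync::Mutex;
    ///
    /// // Hypothetical per-Core state holding only the wave-ownership flag.
    /// struct MiniCore {
    ///     in_tick: Mutex<bool>,
    /// }
    ///
    /// impl MiniCore {
    ///     fn new() -> Self {
    ///         MiniCore { in_tick: Mutex::new(false) }
    ///     }
    ///
    ///     // Returns true only for the outermost claimant on this Core.
    ///     fn claim_tick(&self) -> bool {
    ///         let mut in_tick = self.in_tick.lock().unwrap();
    ///         let was_in = *in_tick;
    ///         *in_tick = true;
    ///         !was_in
    ///     }
    ///
    ///     // Only the owner clears the flag; nested claimants leave it set.
    ///     fn release_tick(&self, owns_tick: bool) {
    ///         if owns_tick {
    ///             *self.in_tick.lock().unwrap() = false;
    ///         }
    ///     }
    ///
    ///     fn is_in_tick(&self) -> bool {
    ///         *self.in_tick.lock().unwrap()
    ///     }
    /// }
    /// ```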
    fn begin_batch_with_guards(
        &self,
        wave_guards: SmallVec<[crate::node::WaveOwnerGuard; 4]>,
    ) -> BatchGuard {
        // /qa F1 reverted (2026-05-10): `in_tick` lives on CoreState
        // (per-Core, cross-thread visible). Per-thread placement broke
        // cross-Core same-thread BatchGuard isolation: a thread holding
        // a live BatchGuard on Core-A and starting a wave on Core-B
        // would see ws.in_tick=true (set by Core-A), so Core-B's writes
        // would get drained by Core-A's binding. Per-Core placement on
        // CoreState restores cross-Core isolation. Cost: one state lock
        // acquire on outermost batch entry (small; negligible vs the
        // wave's drain).
        let owns_tick = {
            let mut s = self.lock_state();
            let was_in = s.in_tick;
            if !was_in {
                s.in_tick = true;
            }
            !was_in
        };
        // D1 patch (2026-05-09): defensive wave-start clear of the
        // per-thread Slice G tier3 tracker on outermost owning entry.
        // The thread-local is cleared at outermost BatchGuard drop on
        // both success + panic paths; this start-clear is belt-and-
        // suspenders against panic paths that bypass Drop (catch_unwind
        // can interleave with thread reuse, e.g. cargo's test-runner
        // thread pool, and propagate stale entries from a prior
        // panicked test's wave that didn't fully unwind through
        // BatchGuard::drop).
        if owns_tick {
            tier3_clear();
            // Q-beyond Sub-slice 1 (D108, 2026-05-09): defensive wave-start
            // clear of WaveState's non-retain-holding fields. Mirrors the
            // tier3 defensive-clear above. Retain-holding fields
            // (wave_cache_snapshots / deferred_handle_releases) MUST be
            // empty here: outermost BatchGuard::drop drains them on both
            // success + panic paths.
            wave_state_clear_outermost();
        }
        BatchGuard {
            core: self.clone(),
            owns_tick,
            wave_guards,
            _not_send: std::marker::PhantomData,
        }
    }
}

/// RAII guard returned by [`Core::begin_batch`] and [`Core::begin_batch_for`].
///
/// While alive, suppresses per-emit wave drains: multiple `emit` /
/// `complete` / `error` / `teardown` / `invalidate` calls coalesce into one
/// wave. On drop:
/// - Outermost guard: drains the wave (fires sinks, runs cleanup, clears
///   in-tick).
/// - Nested guard (an outer `BatchGuard` or an in-progress wave already owns
///   the in-tick flag): does not drain; it only releases its re-entrant
///   partition wave-owner guards.
///
/// On thread panic during the batch scope (the closure body, or while the
/// guard is alive), the drop path discards pending tier-3+ delivery rather
/// than firing sinks, since a sink panicking during unwind would abort the
/// process. Subscribers observe **no tier-3+ delivery for the panicked
/// wave**. State-node cache writes that already executed inside the scope
/// are rolled back via wave-cache snapshots, so `cache_of(s)` returns the
/// pre-panic value. The atomicity guarantee covers both sink-observability
/// and cache state.
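///
/// The drop contract above (outermost-only drain, panic-time discard and
/// cache rollback) can be modeled in a few lines. This is a hypothetical,
/// single-threaded `std` sketch: `MiniCore`, `Guard`, `begin`, and `emit`
/// are illustrative stand-ins, with one cached value, one pending queue,
/// and a `delivered` log playing the part of sinks.
///
/// ```rust
/// use std::cell::RefCell;
/// use std::rc::Rc;
///
/// // Hypothetical mini-core; not this crate's `Core`.
/// #[derive(Default)]
/// struct MiniCore {
///     in_tick: bool,
///     cache: i64,
///     snapshot: Option<i64>, // pre-wave cache value, taken on first write
///     pending: Vec<i64>,     // values queued for sinks this wave
///     delivered: Vec<i64>,   // what "sinks" actually observed
/// }
///
/// struct Guard {
///     core: Rc<RefCell<MiniCore>>,
///     owns_tick: bool,
/// }
///
/// fn begin(core: &Rc<RefCell<MiniCore>>) -> Guard {
///     let mut c = core.borrow_mut();
///     let owns_tick = !c.in_tick;
///     c.in_tick = true;
///     Guard { core: Rc::clone(core), owns_tick }
/// }
///
/// fn emit(core: &Rc<RefCell<MiniCore>>, value: i64) {
///     let mut c = core.borrow_mut();
///     if c.snapshot.is_none() {
///         let old = c.cache;
///         c.snapshot = Some(old);
///     }
///     c.cache = value;
///     c.pending.push(value);
/// }
///
/// impl Drop for Guard {
///     fn drop(&mut self) {
///         if !self.owns_tick {
///             return; // nested: the outermost guard drains
///         }
///         let mut c = self.core.borrow_mut();
///         if std::thread::panicking() {
///             // Discard queued deliveries and roll the cache back.
///             c.pending.clear();
///             if let Some(old) = c.snapshot.take() {
///                 c.cache = old;
///             }
///         } else {
///             let pending = std::mem::take(&mut c.pending);
///             c.delivered.extend(pending);
///             c.snapshot = None;
///         }
///         c.in_tick = false;
///     }
/// }
/// ```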
///
/// # Thread safety
///
/// `BatchGuard` is **`!Send`** by design. `begin_batch` / `begin_batch_for`
/// claim the per-`Core` `in_tick` flag AND the per-partition `wave_owner`
/// re-entrant mutex(es) on the calling thread; sending the guard to
/// another thread and dropping it there would clear `in_tick` and
/// release the wave-owner guards from a different thread than the
/// one that acquired them, breaking both the per-thread `WaveState`
/// "I own the wave scope" semantic and `parking_lot::ReentrantMutex`'s
/// ownership invariant. The `wave_guards` field is a `SmallVec` of
/// `!Send` `ArcReentrantMutexGuard<()>`; the `PhantomData<*const ()>`
/// marker is belt-and-suspenders.
///
/// Slice Y1 / Phase E (2026-05-08): the field migrated from a single
/// `ArcReentrantMutexGuard` (legacy Core-global `wave_owner`) to a
/// `SmallVec` of partition wave-owner guards. Closure-form
/// `begin_batch` acquires every current partition (serialization
/// point); `begin_batch_for(seed)` acquires only the transitively-
/// touched partitions (parallel for disjoint sets).
///
/// ```compile_fail
/// use graphrefly_core::{BatchGuard, BindingBoundary, Core, DepBatch, FnId, FnResult, HandleId, NodeId};
/// use std::sync::Arc;
///
/// struct Stub;
/// impl BindingBoundary for Stub {
///     fn invoke_fn(&self, _: NodeId, _: FnId, _: &[DepBatch]) -> FnResult {
///         FnResult::Noop { tracked: None }
///     }
///     fn custom_equals(&self, _: FnId, _: HandleId, _: HandleId) -> bool { false }
///     fn release_handle(&self, _: HandleId) {}
/// }
/// fn requires_send<T: Send>(_: T) {}
/// let core = Core::new(Arc::new(Stub) as Arc<dyn BindingBoundary>);
/// let guard = core.begin_batch();
/// requires_send(guard); // <- compile_fail: BatchGuard is !Send.
/// ```
#[must_use = "BatchGuard drains the wave on drop; assign to a named binding"]
pub struct BatchGuard {
    core: Core,
    owns_tick: bool,
    /// Re-entrant mutex guards held for the wave's duration. One entry
    /// per touched partition's `wave_owner`, in ascending
    /// [`crate::subgraph::SubgraphId`] order. `parking_lot::ReentrantMutex`
    /// imposes no release order when one thread holds every guard; the
    /// outermost owner's success path nonetheless pops them in reverse
    /// (descending) order. Cross-thread waves on any of the held
    /// partitions block until our scope ends; cross-thread waves on
    /// partitions NOT in this vector run truly parallel, the canonical
    /// Y1 parallelism property.
    ///
    /// Each `ArcReentrantMutexGuard<()>` is `!Send`, so the `SmallVec`
    /// (and thus `BatchGuard`) is `!Send` at the type level; sending
    /// across threads would violate `parking_lot::ReentrantMutex`'s
    /// thread-ownership invariant.
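    ///
    /// A hypothetical `std`-only sketch of the ascending-acquire /
    /// descending-release shape: `LoggedGuard` stands in for a partition
    /// wave-owner guard and records its id when released, which makes the
    /// difference between an explicit reverse `pop` loop and a `Vec`'s
    /// default front-to-back drop observable. None of these names are this
    /// crate's API.
    ///
    /// ```rust
    /// use std::cell::RefCell;
    ///
    /// // Hypothetical stand-in for a wave-owner guard.
    /// struct LoggedGuard<'a> {
    ///     id: u32,
    ///     log: &'a RefCell<Vec<u32>>,
    /// }
    ///
    /// impl Drop for LoggedGuard<'_> {
    ///     fn drop(&mut self) {
    ///         self.log.borrow_mut().push(self.id); // record release order
    ///     }
    /// }
    ///
    /// // Acquire in ascending id order, skipping duplicates.
    /// fn acquire_ascending<'a>(
    ///     ids: &[u32],
    ///     log: &'a RefCell<Vec<u32>>,
    /// ) -> Vec<LoggedGuard<'a>> {
    ///     let mut sorted = ids.to_vec();
    ///     sorted.sort_unstable();
    ///     sorted.dedup();
    ///     sorted.into_iter().map(|id| LoggedGuard { id, log }).collect()
    /// }
    ///
    /// // Release in descending order via an explicit pop loop.
    /// fn release_descending(mut guards: Vec<LoggedGuard<'_>>) {
    ///     while let Some(guard) = guards.pop() {
    ///         drop(guard);
    ///     }
    /// }
    /// ```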
    wave_guards: SmallVec<[crate::node::WaveOwnerGuard; 4]>,
    _not_send: std::marker::PhantomData<*const ()>,
}

impl Drop for BatchGuard {
    fn drop(&mut self) {
        if !self.owns_tick {
            // Nested guard: the outermost owner drains. Still release our
            // re-entrant partition guards in reverse acquisition order,
            // matching the outermost success path at the end of this fn.
            while let Some(guard) = self.wave_guards.pop() {
                drop(guard);
            }
            return;
        }
        if std::thread::panicking() {
            // Discard pending wave work to avoid firing sinks during
            // unwind (a sink panic during unwind would abort the process).
            //
            // Refcount discipline: pending_notify entries hold a
            // queue_notify-time retain on every payload handle, and a
            // partial drain may already have queued handles in
            // deferred_handle_releases. Release both here so the discard
            // doesn't leak.
            //
            // Wave-cache snapshots restore pre-panic cache values so the
            // atomicity guarantee covers state, not just observability.
            let (pending, deferred_releases, restored_releases) = {
                let mut s = self.core.lock_state();
                // Q-beyond Sub-slice 1 + 2 + 3 (D108, 2026-05-09):
                // WaveState borrowed alongside state for panic-discard
                // cleanup. The WaveState borrow is per-thread,
                // independent of state. Sub-slice 2 moved pending_fires
                // + pending_notify out of CoreState into WaveState;
                // sub-slice 3 moved deferred_flush_jobs + deferred_cleanup_hooks
                // + pending_wipes + invalidate_hooks_fired_this_wave;
                // /qa F1+F2 (2026-05-10) reverted in_tick + currently_firing
                // back to CoreState (cross-Core / cross-thread visibility
                // required). Those clear paths are in CoreState::clear_wave_state.
                let result = with_wave_state(|ws| {
                    let pending = std::mem::take(&mut ws.pending_notify);
                    let _: DeferredJobs = std::mem::take(&mut ws.deferred_flush_jobs);
                    ws.pending_fires.clear();
                    let restored = self.core.restore_wave_cache_snapshots(&mut s, ws);
                    // clear_wave_state pushes batch-handle releases into
                    // ws.deferred_handle_releases, so take ws's queue AFTER
                    // the clear.
                    s.clear_wave_state(ws);
                    ws.clear_wave_state();
                    let deferred_releases = std::mem::take(&mut ws.deferred_handle_releases);
                    // Slice E2 (D061): panic-discard wave drops queued
                    // OnInvalidate cleanup hooks SILENTLY. Bindings using
                    // OnInvalidate for external-resource cleanup MUST
                    // perform idempotent cleanup at process exit or on the
                    // next successful invalidate. Mirrors the A3
                    // `pending_pause_overflow` panic-discard precedent.
                    let _: Vec<(crate::handle::NodeId, crate::boundary::CleanupTrigger)> =
                        std::mem::take(&mut ws.deferred_cleanup_hooks);
                    // Slice E2 /qa Q2(b) (D069): same panic-discard discipline
                    // for the eager-wipe queue. A panic-discarded wave drops
                    // queued `wipe_ctx` fires silently; the binding-side
                    // `NodeCtxState` entry remains until the next successful
                    // terminate-with-no-subs cycle (or until `Core` drops).
                    // This mirrors D061's external-resource-cleanup gap and
                    // is documented similarly.
                    let _: Vec<crate::handle::NodeId> = std::mem::take(&mut ws.pending_wipes);
                    (pending, deferred_releases, restored)
                });
                // /qa F1 reverted (2026-05-10): in_tick lives on CoreState.
                // Cleared after the WaveState borrow drops; the lock is still
                // held, so cross-thread observers see in_tick=false
                // atomically with the wave-state cleanup completing.
                s.in_tick = false;
                result
            };
            // Lock dropped: release retains with no Core lock held, so a
            // `release_handle` that takes an internal binding mutex can't
            // form a lock-order cycle with the state lock.
            for entry in pending.values() {
                for msg in entry.iter_messages() {
                    if let Some(h) = msg.payload_handle() {
                        self.core.binding.release_handle(h);
                    }
                }
            }
            for h in deferred_releases {
                self.core.binding.release_handle(h);
            }
            for h in restored_releases {
                self.core.binding.release_handle(h);
            }
            // D1 patch (2026-05-09): clear the per-thread Slice G tier3
            // tracker on outermost wave-end (panic-discard path). The
            // thread-local outlives the BatchGuard otherwise; cargo's
            // thread reuse across tests would propagate stale entries.
            tier3_clear();
            // Release the partition wave-owner guards in reverse
            // acquisition order, matching the success path below, instead
            // of relying on `SmallVec`'s forward field drop.
            while let Some(guard) = self.wave_guards.pop() {
                drop(guard);
            }
            return;
        }
        // Successful drain: drain_and_flush manages its own locking.
        self.core.drain_and_flush();
        // Wave cleanup + extract deferred jobs under the lock.
        let (jobs, releases, cleanup_hooks, pending_wipes, snapshot_releases) = {
            let mut s = self.core.lock_state();
            // Q-beyond Sub-slice 1 + 3 (D108, 2026-05-09): WaveState
            // borrowed alongside state for wave-end cleanup. Per-thread;
            // independent of state. Sub-slice 3 moved deferred_* drains
            // into WaveState. /qa F1+F2 (2026-05-10) reverted in_tick +
            // currently_firing back to CoreState; clear via
            // CoreState::clear_wave_state under the held state lock.
            let result = with_wave_state(|ws| {
                s.clear_wave_state(ws);
                ws.clear_wave_state();
                // /qa A1 (2026-05-09) discipline preserved: drain snapshot
                // retains under lock, release lock-released below to avoid
                // binding re-entrance under held mutex / borrow.
                let snapshot_releases = Core::drain_wave_cache_snapshots(ws);
                // `drain_deferred` takes `deferred_flush_jobs` +
                // `deferred_handle_releases` (incl. rotation releases pushed
                // by `clear_wave_state` above) + Slice E2
                // `deferred_cleanup_hooks` + Slice E2 /qa Q2(b)
                // `pending_wipes`, all from WaveState post-Sub-slice-3.
                let (jobs, releases, hooks, wipes) = Core::drain_deferred(&mut s, ws);
                (jobs, releases, hooks, wipes, snapshot_releases)
            });
            // /qa F1 reverted (2026-05-10): in_tick lives on CoreState.
            // Cleared after the WaveState borrow drops; lock still held.
            s.in_tick = false;
            result
        };
        // Lock dropped: fire deferred sinks + release retains + fire
        // cleanup hooks (Slice E2 OnInvalidate, D060 catch_unwind drain)
        // + fire eager wipes (D069).
        self.core.fire_deferred(jobs, releases, cleanup_hooks, pending_wipes);
        // /qa A1 fix (2026-05-09): release wave_cache_snapshots retains
        // lock-released. Pre-A1 these were released inside the held
        // state + cross_partition locks; binding finalizers re-entering
        // Core would deadlock against either mutex. Drained earlier
        // under the lock; released here after both mutexes dropped and
        // sinks have fired.
        for h in snapshot_releases {
            self.core.binding.release_handle(h);
        }
        // D1 patch (2026-05-09): clear the per-thread Slice G tier3
        // tracker at outermost wave-end (success path). Mirrors the
        // panic-discard branch above. Thread-local outlives BatchGuard
        // by default; cargo's thread-reuse across tests would propagate
        // stale entries. Cleared after sinks fire (sink callbacks may
        // re-enter Core via emit and could read the tier3 set
        // mid-wave; the wave is over here so clearing is safe).
        tier3_clear();
        // QA-fix group 2 (2026-05-09): explicitly drop the wave guards
        // in REVERSE acquisition order. `parking_lot::ReentrantMutex`
        // doesn't care about release order for same-thread holders, but
        // a future migration to a non-reentrant lock (or one with a
        // Drop side-effect tied to ordering) would silently break if we
        // relied on `SmallVec`'s default forward-iteration drop. The
        // ascending-acquire / descending-release pattern is the
        // canonical lock-discipline shape.
        while let Some(guard) = self.wave_guards.pop() {
            drop(guard);
        }
    }
}