Skip to main content

graphrefly_core/
batch.rs

1//! Wave engine — drain loop, fire selection, emission commit, sink dispatch.
2//!
3//! Ports the wave-engine portion of the handle-protocol prototype
4//! (`~/src/graphrefly-ts/src/__experiments__/handle-core/core.ts`).
5//! Sibling to [`super::node`]; the dispatcher's other concerns
6//! (registration, subscription, pause/resume, terminal cascade,
7//! `set_deps`) live there.
8//!
9//! # Wave engine entry points
10//!
11//! - [`Core::run_wave`] — wave entry. Claims `in_tick` under the state lock,
12//!   runs `op` lock-released, then drains all transitive fn-fires and
13//!   flushes per-subscriber notifications. Each fn-fire iteration drops
14//!   the state lock around `BindingBoundary::invoke_fn` so user fn callbacks
15//!   can re-enter Core safely.
16//! - [`Core::drain_and_flush`] — drain phase + flush phase. Acquires/drops
17//!   the state lock per iteration around `invoke_fn`.
18//! - [`Core::commit_emission`] — equals-substitution + DIRTY/DATA/RESOLVED
19//!   queueing + child propagation. `&self`-only; bracket-fires
20//!   `BindingBoundary::custom_equals` lock-released.
21//! - [`Core::queue_notify`] — per-subscriber message queueing with
22//!   pause-buffer routing. Snapshots the subscriber list at first-touch-
23//!   per-wave so late subscribers (installed mid-wave between drain
24//!   iterations) don't receive duplicate deliveries from messages already
25//!   queued before they subscribed.
26//! - [`Core::deliver_data_to_consumer`] — single-edge propagation; marks
27//!   the consumer for fn-fire if its tracked-deps set is satisfied.
28//!   Called from `commit_emission`, plus `activate_derived` and
29//!   `set_deps` in [`super::node`].
30//!
31//! # Re-entrance discipline (Slice A close — M1 fully lock-released)
32//!
33//! - **Wave-end sink fires** drop the state lock first (Slice A-bigger
34//!   discipline).
35//! - **`BindingBoundary::invoke_fn`** in `fire_fn` fires lock-released —
36//!   user fn callbacks may re-enter `Core::emit` / `pause` / `resume` /
37//!   `invalidate` / `complete` / `error` / `teardown` and run a nested
38//!   wave (the existing `s.in_tick` re-entrance gate composes
39//!   transparently).
40//! - **`BindingBoundary::custom_equals`** in `commit_emission`'s equals
41//!   check fires lock-released.
42//! - **Subscribe-time handshake** is the one remaining lock-held callback.
43//!   It now fires per-tier (`[Start]`, `[Data(v)]`, `[Complete|Error]`,
44//!   `[Teardown]`) as separate sink calls, matching the canonical R1.3.5.a
45//!   tier-split. Re-entrance from a handshake sink callback panics with
46//!   the [`reentrance_guard`] diagnostic.
47
48use std::cell::RefCell;
49use std::sync::Arc;
50
51use ahash::AHashSet;
52use indexmap::map::Entry;
53
54use smallvec::SmallVec;
55
56use crate::boundary::{DepBatch, FnEmission, FnResult};
57use crate::handle::{HandleId, NodeId, NO_HANDLE};
58use crate::message::Message;
59use crate::node::{Core, CoreState, EqualsMode, OperatorOp, Sink, TerminalKind};
60
// Slice G (R1.3.2.d / R1.3.3.a) per-thread tier-3-emit tracker: the set of
// nodes that have already emitted a tier-3 (DATA / RESOLVED) message in the
// wave currently running on this thread.
//
// **Wave scope = thread-local.** GraphReFly's wave-engine guarantees
// that every emit at a given node within a single wave runs on the
// same thread (the thread that holds the partition's `wave_owner`
// `parking_lot::ReentrantMutex` — cross-thread emits at a node BLOCK
// on that mutex and so always land in the OTHER thread's wave). A
// wave is bounded above by the outermost `BatchGuard` drop on its
// originating thread. Together this means a per-thread
// `AHashSet<NodeId>` is the natural placement for "has node X already
// emitted a tier-3 message in this wave?" — the set's lifetime
// exactly matches the wave's, with no cross-thread or cross-wave
// contamination.
//
// **History (D1 patch, 2026-05-09):** previously placed on
// `crate::subgraph::SubgraphLockBox::state` per-partition (Q3 v1).
// That placement was robust to per-partition wave parallelism but
// vulnerable to mid-wave cross-thread `set_deps` partition splits:
// thread A is mid-wave on partition P (wave_owner held) but between
// fn fires (`currently_firing` empty); thread B's `set_deps` acquires
// the state lock, P13's `currently_firing.is_empty()` check
// short-circuits, the split proceeds, and X migrates from P to a
// fresh orphan-side partition with an empty
// `tier3_emitted_this_wave`. Thread A's subsequent emit at X then
// mis-detects "first emit" and queues a Resolved alongside the prior
// Data — R1.3.3.a violation. Thread-local placement is immune to this
// hazard: thread B's split doesn't touch thread A's thread-local at
// all.
//
// **Lifecycle:** populated by `Core::commit_emission` /
// `Core::commit_emission_verbatim`; cleared at the OUTERMOST
// `BatchGuard` drop on this thread (both success and panic-discard
// paths). Re-entrant nested waves on the same thread share the set —
// inner-wave emits add to the same set; the outermost drop is the
// canonical clear point. Cross-thread emits NEVER touch this thread's
// set (they serialize on the partition wave_owner; the cross-thread
// emit happens in the OTHER thread's emit-loop and uses the OTHER
// thread's tier3 thread-local).
//
// Accessed only through the `tier3_check` / `tier3_mark` / `tier3_clear`
// helpers below, each of which takes a short-lived `RefCell` borrow.
thread_local! {
    static TIER3_EMITTED_THIS_WAVE: RefCell<AHashSet<NodeId>> = RefCell::new(AHashSet::new());
}
102
103/// Has `node` emitted a tier-3 (DATA / RESOLVED) message in the current
104/// wave on this thread? See [`TIER3_EMITTED_THIS_WAVE`] for the per-thread
105/// wave-scope rationale.
106fn tier3_check(node: NodeId) -> bool {
107    TIER3_EMITTED_THIS_WAVE.with(|s| s.borrow().contains(&node))
108}
109
110/// Mark `node` as having emitted a tier-3 message in the current wave on
111/// this thread. Idempotent. See [`TIER3_EMITTED_THIS_WAVE`].
112fn tier3_mark(node: NodeId) {
113    TIER3_EMITTED_THIS_WAVE.with(|s| {
114        s.borrow_mut().insert(node);
115    });
116}
117
118/// Wave-end clear of the per-thread tier3 tracker. Called from the
119/// OUTERMOST [`BatchGuard::drop`] on this thread (both success and
120/// panic-discard paths). Inner non-owning BatchGuard drops MUST NOT
121/// invoke this — the outer wave is still in flight and inner-wave marks
122/// are part of the outer wave's Slice G coalescing state.
123fn tier3_clear() {
124    TIER3_EMITTED_THIS_WAVE.with(|s| {
125        s.borrow_mut().clear();
126    });
127}
128
/// Deferred sink-fire jobs collected during `flush_notifications`. Each
/// entry pairs a snapshot of the sink Arcs to fire with the messages to
/// deliver to them — one entry per (node × phase) cell with non-empty
/// content. Drained from `CoreState` and fired lock-released.
///
/// Tuple layout per entry: `(sinks_to_fire, messages_to_deliver)`.
pub(crate) type DeferredJobs = Vec<(Vec<Sink>, Vec<Message>)>;
134
/// Lock-released drain payload of the wave's BatchGuard:
/// `(sink_jobs, handle_releases, OnInvalidate cleanup hooks, pending wipe_ctx fires)`.
/// Returned by [`Core::drain_deferred`], consumed by [`Core::fire_deferred`].
/// Sliced into a type alias to satisfy `clippy::type_complexity`.
pub(crate) type WaveDeferred = (
    // sink_jobs: sink snapshots paired with the messages to deliver.
    DeferredJobs,
    // handle_releases: handles to release once the locks are dropped.
    Vec<HandleId>,
    // OnInvalidate cleanup hooks: (node, trigger) pairs.
    Vec<(crate::handle::NodeId, crate::boundary::CleanupTrigger)>,
    // Pending wipe_ctx fires, one entry per node.
    Vec<crate::handle::NodeId>,
);
145
/// One subscriber-snapshot epoch within a node's wave-end notification
/// queue. A `PendingBatch` is opened the first time `queue_notify` runs
/// for the node in a wave, and a fresh batch is opened whenever the node's
/// `subscribers_revision` advances mid-wave (a new sink subscribes, an
/// existing sink unsubscribes, or a handshake-time panic evicts an
/// orphaned sink). All messages within one batch flush to the same sink
/// list — the snapshot taken when the batch opened, frozen against
/// subsequent revision bumps.
pub(crate) struct PendingBatch {
    /// `NodeRecord::subscribers_revision` value at the moment this batch
    /// opened. Used by `queue_notify` to decide append-to-last-batch vs
    /// open-fresh-batch on every push.
    pub(crate) snapshot_revision: u64,
    /// Subscriber snapshot taken when this batch opened. Every message in
    /// `messages` flushes to exactly this list, regardless of later
    /// subscribe / unsubscribe activity at the node.
    pub(crate) sinks: Vec<Sink>,
    /// Messages queued for this node while this snapshot epoch was the
    /// current one.
    pub(crate) messages: Vec<Message>,
}
162
/// Per-node wave-end notification queue, structured as one or more
/// subscriber-snapshot epochs (`batches`). The common case (no
/// mid-wave subscribe / unsubscribe at this node) keeps a single
/// inline batch — `SmallVec<[_; 1]>` keeps that allocation-free.
///
/// **Slice X4 / D2 (2026-05-08):** the prior shape was a single
/// `(sinks, messages)` pair per node — the snapshot froze on first
/// `queue_notify` and was reused for every subsequent emit to the same
/// node in the wave. That caused the documented late-subscriber +
/// multi-emit-per-wave gap (R1.3.5.a divergence): a sub installed
/// between two emits to the same node was invisible to the second
/// emit's flush slice. The revision-tracked batch list resolves it —
/// late subs land in a fresh batch that frozenly carries them, while
/// pre-subscribe batches retain their original snapshot so the new
/// sub doesn't double-receive earlier emits via flush AND handshake.
pub(crate) struct PendingPerNode {
    /// Snapshot epochs for this node. `queue_notify` either appends to the
    /// last batch or opens a fresh one, depending on
    /// [`PendingBatch::snapshot_revision`]. Inline capacity of one covers
    /// the common single-epoch case without a heap allocation.
    pub(crate) batches: SmallVec<[PendingBatch; 1]>,
}
181
182impl PendingPerNode {
183    /// Iterate every queued message for this node across all batches in
184    /// arrival order. Used by R1.3.3.a invariant assertions and the
185    /// auto-resolve / Slice-G coalescing tier-3-presence checks, which
186    /// reason about wave-content per node, not per batch.
187    pub(crate) fn iter_messages(&self) -> impl Iterator<Item = &Message> + '_ {
188        self.batches.iter().flat_map(|b| b.messages.iter())
189    }
190
191    /// Mutable counterpart for `iter_messages`. Used by
192    /// `rewrite_prior_resolved_to_data` to in-place rewrite Resolved
193    /// entries to Data when a wave detects a multi-emit case after the
194    /// fact.
195    pub(crate) fn iter_messages_mut(&mut self) -> impl Iterator<Item = &mut Message> + '_ {
196        self.batches.iter_mut().flat_map(|b| b.messages.iter_mut())
197    }
198}
199
/// RAII helper for the A6 reentrancy guard (Slice F, 2026-05-07).
///
/// Pushes `node_id` onto [`CoreState::currently_firing`] on construction,
/// pops it on Drop. [`Core::set_deps`] consults the stack and rejects
/// `set_deps(N, ...)` from inside N's own fn-fire with
/// [`crate::node::SetDepsError::ReentrantOnFiringNode`] — closing the
/// D1 hazard where Phase-1's snapshot of `dep_handles` would refer to
/// a different dep ordering than Phase-3's `tracked` storage.
///
/// Wraps the lock-released `invoke_fn` (and operator-equivalent FFI
/// callbacks like `project_each` / `predicate_each`). Drop fires even
/// on panic, so the stack stays balanced under user-fn unwinds.
///
/// Membership semantics (NOT strict LIFO): the only consumer of
/// `currently_firing` is `Core::set_deps`'s reentrancy check, which uses
/// `contains(&n)` — a set-membership test. Drop removes the right-most
/// entry equal to `node_id` via `rposition` + `swap_remove`. Because
/// `swap_remove` fills the vacated slot with the LAST element, removing
/// a non-final entry reorders the stack: if B's guard drops while the
/// stack is `[A, B, A]` (A's fn re-enters B, B's fn re-enters A),
/// `rposition` finds B at index 1 and the trailing A is moved into that
/// slot, leaving `[A, A]`. The physical order of the remaining entries
/// may therefore not match construction order, but membership is
/// preserved. If a future call site needs strict LIFO (e.g. "pop the
/// most recently fired node"), switch to `pop()` + assert the popped
/// value equals `self.node_id`. (QA A6, 2026-05-07)
pub(crate) struct FiringGuard {
    /// Handle to the owning `Core`; Drop uses it to re-acquire the state
    /// lock and pop `node_id`.
    core: Core,
    /// The node whose fire this guard brackets.
    node_id: NodeId,
    /// Phase H+ option (d) /qa N1(a) widened variant (2026-05-09):
    /// whether this guard participates in the per-thread
    /// `IN_PRODUCER_BUILD` accounting. Producer-pattern operator
    /// activations (`zip` / `concat` / `race` / `take_until` /
    /// `switch_map` / `exhaust_map` / `concat_map` / `merge_map`
    /// — all `is_producer()`) DO participate: they SUPPRESS the H+
    /// check during their build/project closure because those
    /// closures legitimately subscribe to upstream sources
    /// cross-partition (operator-internal activation-time setup,
    /// not user-fn re-entry). Refactoring those operators to defer
    /// inner subscribes to wave-end is the broader Phase H+ STRICT
    /// variant scope; the limited variant carves them out via this
    /// flag. Derived / dynamic / state user-fn fires do NOT
    /// participate — the H+ check applies to them under the
    /// "held non-empty AND not in producer build" gate (see
    /// `crate::node::held_partitions` module docstring).
    ///
    /// **INVARIANT:** the value is captured at `FiringGuard::new`
    /// from `NodeRecord::is_producer()` for the firing node and
    /// must NEVER be re-derived at `Drop` time. `is_producer` is
    /// stable for a node's lifetime per the current registration
    /// API (a node's kind cannot change once registered), but a
    /// future contributor adding mutability MUST honor this snapshot
    /// to keep the `producer_build_enter` / `producer_build_exit`
    /// pair balanced. A debug_assert in Drop verifies the snapshot
    /// still matches when assertions are enabled.
    is_producer_build: bool,
}
254
impl FiringGuard {
    /// Registers `node_id` as currently firing and returns the guard that
    /// undoes the registration on Drop.
    ///
    /// Under one state-lock acquisition this pushes `node_id` onto
    /// `currently_firing` and snapshots whether the node is a producer
    /// (a node missing from `s.nodes` counts as non-producer). If it is a
    /// producer, `crate::node::producer_build_enter()` is called after the
    /// lock has been released; Drop issues the matching
    /// `producer_build_exit()`.
    pub(crate) fn new(core: &Core, node_id: NodeId) -> Self {
        // Detect node kind under the same lock used to push
        // currently_firing. Producer-pattern nodes (build/project
        // closures) suppress the H+ check; everything else
        // (state / derived / dynamic / operators) is subject to it.
        let is_producer = {
            let mut s = core.lock_state();
            s.currently_firing.push(node_id);
            s.nodes
                .get(&node_id)
                .is_some_and(crate::node::NodeRecord::is_producer)
        };
        // Ordering per /qa A4: build the guard (capturing the
        // `is_producer_build` snapshot) FIRST, then call
        // `producer_build_enter()` as a separate step, so that once the
        // guard exists every exit path pops `currently_firing` via Drop.
        //
        // NOTE(review): `guard` is already a bound local when
        // `producer_build_enter()` runs, so if that call (or any code
        // later inserted before it) panicked, Drop WOULD run with
        // `is_producer_build == true` and call `producer_build_exit()`
        // without a matching enter. This is harmless only as long as
        // nothing between the binding and the enter call can panic —
        // confirm before adding fallible code here.
        let guard = Self {
            core: core.clone(),
            node_id,
            is_producer_build: is_producer,
        };
        if is_producer {
            crate::node::producer_build_enter();
        }
        guard
    }
}
287
288impl Drop for FiringGuard {
289    fn drop(&mut self) {
290        // INVARIANT (debug-asserted): `is_producer_build` must match
291        // the node's current `is_producer()` at Drop time. If a future
292        // refactor introduces post-construction node-kind mutation,
293        // this fails loudly under debug builds — the
294        // `producer_build_enter` / `producer_build_exit` pair would
295        // otherwise become unbalanced. Per /qa A5.
296        #[cfg(debug_assertions)]
297        {
298            let s = self.core.lock_state();
299            let now_producer = s
300                .nodes
301                .get(&self.node_id)
302                .is_some_and(crate::node::NodeRecord::is_producer);
303            // Allow node-removed-mid-fire (now `is_some_and(...)` is
304            // false) — that's a benign asymmetry (the producer flag
305            // was true at construction; node removed before drop).
306            // Real concern: a node that was non-producer at
307            // construction is now reported as producer (or vice versa
308            // for an existing node).
309            if s.nodes.contains_key(&self.node_id) {
310                debug_assert_eq!(
311                    self.is_producer_build,
312                    now_producer,
313                    "FiringGuard invariant violation: node {:?} was {} at \
314                     construction but is {} at Drop. The is_producer flag \
315                     must be stable for a node's lifetime; see FiringGuard \
316                     struct docstring.",
317                    self.node_id,
318                    if self.is_producer_build {
319                        "is_producer=true"
320                    } else {
321                        "is_producer=false"
322                    },
323                    if now_producer {
324                        "is_producer=true"
325                    } else {
326                        "is_producer=false"
327                    },
328                );
329            }
330            drop(s);
331        }
332        let mut s = self.core.lock_state();
333        if let Some(pos) = s.currently_firing.iter().rposition(|n| *n == self.node_id) {
334            s.currently_firing.swap_remove(pos);
335        }
336        // else: already popped by an external rebalance — silent no-op
337        // for Drop discipline (panic-in-Drop is poison).
338        drop(s);
339        // Phase H+ pair: decrement IN_PRODUCER_BUILD IFF we incremented
340        // in `new`. Done lock-released so the next thread waiting on
341        // the state lock can proceed without our cell access on its
342        // hot path.
343        if self.is_producer_build {
344            crate::node::producer_build_exit();
345        }
346    }
347}
348
349/// Borrow the per-operator scratch slot as `&T`. Panics if the slot is
350/// uninitialized or the contained type doesn't match `T` — both are
351/// invariant violations for any `fire_op_*` helper that should only be
352/// called from `fire_operator`'s match arm for the matching variant.
353fn scratch_ref<T: crate::op_state::OperatorScratch>(s: &CoreState, node_id: NodeId) -> &T {
354    s.require_node(node_id)
355        .op_scratch
356        .as_ref()
357        .expect("op_scratch slot uninitialized for operator node")
358        .as_any_ref()
359        .downcast_ref::<T>()
360        .expect("op_scratch type mismatch")
361}
362
363/// Mutable borrow of the per-operator scratch slot. Same invariants as
364/// [`scratch_ref`].
365fn scratch_mut<T: crate::op_state::OperatorScratch>(s: &mut CoreState, node_id: NodeId) -> &mut T {
366    s.require_node_mut(node_id)
367        .op_scratch
368        .as_mut()
369        .expect("op_scratch slot uninitialized for operator node")
370        .as_any_mut()
371        .downcast_mut::<T>()
372        .expect("op_scratch type mismatch")
373}
374
375impl Core {
376    // -------------------------------------------------------------------
377    // Wave entry + drain
378    // -------------------------------------------------------------------
379
380    /// Wave entry. The caller passes a closure that performs the wave's
381    /// triggering operation (`commit_emission`, `terminate_node`, etc.).
382    /// The closure runs lock-released; closure-internal Core methods
383    /// acquire the state lock as they go.
384    ///
385    /// **Implementation:** delegates to [`Self::begin_batch`] for the
386    /// wave's RAII lifecycle. The returned `BatchGuard` holds the
387    /// `wave_owner` re-entrant mutex for the wave's duration (cross-thread
388    /// emits block; same-thread re-entry passes through), claims `in_tick`,
389    /// and on drop runs the drain + flush + sink-fire phases — OR, if the
390    /// closure panicked, the panic-discard path that restores cache
391    /// snapshots and clears in_tick. This unification gives `run_wave` the
392    /// same panic-safety guarantee as the user-facing `Core::batch`.
393    ///
394    /// **Re-entrance:** a closure invoked from inside another wave — the
395    /// inner `run_wave`'s `begin_batch` observes `in_tick=true`, the
396    /// returned guard is non-owning (`owns_tick=false`), drop is a no-op.
397    /// The outer wave's drain picks up the inner closure's queued work.
398    ///
399    /// **Lock-release discipline (Slice A close, M1):** all binding-side
400    /// callbacks except the subscribe-time handshake fire lock-released.
401    /// Sinks that re-enter Core run a nested wave; user fns that re-enter
402    /// Core run a nested wave; custom-equals oracles that re-enter Core
403    /// run a nested wave. Cross-thread emits block at `wave_owner` until
404    /// the in-flight wave's drain completes — preserving the user-facing
405    /// "emit returning means subscribers have observed" contract.
406    /// Wave entry with a known `seed` node. Acquires only the partitions
407    /// transitively touched from `seed` (downstream cascade via
408    /// `s.children` + R1.3.9.d meta-companion cascade) instead of every
409    /// current partition. The canonical Y1 parallelism win for per-seed
410    /// entry points (`Core::emit`, `Core::subscribe`'s activation,
411    /// `Core::pause` / `Core::resume` / `Core::invalidate` / `Core::complete`
412    /// / `Core::error` / `Core::teardown` / `Core::set_deps`'s
413    /// push-on-subscribe).
414    ///
415    /// Two threads with disjoint touched-partition sets run truly
416    /// parallel — they don't block each other on Core-global locks.
417    /// Same-thread re-entry passes through each partition's
418    /// `ReentrantMutex` transparently. Cross-thread emits on the SAME
419    /// partition (or any overlapping touched-partition set) serialize
420    /// per the per-partition `wave_owner` mutex, preserving the
421    /// "emit returning means subscribers have observed" contract.
422    ///
423    /// Slice Y1 / Phase E (2026-05-08).
424    pub(crate) fn run_wave_for<F>(&self, seed: crate::handle::NodeId, op: F)
425    where
426        F: FnOnce(&Self),
427    {
428        let _guard = self.begin_batch_for(seed);
429        op(self);
430    }
431
432    /// Drain retains held by `wave_cache_snapshots` and return them so
433    /// the caller can release them lock-released. Called from the
434    /// wave-success path in [`BatchGuard::drop`].
435    ///
436    /// Q2 (2026-05-09): the snapshots map moved to
437    /// [`crate::node::CrossPartitionState`]; signature takes the `cps`
438    /// guard. **/qa A1 fix (2026-05-09):** changed from in-place
439    /// `release_handle` to returning handles for lock-released drop.
440    /// Pre-A1 this function called `binding.release_handle` while the
441    /// caller still held both the `state` and `cross_partition` locks;
442    /// `release_handle` may re-enter Core via finalizers, and re-entry
443    /// under either lock would deadlock against any path that acquires
444    /// the same mutex. Now mirrors [`Self::restore_wave_cache_snapshots`]
445    /// — drain under lock, release after lock drop.
446    #[must_use]
447    pub(crate) fn drain_wave_cache_snapshots(
448        cps: &mut crate::node::CrossPartitionState,
449    ) -> Vec<HandleId> {
450        if cps.wave_cache_snapshots.is_empty() {
451            return Vec::new();
452        }
453        std::mem::take(&mut cps.wave_cache_snapshots)
454            .into_iter()
455            .map(|(_, h)| h)
456            .collect()
457    }
458
459    /// Restore cache slots from `wave_cache_snapshots` and clear the map.
460    /// Called from the wave-abort path in `BatchGuard::drop` (panic).
461    ///
462    /// For each snapshotted node:
463    ///
464    /// 1. Read the current cache (the in-flight new value).
465    /// 2. Set `cache = old_handle` (the snapshot's retained value).
466    /// 3. Release the now-unowned current cache handle.
467    ///
468    /// Returns the list of "current" handles to release outside the lock.
469    /// Q2 (2026-05-09): the snapshots map moved to
470    /// [`crate::node::CrossPartitionState`]; signature now takes both
471    /// `s` (for cache slots) and `cps` (for the snapshots map).
472    pub(crate) fn restore_wave_cache_snapshots(
473        &self,
474        s: &mut CoreState,
475        cps: &mut crate::node::CrossPartitionState,
476    ) -> Vec<HandleId> {
477        if cps.wave_cache_snapshots.is_empty() {
478            return Vec::new();
479        }
480        let snapshots = std::mem::take(&mut cps.wave_cache_snapshots);
481        let mut releases = Vec::with_capacity(snapshots.len());
482        for (node_id, old_handle) in snapshots {
483            let Some(rec) = s.nodes.get_mut(&node_id) else {
484                releases.push(old_handle);
485                continue;
486            };
487            let current = std::mem::replace(&mut rec.cache, old_handle);
488            if current != NO_HANDLE {
489                releases.push(current);
490            }
491        }
492        releases
493    }
494
    /// Drain pending fires until quiescent, then flush wave-end notifications
    /// to subscribers. Each fire iteration drops the state lock around the
    /// binding's `invoke_fn` callback so user fns may re-enter Core safely.
    ///
    /// `&self`-only — manages its own locking. Called from [`Self::run_wave`]
    /// and [`super::node::Core::activate_derived`] (via `run_wave`).
    ///
    /// Phases, in order:
    ///
    /// 1. **Drain loop.** Each iteration first synthesizes queued
    ///    pause-overflow ERRORs (only when `pending_fires` is empty), then
    ///    picks one node via [`Self::pick_next_fire`] and fires it with
    ///    `fire_fn`. The loop ends when `pending_fires` is empty.
    /// 2. **Auto-resolve sweep.** Every node taken from
    ///    `pending_auto_resolve` that has a `pending_notify` entry with no
    ///    message of `tier() >= 3` gets a `Message::Resolved` queued.
    /// 3. **Flush.** `flush_notifications` runs under the state lock.
    ///
    /// # Panics
    ///
    /// Panics if the per-call iteration counter reaches
    /// `max_batch_drain_iterations` (re-read from state on every
    /// iteration).
    pub(crate) fn drain_and_flush(&self) {
        // Number of fire iterations started in this call; compared
        // against the configured cap below.
        let mut guard = 0u32;
        loop {
            // R1.3.8.c (Slice F, A3): if no fires are pending but there are
            // queued pause-overflow ERRORs, synthesize them now. The
            // resulting ERROR cascade may add to pending_fires (children
            // settling their terminal state), so we loop back to drain.
            let synth_pending = {
                // Q2 (2026-05-09): pending_pause_overflow lives on
                // CrossPartitionState. Lock-discipline: state → cross_partition.
                let s = self.lock_state();
                if s.pending_fires.is_empty() {
                    let mut cps = self.lock_cross_partition();
                    if cps.pending_pause_overflow.is_empty() {
                        Vec::new()
                    } else {
                        std::mem::take(&mut cps.pending_pause_overflow)
                    }
                } else {
                    Vec::new()
                }
            };
            // Both guards above are dropped at the end of the block, so the
            // binding hook and `self.error` below run lock-released.
            for entry in synth_pending {
                // Lock-released call to the binding hook. Default impl
                // returns None — the binding has opted out of R1.3.8.c
                // and we fall back to silent-drop + ResumeReport.dropped.
                // NOTE(review): `lock_held_ns / 1_000_000` is an integer
                // ns → ms conversion (sub-millisecond holds become 0);
                // presumably the hook's last parameter is in milliseconds —
                // confirm against the trait signature.
                let handle = self.binding.synthesize_pause_overflow_error(
                    entry.node_id,
                    entry.dropped_count,
                    entry.configured_max,
                    entry.lock_held_ns / 1_000_000,
                );
                if let Some(h) = handle {
                    // Re-enter Core::error to terminate the node and
                    // cascade. We're inside a wave (`in_tick = true`),
                    // so error() gets a non-owning batch guard — it
                    // doesn't try to start its own drain. The cascade
                    // queues into our outer drain via pending_fires
                    // and pending_notify.
                    self.error(entry.node_id, h);
                }
            }

            // Pick next fire under a short lock. Also re-read the configured
            // drain cap so callers can tune via `Core::set_max_batch_drain_iterations`
            // without restarting waves mid-flight.
            let (next, cap, pending_size) = {
                let s = self.lock_state();
                if s.pending_fires.is_empty() {
                    break;
                }
                let cap = s.max_batch_drain_iterations;
                let pending_size = s.pending_fires.len();
                let next = self.pick_next_fire(&s);
                (next, cap, pending_size)
            };
            // NOTE(review): the counter is bumped BEFORE the strict `<`
            // check, so the assertion trips when the `cap`-th fire is about
            // to start — i.e. at most `cap - 1` fires run per call. Confirm
            // this matches the intended meaning of "exceeded {cap}
            // iterations".
            guard += 1;
            assert!(
                guard < cap,
                "wave drain exceeded {cap} iterations \
                 (pending_fires={pending_size}). Most likely cause: a runtime \
                 cycle introduced by an operator that re-arms its own pending_fires \
                 slot from inside `invoke_fn` (e.g. a producer that subscribes to \
                 itself, or a fn that calls Core::emit on a node whose fn fires \
                 the original node again). Structural cycles via set_deps are \
                 rejected at edge-mutation time. Tune via Core::set_max_batch_drain_iterations \
                 only with concrete evidence the workload needs more iterations."
            );
            let Some(next) = next else { break };
            // fire_fn manages its own locking around invoke_fn.
            self.fire_fn(next);
        }
        // Auto-resolve sweep: nodes registered in pending_auto_resolve
        // by the RESOLVED child propagation need a Resolved if they didn't
        // fire and settle via their own commit_emission. Check pending_notify
        // for each candidate — if it has an entry with no tier-3+ message,
        // the node never settled and needs auto-Resolved. Route through
        // queue_notify so paused nodes get the Resolved into their pause
        // buffer.
        let mut s = self.lock_state();
        // Q2 (2026-05-09): pending_auto_resolve lives on CrossPartitionState.
        // /qa A5 fix (2026-05-09): explicit scope for the cross_partition
        // guard so it drops BEFORE the for-loop. Inside the loop,
        // `queue_notify` re-acquires `cross_partition` for
        // `pending_pause_overflow.push` — re-entrance on the
        // non-reentrant `parking_lot::Mutex<CrossPartitionState>` would
        // self-deadlock. Pre-fix relied on Rust's
        // temporary-end-of-statement drop to release the guard between
        // the take and the loop; refactoring the temp into a named
        // binding (a future maintainer's natural simplification) would
        // silently extend the lock-hold across `queue_notify` and
        // deadlock. Explicit scope makes the lifetime load-bearing.
        let candidates = {
            let mut cps = self.lock_cross_partition();
            std::mem::take(&mut cps.pending_auto_resolve)
        };
        for node_id in candidates {
            // A candidate with no `pending_notify` entry at all is skipped
            // (`is_some_and` on `None` is false).
            let needs_resolve = s
                .pending_notify
                .get(&node_id)
                .is_some_and(|entry| !entry.iter_messages().any(|m| m.tier() >= 3));
            if needs_resolve {
                self.queue_notify(&mut s, node_id, Message::Resolved);
            }
        }
        // Final flush phase — populates deferred_flush_jobs
        // from pending_notify (already carries per-node sink snapshots).
        self.flush_notifications(&mut s);
    }
610
611    /// Pick a node whose **transitive** upstream is fully settled.
612    ///
613    /// A node `id` is ready to fire iff none of its transitive ancestors
614    /// (via the `deps` chain) is currently in `pending_fires`. Diamond
615    /// glitch prevention requires transitive (not immediate-only) reasoning
616    /// — for graph `A → {B, C, E}; D = combine(B, C); F = combine(D, E)`,
617    /// a wave that fires `E` first adds `F` to `pending_fires` before `D`
618    /// is added. An immediate-only readiness check would mistakenly pick
619    /// `F` because neither `D` nor `E` are in `pending_fires` at that
620    /// moment, firing `F` against the stale activation-time `D` cache and
621    /// again later when `D` actually settles. Transitive upstream walk
622    /// catches `B`/`C` pending and correctly defers `F`.
623    ///
624    /// Cost: O(V) per candidate; worst case O(N·V) per pick. The existing
625    /// porting-deferred entry on `pick_next_fire` perf flagged this as a
626    /// future per-node `unresolved_dep_count` refactor.
627    fn pick_next_fire(&self, s: &CoreState) -> Option<NodeId> {
628        for &id in &s.pending_fires {
629            if Self::transitive_upstream_settled(s, id) {
630                return Some(id);
631            }
632        }
633        // Cycle / no eligible candidate (every node has an upstream pending,
634        // possibly via a cycle path): pick any so the drain guard advances.
635        // The drain-iteration cap will catch genuine cycles.
636        s.pending_fires.iter().copied().next()
637    }
638
639    fn transitive_upstream_settled(s: &CoreState, node_id: NodeId) -> bool {
640        let rec = s.require_node(node_id);
641        if rec.dep_count() == 0 {
642            return true;
643        }
644        let mut visited: ahash::AHashSet<NodeId> = ahash::AHashSet::new();
645        let mut stack: Vec<NodeId> = rec.dep_ids_vec();
646        while let Some(id) = stack.pop() {
647            if !visited.insert(id) {
648                continue;
649            }
650            if s.pending_fires.contains(&id) {
651                return false;
652            }
653            if let Some(r) = s.nodes.get(&id) {
654                for dep_id in r.dep_ids() {
655                    if !visited.contains(&dep_id) {
656                        stack.push(dep_id);
657                    }
658                }
659            }
660        }
661        true
662    }
663
664    /// Wave drain entry point. Dispatches via `rec.op` to either the
665    /// regular fn-fire path ([`Self::fire_regular`]) or the operator
666    /// dispatch ([`Self::fire_operator`]).
667    pub(crate) fn fire_fn(&self, node_id: NodeId) {
668        let op = {
669            let s = self.lock_state();
670            s.nodes.get(&node_id).and_then(|r| r.op)
671        };
672        match op {
673            Some(operator_op) => self.fire_operator(node_id, operator_op),
674            None => {
675                // State / Derived / Dynamic / Producer all dispatch via fn_id.
676                self.fire_regular(node_id);
677            }
678        }
679    }
680
681    /// Fire a node's fn lock-released around `invoke_fn`.
682    ///
683    /// Phase 1 (lock-held): remove from pending_fires, snapshot fn_id +
684    /// dep_records → DepBatch + kind. Skip if terminal, first-run-gate-closed,
685    /// or stateless.
686    ///
687    /// Phase 2 (lock-released): call `BindingBoundary::invoke_fn`. User fn
688    /// callbacks may re-enter Core (`emit`, `pause`, etc.) and run a nested
689    /// wave — the in_tick gate composes naturally because nested calls
690    /// observe `in_tick = true` and skip their own drain.
691    ///
692    /// Phase 3 (lock-held): mark `has_fired_once`, store dynamic-tracked,
693    /// decide between Noop+RESOLVED, single Data, or Batch.
694    ///
695    /// Phase 4: commit emissions. Single Data goes through
696    /// `commit_emission` (with equals substitution). Batch emissions are
697    /// processed in sequence — Data via `commit_emission_verbatim` (no
698    /// equals substitution per R1.3.2.d / R1.3.3.c), Complete/Error via
699    /// terminal cascade.
700    #[allow(clippy::too_many_lines)] // Slice G added Noop / Batch tier-3 guards
701    fn fire_regular(&self, node_id: NodeId) {
702        enum FireAction {
703            None,
704            SingleData(HandleId),
705            Batch(SmallVec<[FnEmission; 2]>),
706        }
707
708        // Phase 1: snapshot inputs — build DepBatch per dep from dep_records.
709        // `has_fired_once` is captured here for the Slice E2 OnRerun gate
710        // (Phase 1.5 below): the cleanup hook only fires when the fn has
711        // run at least once already in this activation cycle.
712        let prep: Option<(crate::handle::FnId, Vec<DepBatch>, bool, bool)> = {
713            let mut s = self.lock_state();
714            s.pending_fires.remove(&node_id);
715            let rec = s.require_node(node_id);
716            // Skip: terminal, first-run-gate-closed (R2.5.3 / R5.4 — partial
717            // mode opts out of the gate per D011), or stateless.
718            if rec.terminal.is_some() || (!rec.partial && rec.has_sentinel_deps()) {
719                None
720            } else {
721                rec.fn_id.map(|fn_id| {
722                    let dep_batches: Vec<DepBatch> = rec
723                        .dep_records
724                        .iter()
725                        .map(|dr| DepBatch {
726                            data: dr.data_batch.clone(),
727                            prev_data: dr.prev_data,
728                            involved: dr.involved_this_wave,
729                        })
730                        .collect();
731                    (fn_id, dep_batches, rec.is_dynamic, rec.has_fired_once)
732                })
733            }
734        };
735        let Some((fn_id, dep_batches, is_dynamic, has_fired_once)) = prep else {
736            return;
737        };
738
739        // Phase 1.5 (Slice E2 — R2.4.5 OnRerun, lock-released per D045): if
740        // the fn has fired at least once in this activation cycle, fire its
741        // OnRerun cleanup hook BEFORE the next invoke_fn re-allocates fn-
742        // local resources. First-fire is intentionally skipped — there is
743        // no prior run to clean up. Fires OUTSIDE `FiringGuard` because
744        // cleanup re-entrance is not the A6 reentrancy concern (which
745        // protects against `set_deps(self, ...)` from inside the in-flight
746        // invoke_fn). Operator nodes never reach this path (`fire_regular`
747        // is the fn-id branch of `fire_fn`; operators dispatch via
748        // `fire_operator`), so cleanup hooks correctly only fire for fn-
749        // shaped nodes (state / derived / dynamic / producer).
750        if has_fired_once {
751            self.binding
752                .cleanup_for(node_id, crate::boundary::CleanupTrigger::OnRerun);
753        }
754
755        // Phase 2: invoke fn lock-released. A6 reentrancy guard is scoped to
756        // the FFI call only — Phase 3's lock-held state mutation is not part
757        // of "currently firing" because set_deps would already block on the
758        // state lock by then. Drop on the guard pops the stack even if
759        // invoke_fn panics, keeping `currently_firing` balanced.
760        let result = {
761            let _firing = FiringGuard::new(self, node_id);
762            self.binding.invoke_fn(node_id, fn_id, &dep_batches)
763        };
764
765        // Phase 3: apply result under the lock — defensive terminal check
766        // (a sibling cascade may have terminated this node during phase 2).
767        let action: FireAction = {
768            let mut s = self.lock_state();
769            // Defensive: node may have terminated mid-phase-2 via a sibling
770            // cascade (a fn that re-entered `Core::error` on a path that
771            // cascaded here). If so, release any payload handles and no-op.
772            if s.require_node(node_id).terminal.is_some() {
773                match &result {
774                    FnResult::Data { handle, .. } => {
775                        self.binding.release_handle(*handle);
776                    }
777                    FnResult::Batch { emissions, .. } => {
778                        for em in emissions {
779                            match em {
780                                FnEmission::Data(h) | FnEmission::Error(h) => {
781                                    self.binding.release_handle(*h);
782                                }
783                                FnEmission::Complete => {}
784                            }
785                        }
786                    }
787                    FnResult::Noop { .. } => {}
788                }
789                return;
790            }
791            let rec = s.require_node_mut(node_id);
792            rec.has_fired_once = true;
793            if is_dynamic {
794                let tracked = match &result {
795                    FnResult::Data { tracked, .. }
796                    | FnResult::Noop { tracked }
797                    | FnResult::Batch { tracked, .. } => tracked.clone(),
798                };
799                if let Some(t) = tracked {
800                    rec.tracked = t.into_iter().collect();
801                }
802            }
803            match result {
804                FnResult::Noop { .. } => {
805                    // Slice G: skip Resolved if a prior emission in the same
806                    // wave already queued tier-3 (would violate R1.3.3.a).
807                    let already_dirty = s.require_node(node_id).dirty;
808                    let already_tier3 = s
809                        .pending_notify
810                        .get(&node_id)
811                        .is_some_and(|entry| entry.iter_messages().any(|m| m.tier() == 3));
812                    if already_dirty && !already_tier3 {
813                        self.queue_notify(&mut s, node_id, Message::Resolved);
814                    }
815                    FireAction::None
816                }
817                FnResult::Data { handle, .. } => FireAction::SingleData(handle),
818                FnResult::Batch { emissions, .. } if emissions.is_empty() => {
819                    // Empty Batch is equivalent to Noop — settle with
820                    // RESOLVED if the node was dirty (R1.3.1.a). Slice G:
821                    // skip if a prior emission already queued tier-3.
822                    let already_dirty = s.require_node(node_id).dirty;
823                    let already_tier3 = s
824                        .pending_notify
825                        .get(&node_id)
826                        .is_some_and(|entry| entry.iter_messages().any(|m| m.tier() == 3));
827                    if already_dirty && !already_tier3 {
828                        self.queue_notify(&mut s, node_id, Message::Resolved);
829                    }
830                    FireAction::None
831                }
832                FnResult::Batch { emissions, .. } => FireAction::Batch(emissions),
833            }
834        };
835
836        // Phase 4: commit emissions.
837        match action {
838            FireAction::None => {}
839            // Single Data — equals substitution applies (R1.3.2).
840            FireAction::SingleData(handle) => {
841                self.commit_emission(node_id, handle);
842            }
843            // Batch — process in sequence. No equals substitution
844            // (R1.3.2.d / R1.3.3.c: multi-message waves pass verbatim).
845            FireAction::Batch(emissions) => {
846                self.commit_batch(node_id, emissions);
847            }
848        }
849    }
850
851    /// Process a `FnResult::Batch` emissions sequence. Each `Data` goes
852    /// through `commit_emission_verbatim` (no equals substitution per
853    /// R1.3.2.d / R1.3.3.c). Terminal emissions (`Complete` / `Error`)
854    /// cascade per R1.3.4; processing stops at the first terminal and
855    /// remaining handles are released (R1.3.4.a: no further messages
856    /// after terminal).
857    fn commit_batch(&self, node_id: NodeId, emissions: SmallVec<[FnEmission; 2]>) {
858        let mut iter = emissions.into_iter();
859        for em in iter.by_ref() {
860            match em {
861                FnEmission::Data(handle) => {
862                    self.commit_emission_verbatim(node_id, handle);
863                }
864                FnEmission::Complete => {
865                    self.complete(node_id);
866                    break;
867                }
868                FnEmission::Error(handle) => {
869                    self.error(node_id, handle);
870                    break;
871                }
872            }
873        }
874        // Release handles from any emissions after the terminal break.
875        for em in iter {
876            match em {
877                FnEmission::Data(h) | FnEmission::Error(h) => {
878                    self.binding.release_handle(h);
879                }
880                FnEmission::Complete => {}
881            }
882        }
883    }
884
885    // -------------------------------------------------------------------
886    // Emission commit — equals-substitution lives here
887    // -------------------------------------------------------------------
888
889    /// Apply a node's emission. `&self`-only; brackets the equals check
890    /// around a lock release so `BindingBoundary::custom_equals` can re-enter
891    /// Core safely.
892    ///
893    /// Phase 1 (lock-held): defensive terminal short-circuit; snapshot
894    /// equals_mode + old cache handle.
895    ///
896    /// Phase 2 (lock-released): call `handles_equal` — `EqualsMode::Identity`
897    /// is a pure `u64` compare with no boundary call; `EqualsMode::Custom`
898    /// crosses to the binding's `custom_equals` oracle, which may re-enter
899    /// Core.
900    ///
901    /// Phase 3 (lock-held): set cache, queue Dirty + Data/Resolved into
902    /// pending_notify (which snapshots subscribers on first touch),
903    /// propagate to children.
904    // Q2 / Q3 (2026-05-09) tipped past clippy's 100-line threshold; the
905    // function is already a multi-phase wave-engine routine and breaking
906    // out the four phases would obscure the lock-discipline.
907    #[allow(clippy::too_many_lines)]
908    pub(crate) fn commit_emission(&self, node_id: NodeId, new_handle: HandleId) {
909        assert!(
910            new_handle != NO_HANDLE,
911            "NO_HANDLE is not a valid DATA payload (R1.2.4) for node {node_id:?}",
912        );
913
914        // Phase 1: terminal short-circuit + snapshot equals/cache.
915        let snapshot = {
916            let s = self.lock_state();
917            let rec = s.require_node(node_id);
918            if rec.terminal.is_some() {
919                drop(s);
920                self.binding.release_handle(new_handle);
921                return;
922            }
923            (rec.cache, rec.equals)
924        };
925        let (old_handle, equals_mode) = snapshot;
926
927        // Slice G (2026-05-07): R1.3.2.d says equals substitution only
928        // fires for SINGLE-DATA waves at one node. Detect "this is a
929        // subsequent emit in the same wave at this node" via the
930        // per-thread `TIER3_EMITTED_THIS_WAVE` thread-local
931        // (D1 patch, 2026-05-09 — moved off per-partition state to be
932        // robust against mid-wave cross-thread `set_deps` partition
933        // splits). If set → multi-emit wave: skip equals, queue Data
934        // verbatim, retroactively rewrite any prior Resolved (queued by
935        // an earlier same-value emit's equals match) to Data using the
936        // wave-start cache snapshot. Outside batch / first emit:
937        // standard per-emit equals path. Thread-local lookup is
938        // ~5ns and lock-free.
939        let is_subsequent_emit_in_wave = tier3_check(node_id);
940
941        if is_subsequent_emit_in_wave {
942            // Multi-emit wave detected. Skip equals, queue Data verbatim.
943            // Also rewrite any prior Resolved entries to Data using the
944            // wave-start cache snapshot.
945            self.rewrite_prior_resolved_to_data(node_id);
946            self.commit_emission_verbatim(node_id, new_handle);
947            return;
948        }
949
950        // Phase 2: equals check (lock-released for Custom).
951        let is_data = !self.handles_equal_lock_released(equals_mode, old_handle, new_handle);
952
953        // Phase 3: apply emission under the lock. Defensive terminal
954        // re-check — a concurrent cascade between phase 2 and phase 3
955        // could have terminated the node.
956        let mut s = self.lock_state();
957        if s.require_node(node_id).terminal.is_some() {
958            drop(s);
959            self.binding.release_handle(new_handle);
960            return;
961        }
962
963        // R1.3.1.a condition (b): synthesize DIRTY only if node not already
964        // dirty from an earlier emission in the same wave.
965        let already_dirty = s.require_node(node_id).dirty;
966        s.require_node_mut(node_id).dirty = true;
967        if !already_dirty {
968            self.queue_notify(&mut s, node_id, Message::Dirty);
969        }
970
971        if is_data {
972            // P3 (Slice A close /qa): re-read CURRENT cache. Same-thread
973            // re-entry from a `custom_equals` oracle that called back into
974            // `Core::emit` on this same node during phase 2's lock-released
975            // equals check could have advanced the cache between phase 1's
976            // snapshot (`old_handle`) and this point.
977            let current_cache = s.require_node(node_id).cache;
978            // Q2 (2026-05-09): wave_cache_snapshots lives on CrossPartitionState.
979            let snapshot_taken = if s.in_tick && current_cache != NO_HANDLE {
980                use std::collections::hash_map::Entry;
981                let mut cps = self.lock_cross_partition();
982                match cps.wave_cache_snapshots.entry(node_id) {
983                    Entry::Vacant(slot) => {
984                        slot.insert(current_cache);
985                        true
986                    }
987                    Entry::Occupied(_) => false,
988                }
989            } else {
990                false
991            };
992            s.require_node_mut(node_id).cache = new_handle;
993            if current_cache != NO_HANDLE && !snapshot_taken {
994                self.binding.release_handle(current_cache);
995            }
996            // Slice E1 (R2.6.5 / Lock 6.G): push DATA into the replay
997            // buffer if the node opted in. RESOLVED entries are NOT
998            // buffered (canonical "DATA only").
999            self.push_replay_buffer(&mut s, node_id, new_handle);
1000            // Slice G (D1 patch, 2026-05-09): mark this node as having
1001            // emitted tier-3 in this wave on the per-thread tracker.
1002            tier3_mark(node_id);
1003            self.queue_notify(&mut s, node_id, Message::Data(new_handle));
1004            // Propagate to children
1005            let child_ids: Vec<NodeId> = s
1006                .children
1007                .get(&node_id)
1008                .map(|c| c.iter().copied().collect())
1009                .unwrap_or_default();
1010            for child_id in child_ids {
1011                let dep_idx = s.require_node(child_id).dep_index_of(node_id);
1012                if let Some(idx) = dep_idx {
1013                    self.deliver_data_to_consumer(&mut s, child_id, idx, new_handle);
1014                }
1015            }
1016        } else {
1017            // RESOLVED: handle unchanged. Don't release; old still in use.
1018            // Slice G: snapshot cache so a subsequent same-wave emit can
1019            // rewrite this Resolved to Data using the snapshot.
1020            // Q2 (2026-05-09): wave_cache_snapshots lives on CrossPartitionState.
1021            let current_cache = s.require_node(node_id).cache;
1022            if s.in_tick && current_cache != NO_HANDLE {
1023                use std::collections::hash_map::Entry;
1024                let mut cps = self.lock_cross_partition();
1025                if let Entry::Vacant(slot) = cps.wave_cache_snapshots.entry(node_id) {
1026                    self.binding.retain_handle(current_cache);
1027                    slot.insert(current_cache);
1028                }
1029            }
1030            // Slice G (D1 patch, 2026-05-09): mark this node as having
1031            // emitted tier-3 in this wave on the per-thread tracker.
1032            tier3_mark(node_id);
1033            self.queue_notify(&mut s, node_id, Message::Resolved);
1034            let child_ids: Vec<NodeId> = s
1035                .children
1036                .get(&node_id)
1037                .map(|c| c.iter().copied().collect())
1038                .unwrap_or_default();
1039            // /qa A7 fix (2026-05-09): collect auto-resolve inserts
1040            // during the loop and bulk-insert into pending_auto_resolve
1041            // under a SINGLE cross_partition acquire after the loop.
1042            // Pre-fix the loop acquired `cross_partition` once per
1043            // child via `self.lock_cross_partition().pending_auto_resolve.insert(...)`,
1044            // which is N mutex hops for an N-child cascade. Cannot
1045            // hoist to acquire-cps-before-loop because `queue_notify`
1046            // (called inside the loop) also acquires cross_partition
1047            // for `pending_pause_overflow.push` in the rare overflow
1048            // case — re-entrance on the non-reentrant Mutex would
1049            // self-deadlock.
1050            let mut auto_resolve_inserts: SmallVec<[NodeId; 4]> = SmallVec::new();
1051            for child_id in child_ids {
1052                let already_involved = s.require_node(child_id).involved_this_wave;
1053                if !already_involved {
1054                    {
1055                        let child = s.require_node_mut(child_id);
1056                        child.involved_this_wave = true;
1057                        child.dirty = true;
1058                    }
1059                    self.queue_notify(&mut s, child_id, Message::Dirty);
1060                    // Q2 (2026-05-09): pending_auto_resolve lives on
1061                    // CrossPartitionState. Deferred to after-loop
1062                    // bulk insert per the /qa A7 fix above.
1063                    auto_resolve_inserts.push(child_id);
1064                }
1065            }
1066            // /qa A7 fix (2026-05-09): single cross_partition acquire
1067            // for the bulk-insert. queue_notify above no longer holds
1068            // cross_partition by the time we reach here, so this acquire
1069            // is uncontested by the loop's own queue_notify calls.
1070            if !auto_resolve_inserts.is_empty() {
1071                let mut cps = self.lock_cross_partition();
1072                cps.pending_auto_resolve.extend(auto_resolve_inserts);
1073            }
1074        }
1075    }
1076
1077    /// Slice G: when a multi-emit wave is detected at `node_id` (a second
1078    /// emit arrives while a prior tier-3 message is still pending), rewrite
1079    /// any `Resolved` entries from earlier emits to `Data(snapshot_cache)`
1080    /// so the wave conforms to R1.3.3.a (≥1 DATA OR exactly 1 RESOLVED).
1081    /// Touches both `pending_notify` (immediate-flush path) and the per-node
1082    /// pause buffer (paused path).
1083    fn rewrite_prior_resolved_to_data(&self, node_id: NodeId) {
1084        let mut s = self.lock_state();
1085        // Q2 (2026-05-09): wave_cache_snapshots lives on CrossPartitionState.
1086        let snapshot = match self
1087            .lock_cross_partition()
1088            .wave_cache_snapshots
1089            .get(&node_id)
1090            .copied()
1091        {
1092            Some(h) if h != NO_HANDLE => h,
1093            // No snapshot available — the prior Resolved was queued without
1094            // a cache (sentinel pre-emit). Nothing to rewrite to; the
1095            // multi-emit case from sentinel is fine (verbatim Data path).
1096            _ => return,
1097        };
1098        let mut retains_needed = 0u32;
1099        // Pending_notify path. Walk all batches' messages — Slice-G
1100        // coalescing reasons about wave-content per node, not per-batch.
1101        if let Some(entry) = s.pending_notify.get_mut(&node_id) {
1102            for msg in entry.iter_messages_mut() {
1103                if matches!(msg, Message::Resolved) {
1104                    *msg = Message::Data(snapshot);
1105                    retains_needed += 1;
1106                }
1107            }
1108        }
1109        // Pause-buffer path.
1110        if let Some(rec) = s.nodes.get_mut(&node_id) {
1111            if let crate::node::PauseState::Paused { buffer, .. } = &mut rec.pause_state {
1112                for msg in &mut *buffer {
1113                    if matches!(msg, Message::Resolved) {
1114                        *msg = Message::Data(snapshot);
1115                        retains_needed += 1;
1116                    }
1117                }
1118            }
1119        }
1120        drop(s);
1121        // Each rewritten Resolved → Data adds a payload retain that
1122        // queue_notify would otherwise have taken at emit time. The
1123        // snapshot already owns one retain (taken when cache was
1124        // snapshotted); we need one fresh retain per rewrite.
1125        for _ in 0..retains_needed {
1126            self.binding.retain_handle(snapshot);
1127        }
1128    }
1129
1130    /// Equals check that crosses the binding boundary lock-released for
1131    /// `EqualsMode::Custom`. Caller must NOT hold the state lock.
1132    fn handles_equal_lock_released(&self, mode: EqualsMode, a: HandleId, b: HandleId) -> bool {
1133        if a == b {
1134            return true; // identity-on-handles always sufficient
1135        }
1136        if a == NO_HANDLE || b == NO_HANDLE {
1137            return false;
1138        }
1139        match mode {
1140            EqualsMode::Identity => false,
1141            EqualsMode::Custom(handle) => self.binding.custom_equals(handle, a, b),
1142        }
1143    }
1144
1145    /// Commit a DATA emission **without** equals substitution — used by
1146    /// `FnResult::Batch` processing where multi-message waves pass through
1147    /// verbatim per R1.3.2.d / R1.3.3.c. DIRTY auto-prefix respects
1148    /// R1.3.1.a condition (b): only queued if node not already dirty.
1149    ///
1150    /// Structurally identical to the DATA branch of [`Self::commit_emission`]
1151    /// but skips the Phase 2 equals check entirely.
1152    fn commit_emission_verbatim(&self, node_id: NodeId, new_handle: HandleId) {
1153        assert!(
1154            new_handle != NO_HANDLE,
1155            "NO_HANDLE is not a valid DATA payload (R1.2.4) for node {node_id:?}",
1156        );
1157
1158        let mut s = self.lock_state();
1159        let rec = s.require_node(node_id);
1160        if rec.terminal.is_some() {
1161            drop(s);
1162            self.binding.release_handle(new_handle);
1163            return;
1164        }
1165
1166        // R1.3.1.a condition (b): DIRTY only if not already dirty.
1167        let already_dirty = s.require_node(node_id).dirty;
1168        s.require_node_mut(node_id).dirty = true;
1169        if !already_dirty {
1170            self.queue_notify(&mut s, node_id, Message::Dirty);
1171        }
1172
1173        // Always DATA — no equals substitution for Batch emissions.
1174        // Q2 (2026-05-09): wave_cache_snapshots lives on CrossPartitionState.
1175        let current_cache = s.require_node(node_id).cache;
1176        let snapshot_taken = if s.in_tick && current_cache != NO_HANDLE {
1177            use std::collections::hash_map::Entry;
1178            let mut cps = self.lock_cross_partition();
1179            match cps.wave_cache_snapshots.entry(node_id) {
1180                Entry::Vacant(slot) => {
1181                    slot.insert(current_cache);
1182                    true
1183                }
1184                Entry::Occupied(_) => false,
1185            }
1186        } else {
1187            false
1188        };
1189        s.require_node_mut(node_id).cache = new_handle;
1190        if current_cache != NO_HANDLE && !snapshot_taken {
1191            self.binding.release_handle(current_cache);
1192        }
1193        // Slice E1: replay buffer push (R2.6.5 / Lock 6.G).
1194        self.push_replay_buffer(&mut s, node_id, new_handle);
1195        // Slice G QA fix (A2, 2026-05-07) / D1 patch (2026-05-09): mark
1196        // tier3_emitted_this_wave on the per-thread tracker even on the
1197        // verbatim path. A subsequent commit_emission at the same node
1198        // in the same wave needs this flag to detect multi-emit and
1199        // skip equals substitution; without it, a Batch-then-standard
1200        // sequence would queue Resolved into a wave that already has
1201        // Data — violating R1.3.3.a. The Batch path itself still
1202        // passes verbatim per R1.3.3.c (we don't re-run equals here);
1203        // we just record that "this node has emitted tier-3 in this
1204        // wave."
1205        tier3_mark(node_id);
1206        self.queue_notify(&mut s, node_id, Message::Data(new_handle));
1207        // Propagate to children
1208        let child_ids: Vec<NodeId> = s
1209            .children
1210            .get(&node_id)
1211            .map(|c| c.iter().copied().collect())
1212            .unwrap_or_default();
1213        for child_id in child_ids {
1214            let dep_idx = s.require_node(child_id).dep_index_of(node_id);
1215            if let Some(idx) = dep_idx {
1216                self.deliver_data_to_consumer(&mut s, child_id, idx, new_handle);
1217            }
1218        }
1219    }
1220
1221    /// Slice E1 (R2.6.5 / Lock 6.G): push a DATA handle into the node's
1222    /// replay buffer if opted in. Evicts oldest if cap exceeded; takes a
1223    /// fresh retain on push. RESOLVED is NOT buffered per canonical
1224    /// "DATA only" — call sites only invoke this for Data emissions.
1225    ///
1226    /// Evicted handle is queued into `cps.deferred_handle_releases`
1227    /// (released lock-released at flush time) per the binding-boundary
1228    /// lock-release discipline — `release_handle` may re-enter Core via
1229    /// finalizers and must not run while the state lock is held
1230    /// (QA A3, 2026-05-07). Q2 (2026-05-09): the queue moved to
1231    /// CrossPartitionState; this fn acquires `cross_partition` only
1232    /// when an eviction actually happens (the common case is no
1233    /// eviction → no second-mutex acquire).
1234    fn push_replay_buffer(&self, s: &mut CoreState, node_id: NodeId, new_handle: HandleId) {
1235        let rec = s.require_node_mut(node_id);
1236        let cap = match rec.replay_buffer_cap {
1237            Some(c) if c > 0 => c,
1238            _ => return,
1239        };
1240        self.binding.retain_handle(new_handle);
1241        rec.replay_buffer.push_back(new_handle);
1242        let evicted = if rec.replay_buffer.len() > cap {
1243            rec.replay_buffer.pop_front()
1244        } else {
1245            None
1246        };
1247        if let Some(h) = evicted {
1248            self.lock_cross_partition().deferred_handle_releases.push(h);
1249        }
1250    }
1251
1252    // ===================================================================
1253    // Operator dispatch (Slice C-1, D009).
1254    //
1255    // `fire_operator` is the entry point for nodes whose `kind` is
1256    // `NodeKind::Operator(_)`. It branches on the `OperatorOp` discriminant
1257    // to per-operator helpers that snapshot inputs under the lock, drop the
1258    // lock to call the binding's bulk projection FFI, and reacquire to
1259    // apply emissions via `commit_emission_verbatim` (no per-item equals
1260    // dedup at the wire — operator output passes verbatim per the same
1261    // R1.3.2.d / R1.3.3.c rule that governs `FnResult::Batch`).
1262    //
1263    // **Refcount discipline:** inputs sourced from `dep_records[i].data_batch`
1264    // share retains owned by the wave's data-batch slot (released at
1265    // wave-end rotation in `clear_wave_state`). Operators that emit those
1266    // handles unchanged (`Filter`, `DistinctUntilChanged`, `Pairwise`'s
1267    // `prev` carry-over) take an additional retain via `retain_handle`
1268    // before passing to `commit_emission_verbatim` — the cache slot owns
1269    // its own share, independent of the data-batch slot's. Operators that
1270    // produce fresh handles (`Map` / `Scan` / `Reduce` / `Pairwise`'s
1271    // packed tuples) receive retains pre-bumped by the binding's bulk-
1272    // projection method.
1273    // ===================================================================
1274
1275    /// Operator dispatch entry. Pre-checks (terminal short-circuit, first-
1276    /// run gate accounting for `partial`, terminal-aware fire for `Reduce`)
1277    /// happen here; per-operator behavior lives in the `fire_op_*` helpers.
1278    fn fire_operator(&self, node_id: NodeId, op: OperatorOp) {
1279        // Phase 1 (lock-held): remove from pending_fires, evaluate skip.
1280        let proceed = {
1281            let mut s = self.lock_state();
1282            s.pending_fires.remove(&node_id);
1283            let rec = s.require_node(node_id);
1284            if rec.terminal.is_some() {
1285                false
1286            } else {
1287                // First-run gate (R2.5.3 / R5.4). Partial-mode operators
1288                // (D011) opt out of the gate; otherwise we wait for every
1289                // dep to have delivered at least one real handle. Terminal-
1290                // aware operators (currently `Reduce`) additionally count a
1291                // dep terminal as "real input" so they can fire on
1292                // upstream COMPLETE-without-DATA and emit the seed.
1293                let has_real_input = !rec.has_sentinel_deps()
1294                    || rec.dep_records.iter().any(|dr| dr.terminal.is_some());
1295                rec.partial || has_real_input
1296            }
1297        };
1298        if !proceed {
1299            return;
1300        }
1301
1302        // A6 (Slice F, 2026-05-07): track operator fire on the
1303        // `currently_firing` stack so a binding-side project/predicate/fold
1304        // FFI callback that re-enters `Core::set_deps(node_id, ...)` is
1305        // rejected with `SetDepsError::ReentrantOnFiringNode`. Drop pops
1306        // the stack on panic too.
1307        let _firing = FiringGuard::new(self, node_id);
1308
1309        match op {
1310            OperatorOp::Map { fn_id } => self.fire_op_map(node_id, fn_id),
1311            OperatorOp::Filter { fn_id } => self.fire_op_filter(node_id, fn_id),
1312            OperatorOp::Scan { fn_id, .. } => self.fire_op_scan(node_id, fn_id),
1313            OperatorOp::Reduce { fn_id, .. } => self.fire_op_reduce(node_id, fn_id),
1314            OperatorOp::DistinctUntilChanged { equals_fn_id } => {
1315                self.fire_op_distinct(node_id, equals_fn_id);
1316            }
1317            OperatorOp::Pairwise { fn_id } => self.fire_op_pairwise(node_id, fn_id),
1318            OperatorOp::Combine { pack_fn } => self.fire_op_combine(node_id, pack_fn),
1319            OperatorOp::WithLatestFrom { pack_fn } => {
1320                self.fire_op_with_latest_from(node_id, pack_fn);
1321            }
1322            OperatorOp::Merge => self.fire_op_merge(node_id),
1323            OperatorOp::Take { count } => self.fire_op_take(node_id, count),
1324            OperatorOp::Skip { count } => self.fire_op_skip(node_id, count),
1325            OperatorOp::TakeWhile { fn_id } => self.fire_op_take_while(node_id, fn_id),
1326            // The variant carries `default` for `register_operator`'s
1327            // `make_op_scratch` path; once registered, the live default
1328            // is read from `LastState::default` inside `fire_op_last`.
1329            OperatorOp::Last { .. } => self.fire_op_last(node_id),
1330        }
1331    }
1332
1333    /// Snapshot the operator's single dep batch (transform constraint —
1334    /// R5.7 single-dep). Returns `(inputs, terminal)` where `inputs` is a
1335    /// fresh `Vec<HandleId>` (no retains) and `terminal` reflects
1336    /// `dep_records[0].terminal` at snapshot time.
1337    fn snapshot_op_dep0(&self, node_id: NodeId) -> (Vec<HandleId>, Option<TerminalKind>) {
1338        let s = self.lock_state();
1339        let rec = s.require_node(node_id);
1340        debug_assert!(
1341            !rec.dep_records.is_empty(),
1342            "transform operator must have ≥1 dep"
1343        );
1344        let dr = &rec.dep_records[0];
1345        (dr.data_batch.iter().copied().collect(), dr.terminal)
1346    }
1347
1348    /// Emit DIRTY (if not already dirty) followed by RESOLVED. Used by
1349    /// silent-drop operators (Filter / DistinctUntilChanged / Pairwise)
1350    /// when a wave's inputs all suppress and the operator needs to settle
1351    /// the wave for its subscribers (D018 — let DIRTY ride; queue RESOLVED
1352    /// on full-reject).
1353    fn settle_dirty_resolved(&self, node_id: NodeId) {
1354        let mut s = self.lock_state();
1355        if s.require_node(node_id).terminal.is_some() {
1356            return;
1357        }
1358        let already_dirty = s.require_node(node_id).dirty;
1359        s.require_node_mut(node_id).dirty = true;
1360        if !already_dirty {
1361            self.queue_notify(&mut s, node_id, Message::Dirty);
1362        }
1363        // Slice G: skip Resolved if pending_notify already has a tier-3
1364        // message — adding Resolved would violate R1.3.3.a.
1365        let already_tier3 = s
1366            .pending_notify
1367            .get(&node_id)
1368            .is_some_and(|entry| entry.iter_messages().any(|m| m.tier() == 3));
1369        if !already_tier3 {
1370            self.queue_notify(&mut s, node_id, Message::Resolved);
1371        }
1372    }
1373
1374    /// `OperatorOp::Map` dispatch.
1375    fn fire_op_map(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
1376        let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
1377        // Mark fired regardless of input count (activation gate already
1378        // satisfied or partial-mode).
1379        {
1380            let mut s = self.lock_state();
1381            s.require_node_mut(node_id).has_fired_once = true;
1382        }
1383        if inputs.is_empty() {
1384            return;
1385        }
1386        // Phase 2 (lock-released): bulk project. Binding returns one
1387        // handle per input, each with a retain share already taken.
1388        let outputs = self.binding.project_each(fn_id, &inputs);
1389        // Phase 3: emit each output. `commit_emission_verbatim` consumes
1390        // the retain into the cache slot (and releases the prior cache
1391        // handle internally).
1392        for h in outputs {
1393            self.commit_emission_verbatim(node_id, h);
1394        }
1395    }
1396
1397    /// `OperatorOp::Filter` dispatch (D012/D018).
1398    fn fire_op_filter(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
1399        let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
1400        {
1401            let mut s = self.lock_state();
1402            s.require_node_mut(node_id).has_fired_once = true;
1403        }
1404        if inputs.is_empty() {
1405            return;
1406        }
1407        // Phase 2: predicate per input.
1408        let pass = self.binding.predicate_each(fn_id, &inputs);
1409        debug_assert!(
1410            pass.len() == inputs.len(),
1411            "predicate_each returned {} bools for {} inputs",
1412            pass.len(),
1413            inputs.len()
1414        );
1415        // Phase 3: emit passing items verbatim. Take a fresh retain for
1416        // each — the data_batch slot still owns its retain (released at
1417        // wave-end rotation), and the cache slot needs its own.
1418        let mut emitted = 0usize;
1419        for (i, &h) in inputs.iter().enumerate() {
1420            if pass.get(i).copied().unwrap_or(false) {
1421                self.binding.retain_handle(h);
1422                self.commit_emission_verbatim(node_id, h);
1423                emitted += 1;
1424            }
1425        }
1426        // D018: full-reject settles with DIRTY+RESOLVED.
1427        if emitted == 0 {
1428            self.settle_dirty_resolved(node_id);
1429        }
1430    }
1431
1432    /// `OperatorOp::Scan` dispatch — left-fold emitting each new acc.
1433    fn fire_op_scan(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
1434        use crate::op_state::ScanState;
1435        let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
1436        let acc = {
1437            let s = self.lock_state();
1438            scratch_ref::<ScanState>(&s, node_id).acc
1439        };
1440        {
1441            let mut s = self.lock_state();
1442            s.require_node_mut(node_id).has_fired_once = true;
1443        }
1444        if inputs.is_empty() {
1445            return;
1446        }
1447        // Phase 2: fold each input through. Returns N new handles, each
1448        // with a fresh retain.
1449        let new_states = self.binding.fold_each(fn_id, acc, &inputs);
1450        debug_assert!(
1451            new_states.len() == inputs.len(),
1452            "fold_each returned {} accs for {} inputs",
1453            new_states.len(),
1454            inputs.len()
1455        );
1456        // Phase 3a: update ScanState.acc to the LAST new acc. Take an
1457        // extra retain for the slot; release the prior acc's slot retain.
1458        let last_acc = new_states.last().copied();
1459        if let Some(last) = last_acc {
1460            let prev_acc = {
1461                let mut s = self.lock_state();
1462                let scratch = scratch_mut::<ScanState>(&mut s, node_id);
1463                let prev = scratch.acc;
1464                scratch.acc = last;
1465                prev
1466            };
1467            // Take the slot's retain on the new acc.
1468            self.binding.retain_handle(last);
1469            // Release the prior slot's retain (post-lock to keep binding
1470            // free to re-enter Core safely).
1471            if prev_acc != crate::handle::NO_HANDLE {
1472                self.binding.release_handle(prev_acc);
1473            }
1474        }
1475        // Phase 3b: emit each intermediate acc verbatim. `new_states`
1476        // entries each carry one retain from `fold_each`; that retain is
1477        // consumed by `commit_emission_verbatim` into the cache slot.
1478        for h in new_states {
1479            self.commit_emission_verbatim(node_id, h);
1480        }
1481    }
1482
1483    /// `OperatorOp::Reduce` dispatch — accumulates silently; emits acc on
1484    /// upstream COMPLETE (cascades ERROR verbatim).
1485    fn fire_op_reduce(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
1486        use crate::op_state::ReduceState;
1487        let (inputs, terminal) = self.snapshot_op_dep0(node_id);
1488        let acc = {
1489            let s = self.lock_state();
1490            scratch_ref::<ReduceState>(&s, node_id).acc
1491        };
1492        {
1493            let mut s = self.lock_state();
1494            s.require_node_mut(node_id).has_fired_once = true;
1495        }
1496        // Phase 2: accumulate (silent — no per-input emit).
1497        let new_states = if inputs.is_empty() {
1498            SmallVec::<[HandleId; 1]>::new()
1499        } else {
1500            self.binding.fold_each(fn_id, acc, &inputs)
1501        };
1502        debug_assert!(
1503            new_states.len() == inputs.len(),
1504            "fold_each returned {} accs for {} inputs",
1505            new_states.len(),
1506            inputs.len()
1507        );
1508        // Update ReduceState.acc to last new acc; release intermediate
1509        // states (we don't emit them) and the prior acc's slot retain.
1510        let last_acc = new_states.last().copied();
1511        let intermediates_to_release: Vec<HandleId> = if new_states.len() > 1 {
1512            new_states[..new_states.len() - 1].to_vec()
1513        } else {
1514            Vec::new()
1515        };
1516        let prev_acc_to_release = if let Some(last) = last_acc {
1517            let prev_acc = {
1518                let mut s = self.lock_state();
1519                let scratch = scratch_mut::<ReduceState>(&mut s, node_id);
1520                let prev = scratch.acc;
1521                scratch.acc = last;
1522                prev
1523            };
1524            self.binding.retain_handle(last);
1525            if prev_acc == crate::handle::NO_HANDLE {
1526                None
1527            } else {
1528                Some(prev_acc)
1529            }
1530        } else {
1531            None
1532        };
1533        // Release intermediate fold results (Reduce only emits the LAST,
1534        // but only on terminal). Each was retained by `fold_each`.
1535        for h in intermediates_to_release {
1536            self.binding.release_handle(h);
1537        }
1538        if let Some(h) = prev_acc_to_release {
1539            self.binding.release_handle(h);
1540        }
1541
1542        // Phase 3: emit on terminal.
1543        match terminal {
1544            None => {
1545                // Still accumulating; no emit. Subscribers see no message
1546                // for this wave (silent accumulation). The first wave that
1547                // pushes Reduce to fire produces a Dirty entry on the
1548                // upstream's commit, but Reduce itself doesn't queue any
1549                // tier-3 since R5 silently absorbs. v1: leave the
1550                // post-drain auto-resolve sweep to settle nothing —
1551                // pending_notify has no entry for Reduce so the sweep is
1552                // a no-op.
1553            }
1554            Some(TerminalKind::Complete) => {
1555                // Read the live acc (may be the seed if no DATA arrived)
1556                // and emit Data(acc) + Complete.
1557                let final_acc = {
1558                    let s = self.lock_state();
1559                    scratch_ref::<ReduceState>(&s, node_id).acc
1560                };
1561                if final_acc != crate::handle::NO_HANDLE {
1562                    // Emission needs its own retain (slot's retain is
1563                    // owned by ReduceState.acc until reset/Drop).
1564                    self.binding.retain_handle(final_acc);
1565                    self.commit_emission_verbatim(node_id, final_acc);
1566                }
1567                self.complete(node_id);
1568            }
1569            Some(TerminalKind::Error(h)) => {
1570                // Core::error transfers the caller's share into the
1571                // cascade (node.terminal + per-child dep_terminal slots);
1572                // no release at the error() boundary. Take a fresh share
1573                // here so the cascade owns it independently of the
1574                // dep_records[0].terminal slot's share.
1575                self.binding.retain_handle(h);
1576                self.error(node_id, h);
1577            }
1578        }
1579    }
1580
1581    /// `OperatorOp::DistinctUntilChanged` dispatch.
1582    fn fire_op_distinct(&self, node_id: NodeId, equals_fn_id: crate::handle::FnId) {
1583        use crate::op_state::DistinctState;
1584        let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
1585        let mut prev = {
1586            let s = self.lock_state();
1587            scratch_ref::<DistinctState>(&s, node_id).prev
1588        };
1589        {
1590            let mut s = self.lock_state();
1591            s.require_node_mut(node_id).has_fired_once = true;
1592        }
1593        if inputs.is_empty() {
1594            return;
1595        }
1596        // Take a working-copy retain on the initial prev so both the loop
1597        // (which releases old_prev on each non-equal item) and phase 3
1598        // (which releases the slot's original handle) each have their own
1599        // share. Without this, the loop's release of old_prev (== original
1600        // DistinctState.prev) double-releases against phase 3's stale_slot
1601        // release.
1602        if prev != crate::handle::NO_HANDLE {
1603            self.binding.retain_handle(prev);
1604        }
1605        // Phase 2: per-input equals(prev, current). Each non-equal input
1606        // is emitted and becomes the new prev. Equals fn_id reuses
1607        // `BindingBoundary::custom_equals`.
1608        let mut emitted = 0usize;
1609        for &h in &inputs {
1610            let equal = if prev == crate::handle::NO_HANDLE {
1611                false
1612            } else if prev == h {
1613                true
1614            } else {
1615                self.binding.custom_equals(equals_fn_id, prev, h)
1616            };
1617            if !equal {
1618                // Emit this input verbatim.
1619                self.binding.retain_handle(h);
1620                self.commit_emission_verbatim(node_id, h);
1621                // Update prev: take retain on new prev, release old
1622                // (working-copy retain from above or from prior iteration).
1623                self.binding.retain_handle(h);
1624                let old_prev = prev;
1625                prev = h;
1626                if old_prev != crate::handle::NO_HANDLE {
1627                    self.binding.release_handle(old_prev);
1628                }
1629                emitted += 1;
1630            }
1631        }
1632        // Phase 3: persist prev into DistinctState.prev slot. Release the
1633        // slot's original retain (stale_slot) — this is the slot-owned
1634        // share, independent of the working-copy share released in the
1635        // loop above.
1636        {
1637            let mut s = self.lock_state();
1638            let scratch = scratch_mut::<DistinctState>(&mut s, node_id);
1639            let stale_slot = scratch.prev;
1640            scratch.prev = prev;
1641            if stale_slot != prev && stale_slot != crate::handle::NO_HANDLE {
1642                drop(s);
1643                self.binding.release_handle(stale_slot);
1644            }
1645        }
1646        // Release the working-copy retain on the final prev if it was
1647        // never released in the loop (i.e. no non-equal items passed,
1648        // prev == original). In that case stale_slot == prev, so phase 3
1649        // didn't release it either — but the working-copy retain is still
1650        // outstanding. Release it now.
1651        if emitted == 0 && prev != crate::handle::NO_HANDLE {
1652            self.binding.release_handle(prev);
1653        }
1654        if emitted == 0 {
1655            self.settle_dirty_resolved(node_id);
1656        }
1657    }
1658
1659    /// `OperatorOp::Pairwise` dispatch — emits `(prev, current)` tuples
1660    /// starting after the second value (first input swallowed, sets `prev`).
1661    fn fire_op_pairwise(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
1662        use crate::op_state::PairwiseState;
1663        let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
1664        let mut prev = {
1665            let s = self.lock_state();
1666            scratch_ref::<PairwiseState>(&s, node_id).prev
1667        };
1668        {
1669            let mut s = self.lock_state();
1670            s.require_node_mut(node_id).has_fired_once = true;
1671        }
1672        if inputs.is_empty() {
1673            return;
1674        }
1675        let mut emitted = 0usize;
1676        for &h in &inputs {
1677            if prev == crate::handle::NO_HANDLE {
1678                // First-ever value — swallow, set prev. Retain for the
1679                // PairwiseState.prev slot (persisted in phase 3 below).
1680                self.binding.retain_handle(h);
1681                prev = h;
1682                continue;
1683            }
1684            // Pack (prev, current) into a tuple handle. Binding returns a
1685            // fresh retain on the packed handle.
1686            let packed = self.binding.pairwise_pack(fn_id, prev, h);
1687            self.commit_emission_verbatim(node_id, packed);
1688            // Advance prev: take retain on h, release old prev.
1689            self.binding.retain_handle(h);
1690            let old_prev = prev;
1691            prev = h;
1692            self.binding.release_handle(old_prev);
1693            emitted += 1;
1694        }
1695        // Persist prev into PairwiseState.prev slot.
1696        {
1697            let mut s = self.lock_state();
1698            let scratch = scratch_mut::<PairwiseState>(&mut s, node_id);
1699            let stale_slot = scratch.prev;
1700            scratch.prev = prev;
1701            if stale_slot != prev && stale_slot != crate::handle::NO_HANDLE {
1702                drop(s);
1703                self.binding.release_handle(stale_slot);
1704            }
1705        }
1706        if emitted == 0 {
1707            self.settle_dirty_resolved(node_id);
1708        }
1709    }
1710
1711    // =================================================================
1712    // Slice C-2: multi-dep combinator operators (D020)
1713    // =================================================================
1714
1715    /// Snapshot all deps' "latest" handle for multi-dep combinators.
1716    /// For each dep: returns `data_batch.last()` if non-empty (dep fired
1717    /// this wave), else `prev_data` (last handle from previous wave).
1718    /// Also returns whether dep[0] (primary) had DATA this wave —
1719    /// needed by `fire_op_with_latest_from`.
1720    fn snapshot_op_all_latest(&self, node_id: NodeId) -> (SmallVec<[HandleId; 4]>, bool) {
1721        let s = self.lock_state();
1722        let rec = s.require_node(node_id);
1723        let primary_fired = rec
1724            .dep_records
1725            .first()
1726            .is_some_and(|dr| !dr.data_batch.is_empty());
1727        let latest: SmallVec<[HandleId; 4]> = rec
1728            .dep_records
1729            .iter()
1730            .map(|dr| dr.data_batch.last().copied().unwrap_or(dr.prev_data))
1731            .collect();
1732        (latest, primary_fired)
1733    }
1734
1735    /// `OperatorOp::Combine` dispatch — N-dep combineLatest. Packs the
1736    /// latest handle per dep into a tuple via `pack_tuple`, emits on
1737    /// any dep fire. First-run gate (R2.5.3, partial: false) guarantees
1738    /// all deps have a real handle on first fire. Post-warmup INVALIDATE
1739    /// guard: if any dep's prev_data was cleared, settles with RESOLVED
1740    /// instead of packing a NO_HANDLE into the tuple.
1741    fn fire_op_combine(&self, node_id: NodeId, pack_fn: crate::handle::FnId) {
1742        let (latest, _primary_fired) = self.snapshot_op_all_latest(node_id);
1743        {
1744            let mut s = self.lock_state();
1745            s.require_node_mut(node_id).has_fired_once = true;
1746        }
1747        // Post-warmup INVALIDATE guard: a dep may have been invalidated
1748        // (prev_data cleared to NO_HANDLE) and not yet re-delivered.
1749        if latest.contains(&crate::handle::NO_HANDLE) {
1750            self.settle_dirty_resolved(node_id);
1751            return;
1752        }
1753        let tuple_handle = self.binding.pack_tuple(pack_fn, &latest);
1754        self.commit_emission_verbatim(node_id, tuple_handle);
1755    }
1756
1757    /// `OperatorOp::WithLatestFrom` dispatch — 2-dep, fire-on-primary-only
1758    /// (D021 / Phase 10.5). Emits `[primary, secondary]` pair only when
1759    /// dep[0] (primary) has DATA in the wave. If only dep[1] fires →
1760    /// RESOLVED. Post-warmup INVALIDATE guard: if secondary latest is
1761    /// `NO_HANDLE` (INVALIDATE cleared it), settles with RESOLVED.
1762    fn fire_op_with_latest_from(&self, node_id: NodeId, pack_fn: crate::handle::FnId) {
1763        let (latest, primary_fired) = self.snapshot_op_all_latest(node_id);
1764        let first_fire = {
1765            let mut s = self.lock_state();
1766            let rec = s.require_node_mut(node_id);
1767            let was_first = !rec.has_fired_once;
1768            rec.has_fired_once = true;
1769            was_first
1770        };
1771        // On first fire (gate release), always emit — the first-run gate
1772        // guarantees both deps have values (via prev_data fallback in
1773        // snapshot). On subsequent fires, only emit when primary fires.
1774        if !first_fire && !primary_fired {
1775            // Secondary-only update — no downstream DATA.
1776            self.settle_dirty_resolved(node_id);
1777            return;
1778        }
1779        // Post-warmup INVALIDATE guard: secondary may have been invalidated
1780        // (prev_data cleared to NO_HANDLE) and not yet re-delivered.
1781        debug_assert!(latest.len() == 2, "withLatestFrom requires exactly 2 deps");
1782        if latest[1] == crate::handle::NO_HANDLE {
1783            self.settle_dirty_resolved(node_id);
1784            return;
1785        }
1786        let tuple_handle = self.binding.pack_tuple(pack_fn, &latest);
1787        self.commit_emission_verbatim(node_id, tuple_handle);
1788    }
1789
1790    /// `OperatorOp::Merge` dispatch — N-dep, forward all DATA handles
1791    /// verbatim (D022). Zero FFI on fire: no transformation. Each dep's
1792    /// batch handles are collected, retained, and emitted individually.
1793    fn fire_op_merge(&self, node_id: NodeId) {
1794        // Collect all batch handles from all deps (flat).
1795        let all_handles: Vec<HandleId> = {
1796            let s = self.lock_state();
1797            let rec = s.require_node(node_id);
1798            rec.dep_records
1799                .iter()
1800                .flat_map(|dr| dr.data_batch.iter().copied())
1801                .collect()
1802        };
1803        {
1804            let mut s = self.lock_state();
1805            s.require_node_mut(node_id).has_fired_once = true;
1806        }
1807        if all_handles.is_empty() {
1808            // All deps settled RESOLVED this wave — no DATA to forward.
1809            self.settle_dirty_resolved(node_id);
1810            return;
1811        }
1812        // Emit each handle verbatim. Take a fresh retain per handle
1813        // (independent of the dep batch's retain which gets released at
1814        // wave-end). Matches Filter's discipline for passing inputs.
1815        for &h in &all_handles {
1816            self.binding.retain_handle(h);
1817            self.commit_emission_verbatim(node_id, h);
1818        }
1819    }
1820
1821    // =================================================================
1822    // Slice C-3: flow operators (D024)
1823    // =================================================================
1824
1825    /// `OperatorOp::Take` dispatch — emits the first `count` DATA values
1826    /// then self-completes via `Core::complete`. When `count == 0`, the
1827    /// first fire emits zero items then immediately self-completes
1828    /// (D027). Cross-wave counter lives in
1829    /// [`TakeState::count_emitted`](super::op_state::TakeState::count_emitted).
1830    fn fire_op_take(&self, node_id: NodeId, count: u32) {
1831        use crate::op_state::TakeState;
1832        let (inputs, terminal) = self.snapshot_op_dep0(node_id);
1833        // Snapshot current counter; mark fired regardless of input count
1834        // (activation gate already satisfied or partial-mode).
1835        let mut count_emitted = {
1836            let s = self.lock_state();
1837            scratch_ref::<TakeState>(&s, node_id).count_emitted
1838        };
1839        {
1840            let mut s = self.lock_state();
1841            s.require_node_mut(node_id).has_fired_once = true;
1842        }
1843        // Already at quota before any input this wave — self-complete
1844        // immediately. Covers `count == 0` (first-fire short-circuit) and
1845        // any defensive re-entry after the terminal-skip in `fire_operator`
1846        // already guards against double-complete.
1847        if count_emitted >= count {
1848            self.complete(node_id);
1849            return;
1850        }
1851        // Per-input emission loop. Each pass takes a fresh retain for the
1852        // cache slot; data_batch slot's retain is released at wave-end
1853        // rotation independently.
1854        for &h in &inputs {
1855            self.binding.retain_handle(h);
1856            self.commit_emission_verbatim(node_id, h);
1857            count_emitted = count_emitted.saturating_add(1);
1858            if count_emitted >= count {
1859                break;
1860            }
1861        }
1862        // Persist the updated counter.
1863        {
1864            let mut s = self.lock_state();
1865            scratch_mut::<TakeState>(&mut s, node_id).count_emitted = count_emitted;
1866        }
1867        // Self-complete if we hit the quota this wave. Upstream COMPLETE
1868        // (terminal == Some(Complete)) without us hitting the count
1869        // propagates via the standard auto-cascade — we don't intercept it.
1870        if count_emitted >= count {
1871            self.complete(node_id);
1872            return;
1873        }
1874        // If upstream is already Errored and we haven't hit count, the
1875        // standard cascade will propagate it. If the wave delivered no
1876        // inputs (e.g. RESOLVED from upstream), settle DIRTY+RESOLVED so
1877        // subscribers see the wave close.
1878        if inputs.is_empty() && terminal.is_none() {
1879            self.settle_dirty_resolved(node_id);
1880        }
1881    }
1882
1883    /// `OperatorOp::Skip` dispatch — drops the first `count` DATA values,
1884    /// then forwards the rest. Cross-wave counter lives in
1885    /// [`SkipState::count_skipped`](super::op_state::SkipState::count_skipped).
1886    /// On a wave where every input is still in the skip window, settles
1887    /// DIRTY+RESOLVED (D018 pattern) so subscribers see the wave close.
1888    fn fire_op_skip(&self, node_id: NodeId, count: u32) {
1889        use crate::op_state::SkipState;
1890        let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
1891        let mut count_skipped = {
1892            let s = self.lock_state();
1893            scratch_ref::<SkipState>(&s, node_id).count_skipped
1894        };
1895        {
1896            let mut s = self.lock_state();
1897            s.require_node_mut(node_id).has_fired_once = true;
1898        }
1899        // No early-return on empty inputs: the post-loop `emitted == 0`
1900        // settle handles the empty-inputs case identically to the
1901        // all-swallowed-by-skip-window case (Slice C-3 /qa P6 — symmetry
1902        // with `fire_op_take`).
1903        let mut emitted = 0usize;
1904        for &h in &inputs {
1905            if count_skipped < count {
1906                count_skipped = count_skipped.saturating_add(1);
1907                // Drop this input — the data_batch slot still owns its
1908                // retain (released at wave-end rotation). No emission.
1909                continue;
1910            }
1911            // Past the skip window — emit verbatim. Take a fresh retain
1912            // for the cache slot.
1913            self.binding.retain_handle(h);
1914            self.commit_emission_verbatim(node_id, h);
1915            emitted += 1;
1916        }
1917        // Persist the updated counter.
1918        {
1919            let mut s = self.lock_state();
1920            scratch_mut::<SkipState>(&mut s, node_id).count_skipped = count_skipped;
1921        }
1922        if emitted == 0 {
1923            self.settle_dirty_resolved(node_id);
1924        }
1925    }
1926
1927    /// `OperatorOp::TakeWhile` dispatch — emits while the predicate
1928    /// holds; on the first `false`, emits any preceding passes from the
1929    /// same batch then self-completes via `Core::complete`. Reuses
1930    /// [`BindingBoundary::predicate_each`] (D029).
1931    fn fire_op_take_while(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
1932        let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
1933        {
1934            let mut s = self.lock_state();
1935            s.require_node_mut(node_id).has_fired_once = true;
1936        }
1937        if inputs.is_empty() {
1938            return;
1939        }
1940        // Phase 2: predicate per input.
1941        let pass = self.binding.predicate_each(fn_id, &inputs);
1942        debug_assert!(
1943            pass.len() == inputs.len(),
1944            "predicate_each returned {} bools for {} inputs",
1945            pass.len(),
1946            inputs.len()
1947        );
1948        // Phase 3: emit each input until the first false; then
1949        // self-complete. `fire_operator`'s `terminal.is_some()`
1950        // short-circuit gates re-entry after the self-complete cascade
1951        // installs the terminal slot — no extra `done` flag needed.
1952        let mut emitted = 0usize;
1953        let mut first_false_seen = false;
1954        for (i, &h) in inputs.iter().enumerate() {
1955            if pass.get(i).copied().unwrap_or(false) {
1956                self.binding.retain_handle(h);
1957                self.commit_emission_verbatim(node_id, h);
1958                emitted += 1;
1959            } else {
1960                first_false_seen = true;
1961                break;
1962            }
1963        }
1964        if first_false_seen {
1965            self.complete(node_id);
1966            return;
1967        }
1968        if emitted == 0 {
1969            // Whole batch passed but was empty (impossible here since
1970            // inputs.is_empty() returned early above) — defensive only.
1971            self.settle_dirty_resolved(node_id);
1972        }
1973    }
1974
1975    /// `OperatorOp::Last` dispatch — buffers the latest DATA; emits
1976    /// `Data(latest)` (or `Data(default)` if no DATA arrived and a
1977    /// default was registered) then `Complete` on upstream COMPLETE.
1978    /// On upstream ERROR, propagates verbatim. Storage:
1979    /// [`LastState`](super::op_state::LastState).
1980    ///
1981    /// **Silent-buffer semantics (mirrors Reduce):** on a non-terminal
1982    /// wave (`terminal == None`), `fire_op_last` updates the buffered
1983    /// `latest` handle but produces NO downstream wire message —
1984    /// subscribers observe the operator only when upstream
1985    /// COMPLETE/ERROR triggers the terminal branch. Intermediate
1986    /// inputs from the dep's batch are dropped on the floor (their
1987    /// `data_batch` retains release at wave-end rotation
1988    /// independently). Per-wave settlement on intermediate waves is
1989    /// the canonical behavior for terminal-aware operators.
1990    fn fire_op_last(&self, node_id: NodeId) {
1991        use crate::op_state::LastState;
1992        let (inputs, terminal) = self.snapshot_op_dep0(node_id);
1993        {
1994            let mut s = self.lock_state();
1995            s.require_node_mut(node_id).has_fired_once = true;
1996        }
1997
1998        // Phase 2: buffer the latest input handle (if any). Retain new,
1999        // release old. data_batch slot's retain is released at wave-end
2000        // rotation independently — the LastState slot keeps its own
2001        // share so the value survives across waves.
2002        if let Some(&new_latest) = inputs.last() {
2003            let prev_latest = {
2004                let mut s = self.lock_state();
2005                let scratch = scratch_mut::<LastState>(&mut s, node_id);
2006                let prev = scratch.latest;
2007                scratch.latest = new_latest;
2008                prev
2009            };
2010            self.binding.retain_handle(new_latest);
2011            if prev_latest != crate::handle::NO_HANDLE {
2012                self.binding.release_handle(prev_latest);
2013            }
2014        }
2015
2016        // Phase 3: emit on terminal. Buffer-only fires (no terminal yet)
2017        // produce no downstream message — Reduce-style silent
2018        // accumulation. The post-drain auto-resolve sweep is a no-op
2019        // because pending_notify has no entry for Last.
2020        match terminal {
2021            None => {}
2022            Some(TerminalKind::Complete) => {
2023                // Read the live latest + default. If latest != NO_HANDLE,
2024                // emit it. Otherwise, if default != NO_HANDLE, emit default.
2025                // Otherwise, emit only Complete (empty stream, no default).
2026                let (latest, default) = {
2027                    let s = self.lock_state();
2028                    let scratch = scratch_ref::<LastState>(&s, node_id);
2029                    (scratch.latest, scratch.default)
2030                };
2031                let to_emit = if latest != crate::handle::NO_HANDLE {
2032                    Some(latest)
2033                } else if default != crate::handle::NO_HANDLE {
2034                    Some(default)
2035                } else {
2036                    None
2037                };
2038                if let Some(h) = to_emit {
2039                    // Emission needs its own retain — the LastState slot
2040                    // keeps its share until reset/Drop.
2041                    self.binding.retain_handle(h);
2042                    self.commit_emission_verbatim(node_id, h);
2043                }
2044                self.complete(node_id);
2045            }
2046            Some(TerminalKind::Error(h)) => {
2047                // Take a fresh share for the error cascade — the
2048                // dep_records[0].terminal slot keeps its own share
2049                // (released by reset_for_fresh_lifecycle / Drop).
2050                self.binding.retain_handle(h);
2051                self.error(node_id, h);
2052            }
2053        }
2054    }
2055
2056    pub(crate) fn deliver_data_to_consumer(
2057        &self,
2058        s: &mut CoreState,
2059        consumer_id: NodeId,
2060        dep_idx: usize,
2061        handle: HandleId,
2062    ) {
2063        // Retain the handle for the batch accumulation slot — each DATA
2064        // handle in `data_batch` owns a retain share, released at wave-end
2065        // rotation in `clear_wave_state`.
2066        self.binding.retain_handle(handle);
2067
2068        let is_dynamic;
2069        let is_state;
2070        let tracked_or_first_fire;
2071        // Slice F audit close (2026-05-07): default-mode pause suppression.
2072        // If the consumer is paused with `PausableMode::Default`, the
2073        // canonical-spec §2.6 behavior is to suppress fn-fire and consolidate
2074        // pause-window dep deliveries into one fn execution on RESUME.
2075        // Mark `pending_wave` on the pause state instead of adding to
2076        // `pending_fires`. The dep state still advances (the data_batch push
2077        // above is unchanged), and clear_wave_state still rotates the latest
2078        // dep DATA into prev_data — so when the fn ultimately fires on
2079        // RESUME, it sees the consolidated post-pause state.
2080        let suppressed_for_default_pause;
2081        {
2082            let consumer = s.require_node_mut(consumer_id);
2083            consumer.dep_records[dep_idx].data_batch.push(handle);
2084            consumer.dep_records[dep_idx].involved_this_wave = true;
2085            consumer.involved_this_wave = true;
2086            is_dynamic = consumer.is_dynamic;
2087            is_state = consumer.is_state();
2088            tracked_or_first_fire = !consumer.has_fired_once || consumer.tracked.contains(&dep_idx);
2089            suppressed_for_default_pause = consumer.pause_state.is_paused()
2090                && consumer.pausable == crate::node::PausableMode::Default;
2091            if suppressed_for_default_pause {
2092                consumer.pause_state.mark_pending_wave();
2093            }
2094        }
2095        if suppressed_for_default_pause {
2096            // Default-mode pause: don't add to pending_fires; RESUME will
2097            // schedule one consolidated fire.
2098            return;
2099        }
2100        if is_state {
2101            // State nodes don't have deps; unreachable in practice.
2102        } else if is_dynamic {
2103            if tracked_or_first_fire {
2104                s.pending_fires.insert(consumer_id);
2105            }
2106        } else {
2107            // Derived / Operator / Producer (Producer has no deps so won't
2108            // reach here, but the predicate-based dispatch handles it
2109            // uniformly).
2110            s.pending_fires.insert(consumer_id);
2111        }
2112    }
2113
2114    // -------------------------------------------------------------------
2115    // Subscriber notification
2116    // -------------------------------------------------------------------
2117
2118    /// Queue a wave-end message for `node_id`'s subscribers.
2119    ///
2120    /// **Revision-tracked sink-snapshot batches (Slice X4 / D2,
2121    /// 2026-05-08):** each push for a given node either appends the
2122    /// message to the open batch (if `NodeRecord::subscribers_revision`
2123    /// hasn't advanced since that batch opened — the common case — no
2124    /// extra allocation), or opens a fresh batch with a current sink
2125    /// snapshot frozen at the new revision. A sub installed mid-wave
2126    /// bumps `subscribers_revision`; the next `queue_notify` for the
2127    /// same node observes the bump and starts a new batch that includes
2128    /// the new sub. Pre-subscribe batches retain their original snapshot,
2129    /// so earlier emits flush to their original sink list — the new sub
2130    /// does NOT double-receive them via flush AND handshake replay,
2131    /// closing the late-subscriber + multi-emit-per-wave R1.3.5.a gap.
2132    ///
2133    /// Pause routing decision (R1.3.7.b tier table, §10.2 buffering):
2134    ///   Tier 3 (DATA / RESOLVED) and Tier 4 (INVALIDATE) buffer while
2135    ///   paused; all other tiers (DIRTY tier 1, PAUSE/RESUME tier 2,
2136    ///   COMPLETE/ERROR tier 5, TEARDOWN tier 6) bypass the buffer and
2137    ///   flush immediately. START (tier 0) is per-subscription and never
2138    ///   transits queue_notify.
2139    pub(crate) fn queue_notify(&self, s: &mut CoreState, node_id: NodeId, msg: Message) {
2140        // R1.3.3.a / R1.3.3.d (Slice G — re-added 2026-05-07): dev-mode
2141        // wave-content invariant assertion. The tier-3 slot at one node in
2142        // one wave is either ≥1 DATA or exactly 1 RESOLVED — never mixed,
2143        // never multiple RESOLVED. Slice G moved equals substitution from
2144        // per-emit to wave-end coalescing; this assert pins that the
2145        // dispatcher itself never queues a violating combination at the
2146        // queue_notify granularity. Resolved arrivals come from:
2147        //   1. The auto-resolve sweep in `drain_and_flush` (gates on
2148        //      `!any tier-3` so it can't add to a wave with Data).
2149        //   2. The wave-end equals-substitution pass (rewrites in place,
2150        //      doesn't go through queue_notify).
2151        // Both honor R1.3.3.a by construction post-Slice-G.
2152        #[cfg(debug_assertions)]
2153        if matches!(msg.tier(), 3) {
2154            if let Some(entry) = s.pending_notify.get(&node_id) {
2155                // Walk all batches' messages — R1.3.3.a is a per-node
2156                // wave-content invariant, not per-batch (the X4 batches
2157                // are subscriber-snapshot epochs; the protocol-level
2158                // tier-3 invariant spans the whole wave for the node).
2159                let has_data = entry.iter_messages().any(|m| matches!(m, Message::Data(_)));
2160                let resolved_count = entry
2161                    .iter_messages()
2162                    .filter(|m| matches!(m, Message::Resolved))
2163                    .count();
2164                let incoming_is_data = matches!(msg, Message::Data(_));
2165                if incoming_is_data {
2166                    debug_assert!(
2167                        resolved_count == 0,
2168                        "R1.3.3.a violation at {node_id:?}: queueing Data into a \
2169                         wave that already contains Resolved — Slice G should have \
2170                         prevented this via wave-end coalescing"
2171                    );
2172                } else {
2173                    debug_assert!(
2174                        !has_data,
2175                        "R1.3.3.a violation at {node_id:?}: queueing Resolved into a \
2176                         wave that already contains Data"
2177                    );
2178                    debug_assert!(
2179                        resolved_count == 0,
2180                        "R1.3.3.a violation at {node_id:?}: multiple Resolved in one \
2181                         wave at one node"
2182                    );
2183                }
2184            }
2185        }
2186
2187        let buffered_tier = matches!(msg.tier(), 3 | 4);
2188        let cap = s.pause_buffer_cap;
2189
2190        // Pause-routing branch — handles its own retain/release and returns
2191        // before we touch `pending_notify`, so the rec borrow is contained.
2192        {
2193            let rec = s.require_node_mut(node_id);
2194            if rec.subscribers.is_empty() {
2195                return;
2196            }
2197            // Slice F audit close (2026-05-07): pause routing depends on mode.
2198            //   - `ResumeAll`: buffer tier-3/4 for verbatim replay on RESUME.
2199            //   - `Default` + STATE node: state nodes have no fn-fire to
2200            //     suppress, so buffer like resumeAll (collapse-to-latest is
2201            //     a future enhancement; v1 keeps verbatim).
2202            //   - `Default` + COMPUTE node: suppression happens upstream at
2203            //     fn-fire scheduling (see `deliver_data_to_consumer`); no
2204            //     outgoing tier-3 is produced from this node while paused,
2205            //     so this branch is unreachable for compute-default-paused.
2206            //     Fallthrough to the non-paused queue path is fine.
2207            //   - `Off`: pause is ignored entirely — tier-3 flushes
2208            //     immediately. Fallthrough.
2209            let mode_buffers_tier3 = match rec.pausable {
2210                crate::node::PausableMode::ResumeAll => true,
2211                crate::node::PausableMode::Default => rec.is_state(),
2212                crate::node::PausableMode::Off => false,
2213            };
2214            if buffered_tier && mode_buffers_tier3 && rec.pause_state.is_paused() {
2215                if let Some(h) = msg.payload_handle() {
2216                    self.binding.retain_handle(h);
2217                }
2218                let push_result = rec.pause_state.push_buffered(msg, cap);
2219                for dm in push_result.dropped_msgs {
2220                    if let Some(h) = dm.payload_handle() {
2221                        self.binding.release_handle(h);
2222                    }
2223                }
2224                // R1.3.8.c (Slice F, A3): on first overflow this cycle,
2225                // schedule a synthesized ERROR for wave-end emission.
2226                // `cap` is `Some` here (an overflow can only happen with a
2227                // configured cap), so `unwrap` is safe.
2228                if push_result.first_overflow_this_cycle {
2229                    if let Some((dropped_count, lock_held_ns)) =
2230                        rec.pause_state.overflow_diagnostic()
2231                    {
2232                        // Q2 (2026-05-09): pending_pause_overflow lives on
2233                        // CrossPartitionState. Lock-discipline: state →
2234                        // cross_partition.
2235                        self.lock_cross_partition().pending_pause_overflow.push(
2236                            crate::node::PendingPauseOverflow {
2237                                node_id,
2238                                dropped_count,
2239                                configured_max: cap.unwrap_or(0),
2240                                lock_held_ns,
2241                            },
2242                        );
2243                    }
2244                }
2245                return;
2246            }
2247        }
2248
2249        // Non-paused queue path: retain payload handle and queue into
2250        // pending_notify. Released in `flush_notifications` after sinks
2251        // fire.
2252        if let Some(h) = msg.payload_handle() {
2253            self.binding.retain_handle(h);
2254        }
2255        Self::push_into_pending_notify(s, node_id, msg);
2256    }
2257
2258    /// Slice X4 / D2: revision-tracked batch decision for `queue_notify`'s
2259    /// non-paused path. Either appends `msg` to the open batch (if
2260    /// `subscribers_revision` hasn't advanced since it opened — common
2261    /// case, no extra allocation) or opens a fresh batch with a current
2262    /// sink snapshot frozen at the new revision.
2263    ///
2264    /// Borrow discipline: reads `subscribers_revision` and the snapshot
2265    /// from `s.nodes` BEFORE calling `s.pending_notify.entry()` to keep
2266    /// the two field borrows disjoint (split-borrow through
2267    /// `require_node_mut` defeats the borrow checker).
2268    ///
2269    /// Lock-discipline assumption: this read of `subscribers_revision`
2270    /// is safe because both the subscribe install path
2271    /// ([`crate::node::Core::subscribe`]) and `queue_notify` hold
2272    /// `CoreState`'s mutex when they bump / read the revision —
2273    /// concurrent subscribe/unsubscribe cannot interleave. **If
2274    /// `Core::subscribe` ever moves the sink-install lock-released
2275    /// (mirroring the lock-released drain refactor), the revision read
2276    /// here must re-validate post-borrow — otherwise a fresh batch
2277    /// could open with a stale snapshot.**
2278    fn push_into_pending_notify(s: &mut CoreState, node_id: NodeId, msg: Message) {
2279        let current_rev = s.require_node(node_id).subscribers_revision;
2280        let needs_new_batch = s.pending_notify.get(&node_id).is_none_or(|entry| {
2281            entry
2282                .batches
2283                .last()
2284                .is_none_or(|b| b.snapshot_revision != current_rev)
2285        });
2286        let sinks_snapshot: Vec<Sink> = if needs_new_batch {
2287            s.require_node(node_id)
2288                .subscribers
2289                .values()
2290                .cloned()
2291                .collect()
2292        } else {
2293            Vec::new()
2294        };
2295        match s.pending_notify.entry(node_id) {
2296            Entry::Vacant(slot) => {
2297                let mut batches: SmallVec<[PendingBatch; 1]> = SmallVec::new();
2298                batches.push(PendingBatch {
2299                    snapshot_revision: current_rev,
2300                    sinks: sinks_snapshot,
2301                    messages: vec![msg],
2302                });
2303                slot.insert(PendingPerNode { batches });
2304            }
2305            Entry::Occupied(mut slot) => {
2306                let entry = slot.get_mut();
2307                if needs_new_batch {
2308                    entry.batches.push(PendingBatch {
2309                        snapshot_revision: current_rev,
2310                        sinks: sinks_snapshot,
2311                        messages: vec![msg],
2312                    });
2313                } else {
2314                    entry
2315                        .batches
2316                        .last_mut()
2317                        .expect("non-empty by construction (entry exists implies batch exists)")
2318                        .messages
2319                        .push(msg);
2320                }
2321            }
2322        }
2323    }
2324
2325    /// Collect wave-end sink-fire jobs into `s.deferred_flush_jobs` and the
2326    /// payload-handle releases owed for `pending_notify` into
2327    /// `s.deferred_handle_releases`. The actual sink fires + handle releases
2328    /// run **after** the state lock is dropped — see [`Core::run_wave`].
2329    ///
2330    /// R1.3.1.b two-phase propagation: phase 1 (DIRTY) propagates through
2331    /// the entire graph before phase 2 (DATA / RESOLVED) begins. Implemented
2332    /// here as cross-node tier-then-node collect — phase 1's jobs sit before
2333    /// phase 2's in `deferred_flush_jobs`, so when `run_wave` drains the
2334    /// queue lock-released, multi-node subscribers see all DIRTYs before any
2335    /// settle. Matches TS's drainPhase model without the per-tier queue
2336    /// indirection.
2337    ///
2338    /// Phase ordering:
2339    ///   1 → tier 1   (DIRTY)
2340    ///   2 → tier 3+4 (DATA/RESOLVED + INVALIDATE — the "settle slice")
2341    ///   3 → tier 5   (COMPLETE/ERROR)
2342    ///   4 → tier 6   (TEARDOWN)
2343    ///
2344    /// Tier 0 (START) is per-subscription (never enters pending_notify) and
2345    /// tier 2 (PAUSE/RESUME) is delivered through dedicated paths, also
2346    /// bypassing pending_notify; both are absent from this enumeration.
2347    ///
2348    /// Within a single phase, per-node insertion order (IndexMap iteration)
2349    /// is preserved — an emit on A before B → A's phase-2 messages flush
2350    /// before B's. Within a single node, message order is preserved.
2351    fn flush_notifications(&self, s: &mut CoreState) {
2352        const PHASES: &[&[u8]] = &[
2353            &[1],    // DIRTY
2354            &[3, 4], // DATA/RESOLVED + INVALIDATE
2355            &[5],    // COMPLETE/ERROR
2356            &[6],    // TEARDOWN
2357        ];
2358        let pending = std::mem::take(&mut s.pending_notify);
2359        for &phase_tiers in PHASES {
2360            for (_node_id, entry) in &pending {
2361                // Slice X4 / D2: iterate batches in arrival order. Each
2362                // batch carries its own sink snapshot frozen at open-time;
2363                // a batch's messages flush to ITS sinks only. Within a
2364                // single (phase, node), batches stay in arrival order so
2365                // emit-order semantics are preserved across batches.
2366                for batch in &entry.batches {
2367                    if batch.sinks.is_empty() {
2368                        continue;
2369                    }
2370                    let phase_msgs: Vec<Message> = batch
2371                        .messages
2372                        .iter()
2373                        .copied()
2374                        .filter(|m| phase_tiers.contains(&m.tier()))
2375                        .collect();
2376                    if phase_msgs.is_empty() {
2377                        continue;
2378                    }
2379                    let sinks_clone: Vec<Sink> = batch.sinks.iter().map(Arc::clone).collect();
2380                    s.deferred_flush_jobs.push((sinks_clone, phase_msgs));
2381                }
2382            }
2383        }
2384        // Refcount release: balance the retain done in `queue_notify` for
2385        // every payload-bearing message that landed in pending_notify
2386        // (across ALL batches per node). Deferred to post-lock-drop so the
2387        // binding's release path can't re-enter Core under our lock.
2388        // Q2 (2026-05-09): deferred_handle_releases lives on
2389        // CrossPartitionState. Acquire briefly under state-held — lock
2390        // discipline `state → cross_partition` is preserved.
2391        let mut cps = self.lock_cross_partition();
2392        for entry in pending.values() {
2393            for msg in entry.iter_messages() {
2394                if let Some(h) = msg.payload_handle() {
2395                    cps.deferred_handle_releases.push(h);
2396                }
2397            }
2398        }
2399    }
2400
2401    /// Take the deferred sink-fire jobs, payload-handle releases,
2402    /// cleanup-hook fire queue, and pending-wipe queue from both
2403    /// `CoreState` and `CrossPartitionState`. Callers pair this with
2404    /// `drop(state_guard)` and a subsequent [`Self::fire_deferred`]
2405    /// call to deliver the wave's sinks, handle releases, Slice E2
2406    /// OnInvalidate cleanup hooks, and Slice E2 /qa Q2(b) eager
2407    /// wipe_ctx fires lock-released.
2408    ///
2409    /// Q2 (2026-05-09): `deferred_handle_releases` source moved to
2410    /// CrossPartitionState — signature widened.
2411    pub(crate) fn drain_deferred(
2412        s: &mut CoreState,
2413        cps: &mut crate::node::CrossPartitionState,
2414    ) -> WaveDeferred {
2415        (
2416            std::mem::take(&mut s.deferred_flush_jobs),
2417            std::mem::take(&mut cps.deferred_handle_releases),
2418            std::mem::take(&mut s.deferred_cleanup_hooks),
2419            std::mem::take(&mut s.pending_wipes),
2420        )
2421    }
2422
2423    /// Fire deferred sink-fire jobs in collected order, then release the
2424    /// payload handles owed for messages that landed in `pending_notify`
2425    /// during the wave, then fire any queued Slice E2 OnInvalidate cleanup
2426    /// hooks. All three phases run lock-released so:
2427    /// - Sinks that call back into Core (emit, pause, etc.) re-acquire the
2428    ///   state lock cleanly and run their own nested wave.
2429    /// - The binding's `release_handle` path can't deadlock against a
2430    ///   binding-side mutex held by Core.
2431    /// - User cleanup closures (invoked via `BindingBoundary::cleanup_for`)
2432    ///   may safely re-enter Core for unrelated nodes.
2433    ///
2434    /// **Cleanup-drain panic discipline (D060):** each `cleanup_for` call
2435    /// is wrapped in `catch_unwind` so a single binding panic doesn't
2436    /// short-circuit the per-wave drain. All queued cleanup attempts run;
2437    /// if any panicked, the LAST panic re-raises after the loop completes
2438    /// (preserving wave-end discipline while still surfacing failures).
2439    /// Per D060, Core stays panic-naive about user code — bindings own
2440    /// their host-language panic policy inside `cleanup_for`; this
2441    /// `catch_unwind` is purely about drain-don't-short-circuit.
2442    pub(crate) fn fire_deferred(
2443        &self,
2444        jobs: DeferredJobs,
2445        releases: Vec<HandleId>,
2446        cleanup_hooks: Vec<(crate::handle::NodeId, crate::boundary::CleanupTrigger)>,
2447        pending_wipes: Vec<crate::handle::NodeId>,
2448    ) {
2449        // Slice E2 /qa P1 (2026-05-07): wrap each sink-fire in
2450        // `catch_unwind` so a panicking sink doesn't unwind out of
2451        // `fire_deferred` and drop the queued `releases` +
2452        // `cleanup_hooks`. Mirrors Slice F audit fix A7's per-tier
2453        // handshake-fire discipline. Without this guard, a sink panic
2454        // here would silently leak handle retains AND silently drop
2455        // OnInvalidate cleanup hooks. AssertUnwindSafe is safe because
2456        // we re-raise the last panic at the end after running every
2457        // queued fire — drain ordering is preserved.
2458        let mut last_panic: Option<Box<dyn std::any::Any + Send>> = None;
2459        for (sinks, msgs) in jobs {
2460            for sink in &sinks {
2461                let sink = sink.clone();
2462                let msgs_ref = &msgs;
2463                let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
2464                    sink(msgs_ref);
2465                }));
2466                if let Err(payload) = result {
2467                    last_panic = Some(payload);
2468                }
2469            }
2470        }
2471        for h in releases {
2472            self.binding.release_handle(h);
2473        }
2474        // Slice E2 (D060): drain cleanup hooks with per-item panic
2475        // isolation so the loop always completes. AssertUnwindSafe is
2476        // safe here because we don't rely on logical state being valid
2477        // post-panic — the panic propagates anyway after the drain ends.
2478        for (node_id, trigger) in cleanup_hooks {
2479            let binding = &self.binding;
2480            let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
2481                binding.cleanup_for(node_id, trigger);
2482            }));
2483            if let Err(payload) = result {
2484                last_panic = Some(payload);
2485            }
2486        }
2487        // Slice E2 /qa Q2(b) (D069): drain eager wipe_ctx queue with the
2488        // same per-item panic isolation. Fires AFTER cleanup hooks so a
2489        // resubscribable node's OnInvalidate (or any tier-3+ cleanup that
2490        // fires in the same wave) sees pre-wipe binding state if it
2491        // landed in the same wave as the terminal cascade. Mutually
2492        // exclusive with `Subscription::Drop`'s direct-fire site, but
2493        // even concurrent fires are idempotent (binding's `wipe_ctx`
2494        // calls `HashMap::remove` which is a no-op on absent keys).
2495        for node_id in pending_wipes {
2496            let binding = &self.binding;
2497            let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
2498                binding.wipe_ctx(node_id);
2499            }));
2500            if let Err(payload) = result {
2501                last_panic = Some(payload);
2502            }
2503        }
2504        if let Some(payload) = last_panic {
2505            std::panic::resume_unwind(payload);
2506        }
2507    }
2508
2509    // -------------------------------------------------------------------
2510    // User-facing batch — coalesce multiple emits into one wave
2511    // -------------------------------------------------------------------
2512
2513    /// Coalesce multiple emissions into a single wave. Every `emit` /
2514    /// `complete` / `error` / `teardown` / `invalidate` call inside `f`
2515    /// queues its downstream work; the wave drains when `f` returns.
2516    ///
2517    /// **R1.3.6.a** — DIRTY still propagates immediately (tier 1 isn't
2518    /// deferred); only tier-3+ delivery is held until scope exit. **R1.3.6.b**
2519    /// — repeated emits on the same node coalesce into a single multi-message
2520    /// delivery (one [`Message::Dirty`] for the wave + one [`Message::Data`]
2521    /// per emit, all delivered together in the per-node phase-2 pass).
2522    ///
2523    /// Nested `batch()` calls share the outer wave; only the outermost call
2524    /// drives the drain. Re-entrant calls from inside an `emit`/fn (the wave
2525    /// engine's own `s.in_tick` re-entrance) compose with this method
2526    /// transparently — they observe `in_tick = true` and skip drain just
2527    /// like nested `batch()`.
2528    ///
2529    /// On panic inside `f`, the `BatchGuard` returned by the internal
2530    /// `begin_batch` call drops normally and discards pending tier-3+ work
2531    /// (subscribers do not observe the half-built wave). See
2532    /// [`Core::begin_batch`] for the RAII variant if you need explicit control
2533    /// over the scope boundary.
2534    pub fn batch<F>(&self, f: F)
2535    where
2536        F: FnOnce(),
2537    {
2538        let _guard = self.begin_batch();
2539        f();
2540    }
2541
2542    /// RAII batch handle — opens a wave when constructed, drains on drop.
2543    ///
2544    /// Mirrors the closure-based [`Self::batch`] but exposes the scope
2545    /// boundary so callers can compose batches with non-`FnOnce` control
2546    /// flow (e.g. async-state-machine code paths, or splitting setup and
2547    /// drain across helper functions).
2548    ///
2549    /// ```ignore
2550    /// let g = core.begin_batch();
2551    /// core.emit(state_a, h1);
2552    /// core.emit(state_b, h2);
2553    /// drop(g); // wave drains here
2554    /// ```
2555    ///
2556    /// Like the closure form, nested `begin_batch` calls share the outer
2557    /// wave (only the outermost guard drains).
2558    ///
2559    /// # Panics
2560    ///
2561    /// Panics if the registry-epoch retry-validate loop exceeds
2562    /// [`crate::subgraph::MAX_LOCK_RETRIES`] iterations — pathological
2563    /// concurrent `register` / `set_deps` activity racing with
2564    /// closure-form batch entry. Unreachable in correct call paths.
2565    #[must_use = "BatchGuard drains the wave on drop; assign to a named binding"]
2566    pub fn begin_batch(&self) -> BatchGuard {
2567        // Slice Y1 / Phase E (2026-05-08): closure-form batch has no known
2568        // seed; per session-doc Q7 / D092 it MUST serialize against every
2569        // currently-existing partition. Acquire each partition's
2570        // `wave_owner` in ascending [`SubgraphId`] order via the retry-
2571        // validate primitive. Same-thread re-entry passes through each
2572        // ReentrantMutex transparently; cross-thread waves on any of the
2573        // touched partitions block until our `wave_guards` drop.
2574        //
2575        // **QA-fix #2 (2026-05-09) — registry epoch retry-validate:** a
2576        // concurrent `register` / `set_deps`-driven union/split between
2577        // our `all_partitions_lock_boxes()` snapshot and the post-
2578        // acquire epoch read changes the partition set. We then retry
2579        // the whole acquire with the new snapshot. Without this, a
2580        // partition added after our snapshot would not be held by our
2581        // batch — breaking the closure-form's "all-partitions
2582        // serialization" contract.
2583        //
2584        // Trade-off (documented v1 contract): closure-form batch is the
2585        // serialization point under per-partition parallelism. Per-seed
2586        // entry points (`Core::subscribe`, [`Self::begin_batch_for`])
2587        // acquire only the touched partitions and run truly parallel
2588        // for disjoint partitions.
2589        for _ in 0..crate::subgraph::MAX_LOCK_RETRIES {
2590            let epoch_before = self.registry.lock().epoch();
2591            let partition_boxes = self.all_partitions_lock_boxes();
2592            let mut wave_guards: SmallVec<[crate::node::WaveOwnerGuard; 4]> = SmallVec::new();
2593            for (sid, _box) in &partition_boxes {
2594                // Use the partition's root NodeId as the lock_for retry
2595                // seed. SubgraphId.raw() == root NodeId.raw(); the root
2596                // is always registered in the X5 / Phase-E substrate
2597                // (cleanup_node is gated, Phase G activates).
2598                let representative = crate::handle::NodeId::new(sid.raw());
2599                wave_guards.push(self.partition_wave_owner_lock_arc(representative));
2600            }
2601            // Post-acquire epoch read. If unchanged, our snapshot is
2602            // still authoritative — every existing partition was held
2603            // throughout. If changed, drop guards and retry.
2604            let epoch_after = self.registry.lock().epoch();
2605            if epoch_after == epoch_before {
2606                return self.begin_batch_with_guards(wave_guards);
2607            }
2608            // Drop guards lock-released so retries don't accumulate.
2609            drop(wave_guards);
2610            std::thread::yield_now();
2611        }
2612        panic!(
2613            "Core::begin_batch: exceeded {} retries — pathological concurrent \
2614             register/union/split activity racing with closure-form batch entry",
2615            crate::subgraph::MAX_LOCK_RETRIES
2616        );
2617    }
2618
2619    /// Begin a batch scoped to the partitions transitively touched from
2620    /// `seed`. Walks `s.children` (downstream cascade) + `meta_companions`
2621    /// (R1.3.9.d TEARDOWN cascade) starting at `seed`, collects every
2622    /// reachable partition, and acquires each in ascending
2623    /// [`crate::subgraph::SubgraphId`] order via
2624    /// [`Core::partition_wave_owner_lock_arc`].
2625    ///
2626    /// Two threads with disjoint touched-partition sets run truly
2627    /// parallel — the per-partition `wave_owner` mutexes don't block
2628    /// each other. This is the canonical Y1 parallelism win for
2629    /// per-seed wave-driving entry points (subscribe, emit, pause,
2630    /// resume, invalidate, complete, error, teardown,
2631    /// set_deps push-on-subscribe).
2632    ///
2633    /// **QA-fix #2 (2026-05-09):** retry-validate the touched-partition
2634    /// set against the registry epoch — same protection as
2635    /// [`Self::begin_batch`] but scoped to a per-seed touched set
2636    /// rather than every partition. Conservative: any registry
2637    /// mutation (even on a partition unrelated to seed's touched set)
2638    /// triggers a retry. This avoids a precise "did MY touched set
2639    /// change?" check at the cost of occasional spurious retries.
2640    ///
2641    /// # Panics
2642    ///
2643    /// Panics if the registry-epoch retry-validate loop exceeds
2644    /// [`crate::subgraph::MAX_LOCK_RETRIES`] iterations, OR if
2645    /// [`Core::partition_wave_owner_lock_arc`] panics on an
2646    /// unregistered seed. Both are unreachable in correct call paths
2647    /// (P12 invariant guarantees registry membership matches
2648    /// `s.nodes`).
2649    ///
2650    /// Slice Y1 / Phase E (2026-05-08); QA-fix #2 (2026-05-09).
2651    #[must_use = "BatchGuard drains the wave on drop; assign to a named binding"]
2652    pub fn begin_batch_for(&self, seed: crate::handle::NodeId) -> BatchGuard {
2653        for _ in 0..crate::subgraph::MAX_LOCK_RETRIES {
2654            let epoch_before = self.registry.lock().epoch();
2655            let touched = self.compute_touched_partitions(seed);
2656            let mut wave_guards: SmallVec<[crate::node::WaveOwnerGuard; 4]> = SmallVec::new();
2657            for sid in &touched {
2658                let representative = crate::handle::NodeId::new(sid.raw());
2659                wave_guards.push(self.partition_wave_owner_lock_arc(representative));
2660            }
2661            let epoch_after = self.registry.lock().epoch();
2662            if epoch_after == epoch_before {
2663                return self.begin_batch_with_guards(wave_guards);
2664            }
2665            drop(wave_guards);
2666            std::thread::yield_now();
2667        }
2668        panic!(
2669            "Core::begin_batch_for(seed={seed:?}): exceeded {} retries — \
2670             pathological concurrent register/union/split activity racing \
2671             with per-seed batch entry",
2672            crate::subgraph::MAX_LOCK_RETRIES
2673        );
2674    }
2675
2676    /// Internal helper: claim `in_tick` and assemble a [`BatchGuard`]
2677    /// with the supplied (already-acquired) partition wave-owner guards.
2678    /// `wave_guards` MUST be in ascending [`crate::subgraph::SubgraphId`]
2679    /// order (the canonical lock-acquisition order) — both
2680    /// [`Self::begin_batch`] (all-partitions) and
2681    /// [`Self::begin_batch_for`] (touched-partitions) construct the
2682    /// vector in that order before calling here.
2683    fn begin_batch_with_guards(
2684        &self,
2685        wave_guards: SmallVec<[crate::node::WaveOwnerGuard; 4]>,
2686    ) -> BatchGuard {
2687        let owns_tick = {
2688            let mut s = self.lock_state();
2689            let was_in = s.in_tick;
2690            if !was_in {
2691                s.in_tick = true;
2692            }
2693            !was_in
2694        };
2695        // D1 patch (2026-05-09): defensive wave-start clear of the
2696        // per-thread Slice G tier3 tracker on outermost owning entry.
2697        // The thread-local is cleared at outermost BatchGuard drop on
2698        // both success + panic paths; this start-clear is belt-and-
2699        // suspenders against panic paths that bypass Drop (catch_unwind
2700        // can interleave with thread reuse — e.g. cargo's test-runner
2701        // thread pool — and propagate stale entries from a prior
2702        // panicked test's wave that didn't fully unwind through
2703        // BatchGuard::drop).
2704        if owns_tick {
2705            tier3_clear();
2706        }
2707        BatchGuard {
2708            core: self.clone(),
2709            owns_tick,
2710            wave_guards,
2711            _not_send: std::marker::PhantomData,
2712        }
2713    }
2714}
2715
/// RAII guard returned by [`Core::begin_batch`] (and by the per-seed
/// [`Core::begin_batch_for`]).
///
/// While alive, suppresses per-emit wave drains — multiple `emit` /
/// `complete` / `error` / `teardown` / `invalidate` calls coalesce into one
/// wave. On drop:
/// - Outermost guard: drains the wave (fires sinks, runs cleanup, clears
///   in-tick).
/// - Nested guard (an outer `BatchGuard` or an in-progress wave already owns
///   the in-tick flag): silently no-ops.
///
/// On thread panic during the closure body, the drop path discards pending
/// tier-3+ delivery rather than firing sinks (avoids cascading panics).
/// Subscribers observe **no tier-3+ delivery for the panicked wave**.
/// State-node cache writes that already executed inside the closure are
/// rolled back via wave-cache snapshots — `cache_of(s)` returns the pre-
/// panic value. The atomicity guarantee covers both sink-observability and
/// cache state.
///
/// # Thread safety
///
/// `BatchGuard` is **`!Send`** by design. `begin_batch` claims the
/// per-`Core` `in_tick` flag AND the per-partition `wave_owner`
/// re-entrant mutex(es) on the calling thread; sending the guard to
/// another thread and dropping it there would clear `in_tick` and
/// release the wave-owner guards from a different thread than the
/// one that acquired them, breaking both the thread-local "I own
/// the wave scope" semantic and `parking_lot::ReentrantMutex`'s
/// ownership invariant. The `wave_guards` field is a `SmallVec` of
/// `!Send` `ArcReentrantMutexGuard<()>`; the `PhantomData<*const ()>`
/// marker is belt-and-suspenders.
///
/// Slice Y1 / Phase E (2026-05-08): the field migrated from a single
/// `ArcReentrantMutexGuard` (legacy Core-global `wave_owner`) to a
/// `SmallVec` of partition wave-owner guards. Closure-form
/// `begin_batch` acquires every current partition (serialization
/// point); `begin_batch_for(seed)` acquires only the transitively-
/// touched partitions (parallel for disjoint sets).
///
/// ```compile_fail
/// use graphrefly_core::{BatchGuard, BindingBoundary, Core, DepBatch, FnId, FnResult, HandleId, NodeId};
/// use std::sync::Arc;
///
/// struct Stub;
/// impl BindingBoundary for Stub {
///     fn invoke_fn(&self, _: NodeId, _: FnId, _: &[DepBatch]) -> FnResult {
///         FnResult::Noop { tracked: None }
///     }
///     fn custom_equals(&self, _: FnId, _: HandleId, _: HandleId) -> bool { false }
///     fn release_handle(&self, _: HandleId) {}
/// }
/// fn requires_send<T: Send>(_: T) {}
/// let core = Core::new(Arc::new(Stub) as Arc<dyn BindingBoundary>);
/// let guard = core.begin_batch();
/// requires_send(guard); // <- compile_fail: BatchGuard is !Send.
/// ```
#[must_use = "BatchGuard drains the wave on drop; assign to a named binding"]
pub struct BatchGuard {
    /// Clone of the `Core` that opened this scope. The drop path goes
    /// through it to lock the wave state, drain or discard the wave, and
    /// release handles via the binding.
    core: Core,
    /// `true` when constructing this guard flipped `in_tick` from `false`
    /// to `true`, i.e. this is the outermost guard and its drop must end
    /// the wave (drain it, or discard it while panicking). `false` for a
    /// nested guard, whose drop leaves the wave untouched.
    owns_tick: bool,
    /// Re-entrant mutex guards held for the wave's duration. One entry
    /// per touched partition's `wave_owner`, in ascending
    /// [`crate::subgraph::SubgraphId`] order. Drop releases each guard
    /// (any order — `parking_lot::ReentrantMutex` doesn't care since all
    /// are held by the same thread). Cross-thread waves on any of the
    /// held partitions block until our scope ends; cross-thread waves
    /// on partitions NOT in this vector run truly parallel — the
    /// canonical Y1 parallelism property.
    ///
    /// Each `ArcReentrantMutexGuard<()>` is `!Send`, so the `SmallVec`
    /// (and thus `BatchGuard`) is `!Send` at the type level — sending
    /// across threads would violate `parking_lot::ReentrantMutex`'s
    /// thread-ownership invariant.
    wave_guards: SmallVec<[crate::node::WaveOwnerGuard; 4]>,
    /// Zero-sized marker. A raw pointer is `!Send`, so this keeps
    /// `BatchGuard` `!Send` independently of the guard type stored in
    /// `wave_guards`.
    _not_send: std::marker::PhantomData<*const ()>,
}
2791
2792impl Drop for BatchGuard {
2793    fn drop(&mut self) {
2794        if !self.owns_tick {
2795            return;
2796        }
2797        if std::thread::panicking() {
2798            // Discard pending wave work to avoid firing sinks during
2799            // unwind (sink panic during unwind would abort the process).
2800            //
2801            // Refcount discipline: pending_notify entries (or any
2802            // already-collected deferred_handle_releases from a partial
2803            // drain) hold a queue_notify-time retain on every payload
2804            // handle. Release them here so the discard doesn't leak.
2805            //
2806            // Wave-cache snapshots restore pre-panic cache values so the
2807            // atomicity guarantee covers state, not just observability.
2808            let (pending, deferred_releases, restored_releases) = {
2809                let mut s = self.core.lock_state();
2810                // Q2 (2026-05-09): cross_partition lock acquired alongside
2811                // state for the panic-discard cleanup. Lock-discipline:
2812                // state → cross_partition.
2813                let mut cps = self.core.lock_cross_partition();
2814                let pending = std::mem::take(&mut s.pending_notify);
2815                let _: DeferredJobs = std::mem::take(&mut s.deferred_flush_jobs);
2816                s.pending_fires.clear();
2817                let restored = self.core.restore_wave_cache_snapshots(&mut s, &mut cps);
2818                // clear_wave_state pushes batch-handle releases into
2819                // cps.deferred_handle_releases, so take cps's queue AFTER
2820                // the clear.
2821                s.clear_wave_state(&mut cps);
2822                cps.clear_wave_state();
2823                let deferred_releases = std::mem::take(&mut cps.deferred_handle_releases);
2824                // Slice E2 (D061): panic-discard wave drops queued
2825                // OnInvalidate cleanup hooks SILENTLY. Bindings using
2826                // OnInvalidate for external-resource cleanup MUST
2827                // idempotent-cleanup at process exit / next successful
2828                // invalidate. Mirrors A3 `pending_pause_overflow`
2829                // panic-discard precedent.
2830                let _: Vec<(crate::handle::NodeId, crate::boundary::CleanupTrigger)> =
2831                    std::mem::take(&mut s.deferred_cleanup_hooks);
2832                // Slice E2 /qa Q2(b) (D069): same panic-discard discipline
2833                // for the eager-wipe queue. A panic-discarded wave drops
2834                // queued `wipe_ctx` fires silently; the binding-side
2835                // `NodeCtxState` entry remains until the next successful
2836                // terminate-with-no-subs cycle (or until `Core` drops).
2837                // This mirrors D061's external-resource-cleanup gap and
2838                // is documented similarly.
2839                let _: Vec<crate::handle::NodeId> = std::mem::take(&mut s.pending_wipes);
2840                s.in_tick = false;
2841                (pending, deferred_releases, restored)
2842            };
2843            // Lock dropped — release retains lock-released so the binding
2844            // can't deadlock against an internal binding mutex.
2845            for entry in pending.values() {
2846                for msg in entry.iter_messages() {
2847                    if let Some(h) = msg.payload_handle() {
2848                        self.core.binding.release_handle(h);
2849                    }
2850                }
2851            }
2852            for h in deferred_releases {
2853                self.core.binding.release_handle(h);
2854            }
2855            for h in restored_releases {
2856                self.core.binding.release_handle(h);
2857            }
2858            // D1 patch (2026-05-09): clear the per-thread Slice G tier3
2859            // tracker on outermost wave-end (panic-discard path). The
2860            // thread-local outlives the BatchGuard otherwise — cargo's
2861            // thread reuse across tests would propagate stale entries.
2862            tier3_clear();
2863            return;
2864        }
2865        // Successful drain — drain_and_flush manages its own locking.
2866        self.core.drain_and_flush();
2867        // Wave cleanup + extract deferred jobs under the lock.
2868        let (jobs, releases, cleanup_hooks, pending_wipes, snapshot_releases) = {
2869            let mut s = self.core.lock_state();
2870            // Q2 (2026-05-09): cross_partition lock acquired alongside
2871            // state for wave-end cleanup. Lock-discipline: state →
2872            // cross_partition.
2873            let mut cps = self.core.lock_cross_partition();
2874            s.clear_wave_state(&mut cps);
2875            cps.clear_wave_state();
2876            s.in_tick = false;
2877            // /qa A1 fix (2026-05-09): drain snapshot retains under
2878            // lock, release lock-released below to avoid binding
2879            // re-entrance under held mutexes.
2880            let snapshot_releases = Core::drain_wave_cache_snapshots(&mut cps);
2881            // `drain_deferred` takes `deferred_flush_jobs` +
2882            // `deferred_handle_releases` (incl. rotation releases pushed
2883            // by `clear_wave_state` above) + Slice E2
2884            // `deferred_cleanup_hooks` + Slice E2 /qa Q2(b)
2885            // `pending_wipes`.
2886            let (jobs, releases, hooks, wipes) = Core::drain_deferred(&mut s, &mut cps);
2887            (jobs, releases, hooks, wipes, snapshot_releases)
2888        };
2889        // Lock dropped — fire deferred sinks + release retains + fire
2890        // cleanup hooks (Slice E2 OnInvalidate, D060 catch_unwind drain)
2891        // + fire eager wipes (D069).
2892        self.core
2893            .fire_deferred(jobs, releases, cleanup_hooks, pending_wipes);
2894        // /qa A1 fix (2026-05-09): release wave_cache_snapshots retains
2895        // lock-released. Pre-A1 these were released inside the held
2896        // state + cross_partition locks; binding finalizers re-entering
2897        // Core would deadlock against either mutex. Drained earlier
2898        // under the lock; released here after both mutexes dropped and
2899        // sinks have fired.
2900        for h in snapshot_releases {
2901            self.core.binding.release_handle(h);
2902        }
2903        // D1 patch (2026-05-09): clear the per-thread Slice G tier3
2904        // tracker at outermost wave-end (success path). Mirrors the
2905        // panic-discard branch above. Thread-local outlives BatchGuard
2906        // by default; cargo's thread-reuse across tests would propagate
2907        // stale entries. Cleared after sinks fire (sink callbacks may
2908        // re-enter Core via emit and could read the tier3 set
2909        // mid-wave; the wave is over here so clearing is safe).
2910        tier3_clear();
2911        // QA-fix group 2 (2026-05-09): explicitly drop the wave guards
2912        // in REVERSE acquisition order. `parking_lot::ReentrantMutex`
2913        // doesn't care about release order for same-thread holders, but
2914        // a future migration to a non-reentrant lock (or one with a
2915        // Drop side-effect tied to ordering) would silently break if we
2916        // relied on `SmallVec`'s default forward-iteration drop. The
2917        // ascending-acquire / descending-release pattern is the
2918        // canonical lock-discipline shape.
2919        while let Some(guard) = self.wave_guards.pop() {
2920            drop(guard);
2921        }
2922    }
2923}