Skip to main content

graphrefly_core/
batch.rs

1//! Wave engine — drain loop, fire selection, emission commit, sink dispatch.
2//!
3//! Ports the wave-engine portion of the handle-protocol prototype
4//! (`~/src/graphrefly-ts/src/__experiments__/handle-core/core.ts`).
5//! Sibling to [`super::node`]; the dispatcher's other concerns
6//! (registration, subscription, pause/resume, terminal cascade,
7//! `set_deps`) live there.
8//!
9//! # Wave engine entry points
10//!
11//! - [`Core::run_wave`] — wave entry. Claims `in_tick` under the state lock,
12//!   runs `op` lock-released, then drains all transitive fn-fires and
13//!   flushes per-subscriber notifications. Each fn-fire iteration drops
14//!   the state lock around `BindingBoundary::invoke_fn` so user fn callbacks
15//!   can re-enter Core safely.
16//! - [`Core::drain_and_flush`] — drain phase + flush phase. Acquires/drops
17//!   the state lock per iteration around `invoke_fn`.
18//! - [`Core::commit_emission`] — equals-substitution + DIRTY/DATA/RESOLVED
19//!   queueing + child propagation. `&self`-only; bracket-fires
20//!   `BindingBoundary::custom_equals` lock-released.
21//! - [`Core::queue_notify`] — per-subscriber message queueing with
22//!   pause-buffer routing. Snapshots the subscriber list at first-touch-
23//!   per-wave so late subscribers (installed mid-wave between drain
24//!   iterations) don't receive duplicate deliveries from messages already
25//!   queued before they subscribed.
26//! - [`Core::deliver_data_to_consumer`] — single-edge propagation; marks
27//!   the consumer for fn-fire if its tracked-deps set is satisfied.
28//!   Called from `commit_emission`, plus `activate_derived` and
29//!   `set_deps` in [`super::node`].
30//!
31//! # Re-entrance discipline (Slice A close — M1 fully lock-released)
32//!
33//! - **Wave-end sink fires** drop the state lock first (Slice A-bigger
34//!   discipline).
35//! - **`BindingBoundary::invoke_fn`** in `fire_fn` fires lock-released —
36//!   user fn callbacks may re-enter `Core::emit` / `pause` / `resume` /
37//!   `invalidate` / `complete` / `error` / `teardown` and run a nested
38//!   wave (the existing `s.in_tick` re-entrance gate composes
39//!   transparently).
40//! - **`BindingBoundary::custom_equals`** in `commit_emission`'s equals
41//!   check fires lock-released.
42//! - **Subscribe-time handshake** is the one remaining lock-held callback.
43//!   It now fires per-tier (`[Start]`, `[Data(v)]`, `[Complete|Error]`,
44//!   `[Teardown]`) as separate sink calls, matching the canonical R1.3.5.a
45//!   tier-split. Re-entrance from a handshake sink callback panics with
46//!   the [`reentrance_guard`] diagnostic.
47
48use std::sync::Arc;
49
50use indexmap::map::Entry;
51use parking_lot::ArcReentrantMutexGuard;
52
53use smallvec::SmallVec;
54
55use crate::boundary::{DepBatch, FnEmission, FnResult};
56use crate::handle::{HandleId, NodeId, NO_HANDLE};
57use crate::message::Message;
58use crate::node::{Core, CoreState, EqualsMode, OperatorOp, Sink, TerminalKind};
59
60/// Deferred sink-fire jobs collected during `flush_notifications`. Each
61/// entry pairs a snapshot of the sink Arcs to fire with the messages to
62/// deliver to them — one entry per (node × phase) cell with non-empty
63/// content. Drained from `CoreState` and fired lock-released.
64pub(crate) type DeferredJobs = Vec<(Vec<Sink>, Vec<Message>)>;
65
66/// Lock-released drain payload of the wave's BatchGuard:
67/// `(sink_jobs, handle_releases, OnInvalidate cleanup hooks, pending wipe_ctx fires)`.
68/// Returned by [`Core::drain_deferred`], consumed by [`Core::fire_deferred`].
69/// Sliced into a type alias to satisfy `clippy::type_complexity`.
70pub(crate) type WaveDeferred = (
71    DeferredJobs,
72    Vec<HandleId>,
73    Vec<(crate::handle::NodeId, crate::boundary::CleanupTrigger)>,
74    Vec<crate::handle::NodeId>,
75);
76
77/// One subscriber-snapshot epoch within a node's wave-end notification
78/// queue. A `PendingBatch` is opened the first time `queue_notify` runs
79/// for the node in a wave, and a fresh batch is opened whenever the node's
80/// `subscribers_revision` advances mid-wave (a new sink subscribes, an
81/// existing sink unsubscribes, or a handshake-time panic evicts an
82/// orphaned sink). All messages within one batch flush to the same sink
83/// list — the snapshot taken when the batch opened, frozen against
84/// subsequent revision bumps.
85pub(crate) struct PendingBatch {
86    /// `NodeRecord::subscribers_revision` value at the moment this batch
87    /// opened. Used by `queue_notify` to decide append-to-last-batch vs
88    /// open-fresh-batch on every push.
89    pub(crate) snapshot_revision: u64,
90    pub(crate) sinks: Vec<Sink>,
91    pub(crate) messages: Vec<Message>,
92}
93
94/// Per-node wave-end notification queue, structured as one or more
95/// subscriber-snapshot epochs (`batches`). The common case (no
96/// mid-wave subscribe / unsubscribe at this node) keeps a single
97/// inline batch — `SmallVec<[_; 1]>` keeps that allocation-free.
98///
99/// **Slice X4 / D2 (2026-05-08):** the prior shape was a single
100/// `(sinks, messages)` pair per node — the snapshot froze on first
101/// `queue_notify` and was reused for every subsequent emit to the same
102/// node in the wave. That caused the documented late-subscriber +
103/// multi-emit-per-wave gap (R1.3.5.a divergence): a sub installed
104/// between two emits to the same node was invisible to the second
105/// emit's flush slice. The revision-tracked batch list resolves it —
106/// late subs land in a fresh batch that frozenly carries them, while
107/// pre-subscribe batches retain their original snapshot so the new
108/// sub doesn't double-receive earlier emits via flush AND handshake.
109pub(crate) struct PendingPerNode {
110    pub(crate) batches: SmallVec<[PendingBatch; 1]>,
111}
112
113impl PendingPerNode {
114    /// Iterate every queued message for this node across all batches in
115    /// arrival order. Used by R1.3.3.a invariant assertions and the
116    /// auto-resolve / Slice-G coalescing tier-3-presence checks, which
117    /// reason about wave-content per node, not per batch.
118    pub(crate) fn iter_messages(&self) -> impl Iterator<Item = &Message> + '_ {
119        self.batches.iter().flat_map(|b| b.messages.iter())
120    }
121
122    /// Mutable counterpart for `iter_messages`. Used by
123    /// `rewrite_prior_resolved_to_data` to in-place rewrite Resolved
124    /// entries to Data when a wave detects a multi-emit case after the
125    /// fact.
126    pub(crate) fn iter_messages_mut(&mut self) -> impl Iterator<Item = &mut Message> + '_ {
127        self.batches.iter_mut().flat_map(|b| b.messages.iter_mut())
128    }
129}
130
131/// RAII helper for the A6 reentrancy guard (Slice F, 2026-05-07).
132///
133/// Pushes `node_id` onto [`CoreState::currently_firing`] on construction,
134/// pops it on Drop. [`Core::set_deps`] consults the stack and rejects
135/// `set_deps(N, ...)` from inside N's own fn-fire with
136/// [`crate::node::SetDepsError::ReentrantOnFiringNode`] — closing the
137/// D1 hazard where Phase-1's snapshot of `dep_handles` would refer to
138/// a different dep ordering than Phase-3's `tracked` storage.
139///
140/// Wraps the lock-released `invoke_fn` (and operator-equivalent FFI
141/// callbacks like `project_each` / `predicate_each`). Drop fires even
142/// on panic, so the stack stays balanced under user-fn unwinds.
143///
144/// Membership semantics (NOT strict LIFO): the only consumer of
145/// `currently_firing` is `Core::set_deps`'s reentrancy check, which uses
146/// `contains(&n)` — a set-membership test. Drop pops the right-most
147/// matching `node_id` via `rposition` + `swap_remove`. For a stack like
148/// `[A, B, A]` (A's fn re-enters B, B's fn re-enters A), B's drop pops
149/// the SECOND A (index 1) via swap_remove, leaving `[A, A]` — the
150/// physical order of the remaining As may not match construction order,
151/// but membership is preserved. If a future call site needs strict LIFO
152/// (e.g. "pop the most recently fired node"), switch to `pop()` + assert
153/// the popped value equals `self.node_id`. (QA A6, 2026-05-07)
154pub(crate) struct FiringGuard {
155    core: Core,
156    node_id: NodeId,
157}
158
159impl FiringGuard {
160    pub(crate) fn new(core: &Core, node_id: NodeId) -> Self {
161        core.lock_state().currently_firing.push(node_id);
162        Self {
163            core: core.clone(),
164            node_id,
165        }
166    }
167}
168
169impl Drop for FiringGuard {
170    fn drop(&mut self) {
171        let mut s = self.core.lock_state();
172        if let Some(pos) = s.currently_firing.iter().rposition(|n| *n == self.node_id) {
173            s.currently_firing.swap_remove(pos);
174        }
175        // else: already popped by an external rebalance — silent no-op
176        // for Drop discipline (panic-in-Drop is poison).
177    }
178}
179
180/// Borrow the per-operator scratch slot as `&T`. Panics if the slot is
181/// uninitialized or the contained type doesn't match `T` — both are
182/// invariant violations for any `fire_op_*` helper that should only be
183/// called from `fire_operator`'s match arm for the matching variant.
184fn scratch_ref<T: crate::op_state::OperatorScratch>(s: &CoreState, node_id: NodeId) -> &T {
185    s.require_node(node_id)
186        .op_scratch
187        .as_ref()
188        .expect("op_scratch slot uninitialized for operator node")
189        .as_any_ref()
190        .downcast_ref::<T>()
191        .expect("op_scratch type mismatch")
192}
193
194/// Mutable borrow of the per-operator scratch slot. Same invariants as
195/// [`scratch_ref`].
196fn scratch_mut<T: crate::op_state::OperatorScratch>(s: &mut CoreState, node_id: NodeId) -> &mut T {
197    s.require_node_mut(node_id)
198        .op_scratch
199        .as_mut()
200        .expect("op_scratch slot uninitialized for operator node")
201        .as_any_mut()
202        .downcast_mut::<T>()
203        .expect("op_scratch type mismatch")
204}
205
206impl Core {
207    // -------------------------------------------------------------------
208    // Wave entry + drain
209    // -------------------------------------------------------------------
210
211    /// Wave entry. The caller passes a closure that performs the wave's
212    /// triggering operation (`commit_emission`, `terminate_node`, etc.).
213    /// The closure runs lock-released; closure-internal Core methods
214    /// acquire the state lock as they go.
215    ///
216    /// **Implementation:** delegates to [`Self::begin_batch`] for the
217    /// wave's RAII lifecycle. The returned `BatchGuard` holds the
218    /// `wave_owner` re-entrant mutex for the wave's duration (cross-thread
219    /// emits block; same-thread re-entry passes through), claims `in_tick`,
220    /// and on drop runs the drain + flush + sink-fire phases — OR, if the
221    /// closure panicked, the panic-discard path that restores cache
222    /// snapshots and clears in_tick. This unification gives `run_wave` the
223    /// same panic-safety guarantee as the user-facing `Core::batch`.
224    ///
225    /// **Re-entrance:** a closure invoked from inside another wave — the
226    /// inner `run_wave`'s `begin_batch` observes `in_tick=true`, the
227    /// returned guard is non-owning (`owns_tick=false`), drop is a no-op.
228    /// The outer wave's drain picks up the inner closure's queued work.
229    ///
230    /// **Lock-release discipline (Slice A close, M1):** all binding-side
231    /// callbacks except the subscribe-time handshake fire lock-released.
232    /// Sinks that re-enter Core run a nested wave; user fns that re-enter
233    /// Core run a nested wave; custom-equals oracles that re-enter Core
234    /// run a nested wave. Cross-thread emits block at `wave_owner` until
235    /// the in-flight wave's drain completes — preserving the user-facing
236    /// "emit returning means subscribers have observed" contract.
237    pub(crate) fn run_wave<F>(&self, op: F)
238    where
239        F: FnOnce(&Self),
240    {
241        // The guard holds wave_owner + claims in_tick. On normal scope
242        // exit it drains + flushes + fires sinks. On panic-during-op it
243        // restores cache snapshots + clears in_tick (panic-discard).
244        let _guard = self.begin_batch();
245        op(self);
246        // _guard drops here → drain or panic-discard, depending on whether
247        // op() returned normally.
248    }
249
250    /// Release retains held by `wave_cache_snapshots` and clear the map.
251    /// Called from the wave-success paths in `run_wave` and `BatchGuard`.
252    pub(crate) fn commit_wave_cache_snapshots(&self, s: &mut CoreState) {
253        if s.wave_cache_snapshots.is_empty() {
254            return;
255        }
256        let snapshots = std::mem::take(&mut s.wave_cache_snapshots);
257        for (_, old_handle) in snapshots {
258            self.binding.release_handle(old_handle);
259        }
260    }
261
262    /// Restore cache slots from `wave_cache_snapshots` and clear the map.
263    /// Called from the wave-abort path in `BatchGuard::drop` (panic).
264    ///
265    /// For each snapshotted node:
266    ///
267    /// 1. Read the current cache (the in-flight new value).
268    /// 2. Set `cache = old_handle` (the snapshot's retained value).
269    /// 3. Release the now-unowned current cache handle.
270    ///
271    /// Returns the list of "current" handles to release outside the lock.
272    pub(crate) fn restore_wave_cache_snapshots(&self, s: &mut CoreState) -> Vec<HandleId> {
273        if s.wave_cache_snapshots.is_empty() {
274            return Vec::new();
275        }
276        let snapshots = std::mem::take(&mut s.wave_cache_snapshots);
277        let mut releases = Vec::with_capacity(snapshots.len());
278        for (node_id, old_handle) in snapshots {
279            let Some(rec) = s.nodes.get_mut(&node_id) else {
280                releases.push(old_handle);
281                continue;
282            };
283            let current = std::mem::replace(&mut rec.cache, old_handle);
284            if current != NO_HANDLE {
285                releases.push(current);
286            }
287        }
288        releases
289    }
290
291    /// Drain pending fires until quiescent, then flush wave-end notifications
292    /// to subscribers. Each fire iteration drops the state lock around the
293    /// binding's `invoke_fn` callback so user fns may re-enter Core safely.
294    ///
295    /// `&self`-only — manages its own locking. Called from [`Self::run_wave`]
296    /// and [`super::node::Core::activate_derived`] (via `run_wave`).
297    pub(crate) fn drain_and_flush(&self) {
298        let mut guard = 0u32;
299        loop {
300            // R1.3.8.c (Slice F, A3): if no fires are pending but there are
301            // queued pause-overflow ERRORs, synthesize them now. The
302            // resulting ERROR cascade may add to pending_fires (children
303            // settling their terminal state), so we loop back to drain.
304            let synth_pending = {
305                let mut s = self.lock_state();
306                if s.pending_fires.is_empty() && !s.pending_pause_overflow.is_empty() {
307                    std::mem::take(&mut s.pending_pause_overflow)
308                } else {
309                    Vec::new()
310                }
311            };
312            for entry in synth_pending {
313                // Lock-released call to the binding hook. Default impl
314                // returns None — the binding has opted out of R1.3.8.c
315                // and we fall back to silent-drop + ResumeReport.dropped.
316                let handle = self.binding.synthesize_pause_overflow_error(
317                    entry.node_id,
318                    entry.dropped_count,
319                    entry.configured_max,
320                    entry.lock_held_ns / 1_000_000,
321                );
322                if let Some(h) = handle {
323                    // Re-enter Core::error to terminate the node and
324                    // cascade. We're inside a wave (`in_tick = true`),
325                    // so error() gets a non-owning batch guard — it
326                    // doesn't try to start its own drain. The cascade
327                    // queues into our outer drain via pending_fires
328                    // and pending_notify.
329                    self.error(entry.node_id, h);
330                }
331            }
332
333            // Pick next fire under a short lock. Also re-read the configured
334            // drain cap so callers can tune via `Core::set_max_batch_drain_iterations`
335            // without restarting waves mid-flight.
336            let (next, cap, pending_size) = {
337                let s = self.lock_state();
338                if s.pending_fires.is_empty() {
339                    break;
340                }
341                let cap = s.max_batch_drain_iterations;
342                let pending_size = s.pending_fires.len();
343                let next = self.pick_next_fire(&s);
344                (next, cap, pending_size)
345            };
346            guard += 1;
347            assert!(
348                guard < cap,
349                "wave drain exceeded {cap} iterations \
350                 (pending_fires={pending_size}). Most likely cause: a runtime \
351                 cycle introduced by an operator that re-arms its own pending_fires \
352                 slot from inside `invoke_fn` (e.g. a producer that subscribes to \
353                 itself, or a fn that calls Core::emit on a node whose fn fires \
354                 the original node again). Structural cycles via set_deps are \
355                 rejected at edge-mutation time. Tune via Core::set_max_batch_drain_iterations \
356                 only with concrete evidence the workload needs more iterations."
357            );
358            let Some(next) = next else { break };
359            // fire_fn manages its own locking around invoke_fn.
360            self.fire_fn(next);
361        }
362        // Auto-resolve sweep: nodes registered in pending_auto_resolve
363        // by the RESOLVED child propagation need a Resolved if they didn't
364        // fire and settle via their own commit_emission. Check pending_notify
365        // for each candidate — if it has Dirty but no tier-3+ message, the
366        // node never settled and needs auto-Resolved. Route through
367        // queue_notify so paused nodes get the Resolved into their pause
368        // buffer.
369        let mut s = self.lock_state();
370        let candidates = std::mem::take(&mut s.pending_auto_resolve);
371        for node_id in candidates {
372            let needs_resolve = s
373                .pending_notify
374                .get(&node_id)
375                .is_some_and(|entry| !entry.iter_messages().any(|m| m.tier() >= 3));
376            if needs_resolve {
377                self.queue_notify(&mut s, node_id, Message::Resolved);
378            }
379        }
380        // Final flush phase — populates deferred_flush_jobs
381        // from pending_notify (already carries per-node sink snapshots).
382        self.flush_notifications(&mut s);
383    }
384
385    /// Pick a node whose **transitive** upstream is fully settled.
386    ///
387    /// A node `id` is ready to fire iff none of its transitive ancestors
388    /// (via the `deps` chain) is currently in `pending_fires`. Diamond
389    /// glitch prevention requires transitive (not immediate-only) reasoning
390    /// — for graph `A → {B, C, E}; D = combine(B, C); F = combine(D, E)`,
391    /// a wave that fires `E` first adds `F` to `pending_fires` before `D`
392    /// is added. An immediate-only readiness check would mistakenly pick
393    /// `F` because neither `D` nor `E` are in `pending_fires` at that
394    /// moment, firing `F` against the stale activation-time `D` cache and
395    /// again later when `D` actually settles. Transitive upstream walk
396    /// catches `B`/`C` pending and correctly defers `F`.
397    ///
398    /// Cost: O(V) per candidate; worst case O(N·V) per pick. The existing
399    /// porting-deferred entry on `pick_next_fire` perf flagged this as a
400    /// future per-node `unresolved_dep_count` refactor.
401    fn pick_next_fire(&self, s: &CoreState) -> Option<NodeId> {
402        for &id in &s.pending_fires {
403            if Self::transitive_upstream_settled(s, id) {
404                return Some(id);
405            }
406        }
407        // Cycle / no eligible candidate (every node has an upstream pending,
408        // possibly via a cycle path): pick any so the drain guard advances.
409        // The drain-iteration cap will catch genuine cycles.
410        s.pending_fires.iter().copied().next()
411    }
412
413    fn transitive_upstream_settled(s: &CoreState, node_id: NodeId) -> bool {
414        let rec = s.require_node(node_id);
415        if rec.dep_count() == 0 {
416            return true;
417        }
418        let mut visited: ahash::AHashSet<NodeId> = ahash::AHashSet::new();
419        let mut stack: Vec<NodeId> = rec.dep_ids_vec();
420        while let Some(id) = stack.pop() {
421            if !visited.insert(id) {
422                continue;
423            }
424            if s.pending_fires.contains(&id) {
425                return false;
426            }
427            if let Some(r) = s.nodes.get(&id) {
428                for dep_id in r.dep_ids() {
429                    if !visited.contains(&dep_id) {
430                        stack.push(dep_id);
431                    }
432                }
433            }
434        }
435        true
436    }
437
438    /// Wave drain entry point. Dispatches via `rec.op` to either the
439    /// regular fn-fire path ([`Self::fire_regular`]) or the operator
440    /// dispatch ([`Self::fire_operator`]).
441    pub(crate) fn fire_fn(&self, node_id: NodeId) {
442        let op = {
443            let s = self.lock_state();
444            s.nodes.get(&node_id).and_then(|r| r.op)
445        };
446        match op {
447            Some(operator_op) => self.fire_operator(node_id, operator_op),
448            None => {
449                // State / Derived / Dynamic / Producer all dispatch via fn_id.
450                self.fire_regular(node_id);
451            }
452        }
453    }
454
455    /// Fire a node's fn lock-released around `invoke_fn`.
456    ///
457    /// Phase 1 (lock-held): remove from pending_fires, snapshot fn_id +
458    /// dep_records → DepBatch + kind. Skip if terminal, first-run-gate-closed,
459    /// or stateless.
460    ///
461    /// Phase 2 (lock-released): call `BindingBoundary::invoke_fn`. User fn
462    /// callbacks may re-enter Core (`emit`, `pause`, etc.) and run a nested
463    /// wave — the in_tick gate composes naturally because nested calls
464    /// observe `in_tick = true` and skip their own drain.
465    ///
466    /// Phase 3 (lock-held): mark `has_fired_once`, store dynamic-tracked,
467    /// decide between Noop+RESOLVED, single Data, or Batch.
468    ///
469    /// Phase 4: commit emissions. Single Data goes through
470    /// `commit_emission` (with equals substitution). Batch emissions are
471    /// processed in sequence — Data via `commit_emission_verbatim` (no
472    /// equals substitution per R1.3.2.d / R1.3.3.c), Complete/Error via
473    /// terminal cascade.
474    #[allow(clippy::too_many_lines)] // Slice G added Noop / Batch tier-3 guards
475    fn fire_regular(&self, node_id: NodeId) {
476        enum FireAction {
477            None,
478            SingleData(HandleId),
479            Batch(SmallVec<[FnEmission; 2]>),
480        }
481
482        // Phase 1: snapshot inputs — build DepBatch per dep from dep_records.
483        // `has_fired_once` is captured here for the Slice E2 OnRerun gate
484        // (Phase 1.5 below): the cleanup hook only fires when the fn has
485        // run at least once already in this activation cycle.
486        let prep: Option<(crate::handle::FnId, Vec<DepBatch>, bool, bool)> = {
487            let mut s = self.lock_state();
488            s.pending_fires.remove(&node_id);
489            let rec = s.require_node(node_id);
490            // Skip: terminal, first-run-gate-closed (R2.5.3 / R5.4 — partial
491            // mode opts out of the gate per D011), or stateless.
492            if rec.terminal.is_some() || (!rec.partial && rec.has_sentinel_deps()) {
493                None
494            } else {
495                rec.fn_id.map(|fn_id| {
496                    let dep_batches: Vec<DepBatch> = rec
497                        .dep_records
498                        .iter()
499                        .map(|dr| DepBatch {
500                            data: dr.data_batch.clone(),
501                            prev_data: dr.prev_data,
502                            involved: dr.involved_this_wave,
503                        })
504                        .collect();
505                    (fn_id, dep_batches, rec.is_dynamic, rec.has_fired_once)
506                })
507            }
508        };
509        let Some((fn_id, dep_batches, is_dynamic, has_fired_once)) = prep else {
510            return;
511        };
512
513        // Phase 1.5 (Slice E2 — R2.4.5 OnRerun, lock-released per D045): if
514        // the fn has fired at least once in this activation cycle, fire its
515        // OnRerun cleanup hook BEFORE the next invoke_fn re-allocates fn-
516        // local resources. First-fire is intentionally skipped — there is
517        // no prior run to clean up. Fires OUTSIDE `FiringGuard` because
518        // cleanup re-entrance is not the A6 reentrancy concern (which
519        // protects against `set_deps(self, ...)` from inside the in-flight
520        // invoke_fn). Operator nodes never reach this path (`fire_regular`
521        // is the fn-id branch of `fire_fn`; operators dispatch via
522        // `fire_operator`), so cleanup hooks correctly only fire for fn-
523        // shaped nodes (state / derived / dynamic / producer).
524        if has_fired_once {
525            self.binding
526                .cleanup_for(node_id, crate::boundary::CleanupTrigger::OnRerun);
527        }
528
529        // Phase 2: invoke fn lock-released. A6 reentrancy guard is scoped to
530        // the FFI call only — Phase 3's lock-held state mutation is not part
531        // of "currently firing" because set_deps would already block on the
532        // state lock by then. Drop on the guard pops the stack even if
533        // invoke_fn panics, keeping `currently_firing` balanced.
534        let result = {
535            let _firing = FiringGuard::new(self, node_id);
536            self.binding.invoke_fn(node_id, fn_id, &dep_batches)
537        };
538
539        // Phase 3: apply result under the lock — defensive terminal check
540        // (a sibling cascade may have terminated this node during phase 2).
541        let action: FireAction = {
542            let mut s = self.lock_state();
543            // Defensive: node may have terminated mid-phase-2 via a sibling
544            // cascade (a fn that re-entered `Core::error` on a path that
545            // cascaded here). If so, release any payload handles and no-op.
546            if s.require_node(node_id).terminal.is_some() {
547                match &result {
548                    FnResult::Data { handle, .. } => {
549                        self.binding.release_handle(*handle);
550                    }
551                    FnResult::Batch { emissions, .. } => {
552                        for em in emissions {
553                            match em {
554                                FnEmission::Data(h) | FnEmission::Error(h) => {
555                                    self.binding.release_handle(*h);
556                                }
557                                FnEmission::Complete => {}
558                            }
559                        }
560                    }
561                    FnResult::Noop { .. } => {}
562                }
563                return;
564            }
565            let rec = s.require_node_mut(node_id);
566            rec.has_fired_once = true;
567            if is_dynamic {
568                let tracked = match &result {
569                    FnResult::Data { tracked, .. }
570                    | FnResult::Noop { tracked }
571                    | FnResult::Batch { tracked, .. } => tracked.clone(),
572                };
573                if let Some(t) = tracked {
574                    rec.tracked = t.into_iter().collect();
575                }
576            }
577            match result {
578                FnResult::Noop { .. } => {
579                    // Slice G: skip Resolved if a prior emission in the same
580                    // wave already queued tier-3 (would violate R1.3.3.a).
581                    let already_dirty = s.require_node(node_id).dirty;
582                    let already_tier3 = s
583                        .pending_notify
584                        .get(&node_id)
585                        .is_some_and(|entry| entry.iter_messages().any(|m| m.tier() == 3));
586                    if already_dirty && !already_tier3 {
587                        self.queue_notify(&mut s, node_id, Message::Resolved);
588                    }
589                    FireAction::None
590                }
591                FnResult::Data { handle, .. } => FireAction::SingleData(handle),
592                FnResult::Batch { emissions, .. } if emissions.is_empty() => {
593                    // Empty Batch is equivalent to Noop — settle with
594                    // RESOLVED if the node was dirty (R1.3.1.a). Slice G:
595                    // skip if a prior emission already queued tier-3.
596                    let already_dirty = s.require_node(node_id).dirty;
597                    let already_tier3 = s
598                        .pending_notify
599                        .get(&node_id)
600                        .is_some_and(|entry| entry.iter_messages().any(|m| m.tier() == 3));
601                    if already_dirty && !already_tier3 {
602                        self.queue_notify(&mut s, node_id, Message::Resolved);
603                    }
604                    FireAction::None
605                }
606                FnResult::Batch { emissions, .. } => FireAction::Batch(emissions),
607            }
608        };
609
610        // Phase 4: commit emissions.
611        match action {
612            FireAction::None => {}
613            // Single Data — equals substitution applies (R1.3.2).
614            FireAction::SingleData(handle) => {
615                self.commit_emission(node_id, handle);
616            }
617            // Batch — process in sequence. No equals substitution
618            // (R1.3.2.d / R1.3.3.c: multi-message waves pass verbatim).
619            FireAction::Batch(emissions) => {
620                self.commit_batch(node_id, emissions);
621            }
622        }
623    }
624
625    /// Process a `FnResult::Batch` emissions sequence. Each `Data` goes
626    /// through `commit_emission_verbatim` (no equals substitution per
627    /// R1.3.2.d / R1.3.3.c). Terminal emissions (`Complete` / `Error`)
628    /// cascade per R1.3.4; processing stops at the first terminal and
629    /// remaining handles are released (R1.3.4.a: no further messages
630    /// after terminal).
631    fn commit_batch(&self, node_id: NodeId, emissions: SmallVec<[FnEmission; 2]>) {
632        let mut iter = emissions.into_iter();
633        for em in iter.by_ref() {
634            match em {
635                FnEmission::Data(handle) => {
636                    self.commit_emission_verbatim(node_id, handle);
637                }
638                FnEmission::Complete => {
639                    self.complete(node_id);
640                    break;
641                }
642                FnEmission::Error(handle) => {
643                    self.error(node_id, handle);
644                    break;
645                }
646            }
647        }
648        // Release handles from any emissions after the terminal break.
649        for em in iter {
650            match em {
651                FnEmission::Data(h) | FnEmission::Error(h) => {
652                    self.binding.release_handle(h);
653                }
654                FnEmission::Complete => {}
655            }
656        }
657    }
658
659    // -------------------------------------------------------------------
660    // Emission commit — equals-substitution lives here
661    // -------------------------------------------------------------------
662
663    /// Apply a node's emission. `&self`-only; brackets the equals check
664    /// around a lock release so `BindingBoundary::custom_equals` can re-enter
665    /// Core safely.
666    ///
667    /// Phase 1 (lock-held): defensive terminal short-circuit; snapshot
668    /// equals_mode + old cache handle.
669    ///
670    /// Phase 2 (lock-released): call `handles_equal` — `EqualsMode::Identity`
671    /// is a pure `u64` compare with no boundary call; `EqualsMode::Custom`
672    /// crosses to the binding's `custom_equals` oracle, which may re-enter
673    /// Core.
674    ///
675    /// Phase 3 (lock-held): set cache, queue Dirty + Data/Resolved into
676    /// pending_notify (which snapshots subscribers on first touch),
677    /// propagate to children.
678    pub(crate) fn commit_emission(&self, node_id: NodeId, new_handle: HandleId) {
679        assert!(
680            new_handle != NO_HANDLE,
681            "NO_HANDLE is not a valid DATA payload (R1.2.4) for node {node_id:?}",
682        );
683
684        // Phase 1: terminal short-circuit + snapshot equals/cache.
685        let snapshot = {
686            let s = self.lock_state();
687            let rec = s.require_node(node_id);
688            if rec.terminal.is_some() {
689                drop(s);
690                self.binding.release_handle(new_handle);
691                return;
692            }
693            (rec.cache, rec.equals)
694        };
695        let (old_handle, equals_mode) = snapshot;
696
697        // Slice G (2026-05-07): R1.3.2.d says equals substitution only
698        // fires for SINGLE-DATA waves at one node. Detect "this is a
699        // subsequent emit in the same wave at this node" via the
700        // wave-scoped `tier3_emitted_this_wave` set. If set → multi-emit
701        // wave: skip equals, queue Data verbatim, retroactively rewrite
702        // any prior Resolved (queued by an earlier same-value emit's
703        // equals match) to Data using the wave-start cache snapshot.
704        // Outside batch / first emit: standard per-emit equals path.
705        let is_subsequent_emit_in_wave = {
706            let s = self.lock_state();
707            s.tier3_emitted_this_wave.contains(&node_id)
708        };
709
710        if is_subsequent_emit_in_wave {
711            // Multi-emit wave detected. Skip equals, queue Data verbatim.
712            // Also rewrite any prior Resolved entries to Data using the
713            // wave-start cache snapshot.
714            self.rewrite_prior_resolved_to_data(node_id);
715            self.commit_emission_verbatim(node_id, new_handle);
716            return;
717        }
718
719        // Phase 2: equals check (lock-released for Custom).
720        let is_data = !self.handles_equal_lock_released(equals_mode, old_handle, new_handle);
721
722        // Phase 3: apply emission under the lock. Defensive terminal
723        // re-check — a concurrent cascade between phase 2 and phase 3
724        // could have terminated the node.
725        let mut s = self.lock_state();
726        if s.require_node(node_id).terminal.is_some() {
727            drop(s);
728            self.binding.release_handle(new_handle);
729            return;
730        }
731
732        // R1.3.1.a condition (b): synthesize DIRTY only if node not already
733        // dirty from an earlier emission in the same wave.
734        let already_dirty = s.require_node(node_id).dirty;
735        s.require_node_mut(node_id).dirty = true;
736        if !already_dirty {
737            self.queue_notify(&mut s, node_id, Message::Dirty);
738        }
739
740        if is_data {
741            // P3 (Slice A close /qa): re-read CURRENT cache. Same-thread
742            // re-entry from a `custom_equals` oracle that called back into
743            // `Core::emit` on this same node during phase 2's lock-released
744            // equals check could have advanced the cache between phase 1's
745            // snapshot (`old_handle`) and this point.
746            let current_cache = s.require_node(node_id).cache;
747            let snapshot_taken = if s.in_tick && current_cache != NO_HANDLE {
748                use std::collections::hash_map::Entry;
749                match s.wave_cache_snapshots.entry(node_id) {
750                    Entry::Vacant(slot) => {
751                        slot.insert(current_cache);
752                        true
753                    }
754                    Entry::Occupied(_) => false,
755                }
756            } else {
757                false
758            };
759            s.require_node_mut(node_id).cache = new_handle;
760            if current_cache != NO_HANDLE && !snapshot_taken {
761                self.binding.release_handle(current_cache);
762            }
763            // Slice E1 (R2.6.5 / Lock 6.G): push DATA into the replay
764            // buffer if the node opted in. RESOLVED entries are NOT
765            // buffered (canonical "DATA only").
766            self.push_replay_buffer(&mut s, node_id, new_handle);
767            // Slice G: mark this node as having emitted tier-3 in this wave.
768            s.tier3_emitted_this_wave.insert(node_id);
769            self.queue_notify(&mut s, node_id, Message::Data(new_handle));
770            // Propagate to children
771            let child_ids: Vec<NodeId> = s
772                .children
773                .get(&node_id)
774                .map(|c| c.iter().copied().collect())
775                .unwrap_or_default();
776            for child_id in child_ids {
777                let dep_idx = s.require_node(child_id).dep_index_of(node_id);
778                if let Some(idx) = dep_idx {
779                    self.deliver_data_to_consumer(&mut s, child_id, idx, new_handle);
780                }
781            }
782        } else {
783            // RESOLVED: handle unchanged. Don't release; old still in use.
784            // Slice G: snapshot cache so a subsequent same-wave emit can
785            // rewrite this Resolved to Data using the snapshot.
786            let current_cache = s.require_node(node_id).cache;
787            if s.in_tick && current_cache != NO_HANDLE {
788                use std::collections::hash_map::Entry;
789                if let Entry::Vacant(slot) = s.wave_cache_snapshots.entry(node_id) {
790                    self.binding.retain_handle(current_cache);
791                    slot.insert(current_cache);
792                }
793            }
794            // Slice G: mark this node as having emitted tier-3 in this wave.
795            s.tier3_emitted_this_wave.insert(node_id);
796            self.queue_notify(&mut s, node_id, Message::Resolved);
797            let child_ids: Vec<NodeId> = s
798                .children
799                .get(&node_id)
800                .map(|c| c.iter().copied().collect())
801                .unwrap_or_default();
802            for child_id in child_ids {
803                let already_involved = s.require_node(child_id).involved_this_wave;
804                if !already_involved {
805                    {
806                        let child = s.require_node_mut(child_id);
807                        child.involved_this_wave = true;
808                        child.dirty = true;
809                    }
810                    self.queue_notify(&mut s, child_id, Message::Dirty);
811                    s.pending_auto_resolve.insert(child_id);
812                }
813            }
814        }
815    }
816
817    /// Slice G: when a multi-emit wave is detected at `node_id` (a second
818    /// emit arrives while a prior tier-3 message is still pending), rewrite
819    /// any `Resolved` entries from earlier emits to `Data(snapshot_cache)`
820    /// so the wave conforms to R1.3.3.a (≥1 DATA OR exactly 1 RESOLVED).
821    /// Touches both `pending_notify` (immediate-flush path) and the per-node
822    /// pause buffer (paused path).
823    fn rewrite_prior_resolved_to_data(&self, node_id: NodeId) {
824        let mut s = self.lock_state();
825        let snapshot = match s.wave_cache_snapshots.get(&node_id).copied() {
826            Some(h) if h != NO_HANDLE => h,
827            // No snapshot available — the prior Resolved was queued without
828            // a cache (sentinel pre-emit). Nothing to rewrite to; the
829            // multi-emit case from sentinel is fine (verbatim Data path).
830            _ => return,
831        };
832        let mut retains_needed = 0u32;
833        // Pending_notify path. Walk all batches' messages — Slice-G
834        // coalescing reasons about wave-content per node, not per-batch.
835        if let Some(entry) = s.pending_notify.get_mut(&node_id) {
836            for msg in entry.iter_messages_mut() {
837                if matches!(msg, Message::Resolved) {
838                    *msg = Message::Data(snapshot);
839                    retains_needed += 1;
840                }
841            }
842        }
843        // Pause-buffer path.
844        if let Some(rec) = s.nodes.get_mut(&node_id) {
845            if let crate::node::PauseState::Paused { buffer, .. } = &mut rec.pause_state {
846                for msg in &mut *buffer {
847                    if matches!(msg, Message::Resolved) {
848                        *msg = Message::Data(snapshot);
849                        retains_needed += 1;
850                    }
851                }
852            }
853        }
854        drop(s);
855        // Each rewritten Resolved → Data adds a payload retain that
856        // queue_notify would otherwise have taken at emit time. The
857        // snapshot already owns one retain (taken when cache was
858        // snapshotted); we need one fresh retain per rewrite.
859        for _ in 0..retains_needed {
860            self.binding.retain_handle(snapshot);
861        }
862    }
863
864    /// Equals check that crosses the binding boundary lock-released for
865    /// `EqualsMode::Custom`. Caller must NOT hold the state lock.
866    fn handles_equal_lock_released(&self, mode: EqualsMode, a: HandleId, b: HandleId) -> bool {
867        if a == b {
868            return true; // identity-on-handles always sufficient
869        }
870        if a == NO_HANDLE || b == NO_HANDLE {
871            return false;
872        }
873        match mode {
874            EqualsMode::Identity => false,
875            EqualsMode::Custom(handle) => self.binding.custom_equals(handle, a, b),
876        }
877    }
878
879    /// Commit a DATA emission **without** equals substitution — used by
880    /// `FnResult::Batch` processing where multi-message waves pass through
881    /// verbatim per R1.3.2.d / R1.3.3.c. DIRTY auto-prefix respects
882    /// R1.3.1.a condition (b): only queued if node not already dirty.
883    ///
884    /// Structurally identical to the DATA branch of [`Self::commit_emission`]
885    /// but skips the Phase 2 equals check entirely.
886    fn commit_emission_verbatim(&self, node_id: NodeId, new_handle: HandleId) {
887        assert!(
888            new_handle != NO_HANDLE,
889            "NO_HANDLE is not a valid DATA payload (R1.2.4) for node {node_id:?}",
890        );
891
892        let mut s = self.lock_state();
893        let rec = s.require_node(node_id);
894        if rec.terminal.is_some() {
895            drop(s);
896            self.binding.release_handle(new_handle);
897            return;
898        }
899
900        // R1.3.1.a condition (b): DIRTY only if not already dirty.
901        let already_dirty = s.require_node(node_id).dirty;
902        s.require_node_mut(node_id).dirty = true;
903        if !already_dirty {
904            self.queue_notify(&mut s, node_id, Message::Dirty);
905        }
906
907        // Always DATA — no equals substitution for Batch emissions.
908        let current_cache = s.require_node(node_id).cache;
909        let snapshot_taken = if s.in_tick && current_cache != NO_HANDLE {
910            use std::collections::hash_map::Entry;
911            match s.wave_cache_snapshots.entry(node_id) {
912                Entry::Vacant(slot) => {
913                    slot.insert(current_cache);
914                    true
915                }
916                Entry::Occupied(_) => false,
917            }
918        } else {
919            false
920        };
921        s.require_node_mut(node_id).cache = new_handle;
922        if current_cache != NO_HANDLE && !snapshot_taken {
923            self.binding.release_handle(current_cache);
924        }
925        // Slice E1: replay buffer push (R2.6.5 / Lock 6.G).
926        self.push_replay_buffer(&mut s, node_id, new_handle);
927        // Slice G QA fix (A2, 2026-05-07): mark tier3_emitted_this_wave
928        // even on the verbatim path. A subsequent commit_emission at the
929        // same node in the same wave needs this flag to detect multi-emit
930        // and skip equals substitution; without it, a Batch-then-standard
931        // sequence would queue Resolved into a wave that already has Data
932        // — violating R1.3.3.a. The Batch path itself still passes
933        // verbatim per R1.3.3.c (we don't re-run equals here); we just
934        // record that "this node has emitted tier-3 in this wave."
935        s.tier3_emitted_this_wave.insert(node_id);
936        self.queue_notify(&mut s, node_id, Message::Data(new_handle));
937        // Propagate to children
938        let child_ids: Vec<NodeId> = s
939            .children
940            .get(&node_id)
941            .map(|c| c.iter().copied().collect())
942            .unwrap_or_default();
943        for child_id in child_ids {
944            let dep_idx = s.require_node(child_id).dep_index_of(node_id);
945            if let Some(idx) = dep_idx {
946                self.deliver_data_to_consumer(&mut s, child_id, idx, new_handle);
947            }
948        }
949    }
950
951    /// Slice E1 (R2.6.5 / Lock 6.G): push a DATA handle into the node's
952    /// replay buffer if opted in. Evicts oldest if cap exceeded; takes a
953    /// fresh retain on push. RESOLVED is NOT buffered per canonical
954    /// "DATA only" — call sites only invoke this for Data emissions.
955    ///
956    /// Evicted handle is queued into `s.deferred_handle_releases`
957    /// (released lock-released at flush time) per the binding-boundary
958    /// lock-release discipline — `release_handle` may re-enter Core via
959    /// finalizers and must not run while the state lock is held
960    /// (QA A3, 2026-05-07).
961    fn push_replay_buffer(&self, s: &mut CoreState, node_id: NodeId, new_handle: HandleId) {
962        let rec = s.require_node_mut(node_id);
963        let cap = match rec.replay_buffer_cap {
964            Some(c) if c > 0 => c,
965            _ => return,
966        };
967        self.binding.retain_handle(new_handle);
968        rec.replay_buffer.push_back(new_handle);
969        let evicted = if rec.replay_buffer.len() > cap {
970            rec.replay_buffer.pop_front()
971        } else {
972            None
973        };
974        if let Some(h) = evicted {
975            s.deferred_handle_releases.push(h);
976        }
977    }
978
979    // ===================================================================
980    // Operator dispatch (Slice C-1, D009).
981    //
982    // `fire_operator` is the entry point for nodes whose `kind` is
983    // `NodeKind::Operator(_)`. It branches on the `OperatorOp` discriminant
984    // to per-operator helpers that snapshot inputs under the lock, drop the
985    // lock to call the binding's bulk projection FFI, and reacquire to
986    // apply emissions via `commit_emission_verbatim` (no per-item equals
987    // dedup at the wire — operator output passes verbatim per the same
988    // R1.3.2.d / R1.3.3.c rule that governs `FnResult::Batch`).
989    //
990    // **Refcount discipline:** inputs sourced from `dep_records[i].data_batch`
991    // share retains owned by the wave's data-batch slot (released at
992    // wave-end rotation in `clear_wave_state`). Operators that emit those
993    // handles unchanged (`Filter`, `DistinctUntilChanged`, `Pairwise`'s
994    // `prev` carry-over) take an additional retain via `retain_handle`
995    // before passing to `commit_emission_verbatim` — the cache slot owns
996    // its own share, independent of the data-batch slot's. Operators that
997    // produce fresh handles (`Map` / `Scan` / `Reduce` / `Pairwise`'s
998    // packed tuples) receive retains pre-bumped by the binding's bulk-
999    // projection method.
1000    // ===================================================================
1001
1002    /// Operator dispatch entry. Pre-checks (terminal short-circuit, first-
1003    /// run gate accounting for `partial`, terminal-aware fire for `Reduce`)
1004    /// happen here; per-operator behavior lives in the `fire_op_*` helpers.
1005    fn fire_operator(&self, node_id: NodeId, op: OperatorOp) {
1006        // Phase 1 (lock-held): remove from pending_fires, evaluate skip.
1007        let proceed = {
1008            let mut s = self.lock_state();
1009            s.pending_fires.remove(&node_id);
1010            let rec = s.require_node(node_id);
1011            if rec.terminal.is_some() {
1012                false
1013            } else {
1014                // First-run gate (R2.5.3 / R5.4). Partial-mode operators
1015                // (D011) opt out of the gate; otherwise we wait for every
1016                // dep to have delivered at least one real handle. Terminal-
1017                // aware operators (currently `Reduce`) additionally count a
1018                // dep terminal as "real input" so they can fire on
1019                // upstream COMPLETE-without-DATA and emit the seed.
1020                let has_real_input = !rec.has_sentinel_deps()
1021                    || rec.dep_records.iter().any(|dr| dr.terminal.is_some());
1022                rec.partial || has_real_input
1023            }
1024        };
1025        if !proceed {
1026            return;
1027        }
1028
1029        // A6 (Slice F, 2026-05-07): track operator fire on the
1030        // `currently_firing` stack so a binding-side project/predicate/fold
1031        // FFI callback that re-enters `Core::set_deps(node_id, ...)` is
1032        // rejected with `SetDepsError::ReentrantOnFiringNode`. Drop pops
1033        // the stack on panic too.
1034        let _firing = FiringGuard::new(self, node_id);
1035
1036        match op {
1037            OperatorOp::Map { fn_id } => self.fire_op_map(node_id, fn_id),
1038            OperatorOp::Filter { fn_id } => self.fire_op_filter(node_id, fn_id),
1039            OperatorOp::Scan { fn_id, .. } => self.fire_op_scan(node_id, fn_id),
1040            OperatorOp::Reduce { fn_id, .. } => self.fire_op_reduce(node_id, fn_id),
1041            OperatorOp::DistinctUntilChanged { equals_fn_id } => {
1042                self.fire_op_distinct(node_id, equals_fn_id);
1043            }
1044            OperatorOp::Pairwise { fn_id } => self.fire_op_pairwise(node_id, fn_id),
1045            OperatorOp::Combine { pack_fn } => self.fire_op_combine(node_id, pack_fn),
1046            OperatorOp::WithLatestFrom { pack_fn } => {
1047                self.fire_op_with_latest_from(node_id, pack_fn);
1048            }
1049            OperatorOp::Merge => self.fire_op_merge(node_id),
1050            OperatorOp::Take { count } => self.fire_op_take(node_id, count),
1051            OperatorOp::Skip { count } => self.fire_op_skip(node_id, count),
1052            OperatorOp::TakeWhile { fn_id } => self.fire_op_take_while(node_id, fn_id),
1053            // The variant carries `default` for `register_operator`'s
1054            // `make_op_scratch` path; once registered, the live default
1055            // is read from `LastState::default` inside `fire_op_last`.
1056            OperatorOp::Last { .. } => self.fire_op_last(node_id),
1057        }
1058    }
1059
1060    /// Snapshot the operator's single dep batch (transform constraint —
1061    /// R5.7 single-dep). Returns `(inputs, terminal)` where `inputs` is a
1062    /// fresh `Vec<HandleId>` (no retains) and `terminal` reflects
1063    /// `dep_records[0].terminal` at snapshot time.
1064    fn snapshot_op_dep0(&self, node_id: NodeId) -> (Vec<HandleId>, Option<TerminalKind>) {
1065        let s = self.lock_state();
1066        let rec = s.require_node(node_id);
1067        debug_assert!(
1068            !rec.dep_records.is_empty(),
1069            "transform operator must have ≥1 dep"
1070        );
1071        let dr = &rec.dep_records[0];
1072        (dr.data_batch.iter().copied().collect(), dr.terminal)
1073    }
1074
1075    /// Emit DIRTY (if not already dirty) followed by RESOLVED. Used by
1076    /// silent-drop operators (Filter / DistinctUntilChanged / Pairwise)
1077    /// when a wave's inputs all suppress and the operator needs to settle
1078    /// the wave for its subscribers (D018 — let DIRTY ride; queue RESOLVED
1079    /// on full-reject).
1080    fn settle_dirty_resolved(&self, node_id: NodeId) {
1081        let mut s = self.lock_state();
1082        if s.require_node(node_id).terminal.is_some() {
1083            return;
1084        }
1085        let already_dirty = s.require_node(node_id).dirty;
1086        s.require_node_mut(node_id).dirty = true;
1087        if !already_dirty {
1088            self.queue_notify(&mut s, node_id, Message::Dirty);
1089        }
1090        // Slice G: skip Resolved if pending_notify already has a tier-3
1091        // message — adding Resolved would violate R1.3.3.a.
1092        let already_tier3 = s
1093            .pending_notify
1094            .get(&node_id)
1095            .is_some_and(|entry| entry.iter_messages().any(|m| m.tier() == 3));
1096        if !already_tier3 {
1097            self.queue_notify(&mut s, node_id, Message::Resolved);
1098        }
1099    }
1100
1101    /// `OperatorOp::Map` dispatch.
1102    fn fire_op_map(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
1103        let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
1104        // Mark fired regardless of input count (activation gate already
1105        // satisfied or partial-mode).
1106        {
1107            let mut s = self.lock_state();
1108            s.require_node_mut(node_id).has_fired_once = true;
1109        }
1110        if inputs.is_empty() {
1111            return;
1112        }
1113        // Phase 2 (lock-released): bulk project. Binding returns one
1114        // handle per input, each with a retain share already taken.
1115        let outputs = self.binding.project_each(fn_id, &inputs);
1116        // Phase 3: emit each output. `commit_emission_verbatim` consumes
1117        // the retain into the cache slot (and releases the prior cache
1118        // handle internally).
1119        for h in outputs {
1120            self.commit_emission_verbatim(node_id, h);
1121        }
1122    }
1123
1124    /// `OperatorOp::Filter` dispatch (D012/D018).
1125    fn fire_op_filter(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
1126        let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
1127        {
1128            let mut s = self.lock_state();
1129            s.require_node_mut(node_id).has_fired_once = true;
1130        }
1131        if inputs.is_empty() {
1132            return;
1133        }
1134        // Phase 2: predicate per input.
1135        let pass = self.binding.predicate_each(fn_id, &inputs);
1136        debug_assert!(
1137            pass.len() == inputs.len(),
1138            "predicate_each returned {} bools for {} inputs",
1139            pass.len(),
1140            inputs.len()
1141        );
1142        // Phase 3: emit passing items verbatim. Take a fresh retain for
1143        // each — the data_batch slot still owns its retain (released at
1144        // wave-end rotation), and the cache slot needs its own.
1145        let mut emitted = 0usize;
1146        for (i, &h) in inputs.iter().enumerate() {
1147            if pass.get(i).copied().unwrap_or(false) {
1148                self.binding.retain_handle(h);
1149                self.commit_emission_verbatim(node_id, h);
1150                emitted += 1;
1151            }
1152        }
1153        // D018: full-reject settles with DIRTY+RESOLVED.
1154        if emitted == 0 {
1155            self.settle_dirty_resolved(node_id);
1156        }
1157    }
1158
1159    /// `OperatorOp::Scan` dispatch — left-fold emitting each new acc.
1160    fn fire_op_scan(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
1161        use crate::op_state::ScanState;
1162        let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
1163        let acc = {
1164            let s = self.lock_state();
1165            scratch_ref::<ScanState>(&s, node_id).acc
1166        };
1167        {
1168            let mut s = self.lock_state();
1169            s.require_node_mut(node_id).has_fired_once = true;
1170        }
1171        if inputs.is_empty() {
1172            return;
1173        }
1174        // Phase 2: fold each input through. Returns N new handles, each
1175        // with a fresh retain.
1176        let new_states = self.binding.fold_each(fn_id, acc, &inputs);
1177        debug_assert!(
1178            new_states.len() == inputs.len(),
1179            "fold_each returned {} accs for {} inputs",
1180            new_states.len(),
1181            inputs.len()
1182        );
1183        // Phase 3a: update ScanState.acc to the LAST new acc. Take an
1184        // extra retain for the slot; release the prior acc's slot retain.
1185        let last_acc = new_states.last().copied();
1186        if let Some(last) = last_acc {
1187            let prev_acc = {
1188                let mut s = self.lock_state();
1189                let scratch = scratch_mut::<ScanState>(&mut s, node_id);
1190                let prev = scratch.acc;
1191                scratch.acc = last;
1192                prev
1193            };
1194            // Take the slot's retain on the new acc.
1195            self.binding.retain_handle(last);
1196            // Release the prior slot's retain (post-lock to keep binding
1197            // free to re-enter Core safely).
1198            if prev_acc != crate::handle::NO_HANDLE {
1199                self.binding.release_handle(prev_acc);
1200            }
1201        }
1202        // Phase 3b: emit each intermediate acc verbatim. `new_states`
1203        // entries each carry one retain from `fold_each`; that retain is
1204        // consumed by `commit_emission_verbatim` into the cache slot.
1205        for h in new_states {
1206            self.commit_emission_verbatim(node_id, h);
1207        }
1208    }
1209
1210    /// `OperatorOp::Reduce` dispatch — accumulates silently; emits acc on
1211    /// upstream COMPLETE (cascades ERROR verbatim).
1212    fn fire_op_reduce(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
1213        use crate::op_state::ReduceState;
1214        let (inputs, terminal) = self.snapshot_op_dep0(node_id);
1215        let acc = {
1216            let s = self.lock_state();
1217            scratch_ref::<ReduceState>(&s, node_id).acc
1218        };
1219        {
1220            let mut s = self.lock_state();
1221            s.require_node_mut(node_id).has_fired_once = true;
1222        }
1223        // Phase 2: accumulate (silent — no per-input emit).
1224        let new_states = if inputs.is_empty() {
1225            SmallVec::<[HandleId; 1]>::new()
1226        } else {
1227            self.binding.fold_each(fn_id, acc, &inputs)
1228        };
1229        debug_assert!(
1230            new_states.len() == inputs.len(),
1231            "fold_each returned {} accs for {} inputs",
1232            new_states.len(),
1233            inputs.len()
1234        );
1235        // Update ReduceState.acc to last new acc; release intermediate
1236        // states (we don't emit them) and the prior acc's slot retain.
1237        let last_acc = new_states.last().copied();
1238        let intermediates_to_release: Vec<HandleId> = if new_states.len() > 1 {
1239            new_states[..new_states.len() - 1].to_vec()
1240        } else {
1241            Vec::new()
1242        };
1243        let prev_acc_to_release = if let Some(last) = last_acc {
1244            let prev_acc = {
1245                let mut s = self.lock_state();
1246                let scratch = scratch_mut::<ReduceState>(&mut s, node_id);
1247                let prev = scratch.acc;
1248                scratch.acc = last;
1249                prev
1250            };
1251            self.binding.retain_handle(last);
1252            if prev_acc == crate::handle::NO_HANDLE {
1253                None
1254            } else {
1255                Some(prev_acc)
1256            }
1257        } else {
1258            None
1259        };
1260        // Release intermediate fold results (Reduce only emits the LAST,
1261        // but only on terminal). Each was retained by `fold_each`.
1262        for h in intermediates_to_release {
1263            self.binding.release_handle(h);
1264        }
1265        if let Some(h) = prev_acc_to_release {
1266            self.binding.release_handle(h);
1267        }
1268
1269        // Phase 3: emit on terminal.
1270        match terminal {
1271            None => {
1272                // Still accumulating; no emit. Subscribers see no message
1273                // for this wave (silent accumulation). The first wave that
1274                // pushes Reduce to fire produces a Dirty entry on the
1275                // upstream's commit, but Reduce itself doesn't queue any
1276                // tier-3 since R5 silently absorbs. v1: leave the
1277                // post-drain auto-resolve sweep to settle nothing —
1278                // pending_notify has no entry for Reduce so the sweep is
1279                // a no-op.
1280            }
1281            Some(TerminalKind::Complete) => {
1282                // Read the live acc (may be the seed if no DATA arrived)
1283                // and emit Data(acc) + Complete.
1284                let final_acc = {
1285                    let s = self.lock_state();
1286                    scratch_ref::<ReduceState>(&s, node_id).acc
1287                };
1288                if final_acc != crate::handle::NO_HANDLE {
1289                    // Emission needs its own retain (slot's retain is
1290                    // owned by ReduceState.acc until reset/Drop).
1291                    self.binding.retain_handle(final_acc);
1292                    self.commit_emission_verbatim(node_id, final_acc);
1293                }
1294                self.complete(node_id);
1295            }
1296            Some(TerminalKind::Error(h)) => {
1297                // Core::error transfers the caller's share into the
1298                // cascade (node.terminal + per-child dep_terminal slots);
1299                // no release at the error() boundary. Take a fresh share
1300                // here so the cascade owns it independently of the
1301                // dep_records[0].terminal slot's share.
1302                self.binding.retain_handle(h);
1303                self.error(node_id, h);
1304            }
1305        }
1306    }
1307
1308    /// `OperatorOp::DistinctUntilChanged` dispatch.
1309    fn fire_op_distinct(&self, node_id: NodeId, equals_fn_id: crate::handle::FnId) {
1310        use crate::op_state::DistinctState;
1311        let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
1312        let mut prev = {
1313            let s = self.lock_state();
1314            scratch_ref::<DistinctState>(&s, node_id).prev
1315        };
1316        {
1317            let mut s = self.lock_state();
1318            s.require_node_mut(node_id).has_fired_once = true;
1319        }
1320        if inputs.is_empty() {
1321            return;
1322        }
1323        // Take a working-copy retain on the initial prev so both the loop
1324        // (which releases old_prev on each non-equal item) and phase 3
1325        // (which releases the slot's original handle) each have their own
1326        // share. Without this, the loop's release of old_prev (== original
1327        // DistinctState.prev) double-releases against phase 3's stale_slot
1328        // release.
1329        if prev != crate::handle::NO_HANDLE {
1330            self.binding.retain_handle(prev);
1331        }
1332        // Phase 2: per-input equals(prev, current). Each non-equal input
1333        // is emitted and becomes the new prev. Equals fn_id reuses
1334        // `BindingBoundary::custom_equals`.
1335        let mut emitted = 0usize;
1336        for &h in &inputs {
1337            let equal = if prev == crate::handle::NO_HANDLE {
1338                false
1339            } else if prev == h {
1340                true
1341            } else {
1342                self.binding.custom_equals(equals_fn_id, prev, h)
1343            };
1344            if !equal {
1345                // Emit this input verbatim.
1346                self.binding.retain_handle(h);
1347                self.commit_emission_verbatim(node_id, h);
1348                // Update prev: take retain on new prev, release old
1349                // (working-copy retain from above or from prior iteration).
1350                self.binding.retain_handle(h);
1351                let old_prev = prev;
1352                prev = h;
1353                if old_prev != crate::handle::NO_HANDLE {
1354                    self.binding.release_handle(old_prev);
1355                }
1356                emitted += 1;
1357            }
1358        }
1359        // Phase 3: persist prev into DistinctState.prev slot. Release the
1360        // slot's original retain (stale_slot) — this is the slot-owned
1361        // share, independent of the working-copy share released in the
1362        // loop above.
1363        {
1364            let mut s = self.lock_state();
1365            let scratch = scratch_mut::<DistinctState>(&mut s, node_id);
1366            let stale_slot = scratch.prev;
1367            scratch.prev = prev;
1368            if stale_slot != prev && stale_slot != crate::handle::NO_HANDLE {
1369                drop(s);
1370                self.binding.release_handle(stale_slot);
1371            }
1372        }
1373        // Release the working-copy retain on the final prev if it was
1374        // never released in the loop (i.e. no non-equal items passed,
1375        // prev == original). In that case stale_slot == prev, so phase 3
1376        // didn't release it either — but the working-copy retain is still
1377        // outstanding. Release it now.
1378        if emitted == 0 && prev != crate::handle::NO_HANDLE {
1379            self.binding.release_handle(prev);
1380        }
1381        if emitted == 0 {
1382            self.settle_dirty_resolved(node_id);
1383        }
1384    }
1385
1386    /// `OperatorOp::Pairwise` dispatch — emits `(prev, current)` tuples
1387    /// starting after the second value (first input swallowed, sets `prev`).
1388    fn fire_op_pairwise(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
1389        use crate::op_state::PairwiseState;
1390        let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
1391        let mut prev = {
1392            let s = self.lock_state();
1393            scratch_ref::<PairwiseState>(&s, node_id).prev
1394        };
1395        {
1396            let mut s = self.lock_state();
1397            s.require_node_mut(node_id).has_fired_once = true;
1398        }
1399        if inputs.is_empty() {
1400            return;
1401        }
1402        let mut emitted = 0usize;
1403        for &h in &inputs {
1404            if prev == crate::handle::NO_HANDLE {
1405                // First-ever value — swallow, set prev. Retain for the
1406                // PairwiseState.prev slot (persisted in phase 3 below).
1407                self.binding.retain_handle(h);
1408                prev = h;
1409                continue;
1410            }
1411            // Pack (prev, current) into a tuple handle. Binding returns a
1412            // fresh retain on the packed handle.
1413            let packed = self.binding.pairwise_pack(fn_id, prev, h);
1414            self.commit_emission_verbatim(node_id, packed);
1415            // Advance prev: take retain on h, release old prev.
1416            self.binding.retain_handle(h);
1417            let old_prev = prev;
1418            prev = h;
1419            self.binding.release_handle(old_prev);
1420            emitted += 1;
1421        }
1422        // Persist prev into PairwiseState.prev slot.
1423        {
1424            let mut s = self.lock_state();
1425            let scratch = scratch_mut::<PairwiseState>(&mut s, node_id);
1426            let stale_slot = scratch.prev;
1427            scratch.prev = prev;
1428            if stale_slot != prev && stale_slot != crate::handle::NO_HANDLE {
1429                drop(s);
1430                self.binding.release_handle(stale_slot);
1431            }
1432        }
1433        if emitted == 0 {
1434            self.settle_dirty_resolved(node_id);
1435        }
1436    }
1437
1438    // =================================================================
1439    // Slice C-2: multi-dep combinator operators (D020)
1440    // =================================================================
1441
1442    /// Snapshot all deps' "latest" handle for multi-dep combinators.
1443    /// For each dep: returns `data_batch.last()` if non-empty (dep fired
1444    /// this wave), else `prev_data` (last handle from previous wave).
1445    /// Also returns whether dep[0] (primary) had DATA this wave —
1446    /// needed by `fire_op_with_latest_from`.
1447    fn snapshot_op_all_latest(&self, node_id: NodeId) -> (SmallVec<[HandleId; 4]>, bool) {
1448        let s = self.lock_state();
1449        let rec = s.require_node(node_id);
1450        let primary_fired = rec
1451            .dep_records
1452            .first()
1453            .is_some_and(|dr| !dr.data_batch.is_empty());
1454        let latest: SmallVec<[HandleId; 4]> = rec
1455            .dep_records
1456            .iter()
1457            .map(|dr| dr.data_batch.last().copied().unwrap_or(dr.prev_data))
1458            .collect();
1459        (latest, primary_fired)
1460    }
1461
1462    /// `OperatorOp::Combine` dispatch — N-dep combineLatest. Packs the
1463    /// latest handle per dep into a tuple via `pack_tuple`, emits on
1464    /// any dep fire. First-run gate (R2.5.3, partial: false) guarantees
1465    /// all deps have a real handle on first fire. Post-warmup INVALIDATE
1466    /// guard: if any dep's prev_data was cleared, settles with RESOLVED
1467    /// instead of packing a NO_HANDLE into the tuple.
1468    fn fire_op_combine(&self, node_id: NodeId, pack_fn: crate::handle::FnId) {
1469        let (latest, _primary_fired) = self.snapshot_op_all_latest(node_id);
1470        {
1471            let mut s = self.lock_state();
1472            s.require_node_mut(node_id).has_fired_once = true;
1473        }
1474        // Post-warmup INVALIDATE guard: a dep may have been invalidated
1475        // (prev_data cleared to NO_HANDLE) and not yet re-delivered.
1476        if latest.contains(&crate::handle::NO_HANDLE) {
1477            self.settle_dirty_resolved(node_id);
1478            return;
1479        }
1480        let tuple_handle = self.binding.pack_tuple(pack_fn, &latest);
1481        self.commit_emission_verbatim(node_id, tuple_handle);
1482    }
1483
1484    /// `OperatorOp::WithLatestFrom` dispatch — 2-dep, fire-on-primary-only
1485    /// (D021 / Phase 10.5). Emits `[primary, secondary]` pair only when
1486    /// dep[0] (primary) has DATA in the wave. If only dep[1] fires →
1487    /// RESOLVED. Post-warmup INVALIDATE guard: if secondary latest is
1488    /// `NO_HANDLE` (INVALIDATE cleared it), settles with RESOLVED.
1489    fn fire_op_with_latest_from(&self, node_id: NodeId, pack_fn: crate::handle::FnId) {
1490        let (latest, primary_fired) = self.snapshot_op_all_latest(node_id);
1491        let first_fire = {
1492            let mut s = self.lock_state();
1493            let rec = s.require_node_mut(node_id);
1494            let was_first = !rec.has_fired_once;
1495            rec.has_fired_once = true;
1496            was_first
1497        };
1498        // On first fire (gate release), always emit — the first-run gate
1499        // guarantees both deps have values (via prev_data fallback in
1500        // snapshot). On subsequent fires, only emit when primary fires.
1501        if !first_fire && !primary_fired {
1502            // Secondary-only update — no downstream DATA.
1503            self.settle_dirty_resolved(node_id);
1504            return;
1505        }
1506        // Post-warmup INVALIDATE guard: secondary may have been invalidated
1507        // (prev_data cleared to NO_HANDLE) and not yet re-delivered.
1508        debug_assert!(latest.len() == 2, "withLatestFrom requires exactly 2 deps");
1509        if latest[1] == crate::handle::NO_HANDLE {
1510            self.settle_dirty_resolved(node_id);
1511            return;
1512        }
1513        let tuple_handle = self.binding.pack_tuple(pack_fn, &latest);
1514        self.commit_emission_verbatim(node_id, tuple_handle);
1515    }
1516
1517    /// `OperatorOp::Merge` dispatch — N-dep, forward all DATA handles
1518    /// verbatim (D022). Zero FFI on fire: no transformation. Each dep's
1519    /// batch handles are collected, retained, and emitted individually.
1520    fn fire_op_merge(&self, node_id: NodeId) {
1521        // Collect all batch handles from all deps (flat).
1522        let all_handles: Vec<HandleId> = {
1523            let s = self.lock_state();
1524            let rec = s.require_node(node_id);
1525            rec.dep_records
1526                .iter()
1527                .flat_map(|dr| dr.data_batch.iter().copied())
1528                .collect()
1529        };
1530        {
1531            let mut s = self.lock_state();
1532            s.require_node_mut(node_id).has_fired_once = true;
1533        }
1534        if all_handles.is_empty() {
1535            // All deps settled RESOLVED this wave — no DATA to forward.
1536            self.settle_dirty_resolved(node_id);
1537            return;
1538        }
1539        // Emit each handle verbatim. Take a fresh retain per handle
1540        // (independent of the dep batch's retain which gets released at
1541        // wave-end). Matches Filter's discipline for passing inputs.
1542        for &h in &all_handles {
1543            self.binding.retain_handle(h);
1544            self.commit_emission_verbatim(node_id, h);
1545        }
1546    }
1547
1548    // =================================================================
1549    // Slice C-3: flow operators (D024)
1550    // =================================================================
1551
1552    /// `OperatorOp::Take` dispatch — emits the first `count` DATA values
1553    /// then self-completes via `Core::complete`. When `count == 0`, the
1554    /// first fire emits zero items then immediately self-completes
1555    /// (D027). Cross-wave counter lives in
1556    /// [`TakeState::count_emitted`](super::op_state::TakeState::count_emitted).
1557    fn fire_op_take(&self, node_id: NodeId, count: u32) {
1558        use crate::op_state::TakeState;
1559        let (inputs, terminal) = self.snapshot_op_dep0(node_id);
1560        // Snapshot current counter; mark fired regardless of input count
1561        // (activation gate already satisfied or partial-mode).
1562        let mut count_emitted = {
1563            let s = self.lock_state();
1564            scratch_ref::<TakeState>(&s, node_id).count_emitted
1565        };
1566        {
1567            let mut s = self.lock_state();
1568            s.require_node_mut(node_id).has_fired_once = true;
1569        }
1570        // Already at quota before any input this wave — self-complete
1571        // immediately. Covers `count == 0` (first-fire short-circuit) and
1572        // any defensive re-entry after the terminal-skip in `fire_operator`
1573        // already guards against double-complete.
1574        if count_emitted >= count {
1575            self.complete(node_id);
1576            return;
1577        }
1578        // Per-input emission loop. Each pass takes a fresh retain for the
1579        // cache slot; data_batch slot's retain is released at wave-end
1580        // rotation independently.
1581        for &h in &inputs {
1582            self.binding.retain_handle(h);
1583            self.commit_emission_verbatim(node_id, h);
1584            count_emitted = count_emitted.saturating_add(1);
1585            if count_emitted >= count {
1586                break;
1587            }
1588        }
1589        // Persist the updated counter.
1590        {
1591            let mut s = self.lock_state();
1592            scratch_mut::<TakeState>(&mut s, node_id).count_emitted = count_emitted;
1593        }
1594        // Self-complete if we hit the quota this wave. Upstream COMPLETE
1595        // (terminal == Some(Complete)) without us hitting the count
1596        // propagates via the standard auto-cascade — we don't intercept it.
1597        if count_emitted >= count {
1598            self.complete(node_id);
1599            return;
1600        }
1601        // If upstream is already Errored and we haven't hit count, the
1602        // standard cascade will propagate it. If the wave delivered no
1603        // inputs (e.g. RESOLVED from upstream), settle DIRTY+RESOLVED so
1604        // subscribers see the wave close.
1605        if inputs.is_empty() && terminal.is_none() {
1606            self.settle_dirty_resolved(node_id);
1607        }
1608    }
1609
1610    /// `OperatorOp::Skip` dispatch — drops the first `count` DATA values,
1611    /// then forwards the rest. Cross-wave counter lives in
1612    /// [`SkipState::count_skipped`](super::op_state::SkipState::count_skipped).
1613    /// On a wave where every input is still in the skip window, settles
1614    /// DIRTY+RESOLVED (D018 pattern) so subscribers see the wave close.
1615    fn fire_op_skip(&self, node_id: NodeId, count: u32) {
1616        use crate::op_state::SkipState;
1617        let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
1618        let mut count_skipped = {
1619            let s = self.lock_state();
1620            scratch_ref::<SkipState>(&s, node_id).count_skipped
1621        };
1622        {
1623            let mut s = self.lock_state();
1624            s.require_node_mut(node_id).has_fired_once = true;
1625        }
1626        // No early-return on empty inputs: the post-loop `emitted == 0`
1627        // settle handles the empty-inputs case identically to the
1628        // all-swallowed-by-skip-window case (Slice C-3 /qa P6 — symmetry
1629        // with `fire_op_take`).
1630        let mut emitted = 0usize;
1631        for &h in &inputs {
1632            if count_skipped < count {
1633                count_skipped = count_skipped.saturating_add(1);
1634                // Drop this input — the data_batch slot still owns its
1635                // retain (released at wave-end rotation). No emission.
1636                continue;
1637            }
1638            // Past the skip window — emit verbatim. Take a fresh retain
1639            // for the cache slot.
1640            self.binding.retain_handle(h);
1641            self.commit_emission_verbatim(node_id, h);
1642            emitted += 1;
1643        }
1644        // Persist the updated counter.
1645        {
1646            let mut s = self.lock_state();
1647            scratch_mut::<SkipState>(&mut s, node_id).count_skipped = count_skipped;
1648        }
1649        if emitted == 0 {
1650            self.settle_dirty_resolved(node_id);
1651        }
1652    }
1653
1654    /// `OperatorOp::TakeWhile` dispatch — emits while the predicate
1655    /// holds; on the first `false`, emits any preceding passes from the
1656    /// same batch then self-completes via `Core::complete`. Reuses
1657    /// [`BindingBoundary::predicate_each`] (D029).
1658    fn fire_op_take_while(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
1659        let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
1660        {
1661            let mut s = self.lock_state();
1662            s.require_node_mut(node_id).has_fired_once = true;
1663        }
1664        if inputs.is_empty() {
1665            return;
1666        }
1667        // Phase 2: predicate per input.
1668        let pass = self.binding.predicate_each(fn_id, &inputs);
1669        debug_assert!(
1670            pass.len() == inputs.len(),
1671            "predicate_each returned {} bools for {} inputs",
1672            pass.len(),
1673            inputs.len()
1674        );
1675        // Phase 3: emit each input until the first false; then
1676        // self-complete. `fire_operator`'s `terminal.is_some()`
1677        // short-circuit gates re-entry after the self-complete cascade
1678        // installs the terminal slot — no extra `done` flag needed.
1679        let mut emitted = 0usize;
1680        let mut first_false_seen = false;
1681        for (i, &h) in inputs.iter().enumerate() {
1682            if pass.get(i).copied().unwrap_or(false) {
1683                self.binding.retain_handle(h);
1684                self.commit_emission_verbatim(node_id, h);
1685                emitted += 1;
1686            } else {
1687                first_false_seen = true;
1688                break;
1689            }
1690        }
1691        if first_false_seen {
1692            self.complete(node_id);
1693            return;
1694        }
1695        if emitted == 0 {
1696            // Whole batch passed but was empty (impossible here since
1697            // inputs.is_empty() returned early above) — defensive only.
1698            self.settle_dirty_resolved(node_id);
1699        }
1700    }
1701
1702    /// `OperatorOp::Last` dispatch — buffers the latest DATA; emits
1703    /// `Data(latest)` (or `Data(default)` if no DATA arrived and a
1704    /// default was registered) then `Complete` on upstream COMPLETE.
1705    /// On upstream ERROR, propagates verbatim. Storage:
1706    /// [`LastState`](super::op_state::LastState).
1707    ///
1708    /// **Silent-buffer semantics (mirrors Reduce):** on a non-terminal
1709    /// wave (`terminal == None`), `fire_op_last` updates the buffered
1710    /// `latest` handle but produces NO downstream wire message —
1711    /// subscribers observe the operator only when upstream
1712    /// COMPLETE/ERROR triggers the terminal branch. Intermediate
1713    /// inputs from the dep's batch are dropped on the floor (their
1714    /// `data_batch` retains release at wave-end rotation
1715    /// independently). Per-wave settlement on intermediate waves is
1716    /// the canonical behavior for terminal-aware operators.
1717    fn fire_op_last(&self, node_id: NodeId) {
1718        use crate::op_state::LastState;
1719        let (inputs, terminal) = self.snapshot_op_dep0(node_id);
1720        {
1721            let mut s = self.lock_state();
1722            s.require_node_mut(node_id).has_fired_once = true;
1723        }
1724
1725        // Phase 2: buffer the latest input handle (if any). Retain new,
1726        // release old. data_batch slot's retain is released at wave-end
1727        // rotation independently — the LastState slot keeps its own
1728        // share so the value survives across waves.
1729        if let Some(&new_latest) = inputs.last() {
1730            let prev_latest = {
1731                let mut s = self.lock_state();
1732                let scratch = scratch_mut::<LastState>(&mut s, node_id);
1733                let prev = scratch.latest;
1734                scratch.latest = new_latest;
1735                prev
1736            };
1737            self.binding.retain_handle(new_latest);
1738            if prev_latest != crate::handle::NO_HANDLE {
1739                self.binding.release_handle(prev_latest);
1740            }
1741        }
1742
1743        // Phase 3: emit on terminal. Buffer-only fires (no terminal yet)
1744        // produce no downstream message — Reduce-style silent
1745        // accumulation. The post-drain auto-resolve sweep is a no-op
1746        // because pending_notify has no entry for Last.
1747        match terminal {
1748            None => {}
1749            Some(TerminalKind::Complete) => {
1750                // Read the live latest + default. If latest != NO_HANDLE,
1751                // emit it. Otherwise, if default != NO_HANDLE, emit default.
1752                // Otherwise, emit only Complete (empty stream, no default).
1753                let (latest, default) = {
1754                    let s = self.lock_state();
1755                    let scratch = scratch_ref::<LastState>(&s, node_id);
1756                    (scratch.latest, scratch.default)
1757                };
1758                let to_emit = if latest != crate::handle::NO_HANDLE {
1759                    Some(latest)
1760                } else if default != crate::handle::NO_HANDLE {
1761                    Some(default)
1762                } else {
1763                    None
1764                };
1765                if let Some(h) = to_emit {
1766                    // Emission needs its own retain — the LastState slot
1767                    // keeps its share until reset/Drop.
1768                    self.binding.retain_handle(h);
1769                    self.commit_emission_verbatim(node_id, h);
1770                }
1771                self.complete(node_id);
1772            }
1773            Some(TerminalKind::Error(h)) => {
1774                // Take a fresh share for the error cascade — the
1775                // dep_records[0].terminal slot keeps its own share
1776                // (released by reset_for_fresh_lifecycle / Drop).
1777                self.binding.retain_handle(h);
1778                self.error(node_id, h);
1779            }
1780        }
1781    }
1782
1783    pub(crate) fn deliver_data_to_consumer(
1784        &self,
1785        s: &mut CoreState,
1786        consumer_id: NodeId,
1787        dep_idx: usize,
1788        handle: HandleId,
1789    ) {
1790        // Retain the handle for the batch accumulation slot — each DATA
1791        // handle in `data_batch` owns a retain share, released at wave-end
1792        // rotation in `clear_wave_state`.
1793        self.binding.retain_handle(handle);
1794
1795        let is_dynamic;
1796        let is_state;
1797        let tracked_or_first_fire;
1798        // Slice F audit close (2026-05-07): default-mode pause suppression.
1799        // If the consumer is paused with `PausableMode::Default`, the
1800        // canonical-spec §2.6 behavior is to suppress fn-fire and consolidate
1801        // pause-window dep deliveries into one fn execution on RESUME.
1802        // Mark `pending_wave` on the pause state instead of adding to
1803        // `pending_fires`. The dep state still advances (the data_batch push
1804        // above is unchanged), and clear_wave_state still rotates the latest
1805        // dep DATA into prev_data — so when the fn ultimately fires on
1806        // RESUME, it sees the consolidated post-pause state.
1807        let suppressed_for_default_pause;
1808        {
1809            let consumer = s.require_node_mut(consumer_id);
1810            consumer.dep_records[dep_idx].data_batch.push(handle);
1811            consumer.dep_records[dep_idx].involved_this_wave = true;
1812            consumer.involved_this_wave = true;
1813            is_dynamic = consumer.is_dynamic;
1814            is_state = consumer.is_state();
1815            tracked_or_first_fire = !consumer.has_fired_once || consumer.tracked.contains(&dep_idx);
1816            suppressed_for_default_pause = consumer.pause_state.is_paused()
1817                && consumer.pausable == crate::node::PausableMode::Default;
1818            if suppressed_for_default_pause {
1819                consumer.pause_state.mark_pending_wave();
1820            }
1821        }
1822        if suppressed_for_default_pause {
1823            // Default-mode pause: don't add to pending_fires; RESUME will
1824            // schedule one consolidated fire.
1825            return;
1826        }
1827        if is_state {
1828            // State nodes don't have deps; unreachable in practice.
1829        } else if is_dynamic {
1830            if tracked_or_first_fire {
1831                s.pending_fires.insert(consumer_id);
1832            }
1833        } else {
1834            // Derived / Operator / Producer (Producer has no deps so won't
1835            // reach here, but the predicate-based dispatch handles it
1836            // uniformly).
1837            s.pending_fires.insert(consumer_id);
1838        }
1839    }
1840
1841    // -------------------------------------------------------------------
1842    // Subscriber notification
1843    // -------------------------------------------------------------------
1844
1845    /// Queue a wave-end message for `node_id`'s subscribers.
1846    ///
1847    /// **Revision-tracked sink-snapshot batches (Slice X4 / D2,
1848    /// 2026-05-08):** each push for a given node either appends the
1849    /// message to the open batch (if `NodeRecord::subscribers_revision`
1850    /// hasn't advanced since that batch opened — the common case — no
1851    /// extra allocation), or opens a fresh batch with a current sink
1852    /// snapshot frozen at the new revision. A sub installed mid-wave
1853    /// bumps `subscribers_revision`; the next `queue_notify` for the
1854    /// same node observes the bump and starts a new batch that includes
1855    /// the new sub. Pre-subscribe batches retain their original snapshot,
1856    /// so earlier emits flush to their original sink list — the new sub
1857    /// does NOT double-receive them via flush AND handshake replay,
1858    /// closing the late-subscriber + multi-emit-per-wave R1.3.5.a gap.
1859    ///
1860    /// Pause routing decision (R1.3.7.b tier table, §10.2 buffering):
1861    ///   Tier 3 (DATA / RESOLVED) and Tier 4 (INVALIDATE) buffer while
1862    ///   paused; all other tiers (DIRTY tier 1, PAUSE/RESUME tier 2,
1863    ///   COMPLETE/ERROR tier 5, TEARDOWN tier 6) bypass the buffer and
1864    ///   flush immediately. START (tier 0) is per-subscription and never
1865    ///   transits queue_notify.
1866    pub(crate) fn queue_notify(&self, s: &mut CoreState, node_id: NodeId, msg: Message) {
1867        // R1.3.3.a / R1.3.3.d (Slice G — re-added 2026-05-07): dev-mode
1868        // wave-content invariant assertion. The tier-3 slot at one node in
1869        // one wave is either ≥1 DATA or exactly 1 RESOLVED — never mixed,
1870        // never multiple RESOLVED. Slice G moved equals substitution from
1871        // per-emit to wave-end coalescing; this assert pins that the
1872        // dispatcher itself never queues a violating combination at the
1873        // queue_notify granularity. Resolved arrivals come from:
1874        //   1. The auto-resolve sweep in `drain_and_flush` (gates on
1875        //      `!any tier-3` so it can't add to a wave with Data).
1876        //   2. The wave-end equals-substitution pass (rewrites in place,
1877        //      doesn't go through queue_notify).
1878        // Both honor R1.3.3.a by construction post-Slice-G.
1879        #[cfg(debug_assertions)]
1880        if matches!(msg.tier(), 3) {
1881            if let Some(entry) = s.pending_notify.get(&node_id) {
1882                // Walk all batches' messages — R1.3.3.a is a per-node
1883                // wave-content invariant, not per-batch (the X4 batches
1884                // are subscriber-snapshot epochs; the protocol-level
1885                // tier-3 invariant spans the whole wave for the node).
1886                let has_data = entry.iter_messages().any(|m| matches!(m, Message::Data(_)));
1887                let resolved_count = entry
1888                    .iter_messages()
1889                    .filter(|m| matches!(m, Message::Resolved))
1890                    .count();
1891                let incoming_is_data = matches!(msg, Message::Data(_));
1892                if incoming_is_data {
1893                    debug_assert!(
1894                        resolved_count == 0,
1895                        "R1.3.3.a violation at {node_id:?}: queueing Data into a \
1896                         wave that already contains Resolved — Slice G should have \
1897                         prevented this via wave-end coalescing"
1898                    );
1899                } else {
1900                    debug_assert!(
1901                        !has_data,
1902                        "R1.3.3.a violation at {node_id:?}: queueing Resolved into a \
1903                         wave that already contains Data"
1904                    );
1905                    debug_assert!(
1906                        resolved_count == 0,
1907                        "R1.3.3.a violation at {node_id:?}: multiple Resolved in one \
1908                         wave at one node"
1909                    );
1910                }
1911            }
1912        }
1913
1914        let buffered_tier = matches!(msg.tier(), 3 | 4);
1915        let cap = s.pause_buffer_cap;
1916
1917        // Pause-routing branch — handles its own retain/release and returns
1918        // before we touch `pending_notify`, so the rec borrow is contained.
1919        {
1920            let rec = s.require_node_mut(node_id);
1921            if rec.subscribers.is_empty() {
1922                return;
1923            }
1924            // Slice F audit close (2026-05-07): pause routing depends on mode.
1925            //   - `ResumeAll`: buffer tier-3/4 for verbatim replay on RESUME.
1926            //   - `Default` + STATE node: state nodes have no fn-fire to
1927            //     suppress, so buffer like resumeAll (collapse-to-latest is
1928            //     a future enhancement; v1 keeps verbatim).
1929            //   - `Default` + COMPUTE node: suppression happens upstream at
1930            //     fn-fire scheduling (see `deliver_data_to_consumer`); no
1931            //     outgoing tier-3 is produced from this node while paused,
1932            //     so this branch is unreachable for compute-default-paused.
1933            //     Fallthrough to the non-paused queue path is fine.
1934            //   - `Off`: pause is ignored entirely — tier-3 flushes
1935            //     immediately. Fallthrough.
1936            let mode_buffers_tier3 = match rec.pausable {
1937                crate::node::PausableMode::ResumeAll => true,
1938                crate::node::PausableMode::Default => rec.is_state(),
1939                crate::node::PausableMode::Off => false,
1940            };
1941            if buffered_tier && mode_buffers_tier3 && rec.pause_state.is_paused() {
1942                if let Some(h) = msg.payload_handle() {
1943                    self.binding.retain_handle(h);
1944                }
1945                let push_result = rec.pause_state.push_buffered(msg, cap);
1946                for dm in push_result.dropped_msgs {
1947                    if let Some(h) = dm.payload_handle() {
1948                        self.binding.release_handle(h);
1949                    }
1950                }
1951                // R1.3.8.c (Slice F, A3): on first overflow this cycle,
1952                // schedule a synthesized ERROR for wave-end emission.
1953                // `cap` is `Some` here (an overflow can only happen with a
1954                // configured cap), so `unwrap` is safe.
1955                if push_result.first_overflow_this_cycle {
1956                    if let Some((dropped_count, lock_held_ns)) =
1957                        rec.pause_state.overflow_diagnostic()
1958                    {
1959                        s.pending_pause_overflow
1960                            .push(crate::node::PendingPauseOverflow {
1961                                node_id,
1962                                dropped_count,
1963                                configured_max: cap.unwrap_or(0),
1964                                lock_held_ns,
1965                            });
1966                    }
1967                }
1968                return;
1969            }
1970        }
1971
1972        // Non-paused queue path: retain payload handle and queue into
1973        // pending_notify. Released in `flush_notifications` after sinks
1974        // fire.
1975        if let Some(h) = msg.payload_handle() {
1976            self.binding.retain_handle(h);
1977        }
1978        Self::push_into_pending_notify(s, node_id, msg);
1979    }
1980
1981    /// Slice X4 / D2: revision-tracked batch decision for `queue_notify`'s
1982    /// non-paused path. Either appends `msg` to the open batch (if
1983    /// `subscribers_revision` hasn't advanced since it opened — common
1984    /// case, no extra allocation) or opens a fresh batch with a current
1985    /// sink snapshot frozen at the new revision.
1986    ///
1987    /// Borrow discipline: reads `subscribers_revision` and the snapshot
1988    /// from `s.nodes` BEFORE calling `s.pending_notify.entry()` to keep
1989    /// the two field borrows disjoint (split-borrow through
1990    /// `require_node_mut` defeats the borrow checker).
1991    ///
1992    /// Lock-discipline assumption: this read of `subscribers_revision`
1993    /// is safe because both the subscribe install path
1994    /// ([`crate::node::Core::subscribe`]) and `queue_notify` hold
1995    /// `CoreState`'s mutex when they bump / read the revision —
1996    /// concurrent subscribe/unsubscribe cannot interleave. **If
1997    /// `Core::subscribe` ever moves the sink-install lock-released
1998    /// (mirroring the lock-released drain refactor), the revision read
1999    /// here must re-validate post-borrow — otherwise a fresh batch
2000    /// could open with a stale snapshot.**
2001    fn push_into_pending_notify(s: &mut CoreState, node_id: NodeId, msg: Message) {
2002        let current_rev = s.require_node(node_id).subscribers_revision;
2003        let needs_new_batch = s.pending_notify.get(&node_id).is_none_or(|entry| {
2004            entry
2005                .batches
2006                .last()
2007                .is_none_or(|b| b.snapshot_revision != current_rev)
2008        });
2009        let sinks_snapshot: Vec<Sink> = if needs_new_batch {
2010            s.require_node(node_id)
2011                .subscribers
2012                .values()
2013                .cloned()
2014                .collect()
2015        } else {
2016            Vec::new()
2017        };
2018        match s.pending_notify.entry(node_id) {
2019            Entry::Vacant(slot) => {
2020                let mut batches: SmallVec<[PendingBatch; 1]> = SmallVec::new();
2021                batches.push(PendingBatch {
2022                    snapshot_revision: current_rev,
2023                    sinks: sinks_snapshot,
2024                    messages: vec![msg],
2025                });
2026                slot.insert(PendingPerNode { batches });
2027            }
2028            Entry::Occupied(mut slot) => {
2029                let entry = slot.get_mut();
2030                if needs_new_batch {
2031                    entry.batches.push(PendingBatch {
2032                        snapshot_revision: current_rev,
2033                        sinks: sinks_snapshot,
2034                        messages: vec![msg],
2035                    });
2036                } else {
2037                    entry
2038                        .batches
2039                        .last_mut()
2040                        .expect("non-empty by construction (entry exists implies batch exists)")
2041                        .messages
2042                        .push(msg);
2043                }
2044            }
2045        }
2046    }
2047
2048    /// Collect wave-end sink-fire jobs into `s.deferred_flush_jobs` and the
2049    /// payload-handle releases owed for `pending_notify` into
2050    /// `s.deferred_handle_releases`. The actual sink fires + handle releases
2051    /// run **after** the state lock is dropped — see [`Core::run_wave`].
2052    ///
2053    /// R1.3.1.b two-phase propagation: phase 1 (DIRTY) propagates through
2054    /// the entire graph before phase 2 (DATA / RESOLVED) begins. Implemented
2055    /// here as cross-node tier-then-node collect — phase 1's jobs sit before
2056    /// phase 2's in `deferred_flush_jobs`, so when `run_wave` drains the
2057    /// queue lock-released, multi-node subscribers see all DIRTYs before any
2058    /// settle. Matches TS's drainPhase model without the per-tier queue
2059    /// indirection.
2060    ///
2061    /// Phase ordering:
2062    ///   1 → tier 1   (DIRTY)
2063    ///   2 → tier 3+4 (DATA/RESOLVED + INVALIDATE — the "settle slice")
2064    ///   3 → tier 5   (COMPLETE/ERROR)
2065    ///   4 → tier 6   (TEARDOWN)
2066    ///
2067    /// Tier 0 (START) is per-subscription (never enters pending_notify) and
2068    /// tier 2 (PAUSE/RESUME) is delivered through dedicated paths, also
2069    /// bypassing pending_notify; both are absent from this enumeration.
2070    ///
2071    /// Within a single phase, per-node insertion order (IndexMap iteration)
2072    /// is preserved — an emit on A before B → A's phase-2 messages flush
2073    /// before B's. Within a single node, message order is preserved.
2074    fn flush_notifications(&self, s: &mut CoreState) {
2075        const PHASES: &[&[u8]] = &[
2076            &[1],    // DIRTY
2077            &[3, 4], // DATA/RESOLVED + INVALIDATE
2078            &[5],    // COMPLETE/ERROR
2079            &[6],    // TEARDOWN
2080        ];
2081        let pending = std::mem::take(&mut s.pending_notify);
2082        for &phase_tiers in PHASES {
2083            for (_node_id, entry) in &pending {
2084                // Slice X4 / D2: iterate batches in arrival order. Each
2085                // batch carries its own sink snapshot frozen at open-time;
2086                // a batch's messages flush to ITS sinks only. Within a
2087                // single (phase, node), batches stay in arrival order so
2088                // emit-order semantics are preserved across batches.
2089                for batch in &entry.batches {
2090                    if batch.sinks.is_empty() {
2091                        continue;
2092                    }
2093                    let phase_msgs: Vec<Message> = batch
2094                        .messages
2095                        .iter()
2096                        .copied()
2097                        .filter(|m| phase_tiers.contains(&m.tier()))
2098                        .collect();
2099                    if phase_msgs.is_empty() {
2100                        continue;
2101                    }
2102                    let sinks_clone: Vec<Sink> = batch.sinks.iter().map(Arc::clone).collect();
2103                    s.deferred_flush_jobs.push((sinks_clone, phase_msgs));
2104                }
2105            }
2106        }
2107        // Refcount release: balance the retain done in `queue_notify` for
2108        // every payload-bearing message that landed in pending_notify
2109        // (across ALL batches per node). Deferred to post-lock-drop so the
2110        // binding's release path can't re-enter Core under our lock.
2111        for entry in pending.values() {
2112            for msg in entry.iter_messages() {
2113                if let Some(h) = msg.payload_handle() {
2114                    s.deferred_handle_releases.push(h);
2115                }
2116            }
2117        }
2118    }
2119
2120    /// Take the deferred sink-fire jobs, payload-handle releases,
2121    /// cleanup-hook fire queue, and pending-wipe queue from `CoreState`.
2122    /// Callers pair this with a `drop(state_guard)` and a subsequent
2123    /// [`Self::fire_deferred`] to deliver the wave's sinks + handle
2124    /// releases + Slice E2 OnInvalidate cleanup hooks + Slice E2 /qa
2125    /// Q2(b) eager wipe_ctx fires lock-released.
2126    pub(crate) fn drain_deferred(s: &mut CoreState) -> WaveDeferred {
2127        (
2128            std::mem::take(&mut s.deferred_flush_jobs),
2129            std::mem::take(&mut s.deferred_handle_releases),
2130            std::mem::take(&mut s.deferred_cleanup_hooks),
2131            std::mem::take(&mut s.pending_wipes),
2132        )
2133    }
2134
2135    /// Fire deferred sink-fire jobs in collected order, then release the
2136    /// payload handles owed for messages that landed in `pending_notify`
2137    /// during the wave, then fire any queued Slice E2 OnInvalidate cleanup
2138    /// hooks. All three phases run lock-released so:
2139    /// - Sinks that call back into Core (emit, pause, etc.) re-acquire the
2140    ///   state lock cleanly and run their own nested wave.
2141    /// - The binding's `release_handle` path can't deadlock against a
2142    ///   binding-side mutex held by Core.
2143    /// - User cleanup closures (invoked via `BindingBoundary::cleanup_for`)
2144    ///   may safely re-enter Core for unrelated nodes.
2145    ///
2146    /// **Cleanup-drain panic discipline (D060):** each `cleanup_for` call
2147    /// is wrapped in `catch_unwind` so a single binding panic doesn't
2148    /// short-circuit the per-wave drain. All queued cleanup attempts run;
2149    /// if any panicked, the LAST panic re-raises after the loop completes
2150    /// (preserving wave-end discipline while still surfacing failures).
2151    /// Per D060, Core stays panic-naive about user code — bindings own
2152    /// their host-language panic policy inside `cleanup_for`; this
2153    /// `catch_unwind` is purely about drain-don't-short-circuit.
2154    pub(crate) fn fire_deferred(
2155        &self,
2156        jobs: DeferredJobs,
2157        releases: Vec<HandleId>,
2158        cleanup_hooks: Vec<(crate::handle::NodeId, crate::boundary::CleanupTrigger)>,
2159        pending_wipes: Vec<crate::handle::NodeId>,
2160    ) {
2161        // Slice E2 /qa P1 (2026-05-07): wrap each sink-fire in
2162        // `catch_unwind` so a panicking sink doesn't unwind out of
2163        // `fire_deferred` and drop the queued `releases` +
2164        // `cleanup_hooks`. Mirrors Slice F audit fix A7's per-tier
2165        // handshake-fire discipline. Without this guard, a sink panic
2166        // here would silently leak handle retains AND silently drop
2167        // OnInvalidate cleanup hooks. AssertUnwindSafe is safe because
2168        // we re-raise the last panic at the end after running every
2169        // queued fire — drain ordering is preserved.
2170        let mut last_panic: Option<Box<dyn std::any::Any + Send>> = None;
2171        for (sinks, msgs) in jobs {
2172            for sink in &sinks {
2173                let sink = sink.clone();
2174                let msgs_ref = &msgs;
2175                let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
2176                    sink(msgs_ref);
2177                }));
2178                if let Err(payload) = result {
2179                    last_panic = Some(payload);
2180                }
2181            }
2182        }
2183        for h in releases {
2184            self.binding.release_handle(h);
2185        }
2186        // Slice E2 (D060): drain cleanup hooks with per-item panic
2187        // isolation so the loop always completes. AssertUnwindSafe is
2188        // safe here because we don't rely on logical state being valid
2189        // post-panic — the panic propagates anyway after the drain ends.
2190        for (node_id, trigger) in cleanup_hooks {
2191            let binding = &self.binding;
2192            let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
2193                binding.cleanup_for(node_id, trigger);
2194            }));
2195            if let Err(payload) = result {
2196                last_panic = Some(payload);
2197            }
2198        }
2199        // Slice E2 /qa Q2(b) (D069): drain eager wipe_ctx queue with the
2200        // same per-item panic isolation. Fires AFTER cleanup hooks so a
2201        // resubscribable node's OnInvalidate (or any tier-3+ cleanup that
2202        // fires in the same wave) sees pre-wipe binding state if it
2203        // landed in the same wave as the terminal cascade. Mutually
2204        // exclusive with `Subscription::Drop`'s direct-fire site, but
2205        // even concurrent fires are idempotent (binding's `wipe_ctx`
2206        // calls `HashMap::remove` which is a no-op on absent keys).
2207        for node_id in pending_wipes {
2208            let binding = &self.binding;
2209            let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
2210                binding.wipe_ctx(node_id);
2211            }));
2212            if let Err(payload) = result {
2213                last_panic = Some(payload);
2214            }
2215        }
2216        if let Some(payload) = last_panic {
2217            std::panic::resume_unwind(payload);
2218        }
2219    }
2220
2221    // -------------------------------------------------------------------
2222    // User-facing batch — coalesce multiple emits into one wave
2223    // -------------------------------------------------------------------
2224
2225    /// Coalesce multiple emissions into a single wave. Every `emit` /
2226    /// `complete` / `error` / `teardown` / `invalidate` call inside `f`
2227    /// queues its downstream work; the wave drains when `f` returns.
2228    ///
2229    /// **R1.3.6.a** — DIRTY still propagates immediately (tier 1 isn't
2230    /// deferred); only tier-3+ delivery is held until scope exit. **R1.3.6.b**
2231    /// — repeated emits on the same node coalesce into a single multi-message
2232    /// delivery (one [`Message::Dirty`] for the wave + one [`Message::Data`]
2233    /// per emit, all delivered together in the per-node phase-2 pass).
2234    ///
2235    /// Nested `batch()` calls share the outer wave; only the outermost call
2236    /// drives the drain. Re-entrant calls from inside an `emit`/fn (the wave
2237    /// engine's own `s.in_tick` re-entrance) compose with this method
2238    /// transparently — they observe `in_tick = true` and skip drain just
2239    /// like nested `batch()`.
2240    ///
2241    /// On panic inside `f`, the `BatchGuard` returned by the internal
2242    /// `begin_batch` call drops normally and discards pending tier-3+ work
2243    /// (subscribers do not observe the half-built wave). See
2244    /// [`Core::begin_batch`] for the RAII variant if you need explicit control
2245    /// over the scope boundary.
2246    pub fn batch<F>(&self, f: F)
2247    where
2248        F: FnOnce(),
2249    {
2250        let _guard = self.begin_batch();
2251        f();
2252    }
2253
2254    /// RAII batch handle — opens a wave when constructed, drains on drop.
2255    ///
2256    /// Mirrors the closure-based [`Self::batch`] but exposes the scope
2257    /// boundary so callers can compose batches with non-`FnOnce` control
2258    /// flow (e.g. async-state-machine code paths, or splitting setup and
2259    /// drain across helper functions).
2260    ///
2261    /// ```ignore
2262    /// let g = core.begin_batch();
2263    /// core.emit(state_a, h1);
2264    /// core.emit(state_b, h2);
2265    /// drop(g); // wave drains here
2266    /// ```
2267    ///
2268    /// Like the closure form, nested `begin_batch` calls share the outer
2269    /// wave (only the outermost guard drains).
2270    #[must_use = "BatchGuard drains the wave on drop; assign to a named binding"]
2271    pub fn begin_batch(&self) -> BatchGuard {
2272        // Acquire the wave-owner re-entrant mutex BEFORE the state lock.
2273        // Same-thread re-entry passes through (e.g. fn → emit → run_wave →
2274        // begin_batch from inside outer wave's invoke_fn); cross-thread
2275        // entries block until the in-flight wave's `_wave_guard` drops.
2276        // `lock_arc()` is `!Send` — strengthens BatchGuard's `!Send`
2277        // guarantee at the type level.
2278        let wave_guard = self.wave_owner.lock_arc();
2279        let owns_tick = {
2280            let mut s = self.lock_state();
2281            let was_in = s.in_tick;
2282            if !was_in {
2283                s.in_tick = true;
2284            }
2285            !was_in
2286        };
2287        BatchGuard {
2288            core: self.clone(),
2289            owns_tick,
2290            _wave_guard: wave_guard,
2291            _not_send: std::marker::PhantomData,
2292        }
2293    }
2294}
2295
2296/// RAII guard returned by [`Core::begin_batch`].
2297///
2298/// While alive, suppresses per-emit wave drains — multiple `emit` /
2299/// `complete` / `error` / `teardown` / `invalidate` calls coalesce into one
2300/// wave. On drop:
2301/// - Outermost guard: drains the wave (fires sinks, runs cleanup, clears
2302///   in-tick).
2303/// - Nested guard (an outer `BatchGuard` or an in-progress wave already owns
2304///   the in-tick flag): silently no-ops.
2305///
2306/// On thread panic during the closure body, the drop path discards pending
2307/// tier-3+ delivery rather than firing sinks (avoids cascading panics).
2308/// Subscribers observe **no tier-3+ delivery for the panicked wave**.
2309/// State-node cache writes that already executed inside the closure are
2310/// rolled back via wave-cache snapshots — `cache_of(s)` returns the pre-
2311/// panic value. The atomicity guarantee covers both sink-observability and
2312/// cache state.
2313///
2314/// # Thread safety
2315///
2316/// `BatchGuard` is **`!Send`** by design. `begin_batch` claims the
2317/// per-`Core` `in_tick` flag AND the per-`Core` `wave_owner` re-entrant
2318/// mutex on the calling thread; sending the guard to another thread
2319/// and dropping it there would clear `in_tick` and release `wave_owner`
2320/// from a different thread than the one that acquired them, breaking
2321/// both the thread-local "I own the wave scope" semantic and
2322/// `parking_lot::ReentrantMutex`'s ownership invariant. The
2323/// `_wave_guard` field is `!Send` (`ArcReentrantMutexGuard<()>`); the
2324/// `PhantomData<*const ()>` marker is belt-and-suspenders.
2325///
2326/// ```compile_fail
2327/// use graphrefly_core::{BatchGuard, BindingBoundary, Core, DepBatch, FnId, FnResult, HandleId, NodeId};
2328/// use std::sync::Arc;
2329///
2330/// struct Stub;
2331/// impl BindingBoundary for Stub {
2332///     fn invoke_fn(&self, _: NodeId, _: FnId, _: &[DepBatch]) -> FnResult {
2333///         FnResult::Noop { tracked: None }
2334///     }
2335///     fn custom_equals(&self, _: FnId, _: HandleId, _: HandleId) -> bool { false }
2336///     fn release_handle(&self, _: HandleId) {}
2337/// }
2338/// fn requires_send<T: Send>(_: T) {}
2339/// let core = Core::new(Arc::new(Stub) as Arc<dyn BindingBoundary>);
2340/// let guard = core.begin_batch();
2341/// requires_send(guard); // <- compile_fail: BatchGuard is !Send.
2342/// ```
2343#[must_use = "BatchGuard drains the wave on drop; assign to a named binding"]
2344pub struct BatchGuard {
2345    core: Core,
2346    owns_tick: bool,
2347    /// Re-entrant mutex guard held for the wave's duration. Drop releases
2348    /// `wave_owner`, allowing cross-thread waves blocked at
2349    /// `Core::begin_batch` to proceed. Same-thread re-entry (nested
2350    /// `begin_batch` from a fn callback) re-acquires this mutex
2351    /// transparently.
2352    ///
2353    /// `ArcReentrantMutexGuard<()>` is `!Send`, so `BatchGuard` is `!Send`
2354    /// at the type level — sending across threads would violate
2355    /// `parking_lot::ReentrantMutex`'s thread-ownership invariant.
2356    _wave_guard: ArcReentrantMutexGuard<parking_lot::RawMutex, parking_lot::RawThreadId, ()>,
2357    _not_send: std::marker::PhantomData<*const ()>,
2358}
2359
2360impl Drop for BatchGuard {
2361    fn drop(&mut self) {
2362        if !self.owns_tick {
2363            return;
2364        }
2365        if std::thread::panicking() {
2366            // Discard pending wave work to avoid firing sinks during
2367            // unwind (sink panic during unwind would abort the process).
2368            //
2369            // Refcount discipline: pending_notify entries (or any
2370            // already-collected deferred_handle_releases from a partial
2371            // drain) hold a queue_notify-time retain on every payload
2372            // handle. Release them here so the discard doesn't leak.
2373            //
2374            // Wave-cache snapshots restore pre-panic cache values so the
2375            // atomicity guarantee covers state, not just observability.
2376            let (pending, deferred_releases, restored_releases) = {
2377                let mut s = self.core.lock_state();
2378                let pending = std::mem::take(&mut s.pending_notify);
2379                let _: DeferredJobs = std::mem::take(&mut s.deferred_flush_jobs);
2380                s.pending_fires.clear();
2381                let restored = self.core.restore_wave_cache_snapshots(&mut s);
2382                // clear_wave_state pushes batch-handle releases into
2383                // deferred_handle_releases, so take it AFTER the clear.
2384                s.clear_wave_state();
2385                let deferred_releases = std::mem::take(&mut s.deferred_handle_releases);
2386                // Slice E2 (D061): panic-discard wave drops queued
2387                // OnInvalidate cleanup hooks SILENTLY. Bindings using
2388                // OnInvalidate for external-resource cleanup MUST
2389                // idempotent-cleanup at process exit / next successful
2390                // invalidate. Mirrors A3 `pending_pause_overflow`
2391                // panic-discard precedent.
2392                let _: Vec<(crate::handle::NodeId, crate::boundary::CleanupTrigger)> =
2393                    std::mem::take(&mut s.deferred_cleanup_hooks);
2394                // Slice E2 /qa Q2(b) (D069): same panic-discard discipline
2395                // for the eager-wipe queue. A panic-discarded wave drops
2396                // queued `wipe_ctx` fires silently; the binding-side
2397                // `NodeCtxState` entry remains until the next successful
2398                // terminate-with-no-subs cycle (or until `Core` drops).
2399                // This mirrors D061's external-resource-cleanup gap and
2400                // is documented similarly.
2401                let _: Vec<crate::handle::NodeId> = std::mem::take(&mut s.pending_wipes);
2402                s.in_tick = false;
2403                (pending, deferred_releases, restored)
2404            };
2405            // Lock dropped — release retains lock-released so the binding
2406            // can't deadlock against an internal binding mutex.
2407            for entry in pending.values() {
2408                for msg in entry.iter_messages() {
2409                    if let Some(h) = msg.payload_handle() {
2410                        self.core.binding.release_handle(h);
2411                    }
2412                }
2413            }
2414            for h in deferred_releases {
2415                self.core.binding.release_handle(h);
2416            }
2417            for h in restored_releases {
2418                self.core.binding.release_handle(h);
2419            }
2420            return;
2421        }
2422        // Successful drain — drain_and_flush manages its own locking.
2423        self.core.drain_and_flush();
2424        // Wave cleanup + extract deferred jobs under the lock.
2425        let (jobs, releases, cleanup_hooks, pending_wipes) = {
2426            let mut s = self.core.lock_state();
2427            s.clear_wave_state();
2428            s.in_tick = false;
2429            self.core.commit_wave_cache_snapshots(&mut s);
2430            // `drain_deferred` takes `deferred_flush_jobs` +
2431            // `deferred_handle_releases` (incl. rotation releases pushed
2432            // by `clear_wave_state` above) + Slice E2
2433            // `deferred_cleanup_hooks` + Slice E2 /qa Q2(b)
2434            // `pending_wipes`.
2435            Core::drain_deferred(&mut s)
2436        };
2437        // Lock dropped — fire deferred sinks + release retains + fire
2438        // cleanup hooks (Slice E2 OnInvalidate, D060 catch_unwind drain)
2439        // + fire eager wipes (D069).
2440        self.core
2441            .fire_deferred(jobs, releases, cleanup_hooks, pending_wipes);
2442    }
2443}