Skip to main content

graphrefly_core/
batch.rs

1//! Wave engine — drain loop, fire selection, emission commit, sink dispatch.
2//!
3//! Ports the wave-engine portion of the handle-protocol prototype
4//! (`~/src/graphrefly-ts/src/__experiments__/handle-core/core.ts`).
5//! Sibling to [`super::node`]; the dispatcher's other concerns
6//! (registration, subscription, pause/resume, terminal cascade,
7//! `set_deps`) live there.
8//!
9//! # Wave engine entry points
10//!
11//! - [`Core::run_wave`] — wave entry. Claims `in_tick` under the state lock,
12//!   runs `op` lock-released, then drains all transitive fn-fires and
13//!   flushes per-subscriber notifications. Each fn-fire iteration drops
14//!   the state lock around `BindingBoundary::invoke_fn` so user fn callbacks
15//!   can re-enter Core safely.
16//! - [`Core::drain_and_flush`] — drain phase + flush phase. Acquires/drops
17//!   the state lock per iteration around `invoke_fn`.
18//! - [`Core::commit_emission`] — equals-substitution + DIRTY/DATA/RESOLVED
19//!   queueing + child propagation. `&self`-only; bracket-fires
20//!   `BindingBoundary::custom_equals` lock-released.
21//! - [`Core::queue_notify`] — per-subscriber message queueing with
22//!   pause-buffer routing. Snapshots the subscriber list at first-touch-
23//!   per-wave so late subscribers (installed mid-wave between drain
24//!   iterations) don't receive duplicate deliveries from messages already
25//!   queued before they subscribed.
26//! - [`Core::deliver_data_to_consumer`] — single-edge propagation; marks
27//!   the consumer for fn-fire if its tracked-deps set is satisfied.
28//!   Called from `commit_emission`, plus `activate_derived` and
29//!   `set_deps` in [`super::node`].
30//!
31//! # Re-entrance discipline (Slice A close — M1 fully lock-released)
32//!
33//! - **Wave-end sink fires** drop the state lock first (Slice A-bigger
34//!   discipline).
35//! - **`BindingBoundary::invoke_fn`** in `fire_fn` fires lock-released —
36//!   user fn callbacks may re-enter `Core::emit` / `pause` / `resume` /
37//!   `invalidate` / `complete` / `error` / `teardown` and run a nested
38//!   wave (the existing `s.in_tick` re-entrance gate composes
39//!   transparently).
40//! - **`BindingBoundary::custom_equals`** in `commit_emission`'s equals
41//!   check fires lock-released.
42//! - **Subscribe-time handshake** is the one remaining lock-held callback.
43//!   It now fires per-tier (`[Start]`, `[Data(v)]`, `[Complete|Error]`,
44//!   `[Teardown]`) as separate sink calls, matching the canonical R1.3.5.a
45//!   tier-split. Re-entrance from a handshake sink callback panics with
46//!   the [`reentrance_guard`] diagnostic.
47
48use std::sync::Arc;
49
50use indexmap::map::Entry;
51use parking_lot::ArcReentrantMutexGuard;
52
53use smallvec::SmallVec;
54
55use crate::boundary::{DepBatch, FnEmission, FnResult};
56use crate::handle::{HandleId, NodeId, NO_HANDLE};
57use crate::message::Message;
58use crate::node::{Core, CoreState, EqualsMode, OperatorOp, Sink, TerminalKind};
59
60/// Deferred sink-fire jobs collected during `flush_notifications`. Each
61/// entry pairs a snapshot of the sink Arcs to fire with the messages to
62/// deliver to them — one entry per (node × phase) cell with non-empty
63/// content. Drained from `CoreState` and fired lock-released.
64pub(crate) type DeferredJobs = Vec<(Vec<Sink>, Vec<Message>)>;
65
66/// Lock-released drain payload of the wave's BatchGuard:
67/// `(sink_jobs, handle_releases, OnInvalidate cleanup hooks, pending wipe_ctx fires)`.
68/// Returned by [`Core::drain_deferred`], consumed by [`Core::fire_deferred`].
69/// Sliced into a type alias to satisfy `clippy::type_complexity`.
70pub(crate) type WaveDeferred = (
71    DeferredJobs,
72    Vec<HandleId>,
73    Vec<(crate::handle::NodeId, crate::boundary::CleanupTrigger)>,
74    Vec<crate::handle::NodeId>,
75);
76
77/// Per-node wave-end notification queue.
78///
79/// Subscribers are snapshotted on the FIRST `queue_notify` call for the
80/// node within a wave. Subsequent calls append messages but reuse the
81/// snapshot. This makes late subscribers (installed between fn-fire
82/// iterations now that the state lock is dropped per-iteration) invisible
83/// to messages already queued before they subscribed — they get the
84/// already-fresh post-commit cache via their handshake, but do not
85/// double-receive the wave's `[Dirty, Data]` that would otherwise duplicate
86/// the value.
87///
88/// Bookkeeping: `payload_retains` is the count of `payload_handle()`
89/// retains owed to the binding when this entry's messages release at
90/// flush time. Each push of a payload-bearing message bumps it; flush
91/// drains it via `deferred_handle_releases`.
92pub(crate) struct PendingPerNode {
93    pub(crate) sinks: Vec<Sink>,
94    pub(crate) messages: Vec<Message>,
95}
96
97/// RAII helper for the A6 reentrancy guard (Slice F, 2026-05-07).
98///
99/// Pushes `node_id` onto [`CoreState::currently_firing`] on construction,
100/// pops it on Drop. [`Core::set_deps`] consults the stack and rejects
101/// `set_deps(N, ...)` from inside N's own fn-fire with
102/// [`crate::node::SetDepsError::ReentrantOnFiringNode`] — closing the
103/// D1 hazard where Phase-1's snapshot of `dep_handles` would refer to
104/// a different dep ordering than Phase-3's `tracked` storage.
105///
106/// Wraps the lock-released `invoke_fn` (and operator-equivalent FFI
107/// callbacks like `project_each` / `predicate_each`). Drop fires even
108/// on panic, so the stack stays balanced under user-fn unwinds.
109///
110/// Membership semantics (NOT strict LIFO): the only consumer of
111/// `currently_firing` is `Core::set_deps`'s reentrancy check, which uses
112/// `contains(&n)` — a set-membership test. Drop pops the right-most
113/// matching `node_id` via `rposition` + `swap_remove`. For a stack like
114/// `[A, B, A]` (A's fn re-enters B, B's fn re-enters A), B's drop pops
115/// the SECOND A (index 1) via swap_remove, leaving `[A, A]` — the
116/// physical order of the remaining As may not match construction order,
117/// but membership is preserved. If a future call site needs strict LIFO
118/// (e.g. "pop the most recently fired node"), switch to `pop()` + assert
119/// the popped value equals `self.node_id`. (QA A6, 2026-05-07)
120pub(crate) struct FiringGuard {
121    core: Core,
122    node_id: NodeId,
123}
124
125impl FiringGuard {
126    pub(crate) fn new(core: &Core, node_id: NodeId) -> Self {
127        core.lock_state().currently_firing.push(node_id);
128        Self {
129            core: core.clone(),
130            node_id,
131        }
132    }
133}
134
135impl Drop for FiringGuard {
136    fn drop(&mut self) {
137        let mut s = self.core.lock_state();
138        if let Some(pos) = s.currently_firing.iter().rposition(|n| *n == self.node_id) {
139            s.currently_firing.swap_remove(pos);
140        }
141        // else: already popped by an external rebalance — silent no-op
142        // for Drop discipline (panic-in-Drop is poison).
143    }
144}
145
146/// Borrow the per-operator scratch slot as `&T`. Panics if the slot is
147/// uninitialized or the contained type doesn't match `T` — both are
148/// invariant violations for any `fire_op_*` helper that should only be
149/// called from `fire_operator`'s match arm for the matching variant.
150fn scratch_ref<T: crate::op_state::OperatorScratch>(s: &CoreState, node_id: NodeId) -> &T {
151    s.require_node(node_id)
152        .op_scratch
153        .as_ref()
154        .expect("op_scratch slot uninitialized for operator node")
155        .as_any_ref()
156        .downcast_ref::<T>()
157        .expect("op_scratch type mismatch")
158}
159
160/// Mutable borrow of the per-operator scratch slot. Same invariants as
161/// [`scratch_ref`].
162fn scratch_mut<T: crate::op_state::OperatorScratch>(s: &mut CoreState, node_id: NodeId) -> &mut T {
163    s.require_node_mut(node_id)
164        .op_scratch
165        .as_mut()
166        .expect("op_scratch slot uninitialized for operator node")
167        .as_any_mut()
168        .downcast_mut::<T>()
169        .expect("op_scratch type mismatch")
170}
171
172impl Core {
173    // -------------------------------------------------------------------
174    // Wave entry + drain
175    // -------------------------------------------------------------------
176
177    /// Wave entry. The caller passes a closure that performs the wave's
178    /// triggering operation (`commit_emission`, `terminate_node`, etc.).
179    /// The closure runs lock-released; closure-internal Core methods
180    /// acquire the state lock as they go.
181    ///
182    /// **Implementation:** delegates to [`Self::begin_batch`] for the
183    /// wave's RAII lifecycle. The returned `BatchGuard` holds the
184    /// `wave_owner` re-entrant mutex for the wave's duration (cross-thread
185    /// emits block; same-thread re-entry passes through), claims `in_tick`,
186    /// and on drop runs the drain + flush + sink-fire phases — OR, if the
187    /// closure panicked, the panic-discard path that restores cache
188    /// snapshots and clears in_tick. This unification gives `run_wave` the
189    /// same panic-safety guarantee as the user-facing `Core::batch`.
190    ///
191    /// **Re-entrance:** a closure invoked from inside another wave — the
192    /// inner `run_wave`'s `begin_batch` observes `in_tick=true`, the
193    /// returned guard is non-owning (`owns_tick=false`), drop is a no-op.
194    /// The outer wave's drain picks up the inner closure's queued work.
195    ///
196    /// **Lock-release discipline (Slice A close, M1):** all binding-side
197    /// callbacks except the subscribe-time handshake fire lock-released.
198    /// Sinks that re-enter Core run a nested wave; user fns that re-enter
199    /// Core run a nested wave; custom-equals oracles that re-enter Core
200    /// run a nested wave. Cross-thread emits block at `wave_owner` until
201    /// the in-flight wave's drain completes — preserving the user-facing
202    /// "emit returning means subscribers have observed" contract.
203    pub(crate) fn run_wave<F>(&self, op: F)
204    where
205        F: FnOnce(&Self),
206    {
207        // The guard holds wave_owner + claims in_tick. On normal scope
208        // exit it drains + flushes + fires sinks. On panic-during-op it
209        // restores cache snapshots + clears in_tick (panic-discard).
210        let _guard = self.begin_batch();
211        op(self);
212        // _guard drops here → drain or panic-discard, depending on whether
213        // op() returned normally.
214    }
215
216    /// Release retains held by `wave_cache_snapshots` and clear the map.
217    /// Called from the wave-success paths in `run_wave` and `BatchGuard`.
218    pub(crate) fn commit_wave_cache_snapshots(&self, s: &mut CoreState) {
219        if s.wave_cache_snapshots.is_empty() {
220            return;
221        }
222        let snapshots = std::mem::take(&mut s.wave_cache_snapshots);
223        for (_, old_handle) in snapshots {
224            self.binding.release_handle(old_handle);
225        }
226    }
227
228    /// Restore cache slots from `wave_cache_snapshots` and clear the map.
229    /// Called from the wave-abort path in `BatchGuard::drop` (panic).
230    ///
231    /// For each snapshotted node:
232    ///
233    /// 1. Read the current cache (the in-flight new value).
234    /// 2. Set `cache = old_handle` (the snapshot's retained value).
235    /// 3. Release the now-unowned current cache handle.
236    ///
237    /// Returns the list of "current" handles to release outside the lock.
238    pub(crate) fn restore_wave_cache_snapshots(&self, s: &mut CoreState) -> Vec<HandleId> {
239        if s.wave_cache_snapshots.is_empty() {
240            return Vec::new();
241        }
242        let snapshots = std::mem::take(&mut s.wave_cache_snapshots);
243        let mut releases = Vec::with_capacity(snapshots.len());
244        for (node_id, old_handle) in snapshots {
245            let Some(rec) = s.nodes.get_mut(&node_id) else {
246                releases.push(old_handle);
247                continue;
248            };
249            let current = std::mem::replace(&mut rec.cache, old_handle);
250            if current != NO_HANDLE {
251                releases.push(current);
252            }
253        }
254        releases
255    }
256
257    /// Drain pending fires until quiescent, then flush wave-end notifications
258    /// to subscribers. Each fire iteration drops the state lock around the
259    /// binding's `invoke_fn` callback so user fns may re-enter Core safely.
260    ///
261    /// `&self`-only — manages its own locking. Called from [`Self::run_wave`]
262    /// and [`super::node::Core::activate_derived`] (via `run_wave`).
263    pub(crate) fn drain_and_flush(&self) {
264        let mut guard = 0u32;
265        loop {
266            // R1.3.8.c (Slice F, A3): if no fires are pending but there are
267            // queued pause-overflow ERRORs, synthesize them now. The
268            // resulting ERROR cascade may add to pending_fires (children
269            // settling their terminal state), so we loop back to drain.
270            let synth_pending = {
271                let mut s = self.lock_state();
272                if s.pending_fires.is_empty() && !s.pending_pause_overflow.is_empty() {
273                    std::mem::take(&mut s.pending_pause_overflow)
274                } else {
275                    Vec::new()
276                }
277            };
278            for entry in synth_pending {
279                // Lock-released call to the binding hook. Default impl
280                // returns None — the binding has opted out of R1.3.8.c
281                // and we fall back to silent-drop + ResumeReport.dropped.
282                let handle = self.binding.synthesize_pause_overflow_error(
283                    entry.node_id,
284                    entry.dropped_count,
285                    entry.configured_max,
286                    entry.lock_held_ns / 1_000_000,
287                );
288                if let Some(h) = handle {
289                    // Re-enter Core::error to terminate the node and
290                    // cascade. We're inside a wave (`in_tick = true`),
291                    // so error() gets a non-owning batch guard — it
292                    // doesn't try to start its own drain. The cascade
293                    // queues into our outer drain via pending_fires
294                    // and pending_notify.
295                    self.error(entry.node_id, h);
296                }
297            }
298
299            // Pick next fire under a short lock. Also re-read the configured
300            // drain cap so callers can tune via `Core::set_max_batch_drain_iterations`
301            // without restarting waves mid-flight.
302            let (next, cap, pending_size) = {
303                let s = self.lock_state();
304                if s.pending_fires.is_empty() {
305                    break;
306                }
307                let cap = s.max_batch_drain_iterations;
308                let pending_size = s.pending_fires.len();
309                let next = self.pick_next_fire(&s);
310                (next, cap, pending_size)
311            };
312            guard += 1;
313            assert!(
314                guard < cap,
315                "wave drain exceeded {cap} iterations \
316                 (pending_fires={pending_size}). Most likely cause: a runtime \
317                 cycle introduced by an operator that re-arms its own pending_fires \
318                 slot from inside `invoke_fn` (e.g. a producer that subscribes to \
319                 itself, or a fn that calls Core::emit on a node whose fn fires \
320                 the original node again). Structural cycles via set_deps are \
321                 rejected at edge-mutation time. Tune via Core::set_max_batch_drain_iterations \
322                 only with concrete evidence the workload needs more iterations."
323            );
324            let Some(next) = next else { break };
325            // fire_fn manages its own locking around invoke_fn.
326            self.fire_fn(next);
327        }
328        // Auto-resolve sweep: nodes registered in pending_auto_resolve
329        // by the RESOLVED child propagation need a Resolved if they didn't
330        // fire and settle via their own commit_emission. Check pending_notify
331        // for each candidate — if it has Dirty but no tier-3+ message, the
332        // node never settled and needs auto-Resolved. Route through
333        // queue_notify so paused nodes get the Resolved into their pause
334        // buffer.
335        let mut s = self.lock_state();
336        let candidates = std::mem::take(&mut s.pending_auto_resolve);
337        for node_id in candidates {
338            let needs_resolve = s
339                .pending_notify
340                .get(&node_id)
341                .is_some_and(|entry| !entry.messages.iter().any(|m| m.tier() >= 3));
342            if needs_resolve {
343                self.queue_notify(&mut s, node_id, Message::Resolved);
344            }
345        }
346        // Final flush phase — populates deferred_flush_jobs
347        // from pending_notify (already carries per-node sink snapshots).
348        self.flush_notifications(&mut s);
349    }
350
351    /// Pick a node whose **transitive** upstream is fully settled.
352    ///
353    /// A node `id` is ready to fire iff none of its transitive ancestors
354    /// (via the `deps` chain) is currently in `pending_fires`. Diamond
355    /// glitch prevention requires transitive (not immediate-only) reasoning
356    /// — for graph `A → {B, C, E}; D = combine(B, C); F = combine(D, E)`,
357    /// a wave that fires `E` first adds `F` to `pending_fires` before `D`
358    /// is added. An immediate-only readiness check would mistakenly pick
359    /// `F` because neither `D` nor `E` are in `pending_fires` at that
360    /// moment, firing `F` against the stale activation-time `D` cache and
361    /// again later when `D` actually settles. Transitive upstream walk
362    /// catches `B`/`C` pending and correctly defers `F`.
363    ///
364    /// Cost: O(V) per candidate; worst case O(N·V) per pick. The existing
365    /// porting-deferred entry on `pick_next_fire` perf flagged this as a
366    /// future per-node `unresolved_dep_count` refactor.
367    fn pick_next_fire(&self, s: &CoreState) -> Option<NodeId> {
368        for &id in &s.pending_fires {
369            if Self::transitive_upstream_settled(s, id) {
370                return Some(id);
371            }
372        }
373        // Cycle / no eligible candidate (every node has an upstream pending,
374        // possibly via a cycle path): pick any so the drain guard advances.
375        // The drain-iteration cap will catch genuine cycles.
376        s.pending_fires.iter().copied().next()
377    }
378
379    fn transitive_upstream_settled(s: &CoreState, node_id: NodeId) -> bool {
380        let rec = s.require_node(node_id);
381        if rec.dep_count() == 0 {
382            return true;
383        }
384        let mut visited: ahash::AHashSet<NodeId> = ahash::AHashSet::new();
385        let mut stack: Vec<NodeId> = rec.dep_ids_vec();
386        while let Some(id) = stack.pop() {
387            if !visited.insert(id) {
388                continue;
389            }
390            if s.pending_fires.contains(&id) {
391                return false;
392            }
393            if let Some(r) = s.nodes.get(&id) {
394                for dep_id in r.dep_ids() {
395                    if !visited.contains(&dep_id) {
396                        stack.push(dep_id);
397                    }
398                }
399            }
400        }
401        true
402    }
403
404    /// Wave drain entry point. Dispatches via `rec.op` to either the
405    /// regular fn-fire path ([`Self::fire_regular`]) or the operator
406    /// dispatch ([`Self::fire_operator`]).
407    pub(crate) fn fire_fn(&self, node_id: NodeId) {
408        let op = {
409            let s = self.lock_state();
410            s.nodes.get(&node_id).and_then(|r| r.op)
411        };
412        match op {
413            Some(operator_op) => self.fire_operator(node_id, operator_op),
414            None => {
415                // State / Derived / Dynamic / Producer all dispatch via fn_id.
416                self.fire_regular(node_id);
417            }
418        }
419    }
420
421    /// Fire a node's fn lock-released around `invoke_fn`.
422    ///
423    /// Phase 1 (lock-held): remove from pending_fires, snapshot fn_id +
424    /// dep_records → DepBatch + kind. Skip if terminal, first-run-gate-closed,
425    /// or stateless.
426    ///
427    /// Phase 2 (lock-released): call `BindingBoundary::invoke_fn`. User fn
428    /// callbacks may re-enter Core (`emit`, `pause`, etc.) and run a nested
429    /// wave — the in_tick gate composes naturally because nested calls
430    /// observe `in_tick = true` and skip their own drain.
431    ///
432    /// Phase 3 (lock-held): mark `has_fired_once`, store dynamic-tracked,
433    /// decide between Noop+RESOLVED, single Data, or Batch.
434    ///
435    /// Phase 4: commit emissions. Single Data goes through
436    /// `commit_emission` (with equals substitution). Batch emissions are
437    /// processed in sequence — Data via `commit_emission_verbatim` (no
438    /// equals substitution per R1.3.2.d / R1.3.3.c), Complete/Error via
439    /// terminal cascade.
440    #[allow(clippy::too_many_lines)] // Slice G added Noop / Batch tier-3 guards
441    fn fire_regular(&self, node_id: NodeId) {
442        enum FireAction {
443            None,
444            SingleData(HandleId),
445            Batch(SmallVec<[FnEmission; 2]>),
446        }
447
448        // Phase 1: snapshot inputs — build DepBatch per dep from dep_records.
449        // `has_fired_once` is captured here for the Slice E2 OnRerun gate
450        // (Phase 1.5 below): the cleanup hook only fires when the fn has
451        // run at least once already in this activation cycle.
452        let prep: Option<(crate::handle::FnId, Vec<DepBatch>, bool, bool)> = {
453            let mut s = self.lock_state();
454            s.pending_fires.remove(&node_id);
455            let rec = s.require_node(node_id);
456            // Skip: terminal, first-run-gate-closed (R2.5.3 / R5.4 — partial
457            // mode opts out of the gate per D011), or stateless.
458            if rec.terminal.is_some() || (!rec.partial && rec.has_sentinel_deps()) {
459                None
460            } else {
461                rec.fn_id.map(|fn_id| {
462                    let dep_batches: Vec<DepBatch> = rec
463                        .dep_records
464                        .iter()
465                        .map(|dr| DepBatch {
466                            data: dr.data_batch.clone(),
467                            prev_data: dr.prev_data,
468                            involved: dr.involved_this_wave,
469                        })
470                        .collect();
471                    (fn_id, dep_batches, rec.is_dynamic, rec.has_fired_once)
472                })
473            }
474        };
475        let Some((fn_id, dep_batches, is_dynamic, has_fired_once)) = prep else {
476            return;
477        };
478
479        // Phase 1.5 (Slice E2 — R2.4.5 OnRerun, lock-released per D045): if
480        // the fn has fired at least once in this activation cycle, fire its
481        // OnRerun cleanup hook BEFORE the next invoke_fn re-allocates fn-
482        // local resources. First-fire is intentionally skipped — there is
483        // no prior run to clean up. Fires OUTSIDE `FiringGuard` because
484        // cleanup re-entrance is not the A6 reentrancy concern (which
485        // protects against `set_deps(self, ...)` from inside the in-flight
486        // invoke_fn). Operator nodes never reach this path (`fire_regular`
487        // is the fn-id branch of `fire_fn`; operators dispatch via
488        // `fire_operator`), so cleanup hooks correctly only fire for fn-
489        // shaped nodes (state / derived / dynamic / producer).
490        if has_fired_once {
491            self.binding
492                .cleanup_for(node_id, crate::boundary::CleanupTrigger::OnRerun);
493        }
494
495        // Phase 2: invoke fn lock-released. A6 reentrancy guard is scoped to
496        // the FFI call only — Phase 3's lock-held state mutation is not part
497        // of "currently firing" because set_deps would already block on the
498        // state lock by then. Drop on the guard pops the stack even if
499        // invoke_fn panics, keeping `currently_firing` balanced.
500        let result = {
501            let _firing = FiringGuard::new(self, node_id);
502            self.binding.invoke_fn(node_id, fn_id, &dep_batches)
503        };
504
505        // Phase 3: apply result under the lock — defensive terminal check
506        // (a sibling cascade may have terminated this node during phase 2).
507        let action: FireAction = {
508            let mut s = self.lock_state();
509            // Defensive: node may have terminated mid-phase-2 via a sibling
510            // cascade (a fn that re-entered `Core::error` on a path that
511            // cascaded here). If so, release any payload handles and no-op.
512            if s.require_node(node_id).terminal.is_some() {
513                match &result {
514                    FnResult::Data { handle, .. } => {
515                        self.binding.release_handle(*handle);
516                    }
517                    FnResult::Batch { emissions, .. } => {
518                        for em in emissions {
519                            match em {
520                                FnEmission::Data(h) | FnEmission::Error(h) => {
521                                    self.binding.release_handle(*h);
522                                }
523                                FnEmission::Complete => {}
524                            }
525                        }
526                    }
527                    FnResult::Noop { .. } => {}
528                }
529                return;
530            }
531            let rec = s.require_node_mut(node_id);
532            rec.has_fired_once = true;
533            if is_dynamic {
534                let tracked = match &result {
535                    FnResult::Data { tracked, .. }
536                    | FnResult::Noop { tracked }
537                    | FnResult::Batch { tracked, .. } => tracked.clone(),
538                };
539                if let Some(t) = tracked {
540                    rec.tracked = t.into_iter().collect();
541                }
542            }
543            match result {
544                FnResult::Noop { .. } => {
545                    // Slice G: skip Resolved if a prior emission in the same
546                    // wave already queued tier-3 (would violate R1.3.3.a).
547                    let already_dirty = s.require_node(node_id).dirty;
548                    let already_tier3 = s
549                        .pending_notify
550                        .get(&node_id)
551                        .is_some_and(|entry| entry.messages.iter().any(|m| m.tier() == 3));
552                    if already_dirty && !already_tier3 {
553                        self.queue_notify(&mut s, node_id, Message::Resolved);
554                    }
555                    FireAction::None
556                }
557                FnResult::Data { handle, .. } => FireAction::SingleData(handle),
558                FnResult::Batch { emissions, .. } if emissions.is_empty() => {
559                    // Empty Batch is equivalent to Noop — settle with
560                    // RESOLVED if the node was dirty (R1.3.1.a). Slice G:
561                    // skip if a prior emission already queued tier-3.
562                    let already_dirty = s.require_node(node_id).dirty;
563                    let already_tier3 = s
564                        .pending_notify
565                        .get(&node_id)
566                        .is_some_and(|entry| entry.messages.iter().any(|m| m.tier() == 3));
567                    if already_dirty && !already_tier3 {
568                        self.queue_notify(&mut s, node_id, Message::Resolved);
569                    }
570                    FireAction::None
571                }
572                FnResult::Batch { emissions, .. } => FireAction::Batch(emissions),
573            }
574        };
575
576        // Phase 4: commit emissions.
577        match action {
578            FireAction::None => {}
579            // Single Data — equals substitution applies (R1.3.2).
580            FireAction::SingleData(handle) => {
581                self.commit_emission(node_id, handle);
582            }
583            // Batch — process in sequence. No equals substitution
584            // (R1.3.2.d / R1.3.3.c: multi-message waves pass verbatim).
585            FireAction::Batch(emissions) => {
586                self.commit_batch(node_id, emissions);
587            }
588        }
589    }
590
591    /// Process a `FnResult::Batch` emissions sequence. Each `Data` goes
592    /// through `commit_emission_verbatim` (no equals substitution per
593    /// R1.3.2.d / R1.3.3.c). Terminal emissions (`Complete` / `Error`)
594    /// cascade per R1.3.4; processing stops at the first terminal and
595    /// remaining handles are released (R1.3.4.a: no further messages
596    /// after terminal).
597    fn commit_batch(&self, node_id: NodeId, emissions: SmallVec<[FnEmission; 2]>) {
598        let mut iter = emissions.into_iter();
599        for em in iter.by_ref() {
600            match em {
601                FnEmission::Data(handle) => {
602                    self.commit_emission_verbatim(node_id, handle);
603                }
604                FnEmission::Complete => {
605                    self.complete(node_id);
606                    break;
607                }
608                FnEmission::Error(handle) => {
609                    self.error(node_id, handle);
610                    break;
611                }
612            }
613        }
614        // Release handles from any emissions after the terminal break.
615        for em in iter {
616            match em {
617                FnEmission::Data(h) | FnEmission::Error(h) => {
618                    self.binding.release_handle(h);
619                }
620                FnEmission::Complete => {}
621            }
622        }
623    }
624
625    // -------------------------------------------------------------------
626    // Emission commit — equals-substitution lives here
627    // -------------------------------------------------------------------
628
629    /// Apply a node's emission. `&self`-only; brackets the equals check
630    /// around a lock release so `BindingBoundary::custom_equals` can re-enter
631    /// Core safely.
632    ///
633    /// Phase 1 (lock-held): defensive terminal short-circuit; snapshot
634    /// equals_mode + old cache handle.
635    ///
636    /// Phase 2 (lock-released): call `handles_equal` — `EqualsMode::Identity`
637    /// is a pure `u64` compare with no boundary call; `EqualsMode::Custom`
638    /// crosses to the binding's `custom_equals` oracle, which may re-enter
639    /// Core.
640    ///
641    /// Phase 3 (lock-held): set cache, queue Dirty + Data/Resolved into
642    /// pending_notify (which snapshots subscribers on first touch),
643    /// propagate to children.
644    pub(crate) fn commit_emission(&self, node_id: NodeId, new_handle: HandleId) {
645        assert!(
646            new_handle != NO_HANDLE,
647            "NO_HANDLE is not a valid DATA payload (R1.2.4) for node {node_id:?}",
648        );
649
650        // Phase 1: terminal short-circuit + snapshot equals/cache.
651        let snapshot = {
652            let s = self.lock_state();
653            let rec = s.require_node(node_id);
654            if rec.terminal.is_some() {
655                drop(s);
656                self.binding.release_handle(new_handle);
657                return;
658            }
659            (rec.cache, rec.equals)
660        };
661        let (old_handle, equals_mode) = snapshot;
662
663        // Slice G (2026-05-07): R1.3.2.d says equals substitution only
664        // fires for SINGLE-DATA waves at one node. Detect "this is a
665        // subsequent emit in the same wave at this node" via the
666        // wave-scoped `tier3_emitted_this_wave` set. If set → multi-emit
667        // wave: skip equals, queue Data verbatim, retroactively rewrite
668        // any prior Resolved (queued by an earlier same-value emit's
669        // equals match) to Data using the wave-start cache snapshot.
670        // Outside batch / first emit: standard per-emit equals path.
671        let is_subsequent_emit_in_wave = {
672            let s = self.lock_state();
673            s.tier3_emitted_this_wave.contains(&node_id)
674        };
675
676        if is_subsequent_emit_in_wave {
677            // Multi-emit wave detected. Skip equals, queue Data verbatim.
678            // Also rewrite any prior Resolved entries to Data using the
679            // wave-start cache snapshot.
680            self.rewrite_prior_resolved_to_data(node_id);
681            self.commit_emission_verbatim(node_id, new_handle);
682            return;
683        }
684
685        // Phase 2: equals check (lock-released for Custom).
686        let is_data = !self.handles_equal_lock_released(equals_mode, old_handle, new_handle);
687
688        // Phase 3: apply emission under the lock. Defensive terminal
689        // re-check — a concurrent cascade between phase 2 and phase 3
690        // could have terminated the node.
691        let mut s = self.lock_state();
692        if s.require_node(node_id).terminal.is_some() {
693            drop(s);
694            self.binding.release_handle(new_handle);
695            return;
696        }
697
698        // R1.3.1.a condition (b): synthesize DIRTY only if node not already
699        // dirty from an earlier emission in the same wave.
700        let already_dirty = s.require_node(node_id).dirty;
701        s.require_node_mut(node_id).dirty = true;
702        if !already_dirty {
703            self.queue_notify(&mut s, node_id, Message::Dirty);
704        }
705
706        if is_data {
707            // P3 (Slice A close /qa): re-read CURRENT cache. Same-thread
708            // re-entry from a `custom_equals` oracle that called back into
709            // `Core::emit` on this same node during phase 2's lock-released
710            // equals check could have advanced the cache between phase 1's
711            // snapshot (`old_handle`) and this point.
712            let current_cache = s.require_node(node_id).cache;
713            let snapshot_taken = if s.in_tick && current_cache != NO_HANDLE {
714                use std::collections::hash_map::Entry;
715                match s.wave_cache_snapshots.entry(node_id) {
716                    Entry::Vacant(slot) => {
717                        slot.insert(current_cache);
718                        true
719                    }
720                    Entry::Occupied(_) => false,
721                }
722            } else {
723                false
724            };
725            s.require_node_mut(node_id).cache = new_handle;
726            if current_cache != NO_HANDLE && !snapshot_taken {
727                self.binding.release_handle(current_cache);
728            }
729            // Slice E1 (R2.6.5 / Lock 6.G): push DATA into the replay
730            // buffer if the node opted in. RESOLVED entries are NOT
731            // buffered (canonical "DATA only").
732            self.push_replay_buffer(&mut s, node_id, new_handle);
733            // Slice G: mark this node as having emitted tier-3 in this wave.
734            s.tier3_emitted_this_wave.insert(node_id);
735            self.queue_notify(&mut s, node_id, Message::Data(new_handle));
736            // Propagate to children
737            let child_ids: Vec<NodeId> = s
738                .children
739                .get(&node_id)
740                .map(|c| c.iter().copied().collect())
741                .unwrap_or_default();
742            for child_id in child_ids {
743                let dep_idx = s.require_node(child_id).dep_index_of(node_id);
744                if let Some(idx) = dep_idx {
745                    self.deliver_data_to_consumer(&mut s, child_id, idx, new_handle);
746                }
747            }
748        } else {
749            // RESOLVED: handle unchanged. Don't release; old still in use.
750            // Slice G: snapshot cache so a subsequent same-wave emit can
751            // rewrite this Resolved to Data using the snapshot.
752            let current_cache = s.require_node(node_id).cache;
753            if s.in_tick && current_cache != NO_HANDLE {
754                use std::collections::hash_map::Entry;
755                if let Entry::Vacant(slot) = s.wave_cache_snapshots.entry(node_id) {
756                    self.binding.retain_handle(current_cache);
757                    slot.insert(current_cache);
758                }
759            }
760            // Slice G: mark this node as having emitted tier-3 in this wave.
761            s.tier3_emitted_this_wave.insert(node_id);
762            self.queue_notify(&mut s, node_id, Message::Resolved);
763            let child_ids: Vec<NodeId> = s
764                .children
765                .get(&node_id)
766                .map(|c| c.iter().copied().collect())
767                .unwrap_or_default();
768            for child_id in child_ids {
769                let already_involved = s.require_node(child_id).involved_this_wave;
770                if !already_involved {
771                    {
772                        let child = s.require_node_mut(child_id);
773                        child.involved_this_wave = true;
774                        child.dirty = true;
775                    }
776                    self.queue_notify(&mut s, child_id, Message::Dirty);
777                    s.pending_auto_resolve.insert(child_id);
778                }
779            }
780        }
781    }
782
783    /// Slice G: when a multi-emit wave is detected at `node_id` (a second
784    /// emit arrives while a prior tier-3 message is still pending), rewrite
785    /// any `Resolved` entries from earlier emits to `Data(snapshot_cache)`
786    /// so the wave conforms to R1.3.3.a (≥1 DATA OR exactly 1 RESOLVED).
787    /// Touches both `pending_notify` (immediate-flush path) and the per-node
788    /// pause buffer (paused path).
789    fn rewrite_prior_resolved_to_data(&self, node_id: NodeId) {
790        let mut s = self.lock_state();
791        let snapshot = match s.wave_cache_snapshots.get(&node_id).copied() {
792            Some(h) if h != NO_HANDLE => h,
793            // No snapshot available — the prior Resolved was queued without
794            // a cache (sentinel pre-emit). Nothing to rewrite to; the
795            // multi-emit case from sentinel is fine (verbatim Data path).
796            _ => return,
797        };
798        let mut retains_needed = 0u32;
799        // Pending_notify path.
800        if let Some(entry) = s.pending_notify.get_mut(&node_id) {
801            for msg in &mut entry.messages {
802                if matches!(msg, Message::Resolved) {
803                    *msg = Message::Data(snapshot);
804                    retains_needed += 1;
805                }
806            }
807        }
808        // Pause-buffer path.
809        if let Some(rec) = s.nodes.get_mut(&node_id) {
810            if let crate::node::PauseState::Paused { buffer, .. } = &mut rec.pause_state {
811                for msg in &mut *buffer {
812                    if matches!(msg, Message::Resolved) {
813                        *msg = Message::Data(snapshot);
814                        retains_needed += 1;
815                    }
816                }
817            }
818        }
819        drop(s);
820        // Each rewritten Resolved → Data adds a payload retain that
821        // queue_notify would otherwise have taken at emit time. The
822        // snapshot already owns one retain (taken when cache was
823        // snapshotted); we need one fresh retain per rewrite.
824        for _ in 0..retains_needed {
825            self.binding.retain_handle(snapshot);
826        }
827    }
828
829    /// Equals check that crosses the binding boundary lock-released for
830    /// `EqualsMode::Custom`. Caller must NOT hold the state lock.
831    fn handles_equal_lock_released(&self, mode: EqualsMode, a: HandleId, b: HandleId) -> bool {
832        if a == b {
833            return true; // identity-on-handles always sufficient
834        }
835        if a == NO_HANDLE || b == NO_HANDLE {
836            return false;
837        }
838        match mode {
839            EqualsMode::Identity => false,
840            EqualsMode::Custom(handle) => self.binding.custom_equals(handle, a, b),
841        }
842    }
843
844    /// Commit a DATA emission **without** equals substitution — used by
845    /// `FnResult::Batch` processing where multi-message waves pass through
846    /// verbatim per R1.3.2.d / R1.3.3.c. DIRTY auto-prefix respects
847    /// R1.3.1.a condition (b): only queued if node not already dirty.
848    ///
849    /// Structurally identical to the DATA branch of [`Self::commit_emission`]
850    /// but skips the Phase 2 equals check entirely.
851    fn commit_emission_verbatim(&self, node_id: NodeId, new_handle: HandleId) {
852        assert!(
853            new_handle != NO_HANDLE,
854            "NO_HANDLE is not a valid DATA payload (R1.2.4) for node {node_id:?}",
855        );
856
857        let mut s = self.lock_state();
858        let rec = s.require_node(node_id);
859        if rec.terminal.is_some() {
860            drop(s);
861            self.binding.release_handle(new_handle);
862            return;
863        }
864
865        // R1.3.1.a condition (b): DIRTY only if not already dirty.
866        let already_dirty = s.require_node(node_id).dirty;
867        s.require_node_mut(node_id).dirty = true;
868        if !already_dirty {
869            self.queue_notify(&mut s, node_id, Message::Dirty);
870        }
871
872        // Always DATA — no equals substitution for Batch emissions.
873        let current_cache = s.require_node(node_id).cache;
874        let snapshot_taken = if s.in_tick && current_cache != NO_HANDLE {
875            use std::collections::hash_map::Entry;
876            match s.wave_cache_snapshots.entry(node_id) {
877                Entry::Vacant(slot) => {
878                    slot.insert(current_cache);
879                    true
880                }
881                Entry::Occupied(_) => false,
882            }
883        } else {
884            false
885        };
886        s.require_node_mut(node_id).cache = new_handle;
887        if current_cache != NO_HANDLE && !snapshot_taken {
888            self.binding.release_handle(current_cache);
889        }
890        // Slice E1: replay buffer push (R2.6.5 / Lock 6.G).
891        self.push_replay_buffer(&mut s, node_id, new_handle);
892        // Slice G QA fix (A2, 2026-05-07): mark tier3_emitted_this_wave
893        // even on the verbatim path. A subsequent commit_emission at the
894        // same node in the same wave needs this flag to detect multi-emit
895        // and skip equals substitution; without it, a Batch-then-standard
896        // sequence would queue Resolved into a wave that already has Data
897        // — violating R1.3.3.a. The Batch path itself still passes
898        // verbatim per R1.3.3.c (we don't re-run equals here); we just
899        // record that "this node has emitted tier-3 in this wave."
900        s.tier3_emitted_this_wave.insert(node_id);
901        self.queue_notify(&mut s, node_id, Message::Data(new_handle));
902        // Propagate to children
903        let child_ids: Vec<NodeId> = s
904            .children
905            .get(&node_id)
906            .map(|c| c.iter().copied().collect())
907            .unwrap_or_default();
908        for child_id in child_ids {
909            let dep_idx = s.require_node(child_id).dep_index_of(node_id);
910            if let Some(idx) = dep_idx {
911                self.deliver_data_to_consumer(&mut s, child_id, idx, new_handle);
912            }
913        }
914    }
915
916    /// Slice E1 (R2.6.5 / Lock 6.G): push a DATA handle into the node's
917    /// replay buffer if opted in. Evicts oldest if cap exceeded; takes a
918    /// fresh retain on push. RESOLVED is NOT buffered per canonical
919    /// "DATA only" — call sites only invoke this for Data emissions.
920    ///
921    /// Evicted handle is queued into `s.deferred_handle_releases`
922    /// (released lock-released at flush time) per the binding-boundary
923    /// lock-release discipline — `release_handle` may re-enter Core via
924    /// finalizers and must not run while the state lock is held
925    /// (QA A3, 2026-05-07).
926    fn push_replay_buffer(&self, s: &mut CoreState, node_id: NodeId, new_handle: HandleId) {
927        let rec = s.require_node_mut(node_id);
928        let cap = match rec.replay_buffer_cap {
929            Some(c) if c > 0 => c,
930            _ => return,
931        };
932        self.binding.retain_handle(new_handle);
933        rec.replay_buffer.push_back(new_handle);
934        let evicted = if rec.replay_buffer.len() > cap {
935            rec.replay_buffer.pop_front()
936        } else {
937            None
938        };
939        if let Some(h) = evicted {
940            s.deferred_handle_releases.push(h);
941        }
942    }
943
944    // ===================================================================
945    // Operator dispatch (Slice C-1, D009).
946    //
947    // `fire_operator` is the entry point for nodes whose `kind` is
948    // `NodeKind::Operator(_)`. It branches on the `OperatorOp` discriminant
949    // to per-operator helpers that snapshot inputs under the lock, drop the
950    // lock to call the binding's bulk projection FFI, and reacquire to
951    // apply emissions via `commit_emission_verbatim` (no per-item equals
952    // dedup at the wire — operator output passes verbatim per the same
953    // R1.3.2.d / R1.3.3.c rule that governs `FnResult::Batch`).
954    //
955    // **Refcount discipline:** inputs sourced from `dep_records[i].data_batch`
956    // share retains owned by the wave's data-batch slot (released at
957    // wave-end rotation in `clear_wave_state`). Operators that emit those
958    // handles unchanged (`Filter`, `DistinctUntilChanged`, `Pairwise`'s
959    // `prev` carry-over) take an additional retain via `retain_handle`
960    // before passing to `commit_emission_verbatim` — the cache slot owns
961    // its own share, independent of the data-batch slot's. Operators that
962    // produce fresh handles (`Map` / `Scan` / `Reduce` / `Pairwise`'s
963    // packed tuples) receive retains pre-bumped by the binding's bulk-
964    // projection method.
965    // ===================================================================
966
967    /// Operator dispatch entry. Pre-checks (terminal short-circuit, first-
968    /// run gate accounting for `partial`, terminal-aware fire for `Reduce`)
969    /// happen here; per-operator behavior lives in the `fire_op_*` helpers.
970    fn fire_operator(&self, node_id: NodeId, op: OperatorOp) {
971        // Phase 1 (lock-held): remove from pending_fires, evaluate skip.
972        let proceed = {
973            let mut s = self.lock_state();
974            s.pending_fires.remove(&node_id);
975            let rec = s.require_node(node_id);
976            if rec.terminal.is_some() {
977                false
978            } else {
979                // First-run gate (R2.5.3 / R5.4). Partial-mode operators
980                // (D011) opt out of the gate; otherwise we wait for every
981                // dep to have delivered at least one real handle. Terminal-
982                // aware operators (currently `Reduce`) additionally count a
983                // dep terminal as "real input" so they can fire on
984                // upstream COMPLETE-without-DATA and emit the seed.
985                let has_real_input = !rec.has_sentinel_deps()
986                    || rec.dep_records.iter().any(|dr| dr.terminal.is_some());
987                rec.partial || has_real_input
988            }
989        };
990        if !proceed {
991            return;
992        }
993
994        // A6 (Slice F, 2026-05-07): track operator fire on the
995        // `currently_firing` stack so a binding-side project/predicate/fold
996        // FFI callback that re-enters `Core::set_deps(node_id, ...)` is
997        // rejected with `SetDepsError::ReentrantOnFiringNode`. Drop pops
998        // the stack on panic too.
999        let _firing = FiringGuard::new(self, node_id);
1000
1001        match op {
1002            OperatorOp::Map { fn_id } => self.fire_op_map(node_id, fn_id),
1003            OperatorOp::Filter { fn_id } => self.fire_op_filter(node_id, fn_id),
1004            OperatorOp::Scan { fn_id, .. } => self.fire_op_scan(node_id, fn_id),
1005            OperatorOp::Reduce { fn_id, .. } => self.fire_op_reduce(node_id, fn_id),
1006            OperatorOp::DistinctUntilChanged { equals_fn_id } => {
1007                self.fire_op_distinct(node_id, equals_fn_id);
1008            }
1009            OperatorOp::Pairwise { fn_id } => self.fire_op_pairwise(node_id, fn_id),
1010            OperatorOp::Combine { pack_fn } => self.fire_op_combine(node_id, pack_fn),
1011            OperatorOp::WithLatestFrom { pack_fn } => {
1012                self.fire_op_with_latest_from(node_id, pack_fn);
1013            }
1014            OperatorOp::Merge => self.fire_op_merge(node_id),
1015            OperatorOp::Take { count } => self.fire_op_take(node_id, count),
1016            OperatorOp::Skip { count } => self.fire_op_skip(node_id, count),
1017            OperatorOp::TakeWhile { fn_id } => self.fire_op_take_while(node_id, fn_id),
1018            // The variant carries `default` for `register_operator`'s
1019            // `make_op_scratch` path; once registered, the live default
1020            // is read from `LastState::default` inside `fire_op_last`.
1021            OperatorOp::Last { .. } => self.fire_op_last(node_id),
1022        }
1023    }
1024
1025    /// Snapshot the operator's single dep batch (transform constraint —
1026    /// R5.7 single-dep). Returns `(inputs, terminal)` where `inputs` is a
1027    /// fresh `Vec<HandleId>` (no retains) and `terminal` reflects
1028    /// `dep_records[0].terminal` at snapshot time.
1029    fn snapshot_op_dep0(&self, node_id: NodeId) -> (Vec<HandleId>, Option<TerminalKind>) {
1030        let s = self.lock_state();
1031        let rec = s.require_node(node_id);
1032        debug_assert!(
1033            !rec.dep_records.is_empty(),
1034            "transform operator must have ≥1 dep"
1035        );
1036        let dr = &rec.dep_records[0];
1037        (dr.data_batch.iter().copied().collect(), dr.terminal)
1038    }
1039
1040    /// Emit DIRTY (if not already dirty) followed by RESOLVED. Used by
1041    /// silent-drop operators (Filter / DistinctUntilChanged / Pairwise)
1042    /// when a wave's inputs all suppress and the operator needs to settle
1043    /// the wave for its subscribers (D018 — let DIRTY ride; queue RESOLVED
1044    /// on full-reject).
1045    fn settle_dirty_resolved(&self, node_id: NodeId) {
1046        let mut s = self.lock_state();
1047        if s.require_node(node_id).terminal.is_some() {
1048            return;
1049        }
1050        let already_dirty = s.require_node(node_id).dirty;
1051        s.require_node_mut(node_id).dirty = true;
1052        if !already_dirty {
1053            self.queue_notify(&mut s, node_id, Message::Dirty);
1054        }
1055        // Slice G: skip Resolved if pending_notify already has a tier-3
1056        // message — adding Resolved would violate R1.3.3.a.
1057        let already_tier3 = s
1058            .pending_notify
1059            .get(&node_id)
1060            .is_some_and(|entry| entry.messages.iter().any(|m| m.tier() == 3));
1061        if !already_tier3 {
1062            self.queue_notify(&mut s, node_id, Message::Resolved);
1063        }
1064    }
1065
1066    /// `OperatorOp::Map` dispatch.
1067    fn fire_op_map(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
1068        let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
1069        // Mark fired regardless of input count (activation gate already
1070        // satisfied or partial-mode).
1071        {
1072            let mut s = self.lock_state();
1073            s.require_node_mut(node_id).has_fired_once = true;
1074        }
1075        if inputs.is_empty() {
1076            return;
1077        }
1078        // Phase 2 (lock-released): bulk project. Binding returns one
1079        // handle per input, each with a retain share already taken.
1080        let outputs = self.binding.project_each(fn_id, &inputs);
1081        // Phase 3: emit each output. `commit_emission_verbatim` consumes
1082        // the retain into the cache slot (and releases the prior cache
1083        // handle internally).
1084        for h in outputs {
1085            self.commit_emission_verbatim(node_id, h);
1086        }
1087    }
1088
1089    /// `OperatorOp::Filter` dispatch (D012/D018).
1090    fn fire_op_filter(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
1091        let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
1092        {
1093            let mut s = self.lock_state();
1094            s.require_node_mut(node_id).has_fired_once = true;
1095        }
1096        if inputs.is_empty() {
1097            return;
1098        }
1099        // Phase 2: predicate per input.
1100        let pass = self.binding.predicate_each(fn_id, &inputs);
1101        debug_assert!(
1102            pass.len() == inputs.len(),
1103            "predicate_each returned {} bools for {} inputs",
1104            pass.len(),
1105            inputs.len()
1106        );
1107        // Phase 3: emit passing items verbatim. Take a fresh retain for
1108        // each — the data_batch slot still owns its retain (released at
1109        // wave-end rotation), and the cache slot needs its own.
1110        let mut emitted = 0usize;
1111        for (i, &h) in inputs.iter().enumerate() {
1112            if pass.get(i).copied().unwrap_or(false) {
1113                self.binding.retain_handle(h);
1114                self.commit_emission_verbatim(node_id, h);
1115                emitted += 1;
1116            }
1117        }
1118        // D018: full-reject settles with DIRTY+RESOLVED.
1119        if emitted == 0 {
1120            self.settle_dirty_resolved(node_id);
1121        }
1122    }
1123
1124    /// `OperatorOp::Scan` dispatch — left-fold emitting each new acc.
1125    fn fire_op_scan(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
1126        use crate::op_state::ScanState;
1127        let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
1128        let acc = {
1129            let s = self.lock_state();
1130            scratch_ref::<ScanState>(&s, node_id).acc
1131        };
1132        {
1133            let mut s = self.lock_state();
1134            s.require_node_mut(node_id).has_fired_once = true;
1135        }
1136        if inputs.is_empty() {
1137            return;
1138        }
1139        // Phase 2: fold each input through. Returns N new handles, each
1140        // with a fresh retain.
1141        let new_states = self.binding.fold_each(fn_id, acc, &inputs);
1142        debug_assert!(
1143            new_states.len() == inputs.len(),
1144            "fold_each returned {} accs for {} inputs",
1145            new_states.len(),
1146            inputs.len()
1147        );
1148        // Phase 3a: update ScanState.acc to the LAST new acc. Take an
1149        // extra retain for the slot; release the prior acc's slot retain.
1150        let last_acc = new_states.last().copied();
1151        if let Some(last) = last_acc {
1152            let prev_acc = {
1153                let mut s = self.lock_state();
1154                let scratch = scratch_mut::<ScanState>(&mut s, node_id);
1155                let prev = scratch.acc;
1156                scratch.acc = last;
1157                prev
1158            };
1159            // Take the slot's retain on the new acc.
1160            self.binding.retain_handle(last);
1161            // Release the prior slot's retain (post-lock to keep binding
1162            // free to re-enter Core safely).
1163            if prev_acc != crate::handle::NO_HANDLE {
1164                self.binding.release_handle(prev_acc);
1165            }
1166        }
1167        // Phase 3b: emit each intermediate acc verbatim. `new_states`
1168        // entries each carry one retain from `fold_each`; that retain is
1169        // consumed by `commit_emission_verbatim` into the cache slot.
1170        for h in new_states {
1171            self.commit_emission_verbatim(node_id, h);
1172        }
1173    }
1174
1175    /// `OperatorOp::Reduce` dispatch — accumulates silently; emits acc on
1176    /// upstream COMPLETE (cascades ERROR verbatim).
1177    fn fire_op_reduce(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
1178        use crate::op_state::ReduceState;
1179        let (inputs, terminal) = self.snapshot_op_dep0(node_id);
1180        let acc = {
1181            let s = self.lock_state();
1182            scratch_ref::<ReduceState>(&s, node_id).acc
1183        };
1184        {
1185            let mut s = self.lock_state();
1186            s.require_node_mut(node_id).has_fired_once = true;
1187        }
1188        // Phase 2: accumulate (silent — no per-input emit).
1189        let new_states = if inputs.is_empty() {
1190            SmallVec::<[HandleId; 1]>::new()
1191        } else {
1192            self.binding.fold_each(fn_id, acc, &inputs)
1193        };
1194        debug_assert!(
1195            new_states.len() == inputs.len(),
1196            "fold_each returned {} accs for {} inputs",
1197            new_states.len(),
1198            inputs.len()
1199        );
1200        // Update ReduceState.acc to last new acc; release intermediate
1201        // states (we don't emit them) and the prior acc's slot retain.
1202        let last_acc = new_states.last().copied();
1203        let intermediates_to_release: Vec<HandleId> = if new_states.len() > 1 {
1204            new_states[..new_states.len() - 1].to_vec()
1205        } else {
1206            Vec::new()
1207        };
1208        let prev_acc_to_release = if let Some(last) = last_acc {
1209            let prev_acc = {
1210                let mut s = self.lock_state();
1211                let scratch = scratch_mut::<ReduceState>(&mut s, node_id);
1212                let prev = scratch.acc;
1213                scratch.acc = last;
1214                prev
1215            };
1216            self.binding.retain_handle(last);
1217            if prev_acc == crate::handle::NO_HANDLE {
1218                None
1219            } else {
1220                Some(prev_acc)
1221            }
1222        } else {
1223            None
1224        };
1225        // Release intermediate fold results (Reduce only emits the LAST,
1226        // but only on terminal). Each was retained by `fold_each`.
1227        for h in intermediates_to_release {
1228            self.binding.release_handle(h);
1229        }
1230        if let Some(h) = prev_acc_to_release {
1231            self.binding.release_handle(h);
1232        }
1233
1234        // Phase 3: emit on terminal.
1235        match terminal {
1236            None => {
1237                // Still accumulating; no emit. Subscribers see no message
1238                // for this wave (silent accumulation). The first wave that
1239                // pushes Reduce to fire produces a Dirty entry on the
1240                // upstream's commit, but Reduce itself doesn't queue any
1241                // tier-3 since R5 silently absorbs. v1: leave the
1242                // post-drain auto-resolve sweep to settle nothing —
1243                // pending_notify has no entry for Reduce so the sweep is
1244                // a no-op.
1245            }
1246            Some(TerminalKind::Complete) => {
1247                // Read the live acc (may be the seed if no DATA arrived)
1248                // and emit Data(acc) + Complete.
1249                let final_acc = {
1250                    let s = self.lock_state();
1251                    scratch_ref::<ReduceState>(&s, node_id).acc
1252                };
1253                if final_acc != crate::handle::NO_HANDLE {
1254                    // Emission needs its own retain (slot's retain is
1255                    // owned by ReduceState.acc until reset/Drop).
1256                    self.binding.retain_handle(final_acc);
1257                    self.commit_emission_verbatim(node_id, final_acc);
1258                }
1259                self.complete(node_id);
1260            }
1261            Some(TerminalKind::Error(h)) => {
1262                // Core::error transfers the caller's share into the
1263                // cascade (node.terminal + per-child dep_terminal slots);
1264                // no release at the error() boundary. Take a fresh share
1265                // here so the cascade owns it independently of the
1266                // dep_records[0].terminal slot's share.
1267                self.binding.retain_handle(h);
1268                self.error(node_id, h);
1269            }
1270        }
1271    }
1272
1273    /// `OperatorOp::DistinctUntilChanged` dispatch.
1274    fn fire_op_distinct(&self, node_id: NodeId, equals_fn_id: crate::handle::FnId) {
1275        use crate::op_state::DistinctState;
1276        let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
1277        let mut prev = {
1278            let s = self.lock_state();
1279            scratch_ref::<DistinctState>(&s, node_id).prev
1280        };
1281        {
1282            let mut s = self.lock_state();
1283            s.require_node_mut(node_id).has_fired_once = true;
1284        }
1285        if inputs.is_empty() {
1286            return;
1287        }
1288        // Take a working-copy retain on the initial prev so both the loop
1289        // (which releases old_prev on each non-equal item) and phase 3
1290        // (which releases the slot's original handle) each have their own
1291        // share. Without this, the loop's release of old_prev (== original
1292        // DistinctState.prev) double-releases against phase 3's stale_slot
1293        // release.
1294        if prev != crate::handle::NO_HANDLE {
1295            self.binding.retain_handle(prev);
1296        }
1297        // Phase 2: per-input equals(prev, current). Each non-equal input
1298        // is emitted and becomes the new prev. Equals fn_id reuses
1299        // `BindingBoundary::custom_equals`.
1300        let mut emitted = 0usize;
1301        for &h in &inputs {
1302            let equal = if prev == crate::handle::NO_HANDLE {
1303                false
1304            } else if prev == h {
1305                true
1306            } else {
1307                self.binding.custom_equals(equals_fn_id, prev, h)
1308            };
1309            if !equal {
1310                // Emit this input verbatim.
1311                self.binding.retain_handle(h);
1312                self.commit_emission_verbatim(node_id, h);
1313                // Update prev: take retain on new prev, release old
1314                // (working-copy retain from above or from prior iteration).
1315                self.binding.retain_handle(h);
1316                let old_prev = prev;
1317                prev = h;
1318                if old_prev != crate::handle::NO_HANDLE {
1319                    self.binding.release_handle(old_prev);
1320                }
1321                emitted += 1;
1322            }
1323        }
1324        // Phase 3: persist prev into DistinctState.prev slot. Release the
1325        // slot's original retain (stale_slot) — this is the slot-owned
1326        // share, independent of the working-copy share released in the
1327        // loop above.
1328        {
1329            let mut s = self.lock_state();
1330            let scratch = scratch_mut::<DistinctState>(&mut s, node_id);
1331            let stale_slot = scratch.prev;
1332            scratch.prev = prev;
1333            if stale_slot != prev && stale_slot != crate::handle::NO_HANDLE {
1334                drop(s);
1335                self.binding.release_handle(stale_slot);
1336            }
1337        }
1338        // Release the working-copy retain on the final prev if it was
1339        // never released in the loop (i.e. no non-equal items passed,
1340        // prev == original). In that case stale_slot == prev, so phase 3
1341        // didn't release it either — but the working-copy retain is still
1342        // outstanding. Release it now.
1343        if emitted == 0 && prev != crate::handle::NO_HANDLE {
1344            self.binding.release_handle(prev);
1345        }
1346        if emitted == 0 {
1347            self.settle_dirty_resolved(node_id);
1348        }
1349    }
1350
1351    /// `OperatorOp::Pairwise` dispatch — emits `(prev, current)` tuples
1352    /// starting after the second value (first input swallowed, sets `prev`).
1353    fn fire_op_pairwise(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
1354        use crate::op_state::PairwiseState;
1355        let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
1356        let mut prev = {
1357            let s = self.lock_state();
1358            scratch_ref::<PairwiseState>(&s, node_id).prev
1359        };
1360        {
1361            let mut s = self.lock_state();
1362            s.require_node_mut(node_id).has_fired_once = true;
1363        }
1364        if inputs.is_empty() {
1365            return;
1366        }
1367        let mut emitted = 0usize;
1368        for &h in &inputs {
1369            if prev == crate::handle::NO_HANDLE {
1370                // First-ever value — swallow, set prev. Retain for the
1371                // PairwiseState.prev slot (persisted in phase 3 below).
1372                self.binding.retain_handle(h);
1373                prev = h;
1374                continue;
1375            }
1376            // Pack (prev, current) into a tuple handle. Binding returns a
1377            // fresh retain on the packed handle.
1378            let packed = self.binding.pairwise_pack(fn_id, prev, h);
1379            self.commit_emission_verbatim(node_id, packed);
1380            // Advance prev: take retain on h, release old prev.
1381            self.binding.retain_handle(h);
1382            let old_prev = prev;
1383            prev = h;
1384            self.binding.release_handle(old_prev);
1385            emitted += 1;
1386        }
1387        // Persist prev into PairwiseState.prev slot.
1388        {
1389            let mut s = self.lock_state();
1390            let scratch = scratch_mut::<PairwiseState>(&mut s, node_id);
1391            let stale_slot = scratch.prev;
1392            scratch.prev = prev;
1393            if stale_slot != prev && stale_slot != crate::handle::NO_HANDLE {
1394                drop(s);
1395                self.binding.release_handle(stale_slot);
1396            }
1397        }
1398        if emitted == 0 {
1399            self.settle_dirty_resolved(node_id);
1400        }
1401    }
1402
1403    // =================================================================
1404    // Slice C-2: multi-dep combinator operators (D020)
1405    // =================================================================
1406
1407    /// Snapshot all deps' "latest" handle for multi-dep combinators.
1408    /// For each dep: returns `data_batch.last()` if non-empty (dep fired
1409    /// this wave), else `prev_data` (last handle from previous wave).
1410    /// Also returns whether dep[0] (primary) had DATA this wave —
1411    /// needed by `fire_op_with_latest_from`.
1412    fn snapshot_op_all_latest(&self, node_id: NodeId) -> (SmallVec<[HandleId; 4]>, bool) {
1413        let s = self.lock_state();
1414        let rec = s.require_node(node_id);
1415        let primary_fired = rec
1416            .dep_records
1417            .first()
1418            .is_some_and(|dr| !dr.data_batch.is_empty());
1419        let latest: SmallVec<[HandleId; 4]> = rec
1420            .dep_records
1421            .iter()
1422            .map(|dr| dr.data_batch.last().copied().unwrap_or(dr.prev_data))
1423            .collect();
1424        (latest, primary_fired)
1425    }
1426
1427    /// `OperatorOp::Combine` dispatch — N-dep combineLatest. Packs the
1428    /// latest handle per dep into a tuple via `pack_tuple`, emits on
1429    /// any dep fire. First-run gate (R2.5.3, partial: false) guarantees
1430    /// all deps have a real handle on first fire. Post-warmup INVALIDATE
1431    /// guard: if any dep's prev_data was cleared, settles with RESOLVED
1432    /// instead of packing a NO_HANDLE into the tuple.
1433    fn fire_op_combine(&self, node_id: NodeId, pack_fn: crate::handle::FnId) {
1434        let (latest, _primary_fired) = self.snapshot_op_all_latest(node_id);
1435        {
1436            let mut s = self.lock_state();
1437            s.require_node_mut(node_id).has_fired_once = true;
1438        }
1439        // Post-warmup INVALIDATE guard: a dep may have been invalidated
1440        // (prev_data cleared to NO_HANDLE) and not yet re-delivered.
1441        if latest.contains(&crate::handle::NO_HANDLE) {
1442            self.settle_dirty_resolved(node_id);
1443            return;
1444        }
1445        let tuple_handle = self.binding.pack_tuple(pack_fn, &latest);
1446        self.commit_emission_verbatim(node_id, tuple_handle);
1447    }
1448
1449    /// `OperatorOp::WithLatestFrom` dispatch — 2-dep, fire-on-primary-only
1450    /// (D021 / Phase 10.5). Emits `[primary, secondary]` pair only when
1451    /// dep[0] (primary) has DATA in the wave. If only dep[1] fires →
1452    /// RESOLVED. Post-warmup INVALIDATE guard: if secondary latest is
1453    /// `NO_HANDLE` (INVALIDATE cleared it), settles with RESOLVED.
1454    fn fire_op_with_latest_from(&self, node_id: NodeId, pack_fn: crate::handle::FnId) {
1455        let (latest, primary_fired) = self.snapshot_op_all_latest(node_id);
1456        let first_fire = {
1457            let mut s = self.lock_state();
1458            let rec = s.require_node_mut(node_id);
1459            let was_first = !rec.has_fired_once;
1460            rec.has_fired_once = true;
1461            was_first
1462        };
1463        // On first fire (gate release), always emit — the first-run gate
1464        // guarantees both deps have values (via prev_data fallback in
1465        // snapshot). On subsequent fires, only emit when primary fires.
1466        if !first_fire && !primary_fired {
1467            // Secondary-only update — no downstream DATA.
1468            self.settle_dirty_resolved(node_id);
1469            return;
1470        }
1471        // Post-warmup INVALIDATE guard: secondary may have been invalidated
1472        // (prev_data cleared to NO_HANDLE) and not yet re-delivered.
1473        debug_assert!(latest.len() == 2, "withLatestFrom requires exactly 2 deps");
1474        if latest[1] == crate::handle::NO_HANDLE {
1475            self.settle_dirty_resolved(node_id);
1476            return;
1477        }
1478        let tuple_handle = self.binding.pack_tuple(pack_fn, &latest);
1479        self.commit_emission_verbatim(node_id, tuple_handle);
1480    }
1481
1482    /// `OperatorOp::Merge` dispatch — N-dep, forward all DATA handles
1483    /// verbatim (D022). Zero FFI on fire: no transformation. Each dep's
1484    /// batch handles are collected, retained, and emitted individually.
1485    fn fire_op_merge(&self, node_id: NodeId) {
1486        // Collect all batch handles from all deps (flat).
1487        let all_handles: Vec<HandleId> = {
1488            let s = self.lock_state();
1489            let rec = s.require_node(node_id);
1490            rec.dep_records
1491                .iter()
1492                .flat_map(|dr| dr.data_batch.iter().copied())
1493                .collect()
1494        };
1495        {
1496            let mut s = self.lock_state();
1497            s.require_node_mut(node_id).has_fired_once = true;
1498        }
1499        if all_handles.is_empty() {
1500            // All deps settled RESOLVED this wave — no DATA to forward.
1501            self.settle_dirty_resolved(node_id);
1502            return;
1503        }
1504        // Emit each handle verbatim. Take a fresh retain per handle
1505        // (independent of the dep batch's retain which gets released at
1506        // wave-end). Matches Filter's discipline for passing inputs.
1507        for &h in &all_handles {
1508            self.binding.retain_handle(h);
1509            self.commit_emission_verbatim(node_id, h);
1510        }
1511    }
1512
1513    // =================================================================
1514    // Slice C-3: flow operators (D024)
1515    // =================================================================
1516
1517    /// `OperatorOp::Take` dispatch — emits the first `count` DATA values
1518    /// then self-completes via `Core::complete`. When `count == 0`, the
1519    /// first fire emits zero items then immediately self-completes
1520    /// (D027). Cross-wave counter lives in
1521    /// [`TakeState::count_emitted`](super::op_state::TakeState::count_emitted).
1522    fn fire_op_take(&self, node_id: NodeId, count: u32) {
1523        use crate::op_state::TakeState;
1524        let (inputs, terminal) = self.snapshot_op_dep0(node_id);
1525        // Snapshot current counter; mark fired regardless of input count
1526        // (activation gate already satisfied or partial-mode).
1527        let mut count_emitted = {
1528            let s = self.lock_state();
1529            scratch_ref::<TakeState>(&s, node_id).count_emitted
1530        };
1531        {
1532            let mut s = self.lock_state();
1533            s.require_node_mut(node_id).has_fired_once = true;
1534        }
1535        // Already at quota before any input this wave — self-complete
1536        // immediately. Covers `count == 0` (first-fire short-circuit) and
1537        // any defensive re-entry after the terminal-skip in `fire_operator`
1538        // already guards against double-complete.
1539        if count_emitted >= count {
1540            self.complete(node_id);
1541            return;
1542        }
1543        // Per-input emission loop. Each pass takes a fresh retain for the
1544        // cache slot; data_batch slot's retain is released at wave-end
1545        // rotation independently.
1546        for &h in &inputs {
1547            self.binding.retain_handle(h);
1548            self.commit_emission_verbatim(node_id, h);
1549            count_emitted = count_emitted.saturating_add(1);
1550            if count_emitted >= count {
1551                break;
1552            }
1553        }
1554        // Persist the updated counter.
1555        {
1556            let mut s = self.lock_state();
1557            scratch_mut::<TakeState>(&mut s, node_id).count_emitted = count_emitted;
1558        }
1559        // Self-complete if we hit the quota this wave. Upstream COMPLETE
1560        // (terminal == Some(Complete)) without us hitting the count
1561        // propagates via the standard auto-cascade — we don't intercept it.
1562        if count_emitted >= count {
1563            self.complete(node_id);
1564            return;
1565        }
1566        // If upstream is already Errored and we haven't hit count, the
1567        // standard cascade will propagate it. If the wave delivered no
1568        // inputs (e.g. RESOLVED from upstream), settle DIRTY+RESOLVED so
1569        // subscribers see the wave close.
1570        if inputs.is_empty() && terminal.is_none() {
1571            self.settle_dirty_resolved(node_id);
1572        }
1573    }
1574
1575    /// `OperatorOp::Skip` dispatch — drops the first `count` DATA values,
1576    /// then forwards the rest. Cross-wave counter lives in
1577    /// [`SkipState::count_skipped`](super::op_state::SkipState::count_skipped).
1578    /// On a wave where every input is still in the skip window, settles
1579    /// DIRTY+RESOLVED (D018 pattern) so subscribers see the wave close.
1580    fn fire_op_skip(&self, node_id: NodeId, count: u32) {
1581        use crate::op_state::SkipState;
1582        let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
1583        let mut count_skipped = {
1584            let s = self.lock_state();
1585            scratch_ref::<SkipState>(&s, node_id).count_skipped
1586        };
1587        {
1588            let mut s = self.lock_state();
1589            s.require_node_mut(node_id).has_fired_once = true;
1590        }
1591        // No early-return on empty inputs: the post-loop `emitted == 0`
1592        // settle handles the empty-inputs case identically to the
1593        // all-swallowed-by-skip-window case (Slice C-3 /qa P6 — symmetry
1594        // with `fire_op_take`).
1595        let mut emitted = 0usize;
1596        for &h in &inputs {
1597            if count_skipped < count {
1598                count_skipped = count_skipped.saturating_add(1);
1599                // Drop this input — the data_batch slot still owns its
1600                // retain (released at wave-end rotation). No emission.
1601                continue;
1602            }
1603            // Past the skip window — emit verbatim. Take a fresh retain
1604            // for the cache slot.
1605            self.binding.retain_handle(h);
1606            self.commit_emission_verbatim(node_id, h);
1607            emitted += 1;
1608        }
1609        // Persist the updated counter.
1610        {
1611            let mut s = self.lock_state();
1612            scratch_mut::<SkipState>(&mut s, node_id).count_skipped = count_skipped;
1613        }
1614        if emitted == 0 {
1615            self.settle_dirty_resolved(node_id);
1616        }
1617    }
1618
1619    /// `OperatorOp::TakeWhile` dispatch — emits while the predicate
1620    /// holds; on the first `false`, emits any preceding passes from the
1621    /// same batch then self-completes via `Core::complete`. Reuses
1622    /// [`BindingBoundary::predicate_each`] (D029).
1623    fn fire_op_take_while(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
1624        let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
1625        {
1626            let mut s = self.lock_state();
1627            s.require_node_mut(node_id).has_fired_once = true;
1628        }
1629        if inputs.is_empty() {
1630            return;
1631        }
1632        // Phase 2: predicate per input.
1633        let pass = self.binding.predicate_each(fn_id, &inputs);
1634        debug_assert!(
1635            pass.len() == inputs.len(),
1636            "predicate_each returned {} bools for {} inputs",
1637            pass.len(),
1638            inputs.len()
1639        );
1640        // Phase 3: emit each input until the first false; then
1641        // self-complete. `fire_operator`'s `terminal.is_some()`
1642        // short-circuit gates re-entry after the self-complete cascade
1643        // installs the terminal slot — no extra `done` flag needed.
1644        let mut emitted = 0usize;
1645        let mut first_false_seen = false;
1646        for (i, &h) in inputs.iter().enumerate() {
1647            if pass.get(i).copied().unwrap_or(false) {
1648                self.binding.retain_handle(h);
1649                self.commit_emission_verbatim(node_id, h);
1650                emitted += 1;
1651            } else {
1652                first_false_seen = true;
1653                break;
1654            }
1655        }
1656        if first_false_seen {
1657            self.complete(node_id);
1658            return;
1659        }
1660        if emitted == 0 {
1661            // Whole batch passed but was empty (impossible here since
1662            // inputs.is_empty() returned early above) — defensive only.
1663            self.settle_dirty_resolved(node_id);
1664        }
1665    }
1666
1667    /// `OperatorOp::Last` dispatch — buffers the latest DATA; emits
1668    /// `Data(latest)` (or `Data(default)` if no DATA arrived and a
1669    /// default was registered) then `Complete` on upstream COMPLETE.
1670    /// On upstream ERROR, propagates verbatim. Storage:
1671    /// [`LastState`](super::op_state::LastState).
1672    ///
1673    /// **Silent-buffer semantics (mirrors Reduce):** on a non-terminal
1674    /// wave (`terminal == None`), `fire_op_last` updates the buffered
1675    /// `latest` handle but produces NO downstream wire message —
1676    /// subscribers observe the operator only when upstream
1677    /// COMPLETE/ERROR triggers the terminal branch. Intermediate
1678    /// inputs from the dep's batch are dropped on the floor (their
1679    /// `data_batch` retains release at wave-end rotation
1680    /// independently). Per-wave settlement on intermediate waves is
1681    /// the canonical behavior for terminal-aware operators.
1682    fn fire_op_last(&self, node_id: NodeId) {
1683        use crate::op_state::LastState;
1684        let (inputs, terminal) = self.snapshot_op_dep0(node_id);
1685        {
1686            let mut s = self.lock_state();
1687            s.require_node_mut(node_id).has_fired_once = true;
1688        }
1689
1690        // Phase 2: buffer the latest input handle (if any). Retain new,
1691        // release old. data_batch slot's retain is released at wave-end
1692        // rotation independently — the LastState slot keeps its own
1693        // share so the value survives across waves.
1694        if let Some(&new_latest) = inputs.last() {
1695            let prev_latest = {
1696                let mut s = self.lock_state();
1697                let scratch = scratch_mut::<LastState>(&mut s, node_id);
1698                let prev = scratch.latest;
1699                scratch.latest = new_latest;
1700                prev
1701            };
1702            self.binding.retain_handle(new_latest);
1703            if prev_latest != crate::handle::NO_HANDLE {
1704                self.binding.release_handle(prev_latest);
1705            }
1706        }
1707
1708        // Phase 3: emit on terminal. Buffer-only fires (no terminal yet)
1709        // produce no downstream message — Reduce-style silent
1710        // accumulation. The post-drain auto-resolve sweep is a no-op
1711        // because pending_notify has no entry for Last.
1712        match terminal {
1713            None => {}
1714            Some(TerminalKind::Complete) => {
1715                // Read the live latest + default. If latest != NO_HANDLE,
1716                // emit it. Otherwise, if default != NO_HANDLE, emit default.
1717                // Otherwise, emit only Complete (empty stream, no default).
1718                let (latest, default) = {
1719                    let s = self.lock_state();
1720                    let scratch = scratch_ref::<LastState>(&s, node_id);
1721                    (scratch.latest, scratch.default)
1722                };
1723                let to_emit = if latest != crate::handle::NO_HANDLE {
1724                    Some(latest)
1725                } else if default != crate::handle::NO_HANDLE {
1726                    Some(default)
1727                } else {
1728                    None
1729                };
1730                if let Some(h) = to_emit {
1731                    // Emission needs its own retain — the LastState slot
1732                    // keeps its share until reset/Drop.
1733                    self.binding.retain_handle(h);
1734                    self.commit_emission_verbatim(node_id, h);
1735                }
1736                self.complete(node_id);
1737            }
1738            Some(TerminalKind::Error(h)) => {
1739                // Take a fresh share for the error cascade — the
1740                // dep_records[0].terminal slot keeps its own share
1741                // (released by reset_for_fresh_lifecycle / Drop).
1742                self.binding.retain_handle(h);
1743                self.error(node_id, h);
1744            }
1745        }
1746    }
1747
1748    pub(crate) fn deliver_data_to_consumer(
1749        &self,
1750        s: &mut CoreState,
1751        consumer_id: NodeId,
1752        dep_idx: usize,
1753        handle: HandleId,
1754    ) {
1755        // Retain the handle for the batch accumulation slot — each DATA
1756        // handle in `data_batch` owns a retain share, released at wave-end
1757        // rotation in `clear_wave_state`.
1758        self.binding.retain_handle(handle);
1759
1760        let is_dynamic;
1761        let is_state;
1762        let tracked_or_first_fire;
1763        // Slice F audit close (2026-05-07): default-mode pause suppression.
1764        // If the consumer is paused with `PausableMode::Default`, the
1765        // canonical-spec §2.6 behavior is to suppress fn-fire and consolidate
1766        // pause-window dep deliveries into one fn execution on RESUME.
1767        // Mark `pending_wave` on the pause state instead of adding to
1768        // `pending_fires`. The dep state still advances (the data_batch push
1769        // above is unchanged), and clear_wave_state still rotates the latest
1770        // dep DATA into prev_data — so when the fn ultimately fires on
1771        // RESUME, it sees the consolidated post-pause state.
1772        let suppressed_for_default_pause;
1773        {
1774            let consumer = s.require_node_mut(consumer_id);
1775            consumer.dep_records[dep_idx].data_batch.push(handle);
1776            consumer.dep_records[dep_idx].involved_this_wave = true;
1777            consumer.involved_this_wave = true;
1778            is_dynamic = consumer.is_dynamic;
1779            is_state = consumer.is_state();
1780            tracked_or_first_fire = !consumer.has_fired_once || consumer.tracked.contains(&dep_idx);
1781            suppressed_for_default_pause = consumer.pause_state.is_paused()
1782                && consumer.pausable == crate::node::PausableMode::Default;
1783            if suppressed_for_default_pause {
1784                consumer.pause_state.mark_pending_wave();
1785            }
1786        }
1787        if suppressed_for_default_pause {
1788            // Default-mode pause: don't add to pending_fires; RESUME will
1789            // schedule one consolidated fire.
1790            return;
1791        }
1792        if is_state {
1793            // State nodes don't have deps; unreachable in practice.
1794        } else if is_dynamic {
1795            if tracked_or_first_fire {
1796                s.pending_fires.insert(consumer_id);
1797            }
1798        } else {
1799            // Derived / Operator / Producer (Producer has no deps so won't
1800            // reach here, but the predicate-based dispatch handles it
1801            // uniformly).
1802            s.pending_fires.insert(consumer_id);
1803        }
1804    }
1805
1806    // -------------------------------------------------------------------
1807    // Subscriber notification
1808    // -------------------------------------------------------------------
1809
1810    /// Queue a wave-end message for `node_id`'s subscribers.
1811    ///
1812    /// **Sink-snapshot-on-first-touch:** the per-wave subscriber list is
1813    /// snapshotted the first time `queue_notify` is called for each node
1814    /// in a wave (via the `Vacant` arm of `pending_notify.entry()`).
1815    /// Subsequent calls for the same node append messages but reuse the
1816    /// snapshot. This makes late subscribers (installed mid-wave between
1817    /// fn-fire iterations now that the state lock drops per-iteration)
1818    /// invisible to messages already queued before they subscribed —
1819    /// avoiding the duplicate-Data delivery hazard where a late subscriber
1820    /// would see `[Start, Data(post)]` from its handshake AND `[Dirty,
1821    /// Data(post)]` from the wave's flush.
1822    ///
1823    /// Pause routing decision (R1.3.7.b tier table, §10.2 buffering):
1824    ///   Tier 3 (DATA / RESOLVED) and Tier 4 (INVALIDATE) buffer while
1825    ///   paused; all other tiers (DIRTY tier 1, PAUSE/RESUME tier 2,
1826    ///   COMPLETE/ERROR tier 5, TEARDOWN tier 6) bypass the buffer and
1827    ///   flush immediately. START (tier 0) is per-subscription and never
1828    ///   transits queue_notify.
1829    pub(crate) fn queue_notify(&self, s: &mut CoreState, node_id: NodeId, msg: Message) {
1830        // R1.3.3.a / R1.3.3.d (Slice G — re-added 2026-05-07): dev-mode
1831        // wave-content invariant assertion. The tier-3 slot at one node in
1832        // one wave is either ≥1 DATA or exactly 1 RESOLVED — never mixed,
1833        // never multiple RESOLVED. Slice G moved equals substitution from
1834        // per-emit to wave-end coalescing; this assert pins that the
1835        // dispatcher itself never queues a violating combination at the
1836        // queue_notify granularity. Resolved arrivals come from:
1837        //   1. The auto-resolve sweep in `drain_and_flush` (gates on
1838        //      `!any tier-3` so it can't add to a wave with Data).
1839        //   2. The wave-end equals-substitution pass (rewrites in place,
1840        //      doesn't go through queue_notify).
1841        // Both honor R1.3.3.a by construction post-Slice-G.
1842        #[cfg(debug_assertions)]
1843        if matches!(msg.tier(), 3) {
1844            if let Some(entry) = s.pending_notify.get(&node_id) {
1845                let has_data = entry.messages.iter().any(|m| matches!(m, Message::Data(_)));
1846                let resolved_count = entry
1847                    .messages
1848                    .iter()
1849                    .filter(|m| matches!(m, Message::Resolved))
1850                    .count();
1851                let incoming_is_data = matches!(msg, Message::Data(_));
1852                if incoming_is_data {
1853                    debug_assert!(
1854                        resolved_count == 0,
1855                        "R1.3.3.a violation at {node_id:?}: queueing Data into a \
1856                         wave that already contains Resolved — Slice G should have \
1857                         prevented this via wave-end coalescing"
1858                    );
1859                } else {
1860                    debug_assert!(
1861                        !has_data,
1862                        "R1.3.3.a violation at {node_id:?}: queueing Resolved into a \
1863                         wave that already contains Data"
1864                    );
1865                    debug_assert!(
1866                        resolved_count == 0,
1867                        "R1.3.3.a violation at {node_id:?}: multiple Resolved in one \
1868                         wave at one node"
1869                    );
1870                }
1871            }
1872        }
1873
1874        let buffered_tier = matches!(msg.tier(), 3 | 4);
1875        let cap = s.pause_buffer_cap;
1876
1877        // Pause-routing branch — handles its own retain/release and returns
1878        // before we touch `pending_notify`, so the rec borrow is contained.
1879        {
1880            let rec = s.require_node_mut(node_id);
1881            if rec.subscribers.is_empty() {
1882                return;
1883            }
1884            // Slice F audit close (2026-05-07): pause routing depends on mode.
1885            //   - `ResumeAll`: buffer tier-3/4 for verbatim replay on RESUME.
1886            //   - `Default` + STATE node: state nodes have no fn-fire to
1887            //     suppress, so buffer like resumeAll (collapse-to-latest is
1888            //     a future enhancement; v1 keeps verbatim).
1889            //   - `Default` + COMPUTE node: suppression happens upstream at
1890            //     fn-fire scheduling (see `deliver_data_to_consumer`); no
1891            //     outgoing tier-3 is produced from this node while paused,
1892            //     so this branch is unreachable for compute-default-paused.
1893            //     Fallthrough to the non-paused queue path is fine.
1894            //   - `Off`: pause is ignored entirely — tier-3 flushes
1895            //     immediately. Fallthrough.
1896            let mode_buffers_tier3 = match rec.pausable {
1897                crate::node::PausableMode::ResumeAll => true,
1898                crate::node::PausableMode::Default => rec.is_state(),
1899                crate::node::PausableMode::Off => false,
1900            };
1901            if buffered_tier && mode_buffers_tier3 && rec.pause_state.is_paused() {
1902                if let Some(h) = msg.payload_handle() {
1903                    self.binding.retain_handle(h);
1904                }
1905                let push_result = rec.pause_state.push_buffered(msg, cap);
1906                for dm in push_result.dropped_msgs {
1907                    if let Some(h) = dm.payload_handle() {
1908                        self.binding.release_handle(h);
1909                    }
1910                }
1911                // R1.3.8.c (Slice F, A3): on first overflow this cycle,
1912                // schedule a synthesized ERROR for wave-end emission.
1913                // `cap` is `Some` here (an overflow can only happen with a
1914                // configured cap), so `unwrap` is safe.
1915                if push_result.first_overflow_this_cycle {
1916                    if let Some((dropped_count, lock_held_ns)) =
1917                        rec.pause_state.overflow_diagnostic()
1918                    {
1919                        s.pending_pause_overflow
1920                            .push(crate::node::PendingPauseOverflow {
1921                                node_id,
1922                                dropped_count,
1923                                configured_max: cap.unwrap_or(0),
1924                                lock_held_ns,
1925                            });
1926                    }
1927                }
1928                return;
1929            }
1930        }
1931
1932        // Non-paused queue path: retain payload handle and queue into
1933        // pending_notify with a per-wave subscriber snapshot taken at
1934        // first touch. Released in `flush_notifications` after sinks fire.
1935        if let Some(h) = msg.payload_handle() {
1936            self.binding.retain_handle(h);
1937        }
1938
1939        // Snapshot subscribers on first touch per wave. We need to read
1940        // `subscribers` separately from the `pending_notify.entry()` borrow
1941        // because both are fields of `CoreState` and split-borrowing
1942        // through a method call (`require_node_mut`) defeats the borrow
1943        // checker. So precompute the snapshot iff the entry is vacant.
1944        let needs_snapshot = !s.pending_notify.contains_key(&node_id);
1945        let sinks_snapshot: Vec<Sink> = if needs_snapshot {
1946            s.require_node(node_id)
1947                .subscribers
1948                .values()
1949                .cloned()
1950                .collect()
1951        } else {
1952            Vec::new()
1953        };
1954        match s.pending_notify.entry(node_id) {
1955            Entry::Vacant(slot) => {
1956                slot.insert(PendingPerNode {
1957                    sinks: sinks_snapshot,
1958                    messages: vec![msg],
1959                });
1960            }
1961            Entry::Occupied(mut slot) => {
1962                slot.get_mut().messages.push(msg);
1963            }
1964        }
1965    }
1966
1967    /// Collect wave-end sink-fire jobs into `s.deferred_flush_jobs` and the
1968    /// payload-handle releases owed for `pending_notify` into
1969    /// `s.deferred_handle_releases`. The actual sink fires + handle releases
1970    /// run **after** the state lock is dropped — see [`Core::run_wave`].
1971    ///
1972    /// R1.3.1.b two-phase propagation: phase 1 (DIRTY) propagates through
1973    /// the entire graph before phase 2 (DATA / RESOLVED) begins. Implemented
1974    /// here as cross-node tier-then-node collect — phase 1's jobs sit before
1975    /// phase 2's in `deferred_flush_jobs`, so when `run_wave` drains the
1976    /// queue lock-released, multi-node subscribers see all DIRTYs before any
1977    /// settle. Matches TS's drainPhase model without the per-tier queue
1978    /// indirection.
1979    ///
1980    /// Phase ordering:
1981    ///   1 → tier 1   (DIRTY)
1982    ///   2 → tier 3+4 (DATA/RESOLVED + INVALIDATE — the "settle slice")
1983    ///   3 → tier 5   (COMPLETE/ERROR)
1984    ///   4 → tier 6   (TEARDOWN)
1985    ///
1986    /// Tier 0 (START) is per-subscription (never enters pending_notify) and
1987    /// tier 2 (PAUSE/RESUME) is delivered through dedicated paths, also
1988    /// bypassing pending_notify; both are absent from this enumeration.
1989    ///
1990    /// Within a single phase, per-node insertion order (IndexMap iteration)
1991    /// is preserved — an emit on A before B → A's phase-2 messages flush
1992    /// before B's. Within a single node, message order is preserved.
1993    fn flush_notifications(&self, s: &mut CoreState) {
1994        const PHASES: &[&[u8]] = &[
1995            &[1],    // DIRTY
1996            &[3, 4], // DATA/RESOLVED + INVALIDATE
1997            &[5],    // COMPLETE/ERROR
1998            &[6],    // TEARDOWN
1999        ];
2000        let pending = std::mem::take(&mut s.pending_notify);
2001        for &phase_tiers in PHASES {
2002            for (_node_id, entry) in &pending {
2003                let phase_msgs: Vec<Message> = entry
2004                    .messages
2005                    .iter()
2006                    .copied()
2007                    .filter(|m| phase_tiers.contains(&m.tier()))
2008                    .collect();
2009                if phase_msgs.is_empty() {
2010                    continue;
2011                }
2012                if entry.sinks.is_empty() {
2013                    continue;
2014                }
2015                // Clone the per-node sink snapshot — same Arcs, cheap.
2016                let sinks_clone: Vec<Sink> = entry.sinks.iter().map(Arc::clone).collect();
2017                s.deferred_flush_jobs.push((sinks_clone, phase_msgs));
2018            }
2019        }
2020        // Refcount release: balance the retain done in `queue_notify` for
2021        // every payload-bearing message that landed in pending_notify.
2022        // Deferred to post-lock-drop so the binding's release path can't
2023        // re-enter Core under our lock.
2024        for entry in pending.values() {
2025            for msg in &entry.messages {
2026                if let Some(h) = msg.payload_handle() {
2027                    s.deferred_handle_releases.push(h);
2028                }
2029            }
2030        }
2031    }
2032
2033    /// Take the deferred sink-fire jobs, payload-handle releases,
2034    /// cleanup-hook fire queue, and pending-wipe queue from `CoreState`.
2035    /// Callers pair this with a `drop(state_guard)` and a subsequent
2036    /// [`Self::fire_deferred`] to deliver the wave's sinks + handle
2037    /// releases + Slice E2 OnInvalidate cleanup hooks + Slice E2 /qa
2038    /// Q2(b) eager wipe_ctx fires lock-released.
2039    pub(crate) fn drain_deferred(s: &mut CoreState) -> WaveDeferred {
2040        (
2041            std::mem::take(&mut s.deferred_flush_jobs),
2042            std::mem::take(&mut s.deferred_handle_releases),
2043            std::mem::take(&mut s.deferred_cleanup_hooks),
2044            std::mem::take(&mut s.pending_wipes),
2045        )
2046    }
2047
2048    /// Fire deferred sink-fire jobs in collected order, then release the
2049    /// payload handles owed for messages that landed in `pending_notify`
2050    /// during the wave, then fire any queued Slice E2 OnInvalidate cleanup
2051    /// hooks. All three phases run lock-released so:
2052    /// - Sinks that call back into Core (emit, pause, etc.) re-acquire the
2053    ///   state lock cleanly and run their own nested wave.
2054    /// - The binding's `release_handle` path can't deadlock against a
2055    ///   binding-side mutex held by Core.
2056    /// - User cleanup closures (invoked via `BindingBoundary::cleanup_for`)
2057    ///   may safely re-enter Core for unrelated nodes.
2058    ///
2059    /// **Cleanup-drain panic discipline (D060):** each `cleanup_for` call
2060    /// is wrapped in `catch_unwind` so a single binding panic doesn't
2061    /// short-circuit the per-wave drain. All queued cleanup attempts run;
2062    /// if any panicked, the LAST panic re-raises after the loop completes
2063    /// (preserving wave-end discipline while still surfacing failures).
2064    /// Per D060, Core stays panic-naive about user code — bindings own
2065    /// their host-language panic policy inside `cleanup_for`; this
2066    /// `catch_unwind` is purely about drain-don't-short-circuit.
2067    pub(crate) fn fire_deferred(
2068        &self,
2069        jobs: DeferredJobs,
2070        releases: Vec<HandleId>,
2071        cleanup_hooks: Vec<(crate::handle::NodeId, crate::boundary::CleanupTrigger)>,
2072        pending_wipes: Vec<crate::handle::NodeId>,
2073    ) {
2074        // Slice E2 /qa P1 (2026-05-07): wrap each sink-fire in
2075        // `catch_unwind` so a panicking sink doesn't unwind out of
2076        // `fire_deferred` and drop the queued `releases` +
2077        // `cleanup_hooks`. Mirrors Slice F audit fix A7's per-tier
2078        // handshake-fire discipline. Without this guard, a sink panic
2079        // here would silently leak handle retains AND silently drop
2080        // OnInvalidate cleanup hooks. AssertUnwindSafe is safe because
2081        // we re-raise the last panic at the end after running every
2082        // queued fire — drain ordering is preserved.
2083        let mut last_panic: Option<Box<dyn std::any::Any + Send>> = None;
2084        for (sinks, msgs) in jobs {
2085            for sink in &sinks {
2086                let sink = sink.clone();
2087                let msgs_ref = &msgs;
2088                let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
2089                    sink(msgs_ref);
2090                }));
2091                if let Err(payload) = result {
2092                    last_panic = Some(payload);
2093                }
2094            }
2095        }
2096        for h in releases {
2097            self.binding.release_handle(h);
2098        }
2099        // Slice E2 (D060): drain cleanup hooks with per-item panic
2100        // isolation so the loop always completes. AssertUnwindSafe is
2101        // safe here because we don't rely on logical state being valid
2102        // post-panic — the panic propagates anyway after the drain ends.
2103        for (node_id, trigger) in cleanup_hooks {
2104            let binding = &self.binding;
2105            let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
2106                binding.cleanup_for(node_id, trigger);
2107            }));
2108            if let Err(payload) = result {
2109                last_panic = Some(payload);
2110            }
2111        }
2112        // Slice E2 /qa Q2(b) (D069): drain eager wipe_ctx queue with the
2113        // same per-item panic isolation. Fires AFTER cleanup hooks so a
2114        // resubscribable node's OnInvalidate (or any tier-3+ cleanup that
2115        // fires in the same wave) sees pre-wipe binding state if it
2116        // landed in the same wave as the terminal cascade. Mutually
2117        // exclusive with `Subscription::Drop`'s direct-fire site, but
2118        // even concurrent fires are idempotent (binding's `wipe_ctx`
2119        // calls `HashMap::remove` which is a no-op on absent keys).
2120        for node_id in pending_wipes {
2121            let binding = &self.binding;
2122            let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
2123                binding.wipe_ctx(node_id);
2124            }));
2125            if let Err(payload) = result {
2126                last_panic = Some(payload);
2127            }
2128        }
2129        if let Some(payload) = last_panic {
2130            std::panic::resume_unwind(payload);
2131        }
2132    }
2133
2134    // -------------------------------------------------------------------
2135    // User-facing batch — coalesce multiple emits into one wave
2136    // -------------------------------------------------------------------
2137
2138    /// Coalesce multiple emissions into a single wave. Every `emit` /
2139    /// `complete` / `error` / `teardown` / `invalidate` call inside `f`
2140    /// queues its downstream work; the wave drains when `f` returns.
2141    ///
2142    /// **R1.3.6.a** — DIRTY still propagates immediately (tier 1 isn't
2143    /// deferred); only tier-3+ delivery is held until scope exit. **R1.3.6.b**
2144    /// — repeated emits on the same node coalesce into a single multi-message
2145    /// delivery (one [`Message::Dirty`] for the wave + one [`Message::Data`]
2146    /// per emit, all delivered together in the per-node phase-2 pass).
2147    ///
2148    /// Nested `batch()` calls share the outer wave; only the outermost call
2149    /// drives the drain. Re-entrant calls from inside an `emit`/fn (the wave
2150    /// engine's own `s.in_tick` re-entrance) compose with this method
2151    /// transparently — they observe `in_tick = true` and skip drain just
2152    /// like nested `batch()`.
2153    ///
2154    /// On panic inside `f`, the `BatchGuard` returned by the internal
2155    /// `begin_batch` call drops normally and discards pending tier-3+ work
2156    /// (subscribers do not observe the half-built wave). See
2157    /// [`Core::begin_batch`] for the RAII variant if you need explicit control
2158    /// over the scope boundary.
2159    pub fn batch<F>(&self, f: F)
2160    where
2161        F: FnOnce(),
2162    {
2163        let _guard = self.begin_batch();
2164        f();
2165    }
2166
2167    /// RAII batch handle — opens a wave when constructed, drains on drop.
2168    ///
2169    /// Mirrors the closure-based [`Self::batch`] but exposes the scope
2170    /// boundary so callers can compose batches with non-`FnOnce` control
2171    /// flow (e.g. async-state-machine code paths, or splitting setup and
2172    /// drain across helper functions).
2173    ///
2174    /// ```ignore
2175    /// let g = core.begin_batch();
2176    /// core.emit(state_a, h1);
2177    /// core.emit(state_b, h2);
2178    /// drop(g); // wave drains here
2179    /// ```
2180    ///
2181    /// Like the closure form, nested `begin_batch` calls share the outer
2182    /// wave (only the outermost guard drains).
2183    #[must_use = "BatchGuard drains the wave on drop; assign to a named binding"]
2184    pub fn begin_batch(&self) -> BatchGuard {
2185        // Acquire the wave-owner re-entrant mutex BEFORE the state lock.
2186        // Same-thread re-entry passes through (e.g. fn → emit → run_wave →
2187        // begin_batch from inside outer wave's invoke_fn); cross-thread
2188        // entries block until the in-flight wave's `_wave_guard` drops.
2189        // `lock_arc()` is `!Send` — strengthens BatchGuard's `!Send`
2190        // guarantee at the type level.
2191        let wave_guard = self.wave_owner.lock_arc();
2192        let owns_tick = {
2193            let mut s = self.lock_state();
2194            let was_in = s.in_tick;
2195            if !was_in {
2196                s.in_tick = true;
2197            }
2198            !was_in
2199        };
2200        BatchGuard {
2201            core: self.clone(),
2202            owns_tick,
2203            _wave_guard: wave_guard,
2204            _not_send: std::marker::PhantomData,
2205        }
2206    }
2207}
2208
2209/// RAII guard returned by [`Core::begin_batch`].
2210///
2211/// While alive, suppresses per-emit wave drains — multiple `emit` /
2212/// `complete` / `error` / `teardown` / `invalidate` calls coalesce into one
2213/// wave. On drop:
2214/// - Outermost guard: drains the wave (fires sinks, runs cleanup, clears
2215///   in-tick).
2216/// - Nested guard (an outer `BatchGuard` or an in-progress wave already owns
2217///   the in-tick flag): silently no-ops.
2218///
2219/// On thread panic during the closure body, the drop path discards pending
2220/// tier-3+ delivery rather than firing sinks (avoids cascading panics).
2221/// Subscribers observe **no tier-3+ delivery for the panicked wave**.
2222/// State-node cache writes that already executed inside the closure are
2223/// rolled back via wave-cache snapshots — `cache_of(s)` returns the pre-
2224/// panic value. The atomicity guarantee covers both sink-observability and
2225/// cache state.
2226///
2227/// # Thread safety
2228///
2229/// `BatchGuard` is **`!Send`** by design. `begin_batch` claims the
2230/// per-`Core` `in_tick` flag AND the per-`Core` `wave_owner` re-entrant
2231/// mutex on the calling thread; sending the guard to another thread
2232/// and dropping it there would clear `in_tick` and release `wave_owner`
2233/// from a different thread than the one that acquired them, breaking
2234/// both the thread-local "I own the wave scope" semantic and
2235/// `parking_lot::ReentrantMutex`'s ownership invariant. The
2236/// `_wave_guard` field is `!Send` (`ArcReentrantMutexGuard<()>`); the
2237/// `PhantomData<*const ()>` marker is belt-and-suspenders.
2238///
2239/// ```compile_fail
2240/// use graphrefly_core::{BatchGuard, BindingBoundary, Core, DepBatch, FnId, FnResult, HandleId, NodeId};
2241/// use std::sync::Arc;
2242///
2243/// struct Stub;
2244/// impl BindingBoundary for Stub {
2245///     fn invoke_fn(&self, _: NodeId, _: FnId, _: &[DepBatch]) -> FnResult {
2246///         FnResult::Noop { tracked: None }
2247///     }
2248///     fn custom_equals(&self, _: FnId, _: HandleId, _: HandleId) -> bool { false }
2249///     fn release_handle(&self, _: HandleId) {}
2250/// }
2251/// fn requires_send<T: Send>(_: T) {}
2252/// let core = Core::new(Arc::new(Stub) as Arc<dyn BindingBoundary>);
2253/// let guard = core.begin_batch();
2254/// requires_send(guard); // <- compile_fail: BatchGuard is !Send.
2255/// ```
2256#[must_use = "BatchGuard drains the wave on drop; assign to a named binding"]
2257pub struct BatchGuard {
2258    core: Core,
2259    owns_tick: bool,
2260    /// Re-entrant mutex guard held for the wave's duration. Drop releases
2261    /// `wave_owner`, allowing cross-thread waves blocked at
2262    /// `Core::begin_batch` to proceed. Same-thread re-entry (nested
2263    /// `begin_batch` from a fn callback) re-acquires this mutex
2264    /// transparently.
2265    ///
2266    /// `ArcReentrantMutexGuard<()>` is `!Send`, so `BatchGuard` is `!Send`
2267    /// at the type level — sending across threads would violate
2268    /// `parking_lot::ReentrantMutex`'s thread-ownership invariant.
2269    _wave_guard: ArcReentrantMutexGuard<parking_lot::RawMutex, parking_lot::RawThreadId, ()>,
2270    _not_send: std::marker::PhantomData<*const ()>,
2271}
2272
2273impl Drop for BatchGuard {
2274    fn drop(&mut self) {
2275        if !self.owns_tick {
2276            return;
2277        }
2278        if std::thread::panicking() {
2279            // Discard pending wave work to avoid firing sinks during
2280            // unwind (sink panic during unwind would abort the process).
2281            //
2282            // Refcount discipline: pending_notify entries (or any
2283            // already-collected deferred_handle_releases from a partial
2284            // drain) hold a queue_notify-time retain on every payload
2285            // handle. Release them here so the discard doesn't leak.
2286            //
2287            // Wave-cache snapshots restore pre-panic cache values so the
2288            // atomicity guarantee covers state, not just observability.
2289            let (pending, deferred_releases, restored_releases) = {
2290                let mut s = self.core.lock_state();
2291                let pending = std::mem::take(&mut s.pending_notify);
2292                let _: DeferredJobs = std::mem::take(&mut s.deferred_flush_jobs);
2293                s.pending_fires.clear();
2294                let restored = self.core.restore_wave_cache_snapshots(&mut s);
2295                // clear_wave_state pushes batch-handle releases into
2296                // deferred_handle_releases, so take it AFTER the clear.
2297                s.clear_wave_state();
2298                let deferred_releases = std::mem::take(&mut s.deferred_handle_releases);
2299                // Slice E2 (D061): panic-discard wave drops queued
2300                // OnInvalidate cleanup hooks SILENTLY. Bindings using
2301                // OnInvalidate for external-resource cleanup MUST
2302                // idempotent-cleanup at process exit / next successful
2303                // invalidate. Mirrors A3 `pending_pause_overflow`
2304                // panic-discard precedent.
2305                let _: Vec<(crate::handle::NodeId, crate::boundary::CleanupTrigger)> =
2306                    std::mem::take(&mut s.deferred_cleanup_hooks);
2307                // Slice E2 /qa Q2(b) (D069): same panic-discard discipline
2308                // for the eager-wipe queue. A panic-discarded wave drops
2309                // queued `wipe_ctx` fires silently; the binding-side
2310                // `NodeCtxState` entry remains until the next successful
2311                // terminate-with-no-subs cycle (or until `Core` drops).
2312                // This mirrors D061's external-resource-cleanup gap and
2313                // is documented similarly.
2314                let _: Vec<crate::handle::NodeId> = std::mem::take(&mut s.pending_wipes);
2315                s.in_tick = false;
2316                (pending, deferred_releases, restored)
2317            };
2318            // Lock dropped — release retains lock-released so the binding
2319            // can't deadlock against an internal binding mutex.
2320            for entry in pending.values() {
2321                for msg in &entry.messages {
2322                    if let Some(h) = msg.payload_handle() {
2323                        self.core.binding.release_handle(h);
2324                    }
2325                }
2326            }
2327            for h in deferred_releases {
2328                self.core.binding.release_handle(h);
2329            }
2330            for h in restored_releases {
2331                self.core.binding.release_handle(h);
2332            }
2333            return;
2334        }
2335        // Successful drain — drain_and_flush manages its own locking.
2336        self.core.drain_and_flush();
2337        // Wave cleanup + extract deferred jobs under the lock.
2338        let (jobs, releases, cleanup_hooks, pending_wipes) = {
2339            let mut s = self.core.lock_state();
2340            s.clear_wave_state();
2341            s.in_tick = false;
2342            self.core.commit_wave_cache_snapshots(&mut s);
2343            // `drain_deferred` takes `deferred_flush_jobs` +
2344            // `deferred_handle_releases` (incl. rotation releases pushed
2345            // by `clear_wave_state` above) + Slice E2
2346            // `deferred_cleanup_hooks` + Slice E2 /qa Q2(b)
2347            // `pending_wipes`.
2348            Core::drain_deferred(&mut s)
2349        };
2350        // Lock dropped — fire deferred sinks + release retains + fire
2351        // cleanup hooks (Slice E2 OnInvalidate, D060 catch_unwind drain)
2352        // + fire eager wipes (D069).
2353        self.core
2354            .fire_deferred(jobs, releases, cleanup_hooks, pending_wipes);
2355    }
2356}