Skip to main content

graphrefly_core/
batch.rs

//! Wave engine — drain loop, fire selection, emission commit, sink dispatch.
//!
//! Ports the wave-engine portion of the handle-protocol prototype
//! (`~/src/graphrefly-ts/src/__experiments__/handle-core/core.ts`).
//! Sibling to [`super::node`]; the dispatcher's other concerns
//! (registration, subscription, pause/resume, terminal cascade,
//! `set_deps`) live there.
//!
//! # Wave engine entry points
//!
//! - [`Core::run_wave`] — wave entry. Claims `in_tick` under the state lock,
//!   runs `op` lock-released, then drains all transitive fn-fires and
//!   flushes per-subscriber notifications. Each fn-fire iteration drops
//!   the state lock around `BindingBoundary::invoke_fn` so user fn callbacks
//!   can re-enter Core safely.
//! - [`Core::drain_and_flush`] — drain phase + flush phase. Acquires/drops
//!   the state lock per iteration around `invoke_fn`.
//! - [`Core::commit_emission`] — equals-substitution + DIRTY/DATA/RESOLVED
//!   queueing + child propagation. `&self`-only; bracket-fires
//!   `BindingBoundary::custom_equals` lock-released.
//! - [`Core::queue_notify`] — per-subscriber message queueing with
//!   pause-buffer routing. Snapshots the subscriber list at first-touch-
//!   per-wave so late subscribers (installed mid-wave between drain
//!   iterations) don't receive duplicate deliveries from messages already
//!   queued before they subscribed.
//! - [`Core::deliver_data_to_consumer`] — single-edge propagation; marks
//!   the consumer for fn-fire if its tracked-deps set is satisfied.
//!   Called from `commit_emission`, plus `activate_derived` and
//!   `set_deps` in [`super::node`].
//!
//! # Re-entrance discipline (Slice A close — M1 fully lock-released)
//!
//! - **Wave-end sink fires** drop the state lock first (Slice A-bigger
//!   discipline).
//! - **`BindingBoundary::invoke_fn`** in `fire_fn` fires lock-released —
//!   user fn callbacks may re-enter `Core::emit` / `pause` / `resume` /
//!   `invalidate` / `complete` / `error` / `teardown` and run a nested
//!   wave (the existing `s.in_tick` re-entrance gate composes
//!   transparently).
//! - **`BindingBoundary::custom_equals`** in `commit_emission`'s equals
//!   check fires lock-released.
//! - **Subscribe-time handshake** is the one remaining lock-held callback.
//!   It now fires per-tier (`[Start]`, `[Data(v)]`, `[Complete|Error]`,
//!   `[Teardown]`) as separate sink calls, matching the canonical R1.3.5.a
//!   tier-split. Re-entrance from a handshake sink callback panics with
//!   the [`reentrance_guard`] diagnostic.
47
48use std::sync::Arc;
49
50use indexmap::map::Entry;
51
52use smallvec::SmallVec;
53
54use crate::boundary::{DepBatch, FnEmission, FnResult};
55use crate::handle::{HandleId, NodeId, NO_HANDLE};
56use crate::message::Message;
57use crate::node::{Core, CoreState, EqualsMode, OperatorOp, Sink, TerminalKind};
58
59/// Deferred sink-fire jobs collected during `flush_notifications`. Each
60/// entry pairs a snapshot of the sink Arcs to fire with the messages to
61/// deliver to them — one entry per (node × phase) cell with non-empty
62/// content. Drained from `CoreState` and fired lock-released.
63pub(crate) type DeferredJobs = Vec<(Vec<Sink>, Vec<Message>)>;
64
65/// Lock-released drain payload of the wave's BatchGuard:
66/// `(sink_jobs, handle_releases, OnInvalidate cleanup hooks, pending wipe_ctx fires)`.
67/// Returned by [`Core::drain_deferred`], consumed by [`Core::fire_deferred`].
68/// Sliced into a type alias to satisfy `clippy::type_complexity`.
69pub(crate) type WaveDeferred = (
70    DeferredJobs,
71    Vec<HandleId>,
72    Vec<(crate::handle::NodeId, crate::boundary::CleanupTrigger)>,
73    Vec<crate::handle::NodeId>,
74);
75
76/// One subscriber-snapshot epoch within a node's wave-end notification
77/// queue. A `PendingBatch` is opened the first time `queue_notify` runs
78/// for the node in a wave, and a fresh batch is opened whenever the node's
79/// `subscribers_revision` advances mid-wave (a new sink subscribes, an
80/// existing sink unsubscribes, or a handshake-time panic evicts an
81/// orphaned sink). All messages within one batch flush to the same sink
82/// list — the snapshot taken when the batch opened, frozen against
83/// subsequent revision bumps.
84pub(crate) struct PendingBatch {
85    /// `NodeRecord::subscribers_revision` value at the moment this batch
86    /// opened. Used by `queue_notify` to decide append-to-last-batch vs
87    /// open-fresh-batch on every push.
88    pub(crate) snapshot_revision: u64,
89    pub(crate) sinks: Vec<Sink>,
90    pub(crate) messages: Vec<Message>,
91}
92
93/// Per-node wave-end notification queue, structured as one or more
94/// subscriber-snapshot epochs (`batches`). The common case (no
95/// mid-wave subscribe / unsubscribe at this node) keeps a single
96/// inline batch — `SmallVec<[_; 1]>` keeps that allocation-free.
97///
98/// **Slice X4 / D2 (2026-05-08):** the prior shape was a single
99/// `(sinks, messages)` pair per node — the snapshot froze on first
100/// `queue_notify` and was reused for every subsequent emit to the same
101/// node in the wave. That caused the documented late-subscriber +
102/// multi-emit-per-wave gap (R1.3.5.a divergence): a sub installed
103/// between two emits to the same node was invisible to the second
104/// emit's flush slice. The revision-tracked batch list resolves it —
105/// late subs land in a fresh batch that frozenly carries them, while
106/// pre-subscribe batches retain their original snapshot so the new
107/// sub doesn't double-receive earlier emits via flush AND handshake.
108pub(crate) struct PendingPerNode {
109    pub(crate) batches: SmallVec<[PendingBatch; 1]>,
110}
111
112impl PendingPerNode {
113    /// Iterate every queued message for this node across all batches in
114    /// arrival order. Used by R1.3.3.a invariant assertions and the
115    /// auto-resolve / Slice-G coalescing tier-3-presence checks, which
116    /// reason about wave-content per node, not per batch.
117    pub(crate) fn iter_messages(&self) -> impl Iterator<Item = &Message> + '_ {
118        self.batches.iter().flat_map(|b| b.messages.iter())
119    }
120
121    /// Mutable counterpart for `iter_messages`. Used by
122    /// `rewrite_prior_resolved_to_data` to in-place rewrite Resolved
123    /// entries to Data when a wave detects a multi-emit case after the
124    /// fact.
125    pub(crate) fn iter_messages_mut(&mut self) -> impl Iterator<Item = &mut Message> + '_ {
126        self.batches.iter_mut().flat_map(|b| b.messages.iter_mut())
127    }
128}
129
130/// RAII helper for the A6 reentrancy guard (Slice F, 2026-05-07).
131///
132/// Pushes `node_id` onto [`CoreState::currently_firing`] on construction,
133/// pops it on Drop. [`Core::set_deps`] consults the stack and rejects
134/// `set_deps(N, ...)` from inside N's own fn-fire with
135/// [`crate::node::SetDepsError::ReentrantOnFiringNode`] — closing the
136/// D1 hazard where Phase-1's snapshot of `dep_handles` would refer to
137/// a different dep ordering than Phase-3's `tracked` storage.
138///
139/// Wraps the lock-released `invoke_fn` (and operator-equivalent FFI
140/// callbacks like `project_each` / `predicate_each`). Drop fires even
141/// on panic, so the stack stays balanced under user-fn unwinds.
142///
143/// Membership semantics (NOT strict LIFO): the only consumer of
144/// `currently_firing` is `Core::set_deps`'s reentrancy check, which uses
145/// `contains(&n)` — a set-membership test. Drop pops the right-most
146/// matching `node_id` via `rposition` + `swap_remove`. For a stack like
147/// `[A, B, A]` (A's fn re-enters B, B's fn re-enters A), B's drop pops
148/// the SECOND A (index 1) via swap_remove, leaving `[A, A]` — the
149/// physical order of the remaining As may not match construction order,
150/// but membership is preserved. If a future call site needs strict LIFO
151/// (e.g. "pop the most recently fired node"), switch to `pop()` + assert
152/// the popped value equals `self.node_id`. (QA A6, 2026-05-07)
153pub(crate) struct FiringGuard {
154    core: Core,
155    node_id: NodeId,
156}
157
158impl FiringGuard {
159    pub(crate) fn new(core: &Core, node_id: NodeId) -> Self {
160        core.lock_state().currently_firing.push(node_id);
161        Self {
162            core: core.clone(),
163            node_id,
164        }
165    }
166}
167
168impl Drop for FiringGuard {
169    fn drop(&mut self) {
170        let mut s = self.core.lock_state();
171        if let Some(pos) = s.currently_firing.iter().rposition(|n| *n == self.node_id) {
172            s.currently_firing.swap_remove(pos);
173        }
174        // else: already popped by an external rebalance — silent no-op
175        // for Drop discipline (panic-in-Drop is poison).
176    }
177}
178
179/// Borrow the per-operator scratch slot as `&T`. Panics if the slot is
180/// uninitialized or the contained type doesn't match `T` — both are
181/// invariant violations for any `fire_op_*` helper that should only be
182/// called from `fire_operator`'s match arm for the matching variant.
183fn scratch_ref<T: crate::op_state::OperatorScratch>(s: &CoreState, node_id: NodeId) -> &T {
184    s.require_node(node_id)
185        .op_scratch
186        .as_ref()
187        .expect("op_scratch slot uninitialized for operator node")
188        .as_any_ref()
189        .downcast_ref::<T>()
190        .expect("op_scratch type mismatch")
191}
192
193/// Mutable borrow of the per-operator scratch slot. Same invariants as
194/// [`scratch_ref`].
195fn scratch_mut<T: crate::op_state::OperatorScratch>(s: &mut CoreState, node_id: NodeId) -> &mut T {
196    s.require_node_mut(node_id)
197        .op_scratch
198        .as_mut()
199        .expect("op_scratch slot uninitialized for operator node")
200        .as_any_mut()
201        .downcast_mut::<T>()
202        .expect("op_scratch type mismatch")
203}
204
205impl Core {
206    // -------------------------------------------------------------------
207    // Wave entry + drain
208    // -------------------------------------------------------------------
209
210    /// Wave entry. The caller passes a closure that performs the wave's
211    /// triggering operation (`commit_emission`, `terminate_node`, etc.).
212    /// The closure runs lock-released; closure-internal Core methods
213    /// acquire the state lock as they go.
214    ///
215    /// **Implementation:** delegates to [`Self::begin_batch`] for the
216    /// wave's RAII lifecycle. The returned `BatchGuard` holds the
217    /// `wave_owner` re-entrant mutex for the wave's duration (cross-thread
218    /// emits block; same-thread re-entry passes through), claims `in_tick`,
219    /// and on drop runs the drain + flush + sink-fire phases — OR, if the
220    /// closure panicked, the panic-discard path that restores cache
221    /// snapshots and clears in_tick. This unification gives `run_wave` the
222    /// same panic-safety guarantee as the user-facing `Core::batch`.
223    ///
224    /// **Re-entrance:** a closure invoked from inside another wave — the
225    /// inner `run_wave`'s `begin_batch` observes `in_tick=true`, the
226    /// returned guard is non-owning (`owns_tick=false`), drop is a no-op.
227    /// The outer wave's drain picks up the inner closure's queued work.
228    ///
229    /// **Lock-release discipline (Slice A close, M1):** all binding-side
230    /// callbacks except the subscribe-time handshake fire lock-released.
231    /// Sinks that re-enter Core run a nested wave; user fns that re-enter
232    /// Core run a nested wave; custom-equals oracles that re-enter Core
233    /// run a nested wave. Cross-thread emits block at `wave_owner` until
234    /// the in-flight wave's drain completes — preserving the user-facing
235    /// "emit returning means subscribers have observed" contract.
236    /// Wave entry with a known `seed` node. Acquires only the partitions
237    /// transitively touched from `seed` (downstream cascade via
238    /// `s.children` + R1.3.9.d meta-companion cascade) instead of every
239    /// current partition. The canonical Y1 parallelism win for per-seed
240    /// entry points (`Core::emit`, `Core::subscribe`'s activation,
241    /// `Core::pause` / `Core::resume` / `Core::invalidate` / `Core::complete`
242    /// / `Core::error` / `Core::teardown` / `Core::set_deps`'s
243    /// push-on-subscribe).
244    ///
245    /// Two threads with disjoint touched-partition sets run truly
246    /// parallel — they don't block each other on Core-global locks.
247    /// Same-thread re-entry passes through each partition's
248    /// `ReentrantMutex` transparently. Cross-thread emits on the SAME
249    /// partition (or any overlapping touched-partition set) serialize
250    /// per the per-partition `wave_owner` mutex, preserving the
251    /// "emit returning means subscribers have observed" contract.
252    ///
253    /// Slice Y1 / Phase E (2026-05-08).
254    pub(crate) fn run_wave_for<F>(&self, seed: crate::handle::NodeId, op: F)
255    where
256        F: FnOnce(&Self),
257    {
258        let _guard = self.begin_batch_for(seed);
259        op(self);
260    }
261
262    /// Release retains held by `wave_cache_snapshots` and clear the map.
263    /// Called from the wave-success paths in `run_wave` and `BatchGuard`.
264    pub(crate) fn commit_wave_cache_snapshots(&self, s: &mut CoreState) {
265        if s.wave_cache_snapshots.is_empty() {
266            return;
267        }
268        let snapshots = std::mem::take(&mut s.wave_cache_snapshots);
269        for (_, old_handle) in snapshots {
270            self.binding.release_handle(old_handle);
271        }
272    }
273
274    /// Restore cache slots from `wave_cache_snapshots` and clear the map.
275    /// Called from the wave-abort path in `BatchGuard::drop` (panic).
276    ///
277    /// For each snapshotted node:
278    ///
279    /// 1. Read the current cache (the in-flight new value).
280    /// 2. Set `cache = old_handle` (the snapshot's retained value).
281    /// 3. Release the now-unowned current cache handle.
282    ///
283    /// Returns the list of "current" handles to release outside the lock.
284    pub(crate) fn restore_wave_cache_snapshots(&self, s: &mut CoreState) -> Vec<HandleId> {
285        if s.wave_cache_snapshots.is_empty() {
286            return Vec::new();
287        }
288        let snapshots = std::mem::take(&mut s.wave_cache_snapshots);
289        let mut releases = Vec::with_capacity(snapshots.len());
290        for (node_id, old_handle) in snapshots {
291            let Some(rec) = s.nodes.get_mut(&node_id) else {
292                releases.push(old_handle);
293                continue;
294            };
295            let current = std::mem::replace(&mut rec.cache, old_handle);
296            if current != NO_HANDLE {
297                releases.push(current);
298            }
299        }
300        releases
301    }
302
303    /// Drain pending fires until quiescent, then flush wave-end notifications
304    /// to subscribers. Each fire iteration drops the state lock around the
305    /// binding's `invoke_fn` callback so user fns may re-enter Core safely.
306    ///
307    /// `&self`-only — manages its own locking. Called from [`Self::run_wave`]
308    /// and [`super::node::Core::activate_derived`] (via `run_wave`).
309    pub(crate) fn drain_and_flush(&self) {
310        let mut guard = 0u32;
311        loop {
312            // R1.3.8.c (Slice F, A3): if no fires are pending but there are
313            // queued pause-overflow ERRORs, synthesize them now. The
314            // resulting ERROR cascade may add to pending_fires (children
315            // settling their terminal state), so we loop back to drain.
316            let synth_pending = {
317                let mut s = self.lock_state();
318                if s.pending_fires.is_empty() && !s.pending_pause_overflow.is_empty() {
319                    std::mem::take(&mut s.pending_pause_overflow)
320                } else {
321                    Vec::new()
322                }
323            };
324            for entry in synth_pending {
325                // Lock-released call to the binding hook. Default impl
326                // returns None — the binding has opted out of R1.3.8.c
327                // and we fall back to silent-drop + ResumeReport.dropped.
328                let handle = self.binding.synthesize_pause_overflow_error(
329                    entry.node_id,
330                    entry.dropped_count,
331                    entry.configured_max,
332                    entry.lock_held_ns / 1_000_000,
333                );
334                if let Some(h) = handle {
335                    // Re-enter Core::error to terminate the node and
336                    // cascade. We're inside a wave (`in_tick = true`),
337                    // so error() gets a non-owning batch guard — it
338                    // doesn't try to start its own drain. The cascade
339                    // queues into our outer drain via pending_fires
340                    // and pending_notify.
341                    self.error(entry.node_id, h);
342                }
343            }
344
345            // Pick next fire under a short lock. Also re-read the configured
346            // drain cap so callers can tune via `Core::set_max_batch_drain_iterations`
347            // without restarting waves mid-flight.
348            let (next, cap, pending_size) = {
349                let s = self.lock_state();
350                if s.pending_fires.is_empty() {
351                    break;
352                }
353                let cap = s.max_batch_drain_iterations;
354                let pending_size = s.pending_fires.len();
355                let next = self.pick_next_fire(&s);
356                (next, cap, pending_size)
357            };
358            guard += 1;
359            assert!(
360                guard < cap,
361                "wave drain exceeded {cap} iterations \
362                 (pending_fires={pending_size}). Most likely cause: a runtime \
363                 cycle introduced by an operator that re-arms its own pending_fires \
364                 slot from inside `invoke_fn` (e.g. a producer that subscribes to \
365                 itself, or a fn that calls Core::emit on a node whose fn fires \
366                 the original node again). Structural cycles via set_deps are \
367                 rejected at edge-mutation time. Tune via Core::set_max_batch_drain_iterations \
368                 only with concrete evidence the workload needs more iterations."
369            );
370            let Some(next) = next else { break };
371            // fire_fn manages its own locking around invoke_fn.
372            self.fire_fn(next);
373        }
374        // Auto-resolve sweep: nodes registered in pending_auto_resolve
375        // by the RESOLVED child propagation need a Resolved if they didn't
376        // fire and settle via their own commit_emission. Check pending_notify
377        // for each candidate — if it has Dirty but no tier-3+ message, the
378        // node never settled and needs auto-Resolved. Route through
379        // queue_notify so paused nodes get the Resolved into their pause
380        // buffer.
381        let mut s = self.lock_state();
382        let candidates = std::mem::take(&mut s.pending_auto_resolve);
383        for node_id in candidates {
384            let needs_resolve = s
385                .pending_notify
386                .get(&node_id)
387                .is_some_and(|entry| !entry.iter_messages().any(|m| m.tier() >= 3));
388            if needs_resolve {
389                self.queue_notify(&mut s, node_id, Message::Resolved);
390            }
391        }
392        // Final flush phase — populates deferred_flush_jobs
393        // from pending_notify (already carries per-node sink snapshots).
394        self.flush_notifications(&mut s);
395    }
396
397    /// Pick a node whose **transitive** upstream is fully settled.
398    ///
399    /// A node `id` is ready to fire iff none of its transitive ancestors
400    /// (via the `deps` chain) is currently in `pending_fires`. Diamond
401    /// glitch prevention requires transitive (not immediate-only) reasoning
402    /// — for graph `A → {B, C, E}; D = combine(B, C); F = combine(D, E)`,
403    /// a wave that fires `E` first adds `F` to `pending_fires` before `D`
404    /// is added. An immediate-only readiness check would mistakenly pick
405    /// `F` because neither `D` nor `E` are in `pending_fires` at that
406    /// moment, firing `F` against the stale activation-time `D` cache and
407    /// again later when `D` actually settles. Transitive upstream walk
408    /// catches `B`/`C` pending and correctly defers `F`.
409    ///
410    /// Cost: O(V) per candidate; worst case O(N·V) per pick. The existing
411    /// porting-deferred entry on `pick_next_fire` perf flagged this as a
412    /// future per-node `unresolved_dep_count` refactor.
413    fn pick_next_fire(&self, s: &CoreState) -> Option<NodeId> {
414        for &id in &s.pending_fires {
415            if Self::transitive_upstream_settled(s, id) {
416                return Some(id);
417            }
418        }
419        // Cycle / no eligible candidate (every node has an upstream pending,
420        // possibly via a cycle path): pick any so the drain guard advances.
421        // The drain-iteration cap will catch genuine cycles.
422        s.pending_fires.iter().copied().next()
423    }
424
425    fn transitive_upstream_settled(s: &CoreState, node_id: NodeId) -> bool {
426        let rec = s.require_node(node_id);
427        if rec.dep_count() == 0 {
428            return true;
429        }
430        let mut visited: ahash::AHashSet<NodeId> = ahash::AHashSet::new();
431        let mut stack: Vec<NodeId> = rec.dep_ids_vec();
432        while let Some(id) = stack.pop() {
433            if !visited.insert(id) {
434                continue;
435            }
436            if s.pending_fires.contains(&id) {
437                return false;
438            }
439            if let Some(r) = s.nodes.get(&id) {
440                for dep_id in r.dep_ids() {
441                    if !visited.contains(&dep_id) {
442                        stack.push(dep_id);
443                    }
444                }
445            }
446        }
447        true
448    }
449
450    /// Wave drain entry point. Dispatches via `rec.op` to either the
451    /// regular fn-fire path ([`Self::fire_regular`]) or the operator
452    /// dispatch ([`Self::fire_operator`]).
453    pub(crate) fn fire_fn(&self, node_id: NodeId) {
454        let op = {
455            let s = self.lock_state();
456            s.nodes.get(&node_id).and_then(|r| r.op)
457        };
458        match op {
459            Some(operator_op) => self.fire_operator(node_id, operator_op),
460            None => {
461                // State / Derived / Dynamic / Producer all dispatch via fn_id.
462                self.fire_regular(node_id);
463            }
464        }
465    }
466
467    /// Fire a node's fn lock-released around `invoke_fn`.
468    ///
469    /// Phase 1 (lock-held): remove from pending_fires, snapshot fn_id +
470    /// dep_records → DepBatch + kind. Skip if terminal, first-run-gate-closed,
471    /// or stateless.
472    ///
473    /// Phase 2 (lock-released): call `BindingBoundary::invoke_fn`. User fn
474    /// callbacks may re-enter Core (`emit`, `pause`, etc.) and run a nested
475    /// wave — the in_tick gate composes naturally because nested calls
476    /// observe `in_tick = true` and skip their own drain.
477    ///
478    /// Phase 3 (lock-held): mark `has_fired_once`, store dynamic-tracked,
479    /// decide between Noop+RESOLVED, single Data, or Batch.
480    ///
481    /// Phase 4: commit emissions. Single Data goes through
482    /// `commit_emission` (with equals substitution). Batch emissions are
483    /// processed in sequence — Data via `commit_emission_verbatim` (no
484    /// equals substitution per R1.3.2.d / R1.3.3.c), Complete/Error via
485    /// terminal cascade.
486    #[allow(clippy::too_many_lines)] // Slice G added Noop / Batch tier-3 guards
487    fn fire_regular(&self, node_id: NodeId) {
488        enum FireAction {
489            None,
490            SingleData(HandleId),
491            Batch(SmallVec<[FnEmission; 2]>),
492        }
493
494        // Phase 1: snapshot inputs — build DepBatch per dep from dep_records.
495        // `has_fired_once` is captured here for the Slice E2 OnRerun gate
496        // (Phase 1.5 below): the cleanup hook only fires when the fn has
497        // run at least once already in this activation cycle.
498        let prep: Option<(crate::handle::FnId, Vec<DepBatch>, bool, bool)> = {
499            let mut s = self.lock_state();
500            s.pending_fires.remove(&node_id);
501            let rec = s.require_node(node_id);
502            // Skip: terminal, first-run-gate-closed (R2.5.3 / R5.4 — partial
503            // mode opts out of the gate per D011), or stateless.
504            if rec.terminal.is_some() || (!rec.partial && rec.has_sentinel_deps()) {
505                None
506            } else {
507                rec.fn_id.map(|fn_id| {
508                    let dep_batches: Vec<DepBatch> = rec
509                        .dep_records
510                        .iter()
511                        .map(|dr| DepBatch {
512                            data: dr.data_batch.clone(),
513                            prev_data: dr.prev_data,
514                            involved: dr.involved_this_wave,
515                        })
516                        .collect();
517                    (fn_id, dep_batches, rec.is_dynamic, rec.has_fired_once)
518                })
519            }
520        };
521        let Some((fn_id, dep_batches, is_dynamic, has_fired_once)) = prep else {
522            return;
523        };
524
525        // Phase 1.5 (Slice E2 — R2.4.5 OnRerun, lock-released per D045): if
526        // the fn has fired at least once in this activation cycle, fire its
527        // OnRerun cleanup hook BEFORE the next invoke_fn re-allocates fn-
528        // local resources. First-fire is intentionally skipped — there is
529        // no prior run to clean up. Fires OUTSIDE `FiringGuard` because
530        // cleanup re-entrance is not the A6 reentrancy concern (which
531        // protects against `set_deps(self, ...)` from inside the in-flight
532        // invoke_fn). Operator nodes never reach this path (`fire_regular`
533        // is the fn-id branch of `fire_fn`; operators dispatch via
534        // `fire_operator`), so cleanup hooks correctly only fire for fn-
535        // shaped nodes (state / derived / dynamic / producer).
536        if has_fired_once {
537            self.binding
538                .cleanup_for(node_id, crate::boundary::CleanupTrigger::OnRerun);
539        }
540
541        // Phase 2: invoke fn lock-released. A6 reentrancy guard is scoped to
542        // the FFI call only — Phase 3's lock-held state mutation is not part
543        // of "currently firing" because set_deps would already block on the
544        // state lock by then. Drop on the guard pops the stack even if
545        // invoke_fn panics, keeping `currently_firing` balanced.
546        let result = {
547            let _firing = FiringGuard::new(self, node_id);
548            self.binding.invoke_fn(node_id, fn_id, &dep_batches)
549        };
550
551        // Phase 3: apply result under the lock — defensive terminal check
552        // (a sibling cascade may have terminated this node during phase 2).
553        let action: FireAction = {
554            let mut s = self.lock_state();
555            // Defensive: node may have terminated mid-phase-2 via a sibling
556            // cascade (a fn that re-entered `Core::error` on a path that
557            // cascaded here). If so, release any payload handles and no-op.
558            if s.require_node(node_id).terminal.is_some() {
559                match &result {
560                    FnResult::Data { handle, .. } => {
561                        self.binding.release_handle(*handle);
562                    }
563                    FnResult::Batch { emissions, .. } => {
564                        for em in emissions {
565                            match em {
566                                FnEmission::Data(h) | FnEmission::Error(h) => {
567                                    self.binding.release_handle(*h);
568                                }
569                                FnEmission::Complete => {}
570                            }
571                        }
572                    }
573                    FnResult::Noop { .. } => {}
574                }
575                return;
576            }
577            let rec = s.require_node_mut(node_id);
578            rec.has_fired_once = true;
579            if is_dynamic {
580                let tracked = match &result {
581                    FnResult::Data { tracked, .. }
582                    | FnResult::Noop { tracked }
583                    | FnResult::Batch { tracked, .. } => tracked.clone(),
584                };
585                if let Some(t) = tracked {
586                    rec.tracked = t.into_iter().collect();
587                }
588            }
589            match result {
590                FnResult::Noop { .. } => {
591                    // Slice G: skip Resolved if a prior emission in the same
592                    // wave already queued tier-3 (would violate R1.3.3.a).
593                    let already_dirty = s.require_node(node_id).dirty;
594                    let already_tier3 = s
595                        .pending_notify
596                        .get(&node_id)
597                        .is_some_and(|entry| entry.iter_messages().any(|m| m.tier() == 3));
598                    if already_dirty && !already_tier3 {
599                        self.queue_notify(&mut s, node_id, Message::Resolved);
600                    }
601                    FireAction::None
602                }
603                FnResult::Data { handle, .. } => FireAction::SingleData(handle),
604                FnResult::Batch { emissions, .. } if emissions.is_empty() => {
605                    // Empty Batch is equivalent to Noop — settle with
606                    // RESOLVED if the node was dirty (R1.3.1.a). Slice G:
607                    // skip if a prior emission already queued tier-3.
608                    let already_dirty = s.require_node(node_id).dirty;
609                    let already_tier3 = s
610                        .pending_notify
611                        .get(&node_id)
612                        .is_some_and(|entry| entry.iter_messages().any(|m| m.tier() == 3));
613                    if already_dirty && !already_tier3 {
614                        self.queue_notify(&mut s, node_id, Message::Resolved);
615                    }
616                    FireAction::None
617                }
618                FnResult::Batch { emissions, .. } => FireAction::Batch(emissions),
619            }
620        };
621
622        // Phase 4: commit emissions.
623        match action {
624            FireAction::None => {}
625            // Single Data — equals substitution applies (R1.3.2).
626            FireAction::SingleData(handle) => {
627                self.commit_emission(node_id, handle);
628            }
629            // Batch — process in sequence. No equals substitution
630            // (R1.3.2.d / R1.3.3.c: multi-message waves pass verbatim).
631            FireAction::Batch(emissions) => {
632                self.commit_batch(node_id, emissions);
633            }
634        }
635    }
636
637    /// Process a `FnResult::Batch` emissions sequence. Each `Data` goes
638    /// through `commit_emission_verbatim` (no equals substitution per
639    /// R1.3.2.d / R1.3.3.c). Terminal emissions (`Complete` / `Error`)
640    /// cascade per R1.3.4; processing stops at the first terminal and
641    /// remaining handles are released (R1.3.4.a: no further messages
642    /// after terminal).
643    fn commit_batch(&self, node_id: NodeId, emissions: SmallVec<[FnEmission; 2]>) {
644        let mut iter = emissions.into_iter();
645        for em in iter.by_ref() {
646            match em {
647                FnEmission::Data(handle) => {
648                    self.commit_emission_verbatim(node_id, handle);
649                }
650                FnEmission::Complete => {
651                    self.complete(node_id);
652                    break;
653                }
654                FnEmission::Error(handle) => {
655                    self.error(node_id, handle);
656                    break;
657                }
658            }
659        }
660        // Release handles from any emissions after the terminal break.
661        for em in iter {
662            match em {
663                FnEmission::Data(h) | FnEmission::Error(h) => {
664                    self.binding.release_handle(h);
665                }
666                FnEmission::Complete => {}
667            }
668        }
669    }
670
671    // -------------------------------------------------------------------
672    // Emission commit — equals-substitution lives here
673    // -------------------------------------------------------------------
674
675    /// Apply a node's emission. `&self`-only; brackets the equals check
676    /// around a lock release so `BindingBoundary::custom_equals` can re-enter
677    /// Core safely.
678    ///
679    /// Phase 1 (lock-held): defensive terminal short-circuit; snapshot
680    /// equals_mode + old cache handle.
681    ///
682    /// Phase 2 (lock-released): call `handles_equal` — `EqualsMode::Identity`
683    /// is a pure `u64` compare with no boundary call; `EqualsMode::Custom`
684    /// crosses to the binding's `custom_equals` oracle, which may re-enter
685    /// Core.
686    ///
687    /// Phase 3 (lock-held): set cache, queue Dirty + Data/Resolved into
688    /// pending_notify (which snapshots subscribers on first touch),
689    /// propagate to children.
690    pub(crate) fn commit_emission(&self, node_id: NodeId, new_handle: HandleId) {
691        assert!(
692            new_handle != NO_HANDLE,
693            "NO_HANDLE is not a valid DATA payload (R1.2.4) for node {node_id:?}",
694        );
695
696        // Phase 1: terminal short-circuit + snapshot equals/cache.
697        let snapshot = {
698            let s = self.lock_state();
699            let rec = s.require_node(node_id);
700            if rec.terminal.is_some() {
701                drop(s);
702                self.binding.release_handle(new_handle);
703                return;
704            }
705            (rec.cache, rec.equals)
706        };
707        let (old_handle, equals_mode) = snapshot;
708
709        // Slice G (2026-05-07): R1.3.2.d says equals substitution only
710        // fires for SINGLE-DATA waves at one node. Detect "this is a
711        // subsequent emit in the same wave at this node" via the
712        // wave-scoped `tier3_emitted_this_wave` set. If set → multi-emit
713        // wave: skip equals, queue Data verbatim, retroactively rewrite
714        // any prior Resolved (queued by an earlier same-value emit's
715        // equals match) to Data using the wave-start cache snapshot.
716        // Outside batch / first emit: standard per-emit equals path.
717        let is_subsequent_emit_in_wave = {
718            let s = self.lock_state();
719            s.tier3_emitted_this_wave.contains(&node_id)
720        };
721
722        if is_subsequent_emit_in_wave {
723            // Multi-emit wave detected. Skip equals, queue Data verbatim.
724            // Also rewrite any prior Resolved entries to Data using the
725            // wave-start cache snapshot.
726            self.rewrite_prior_resolved_to_data(node_id);
727            self.commit_emission_verbatim(node_id, new_handle);
728            return;
729        }
730
731        // Phase 2: equals check (lock-released for Custom).
732        let is_data = !self.handles_equal_lock_released(equals_mode, old_handle, new_handle);
733
734        // Phase 3: apply emission under the lock. Defensive terminal
735        // re-check — a concurrent cascade between phase 2 and phase 3
736        // could have terminated the node.
737        let mut s = self.lock_state();
738        if s.require_node(node_id).terminal.is_some() {
739            drop(s);
740            self.binding.release_handle(new_handle);
741            return;
742        }
743
744        // R1.3.1.a condition (b): synthesize DIRTY only if node not already
745        // dirty from an earlier emission in the same wave.
746        let already_dirty = s.require_node(node_id).dirty;
747        s.require_node_mut(node_id).dirty = true;
748        if !already_dirty {
749            self.queue_notify(&mut s, node_id, Message::Dirty);
750        }
751
752        if is_data {
753            // P3 (Slice A close /qa): re-read CURRENT cache. Same-thread
754            // re-entry from a `custom_equals` oracle that called back into
755            // `Core::emit` on this same node during phase 2's lock-released
756            // equals check could have advanced the cache between phase 1's
757            // snapshot (`old_handle`) and this point.
758            let current_cache = s.require_node(node_id).cache;
759            let snapshot_taken = if s.in_tick && current_cache != NO_HANDLE {
760                use std::collections::hash_map::Entry;
761                match s.wave_cache_snapshots.entry(node_id) {
762                    Entry::Vacant(slot) => {
763                        slot.insert(current_cache);
764                        true
765                    }
766                    Entry::Occupied(_) => false,
767                }
768            } else {
769                false
770            };
771            s.require_node_mut(node_id).cache = new_handle;
772            if current_cache != NO_HANDLE && !snapshot_taken {
773                self.binding.release_handle(current_cache);
774            }
775            // Slice E1 (R2.6.5 / Lock 6.G): push DATA into the replay
776            // buffer if the node opted in. RESOLVED entries are NOT
777            // buffered (canonical "DATA only").
778            self.push_replay_buffer(&mut s, node_id, new_handle);
779            // Slice G: mark this node as having emitted tier-3 in this wave.
780            s.tier3_emitted_this_wave.insert(node_id);
781            self.queue_notify(&mut s, node_id, Message::Data(new_handle));
782            // Propagate to children
783            let child_ids: Vec<NodeId> = s
784                .children
785                .get(&node_id)
786                .map(|c| c.iter().copied().collect())
787                .unwrap_or_default();
788            for child_id in child_ids {
789                let dep_idx = s.require_node(child_id).dep_index_of(node_id);
790                if let Some(idx) = dep_idx {
791                    self.deliver_data_to_consumer(&mut s, child_id, idx, new_handle);
792                }
793            }
794        } else {
795            // RESOLVED: handle unchanged. Don't release; old still in use.
796            // Slice G: snapshot cache so a subsequent same-wave emit can
797            // rewrite this Resolved to Data using the snapshot.
798            let current_cache = s.require_node(node_id).cache;
799            if s.in_tick && current_cache != NO_HANDLE {
800                use std::collections::hash_map::Entry;
801                if let Entry::Vacant(slot) = s.wave_cache_snapshots.entry(node_id) {
802                    self.binding.retain_handle(current_cache);
803                    slot.insert(current_cache);
804                }
805            }
806            // Slice G: mark this node as having emitted tier-3 in this wave.
807            s.tier3_emitted_this_wave.insert(node_id);
808            self.queue_notify(&mut s, node_id, Message::Resolved);
809            let child_ids: Vec<NodeId> = s
810                .children
811                .get(&node_id)
812                .map(|c| c.iter().copied().collect())
813                .unwrap_or_default();
814            for child_id in child_ids {
815                let already_involved = s.require_node(child_id).involved_this_wave;
816                if !already_involved {
817                    {
818                        let child = s.require_node_mut(child_id);
819                        child.involved_this_wave = true;
820                        child.dirty = true;
821                    }
822                    self.queue_notify(&mut s, child_id, Message::Dirty);
823                    s.pending_auto_resolve.insert(child_id);
824                }
825            }
826        }
827    }
828
829    /// Slice G: when a multi-emit wave is detected at `node_id` (a second
830    /// emit arrives while a prior tier-3 message is still pending), rewrite
831    /// any `Resolved` entries from earlier emits to `Data(snapshot_cache)`
832    /// so the wave conforms to R1.3.3.a (≥1 DATA OR exactly 1 RESOLVED).
833    /// Touches both `pending_notify` (immediate-flush path) and the per-node
834    /// pause buffer (paused path).
835    fn rewrite_prior_resolved_to_data(&self, node_id: NodeId) {
836        let mut s = self.lock_state();
837        let snapshot = match s.wave_cache_snapshots.get(&node_id).copied() {
838            Some(h) if h != NO_HANDLE => h,
839            // No snapshot available — the prior Resolved was queued without
840            // a cache (sentinel pre-emit). Nothing to rewrite to; the
841            // multi-emit case from sentinel is fine (verbatim Data path).
842            _ => return,
843        };
844        let mut retains_needed = 0u32;
845        // Pending_notify path. Walk all batches' messages — Slice-G
846        // coalescing reasons about wave-content per node, not per-batch.
847        if let Some(entry) = s.pending_notify.get_mut(&node_id) {
848            for msg in entry.iter_messages_mut() {
849                if matches!(msg, Message::Resolved) {
850                    *msg = Message::Data(snapshot);
851                    retains_needed += 1;
852                }
853            }
854        }
855        // Pause-buffer path.
856        if let Some(rec) = s.nodes.get_mut(&node_id) {
857            if let crate::node::PauseState::Paused { buffer, .. } = &mut rec.pause_state {
858                for msg in &mut *buffer {
859                    if matches!(msg, Message::Resolved) {
860                        *msg = Message::Data(snapshot);
861                        retains_needed += 1;
862                    }
863                }
864            }
865        }
866        drop(s);
867        // Each rewritten Resolved → Data adds a payload retain that
868        // queue_notify would otherwise have taken at emit time. The
869        // snapshot already owns one retain (taken when cache was
870        // snapshotted); we need one fresh retain per rewrite.
871        for _ in 0..retains_needed {
872            self.binding.retain_handle(snapshot);
873        }
874    }
875
876    /// Equals check that crosses the binding boundary lock-released for
877    /// `EqualsMode::Custom`. Caller must NOT hold the state lock.
878    fn handles_equal_lock_released(&self, mode: EqualsMode, a: HandleId, b: HandleId) -> bool {
879        if a == b {
880            return true; // identity-on-handles always sufficient
881        }
882        if a == NO_HANDLE || b == NO_HANDLE {
883            return false;
884        }
885        match mode {
886            EqualsMode::Identity => false,
887            EqualsMode::Custom(handle) => self.binding.custom_equals(handle, a, b),
888        }
889    }
890
891    /// Commit a DATA emission **without** equals substitution — used by
892    /// `FnResult::Batch` processing where multi-message waves pass through
893    /// verbatim per R1.3.2.d / R1.3.3.c. DIRTY auto-prefix respects
894    /// R1.3.1.a condition (b): only queued if node not already dirty.
895    ///
896    /// Structurally identical to the DATA branch of [`Self::commit_emission`]
897    /// but skips the Phase 2 equals check entirely.
898    fn commit_emission_verbatim(&self, node_id: NodeId, new_handle: HandleId) {
899        assert!(
900            new_handle != NO_HANDLE,
901            "NO_HANDLE is not a valid DATA payload (R1.2.4) for node {node_id:?}",
902        );
903
904        let mut s = self.lock_state();
905        let rec = s.require_node(node_id);
906        if rec.terminal.is_some() {
907            drop(s);
908            self.binding.release_handle(new_handle);
909            return;
910        }
911
912        // R1.3.1.a condition (b): DIRTY only if not already dirty.
913        let already_dirty = s.require_node(node_id).dirty;
914        s.require_node_mut(node_id).dirty = true;
915        if !already_dirty {
916            self.queue_notify(&mut s, node_id, Message::Dirty);
917        }
918
919        // Always DATA — no equals substitution for Batch emissions.
920        let current_cache = s.require_node(node_id).cache;
921        let snapshot_taken = if s.in_tick && current_cache != NO_HANDLE {
922            use std::collections::hash_map::Entry;
923            match s.wave_cache_snapshots.entry(node_id) {
924                Entry::Vacant(slot) => {
925                    slot.insert(current_cache);
926                    true
927                }
928                Entry::Occupied(_) => false,
929            }
930        } else {
931            false
932        };
933        s.require_node_mut(node_id).cache = new_handle;
934        if current_cache != NO_HANDLE && !snapshot_taken {
935            self.binding.release_handle(current_cache);
936        }
937        // Slice E1: replay buffer push (R2.6.5 / Lock 6.G).
938        self.push_replay_buffer(&mut s, node_id, new_handle);
939        // Slice G QA fix (A2, 2026-05-07): mark tier3_emitted_this_wave
940        // even on the verbatim path. A subsequent commit_emission at the
941        // same node in the same wave needs this flag to detect multi-emit
942        // and skip equals substitution; without it, a Batch-then-standard
943        // sequence would queue Resolved into a wave that already has Data
944        // — violating R1.3.3.a. The Batch path itself still passes
945        // verbatim per R1.3.3.c (we don't re-run equals here); we just
946        // record that "this node has emitted tier-3 in this wave."
947        s.tier3_emitted_this_wave.insert(node_id);
948        self.queue_notify(&mut s, node_id, Message::Data(new_handle));
949        // Propagate to children
950        let child_ids: Vec<NodeId> = s
951            .children
952            .get(&node_id)
953            .map(|c| c.iter().copied().collect())
954            .unwrap_or_default();
955        for child_id in child_ids {
956            let dep_idx = s.require_node(child_id).dep_index_of(node_id);
957            if let Some(idx) = dep_idx {
958                self.deliver_data_to_consumer(&mut s, child_id, idx, new_handle);
959            }
960        }
961    }
962
963    /// Slice E1 (R2.6.5 / Lock 6.G): push a DATA handle into the node's
964    /// replay buffer if opted in. Evicts oldest if cap exceeded; takes a
965    /// fresh retain on push. RESOLVED is NOT buffered per canonical
966    /// "DATA only" — call sites only invoke this for Data emissions.
967    ///
968    /// Evicted handle is queued into `s.deferred_handle_releases`
969    /// (released lock-released at flush time) per the binding-boundary
970    /// lock-release discipline — `release_handle` may re-enter Core via
971    /// finalizers and must not run while the state lock is held
972    /// (QA A3, 2026-05-07).
973    fn push_replay_buffer(&self, s: &mut CoreState, node_id: NodeId, new_handle: HandleId) {
974        let rec = s.require_node_mut(node_id);
975        let cap = match rec.replay_buffer_cap {
976            Some(c) if c > 0 => c,
977            _ => return,
978        };
979        self.binding.retain_handle(new_handle);
980        rec.replay_buffer.push_back(new_handle);
981        let evicted = if rec.replay_buffer.len() > cap {
982            rec.replay_buffer.pop_front()
983        } else {
984            None
985        };
986        if let Some(h) = evicted {
987            s.deferred_handle_releases.push(h);
988        }
989    }
990
991    // ===================================================================
992    // Operator dispatch (Slice C-1, D009).
993    //
994    // `fire_operator` is the entry point for nodes whose `kind` is
995    // `NodeKind::Operator(_)`. It branches on the `OperatorOp` discriminant
996    // to per-operator helpers that snapshot inputs under the lock, drop the
997    // lock to call the binding's bulk projection FFI, and reacquire to
998    // apply emissions via `commit_emission_verbatim` (no per-item equals
999    // dedup at the wire — operator output passes verbatim per the same
1000    // R1.3.2.d / R1.3.3.c rule that governs `FnResult::Batch`).
1001    //
1002    // **Refcount discipline:** inputs sourced from `dep_records[i].data_batch`
1003    // share retains owned by the wave's data-batch slot (released at
1004    // wave-end rotation in `clear_wave_state`). Operators that emit those
1005    // handles unchanged (`Filter`, `DistinctUntilChanged`, `Pairwise`'s
1006    // `prev` carry-over) take an additional retain via `retain_handle`
1007    // before passing to `commit_emission_verbatim` — the cache slot owns
1008    // its own share, independent of the data-batch slot's. Operators that
1009    // produce fresh handles (`Map` / `Scan` / `Reduce` / `Pairwise`'s
1010    // packed tuples) receive retains pre-bumped by the binding's bulk-
1011    // projection method.
1012    // ===================================================================
1013
1014    /// Operator dispatch entry. Pre-checks (terminal short-circuit, first-
1015    /// run gate accounting for `partial`, terminal-aware fire for `Reduce`)
1016    /// happen here; per-operator behavior lives in the `fire_op_*` helpers.
1017    fn fire_operator(&self, node_id: NodeId, op: OperatorOp) {
1018        // Phase 1 (lock-held): remove from pending_fires, evaluate skip.
1019        let proceed = {
1020            let mut s = self.lock_state();
1021            s.pending_fires.remove(&node_id);
1022            let rec = s.require_node(node_id);
1023            if rec.terminal.is_some() {
1024                false
1025            } else {
1026                // First-run gate (R2.5.3 / R5.4). Partial-mode operators
1027                // (D011) opt out of the gate; otherwise we wait for every
1028                // dep to have delivered at least one real handle. Terminal-
1029                // aware operators (currently `Reduce`) additionally count a
1030                // dep terminal as "real input" so they can fire on
1031                // upstream COMPLETE-without-DATA and emit the seed.
1032                let has_real_input = !rec.has_sentinel_deps()
1033                    || rec.dep_records.iter().any(|dr| dr.terminal.is_some());
1034                rec.partial || has_real_input
1035            }
1036        };
1037        if !proceed {
1038            return;
1039        }
1040
1041        // A6 (Slice F, 2026-05-07): track operator fire on the
1042        // `currently_firing` stack so a binding-side project/predicate/fold
1043        // FFI callback that re-enters `Core::set_deps(node_id, ...)` is
1044        // rejected with `SetDepsError::ReentrantOnFiringNode`. Drop pops
1045        // the stack on panic too.
1046        let _firing = FiringGuard::new(self, node_id);
1047
1048        match op {
1049            OperatorOp::Map { fn_id } => self.fire_op_map(node_id, fn_id),
1050            OperatorOp::Filter { fn_id } => self.fire_op_filter(node_id, fn_id),
1051            OperatorOp::Scan { fn_id, .. } => self.fire_op_scan(node_id, fn_id),
1052            OperatorOp::Reduce { fn_id, .. } => self.fire_op_reduce(node_id, fn_id),
1053            OperatorOp::DistinctUntilChanged { equals_fn_id } => {
1054                self.fire_op_distinct(node_id, equals_fn_id);
1055            }
1056            OperatorOp::Pairwise { fn_id } => self.fire_op_pairwise(node_id, fn_id),
1057            OperatorOp::Combine { pack_fn } => self.fire_op_combine(node_id, pack_fn),
1058            OperatorOp::WithLatestFrom { pack_fn } => {
1059                self.fire_op_with_latest_from(node_id, pack_fn);
1060            }
1061            OperatorOp::Merge => self.fire_op_merge(node_id),
1062            OperatorOp::Take { count } => self.fire_op_take(node_id, count),
1063            OperatorOp::Skip { count } => self.fire_op_skip(node_id, count),
1064            OperatorOp::TakeWhile { fn_id } => self.fire_op_take_while(node_id, fn_id),
1065            // The variant carries `default` for `register_operator`'s
1066            // `make_op_scratch` path; once registered, the live default
1067            // is read from `LastState::default` inside `fire_op_last`.
1068            OperatorOp::Last { .. } => self.fire_op_last(node_id),
1069        }
1070    }
1071
1072    /// Snapshot the operator's single dep batch (transform constraint —
1073    /// R5.7 single-dep). Returns `(inputs, terminal)` where `inputs` is a
1074    /// fresh `Vec<HandleId>` (no retains) and `terminal` reflects
1075    /// `dep_records[0].terminal` at snapshot time.
1076    fn snapshot_op_dep0(&self, node_id: NodeId) -> (Vec<HandleId>, Option<TerminalKind>) {
1077        let s = self.lock_state();
1078        let rec = s.require_node(node_id);
1079        debug_assert!(
1080            !rec.dep_records.is_empty(),
1081            "transform operator must have ≥1 dep"
1082        );
1083        let dr = &rec.dep_records[0];
1084        (dr.data_batch.iter().copied().collect(), dr.terminal)
1085    }
1086
1087    /// Emit DIRTY (if not already dirty) followed by RESOLVED. Used by
1088    /// silent-drop operators (Filter / DistinctUntilChanged / Pairwise)
1089    /// when a wave's inputs all suppress and the operator needs to settle
1090    /// the wave for its subscribers (D018 — let DIRTY ride; queue RESOLVED
1091    /// on full-reject).
1092    fn settle_dirty_resolved(&self, node_id: NodeId) {
1093        let mut s = self.lock_state();
1094        if s.require_node(node_id).terminal.is_some() {
1095            return;
1096        }
1097        let already_dirty = s.require_node(node_id).dirty;
1098        s.require_node_mut(node_id).dirty = true;
1099        if !already_dirty {
1100            self.queue_notify(&mut s, node_id, Message::Dirty);
1101        }
1102        // Slice G: skip Resolved if pending_notify already has a tier-3
1103        // message — adding Resolved would violate R1.3.3.a.
1104        let already_tier3 = s
1105            .pending_notify
1106            .get(&node_id)
1107            .is_some_and(|entry| entry.iter_messages().any(|m| m.tier() == 3));
1108        if !already_tier3 {
1109            self.queue_notify(&mut s, node_id, Message::Resolved);
1110        }
1111    }
1112
1113    /// `OperatorOp::Map` dispatch.
1114    fn fire_op_map(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
1115        let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
1116        // Mark fired regardless of input count (activation gate already
1117        // satisfied or partial-mode).
1118        {
1119            let mut s = self.lock_state();
1120            s.require_node_mut(node_id).has_fired_once = true;
1121        }
1122        if inputs.is_empty() {
1123            return;
1124        }
1125        // Phase 2 (lock-released): bulk project. Binding returns one
1126        // handle per input, each with a retain share already taken.
1127        let outputs = self.binding.project_each(fn_id, &inputs);
1128        // Phase 3: emit each output. `commit_emission_verbatim` consumes
1129        // the retain into the cache slot (and releases the prior cache
1130        // handle internally).
1131        for h in outputs {
1132            self.commit_emission_verbatim(node_id, h);
1133        }
1134    }
1135
1136    /// `OperatorOp::Filter` dispatch (D012/D018).
1137    fn fire_op_filter(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
1138        let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
1139        {
1140            let mut s = self.lock_state();
1141            s.require_node_mut(node_id).has_fired_once = true;
1142        }
1143        if inputs.is_empty() {
1144            return;
1145        }
1146        // Phase 2: predicate per input.
1147        let pass = self.binding.predicate_each(fn_id, &inputs);
1148        debug_assert!(
1149            pass.len() == inputs.len(),
1150            "predicate_each returned {} bools for {} inputs",
1151            pass.len(),
1152            inputs.len()
1153        );
1154        // Phase 3: emit passing items verbatim. Take a fresh retain for
1155        // each — the data_batch slot still owns its retain (released at
1156        // wave-end rotation), and the cache slot needs its own.
1157        let mut emitted = 0usize;
1158        for (i, &h) in inputs.iter().enumerate() {
1159            if pass.get(i).copied().unwrap_or(false) {
1160                self.binding.retain_handle(h);
1161                self.commit_emission_verbatim(node_id, h);
1162                emitted += 1;
1163            }
1164        }
1165        // D018: full-reject settles with DIRTY+RESOLVED.
1166        if emitted == 0 {
1167            self.settle_dirty_resolved(node_id);
1168        }
1169    }
1170
1171    /// `OperatorOp::Scan` dispatch — left-fold emitting each new acc.
1172    fn fire_op_scan(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
1173        use crate::op_state::ScanState;
1174        let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
1175        let acc = {
1176            let s = self.lock_state();
1177            scratch_ref::<ScanState>(&s, node_id).acc
1178        };
1179        {
1180            let mut s = self.lock_state();
1181            s.require_node_mut(node_id).has_fired_once = true;
1182        }
1183        if inputs.is_empty() {
1184            return;
1185        }
1186        // Phase 2: fold each input through. Returns N new handles, each
1187        // with a fresh retain.
1188        let new_states = self.binding.fold_each(fn_id, acc, &inputs);
1189        debug_assert!(
1190            new_states.len() == inputs.len(),
1191            "fold_each returned {} accs for {} inputs",
1192            new_states.len(),
1193            inputs.len()
1194        );
1195        // Phase 3a: update ScanState.acc to the LAST new acc. Take an
1196        // extra retain for the slot; release the prior acc's slot retain.
1197        let last_acc = new_states.last().copied();
1198        if let Some(last) = last_acc {
1199            let prev_acc = {
1200                let mut s = self.lock_state();
1201                let scratch = scratch_mut::<ScanState>(&mut s, node_id);
1202                let prev = scratch.acc;
1203                scratch.acc = last;
1204                prev
1205            };
1206            // Take the slot's retain on the new acc.
1207            self.binding.retain_handle(last);
1208            // Release the prior slot's retain (post-lock to keep binding
1209            // free to re-enter Core safely).
1210            if prev_acc != crate::handle::NO_HANDLE {
1211                self.binding.release_handle(prev_acc);
1212            }
1213        }
1214        // Phase 3b: emit each intermediate acc verbatim. `new_states`
1215        // entries each carry one retain from `fold_each`; that retain is
1216        // consumed by `commit_emission_verbatim` into the cache slot.
1217        for h in new_states {
1218            self.commit_emission_verbatim(node_id, h);
1219        }
1220    }
1221
1222    /// `OperatorOp::Reduce` dispatch — accumulates silently; emits acc on
1223    /// upstream COMPLETE (cascades ERROR verbatim).
1224    fn fire_op_reduce(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
1225        use crate::op_state::ReduceState;
1226        let (inputs, terminal) = self.snapshot_op_dep0(node_id);
1227        let acc = {
1228            let s = self.lock_state();
1229            scratch_ref::<ReduceState>(&s, node_id).acc
1230        };
1231        {
1232            let mut s = self.lock_state();
1233            s.require_node_mut(node_id).has_fired_once = true;
1234        }
1235        // Phase 2: accumulate (silent — no per-input emit).
1236        let new_states = if inputs.is_empty() {
1237            SmallVec::<[HandleId; 1]>::new()
1238        } else {
1239            self.binding.fold_each(fn_id, acc, &inputs)
1240        };
1241        debug_assert!(
1242            new_states.len() == inputs.len(),
1243            "fold_each returned {} accs for {} inputs",
1244            new_states.len(),
1245            inputs.len()
1246        );
1247        // Update ReduceState.acc to last new acc; release intermediate
1248        // states (we don't emit them) and the prior acc's slot retain.
1249        let last_acc = new_states.last().copied();
1250        let intermediates_to_release: Vec<HandleId> = if new_states.len() > 1 {
1251            new_states[..new_states.len() - 1].to_vec()
1252        } else {
1253            Vec::new()
1254        };
1255        let prev_acc_to_release = if let Some(last) = last_acc {
1256            let prev_acc = {
1257                let mut s = self.lock_state();
1258                let scratch = scratch_mut::<ReduceState>(&mut s, node_id);
1259                let prev = scratch.acc;
1260                scratch.acc = last;
1261                prev
1262            };
1263            self.binding.retain_handle(last);
1264            if prev_acc == crate::handle::NO_HANDLE {
1265                None
1266            } else {
1267                Some(prev_acc)
1268            }
1269        } else {
1270            None
1271        };
1272        // Release intermediate fold results (Reduce only emits the LAST,
1273        // but only on terminal). Each was retained by `fold_each`.
1274        for h in intermediates_to_release {
1275            self.binding.release_handle(h);
1276        }
1277        if let Some(h) = prev_acc_to_release {
1278            self.binding.release_handle(h);
1279        }
1280
1281        // Phase 3: emit on terminal.
1282        match terminal {
1283            None => {
1284                // Still accumulating; no emit. Subscribers see no message
1285                // for this wave (silent accumulation). The first wave that
1286                // pushes Reduce to fire produces a Dirty entry on the
1287                // upstream's commit, but Reduce itself doesn't queue any
1288                // tier-3 since R5 silently absorbs. v1: leave the
1289                // post-drain auto-resolve sweep to settle nothing —
1290                // pending_notify has no entry for Reduce so the sweep is
1291                // a no-op.
1292            }
1293            Some(TerminalKind::Complete) => {
1294                // Read the live acc (may be the seed if no DATA arrived)
1295                // and emit Data(acc) + Complete.
1296                let final_acc = {
1297                    let s = self.lock_state();
1298                    scratch_ref::<ReduceState>(&s, node_id).acc
1299                };
1300                if final_acc != crate::handle::NO_HANDLE {
1301                    // Emission needs its own retain (slot's retain is
1302                    // owned by ReduceState.acc until reset/Drop).
1303                    self.binding.retain_handle(final_acc);
1304                    self.commit_emission_verbatim(node_id, final_acc);
1305                }
1306                self.complete(node_id);
1307            }
1308            Some(TerminalKind::Error(h)) => {
1309                // Core::error transfers the caller's share into the
1310                // cascade (node.terminal + per-child dep_terminal slots);
1311                // no release at the error() boundary. Take a fresh share
1312                // here so the cascade owns it independently of the
1313                // dep_records[0].terminal slot's share.
1314                self.binding.retain_handle(h);
1315                self.error(node_id, h);
1316            }
1317        }
1318    }
1319
1320    /// `OperatorOp::DistinctUntilChanged` dispatch.
1321    fn fire_op_distinct(&self, node_id: NodeId, equals_fn_id: crate::handle::FnId) {
1322        use crate::op_state::DistinctState;
1323        let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
1324        let mut prev = {
1325            let s = self.lock_state();
1326            scratch_ref::<DistinctState>(&s, node_id).prev
1327        };
1328        {
1329            let mut s = self.lock_state();
1330            s.require_node_mut(node_id).has_fired_once = true;
1331        }
1332        if inputs.is_empty() {
1333            return;
1334        }
1335        // Take a working-copy retain on the initial prev so both the loop
1336        // (which releases old_prev on each non-equal item) and phase 3
1337        // (which releases the slot's original handle) each have their own
1338        // share. Without this, the loop's release of old_prev (== original
1339        // DistinctState.prev) double-releases against phase 3's stale_slot
1340        // release.
1341        if prev != crate::handle::NO_HANDLE {
1342            self.binding.retain_handle(prev);
1343        }
1344        // Phase 2: per-input equals(prev, current). Each non-equal input
1345        // is emitted and becomes the new prev. Equals fn_id reuses
1346        // `BindingBoundary::custom_equals`.
1347        let mut emitted = 0usize;
1348        for &h in &inputs {
1349            let equal = if prev == crate::handle::NO_HANDLE {
1350                false
1351            } else if prev == h {
1352                true
1353            } else {
1354                self.binding.custom_equals(equals_fn_id, prev, h)
1355            };
1356            if !equal {
1357                // Emit this input verbatim.
1358                self.binding.retain_handle(h);
1359                self.commit_emission_verbatim(node_id, h);
1360                // Update prev: take retain on new prev, release old
1361                // (working-copy retain from above or from prior iteration).
1362                self.binding.retain_handle(h);
1363                let old_prev = prev;
1364                prev = h;
1365                if old_prev != crate::handle::NO_HANDLE {
1366                    self.binding.release_handle(old_prev);
1367                }
1368                emitted += 1;
1369            }
1370        }
1371        // Phase 3: persist prev into DistinctState.prev slot. Release the
1372        // slot's original retain (stale_slot) — this is the slot-owned
1373        // share, independent of the working-copy share released in the
1374        // loop above.
1375        {
1376            let mut s = self.lock_state();
1377            let scratch = scratch_mut::<DistinctState>(&mut s, node_id);
1378            let stale_slot = scratch.prev;
1379            scratch.prev = prev;
1380            if stale_slot != prev && stale_slot != crate::handle::NO_HANDLE {
1381                drop(s);
1382                self.binding.release_handle(stale_slot);
1383            }
1384        }
1385        // Release the working-copy retain on the final prev if it was
1386        // never released in the loop (i.e. no non-equal items passed,
1387        // prev == original). In that case stale_slot == prev, so phase 3
1388        // didn't release it either — but the working-copy retain is still
1389        // outstanding. Release it now.
1390        if emitted == 0 && prev != crate::handle::NO_HANDLE {
1391            self.binding.release_handle(prev);
1392        }
1393        if emitted == 0 {
1394            self.settle_dirty_resolved(node_id);
1395        }
1396    }
1397
1398    /// `OperatorOp::Pairwise` dispatch — emits `(prev, current)` tuples
1399    /// starting after the second value (first input swallowed, sets `prev`).
1400    fn fire_op_pairwise(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
1401        use crate::op_state::PairwiseState;
1402        let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
1403        let mut prev = {
1404            let s = self.lock_state();
1405            scratch_ref::<PairwiseState>(&s, node_id).prev
1406        };
1407        {
1408            let mut s = self.lock_state();
1409            s.require_node_mut(node_id).has_fired_once = true;
1410        }
1411        if inputs.is_empty() {
1412            return;
1413        }
1414        let mut emitted = 0usize;
1415        for &h in &inputs {
1416            if prev == crate::handle::NO_HANDLE {
1417                // First-ever value — swallow, set prev. Retain for the
1418                // PairwiseState.prev slot (persisted in phase 3 below).
1419                self.binding.retain_handle(h);
1420                prev = h;
1421                continue;
1422            }
1423            // Pack (prev, current) into a tuple handle. Binding returns a
1424            // fresh retain on the packed handle.
1425            let packed = self.binding.pairwise_pack(fn_id, prev, h);
1426            self.commit_emission_verbatim(node_id, packed);
1427            // Advance prev: take retain on h, release old prev.
1428            self.binding.retain_handle(h);
1429            let old_prev = prev;
1430            prev = h;
1431            self.binding.release_handle(old_prev);
1432            emitted += 1;
1433        }
1434        // Persist prev into PairwiseState.prev slot.
1435        {
1436            let mut s = self.lock_state();
1437            let scratch = scratch_mut::<PairwiseState>(&mut s, node_id);
1438            let stale_slot = scratch.prev;
1439            scratch.prev = prev;
1440            if stale_slot != prev && stale_slot != crate::handle::NO_HANDLE {
1441                drop(s);
1442                self.binding.release_handle(stale_slot);
1443            }
1444        }
1445        if emitted == 0 {
1446            self.settle_dirty_resolved(node_id);
1447        }
1448    }
1449
1450    // =================================================================
1451    // Slice C-2: multi-dep combinator operators (D020)
1452    // =================================================================
1453
1454    /// Snapshot all deps' "latest" handle for multi-dep combinators.
1455    /// For each dep: returns `data_batch.last()` if non-empty (dep fired
1456    /// this wave), else `prev_data` (last handle from previous wave).
1457    /// Also returns whether dep[0] (primary) had DATA this wave —
1458    /// needed by `fire_op_with_latest_from`.
1459    fn snapshot_op_all_latest(&self, node_id: NodeId) -> (SmallVec<[HandleId; 4]>, bool) {
1460        let s = self.lock_state();
1461        let rec = s.require_node(node_id);
1462        let primary_fired = rec
1463            .dep_records
1464            .first()
1465            .is_some_and(|dr| !dr.data_batch.is_empty());
1466        let latest: SmallVec<[HandleId; 4]> = rec
1467            .dep_records
1468            .iter()
1469            .map(|dr| dr.data_batch.last().copied().unwrap_or(dr.prev_data))
1470            .collect();
1471        (latest, primary_fired)
1472    }
1473
1474    /// `OperatorOp::Combine` dispatch — N-dep combineLatest. Packs the
1475    /// latest handle per dep into a tuple via `pack_tuple`, emits on
1476    /// any dep fire. First-run gate (R2.5.3, partial: false) guarantees
1477    /// all deps have a real handle on first fire. Post-warmup INVALIDATE
1478    /// guard: if any dep's prev_data was cleared, settles with RESOLVED
1479    /// instead of packing a NO_HANDLE into the tuple.
1480    fn fire_op_combine(&self, node_id: NodeId, pack_fn: crate::handle::FnId) {
1481        let (latest, _primary_fired) = self.snapshot_op_all_latest(node_id);
1482        {
1483            let mut s = self.lock_state();
1484            s.require_node_mut(node_id).has_fired_once = true;
1485        }
1486        // Post-warmup INVALIDATE guard: a dep may have been invalidated
1487        // (prev_data cleared to NO_HANDLE) and not yet re-delivered.
1488        if latest.contains(&crate::handle::NO_HANDLE) {
1489            self.settle_dirty_resolved(node_id);
1490            return;
1491        }
1492        let tuple_handle = self.binding.pack_tuple(pack_fn, &latest);
1493        self.commit_emission_verbatim(node_id, tuple_handle);
1494    }
1495
1496    /// `OperatorOp::WithLatestFrom` dispatch — 2-dep, fire-on-primary-only
1497    /// (D021 / Phase 10.5). Emits `[primary, secondary]` pair only when
1498    /// dep[0] (primary) has DATA in the wave. If only dep[1] fires →
1499    /// RESOLVED. Post-warmup INVALIDATE guard: if secondary latest is
1500    /// `NO_HANDLE` (INVALIDATE cleared it), settles with RESOLVED.
1501    fn fire_op_with_latest_from(&self, node_id: NodeId, pack_fn: crate::handle::FnId) {
1502        let (latest, primary_fired) = self.snapshot_op_all_latest(node_id);
1503        let first_fire = {
1504            let mut s = self.lock_state();
1505            let rec = s.require_node_mut(node_id);
1506            let was_first = !rec.has_fired_once;
1507            rec.has_fired_once = true;
1508            was_first
1509        };
1510        // On first fire (gate release), always emit — the first-run gate
1511        // guarantees both deps have values (via prev_data fallback in
1512        // snapshot). On subsequent fires, only emit when primary fires.
1513        if !first_fire && !primary_fired {
1514            // Secondary-only update — no downstream DATA.
1515            self.settle_dirty_resolved(node_id);
1516            return;
1517        }
1518        // Post-warmup INVALIDATE guard: secondary may have been invalidated
1519        // (prev_data cleared to NO_HANDLE) and not yet re-delivered.
1520        debug_assert!(latest.len() == 2, "withLatestFrom requires exactly 2 deps");
1521        if latest[1] == crate::handle::NO_HANDLE {
1522            self.settle_dirty_resolved(node_id);
1523            return;
1524        }
1525        let tuple_handle = self.binding.pack_tuple(pack_fn, &latest);
1526        self.commit_emission_verbatim(node_id, tuple_handle);
1527    }
1528
1529    /// `OperatorOp::Merge` dispatch — N-dep, forward all DATA handles
1530    /// verbatim (D022). Zero FFI on fire: no transformation. Each dep's
1531    /// batch handles are collected, retained, and emitted individually.
1532    fn fire_op_merge(&self, node_id: NodeId) {
1533        // Collect all batch handles from all deps (flat).
1534        let all_handles: Vec<HandleId> = {
1535            let s = self.lock_state();
1536            let rec = s.require_node(node_id);
1537            rec.dep_records
1538                .iter()
1539                .flat_map(|dr| dr.data_batch.iter().copied())
1540                .collect()
1541        };
1542        {
1543            let mut s = self.lock_state();
1544            s.require_node_mut(node_id).has_fired_once = true;
1545        }
1546        if all_handles.is_empty() {
1547            // All deps settled RESOLVED this wave — no DATA to forward.
1548            self.settle_dirty_resolved(node_id);
1549            return;
1550        }
1551        // Emit each handle verbatim. Take a fresh retain per handle
1552        // (independent of the dep batch's retain which gets released at
1553        // wave-end). Matches Filter's discipline for passing inputs.
1554        for &h in &all_handles {
1555            self.binding.retain_handle(h);
1556            self.commit_emission_verbatim(node_id, h);
1557        }
1558    }
1559
1560    // =================================================================
1561    // Slice C-3: flow operators (D024)
1562    // =================================================================
1563
1564    /// `OperatorOp::Take` dispatch — emits the first `count` DATA values
1565    /// then self-completes via `Core::complete`. When `count == 0`, the
1566    /// first fire emits zero items then immediately self-completes
1567    /// (D027). Cross-wave counter lives in
1568    /// [`TakeState::count_emitted`](super::op_state::TakeState::count_emitted).
1569    fn fire_op_take(&self, node_id: NodeId, count: u32) {
1570        use crate::op_state::TakeState;
1571        let (inputs, terminal) = self.snapshot_op_dep0(node_id);
1572        // Snapshot current counter; mark fired regardless of input count
1573        // (activation gate already satisfied or partial-mode).
1574        let mut count_emitted = {
1575            let s = self.lock_state();
1576            scratch_ref::<TakeState>(&s, node_id).count_emitted
1577        };
1578        {
1579            let mut s = self.lock_state();
1580            s.require_node_mut(node_id).has_fired_once = true;
1581        }
1582        // Already at quota before any input this wave — self-complete
1583        // immediately. Covers `count == 0` (first-fire short-circuit) and
1584        // any defensive re-entry after the terminal-skip in `fire_operator`
1585        // already guards against double-complete.
1586        if count_emitted >= count {
1587            self.complete(node_id);
1588            return;
1589        }
1590        // Per-input emission loop. Each pass takes a fresh retain for the
1591        // cache slot; data_batch slot's retain is released at wave-end
1592        // rotation independently.
1593        for &h in &inputs {
1594            self.binding.retain_handle(h);
1595            self.commit_emission_verbatim(node_id, h);
1596            count_emitted = count_emitted.saturating_add(1);
1597            if count_emitted >= count {
1598                break;
1599            }
1600        }
1601        // Persist the updated counter.
1602        {
1603            let mut s = self.lock_state();
1604            scratch_mut::<TakeState>(&mut s, node_id).count_emitted = count_emitted;
1605        }
1606        // Self-complete if we hit the quota this wave. Upstream COMPLETE
1607        // (terminal == Some(Complete)) without us hitting the count
1608        // propagates via the standard auto-cascade — we don't intercept it.
1609        if count_emitted >= count {
1610            self.complete(node_id);
1611            return;
1612        }
1613        // If upstream is already Errored and we haven't hit count, the
1614        // standard cascade will propagate it. If the wave delivered no
1615        // inputs (e.g. RESOLVED from upstream), settle DIRTY+RESOLVED so
1616        // subscribers see the wave close.
1617        if inputs.is_empty() && terminal.is_none() {
1618            self.settle_dirty_resolved(node_id);
1619        }
1620    }
1621
1622    /// `OperatorOp::Skip` dispatch — drops the first `count` DATA values,
1623    /// then forwards the rest. Cross-wave counter lives in
1624    /// [`SkipState::count_skipped`](super::op_state::SkipState::count_skipped).
1625    /// On a wave where every input is still in the skip window, settles
1626    /// DIRTY+RESOLVED (D018 pattern) so subscribers see the wave close.
1627    fn fire_op_skip(&self, node_id: NodeId, count: u32) {
1628        use crate::op_state::SkipState;
1629        let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
1630        let mut count_skipped = {
1631            let s = self.lock_state();
1632            scratch_ref::<SkipState>(&s, node_id).count_skipped
1633        };
1634        {
1635            let mut s = self.lock_state();
1636            s.require_node_mut(node_id).has_fired_once = true;
1637        }
1638        // No early-return on empty inputs: the post-loop `emitted == 0`
1639        // settle handles the empty-inputs case identically to the
1640        // all-swallowed-by-skip-window case (Slice C-3 /qa P6 — symmetry
1641        // with `fire_op_take`).
1642        let mut emitted = 0usize;
1643        for &h in &inputs {
1644            if count_skipped < count {
1645                count_skipped = count_skipped.saturating_add(1);
1646                // Drop this input — the data_batch slot still owns its
1647                // retain (released at wave-end rotation). No emission.
1648                continue;
1649            }
1650            // Past the skip window — emit verbatim. Take a fresh retain
1651            // for the cache slot.
1652            self.binding.retain_handle(h);
1653            self.commit_emission_verbatim(node_id, h);
1654            emitted += 1;
1655        }
1656        // Persist the updated counter.
1657        {
1658            let mut s = self.lock_state();
1659            scratch_mut::<SkipState>(&mut s, node_id).count_skipped = count_skipped;
1660        }
1661        if emitted == 0 {
1662            self.settle_dirty_resolved(node_id);
1663        }
1664    }
1665
1666    /// `OperatorOp::TakeWhile` dispatch — emits while the predicate
1667    /// holds; on the first `false`, emits any preceding passes from the
1668    /// same batch then self-completes via `Core::complete`. Reuses
1669    /// [`BindingBoundary::predicate_each`] (D029).
1670    fn fire_op_take_while(&self, node_id: NodeId, fn_id: crate::handle::FnId) {
1671        let (inputs, _terminal) = self.snapshot_op_dep0(node_id);
1672        {
1673            let mut s = self.lock_state();
1674            s.require_node_mut(node_id).has_fired_once = true;
1675        }
1676        if inputs.is_empty() {
1677            return;
1678        }
1679        // Phase 2: predicate per input.
1680        let pass = self.binding.predicate_each(fn_id, &inputs);
1681        debug_assert!(
1682            pass.len() == inputs.len(),
1683            "predicate_each returned {} bools for {} inputs",
1684            pass.len(),
1685            inputs.len()
1686        );
1687        // Phase 3: emit each input until the first false; then
1688        // self-complete. `fire_operator`'s `terminal.is_some()`
1689        // short-circuit gates re-entry after the self-complete cascade
1690        // installs the terminal slot — no extra `done` flag needed.
1691        let mut emitted = 0usize;
1692        let mut first_false_seen = false;
1693        for (i, &h) in inputs.iter().enumerate() {
1694            if pass.get(i).copied().unwrap_or(false) {
1695                self.binding.retain_handle(h);
1696                self.commit_emission_verbatim(node_id, h);
1697                emitted += 1;
1698            } else {
1699                first_false_seen = true;
1700                break;
1701            }
1702        }
1703        if first_false_seen {
1704            self.complete(node_id);
1705            return;
1706        }
1707        if emitted == 0 {
1708            // Whole batch passed but was empty (impossible here since
1709            // inputs.is_empty() returned early above) — defensive only.
1710            self.settle_dirty_resolved(node_id);
1711        }
1712    }
1713
1714    /// `OperatorOp::Last` dispatch — buffers the latest DATA; emits
1715    /// `Data(latest)` (or `Data(default)` if no DATA arrived and a
1716    /// default was registered) then `Complete` on upstream COMPLETE.
1717    /// On upstream ERROR, propagates verbatim. Storage:
1718    /// [`LastState`](super::op_state::LastState).
1719    ///
1720    /// **Silent-buffer semantics (mirrors Reduce):** on a non-terminal
1721    /// wave (`terminal == None`), `fire_op_last` updates the buffered
1722    /// `latest` handle but produces NO downstream wire message —
1723    /// subscribers observe the operator only when upstream
1724    /// COMPLETE/ERROR triggers the terminal branch. Intermediate
1725    /// inputs from the dep's batch are dropped on the floor (their
1726    /// `data_batch` retains release at wave-end rotation
1727    /// independently). Per-wave settlement on intermediate waves is
1728    /// the canonical behavior for terminal-aware operators.
1729    fn fire_op_last(&self, node_id: NodeId) {
1730        use crate::op_state::LastState;
1731        let (inputs, terminal) = self.snapshot_op_dep0(node_id);
1732        {
1733            let mut s = self.lock_state();
1734            s.require_node_mut(node_id).has_fired_once = true;
1735        }
1736
1737        // Phase 2: buffer the latest input handle (if any). Retain new,
1738        // release old. data_batch slot's retain is released at wave-end
1739        // rotation independently — the LastState slot keeps its own
1740        // share so the value survives across waves.
1741        if let Some(&new_latest) = inputs.last() {
1742            let prev_latest = {
1743                let mut s = self.lock_state();
1744                let scratch = scratch_mut::<LastState>(&mut s, node_id);
1745                let prev = scratch.latest;
1746                scratch.latest = new_latest;
1747                prev
1748            };
1749            self.binding.retain_handle(new_latest);
1750            if prev_latest != crate::handle::NO_HANDLE {
1751                self.binding.release_handle(prev_latest);
1752            }
1753        }
1754
1755        // Phase 3: emit on terminal. Buffer-only fires (no terminal yet)
1756        // produce no downstream message — Reduce-style silent
1757        // accumulation. The post-drain auto-resolve sweep is a no-op
1758        // because pending_notify has no entry for Last.
1759        match terminal {
1760            None => {}
1761            Some(TerminalKind::Complete) => {
1762                // Read the live latest + default. If latest != NO_HANDLE,
1763                // emit it. Otherwise, if default != NO_HANDLE, emit default.
1764                // Otherwise, emit only Complete (empty stream, no default).
1765                let (latest, default) = {
1766                    let s = self.lock_state();
1767                    let scratch = scratch_ref::<LastState>(&s, node_id);
1768                    (scratch.latest, scratch.default)
1769                };
1770                let to_emit = if latest != crate::handle::NO_HANDLE {
1771                    Some(latest)
1772                } else if default != crate::handle::NO_HANDLE {
1773                    Some(default)
1774                } else {
1775                    None
1776                };
1777                if let Some(h) = to_emit {
1778                    // Emission needs its own retain — the LastState slot
1779                    // keeps its share until reset/Drop.
1780                    self.binding.retain_handle(h);
1781                    self.commit_emission_verbatim(node_id, h);
1782                }
1783                self.complete(node_id);
1784            }
1785            Some(TerminalKind::Error(h)) => {
1786                // Take a fresh share for the error cascade — the
1787                // dep_records[0].terminal slot keeps its own share
1788                // (released by reset_for_fresh_lifecycle / Drop).
1789                self.binding.retain_handle(h);
1790                self.error(node_id, h);
1791            }
1792        }
1793    }
1794
1795    pub(crate) fn deliver_data_to_consumer(
1796        &self,
1797        s: &mut CoreState,
1798        consumer_id: NodeId,
1799        dep_idx: usize,
1800        handle: HandleId,
1801    ) {
1802        // Retain the handle for the batch accumulation slot — each DATA
1803        // handle in `data_batch` owns a retain share, released at wave-end
1804        // rotation in `clear_wave_state`.
1805        self.binding.retain_handle(handle);
1806
1807        let is_dynamic;
1808        let is_state;
1809        let tracked_or_first_fire;
1810        // Slice F audit close (2026-05-07): default-mode pause suppression.
1811        // If the consumer is paused with `PausableMode::Default`, the
1812        // canonical-spec §2.6 behavior is to suppress fn-fire and consolidate
1813        // pause-window dep deliveries into one fn execution on RESUME.
1814        // Mark `pending_wave` on the pause state instead of adding to
1815        // `pending_fires`. The dep state still advances (the data_batch push
1816        // above is unchanged), and clear_wave_state still rotates the latest
1817        // dep DATA into prev_data — so when the fn ultimately fires on
1818        // RESUME, it sees the consolidated post-pause state.
1819        let suppressed_for_default_pause;
1820        {
1821            let consumer = s.require_node_mut(consumer_id);
1822            consumer.dep_records[dep_idx].data_batch.push(handle);
1823            consumer.dep_records[dep_idx].involved_this_wave = true;
1824            consumer.involved_this_wave = true;
1825            is_dynamic = consumer.is_dynamic;
1826            is_state = consumer.is_state();
1827            tracked_or_first_fire = !consumer.has_fired_once || consumer.tracked.contains(&dep_idx);
1828            suppressed_for_default_pause = consumer.pause_state.is_paused()
1829                && consumer.pausable == crate::node::PausableMode::Default;
1830            if suppressed_for_default_pause {
1831                consumer.pause_state.mark_pending_wave();
1832            }
1833        }
1834        if suppressed_for_default_pause {
1835            // Default-mode pause: don't add to pending_fires; RESUME will
1836            // schedule one consolidated fire.
1837            return;
1838        }
1839        if is_state {
1840            // State nodes don't have deps; unreachable in practice.
1841        } else if is_dynamic {
1842            if tracked_or_first_fire {
1843                s.pending_fires.insert(consumer_id);
1844            }
1845        } else {
1846            // Derived / Operator / Producer (Producer has no deps so won't
1847            // reach here, but the predicate-based dispatch handles it
1848            // uniformly).
1849            s.pending_fires.insert(consumer_id);
1850        }
1851    }
1852
1853    // -------------------------------------------------------------------
1854    // Subscriber notification
1855    // -------------------------------------------------------------------
1856
1857    /// Queue a wave-end message for `node_id`'s subscribers.
1858    ///
1859    /// **Revision-tracked sink-snapshot batches (Slice X4 / D2,
1860    /// 2026-05-08):** each push for a given node either appends the
1861    /// message to the open batch (if `NodeRecord::subscribers_revision`
1862    /// hasn't advanced since that batch opened — the common case — no
1863    /// extra allocation), or opens a fresh batch with a current sink
1864    /// snapshot frozen at the new revision. A sub installed mid-wave
1865    /// bumps `subscribers_revision`; the next `queue_notify` for the
1866    /// same node observes the bump and starts a new batch that includes
1867    /// the new sub. Pre-subscribe batches retain their original snapshot,
1868    /// so earlier emits flush to their original sink list — the new sub
1869    /// does NOT double-receive them via flush AND handshake replay,
1870    /// closing the late-subscriber + multi-emit-per-wave R1.3.5.a gap.
1871    ///
1872    /// Pause routing decision (R1.3.7.b tier table, §10.2 buffering):
1873    ///   Tier 3 (DATA / RESOLVED) and Tier 4 (INVALIDATE) buffer while
1874    ///   paused; all other tiers (DIRTY tier 1, PAUSE/RESUME tier 2,
1875    ///   COMPLETE/ERROR tier 5, TEARDOWN tier 6) bypass the buffer and
1876    ///   flush immediately. START (tier 0) is per-subscription and never
1877    ///   transits queue_notify.
1878    pub(crate) fn queue_notify(&self, s: &mut CoreState, node_id: NodeId, msg: Message) {
1879        // R1.3.3.a / R1.3.3.d (Slice G — re-added 2026-05-07): dev-mode
1880        // wave-content invariant assertion. The tier-3 slot at one node in
1881        // one wave is either ≥1 DATA or exactly 1 RESOLVED — never mixed,
1882        // never multiple RESOLVED. Slice G moved equals substitution from
1883        // per-emit to wave-end coalescing; this assert pins that the
1884        // dispatcher itself never queues a violating combination at the
1885        // queue_notify granularity. Resolved arrivals come from:
1886        //   1. The auto-resolve sweep in `drain_and_flush` (gates on
1887        //      `!any tier-3` so it can't add to a wave with Data).
1888        //   2. The wave-end equals-substitution pass (rewrites in place,
1889        //      doesn't go through queue_notify).
1890        // Both honor R1.3.3.a by construction post-Slice-G.
1891        #[cfg(debug_assertions)]
1892        if matches!(msg.tier(), 3) {
1893            if let Some(entry) = s.pending_notify.get(&node_id) {
1894                // Walk all batches' messages — R1.3.3.a is a per-node
1895                // wave-content invariant, not per-batch (the X4 batches
1896                // are subscriber-snapshot epochs; the protocol-level
1897                // tier-3 invariant spans the whole wave for the node).
1898                let has_data = entry.iter_messages().any(|m| matches!(m, Message::Data(_)));
1899                let resolved_count = entry
1900                    .iter_messages()
1901                    .filter(|m| matches!(m, Message::Resolved))
1902                    .count();
1903                let incoming_is_data = matches!(msg, Message::Data(_));
1904                if incoming_is_data {
1905                    debug_assert!(
1906                        resolved_count == 0,
1907                        "R1.3.3.a violation at {node_id:?}: queueing Data into a \
1908                         wave that already contains Resolved — Slice G should have \
1909                         prevented this via wave-end coalescing"
1910                    );
1911                } else {
1912                    debug_assert!(
1913                        !has_data,
1914                        "R1.3.3.a violation at {node_id:?}: queueing Resolved into a \
1915                         wave that already contains Data"
1916                    );
1917                    debug_assert!(
1918                        resolved_count == 0,
1919                        "R1.3.3.a violation at {node_id:?}: multiple Resolved in one \
1920                         wave at one node"
1921                    );
1922                }
1923            }
1924        }
1925
1926        let buffered_tier = matches!(msg.tier(), 3 | 4);
1927        let cap = s.pause_buffer_cap;
1928
1929        // Pause-routing branch — handles its own retain/release and returns
1930        // before we touch `pending_notify`, so the rec borrow is contained.
1931        {
1932            let rec = s.require_node_mut(node_id);
1933            if rec.subscribers.is_empty() {
1934                return;
1935            }
1936            // Slice F audit close (2026-05-07): pause routing depends on mode.
1937            //   - `ResumeAll`: buffer tier-3/4 for verbatim replay on RESUME.
1938            //   - `Default` + STATE node: state nodes have no fn-fire to
1939            //     suppress, so buffer like resumeAll (collapse-to-latest is
1940            //     a future enhancement; v1 keeps verbatim).
1941            //   - `Default` + COMPUTE node: suppression happens upstream at
1942            //     fn-fire scheduling (see `deliver_data_to_consumer`); no
1943            //     outgoing tier-3 is produced from this node while paused,
1944            //     so this branch is unreachable for compute-default-paused.
1945            //     Fallthrough to the non-paused queue path is fine.
1946            //   - `Off`: pause is ignored entirely — tier-3 flushes
1947            //     immediately. Fallthrough.
1948            let mode_buffers_tier3 = match rec.pausable {
1949                crate::node::PausableMode::ResumeAll => true,
1950                crate::node::PausableMode::Default => rec.is_state(),
1951                crate::node::PausableMode::Off => false,
1952            };
1953            if buffered_tier && mode_buffers_tier3 && rec.pause_state.is_paused() {
1954                if let Some(h) = msg.payload_handle() {
1955                    self.binding.retain_handle(h);
1956                }
1957                let push_result = rec.pause_state.push_buffered(msg, cap);
1958                for dm in push_result.dropped_msgs {
1959                    if let Some(h) = dm.payload_handle() {
1960                        self.binding.release_handle(h);
1961                    }
1962                }
1963                // R1.3.8.c (Slice F, A3): on first overflow this cycle,
1964                // schedule a synthesized ERROR for wave-end emission.
1965                // `cap` is `Some` here (an overflow can only happen with a
1966                // configured cap), so `unwrap` is safe.
1967                if push_result.first_overflow_this_cycle {
1968                    if let Some((dropped_count, lock_held_ns)) =
1969                        rec.pause_state.overflow_diagnostic()
1970                    {
1971                        s.pending_pause_overflow
1972                            .push(crate::node::PendingPauseOverflow {
1973                                node_id,
1974                                dropped_count,
1975                                configured_max: cap.unwrap_or(0),
1976                                lock_held_ns,
1977                            });
1978                    }
1979                }
1980                return;
1981            }
1982        }
1983
1984        // Non-paused queue path: retain payload handle and queue into
1985        // pending_notify. Released in `flush_notifications` after sinks
1986        // fire.
1987        if let Some(h) = msg.payload_handle() {
1988            self.binding.retain_handle(h);
1989        }
1990        Self::push_into_pending_notify(s, node_id, msg);
1991    }
1992
1993    /// Slice X4 / D2: revision-tracked batch decision for `queue_notify`'s
1994    /// non-paused path. Either appends `msg` to the open batch (if
1995    /// `subscribers_revision` hasn't advanced since it opened — common
1996    /// case, no extra allocation) or opens a fresh batch with a current
1997    /// sink snapshot frozen at the new revision.
1998    ///
1999    /// Borrow discipline: reads `subscribers_revision` and the snapshot
2000    /// from `s.nodes` BEFORE calling `s.pending_notify.entry()` to keep
2001    /// the two field borrows disjoint (split-borrow through
2002    /// `require_node_mut` defeats the borrow checker).
2003    ///
2004    /// Lock-discipline assumption: this read of `subscribers_revision`
2005    /// is safe because both the subscribe install path
2006    /// ([`crate::node::Core::subscribe`]) and `queue_notify` hold
2007    /// `CoreState`'s mutex when they bump / read the revision —
2008    /// concurrent subscribe/unsubscribe cannot interleave. **If
2009    /// `Core::subscribe` ever moves the sink-install lock-released
2010    /// (mirroring the lock-released drain refactor), the revision read
2011    /// here must re-validate post-borrow — otherwise a fresh batch
2012    /// could open with a stale snapshot.**
2013    fn push_into_pending_notify(s: &mut CoreState, node_id: NodeId, msg: Message) {
2014        let current_rev = s.require_node(node_id).subscribers_revision;
2015        let needs_new_batch = s.pending_notify.get(&node_id).is_none_or(|entry| {
2016            entry
2017                .batches
2018                .last()
2019                .is_none_or(|b| b.snapshot_revision != current_rev)
2020        });
2021        let sinks_snapshot: Vec<Sink> = if needs_new_batch {
2022            s.require_node(node_id)
2023                .subscribers
2024                .values()
2025                .cloned()
2026                .collect()
2027        } else {
2028            Vec::new()
2029        };
2030        match s.pending_notify.entry(node_id) {
2031            Entry::Vacant(slot) => {
2032                let mut batches: SmallVec<[PendingBatch; 1]> = SmallVec::new();
2033                batches.push(PendingBatch {
2034                    snapshot_revision: current_rev,
2035                    sinks: sinks_snapshot,
2036                    messages: vec![msg],
2037                });
2038                slot.insert(PendingPerNode { batches });
2039            }
2040            Entry::Occupied(mut slot) => {
2041                let entry = slot.get_mut();
2042                if needs_new_batch {
2043                    entry.batches.push(PendingBatch {
2044                        snapshot_revision: current_rev,
2045                        sinks: sinks_snapshot,
2046                        messages: vec![msg],
2047                    });
2048                } else {
2049                    entry
2050                        .batches
2051                        .last_mut()
2052                        .expect("non-empty by construction (entry exists implies batch exists)")
2053                        .messages
2054                        .push(msg);
2055                }
2056            }
2057        }
2058    }
2059
2060    /// Collect wave-end sink-fire jobs into `s.deferred_flush_jobs` and the
2061    /// payload-handle releases owed for `pending_notify` into
2062    /// `s.deferred_handle_releases`. The actual sink fires + handle releases
2063    /// run **after** the state lock is dropped — see [`Core::run_wave`].
2064    ///
2065    /// R1.3.1.b two-phase propagation: phase 1 (DIRTY) propagates through
2066    /// the entire graph before phase 2 (DATA / RESOLVED) begins. Implemented
2067    /// here as cross-node tier-then-node collect — phase 1's jobs sit before
2068    /// phase 2's in `deferred_flush_jobs`, so when `run_wave` drains the
2069    /// queue lock-released, multi-node subscribers see all DIRTYs before any
2070    /// settle. Matches TS's drainPhase model without the per-tier queue
2071    /// indirection.
2072    ///
2073    /// Phase ordering:
2074    ///   1 → tier 1   (DIRTY)
2075    ///   2 → tier 3+4 (DATA/RESOLVED + INVALIDATE — the "settle slice")
2076    ///   3 → tier 5   (COMPLETE/ERROR)
2077    ///   4 → tier 6   (TEARDOWN)
2078    ///
2079    /// Tier 0 (START) is per-subscription (never enters pending_notify) and
2080    /// tier 2 (PAUSE/RESUME) is delivered through dedicated paths, also
2081    /// bypassing pending_notify; both are absent from this enumeration.
2082    ///
2083    /// Within a single phase, per-node insertion order (IndexMap iteration)
2084    /// is preserved — an emit on A before B → A's phase-2 messages flush
2085    /// before B's. Within a single node, message order is preserved.
2086    fn flush_notifications(&self, s: &mut CoreState) {
2087        const PHASES: &[&[u8]] = &[
2088            &[1],    // DIRTY
2089            &[3, 4], // DATA/RESOLVED + INVALIDATE
2090            &[5],    // COMPLETE/ERROR
2091            &[6],    // TEARDOWN
2092        ];
2093        let pending = std::mem::take(&mut s.pending_notify);
2094        for &phase_tiers in PHASES {
2095            for (_node_id, entry) in &pending {
2096                // Slice X4 / D2: iterate batches in arrival order. Each
2097                // batch carries its own sink snapshot frozen at open-time;
2098                // a batch's messages flush to ITS sinks only. Within a
2099                // single (phase, node), batches stay in arrival order so
2100                // emit-order semantics are preserved across batches.
2101                for batch in &entry.batches {
2102                    if batch.sinks.is_empty() {
2103                        continue;
2104                    }
2105                    let phase_msgs: Vec<Message> = batch
2106                        .messages
2107                        .iter()
2108                        .copied()
2109                        .filter(|m| phase_tiers.contains(&m.tier()))
2110                        .collect();
2111                    if phase_msgs.is_empty() {
2112                        continue;
2113                    }
2114                    let sinks_clone: Vec<Sink> = batch.sinks.iter().map(Arc::clone).collect();
2115                    s.deferred_flush_jobs.push((sinks_clone, phase_msgs));
2116                }
2117            }
2118        }
2119        // Refcount release: balance the retain done in `queue_notify` for
2120        // every payload-bearing message that landed in pending_notify
2121        // (across ALL batches per node). Deferred to post-lock-drop so the
2122        // binding's release path can't re-enter Core under our lock.
2123        for entry in pending.values() {
2124            for msg in entry.iter_messages() {
2125                if let Some(h) = msg.payload_handle() {
2126                    s.deferred_handle_releases.push(h);
2127                }
2128            }
2129        }
2130    }
2131
2132    /// Take the deferred sink-fire jobs, payload-handle releases,
2133    /// cleanup-hook fire queue, and pending-wipe queue from `CoreState`.
2134    /// Callers pair this with a `drop(state_guard)` and a subsequent
2135    /// [`Self::fire_deferred`] to deliver the wave's sinks + handle
2136    /// releases + Slice E2 OnInvalidate cleanup hooks + Slice E2 /qa
2137    /// Q2(b) eager wipe_ctx fires lock-released.
2138    pub(crate) fn drain_deferred(s: &mut CoreState) -> WaveDeferred {
2139        (
2140            std::mem::take(&mut s.deferred_flush_jobs),
2141            std::mem::take(&mut s.deferred_handle_releases),
2142            std::mem::take(&mut s.deferred_cleanup_hooks),
2143            std::mem::take(&mut s.pending_wipes),
2144        )
2145    }
2146
2147    /// Fire deferred sink-fire jobs in collected order, then release the
2148    /// payload handles owed for messages that landed in `pending_notify`
2149    /// during the wave, then fire any queued Slice E2 OnInvalidate cleanup
2150    /// hooks. All three phases run lock-released so:
2151    /// - Sinks that call back into Core (emit, pause, etc.) re-acquire the
2152    ///   state lock cleanly and run their own nested wave.
2153    /// - The binding's `release_handle` path can't deadlock against a
2154    ///   binding-side mutex held by Core.
2155    /// - User cleanup closures (invoked via `BindingBoundary::cleanup_for`)
2156    ///   may safely re-enter Core for unrelated nodes.
2157    ///
2158    /// **Cleanup-drain panic discipline (D060):** each `cleanup_for` call
2159    /// is wrapped in `catch_unwind` so a single binding panic doesn't
2160    /// short-circuit the per-wave drain. All queued cleanup attempts run;
2161    /// if any panicked, the LAST panic re-raises after the loop completes
2162    /// (preserving wave-end discipline while still surfacing failures).
2163    /// Per D060, Core stays panic-naive about user code — bindings own
2164    /// their host-language panic policy inside `cleanup_for`; this
2165    /// `catch_unwind` is purely about drain-don't-short-circuit.
2166    pub(crate) fn fire_deferred(
2167        &self,
2168        jobs: DeferredJobs,
2169        releases: Vec<HandleId>,
2170        cleanup_hooks: Vec<(crate::handle::NodeId, crate::boundary::CleanupTrigger)>,
2171        pending_wipes: Vec<crate::handle::NodeId>,
2172    ) {
2173        // Slice E2 /qa P1 (2026-05-07): wrap each sink-fire in
2174        // `catch_unwind` so a panicking sink doesn't unwind out of
2175        // `fire_deferred` and drop the queued `releases` +
2176        // `cleanup_hooks`. Mirrors Slice F audit fix A7's per-tier
2177        // handshake-fire discipline. Without this guard, a sink panic
2178        // here would silently leak handle retains AND silently drop
2179        // OnInvalidate cleanup hooks. AssertUnwindSafe is safe because
2180        // we re-raise the last panic at the end after running every
2181        // queued fire — drain ordering is preserved.
2182        let mut last_panic: Option<Box<dyn std::any::Any + Send>> = None;
2183        for (sinks, msgs) in jobs {
2184            for sink in &sinks {
2185                let sink = sink.clone();
2186                let msgs_ref = &msgs;
2187                let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
2188                    sink(msgs_ref);
2189                }));
2190                if let Err(payload) = result {
2191                    last_panic = Some(payload);
2192                }
2193            }
2194        }
2195        for h in releases {
2196            self.binding.release_handle(h);
2197        }
2198        // Slice E2 (D060): drain cleanup hooks with per-item panic
2199        // isolation so the loop always completes. AssertUnwindSafe is
2200        // safe here because we don't rely on logical state being valid
2201        // post-panic — the panic propagates anyway after the drain ends.
2202        for (node_id, trigger) in cleanup_hooks {
2203            let binding = &self.binding;
2204            let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
2205                binding.cleanup_for(node_id, trigger);
2206            }));
2207            if let Err(payload) = result {
2208                last_panic = Some(payload);
2209            }
2210        }
2211        // Slice E2 /qa Q2(b) (D069): drain eager wipe_ctx queue with the
2212        // same per-item panic isolation. Fires AFTER cleanup hooks so a
2213        // resubscribable node's OnInvalidate (or any tier-3+ cleanup that
2214        // fires in the same wave) sees pre-wipe binding state if it
2215        // landed in the same wave as the terminal cascade. Mutually
2216        // exclusive with `Subscription::Drop`'s direct-fire site, but
2217        // even concurrent fires are idempotent (binding's `wipe_ctx`
2218        // calls `HashMap::remove` which is a no-op on absent keys).
2219        for node_id in pending_wipes {
2220            let binding = &self.binding;
2221            let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(move || {
2222                binding.wipe_ctx(node_id);
2223            }));
2224            if let Err(payload) = result {
2225                last_panic = Some(payload);
2226            }
2227        }
2228        if let Some(payload) = last_panic {
2229            std::panic::resume_unwind(payload);
2230        }
2231    }
2232
2233    // -------------------------------------------------------------------
2234    // User-facing batch — coalesce multiple emits into one wave
2235    // -------------------------------------------------------------------
2236
2237    /// Coalesce multiple emissions into a single wave. Every `emit` /
2238    /// `complete` / `error` / `teardown` / `invalidate` call inside `f`
2239    /// queues its downstream work; the wave drains when `f` returns.
2240    ///
2241    /// **R1.3.6.a** — DIRTY still propagates immediately (tier 1 isn't
2242    /// deferred); only tier-3+ delivery is held until scope exit. **R1.3.6.b**
2243    /// — repeated emits on the same node coalesce into a single multi-message
2244    /// delivery (one [`Message::Dirty`] for the wave + one [`Message::Data`]
2245    /// per emit, all delivered together in the per-node phase-2 pass).
2246    ///
2247    /// Nested `batch()` calls share the outer wave; only the outermost call
2248    /// drives the drain. Re-entrant calls from inside an `emit`/fn (the wave
2249    /// engine's own `s.in_tick` re-entrance) compose with this method
2250    /// transparently — they observe `in_tick = true` and skip drain just
2251    /// like nested `batch()`.
2252    ///
2253    /// On panic inside `f`, the `BatchGuard` returned by the internal
2254    /// `begin_batch` call drops normally and discards pending tier-3+ work
2255    /// (subscribers do not observe the half-built wave). See
2256    /// [`Core::begin_batch`] for the RAII variant if you need explicit control
2257    /// over the scope boundary.
2258    pub fn batch<F>(&self, f: F)
2259    where
2260        F: FnOnce(),
2261    {
2262        let _guard = self.begin_batch();
2263        f();
2264    }
2265
2266    /// RAII batch handle — opens a wave when constructed, drains on drop.
2267    ///
2268    /// Mirrors the closure-based [`Self::batch`] but exposes the scope
2269    /// boundary so callers can compose batches with non-`FnOnce` control
2270    /// flow (e.g. async-state-machine code paths, or splitting setup and
2271    /// drain across helper functions).
2272    ///
2273    /// ```ignore
2274    /// let g = core.begin_batch();
2275    /// core.emit(state_a, h1);
2276    /// core.emit(state_b, h2);
2277    /// drop(g); // wave drains here
2278    /// ```
2279    ///
2280    /// Like the closure form, nested `begin_batch` calls share the outer
2281    /// wave (only the outermost guard drains).
2282    #[must_use = "BatchGuard drains the wave on drop; assign to a named binding"]
2283    pub fn begin_batch(&self) -> BatchGuard {
2284        // Slice Y1 / Phase E (2026-05-08): closure-form batch has no known
2285        // seed; per session-doc Q7 / D092 it MUST serialize against every
2286        // currently-existing partition. Acquire each partition's
2287        // `wave_owner` in ascending [`SubgraphId`] order via the retry-
2288        // validate primitive. Same-thread re-entry passes through each
2289        // ReentrantMutex transparently; cross-thread waves on any of the
2290        // touched partitions block until our `_wave_guards` drop.
2291        //
2292        // Trade-off (documented v1 contract): closure-form batch is the
2293        // serialization point under per-partition parallelism. Per-seed
2294        // entry points (`Core::subscribe`, [`Self::begin_batch_for`])
2295        // acquire only the touched partitions and run truly parallel
2296        // for disjoint partitions. Closure-form is intentionally slower
2297        // — its all-partitions semantic is what makes the closure's
2298        // `core.emit(s1, ...); core.emit(s2, ...)` coalesce regardless
2299        // of which partitions s1 / s2 live in.
2300        let partition_boxes = self.all_partitions_lock_boxes();
2301        let mut wave_guards: SmallVec<[crate::node::WaveOwnerGuard; 4]> = SmallVec::new();
2302        for (sid, _box) in &partition_boxes {
2303            // Use the partition's root NodeId as the lock_for retry seed.
2304            // SubgraphId.raw() == root NodeId.raw(); the root is always
2305            // registered in the X5 / Phase-E substrate (cleanup_node is
2306            // gated, Phase G activates).
2307            let representative = crate::handle::NodeId::new(sid.raw());
2308            wave_guards.push(self.partition_wave_owner_lock_arc(representative));
2309        }
2310        self.begin_batch_with_guards(wave_guards)
2311    }
2312
2313    /// Begin a batch scoped to the partitions transitively touched from
2314    /// `seed`. Walks `s.children` (downstream cascade) + `meta_companions`
2315    /// (R1.3.9.d TEARDOWN cascade) starting at `seed`, collects every
2316    /// reachable partition, and acquires each in ascending
2317    /// [`crate::subgraph::SubgraphId`] order via
2318    /// [`Core::partition_wave_owner_lock_arc`].
2319    ///
2320    /// Two threads with disjoint touched-partition sets run truly
2321    /// parallel — the per-partition `wave_owner` mutexes don't block
2322    /// each other. This is the canonical Y1 parallelism win for
2323    /// per-seed wave-driving entry points (subscribe, emit, pause,
2324    /// resume, invalidate, complete, error, teardown,
2325    /// set_deps push-on-subscribe).
2326    ///
2327    /// Slice Y1 / Phase E (2026-05-08).
2328    #[must_use = "BatchGuard drains the wave on drop; assign to a named binding"]
2329    pub fn begin_batch_for(&self, seed: crate::handle::NodeId) -> BatchGuard {
2330        let touched = self.compute_touched_partitions(seed);
2331        let mut wave_guards: SmallVec<[crate::node::WaveOwnerGuard; 4]> = SmallVec::new();
2332        for sid in &touched {
2333            let representative = crate::handle::NodeId::new(sid.raw());
2334            wave_guards.push(self.partition_wave_owner_lock_arc(representative));
2335        }
2336        self.begin_batch_with_guards(wave_guards)
2337    }
2338
2339    /// Internal helper: claim `in_tick` and assemble a [`BatchGuard`]
2340    /// with the supplied (already-acquired) partition wave-owner guards.
2341    /// `wave_guards` MUST be in ascending [`crate::subgraph::SubgraphId`]
2342    /// order (the canonical lock-acquisition order) — both
2343    /// [`Self::begin_batch`] (all-partitions) and
2344    /// [`Self::begin_batch_for`] (touched-partitions) construct the
2345    /// vector in that order before calling here.
2346    fn begin_batch_with_guards(
2347        &self,
2348        wave_guards: SmallVec<[crate::node::WaveOwnerGuard; 4]>,
2349    ) -> BatchGuard {
2350        let owns_tick = {
2351            let mut s = self.lock_state();
2352            let was_in = s.in_tick;
2353            if !was_in {
2354                s.in_tick = true;
2355            }
2356            !was_in
2357        };
2358        BatchGuard {
2359            core: self.clone(),
2360            owns_tick,
2361            _wave_guards: wave_guards,
2362            _not_send: std::marker::PhantomData,
2363        }
2364    }
2365}
2366
/// RAII guard returned by [`Core::begin_batch`] and [`Core::begin_batch_for`].
///
/// While alive, suppresses per-emit wave drains — multiple `emit` /
/// `complete` / `error` / `teardown` / `invalidate` calls coalesce into one
/// wave. On drop:
/// - Outermost guard: drains the wave (fires sinks, runs cleanup, clears
///   in-tick).
/// - Nested guard (an outer `BatchGuard` or an in-progress wave already owns
///   the in-tick flag): skips the drain. Its partition wave-owner guards are
///   still released when the guard's fields drop.
///
/// If the thread is panicking when the outermost guard drops (e.g. the
/// closure body of [`Core::batch`] panicked), the drop path discards pending
/// tier-3+ delivery rather than firing sinks (avoids cascading panics).
/// Subscribers observe **no tier-3+ delivery for the panicked wave**.
/// State-node cache writes that already executed inside the closure are
/// rolled back via wave-cache snapshots — `cache_of(s)` returns the pre-
/// panic value. The atomicity guarantee covers both sink-observability and
/// cache state.
///
/// # Thread safety
///
/// `BatchGuard` is **`!Send`** by design. `begin_batch` claims the
/// per-`Core` `in_tick` flag AND the per-partition `wave_owner`
/// re-entrant mutex(es) on the calling thread; sending the guard to
/// another thread and dropping it there would clear `in_tick` and
/// release the wave-owner guards from a different thread than the
/// one that acquired them, breaking both the thread-local "I own
/// the wave scope" semantic and `parking_lot::ReentrantMutex`'s
/// ownership invariant. The `_wave_guards` field is a `SmallVec` of
/// `!Send` `ArcReentrantMutexGuard<()>`; the `PhantomData<*const ()>`
/// marker is belt-and-suspenders.
///
/// Slice Y1 / Phase E (2026-05-08): the field migrated from a single
/// `ArcReentrantMutexGuard` (legacy Core-global `wave_owner`) to a
/// `SmallVec` of partition wave-owner guards. Closure-form
/// `begin_batch` acquires every current partition (serialization
/// point); `begin_batch_for(seed)` acquires only the transitively-
/// touched partitions (parallel for disjoint sets).
///
/// ```compile_fail
/// use graphrefly_core::{BatchGuard, BindingBoundary, Core, DepBatch, FnId, FnResult, HandleId, NodeId};
/// use std::sync::Arc;
///
/// struct Stub;
/// impl BindingBoundary for Stub {
///     fn invoke_fn(&self, _: NodeId, _: FnId, _: &[DepBatch]) -> FnResult {
///         FnResult::Noop { tracked: None }
///     }
///     fn custom_equals(&self, _: FnId, _: HandleId, _: HandleId) -> bool { false }
///     fn release_handle(&self, _: HandleId) {}
/// }
/// fn requires_send<T: Send>(_: T) {}
/// let core = Core::new(Arc::new(Stub) as Arc<dyn BindingBoundary>);
/// let guard = core.begin_batch();
/// requires_send(guard); // <- compile_fail: BatchGuard is !Send.
/// ```
// NOTE(review): a `compile_fail` doctest passes on ANY compile error.
// This file also calls `retain_handle`, `cleanup_for` and `wipe_ctx` on
// the binding. If any of those lack a default impl on `BindingBoundary`,
// the `Stub` above fails to compile for that reason, and the `!Send`
// check is never actually exercised. Confirm they have defaults, or
// consider `compile_fail,E0277` to pin the expected error.
#[must_use = "BatchGuard drains the wave on drop; assign to a named binding"]
pub struct BatchGuard {
    /// Handle to the Core whose wave this guard scopes; used on drop to
    /// drain (or discard) the wave.
    core: Core,
    /// `true` iff this guard claimed `in_tick` when it was created (it was
    /// previously `false`). Only such a guard drains on drop; nested guards
    /// return early.
    owns_tick: bool,
    /// Re-entrant mutex guards held for the wave's duration. One entry
    /// per touched partition's `wave_owner`, in ascending
    /// [`crate::subgraph::SubgraphId`] order. Drop releases each guard
    /// (any order — `parking_lot::ReentrantMutex` doesn't care since all
    /// are held by the same thread). Cross-thread waves on any of the
    /// held partitions block until our scope ends; cross-thread waves
    /// on partitions NOT in this vector run truly parallel — the
    /// canonical Y1 parallelism property.
    ///
    /// Each `ArcReentrantMutexGuard<()>` is `!Send`, so the `SmallVec`
    /// (and thus `BatchGuard`) is `!Send` at the type level — sending
    /// across threads would violate `parking_lot::ReentrantMutex`'s
    /// thread-ownership invariant.
    _wave_guards: SmallVec<[crate::node::WaveOwnerGuard; 4]>,
    /// Raw-pointer marker that makes the type `!Send` (and `!Sync`)
    /// regardless of the guard types above.
    _not_send: std::marker::PhantomData<*const ()>,
}
2442
impl Drop for BatchGuard {
    /// Close the batch scope.
    ///
    /// - Nested guard (`owns_tick == false`): returns immediately. The
    ///   partition wave-owner guards in `_wave_guards` are still released by
    ///   the automatic field drop that follows this body.
    /// - Outermost guard while the thread is panicking: discards the wave.
    ///   It takes `pending_notify` and the deferred flush jobs, clears
    ///   pending fires, restores wave-cache snapshots, clears wave state,
    ///   drops queued cleanup hooks and eager wipes, and resets `in_tick`.
    ///   After the lock drops it releases every owed handle retain. No sink
    ///   fires.
    /// - Outermost guard on normal exit: runs `drain_and_flush`. Under the
    ///   lock it then clears wave state, resets `in_tick`, commits
    ///   wave-cache snapshots and drains the deferred queues. Finally it
    ///   calls `fire_deferred` lock-released.
    fn drop(&mut self) {
        if !self.owns_tick {
            return;
        }
        if std::thread::panicking() {
            // Discard pending wave work to avoid firing sinks during
            // unwind (sink panic during unwind would abort the process).
            //
            // Refcount discipline: pending_notify entries (or any
            // already-collected deferred_handle_releases from a partial
            // drain) hold a queue_notify-time retain on every payload
            // handle. Release them here so the discard doesn't leak.
            //
            // Wave-cache snapshots restore pre-panic cache values so the
            // atomicity guarantee covers state, not just observability.
            //
            // NOTE(review): `pending_pause_overflow` is not taken explicitly
            // here. Presumably `clear_wave_state` discards it (the D061 note
            // below cites it as the panic-discard precedent). Confirm.
            let (pending, deferred_releases, restored_releases) = {
                let mut s = self.core.lock_state();
                let pending = std::mem::take(&mut s.pending_notify);
                let _: DeferredJobs = std::mem::take(&mut s.deferred_flush_jobs);
                s.pending_fires.clear();
                let restored = self.core.restore_wave_cache_snapshots(&mut s);
                // clear_wave_state pushes batch-handle releases into
                // deferred_handle_releases, so take it AFTER the clear.
                s.clear_wave_state();
                let deferred_releases = std::mem::take(&mut s.deferred_handle_releases);
                // Slice E2 (D061): panic-discard wave drops queued
                // OnInvalidate cleanup hooks SILENTLY. Bindings using
                // OnInvalidate for external-resource cleanup MUST
                // idempotent-cleanup at process exit / next successful
                // invalidate. Mirrors A3 `pending_pause_overflow`
                // panic-discard precedent.
                let _: Vec<(crate::handle::NodeId, crate::boundary::CleanupTrigger)> =
                    std::mem::take(&mut s.deferred_cleanup_hooks);
                // Slice E2 /qa Q2(b) (D069): same panic-discard discipline
                // for the eager-wipe queue. A panic-discarded wave drops
                // queued `wipe_ctx` fires silently; the binding-side
                // `NodeCtxState` entry remains until the next successful
                // terminate-with-no-subs cycle (or until `Core` drops).
                // This mirrors D061's external-resource-cleanup gap and
                // is documented similarly.
                let _: Vec<crate::handle::NodeId> = std::mem::take(&mut s.pending_wipes);
                s.in_tick = false;
                (pending, deferred_releases, restored)
            };
            // Lock dropped — release retains lock-released so the binding
            // can't deadlock against an internal binding mutex.
            // NOTE(review): a panicking `release_handle` here (while already
            // unwinding) would abort the process. This assumes bindings never
            // panic in `release_handle`.
            for entry in pending.values() {
                for msg in entry.iter_messages() {
                    if let Some(h) = msg.payload_handle() {
                        self.core.binding.release_handle(h);
                    }
                }
            }
            for h in deferred_releases {
                self.core.binding.release_handle(h);
            }
            // Handles handed back by `restore_wave_cache_snapshots`.
            for h in restored_releases {
                self.core.binding.release_handle(h);
            }
            return;
        }
        // Successful drain — drain_and_flush manages its own locking.
        // NOTE(review): `drain_and_flush` runs user fn callbacks
        // lock-released (see the module docs). If it panics, everything
        // below is skipped, so `in_tick` stays `true` and the wave state is
        // not cleared. Confirm `drain_and_flush` restores state on unwind,
        // or that Core is considered poisoned after such a panic.
        self.core.drain_and_flush();
        // Wave cleanup + extract deferred jobs under the lock.
        let (jobs, releases, cleanup_hooks, pending_wipes) = {
            let mut s = self.core.lock_state();
            s.clear_wave_state();
            s.in_tick = false;
            self.core.commit_wave_cache_snapshots(&mut s);
            // `drain_deferred` takes `deferred_flush_jobs` +
            // `deferred_handle_releases` (incl. rotation releases pushed
            // by `clear_wave_state` above) + Slice E2
            // `deferred_cleanup_hooks` + Slice E2 /qa Q2(b)
            // `pending_wipes`.
            Core::drain_deferred(&mut s)
        };
        // Lock dropped — fire deferred sinks + release retains + fire
        // cleanup hooks (Slice E2 OnInvalidate, D060 catch_unwind drain)
        // + fire eager wipes (D069).
        self.core
            .fire_deferred(jobs, releases, cleanup_hooks, pending_wipes);
    }
}