Skip to main content

graphrefly_core/
node.rs

1//! The dispatcher — node registration, subscription, wave engine.
2//!
3//! Mirrors `~/src/graphrefly-ts/src/__experiments__/handle-core/core.ts`
4//! (the Phase 13.6 brainstorm prototype, ~370 lines, 22 invariant tests).
5//!
6//! # Scope (M1 dispatcher + Slice A+B parity, closed 2026-05-05)
7//!
8//! - State + derived + dynamic node registration.
9//! - Subscribe / unsubscribe with push-on-subscribe (R1.2.3).
10//! - RAII [`Subscription`] with Drop-based deregister (§10.12).
11//! - DIRTY → DATA / RESOLVED ordering (R1.3.1.b two-phase push).
12//! - Equals-substitution (R1.3.2): identity is zero-FFI; custom crosses boundary.
13//! - First-run gate (R2.5.3) — fn does not fire until every dep has a handle.
14//! - Diamond resolution — one fn fire per wave even with shared upstream.
15//! - `set_deps()` atomic dep mutation with cycle detection + Phase 13.8 Q1
16//!   terminal-rejection policy (R3.3.1).
17//! - PAUSE / RESUME with lockId set + replay buffer (R1.2.6, R2.6, §10.2).
18//! - INVALIDATE broadcast + cascade with R1.4 idempotency.
19//! - COMPLETE / ERROR cascade + Lock 2.B auto-cascade gating
20//!   (ERROR dominates COMPLETE; first error wins).
21//! - TEARDOWN auto-precedes COMPLETE (R2.6.4 / Lock 6.F) +
22//!   `has_received_teardown` idempotency.
23//! - Meta TEARDOWN ordering (R1.3.9.d) — companions tear down before parent.
24//! - Resubscribable terminal lifecycle (R2.2.7, R2.5.3) — late subscribe to a
25//!   resubscribable terminal node resets lifecycle, except after TEARDOWN
26//!   (per F3 audit guard: TEARDOWN is permanent).
27//!
28//! # Module split (Slice C-1, 2026-05-05)
29//!
30//! Wave-engine internals (drain loop, fire selection, emission commit, sink
31//! dispatch) live in [`crate::batch`]. The split is purely organizational —
32//! the methods are still on `Core`. See `batch.rs` for the wave-engine
33//! entry points (`run_wave`, `drain_and_flush`, `commit_emission`,
34//! `queue_notify`, `deliver_data_to_consumer`).
35//!
36//! # Out of scope (later slices / milestones)
37//!
38//! - Deactivation cleanup (RAM nodes clear cache when sink count → 0) — M2.
39//!
40//! See [`migration-status.md`](../../../docs/migration-status.md) for the
41//! milestone tracker and [`porting-deferred.md`](../../../docs/porting-deferred.md)
42//! for surfaced concerns deferred to evidence-driven slices.
43//!
44//! # Re-entrance discipline (Slice A close, M1: fully lock-released)
45//!
46//! - **Wave-end sink fires** drop the state lock first. A subscriber's sink
47//!   that calls back into `Core::emit` / `pause` / `resume` / `invalidate` /
48//!   `complete` / `error` / `teardown` re-acquires the lock cleanly and runs
49//!   a nested wave (`WaveState::in_tick` is cleared before the
50//!   deferred-fire phase).
51//! - **`BindingBoundary::invoke_fn`** fires lock-released. The wave engine
52//!   acquires + drops the state lock per fn-fire iteration around the
53//!   `invoke_fn` callback. User fns may re-enter `Core::emit` / `pause` /
54//!   etc. and run a nested wave.
55//! - **`BindingBoundary::custom_equals`** fires lock-released.
56//!   `commit_emission` brackets the equals check around a lock release;
57//!   custom equals oracles may re-enter Core safely.
58//! - **Subscribe-time handshake** also fires lock-released. [`Core::subscribe`]
59//!   acquires the [`Core::wave_owner`] re-entrant mutex first (cross-thread
60//!   serialization), installs the sink under the state lock, drops the state
61//!   lock, then fires the per-tier handshake (`[Start]` / `[Data(cache)]?` /
62//!   `[Complete]?` / `[Error(h)]?` / `[Teardown]?` per R1.3.5.a) lock-released.
63//!   A handshake-time sink callback may re-enter Core (`emit` / `complete` /
64//!   `error` / `subscribe`); same-thread re-entry passes through `wave_owner`
65//!   transparently. Cross-thread emits block on `wave_owner` until the
66//!   subscribe path drops it, preserving R1.3.5.a happens-after ordering.
67
68use std::collections::VecDeque;
69use std::panic::{catch_unwind, AssertUnwindSafe};
70use std::sync::{Arc, Weak};
71
72use ahash::{AHashMap as HashMap, AHashSet as HashSet};
73use parking_lot::{ArcReentrantMutexGuard, Mutex, MutexGuard};
74
/// Held guard from `parking_lot::ReentrantMutex::lock_arc()` on a
/// partition's `wave_owner`. `!Send` per `parking_lot::ReentrantMutex`'s
/// thread-affinity contract (the inner guard is `!Send`; the wrapper
/// inherits) — the type-level `!Send` flows into
/// [`crate::batch::BatchGuard::_wave_guards`] so any attempt to send
/// the batch guard across threads fails to compile.
///
/// **Phase H+ option (d) limited variant (2026-05-09):** the guard's
/// [`Drop`] releases `sid` in the [`held_partitions`] thread-local so
/// the ascending-order check on the next acquire sees the correct
/// "currently held" set. Re-entrant acquires (same thread, same
/// partition) increment a refcount in the thread-local; each drop
/// decrements it and the final drop removes the entry.
///
/// Slice Y1 / Phase E (2026-05-08); Phase H+ wrapper (2026-05-09).
pub(crate) struct WaveOwnerGuard {
    /// Partition whose `wave_owner` this guard holds; handed to
    /// `held_partitions::release` when the guard drops.
    ///
    /// Drop order: Rust runs the wrapper's `Drop::drop` body FIRST
    /// (releasing the `held_partitions` entry) and only afterwards
    /// drops the fields, so `inner` releases the parking_lot lock after
    /// the thread-local bookkeeping has been updated. This ordering is
    /// guaranteed by the language and does NOT depend on field
    /// declaration order; declaration order only fixes the order in
    /// which the fields themselves are dropped (`sid` before `inner`),
    /// which is immaterial here because `sid` is a plain `Copy` id.
    sid: crate::subgraph::SubgraphId,
    /// `#[allow(dead_code)]`: the inner guard is held to keep the
    /// parking_lot::ReentrantMutex acquired for the wave's duration;
    /// it's never read, only its `Drop` matters.
    #[allow(dead_code)]
    inner: ArcReentrantMutexGuard<parking_lot::RawMutex, parking_lot::RawThreadId, ()>,
}
105
106impl Drop for WaveOwnerGuard {
107    fn drop(&mut self) {
108        // `held_partitions::release` returns `bool was_outermost`. The
109        // outermost-release signal is consumed by [`crate::batch::BatchGuard::drop`]
110        // for the per-thread `TIER3_EMITTED_THIS_WAVE` clear (D1 patch,
111        // 2026-05-09 — Slice G coalescing tracker is keyed by thread,
112        // not by partition, so the clear lives on `BatchGuard` not here).
113        // We discard the bool — no per-guard cleanup remains.
114        let _ = held_partitions::release(self.sid);
115        // `inner` drops automatically after this — releases the
116        // parking_lot::ReentrantMutex (decrementing parking_lot's
117        // own internal re-entry counter; only the FINAL release
118        // unparks waiters).
119    }
120}
121
122/// Phase H+ option (d) limited variant — thread-local infrastructure
123/// for cross-partition acquire-during-fire / cross-partition
124/// acquire-during-wave deadlock detection (extended /qa N1(a)
125/// 2026-05-09 to cover sink callbacks + any nested in-wave acquire).
126///
127/// **Protocol invariant enforced:** whenever this thread already
128/// holds at least one partition wave_owner (HELD non-empty), every
129/// NEW partition this thread tries to acquire must have an id
130/// strictly greater than every partition currently held. Re-entrant
131/// acquires (same thread, same partition that's already held) bypass
132/// the check (they're fine — `parking_lot::ReentrantMutex` allows
133/// same-thread re-entry transparently).
134///
135/// Without this check, two threads each doing nested cross-partition
136/// acquires within an active wave could form an AB/BA cycle: thread A
137/// holds X, attempts Y (Y < X); concurrently thread B holds Y,
138/// attempts X (X > Y, ascending-OK from B's POV). A's acquire on Y
139/// blocks behind B; B's acquire on X blocks behind A. Cycle.
140///
141/// **Single carve-out — producer build/project closures:** producer-
142/// pattern operator activation (`zip` / `concat` / `race` /
143/// `take_until` / `switch_map` / `exhaust_map` / `concat_map` /
144/// `merge_map`) runs build closures inside `BindingBoundary::invoke_fn`
145/// that legitimately subscribe to upstream sources cross-partition.
146/// Refactoring those operators to defer their inner subscribes to
147/// wave-end is the broader "Phase H+ STRICT variant" carry-forward
148/// (see `docs/porting-deferred.md`). For the limited variant, the
149/// thread-local [`IN_PRODUCER_BUILD`] refcount is incremented when a
150/// producer node enters its FiringGuard scope and the H+ check is
151/// suppressed for the duration. All other call paths (derived /
152/// dynamic user fns, sink callbacks, subscribe handshakes, drop
153/// cleanup) DO see the check.
154///
155/// **What this widening (/qa N1(a) 2026-05-09) catches that the
156/// original `fire_depth > 0` gate did NOT:**
157/// - Sink callbacks fired by `flush_notifications` lock-released
158///   AFTER drain — the outer wave's wave_owner is still held but
159///   no FiringGuard is on the stack. A sink that calls
160///   `core.subscribe(node_in_lower_partition, sink)` would have
161///   bypassed the original gate; now caught.
162/// - Subscribe-handshake-time cross-partition operations (similar
163///   shape — the handshake fires lock-released).
164/// - Any future re-entry path that doesn't go through `FiringGuard`
165///   (e.g., `Subscription::Drop` cleanup paths, R3.7-style graph
166///   destroy cascades).
167mod held_partitions {
168    use crate::subgraph::SubgraphId;
169    use smallvec::SmallVec;
170    use std::cell::RefCell;
171
    /// Inline-storage capacity for the per-thread held-partitions set.
    /// 4 mirrors the same inline limit used elsewhere in the codebase
    /// (`compute_touched_partitions` returns `SmallVec<[SubgraphId; 4]>`,
    /// `BatchGuard::_wave_guards` is `SmallVec<[WaveOwnerGuard; 4]>`).
    /// In the typical wave a single thread holds 1–3 partitions; holding
    /// more than `HELD_INLINE` at once spills to the heap, which costs
    /// an allocation but is still correct.
    const HELD_INLINE: usize = 4;

    thread_local! {
        /// Currently-held partitions on this thread, as `(SubgraphId, refcount)`
        /// pairs in arbitrary order. The refcount mirrors
        /// `parking_lot::ReentrantMutex`'s internal counter (we can't query
        /// parking_lot's directly).
        ///
        /// Invariant: an entry is present only while its refcount is ≥ 1 —
        /// `check_and_acquire` inserts new entries with a count of 1 and
        /// `release` removes an entry as soon as its count reaches 0.
        ///
        /// `SmallVec<[_; HELD_INLINE]>` over `BTreeMap<_, _>`: under the
        /// expected workload (≤4 partitions held simultaneously per wave) the
        /// inline-storage SmallVec keeps the entire set in stack memory with
        /// no allocation, no Box-per-node, and contiguous cache layout. Linear
        /// scans are branch-predictable and faster than BTreeMap's O(log n)
        /// pointer-chasing for tiny N. Phase J post-widening bench
        /// (`migration-status.md` 2026-05-09) reported 14–25% overhead vs the
        /// pre-widening baseline, attributed in part to BTreeMap allocation
        /// costs on the hot path. This swap recovers part of that overhead.
        ///
        /// Bookkeeping is unconditional — every acquire bumps the
        /// refcount, every release decrements. The CHECK gate
        /// (`!held.is_empty() && !in_producer_build()`, plus the
        /// "not already held" re-entrancy bypass) is what
        /// distinguishes "first-time acquire on a fresh thread"
        /// (allowed, held empty) from "nested acquire while we
        /// already hold something" (must be ascending).
        static HELD: RefCell<SmallVec<[(SubgraphId, u32); HELD_INLINE]>>
            = const { RefCell::new(SmallVec::new_const()) };

        /// Per-thread "we're inside a producer build/project closure"
        /// refcount, starting at 0. Incremented by
        /// `producer_build_enter` and decremented by
        /// `producer_build_exit`; producer-pattern operator activation
        /// increments it on `FiringGuard::new`, and the producer-sink
        /// wrapper does the same around operator-internal sink
        /// callbacks. Their closures call cross-partition
        /// `Core::subscribe` legitimately (operator-internal setup,
        /// not user-fn re-entry).
        /// The H+ check is suppressed while this counter is non-zero.
        ///
        /// Tracked as a refcount (not a bool) because nested producer
        /// activations are theoretically possible (e.g., a producer
        /// whose build closure subscribes to ANOTHER producer that
        /// also activates) and must balance correctly. The increment
        /// uses `checked_add(1)` so an unbounded recursion (a real
        /// bug — protocol cascades are bounded by
        /// `MAX_BATCH_DRAIN_ITERATIONS`) panics instead of silently
        /// saturating and disabling the check.
        ///
        /// /qa N1(a) (2026-05-09): replaced the original `FIRE_DEPTH`
        /// thread-local. The previous design gated the H+ check on
        /// fire_depth > 0; this restricted coverage to the
        /// FiringGuard-wrapped invoke_fn scope. The new design
        /// inverts: gate on `held non-empty AND !in_producer_build`.
        /// Catches sink-callback / handshake / drop-cleanup paths
        /// the previous gate missed.
        static IN_PRODUCER_BUILD: std::cell::Cell<u32> = const { std::cell::Cell::new(0) };
    }
230
231    /// Increment the producer-build refcount. Called from
232    /// `FiringGuard::new` ONLY for `is_producer()` nodes, AND from
233    /// `ProducerCtx::subscribe_to`'s wrapped sink (so producer-
234    /// internal sink callbacks also suppress the H+ check).
235    ///
236    /// # Panics
237    ///
238    /// Panics if the per-thread refcount would overflow `u32::MAX`.
239    /// This indicates an unbounded recursion through producer build /
240    /// sink dispatch — a real bug. Safer to surface than to silently
241    /// saturate and disable the check or produce inverted Drop
242    /// semantics. The protocol's `MAX_BATCH_DRAIN_ITERATIONS` cap
243    /// makes this overflow unreachable in practice.
244    pub fn producer_build_enter() {
245        IN_PRODUCER_BUILD.with(|c| {
246            let next = c.get().checked_add(1).expect(
247                "in_producer_build refcount overflow — unbounded \
248                 producer-build re-entrance. Should be bounded by the \
249                 protocol's MAX_BATCH_DRAIN_ITERATIONS cap.",
250            );
251            c.set(next);
252        });
253    }
254
255    /// Decrement the producer-build refcount. Called from
256    /// `FiringGuard::drop` ONLY for guards that incremented in `new`,
257    /// AND from `ProducerSinkGuard::drop` in the producer-sink
258    /// wrapper. Saturates on underflow (would indicate Drop without
259    /// matching `new` — recovery via no-op is safer than panicking
260    /// in Drop).
261    pub fn producer_build_exit() {
262        IN_PRODUCER_BUILD.with(|c| c.set(c.get().saturating_sub(1)));
263    }
264
265    /// Currently inside a producer build/project closure on this thread?
266    fn in_producer_build() -> bool {
267        IN_PRODUCER_BUILD.with(|c| c.get() > 0)
268    }
269
270    /// Phase H+ check + bookkeeping. Called BEFORE acquiring the
271    /// partition's parking_lot::ReentrantMutex.
272    ///
273    /// Panics with a clear diagnostic if:
274    /// - HELD is non-empty (this thread already holds ≥1 partition),
275    /// - AND we're NOT inside a producer build closure,
276    /// - AND `sid` is NOT already held by this thread,
277    /// - AND `sid <= max(currently held)`.
278    ///
279    /// Otherwise: increments the refcount for `sid` (creating the
280    /// entry if needed) and returns. The caller MUST pair every
281    /// call with a [`release`] when the guard drops.
282    ///
283    /// **Important note on cross-thread vs same-thread:** this check
284    /// is a SAME-THREAD invariant — it catches a thread acquiring
285    /// out of order from itself. Cross-thread AB/BA cycles between
286    /// threads with disjoint same-thread acquisition orders are
287    /// prevented at a different layer (the
288    /// `compute_touched_partitions` upfront-acquire-all-ascending
289    /// rule in `Core::begin_batch_for`). This thread-local check
290    /// adds the layer that prevents a same-thread descending acquire
291    /// from creating the FIRST half of a cross-thread cycle.
292    pub(crate) fn check_and_acquire(sid: SubgraphId) {
293        HELD.with(|h| {
294            let mut held = h.borrow_mut();
295            // Gate: held non-empty (we're nested) AND not in producer
296            // build/sink (the v1 carve-out for operator activation +
297            // producer-internal sink callbacks). First-time acquires
298            // on a fresh thread (held empty) skip the check — there's
299            // nothing to compare against.
300            let already_held = held.iter().any(|(s, _)| *s == sid);
301            if !held.is_empty() && !in_producer_build() && !already_held {
302                // Linear-scan max over the inline storage (typical N ≤ 4).
303                // Branch-predictable and cache-local; no allocation.
304                if let Some(max_held) = held.iter().map(|(s, _)| *s).max() {
305                    if sid <= max_held {
306                        // Drop the borrow before panicking so unwind
307                        // doesn't see a still-borrowed RefCell.
308                        let new_id = sid;
309                        drop(held);
310                        panic!(
311                            "Phase H+ ascending-order violation: thread tried \
312                             to acquire partition {new_id:?} while already \
313                             holding partition {max_held:?}. \
314                             The same-thread cross-partition lock-acquisition \
315                             protocol requires every NEW partition acquired \
316                             while ANY partition is already held to have id \
317                             strictly greater than every already-held \
318                             partition; otherwise two threads doing \
319                             reciprocal acquires can form an AB/BA deadlock.\n\
320                             \n\
321                             Note: this check is per-thread. A cross-thread \
322                             AB/BA cycle between threads each obeying \
323                             ascending order at the per-thread level is \
324                             prevented at a different layer — the \
325                             `compute_touched_partitions` upfront-acquire-\
326                             all-ascending rule in `Core::begin_batch_for`.\n\
327                             \n\
328                             Common triggers (see docs/porting-deferred.md \
329                             'Cross-partition acquire-during-fire deadlock'):\n\
330                             - A user fn (derived / dynamic) that calls \
331                             `Core::emit` / `complete` / `error` / `teardown` \
332                             / `invalidate` mid-fire on a node in a partition \
333                             with a smaller id than the firing node's.\n\
334                             - A sink callback that calls `Core::subscribe` \
335                             on a node in a smaller-id partition while the \
336                             outer wave's wave_owner is still held.\n\
337                             - A subscribe handshake (or Drop cleanup) that \
338                             re-enters Core on a smaller-id partition.\n\
339                             \n\
340                             Fix: schedule the cross-partition operation \
341                             OUTSIDE the wave (e.g., via a deferred queue \
342                             applied at wave end) so the acquire happens at \
343                             top-level rather than nested under a held \
344                             partition; OR declare the cross-partition \
345                             reachability upfront via `add_meta_companion` \
346                             so the wave acquires both partitions ascending \
347                             at top-level and the inner re-entry becomes a \
348                             re-entrant acquire on a held partition."
349                        );
350                    }
351                }
352            }
353            // Bookkeeping: increment refcount. `checked_add(1)` so
354            // overflow surfaces (would indicate an unbounded
355            // re-entrance — a real bug). Linear find-or-push.
356            if let Some((_, count)) = held.iter_mut().find(|(s, _)| *s == sid) {
357                *count = count.checked_add(1).expect(
358                    "held_partitions refcount overflow — unbounded \
359                     same-partition re-entrance. Should be bounded by the \
360                     protocol's MAX_BATCH_DRAIN_ITERATIONS cap.",
361                );
362            } else {
363                held.push((sid, 1));
364            }
365        });
366    }
367
368    /// Decrement the refcount for `sid`; remove the entry if it
369    /// hits zero. Called from [`super::WaveOwnerGuard::drop`] AND
370    /// from the retry / panic paths in
371    /// [`super::Core::partition_wave_owner_lock_arc`] to ensure the
372    /// refcount stays balanced under all unwind / retry / exhaust
373    /// paths.
374    ///
375    /// Returns `true` iff this release brought the partition's
376    /// refcount on this thread to zero — i.e. this was the OUTERMOST
377    /// guard for `sid` on this thread. [`super::WaveOwnerGuard::drop`]
378    /// uses this signal to clear per-partition wave state (Q3) on
379    /// outermost release only; inner re-entrant guard drops must NOT
380    /// clear (a containing wave is still active and still holds
381    /// in-flight wave state). The `partition_wave_owner_lock_arc`
382    /// retry / panic paths ignore the return value because the
383    /// partition state hadn't been touched yet on those paths
384    /// (clearing would be a no-op anyway).
385    pub(crate) fn release(sid: SubgraphId) -> bool {
386        HELD.with(|h| {
387            let mut held = h.borrow_mut();
388            if let Some(idx) = held.iter().position(|(s, _)| *s == sid) {
389                let count = &mut held[idx].1;
390                // /qa A3 fix (2026-05-09): debug_assert the refcount is
391                // non-zero before decrement. A `release(sid)` on an
392                // entry with `count == 0` indicates a bookkeeping bug
393                // (a release without a matching `check_and_acquire`,
394                // or a logic error in caller), but the legacy
395                // saturating_sub silently returned `was_outermost=true`
396                // and removed the entry — masking the bug. Surface
397                // in dev/test builds; release builds preserve the
398                // saturating behavior.
399                debug_assert!(
400                    *count > 0,
401                    "held_partitions::release({sid:?}): refcount underflow — \
402                     release without matching check_and_acquire (caller bug)"
403                );
404                *count = count.saturating_sub(1);
405                if *count == 0 {
406                    // `swap_remove` is O(1) and order-irrelevant: the
407                    // CHECK gate computes max via linear scan and does
408                    // not depend on iteration order.
409                    held.swap_remove(idx);
410                    return true;
411                }
412            } else {
413                // /qa A3 fix (2026-05-09): same intent — release on a
414                // sid that's not in HELD is a bookkeeping bug. Surface
415                // in dev/test builds.
416                debug_assert!(
417                    false,
418                    "held_partitions::release({sid:?}): sid not in HELD — \
419                     double-drop or stray release (caller bug)"
420                );
421            }
422            false
423        })
424    }
425
426    /// Test-only: read the current thread's held-partitions snapshot.
427    /// Used by post-panic regression assertions to verify the
428    /// thread-local stays clean even when the H+ check unwinds the
429    /// stack (so cargo's thread-reuse doesn't propagate corrupted
430    /// state to subsequent tests). `pub` (gated by
431    /// `cfg(any(test, debug_assertions))`) so integration tests
432    /// outside the crate can read it.
433    #[cfg(any(test, debug_assertions))]
434    #[must_use]
435    pub fn held_snapshot_for_tests() -> Vec<(SubgraphId, u32)> {
436        // /qa A2 fix (2026-05-09): sort by SubgraphId so the snapshot
437        // returns ascending order — matches the BTreeMap-iteration
438        // contract that pre-/qa SmallVec swap consumers might rely on.
439        // Test consumers currently only assert `is_empty()`, but the
440        // ordered shape is the safer default for future tests that
441        // assert specific entries.
442        let mut v: Vec<(SubgraphId, u32)> = HELD.with(|h| h.borrow().to_vec());
443        v.sort_unstable_by_key(|(s, _)| *s);
444        v
445    }
446
447    /// Test-only: read the current thread's producer-build refcount.
448    /// Companion to [`held_snapshot_for_tests`] for verifying the
449    /// thread-local stays clean post-panic.
450    #[cfg(any(test, debug_assertions))]
451    #[must_use]
452    pub fn in_producer_build_for_tests() -> u32 {
453        IN_PRODUCER_BUILD.with(std::cell::Cell::get)
454    }
455}
456
457/// `pub` re-exports for the `graphrefly-operators` crate to wrap
458/// producer-internal sinks with the same `IN_PRODUCER_BUILD` flag
459/// the FiringGuard uses (per /qa N1(a) — operator sinks are
460/// operator-internal and SUPPRESS the H+ check, mirroring the
461/// activation-time carve-out). Not part of the v1 stable user API
462/// surface; intended for in-workspace consumers only. Phase H+
463/// STRICT variant (the producer-architecture refactor) will
464/// eliminate the need for this carve-out.
465pub use held_partitions::{producer_build_enter, producer_build_exit};
466
467/// Test-only re-exports for integration tests under
468/// `crates/graphrefly-core/tests/`. Gated `#[cfg(any(test, debug_assertions))]`
469/// so they don't leak into release builds. Public visibility is
470/// required because integration tests live outside the crate.
471#[cfg(any(test, debug_assertions))]
472pub use held_partitions::{held_snapshot_for_tests, in_producer_build_for_tests};
473use smallvec::SmallVec;
474use thiserror::Error;
475
476use crate::boundary::{BindingBoundary, CleanupTrigger};
477use crate::clock::monotonic_ns;
478use crate::handle::{FnId, HandleId, LockId, NodeId, NO_HANDLE};
479use crate::message::Message;
480
/// Terminal-lifecycle state — once set on a node, the node will not emit
/// further DATA; per-dep slots on consumers also use this to track which
/// upstreams have terminated (R1.3.4 / Lock 2.B).
///
/// `Error` carries a [`HandleId`] resolving to the error value. Refcount is
/// retained when the variant is stored in a node's `terminal` slot or any
/// consumer's `dep_terminals` slot; v1 does not release these (terminal
/// state is one-shot at this layer; release happens on resubscribable
/// terminal-lifecycle reset, a separate slice).
///
/// The type is `Copy` and compares by value; two `Error` values are equal
/// exactly when their handles are equal.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum TerminalKind {
    /// Terminated via COMPLETE; carries no payload.
    Complete,
    /// Terminated via ERROR; the [`HandleId`] resolves to the error value.
    Error(HandleId),
}
495
/// Node kind discriminant — **derived metadata** computed from
/// [`NodeRecord`]'s field shape (D030 unification, Slice D).
///
/// Core no longer stores `kind` as a field; it's computed on demand from
/// `(deps.is_empty(), fn_id.is_some(), op.is_some(), is_dynamic)`,
/// mirroring TS's data model where `NodeImpl` has no `_kind` field. The
/// shape uniquely identifies the kind (`-` means the flag is not
/// consulted for that row):
///
/// | deps      | fn_id | op   | is_dynamic | kind     |
/// |-----------|-------|------|------------|----------|
/// | empty     | None  | None | -          | State    |
/// | empty     | Some  | None | -          | Producer |
/// | non-empty | Some  | None | false      | Derived  |
/// | non-empty | Some  | None | true       | Dynamic  |
/// | non-empty | None  | Some | -          | Operator |
///
/// Public API ([`Core::kind_of`]) derives this enum on each call. State
/// nodes are ROM (cache survives deactivation); compute nodes
/// (Derived / Dynamic / Operator) and producers are RAM.
///
/// The enum is `Copy`, so it is passed around by value (see
/// [`NodeKind::skips_auto_cascade`]).
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum NodeKind {
    /// Source node: cache is intrinsic, no fn, no deps. Mutated via [`Core::emit`].
    State,
    /// Producer node: fn fires once on first subscribe. No deps;
    /// emissions arrive via sinks the fn subscribes to (zip / concat /
    /// race / takeUntil pattern). Slice D / D031.
    Producer,
    /// Derived node: fn fires on every dep change; all deps tracked.
    Derived,
    /// Dynamic node: fn declares which dep indices it actually read this run.
    /// Untracked dep updates flow through cache but do NOT re-fire fn.
    Dynamic,
    /// Operator node: built-in dispatch path for transform / combine /
    /// flow / resilience operators. The `OperatorOp` discriminant selects
    /// the per-operator FFI path ([`BindingBoundary::project_each`] etc.);
    /// Core manages per-operator state via the generic `op_scratch` slot
    /// on `NodeRecord` (D026). Per Slice C-1 (D009) / Slice C-3 (D026).
    Operator(OperatorOp),
}
535
536impl NodeKind {
537    /// True if this kind opts OUT of Lock 2.B auto-cascade. Operator(Reduce)
538    /// and Operator(Last) must intercept upstream COMPLETE so they can emit
539    /// their accumulator / buffered value before the cascade terminates them;
540    /// instead of cascading, terminate_node queues such children for fn-fire
541    /// so `fire_operator` can handle the terminal.
542    pub(crate) fn skips_auto_cascade(self) -> bool {
543        matches!(
544            self,
545            NodeKind::Operator(OperatorOp::Reduce { .. } | OperatorOp::Last { .. })
546        )
547    }
548}
549
/// Built-in operator discriminant. Selects the per-operator dispatch path
/// in `fire_operator` (`crates/graphrefly-core/src/batch.rs`). Each variant
/// carries only the registration-time parameters its dispatch path needs —
/// binding-side closure ids, seed / default handles, or an item count
/// (`Merge` carries nothing). Core stores no user values itself per the
/// handle-protocol cleaving plane.
///
/// The type is `Copy`: every payload is an id, a handle, or a `u32`.
#[derive(Copy, Clone, Eq, PartialEq, Debug)]
pub enum OperatorOp {
    /// `map(source, project)` — element-wise transform. Calls
    /// `BindingBoundary::project_each(fn_id, &inputs)` per fire; emits each
    /// returned handle via `commit_emission_verbatim` (R1.3.2.d batch
    /// semantics — no equals substitution between batch entries).
    Map { fn_id: FnId },
    /// `filter(source, predicate)` — silent-drop selection (D012/D018).
    /// Calls `BindingBoundary::predicate_each(fn_id, &inputs)`; emits each
    /// passing input verbatim. If zero pass on a wave that dirtied the
    /// node, queues a single `RESOLVED` to settle (D018).
    Filter { fn_id: FnId },
    /// `scan(source, fold, seed)` — left-fold emitting each new accumulator.
    /// `seed` is captured at registration; `acc` lives in
    /// [`ScanState`](super::op_state::ScanState) inside
    /// [`NodeRecord::op_scratch`] and persists across waves until
    /// resubscribable reset. Calls `BindingBoundary::fold_each(fn_id, acc,
    /// &inputs) -> SmallVec<HandleId>` per fire.
    Scan { fn_id: FnId, seed: HandleId },
    /// `reduce(source, fold, seed)` — left-fold emitting once on upstream
    /// COMPLETE. Accumulates silently while source DATA flows; on
    /// dep[0].terminal == Some(Complete), emits `[Data(acc), Complete]`.
    /// On `Error(h)`, propagates the error verbatim. Opts out of Lock 2.B
    /// auto-cascade (see `NodeKind::skips_auto_cascade`).
    Reduce { fn_id: FnId, seed: HandleId },
    /// `distinctUntilChanged(source, equals)` — suppresses adjacent
    /// duplicates. Calls `BindingBoundary::custom_equals(equals_fn_id,
    /// prev, current)` per input; emits non-equal items verbatim and
    /// updates `prev`. If zero items pass on a wave that dirtied the node,
    /// queues `RESOLVED` (matches Filter discipline).
    DistinctUntilChanged { equals_fn_id: FnId },
    /// `pairwise(source)` — emits `(prev, current)` pairs starting after
    /// the second value. First value swallowed (sets `prev`). Calls
    /// `BindingBoundary::pairwise_pack(fn_id, prev, current)` per pair to
    /// produce the binding-side tuple handle.
    Pairwise { fn_id: FnId },

    // ----- Slice C-2: multi-dep combinators (D020) -----
    /// `combine(...sources)` — N-dep combineLatest. On any dep fire, packs
    /// the latest handle per dep into a single tuple handle via
    /// `BindingBoundary::pack_tuple(pack_fn, &handles)`. First-run gate
    /// (`partial: false` default) holds until all deps deliver real DATA
    /// (R2.5.3). COMPLETE cascades when all deps complete (R1.3.4.b).
    Combine { pack_fn: FnId },

    /// `withLatestFrom(primary, secondary)` — 2-dep, fire-on-primary-only
    /// (D021, Phase 10.5). Packs `[primary, secondary]` via
    /// `BindingBoundary::pack_tuple(pack_fn, &handles)` when dep[0]
    /// (primary) has DATA in the wave. If only dep[1] (secondary) fires,
    /// settles with RESOLVED (D018 pattern). First-run gate holds until
    /// both deps deliver (R2.5.3 `partial: false`). Post-warmup INVALIDATE
    /// guard: if secondary `prev_data == NO_HANDLE` and batch empty after
    /// warmup, settles with RESOLVED (no stale pair).
    WithLatestFrom { pack_fn: FnId },

    /// `merge(...sources)` — N-dep, forward all DATA handles verbatim
    /// (D022). Zero FFI on fire: no transformation, no binding call.
    /// Each dep's batch handles are retained and emitted individually.
    /// COMPLETE cascades when all deps complete (R1.3.4.b).
    Merge,

    // ----- Slice C-3: flow operators (D024) -----
    /// `take(source, count)` — emits the first `count` DATA values then
    /// self-completes via `Core::complete`. Tracks `count_emitted` in
    /// [`TakeState`](super::op_state::TakeState). When upstream completes
    /// before `count` is reached, the standard auto-cascade propagates
    /// COMPLETE. `count == 0` is allowed: the first fire emits zero
    /// items then immediately self-completes (D027).
    Take { count: u32 },

    /// `skip(source, count)` — drops the first `count` DATA values; once
    /// the threshold is crossed, subsequent DATAs pass through verbatim.
    /// Tracks `count_skipped` in [`SkipState`](super::op_state::SkipState).
    /// On a wave where every input is still in the skip window, queues
    /// DIRTY+RESOLVED to settle (D018 pattern).
    Skip { count: u32 },

    /// `takeWhile(source, predicate)` — emits while `predicate(input)`
    /// holds; on the first `false`, emits any preceding passes then
    /// self-completes via `Core::complete`. Reuses
    /// [`BindingBoundary::predicate_each`] (D029); after the first
    /// `false`, subsequent inputs in the same batch are dropped.
    TakeWhile { fn_id: FnId },

    /// `last(source)` / `last_with_default(source, default)` — buffers
    /// the latest DATA; on upstream COMPLETE, emits `Data(latest)` then
    /// `Complete`. The `default` field is `NO_HANDLE` for the no-default
    /// factory (emits only `Complete` on empty stream), or a registered
    /// default handle (emits `Data(default)` + `Complete` on empty
    /// stream). Storage: [`LastState`](super::op_state::LastState) holds
    /// `latest` (live buffer) and `default` (registration-time, stable).
    /// Opts out of Lock 2.B auto-cascade so it can intercept upstream
    /// COMPLETE.
    Last { default: HandleId },
}
650
/// Registration options for [`Core::register_operator`].
///
/// `equals` controls operator output dedup (R5.7 — defaults to identity).
/// `partial` controls the R2.5.3 first-run gate (R5.4 — operator dispatch
/// fires on first DATA from any dep when `true`; default `false` matches
/// the gated derived discipline).
#[derive(Copy, Clone, Debug)]
pub struct OperatorOpts {
    /// Dedup mode applied to the operator's output (R5.7). The `Default`
    /// impl uses [`EqualsMode::Identity`].
    pub equals: EqualsMode,
    /// First-run gate switch (R2.5.3 / R5.4). `true` lets the operator fire
    /// on the first DATA from any dep; `false` (the `Default` value) keeps
    /// it gated.
    pub partial: bool,
}
662
663impl Default for OperatorOpts {
664    fn default() -> Self {
665        Self {
666            equals: EqualsMode::Identity,
667            partial: false,
668        }
669    }
670}
671
/// Closure-form fn id OR typed operator discriminant — the two dispatch
/// paths a node can use. State / passthrough nodes pass `None` to
/// [`Core::register`] (no fn at all), which is why
/// [`NodeRegistration::fn_or_op`] wraps this type in an `Option`.
#[derive(Copy, Clone, Debug)]
pub enum NodeFnOrOp {
    /// Closure-form: invokes [`BindingBoundary::invoke_fn`] per fire.
    /// Used for Derived / Dynamic / Producer.
    Fn(FnId),
    /// Typed-op: routes to a `fire_op_*` helper that calls per-operator
    /// FFI methods (`project_each` / `predicate_each` / `fold_each` /
    /// `pairwise_pack` / `pack_tuple`). Used for Operator nodes.
    Op(OperatorOp),
}
685
/// Pause behavior mode (canonical-spec §2.6 — three modes shipped in TS;
/// Slice F audit, 2026-05-07 — closed the Rust port gap).
///
/// | Mode | Outgoing tier-3 routing while paused | RESUME behavior |
/// |---|---|---|
/// | [`PausableMode::Default`] | suppress fn-fire upstream (no DIRTY emitted) | fire fn ONCE on RESUME if any dep delivered DATA during pause; collapses N pause-window writes into one settle |
/// | [`PausableMode::ResumeAll`] | buffer outgoing tier-3 / tier-4 messages per-wave | replay each buffered wave verbatim on RESUME |
/// | [`PausableMode::Off`] | dispatcher ignores PAUSE; tier-3 flushes immediately | no-op (no buffer to drain) |
///
/// Default is [`PausableMode::Default`] per canonical §2.6 — every untagged
/// source picks it up. Memory profile is O(1) per node (no buffer); the
/// trade-off is "subscribers see one consolidated DATA on RESUME" rather
/// than the K mid-pause emissions verbatim.
///
/// Note: tier-1 (DIRTY) / tier-2 (PAUSE/RESUME) / tier-5 (COMPLETE/ERROR) /
/// tier-6 (TEARDOWN) bypass pause regardless of mode — they remain
/// observable so leaked pause-controllers cannot strand subscribers.
///
/// Naming note: the variant called `Default` is also the value produced by
/// the derived `Default` trait impl (it carries the `#[default]`
/// attribute), so `PausableMode::default() == PausableMode::Default`.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub enum PausableMode {
    /// Suppress fn-fire while paused; fire once on RESUME if any dep
    /// delivered DATA during the pause window. Canonical default.
    #[default]
    Default,
    /// Buffer outgoing tier-3 / tier-4 messages per-wave; replay on
    /// RESUME. Use when subscribers need verbatim emit history (e.g. an
    /// audit log, replay-on-reconnect bridge).
    ResumeAll,
    /// Dispatcher ignores PAUSE for this node — tier-3 flushes
    /// immediately even while a lock is held. Use for nodes whose value
    /// production is intrinsically pause-immune (telemetry counters,
    /// monotonic timers).
    Off,
}
719
/// Node options for [`Core::register`]. Cross-kind config knobs live
/// here; per-kind specifics (deps, fn_or_op) live on
/// [`NodeRegistration`].
///
/// Every field has a default (see the `Default` impl), so a single knob
/// can be overridden with struct-update syntax:
/// `NodeOpts { partial: true, ..NodeOpts::default() }`.
#[derive(Copy, Clone, Debug)]
pub struct NodeOpts {
    /// Initial cached value. Only valid for state nodes (no deps + no
    /// fn + no op). [`NO_HANDLE`] starts the node sentinel.
    pub initial: HandleId,
    /// Equality mode for outgoing emissions (R1.3.2). Defaults to
    /// [`EqualsMode::Identity`].
    pub equals: EqualsMode,
    /// First-run gate (R2.5.3 / D011). When `true`, the node fires as
    /// soon as ANY dep delivers a real handle; when `false` (default),
    /// the node holds until every dep has delivered.
    pub partial: bool,
    /// Dynamic flag (R2.5.3) — fn declares actually-tracked dep indices
    /// per fire. Only meaningful when `fn_or_op == Some(Fn(_))` AND
    /// deps non-empty.
    pub is_dynamic: bool,
    /// Pause behavior mode (canonical §2.6). Default is
    /// [`PausableMode::Default`]. See [`PausableMode`] for the trade-offs.
    pub pausable: PausableMode,
    /// Replay buffer cap (canonical R2.6.5 / Lock 6.G — Slice E1, 2026-05-07).
    /// `None` (default) disables; `Some(N)` keeps a circular buffer of the
    /// last N DATA emissions and replays them to late subscribers as part
    /// of the per-tier handshake (between [`Message::Start`] and any
    /// terminal slice). Only DATA is buffered; RESOLVED entries are NOT
    /// (R2.6.5 explicit "DATA only").
    pub replay_buffer: Option<usize>,
}
750
751impl Default for NodeOpts {
752    fn default() -> Self {
753        Self {
754            initial: NO_HANDLE,
755            equals: EqualsMode::Identity,
756            partial: false,
757            is_dynamic: false,
758            pausable: PausableMode::Default,
759            replay_buffer: None,
760        }
761    }
762}
763
/// Unified node-registration descriptor (D030, Slice D).
///
/// All node kinds (State / Producer / Derived / Dynamic / Operator)
/// register through [`Core::register`] with a `NodeRegistration`. The
/// kind is **derived from the field shape** of the registration —
/// `(deps.is_empty(), fn_or_op variant)`:
///
/// | deps      | fn_or_op  | is_dynamic | resulting kind |
/// |-----------|-----------|------------|----------------|
/// | empty     | None      | -          | State          |
/// | empty     | Some(Fn)  | -          | Producer       |
/// | non-empty | Some(Fn)  | false      | Derived        |
/// | non-empty | Some(Fn)  | true       | Dynamic        |
/// | non-empty | Some(Op)  | -          | Operator       |
///
/// Shapes not in the table:
///
/// - empty deps + `Some(Op)` is rejected with
///   [`RegisterError::OperatorWithoutDeps`].
/// - NOTE(review): non-empty deps + `None` (the "passthrough" shape the
///   `fn_or_op` field doc mentions) has no row here — confirm which kind
///   [`Core::register`] assigns and add it.
///
/// The sugar wrappers ([`Core::register_state`], [`Core::register_producer`],
/// etc.) build a `NodeRegistration` and delegate.
#[derive(Clone, Debug)]
pub struct NodeRegistration {
    /// Upstream deps in declaration order. Empty for state / producer.
    pub deps: Vec<NodeId>,
    /// Closure-form fn id or typed-op discriminant. `None` for state /
    /// passthrough.
    pub fn_or_op: Option<NodeFnOrOp>,
    /// Cross-kind config knobs.
    pub opts: NodeOpts,
}
791
/// Equality mode for a node's outgoing emissions.
///
/// `Identity` is the default: cache vs. new handle compare is a `u64` equal —
/// zero FFI. `Custom` invokes [`BindingBoundary::custom_equals`] every check
/// (R1.3.2.b two-arg call when both sides are non-sentinel).
#[derive(Copy, Clone, Debug)]
pub enum EqualsMode {
    /// Compare handles by id; never crosses the binding boundary.
    Identity,
    /// Delegate the comparison to the binding-side equality fn registered
    /// under this id.
    Custom(FnId),
}
802
/// Internal identifier for a single subscription. Allocated per
/// [`Core::subscribe`] call. Wrapped by [`Subscription`] for the public API;
/// consumed directly only by Core internals and the `Drop` impl of
/// [`Subscription`], which uses it as the key to remove from the node's
/// subscriber map.
#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
pub(crate) struct SubscriptionId(u64);
809
/// RAII subscription handle.
///
/// Returned by [`Core::subscribe`]. While the handle is held, the sink stays
/// registered against its node. Dropping the handle (explicitly via
/// `drop(sub)` or implicitly at scope exit) unsubscribes the sink — no manual
/// `unsubscribe()` call is needed. Per §10.12 of the rust-port session doc.
///
/// # Lifetime semantics
///
/// The subscription holds a [`Weak`] reference back to the Core's state. If
/// the Core is dropped before the subscription, the Drop impl is a silent
/// no-op (the sink has nowhere to deregister from anyway). This avoids a
/// reference cycle when subscribers capture an `Arc<Core>` in their closure.
///
/// # Thread safety
///
/// `Send + Sync`. The handle can be moved across threads or dropped from
/// any thread.
///
/// # Not Clone
///
/// `Subscription` owns the unsubscribe action exclusively. Cloning would
/// require either "first drop wins" or "last drop wins" semantics, both
/// of which surprise. If a binding needs multiple deregistration handles,
/// it should subscribe multiple times (each producing a fresh handle) or
/// wrap the single `Subscription` in `Arc<Mutex<Option<Subscription>>>`.
#[must_use = "dropping a Subscription unsubscribes its sink immediately"]
pub struct Subscription {
    /// Weak back-reference to the Core state; upgraded in `Drop`, which
    /// becomes a no-op when the upgrade fails.
    state: Weak<Mutex<CoreState>>,
    /// Node the sink is registered against.
    node_id: NodeId,
    /// Key of this sink in that node's subscriber map.
    sub_id: SubscriptionId,
}
842
843impl Subscription {
844    /// The node this subscription is attached to.
845    #[must_use]
846    pub fn node_id(&self) -> NodeId {
847        self.node_id
848    }
849}
850
impl Drop for Subscription {
    /// Deregisters this sink and, when it was the node's last subscriber,
    /// runs the deactivation hooks on the binding.
    ///
    /// Sequence:
    /// 1. Upgrade the weak Core-state reference; return if the Core is gone.
    /// 2. Under the state lock: remove `sub_id` from the node's subscriber
    ///    map, bump `subscribers_revision`, and compute which hooks apply.
    ///    Returns early if the node record no longer exists.
    /// 3. After the lock guard has gone out of scope, and only if the
    ///    subscriber map is now empty: `cleanup_for(OnDeactivation)`, then
    ///    `producer_deactivate`, then `wipe_ctx` — each gated by its own
    ///    flag computed in step 2.
    fn drop(&mut self) {
        // Silent no-op if Core is gone. This keeps Drop infallible (no panics
        // from a dropped subscription racing a dropped Core) and avoids
        // surprising users with errors on shutdown.
        //
        // Producer deactivation (Slice D, D031): if removing this sub
        // empties the subscribers map AND the node is a producer, fire
        // `BindingBoundary::producer_deactivate(node_id)` AFTER releasing
        // the state lock. The binding then drops its per-node state
        // (subscriptions to upstream sources, captured closure state),
        // which transitively unsubs from upstreams via their own
        // `Subscription::Drop`. Re-entrance into Core from the deactivate
        // hook is permitted since the lock is released first.
        let Some(state) = self.state.upgrade() else {
            return;
        };
        // Slice E2 (D056): when the last subscriber drops, fire the
        // node's OnDeactivation cleanup hook BEFORE producer_deactivate
        // (cleanup may release handles the producer subscription owns;
        // reverse order would let producer_deactivate drop subs that user
        // cleanup expected to be live). Both calls are lock-released per
        // D045.
        //
        // OnDeactivation gating (D068, QA Q3 fix): fires only when the
        // node has fired its fn at least once AND has a fn (`fn_id`
        // populated). State nodes have no fn — they cannot register a
        // cleanup spec via the production fn-return path (R2.4.5), so
        // firing `cleanup_for` on them is wasted FFI; the binding's
        // lookup is guaranteed to find no `current_cleanup`. Skipping
        // here saves the FFI hop and matches the design-doc wording
        // ("never-fired state nodes" — state-with-initial-value satisfies
        // `has_fired_once = true` but still has no fn).
        //
        // Slice E2 /qa Q2(b) (D069): if the node is a resubscribable
        // node that's ALREADY terminal (terminate fired BEFORE this last
        // sub drop), fire `wipe_ctx` lock-released AFTER OnDeactivation
        // + producer_deactivate. Mutually exclusive with `terminate_node`'s
        // queue-wipe site: terminate-with-empty-subs goes through
        // `pending_wipes`; terminate-with-live-subs routes here when
        // those subs eventually drop. Either path fires exactly one
        // wipe per terminal lifecycle.
        //
        // The block below is the only region that holds the state lock; it
        // hands the decisions out as a tuple so the hooks run unlocked.
        let (was_last_sub, is_producer, has_user_cleanup, fire_wipe, binding) = {
            let mut s = state.lock();
            let Some(rec) = s.nodes.get_mut(&self.node_id) else {
                return;
            };
            rec.subscribers.remove(&self.sub_id);
            // Slice X4 / D2: bump revision so any pending_notify entry for
            // this node opened earlier in the wave starts a fresh batch on
            // the next queue_notify, dropping the now-departed sink from
            // the snapshot.
            rec.subscribers_revision = rec.subscribers_revision.wrapping_add(1);
            let last = rec.subscribers.is_empty();
            let producer = rec.is_producer();
            // OnDeactivation gate: must have run a fn at least once
            // (has_fired_once) AND have a fn registered (fn_id.is_some()).
            // The fn_id check excludes state nodes whose has_fired_once
            // tracks initial-value status, not "user fn ran."
            let user_cleanup = rec.has_fired_once && rec.fn_id.is_some();
            let fire_wipe = last && rec.resubscribable && rec.terminal.is_some();
            // Clone the binding Arc out only if at least one hook will
            // fire. Cheap (Arc::clone) in the common path; skipped on
            // non-last-sub or never-fired non-producer nodes.
            let binding = if last && (producer || user_cleanup || fire_wipe) {
                Some(s.binding.clone())
            } else {
                None
            };
            (last, producer, user_cleanup, fire_wipe, binding)
        };
        // `binding` is `Some` only when `last` was true, so the outer check
        // is redundant with the inner one; both are kept as written.
        if was_last_sub {
            if let Some(binding) = binding {
                if has_user_cleanup {
                    binding.cleanup_for(self.node_id, CleanupTrigger::OnDeactivation);
                }
                if is_producer {
                    binding.producer_deactivate(self.node_id);
                }
                // D069: eager wipe — fires AFTER OnDeactivation so the
                // user closure observes pre-wipe `store` (matches the
                // existing "OnDeactivation runs before wipe on terminal
                // reset" invariant covered by test 10). Idempotent —
                // `HashMap::remove` on absent key is a no-op, so even
                // if the wave already drained `pending_wipes` earlier,
                // this fire is benign.
                if fire_wipe {
                    binding.wipe_ctx(self.node_id);
                }
            }
        }
    }
}
944
945// Compile-time assertion that Subscription is Send + Sync. If a future field
946// breaks this, the build fails here rather than downstream at the binding
947// site.
948const _: fn() = || {
949    fn assert_send_sync<T: Send + Sync>() {}
950    assert_send_sync::<Subscription>();
951};
952
/// A subscriber callback. `Send + Sync` so the Core can fire it from any
/// thread; `Fn` (not `FnMut`) so multiple references coexist — capture
/// mutable state in `Mutex<T>` or atomics on the binding side.
///
/// Each invocation receives a borrowed slice of [`Message`]s and returns
/// nothing. The `Arc` wrapper lets the same callback be shared by cloning
/// the pointer rather than the closure.
pub type Sink = Arc<dyn Fn(&[Message]) + Send + Sync>;
957
958// ---------------------------------------------------------------------------
959// PAUSE/RESUME state — §10.2 of the rust-port session doc
960// ---------------------------------------------------------------------------
961
/// Per-node pause state.
///
/// Replaces the four TS fields (`_pauseLocks`, `_pauseBuffer`,
/// `_pauseDroppedCount`, `_pauseStartNs`) with a single enum where
/// the buffered fields are unreachable in the [`Self::Active`] variant —
/// the compiler refuses access. Per §10.2 simplification.
///
/// # Invariants
///
/// - `Active` ⇔ no lockId held.
/// - `Paused { locks, .. }` ⇔ `!locks.is_empty()`.
/// - Buffered messages are tier 3 (DATA/RESOLVED) and tier 4 (INVALIDATE)
///   only. Other tiers pass through immediately even while paused.
/// - `dropped` counts messages that fell out the front of `buffer` due to
///   the Core-global `pause_buffer_cap`; it is reported on resume so callers
///   can detect overflow without re-tracking it externally.
#[derive(Debug)]
pub(crate) enum PauseState {
    /// No lock held. Carries no buffer or counters.
    Active,
    /// At least one lock held. All per-cycle bookkeeping lives here and is
    /// discarded when the last lock is removed.
    Paused {
        /// Active lock holders. `SmallVec` keeps the common 1–2 lock case
        /// stack-allocated. Replaces `Set<unknown>` from TS.
        locks: SmallVec<[LockId; 2]>,
        /// Buffered tier-3/tier-4 outgoing messages, in arrival order.
        /// Replayed on the final RESUME.
        buffer: VecDeque<Message>,
        /// Count of messages dropped from the front when `buffer.len()` would
        /// exceed `pause_buffer_cap`. Cleared on final RESUME (next pause
        /// cycle starts fresh).
        dropped: u32,
        /// Wall-clock-monotonic ns when the lock first transitioned this node
        /// from `Active` to `Paused` (captured via `monotonic_ns()` when the
        /// first lock is added). Used by R1.3.8.c overflow ERROR
        /// synthesis to compute `lock_held_duration_ms` in the diagnostic
        /// payload (Slice F, A3 — 2026-05-07).
        started_at_ns: u64,
        /// True after the first overflow event in this pause cycle has been
        /// reported via [`crate::boundary::BindingBoundary::synthesize_pause_overflow_error`].
        /// Subsequent overflows in the same cycle don't re-emit ERROR
        /// (canonical R1.3.8.c: "once per overflow event"). Cleared on
        /// final RESUME (next pause cycle starts fresh).
        overflow_reported: bool,
        /// Default-mode bookkeeping (Slice F audit close, 2026-05-07).
        /// Set to `true` when an upstream dep delivery arrives while this
        /// node is paused with [`PausableMode::Default`]. On final RESUME,
        /// if `true`, the node is added back to `pending_fires` so the fn
        /// fires once with the consolidated dep state. Always `false` for
        /// `ResumeAll` mode (the buffered messages are the consolidation
        /// mechanism there). Cleared on final RESUME.
        pending_wave: bool,
    },
}
1013
1014impl PauseState {
1015    pub(crate) fn is_paused(&self) -> bool {
1016        matches!(self, Self::Paused { .. })
1017    }
1018
1019    fn lock_count(&self) -> usize {
1020        match self {
1021            Self::Active => 0,
1022            Self::Paused { locks, .. } => locks.len(),
1023        }
1024    }
1025
1026    fn contains_lock(&self, lock_id: LockId) -> bool {
1027        match self {
1028            Self::Active => false,
1029            Self::Paused { locks, .. } => locks.contains(&lock_id),
1030        }
1031    }
1032
1033    /// Add a lock; transitions Active → Paused on first lock. Idempotent on
1034    /// duplicate lock_id (matches TS convention; spec is silent on the case).
1035    fn add_lock(&mut self, lock_id: LockId) {
1036        match self {
1037            Self::Active => {
1038                let mut locks = SmallVec::new();
1039                locks.push(lock_id);
1040                *self = Self::Paused {
1041                    locks,
1042                    buffer: VecDeque::new(),
1043                    dropped: 0,
1044                    started_at_ns: monotonic_ns(),
1045                    overflow_reported: false,
1046                    pending_wave: false,
1047                };
1048            }
1049            Self::Paused { locks, .. } => {
1050                if !locks.contains(&lock_id) {
1051                    locks.push(lock_id);
1052                }
1053            }
1054        }
1055    }
1056
1057    /// Mark that an upstream dep delivered DATA to a node paused with
1058    /// [`PausableMode::Default`]. The node will re-enter `pending_fires`
1059    /// on final RESUME via [`Self::take_pending_wave`].
1060    pub(crate) fn mark_pending_wave(&mut self) {
1061        if let Self::Paused { pending_wave, .. } = self {
1062            *pending_wave = true;
1063        }
1064    }
1065
1066    /// Read and clear the `pending_wave` flag. Called from
1067    /// [`Core::resume`] when transitioning Paused → Active. Returns `true`
1068    /// only if the node was paused with `pending_wave` set.
1069    pub(crate) fn take_pending_wave(&mut self) -> bool {
1070        if let Self::Paused { pending_wave, .. } = self {
1071            std::mem::replace(pending_wave, false)
1072        } else {
1073            false
1074        }
1075    }
1076
1077    /// Remove a lock; if the lockset becomes empty, transition Paused →
1078    /// Active and return the buffered messages for replay (along with the
1079    /// dropped count for diagnostics). Unknown lock_id is an idempotent
1080    /// no-op (matches TS, R1.2.6 implicit).
1081    fn remove_lock(&mut self, lock_id: LockId) -> Option<(VecDeque<Message>, u32)> {
1082        match self {
1083            Self::Active => None,
1084            Self::Paused { locks, .. } => {
1085                if let Some(idx) = locks.iter().position(|l| *l == lock_id) {
1086                    locks.swap_remove(idx);
1087                }
1088                if locks.is_empty() {
1089                    let prev = std::mem::replace(self, Self::Active);
1090                    if let Self::Paused {
1091                        buffer, dropped, ..
1092                    } = prev
1093                    {
1094                        return Some((buffer, dropped));
1095                    }
1096                }
1097                None
1098            }
1099        }
1100    }
1101
1102    /// Append a message to the buffer; if the buffer would exceed `cap`,
1103    /// pop from the front (oldest-first), increment `dropped`, and return
1104    /// the dropped messages so the caller can release any payload handles
1105    /// they reference. `cap` of `None` means unbounded.
1106    ///
1107    /// Returns [`PushBufferedResult`] carrying both the dropped messages
1108    /// (for refcount release) and whether this push triggered the FIRST
1109    /// overflow event in the current pause cycle (for R1.3.8.c ERROR
1110    /// synthesis — the caller schedules a single ERROR per cycle).
1111    ///
1112    /// Note: refcount management for the message's payload handle is the
1113    /// caller's responsibility — see [`Core::queue_notify`] for the
1114    /// retain/release discipline. The buffer itself is just a message
1115    /// container; refcounts cross the binding boundary.
1116    pub(crate) fn push_buffered(&mut self, msg: Message, cap: Option<usize>) -> PushBufferedResult {
1117        let mut result = PushBufferedResult::default();
1118        if let Self::Paused {
1119            buffer,
1120            dropped,
1121            overflow_reported,
1122            ..
1123        } = self
1124        {
1125            buffer.push_back(msg);
1126            if let Some(c) = cap {
1127                while buffer.len() > c {
1128                    if let Some(dropped_msg) = buffer.pop_front() {
1129                        result.dropped_msgs.push(dropped_msg);
1130                    }
1131                    *dropped = dropped.saturating_add(1);
1132                }
1133            }
1134            // R1.3.8.c (Slice F, A3): flag first overflow this cycle.
1135            if !result.dropped_msgs.is_empty() && !*overflow_reported {
1136                *overflow_reported = true;
1137                result.first_overflow_this_cycle = true;
1138            }
1139        }
1140        result
1141    }
1142
1143    /// Snapshot the diagnostic for an R1.3.8.c overflow ERROR synthesis.
1144    /// Returns `(dropped_count, lock_held_ns)`. Caller must already know
1145    /// the configured cap (it's a Core-global value, not per-PauseState).
1146    pub(crate) fn overflow_diagnostic(&self) -> Option<(u32, u64)> {
1147        match self {
1148            Self::Active => None,
1149            Self::Paused {
1150                dropped,
1151                started_at_ns,
1152                ..
1153            } => {
1154                let lock_held_ns = monotonic_ns().saturating_sub(*started_at_ns);
1155                Some((*dropped, lock_held_ns))
1156            }
1157        }
1158    }
1159}
1160
/// Return shape for [`PauseState::push_buffered`]. Carries both the dropped
/// messages (for refcount release) and an "is this the first overflow this
/// cycle" flag (for R1.3.8.c ERROR synthesis scheduling).
///
/// The `Default` value (no dropped messages, flag `false`) is what a push
/// returns when nothing overflowed.
#[derive(Default)]
pub(crate) struct PushBufferedResult {
    /// Messages evicted from the front of the pause buffer by this push,
    /// oldest first. Empty when the buffer stayed within its cap.
    pub(crate) dropped_msgs: Vec<Message>,
    /// `true` only for the first push in a pause cycle that evicted
    /// anything.
    pub(crate) first_overflow_this_cycle: bool,
}
1169
/// Pending R1.3.8.c overflow ERROR synthesis entry. Recorded by
/// [`Core::queue_notify`] when the pause buffer first overflows in a cycle;
/// drained at wave-end after the lock-released call to
/// [`crate::boundary::BindingBoundary::synthesize_pause_overflow_error`].
///
/// `configured_max` is captured at scheduling time rather than read at
/// drain — the user could change `pause_buffer_cap` between schedule and
/// drain, and the diagnostic reads "the cap that was in effect when the
/// overflow happened."
#[derive(Debug, Clone)]
pub(crate) struct PendingPauseOverflow {
    /// Node whose pause buffer overflowed.
    pub(crate) node_id: NodeId,
    /// Number of messages dropped so far in the pause cycle.
    /// NOTE(review): presumably the first element of
    /// `PauseState::overflow_diagnostic` — confirm at the recording site.
    pub(crate) dropped_count: u32,
    /// The `pause_buffer_cap` in effect when the entry was scheduled.
    pub(crate) configured_max: usize,
    /// How long the pause had been held, in nanoseconds.
    /// NOTE(review): presumably the second element of
    /// `PauseState::overflow_diagnostic` — confirm at the recording site.
    pub(crate) lock_held_ns: u64,
}
1186
1187/// Errors returnable by [`Core::pause`] and [`Core::resume`].
1188#[derive(Error, Debug, Clone, PartialEq)]
1189pub enum PauseError {
1190    #[error("pause/resume: unknown node {0:?}")]
1191    UnknownNode(NodeId),
1192}
1193
1194/// Errors returnable by [`Core::up`] (canonical R1.4.1).
1195#[derive(Error, Debug, Clone, PartialEq)]
1196pub enum UpError {
1197    /// Node id is not registered.
1198    #[error("up: unknown node {0:?}")]
1199    UnknownNode(NodeId),
1200    /// Tier-3 (DATA / RESOLVED) and tier-5 (COMPLETE / ERROR) are
1201    /// downstream-only per R1.4.1; rejected at the boundary.
1202    #[error(
1203        "up: tier {tier} is forbidden upstream — value (tier 3) and \
1204         terminal-lifecycle (tier 5) planes are downstream-only per R1.4.1"
1205    )]
1206    TierForbidden { tier: u8 },
1207}
1208
/// Errors returnable by [`Core::register`] and its sugar wrappers
/// ([`Core::register_state`], [`Core::register_producer`],
/// [`Core::register_derived`], [`Core::register_dynamic`],
/// [`Core::register_operator`]).
///
/// Slice H (2026-05-07) promoted these from `assert!`/`panic!` to typed
/// errors so that callers can recover from contract violations without
/// process abort. Every variant corresponds to a construction-time
/// invariant that the caller is responsible for upholding; the dispatcher
/// rejects the registration before any reactive state is created (so
/// there is no `Message::Error` channel through which to surface the
/// failure — these are imperative-layer errors, not reactive ones).
///
/// All variants are zero-side-effect: when [`Core::register`] returns
/// `Err`, no node has been added to the graph and any handle retains
/// taken on the way in (e.g. operator scratch seed retains via
/// [`BindingBoundary::retain_handle`]) have been released.
#[derive(Error, Debug, Clone, PartialEq, Eq)]
pub enum RegisterError {
    /// One of the supplied dep ids is not a registered node.
    #[error("register: unknown dep {0:?}")]
    UnknownDep(NodeId),

    /// `op` was supplied (operator node) but `deps` was empty. Operator
    /// nodes need at least one dep — for subscription-managed combinators
    /// with no declared deps, use [`Core::register_producer`] instead.
    #[error(
        "register: operator nodes require at least one dep — \
         use register_producer for subscription-managed combinators"
    )]
    OperatorWithoutDeps,

    /// [`NodeOpts::initial`] was set to a real handle but the registration
    /// shape is not a state node. In [`NodeRegistration`] terms a state
    /// node is `deps.is_empty() && fn_or_op.is_none()` (no deps, and
    /// neither a closure-form fn nor a typed op). Initial cache only makes
    /// sense for state nodes.
    #[error("register: NodeOpts::initial only valid for state nodes (no deps + no fn + no op)")]
    InitialOnlyForStateNodes,

    /// A supplied dep is terminal (COMPLETE / ERROR) AND not
    /// resubscribable. Adding it would create a permanent wedge — the dep
    /// will never re-emit, so the registered node would be stuck.
    /// Mirrors [`SetDepsError::TerminalDep`] at registration time.
    #[error(
        "register: dep {0:?} is terminal and not resubscribable; \
         mark it resubscribable before terminating, or remove it from the dep list"
    )]
    TerminalDep(NodeId),

    /// A stateful operator ([`OperatorOp::Scan`] / [`OperatorOp::Reduce`])
    /// was registered with `seed = NO_HANDLE`. R2.5.3 first-run gate
    /// requires the seed to be a real handle so that the operator can
    /// emit on its first fire.
    #[error("register: operator seed must be a real handle (R2.5.3); got NO_HANDLE")]
    OperatorSeedSentinel,
}
1265
1266/// Errors returnable by [`Core::set_pausable_mode`].
1267///
1268/// Slice H (2026-05-07) promoted these from `assert!`/`panic!` to typed
1269/// errors. Same imperative-layer error model as [`RegisterError`].
1270#[derive(Error, Debug, Clone, PartialEq, Eq)]
1271pub enum SetPausableModeError {
1272    /// `node_id` is not a registered node.
1273    #[error("set_pausable_mode: unknown node {0:?}")]
1274    UnknownNode(NodeId),
1275    /// The node currently holds at least one pause lock. Changing pausable
1276    /// mode mid-pause would lose buffered content or strand a
1277    /// `pending_wave` flag — resume all locks first.
1278    #[error(
1279        "set_pausable_mode: cannot change pausable mode while paused; \
1280         resume all locks first"
1281    )]
1282    WhilePaused,
1283}
1284
1285/// Per-dep record. Replaces the parallel `deps` / `dep_handles` /
1286/// `dep_terminals` vectors from v1. Canonical spec R2.9.b alignment.
1287///
1288/// Each entry tracks one dep's lifecycle state, wave-scoped batch data,
1289/// and cross-wave `prev_data` for `ctx.prevData` access.
1290pub(crate) struct DepRecord {
1291    /// The dep node this record tracks.
1292    pub(crate) node: NodeId,
1293    /// Last DATA handle from the end of the previous wave. [`NO_HANDLE`]
1294    /// means the dep has never emitted DATA.
1295    pub(crate) prev_data: HandleId,
1296    /// Per-dep dirty flag — awaiting DATA/RESOLVED for current wave.
1297    pub(crate) dirty: bool,
1298    /// Per-dep involved-this-wave flag. Distinguishes:
1299    /// - `involved && data_batch.is_empty()` → dep settled RESOLVED
1300    /// - `!involved && data_batch.is_empty()` → dep was not in this wave
1301    pub(crate) involved_this_wave: bool,
1302    /// DATA handles accumulated this wave. Outside `batch()` scope, at most
1303    /// 1 element. Inside `batch()`, K emits on the source produce K entries
1304    /// per R1.3.6.b coalescing. Each handle holds a `retain_handle` share
1305    /// taken at `deliver_data_to_consumer` time; released at wave-end
1306    /// rotation in `clear_wave_state`.
1307    pub(crate) data_batch: SmallVec<[HandleId; 1]>,
1308    /// Terminal state for this dep. `None` = dep is live.
1309    /// `Some` = dep emitted COMPLETE/ERROR. When ALL entries are Some,
1310    /// the node auto-cascades per Lock 2.B (ERROR dominates COMPLETE).
1311    pub(crate) terminal: Option<TerminalKind>,
1312}
1313
1314impl DepRecord {
1315    fn new(node: NodeId) -> Self {
1316        Self {
1317            node,
1318            prev_data: NO_HANDLE,
1319            dirty: false,
1320            involved_this_wave: false,
1321            data_batch: SmallVec::new(),
1322            terminal: None,
1323        }
1324    }
1325}
1326
1327/// Internal node record. Mirrors `core.ts:132–154` post-D030 unification.
1328///
1329/// **Kind is derived, not stored** (D030, Slice D). `(dep_records.is_empty(),
1330/// fn_id, op, is_dynamic)` uniquely identifies the kind — see [`NodeKind`].
1331/// Helper methods (`is_state()`, `is_producer()`, `is_compute()`,
1332/// `is_operator()`, `skips_auto_cascade()`, `kind()`) cover the common
1333/// predicates without unpacking via [`Core::kind_of`].
1334///
1335/// The 5 bool fields (`has_fired_once`, `dirty`, `involved_this_wave`,
1336/// `has_received_teardown`, `resubscribable`, `is_dynamic`) each represent
1337/// an orthogonal concern. `is_dynamic` is constant per node (set at
1338/// register time); the others are mutable lifecycle state. Collapsing
1339/// them into a bitfield would obscure intent.
1340#[allow(clippy::struct_excessive_bools)]
1341pub(crate) struct NodeRecord {
1342    /// Per-dep records. Replaces the old parallel `deps` / `dep_handles` /
1343    /// `dep_terminals` vecs. Dep NodeIds derived via `dep_ids()`.
1344    pub(crate) dep_records: Vec<DepRecord>,
1345    /// User-fn id for closure-form dispatch. `Some` for Derived / Dynamic /
1346    /// Producer; `None` for State / Operator. (Operator dispatch goes via
1347    /// [`Self::op`] instead.)
1348    pub(crate) fn_id: Option<FnId>,
1349    /// Operator discriminant for typed-op dispatch. `Some` for Operator
1350    /// nodes; `None` otherwise. Mutually exclusive with `fn_id` (a node is
1351    /// either closure-form OR typed-op, never both).
1352    pub(crate) op: Option<OperatorOp>,
1353    /// True for Dynamic nodes (R2.5.3 — fn declares actually-tracked dep
1354    /// indices per fire). False for everything else. Only meaningful when
1355    /// `fn_id.is_some()` AND `!dep_records.is_empty()`.
1356    pub(crate) is_dynamic: bool,
1357    pub(crate) equals: EqualsMode,
1358
1359    // Mutable state
1360    pub(crate) cache: HandleId,
1361    pub(crate) has_fired_once: bool,
1362    pub(crate) subscribers: HashMap<SubscriptionId, Sink>,
1363    /// Monotonic counter bumped on every mutation of [`Self::subscribers`]
1364    /// (insert on subscribe, remove on `Subscription::Drop`, remove on
1365    /// handshake-panic cleanup). Used by
1366    /// [`crate::batch::Core::queue_notify`] to detect mid-wave subscriber-
1367    /// set changes and start a fresh `PendingBatch` with an updated sink
1368    /// snapshot — closes D2 (Slice X4, 2026-05-08): the late-subscriber
1369    /// and multi-emit-per-wave gap where the pre-fix per-node single
1370    /// snapshot meant a sub installed between two emits to the same node
1371    /// in one wave was invisible to the second emit's flush.
1372    ///
1373    /// Per-node (not per-Core) so that a subscribe to node A doesn't
1374    /// invalidate snapshot reuse for node B's pending batch in the same
1375    /// wave.
1376    pub(crate) subscribers_revision: u64,
1377    /// For dynamic nodes: which dep indices fn actually tracks.
1378    /// For static derived: all indices, populated at construction.
1379    pub(crate) tracked: HashSet<usize>,
1380
1381    // Wave-scoped state — cleared at wave end.
1382    pub(crate) dirty: bool,
1383    pub(crate) involved_this_wave: bool,
1384
1385    /// Per-node pause state. Default `Active`. See [`PauseState`].
1386    pub(crate) pause_state: PauseState,
1387    /// Pause behavior mode (canonical-spec §2.6). Set at registration via
1388    /// [`NodeOpts::pausable`]. Default [`PausableMode::Default`] suppresses
1389    /// fn-fire while paused and consolidates N pause-window dep deliveries
1390    /// into one fn-fire on RESUME; `ResumeAll` buffers tier-3/4 outgoing
1391    /// for verbatim replay; `Off` ignores PAUSE entirely. See
1392    /// [`PausableMode`].
1393    pub(crate) pausable: PausableMode,
1394    /// Replay buffer cap (R2.6.5 / Lock 6.G — Slice E1, 2026-05-07).
1395    /// `None` disables; `Some(N)` keeps a circular VecDeque of the last N
1396    /// DATA-handle emissions for late-subscriber replay. Each handle in
1397    /// the buffer owns one binding-side retain share, released on evict
1398    /// (cap exceeded) or in `Drop for CoreState`.
1399    pub(crate) replay_buffer_cap: Option<usize>,
1400    pub(crate) replay_buffer: VecDeque<HandleId>,
1401
1402    /// Terminal lifecycle state for THIS node's outgoing stream. Once set,
1403    /// further `emit` calls are silent no-ops, fn no longer fires, and only
1404    /// the terminal message has been queued downstream.
1405    pub(crate) terminal: Option<TerminalKind>,
1406    /// True after the first TEARDOWN has been processed for this node
1407    /// (R2.6.4 / Lock 6.F). Subsequent TEARDOWN deliveries are idempotent
1408    /// — the auto-prepended COMPLETE only fires on the first one. Without
1409    /// this flag, a redundant TEARDOWN delivered via the cascade plus an
1410    /// explicit `core.teardown(node)` would re-emit `[COMPLETE, TEARDOWN]`
1411    /// to subscribers per delivery, which is incorrect.
1412    pub(crate) has_received_teardown: bool,
1413    /// Per R2.2.7 / R2.5.3 — resubscribable terminal lifecycle.
1414    /// When `true` AND `terminal == Some(...)`, a fresh subscribe call
1415    /// will reset the node: clear `terminal`, `has_fired_once`,
1416    /// `has_received_teardown`, all dep_records to sentinel, and drain the
1417    /// pause lockset. Default `false`.
1418    pub(crate) resubscribable: bool,
1419    /// Meta companion nodes attached to this node per R1.3.9.d. When this
1420    /// node tears down, its meta companions are torn down FIRST (before
1421    /// the main node's auto-COMPLETE + TEARDOWN wire emission), so
1422    /// observers see companions terminate before the parent. The ordering
1423    /// is load-bearing — meta nodes typically subscribe to parent state
1424    /// that becomes inconsistent during the parent's destruction phase.
1425    pub(crate) meta_companions: Vec<NodeId>,
1426    /// R5.4 / D011 partial-mode: when `true`, fire_fn skips the R2.5.3
1427    /// first-run gate — the node fires as soon as ANY dep delivers a
1428    /// real handle, even if other deps remain sentinel. Defaults to
1429    /// `false` (gated). Lifted into Core for operator support; for
1430    /// State/Derived/Dynamic nodes the field is settable but the gated
1431    /// path remains the typical caller default.
1432    pub(crate) partial: bool,
1433    /// Generic per-operator scratch slot (Slice C-3, D026). Replaces
1434    /// the typed `operator_state: HandleId` field used by Slices C-1 / C-2.
1435    /// `None` for non-operator kinds and operators with no cross-wave
1436    /// state (Map / Filter / Combine / WithLatestFrom / Merge); `Some`
1437    /// for stateful operators ([`OperatorOp::Scan`] / [`Reduce`] /
1438    /// [`DistinctUntilChanged`] / [`Pairwise`] / [`Take`] / [`Skip`] /
1439    /// [`TakeWhile`] / [`Last`]).
1440    ///
1441    /// The boxed value implements
1442    /// [`OperatorScratch`](crate::op_state::OperatorScratch); its
1443    /// `release_handles` method is called from
1444    /// [`reset_for_fresh_lifecycle`] (resubscribable terminal cycle) and
1445    /// from [`Drop for CoreState`].
1446    ///
1447    /// **Refcount discipline:** the state struct owns whatever handle
1448    /// shares it stores (e.g., [`ScanState::acc`](crate::op_state::ScanState::acc),
1449    /// [`LastState::latest`](crate::op_state::LastState::latest)).
1450    /// Per-fire helpers retain the new value before releasing the old;
1451    /// `release_handles` releases the current shares at end-of-life.
1452    pub(crate) op_scratch: Option<Box<dyn crate::op_state::OperatorScratch>>,
1453}
1454
1455impl NodeRecord {
1456    // ---- Kind predicates (D030 — derived from field shape) ----
1457
1458    /// True iff this is a state node (no deps, no fn, no op).
1459    pub(crate) fn is_state(&self) -> bool {
1460        self.dep_records.is_empty() && self.fn_id.is_none() && self.op.is_none()
1461    }
1462
1463    /// True iff this is a producer node (no deps + has fn + no op).
1464    /// Producers fire fn once on first subscribe; cleanup fires via
1465    /// [`BindingBoundary::producer_deactivate`] (D031, Slice D).
1466    pub(crate) fn is_producer(&self) -> bool {
1467        self.dep_records.is_empty() && self.fn_id.is_some() && self.op.is_none()
1468    }
1469
1470    /// True iff this is a compute node (Derived / Dynamic / Operator) —
1471    /// has at least one dep AND either a fn or an op.
1472    #[allow(dead_code)] // Convenience predicate; callers may use is_state/is_producer instead.
1473    pub(crate) fn is_compute(&self) -> bool {
1474        !self.dep_records.is_empty() && (self.fn_id.is_some() || self.op.is_some())
1475    }
1476
1477    /// True iff this is an Operator node (has op set).
1478    #[allow(dead_code)] // Direct `op.is_some()` is more common; this is a readability sugar.
1479    pub(crate) fn is_operator(&self) -> bool {
1480        self.op.is_some()
1481    }
1482
1483    /// True iff this node opts OUT of Lock 2.B auto-cascade —
1484    /// Operator(Reduce) / Operator(Last) intercept upstream COMPLETE.
1485    pub(crate) fn skips_auto_cascade(&self) -> bool {
1486        match self.op {
1487            Some(op) => NodeKind::Operator(op).skips_auto_cascade(),
1488            None => false,
1489        }
1490    }
1491
1492    /// Compute the public-API [`NodeKind`] from the field shape (D030).
1493    /// Used by [`Core::kind_of`] and rare internal sites that need the
1494    /// enum (most use the predicate methods above).
1495    pub(crate) fn kind(&self) -> NodeKind {
1496        if let Some(op) = self.op {
1497            NodeKind::Operator(op)
1498        } else if self.dep_records.is_empty() {
1499            if self.fn_id.is_some() {
1500                NodeKind::Producer
1501            } else {
1502                NodeKind::State
1503            }
1504        } else if self.is_dynamic {
1505            NodeKind::Dynamic
1506        } else {
1507            NodeKind::Derived
1508        }
1509    }
1510
1511    // ---- Existing accessors ----
1512
1513    /// Iterator over dep NodeIds in declaration order.
1514    pub(crate) fn dep_ids(&self) -> impl Iterator<Item = NodeId> + '_ {
1515        self.dep_records.iter().map(|r| r.node)
1516    }
1517
1518    /// Collected dep NodeIds — for call sites that need a `Vec<NodeId>`.
1519    pub(crate) fn dep_ids_vec(&self) -> Vec<NodeId> {
1520        self.dep_ids().collect()
1521    }
1522
1523    /// Number of deps.
1524    pub(crate) fn dep_count(&self) -> usize {
1525        self.dep_records.len()
1526    }
1527
1528    /// True if any dep is in sentinel state (never emitted DATA and no
1529    /// data this wave). Replaces the old `dep_handles.contains(&NO_HANDLE)`.
1530    pub(crate) fn has_sentinel_deps(&self) -> bool {
1531        self.dep_records
1532            .iter()
1533            .any(|r| r.prev_data == NO_HANDLE && r.data_batch.is_empty())
1534    }
1535
1536    /// Find the index of a dep by NodeId.
1537    pub(crate) fn dep_index_of(&self, dep_id: NodeId) -> Option<usize> {
1538        self.dep_records.iter().position(|r| r.node == dep_id)
1539    }
1540
1541    /// True if ALL dep terminal slots are populated (Lock 2.B cascade check).
1542    pub(crate) fn all_deps_terminal(&self) -> bool {
1543        !self.dep_records.is_empty() && self.dep_records.iter().all(|r| r.terminal.is_some())
1544    }
1545}
1546
// Q-beyond Sub-slice 1 (D108, 2026-05-09): `CrossPartitionState` removed.
//
// The four wave-scoped fields previously held under
// `Core::cross_partition: Arc<parking_lot::Mutex<CrossPartitionState>>`
// (Q2, 2026-05-09) moved to a per-thread `WaveState` thread_local in
// [`crate::batch`]. The bench-driven rationale is documented at
// [`crate::batch::WaveState`]: cross-thread cache-line bouncing on the
// `cross_partition` mutex was the dominant cost in Phase J's regression,
// not single-thread mutex hop count. Per-thread placement eliminates the
// bounce point entirely.
//
// **Refcount discipline preserved.** `wave_cache_snapshots` and
// `deferred_handle_releases` still hold binding-side handle retains;
// outermost `BatchGuard::drop` drains them via `Core::binding` on both
// success and panic paths (the binding ref the prior
// `CrossPartitionState` held for its `Drop` impl is no longer needed —
// `BatchGuard` already holds a `Core` clone with the binding).
1564
1565/// All mutable Core state, behind one [`parking_lot::Mutex`].
1566///
1567/// **Architecture history.** Q2 (2026-05-09) split four wave-scoped
1568/// cross-partition aggregation fields out into a separate
1569/// `parking_lot::Mutex<CrossPartitionState>` on [`Core`]. Q-beyond
1570/// Sub-slice 1 (D108, 2026-05-09) eliminated `CrossPartitionState`
1571/// entirely — its four fields moved to per-thread `WaveState`
1572/// thread_local in `crate::batch`. Sub-slice 2 + 3 moved 8 more
1573/// wave-scoped fields the same way. /qa F1+F2 (2026-05-10) reverted
1574/// `in_tick` and `currently_firing` BACK to CoreState (per-Core,
1575/// cross-thread visible — load-bearing for cross-Core same-thread
1576/// isolation and cross-thread P13 set_deps check respectively).
1577///
1578/// The D1 patch (2026-05-09) moved Slice G's `tier3_emitted_this_wave`
1579/// set to a per-thread thread-local in `crate::batch` (was briefly
1580/// per-partition under Q3 v1; that placement was vulnerable to mid-wave
1581/// cross-thread `set_deps` partition splits — see
1582/// `docs/porting-deferred.md` "Per-partition state-shard refactor"
1583/// closing summary). Q-beyond
1584/// will continue the shape decomposition by sharding most of the
1585/// remaining fields per-partition.
1586pub(crate) struct CoreState {
1587    pub(crate) next_node_id: u64,
1588    pub(crate) next_subscription_id: u64,
1589    pub(crate) next_lock_id: u64,
1590    pub(crate) nodes: HashMap<NodeId, NodeRecord>,
1591    /// Inverted adjacency: `parent → children`. Updated on registration.
1592    pub(crate) children: HashMap<NodeId, HashSet<NodeId>>,
1593    // Q-beyond Sub-slice 2 (D108, 2026-05-09): `pending_fires` and
1594    // `pending_notify` moved to per-thread
1595    // [`crate::batch::WaveState`]. Both fields are wave-scoped — emit
1596    // populates them on the same thread that drains them at wave end.
1597    // Cross-thread emits block on partition `wave_owner` and land in
1598    // the OTHER thread's wave context, so the per-thread placement is
1599    // safe by construction.
1600    //
1601    // Q-beyond Sub-slice 3 (D108, 2026-05-09): `in_tick`,
1602    // `currently_firing`, `deferred_flush_jobs`, `deferred_cleanup_hooks`,
1603    // `pending_wipes`, and `invalidate_hooks_fired_this_wave` likewise
1604    // moved to [`crate::batch::WaveState`]. Same wave-scoped per-thread
1605    // rationale.
1606    /// Core-global cap on per-node pause replay buffer length. `None` means
1607    /// unbounded. Per the user direction (Q1, 2026-05-05): start core-global;
1608    /// per-node override can be added later as a pure addition without API
1609    /// breakage. Default `None`.
1610    pub(crate) pause_buffer_cap: Option<usize>,
1611    /// Core-global cap on wave-drain iterations before
1612    /// [`crate::batch::Core::drain_and_flush`] aborts with a diagnostic panic.
1613    /// Replaces the prior `MAX_DRAIN_ITERATIONS` hard-coded constant
1614    /// (R4.3 / Lock 2.F′). Default `10_000`.
1615    ///
1616    /// The drain loop bound exists to surface runtime cycles
1617    /// (e.g. an operator that re-arms its own `pending_fires` slot during
1618    /// `invoke_fn`) as a panic with context, rather than letting Core
1619    /// spin forever. Structural cycles via [`Core::set_deps`] are
1620    /// rejected at edge-mutation time (`SetDepsError::WouldCreateCycle`);
1621    /// registration is structurally cycle-safe by construction (the new
1622    /// node's id is not allocated until AFTER deps are validated, so deps
1623    /// cannot transitively reach the new node). The drain bound is the
1624    /// safety net for runtime cycles that bypass both static checks.
1625    pub(crate) max_batch_drain_iterations: u32,
1626    /// Wave-active flag — true between outermost `BatchGuard` entry and
1627    /// the matching outermost drop. Per-Core (cross-thread visible) so
1628    /// cross-Core same-thread BatchGuard nesting is correctly isolated:
1629    /// Core-A's `in_tick=true` does NOT cause Core-B's `begin_batch` to
1630    /// see itself as nested.
1631    ///
1632    /// /qa F1 reverted (2026-05-10): briefly placed on per-thread
1633    /// `WaveState` in Sub-slice 3, then moved BACK to `CoreState` after
1634    /// /qa F1 surfaced the cross-Core same-thread isolation regression
1635    /// (a thread holding a live BatchGuard on Core-A and starting a wave
1636    /// on Core-B would observe `ws.in_tick=true` set by Core-A → Core-B
1637    /// becomes non-owning → Core-B's writes drained by Core-A's binding
1638    /// → silent corruption). The other 11 wave-scoped fields stay on
1639    /// per-thread `WaveState` because they're accessed only by the
1640    /// wave-owner thread under `wave_owner` discipline.
1641    pub(crate) in_tick: bool,
1642    /// A6 reentrancy guard stack (Slice F, 2026-05-07): the stack of
1643    /// NodeIds whose fn is currently being invoked. Pushed at the top of
1644    /// `fire_fn` (just before the lock-released `invoke_fn` call) and
1645    /// popped on return / unwind via the [`crate::batch::FiringGuard`]
1646    /// RAII helper. [`Core::set_deps`] consults this set and rejects
1647    /// with [`SetDepsError::ReentrantOnFiringNode`] if `n` is currently
1648    /// firing — preventing the D1 `tracked` index corruption. Read by
1649    /// the P13 partition-migration check (D091) to reject mid-fire
1650    /// `set_deps` that would migrate a firing node's partition.
1651    ///
1652    /// **Per-Core (cross-thread visible).** /qa F2 reverted (2026-05-10):
1653    /// briefly placed on per-thread `WaveState` in Sub-slice 3, then
1654    /// moved BACK to `CoreState` after /qa F2 surfaced the cross-thread
1655    /// P13 bypass (per-thread placement made Thread B's `set_deps` read
1656    /// its own empty stack → P13 silently bypassed for cross-thread
1657    /// `set_deps` calls during Thread A's lock-released `invoke_fn`).
1658    /// Cross-thread visibility on shared `CoreState` is the load-bearing
1659    /// property the D091 check requires.
1660    ///
1661    /// Membership semantics (NOT strict LIFO): consumed via
1662    /// `contains(&n)` membership test. `FiringGuard::drop` pops the
1663    /// right-most matching `node_id` via `rposition` + `swap_remove`;
1664    /// physical order of remaining entries may not match construction
1665    /// order, but membership is preserved.
1666    pub(crate) currently_firing: Vec<NodeId>,
1667    // Q-beyond Sub-slice 3 (D108, 2026-05-09): `deferred_flush_jobs`
1668    // moved to [`crate::batch::WaveState`].
1669    /// Binding-boundary handle for `Drop`-time refcount balancing.
1670    /// `Core` also holds a clone of this Arc; storing it here lets
1671    /// `Drop for CoreState` walk every retained slot and release the
1672    /// binding-side share when the last `Core` clone drops. Without this,
1673    /// `cache` / `terminal` / `dep_terminals` Error / pause-buffer payload
1674    /// handle refs leak in the binding registry until process exit.
1675    pub(crate) binding: Arc<dyn BindingBoundary>,
1676    // Q-beyond Sub-slice 1 (D108, 2026-05-09): `wave_cache_snapshots`
1677    // moved to [`crate::batch::WaveState::wave_cache_snapshots`].
1678    // Q-beyond Sub-slice 1 (D108, 2026-05-09): `pending_auto_resolve`
1679    // moved to [`crate::batch::WaveState::pending_auto_resolve`].
1680    /// Topology-change sinks. Keyed by subscription id for O(1) removal.
1681    pub(crate) topology_sinks: HashMap<u64, crate::topology::TopologySink>,
1682    pub(crate) next_topology_id: u64,
1683    // /qa F2 reverted (2026-05-10): `currently_firing` field is declared
1684    // EARLIER in this struct (above `pause_buffer_cap`). Sub-slice 3
1685    // briefly moved it to `crate::batch::WaveState::currently_firing` on
1686    // the per-thread thread_local, but per-thread placement silently
1687    // bypassed the cross-thread P13 partition-migration check. /qa F2
1688    // (2026-05-10) moved it BACK to CoreState (cross-thread visible).
1689    // Q-beyond Sub-slice 1 (D108, 2026-05-09): `pending_pause_overflow`
1690    // moved to [`crate::batch::WaveState::pending_pause_overflow`].
1691    // Slice G (R1.3.2.d / R1.3.3.a — 2026-05-07): tier3-emitted-this-wave
1692    // tracker MOVED to a per-thread thread-local in `crate::batch`
1693    // (D1 patch, 2026-05-09 — was briefly per-partition under Q3 v1
1694    // 2026-05-09 morning). Wave-scope = thread; per-thread placement
1695    // is robust to mid-wave cross-thread `set_deps` partition splits
1696    // because thread B's split doesn't touch thread A's thread-local.
1697    // See [`crate::batch::TIER3_EMITTED_THIS_WAVE`] for the per-thread
1698    // wave-scope rationale and lifecycle (cleared at outermost
1699    // [`crate::batch::BatchGuard`] drop, both success + panic).
1700    //
1701    // Q-beyond Sub-slice 3 (D108, 2026-05-09):
1702    // `invalidate_hooks_fired_this_wave`, `deferred_cleanup_hooks`,
1703    // and `pending_wipes` moved to [`crate::batch::WaveState`].
1704    // Wave-scoped per-thread; same rationale as the other Sub-slice 3
1705    // migrations.
1706}
1707
1708/// The handle-protocol Core dispatcher.
1709///
1710/// Holds an [`Arc`] to the [`BindingBoundary`] and all dispatch state. Cheap
1711/// to clone (the inner `Arc<Mutex<CoreState>>` is shared); pass `Core` by
1712/// value to threads.
1713///
1714/// # Wave-owner re-entrant mutex (Slice A close /qa, M1)
1715///
1716/// The state lock (`state: Mutex<CoreState>`) is **dropped** around binding
1717/// callbacks (`invoke_fn`, `custom_equals`) so user fns may re-enter Core.
1718/// To preserve serializability of WAVE EXECUTION across threads — without
1719/// re-introducing the lock-held-during-fn-fire deadlock the Slice A close
1720/// refactor lifted — the wave engine acquires `wave_owner` (a
1721/// [`parking_lot::ReentrantMutex`]) for the lifetime of each wave.
1722///
1723/// Properties:
1724///
1725/// - **Same-thread re-entrance is free.** A user fn that calls back into
1726///   `Core::emit` / `Core::pause` / etc. mid-fire re-acquires `wave_owner`
1727///   on the same thread and runs as a nested wave (the inner `run_wave`
1728///   sees `in_tick=true` and skips drain — outer drain picks up).
1729/// - **Cross-thread emits BLOCK** at `wave_owner.lock_arc()` until the
1730///   in-flight wave completes (drain + flush + sink fire all done). This
1731///   serializes wave OWNERSHIP across threads, while still allowing the
1732///   state lock to drop inside the wave for binding callbacks.
1733///
1734/// Without this, Slice A close's lock-released drain let cross-thread
1735/// emits absorb into the in-flight wave's `pending_notify` and return
1736/// before subscribers fire — breaking the user-facing happens-after
1737/// contract that `emit` returning means subscribers have observed.
1738#[derive(Clone)]
1739pub struct Core {
1740    pub(crate) state: Arc<Mutex<CoreState>>,
1741    pub(crate) binding: Arc<dyn BindingBoundary>,
1742    /// Slice X5 (D3 substrate, 2026-05-08) + Slice Y1 / Phase E
1743    /// (wave-engine migration, 2026-05-08): per-subgraph union-find
1744    /// registry. Tracks each registered node's connected-component
1745    /// membership (a "subgraph"). Each component's root carries an
1746    /// `Arc<SubgraphLockBox>` whose `wave_owner: ReentrantMutex<()>`
1747    /// is the per-partition wave-serialization lock — acquired by
1748    /// [`Self::partition_wave_owner_lock_arc`] under the retry-validate
1749    /// loop. Cross-thread emits to disjoint partitions run truly
1750    /// parallel; same-thread re-entry passes through reentrantly.
1751    ///
1752    /// Direct port of [`graphrefly-py`'s
1753    /// `subgraph_locks.py`](https://github.com/graphrefly/graphrefly-py/blob/main/src/graphrefly/core/subgraph_locks.py)
1754    /// design (locked in [`SESSION-rust-port-d3-per-subgraph-parallelism.md`](https://github.com/graphrefly/graphrefly-ts/blob/main/archive/docs/SESSION-rust-port-d3-per-subgraph-parallelism.md)).
1755    pub(crate) registry: Arc<parking_lot::Mutex<crate::subgraph::SubgraphRegistry>>,
1756}
1757
1758/// Weak handle to a [`Core`] — does not contribute to strong refcount.
1759///
1760/// Constructed via [`Core::weak_handle`]; upgraded back to a strong
1761/// [`Core`] via [`WeakCore::upgrade`]. Used by long-lived binding-stored
1762/// closures (notably `ProducerBuildFn`s registered via
1763/// [`graphrefly_operators::ProducerBinding::register_producer_build`])
1764/// to break the BenchBinding → registry → closure → strong-Core cycle
1765/// that would otherwise leak the entire graph state when a `BenchCore`
1766/// drops with active producer registrations.
1767///
1768/// Upgrade on each invocation; if the host `Core` was already dropped,
1769/// `upgrade()` returns `None` and the closure should no-op (the host
1770/// is being torn down, no work to do).
1771#[derive(Clone)]
1772pub struct WeakCore {
1773    state: Weak<Mutex<CoreState>>,
1774    binding: Weak<dyn BindingBoundary>,
1775    registry: Weak<parking_lot::Mutex<crate::subgraph::SubgraphRegistry>>,
1776}
1777
1778impl WeakCore {
1779    /// Try to upgrade back to a strong [`Core`]. Returns `None` if the
1780    /// host `Core`'s strong count has reached zero (i.e. the host
1781    /// `BenchCore` / equivalent owner was dropped).
1782    #[must_use]
1783    pub fn upgrade(&self) -> Option<Core> {
1784        Some(Core {
1785            state: self.state.upgrade()?,
1786            binding: self.binding.upgrade()?,
1787            registry: self.registry.upgrade()?,
1788        })
1789    }
1790}
1791
1792/// RAII guard that owns an [`OperatorScratch`] until either (a) the
1793/// caller `take()`s it for installation, or (b) the guard drops on an
1794/// early return / unwind, in which case the scratch's handle retains
1795/// are released via [`OperatorScratch::release_handles`].
1796///
1797/// Slice H /qa F1 + F2 (2026-05-07): closes two related correctness
1798/// gaps in `Core::register`:
1799///
1800/// 1. **TOCTOU window** — the original three-phase split called
1801///    `lock_state()` twice (once for validation, once for insertion),
1802///    so a concurrent `Core::complete(dep)` on a non-resubscribable
1803///    dep could slip in between the two acquisitions and re-create
1804///    the wedge `RegisterError::TerminalDep` was designed to prevent.
1805///    The guard plus a single locked region for both phases closes
1806///    this gap (release runs lock-released because guard variables
1807///    drop in reverse declaration order — guard declared BEFORE
1808///    `lock_state()`, so the lock guard drops first).
1809///
1810/// 2. **Panic-unsafe scratch leak** — without an RAII drop, a panic
1811///    between `make_op_scratch` (Phase 2) and the explicit
1812///    `if let Err(e)` cleanup branch (e.g., `lock_state()` reentrance
1813///    assert, OOM-as-panic on Vec growth in dep iteration) would
1814///    drop the `Box<dyn OperatorScratch>` without releasing the
1815///    seed/default retain. The guard's `Drop` impl releases on any
1816///    unwind path.
1817///
1818/// Lock-discipline: the guard holds `&dyn BindingBoundary` (through
1819/// the `Arc<dyn BindingBoundary>` it borrows from). On `Drop`, it
1820/// invokes `release_handles` lock-released — fires AFTER any
1821/// `MutexGuard<CoreState>` declared later in the same scope drops
1822/// (LIFO destruction order). Mirrors `Core::resume` Phase 2 release
1823/// pattern.
1824struct ScratchReleaseGuard<'a> {
1825    scratch: Option<Box<dyn crate::op_state::OperatorScratch>>,
1826    binding: &'a dyn BindingBoundary,
1827}
1828
1829impl<'a> ScratchReleaseGuard<'a> {
1830    fn new(
1831        scratch: Option<Box<dyn crate::op_state::OperatorScratch>>,
1832        binding: &'a dyn BindingBoundary,
1833    ) -> Self {
1834        Self { scratch, binding }
1835    }
1836
1837    /// Take ownership of the scratch — disarms the release-on-drop
1838    /// behavior. Used on the success path to install the scratch on
1839    /// `NodeRecord.op_scratch`.
1840    fn take(mut self) -> Option<Box<dyn crate::op_state::OperatorScratch>> {
1841        self.scratch.take()
1842    }
1843}
1844
1845impl Drop for ScratchReleaseGuard<'_> {
1846    fn drop(&mut self) {
1847        if let Some(mut scratch) = self.scratch.take() {
1848            scratch.release_handles(self.binding);
1849        }
1850    }
1851}
1852
1853impl Core {
1854    /// Construct a fresh Core wired to the given binding. Pause buffer cap
1855    /// defaults to unbounded; set via [`Self::set_pause_buffer_cap`].
1856    #[must_use]
1857    pub fn new(binding: Arc<dyn BindingBoundary>) -> Self {
1858        Self {
1859            state: Arc::new(Mutex::new(CoreState {
1860                next_node_id: 1,
1861                next_subscription_id: 1,
1862                // A4 (Slice F, 2026-05-07): start `next_lock_id` in the high
1863                // half of the u32 range so `alloc_lock_id` can't collide with
1864                // user-supplied `LockId::new(N)` constructors (which the
1865                // napi-rs binding marshals from `u32` and tests typically use
1866                // in the low range, 1..1024). Phase E /qa F1 (2026-05-08):
1867                // lowered from `1u64 << 32` to `1u64 << 31` so the value
1868                // round-trips through `u32::try_from(...)` at the napi
1869                // boundary — the previous seed errored every napi
1870                // `alloc_lock_id` call. Anti-collision intent (high range vs
1871                // low user range) preserved at half the prior ceiling
1872                // (2^31 ≈ 2 billion allocations per Core, ample for parity
1873                // tests). Lift the floor when the deferred BigInt-narrowing
1874                // migration extends `LockId` to `u64` at the FFI layer
1875                // (porting-deferred "BigInt migration for u32-narrowed napi
1876                // types" entry).
1877                next_lock_id: 1u64 << 31,
1878                nodes: HashMap::new(),
1879                children: HashMap::new(),
1880                // Q-beyond Sub-slice 2 + 3 (D108, 2026-05-09): pending_fires,
1881                // pending_notify, deferred_flush_jobs,
1882                // deferred_cleanup_hooks, pending_wipes, and
1883                // invalidate_hooks_fired_this_wave all live on per-thread
1884                // [`crate::batch::WaveState`].
1885                // /qa F1+F2 reverted (2026-05-10): in_tick + currently_firing
1886                // stay on CoreState (per-Core, cross-thread visible) for
1887                // cross-Core same-thread isolation (F1) and cross-thread
1888                // P13 set_deps check (F2).
1889                in_tick: false,
1890                currently_firing: Vec::new(),
1891                pause_buffer_cap: None,
1892                max_batch_drain_iterations: 10_000,
1893                binding: binding.clone(),
1894                topology_sinks: HashMap::new(),
1895                next_topology_id: 1,
1896            })),
1897            binding,
1898            registry: Arc::new(parking_lot::Mutex::new(
1899                crate::subgraph::SubgraphRegistry::new(),
1900            )),
1901        }
1902    }
1903
1904    /// Acquire the state lock.
1905    ///
1906    /// Post-Slice-E: `Core::subscribe` fires the per-tier handshake
1907    /// LOCK-RELEASED with `wave_owner` held; sink callbacks may freely
1908    /// re-enter Core (`emit` / `complete` / `error` / nested `subscribe`).
1909    /// Same-thread re-entry passes through `wave_owner`'s `ReentrantMutex`
1910    /// transparently; cross-thread emits block on `wave_owner` until the
1911    /// outer subscribe completes, preserving R1.3.5.a happens-after
1912    /// ordering. The previous `IN_HANDSHAKE_FIRE` panic-diagnostic is no
1913    /// longer needed.
1914    pub(crate) fn lock_state(&self) -> MutexGuard<'_, CoreState> {
1915        self.state.lock()
1916    }
1917
1918    /// Whether `self` and `other` point to the same dispatcher state.
1919    /// True when one was produced by `Clone`-ing the other (or they
1920    /// were both cloned from a common ancestor); false for two
1921    /// independently `Core::new`-constructed instances even with the
1922    /// same binding.
1923    ///
1924    /// Used by `graphrefly-graph`'s `mount` to enforce the "shared-Core
1925    /// only" v1 invariant — cross-Core mount is post-M6.
1926    #[must_use]
1927    pub fn same_dispatcher(&self, other: &Core) -> bool {
1928        Arc::ptr_eq(&self.state, &other.state)
1929    }
1930
1931    /// Downgrade to a [`WeakCore`] handle that doesn't contribute to
1932    /// strong refcount of the underlying state / binding / wave_owner.
1933    ///
1934    /// Used by binding-stored long-lived closures (e.g.
1935    /// `register_producer_build`-stored `ProducerBuildFn`s) to avoid the
1936    /// Arc cycle:
1937    ///
1938    /// ```text
1939    /// BenchBinding → registry → producer_builds[fn_id]
1940    ///   → closure → strong Arc<dyn _Binding> → BenchBinding
1941    /// ```
1942    ///
1943    /// Closures hold `WeakCore` and `Weak<dyn _Binding>` instead, then
1944    /// upgrade-on-fire (returning early if either weak is dangling —
1945    /// indicating the host BenchCore was already dropped). Upgraded
1946    /// strong refs live only for the build closure's invocation; sinks
1947    /// the build closure spawns close over those upgraded strongs and
1948    /// stay alive only while the producer is active (cleared via
1949    /// `producer_deactivate` on last-subscriber unsubscribe).
1950    #[must_use]
1951    pub fn weak_handle(&self) -> WeakCore {
1952        WeakCore {
1953            state: Arc::downgrade(&self.state),
1954            binding: Arc::downgrade(&self.binding),
1955            registry: Arc::downgrade(&self.registry),
1956        }
1957    }
1958
1959    /// Number of distinct connected-component partitions tracked by
1960    /// the per-subgraph union-find registry (Slice X5 substrate).
1961    /// Two threads emitting into nodes with distinct partitions will
1962    /// run truly parallel once Y1 wires the wave engine through the
1963    /// registry; X5 reports the partition count for inspection
1964    /// (acceptance bar + debugging) but the wave engine still uses
1965    /// the legacy Core-level `wave_owner`.
1966    #[must_use]
1967    pub fn partition_count(&self) -> usize {
1968        self.registry.lock().component_count()
1969    }
1970
1971    /// Resolve `node`'s partition identity per the per-subgraph
1972    /// union-find registry (Slice X5 substrate). Two nodes with the
1973    /// same `SubgraphId` are connected via dep edges (transitively)
1974    /// and share a partition lock under Y1+; nodes in different
1975    /// partitions can run truly parallel.
1976    ///
1977    /// Returns `None` for unregistered nodes.
1978    #[must_use]
1979    pub fn partition_of(&self, node: NodeId) -> Option<crate::subgraph::SubgraphId> {
1980        self.registry.lock().partition_of(node)
1981    }
1982
1983    // Q3 (2026-05-09) introduced `Core::partition_box_of(node)` to
1984    // resolve a partition's `Arc<SubgraphLockBox>` for per-partition
1985    // state access. The D1 patch (2026-05-09) moved Slice G's
1986    // `tier3_emitted_this_wave` set off `SubgraphLockBox::state` to a
1987    // per-thread thread-local in `crate::batch`, eliminating
1988    // `partition_box_of`'s only callers (`commit_emission` /
1989    // `commit_emission_verbatim`). The helper is REMOVED rather than
1990    // kept dead — Q-beyond will resurrect a similar shape when the
1991    // CoreState shard layout actually needs per-partition lookups.
1992
1993    /// Acquire `seed`'s partition `wave_owner` re-entrant mutex with
1994    /// retry-validate against concurrent union/split. Mirrors
1995    /// graphrefly-py's `subgraph_locks.py::lock_for` retry pattern
1996    /// (lines 154–178): a concurrent `union_nodes` may redirect
1997    /// `seed`'s partition root between our `lock_for` resolve and
1998    /// our `lock_arc` call; if so, the held guard is on a stale
1999    /// (but still valid) `SubgraphLockBox` whose `Arc` no longer
2000    /// matches the registry's canonical box for `seed`'s current
2001    /// root. Release + retry up to [`crate::subgraph::MAX_LOCK_RETRIES`].
2002    ///
2003    /// Returns the held guard. Caller holds it for the wave's
2004    /// duration; drop releases.
2005    ///
2006    /// **Panics** if `seed` is not registered (caller violation —
2007    /// every wave entry takes a `NodeId` already in `s.nodes`, and
2008    /// the P12-fixed lock-discipline guarantees registry membership
2009    /// is published atomically with state). **Panics** on exceeding
2010    /// `MAX_LOCK_RETRIES` — pathological union activity.
2011    ///
2012    /// Slice Y1 / Phase E (2026-05-08).
2013    pub(crate) fn partition_wave_owner_lock_arc(&self, seed: NodeId) -> WaveOwnerGuard {
2014        /// Scope-guard for the H+ thread-local refcount entry. Released on
2015        /// Drop unless `into_consumed()` is called (the success path).
2016        /// Ensures balance even on panic between `check_and_acquire` and
2017        /// successful `WaveOwnerGuard` construction (`lock_arc()` /
2018        /// `lock_for_validate()` could in principle panic; defensive).
2019        struct AcquireGuard {
2020            sid: crate::subgraph::SubgraphId,
2021            consumed: bool,
2022        }
2023        impl AcquireGuard {
2024            fn into_consumed(mut self) {
2025                self.consumed = true;
2026            }
2027        }
2028        impl Drop for AcquireGuard {
2029            fn drop(&mut self) {
2030                if !self.consumed {
2031                    held_partitions::release(self.sid);
2032                }
2033            }
2034        }
2035
2036        for _ in 0..crate::subgraph::MAX_LOCK_RETRIES {
2037            let (sid, lock_box) = {
2038                let mut reg = self.registry.lock();
2039                reg.lock_for(seed).expect(
2040                    "partition_wave_owner_lock_arc: seed must be registered \
2041                     (P12-fix invariant: registry membership is published \
2042                     atomically with `s.nodes`)",
2043                )
2044            };
2045            // Phase H+ option (d) /qa N1(a) widened variant: BEFORE
2046            // acquiring the parking_lot lock, check ascending-order if
2047            // this thread already holds at least one partition AND we're
2048            // not in a producer build closure. Panics on violation.
2049            // Also increments the thread-local refcount for `sid`. The
2050            // `AcquireGuard` ensures the refcount is released on EVERY
2051            // exit path — successful return (via `into_consumed()`),
2052            // retry-validate failure (Drop fires), retry-exhaustion panic
2053            // (Drop fires before unwind), or a panic in `lock_arc()` /
2054            // `lock_for_validate()` (Drop fires during unwind).
2055            held_partitions::check_and_acquire(sid);
2056            let acquire_guard = AcquireGuard {
2057                sid,
2058                consumed: false,
2059            };
2060            let inner = lock_box.wave_owner.lock_arc();
2061            // Re-validate post-acquire. If a concurrent `union` redirected
2062            // `seed`'s root between our `lock_for` and `lock_arc`, the
2063            // registry's current box for `seed` differs from what we hold.
2064            let still_valid = self.registry.lock().lock_for_validate(seed, &lock_box);
2065            if still_valid {
2066                acquire_guard.into_consumed();
2067                // `lock_box` is unused after this point — the D1 patch
2068                // moved Slice G tier3 tracking off the per-partition
2069                // `SubgraphLockBox::state` to a per-thread thread-local,
2070                // so the guard no longer carries the box reference.
2071                drop(lock_box);
2072                return WaveOwnerGuard { sid, inner };
2073            }
2074            // Stale — drop the parking_lot guard. The AcquireGuard's
2075            // Drop releases the held_partitions refcount automatically.
2076            // Yield to give the contending writer a chance to make
2077            // forward progress before re-resolving (QA-fix group 2 —
2078            // earlier tight-spin could monopolize a CPU under sustained
2079            // pathological union/split activity).
2080            drop(inner);
2081            drop(acquire_guard);
2082            std::thread::yield_now();
2083        }
2084        panic!(
2085            "partition_wave_owner_lock_arc: exceeded {} retries for seed {:?} \
2086             — pathological concurrent union/split activity. Mirrors py \
2087             `_MAX_LOCK_RETRIES`.",
2088            crate::subgraph::MAX_LOCK_RETRIES,
2089            seed
2090        );
2091    }
2092
2093    /// BFS from `seed` along `s.children` (downstream consumer cascade
2094    /// for DATA / RESOLVED / INVALIDATE / COMPLETE / ERROR / TEARDOWN)
2095    /// and `meta_companions` (R1.3.9.d TEARDOWN cascade). Collects
2096    /// every partition reachable from `seed`, returning the unique
2097    /// `SubgraphId`s sorted ascending — the canonical lock-acquisition
2098    /// order per session-doc Q7 / decision D092 that guarantees
2099    /// deadlock-freedom across cross-partition waves.
2100    ///
2101    /// Holds the state lock + registry lock for the BFS duration
2102    /// (lock order `state → registry` per the P12-fix invariant).
2103    /// Bounded by the cascade graph reachable from `seed`; for typical
2104    /// apps the partition count is small (1–3) and the BFS is
2105    /// negligible relative to wave drain.
2106    ///
2107    /// Used by [`Core::begin_batch_for`] to compute the upfront-
2108    /// acquired partition set for per-seed waves. Closure-form
2109    /// [`Core::batch`] doesn't have a seed and uses
2110    /// [`Core::all_partitions_lock_boxes`] instead.
2111    ///
2112    /// Slice Y1 / Phase E (2026-05-08).
2113    pub(crate) fn compute_touched_partitions(
2114        &self,
2115        seed: NodeId,
2116    ) -> SmallVec<[crate::subgraph::SubgraphId; 4]> {
2117        let s = self.lock_state();
2118        let mut reg = self.registry.lock();
2119        let mut partitions: SmallVec<[crate::subgraph::SubgraphId; 4]> = SmallVec::new();
2120        let mut visited: HashSet<NodeId> = HashSet::default();
2121        let mut stack: SmallVec<[NodeId; 16]> = SmallVec::new();
2122        stack.push(seed);
2123        while let Some(n) = stack.pop() {
2124            if !visited.insert(n) {
2125                continue;
2126            }
2127            if let Some(p) = reg.partition_of(n) {
2128                if !partitions.contains(&p) {
2129                    partitions.push(p);
2130                }
2131            }
2132            if let Some(children) = s.children.get(&n) {
2133                stack.extend(children.iter().copied());
2134            }
2135            if let Some(rec) = s.nodes.get(&n) {
2136                stack.extend(rec.meta_companions.iter().copied());
2137            }
2138        }
2139        partitions.sort_unstable_by_key(|sid| sid.raw());
2140        partitions
2141    }
2142
2143    /// Snapshot of every currently-existing partition's lock box, in
2144    /// ascending [`crate::subgraph::SubgraphId`] order (canonical
2145    /// lock-acquisition order per session-doc Q7 / D092). Used by
2146    /// closure-form [`Core::batch`] / [`Core::begin_batch`] which
2147    /// don't have a known seed and must serialize against every
2148    /// existing partition.
2149    ///
2150    /// Slice Y1 / Phase E (2026-05-08).
2151    pub(crate) fn all_partitions_lock_boxes(
2152        &self,
2153    ) -> Vec<(
2154        crate::subgraph::SubgraphId,
2155        Arc<crate::subgraph::SubgraphLockBox>,
2156    )> {
2157        self.registry.lock().all_partitions()
2158    }
2159}
2160
2161/// Walk the undirected dep-edge graph from `start`, optionally
2162/// skipping ONE edge in both directions, and optionally treating
2163/// additional edges as if present. Returns every reachable
2164/// [`NodeId`].
2165///
2166/// Implementation note: uses a stack (`pop()` on a `SmallVec`) — i.e.
2167/// DFS traversal order. For pure reachability the order doesn't
2168/// matter (the visited set is identical to BFS); the function is
2169/// named "walk" rather than "BFS" to avoid implying that traversal
2170/// distance is meaningful (QA-fix group 2 — earlier name
2171/// `bfs_undirected_dep_graph` was misleading).
2172///
2173/// **Edge convention:** the dep edge `parent → child` represents
2174/// data flow from `parent` (a dep) to `child` (the consumer). It
2175/// appears in `s.children[parent]` as `child`, and in
2176/// `s.nodes[child].dep_records` as `parent`. `skip_edge =
2177/// Some((parent, child))` skips both forward (`parent → child`) and
2178/// backward (`child → parent`) traversals of that edge. Each
2179/// `(p, c)` pair in `extra_edges` is treated as if `c ∈
2180/// s.children[p]` and `p ∈ s.nodes[c].dep_records` — used for
2181/// "what would connectivity look like if THESE edges were also
2182/// present?" lookahead.
2183///
2184/// Used by Slice Y1 / Phase F (D3 split-eager, 2026-05-09):
2185/// - **P13 widening (pre-removal connectivity):** call with
2186///   `skip_edge = Some((removed_parent, removed_child))` AND
2187///   `extra_edges = added_edges_in_set_deps_call` so a `set_deps`
2188///   that simultaneously removes one edge AND adds another path
2189///   isn't falsely flagged as disconnecting (QA-fix #4 2026-05-09 —
2190///   without `extra_edges`, the pre-mutation BFS doesn't see the
2191///   would-be-added edges and rejects the conservative case).
2192/// - **Actual split execution (post-removal):** call with
2193///   `skip_edge = None` and `extra_edges = &[]`; the visited set is
2194///   the keep-side of the split (the side containing `start`).
2195pub(crate) fn walk_undirected_dep_graph(
2196    s: &CoreState,
2197    start: NodeId,
2198    skip_edge: Option<(NodeId, NodeId)>,
2199    extra_edges: &[(NodeId, NodeId)],
2200) -> HashSet<NodeId> {
2201    let mut visited: HashSet<NodeId> = HashSet::default();
2202    let mut queue: SmallVec<[NodeId; 32]> = SmallVec::new();
2203    queue.push(start);
2204    while let Some(cur) = queue.pop() {
2205        if !visited.insert(cur) {
2206            continue;
2207        }
2208        if let Some(consumers) = s.children.get(&cur) {
2209            for &c in consumers {
2210                let is_skipped = skip_edge.is_some_and(|(sp, sc)| cur == sp && c == sc);
2211                if !is_skipped && !visited.contains(&c) {
2212                    queue.push(c);
2213                }
2214            }
2215        }
2216        if let Some(rec) = s.nodes.get(&cur) {
2217            for d in rec.dep_records.iter().map(|r| r.node) {
2218                let is_skipped = skip_edge.is_some_and(|(sp, sc)| cur == sc && d == sp);
2219                if !is_skipped && !visited.contains(&d) {
2220                    queue.push(d);
2221                }
2222            }
2223        }
2224        // Virtual extra edges (e.g. would-be-added edges in
2225        // pre-mutation BFS).
2226        for &(ep, ec) in extra_edges {
2227            if cur == ep && !visited.contains(&ec) {
2228                queue.push(ec);
2229            }
2230            if cur == ec && !visited.contains(&ep) {
2231                queue.push(ep);
2232            }
2233        }
2234    }
2235    visited
2236}
2237
2238impl Core {
2239    /// Test-only inspection: number of `PendingBatch`es queued for
2240    /// `node` in the current wave. Used by Slice X4 D2 regression
2241    /// tests to pin the "common case = single batch, no SmallVec
2242    /// spill" perf invariant.
2243    ///
2244    /// Returns `None` if no `pending_notify` entry exists for `node`
2245    /// (no tier-1+ message has been queued for this node yet in this
2246    /// wave). `Some(0)` is unreachable by construction (a vacant
2247    /// entry implies no batches; an occupied entry has at least one).
2248    #[cfg(any(test, debug_assertions))]
2249    #[must_use]
2250    pub fn pending_batch_count(&self, node: NodeId) -> Option<usize> {
2251        // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_notify lives
2252        // on per-thread `WaveState`. Test callers run on the same
2253        // thread that ran the wave, so the per-thread placement is
2254        // observable here.
2255        crate::batch::with_wave_state(|ws| {
2256            ws.pending_notify
2257                .get(&node)
2258                .map(|entry| entry.batches.len())
2259        })
2260    }
2261
2262    /// Configure the Core-global cap on pause replay buffer length. When set,
2263    /// any per-node pause buffer that would exceed `cap` drops the oldest
2264    /// message(s) from the front; the dropped count is reported back via the
2265    /// resume callback (see [`ResumeReport`]). `None` (default) means
2266    /// unbounded; messages buffer indefinitely until the lockset clears.
2267    pub fn set_pause_buffer_cap(&self, cap: Option<usize>) {
2268        self.lock_state().pause_buffer_cap = cap;
2269    }
2270
2271    /// Configure the replay buffer cap on `node_id` (R2.6.5 / Lock 6.G —
2272    /// Slice E1, 2026-05-07). `None` disables the buffer. `Some(N)` keeps
2273    /// the last `N` DATA emissions in a circular buffer; late subscribers
2274    /// receive them as part of the per-tier handshake (between START and
2275    /// any terminal). Switching from a larger cap to a smaller cap evicts
2276    /// the front of the buffer to fit; switching to `None` drains the
2277    /// buffer entirely. Each evicted/drained handle's retain is released
2278    /// back to the binding.
2279    ///
2280    /// # Panics
2281    ///
2282    /// Panics if `node_id` is not registered.
2283    pub fn set_replay_buffer_cap(&self, node_id: NodeId, cap: Option<usize>) {
2284        // QA A7 (2026-05-07): normalize `Some(0)` to `None`. Two ways to
2285        // express "disabled" is confusing: `push_replay_buffer` already
2286        // treats `Some(0)` as no-op, so persisting it adds nothing.
2287        let cap = match cap {
2288            Some(0) => None,
2289            other => other,
2290        };
2291        let to_release: Vec<HandleId> = {
2292            let mut s = self.lock_state();
2293            let rec = s.require_node_mut(node_id);
2294            rec.replay_buffer_cap = cap;
2295            match cap {
2296                None => rec.replay_buffer.drain(..).collect(),
2297                Some(c) => {
2298                    let mut drained = Vec::new();
2299                    while rec.replay_buffer.len() > c {
2300                        if let Some(h) = rec.replay_buffer.pop_front() {
2301                            drained.push(h);
2302                        }
2303                    }
2304                    drained
2305                }
2306            }
2307        };
2308        for h in to_release {
2309            self.binding.release_handle(h);
2310        }
2311    }
2312
2313    /// Reconfigure the pause mode for `node_id` (canonical §2.6 — Slice F
2314    /// audit close, 2026-05-07). Default for new nodes is
2315    /// [`PausableMode::Default`]; switch to [`PausableMode::ResumeAll`]
2316    /// for nodes whose pause-window emit history must be observable
2317    /// verbatim, or [`PausableMode::Off`] for nodes intrinsically
2318    /// pause-immune.
2319    ///
2320    /// # Errors
2321    ///
2322    /// - [`SetPausableModeError::UnknownNode`] — `node_id` is not
2323    ///   registered.
2324    /// - [`SetPausableModeError::WhilePaused`] — the node currently
2325    ///   holds at least one pause lock. Changing mode mid-pause would
2326    ///   lose buffered content or strand a `pending_wave` flag — resume
2327    ///   all locks first.
2328    pub fn set_pausable_mode(
2329        &self,
2330        node_id: NodeId,
2331        mode: PausableMode,
2332    ) -> Result<(), SetPausableModeError> {
2333        let mut s = self.lock_state();
2334        let rec = s
2335            .nodes
2336            .get_mut(&node_id)
2337            .ok_or(SetPausableModeError::UnknownNode(node_id))?;
2338        if rec.pause_state.is_paused() {
2339            return Err(SetPausableModeError::WhilePaused);
2340        }
2341        rec.pausable = mode;
2342        Ok(())
2343    }
2344
2345    /// Configure the wave-drain iteration cap (R4.3 / Lock 2.F′). The wave
2346    /// engine aborts a drain after `cap` iterations with a diagnostic panic.
2347    /// Default is `10_000` — high enough to avoid false positives on legitimate
2348    /// fan-in cascades, low enough to surface runtime cycles within seconds.
2349    ///
2350    /// Lower this only when running adversarial / property-based tests that
2351    /// want fast cycle detection. Raise it only with concrete evidence that a
2352    /// legitimate workload needs more iterations than the default — and even
2353    /// then, prefer to tune the workload (per-subgraph batching, etc.) over
2354    /// raising the cap.
2355    ///
2356    /// # Panics
2357    ///
2358    /// Panics if `cap == 0` — a zero cap would abort every wave on the very
2359    /// first iteration, deadlocking any subsequent dispatcher work.
2360    pub fn set_max_batch_drain_iterations(&self, cap: u32) {
2361        assert!(cap > 0, "max_batch_drain_iterations must be > 0");
2362        self.lock_state().max_batch_drain_iterations = cap;
2363    }
2364
2365    /// Send a message UPSTREAM from `node_id` to each of its declared deps
2366    /// (canonical R1.4.1 — Slice F audit, F2 / 2026-05-07).
2367    ///
2368    /// The dispatcher rejects tier-3 (DATA / RESOLVED) and tier-5
2369    /// (COMPLETE / ERROR) per R1.4.1: value and terminal-lifecycle planes
2370    /// are downstream-only. All other tiers (0 START, 1 DIRTY, 2 PAUSE /
2371    /// RESUME, 4 INVALIDATE, 6 TEARDOWN) pass.
2372    ///
2373    /// # Routing per tier
2374    ///
2375    /// - **Tier 0 ([`Message::Start`]):** no-op. START is a per-subscription
2376    ///   handshake, not a routable wire signal — sending it upstream has no
2377    ///   well-defined target.
2378    /// - **Tier 1 ([`Message::Dirty`]):** no-op. The dep's "something
2379    ///   changed" notification is its own [`Self::emit`] / commit
2380    ///   responsibility; ignoring upstream DIRTY hints is safe.
2381    /// - **Tier 2 ([`Message::Pause`] / [`Message::Resume`]):** translates
2382    ///   to [`Self::pause`] / [`Self::resume`] on each dep. Lock id is
2383    ///   forwarded verbatim. Errors from individual deps are accumulated
2384    ///   in the `dep_errors` field of the returned report.
2385    /// - **Tier 4 ([`Message::Invalidate`]):** translates to
2386    ///   [`Self::invalidate`] on each dep. Note: canonical R1.4.2
2387    ///   distinguishes "downstream INVALIDATE" (cache clear + cascade) from
2388    ///   "upstream INVALIDATE" (plain forward, no self-process). The Rust
2389    ///   port v1 SIMPLIFICATION delegates to the same `Core::invalidate`
2390    ///   path — upstream INVALIDATE here DOES clear dep caches and cascade.
2391    ///   If a "plain forward" mode surfaces as a real consumer need, add
2392    ///   `up_with_options`.
2393    /// - **Tier 6 ([`Message::Teardown`]):** translates to
2394    ///   [`Self::teardown`] on each dep. Cascades per the standard
2395    ///   teardown path.
2396    ///
2397    /// # Errors
2398    ///
2399    /// - [`UpError::UnknownNode`] — `node_id` is not registered.
2400    /// - [`UpError::TierForbidden`] — tier 3 or tier 5.
2401    pub fn up(&self, node_id: NodeId, message: Message) -> Result<(), UpError> {
2402        // QA A10 (2026-05-07): check unknown node BEFORE tier rejection
2403        // for consistent error UX — `up(unknown, Data)` and
2404        // `up(unknown, Pause)` both report `UnknownNode` rather than
2405        // splitting on the tier.
2406        let dep_ids: Vec<NodeId> = {
2407            let s = self.lock_state();
2408            let rec = s.nodes.get(&node_id).ok_or(UpError::UnknownNode(node_id))?;
2409            rec.dep_ids_vec()
2410        };
2411        let tier = message.tier();
2412        if tier == 3 || tier == 5 {
2413            return Err(UpError::TierForbidden { tier });
2414        }
2415        for dep_id in dep_ids {
2416            match message {
2417                Message::Pause(lock) => {
2418                    let _ = self.pause(dep_id, lock);
2419                }
2420                Message::Resume(lock) => {
2421                    let _ = self.resume(dep_id, lock);
2422                }
2423                Message::Invalidate => {
2424                    self.invalidate(dep_id);
2425                }
2426                Message::Teardown => {
2427                    self.teardown(dep_id);
2428                }
2429                // Tier 0 START + tier 1 DIRTY: no-op upstream per the
2430                // routing table above.
2431                _ => {}
2432            }
2433        }
2434        Ok(())
2435    }
2436
2437    /// Allocate a unique [`LockId`] for use with [`Self::pause`] /
2438    /// [`Self::resume`]. Convenience for callers that don't already have an
2439    /// id-allocation scheme; user-supplied ids work too.
2440    #[must_use]
2441    pub fn alloc_lock_id(&self) -> LockId {
2442        let mut s = self.lock_state();
2443        let id = LockId::new(s.next_lock_id);
2444        s.next_lock_id += 1;
2445        id
2446    }
2447
2448    // -------------------------------------------------------------------
2449    // Registration — unified `register()` (D030, Slice D)
2450    //
2451    // All node kinds (State / Producer / Derived / Dynamic / Operator)
2452    // funnel through `Core::register(NodeRegistration) -> NodeId`. Sugar
2453    // wrappers (`register_state` / `register_producer` / `register_derived`
2454    // / `register_dynamic` / `register_operator`) build a `NodeRegistration`
2455    // and delegate. There is no parallel registration path internally.
2456    // -------------------------------------------------------------------
2457
    /// Unified node registration (D030).
    ///
    /// `reg` describes the node's identity (deps + closure-form fn id OR
    /// typed-op + per-kind opts). The kind is **derived from the field
    /// shape**, not stored — see [`NodeKind`].
    ///
    /// Sugar wrappers below ([`Self::register_state`],
    /// [`Self::register_producer`], [`Self::register_derived`],
    /// [`Self::register_dynamic`], [`Self::register_operator`]) build the
    /// registration for the common kinds and delegate here. Direct callers
    /// that need uncommon combinations (e.g., a partial-true derived) can
    /// invoke this method directly.
    ///
    /// On success the new node is inserted into the node table, wired into
    /// each dep's child set, registered (and unioned with every dep) in the
    /// partition registry, and — after the state lock is released — a
    /// `TopologyEvent::NodeRegistered` is fired. The freshly allocated
    /// [`NodeId`] is returned.
    ///
    /// # Errors
    ///
    /// Errors are returned in evaluation order — earlier phases short-circuit
    /// later ones, so a single registration produces at most one variant.
    ///
    /// **Phase 1 — lock-released, side-effect-free validation:**
    /// - [`RegisterError::OperatorWithoutDeps`] — `reg` carries an op but
    ///   `deps` is empty. Operator nodes need at least one dep — for
    ///   subscription-managed combinators with no declared deps, use
    ///   [`Self::register_producer`] instead.
    /// - [`RegisterError::InitialOnlyForStateNodes`] — `reg.opts.initial`
    ///   is non-sentinel for a non-state shape (deps non-empty, or
    ///   fn_or_op present). State nodes are the only kind with an initial
    ///   cache.
    ///
    /// **Phase 2 — operator scratch construction (lock-released):**
    /// - [`RegisterError::OperatorSeedSentinel`] — `reg` carries `Op(Scan)`
    ///   / `Op(Reduce)` with a `NO_HANDLE` seed. R2.5.3 — stateful folders
    ///   must have a real seed.
    ///
    /// **Phase 3 — state-lock validation (folded with insertion under a
    /// single lock acquisition per /qa F1 to prevent TOCTOU between
    /// validation and `nodes.insert`):**
    /// - [`RegisterError::UnknownDep`] — any element of `reg.deps` is not
    ///   a registered node id. Checked for ALL deps before any terminal
    ///   check, so it takes precedence over `TerminalDep`.
    /// - [`RegisterError::TerminalDep`] — a dep is terminal (COMPLETE /
    ///   ERROR) AND not resubscribable — would create a permanent wedge.
    ///
    /// All errors are construction-time invariants — the dispatcher
    /// rejects the registration before any reactive state is created.
    /// On `Err`, no node has been added and any handle retains taken on
    /// the way in (operator scratch seed retains via
    /// [`BindingBoundary::retain_handle`]) have been released
    /// lock-released — see [`ScratchReleaseGuard`] for the RAII
    /// discipline that covers both early-return AND unwind paths.
    /// `Last { default }` retains its `default` handle on the same
    /// release path.
    pub fn register(&self, reg: NodeRegistration) -> Result<NodeId, RegisterError> {
        let NodeRegistration {
            deps,
            fn_or_op,
            opts,
        } = reg;
        let NodeOpts {
            initial,
            equals,
            partial,
            is_dynamic,
            pausable,
            replay_buffer,
        } = opts;

        // Derive the field shape from fn_or_op + deps. At most one of
        // `fn_id` / `op` is `Some` after this split.
        let (fn_id, op) = match fn_or_op {
            Some(NodeFnOrOp::Fn(f)) => (Some(f), None),
            Some(NodeFnOrOp::Op(o)) => (None, Some(o)),
            None => (None, None),
        };

        // Phase 1 — lock-released, side-effect-free validation. Errors
        // here return BEFORE any handle retain is taken.
        //
        //   - State (no deps + no fn + no op) is the only kind with `initial`.
        //   - Dynamic flag only meaningful when fn + non-empty deps.
        //   - Operator (op present) must have deps (P9: operator without deps
        //     would skip activation — use a producer instead).
        let is_state_shape = deps.is_empty() && fn_id.is_none() && op.is_none();
        if op.is_some() && deps.is_empty() {
            return Err(RegisterError::OperatorWithoutDeps);
        }
        if initial != NO_HANDLE && !is_state_shape {
            return Err(RegisterError::InitialOnlyForStateNodes);
        }

        // Phase 2 — build per-operator scratch struct (may take handle
        // retains via `binding.retain_handle` for Scan/Reduce/Last seed).
        // Lock-released per Slice E (D045) handshake discipline. Returns
        // `OperatorSeedSentinel` BEFORE retain so an Err leaves no
        // dangling handles.
        //
        // `op` is read again below (`op.is_some()` and the `NodeRecord`
        // field), so this match works on a copy — this relies on
        // `OperatorOp` being `Copy`.
        let scratch = match op {
            Some(operator_op) => self.make_op_scratch(operator_op)?,
            None => None,
        };

        // Wrap scratch in an RAII guard immediately after Phase 2. From
        // here on, ANY early return / unwind path correctly releases the
        // scratch's handle retains via `OperatorScratch::release_handles`
        // (Slice H /qa F2 — defense against panics between Phase 2 and
        // Phase 3 cleanup branch). Lock-released because the guard is
        // declared BEFORE `lock_state()` below — variable destruction
        // order is reverse declaration order, so the `MutexGuard` drops
        // first on any return path.
        let scratch_guard = ScratchReleaseGuard::new(scratch, &*self.binding);

        // Phase 3 — state-lock-required validation, FOLDED with insertion
        // under a single `lock_state()` acquisition per /qa F1. The
        // pre-/qa version split this into two acquisitions (one for
        // validation, one for `alloc_node_id` + `nodes.insert`), opening
        // a TOCTOU window where a concurrent `Core::complete(dep)` on a
        // non-resubscribable dep could slip in and recreate the wedge
        // `TerminalDep` was designed to prevent. Single locked region
        // closes the gap.
        let mut s = self.lock_state();

        // First pass: every dep must exist. Kept as its own loop (rather
        // than merged with the terminal check below) so `UnknownDep` wins
        // over `TerminalDep` regardless of dep ordering, and so the
        // `require_node` calls in the second pass only see known ids.
        for &dep in &deps {
            if !s.nodes.contains_key(&dep) {
                return Err(RegisterError::UnknownDep(dep));
            }
        }
        // Slice F audit (2026-05-07): mirror `set_deps`'s `TerminalDep`
        // rejection at registration time. Adding a non-resubscribable
        // terminal node as a dep at registration creates a permanent wedge.
        for &dep in &deps {
            let dep_rec = s.require_node(dep);
            if dep_rec.terminal.is_some() && !dep_rec.resubscribable {
                return Err(RegisterError::TerminalDep(dep));
            }
        }

        // Validation passed — install. Take scratch out of the guard
        // (disarms the release-on-drop) and continue using `s`.
        let installed_scratch = scratch_guard.take();

        let id = s.alloc_node_id();

        // `tracked`: Static derived + Operator track all deps; Dynamic
        // starts empty and fills via fn return; State / Producer have no
        // deps so tracked is empty. Entries are indices into `deps`, not
        // node ids.
        let tracked: HashSet<usize> = if op.is_some() {
            (0..deps.len()).collect()
        } else if is_dynamic {
            HashSet::new()
        } else if fn_id.is_some() && !deps.is_empty() {
            // Static derived
            (0..deps.len()).collect()
        } else {
            HashSet::new()
        };

        let dep_records: Vec<DepRecord> = deps.iter().map(|&d| DepRecord::new(d)).collect();

        let rec = NodeRecord {
            dep_records,
            fn_id,
            op,
            is_dynamic,
            equals,
            cache: initial,
            // True only when a real `initial` handle was supplied, which
            // Phase 1 restricts to state-shaped nodes.
            has_fired_once: initial != NO_HANDLE,
            subscribers: HashMap::new(),
            subscribers_revision: 0,
            tracked,
            dirty: false,
            involved_this_wave: false,
            pause_state: PauseState::Active,
            pausable,
            replay_buffer_cap: replay_buffer,
            replay_buffer: VecDeque::new(),
            terminal: None,
            has_received_teardown: false,
            // Off at registration; opt in afterwards via
            // `set_resubscribable`.
            resubscribable: false,
            meta_companions: Vec::new(),
            partial,
            op_scratch: installed_scratch,
        };
        s.nodes.insert(id, rec);
        // The new node starts with an empty child set and is added as a
        // child of each of its deps.
        s.children.insert(id, HashSet::new());
        for &dep in &deps {
            s.children.entry(dep).or_default().insert(id);
        }
        // Slice Y1 (D3 / D090 — P12 fix, 2026-05-08): maintain partition
        // membership BEFORE dropping the state lock. Closes the
        // eventual-consistency window where a concurrent thread observed
        // the new node in `s.nodes` / new edges in `s.children` but the
        // registry hadn't unioned the partition yet. Today benign
        // (`partition_of` is debug-only); under Y1's wave engine
        // migration `lock_for(node)` consumes registry state on the hot
        // path, and the window means `lock_for` could resolve to a
        // partition that's been topologically unioned in `s.children`
        // but not yet in `registry`.
        //
        // **Lock-discipline invariant:** `state lock → registry mutex`
        // (one-way; never registry → state). Registry mutex is
        // uncontended in the X5 substrate — the only acquisition sites
        // are this one + `Core::set_deps` + the read-only accessors
        // `partition_count`/`partition_of` and (Y1+) `lock_for` — none
        // of which take the state lock — so the inner critical section
        // adds negligible latency.
        {
            let mut reg = self.registry.lock();
            reg.ensure_registered(id);
            for &dep in &deps {
                reg.union_nodes(id, dep);
            }
        }
        // Release the state lock explicitly before notifying topology
        // listeners, so the event is fired without the state lock held.
        drop(s);
        self.fire_topology_event(&crate::topology::TopologyEvent::NodeRegistered(id));
        Ok(id)
    }
2670
2671    /// Sugar over [`Self::register`] — register a state node. `initial`
2672    /// may be [`NO_HANDLE`] to start sentinel.
2673    ///
2674    /// `partial` is accepted for surface consistency (D019); for state
2675    /// nodes it has no effect (state nodes don't fire fn).
2676    ///
2677    /// # Errors
2678    ///
2679    /// State registration is structurally simple — no deps, no op — so
2680    /// the only reachable variant is none in practice. Returns
2681    /// [`Result`] for surface consistency with [`Self::register`].
2682    pub fn register_state(
2683        &self,
2684        initial: HandleId,
2685        partial: bool,
2686    ) -> Result<NodeId, RegisterError> {
2687        self.register(NodeRegistration {
2688            deps: Vec::new(),
2689            fn_or_op: None,
2690            opts: NodeOpts {
2691                initial,
2692                partial,
2693                ..NodeOpts::default()
2694            },
2695        })
2696    }
2697
2698    /// Sugar over [`Self::register`] — register a producer node (D031,
2699    /// Slice D). No deps; fn fires once on first subscribe; cleanup runs
2700    /// via [`BindingBoundary::producer_deactivate`] when the last
2701    /// subscriber unsubscribes.
2702    ///
2703    /// The fn body uses the binding's `ProducerCtx`-equivalent helper
2704    /// (see `graphrefly-operators::producer`) to subscribe to other Core
2705    /// nodes — the zip / concat / race / takeUntil pattern.
2706    ///
2707    /// # Errors
2708    ///
2709    /// Producer registration has no user-supplied deps, so structurally
2710    /// none of [`RegisterError`]'s variants are reachable. Returns
2711    /// [`Result`] for surface consistency with [`Self::register`].
2712    pub fn register_producer(&self, fn_id: FnId) -> Result<NodeId, RegisterError> {
2713        self.register(NodeRegistration {
2714            deps: Vec::new(),
2715            fn_or_op: Some(NodeFnOrOp::Fn(fn_id)),
2716            opts: NodeOpts {
2717                // Producers have no deps — the first-run gate is degenerate.
2718                partial: true,
2719                ..NodeOpts::default()
2720            },
2721        })
2722    }
2723
2724    /// Sugar over [`Self::register`] — register a derived (static) node.
2725    /// `partial` controls the R2.5.3 first-run gate (D011).
2726    ///
2727    /// # Errors
2728    ///
2729    /// - [`RegisterError::UnknownDep`] — any element of `deps` is not
2730    ///   registered.
2731    /// - [`RegisterError::TerminalDep`] — a dep is terminal and not
2732    ///   resubscribable.
2733    pub fn register_derived(
2734        &self,
2735        deps: &[NodeId],
2736        fn_id: FnId,
2737        equals: EqualsMode,
2738        partial: bool,
2739    ) -> Result<NodeId, RegisterError> {
2740        self.register(NodeRegistration {
2741            deps: deps.to_vec(),
2742            fn_or_op: Some(NodeFnOrOp::Fn(fn_id)),
2743            opts: NodeOpts {
2744                equals,
2745                partial,
2746                ..NodeOpts::default()
2747            },
2748        })
2749    }
2750
2751    /// Sugar over [`Self::register`] — register a dynamic node (fn
2752    /// declares its actually-tracked dep indices per fire). `partial`
2753    /// controls the R2.5.3 first-run gate (D011).
2754    ///
2755    /// # Errors
2756    ///
2757    /// - [`RegisterError::UnknownDep`] — any element of `deps` is not
2758    ///   registered.
2759    /// - [`RegisterError::TerminalDep`] — a dep is terminal and not
2760    ///   resubscribable.
2761    pub fn register_dynamic(
2762        &self,
2763        deps: &[NodeId],
2764        fn_id: FnId,
2765        equals: EqualsMode,
2766        partial: bool,
2767    ) -> Result<NodeId, RegisterError> {
2768        self.register(NodeRegistration {
2769            deps: deps.to_vec(),
2770            fn_or_op: Some(NodeFnOrOp::Fn(fn_id)),
2771            opts: NodeOpts {
2772                equals,
2773                partial,
2774                is_dynamic: true,
2775                ..NodeOpts::default()
2776            },
2777        })
2778    }
2779
2780    /// Build a fresh [`OperatorScratch`](crate::op_state::OperatorScratch)
2781    /// box for an operator variant, taking any required handle retains.
2782    /// Shared between `register_operator` (initial install) and
2783    /// `reset_for_fresh_lifecycle` (resubscribable cycle re-install).
2784    ///
2785    /// # Errors
2786    ///
2787    /// Returns [`RegisterError::OperatorSeedSentinel`] if `op` is `Scan`
2788    /// / `Reduce` with a [`NO_HANDLE`] seed (R2.5.3 — stateful folders
2789    /// must have a real seed). Refcount discipline: the seed-sentinel
2790    /// check happens BEFORE [`BindingBoundary::retain_handle`], so an
2791    /// `Err` leaves no handles dangling.
2792    fn make_op_scratch(
2793        &self,
2794        op: OperatorOp,
2795    ) -> Result<Option<Box<dyn crate::op_state::OperatorScratch>>, RegisterError> {
2796        use crate::op_state::{
2797            DistinctState, LastState, PairwiseState, ReduceState, ScanState, SkipState, TakeState,
2798            TakeWhileState,
2799        };
2800        // Slice H (2026-05-07): Scan/Reduce seed-sentinel checks happen
2801        // BEFORE retain_handle so an Err return leaves no handles dangling.
2802        //
2803        // Slice H /qa F13 (2026-05-07): for retaining variants, allocate
2804        // the `Box<State>` BEFORE calling `binding.retain_handle`. If
2805        // `Box::new` panics (e.g., OOM-as-panic), no retain has happened
2806        // yet — no leak. If `retain_handle` panics after Box succeeds,
2807        // the `Box<State>` is dropped on unwind; State has no handle yet
2808        // (we haven't touched the registry refcount), so still no leak.
2809        // Caller wraps the returned scratch in `ScratchReleaseGuard` to
2810        // cover panics AFTER make_op_scratch returns.
2811        match op {
2812            OperatorOp::Scan { seed, .. } => {
2813                if seed == NO_HANDLE {
2814                    return Err(RegisterError::OperatorSeedSentinel);
2815                }
2816                let state = Box::new(ScanState { acc: seed });
2817                self.binding.retain_handle(seed);
2818                Ok(Some(state))
2819            }
2820            OperatorOp::Reduce { seed, .. } => {
2821                if seed == NO_HANDLE {
2822                    return Err(RegisterError::OperatorSeedSentinel);
2823                }
2824                let state = Box::new(ReduceState { acc: seed });
2825                self.binding.retain_handle(seed);
2826                Ok(Some(state))
2827            }
2828            OperatorOp::DistinctUntilChanged { .. } => Ok(Some(Box::new(DistinctState::default()))),
2829            OperatorOp::Pairwise { .. } => Ok(Some(Box::new(PairwiseState::default()))),
2830            OperatorOp::Take { .. } => Ok(Some(Box::new(TakeState::default()))),
2831            OperatorOp::Skip { .. } => Ok(Some(Box::new(SkipState::default()))),
2832            OperatorOp::TakeWhile { .. } => Ok(Some(Box::new(TakeWhileState))),
2833            OperatorOp::Last { default } => {
2834                let state = Box::new(LastState {
2835                    latest: NO_HANDLE,
2836                    default,
2837                });
2838                if default != NO_HANDLE {
2839                    self.binding.retain_handle(default);
2840                }
2841                Ok(Some(state))
2842            }
2843            OperatorOp::Map { .. }
2844            | OperatorOp::Filter { .. }
2845            | OperatorOp::Combine { .. }
2846            | OperatorOp::WithLatestFrom { .. }
2847            | OperatorOp::Merge => Ok(None),
2848        }
2849    }
2850
2851    /// Sugar over [`Self::register`] — register a built-in operator node
2852    /// (Slice C-1, D009; D026 generic scratch). The operator dispatch path
2853    /// lives in `fire_operator`; `op` selects which per-operator FFI
2854    /// method on [`BindingBoundary`] gets called per fire.
2855    ///
2856    /// For stateful operators ([`OperatorOp::Scan`] / [`Reduce`] /
2857    /// [`Last`] with a default), the seed/default handle is captured
2858    /// into the appropriate
2859    /// [`OperatorScratch`](crate::op_state::OperatorScratch) struct
2860    /// stored at [`NodeRecord::op_scratch`], and Core takes one retain
2861    /// share via [`BindingBoundary::retain_handle`].
2862    ///
2863    /// # Errors
2864    ///
2865    /// - [`RegisterError::OperatorWithoutDeps`] — `deps` is empty (use
2866    ///   [`Self::register_producer`] instead).
2867    /// - [`RegisterError::OperatorSeedSentinel`] — `op` is
2868    ///   [`OperatorOp::Scan`] / [`OperatorOp::Reduce`] with a
2869    ///   [`NO_HANDLE`] seed.
2870    /// - [`RegisterError::UnknownDep`] — any element of `deps` is not
2871    ///   registered.
2872    /// - [`RegisterError::TerminalDep`] — a dep is terminal and not
2873    ///   resubscribable.
2874    pub fn register_operator(
2875        &self,
2876        deps: &[NodeId],
2877        op: OperatorOp,
2878        opts: OperatorOpts,
2879    ) -> Result<NodeId, RegisterError> {
2880        self.register(NodeRegistration {
2881            deps: deps.to_vec(),
2882            fn_or_op: Some(NodeFnOrOp::Op(op)),
2883            opts: NodeOpts {
2884                equals: opts.equals,
2885                partial: opts.partial,
2886                ..NodeOpts::default()
2887            },
2888        })
2889    }
2890
2891    // -------------------------------------------------------------------
2892    // Subscription
2893    // -------------------------------------------------------------------
2894
    /// Subscribe a sink to a node. Returns a [`Subscription`] handle —
    /// dropping the handle unsubscribes the sink. Per §10.12, no manual
    /// `unsubscribe(node, id)` call is required.
    ///
    /// Push-on-subscribe (R1.2.3, R2.2.3 step 4): the sink is installed in
    /// the node's `subscribers` map under the state lock, and the
    /// handshake is then fired with the state lock released. The handshake
    /// contents depend on node state:
    /// - Sentinel cache + live (non-terminal): `[START]`
    /// - Cached + live: `[START, DATA(handle)]`
    /// - Cached + terminated (non-resubscribable): `[START, DATA(handle), <terminal>]`
    /// - Sentinel + terminated (non-resubscribable): `[START, <terminal>]`
    ///
    /// Each message listed above is delivered as its own sink call (one
    /// single-message slice per tier, R1.3.5.a). When the node has a
    /// non-zero replay-buffer cap, buffered DATA handles are replayed as
    /// further `DATA` slices after the cache slice and before any
    /// terminal; a trailing buffer entry equal to the cache is skipped so
    /// it is not delivered twice. A node that has received TEARDOWN
    /// additionally gets a final `TEARDOWN` slice.
    ///
    /// Resubscribable terminal lifecycle (R2.2.7 / R2.5.3): if the node was
    /// marked resubscribable via [`Self::set_resubscribable`] AND has
    /// terminated, the subscribe call first **resets** the node — clears
    /// `terminal`, `has_fired_once`, `has_received_teardown`, all
    /// `dep_handles` to `NO_HANDLE`, all `dep_terminals` to `None`, and
    /// drains the pause lockset. The new subscriber then receives a fresh
    /// `[START]` (cache may survive for state nodes; sentinel for compute).
    /// The reset is skipped once the node has received TEARDOWN. After a
    /// reset the binding's `wipe_ctx` is invoked for the node before the
    /// handshake fires.
    ///
    /// Activation (R2.2.3 step 5): if this is the first subscriber and the
    /// node is a derived/dynamic compute, recursively activate deps so their
    /// cached handles fill our `dep_handles`.
    ///
    /// # Panics
    ///
    /// If the sink panics while receiving a handshake slice, the sink is
    /// removed from the node's `subscribers` again and the panic is
    /// resumed, so the caller observes it at the `subscribe` call site.
    /// NOTE(review): `require_node` presumably also panics when `node_id`
    /// is not registered — confirm against its definition.
    #[allow(clippy::needless_pass_by_value)] // Sink is `Arc<dyn Fn>`; we clone for the subscribers map and call it directly. Taking by value matches the ergonomics callers expect.
    pub fn subscribe(&self, node_id: NodeId, sink: Sink) -> Subscription {
        // Subscribe protocol (Slice E rework, post-handshake-reentry-lift):
        //
        // 1. Acquire `wave_owner` first (re-entrant; same-thread passes
        //    through, cross-thread blocks). This is the cross-thread
        //    serialization point that preserves R1.3.5.a happens-after
        //    ordering across the lock-released handshake fire.
        // 2. Acquire state lock briefly: alloc sub_id, run resubscribable
        //    reset if applicable, snapshot handshake state, install sink
        //    in `subscribers`. Drop state lock.
        // 3. Fire handshake LOCK-RELEASED. Per-tier slices (R1.3.5.a):
        //    `[Start]` / `[Data(cache)]?` / replayed `[Data(h)]*` /
        //    `[Complete]?` / `[Error(h)]?` / `[Teardown]?`. Empty tiers
        //    are skipped. Sink callbacks may re-enter Core freely —
        //    same-thread re-entry passes through `wave_owner` reentrantly.
        // 4. Run activation under `run_wave` if needed (first subscriber
        //    on a non-state node).
        // 5. Drop `wave_owner`.
        //
        // Race-fix discipline: the sink is installed in `subscribers`
        // BEFORE the state lock drops, so concurrent threads that
        // acquire `wave_owner` after our scope sees the sink already
        // registered. Cross-thread emits block on `wave_owner` until
        // we drop it, ensuring all our handshake calls land before
        // any concurrent wave's flush observes the sink.

        // Acquire the partition's `wave_owner` first — cross-thread
        // serialization point. Per Slice Y1 / Phase E (2026-05-08),
        // subscribe routes through the per-partition lock instead of
        // a Core-global one. Subscribe touches only `node_id`'s
        // partition (activation cascade stays within the partition
        // because dep edges are unioned). `partition_wave_owner_lock_arc`
        // does retry-validate against concurrent union/split.
        // `lock_arc()` is `!Send`; same-thread reentrant.
        let _wave_guard = self.partition_wave_owner_lock_arc(node_id);

        let (sub_id, tier_slices, needs_activation, did_reset) = {
            let mut s = self.lock_state();
            let sub_id = s.alloc_sub_id();

            // Resubscribable reset: terminal + flagged → clear lifecycle
            // state so the incoming subscriber starts fresh. F3 audit
            // guard: a node that has received TEARDOWN (R2.6.4) is
            // permanently destroyed at this layer; resurrecting it via a
            // late subscribe is a category error. COMPLETE/ERROR is
            // recoverable for resubscribable nodes; TEARDOWN is not. The
            // handshake will still replay the terminal in the non-reset
            // branch so the late subscriber sees a clean
            // `[START, ?DATA, COMPLETE|ERROR, TEARDOWN]` stream.
            let needs_reset = {
                let rec = s.require_node(node_id);
                rec.resubscribable && rec.terminal.is_some() && !rec.has_received_teardown
            };
            if needs_reset {
                self.reset_for_fresh_lifecycle(&mut s, node_id);
            }

            // Snapshot handshake state under lock. Taken AFTER the
            // optional reset above so the handshake reflects the fresh
            // lifecycle, and BEFORE the sink is inserted so
            // `first_subscriber` describes the pre-install subscriber set.
            let (cache, is_state, first_subscriber, terminal, torn_down) = {
                let rec = s.require_node(node_id);
                (
                    rec.cache,
                    rec.is_state(),
                    rec.subscribers.is_empty(),
                    rec.terminal,
                    rec.has_received_teardown,
                )
            };

            // Build per-tier handshake slices. Each non-empty slice is
            // fired as a separate sink call (R1.3.5.a tier-split).
            let mut tier_slices: SmallVec<[Vec<Message>; 4]> = SmallVec::new();
            tier_slices.push(vec![Message::Start]);
            if cache != NO_HANDLE {
                tier_slices.push(vec![Message::Data(cache)]);
            }
            // Slice E1 (R2.6.5 / Lock 6.G): replay buffered DATA between
            // [Start] (and the cache slice, if present) and any terminal.
            // Each buffered handle becomes a separate per-tier slice so
            // late subscribers see the historical Data sequence as
            // distinct sink calls.
            //
            // Dedupe: when a cache slice is present and the buffer's last
            // entry is the same handle (the typical case — cache always
            // tracks the last DATA emitted, and the buffer's tail entry
            // is that same DATA), skip the last buffer entry to avoid
            // delivering Data(cache) twice. For state nodes whose cache
            // survives unsubscribe, the buffer may have older entries
            // the cache doesn't reflect; the dedupe only drops the
            // single trailing entry that equals cache. (QA A1, 2026-05-07)
            let replay_handles: Vec<HandleId> = {
                let rec = s.require_node(node_id);
                // A missing cap is treated the same as a cap of zero:
                // nothing is replayed.
                let cap = rec.replay_buffer_cap.unwrap_or(0);
                if cap == 0 {
                    Vec::new()
                } else {
                    let mut v: Vec<HandleId> = rec.replay_buffer.iter().copied().collect();
                    if cache != NO_HANDLE && v.last() == Some(&cache) {
                        v.pop();
                    }
                    v
                }
            };
            for h in &replay_handles {
                tier_slices.push(vec![Message::Data(*h)]);
            }
            if let Some(t) = terminal {
                tier_slices.push(vec![match t {
                    TerminalKind::Complete => Message::Complete,
                    TerminalKind::Error(h) => Message::Error(h),
                }]);
            }
            if torn_down {
                tier_slices.push(vec![Message::Teardown]);
            }

            // Install sink BEFORE dropping state lock so any thread that
            // subsequently acquires `wave_owner` (after our scope ends)
            // sees the sink already registered.
            //
            // Slice X4 / D2: bump `subscribers_revision` alongside the
            // insert so a pending_notify entry opened earlier in the same
            // wave (e.g. inside `batch(|| { emit(s, h1); subscribe(s,
            // late); emit(s, h2); })`) starts a fresh `PendingBatch` on
            // its next `queue_notify` push — making the new sink visible
            // to subsequent emits' flush slices, while the pre-subscribe
            // batch's snapshot stays frozen so we don't double-deliver
            // earlier emits via the wave's flush AND the new sub's
            // handshake replay.
            {
                let rec = s.require_node_mut(node_id);
                rec.subscribers.insert(sub_id, sink.clone());
                rec.subscribers_revision = rec.subscribers_revision.wrapping_add(1);
            }

            // Only the first subscriber of a non-state node triggers
            // activation; state nodes have no deps to activate.
            let needs_activation = first_subscriber && !is_state;
            (sub_id, tier_slices, needs_activation, needs_reset)
            // state lock drops here
        };

        // Slice E2 (R2.4.6 / D055): on resubscribable terminal reset, fire
        // `wipe_ctx` LOCK-RELEASED so the binding drops its `NodeCtxState`
        // entry (clearing both `store` and any residual `current_cleanup`).
        // The new subscriber's first invoke_fn sees a fresh empty store.
        // Fires AFTER the state lock drops so the binding's
        // `node_ctx.lock()` can't deadlock against Core's state lock — and
        // BEFORE the handshake so the wipe is observable before any
        // user-visible interaction with the new lifecycle.
        if did_reset {
            self.binding.wipe_ctx(node_id);
        }

        // Fire handshake LOCK-RELEASED. Sink may re-enter Core; same-
        // thread re-entry passes through `wave_owner` reentrantly.
        // Cross-thread emits block at `wave_owner` until our scope ends.
        //
        // A7 (Slice F, 2026-05-07): per-tier slice fire is wrapped in
        // `catch_unwind`. The sink is installed in `subscribers` BEFORE
        // the handshake fires (load-bearing — concurrent threads observe
        // the sink immediately). If a sink panics on tier N, the panic
        // would otherwise unwind out of `subscribe` BEFORE the
        // `Subscription` handle is constructed, leaving the sink
        // registered in `subscribers` with no user-held handle to drop.
        // Subsequent waves' `flush_notifications` would re-fire the
        // panicking sink forever.
        //
        // On panic: remove the sink from `subscribers` (via the
        // already-allocated `sub_id`), drop `_wave_guard` cleanly via
        // RAII, and resume the unwind so the user observes the panic at
        // the call site. Same effect as the user dropping the
        // `Subscription` immediately, but pre-emptive.
        for slice in &tier_slices {
            let sink_clone = sink.clone();
            let slice_ref: &[Message] = slice;
            let result = catch_unwind(AssertUnwindSafe(|| sink_clone(slice_ref)));
            if let Err(panic_payload) = result {
                // Remove the orphaned sink. Best-effort: if the node was
                // since torn down (e.g., the sink itself called teardown
                // before panicking), the entry may already be gone.
                {
                    let mut s = self.lock_state();
                    if let Some(rec) = s.nodes.get_mut(&node_id) {
                        rec.subscribers.remove(&sub_id);
                        // Slice X4 / D2: keep revision-tracked snapshot
                        // discipline consistent with the install site —
                        // any pending_notify entry that already absorbed
                        // the panicking sink under the post-install
                        // revision should start a fresh batch on its
                        // next queue_notify push.
                        rec.subscribers_revision = rec.subscribers_revision.wrapping_add(1);
                    }
                }
                // The state lock was released at the end of the inner
                // scope above; re-raise the original payload unchanged.
                std::panic::resume_unwind(panic_payload);
            }
        }

        // Run activation if needed. `run_wave_for(node_id)` acquires
        // only the partitions transitively touched from `node_id`
        // (downstream cascade + meta-companion teardown reach) — same-
        // partition activation re-enters reentrantly. Slice Y1 / Phase E.
        if needs_activation {
            self.run_wave_for(node_id, |this| {
                let mut s = this.lock_state();
                this.activate_derived(&mut s, node_id);
            });
        }

        // The handle keeps only a weak reference to the shared state
        // (`Arc::downgrade`), so holding a `Subscription` does not keep
        // that state alive.
        Subscription {
            state: Arc::downgrade(&self.state),
            node_id,
            sub_id,
        }
        // _wave_guard drops here, releasing wave_owner.
    }
3133
3134    /// Mark `node_id` as resubscribable per R2.2.7. Resubscribable nodes
3135    /// reset their terminal-lifecycle state on a fresh subscribe — see
3136    /// [`Self::subscribe`].
3137    ///
3138    /// Configuration call — must be made before the node has any active
3139    /// subscribers, since changing the policy mid-flight would surprise
3140    /// existing observers.
3141    ///
3142    /// # Panics
3143    ///
3144    /// Panics if the node has subscribers (the policy is observable
3145    /// behavior; changing it after the fact would change semantics for
3146    /// existing sinks).
3147    pub fn set_resubscribable(&self, node_id: NodeId, resubscribable: bool) {
3148        let mut s = self.lock_state();
3149        let rec = s.require_node_mut(node_id);
3150        assert!(
3151            rec.subscribers.is_empty(),
3152            "set_resubscribable: node already has subscribers; \
3153             configure resubscribable before any subscribe call"
3154        );
3155        rec.resubscribable = resubscribable;
3156    }
3157
    /// Reset a resubscribable node's terminal-lifecycle state. Called from
    /// `subscribe` when a late subscriber arrives at a flagged node.
    ///
    /// Released (each via `binding.release_handle`, or via the old
    /// scratch's `release_handles`): the terminal-slot Error handle, every
    /// per-dep terminal Error handle, every `data_batch` handle, every
    /// non-sentinel per-dep `prev_data`, the payload handles of any
    /// pause-buffered messages, every `replay_buffer` handle, and the
    /// handles owned by the old `op_scratch`.
    ///
    /// Reset: `terminal` and `has_received_teardown` are cleared;
    /// `has_fired_once` is recomputed (true only for a state node whose
    /// cache is non-sentinel); every dep record goes back to its sentinel
    /// shape; `pause_state` collapses to `Active` (the pause buffer is
    /// dropped, not flushed); node-level `dirty` / `involved_this_wave`
    /// are cleared; dynamic nodes drop their `tracked` set; `op_scratch`
    /// is rebuilt from the stored `op`.
    ///
    /// # Panics
    ///
    /// Panics if `make_op_scratch` rejects the stored operator op (treated
    /// as an invariant violation — see Phase 2).
    // NOTE(review): `require_node_mut` presumably panics on an unknown
    // `node_id` as well — confirm against its definition.
    fn reset_for_fresh_lifecycle(&self, s: &mut CoreState, node_id: NodeId) {
        // Phase 1: collect wave-state handle releases + take the old
        // op_scratch + reset other state. Take all mutations under one
        // borrow so the post-borrow phases don't re-walk dep_records.
        // Nothing is released to the binding inside this block — handles
        // are only gathered here and released in Phases 3 and 5.
        let (prev_op, mut old_scratch, handles_to_release, pause_buffer_payloads) = {
            let rec = s.require_node_mut(node_id);
            let mut hs = Vec::new();
            // Only an Error terminal carries a handle; Complete has none.
            if let Some(TerminalKind::Error(h)) = rec.terminal {
                hs.push(h);
            }
            for dr in &rec.dep_records {
                if let Some(TerminalKind::Error(h)) = dr.terminal {
                    hs.push(h);
                }
                for &h in &dr.data_batch {
                    hs.push(h);
                }
                // Slice C-3 /qa: also release `prev_data`. Prior to this
                // collection, `reset_for_fresh_lifecycle` overwrote
                // `dr.prev_data = NO_HANDLE` without releasing the old
                // handle, leaking one share per dep per resubscribable
                // cycle. The leak was masked because no test exercised
                // the per-dep `prev_data` retain across a lifecycle
                // reset; surfaced by the T1 tightening of
                // `last_releases_buffered_latest_on_lifecycle_reset`.
                if dr.prev_data != NO_HANDLE {
                    hs.push(dr.prev_data);
                }
            }
            // Take pause_state's buffer; collect its payload handles for
            // release (they were retained at queue_notify time; buffer
            // drops because the new subscriber starts fresh).
            let mut pulled = Vec::new();
            if let PauseState::Paused { ref mut buffer, .. } = rec.pause_state {
                for msg in buffer.drain(..) {
                    // Messages without a payload handle contribute nothing.
                    if let Some(h) = msg.payload_handle() {
                        pulled.push(h);
                    }
                }
            }
            // Slice E1: drain the replay buffer too — the new subscriber
            // gets a fresh lifecycle and shouldn't see prior emissions.
            // These handles share the `pulled` list with the pause-buffer
            // payloads and are released together in Phase 5.
            for h in rec.replay_buffer.drain(..) {
                pulled.push(h);
            }
            // Reset wave / lifecycle state.
            rec.terminal = None;
            // Not an unconditional clear: a state node that still holds a
            // cached value keeps `has_fired_once == true`.
            rec.has_fired_once = rec.cache != NO_HANDLE && rec.is_state();
            rec.has_received_teardown = false;
            for dr in &mut rec.dep_records {
                dr.prev_data = NO_HANDLE;
                dr.data_batch.clear();
                dr.terminal = None;
                dr.dirty = false;
                dr.involved_this_wave = false;
            }
            rec.pause_state = PauseState::Active;
            rec.involved_this_wave = false;
            rec.dirty = false;
            // P7 (Slice A close /qa): Dynamic nodes clear `tracked` so
            // the post-reset first fire repopulates from the fn's
            // returned tracked-deps set.
            if rec.is_dynamic {
                rec.tracked.clear();
            }
            // Take the old scratch out so we can release its handles and
            // install a fresh one. Operator op is copied for the
            // rebuild step below.
            let prev_op = rec.op;
            // `mem::take` leaves the default value in `rec.op_scratch`
            // until Phase 4 installs the replacement.
            let old = std::mem::take(&mut rec.op_scratch);
            (prev_op, old, hs, pulled)
        };

        // Phase 2 (Slice C-3 /qa P1 — RETAIN-BEFORE-RELEASE ordering):
        // build the fresh scratch FIRST, taking new retains on any
        // seed/default handles. This must run BEFORE Phase 3 releases
        // the old scratch's shares — if old `acc` (Scan/Reduce) or old
        // `latest` (Last) aliases the new `seed`/`default` (common:
        // `fold(seed, x) == seed` interns to the same registry entry),
        // releasing the old share first could collapse the binding's
        // registry slot to zero (production bindings remove the value
        // entry on refcount-zero — see `tests/common/mod.rs:191-204`),
        // and a subsequent `retain_handle` on the new seed would bump a
        // refcount on a slot whose value has been removed. By taking
        // the new retains first, we floor the refcount at ≥1 before
        // any release happens.
        let new_scratch = match prev_op {
            // Slice H: the OperatorOp stored on NodeRecord previously
            // passed `make_op_scratch` validation at registration time
            // (RegisterError::OperatorSeedSentinel for Scan/Reduce
            // sentinel seeds; Last { default: NO_HANDLE } is accepted
            // and never errors). Re-running it here on the same op
            // value is structurally guaranteed to succeed.
            Some(op) => self
                .make_op_scratch(op)
                .expect("invariant: stored OperatorOp passed make_op_scratch validation at registration time"),
            // Nodes without an operator op get no scratch.
            None => None,
        };

        // Phase 3: NOW release handles owned by the old op_scratch
        // (Scan/Reduce acc, Distinct/Pairwise prev, Last latest +
        // default). Safe per Phase 2's retain-first floor. The boxed
        // value is consumed and dropped after.
        if let Some(scratch) = old_scratch.as_mut() {
            scratch.release_handles(&*self.binding);
        }
        drop(old_scratch);

        // Phase 4: install the fresh scratch.
        {
            let rec = s.require_node_mut(node_id);
            rec.op_scratch = new_scratch;
        }

        // Phase 5: release wave-state handles collected in phase 1.
        for h in handles_to_release {
            self.binding.release_handle(h);
        }
        // Despite the name, this list also carries the replay-buffer
        // handles drained in Phase 1.
        for h in pause_buffer_payloads {
            self.binding.release_handle(h);
        }
    }
3289
3290    /// Activate `root` and any transitive uncached compute deps so their
3291    /// caches fill our dep_handles slots.
3292    ///
3293    /// Slice A close (M1): pure dep-walk + dep_handles population +
3294    /// pending_fires queueing. No `in_tick` management or `drain_and_flush`
3295    /// call — the outer caller (typically `Core::subscribe` via
3296    /// [`Core::run_wave`]) owns the wave lifecycle and drains lock-released
3297    /// around `invoke_fn`.
3298    ///
3299    /// Walk shape:
3300    ///   1. **Discover phase (DFS via Vec stack):** starting at `root`,
3301    ///      walk transitively-needing-activation deps via the `deps`
3302    ///      chain. Build an ordering where each node appears AFTER all
3303    ///      of its uncached compute deps — i.e., reverse topological
3304    ///      among the visited subgraph.
3305    ///   2. **Deliver phase (forward iteration):** for each visited
3306    ///      node in dep-first order, push deps' caches into the node's
3307    ///      `dep_handles` slots. Caches that were sentinel pre-walk are
3308    ///      filled because their parent's fn fires later in the wave's
3309    ///      drain loop and `commit_emission` propagates new caches forward
3310    ///      via `deliver_data_to_consumer` — the same path this method
3311    ///      uses for the initial seed. Adds the node to `pending_fires`
3312    ///      if its tracked-deps gate is satisfied; the wave-engine drain
3313    ///      fires the fn lock-released around `invoke_fn`.
3314    pub(crate) fn activate_derived(&self, s: &mut CoreState, root: NodeId) {
3315        // Phase 1: discover. DFS to collect every compute node reachable
3316        // via deps that doesn't yet have a cache and hasn't fired.
3317        // Record them in dep-first (post-order) so phase 2 can deliver
3318        // caches forward. Frame is `(node_id, finalize)` — `finalize=false`
3319        // means "first visit: push deps then re-push self with finalize=true";
3320        // `finalize=true` means "deps have been expanded, append self to
3321        // `order`."
3322        let mut visited: HashSet<NodeId> = HashSet::new();
3323        let mut order: Vec<NodeId> = Vec::new();
3324        let mut stack: Vec<(NodeId, bool)> = vec![(root, false)];
3325        while let Some((id, finalize)) = stack.pop() {
3326            if finalize {
3327                order.push(id);
3328                continue;
3329            }
3330            if !visited.insert(id) {
3331                continue;
3332            }
3333            stack.push((id, true));
3334            let dep_ids: Vec<NodeId> = s.require_node(id).dep_ids_vec();
3335            for dep_id in dep_ids {
3336                let (dep_is_state, dep_cache, dep_has_fired) = {
3337                    let dep_rec = s.require_node(dep_id);
3338                    (dep_rec.is_state(), dep_rec.cache, dep_rec.has_fired_once)
3339                };
3340                if !dep_is_state
3341                    && dep_cache == NO_HANDLE
3342                    && !dep_has_fired
3343                    && !visited.contains(&dep_id)
3344                {
3345                    stack.push((dep_id, false));
3346                }
3347            }
3348        }
3349
3350        // Phase 2: deliver caches in dep-first order. For each node, walk
3351        // its deps and call `deliver_data_to_consumer` for any with caches.
3352        // Producer nodes (no deps + has fn — Slice D, D031) have no deps
3353        // to walk; queue them directly into `pending_fires` so the wave
3354        // engine fires their fn once on activation.
3355        //
3356        // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_fires lives on
3357        // per-thread WaveState. State lock + thread_local borrow are
3358        // independent; deliver_data_to_consumer also writes pending_fires
3359        // via WaveState (no nested with_wave_state borrows here).
3360        for &id in &order {
3361            let (dep_ids, is_producer) = {
3362                let rec = s.require_node(id);
3363                (rec.dep_ids_vec(), rec.is_producer())
3364            };
3365            if is_producer {
3366                crate::batch::with_wave_state(|ws| {
3367                    ws.pending_fires.insert(id);
3368                });
3369                continue;
3370            }
3371            for (i, dep_id) in dep_ids.iter().copied().enumerate() {
3372                let dep_cache = s.require_node(dep_id).cache;
3373                if dep_cache != NO_HANDLE {
3374                    self.deliver_data_to_consumer(s, id, i, dep_cache);
3375                }
3376            }
3377        }
3378    }
3379
3380    // -------------------------------------------------------------------
3381    // Emission entry point
3382    // -------------------------------------------------------------------
3383
3384    /// Set a state node's value. Triggers a wave (DIRTY → DATA/RESOLVED →
3385    /// fn fires for downstream).
3386    ///
3387    /// Silent no-op if the node has already terminated (R1.3.4). The handle
3388    /// passed in is still released by the caller's binding-side intern path
3389    /// — no implicit retain is consumed when the call short-circuits.
3390    ///
3391    /// # Panics
3392    ///
3393    /// Panics if `node_id` is not a state node, or if `new_handle` is
3394    /// [`NO_HANDLE`] (per R1.2.4, sentinel is not a valid DATA payload).
3395    pub fn emit(&self, node_id: NodeId, new_handle: HandleId) {
3396        assert!(
3397            new_handle != NO_HANDLE,
3398            "NO_HANDLE is not a valid DATA payload (R1.2.4)"
3399        );
3400        // Validate + terminal short-circuit under a brief lock.
3401        //
3402        // emit() is valid for State and Producer nodes — both are
3403        // intrinsic sources whose values are not derived from declared
3404        // deps. State nodes get emit() from user code; Producer nodes
3405        // get emit() from sink callbacks the producer's build closure
3406        // registered (sink fires → re-enter Core → emit on self).
3407        // Derived / Dynamic / Operator nodes emit via their fn return
3408        // value through fire_fn / fire_operator, NOT via emit().
3409        {
3410            let s = self.lock_state();
3411            let rec = s.require_node(node_id);
3412            assert!(
3413                rec.is_state() || rec.is_producer(),
3414                "emit() is for state or producer nodes only; \
3415                 derived/dynamic/operator emit via their fn return value"
3416            );
3417            if rec.terminal.is_some() {
3418                drop(s);
3419                // Caller's intern share would otherwise leak; cache slot
3420                // ownership doesn't transfer because we're not advancing
3421                // cache. Released lock-released so the binding can't
3422                // deadlock against an internal binding mutex.
3423                self.binding.release_handle(new_handle);
3424                return;
3425            }
3426        }
3427        // Run wave on `node_id`'s touched partitions. Slice Y1 / Phase E:
3428        // emit cascades only via `s.children`, all unioned with `node_id`'s
3429        // partition by construction (dep edges = union edges). Common case
3430        // is a single-partition acquire — disjoint-partition emits run
3431        // truly parallel under per-partition `wave_owner`.
3432        self.run_wave_for(node_id, |this| {
3433            this.commit_emission(node_id, new_handle);
3434        });
3435    }
3436
3437    /// Read a node's current cache. Returns [`NO_HANDLE`] if sentinel.
3438    #[must_use]
3439    pub fn cache_of(&self, node_id: NodeId) -> HandleId {
3440        self.lock_state().require_node(node_id).cache
3441    }
3442
3443    /// Whether the node's fn has fired at least once (compute) OR it has had
3444    /// a non-sentinel value (state).
3445    #[must_use]
3446    pub fn has_fired_once(&self, node_id: NodeId) -> bool {
3447        self.lock_state().require_node(node_id).has_fired_once
3448    }
3449
3450    // -------------------------------------------------------------------
3451    // Read-side inspection helpers (Slice E+, M2)
3452    //
3453    // Non-panicking accessors for graph-layer introspection (`describe()`,
3454    // `observe()`, `node_count()`). All five return Option/empty for
3455    // unknown ids — they're meant to back walks over `node_ids()` where
3456    // the caller already knows the ids are valid, plus debugging /
3457    // dry-run probes that prefer "absence" over "panic".
3458    //
3459    // Keep these strictly read-only: no wave entry, no binding callbacks,
3460    // no lock release. Each takes the state lock once, copies a small
3461    // value, and drops the lock.
3462    // -------------------------------------------------------------------
3463
3464    /// Snapshot of every registered `NodeId` in unspecified order. The
3465    /// order matches `HashMap` iteration over the internal node table —
3466    /// callers that need stable ordering should track names at the
3467    /// `Graph` layer (canonical spec §3.5 namespace).
3468    #[must_use]
3469    pub fn node_ids(&self) -> Vec<NodeId> {
3470        self.lock_state().nodes.keys().copied().collect()
3471    }
3472
3473    /// Total number of nodes registered in this Core.
3474    #[must_use]
3475    pub fn node_count(&self) -> usize {
3476        self.lock_state().nodes.len()
3477    }
3478
3479    /// Returns `Some(kind)` for known nodes, `None` for unknown. Kind is
3480    /// **derived** from the field shape per D030 — see [`NodeKind`].
3481    #[must_use]
3482    pub fn kind_of(&self, node_id: NodeId) -> Option<NodeKind> {
3483        self.lock_state().nodes.get(&node_id).map(NodeRecord::kind)
3484    }
3485
3486    /// Snapshot of the node's deps in declaration order. Empty for
3487    /// unknown nodes or for state nodes (which have no deps).
3488    #[must_use]
3489    pub fn deps_of(&self, node_id: NodeId) -> Vec<NodeId> {
3490        self.lock_state()
3491            .nodes
3492            .get(&node_id)
3493            .map(NodeRecord::dep_ids_vec)
3494            .unwrap_or_default()
3495    }
3496
3497    /// Returns `Some(kind)` if the node has terminated (R1.3.4) — the
3498    /// pair `Some(Complete)` / `Some(Error(h))` mirrors the wire message
3499    /// the node emitted. `None` for live nodes or unknown ids.
3500    #[must_use]
3501    pub fn is_terminal(&self, node_id: NodeId) -> Option<TerminalKind> {
3502        self.lock_state()
3503            .nodes
3504            .get(&node_id)
3505            .and_then(|r| r.terminal)
3506    }
3507
3508    /// Whether the node has wave-scoped DIRTY pending (a tier-1 message
3509    /// queued but the matching tier-3 settle has not yet flushed).
3510    /// `false` for unknown ids. Mostly useful for `describe()` status
3511    /// classification (R3.6.1 `"dirty"`).
3512    #[must_use]
3513    pub fn is_dirty(&self, node_id: NodeId) -> bool {
3514        self.lock_state()
3515            .nodes
3516            .get(&node_id)
3517            .is_some_and(|r| r.dirty)
3518    }
3519
3520    /// Snapshot of `parent`'s meta companion list (R1.3.9.d / R2.3.3 —
3521    /// the companions added via [`Self::add_meta_companion`]). Empty
3522    /// for unknown ids or for nodes with no companions registered.
3523    ///
3524    /// Used by the graph layer's `signal_invalidate` to filter meta
3525    /// children out of the broadcast (canonical R3.7.2 — meta caches
3526    /// are preserved across graph-wide INVALIDATE).
3527    #[must_use]
3528    pub fn meta_companions_of(&self, parent: NodeId) -> Vec<NodeId> {
3529        self.lock_state()
3530            .nodes
3531            .get(&parent)
3532            .map(|r| r.meta_companions.clone())
3533            .unwrap_or_default()
3534    }
3535
3536    // -------------------------------------------------------------------
3537    // Wave engine — lives in `crate::batch` (Slice C-1 module split;
3538    // Slice A close M1 refactor lifted the binding-callback re-entrance
3539    // restrictions). The methods are still on `Core`; see `batch.rs` for:
3540    //
3541    //   - `run_wave` — wave entry, manages own locking.
3542    //   - `drain_and_flush` — drain phase, lock-released around invoke_fn.
3543    //   - `commit_emission` — lock-released around custom_equals.
3544    //   - `pick_next_fire`, `deliver_data_to_consumer`, `queue_notify`,
3545    //     `flush_notifications` — wave-engine helpers.
3546    // -------------------------------------------------------------------
3547}
3548
3549// -----------------------------------------------------------------------
3550// COMPLETE / ERROR — terminal lifecycle + auto-cascade gating
3551// -----------------------------------------------------------------------
3552
3553impl Core {
3554    /// Emit `[COMPLETE]` (R1.3.4) on `node_id`, marking it terminal. After
3555    /// this call:
3556    ///
3557    /// - Subsequent `Core::emit` on this node is a silent no-op (idempotent
3558    ///   termination).
3559    /// - The node's fn no longer fires.
3560    /// - The node's cache is preserved (last value still observable via
3561    ///   `cache_of`).
3562    /// - Children receive `[COMPLETE]` (tier 5 — bypasses pause buffer).
3563    /// - Auto-cascade gating (Lock 2.B): each child that has all of its
3564    ///   deps in a terminal state auto-emits its own `[COMPLETE]`. ERROR
3565    ///   dominates COMPLETE — if any of a child's deps emitted ERROR, the
3566    ///   child auto-cascades that ERROR instead.
3567    ///
3568    /// Idempotent: calling `complete` on an already-terminal node is a no-op.
3569    ///
3570    /// # Panics
3571    ///
3572    /// Panics if `node_id` is unknown.
3573    pub fn complete(&self, node_id: NodeId) {
3574        self.emit_terminal(node_id, TerminalKind::Complete);
3575    }
3576
3577    /// Emit `[ERROR, error_handle]` (R1.3.4) on `node_id`. `error_handle`
3578    /// must resolve to a non-sentinel value (R1.2.5) — the binding side has
3579    /// already interned the error value before this call. Same lifecycle
3580    /// effects as [`Self::complete`]; ERROR dominates COMPLETE in auto-
3581    /// cascade gating.
3582    ///
3583    /// # Panics
3584    ///
3585    /// Panics if `node_id` is unknown or `error_handle == NO_HANDLE`.
3586    pub fn error(&self, node_id: NodeId, error_handle: HandleId) {
3587        assert!(
3588            error_handle != NO_HANDLE,
3589            "NO_HANDLE is not a valid ERROR payload (R1.2.5)"
3590        );
3591        self.emit_terminal(node_id, TerminalKind::Error(error_handle));
3592        // The caller's intern share for `error_handle` is NOT transferred
3593        // to any slot — `terminate_node` takes its OWN retain for every
3594        // populated `terminal` and `dep_terminals` slot. Release the
3595        // caller's share here (mirrors `Core::emit`'s short-circuit
3596        // release on terminal). Without this, every `error()` call leaks
3597        // one binding-side handle ref. Slice A-bigger /qa item D fix.
3598        self.binding.release_handle(error_handle);
3599    }
3600
3601    fn emit_terminal(&self, node_id: NodeId, terminal: TerminalKind) {
3602        {
3603            let s = self.lock_state();
3604            assert!(s.nodes.contains_key(&node_id), "unknown node {node_id:?}");
3605        }
3606        // Wave on `node_id`'s touched partitions (Slice Y1 / Phase E).
3607        // COMPLETE / ERROR cascade follows `s.children` (in-partition by
3608        // union-find construction). The thunk acquires its own state lock
3609        // to queue the cascade.
3610        self.run_wave_for(node_id, |this| {
3611            let mut s = this.lock_state();
3612            this.terminate_node(&mut s, node_id, terminal);
3613        });
3614    }
3615
    /// Set the node's terminal slot, queue the wire message, and cascade to
    /// children. Idempotent on already-terminal node (no-op).
    ///
    /// Iterative implementation (Slice A-bigger, M1-close): a work-queue
    /// drives the cascade so deep linear chains don't overflow the OS
    /// thread stack. Mirrors `path_from_to`'s explicit-stack pattern.
    ///
    /// Handle accounting: for an `Error(h)` terminal, one `retain_handle`
    /// is taken for each node whose own `terminal` slot is filled and one
    /// more for each child `dep_records[..].terminal` slot that is filled.
    /// Handles drained from the pause buffer and the replay buffer are
    /// released.
    fn terminate_node(&self, s: &mut CoreState, node_id: NodeId, terminal: TerminalKind) {
        // `Vec::pop` takes the most recently pushed entry, so cascaded
        // children are processed LIFO.
        let mut work: Vec<(NodeId, TerminalKind)> = vec![(node_id, terminal)];
        while let Some((id, t)) = work.pop() {
            if s.require_node(id).terminal.is_some() {
                continue; // Idempotent — already terminal.
            }
            // Take a refcount share for the terminal slot so the error
            // handle outlives the binding-side intern's transient share.
            if let TerminalKind::Error(h) = t {
                self.binding.retain_handle(h);
            }
            // Slice E2 /qa Q2(b) (D069): if a resubscribable node is
            // terminating with no live subscribers, queue eager
            // `wipe_ctx` for the wave's lock-released drain. This is the
            // mutually-exclusive complement of the `Subscription::Drop`
            // wipe site: when the LAST sub drops first then terminate
            // fires, subs are empty here and we queue; when terminate
            // fires WITH subs still alive, we DON'T queue (subs not
            // empty), and `Subscription::Drop` will fire wipe directly
            // when those subs eventually drop. Either way, exactly one
            // wipe fires per terminal lifecycle.
            let queue_wipe = {
                let rec = s.require_node(id);
                rec.resubscribable && rec.subscribers.is_empty()
            };
            s.require_node_mut(id).terminal = Some(t);
            // Q-beyond Sub-slice 2 + 3 (D108, 2026-05-09): pending_fires
            // and pending_wipes both live on per-thread WaveState. Single
            // borrow handles the queue-wipe push and the pending_fires
            // remove.
            crate::batch::with_wave_state(|ws| {
                if queue_wipe {
                    ws.pending_wipes.push(id);
                }
                // Drain pending fires for this node — fn won't fire on a
                // terminal node.
                ws.pending_fires.remove(&id);
            });
            // R1.3.8.b / Slice F (A3, 2026-05-07): if this node was paused
            // when terminating (the canonical case is the R1.3.8.c overflow
            // ERROR synthesis path), drain the pause buffer and release
            // each payload's queue_notify-time retain. Without this, the
            // buffer leaks one share per buffered DATA/RESOLVED/INVALIDATE.
            // Subscribers receive the terminal directly via the cascade
            // below (tier-5 bypasses the pause buffer); the buffered
            // content is moot post-terminal.
            let drained: Vec<HandleId> = {
                let rec = s.require_node_mut(id);
                let mut drained: Vec<HandleId> = Vec::new();
                if rec.pause_state.is_paused() {
                    // Take the buffered messages out, then collapse the
                    // pause state to Active so subsequent code observes a
                    // clean lifecycle. Idempotent on Active (no-op).
                    let prev = std::mem::replace(&mut rec.pause_state, PauseState::Active);
                    if let PauseState::Paused { buffer, .. } = prev {
                        // Only messages that carry a payload handle
                        // contribute a release.
                        drained.extend(buffer.into_iter().filter_map(Message::payload_handle));
                    }
                }
                // QA A4 (2026-05-07): drain replay buffer on terminate. A
                // non-resubscribable terminal ends the lifecycle; without
                // this drain the buffer's retains leak until `Drop for
                // CoreState`. Resubscribable nodes' replay buffers are
                // also drained (when they're hit by a terminal cascade);
                // a fresh subscribe rebuilds the buffer from scratch as
                // part of `reset_for_fresh_lifecycle`.
                drained.extend(rec.replay_buffer.drain(..));
                drained
            };
            // Releases happen after the `rec` borrow above has ended.
            for h in drained {
                self.binding.release_handle(h);
            }
            // Queue the wire message (tier 5 — bypasses pause buffer).
            let msg = match t {
                TerminalKind::Complete => Message::Complete,
                TerminalKind::Error(h) => Message::Error(h),
            };
            self.queue_notify(s, id, msg);
            // Cascade to children. The ids are copied out first so the
            // loop body is free to borrow `s` mutably.
            let child_ids: Vec<NodeId> = s
                .children
                .get(&id)
                .map(|c| c.iter().copied().collect())
                .unwrap_or_default();
            for child_id in child_ids {
                let dep_idx = s.require_node(child_id).dep_index_of(id);
                // A child that does not list `id` among its deps is skipped.
                let Some(idx) = dep_idx else { continue };
                // Mark this child's per-dep terminal slot. Take a retain on
                // the error handle for the slot share.
                {
                    let child = s.require_node_mut(child_id);
                    if child.dep_records[idx].terminal.is_some() {
                        // Idempotent — child already saw this dep terminate.
                        // Skipping here also skips the retain below, so the
                        // slot keeps exactly one share.
                        continue;
                    }
                    child.dep_records[idx].terminal = Some(t);
                }
                if let TerminalKind::Error(h) = t {
                    self.binding.retain_handle(h);
                }
                // Auto-cascade gating: if all deps now terminal, push child
                // onto the work queue with the chosen terminal.
                //
                // Slice C-1: kinds that opt out of Lock 2.B (currently
                // `Operator(Reduce)`) intercept upstream COMPLETE so they
                // can emit their accumulator before terminating. Instead of
                // cascading, queue the child for fn-fire — `fire_operator`
                // sees `dep_records[0].terminal` set and emits the
                // appropriate batch (Data(acc) + Complete on COMPLETE,
                // Error(h) on ERROR).
                let action = {
                    let child = s.require_node(child_id);
                    if child.terminal.is_some() {
                        ChildAction::None // Already terminated — no-op.
                    } else if child.all_deps_terminal() {
                        if child.skips_auto_cascade() {
                            ChildAction::QueueFire
                        } else {
                            ChildAction::Cascade(pick_cascade_terminal(&child.dep_records))
                        }
                    } else {
                        ChildAction::None
                    }
                };
                // The decision is computed under the shared borrow above
                // and acted on here, once that borrow has ended.
                match action {
                    ChildAction::None => {}
                    ChildAction::Cascade(t_child) => {
                        // The child's own terminal-slot retain is taken
                        // when this entry is popped, not here.
                        work.push((child_id, t_child));
                    }
                    ChildAction::QueueFire => {
                        // Q-beyond Sub-slice 2 (D108, 2026-05-09):
                        // pending_fires lives on per-thread WaveState.
                        crate::batch::with_wave_state(|ws| {
                            ws.pending_fires.insert(child_id);
                        });
                    }
                }
            }
        }
    }
3761}
3762
3763/// Outcome of Lock 2.B child gating in `terminate_node`'s cascade walk.
3764enum ChildAction {
3765    /// No cascade; child is already terminal or not yet all-deps-terminal.
3766    None,
3767    /// Auto-cascade with the picked terminal kind (ERROR dominates COMPLETE).
3768    Cascade(TerminalKind),
3769    /// Queue child for fn-fire instead of cascading. Used by operator
3770    /// kinds that intercept upstream terminal (Operator(Reduce)).
3771    QueueFire,
3772}
3773
3774/// Lock 2.B cascade-terminal selection: ERROR dominates COMPLETE; first
3775/// ERROR seen wins. Caller has already verified all deps are terminal.
3776fn pick_cascade_terminal(dep_records: &[DepRecord]) -> TerminalKind {
3777    for dr in dep_records {
3778        if let Some(TerminalKind::Error(h)) = dr.terminal {
3779            return TerminalKind::Error(h);
3780        }
3781    }
3782    TerminalKind::Complete
3783}
3784
3785// -----------------------------------------------------------------------
3786// TEARDOWN — destruction, with auto-COMPLETE prepend (R2.6.4 / Lock 6.F)
3787// -----------------------------------------------------------------------
3788
3789impl Core {
3790    /// Tear `node_id` down. Per R2.6.4 / Lock 6.F:
3791    ///
3792    /// - **Auto-prepend COMPLETE.** If the node has not yet emitted a
3793    ///   terminal (`COMPLETE` / `ERROR`), `terminate_node` is called with
3794    ///   `Complete` first so subscribers see `[COMPLETE, TEARDOWN]`, not
3795    ///   bare `[TEARDOWN]`. This guarantees a clean end-of-stream signal
3796    ///   to async iterators and other consumers that wait on terminal
3797    ///   delivery.
3798    /// - **Idempotent on duplicate delivery.** The per-node
3799    ///   `has_received_teardown` flag is set on the first call; subsequent
3800    ///   `teardown` calls (or cascade visits from other paths) are silent
3801    ///   no-ops — no second `[COMPLETE, TEARDOWN]` pair to subscribers.
3802    /// - **Cascade downstream.** Each child is recursively torn down. The
3803    ///   child's own COMPLETE auto-cascades from `terminate_node`'s logic
3804    ///   (Lock 2.B); its TEARDOWN comes from this cascade.
3805    ///
3806    /// # Panics
3807    ///
3808    /// Panics if `node_id` is unknown.
3809    pub fn teardown(&self, node_id: NodeId) {
3810        {
3811            let s = self.lock_state();
3812            assert!(s.nodes.contains_key(&node_id), "unknown node {node_id:?}");
3813        }
3814        let torn_down: Arc<Mutex<Vec<NodeId>>> = Arc::new(Mutex::new(Vec::new()));
3815        let torn_down_for_wave = torn_down.clone();
3816        // TEARDOWN cascade follows `s.children` AND `meta_companions`
3817        // (R1.3.9.d) — meta-companions can cross partitions. Slice Y1 /
3818        // Phase E `compute_touched_partitions(node_id)` (called by
3819        // `run_wave_for`) walks both axes so the wave acquires every
3820        // partition reachable via the cascade.
3821        self.run_wave_for(node_id, move |this| {
3822            let mut s = this.lock_state();
3823            let collected = this.teardown_inner(&mut s, node_id);
3824            torn_down_for_wave.lock().extend(collected);
3825        });
3826        // Fire NodeTornDown for every cascaded id (root + metas +
3827        // downstream consumers that auto-cascaded). Outside the state
3828        // lock, matching fire_topology_event discipline.
3829        let ids = std::mem::take(&mut *torn_down.lock());
3830        for id in ids {
3831            self.fire_topology_event(&crate::topology::TopologyEvent::NodeTornDown(id));
3832        }
3833    }
3834
3835    /// Iterative teardown walk (Slice A-bigger, M1-close).
3836    ///
3837    /// The recursive shape was:
3838    ///   ```text
3839    ///   teardown(n):
3840    ///     if torn_down: return
3841    ///     mark torn_down
3842    ///     for meta in metas: teardown(meta)
3843    ///     terminate_node + queue Teardown
3844    ///     for child in children: teardown(child)
3845    ///   ```
3846    /// Deep linear chains (~10k nodes) overflowed the OS thread stack.
3847    ///
3848    /// The iterative shape uses a `Vec<Action>` stack with `Visit` and
3849    /// `EmitTeardown` actions. `Visit(n)` marks `n` torn-down (or no-ops
3850    /// if already), then pushes (in reverse order so LIFO pops in forward
3851    /// order) `Visit(child_K), …, Visit(child_1), EmitTeardown(n),
3852    /// Visit(meta_M), …, Visit(meta_1)`. The R1.3.9.d "metas first, then
3853    /// self, then children" ordering is preserved by the push order:
3854    /// metas pop first, recursively expand and emit; then `EmitTeardown(n)`
3855    /// pops and runs `terminate_node` + queue `Teardown`; then children
3856    /// pop. Idempotency via `has_received_teardown` keeps each node
3857    /// visited at most once even when multi-parent diamonds re-enter via
3858    /// a sibling path.
3859    fn teardown_inner(&self, s: &mut CoreState, root: NodeId) -> Vec<NodeId> {
3860        enum Action {
3861            Visit(NodeId),
3862            EmitTeardown(NodeId),
3863        }
3864        let mut stack: Vec<Action> = vec![Action::Visit(root)];
3865        // Topology accumulator: every node that actually emits TEARDOWN
3866        // (i.e. each `EmitTeardown(id)` site, NOT each `Visit` — visits
3867        // for already-torn-down nodes short-circuit on idempotency).
3868        let mut torn_down: Vec<NodeId> = Vec::new();
3869        while let Some(action) = stack.pop() {
3870            match action {
3871                Action::Visit(id) => {
3872                    if s.require_node(id).has_received_teardown {
3873                        continue; // Idempotent (R2.6.4).
3874                    }
3875                    s.require_node_mut(id).has_received_teardown = true;
3876                    // Push order: children first (pop LAST), then
3877                    // EmitTeardown(id), then metas (pop FIRST). Reverse
3878                    // each list so within-group order matches the original
3879                    // recursive iteration.
3880                    let children: Vec<NodeId> = s
3881                        .children
3882                        .get(&id)
3883                        .map(|c| c.iter().copied().collect())
3884                        .unwrap_or_default();
3885                    for &child in children.iter().rev() {
3886                        stack.push(Action::Visit(child));
3887                    }
3888                    stack.push(Action::EmitTeardown(id));
3889                    let metas: Vec<NodeId> = s.require_node(id).meta_companions.clone();
3890                    for &meta in metas.iter().rev() {
3891                        stack.push(Action::Visit(meta));
3892                    }
3893                }
3894                Action::EmitTeardown(id) => {
3895                    // Auto-prepend COMPLETE if not yet terminal. The (now
3896                    // iterative) terminate_node handles auto-cascade to
3897                    // children's own terminal slots per Lock 2.B.
3898                    let already_terminal = s.require_node(id).terminal.is_some();
3899                    if !already_terminal {
3900                        self.terminate_node(s, id, TerminalKind::Complete);
3901                    }
3902                    // Wire emission of the TEARDOWN itself (tier 6).
3903                    self.queue_notify(s, id, Message::Teardown);
3904                    torn_down.push(id);
3905                }
3906            }
3907        }
3908        torn_down
3909    }
3910
3911    /// Attach `companion` as a meta companion of `parent` per R1.3.9.d.
3912    /// Meta companions are nodes whose lifecycle is bound to the parent's
3913    /// in TEARDOWN ordering: when `parent` tears down, `companion` tears
3914    /// down first.
3915    ///
3916    /// Use this for inspection / audit / sidecar nodes that subscribe to
3917    /// parent state — without the ordering, the companion could observe
3918    /// the parent mid-destruction and emit garbage.
3919    ///
3920    /// Idempotent on duplicate registration of the same companion.
3921    ///
3922    /// # Lifecycle constraint
3923    ///
3924    /// Intended for **setup-time** wiring — call this before `parent` or
3925    /// `companion` enters a wave. Mid-wave registration (especially during
3926    /// a teardown cascade in flight) is implementation-defined: the new
3927    /// edge takes effect on the *next* wave. Adding a companion to a
3928    /// torn-down parent silently no-ops (the parent will not tear down
3929    /// again). For dynamic companion attachment with deterministic
3930    /// ordering, prefer constructing the wiring before subscribers exist.
3931    ///
3932    /// # Panics
3933    ///
3934    /// Panics if either node id is unknown, or if `parent == companion`
3935    /// (a node cannot be its own meta companion — would loop on TEARDOWN).
3936    pub fn add_meta_companion(&self, parent: NodeId, companion: NodeId) {
3937        assert!(parent != companion, "node cannot be its own meta companion");
3938        let mut s = self.lock_state();
3939        assert!(s.nodes.contains_key(&parent), "unknown parent {parent:?}");
3940        assert!(
3941            s.nodes.contains_key(&companion),
3942            "unknown companion {companion:?}"
3943        );
3944        let metas = &mut s.require_node_mut(parent).meta_companions;
3945        if !metas.contains(&companion) {
3946            metas.push(companion);
3947        }
3948    }
3949}
3950
3951// -----------------------------------------------------------------------
3952// INVALIDATE — cache clear + downstream cascade
3953// -----------------------------------------------------------------------
3954
3955impl Core {
3956    /// Clear `node_id`'s cache and cascade `[INVALIDATE]` to downstream
3957    /// dependents per canonical spec §1.4.
3958    ///
3959    /// Semantics:
3960    /// - **Never-populated case (R1.4 line 197):** if `cache == NO_HANDLE`,
3961    ///   the call is a no-op — no cache to clear, no INVALIDATE emitted.
3962    ///   This naturally provides idempotency within a wave: once a node has
3963    ///   been invalidated this wave (cache = NO_HANDLE), a second invalidate
3964    ///   on the same node does nothing.
3965    /// - **Cache clear (immediate):** the node's cached handle is dropped
3966    ///   (refcount released), `cache` becomes `NO_HANDLE`. State nodes
3967    ///   keep `has_fired_once` per spec — INVALIDATE is not a re-gating
3968    ///   event (the next emission to a previously-fired state still does
3969    ///   not re-trigger the first-run gate; that's a resubscribable-terminal
3970    ///   lifecycle concern, separate slice).
3971    /// - **Wire emission (tier 4):** `[INVALIDATE]` is queued via the
3972    ///   normal pause-aware notify path. Buffers while paused, flushes
3973    ///   immediately otherwise.
3974    /// - **Downstream cascade:** for each child of this node, the child's
3975    ///   `dep_handles[idx_of_node]` is reset to `NO_HANDLE` (its previous
3976    ///   value referenced a now-released handle). The child is then
3977    ///   recursively invalidated (no-op if its cache was already
3978    ///   `NO_HANDLE`). This re-closes the child's first-run gate — fn
3979    ///   won't fire again until the upstream re-emits a value.
3980    ///
3981    /// Wraps in a fresh wave when called from outside a wave, so
3982    /// notifications flush at the natural wave boundary.
3983    ///
3984    /// # Panics
3985    ///
3986    /// Panics if `node_id` is unknown, consistent with `emit` / `pause`.
3987    pub fn invalidate(&self, node_id: NodeId) {
3988        {
3989            let s = self.lock_state();
3990            assert!(s.nodes.contains_key(&node_id), "unknown node {node_id:?}");
3991        }
3992        // INVALIDATE cascade follows `s.children` (in-partition by union-
3993        // find construction). Slice Y1 / Phase E.
3994        self.run_wave_for(node_id, |this| {
3995            let mut s = this.lock_state();
3996            this.invalidate_inner(&mut s, node_id);
3997        });
3998    }
3999
4000    /// Iterative invalidate cascade (Slice A-bigger, M1-close).
4001    ///
4002    /// The recursive shape was a depth-first cache-clear walk:
4003    ///   ```text
4004    ///   invalidate(n):
4005    ///     if cache(n) == NO_HANDLE: return  // already-invalidated guard
4006    ///     cache(n) = NO_HANDLE; release handle
4007    ///     queue Invalidate(n)
4008    ///     for child in children:
4009    ///       child.dep_handles[idx] = NO_HANDLE
4010    ///       invalidate(child)
4011    ///   ```
4012    /// Deep linear chains overflowed the OS thread stack. The work-queue
4013    /// rewrite has no ordering subtleties (unlike teardown's R1.3.9.d
4014    /// metas-first constraint) — Invalidate is a tier-4 broadcast where
4015    /// the never-populated / already-invalidated guard provides natural
4016    /// idempotency for diamond fan-in.
4017    fn invalidate_inner(&self, s: &mut CoreState, root: NodeId) {
4018        let mut work: Vec<NodeId> = vec![root];
4019        while let Some(node_id) = work.pop() {
4020            // Never-populated / already-invalidated: no-op (R1.4 idempotency).
4021            // Per R1.3.9.c never-populated case, OnInvalidate cleanup hook
4022            // also does NOT fire — natural fallout of skipping via the
4023            // cache==NO_HANDLE guard (we never reach the queue-push below).
4024            let old_handle = s.require_node(node_id).cache;
4025            if old_handle == NO_HANDLE {
4026                continue;
4027            }
4028            // Clear cache + release the handle's slot ownership.
4029            s.require_node_mut(node_id).cache = NO_HANDLE;
4030            self.binding.release_handle(old_handle);
4031            // Slice E2 (R1.3.9.b strict per D057 + D058 fire-at-cache-clear):
4032            // queue OnInvalidate cleanup hook for lock-released drain at
4033            // wave-end. The dedup set guarantees at-most-once-per-wave-per-
4034            // node firing even if a node re-populates mid-wave (via fn-fire
4035            // emit) and gets re-invalidated through a separate path. Pure
4036            // cache==NO_HANDLE idempotency (above) catches "still at
4037            // sentinel" only; the explicit set is the strict R1.3.9.b
4038            // reading.
4039            // Q-beyond Sub-slice 3 (D108, 2026-05-09):
4040            // `invalidate_hooks_fired_this_wave` and
4041            // `deferred_cleanup_hooks` both live on per-thread WaveState.
4042            // Single borrow handles the dedup-insert and (on first
4043            // insertion) the cleanup-hook push.
4044            crate::batch::with_wave_state(|ws| {
4045                if ws.invalidate_hooks_fired_this_wave.insert(node_id) {
4046                    ws.deferred_cleanup_hooks
4047                        .push((node_id, CleanupTrigger::OnInvalidate));
4048                }
4049            });
4050            // Wire emission. Pause-aware via queue_notify.
4051            self.queue_notify(s, node_id, Message::Invalidate);
4052            // Cascade: for each child, clear the dep record's prev_data
4053            // referencing this node and push child onto the work queue.
4054            let child_ids: Vec<NodeId> = s
4055                .children
4056                .get(&node_id)
4057                .map(|c| c.iter().copied().collect())
4058                .unwrap_or_default();
4059            for child_id in child_ids {
4060                let dep_idx = s.require_node(child_id).dep_index_of(node_id);
4061                if let Some(idx) = dep_idx {
4062                    // Reset the child's dep record — the handle was just
4063                    // released. Subsequent first-run-gate checks see
4064                    // sentinel and re-close.
4065                    //
4066                    // Snapshot prev_data + data_batch retains for deferred
4067                    // release, then clear the record. Two-phase to satisfy
4068                    // the borrow checker (nodes + deferred_handle_releases
4069                    // are separate CoreState fields).
4070                    let (old_prev, batch_hs): (HandleId, SmallVec<[HandleId; 1]>) = {
4071                        let dr = &s.require_node(child_id).dep_records[idx];
4072                        (dr.prev_data, dr.data_batch.clone())
4073                    };
4074                    {
4075                        // Q-beyond Sub-slice 1 (D108, 2026-05-09):
4076                        // deferred_handle_releases moved to per-thread
4077                        // WaveState thread_local. State lock held; the
4078                        // thread_local borrow is independent.
4079                        crate::batch::with_wave_state(|ws| {
4080                            if old_prev != NO_HANDLE {
4081                                ws.deferred_handle_releases.push(old_prev);
4082                            }
4083                            for h in batch_hs {
4084                                ws.deferred_handle_releases.push(h);
4085                            }
4086                        });
4087                    }
4088                    let dr = &mut s.require_node_mut(child_id).dep_records[idx];
4089                    dr.prev_data = NO_HANDLE;
4090                    dr.data_batch.clear();
4091                    work.push(child_id);
4092                }
4093            }
4094        }
4095    }
4096}
4097
4098// -----------------------------------------------------------------------
4099// PAUSE / RESUME — multi-pauser lockset + replay buffer
4100// -----------------------------------------------------------------------
4101
/// Summary handed back by [`Core::resume`] when the final pause lock on a
/// node is released.
///
/// A non-zero `dropped` means a controller held the lock long enough to
/// overflow the Core-global `pause_buffer_cap`; the binding may want to
/// surface a warning or error.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub struct ResumeReport {
    /// Number of buffered tier-3/tier-4 messages dispatched to subscribers
    /// as part of the drain.
    pub replayed: u32,
    /// Number of messages that fell out of the front of the buffer because
    /// of the buffer cap while this pause cycle was active.
    pub dropped: u32,
}
4115
4116impl Core {
4117    /// Acquire a pause lock on `node_id`. The first lock transitions the
4118    /// node from `Active` to `Paused`; further locks add to the lockset.
4119    /// While paused, tier-3 (DATA/RESOLVED) and tier-4 (INVALIDATE) outgoing
4120    /// messages buffer in the node's pause buffer; other tiers flush
4121    /// immediately.
4122    ///
4123    /// Re-acquiring the same `lock_id` is an idempotent no-op (matches TS
4124    /// convention, R1.2.6 silent on the case).
4125    pub fn pause(&self, node_id: NodeId, lock_id: LockId) -> Result<(), PauseError> {
4126        let mut s = self.lock_state();
4127        let rec = s
4128            .nodes
4129            .get_mut(&node_id)
4130            .ok_or(PauseError::UnknownNode(node_id))?;
4131        // QA A5 (2026-05-07): terminated nodes can't be re-paused. Without
4132        // this check, a stale pause-controller calling pause() on an
4133        // already-terminated node would re-arm `pause_state` to Paused.
4134        // The terminate_node path collapses pause_state → Active and
4135        // drains the buffer (A3-related), but doesn't gate subsequent
4136        // pause() calls. Treat as idempotent no-op (consistent with how
4137        // emit/complete/error early-return on terminal).
4138        if rec.terminal.is_some() {
4139            return Ok(());
4140        }
4141        // Slice F audit close (2026-05-07): `PausableMode::Off` means the
4142        // dispatcher ignores PAUSE for this node — tier-3 flushes
4143        // immediately, fn fires immediately. Treat the call as a successful
4144        // no-op so callers don't need to special-case.
4145        if rec.pausable == PausableMode::Off {
4146            return Ok(());
4147        }
4148        rec.pause_state.add_lock(lock_id);
4149        Ok(())
4150    }
4151
4152    /// Release a pause lock on `node_id`. If the lockset becomes empty, the
4153    /// node transitions back to `Active` and the buffered messages are
4154    /// dispatched to subscribers in arrival order. Returns a [`ResumeReport`]
4155    /// when the final lock released; `None` if the lockset is still
4156    /// non-empty (further locks held).
4157    ///
4158    /// Releasing an unknown `lock_id` (or releasing on an already-Active
4159    /// node) is an idempotent no-op returning `None`.
4160    pub fn resume(
4161        &self,
4162        node_id: NodeId,
4163        lock_id: LockId,
4164    ) -> Result<Option<ResumeReport>, PauseError> {
4165        // Phase 1 (lock-held): collect drained buffer + pending-wave flag +
4166        // sink Arcs. For default-mode nodes whose `pending_wave` was set
4167        // during pause, schedule a single fn-fire by adding to
4168        // `pending_fires` BEFORE we exit the lock — the wave engine picks
4169        // it up on the next drain tick.
4170        let (sinks, messages, dropped, pending_wave_for_default) = {
4171            let mut s = self.lock_state();
4172            let rec = s
4173                .nodes
4174                .get_mut(&node_id)
4175                .ok_or(PauseError::UnknownNode(node_id))?;
4176            // For Off mode, pause/resume are no-ops by construction.
4177            if rec.pausable == PausableMode::Off {
4178                return Ok(None);
4179            }
4180            let was_default_mode = rec.pausable == PausableMode::Default;
4181            // Capture pending_wave BEFORE remove_lock collapses the state.
4182            let pending_wave = if was_default_mode {
4183                rec.pause_state.take_pending_wave()
4184            } else {
4185                false
4186            };
4187            let Some((buffer, dropped)) = rec.pause_state.remove_lock(lock_id) else {
4188                // Not the final-resume — restore the pending_wave flag we
4189                // tentatively cleared, since we're not transitioning to
4190                // Active yet.
4191                if pending_wave {
4192                    rec.pause_state.mark_pending_wave();
4193                }
4194                return Ok(None);
4195            };
4196            let sinks: Vec<Sink> = rec.subscribers.values().cloned().collect();
4197            let messages: Vec<Message> = buffer.into_iter().collect();
4198            // Default-mode pending-wave handling: schedule the fn-fire so
4199            // the wave engine consolidates the pause-window dep deliveries
4200            // into one fn execution. State nodes don't fire fn (no
4201            // `pending_fires` membership has effect for them).
4202            //
4203            // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_fires lives
4204            // on per-thread WaveState.
4205            if pending_wave && was_default_mode {
4206                crate::batch::with_wave_state(|ws| {
4207                    ws.pending_fires.insert(node_id);
4208                });
4209            }
4210            (sinks, messages, dropped, pending_wave && was_default_mode)
4211        };
4212        let replayed = u32::try_from(messages.len()).unwrap_or(u32::MAX);
4213
4214        // Phase 2 (lock-released): fire sinks for ResumeAll-buffered
4215        // messages. Default-mode resume produces no buffered replay (the
4216        // consolidated fn-fire produces fresh wave traffic via the standard
4217        // commit_emission path).
4218        if !messages.is_empty() {
4219            for sink in &sinks {
4220                sink(&messages);
4221            }
4222            // Phase 3: balance the retain_handle calls done at buffer-push
4223            // time — sinks observe values but don't own refcount shares.
4224            for msg in &messages {
4225                if let Some(h) = msg.payload_handle() {
4226                    self.binding.release_handle(h);
4227                }
4228            }
4229        }
4230
4231        // Phase 4 (default-mode): drain the consolidated fn-fire scheduled
4232        // in Phase 1. `run_wave_for(node_id)` acquires the partitions
4233        // touched from `node_id` (Slice Y1 / Phase E) and runs the standard
4234        // drain pipeline; the new fn-fire emerges as a normal wave's worth
4235        // of messages to subscribers.
4236        if pending_wave_for_default {
4237            self.run_wave_for(node_id, |_this| {
4238                // The pending_fires entry was pushed in Phase 1 under the
4239                // lock. run_wave's drain picks it up.
4240            });
4241        }
4242        Ok(Some(ResumeReport { replayed, dropped }))
4243    }
4244
4245    /// True if the node currently holds at least one pause lock.
4246    #[must_use]
4247    pub fn is_paused(&self, node_id: NodeId) -> bool {
4248        self.state
4249            .lock()
4250            .require_node(node_id)
4251            .pause_state
4252            .is_paused()
4253    }
4254
4255    /// Number of pause locks currently held on `node_id`. `0` if Active.
4256    #[must_use]
4257    pub fn pause_lock_count(&self, node_id: NodeId) -> usize {
4258        self.state
4259            .lock()
4260            .require_node(node_id)
4261            .pause_state
4262            .lock_count()
4263    }
4264
4265    /// Test helper: whether `node_id` currently holds the given `lock_id`.
4266    #[must_use]
4267    pub fn holds_pause_lock(&self, node_id: NodeId, lock_id: LockId) -> bool {
4268        self.state
4269            .lock()
4270            .require_node(node_id)
4271            .pause_state
4272            .contains_lock(lock_id)
4273    }
4274}
4275
4276// -----------------------------------------------------------------------
4277// set_deps — atomic dep mutation
4278// -----------------------------------------------------------------------
4279
4280/// Errors returnable by [`Core::set_deps`].
4281///
4282/// Per `~/src/graphrefly-ts/docs/research/rewire-design-notes.md` and the
4283/// Phase 13.8 Q1 lock:
4284/// - `SelfDependency` — `n in newDeps` (self-loops are pathological without
4285///   explicit fixed-point semantics, which GraphReFly does not provide).
4286/// - `WouldCreateCycle { path }` — adding the new edge would create a cycle.
4287///   The `path` field reports the offending dep chain for debuggability.
4288/// - `UnknownNode` / `NotComputeNode` — invariant violations from the caller.
4289/// - `TerminalNode` — `n` itself has emitted COMPLETE/ERROR; rewiring a
4290///   terminal stream is a category error (terminal is one-shot at this
4291///   layer; recovery is the resubscribable path on a fresh subscribe).
4292/// - `TerminalDep` — a newly-added dep is terminal AND not resubscribable.
4293///   Resubscribable terminal deps are accepted because the subscribe path
4294///   resets their lifecycle. Non-resubscribable terminal deps would deliver
4295///   their already-emitted terminal directly to `n`'s `dep_terminals` slot,
4296///   which is rarely intended.
4297#[derive(Error, Debug, Clone, PartialEq)]
4298pub enum SetDepsError {
4299    /// `n` appeared in `new_deps` (self-loop rejection).
4300    #[error("set_deps({n:?}, ...): self-dependency rejected (n appeared in new_deps)")]
4301    SelfDependency { n: NodeId },
4302
4303    /// Adding the new dep would create a cycle. `path` is the chain
4304    /// `[added_dep, ..., n]` reachable via existing deps.
4305    #[error(
4306        "set_deps({n:?}, ...): cycle would form via path {path:?} \
4307         (adding {added_dep:?} → {n:?} closes the loop)"
4308    )]
4309    WouldCreateCycle {
4310        n: NodeId,
4311        added_dep: NodeId,
4312        path: Vec<NodeId>,
4313    },
4314
4315    #[error("set_deps: unknown node {0:?}")]
4316    UnknownNode(NodeId),
4317
4318    #[error("set_deps: node {0:?} is not a compute node (state nodes have no deps)")]
4319    NotComputeNode(NodeId),
4320
4321    /// `n` itself has terminated (COMPLETE / ERROR). Rewiring a terminal node
4322    /// is rejected — the stream has ended at this layer. To recover, mark
4323    /// the node resubscribable before terminate; a fresh subscribe will then
4324    /// reset its lifecycle.
4325    #[error("set_deps({n:?}, ...): node has already terminated; cannot rewire a terminal node")]
4326    TerminalNode { n: NodeId },
4327
4328    /// A newly-added dep is terminal AND non-resubscribable. Per Phase 13.8
4329    /// Q1, this is rejected; resubscribable terminal deps are allowed
4330    /// because the subscribe path resets them when activated. Already-present
4331    /// terminal deps are unaffected (their terminal status was accepted at
4332    /// the time they terminated).
4333    #[error(
4334        "set_deps({n:?}, ...): added dep {dep:?} is terminal and not resubscribable; \
4335         either mark it resubscribable before terminate, or remove the dep from new_deps"
4336    )]
4337    TerminalDep { n: NodeId, dep: NodeId },
4338
4339    /// `n` itself is currently mid-fire — a user fn for `n` re-entered Core
4340    /// via `set_deps(n, ...)` from inside `n`'s own `invoke_fn` /
4341    /// `project_each` / `predicate_each` / etc. Phase 1 of the dispatcher
4342    /// snapshotted `dep_handles` BEFORE the lock-released callback; the
4343    /// callback returning a `tracked` set indexed against THAT ordering
4344    /// would corrupt indices if the rewire re-orders deps mid-fire.
4345    /// Rejected to preserve the dynamic-tracked-indices invariant (D1).
4346    ///
4347    /// Workaround: schedule the rewire from a different node's fn (via
4348    /// `Core::emit` on a state node and observing the emit downstream),
4349    /// or perform the rewire after the wave completes (e.g. from a sink
4350    /// callback that is itself outside any fn-fire scope).
4351    ///
4352    /// Slice F (2026-05-07) — A6.
4353    #[error(
4354        "set_deps({n:?}, ...): rejected — node {n:?} is currently mid-fire \
4355         (set_deps from inside the firing node's own fn would corrupt the \
4356         Dynamic `tracked` indices snapshot taken before invoke_fn). \
4357         Schedule the rewire outside this fire scope."
4358    )]
4359    ReentrantOnFiringNode { n: NodeId },
4360
4361    /// `set_deps(n, ...)` would trigger a partition migration (union or
4362    /// split in the per-subgraph union-find registry) that affects the
4363    /// partition of a node currently mid-fire on this thread. Distinct
4364    /// from [`Self::ReentrantOnFiringNode`]: that variant rejects
4365    /// `set_deps(n, ...)` where `n` itself is firing; this variant
4366    /// rejects `set_deps(n, ...)` on some OTHER node whose union/split
4367    /// shifts a firing node's partition root mid-wave.
4368    ///
4369    /// Why this matters: Y1's wave engine holds an
4370    /// [`Arc<crate::subgraph::SubgraphLockBox>`] for the firing node's
4371    /// partition for the wave's duration. A union mid-wave swaps the
4372    /// box-identity for one of the two affected partitions; a split
4373    /// (Y1+ post-Phase-F) extracts a fresh box for the orphan side.
4374    /// Either way the held Arc would diverge from the registry's
4375    /// current root for that partition, so the wave would lose
4376    /// serialization against the box's true partition mid-flight.
4377    ///
4378    /// Per [`SESSION-rust-port-d3-per-subgraph-parallelism.md`](https://github.com/graphrefly/graphrefly-ts/blob/main/archive/docs/SESSION-rust-port-d3-per-subgraph-parallelism.md)
4379    /// Q3 = (a-strict): mid-wave migration is rejected at edge-mutation
4380    /// time. If a real consumer surfaces pressure to support mid-wave
4381    /// migration, lift via state-migration logic in a follow-up — but
4382    /// the v1 contract is "the partition a wave runs in cannot change
4383    /// shape mid-flight."
4384    ///
4385    /// `n` is the node whose `set_deps` was rejected; `firing` is the
4386    /// concretely-identified firing node whose partition would be
4387    /// migrated. Workaround: schedule the rewire outside the wave
4388    /// (e.g. emit a state-change that triggers `set_deps` from a sink
4389    /// callback running post-flush).
4390    ///
4391    /// Slice Y1 (D3 / D091, 2026-05-08).
4392    #[error(
4393        "set_deps({n:?}, ...): rejected — would migrate the partition of \
4394         currently-firing node {firing:?} mid-wave (union/split during \
4395         fire would invalidate the held wave_owner Arc). Schedule the \
4396         rewire outside the wave."
4397    )]
4398    PartitionMigrationDuringFire { n: NodeId, firing: NodeId },
4399}
4400
4401impl Core {
4402    /// Atomic dep mutation — change a node's upstream deps without TEARDOWN
4403    /// cascading and without losing cache.
4404    ///
4405    /// Per the TLA+-verified design at
4406    /// `~/src/graphrefly-ts/docs/research/wave_protocol_rewire.tla`
4407    /// (35,950 distinct states, all 7 invariants clean):
4408    ///
4409    /// - Removed deps: clear dirtyMask bit, drain pending queue, drop DepRecord.
4410    /// - Added deps: SENTINEL prevData; push-on-subscribe if added dep has cached DATA.
4411    /// - Preserved: `firstRunPassed`, `pauseLocks`, `pauseBuffer`, `cache` (ROM/RAM).
4412    /// - Status auto-settles if dirtyMask becomes empty.
4413    /// - Idempotent on `new_deps == current deps`.
4414    /// - Self-rewire `n ∈ new_deps` rejected (`SelfDependency`).
4415    /// - Cycles rejected (`WouldCreateCycle`).
4416    /// - Allowed mid-wave + while paused.
4417    /// - Phase 13.8 Q1: terminal `n` rejected (`TerminalNode`); newly-added
4418    ///   terminal non-resubscribable deps rejected (`TerminalDep`).
4419    ///
4420    /// The body is a single atomic dep-mutation transaction with several
4421    /// discrete validation stages. Splitting would require passing a
4422    /// partially-mutable CoreState across helpers, and the transaction's
4423    /// locality is what makes the F1 refcount-leak collection work.
4424    #[allow(clippy::too_many_lines)]
4425    pub fn set_deps(&self, n: NodeId, new_deps: &[NodeId]) -> Result<(), SetDepsError> {
4426        let mut s = self.lock_state();
4427        // Validate node exists and is compute. Read-once via the helper so
4428        // subsequent code can use `require_node(n)` without re-checking.
4429        let (is_state, is_producer, is_terminal) = {
4430            let rec = s.nodes.get(&n).ok_or(SetDepsError::UnknownNode(n))?;
4431            (rec.is_state(), rec.is_producer(), rec.terminal.is_some())
4432        };
4433        if is_state || is_producer {
4434            // State and Producer nodes have no declared deps — set_deps
4435            // is meaningless. Producer nodes manage their own subscriptions
4436            // through the binding's ProducerCtx; mutating their (empty)
4437            // dep set would not affect that.
4438            return Err(SetDepsError::NotComputeNode(n));
4439        }
4440        // Reject if `n` itself is terminal (Phase 13.8 Q1: terminal nodes
4441        // cannot be rewired; recovery is via resubscribable subscribe).
4442        if is_terminal {
4443            return Err(SetDepsError::TerminalNode { n });
4444        }
4445        // A6 reentrancy guard (Slice F, 2026-05-07): reject if `n` is
4446        // currently mid-fire on the wave-owner thread. Closes the D1 hazard
4447        // where `Phase 1` snapshotted `dep_handles` against pre-rewire dep
4448        // ordering and `Phase 3` would store the returned `tracked` indices
4449        // against post-rewire ordering. Same-thread re-entry is the only
4450        // path that matters — cross-thread emits already block on
4451        // `wave_owner` per the M1 design.
4452        // /qa F2 reverted (2026-05-10): currently_firing lives on
4453        // CoreState (per-Core, cross-thread visible). The D1 reentrance
4454        // check requires the same-thread visibility (a fn re-entering
4455        // set_deps on its own firing node), and the P13 cross-thread
4456        // check requires cross-thread visibility (Thread B's set_deps
4457        // observing Thread A's firing pushes during A's lock-released
4458        // invoke_fn). Per-Core placement on shared CoreState delivers
4459        // both. Read under the already-held state lock.
4460        if s.currently_firing.contains(&n) {
4461            return Err(SetDepsError::ReentrantOnFiringNode { n });
4462        }
4463        // Self-rewire rejection.
4464        if new_deps.contains(&n) {
4465            return Err(SetDepsError::SelfDependency { n });
4466        }
4467        // Validate all new deps exist.
4468        for &d in new_deps {
4469            if !s.nodes.contains_key(&d) {
4470                return Err(SetDepsError::UnknownNode(d));
4471            }
4472        }
4473        // Cycle detection: data flows parent → child via the `children` map.
4474        // Adding edge `d → n` (d becomes a dep of n) creates a cycle iff
4475        // `d` is already reachable from `n` via existing data-flow edges
4476        // (so `n → ... → d` exists, and the new `d → n` closes the loop).
4477        // DFS from `n` along `children` edges, looking for each added dep.
4478        let current_deps: HashSet<NodeId> = s.require_node(n).dep_ids().collect();
4479        let new_deps_set: HashSet<NodeId> = new_deps.iter().copied().collect();
4480        let added: HashSet<NodeId> = new_deps_set.difference(&current_deps).copied().collect();
4481        for &d in &added {
4482            if let Some(path) = self.path_from_to(&s, n, d) {
4483                return Err(SetDepsError::WouldCreateCycle {
4484                    n,
4485                    added_dep: d,
4486                    path,
4487                });
4488            }
4489        }
4490        // Phase 13.8 Q1: reject newly-added deps that are terminal AND not
4491        // resubscribable. Resubscribable terminal deps are allowed — the
4492        // subscribe path resets their lifecycle when something activates
4493        // them. Already-present (kept) deps are unaffected; their terminal
4494        // status was accepted at the time they terminated.
4495        for &d in &added {
4496            let dep_rec = s.require_node(d);
4497            if dep_rec.terminal.is_some() && !dep_rec.resubscribable {
4498                return Err(SetDepsError::TerminalDep { n, dep: d });
4499            }
4500        }
4501        // Compute `removed` early (Phase F: needs to be available for P13
4502        // split-case widening below). Idempotent fast-path moved below the
4503        // P13 check accordingly.
4504        let removed: HashSet<NodeId> = current_deps.difference(&new_deps_set).copied().collect();
4505
4506        // Slice Y1 (D3 / D091 — P13, 2026-05-08): reject mid-wave set_deps
4507        // that would shift a currently-firing node's partition root.
4508        // Distinct from `ReentrantOnFiringNode` (same-node case, line above).
4509        // Holds the registry lock briefly under the state lock per the
4510        // P12-fix lock-discipline invariant `state lock → registry mutex`.
4511        //
4512        // **Two cases:**
4513        // 1. **Union (Phase D)** — adding a cross-partition dep merges two
4514        //    components. Both pre-merge components are affected (the
4515        //    smaller-rank loser's box is dropped, its members migrate to
4516        //    the winner's root).
4517        // 2. **Split (Phase F, 2026-05-09)** — removing an edge whose
4518        //    removal disconnects the dep graph splits one component into
4519        //    two. The pre-split component is affected (every member
4520        //    re-unions; the orphan side gets a fresh `SubgraphLockBox`
4521        //    while the keep side preserves the original Arc).
4522        //
4523        // Either case migrates the partition root (and box-identity) for
4524        // affected nodes mid-wave; if any node currently firing on this
4525        // thread is in an affected partition, the wave's held
4526        // `Arc<SubgraphLockBox>` would diverge from the registry's new
4527        // canonical box. Q3 = (a-strict) per the D3 design lock rejects
4528        // both cases at edge-mutation time.
4529        // /qa F2 reverted (2026-05-10): currently_firing lives on
4530        // CoreState (cross-thread visible — load-bearing for this P13
4531        // partition-migration check, which detects cross-thread set_deps
4532        // calls during another thread's lock-released invoke_fn).
4533        // Snapshot under the already-held state lock so the registry
4534        // mutex acquire below doesn't need to also hold the snapshot
4535        // borrow.
4536        let currently_firing_snapshot: Vec<NodeId> = s.currently_firing.clone();
4537        if !currently_firing_snapshot.is_empty() && (!added.is_empty() || !removed.is_empty()) {
4538            let mut reg = self.registry.lock();
4539            // Snapshot firing nodes' partitions. `partition_of` is mutating
4540            // (path compression) but partition IDENTITY is stable across
4541            // reads (only `union_nodes` / `split_partition` mutate roots).
4542            let firing_with_partition: Vec<(NodeId, crate::subgraph::SubgraphId)> =
4543                currently_firing_snapshot
4544                    .iter()
4545                    .filter_map(|&f| reg.partition_of(f).map(|p| (f, p)))
4546                    .collect();
4547            if !firing_with_partition.is_empty() {
4548                let part_n = reg.partition_of(n);
4549                // Case 1 (union): for each added dep, check cross-partition merge.
4550                for &added_dep in &added {
4551                    let part_added = reg.partition_of(added_dep);
4552                    if part_n == part_added {
4553                        continue; // same-partition add is a no-op in union-find
4554                    }
4555                    let affected = [part_n, part_added];
4556                    if let Some(&(firing, _)) = firing_with_partition
4557                        .iter()
4558                        .find(|(_, p)| affected.contains(&Some(*p)))
4559                    {
4560                        return Err(SetDepsError::PartitionMigrationDuringFire { n, firing });
4561                    }
4562                }
4563                // Case 2 (split): for each removed dep, simulate undirected
4564                // walk from `removed_dep` skipping the would-be-removed edge
4565                // (`removed_dep → n`); if `n` is unreachable, removal would
4566                // disconnect — split — affecting all nodes in that
4567                // partition. Since dep edges are within a single partition
4568                // by construction (union-find merges on edge add), every
4569                // node currently in the partition is affected.
4570                //
4571                // QA-fix #4 (2026-05-09): pass `added_edges` as `extra_edges`
4572                // so a `set_deps` that simultaneously REMOVES one edge AND
4573                // ADDS another path isn't falsely rejected. Without this,
4574                // the pre-mutation walk doesn't see the would-be-added
4575                // edges and reports disconnect even when the net change
4576                // preserves connectivity.
4577                let added_edges: Vec<(NodeId, NodeId)> = added.iter().map(|&a| (a, n)).collect();
4578                for &removed_dep in &removed {
4579                    let part_removed = reg.partition_of(removed_dep);
4580                    let visited = walk_undirected_dep_graph(
4581                        &s,
4582                        removed_dep,
4583                        Some((removed_dep, n)),
4584                        &added_edges,
4585                    );
4586                    let would_disconnect = !visited.contains(&n);
4587                    if would_disconnect {
4588                        if let Some(&(firing, _)) = firing_with_partition
4589                            .iter()
4590                            .find(|(_, p)| Some(*p) == part_removed)
4591                        {
4592                            return Err(SetDepsError::PartitionMigrationDuringFire { n, firing });
4593                        }
4594                    }
4595                }
4596            }
4597        }
4598        // Idempotent fast-path. Now safe to short-circuit since the P13
4599        // check above already considered both `added` and `removed`.
4600        if added.is_empty() && removed.is_empty() {
4601            return Ok(());
4602        }
4603
4604        // Snapshot old deps (ordered) for topology event, before mutation.
4605        let old_deps_vec: Vec<NodeId> = s.require_node(n).dep_ids_vec();
4606
4607        // Carry out the rewire atomically.
4608        // 1. Build new dep_records, preserving DepRecord state for kept deps.
4609        let new_deps_vec: Vec<NodeId> = new_deps.to_vec();
4610        //
4611        // Refcount discipline (F1 audit fix): each `Some(TerminalKind::Error(h))`
4612        // slot owns a refcount share retained at `terminate_node` time. When a
4613        // dep is REMOVED, its slot is dropped — the corresponding handle's
4614        // share must be released here, otherwise it leaks until Core drop.
4615        // Also release data_batch retains for removed deps.
4616        let (new_dep_records, removed_handles): (Vec<DepRecord>, Vec<HandleId>) = {
4617            let rec = s.require_node(n);
4618            // Index old dep_records by NodeId for O(1) lookup of kept deps.
4619            let old_by_node: HashMap<NodeId, &DepRecord> =
4620                rec.dep_records.iter().map(|dr| (dr.node, dr)).collect();
4621            let new_records: Vec<DepRecord> = new_deps_vec
4622                .iter()
4623                .map(|&d| {
4624                    if let Some(old) = old_by_node.get(&d) {
4625                        // Kept dep: preserve all state (prev_data, data_batch,
4626                        // terminal, wave flags). Subscriptions stay live.
4627                        DepRecord {
4628                            node: d,
4629                            prev_data: old.prev_data,
4630                            dirty: old.dirty,
4631                            involved_this_wave: old.involved_this_wave,
4632                            data_batch: old.data_batch.clone(),
4633                            terminal: old.terminal,
4634                        }
4635                    } else {
4636                        // Added dep: fresh sentinel record.
4637                        DepRecord::new(d)
4638                    }
4639                })
4640                .collect();
4641            // Collect handles to release from REMOVED dep records.
4642            let mut to_release: Vec<HandleId> = Vec::new();
4643            for d in &removed {
4644                if let Some(old) = old_by_node.get(d) {
4645                    if let Some(TerminalKind::Error(h)) = old.terminal {
4646                        to_release.push(h);
4647                    }
4648                    // Release data_batch retains for removed deps.
4649                    for &h in &old.data_batch {
4650                        to_release.push(h);
4651                    }
4652                }
4653            }
4654            (new_records, to_release)
4655        };
4656        // Clear dirtyMask bit by re-emitting the wave-bookkeeping: we don't
4657        // currently model a per-dep dirtyMask explicitly (we use the boolean
4658        // `dirty` flag at node level). Removing a dep's entry from the implicit
4659        // mask is therefore implicit — by removing the dep, future emissions
4660        // from it can't re-arm the bit. The per-dep `involved_this_wave` flag
4661        // stays wave-scoped and gets cleared at wave end. The setDeps action
4662        // itself does NOT change the dirty boolean unless all deps are cleared;
4663        // in that case we settle.
4664        // Slice E2 (D067): on a dynamic node that had previously fired its
4665        // fn, capture `has_fired_once` BEFORE the reset so we can fire
4666        // `OnRerun` cleanup lock-released after `drop(s)` below. Without
4667        // this, the next `fire_regular` Phase 1 would capture
4668        // `has_fired_once = false`, causing Phase 1.5 to skip OnRerun —
4669        // silently dropping the prior activation's cleanup closure when
4670        // the next `invoke_fn` overwrites `current_cleanup`. Per spec
4671        // R2.4.5, `set_deps` does NOT end the activation cycle
4672        // (subscribe→unsubscribe is the cycle boundary), so OnRerun must
4673        // fire on every re-fire including post-set_deps.
4674        let fire_set_deps_on_rerun;
4675        {
4676            let rec = s.require_node_mut(n);
4677            fire_set_deps_on_rerun = rec.is_dynamic && rec.has_fired_once;
4678            rec.dep_records = new_dep_records;
4679            // Re-derive `tracked` for static derived: all indices.
4680            // For dynamic: clear `tracked` AND reset `has_fired_once` so the
4681            // next dep delivery satisfies the first-fire branch in
4682            // `deliver_data_to_consumer` (`!has_fired_once || tracked.contains(...)`).
4683            // Without resetting `has_fired_once`, the cleared `tracked` blocks
4684            // every future fire — fn never re-runs and the dynamic node sits
4685            // on stale cache derived from the old dep set. The next fire
4686            // re-runs fn unconditionally; fn's returned `tracked` then
4687            // repopulates `rec.tracked` and normal selective-deps semantics
4688            // resume from the next dep update onward.
4689            if rec.is_dynamic {
4690                rec.tracked.clear();
4691                rec.has_fired_once = false;
4692            } else {
4693                // Derived (static) and Operator track all deps.
4694                rec.tracked = (0..new_deps_vec.len()).collect();
4695            }
4696        }
4697
4698        // 2. Update inverted-edge map (children).
4699        for &removed_dep in &removed {
4700            if let Some(set) = s.children.get_mut(&removed_dep) {
4701                set.remove(&n);
4702            }
4703        }
4704        for &added_dep in &added {
4705            s.children.entry(added_dep).or_default().insert(n);
4706        }
4707
4708        // 3. Push-on-subscribe for added deps with cached DATA. Wraps in a
4709        // wave so any downstream propagation runs cleanly. We capture only
4710        // the LIST of added deps (not their cache values) because the cache
4711        // can change between releasing the validation lock and the wave's
4712        // re-acquisition — see the P2 race fix below.
4713        //
4714        // P2 (Slice A close /qa) — between `drop(s)` and `run_wave`'s
4715        // closure re-acquiring the lock, a concurrent thread could
4716        // invalidate one of the added deps, releasing its cache handle. A
4717        // pre-snapshot of `(added_dep, cache)` pairs would then carry a
4718        // dangling HandleId into `deliver_data_to_consumer`. The fix is to
4719        // re-read each added dep's `cache` INSIDE the closure (under the
4720        // freshly re-acquired state lock). The wave-owner re-entrant mutex
4721        // (Q2) blocks concurrent waves once we enter `run_wave`, so the
4722        // re-read sees a coherent post-validation state.
4723        let added_for_wave: Vec<NodeId> = added.iter().copied().collect();
4724        // Slice Y1 (D3 / D090 — P12 fix, 2026-05-08): maintain partition
4725        // membership BEFORE dropping the state lock so the registry can
4726        // never lag behind topology mutations as observed by concurrent
4727        // readers. Lock-order invariant `state lock → registry mutex`
4728        // (one-way; never registry → state) — see the matching block in
4729        // `Core::register` for the full rationale.
4730        //   - For each new edge: union the partitions of `n` and `added_dep`.
4731        //   - For each removed edge (Slice Y1 / Phase F, 2026-05-09):
4732        //     run undirected-dep-graph BFS from `removed_dep` over the
4733        //     POST-removal `s.children` + `dep_records`. If `n` is
4734        //     unreachable, the partition has split — gather the affected
4735        //     component nodes + intra-component edges, then call
4736        //     [`SubgraphRegistry::split_partition`] to migrate the orphan
4737        //     side onto a fresh `SubgraphLockBox`. Mid-fire splits would
4738        //     have been rejected at the P13 check above (Q3 = (a-strict)).
4739        {
4740            let mut reg = self.registry.lock();
4741            for &added_dep in &added {
4742                reg.union_nodes(n, added_dep);
4743            }
4744            for &removed_dep in &removed {
4745                // Post-removal walk — `s.children[removed_dep]` no longer
4746                // contains `n`, and `s.nodes[n].dep_records` no longer
4747                // contains `removed_dep`. No skip needed; no extra edges
4748                // (added edges are already applied to `s.children` and
4749                // `dep_records` by the time we reach this block).
4750                let visited = walk_undirected_dep_graph(&s, removed_dep, None, &[]);
4751                if visited.contains(&n) {
4752                    // Still connected via other dep edges — no split.
4753                    continue;
4754                }
4755                // Disconnected. `visited` is the keep-side (containing
4756                // `removed_dep`). Identify the original component, the
4757                // intra-component dep edges, and split.
4758                let original_root = reg.find(removed_dep);
4759                // Snapshot keys before iterating — `find` mutates via
4760                // path compression; iterating + mutating concurrently
4761                // would alias-borrow.
4762                let snapshot_keys: Vec<NodeId> = reg.registered_nodes();
4763                let component_nodes: Vec<NodeId> = snapshot_keys
4764                    .into_iter()
4765                    .filter(|&node| reg.find(node) == original_root)
4766                    .collect();
4767                let component_set: HashSet<NodeId> = component_nodes.iter().copied().collect();
4768                // Collect dep edges within the component (post-removal).
4769                // Edge convention: `(parent, child)` data-flow direction.
4770                let mut edges_in_component: Vec<(NodeId, NodeId)> = Vec::new();
4771                for &node in &component_nodes {
4772                    if let Some(rec) = s.nodes.get(&node) {
4773                        for d in rec.dep_records.iter().map(|r| r.node) {
4774                            if component_set.contains(&d) {
4775                                edges_in_component.push((d, node));
4776                            }
4777                        }
4778                    }
4779                }
4780                let keep_side_nodes: Vec<NodeId> = visited.iter().copied().collect();
4781                reg.split_partition(&component_nodes, &keep_side_nodes, &edges_in_component);
4782                // Marker call kept for symmetry with `union_nodes` — the
4783                // registry's `on_edge_removed` is itself a no-op (Phase F
4784                // moved the actual work into Core where the dep-graph
4785                // view is available).
4786                reg.on_edge_removed(n, removed_dep);
4787            }
4788        }
4789        // Drop the state lock before run_wave (which acquires its own) and
4790        // before crossing the binding boundary for the F1 refcount-fix
4791        // releases. Keeps the lock-discipline split (binding calls outside
4792        // the state lock) consistent with the rest of the dispatcher.
4793        drop(s);
4794        // Slice E2 (D067): fire OnRerun lock-released for dynamic nodes
4795        // that had previously fired. The cleanup closure cleans up
4796        // resources tied to the old dep shape before the next fn-fire
4797        // (triggered by added-dep push-on-subscribe below) registers a
4798        // fresh cleanup spec. Direct fire (NOT via deferred_cleanup_hooks)
4799        // because set_deps may NOT enter a wave (no added deps → no
4800        // run_wave below) — queueing the hook would orphan it until the
4801        // next unrelated wave drains.
4802        if fire_set_deps_on_rerun {
4803            self.binding.cleanup_for(n, CleanupTrigger::OnRerun);
4804        }
4805        // Fire topology event after lock is dropped.
4806        self.fire_topology_event(&crate::topology::TopologyEvent::DepsChanged {
4807            node: n,
4808            old_deps: old_deps_vec,
4809            new_deps: new_deps_vec.clone(),
4810        });
4811        if !added_for_wave.is_empty() {
4812            // Slice Y1 / Phase E: push-on-subscribe wave runs on `n`'s
4813            // touched partitions. Added deps are now unioned with `n`
4814            // (Phase C P12 fix moved registry mutation inside the state
4815            // lock), so any cascade through them stays in `n`'s partition
4816            // set as walked by `compute_touched_partitions`.
4817            self.run_wave_for(n, |this| {
4818                let mut s = this.lock_state();
4819                // Defensive: re-validate `n` still exists and isn't terminal.
4820                // A concurrent path could have terminated it between
4821                // validation and run_wave_for's partition-lock acquisition.
4822                if !s.nodes.contains_key(&n) || s.require_node(n).terminal.is_some() {
4823                    return;
4824                }
4825                for added_dep in &added_for_wave {
4826                    // Re-read cache under the wave-owner-held lock — this
4827                    // is the post-validation, post-concurrent-action
4828                    // snapshot. NO_HANDLE means the dep was invalidated
4829                    // concurrently; skip (no data to push).
4830                    let cache = match s.nodes.get(added_dep) {
4831                        Some(rec) => rec.cache,
4832                        None => continue, // dep deleted concurrently
4833                    };
4834                    if cache == NO_HANDLE {
4835                        continue;
4836                    }
4837                    let dep_idx = s.require_node(n).dep_index_of(*added_dep);
4838                    if let Some(idx) = dep_idx {
4839                        this.deliver_data_to_consumer(&mut s, n, idx, cache);
4840                    }
4841                }
4842            });
4843        }
4844        for h in removed_handles {
4845            self.binding.release_handle(h);
4846        }
4847        Ok(())
4848    }
4849
4850    /// DFS from `from` along data-flow edges (children map) looking for `to`.
4851    /// Returns the path including endpoints, or `None` if unreachable. Used
4852    /// for cycle detection in [`Self::set_deps`].
4853    fn path_from_to(&self, s: &CoreState, from: NodeId, to: NodeId) -> Option<Vec<NodeId>> {
4854        if from == to {
4855            return Some(vec![from]);
4856        }
4857        let mut stack: Vec<(NodeId, Vec<NodeId>)> = vec![(from, vec![from])];
4858        let mut visited: HashSet<NodeId> = HashSet::new();
4859        while let Some((cur, path)) = stack.pop() {
4860            if !visited.insert(cur) {
4861                continue;
4862            }
4863            if cur == to {
4864                return Some(path);
4865            }
4866            if let Some(children) = s.children.get(&cur) {
4867                for &child in children {
4868                    let mut new_path = path.clone();
4869                    new_path.push(child);
4870                    stack.push((child, new_path));
4871                }
4872            }
4873        }
4874        None
4875    }
4876}
4877
4878// CoreState helpers — kept on the inner struct so they're naturally scoped
4879// to the lock guard.
4880impl CoreState {
4881    fn alloc_node_id(&mut self) -> NodeId {
4882        let id = NodeId::new(self.next_node_id);
4883        self.next_node_id += 1;
4884        id
4885    }
4886
4887    fn alloc_sub_id(&mut self) -> SubscriptionId {
4888        let id = SubscriptionId(self.next_subscription_id);
4889        self.next_subscription_id += 1;
4890        id
4891    }
4892
4893    /// Clear wave-scoped flags and rotate per-dep batch data on every
4894    /// node. Run at the end of every wave (regular drain via `run_wave`,
4895    /// activation drain via `activate_derived`, and `BatchGuard::drop`'s
4896    /// drain). Centralized so a future wave-state field can't be missed
4897    /// at one of the cleanup sites.
4898    ///
4899    /// Per-dep rotation (R2.9.b / R1.3.6.b):
4900    /// - `prev_data` ← last element of `data_batch` (or unchanged if empty).
4901    ///   The last batch entry's retain transfers to `prev_data`; the old
4902    ///   `prev_data`'s retain is released. All earlier batch entries are
4903    ///   released.
4904    /// - `data_batch` cleared.
4905    /// - Per-dep `dirty` and `involved_this_wave` cleared.
4906    ///
4907    /// Handle releases are pushed to `deferred_handle_releases` for
4908    /// post-lock-drop release by the caller.
4909    pub(crate) fn clear_wave_state(&mut self, ws: &mut crate::batch::WaveState) {
4910        // Q-beyond Sub-slice 1 (D108, 2026-05-09): `pending_auto_resolve`
4911        // + `pending_pause_overflow` clears moved to
4912        // [`crate::batch::WaveState::clear_wave_state`]. The per-NodeRecord
4913        // rotation below pushes batch-handle and prev_data releases into
4914        // `ws.deferred_handle_releases` (was `cps.deferred_handle_releases`
4915        // pre-sub-slice-1, was `s.deferred_handle_releases` pre-Q2). Caller
4916        // borrows the WaveState thread_local; no lock-discipline rule
4917        // applies (state lock + thread_local borrow are independent).
4918        //
4919        // Q-beyond Sub-slice 3 (D108, 2026-05-09):
4920        // `invalidate_hooks_fired_this_wave` clear moved to
4921        // [`crate::batch::WaveState::clear_wave_state`]. The
4922        // `deferred_cleanup_hooks` invariant (NOT cleared here, drained
4923        // explicitly on success/panic paths) likewise moves with the
4924        // field.
4925        //
4926        // /qa F2 reverted (2026-05-10): `currently_firing` stays on
4927        // CoreState (per-Core, cross-thread visible — load-bearing for
4928        // P13). Defensive clear here mirrors the pre-sub-slice-3 safety
4929        // net (`FiringGuard`'s RAII push/pop is balanced even on panic;
4930        // a future code path that bypasses the guard would otherwise
4931        // leak a stale entry into the next wave).
4932        self.currently_firing.clear();
4933        //
4934        // Slice G tier3 emit tracking moved to per-partition state (Q3,
4935        // 2026-05-09); cleared by [`super::WaveOwnerGuard::drop`] on
4936        // outermost release for each partition the wave touched.
4937        for rec in self.nodes.values_mut() {
4938            rec.dirty = false;
4939            rec.involved_this_wave = false;
4940            for dr in &mut rec.dep_records {
4941                let batch_len = dr.data_batch.len();
4942                if batch_len > 0 {
4943                    // Release all batch entries EXCEPT the last — the last
4944                    // entry's retain transfers to prev_data.
4945                    for &h in &dr.data_batch[..batch_len - 1] {
4946                        ws.deferred_handle_releases.push(h);
4947                    }
4948                    // Release the OLD prev_data (its retain was from the
4949                    // previous wave's rotation or from initial delivery).
4950                    if dr.prev_data != NO_HANDLE {
4951                        ws.deferred_handle_releases.push(dr.prev_data);
4952                    }
4953                    // Rotate: last batch entry becomes new prev_data.
4954                    // Its retain carries over — no extra retain needed.
4955                    dr.prev_data = dr.data_batch[batch_len - 1];
4956                    dr.data_batch.clear();
4957                }
4958                dr.dirty = false;
4959                dr.involved_this_wave = false;
4960            }
4961        }
4962    }
4963
4964    pub(crate) fn require_node(&self, id: NodeId) -> &NodeRecord {
4965        self.nodes
4966            .get(&id)
4967            .unwrap_or_else(|| panic!("unknown node {id:?}"))
4968    }
4969
4970    pub(crate) fn require_node_mut(&mut self, id: NodeId) -> &mut NodeRecord {
4971        self.nodes
4972            .get_mut(&id)
4973            .unwrap_or_else(|| panic!("unknown node {id:?}"))
4974    }
4975}
4976
4977/// Release every binding-side refcount share owned by this `CoreState`
4978/// when the last `Core` clone drops the inner Mutex.
4979///
4980/// Without this, every retained handle in `cache` / `terminal` Error /
4981/// `dep_terminals` Error / pause-buffer-payload would leak in the binding
4982/// registry until process exit. Production bindings (napi-rs, pyo3,
4983/// wasm-bindgen) all maintain handle-ref maps that grow unbounded without
4984/// this cleanup.
4985///
4986/// Safe to call during panic unwinding — `BindingBoundary::release_handle`
4987/// is the only call, and a panicking binding during cleanup would already
4988/// have been a problem in normal operation.
4989impl Drop for CoreState {
4990    fn drop(&mut self) {
4991        // Q-beyond Sub-slice 3 (D108, 2026-05-09): `deferred_flush_jobs`
4992        // moved to [`crate::batch::WaveState`]. The `Vec<Sink>` clones
4993        // drop naturally with the per-thread WaveState's lifetime; no
4994        // CoreState-side cleanup needed.
4995        // Q-beyond Sub-slice 1 (D108, 2026-05-09): `deferred_handle_releases`
4996        // and `wave_cache_snapshots` moved to per-thread WaveState
4997        // thread_local. By outermost-BatchGuard-drop discipline both fields
4998        // are empty by the time CoreState drops (BatchGuard owns a Core
4999        // clone, so Core can't drop while a BatchGuard is in flight). Any
5000        // thread that ran a wave on this Core drained on its own outermost
5001        // BatchGuard; cross-Core thread_local sharing is fine because each
5002        // wave drains its own retains.
5003        //
5004        // Q-beyond Sub-slice 2 (D108, 2026-05-09): `pending_fires` and
5005        // `pending_notify` likewise moved to per-thread WaveState. The
5006        // pre-Sub-slice-2 `pending_notify` walk here (drain + release each
5007        // payload_handle) is no longer reachable from `Drop for CoreState`:
5008        // by invariant, no wave is in flight when CoreState drops (BatchGuard
5009        // holds a Core clone), so the originating thread's WaveState
5010        // pending_notify is empty by then. Other threads' WaveStates are
5011        // unreachable from CoreState::drop anyway — they're per-thread
5012        // thread_locals scoped to whichever thread ran the wave. The
5013        // outermost `BatchGuard::drop` is the canonical drain point on both
5014        // success and panic paths; Drop for CoreState relies on that
5015        // discipline holding rather than re-implementing it.
5016
5017        // Per-node retained handles:
5018        //   - `cache` (1 retain per non-NO_HANDLE state cache or
5019        //     populated compute cache).
5020        //   - `terminal == Some(Error(h))` (1 retain on the terminal slot).
5021        //   - `dep_terminals[i] == Some(Error(h))` (1 retain per consumer's
5022        //     terminated-dep slot).
5023        //   - `pause_state` paused buffer messages with payload handles
5024        //     (1 retain per buffered Data/Error).
5025        for rec in self.nodes.values_mut() {
5026            if rec.cache != NO_HANDLE {
5027                self.binding.release_handle(rec.cache);
5028            }
5029            if let Some(TerminalKind::Error(h)) = rec.terminal {
5030                self.binding.release_handle(h);
5031            }
5032            for dr in &rec.dep_records {
5033                if let Some(TerminalKind::Error(h)) = dr.terminal {
5034                    self.binding.release_handle(h);
5035                }
5036                // Release data_batch retains (in-flight wave data).
5037                for &h in &dr.data_batch {
5038                    self.binding.release_handle(h);
5039                }
5040                // Release prev_data retain (cross-wave persistence).
5041                if dr.prev_data != NO_HANDLE {
5042                    self.binding.release_handle(dr.prev_data);
5043                }
5044            }
5045            if let PauseState::Paused { buffer, .. } = &rec.pause_state {
5046                for msg in buffer {
5047                    if let Some(h) = msg.payload_handle() {
5048                        self.binding.release_handle(h);
5049                    }
5050                }
5051            }
5052            // Slice E1: release replay-buffer retains.
5053            for &h in &rec.replay_buffer {
5054                self.binding.release_handle(h);
5055            }
5056            // Operator scratch (Slice C-3, D026): generic per-operator
5057            // state struct. Each variant's release_handles releases the
5058            // shares it owns (Scan/Reduce acc, Distinct/Pairwise prev,
5059            // Last latest + default; Take/Skip/TakeWhile own no handles).
5060            if let Some(scratch) = rec.op_scratch.as_mut() {
5061                scratch.release_handles(&*self.binding);
5062            }
5063        }
5064
5065        // Q-beyond Sub-slice 1 + 2 (D108, 2026-05-09): WaveState's
5066        // retain-holding fields (`wave_cache_snapshots`,
5067        // `deferred_handle_releases`, `pending_notify`) are drained by
5068        // outermost BatchGuard::drop (success + panic paths). See
5069        // comment above for the invariant.
5070    }
5071}