Skip to main content

graphrefly_core/
node.rs

1//! The dispatcher — node registration, subscription, wave engine.
2//!
3//! Mirrors `~/src/graphrefly-ts/src/__experiments__/handle-core/core.ts`
4//! (the Phase 13.6 brainstorm prototype, ~370 lines, 22 invariant tests).
5//!
6//! # Scope (M1 dispatcher + Slice A+B parity, closed 2026-05-05)
7//!
8//! - State + derived + dynamic node registration.
9//! - Subscribe / unsubscribe with push-on-subscribe (R1.2.3).
10//! - RAII [`Subscription`] with Drop-based deregister (§10.12).
11//! - DIRTY → DATA / RESOLVED ordering (R1.3.1.b two-phase push).
12//! - Equals-substitution (R1.3.2): identity is zero-FFI; custom crosses boundary.
13//! - First-run gate (R2.5.3) — fn does not fire until every dep has a handle.
14//! - Diamond resolution — one fn fire per wave even with shared upstream.
15//! - `set_deps()` atomic dep mutation with cycle detection + Phase 13.8 Q1
16//!   terminal-rejection policy (R3.3.1).
17//! - PAUSE / RESUME with lockId set + replay buffer (R1.2.6, R2.6, §10.2).
18//! - INVALIDATE broadcast + cascade with R1.4 idempotency.
19//! - COMPLETE / ERROR cascade + Lock 2.B auto-cascade gating
20//!   (ERROR dominates COMPLETE; first error wins).
21//! - TEARDOWN auto-precedes COMPLETE (R2.6.4 / Lock 6.F) +
22//!   `has_received_teardown` idempotency.
23//! - Meta TEARDOWN ordering (R1.3.9.d) — companions tear down before parent.
24//! - Resubscribable terminal lifecycle (R2.2.7, R2.5.3) — late subscribe to a
25//!   resubscribable terminal node resets lifecycle, except after TEARDOWN
26//!   (per F3 audit guard: TEARDOWN is permanent).
27//!
28//! # Module split (Slice C-1, 2026-05-05)
29//!
30//! Wave-engine internals (drain loop, fire selection, emission commit, sink
31//! dispatch) live in [`crate::batch`]. The split is purely organizational —
32//! the methods are still on `Core`. See `batch.rs` for the wave-engine
33//! entry points (`run_wave`, `drain_and_flush`, `commit_emission`,
34//! `queue_notify`, `deliver_data_to_consumer`).
35//!
36//! # Out of scope (later slices / milestones)
37//!
38//! - Deactivation cleanup (RAM nodes clear cache when sink count → 0) — M2.
39//!
40//! See [`migration-status.md`](../../../docs/migration-status.md) for the
41//! milestone tracker and [`porting-deferred.md`](../../../docs/porting-deferred.md)
42//! for surfaced concerns deferred to evidence-driven slices.
43//!
44//! # Re-entrance discipline (Slice A close, M1: fully lock-released)
45//!
46//! - **Wave-end sink fires** drop the state lock first. A subscriber's sink
47//!   that calls back into `Core::emit` / `pause` / `resume` / `invalidate` /
48//!   `complete` / `error` / `teardown` re-acquires the lock cleanly and runs
49//!   a nested wave (the per-(Core, thread) `in_tick` ownership slot is
50//!   cleared by the owning `BatchGuard::drop` before the deferred-fire
51//!   phase).
52//! - **`BindingBoundary::invoke_fn`** fires lock-released. The wave engine
53//!   acquires + drops the state lock per fn-fire iteration around the
54//!   `invoke_fn` callback. User fns may re-enter `Core::emit` / `pause` /
55//!   etc. and run a nested wave.
56//! - **`BindingBoundary::custom_equals`** fires lock-released.
57//!   `commit_emission` brackets the equals check around a lock release;
58//!   custom equals oracles may re-enter Core safely.
59//! - **Subscribe-time handshake** also fires lock-released. [`Core::subscribe`]
60//!   acquires the [`Core::wave_owner`] re-entrant mutex first (cross-thread
61//!   serialization), installs the sink under the state lock, drops the state
62//!   lock, then fires the per-tier handshake (`[Start]` / `[Data(cache)]?` /
63//!   `[Complete]?` / `[Error(h)]?` / `[Teardown]?` per R1.3.5.a) lock-released.
64//!   A handshake-time sink callback may re-enter Core (`emit` / `complete` /
65//!   `error` / `subscribe`); same-thread re-entry passes through `wave_owner`
66//!   transparently. Cross-thread emits block on `wave_owner` until the
67//!   subscribe path drops it, preserving R1.3.5.a happens-after ordering.
68
69use std::collections::VecDeque;
70use std::panic::{catch_unwind, AssertUnwindSafe};
71use std::sync::atomic::{AtomicU64, Ordering};
72use std::sync::{Arc, Weak};
73
74use ahash::{AHashMap as HashMap, AHashSet as HashSet};
75use parking_lot::{ArcReentrantMutexGuard, Mutex, MutexGuard};
76
77/// Held guard from `parking_lot::ReentrantMutex::lock_arc()` on a
78/// partition's `wave_owner`. `!Send` per `parking_lot::ReentrantMutex`'s
79/// thread-affinity contract (the inner guard is `!Send`; the wrapper
80/// inherits) — the type-level `!Send` flows into
81/// [`crate::batch::BatchGuard::_wave_guards`] so any attempt to send
82/// the batch guard across threads fails to compile.
83///
84/// **Phase H+ option (d) limited variant (2026-05-09):** the guard's
85/// [`Drop`] pops `sid` from the [`held_partitions`] thread-local so
86/// the ascending-order check on the next acquire sees the correct
87/// "currently held" set. Re-entrant acquires (same thread, same
88/// partition) increment a refcount in the thread-local; final drop
89/// removes the entry.
90///
91/// Slice Y1 / Phase E (2026-05-08); Phase H+ wrapper (2026-05-09).
92pub(crate) struct WaveOwnerGuard {
93    /// Drop order: the wrapper's Drop runs FIRST (pops `held_partitions`),
94    /// then `inner` drops automatically (releases the parking_lot lock).
95    /// Field-declaration order matters in Rust: the wrapper drops top-
96    /// down by default, so `inner` is listed AFTER `sid` so the wrapper's
97    /// custom Drop runs on the whole struct first, releasing the
98    /// thread-local entry under our control before the inner guard
99    /// hits parking_lot's release path.
100    sid: crate::subgraph::SubgraphId,
101    /// `#[allow(dead_code)]`: the inner guard is held to keep the
102    /// parking_lot::ReentrantMutex acquired for the wave's duration;
103    /// it's never read, only its `Drop` matters.
104    #[allow(dead_code)]
105    inner: ArcReentrantMutexGuard<parking_lot::RawMutex, parking_lot::RawThreadId, ()>,
106}
107
108impl Drop for WaveOwnerGuard {
109    fn drop(&mut self) {
110        // `held_partitions::release` returns `bool was_outermost`. The
111        // outermost-release signal is consumed by [`crate::batch::BatchGuard::drop`]
112        // for the per-thread `TIER3_EMITTED_THIS_WAVE` clear (D1 patch,
113        // 2026-05-09 — Slice G coalescing tracker is keyed by thread,
114        // not by partition, so the clear lives on `BatchGuard` not here).
115        // We discard the bool — no per-guard cleanup remains.
116        let _ = held_partitions::release(self.sid);
117        // `inner` drops automatically after this — releases the
118        // parking_lot::ReentrantMutex (decrementing parking_lot's
119        // own internal re-entry counter; only the FINAL release
120        // unparks waiters).
121    }
122}
123
124/// Phase H+ STRICT variant — thread-local infrastructure for
125/// cross-partition acquire-during-fire / cross-partition
126/// acquire-during-wave deadlock detection (D115, 2026-05-10).
127///
128/// **Protocol invariant enforced:** whenever this thread already
129/// holds at least one partition wave_owner (HELD non-empty), every
130/// NEW partition this thread tries to acquire must have an id
131/// strictly greater than every partition currently held. Re-entrant
132/// acquires (same thread, same partition that's already held) bypass
133/// the check (they're fine — `parking_lot::ReentrantMutex` allows
134/// same-thread re-entry transparently).
135///
136/// Without this check, two threads each doing nested cross-partition
137/// acquires within an active wave could form an AB/BA cycle: thread A
138/// holds X, attempts Y (Y < X); concurrently thread B holds Y,
139/// attempts X (X > Y, ascending-OK from B's POV). A's acquire on Y
140/// blocks behind B; B's acquire on X blocks behind A. Cycle.
141///
142/// **No producer carve-out:** the prior limited variant suppressed
143/// the check during producer build/sink closures via an
144/// `IN_PRODUCER_BUILD` refcount. The STRICT variant (D115) removes
145/// this carve-out entirely. Instead, `check_and_acquire` returns a
146/// typed `Result<(), PartitionOrderViolation>` error, and callers
147/// that would violate ascending order (producer-pattern operator
148/// sinks) defer the operation to wave-end via the per-Core
149/// `deferred_producer_ops` queue. The defer-and-retry approach
150/// preserves deadlock-freedom without suppressing the check.
151mod held_partitions {
152    use crate::subgraph::SubgraphId;
153    use smallvec::SmallVec;
154    use std::cell::RefCell;
155
156    /// Inline-storage capacity for the per-thread held-partitions set.
157    /// 4 mirrors the same inline limit used elsewhere in the codebase
158    /// (`compute_touched_partitions` returns `SmallVec<[SubgraphId; 4]>`,
159    /// `BatchGuard::_wave_guards` is `SmallVec<[WaveOwnerGuard; 4]>`).
160    /// In the typical wave a single thread holds 1–3 partitions; spillover
161    /// to the heap costs allocation but is correct.
162    const HELD_INLINE: usize = 4;
163
164    thread_local! {
165        /// Currently-held partitions on this thread, as `(SubgraphId, refcount)`
166        /// pairs in arbitrary order. The refcount mirrors
167        /// `parking_lot::ReentrantMutex`'s internal counter (we can't query
168        /// parking_lot's directly).
169        ///
170        /// `SmallVec<[_; HELD_INLINE]>` over `BTreeMap<_, _>`: under the
171        /// expected workload (≤4 partitions held simultaneously per wave) the
172        /// inline-storage SmallVec keeps the entire set in stack memory with
173        /// no allocation, no Box-per-node, and contiguous cache layout. Linear
174        /// scans are branch-predictable and faster than BTreeMap's logn
175        /// pointer-chasing for tiny N. Phase J post-widening bench
176        /// (`migration-status.md` 2026-05-09) reported 14–25% overhead vs the
177        /// pre-widening baseline, attributed in part to BTreeMap allocation
178        /// costs on the hot path. This swap recovers part of that overhead.
179        ///
180        /// Bookkeeping is unconditional — every acquire bumps the
181        /// refcount, every release decrements. The CHECK gate
182        /// (`!held.is_empty() && !already_held`) is what
183        /// distinguishes "first-time acquire on a fresh thread"
184        /// (allowed, held empty) from "nested acquire while we
185        /// already hold something" (must be ascending).
186        static HELD: RefCell<SmallVec<[(SubgraphId, u32); HELD_INLINE]>>
187            = const { RefCell::new(SmallVec::new_const()) };
188    }
189
190    /// Phase H+ STRICT check + bookkeeping. Called BEFORE acquiring the
191    /// partition's parking_lot::ReentrantMutex.
192    ///
193    /// Panics with a clear diagnostic if:
194    /// - HELD is non-empty (this thread already holds ≥1 partition),
195    /// - AND we're NOT inside a producer build closure,
196    /// - AND `sid` is NOT already held by this thread,
197    /// - AND `sid <= max(currently held)`.
198    ///
199    /// Otherwise: increments the refcount for `sid` (creating the
200    /// entry if needed) and returns. The caller MUST pair every
201    /// call with a [`release`] when the guard drops.
202    ///
203    /// **Important note on cross-thread vs same-thread:** this check
204    /// is a SAME-THREAD invariant — it catches a thread acquiring
205    /// out of order from itself. Cross-thread AB/BA cycles between
206    /// threads with disjoint same-thread acquisition orders are
207    /// prevented at a different layer (the
208    /// `compute_touched_partitions` upfront-acquire-all-ascending
209    /// rule in `Core::begin_batch_for`). This thread-local check
210    /// adds the layer that prevents a same-thread descending acquire
211    /// from creating the FIRST half of a cross-thread cycle.
212    pub(crate) fn check_and_acquire(sid: SubgraphId) -> Result<(), super::PartitionOrderViolation> {
213        HELD.with(|h| {
214            let mut held = h.borrow_mut();
215            // Gate: held non-empty (we're nested). First-time acquires
216            // on a fresh thread (held empty) skip the check — there's
217            // nothing to compare against.
218            let already_held = held.iter().any(|(s, _)| *s == sid);
219            if !held.is_empty() && !already_held {
220                // Linear-scan max over the inline storage (typical N ≤ 4).
221                // Branch-predictable and cache-local; no allocation.
222                if let Some(max_held) = held.iter().map(|(s, _)| *s).max() {
223                    if sid <= max_held {
224                        // Drop the borrow before returning Err so
225                        // the caller doesn't see a still-borrowed RefCell.
226                        let new_id = sid;
227                        drop(held);
228                        return Err(super::PartitionOrderViolation {
229                            attempted: new_id,
230                            max_held,
231                        });
232                    }
233                }
234            }
235            // Bookkeeping: increment refcount. `checked_add(1)` so
236            // overflow surfaces (would indicate an unbounded
237            // re-entrance — a real bug). Linear find-or-push.
238            if let Some((_, count)) = held.iter_mut().find(|(s, _)| *s == sid) {
239                *count = count.checked_add(1).expect(
240                    "held_partitions refcount overflow — unbounded \
241                     same-partition re-entrance. Should be bounded by the \
242                     protocol's MAX_BATCH_DRAIN_ITERATIONS cap.",
243                );
244            } else {
245                held.push((sid, 1));
246            }
247            Ok(())
248        })
249    }
250
251    /// Decrement the refcount for `sid`; remove the entry if it
252    /// hits zero. Called from [`super::WaveOwnerGuard::drop`] AND
253    /// from the retry / panic paths in
254    /// [`super::Core::partition_wave_owner_lock_arc`] to ensure the
255    /// refcount stays balanced under all unwind / retry / exhaust
256    /// paths.
257    ///
258    /// Returns `true` iff this release brought the partition's
259    /// refcount on this thread to zero — i.e. this was the OUTERMOST
260    /// guard for `sid` on this thread. [`super::WaveOwnerGuard::drop`]
261    /// uses this signal to clear per-partition wave state (Q3) on
262    /// outermost release only; inner re-entrant guard drops must NOT
263    /// clear (a containing wave is still active and still holds
264    /// in-flight wave state). The `partition_wave_owner_lock_arc`
265    /// retry / panic paths ignore the return value because the
266    /// partition state hadn't been touched yet on those paths
267    /// (clearing would be a no-op anyway).
268    pub(crate) fn release(sid: SubgraphId) -> bool {
269        HELD.with(|h| {
270            let mut held = h.borrow_mut();
271            if let Some(idx) = held.iter().position(|(s, _)| *s == sid) {
272                let count = &mut held[idx].1;
273                // /qa A3 fix (2026-05-09): debug_assert the refcount is
274                // non-zero before decrement. A `release(sid)` on an
275                // entry with `count == 0` indicates a bookkeeping bug
276                // (a release without a matching `check_and_acquire`,
277                // or a logic error in caller), but the legacy
278                // saturating_sub silently returned `was_outermost=true`
279                // and removed the entry — masking the bug. Surface
280                // in dev/test builds; release builds preserve the
281                // saturating behavior.
282                debug_assert!(
283                    *count > 0,
284                    "held_partitions::release({sid:?}): refcount underflow — \
285                     release without matching check_and_acquire (caller bug)"
286                );
287                *count = count.saturating_sub(1);
288                if *count == 0 {
289                    // `swap_remove` is O(1) and order-irrelevant: the
290                    // CHECK gate computes max via linear scan and does
291                    // not depend on iteration order.
292                    held.swap_remove(idx);
293                    return true;
294                }
295            } else {
296                // /qa A3 fix (2026-05-09): same intent — release on a
297                // sid that's not in HELD is a bookkeeping bug. Surface
298                // in dev/test builds.
299                debug_assert!(
300                    false,
301                    "held_partitions::release({sid:?}): sid not in HELD — \
302                     double-drop or stray release (caller bug)"
303                );
304            }
305            false
306        })
307    }
308
309    /// Returns `true` if this thread currently holds any partition
310    /// wave_owner. Used by `BatchGuard::drop` to skip the deferred-ops
311    /// drain when we're still nested inside an outer wave_guard scope
312    /// (Phase H+ STRICT, D115).
313    pub(crate) fn any_held() -> bool {
314        HELD.with(|h| !h.borrow().is_empty())
315    }
316
317    /// Test-only: read the current thread's held-partitions snapshot.
318    /// Used by post-panic regression assertions to verify the
319    /// thread-local stays clean even when the H+ check unwinds the
320    /// stack (so cargo's thread-reuse doesn't propagate corrupted
321    /// state to subsequent tests). `pub` (gated by
322    /// `cfg(any(test, debug_assertions))`) so integration tests
323    /// outside the crate can read it.
324    #[cfg(any(test, debug_assertions))]
325    #[must_use]
326    pub fn held_snapshot_for_tests() -> Vec<(SubgraphId, u32)> {
327        // /qa A2 fix (2026-05-09): sort by SubgraphId so the snapshot
328        // returns ascending order — matches the BTreeMap-iteration
329        // contract that pre-/qa SmallVec swap consumers might rely on.
330        // Test consumers currently only assert `is_empty()`, but the
331        // ordered shape is the safer default for future tests that
332        // assert specific entries.
333        let mut v: Vec<(SubgraphId, u32)> = HELD.with(|h| h.borrow().to_vec());
334        v.sort_unstable_by_key(|(s, _)| *s);
335        v
336    }
337}
338
339/// Test-only re-exports for integration tests under
340/// `crates/graphrefly-core/tests/`. Gated `#[cfg(any(test, debug_assertions))]`
341/// so they don't leak into release builds. Public visibility is
342/// required because integration tests live outside the crate.
343#[cfg(any(test, debug_assertions))]
344pub use held_partitions::held_snapshot_for_tests;
345use smallvec::SmallVec;
346use thiserror::Error;
347
348use crate::boundary::{BindingBoundary, CleanupTrigger};
349use crate::clock::monotonic_ns;
350use crate::handle::{FnId, HandleId, LockId, NodeId, NO_HANDLE};
351use crate::message::Message;
352
353/// Terminal-lifecycle state — once set on a node, the node will not emit
354/// further DATA; per-dep slots on consumers also use this to track which
355/// upstreams have terminated (R1.3.4 / Lock 2.B).
356///
357/// `Error` carries a [`HandleId`] resolving to the error value. Refcount is
358/// retained when the variant is stored in a node's `terminal` slot or any
359/// consumer's `dep_terminals` slot; v1 does not release these (terminal
360/// state is one-shot at this layer; release happens on resubscribable
361/// terminal-lifecycle reset, a separate slice).
362#[derive(Copy, Clone, Debug, PartialEq, Eq)]
363pub enum TerminalKind {
364    Complete,
365    Error(HandleId),
366}
367
368/// Node kind discriminant — **derived metadata** computed from
369/// [`NodeRecord`]'s field shape (D030 unification, Slice D).
370///
371/// Core no longer stores `kind` as a field; it's computed on demand from
372/// `(deps.is_empty(), fn_id.is_some(), op.is_some(), is_dynamic)`,
373/// mirroring TS's data model where `NodeImpl` has no `_kind` field. The
374/// shape uniquely identifies the kind:
375///
376/// | deps      | fn_id | op   | is_dynamic | kind     |
377/// |-----------|-------|------|-----------|----------|
378/// | empty     | None  | None | -         | State    |
379/// | empty     | Some  | None | -         | Producer |
380/// | non-empty | Some  | None | false     | Derived  |
381/// | non-empty | Some  | None | true      | Dynamic  |
382/// | non-empty | None  | Some | -         | Operator |
383///
384/// Public API ([`Core::kind_of`]) derives this enum on each call. State
385/// nodes are ROM (cache survives deactivation); compute nodes
386/// (Derived / Dynamic / Operator) and producers are RAM.
387#[derive(Copy, Clone, Eq, PartialEq, Debug)]
388pub enum NodeKind {
389    /// Source node: cache is intrinsic, no fn, no deps. Mutated via [`Core::emit`].
390    State,
391    /// Producer node: fn fires once on first subscribe. No deps;
392    /// emissions arrive via sinks the fn subscribes to (zip / concat /
393    /// race / takeUntil pattern). Slice D / D031.
394    Producer,
395    /// Derived node: fn fires on every dep change; all deps tracked.
396    Derived,
397    /// Dynamic node: fn declares which dep indices it actually read this run.
398    /// Untracked dep updates flow through cache but do NOT re-fire fn.
399    Dynamic,
400    /// Operator node: built-in dispatch path for transform / combine /
401    /// flow / resilience operators. The `OperatorOp` discriminant selects
402    /// the per-operator FFI path ([`BindingBoundary::project_each`] etc.);
403    /// Core manages per-operator state via the generic `op_scratch` slot
404    /// on `NodeRecord` (D026). Per Slice C-1 (D009) / Slice C-3 (D026).
405    Operator(OperatorOp),
406}
407
408impl NodeKind {
409    /// True if this kind opts OUT of Lock 2.B auto-cascade. Operator(Reduce)
410    /// and Operator(Last) must intercept upstream COMPLETE so they can emit
411    /// their accumulator / buffered value before the cascade terminates them;
412    /// instead of cascading, terminate_node queues such children for fn-fire
413    /// so `fire_operator` can handle the terminal.
414    pub(crate) fn skips_auto_cascade(self) -> bool {
415        matches!(
416            self,
417            NodeKind::Operator(
418                OperatorOp::Reduce { .. } | OperatorOp::Last { .. } | OperatorOp::Valve
419            )
420        )
421    }
422}
423
424/// Built-in operator discriminant. Selects the per-operator dispatch path
425/// in `fire_operator` (`crates/graphrefly-core/src/batch.rs`). Each variant
426/// carries the binding-side closure ids (and seed handle for stateful
427/// folders) needed for the wave-execution path; Core stores no user values
428/// itself per the handle-protocol cleaving plane.
429#[derive(Copy, Clone, Eq, PartialEq, Debug)]
430pub enum OperatorOp {
431    /// `map(source, project)` — element-wise transform. Calls
432    /// `BindingBoundary::project_each(fn_id, &inputs)` per fire; emits each
433    /// returned handle via `commit_emission_verbatim` (R1.3.2.d batch
434    /// semantics — no equals substitution between batch entries).
435    Map { fn_id: FnId },
436    /// `filter(source, predicate)` — silent-drop selection (D012/D018).
437    /// Calls `BindingBoundary::predicate_each(fn_id, &inputs)`; emits each
438    /// passing input verbatim. If zero pass on a wave that dirtied the
439    /// node, queues a single `RESOLVED` to settle (D018).
440    Filter { fn_id: FnId },
441    /// `scan(source, fold, seed)` — left-fold emitting each new accumulator.
442    /// `seed` is captured at registration; `acc` lives in
443    /// [`ScanState`](super::op_state::ScanState) inside
444    /// [`NodeRecord::op_scratch`] and persists across waves until
445    /// resubscribable reset. Calls `BindingBoundary::fold_each(fn_id, acc,
446    /// &inputs) -> SmallVec<HandleId>` per fire.
447    Scan { fn_id: FnId, seed: HandleId },
448    /// `reduce(source, fold, seed)` — left-fold emitting once on upstream
449    /// COMPLETE. Accumulates silently while source DATA flows; on
450    /// dep[0].terminal == Some(Complete), emits `[Data(acc), Complete]`.
451    /// On `Error(h)`, propagates the error verbatim. Opts out of Lock 2.B
452    /// auto-cascade (see `NodeKind::skips_auto_cascade`).
453    Reduce { fn_id: FnId, seed: HandleId },
454    /// `distinctUntilChanged(source, equals)` — suppresses adjacent
455    /// duplicates. Calls `BindingBoundary::custom_equals(equals_fn_id,
456    /// prev, current)` per input; emits non-equal items verbatim and
457    /// updates `prev`. If zero items pass on a wave that dirtied the node,
458    /// queues `RESOLVED` (matches Filter discipline).
459    DistinctUntilChanged { equals_fn_id: FnId },
460    /// `pairwise(source)` — emits `(prev, current)` pairs starting after
461    /// the second value. First value swallowed (sets `prev`). Calls
462    /// `BindingBoundary::pairwise_pack(fn_id, prev, current)` per pair to
463    /// produce the binding-side tuple handle.
464    Pairwise { fn_id: FnId },
465
466    // ----- Slice C-2: multi-dep combinators (D020) -----
467    /// `combine(...sources)` — N-dep combineLatest. On any dep fire, packs
468    /// the latest handle per dep into a single tuple handle via
469    /// `BindingBoundary::pack_tuple(pack_fn, &handles)`. First-run gate
470    /// (`partial: false` default) holds until all deps deliver real DATA
471    /// (R2.5.3). COMPLETE cascades when all deps complete (R1.3.4.b).
472    Combine { pack_fn: FnId },
473
474    /// `withLatestFrom(primary, secondary)` — 2-dep, fire-on-primary-only
475    /// (D021, Phase 10.5). Packs `[primary, secondary]` via
476    /// `BindingBoundary::pack_tuple(pack_fn, &handles)` when dep[0]
477    /// (primary) has DATA in the wave. If only dep[1] (secondary) fires,
478    /// settles with RESOLVED (D018 pattern). First-run gate holds until
479    /// both deps deliver (R2.5.3 `partial: false`). Post-warmup INVALIDATE
480    /// guard: if secondary `prev_data == NO_HANDLE` and batch empty after
481    /// warmup, settles with RESOLVED (no stale pair).
482    WithLatestFrom { pack_fn: FnId },
483
484    /// `merge(...sources)` — N-dep, forward all DATA handles verbatim
485    /// (D022). Zero FFI on fire: no transformation, no binding call.
486    /// Each dep's batch handles are retained and emitted individually.
487    /// COMPLETE cascades when all deps complete (R1.3.4.b).
488    Merge,
489
490    // ----- Slice C-3: flow operators (D024) -----
491    /// `take(source, count)` — emits the first `count` DATA values then
492    /// self-completes via `Core::complete`. Tracks `count_emitted` in
493    /// [`TakeState`](super::op_state::TakeState). When upstream completes
494    /// before `count` is reached, the standard auto-cascade propagates
495    /// COMPLETE. `count == 0` is allowed: the first fire emits zero
496    /// items then immediately self-completes (D027).
497    Take { count: u32 },
498
499    /// `skip(source, count)` — drops the first `count` DATA values; once
500    /// the threshold is crossed, subsequent DATAs pass through verbatim.
501    /// Tracks `count_skipped` in [`SkipState`](super::op_state::SkipState).
502    /// On a wave where every input is still in the skip window, queues
503    /// DIRTY+RESOLVED to settle (D018 pattern).
504    Skip { count: u32 },
505
506    /// `takeWhile(source, predicate)` — emits while `predicate(input)`
507    /// holds; on the first `false`, emits any preceding passes then
508    /// self-completes via `Core::complete`. Reuses
509    /// [`BindingBoundary::predicate_each`] (D029); after the first
510    /// `false`, subsequent inputs in the same batch are dropped.
511    TakeWhile { fn_id: FnId },
512
513    /// `last(source)` / `last_with_default(source, default)` — buffers
514    /// the latest DATA; on upstream COMPLETE, emits `Data(latest)` then
515    /// `Complete`. The `default` field is `NO_HANDLE` for the no-default
516    /// factory (emits only `Complete` on empty stream), or a registered
517    /// default handle (emits `Data(default)` + `Complete` on empty
518    /// stream). Storage: [`LastState`](super::op_state::LastState) holds
519    /// `latest` (live buffer) and `default` (registration-time, stable).
520    /// Opts out of Lock 2.B auto-cascade so it can intercept upstream
521    /// COMPLETE.
522    Last { default: HandleId },
523
524    // ----- Slice U: control operators (D047) -----
525    /// `tap(source, fn)` — side-effect passthrough. Calls
526    /// `BindingBoundary::invoke_tap_fn(fn_id, handle)` on each input DATA,
527    /// then emits the input handle unchanged. Zero-transform: output
528    /// handles are the inputs verbatim (no equals substitution, no
529    /// allocation).
530    Tap { fn_id: FnId },
531
532    /// `tapFirst(source, fn)` — one-shot side-effect on first DATA. Same
533    /// as [`Tap`](Self::Tap) but fires `invoke_tap_fn` only once; after
534    /// the first fire, subsequent DATA passes through without a callback.
535    /// State: [`TapFirstState`](super::op_state::TapFirstState) tracks
536    /// `fired: bool`.
537    TapFirst { fn_id: FnId },
538
539    /// `valve(source, control)` — conditional forward. 2-dep: dep[0] is
540    /// source, dep[1] is boolean control. When the latest control value
541    /// is truthy (non-zero handle), forwards source DATA; when falsy,
542    /// settles with RESOLVED. Partial mode so it fires on control-alone
543    /// before source has delivered. Does NOT auto-complete on control
544    /// terminal (`completeWhenDepsComplete: false` equivalent).
545    Valve,
546
547    /// `settle(source, quietWaves, maxWaves)` — convergence detector.
548    /// Forwards each upstream DATA, counts consecutive no-change waves,
549    /// and self-completes when `quiet_count >= quiet_waves` (or
550    /// `wave_count >= max_waves` if set). State:
551    /// [`SettleState`](super::op_state::SettleState).
552    Settle {
553        quiet_waves: u32,
554        max_waves: Option<u32>,
555    },
556}
557
558/// Registration options for [`Core::register_operator`].
559///
560/// `equals` controls operator output dedup (R5.7 — defaults to identity).
561/// `partial` controls the R2.5.3 first-run gate (R5.4 — operator dispatch
562/// fires on first DATA from any dep when `true`; default `false` matches
563/// the gated derived discipline).
564#[derive(Copy, Clone, Debug)]
565pub struct OperatorOpts {
566    pub equals: EqualsMode,
567    pub partial: bool,
568}
569
570impl Default for OperatorOpts {
571    fn default() -> Self {
572        Self {
573            equals: EqualsMode::Identity,
574            partial: false,
575        }
576    }
577}
578
579/// Closure-form fn id OR typed operator discriminant — the two dispatch
580/// paths a node can use. State / passthrough nodes pass `None` to
581/// [`Core::register`] (no fn at all).
582#[derive(Copy, Clone, Debug)]
583pub enum NodeFnOrOp {
584    /// Closure-form: invokes [`BindingBoundary::invoke_fn`] per fire.
585    /// Used for Derived / Dynamic / Producer.
586    Fn(FnId),
587    /// Typed-op: routes to a `fire_op_*` helper that calls per-operator
588    /// FFI methods (`project_each` / `predicate_each` / `fold_each` /
589    /// `pairwise_pack` / `pack_tuple`). Used for Operator nodes.
590    Op(OperatorOp),
591}
592
/// How a node behaves while paused (canonical-spec §2.6). TS ships three
/// modes; the Slice F audit of 2026-05-07 closed the gap in the Rust
/// port.
///
/// | Mode | Outgoing tier-3 routing while paused | RESUME behavior |
/// |---|---|---|
/// | [`PausableMode::Default`] | suppress fn-fire upstream (no DIRTY emitted) | fire fn ONCE on RESUME if any dep delivered DATA during pause; collapses N pause-window writes into one settle |
/// | [`PausableMode::ResumeAll`] | buffer outgoing tier-3 / tier-4 messages per-wave | replay each buffered wave verbatim on RESUME |
/// | [`PausableMode::Off`] | dispatcher ignores PAUSE; tier-3 flushes immediately | no-op (no buffer to drain) |
///
/// Canonical §2.6 makes [`PausableMode::Default`] the default, so every
/// untagged source gets it. It costs O(1) memory per node (no buffer).
/// The price is that subscribers see one consolidated DATA on RESUME
/// rather than the K mid-pause emissions verbatim.
///
/// Note: tier-1 (DIRTY) / tier-2 (PAUSE/RESUME) / tier-5 (COMPLETE/ERROR) /
/// tier-6 (TEARDOWN) bypass pause in every mode. They stay observable so
/// that a leaked pause-controller cannot strand subscribers.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum PausableMode {
    /// Canonical default. fn-fire is suppressed while paused. On RESUME
    /// the fn fires once if any dep delivered DATA during the pause
    /// window.
    #[default]
    Default,
    /// Outgoing tier-3 / tier-4 messages are buffered per wave and
    /// replayed on RESUME. Choose this when subscribers need the
    /// verbatim emit history (e.g. an audit log or a
    /// replay-on-reconnect bridge).
    ResumeAll,
    /// The dispatcher ignores PAUSE for this node: tier-3 flushes
    /// immediately even while a lock is held. Intended for nodes whose
    /// value production is intrinsically pause-immune (telemetry
    /// counters, monotonic timers).
    Off,
}
626
627/// Per-kind opts for [`Core::register`]. Cross-kind config knobs live
628/// here; per-kind specifics (deps, fn_or_op) live on
629/// [`NodeRegistration`].
630#[derive(Copy, Clone, Debug)]
631pub struct NodeOpts {
632    /// Initial cached value. Only valid for state nodes (no deps + no
633    /// fn + no op). [`NO_HANDLE`] starts the node sentinel.
634    pub initial: HandleId,
635    /// Equality mode for outgoing emissions (R1.3.2). Defaults to
636    /// [`EqualsMode::Identity`].
637    pub equals: EqualsMode,
638    /// First-run gate (R2.5.3 / D011). When `true`, the node fires as
639    /// soon as ANY dep delivers a real handle; when `false` (default),
640    /// the node holds until every dep has delivered.
641    pub partial: bool,
642    /// Dynamic flag (R2.5.3) — fn declares actually-tracked dep indices
643    /// per fire. Only meaningful when `fn_or_op == Some(Fn(_))` AND
644    /// deps non-empty.
645    pub is_dynamic: bool,
646    /// Pause behavior mode (canonical §2.6). Default is
647    /// [`PausableMode::Default`]. See [`PausableMode`] for the trade-offs.
648    pub pausable: PausableMode,
649    /// Replay buffer cap (canonical R2.6.5 / Lock 6.G — Slice E1, 2026-05-07).
650    /// `None` (default) disables; `Some(N)` keeps a circular buffer of the
651    /// last N DATA emissions and replays them to late subscribers as part
652    /// of the per-tier handshake (between [`Message::Start`] and any
653    /// terminal slice). Only DATA is buffered; RESOLVED entries are NOT
654    /// (R2.6.5 explicit "DATA only").
655    pub replay_buffer: Option<usize>,
656}
657
658impl Default for NodeOpts {
659    fn default() -> Self {
660        Self {
661            initial: NO_HANDLE,
662            equals: EqualsMode::Identity,
663            partial: false,
664            is_dynamic: false,
665            pausable: PausableMode::Default,
666            replay_buffer: None,
667        }
668    }
669}
670
671/// Unified node-registration descriptor (D030, Slice D).
672///
673/// All node kinds (State / Producer / Derived / Dynamic / Operator)
674/// register through [`Core::register`] with a `NodeRegistration`. The
675/// kind is **derived from the field shape** of the registration —
676/// `(deps.is_empty(), fn_or_op variant)`:
677///
678/// | deps      | fn_or_op   | is_dynamic | resulting kind |
679/// |-----------|-----------|-----------|----------------|
680/// | empty     | None      | -         | State          |
681/// | empty     | Some(Fn)  | -         | Producer       |
682/// | non-empty | Some(Fn)  | false     | Derived        |
683/// | non-empty | Some(Fn)  | true      | Dynamic        |
684/// | non-empty | Some(Op)  | -         | Operator       |
685///
686/// The sugar wrappers ([`Core::register_state`], [`Core::register_producer`],
687/// etc.) build a `NodeRegistration` and delegate.
688#[derive(Clone, Debug)]
689pub struct NodeRegistration {
690    /// Upstream deps in declaration order. Empty for state / producer.
691    pub deps: Vec<NodeId>,
692    /// Closure-form fn id or typed-op discriminant. `None` for state /
693    /// passthrough.
694    pub fn_or_op: Option<NodeFnOrOp>,
695    /// Cross-kind config knobs.
696    pub opts: NodeOpts,
697}
698
699/// Equality mode for a node's outgoing emissions.
700///
701/// `Identity` is the default: cache vs. new handle compare is a `u64` equal —
702/// zero FFI. `Custom` invokes [`BindingBoundary::custom_equals`] every check
703/// (R1.3.2.b two-arg call when both sides are non-sentinel).
704#[derive(Copy, Clone, Debug)]
705pub enum EqualsMode {
706    Identity,
707    Custom(FnId),
708}
709
/// Crate-internal id of one subscription; a fresh one is allocated for
/// every [`Core::subscribe`] call.
///
/// The public API never exposes it directly — it travels inside
/// [`Subscription`]. Only Core internals and the `Drop` impl of
/// [`Subscription`] read it.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub(crate) struct SubscriptionId(u64);
716
717/// RAII subscription handle.
718///
719/// Returned by [`Core::subscribe`]. While the handle is held, the sink stays
720/// registered against its node. Dropping the handle (explicitly via
721/// `drop(sub)` or implicitly at scope exit) unsubscribes the sink — no manual
722/// `unsubscribe()` call is needed. Per §10.12 of the rust-port session doc.
723///
724/// # Lifetime semantics
725///
726/// The subscription holds a [`Weak`] reference back to the Core's state. If
727/// the Core is dropped before the subscription, the Drop impl is a silent
728/// no-op (the sink has nowhere to deregister from anyway). This avoids a
729/// reference cycle when subscribers capture an `Arc<Core>` in their closure.
730///
731/// # Thread safety
732///
733/// `Send + Sync`. The handle can be moved across threads or dropped from
734/// any thread.
735///
736/// # Not Clone
737///
738/// `Subscription` owns the unsubscribe action exclusively. Cloning would
739/// require either "first drop wins" or "last drop wins" semantics, both
740/// of which surprise. If a binding needs multiple deregistration handles,
741/// it should subscribe multiple times (each producing a fresh handle) or
742/// wrap the single `Subscription` in `Arc<Mutex<Option<Subscription>>>`.
743#[must_use = "dropping a Subscription unsubscribes its sink immediately"]
744pub struct Subscription {
745    state: Weak<Mutex<CoreState>>,
746    node_id: NodeId,
747    sub_id: SubscriptionId,
748}
749
750impl Subscription {
751    /// The node this subscription is attached to.
752    #[must_use]
753    pub fn node_id(&self) -> NodeId {
754        self.node_id
755    }
756}
757
758impl Drop for Subscription {
       /// Removes this subscription's sink from its node and, when the
       /// subscriber map is empty afterwards, runs the deactivation sequence.
       ///
       /// Steps, in order:
       /// 1. Upgrade the weak state reference; return silently if it is gone.
       /// 2. Under the state lock: remove the sink, bump
       ///    `subscribers_revision`, and snapshot the flags that decide which
       ///    hooks run. Returns early if the node record no longer exists.
       /// 3. With the lock released (empty subscriber map only): call
       ///    `cleanup_for(OnDeactivation)`, `producer_deactivate` and
       ///    `wipe_ctx`, each gated by its own flag.
       /// 4. Re-lock; return if a subscriber is present again. Otherwise
       ///    collect the handles held in dep records, pause/replay buffers and
       ///    (for fn/op nodes) the cache, reset per-node wave state, and
       ///    take or swap the operator scratch.
       /// 5. With the lock released again: release every collected handle and
       ///    any operator scratch that was taken for eager release.
759    #[allow(clippy::too_many_lines)] // Phase G is one continuous lifecycle hook chain (user cleanup → producer_deactivate → wipe_ctx → Core cache-clear); splitting it would obscure the ordering invariant.
760    fn drop(&mut self) {
761        // Silent no-op if Core is gone. This keeps Drop infallible (no panics
762        // from a dropped subscription racing a dropped Core) and avoids
763        // surprising users with errors on shutdown.
764        //
765        // Producer deactivation (Slice D, D031): if removing this sub
766        // empties the subscribers map AND the node is a producer, fire
767        // `BindingBoundary::producer_deactivate(node_id)` AFTER releasing
768        // the state lock. The binding then drops its per-node state
769        // (subscriptions to upstream sources, captured closure state),
770        // which transitively unsubs from upstreams via their own
771        // `Subscription::Drop`. Re-entrance into Core from the deactivate
772        // hook is permitted since the lock is released first.
773        let Some(state) = self.state.upgrade() else {
774            return;
775        };
776        // Slice E2 (D056): when the last subscriber drops, fire the
777        // node's OnDeactivation cleanup hook BEFORE producer_deactivate
778        // (cleanup may release handles the producer subscription owns;
779        // reverse order would let producer_deactivate drop subs that user
780        // cleanup expected to be live). Both calls are lock-released per
781        // D045.
782        //
783        // OnDeactivation gating (D068, QA Q3 fix): fires only when the
784        // node has fired its fn at least once AND has a fn (`fn_id`
785        // populated). State nodes have no fn — they cannot register a
786        // cleanup spec via the production fn-return path (R2.4.5), so
787        // firing `cleanup_for` on them is wasted FFI; the binding's
788        // lookup is guaranteed to find no `current_cleanup`. Skipping
789        // here saves the FFI hop and matches the design-doc wording
790        // ("never-fired state nodes" — state-with-initial-value satisfies
791        // `has_fired_once = true` but still has no fn).
792        //
793        // Slice E2 /qa Q2(b) (D069): if the node is a resubscribable
794        // node that's ALREADY terminal (terminate fired BEFORE this last
795        // sub drop), fire `wipe_ctx` lock-released AFTER OnDeactivation
796        // + producer_deactivate. Mutually exclusive with `terminate_node`'s
797        // queue-wipe site: terminate-with-empty-subs goes through
798        // `pending_wipes`; terminate-with-live-subs routes here when
799        // those subs eventually drop. Either path fires exactly one
800        // wipe per terminal lifecycle.
801        let (was_last_sub, is_producer, has_user_cleanup, fire_wipe, binding) = {
802            let mut s = state.lock();
803            let Some(rec) = s.nodes.get_mut(&self.node_id) else {
804                return;
805            };
               // NOTE(review): the return value of `remove` is ignored, so a
               // sub id that is already absent still bumps the revision and
               // yields `last == true` whenever the map is empty — confirm no
               // other path removes entries from `subscribers` while a
               // `Subscription` for them is still alive.
806            rec.subscribers.remove(&self.sub_id);
807            // Slice X4 / D2: bump revision so any pending_notify entry for
808            // this node opened earlier in the wave starts a fresh batch on
809            // the next queue_notify, dropping the now-departed sink from
810            // the snapshot.
811            rec.subscribers_revision = rec.subscribers_revision.wrapping_add(1);
812            let last = rec.subscribers.is_empty();
813            let producer = rec.is_producer();
814            // OnDeactivation gate: must have run a fn at least once
815            // (has_fired_once) AND have a fn registered (fn_id.is_some()).
816            // The fn_id check excludes state nodes whose has_fired_once
817            // tracks initial-value status, not "user fn ran."
818            let user_cleanup = rec.has_fired_once && rec.fn_id.is_some();
819            let fire_wipe = last && rec.resubscribable && rec.terminal.is_some();
820            // Phase G (D119/D120/D121, 2026-05-10): always clone the
821            // binding when last sub leaves so we can run the Core
822            // cache-clear after the existing hooks. The Arc::clone is
823            // cheap and dwarfed by the cost of the hooks themselves.
824            let binding = if last { Some(s.binding.clone()) } else { None };
825            (last, producer, user_cleanup, fire_wipe, binding)
826        };
827        if was_last_sub {
828            if let Some(binding) = binding {
829                if has_user_cleanup {
830                    binding.cleanup_for(self.node_id, CleanupTrigger::OnDeactivation);
831                }
832                if is_producer {
833                    binding.producer_deactivate(self.node_id);
834                }
835                // D069: eager wipe — fires AFTER OnDeactivation so the
836                // user closure observes pre-wipe `store` (matches the
837                // existing "OnDeactivation runs before wipe on terminal
838                // reset" invariant covered by test 10). Idempotent —
839                // `HashMap::remove` on absent key is a no-op, so even
840                // if the wave already drained `pending_wipes` earlier,
841                // this fire is benign.
842                if fire_wipe {
843                    binding.wipe_ctx(self.node_id);
844                }
845
846                // Phase G (D118/D119/D120/D121, 2026-05-10) — Core
847                // cache-clear on deactivation, mirroring TS `_deactivate`
848                // (`pure-ts/src/core/node.ts:2185-2297`):
849                //
850                //   1. user `cleanup_for(OnDeactivation)`  ← above
851                //   2. `producer_deactivate`                ← above
852                //   3. `wipe_ctx` (resubscribable+terminal) ← above
853                //   4. **NEW: Core cache-clear**            ← here
854                //
855                // Releases per-dep `prev_data` + `data_batch` retains +
856                // `dep_terminals` Error retains (the latter closes the
857                // long-standing "Non-resubscribable terminal Error
858                // handles leak via diamond cascade" porting-deferred
859                // entry — D121). Clears pause/replay buffers. Releases
860                // `cached` for compute nodes (R2.2.7 / R2.2.8 ROM:
861                // state nodes preserve cache; compute nodes clear).
862                // Keeps the per-node `terminal` slot intact (D121:
863                // producer-side terminal stays for late-subscriber
864                // R2.2.7.a reset or R2.2.7.b rejection).
865                //
866                // Lock-released release discipline: collect handles
867                // under the lock, drop the lock, fire `release_handle`
868                // outside (mirrors `Core::resume` Phase 2 + the
869                // existing F1 `set_deps` removed-handles release at
870                // node.rs:5003).
871                //
872                // Re-entrance safety (F1 / D123, /qa 2026-05-10): if the
873                // user `cleanup_for` / `producer_deactivate` / `wipe_ctx`
874                // hook re-subscribed via some path, `subscribers` is now
875                // non-empty. **Skip Phase G entirely in that case** —
876                // the new subscriber's handshake delivered the live
877                // `cache` handle to its sink and is holding a refcount
878                // share through `pending_notify`; clearing cache here
879                // would race with the new subscriber's wave and cause
880                // a use-after-release in bindings that reap registry
881                // slots at refcount-zero.
882                //
883                // Why this diverges from TS `_deactivate` (which clears
884                // unconditionally): TS runs the cleanup hook + cache
885                // clear as ONE sync block under the (implicit) JS
886                // single-thread mutex; there's no released-lock window
887                // for a re-subscribe to install a sub before the
888                // cache-clear runs. Rust's lock-released hook discipline
889                // (D045) opens that window, so the re-check is
890                // necessary to preserve refcount soundness.
891                //
892                // Re-acquire state lock atomically with the recheck so
893                // a concurrent thread cannot install a sub between the
894                // emptiness check and the cache mutation.
895                // F8 (/qa 2026-05-10): also take `op_scratch` so its
896                // retains release lock-released after the state-lock
897                // scope. Pre-F8 Phase G only released per-edge handles
898                // + the compute `cache`, leaving operator-internal
899                // retains (Last.latest, Scan.acc, Reduce.acc, etc.)
900                // in-place. For non-resubscribable nodes that never
901                // re-subscribe, this was a permanent leak —
902                // asymmetric with the per-edge cleanup. F8 closes that
903                // by taking the scratch alongside per-edge handles
904                // and calling `release_handles` lock-released.
905                //
906                // D-α (D028 full close, 2026-05-10): for resubscribable
907                // operator nodes, take the OLD scratch AND build a
908                // FRESH scratch via `make_op_scratch` (lock-held, but
909                // `binding.retain_handle` is a leaf operation per
910                // op_state.rs:69-80 docs), install the fresh scratch
911                // on `rec.op_scratch`, and push the old scratch to
912                // `pending_scratch_release` for deferred release. The
913                // queue drains on the next `reset_for_fresh_lifecycle`
914                // (after Phase 2 takes fresh retains — preserves the
915                // C-3 /qa P1 seed-aliasing-acc invariant) or on
916                // `Drop for CoreState`. The fresh install gives
917                // re-activation correct counter / acc state (Take.taken
918                // back to 0, Scan.acc back to seed, etc.) matching TS
919                // Lock 6.D ("resets on every deactivation").
920                let (to_release, scratch_to_release): (
921                    Vec<HandleId>,
922                    Option<Box<dyn crate::op_state::OperatorScratch>>,
923                ) = {
924                    let mut s = state.lock();
925                    if let Some(rec) = s.nodes.get_mut(&self.node_id) {
926                        // F1 re-entrance check.
927                        if !rec.subscribers.is_empty() {
928                            // A user hook re-subscribed during the
929                            // lock-released window; the new lifecycle
930                            // owns this node now. Phase G is a no-op.
931                            return;
932                        }
933                        let mut handles: Vec<HandleId> = Vec::new();
934                        // Per-dep state. Empty for state nodes.
935                        for dr in &mut rec.dep_records {
936                            if dr.prev_data != NO_HANDLE {
937                                handles.push(dr.prev_data);
938                                dr.prev_data = NO_HANDLE;
939                            }
940                            for h in dr.data_batch.drain(..) {
941                                handles.push(h);
942                            }
943                            // D121: per-edge terminal-Error retain
944                            // released here. Closes the cascade leak.
945                            // The producer's own `rec.terminal` slot
946                            // stays intact (preserved below).
947                            if let Some(TerminalKind::Error(h)) = dr.terminal {
948                                handles.push(h);
949                            }
950                            dr.terminal = None;
951                            dr.dirty = false;
952                            dr.involved_this_wave = false;
953                        }
954                        // Pause buffer DATA / replay buffer.
955                        if let PauseState::Paused { ref mut buffer, .. } = rec.pause_state {
956                            for msg in buffer.drain(..) {
957                                if let Some(h) = msg.payload_handle() {
958                                    handles.push(h);
959                                }
960                            }
961                        }
962                        for h in rec.replay_buffer.drain(..) {
963                            handles.push(h);
964                        }
965                        // Pause locks drained → node back to Active.
966                        rec.pause_state = PauseState::Active;
967                        // R2.2.7 / R2.2.8 ROM: state nodes preserve
968                        // cache; compute (fn or op) nodes clear.
969                        // D119: state nodes keep `cached` because the
970                        // value is intrinsic and non-volatile;
971                        // resubscribe sees the same value.
972                        let is_compute = rec.fn_id.is_some() || rec.op.is_some();
973                        if is_compute && rec.cache != NO_HANDLE {
974                            handles.push(rec.cache);
975                            rec.cache = NO_HANDLE;
976                        }
977                        // Reset wave + lifecycle state so reactivation
978                        // begins fresh. `terminal` STAYS (D121).
979                        rec.has_fired_once = false;
980                        rec.dirty = false;
981                        rec.involved_this_wave = false;
982                        // §10.13 perf (D047): reset received_mask — all deps back
983                        // to sentinel on resubscribe.
984                        rec.received_mask = 0;
985                        // §10.3 perf (Slice V1): reset involved_mask.
986                        rec.involved_mask = 0;
987                        if rec.is_dynamic {
988                            rec.tracked.clear();
989                        }
990                        // F8 + D-α: op_scratch handling forks by
991                        // `resubscribable`. Non-resubscribable: eager
992                        // release (F8 path — there's no future reset
993                        // so the leak ends here). Resubscribable +
994                        // has-op: take old AND install fresh; defer
995                        // old release to the queue.
996                        let scratch = if !rec.resubscribable {
997                            std::mem::take(&mut rec.op_scratch)
998                        } else if let Some(op) = rec.op {
999                            // Slice C-3 /qa P1 (retain-before-release):
1000                            // build fresh scratch FIRST (this calls
1001                            // `binding.retain_handle` for any seed /
1002                            // default the op carries), THEN swap. The
1003                            // old scratch's release is deferred to the
1004                            // `pending_scratch_release` queue (drained
1005                            // on next reset_for_fresh_lifecycle or
1006                            // Drop for CoreState).
1007                            //
1008                            // make_op_scratch is fallible only for
1009                            // OperatorSeedSentinel; the OperatorOp
1010                            // stored on NodeRecord passed validation
1011                            // at registration time, so the unwrap
1012                            // here is structurally guaranteed (mirrors
1013                            // reset_for_fresh_lifecycle).
1014                            //
1015                            // Uses the binding-explicit static variant
1016                            // because we have only `&dyn BindingBoundary`
1017                            // here (Subscription::Drop holds no Core).
1018                            let new_scratch = Core::make_op_scratch_with_binding(&*binding, op)
1019                                .expect("invariant: stored OperatorOp passed make_op_scratch validation at registration time");
1020                            let old = std::mem::replace(&mut rec.op_scratch, new_scratch);
1021                            if let Some(old_box) = old {
1022                                s.pending_scratch_release.push(old_box);
1023                            }
1024                            // The "scratch_to_release" for lock-released
1025                            // release stays None here — the resubscribable
1026                            // case routes through the queue.
1027                            None
1028                        } else {
1029                            // Resubscribable but no op (e.g. derived
1030                            // compute / dynamic / state). Nothing to
1031                            // do for op_scratch.
1032                            None
1033                        };
1034                        (handles, scratch)
1035                    } else {
1036                        // Node destroyed between lock-released hooks
1037                        // and this re-acquire (terminate cascade or
1038                        // graph removal) — nothing to clear.
1039                        (Vec::new(), None)
1040                    }
1041                };
1042                // Release handles lock-released. Binding may re-enter
1043                // Core during `release_handle` (final-Drop callbacks
1044                // are user code).
1045                for h in to_release {
1046                    binding.release_handle(h);
1047                }
1048                // F8: release operator-scratch handles lock-released
1049                // (mirrors `ScratchReleaseGuard::drop` ordering).
1050                if let Some(mut scratch) = scratch_to_release {
1051                    scratch.release_handles(&*binding);
1052                }
1053            }
1054        }
1055    }
1056}
1057
1058// Compile-time assertion that Subscription is Send + Sync. If a future field
1059// breaks this, the build fails here rather than downstream at the binding
1060// site.
1061const _: fn() = || {
1062    fn assert_send_sync<T: Send + Sync>() {}
1063    assert_send_sync::<Subscription>();
1064};
1065
1066/// A subscriber callback. `Send + Sync` so the Core can fire it from any
1067/// thread; `Fn` (not `FnMut`) so multiple references coexist — capture
1068/// mutable state in `Mutex<T>` or atomics on the binding side.
1069pub type Sink = Arc<dyn Fn(&[Message]) + Send + Sync>;
1070
1071// ---------------------------------------------------------------------------
1072// PAUSE/RESUME state — §10.2 of the rust-port session doc
1073// ---------------------------------------------------------------------------
1074
1075/// Per-node pause state.
1076///
1077/// Replaces the four TS fields (`_pauseLocks`, `_pauseBuffer`,
1078/// `_pauseDroppedCount`, `_pauseStartNs`) with a single enum where
1079/// the buffered fields are unreachable in the [`Self::Active`] variant —
1080/// the compiler refuses access. Per §10.2 simplification.
1081///
1082/// # Invariants
1083///
1084/// - `Active` ⇔ no lockId held.
1085/// - `Paused { locks, .. }` ⇔ `!locks.is_empty()`.
1086/// - Buffered messages are tier 3 (DATA/RESOLVED) and tier 4 (INVALIDATE)
1087///   only. Other tiers pass through immediately even while paused.
1088/// - `dropped` counts messages that fell out the front of `buffer` due to
1089///   the Core-global `pause_buffer_cap`; it is reported on resume so callers
1090///   can detect overflow without re-tracking it externally.
1091#[derive(Debug)]
1092pub(crate) enum PauseState {
1093    Active,
1094    Paused {
1095        /// Active lock holders. `SmallVec` keeps the common 1–2 lock case
1096        /// stack-allocated. Replaces `Set<unknown>` from TS.
1097        locks: SmallVec<[LockId; 2]>,
1098        /// Buffered tier-3/tier-4 outgoing messages, in arrival order.
1099        /// Replayed on the final RESUME.
1100        buffer: VecDeque<Message>,
1101        /// Count of messages dropped from the front when `buffer.len()` would
1102        /// exceed `pause_buffer_cap`. Cleared on final RESUME (next pause
1103        /// cycle starts fresh).
1104        dropped: u32,
1105        /// Wall-clock-monotonic ns when the lock first transitioned this node
1106        /// from `Active` to `Paused`. Used by R1.3.8.c overflow ERROR
1107        /// synthesis to compute `lock_held_duration_ms` in the diagnostic
1108        /// payload (Slice F, A3 — 2026-05-07).
1109        started_at_ns: u64,
1110        /// True after the first overflow event in this pause cycle has been
1111        /// reported via [`crate::boundary::BindingBoundary::synthesize_pause_overflow_error`].
1112        /// Subsequent overflows in the same cycle don't re-emit ERROR
1113        /// (canonical R1.3.8.c: "once per overflow event"). Cleared on
1114        /// final RESUME (next pause cycle starts fresh).
1115        overflow_reported: bool,
1116        /// Default-mode bookkeeping (Slice F audit close, 2026-05-07).
1117        /// Set to `true` when an upstream dep delivery arrives while this
1118        /// node is paused with [`PausableMode::Default`]. On final RESUME,
1119        /// if `true`, the node is added back to `pending_fires` so the fn
1120        /// fires once with the consolidated dep state. Always `false` for
1121        /// `ResumeAll` mode (the buffered messages are the consolidation
1122        /// mechanism there). Cleared on final RESUME.
1123        pending_wave: bool,
1124    },
1125}
1126
1127impl PauseState {
1128    pub(crate) fn is_paused(&self) -> bool {
1129        matches!(self, Self::Paused { .. })
1130    }
1131
1132    fn lock_count(&self) -> usize {
1133        match self {
1134            Self::Active => 0,
1135            Self::Paused { locks, .. } => locks.len(),
1136        }
1137    }
1138
1139    fn contains_lock(&self, lock_id: LockId) -> bool {
1140        match self {
1141            Self::Active => false,
1142            Self::Paused { locks, .. } => locks.contains(&lock_id),
1143        }
1144    }
1145
1146    /// Add a lock; transitions Active → Paused on first lock. Idempotent on
1147    /// duplicate lock_id (matches TS convention; spec is silent on the case).
1148    fn add_lock(&mut self, lock_id: LockId) {
1149        match self {
1150            Self::Active => {
1151                let mut locks = SmallVec::new();
1152                locks.push(lock_id);
1153                *self = Self::Paused {
1154                    locks,
1155                    buffer: VecDeque::new(),
1156                    dropped: 0,
1157                    started_at_ns: monotonic_ns(),
1158                    overflow_reported: false,
1159                    pending_wave: false,
1160                };
1161            }
1162            Self::Paused { locks, .. } => {
1163                if !locks.contains(&lock_id) {
1164                    locks.push(lock_id);
1165                }
1166            }
1167        }
1168    }
1169
1170    /// Mark that an upstream dep delivered DATA to a node paused with
1171    /// [`PausableMode::Default`]. The node will re-enter `pending_fires`
1172    /// on final RESUME via [`Self::take_pending_wave`].
1173    pub(crate) fn mark_pending_wave(&mut self) {
1174        if let Self::Paused { pending_wave, .. } = self {
1175            *pending_wave = true;
1176        }
1177    }
1178
1179    /// Read and clear the `pending_wave` flag. Called from
1180    /// [`Core::resume`] when transitioning Paused → Active. Returns `true`
1181    /// only if the node was paused with `pending_wave` set.
1182    pub(crate) fn take_pending_wave(&mut self) -> bool {
1183        if let Self::Paused { pending_wave, .. } = self {
1184            std::mem::replace(pending_wave, false)
1185        } else {
1186            false
1187        }
1188    }
1189
1190    /// Remove a lock; if the lockset becomes empty, transition Paused →
1191    /// Active and return the buffered messages for replay (along with the
1192    /// dropped count for diagnostics). Unknown lock_id is an idempotent
1193    /// no-op (matches TS, R1.2.6 implicit).
1194    fn remove_lock(&mut self, lock_id: LockId) -> Option<(VecDeque<Message>, u32)> {
1195        match self {
1196            Self::Active => None,
1197            Self::Paused { locks, .. } => {
1198                if let Some(idx) = locks.iter().position(|l| *l == lock_id) {
1199                    locks.swap_remove(idx);
1200                }
1201                if locks.is_empty() {
1202                    let prev = std::mem::replace(self, Self::Active);
1203                    if let Self::Paused {
1204                        buffer, dropped, ..
1205                    } = prev
1206                    {
1207                        return Some((buffer, dropped));
1208                    }
1209                }
1210                None
1211            }
1212        }
1213    }
1214
1215    /// Append a message to the buffer; if the buffer would exceed `cap`,
1216    /// pop from the front (oldest-first), increment `dropped`, and return
1217    /// the dropped messages so the caller can release any payload handles
1218    /// they reference. `cap` of `None` means unbounded.
1219    ///
1220    /// Returns [`PushBufferedResult`] carrying both the dropped messages
1221    /// (for refcount release) and whether this push triggered the FIRST
1222    /// overflow event in the current pause cycle (for R1.3.8.c ERROR
1223    /// synthesis — the caller schedules a single ERROR per cycle).
1224    ///
1225    /// Note: refcount management for the message's payload handle is the
1226    /// caller's responsibility — see [`Core::queue_notify`] for the
1227    /// retain/release discipline. The buffer itself is just a message
1228    /// container; refcounts cross the binding boundary.
1229    pub(crate) fn push_buffered(&mut self, msg: Message, cap: Option<usize>) -> PushBufferedResult {
1230        let mut result = PushBufferedResult::default();
1231        if let Self::Paused {
1232            buffer,
1233            dropped,
1234            overflow_reported,
1235            ..
1236        } = self
1237        {
1238            buffer.push_back(msg);
1239            if let Some(c) = cap {
1240                while buffer.len() > c {
1241                    if let Some(dropped_msg) = buffer.pop_front() {
1242                        result.dropped_msgs.push(dropped_msg);
1243                    }
1244                    *dropped = dropped.saturating_add(1);
1245                }
1246            }
1247            // R1.3.8.c (Slice F, A3): flag first overflow this cycle.
1248            if !result.dropped_msgs.is_empty() && !*overflow_reported {
1249                *overflow_reported = true;
1250                result.first_overflow_this_cycle = true;
1251            }
1252        }
1253        result
1254    }
1255
1256    /// Snapshot the diagnostic for an R1.3.8.c overflow ERROR synthesis.
1257    /// Returns `(dropped_count, lock_held_ns)`. Caller must already know
1258    /// the configured cap (it's a Core-global value, not per-PauseState).
1259    pub(crate) fn overflow_diagnostic(&self) -> Option<(u32, u64)> {
1260        match self {
1261            Self::Active => None,
1262            Self::Paused {
1263                dropped,
1264                started_at_ns,
1265                ..
1266            } => {
1267                let lock_held_ns = monotonic_ns().saturating_sub(*started_at_ns);
1268                Some((*dropped, lock_held_ns))
1269            }
1270        }
1271    }
1272}
1273
/// Return shape for [`PauseState::push_buffered`]. Carries both the dropped
/// messages (for refcount release) and an "is this the first overflow this
/// cycle" flag (for R1.3.8.c ERROR synthesis scheduling).
///
/// The derived `Default` is the "nothing dropped" result: an empty
/// `dropped_msgs` and `first_overflow_this_cycle == false`.
#[derive(Default)]
pub(crate) struct PushBufferedResult {
    /// Messages evicted from the front of the pause buffer by this push,
    /// oldest first. Empty when the push stayed within the cap.
    pub(crate) dropped_msgs: Vec<Message>,
    /// `true` only when this push dropped at least one message and no
    /// earlier push had already reported an overflow for the same pause
    /// state.
    pub(crate) first_overflow_this_cycle: bool,
}
1282
/// Pending R1.3.8.c overflow ERROR synthesis entry. Recorded by
/// [`Core::queue_notify`] when the pause buffer first overflows in a cycle;
/// drained at wave-end after the lock-released call to
/// [`crate::boundary::BindingBoundary::synthesize_pause_overflow_error`].
///
/// `configured_max` is captured at scheduling time rather than read at
/// drain — the user could change `pause_buffer_cap` between schedule and
/// drain, and the diagnostic reads "the cap that was in effect when the
/// overflow happened."
#[derive(Debug, Clone)]
pub(crate) struct PendingPauseOverflow {
    /// Node the overflow entry belongs to.
    pub(crate) node_id: NodeId,
    /// Number of messages dropped so far. NOTE(review): presumably the
    /// first element of `PauseState::overflow_diagnostic` — confirm at the
    /// recording site.
    pub(crate) dropped_count: u32,
    /// Pause buffer cap in effect when the overflow was scheduled (see the
    /// struct-level note on why it is captured eagerly).
    pub(crate) configured_max: usize,
    /// How long the pause had been held, in nanoseconds. NOTE(review):
    /// presumably the second element of `PauseState::overflow_diagnostic`
    /// — confirm at the recording site.
    pub(crate) lock_held_ns: u64,
}
1299
/// Error returned when a same-thread partition acquire violates
/// ascending order. Phase H+ STRICT variant (D115).
///
/// The ascending-order protocol prevents AB/BA deadlocks between
/// threads. When this error surfaces, the caller should defer the
/// operation to wave-end (when no partitions are held) and retry.
///
/// Converts into [`SubscribeError::PartitionOrderViolation`] through that
/// variant's `#[from]` attribute, so `?` works in subscribe paths.
#[derive(Error, Debug, Clone, PartialEq, Eq)]
#[error(
    "Phase H+ ascending-order violation: attempted partition {attempted:?} \
     while already holding partition {max_held:?} — defer to wave-end"
)]
pub struct PartitionOrderViolation {
    /// The partition the caller tried to acquire.
    pub attempted: crate::subgraph::SubgraphId,
    /// The highest-id partition currently held by this thread.
    pub max_held: crate::subgraph::SubgraphId,
}
1317
/// Errors returnable by [`Core::try_subscribe`].
///
/// `Core::subscribe` (the panic-on-error variant) panics on either
/// case; `try_subscribe` returns these so operators (zip / concat /
/// race / take_until / merge / switch_map / etc.) can match on the
/// variant — defer for [`Self::PartitionOrderViolation`], skip the
/// source for [`Self::TornDown`].
///
/// Per canonical spec R2.2.7.a / R2.2.7.b (D118, 2026-05-10).
#[derive(Error, Debug, Clone, PartialEq, Eq)]
pub enum SubscribeError {
    /// Phase H+ STRICT (D115): partition acquisition would violate the
    /// ascending-order protocol. Caller should defer the subscribe to
    /// wave-end via the producer-pattern deferred-op queue.
    ///
    /// `#[error(transparent)]` forwards `Display` and `source()` to the
    /// wrapped [`PartitionOrderViolation`]; `#[from]` provides the `From`
    /// conversion.
    #[error(transparent)]
    PartitionOrderViolation(#[from] PartitionOrderViolation),

    // NOTE(review): the module-level header states that TEARDOWN is
    // permanent even for resubscribable nodes (F3 audit guard), while the
    // doc below says resubscribable nodes reset "regardless of TEARDOWN
    // state". Confirm which one reflects current behavior and align them.
    /// R2.2.7.b (D118, 2026-05-10): the node is non-resubscribable AND
    /// has terminated (`[COMPLETE]` or `[ERROR, h]` was delivered).
    /// The stream is permanently over; subscribe is rejected.
    /// Resubscribable terminal nodes do NOT surface this error — they
    /// reset to a fresh lifecycle on subscribe per R2.2.7.a, regardless
    /// of TEARDOWN state.
    #[error(
        "subscribe({node:?}): node is non-resubscribable and has terminated; \
         the stream is permanently over (R2.2.7.b)"
    )]
    TornDown {
        /// The non-resubscribable terminal node that rejected the subscribe.
        node: NodeId,
    },
}
1350
/// A producer-pattern operation deferred because it would have
/// violated the ascending partition-order protocol (Phase H+ STRICT,
/// D115). Drained by `BatchGuard::drop` after wave_guards are
/// released (no partitions held → safe to acquire any partition).
///
/// Variants with `HandleId` fields hold a binding-side retain taken
/// at defer time. The drain path releases this retain after the
/// operation fires; the panic-discard path releases it without firing.
///
/// The enum carries no derives: the `Callback` variant owns a boxed
/// `FnOnce`, which cannot be cloned or compared.
pub enum DeferredProducerOp {
    /// Deferred `Core::emit`. Retain held on `handle`.
    Emit { node_id: NodeId, handle: HandleId },
    /// Deferred `Core::complete`. No handle.
    Complete { node_id: NodeId },
    /// Deferred `Core::error`. Retain held on `handle`.
    Error { node_id: NodeId, handle: HandleId },
    /// Generic deferred callback (e.g., deferred subscribe from
    /// producer build closure). The closure captures everything it
    /// needs; `graphrefly-core` doesn't depend on operator-specific
    /// types. The closure is responsible for its own retain discipline.
    /// The `Send` bound lets the boxed closure move across threads.
    Callback(Box<dyn FnOnce() + Send>),
}
1372
1373/// Errors returnable by [`Core::pause`] and [`Core::resume`].
1374#[derive(Error, Debug, Clone, PartialEq)]
1375pub enum PauseError {
1376    #[error("pause/resume: unknown node {0:?}")]
1377    UnknownNode(NodeId),
1378}
1379
1380/// Errors returnable by [`Core::up`] (canonical R1.4.1).
1381#[derive(Error, Debug, Clone, PartialEq)]
1382pub enum UpError {
1383    /// Node id is not registered.
1384    #[error("up: unknown node {0:?}")]
1385    UnknownNode(NodeId),
1386    /// Tier-3 (DATA / RESOLVED) and tier-5 (COMPLETE / ERROR) are
1387    /// downstream-only per R1.4.1; rejected at the boundary.
1388    #[error(
1389        "up: tier {tier} is forbidden upstream — value (tier 3) and \
1390         terminal-lifecycle (tier 5) planes are downstream-only per R1.4.1"
1391    )]
1392    TierForbidden { tier: u8 },
1393}
1394
/// Errors returnable by [`Core::register`] and its sugar wrappers
/// ([`Core::register_state`], [`Core::register_producer`],
/// [`Core::register_derived`], [`Core::register_dynamic`],
/// [`Core::register_operator`]).
///
/// Slice H (2026-05-07) promoted these from `assert!`/`panic!` to typed
/// errors so that callers can recover from contract violations without
/// process abort. Every variant corresponds to a construction-time
/// invariant that the caller is responsible for upholding; the dispatcher
/// rejects the registration before any reactive state is created (so
/// there is no `Message::Error` channel through which to surface the
/// failure — these are imperative-layer errors, not reactive ones).
///
/// All variants are zero-side-effect: when [`Core::register`] returns
/// `Err`, no node has been added to the graph and any handle retains
/// taken on the way in (e.g. operator scratch seed retains via
/// [`BindingBoundary::retain_handle`]) have been released.
#[derive(Error, Debug, Clone, PartialEq, Eq)]
pub enum RegisterError {
    /// One of the supplied dep ids is not a registered node. The payload
    /// is the offending dep id.
    #[error("register: unknown dep {0:?}")]
    UnknownDep(NodeId),

    /// `op` was supplied (operator node) but `deps` was empty. Operator
    /// nodes need at least one dep — for subscription-managed combinators
    /// with no declared deps, use [`Core::register_producer`] instead.
    #[error(
        "register: operator nodes require at least one dep — \
         use register_producer for subscription-managed combinators"
    )]
    OperatorWithoutDeps,

    /// [`NodeOpts::initial`] was set to a real handle but the registration
    /// shape is not a state node (state nodes are `deps.is_empty() &&
    /// fn_id.is_none() && op.is_none()`). Initial cache only makes sense
    /// for state nodes.
    #[error("register: NodeOpts::initial only valid for state nodes (no deps + no fn + no op)")]
    InitialOnlyForStateNodes,

    /// A supplied dep is terminal (COMPLETE / ERROR) AND not
    /// resubscribable. Adding it would create a permanent wedge — the dep
    /// will never re-emit, so the registered node would be stuck.
    /// Mirrors [`SetDepsError::TerminalDep`] at registration time. The
    /// payload is the terminal dep's id.
    #[error(
        "register: dep {0:?} is terminal and not resubscribable; \
         mark it resubscribable before terminating, or remove it from the dep list"
    )]
    TerminalDep(NodeId),

    /// A stateful operator ([`OperatorOp::Scan`] / [`OperatorOp::Reduce`])
    /// was registered with `seed = NO_HANDLE`. R2.5.3 first-run gate
    /// requires the seed to be a real handle so that the operator can
    /// emit on its first fire.
    #[error("register: operator seed must be a real handle (R2.5.3); got NO_HANDLE")]
    OperatorSeedSentinel,
}
1451
/// Errors returnable by [`Core::set_pausable_mode`].
///
/// Slice H (2026-05-07) promoted these from `assert!`/`panic!` to typed
/// errors. Same imperative-layer error model as [`RegisterError`].
#[derive(Error, Debug, Clone, PartialEq, Eq)]
pub enum SetPausableModeError {
    /// `node_id` is not a registered node. The payload is the id that
    /// failed lookup.
    #[error("set_pausable_mode: unknown node {0:?}")]
    UnknownNode(NodeId),
    /// The node currently holds at least one pause lock. Changing pausable
    /// mode mid-pause would lose buffered content or strand a
    /// `pending_wave` flag — resume all locks first.
    #[error(
        "set_pausable_mode: cannot change pausable mode while paused; \
         resume all locks first"
    )]
    WhilePaused,
}
1470
/// Per-dep record. Replaces the parallel `deps` / `dep_handles` /
/// `dep_terminals` vectors from v1. Canonical spec R2.9.b alignment.
///
/// Each entry tracks one dep's lifecycle state, wave-scoped batch data,
/// and cross-wave `prev_data` for `ctx.prevData` access.
///
/// `DepRecord::new` starts every record in the "never emitted" shape:
/// `prev_data == NO_HANDLE`, both flags `false`, an empty `data_batch`,
/// and `terminal == None`.
pub(crate) struct DepRecord {
    /// The dep node this record tracks.
    pub(crate) node: NodeId,
    /// Last DATA handle from the end of the previous wave. [`NO_HANDLE`]
    /// means the dep has never emitted DATA.
    pub(crate) prev_data: HandleId,
    /// Per-dep dirty flag — awaiting DATA/RESOLVED for current wave.
    pub(crate) dirty: bool,
    /// Per-dep involved-this-wave flag. Distinguishes:
    /// - `involved && data_batch.is_empty()` → dep settled RESOLVED
    /// - `!involved && data_batch.is_empty()` → dep was not in this wave
    pub(crate) involved_this_wave: bool,
    /// DATA handles accumulated this wave. Outside `batch()` scope, at most
    /// 1 element. Inside `batch()`, K emits on the source produce K entries
    /// per R1.3.6.b coalescing. Each handle holds a `retain_handle` share
    /// taken at `deliver_data_to_consumer` time; released at wave-end
    /// rotation in `clear_wave_state`.
    ///
    /// The inline capacity of 1 matches the common non-batch case above.
    pub(crate) data_batch: SmallVec<[HandleId; 1]>,
    /// Terminal state for this dep. `None` = dep is live.
    /// `Some` = dep emitted COMPLETE/ERROR. When ALL entries are Some,
    /// the node auto-cascades per Lock 2.B (ERROR dominates COMPLETE).
    pub(crate) terminal: Option<TerminalKind>,
}
1499
1500impl DepRecord {
1501    fn new(node: NodeId) -> Self {
1502        Self {
1503            node,
1504            prev_data: NO_HANDLE,
1505            dirty: false,
1506            involved_this_wave: false,
1507            data_batch: SmallVec::new(),
1508            terminal: None,
1509        }
1510    }
1511}
1512
/// Internal node record. Mirrors `core.ts:132–154` post-D030 unification.
///
/// **Kind is derived, not stored** (D030, Slice D). `(dep_records.is_empty(),
/// fn_id, op, is_dynamic)` uniquely identifies the kind — see [`NodeKind`].
/// Helper methods (`is_state()`, `is_producer()`, `is_compute()`,
/// `is_operator()`, `skips_auto_cascade()`, `kind()`) cover the common
/// predicates without unpacking via [`Core::kind_of`].
///
/// The seven bool fields (`is_dynamic`, `has_fired_once`, `dirty`,
/// `involved_this_wave`, `has_received_teardown`, `resubscribable`,
/// `partial`) each represent an orthogonal concern. `is_dynamic` is
/// constant per node (set at register time); the remaining flags are
/// lifecycle state, wave-scoped state, or per-node options. Collapsing
/// them into a bitfield would obscure intent, hence the
/// `struct_excessive_bools` allow below.
#[allow(clippy::struct_excessive_bools)]
pub(crate) struct NodeRecord {
    /// Per-dep records. Replaces the old parallel `deps` / `dep_handles` /
    /// `dep_terminals` vecs. Dep NodeIds derived via `dep_ids()`.
    pub(crate) dep_records: Vec<DepRecord>,
    /// User-fn id for closure-form dispatch. `Some` for Derived / Dynamic /
    /// Producer; `None` for State / Operator. (Operator dispatch goes via
    /// [`Self::op`] instead.)
    pub(crate) fn_id: Option<FnId>,
    /// Operator discriminant for typed-op dispatch. `Some` for Operator
    /// nodes; `None` otherwise. Mutually exclusive with `fn_id` (a node is
    /// either closure-form OR typed-op, never both).
    pub(crate) op: Option<OperatorOp>,
    /// True for Dynamic nodes (R2.5.3 — fn declares actually-tracked dep
    /// indices per fire). False for everything else. Only meaningful when
    /// `fn_id.is_some()` AND `!dep_records.is_empty()`.
    pub(crate) is_dynamic: bool,
    /// Equality mode for this node; see [`EqualsMode`].
    /// NOTE(review): presumably drives R1.3.2 equals-substitution —
    /// confirm at the use sites.
    pub(crate) equals: EqualsMode,

    // Mutable state
    /// Cached handle for this node.
    /// NOTE(review): presumably the most recent value handle, with
    /// `NO_HANDLE` as the empty sentinel — confirm.
    pub(crate) cache: HandleId,
    /// Lifecycle flag; one of the fields cleared when a resubscribable
    /// terminal node is reset (see [`Self::resubscribable`]).
    pub(crate) has_fired_once: bool,
    /// Registered sinks, keyed by subscription id.
    pub(crate) subscribers: HashMap<SubscriptionId, Sink>,
    /// Monotonic counter bumped on every mutation of [`Self::subscribers`]
    /// (insert on subscribe, remove on `Subscription::Drop`, remove on
    /// handshake-panic cleanup). Used by
    /// [`crate::batch::Core::queue_notify`] to detect mid-wave subscriber-
    /// set changes and start a fresh `PendingBatch` with an updated sink
    /// snapshot — closes D2 (Slice X4, 2026-05-08): the late-subscriber
    /// and multi-emit-per-wave gap where the pre-fix per-node single
    /// snapshot meant a sub installed between two emits to the same node
    /// in one wave was invisible to the second emit's flush.
    ///
    /// Per-node (not per-Core) so that a subscribe to node A doesn't
    /// invalidate snapshot reuse for node B's pending batch in the same
    /// wave.
    pub(crate) subscribers_revision: u64,
    /// For dynamic nodes: which dep indices fn actually tracks.
    /// For static derived: all indices, populated at construction.
    pub(crate) tracked: HashSet<usize>,

    // Wave-scoped state — cleared at wave end.
    pub(crate) dirty: bool,
    pub(crate) involved_this_wave: bool,

    /// Per-node pause state. Default `Active`. See [`PauseState`].
    pub(crate) pause_state: PauseState,
    /// Pause behavior mode (canonical-spec §2.6). Set at registration via
    /// [`NodeOpts::pausable`]. Default [`PausableMode::Default`] suppresses
    /// fn-fire while paused and consolidates N pause-window dep deliveries
    /// into one fn-fire on RESUME; `ResumeAll` buffers tier-3/4 outgoing
    /// for verbatim replay; `Off` ignores PAUSE entirely. See
    /// [`PausableMode`].
    pub(crate) pausable: PausableMode,
    /// Replay buffer cap (R2.6.5 / Lock 6.G — Slice E1, 2026-05-07).
    /// `None` disables; `Some(N)` keeps a circular VecDeque of the last N
    /// DATA-handle emissions for late-subscriber replay. Each handle in
    /// the buffer owns one binding-side retain share, released on evict
    /// (cap exceeded) or in `Drop for CoreState`.
    pub(crate) replay_buffer_cap: Option<usize>,
    /// Backing store for the late-subscriber replay buffer described on
    /// [`Self::replay_buffer_cap`].
    pub(crate) replay_buffer: VecDeque<HandleId>,

    /// Terminal lifecycle state for THIS node's outgoing stream. Once set,
    /// further `emit` calls are silent no-ops, fn no longer fires, and only
    /// the terminal message has been queued downstream.
    pub(crate) terminal: Option<TerminalKind>,
    /// True after the first TEARDOWN has been processed for this node
    /// (R2.6.4 / Lock 6.F). Subsequent TEARDOWN deliveries are idempotent
    /// — the auto-prepended COMPLETE only fires on the first one. Without
    /// this flag, a redundant TEARDOWN delivered via the cascade plus an
    /// explicit `core.teardown(node)` would re-emit `[COMPLETE, TEARDOWN]`
    /// to subscribers per delivery, which is incorrect.
    pub(crate) has_received_teardown: bool,
    /// Per R2.2.7 / R2.5.3 — resubscribable terminal lifecycle.
    /// When `true` AND `terminal == Some(...)`, a fresh subscribe call
    /// will reset the node: clear `terminal`, `has_fired_once`,
    /// `has_received_teardown`, all dep_records to sentinel, and drain the
    /// pause lockset. Default `false`.
    pub(crate) resubscribable: bool,
    /// Meta companion nodes attached to this node per R1.3.9.d. When this
    /// node tears down, its meta companions are torn down FIRST (before
    /// the main node's auto-COMPLETE + TEARDOWN wire emission), so
    /// observers see companions terminate before the parent. The ordering
    /// is load-bearing — meta nodes typically subscribe to parent state
    /// that becomes inconsistent during the parent's destruction phase.
    pub(crate) meta_companions: Vec<NodeId>,
    /// R5.4 / D011 partial-mode: when `true`, fire_fn skips the R2.5.3
    /// first-run gate — the node fires as soon as ANY dep delivers a
    /// real handle, even if other deps remain sentinel. Defaults to
    /// `false` (gated). Lifted into Core for operator support; for
    /// State/Derived/Dynamic nodes the field is settable but the gated
    /// path remains the typical caller default.
    pub(crate) partial: bool,
    /// Topological rank — 1 + max dep rank. Nodes with no deps have rank 0.
    /// Used by `pick_next_fire` for O(|pending_fires|) scheduling instead
    /// of O(V) transitive BFS. Computed at registration; recomputed on
    /// `set_deps` for the modified node (consumer propagation deferred —
    /// see `porting-deferred.md`). §10 perf optimization (D047, Slice U).
    pub(crate) topo_rank: u32,
    /// Bitmask for first-run gate — bit i set when dep i delivers first
    /// DATA. `has_sentinel_deps()` becomes a single integer compare for
    /// ≤64 deps. Falls back to `iter().any()` when `dep_count > 64`.
    /// Reset on resubscribe. §10.13 perf optimization (D047, Slice U).
    pub(crate) received_mask: u64,
    /// §10.3 diamond resolution bitmask — bit i set when dep i is
    /// involved in the current wave (DATA delivered). Mirrors
    /// `received_mask` pattern. Cleared to 0 at wave end instead of
    /// iterating all deps. For ≤64 deps, `DepBatch::involved` can be
    /// derived via `(involved_mask >> dep_idx) & 1 != 0`. For >64 deps,
    /// falls back to per-dep `DepRecord::involved_this_wave` field.
    /// §10.3 perf optimization (Slice V1).
    pub(crate) involved_mask: u64,
    /// Generic per-operator scratch slot (Slice C-3, D026). Replaces
    /// the typed `operator_state: HandleId` field used by Slices C-1 / C-2.
    /// `None` for non-operator kinds and operators with no cross-wave
    /// state (Map / Filter / Combine / WithLatestFrom / Merge); `Some`
    /// for stateful operators ([`OperatorOp::Scan`] / [`Reduce`] /
    /// [`DistinctUntilChanged`] / [`Pairwise`] / [`Take`] / [`Skip`] /
    /// [`TakeWhile`] / [`Last`]).
    ///
    /// The boxed value implements
    /// [`OperatorScratch`](crate::op_state::OperatorScratch); its
    /// `release_handles` method is called from
    /// [`reset_for_fresh_lifecycle`] (resubscribable terminal cycle) and
    /// from [`Drop for CoreState`].
    ///
    /// **Refcount discipline:** the state struct owns whatever handle
    /// shares it stores (e.g., [`ScanState::acc`](crate::op_state::ScanState::acc),
    /// [`LastState::latest`](crate::op_state::LastState::latest)).
    /// Per-fire helpers retain the new value before releasing the old;
    /// `release_handles` releases the current shares at end-of-life.
    pub(crate) op_scratch: Option<Box<dyn crate::op_state::OperatorScratch>>,
}
1659
1660impl NodeRecord {
1661    // ---- Kind predicates (D030 — derived from field shape) ----
1662
1663    /// True iff this is a state node (no deps, no fn, no op).
1664    pub(crate) fn is_state(&self) -> bool {
1665        self.dep_records.is_empty() && self.fn_id.is_none() && self.op.is_none()
1666    }
1667
1668    /// True iff this is a producer node (no deps + has fn + no op).
1669    /// Producers fire fn once on first subscribe; cleanup fires via
1670    /// [`BindingBoundary::producer_deactivate`] (D031, Slice D).
1671    pub(crate) fn is_producer(&self) -> bool {
1672        self.dep_records.is_empty() && self.fn_id.is_some() && self.op.is_none()
1673    }
1674
1675    /// True iff this is a compute node (Derived / Dynamic / Operator) —
1676    /// has at least one dep AND either a fn or an op.
1677    #[allow(dead_code)] // Convenience predicate; callers may use is_state/is_producer instead.
1678    pub(crate) fn is_compute(&self) -> bool {
1679        !self.dep_records.is_empty() && (self.fn_id.is_some() || self.op.is_some())
1680    }
1681
1682    /// True iff this is an Operator node (has op set).
1683    #[allow(dead_code)] // Direct `op.is_some()` is more common; this is a readability sugar.
1684    pub(crate) fn is_operator(&self) -> bool {
1685        self.op.is_some()
1686    }
1687
1688    /// True iff this node opts OUT of Lock 2.B auto-cascade —
1689    /// Operator(Reduce) / Operator(Last) intercept upstream COMPLETE.
1690    pub(crate) fn skips_auto_cascade(&self) -> bool {
1691        match self.op {
1692            Some(op) => NodeKind::Operator(op).skips_auto_cascade(),
1693            None => false,
1694        }
1695    }
1696
1697    /// Compute the public-API [`NodeKind`] from the field shape (D030).
1698    /// Used by [`Core::kind_of`] and rare internal sites that need the
1699    /// enum (most use the predicate methods above).
1700    pub(crate) fn kind(&self) -> NodeKind {
1701        if let Some(op) = self.op {
1702            NodeKind::Operator(op)
1703        } else if self.dep_records.is_empty() {
1704            if self.fn_id.is_some() {
1705                NodeKind::Producer
1706            } else {
1707                NodeKind::State
1708            }
1709        } else if self.is_dynamic {
1710            NodeKind::Dynamic
1711        } else {
1712            NodeKind::Derived
1713        }
1714    }
1715
1716    // ---- Existing accessors ----
1717
1718    /// Iterator over dep NodeIds in declaration order.
1719    pub(crate) fn dep_ids(&self) -> impl Iterator<Item = NodeId> + '_ {
1720        self.dep_records.iter().map(|r| r.node)
1721    }
1722
1723    /// Collected dep NodeIds — for call sites that need a `Vec<NodeId>`.
1724    pub(crate) fn dep_ids_vec(&self) -> Vec<NodeId> {
1725        self.dep_ids().collect()
1726    }
1727
1728    /// True if any dep is in sentinel state (never emitted DATA and no
1729    /// data this wave). Replaces the old `dep_handles.contains(&NO_HANDLE)`.
1730    pub(crate) fn has_sentinel_deps(&self) -> bool {
1731        let n = self.dep_records.len();
1732        if n == 0 {
1733            return false;
1734        }
1735        if n <= 64 {
1736            // O(1): check if all bits [0..n) are set.
1737            let full_mask = if n == 64 { u64::MAX } else { (1u64 << n) - 1 };
1738            self.received_mask != full_mask
1739        } else {
1740            // Fallback for >64 deps (extremely rare).
1741            self.dep_records
1742                .iter()
1743                .any(|r| r.prev_data == NO_HANDLE && r.data_batch.is_empty())
1744        }
1745    }
1746
1747    /// Find the index of a dep by NodeId.
1748    pub(crate) fn dep_index_of(&self, dep_id: NodeId) -> Option<usize> {
1749        self.dep_records.iter().position(|r| r.node == dep_id)
1750    }
1751
1752    /// True if ALL dep terminal slots are populated (Lock 2.B cascade check).
1753    pub(crate) fn all_deps_terminal(&self) -> bool {
1754        !self.dep_records.is_empty() && self.dep_records.iter().all(|r| r.terminal.is_some())
1755    }
1756}
1757
1758// Q-beyond Sub-slice 1 (D108, 2026-05-09): `CrossPartitionState` removed.
1759//
1760// The four wave-scoped fields previously held under
1761// `Core::cross_partition: Arc<parking_lot::Mutex<CrossPartitionState>>`
1762// (Q2, 2026-05-09) moved to a per-thread `WaveState` thread_local in
1763// [`crate::batch`]. The bench-driven rationale is documented at
1764// [`crate::batch::WaveState`]: cross-thread cache-line bouncing on the
1765// `cross_partition` mutex was the dominant cost in Phase J's regression,
1766// not single-thread mutex hop count. Per-thread placement eliminates the
1767// bounce point entirely.
1768//
1769// **Refcount discipline preserved.** `wave_cache_snapshots` and
1770// `deferred_handle_releases` still hold binding-side handle retains;
1771// outermost `BatchGuard::drop` drains them via `Core::binding` on both
1772// success and panic paths (the binding ref the prior
1773// `CrossPartitionState` held for its `Drop` impl is no longer needed —
1774// `BatchGuard` already holds a `Core` clone with the binding).
1775
1776/// All mutable Core state, behind one [`parking_lot::Mutex`].
1777///
1778/// **Architecture history.** Q2 (2026-05-09) split four wave-scoped
1779/// cross-partition aggregation fields out into a separate
1780/// `parking_lot::Mutex<CrossPartitionState>` on [`Core`]. Q-beyond
1781/// Sub-slice 1 (D108, 2026-05-09) eliminated `CrossPartitionState`
1782/// entirely — its four fields moved to per-thread `WaveState`
1783/// thread_local in `crate::batch`. Sub-slice 2 + 3 moved 8 more
1784/// wave-scoped fields the same way. /qa F1+F2 (2026-05-10) reverted
1785/// `in_tick` and `currently_firing` to CoreState; `currently_firing`
1786/// stays here (cross-thread P13 set_deps check, /qa F2), but `in_tick`
1787/// was later re-keyed per-(Core, thread) into the
1788/// `crate::batch::IN_TICK_OWNED` thread_local (2026-05-15) — Core-global
1789/// placement broke disjoint-partition drain ownership while per-thread
1790/// broke cross-Core isolation; the (Core, thread) key satisfies both.
1791/// See `docs/rust-port-decisions.md`.
1792///
1793/// The D1 patch (2026-05-09) moved Slice G's `tier3_emitted_this_wave`
1794/// set to a per-thread thread-local in `crate::batch` (was briefly
1795/// per-partition under Q3 v1; that placement was vulnerable to mid-wave
1796/// cross-thread `set_deps` partition splits — see
1797/// `docs/porting-deferred.md` "Per-partition state-shard refactor"
1798/// closing summary). Q-beyond
1799/// will continue the shape decomposition by sharding most of the
1800/// remaining fields per-partition.
pub(crate) struct CoreState {
    /// Seeded to `1` by [`Core::new`]. NOTE(review): presumably the next
    /// node id to allocate — confirm against the registration path.
    pub(crate) next_node_id: u64,
    /// Seeded to `1` by [`Core::new`]. NOTE(review): presumably the next
    /// subscription id to allocate — confirm against `subscribe`.
    pub(crate) next_subscription_id: u64,
    /// Seeded to `1 << 31` by [`Core::new`]; the comment at that seed
    /// explains the high-range choice (anti-collision with user-supplied
    /// lock ids, and round-tripping through `u32` at the napi boundary).
    pub(crate) next_lock_id: u64,
    /// Per-node records, keyed by `NodeId`.
    pub(crate) nodes: HashMap<NodeId, NodeRecord>,
    /// Inverted adjacency: `parent → children`. Updated on registration.
    pub(crate) children: HashMap<NodeId, HashSet<NodeId>>,
    // Q-beyond Sub-slice 2 (D108, 2026-05-09): `pending_fires` and
    // `pending_notify` moved to per-thread
    // [`crate::batch::WaveState`]. Both fields are wave-scoped — emit
    // populates them on the same thread that drains them at wave end.
    // Cross-thread emits block on partition `wave_owner` and land in
    // the OTHER thread's wave context, so the per-thread placement is
    // safe by construction.
    //
    // Q-beyond Sub-slice 3 (D108, 2026-05-09): `deferred_flush_jobs`,
    // `deferred_cleanup_hooks`, `pending_wipes`, and
    // `invalidate_hooks_fired_this_wave` likewise moved to
    // [`crate::batch::WaveState`] (same wave-scoped per-thread rationale).
    // `in_tick` is per-(Core, thread) in `crate::batch::IN_TICK_OWNED`;
    // `currently_firing` stays on `CoreState` (/qa F2, see below).
    /// Core-global cap on per-node pause replay buffer length. `None` means
    /// unbounded. Per the user direction (Q1, 2026-05-05): start core-global;
    /// per-node override can be added later as a pure addition without API
    /// breakage. Default `None`.
    pub(crate) pause_buffer_cap: Option<usize>,
    /// Core-global cap on wave-drain iterations before
    /// [`crate::batch::Core::drain_and_flush`] aborts with a diagnostic panic.
    /// Replaces the prior `MAX_DRAIN_ITERATIONS` hard-coded constant
    /// (R4.3 / Lock 2.F′). Default `10_000`.
    ///
    /// The drain loop bound exists to surface runtime cycles
    /// (e.g. an operator that re-arms its own `pending_fires` slot during
    /// `invoke_fn`) as a panic with context, rather than letting Core
    /// spin forever. Structural cycles via [`Core::set_deps`] are
    /// rejected at edge-mutation time (`SetDepsError::WouldCreateCycle`);
    /// registration is structurally cycle-safe by construction (the new
    /// node's id is not allocated until AFTER deps are validated, so deps
    /// cannot transitively reach the new node). The drain bound is the
    /// safety net for runtime cycles that bypass both static checks.
    pub(crate) max_batch_drain_iterations: u32,
    // Wave-ownership (`in_tick`) is NOT a `CoreState` field: it is keyed
    // per-(Core, thread) in the `crate::batch::IN_TICK_OWNED` thread_local
    // (see its doc for the cross-Core / disjoint-partition / nested-
    // re-entry rationale and history). `currently_firing` below DOES stay
    // on `CoreState` — the cross-thread P13 set_deps check (/qa F2)
    // requires it be cross-thread-visible.
    /// A6 reentrancy guard stack (Slice F, 2026-05-07): the stack of
    /// NodeIds whose fn is currently being invoked. Pushed at the top of
    /// `fire_fn` (just before the lock-released `invoke_fn` call) and
    /// popped on return / unwind via the [`crate::batch::FiringGuard`]
    /// RAII helper. [`Core::set_deps`] consults this set and rejects
    /// with [`SetDepsError::ReentrantOnFiringNode`] if `n` is currently
    /// firing — preventing the D1 `tracked` index corruption. Read by
    /// the P13 partition-migration check (D091) to reject mid-fire
    /// `set_deps` that would migrate a firing node's partition.
    ///
    /// **Per-Core (cross-thread visible).** /qa F2 reverted (2026-05-10):
    /// briefly placed on per-thread `WaveState` in Sub-slice 3, then
    /// moved BACK to `CoreState` after /qa F2 surfaced the cross-thread
    /// P13 bypass (per-thread placement made Thread B's `set_deps` read
    /// its own empty stack → P13 silently bypassed for cross-thread
    /// `set_deps` calls during Thread A's lock-released `invoke_fn`).
    /// Cross-thread visibility on shared `CoreState` is the load-bearing
    /// property the D091 check requires.
    ///
    /// Membership semantics (NOT strict LIFO): consumed via
    /// `contains(&n)` membership test. `FiringGuard::drop` pops the
    /// right-most matching `node_id` via `rposition` + `swap_remove`;
    /// physical order of remaining entries may not match construction
    /// order, but membership is preserved.
    pub(crate) currently_firing: Vec<NodeId>,
    // Q-beyond Sub-slice 3 (D108, 2026-05-09): `deferred_flush_jobs`
    // moved to [`crate::batch::WaveState`] (repeated here at the field's
    // former position; same move as noted near the top of the struct).
    /// Binding-boundary handle for `Drop`-time refcount balancing.
    /// `Core` also holds a clone of this Arc; storing it here lets
    /// `Drop for CoreState` walk every retained slot and release the
    /// binding-side share when the last `Core` clone drops. Without this,
    /// `cache` / `terminal` / `dep_terminals` Error / pause-buffer payload
    /// handle refs leak in the binding registry until process exit.
    pub(crate) binding: Arc<dyn BindingBoundary>,
    // Q-beyond Sub-slice 1 (D108, 2026-05-09): `wave_cache_snapshots`
    // moved to [`crate::batch::WaveState::wave_cache_snapshots`].
    // Q-beyond Sub-slice 1 (D108, 2026-05-09): `pending_auto_resolve`
    // moved to [`crate::batch::WaveState::pending_auto_resolve`].
    /// Topology-change sinks. Keyed by subscription id for O(1) removal.
    pub(crate) topology_sinks: HashMap<u64, crate::topology::TopologySink>,
    /// Seeded to `1` by [`Core::new`]. NOTE(review): presumably the next
    /// key to allocate for `topology_sinks` — confirm against the
    /// topology-subscribe path.
    pub(crate) next_topology_id: u64,
    /// D-α (D028 full close, 2026-05-10): per-Core defer queue for old
    /// operator-scratch boxes pushed by Phase G on resubscribable nodes.
    /// Phase G builds a fresh scratch via [`Core::make_op_scratch`] and
    /// installs it on the node's `op_scratch` slot (so re-activation
    /// sees fresh counters / a fresh seed-share); the OLD scratch's
    /// handle retains are deferred to one of two drain points to
    /// preserve the Slice C-3 /qa P1 retain-before-release invariant
    /// (the C-3 test
    /// `scan_resubscribable_reset_with_seed_aliasing_acc_does_not_collapse_registry`
    /// fails if the old `acc` share is released before the fresh seed
    /// share is taken — when `acc == seed` interns to the same registry
    /// slot, eager release drops the slot to zero before the fresh
    /// retain bumps it back up).
    ///
    /// Drain points:
    /// 1. [`Core::reset_for_fresh_lifecycle`] — after Phase 2 takes
    ///    fresh retains on the new seed/default, the queue drain
    ///    releases queued boxes whose handles may have aliased.
    /// 2. [`Drop for CoreState`] — catch-all on Core shutdown.
    ///
    /// Note that the queue lives on `CoreState` (not on `Core`) so
    /// `Drop for CoreState` has access to both the binding and the
    /// queue under the single state lock — no separate mutex needed.
    ///
    /// **Growth bound (/qa m17, 2026-05-10):** size is bounded by the
    /// number of non-terminal deactivate-reactivate cycles since the
    /// last terminal-then-resubscribe reset on any resubscribable +
    /// has-op node in this Core. Each entry is a `Box<dyn OperatorScratch>`
    /// holding O(1) handles (Scan/Reduce/Last: 1 handle; Take/Skip/
    /// TakeWhile/DistinctUntilChanged/Pairwise: 0 or 1 handle). Typical
    /// workloads: O(few KB). Degenerate workloads (long-lived Cores
    /// with frequent deactivate-reactivate cycles and no terminal
    /// resets) should call `core.complete()` / `core.error()` on the
    /// op node periodically to trigger queue drain via
    /// `reset_for_fresh_lifecycle`. The release happens unconditionally
    /// on Core drop, so this is not a leak — just a deferred-release
    /// growth concern under unusual workloads.
    pub(crate) pending_scratch_release: Vec<Box<dyn crate::op_state::OperatorScratch>>,
    // /qa F2 reverted (2026-05-10): the `currently_firing` field is
    // declared EARLIER in this struct (directly after
    // `max_batch_drain_iterations`). Sub-slice 3 briefly moved it to
    // `crate::batch::WaveState::currently_firing` on the per-thread
    // thread_local, but per-thread placement silently bypassed the
    // cross-thread P13 partition-migration check. /qa F2 (2026-05-10)
    // moved it BACK to CoreState (cross-thread visible).
    // Q-beyond Sub-slice 1 (D108, 2026-05-09): `pending_pause_overflow`
    // moved to [`crate::batch::WaveState::pending_pause_overflow`].
    // Slice G (R1.3.2.d / R1.3.3.a — 2026-05-07): tier3-emitted-this-wave
    // tracker MOVED to a per-thread thread-local in `crate::batch`
    // (D1 patch, 2026-05-09 — was briefly per-partition under Q3 v1
    // 2026-05-09 morning). Wave-scope = thread; per-thread placement
    // is robust to mid-wave cross-thread `set_deps` partition splits
    // because thread B's split doesn't touch thread A's thread-local.
    // See [`crate::batch::TIER3_EMITTED_THIS_WAVE`] for the per-thread
    // wave-scope rationale and lifecycle (cleared at outermost
    // [`crate::batch::BatchGuard`] drop, both success + panic).
    //
    // Q-beyond Sub-slice 3 (D108, 2026-05-09):
    // `invalidate_hooks_fired_this_wave`, `deferred_cleanup_hooks`,
    // and `pending_wipes` moved to [`crate::batch::WaveState`].
    // Wave-scoped per-thread; same rationale as the other Sub-slice 3
    // migrations.
}
1951
/// Monotonic generation counter for `Core` instances. Used by the
/// per-thread `PARTITION_CACHE` in `batch.rs` to distinguish Core
/// instances without relying on `Arc::as_ptr` (which can be reused by
/// the allocator after a Core is dropped — ABA hazard). One atomic
/// increment per `Core::new`; negligible cost.
static CORE_GENERATION: AtomicU64 = AtomicU64::new(1);

/// The handle-protocol Core dispatcher.
///
/// Holds an [`Arc`] to the [`BindingBoundary`] and all dispatch state. Cheap
/// to clone (the inner `Arc<Mutex<CoreState>>` is shared); pass `Core` by
/// value to threads.
///
/// # Wave-owner re-entrant mutex (Slice A close /qa, M1)
///
/// The state lock (`state: Arc<Mutex<CoreState>>`) is **dropped** around
/// binding callbacks (`invoke_fn`, `custom_equals`) so user fns may re-enter
/// Core. To preserve serializability of WAVE EXECUTION across threads —
/// without re-introducing the lock-held-during-fn-fire deadlock the Slice A
/// close refactor lifted — the wave engine acquires `wave_owner` (a
/// [`parking_lot::ReentrantMutex`]) for the lifetime of each wave.
///
/// NOTE(review): `wave_owner` is not a field of this struct. Per the
/// `registry` field doc it lives on each partition's `SubgraphLockBox`,
/// so the serialization described here is per partition.
///
/// Properties:
///
/// - **Same-thread re-entrance is free.** A user fn that calls back into
///   `Core::emit` / `Core::pause` / etc. mid-fire re-acquires `wave_owner`
///   on the same thread and runs as a nested wave (the inner `run_wave`
///   sees `in_tick=true` and skips drain — outer drain picks up).
/// - **Cross-thread emits BLOCK** at `wave_owner.lock_arc()` until the
///   in-flight wave completes (drain + flush + sink fire all done). This
///   serializes wave OWNERSHIP across threads, while still allowing the
///   state lock to drop inside the wave for binding callbacks.
///
/// Without this, Slice A close's lock-released drain let cross-thread
/// emits absorb into the in-flight wave's `pending_notify` and return
/// before subscribers fire — breaking the user-facing happens-after
/// contract that `emit` returning means subscribers have observed.
#[derive(Clone)]
pub struct Core {
    /// Shared dispatch state behind the state lock; every clone of this
    /// `Core` points at the same allocation.
    pub(crate) state: Arc<Mutex<CoreState>>,
    /// Binding boundary this Core was constructed with. `CoreState` holds
    /// a second clone of the same `Arc` (see [`Core::new`]).
    pub(crate) binding: Arc<dyn BindingBoundary>,
    /// Deferred producer-pattern operations. Per-Core (not per-thread)
    /// to avoid the cross-Core contamination discovered in D114 F1/F2.
    /// Drained after wave_guards release in `BatchGuard::drop`.
    pub(crate) deferred_producer_ops: Arc<parking_lot::Mutex<Vec<DeferredProducerOp>>>,
    /// Slice X5 (D3 substrate, 2026-05-08) + Slice Y1 / Phase E
    /// (wave-engine migration, 2026-05-08): per-subgraph union-find
    /// registry. Tracks each registered node's connected-component
    /// membership (a "subgraph"). Each component's root carries an
    /// `Arc<SubgraphLockBox>` whose `wave_owner: ReentrantMutex<()>`
    /// is the per-partition wave-serialization lock — acquired by
    /// [`Self::partition_wave_owner_lock_arc`] under the retry-validate
    /// loop. Cross-thread emits to disjoint partitions run truly
    /// parallel; same-thread re-entry passes through reentrantly.
    ///
    /// Direct port of [`graphrefly-py`'s
    /// `subgraph_locks.py`](https://github.com/graphrefly/graphrefly-py/blob/main/src/graphrefly/core/subgraph_locks.py)
    /// design (locked in [`SESSION-rust-port-d3-per-subgraph-parallelism.md`](https://github.com/graphrefly/graphrefly-ts/blob/main/archive/docs/SESSION-rust-port-d3-per-subgraph-parallelism.md)).
    pub(crate) registry: Arc<parking_lot::Mutex<crate::subgraph::SubgraphRegistry>>,
    /// Unique generation ID for this Core instance. Assigned from
    /// [`CORE_GENERATION`] at construction. Used by `PARTITION_CACHE`
    /// to avoid ABA false-hits after Core drop + allocator reuse.
    pub(crate) generation: u64,
}
2016
2017/// Weak handle to a [`Core`] — does not contribute to strong refcount.
2018///
2019/// Constructed via [`Core::weak_handle`]; upgraded back to a strong
2020/// [`Core`] via [`WeakCore::upgrade`]. Used by long-lived binding-stored
2021/// closures (notably `ProducerBuildFn`s registered via
2022/// [`graphrefly_operators::ProducerBinding::register_producer_build`])
2023/// to break the BenchBinding → registry → closure → strong-Core cycle
2024/// that would otherwise leak the entire graph state when a `BenchCore`
2025/// drops with active producer registrations.
2026///
2027/// Upgrade on each invocation; if the host `Core` was already dropped,
2028/// `upgrade()` returns `None` and the closure should no-op (the host
2029/// is being torn down, no work to do).
2030#[derive(Clone)]
2031pub struct WeakCore {
2032    state: Weak<Mutex<CoreState>>,
2033    binding: Weak<dyn BindingBoundary>,
2034    deferred_producer_ops: Weak<parking_lot::Mutex<Vec<DeferredProducerOp>>>,
2035    registry: Weak<parking_lot::Mutex<crate::subgraph::SubgraphRegistry>>,
2036    generation: u64,
2037}
2038
2039impl WeakCore {
2040    /// Try to upgrade back to a strong [`Core`]. Returns `None` if the
2041    /// host `Core`'s strong count has reached zero (i.e. the host
2042    /// `BenchCore` / equivalent owner was dropped).
2043    #[must_use]
2044    pub fn upgrade(&self) -> Option<Core> {
2045        Some(Core {
2046            state: self.state.upgrade()?,
2047            binding: self.binding.upgrade()?,
2048            deferred_producer_ops: self.deferred_producer_ops.upgrade()?,
2049            registry: self.registry.upgrade()?,
2050            generation: self.generation,
2051        })
2052    }
2053}
2054
2055/// RAII guard that owns an [`OperatorScratch`] until either (a) the
2056/// caller `take()`s it for installation, or (b) the guard drops on an
2057/// early return / unwind, in which case the scratch's handle retains
2058/// are released via [`OperatorScratch::release_handles`].
2059///
2060/// Slice H /qa F1 + F2 (2026-05-07): closes two related correctness
2061/// gaps in `Core::register`:
2062///
2063/// 1. **TOCTOU window** — the original three-phase split called
2064///    `lock_state()` twice (once for validation, once for insertion),
2065///    so a concurrent `Core::complete(dep)` on a non-resubscribable
2066///    dep could slip in between the two acquisitions and re-create
2067///    the wedge `RegisterError::TerminalDep` was designed to prevent.
2068///    The guard plus a single locked region for both phases closes
2069///    this gap (release runs lock-released because guard variables
2070///    drop in reverse declaration order — guard declared BEFORE
2071///    `lock_state()`, so the lock guard drops first).
2072///
2073/// 2. **Panic-unsafe scratch leak** — without an RAII drop, a panic
2074///    between `make_op_scratch` (Phase 2) and the explicit
2075///    `if let Err(e)` cleanup branch (e.g., `lock_state()` reentrance
2076///    assert, OOM-as-panic on Vec growth in dep iteration) would
2077///    drop the `Box<dyn OperatorScratch>` without releasing the
2078///    seed/default retain. The guard's `Drop` impl releases on any
2079///    unwind path.
2080///
2081/// Lock-discipline: the guard holds `&dyn BindingBoundary` (through
2082/// the `Arc<dyn BindingBoundary>` it borrows from). On `Drop`, it
2083/// invokes `release_handles` lock-released — fires AFTER any
2084/// `MutexGuard<CoreState>` declared later in the same scope drops
2085/// (LIFO destruction order). Mirrors `Core::resume` Phase 2 release
2086/// pattern.
2087struct ScratchReleaseGuard<'a> {
2088    scratch: Option<Box<dyn crate::op_state::OperatorScratch>>,
2089    binding: &'a dyn BindingBoundary,
2090}
2091
2092impl<'a> ScratchReleaseGuard<'a> {
2093    fn new(
2094        scratch: Option<Box<dyn crate::op_state::OperatorScratch>>,
2095        binding: &'a dyn BindingBoundary,
2096    ) -> Self {
2097        Self { scratch, binding }
2098    }
2099
2100    /// Take ownership of the scratch — disarms the release-on-drop
2101    /// behavior. Used on the success path to install the scratch on
2102    /// `NodeRecord.op_scratch`.
2103    fn take(mut self) -> Option<Box<dyn crate::op_state::OperatorScratch>> {
2104        self.scratch.take()
2105    }
2106}
2107
2108impl Drop for ScratchReleaseGuard<'_> {
2109    fn drop(&mut self) {
2110        if let Some(mut scratch) = self.scratch.take() {
2111            scratch.release_handles(self.binding);
2112        }
2113    }
2114}
2115
2116impl Core {
2117    /// Construct a fresh Core wired to the given binding. Pause buffer cap
2118    /// defaults to unbounded; set via [`Self::set_pause_buffer_cap`].
2119    #[must_use]
2120    pub fn new(binding: Arc<dyn BindingBoundary>) -> Self {
2121        Self {
2122            state: Arc::new(Mutex::new(CoreState {
2123                next_node_id: 1,
2124                next_subscription_id: 1,
2125                // A4 (Slice F, 2026-05-07): start `next_lock_id` in the high
2126                // half of the u32 range so `alloc_lock_id` can't collide with
2127                // user-supplied `LockId::new(N)` constructors (which the
2128                // napi-rs binding marshals from `u32` and tests typically use
2129                // in the low range, 1..1024). Phase E /qa F1 (2026-05-08):
2130                // lowered from `1u64 << 32` to `1u64 << 31` so the value
2131                // round-trips through `u32::try_from(...)` at the napi
2132                // boundary — the previous seed errored every napi
2133                // `alloc_lock_id` call. Anti-collision intent (high range vs
2134                // low user range) preserved at half the prior ceiling
2135                // (2^31 ≈ 2 billion allocations per Core, ample for parity
2136                // tests). Lift the floor when the deferred BigInt-narrowing
2137                // migration extends `LockId` to `u64` at the FFI layer
2138                // (porting-deferred "BigInt migration for u32-narrowed napi
2139                // types" entry).
2140                next_lock_id: 1u64 << 31,
2141                nodes: HashMap::new(),
2142                children: HashMap::new(),
2143                // Q-beyond Sub-slice 2 + 3 (D108, 2026-05-09): pending_fires,
2144                // pending_notify, deferred_flush_jobs,
2145                // deferred_cleanup_hooks, pending_wipes, and
2146                // invalidate_hooks_fired_this_wave all live on per-thread
2147                // [`crate::batch::WaveState`].
2148                // `currently_firing` stays on CoreState (cross-thread
2149                // visible) for the cross-thread P13 set_deps check (/qa
2150                // F2). `in_tick` is NOT here — it is per-(Core, thread) in
2151                // `crate::batch::IN_TICK_OWNED` (see its doc).
2152                currently_firing: Vec::new(),
2153                pause_buffer_cap: None,
2154                max_batch_drain_iterations: 10_000,
2155                binding: binding.clone(),
2156                topology_sinks: HashMap::new(),
2157                next_topology_id: 1,
2158                pending_scratch_release: Vec::new(),
2159            })),
2160            binding,
2161            deferred_producer_ops: Arc::new(parking_lot::Mutex::new(Vec::new())),
2162            registry: Arc::new(parking_lot::Mutex::new(
2163                crate::subgraph::SubgraphRegistry::new(),
2164            )),
2165            generation: CORE_GENERATION.fetch_add(1, Ordering::Relaxed),
2166        }
2167    }
2168
2169    /// Acquire the state lock.
2170    ///
2171    /// Post-Slice-E: `Core::subscribe` fires the per-tier handshake
2172    /// LOCK-RELEASED with `wave_owner` held; sink callbacks may freely
2173    /// re-enter Core (`emit` / `complete` / `error` / nested `subscribe`).
2174    /// Same-thread re-entry passes through `wave_owner`'s `ReentrantMutex`
2175    /// transparently; cross-thread emits block on `wave_owner` until the
2176    /// outer subscribe completes, preserving R1.3.5.a happens-after
2177    /// ordering. The previous `IN_HANDSHAKE_FIRE` panic-diagnostic is no
2178    /// longer needed.
2179    pub(crate) fn lock_state(&self) -> MutexGuard<'_, CoreState> {
2180        self.state.lock()
2181    }
2182
2183    /// Whether `self` and `other` point to the same dispatcher state.
2184    /// True when one was produced by `Clone`-ing the other (or they
2185    /// were both cloned from a common ancestor); false for two
2186    /// independently `Core::new`-constructed instances even with the
2187    /// same binding.
2188    ///
2189    /// Used by `graphrefly-graph`'s `mount` to enforce the "shared-Core
2190    /// only" v1 invariant — cross-Core mount is post-M6.
2191    #[must_use]
2192    pub fn same_dispatcher(&self, other: &Core) -> bool {
2193        Arc::ptr_eq(&self.state, &other.state)
2194    }
2195
2196    /// Downgrade to a [`WeakCore`] handle that doesn't contribute to
2197    /// strong refcount of the underlying state / binding / wave_owner.
2198    ///
2199    /// Used by binding-stored long-lived closures (e.g.
2200    /// `register_producer_build`-stored `ProducerBuildFn`s) to avoid the
2201    /// Arc cycle:
2202    ///
2203    /// ```text
2204    /// BenchBinding → registry → producer_builds[fn_id]
2205    ///   → closure → strong Arc<dyn _Binding> → BenchBinding
2206    /// ```
2207    ///
2208    /// Closures hold `WeakCore` and `Weak<dyn _Binding>` instead, then
2209    /// upgrade-on-fire (returning early if either weak is dangling —
2210    /// indicating the host BenchCore was already dropped). Upgraded
2211    /// strong refs live only for the build closure's invocation; sinks
2212    /// the build closure spawns close over those upgraded strongs and
2213    /// stay alive only while the producer is active (cleared via
2214    /// `producer_deactivate` on last-subscriber unsubscribe).
2215    #[must_use]
2216    pub fn weak_handle(&self) -> WeakCore {
2217        WeakCore {
2218            state: Arc::downgrade(&self.state),
2219            binding: Arc::downgrade(&self.binding),
2220            deferred_producer_ops: Arc::downgrade(&self.deferred_producer_ops),
2221            registry: Arc::downgrade(&self.registry),
2222            generation: self.generation,
2223        }
2224    }
2225
2226    /// Number of distinct connected-component partitions tracked by
2227    /// the per-subgraph union-find registry (Slice X5 substrate).
2228    /// Two threads emitting into nodes with distinct partitions will
2229    /// run truly parallel once Y1 wires the wave engine through the
2230    /// registry; X5 reports the partition count for inspection
2231    /// (acceptance bar + debugging) but the wave engine still uses
2232    /// the legacy Core-level `wave_owner`.
2233    #[must_use]
2234    pub fn partition_count(&self) -> usize {
2235        self.registry.lock().component_count()
2236    }
2237
2238    /// Resolve `node`'s partition identity per the per-subgraph
2239    /// union-find registry (Slice X5 substrate). Two nodes with the
2240    /// same `SubgraphId` are connected via dep edges (transitively)
2241    /// and share a partition lock under Y1+; nodes in different
2242    /// partitions can run truly parallel.
2243    ///
2244    /// Returns `None` for unregistered nodes.
2245    #[must_use]
2246    pub fn partition_of(&self, node: NodeId) -> Option<crate::subgraph::SubgraphId> {
2247        self.registry.lock().partition_of(node)
2248    }
2249
2250    /// Push a deferred producer operation. Called by operator sinks
2251    /// when a Core method returns `PartitionOrderViolation`.
2252    ///
2253    /// For `Emit` and `Error` variants, the caller MUST retain the
2254    /// handle before pushing (the drain releases it after firing).
2255    /// `Complete` and `Callback` variants have no handle to retain.
2256    pub fn push_deferred_producer_op(&self, op: DeferredProducerOp) {
2257        self.deferred_producer_ops.lock().push(op);
2258    }
2259
2260    /// Drain deferred producer ops when no partitions are held on
2261    /// the current thread. Each op may itself produce new deferred
2262    /// ops (e.g., a deferred subscribe activates a producer whose
2263    /// build defers further subscribes), so drain in a loop until
2264    /// the queue is empty.
2265    ///
2266    /// Called from `try_subscribe` after its `wave_guard` drops,
2267    /// and from `BatchGuard::drop` after releasing all wave_guards.
2268    /// Skips silently if partitions are still held (nested context).
2269    /// Maximum number of drain iterations before panicking with a
2270    /// diagnostic. Prevents unbounded loops from buggy callbacks that
2271    /// keep pushing more deferred ops indefinitely.
2272    const MAX_DEFERRED_DRAIN_ITERATIONS: u32 = 1000;
2273
2274    pub(crate) fn drain_deferred_producer_ops(&self) {
2275        if held_partitions::any_held() {
2276            return;
2277        }
2278        let mut iterations = 0u32;
2279        loop {
2280            let deferred_ops: Vec<DeferredProducerOp> = {
2281                let mut ops = self.deferred_producer_ops.lock();
2282                if ops.is_empty() {
2283                    break;
2284                }
2285                std::mem::take(&mut *ops)
2286            };
2287            iterations += 1;
2288            assert!(
2289                iterations <= Self::MAX_DEFERRED_DRAIN_ITERATIONS,
2290                "drain_deferred_producer_ops exceeded {} iterations — \
2291                 a deferred callback is likely pushing unbounded ops. \
2292                 Iteration {iterations}, batch size {}.",
2293                Self::MAX_DEFERRED_DRAIN_ITERATIONS,
2294                deferred_ops.len(),
2295            );
2296            for op in deferred_ops {
2297                match op {
2298                    DeferredProducerOp::Emit { node_id, handle } => {
2299                        self.emit(node_id, handle);
2300                        self.binding.release_handle(handle);
2301                    }
2302                    DeferredProducerOp::Complete { node_id } => {
2303                        self.complete(node_id);
2304                    }
2305                    DeferredProducerOp::Error { node_id, handle } => {
2306                        self.error(node_id, handle);
2307                        self.binding.release_handle(handle);
2308                    }
2309                    DeferredProducerOp::Callback(f) => {
2310                        f();
2311                    }
2312                }
2313            }
2314        }
2315    }
2316
2317    // Q3 (2026-05-09) introduced `Core::partition_box_of(node)` to
2318    // resolve a partition's `Arc<SubgraphLockBox>` for per-partition
2319    // state access. The D1 patch (2026-05-09) moved Slice G's
2320    // `tier3_emitted_this_wave` set off `SubgraphLockBox::state` to a
2321    // per-thread thread-local in `crate::batch`, eliminating
2322    // `partition_box_of`'s only callers (`commit_emission` /
2323    // `commit_emission_verbatim`). The helper is REMOVED rather than
2324    // kept dead — Q-beyond will resurrect a similar shape when the
2325    // CoreState shard layout actually needs per-partition lookups.
2326
2327    /// Acquire `seed`'s partition `wave_owner` re-entrant mutex with
2328    /// retry-validate against concurrent union/split. Mirrors
2329    /// graphrefly-py's `subgraph_locks.py::lock_for` retry pattern
2330    /// (lines 154–178): a concurrent `union_nodes` may redirect
2331    /// `seed`'s partition root between our `lock_for` resolve and
2332    /// our `lock_arc` call; if so, the held guard is on a stale
2333    /// (but still valid) `SubgraphLockBox` whose `Arc` no longer
2334    /// matches the registry's canonical box for `seed`'s current
2335    /// root. Release + retry up to [`crate::subgraph::MAX_LOCK_RETRIES`].
2336    ///
2337    /// Returns the held guard. Caller holds it for the wave's
2338    /// duration; drop releases.
2339    ///
2340    /// **Panics** if `seed` is not registered (caller violation —
2341    /// every wave entry takes a `NodeId` already in `s.nodes`, and
2342    /// the P12-fixed lock-discipline guarantees registry membership
2343    /// is published atomically with state). **Panics** on exceeding
2344    /// `MAX_LOCK_RETRIES` — pathological union activity.
2345    ///
2346    /// Slice Y1 / Phase E (2026-05-08).
2347    pub(crate) fn partition_wave_owner_lock_arc(
2348        &self,
2349        seed: NodeId,
2350    ) -> Result<WaveOwnerGuard, PartitionOrderViolation> {
2351        /// Scope-guard for the H+ thread-local refcount entry. Released on
2352        /// Drop unless `into_consumed()` is called (the success path).
2353        /// Ensures balance even on panic between `check_and_acquire` and
2354        /// successful `WaveOwnerGuard` construction (`lock_arc()` /
2355        /// `lock_for_validate()` could in principle panic; defensive).
2356        struct AcquireGuard {
2357            sid: crate::subgraph::SubgraphId,
2358            consumed: bool,
2359        }
2360        impl AcquireGuard {
2361            fn into_consumed(mut self) {
2362                self.consumed = true;
2363            }
2364        }
2365        impl Drop for AcquireGuard {
2366            fn drop(&mut self) {
2367                if !self.consumed {
2368                    held_partitions::release(self.sid);
2369                }
2370            }
2371        }
2372
2373        for _ in 0..crate::subgraph::MAX_LOCK_RETRIES {
2374            let (sid, lock_box) = {
2375                let mut reg = self.registry.lock();
2376                reg.lock_for(seed).expect(
2377                    "partition_wave_owner_lock_arc: seed must be registered \
2378                     (P12-fix invariant: registry membership is published \
2379                     atomically with `s.nodes`)",
2380                )
2381            };
2382            // Phase H+ option (d) /qa N1(a) widened variant: BEFORE
2383            // acquiring the parking_lot lock, check ascending-order if
2384            // this thread already holds at least one partition AND we're
2385            // not in a producer build closure. Panics on violation.
2386            // Also increments the thread-local refcount for `sid`. The
2387            // `AcquireGuard` ensures the refcount is released on EVERY
2388            // exit path — successful return (via `into_consumed()`),
2389            // retry-validate failure (Drop fires), retry-exhaustion panic
2390            // (Drop fires before unwind), or a panic in `lock_arc()` /
2391            // `lock_for_validate()` (Drop fires during unwind).
2392            held_partitions::check_and_acquire(sid)?;
2393            let acquire_guard = AcquireGuard {
2394                sid,
2395                consumed: false,
2396            };
2397            let inner = lock_box.wave_owner.lock_arc();
2398            // Re-validate post-acquire. If a concurrent `union` redirected
2399            // `seed`'s root between our `lock_for` and `lock_arc`, the
2400            // registry's current box for `seed` differs from what we hold.
2401            let still_valid = self.registry.lock().lock_for_validate(seed, &lock_box);
2402            if still_valid {
2403                acquire_guard.into_consumed();
2404                // `lock_box` is unused after this point — the D1 patch
2405                // moved Slice G tier3 tracking off the per-partition
2406                // `SubgraphLockBox::state` to a per-thread thread-local,
2407                // so the guard no longer carries the box reference.
2408                drop(lock_box);
2409                return Ok(WaveOwnerGuard { sid, inner });
2410            }
2411            // Stale — drop the parking_lot guard. The AcquireGuard's
2412            // Drop releases the held_partitions refcount automatically.
2413            // Yield to give the contending writer a chance to make
2414            // forward progress before re-resolving (QA-fix group 2 —
2415            // earlier tight-spin could monopolize a CPU under sustained
2416            // pathological union/split activity).
2417            drop(inner);
2418            drop(acquire_guard);
2419            std::thread::yield_now();
2420        }
2421        panic!(
2422            "partition_wave_owner_lock_arc: exceeded {} retries for seed {:?} \
2423             — pathological concurrent union/split activity. Mirrors py \
2424             `_MAX_LOCK_RETRIES`.",
2425            crate::subgraph::MAX_LOCK_RETRIES,
2426            seed
2427        );
2428    }
2429
2430    /// BFS from `seed` along `s.children` (downstream consumer cascade
2431    /// for DATA / RESOLVED / INVALIDATE / COMPLETE / ERROR / TEARDOWN)
2432    /// and `meta_companions` (R1.3.9.d TEARDOWN cascade). Collects
2433    /// every partition reachable from `seed`, returning the unique
2434    /// `SubgraphId`s sorted ascending — the canonical lock-acquisition
2435    /// order per session-doc Q7 / decision D092 that guarantees
2436    /// deadlock-freedom across cross-partition waves.
2437    ///
2438    /// Holds the state lock + registry lock for the BFS duration
2439    /// (lock order `state → registry` per the P12-fix invariant).
2440    /// Bounded by the cascade graph reachable from `seed`; for typical
2441    /// apps the partition count is small (1–3) and the BFS is
2442    /// negligible relative to wave drain.
2443    ///
2444    /// Used by [`Core::begin_batch_for`] to compute the upfront-
2445    /// acquired partition set for per-seed waves. Closure-form
2446    /// [`Core::batch`] doesn't have a seed and uses
2447    /// [`Core::all_partitions_lock_boxes`] instead.
2448    ///
2449    /// Slice Y1 / Phase E (2026-05-08).
2450    pub(crate) fn compute_touched_partitions(
2451        &self,
2452        seed: NodeId,
2453    ) -> SmallVec<[crate::subgraph::SubgraphId; 4]> {
2454        let s = self.lock_state();
2455        let mut reg = self.registry.lock();
2456        let mut partitions: SmallVec<[crate::subgraph::SubgraphId; 4]> = SmallVec::new();
2457        let mut visited: HashSet<NodeId> = HashSet::default();
2458        let mut stack: SmallVec<[NodeId; 16]> = SmallVec::new();
2459        stack.push(seed);
2460        while let Some(n) = stack.pop() {
2461            if !visited.insert(n) {
2462                continue;
2463            }
2464            if let Some(p) = reg.partition_of(n) {
2465                if !partitions.contains(&p) {
2466                    partitions.push(p);
2467                }
2468            }
2469            if let Some(children) = s.children.get(&n) {
2470                stack.extend(children.iter().copied());
2471            }
2472            if let Some(rec) = s.nodes.get(&n) {
2473                stack.extend(rec.meta_companions.iter().copied());
2474            }
2475        }
2476        partitions.sort_unstable_by_key(|sid| sid.raw());
2477        partitions
2478    }
2479
    /// Snapshot of every currently-existing partition's lock box, in
    /// ascending [`crate::subgraph::SubgraphId`] order (canonical
    /// lock-acquisition order per session-doc Q7 / D092). Used by
    /// closure-form [`Core::batch`] / [`Core::begin_batch`] which
    /// don't have a known seed and must serialize against every
    /// existing partition.
    ///
    /// This method is a thin delegate: it takes only the registry
    /// mutex (the state lock is not acquired here) and returns whatever
    /// the registry's `all_partitions()` produces. The ordering
    /// guarantee described above is therefore provided by the registry
    /// implementation, not by code in this method.
    ///
    /// Returns one `(SubgraphId, Arc<SubgraphLockBox>)` pair per
    /// partition reported by the registry.
    ///
    /// Slice Y1 / Phase E (2026-05-08).
    pub(crate) fn all_partitions_lock_boxes(
        &self,
    ) -> Vec<(
        crate::subgraph::SubgraphId,
        Arc<crate::subgraph::SubgraphLockBox>,
    )> {
        // The registry guard is a temporary of this tail expression: it
        // covers the `all_partitions()` call and is released when the
        // function returns, so callers receive the snapshot unlocked.
        self.registry.lock().all_partitions()
    }
2496}
2497
2498/// Walk the undirected dep-edge graph from `start`, optionally
2499/// skipping ONE edge in both directions, and optionally treating
2500/// additional edges as if present. Returns every reachable
2501/// [`NodeId`].
2502///
2503/// Implementation note: uses a stack (`pop()` on a `SmallVec`) — i.e.
2504/// DFS traversal order. For pure reachability the order doesn't
2505/// matter (the visited set is identical to BFS); the function is
2506/// named "walk" rather than "BFS" to avoid implying that traversal
2507/// distance is meaningful (QA-fix group 2 — earlier name
2508/// `bfs_undirected_dep_graph` was misleading).
2509///
2510/// **Edge convention:** the dep edge `parent → child` represents
2511/// data flow from `parent` (a dep) to `child` (the consumer). It
2512/// appears in `s.children[parent]` as `child`, and in
2513/// `s.nodes[child].dep_records` as `parent`. `skip_edge =
2514/// Some((parent, child))` skips both forward (`parent → child`) and
2515/// backward (`child → parent`) traversals of that edge. Each
2516/// `(p, c)` pair in `extra_edges` is treated as if `c ∈
2517/// s.children[p]` and `p ∈ s.nodes[c].dep_records` — used for
2518/// "what would connectivity look like if THESE edges were also
2519/// present?" lookahead.
2520///
2521/// Used by Slice Y1 / Phase F (D3 split-eager, 2026-05-09):
2522/// - **P13 widening (pre-removal connectivity):** call with
2523///   `skip_edge = Some((removed_parent, removed_child))` AND
2524///   `extra_edges = added_edges_in_set_deps_call` so a `set_deps`
2525///   that simultaneously removes one edge AND adds another path
2526///   isn't falsely flagged as disconnecting (QA-fix #4 2026-05-09 —
2527///   without `extra_edges`, the pre-mutation BFS doesn't see the
2528///   would-be-added edges and rejects the conservative case).
2529/// - **Actual split execution (post-removal):** call with
2530///   `skip_edge = None` and `extra_edges = &[]`; the visited set is
2531///   the keep-side of the split (the side containing `start`).
2532pub(crate) fn walk_undirected_dep_graph(
2533    s: &CoreState,
2534    start: NodeId,
2535    skip_edge: Option<(NodeId, NodeId)>,
2536    extra_edges: &[(NodeId, NodeId)],
2537) -> HashSet<NodeId> {
2538    let mut visited: HashSet<NodeId> = HashSet::default();
2539    let mut queue: SmallVec<[NodeId; 32]> = SmallVec::new();
2540    queue.push(start);
2541    while let Some(cur) = queue.pop() {
2542        if !visited.insert(cur) {
2543            continue;
2544        }
2545        if let Some(consumers) = s.children.get(&cur) {
2546            for &c in consumers {
2547                let is_skipped = skip_edge.is_some_and(|(sp, sc)| cur == sp && c == sc);
2548                if !is_skipped && !visited.contains(&c) {
2549                    queue.push(c);
2550                }
2551            }
2552        }
2553        if let Some(rec) = s.nodes.get(&cur) {
2554            for d in rec.dep_records.iter().map(|r| r.node) {
2555                let is_skipped = skip_edge.is_some_and(|(sp, sc)| cur == sc && d == sp);
2556                if !is_skipped && !visited.contains(&d) {
2557                    queue.push(d);
2558                }
2559            }
2560        }
2561        // Virtual extra edges (e.g. would-be-added edges in
2562        // pre-mutation BFS).
2563        for &(ep, ec) in extra_edges {
2564            if cur == ep && !visited.contains(&ec) {
2565                queue.push(ec);
2566            }
2567            if cur == ec && !visited.contains(&ep) {
2568                queue.push(ep);
2569            }
2570        }
2571    }
2572    visited
2573}
2574
2575impl Core {
2576    /// Test-only inspection: number of `PendingBatch`es queued for
2577    /// `node` in the current wave. Used by Slice X4 D2 regression
2578    /// tests to pin the "common case = single batch, no SmallVec
2579    /// spill" perf invariant.
2580    ///
2581    /// Returns `None` if no `pending_notify` entry exists for `node`
2582    /// (no tier-1+ message has been queued for this node yet in this
2583    /// wave). `Some(0)` is unreachable by construction (a vacant
2584    /// entry implies no batches; an occupied entry has at least one).
2585    #[cfg(any(test, debug_assertions))]
2586    #[must_use]
2587    pub fn pending_batch_count(&self, node: NodeId) -> Option<usize> {
2588        // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_notify lives
2589        // on per-thread `WaveState`. Test callers run on the same
2590        // thread that ran the wave, so the per-thread placement is
2591        // observable here.
2592        crate::batch::with_wave_state(|ws| {
2593            ws.pending_notify
2594                .get(&node)
2595                .map(|entry| entry.batches.len())
2596        })
2597    }
2598
2599    /// Configure the Core-global cap on pause replay buffer length. When set,
2600    /// any per-node pause buffer that would exceed `cap` drops the oldest
2601    /// message(s) from the front; the dropped count is reported back via the
2602    /// resume callback (see [`ResumeReport`]). `None` (default) means
2603    /// unbounded; messages buffer indefinitely until the lockset clears.
2604    pub fn set_pause_buffer_cap(&self, cap: Option<usize>) {
2605        self.lock_state().pause_buffer_cap = cap;
2606    }
2607
2608    /// Configure the replay buffer cap on `node_id` (R2.6.5 / Lock 6.G —
2609    /// Slice E1, 2026-05-07). `None` disables the buffer. `Some(N)` keeps
2610    /// the last `N` DATA emissions in a circular buffer; late subscribers
2611    /// receive them as part of the per-tier handshake (between START and
2612    /// any terminal). Switching from a larger cap to a smaller cap evicts
2613    /// the front of the buffer to fit; switching to `None` drains the
2614    /// buffer entirely. Each evicted/drained handle's retain is released
2615    /// back to the binding.
2616    ///
2617    /// # Panics
2618    ///
2619    /// Panics if `node_id` is not registered.
2620    pub fn set_replay_buffer_cap(&self, node_id: NodeId, cap: Option<usize>) {
2621        // QA A7 (2026-05-07): normalize `Some(0)` to `None`. Two ways to
2622        // express "disabled" is confusing: `push_replay_buffer` already
2623        // treats `Some(0)` as no-op, so persisting it adds nothing.
2624        let cap = match cap {
2625            Some(0) => None,
2626            other => other,
2627        };
2628        let to_release: Vec<HandleId> = {
2629            let mut s = self.lock_state();
2630            let rec = s.require_node_mut(node_id);
2631            rec.replay_buffer_cap = cap;
2632            match cap {
2633                None => rec.replay_buffer.drain(..).collect(),
2634                Some(c) => {
2635                    let mut drained = Vec::new();
2636                    while rec.replay_buffer.len() > c {
2637                        if let Some(h) = rec.replay_buffer.pop_front() {
2638                            drained.push(h);
2639                        }
2640                    }
2641                    drained
2642                }
2643            }
2644        };
2645        for h in to_release {
2646            self.binding.release_handle(h);
2647        }
2648    }
2649
2650    /// Reconfigure the pause mode for `node_id` (canonical §2.6 — Slice F
2651    /// audit close, 2026-05-07). Default for new nodes is
2652    /// [`PausableMode::Default`]; switch to [`PausableMode::ResumeAll`]
2653    /// for nodes whose pause-window emit history must be observable
2654    /// verbatim, or [`PausableMode::Off`] for nodes intrinsically
2655    /// pause-immune.
2656    ///
2657    /// # Errors
2658    ///
2659    /// - [`SetPausableModeError::UnknownNode`] — `node_id` is not
2660    ///   registered.
2661    /// - [`SetPausableModeError::WhilePaused`] — the node currently
2662    ///   holds at least one pause lock. Changing mode mid-pause would
2663    ///   lose buffered content or strand a `pending_wave` flag — resume
2664    ///   all locks first.
2665    pub fn set_pausable_mode(
2666        &self,
2667        node_id: NodeId,
2668        mode: PausableMode,
2669    ) -> Result<(), SetPausableModeError> {
2670        let mut s = self.lock_state();
2671        let rec = s
2672            .nodes
2673            .get_mut(&node_id)
2674            .ok_or(SetPausableModeError::UnknownNode(node_id))?;
2675        if rec.pause_state.is_paused() {
2676            return Err(SetPausableModeError::WhilePaused);
2677        }
2678        rec.pausable = mode;
2679        Ok(())
2680    }
2681
2682    /// Configure the wave-drain iteration cap (R4.3 / Lock 2.F′). The wave
2683    /// engine aborts a drain after `cap` iterations with a diagnostic panic.
2684    /// Default is `10_000` — high enough to avoid false positives on legitimate
2685    /// fan-in cascades, low enough to surface runtime cycles within seconds.
2686    ///
2687    /// Lower this only when running adversarial / property-based tests that
2688    /// want fast cycle detection. Raise it only with concrete evidence that a
2689    /// legitimate workload needs more iterations than the default — and even
2690    /// then, prefer to tune the workload (per-subgraph batching, etc.) over
2691    /// raising the cap.
2692    ///
2693    /// # Panics
2694    ///
2695    /// Panics if `cap == 0` — a zero cap would abort every wave on the very
2696    /// first iteration, deadlocking any subsequent dispatcher work.
2697    pub fn set_max_batch_drain_iterations(&self, cap: u32) {
2698        assert!(cap > 0, "max_batch_drain_iterations must be > 0");
2699        self.lock_state().max_batch_drain_iterations = cap;
2700    }
2701
2702    /// Send a message UPSTREAM from `node_id` to each of its declared deps
2703    /// (canonical R1.4.1 — Slice F audit, F2 / 2026-05-07).
2704    ///
2705    /// The dispatcher rejects tier-3 (DATA / RESOLVED) and tier-5
2706    /// (COMPLETE / ERROR) per R1.4.1: value and terminal-lifecycle planes
2707    /// are downstream-only. All other tiers (0 START, 1 DIRTY, 2 PAUSE /
2708    /// RESUME, 4 INVALIDATE, 6 TEARDOWN) pass.
2709    ///
2710    /// # Routing per tier
2711    ///
2712    /// - **Tier 0 ([`Message::Start`]):** no-op. START is a per-subscription
2713    ///   handshake, not a routable wire signal — sending it upstream has no
2714    ///   well-defined target.
2715    /// - **Tier 1 ([`Message::Dirty`]):** no-op. The dep's "something
2716    ///   changed" notification is its own [`Self::emit`] / commit
2717    ///   responsibility; ignoring upstream DIRTY hints is safe.
2718    /// - **Tier 2 ([`Message::Pause`] / [`Message::Resume`]):** translates
2719    ///   to [`Self::pause`] / [`Self::resume`] on each dep. Lock id is
2720    ///   forwarded verbatim. Errors from individual deps are accumulated
2721    ///   in the `dep_errors` field of the returned report.
2722    /// - **Tier 4 ([`Message::Invalidate`]):** translates to
2723    ///   [`Self::invalidate`] on each dep. Note: canonical R1.4.2
2724    ///   distinguishes "downstream INVALIDATE" (cache clear + cascade) from
2725    ///   "upstream INVALIDATE" (plain forward, no self-process). The Rust
2726    ///   port v1 SIMPLIFICATION delegates to the same `Core::invalidate`
2727    ///   path — upstream INVALIDATE here DOES clear dep caches and cascade.
2728    ///   If a "plain forward" mode surfaces as a real consumer need, add
2729    ///   `up_with_options`.
2730    /// - **Tier 6 ([`Message::Teardown`]):** translates to
2731    ///   [`Self::teardown`] on each dep. Cascades per the standard
2732    ///   teardown path.
2733    ///
2734    /// # Errors
2735    ///
2736    /// - [`UpError::UnknownNode`] — `node_id` is not registered.
2737    /// - [`UpError::TierForbidden`] — tier 3 or tier 5.
2738    pub fn up(&self, node_id: NodeId, message: Message) -> Result<(), UpError> {
2739        // QA A10 (2026-05-07): check unknown node BEFORE tier rejection
2740        // for consistent error UX — `up(unknown, Data)` and
2741        // `up(unknown, Pause)` both report `UnknownNode` rather than
2742        // splitting on the tier.
2743        let dep_ids: Vec<NodeId> = {
2744            let s = self.lock_state();
2745            let rec = s.nodes.get(&node_id).ok_or(UpError::UnknownNode(node_id))?;
2746            rec.dep_ids_vec()
2747        };
2748        let tier = message.tier();
2749        if tier == 3 || tier == 5 {
2750            return Err(UpError::TierForbidden { tier });
2751        }
2752        for dep_id in dep_ids {
2753            match message {
2754                Message::Pause(lock) => {
2755                    let _ = self.pause(dep_id, lock);
2756                }
2757                Message::Resume(lock) => {
2758                    let _ = self.resume(dep_id, lock);
2759                }
2760                Message::Invalidate => {
2761                    self.invalidate(dep_id);
2762                }
2763                Message::Teardown => {
2764                    self.teardown(dep_id);
2765                }
2766                // Tier 0 START + tier 1 DIRTY: no-op upstream per the
2767                // routing table above.
2768                _ => {}
2769            }
2770        }
2771        Ok(())
2772    }
2773
2774    /// Allocate a unique [`LockId`] for use with [`Self::pause`] /
2775    /// [`Self::resume`]. Convenience for callers that don't already have an
2776    /// id-allocation scheme; user-supplied ids work too.
2777    #[must_use]
2778    pub fn alloc_lock_id(&self) -> LockId {
2779        let mut s = self.lock_state();
2780        let id = LockId::new(s.next_lock_id);
2781        s.next_lock_id += 1;
2782        id
2783    }
2784
2785    /// Access the binding boundary for this Core.
2786    ///
2787    /// Used by `graphrefly-graph` for snapshot serialization (M4.E1 / D166):
2788    /// `Graph::snapshot()` calls `binding.serialize_handle(cache)` to
2789    /// project each node's cached value into portable JSON.
2790    #[must_use]
2791    pub fn binding_ptr(&self) -> &Arc<dyn BindingBoundary> {
2792        &self.binding
2793    }
2794
2795    // -------------------------------------------------------------------
2796    // Registration — unified `register()` (D030, Slice D)
2797    //
2798    // All node kinds (State / Producer / Derived / Dynamic / Operator)
2799    // funnel through `Core::register(NodeRegistration) -> NodeId`. Sugar
2800    // wrappers (`register_state` / `register_producer` / `register_derived`
2801    // / `register_dynamic` / `register_operator`) build a `NodeRegistration`
2802    // and delegate. There is no parallel registration path internally.
2803    // -------------------------------------------------------------------
2804
2805    /// Unified node registration (D030).
2806    ///
2807    /// `reg` describes the node's identity (deps + closure-form fn id OR
2808    /// typed-op + per-kind opts). The kind is **derived from the field
2809    /// shape**, not stored — see [`NodeKind`].
2810    ///
2811    /// Sugar wrappers below ([`Self::register_state`],
2812    /// [`Self::register_producer`], [`Self::register_derived`],
2813    /// [`Self::register_dynamic`], [`Self::register_operator`]) build the
2814    /// registration for the common kinds and delegate here. Direct callers
2815    /// that need uncommon combinations (e.g., a partial-true derived) can
2816    /// invoke this method directly.
2817    ///
2818    /// # Errors
2819    ///
2820    /// Errors are returned in evaluation order — earlier phases short-circuit
2821    /// later ones, so a single registration produces at most one variant.
2822    ///
2823    /// **Phase 1 — lock-released, side-effect-free validation:**
2824    /// - [`RegisterError::OperatorWithoutDeps`] — `reg` carries an op but
2825    ///   `deps` is empty. Operator nodes need at least one dep — for
2826    ///   subscription-managed combinators with no declared deps, use
2827    ///   [`Self::register_producer`] instead.
2828    /// - [`RegisterError::InitialOnlyForStateNodes`] — `reg.opts.initial`
2829    ///   is non-sentinel for a non-state shape (deps non-empty, or
2830    ///   fn_or_op present). State nodes are the only kind with an initial
2831    ///   cache.
2832    ///
2833    /// **Phase 2 — operator scratch construction (lock-released):**
2834    /// - [`RegisterError::OperatorSeedSentinel`] — `reg` carries `Op(Scan)`
2835    ///   / `Op(Reduce)` with a `NO_HANDLE` seed. R2.5.3 — stateful folders
2836    ///   must have a real seed.
2837    ///
2838    /// **Phase 3 — state-lock validation (folded with insertion under a
2839    /// single lock acquisition per /qa F1 to prevent TOCTOU between
2840    /// validation and `nodes.insert`):**
2841    /// - [`RegisterError::UnknownDep`] — any element of `reg.deps` is not
2842    ///   a registered node id.
2843    /// - [`RegisterError::TerminalDep`] — a dep is terminal (COMPLETE /
2844    ///   ERROR) AND not resubscribable — would create a permanent wedge.
2845    ///
2846    /// All errors are construction-time invariants — the dispatcher
2847    /// rejects the registration before any reactive state is created.
2848    /// On `Err`, no node has been added and any handle retains taken on
2849    /// the way in (operator scratch seed retains via
2850    /// [`BindingBoundary::retain_handle`]) have been released
2851    /// lock-released — see [`ScratchReleaseGuard`] for the RAII
2852    /// discipline that covers both early-return AND unwind paths.
2853    /// `Last { default }` retains its `default` handle on the same
2854    /// release path.
    // The body is intentionally one long function: validation and
    // insertion must share a single state-lock region (see Phase 3).
    #[allow(clippy::too_many_lines)]
    pub fn register(&self, reg: NodeRegistration) -> Result<NodeId, RegisterError> {
        let NodeRegistration {
            deps,
            fn_or_op,
            opts,
        } = reg;
        let NodeOpts {
            initial,
            equals,
            partial,
            is_dynamic,
            pausable,
            replay_buffer,
        } = opts;

        // Derive the field shape from fn_or_op + deps.
        // At most one of `fn_id` / `op` is `Some` after this match.
        let (fn_id, op) = match fn_or_op {
            Some(NodeFnOrOp::Fn(f)) => (Some(f), None),
            Some(NodeFnOrOp::Op(o)) => (None, Some(o)),
            None => (None, None),
        };

        // Phase 1 — lock-released, side-effect-free validation. Errors
        // here return BEFORE any handle retain is taken.
        //
        //   - State (no deps + no fn + no op) is the only kind with `initial`.
        //   - Dynamic flag only meaningful when fn + non-empty deps.
        //     (Not validated here — `is_dynamic` is stored as given.)
        //   - Operator (op present) must have deps (P9: operator without deps
        //     would skip activation — use a producer instead).
        let is_state_shape = deps.is_empty() && fn_id.is_none() && op.is_none();
        if op.is_some() && deps.is_empty() {
            return Err(RegisterError::OperatorWithoutDeps);
        }
        if initial != NO_HANDLE && !is_state_shape {
            return Err(RegisterError::InitialOnlyForStateNodes);
        }

        // Phase 2 — build per-operator scratch struct (may take handle
        // retains via `binding.retain_handle` for Scan/Reduce/Last seed).
        // Lock-released per Slice E (D045) handshake discipline. Returns
        // `OperatorSeedSentinel` BEFORE retain so an Err leaves no
        // dangling handles.
        let scratch = match op {
            Some(operator_op) => self.make_op_scratch(operator_op)?,
            None => None,
        };

        // Wrap scratch in an RAII guard immediately after Phase 2. From
        // here on, ANY early return / unwind path correctly releases the
        // scratch's handle retains via `OperatorScratch::release_handles`
        // (Slice H /qa F2 — defense against panics between Phase 2 and
        // Phase 3 cleanup branch). Lock-released because the guard is
        // declared BEFORE `lock_state()` below — variable destruction
        // order is reverse declaration order, so the `MutexGuard` drops
        // first on any return path.
        let scratch_guard = ScratchReleaseGuard::new(scratch, &*self.binding);

        // Phase 3 — state-lock-required validation, FOLDED with insertion
        // under a single `lock_state()` acquisition per /qa F1. The
        // pre-/qa version split this into two acquisitions (one for
        // validation, one for `alloc_node_id` + `nodes.insert`), opening
        // a TOCTOU window where a concurrent `Core::complete(dep)` on a
        // non-resubscribable dep could slip in and recreate the wedge
        // `TerminalDep` was designed to prevent. Single locked region
        // closes the gap.
        let mut s = self.lock_state();

        // First pass: every dep must exist. Kept separate from the
        // terminal check below so `UnknownDep` for ANY dep wins over
        // `TerminalDep` for an earlier one.
        for &dep in &deps {
            if !s.nodes.contains_key(&dep) {
                return Err(RegisterError::UnknownDep(dep));
            }
        }
        // Slice F audit (2026-05-07): mirror `set_deps`'s `TerminalDep`
        // rejection at registration time. Adding a non-resubscribable
        // terminal node as a dep at registration creates a permanent wedge.
        for &dep in &deps {
            // `require_node` cannot miss here: the pass above verified
            // every dep under this same lock acquisition.
            let dep_rec = s.require_node(dep);
            if dep_rec.terminal.is_some() && !dep_rec.resubscribable {
                return Err(RegisterError::TerminalDep(dep));
            }
        }

        // Validation passed — install. Take scratch out of the guard
        // (disarms the release-on-drop) and continue using `s`.
        let installed_scratch = scratch_guard.take();

        let id = s.alloc_node_id();

        // `tracked`: Static derived + Operator track all deps; Dynamic
        // starts empty and fills via fn return; State / Producer have no
        // deps so tracked is empty.
        let tracked: HashSet<usize> = if op.is_some() {
            (0..deps.len()).collect()
        } else if is_dynamic {
            HashSet::new()
        } else if fn_id.is_some() && !deps.is_empty() {
            // Static derived
            (0..deps.len()).collect()
        } else {
            HashSet::new()
        };

        // One record per declared dep, in declaration order.
        let dep_records: Vec<DepRecord> = deps.iter().map(|&d| DepRecord::new(d)).collect();

        // §10 perf (D047): compute topo_rank = 1 + max(dep ranks).
        // Dep-less nodes get rank 0. All deps were validated above, so
        // the `filter_map` drops nothing and `unwrap_or(0)` is only a
        // type-level fallback.
        let topo_rank = if deps.is_empty() {
            0
        } else {
            deps.iter()
                .filter_map(|&d| s.nodes.get(&d).map(|r| r.topo_rank))
                .max()
                .unwrap_or(0)
                .saturating_add(1)
        };

        let rec = NodeRecord {
            dep_records,
            fn_id,
            op,
            is_dynamic,
            equals,
            // Only state-shaped registrations can carry a non-sentinel
            // `initial` (enforced in Phase 1).
            cache: initial,
            // A node seeded with a real initial handle starts out marked
            // as having fired once.
            has_fired_once: initial != NO_HANDLE,
            subscribers: HashMap::new(),
            subscribers_revision: 0,
            tracked,
            dirty: false,
            involved_this_wave: false,
            pause_state: PauseState::Active,
            pausable,
            replay_buffer_cap: replay_buffer,
            replay_buffer: VecDeque::new(),
            terminal: None,
            has_received_teardown: false,
            // Always starts `false` here; `NodeOpts` carries no
            // resubscribable flag.
            resubscribable: false,
            meta_companions: Vec::new(),
            partial,
            topo_rank,
            received_mask: 0,
            involved_mask: 0,
            op_scratch: installed_scratch,
        };
        s.nodes.insert(id, rec);
        // Every node gets a (possibly empty) consumer set of its own,
        // then is linked in as a consumer of each of its deps.
        s.children.insert(id, HashSet::new());
        for &dep in &deps {
            s.children.entry(dep).or_default().insert(id);
        }
        // Slice Y1 (D3 / D090 — P12 fix, 2026-05-08): maintain partition
        // membership BEFORE dropping the state lock. Closes the
        // eventual-consistency window where a concurrent thread observed
        // the new node in `s.nodes` / new edges in `s.children` but the
        // registry hadn't unioned the partition yet. Today benign
        // (`partition_of` is debug-only); under Y1's wave engine
        // migration `lock_for(node)` consumes registry state on the hot
        // path, and the window means `lock_for` could resolve to a
        // partition that's been topologically unioned in `s.children`
        // but not yet in `registry`.
        //
        // **Lock-discipline invariant:** `state lock → registry mutex`
        // (one-way; never registry → state). Registry mutex is
        // uncontended in the X5 substrate — the only acquisition sites
        // are this one + `Core::set_deps` + the read-only accessors
        // `partition_count`/`partition_of` and (Y1+) `lock_for` — none
        // of which take the state lock — so the inner critical section
        // adds negligible latency.
        {
            let mut reg = self.registry.lock();
            reg.ensure_registered(id);
            for &dep in &deps {
                reg.union_nodes(id, dep);
            }
        }
        // Release the state lock before notifying topology listeners so
        // the event is fired lock-released.
        drop(s);
        self.fire_topology_event(&crate::topology::TopologyEvent::NodeRegistered(id));
        Ok(id)
    }
3032
3033    /// Sugar over [`Self::register`] — register a state node. `initial`
3034    /// may be [`NO_HANDLE`] to start sentinel.
3035    ///
3036    /// `partial` is accepted for surface consistency (D019); for state
3037    /// nodes it has no effect (state nodes don't fire fn).
3038    ///
3039    /// # Errors
3040    ///
3041    /// State registration is structurally simple — no deps, no op — so
3042    /// the only reachable variant is none in practice. Returns
3043    /// [`Result`] for surface consistency with [`Self::register`].
3044    pub fn register_state(
3045        &self,
3046        initial: HandleId,
3047        partial: bool,
3048    ) -> Result<NodeId, RegisterError> {
3049        self.register(NodeRegistration {
3050            deps: Vec::new(),
3051            fn_or_op: None,
3052            opts: NodeOpts {
3053                initial,
3054                partial,
3055                ..NodeOpts::default()
3056            },
3057        })
3058    }
3059
3060    /// Sugar over [`Self::register`] — register a producer node (D031,
3061    /// Slice D). No deps; fn fires once on first subscribe; cleanup runs
3062    /// via [`BindingBoundary::producer_deactivate`] when the last
3063    /// subscriber unsubscribes.
3064    ///
3065    /// The fn body uses the binding's `ProducerCtx`-equivalent helper
3066    /// (see `graphrefly-operators::producer`) to subscribe to other Core
3067    /// nodes — the zip / concat / race / takeUntil pattern.
3068    ///
3069    /// # Errors
3070    ///
3071    /// Producer registration has no user-supplied deps, so structurally
3072    /// none of [`RegisterError`]'s variants are reachable. Returns
3073    /// [`Result`] for surface consistency with [`Self::register`].
3074    pub fn register_producer(&self, fn_id: FnId) -> Result<NodeId, RegisterError> {
3075        self.register(NodeRegistration {
3076            deps: Vec::new(),
3077            fn_or_op: Some(NodeFnOrOp::Fn(fn_id)),
3078            opts: NodeOpts {
3079                // Producers have no deps — the first-run gate is degenerate.
3080                partial: true,
3081                ..NodeOpts::default()
3082            },
3083        })
3084    }
3085
3086    /// Sugar over [`Self::register`] — register a derived (static) node.
3087    /// `partial` controls the R2.5.3 first-run gate (D011).
3088    ///
3089    /// # Errors
3090    ///
3091    /// - [`RegisterError::UnknownDep`] — any element of `deps` is not
3092    ///   registered.
3093    /// - [`RegisterError::TerminalDep`] — a dep is terminal and not
3094    ///   resubscribable.
3095    pub fn register_derived(
3096        &self,
3097        deps: &[NodeId],
3098        fn_id: FnId,
3099        equals: EqualsMode,
3100        partial: bool,
3101    ) -> Result<NodeId, RegisterError> {
3102        self.register(NodeRegistration {
3103            deps: deps.to_vec(),
3104            fn_or_op: Some(NodeFnOrOp::Fn(fn_id)),
3105            opts: NodeOpts {
3106                equals,
3107                partial,
3108                ..NodeOpts::default()
3109            },
3110        })
3111    }
3112
3113    /// Sugar over [`Self::register`] — register a dynamic node (fn
3114    /// declares its actually-tracked dep indices per fire). `partial`
3115    /// controls the R2.5.3 first-run gate (D011).
3116    ///
3117    /// # Errors
3118    ///
3119    /// - [`RegisterError::UnknownDep`] — any element of `deps` is not
3120    ///   registered.
3121    /// - [`RegisterError::TerminalDep`] — a dep is terminal and not
3122    ///   resubscribable.
3123    pub fn register_dynamic(
3124        &self,
3125        deps: &[NodeId],
3126        fn_id: FnId,
3127        equals: EqualsMode,
3128        partial: bool,
3129    ) -> Result<NodeId, RegisterError> {
3130        self.register(NodeRegistration {
3131            deps: deps.to_vec(),
3132            fn_or_op: Some(NodeFnOrOp::Fn(fn_id)),
3133            opts: NodeOpts {
3134                equals,
3135                partial,
3136                is_dynamic: true,
3137                ..NodeOpts::default()
3138            },
3139        })
3140    }
3141
3142    /// Build a fresh [`OperatorScratch`](crate::op_state::OperatorScratch)
3143    /// box for an operator variant, taking any required handle retains.
3144    /// Shared between `register_operator` (initial install),
3145    /// `reset_for_fresh_lifecycle` (resubscribable terminal-cycle
3146    /// re-install), and Phase G of `Subscription::Drop` (D-α
3147    /// resubscribable + non-terminal deactivate re-install).
3148    ///
3149    /// # Errors
3150    ///
3151    /// Returns [`RegisterError::OperatorSeedSentinel`] if `op` is `Scan`
3152    /// / `Reduce` with a [`NO_HANDLE`] seed (R2.5.3 — stateful folders
3153    /// must have a real seed). Refcount discipline: the seed-sentinel
3154    /// check happens BEFORE [`BindingBoundary::retain_handle`], so an
3155    /// `Err` leaves no handles dangling.
3156    fn make_op_scratch(
3157        &self,
3158        op: OperatorOp,
3159    ) -> Result<Option<Box<dyn crate::op_state::OperatorScratch>>, RegisterError> {
3160        Self::make_op_scratch_with_binding(&*self.binding, op)
3161    }
3162
3163    /// Associated-function variant of [`Self::make_op_scratch`] that
3164    /// takes the binding explicitly. Used by call sites that have a
3165    /// `&dyn BindingBoundary` but not a `&Core` (notably
3166    /// [`Subscription::Drop`]'s Phase G, which holds only a
3167    /// `Weak<Mutex<CoreState>>` and operates on `s.binding`).
3168    pub(crate) fn make_op_scratch_with_binding(
3169        binding: &dyn BindingBoundary,
3170        op: OperatorOp,
3171    ) -> Result<Option<Box<dyn crate::op_state::OperatorScratch>>, RegisterError> {
3172        use crate::op_state::{
3173            DistinctState, LastState, PairwiseState, ReduceState, ScanState, SkipState, TakeState,
3174            TakeWhileState,
3175        };
3176        // Slice H (2026-05-07): Scan/Reduce seed-sentinel checks happen
3177        // BEFORE retain_handle so an Err return leaves no handles dangling.
3178        //
3179        // Slice H /qa F13 (2026-05-07): for retaining variants, allocate
3180        // the `Box<State>` BEFORE calling `binding.retain_handle`. If
3181        // `Box::new` panics (e.g., OOM-as-panic), no retain has happened
3182        // yet — no leak. If `retain_handle` panics after Box succeeds,
3183        // the `Box<State>` is dropped on unwind; State has no handle yet
3184        // (we haven't touched the registry refcount), so still no leak.
3185        // Caller wraps the returned scratch in `ScratchReleaseGuard` to
3186        // cover panics AFTER make_op_scratch returns.
3187        match op {
3188            OperatorOp::Scan { seed, .. } => {
3189                if seed == NO_HANDLE {
3190                    return Err(RegisterError::OperatorSeedSentinel);
3191                }
3192                let state = Box::new(ScanState { acc: seed });
3193                binding.retain_handle(seed);
3194                Ok(Some(state))
3195            }
3196            OperatorOp::Reduce { seed, .. } => {
3197                if seed == NO_HANDLE {
3198                    return Err(RegisterError::OperatorSeedSentinel);
3199                }
3200                let state = Box::new(ReduceState { acc: seed });
3201                binding.retain_handle(seed);
3202                Ok(Some(state))
3203            }
3204            OperatorOp::DistinctUntilChanged { .. } => Ok(Some(Box::new(DistinctState::default()))),
3205            OperatorOp::Pairwise { .. } => Ok(Some(Box::new(PairwiseState::default()))),
3206            OperatorOp::Take { .. } => Ok(Some(Box::new(TakeState::default()))),
3207            OperatorOp::Skip { .. } => Ok(Some(Box::new(SkipState::default()))),
3208            OperatorOp::TakeWhile { .. } => Ok(Some(Box::new(TakeWhileState))),
3209            OperatorOp::Last { default } => {
3210                let state = Box::new(LastState {
3211                    latest: NO_HANDLE,
3212                    default,
3213                });
3214                if default != NO_HANDLE {
3215                    binding.retain_handle(default);
3216                }
3217                Ok(Some(state))
3218            }
3219            OperatorOp::TapFirst { .. } => {
3220                Ok(Some(Box::new(crate::op_state::TapFirstState::default())))
3221            }
3222            OperatorOp::Settle { .. } => {
3223                Ok(Some(Box::new(crate::op_state::SettleState::default())))
3224            }
3225            OperatorOp::Map { .. }
3226            | OperatorOp::Filter { .. }
3227            | OperatorOp::Combine { .. }
3228            | OperatorOp::WithLatestFrom { .. }
3229            | OperatorOp::Merge
3230            | OperatorOp::Tap { .. }
3231            | OperatorOp::Valve => Ok(None),
3232        }
3233    }
3234
3235    /// Sugar over [`Self::register`] — register a built-in operator node
3236    /// (Slice C-1, D009; D026 generic scratch). The operator dispatch path
3237    /// lives in `fire_operator`; `op` selects which per-operator FFI
3238    /// method on [`BindingBoundary`] gets called per fire.
3239    ///
3240    /// For stateful operators ([`OperatorOp::Scan`] / [`Reduce`] /
3241    /// [`Last`] with a default), the seed/default handle is captured
3242    /// into the appropriate
3243    /// [`OperatorScratch`](crate::op_state::OperatorScratch) struct
3244    /// stored at [`NodeRecord::op_scratch`], and Core takes one retain
3245    /// share via [`BindingBoundary::retain_handle`].
3246    ///
3247    /// # Errors
3248    ///
3249    /// - [`RegisterError::OperatorWithoutDeps`] — `deps` is empty (use
3250    ///   [`Self::register_producer`] instead).
3251    /// - [`RegisterError::OperatorSeedSentinel`] — `op` is
3252    ///   [`OperatorOp::Scan`] / [`OperatorOp::Reduce`] with a
3253    ///   [`NO_HANDLE`] seed.
3254    /// - [`RegisterError::UnknownDep`] — any element of `deps` is not
3255    ///   registered.
3256    /// - [`RegisterError::TerminalDep`] — a dep is terminal and not
3257    ///   resubscribable.
3258    pub fn register_operator(
3259        &self,
3260        deps: &[NodeId],
3261        op: OperatorOp,
3262        opts: OperatorOpts,
3263    ) -> Result<NodeId, RegisterError> {
3264        self.register(NodeRegistration {
3265            deps: deps.to_vec(),
3266            fn_or_op: Some(NodeFnOrOp::Op(op)),
3267            opts: NodeOpts {
3268                equals: opts.equals,
3269                partial: opts.partial,
3270                ..NodeOpts::default()
3271            },
3272        })
3273    }
3274
3275    // -------------------------------------------------------------------
3276    // Subscription
3277    // -------------------------------------------------------------------
3278
3279    /// Subscribe a sink to a node. Returns a [`Subscription`] handle —
3280    /// dropping the handle unsubscribes the sink. Per §10.12, no manual
3281    /// `unsubscribe(node, id)` call is required.
3282    ///
3283    /// Push-on-subscribe (R1.2.3, R2.2.3 step 4): the sink is registered AFTER
3284    /// the START handshake fires. The handshake contents depend on node
3285    /// state:
3286    /// - Sentinel cache + live (non-terminal): `[START]`
3287    /// - Cached + live: `[START, DATA(handle)]`
3288    ///
3289    /// Subscribe-after-terminal semantics (canonical R2.2.7.a / R2.2.7.b,
3290    /// D118 2026-05-10):
3291    /// - **Resubscribable + terminal** (any TEARDOWN state): the subscribe
3292    ///   call first **resets** the node — clears `terminal`,
3293    ///   `has_fired_once`, `has_received_teardown`, all `dep_handles` to
3294    ///   `NO_HANDLE`, all `dep_terminals` to `None`, drains the pause
3295    ///   lockset, clears the replay buffer. The new subscriber receives a
3296    ///   fresh `[START]` (cache survives for state nodes per R2.2.8;
3297    ///   sentinel for compute). The `wipe_ctx` cleanup hook fires
3298    ///   lock-released so binding-side `ctx.store` starts fresh.
3299    /// - **Non-resubscribable + terminal** (any TEARDOWN state): the
3300    ///   subscribe is rejected — `try_subscribe` returns
3301    ///   [`SubscribeError::TornDown`]; this method (the panic-on-error
3302    ///   variant) panics with the diagnostic.
3303    ///
3304    /// Activation (R2.2.3 step 5): if this is the first subscriber and the
3305    /// node is a derived/dynamic compute, recursively activate deps so their
3306    /// cached handles fill our `dep_handles`.
3307    ///
3308    /// # Panics
3309    ///
3310    /// Panics if:
3311    /// - Subscribing would violate the Phase H+ ascending partition-order
3312    ///   invariant ([`SubscribeError::PartitionOrderViolation`]).
3313    /// - The node is non-resubscribable AND has terminated
3314    ///   ([`SubscribeError::TornDown`], R2.2.7.b).
3315    #[allow(clippy::needless_pass_by_value)] // Sink is `Arc<dyn Fn>`; we clone for the subscribers map and call it directly. Taking by value matches the ergonomics callers expect.
3316    pub fn subscribe(&self, node_id: NodeId, sink: Sink) -> Subscription {
3317        match self.try_subscribe(node_id, sink) {
3318            Ok(sub) => sub,
3319            Err(e) => panic!("{e}"),
3320        }
3321    }
3322
    /// Fallible subscribe: install `sink` on `node_id`, deliver the
    /// push-on-subscribe handshake, activate the node if this is its first
    /// subscriber and it is not a state node, and return the
    /// [`Subscription`] handle.
    ///
    /// Used by `subscribe` (which turns every error into a panic) and by
    /// producer-pattern operator sinks (which match on the variant).
    ///
    /// # Errors
    ///
    /// - Partition order violation (Phase H+ STRICT, D115) — propagated
    ///   from `partition_wave_owner_lock_arc`; the caller defers.
    /// - [`SubscribeError::TornDown`] — the node is non-resubscribable and
    ///   already terminal (R2.2.7.b, D118); the caller skips the source.
    ///
    /// # Panics
    ///
    /// If `sink` panics during the handshake, the sink is removed from the
    /// node's subscriber map and the same panic is resumed at the call
    /// site.
    // `sink` is only cloned below, never consumed — which is what the lint
    // flags.
    #[allow(clippy::needless_pass_by_value)]
    pub fn try_subscribe(
        &self,
        node_id: NodeId,
        sink: Sink,
    ) -> Result<Subscription, SubscribeError> {
        // Subscribe protocol (Slice E rework, post-handshake-reentry-lift):
        //
        // 1. Acquire `wave_owner` first (re-entrant; same-thread passes
        //    through, cross-thread blocks). This is the cross-thread
        //    serialization point that preserves R1.3.5.a happens-after
        //    ordering across the lock-released handshake fire.
        // 2. Acquire state lock briefly: reject or reset a terminal node,
        //    alloc sub_id, snapshot handshake state, install sink in
        //    `subscribers`. Drop state lock.
        // 3. Fire handshake LOCK-RELEASED. Per-tier slices (R1.3.5.a):
        //    `[Start]` / `[Data(cache)]?` / one `[Data(h)]` per replayed
        //    buffer entry. Terminal tiers are never part of the handshake
        //    any more (see the F5 note below). Sink callbacks may re-enter
        //    Core freely — same-thread re-entry passes through
        //    `wave_owner` reentrantly.
        // 4. Run activation under `run_wave_for` if needed (first
        //    subscriber on a non-state node).
        // 5. Drop `wave_owner`, then drain deferred producer ops.
        //
        // Race-fix discipline: the sink is installed in `subscribers`
        // BEFORE the state lock drops, so concurrent threads that
        // acquire `wave_owner` after our scope see the sink already
        // registered. Cross-thread emits block on `wave_owner` until
        // we drop it, ensuring all our handshake calls land before
        // any concurrent wave's flush observes the sink.

        // Acquire the partition's `wave_owner` first — cross-thread
        // serialization point. Per Slice Y1 / Phase E (2026-05-08),
        // subscribe routes through the per-partition lock instead of
        // a Core-global one. Subscribe touches only `node_id`'s
        // partition (activation cascade stays within the partition
        // because dep edges are unioned). `partition_wave_owner_lock_arc`
        // does retry-validate against concurrent union/split.
        // `lock_arc()` is `!Send`; same-thread reentrant.
        let wave_guard = self.partition_wave_owner_lock_arc(node_id)?;

        let (sub_id, tier_slices, needs_activation, did_reset) = {
            let mut s = self.lock_state();

            // R2.2.7.b (D118, 2026-05-10): non-resubscribable + terminal →
            // reject. The stream is permanently over; subscribe gets a
            // clean error rather than a confusing replay of past events.
            // Operators (zip / concat / race / ...) match on the variant
            // and skip the source. Drop the wave_owner before returning so
            // a concurrent waiter can proceed.
            //
            // The `has_received_teardown` flag is irrelevant here —
            // `terminal.is_some()` alone gates rejection. The auto-TEARDOWN
            // cascade in R2.6.4 / Lock 6.F means torn_down lags terminal
            // by at most one wave anyway; a brief mid-wave window where
            // `terminal.is_some() && !torn_down` is reachable but the
            // rejection decision doesn't depend on which side of that
            // window we're in.
            let should_reject = {
                let rec = s.require_node(node_id);
                !rec.resubscribable && rec.terminal.is_some()
            };
            if should_reject {
                // Release both locks explicitly, state lock first, before
                // handing the error back.
                drop(s);
                drop(wave_guard);
                return Err(SubscribeError::TornDown { node: node_id });
            }

            let sub_id = s.alloc_sub_id();

            // R2.2.7.a (D118, 2026-05-10): resubscribable + terminal → reset
            // to fresh lifecycle, regardless of TEARDOWN state. The prior
            // `!has_received_teardown` guard (Slice A+B F3) conflated
            // "TEARDOWN is the cleanup signal of the previous activation
            // cycle" with "permanent destruction" — corrected per the
            // canonical-spec amendment. `reset_for_fresh_lifecycle` clears
            // `has_received_teardown` along with `terminal`,
            // `has_fired_once`, dep_records sentinels, pause lockset, and
            // replay buffer. `wipe_ctx` fires lock-released after the
            // state lock drops so the binding's `ctx.store` starts fresh.
            let needs_reset = {
                let rec = s.require_node(node_id);
                rec.resubscribable && rec.terminal.is_some()
            };
            if needs_reset {
                self.reset_for_fresh_lifecycle(&mut s, node_id);
            }

            // Snapshot handshake state under lock.
            //
            // F5 (/qa 2026-05-10): post-D118 R2.2.7.a/b, the snapshot
            // ALWAYS sees a non-terminal node here — `should_reject`
            // already rejected non-resubscribable terminal above, and
            // `needs_reset` cleared resubscribable terminal back to
            // `terminal = None` / `has_received_teardown = false`.
            // The pre-D118 terminal-replay + teardown-replay branches
            // were dead code in this post-D118 sequence and are
            // removed.
            let (cache, is_state, first_subscriber) = {
                let rec = s.require_node(node_id);
                debug_assert!(
                    rec.terminal.is_none(),
                    "R2.2.7.a/b invariant: post-reject/reset, terminal must be None"
                );
                debug_assert!(
                    !rec.has_received_teardown,
                    "R2.2.7.a invariant: reset clears has_received_teardown"
                );
                // `first_subscriber` is sampled BEFORE our own sink is
                // inserted further down.
                (rec.cache, rec.is_state(), rec.subscribers.is_empty())
            };

            // Build per-tier handshake slices. Each non-empty slice is
            // fired as a separate sink call (R1.3.5.a tier-split).
            let mut tier_slices: SmallVec<[Vec<Message>; 4]> = SmallVec::new();
            tier_slices.push(vec![Message::Start]);
            if cache != NO_HANDLE {
                tier_slices.push(vec![Message::Data(cache)]);
            }
            // Slice E1 (R2.6.5 / Lock 6.G): replay buffered DATA after
            // [Start] (and the cache slice, if present). Each buffered
            // handle becomes a separate per-tier slice so late subscribers
            // see the historical Data sequence as distinct sink calls.
            // Nodes without a replay cap (`None` or `0`) replay nothing.
            //
            // Dedupe: when a cache slice is present and the buffer's last
            // entry is the same handle (the typical case — cache always
            // tracks the last DATA emitted, and the buffer's tail entry
            // is that same DATA), skip the last buffer entry to avoid
            // delivering Data(cache) twice. For state nodes whose cache
            // survives unsubscribe, the buffer may have older entries
            // the cache doesn't reflect; the dedupe only drops the
            // single trailing entry that equals cache. (QA A1, 2026-05-07)
            let replay_handles: Vec<HandleId> = {
                let rec = s.require_node(node_id);
                let cap = rec.replay_buffer_cap.unwrap_or(0);
                if cap == 0 {
                    Vec::new()
                } else {
                    let mut v: Vec<HandleId> = rec.replay_buffer.iter().copied().collect();
                    if cache != NO_HANDLE && v.last() == Some(&cache) {
                        v.pop();
                    }
                    v
                }
            };
            for h in &replay_handles {
                tier_slices.push(vec![Message::Data(*h)]);
            }

            // Install sink BEFORE dropping state lock so any thread that
            // subsequently acquires `wave_owner` (after our scope ends)
            // sees the sink already registered.
            //
            // Slice X4 / D2: bump `subscribers_revision` alongside the
            // insert so a pending_notify entry opened earlier in the same
            // wave (e.g. inside `batch(|| { emit(s, h1); subscribe(s,
            // late); emit(s, h2); })`) starts a fresh `PendingBatch` on
            // its next `queue_notify` push — making the new sink visible
            // to subsequent emits' flush slices, while the pre-subscribe
            // batch's snapshot stays frozen so we don't double-deliver
            // earlier emits via the wave's flush AND the new sub's
            // handshake replay.
            {
                let rec = s.require_node_mut(node_id);
                rec.subscribers.insert(sub_id, sink.clone());
                rec.subscribers_revision = rec.subscribers_revision.wrapping_add(1);
            }

            let needs_activation = first_subscriber && !is_state;
            (sub_id, tier_slices, needs_activation, needs_reset)
            // state lock drops here
        };

        // Slice E2 (R2.4.6 / D055): on resubscribable terminal reset, fire
        // `wipe_ctx` LOCK-RELEASED so the binding drops its `NodeCtxState`
        // entry (clearing both `store` and any residual `current_cleanup`).
        // The new subscriber's first invoke_fn sees a fresh empty store.
        // Fires AFTER the state lock drops so the binding's
        // `node_ctx.lock()` can't deadlock against Core's state lock — and
        // BEFORE the handshake so the wipe is observable before any
        // user-visible interaction with the new lifecycle.
        if did_reset {
            self.binding.wipe_ctx(node_id);
        }

        // Fire handshake LOCK-RELEASED. Sink may re-enter Core; same-
        // thread re-entry passes through `wave_owner` reentrantly.
        // Cross-thread emits block at `wave_owner` until our scope ends.
        //
        // A7 (Slice F, 2026-05-07): per-tier slice fire is wrapped in
        // `catch_unwind`. The sink is installed in `subscribers` BEFORE
        // the handshake fires (load-bearing — concurrent threads observe
        // the sink immediately). If a sink panics on tier N, the panic
        // would otherwise unwind out of `subscribe` BEFORE the
        // `Subscription` handle is constructed, leaving the sink
        // registered in `subscribers` with no user-held handle to drop.
        // Subsequent waves' `flush_notifications` would re-fire the
        // panicking sink forever.
        //
        // On panic: remove the sink from `subscribers` (via the
        // already-allocated `sub_id`), let `wave_guard` drop via RAII as
        // the unwind leaves this function, and resume the unwind so the
        // user observes the panic at the call site. Same effect as the
        // user dropping the `Subscription` immediately, but pre-emptive.
        for slice in &tier_slices {
            let sink_clone = sink.clone();
            let slice_ref: &[Message] = slice;
            let result = catch_unwind(AssertUnwindSafe(|| sink_clone(slice_ref)));
            if let Err(panic_payload) = result {
                // Remove the orphaned sink. Best-effort: if the node was
                // since torn down (e.g., the sink itself called teardown
                // before panicking), the entry may already be gone.
                {
                    let mut s = self.lock_state();
                    if let Some(rec) = s.nodes.get_mut(&node_id) {
                        rec.subscribers.remove(&sub_id);
                        // Slice X4 / D2: keep revision-tracked snapshot
                        // discipline consistent with the install site —
                        // any pending_notify entry that already absorbed
                        // the panicking sink under the post-install
                        // revision should start a fresh batch on its
                        // next queue_notify push.
                        rec.subscribers_revision = rec.subscribers_revision.wrapping_add(1);
                    }
                }
                // The state lock guard above is already out of scope
                // here, so the unwind resumes without holding it.
                std::panic::resume_unwind(panic_payload);
            }
        }

        // Run activation if needed. `run_wave_for(node_id)` acquires
        // only the partitions transitively touched from `node_id`
        // (downstream cascade + meta-companion teardown reach) — same-
        // partition activation re-enters reentrantly. Slice Y1 / Phase E.
        if needs_activation {
            self.run_wave_for(node_id, |this| {
                let mut s = this.lock_state();
                this.activate_derived(&mut s, node_id);
            });
        }

        // Phase H+ STRICT (D115): drop the wave_guard BEFORE draining
        // deferred producer ops. The deferred ops may need to subscribe
        // to sources in lower-numbered partitions — if wave_guard is
        // still held, the ascending-order check would reject them.
        drop(wave_guard);

        // Drain deferred producer ops now that no partitions are held
        // on this thread. The drain is a loop because each deferred op
        // may itself produce new deferred ops.
        self.drain_deferred_producer_ops();

        // The handle keeps only a weak reference to the shared state plus
        // the ids needed to find and remove this sink later.
        Ok(Subscription {
            state: Arc::downgrade(&self.state),
            node_id,
            sub_id,
        })
    }
3586
3587    /// Mark `node_id` as resubscribable per R2.2.7. Resubscribable nodes
3588    /// reset their terminal-lifecycle state on a fresh subscribe — see
3589    /// [`Self::subscribe`].
3590    ///
3591    /// Configuration call — must be made before the node has any active
3592    /// subscribers, since changing the policy mid-flight would surprise
3593    /// existing observers.
3594    ///
3595    /// # Panics
3596    ///
3597    /// Panics if the node has subscribers (the policy is observable
3598    /// behavior; changing it after the fact would change semantics for
3599    /// existing sinks).
3600    pub fn set_resubscribable(&self, node_id: NodeId, resubscribable: bool) {
3601        let mut s = self.lock_state();
3602        let rec = s.require_node_mut(node_id);
3603        assert!(
3604            rec.subscribers.is_empty(),
3605            "set_resubscribable: node already has subscribers; \
3606             configure resubscribable before any subscribe call"
3607        );
3608        rec.resubscribable = resubscribable;
3609    }
3610
3611    /// Reset a resubscribable node's terminal-lifecycle state. Called from
3612    /// `subscribe` when a late subscriber arrives at a flagged node.
3613    ///
3614    /// Released: terminal-slot retain (Error handle), all per-dep terminal
3615    /// retains (Error handles), all data_batch retains.
3616    /// Cleared: `terminal`, `has_fired_once`, `has_received_teardown`, all
3617    /// dep_records to sentinel, the pause lockset (any held locks are
3618    /// released — replay buffer drops silently because there are no
3619    /// subscribers to flush to).
3620    fn reset_for_fresh_lifecycle(&self, s: &mut CoreState, node_id: NodeId) {
3621        // Phase 1: collect wave-state handle releases + take the old
3622        // op_scratch + reset other state. Take all mutations under one
3623        // borrow so the post-borrow phases don't re-walk dep_records.
3624        let (prev_op, mut old_scratch, handles_to_release, pause_buffer_payloads) = {
3625            let rec = s.require_node_mut(node_id);
3626            let mut hs = Vec::new();
3627            if let Some(TerminalKind::Error(h)) = rec.terminal {
3628                hs.push(h);
3629            }
3630            for dr in &rec.dep_records {
3631                if let Some(TerminalKind::Error(h)) = dr.terminal {
3632                    hs.push(h);
3633                }
3634                for &h in &dr.data_batch {
3635                    hs.push(h);
3636                }
3637                // Slice C-3 /qa: also release `prev_data`. Prior to this
3638                // collection, `reset_for_fresh_lifecycle` overwrote
3639                // `dr.prev_data = NO_HANDLE` without releasing the old
3640                // handle, leaking one share per dep per resubscribable
3641                // cycle. The leak was masked because no test exercised
3642                // the per-dep `prev_data` retain across a lifecycle
3643                // reset; surfaced by the T1 tightening of
3644                // `last_releases_buffered_latest_on_lifecycle_reset`.
3645                if dr.prev_data != NO_HANDLE {
3646                    hs.push(dr.prev_data);
3647                }
3648            }
3649            // Take pause_state's buffer; collect its payload handles for
3650            // release (they were retained at queue_notify time; buffer
3651            // drops because the new subscriber starts fresh).
3652            let mut pulled = Vec::new();
3653            if let PauseState::Paused { ref mut buffer, .. } = rec.pause_state {
3654                for msg in buffer.drain(..) {
3655                    if let Some(h) = msg.payload_handle() {
3656                        pulled.push(h);
3657                    }
3658                }
3659            }
3660            // Slice E1: drain the replay buffer too — the new subscriber
3661            // gets a fresh lifecycle and shouldn't see prior emissions.
3662            for h in rec.replay_buffer.drain(..) {
3663                pulled.push(h);
3664            }
3665            // Reset wave / lifecycle state.
3666            rec.terminal = None;
3667            rec.has_fired_once = rec.cache != NO_HANDLE && rec.is_state();
3668            rec.has_received_teardown = false;
3669            for dr in &mut rec.dep_records {
3670                dr.prev_data = NO_HANDLE;
3671                dr.data_batch.clear();
3672                dr.terminal = None;
3673                dr.dirty = false;
3674                dr.involved_this_wave = false;
3675            }
3676            rec.pause_state = PauseState::Active;
3677            rec.involved_this_wave = false;
3678            rec.dirty = false;
3679            // §10.13 perf (D047): reset received_mask — fresh lifecycle
3680            // means all deps are sentinel again.
3681            rec.received_mask = 0;
3682            // §10.3 perf (Slice V1): reset involved_mask.
3683            rec.involved_mask = 0;
3684            // P7 (Slice A close /qa): Dynamic nodes clear `tracked` so
3685            // the post-reset first fire repopulates from the fn's
3686            // returned tracked-deps set.
3687            if rec.is_dynamic {
3688                rec.tracked.clear();
3689            }
3690            // Take the old scratch out so we can release its handles and
3691            // install a fresh one. Operator op is copied for the
3692            // rebuild step below.
3693            let prev_op = rec.op;
3694            let old = std::mem::take(&mut rec.op_scratch);
3695            (prev_op, old, hs, pulled)
3696        };
3697
3698        // Phase 2 (Slice C-3 /qa P1 — RETAIN-BEFORE-RELEASE ordering):
3699        // build the fresh scratch FIRST, taking new retains on any
3700        // seed/default handles. This must run BEFORE Phase 3 releases
3701        // the old scratch's shares — if old `acc` (Scan/Reduce) or old
3702        // `latest` (Last) aliases the new `seed`/`default` (common:
3703        // `fold(seed, x) == seed` interns to the same registry entry),
3704        // releasing the old share first could collapse the binding's
3705        // registry slot to zero (production bindings remove the value
3706        // entry on refcount-zero — see `tests/common/mod.rs:191-204`),
3707        // and a subsequent `retain_handle` on the new seed would bump a
3708        // refcount on a slot whose value has been removed. By taking
3709        // the new retains first, we floor the refcount at ≥1 before
3710        // any release happens.
3711        let new_scratch = match prev_op {
3712            // Slice H: the OperatorOp stored on NodeRecord previously
3713            // passed `make_op_scratch` validation at registration time
3714            // (RegisterError::OperatorSeedSentinel for Scan/Reduce
3715            // sentinel seeds; Last { default: NO_HANDLE } is accepted
3716            // and never errors). Re-running it here on the same op
3717            // value is structurally guaranteed to succeed.
3718            Some(op) => self
3719                .make_op_scratch(op)
3720                .expect("invariant: stored OperatorOp passed make_op_scratch validation at registration time"),
3721            None => None,
3722        };
3723
3724        // Phase 3: NOW release handles owned by the old op_scratch
3725        // (Scan/Reduce acc, Distinct/Pairwise prev, Last latest +
3726        // default). Safe per Phase 2's retain-first floor. The boxed
3727        // value is consumed and dropped after.
3728        if let Some(scratch) = old_scratch.as_mut() {
3729            scratch.release_handles(&*self.binding);
3730        }
3731        drop(old_scratch);
3732
3733        // Phase 3b (D-α, D028 full close, 2026-05-10): drain the
3734        // per-Core `pending_scratch_release` queue populated by Phase G
3735        // on prior resubscribable + non-terminal deactivate cycles.
3736        // Queued boxes' shares may alias the seed/default we just
3737        // retained in Phase 2 (e.g., when scan's `acc` interns to the
3738        // same registry slot as `seed`). Releasing AFTER Phase 2's
3739        // floor keeps the registry slot at refcount ≥1 throughout —
3740        // mirrors the Phase 2/3 retain-before-release ordering. Safe
3741        // even when the queue is empty (no prior deactivations
3742        // happened).
3743        let queued: Vec<Box<dyn crate::op_state::OperatorScratch>> =
3744            std::mem::take(&mut s.pending_scratch_release);
3745        for mut scratch in queued {
3746            scratch.release_handles(&*self.binding);
3747        }
3748
3749        // Phase 4: install the fresh scratch.
3750        {
3751            let rec = s.require_node_mut(node_id);
3752            rec.op_scratch = new_scratch;
3753        }
3754
3755        // Phase 5: release wave-state handles collected in phase 1.
3756        for h in handles_to_release {
3757            self.binding.release_handle(h);
3758        }
3759        for h in pause_buffer_payloads {
3760            self.binding.release_handle(h);
3761        }
3762    }
3763
3764    /// Activate `root` and any transitive uncached compute deps so their
3765    /// caches fill our dep_handles slots.
3766    ///
3767    /// Slice A close (M1): pure dep-walk + dep_handles population +
3768    /// pending_fires queueing. No `in_tick` management or `drain_and_flush`
3769    /// call — the outer caller (typically `Core::subscribe` via
3770    /// [`Core::run_wave`]) owns the wave lifecycle and drains lock-released
3771    /// around `invoke_fn`.
3772    ///
3773    /// Walk shape:
3774    ///   1. **Discover phase (DFS via Vec stack):** starting at `root`,
3775    ///      walk transitively-needing-activation deps via the `deps`
3776    ///      chain. Build an ordering where each node appears AFTER all
3777    ///      of its uncached compute deps — i.e., reverse topological
3778    ///      among the visited subgraph.
3779    ///   2. **Deliver phase (forward iteration):** for each visited
3780    ///      node in dep-first order, push deps' caches into the node's
3781    ///      `dep_handles` slots. Caches that were sentinel pre-walk are
3782    ///      filled because their parent's fn fires later in the wave's
3783    ///      drain loop and `commit_emission` propagates new caches forward
3784    ///      via `deliver_data_to_consumer` — the same path this method
3785    ///      uses for the initial seed. Adds the node to `pending_fires`
3786    ///      if its tracked-deps gate is satisfied; the wave-engine drain
3787    ///      fires the fn lock-released around `invoke_fn`.
3788    pub(crate) fn activate_derived(&self, s: &mut CoreState, root: NodeId) {
3789        // Phase 1: discover. DFS to collect every compute node reachable
3790        // via deps that doesn't yet have a cache and hasn't fired.
3791        // Record them in dep-first (post-order) so phase 2 can deliver
3792        // caches forward. Frame is `(node_id, finalize)` — `finalize=false`
3793        // means "first visit: push deps then re-push self with finalize=true";
3794        // `finalize=true` means "deps have been expanded, append self to
3795        // `order`."
3796        let mut visited: HashSet<NodeId> = HashSet::new();
3797        let mut order: Vec<NodeId> = Vec::new();
3798        let mut stack: Vec<(NodeId, bool)> = vec![(root, false)];
3799        while let Some((id, finalize)) = stack.pop() {
3800            if finalize {
3801                order.push(id);
3802                continue;
3803            }
3804            if !visited.insert(id) {
3805                continue;
3806            }
3807            stack.push((id, true));
3808            let dep_ids: Vec<NodeId> = s.require_node(id).dep_ids_vec();
3809            for dep_id in dep_ids {
3810                let (dep_is_state, dep_cache, dep_has_fired) = {
3811                    let dep_rec = s.require_node(dep_id);
3812                    (dep_rec.is_state(), dep_rec.cache, dep_rec.has_fired_once)
3813                };
3814                if !dep_is_state
3815                    && dep_cache == NO_HANDLE
3816                    && !dep_has_fired
3817                    && !visited.contains(&dep_id)
3818                {
3819                    stack.push((dep_id, false));
3820                }
3821            }
3822        }
3823
3824        // Phase 2: deliver caches in dep-first order. For each node, walk
3825        // its deps and call `deliver_data_to_consumer` for any with caches.
3826        // Producer nodes (no deps + has fn — Slice D, D031) have no deps
3827        // to walk; queue them directly into `pending_fires` so the wave
3828        // engine fires their fn once on activation.
3829        //
3830        // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_fires lives on
3831        // per-thread WaveState. State lock + thread_local borrow are
3832        // independent; deliver_data_to_consumer also writes pending_fires
3833        // via WaveState (no nested with_wave_state borrows here).
3834        for &id in &order {
3835            let (dep_ids, is_producer) = {
3836                let rec = s.require_node(id);
3837                (rec.dep_ids_vec(), rec.is_producer())
3838            };
3839            if is_producer {
3840                crate::batch::with_wave_state(|ws| {
3841                    ws.pending_fires.insert(id);
3842                });
3843                continue;
3844            }
3845            for (i, dep_id) in dep_ids.iter().copied().enumerate() {
3846                let dep_cache = s.require_node(dep_id).cache;
3847                if dep_cache != NO_HANDLE {
3848                    self.deliver_data_to_consumer(s, id, i, dep_cache);
3849                }
3850            }
3851        }
3852    }
3853
3854    // -------------------------------------------------------------------
3855    // Emission entry point
3856    // -------------------------------------------------------------------
3857
3858    /// Set a state node's value. Triggers a wave (DIRTY → DATA/RESOLVED →
3859    /// fn fires for downstream).
3860    ///
3861    /// Silent no-op if the node has already terminated (R1.3.4). The handle
3862    /// passed in is still released by the caller's binding-side intern path
3863    /// — no implicit retain is consumed when the call short-circuits.
3864    ///
3865    /// # Panics
3866    ///
3867    /// Panics if `node_id` is not a state node, or if `new_handle` is
3868    /// [`NO_HANDLE`] (per R1.2.4, sentinel is not a valid DATA payload).
3869    pub fn emit(&self, node_id: NodeId, new_handle: HandleId) {
3870        match self.try_emit(node_id, new_handle) {
3871            Ok(()) => {}
3872            Err(e) => panic!("{e}"),
3873        }
3874    }
3875
3876    /// Fallible emit. Returns `Err` on partition order violation
3877    /// (Phase H+ STRICT, D115). The public `emit` calls this and
3878    /// unwraps; `emit_or_defer` calls this and defers on Err.
3879    pub(crate) fn try_emit(
3880        &self,
3881        node_id: NodeId,
3882        new_handle: HandleId,
3883    ) -> Result<(), PartitionOrderViolation> {
3884        assert!(
3885            new_handle != NO_HANDLE,
3886            "NO_HANDLE is not a valid DATA payload (R1.2.4)"
3887        );
3888        // Validate + terminal short-circuit under a brief lock.
3889        //
3890        // emit() is valid for State and Producer nodes — both are
3891        // intrinsic sources whose values are not derived from declared
3892        // deps. State nodes get emit() from user code; Producer nodes
3893        // get emit() from sink callbacks the producer's build closure
3894        // registered (sink fires → re-enter Core → emit on self).
3895        // Derived / Dynamic / Operator nodes emit via their fn return
3896        // value through fire_fn / fire_operator, NOT via emit().
3897        {
3898            let s = self.lock_state();
3899            let rec = s.require_node(node_id);
3900            assert!(
3901                rec.is_state() || rec.is_producer(),
3902                "emit() is for state or producer nodes only; \
3903                 derived/dynamic/operator emit via their fn return value"
3904            );
3905            if rec.terminal.is_some() {
3906                drop(s);
3907                // Caller's intern share would otherwise leak; cache slot
3908                // ownership doesn't transfer because we're not advancing
3909                // cache. Released lock-released so the binding can't
3910                // deadlock against an internal binding mutex.
3911                self.binding.release_handle(new_handle);
3912                return Ok(());
3913            }
3914        }
3915        // Run wave on `node_id`'s touched partitions. Slice Y1 / Phase E:
3916        // emit cascades only via `s.children`, all unioned with `node_id`'s
3917        // partition by construction (dep edges = union edges). Common case
3918        // is a single-partition acquire — disjoint-partition emits run
3919        // truly parallel under per-partition `wave_owner`.
3920        self.try_run_wave_for(node_id, |this| {
3921            this.commit_emission(node_id, new_handle);
3922        })?;
3923        Ok(())
3924    }
3925
3926    /// Emit or defer to wave-end on partition order violation.
3927    /// For producer-pattern operator sinks. Retains `handle` on defer;
3928    /// the drain releases it after firing (or on discard).
3929    pub fn emit_or_defer(&self, node_id: NodeId, new_handle: HandleId) {
3930        if self.try_emit(node_id, new_handle).is_err() {
3931            self.binding.retain_handle(new_handle);
3932            self.push_deferred_producer_op(DeferredProducerOp::Emit {
3933                node_id,
3934                handle: new_handle,
3935            });
3936        }
3937    }
3938
3939    /// Read a node's current cache. Returns [`NO_HANDLE`] if sentinel.
3940    #[must_use]
3941    pub fn cache_of(&self, node_id: NodeId) -> HandleId {
3942        self.lock_state().require_node(node_id).cache
3943    }
3944
3945    /// Whether the node's fn has fired at least once (compute) OR it has had
3946    /// a non-sentinel value (state).
3947    #[must_use]
3948    pub fn has_fired_once(&self, node_id: NodeId) -> bool {
3949        self.lock_state().require_node(node_id).has_fired_once
3950    }
3951
3952    // -------------------------------------------------------------------
3953    // Read-side inspection helpers (Slice E+, M2)
3954    //
    // Non-panicking accessors for graph-layer introspection (`describe()`,
    // `observe()`, `node_count()`). The five id-taking accessors return
    // `None` / empty / `false` for unknown ids — they're meant to back walks
    // over `node_ids()` where the caller already knows the ids are valid,
    // plus debugging / dry-run probes that prefer "absence" over "panic".
3960    //
3961    // Keep these strictly read-only: no wave entry, no binding callbacks,
3962    // no lock release. Each takes the state lock once, copies a small
3963    // value, and drops the lock.
3964    // -------------------------------------------------------------------
3965
3966    /// Snapshot of every registered `NodeId` in unspecified order. The
3967    /// order matches `HashMap` iteration over the internal node table —
3968    /// callers that need stable ordering should track names at the
3969    /// `Graph` layer (canonical spec §3.5 namespace).
3970    #[must_use]
3971    pub fn node_ids(&self) -> Vec<NodeId> {
3972        self.lock_state().nodes.keys().copied().collect()
3973    }
3974
3975    /// Total number of nodes registered in this Core.
3976    #[must_use]
3977    pub fn node_count(&self) -> usize {
3978        self.lock_state().nodes.len()
3979    }
3980
3981    /// Returns `Some(kind)` for known nodes, `None` for unknown. Kind is
3982    /// **derived** from the field shape per D030 — see [`NodeKind`].
3983    #[must_use]
3984    pub fn kind_of(&self, node_id: NodeId) -> Option<NodeKind> {
3985        self.lock_state().nodes.get(&node_id).map(NodeRecord::kind)
3986    }
3987
3988    /// Snapshot of the node's deps in declaration order. Empty for
3989    /// unknown nodes or for state nodes (which have no deps).
3990    #[must_use]
3991    pub fn deps_of(&self, node_id: NodeId) -> Vec<NodeId> {
3992        self.lock_state()
3993            .nodes
3994            .get(&node_id)
3995            .map(NodeRecord::dep_ids_vec)
3996            .unwrap_or_default()
3997    }
3998
3999    /// Returns `Some(kind)` if the node has terminated (R1.3.4) — the
4000    /// pair `Some(Complete)` / `Some(Error(h))` mirrors the wire message
4001    /// the node emitted. `None` for live nodes or unknown ids.
4002    #[must_use]
4003    pub fn is_terminal(&self, node_id: NodeId) -> Option<TerminalKind> {
4004        self.lock_state()
4005            .nodes
4006            .get(&node_id)
4007            .and_then(|r| r.terminal)
4008    }
4009
4010    /// Whether the node has wave-scoped DIRTY pending (a tier-1 message
4011    /// queued but the matching tier-3 settle has not yet flushed).
4012    /// `false` for unknown ids. Mostly useful for `describe()` status
4013    /// classification (R3.6.1 `"dirty"`).
4014    #[must_use]
4015    pub fn is_dirty(&self, node_id: NodeId) -> bool {
4016        self.lock_state()
4017            .nodes
4018            .get(&node_id)
4019            .is_some_and(|r| r.dirty)
4020    }
4021
4022    /// Snapshot of `parent`'s meta companion list (R1.3.9.d / R2.3.3 —
4023    /// the companions added via [`Self::add_meta_companion`]). Empty
4024    /// for unknown ids or for nodes with no companions registered.
4025    ///
4026    /// Used by the graph layer's `signal_invalidate` to filter meta
4027    /// children out of the broadcast (canonical R3.7.2 — meta caches
4028    /// are preserved across graph-wide INVALIDATE).
4029    #[must_use]
4030    pub fn meta_companions_of(&self, parent: NodeId) -> Vec<NodeId> {
4031        self.lock_state()
4032            .nodes
4033            .get(&parent)
4034            .map(|r| r.meta_companions.clone())
4035            .unwrap_or_default()
4036    }
4037
4038    // -------------------------------------------------------------------
4039    // Wave engine — lives in `crate::batch` (Slice C-1 module split;
4040    // Slice A close M1 refactor lifted the binding-callback re-entrance
4041    // restrictions). The methods are still on `Core`; see `batch.rs` for:
4042    //
4043    //   - `run_wave` — wave entry, manages own locking.
4044    //   - `drain_and_flush` — drain phase, lock-released around invoke_fn.
4045    //   - `commit_emission` — lock-released around custom_equals.
4046    //   - `pick_next_fire`, `deliver_data_to_consumer`, `queue_notify`,
4047    //     `flush_notifications` — wave-engine helpers.
4048    // -------------------------------------------------------------------
4049}
4050
4051// -----------------------------------------------------------------------
4052// COMPLETE / ERROR — terminal lifecycle + auto-cascade gating
4053// -----------------------------------------------------------------------
4054
4055impl Core {
4056    /// Emit `[COMPLETE]` (R1.3.4) on `node_id`, marking it terminal. After
4057    /// this call:
4058    ///
4059    /// - Subsequent `Core::emit` on this node is a silent no-op (idempotent
4060    ///   termination).
4061    /// - The node's fn no longer fires.
4062    /// - The node's cache is preserved (last value still observable via
4063    ///   `cache_of`).
4064    /// - Children receive `[COMPLETE]` (tier 5 — bypasses pause buffer).
4065    /// - Auto-cascade gating (Lock 2.B): each child that has all of its
4066    ///   deps in a terminal state auto-emits its own `[COMPLETE]`. ERROR
4067    ///   dominates COMPLETE — if any of a child's deps emitted ERROR, the
4068    ///   child auto-cascades that ERROR instead.
4069    ///
4070    /// Idempotent: calling `complete` on an already-terminal node is a no-op.
4071    ///
4072    /// # Panics
4073    ///
4074    /// Panics if `node_id` is unknown.
4075    pub fn complete(&self, node_id: NodeId) {
4076        match self.try_complete(node_id) {
4077            Ok(()) => {}
4078            Err(e) => panic!("{e}"),
4079        }
4080    }
4081
4082    /// Fallible complete. Returns `Err` on partition order violation.
4083    pub(crate) fn try_complete(&self, node_id: NodeId) -> Result<(), PartitionOrderViolation> {
4084        self.try_emit_terminal(node_id, TerminalKind::Complete)
4085    }
4086
4087    /// Complete or defer to wave-end on partition order violation.
4088    /// For producer-pattern operator sinks.
4089    pub fn complete_or_defer(&self, node_id: NodeId) {
4090        match self.try_complete(node_id) {
4091            Ok(()) => {}
4092            Err(_) => {
4093                self.push_deferred_producer_op(DeferredProducerOp::Complete { node_id });
4094            }
4095        }
4096    }
4097
4098    /// Emit `[ERROR, error_handle]` (R1.3.4) on `node_id`. `error_handle`
4099    /// must resolve to a non-sentinel value (R1.2.5) — the binding side has
4100    /// already interned the error value before this call. Same lifecycle
4101    /// effects as [`Self::complete`]; ERROR dominates COMPLETE in auto-
4102    /// cascade gating.
4103    ///
4104    /// # Panics
4105    ///
4106    /// Panics if `node_id` is unknown or `error_handle == NO_HANDLE`.
4107    pub fn error(&self, node_id: NodeId, error_handle: HandleId) {
4108        match self.try_error(node_id, error_handle) {
4109            Ok(()) => {}
4110            Err(e) => panic!("{e}"),
4111        }
4112    }
4113
4114    /// Fallible error. Returns `Err` on partition order violation.
4115    pub(crate) fn try_error(
4116        &self,
4117        node_id: NodeId,
4118        error_handle: HandleId,
4119    ) -> Result<(), PartitionOrderViolation> {
4120        assert!(
4121            error_handle != NO_HANDLE,
4122            "NO_HANDLE is not a valid ERROR payload (R1.2.5)"
4123        );
4124        self.try_emit_terminal(node_id, TerminalKind::Error(error_handle))?;
4125        // The caller's intern share for `error_handle` is NOT transferred
4126        // to any slot — `terminate_node` takes its OWN retain for every
4127        // populated `terminal` and `dep_terminals` slot. Release the
4128        // caller's share here (mirrors `Core::emit`'s short-circuit
4129        // release on terminal). Without this, every `error()` call leaks
4130        // one binding-side handle ref. Slice A-bigger /qa item D fix.
4131        self.binding.release_handle(error_handle);
4132        Ok(())
4133    }
4134
4135    /// Error or defer to wave-end on partition order violation.
4136    /// For producer-pattern operator sinks. Retains `handle` on defer;
4137    /// the drain releases it after firing (or on discard).
4138    pub fn error_or_defer(&self, node_id: NodeId, error_handle: HandleId) {
4139        if self.try_error(node_id, error_handle).is_err() {
4140            self.binding.retain_handle(error_handle);
4141            self.push_deferred_producer_op(DeferredProducerOp::Error {
4142                node_id,
4143                handle: error_handle,
4144            });
4145        }
4146    }
4147
4148    fn try_emit_terminal(
4149        &self,
4150        node_id: NodeId,
4151        terminal: TerminalKind,
4152    ) -> Result<(), PartitionOrderViolation> {
4153        {
4154            let s = self.lock_state();
4155            assert!(s.nodes.contains_key(&node_id), "unknown node {node_id:?}");
4156        }
4157        // Wave on `node_id`'s touched partitions (Slice Y1 / Phase E).
4158        // COMPLETE / ERROR cascade follows `s.children` (in-partition by
4159        // union-find construction). The thunk acquires its own state lock
4160        // to queue the cascade.
4161        self.try_run_wave_for(node_id, |this| {
4162            let mut s = this.lock_state();
4163            this.terminate_node(&mut s, node_id, terminal);
4164        })
4165    }
4166
4167    /// Set the node's terminal slot, queue the wire message, and cascade to
4168    /// children. Idempotent on already-terminal node (no-op).
4169    ///
4170    /// Iterative implementation (Slice A-bigger, M1-close): a work-queue
4171    /// drives the cascade so deep linear chains don't overflow the OS
4172    /// thread stack. Mirrors `path_from_to`'s explicit-stack pattern.
4173    fn terminate_node(&self, s: &mut CoreState, node_id: NodeId, terminal: TerminalKind) {
4174        let mut work: Vec<(NodeId, TerminalKind)> = vec![(node_id, terminal)];
4175        while let Some((id, t)) = work.pop() {
4176            if s.require_node(id).terminal.is_some() {
4177                continue; // Idempotent — already terminal.
4178            }
4179            // Take a refcount share for the terminal slot so the error
4180            // handle outlives the binding-side intern's transient share.
4181            if let TerminalKind::Error(h) = t {
4182                self.binding.retain_handle(h);
4183            }
4184            // Slice E2 /qa Q2(b) (D069): if a resubscribable node is
4185            // terminating with no live subscribers, queue eager
4186            // `wipe_ctx` for the wave's lock-released drain. This is the
4187            // mutually-exclusive complement of the `Subscription::Drop`
4188            // wipe site: when the LAST sub drops first then terminate
4189            // fires, subs are empty here and we queue; when terminate
4190            // fires WITH subs still alive, we DON'T queue (subs not
4191            // empty), and `Subscription::Drop` will fire wipe directly
4192            // when those subs eventually drop. Either way, exactly one
4193            // wipe fires per terminal lifecycle.
4194            let queue_wipe = {
4195                let rec = s.require_node(id);
4196                rec.resubscribable && rec.subscribers.is_empty()
4197            };
4198            s.require_node_mut(id).terminal = Some(t);
4199            // Q-beyond Sub-slice 2 + 3 (D108, 2026-05-09): pending_fires
4200            // and pending_wipes both live on per-thread WaveState. Single
4201            // borrow handles the queue-wipe push and the pending_fires
4202            // remove.
4203            crate::batch::with_wave_state(|ws| {
4204                if queue_wipe {
4205                    ws.pending_wipes.push(id);
4206                }
4207                // Drain pending fires for this node — fn won't fire on a
4208                // terminal node.
4209                ws.pending_fires.remove(&id);
4210            });
4211            // R1.3.8.b / Slice F (A3, 2026-05-07): if this node was paused
4212            // when terminating (the canonical case is the R1.3.8.c overflow
4213            // ERROR synthesis path), drain the pause buffer and release
4214            // each payload's queue_notify-time retain. Without this, the
4215            // buffer leaks one share per buffered DATA/RESOLVED/INVALIDATE.
4216            // Subscribers receive the terminal directly via the cascade
4217            // below (tier-5 bypasses the pause buffer); the buffered
4218            // content is moot post-terminal.
4219            let drained: Vec<HandleId> = {
4220                let rec = s.require_node_mut(id);
4221                let mut drained: Vec<HandleId> = Vec::new();
4222                if rec.pause_state.is_paused() {
4223                    // Take the buffered messages out, then collapse the
4224                    // pause state to Active so subsequent code observes a
4225                    // clean lifecycle. Idempotent on Active (no-op).
4226                    let prev = std::mem::replace(&mut rec.pause_state, PauseState::Active);
4227                    if let PauseState::Paused { buffer, .. } = prev {
4228                        drained.extend(buffer.into_iter().filter_map(Message::payload_handle));
4229                    }
4230                }
4231                // QA A4 (2026-05-07): drain replay buffer on terminate. A
4232                // non-resubscribable terminal ends the lifecycle; without
4233                // this drain the buffer's retains leak until `Drop for
4234                // CoreState`. Resubscribable nodes' replay buffers are
4235                // also drained (when they're hit by a terminal cascade);
4236                // a fresh subscribe rebuilds the buffer from scratch as
4237                // part of `reset_for_fresh_lifecycle`.
4238                drained.extend(rec.replay_buffer.drain(..));
4239                drained
4240            };
4241            for h in drained {
4242                self.binding.release_handle(h);
4243            }
4244            // Queue the wire message (tier 5 — bypasses pause buffer).
4245            let msg = match t {
4246                TerminalKind::Complete => Message::Complete,
4247                TerminalKind::Error(h) => Message::Error(h),
4248            };
4249            self.queue_notify(s, id, msg);
4250            // Cascade to children.
4251            let child_ids: Vec<NodeId> = s
4252                .children
4253                .get(&id)
4254                .map(|c| c.iter().copied().collect())
4255                .unwrap_or_default();
4256            for child_id in child_ids {
4257                let dep_idx = s.require_node(child_id).dep_index_of(id);
4258                let Some(idx) = dep_idx else { continue };
4259                // Mark this child's per-dep terminal slot. Take a retain on
4260                // the error handle for the slot share.
4261                {
4262                    let child = s.require_node_mut(child_id);
4263                    if child.dep_records[idx].terminal.is_some() {
4264                        // Idempotent — child already saw this dep terminate.
4265                        continue;
4266                    }
4267                    child.dep_records[idx].terminal = Some(t);
4268                }
4269                if let TerminalKind::Error(h) = t {
4270                    self.binding.retain_handle(h);
4271                }
4272                // Auto-cascade gating: if all deps now terminal, push child
4273                // onto the work queue with the chosen terminal.
4274                //
4275                // Slice C-1: kinds that opt out of Lock 2.B (currently
4276                // `Operator(Reduce)`) intercept upstream COMPLETE so they
4277                // can emit their accumulator before terminating. Instead of
4278                // cascading, queue the child for fn-fire — `fire_operator`
4279                // sees `dep_records[0].terminal` set and emits the
4280                // appropriate batch (Data(acc) + Complete on COMPLETE,
4281                // Error(h) on ERROR).
4282                let action = {
4283                    let child = s.require_node(child_id);
4284                    if child.terminal.is_some() {
4285                        ChildAction::None // Already terminated — no-op.
4286                    } else if child.all_deps_terminal() {
4287                        if child.skips_auto_cascade() {
4288                            ChildAction::QueueFire
4289                        } else {
4290                            ChildAction::Cascade(pick_cascade_terminal(&child.dep_records))
4291                        }
4292                    } else {
4293                        ChildAction::None
4294                    }
4295                };
4296                match action {
4297                    ChildAction::None => {}
4298                    ChildAction::Cascade(t_child) => {
4299                        work.push((child_id, t_child));
4300                    }
4301                    ChildAction::QueueFire => {
4302                        // Q-beyond Sub-slice 2 (D108, 2026-05-09):
4303                        // pending_fires lives on per-thread WaveState.
4304                        crate::batch::with_wave_state(|ws| {
4305                            ws.pending_fires.insert(child_id);
4306                        });
4307                    }
4308                }
4309            }
4310        }
4311    }
4312}
4313
4314/// Outcome of Lock 2.B child gating in `terminate_node`'s cascade walk.
4315enum ChildAction {
4316    /// No cascade; child is already terminal or not yet all-deps-terminal.
4317    None,
4318    /// Auto-cascade with the picked terminal kind (ERROR dominates COMPLETE).
4319    Cascade(TerminalKind),
4320    /// Queue child for fn-fire instead of cascading. Used by operator
4321    /// kinds that intercept upstream terminal (Operator(Reduce)).
4322    QueueFire,
4323}
4324
4325/// Lock 2.B cascade-terminal selection: ERROR dominates COMPLETE; first
4326/// ERROR seen wins. Caller has already verified all deps are terminal.
4327fn pick_cascade_terminal(dep_records: &[DepRecord]) -> TerminalKind {
4328    for dr in dep_records {
4329        if let Some(TerminalKind::Error(h)) = dr.terminal {
4330            return TerminalKind::Error(h);
4331        }
4332    }
4333    TerminalKind::Complete
4334}
4335
4336// -----------------------------------------------------------------------
4337// TEARDOWN — destruction, with auto-COMPLETE prepend (R2.6.4 / Lock 6.F)
4338// -----------------------------------------------------------------------
4339
4340impl Core {
4341    /// Tear `node_id` down. Per R2.6.4 / Lock 6.F:
4342    ///
4343    /// - **Auto-prepend COMPLETE.** If the node has not yet emitted a
4344    ///   terminal (`COMPLETE` / `ERROR`), `terminate_node` is called with
4345    ///   `Complete` first so subscribers see `[COMPLETE, TEARDOWN]`, not
4346    ///   bare `[TEARDOWN]`. This guarantees a clean end-of-stream signal
4347    ///   to async iterators and other consumers that wait on terminal
4348    ///   delivery.
4349    /// - **Idempotent on duplicate delivery.** The per-node
4350    ///   `has_received_teardown` flag is set on the first call; subsequent
4351    ///   `teardown` calls (or cascade visits from other paths) are silent
4352    ///   no-ops — no second `[COMPLETE, TEARDOWN]` pair to subscribers.
4353    /// - **Cascade downstream.** Each child is recursively torn down. The
4354    ///   child's own COMPLETE auto-cascades from `terminate_node`'s logic
4355    ///   (Lock 2.B); its TEARDOWN comes from this cascade.
4356    ///
4357    /// # Panics
4358    ///
4359    /// Panics if `node_id` is unknown.
4360    pub fn teardown(&self, node_id: NodeId) {
4361        match self.try_teardown(node_id) {
4362            Ok(()) => {}
4363            Err(e) => panic!("{e}"),
4364        }
4365    }
4366
4367    /// Teardown or defer to wave-end on partition order violation.
4368    /// For producer-pattern operator sinks.
4369    pub fn teardown_or_defer(&self, node_id: NodeId) {
4370        match self.try_teardown(node_id) {
4371            Ok(()) => {}
4372            Err(_) => {
4373                self.push_deferred_producer_op(DeferredProducerOp::Callback(Box::new({
4374                    let core = self.clone();
4375                    move || {
4376                        core.teardown(node_id);
4377                    }
4378                })));
4379            }
4380        }
4381    }
4382
4383    fn try_teardown(&self, node_id: NodeId) -> Result<(), PartitionOrderViolation> {
4384        {
4385            let s = self.lock_state();
4386            assert!(s.nodes.contains_key(&node_id), "unknown node {node_id:?}");
4387        }
4388        let torn_down: Arc<Mutex<Vec<NodeId>>> = Arc::new(Mutex::new(Vec::new()));
4389        let torn_down_for_wave = torn_down.clone();
4390        // TEARDOWN cascade follows `s.children` AND `meta_companions`
4391        // (R1.3.9.d) — meta-companions can cross partitions. Slice Y1 /
4392        // Phase E `compute_touched_partitions(node_id)` (called by
4393        // `run_wave_for`) walks both axes so the wave acquires every
4394        // partition reachable via the cascade.
4395        self.try_run_wave_for(node_id, move |this| {
4396            let mut s = this.lock_state();
4397            let collected = this.teardown_inner(&mut s, node_id);
4398            torn_down_for_wave.lock().extend(collected);
4399        })?;
4400        // Fire NodeTornDown for every cascaded id (root + metas +
4401        // downstream consumers that auto-cascaded). Outside the state
4402        // lock, matching fire_topology_event discipline.
4403        let ids = std::mem::take(&mut *torn_down.lock());
4404        for id in ids {
4405            self.fire_topology_event(&crate::topology::TopologyEvent::NodeTornDown(id));
4406        }
4407        Ok(())
4408    }
4409
4410    /// Iterative teardown walk (Slice A-bigger, M1-close).
4411    ///
4412    /// The recursive shape was:
4413    ///   ```text
4414    ///   teardown(n):
4415    ///     if torn_down: return
4416    ///     mark torn_down
4417    ///     for meta in metas: teardown(meta)
4418    ///     terminate_node + queue Teardown
4419    ///     for child in children: teardown(child)
4420    ///   ```
4421    /// Deep linear chains (~10k nodes) overflowed the OS thread stack.
4422    ///
4423    /// The iterative shape uses a `Vec<Action>` stack with `Visit` and
4424    /// `EmitTeardown` actions. `Visit(n)` marks `n` torn-down (or no-ops
4425    /// if already), then pushes (in reverse order so LIFO pops in forward
4426    /// order) `Visit(child_K), …, Visit(child_1), EmitTeardown(n),
4427    /// Visit(meta_M), …, Visit(meta_1)`. The R1.3.9.d "metas first, then
4428    /// self, then children" ordering is preserved by the push order:
4429    /// metas pop first, recursively expand and emit; then `EmitTeardown(n)`
4430    /// pops and runs `terminate_node` + queue `Teardown`; then children
4431    /// pop. Idempotency via `has_received_teardown` keeps each node
4432    /// visited at most once even when multi-parent diamonds re-enter via
4433    /// a sibling path.
4434    fn teardown_inner(&self, s: &mut CoreState, root: NodeId) -> Vec<NodeId> {
4435        enum Action {
4436            Visit(NodeId),
4437            EmitTeardown(NodeId),
4438        }
4439        let mut stack: Vec<Action> = vec![Action::Visit(root)];
4440        // Topology accumulator: every node that actually emits TEARDOWN
4441        // (i.e. each `EmitTeardown(id)` site, NOT each `Visit` — visits
4442        // for already-torn-down nodes short-circuit on idempotency).
4443        let mut torn_down: Vec<NodeId> = Vec::new();
4444        while let Some(action) = stack.pop() {
4445            match action {
4446                Action::Visit(id) => {
4447                    if s.require_node(id).has_received_teardown {
4448                        continue; // Idempotent (R2.6.4).
4449                    }
4450                    s.require_node_mut(id).has_received_teardown = true;
4451                    // Push order: children first (pop LAST), then
4452                    // EmitTeardown(id), then metas (pop FIRST). Reverse
4453                    // each list so within-group order matches the original
4454                    // recursive iteration.
4455                    let children: Vec<NodeId> = s
4456                        .children
4457                        .get(&id)
4458                        .map(|c| c.iter().copied().collect())
4459                        .unwrap_or_default();
4460                    for &child in children.iter().rev() {
4461                        stack.push(Action::Visit(child));
4462                    }
4463                    stack.push(Action::EmitTeardown(id));
4464                    let metas: Vec<NodeId> = s.require_node(id).meta_companions.clone();
4465                    for &meta in metas.iter().rev() {
4466                        stack.push(Action::Visit(meta));
4467                    }
4468                }
4469                Action::EmitTeardown(id) => {
4470                    // Auto-prepend COMPLETE if not yet terminal. The (now
4471                    // iterative) terminate_node handles auto-cascade to
4472                    // children's own terminal slots per Lock 2.B.
4473                    let already_terminal = s.require_node(id).terminal.is_some();
4474                    if !already_terminal {
4475                        self.terminate_node(s, id, TerminalKind::Complete);
4476                    }
4477                    // Wire emission of the TEARDOWN itself (tier 6).
4478                    self.queue_notify(s, id, Message::Teardown);
4479                    torn_down.push(id);
4480                }
4481            }
4482        }
4483        torn_down
4484    }
4485
4486    /// Attach `companion` as a meta companion of `parent` per R1.3.9.d.
4487    /// Meta companions are nodes whose lifecycle is bound to the parent's
4488    /// in TEARDOWN ordering: when `parent` tears down, `companion` tears
4489    /// down first.
4490    ///
4491    /// Use this for inspection / audit / sidecar nodes that subscribe to
4492    /// parent state — without the ordering, the companion could observe
4493    /// the parent mid-destruction and emit garbage.
4494    ///
4495    /// Idempotent on duplicate registration of the same companion.
4496    ///
4497    /// # Lifecycle constraint
4498    ///
4499    /// Intended for **setup-time** wiring — call this before `parent` or
4500    /// `companion` enters a wave. Mid-wave registration (especially during
4501    /// a teardown cascade in flight) is implementation-defined: the new
4502    /// edge takes effect on the *next* wave. Adding a companion to a
4503    /// torn-down parent silently no-ops (the parent will not tear down
4504    /// again). For dynamic companion attachment with deterministic
4505    /// ordering, prefer constructing the wiring before subscribers exist.
4506    ///
4507    /// # Panics
4508    ///
4509    /// Panics if either node id is unknown, or if `parent == companion`
4510    /// (a node cannot be its own meta companion — would loop on TEARDOWN).
4511    pub fn add_meta_companion(&self, parent: NodeId, companion: NodeId) {
4512        assert!(parent != companion, "node cannot be its own meta companion");
4513        let mut s = self.lock_state();
4514        assert!(s.nodes.contains_key(&parent), "unknown parent {parent:?}");
4515        assert!(
4516            s.nodes.contains_key(&companion),
4517            "unknown companion {companion:?}"
4518        );
4519        let metas = &mut s.require_node_mut(parent).meta_companions;
4520        if !metas.contains(&companion) {
4521            metas.push(companion);
4522        }
4523    }
4524}
4525
4526// -----------------------------------------------------------------------
4527// INVALIDATE — cache clear + downstream cascade
4528// -----------------------------------------------------------------------
4529
4530impl Core {
4531    /// Clear `node_id`'s cache and cascade `[INVALIDATE]` to downstream
4532    /// dependents per canonical spec §1.4.
4533    ///
4534    /// Semantics:
4535    /// - **Never-populated case (R1.4 line 197):** if `cache == NO_HANDLE`,
4536    ///   the call is a no-op — no cache to clear, no INVALIDATE emitted.
4537    ///   This naturally provides idempotency within a wave: once a node has
4538    ///   been invalidated this wave (cache = NO_HANDLE), a second invalidate
4539    ///   on the same node does nothing.
4540    /// - **Cache clear (immediate):** the node's cached handle is dropped
4541    ///   (refcount released), `cache` becomes `NO_HANDLE`. State nodes
4542    ///   keep `has_fired_once` per spec — INVALIDATE is not a re-gating
4543    ///   event (the next emission to a previously-fired state still does
4544    ///   not re-trigger the first-run gate; that's a resubscribable-terminal
4545    ///   lifecycle concern, separate slice).
4546    /// - **Wire emission (tier 4):** `[INVALIDATE]` is queued via the
4547    ///   normal pause-aware notify path. Buffers while paused, flushes
4548    ///   immediately otherwise.
4549    /// - **Downstream cascade:** for each child of this node, the child's
4550    ///   `dep_handles[idx_of_node]` is reset to `NO_HANDLE` (its previous
4551    ///   value referenced a now-released handle). The child is then
4552    ///   recursively invalidated (no-op if its cache was already
4553    ///   `NO_HANDLE`). This re-closes the child's first-run gate — fn
4554    ///   won't fire again until the upstream re-emits a value.
4555    ///
4556    /// Wraps in a fresh wave when called from outside a wave, so
4557    /// notifications flush at the natural wave boundary.
4558    ///
4559    /// # Panics
4560    ///
4561    /// Panics if `node_id` is unknown, consistent with `emit` / `pause`.
4562    pub fn invalidate(&self, node_id: NodeId) {
4563        {
4564            let s = self.lock_state();
4565            assert!(s.nodes.contains_key(&node_id), "unknown node {node_id:?}");
4566        }
4567        // INVALIDATE cascade follows `s.children` (in-partition by union-
4568        // find construction). Slice Y1 / Phase E.
4569        self.run_wave_for(node_id, |this| {
4570            let mut s = this.lock_state();
4571            this.invalidate_inner(&mut s, node_id);
4572        });
4573    }
4574
4575    /// Invalidate or defer to wave-end on partition order violation.
4576    /// For producer-pattern operator sinks.
4577    ///
4578    /// # Panics
4579    ///
4580    /// Panics if `node_id` is not registered in this Core.
4581    pub fn invalidate_or_defer(&self, node_id: NodeId) {
4582        {
4583            let s = self.lock_state();
4584            assert!(s.nodes.contains_key(&node_id), "unknown node {node_id:?}");
4585        }
4586        let result = self.try_run_wave_for(node_id, |this| {
4587            let mut s = this.lock_state();
4588            this.invalidate_inner(&mut s, node_id);
4589        });
4590        if result.is_err() {
4591            self.push_deferred_producer_op(DeferredProducerOp::Callback(Box::new({
4592                let core = self.clone();
4593                move || {
4594                    core.invalidate(node_id);
4595                }
4596            })));
4597        }
4598    }
4599
4600    /// Iterative invalidate cascade (Slice A-bigger, M1-close).
4601    ///
4602    /// The recursive shape was a depth-first cache-clear walk:
4603    ///   ```text
4604    ///   invalidate(n):
4605    ///     if cache(n) == NO_HANDLE: return  // already-invalidated guard
4606    ///     cache(n) = NO_HANDLE; release handle
4607    ///     queue Invalidate(n)
4608    ///     for child in children:
4609    ///       child.dep_handles[idx] = NO_HANDLE
4610    ///       invalidate(child)
4611    ///   ```
4612    /// Deep linear chains overflowed the OS thread stack. The work-queue
4613    /// rewrite has no ordering subtleties (unlike teardown's R1.3.9.d
4614    /// metas-first constraint) — Invalidate is a tier-4 broadcast where
4615    /// the never-populated / already-invalidated guard provides natural
4616    /// idempotency for diamond fan-in.
4617    fn invalidate_inner(&self, s: &mut CoreState, root: NodeId) {
4618        let mut work: Vec<NodeId> = vec![root];
4619        while let Some(node_id) = work.pop() {
4620            // Never-populated / already-invalidated: no-op (R1.4 idempotency).
4621            // Per R1.3.9.c never-populated case, OnInvalidate cleanup hook
4622            // also does NOT fire — natural fallout of skipping via the
4623            // cache==NO_HANDLE guard (we never reach the queue-push below).
4624            let old_handle = s.require_node(node_id).cache;
4625            if old_handle == NO_HANDLE {
4626                continue;
4627            }
4628            // Clear cache + release the handle's slot ownership.
4629            s.require_node_mut(node_id).cache = NO_HANDLE;
4630            self.binding.release_handle(old_handle);
4631            // Slice E2 (R1.3.9.b strict per D057 + D058 fire-at-cache-clear):
4632            // queue OnInvalidate cleanup hook for lock-released drain at
4633            // wave-end. The dedup set guarantees at-most-once-per-wave-per-
4634            // node firing even if a node re-populates mid-wave (via fn-fire
4635            // emit) and gets re-invalidated through a separate path. Pure
4636            // cache==NO_HANDLE idempotency (above) catches "still at
4637            // sentinel" only; the explicit set is the strict R1.3.9.b
4638            // reading.
4639            // Q-beyond Sub-slice 3 (D108, 2026-05-09):
4640            // `invalidate_hooks_fired_this_wave` and
4641            // `deferred_cleanup_hooks` both live on per-thread WaveState.
4642            // Single borrow handles the dedup-insert and (on first
4643            // insertion) the cleanup-hook push.
4644            crate::batch::with_wave_state(|ws| {
4645                if ws.invalidate_hooks_fired_this_wave.insert(node_id) {
4646                    ws.deferred_cleanup_hooks
4647                        .push((node_id, CleanupTrigger::OnInvalidate));
4648                }
4649            });
4650            // Wire emission. Pause-aware via queue_notify.
4651            self.queue_notify(s, node_id, Message::Invalidate);
4652            // Cascade: for each child, clear the dep record's prev_data
4653            // referencing this node and push child onto the work queue.
4654            let child_ids: Vec<NodeId> = s
4655                .children
4656                .get(&node_id)
4657                .map(|c| c.iter().copied().collect())
4658                .unwrap_or_default();
4659            for child_id in child_ids {
4660                let dep_idx = s.require_node(child_id).dep_index_of(node_id);
4661                if let Some(idx) = dep_idx {
4662                    // Reset the child's dep record — the handle was just
4663                    // released. Subsequent first-run-gate checks see
4664                    // sentinel and re-close.
4665                    //
4666                    // Snapshot prev_data + data_batch retains for deferred
4667                    // release, then clear the record. Two-phase to satisfy
4668                    // the borrow checker (nodes + deferred_handle_releases
4669                    // are separate CoreState fields).
4670                    let (old_prev, batch_hs): (HandleId, SmallVec<[HandleId; 1]>) = {
4671                        let dr = &s.require_node(child_id).dep_records[idx];
4672                        (dr.prev_data, dr.data_batch.clone())
4673                    };
4674                    {
4675                        // Q-beyond Sub-slice 1 (D108, 2026-05-09):
4676                        // deferred_handle_releases moved to per-thread
4677                        // WaveState thread_local. State lock held; the
4678                        // thread_local borrow is independent.
4679                        crate::batch::with_wave_state(|ws| {
4680                            if old_prev != NO_HANDLE {
4681                                ws.deferred_handle_releases.push(old_prev);
4682                            }
4683                            for h in batch_hs {
4684                                ws.deferred_handle_releases.push(h);
4685                            }
4686                        });
4687                    }
4688                    let child_rec = s.require_node_mut(child_id);
4689                    child_rec.dep_records[idx].prev_data = NO_HANDLE;
4690                    child_rec.dep_records[idx].data_batch.clear();
4691                    // §10.13 perf (D047): clear received_mask bit so
4692                    // has_sentinel_deps() re-closes the first-run gate.
4693                    if idx < 64 {
4694                        child_rec.received_mask &= !(1u64 << idx);
4695                    }
4696                    work.push(child_id);
4697                }
4698            }
4699        }
4700    }
4701}
4702
4703// -----------------------------------------------------------------------
4704// PAUSE / RESUME — multi-pauser lockset + replay buffer
4705// -----------------------------------------------------------------------
4706
/// Summary handed back by [`Core::resume`] once the final pause lock is
/// released.
///
/// A non-zero `dropped` means a controller held the lock long enough to
/// overflow the Core-global `pause_buffer_cap`; the binding may want to
/// surface that as a warning or error.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ResumeReport {
    /// Number of tier-3/tier-4 messages dispatched to subscribers as part
    /// of the drain.
    pub replayed: u32,
    /// Number of messages that fell out the front of the buffer due to
    /// the `pause_buffer_cap` while this pause cycle was active.
    pub dropped: u32,
}
4720
4721impl Core {
4722    /// Acquire a pause lock on `node_id`. The first lock transitions the
4723    /// node from `Active` to `Paused`; further locks add to the lockset.
4724    /// While paused, tier-3 (DATA/RESOLVED) and tier-4 (INVALIDATE) outgoing
4725    /// messages buffer in the node's pause buffer; other tiers flush
4726    /// immediately.
4727    ///
4728    /// Re-acquiring the same `lock_id` is an idempotent no-op (matches TS
4729    /// convention, R1.2.6 silent on the case).
4730    pub fn pause(&self, node_id: NodeId, lock_id: LockId) -> Result<(), PauseError> {
4731        let mut s = self.lock_state();
4732        let rec = s
4733            .nodes
4734            .get_mut(&node_id)
4735            .ok_or(PauseError::UnknownNode(node_id))?;
4736        // QA A5 (2026-05-07): terminated nodes can't be re-paused. Without
4737        // this check, a stale pause-controller calling pause() on an
4738        // already-terminated node would re-arm `pause_state` to Paused.
4739        // The terminate_node path collapses pause_state → Active and
4740        // drains the buffer (A3-related), but doesn't gate subsequent
4741        // pause() calls. Treat as idempotent no-op (consistent with how
4742        // emit/complete/error early-return on terminal).
4743        if rec.terminal.is_some() {
4744            return Ok(());
4745        }
4746        // Slice F audit close (2026-05-07): `PausableMode::Off` means the
4747        // dispatcher ignores PAUSE for this node — tier-3 flushes
4748        // immediately, fn fires immediately. Treat the call as a successful
4749        // no-op so callers don't need to special-case.
4750        if rec.pausable == PausableMode::Off {
4751            return Ok(());
4752        }
4753        rec.pause_state.add_lock(lock_id);
4754        Ok(())
4755    }
4756
4757    /// Release a pause lock on `node_id`. If the lockset becomes empty, the
4758    /// node transitions back to `Active` and the buffered messages are
4759    /// dispatched to subscribers in arrival order. Returns a [`ResumeReport`]
4760    /// when the final lock released; `None` if the lockset is still
4761    /// non-empty (further locks held).
4762    ///
4763    /// Releasing an unknown `lock_id` (or releasing on an already-Active
4764    /// node) is an idempotent no-op returning `None`.
4765    pub fn resume(
4766        &self,
4767        node_id: NodeId,
4768        lock_id: LockId,
4769    ) -> Result<Option<ResumeReport>, PauseError> {
4770        // Phase 1 (lock-held): collect drained buffer + pending-wave flag +
4771        // sink Arcs. For default-mode nodes whose `pending_wave` was set
4772        // during pause, schedule a single fn-fire by adding to
4773        // `pending_fires` BEFORE we exit the lock — the wave engine picks
4774        // it up on the next drain tick.
4775        let (sinks, messages, dropped, pending_wave_for_default) = {
4776            let mut s = self.lock_state();
4777            let rec = s
4778                .nodes
4779                .get_mut(&node_id)
4780                .ok_or(PauseError::UnknownNode(node_id))?;
4781            // For Off mode, pause/resume are no-ops by construction.
4782            if rec.pausable == PausableMode::Off {
4783                return Ok(None);
4784            }
4785            let was_default_mode = rec.pausable == PausableMode::Default;
4786            // Capture pending_wave BEFORE remove_lock collapses the state.
4787            let pending_wave = if was_default_mode {
4788                rec.pause_state.take_pending_wave()
4789            } else {
4790                false
4791            };
4792            let Some((buffer, dropped)) = rec.pause_state.remove_lock(lock_id) else {
4793                // Not the final-resume — restore the pending_wave flag we
4794                // tentatively cleared, since we're not transitioning to
4795                // Active yet.
4796                if pending_wave {
4797                    rec.pause_state.mark_pending_wave();
4798                }
4799                return Ok(None);
4800            };
4801            let sinks: Vec<Sink> = rec.subscribers.values().cloned().collect();
4802            let messages: Vec<Message> = buffer.into_iter().collect();
4803            // Default-mode pending-wave handling: schedule the fn-fire so
4804            // the wave engine consolidates the pause-window dep deliveries
4805            // into one fn execution. State nodes don't fire fn (no
4806            // `pending_fires` membership has effect for them).
4807            //
4808            // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_fires lives
4809            // on per-thread WaveState.
4810            if pending_wave && was_default_mode {
4811                crate::batch::with_wave_state(|ws| {
4812                    ws.pending_fires.insert(node_id);
4813                });
4814            }
4815            (sinks, messages, dropped, pending_wave && was_default_mode)
4816        };
4817        let replayed = u32::try_from(messages.len()).unwrap_or(u32::MAX);
4818
4819        // Phase 2 (lock-released): fire sinks for ResumeAll-buffered
4820        // messages. Default-mode resume produces no buffered replay (the
4821        // consolidated fn-fire produces fresh wave traffic via the standard
4822        // commit_emission path).
4823        if !messages.is_empty() {
4824            for sink in &sinks {
4825                sink(&messages);
4826            }
4827            // Phase 3: balance the retain_handle calls done at buffer-push
4828            // time — sinks observe values but don't own refcount shares.
4829            for msg in &messages {
4830                if let Some(h) = msg.payload_handle() {
4831                    self.binding.release_handle(h);
4832                }
4833            }
4834        }
4835
4836        // Phase 4 (default-mode): drain the consolidated fn-fire scheduled
4837        // in Phase 1. `run_wave_for(node_id)` acquires the partitions
4838        // touched from `node_id` (Slice Y1 / Phase E) and runs the standard
4839        // drain pipeline; the new fn-fire emerges as a normal wave's worth
4840        // of messages to subscribers.
4841        if pending_wave_for_default {
4842            self.run_wave_for(node_id, |_this| {
4843                // The pending_fires entry was pushed in Phase 1 under the
4844                // lock. run_wave's drain picks it up.
4845            });
4846        }
4847        Ok(Some(ResumeReport { replayed, dropped }))
4848    }
4849
4850    /// True if the node currently holds at least one pause lock.
4851    #[must_use]
4852    pub fn is_paused(&self, node_id: NodeId) -> bool {
4853        self.state
4854            .lock()
4855            .require_node(node_id)
4856            .pause_state
4857            .is_paused()
4858    }
4859
4860    /// Number of pause locks currently held on `node_id`. `0` if Active.
4861    #[must_use]
4862    pub fn pause_lock_count(&self, node_id: NodeId) -> usize {
4863        self.state
4864            .lock()
4865            .require_node(node_id)
4866            .pause_state
4867            .lock_count()
4868    }
4869
4870    /// Test helper: whether `node_id` currently holds the given `lock_id`.
4871    #[must_use]
4872    pub fn holds_pause_lock(&self, node_id: NodeId, lock_id: LockId) -> bool {
4873        self.state
4874            .lock()
4875            .require_node(node_id)
4876            .pause_state
4877            .contains_lock(lock_id)
4878    }
4879}
4880
4881// -----------------------------------------------------------------------
4882// set_deps — atomic dep mutation
4883// -----------------------------------------------------------------------
4884
/// Errors returnable by [`Core::set_deps`].
///
/// Per `~/src/graphrefly-ts/docs/research/rewire-design-notes.md` and the
/// Phase 13.8 Q1 lock:
/// - `SelfDependency` — `n in newDeps` (self-loops are pathological without
///   explicit fixed-point semantics, which GraphReFly does not provide).
/// - `WouldCreateCycle { path }` — adding the new edge would create a cycle.
///   The `path` field reports the offending dep chain for debuggability.
/// - `UnknownNode` / `NotComputeNode` — invariant violations from the caller.
/// - `TerminalNode` — `n` itself has emitted COMPLETE/ERROR; rewiring a
///   terminal stream is a category error (terminal is one-shot at this
///   layer; recovery is the resubscribable path on a fresh subscribe).
/// - `TerminalDep` — a newly-added dep is terminal AND not resubscribable.
///   Resubscribable terminal deps are accepted because the subscribe path
///   resets their lifecycle. Non-resubscribable terminal deps would deliver
///   their already-emitted terminal directly to `n`'s `dep_terminals` slot,
///   which is rarely intended.
/// - `ReentrantOnFiringNode` — `set_deps(n, ...)` was called from inside
///   `n`'s own fn while `n` is mid-fire.
/// - `PartitionMigrationDuringFire` — the rewire would migrate the
///   partition of some other node that is currently mid-fire.
#[derive(Error, Debug, Clone, PartialEq)]
pub enum SetDepsError {
    /// `n` appeared in `new_deps` (self-loop rejection).
    #[error("set_deps({n:?}, ...): self-dependency rejected (n appeared in new_deps)")]
    SelfDependency { n: NodeId },

    /// Adding the new dep would create a cycle. `path` is the chain
    /// `[added_dep, ..., n]` reachable via existing deps.
    #[error(
        "set_deps({n:?}, ...): cycle would form via path {path:?} \
         (adding {added_dep:?} → {n:?} closes the loop)"
    )]
    WouldCreateCycle {
        n: NodeId,
        added_dep: NodeId,
        path: Vec<NodeId>,
    },

    /// A node id involved in the call is unknown. The payload is the id
    /// reported in the error message.
    #[error("set_deps: unknown node {0:?}")]
    UnknownNode(NodeId),

    /// The payload node is not a compute node; per the error message,
    /// state nodes have no deps and so cannot be rewired.
    #[error("set_deps: node {0:?} is not a compute node (state nodes have no deps)")]
    NotComputeNode(NodeId),

    /// `n` itself has terminated (COMPLETE / ERROR). Rewiring a terminal node
    /// is rejected — the stream has ended at this layer. To recover, mark
    /// the node resubscribable before terminate; a fresh subscribe will then
    /// reset its lifecycle.
    #[error("set_deps({n:?}, ...): node has already terminated; cannot rewire a terminal node")]
    TerminalNode { n: NodeId },

    /// A newly-added dep is terminal AND non-resubscribable. Per Phase 13.8
    /// Q1, this is rejected; resubscribable terminal deps are allowed
    /// because the subscribe path resets them when activated. Already-present
    /// terminal deps are unaffected (their terminal status was accepted at
    /// the time they terminated).
    #[error(
        "set_deps({n:?}, ...): added dep {dep:?} is terminal and not resubscribable; \
         either mark it resubscribable before terminate, or remove the dep from new_deps"
    )]
    TerminalDep { n: NodeId, dep: NodeId },

    /// `n` itself is currently mid-fire — a user fn for `n` re-entered Core
    /// via `set_deps(n, ...)` from inside `n`'s own `invoke_fn` /
    /// `project_each` / `predicate_each` / etc. Phase 1 of the dispatcher
    /// snapshotted `dep_handles` BEFORE the lock-released callback; the
    /// callback returning a `tracked` set indexed against THAT ordering
    /// would corrupt indices if the rewire re-orders deps mid-fire.
    /// Rejected to preserve the dynamic-tracked-indices invariant (D1).
    ///
    /// Workaround: schedule the rewire from a different node's fn (via
    /// `Core::emit` on a state node and observing the emit downstream),
    /// or perform the rewire after the wave completes (e.g. from a sink
    /// callback that is itself outside any fn-fire scope).
    ///
    /// Slice F (2026-05-07) — A6.
    #[error(
        "set_deps({n:?}, ...): rejected — node {n:?} is currently mid-fire \
         (set_deps from inside the firing node's own fn would corrupt the \
         Dynamic `tracked` indices snapshot taken before invoke_fn). \
         Schedule the rewire outside this fire scope."
    )]
    ReentrantOnFiringNode { n: NodeId },

    /// `set_deps(n, ...)` would trigger a partition migration (union or
    /// split in the per-subgraph union-find registry) that affects the
    /// partition of a node currently mid-fire on this thread. Distinct
    /// from [`Self::ReentrantOnFiringNode`]: that variant rejects
    /// `set_deps(n, ...)` where `n` itself is firing; this variant
    /// rejects `set_deps(n, ...)` on some OTHER node whose union/split
    /// shifts a firing node's partition root mid-wave.
    ///
    /// Why this matters: Y1's wave engine holds an
    /// [`Arc<crate::subgraph::SubgraphLockBox>`] for the firing node's
    /// partition for the wave's duration. A union mid-wave swaps the
    /// box-identity for one of the two affected partitions; a split
    /// (Y1+ post-Phase-F) extracts a fresh box for the orphan side.
    /// Either way the held Arc would diverge from the registry's
    /// current root for that partition, so the wave would lose
    /// serialization against the box's true partition mid-flight.
    ///
    /// Per [`SESSION-rust-port-d3-per-subgraph-parallelism.md`](https://github.com/graphrefly/graphrefly-ts/blob/main/archive/docs/SESSION-rust-port-d3-per-subgraph-parallelism.md)
    /// Q3 = (a-strict): mid-wave migration is rejected at edge-mutation
    /// time. If a real consumer surfaces pressure to support mid-wave
    /// migration, lift via state-migration logic in a follow-up — but
    /// the v1 contract is "the partition a wave runs in cannot change
    /// shape mid-flight."
    ///
    /// `n` is the node whose `set_deps` was rejected; `firing` is the
    /// concretely-identified firing node whose partition would be
    /// migrated. Workaround: schedule the rewire outside the wave
    /// (e.g. emit a state-change that triggers `set_deps` from a sink
    /// callback running post-flush).
    ///
    /// Slice Y1 (D3 / D091, 2026-05-08).
    #[error(
        "set_deps({n:?}, ...): rejected — would migrate the partition of \
         currently-firing node {firing:?} mid-wave (union/split during \
         fire would invalidate the held wave_owner Arc). Schedule the \
         rewire outside the wave."
    )]
    PartitionMigrationDuringFire { n: NodeId, firing: NodeId },
}
5005
5006impl Core {
5007    /// Atomic dep mutation — change a node's upstream deps without TEARDOWN
5008    /// cascading and without losing cache.
5009    ///
5010    /// Per the TLA+-verified design at
5011    /// `~/src/graphrefly-ts/docs/research/wave_protocol_rewire.tla`
5012    /// (35,950 distinct states, all 7 invariants clean):
5013    ///
5014    /// - Removed deps: clear dirtyMask bit, drain pending queue, drop DepRecord.
5015    /// - Added deps: SENTINEL prevData; push-on-subscribe if added dep has cached DATA.
5016    /// - Preserved: `firstRunPassed`, `pauseLocks`, `pauseBuffer`, `cache` (ROM/RAM).
5017    /// - Status auto-settles if dirtyMask becomes empty.
5018    /// - Idempotent on `new_deps == current deps`.
5019    /// - Self-rewire `n ∈ new_deps` rejected (`SelfDependency`).
5020    /// - Cycles rejected (`WouldCreateCycle`).
5021    /// - Allowed mid-wave + while paused.
5022    /// - Phase 13.8 Q1: terminal `n` rejected (`TerminalNode`); newly-added
5023    ///   terminal non-resubscribable deps rejected (`TerminalDep`).
5024    ///
5025    /// The body is a single atomic dep-mutation transaction with several
5026    /// discrete validation stages. Splitting would require passing a
5027    /// partially-mutable CoreState across helpers, and the transaction's
5028    /// locality is what makes the F1 refcount-leak collection work.
5029    #[allow(clippy::too_many_lines)]
5030    pub fn set_deps(&self, n: NodeId, new_deps: &[NodeId]) -> Result<(), SetDepsError> {
5031        let mut s = self.lock_state();
5032        // Validate node exists and is compute. Read-once via the helper so
5033        // subsequent code can use `require_node(n)` without re-checking.
5034        let (is_state, is_producer, is_terminal) = {
5035            let rec = s.nodes.get(&n).ok_or(SetDepsError::UnknownNode(n))?;
5036            (rec.is_state(), rec.is_producer(), rec.terminal.is_some())
5037        };
5038        if is_state || is_producer {
5039            // State and Producer nodes have no declared deps — set_deps
5040            // is meaningless. Producer nodes manage their own subscriptions
5041            // through the binding's ProducerCtx; mutating their (empty)
5042            // dep set would not affect that.
5043            return Err(SetDepsError::NotComputeNode(n));
5044        }
5045        // Reject if `n` itself is terminal (Phase 13.8 Q1: terminal nodes
5046        // cannot be rewired; recovery is via resubscribable subscribe).
5047        if is_terminal {
5048            return Err(SetDepsError::TerminalNode { n });
5049        }
5050        // A6 reentrancy guard (Slice F, 2026-05-07): reject if `n` is
5051        // currently mid-fire on the wave-owner thread. Closes the D1 hazard
5052        // where `Phase 1` snapshotted `dep_handles` against pre-rewire dep
5053        // ordering and `Phase 3` would store the returned `tracked` indices
5054        // against post-rewire ordering. Same-thread re-entry is the only
5055        // path that matters — cross-thread emits already block on
5056        // `wave_owner` per the M1 design.
5057        // /qa F2 reverted (2026-05-10): currently_firing lives on
5058        // CoreState (per-Core, cross-thread visible). The D1 reentrance
5059        // check requires the same-thread visibility (a fn re-entering
5060        // set_deps on its own firing node), and the P13 cross-thread
5061        // check requires cross-thread visibility (Thread B's set_deps
5062        // observing Thread A's firing pushes during A's lock-released
5063        // invoke_fn). Per-Core placement on shared CoreState delivers
5064        // both. Read under the already-held state lock.
5065        if s.currently_firing.contains(&n) {
5066            return Err(SetDepsError::ReentrantOnFiringNode { n });
5067        }
5068        // Self-rewire rejection.
5069        if new_deps.contains(&n) {
5070            return Err(SetDepsError::SelfDependency { n });
5071        }
5072        // Validate all new deps exist.
5073        for &d in new_deps {
5074            if !s.nodes.contains_key(&d) {
5075                return Err(SetDepsError::UnknownNode(d));
5076            }
5077        }
5078        // Cycle detection: data flows parent → child via the `children` map.
5079        // Adding edge `d → n` (d becomes a dep of n) creates a cycle iff
5080        // `d` is already reachable from `n` via existing data-flow edges
5081        // (so `n → ... → d` exists, and the new `d → n` closes the loop).
5082        // DFS from `n` along `children` edges, looking for each added dep.
5083        let current_deps: HashSet<NodeId> = s.require_node(n).dep_ids().collect();
5084        let new_deps_set: HashSet<NodeId> = new_deps.iter().copied().collect();
5085        let added: HashSet<NodeId> = new_deps_set.difference(&current_deps).copied().collect();
5086        for &d in &added {
5087            if let Some(path) = self.path_from_to(&s, n, d) {
5088                return Err(SetDepsError::WouldCreateCycle {
5089                    n,
5090                    added_dep: d,
5091                    path,
5092                });
5093            }
5094        }
5095        // Phase 13.8 Q1: reject newly-added deps that are terminal AND not
5096        // resubscribable. Resubscribable terminal deps are allowed — the
5097        // subscribe path resets their lifecycle when something activates
5098        // them. Already-present (kept) deps are unaffected; their terminal
5099        // status was accepted at the time they terminated.
5100        for &d in &added {
5101            let dep_rec = s.require_node(d);
5102            if dep_rec.terminal.is_some() && !dep_rec.resubscribable {
5103                return Err(SetDepsError::TerminalDep { n, dep: d });
5104            }
5105        }
5106        // Compute `removed` early (Phase F: needs to be available for P13
5107        // split-case widening below). Idempotent fast-path moved below the
5108        // P13 check accordingly.
5109        let removed: HashSet<NodeId> = current_deps.difference(&new_deps_set).copied().collect();
5110
5111        // Slice Y1 (D3 / D091 — P13, 2026-05-08): reject mid-wave set_deps
5112        // that would shift a currently-firing node's partition root.
5113        // Distinct from `ReentrantOnFiringNode` (same-node case, line above).
5114        // Holds the registry lock briefly under the state lock per the
5115        // P12-fix lock-discipline invariant `state lock → registry mutex`.
5116        //
5117        // **Two cases:**
5118        // 1. **Union (Phase D)** — adding a cross-partition dep merges two
5119        //    components. Both pre-merge components are affected (the
5120        //    smaller-rank loser's box is dropped, its members migrate to
5121        //    the winner's root).
5122        // 2. **Split (Phase F, 2026-05-09)** — removing an edge whose
5123        //    removal disconnects the dep graph splits one component into
5124        //    two. The pre-split component is affected (every member
5125        //    re-unions; the orphan side gets a fresh `SubgraphLockBox`
5126        //    while the keep side preserves the original Arc).
5127        //
5128        // Either case migrates the partition root (and box-identity) for
5129        // affected nodes mid-wave; if any node currently firing on this
5130        // thread is in an affected partition, the wave's held
5131        // `Arc<SubgraphLockBox>` would diverge from the registry's new
5132        // canonical box. Q3 = (a-strict) per the D3 design lock rejects
5133        // both cases at edge-mutation time.
5134        // /qa F2 reverted (2026-05-10): currently_firing lives on
5135        // CoreState (cross-thread visible — load-bearing for this P13
5136        // partition-migration check, which detects cross-thread set_deps
5137        // calls during another thread's lock-released invoke_fn).
5138        // Snapshot under the already-held state lock so the registry
5139        // mutex acquire below doesn't need to also hold the snapshot
5140        // borrow.
5141        let currently_firing_snapshot: Vec<NodeId> = s.currently_firing.clone();
5142        if !currently_firing_snapshot.is_empty() && (!added.is_empty() || !removed.is_empty()) {
5143            let mut reg = self.registry.lock();
5144            // Snapshot firing nodes' partitions. `partition_of` is mutating
5145            // (path compression) but partition IDENTITY is stable across
5146            // reads (only `union_nodes` / `split_partition` mutate roots).
5147            let firing_with_partition: Vec<(NodeId, crate::subgraph::SubgraphId)> =
5148                currently_firing_snapshot
5149                    .iter()
5150                    .filter_map(|&f| reg.partition_of(f).map(|p| (f, p)))
5151                    .collect();
5152            if !firing_with_partition.is_empty() {
5153                let part_n = reg.partition_of(n);
5154                // Case 1 (union): for each added dep, check cross-partition merge.
5155                for &added_dep in &added {
5156                    let part_added = reg.partition_of(added_dep);
5157                    if part_n == part_added {
5158                        continue; // same-partition add is a no-op in union-find
5159                    }
5160                    let affected = [part_n, part_added];
5161                    if let Some(&(firing, _)) = firing_with_partition
5162                        .iter()
5163                        .find(|(_, p)| affected.contains(&Some(*p)))
5164                    {
5165                        return Err(SetDepsError::PartitionMigrationDuringFire { n, firing });
5166                    }
5167                }
5168                // Case 2 (split): for each removed dep, simulate undirected
5169                // walk from `removed_dep` skipping the would-be-removed edge
5170                // (`removed_dep → n`); if `n` is unreachable, removal would
5171                // disconnect — split — affecting all nodes in that
5172                // partition. Since dep edges are within a single partition
5173                // by construction (union-find merges on edge add), every
5174                // node currently in the partition is affected.
5175                //
5176                // QA-fix #4 (2026-05-09): pass `added_edges` as `extra_edges`
5177                // so a `set_deps` that simultaneously REMOVES one edge AND
5178                // ADDS another path isn't falsely rejected. Without this,
5179                // the pre-mutation walk doesn't see the would-be-added
5180                // edges and reports disconnect even when the net change
5181                // preserves connectivity.
5182                let added_edges: Vec<(NodeId, NodeId)> = added.iter().map(|&a| (a, n)).collect();
5183                for &removed_dep in &removed {
5184                    let part_removed = reg.partition_of(removed_dep);
5185                    let visited = walk_undirected_dep_graph(
5186                        &s,
5187                        removed_dep,
5188                        Some((removed_dep, n)),
5189                        &added_edges,
5190                    );
5191                    let would_disconnect = !visited.contains(&n);
5192                    if would_disconnect {
5193                        if let Some(&(firing, _)) = firing_with_partition
5194                            .iter()
5195                            .find(|(_, p)| Some(*p) == part_removed)
5196                        {
5197                            return Err(SetDepsError::PartitionMigrationDuringFire { n, firing });
5198                        }
5199                    }
5200                }
5201            }
5202        }
5203        // Idempotent fast-path. Now safe to short-circuit since the P13
5204        // check above already considered both `added` and `removed`.
5205        if added.is_empty() && removed.is_empty() {
5206            return Ok(());
5207        }
5208
5209        // Snapshot old deps (ordered) for topology event, before mutation.
5210        let old_deps_vec: Vec<NodeId> = s.require_node(n).dep_ids_vec();
5211
5212        // Carry out the rewire atomically.
5213        // 1. Build new dep_records, preserving DepRecord state for kept deps.
5214        let new_deps_vec: Vec<NodeId> = new_deps.to_vec();
5215        //
5216        // Refcount discipline (F1 audit fix): each `Some(TerminalKind::Error(h))`
5217        // slot owns a refcount share retained at `terminate_node` time. When a
5218        // dep is REMOVED, its slot is dropped — the corresponding handle's
5219        // share must be released here, otherwise it leaks until Core drop.
5220        // Also release data_batch retains for removed deps.
5221        let (new_dep_records, removed_handles): (Vec<DepRecord>, Vec<HandleId>) = {
5222            let rec = s.require_node(n);
5223            // Index old dep_records by NodeId for O(1) lookup of kept deps.
5224            let old_by_node: HashMap<NodeId, &DepRecord> =
5225                rec.dep_records.iter().map(|dr| (dr.node, dr)).collect();
5226            let new_records: Vec<DepRecord> = new_deps_vec
5227                .iter()
5228                .map(|&d| {
5229                    if let Some(old) = old_by_node.get(&d) {
5230                        // Kept dep: preserve all state (prev_data, data_batch,
5231                        // terminal, wave flags). Subscriptions stay live.
5232                        DepRecord {
5233                            node: d,
5234                            prev_data: old.prev_data,
5235                            dirty: old.dirty,
5236                            involved_this_wave: old.involved_this_wave,
5237                            data_batch: old.data_batch.clone(),
5238                            terminal: old.terminal,
5239                        }
5240                    } else {
5241                        // Added dep: fresh sentinel record.
5242                        DepRecord::new(d)
5243                    }
5244                })
5245                .collect();
5246            // Collect handles to release from REMOVED dep records.
5247            let mut to_release: Vec<HandleId> = Vec::new();
5248            for d in &removed {
5249                if let Some(old) = old_by_node.get(d) {
5250                    if let Some(TerminalKind::Error(h)) = old.terminal {
5251                        to_release.push(h);
5252                    }
5253                    // Release data_batch retains for removed deps.
5254                    for &h in &old.data_batch {
5255                        to_release.push(h);
5256                    }
5257                }
5258            }
5259            (new_records, to_release)
5260        };
5261        // Clear dirtyMask bit by re-emitting the wave-bookkeeping: we don't
5262        // currently model a per-dep dirtyMask explicitly (we use the boolean
5263        // `dirty` flag at node level). Removing a dep's entry from the implicit
5264        // mask is therefore implicit — by removing the dep, future emissions
5265        // from it can't re-arm the bit. The per-dep `involved_this_wave` flag
5266        // stays wave-scoped and gets cleared at wave end. The setDeps action
5267        // itself does NOT change the dirty boolean unless all deps are cleared;
5268        // in that case we settle.
5269        // Slice E2 (D067): on a dynamic node that had previously fired its
5270        // fn, capture `has_fired_once` BEFORE the reset so we can fire
5271        // `OnRerun` cleanup lock-released after `drop(s)` below. Without
5272        // this, the next `fire_regular` Phase 1 would capture
5273        // `has_fired_once = false`, causing Phase 1.5 to skip OnRerun —
5274        // silently dropping the prior activation's cleanup closure when
5275        // the next `invoke_fn` overwrites `current_cleanup`. Per spec
5276        // R2.4.5, `set_deps` does NOT end the activation cycle
5277        // (subscribe→unsubscribe is the cycle boundary), so OnRerun must
5278        // fire on every re-fire including post-set_deps.
5279        // §10 perf (D047): compute new topo_rank before the mutable
5280        // borrow on rec, since we need to read other nodes' depths.
5281        let new_topo_rank = if new_deps_vec.is_empty() {
5282            0
5283        } else {
5284            new_deps_vec
5285                .iter()
5286                .filter(|&&d| d != n)
5287                .filter_map(|&d| s.nodes.get(&d).map(|r| r.topo_rank))
5288                .max()
5289                .unwrap_or(0)
5290                .saturating_add(1)
5291        };
5292        let fire_set_deps_on_rerun;
5293        {
5294            let rec = s.require_node_mut(n);
5295            fire_set_deps_on_rerun = rec.is_dynamic && rec.has_fired_once;
5296            rec.dep_records = new_dep_records;
5297            rec.topo_rank = new_topo_rank;
5298            // §10.13 perf (D047): recompute received_mask from new dep_records.
5299            // §10.3 perf (Slice V1): recompute involved_mask alongside.
5300            rec.received_mask = 0;
5301            rec.involved_mask = 0;
5302            for (i, dr) in rec.dep_records.iter().enumerate() {
5303                if i < 64 {
5304                    if dr.prev_data != NO_HANDLE || !dr.data_batch.is_empty() {
5305                        rec.received_mask |= 1u64 << i;
5306                    }
5307                    if dr.involved_this_wave {
5308                        rec.involved_mask |= 1u64 << i;
5309                    }
5310                }
5311            }
5312            // Re-derive `tracked` for static derived: all indices.
5313            // For dynamic: clear `tracked` AND reset `has_fired_once` so the
5314            // next dep delivery satisfies the first-fire branch in
5315            // `deliver_data_to_consumer` (`!has_fired_once || tracked.contains(...)`).
5316            // Without resetting `has_fired_once`, the cleared `tracked` blocks
5317            // every future fire — fn never re-runs and the dynamic node sits
5318            // on stale cache derived from the old dep set. The next fire
5319            // re-runs fn unconditionally; fn's returned `tracked` then
5320            // repopulates `rec.tracked` and normal selective-deps semantics
5321            // resume from the next dep update onward.
5322            if rec.is_dynamic {
5323                rec.tracked.clear();
5324                rec.has_fired_once = false;
5325            } else {
5326                // Derived (static) and Operator track all deps.
5327                rec.tracked = (0..new_deps_vec.len()).collect();
5328            }
5329        }
5330
5331        // 2. Update inverted-edge map (children).
5332        for &removed_dep in &removed {
5333            if let Some(set) = s.children.get_mut(&removed_dep) {
5334                set.remove(&n);
5335            }
5336        }
5337        for &added_dep in &added {
5338            s.children.entry(added_dep).or_default().insert(n);
5339        }
5340
5341        // 3. Push-on-subscribe for added deps with cached DATA. Wraps in a
5342        // wave so any downstream propagation runs cleanly. We capture only
5343        // the LIST of added deps (not their cache values) because the cache
5344        // can change between releasing the validation lock and the wave's
5345        // re-acquisition — see the P2 race fix below.
5346        //
5347        // P2 (Slice A close /qa) — between `drop(s)` and `run_wave`'s
5348        // closure re-acquiring the lock, a concurrent thread could
5349        // invalidate one of the added deps, releasing its cache handle. A
5350        // pre-snapshot of `(added_dep, cache)` pairs would then carry a
5351        // dangling HandleId into `deliver_data_to_consumer`. The fix is to
5352        // re-read each added dep's `cache` INSIDE the closure (under the
5353        // freshly re-acquired state lock). The wave-owner re-entrant mutex
5354        // (Q2) blocks concurrent waves once we enter `run_wave`, so the
5355        // re-read sees a coherent post-validation state.
5356        let added_for_wave: Vec<NodeId> = added.iter().copied().collect();
5357        // Slice Y1 (D3 / D090 — P12 fix, 2026-05-08): maintain partition
5358        // membership BEFORE dropping the state lock so the registry can
5359        // never lag behind topology mutations as observed by concurrent
5360        // readers. Lock-order invariant `state lock → registry mutex`
5361        // (one-way; never registry → state) — see the matching block in
5362        // `Core::register` for the full rationale.
5363        //   - For each new edge: union the partitions of `n` and `added_dep`.
5364        //   - For each removed edge (Slice Y1 / Phase F, 2026-05-09):
5365        //     run undirected-dep-graph BFS from `removed_dep` over the
5366        //     POST-removal `s.children` + `dep_records`. If `n` is
5367        //     unreachable, the partition has split — gather the affected
5368        //     component nodes + intra-component edges, then call
5369        //     [`SubgraphRegistry::split_partition`] to migrate the orphan
5370        //     side onto a fresh `SubgraphLockBox`. Mid-fire splits would
5371        //     have been rejected at the P13 check above (Q3 = (a-strict)).
5372        {
5373            let mut reg = self.registry.lock();
5374            for &added_dep in &added {
5375                reg.union_nodes(n, added_dep);
5376            }
5377            for &removed_dep in &removed {
5378                // Post-removal walk — `s.children[removed_dep]` no longer
5379                // contains `n`, and `s.nodes[n].dep_records` no longer
5380                // contains `removed_dep`. No skip needed; no extra edges
5381                // (added edges are already applied to `s.children` and
5382                // `dep_records` by the time we reach this block).
5383                let visited = walk_undirected_dep_graph(&s, removed_dep, None, &[]);
5384                if visited.contains(&n) {
5385                    // Still connected via other dep edges — no split.
5386                    continue;
5387                }
5388                // Disconnected. `visited` is the keep-side (containing
5389                // `removed_dep`). Identify the original component, the
5390                // intra-component dep edges, and split.
5391                let original_root = reg.find(removed_dep);
5392                // Snapshot keys before iterating — `find` mutates via
5393                // path compression; iterating + mutating concurrently
5394                // would alias-borrow.
5395                let snapshot_keys: Vec<NodeId> = reg.registered_nodes();
5396                let component_nodes: Vec<NodeId> = snapshot_keys
5397                    .into_iter()
5398                    .filter(|&node| reg.find(node) == original_root)
5399                    .collect();
5400                let component_set: HashSet<NodeId> = component_nodes.iter().copied().collect();
5401                // Collect dep edges within the component (post-removal).
5402                // Edge convention: `(parent, child)` data-flow direction.
5403                let mut edges_in_component: Vec<(NodeId, NodeId)> = Vec::new();
5404                for &node in &component_nodes {
5405                    if let Some(rec) = s.nodes.get(&node) {
5406                        for d in rec.dep_records.iter().map(|r| r.node) {
5407                            if component_set.contains(&d) {
5408                                edges_in_component.push((d, node));
5409                            }
5410                        }
5411                    }
5412                }
5413                let keep_side_nodes: Vec<NodeId> = visited.iter().copied().collect();
5414                reg.split_partition(&component_nodes, &keep_side_nodes, &edges_in_component);
5415                // Marker call kept for symmetry with `union_nodes` — the
5416                // registry's `on_edge_removed` is itself a no-op (Phase F
5417                // moved the actual work into Core where the dep-graph
5418                // view is available).
5419                reg.on_edge_removed(n, removed_dep);
5420            }
5421        }
5422        // B3 (D117, 2026-05-10): when set_deps clears ALL deps mid-wave AND
5423        // n has a tier-1 (DIRTY) message already queued this wave with no
5424        // settle yet, push n to `pending_auto_resolve` so the drain-end
5425        // sweep emits a paired Resolved. Without this, subscribers observe
5426        // an unpaired DIRTY, violating R1.3.1.b two-phase push pairing.
5427        //
5428        // Outside any active wave the per-thread `pending_notify` is empty
5429        // (cleared at wave-end), so the predicate short-circuits and the
5430        // insert is a no-op. Inside a wave, the `pending_auto_resolve`
5431        // sweep at `drain_and_flush` end re-checks pending_notify and
5432        // routes through `queue_notify` (which handles paused-children
5433        // pause-buffer placement automatically).
5434        if new_deps_vec.is_empty() {
5435            // F6 (/qa 2026-05-10): walk pending_notify in arrival order
5436            // counting unpaired DIRTYs. Tier 4 INVALIDATE is NOT a
5437            // settle for two-phase pairing — it clears cache but does
5438            // not pair with a DIRTY. Pairs are DIRTY ↔ DATA / RESOLVED
5439            // (tier 3 value-class) or DIRTY ↔ COMPLETE / ERROR (tier 5
5440            // terminal). Multi-emit waves like `[DIRTY, RESOLVED, DIRTY]`
5441            // leave one trailing unpaired DIRTY that needs auto-Resolved.
5442            crate::batch::with_wave_state(|ws| {
5443                let needs_auto_resolve = ws.pending_notify.get(&n).is_some_and(|entry| {
5444                    let mut unpaired: i32 = 0;
5445                    for m in entry.iter_messages() {
5446                        match m {
5447                            crate::message::Message::Dirty => unpaired += 1,
5448                            crate::message::Message::Data(_)
5449                            | crate::message::Message::Resolved
5450                            | crate::message::Message::Complete
5451                            | crate::message::Message::Error(_)
5452                                if unpaired > 0 =>
5453                            {
5454                                unpaired -= 1;
5455                            }
5456                            // INVALIDATE / PAUSE / RESUME / TEARDOWN /
5457                            // START — not settles for two-phase pairing.
5458                            _ => {}
5459                        }
5460                    }
5461                    unpaired > 0
5462                });
5463                if needs_auto_resolve {
5464                    ws.pending_auto_resolve.insert(n);
5465                }
5466            });
5467        }
5468        // Drop the state lock before run_wave (which acquires its own) and
5469        // before crossing the binding boundary for the F1 refcount-fix
5470        // releases. Keeps the lock-discipline split (binding calls outside
5471        // the state lock) consistent with the rest of the dispatcher.
5472        drop(s);
5473        // Slice E2 (D067): fire OnRerun lock-released for dynamic nodes
5474        // that had previously fired. The cleanup closure cleans up
5475        // resources tied to the old dep shape before the next fn-fire
5476        // (triggered by added-dep push-on-subscribe below) registers a
5477        // fresh cleanup spec. Direct fire (NOT via deferred_cleanup_hooks)
5478        // because set_deps may NOT enter a wave (no added deps → no
5479        // run_wave below) — queueing the hook would orphan it until the
5480        // next unrelated wave drains.
5481        if fire_set_deps_on_rerun {
5482            self.binding.cleanup_for(n, CleanupTrigger::OnRerun);
5483        }
5484        // Fire topology event after lock is dropped.
5485        self.fire_topology_event(&crate::topology::TopologyEvent::DepsChanged {
5486            node: n,
5487            old_deps: old_deps_vec,
5488            new_deps: new_deps_vec.clone(),
5489        });
5490        if !added_for_wave.is_empty() {
5491            // Slice Y1 / Phase E: push-on-subscribe wave runs on `n`'s
5492            // touched partitions. Added deps are now unioned with `n`
5493            // (Phase C P12 fix moved registry mutation inside the state
5494            // lock), so any cascade through them stays in `n`'s partition
5495            // set as walked by `compute_touched_partitions`.
5496            self.run_wave_for(n, |this| {
5497                let mut s = this.lock_state();
5498                // Defensive: re-validate `n` still exists and isn't terminal.
5499                // A concurrent path could have terminated it between
5500                // validation and run_wave_for's partition-lock acquisition.
5501                if !s.nodes.contains_key(&n) || s.require_node(n).terminal.is_some() {
5502                    return;
5503                }
5504                for added_dep in &added_for_wave {
5505                    // Re-read cache under the wave-owner-held lock — this
5506                    // is the post-validation, post-concurrent-action
5507                    // snapshot. NO_HANDLE means the dep was invalidated
5508                    // concurrently; skip (no data to push).
5509                    let cache = match s.nodes.get(added_dep) {
5510                        Some(rec) => rec.cache,
5511                        None => continue, // dep deleted concurrently
5512                    };
5513                    if cache == NO_HANDLE {
5514                        continue;
5515                    }
5516                    let dep_idx = s.require_node(n).dep_index_of(*added_dep);
5517                    if let Some(idx) = dep_idx {
5518                        this.deliver_data_to_consumer(&mut s, n, idx, cache);
5519                    }
5520                }
5521            });
5522        }
5523        for h in removed_handles {
5524            self.binding.release_handle(h);
5525        }
5526        Ok(())
5527    }
5528
5529    /// DFS from `from` along data-flow edges (children map) looking for `to`.
5530    /// Returns the path including endpoints, or `None` if unreachable. Used
5531    /// for cycle detection in [`Self::set_deps`].
5532    fn path_from_to(&self, s: &CoreState, from: NodeId, to: NodeId) -> Option<Vec<NodeId>> {
5533        if from == to {
5534            return Some(vec![from]);
5535        }
5536        let mut stack: Vec<(NodeId, Vec<NodeId>)> = vec![(from, vec![from])];
5537        let mut visited: HashSet<NodeId> = HashSet::new();
5538        while let Some((cur, path)) = stack.pop() {
5539            if !visited.insert(cur) {
5540                continue;
5541            }
5542            if cur == to {
5543                return Some(path);
5544            }
5545            if let Some(children) = s.children.get(&cur) {
5546                for &child in children {
5547                    let mut new_path = path.clone();
5548                    new_path.push(child);
5549                    stack.push((child, new_path));
5550                }
5551            }
5552        }
5553        None
5554    }
5555}
5556
5557// CoreState helpers — kept on the inner struct so they're naturally scoped
5558// to the lock guard.
5559impl CoreState {
5560    fn alloc_node_id(&mut self) -> NodeId {
5561        let id = NodeId::new(self.next_node_id);
5562        self.next_node_id += 1;
5563        id
5564    }
5565
5566    fn alloc_sub_id(&mut self) -> SubscriptionId {
5567        let id = SubscriptionId(self.next_subscription_id);
5568        self.next_subscription_id += 1;
5569        id
5570    }
5571
5572    /// Clear wave-scoped flags and rotate per-dep batch data on every
5573    /// node. Run at the end of every wave (regular drain via `run_wave`,
5574    /// activation drain via `activate_derived`, and `BatchGuard::drop`'s
5575    /// drain). Centralized so a future wave-state field can't be missed
5576    /// at one of the cleanup sites.
5577    ///
5578    /// Per-dep rotation (R2.9.b / R1.3.6.b):
5579    /// - `prev_data` ← last element of `data_batch` (or unchanged if empty).
5580    ///   The last batch entry's retain transfers to `prev_data`; the old
5581    ///   `prev_data`'s retain is released. All earlier batch entries are
5582    ///   released.
5583    /// - `data_batch` cleared.
5584    /// - Per-dep `dirty` and `involved_this_wave` cleared.
5585    ///
5586    /// Handle releases are pushed to `deferred_handle_releases` for
5587    /// post-lock-drop release by the caller.
5588    pub(crate) fn clear_wave_state(&mut self, ws: &mut crate::batch::WaveState) {
5589        // Q-beyond Sub-slice 1 (D108, 2026-05-09): `pending_auto_resolve`
5590        // + `pending_pause_overflow` clears moved to
5591        // [`crate::batch::WaveState::clear_wave_state`]. The per-NodeRecord
5592        // rotation below pushes batch-handle and prev_data releases into
5593        // `ws.deferred_handle_releases` (was `cps.deferred_handle_releases`
5594        // pre-sub-slice-1, was `s.deferred_handle_releases` pre-Q2). Caller
5595        // borrows the WaveState thread_local; no lock-discipline rule
5596        // applies (state lock + thread_local borrow are independent).
5597        //
5598        // Q-beyond Sub-slice 3 (D108, 2026-05-09):
5599        // `invalidate_hooks_fired_this_wave` clear moved to
5600        // [`crate::batch::WaveState::clear_wave_state`]. The
5601        // `deferred_cleanup_hooks` invariant (NOT cleared here, drained
5602        // explicitly on success/panic paths) likewise moves with the
5603        // field.
5604        //
5605        // /qa F2 reverted (2026-05-10): `currently_firing` stays on
5606        // CoreState (per-Core, cross-thread visible — load-bearing for
5607        // P13). Defensive clear here mirrors the pre-sub-slice-3 safety
5608        // net (`FiringGuard`'s RAII push/pop is balanced even on panic;
5609        // a future code path that bypasses the guard would otherwise
5610        // leak a stale entry into the next wave).
5611        self.currently_firing.clear();
5612        //
5613        // Slice G tier3 emit tracking moved to per-partition state (Q3,
5614        // 2026-05-09); cleared by [`super::WaveOwnerGuard::drop`] on
5615        // outermost release for each partition the wave touched.
5616        for rec in self.nodes.values_mut() {
5617            rec.dirty = false;
5618            rec.involved_this_wave = false;
5619            // §10.3 perf (Slice V1): clear involved_mask in one op.
5620            rec.involved_mask = 0;
5621            for dr in &mut rec.dep_records {
5622                let batch_len = dr.data_batch.len();
5623                if batch_len > 0 {
5624                    // Release all batch entries EXCEPT the last — the last
5625                    // entry's retain transfers to prev_data.
5626                    for &h in &dr.data_batch[..batch_len - 1] {
5627                        ws.deferred_handle_releases.push(h);
5628                    }
5629                    // Release the OLD prev_data (its retain was from the
5630                    // previous wave's rotation or from initial delivery).
5631                    if dr.prev_data != NO_HANDLE {
5632                        ws.deferred_handle_releases.push(dr.prev_data);
5633                    }
5634                    // Rotate: last batch entry becomes new prev_data.
5635                    // Its retain carries over — no extra retain needed.
5636                    dr.prev_data = dr.data_batch[batch_len - 1];
5637                    dr.data_batch.clear();
5638                }
5639                dr.dirty = false;
5640                dr.involved_this_wave = false;
5641            }
5642        }
5643    }
5644
5645    pub(crate) fn require_node(&self, id: NodeId) -> &NodeRecord {
5646        self.nodes
5647            .get(&id)
5648            .unwrap_or_else(|| panic!("unknown node {id:?}"))
5649    }
5650
5651    pub(crate) fn require_node_mut(&mut self, id: NodeId) -> &mut NodeRecord {
5652        self.nodes
5653            .get_mut(&id)
5654            .unwrap_or_else(|| panic!("unknown node {id:?}"))
5655    }
5656}
5657
5658/// Release every binding-side refcount share owned by this `CoreState`
5659/// when the last `Core` clone drops the inner Mutex.
5660///
5661/// Without this, every retained handle in `cache` / `terminal` Error /
5662/// `dep_terminals` Error / pause-buffer-payload would leak in the binding
5663/// registry until process exit. Production bindings (napi-rs, pyo3,
5664/// wasm-bindgen) all maintain handle-ref maps that grow unbounded without
5665/// this cleanup.
5666///
5667/// Safe to call during panic unwinding — `BindingBoundary::release_handle`
5668/// is the only call, and a panicking binding during cleanup would already
5669/// have been a problem in normal operation.
5670impl Drop for CoreState {
5671    fn drop(&mut self) {
5672        // Q-beyond Sub-slice 3 (D108, 2026-05-09): `deferred_flush_jobs`
5673        // moved to [`crate::batch::WaveState`]. The `Vec<Sink>` clones
5674        // drop naturally with the per-thread WaveState's lifetime; no
5675        // CoreState-side cleanup needed.
5676        // Q-beyond Sub-slice 1 (D108, 2026-05-09): `deferred_handle_releases`
5677        // and `wave_cache_snapshots` moved to per-thread WaveState
5678        // thread_local. By outermost-BatchGuard-drop discipline both fields
5679        // are empty by the time CoreState drops (BatchGuard owns a Core
5680        // clone, so Core can't drop while a BatchGuard is in flight). Any
5681        // thread that ran a wave on this Core drained on its own outermost
5682        // BatchGuard; cross-Core thread_local sharing is fine because each
5683        // wave drains its own retains.
5684        //
5685        // Q-beyond Sub-slice 2 (D108, 2026-05-09): `pending_fires` and
5686        // `pending_notify` likewise moved to per-thread WaveState. The
5687        // pre-Sub-slice-2 `pending_notify` walk here (drain + release each
5688        // payload_handle) is no longer reachable from `Drop for CoreState`:
5689        // by invariant, no wave is in flight when CoreState drops (BatchGuard
5690        // holds a Core clone), so the originating thread's WaveState
5691        // pending_notify is empty by then. Other threads' WaveStates are
5692        // unreachable from CoreState::drop anyway — they're per-thread
5693        // thread_locals scoped to whichever thread ran the wave. The
5694        // outermost `BatchGuard::drop` is the canonical drain point on both
5695        // success and panic paths; Drop for CoreState relies on that
5696        // discipline holding rather than re-implementing it.
5697
5698        // Per-node retained handles:
5699        //   - `cache` (1 retain per non-NO_HANDLE state cache or
5700        //     populated compute cache).
5701        //   - `terminal == Some(Error(h))` (1 retain on the terminal slot).
5702        //   - `dep_terminals[i] == Some(Error(h))` (1 retain per consumer's
5703        //     terminated-dep slot).
5704        //   - `pause_state` paused buffer messages with payload handles
5705        //     (1 retain per buffered Data/Error).
5706        for rec in self.nodes.values_mut() {
5707            if rec.cache != NO_HANDLE {
5708                self.binding.release_handle(rec.cache);
5709            }
5710            if let Some(TerminalKind::Error(h)) = rec.terminal {
5711                self.binding.release_handle(h);
5712            }
5713            for dr in &rec.dep_records {
5714                if let Some(TerminalKind::Error(h)) = dr.terminal {
5715                    self.binding.release_handle(h);
5716                }
5717                // Release data_batch retains (in-flight wave data).
5718                for &h in &dr.data_batch {
5719                    self.binding.release_handle(h);
5720                }
5721                // Release prev_data retain (cross-wave persistence).
5722                if dr.prev_data != NO_HANDLE {
5723                    self.binding.release_handle(dr.prev_data);
5724                }
5725            }
5726            if let PauseState::Paused { buffer, .. } = &rec.pause_state {
5727                for msg in buffer {
5728                    if let Some(h) = msg.payload_handle() {
5729                        self.binding.release_handle(h);
5730                    }
5731                }
5732            }
5733            // Slice E1: release replay-buffer retains.
5734            for &h in &rec.replay_buffer {
5735                self.binding.release_handle(h);
5736            }
5737            // Operator scratch (Slice C-3, D026): generic per-operator
5738            // state struct. Each variant's release_handles releases the
5739            // shares it owns (Scan/Reduce acc, Distinct/Pairwise prev,
5740            // Last latest + default; Take/Skip/TakeWhile own no handles).
5741            if let Some(scratch) = rec.op_scratch.as_mut() {
5742                scratch.release_handles(&*self.binding);
5743            }
5744        }
5745
5746        // D-α (D028 full close, 2026-05-10): drain the
5747        // `pending_scratch_release` queue (Phase G of
5748        // `Subscription::Drop` pushes old operator-scratch boxes here
5749        // on resubscribable + non-terminal deactivate). Catch-all for
5750        // Core shutdown — anything still queued never made it through
5751        // a `reset_for_fresh_lifecycle` drain. Release happens BEFORE
5752        // the queue's `Vec<Box<dyn OperatorScratch>>` drops, because
5753        // each box's `Drop` impl is a plain `mem::drop` of the state
5754        // struct fields (HandleIds are raw u64s; the boxes don't
5755        // re-enter the binding on drop). Without this explicit drain
5756        // the binding-side refcount is left bumped on Core shutdown.
5757        let queued: Vec<Box<dyn crate::op_state::OperatorScratch>> =
5758            std::mem::take(&mut self.pending_scratch_release);
5759        for mut scratch in queued {
5760            scratch.release_handles(&*self.binding);
5761        }
5762
5763        // Q-beyond Sub-slice 1 + 2 (D108, 2026-05-09): WaveState's
5764        // retain-holding fields (`wave_cache_snapshots`,
5765        // `deferred_handle_releases`, `pending_notify`) are drained by
5766        // outermost BatchGuard::drop (success + panic paths). See
5767        // comment above for the invariant.
5768    }
5769}