Skip to main content

graphrefly_core/
node.rs

1//! The dispatcher — node registration, subscription, wave engine.
2//!
3//! Mirrors `~/src/graphrefly-ts/src/__experiments__/handle-core/core.ts`
4//! (the Phase 13.6 brainstorm prototype, ~370 lines, 22 invariant tests).
5//!
6//! # Scope (M1 dispatcher + Slice A+B parity, closed 2026-05-05)
7//!
8//! - State + derived + dynamic node registration.
9//! - Subscribe / unsubscribe with push-on-subscribe (R1.2.3).
10//! - RAII [`Subscription`] with Drop-based deregister (§10.12).
11//! - DIRTY → DATA / RESOLVED ordering (R1.3.1.b two-phase push).
12//! - Equals-substitution (R1.3.2): identity is zero-FFI; custom crosses boundary.
13//! - First-run gate (R2.5.3) — fn does not fire until every dep has a handle.
14//! - Diamond resolution — one fn fire per wave even with shared upstream.
15//! - `set_deps()` atomic dep mutation with cycle detection + Phase 13.8 Q1
16//!   terminal-rejection policy (R3.3.1).
17//! - PAUSE / RESUME with lockId set + replay buffer (R1.2.6, R2.6, §10.2).
18//! - INVALIDATE broadcast + cascade with R1.4 idempotency.
19//! - COMPLETE / ERROR cascade + Lock 2.B auto-cascade gating
20//!   (ERROR dominates COMPLETE; first error wins).
21//! - TEARDOWN auto-precedes COMPLETE (R2.6.4 / Lock 6.F) +
22//!   `has_received_teardown` idempotency.
23//! - Meta TEARDOWN ordering (R1.3.9.d) — companions tear down before parent.
24//! - Resubscribable terminal lifecycle (R2.2.7, R2.5.3) — late subscribe to a
25//!   resubscribable terminal node resets lifecycle, except after TEARDOWN
26//!   (per F3 audit guard: TEARDOWN is permanent).
27//!
28//! # Module split (Slice C-1, 2026-05-05)
29//!
30//! Wave-engine internals (drain loop, fire selection, emission commit, sink
31//! dispatch) live in [`crate::batch`]. The split is purely organizational —
32//! the methods are still on `Core`. See `batch.rs` for the wave-engine
33//! entry points (`run_wave`, `drain_and_flush`, `commit_emission`,
34//! `queue_notify`, `deliver_data_to_consumer`).
35//!
36//! # Out of scope (later slices / milestones)
37//!
38//! - Deactivation cleanup (RAM nodes clear cache when sink count → 0) — M2.
39//!
40//! See [`migration-status.md`](../../../docs/migration-status.md) for the
41//! milestone tracker and [`porting-deferred.md`](../../../docs/porting-deferred.md)
42//! for surfaced concerns deferred to evidence-driven slices.
43//!
44//! # Re-entrance discipline (Slice A close, M1: fully lock-released)
45//!
46//! - **Wave-end sink fires** drop the state lock first. A subscriber's sink
47//!   that calls back into `Core::emit` / `pause` / `resume` / `invalidate` /
48//!   `complete` / `error` / `teardown` re-acquires the lock cleanly and runs
49//!   a nested wave (`s.in_tick` is cleared before the deferred-fire phase).
50//! - **`BindingBoundary::invoke_fn`** fires lock-released. The wave engine
51//!   acquires + drops the state lock per fn-fire iteration around the
52//!   `invoke_fn` callback. User fns may re-enter `Core::emit` / `pause` /
53//!   etc. and run a nested wave.
54//! - **`BindingBoundary::custom_equals`** fires lock-released.
55//!   `commit_emission` brackets the equals check around a lock release;
56//!   custom equals oracles may re-enter Core safely.
57//! - **Subscribe-time handshake** also fires lock-released. [`Core::subscribe`]
58//!   acquires the [`Core::wave_owner`] re-entrant mutex first (cross-thread
59//!   serialization), installs the sink under the state lock, drops the state
60//!   lock, then fires the per-tier handshake (`[Start]` / `[Data(cache)]?` /
61//!   `[Complete]?` / `[Error(h)]?` / `[Teardown]?` per R1.3.5.a) lock-released.
62//!   A handshake-time sink callback may re-enter Core (`emit` / `complete` /
63//!   `error` / `subscribe`); same-thread re-entry passes through `wave_owner`
64//!   transparently. Cross-thread emits block on `wave_owner` until the
65//!   subscribe path drops it, preserving R1.3.5.a happens-after ordering.
66
67use std::collections::VecDeque;
68use std::panic::{catch_unwind, AssertUnwindSafe};
69use std::sync::{Arc, Weak};
70
71use ahash::{AHashMap as HashMap, AHashSet as HashSet};
72use indexmap::IndexMap;
73use parking_lot::{ArcReentrantMutexGuard, Mutex, MutexGuard};
74
75/// Held guard from `parking_lot::ReentrantMutex::lock_arc()` on a
76/// partition's `wave_owner`. `!Send` per `parking_lot::ReentrantMutex`'s
77/// thread-affinity contract (the inner guard is `!Send`; the wrapper
78/// inherits) — the type-level `!Send` flows into
79/// [`crate::batch::BatchGuard::_wave_guards`] so any attempt to send
80/// the batch guard across threads fails to compile.
81///
82/// **Phase H+ option (d) limited variant (2026-05-09):** the guard's
83/// [`Drop`] pops `sid` from the [`held_partitions`] thread-local so
84/// the ascending-order check on the next acquire sees the correct
85/// "currently held" set. Re-entrant acquires (same thread, same
86/// partition) increment a refcount in the thread-local; final drop
87/// removes the entry.
88///
89/// Slice Y1 / Phase E (2026-05-08); Phase H+ wrapper (2026-05-09).
90pub(crate) struct WaveOwnerGuard {
91    /// Drop order: the wrapper's Drop runs FIRST (pops `held_partitions`),
92    /// then `inner` drops automatically (releases the parking_lot lock).
93    /// Field-declaration order matters in Rust: the wrapper drops top-
94    /// down by default, so `inner` is listed AFTER `sid` so the wrapper's
95    /// custom Drop runs on the whole struct first, releasing the
96    /// thread-local entry under our control before the inner guard
97    /// hits parking_lot's release path.
98    sid: crate::subgraph::SubgraphId,
99    /// `#[allow(dead_code)]`: the inner guard is held to keep the
100    /// parking_lot::ReentrantMutex acquired for the wave's duration;
101    /// it's never read, only its `Drop` matters.
102    #[allow(dead_code)]
103    inner: ArcReentrantMutexGuard<parking_lot::RawMutex, parking_lot::RawThreadId, ()>,
104}
105
106impl Drop for WaveOwnerGuard {
107    fn drop(&mut self) {
108        // `held_partitions::release` returns `bool was_outermost`. The
109        // outermost-release signal is consumed by [`crate::batch::BatchGuard::drop`]
110        // for the per-thread `TIER3_EMITTED_THIS_WAVE` clear (D1 patch,
111        // 2026-05-09 — Slice G coalescing tracker is keyed by thread,
112        // not by partition, so the clear lives on `BatchGuard` not here).
113        // We discard the bool — no per-guard cleanup remains.
114        let _ = held_partitions::release(self.sid);
115        // `inner` drops automatically after this — releases the
116        // parking_lot::ReentrantMutex (decrementing parking_lot's
117        // own internal re-entry counter; only the FINAL release
118        // unparks waiters).
119    }
120}
121
122/// Phase H+ option (d) limited variant — thread-local infrastructure
123/// for cross-partition acquire-during-fire / cross-partition
124/// acquire-during-wave deadlock detection (extended /qa N1(a)
125/// 2026-05-09 to cover sink callbacks + any nested in-wave acquire).
126///
127/// **Protocol invariant enforced:** whenever this thread already
128/// holds at least one partition wave_owner (HELD non-empty), every
129/// NEW partition this thread tries to acquire must have an id
130/// strictly greater than every partition currently held. Re-entrant
131/// acquires (same thread, same partition that's already held) bypass
132/// the check (they're fine — `parking_lot::ReentrantMutex` allows
133/// same-thread re-entry transparently).
134///
135/// Without this check, two threads each doing nested cross-partition
136/// acquires within an active wave could form an AB/BA cycle: thread A
137/// holds X, attempts Y (Y < X); concurrently thread B holds Y,
138/// attempts X (X > Y, ascending-OK from B's POV). A's acquire on Y
139/// blocks behind B; B's acquire on X blocks behind A. Cycle.
140///
141/// **Single carve-out — producer build/project closures:** producer-
142/// pattern operator activation (`zip` / `concat` / `race` /
143/// `take_until` / `switch_map` / `exhaust_map` / `concat_map` /
144/// `merge_map`) runs build closures inside `BindingBoundary::invoke_fn`
145/// that legitimately subscribe to upstream sources cross-partition.
146/// Refactoring those operators to defer their inner subscribes to
147/// wave-end is the broader "Phase H+ STRICT variant" carry-forward
148/// (see `docs/porting-deferred.md`). For the limited variant, the
149/// thread-local [`IN_PRODUCER_BUILD`] refcount is incremented when a
150/// producer node enters its FiringGuard scope and the H+ check is
151/// suppressed for the duration. All other call paths (derived /
152/// dynamic user fns, sink callbacks, subscribe handshakes, drop
153/// cleanup) DO see the check.
154///
155/// **What this widening (/qa N1(a) 2026-05-09) catches that the
156/// original `fire_depth > 0` gate did NOT:**
157/// - Sink callbacks fired by `flush_notifications` lock-released
158///   AFTER drain — the outer wave's wave_owner is still held but
159///   no FiringGuard is on the stack. A sink that calls
160///   `core.subscribe(node_in_lower_partition, sink)` would have
161///   bypassed the original gate; now caught.
162/// - Subscribe-handshake-time cross-partition operations (similar
163///   shape — the handshake fires lock-released).
164/// - Any future re-entry path that doesn't go through `FiringGuard`
165///   (e.g., `Subscription::Drop` cleanup paths, R3.7-style graph
166///   destroy cascades).
167mod held_partitions {
168    use crate::subgraph::SubgraphId;
169    use smallvec::SmallVec;
170    use std::cell::RefCell;
171
172    /// Inline-storage capacity for the per-thread held-partitions set.
173    /// 4 mirrors the same inline limit used elsewhere in the codebase
174    /// (`compute_touched_partitions` returns `SmallVec<[SubgraphId; 4]>`,
175    /// `BatchGuard::_wave_guards` is `SmallVec<[WaveOwnerGuard; 4]>`).
176    /// In the typical wave a single thread holds 1–3 partitions; spillover
177    /// to the heap costs allocation but is correct.
178    const HELD_INLINE: usize = 4;
179
180    thread_local! {
181        /// Currently-held partitions on this thread, as `(SubgraphId, refcount)`
182        /// pairs in arbitrary order. The refcount mirrors
183        /// `parking_lot::ReentrantMutex`'s internal counter (we can't query
184        /// parking_lot's directly).
185        ///
186        /// `SmallVec<[_; HELD_INLINE]>` over `BTreeMap<_, _>`: under the
187        /// expected workload (≤4 partitions held simultaneously per wave) the
188        /// inline-storage SmallVec keeps the entire set in stack memory with
189        /// no allocation, no Box-per-node, and contiguous cache layout. Linear
190        /// scans are branch-predictable and faster than BTreeMap's logn
191        /// pointer-chasing for tiny N. Phase J post-widening bench
192        /// (`migration-status.md` 2026-05-09) reported 14–25% overhead vs the
193        /// pre-widening baseline, attributed in part to BTreeMap allocation
194        /// costs on the hot path. This swap recovers part of that overhead.
195        ///
196        /// Bookkeeping is unconditional — every acquire bumps the
197        /// refcount, every release decrements. The CHECK gate
198        /// (`!held.is_empty() && !in_producer_build()`) is what
199        /// distinguishes "first-time acquire on a fresh thread"
200        /// (allowed, held empty) from "nested acquire while we
201        /// already hold something" (must be ascending).
202        static HELD: RefCell<SmallVec<[(SubgraphId, u32); HELD_INLINE]>>
203            = const { RefCell::new(SmallVec::new_const()) };
204
205        /// Per-thread "we're inside a producer build/project closure"
206        /// refcount. Producer-pattern operator activation increments
207        /// this on `FiringGuard::new`; their build closures call
208        /// cross-partition `Core::subscribe` legitimately during
209        /// activation (operator-internal setup, not user-fn re-entry).
210        /// The H+ check is suppressed while this counter is non-zero.
211        ///
212        /// Tracked as a refcount (not a bool) because nested producer
213        /// activations are theoretically possible (e.g., a producer
214        /// whose build closure subscribes to ANOTHER producer that
215        /// also activates) and must balance correctly. The increment
216        /// uses `checked_add(1)` so an unbounded recursion (a real
217        /// bug — protocol cascades are bounded by
218        /// `MAX_BATCH_DRAIN_ITERATIONS`) panics instead of silently
219        /// saturating and disabling the check.
220        ///
221        /// /qa N1(a) (2026-05-09): replaced the original `FIRE_DEPTH`
222        /// thread-local. The previous design gated the H+ check on
223        /// fire_depth > 0; this restricted coverage to the
224        /// FiringGuard-wrapped invoke_fn scope. The new design
225        /// inverts: gate on `held non-empty AND !in_producer_build`.
226        /// Catches sink-callback / handshake / drop-cleanup paths
227        /// the previous gate missed.
228        static IN_PRODUCER_BUILD: std::cell::Cell<u32> = const { std::cell::Cell::new(0) };
229    }
230
231    /// Increment the producer-build refcount. Called from
232    /// `FiringGuard::new` ONLY for `is_producer()` nodes, AND from
233    /// `ProducerCtx::subscribe_to`'s wrapped sink (so producer-
234    /// internal sink callbacks also suppress the H+ check).
235    ///
236    /// # Panics
237    ///
238    /// Panics if the per-thread refcount would overflow `u32::MAX`.
239    /// This indicates an unbounded recursion through producer build /
240    /// sink dispatch — a real bug. Safer to surface than to silently
241    /// saturate and disable the check or produce inverted Drop
242    /// semantics. The protocol's `MAX_BATCH_DRAIN_ITERATIONS` cap
243    /// makes this overflow unreachable in practice.
244    pub fn producer_build_enter() {
245        IN_PRODUCER_BUILD.with(|c| {
246            let next = c.get().checked_add(1).expect(
247                "in_producer_build refcount overflow — unbounded \
248                 producer-build re-entrance. Should be bounded by the \
249                 protocol's MAX_BATCH_DRAIN_ITERATIONS cap.",
250            );
251            c.set(next);
252        });
253    }
254
255    /// Decrement the producer-build refcount. Called from
256    /// `FiringGuard::drop` ONLY for guards that incremented in `new`,
257    /// AND from `ProducerSinkGuard::drop` in the producer-sink
258    /// wrapper. Saturates on underflow (would indicate Drop without
259    /// matching `new` — recovery via no-op is safer than panicking
260    /// in Drop).
261    pub fn producer_build_exit() {
262        IN_PRODUCER_BUILD.with(|c| c.set(c.get().saturating_sub(1)));
263    }
264
265    /// Currently inside a producer build/project closure on this thread?
266    fn in_producer_build() -> bool {
267        IN_PRODUCER_BUILD.with(|c| c.get() > 0)
268    }
269
270    /// Phase H+ check + bookkeeping. Called BEFORE acquiring the
271    /// partition's parking_lot::ReentrantMutex.
272    ///
273    /// Panics with a clear diagnostic if:
274    /// - HELD is non-empty (this thread already holds ≥1 partition),
275    /// - AND we're NOT inside a producer build closure,
276    /// - AND `sid` is NOT already held by this thread,
277    /// - AND `sid <= max(currently held)`.
278    ///
279    /// Otherwise: increments the refcount for `sid` (creating the
280    /// entry if needed) and returns. The caller MUST pair every
281    /// call with a [`release`] when the guard drops.
282    ///
283    /// **Important note on cross-thread vs same-thread:** this check
284    /// is a SAME-THREAD invariant — it catches a thread acquiring
285    /// out of order from itself. Cross-thread AB/BA cycles between
286    /// threads with disjoint same-thread acquisition orders are
287    /// prevented at a different layer (the
288    /// `compute_touched_partitions` upfront-acquire-all-ascending
289    /// rule in `Core::begin_batch_for`). This thread-local check
290    /// adds the layer that prevents a same-thread descending acquire
291    /// from creating the FIRST half of a cross-thread cycle.
292    pub(crate) fn check_and_acquire(sid: SubgraphId) {
293        HELD.with(|h| {
294            let mut held = h.borrow_mut();
295            // Gate: held non-empty (we're nested) AND not in producer
296            // build/sink (the v1 carve-out for operator activation +
297            // producer-internal sink callbacks). First-time acquires
298            // on a fresh thread (held empty) skip the check — there's
299            // nothing to compare against.
300            let already_held = held.iter().any(|(s, _)| *s == sid);
301            if !held.is_empty() && !in_producer_build() && !already_held {
302                // Linear-scan max over the inline storage (typical N ≤ 4).
303                // Branch-predictable and cache-local; no allocation.
304                if let Some(max_held) = held.iter().map(|(s, _)| *s).max() {
305                    if sid <= max_held {
306                        // Drop the borrow before panicking so unwind
307                        // doesn't see a still-borrowed RefCell.
308                        let new_id = sid;
309                        drop(held);
310                        panic!(
311                            "Phase H+ ascending-order violation: thread tried \
312                             to acquire partition {new_id:?} while already \
313                             holding partition {max_held:?}. \
314                             The same-thread cross-partition lock-acquisition \
315                             protocol requires every NEW partition acquired \
316                             while ANY partition is already held to have id \
317                             strictly greater than every already-held \
318                             partition; otherwise two threads doing \
319                             reciprocal acquires can form an AB/BA deadlock.\n\
320                             \n\
321                             Note: this check is per-thread. A cross-thread \
322                             AB/BA cycle between threads each obeying \
323                             ascending order at the per-thread level is \
324                             prevented at a different layer — the \
325                             `compute_touched_partitions` upfront-acquire-\
326                             all-ascending rule in `Core::begin_batch_for`.\n\
327                             \n\
328                             Common triggers (see docs/porting-deferred.md \
329                             'Cross-partition acquire-during-fire deadlock'):\n\
330                             - A user fn (derived / dynamic) that calls \
331                             `Core::emit` / `complete` / `error` / `teardown` \
332                             / `invalidate` mid-fire on a node in a partition \
333                             with a smaller id than the firing node's.\n\
334                             - A sink callback that calls `Core::subscribe` \
335                             on a node in a smaller-id partition while the \
336                             outer wave's wave_owner is still held.\n\
337                             - A subscribe handshake (or Drop cleanup) that \
338                             re-enters Core on a smaller-id partition.\n\
339                             \n\
340                             Fix: schedule the cross-partition operation \
341                             OUTSIDE the wave (e.g., via a deferred queue \
342                             applied at wave end) so the acquire happens at \
343                             top-level rather than nested under a held \
344                             partition; OR declare the cross-partition \
345                             reachability upfront via `add_meta_companion` \
346                             so the wave acquires both partitions ascending \
347                             at top-level and the inner re-entry becomes a \
348                             re-entrant acquire on a held partition."
349                        );
350                    }
351                }
352            }
353            // Bookkeeping: increment refcount. `checked_add(1)` so
354            // overflow surfaces (would indicate an unbounded
355            // re-entrance — a real bug). Linear find-or-push.
356            if let Some((_, count)) = held.iter_mut().find(|(s, _)| *s == sid) {
357                *count = count.checked_add(1).expect(
358                    "held_partitions refcount overflow — unbounded \
359                     same-partition re-entrance. Should be bounded by the \
360                     protocol's MAX_BATCH_DRAIN_ITERATIONS cap.",
361                );
362            } else {
363                held.push((sid, 1));
364            }
365        });
366    }
367
368    /// Decrement the refcount for `sid`; remove the entry if it
369    /// hits zero. Called from [`super::WaveOwnerGuard::drop`] AND
370    /// from the retry / panic paths in
371    /// [`super::Core::partition_wave_owner_lock_arc`] to ensure the
372    /// refcount stays balanced under all unwind / retry / exhaust
373    /// paths.
374    ///
375    /// Returns `true` iff this release brought the partition's
376    /// refcount on this thread to zero — i.e. this was the OUTERMOST
377    /// guard for `sid` on this thread. [`super::WaveOwnerGuard::drop`]
378    /// uses this signal to clear per-partition wave state (Q3) on
379    /// outermost release only; inner re-entrant guard drops must NOT
380    /// clear (a containing wave is still active and still holds
381    /// in-flight wave state). The `partition_wave_owner_lock_arc`
382    /// retry / panic paths ignore the return value because the
383    /// partition state hadn't been touched yet on those paths
384    /// (clearing would be a no-op anyway).
385    pub(crate) fn release(sid: SubgraphId) -> bool {
386        HELD.with(|h| {
387            let mut held = h.borrow_mut();
388            if let Some(idx) = held.iter().position(|(s, _)| *s == sid) {
389                let count = &mut held[idx].1;
390                // /qa A3 fix (2026-05-09): debug_assert the refcount is
391                // non-zero before decrement. A `release(sid)` on an
392                // entry with `count == 0` indicates a bookkeeping bug
393                // (a release without a matching `check_and_acquire`,
394                // or a logic error in caller), but the legacy
395                // saturating_sub silently returned `was_outermost=true`
396                // and removed the entry — masking the bug. Surface
397                // in dev/test builds; release builds preserve the
398                // saturating behavior.
399                debug_assert!(
400                    *count > 0,
401                    "held_partitions::release({sid:?}): refcount underflow — \
402                     release without matching check_and_acquire (caller bug)"
403                );
404                *count = count.saturating_sub(1);
405                if *count == 0 {
406                    // `swap_remove` is O(1) and order-irrelevant: the
407                    // CHECK gate computes max via linear scan and does
408                    // not depend on iteration order.
409                    held.swap_remove(idx);
410                    return true;
411                }
412            } else {
413                // /qa A3 fix (2026-05-09): same intent — release on a
414                // sid that's not in HELD is a bookkeeping bug. Surface
415                // in dev/test builds.
416                debug_assert!(
417                    false,
418                    "held_partitions::release({sid:?}): sid not in HELD — \
419                     double-drop or stray release (caller bug)"
420                );
421            }
422            false
423        })
424    }
425
426    /// Test-only: read the current thread's held-partitions snapshot.
427    /// Used by post-panic regression assertions to verify the
428    /// thread-local stays clean even when the H+ check unwinds the
429    /// stack (so cargo's thread-reuse doesn't propagate corrupted
430    /// state to subsequent tests). `pub` (gated by
431    /// `cfg(any(test, debug_assertions))`) so integration tests
432    /// outside the crate can read it.
433    #[cfg(any(test, debug_assertions))]
434    #[must_use]
435    pub fn held_snapshot_for_tests() -> Vec<(SubgraphId, u32)> {
436        // /qa A2 fix (2026-05-09): sort by SubgraphId so the snapshot
437        // returns ascending order — matches the BTreeMap-iteration
438        // contract that pre-/qa SmallVec swap consumers might rely on.
439        // Test consumers currently only assert `is_empty()`, but the
440        // ordered shape is the safer default for future tests that
441        // assert specific entries.
442        let mut v: Vec<(SubgraphId, u32)> = HELD.with(|h| h.borrow().to_vec());
443        v.sort_unstable_by_key(|(s, _)| *s);
444        v
445    }
446
447    /// Test-only: read the current thread's producer-build refcount.
448    /// Companion to [`held_snapshot_for_tests`] for verifying the
449    /// thread-local stays clean post-panic.
450    #[cfg(any(test, debug_assertions))]
451    #[must_use]
452    pub fn in_producer_build_for_tests() -> u32 {
453        IN_PRODUCER_BUILD.with(std::cell::Cell::get)
454    }
455}
456
457/// `pub` re-exports for the `graphrefly-operators` crate to wrap
458/// producer-internal sinks with the same `IN_PRODUCER_BUILD` flag
459/// the FiringGuard uses (per /qa N1(a) — operator sinks are
460/// operator-internal and SUPPRESS the H+ check, mirroring the
461/// activation-time carve-out). Not part of the v1 stable user API
462/// surface; intended for in-workspace consumers only. Phase H+
463/// STRICT variant (the producer-architecture refactor) will
464/// eliminate the need for this carve-out.
465pub use held_partitions::{producer_build_enter, producer_build_exit};
466
467/// Test-only re-exports for integration tests under
468/// `crates/graphrefly-core/tests/`. Gated `#[cfg(any(test, debug_assertions))]`
469/// so they don't leak into release builds. Public visibility is
470/// required because integration tests live outside the crate.
471#[cfg(any(test, debug_assertions))]
472pub use held_partitions::{held_snapshot_for_tests, in_producer_build_for_tests};
473use smallvec::SmallVec;
474use thiserror::Error;
475
476use crate::batch::PendingPerNode;
477use crate::boundary::{BindingBoundary, CleanupTrigger};
478use crate::clock::monotonic_ns;
479use crate::handle::{FnId, HandleId, LockId, NodeId, NO_HANDLE};
480use crate::message::Message;
481
482/// Terminal-lifecycle state — once set on a node, the node will not emit
483/// further DATA; per-dep slots on consumers also use this to track which
484/// upstreams have terminated (R1.3.4 / Lock 2.B).
485///
486/// `Error` carries a [`HandleId`] resolving to the error value. Refcount is
487/// retained when the variant is stored in a node's `terminal` slot or any
488/// consumer's `dep_terminals` slot; v1 does not release these (terminal
489/// state is one-shot at this layer; release happens on resubscribable
490/// terminal-lifecycle reset, a separate slice).
491#[derive(Copy, Clone, Debug, PartialEq, Eq)]
492pub enum TerminalKind {
493    Complete,
494    Error(HandleId),
495}
496
497/// Node kind discriminant — **derived metadata** computed from
498/// [`NodeRecord`]'s field shape (D030 unification, Slice D).
499///
500/// Core no longer stores `kind` as a field; it's computed on demand from
501/// `(deps.is_empty(), fn_id.is_some(), op.is_some(), is_dynamic)`,
502/// mirroring TS's data model where `NodeImpl` has no `_kind` field. The
503/// shape uniquely identifies the kind:
504///
505/// | deps      | fn_id | op   | is_dynamic | kind     |
506/// |-----------|-------|------|-----------|----------|
507/// | empty     | None  | None | -         | State    |
508/// | empty     | Some  | None | -         | Producer |
509/// | non-empty | Some  | None | false     | Derived  |
510/// | non-empty | Some  | None | true      | Dynamic  |
511/// | non-empty | None  | Some | -         | Operator |
512///
513/// Public API ([`Core::kind_of`]) derives this enum on each call. State
514/// nodes are ROM (cache survives deactivation); compute nodes
515/// (Derived / Dynamic / Operator) and producers are RAM.
516#[derive(Copy, Clone, Eq, PartialEq, Debug)]
517pub enum NodeKind {
518    /// Source node: cache is intrinsic, no fn, no deps. Mutated via [`Core::emit`].
519    State,
520    /// Producer node: fn fires once on first subscribe. No deps;
521    /// emissions arrive via sinks the fn subscribes to (zip / concat /
522    /// race / takeUntil pattern). Slice D / D031.
523    Producer,
524    /// Derived node: fn fires on every dep change; all deps tracked.
525    Derived,
526    /// Dynamic node: fn declares which dep indices it actually read this run.
527    /// Untracked dep updates flow through cache but do NOT re-fire fn.
528    Dynamic,
529    /// Operator node: built-in dispatch path for transform / combine /
530    /// flow / resilience operators. The `OperatorOp` discriminant selects
531    /// the per-operator FFI path ([`BindingBoundary::project_each`] etc.);
532    /// Core manages per-operator state via the generic `op_scratch` slot
533    /// on `NodeRecord` (D026). Per Slice C-1 (D009) / Slice C-3 (D026).
534    Operator(OperatorOp),
535}
536
537impl NodeKind {
538    /// True if this kind opts OUT of Lock 2.B auto-cascade. Operator(Reduce)
539    /// and Operator(Last) must intercept upstream COMPLETE so they can emit
540    /// their accumulator / buffered value before the cascade terminates them;
541    /// instead of cascading, terminate_node queues such children for fn-fire
542    /// so `fire_operator` can handle the terminal.
543    pub(crate) fn skips_auto_cascade(self) -> bool {
544        matches!(
545            self,
546            NodeKind::Operator(OperatorOp::Reduce { .. } | OperatorOp::Last { .. })
547        )
548    }
549}
550
551/// Built-in operator discriminant. Selects the per-operator dispatch path
552/// in `fire_operator` (`crates/graphrefly-core/src/batch.rs`). Each variant
553/// carries the binding-side closure ids (and seed handle for stateful
554/// folders) needed for the wave-execution path; Core stores no user values
555/// itself per the handle-protocol cleaving plane.
556#[derive(Copy, Clone, Eq, PartialEq, Debug)]
557pub enum OperatorOp {
558    /// `map(source, project)` — element-wise transform. Calls
559    /// `BindingBoundary::project_each(fn_id, &inputs)` per fire; emits each
560    /// returned handle via `commit_emission_verbatim` (R1.3.2.d batch
561    /// semantics — no equals substitution between batch entries).
562    Map { fn_id: FnId },
563    /// `filter(source, predicate)` — silent-drop selection (D012/D018).
564    /// Calls `BindingBoundary::predicate_each(fn_id, &inputs)`; emits each
565    /// passing input verbatim. If zero pass on a wave that dirtied the
566    /// node, queues a single `RESOLVED` to settle (D018).
567    Filter { fn_id: FnId },
568    /// `scan(source, fold, seed)` — left-fold emitting each new accumulator.
569    /// `seed` is captured at registration; `acc` lives in
570    /// [`ScanState`](super::op_state::ScanState) inside
571    /// [`NodeRecord::op_scratch`] and persists across waves until
572    /// resubscribable reset. Calls `BindingBoundary::fold_each(fn_id, acc,
573    /// &inputs) -> SmallVec<HandleId>` per fire.
574    Scan { fn_id: FnId, seed: HandleId },
575    /// `reduce(source, fold, seed)` — left-fold emitting once on upstream
576    /// COMPLETE. Accumulates silently while source DATA flows; on
577    /// dep[0].terminal == Some(Complete), emits `[Data(acc), Complete]`.
578    /// On `Error(h)`, propagates the error verbatim. Opts out of Lock 2.B
579    /// auto-cascade (see `NodeKind::skips_auto_cascade`).
580    Reduce { fn_id: FnId, seed: HandleId },
581    /// `distinctUntilChanged(source, equals)` — suppresses adjacent
582    /// duplicates. Calls `BindingBoundary::custom_equals(equals_fn_id,
583    /// prev, current)` per input; emits non-equal items verbatim and
584    /// updates `prev`. If zero items pass on a wave that dirtied the node,
585    /// queues `RESOLVED` (matches Filter discipline).
586    DistinctUntilChanged { equals_fn_id: FnId },
587    /// `pairwise(source)` — emits `(prev, current)` pairs starting after
588    /// the second value. First value swallowed (sets `prev`). Calls
589    /// `BindingBoundary::pairwise_pack(fn_id, prev, current)` per pair to
590    /// produce the binding-side tuple handle.
591    Pairwise { fn_id: FnId },
592
593    // ----- Slice C-2: multi-dep combinators (D020) -----
594    /// `combine(...sources)` — N-dep combineLatest. On any dep fire, packs
595    /// the latest handle per dep into a single tuple handle via
596    /// `BindingBoundary::pack_tuple(pack_fn, &handles)`. First-run gate
597    /// (`partial: false` default) holds until all deps deliver real DATA
598    /// (R2.5.3). COMPLETE cascades when all deps complete (R1.3.4.b).
599    Combine { pack_fn: FnId },
600
601    /// `withLatestFrom(primary, secondary)` — 2-dep, fire-on-primary-only
602    /// (D021, Phase 10.5). Packs `[primary, secondary]` via
603    /// `BindingBoundary::pack_tuple(pack_fn, &handles)` when dep[0]
604    /// (primary) has DATA in the wave. If only dep[1] (secondary) fires,
605    /// settles with RESOLVED (D018 pattern). First-run gate holds until
606    /// both deps deliver (R2.5.3 `partial: false`). Post-warmup INVALIDATE
607    /// guard: if secondary `prev_data == NO_HANDLE` and batch empty after
608    /// warmup, settles with RESOLVED (no stale pair).
609    WithLatestFrom { pack_fn: FnId },
610
611    /// `merge(...sources)` — N-dep, forward all DATA handles verbatim
612    /// (D022). Zero FFI on fire: no transformation, no binding call.
613    /// Each dep's batch handles are retained and emitted individually.
614    /// COMPLETE cascades when all deps complete (R1.3.4.b).
615    Merge,
616
617    // ----- Slice C-3: flow operators (D024) -----
618    /// `take(source, count)` — emits the first `count` DATA values then
619    /// self-completes via `Core::complete`. Tracks `count_emitted` in
620    /// [`TakeState`](super::op_state::TakeState). When upstream completes
621    /// before `count` is reached, the standard auto-cascade propagates
622    /// COMPLETE. `count == 0` is allowed: the first fire emits zero
623    /// items then immediately self-completes (D027).
624    Take { count: u32 },
625
626    /// `skip(source, count)` — drops the first `count` DATA values; once
627    /// the threshold is crossed, subsequent DATAs pass through verbatim.
628    /// Tracks `count_skipped` in [`SkipState`](super::op_state::SkipState).
629    /// On a wave where every input is still in the skip window, queues
630    /// DIRTY+RESOLVED to settle (D018 pattern).
631    Skip { count: u32 },
632
633    /// `takeWhile(source, predicate)` — emits while `predicate(input)`
634    /// holds; on the first `false`, emits any preceding passes then
635    /// self-completes via `Core::complete`. Reuses
636    /// [`BindingBoundary::predicate_each`] (D029); after the first
637    /// `false`, subsequent inputs in the same batch are dropped.
638    TakeWhile { fn_id: FnId },
639
640    /// `last(source)` / `last_with_default(source, default)` — buffers
641    /// the latest DATA; on upstream COMPLETE, emits `Data(latest)` then
642    /// `Complete`. The `default` field is `NO_HANDLE` for the no-default
643    /// factory (emits only `Complete` on empty stream), or a registered
644    /// default handle (emits `Data(default)` + `Complete` on empty
645    /// stream). Storage: [`LastState`](super::op_state::LastState) holds
646    /// `latest` (live buffer) and `default` (registration-time, stable).
647    /// Opts out of Lock 2.B auto-cascade so it can intercept upstream
648    /// COMPLETE.
649    Last { default: HandleId },
650}
651
652/// Registration options for [`Core::register_operator`].
653///
654/// `equals` controls operator output dedup (R5.7 — defaults to identity).
655/// `partial` controls the R2.5.3 first-run gate (R5.4 — operator dispatch
656/// fires on first DATA from any dep when `true`; default `false` matches
657/// the gated derived discipline).
658#[derive(Copy, Clone, Debug)]
659pub struct OperatorOpts {
660    pub equals: EqualsMode,
661    pub partial: bool,
662}
663
664impl Default for OperatorOpts {
665    fn default() -> Self {
666        Self {
667            equals: EqualsMode::Identity,
668            partial: false,
669        }
670    }
671}
672
673/// Closure-form fn id OR typed operator discriminant — the two dispatch
674/// paths a node can use. State / passthrough nodes pass `None` to
675/// [`Core::register`] (no fn at all).
676#[derive(Copy, Clone, Debug)]
677pub enum NodeFnOrOp {
678    /// Closure-form: invokes [`BindingBoundary::invoke_fn`] per fire.
679    /// Used for Derived / Dynamic / Producer.
680    Fn(FnId),
681    /// Typed-op: routes to a `fire_op_*` helper that calls per-operator
682    /// FFI methods (`project_each` / `predicate_each` / `fold_each` /
683    /// `pairwise_pack` / `pack_tuple`). Used for Operator nodes.
684    Op(OperatorOp),
685}
686
/// How a node behaves while a pause lock is held (canonical-spec §2.6).
///
/// TS ships three modes; the Slice F audit (2026-05-07) closed the Rust
/// port gap.
///
/// | Mode | Outgoing tier-3 routing while paused | RESUME behavior |
/// |---|---|---|
/// | [`PausableMode::Default`] | suppress fn-fire upstream (no DIRTY emitted) | fire fn ONCE on RESUME if any dep delivered DATA during pause; collapses N pause-window writes into one settle |
/// | [`PausableMode::ResumeAll`] | buffer outgoing tier-3 / tier-4 messages per-wave | replay each buffered wave verbatim on RESUME |
/// | [`PausableMode::Off`] | dispatcher ignores PAUSE; tier-3 flushes immediately | no-op (no buffer to drain) |
///
/// Per canonical §2.6 the default is [`PausableMode::Default`], so every
/// untagged source picks it up. Its memory profile is O(1) per node (there
/// is no buffer); the price is that subscribers see one consolidated DATA
/// on RESUME instead of the K mid-pause emissions verbatim.
///
/// Note: tier-1 (DIRTY) / tier-2 (PAUSE/RESUME) / tier-5 (COMPLETE/ERROR) /
/// tier-6 (TEARDOWN) bypass pause in every mode. They stay observable so
/// that a leaked pause-controller cannot strand subscribers.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum PausableMode {
    /// Canonical default. Fn-fire is suppressed while paused; the fn fires
    /// once on RESUME if any dep delivered DATA during the pause window.
    #[default]
    Default,
    /// Outgoing tier-3 / tier-4 messages are buffered per-wave and replayed
    /// on RESUME. Pick this when subscribers need the verbatim emit history
    /// (e.g. an audit log, replay-on-reconnect bridge).
    ResumeAll,
    /// The dispatcher ignores PAUSE for this node: tier-3 flushes
    /// immediately even while a lock is held. Pick this for nodes whose
    /// value production is intrinsically pause-immune (telemetry counters,
    /// monotonic timers).
    Off,
}
720
721/// Per-kind opts for [`Core::register`]. Cross-kind config knobs live
722/// here; per-kind specifics (deps, fn_or_op) live on
723/// [`NodeRegistration`].
724#[derive(Copy, Clone, Debug)]
725pub struct NodeOpts {
726    /// Initial cached value. Only valid for state nodes (no deps + no
727    /// fn + no op). [`NO_HANDLE`] starts the node sentinel.
728    pub initial: HandleId,
729    /// Equality mode for outgoing emissions (R1.3.2). Defaults to
730    /// [`EqualsMode::Identity`].
731    pub equals: EqualsMode,
732    /// First-run gate (R2.5.3 / D011). When `true`, the node fires as
733    /// soon as ANY dep delivers a real handle; when `false` (default),
734    /// the node holds until every dep has delivered.
735    pub partial: bool,
736    /// Dynamic flag (R2.5.3) — fn declares actually-tracked dep indices
737    /// per fire. Only meaningful when `fn_or_op == Some(Fn(_))` AND
738    /// deps non-empty.
739    pub is_dynamic: bool,
740    /// Pause behavior mode (canonical §2.6). Default is
741    /// [`PausableMode::Default`]. See [`PausableMode`] for the trade-offs.
742    pub pausable: PausableMode,
743    /// Replay buffer cap (canonical R2.6.5 / Lock 6.G — Slice E1, 2026-05-07).
744    /// `None` (default) disables; `Some(N)` keeps a circular buffer of the
745    /// last N DATA emissions and replays them to late subscribers as part
746    /// of the per-tier handshake (between [`Message::Start`] and any
747    /// terminal slice). Only DATA is buffered; RESOLVED entries are NOT
748    /// (R2.6.5 explicit "DATA only").
749    pub replay_buffer: Option<usize>,
750}
751
752impl Default for NodeOpts {
753    fn default() -> Self {
754        Self {
755            initial: NO_HANDLE,
756            equals: EqualsMode::Identity,
757            partial: false,
758            is_dynamic: false,
759            pausable: PausableMode::Default,
760            replay_buffer: None,
761        }
762    }
763}
764
765/// Unified node-registration descriptor (D030, Slice D).
766///
767/// All node kinds (State / Producer / Derived / Dynamic / Operator)
768/// register through [`Core::register`] with a `NodeRegistration`. The
769/// kind is **derived from the field shape** of the registration —
770/// `(deps.is_empty(), fn_or_op variant)`:
771///
772/// | deps      | fn_or_op   | is_dynamic | resulting kind |
773/// |-----------|-----------|-----------|----------------|
774/// | empty     | None      | -         | State          |
775/// | empty     | Some(Fn)  | -         | Producer       |
776/// | non-empty | Some(Fn)  | false     | Derived        |
777/// | non-empty | Some(Fn)  | true      | Dynamic        |
778/// | non-empty | Some(Op)  | -         | Operator       |
779///
780/// The sugar wrappers ([`Core::register_state`], [`Core::register_producer`],
781/// etc.) build a `NodeRegistration` and delegate.
782#[derive(Clone, Debug)]
783pub struct NodeRegistration {
784    /// Upstream deps in declaration order. Empty for state / producer.
785    pub deps: Vec<NodeId>,
786    /// Closure-form fn id or typed-op discriminant. `None` for state /
787    /// passthrough.
788    pub fn_or_op: Option<NodeFnOrOp>,
789    /// Cross-kind config knobs.
790    pub opts: NodeOpts,
791}
792
793/// Equality mode for a node's outgoing emissions.
794///
795/// `Identity` is the default: cache vs. new handle compare is a `u64` equal —
796/// zero FFI. `Custom` invokes [`BindingBoundary::custom_equals`] every check
797/// (R1.3.2.b two-arg call when both sides are non-sentinel).
798#[derive(Copy, Clone, Debug)]
799pub enum EqualsMode {
800    Identity,
801    Custom(FnId),
802}
803
/// Internal id of one subscription; a fresh one is allocated for every
/// [`Core::subscribe`] call.
///
/// The public API only ever sees it wrapped in a [`Subscription`]. It is
/// consumed directly just by Core internals and by the
/// [`Subscription::Drop`] path.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub(crate) struct SubscriptionId(u64);
810
811/// RAII subscription handle.
812///
813/// Returned by [`Core::subscribe`]. While the handle is held, the sink stays
814/// registered against its node. Dropping the handle (explicitly via
815/// `drop(sub)` or implicitly at scope exit) unsubscribes the sink — no manual
816/// `unsubscribe()` call is needed. Per §10.12 of the rust-port session doc.
817///
818/// # Lifetime semantics
819///
820/// The subscription holds a [`Weak`] reference back to the Core's state. If
821/// the Core is dropped before the subscription, the Drop impl is a silent
822/// no-op (the sink has nowhere to deregister from anyway). This avoids a
823/// reference cycle when subscribers capture an `Arc<Core>` in their closure.
824///
825/// # Thread safety
826///
827/// `Send + Sync`. The handle can be moved across threads or dropped from
828/// any thread.
829///
830/// # Not Clone
831///
832/// `Subscription` owns the unsubscribe action exclusively. Cloning would
833/// require either "first drop wins" or "last drop wins" semantics, both
834/// of which surprise. If a binding needs multiple deregistration handles,
835/// it should subscribe multiple times (each producing a fresh handle) or
836/// wrap the single `Subscription` in `Arc<Mutex<Option<Subscription>>>`.
837#[must_use = "dropping a Subscription unsubscribes its sink immediately"]
838pub struct Subscription {
839    state: Weak<Mutex<CoreState>>,
840    node_id: NodeId,
841    sub_id: SubscriptionId,
842}
843
844impl Subscription {
845    /// The node this subscription is attached to.
846    #[must_use]
847    pub fn node_id(&self) -> NodeId {
848        self.node_id
849    }
850}
851
852impl Drop for Subscription {
853    fn drop(&mut self) {
854        // Silent no-op if Core is gone. This keeps Drop infallible (no panics
855        // from a dropped subscription racing a dropped Core) and avoids
856        // surprising users with errors on shutdown.
857        //
858        // Producer deactivation (Slice D, D031): if removing this sub
859        // empties the subscribers map AND the node is a producer, fire
860        // `BindingBoundary::producer_deactivate(node_id)` AFTER releasing
861        // the state lock. The binding then drops its per-node state
862        // (subscriptions to upstream sources, captured closure state),
863        // which transitively unsubs from upstreams via their own
864        // `Subscription::Drop`. Re-entrance into Core from the deactivate
865        // hook is permitted since the lock is released first.
866        let Some(state) = self.state.upgrade() else {
867            return;
868        };
869        // Slice E2 (D056): when the last subscriber drops, fire the
870        // node's OnDeactivation cleanup hook BEFORE producer_deactivate
871        // (cleanup may release handles the producer subscription owns;
872        // reverse order would let producer_deactivate drop subs that user
873        // cleanup expected to be live). Both calls are lock-released per
874        // D045.
875        //
876        // OnDeactivation gating (D068, QA Q3 fix): fires only when the
877        // node has fired its fn at least once AND has a fn (`fn_id`
878        // populated). State nodes have no fn — they cannot register a
879        // cleanup spec via the production fn-return path (R2.4.5), so
880        // firing `cleanup_for` on them is wasted FFI; the binding's
881        // lookup is guaranteed to find no `current_cleanup`. Skipping
882        // here saves the FFI hop and matches the design-doc wording
883        // ("never-fired state nodes" — state-with-initial-value satisfies
884        // `has_fired_once = true` but still has no fn).
885        //
886        // Slice E2 /qa Q2(b) (D069): if the node is a resubscribable
887        // node that's ALREADY terminal (terminate fired BEFORE this last
888        // sub drop), fire `wipe_ctx` lock-released AFTER OnDeactivation
889        // + producer_deactivate. Mutually exclusive with `terminate_node`'s
890        // queue-wipe site: terminate-with-empty-subs goes through
891        // `pending_wipes`; terminate-with-live-subs routes here when
892        // those subs eventually drop. Either path fires exactly one
893        // wipe per terminal lifecycle.
894        let (was_last_sub, is_producer, has_user_cleanup, fire_wipe, binding) = {
895            let mut s = state.lock();
896            let Some(rec) = s.nodes.get_mut(&self.node_id) else {
897                return;
898            };
899            rec.subscribers.remove(&self.sub_id);
900            // Slice X4 / D2: bump revision so any pending_notify entry for
901            // this node opened earlier in the wave starts a fresh batch on
902            // the next queue_notify, dropping the now-departed sink from
903            // the snapshot.
904            rec.subscribers_revision = rec.subscribers_revision.wrapping_add(1);
905            let last = rec.subscribers.is_empty();
906            let producer = rec.is_producer();
907            // OnDeactivation gate: must have run a fn at least once
908            // (has_fired_once) AND have a fn registered (fn_id.is_some()).
909            // The fn_id check excludes state nodes whose has_fired_once
910            // tracks initial-value status, not "user fn ran."
911            let user_cleanup = rec.has_fired_once && rec.fn_id.is_some();
912            let fire_wipe = last && rec.resubscribable && rec.terminal.is_some();
913            // Clone the binding Arc out only if at least one hook will
914            // fire. Cheap (Arc::clone) in the common path; skipped on
915            // non-last-sub or never-fired non-producer nodes.
916            let binding = if last && (producer || user_cleanup || fire_wipe) {
917                Some(s.binding.clone())
918            } else {
919                None
920            };
921            (last, producer, user_cleanup, fire_wipe, binding)
922        };
923        if was_last_sub {
924            if let Some(binding) = binding {
925                if has_user_cleanup {
926                    binding.cleanup_for(self.node_id, CleanupTrigger::OnDeactivation);
927                }
928                if is_producer {
929                    binding.producer_deactivate(self.node_id);
930                }
931                // D069: eager wipe — fires AFTER OnDeactivation so the
932                // user closure observes pre-wipe `store` (matches the
933                // existing "OnDeactivation runs before wipe on terminal
934                // reset" invariant covered by test 10). Idempotent —
935                // `HashMap::remove` on absent key is a no-op, so even
936                // if the wave already drained `pending_wipes` earlier,
937                // this fire is benign.
938                if fire_wipe {
939                    binding.wipe_ctx(self.node_id);
940                }
941            }
942        }
943    }
944}
945
946// Compile-time assertion that Subscription is Send + Sync. If a future field
947// breaks this, the build fails here rather than downstream at the binding
948// site.
949const _: fn() = || {
950    fn assert_send_sync<T: Send + Sync>() {}
951    assert_send_sync::<Subscription>();
952};
953
954/// A subscriber callback. `Send + Sync` so the Core can fire it from any
955/// thread; `Fn` (not `FnMut`) so multiple references coexist — capture
956/// mutable state in `Mutex<T>` or atomics on the binding side.
957pub type Sink = Arc<dyn Fn(&[Message]) + Send + Sync>;
958
959// ---------------------------------------------------------------------------
960// PAUSE/RESUME state — §10.2 of the rust-port session doc
961// ---------------------------------------------------------------------------
962
963/// Per-node pause state.
964///
965/// Replaces the four TS fields (`_pauseLocks`, `_pauseBuffer`,
966/// `_pauseDroppedCount`, `_pauseStartNs`) with a single enum where
967/// the buffered fields are unreachable in the [`Self::Active`] variant —
968/// the compiler refuses access. Per §10.2 simplification.
969///
970/// # Invariants
971///
972/// - `Active` ⇔ no lockId held.
973/// - `Paused { locks, .. }` ⇔ `!locks.is_empty()`.
974/// - Buffered messages are tier 3 (DATA/RESOLVED) and tier 4 (INVALIDATE)
975///   only. Other tiers pass through immediately even while paused.
976/// - `dropped` counts messages that fell out the front of `buffer` due to
977///   the Core-global `pause_buffer_cap`; it is reported on resume so callers
978///   can detect overflow without re-tracking it externally.
979#[derive(Debug)]
980pub(crate) enum PauseState {
981    Active,
982    Paused {
983        /// Active lock holders. `SmallVec` keeps the common 1–2 lock case
984        /// stack-allocated. Replaces `Set<unknown>` from TS.
985        locks: SmallVec<[LockId; 2]>,
986        /// Buffered tier-3/tier-4 outgoing messages, in arrival order.
987        /// Replayed on the final RESUME.
988        buffer: VecDeque<Message>,
989        /// Count of messages dropped from the front when `buffer.len()` would
990        /// exceed `pause_buffer_cap`. Cleared on final RESUME (next pause
991        /// cycle starts fresh).
992        dropped: u32,
993        /// Wall-clock-monotonic ns when the lock first transitioned this node
994        /// from `Active` to `Paused`. Used by R1.3.8.c overflow ERROR
995        /// synthesis to compute `lock_held_duration_ms` in the diagnostic
996        /// payload (Slice F, A3 — 2026-05-07).
997        started_at_ns: u64,
998        /// True after the first overflow event in this pause cycle has been
999        /// reported via [`crate::boundary::BindingBoundary::synthesize_pause_overflow_error`].
1000        /// Subsequent overflows in the same cycle don't re-emit ERROR
1001        /// (canonical R1.3.8.c: "once per overflow event"). Cleared on
1002        /// final RESUME (next pause cycle starts fresh).
1003        overflow_reported: bool,
1004        /// Default-mode bookkeeping (Slice F audit close, 2026-05-07).
1005        /// Set to `true` when an upstream dep delivery arrives while this
1006        /// node is paused with [`PausableMode::Default`]. On final RESUME,
1007        /// if `true`, the node is added back to `pending_fires` so the fn
1008        /// fires once with the consolidated dep state. Always `false` for
1009        /// `ResumeAll` mode (the buffered messages are the consolidation
1010        /// mechanism there). Cleared on final RESUME.
1011        pending_wave: bool,
1012    },
1013}
1014
1015impl PauseState {
1016    pub(crate) fn is_paused(&self) -> bool {
1017        matches!(self, Self::Paused { .. })
1018    }
1019
1020    fn lock_count(&self) -> usize {
1021        match self {
1022            Self::Active => 0,
1023            Self::Paused { locks, .. } => locks.len(),
1024        }
1025    }
1026
1027    fn contains_lock(&self, lock_id: LockId) -> bool {
1028        match self {
1029            Self::Active => false,
1030            Self::Paused { locks, .. } => locks.contains(&lock_id),
1031        }
1032    }
1033
1034    /// Add a lock; transitions Active → Paused on first lock. Idempotent on
1035    /// duplicate lock_id (matches TS convention; spec is silent on the case).
1036    fn add_lock(&mut self, lock_id: LockId) {
1037        match self {
1038            Self::Active => {
1039                let mut locks = SmallVec::new();
1040                locks.push(lock_id);
1041                *self = Self::Paused {
1042                    locks,
1043                    buffer: VecDeque::new(),
1044                    dropped: 0,
1045                    started_at_ns: monotonic_ns(),
1046                    overflow_reported: false,
1047                    pending_wave: false,
1048                };
1049            }
1050            Self::Paused { locks, .. } => {
1051                if !locks.contains(&lock_id) {
1052                    locks.push(lock_id);
1053                }
1054            }
1055        }
1056    }
1057
1058    /// Mark that an upstream dep delivered DATA to a node paused with
1059    /// [`PausableMode::Default`]. The node will re-enter `pending_fires`
1060    /// on final RESUME via [`Self::take_pending_wave`].
1061    pub(crate) fn mark_pending_wave(&mut self) {
1062        if let Self::Paused { pending_wave, .. } = self {
1063            *pending_wave = true;
1064        }
1065    }
1066
1067    /// Read and clear the `pending_wave` flag. Called from
1068    /// [`Core::resume`] when transitioning Paused → Active. Returns `true`
1069    /// only if the node was paused with `pending_wave` set.
1070    pub(crate) fn take_pending_wave(&mut self) -> bool {
1071        if let Self::Paused { pending_wave, .. } = self {
1072            std::mem::replace(pending_wave, false)
1073        } else {
1074            false
1075        }
1076    }
1077
1078    /// Remove a lock; if the lockset becomes empty, transition Paused →
1079    /// Active and return the buffered messages for replay (along with the
1080    /// dropped count for diagnostics). Unknown lock_id is an idempotent
1081    /// no-op (matches TS, R1.2.6 implicit).
1082    fn remove_lock(&mut self, lock_id: LockId) -> Option<(VecDeque<Message>, u32)> {
1083        match self {
1084            Self::Active => None,
1085            Self::Paused { locks, .. } => {
1086                if let Some(idx) = locks.iter().position(|l| *l == lock_id) {
1087                    locks.swap_remove(idx);
1088                }
1089                if locks.is_empty() {
1090                    let prev = std::mem::replace(self, Self::Active);
1091                    if let Self::Paused {
1092                        buffer, dropped, ..
1093                    } = prev
1094                    {
1095                        return Some((buffer, dropped));
1096                    }
1097                }
1098                None
1099            }
1100        }
1101    }
1102
1103    /// Append a message to the buffer; if the buffer would exceed `cap`,
1104    /// pop from the front (oldest-first), increment `dropped`, and return
1105    /// the dropped messages so the caller can release any payload handles
1106    /// they reference. `cap` of `None` means unbounded.
1107    ///
1108    /// Returns [`PushBufferedResult`] carrying both the dropped messages
1109    /// (for refcount release) and whether this push triggered the FIRST
1110    /// overflow event in the current pause cycle (for R1.3.8.c ERROR
1111    /// synthesis — the caller schedules a single ERROR per cycle).
1112    ///
1113    /// Note: refcount management for the message's payload handle is the
1114    /// caller's responsibility — see [`Core::queue_notify`] for the
1115    /// retain/release discipline. The buffer itself is just a message
1116    /// container; refcounts cross the binding boundary.
1117    pub(crate) fn push_buffered(&mut self, msg: Message, cap: Option<usize>) -> PushBufferedResult {
1118        let mut result = PushBufferedResult::default();
1119        if let Self::Paused {
1120            buffer,
1121            dropped,
1122            overflow_reported,
1123            ..
1124        } = self
1125        {
1126            buffer.push_back(msg);
1127            if let Some(c) = cap {
1128                while buffer.len() > c {
1129                    if let Some(dropped_msg) = buffer.pop_front() {
1130                        result.dropped_msgs.push(dropped_msg);
1131                    }
1132                    *dropped = dropped.saturating_add(1);
1133                }
1134            }
1135            // R1.3.8.c (Slice F, A3): flag first overflow this cycle.
1136            if !result.dropped_msgs.is_empty() && !*overflow_reported {
1137                *overflow_reported = true;
1138                result.first_overflow_this_cycle = true;
1139            }
1140        }
1141        result
1142    }
1143
1144    /// Snapshot the diagnostic for an R1.3.8.c overflow ERROR synthesis.
1145    /// Returns `(dropped_count, lock_held_ns)`. Caller must already know
1146    /// the configured cap (it's a Core-global value, not per-PauseState).
1147    pub(crate) fn overflow_diagnostic(&self) -> Option<(u32, u64)> {
1148        match self {
1149            Self::Active => None,
1150            Self::Paused {
1151                dropped,
1152                started_at_ns,
1153                ..
1154            } => {
1155                let lock_held_ns = monotonic_ns().saturating_sub(*started_at_ns);
1156                Some((*dropped, lock_held_ns))
1157            }
1158        }
1159    }
1160}
1161
1162/// Return shape for [`PauseState::push_buffered`]. Carries both the dropped
1163/// messages (for refcount release) and an "is this the first overflow this
1164/// cycle" flag (for R1.3.8.c ERROR synthesis scheduling).
1165#[derive(Default)]
1166pub(crate) struct PushBufferedResult {
1167    pub(crate) dropped_msgs: Vec<Message>,
1168    pub(crate) first_overflow_this_cycle: bool,
1169}
1170
1171/// Pending R1.3.8.c overflow ERROR synthesis entry. Recorded by
1172/// [`Core::queue_notify`] when the pause buffer first overflows in a cycle;
1173/// drained at wave-end after the lock-released call to
1174/// [`crate::boundary::BindingBoundary::synthesize_pause_overflow_error`].
1175///
1176/// `configured_max` is captured at scheduling time rather than read at
1177/// drain — the user could change `pause_buffer_cap` between schedule and
1178/// drain, and the diagnostic reads "the cap that was in effect when the
1179/// overflow happened."
1180#[derive(Debug, Clone)]
1181pub(crate) struct PendingPauseOverflow {
1182    pub(crate) node_id: NodeId,
1183    pub(crate) dropped_count: u32,
1184    pub(crate) configured_max: usize,
1185    pub(crate) lock_held_ns: u64,
1186}
1187
1188/// Errors returnable by [`Core::pause`] and [`Core::resume`].
1189#[derive(Error, Debug, Clone, PartialEq)]
1190pub enum PauseError {
1191    #[error("pause/resume: unknown node {0:?}")]
1192    UnknownNode(NodeId),
1193}
1194
1195/// Errors returnable by [`Core::up`] (canonical R1.4.1).
1196#[derive(Error, Debug, Clone, PartialEq)]
1197pub enum UpError {
1198    /// Node id is not registered.
1199    #[error("up: unknown node {0:?}")]
1200    UnknownNode(NodeId),
1201    /// Tier-3 (DATA / RESOLVED) and tier-5 (COMPLETE / ERROR) are
1202    /// downstream-only per R1.4.1; rejected at the boundary.
1203    #[error(
1204        "up: tier {tier} is forbidden upstream — value (tier 3) and \
1205         terminal-lifecycle (tier 5) planes are downstream-only per R1.4.1"
1206    )]
1207    TierForbidden { tier: u8 },
1208}
1209
/// Errors returnable by [`Core::register`] and its sugar wrappers
/// ([`Core::register_state`], [`Core::register_producer`],
/// [`Core::register_derived`], [`Core::register_dynamic`],
/// [`Core::register_operator`]).
///
/// Slice H (2026-05-07) promoted these from `assert!`/`panic!` to typed
/// errors so that callers can recover from contract violations without
/// a process abort. Every variant corresponds to a construction-time
/// invariant that the caller is responsible for upholding. The dispatcher
/// rejects the registration before any reactive state is created, so
/// there is no `Message::Error` channel through which to surface the
/// failure — these are imperative-layer errors, not reactive ones.
///
/// # Side effects
///
/// All variants are zero-side-effect: when [`Core::register`] returns
/// `Err`, no node has been added to the graph, and any handle retains
/// taken on the way in (e.g. operator scratch seed retains via
/// [`BindingBoundary::retain_handle`]) have been released.
#[derive(Error, Debug, Clone, PartialEq, Eq)]
pub enum RegisterError {
    /// One of the supplied dep ids is not a registered node. Carries the
    /// offending id.
    #[error("register: unknown dep {0:?}")]
    UnknownDep(NodeId),

    /// `op` was supplied (operator node) but `deps` was empty. Operator
    /// nodes need at least one dep — for subscription-managed combinators
    /// with no declared deps, use [`Core::register_producer`] instead.
    #[error(
        "register: operator nodes require at least one dep — \
         use register_producer for subscription-managed combinators"
    )]
    OperatorWithoutDeps,

    /// [`NodeOpts::initial`] was set to a real handle but the registration
    /// shape is not a state node (state nodes are `deps.is_empty() &&
    /// fn_id.is_none() && op.is_none()`). An initial cache only makes
    /// sense for state nodes.
    #[error("register: NodeOpts::initial only valid for state nodes (no deps + no fn + no op)")]
    InitialOnlyForStateNodes,

    /// A supplied dep is terminal (COMPLETE / ERROR) AND not
    /// resubscribable. Adding it would create a permanent wedge — the dep
    /// will never re-emit, so the registered node would be stuck.
    /// Mirrors [`SetDepsError::TerminalDep`] at registration time.
    /// Carries the id of the terminal dep.
    #[error(
        "register: dep {0:?} is terminal and not resubscribable; \
         mark it resubscribable before terminating, or remove it from the dep list"
    )]
    TerminalDep(NodeId),

    /// A stateful operator ([`OperatorOp::Scan`] / [`OperatorOp::Reduce`])
    /// was registered with `seed = NO_HANDLE`. The R2.5.3 first-run gate
    /// requires the seed to be a real handle so that the operator can
    /// emit on its first fire.
    #[error("register: operator seed must be a real handle (R2.5.3); got NO_HANDLE")]
    OperatorSeedSentinel,
}
1266
/// Errors returnable by [`Core::set_pausable_mode`].
///
/// Slice H (2026-05-07) promoted these from `assert!`/`panic!` to typed
/// errors. They follow the same imperative-layer error model as
/// [`RegisterError`]: the failure is returned to the caller rather than
/// surfaced through the reactive message stream.
#[derive(Error, Debug, Clone, PartialEq, Eq)]
pub enum SetPausableModeError {
    /// `node_id` is not a registered node. Carries the unknown id.
    #[error("set_pausable_mode: unknown node {0:?}")]
    UnknownNode(NodeId),
    /// The node currently holds at least one pause lock. Changing the
    /// pausable mode mid-pause would lose buffered content or strand a
    /// `pending_wave` flag — resume all locks first, then retry.
    #[error(
        "set_pausable_mode: cannot change pausable mode while paused; \
         resume all locks first"
    )]
    WhilePaused,
}
1285
/// Per-dep record: everything a node tracks about ONE of its deps.
///
/// Replaces the parallel `deps` / `dep_handles` / `dep_terminals`
/// vectors from v1 (canonical spec R2.9.b alignment), so the per-dep
/// facts live side by side instead of being correlated by index.
///
/// Each entry tracks one dep's lifecycle state, wave-scoped batch data,
/// and cross-wave `prev_data` for `ctx.prevData` access.
pub(crate) struct DepRecord {
    /// The dep node this record tracks.
    pub(crate) node: NodeId,
    /// Last DATA handle from the end of the previous wave. [`NO_HANDLE`]
    /// means the dep has never emitted DATA.
    pub(crate) prev_data: HandleId,
    /// Per-dep dirty flag — the dep is awaiting DATA/RESOLVED for the
    /// current wave.
    pub(crate) dirty: bool,
    /// Per-dep involved-this-wave flag. Read together with `data_batch`
    /// it distinguishes:
    /// - `involved && data_batch.is_empty()` → dep settled RESOLVED
    /// - `!involved && data_batch.is_empty()` → dep was not in this wave
    ///
    /// A non-empty `data_batch` means the dep delivered DATA this wave.
    pub(crate) involved_this_wave: bool,
    /// DATA handles accumulated this wave. Outside `batch()` scope, at most
    /// 1 element (hence the inline capacity of 1). Inside `batch()`, K
    /// emits on the source produce K entries per R1.3.6.b coalescing.
    /// Each handle holds a `retain_handle` share taken at
    /// `deliver_data_to_consumer` time; released at wave-end rotation in
    /// `clear_wave_state`.
    pub(crate) data_batch: SmallVec<[HandleId; 1]>,
    /// Terminal state for this dep. `None` = dep is live.
    /// `Some` = dep emitted COMPLETE/ERROR. When ALL of a node's dep
    /// records are `Some`, the node auto-cascades per Lock 2.B (ERROR
    /// dominates COMPLETE).
    pub(crate) terminal: Option<TerminalKind>,
}
1314
1315impl DepRecord {
1316    fn new(node: NodeId) -> Self {
1317        Self {
1318            node,
1319            prev_data: NO_HANDLE,
1320            dirty: false,
1321            involved_this_wave: false,
1322            data_batch: SmallVec::new(),
1323            terminal: None,
1324        }
1325    }
1326}
1327
/// Internal node record. Mirrors `core.ts:132–154` post-D030 unification.
///
/// **Kind is derived, not stored** (D030, Slice D). `(dep_records.is_empty(),
/// fn_id, op, is_dynamic)` uniquely identifies the kind — see [`NodeKind`].
/// Helper methods (`is_state()`, `is_producer()`, `is_compute()`,
/// `is_operator()`, `skips_auto_cascade()`, `kind()`) cover the common
/// predicates without unpacking via [`Core::kind_of`].
///
/// The bool fields (`has_fired_once`, `dirty`, `involved_this_wave`,
/// `has_received_teardown`, `resubscribable`, `is_dynamic`, `partial`)
/// each represent an orthogonal concern. `is_dynamic` is constant per
/// node (set at register time); `partial` is a first-run-gate option
/// (see its field doc); the others are mutable lifecycle state.
/// Collapsing them into a bitfield would obscure intent, hence the
/// `struct_excessive_bools` allow below.
#[allow(clippy::struct_excessive_bools)]
pub(crate) struct NodeRecord {
    /// Per-dep records. Replaces the old parallel `deps` / `dep_handles` /
    /// `dep_terminals` vecs. Dep NodeIds derived via `dep_ids()`.
    pub(crate) dep_records: Vec<DepRecord>,
    /// User-fn id for closure-form dispatch. `Some` for Derived / Dynamic /
    /// Producer; `None` for State / Operator. (Operator dispatch goes via
    /// [`Self::op`] instead.)
    pub(crate) fn_id: Option<FnId>,
    /// Operator discriminant for typed-op dispatch. `Some` for Operator
    /// nodes; `None` otherwise. Mutually exclusive with `fn_id` (a node is
    /// either closure-form OR typed-op, never both).
    pub(crate) op: Option<OperatorOp>,
    /// True for Dynamic nodes (R2.5.3 — fn declares actually-tracked dep
    /// indices per fire). False for everything else. Only meaningful when
    /// `fn_id.is_some()` AND `!dep_records.is_empty()`.
    pub(crate) is_dynamic: bool,
    /// Equality mode for this node — see [`EqualsMode`].
    /// NOTE(review): presumably drives the R1.3.2 equals-substitution
    /// check on emission; the use site is outside this chunk.
    pub(crate) equals: EqualsMode,

    // Mutable state
    /// The node's cached value handle. [`NO_HANDLE`] when the cache is
    /// empty (invalidation transitions it `!= NO_HANDLE → NO_HANDLE`).
    pub(crate) cache: HandleId,
    /// NOTE(review): presumably set once the node's fn has fired for the
    /// first time; it is one of the flags reset by the resubscribable
    /// lifecycle reset (see [`Self::resubscribable`]).
    pub(crate) has_fired_once: bool,
    /// Live subscriber sinks, keyed by subscription id.
    pub(crate) subscribers: HashMap<SubscriptionId, Sink>,
    /// Monotonic counter bumped on every mutation of [`Self::subscribers`]
    /// (insert on subscribe, remove on `Subscription::Drop`, remove on
    /// handshake-panic cleanup). Used by
    /// [`crate::batch::Core::queue_notify`] to detect mid-wave subscriber-
    /// set changes and start a fresh `PendingBatch` with an updated sink
    /// snapshot — closes D2 (Slice X4, 2026-05-08): the late-subscriber
    /// and multi-emit-per-wave gap where the pre-fix per-node single
    /// snapshot meant a sub installed between two emits to the same node
    /// in one wave was invisible to the second emit's flush.
    ///
    /// Per-node (not per-Core) so that a subscribe to node A doesn't
    /// invalidate snapshot reuse for node B's pending batch in the same
    /// wave.
    pub(crate) subscribers_revision: u64,
    /// For dynamic nodes: which dep indices fn actually tracks.
    /// For static derived: all indices, populated at construction.
    pub(crate) tracked: HashSet<usize>,

    // Wave-scoped state — cleared at wave end.
    /// Node-level dirty flag for the current wave (the per-dep analogue
    /// is [`DepRecord::dirty`]).
    pub(crate) dirty: bool,
    /// Node-level involved-this-wave flag (the per-dep analogue is
    /// [`DepRecord::involved_this_wave`]).
    pub(crate) involved_this_wave: bool,

    /// Per-node pause state. Default `Active`. See [`PauseState`].
    pub(crate) pause_state: PauseState,
    /// Pause behavior mode (canonical-spec §2.6). Set at registration via
    /// [`NodeOpts::pausable`]. Default [`PausableMode::Default`] suppresses
    /// fn-fire while paused and consolidates N pause-window dep deliveries
    /// into one fn-fire on RESUME; `ResumeAll` buffers tier-3/4 outgoing
    /// for verbatim replay; `Off` ignores PAUSE entirely. See
    /// [`PausableMode`].
    pub(crate) pausable: PausableMode,
    /// Replay buffer cap (R2.6.5 / Lock 6.G — Slice E1, 2026-05-07).
    /// `None` disables; `Some(N)` keeps a circular VecDeque of the last N
    /// DATA-handle emissions for late-subscriber replay. Each handle in
    /// the buffer owns one binding-side retain share, released on evict
    /// (cap exceeded) or in `Drop for CoreState`.
    pub(crate) replay_buffer_cap: Option<usize>,
    /// The replay buffer governed by [`Self::replay_buffer_cap`]; see
    /// that field for the retention and release rules.
    pub(crate) replay_buffer: VecDeque<HandleId>,

    /// Terminal lifecycle state for THIS node's outgoing stream. Once set,
    /// further `emit` calls are silent no-ops, fn no longer fires, and only
    /// the terminal message has been queued downstream.
    pub(crate) terminal: Option<TerminalKind>,
    /// True after the first TEARDOWN has been processed for this node
    /// (R2.6.4 / Lock 6.F). Subsequent TEARDOWN deliveries are idempotent
    /// — the auto-prepended COMPLETE only fires on the first one. Without
    /// this flag, a redundant TEARDOWN delivered via the cascade plus an
    /// explicit `core.teardown(node)` would re-emit `[COMPLETE, TEARDOWN]`
    /// to subscribers per delivery, which is incorrect.
    pub(crate) has_received_teardown: bool,
    /// Per R2.2.7 / R2.5.3 — resubscribable terminal lifecycle.
    /// When `true` AND `terminal == Some(...)`, a fresh subscribe call
    /// will reset the node: clear `terminal`, `has_fired_once`,
    /// `has_received_teardown`, all dep_records to sentinel, and drain the
    /// pause lockset. Default `false`.
    pub(crate) resubscribable: bool,
    /// Meta companion nodes attached to this node per R1.3.9.d. When this
    /// node tears down, its meta companions are torn down FIRST (before
    /// the main node's auto-COMPLETE + TEARDOWN wire emission), so
    /// observers see companions terminate before the parent. The ordering
    /// is load-bearing — meta nodes typically subscribe to parent state
    /// that becomes inconsistent during the parent's destruction phase.
    pub(crate) meta_companions: Vec<NodeId>,
    /// R5.4 / D011 partial-mode: when `true`, fire_fn skips the R2.5.3
    /// first-run gate — the node fires as soon as ANY dep delivers a
    /// real handle, even if other deps remain sentinel. Defaults to
    /// `false` (gated). Lifted into Core for operator support; for
    /// State/Derived/Dynamic nodes the field is settable but the gated
    /// path remains the typical caller default.
    pub(crate) partial: bool,
    /// Generic per-operator scratch slot (Slice C-3, D026). Replaces
    /// the typed `operator_state: HandleId` field used by Slices C-1 / C-2.
    /// `None` for non-operator kinds and operators with no cross-wave
    /// state (Map / Filter / Combine / WithLatestFrom / Merge); `Some`
    /// for stateful operators ([`OperatorOp::Scan`] / [`Reduce`] /
    /// [`DistinctUntilChanged`] / [`Pairwise`] / [`Take`] / [`Skip`] /
    /// [`TakeWhile`] / [`Last`]).
    ///
    /// The boxed value implements
    /// [`OperatorScratch`](crate::op_state::OperatorScratch); its
    /// `release_handles` method is called from
    /// [`reset_for_fresh_lifecycle`] (resubscribable terminal cycle) and
    /// from [`Drop for CoreState`].
    ///
    /// **Refcount discipline:** the state struct owns whatever handle
    /// shares it stores (e.g., [`ScanState::acc`](crate::op_state::ScanState::acc),
    /// [`LastState::latest`](crate::op_state::LastState::latest)).
    /// Per-fire helpers retain the new value before releasing the old;
    /// `release_handles` releases the current shares at end-of-life.
    pub(crate) op_scratch: Option<Box<dyn crate::op_state::OperatorScratch>>,
}
1455
1456impl NodeRecord {
1457    // ---- Kind predicates (D030 — derived from field shape) ----
1458
1459    /// True iff this is a state node (no deps, no fn, no op).
1460    pub(crate) fn is_state(&self) -> bool {
1461        self.dep_records.is_empty() && self.fn_id.is_none() && self.op.is_none()
1462    }
1463
1464    /// True iff this is a producer node (no deps + has fn + no op).
1465    /// Producers fire fn once on first subscribe; cleanup fires via
1466    /// [`BindingBoundary::producer_deactivate`] (D031, Slice D).
1467    pub(crate) fn is_producer(&self) -> bool {
1468        self.dep_records.is_empty() && self.fn_id.is_some() && self.op.is_none()
1469    }
1470
1471    /// True iff this is a compute node (Derived / Dynamic / Operator) —
1472    /// has at least one dep AND either a fn or an op.
1473    #[allow(dead_code)] // Convenience predicate; callers may use is_state/is_producer instead.
1474    pub(crate) fn is_compute(&self) -> bool {
1475        !self.dep_records.is_empty() && (self.fn_id.is_some() || self.op.is_some())
1476    }
1477
1478    /// True iff this is an Operator node (has op set).
1479    #[allow(dead_code)] // Direct `op.is_some()` is more common; this is a readability sugar.
1480    pub(crate) fn is_operator(&self) -> bool {
1481        self.op.is_some()
1482    }
1483
1484    /// True iff this node opts OUT of Lock 2.B auto-cascade —
1485    /// Operator(Reduce) / Operator(Last) intercept upstream COMPLETE.
1486    pub(crate) fn skips_auto_cascade(&self) -> bool {
1487        match self.op {
1488            Some(op) => NodeKind::Operator(op).skips_auto_cascade(),
1489            None => false,
1490        }
1491    }
1492
1493    /// Compute the public-API [`NodeKind`] from the field shape (D030).
1494    /// Used by [`Core::kind_of`] and rare internal sites that need the
1495    /// enum (most use the predicate methods above).
1496    pub(crate) fn kind(&self) -> NodeKind {
1497        if let Some(op) = self.op {
1498            NodeKind::Operator(op)
1499        } else if self.dep_records.is_empty() {
1500            if self.fn_id.is_some() {
1501                NodeKind::Producer
1502            } else {
1503                NodeKind::State
1504            }
1505        } else if self.is_dynamic {
1506            NodeKind::Dynamic
1507        } else {
1508            NodeKind::Derived
1509        }
1510    }
1511
1512    // ---- Existing accessors ----
1513
1514    /// Iterator over dep NodeIds in declaration order.
1515    pub(crate) fn dep_ids(&self) -> impl Iterator<Item = NodeId> + '_ {
1516        self.dep_records.iter().map(|r| r.node)
1517    }
1518
1519    /// Collected dep NodeIds — for call sites that need a `Vec<NodeId>`.
1520    pub(crate) fn dep_ids_vec(&self) -> Vec<NodeId> {
1521        self.dep_ids().collect()
1522    }
1523
1524    /// Number of deps.
1525    pub(crate) fn dep_count(&self) -> usize {
1526        self.dep_records.len()
1527    }
1528
1529    /// True if any dep is in sentinel state (never emitted DATA and no
1530    /// data this wave). Replaces the old `dep_handles.contains(&NO_HANDLE)`.
1531    pub(crate) fn has_sentinel_deps(&self) -> bool {
1532        self.dep_records
1533            .iter()
1534            .any(|r| r.prev_data == NO_HANDLE && r.data_batch.is_empty())
1535    }
1536
1537    /// Find the index of a dep by NodeId.
1538    pub(crate) fn dep_index_of(&self, dep_id: NodeId) -> Option<usize> {
1539        self.dep_records.iter().position(|r| r.node == dep_id)
1540    }
1541
1542    /// True if ALL dep terminal slots are populated (Lock 2.B cascade check).
1543    pub(crate) fn all_deps_terminal(&self) -> bool {
1544        !self.dep_records.is_empty() && self.dep_records.iter().all(|r| r.terminal.is_some())
1545    }
1546}
1547
/// Wave-scoped cross-partition aggregation state — fields populated as
/// a wave executes that aggregate work across every partition the wave
/// touched, drained / cleared at wave-end. Lives behind a separate
/// [`parking_lot::Mutex`] from [`CoreState`].
///
/// **Q2 (next batch — post-D3-closure, 2026-05-09):** moved out of
/// `CoreState`'s critical section into a sibling mutex. Q2 alone is
/// pattern-prep for Q-beyond — the bench impact in isolation is 0–5% in
/// Regime A (the four fields are ~4 of ~15 fields in the wave-engine
/// critical section). The structural value is establishing the "Core
/// has multiple Core-level mutexes" pattern with explicit
/// lock-discipline rules:
///
/// - **Lock order (acquire-direction rules; constrains the order ONLY
///   WHEN both are acquired simultaneously):**
///   - `state → registry` (P12 invariant — pre-existing).
///   - `state → cross_partition` (Q2, 2026-05-09).
///   - `registry → cross_partition` is permitted (no path uses both
///     simultaneously today; reserved for forward compatibility).
///   - The forbidden direction is `cross_partition → state`: any
///     thread holding `cross_partition` must NOT then attempt to
///     acquire `state`, since some thread doing the canonical
///     `state → cross_partition` order would deadlock against it.
///   - `partition_state` (the per-`SubgraphLockBox::state` mutex) was
///     introduced under Q3 v1 then reverted by the D1 patch; it no
///     longer exists in this lock chain.
/// - **Acquire alone:** sites that ONLY touch the four fields (no state
///   access in the same critical section) MAY acquire `cross_partition`
///   without state. The lock-order rule only constrains the RELATIVE
///   order when both are acquired.
/// - **Wave-end clear:** [`Self::clear_wave_state`] zeroes
///   `pending_auto_resolve` and `pending_pause_overflow`. The other two
///   fields (`wave_cache_snapshots` and `deferred_handle_releases`) are
///   NOT cleared there; they are drained explicitly, and their handles
///   released with the lock dropped, in the BatchGuard success/panic
///   paths — see `Drop for BatchGuard` for the canonical drain order.
///
/// Q-beyond will continue the shape decomposition with per-partition
/// `SubgraphShard`s holding most of `CoreState`'s fields per-partition;
/// the four wave-scoped fields here aggregate ACROSS partitions in a
/// wave and stay shared. ("The four fields" throughout excludes
/// `binding`, which is not wave-scoped.)
pub(crate) struct CrossPartitionState {
    /// Payload-handle releases owed for messages that landed in
    /// `pending_notify` during this wave (one per `payload_handle()`).
    /// `run_wave` releases these after sinks fire and the lock is dropped,
    /// balancing the retain done in `queue_notify`.
    pub(crate) deferred_handle_releases: Vec<HandleId>,
    /// Pre-wave cache snapshots used to restore state if the wave aborts
    /// mid-flight (e.g., a `Core::batch` closure panics). Each entry is
    /// `(node_id → old_cache_handle)` — the handle the node held BEFORE
    /// the wave started writing to it. The snapshotted handle holds a
    /// retain (taken when the snapshot was inserted) so it stays alive
    /// for restoration. On wave success, snapshots are dropped and their
    /// retains released. On wave abort (`BatchGuard::drop` panic-discard
    /// path), each cache slot is restored from the snapshot — the slot's
    /// current handle is released, and the snapshot's retain transfers
    /// to the cache slot. Only populated for in-flight waves; empty
    /// between waves.
    pub(crate) wave_cache_snapshots: HashMap<NodeId, HandleId>,
    /// Nodes that need an auto-Resolved at wave end if they don't receive
    /// a tier-3+ message from their own commit_emission. Populated by
    /// the RESOLVED child propagation in `commit_emission` (which queues
    /// Dirty but defers Resolved to avoid double-settlement). Drained by
    /// the auto-resolve sweep in `drain_and_flush`. Cleared by
    /// `clear_wave_state`.
    pub(crate) pending_auto_resolve: ahash::AHashSet<NodeId>,
    /// R1.3.8.c pause-overflow ERROR synthesis queue (Slice F, A3 —
    /// 2026-05-07). Recorded by [`Core::queue_notify`] when the pause
    /// buffer first overflows in a cycle; drained at wave-end after the
    /// lock-released call to
    /// [`crate::boundary::BindingBoundary::synthesize_pause_overflow_error`].
    ///
    /// One entry per (node × pause-cycle); subsequent overflows in the
    /// same cycle don't re-queue (gated by `PauseState::overflow_reported`).
    pub(crate) pending_pause_overflow: Vec<PendingPauseOverflow>,
    /// Binding-boundary handle for `Drop`-time refcount balancing.
    /// Mirrors [`CoreState::binding`] — both structs hold their own
    /// strong ref so each can independently release retained handles
    /// from its drop path. The Arc clone is cheap; the binding object
    /// itself is shared.
    binding: Arc<dyn BindingBoundary>,
}
1630
1631impl CrossPartitionState {
1632    /// Construct an empty `CrossPartitionState`. **Caller invariant
1633    /// (load-bearing):** `binding` MUST be the same `Arc<dyn BindingBoundary>`
1634    /// instance that's also stored in [`CoreState::binding`] and
1635    /// [`Core::binding`] for the same `Core`. The three strong refs
1636    /// share refcount accounting for handle retain/release pairings;
1637    /// passing a DIFFERENT binding here would split the refcount
1638    /// state and produce use-after-free / double-release on
1639    /// `release_handle` paths. Only `Core::new` constructs this — keep
1640    /// it that way unless a future change moves construction
1641    /// elsewhere with the same invariant preserved.
1642    ///
1643    /// /qa A8 doc-fix (2026-05-09).
1644    fn new(binding: Arc<dyn BindingBoundary>) -> Self {
1645        Self {
1646            deferred_handle_releases: Vec::new(),
1647            wave_cache_snapshots: HashMap::new(),
1648            pending_auto_resolve: ahash::AHashSet::new(),
1649            pending_pause_overflow: Vec::new(),
1650            binding,
1651        }
1652    }
1653
1654    /// Wave-end clear. Mirrors the four field clears that previously
1655    /// lived in [`CoreState::clear_wave_state`]. `pending_auto_resolve`
1656    /// and `pending_pause_overflow` are zeroed; `wave_cache_snapshots`
1657    /// and `deferred_handle_releases` are NOT cleared here — they
1658    /// follow the success/panic paths' explicit drain discipline (see
1659    /// [`Drop`] for panic-time cleanup).
1660    pub(crate) fn clear_wave_state(&mut self) {
1661        self.pending_auto_resolve.clear();
1662        // A3 (Slice F, 2026-05-07): pending_pause_overflow is normally
1663        // drained by drain_and_flush via the synthesis loop. If a wave is
1664        // panic-discarded BEFORE the synthesis runs, we drop the queued
1665        // entries silently — the binding never sees ERROR for that
1666        // overflow event, but the pause buffer's `dropped` count is
1667        // unchanged so callers can still detect via ResumeReport.
1668        self.pending_pause_overflow.clear();
1669    }
1670}
1671
1672impl Drop for CrossPartitionState {
1673    fn drop(&mut self) {
1674        // Mirrors the cross_partition cleanup that previously lived in
1675        // [`Drop for CoreState`] (Q2, 2026-05-09 — fields moved here).
1676        // Defensive: at process-exit time the wave engine has typically
1677        // drained these to empty, but a panicked-mid-wave Core could
1678        // leave retains in either field.
1679        let snapshots = std::mem::take(&mut self.wave_cache_snapshots);
1680        for (_, h) in snapshots {
1681            self.binding.release_handle(h);
1682        }
1683        let releases = std::mem::take(&mut self.deferred_handle_releases);
1684        for h in releases {
1685            self.binding.release_handle(h);
1686        }
1687        // pending_auto_resolve / pending_pause_overflow hold no retains.
1688    }
1689}
1690
/// All mutable Core state, behind one [`parking_lot::Mutex`].
///
/// Q2 (2026-05-09) split four wave-scoped cross-partition aggregation
/// fields out into [`CrossPartitionState`] under its own
/// `parking_lot::Mutex` on [`Core`]. The D1 patch (2026-05-09) moved
/// Slice G's `tier3_emitted_this_wave` set out to a per-thread
/// thread-local in `crate::batch` (was briefly per-partition under Q3
/// v1; that placement was vulnerable to mid-wave cross-thread
/// `set_deps` partition splits — see `docs/porting-deferred.md`
/// "Per-partition state-shard refactor" closing summary). Q-beyond
/// will continue the shape decomposition by sharding most of the
/// remaining fields per-partition.
///
/// The plain `//` comments between fields below are tombstones marking
/// where a moved field used to live.
pub(crate) struct CoreState {
    /// Id counters for nodes, subscriptions, and pause locks.
    /// NOTE(review): presumably each holds the next id to hand out; the
    /// allocation sites are outside this chunk.
    pub(crate) next_node_id: u64,
    pub(crate) next_subscription_id: u64,
    pub(crate) next_lock_id: u64,
    /// Every registered node's record, keyed by node id.
    pub(crate) nodes: HashMap<NodeId, NodeRecord>,
    /// Inverted adjacency: `parent → children`. Updated on registration.
    pub(crate) children: HashMap<NodeId, HashSet<NodeId>>,
    /// Nodes whose fn we owe a fire to — drained by [`Core::run_wave`].
    pub(crate) pending_fires: HashSet<NodeId>,
    /// Per-node outgoing message buffer; flushed at wave end. Insertion-
    /// ordered so flush order is deterministic — load-bearing for
    /// R1.3.9.d meta-TEARDOWN ordering: when a parent and its meta
    /// companion both have queued messages in the same wave, the meta
    /// (queued first via `teardown_inner`'s recursion order) flushes
    /// first.
    ///
    /// Each entry carries the per-wave subscriber snapshot taken at first
    /// touch (Slice A close, M1: lock-released drain). Late subscribers
    /// installed mid-wave between fn-fire iterations don't appear in
    /// already-snapshotted entries; this is the load-bearing fix that
    /// prevents duplicate-Data delivery when a handshake delivers the
    /// post-commit cache and the wave's flush would otherwise also fire
    /// to the same sink.
    pub(crate) pending_notify: IndexMap<NodeId, PendingPerNode>,
    /// NOTE(review): presumably true while a wave (tick) is executing;
    /// the sites that set and clear it are outside this chunk.
    pub(crate) in_tick: bool,
    /// Core-global cap on per-node pause replay buffer length. `None` means
    /// unbounded. Per the user direction (Q1, 2026-05-05): start core-global;
    /// per-node override can be added later as a pure addition without API
    /// breakage. Default `None`.
    pub(crate) pause_buffer_cap: Option<usize>,
    /// Core-global cap on wave-drain iterations before
    /// [`crate::batch::Core::drain_and_flush`] aborts with a diagnostic panic.
    /// Replaces the prior `MAX_DRAIN_ITERATIONS` hard-coded constant
    /// (R4.3 / Lock 2.F′). Default `10_000`.
    ///
    /// The drain loop bound exists to surface runtime cycles
    /// (e.g. an operator that re-arms its own `pending_fires` slot during
    /// `invoke_fn`) as a panic with context, rather than letting Core
    /// spin forever. Structural cycles via [`Core::set_deps`] are
    /// rejected at edge-mutation time (`SetDepsError::WouldCreateCycle`);
    /// registration is structurally cycle-safe by construction (the new
    /// node's id is not allocated until AFTER deps are validated, so deps
    /// cannot transitively reach the new node). The drain bound is the
    /// safety net for runtime cycles that bypass both static checks.
    pub(crate) max_batch_drain_iterations: u32,
    /// Deferred sink-fire jobs collected by `flush_notifications`. The wave
    /// engine populates this under the state lock during the flush phase;
    /// `run_wave` then drops the lock and fires the jobs. Each tuple is
    /// `(sinks_for_one_node_one_phase, phase_messages)`. Empty between waves.
    pub(crate) deferred_flush_jobs: crate::batch::DeferredJobs,
    // Q2 (2026-05-09): `deferred_handle_releases` moved to
    // [`CrossPartitionState::deferred_handle_releases`].
    /// Binding-boundary handle for `Drop`-time refcount balancing.
    /// `Core` also holds a clone of this Arc; storing it here lets
    /// `Drop for CoreState` walk every retained slot and release the
    /// binding-side share when the last `Core` clone drops. Without this,
    /// node `cache` / `terminal` / per-dep [`DepRecord::terminal`] Error /
    /// pause-buffer payload handle refs leak in the binding registry
    /// until process exit.
    pub(crate) binding: Arc<dyn BindingBoundary>,
    // Q2 (2026-05-09): `wave_cache_snapshots` moved to
    // [`CrossPartitionState::wave_cache_snapshots`].
    // Q2 (2026-05-09): `pending_auto_resolve` moved to
    // [`CrossPartitionState::pending_auto_resolve`].
    /// Topology-change sinks. Keyed by subscription id for O(1) removal.
    pub(crate) topology_sinks: HashMap<u64, crate::topology::TopologySink>,
    /// Id counter for topology subscriptions (the `u64` keys of
    /// [`Self::topology_sinks`]). NOTE(review): allocation site is
    /// outside this chunk.
    pub(crate) next_topology_id: u64,
    /// A6 reentrancy guard (Slice F, 2026-05-07): the stack of NodeIds whose
    /// fn is currently being invoked on the wave-owner thread. Pushed at the
    /// top of `fire_fn` (just before the lock-released `invoke_fn` call) and
    /// popped on return / unwind via the [`crate::batch::FiringGuard`] RAII
    /// helper. [`Core::set_deps`] consults this set and rejects with
    /// [`SetDepsError::ReentrantOnFiringNode`] if `n` is currently firing —
    /// preventing the D1 `tracked` index corruption (see
    /// `porting-deferred.md` "Set_deps from inside firing node's fn corrupts
    /// Dynamic `tracked` indices").
    ///
    /// Stack rather than set so nested fn re-entrance (Producer subscribing
    /// to a fn that itself fires another fn) tracks every concurrently-firing
    /// node on the wave-owner. `Vec` rather than `HashSet` because the
    /// expected depth is small (typically 1, occasionally 2–3 with
    /// higher-order operators) and linear scan is faster than hash for that
    /// size.
    pub(crate) currently_firing: Vec<NodeId>,
    // Q2 (2026-05-09): `pending_pause_overflow` moved to
    // [`CrossPartitionState::pending_pause_overflow`].
    // Slice G (R1.3.2.d / R1.3.3.a — 2026-05-07): tier3-emitted-this-wave
    // tracker MOVED to a per-thread thread-local in `crate::batch`
    // (D1 patch, 2026-05-09 — was briefly per-partition under Q3 v1
    // 2026-05-09 morning). Wave-scope = thread; per-thread placement
    // is robust to mid-wave cross-thread `set_deps` partition splits
    // because thread B's split doesn't touch thread A's thread-local.
    // See [`crate::batch::TIER3_EMITTED_THIS_WAVE`] for the per-thread
    // wave-scope rationale and lifecycle (cleared at outermost
    // [`crate::batch::BatchGuard`] drop, both success + panic).
    /// Slice E2 (R1.3.9.b strict reading per D057): per-wave-per-node
    /// dedup for `OnInvalidate` cleanup hook firing. A node already in
    /// this set this wave has already had its `OnInvalidate` queued into
    /// `deferred_cleanup_hooks` and MUST NOT queue again, even if
    /// `invalidate_inner` re-encounters it (rare: only matters when the
    /// node re-populates mid-wave via fn-fire and then gets re-invalidated
    /// in the same wave through a separate path).
    ///
    /// Cleared in [`CoreState::clear_wave_state`] alongside the other
    /// wave-scoped queues.
    pub(crate) invalidate_hooks_fired_this_wave: ahash::AHashSet<NodeId>,
    /// Slice E2 (per D060/D061): lock-released drain queue for
    /// `OnInvalidate` cleanup hooks. Populated under the state lock by
    /// `Core::invalidate_inner` when a node's cache transitions
    /// `!= NO_HANDLE → NO_HANDLE`; drained after the lock drops at wave
    /// boundary by [`crate::batch::Core::fire_deferred_cleanup_hooks`]
    /// (each call wrapped in `catch_unwind` so a single binding panic
    /// doesn't short-circuit the drain — last panic re-raises after the
    /// loop completes per D060).
    ///
    /// **Panic-discard semantics (D061):** cleared in
    /// [`CoreState::clear_wave_state`] without firing — a panic-discarded
    /// wave drops the queued cleanup hooks silently, mirroring the
    /// `pending_pause_overflow` precedent (Slice F /qa A3). Bindings using
    /// `OnInvalidate` for external-resource cleanup MUST make that cleanup
    /// idempotent and also run it at process exit or on the next
    /// successful invalidate cycle.
    pub(crate) deferred_cleanup_hooks: Vec<(NodeId, crate::boundary::CleanupTrigger)>,
    /// Slice E2 /qa Q2(b) (D069): lock-released drain queue for
    /// `BindingBoundary::wipe_ctx` calls fired eagerly from
    /// `Core::terminate_node` when a resubscribable node terminates with
    /// no live subscribers. Pairs with the `Subscription::Drop` direct-
    /// fire site (mutually exclusive: subs-empty-at-terminate routes
    /// here; subs-non-empty-at-terminate fires from Subscription::Drop's
    /// last-sub-drop path). Drained alongside `deferred_cleanup_hooks`
    /// at wave boundary; same `catch_unwind` discipline so a single
    /// binding panic doesn't short-circuit the drain. Same panic-discard
    /// semantics as `deferred_cleanup_hooks` (silent drop on
    /// panic-discarded waves).
    pub(crate) pending_wipes: Vec<NodeId>,
}
1837
1838/// The handle-protocol Core dispatcher.
1839///
1840/// Holds an [`Arc`] to the [`BindingBoundary`] and all dispatch state. Cheap
1841/// to clone (the inner `Arc<Mutex<CoreState>>` is shared); pass `Core` by
1842/// value to threads.
1843///
1844/// # Wave-owner re-entrant mutex (Slice A close /qa, M1)
1845///
1846/// The state lock (`state: Mutex<CoreState>`) is **dropped** around binding
1847/// callbacks (`invoke_fn`, `custom_equals`) so user fns may re-enter Core.
1848/// To preserve serializability of WAVE EXECUTION across threads — without
1849/// re-introducing the lock-held-during-fn-fire deadlock the Slice A close
1850/// refactor lifted — the wave engine acquires `wave_owner` (a
1851/// [`parking_lot::ReentrantMutex`]) for the lifetime of each wave.
1852///
1853/// Properties:
1854///
1855/// - **Same-thread re-entrance is free.** A user fn that calls back into
1856///   `Core::emit` / `Core::pause` / etc. mid-fire re-acquires `wave_owner`
1857///   on the same thread and runs as a nested wave (the inner `run_wave`
1858///   sees `in_tick=true` and skips drain — outer drain picks up).
1859/// - **Cross-thread emits BLOCK** at `wave_owner.lock_arc()` until the
1860///   in-flight wave completes (drain + flush + sink fire all done). This
1861///   serializes wave OWNERSHIP across threads, while still allowing the
1862///   state lock to drop inside the wave for binding callbacks.
1863///
1864/// Without this, Slice A close's lock-released drain let cross-thread
1865/// emits absorb into the in-flight wave's `pending_notify` and return
1866/// before subscribers fire — breaking the user-facing happens-after
1867/// contract that `emit` returning means subscribers have observed.
1868#[derive(Clone)]
1869pub struct Core {
1870    pub(crate) state: Arc<Mutex<CoreState>>,
1871    /// Q2 (2026-05-09): wave-scoped cross-partition aggregation state
1872    /// behind a sibling `parking_lot::Mutex` (separate from `state`).
1873    /// Lock-discipline: `state → cross_partition` (acquire state first
1874    /// when both are needed). See [`CrossPartitionState`] for the
1875    /// per-field rationale.
1876    pub(crate) cross_partition: Arc<parking_lot::Mutex<CrossPartitionState>>,
1877    pub(crate) binding: Arc<dyn BindingBoundary>,
1878    /// Slice X5 (D3 substrate, 2026-05-08) + Slice Y1 / Phase E
1879    /// (wave-engine migration, 2026-05-08): per-subgraph union-find
1880    /// registry. Tracks each registered node's connected-component
1881    /// membership (a "subgraph"). Each component's root carries an
1882    /// `Arc<SubgraphLockBox>` whose `wave_owner: ReentrantMutex<()>`
1883    /// is the per-partition wave-serialization lock — acquired by
1884    /// [`Self::partition_wave_owner_lock_arc`] under the retry-validate
1885    /// loop. Cross-thread emits to disjoint partitions run truly
1886    /// parallel; same-thread re-entry passes through reentrantly.
1887    ///
1888    /// Direct port of [`graphrefly-py`'s
1889    /// `subgraph_locks.py`](https://github.com/graphrefly/graphrefly-py/blob/main/src/graphrefly/core/subgraph_locks.py)
1890    /// design (locked in [`SESSION-rust-port-d3-per-subgraph-parallelism.md`](https://github.com/graphrefly/graphrefly-ts/blob/main/archive/docs/SESSION-rust-port-d3-per-subgraph-parallelism.md)).
1891    pub(crate) registry: Arc<parking_lot::Mutex<crate::subgraph::SubgraphRegistry>>,
1892}
1893
1894/// Weak handle to a [`Core`] — does not contribute to strong refcount.
1895///
1896/// Constructed via [`Core::weak_handle`]; upgraded back to a strong
1897/// [`Core`] via [`WeakCore::upgrade`]. Used by long-lived binding-stored
1898/// closures (notably `ProducerBuildFn`s registered via
1899/// [`graphrefly_operators::ProducerBinding::register_producer_build`])
1900/// to break the BenchBinding → registry → closure → strong-Core cycle
1901/// that would otherwise leak the entire graph state when a `BenchCore`
1902/// drops with active producer registrations.
1903///
1904/// Upgrade on each invocation; if the host `Core` was already dropped,
1905/// `upgrade()` returns `None` and the closure should no-op (the host
1906/// is being torn down, no work to do).
1907#[derive(Clone)]
1908pub struct WeakCore {
1909    state: Weak<Mutex<CoreState>>,
1910    cross_partition: Weak<parking_lot::Mutex<CrossPartitionState>>,
1911    binding: Weak<dyn BindingBoundary>,
1912    registry: Weak<parking_lot::Mutex<crate::subgraph::SubgraphRegistry>>,
1913}
1914
1915impl WeakCore {
1916    /// Try to upgrade back to a strong [`Core`]. Returns `None` if the
1917    /// host `Core`'s strong count has reached zero (i.e. the host
1918    /// `BenchCore` / equivalent owner was dropped).
1919    #[must_use]
1920    pub fn upgrade(&self) -> Option<Core> {
1921        Some(Core {
1922            state: self.state.upgrade()?,
1923            cross_partition: self.cross_partition.upgrade()?,
1924            binding: self.binding.upgrade()?,
1925            registry: self.registry.upgrade()?,
1926        })
1927    }
1928}
1929
1930/// RAII guard that owns an [`OperatorScratch`] until either (a) the
1931/// caller `take()`s it for installation, or (b) the guard drops on an
1932/// early return / unwind, in which case the scratch's handle retains
1933/// are released via [`OperatorScratch::release_handles`].
1934///
1935/// Slice H /qa F1 + F2 (2026-05-07): closes two related correctness
1936/// gaps in `Core::register`:
1937///
1938/// 1. **TOCTOU window** — the original three-phase split called
1939///    `lock_state()` twice (once for validation, once for insertion),
1940///    so a concurrent `Core::complete(dep)` on a non-resubscribable
1941///    dep could slip in between the two acquisitions and re-create
1942///    the wedge `RegisterError::TerminalDep` was designed to prevent.
1943///    The guard plus a single locked region for both phases closes
1944///    this gap (release runs lock-released because guard variables
1945///    drop in reverse declaration order — guard declared BEFORE
1946///    `lock_state()`, so the lock guard drops first).
1947///
1948/// 2. **Panic-unsafe scratch leak** — without an RAII drop, a panic
1949///    between `make_op_scratch` (Phase 2) and the explicit
1950///    `if let Err(e)` cleanup branch (e.g., `lock_state()` reentrance
1951///    assert, OOM-as-panic on Vec growth in dep iteration) would
1952///    drop the `Box<dyn OperatorScratch>` without releasing the
1953///    seed/default retain. The guard's `Drop` impl releases on any
1954///    unwind path.
1955///
1956/// Lock-discipline: the guard holds `&dyn BindingBoundary` (through
1957/// the `Arc<dyn BindingBoundary>` it borrows from). On `Drop`, it
1958/// invokes `release_handles` lock-released — fires AFTER any
1959/// `MutexGuard<CoreState>` declared later in the same scope drops
1960/// (LIFO destruction order). Mirrors `Core::resume` Phase 2 release
1961/// pattern.
1962struct ScratchReleaseGuard<'a> {
1963    scratch: Option<Box<dyn crate::op_state::OperatorScratch>>,
1964    binding: &'a dyn BindingBoundary,
1965}
1966
1967impl<'a> ScratchReleaseGuard<'a> {
1968    fn new(
1969        scratch: Option<Box<dyn crate::op_state::OperatorScratch>>,
1970        binding: &'a dyn BindingBoundary,
1971    ) -> Self {
1972        Self { scratch, binding }
1973    }
1974
1975    /// Take ownership of the scratch — disarms the release-on-drop
1976    /// behavior. Used on the success path to install the scratch on
1977    /// `NodeRecord.op_scratch`.
1978    fn take(mut self) -> Option<Box<dyn crate::op_state::OperatorScratch>> {
1979        self.scratch.take()
1980    }
1981}
1982
1983impl Drop for ScratchReleaseGuard<'_> {
1984    fn drop(&mut self) {
1985        if let Some(mut scratch) = self.scratch.take() {
1986            scratch.release_handles(self.binding);
1987        }
1988    }
1989}
1990
1991impl Core {
1992    /// Construct a fresh Core wired to the given binding. Pause buffer cap
1993    /// defaults to unbounded; set via [`Self::set_pause_buffer_cap`].
1994    #[must_use]
1995    pub fn new(binding: Arc<dyn BindingBoundary>) -> Self {
1996        Self {
1997            state: Arc::new(Mutex::new(CoreState {
1998                next_node_id: 1,
1999                next_subscription_id: 1,
2000                // A4 (Slice F, 2026-05-07): start `next_lock_id` in the high
2001                // half of the u32 range so `alloc_lock_id` can't collide with
2002                // user-supplied `LockId::new(N)` constructors (which the
2003                // napi-rs binding marshals from `u32` and tests typically use
2004                // in the low range, 1..1024). Phase E /qa F1 (2026-05-08):
2005                // lowered from `1u64 << 32` to `1u64 << 31` so the value
2006                // round-trips through `u32::try_from(...)` at the napi
2007                // boundary — the previous seed errored every napi
2008                // `alloc_lock_id` call. Anti-collision intent (high range vs
2009                // low user range) preserved at half the prior ceiling
2010                // (2^31 ≈ 2 billion allocations per Core, ample for parity
2011                // tests). Lift the floor when the deferred BigInt-narrowing
2012                // migration extends `LockId` to `u64` at the FFI layer
2013                // (porting-deferred "BigInt migration for u32-narrowed napi
2014                // types" entry).
2015                next_lock_id: 1u64 << 31,
2016                nodes: HashMap::new(),
2017                children: HashMap::new(),
2018                pending_fires: HashSet::new(),
2019                pending_notify: IndexMap::new(),
2020                in_tick: false,
2021                pause_buffer_cap: None,
2022                max_batch_drain_iterations: 10_000,
2023                deferred_flush_jobs: Vec::new(),
2024                binding: binding.clone(),
2025                topology_sinks: HashMap::new(),
2026                next_topology_id: 1,
2027                currently_firing: Vec::new(),
2028                invalidate_hooks_fired_this_wave: ahash::AHashSet::new(),
2029                deferred_cleanup_hooks: Vec::new(),
2030                pending_wipes: Vec::new(),
2031            })),
2032            cross_partition: Arc::new(parking_lot::Mutex::new(CrossPartitionState::new(
2033                binding.clone(),
2034            ))),
2035            binding,
2036            registry: Arc::new(parking_lot::Mutex::new(
2037                crate::subgraph::SubgraphRegistry::new(),
2038            )),
2039        }
2040    }
2041
2042    /// Acquire the state lock.
2043    ///
2044    /// Post-Slice-E: `Core::subscribe` fires the per-tier handshake
2045    /// LOCK-RELEASED with `wave_owner` held; sink callbacks may freely
2046    /// re-enter Core (`emit` / `complete` / `error` / nested `subscribe`).
2047    /// Same-thread re-entry passes through `wave_owner`'s `ReentrantMutex`
2048    /// transparently; cross-thread emits block on `wave_owner` until the
2049    /// outer subscribe completes, preserving R1.3.5.a happens-after
2050    /// ordering. The previous `IN_HANDSHAKE_FIRE` panic-diagnostic is no
2051    /// longer needed.
2052    pub(crate) fn lock_state(&self) -> MutexGuard<'_, CoreState> {
2053        self.state.lock()
2054    }
2055
2056    /// Acquire the cross-partition aggregation lock (Q2, 2026-05-09).
2057    ///
2058    /// **Lock-discipline:** if both `state` and `cross_partition` are
2059    /// needed in the same critical section, ALWAYS acquire `state`
2060    /// FIRST. Acquiring `cross_partition` then `state` is forbidden —
2061    /// any thread doing the canonical `state → cross_partition` order
2062    /// would deadlock against it. The compiler can't enforce this; the
2063    /// rule is preserved by code-review and the consistent shape
2064    /// `let mut s = self.lock_state(); let mut cps = self.lock_cross_partition();`.
2065    ///
2066    /// Sites that touch ONLY the four cross-partition fields (no state
2067    /// access in the same critical section) MAY acquire
2068    /// `cross_partition` standalone — the lock-order rule only
2069    /// constrains the relative order WHEN both are needed.
2070    pub(crate) fn lock_cross_partition(&self) -> parking_lot::MutexGuard<'_, CrossPartitionState> {
2071        self.cross_partition.lock()
2072    }
2073
2074    /// Whether `self` and `other` point to the same dispatcher state.
2075    /// True when one was produced by `Clone`-ing the other (or they
2076    /// were both cloned from a common ancestor); false for two
2077    /// independently `Core::new`-constructed instances even with the
2078    /// same binding.
2079    ///
2080    /// Used by `graphrefly-graph`'s `mount` to enforce the "shared-Core
2081    /// only" v1 invariant — cross-Core mount is post-M6.
2082    #[must_use]
2083    pub fn same_dispatcher(&self, other: &Core) -> bool {
2084        Arc::ptr_eq(&self.state, &other.state)
2085    }
2086
2087    /// Downgrade to a [`WeakCore`] handle that doesn't contribute to
2088    /// strong refcount of the underlying state / binding / wave_owner.
2089    ///
2090    /// Used by binding-stored long-lived closures (e.g.
2091    /// `register_producer_build`-stored `ProducerBuildFn`s) to avoid the
2092    /// Arc cycle:
2093    ///
2094    /// ```text
2095    /// BenchBinding → registry → producer_builds[fn_id]
2096    ///   → closure → strong Arc<dyn _Binding> → BenchBinding
2097    /// ```
2098    ///
2099    /// Closures hold `WeakCore` and `Weak<dyn _Binding>` instead, then
2100    /// upgrade-on-fire (returning early if either weak is dangling —
2101    /// indicating the host BenchCore was already dropped). Upgraded
2102    /// strong refs live only for the build closure's invocation; sinks
2103    /// the build closure spawns close over those upgraded strongs and
2104    /// stay alive only while the producer is active (cleared via
2105    /// `producer_deactivate` on last-subscriber unsubscribe).
2106    #[must_use]
2107    pub fn weak_handle(&self) -> WeakCore {
2108        WeakCore {
2109            state: Arc::downgrade(&self.state),
2110            cross_partition: Arc::downgrade(&self.cross_partition),
2111            binding: Arc::downgrade(&self.binding),
2112            registry: Arc::downgrade(&self.registry),
2113        }
2114    }
2115
2116    /// Number of distinct connected-component partitions tracked by
2117    /// the per-subgraph union-find registry (Slice X5 substrate).
2118    /// Two threads emitting into nodes with distinct partitions will
2119    /// run truly parallel once Y1 wires the wave engine through the
2120    /// registry; X5 reports the partition count for inspection
2121    /// (acceptance bar + debugging) but the wave engine still uses
2122    /// the legacy Core-level `wave_owner`.
2123    #[must_use]
2124    pub fn partition_count(&self) -> usize {
2125        self.registry.lock().component_count()
2126    }
2127
2128    /// Resolve `node`'s partition identity per the per-subgraph
2129    /// union-find registry (Slice X5 substrate). Two nodes with the
2130    /// same `SubgraphId` are connected via dep edges (transitively)
2131    /// and share a partition lock under Y1+; nodes in different
2132    /// partitions can run truly parallel.
2133    ///
2134    /// Returns `None` for unregistered nodes.
2135    #[must_use]
2136    pub fn partition_of(&self, node: NodeId) -> Option<crate::subgraph::SubgraphId> {
2137        self.registry.lock().partition_of(node)
2138    }
2139
2140    // Q3 (2026-05-09) introduced `Core::partition_box_of(node)` to
2141    // resolve a partition's `Arc<SubgraphLockBox>` for per-partition
2142    // state access. The D1 patch (2026-05-09) moved Slice G's
2143    // `tier3_emitted_this_wave` set off `SubgraphLockBox::state` to a
2144    // per-thread thread-local in `crate::batch`, eliminating
2145    // `partition_box_of`'s only callers (`commit_emission` /
2146    // `commit_emission_verbatim`). The helper is REMOVED rather than
2147    // kept dead — Q-beyond will resurrect a similar shape when the
2148    // CoreState shard layout actually needs per-partition lookups.
2149
2150    /// Acquire `seed`'s partition `wave_owner` re-entrant mutex with
2151    /// retry-validate against concurrent union/split. Mirrors
2152    /// graphrefly-py's `subgraph_locks.py::lock_for` retry pattern
2153    /// (lines 154–178): a concurrent `union_nodes` may redirect
2154    /// `seed`'s partition root between our `lock_for` resolve and
2155    /// our `lock_arc` call; if so, the held guard is on a stale
2156    /// (but still valid) `SubgraphLockBox` whose `Arc` no longer
2157    /// matches the registry's canonical box for `seed`'s current
2158    /// root. Release + retry up to [`crate::subgraph::MAX_LOCK_RETRIES`].
2159    ///
2160    /// Returns the held guard. Caller holds it for the wave's
2161    /// duration; drop releases.
2162    ///
2163    /// **Panics** if `seed` is not registered (caller violation —
2164    /// every wave entry takes a `NodeId` already in `s.nodes`, and
2165    /// the P12-fixed lock-discipline guarantees registry membership
2166    /// is published atomically with state). **Panics** on exceeding
2167    /// `MAX_LOCK_RETRIES` — pathological union activity.
2168    ///
2169    /// Slice Y1 / Phase E (2026-05-08).
2170    pub(crate) fn partition_wave_owner_lock_arc(&self, seed: NodeId) -> WaveOwnerGuard {
2171        /// Scope-guard for the H+ thread-local refcount entry. Released on
2172        /// Drop unless `into_consumed()` is called (the success path).
2173        /// Ensures balance even on panic between `check_and_acquire` and
2174        /// successful `WaveOwnerGuard` construction (`lock_arc()` /
2175        /// `lock_for_validate()` could in principle panic; defensive).
2176        struct AcquireGuard {
2177            sid: crate::subgraph::SubgraphId,
2178            consumed: bool,
2179        }
2180        impl AcquireGuard {
2181            fn into_consumed(mut self) {
2182                self.consumed = true;
2183            }
2184        }
2185        impl Drop for AcquireGuard {
2186            fn drop(&mut self) {
2187                if !self.consumed {
2188                    held_partitions::release(self.sid);
2189                }
2190            }
2191        }
2192
2193        for _ in 0..crate::subgraph::MAX_LOCK_RETRIES {
2194            let (sid, lock_box) = {
2195                let mut reg = self.registry.lock();
2196                reg.lock_for(seed).expect(
2197                    "partition_wave_owner_lock_arc: seed must be registered \
2198                     (P12-fix invariant: registry membership is published \
2199                     atomically with `s.nodes`)",
2200                )
2201            };
2202            // Phase H+ option (d) /qa N1(a) widened variant: BEFORE
2203            // acquiring the parking_lot lock, check ascending-order if
2204            // this thread already holds at least one partition AND we're
2205            // not in a producer build closure. Panics on violation.
2206            // Also increments the thread-local refcount for `sid`. The
2207            // `AcquireGuard` ensures the refcount is released on EVERY
2208            // exit path — successful return (via `into_consumed()`),
2209            // retry-validate failure (Drop fires), retry-exhaustion panic
2210            // (Drop fires before unwind), or a panic in `lock_arc()` /
2211            // `lock_for_validate()` (Drop fires during unwind).
2212            held_partitions::check_and_acquire(sid);
2213            let acquire_guard = AcquireGuard {
2214                sid,
2215                consumed: false,
2216            };
2217            let inner = lock_box.wave_owner.lock_arc();
2218            // Re-validate post-acquire. If a concurrent `union` redirected
2219            // `seed`'s root between our `lock_for` and `lock_arc`, the
2220            // registry's current box for `seed` differs from what we hold.
2221            let still_valid = self.registry.lock().lock_for_validate(seed, &lock_box);
2222            if still_valid {
2223                acquire_guard.into_consumed();
2224                // `lock_box` is unused after this point — the D1 patch
2225                // moved Slice G tier3 tracking off the per-partition
2226                // `SubgraphLockBox::state` to a per-thread thread-local,
2227                // so the guard no longer carries the box reference.
2228                drop(lock_box);
2229                return WaveOwnerGuard { sid, inner };
2230            }
2231            // Stale — drop the parking_lot guard. The AcquireGuard's
2232            // Drop releases the held_partitions refcount automatically.
2233            // Yield to give the contending writer a chance to make
2234            // forward progress before re-resolving (QA-fix group 2 —
2235            // earlier tight-spin could monopolize a CPU under sustained
2236            // pathological union/split activity).
2237            drop(inner);
2238            drop(acquire_guard);
2239            std::thread::yield_now();
2240        }
2241        panic!(
2242            "partition_wave_owner_lock_arc: exceeded {} retries for seed {:?} \
2243             — pathological concurrent union/split activity. Mirrors py \
2244             `_MAX_LOCK_RETRIES`.",
2245            crate::subgraph::MAX_LOCK_RETRIES,
2246            seed
2247        );
2248    }
2249
2250    /// BFS from `seed` along `s.children` (downstream consumer cascade
2251    /// for DATA / RESOLVED / INVALIDATE / COMPLETE / ERROR / TEARDOWN)
2252    /// and `meta_companions` (R1.3.9.d TEARDOWN cascade). Collects
2253    /// every partition reachable from `seed`, returning the unique
2254    /// `SubgraphId`s sorted ascending — the canonical lock-acquisition
2255    /// order per session-doc Q7 / decision D092 that guarantees
2256    /// deadlock-freedom across cross-partition waves.
2257    ///
2258    /// Holds the state lock + registry lock for the BFS duration
2259    /// (lock order `state → registry` per the P12-fix invariant).
2260    /// Bounded by the cascade graph reachable from `seed`; for typical
2261    /// apps the partition count is small (1–3) and the BFS is
2262    /// negligible relative to wave drain.
2263    ///
2264    /// Used by [`Core::begin_batch_for`] to compute the upfront-
2265    /// acquired partition set for per-seed waves. Closure-form
2266    /// [`Core::batch`] doesn't have a seed and uses
2267    /// [`Core::all_partitions_lock_boxes`] instead.
2268    ///
2269    /// Slice Y1 / Phase E (2026-05-08).
2270    pub(crate) fn compute_touched_partitions(
2271        &self,
2272        seed: NodeId,
2273    ) -> SmallVec<[crate::subgraph::SubgraphId; 4]> {
2274        let s = self.lock_state();
2275        let mut reg = self.registry.lock();
2276        let mut partitions: SmallVec<[crate::subgraph::SubgraphId; 4]> = SmallVec::new();
2277        let mut visited: HashSet<NodeId> = HashSet::default();
2278        let mut stack: SmallVec<[NodeId; 16]> = SmallVec::new();
2279        stack.push(seed);
2280        while let Some(n) = stack.pop() {
2281            if !visited.insert(n) {
2282                continue;
2283            }
2284            if let Some(p) = reg.partition_of(n) {
2285                if !partitions.contains(&p) {
2286                    partitions.push(p);
2287                }
2288            }
2289            if let Some(children) = s.children.get(&n) {
2290                stack.extend(children.iter().copied());
2291            }
2292            if let Some(rec) = s.nodes.get(&n) {
2293                stack.extend(rec.meta_companions.iter().copied());
2294            }
2295        }
2296        partitions.sort_unstable_by_key(|sid| sid.raw());
2297        partitions
2298    }
2299
2300    /// Snapshot of every currently-existing partition's lock box, in
2301    /// ascending [`crate::subgraph::SubgraphId`] order (canonical
2302    /// lock-acquisition order per session-doc Q7 / D092). Used by
2303    /// closure-form [`Core::batch`] / [`Core::begin_batch`] which
2304    /// don't have a known seed and must serialize against every
2305    /// existing partition.
2306    ///
2307    /// Slice Y1 / Phase E (2026-05-08).
2308    pub(crate) fn all_partitions_lock_boxes(
2309        &self,
2310    ) -> Vec<(
2311        crate::subgraph::SubgraphId,
2312        Arc<crate::subgraph::SubgraphLockBox>,
2313    )> {
2314        self.registry.lock().all_partitions()
2315    }
2316}
2317
2318/// Walk the undirected dep-edge graph from `start`, optionally
2319/// skipping ONE edge in both directions, and optionally treating
2320/// additional edges as if present. Returns every reachable
2321/// [`NodeId`].
2322///
2323/// Implementation note: uses a stack (`pop()` on a `SmallVec`) — i.e.
2324/// DFS traversal order. For pure reachability the order doesn't
2325/// matter (the visited set is identical to BFS); the function is
2326/// named "walk" rather than "BFS" to avoid implying that traversal
2327/// distance is meaningful (QA-fix group 2 — earlier name
2328/// `bfs_undirected_dep_graph` was misleading).
2329///
2330/// **Edge convention:** the dep edge `parent → child` represents
2331/// data flow from `parent` (a dep) to `child` (the consumer). It
2332/// appears in `s.children[parent]` as `child`, and in
2333/// `s.nodes[child].dep_records` as `parent`. `skip_edge =
2334/// Some((parent, child))` skips both forward (`parent → child`) and
2335/// backward (`child → parent`) traversals of that edge. Each
2336/// `(p, c)` pair in `extra_edges` is treated as if `c ∈
2337/// s.children[p]` and `p ∈ s.nodes[c].dep_records` — used for
2338/// "what would connectivity look like if THESE edges were also
2339/// present?" lookahead.
2340///
2341/// Used by Slice Y1 / Phase F (D3 split-eager, 2026-05-09):
2342/// - **P13 widening (pre-removal connectivity):** call with
2343///   `skip_edge = Some((removed_parent, removed_child))` AND
2344///   `extra_edges = added_edges_in_set_deps_call` so a `set_deps`
2345///   that simultaneously removes one edge AND adds another path
2346///   isn't falsely flagged as disconnecting (QA-fix #4 2026-05-09 —
2347///   without `extra_edges`, the pre-mutation BFS doesn't see the
2348///   would-be-added edges and rejects the conservative case).
2349/// - **Actual split execution (post-removal):** call with
2350///   `skip_edge = None` and `extra_edges = &[]`; the visited set is
2351///   the keep-side of the split (the side containing `start`).
2352pub(crate) fn walk_undirected_dep_graph(
2353    s: &CoreState,
2354    start: NodeId,
2355    skip_edge: Option<(NodeId, NodeId)>,
2356    extra_edges: &[(NodeId, NodeId)],
2357) -> HashSet<NodeId> {
2358    let mut visited: HashSet<NodeId> = HashSet::default();
2359    let mut queue: SmallVec<[NodeId; 32]> = SmallVec::new();
2360    queue.push(start);
2361    while let Some(cur) = queue.pop() {
2362        if !visited.insert(cur) {
2363            continue;
2364        }
2365        if let Some(consumers) = s.children.get(&cur) {
2366            for &c in consumers {
2367                let is_skipped = skip_edge.is_some_and(|(sp, sc)| cur == sp && c == sc);
2368                if !is_skipped && !visited.contains(&c) {
2369                    queue.push(c);
2370                }
2371            }
2372        }
2373        if let Some(rec) = s.nodes.get(&cur) {
2374            for d in rec.dep_records.iter().map(|r| r.node) {
2375                let is_skipped = skip_edge.is_some_and(|(sp, sc)| cur == sc && d == sp);
2376                if !is_skipped && !visited.contains(&d) {
2377                    queue.push(d);
2378                }
2379            }
2380        }
2381        // Virtual extra edges (e.g. would-be-added edges in
2382        // pre-mutation BFS).
2383        for &(ep, ec) in extra_edges {
2384            if cur == ep && !visited.contains(&ec) {
2385                queue.push(ec);
2386            }
2387            if cur == ec && !visited.contains(&ep) {
2388                queue.push(ep);
2389            }
2390        }
2391    }
2392    visited
2393}
2394
2395impl Core {
2396    /// Test-only inspection: number of `PendingBatch`es queued for
2397    /// `node` in the current wave. Used by Slice X4 D2 regression
2398    /// tests to pin the "common case = single batch, no SmallVec
2399    /// spill" perf invariant.
2400    ///
2401    /// Returns `None` if no `pending_notify` entry exists for `node`
2402    /// (no tier-1+ message has been queued for this node yet in this
2403    /// wave). `Some(0)` is unreachable by construction (a vacant
2404    /// entry implies no batches; an occupied entry has at least one).
2405    #[cfg(any(test, debug_assertions))]
2406    #[must_use]
2407    pub fn pending_batch_count(&self, node: NodeId) -> Option<usize> {
2408        self.lock_state()
2409            .pending_notify
2410            .get(&node)
2411            .map(|entry| entry.batches.len())
2412    }
2413
2414    /// Configure the Core-global cap on pause replay buffer length. When set,
2415    /// any per-node pause buffer that would exceed `cap` drops the oldest
2416    /// message(s) from the front; the dropped count is reported back via the
2417    /// resume callback (see [`ResumeReport`]). `None` (default) means
2418    /// unbounded; messages buffer indefinitely until the lockset clears.
2419    pub fn set_pause_buffer_cap(&self, cap: Option<usize>) {
2420        self.lock_state().pause_buffer_cap = cap;
2421    }
2422
2423    /// Configure the replay buffer cap on `node_id` (R2.6.5 / Lock 6.G —
2424    /// Slice E1, 2026-05-07). `None` disables the buffer. `Some(N)` keeps
2425    /// the last `N` DATA emissions in a circular buffer; late subscribers
2426    /// receive them as part of the per-tier handshake (between START and
2427    /// any terminal). Switching from a larger cap to a smaller cap evicts
2428    /// the front of the buffer to fit; switching to `None` drains the
2429    /// buffer entirely. Each evicted/drained handle's retain is released
2430    /// back to the binding.
2431    ///
2432    /// # Panics
2433    ///
2434    /// Panics if `node_id` is not registered.
2435    pub fn set_replay_buffer_cap(&self, node_id: NodeId, cap: Option<usize>) {
2436        // QA A7 (2026-05-07): normalize `Some(0)` to `None`. Two ways to
2437        // express "disabled" is confusing: `push_replay_buffer` already
2438        // treats `Some(0)` as no-op, so persisting it adds nothing.
2439        let cap = match cap {
2440            Some(0) => None,
2441            other => other,
2442        };
2443        let to_release: Vec<HandleId> = {
2444            let mut s = self.lock_state();
2445            let rec = s.require_node_mut(node_id);
2446            rec.replay_buffer_cap = cap;
2447            match cap {
2448                None => rec.replay_buffer.drain(..).collect(),
2449                Some(c) => {
2450                    let mut drained = Vec::new();
2451                    while rec.replay_buffer.len() > c {
2452                        if let Some(h) = rec.replay_buffer.pop_front() {
2453                            drained.push(h);
2454                        }
2455                    }
2456                    drained
2457                }
2458            }
2459        };
2460        for h in to_release {
2461            self.binding.release_handle(h);
2462        }
2463    }
2464
2465    /// Reconfigure the pause mode for `node_id` (canonical §2.6 — Slice F
2466    /// audit close, 2026-05-07). Default for new nodes is
2467    /// [`PausableMode::Default`]; switch to [`PausableMode::ResumeAll`]
2468    /// for nodes whose pause-window emit history must be observable
2469    /// verbatim, or [`PausableMode::Off`] for nodes intrinsically
2470    /// pause-immune.
2471    ///
2472    /// # Errors
2473    ///
2474    /// - [`SetPausableModeError::UnknownNode`] — `node_id` is not
2475    ///   registered.
2476    /// - [`SetPausableModeError::WhilePaused`] — the node currently
2477    ///   holds at least one pause lock. Changing mode mid-pause would
2478    ///   lose buffered content or strand a `pending_wave` flag — resume
2479    ///   all locks first.
2480    pub fn set_pausable_mode(
2481        &self,
2482        node_id: NodeId,
2483        mode: PausableMode,
2484    ) -> Result<(), SetPausableModeError> {
2485        let mut s = self.lock_state();
2486        let rec = s
2487            .nodes
2488            .get_mut(&node_id)
2489            .ok_or(SetPausableModeError::UnknownNode(node_id))?;
2490        if rec.pause_state.is_paused() {
2491            return Err(SetPausableModeError::WhilePaused);
2492        }
2493        rec.pausable = mode;
2494        Ok(())
2495    }
2496
2497    /// Configure the wave-drain iteration cap (R4.3 / Lock 2.F′). The wave
2498    /// engine aborts a drain after `cap` iterations with a diagnostic panic.
2499    /// Default is `10_000` — high enough to avoid false positives on legitimate
2500    /// fan-in cascades, low enough to surface runtime cycles within seconds.
2501    ///
2502    /// Lower this only when running adversarial / property-based tests that
2503    /// want fast cycle detection. Raise it only with concrete evidence that a
2504    /// legitimate workload needs more iterations than the default — and even
2505    /// then, prefer to tune the workload (per-subgraph batching, etc.) over
2506    /// raising the cap.
2507    ///
2508    /// # Panics
2509    ///
2510    /// Panics if `cap == 0` — a zero cap would abort every wave on the very
2511    /// first iteration, deadlocking any subsequent dispatcher work.
2512    pub fn set_max_batch_drain_iterations(&self, cap: u32) {
2513        assert!(cap > 0, "max_batch_drain_iterations must be > 0");
2514        self.lock_state().max_batch_drain_iterations = cap;
2515    }
2516
2517    /// Send a message UPSTREAM from `node_id` to each of its declared deps
2518    /// (canonical R1.4.1 — Slice F audit, F2 / 2026-05-07).
2519    ///
2520    /// The dispatcher rejects tier-3 (DATA / RESOLVED) and tier-5
2521    /// (COMPLETE / ERROR) per R1.4.1: value and terminal-lifecycle planes
2522    /// are downstream-only. All other tiers (0 START, 1 DIRTY, 2 PAUSE /
2523    /// RESUME, 4 INVALIDATE, 6 TEARDOWN) pass.
2524    ///
2525    /// # Routing per tier
2526    ///
2527    /// - **Tier 0 ([`Message::Start`]):** no-op. START is a per-subscription
2528    ///   handshake, not a routable wire signal — sending it upstream has no
2529    ///   well-defined target.
2530    /// - **Tier 1 ([`Message::Dirty`]):** no-op. The dep's "something
2531    ///   changed" notification is its own [`Self::emit`] / commit
2532    ///   responsibility; ignoring upstream DIRTY hints is safe.
2533    /// - **Tier 2 ([`Message::Pause`] / [`Message::Resume`]):** translates
2534    ///   to [`Self::pause`] / [`Self::resume`] on each dep. Lock id is
2535    ///   forwarded verbatim. Errors from individual deps are accumulated
2536    ///   in the `dep_errors` field of the returned report.
2537    /// - **Tier 4 ([`Message::Invalidate`]):** translates to
2538    ///   [`Self::invalidate`] on each dep. Note: canonical R1.4.2
2539    ///   distinguishes "downstream INVALIDATE" (cache clear + cascade) from
2540    ///   "upstream INVALIDATE" (plain forward, no self-process). The Rust
2541    ///   port v1 SIMPLIFICATION delegates to the same `Core::invalidate`
2542    ///   path — upstream INVALIDATE here DOES clear dep caches and cascade.
2543    ///   If a "plain forward" mode surfaces as a real consumer need, add
2544    ///   `up_with_options`.
2545    /// - **Tier 6 ([`Message::Teardown`]):** translates to
2546    ///   [`Self::teardown`] on each dep. Cascades per the standard
2547    ///   teardown path.
2548    ///
2549    /// # Errors
2550    ///
2551    /// - [`UpError::UnknownNode`] — `node_id` is not registered.
2552    /// - [`UpError::TierForbidden`] — tier 3 or tier 5.
2553    pub fn up(&self, node_id: NodeId, message: Message) -> Result<(), UpError> {
2554        // QA A10 (2026-05-07): check unknown node BEFORE tier rejection
2555        // for consistent error UX — `up(unknown, Data)` and
2556        // `up(unknown, Pause)` both report `UnknownNode` rather than
2557        // splitting on the tier.
2558        let dep_ids: Vec<NodeId> = {
2559            let s = self.lock_state();
2560            let rec = s.nodes.get(&node_id).ok_or(UpError::UnknownNode(node_id))?;
2561            rec.dep_ids_vec()
2562        };
2563        let tier = message.tier();
2564        if tier == 3 || tier == 5 {
2565            return Err(UpError::TierForbidden { tier });
2566        }
2567        for dep_id in dep_ids {
2568            match message {
2569                Message::Pause(lock) => {
2570                    let _ = self.pause(dep_id, lock);
2571                }
2572                Message::Resume(lock) => {
2573                    let _ = self.resume(dep_id, lock);
2574                }
2575                Message::Invalidate => {
2576                    self.invalidate(dep_id);
2577                }
2578                Message::Teardown => {
2579                    self.teardown(dep_id);
2580                }
2581                // Tier 0 START + tier 1 DIRTY: no-op upstream per the
2582                // routing table above.
2583                _ => {}
2584            }
2585        }
2586        Ok(())
2587    }
2588
2589    /// Allocate a unique [`LockId`] for use with [`Self::pause`] /
2590    /// [`Self::resume`]. Convenience for callers that don't already have an
2591    /// id-allocation scheme; user-supplied ids work too.
2592    #[must_use]
2593    pub fn alloc_lock_id(&self) -> LockId {
2594        let mut s = self.lock_state();
2595        let id = LockId::new(s.next_lock_id);
2596        s.next_lock_id += 1;
2597        id
2598    }
2599
2600    // -------------------------------------------------------------------
2601    // Registration — unified `register()` (D030, Slice D)
2602    //
2603    // All node kinds (State / Producer / Derived / Dynamic / Operator)
2604    // funnel through `Core::register(NodeRegistration) -> NodeId`. Sugar
2605    // wrappers (`register_state` / `register_producer` / `register_derived`
2606    // / `register_dynamic` / `register_operator`) build a `NodeRegistration`
2607    // and delegate. There is no parallel registration path internally.
2608    // -------------------------------------------------------------------
2609
    /// Unified node registration (D030).
    ///
    /// `reg` describes the node's identity (deps + closure-form fn id OR
    /// typed-op + per-kind opts). The kind is **derived from the field
    /// shape**, not stored — see [`NodeKind`].
    ///
    /// Sugar wrappers below ([`Self::register_state`],
    /// [`Self::register_producer`], [`Self::register_derived`],
    /// [`Self::register_dynamic`], [`Self::register_operator`]) build the
    /// registration for the common kinds and delegate here. Direct callers
    /// that need uncommon combinations (e.g., a partial-true derived) can
    /// invoke this method directly.
    ///
    /// On success the new node is inserted, linked as a child of each dep,
    /// unioned into its deps' partition in the registry, and — after the
    /// state lock has been dropped — a
    /// `TopologyEvent::NodeRegistered` event is fired.
    ///
    /// # Errors
    ///
    /// Errors are returned in evaluation order — earlier phases short-circuit
    /// later ones, so a single registration produces at most one variant.
    ///
    /// **Phase 1 — lock-released, side-effect-free validation:**
    /// - [`RegisterError::OperatorWithoutDeps`] — `reg` carries an op but
    ///   `deps` is empty. Operator nodes need at least one dep — for
    ///   subscription-managed combinators with no declared deps, use
    ///   [`Self::register_producer`] instead.
    /// - [`RegisterError::InitialOnlyForStateNodes`] — `reg.opts.initial`
    ///   is non-sentinel for a non-state shape (deps non-empty, or
    ///   fn_or_op present). State nodes are the only kind with an initial
    ///   cache.
    ///
    /// **Phase 2 — operator scratch construction (lock-released):**
    /// - [`RegisterError::OperatorSeedSentinel`] — `reg` carries `Op(Scan)`
    ///   / `Op(Reduce)` with a `NO_HANDLE` seed. R2.5.3 — stateful folders
    ///   must have a real seed.
    ///
    /// **Phase 3 — state-lock validation (folded with insertion under a
    /// single lock acquisition per /qa F1 to prevent TOCTOU between
    /// validation and `nodes.insert`):**
    /// - [`RegisterError::UnknownDep`] — any element of `reg.deps` is not
    ///   a registered node id. All deps are checked for existence before
    ///   any dep is checked for terminal state, so `UnknownDep` wins over
    ///   `TerminalDep` regardless of dep order.
    /// - [`RegisterError::TerminalDep`] — a dep is terminal (COMPLETE /
    ///   ERROR) AND not resubscribable — would create a permanent wedge.
    ///
    /// All errors are construction-time invariants — the dispatcher
    /// rejects the registration before any reactive state is created.
    /// On `Err`, no node has been added and any handle retains taken on
    /// the way in (operator scratch seed retains via
    /// [`BindingBoundary::retain_handle`]) have been released
    /// lock-released — see [`ScratchReleaseGuard`] for the RAII
    /// discipline that covers both early-return AND unwind paths.
    /// `Last { default }` retains its `default` handle on the same
    /// release path.
    pub fn register(&self, reg: NodeRegistration) -> Result<NodeId, RegisterError> {
        let NodeRegistration {
            deps,
            fn_or_op,
            opts,
        } = reg;
        let NodeOpts {
            initial,
            equals,
            partial,
            is_dynamic,
            pausable,
            replay_buffer,
        } = opts;

        // Derive the field shape from fn_or_op + deps.
        let (fn_id, op) = match fn_or_op {
            Some(NodeFnOrOp::Fn(f)) => (Some(f), None),
            Some(NodeFnOrOp::Op(o)) => (None, Some(o)),
            None => (None, None),
        };

        // Phase 1 — lock-released, side-effect-free validation. Errors
        // here return BEFORE any handle retain is taken.
        //
        //   - State (no deps + no fn + no op) is the only kind with `initial`.
        //   - Dynamic flag only meaningful when fn + non-empty deps. (It is
        //     not validated here; `is_dynamic` is stored as supplied.)
        //   - Operator (op present) must have deps (P9: operator without deps
        //     would skip activation — use a producer instead).
        let is_state_shape = deps.is_empty() && fn_id.is_none() && op.is_none();
        if op.is_some() && deps.is_empty() {
            return Err(RegisterError::OperatorWithoutDeps);
        }
        if initial != NO_HANDLE && !is_state_shape {
            return Err(RegisterError::InitialOnlyForStateNodes);
        }

        // Phase 2 — build per-operator scratch struct (may take handle
        // retains via `binding.retain_handle` for Scan/Reduce/Last seed).
        // Lock-released per Slice E (D045) handshake discipline. Returns
        // `OperatorSeedSentinel` BEFORE retain so an Err leaves no
        // dangling handles.
        let scratch = match op {
            Some(operator_op) => self.make_op_scratch(operator_op)?,
            None => None,
        };

        // Wrap scratch in an RAII guard immediately after Phase 2. From
        // here on, ANY early return / unwind path correctly releases the
        // scratch's handle retains via `OperatorScratch::release_handles`
        // (Slice H /qa F2 — defense against panics between Phase 2 and
        // Phase 3 cleanup branch). Lock-released because the guard is
        // declared BEFORE `lock_state()` below — variable destruction
        // order is reverse declaration order, so the `MutexGuard` drops
        // first on any return path.
        let scratch_guard = ScratchReleaseGuard::new(scratch, &*self.binding);

        // Phase 3 — state-lock-required validation, FOLDED with insertion
        // under a single `lock_state()` acquisition per /qa F1. The
        // pre-/qa version split this into two acquisitions (one for
        // validation, one for `alloc_node_id` + `nodes.insert`), opening
        // a TOCTOU window where a concurrent `Core::complete(dep)` on a
        // non-resubscribable dep could slip in and recreate the wedge
        // `TerminalDep` was designed to prevent. Single locked region
        // closes the gap.
        let mut s = self.lock_state();

        // Existence pass first: every dep must be registered before any
        // dep's terminal state is inspected (this fixes the error
        // precedence and makes `require_node` below safe to call).
        for &dep in &deps {
            if !s.nodes.contains_key(&dep) {
                return Err(RegisterError::UnknownDep(dep));
            }
        }
        // Slice F audit (2026-05-07): mirror `set_deps`'s `TerminalDep`
        // rejection at registration time. Adding a non-resubscribable
        // terminal node as a dep at registration creates a permanent wedge.
        for &dep in &deps {
            let dep_rec = s.require_node(dep);
            if dep_rec.terminal.is_some() && !dep_rec.resubscribable {
                return Err(RegisterError::TerminalDep(dep));
            }
        }

        // Validation passed — install. Take scratch out of the guard
        // (disarms the release-on-drop) and continue using `s`.
        let installed_scratch = scratch_guard.take();

        let id = s.alloc_node_id();

        // `tracked`: Static derived + Operator track all deps; Dynamic
        // starts empty and fills via fn return; State / Producer have no
        // deps so tracked is empty.
        let tracked: HashSet<usize> = if op.is_some() {
            (0..deps.len()).collect()
        } else if is_dynamic {
            HashSet::new()
        } else if fn_id.is_some() && !deps.is_empty() {
            // Static derived
            (0..deps.len()).collect()
        } else {
            HashSet::new()
        };

        // One `DepRecord` per declared dep, in declaration order.
        let dep_records: Vec<DepRecord> = deps.iter().map(|&d| DepRecord::new(d)).collect();

        let rec = NodeRecord {
            dep_records,
            fn_id,
            op,
            is_dynamic,
            equals,
            // Phase 1 guarantees `initial` is non-sentinel only for the
            // state shape, so only a state node can start with a cache and
            // with `has_fired_once` already set.
            cache: initial,
            has_fired_once: initial != NO_HANDLE,
            subscribers: HashMap::new(),
            subscribers_revision: 0,
            tracked,
            dirty: false,
            involved_this_wave: false,
            pause_state: PauseState::Active,
            pausable,
            replay_buffer_cap: replay_buffer,
            replay_buffer: VecDeque::new(),
            terminal: None,
            has_received_teardown: false,
            // Every node starts non-resubscribable; `NodeOpts` carries no
            // field for it.
            resubscribable: false,
            meta_companions: Vec::new(),
            partial,
            op_scratch: installed_scratch,
        };
        s.nodes.insert(id, rec);
        // Reverse edges: give the new node its own (empty) child set and
        // record it as a child of each of its deps.
        s.children.insert(id, HashSet::new());
        for &dep in &deps {
            s.children.entry(dep).or_default().insert(id);
        }
        // Slice Y1 (D3 / D090 — P12 fix, 2026-05-08): maintain partition
        // membership BEFORE dropping the state lock. Closes the
        // eventual-consistency window where a concurrent thread observed
        // the new node in `s.nodes` / new edges in `s.children` but the
        // registry hadn't unioned the partition yet. Today benign
        // (`partition_of` is debug-only); under Y1's wave engine
        // migration `lock_for(node)` consumes registry state on the hot
        // path, and the window means `lock_for` could resolve to a
        // partition that's been topologically unioned in `s.children`
        // but not yet in `registry`.
        //
        // **Lock-discipline invariant:** `state lock → registry mutex`
        // (one-way; never registry → state). Registry mutex is
        // uncontended in the X5 substrate — the only acquisition sites
        // are this one + `Core::set_deps` + the read-only accessors
        // `partition_count`/`partition_of` and (Y1+) `lock_for` — none
        // of which take the state lock — so the inner critical section
        // adds negligible latency.
        {
            let mut reg = self.registry.lock();
            reg.ensure_registered(id);
            for &dep in &deps {
                reg.union_nodes(id, dep);
            }
        }
        // Release the state lock explicitly so the topology event below is
        // fired without it held.
        drop(s);
        self.fire_topology_event(&crate::topology::TopologyEvent::NodeRegistered(id));
        Ok(id)
    }
2822
2823    /// Sugar over [`Self::register`] — register a state node. `initial`
2824    /// may be [`NO_HANDLE`] to start sentinel.
2825    ///
2826    /// `partial` is accepted for surface consistency (D019); for state
2827    /// nodes it has no effect (state nodes don't fire fn).
2828    ///
2829    /// # Errors
2830    ///
2831    /// State registration is structurally simple — no deps, no op — so
2832    /// the only reachable variant is none in practice. Returns
2833    /// [`Result`] for surface consistency with [`Self::register`].
2834    pub fn register_state(
2835        &self,
2836        initial: HandleId,
2837        partial: bool,
2838    ) -> Result<NodeId, RegisterError> {
2839        self.register(NodeRegistration {
2840            deps: Vec::new(),
2841            fn_or_op: None,
2842            opts: NodeOpts {
2843                initial,
2844                partial,
2845                ..NodeOpts::default()
2846            },
2847        })
2848    }
2849
2850    /// Sugar over [`Self::register`] — register a producer node (D031,
2851    /// Slice D). No deps; fn fires once on first subscribe; cleanup runs
2852    /// via [`BindingBoundary::producer_deactivate`] when the last
2853    /// subscriber unsubscribes.
2854    ///
2855    /// The fn body uses the binding's `ProducerCtx`-equivalent helper
2856    /// (see `graphrefly-operators::producer`) to subscribe to other Core
2857    /// nodes — the zip / concat / race / takeUntil pattern.
2858    ///
2859    /// # Errors
2860    ///
2861    /// Producer registration has no user-supplied deps, so structurally
2862    /// none of [`RegisterError`]'s variants are reachable. Returns
2863    /// [`Result`] for surface consistency with [`Self::register`].
2864    pub fn register_producer(&self, fn_id: FnId) -> Result<NodeId, RegisterError> {
2865        self.register(NodeRegistration {
2866            deps: Vec::new(),
2867            fn_or_op: Some(NodeFnOrOp::Fn(fn_id)),
2868            opts: NodeOpts {
2869                // Producers have no deps — the first-run gate is degenerate.
2870                partial: true,
2871                ..NodeOpts::default()
2872            },
2873        })
2874    }
2875
2876    /// Sugar over [`Self::register`] — register a derived (static) node.
2877    /// `partial` controls the R2.5.3 first-run gate (D011).
2878    ///
2879    /// # Errors
2880    ///
2881    /// - [`RegisterError::UnknownDep`] — any element of `deps` is not
2882    ///   registered.
2883    /// - [`RegisterError::TerminalDep`] — a dep is terminal and not
2884    ///   resubscribable.
2885    pub fn register_derived(
2886        &self,
2887        deps: &[NodeId],
2888        fn_id: FnId,
2889        equals: EqualsMode,
2890        partial: bool,
2891    ) -> Result<NodeId, RegisterError> {
2892        self.register(NodeRegistration {
2893            deps: deps.to_vec(),
2894            fn_or_op: Some(NodeFnOrOp::Fn(fn_id)),
2895            opts: NodeOpts {
2896                equals,
2897                partial,
2898                ..NodeOpts::default()
2899            },
2900        })
2901    }
2902
2903    /// Sugar over [`Self::register`] — register a dynamic node (fn
2904    /// declares its actually-tracked dep indices per fire). `partial`
2905    /// controls the R2.5.3 first-run gate (D011).
2906    ///
2907    /// # Errors
2908    ///
2909    /// - [`RegisterError::UnknownDep`] — any element of `deps` is not
2910    ///   registered.
2911    /// - [`RegisterError::TerminalDep`] — a dep is terminal and not
2912    ///   resubscribable.
2913    pub fn register_dynamic(
2914        &self,
2915        deps: &[NodeId],
2916        fn_id: FnId,
2917        equals: EqualsMode,
2918        partial: bool,
2919    ) -> Result<NodeId, RegisterError> {
2920        self.register(NodeRegistration {
2921            deps: deps.to_vec(),
2922            fn_or_op: Some(NodeFnOrOp::Fn(fn_id)),
2923            opts: NodeOpts {
2924                equals,
2925                partial,
2926                is_dynamic: true,
2927                ..NodeOpts::default()
2928            },
2929        })
2930    }
2931
2932    /// Build a fresh [`OperatorScratch`](crate::op_state::OperatorScratch)
2933    /// box for an operator variant, taking any required handle retains.
2934    /// Shared between `register_operator` (initial install) and
2935    /// `reset_for_fresh_lifecycle` (resubscribable cycle re-install).
2936    ///
2937    /// # Errors
2938    ///
2939    /// Returns [`RegisterError::OperatorSeedSentinel`] if `op` is `Scan`
2940    /// / `Reduce` with a [`NO_HANDLE`] seed (R2.5.3 — stateful folders
2941    /// must have a real seed). Refcount discipline: the seed-sentinel
2942    /// check happens BEFORE [`BindingBoundary::retain_handle`], so an
2943    /// `Err` leaves no handles dangling.
2944    fn make_op_scratch(
2945        &self,
2946        op: OperatorOp,
2947    ) -> Result<Option<Box<dyn crate::op_state::OperatorScratch>>, RegisterError> {
2948        use crate::op_state::{
2949            DistinctState, LastState, PairwiseState, ReduceState, ScanState, SkipState, TakeState,
2950            TakeWhileState,
2951        };
2952        // Slice H (2026-05-07): Scan/Reduce seed-sentinel checks happen
2953        // BEFORE retain_handle so an Err return leaves no handles dangling.
2954        //
2955        // Slice H /qa F13 (2026-05-07): for retaining variants, allocate
2956        // the `Box<State>` BEFORE calling `binding.retain_handle`. If
2957        // `Box::new` panics (e.g., OOM-as-panic), no retain has happened
2958        // yet — no leak. If `retain_handle` panics after Box succeeds,
2959        // the `Box<State>` is dropped on unwind; State has no handle yet
2960        // (we haven't touched the registry refcount), so still no leak.
2961        // Caller wraps the returned scratch in `ScratchReleaseGuard` to
2962        // cover panics AFTER make_op_scratch returns.
2963        match op {
2964            OperatorOp::Scan { seed, .. } => {
2965                if seed == NO_HANDLE {
2966                    return Err(RegisterError::OperatorSeedSentinel);
2967                }
2968                let state = Box::new(ScanState { acc: seed });
2969                self.binding.retain_handle(seed);
2970                Ok(Some(state))
2971            }
2972            OperatorOp::Reduce { seed, .. } => {
2973                if seed == NO_HANDLE {
2974                    return Err(RegisterError::OperatorSeedSentinel);
2975                }
2976                let state = Box::new(ReduceState { acc: seed });
2977                self.binding.retain_handle(seed);
2978                Ok(Some(state))
2979            }
2980            OperatorOp::DistinctUntilChanged { .. } => Ok(Some(Box::new(DistinctState::default()))),
2981            OperatorOp::Pairwise { .. } => Ok(Some(Box::new(PairwiseState::default()))),
2982            OperatorOp::Take { .. } => Ok(Some(Box::new(TakeState::default()))),
2983            OperatorOp::Skip { .. } => Ok(Some(Box::new(SkipState::default()))),
2984            OperatorOp::TakeWhile { .. } => Ok(Some(Box::new(TakeWhileState))),
2985            OperatorOp::Last { default } => {
2986                let state = Box::new(LastState {
2987                    latest: NO_HANDLE,
2988                    default,
2989                });
2990                if default != NO_HANDLE {
2991                    self.binding.retain_handle(default);
2992                }
2993                Ok(Some(state))
2994            }
2995            OperatorOp::Map { .. }
2996            | OperatorOp::Filter { .. }
2997            | OperatorOp::Combine { .. }
2998            | OperatorOp::WithLatestFrom { .. }
2999            | OperatorOp::Merge => Ok(None),
3000        }
3001    }
3002
3003    /// Sugar over [`Self::register`] — register a built-in operator node
3004    /// (Slice C-1, D009; D026 generic scratch). The operator dispatch path
3005    /// lives in `fire_operator`; `op` selects which per-operator FFI
3006    /// method on [`BindingBoundary`] gets called per fire.
3007    ///
3008    /// For stateful operators ([`OperatorOp::Scan`] / [`Reduce`] /
3009    /// [`Last`] with a default), the seed/default handle is captured
3010    /// into the appropriate
3011    /// [`OperatorScratch`](crate::op_state::OperatorScratch) struct
3012    /// stored at [`NodeRecord::op_scratch`], and Core takes one retain
3013    /// share via [`BindingBoundary::retain_handle`].
3014    ///
3015    /// # Errors
3016    ///
3017    /// - [`RegisterError::OperatorWithoutDeps`] — `deps` is empty (use
3018    ///   [`Self::register_producer`] instead).
3019    /// - [`RegisterError::OperatorSeedSentinel`] — `op` is
3020    ///   [`OperatorOp::Scan`] / [`OperatorOp::Reduce`] with a
3021    ///   [`NO_HANDLE`] seed.
3022    /// - [`RegisterError::UnknownDep`] — any element of `deps` is not
3023    ///   registered.
3024    /// - [`RegisterError::TerminalDep`] — a dep is terminal and not
3025    ///   resubscribable.
3026    pub fn register_operator(
3027        &self,
3028        deps: &[NodeId],
3029        op: OperatorOp,
3030        opts: OperatorOpts,
3031    ) -> Result<NodeId, RegisterError> {
3032        self.register(NodeRegistration {
3033            deps: deps.to_vec(),
3034            fn_or_op: Some(NodeFnOrOp::Op(op)),
3035            opts: NodeOpts {
3036                equals: opts.equals,
3037                partial: opts.partial,
3038                ..NodeOpts::default()
3039            },
3040        })
3041    }
3042
3043    // -------------------------------------------------------------------
3044    // Subscription
3045    // -------------------------------------------------------------------
3046
3047    /// Subscribe a sink to a node. Returns a [`Subscription`] handle —
3048    /// dropping the handle unsubscribes the sink. Per §10.12, no manual
3049    /// `unsubscribe(node, id)` call is required.
3050    ///
3051    /// Push-on-subscribe (R1.2.3, R2.2.3 step 4): the sink is registered AFTER
3052    /// the START handshake fires. The handshake contents depend on node
3053    /// state:
3054    /// - Sentinel cache + live (non-terminal): `[START]`
3055    /// - Cached + live: `[START, DATA(handle)]`
3056    /// - Cached + terminated (non-resubscribable): `[START, DATA(handle), <terminal>]`
3057    /// - Sentinel + terminated (non-resubscribable): `[START, <terminal>]`
3058    ///
3059    /// Resubscribable terminal lifecycle (R2.2.7 / R2.5.3): if the node was
3060    /// marked resubscribable via [`Self::set_resubscribable`] AND has
3061    /// terminated, the subscribe call first **resets** the node — clears
3062    /// `terminal`, `has_fired_once`, `has_received_teardown`, all
3063    /// `dep_handles` to `NO_HANDLE`, all `dep_terminals` to `None`, and
3064    /// drains the pause lockset. The new subscriber then receives a fresh
3065    /// `[START]` (cache may survive for state nodes; sentinel for compute).
3066    ///
3067    /// Activation (R2.2.3 step 5): if this is the first subscriber and the
3068    /// node is a derived/dynamic compute, recursively activate deps so their
3069    /// cached handles fill our `dep_handles`.
3070    #[allow(clippy::needless_pass_by_value)] // Sink is `Arc<dyn Fn>`; we clone for the subscribers map and call it directly. Taking by value matches the ergonomics callers expect.
3071    pub fn subscribe(&self, node_id: NodeId, sink: Sink) -> Subscription {
3072        // Subscribe protocol (Slice E rework, post-handshake-reentry-lift):
3073        //
3074        // 1. Acquire `wave_owner` first (re-entrant; same-thread passes
3075        //    through, cross-thread blocks). This is the cross-thread
3076        //    serialization point that preserves R1.3.5.a happens-after
3077        //    ordering across the lock-released handshake fire.
3078        // 2. Acquire state lock briefly: alloc sub_id, run resubscribable
3079        //    reset if applicable, snapshot handshake state, install sink
3080        //    in `subscribers`. Drop state lock.
3081        // 3. Fire handshake LOCK-RELEASED. Per-tier slices (R1.3.5.a):
3082        //    `[Start]` / `[Data(cache)]?` / `[Complete]?` / `[Error(h)]?`
3083        //    / `[Teardown]?`. Empty tiers are skipped. Sink callbacks
3084        //    may re-enter Core freely — same-thread re-entry passes
3085        //    through `wave_owner` reentrantly.
3086        // 4. Run activation under `run_wave` if needed (first subscriber
3087        //    on a non-state node).
3088        // 5. Drop `wave_owner`.
3089        //
3090        // Race-fix discipline: the sink is installed in `subscribers`
3091        // BEFORE the state lock drops, so concurrent threads that
3092        // acquire `wave_owner` after our scope sees the sink already
3093        // registered. Cross-thread emits block on `wave_owner` until
3094        // we drop it, ensuring all our handshake calls land before
3095        // any concurrent wave's flush observes the sink.
3096
3097        // Acquire the partition's `wave_owner` first — cross-thread
3098        // serialization point. Per Slice Y1 / Phase E (2026-05-08),
3099        // subscribe routes through the per-partition lock instead of
3100        // a Core-global one. Subscribe touches only `node_id`'s
3101        // partition (activation cascade stays within the partition
3102        // because dep edges are unioned). `partition_wave_owner_lock_arc`
3103        // does retry-validate against concurrent union/split.
3104        // `lock_arc()` is `!Send`; same-thread reentrant.
3105        let _wave_guard = self.partition_wave_owner_lock_arc(node_id);
3106
3107        let (sub_id, tier_slices, needs_activation, did_reset) = {
3108            let mut s = self.lock_state();
3109            let sub_id = s.alloc_sub_id();
3110
3111            // Resubscribable reset: terminal + flagged → clear lifecycle
3112            // state so the incoming subscriber starts fresh. F3 audit
3113            // guard: a node that has received TEARDOWN (R2.6.4) is
3114            // permanently destroyed at this layer; resurrecting it via a
3115            // late subscribe is a category error. COMPLETE/ERROR is
3116            // recoverable for resubscribable nodes; TEARDOWN is not. The
3117            // handshake will still replay the terminal in the non-reset
3118            // branch so the late subscriber sees a clean
3119            // `[START, ?DATA, COMPLETE|ERROR, TEARDOWN]` stream.
3120            let needs_reset = {
3121                let rec = s.require_node(node_id);
3122                rec.resubscribable && rec.terminal.is_some() && !rec.has_received_teardown
3123            };
3124            if needs_reset {
3125                self.reset_for_fresh_lifecycle(&mut s, node_id);
3126            }
3127
3128            // Snapshot handshake state under lock.
3129            let (cache, is_state, first_subscriber, terminal, torn_down) = {
3130                let rec = s.require_node(node_id);
3131                (
3132                    rec.cache,
3133                    rec.is_state(),
3134                    rec.subscribers.is_empty(),
3135                    rec.terminal,
3136                    rec.has_received_teardown,
3137                )
3138            };
3139
3140            // Build per-tier handshake slices. Each non-empty slice is
3141            // fired as a separate sink call (R1.3.5.a tier-split).
3142            let mut tier_slices: SmallVec<[Vec<Message>; 4]> = SmallVec::new();
3143            tier_slices.push(vec![Message::Start]);
3144            if cache != NO_HANDLE {
3145                tier_slices.push(vec![Message::Data(cache)]);
3146            }
3147            // Slice E1 (R2.6.5 / Lock 6.G): replay buffered DATA between
3148            // [Start] (and the cache slice, if present) and any terminal.
3149            // Each buffered handle becomes a separate per-tier slice so
3150            // late subscribers see the historical Data sequence as
3151            // distinct sink calls.
3152            //
3153            // Dedupe: when a cache slice is present and the buffer's last
3154            // entry is the same handle (the typical case — cache always
3155            // tracks the last DATA emitted, and the buffer's tail entry
3156            // is that same DATA), skip the last buffer entry to avoid
3157            // delivering Data(cache) twice. For state nodes whose cache
3158            // survives unsubscribe, the buffer may have older entries
3159            // the cache doesn't reflect; the dedupe only drops the
3160            // single trailing entry that equals cache. (QA A1, 2026-05-07)
3161            let replay_handles: Vec<HandleId> = {
3162                let rec = s.require_node(node_id);
3163                let cap = rec.replay_buffer_cap.unwrap_or(0);
3164                if cap == 0 {
3165                    Vec::new()
3166                } else {
3167                    let mut v: Vec<HandleId> = rec.replay_buffer.iter().copied().collect();
3168                    if cache != NO_HANDLE && v.last() == Some(&cache) {
3169                        v.pop();
3170                    }
3171                    v
3172                }
3173            };
3174            for h in &replay_handles {
3175                tier_slices.push(vec![Message::Data(*h)]);
3176            }
3177            if let Some(t) = terminal {
3178                tier_slices.push(vec![match t {
3179                    TerminalKind::Complete => Message::Complete,
3180                    TerminalKind::Error(h) => Message::Error(h),
3181                }]);
3182            }
3183            if torn_down {
3184                tier_slices.push(vec![Message::Teardown]);
3185            }
3186
3187            // Install sink BEFORE dropping state lock so any thread that
3188            // subsequently acquires `wave_owner` (after our scope ends)
3189            // sees the sink already registered.
3190            //
3191            // Slice X4 / D2: bump `subscribers_revision` alongside the
3192            // insert so a pending_notify entry opened earlier in the same
3193            // wave (e.g. inside `batch(|| { emit(s, h1); subscribe(s,
3194            // late); emit(s, h2); })`) starts a fresh `PendingBatch` on
3195            // its next `queue_notify` push — making the new sink visible
3196            // to subsequent emits' flush slices, while the pre-subscribe
3197            // batch's snapshot stays frozen so we don't double-deliver
3198            // earlier emits via the wave's flush AND the new sub's
3199            // handshake replay.
3200            {
3201                let rec = s.require_node_mut(node_id);
3202                rec.subscribers.insert(sub_id, sink.clone());
3203                rec.subscribers_revision = rec.subscribers_revision.wrapping_add(1);
3204            }
3205
3206            let needs_activation = first_subscriber && !is_state;
3207            (sub_id, tier_slices, needs_activation, needs_reset)
3208            // state lock drops here
3209        };
3210
3211        // Slice E2 (R2.4.6 / D055): on resubscribable terminal reset, fire
3212        // `wipe_ctx` LOCK-RELEASED so the binding drops its `NodeCtxState`
3213        // entry (clearing both `store` and any residual `current_cleanup`).
3214        // The new subscriber's first invoke_fn sees a fresh empty store.
3215        // Fires AFTER the state lock drops so the binding's
3216        // `node_ctx.lock()` can't deadlock against Core's state lock — and
3217        // BEFORE the handshake so the wipe is observable before any
3218        // user-visible interaction with the new lifecycle.
3219        if did_reset {
3220            self.binding.wipe_ctx(node_id);
3221        }
3222
3223        // Fire handshake LOCK-RELEASED. Sink may re-enter Core; same-
3224        // thread re-entry passes through `wave_owner` reentrantly.
3225        // Cross-thread emits block at `wave_owner` until our scope ends.
3226        //
3227        // A7 (Slice F, 2026-05-07): per-tier slice fire is wrapped in
3228        // `catch_unwind`. The sink is installed in `subscribers` BEFORE
3229        // the handshake fires (load-bearing — concurrent threads observe
3230        // the sink immediately). If a sink panics on tier N, the panic
3231        // would otherwise unwind out of `subscribe` BEFORE the
3232        // `Subscription` handle is constructed, leaving the sink
3233        // registered in `subscribers` with no user-held handle to drop.
3234        // Subsequent waves' `flush_notifications` would re-fire the
3235        // panicking sink forever.
3236        //
3237        // On panic: remove the sink from `subscribers` (via the
3238        // already-allocated `sub_id`), drop `_wave_guard` cleanly via
3239        // RAII, and resume the unwind so the user observes the panic at
3240        // the call site. Same effect as the user dropping the
3241        // `Subscription` immediately, but pre-emptive.
3242        for slice in &tier_slices {
3243            let sink_clone = sink.clone();
3244            let slice_ref: &[Message] = slice;
3245            let result = catch_unwind(AssertUnwindSafe(|| sink_clone(slice_ref)));
3246            if let Err(panic_payload) = result {
3247                // Remove the orphaned sink. Best-effort: if the node was
3248                // since torn down (e.g., the sink itself called teardown
3249                // before panicking), the entry may already be gone.
3250                {
3251                    let mut s = self.lock_state();
3252                    if let Some(rec) = s.nodes.get_mut(&node_id) {
3253                        rec.subscribers.remove(&sub_id);
3254                        // Slice X4 / D2: keep revision-tracked snapshot
3255                        // discipline consistent with the install site —
3256                        // any pending_notify entry that already absorbed
3257                        // the panicking sink under the post-install
3258                        // revision should start a fresh batch on its
3259                        // next queue_notify push.
3260                        rec.subscribers_revision = rec.subscribers_revision.wrapping_add(1);
3261                    }
3262                }
3263                std::panic::resume_unwind(panic_payload);
3264            }
3265        }
3266
3267        // Run activation if needed. `run_wave_for(node_id)` acquires
3268        // only the partitions transitively touched from `node_id`
3269        // (downstream cascade + meta-companion teardown reach) — same-
3270        // partition activation re-enters reentrantly. Slice Y1 / Phase E.
3271        if needs_activation {
3272            self.run_wave_for(node_id, |this| {
3273                let mut s = this.lock_state();
3274                this.activate_derived(&mut s, node_id);
3275            });
3276        }
3277
3278        Subscription {
3279            state: Arc::downgrade(&self.state),
3280            node_id,
3281            sub_id,
3282        }
3283        // _wave_guard drops here, releasing wave_owner.
3284    }
3285
3286    /// Mark `node_id` as resubscribable per R2.2.7. Resubscribable nodes
3287    /// reset their terminal-lifecycle state on a fresh subscribe — see
3288    /// [`Self::subscribe`].
3289    ///
3290    /// Configuration call — must be made before the node has any active
3291    /// subscribers, since changing the policy mid-flight would surprise
3292    /// existing observers.
3293    ///
3294    /// # Panics
3295    ///
3296    /// Panics if the node has subscribers (the policy is observable
3297    /// behavior; changing it after the fact would change semantics for
3298    /// existing sinks).
3299    pub fn set_resubscribable(&self, node_id: NodeId, resubscribable: bool) {
3300        let mut s = self.lock_state();
3301        let rec = s.require_node_mut(node_id);
3302        assert!(
3303            rec.subscribers.is_empty(),
3304            "set_resubscribable: node already has subscribers; \
3305             configure resubscribable before any subscribe call"
3306        );
3307        rec.resubscribable = resubscribable;
3308    }
3309
    /// Reset a resubscribable node's terminal-lifecycle state. Called from
    /// `subscribe` (with the state lock held) when a late subscriber
    /// arrives at a node that is resubscribable, terminal, and has not
    /// received TEARDOWN.
    ///
    /// Released through `binding.release_handle`:
    /// - the node's own terminal-slot retain (Error handle, if any);
    /// - per dep record: the terminal Error handle, every `data_batch`
    ///   handle, and `prev_data` when it is not the sentinel;
    /// - the payload handle of every message still sitting in the pause
    ///   buffer (when the node is `Paused`);
    /// - every handle in the replay buffer.
    ///
    /// The old `op_scratch` releases its own handles via
    /// `release_handles`, after the replacement scratch has been built
    /// (retain-before-release — see Phase 2).
    ///
    /// Reset: `terminal`, `has_received_teardown`, the node's and each dep
    /// record's `dirty` / `involved_this_wave` flags, every dep record's
    /// `prev_data` / `data_batch` / `terminal`, and the pause state (back
    /// to `Active`; buffered messages are dropped rather than flushed).
    /// `has_fired_once` is recomputed: it stays `true` only for a state
    /// node that still holds a cache. Dynamic nodes also clear `tracked`.
    fn reset_for_fresh_lifecycle(&self, s: &mut CoreState, node_id: NodeId) {
        // Phase 1: collect wave-state handle releases + take the old
        // op_scratch + reset other state. Take all mutations under one
        // borrow so the post-borrow phases don't re-walk dep_records.
        let (prev_op, mut old_scratch, handles_to_release, pause_buffer_payloads) = {
            let rec = s.require_node_mut(node_id);
            let mut hs = Vec::new();
            // Only the Error variant carries a handle; Complete has nothing
            // to release.
            if let Some(TerminalKind::Error(h)) = rec.terminal {
                hs.push(h);
            }
            for dr in &rec.dep_records {
                if let Some(TerminalKind::Error(h)) = dr.terminal {
                    hs.push(h);
                }
                for &h in &dr.data_batch {
                    hs.push(h);
                }
                // Slice C-3 /qa: also release `prev_data`. Prior to this
                // collection, `reset_for_fresh_lifecycle` overwrote
                // `dr.prev_data = NO_HANDLE` without releasing the old
                // handle, leaking one share per dep per resubscribable
                // cycle. The leak was masked because no test exercised
                // the per-dep `prev_data` retain across a lifecycle
                // reset; surfaced by the T1 tightening of
                // `last_releases_buffered_latest_on_lifecycle_reset`.
                if dr.prev_data != NO_HANDLE {
                    hs.push(dr.prev_data);
                }
            }
            // Take pause_state's buffer; collect its payload handles for
            // release (they were retained at queue_notify time; buffer
            // drops because the new subscriber starts fresh).
            let mut pulled = Vec::new();
            if let PauseState::Paused { ref mut buffer, .. } = rec.pause_state {
                for msg in buffer.drain(..) {
                    if let Some(h) = msg.payload_handle() {
                        pulled.push(h);
                    }
                }
            }
            // Slice E1: drain the replay buffer too — the new subscriber
            // gets a fresh lifecycle and shouldn't see prior emissions.
            // These handles share the `pulled` list with the pause-buffer
            // payloads and are released together in Phase 5.
            for h in rec.replay_buffer.drain(..) {
                pulled.push(h);
            }
            // Reset wave / lifecycle state.
            rec.terminal = None;
            // A state node that still holds a cache counts as having
            // fired; every other node starts over as not-yet-fired.
            rec.has_fired_once = rec.cache != NO_HANDLE && rec.is_state();
            rec.has_received_teardown = false;
            for dr in &mut rec.dep_records {
                dr.prev_data = NO_HANDLE;
                dr.data_batch.clear();
                dr.terminal = None;
                dr.dirty = false;
                dr.involved_this_wave = false;
            }
            rec.pause_state = PauseState::Active;
            rec.involved_this_wave = false;
            rec.dirty = false;
            // P7 (Slice A close /qa): Dynamic nodes clear `tracked` so
            // the post-reset first fire repopulates from the fn's
            // returned tracked-deps set.
            if rec.is_dynamic {
                rec.tracked.clear();
            }
            // Take the old scratch out so we can release its handles and
            // install a fresh one. Operator op is copied for the
            // rebuild step below.
            let prev_op = rec.op;
            let old = std::mem::take(&mut rec.op_scratch);
            (prev_op, old, hs, pulled)
        };

        // Phase 2 (Slice C-3 /qa P1 — RETAIN-BEFORE-RELEASE ordering):
        // build the fresh scratch FIRST, taking new retains on any
        // seed/default handles. This must run BEFORE Phase 3 releases
        // the old scratch's shares — if old `acc` (Scan/Reduce) or old
        // `latest` (Last) aliases the new `seed`/`default` (common:
        // `fold(seed, x) == seed` interns to the same registry entry),
        // releasing the old share first could collapse the binding's
        // registry slot to zero (production bindings remove the value
        // entry on refcount-zero — see `tests/common/mod.rs:191-204`),
        // and a subsequent `retain_handle` on the new seed would bump a
        // refcount on a slot whose value has been removed. By taking
        // the new retains first, we floor the refcount at ≥1 before
        // any release happens.
        let new_scratch = match prev_op {
            // Slice H: the OperatorOp stored on NodeRecord previously
            // passed `make_op_scratch` validation at registration time
            // (RegisterError::OperatorSeedSentinel for Scan/Reduce
            // sentinel seeds; Last { default: NO_HANDLE } is accepted
            // and never errors). Re-running it here on the same op
            // value is structurally guaranteed to succeed.
            Some(op) => self
                .make_op_scratch(op)
                .expect("invariant: stored OperatorOp passed make_op_scratch validation at registration time"),
            // Non-operator nodes have no scratch to rebuild.
            None => None,
        };

        // Phase 3: NOW release handles owned by the old op_scratch
        // (Scan/Reduce acc, Distinct/Pairwise prev, Last latest +
        // default). Safe per Phase 2's retain-first floor. The boxed
        // value is consumed and dropped after.
        if let Some(scratch) = old_scratch.as_mut() {
            scratch.release_handles(&*self.binding);
        }
        drop(old_scratch);

        // Phase 4: install the fresh scratch.
        {
            let rec = s.require_node_mut(node_id);
            rec.op_scratch = new_scratch;
        }

        // Phase 5: release wave-state handles collected in phase 1
        // (terminal / dep-record handles first, then pause-buffer and
        // replay-buffer handles).
        for h in handles_to_release {
            self.binding.release_handle(h);
        }
        for h in pause_buffer_payloads {
            self.binding.release_handle(h);
        }
    }
3441
3442    /// Activate `root` and any transitive uncached compute deps so their
3443    /// caches fill our dep_handles slots.
3444    ///
3445    /// Slice A close (M1): pure dep-walk + dep_handles population +
3446    /// pending_fires queueing. No `in_tick` management or `drain_and_flush`
3447    /// call — the outer caller (typically `Core::subscribe` via
3448    /// [`Core::run_wave`]) owns the wave lifecycle and drains lock-released
3449    /// around `invoke_fn`.
3450    ///
3451    /// Walk shape:
3452    ///   1. **Discover phase (DFS via Vec stack):** starting at `root`,
3453    ///      walk transitively-needing-activation deps via the `deps`
3454    ///      chain. Build an ordering where each node appears AFTER all
3455    ///      of its uncached compute deps — i.e., reverse topological
3456    ///      among the visited subgraph.
3457    ///   2. **Deliver phase (forward iteration):** for each visited
3458    ///      node in dep-first order, push deps' caches into the node's
3459    ///      `dep_handles` slots. Caches that were sentinel pre-walk are
3460    ///      filled because their parent's fn fires later in the wave's
3461    ///      drain loop and `commit_emission` propagates new caches forward
3462    ///      via `deliver_data_to_consumer` — the same path this method
3463    ///      uses for the initial seed. Adds the node to `pending_fires`
3464    ///      if its tracked-deps gate is satisfied; the wave-engine drain
3465    ///      fires the fn lock-released around `invoke_fn`.
3466    pub(crate) fn activate_derived(&self, s: &mut CoreState, root: NodeId) {
3467        // Phase 1: discover. DFS to collect every compute node reachable
3468        // via deps that doesn't yet have a cache and hasn't fired.
3469        // Record them in dep-first (post-order) so phase 2 can deliver
3470        // caches forward. Frame is `(node_id, finalize)` — `finalize=false`
3471        // means "first visit: push deps then re-push self with finalize=true";
3472        // `finalize=true` means "deps have been expanded, append self to
3473        // `order`."
3474        let mut visited: HashSet<NodeId> = HashSet::new();
3475        let mut order: Vec<NodeId> = Vec::new();
3476        let mut stack: Vec<(NodeId, bool)> = vec![(root, false)];
3477        while let Some((id, finalize)) = stack.pop() {
3478            if finalize {
3479                order.push(id);
3480                continue;
3481            }
3482            if !visited.insert(id) {
3483                continue;
3484            }
3485            stack.push((id, true));
3486            let dep_ids: Vec<NodeId> = s.require_node(id).dep_ids_vec();
3487            for dep_id in dep_ids {
3488                let (dep_is_state, dep_cache, dep_has_fired) = {
3489                    let dep_rec = s.require_node(dep_id);
3490                    (dep_rec.is_state(), dep_rec.cache, dep_rec.has_fired_once)
3491                };
3492                if !dep_is_state
3493                    && dep_cache == NO_HANDLE
3494                    && !dep_has_fired
3495                    && !visited.contains(&dep_id)
3496                {
3497                    stack.push((dep_id, false));
3498                }
3499            }
3500        }
3501
3502        // Phase 2: deliver caches in dep-first order. For each node, walk
3503        // its deps and call `deliver_data_to_consumer` for any with caches.
3504        // Producer nodes (no deps + has fn — Slice D, D031) have no deps
3505        // to walk; queue them directly into `pending_fires` so the wave
3506        // engine fires their fn once on activation.
3507        for &id in &order {
3508            let (dep_ids, is_producer) = {
3509                let rec = s.require_node(id);
3510                (rec.dep_ids_vec(), rec.is_producer())
3511            };
3512            if is_producer {
3513                s.pending_fires.insert(id);
3514                continue;
3515            }
3516            for (i, dep_id) in dep_ids.iter().copied().enumerate() {
3517                let dep_cache = s.require_node(dep_id).cache;
3518                if dep_cache != NO_HANDLE {
3519                    self.deliver_data_to_consumer(s, id, i, dep_cache);
3520                }
3521            }
3522        }
3523    }
3524
3525    // -------------------------------------------------------------------
3526    // Emission entry point
3527    // -------------------------------------------------------------------
3528
3529    /// Set a state node's value. Triggers a wave (DIRTY → DATA/RESOLVED →
3530    /// fn fires for downstream).
3531    ///
3532    /// Silent no-op if the node has already terminated (R1.3.4). The handle
3533    /// passed in is still released by the caller's binding-side intern path
3534    /// — no implicit retain is consumed when the call short-circuits.
3535    ///
3536    /// # Panics
3537    ///
3538    /// Panics if `node_id` is not a state node, or if `new_handle` is
3539    /// [`NO_HANDLE`] (per R1.2.4, sentinel is not a valid DATA payload).
3540    pub fn emit(&self, node_id: NodeId, new_handle: HandleId) {
3541        assert!(
3542            new_handle != NO_HANDLE,
3543            "NO_HANDLE is not a valid DATA payload (R1.2.4)"
3544        );
3545        // Validate + terminal short-circuit under a brief lock.
3546        //
3547        // emit() is valid for State and Producer nodes — both are
3548        // intrinsic sources whose values are not derived from declared
3549        // deps. State nodes get emit() from user code; Producer nodes
3550        // get emit() from sink callbacks the producer's build closure
3551        // registered (sink fires → re-enter Core → emit on self).
3552        // Derived / Dynamic / Operator nodes emit via their fn return
3553        // value through fire_fn / fire_operator, NOT via emit().
3554        {
3555            let s = self.lock_state();
3556            let rec = s.require_node(node_id);
3557            assert!(
3558                rec.is_state() || rec.is_producer(),
3559                "emit() is for state or producer nodes only; \
3560                 derived/dynamic/operator emit via their fn return value"
3561            );
3562            if rec.terminal.is_some() {
3563                drop(s);
3564                // Caller's intern share would otherwise leak; cache slot
3565                // ownership doesn't transfer because we're not advancing
3566                // cache. Released lock-released so the binding can't
3567                // deadlock against an internal binding mutex.
3568                self.binding.release_handle(new_handle);
3569                return;
3570            }
3571        }
3572        // Run wave on `node_id`'s touched partitions. Slice Y1 / Phase E:
3573        // emit cascades only via `s.children`, all unioned with `node_id`'s
3574        // partition by construction (dep edges = union edges). Common case
3575        // is a single-partition acquire — disjoint-partition emits run
3576        // truly parallel under per-partition `wave_owner`.
3577        self.run_wave_for(node_id, |this| {
3578            this.commit_emission(node_id, new_handle);
3579        });
3580    }
3581
3582    /// Read a node's current cache. Returns [`NO_HANDLE`] if sentinel.
3583    #[must_use]
3584    pub fn cache_of(&self, node_id: NodeId) -> HandleId {
3585        self.lock_state().require_node(node_id).cache
3586    }
3587
3588    /// Whether the node's fn has fired at least once (compute) OR it has had
3589    /// a non-sentinel value (state).
3590    #[must_use]
3591    pub fn has_fired_once(&self, node_id: NodeId) -> bool {
3592        self.lock_state().require_node(node_id).has_fired_once
3593    }
3594
3595    // -------------------------------------------------------------------
3596    // Read-side inspection helpers (Slice E+, M2)
3597    //
3598    // Non-panicking accessors for graph-layer introspection (`describe()`,
3599    // `observe()`, `node_count()`). All five return Option/empty for
3600    // unknown ids — they're meant to back walks over `node_ids()` where
3601    // the caller already knows the ids are valid, plus debugging /
3602    // dry-run probes that prefer "absence" over "panic".
3603    //
3604    // Keep these strictly read-only: no wave entry, no binding callbacks,
3605    // no lock release. Each takes the state lock once, copies a small
3606    // value, and drops the lock.
3607    // -------------------------------------------------------------------
3608
3609    /// Snapshot of every registered `NodeId` in unspecified order. The
3610    /// order matches `HashMap` iteration over the internal node table —
3611    /// callers that need stable ordering should track names at the
3612    /// `Graph` layer (canonical spec §3.5 namespace).
3613    #[must_use]
3614    pub fn node_ids(&self) -> Vec<NodeId> {
3615        self.lock_state().nodes.keys().copied().collect()
3616    }
3617
3618    /// Total number of nodes registered in this Core.
3619    #[must_use]
3620    pub fn node_count(&self) -> usize {
3621        self.lock_state().nodes.len()
3622    }
3623
3624    /// Returns `Some(kind)` for known nodes, `None` for unknown. Kind is
3625    /// **derived** from the field shape per D030 — see [`NodeKind`].
3626    #[must_use]
3627    pub fn kind_of(&self, node_id: NodeId) -> Option<NodeKind> {
3628        self.lock_state().nodes.get(&node_id).map(NodeRecord::kind)
3629    }
3630
3631    /// Snapshot of the node's deps in declaration order. Empty for
3632    /// unknown nodes or for state nodes (which have no deps).
3633    #[must_use]
3634    pub fn deps_of(&self, node_id: NodeId) -> Vec<NodeId> {
3635        self.lock_state()
3636            .nodes
3637            .get(&node_id)
3638            .map(NodeRecord::dep_ids_vec)
3639            .unwrap_or_default()
3640    }
3641
3642    /// Returns `Some(kind)` if the node has terminated (R1.3.4) — the
3643    /// pair `Some(Complete)` / `Some(Error(h))` mirrors the wire message
3644    /// the node emitted. `None` for live nodes or unknown ids.
3645    #[must_use]
3646    pub fn is_terminal(&self, node_id: NodeId) -> Option<TerminalKind> {
3647        self.lock_state()
3648            .nodes
3649            .get(&node_id)
3650            .and_then(|r| r.terminal)
3651    }
3652
3653    /// Whether the node has wave-scoped DIRTY pending (a tier-1 message
3654    /// queued but the matching tier-3 settle has not yet flushed).
3655    /// `false` for unknown ids. Mostly useful for `describe()` status
3656    /// classification (R3.6.1 `"dirty"`).
3657    #[must_use]
3658    pub fn is_dirty(&self, node_id: NodeId) -> bool {
3659        self.lock_state()
3660            .nodes
3661            .get(&node_id)
3662            .is_some_and(|r| r.dirty)
3663    }
3664
3665    /// Snapshot of `parent`'s meta companion list (R1.3.9.d / R2.3.3 —
3666    /// the companions added via [`Self::add_meta_companion`]). Empty
3667    /// for unknown ids or for nodes with no companions registered.
3668    ///
3669    /// Used by the graph layer's `signal_invalidate` to filter meta
3670    /// children out of the broadcast (canonical R3.7.2 — meta caches
3671    /// are preserved across graph-wide INVALIDATE).
3672    #[must_use]
3673    pub fn meta_companions_of(&self, parent: NodeId) -> Vec<NodeId> {
3674        self.lock_state()
3675            .nodes
3676            .get(&parent)
3677            .map(|r| r.meta_companions.clone())
3678            .unwrap_or_default()
3679    }
3680
3681    // -------------------------------------------------------------------
3682    // Wave engine — lives in `crate::batch` (Slice C-1 module split;
3683    // Slice A close M1 refactor lifted the binding-callback re-entrance
3684    // restrictions). The methods are still on `Core`; see `batch.rs` for:
3685    //
3686    //   - `run_wave` — wave entry, manages own locking.
3687    //   - `drain_and_flush` — drain phase, lock-released around invoke_fn.
3688    //   - `commit_emission` — lock-released around custom_equals.
3689    //   - `pick_next_fire`, `deliver_data_to_consumer`, `queue_notify`,
3690    //     `flush_notifications` — wave-engine helpers.
3691    // -------------------------------------------------------------------
3692}
3693
3694// -----------------------------------------------------------------------
3695// COMPLETE / ERROR — terminal lifecycle + auto-cascade gating
3696// -----------------------------------------------------------------------
3697
3698impl Core {
3699    /// Emit `[COMPLETE]` (R1.3.4) on `node_id`, marking it terminal. After
3700    /// this call:
3701    ///
3702    /// - Subsequent `Core::emit` on this node is a silent no-op (idempotent
3703    ///   termination).
3704    /// - The node's fn no longer fires.
3705    /// - The node's cache is preserved (last value still observable via
3706    ///   `cache_of`).
3707    /// - Children receive `[COMPLETE]` (tier 5 — bypasses pause buffer).
3708    /// - Auto-cascade gating (Lock 2.B): each child that has all of its
3709    ///   deps in a terminal state auto-emits its own `[COMPLETE]`. ERROR
3710    ///   dominates COMPLETE — if any of a child's deps emitted ERROR, the
3711    ///   child auto-cascades that ERROR instead.
3712    ///
3713    /// Idempotent: calling `complete` on an already-terminal node is a no-op.
3714    ///
3715    /// # Panics
3716    ///
3717    /// Panics if `node_id` is unknown.
3718    pub fn complete(&self, node_id: NodeId) {
3719        self.emit_terminal(node_id, TerminalKind::Complete);
3720    }
3721
3722    /// Emit `[ERROR, error_handle]` (R1.3.4) on `node_id`. `error_handle`
3723    /// must resolve to a non-sentinel value (R1.2.5) — the binding side has
3724    /// already interned the error value before this call. Same lifecycle
3725    /// effects as [`Self::complete`]; ERROR dominates COMPLETE in auto-
3726    /// cascade gating.
3727    ///
3728    /// # Panics
3729    ///
3730    /// Panics if `node_id` is unknown or `error_handle == NO_HANDLE`.
3731    pub fn error(&self, node_id: NodeId, error_handle: HandleId) {
3732        assert!(
3733            error_handle != NO_HANDLE,
3734            "NO_HANDLE is not a valid ERROR payload (R1.2.5)"
3735        );
3736        self.emit_terminal(node_id, TerminalKind::Error(error_handle));
3737        // The caller's intern share for `error_handle` is NOT transferred
3738        // to any slot — `terminate_node` takes its OWN retain for every
3739        // populated `terminal` and `dep_terminals` slot. Release the
3740        // caller's share here (mirrors `Core::emit`'s short-circuit
3741        // release on terminal). Without this, every `error()` call leaks
3742        // one binding-side handle ref. Slice A-bigger /qa item D fix.
3743        self.binding.release_handle(error_handle);
3744    }
3745
3746    fn emit_terminal(&self, node_id: NodeId, terminal: TerminalKind) {
3747        {
3748            let s = self.lock_state();
3749            assert!(s.nodes.contains_key(&node_id), "unknown node {node_id:?}");
3750        }
3751        // Wave on `node_id`'s touched partitions (Slice Y1 / Phase E).
3752        // COMPLETE / ERROR cascade follows `s.children` (in-partition by
3753        // union-find construction). The thunk acquires its own state lock
3754        // to queue the cascade.
3755        self.run_wave_for(node_id, |this| {
3756            let mut s = this.lock_state();
3757            this.terminate_node(&mut s, node_id, terminal);
3758        });
3759    }
3760
3761    /// Set the node's terminal slot, queue the wire message, and cascade to
3762    /// children. Idempotent on already-terminal node (no-op).
3763    ///
3764    /// Iterative implementation (Slice A-bigger, M1-close): a work-queue
3765    /// drives the cascade so deep linear chains don't overflow the OS
3766    /// thread stack. Mirrors `path_from_to`'s explicit-stack pattern.
3767    fn terminate_node(&self, s: &mut CoreState, node_id: NodeId, terminal: TerminalKind) {
3768        let mut work: Vec<(NodeId, TerminalKind)> = vec![(node_id, terminal)];
3769        while let Some((id, t)) = work.pop() {
3770            if s.require_node(id).terminal.is_some() {
3771                continue; // Idempotent — already terminal.
3772            }
3773            // Take a refcount share for the terminal slot so the error
3774            // handle outlives the binding-side intern's transient share.
3775            if let TerminalKind::Error(h) = t {
3776                self.binding.retain_handle(h);
3777            }
3778            // Slice E2 /qa Q2(b) (D069): if a resubscribable node is
3779            // terminating with no live subscribers, queue eager
3780            // `wipe_ctx` for the wave's lock-released drain. This is the
3781            // mutually-exclusive complement of the `Subscription::Drop`
3782            // wipe site: when the LAST sub drops first then terminate
3783            // fires, subs are empty here and we queue; when terminate
3784            // fires WITH subs still alive, we DON'T queue (subs not
3785            // empty), and `Subscription::Drop` will fire wipe directly
3786            // when those subs eventually drop. Either way, exactly one
3787            // wipe fires per terminal lifecycle.
3788            let queue_wipe = {
3789                let rec = s.require_node(id);
3790                rec.resubscribable && rec.subscribers.is_empty()
3791            };
3792            s.require_node_mut(id).terminal = Some(t);
3793            if queue_wipe {
3794                s.pending_wipes.push(id);
3795            }
3796            // Drain pending fires for this node — fn won't fire on a
3797            // terminal node.
3798            s.pending_fires.remove(&id);
3799            // R1.3.8.b / Slice F (A3, 2026-05-07): if this node was paused
3800            // when terminating (the canonical case is the R1.3.8.c overflow
3801            // ERROR synthesis path), drain the pause buffer and release
3802            // each payload's queue_notify-time retain. Without this, the
3803            // buffer leaks one share per buffered DATA/RESOLVED/INVALIDATE.
3804            // Subscribers receive the terminal directly via the cascade
3805            // below (tier-5 bypasses the pause buffer); the buffered
3806            // content is moot post-terminal.
3807            let drained: Vec<HandleId> = {
3808                let rec = s.require_node_mut(id);
3809                let mut drained: Vec<HandleId> = Vec::new();
3810                if rec.pause_state.is_paused() {
3811                    // Take the buffered messages out, then collapse the
3812                    // pause state to Active so subsequent code observes a
3813                    // clean lifecycle. Idempotent on Active (no-op).
3814                    let prev = std::mem::replace(&mut rec.pause_state, PauseState::Active);
3815                    if let PauseState::Paused { buffer, .. } = prev {
3816                        drained.extend(buffer.into_iter().filter_map(Message::payload_handle));
3817                    }
3818                }
3819                // QA A4 (2026-05-07): drain replay buffer on terminate. A
3820                // non-resubscribable terminal ends the lifecycle; without
3821                // this drain the buffer's retains leak until `Drop for
3822                // CoreState`. Resubscribable nodes' replay buffers are
3823                // also drained (when they're hit by a terminal cascade);
3824                // a fresh subscribe rebuilds the buffer from scratch as
3825                // part of `reset_for_fresh_lifecycle`.
3826                drained.extend(rec.replay_buffer.drain(..));
3827                drained
3828            };
3829            for h in drained {
3830                self.binding.release_handle(h);
3831            }
3832            // Queue the wire message (tier 5 — bypasses pause buffer).
3833            let msg = match t {
3834                TerminalKind::Complete => Message::Complete,
3835                TerminalKind::Error(h) => Message::Error(h),
3836            };
3837            self.queue_notify(s, id, msg);
3838            // Cascade to children.
3839            let child_ids: Vec<NodeId> = s
3840                .children
3841                .get(&id)
3842                .map(|c| c.iter().copied().collect())
3843                .unwrap_or_default();
3844            for child_id in child_ids {
3845                let dep_idx = s.require_node(child_id).dep_index_of(id);
3846                let Some(idx) = dep_idx else { continue };
3847                // Mark this child's per-dep terminal slot. Take a retain on
3848                // the error handle for the slot share.
3849                {
3850                    let child = s.require_node_mut(child_id);
3851                    if child.dep_records[idx].terminal.is_some() {
3852                        // Idempotent — child already saw this dep terminate.
3853                        continue;
3854                    }
3855                    child.dep_records[idx].terminal = Some(t);
3856                }
3857                if let TerminalKind::Error(h) = t {
3858                    self.binding.retain_handle(h);
3859                }
3860                // Auto-cascade gating: if all deps now terminal, push child
3861                // onto the work queue with the chosen terminal.
3862                //
3863                // Slice C-1: kinds that opt out of Lock 2.B (currently
3864                // `Operator(Reduce)`) intercept upstream COMPLETE so they
3865                // can emit their accumulator before terminating. Instead of
3866                // cascading, queue the child for fn-fire — `fire_operator`
3867                // sees `dep_records[0].terminal` set and emits the
3868                // appropriate batch (Data(acc) + Complete on COMPLETE,
3869                // Error(h) on ERROR).
3870                let action = {
3871                    let child = s.require_node(child_id);
3872                    if child.terminal.is_some() {
3873                        ChildAction::None // Already terminated — no-op.
3874                    } else if child.all_deps_terminal() {
3875                        if child.skips_auto_cascade() {
3876                            ChildAction::QueueFire
3877                        } else {
3878                            ChildAction::Cascade(pick_cascade_terminal(&child.dep_records))
3879                        }
3880                    } else {
3881                        ChildAction::None
3882                    }
3883                };
3884                match action {
3885                    ChildAction::None => {}
3886                    ChildAction::Cascade(t_child) => {
3887                        work.push((child_id, t_child));
3888                    }
3889                    ChildAction::QueueFire => {
3890                        s.pending_fires.insert(child_id);
3891                    }
3892                }
3893            }
3894        }
3895    }
3896}
3897
3898/// Outcome of Lock 2.B child gating in `terminate_node`'s cascade walk.
3899enum ChildAction {
3900    /// No cascade; child is already terminal or not yet all-deps-terminal.
3901    None,
3902    /// Auto-cascade with the picked terminal kind (ERROR dominates COMPLETE).
3903    Cascade(TerminalKind),
3904    /// Queue child for fn-fire instead of cascading. Used by operator
3905    /// kinds that intercept upstream terminal (Operator(Reduce)).
3906    QueueFire,
3907}
3908
3909/// Lock 2.B cascade-terminal selection: ERROR dominates COMPLETE; first
3910/// ERROR seen wins. Caller has already verified all deps are terminal.
3911fn pick_cascade_terminal(dep_records: &[DepRecord]) -> TerminalKind {
3912    for dr in dep_records {
3913        if let Some(TerminalKind::Error(h)) = dr.terminal {
3914            return TerminalKind::Error(h);
3915        }
3916    }
3917    TerminalKind::Complete
3918}
3919
3920// -----------------------------------------------------------------------
3921// TEARDOWN — destruction, with auto-COMPLETE prepend (R2.6.4 / Lock 6.F)
3922// -----------------------------------------------------------------------
3923
3924impl Core {
3925    /// Tear `node_id` down. Per R2.6.4 / Lock 6.F:
3926    ///
3927    /// - **Auto-prepend COMPLETE.** If the node has not yet emitted a
3928    ///   terminal (`COMPLETE` / `ERROR`), `terminate_node` is called with
3929    ///   `Complete` first so subscribers see `[COMPLETE, TEARDOWN]`, not
3930    ///   bare `[TEARDOWN]`. This guarantees a clean end-of-stream signal
3931    ///   to async iterators and other consumers that wait on terminal
3932    ///   delivery.
3933    /// - **Idempotent on duplicate delivery.** The per-node
3934    ///   `has_received_teardown` flag is set on the first call; subsequent
3935    ///   `teardown` calls (or cascade visits from other paths) are silent
3936    ///   no-ops — no second `[COMPLETE, TEARDOWN]` pair to subscribers.
3937    /// - **Cascade downstream.** Each child is recursively torn down. The
3938    ///   child's own COMPLETE auto-cascades from `terminate_node`'s logic
3939    ///   (Lock 2.B); its TEARDOWN comes from this cascade.
3940    ///
3941    /// # Panics
3942    ///
3943    /// Panics if `node_id` is unknown.
3944    pub fn teardown(&self, node_id: NodeId) {
3945        {
3946            let s = self.lock_state();
3947            assert!(s.nodes.contains_key(&node_id), "unknown node {node_id:?}");
3948        }
3949        let torn_down: Arc<Mutex<Vec<NodeId>>> = Arc::new(Mutex::new(Vec::new()));
3950        let torn_down_for_wave = torn_down.clone();
3951        // TEARDOWN cascade follows `s.children` AND `meta_companions`
3952        // (R1.3.9.d) — meta-companions can cross partitions. Slice Y1 /
3953        // Phase E `compute_touched_partitions(node_id)` (called by
3954        // `run_wave_for`) walks both axes so the wave acquires every
3955        // partition reachable via the cascade.
3956        self.run_wave_for(node_id, move |this| {
3957            let mut s = this.lock_state();
3958            let collected = this.teardown_inner(&mut s, node_id);
3959            torn_down_for_wave.lock().extend(collected);
3960        });
3961        // Fire NodeTornDown for every cascaded id (root + metas +
3962        // downstream consumers that auto-cascaded). Outside the state
3963        // lock, matching fire_topology_event discipline.
3964        let ids = std::mem::take(&mut *torn_down.lock());
3965        for id in ids {
3966            self.fire_topology_event(&crate::topology::TopologyEvent::NodeTornDown(id));
3967        }
3968    }
3969
3970    /// Iterative teardown walk (Slice A-bigger, M1-close).
3971    ///
3972    /// The recursive shape was:
3973    ///   ```text
3974    ///   teardown(n):
3975    ///     if torn_down: return
3976    ///     mark torn_down
3977    ///     for meta in metas: teardown(meta)
3978    ///     terminate_node + queue Teardown
3979    ///     for child in children: teardown(child)
3980    ///   ```
3981    /// Deep linear chains (~10k nodes) overflowed the OS thread stack.
3982    ///
3983    /// The iterative shape uses a `Vec<Action>` stack with `Visit` and
3984    /// `EmitTeardown` actions. `Visit(n)` marks `n` torn-down (or no-ops
3985    /// if already), then pushes (in reverse order so LIFO pops in forward
3986    /// order) `Visit(child_K), …, Visit(child_1), EmitTeardown(n),
3987    /// Visit(meta_M), …, Visit(meta_1)`. The R1.3.9.d "metas first, then
3988    /// self, then children" ordering is preserved by the push order:
3989    /// metas pop first, recursively expand and emit; then `EmitTeardown(n)`
3990    /// pops and runs `terminate_node` + queue `Teardown`; then children
3991    /// pop. Idempotency via `has_received_teardown` keeps each node
3992    /// visited at most once even when multi-parent diamonds re-enter via
3993    /// a sibling path.
3994    fn teardown_inner(&self, s: &mut CoreState, root: NodeId) -> Vec<NodeId> {
3995        enum Action {
3996            Visit(NodeId),
3997            EmitTeardown(NodeId),
3998        }
3999        let mut stack: Vec<Action> = vec![Action::Visit(root)];
4000        // Topology accumulator: every node that actually emits TEARDOWN
4001        // (i.e. each `EmitTeardown(id)` site, NOT each `Visit` — visits
4002        // for already-torn-down nodes short-circuit on idempotency).
4003        let mut torn_down: Vec<NodeId> = Vec::new();
4004        while let Some(action) = stack.pop() {
4005            match action {
4006                Action::Visit(id) => {
4007                    if s.require_node(id).has_received_teardown {
4008                        continue; // Idempotent (R2.6.4).
4009                    }
4010                    s.require_node_mut(id).has_received_teardown = true;
4011                    // Push order: children first (pop LAST), then
4012                    // EmitTeardown(id), then metas (pop FIRST). Reverse
4013                    // each list so within-group order matches the original
4014                    // recursive iteration.
4015                    let children: Vec<NodeId> = s
4016                        .children
4017                        .get(&id)
4018                        .map(|c| c.iter().copied().collect())
4019                        .unwrap_or_default();
4020                    for &child in children.iter().rev() {
4021                        stack.push(Action::Visit(child));
4022                    }
4023                    stack.push(Action::EmitTeardown(id));
4024                    let metas: Vec<NodeId> = s.require_node(id).meta_companions.clone();
4025                    for &meta in metas.iter().rev() {
4026                        stack.push(Action::Visit(meta));
4027                    }
4028                }
4029                Action::EmitTeardown(id) => {
4030                    // Auto-prepend COMPLETE if not yet terminal. The (now
4031                    // iterative) terminate_node handles auto-cascade to
4032                    // children's own terminal slots per Lock 2.B.
4033                    let already_terminal = s.require_node(id).terminal.is_some();
4034                    if !already_terminal {
4035                        self.terminate_node(s, id, TerminalKind::Complete);
4036                    }
4037                    // Wire emission of the TEARDOWN itself (tier 6).
4038                    self.queue_notify(s, id, Message::Teardown);
4039                    torn_down.push(id);
4040                }
4041            }
4042        }
4043        torn_down
4044    }
4045
4046    /// Attach `companion` as a meta companion of `parent` per R1.3.9.d.
4047    /// Meta companions are nodes whose lifecycle is bound to the parent's
4048    /// in TEARDOWN ordering: when `parent` tears down, `companion` tears
4049    /// down first.
4050    ///
4051    /// Use this for inspection / audit / sidecar nodes that subscribe to
4052    /// parent state — without the ordering, the companion could observe
4053    /// the parent mid-destruction and emit garbage.
4054    ///
4055    /// Idempotent on duplicate registration of the same companion.
4056    ///
4057    /// # Lifecycle constraint
4058    ///
4059    /// Intended for **setup-time** wiring — call this before `parent` or
4060    /// `companion` enters a wave. Mid-wave registration (especially during
4061    /// a teardown cascade in flight) is implementation-defined: the new
4062    /// edge takes effect on the *next* wave. Adding a companion to a
4063    /// torn-down parent silently no-ops (the parent will not tear down
4064    /// again). For dynamic companion attachment with deterministic
4065    /// ordering, prefer constructing the wiring before subscribers exist.
4066    ///
4067    /// # Panics
4068    ///
4069    /// Panics if either node id is unknown, or if `parent == companion`
4070    /// (a node cannot be its own meta companion — would loop on TEARDOWN).
4071    pub fn add_meta_companion(&self, parent: NodeId, companion: NodeId) {
4072        assert!(parent != companion, "node cannot be its own meta companion");
4073        let mut s = self.lock_state();
4074        assert!(s.nodes.contains_key(&parent), "unknown parent {parent:?}");
4075        assert!(
4076            s.nodes.contains_key(&companion),
4077            "unknown companion {companion:?}"
4078        );
4079        let metas = &mut s.require_node_mut(parent).meta_companions;
4080        if !metas.contains(&companion) {
4081            metas.push(companion);
4082        }
4083    }
4084}
4085
4086// -----------------------------------------------------------------------
4087// INVALIDATE — cache clear + downstream cascade
4088// -----------------------------------------------------------------------
4089
4090impl Core {
4091    /// Clear `node_id`'s cache and cascade `[INVALIDATE]` to downstream
4092    /// dependents per canonical spec §1.4.
4093    ///
4094    /// Semantics:
4095    /// - **Never-populated case (R1.4 line 197):** if `cache == NO_HANDLE`,
4096    ///   the call is a no-op — no cache to clear, no INVALIDATE emitted.
4097    ///   This naturally provides idempotency within a wave: once a node has
4098    ///   been invalidated this wave (cache = NO_HANDLE), a second invalidate
4099    ///   on the same node does nothing.
4100    /// - **Cache clear (immediate):** the node's cached handle is dropped
4101    ///   (refcount released), `cache` becomes `NO_HANDLE`. State nodes
4102    ///   keep `has_fired_once` per spec — INVALIDATE is not a re-gating
4103    ///   event (the next emission to a previously-fired state still does
4104    ///   not re-trigger the first-run gate; that's a resubscribable-terminal
4105    ///   lifecycle concern, separate slice).
4106    /// - **Wire emission (tier 4):** `[INVALIDATE]` is queued via the
4107    ///   normal pause-aware notify path. Buffers while paused, flushes
4108    ///   immediately otherwise.
4109    /// - **Downstream cascade:** for each child of this node, the child's
4110    ///   `dep_handles[idx_of_node]` is reset to `NO_HANDLE` (its previous
4111    ///   value referenced a now-released handle). The child is then
4112    ///   recursively invalidated (no-op if its cache was already
4113    ///   `NO_HANDLE`). This re-closes the child's first-run gate — fn
4114    ///   won't fire again until the upstream re-emits a value.
4115    ///
4116    /// Wraps in a fresh wave when called from outside a wave, so
4117    /// notifications flush at the natural wave boundary.
4118    ///
4119    /// # Panics
4120    ///
4121    /// Panics if `node_id` is unknown, consistent with `emit` / `pause`.
4122    pub fn invalidate(&self, node_id: NodeId) {
4123        {
4124            let s = self.lock_state();
4125            assert!(s.nodes.contains_key(&node_id), "unknown node {node_id:?}");
4126        }
4127        // INVALIDATE cascade follows `s.children` (in-partition by union-
4128        // find construction). Slice Y1 / Phase E.
4129        self.run_wave_for(node_id, |this| {
4130            let mut s = this.lock_state();
4131            this.invalidate_inner(&mut s, node_id);
4132        });
4133    }
4134
4135    /// Iterative invalidate cascade (Slice A-bigger, M1-close).
4136    ///
4137    /// The recursive shape was a depth-first cache-clear walk:
4138    ///   ```text
4139    ///   invalidate(n):
4140    ///     if cache(n) == NO_HANDLE: return  // already-invalidated guard
4141    ///     cache(n) = NO_HANDLE; release handle
4142    ///     queue Invalidate(n)
4143    ///     for child in children:
4144    ///       child.dep_handles[idx] = NO_HANDLE
4145    ///       invalidate(child)
4146    ///   ```
4147    /// Deep linear chains overflowed the OS thread stack. The work-queue
4148    /// rewrite has no ordering subtleties (unlike teardown's R1.3.9.d
4149    /// metas-first constraint) — Invalidate is a tier-4 broadcast where
4150    /// the never-populated / already-invalidated guard provides natural
4151    /// idempotency for diamond fan-in.
4152    fn invalidate_inner(&self, s: &mut CoreState, root: NodeId) {
4153        let mut work: Vec<NodeId> = vec![root];
4154        while let Some(node_id) = work.pop() {
4155            // Never-populated / already-invalidated: no-op (R1.4 idempotency).
4156            // Per R1.3.9.c never-populated case, OnInvalidate cleanup hook
4157            // also does NOT fire — natural fallout of skipping via the
4158            // cache==NO_HANDLE guard (we never reach the queue-push below).
4159            let old_handle = s.require_node(node_id).cache;
4160            if old_handle == NO_HANDLE {
4161                continue;
4162            }
4163            // Clear cache + release the handle's slot ownership.
4164            s.require_node_mut(node_id).cache = NO_HANDLE;
4165            self.binding.release_handle(old_handle);
4166            // Slice E2 (R1.3.9.b strict per D057 + D058 fire-at-cache-clear):
4167            // queue OnInvalidate cleanup hook for lock-released drain at
4168            // wave-end. The dedup set guarantees at-most-once-per-wave-per-
4169            // node firing even if a node re-populates mid-wave (via fn-fire
4170            // emit) and gets re-invalidated through a separate path. Pure
4171            // cache==NO_HANDLE idempotency (above) catches "still at
4172            // sentinel" only; the explicit set is the strict R1.3.9.b
4173            // reading.
4174            if s.invalidate_hooks_fired_this_wave.insert(node_id) {
4175                s.deferred_cleanup_hooks
4176                    .push((node_id, CleanupTrigger::OnInvalidate));
4177            }
4178            // Wire emission. Pause-aware via queue_notify.
4179            self.queue_notify(s, node_id, Message::Invalidate);
4180            // Cascade: for each child, clear the dep record's prev_data
4181            // referencing this node and push child onto the work queue.
4182            let child_ids: Vec<NodeId> = s
4183                .children
4184                .get(&node_id)
4185                .map(|c| c.iter().copied().collect())
4186                .unwrap_or_default();
4187            for child_id in child_ids {
4188                let dep_idx = s.require_node(child_id).dep_index_of(node_id);
4189                if let Some(idx) = dep_idx {
4190                    // Reset the child's dep record — the handle was just
4191                    // released. Subsequent first-run-gate checks see
4192                    // sentinel and re-close.
4193                    //
4194                    // Snapshot prev_data + data_batch retains for deferred
4195                    // release, then clear the record. Two-phase to satisfy
4196                    // the borrow checker (nodes + deferred_handle_releases
4197                    // are separate CoreState fields).
4198                    let (old_prev, batch_hs): (HandleId, SmallVec<[HandleId; 1]>) = {
4199                        let dr = &s.require_node(child_id).dep_records[idx];
4200                        (dr.prev_data, dr.data_batch.clone())
4201                    };
4202                    {
4203                        // Q2 (2026-05-09): deferred_handle_releases moved
4204                        // to CrossPartitionState. Lock-discipline: state
4205                        // is held, so acquiring cross_partition next
4206                        // satisfies `state → cross_partition`.
4207                        let mut cps = self.cross_partition.lock();
4208                        if old_prev != NO_HANDLE {
4209                            cps.deferred_handle_releases.push(old_prev);
4210                        }
4211                        for h in batch_hs {
4212                            cps.deferred_handle_releases.push(h);
4213                        }
4214                    }
4215                    let dr = &mut s.require_node_mut(child_id).dep_records[idx];
4216                    dr.prev_data = NO_HANDLE;
4217                    dr.data_batch.clear();
4218                    work.push(child_id);
4219                }
4220            }
4221        }
4222    }
4223}
4224
4225// -----------------------------------------------------------------------
4226// PAUSE / RESUME — multi-pauser lockset + replay buffer
4227// -----------------------------------------------------------------------
4228
/// Summary returned by [`Core::resume`] once the last pause lock is
/// released.
///
/// - `replayed`: how many tier-3/tier-4 messages were dispatched to
///   subscribers while draining the pause buffer.
/// - `dropped`: how many messages fell off the front of the buffer during
///   this pause cycle because of the Core-global `pause_buffer_cap`. A
///   non-zero value means a controller held its lock long enough to
///   overflow the cap; the binding may want to surface a warning or error.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ResumeReport {
    pub replayed: u32,
    pub dropped: u32,
}
4242
4243impl Core {
4244    /// Acquire a pause lock on `node_id`. The first lock transitions the
4245    /// node from `Active` to `Paused`; further locks add to the lockset.
4246    /// While paused, tier-3 (DATA/RESOLVED) and tier-4 (INVALIDATE) outgoing
4247    /// messages buffer in the node's pause buffer; other tiers flush
4248    /// immediately.
4249    ///
4250    /// Re-acquiring the same `lock_id` is an idempotent no-op (matches TS
4251    /// convention, R1.2.6 silent on the case).
4252    pub fn pause(&self, node_id: NodeId, lock_id: LockId) -> Result<(), PauseError> {
4253        let mut s = self.lock_state();
4254        let rec = s
4255            .nodes
4256            .get_mut(&node_id)
4257            .ok_or(PauseError::UnknownNode(node_id))?;
4258        // QA A5 (2026-05-07): terminated nodes can't be re-paused. Without
4259        // this check, a stale pause-controller calling pause() on an
4260        // already-terminated node would re-arm `pause_state` to Paused.
4261        // The terminate_node path collapses pause_state → Active and
4262        // drains the buffer (A3-related), but doesn't gate subsequent
4263        // pause() calls. Treat as idempotent no-op (consistent with how
4264        // emit/complete/error early-return on terminal).
4265        if rec.terminal.is_some() {
4266            return Ok(());
4267        }
4268        // Slice F audit close (2026-05-07): `PausableMode::Off` means the
4269        // dispatcher ignores PAUSE for this node — tier-3 flushes
4270        // immediately, fn fires immediately. Treat the call as a successful
4271        // no-op so callers don't need to special-case.
4272        if rec.pausable == PausableMode::Off {
4273            return Ok(());
4274        }
4275        rec.pause_state.add_lock(lock_id);
4276        Ok(())
4277    }
4278
4279    /// Release a pause lock on `node_id`. If the lockset becomes empty, the
4280    /// node transitions back to `Active` and the buffered messages are
4281    /// dispatched to subscribers in arrival order. Returns a [`ResumeReport`]
4282    /// when the final lock released; `None` if the lockset is still
4283    /// non-empty (further locks held).
4284    ///
4285    /// Releasing an unknown `lock_id` (or releasing on an already-Active
4286    /// node) is an idempotent no-op returning `None`.
4287    pub fn resume(
4288        &self,
4289        node_id: NodeId,
4290        lock_id: LockId,
4291    ) -> Result<Option<ResumeReport>, PauseError> {
4292        // Phase 1 (lock-held): collect drained buffer + pending-wave flag +
4293        // sink Arcs. For default-mode nodes whose `pending_wave` was set
4294        // during pause, schedule a single fn-fire by adding to
4295        // `pending_fires` BEFORE we exit the lock — the wave engine picks
4296        // it up on the next drain tick.
4297        let (sinks, messages, dropped, pending_wave_for_default) = {
4298            let mut s = self.lock_state();
4299            let rec = s
4300                .nodes
4301                .get_mut(&node_id)
4302                .ok_or(PauseError::UnknownNode(node_id))?;
4303            // For Off mode, pause/resume are no-ops by construction.
4304            if rec.pausable == PausableMode::Off {
4305                return Ok(None);
4306            }
4307            let was_default_mode = rec.pausable == PausableMode::Default;
4308            // Capture pending_wave BEFORE remove_lock collapses the state.
4309            let pending_wave = if was_default_mode {
4310                rec.pause_state.take_pending_wave()
4311            } else {
4312                false
4313            };
4314            let Some((buffer, dropped)) = rec.pause_state.remove_lock(lock_id) else {
4315                // Not the final-resume — restore the pending_wave flag we
4316                // tentatively cleared, since we're not transitioning to
4317                // Active yet.
4318                if pending_wave {
4319                    rec.pause_state.mark_pending_wave();
4320                }
4321                return Ok(None);
4322            };
4323            let sinks: Vec<Sink> = rec.subscribers.values().cloned().collect();
4324            let messages: Vec<Message> = buffer.into_iter().collect();
4325            // Default-mode pending-wave handling: schedule the fn-fire so
4326            // the wave engine consolidates the pause-window dep deliveries
4327            // into one fn execution. State nodes don't fire fn (no
4328            // `pending_fires` membership has effect for them).
4329            if pending_wave && was_default_mode {
4330                s.pending_fires.insert(node_id);
4331            }
4332            (sinks, messages, dropped, pending_wave && was_default_mode)
4333        };
4334        let replayed = u32::try_from(messages.len()).unwrap_or(u32::MAX);
4335
4336        // Phase 2 (lock-released): fire sinks for ResumeAll-buffered
4337        // messages. Default-mode resume produces no buffered replay (the
4338        // consolidated fn-fire produces fresh wave traffic via the standard
4339        // commit_emission path).
4340        if !messages.is_empty() {
4341            for sink in &sinks {
4342                sink(&messages);
4343            }
4344            // Phase 3: balance the retain_handle calls done at buffer-push
4345            // time — sinks observe values but don't own refcount shares.
4346            for msg in &messages {
4347                if let Some(h) = msg.payload_handle() {
4348                    self.binding.release_handle(h);
4349                }
4350            }
4351        }
4352
4353        // Phase 4 (default-mode): drain the consolidated fn-fire scheduled
4354        // in Phase 1. `run_wave_for(node_id)` acquires the partitions
4355        // touched from `node_id` (Slice Y1 / Phase E) and runs the standard
4356        // drain pipeline; the new fn-fire emerges as a normal wave's worth
4357        // of messages to subscribers.
4358        if pending_wave_for_default {
4359            self.run_wave_for(node_id, |_this| {
4360                // The pending_fires entry was pushed in Phase 1 under the
4361                // lock. run_wave's drain picks it up.
4362            });
4363        }
4364        Ok(Some(ResumeReport { replayed, dropped }))
4365    }
4366
4367    /// True if the node currently holds at least one pause lock.
4368    #[must_use]
4369    pub fn is_paused(&self, node_id: NodeId) -> bool {
4370        self.state
4371            .lock()
4372            .require_node(node_id)
4373            .pause_state
4374            .is_paused()
4375    }
4376
4377    /// Number of pause locks currently held on `node_id`. `0` if Active.
4378    #[must_use]
4379    pub fn pause_lock_count(&self, node_id: NodeId) -> usize {
4380        self.state
4381            .lock()
4382            .require_node(node_id)
4383            .pause_state
4384            .lock_count()
4385    }
4386
4387    /// Test helper: whether `node_id` currently holds the given `lock_id`.
4388    #[must_use]
4389    pub fn holds_pause_lock(&self, node_id: NodeId, lock_id: LockId) -> bool {
4390        self.state
4391            .lock()
4392            .require_node(node_id)
4393            .pause_state
4394            .contains_lock(lock_id)
4395    }
4396}
4397
4398// -----------------------------------------------------------------------
4399// set_deps — atomic dep mutation
4400// -----------------------------------------------------------------------
4401
4402/// Errors returnable by [`Core::set_deps`].
4403///
4404/// Per `~/src/graphrefly-ts/docs/research/rewire-design-notes.md` and the
4405/// Phase 13.8 Q1 lock:
4406/// - `SelfDependency` — `n in newDeps` (self-loops are pathological without
4407///   explicit fixed-point semantics, which GraphReFly does not provide).
4408/// - `WouldCreateCycle { path }` — adding the new edge would create a cycle.
4409///   The `path` field reports the offending dep chain for debuggability.
4410/// - `UnknownNode` / `NotComputeNode` — invariant violations from the caller.
4411/// - `TerminalNode` — `n` itself has emitted COMPLETE/ERROR; rewiring a
4412///   terminal stream is a category error (terminal is one-shot at this
4413///   layer; recovery is the resubscribable path on a fresh subscribe).
4414/// - `TerminalDep` — a newly-added dep is terminal AND not resubscribable.
4415///   Resubscribable terminal deps are accepted because the subscribe path
4416///   resets their lifecycle. Non-resubscribable terminal deps would deliver
4417///   their already-emitted terminal directly to `n`'s `dep_terminals` slot,
4418///   which is rarely intended.
4419#[derive(Error, Debug, Clone, PartialEq)]
4420pub enum SetDepsError {
4421    /// `n` appeared in `new_deps` (self-loop rejection).
4422    #[error("set_deps({n:?}, ...): self-dependency rejected (n appeared in new_deps)")]
4423    SelfDependency { n: NodeId },
4424
4425    /// Adding the new dep would create a cycle. `path` is the chain
4426    /// `[added_dep, ..., n]` reachable via existing deps.
4427    #[error(
4428        "set_deps({n:?}, ...): cycle would form via path {path:?} \
4429         (adding {added_dep:?} → {n:?} closes the loop)"
4430    )]
4431    WouldCreateCycle {
4432        n: NodeId,
4433        added_dep: NodeId,
4434        path: Vec<NodeId>,
4435    },
4436
4437    #[error("set_deps: unknown node {0:?}")]
4438    UnknownNode(NodeId),
4439
4440    #[error("set_deps: node {0:?} is not a compute node (state nodes have no deps)")]
4441    NotComputeNode(NodeId),
4442
4443    /// `n` itself has terminated (COMPLETE / ERROR). Rewiring a terminal node
4444    /// is rejected — the stream has ended at this layer. To recover, mark
4445    /// the node resubscribable before terminate; a fresh subscribe will then
4446    /// reset its lifecycle.
4447    #[error("set_deps({n:?}, ...): node has already terminated; cannot rewire a terminal node")]
4448    TerminalNode { n: NodeId },
4449
4450    /// A newly-added dep is terminal AND non-resubscribable. Per Phase 13.8
4451    /// Q1, this is rejected; resubscribable terminal deps are allowed
4452    /// because the subscribe path resets them when activated. Already-present
4453    /// terminal deps are unaffected (their terminal status was accepted at
4454    /// the time they terminated).
4455    #[error(
4456        "set_deps({n:?}, ...): added dep {dep:?} is terminal and not resubscribable; \
4457         either mark it resubscribable before terminate, or remove the dep from new_deps"
4458    )]
4459    TerminalDep { n: NodeId, dep: NodeId },
4460
4461    /// `n` itself is currently mid-fire — a user fn for `n` re-entered Core
4462    /// via `set_deps(n, ...)` from inside `n`'s own `invoke_fn` /
4463    /// `project_each` / `predicate_each` / etc. Phase 1 of the dispatcher
4464    /// snapshotted `dep_handles` BEFORE the lock-released callback; the
4465    /// callback returning a `tracked` set indexed against THAT ordering
4466    /// would corrupt indices if the rewire re-orders deps mid-fire.
4467    /// Rejected to preserve the dynamic-tracked-indices invariant (D1).
4468    ///
4469    /// Workaround: schedule the rewire from a different node's fn (via
4470    /// `Core::emit` on a state node and observing the emit downstream),
4471    /// or perform the rewire after the wave completes (e.g. from a sink
4472    /// callback that is itself outside any fn-fire scope).
4473    ///
4474    /// Slice F (2026-05-07) — A6.
4475    #[error(
4476        "set_deps({n:?}, ...): rejected — node {n:?} is currently mid-fire \
4477         (set_deps from inside the firing node's own fn would corrupt the \
4478         Dynamic `tracked` indices snapshot taken before invoke_fn). \
4479         Schedule the rewire outside this fire scope."
4480    )]
4481    ReentrantOnFiringNode { n: NodeId },
4482
4483    /// `set_deps(n, ...)` would trigger a partition migration (union or
4484    /// split in the per-subgraph union-find registry) that affects the
4485    /// partition of a node currently mid-fire on this thread. Distinct
4486    /// from [`Self::ReentrantOnFiringNode`]: that variant rejects
4487    /// `set_deps(n, ...)` where `n` itself is firing; this variant
4488    /// rejects `set_deps(n, ...)` on some OTHER node whose union/split
4489    /// shifts a firing node's partition root mid-wave.
4490    ///
4491    /// Why this matters: Y1's wave engine holds an
4492    /// [`Arc<crate::subgraph::SubgraphLockBox>`] for the firing node's
4493    /// partition for the wave's duration. A union mid-wave swaps the
4494    /// box-identity for one of the two affected partitions; a split
4495    /// (Y1+ post-Phase-F) extracts a fresh box for the orphan side.
4496    /// Either way the held Arc would diverge from the registry's
4497    /// current root for that partition, so the wave would lose
4498    /// serialization against the box's true partition mid-flight.
4499    ///
4500    /// Per [`SESSION-rust-port-d3-per-subgraph-parallelism.md`](https://github.com/graphrefly/graphrefly-ts/blob/main/archive/docs/SESSION-rust-port-d3-per-subgraph-parallelism.md)
4501    /// Q3 = (a-strict): mid-wave migration is rejected at edge-mutation
4502    /// time. If a real consumer surfaces pressure to support mid-wave
4503    /// migration, lift via state-migration logic in a follow-up — but
4504    /// the v1 contract is "the partition a wave runs in cannot change
4505    /// shape mid-flight."
4506    ///
4507    /// `n` is the node whose `set_deps` was rejected; `firing` is the
4508    /// concretely-identified firing node whose partition would be
4509    /// migrated. Workaround: schedule the rewire outside the wave
4510    /// (e.g. emit a state-change that triggers `set_deps` from a sink
4511    /// callback running post-flush).
4512    ///
4513    /// Slice Y1 (D3 / D091, 2026-05-08).
4514    #[error(
4515        "set_deps({n:?}, ...): rejected — would migrate the partition of \
4516         currently-firing node {firing:?} mid-wave (union/split during \
4517         fire would invalidate the held wave_owner Arc). Schedule the \
4518         rewire outside the wave."
4519    )]
4520    PartitionMigrationDuringFire { n: NodeId, firing: NodeId },
4521}
4522
4523impl Core {
4524    /// Atomic dep mutation — change a node's upstream deps without TEARDOWN
4525    /// cascading and without losing cache.
4526    ///
4527    /// Per the TLA+-verified design at
4528    /// `~/src/graphrefly-ts/docs/research/wave_protocol_rewire.tla`
4529    /// (35,950 distinct states, all 7 invariants clean):
4530    ///
4531    /// - Removed deps: clear dirtyMask bit, drain pending queue, drop DepRecord.
4532    /// - Added deps: SENTINEL prevData; push-on-subscribe if added dep has cached DATA.
4533    /// - Preserved: `firstRunPassed`, `pauseLocks`, `pauseBuffer`, `cache` (ROM/RAM).
4534    /// - Status auto-settles if dirtyMask becomes empty.
4535    /// - Idempotent on `new_deps == current deps`.
4536    /// - Self-rewire `n ∈ new_deps` rejected (`SelfDependency`).
4537    /// - Cycles rejected (`WouldCreateCycle`).
4538    /// - Allowed mid-wave + while paused.
4539    /// - Phase 13.8 Q1: terminal `n` rejected (`TerminalNode`); newly-added
4540    ///   terminal non-resubscribable deps rejected (`TerminalDep`).
4541    ///
4542    /// The body is a single atomic dep-mutation transaction with several
4543    /// discrete validation stages. Splitting would require passing a
4544    /// partially-mutable CoreState across helpers, and the transaction's
4545    /// locality is what makes the F1 refcount-leak collection work.
4546    #[allow(clippy::too_many_lines)]
4547    pub fn set_deps(&self, n: NodeId, new_deps: &[NodeId]) -> Result<(), SetDepsError> {
4548        let mut s = self.lock_state();
4549        // Validate node exists and is compute. Read-once via the helper so
4550        // subsequent code can use `require_node(n)` without re-checking.
4551        let (is_state, is_producer, is_terminal) = {
4552            let rec = s.nodes.get(&n).ok_or(SetDepsError::UnknownNode(n))?;
4553            (rec.is_state(), rec.is_producer(), rec.terminal.is_some())
4554        };
4555        if is_state || is_producer {
4556            // State and Producer nodes have no declared deps — set_deps
4557            // is meaningless. Producer nodes manage their own subscriptions
4558            // through the binding's ProducerCtx; mutating their (empty)
4559            // dep set would not affect that.
4560            return Err(SetDepsError::NotComputeNode(n));
4561        }
4562        // Reject if `n` itself is terminal (Phase 13.8 Q1: terminal nodes
4563        // cannot be rewired; recovery is via resubscribable subscribe).
4564        if is_terminal {
4565            return Err(SetDepsError::TerminalNode { n });
4566        }
4567        // A6 reentrancy guard (Slice F, 2026-05-07): reject if `n` is
4568        // currently mid-fire on the wave-owner thread. Closes the D1 hazard
4569        // where `Phase 1` snapshotted `dep_handles` against pre-rewire dep
4570        // ordering and `Phase 3` would store the returned `tracked` indices
4571        // against post-rewire ordering. Same-thread re-entry is the only
4572        // path that matters — cross-thread emits already block on
4573        // `wave_owner` per the M1 design.
4574        if s.currently_firing.contains(&n) {
4575            return Err(SetDepsError::ReentrantOnFiringNode { n });
4576        }
4577        // Self-rewire rejection.
4578        if new_deps.contains(&n) {
4579            return Err(SetDepsError::SelfDependency { n });
4580        }
4581        // Validate all new deps exist.
4582        for &d in new_deps {
4583            if !s.nodes.contains_key(&d) {
4584                return Err(SetDepsError::UnknownNode(d));
4585            }
4586        }
4587        // Cycle detection: data flows parent → child via the `children` map.
4588        // Adding edge `d → n` (d becomes a dep of n) creates a cycle iff
4589        // `d` is already reachable from `n` via existing data-flow edges
4590        // (so `n → ... → d` exists, and the new `d → n` closes the loop).
4591        // DFS from `n` along `children` edges, looking for each added dep.
4592        let current_deps: HashSet<NodeId> = s.require_node(n).dep_ids().collect();
4593        let new_deps_set: HashSet<NodeId> = new_deps.iter().copied().collect();
4594        let added: HashSet<NodeId> = new_deps_set.difference(&current_deps).copied().collect();
4595        for &d in &added {
4596            if let Some(path) = self.path_from_to(&s, n, d) {
4597                return Err(SetDepsError::WouldCreateCycle {
4598                    n,
4599                    added_dep: d,
4600                    path,
4601                });
4602            }
4603        }
4604        // Phase 13.8 Q1: reject newly-added deps that are terminal AND not
4605        // resubscribable. Resubscribable terminal deps are allowed — the
4606        // subscribe path resets their lifecycle when something activates
4607        // them. Already-present (kept) deps are unaffected; their terminal
4608        // status was accepted at the time they terminated.
4609        for &d in &added {
4610            let dep_rec = s.require_node(d);
4611            if dep_rec.terminal.is_some() && !dep_rec.resubscribable {
4612                return Err(SetDepsError::TerminalDep { n, dep: d });
4613            }
4614        }
4615        // Compute `removed` early (Phase F: needs to be available for P13
4616        // split-case widening below). Idempotent fast-path moved below the
4617        // P13 check accordingly.
4618        let removed: HashSet<NodeId> = current_deps.difference(&new_deps_set).copied().collect();
4619
4620        // Slice Y1 (D3 / D091 — P13, 2026-05-08): reject mid-wave set_deps
4621        // that would shift a currently-firing node's partition root.
4622        // Distinct from `ReentrantOnFiringNode` (same-node case, line above).
4623        // Holds the registry lock briefly under the state lock per the
4624        // P12-fix lock-discipline invariant `state lock → registry mutex`.
4625        //
4626        // **Two cases:**
4627        // 1. **Union (Phase D)** — adding a cross-partition dep merges two
4628        //    components. Both pre-merge components are affected (the
4629        //    smaller-rank loser's box is dropped, its members migrate to
4630        //    the winner's root).
4631        // 2. **Split (Phase F, 2026-05-09)** — removing an edge whose
4632        //    removal disconnects the dep graph splits one component into
4633        //    two. The pre-split component is affected (every member
4634        //    re-unions; the orphan side gets a fresh `SubgraphLockBox`
4635        //    while the keep side preserves the original Arc).
4636        //
4637        // Either case migrates the partition root (and box-identity) for
4638        // affected nodes mid-wave; if any node currently firing on this
4639        // thread is in an affected partition, the wave's held
4640        // `Arc<SubgraphLockBox>` would diverge from the registry's new
4641        // canonical box. Q3 = (a-strict) per the D3 design lock rejects
4642        // both cases at edge-mutation time.
4643        if !s.currently_firing.is_empty() && (!added.is_empty() || !removed.is_empty()) {
4644            let mut reg = self.registry.lock();
4645            // Snapshot firing nodes' partitions. `partition_of` is mutating
4646            // (path compression) but partition IDENTITY is stable across
4647            // reads (only `union_nodes` / `split_partition` mutate roots).
4648            let firing_with_partition: Vec<(NodeId, crate::subgraph::SubgraphId)> = s
4649                .currently_firing
4650                .iter()
4651                .filter_map(|&f| reg.partition_of(f).map(|p| (f, p)))
4652                .collect();
4653            if !firing_with_partition.is_empty() {
4654                let part_n = reg.partition_of(n);
4655                // Case 1 (union): for each added dep, check cross-partition merge.
4656                for &added_dep in &added {
4657                    let part_added = reg.partition_of(added_dep);
4658                    if part_n == part_added {
4659                        continue; // same-partition add is a no-op in union-find
4660                    }
4661                    let affected = [part_n, part_added];
4662                    if let Some(&(firing, _)) = firing_with_partition
4663                        .iter()
4664                        .find(|(_, p)| affected.contains(&Some(*p)))
4665                    {
4666                        return Err(SetDepsError::PartitionMigrationDuringFire { n, firing });
4667                    }
4668                }
4669                // Case 2 (split): for each removed dep, simulate undirected
4670                // walk from `removed_dep` skipping the would-be-removed edge
4671                // (`removed_dep → n`); if `n` is unreachable, removal would
4672                // disconnect — split — affecting all nodes in that
4673                // partition. Since dep edges are within a single partition
4674                // by construction (union-find merges on edge add), every
4675                // node currently in the partition is affected.
4676                //
4677                // QA-fix #4 (2026-05-09): pass `added_edges` as `extra_edges`
4678                // so a `set_deps` that simultaneously REMOVES one edge AND
4679                // ADDS another path isn't falsely rejected. Without this,
4680                // the pre-mutation walk doesn't see the would-be-added
4681                // edges and reports disconnect even when the net change
4682                // preserves connectivity.
4683                let added_edges: Vec<(NodeId, NodeId)> = added.iter().map(|&a| (a, n)).collect();
4684                for &removed_dep in &removed {
4685                    let part_removed = reg.partition_of(removed_dep);
4686                    let visited = walk_undirected_dep_graph(
4687                        &s,
4688                        removed_dep,
4689                        Some((removed_dep, n)),
4690                        &added_edges,
4691                    );
4692                    let would_disconnect = !visited.contains(&n);
4693                    if would_disconnect {
4694                        if let Some(&(firing, _)) = firing_with_partition
4695                            .iter()
4696                            .find(|(_, p)| Some(*p) == part_removed)
4697                        {
4698                            return Err(SetDepsError::PartitionMigrationDuringFire { n, firing });
4699                        }
4700                    }
4701                }
4702            }
4703        }
4704        // Idempotent fast-path. Now safe to short-circuit since the P13
4705        // check above already considered both `added` and `removed`.
4706        if added.is_empty() && removed.is_empty() {
4707            return Ok(());
4708        }
4709
4710        // Snapshot old deps (ordered) for topology event, before mutation.
4711        let old_deps_vec: Vec<NodeId> = s.require_node(n).dep_ids_vec();
4712
4713        // Carry out the rewire atomically.
4714        // 1. Build new dep_records, preserving DepRecord state for kept deps.
4715        let new_deps_vec: Vec<NodeId> = new_deps.to_vec();
4716        //
4717        // Refcount discipline (F1 audit fix): each `Some(TerminalKind::Error(h))`
4718        // slot owns a refcount share retained at `terminate_node` time. When a
4719        // dep is REMOVED, its slot is dropped — the corresponding handle's
4720        // share must be released here, otherwise it leaks until Core drop.
4721        // Also release data_batch retains for removed deps.
4722        let (new_dep_records, removed_handles): (Vec<DepRecord>, Vec<HandleId>) = {
4723            let rec = s.require_node(n);
4724            // Index old dep_records by NodeId for O(1) lookup of kept deps.
4725            let old_by_node: HashMap<NodeId, &DepRecord> =
4726                rec.dep_records.iter().map(|dr| (dr.node, dr)).collect();
4727            let new_records: Vec<DepRecord> = new_deps_vec
4728                .iter()
4729                .map(|&d| {
4730                    if let Some(old) = old_by_node.get(&d) {
4731                        // Kept dep: preserve all state (prev_data, data_batch,
4732                        // terminal, wave flags). Subscriptions stay live.
4733                        DepRecord {
4734                            node: d,
4735                            prev_data: old.prev_data,
4736                            dirty: old.dirty,
4737                            involved_this_wave: old.involved_this_wave,
4738                            data_batch: old.data_batch.clone(),
4739                            terminal: old.terminal,
4740                        }
4741                    } else {
4742                        // Added dep: fresh sentinel record.
4743                        DepRecord::new(d)
4744                    }
4745                })
4746                .collect();
4747            // Collect handles to release from REMOVED dep records.
4748            let mut to_release: Vec<HandleId> = Vec::new();
4749            for d in &removed {
4750                if let Some(old) = old_by_node.get(d) {
4751                    if let Some(TerminalKind::Error(h)) = old.terminal {
4752                        to_release.push(h);
4753                    }
4754                    // Release data_batch retains for removed deps.
4755                    for &h in &old.data_batch {
4756                        to_release.push(h);
4757                    }
4758                }
4759            }
4760            (new_records, to_release)
4761        };
4762        // Clear dirtyMask bit by re-emitting the wave-bookkeeping: we don't
4763        // currently model a per-dep dirtyMask explicitly (we use the boolean
4764        // `dirty` flag at node level). Removing a dep's entry from the implicit
4765        // mask is therefore implicit — by removing the dep, future emissions
4766        // from it can't re-arm the bit. The per-dep `involved_this_wave` flag
4767        // stays wave-scoped and gets cleared at wave end. The setDeps action
4768        // itself does NOT change the dirty boolean unless all deps are cleared;
4769        // in that case we settle.
4770        // Slice E2 (D067): on a dynamic node that had previously fired its
4771        // fn, capture `has_fired_once` BEFORE the reset so we can fire
4772        // `OnRerun` cleanup lock-released after `drop(s)` below. Without
4773        // this, the next `fire_regular` Phase 1 would capture
4774        // `has_fired_once = false`, causing Phase 1.5 to skip OnRerun —
4775        // silently dropping the prior activation's cleanup closure when
4776        // the next `invoke_fn` overwrites `current_cleanup`. Per spec
4777        // R2.4.5, `set_deps` does NOT end the activation cycle
4778        // (subscribe→unsubscribe is the cycle boundary), so OnRerun must
4779        // fire on every re-fire including post-set_deps.
4780        let fire_set_deps_on_rerun;
4781        {
4782            let rec = s.require_node_mut(n);
4783            fire_set_deps_on_rerun = rec.is_dynamic && rec.has_fired_once;
4784            rec.dep_records = new_dep_records;
4785            // Re-derive `tracked` for static derived: all indices.
4786            // For dynamic: clear `tracked` AND reset `has_fired_once` so the
4787            // next dep delivery satisfies the first-fire branch in
4788            // `deliver_data_to_consumer` (`!has_fired_once || tracked.contains(...)`).
4789            // Without resetting `has_fired_once`, the cleared `tracked` blocks
4790            // every future fire — fn never re-runs and the dynamic node sits
4791            // on stale cache derived from the old dep set. The next fire
4792            // re-runs fn unconditionally; fn's returned `tracked` then
4793            // repopulates `rec.tracked` and normal selective-deps semantics
4794            // resume from the next dep update onward.
4795            if rec.is_dynamic {
4796                rec.tracked.clear();
4797                rec.has_fired_once = false;
4798            } else {
4799                // Derived (static) and Operator track all deps.
4800                rec.tracked = (0..new_deps_vec.len()).collect();
4801            }
4802        }
4803
4804        // 2. Update inverted-edge map (children).
4805        for &removed_dep in &removed {
4806            if let Some(set) = s.children.get_mut(&removed_dep) {
4807                set.remove(&n);
4808            }
4809        }
4810        for &added_dep in &added {
4811            s.children.entry(added_dep).or_default().insert(n);
4812        }
4813
4814        // 3. Push-on-subscribe for added deps with cached DATA. Wraps in a
4815        // wave so any downstream propagation runs cleanly. We capture only
4816        // the LIST of added deps (not their cache values) because the cache
4817        // can change between releasing the validation lock and the wave's
4818        // re-acquisition — see the P2 race fix below.
4819        //
4820        // P2 (Slice A close /qa) — between `drop(s)` and `run_wave`'s
4821        // closure re-acquiring the lock, a concurrent thread could
4822        // invalidate one of the added deps, releasing its cache handle. A
4823        // pre-snapshot of `(added_dep, cache)` pairs would then carry a
4824        // dangling HandleId into `deliver_data_to_consumer`. The fix is to
4825        // re-read each added dep's `cache` INSIDE the closure (under the
4826        // freshly re-acquired state lock). The wave-owner re-entrant mutex
4827        // (Q2) blocks concurrent waves once we enter `run_wave`, so the
4828        // re-read sees a coherent post-validation state.
4829        let added_for_wave: Vec<NodeId> = added.iter().copied().collect();
4830        // Slice Y1 (D3 / D090 — P12 fix, 2026-05-08): maintain partition
4831        // membership BEFORE dropping the state lock so the registry can
4832        // never lag behind topology mutations as observed by concurrent
4833        // readers. Lock-order invariant `state lock → registry mutex`
4834        // (one-way; never registry → state) — see the matching block in
4835        // `Core::register` for the full rationale.
4836        //   - For each new edge: union the partitions of `n` and `added_dep`.
4837        //   - For each removed edge (Slice Y1 / Phase F, 2026-05-09):
4838        //     run undirected-dep-graph BFS from `removed_dep` over the
4839        //     POST-removal `s.children` + `dep_records`. If `n` is
4840        //     unreachable, the partition has split — gather the affected
4841        //     component nodes + intra-component edges, then call
4842        //     [`SubgraphRegistry::split_partition`] to migrate the orphan
4843        //     side onto a fresh `SubgraphLockBox`. Mid-fire splits would
4844        //     have been rejected at the P13 check above (Q3 = (a-strict)).
4845        {
4846            let mut reg = self.registry.lock();
4847            for &added_dep in &added {
4848                reg.union_nodes(n, added_dep);
4849            }
4850            for &removed_dep in &removed {
4851                // Post-removal walk — `s.children[removed_dep]` no longer
4852                // contains `n`, and `s.nodes[n].dep_records` no longer
4853                // contains `removed_dep`. No skip needed; no extra edges
4854                // (added edges are already applied to `s.children` and
4855                // `dep_records` by the time we reach this block).
4856                let visited = walk_undirected_dep_graph(&s, removed_dep, None, &[]);
4857                if visited.contains(&n) {
4858                    // Still connected via other dep edges — no split.
4859                    continue;
4860                }
4861                // Disconnected. `visited` is the keep-side (containing
4862                // `removed_dep`). Identify the original component, the
4863                // intra-component dep edges, and split.
4864                let original_root = reg.find(removed_dep);
4865                // Snapshot keys before iterating — `find` mutates via
4866                // path compression; iterating + mutating concurrently
4867                // would alias-borrow.
4868                let snapshot_keys: Vec<NodeId> = reg.registered_nodes();
4869                let component_nodes: Vec<NodeId> = snapshot_keys
4870                    .into_iter()
4871                    .filter(|&node| reg.find(node) == original_root)
4872                    .collect();
4873                let component_set: HashSet<NodeId> = component_nodes.iter().copied().collect();
4874                // Collect dep edges within the component (post-removal).
4875                // Edge convention: `(parent, child)` data-flow direction.
4876                let mut edges_in_component: Vec<(NodeId, NodeId)> = Vec::new();
4877                for &node in &component_nodes {
4878                    if let Some(rec) = s.nodes.get(&node) {
4879                        for d in rec.dep_records.iter().map(|r| r.node) {
4880                            if component_set.contains(&d) {
4881                                edges_in_component.push((d, node));
4882                            }
4883                        }
4884                    }
4885                }
4886                let keep_side_nodes: Vec<NodeId> = visited.iter().copied().collect();
4887                reg.split_partition(&component_nodes, &keep_side_nodes, &edges_in_component);
4888                // Marker call kept for symmetry with `union_nodes` — the
4889                // registry's `on_edge_removed` is itself a no-op (Phase F
4890                // moved the actual work into Core where the dep-graph
4891                // view is available).
4892                reg.on_edge_removed(n, removed_dep);
4893            }
4894        }
4895        // Drop the state lock before run_wave (which acquires its own) and
4896        // before crossing the binding boundary for the F1 refcount-fix
4897        // releases. Keeps the lock-discipline split (binding calls outside
4898        // the state lock) consistent with the rest of the dispatcher.
4899        drop(s);
4900        // Slice E2 (D067): fire OnRerun lock-released for dynamic nodes
4901        // that had previously fired. The cleanup closure cleans up
4902        // resources tied to the old dep shape before the next fn-fire
4903        // (triggered by added-dep push-on-subscribe below) registers a
4904        // fresh cleanup spec. Direct fire (NOT via deferred_cleanup_hooks)
4905        // because set_deps may NOT enter a wave (no added deps → no
4906        // run_wave below) — queueing the hook would orphan it until the
4907        // next unrelated wave drains.
4908        if fire_set_deps_on_rerun {
4909            self.binding.cleanup_for(n, CleanupTrigger::OnRerun);
4910        }
4911        // Fire topology event after lock is dropped.
4912        self.fire_topology_event(&crate::topology::TopologyEvent::DepsChanged {
4913            node: n,
4914            old_deps: old_deps_vec,
4915            new_deps: new_deps_vec.clone(),
4916        });
4917        if !added_for_wave.is_empty() {
4918            // Slice Y1 / Phase E: push-on-subscribe wave runs on `n`'s
4919            // touched partitions. Added deps are now unioned with `n`
4920            // (Phase C P12 fix moved registry mutation inside the state
4921            // lock), so any cascade through them stays in `n`'s partition
4922            // set as walked by `compute_touched_partitions`.
4923            self.run_wave_for(n, |this| {
4924                let mut s = this.lock_state();
4925                // Defensive: re-validate `n` still exists and isn't terminal.
4926                // A concurrent path could have terminated it between
4927                // validation and run_wave_for's partition-lock acquisition.
4928                if !s.nodes.contains_key(&n) || s.require_node(n).terminal.is_some() {
4929                    return;
4930                }
4931                for added_dep in &added_for_wave {
4932                    // Re-read cache under the wave-owner-held lock — this
4933                    // is the post-validation, post-concurrent-action
4934                    // snapshot. NO_HANDLE means the dep was invalidated
4935                    // concurrently; skip (no data to push).
4936                    let cache = match s.nodes.get(added_dep) {
4937                        Some(rec) => rec.cache,
4938                        None => continue, // dep deleted concurrently
4939                    };
4940                    if cache == NO_HANDLE {
4941                        continue;
4942                    }
4943                    let dep_idx = s.require_node(n).dep_index_of(*added_dep);
4944                    if let Some(idx) = dep_idx {
4945                        this.deliver_data_to_consumer(&mut s, n, idx, cache);
4946                    }
4947                }
4948            });
4949        }
4950        for h in removed_handles {
4951            self.binding.release_handle(h);
4952        }
4953        Ok(())
4954    }
4955
4956    /// DFS from `from` along data-flow edges (children map) looking for `to`.
4957    /// Returns the path including endpoints, or `None` if unreachable. Used
4958    /// for cycle detection in [`Self::set_deps`].
4959    fn path_from_to(&self, s: &CoreState, from: NodeId, to: NodeId) -> Option<Vec<NodeId>> {
4960        if from == to {
4961            return Some(vec![from]);
4962        }
4963        let mut stack: Vec<(NodeId, Vec<NodeId>)> = vec![(from, vec![from])];
4964        let mut visited: HashSet<NodeId> = HashSet::new();
4965        while let Some((cur, path)) = stack.pop() {
4966            if !visited.insert(cur) {
4967                continue;
4968            }
4969            if cur == to {
4970                return Some(path);
4971            }
4972            if let Some(children) = s.children.get(&cur) {
4973                for &child in children {
4974                    let mut new_path = path.clone();
4975                    new_path.push(child);
4976                    stack.push((child, new_path));
4977                }
4978            }
4979        }
4980        None
4981    }
4982}
4983
4984// CoreState helpers — kept on the inner struct so they're naturally scoped
4985// to the lock guard.
4986impl CoreState {
4987    fn alloc_node_id(&mut self) -> NodeId {
4988        let id = NodeId::new(self.next_node_id);
4989        self.next_node_id += 1;
4990        id
4991    }
4992
4993    fn alloc_sub_id(&mut self) -> SubscriptionId {
4994        let id = SubscriptionId(self.next_subscription_id);
4995        self.next_subscription_id += 1;
4996        id
4997    }
4998
4999    /// Clear wave-scoped flags and rotate per-dep batch data on every
5000    /// node. Run at the end of every wave (regular drain via `run_wave`,
5001    /// activation drain via `activate_derived`, and `BatchGuard::drop`'s
5002    /// drain). Centralized so a future wave-state field can't be missed
5003    /// at one of the cleanup sites.
5004    ///
5005    /// Per-dep rotation (R2.9.b / R1.3.6.b):
5006    /// - `prev_data` ← last element of `data_batch` (or unchanged if empty).
5007    ///   The last batch entry's retain transfers to `prev_data`; the old
5008    ///   `prev_data`'s retain is released. All earlier batch entries are
5009    ///   released.
5010    /// - `data_batch` cleared.
5011    /// - Per-dep `dirty` and `involved_this_wave` cleared.
5012    ///
5013    /// Handle releases are pushed to `deferred_handle_releases` for
5014    /// post-lock-drop release by the caller.
5015    pub(crate) fn clear_wave_state(&mut self, cps: &mut CrossPartitionState) {
5016        // Q2 (2026-05-09): `pending_auto_resolve` + `pending_pause_overflow`
5017        // clears moved to [`CrossPartitionState::clear_wave_state`]. The
5018        // per-NodeRecord rotation below ALSO pushes batch-handle and
5019        // prev_data releases into `cps.deferred_handle_releases` (was
5020        // `s.deferred_handle_releases` pre-Q2). Caller passes the
5021        // `cps` guard already-acquired in lock-discipline order
5022        // (`state → cross_partition`).
5023        // A6 (Slice F, 2026-05-07): currently_firing is push/pop balanced
5024        // by FiringGuard's RAII (including on panic). It should already be
5025        // empty here, but defensively clear in case a future code path
5026        // forgets the guard.
5027        self.currently_firing.clear();
5028        // Slice G tier3 emit tracking moved to per-partition state (Q3,
5029        // 2026-05-09); cleared by [`super::WaveOwnerGuard::drop`] on
5030        // outermost release for each partition the wave touched.
5031        // Slice E2 (D057): per-wave-per-node OnInvalidate dedup is
5032        // wave-scoped — cleared so the next wave can fire cleanups again.
5033        self.invalidate_hooks_fired_this_wave.clear();
5034        // Slice E2 INVARIANT (DO NOT CHANGE WITHOUT THINKING):
5035        // `deferred_cleanup_hooks` is NOT cleared here. It follows the
5036        // `deferred_handle_releases` discipline:
5037        //   - SUCCESS path (`BatchGuard::drop` non-panic): drained by
5038        //     `Core::drain_deferred` AFTER `clear_wave_state` runs, then
5039        //     fired lock-released by `Core::fire_deferred`.
5040        //   - PANIC-DISCARD path (`BatchGuard::drop` panic): explicitly
5041        //     `std::mem::take`-and-dropped AFTER `clear_wave_state` runs,
5042        //     silently per D061.
5043        // Clearing it INSIDE `clear_wave_state` would race the success
5044        // path: the wave's queued `OnInvalidate` cleanup hooks would be
5045        // erased BEFORE `drain_deferred` could take them, dropping every
5046        // user cleanup callback on every successful wave.
5047        // If a future change moves `deferred_cleanup_hooks` ownership
5048        // here, ALSO move the post-`clear_wave_state` take in both
5049        // BatchGuard paths to BEFORE the clear call. Until then, leaving
5050        // the field untouched here is load-bearing.
5051        for rec in self.nodes.values_mut() {
5052            rec.dirty = false;
5053            rec.involved_this_wave = false;
5054            for dr in &mut rec.dep_records {
5055                let batch_len = dr.data_batch.len();
5056                if batch_len > 0 {
5057                    // Release all batch entries EXCEPT the last — the last
5058                    // entry's retain transfers to prev_data.
5059                    for &h in &dr.data_batch[..batch_len - 1] {
5060                        cps.deferred_handle_releases.push(h);
5061                    }
5062                    // Release the OLD prev_data (its retain was from the
5063                    // previous wave's rotation or from initial delivery).
5064                    if dr.prev_data != NO_HANDLE {
5065                        cps.deferred_handle_releases.push(dr.prev_data);
5066                    }
5067                    // Rotate: last batch entry becomes new prev_data.
5068                    // Its retain carries over — no extra retain needed.
5069                    dr.prev_data = dr.data_batch[batch_len - 1];
5070                    dr.data_batch.clear();
5071                }
5072                dr.dirty = false;
5073                dr.involved_this_wave = false;
5074            }
5075        }
5076    }
5077
5078    pub(crate) fn require_node(&self, id: NodeId) -> &NodeRecord {
5079        self.nodes
5080            .get(&id)
5081            .unwrap_or_else(|| panic!("unknown node {id:?}"))
5082    }
5083
5084    pub(crate) fn require_node_mut(&mut self, id: NodeId) -> &mut NodeRecord {
5085        self.nodes
5086            .get_mut(&id)
5087            .unwrap_or_else(|| panic!("unknown node {id:?}"))
5088    }
5089}
5090
5091/// Release every binding-side refcount share owned by this `CoreState`
5092/// when the last `Core` clone drops the inner Mutex.
5093///
5094/// Without this, every retained handle in `cache` / `terminal` Error /
5095/// `dep_terminals` Error / pause-buffer-payload would leak in the binding
5096/// registry until process exit. Production bindings (napi-rs, pyo3,
5097/// wasm-bindgen) all maintain handle-ref maps that grow unbounded without
5098/// this cleanup.
5099///
5100/// Safe to call during panic unwinding — `BindingBoundary::release_handle`
5101/// is the only call, and a panicking binding during cleanup would already
5102/// have been a problem in normal operation.
5103impl Drop for CoreState {
5104    fn drop(&mut self) {
5105        // Drain pending in-flight retains too, so a panic mid-wave doesn't
5106        // strand the queue_notify retains.
5107        let pending = std::mem::take(&mut self.pending_notify);
5108        // `deferred_flush_jobs` carries `Vec<Sink>` clones — those Arcs
5109        // drop naturally when this CoreState drops; no handles to release
5110        // there.
5111        let _ = std::mem::take(&mut self.deferred_flush_jobs);
5112        // Q2 (2026-05-09): `deferred_handle_releases` and
5113        // `wave_cache_snapshots` releases moved to
5114        // [`Drop for CrossPartitionState`].
5115
5116        // Per-node retained handles:
5117        //   - `cache` (1 retain per non-NO_HANDLE state cache or
5118        //     populated compute cache).
5119        //   - `terminal == Some(Error(h))` (1 retain on the terminal slot).
5120        //   - `dep_terminals[i] == Some(Error(h))` (1 retain per consumer's
5121        //     terminated-dep slot).
5122        //   - `pause_state` paused buffer messages with payload handles
5123        //     (1 retain per buffered Data/Error).
5124        for rec in self.nodes.values_mut() {
5125            if rec.cache != NO_HANDLE {
5126                self.binding.release_handle(rec.cache);
5127            }
5128            if let Some(TerminalKind::Error(h)) = rec.terminal {
5129                self.binding.release_handle(h);
5130            }
5131            for dr in &rec.dep_records {
5132                if let Some(TerminalKind::Error(h)) = dr.terminal {
5133                    self.binding.release_handle(h);
5134                }
5135                // Release data_batch retains (in-flight wave data).
5136                for &h in &dr.data_batch {
5137                    self.binding.release_handle(h);
5138                }
5139                // Release prev_data retain (cross-wave persistence).
5140                if dr.prev_data != NO_HANDLE {
5141                    self.binding.release_handle(dr.prev_data);
5142                }
5143            }
5144            if let PauseState::Paused { buffer, .. } = &rec.pause_state {
5145                for msg in buffer {
5146                    if let Some(h) = msg.payload_handle() {
5147                        self.binding.release_handle(h);
5148                    }
5149                }
5150            }
5151            // Slice E1: release replay-buffer retains.
5152            for &h in &rec.replay_buffer {
5153                self.binding.release_handle(h);
5154            }
5155            // Operator scratch (Slice C-3, D026): generic per-operator
5156            // state struct. Each variant's release_handles releases the
5157            // shares it owns (Scan/Reduce acc, Distinct/Pairwise prev,
5158            // Last latest + default; Take/Skip/TakeWhile own no handles).
5159            if let Some(scratch) = rec.op_scratch.as_mut() {
5160                scratch.release_handles(&*self.binding);
5161            }
5162        }
5163
5164        // Pending wave retains. Slice X4 / D2: walk all batches' messages
5165        // — `iter_messages` flattens the per-node `SmallVec<PendingBatch>`.
5166        for entry in pending.values() {
5167            for msg in entry.iter_messages() {
5168                if let Some(h) = msg.payload_handle() {
5169                    self.binding.release_handle(h);
5170                }
5171            }
5172        }
5173        // Q2 (2026-05-09): `deferred_handle_releases` +
5174        // `wave_cache_snapshots` retain-release moved to
5175        // [`Drop for CrossPartitionState`] (sibling Mutex on `Core`).
5176        // Both Arc<Mutex<_>>s drop together when the last `Core` clone
5177        // drops, so the binding-side share gets released in either
5178        // ordering.
5179    }
5180}