Skip to main content

graphrefly_core/
node.rs

1//! The dispatcher — node registration, subscription, wave engine.
2//!
3//! Mirrors `~/src/graphrefly-ts/src/__experiments__/handle-core/core.ts`
4//! (the Phase 13.6 brainstorm prototype, ~370 lines, 22 invariant tests).
5//!
6//! # Scope (M1 dispatcher + Slice A+B parity, closed 2026-05-05)
7//!
8//! - State + derived + dynamic node registration.
9//! - Subscribe / unsubscribe with push-on-subscribe (R1.2.3).
10//! - RAII [`Subscription`] with Drop-based deregister (§10.12).
11//! - DIRTY → DATA / RESOLVED ordering (R1.3.1.b two-phase push).
12//! - Equals-substitution (R1.3.2): identity is zero-FFI; custom crosses boundary.
13//! - First-run gate (R2.5.3) — fn does not fire until every dep has a handle.
14//! - Diamond resolution — one fn fire per wave even with shared upstream.
15//! - `set_deps()` atomic dep mutation with cycle detection + Phase 13.8 Q1
16//!   terminal-rejection policy (R3.3.1).
17//! - PAUSE / RESUME with lockId set + replay buffer (R1.2.6, R2.6, §10.2).
18//! - INVALIDATE broadcast + cascade with R1.4 idempotency.
19//! - COMPLETE / ERROR cascade + Lock 2.B auto-cascade gating
20//!   (ERROR dominates COMPLETE; first error wins).
21//! - TEARDOWN auto-precedes COMPLETE (R2.6.4 / Lock 6.F) +
22//!   `has_received_teardown` idempotency.
23//! - Meta TEARDOWN ordering (R1.3.9.d) — companions tear down before parent.
24//! - Resubscribable terminal lifecycle (R2.2.7, R2.5.3) — late subscribe to a
25//!   resubscribable terminal node resets lifecycle, except after TEARDOWN
26//!   (per F3 audit guard: TEARDOWN is permanent).
27//!
28//! # Module split (Slice C-1, 2026-05-05)
29//!
30//! Wave-engine internals (drain loop, fire selection, emission commit, sink
31//! dispatch) live in [`crate::batch`]. The split is purely organizational —
32//! the methods are still on `Core`. See `batch.rs` for the wave-engine
33//! entry points (`run_wave`, `drain_and_flush`, `commit_emission`,
34//! `queue_notify`, `deliver_data_to_consumer`).
35//!
36//! # Out of scope (later slices / milestones)
37//!
38//! - Deactivation cleanup (RAM nodes clear cache when sink count → 0) — M2.
39//!
40//! See [`migration-status.md`](../../../docs/migration-status.md) for the
41//! milestone tracker and [`porting-deferred.md`](../../../docs/porting-deferred.md)
42//! for surfaced concerns deferred to evidence-driven slices.
43//!
44//! # Re-entrance discipline (Slice A close, M1: fully lock-released)
45//!
46//! - **Wave-end sink fires** drop the state lock first. A subscriber's sink
47//!   that calls back into `Core::emit` / `pause` / `resume` / `invalidate` /
48//!   `complete` / `error` / `teardown` re-acquires the lock cleanly and runs
49//!   a nested wave (`s.in_tick` is cleared before the deferred-fire phase).
50//! - **`BindingBoundary::invoke_fn`** fires lock-released. The wave engine
51//!   acquires + drops the state lock per fn-fire iteration around the
52//!   `invoke_fn` callback. User fns may re-enter `Core::emit` / `pause` /
53//!   etc. and run a nested wave.
54//! - **`BindingBoundary::custom_equals`** fires lock-released.
55//!   `commit_emission` brackets the equals check around a lock release;
56//!   custom equals oracles may re-enter Core safely.
57//! - **Subscribe-time handshake** also fires lock-released. [`Core::subscribe`]
58//!   acquires the [`Core::wave_owner`] re-entrant mutex first (cross-thread
59//!   serialization), installs the sink under the state lock, drops the state
60//!   lock, then fires the per-tier handshake (`[Start]` / `[Data(cache)]?` /
61//!   `[Complete]?` / `[Error(h)]?` / `[Teardown]?` per R1.3.5.a) lock-released.
62//!   A handshake-time sink callback may re-enter Core (`emit` / `complete` /
63//!   `error` / `subscribe`); same-thread re-entry passes through `wave_owner`
64//!   transparently. Cross-thread emits block on `wave_owner` until the
65//!   subscribe path drops it, preserving R1.3.5.a happens-after ordering.
66
67use std::collections::VecDeque;
68use std::panic::{catch_unwind, AssertUnwindSafe};
69use std::sync::{Arc, Weak};
70
71use ahash::{AHashMap as HashMap, AHashSet as HashSet};
72use indexmap::IndexMap;
73use parking_lot::{Mutex, MutexGuard, ReentrantMutex};
74use smallvec::SmallVec;
75use thiserror::Error;
76
77use crate::batch::PendingPerNode;
78use crate::boundary::{BindingBoundary, CleanupTrigger};
79use crate::clock::monotonic_ns;
80use crate::handle::{FnId, HandleId, LockId, NodeId, NO_HANDLE};
81use crate::message::Message;
82
83/// Terminal-lifecycle state — once set on a node, the node will not emit
84/// further DATA; per-dep slots on consumers also use this to track which
85/// upstreams have terminated (R1.3.4 / Lock 2.B).
86///
87/// `Error` carries a [`HandleId`] resolving to the error value. Refcount is
88/// retained when the variant is stored in a node's `terminal` slot or any
89/// consumer's `dep_terminals` slot; v1 does not release these (terminal
90/// state is one-shot at this layer; release happens on resubscribable
91/// terminal-lifecycle reset, a separate slice).
92#[derive(Copy, Clone, Debug, PartialEq, Eq)]
93pub enum TerminalKind {
94    Complete,
95    Error(HandleId),
96}
97
98/// Node kind discriminant — **derived metadata** computed from
99/// [`NodeRecord`]'s field shape (D030 unification, Slice D).
100///
101/// Core no longer stores `kind` as a field; it's computed on demand from
102/// `(deps.is_empty(), fn_id.is_some(), op.is_some(), is_dynamic)`,
103/// mirroring TS's data model where `NodeImpl` has no `_kind` field. The
104/// shape uniquely identifies the kind:
105///
106/// | deps      | fn_id | op   | is_dynamic | kind     |
107/// |-----------|-------|------|-----------|----------|
108/// | empty     | None  | None | -         | State    |
109/// | empty     | Some  | None | -         | Producer |
110/// | non-empty | Some  | None | false     | Derived  |
111/// | non-empty | Some  | None | true      | Dynamic  |
112/// | non-empty | None  | Some | -         | Operator |
113///
114/// Public API ([`Core::kind_of`]) derives this enum on each call. State
115/// nodes are ROM (cache survives deactivation); compute nodes
116/// (Derived / Dynamic / Operator) and producers are RAM.
117#[derive(Copy, Clone, Eq, PartialEq, Debug)]
118pub enum NodeKind {
119    /// Source node: cache is intrinsic, no fn, no deps. Mutated via [`Core::emit`].
120    State,
121    /// Producer node: fn fires once on first subscribe. No deps;
122    /// emissions arrive via sinks the fn subscribes to (zip / concat /
123    /// race / takeUntil pattern). Slice D / D031.
124    Producer,
125    /// Derived node: fn fires on every dep change; all deps tracked.
126    Derived,
127    /// Dynamic node: fn declares which dep indices it actually read this run.
128    /// Untracked dep updates flow through cache but do NOT re-fire fn.
129    Dynamic,
130    /// Operator node: built-in dispatch path for transform / combine /
131    /// flow / resilience operators. The `OperatorOp` discriminant selects
132    /// the per-operator FFI path ([`BindingBoundary::project_each`] etc.);
133    /// Core manages per-operator state via the generic `op_scratch` slot
134    /// on `NodeRecord` (D026). Per Slice C-1 (D009) / Slice C-3 (D026).
135    Operator(OperatorOp),
136}
137
138impl NodeKind {
139    /// True if this kind opts OUT of Lock 2.B auto-cascade. Operator(Reduce)
140    /// and Operator(Last) must intercept upstream COMPLETE so they can emit
141    /// their accumulator / buffered value before the cascade terminates them;
142    /// instead of cascading, terminate_node queues such children for fn-fire
143    /// so `fire_operator` can handle the terminal.
144    pub(crate) fn skips_auto_cascade(self) -> bool {
145        matches!(
146            self,
147            NodeKind::Operator(OperatorOp::Reduce { .. } | OperatorOp::Last { .. })
148        )
149    }
150}
151
152/// Built-in operator discriminant. Selects the per-operator dispatch path
153/// in `fire_operator` (`crates/graphrefly-core/src/batch.rs`). Each variant
154/// carries the binding-side closure ids (and seed handle for stateful
155/// folders) needed for the wave-execution path; Core stores no user values
156/// itself per the handle-protocol cleaving plane.
157#[derive(Copy, Clone, Eq, PartialEq, Debug)]
158pub enum OperatorOp {
159    /// `map(source, project)` — element-wise transform. Calls
160    /// `BindingBoundary::project_each(fn_id, &inputs)` per fire; emits each
161    /// returned handle via `commit_emission_verbatim` (R1.3.2.d batch
162    /// semantics — no equals substitution between batch entries).
163    Map { fn_id: FnId },
164    /// `filter(source, predicate)` — silent-drop selection (D012/D018).
165    /// Calls `BindingBoundary::predicate_each(fn_id, &inputs)`; emits each
166    /// passing input verbatim. If zero pass on a wave that dirtied the
167    /// node, queues a single `RESOLVED` to settle (D018).
168    Filter { fn_id: FnId },
169    /// `scan(source, fold, seed)` — left-fold emitting each new accumulator.
170    /// `seed` is captured at registration; `acc` lives in
171    /// [`ScanState`](super::op_state::ScanState) inside
172    /// [`NodeRecord::op_scratch`] and persists across waves until
173    /// resubscribable reset. Calls `BindingBoundary::fold_each(fn_id, acc,
174    /// &inputs) -> SmallVec<HandleId>` per fire.
175    Scan { fn_id: FnId, seed: HandleId },
176    /// `reduce(source, fold, seed)` — left-fold emitting once on upstream
177    /// COMPLETE. Accumulates silently while source DATA flows; on
178    /// dep[0].terminal == Some(Complete), emits `[Data(acc), Complete]`.
179    /// On `Error(h)`, propagates the error verbatim. Opts out of Lock 2.B
180    /// auto-cascade (see `NodeKind::skips_auto_cascade`).
181    Reduce { fn_id: FnId, seed: HandleId },
182    /// `distinctUntilChanged(source, equals)` — suppresses adjacent
183    /// duplicates. Calls `BindingBoundary::custom_equals(equals_fn_id,
184    /// prev, current)` per input; emits non-equal items verbatim and
185    /// updates `prev`. If zero items pass on a wave that dirtied the node,
186    /// queues `RESOLVED` (matches Filter discipline).
187    DistinctUntilChanged { equals_fn_id: FnId },
188    /// `pairwise(source)` — emits `(prev, current)` pairs starting after
189    /// the second value. First value swallowed (sets `prev`). Calls
190    /// `BindingBoundary::pairwise_pack(fn_id, prev, current)` per pair to
191    /// produce the binding-side tuple handle.
192    Pairwise { fn_id: FnId },
193
194    // ----- Slice C-2: multi-dep combinators (D020) -----
195    /// `combine(...sources)` — N-dep combineLatest. On any dep fire, packs
196    /// the latest handle per dep into a single tuple handle via
197    /// `BindingBoundary::pack_tuple(pack_fn, &handles)`. First-run gate
198    /// (`partial: false` default) holds until all deps deliver real DATA
199    /// (R2.5.3). COMPLETE cascades when all deps complete (R1.3.4.b).
200    Combine { pack_fn: FnId },
201
202    /// `withLatestFrom(primary, secondary)` — 2-dep, fire-on-primary-only
203    /// (D021, Phase 10.5). Packs `[primary, secondary]` via
204    /// `BindingBoundary::pack_tuple(pack_fn, &handles)` when dep[0]
205    /// (primary) has DATA in the wave. If only dep[1] (secondary) fires,
206    /// settles with RESOLVED (D018 pattern). First-run gate holds until
207    /// both deps deliver (R2.5.3 `partial: false`). Post-warmup INVALIDATE
208    /// guard: if secondary `prev_data == NO_HANDLE` and batch empty after
209    /// warmup, settles with RESOLVED (no stale pair).
210    WithLatestFrom { pack_fn: FnId },
211
212    /// `merge(...sources)` — N-dep, forward all DATA handles verbatim
213    /// (D022). Zero FFI on fire: no transformation, no binding call.
214    /// Each dep's batch handles are retained and emitted individually.
215    /// COMPLETE cascades when all deps complete (R1.3.4.b).
216    Merge,
217
218    // ----- Slice C-3: flow operators (D024) -----
219    /// `take(source, count)` — emits the first `count` DATA values then
220    /// self-completes via `Core::complete`. Tracks `count_emitted` in
221    /// [`TakeState`](super::op_state::TakeState). When upstream completes
222    /// before `count` is reached, the standard auto-cascade propagates
223    /// COMPLETE. `count == 0` is allowed: the first fire emits zero
224    /// items then immediately self-completes (D027).
225    Take { count: u32 },
226
227    /// `skip(source, count)` — drops the first `count` DATA values; once
228    /// the threshold is crossed, subsequent DATAs pass through verbatim.
229    /// Tracks `count_skipped` in [`SkipState`](super::op_state::SkipState).
230    /// On a wave where every input is still in the skip window, queues
231    /// DIRTY+RESOLVED to settle (D018 pattern).
232    Skip { count: u32 },
233
234    /// `takeWhile(source, predicate)` — emits while `predicate(input)`
235    /// holds; on the first `false`, emits any preceding passes then
236    /// self-completes via `Core::complete`. Reuses
237    /// [`BindingBoundary::predicate_each`] (D029); after the first
238    /// `false`, subsequent inputs in the same batch are dropped.
239    TakeWhile { fn_id: FnId },
240
241    /// `last(source)` / `last_with_default(source, default)` — buffers
242    /// the latest DATA; on upstream COMPLETE, emits `Data(latest)` then
243    /// `Complete`. The `default` field is `NO_HANDLE` for the no-default
244    /// factory (emits only `Complete` on empty stream), or a registered
245    /// default handle (emits `Data(default)` + `Complete` on empty
246    /// stream). Storage: [`LastState`](super::op_state::LastState) holds
247    /// `latest` (live buffer) and `default` (registration-time, stable).
248    /// Opts out of Lock 2.B auto-cascade so it can intercept upstream
249    /// COMPLETE.
250    Last { default: HandleId },
251}
252
253/// Registration options for [`Core::register_operator`].
254///
255/// `equals` controls operator output dedup (R5.7 — defaults to identity).
256/// `partial` controls the R2.5.3 first-run gate (R5.4 — operator dispatch
257/// fires on first DATA from any dep when `true`; default `false` matches
258/// the gated derived discipline).
259#[derive(Copy, Clone, Debug)]
260pub struct OperatorOpts {
261    pub equals: EqualsMode,
262    pub partial: bool,
263}
264
265impl Default for OperatorOpts {
266    fn default() -> Self {
267        Self {
268            equals: EqualsMode::Identity,
269            partial: false,
270        }
271    }
272}
273
274/// Closure-form fn id OR typed operator discriminant — the two dispatch
275/// paths a node can use. State / passthrough nodes pass `None` to
276/// [`Core::register`] (no fn at all).
277#[derive(Copy, Clone, Debug)]
278pub enum NodeFnOrOp {
279    /// Closure-form: invokes [`BindingBoundary::invoke_fn`] per fire.
280    /// Used for Derived / Dynamic / Producer.
281    Fn(FnId),
282    /// Typed-op: routes to a `fire_op_*` helper that calls per-operator
283    /// FFI methods (`project_each` / `predicate_each` / `fold_each` /
284    /// `pairwise_pack` / `pack_tuple`). Used for Operator nodes.
285    Op(OperatorOp),
286}
287
/// How a node behaves while a pause lock is held (canonical-spec §2.6).
/// TS ships three modes; the Slice F audit (2026-05-07) closed the gap in
/// the Rust port.
///
/// | Mode | Outgoing tier-3 routing while paused | RESUME behavior |
/// |---|---|---|
/// | [`PausableMode::Default`] | suppress fn-fire upstream (no DIRTY emitted) | fire fn ONCE on RESUME if any dep delivered DATA during pause; collapses N pause-window writes into one settle |
/// | [`PausableMode::ResumeAll`] | buffer outgoing tier-3 / tier-4 messages per-wave | replay each buffered wave verbatim on RESUME |
/// | [`PausableMode::Off`] | dispatcher ignores PAUSE; tier-3 flushes immediately | no-op (no buffer to drain) |
///
/// Per canonical §2.6 the default is [`PausableMode::Default`], so every
/// untagged source gets it. It costs O(1) memory per node because nothing
/// is buffered. The price is that subscribers observe a single
/// consolidated DATA on RESUME instead of the K mid-pause emissions
/// verbatim.
///
/// Whatever the mode, tier-1 (DIRTY), tier-2 (PAUSE/RESUME), tier-5
/// (COMPLETE/ERROR) and tier-6 (TEARDOWN) bypass the pause. They stay
/// observable so that a leaked pause-controller cannot strand subscribers.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum PausableMode {
    /// Canonical default. The fn does not fire while paused; on RESUME it
    /// fires once, provided some dep delivered DATA during the pause
    /// window.
    #[default]
    Default,
    /// Outgoing tier-3 / tier-4 messages are buffered per wave and
    /// replayed on RESUME. Pick this when subscribers need the emit
    /// history verbatim (e.g. an audit log, a replay-on-reconnect bridge).
    ResumeAll,
    /// The dispatcher ignores PAUSE for this node: tier-3 flushes
    /// immediately even while a lock is held. Meant for nodes whose value
    /// production is intrinsically pause-immune (telemetry counters,
    /// monotonic timers).
    Off,
}
321
322/// Per-kind opts for [`Core::register`]. Cross-kind config knobs live
323/// here; per-kind specifics (deps, fn_or_op) live on
324/// [`NodeRegistration`].
325#[derive(Copy, Clone, Debug)]
326pub struct NodeOpts {
327    /// Initial cached value. Only valid for state nodes (no deps + no
328    /// fn + no op). [`NO_HANDLE`] starts the node sentinel.
329    pub initial: HandleId,
330    /// Equality mode for outgoing emissions (R1.3.2). Defaults to
331    /// [`EqualsMode::Identity`].
332    pub equals: EqualsMode,
333    /// First-run gate (R2.5.3 / D011). When `true`, the node fires as
334    /// soon as ANY dep delivers a real handle; when `false` (default),
335    /// the node holds until every dep has delivered.
336    pub partial: bool,
337    /// Dynamic flag (R2.5.3) — fn declares actually-tracked dep indices
338    /// per fire. Only meaningful when `fn_or_op == Some(Fn(_))` AND
339    /// deps non-empty.
340    pub is_dynamic: bool,
341    /// Pause behavior mode (canonical §2.6). Default is
342    /// [`PausableMode::Default`]. See [`PausableMode`] for the trade-offs.
343    pub pausable: PausableMode,
344    /// Replay buffer cap (canonical R2.6.5 / Lock 6.G — Slice E1, 2026-05-07).
345    /// `None` (default) disables; `Some(N)` keeps a circular buffer of the
346    /// last N DATA emissions and replays them to late subscribers as part
347    /// of the per-tier handshake (between [`Message::Start`] and any
348    /// terminal slice). Only DATA is buffered; RESOLVED entries are NOT
349    /// (R2.6.5 explicit "DATA only").
350    pub replay_buffer: Option<usize>,
351}
352
353impl Default for NodeOpts {
354    fn default() -> Self {
355        Self {
356            initial: NO_HANDLE,
357            equals: EqualsMode::Identity,
358            partial: false,
359            is_dynamic: false,
360            pausable: PausableMode::Default,
361            replay_buffer: None,
362        }
363    }
364}
365
366/// Unified node-registration descriptor (D030, Slice D).
367///
368/// All node kinds (State / Producer / Derived / Dynamic / Operator)
369/// register through [`Core::register`] with a `NodeRegistration`. The
370/// kind is **derived from the field shape** of the registration —
371/// `(deps.is_empty(), fn_or_op variant)`:
372///
373/// | deps      | fn_or_op   | is_dynamic | resulting kind |
374/// |-----------|-----------|-----------|----------------|
375/// | empty     | None      | -         | State          |
376/// | empty     | Some(Fn)  | -         | Producer       |
377/// | non-empty | Some(Fn)  | false     | Derived        |
378/// | non-empty | Some(Fn)  | true      | Dynamic        |
379/// | non-empty | Some(Op)  | -         | Operator       |
380///
381/// The sugar wrappers ([`Core::register_state`], [`Core::register_producer`],
382/// etc.) build a `NodeRegistration` and delegate.
383#[derive(Clone, Debug)]
384pub struct NodeRegistration {
385    /// Upstream deps in declaration order. Empty for state / producer.
386    pub deps: Vec<NodeId>,
387    /// Closure-form fn id or typed-op discriminant. `None` for state /
388    /// passthrough.
389    pub fn_or_op: Option<NodeFnOrOp>,
390    /// Cross-kind config knobs.
391    pub opts: NodeOpts,
392}
393
394/// Equality mode for a node's outgoing emissions.
395///
396/// `Identity` is the default: cache vs. new handle compare is a `u64` equal —
397/// zero FFI. `Custom` invokes [`BindingBoundary::custom_equals`] every check
398/// (R1.3.2.b two-arg call when both sides are non-sentinel).
399#[derive(Copy, Clone, Debug)]
400pub enum EqualsMode {
401    Identity,
402    Custom(FnId),
403}
404
/// Internal identifier of one subscription.
///
/// A fresh id is allocated for every [`Core::subscribe`] call. The public
/// API only ever sees it wrapped inside a [`Subscription`]; the raw id is
/// used directly by Core internals and by the `Drop` impl of
/// [`Subscription`], nowhere else.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub(crate) struct SubscriptionId(u64);
411
412/// RAII subscription handle.
413///
414/// Returned by [`Core::subscribe`]. While the handle is held, the sink stays
415/// registered against its node. Dropping the handle (explicitly via
416/// `drop(sub)` or implicitly at scope exit) unsubscribes the sink — no manual
417/// `unsubscribe()` call is needed. Per §10.12 of the rust-port session doc.
418///
419/// # Lifetime semantics
420///
421/// The subscription holds a [`Weak`] reference back to the Core's state. If
422/// the Core is dropped before the subscription, the Drop impl is a silent
423/// no-op (the sink has nowhere to deregister from anyway). This avoids a
424/// reference cycle when subscribers capture an `Arc<Core>` in their closure.
425///
426/// # Thread safety
427///
428/// `Send + Sync`. The handle can be moved across threads or dropped from
429/// any thread.
430///
431/// # Not Clone
432///
433/// `Subscription` owns the unsubscribe action exclusively. Cloning would
434/// require either "first drop wins" or "last drop wins" semantics, both
435/// of which surprise. If a binding needs multiple deregistration handles,
436/// it should subscribe multiple times (each producing a fresh handle) or
437/// wrap the single `Subscription` in `Arc<Mutex<Option<Subscription>>>`.
438#[must_use = "dropping a Subscription unsubscribes its sink immediately"]
439pub struct Subscription {
440    state: Weak<Mutex<CoreState>>,
441    node_id: NodeId,
442    sub_id: SubscriptionId,
443}
444
445impl Subscription {
446    /// The node this subscription is attached to.
447    #[must_use]
448    pub fn node_id(&self) -> NodeId {
449        self.node_id
450    }
451}
452
453impl Drop for Subscription {
454    fn drop(&mut self) {
455        // Silent no-op if Core is gone. This keeps Drop infallible (no panics
456        // from a dropped subscription racing a dropped Core) and avoids
457        // surprising users with errors on shutdown.
458        //
459        // Producer deactivation (Slice D, D031): if removing this sub
460        // empties the subscribers map AND the node is a producer, fire
461        // `BindingBoundary::producer_deactivate(node_id)` AFTER releasing
462        // the state lock. The binding then drops its per-node state
463        // (subscriptions to upstream sources, captured closure state),
464        // which transitively unsubs from upstreams via their own
465        // `Subscription::Drop`. Re-entrance into Core from the deactivate
466        // hook is permitted since the lock is released first.
467        let Some(state) = self.state.upgrade() else {
468            return;
469        };
470        // Slice E2 (D056): when the last subscriber drops, fire the
471        // node's OnDeactivation cleanup hook BEFORE producer_deactivate
472        // (cleanup may release handles the producer subscription owns;
473        // reverse order would let producer_deactivate drop subs that user
474        // cleanup expected to be live). Both calls are lock-released per
475        // D045.
476        //
477        // OnDeactivation gating (D068, QA Q3 fix): fires only when the
478        // node has fired its fn at least once AND has a fn (`fn_id`
479        // populated). State nodes have no fn — they cannot register a
480        // cleanup spec via the production fn-return path (R2.4.5), so
481        // firing `cleanup_for` on them is wasted FFI; the binding's
482        // lookup is guaranteed to find no `current_cleanup`. Skipping
483        // here saves the FFI hop and matches the design-doc wording
484        // ("never-fired state nodes" — state-with-initial-value satisfies
485        // `has_fired_once = true` but still has no fn).
486        //
487        // Slice E2 /qa Q2(b) (D069): if the node is a resubscribable
488        // node that's ALREADY terminal (terminate fired BEFORE this last
489        // sub drop), fire `wipe_ctx` lock-released AFTER OnDeactivation
490        // + producer_deactivate. Mutually exclusive with `terminate_node`'s
491        // queue-wipe site: terminate-with-empty-subs goes through
492        // `pending_wipes`; terminate-with-live-subs routes here when
493        // those subs eventually drop. Either path fires exactly one
494        // wipe per terminal lifecycle.
495        let (was_last_sub, is_producer, has_user_cleanup, fire_wipe, binding) = {
496            let mut s = state.lock();
497            let Some(rec) = s.nodes.get_mut(&self.node_id) else {
498                return;
499            };
500            rec.subscribers.remove(&self.sub_id);
501            let last = rec.subscribers.is_empty();
502            let producer = rec.is_producer();
503            // OnDeactivation gate: must have run a fn at least once
504            // (has_fired_once) AND have a fn registered (fn_id.is_some()).
505            // The fn_id check excludes state nodes whose has_fired_once
506            // tracks initial-value status, not "user fn ran."
507            let user_cleanup = rec.has_fired_once && rec.fn_id.is_some();
508            let fire_wipe = last && rec.resubscribable && rec.terminal.is_some();
509            // Clone the binding Arc out only if at least one hook will
510            // fire. Cheap (Arc::clone) in the common path; skipped on
511            // non-last-sub or never-fired non-producer nodes.
512            let binding = if last && (producer || user_cleanup || fire_wipe) {
513                Some(s.binding.clone())
514            } else {
515                None
516            };
517            (last, producer, user_cleanup, fire_wipe, binding)
518        };
519        if was_last_sub {
520            if let Some(binding) = binding {
521                if has_user_cleanup {
522                    binding.cleanup_for(self.node_id, CleanupTrigger::OnDeactivation);
523                }
524                if is_producer {
525                    binding.producer_deactivate(self.node_id);
526                }
527                // D069: eager wipe — fires AFTER OnDeactivation so the
528                // user closure observes pre-wipe `store` (matches the
529                // existing "OnDeactivation runs before wipe on terminal
530                // reset" invariant covered by test 10). Idempotent —
531                // `HashMap::remove` on absent key is a no-op, so even
532                // if the wave already drained `pending_wipes` earlier,
533                // this fire is benign.
534                if fire_wipe {
535                    binding.wipe_ctx(self.node_id);
536                }
537            }
538        }
539    }
540}
541
542// Compile-time assertion that Subscription is Send + Sync. If a future field
543// breaks this, the build fails here rather than downstream at the binding
544// site.
545const _: fn() = || {
546    fn assert_send_sync<T: Send + Sync>() {}
547    assert_send_sync::<Subscription>();
548};
549
550/// A subscriber callback. `Send + Sync` so the Core can fire it from any
551/// thread; `Fn` (not `FnMut`) so multiple references coexist — capture
552/// mutable state in `Mutex<T>` or atomics on the binding side.
553pub type Sink = Arc<dyn Fn(&[Message]) + Send + Sync>;
554
555// ---------------------------------------------------------------------------
556// PAUSE/RESUME state — §10.2 of the rust-port session doc
557// ---------------------------------------------------------------------------
558
559/// Per-node pause state.
560///
561/// Replaces the four TS fields (`_pauseLocks`, `_pauseBuffer`,
562/// `_pauseDroppedCount`, `_pauseStartNs`) with a single enum where
563/// the buffered fields are unreachable in the [`Self::Active`] variant —
564/// the compiler refuses access. Per §10.2 simplification.
565///
566/// # Invariants
567///
568/// - `Active` ⇔ no lockId held.
569/// - `Paused { locks, .. }` ⇔ `!locks.is_empty()`.
570/// - Buffered messages are tier 3 (DATA/RESOLVED) and tier 4 (INVALIDATE)
571///   only. Other tiers pass through immediately even while paused.
572/// - `dropped` counts messages that fell out the front of `buffer` due to
573///   the Core-global `pause_buffer_cap`; it is reported on resume so callers
574///   can detect overflow without re-tracking it externally.
575#[derive(Debug)]
576pub(crate) enum PauseState {
577    Active,
578    Paused {
579        /// Active lock holders. `SmallVec` keeps the common 1–2 lock case
580        /// stack-allocated. Replaces `Set<unknown>` from TS.
581        locks: SmallVec<[LockId; 2]>,
582        /// Buffered tier-3/tier-4 outgoing messages, in arrival order.
583        /// Replayed on the final RESUME.
584        buffer: VecDeque<Message>,
585        /// Count of messages dropped from the front when `buffer.len()` would
586        /// exceed `pause_buffer_cap`. Cleared on final RESUME (next pause
587        /// cycle starts fresh).
588        dropped: u32,
589        /// Wall-clock-monotonic ns when the lock first transitioned this node
590        /// from `Active` to `Paused`. Used by R1.3.8.c overflow ERROR
591        /// synthesis to compute `lock_held_duration_ms` in the diagnostic
592        /// payload (Slice F, A3 — 2026-05-07).
593        started_at_ns: u64,
594        /// True after the first overflow event in this pause cycle has been
595        /// reported via [`crate::boundary::BindingBoundary::synthesize_pause_overflow_error`].
596        /// Subsequent overflows in the same cycle don't re-emit ERROR
597        /// (canonical R1.3.8.c: "once per overflow event"). Cleared on
598        /// final RESUME (next pause cycle starts fresh).
599        overflow_reported: bool,
600        /// Default-mode bookkeeping (Slice F audit close, 2026-05-07).
601        /// Set to `true` when an upstream dep delivery arrives while this
602        /// node is paused with [`PausableMode::Default`]. On final RESUME,
603        /// if `true`, the node is added back to `pending_fires` so the fn
604        /// fires once with the consolidated dep state. Always `false` for
605        /// `ResumeAll` mode (the buffered messages are the consolidation
606        /// mechanism there). Cleared on final RESUME.
607        pending_wave: bool,
608    },
609}
610
611impl PauseState {
612    pub(crate) fn is_paused(&self) -> bool {
613        matches!(self, Self::Paused { .. })
614    }
615
616    fn lock_count(&self) -> usize {
617        match self {
618            Self::Active => 0,
619            Self::Paused { locks, .. } => locks.len(),
620        }
621    }
622
623    fn contains_lock(&self, lock_id: LockId) -> bool {
624        match self {
625            Self::Active => false,
626            Self::Paused { locks, .. } => locks.contains(&lock_id),
627        }
628    }
629
630    /// Add a lock; transitions Active → Paused on first lock. Idempotent on
631    /// duplicate lock_id (matches TS convention; spec is silent on the case).
632    fn add_lock(&mut self, lock_id: LockId) {
633        match self {
634            Self::Active => {
635                let mut locks = SmallVec::new();
636                locks.push(lock_id);
637                *self = Self::Paused {
638                    locks,
639                    buffer: VecDeque::new(),
640                    dropped: 0,
641                    started_at_ns: monotonic_ns(),
642                    overflow_reported: false,
643                    pending_wave: false,
644                };
645            }
646            Self::Paused { locks, .. } => {
647                if !locks.contains(&lock_id) {
648                    locks.push(lock_id);
649                }
650            }
651        }
652    }
653
654    /// Mark that an upstream dep delivered DATA to a node paused with
655    /// [`PausableMode::Default`]. The node will re-enter `pending_fires`
656    /// on final RESUME via [`Self::take_pending_wave`].
657    pub(crate) fn mark_pending_wave(&mut self) {
658        if let Self::Paused { pending_wave, .. } = self {
659            *pending_wave = true;
660        }
661    }
662
663    /// Read and clear the `pending_wave` flag. Called from
664    /// [`Core::resume`] when transitioning Paused → Active. Returns `true`
665    /// only if the node was paused with `pending_wave` set.
666    pub(crate) fn take_pending_wave(&mut self) -> bool {
667        if let Self::Paused { pending_wave, .. } = self {
668            std::mem::replace(pending_wave, false)
669        } else {
670            false
671        }
672    }
673
674    /// Remove a lock; if the lockset becomes empty, transition Paused →
675    /// Active and return the buffered messages for replay (along with the
676    /// dropped count for diagnostics). Unknown lock_id is an idempotent
677    /// no-op (matches TS, R1.2.6 implicit).
678    fn remove_lock(&mut self, lock_id: LockId) -> Option<(VecDeque<Message>, u32)> {
679        match self {
680            Self::Active => None,
681            Self::Paused { locks, .. } => {
682                if let Some(idx) = locks.iter().position(|l| *l == lock_id) {
683                    locks.swap_remove(idx);
684                }
685                if locks.is_empty() {
686                    let prev = std::mem::replace(self, Self::Active);
687                    if let Self::Paused {
688                        buffer, dropped, ..
689                    } = prev
690                    {
691                        return Some((buffer, dropped));
692                    }
693                }
694                None
695            }
696        }
697    }
698
699    /// Append a message to the buffer; if the buffer would exceed `cap`,
700    /// pop from the front (oldest-first), increment `dropped`, and return
701    /// the dropped messages so the caller can release any payload handles
702    /// they reference. `cap` of `None` means unbounded.
703    ///
704    /// Returns [`PushBufferedResult`] carrying both the dropped messages
705    /// (for refcount release) and whether this push triggered the FIRST
706    /// overflow event in the current pause cycle (for R1.3.8.c ERROR
707    /// synthesis — the caller schedules a single ERROR per cycle).
708    ///
709    /// Note: refcount management for the message's payload handle is the
710    /// caller's responsibility — see [`Core::queue_notify`] for the
711    /// retain/release discipline. The buffer itself is just a message
712    /// container; refcounts cross the binding boundary.
713    pub(crate) fn push_buffered(&mut self, msg: Message, cap: Option<usize>) -> PushBufferedResult {
714        let mut result = PushBufferedResult::default();
715        if let Self::Paused {
716            buffer,
717            dropped,
718            overflow_reported,
719            ..
720        } = self
721        {
722            buffer.push_back(msg);
723            if let Some(c) = cap {
724                while buffer.len() > c {
725                    if let Some(dropped_msg) = buffer.pop_front() {
726                        result.dropped_msgs.push(dropped_msg);
727                    }
728                    *dropped = dropped.saturating_add(1);
729                }
730            }
731            // R1.3.8.c (Slice F, A3): flag first overflow this cycle.
732            if !result.dropped_msgs.is_empty() && !*overflow_reported {
733                *overflow_reported = true;
734                result.first_overflow_this_cycle = true;
735            }
736        }
737        result
738    }
739
740    /// Snapshot the diagnostic for an R1.3.8.c overflow ERROR synthesis.
741    /// Returns `(dropped_count, lock_held_ns)`. Caller must already know
742    /// the configured cap (it's a Core-global value, not per-PauseState).
743    pub(crate) fn overflow_diagnostic(&self) -> Option<(u32, u64)> {
744        match self {
745            Self::Active => None,
746            Self::Paused {
747                dropped,
748                started_at_ns,
749                ..
750            } => {
751                let lock_held_ns = monotonic_ns().saturating_sub(*started_at_ns);
752                Some((*dropped, lock_held_ns))
753            }
754        }
755    }
756}
757
758/// Return shape for [`PauseState::push_buffered`]. Carries both the dropped
759/// messages (for refcount release) and an "is this the first overflow this
760/// cycle" flag (for R1.3.8.c ERROR synthesis scheduling).
761#[derive(Default)]
762pub(crate) struct PushBufferedResult {
763    pub(crate) dropped_msgs: Vec<Message>,
764    pub(crate) first_overflow_this_cycle: bool,
765}
766
767/// Pending R1.3.8.c overflow ERROR synthesis entry. Recorded by
768/// [`Core::queue_notify`] when the pause buffer first overflows in a cycle;
769/// drained at wave-end after the lock-released call to
770/// [`crate::boundary::BindingBoundary::synthesize_pause_overflow_error`].
771///
772/// `configured_max` is captured at scheduling time rather than read at
773/// drain — the user could change `pause_buffer_cap` between schedule and
774/// drain, and the diagnostic reads "the cap that was in effect when the
775/// overflow happened."
776#[derive(Debug, Clone)]
777pub(crate) struct PendingPauseOverflow {
778    pub(crate) node_id: NodeId,
779    pub(crate) dropped_count: u32,
780    pub(crate) configured_max: usize,
781    pub(crate) lock_held_ns: u64,
782}
783
784/// Errors returnable by [`Core::pause`] and [`Core::resume`].
785#[derive(Error, Debug, Clone, PartialEq)]
786pub enum PauseError {
787    #[error("pause/resume: unknown node {0:?}")]
788    UnknownNode(NodeId),
789}
790
791/// Errors returnable by [`Core::up`] (canonical R1.4.1).
792#[derive(Error, Debug, Clone, PartialEq)]
793pub enum UpError {
794    /// Node id is not registered.
795    #[error("up: unknown node {0:?}")]
796    UnknownNode(NodeId),
797    /// Tier-3 (DATA / RESOLVED) and tier-5 (COMPLETE / ERROR) are
798    /// downstream-only per R1.4.1; rejected at the boundary.
799    #[error(
800        "up: tier {tier} is forbidden upstream — value (tier 3) and \
801         terminal-lifecycle (tier 5) planes are downstream-only per R1.4.1"
802    )]
803    TierForbidden { tier: u8 },
804}
805
806/// Errors returnable by [`Core::register`] and its sugar wrappers
807/// ([`Core::register_state`], [`Core::register_producer`],
808/// [`Core::register_derived`], [`Core::register_dynamic`],
809/// [`Core::register_operator`]).
810///
811/// Slice H (2026-05-07) promoted these from `assert!`/`panic!` to typed
812/// errors so that callers can recover from contract violations without
813/// process abort. Every variant corresponds to a construction-time
814/// invariant that the caller is responsible for upholding; the dispatcher
815/// rejects the registration before any reactive state is created (so
816/// there is no `Message::Error` channel through which to surface the
817/// failure — these are imperative-layer errors, not reactive ones).
818///
819/// All variants are zero-side-effect: when [`Core::register`] returns
820/// `Err`, no node has been added to the graph and any handle retains
821/// taken on the way in (e.g. operator scratch seed retains via
822/// [`BindingBoundary::retain_handle`]) have been released.
823#[derive(Error, Debug, Clone, PartialEq, Eq)]
824pub enum RegisterError {
825    /// One of the supplied dep ids is not a registered node.
826    #[error("register: unknown dep {0:?}")]
827    UnknownDep(NodeId),
828
829    /// `op` was supplied (operator node) but `deps` was empty. Operator
830    /// nodes need at least one dep — for subscription-managed combinators
831    /// with no declared deps, use [`Core::register_producer`] instead.
832    #[error(
833        "register: operator nodes require at least one dep — \
834         use register_producer for subscription-managed combinators"
835    )]
836    OperatorWithoutDeps,
837
838    /// [`NodeOpts::initial`] was set to a real handle but the registration
839    /// shape is not a state node (state nodes are `deps.is_empty() &&
840    /// fn_id.is_none() && op.is_none()`). Initial cache only makes sense
841    /// for state nodes.
842    #[error("register: NodeOpts::initial only valid for state nodes (no deps + no fn + no op)")]
843    InitialOnlyForStateNodes,
844
845    /// A supplied dep is terminal (COMPLETE / ERROR) AND not
846    /// resubscribable. Adding it would create a permanent wedge — the dep
847    /// will never re-emit, so the registered node would be stuck.
848    /// Mirrors [`SetDepsError::TerminalDep`] at registration time.
849    #[error(
850        "register: dep {0:?} is terminal and not resubscribable; \
851         mark it resubscribable before terminating, or remove it from the dep list"
852    )]
853    TerminalDep(NodeId),
854
855    /// A stateful operator ([`OperatorOp::Scan`] / [`OperatorOp::Reduce`])
856    /// was registered with `seed = NO_HANDLE`. R2.5.3 first-run gate
857    /// requires the seed to be a real handle so that the operator can
858    /// emit on its first fire.
859    #[error("register: operator seed must be a real handle (R2.5.3); got NO_HANDLE")]
860    OperatorSeedSentinel,
861}
862
863/// Errors returnable by [`Core::set_pausable_mode`].
864///
865/// Slice H (2026-05-07) promoted these from `assert!`/`panic!` to typed
866/// errors. Same imperative-layer error model as [`RegisterError`].
867#[derive(Error, Debug, Clone, PartialEq, Eq)]
868pub enum SetPausableModeError {
869    /// `node_id` is not a registered node.
870    #[error("set_pausable_mode: unknown node {0:?}")]
871    UnknownNode(NodeId),
872    /// The node currently holds at least one pause lock. Changing pausable
873    /// mode mid-pause would lose buffered content or strand a
874    /// `pending_wave` flag — resume all locks first.
875    #[error(
876        "set_pausable_mode: cannot change pausable mode while paused; \
877         resume all locks first"
878    )]
879    WhilePaused,
880}
881
882/// Per-dep record. Replaces the parallel `deps` / `dep_handles` /
883/// `dep_terminals` vectors from v1. Canonical spec R2.9.b alignment.
884///
885/// Each entry tracks one dep's lifecycle state, wave-scoped batch data,
886/// and cross-wave `prev_data` for `ctx.prevData` access.
887pub(crate) struct DepRecord {
888    /// The dep node this record tracks.
889    pub(crate) node: NodeId,
890    /// Last DATA handle from the end of the previous wave. [`NO_HANDLE`]
891    /// means the dep has never emitted DATA.
892    pub(crate) prev_data: HandleId,
893    /// Per-dep dirty flag — awaiting DATA/RESOLVED for current wave.
894    pub(crate) dirty: bool,
895    /// Per-dep involved-this-wave flag. Distinguishes:
896    /// - `involved && data_batch.is_empty()` → dep settled RESOLVED
897    /// - `!involved && data_batch.is_empty()` → dep was not in this wave
898    pub(crate) involved_this_wave: bool,
899    /// DATA handles accumulated this wave. Outside `batch()` scope, at most
900    /// 1 element. Inside `batch()`, K emits on the source produce K entries
901    /// per R1.3.6.b coalescing. Each handle holds a `retain_handle` share
902    /// taken at `deliver_data_to_consumer` time; released at wave-end
903    /// rotation in `clear_wave_state`.
904    pub(crate) data_batch: SmallVec<[HandleId; 1]>,
905    /// Terminal state for this dep. `None` = dep is live.
906    /// `Some` = dep emitted COMPLETE/ERROR. When ALL entries are Some,
907    /// the node auto-cascades per Lock 2.B (ERROR dominates COMPLETE).
908    pub(crate) terminal: Option<TerminalKind>,
909}
910
911impl DepRecord {
912    fn new(node: NodeId) -> Self {
913        Self {
914            node,
915            prev_data: NO_HANDLE,
916            dirty: false,
917            involved_this_wave: false,
918            data_batch: SmallVec::new(),
919            terminal: None,
920        }
921    }
922}
923
924/// Internal node record. Mirrors `core.ts:132–154` post-D030 unification.
925///
926/// **Kind is derived, not stored** (D030, Slice D). `(dep_records.is_empty(),
927/// fn_id, op, is_dynamic)` uniquely identifies the kind — see [`NodeKind`].
928/// Helper methods (`is_state()`, `is_producer()`, `is_compute()`,
929/// `is_operator()`, `skips_auto_cascade()`, `kind()`) cover the common
930/// predicates without unpacking via [`Core::kind_of`].
931///
932/// The 5 bool fields (`has_fired_once`, `dirty`, `involved_this_wave`,
933/// `has_received_teardown`, `resubscribable`, `is_dynamic`) each represent
934/// an orthogonal concern. `is_dynamic` is constant per node (set at
935/// register time); the others are mutable lifecycle state. Collapsing
936/// them into a bitfield would obscure intent.
937#[allow(clippy::struct_excessive_bools)]
938pub(crate) struct NodeRecord {
939    /// Per-dep records. Replaces the old parallel `deps` / `dep_handles` /
940    /// `dep_terminals` vecs. Dep NodeIds derived via `dep_ids()`.
941    pub(crate) dep_records: Vec<DepRecord>,
942    /// User-fn id for closure-form dispatch. `Some` for Derived / Dynamic /
943    /// Producer; `None` for State / Operator. (Operator dispatch goes via
944    /// [`Self::op`] instead.)
945    pub(crate) fn_id: Option<FnId>,
946    /// Operator discriminant for typed-op dispatch. `Some` for Operator
947    /// nodes; `None` otherwise. Mutually exclusive with `fn_id` (a node is
948    /// either closure-form OR typed-op, never both).
949    pub(crate) op: Option<OperatorOp>,
950    /// True for Dynamic nodes (R2.5.3 — fn declares actually-tracked dep
951    /// indices per fire). False for everything else. Only meaningful when
952    /// `fn_id.is_some()` AND `!dep_records.is_empty()`.
953    pub(crate) is_dynamic: bool,
954    pub(crate) equals: EqualsMode,
955
956    // Mutable state
957    pub(crate) cache: HandleId,
958    pub(crate) has_fired_once: bool,
959    pub(crate) subscribers: HashMap<SubscriptionId, Sink>,
960    /// For dynamic nodes: which dep indices fn actually tracks.
961    /// For static derived: all indices, populated at construction.
962    pub(crate) tracked: HashSet<usize>,
963
964    // Wave-scoped state — cleared at wave end.
965    pub(crate) dirty: bool,
966    pub(crate) involved_this_wave: bool,
967
968    /// Per-node pause state. Default `Active`. See [`PauseState`].
969    pub(crate) pause_state: PauseState,
970    /// Pause behavior mode (canonical-spec §2.6). Set at registration via
971    /// [`NodeOpts::pausable`]. Default [`PausableMode::Default`] suppresses
972    /// fn-fire while paused and consolidates N pause-window dep deliveries
973    /// into one fn-fire on RESUME; `ResumeAll` buffers tier-3/4 outgoing
974    /// for verbatim replay; `Off` ignores PAUSE entirely. See
975    /// [`PausableMode`].
976    pub(crate) pausable: PausableMode,
977    /// Replay buffer cap (R2.6.5 / Lock 6.G — Slice E1, 2026-05-07).
978    /// `None` disables; `Some(N)` keeps a circular VecDeque of the last N
979    /// DATA-handle emissions for late-subscriber replay. Each handle in
980    /// the buffer owns one binding-side retain share, released on evict
981    /// (cap exceeded) or in `Drop for CoreState`.
982    pub(crate) replay_buffer_cap: Option<usize>,
983    pub(crate) replay_buffer: VecDeque<HandleId>,
984
985    /// Terminal lifecycle state for THIS node's outgoing stream. Once set,
986    /// further `emit` calls are silent no-ops, fn no longer fires, and only
987    /// the terminal message has been queued downstream.
988    pub(crate) terminal: Option<TerminalKind>,
989    /// True after the first TEARDOWN has been processed for this node
990    /// (R2.6.4 / Lock 6.F). Subsequent TEARDOWN deliveries are idempotent
991    /// — the auto-prepended COMPLETE only fires on the first one. Without
992    /// this flag, a redundant TEARDOWN delivered via the cascade plus an
993    /// explicit `core.teardown(node)` would re-emit `[COMPLETE, TEARDOWN]`
994    /// to subscribers per delivery, which is incorrect.
995    pub(crate) has_received_teardown: bool,
996    /// Per R2.2.7 / R2.5.3 — resubscribable terminal lifecycle.
997    /// When `true` AND `terminal == Some(...)`, a fresh subscribe call
998    /// will reset the node: clear `terminal`, `has_fired_once`,
999    /// `has_received_teardown`, all dep_records to sentinel, and drain the
1000    /// pause lockset. Default `false`.
1001    pub(crate) resubscribable: bool,
1002    /// Meta companion nodes attached to this node per R1.3.9.d. When this
1003    /// node tears down, its meta companions are torn down FIRST (before
1004    /// the main node's auto-COMPLETE + TEARDOWN wire emission), so
1005    /// observers see companions terminate before the parent. The ordering
1006    /// is load-bearing — meta nodes typically subscribe to parent state
1007    /// that becomes inconsistent during the parent's destruction phase.
1008    pub(crate) meta_companions: Vec<NodeId>,
1009    /// R5.4 / D011 partial-mode: when `true`, fire_fn skips the R2.5.3
1010    /// first-run gate — the node fires as soon as ANY dep delivers a
1011    /// real handle, even if other deps remain sentinel. Defaults to
1012    /// `false` (gated). Lifted into Core for operator support; for
1013    /// State/Derived/Dynamic nodes the field is settable but the gated
1014    /// path remains the typical caller default.
1015    pub(crate) partial: bool,
1016    /// Generic per-operator scratch slot (Slice C-3, D026). Replaces
1017    /// the typed `operator_state: HandleId` field used by Slices C-1 / C-2.
1018    /// `None` for non-operator kinds and operators with no cross-wave
1019    /// state (Map / Filter / Combine / WithLatestFrom / Merge); `Some`
1020    /// for stateful operators ([`OperatorOp::Scan`] / [`Reduce`] /
1021    /// [`DistinctUntilChanged`] / [`Pairwise`] / [`Take`] / [`Skip`] /
1022    /// [`TakeWhile`] / [`Last`]).
1023    ///
1024    /// The boxed value implements
1025    /// [`OperatorScratch`](crate::op_state::OperatorScratch); its
1026    /// `release_handles` method is called from
1027    /// [`reset_for_fresh_lifecycle`] (resubscribable terminal cycle) and
1028    /// from [`Drop for CoreState`].
1029    ///
1030    /// **Refcount discipline:** the state struct owns whatever handle
1031    /// shares it stores (e.g., [`ScanState::acc`](crate::op_state::ScanState::acc),
1032    /// [`LastState::latest`](crate::op_state::LastState::latest)).
1033    /// Per-fire helpers retain the new value before releasing the old;
1034    /// `release_handles` releases the current shares at end-of-life.
1035    pub(crate) op_scratch: Option<Box<dyn crate::op_state::OperatorScratch>>,
1036}
1037
1038impl NodeRecord {
1039    // ---- Kind predicates (D030 — derived from field shape) ----
1040
1041    /// True iff this is a state node (no deps, no fn, no op).
1042    pub(crate) fn is_state(&self) -> bool {
1043        self.dep_records.is_empty() && self.fn_id.is_none() && self.op.is_none()
1044    }
1045
1046    /// True iff this is a producer node (no deps + has fn + no op).
1047    /// Producers fire fn once on first subscribe; cleanup fires via
1048    /// [`BindingBoundary::producer_deactivate`] (D031, Slice D).
1049    pub(crate) fn is_producer(&self) -> bool {
1050        self.dep_records.is_empty() && self.fn_id.is_some() && self.op.is_none()
1051    }
1052
1053    /// True iff this is a compute node (Derived / Dynamic / Operator) —
1054    /// has at least one dep AND either a fn or an op.
1055    #[allow(dead_code)] // Convenience predicate; callers may use is_state/is_producer instead.
1056    pub(crate) fn is_compute(&self) -> bool {
1057        !self.dep_records.is_empty() && (self.fn_id.is_some() || self.op.is_some())
1058    }
1059
1060    /// True iff this is an Operator node (has op set).
1061    #[allow(dead_code)] // Direct `op.is_some()` is more common; this is a readability sugar.
1062    pub(crate) fn is_operator(&self) -> bool {
1063        self.op.is_some()
1064    }
1065
1066    /// True iff this node opts OUT of Lock 2.B auto-cascade —
1067    /// Operator(Reduce) / Operator(Last) intercept upstream COMPLETE.
1068    pub(crate) fn skips_auto_cascade(&self) -> bool {
1069        match self.op {
1070            Some(op) => NodeKind::Operator(op).skips_auto_cascade(),
1071            None => false,
1072        }
1073    }
1074
1075    /// Compute the public-API [`NodeKind`] from the field shape (D030).
1076    /// Used by [`Core::kind_of`] and rare internal sites that need the
1077    /// enum (most use the predicate methods above).
1078    pub(crate) fn kind(&self) -> NodeKind {
1079        if let Some(op) = self.op {
1080            NodeKind::Operator(op)
1081        } else if self.dep_records.is_empty() {
1082            if self.fn_id.is_some() {
1083                NodeKind::Producer
1084            } else {
1085                NodeKind::State
1086            }
1087        } else if self.is_dynamic {
1088            NodeKind::Dynamic
1089        } else {
1090            NodeKind::Derived
1091        }
1092    }
1093
1094    // ---- Existing accessors ----
1095
1096    /// Iterator over dep NodeIds in declaration order.
1097    pub(crate) fn dep_ids(&self) -> impl Iterator<Item = NodeId> + '_ {
1098        self.dep_records.iter().map(|r| r.node)
1099    }
1100
1101    /// Collected dep NodeIds — for call sites that need a `Vec<NodeId>`.
1102    pub(crate) fn dep_ids_vec(&self) -> Vec<NodeId> {
1103        self.dep_ids().collect()
1104    }
1105
1106    /// Number of deps.
1107    pub(crate) fn dep_count(&self) -> usize {
1108        self.dep_records.len()
1109    }
1110
1111    /// True if any dep is in sentinel state (never emitted DATA and no
1112    /// data this wave). Replaces the old `dep_handles.contains(&NO_HANDLE)`.
1113    pub(crate) fn has_sentinel_deps(&self) -> bool {
1114        self.dep_records
1115            .iter()
1116            .any(|r| r.prev_data == NO_HANDLE && r.data_batch.is_empty())
1117    }
1118
1119    /// Find the index of a dep by NodeId.
1120    pub(crate) fn dep_index_of(&self, dep_id: NodeId) -> Option<usize> {
1121        self.dep_records.iter().position(|r| r.node == dep_id)
1122    }
1123
1124    /// True if ALL dep terminal slots are populated (Lock 2.B cascade check).
1125    pub(crate) fn all_deps_terminal(&self) -> bool {
1126        !self.dep_records.is_empty() && self.dep_records.iter().all(|r| r.terminal.is_some())
1127    }
1128}
1129
1130/// All mutable Core state, behind one [`parking_lot::Mutex`].
1131///
1132/// v1 single-mutex; per-subgraph `ReentrantMutex` parallelism is a later
1133/// optimization (CLAUDE.md Rust invariant 3).
1134pub(crate) struct CoreState {
1135    pub(crate) next_node_id: u64,
1136    pub(crate) next_subscription_id: u64,
1137    pub(crate) next_lock_id: u64,
1138    pub(crate) nodes: HashMap<NodeId, NodeRecord>,
1139    /// Inverted adjacency: `parent → children`. Updated on registration.
1140    pub(crate) children: HashMap<NodeId, HashSet<NodeId>>,
1141    /// Nodes whose fn we owe a fire to — drained by [`Core::run_wave`].
1142    pub(crate) pending_fires: HashSet<NodeId>,
1143    /// Per-node outgoing message buffer; flushed at wave end. Insertion-
1144    /// ordered so flush order is deterministic — load-bearing for
1145    /// R1.3.9.d meta-TEARDOWN ordering: when a parent and its meta
1146    /// companion both have queued messages in the same wave, the meta
1147    /// (queued first via `teardown_inner`'s recursion order) flushes
1148    /// first.
1149    ///
1150    /// Each entry carries the per-wave subscriber snapshot taken at first
1151    /// touch (Slice A close, M1: lock-released drain). Late subscribers
1152    /// installed mid-wave between fn-fire iterations don't appear in
1153    /// already-snapshotted entries; this is the load-bearing fix that
1154    /// prevents duplicate-Data delivery when a handshake delivers the
1155    /// post-commit cache and the wave's flush would otherwise also fire
1156    /// to the same sink.
1157    pub(crate) pending_notify: IndexMap<NodeId, PendingPerNode>,
1158    pub(crate) in_tick: bool,
1159    /// Core-global cap on per-node pause replay buffer length. `None` means
1160    /// unbounded. Per the user direction (Q1, 2026-05-05): start core-global;
1161    /// per-node override can be added later as a pure addition without API
1162    /// breakage. Default `None`.
1163    pub(crate) pause_buffer_cap: Option<usize>,
1164    /// Core-global cap on wave-drain iterations before
1165    /// [`crate::batch::Core::drain_and_flush`] aborts with a diagnostic panic.
1166    /// Replaces the prior `MAX_DRAIN_ITERATIONS` hard-coded constant
1167    /// (R4.3 / Lock 2.F′). Default `10_000`.
1168    ///
1169    /// The drain loop bound exists to surface runtime cycles
1170    /// (e.g. an operator that re-arms its own `pending_fires` slot during
1171    /// `invoke_fn`) as a panic with context, rather than letting Core
1172    /// spin forever. Structural cycles via [`Core::set_deps`] are
1173    /// rejected at edge-mutation time (`SetDepsError::WouldCreateCycle`);
1174    /// registration is structurally cycle-safe by construction (the new
1175    /// node's id is not allocated until AFTER deps are validated, so deps
1176    /// cannot transitively reach the new node). The drain bound is the
1177    /// safety net for runtime cycles that bypass both static checks.
1178    pub(crate) max_batch_drain_iterations: u32,
1179    /// Deferred sink-fire jobs collected by `flush_notifications`. The wave
1180    /// engine populates this under the state lock during the flush phase;
1181    /// `run_wave` then drops the lock and fires the jobs. Each tuple is
1182    /// `(sinks_for_one_node_one_phase, phase_messages)`. Empty between waves.
1183    pub(crate) deferred_flush_jobs: crate::batch::DeferredJobs,
1184    /// Payload-handle releases owed for messages that landed in
1185    /// `pending_notify` during this wave (one per `payload_handle()`).
1186    /// `run_wave` releases these after sinks fire and the lock is dropped,
1187    /// balancing the retain done in `queue_notify`.
1188    pub(crate) deferred_handle_releases: Vec<HandleId>,
1189    /// Binding-boundary handle for `Drop`-time refcount balancing.
1190    /// `Core` also holds a clone of this Arc; storing it here lets
1191    /// `Drop for CoreState` walk every retained slot and release the
1192    /// binding-side share when the last `Core` clone drops. Without this,
1193    /// `cache` / `terminal` / `dep_terminals` Error / pause-buffer payload
1194    /// handle refs leak in the binding registry until process exit.
1195    pub(crate) binding: Arc<dyn BindingBoundary>,
1196    /// Pre-wave cache snapshots used to restore state if the wave aborts
1197    /// mid-flight (e.g., a `Core::batch` closure panics). Each entry is
1198    /// `(node_id → old_cache_handle)` — the handle the node held BEFORE
1199    /// the wave started writing to it. The snapshotted handle holds a
1200    /// retain (taken when the snapshot was inserted) so it stays alive
1201    /// for restoration. On wave success, snapshots are dropped and their
1202    /// retains released. On wave abort (`BatchGuard::drop` panic-discard
1203    /// path), each cache slot is restored from the snapshot — the slot's
1204    /// current handle is released, and the snapshot's retain transfers
1205    /// to the cache slot. Only populated for in-flight waves; empty
1206    /// between waves.
1207    pub(crate) wave_cache_snapshots: HashMap<NodeId, HandleId>,
1208    /// Nodes that need an auto-Resolved at wave end if they don't receive
1209    /// a tier-3+ message from their own commit_emission. Populated by
1210    /// the RESOLVED child propagation in `commit_emission` (which queues
1211    /// Dirty but defers Resolved to avoid double-settlement). Drained by
1212    /// the auto-resolve sweep in `drain_and_flush`. Cleared by
1213    /// `clear_wave_state`.
1214    pub(crate) pending_auto_resolve: ahash::AHashSet<NodeId>,
1215    /// Topology-change sinks. Keyed by subscription id for O(1) removal.
1216    pub(crate) topology_sinks: HashMap<u64, crate::topology::TopologySink>,
1217    pub(crate) next_topology_id: u64,
1218    /// A6 reentrancy guard (Slice F, 2026-05-07): the stack of NodeIds whose
1219    /// fn is currently being invoked on the wave-owner thread. Pushed at the
1220    /// top of `fire_fn` (just before the lock-released `invoke_fn` call) and
1221    /// popped on return / unwind via the [`crate::batch::FiringGuard`] RAII
1222    /// helper. [`Core::set_deps`] consults this set and rejects with
1223    /// [`SetDepsError::ReentrantOnFiringNode`] if `n` is currently firing —
1224    /// preventing the D1 `tracked` index corruption (see
1225    /// `porting-deferred.md` "Set_deps from inside firing node's fn corrupts
1226    /// Dynamic `tracked` indices").
1227    ///
1228    /// Stack rather than set so nested fn re-entrance (Producer subscribing
1229    /// to a fn that itself fires another fn) tracks every concurrently-firing
1230    /// node on the wave-owner. `Vec` rather than `HashSet` because the
1231    /// expected depth is small (typically 1, occasionally 2–3 with
1232    /// higher-order operators) and linear scan is faster than hash for that
1233    /// size.
1234    pub(crate) currently_firing: Vec<NodeId>,
1235    /// R1.3.8.c pause-overflow ERROR synthesis queue (Slice F, A3 —
1236    /// 2026-05-07). Recorded by [`Core::queue_notify`] when the pause
1237    /// buffer first overflows in a cycle; drained at wave-end after the
1238    /// lock-released call to
1239    /// [`crate::boundary::BindingBoundary::synthesize_pause_overflow_error`].
1240    ///
1241    /// One entry per (node × pause-cycle); subsequent overflows in the
1242    /// same cycle don't re-queue (gated by `PauseState::overflow_reported`).
1243    pub(crate) pending_pause_overflow: Vec<PendingPauseOverflow>,
1244    /// Slice G (R1.3.2.d / R1.3.3.a — 2026-05-07): nodes that have emitted
1245    /// at least one tier-3 message (Data or Resolved) in the CURRENT wave.
1246    /// Wave-scoped (cleared in `clear_wave_state`). Used by
1247    /// [`crate::batch::Core::commit_emission`] to detect "this is a
1248    /// subsequent emit at this node in the same wave" — when set,
1249    /// equals substitution is skipped (would produce a R1.3.3.a-violating
1250    /// mixed wave) and any prior Resolved entries in pending_notify or
1251    /// the pause buffer are rewritten to Data using the wave-start cache
1252    /// snapshot.
1253    ///
1254    /// Distinct from `pending_pause_overflow` (per-pause-cycle, not
1255    /// per-wave) and `wave_cache_snapshots` (per-wave snapshot, but only
1256    /// populated on Data path pre-Slice-G). Populated by both Data and
1257    /// Resolved branches of `commit_emission`; NOT populated by
1258    /// `commit_emission_verbatim` (Batch path passes through verbatim
1259    /// per R1.3.3.c).
1260    pub(crate) tier3_emitted_this_wave: ahash::AHashSet<NodeId>,
1261    /// Slice E2 (R1.3.9.b strict reading per D057): per-wave-per-node
1262    /// dedup for `OnInvalidate` cleanup hook firing. A node already in
1263    /// this set this wave has already had its `OnInvalidate` queued into
1264    /// `deferred_cleanup_hooks` and MUST NOT queue again, even if
1265    /// `invalidate_inner` re-encounters it (rare: only matters when the
1266    /// node re-populates mid-wave via fn-fire and then gets re-invalidated
1267    /// in the same wave through a separate path).
1268    ///
1269    /// Cleared in [`CoreState::clear_wave_state`] alongside the other
1270    /// wave-scoped queues.
1271    pub(crate) invalidate_hooks_fired_this_wave: ahash::AHashSet<NodeId>,
1272    /// Slice E2 (per D060/D061): lock-released drain queue for
1273    /// `OnInvalidate` cleanup hooks. Populated under the state lock by
1274    /// `Core::invalidate_inner` when a node's cache transitions
1275    /// `!= NO_HANDLE → NO_HANDLE`; drained after the lock drops at wave
1276    /// boundary by [`crate::batch::Core::fire_deferred_cleanup_hooks`]
1277    /// (each call wrapped in `catch_unwind` so a single binding panic
1278    /// doesn't short-circuit the drain — last panic re-raises after the
1279    /// loop completes per D060).
1280    ///
1281    /// **Panic-discard semantics (D061):** cleared in
1282    /// [`CoreState::clear_wave_state`] without firing — a panic-discarded
1283    /// wave drops the queued cleanup hooks silently, mirroring the
1284    /// `pending_pause_overflow` precedent (Slice F /qa A3). Bindings using
1285    /// `OnInvalidate` for external-resource cleanup MUST idempotent-cleanup
1286    /// at process exit or next successful invalidate cycle.
1287    pub(crate) deferred_cleanup_hooks: Vec<(NodeId, crate::boundary::CleanupTrigger)>,
1288    /// Slice E2 /qa Q2(b) (D069): lock-released drain queue for
1289    /// `BindingBoundary::wipe_ctx` calls fired eagerly from
1290    /// `Core::terminate_node` when a resubscribable node terminates with
1291    /// no live subscribers. Pairs with the `Subscription::Drop` direct-
1292    /// fire site (mutually exclusive: subs-empty-at-terminate routes
1293    /// here; subs-non-empty-at-terminate fires from Subscription::Drop's
1294    /// last-sub-drop path). Drained alongside `deferred_cleanup_hooks`
1295    /// at wave boundary; same `catch_unwind` discipline so a single
1296    /// binding panic doesn't short-circuit the drain. Same panic-discard
1297    /// semantics as `deferred_cleanup_hooks` (silent drop on
1298    /// panic-discarded waves).
1299    pub(crate) pending_wipes: Vec<NodeId>,
1300}
1301
1302/// The handle-protocol Core dispatcher.
1303///
1304/// Holds an [`Arc`] to the [`BindingBoundary`] and all dispatch state. Cheap
1305/// to clone (the inner `Arc<Mutex<CoreState>>` is shared); pass `Core` by
1306/// value to threads.
1307///
1308/// # Wave-owner re-entrant mutex (Slice A close /qa, M1)
1309///
1310/// The state lock (`state: Mutex<CoreState>`) is **dropped** around binding
1311/// callbacks (`invoke_fn`, `custom_equals`) so user fns may re-enter Core.
1312/// To preserve serializability of WAVE EXECUTION across threads — without
1313/// re-introducing the lock-held-during-fn-fire deadlock the Slice A close
1314/// refactor lifted — the wave engine acquires `wave_owner` (a
1315/// [`parking_lot::ReentrantMutex`]) for the lifetime of each wave.
1316///
1317/// Properties:
1318///
1319/// - **Same-thread re-entrance is free.** A user fn that calls back into
1320///   `Core::emit` / `Core::pause` / etc. mid-fire re-acquires `wave_owner`
1321///   on the same thread and runs as a nested wave (the inner `run_wave`
1322///   sees `in_tick=true` and skips drain — outer drain picks up).
1323/// - **Cross-thread emits BLOCK** at `wave_owner.lock_arc()` until the
1324///   in-flight wave completes (drain + flush + sink fire all done). This
1325///   serializes wave OWNERSHIP across threads, while still allowing the
1326///   state lock to drop inside the wave for binding callbacks.
1327///
1328/// Without this, Slice A close's lock-released drain let cross-thread
1329/// emits absorb into the in-flight wave's `pending_notify` and return
1330/// before subscribers fire — breaking the user-facing happens-after
1331/// contract that `emit` returning means subscribers have observed.
1332#[derive(Clone)]
1333pub struct Core {
1334    pub(crate) state: Arc<Mutex<CoreState>>,
1335    pub(crate) binding: Arc<dyn BindingBoundary>,
1336    pub(crate) wave_owner: Arc<ReentrantMutex<()>>,
1337}
1338
1339/// Weak handle to a [`Core`] — does not contribute to strong refcount.
1340///
1341/// Constructed via [`Core::weak_handle`]; upgraded back to a strong
1342/// [`Core`] via [`WeakCore::upgrade`]. Used by long-lived binding-stored
1343/// closures (notably `ProducerBuildFn`s registered via
1344/// [`graphrefly_operators::ProducerBinding::register_producer_build`])
1345/// to break the BenchBinding → registry → closure → strong-Core cycle
1346/// that would otherwise leak the entire graph state when a `BenchCore`
1347/// drops with active producer registrations.
1348///
1349/// Upgrade on each invocation; if the host `Core` was already dropped,
1350/// `upgrade()` returns `None` and the closure should no-op (the host
1351/// is being torn down, no work to do).
1352#[derive(Clone)]
1353pub struct WeakCore {
1354    state: Weak<Mutex<CoreState>>,
1355    binding: Weak<dyn BindingBoundary>,
1356    wave_owner: Weak<ReentrantMutex<()>>,
1357}
1358
1359impl WeakCore {
1360    /// Try to upgrade back to a strong [`Core`]. Returns `None` if the
1361    /// host `Core`'s strong count has reached zero (i.e. the host
1362    /// `BenchCore` / equivalent owner was dropped).
1363    #[must_use]
1364    pub fn upgrade(&self) -> Option<Core> {
1365        Some(Core {
1366            state: self.state.upgrade()?,
1367            binding: self.binding.upgrade()?,
1368            wave_owner: self.wave_owner.upgrade()?,
1369        })
1370    }
1371}
1372
1373/// RAII guard that owns an [`OperatorScratch`] until either (a) the
1374/// caller `take()`s it for installation, or (b) the guard drops on an
1375/// early return / unwind, in which case the scratch's handle retains
1376/// are released via [`OperatorScratch::release_handles`].
1377///
1378/// Slice H /qa F1 + F2 (2026-05-07): closes two related correctness
1379/// gaps in `Core::register`:
1380///
1381/// 1. **TOCTOU window** — the original three-phase split called
1382///    `lock_state()` twice (once for validation, once for insertion),
1383///    so a concurrent `Core::complete(dep)` on a non-resubscribable
1384///    dep could slip in between the two acquisitions and re-create
1385///    the wedge `RegisterError::TerminalDep` was designed to prevent.
1386///    The guard plus a single locked region for both phases closes
1387///    this gap (release runs lock-released because guard variables
1388///    drop in reverse declaration order — guard declared BEFORE
1389///    `lock_state()`, so the lock guard drops first).
1390///
1391/// 2. **Panic-unsafe scratch leak** — without an RAII drop, a panic
1392///    between `make_op_scratch` (Phase 2) and the explicit
1393///    `if let Err(e)` cleanup branch (e.g., `lock_state()` reentrance
1394///    assert, OOM-as-panic on Vec growth in dep iteration) would
1395///    drop the `Box<dyn OperatorScratch>` without releasing the
1396///    seed/default retain. The guard's `Drop` impl releases on any
1397///    unwind path.
1398///
1399/// Lock-discipline: the guard holds `&dyn BindingBoundary` (through
1400/// the `Arc<dyn BindingBoundary>` it borrows from). On `Drop`, it
1401/// invokes `release_handles` lock-released — fires AFTER any
1402/// `MutexGuard<CoreState>` declared later in the same scope drops
1403/// (LIFO destruction order). Mirrors `Core::resume` Phase 2 release
1404/// pattern.
1405struct ScratchReleaseGuard<'a> {
1406    scratch: Option<Box<dyn crate::op_state::OperatorScratch>>,
1407    binding: &'a dyn BindingBoundary,
1408}
1409
1410impl<'a> ScratchReleaseGuard<'a> {
1411    fn new(
1412        scratch: Option<Box<dyn crate::op_state::OperatorScratch>>,
1413        binding: &'a dyn BindingBoundary,
1414    ) -> Self {
1415        Self { scratch, binding }
1416    }
1417
1418    /// Take ownership of the scratch — disarms the release-on-drop
1419    /// behavior. Used on the success path to install the scratch on
1420    /// `NodeRecord.op_scratch`.
1421    fn take(mut self) -> Option<Box<dyn crate::op_state::OperatorScratch>> {
1422        self.scratch.take()
1423    }
1424}
1425
1426impl Drop for ScratchReleaseGuard<'_> {
1427    fn drop(&mut self) {
1428        if let Some(mut scratch) = self.scratch.take() {
1429            scratch.release_handles(self.binding);
1430        }
1431    }
1432}
1433
1434impl Core {
1435    /// Construct a fresh Core wired to the given binding. Pause buffer cap
1436    /// defaults to unbounded; set via [`Self::set_pause_buffer_cap`].
1437    #[must_use]
1438    pub fn new(binding: Arc<dyn BindingBoundary>) -> Self {
1439        Self {
1440            state: Arc::new(Mutex::new(CoreState {
1441                next_node_id: 1,
1442                next_subscription_id: 1,
1443                // A4 (Slice F, 2026-05-07): start `next_lock_id` in the high
1444                // half of the u32 range so `alloc_lock_id` can't collide with
1445                // user-supplied `LockId::new(N)` constructors (which the
1446                // napi-rs binding marshals from `u32` and tests typically use
1447                // in the low range, 1..1024). Phase E /qa F1 (2026-05-08):
1448                // lowered from `1u64 << 32` to `1u64 << 31` so the value
1449                // round-trips through `u32::try_from(...)` at the napi
1450                // boundary — the previous seed errored every napi
1451                // `alloc_lock_id` call. Anti-collision intent (high range vs
1452                // low user range) preserved at half the prior ceiling
1453                // (2^31 ≈ 2 billion allocations per Core, ample for parity
1454                // tests). Lift the floor when the deferred BigInt-narrowing
1455                // migration extends `LockId` to `u64` at the FFI layer
1456                // (porting-deferred "BigInt migration for u32-narrowed napi
1457                // types" entry).
1458                next_lock_id: 1u64 << 31,
1459                nodes: HashMap::new(),
1460                children: HashMap::new(),
1461                pending_fires: HashSet::new(),
1462                pending_notify: IndexMap::new(),
1463                in_tick: false,
1464                pause_buffer_cap: None,
1465                max_batch_drain_iterations: 10_000,
1466                deferred_flush_jobs: Vec::new(),
1467                deferred_handle_releases: Vec::new(),
1468                binding: binding.clone(),
1469                wave_cache_snapshots: HashMap::new(),
1470                pending_auto_resolve: ahash::AHashSet::new(),
1471                topology_sinks: HashMap::new(),
1472                next_topology_id: 1,
1473                currently_firing: Vec::new(),
1474                pending_pause_overflow: Vec::new(),
1475                tier3_emitted_this_wave: ahash::AHashSet::new(),
1476                invalidate_hooks_fired_this_wave: ahash::AHashSet::new(),
1477                deferred_cleanup_hooks: Vec::new(),
1478                pending_wipes: Vec::new(),
1479            })),
1480            binding,
1481            wave_owner: Arc::new(ReentrantMutex::new(())),
1482        }
1483    }
1484
1485    /// Acquire the state lock.
1486    ///
1487    /// Post-Slice-E: `Core::subscribe` fires the per-tier handshake
1488    /// LOCK-RELEASED with `wave_owner` held; sink callbacks may freely
1489    /// re-enter Core (`emit` / `complete` / `error` / nested `subscribe`).
1490    /// Same-thread re-entry passes through `wave_owner`'s `ReentrantMutex`
1491    /// transparently; cross-thread emits block on `wave_owner` until the
1492    /// outer subscribe completes, preserving R1.3.5.a happens-after
1493    /// ordering. The previous `IN_HANDSHAKE_FIRE` panic-diagnostic is no
1494    /// longer needed.
1495    pub(crate) fn lock_state(&self) -> MutexGuard<'_, CoreState> {
1496        self.state.lock()
1497    }
1498
1499    /// Whether `self` and `other` point to the same dispatcher state.
1500    /// True when one was produced by `Clone`-ing the other (or they
1501    /// were both cloned from a common ancestor); false for two
1502    /// independently `Core::new`-constructed instances even with the
1503    /// same binding.
1504    ///
1505    /// Used by `graphrefly-graph`'s `mount` to enforce the "shared-Core
1506    /// only" v1 invariant — cross-Core mount is post-M6.
1507    #[must_use]
1508    pub fn same_dispatcher(&self, other: &Core) -> bool {
1509        Arc::ptr_eq(&self.state, &other.state)
1510    }
1511
1512    /// Downgrade to a [`WeakCore`] handle that doesn't contribute to
1513    /// strong refcount of the underlying state / binding / wave_owner.
1514    ///
1515    /// Used by binding-stored long-lived closures (e.g.
1516    /// `register_producer_build`-stored `ProducerBuildFn`s) to avoid the
1517    /// Arc cycle:
1518    ///
1519    /// ```text
1520    /// BenchBinding → registry → producer_builds[fn_id]
1521    ///   → closure → strong Arc<dyn _Binding> → BenchBinding
1522    /// ```
1523    ///
1524    /// Closures hold `WeakCore` and `Weak<dyn _Binding>` instead, then
1525    /// upgrade-on-fire (returning early if either weak is dangling —
1526    /// indicating the host BenchCore was already dropped). Upgraded
1527    /// strong refs live only for the build closure's invocation; sinks
1528    /// the build closure spawns close over those upgraded strongs and
1529    /// stay alive only while the producer is active (cleared via
1530    /// `producer_deactivate` on last-subscriber unsubscribe).
1531    #[must_use]
1532    pub fn weak_handle(&self) -> WeakCore {
1533        WeakCore {
1534            state: Arc::downgrade(&self.state),
1535            binding: Arc::downgrade(&self.binding),
1536            wave_owner: Arc::downgrade(&self.wave_owner),
1537        }
1538    }
1539
1540    /// Configure the Core-global cap on pause replay buffer length. When set,
1541    /// any per-node pause buffer that would exceed `cap` drops the oldest
1542    /// message(s) from the front; the dropped count is reported back via the
1543    /// resume callback (see [`ResumeReport`]). `None` (default) means
1544    /// unbounded; messages buffer indefinitely until the lockset clears.
1545    pub fn set_pause_buffer_cap(&self, cap: Option<usize>) {
1546        self.lock_state().pause_buffer_cap = cap;
1547    }
1548
1549    /// Configure the replay buffer cap on `node_id` (R2.6.5 / Lock 6.G —
1550    /// Slice E1, 2026-05-07). `None` disables the buffer. `Some(N)` keeps
1551    /// the last `N` DATA emissions in a circular buffer; late subscribers
1552    /// receive them as part of the per-tier handshake (between START and
1553    /// any terminal). Switching from a larger cap to a smaller cap evicts
1554    /// the front of the buffer to fit; switching to `None` drains the
1555    /// buffer entirely. Each evicted/drained handle's retain is released
1556    /// back to the binding.
1557    ///
1558    /// # Panics
1559    ///
1560    /// Panics if `node_id` is not registered.
1561    pub fn set_replay_buffer_cap(&self, node_id: NodeId, cap: Option<usize>) {
1562        // QA A7 (2026-05-07): normalize `Some(0)` to `None`. Two ways to
1563        // express "disabled" is confusing: `push_replay_buffer` already
1564        // treats `Some(0)` as no-op, so persisting it adds nothing.
1565        let cap = match cap {
1566            Some(0) => None,
1567            other => other,
1568        };
1569        let to_release: Vec<HandleId> = {
1570            let mut s = self.lock_state();
1571            let rec = s.require_node_mut(node_id);
1572            rec.replay_buffer_cap = cap;
1573            match cap {
1574                None => rec.replay_buffer.drain(..).collect(),
1575                Some(c) => {
1576                    let mut drained = Vec::new();
1577                    while rec.replay_buffer.len() > c {
1578                        if let Some(h) = rec.replay_buffer.pop_front() {
1579                            drained.push(h);
1580                        }
1581                    }
1582                    drained
1583                }
1584            }
1585        };
1586        for h in to_release {
1587            self.binding.release_handle(h);
1588        }
1589    }
1590
1591    /// Reconfigure the pause mode for `node_id` (canonical §2.6 — Slice F
1592    /// audit close, 2026-05-07). Default for new nodes is
1593    /// [`PausableMode::Default`]; switch to [`PausableMode::ResumeAll`]
1594    /// for nodes whose pause-window emit history must be observable
1595    /// verbatim, or [`PausableMode::Off`] for nodes intrinsically
1596    /// pause-immune.
1597    ///
1598    /// # Errors
1599    ///
1600    /// - [`SetPausableModeError::UnknownNode`] — `node_id` is not
1601    ///   registered.
1602    /// - [`SetPausableModeError::WhilePaused`] — the node currently
1603    ///   holds at least one pause lock. Changing mode mid-pause would
1604    ///   lose buffered content or strand a `pending_wave` flag — resume
1605    ///   all locks first.
1606    pub fn set_pausable_mode(
1607        &self,
1608        node_id: NodeId,
1609        mode: PausableMode,
1610    ) -> Result<(), SetPausableModeError> {
1611        let mut s = self.lock_state();
1612        let rec = s
1613            .nodes
1614            .get_mut(&node_id)
1615            .ok_or(SetPausableModeError::UnknownNode(node_id))?;
1616        if rec.pause_state.is_paused() {
1617            return Err(SetPausableModeError::WhilePaused);
1618        }
1619        rec.pausable = mode;
1620        Ok(())
1621    }
1622
1623    /// Configure the wave-drain iteration cap (R4.3 / Lock 2.F′). The wave
1624    /// engine aborts a drain after `cap` iterations with a diagnostic panic.
1625    /// Default is `10_000` — high enough to avoid false positives on legitimate
1626    /// fan-in cascades, low enough to surface runtime cycles within seconds.
1627    ///
1628    /// Lower this only when running adversarial / property-based tests that
1629    /// want fast cycle detection. Raise it only with concrete evidence that a
1630    /// legitimate workload needs more iterations than the default — and even
1631    /// then, prefer to tune the workload (per-subgraph batching, etc.) over
1632    /// raising the cap.
1633    ///
1634    /// # Panics
1635    ///
1636    /// Panics if `cap == 0` — a zero cap would abort every wave on the very
1637    /// first iteration, deadlocking any subsequent dispatcher work.
1638    pub fn set_max_batch_drain_iterations(&self, cap: u32) {
1639        assert!(cap > 0, "max_batch_drain_iterations must be > 0");
1640        self.lock_state().max_batch_drain_iterations = cap;
1641    }
1642
1643    /// Send a message UPSTREAM from `node_id` to each of its declared deps
1644    /// (canonical R1.4.1 — Slice F audit, F2 / 2026-05-07).
1645    ///
1646    /// The dispatcher rejects tier-3 (DATA / RESOLVED) and tier-5
1647    /// (COMPLETE / ERROR) per R1.4.1: value and terminal-lifecycle planes
1648    /// are downstream-only. All other tiers (0 START, 1 DIRTY, 2 PAUSE /
1649    /// RESUME, 4 INVALIDATE, 6 TEARDOWN) pass.
1650    ///
1651    /// # Routing per tier
1652    ///
1653    /// - **Tier 0 ([`Message::Start`]):** no-op. START is a per-subscription
1654    ///   handshake, not a routable wire signal — sending it upstream has no
1655    ///   well-defined target.
1656    /// - **Tier 1 ([`Message::Dirty`]):** no-op. The dep's "something
1657    ///   changed" notification is its own [`Self::emit`] / commit
1658    ///   responsibility; ignoring upstream DIRTY hints is safe.
1659    /// - **Tier 2 ([`Message::Pause`] / [`Message::Resume`]):** translates
1660    ///   to [`Self::pause`] / [`Self::resume`] on each dep. Lock id is
1661    ///   forwarded verbatim. Errors from individual deps are accumulated
1662    ///   in the `dep_errors` field of the returned report.
1663    /// - **Tier 4 ([`Message::Invalidate`]):** translates to
1664    ///   [`Self::invalidate`] on each dep. Note: canonical R1.4.2
1665    ///   distinguishes "downstream INVALIDATE" (cache clear + cascade) from
1666    ///   "upstream INVALIDATE" (plain forward, no self-process). The Rust
1667    ///   port v1 SIMPLIFICATION delegates to the same `Core::invalidate`
1668    ///   path — upstream INVALIDATE here DOES clear dep caches and cascade.
1669    ///   If a "plain forward" mode surfaces as a real consumer need, add
1670    ///   `up_with_options`.
1671    /// - **Tier 6 ([`Message::Teardown`]):** translates to
1672    ///   [`Self::teardown`] on each dep. Cascades per the standard
1673    ///   teardown path.
1674    ///
1675    /// # Errors
1676    ///
1677    /// - [`UpError::UnknownNode`] — `node_id` is not registered.
1678    /// - [`UpError::TierForbidden`] — tier 3 or tier 5.
1679    pub fn up(&self, node_id: NodeId, message: Message) -> Result<(), UpError> {
1680        // QA A10 (2026-05-07): check unknown node BEFORE tier rejection
1681        // for consistent error UX — `up(unknown, Data)` and
1682        // `up(unknown, Pause)` both report `UnknownNode` rather than
1683        // splitting on the tier.
1684        let dep_ids: Vec<NodeId> = {
1685            let s = self.lock_state();
1686            let rec = s.nodes.get(&node_id).ok_or(UpError::UnknownNode(node_id))?;
1687            rec.dep_ids_vec()
1688        };
1689        let tier = message.tier();
1690        if tier == 3 || tier == 5 {
1691            return Err(UpError::TierForbidden { tier });
1692        }
1693        for dep_id in dep_ids {
1694            match message {
1695                Message::Pause(lock) => {
1696                    let _ = self.pause(dep_id, lock);
1697                }
1698                Message::Resume(lock) => {
1699                    let _ = self.resume(dep_id, lock);
1700                }
1701                Message::Invalidate => {
1702                    self.invalidate(dep_id);
1703                }
1704                Message::Teardown => {
1705                    self.teardown(dep_id);
1706                }
1707                // Tier 0 START + tier 1 DIRTY: no-op upstream per the
1708                // routing table above.
1709                _ => {}
1710            }
1711        }
1712        Ok(())
1713    }
1714
1715    /// Allocate a unique [`LockId`] for use with [`Self::pause`] /
1716    /// [`Self::resume`]. Convenience for callers that don't already have an
1717    /// id-allocation scheme; user-supplied ids work too.
1718    #[must_use]
1719    pub fn alloc_lock_id(&self) -> LockId {
1720        let mut s = self.lock_state();
1721        let id = LockId::new(s.next_lock_id);
1722        s.next_lock_id += 1;
1723        id
1724    }
1725
1726    // -------------------------------------------------------------------
1727    // Registration — unified `register()` (D030, Slice D)
1728    //
1729    // All node kinds (State / Producer / Derived / Dynamic / Operator)
1730    // funnel through `Core::register(NodeRegistration) -> NodeId`. Sugar
1731    // wrappers (`register_state` / `register_producer` / `register_derived`
1732    // / `register_dynamic` / `register_operator`) build a `NodeRegistration`
1733    // and delegate. There is no parallel registration path internally.
1734    // -------------------------------------------------------------------
1735
1736    /// Unified node registration (D030).
1737    ///
1738    /// `reg` describes the node's identity (deps + closure-form fn id OR
1739    /// typed-op + per-kind opts). The kind is **derived from the field
1740    /// shape**, not stored — see [`NodeKind`].
1741    ///
1742    /// Sugar wrappers below ([`Self::register_state`],
1743    /// [`Self::register_producer`], [`Self::register_derived`],
1744    /// [`Self::register_dynamic`], [`Self::register_operator`]) build the
1745    /// registration for the common kinds and delegate here. Direct callers
1746    /// that need uncommon combinations (e.g., a partial-true derived) can
1747    /// invoke this method directly.
1748    ///
1749    /// # Errors
1750    ///
1751    /// Errors are returned in evaluation order — earlier phases short-circuit
1752    /// later ones, so a single registration produces at most one variant.
1753    ///
1754    /// **Phase 1 — lock-released, side-effect-free validation:**
1755    /// - [`RegisterError::OperatorWithoutDeps`] — `reg` carries an op but
1756    ///   `deps` is empty. Operator nodes need at least one dep — for
1757    ///   subscription-managed combinators with no declared deps, use
1758    ///   [`Self::register_producer`] instead.
1759    /// - [`RegisterError::InitialOnlyForStateNodes`] — `reg.opts.initial`
1760    ///   is non-sentinel for a non-state shape (deps non-empty, or
1761    ///   fn_or_op present). State nodes are the only kind with an initial
1762    ///   cache.
1763    ///
1764    /// **Phase 2 — operator scratch construction (lock-released):**
1765    /// - [`RegisterError::OperatorSeedSentinel`] — `reg` carries `Op(Scan)`
1766    ///   / `Op(Reduce)` with a `NO_HANDLE` seed. R2.5.3 — stateful folders
1767    ///   must have a real seed.
1768    ///
1769    /// **Phase 3 — state-lock validation (folded with insertion under a
1770    /// single lock acquisition per /qa F1 to prevent TOCTOU between
1771    /// validation and `nodes.insert`):**
1772    /// - [`RegisterError::UnknownDep`] — any element of `reg.deps` is not
1773    ///   a registered node id.
1774    /// - [`RegisterError::TerminalDep`] — a dep is terminal (COMPLETE /
1775    ///   ERROR) AND not resubscribable — would create a permanent wedge.
1776    ///
1777    /// All errors are construction-time invariants — the dispatcher
1778    /// rejects the registration before any reactive state is created.
1779    /// On `Err`, no node has been added and any handle retains taken on
1780    /// the way in (operator scratch seed retains via
1781    /// [`BindingBoundary::retain_handle`]) have been released
1782    /// lock-released — see [`ScratchReleaseGuard`] for the RAII
1783    /// discipline that covers both early-return AND unwind paths.
1784    /// `Last { default }` retains its `default` handle on the same
1785    /// release path.
1786    pub fn register(&self, reg: NodeRegistration) -> Result<NodeId, RegisterError> {
1787        let NodeRegistration {
1788            deps,
1789            fn_or_op,
1790            opts,
1791        } = reg;
1792        let NodeOpts {
1793            initial,
1794            equals,
1795            partial,
1796            is_dynamic,
1797            pausable,
1798            replay_buffer,
1799        } = opts;
1800
1801        // Derive the field shape from fn_or_op + deps.
1802        let (fn_id, op) = match fn_or_op {
1803            Some(NodeFnOrOp::Fn(f)) => (Some(f), None),
1804            Some(NodeFnOrOp::Op(o)) => (None, Some(o)),
1805            None => (None, None),
1806        };
1807
1808        // Phase 1 — lock-released, side-effect-free validation. Errors
1809        // here return BEFORE any handle retain is taken.
1810        //
1811        //   - State (no deps + no fn + no op) is the only kind with `initial`.
1812        //   - Dynamic flag only meaningful when fn + non-empty deps.
1813        //   - Operator (op present) must have deps (P9: operator without deps
1814        //     would skip activation — use a producer instead).
1815        let is_state_shape = deps.is_empty() && fn_id.is_none() && op.is_none();
1816        if op.is_some() && deps.is_empty() {
1817            return Err(RegisterError::OperatorWithoutDeps);
1818        }
1819        if initial != NO_HANDLE && !is_state_shape {
1820            return Err(RegisterError::InitialOnlyForStateNodes);
1821        }
1822
1823        // Phase 2 — build per-operator scratch struct (may take handle
1824        // retains via `binding.retain_handle` for Scan/Reduce/Last seed).
1825        // Lock-released per Slice E (D045) handshake discipline. Returns
1826        // `OperatorSeedSentinel` BEFORE retain so an Err leaves no
1827        // dangling handles.
1828        let scratch = match op {
1829            Some(operator_op) => self.make_op_scratch(operator_op)?,
1830            None => None,
1831        };
1832
1833        // Wrap scratch in an RAII guard immediately after Phase 2. From
1834        // here on, ANY early return / unwind path correctly releases the
1835        // scratch's handle retains via `OperatorScratch::release_handles`
1836        // (Slice H /qa F2 — defense against panics between Phase 2 and
1837        // Phase 3 cleanup branch). Lock-released because the guard is
1838        // declared BEFORE `lock_state()` below — variable destruction
1839        // order is reverse declaration order, so the `MutexGuard` drops
1840        // first on any return path.
1841        let scratch_guard = ScratchReleaseGuard::new(scratch, &*self.binding);
1842
1843        // Phase 3 — state-lock-required validation, FOLDED with insertion
1844        // under a single `lock_state()` acquisition per /qa F1. The
1845        // pre-/qa version split this into two acquisitions (one for
1846        // validation, one for `alloc_node_id` + `nodes.insert`), opening
1847        // a TOCTOU window where a concurrent `Core::complete(dep)` on a
1848        // non-resubscribable dep could slip in and recreate the wedge
1849        // `TerminalDep` was designed to prevent. Single locked region
1850        // closes the gap.
1851        let mut s = self.lock_state();
1852
1853        for &dep in &deps {
1854            if !s.nodes.contains_key(&dep) {
1855                return Err(RegisterError::UnknownDep(dep));
1856            }
1857        }
1858        // Slice F audit (2026-05-07): mirror `set_deps`'s `TerminalDep`
1859        // rejection at registration time. Adding a non-resubscribable
1860        // terminal node as a dep at registration creates a permanent wedge.
1861        for &dep in &deps {
1862            let dep_rec = s.require_node(dep);
1863            if dep_rec.terminal.is_some() && !dep_rec.resubscribable {
1864                return Err(RegisterError::TerminalDep(dep));
1865            }
1866        }
1867
1868        // Validation passed — install. Take scratch out of the guard
1869        // (disarms the release-on-drop) and continue using `s`.
1870        let installed_scratch = scratch_guard.take();
1871
1872        let id = s.alloc_node_id();
1873
1874        // `tracked`: Static derived + Operator track all deps; Dynamic
1875        // starts empty and fills via fn return; State / Producer have no
1876        // deps so tracked is empty.
1877        let tracked: HashSet<usize> = if op.is_some() {
1878            (0..deps.len()).collect()
1879        } else if is_dynamic {
1880            HashSet::new()
1881        } else if fn_id.is_some() && !deps.is_empty() {
1882            // Static derived
1883            (0..deps.len()).collect()
1884        } else {
1885            HashSet::new()
1886        };
1887
1888        let dep_records: Vec<DepRecord> = deps.iter().map(|&d| DepRecord::new(d)).collect();
1889
1890        let rec = NodeRecord {
1891            dep_records,
1892            fn_id,
1893            op,
1894            is_dynamic,
1895            equals,
1896            cache: initial,
1897            has_fired_once: initial != NO_HANDLE,
1898            subscribers: HashMap::new(),
1899            tracked,
1900            dirty: false,
1901            involved_this_wave: false,
1902            pause_state: PauseState::Active,
1903            pausable,
1904            replay_buffer_cap: replay_buffer,
1905            replay_buffer: VecDeque::new(),
1906            terminal: None,
1907            has_received_teardown: false,
1908            resubscribable: false,
1909            meta_companions: Vec::new(),
1910            partial,
1911            op_scratch: installed_scratch,
1912        };
1913        s.nodes.insert(id, rec);
1914        s.children.insert(id, HashSet::new());
1915        for &dep in &deps {
1916            s.children.entry(dep).or_default().insert(id);
1917        }
1918        drop(s);
1919        self.fire_topology_event(&crate::topology::TopologyEvent::NodeRegistered(id));
1920        Ok(id)
1921    }
1922
1923    /// Sugar over [`Self::register`] — register a state node. `initial`
1924    /// may be [`NO_HANDLE`] to start sentinel.
1925    ///
1926    /// `partial` is accepted for surface consistency (D019); for state
1927    /// nodes it has no effect (state nodes don't fire fn).
1928    ///
1929    /// # Errors
1930    ///
1931    /// State registration is structurally simple — no deps, no op — so
1932    /// the only reachable variant is none in practice. Returns
1933    /// [`Result`] for surface consistency with [`Self::register`].
1934    pub fn register_state(
1935        &self,
1936        initial: HandleId,
1937        partial: bool,
1938    ) -> Result<NodeId, RegisterError> {
1939        self.register(NodeRegistration {
1940            deps: Vec::new(),
1941            fn_or_op: None,
1942            opts: NodeOpts {
1943                initial,
1944                partial,
1945                ..NodeOpts::default()
1946            },
1947        })
1948    }
1949
1950    /// Sugar over [`Self::register`] — register a producer node (D031,
1951    /// Slice D). No deps; fn fires once on first subscribe; cleanup runs
1952    /// via [`BindingBoundary::producer_deactivate`] when the last
1953    /// subscriber unsubscribes.
1954    ///
1955    /// The fn body uses the binding's `ProducerCtx`-equivalent helper
1956    /// (see `graphrefly-operators::producer`) to subscribe to other Core
1957    /// nodes — the zip / concat / race / takeUntil pattern.
1958    ///
1959    /// # Errors
1960    ///
1961    /// Producer registration has no user-supplied deps, so structurally
1962    /// none of [`RegisterError`]'s variants are reachable. Returns
1963    /// [`Result`] for surface consistency with [`Self::register`].
1964    pub fn register_producer(&self, fn_id: FnId) -> Result<NodeId, RegisterError> {
1965        self.register(NodeRegistration {
1966            deps: Vec::new(),
1967            fn_or_op: Some(NodeFnOrOp::Fn(fn_id)),
1968            opts: NodeOpts {
1969                // Producers have no deps — the first-run gate is degenerate.
1970                partial: true,
1971                ..NodeOpts::default()
1972            },
1973        })
1974    }
1975
1976    /// Sugar over [`Self::register`] — register a derived (static) node.
1977    /// `partial` controls the R2.5.3 first-run gate (D011).
1978    ///
1979    /// # Errors
1980    ///
1981    /// - [`RegisterError::UnknownDep`] — any element of `deps` is not
1982    ///   registered.
1983    /// - [`RegisterError::TerminalDep`] — a dep is terminal and not
1984    ///   resubscribable.
1985    pub fn register_derived(
1986        &self,
1987        deps: &[NodeId],
1988        fn_id: FnId,
1989        equals: EqualsMode,
1990        partial: bool,
1991    ) -> Result<NodeId, RegisterError> {
1992        self.register(NodeRegistration {
1993            deps: deps.to_vec(),
1994            fn_or_op: Some(NodeFnOrOp::Fn(fn_id)),
1995            opts: NodeOpts {
1996                equals,
1997                partial,
1998                ..NodeOpts::default()
1999            },
2000        })
2001    }
2002
2003    /// Sugar over [`Self::register`] — register a dynamic node (fn
2004    /// declares its actually-tracked dep indices per fire). `partial`
2005    /// controls the R2.5.3 first-run gate (D011).
2006    ///
2007    /// # Errors
2008    ///
2009    /// - [`RegisterError::UnknownDep`] — any element of `deps` is not
2010    ///   registered.
2011    /// - [`RegisterError::TerminalDep`] — a dep is terminal and not
2012    ///   resubscribable.
2013    pub fn register_dynamic(
2014        &self,
2015        deps: &[NodeId],
2016        fn_id: FnId,
2017        equals: EqualsMode,
2018        partial: bool,
2019    ) -> Result<NodeId, RegisterError> {
2020        self.register(NodeRegistration {
2021            deps: deps.to_vec(),
2022            fn_or_op: Some(NodeFnOrOp::Fn(fn_id)),
2023            opts: NodeOpts {
2024                equals,
2025                partial,
2026                is_dynamic: true,
2027                ..NodeOpts::default()
2028            },
2029        })
2030    }
2031
2032    /// Build a fresh [`OperatorScratch`](crate::op_state::OperatorScratch)
2033    /// box for an operator variant, taking any required handle retains.
2034    /// Shared between `register_operator` (initial install) and
2035    /// `reset_for_fresh_lifecycle` (resubscribable cycle re-install).
2036    ///
2037    /// # Errors
2038    ///
2039    /// Returns [`RegisterError::OperatorSeedSentinel`] if `op` is `Scan`
2040    /// / `Reduce` with a [`NO_HANDLE`] seed (R2.5.3 — stateful folders
2041    /// must have a real seed). Refcount discipline: the seed-sentinel
2042    /// check happens BEFORE [`BindingBoundary::retain_handle`], so an
2043    /// `Err` leaves no handles dangling.
2044    fn make_op_scratch(
2045        &self,
2046        op: OperatorOp,
2047    ) -> Result<Option<Box<dyn crate::op_state::OperatorScratch>>, RegisterError> {
2048        use crate::op_state::{
2049            DistinctState, LastState, PairwiseState, ReduceState, ScanState, SkipState, TakeState,
2050            TakeWhileState,
2051        };
2052        // Slice H (2026-05-07): Scan/Reduce seed-sentinel checks happen
2053        // BEFORE retain_handle so an Err return leaves no handles dangling.
2054        //
2055        // Slice H /qa F13 (2026-05-07): for retaining variants, allocate
2056        // the `Box<State>` BEFORE calling `binding.retain_handle`. If
2057        // `Box::new` panics (e.g., OOM-as-panic), no retain has happened
2058        // yet — no leak. If `retain_handle` panics after Box succeeds,
2059        // the `Box<State>` is dropped on unwind; State has no handle yet
2060        // (we haven't touched the registry refcount), so still no leak.
2061        // Caller wraps the returned scratch in `ScratchReleaseGuard` to
2062        // cover panics AFTER make_op_scratch returns.
2063        match op {
2064            OperatorOp::Scan { seed, .. } => {
2065                if seed == NO_HANDLE {
2066                    return Err(RegisterError::OperatorSeedSentinel);
2067                }
2068                let state = Box::new(ScanState { acc: seed });
2069                self.binding.retain_handle(seed);
2070                Ok(Some(state))
2071            }
2072            OperatorOp::Reduce { seed, .. } => {
2073                if seed == NO_HANDLE {
2074                    return Err(RegisterError::OperatorSeedSentinel);
2075                }
2076                let state = Box::new(ReduceState { acc: seed });
2077                self.binding.retain_handle(seed);
2078                Ok(Some(state))
2079            }
2080            OperatorOp::DistinctUntilChanged { .. } => Ok(Some(Box::new(DistinctState::default()))),
2081            OperatorOp::Pairwise { .. } => Ok(Some(Box::new(PairwiseState::default()))),
2082            OperatorOp::Take { .. } => Ok(Some(Box::new(TakeState::default()))),
2083            OperatorOp::Skip { .. } => Ok(Some(Box::new(SkipState::default()))),
2084            OperatorOp::TakeWhile { .. } => Ok(Some(Box::new(TakeWhileState))),
2085            OperatorOp::Last { default } => {
2086                let state = Box::new(LastState {
2087                    latest: NO_HANDLE,
2088                    default,
2089                });
2090                if default != NO_HANDLE {
2091                    self.binding.retain_handle(default);
2092                }
2093                Ok(Some(state))
2094            }
2095            OperatorOp::Map { .. }
2096            | OperatorOp::Filter { .. }
2097            | OperatorOp::Combine { .. }
2098            | OperatorOp::WithLatestFrom { .. }
2099            | OperatorOp::Merge => Ok(None),
2100        }
2101    }
2102
2103    /// Sugar over [`Self::register`] — register a built-in operator node
2104    /// (Slice C-1, D009; D026 generic scratch). The operator dispatch path
2105    /// lives in `fire_operator`; `op` selects which per-operator FFI
2106    /// method on [`BindingBoundary`] gets called per fire.
2107    ///
2108    /// For stateful operators ([`OperatorOp::Scan`] / [`Reduce`] /
2109    /// [`Last`] with a default), the seed/default handle is captured
2110    /// into the appropriate
2111    /// [`OperatorScratch`](crate::op_state::OperatorScratch) struct
2112    /// stored at [`NodeRecord::op_scratch`], and Core takes one retain
2113    /// share via [`BindingBoundary::retain_handle`].
2114    ///
2115    /// # Errors
2116    ///
2117    /// - [`RegisterError::OperatorWithoutDeps`] — `deps` is empty (use
2118    ///   [`Self::register_producer`] instead).
2119    /// - [`RegisterError::OperatorSeedSentinel`] — `op` is
2120    ///   [`OperatorOp::Scan`] / [`OperatorOp::Reduce`] with a
2121    ///   [`NO_HANDLE`] seed.
2122    /// - [`RegisterError::UnknownDep`] — any element of `deps` is not
2123    ///   registered.
2124    /// - [`RegisterError::TerminalDep`] — a dep is terminal and not
2125    ///   resubscribable.
2126    pub fn register_operator(
2127        &self,
2128        deps: &[NodeId],
2129        op: OperatorOp,
2130        opts: OperatorOpts,
2131    ) -> Result<NodeId, RegisterError> {
2132        self.register(NodeRegistration {
2133            deps: deps.to_vec(),
2134            fn_or_op: Some(NodeFnOrOp::Op(op)),
2135            opts: NodeOpts {
2136                equals: opts.equals,
2137                partial: opts.partial,
2138                ..NodeOpts::default()
2139            },
2140        })
2141    }
2142
2143    // -------------------------------------------------------------------
2144    // Subscription
2145    // -------------------------------------------------------------------
2146
2147    /// Subscribe a sink to a node. Returns a [`Subscription`] handle —
2148    /// dropping the handle unsubscribes the sink. Per §10.12, no manual
2149    /// `unsubscribe(node, id)` call is required.
2150    ///
2151    /// Push-on-subscribe (R1.2.3, R2.2.3 step 4): the sink is registered AFTER
2152    /// the START handshake fires. The handshake contents depend on node
2153    /// state:
2154    /// - Sentinel cache + live (non-terminal): `[START]`
2155    /// - Cached + live: `[START, DATA(handle)]`
2156    /// - Cached + terminated (non-resubscribable): `[START, DATA(handle), <terminal>]`
2157    /// - Sentinel + terminated (non-resubscribable): `[START, <terminal>]`
2158    ///
2159    /// Resubscribable terminal lifecycle (R2.2.7 / R2.5.3): if the node was
2160    /// marked resubscribable via [`Self::set_resubscribable`] AND has
2161    /// terminated, the subscribe call first **resets** the node — clears
2162    /// `terminal`, `has_fired_once`, `has_received_teardown`, all
2163    /// `dep_handles` to `NO_HANDLE`, all `dep_terminals` to `None`, and
2164    /// drains the pause lockset. The new subscriber then receives a fresh
2165    /// `[START]` (cache may survive for state nodes; sentinel for compute).
2166    ///
2167    /// Activation (R2.2.3 step 5): if this is the first subscriber and the
2168    /// node is a derived/dynamic compute, recursively activate deps so their
2169    /// cached handles fill our `dep_handles`.
2170    #[allow(clippy::needless_pass_by_value)] // Sink is `Arc<dyn Fn>`; we clone for the subscribers map and call it directly. Taking by value matches the ergonomics callers expect.
2171    pub fn subscribe(&self, node_id: NodeId, sink: Sink) -> Subscription {
2172        // Subscribe protocol (Slice E rework, post-handshake-reentry-lift):
2173        //
2174        // 1. Acquire `wave_owner` first (re-entrant; same-thread passes
2175        //    through, cross-thread blocks). This is the cross-thread
2176        //    serialization point that preserves R1.3.5.a happens-after
2177        //    ordering across the lock-released handshake fire.
2178        // 2. Acquire state lock briefly: alloc sub_id, run resubscribable
2179        //    reset if applicable, snapshot handshake state, install sink
2180        //    in `subscribers`. Drop state lock.
2181        // 3. Fire handshake LOCK-RELEASED. Per-tier slices (R1.3.5.a):
2182        //    `[Start]` / `[Data(cache)]?` / `[Complete]?` / `[Error(h)]?`
2183        //    / `[Teardown]?`. Empty tiers are skipped. Sink callbacks
2184        //    may re-enter Core freely — same-thread re-entry passes
2185        //    through `wave_owner` reentrantly.
2186        // 4. Run activation under `run_wave` if needed (first subscriber
2187        //    on a non-state node).
2188        // 5. Drop `wave_owner`.
2189        //
2190        // Race-fix discipline: the sink is installed in `subscribers`
2191        // BEFORE the state lock drops, so concurrent threads that
2192        // acquire `wave_owner` after our scope sees the sink already
2193        // registered. Cross-thread emits block on `wave_owner` until
2194        // we drop it, ensuring all our handshake calls land before
2195        // any concurrent wave's flush observes the sink.
2196
2197        // Acquire wave_owner first — cross-thread serialization point.
2198        // `lock_arc()` is `!Send`; same-thread reentrant.
2199        let _wave_guard = self.wave_owner.lock_arc();
2200
2201        let (sub_id, tier_slices, needs_activation, did_reset) = {
2202            let mut s = self.lock_state();
2203            let sub_id = s.alloc_sub_id();
2204
2205            // Resubscribable reset: terminal + flagged → clear lifecycle
2206            // state so the incoming subscriber starts fresh. F3 audit
2207            // guard: a node that has received TEARDOWN (R2.6.4) is
2208            // permanently destroyed at this layer; resurrecting it via a
2209            // late subscribe is a category error. COMPLETE/ERROR is
2210            // recoverable for resubscribable nodes; TEARDOWN is not. The
2211            // handshake will still replay the terminal in the non-reset
2212            // branch so the late subscriber sees a clean
2213            // `[START, ?DATA, COMPLETE|ERROR, TEARDOWN]` stream.
2214            let needs_reset = {
2215                let rec = s.require_node(node_id);
2216                rec.resubscribable && rec.terminal.is_some() && !rec.has_received_teardown
2217            };
2218            if needs_reset {
2219                self.reset_for_fresh_lifecycle(&mut s, node_id);
2220            }
2221
2222            // Snapshot handshake state under lock.
2223            let (cache, is_state, first_subscriber, terminal, torn_down) = {
2224                let rec = s.require_node(node_id);
2225                (
2226                    rec.cache,
2227                    rec.is_state(),
2228                    rec.subscribers.is_empty(),
2229                    rec.terminal,
2230                    rec.has_received_teardown,
2231                )
2232            };
2233
2234            // Build per-tier handshake slices. Each non-empty slice is
2235            // fired as a separate sink call (R1.3.5.a tier-split).
2236            let mut tier_slices: SmallVec<[Vec<Message>; 4]> = SmallVec::new();
2237            tier_slices.push(vec![Message::Start]);
2238            if cache != NO_HANDLE {
2239                tier_slices.push(vec![Message::Data(cache)]);
2240            }
2241            // Slice E1 (R2.6.5 / Lock 6.G): replay buffered DATA between
2242            // [Start] (and the cache slice, if present) and any terminal.
2243            // Each buffered handle becomes a separate per-tier slice so
2244            // late subscribers see the historical Data sequence as
2245            // distinct sink calls.
2246            //
2247            // Dedupe: when a cache slice is present and the buffer's last
2248            // entry is the same handle (the typical case — cache always
2249            // tracks the last DATA emitted, and the buffer's tail entry
2250            // is that same DATA), skip the last buffer entry to avoid
2251            // delivering Data(cache) twice. For state nodes whose cache
2252            // survives unsubscribe, the buffer may have older entries
2253            // the cache doesn't reflect; the dedupe only drops the
2254            // single trailing entry that equals cache. (QA A1, 2026-05-07)
2255            let replay_handles: Vec<HandleId> = {
2256                let rec = s.require_node(node_id);
2257                let cap = rec.replay_buffer_cap.unwrap_or(0);
2258                if cap == 0 {
2259                    Vec::new()
2260                } else {
2261                    let mut v: Vec<HandleId> = rec.replay_buffer.iter().copied().collect();
2262                    if cache != NO_HANDLE && v.last() == Some(&cache) {
2263                        v.pop();
2264                    }
2265                    v
2266                }
2267            };
2268            for h in &replay_handles {
2269                tier_slices.push(vec![Message::Data(*h)]);
2270            }
2271            if let Some(t) = terminal {
2272                tier_slices.push(vec![match t {
2273                    TerminalKind::Complete => Message::Complete,
2274                    TerminalKind::Error(h) => Message::Error(h),
2275                }]);
2276            }
2277            if torn_down {
2278                tier_slices.push(vec![Message::Teardown]);
2279            }
2280
2281            // Install sink BEFORE dropping state lock so any thread that
2282            // subsequently acquires `wave_owner` (after our scope ends)
2283            // sees the sink already registered.
2284            s.require_node_mut(node_id)
2285                .subscribers
2286                .insert(sub_id, sink.clone());
2287
2288            let needs_activation = first_subscriber && !is_state;
2289            (sub_id, tier_slices, needs_activation, needs_reset)
2290            // state lock drops here
2291        };
2292
2293        // Slice E2 (R2.4.6 / D055): on resubscribable terminal reset, fire
2294        // `wipe_ctx` LOCK-RELEASED so the binding drops its `NodeCtxState`
2295        // entry (clearing both `store` and any residual `current_cleanup`).
2296        // The new subscriber's first invoke_fn sees a fresh empty store.
2297        // Fires AFTER the state lock drops so the binding's
2298        // `node_ctx.lock()` can't deadlock against Core's state lock — and
2299        // BEFORE the handshake so the wipe is observable before any
2300        // user-visible interaction with the new lifecycle.
2301        if did_reset {
2302            self.binding.wipe_ctx(node_id);
2303        }
2304
2305        // Fire handshake LOCK-RELEASED. Sink may re-enter Core; same-
2306        // thread re-entry passes through `wave_owner` reentrantly.
2307        // Cross-thread emits block at `wave_owner` until our scope ends.
2308        //
2309        // A7 (Slice F, 2026-05-07): per-tier slice fire is wrapped in
2310        // `catch_unwind`. The sink is installed in `subscribers` BEFORE
2311        // the handshake fires (load-bearing — concurrent threads observe
2312        // the sink immediately). If a sink panics on tier N, the panic
2313        // would otherwise unwind out of `subscribe` BEFORE the
2314        // `Subscription` handle is constructed, leaving the sink
2315        // registered in `subscribers` with no user-held handle to drop.
2316        // Subsequent waves' `flush_notifications` would re-fire the
2317        // panicking sink forever.
2318        //
2319        // On panic: remove the sink from `subscribers` (via the
2320        // already-allocated `sub_id`), drop `_wave_guard` cleanly via
2321        // RAII, and resume the unwind so the user observes the panic at
2322        // the call site. Same effect as the user dropping the
2323        // `Subscription` immediately, but pre-emptive.
2324        for slice in &tier_slices {
2325            let sink_clone = sink.clone();
2326            let slice_ref: &[Message] = slice;
2327            let result = catch_unwind(AssertUnwindSafe(|| sink_clone(slice_ref)));
2328            if let Err(panic_payload) = result {
2329                // Remove the orphaned sink. Best-effort: if the node was
2330                // since torn down (e.g., the sink itself called teardown
2331                // before panicking), the entry may already be gone.
2332                {
2333                    let mut s = self.lock_state();
2334                    if let Some(rec) = s.nodes.get_mut(&node_id) {
2335                        rec.subscribers.remove(&sub_id);
2336                    }
2337                }
2338                std::panic::resume_unwind(panic_payload);
2339            }
2340        }
2341
2342        // Run activation if needed. `run_wave` re-acquires `wave_owner`
2343        // reentrantly + manages its own state-lock acquisition.
2344        if needs_activation {
2345            self.run_wave(|this| {
2346                let mut s = this.lock_state();
2347                this.activate_derived(&mut s, node_id);
2348            });
2349        }
2350
2351        Subscription {
2352            state: Arc::downgrade(&self.state),
2353            node_id,
2354            sub_id,
2355        }
2356        // _wave_guard drops here, releasing wave_owner.
2357    }
2358
2359    /// Mark `node_id` as resubscribable per R2.2.7. Resubscribable nodes
2360    /// reset their terminal-lifecycle state on a fresh subscribe — see
2361    /// [`Self::subscribe`].
2362    ///
2363    /// Configuration call — must be made before the node has any active
2364    /// subscribers, since changing the policy mid-flight would surprise
2365    /// existing observers.
2366    ///
2367    /// # Panics
2368    ///
2369    /// Panics if the node has subscribers (the policy is observable
2370    /// behavior; changing it after the fact would change semantics for
2371    /// existing sinks).
2372    pub fn set_resubscribable(&self, node_id: NodeId, resubscribable: bool) {
2373        let mut s = self.lock_state();
2374        let rec = s.require_node_mut(node_id);
2375        assert!(
2376            rec.subscribers.is_empty(),
2377            "set_resubscribable: node already has subscribers; \
2378             configure resubscribable before any subscribe call"
2379        );
2380        rec.resubscribable = resubscribable;
2381    }
2382
2383    /// Reset a resubscribable node's terminal-lifecycle state. Called from
2384    /// `subscribe` when a late subscriber arrives at a flagged node.
2385    ///
2386    /// Released: terminal-slot retain (Error handle), all per-dep terminal
2387    /// retains (Error handles), all data_batch retains.
2388    /// Cleared: `terminal`, `has_fired_once`, `has_received_teardown`, all
2389    /// dep_records to sentinel, the pause lockset (any held locks are
2390    /// released — replay buffer drops silently because there are no
2391    /// subscribers to flush to).
2392    fn reset_for_fresh_lifecycle(&self, s: &mut CoreState, node_id: NodeId) {
2393        // Phase 1: collect wave-state handle releases + take the old
2394        // op_scratch + reset other state. Take all mutations under one
2395        // borrow so the post-borrow phases don't re-walk dep_records.
2396        let (prev_op, mut old_scratch, handles_to_release, pause_buffer_payloads) = {
2397            let rec = s.require_node_mut(node_id);
2398            let mut hs = Vec::new();
2399            if let Some(TerminalKind::Error(h)) = rec.terminal {
2400                hs.push(h);
2401            }
2402            for dr in &rec.dep_records {
2403                if let Some(TerminalKind::Error(h)) = dr.terminal {
2404                    hs.push(h);
2405                }
2406                for &h in &dr.data_batch {
2407                    hs.push(h);
2408                }
2409                // Slice C-3 /qa: also release `prev_data`. Prior to this
2410                // collection, `reset_for_fresh_lifecycle` overwrote
2411                // `dr.prev_data = NO_HANDLE` without releasing the old
2412                // handle, leaking one share per dep per resubscribable
2413                // cycle. The leak was masked because no test exercised
2414                // the per-dep `prev_data` retain across a lifecycle
2415                // reset; surfaced by the T1 tightening of
2416                // `last_releases_buffered_latest_on_lifecycle_reset`.
2417                if dr.prev_data != NO_HANDLE {
2418                    hs.push(dr.prev_data);
2419                }
2420            }
2421            // Take pause_state's buffer; collect its payload handles for
2422            // release (they were retained at queue_notify time; buffer
2423            // drops because the new subscriber starts fresh).
2424            let mut pulled = Vec::new();
2425            if let PauseState::Paused { ref mut buffer, .. } = rec.pause_state {
2426                for msg in buffer.drain(..) {
2427                    if let Some(h) = msg.payload_handle() {
2428                        pulled.push(h);
2429                    }
2430                }
2431            }
2432            // Slice E1: drain the replay buffer too — the new subscriber
2433            // gets a fresh lifecycle and shouldn't see prior emissions.
2434            for h in rec.replay_buffer.drain(..) {
2435                pulled.push(h);
2436            }
2437            // Reset wave / lifecycle state.
2438            rec.terminal = None;
2439            rec.has_fired_once = rec.cache != NO_HANDLE && rec.is_state();
2440            rec.has_received_teardown = false;
2441            for dr in &mut rec.dep_records {
2442                dr.prev_data = NO_HANDLE;
2443                dr.data_batch.clear();
2444                dr.terminal = None;
2445                dr.dirty = false;
2446                dr.involved_this_wave = false;
2447            }
2448            rec.pause_state = PauseState::Active;
2449            rec.involved_this_wave = false;
2450            rec.dirty = false;
2451            // P7 (Slice A close /qa): Dynamic nodes clear `tracked` so
2452            // the post-reset first fire repopulates from the fn's
2453            // returned tracked-deps set.
2454            if rec.is_dynamic {
2455                rec.tracked.clear();
2456            }
2457            // Take the old scratch out so we can release its handles and
2458            // install a fresh one. Operator op is copied for the
2459            // rebuild step below.
2460            let prev_op = rec.op;
2461            let old = std::mem::take(&mut rec.op_scratch);
2462            (prev_op, old, hs, pulled)
2463        };
2464
2465        // Phase 2 (Slice C-3 /qa P1 — RETAIN-BEFORE-RELEASE ordering):
2466        // build the fresh scratch FIRST, taking new retains on any
2467        // seed/default handles. This must run BEFORE Phase 3 releases
2468        // the old scratch's shares — if old `acc` (Scan/Reduce) or old
2469        // `latest` (Last) aliases the new `seed`/`default` (common:
2470        // `fold(seed, x) == seed` interns to the same registry entry),
2471        // releasing the old share first could collapse the binding's
2472        // registry slot to zero (production bindings remove the value
2473        // entry on refcount-zero — see `tests/common/mod.rs:191-204`),
2474        // and a subsequent `retain_handle` on the new seed would bump a
2475        // refcount on a slot whose value has been removed. By taking
2476        // the new retains first, we floor the refcount at ≥1 before
2477        // any release happens.
2478        let new_scratch = match prev_op {
2479            // Slice H: the OperatorOp stored on NodeRecord previously
2480            // passed `make_op_scratch` validation at registration time
2481            // (RegisterError::OperatorSeedSentinel for Scan/Reduce
2482            // sentinel seeds; Last { default: NO_HANDLE } is accepted
2483            // and never errors). Re-running it here on the same op
2484            // value is structurally guaranteed to succeed.
2485            Some(op) => self
2486                .make_op_scratch(op)
2487                .expect("invariant: stored OperatorOp passed make_op_scratch validation at registration time"),
2488            None => None,
2489        };
2490
2491        // Phase 3: NOW release handles owned by the old op_scratch
2492        // (Scan/Reduce acc, Distinct/Pairwise prev, Last latest +
2493        // default). Safe per Phase 2's retain-first floor. The boxed
2494        // value is consumed and dropped after.
2495        if let Some(scratch) = old_scratch.as_mut() {
2496            scratch.release_handles(&*self.binding);
2497        }
2498        drop(old_scratch);
2499
2500        // Phase 4: install the fresh scratch.
2501        {
2502            let rec = s.require_node_mut(node_id);
2503            rec.op_scratch = new_scratch;
2504        }
2505
2506        // Phase 5: release wave-state handles collected in phase 1.
2507        for h in handles_to_release {
2508            self.binding.release_handle(h);
2509        }
2510        for h in pause_buffer_payloads {
2511            self.binding.release_handle(h);
2512        }
2513    }
2514
2515    /// Activate `root` and any transitive uncached compute deps so their
2516    /// caches fill our dep_handles slots.
2517    ///
2518    /// Slice A close (M1): pure dep-walk + dep_handles population +
2519    /// pending_fires queueing. No `in_tick` management or `drain_and_flush`
2520    /// call — the outer caller (typically `Core::subscribe` via
2521    /// [`Core::run_wave`]) owns the wave lifecycle and drains lock-released
2522    /// around `invoke_fn`.
2523    ///
2524    /// Walk shape:
2525    ///   1. **Discover phase (DFS via Vec stack):** starting at `root`,
2526    ///      walk transitively-needing-activation deps via the `deps`
2527    ///      chain. Build an ordering where each node appears AFTER all
2528    ///      of its uncached compute deps — i.e., reverse topological
2529    ///      among the visited subgraph.
2530    ///   2. **Deliver phase (forward iteration):** for each visited
2531    ///      node in dep-first order, push deps' caches into the node's
2532    ///      `dep_handles` slots. Caches that were sentinel pre-walk are
2533    ///      filled because their parent's fn fires later in the wave's
2534    ///      drain loop and `commit_emission` propagates new caches forward
2535    ///      via `deliver_data_to_consumer` — the same path this method
2536    ///      uses for the initial seed. Adds the node to `pending_fires`
2537    ///      if its tracked-deps gate is satisfied; the wave-engine drain
2538    ///      fires the fn lock-released around `invoke_fn`.
2539    pub(crate) fn activate_derived(&self, s: &mut CoreState, root: NodeId) {
2540        // Phase 1: discover. DFS to collect every compute node reachable
2541        // via deps that doesn't yet have a cache and hasn't fired.
2542        // Record them in dep-first (post-order) so phase 2 can deliver
2543        // caches forward. Frame is `(node_id, finalize)` — `finalize=false`
2544        // means "first visit: push deps then re-push self with finalize=true";
2545        // `finalize=true` means "deps have been expanded, append self to
2546        // `order`."
2547        let mut visited: HashSet<NodeId> = HashSet::new();
2548        let mut order: Vec<NodeId> = Vec::new();
2549        let mut stack: Vec<(NodeId, bool)> = vec![(root, false)];
2550        while let Some((id, finalize)) = stack.pop() {
2551            if finalize {
2552                order.push(id);
2553                continue;
2554            }
2555            if !visited.insert(id) {
2556                continue;
2557            }
2558            stack.push((id, true));
2559            let dep_ids: Vec<NodeId> = s.require_node(id).dep_ids_vec();
2560            for dep_id in dep_ids {
2561                let (dep_is_state, dep_cache, dep_has_fired) = {
2562                    let dep_rec = s.require_node(dep_id);
2563                    (dep_rec.is_state(), dep_rec.cache, dep_rec.has_fired_once)
2564                };
2565                if !dep_is_state
2566                    && dep_cache == NO_HANDLE
2567                    && !dep_has_fired
2568                    && !visited.contains(&dep_id)
2569                {
2570                    stack.push((dep_id, false));
2571                }
2572            }
2573        }
2574
2575        // Phase 2: deliver caches in dep-first order. For each node, walk
2576        // its deps and call `deliver_data_to_consumer` for any with caches.
2577        // Producer nodes (no deps + has fn — Slice D, D031) have no deps
2578        // to walk; queue them directly into `pending_fires` so the wave
2579        // engine fires their fn once on activation.
2580        for &id in &order {
2581            let (dep_ids, is_producer) = {
2582                let rec = s.require_node(id);
2583                (rec.dep_ids_vec(), rec.is_producer())
2584            };
2585            if is_producer {
2586                s.pending_fires.insert(id);
2587                continue;
2588            }
2589            for (i, dep_id) in dep_ids.iter().copied().enumerate() {
2590                let dep_cache = s.require_node(dep_id).cache;
2591                if dep_cache != NO_HANDLE {
2592                    self.deliver_data_to_consumer(s, id, i, dep_cache);
2593                }
2594            }
2595        }
2596    }
2597
2598    // -------------------------------------------------------------------
2599    // Emission entry point
2600    // -------------------------------------------------------------------
2601
2602    /// Set a state node's value. Triggers a wave (DIRTY → DATA/RESOLVED →
2603    /// fn fires for downstream).
2604    ///
2605    /// Silent no-op if the node has already terminated (R1.3.4). The handle
2606    /// passed in is still released by the caller's binding-side intern path
2607    /// — no implicit retain is consumed when the call short-circuits.
2608    ///
2609    /// # Panics
2610    ///
2611    /// Panics if `node_id` is not a state node, or if `new_handle` is
2612    /// [`NO_HANDLE`] (per R1.2.4, sentinel is not a valid DATA payload).
2613    pub fn emit(&self, node_id: NodeId, new_handle: HandleId) {
2614        assert!(
2615            new_handle != NO_HANDLE,
2616            "NO_HANDLE is not a valid DATA payload (R1.2.4)"
2617        );
2618        // Validate + terminal short-circuit under a brief lock.
2619        //
2620        // emit() is valid for State and Producer nodes — both are
2621        // intrinsic sources whose values are not derived from declared
2622        // deps. State nodes get emit() from user code; Producer nodes
2623        // get emit() from sink callbacks the producer's build closure
2624        // registered (sink fires → re-enter Core → emit on self).
2625        // Derived / Dynamic / Operator nodes emit via their fn return
2626        // value through fire_fn / fire_operator, NOT via emit().
2627        {
2628            let s = self.lock_state();
2629            let rec = s.require_node(node_id);
2630            assert!(
2631                rec.is_state() || rec.is_producer(),
2632                "emit() is for state or producer nodes only; \
2633                 derived/dynamic/operator emit via their fn return value"
2634            );
2635            if rec.terminal.is_some() {
2636                drop(s);
2637                // Caller's intern share would otherwise leak; cache slot
2638                // ownership doesn't transfer because we're not advancing
2639                // cache. Released lock-released so the binding can't
2640                // deadlock against an internal binding mutex.
2641                self.binding.release_handle(new_handle);
2642                return;
2643            }
2644        }
2645        // Run wave — `run_wave` and `commit_emission` manage their own
2646        // locking; binding callbacks (custom_equals, sinks) fire lock-
2647        // released.
2648        self.run_wave(|this| {
2649            this.commit_emission(node_id, new_handle);
2650        });
2651    }
2652
2653    /// Read a node's current cache. Returns [`NO_HANDLE`] if sentinel.
2654    #[must_use]
2655    pub fn cache_of(&self, node_id: NodeId) -> HandleId {
2656        self.lock_state().require_node(node_id).cache
2657    }
2658
2659    /// Whether the node's fn has fired at least once (compute) OR it has had
2660    /// a non-sentinel value (state).
2661    #[must_use]
2662    pub fn has_fired_once(&self, node_id: NodeId) -> bool {
2663        self.lock_state().require_node(node_id).has_fired_once
2664    }
2665
2666    // -------------------------------------------------------------------
2667    // Read-side inspection helpers (Slice E+, M2)
2668    //
// Non-panicking accessors for graph-layer introspection (`describe()`,
// `observe()`, `node_count()`). The five id-taking accessors (`kind_of`,
// `deps_of`, `is_terminal`, `is_dirty`, `meta_companions_of`) return
// `None` / empty / `false` for unknown ids — they're meant to back walks
// over `node_ids()` where the caller already knows the ids are valid,
// plus debugging / dry-run probes that prefer "absence" over "panic".
//
// Keep these strictly read-only: no wave entry, no binding callbacks,
// no mid-call lock release. Each takes the state lock once, copies a
// small value, and drops the lock on return.
2678    // -------------------------------------------------------------------
2679
2680    /// Snapshot of every registered `NodeId` in unspecified order. The
2681    /// order matches `HashMap` iteration over the internal node table —
2682    /// callers that need stable ordering should track names at the
2683    /// `Graph` layer (canonical spec §3.5 namespace).
2684    #[must_use]
2685    pub fn node_ids(&self) -> Vec<NodeId> {
2686        self.lock_state().nodes.keys().copied().collect()
2687    }
2688
2689    /// Total number of nodes registered in this Core.
2690    #[must_use]
2691    pub fn node_count(&self) -> usize {
2692        self.lock_state().nodes.len()
2693    }
2694
2695    /// Returns `Some(kind)` for known nodes, `None` for unknown. Kind is
2696    /// **derived** from the field shape per D030 — see [`NodeKind`].
2697    #[must_use]
2698    pub fn kind_of(&self, node_id: NodeId) -> Option<NodeKind> {
2699        self.lock_state().nodes.get(&node_id).map(NodeRecord::kind)
2700    }
2701
2702    /// Snapshot of the node's deps in declaration order. Empty for
2703    /// unknown nodes or for state nodes (which have no deps).
2704    #[must_use]
2705    pub fn deps_of(&self, node_id: NodeId) -> Vec<NodeId> {
2706        self.lock_state()
2707            .nodes
2708            .get(&node_id)
2709            .map(NodeRecord::dep_ids_vec)
2710            .unwrap_or_default()
2711    }
2712
2713    /// Returns `Some(kind)` if the node has terminated (R1.3.4) — the
2714    /// pair `Some(Complete)` / `Some(Error(h))` mirrors the wire message
2715    /// the node emitted. `None` for live nodes or unknown ids.
2716    #[must_use]
2717    pub fn is_terminal(&self, node_id: NodeId) -> Option<TerminalKind> {
2718        self.lock_state()
2719            .nodes
2720            .get(&node_id)
2721            .and_then(|r| r.terminal)
2722    }
2723
2724    /// Whether the node has wave-scoped DIRTY pending (a tier-1 message
2725    /// queued but the matching tier-3 settle has not yet flushed).
2726    /// `false` for unknown ids. Mostly useful for `describe()` status
2727    /// classification (R3.6.1 `"dirty"`).
2728    #[must_use]
2729    pub fn is_dirty(&self, node_id: NodeId) -> bool {
2730        self.lock_state()
2731            .nodes
2732            .get(&node_id)
2733            .is_some_and(|r| r.dirty)
2734    }
2735
2736    /// Snapshot of `parent`'s meta companion list (R1.3.9.d / R2.3.3 —
2737    /// the companions added via [`Self::add_meta_companion`]). Empty
2738    /// for unknown ids or for nodes with no companions registered.
2739    ///
2740    /// Used by the graph layer's `signal_invalidate` to filter meta
2741    /// children out of the broadcast (canonical R3.7.2 — meta caches
2742    /// are preserved across graph-wide INVALIDATE).
2743    #[must_use]
2744    pub fn meta_companions_of(&self, parent: NodeId) -> Vec<NodeId> {
2745        self.lock_state()
2746            .nodes
2747            .get(&parent)
2748            .map(|r| r.meta_companions.clone())
2749            .unwrap_or_default()
2750    }
2751
2752    // -------------------------------------------------------------------
2753    // Wave engine — lives in `crate::batch` (Slice C-1 module split;
2754    // Slice A close M1 refactor lifted the binding-callback re-entrance
2755    // restrictions). The methods are still on `Core`; see `batch.rs` for:
2756    //
2757    //   - `run_wave` — wave entry, manages own locking.
2758    //   - `drain_and_flush` — drain phase, lock-released around invoke_fn.
2759    //   - `commit_emission` — lock-released around custom_equals.
2760    //   - `pick_next_fire`, `deliver_data_to_consumer`, `queue_notify`,
2761    //     `flush_notifications` — wave-engine helpers.
2762    // -------------------------------------------------------------------
2763}
2764
2765// -----------------------------------------------------------------------
2766// COMPLETE / ERROR — terminal lifecycle + auto-cascade gating
2767// -----------------------------------------------------------------------
2768
2769impl Core {
2770    /// Emit `[COMPLETE]` (R1.3.4) on `node_id`, marking it terminal. After
2771    /// this call:
2772    ///
2773    /// - Subsequent `Core::emit` on this node is a silent no-op (idempotent
2774    ///   termination).
2775    /// - The node's fn no longer fires.
2776    /// - The node's cache is preserved (last value still observable via
2777    ///   `cache_of`).
2778    /// - Children receive `[COMPLETE]` (tier 5 — bypasses pause buffer).
2779    /// - Auto-cascade gating (Lock 2.B): each child that has all of its
2780    ///   deps in a terminal state auto-emits its own `[COMPLETE]`. ERROR
2781    ///   dominates COMPLETE — if any of a child's deps emitted ERROR, the
2782    ///   child auto-cascades that ERROR instead.
2783    ///
2784    /// Idempotent: calling `complete` on an already-terminal node is a no-op.
2785    ///
2786    /// # Panics
2787    ///
2788    /// Panics if `node_id` is unknown.
2789    pub fn complete(&self, node_id: NodeId) {
2790        self.emit_terminal(node_id, TerminalKind::Complete);
2791    }
2792
2793    /// Emit `[ERROR, error_handle]` (R1.3.4) on `node_id`. `error_handle`
2794    /// must resolve to a non-sentinel value (R1.2.5) — the binding side has
2795    /// already interned the error value before this call. Same lifecycle
2796    /// effects as [`Self::complete`]; ERROR dominates COMPLETE in auto-
2797    /// cascade gating.
2798    ///
2799    /// # Panics
2800    ///
2801    /// Panics if `node_id` is unknown or `error_handle == NO_HANDLE`.
2802    pub fn error(&self, node_id: NodeId, error_handle: HandleId) {
2803        assert!(
2804            error_handle != NO_HANDLE,
2805            "NO_HANDLE is not a valid ERROR payload (R1.2.5)"
2806        );
2807        self.emit_terminal(node_id, TerminalKind::Error(error_handle));
2808        // The caller's intern share for `error_handle` is NOT transferred
2809        // to any slot — `terminate_node` takes its OWN retain for every
2810        // populated `terminal` and `dep_terminals` slot. Release the
2811        // caller's share here (mirrors `Core::emit`'s short-circuit
2812        // release on terminal). Without this, every `error()` call leaks
2813        // one binding-side handle ref. Slice A-bigger /qa item D fix.
2814        self.binding.release_handle(error_handle);
2815    }
2816
2817    fn emit_terminal(&self, node_id: NodeId, terminal: TerminalKind) {
2818        {
2819            let s = self.lock_state();
2820            assert!(s.nodes.contains_key(&node_id), "unknown node {node_id:?}");
2821        }
2822        // Wave runs with `run_wave` orchestrating drain. The thunk acquires
2823        // its own lock to queue the cascade (terminate_node is a fast
2824        // structural walk; no binding callbacks beyond non-re-entrant
2825        // retain/release).
2826        self.run_wave(|this| {
2827            let mut s = this.lock_state();
2828            this.terminate_node(&mut s, node_id, terminal);
2829        });
2830    }
2831
2832    /// Set the node's terminal slot, queue the wire message, and cascade to
2833    /// children. Idempotent on already-terminal node (no-op).
2834    ///
2835    /// Iterative implementation (Slice A-bigger, M1-close): a work-queue
2836    /// drives the cascade so deep linear chains don't overflow the OS
2837    /// thread stack. Mirrors `path_from_to`'s explicit-stack pattern.
2838    fn terminate_node(&self, s: &mut CoreState, node_id: NodeId, terminal: TerminalKind) {
2839        let mut work: Vec<(NodeId, TerminalKind)> = vec![(node_id, terminal)];
2840        while let Some((id, t)) = work.pop() {
2841            if s.require_node(id).terminal.is_some() {
2842                continue; // Idempotent — already terminal.
2843            }
2844            // Take a refcount share for the terminal slot so the error
2845            // handle outlives the binding-side intern's transient share.
2846            if let TerminalKind::Error(h) = t {
2847                self.binding.retain_handle(h);
2848            }
2849            // Slice E2 /qa Q2(b) (D069): if a resubscribable node is
2850            // terminating with no live subscribers, queue eager
2851            // `wipe_ctx` for the wave's lock-released drain. This is the
2852            // mutually-exclusive complement of the `Subscription::Drop`
2853            // wipe site: when the LAST sub drops first then terminate
2854            // fires, subs are empty here and we queue; when terminate
2855            // fires WITH subs still alive, we DON'T queue (subs not
2856            // empty), and `Subscription::Drop` will fire wipe directly
2857            // when those subs eventually drop. Either way, exactly one
2858            // wipe fires per terminal lifecycle.
2859            let queue_wipe = {
2860                let rec = s.require_node(id);
2861                rec.resubscribable && rec.subscribers.is_empty()
2862            };
2863            s.require_node_mut(id).terminal = Some(t);
2864            if queue_wipe {
2865                s.pending_wipes.push(id);
2866            }
2867            // Drain pending fires for this node — fn won't fire on a
2868            // terminal node.
2869            s.pending_fires.remove(&id);
2870            // R1.3.8.b / Slice F (A3, 2026-05-07): if this node was paused
2871            // when terminating (the canonical case is the R1.3.8.c overflow
2872            // ERROR synthesis path), drain the pause buffer and release
2873            // each payload's queue_notify-time retain. Without this, the
2874            // buffer leaks one share per buffered DATA/RESOLVED/INVALIDATE.
2875            // Subscribers receive the terminal directly via the cascade
2876            // below (tier-5 bypasses the pause buffer); the buffered
2877            // content is moot post-terminal.
2878            let drained: Vec<HandleId> = {
2879                let rec = s.require_node_mut(id);
2880                let mut drained: Vec<HandleId> = Vec::new();
2881                if rec.pause_state.is_paused() {
2882                    // Take the buffered messages out, then collapse the
2883                    // pause state to Active so subsequent code observes a
2884                    // clean lifecycle. Idempotent on Active (no-op).
2885                    let prev = std::mem::replace(&mut rec.pause_state, PauseState::Active);
2886                    if let PauseState::Paused { buffer, .. } = prev {
2887                        drained.extend(buffer.into_iter().filter_map(Message::payload_handle));
2888                    }
2889                }
2890                // QA A4 (2026-05-07): drain replay buffer on terminate. A
2891                // non-resubscribable terminal ends the lifecycle; without
2892                // this drain the buffer's retains leak until `Drop for
2893                // CoreState`. Resubscribable nodes' replay buffers are
2894                // also drained (when they're hit by a terminal cascade);
2895                // a fresh subscribe rebuilds the buffer from scratch as
2896                // part of `reset_for_fresh_lifecycle`.
2897                drained.extend(rec.replay_buffer.drain(..));
2898                drained
2899            };
2900            for h in drained {
2901                self.binding.release_handle(h);
2902            }
2903            // Queue the wire message (tier 5 — bypasses pause buffer).
2904            let msg = match t {
2905                TerminalKind::Complete => Message::Complete,
2906                TerminalKind::Error(h) => Message::Error(h),
2907            };
2908            self.queue_notify(s, id, msg);
2909            // Cascade to children.
2910            let child_ids: Vec<NodeId> = s
2911                .children
2912                .get(&id)
2913                .map(|c| c.iter().copied().collect())
2914                .unwrap_or_default();
2915            for child_id in child_ids {
2916                let dep_idx = s.require_node(child_id).dep_index_of(id);
2917                let Some(idx) = dep_idx else { continue };
2918                // Mark this child's per-dep terminal slot. Take a retain on
2919                // the error handle for the slot share.
2920                {
2921                    let child = s.require_node_mut(child_id);
2922                    if child.dep_records[idx].terminal.is_some() {
2923                        // Idempotent — child already saw this dep terminate.
2924                        continue;
2925                    }
2926                    child.dep_records[idx].terminal = Some(t);
2927                }
2928                if let TerminalKind::Error(h) = t {
2929                    self.binding.retain_handle(h);
2930                }
2931                // Auto-cascade gating: if all deps now terminal, push child
2932                // onto the work queue with the chosen terminal.
2933                //
2934                // Slice C-1: kinds that opt out of Lock 2.B (currently
2935                // `Operator(Reduce)`) intercept upstream COMPLETE so they
2936                // can emit their accumulator before terminating. Instead of
2937                // cascading, queue the child for fn-fire — `fire_operator`
2938                // sees `dep_records[0].terminal` set and emits the
2939                // appropriate batch (Data(acc) + Complete on COMPLETE,
2940                // Error(h) on ERROR).
2941                let action = {
2942                    let child = s.require_node(child_id);
2943                    if child.terminal.is_some() {
2944                        ChildAction::None // Already terminated — no-op.
2945                    } else if child.all_deps_terminal() {
2946                        if child.skips_auto_cascade() {
2947                            ChildAction::QueueFire
2948                        } else {
2949                            ChildAction::Cascade(pick_cascade_terminal(&child.dep_records))
2950                        }
2951                    } else {
2952                        ChildAction::None
2953                    }
2954                };
2955                match action {
2956                    ChildAction::None => {}
2957                    ChildAction::Cascade(t_child) => {
2958                        work.push((child_id, t_child));
2959                    }
2960                    ChildAction::QueueFire => {
2961                        s.pending_fires.insert(child_id);
2962                    }
2963                }
2964            }
2965        }
2966    }
2967}
2968
/// Outcome of Lock 2.B child gating in `terminate_node`'s cascade walk.
///
/// `terminate_node` computes one of these per child while it holds an
/// immutable borrow of the child record, then acts on it once that borrow
/// has ended (pushing onto its work list or mutating `pending_fires`).
enum ChildAction {
    /// No cascade; child is already terminal or not yet all-deps-terminal.
    None,
    /// Auto-cascade with the picked terminal kind (ERROR dominates COMPLETE).
    /// The child is pushed onto `terminate_node`'s work list with this kind.
    Cascade(TerminalKind),
    /// Queue child for fn-fire instead of cascading. Used by operator
    /// kinds that intercept upstream terminal (Operator(Reduce)); the child
    /// is inserted into `pending_fires`.
    QueueFire,
}
2979
2980/// Lock 2.B cascade-terminal selection: ERROR dominates COMPLETE; first
2981/// ERROR seen wins. Caller has already verified all deps are terminal.
2982fn pick_cascade_terminal(dep_records: &[DepRecord]) -> TerminalKind {
2983    for dr in dep_records {
2984        if let Some(TerminalKind::Error(h)) = dr.terminal {
2985            return TerminalKind::Error(h);
2986        }
2987    }
2988    TerminalKind::Complete
2989}
2990
2991// -----------------------------------------------------------------------
2992// TEARDOWN — destruction, with auto-COMPLETE prepend (R2.6.4 / Lock 6.F)
2993// -----------------------------------------------------------------------
2994
2995impl Core {
2996    /// Tear `node_id` down. Per R2.6.4 / Lock 6.F:
2997    ///
2998    /// - **Auto-prepend COMPLETE.** If the node has not yet emitted a
2999    ///   terminal (`COMPLETE` / `ERROR`), `terminate_node` is called with
3000    ///   `Complete` first so subscribers see `[COMPLETE, TEARDOWN]`, not
3001    ///   bare `[TEARDOWN]`. This guarantees a clean end-of-stream signal
3002    ///   to async iterators and other consumers that wait on terminal
3003    ///   delivery.
3004    /// - **Idempotent on duplicate delivery.** The per-node
3005    ///   `has_received_teardown` flag is set on the first call; subsequent
3006    ///   `teardown` calls (or cascade visits from other paths) are silent
3007    ///   no-ops — no second `[COMPLETE, TEARDOWN]` pair to subscribers.
3008    /// - **Cascade downstream.** Each child is recursively torn down. The
3009    ///   child's own COMPLETE auto-cascades from `terminate_node`'s logic
3010    ///   (Lock 2.B); its TEARDOWN comes from this cascade.
3011    ///
3012    /// # Panics
3013    ///
3014    /// Panics if `node_id` is unknown.
3015    pub fn teardown(&self, node_id: NodeId) {
3016        {
3017            let s = self.lock_state();
3018            assert!(s.nodes.contains_key(&node_id), "unknown node {node_id:?}");
3019        }
3020        let torn_down: Arc<Mutex<Vec<NodeId>>> = Arc::new(Mutex::new(Vec::new()));
3021        let torn_down_for_wave = torn_down.clone();
3022        self.run_wave(move |this| {
3023            let mut s = this.lock_state();
3024            let collected = this.teardown_inner(&mut s, node_id);
3025            torn_down_for_wave.lock().extend(collected);
3026        });
3027        // Fire NodeTornDown for every cascaded id (root + metas +
3028        // downstream consumers that auto-cascaded). Outside the state
3029        // lock, matching fire_topology_event discipline.
3030        let ids = std::mem::take(&mut *torn_down.lock());
3031        for id in ids {
3032            self.fire_topology_event(&crate::topology::TopologyEvent::NodeTornDown(id));
3033        }
3034    }
3035
3036    /// Iterative teardown walk (Slice A-bigger, M1-close).
3037    ///
3038    /// The recursive shape was:
3039    ///   ```text
3040    ///   teardown(n):
3041    ///     if torn_down: return
3042    ///     mark torn_down
3043    ///     for meta in metas: teardown(meta)
3044    ///     terminate_node + queue Teardown
3045    ///     for child in children: teardown(child)
3046    ///   ```
3047    /// Deep linear chains (~10k nodes) overflowed the OS thread stack.
3048    ///
3049    /// The iterative shape uses a `Vec<Action>` stack with `Visit` and
3050    /// `EmitTeardown` actions. `Visit(n)` marks `n` torn-down (or no-ops
3051    /// if already), then pushes (in reverse order so LIFO pops in forward
3052    /// order) `Visit(child_K), …, Visit(child_1), EmitTeardown(n),
3053    /// Visit(meta_M), …, Visit(meta_1)`. The R1.3.9.d "metas first, then
3054    /// self, then children" ordering is preserved by the push order:
3055    /// metas pop first, recursively expand and emit; then `EmitTeardown(n)`
3056    /// pops and runs `terminate_node` + queue `Teardown`; then children
3057    /// pop. Idempotency via `has_received_teardown` keeps each node
3058    /// visited at most once even when multi-parent diamonds re-enter via
3059    /// a sibling path.
3060    fn teardown_inner(&self, s: &mut CoreState, root: NodeId) -> Vec<NodeId> {
3061        enum Action {
3062            Visit(NodeId),
3063            EmitTeardown(NodeId),
3064        }
3065        let mut stack: Vec<Action> = vec![Action::Visit(root)];
3066        // Topology accumulator: every node that actually emits TEARDOWN
3067        // (i.e. each `EmitTeardown(id)` site, NOT each `Visit` — visits
3068        // for already-torn-down nodes short-circuit on idempotency).
3069        let mut torn_down: Vec<NodeId> = Vec::new();
3070        while let Some(action) = stack.pop() {
3071            match action {
3072                Action::Visit(id) => {
3073                    if s.require_node(id).has_received_teardown {
3074                        continue; // Idempotent (R2.6.4).
3075                    }
3076                    s.require_node_mut(id).has_received_teardown = true;
3077                    // Push order: children first (pop LAST), then
3078                    // EmitTeardown(id), then metas (pop FIRST). Reverse
3079                    // each list so within-group order matches the original
3080                    // recursive iteration.
3081                    let children: Vec<NodeId> = s
3082                        .children
3083                        .get(&id)
3084                        .map(|c| c.iter().copied().collect())
3085                        .unwrap_or_default();
3086                    for &child in children.iter().rev() {
3087                        stack.push(Action::Visit(child));
3088                    }
3089                    stack.push(Action::EmitTeardown(id));
3090                    let metas: Vec<NodeId> = s.require_node(id).meta_companions.clone();
3091                    for &meta in metas.iter().rev() {
3092                        stack.push(Action::Visit(meta));
3093                    }
3094                }
3095                Action::EmitTeardown(id) => {
3096                    // Auto-prepend COMPLETE if not yet terminal. The (now
3097                    // iterative) terminate_node handles auto-cascade to
3098                    // children's own terminal slots per Lock 2.B.
3099                    let already_terminal = s.require_node(id).terminal.is_some();
3100                    if !already_terminal {
3101                        self.terminate_node(s, id, TerminalKind::Complete);
3102                    }
3103                    // Wire emission of the TEARDOWN itself (tier 6).
3104                    self.queue_notify(s, id, Message::Teardown);
3105                    torn_down.push(id);
3106                }
3107            }
3108        }
3109        torn_down
3110    }
3111
3112    /// Attach `companion` as a meta companion of `parent` per R1.3.9.d.
3113    /// Meta companions are nodes whose lifecycle is bound to the parent's
3114    /// in TEARDOWN ordering: when `parent` tears down, `companion` tears
3115    /// down first.
3116    ///
3117    /// Use this for inspection / audit / sidecar nodes that subscribe to
3118    /// parent state — without the ordering, the companion could observe
3119    /// the parent mid-destruction and emit garbage.
3120    ///
3121    /// Idempotent on duplicate registration of the same companion.
3122    ///
3123    /// # Lifecycle constraint
3124    ///
3125    /// Intended for **setup-time** wiring — call this before `parent` or
3126    /// `companion` enters a wave. Mid-wave registration (especially during
3127    /// a teardown cascade in flight) is implementation-defined: the new
3128    /// edge takes effect on the *next* wave. Adding a companion to a
3129    /// torn-down parent silently no-ops (the parent will not tear down
3130    /// again). For dynamic companion attachment with deterministic
3131    /// ordering, prefer constructing the wiring before subscribers exist.
3132    ///
3133    /// # Panics
3134    ///
3135    /// Panics if either node id is unknown, or if `parent == companion`
3136    /// (a node cannot be its own meta companion — would loop on TEARDOWN).
3137    pub fn add_meta_companion(&self, parent: NodeId, companion: NodeId) {
3138        assert!(parent != companion, "node cannot be its own meta companion");
3139        let mut s = self.lock_state();
3140        assert!(s.nodes.contains_key(&parent), "unknown parent {parent:?}");
3141        assert!(
3142            s.nodes.contains_key(&companion),
3143            "unknown companion {companion:?}"
3144        );
3145        let metas = &mut s.require_node_mut(parent).meta_companions;
3146        if !metas.contains(&companion) {
3147            metas.push(companion);
3148        }
3149    }
3150}
3151
3152// -----------------------------------------------------------------------
3153// INVALIDATE — cache clear + downstream cascade
3154// -----------------------------------------------------------------------
3155
3156impl Core {
3157    /// Clear `node_id`'s cache and cascade `[INVALIDATE]` to downstream
3158    /// dependents per canonical spec §1.4.
3159    ///
3160    /// Semantics:
3161    /// - **Never-populated case (R1.4 line 197):** if `cache == NO_HANDLE`,
3162    ///   the call is a no-op — no cache to clear, no INVALIDATE emitted.
3163    ///   This naturally provides idempotency within a wave: once a node has
3164    ///   been invalidated this wave (cache = NO_HANDLE), a second invalidate
3165    ///   on the same node does nothing.
3166    /// - **Cache clear (immediate):** the node's cached handle is dropped
3167    ///   (refcount released), `cache` becomes `NO_HANDLE`. State nodes
3168    ///   keep `has_fired_once` per spec — INVALIDATE is not a re-gating
3169    ///   event (the next emission to a previously-fired state still does
3170    ///   not re-trigger the first-run gate; that's a resubscribable-terminal
3171    ///   lifecycle concern, separate slice).
3172    /// - **Wire emission (tier 4):** `[INVALIDATE]` is queued via the
3173    ///   normal pause-aware notify path. Buffers while paused, flushes
3174    ///   immediately otherwise.
3175    /// - **Downstream cascade:** for each child of this node, the child's
3176    ///   `dep_handles[idx_of_node]` is reset to `NO_HANDLE` (its previous
3177    ///   value referenced a now-released handle). The child is then
3178    ///   recursively invalidated (no-op if its cache was already
3179    ///   `NO_HANDLE`). This re-closes the child's first-run gate — fn
3180    ///   won't fire again until the upstream re-emits a value.
3181    ///
3182    /// Wraps in a fresh wave when called from outside a wave, so
3183    /// notifications flush at the natural wave boundary.
3184    ///
3185    /// # Panics
3186    ///
3187    /// Panics if `node_id` is unknown, consistent with `emit` / `pause`.
3188    pub fn invalidate(&self, node_id: NodeId) {
3189        {
3190            let s = self.lock_state();
3191            assert!(s.nodes.contains_key(&node_id), "unknown node {node_id:?}");
3192        }
3193        self.run_wave(|this| {
3194            let mut s = this.lock_state();
3195            this.invalidate_inner(&mut s, node_id);
3196        });
3197    }
3198
3199    /// Iterative invalidate cascade (Slice A-bigger, M1-close).
3200    ///
3201    /// The recursive shape was a depth-first cache-clear walk:
3202    ///   ```text
3203    ///   invalidate(n):
3204    ///     if cache(n) == NO_HANDLE: return  // already-invalidated guard
3205    ///     cache(n) = NO_HANDLE; release handle
3206    ///     queue Invalidate(n)
3207    ///     for child in children:
3208    ///       child.dep_handles[idx] = NO_HANDLE
3209    ///       invalidate(child)
3210    ///   ```
3211    /// Deep linear chains overflowed the OS thread stack. The work-queue
3212    /// rewrite has no ordering subtleties (unlike teardown's R1.3.9.d
3213    /// metas-first constraint) — Invalidate is a tier-4 broadcast where
3214    /// the never-populated / already-invalidated guard provides natural
3215    /// idempotency for diamond fan-in.
3216    fn invalidate_inner(&self, s: &mut CoreState, root: NodeId) {
3217        let mut work: Vec<NodeId> = vec![root];
3218        while let Some(node_id) = work.pop() {
3219            // Never-populated / already-invalidated: no-op (R1.4 idempotency).
3220            // Per R1.3.9.c never-populated case, OnInvalidate cleanup hook
3221            // also does NOT fire — natural fallout of skipping via the
3222            // cache==NO_HANDLE guard (we never reach the queue-push below).
3223            let old_handle = s.require_node(node_id).cache;
3224            if old_handle == NO_HANDLE {
3225                continue;
3226            }
3227            // Clear cache + release the handle's slot ownership.
3228            s.require_node_mut(node_id).cache = NO_HANDLE;
3229            self.binding.release_handle(old_handle);
3230            // Slice E2 (R1.3.9.b strict per D057 + D058 fire-at-cache-clear):
3231            // queue OnInvalidate cleanup hook for lock-released drain at
3232            // wave-end. The dedup set guarantees at-most-once-per-wave-per-
3233            // node firing even if a node re-populates mid-wave (via fn-fire
3234            // emit) and gets re-invalidated through a separate path. Pure
3235            // cache==NO_HANDLE idempotency (above) catches "still at
3236            // sentinel" only; the explicit set is the strict R1.3.9.b
3237            // reading.
3238            if s.invalidate_hooks_fired_this_wave.insert(node_id) {
3239                s.deferred_cleanup_hooks
3240                    .push((node_id, CleanupTrigger::OnInvalidate));
3241            }
3242            // Wire emission. Pause-aware via queue_notify.
3243            self.queue_notify(s, node_id, Message::Invalidate);
3244            // Cascade: for each child, clear the dep record's prev_data
3245            // referencing this node and push child onto the work queue.
3246            let child_ids: Vec<NodeId> = s
3247                .children
3248                .get(&node_id)
3249                .map(|c| c.iter().copied().collect())
3250                .unwrap_or_default();
3251            for child_id in child_ids {
3252                let dep_idx = s.require_node(child_id).dep_index_of(node_id);
3253                if let Some(idx) = dep_idx {
3254                    // Reset the child's dep record — the handle was just
3255                    // released. Subsequent first-run-gate checks see
3256                    // sentinel and re-close.
3257                    //
3258                    // Snapshot prev_data + data_batch retains for deferred
3259                    // release, then clear the record. Two-phase to satisfy
3260                    // the borrow checker (nodes + deferred_handle_releases
3261                    // are separate CoreState fields).
3262                    let (old_prev, batch_hs): (HandleId, SmallVec<[HandleId; 1]>) = {
3263                        let dr = &s.require_node(child_id).dep_records[idx];
3264                        (dr.prev_data, dr.data_batch.clone())
3265                    };
3266                    if old_prev != NO_HANDLE {
3267                        s.deferred_handle_releases.push(old_prev);
3268                    }
3269                    for h in batch_hs {
3270                        s.deferred_handle_releases.push(h);
3271                    }
3272                    let dr = &mut s.require_node_mut(child_id).dep_records[idx];
3273                    dr.prev_data = NO_HANDLE;
3274                    dr.data_batch.clear();
3275                    work.push(child_id);
3276                }
3277            }
3278        }
3279    }
3280}
3281
3282// -----------------------------------------------------------------------
3283// PAUSE / RESUME — multi-pauser lockset + replay buffer
3284// -----------------------------------------------------------------------
3285
/// Summary handed back by [`Core::resume`] once the last pause lock on a
/// node has been released.
///
/// A non-zero `dropped` means a controller held the lock long enough for the
/// buffer to overflow the Core-global `pause_buffer_cap`; the binding may
/// want to surface that as a warning or an error.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ResumeReport {
    /// Number of tier-3/tier-4 messages dispatched to subscribers as part of
    /// the drain.
    pub replayed: u32,
    /// Number of messages that fell out of the front of the buffer because
    /// of `pause_buffer_cap` while this pause cycle was active.
    pub dropped: u32,
}
3299
3300impl Core {
3301    /// Acquire a pause lock on `node_id`. The first lock transitions the
3302    /// node from `Active` to `Paused`; further locks add to the lockset.
3303    /// While paused, tier-3 (DATA/RESOLVED) and tier-4 (INVALIDATE) outgoing
3304    /// messages buffer in the node's pause buffer; other tiers flush
3305    /// immediately.
3306    ///
3307    /// Re-acquiring the same `lock_id` is an idempotent no-op (matches TS
3308    /// convention, R1.2.6 silent on the case).
3309    pub fn pause(&self, node_id: NodeId, lock_id: LockId) -> Result<(), PauseError> {
3310        let mut s = self.lock_state();
3311        let rec = s
3312            .nodes
3313            .get_mut(&node_id)
3314            .ok_or(PauseError::UnknownNode(node_id))?;
3315        // QA A5 (2026-05-07): terminated nodes can't be re-paused. Without
3316        // this check, a stale pause-controller calling pause() on an
3317        // already-terminated node would re-arm `pause_state` to Paused.
3318        // The terminate_node path collapses pause_state → Active and
3319        // drains the buffer (A3-related), but doesn't gate subsequent
3320        // pause() calls. Treat as idempotent no-op (consistent with how
3321        // emit/complete/error early-return on terminal).
3322        if rec.terminal.is_some() {
3323            return Ok(());
3324        }
3325        // Slice F audit close (2026-05-07): `PausableMode::Off` means the
3326        // dispatcher ignores PAUSE for this node — tier-3 flushes
3327        // immediately, fn fires immediately. Treat the call as a successful
3328        // no-op so callers don't need to special-case.
3329        if rec.pausable == PausableMode::Off {
3330            return Ok(());
3331        }
3332        rec.pause_state.add_lock(lock_id);
3333        Ok(())
3334    }
3335
3336    /// Release a pause lock on `node_id`. If the lockset becomes empty, the
3337    /// node transitions back to `Active` and the buffered messages are
3338    /// dispatched to subscribers in arrival order. Returns a [`ResumeReport`]
3339    /// when the final lock released; `None` if the lockset is still
3340    /// non-empty (further locks held).
3341    ///
3342    /// Releasing an unknown `lock_id` (or releasing on an already-Active
3343    /// node) is an idempotent no-op returning `None`.
3344    pub fn resume(
3345        &self,
3346        node_id: NodeId,
3347        lock_id: LockId,
3348    ) -> Result<Option<ResumeReport>, PauseError> {
3349        // Phase 1 (lock-held): collect drained buffer + pending-wave flag +
3350        // sink Arcs. For default-mode nodes whose `pending_wave` was set
3351        // during pause, schedule a single fn-fire by adding to
3352        // `pending_fires` BEFORE we exit the lock — the wave engine picks
3353        // it up on the next drain tick.
3354        let (sinks, messages, dropped, pending_wave_for_default) = {
3355            let mut s = self.lock_state();
3356            let rec = s
3357                .nodes
3358                .get_mut(&node_id)
3359                .ok_or(PauseError::UnknownNode(node_id))?;
3360            // For Off mode, pause/resume are no-ops by construction.
3361            if rec.pausable == PausableMode::Off {
3362                return Ok(None);
3363            }
3364            let was_default_mode = rec.pausable == PausableMode::Default;
3365            // Capture pending_wave BEFORE remove_lock collapses the state.
3366            let pending_wave = if was_default_mode {
3367                rec.pause_state.take_pending_wave()
3368            } else {
3369                false
3370            };
3371            let Some((buffer, dropped)) = rec.pause_state.remove_lock(lock_id) else {
3372                // Not the final-resume — restore the pending_wave flag we
3373                // tentatively cleared, since we're not transitioning to
3374                // Active yet.
3375                if pending_wave {
3376                    rec.pause_state.mark_pending_wave();
3377                }
3378                return Ok(None);
3379            };
3380            let sinks: Vec<Sink> = rec.subscribers.values().cloned().collect();
3381            let messages: Vec<Message> = buffer.into_iter().collect();
3382            // Default-mode pending-wave handling: schedule the fn-fire so
3383            // the wave engine consolidates the pause-window dep deliveries
3384            // into one fn execution. State nodes don't fire fn (no
3385            // `pending_fires` membership has effect for them).
3386            if pending_wave && was_default_mode {
3387                s.pending_fires.insert(node_id);
3388            }
3389            (sinks, messages, dropped, pending_wave && was_default_mode)
3390        };
3391        let replayed = u32::try_from(messages.len()).unwrap_or(u32::MAX);
3392
3393        // Phase 2 (lock-released): fire sinks for ResumeAll-buffered
3394        // messages. Default-mode resume produces no buffered replay (the
3395        // consolidated fn-fire produces fresh wave traffic via the standard
3396        // commit_emission path).
3397        if !messages.is_empty() {
3398            for sink in &sinks {
3399                sink(&messages);
3400            }
3401            // Phase 3: balance the retain_handle calls done at buffer-push
3402            // time — sinks observe values but don't own refcount shares.
3403            for msg in &messages {
3404                if let Some(h) = msg.payload_handle() {
3405                    self.binding.release_handle(h);
3406                }
3407            }
3408        }
3409
3410        // Phase 4 (default-mode): drain the consolidated fn-fire scheduled
3411        // in Phase 1. `run_wave` re-acquires `wave_owner` reentrantly + runs
3412        // the standard drain pipeline; the new fn-fire emerges as a normal
3413        // wave's worth of messages to subscribers.
3414        if pending_wave_for_default {
3415            self.run_wave(|_this| {
3416                // The pending_fires entry was pushed in Phase 1 under the
3417                // lock. run_wave's drain picks it up.
3418            });
3419        }
3420        Ok(Some(ResumeReport { replayed, dropped }))
3421    }
3422
3423    /// True if the node currently holds at least one pause lock.
3424    #[must_use]
3425    pub fn is_paused(&self, node_id: NodeId) -> bool {
3426        self.state
3427            .lock()
3428            .require_node(node_id)
3429            .pause_state
3430            .is_paused()
3431    }
3432
3433    /// Number of pause locks currently held on `node_id`. `0` if Active.
3434    #[must_use]
3435    pub fn pause_lock_count(&self, node_id: NodeId) -> usize {
3436        self.state
3437            .lock()
3438            .require_node(node_id)
3439            .pause_state
3440            .lock_count()
3441    }
3442
3443    /// Test helper: whether `node_id` currently holds the given `lock_id`.
3444    #[must_use]
3445    pub fn holds_pause_lock(&self, node_id: NodeId, lock_id: LockId) -> bool {
3446        self.state
3447            .lock()
3448            .require_node(node_id)
3449            .pause_state
3450            .contains_lock(lock_id)
3451    }
3452}
3453
3454// -----------------------------------------------------------------------
3455// set_deps — atomic dep mutation
3456// -----------------------------------------------------------------------
3457
3458/// Errors returnable by [`Core::set_deps`].
3459///
3460/// Per `~/src/graphrefly-ts/docs/research/rewire-design-notes.md` and the
3461/// Phase 13.8 Q1 lock:
3462/// - `SelfDependency` — `n in newDeps` (self-loops are pathological without
3463///   explicit fixed-point semantics, which GraphReFly does not provide).
3464/// - `WouldCreateCycle { path }` — adding the new edge would create a cycle.
3465///   The `path` field reports the offending dep chain for debuggability.
3466/// - `UnknownNode` / `NotComputeNode` — invariant violations from the caller.
3467/// - `TerminalNode` — `n` itself has emitted COMPLETE/ERROR; rewiring a
3468///   terminal stream is a category error (terminal is one-shot at this
3469///   layer; recovery is the resubscribable path on a fresh subscribe).
3470/// - `TerminalDep` — a newly-added dep is terminal AND not resubscribable.
3471///   Resubscribable terminal deps are accepted because the subscribe path
3472///   resets their lifecycle. Non-resubscribable terminal deps would deliver
3473///   their already-emitted terminal directly to `n`'s `dep_terminals` slot,
3474///   which is rarely intended.
3475#[derive(Error, Debug, Clone, PartialEq)]
3476pub enum SetDepsError {
3477    /// `n` appeared in `new_deps` (self-loop rejection).
3478    #[error("set_deps({n:?}, ...): self-dependency rejected (n appeared in new_deps)")]
3479    SelfDependency { n: NodeId },
3480
3481    /// Adding the new dep would create a cycle. `path` is the chain
3482    /// `[added_dep, ..., n]` reachable via existing deps.
3483    #[error(
3484        "set_deps({n:?}, ...): cycle would form via path {path:?} \
3485         (adding {added_dep:?} → {n:?} closes the loop)"
3486    )]
3487    WouldCreateCycle {
3488        n: NodeId,
3489        added_dep: NodeId,
3490        path: Vec<NodeId>,
3491    },
3492
3493    #[error("set_deps: unknown node {0:?}")]
3494    UnknownNode(NodeId),
3495
3496    #[error("set_deps: node {0:?} is not a compute node (state nodes have no deps)")]
3497    NotComputeNode(NodeId),
3498
3499    /// `n` itself has terminated (COMPLETE / ERROR). Rewiring a terminal node
3500    /// is rejected — the stream has ended at this layer. To recover, mark
3501    /// the node resubscribable before terminate; a fresh subscribe will then
3502    /// reset its lifecycle.
3503    #[error("set_deps({n:?}, ...): node has already terminated; cannot rewire a terminal node")]
3504    TerminalNode { n: NodeId },
3505
3506    /// A newly-added dep is terminal AND non-resubscribable. Per Phase 13.8
3507    /// Q1, this is rejected; resubscribable terminal deps are allowed
3508    /// because the subscribe path resets them when activated. Already-present
3509    /// terminal deps are unaffected (their terminal status was accepted at
3510    /// the time they terminated).
3511    #[error(
3512        "set_deps({n:?}, ...): added dep {dep:?} is terminal and not resubscribable; \
3513         either mark it resubscribable before terminate, or remove the dep from new_deps"
3514    )]
3515    TerminalDep { n: NodeId, dep: NodeId },
3516
3517    /// `n` itself is currently mid-fire — a user fn for `n` re-entered Core
3518    /// via `set_deps(n, ...)` from inside `n`'s own `invoke_fn` /
3519    /// `project_each` / `predicate_each` / etc. Phase 1 of the dispatcher
3520    /// snapshotted `dep_handles` BEFORE the lock-released callback; the
3521    /// callback returning a `tracked` set indexed against THAT ordering
3522    /// would corrupt indices if the rewire re-orders deps mid-fire.
3523    /// Rejected to preserve the dynamic-tracked-indices invariant (D1).
3524    ///
3525    /// Workaround: schedule the rewire from a different node's fn (via
3526    /// `Core::emit` on a state node and observing the emit downstream),
3527    /// or perform the rewire after the wave completes (e.g. from a sink
3528    /// callback that is itself outside any fn-fire scope).
3529    ///
3530    /// Slice F (2026-05-07) — A6.
3531    #[error(
3532        "set_deps({n:?}, ...): rejected — node {n:?} is currently mid-fire \
3533         (set_deps from inside the firing node's own fn would corrupt the \
3534         Dynamic `tracked` indices snapshot taken before invoke_fn). \
3535         Schedule the rewire outside this fire scope."
3536    )]
3537    ReentrantOnFiringNode { n: NodeId },
3538}
3539
3540impl Core {
3541    /// Atomic dep mutation — change a node's upstream deps without TEARDOWN
3542    /// cascading and without losing cache.
3543    ///
3544    /// Per the TLA+-verified design at
3545    /// `~/src/graphrefly-ts/docs/research/wave_protocol_rewire.tla`
3546    /// (35,950 distinct states, all 7 invariants clean):
3547    ///
3548    /// - Removed deps: clear dirtyMask bit, drain pending queue, drop DepRecord.
3549    /// - Added deps: SENTINEL prevData; push-on-subscribe if added dep has cached DATA.
3550    /// - Preserved: `firstRunPassed`, `pauseLocks`, `pauseBuffer`, `cache` (ROM/RAM).
3551    /// - Status auto-settles if dirtyMask becomes empty.
3552    /// - Idempotent on `new_deps == current deps`.
3553    /// - Self-rewire `n ∈ new_deps` rejected (`SelfDependency`).
3554    /// - Cycles rejected (`WouldCreateCycle`).
3555    /// - Allowed mid-wave + while paused.
3556    /// - Phase 13.8 Q1: terminal `n` rejected (`TerminalNode`); newly-added
3557    ///   terminal non-resubscribable deps rejected (`TerminalDep`).
3558    ///
3559    /// The body is a single atomic dep-mutation transaction with several
3560    /// discrete validation stages. Splitting would require passing a
3561    /// partially-mutable CoreState across helpers, and the transaction's
3562    /// locality is what makes the F1 refcount-leak collection work.
3563    #[allow(clippy::too_many_lines)]
3564    pub fn set_deps(&self, n: NodeId, new_deps: &[NodeId]) -> Result<(), SetDepsError> {
3565        let mut s = self.lock_state();
3566        // Validate node exists and is compute. Read-once via the helper so
3567        // subsequent code can use `require_node(n)` without re-checking.
3568        let (is_state, is_producer, is_terminal) = {
3569            let rec = s.nodes.get(&n).ok_or(SetDepsError::UnknownNode(n))?;
3570            (rec.is_state(), rec.is_producer(), rec.terminal.is_some())
3571        };
3572        if is_state || is_producer {
3573            // State and Producer nodes have no declared deps — set_deps
3574            // is meaningless. Producer nodes manage their own subscriptions
3575            // through the binding's ProducerCtx; mutating their (empty)
3576            // dep set would not affect that.
3577            return Err(SetDepsError::NotComputeNode(n));
3578        }
3579        // Reject if `n` itself is terminal (Phase 13.8 Q1: terminal nodes
3580        // cannot be rewired; recovery is via resubscribable subscribe).
3581        if is_terminal {
3582            return Err(SetDepsError::TerminalNode { n });
3583        }
3584        // A6 reentrancy guard (Slice F, 2026-05-07): reject if `n` is
3585        // currently mid-fire on the wave-owner thread. Closes the D1 hazard
3586        // where `Phase 1` snapshotted `dep_handles` against pre-rewire dep
3587        // ordering and `Phase 3` would store the returned `tracked` indices
3588        // against post-rewire ordering. Same-thread re-entry is the only
3589        // path that matters — cross-thread emits already block on
3590        // `wave_owner` per the M1 design.
3591        if s.currently_firing.contains(&n) {
3592            return Err(SetDepsError::ReentrantOnFiringNode { n });
3593        }
3594        // Self-rewire rejection.
3595        if new_deps.contains(&n) {
3596            return Err(SetDepsError::SelfDependency { n });
3597        }
3598        // Validate all new deps exist.
3599        for &d in new_deps {
3600            if !s.nodes.contains_key(&d) {
3601                return Err(SetDepsError::UnknownNode(d));
3602            }
3603        }
3604        // Cycle detection: data flows parent → child via the `children` map.
3605        // Adding edge `d → n` (d becomes a dep of n) creates a cycle iff
3606        // `d` is already reachable from `n` via existing data-flow edges
3607        // (so `n → ... → d` exists, and the new `d → n` closes the loop).
3608        // DFS from `n` along `children` edges, looking for each added dep.
3609        let current_deps: HashSet<NodeId> = s.require_node(n).dep_ids().collect();
3610        let new_deps_set: HashSet<NodeId> = new_deps.iter().copied().collect();
3611        let added: HashSet<NodeId> = new_deps_set.difference(&current_deps).copied().collect();
3612        for &d in &added {
3613            if let Some(path) = self.path_from_to(&s, n, d) {
3614                return Err(SetDepsError::WouldCreateCycle {
3615                    n,
3616                    added_dep: d,
3617                    path,
3618                });
3619            }
3620        }
3621        // Phase 13.8 Q1: reject newly-added deps that are terminal AND not
3622        // resubscribable. Resubscribable terminal deps are allowed — the
3623        // subscribe path resets their lifecycle when something activates
3624        // them. Already-present (kept) deps are unaffected; their terminal
3625        // status was accepted at the time they terminated.
3626        for &d in &added {
3627            let dep_rec = s.require_node(d);
3628            if dep_rec.terminal.is_some() && !dep_rec.resubscribable {
3629                return Err(SetDepsError::TerminalDep { n, dep: d });
3630            }
3631        }
3632        // Idempotent fast-path.
3633        if added.is_empty() && current_deps == new_deps_set {
3634            return Ok(());
3635        }
3636        let removed: HashSet<NodeId> = current_deps.difference(&new_deps_set).copied().collect();
3637
3638        // Snapshot old deps (ordered) for topology event, before mutation.
3639        let old_deps_vec: Vec<NodeId> = s.require_node(n).dep_ids_vec();
3640
3641        // Carry out the rewire atomically.
3642        // 1. Build new dep_records, preserving DepRecord state for kept deps.
3643        let new_deps_vec: Vec<NodeId> = new_deps.to_vec();
3644        //
3645        // Refcount discipline (F1 audit fix): each `Some(TerminalKind::Error(h))`
3646        // slot owns a refcount share retained at `terminate_node` time. When a
3647        // dep is REMOVED, its slot is dropped — the corresponding handle's
3648        // share must be released here, otherwise it leaks until Core drop.
3649        // Also release data_batch retains for removed deps.
3650        let (new_dep_records, removed_handles): (Vec<DepRecord>, Vec<HandleId>) = {
3651            let rec = s.require_node(n);
3652            // Index old dep_records by NodeId for O(1) lookup of kept deps.
3653            let old_by_node: HashMap<NodeId, &DepRecord> =
3654                rec.dep_records.iter().map(|dr| (dr.node, dr)).collect();
3655            let new_records: Vec<DepRecord> = new_deps_vec
3656                .iter()
3657                .map(|&d| {
3658                    if let Some(old) = old_by_node.get(&d) {
3659                        // Kept dep: preserve all state (prev_data, data_batch,
3660                        // terminal, wave flags). Subscriptions stay live.
3661                        DepRecord {
3662                            node: d,
3663                            prev_data: old.prev_data,
3664                            dirty: old.dirty,
3665                            involved_this_wave: old.involved_this_wave,
3666                            data_batch: old.data_batch.clone(),
3667                            terminal: old.terminal,
3668                        }
3669                    } else {
3670                        // Added dep: fresh sentinel record.
3671                        DepRecord::new(d)
3672                    }
3673                })
3674                .collect();
3675            // Collect handles to release from REMOVED dep records.
3676            let mut to_release: Vec<HandleId> = Vec::new();
3677            for d in &removed {
3678                if let Some(old) = old_by_node.get(d) {
3679                    if let Some(TerminalKind::Error(h)) = old.terminal {
3680                        to_release.push(h);
3681                    }
3682                    // Release data_batch retains for removed deps.
3683                    for &h in &old.data_batch {
3684                        to_release.push(h);
3685                    }
3686                }
3687            }
3688            (new_records, to_release)
3689        };
3690        // Clear dirtyMask bit by re-emitting the wave-bookkeeping: we don't
3691        // currently model a per-dep dirtyMask explicitly (we use the boolean
3692        // `dirty` flag at node level). Removing a dep's entry from the implicit
3693        // mask is therefore implicit — by removing the dep, future emissions
3694        // from it can't re-arm the bit. The per-dep `involved_this_wave` flag
3695        // stays wave-scoped and gets cleared at wave end. The setDeps action
3696        // itself does NOT change the dirty boolean unless all deps are cleared;
3697        // in that case we settle.
3698        // Slice E2 (D067): on a dynamic node that had previously fired its
3699        // fn, capture `has_fired_once` BEFORE the reset so we can fire
3700        // `OnRerun` cleanup lock-released after `drop(s)` below. Without
3701        // this, the next `fire_regular` Phase 1 would capture
3702        // `has_fired_once = false`, causing Phase 1.5 to skip OnRerun —
3703        // silently dropping the prior activation's cleanup closure when
3704        // the next `invoke_fn` overwrites `current_cleanup`. Per spec
3705        // R2.4.5, `set_deps` does NOT end the activation cycle
3706        // (subscribe→unsubscribe is the cycle boundary), so OnRerun must
3707        // fire on every re-fire including post-set_deps.
3708        let fire_set_deps_on_rerun;
3709        {
3710            let rec = s.require_node_mut(n);
3711            fire_set_deps_on_rerun = rec.is_dynamic && rec.has_fired_once;
3712            rec.dep_records = new_dep_records;
3713            // Re-derive `tracked` for static derived: all indices.
3714            // For dynamic: clear `tracked` AND reset `has_fired_once` so the
3715            // next dep delivery satisfies the first-fire branch in
3716            // `deliver_data_to_consumer` (`!has_fired_once || tracked.contains(...)`).
3717            // Without resetting `has_fired_once`, the cleared `tracked` blocks
3718            // every future fire — fn never re-runs and the dynamic node sits
3719            // on stale cache derived from the old dep set. The next fire
3720            // re-runs fn unconditionally; fn's returned `tracked` then
3721            // repopulates `rec.tracked` and normal selective-deps semantics
3722            // resume from the next dep update onward.
3723            if rec.is_dynamic {
3724                rec.tracked.clear();
3725                rec.has_fired_once = false;
3726            } else {
3727                // Derived (static) and Operator track all deps.
3728                rec.tracked = (0..new_deps_vec.len()).collect();
3729            }
3730        }
3731
3732        // 2. Update inverted-edge map (children).
3733        for &removed_dep in &removed {
3734            if let Some(set) = s.children.get_mut(&removed_dep) {
3735                set.remove(&n);
3736            }
3737        }
3738        for &added_dep in &added {
3739            s.children.entry(added_dep).or_default().insert(n);
3740        }
3741
3742        // 3. Push-on-subscribe for added deps with cached DATA. Wraps in a
3743        // wave so any downstream propagation runs cleanly. We capture only
3744        // the LIST of added deps (not their cache values) because the cache
3745        // can change between releasing the validation lock and the wave's
3746        // re-acquisition — see the P2 race fix below.
3747        //
3748        // P2 (Slice A close /qa) — between `drop(s)` and `run_wave`'s
3749        // closure re-acquiring the lock, a concurrent thread could
3750        // invalidate one of the added deps, releasing its cache handle. A
3751        // pre-snapshot of `(added_dep, cache)` pairs would then carry a
3752        // dangling HandleId into `deliver_data_to_consumer`. The fix is to
3753        // re-read each added dep's `cache` INSIDE the closure (under the
3754        // freshly re-acquired state lock). The wave-owner re-entrant mutex
3755        // (Q2) blocks concurrent waves once we enter `run_wave`, so the
3756        // re-read sees a coherent post-validation state.
3757        let added_for_wave: Vec<NodeId> = added.iter().copied().collect();
3758        // Drop the state lock before run_wave (which acquires its own) and
3759        // before crossing the binding boundary for the F1 refcount-fix
3760        // releases. Keeps the lock-discipline split (binding calls outside
3761        // the state lock) consistent with the rest of the dispatcher.
3762        drop(s);
3763        // Slice E2 (D067): fire OnRerun lock-released for dynamic nodes
3764        // that had previously fired. The cleanup closure cleans up
3765        // resources tied to the old dep shape before the next fn-fire
3766        // (triggered by added-dep push-on-subscribe below) registers a
3767        // fresh cleanup spec. Direct fire (NOT via deferred_cleanup_hooks)
3768        // because set_deps may NOT enter a wave (no added deps → no
3769        // run_wave below) — queueing the hook would orphan it until the
3770        // next unrelated wave drains.
3771        if fire_set_deps_on_rerun {
3772            self.binding.cleanup_for(n, CleanupTrigger::OnRerun);
3773        }
3774        // Fire topology event after lock is dropped.
3775        self.fire_topology_event(&crate::topology::TopologyEvent::DepsChanged {
3776            node: n,
3777            old_deps: old_deps_vec,
3778            new_deps: new_deps_vec.clone(),
3779        });
3780        if !added_for_wave.is_empty() {
3781            self.run_wave(|this| {
3782                let mut s = this.lock_state();
3783                // Defensive: re-validate `n` still exists and isn't terminal.
3784                // A concurrent path could have terminated it between
3785                // validation and run_wave's wave_owner acquisition.
3786                if !s.nodes.contains_key(&n) || s.require_node(n).terminal.is_some() {
3787                    return;
3788                }
3789                for added_dep in &added_for_wave {
3790                    // Re-read cache under the wave-owner-held lock — this
3791                    // is the post-validation, post-concurrent-action
3792                    // snapshot. NO_HANDLE means the dep was invalidated
3793                    // concurrently; skip (no data to push).
3794                    let cache = match s.nodes.get(added_dep) {
3795                        Some(rec) => rec.cache,
3796                        None => continue, // dep deleted concurrently
3797                    };
3798                    if cache == NO_HANDLE {
3799                        continue;
3800                    }
3801                    let dep_idx = s.require_node(n).dep_index_of(*added_dep);
3802                    if let Some(idx) = dep_idx {
3803                        this.deliver_data_to_consumer(&mut s, n, idx, cache);
3804                    }
3805                }
3806            });
3807        }
3808        for h in removed_handles {
3809            self.binding.release_handle(h);
3810        }
3811        Ok(())
3812    }
3813
3814    /// DFS from `from` along data-flow edges (children map) looking for `to`.
3815    /// Returns the path including endpoints, or `None` if unreachable. Used
3816    /// for cycle detection in [`Self::set_deps`].
3817    fn path_from_to(&self, s: &CoreState, from: NodeId, to: NodeId) -> Option<Vec<NodeId>> {
3818        if from == to {
3819            return Some(vec![from]);
3820        }
3821        let mut stack: Vec<(NodeId, Vec<NodeId>)> = vec![(from, vec![from])];
3822        let mut visited: HashSet<NodeId> = HashSet::new();
3823        while let Some((cur, path)) = stack.pop() {
3824            if !visited.insert(cur) {
3825                continue;
3826            }
3827            if cur == to {
3828                return Some(path);
3829            }
3830            if let Some(children) = s.children.get(&cur) {
3831                for &child in children {
3832                    let mut new_path = path.clone();
3833                    new_path.push(child);
3834                    stack.push((child, new_path));
3835                }
3836            }
3837        }
3838        None
3839    }
3840}
3841
3842// CoreState helpers — kept on the inner struct so they're naturally scoped
3843// to the lock guard.
3844impl CoreState {
3845    fn alloc_node_id(&mut self) -> NodeId {
3846        let id = NodeId::new(self.next_node_id);
3847        self.next_node_id += 1;
3848        id
3849    }
3850
3851    fn alloc_sub_id(&mut self) -> SubscriptionId {
3852        let id = SubscriptionId(self.next_subscription_id);
3853        self.next_subscription_id += 1;
3854        id
3855    }
3856
3857    /// Clear wave-scoped flags and rotate per-dep batch data on every
3858    /// node. Run at the end of every wave (regular drain via `run_wave`,
3859    /// activation drain via `activate_derived`, and `BatchGuard::drop`'s
3860    /// drain). Centralized so a future wave-state field can't be missed
3861    /// at one of the cleanup sites.
3862    ///
3863    /// Per-dep rotation (R2.9.b / R1.3.6.b):
3864    /// - `prev_data` ← last element of `data_batch` (or unchanged if empty).
3865    ///   The last batch entry's retain transfers to `prev_data`; the old
3866    ///   `prev_data`'s retain is released. All earlier batch entries are
3867    ///   released.
3868    /// - `data_batch` cleared.
3869    /// - Per-dep `dirty` and `involved_this_wave` cleared.
3870    ///
3871    /// Handle releases are pushed to `deferred_handle_releases` for
3872    /// post-lock-drop release by the caller.
3873    pub(crate) fn clear_wave_state(&mut self) {
3874        self.pending_auto_resolve.clear();
3875        // A6 (Slice F, 2026-05-07): currently_firing is push/pop balanced
3876        // by FiringGuard's RAII (including on panic). It should already be
3877        // empty here, but defensively clear in case a future code path
3878        // forgets the guard.
3879        self.currently_firing.clear();
3880        // A3 (Slice F, 2026-05-07): pending_pause_overflow is normally
3881        // drained by drain_and_flush via the synthesis loop. If a wave is
3882        // panic-discarded BEFORE the synthesis runs (e.g. invoke_fn panics
3883        // before a paused-overflow has a chance to synthesize), we drop the
3884        // queued entries silently — the binding never sees ERROR for that
3885        // overflow event, but the pause buffer's `dropped` count is
3886        // unchanged so callers can still detect via ResumeReport. Re-firing
3887        // the synthesis on the next wave would be confusing (the overflow
3888        // event is logically scoped to the panicked wave).
3889        self.pending_pause_overflow.clear();
3890        // Slice G: tier3 emit tracking is wave-scoped.
3891        self.tier3_emitted_this_wave.clear();
3892        // Slice E2 (D057): per-wave-per-node OnInvalidate dedup is
3893        // wave-scoped — cleared so the next wave can fire cleanups again.
3894        self.invalidate_hooks_fired_this_wave.clear();
3895        // Slice E2 INVARIANT (DO NOT CHANGE WITHOUT THINKING):
3896        // `deferred_cleanup_hooks` is NOT cleared here. It follows the
3897        // `deferred_handle_releases` discipline:
3898        //   - SUCCESS path (`BatchGuard::drop` non-panic): drained by
3899        //     `Core::drain_deferred` AFTER `clear_wave_state` runs, then
3900        //     fired lock-released by `Core::fire_deferred`.
3901        //   - PANIC-DISCARD path (`BatchGuard::drop` panic): explicitly
3902        //     `std::mem::take`-and-dropped AFTER `clear_wave_state` runs,
3903        //     silently per D061.
3904        // Clearing it INSIDE `clear_wave_state` would race the success
3905        // path: the wave's queued `OnInvalidate` cleanup hooks would be
3906        // erased BEFORE `drain_deferred` could take them, dropping every
3907        // user cleanup callback on every successful wave.
3908        // If a future change moves `deferred_cleanup_hooks` ownership
3909        // here, ALSO move the post-`clear_wave_state` take in both
3910        // BatchGuard paths to BEFORE the clear call. Until then, leaving
3911        // the field untouched here is load-bearing.
3912        for rec in self.nodes.values_mut() {
3913            rec.dirty = false;
3914            rec.involved_this_wave = false;
3915            for dr in &mut rec.dep_records {
3916                let batch_len = dr.data_batch.len();
3917                if batch_len > 0 {
3918                    // Release all batch entries EXCEPT the last — the last
3919                    // entry's retain transfers to prev_data.
3920                    for &h in &dr.data_batch[..batch_len - 1] {
3921                        self.deferred_handle_releases.push(h);
3922                    }
3923                    // Release the OLD prev_data (its retain was from the
3924                    // previous wave's rotation or from initial delivery).
3925                    if dr.prev_data != NO_HANDLE {
3926                        self.deferred_handle_releases.push(dr.prev_data);
3927                    }
3928                    // Rotate: last batch entry becomes new prev_data.
3929                    // Its retain carries over — no extra retain needed.
3930                    dr.prev_data = dr.data_batch[batch_len - 1];
3931                    dr.data_batch.clear();
3932                }
3933                dr.dirty = false;
3934                dr.involved_this_wave = false;
3935            }
3936        }
3937    }
3938
3939    pub(crate) fn require_node(&self, id: NodeId) -> &NodeRecord {
3940        self.nodes
3941            .get(&id)
3942            .unwrap_or_else(|| panic!("unknown node {id:?}"))
3943    }
3944
3945    pub(crate) fn require_node_mut(&mut self, id: NodeId) -> &mut NodeRecord {
3946        self.nodes
3947            .get_mut(&id)
3948            .unwrap_or_else(|| panic!("unknown node {id:?}"))
3949    }
3950}
3951
3952/// Release every binding-side refcount share owned by this `CoreState`
3953/// when the last `Core` clone drops the inner Mutex.
3954///
3955/// Without this, every retained handle in `cache` / `terminal` Error /
3956/// `dep_terminals` Error / pause-buffer-payload would leak in the binding
3957/// registry until process exit. Production bindings (napi-rs, pyo3,
3958/// wasm-bindgen) all maintain handle-ref maps that grow unbounded without
3959/// this cleanup.
3960///
3961/// Safe to call during panic unwinding — `BindingBoundary::release_handle`
3962/// is the only call, and a panicking binding during cleanup would already
3963/// have been a problem in normal operation.
3964impl Drop for CoreState {
3965    fn drop(&mut self) {
3966        // Drain pending in-flight retains too, so a panic mid-wave doesn't
3967        // strand the queue_notify retains in `deferred_handle_releases`.
3968        let pending = std::mem::take(&mut self.pending_notify);
3969        let deferred_releases = std::mem::take(&mut self.deferred_handle_releases);
3970        // `deferred_flush_jobs` carries `Vec<Sink>` clones — those Arcs
3971        // drop naturally when this CoreState drops; no handles to release
3972        // there.
3973        let _ = std::mem::take(&mut self.deferred_flush_jobs);
3974
3975        // Per-node retained handles:
3976        //   - `cache` (1 retain per non-NO_HANDLE state cache or
3977        //     populated compute cache).
3978        //   - `terminal == Some(Error(h))` (1 retain on the terminal slot).
3979        //   - `dep_terminals[i] == Some(Error(h))` (1 retain per consumer's
3980        //     terminated-dep slot).
3981        //   - `pause_state` paused buffer messages with payload handles
3982        //     (1 retain per buffered Data/Error).
3983        for rec in self.nodes.values_mut() {
3984            if rec.cache != NO_HANDLE {
3985                self.binding.release_handle(rec.cache);
3986            }
3987            if let Some(TerminalKind::Error(h)) = rec.terminal {
3988                self.binding.release_handle(h);
3989            }
3990            for dr in &rec.dep_records {
3991                if let Some(TerminalKind::Error(h)) = dr.terminal {
3992                    self.binding.release_handle(h);
3993                }
3994                // Release data_batch retains (in-flight wave data).
3995                for &h in &dr.data_batch {
3996                    self.binding.release_handle(h);
3997                }
3998                // Release prev_data retain (cross-wave persistence).
3999                if dr.prev_data != NO_HANDLE {
4000                    self.binding.release_handle(dr.prev_data);
4001                }
4002            }
4003            if let PauseState::Paused { buffer, .. } = &rec.pause_state {
4004                for msg in buffer {
4005                    if let Some(h) = msg.payload_handle() {
4006                        self.binding.release_handle(h);
4007                    }
4008                }
4009            }
4010            // Slice E1: release replay-buffer retains.
4011            for &h in &rec.replay_buffer {
4012                self.binding.release_handle(h);
4013            }
4014            // Operator scratch (Slice C-3, D026): generic per-operator
4015            // state struct. Each variant's release_handles releases the
4016            // shares it owns (Scan/Reduce acc, Distinct/Pairwise prev,
4017            // Last latest + default; Take/Skip/TakeWhile own no handles).
4018            if let Some(scratch) = rec.op_scratch.as_mut() {
4019                scratch.release_handles(&*self.binding);
4020            }
4021        }
4022
4023        // Pending wave retains.
4024        for entry in pending.values() {
4025            for msg in &entry.messages {
4026                if let Some(h) = msg.payload_handle() {
4027                    self.binding.release_handle(h);
4028                }
4029            }
4030        }
4031        for h in deferred_releases {
4032            self.binding.release_handle(h);
4033        }
4034        // Wave-cache snapshot retains (defensive — should normally be
4035        // empty by the time Core drops, but a panicked-mid-wave Core
4036        // could leave them populated).
4037        let snapshots = std::mem::take(&mut self.wave_cache_snapshots);
4038        for (_, h) in snapshots {
4039            self.binding.release_handle(h);
4040        }
4041    }
4042}