Skip to main content

graphrefly_core/
node.rs

1//! The dispatcher — node registration, subscription, wave engine.
2//!
3//! Mirrors `~/src/graphrefly-ts/src/__experiments__/handle-core/core.ts`
4//! (the Phase 13.6 brainstorm prototype, ~370 lines, 22 invariant tests).
5//!
6//! # Scope (M1 dispatcher + Slice A+B parity, closed 2026-05-05)
7//!
8//! - State + derived + dynamic node registration.
9//! - Subscribe / unsubscribe with push-on-subscribe (R1.2.3).
10//! - RAII [`Subscription`] with Drop-based deregister (§10.12).
11//! - DIRTY → DATA / RESOLVED ordering (R1.3.1.b two-phase push).
12//! - Equals-substitution (R1.3.2): identity is zero-FFI; custom crosses boundary.
13//! - First-run gate (R2.5.3) — fn does not fire until every dep has a handle.
14//! - Diamond resolution — one fn fire per wave even with shared upstream.
15//! - `set_deps()` atomic dep mutation with cycle detection + Phase 13.8 Q1
16//!   terminal-rejection policy (R3.3.1).
17//! - PAUSE / RESUME with lockId set + replay buffer (R1.2.6, R2.6, §10.2).
18//! - INVALIDATE broadcast + cascade with R1.4 idempotency.
19//! - COMPLETE / ERROR cascade + Lock 2.B auto-cascade gating
20//!   (ERROR dominates COMPLETE; first error wins).
21//! - TEARDOWN auto-precedes COMPLETE (R2.6.4 / Lock 6.F) +
22//!   `has_received_teardown` idempotency.
23//! - Meta TEARDOWN ordering (R1.3.9.d) — companions tear down before parent.
24//! - Resubscribable terminal lifecycle (R2.2.7, R2.5.3) — late subscribe to a
25//!   resubscribable terminal node resets lifecycle, except after TEARDOWN
26//!   (per F3 audit guard: TEARDOWN is permanent).
27//!
28//! # Module split (Slice C-1, 2026-05-05)
29//!
30//! Wave-engine internals (drain loop, fire selection, emission commit, sink
31//! dispatch) live in [`crate::batch`]. The split is purely organizational —
32//! the methods are still on `Core`. See `batch.rs` for the wave-engine
33//! entry points (`run_wave`, `drain_and_flush`, `commit_emission`,
34//! `queue_notify`, `deliver_data_to_consumer`).
35//!
36//! # Out of scope (later slices / milestones)
37//!
38//! - Deactivation cleanup (RAM nodes clear cache when sink count → 0) — M2.
39//!
40//! See [`migration-status.md`](../../../docs/migration-status.md) for the
41//! milestone tracker and [`porting-deferred.md`](../../../docs/porting-deferred.md)
42//! for surfaced concerns deferred to evidence-driven slices.
43//!
44//! # Re-entrance discipline (Slice A close, M1: fully lock-released)
45//!
46//! - **Wave-end sink fires** drop the state lock first. A subscriber's sink
47//!   that calls back into `Core::emit` / `pause` / `resume` / `invalidate` /
48//!   `complete` / `error` / `teardown` re-acquires the lock cleanly and runs
49//!   a nested wave (the one-Core-per-OS-thread `in_tick` ownership slot is
50//!   cleared by the owning `BatchGuard::drop` before the deferred-fire
51//!   phase).
52//! - **`BindingBoundary::invoke_fn`** fires lock-released. The wave engine
53//!   acquires + drops the state lock per fn-fire iteration around the
54//!   `invoke_fn` callback. User fns may re-enter `Core::emit` / `pause` /
55//!   etc. and run a nested wave.
56//! - **`BindingBoundary::custom_equals`** fires lock-released.
57//!   `commit_emission` brackets the equals check around a lock release;
58//!   custom equals oracles may re-enter Core safely.
59//! - **Subscribe-time handshake** also fires lock-released.
60//!   [`Core::subscribe`] installs the sink under the state lock, drops
61//!   it, then fires the per-tier handshake (`[Start]` / `[Data(cache)]?`
62//!   / `[Complete]?` / `[Error(h)]?` / `[Teardown]?` per R1.3.5.a)
63//!   lock-released. A handshake-time sink callback may re-enter Core
64//!   via the owner-side mailbox/`DeferQueue` seam (`emit` / `complete` /
65//!   `error` / `subscribe`); the owner drain applies it as a nested
66//!   wave. Post-S4 `Core` is single-owner `!Send + !Sync` — there is no
67//!   `wave_owner` mutex and no cross-thread block; R1.3.5.a
68//!   happens-after ordering holds because the one owner thread installs
69//!   the sink (state lock) before any subsequent emit's flush observes
70//!   it (`subscribers_revision` freeze, Slice X4/D2).
71
72use std::collections::VecDeque;
73use std::panic::{catch_unwind, AssertUnwindSafe};
74use std::sync::atomic::{AtomicU64, Ordering};
75use std::sync::Arc;
76
77use ahash::{AHashMap as HashMap, AHashSet as HashSet};
78
79// D246/S2c: `WaveOwnerGuard` (the §7 per-group `parking_lot::ReentrantMutex`
80// wave-lock wrapper) is deleted — single-owner ⇒ no cross-thread wave
81// serialization.
82use smallvec::SmallVec;
83use thiserror::Error;
84
85use crate::boundary::{BindingBoundary, CleanupTrigger};
86use crate::clock::monotonic_ns;
87use crate::handle::{FnId, HandleId, LockId, NodeId, NO_HANDLE};
88use crate::message::Message;
89
90/// Terminal-lifecycle state — once set on a node, the node will not emit
91/// further DATA; per-dep slots on consumers also use this to track which
92/// upstreams have terminated (R1.3.4 / Lock 2.B).
93///
94/// `Error` carries a [`HandleId`] resolving to the error value. Refcount is
95/// retained when the variant is stored in a node's `terminal` slot or any
96/// consumer's `dep_terminals` slot; v1 does not release these (terminal
97/// state is one-shot at this layer; release happens on resubscribable
98/// terminal-lifecycle reset, a separate slice).
99#[derive(Copy, Clone, Debug, PartialEq, Eq)]
100pub enum TerminalKind {
101    Complete,
102    Error(HandleId),
103}
104
105/// Node kind discriminant — **derived metadata** computed from
106/// [`NodeRecord`]'s field shape (D030 unification, Slice D).
107///
108/// Core no longer stores `kind` as a field; it's computed on demand from
109/// `(deps.is_empty(), fn_id.is_some(), op.is_some(), is_dynamic)`,
110/// mirroring TS's data model where `NodeImpl` has no `_kind` field. The
111/// shape uniquely identifies the kind:
112///
113/// | deps      | fn_id | op   | is_dynamic | kind     |
114/// |-----------|-------|------|-----------|----------|
115/// | empty     | None  | None | -         | State    |
116/// | empty     | Some  | None | -         | Producer |
117/// | non-empty | Some  | None | false     | Derived  |
118/// | non-empty | Some  | None | true      | Dynamic  |
119/// | non-empty | None  | Some | -         | Operator |
120///
121/// Public API ([`Core::kind_of`]) derives this enum on each call. State
122/// nodes are ROM (cache survives deactivation); compute nodes
123/// (Derived / Dynamic / Operator) and producers are RAM.
124#[derive(Copy, Clone, Eq, PartialEq, Debug)]
125pub enum NodeKind {
126    /// Source node: cache is intrinsic, no fn, no deps. Mutated via [`Core::emit`].
127    State,
128    /// Producer node: fn fires once on first subscribe. No deps;
129    /// emissions arrive via sinks the fn subscribes to (zip / concat /
130    /// race / takeUntil pattern). Slice D / D031.
131    Producer,
132    /// Derived node: fn fires on every dep change; all deps tracked.
133    Derived,
134    /// Dynamic node: fn declares which dep indices it actually read this run.
135    /// Untracked dep updates flow through cache but do NOT re-fire fn.
136    Dynamic,
137    /// Operator node: built-in dispatch path for transform / combine /
138    /// flow / resilience operators. The `OperatorOp` discriminant selects
139    /// the per-operator FFI path ([`BindingBoundary::project_each`] etc.);
140    /// Core manages per-operator state via the generic `op_scratch` slot
141    /// on `NodeRecord` (D026). Per Slice C-1 (D009) / Slice C-3 (D026).
142    Operator(OperatorOp),
143}
144
145impl NodeKind {
146    /// True if this kind opts OUT of Lock 2.B auto-cascade. Operator(Reduce)
147    /// and Operator(Last) must intercept upstream COMPLETE so they can emit
148    /// their accumulator / buffered value before the cascade terminates them;
149    /// instead of cascading, terminate_node queues such children for fn-fire
150    /// so `fire_operator` can handle the terminal.
151    pub(crate) fn skips_auto_cascade(self) -> bool {
152        matches!(
153            self,
154            NodeKind::Operator(
155                OperatorOp::Reduce { .. } | OperatorOp::Last { .. } | OperatorOp::Valve
156            )
157        )
158    }
159}
160
161/// Built-in operator discriminant. Selects the per-operator dispatch path
162/// in `fire_operator` (`crates/graphrefly-core/src/batch.rs`). Each variant
163/// carries the binding-side closure ids (and seed handle for stateful
164/// folders) needed for the wave-execution path; Core stores no user values
165/// itself per the handle-protocol cleaving plane.
166#[derive(Copy, Clone, Eq, PartialEq, Debug)]
167pub enum OperatorOp {
168    /// `map(source, project)` — element-wise transform. Calls
169    /// `BindingBoundary::project_each(fn_id, &inputs)` per fire; emits each
170    /// returned handle via `commit_emission_verbatim` (R1.3.2.d batch
171    /// semantics — no equals substitution between batch entries).
172    Map { fn_id: FnId },
173    /// `filter(source, predicate)` — silent-drop selection (D012/D018).
174    /// Calls `BindingBoundary::predicate_each(fn_id, &inputs)`; emits each
175    /// passing input verbatim. If zero pass on a wave that dirtied the
176    /// node, queues a single `RESOLVED` to settle (D018).
177    Filter { fn_id: FnId },
178    /// `scan(source, fold, seed)` — left-fold emitting each new accumulator.
179    /// `seed` is captured at registration; `acc` lives in
180    /// [`ScanState`](super::op_state::ScanState) inside
181    /// [`NodeRecord::op_scratch`] and persists across waves until
182    /// resubscribable reset. Calls `BindingBoundary::fold_each(fn_id, acc,
183    /// &inputs) -> SmallVec<HandleId>` per fire.
184    Scan { fn_id: FnId, seed: HandleId },
185    /// `reduce(source, fold, seed)` — left-fold emitting once on upstream
186    /// COMPLETE. Accumulates silently while source DATA flows; on
187    /// dep[0].terminal == Some(Complete), emits `[Data(acc), Complete]`.
188    /// On `Error(h)`, propagates the error verbatim. Opts out of Lock 2.B
189    /// auto-cascade (see `NodeKind::skips_auto_cascade`).
190    Reduce { fn_id: FnId, seed: HandleId },
191    /// `distinctUntilChanged(source, equals)` — suppresses adjacent
192    /// duplicates. Calls `BindingBoundary::custom_equals(equals_fn_id,
193    /// prev, current)` per input; emits non-equal items verbatim and
194    /// updates `prev`. If zero items pass on a wave that dirtied the node,
195    /// queues `RESOLVED` (matches Filter discipline).
196    DistinctUntilChanged { equals_fn_id: FnId },
197    /// `pairwise(source)` — emits `(prev, current)` pairs starting after
198    /// the second value. First value swallowed (sets `prev`). Calls
199    /// `BindingBoundary::pairwise_pack(fn_id, prev, current)` per pair to
200    /// produce the binding-side tuple handle.
201    Pairwise { fn_id: FnId },
202
203    // ----- Slice C-2: multi-dep combinators (D020) -----
204    /// `combine(...sources)` — N-dep combineLatest. On any dep fire, packs
205    /// the latest handle per dep into a single tuple handle via
206    /// `BindingBoundary::pack_tuple(pack_fn, &handles)`. First-run gate
207    /// (`partial: false` default) holds until all deps deliver real DATA
208    /// (R2.5.3). COMPLETE cascades when all deps complete (R1.3.4.b).
209    Combine { pack_fn: FnId },
210
211    /// `withLatestFrom(primary, secondary)` — 2-dep, fire-on-primary-only
212    /// (D021, Phase 10.5). Packs `[primary, secondary]` via
213    /// `BindingBoundary::pack_tuple(pack_fn, &handles)` when dep[0]
214    /// (primary) has DATA in the wave. If only dep[1] (secondary) fires,
215    /// settles with RESOLVED (D018 pattern). First-run gate holds until
216    /// both deps deliver (R2.5.3 `partial: false`). Post-warmup INVALIDATE
217    /// guard: if secondary `prev_data == NO_HANDLE` and batch empty after
218    /// warmup, settles with RESOLVED (no stale pair).
219    WithLatestFrom { pack_fn: FnId },
220
221    /// `merge(...sources)` — N-dep, forward all DATA handles verbatim
222    /// (D022). Zero FFI on fire: no transformation, no binding call.
223    /// Each dep's batch handles are retained and emitted individually.
224    /// COMPLETE cascades when all deps complete (R1.3.4.b).
225    Merge,
226
227    // ----- Slice C-3: flow operators (D024) -----
228    /// `take(source, count)` — emits the first `count` DATA values then
229    /// self-completes via `Core::complete`. Tracks `count_emitted` in
230    /// [`TakeState`](super::op_state::TakeState). When upstream completes
231    /// before `count` is reached, the standard auto-cascade propagates
232    /// COMPLETE. `count == 0` is allowed: the first fire emits zero
233    /// items then immediately self-completes (D027).
234    Take { count: u32 },
235
236    /// `skip(source, count)` — drops the first `count` DATA values; once
237    /// the threshold is crossed, subsequent DATAs pass through verbatim.
238    /// Tracks `count_skipped` in [`SkipState`](super::op_state::SkipState).
239    /// On a wave where every input is still in the skip window, queues
240    /// DIRTY+RESOLVED to settle (D018 pattern).
241    Skip { count: u32 },
242
243    /// `takeWhile(source, predicate)` — emits while `predicate(input)`
244    /// holds; on the first `false`, emits any preceding passes then
245    /// self-completes via `Core::complete`. Reuses
246    /// [`BindingBoundary::predicate_each`] (D029); after the first
247    /// `false`, subsequent inputs in the same batch are dropped.
248    TakeWhile { fn_id: FnId },
249
250    /// `last(source)` / `last_with_default(source, default)` — buffers
251    /// the latest DATA; on upstream COMPLETE, emits `Data(latest)` then
252    /// `Complete`. The `default` field is `NO_HANDLE` for the no-default
253    /// factory (emits only `Complete` on empty stream), or a registered
254    /// default handle (emits `Data(default)` + `Complete` on empty
255    /// stream). Storage: [`LastState`](super::op_state::LastState) holds
256    /// `latest` (live buffer) and `default` (registration-time, stable).
257    /// Opts out of Lock 2.B auto-cascade so it can intercept upstream
258    /// COMPLETE.
259    Last { default: HandleId },
260
261    // ----- Slice U: control operators (D047) -----
262    /// `tap(source, fn)` — side-effect passthrough. Calls
263    /// `BindingBoundary::invoke_tap_fn(fn_id, handle)` on each input DATA,
264    /// then emits the input handle unchanged. Zero-transform: output
265    /// handles are the inputs verbatim (no equals substitution, no
266    /// allocation).
267    Tap { fn_id: FnId },
268
269    /// `tapFirst(source, fn)` — one-shot side-effect on first DATA. Same
270    /// as [`Tap`](Self::Tap) but fires `invoke_tap_fn` only once; after
271    /// the first fire, subsequent DATA passes through without a callback.
272    /// State: [`TapFirstState`](super::op_state::TapFirstState) tracks
273    /// `fired: bool`.
274    TapFirst { fn_id: FnId },
275
276    /// `valve(source, control)` — conditional forward. 2-dep: dep[0] is
277    /// source, dep[1] is boolean control. When the latest control value
278    /// is truthy (non-zero handle), forwards source DATA; when falsy,
279    /// settles with RESOLVED. Partial mode so it fires on control-alone
280    /// before source has delivered. Does NOT auto-complete on control
281    /// terminal (`completeWhenDepsComplete: false` equivalent).
282    Valve,
283
284    /// `settle(source, quietWaves, maxWaves)` — convergence detector.
285    /// Forwards each upstream DATA, counts consecutive no-change waves,
286    /// and self-completes when `quiet_count >= quiet_waves` (or
287    /// `wave_count >= max_waves` if set). State:
288    /// [`SettleState`](super::op_state::SettleState).
289    Settle {
290        quiet_waves: u32,
291        max_waves: Option<u32>,
292    },
293}
294
295/// Registration options for [`Core::register_operator`].
296///
297/// `equals` controls operator output dedup (R5.7 — defaults to identity).
298/// `partial` controls the R2.5.3 first-run gate (R5.4 — operator dispatch
299/// fires on first DATA from any dep when `true`; default `false` matches
300/// the gated derived discipline).
301#[derive(Copy, Clone, Debug)]
302pub struct OperatorOpts {
303    pub equals: EqualsMode,
304    pub partial: bool,
305}
306
307impl Default for OperatorOpts {
308    fn default() -> Self {
309        Self {
310            equals: EqualsMode::Identity,
311            partial: false,
312        }
313    }
314}
315
316/// Closure-form fn id OR typed operator discriminant — the two dispatch
317/// paths a node can use. State / passthrough nodes pass `None` to
318/// [`Core::register`] (no fn at all).
319#[derive(Copy, Clone, Debug)]
320pub enum NodeFnOrOp {
321    /// Closure-form: invokes [`BindingBoundary::invoke_fn`] per fire.
322    /// Used for Derived / Dynamic / Producer.
323    Fn(FnId),
324    /// Typed-op: routes to a `fire_op_*` helper that calls per-operator
325    /// FFI methods (`project_each` / `predicate_each` / `fold_each` /
326    /// `pairwise_pack` / `pack_tuple`). Used for Operator nodes.
327    Op(OperatorOp),
328}
329
330/// Pause behavior mode (canonical-spec §2.6 — three modes shipped in TS;
331/// Slice F audit, 2026-05-07 — closed the Rust port gap).
332///
333/// | Mode | Outgoing tier-3 routing while paused | RESUME behavior |
334/// |---|---|---|
335/// | [`PausableMode::Default`] | suppress fn-fire upstream (no DIRTY emitted) | fire fn ONCE on RESUME if any dep delivered DATA during pause; collapses N pause-window writes into one settle |
336/// | [`PausableMode::ResumeAll`] | buffer outgoing tier-3 / tier-4 messages per-wave | replay each buffered wave verbatim on RESUME |
337/// | [`PausableMode::Off`] | dispatcher ignores PAUSE; tier-3 flushes immediately | no-op (no buffer to drain) |
338///
339/// Default is [`PausableMode::Default`] per canonical §2.6 — every untagged
340/// source picks it up. Memory profile is O(1) per node (no buffer); the
341/// trade-off is "subscribers see one consolidated DATA on RESUME" rather
342/// than the K mid-pause emissions verbatim.
343///
344/// Note: tier-1 (DIRTY) / tier-2 (PAUSE/RESUME) / tier-5 (COMPLETE/ERROR) /
345/// tier-6 (TEARDOWN) bypass pause regardless of mode — they remain
346/// observable so leaked pause-controllers cannot strand subscribers.
347#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
348pub enum PausableMode {
349    /// Suppress fn-fire while paused; fire once on RESUME if any dep
350    /// delivered DATA during the pause window. Canonical default.
351    #[default]
352    Default,
353    /// Buffer outgoing tier-3 / tier-4 messages per-wave; replay on
354    /// RESUME. Use when subscribers need verbatim emit history (e.g. an
355    /// audit log, replay-on-reconnect bridge).
356    ResumeAll,
357    /// Dispatcher ignores PAUSE for this node — tier-3 flushes
358    /// immediately even while a lock is held. Use for nodes whose value
359    /// production is intrinsically pause-immune (telemetry counters,
360    /// monotonic timers).
361    Off,
362}
363
364/// Per-kind opts for [`Core::register`]. Cross-kind config knobs live
365/// here; per-kind specifics (deps, fn_or_op) live on
366/// [`NodeRegistration`].
367#[derive(Copy, Clone, Debug)]
368pub struct NodeOpts {
369    /// Initial cached value. Only valid for state nodes (no deps + no
370    /// fn + no op). [`NO_HANDLE`] starts the node sentinel.
371    pub initial: HandleId,
372    /// Equality mode for outgoing emissions (R1.3.2). Defaults to
373    /// [`EqualsMode::Identity`].
374    pub equals: EqualsMode,
375    /// First-run gate (R2.5.3 / D011). When `true`, the node fires as
376    /// soon as ANY dep delivers a real handle; when `false` (default),
377    /// the node holds until every dep has delivered.
378    pub partial: bool,
379    /// Dynamic flag (R2.5.3) — fn declares actually-tracked dep indices
380    /// per fire. Only meaningful when `fn_or_op == Some(Fn(_))` AND
381    /// deps non-empty.
382    pub is_dynamic: bool,
383    /// Pause behavior mode (canonical §2.6). Default is
384    /// [`PausableMode::Default`]. See [`PausableMode`] for the trade-offs.
385    pub pausable: PausableMode,
386    /// Replay buffer cap (canonical R2.6.5 / Lock 6.G — Slice E1, 2026-05-07).
387    /// `None` (default) disables; `Some(N)` keeps a circular buffer of the
388    /// last N DATA emissions and replays them to late subscribers as part
389    /// of the per-tier handshake (between [`Message::Start`] and any
390    /// terminal slice). Only DATA is buffered; RESOLVED entries are NOT
391    /// (R2.6.5 explicit "DATA only").
392    pub replay_buffer: Option<usize>,
393    /// D263 — when `true`, the `fire_fn` first-run gate (R2.5.3) treats a
394    /// terminal dep as "real input" so the node fires even if the only
395    /// signal a dep ever delivered was a `COMPLETE` (no DATA). Mirrors the
396    /// unconditional `fire_operator` semantics (`fire_operator`'s gate
397    /// already counts dep terminals as real input — e.g. for `Reduce`).
398    /// Default `false` preserves the historical gate (sentinel deps hold
399    /// the node until they deliver real DATA). NOT yet widened onto the
400    /// `Impl` parity contract per D196 + the parity-tests comment
401    /// ("NOT widened onto Impl"); kept substrate-internal until a
402    /// cross-arm scenario surfaces.
403    pub terminal_as_real_input: bool,
404}
405
406impl Default for NodeOpts {
407    fn default() -> Self {
408        Self {
409            initial: NO_HANDLE,
410            equals: EqualsMode::Identity,
411            partial: false,
412            is_dynamic: false,
413            pausable: PausableMode::Default,
414            replay_buffer: None,
415            terminal_as_real_input: false,
416        }
417    }
418}
419
420/// Unified node-registration descriptor (D030, Slice D).
421///
422/// All node kinds (State / Producer / Derived / Dynamic / Operator)
423/// register through [`Core::register`] with a `NodeRegistration`. The
424/// kind is **derived from the field shape** of the registration —
425/// `(deps.is_empty(), fn_or_op variant)`:
426///
427/// | deps      | fn_or_op   | is_dynamic | resulting kind |
428/// |-----------|-----------|-----------|----------------|
429/// | empty     | None      | -         | State          |
430/// | empty     | Some(Fn)  | -         | Producer       |
431/// | non-empty | Some(Fn)  | false     | Derived        |
432/// | non-empty | Some(Fn)  | true      | Dynamic        |
433/// | non-empty | Some(Op)  | -         | Operator       |
434///
435/// The sugar wrappers ([`Core::register_state`], [`Core::register_producer`],
436/// etc.) build a `NodeRegistration` and delegate.
437#[derive(Clone, Debug)]
438pub struct NodeRegistration {
439    /// Upstream deps in declaration order. Empty for state / producer.
440    pub deps: Vec<NodeId>,
441    /// Closure-form fn id or typed-op discriminant. `None` for state /
442    /// passthrough.
443    pub fn_or_op: Option<NodeFnOrOp>,
444    /// Cross-kind config knobs.
445    pub opts: NodeOpts,
446}
447
448/// Equality mode for a node's outgoing emissions.
449///
450/// `Identity` is the default: cache vs. new handle compare is a `u64` equal —
451/// zero FFI. `Custom` invokes [`BindingBoundary::custom_equals`] every check
452/// (R1.3.2.b two-arg call when both sides are non-sentinel).
453#[derive(Copy, Clone, Debug)]
454pub enum EqualsMode {
455    Identity,
456    Custom(FnId),
457}
458
459/// Public identifier for a single subscription (S2b / D225: promoted
460/// from `pub(crate)`). Returned by [`Core::subscribe`] /
461/// [`Core::try_subscribe`]; pair it with the `node_id` you subscribed
462/// and pass both to [`Core::unsubscribe`] to deregister.
463///
464/// S2b retires the core-level RAII `Subscription`/`WeakCore` (D223:
465/// the `Core` is owned by value and relocates between workers, so a
466/// parameterless `Drop` cannot reach it without `Weak<C>`/`unsafe` —
467/// D225). Drop-convenience is a *binding-layer* RAII wrapper over
468/// [`Core::unsubscribe`], used only where the holder co-owns the `Core`
469/// on its affinity worker (test harness, napi `BenchCore`). Substrate
470/// callers (e.g. producer upstream-sub cleanup, D229) unsubscribe
471/// explicitly via the owner-driven chain.
472#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
473pub struct SubscriptionId(u64);
474
475/// Deregister `sub_id` from `node_id` and run the Phase G / lifecycle
476/// cleanup chain (OnDeactivation → `producer_deactivate` → `wipe_ctx` →
477/// Core cache-clear). Extracted verbatim from the former
478/// `Subscription::Drop` (D225 refined A2, S2a) so `Core::unsubscribe`
479/// (the synchronous owner-invoked path) and the legacy RAII `Drop`
480/// share ONE body. Operates on `&C` directly — the body already used
481/// only `&dyn BindingBoundary` (via `make_op_scratch_with_binding`),
482/// never a `Core`, so this is a behaviour-identical move.
483///
484/// Producer deactivation (Slice D, D031): if removing this sub empties
485/// the subscribers map AND the node is a producer, fire
486/// `BindingBoundary::producer_deactivate(node_id)` AFTER releasing the
487/// state lock. The binding then drops its per-node state (subscriptions
488/// to upstream sources, captured closure state), which transitively
489/// unsubs from upstreams via their own unsubscribe. Re-entrance into
490/// Core from the deactivate hook is permitted since the lock is
491/// released first.
492#[allow(clippy::too_many_lines)] // Phase G is one continuous lifecycle hook chain (user cleanup → producer_deactivate → wipe_ctx → Core cache-clear); splitting it would obscure the ordering invariant.
493pub(crate) fn unsubscribe_sink(core: &Core, node_id: NodeId, sub_id: SubscriptionId) {
494    // Slice E2 (D056): when the last subscriber drops, fire the
495    // node's OnDeactivation cleanup hook BEFORE producer_deactivate
496    // (cleanup may release handles the producer subscription owns;
497    // reverse order would let producer_deactivate drop subs that user
498    // cleanup expected to be live). Both calls are lock-released per
499    // D045.
500    //
501    // OnDeactivation gating (D068, QA Q3 fix): fires only when the
502    // node has fired its fn at least once AND has a fn (`fn_id`
503    // populated). State nodes have no fn — they cannot register a
504    // cleanup spec via the production fn-return path (R2.4.5), so
505    // firing `cleanup_for` on them is wasted FFI; the binding's
506    // lookup is guaranteed to find no `current_cleanup`. Skipping
507    // here saves the FFI hop and matches the design-doc wording
508    // ("never-fired state nodes" — state-with-initial-value satisfies
509    // `has_fired_once = true` but still has no fn).
510    //
511    // Slice E2 /qa Q2(b) (D069): if the node is a resubscribable
512    // node that's ALREADY terminal (terminate fired BEFORE this last
513    // sub drop), fire `wipe_ctx` lock-released AFTER OnDeactivation
514    // + producer_deactivate. Mutually exclusive with `terminate_node`'s
515    // queue-wipe site: terminate-with-empty-subs goes through
516    // `pending_wipes`; terminate-with-live-subs routes here when
517    // those subs eventually drop. Either path fires exactly one
518    // wipe per terminal lifecycle.
519    let (was_last_sub, is_producer, has_user_cleanup, fire_wipe, binding) = {
520        // Step 2b-ii-B: route to the node's shard (this drop runs
521        // OUTSIDE any wave ⇒ ambient `None`; a grouped node's
522        // record is in shard `g`, so without this the unsubscribe
523        // would no-op on `DEFAULT_SHARD` → lost cleanup / leak).
524        let mut s = St::new(core);
525        let Some(rec) = s.nodes.get_mut(&node_id) else {
526            return;
527        };
528        rec.subscribers.remove(&sub_id);
529        // Slice X4 / D2: bump revision so any pending_notify entry for
530        // this node opened earlier in the wave starts a fresh batch on
531        // the next queue_notify, dropping the now-departed sink from
532        // the snapshot.
533        rec.subscribers_revision = rec.subscribers_revision.wrapping_add(1);
534        let last = rec.subscribers.is_empty();
535        let producer = rec.is_producer();
536        // OnDeactivation gate: must have run a fn at least once
537        // (has_fired_once) AND have a fn registered (fn_id.is_some()).
538        // The fn_id check excludes state nodes whose has_fired_once
539        // tracks initial-value status, not "user fn ran."
540        let user_cleanup = rec.has_fired_once && rec.fn_id.is_some();
541        let fire_wipe = last && rec.resubscribable && rec.terminal.is_some();
542        // Phase G (D119/D120/D121, 2026-05-10): always clone the
543        // binding when last sub leaves so we can run the Core
544        // cache-clear after the existing hooks. The Arc::clone is
545        // cheap and dwarfed by the cost of the hooks themselves.
546        let binding = if last { Some(s.binding.clone()) } else { None };
547        (last, producer, user_cleanup, fire_wipe, binding)
548    };
549    if was_last_sub {
550        if let Some(binding) = binding {
551            if has_user_cleanup {
552                binding.cleanup_for(node_id, CleanupTrigger::OnDeactivation);
553            }
554            if is_producer {
555                // D229: hand the binding a `Core::unsubscribe`-capable
556                // closure over the `&C` already in scope (the owner's
557                // synchronous unsubscribe context — exactly what D225
558                // requires; no `Weak<C>`, no parameterless-`Drop`). The
559                // binding loops it over its recorded upstream
560                // `(NodeId, SubscriptionId)` pairs. Lock-released here, so
561                // the recursive `unsubscribe_sink` (re-entrant producer
562                // cascade) is safe — behaviour-identical to the retired
563                // `Subscription::Drop` cascade.
564                let unsub = |up_node: NodeId, up_sub: SubscriptionId| {
565                    unsubscribe_sink(core, up_node, up_sub);
566                };
567                binding.producer_deactivate(node_id, &unsub);
568            }
569            // D069: eager wipe — fires AFTER OnDeactivation so the
570            // user closure observes pre-wipe `store` (matches the
571            // existing "OnDeactivation runs before wipe on terminal
572            // reset" invariant covered by test 10). Idempotent —
573            // `HashMap::remove` on absent key is a no-op, so even
574            // if the wave already drained `pending_wipes` earlier,
575            // this fire is benign.
576            if fire_wipe {
577                binding.wipe_ctx(node_id);
578            }
579
580            // Phase G (D118/D119/D120/D121, 2026-05-10) — Core
581            // cache-clear on deactivation, mirroring TS `_deactivate`
582            // (`pure-ts/src/core/node.ts:2185-2297`):
583            //
584            //   1. user `cleanup_for(OnDeactivation)`  ← above
585            //   2. `producer_deactivate`                ← above
586            //   3. `wipe_ctx` (resubscribable+terminal) ← above
587            //   4. **NEW: Core cache-clear**            ← here
588            //
589            // Releases per-dep `prev_data` + `data_batch` retains +
590            // `dep_terminals` Error retains (the latter closes the
591            // long-standing "Non-resubscribable terminal Error
592            // handles leak via diamond cascade" porting-deferred
593            // entry — D121). Clears pause/replay buffers. Releases
594            // `cached` for compute nodes (R2.2.7 / R2.2.8 ROM:
595            // state nodes preserve cache; compute nodes clear).
596            // Keeps the per-node `terminal` slot intact (D121:
597            // producer-side terminal stays for late-subscriber
598            // R2.2.7.a reset or R2.2.7.b rejection).
599            //
600            // Lock-released release discipline: collect handles
601            // under the lock, drop the lock, fire `release_handle`
602            // outside (mirrors `Core::resume` Phase 2 + the
603            // existing F1 `set_deps` removed-handles release at
604            // node.rs:5003).
605            //
606            // Re-entrance safety (F1 / D123, /qa 2026-05-10): if the
607            // user `cleanup_for` / `producer_deactivate` / `wipe_ctx`
608            // hook re-subscribed via some path, `subscribers` is now
609            // non-empty. **Skip Phase G entirely in that case** —
610            // the new subscriber's handshake delivered the live
611            // `cache` handle to its sink and is holding a refcount
612            // share through `pending_notify`; clearing cache here
613            // would race with the new subscriber's wave and cause
614            // a use-after-release in bindings that reap registry
615            // slots at refcount-zero.
616            //
617            // Why this diverges from TS `_deactivate` (which clears
618            // unconditionally): TS runs the cleanup hook + cache
619            // clear as ONE sync block under the (implicit) JS
620            // single-thread mutex; there's no released-lock window
621            // for a re-subscribe to install a sub before the
622            // cache-clear runs. Rust's lock-released hook discipline
623            // (D045) opens that window, so the re-check is
624            // necessary to preserve refcount soundness.
625            //
626            // Re-acquire state lock atomically with the recheck so
627            // a concurrent thread cannot install a sub between the
628            // emptiness check and the cache mutation.
629            // F8 (/qa 2026-05-10): also take `op_scratch` so its
630            // retains release lock-released after the state-lock
631            // scope. Pre-F8 Phase G only released per-edge handles
632            // + the compute `cache`, leaving operator-internal
633            // retains (Last.latest, Scan.acc, Reduce.acc, etc.)
634            // in-place. For non-resubscribable nodes that never
635            // re-subscribe, this was a permanent leak —
636            // asymmetric with the per-edge cleanup. F8 closes that
637            // by taking the scratch alongside per-edge handles
638            // and calling `release_handles` lock-released.
639            //
640            // D-α (D028 full close, 2026-05-10): for resubscribable
641            // operator nodes, take the OLD scratch AND build a
642            // FRESH scratch via `make_op_scratch` (lock-held, but
643            // `binding.retain_handle` is a leaf operation per
644            // op_state.rs:69-80 docs), install the fresh scratch
645            // on `rec.op_scratch`, and push the old scratch to
646            // `pending_scratch_release` for deferred release. The
647            // queue drains on the next `reset_for_fresh_lifecycle`
648            // (after Phase 2 takes fresh retains — preserves the
649            // C-3 /qa P1 seed-aliasing-acc invariant) or on
650            // `Drop for CoreState`. The fresh install gives
651            // re-activation correct counter / acc state (Take.taken
652            // back to 0, Scan.acc back to seed, etc.) matching TS
653            // Lock 6.D ("resets on every deactivation").
654            let (to_release, scratch_to_release): (
655                Vec<HandleId>,
656                Option<Box<dyn crate::op_state::OperatorScratch>>,
657            ) = {
658                // Step 2b-ii-B: route to the node's shard (drop
659                // runs outside any wave; grouped node's record is
660                // in shard `g` — without this the scratch/refcount
661                // cleanup would no-op on `DEFAULT_SHARD`).
662                let mut s = St::new(core);
663                if let Some(rec) = s.nodes.get_mut(&node_id) {
664                    // F1 re-entrance check.
665                    if !rec.subscribers.is_empty() {
666                        // A user hook re-subscribed during the
667                        // lock-released window; the new lifecycle
668                        // owns this node now. Phase G is a no-op.
669                        return;
670                    }
671                    let mut handles: Vec<HandleId> = Vec::new();
672                    // Per-dep state. Empty for state nodes.
673                    for dr in &mut rec.dep_records {
674                        if dr.prev_data != NO_HANDLE {
675                            handles.push(dr.prev_data);
676                            dr.prev_data = NO_HANDLE;
677                        }
678                        for h in dr.data_batch.drain(..) {
679                            handles.push(h);
680                        }
681                        // D121: per-edge terminal-Error retain
682                        // released here. Closes the cascade leak.
683                        // The producer's own `rec.terminal` slot
684                        // stays intact (preserved below).
685                        if let Some(TerminalKind::Error(h)) = dr.terminal {
686                            handles.push(h);
687                        }
688                        dr.terminal = None;
689                        dr.dirty = false;
690                        dr.involved_this_wave = false;
691                    }
692                    // Pause buffer DATA / replay buffer.
693                    if let PauseState::Paused { ref mut buffer, .. } = rec.pause_state {
694                        for msg in buffer.drain(..) {
695                            if let Some(h) = msg.payload_handle() {
696                                handles.push(h);
697                            }
698                        }
699                    }
700                    for h in rec.replay_buffer.drain(..) {
701                        handles.push(h);
702                    }
703                    // Pause locks drained → node back to Active.
704                    rec.pause_state = PauseState::Active;
705                    // R2.2.7 / R2.2.8 ROM: state nodes preserve
706                    // cache; compute (fn or op) nodes clear.
707                    // D119: state nodes keep `cached` because the
708                    // value is intrinsic and non-volatile;
709                    // resubscribe sees the same value.
710                    let is_compute = rec.fn_id.is_some() || rec.op.is_some();
711                    if is_compute && rec.cache != NO_HANDLE {
712                        handles.push(rec.cache);
713                        rec.cache = NO_HANDLE;
714                    }
715                    // Reset wave + lifecycle state so reactivation
716                    // begins fresh. `terminal` STAYS (D121).
717                    rec.has_fired_once = false;
718                    rec.dirty = false;
719                    rec.involved_this_wave = false;
720                    // §10.13 perf (D047): reset received_mask — all deps back
721                    // to sentinel on resubscribe.
722                    rec.received_mask = 0;
723                    // §10.3 perf (Slice V1): reset involved_mask.
724                    rec.involved_mask = 0;
725                    if rec.is_dynamic {
726                        rec.tracked.clear();
727                    }
728                    // F8 + D-α: op_scratch handling forks by
729                    // `resubscribable`. Non-resubscribable: eager
730                    // release (F8 path — there's no future reset
731                    // so the leak ends here). Resubscribable +
732                    // has-op: take old AND install fresh; defer
733                    // old release to the queue.
734                    let scratch = if !rec.resubscribable {
735                        std::mem::take(&mut rec.op_scratch)
736                    } else if let Some(op) = rec.op {
737                        // Slice C-3 /qa P1 (retain-before-release):
738                        // build fresh scratch FIRST (this calls
739                        // `binding.retain_handle` for any seed /
740                        // default the op carries), THEN swap. The
741                        // old scratch's release is deferred to the
742                        // `pending_scratch_release` queue (drained
743                        // on next reset_for_fresh_lifecycle or
744                        // Drop for CoreState).
745                        //
746                        // make_op_scratch is fallible only for
747                        // OperatorSeedSentinel; the OperatorOp
748                        // stored on NodeRecord passed validation
749                        // at registration time, so the unwrap
750                        // here is structurally guaranteed (mirrors
751                        // reset_for_fresh_lifecycle).
752                        //
753                        // Uses the binding-explicit static variant
754                        // because we have only `&dyn BindingBoundary`
755                        // here (Subscription::Drop holds no Core).
756                        let new_scratch = Core::make_op_scratch_with_binding(&*binding, op)
757                                .expect("invariant: stored OperatorOp passed make_op_scratch validation at registration time");
758                        let old = std::mem::replace(&mut rec.op_scratch, new_scratch);
759                        if let Some(old_box) = old {
760                            s.shared.pending_scratch_release.push(old_box);
761                        }
762                        // The "scratch_to_release" for lock-released
763                        // release stays None here — the resubscribable
764                        // case routes through the queue.
765                        None
766                    } else {
767                        // Resubscribable but no op (e.g. derived
768                        // compute / dynamic / state). Nothing to
769                        // do for op_scratch.
770                        None
771                    };
772                    (handles, scratch)
773                } else {
774                    // Node destroyed between lock-released hooks
775                    // and this re-acquire (terminate cascade or
776                    // graph removal) — nothing to clear.
777                    (Vec::new(), None)
778                }
779            };
780            // Release handles lock-released. Binding may re-enter
781            // Core during `release_handle` (final-Drop callbacks
782            // are user code).
783            for h in to_release {
784                binding.release_handle(h);
785            }
786            // F8: release operator-scratch handles lock-released
787            // (mirrors `ScratchReleaseGuard::drop` ordering).
788            if let Some(mut scratch) = scratch_to_release {
789                scratch.release_handles(&*binding);
790            }
791        }
792    }
793}
794
795// S2b (D225): no `impl Drop for Subscription` and no `Subscription`
796// Send+Sync assertion — the core-level RAII handle is retired. The sole
797// deregister path is the synchronous owner-invoked `Core::unsubscribe`
798// (which delegates to the shared `unsubscribe_sink` free fn below);
799// binding-layer RAII wraps it where the holder co-owns the Core.
800
801/// A subscriber callback. `Fn` (not `FnMut`) so multiple references coexist
802/// — capture mutable state in `RefCell<T>` (owner-side single-thread) or
803/// atomics on the binding side.
804// D246/S2c/D248: single-owner ⇒ sinks fire owner-side on the one
805// thread that drives the `Core`; the `Send + Sync` bound was
806// shared-Core-era legacy. Dropped — `Core` (which owns the subscriber
807// map) is consequently `!Send + !Sync`, the actor-model shape (the only
808// cross-thread bridge is `Arc<CoreMailbox>` for id-only timer posts).
809// D272 (2026-05-21): the residual `Arc` over the `!Send !Sync` `dyn Fn`
810// was itself vestigial — `Rc` matches the single-owner-thread shape and
811// drops the `clippy::arc_with_non_send_sync` lints. The
812// `assert_not_impl_any!` below locks D248 intent at the type system —
813// any future change that re-introduces `Send`/`Sync` on the callback
814// fails this assert at build time.
815pub type Sink = std::rc::Rc<dyn Fn(&[Message])>;
816
817static_assertions::assert_not_impl_any!(Sink: Send, Sync);
818
819// ---------------------------------------------------------------------------
820// PAUSE/RESUME state — §10.2 of the rust-port session doc
821// ---------------------------------------------------------------------------
822
823/// Per-node pause state.
824///
825/// Replaces the four TS fields (`_pauseLocks`, `_pauseBuffer`,
826/// `_pauseDroppedCount`, `_pauseStartNs`) with a single enum where
827/// the buffered fields are unreachable in the [`Self::Active`] variant —
828/// the compiler refuses access. Per §10.2 simplification.
829///
830/// # Invariants
831///
832/// - `Active` ⇔ no lockId held.
833/// - `Paused { locks, .. }` ⇔ `!locks.is_empty()`.
834/// - Buffered messages are tier 3 (DATA/RESOLVED) and tier 4 (INVALIDATE)
835///   only. Other tiers pass through immediately even while paused.
836/// - `dropped` counts messages that fell out the front of `buffer` due to
837///   the Core-global `pause_buffer_cap`; it is reported on resume so callers
838///   can detect overflow without re-tracking it externally.
839#[derive(Debug)]
840pub(crate) enum PauseState {
841    Active,
842    Paused {
843        /// Active lock holders. `SmallVec` keeps the common 1–2 lock case
844        /// stack-allocated. Replaces `Set<unknown>` from TS.
845        locks: SmallVec<[LockId; 2]>,
846        /// Buffered tier-3/tier-4 outgoing messages, in arrival order.
847        /// Replayed on the final RESUME.
848        buffer: VecDeque<Message>,
849        /// Count of messages dropped from the front when `buffer.len()` would
850        /// exceed `pause_buffer_cap`. Cleared on final RESUME (next pause
851        /// cycle starts fresh).
852        dropped: u32,
853        /// Wall-clock-monotonic ns when the lock first transitioned this node
854        /// from `Active` to `Paused`. Used by R1.3.8.c overflow ERROR
855        /// synthesis to compute `lock_held_duration_ms` in the diagnostic
856        /// payload (Slice F, A3 — 2026-05-07).
857        started_at_ns: u64,
858        /// True after the first overflow event in this pause cycle has been
859        /// reported via [`crate::boundary::BindingBoundary::synthesize_pause_overflow_error`].
860        /// Subsequent overflows in the same cycle don't re-emit ERROR
861        /// (canonical R1.3.8.c: "once per overflow event"). Cleared on
862        /// final RESUME (next pause cycle starts fresh).
863        overflow_reported: bool,
864        /// Default-mode bookkeeping (Slice F audit close, 2026-05-07).
865        /// Set to `true` when an upstream dep delivery arrives while this
866        /// node is paused with [`PausableMode::Default`]. On final RESUME,
867        /// if `true`, the node is added back to `pending_fires` so the fn
868        /// fires once with the consolidated dep state. Always `false` for
869        /// `ResumeAll` mode (the buffered messages are the consolidation
870        /// mechanism there). Cleared on final RESUME.
871        pending_wave: bool,
872    },
873}
874
875impl PauseState {
876    pub(crate) fn is_paused(&self) -> bool {
877        matches!(self, Self::Paused { .. })
878    }
879
880    fn lock_count(&self) -> usize {
881        match self {
882            Self::Active => 0,
883            Self::Paused { locks, .. } => locks.len(),
884        }
885    }
886
887    fn contains_lock(&self, lock_id: LockId) -> bool {
888        match self {
889            Self::Active => false,
890            Self::Paused { locks, .. } => locks.contains(&lock_id),
891        }
892    }
893
894    /// Add a lock; transitions Active → Paused on first lock. Idempotent on
895    /// duplicate lock_id (matches TS convention; spec is silent on the case).
896    fn add_lock(&mut self, lock_id: LockId) {
897        match self {
898            Self::Active => {
899                let mut locks = SmallVec::new();
900                locks.push(lock_id);
901                *self = Self::Paused {
902                    locks,
903                    buffer: VecDeque::new(),
904                    dropped: 0,
905                    started_at_ns: monotonic_ns(),
906                    overflow_reported: false,
907                    pending_wave: false,
908                };
909            }
910            Self::Paused { locks, .. } => {
911                if !locks.contains(&lock_id) {
912                    locks.push(lock_id);
913                }
914            }
915        }
916    }
917
918    /// Mark that an upstream dep delivered DATA to a node paused with
919    /// [`PausableMode::Default`]. The node will re-enter `pending_fires`
920    /// on final RESUME via [`Self::take_pending_wave`].
921    pub(crate) fn mark_pending_wave(&mut self) {
922        if let Self::Paused { pending_wave, .. } = self {
923            *pending_wave = true;
924        }
925    }
926
927    /// Read and clear the `pending_wave` flag. Called from
928    /// [`Core::resume`] when transitioning Paused → Active. Returns `true`
929    /// only if the node was paused with `pending_wave` set.
930    pub(crate) fn take_pending_wave(&mut self) -> bool {
931        if let Self::Paused { pending_wave, .. } = self {
932            std::mem::replace(pending_wave, false)
933        } else {
934            false
935        }
936    }
937
938    /// Remove a lock; if the lockset becomes empty, transition Paused →
939    /// Active and return the buffered messages for replay (along with the
940    /// dropped count for diagnostics). Unknown lock_id is an idempotent
941    /// no-op (matches TS, R1.2.6 implicit).
942    fn remove_lock(&mut self, lock_id: LockId) -> Option<(VecDeque<Message>, u32)> {
943        match self {
944            Self::Active => None,
945            Self::Paused { locks, .. } => {
946                if let Some(idx) = locks.iter().position(|l| *l == lock_id) {
947                    locks.swap_remove(idx);
948                }
949                if locks.is_empty() {
950                    let prev = std::mem::replace(self, Self::Active);
951                    if let Self::Paused {
952                        buffer, dropped, ..
953                    } = prev
954                    {
955                        return Some((buffer, dropped));
956                    }
957                }
958                None
959            }
960        }
961    }
962
963    /// Append a message to the buffer; if the buffer would exceed `cap`,
964    /// pop from the front (oldest-first), increment `dropped`, and return
965    /// the dropped messages so the caller can release any payload handles
966    /// they reference. `cap` of `None` means unbounded.
967    ///
968    /// Returns [`PushBufferedResult`] carrying both the dropped messages
969    /// (for refcount release) and whether this push triggered the FIRST
970    /// overflow event in the current pause cycle (for R1.3.8.c ERROR
971    /// synthesis — the caller schedules a single ERROR per cycle).
972    ///
973    /// Note: refcount management for the message's payload handle is the
974    /// caller's responsibility — see [`Core::queue_notify`] for the
975    /// retain/release discipline. The buffer itself is just a message
976    /// container; refcounts cross the binding boundary.
977    pub(crate) fn push_buffered(&mut self, msg: Message, cap: Option<usize>) -> PushBufferedResult {
978        let mut result = PushBufferedResult::default();
979        if let Self::Paused {
980            buffer,
981            dropped,
982            overflow_reported,
983            ..
984        } = self
985        {
986            buffer.push_back(msg);
987            if let Some(c) = cap {
988                while buffer.len() > c {
989                    if let Some(dropped_msg) = buffer.pop_front() {
990                        result.dropped_msgs.push(dropped_msg);
991                    }
992                    *dropped = dropped.saturating_add(1);
993                }
994            }
995            // R1.3.8.c (Slice F, A3): flag first overflow this cycle.
996            if !result.dropped_msgs.is_empty() && !*overflow_reported {
997                *overflow_reported = true;
998                result.first_overflow_this_cycle = true;
999            }
1000        }
1001        result
1002    }
1003
1004    /// Snapshot the diagnostic for an R1.3.8.c overflow ERROR synthesis.
1005    /// Returns `(dropped_count, lock_held_ns)`. Caller must already know
1006    /// the configured cap (it's a Core-global value, not per-PauseState).
1007    pub(crate) fn overflow_diagnostic(&self) -> Option<(u32, u64)> {
1008        match self {
1009            Self::Active => None,
1010            Self::Paused {
1011                dropped,
1012                started_at_ns,
1013                ..
1014            } => {
1015                let lock_held_ns = monotonic_ns().saturating_sub(*started_at_ns);
1016                Some((*dropped, lock_held_ns))
1017            }
1018        }
1019    }
1020}
1021
1022/// Return shape for [`PauseState::push_buffered`]. Carries both the dropped
1023/// messages (for refcount release) and an "is this the first overflow this
1024/// cycle" flag (for R1.3.8.c ERROR synthesis scheduling).
1025#[derive(Default)]
1026pub(crate) struct PushBufferedResult {
1027    pub(crate) dropped_msgs: Vec<Message>,
1028    pub(crate) first_overflow_this_cycle: bool,
1029}
1030
1031/// Pending R1.3.8.c overflow ERROR synthesis entry. Recorded by
1032/// [`Core::queue_notify`] when the pause buffer first overflows in a cycle;
1033/// drained at wave-end after the lock-released call to
1034/// [`crate::boundary::BindingBoundary::synthesize_pause_overflow_error`].
1035///
1036/// `configured_max` is captured at scheduling time rather than read at
1037/// drain — the user could change `pause_buffer_cap` between schedule and
1038/// drain, and the diagnostic reads "the cap that was in effect when the
1039/// overflow happened."
1040#[derive(Debug, Clone)]
1041pub(crate) struct PendingPauseOverflow {
1042    pub(crate) node_id: NodeId,
1043    pub(crate) dropped_count: u32,
1044    pub(crate) configured_max: usize,
1045    pub(crate) lock_held_ns: u64,
1046}
1047
1048// D274 (2026-05-21): `PartitionOrderViolation` struct was deleted — the
1049// union-find ascending-order acquisition discipline it represented is
1050// gone (groups are user-declared + static post-D248/D253; one Core per
1051// OS thread per D252). The type was never constructed post-§7; it was
1052// retained only so the downstream `Err(_)` match arms compiled (D211
1053// minimal-churn). All those arms are deleted here too.
1054
1055/// Errors returnable by [`Core::try_subscribe`].
1056///
1057/// `Core::subscribe` (the panic-on-error variant) panics on either
1058/// case; `try_subscribe` returns these so operators (zip / concat /
1059/// race / take_until / merge / switch_map / etc.) can match on the
1060/// variant — skip the source for [`Self::TornDown`].
1061///
1062/// Per canonical spec R2.2.7.a / R2.2.7.b (D118, 2026-05-10).
1063#[derive(Error, Debug, Clone, PartialEq, Eq)]
1064pub enum SubscribeError {
1065    // D274 (2026-05-21): `PartitionOrderViolation` variant was deleted —
1066    // it was never constructed post-§7 (the union-find ascending-order
1067    // shim is gone). Downstream `Err(_)` match arms are deleted too.
1068    /// R2.2.7.b (D118, 2026-05-10): the node is non-resubscribable AND
1069    /// has terminated (`[COMPLETE]` or `[ERROR, h]` was delivered).
1070    /// The stream is permanently over; subscribe is rejected.
1071    /// Resubscribable terminal nodes do NOT surface this error — they
1072    /// reset to a fresh lifecycle on subscribe per R2.2.7.a, regardless
1073    /// of TEARDOWN state.
1074    #[error(
1075        "subscribe({node:?}): node is non-resubscribable and has terminated; \
1076         the stream is permanently over (R2.2.7.b)"
1077    )]
1078    TornDown {
1079        /// The non-resubscribable terminal node that rejected the subscribe.
1080        node: NodeId,
1081    },
1082}
1083
1084// D274 (2026-05-21): `DeferredProducerOp` enum was deleted — the
1085// union-find ascending-order shim it deferred over is gone. Producer
1086// sinks call `core.emit/complete/error` directly.
1087
1088/// Errors returnable by [`Core::pause`] and [`Core::resume`].
1089#[derive(Error, Debug, Clone, PartialEq)]
1090pub enum PauseError {
1091    #[error("pause/resume: unknown node {0:?}")]
1092    UnknownNode(NodeId),
1093}
1094
1095/// Errors returnable by [`Core::up`] (canonical R1.4.1).
1096#[derive(Error, Debug, Clone, PartialEq)]
1097pub enum UpError {
1098    /// Node id is not registered.
1099    #[error("up: unknown node {0:?}")]
1100    UnknownNode(NodeId),
1101    /// Tier-3 (DATA / RESOLVED) and tier-5 (COMPLETE / ERROR) are
1102    /// downstream-only per R1.4.1; rejected at the boundary.
1103    #[error(
1104        "up: tier {tier} is forbidden upstream — value (tier 3) and \
1105         terminal-lifecycle (tier 5) planes are downstream-only per R1.4.1"
1106    )]
1107    TierForbidden { tier: u8 },
1108}
1109
1110/// Errors returnable by [`Core::register`] and its sugar wrappers
1111/// ([`Core::register_state`], [`Core::register_producer`],
1112/// [`Core::register_derived`], [`Core::register_dynamic`],
1113/// [`Core::register_operator`]).
1114///
1115/// Slice H (2026-05-07) promoted these from `assert!`/`panic!` to typed
1116/// errors so that callers can recover from contract violations without
1117/// process abort. Every variant corresponds to a construction-time
1118/// invariant that the caller is responsible for upholding; the dispatcher
1119/// rejects the registration before any reactive state is created (so
1120/// there is no `Message::Error` channel through which to surface the
1121/// failure — these are imperative-layer errors, not reactive ones).
1122///
1123/// All variants are zero-side-effect: when [`Core::register`] returns
1124/// `Err`, no node has been added to the graph and any handle retains
1125/// taken on the way in (e.g. operator scratch seed retains via
1126/// [`BindingBoundary::retain_handle`]) have been released.
1127#[derive(Error, Debug, Clone, PartialEq, Eq)]
1128pub enum RegisterError {
1129    /// One of the supplied dep ids is not a registered node.
1130    #[error("register: unknown dep {0:?}")]
1131    UnknownDep(NodeId),
1132
1133    /// `op` was supplied (operator node) but `deps` was empty. Operator
1134    /// nodes need at least one dep — for subscription-managed combinators
1135    /// with no declared deps, use [`Core::register_producer`] instead.
1136    #[error(
1137        "register: operator nodes require at least one dep — \
1138         use register_producer for subscription-managed combinators"
1139    )]
1140    OperatorWithoutDeps,
1141
1142    /// [`NodeOpts::initial`] was set to a real handle but the registration
1143    /// shape is not a state node (state nodes are `deps.is_empty() &&
1144    /// fn_id.is_none() && op.is_none()`). Initial cache only makes sense
1145    /// for state nodes.
1146    #[error("register: NodeOpts::initial only valid for state nodes (no deps + no fn + no op)")]
1147    InitialOnlyForStateNodes,
1148
1149    /// A supplied dep is terminal (COMPLETE / ERROR) AND not
1150    /// resubscribable. Adding it would create a permanent wedge — the dep
1151    /// will never re-emit, so the registered node would be stuck.
1152    /// Mirrors [`SetDepsError::TerminalDep`] at registration time.
1153    #[error(
1154        "register: dep {0:?} is terminal and not resubscribable; \
1155         mark it resubscribable before terminating, or remove it from the dep list"
1156    )]
1157    TerminalDep(NodeId),
1158
1159    /// A stateful operator ([`OperatorOp::Scan`] / [`OperatorOp::Reduce`])
1160    /// was registered with `seed = NO_HANDLE`. R2.5.3 first-run gate
1161    /// requires the seed to be a real handle so that the operator can
1162    /// emit on its first fire.
1163    #[error("register: operator seed must be a real handle (R2.5.3); got NO_HANDLE")]
1164    OperatorSeedSentinel,
1165}
1166
1167/// Errors returnable by [`Core::set_pausable_mode`].
1168///
1169/// Slice H (2026-05-07) promoted these from `assert!`/`panic!` to typed
1170/// errors. Same imperative-layer error model as [`RegisterError`].
1171#[derive(Error, Debug, Clone, PartialEq, Eq)]
1172pub enum SetPausableModeError {
1173    /// `node_id` is not a registered node.
1174    #[error("set_pausable_mode: unknown node {0:?}")]
1175    UnknownNode(NodeId),
1176    /// The node currently holds at least one pause lock. Changing pausable
1177    /// mode mid-pause would lose buffered content or strand a
1178    /// `pending_wave` flag — resume all locks first.
1179    #[error(
1180        "set_pausable_mode: cannot change pausable mode while paused; \
1181         resume all locks first"
1182    )]
1183    WhilePaused,
1184}
1185
1186/// Per-dep record. Replaces the parallel `deps` / `dep_handles` /
1187/// `dep_terminals` vectors from v1. Canonical spec R2.9.b alignment.
1188///
1189/// Each entry tracks one dep's lifecycle state, wave-scoped batch data,
1190/// and cross-wave `prev_data` for `ctx.prevData` access.
1191pub(crate) struct DepRecord {
1192    /// The dep node this record tracks.
1193    pub(crate) node: NodeId,
1194    /// Last DATA handle from the end of the previous wave. [`NO_HANDLE`]
1195    /// means the dep has never emitted DATA.
1196    pub(crate) prev_data: HandleId,
1197    /// Per-dep dirty flag — awaiting DATA/RESOLVED for current wave.
1198    pub(crate) dirty: bool,
1199    /// Per-dep involved-this-wave flag. Distinguishes:
1200    /// - `involved && data_batch.is_empty()` → dep settled RESOLVED
1201    /// - `!involved && data_batch.is_empty()` → dep was not in this wave
1202    pub(crate) involved_this_wave: bool,
1203    /// DATA handles accumulated this wave. Outside `batch()` scope, at most
1204    /// 1 element. Inside `batch()`, K emits on the source produce K entries
1205    /// per R1.3.6.b coalescing. Each handle holds a `retain_handle` share
1206    /// taken at `deliver_data_to_consumer` time; released at wave-end
1207    /// rotation in `clear_wave_state`.
1208    pub(crate) data_batch: SmallVec<[HandleId; 1]>,
1209    /// Terminal state for this dep. `None` = dep is live.
1210    /// `Some` = dep emitted COMPLETE/ERROR. When ALL entries are Some,
1211    /// the node auto-cascades per Lock 2.B (ERROR dominates COMPLETE).
1212    pub(crate) terminal: Option<TerminalKind>,
1213}
1214
1215impl DepRecord {
1216    fn new(node: NodeId) -> Self {
1217        Self {
1218            node,
1219            prev_data: NO_HANDLE,
1220            dirty: false,
1221            involved_this_wave: false,
1222            data_batch: SmallVec::new(),
1223            terminal: None,
1224        }
1225    }
1226}
1227
1228/// Internal node record. Mirrors `core.ts:132–154` post-D030 unification.
1229///
1230/// **Kind is derived, not stored** (D030, Slice D). `(dep_records.is_empty(),
1231/// fn_id, op, is_dynamic)` uniquely identifies the kind — see [`NodeKind`].
1232/// Helper methods (`is_state()`, `is_producer()`, `is_compute()`,
1233/// `is_operator()`, `skips_auto_cascade()`, `kind()`) cover the common
1234/// predicates without unpacking via [`Core::kind_of`].
1235///
1236/// The 5 bool fields (`has_fired_once`, `dirty`, `involved_this_wave`,
1237/// `has_received_teardown`, `resubscribable`, `is_dynamic`) each represent
1238/// an orthogonal concern. `is_dynamic` is constant per node (set at
1239/// register time); the others are mutable lifecycle state. Collapsing
1240/// them into a bitfield would obscure intent.
1241#[allow(clippy::struct_excessive_bools)]
1242pub(crate) struct NodeRecord {
1243    /// Per-dep records. Replaces the old parallel `deps` / `dep_handles` /
1244    /// `dep_terminals` vecs. Dep NodeIds derived via `dep_ids()`.
1245    pub(crate) dep_records: Vec<DepRecord>,
1246    /// User-fn id for closure-form dispatch. `Some` for Derived / Dynamic /
1247    /// Producer; `None` for State / Operator. (Operator dispatch goes via
1248    /// [`Self::op`] instead.)
1249    pub(crate) fn_id: Option<FnId>,
1250    /// Operator discriminant for typed-op dispatch. `Some` for Operator
1251    /// nodes; `None` otherwise. Mutually exclusive with `fn_id` (a node is
1252    /// either closure-form OR typed-op, never both).
1253    pub(crate) op: Option<OperatorOp>,
1254    /// True for Dynamic nodes (R2.5.3 — fn declares actually-tracked dep
1255    /// indices per fire). False for everything else. Only meaningful when
1256    /// `fn_id.is_some()` AND `!dep_records.is_empty()`.
1257    pub(crate) is_dynamic: bool,
1258    pub(crate) equals: EqualsMode,
1259
1260    // Mutable state
1261    pub(crate) cache: HandleId,
1262    pub(crate) has_fired_once: bool,
1263    pub(crate) subscribers: HashMap<SubscriptionId, Sink>,
1264    /// Monotonic counter bumped on every mutation of [`Self::subscribers`]
1265    /// (insert on subscribe, remove on `Subscription::Drop`, remove on
1266    /// handshake-panic cleanup). Used by
1267    /// [`crate::batch::Core::queue_notify`] to detect mid-wave subscriber-
1268    /// set changes and start a fresh `PendingBatch` with an updated sink
1269    /// snapshot — closes D2 (Slice X4, 2026-05-08): the late-subscriber
1270    /// and multi-emit-per-wave gap where the pre-fix per-node single
1271    /// snapshot meant a sub installed between two emits to the same node
1272    /// in one wave was invisible to the second emit's flush.
1273    ///
1274    /// Per-node (not per-Core) so that a subscribe to node A doesn't
1275    /// invalidate snapshot reuse for node B's pending batch in the same
1276    /// wave.
1277    pub(crate) subscribers_revision: u64,
1278    /// For dynamic nodes: which dep indices fn actually tracks.
1279    /// For static derived: all indices, populated at construction.
1280    pub(crate) tracked: HashSet<usize>,
1281
1282    // Wave-scoped state — cleared at wave end.
1283    pub(crate) dirty: bool,
1284    pub(crate) involved_this_wave: bool,
1285
1286    /// Per-node pause state. Default `Active`. See [`PauseState`].
1287    pub(crate) pause_state: PauseState,
1288    /// Pause behavior mode (canonical-spec §2.6). Set at registration via
1289    /// [`NodeOpts::pausable`]. Default [`PausableMode::Default`] suppresses
1290    /// fn-fire while paused and consolidates N pause-window dep deliveries
1291    /// into one fn-fire on RESUME; `ResumeAll` buffers tier-3/4 outgoing
1292    /// for verbatim replay; `Off` ignores PAUSE entirely. See
1293    /// [`PausableMode`].
1294    pub(crate) pausable: PausableMode,
1295    /// Replay buffer cap (R2.6.5 / Lock 6.G — Slice E1, 2026-05-07).
1296    /// `None` disables; `Some(N)` keeps a circular VecDeque of the last N
1297    /// DATA-handle emissions for late-subscriber replay. Each handle in
1298    /// the buffer owns one binding-side retain share, released on evict
1299    /// (cap exceeded) or in `Drop for CoreState`.
1300    pub(crate) replay_buffer_cap: Option<usize>,
1301    pub(crate) replay_buffer: VecDeque<HandleId>,
1302
1303    /// Terminal lifecycle state for THIS node's outgoing stream. Once set,
1304    /// further `emit` calls are silent no-ops, fn no longer fires, and only
1305    /// the terminal message has been queued downstream.
1306    pub(crate) terminal: Option<TerminalKind>,
1307    /// True after the first TEARDOWN has been processed for this node
1308    /// (R2.6.4 / Lock 6.F). Subsequent TEARDOWN deliveries are idempotent
1309    /// — the auto-prepended COMPLETE only fires on the first one. Without
1310    /// this flag, a redundant TEARDOWN delivered via the cascade plus an
1311    /// explicit `core.teardown(node)` would re-emit `[COMPLETE, TEARDOWN]`
1312    /// to subscribers per delivery, which is incorrect.
1313    pub(crate) has_received_teardown: bool,
1314    /// Per R2.2.7 / R2.5.3 — resubscribable terminal lifecycle.
1315    /// When `true` AND `terminal == Some(...)`, a fresh subscribe call
1316    /// will reset the node: clear `terminal`, `has_fired_once`,
1317    /// `has_received_teardown`, all dep_records to sentinel, and drain the
1318    /// pause lockset. Default `false`.
1319    pub(crate) resubscribable: bool,
1320    /// Meta companion nodes attached to this node per R1.3.9.d. When this
1321    /// node tears down, its meta companions are torn down FIRST (before
1322    /// the main node's auto-COMPLETE + TEARDOWN wire emission), so
1323    /// observers see companions terminate before the parent. The ordering
1324    /// is load-bearing — meta nodes typically subscribe to parent state
1325    /// that becomes inconsistent during the parent's destruction phase.
1326    pub(crate) meta_companions: Vec<NodeId>,
1327    /// R5.4 / D011 partial-mode: when `true`, fire_fn skips the R2.5.3
1328    /// first-run gate — the node fires as soon as ANY dep delivers a
1329    /// real handle, even if other deps remain sentinel. Defaults to
1330    /// `false` (gated). Lifted into Core for operator support; for
1331    /// State/Derived/Dynamic nodes the field is settable but the gated
1332    /// path remains the typical caller default.
1333    pub(crate) partial: bool,
1334    /// D263 — when `true`, `fire_fn`'s first-run gate (R2.5.3) counts a
1335    /// terminal dep as "real input" so the node can fire on a
1336    /// COMPLETE-without-DATA from any single dep. Mirrors `fire_operator`'s
1337    /// unconditional `dr.terminal.is_some()` clause (gated here on the
1338    /// per-node opt-in so existing `fire_fn` consumers preserve the
1339    /// historical sentinel-hold behaviour by default). Substrate-internal
1340    /// per D263 — not surfaced on the `Impl` parity contract yet.
1341    pub(crate) terminal_as_real_input: bool,
1342    /// Topological rank — 1 + max dep rank. Nodes with no deps have rank 0.
1343    /// Used by `pick_next_fire` for O(|pending_fires|) scheduling instead
1344    /// of O(V) transitive BFS. Computed at registration; recomputed on
1345    /// `set_deps` for the modified node (consumer propagation deferred —
1346    /// see `porting-deferred.md`). §10 perf optimization (D047, Slice U).
1347    pub(crate) topo_rank: u32,
1348    /// Bitmask for first-run gate — bit i set when dep i delivers first
1349    /// DATA. `has_sentinel_deps()` becomes a single integer compare for
1350    /// ≤64 deps. Falls back to `iter().any()` when `dep_count > 64`.
1351    /// Reset on resubscribe. §10.13 perf optimization (D047, Slice U).
1352    pub(crate) received_mask: u64,
1353    /// §10.3 diamond resolution bitmask — bit i set when dep i is
1354    /// involved in the current wave (DATA delivered). Mirrors
1355    /// `received_mask` pattern. Cleared to 0 at wave end instead of
1356    /// iterating all deps. For ≤64 deps, `DepBatch::involved` can be
1357    /// derived via `(involved_mask >> dep_idx) & 1 != 0`. For >64 deps,
1358    /// falls back to per-dep `DepRecord::involved_this_wave` field.
1359    /// §10.3 perf optimization (Slice V1).
1360    pub(crate) involved_mask: u64,
1361    /// Generic per-operator scratch slot (Slice C-3, D026). Replaces
1362    /// the typed `operator_state: HandleId` field used by Slices C-1 / C-2.
1363    /// `None` for non-operator kinds and operators with no cross-wave
1364    /// state (Map / Filter / Combine / WithLatestFrom / Merge); `Some`
1365    /// for stateful operators ([`OperatorOp::Scan`] / [`Reduce`] /
1366    /// [`DistinctUntilChanged`] / [`Pairwise`] / [`Take`] / [`Skip`] /
1367    /// [`TakeWhile`] / [`Last`]).
1368    ///
1369    /// The boxed value implements
1370    /// [`OperatorScratch`](crate::op_state::OperatorScratch); its
1371    /// `release_handles` method is called from
1372    /// [`reset_for_fresh_lifecycle`] (resubscribable terminal cycle) and
1373    /// from [`Drop for CoreState`].
1374    ///
1375    /// **Refcount discipline:** the state struct owns whatever handle
1376    /// shares it stores (e.g., [`ScanState::acc`](crate::op_state::ScanState::acc),
1377    /// [`LastState::latest`](crate::op_state::LastState::latest)).
1378    /// Per-fire helpers retain the new value before releasing the old;
1379    /// `release_handles` releases the current shares at end-of-life.
1380    pub(crate) op_scratch: Option<Box<dyn crate::op_state::OperatorScratch>>,
1381}
1382
1383impl NodeRecord {
1384    // ---- Kind predicates (D030 — derived from field shape) ----
1385
1386    /// True iff this is a state node (no deps, no fn, no op).
1387    pub(crate) fn is_state(&self) -> bool {
1388        self.dep_records.is_empty() && self.fn_id.is_none() && self.op.is_none()
1389    }
1390
1391    /// True iff this is a producer node (no deps + has fn + no op).
1392    /// Producers fire fn once on first subscribe; cleanup fires via
1393    /// [`BindingBoundary::producer_deactivate`] (D031, Slice D).
1394    pub(crate) fn is_producer(&self) -> bool {
1395        self.dep_records.is_empty() && self.fn_id.is_some() && self.op.is_none()
1396    }
1397
1398    /// True iff this is a compute node (Derived / Dynamic / Operator) —
1399    /// has at least one dep AND either a fn or an op.
1400    #[allow(dead_code)] // Convenience predicate; callers may use is_state/is_producer instead.
1401    pub(crate) fn is_compute(&self) -> bool {
1402        !self.dep_records.is_empty() && (self.fn_id.is_some() || self.op.is_some())
1403    }
1404
1405    /// True iff this is an Operator node (has op set).
1406    #[allow(dead_code)] // Direct `op.is_some()` is more common; this is a readability sugar.
1407    pub(crate) fn is_operator(&self) -> bool {
1408        self.op.is_some()
1409    }
1410
1411    /// True iff this node opts OUT of Lock 2.B auto-cascade —
1412    /// Operator(Reduce) / Operator(Last) intercept upstream COMPLETE.
1413    ///
1414    /// D263: a non-operator node with `terminal_as_real_input == true`
1415    /// ALSO skips auto-cascade so `fire_fn` runs on terminal-only deps —
1416    /// mirrors the Reduce-class operator dispatch that already counts
1417    /// terminal as real input. Note: `partial == true` ALONE does NOT
1418    /// skip cascade — canonical-spec §5.4 / R5.4 locks `partial` to
1419    /// first-run-gate-bypass ONLY. To get fire-on-COMPLETE in partial
1420    /// mode, set BOTH flags (or wrap in a Reduce-class operator).
1421    pub(crate) fn skips_auto_cascade(&self) -> bool {
1422        match self.op {
1423            Some(op) => NodeKind::Operator(op).skips_auto_cascade(),
1424            None => self.terminal_as_real_input,
1425        }
1426    }
1427
1428    /// Compute the public-API [`NodeKind`] from the field shape (D030).
1429    /// Used by [`Core::kind_of`] and rare internal sites that need the
1430    /// enum (most use the predicate methods above).
1431    pub(crate) fn kind(&self) -> NodeKind {
1432        if let Some(op) = self.op {
1433            NodeKind::Operator(op)
1434        } else if self.dep_records.is_empty() {
1435            if self.fn_id.is_some() {
1436                NodeKind::Producer
1437            } else {
1438                NodeKind::State
1439            }
1440        } else if self.is_dynamic {
1441            NodeKind::Dynamic
1442        } else {
1443            NodeKind::Derived
1444        }
1445    }
1446
1447    // ---- Existing accessors ----
1448
1449    /// Iterator over dep NodeIds in declaration order.
1450    pub(crate) fn dep_ids(&self) -> impl Iterator<Item = NodeId> + '_ {
1451        self.dep_records.iter().map(|r| r.node)
1452    }
1453
1454    /// Collected dep NodeIds — for call sites that need a `Vec<NodeId>`.
1455    pub(crate) fn dep_ids_vec(&self) -> Vec<NodeId> {
1456        self.dep_ids().collect()
1457    }
1458
1459    /// True if any dep is in sentinel state (never emitted DATA and no
1460    /// data this wave). Replaces the old `dep_handles.contains(&NO_HANDLE)`.
1461    pub(crate) fn has_sentinel_deps(&self) -> bool {
1462        let n = self.dep_records.len();
1463        if n == 0 {
1464            return false;
1465        }
1466        if n <= 64 {
1467            // O(1): check if all bits [0..n) are set.
1468            let full_mask = if n == 64 { u64::MAX } else { (1u64 << n) - 1 };
1469            self.received_mask != full_mask
1470        } else {
1471            // Fallback for >64 deps (extremely rare).
1472            self.dep_records
1473                .iter()
1474                .any(|r| r.prev_data == NO_HANDLE && r.data_batch.is_empty())
1475        }
1476    }
1477
1478    /// Find the index of a dep by NodeId.
1479    pub(crate) fn dep_index_of(&self, dep_id: NodeId) -> Option<usize> {
1480        self.dep_records.iter().position(|r| r.node == dep_id)
1481    }
1482
1483    /// True if ALL dep terminal slots are populated (Lock 2.B cascade check).
1484    pub(crate) fn all_deps_terminal(&self) -> bool {
1485        !self.dep_records.is_empty() && self.dep_records.iter().all(|r| r.terminal.is_some())
1486    }
1487}
1488
1489// Q-beyond Sub-slice 1 (D108, 2026-05-09): `CrossPartitionState` removed.
1490//
1491// The four wave-scoped fields previously held under
1492// `Core::cross_partition: Arc<parking_lot::Mutex<CrossPartitionState>>`
1493// (Q2, 2026-05-09) moved to a per-thread `WaveState` thread_local in
1494// [`crate::batch`]. The bench-driven rationale is documented at
1495// [`crate::batch::WaveState`]: cross-thread cache-line bouncing on the
1496// `cross_partition` mutex was the dominant cost in Phase J's regression,
1497// not single-thread mutex hop count. Per-thread placement eliminates the
1498// bounce point entirely.
1499//
1500// **Refcount discipline preserved.** `wave_cache_snapshots` and
1501// `deferred_handle_releases` still hold binding-side handle retains;
1502// outermost `BatchGuard::drop` drains them via `Core::binding` on both
1503// success and panic paths (the binding ref the prior
1504// `CrossPartitionState` held for its `Drop` impl is no longer needed —
1505// `BatchGuard` already holds a `Core` clone with the binding).
1506
1507/// Core-global state with NO per-scheduling-group partition
1508/// (Slice B-2 Step 1, D220-EXEC). Hoisted out of [`CoreState`]
1509/// so that when Step 2 shards `nodes`/`children` per `ShardKey`, THESE
1510/// remain singular: monotonic id counters, the topology-sink registry,
1511/// the cross-shard-visible `currently_firing` reentrancy stack (P13 /
1512/// /qa F2 — the load-bearing cross-thread-visibility property), the two
1513/// Core-global caps, and the Core-shutdown scratch-release queue.
1514///
1515/// **Step 1 (this stage): behaviour-identical.** A plain sub-struct of
1516/// [`CoreState`] under the SAME single cell/lock — `s.shared.<f>` is
1517/// exactly the old `s.<f>`. Step 2 reshapes [`crate::state_cell`] to
1518/// hold one `CoreShared` + a per-`ShardKey` shard map; pure-shared ops
1519/// then take only the shared guard (`with_shared`), shard ops the shard
1520/// guard (`with_shard`), and when both are needed the deadlock-free
1521/// rule is **shard guard OUTER, shared guard INNER** (D220-EXEC).
1522// `pub` (not `pub(crate)`): same as `CoreState`. The struct *name* is
1523// public but every field is `pub(crate)`, so it is an opaque token
1524// outside the crate — no internal state is reachable. (Pre-D246 this
1525// was the shared region of a `StateCell`-generic sharded split; the
1526// generic + shard map were collapsed when single-owner became the
1527// substrate floor.)
1528pub struct CoreShared {
1529    pub(crate) next_node_id: u64,
1530    pub(crate) next_subscription_id: u64,
1531    pub(crate) next_lock_id: u64,
1532    /// Topology-change sinks. Keyed by subscription id for O(1) removal.
1533    pub(crate) topology_sinks: HashMap<u64, crate::topology::TopologySink>,
1534    pub(crate) next_topology_id: u64,
1535    /// Core-global cap on per-node pause replay buffer length. `None` means
1536    /// unbounded. Per the user direction (Q1, 2026-05-05): start core-global;
1537    /// per-node override can be added later as a pure addition without API
1538    /// breakage. Default `None`.
1539    pub(crate) pause_buffer_cap: Option<usize>,
1540    /// Core-global cap on wave-drain iterations before
1541    /// [`crate::batch::Core::drain_and_flush`] aborts with a diagnostic panic.
1542    /// Replaces the prior `MAX_DRAIN_ITERATIONS` hard-coded constant
1543    /// (R4.3 / Lock 2.F′). Default `10_000`.
1544    ///
1545    /// The drain loop bound exists to surface runtime cycles
1546    /// (e.g. an operator that re-arms its own `pending_fires` slot during
1547    /// `invoke_fn`) as a panic with context, rather than letting Core
1548    /// spin forever. Structural cycles via [`Core::set_deps`] are
1549    /// rejected at edge-mutation time (`SetDepsError::WouldCreateCycle`);
1550    /// registration is structurally cycle-safe by construction (the new
1551    /// node's id is not allocated until AFTER deps are validated, so deps
1552    /// cannot transitively reach the new node). The drain bound is the
1553    /// safety net for runtime cycles that bypass both static checks.
1554    pub(crate) max_batch_drain_iterations: u32,
1555    /// A6 reentrancy guard stack (Slice F, 2026-05-07): the stack of
1556    /// NodeIds whose fn is currently being invoked. Pushed at the top of
1557    /// `fire_fn` (just before the lock-released `invoke_fn` call) and
1558    /// popped on return / unwind via the [`crate::batch::FiringGuard`]
1559    /// RAII helper. [`Core::set_deps`] consults this set and rejects
1560    /// with [`SetDepsError::ReentrantOnFiringNode`] if `n` is currently
1561    /// firing — preventing the D1 `tracked` index corruption. Read by
1562    /// the P13 partition-migration check (D091) to reject mid-fire
1563    /// `set_deps` that would migrate a firing node's partition.
1564    ///
1565    /// **Core-global / cross-shard-visible.** /qa F2 reverted
1566    /// (2026-05-10): briefly placed on per-thread `WaveState`, then
1567    /// moved BACK after /qa F2 surfaced the cross-thread P13 bypass
1568    /// (per-thread placement made Thread B's `set_deps` read its own
1569    /// empty stack → P13 silently bypassed for cross-thread `set_deps`
1570    /// during Thread A's lock-released `invoke_fn`). Cross-thread
1571    /// visibility is the load-bearing property D091 requires; keeping it
1572    /// in `CoreShared` (NOT a per-`ShardKey` shard) preserves it across
1573    /// the Slice B-2 shard split (D220-EXEC / /qa F2 / D214 P13).
1574    ///
1575    /// Membership semantics (NOT strict LIFO): consumed via
1576    /// `contains(&n)` membership test. `FiringGuard::drop` pops the
1577    /// right-most matching `node_id` via `rposition` + `swap_remove`;
1578    /// physical order of remaining entries may not match construction
1579    /// order, but membership is preserved.
1580    pub(crate) currently_firing: Vec<NodeId>,
1581    /// D-α (D028 full close, 2026-05-10): per-Core defer queue for old
1582    /// operator-scratch boxes pushed by Phase G on resubscribable nodes.
1583    /// Phase G builds a fresh scratch via [`Core::make_op_scratch`] and
1584    /// installs it on the node's `op_scratch` slot (so re-activation
1585    /// sees fresh counters / a fresh seed-share); the OLD scratch's
1586    /// handle retains are deferred to one of two drain points to
1587    /// preserve the Slice C-3 /qa P1 retain-before-release invariant
1588    /// (the C-3 test
1589    /// `scan_resubscribable_reset_with_seed_aliasing_acc_does_not_collapse_registry`
1590    /// fails if the old `acc` share is released before the fresh seed
1591    /// share is taken — when `acc == seed` interns to the same registry
1592    /// slot, eager release drops the slot to zero before the fresh
1593    /// retain bumps it back up).
1594    ///
1595    /// Drain points:
1596    /// 1. [`Core::reset_for_fresh_lifecycle`] — after Phase 2 takes
1597    ///    fresh retains on the new seed/default, the queue drain
1598    ///    releases queued boxes whose handles may have aliased.
1599    /// 2. [`Drop for CoreState`] — catch-all on Core shutdown.
1600    ///
1601    /// Note that the queue lives on `CoreShared` within `CoreState` (not
1602    /// on `Core`) so `Drop for CoreState` has access to both the binding
1603    /// and the queue under the single state lock — no separate mutex
1604    /// needed.
1605    ///
1606    /// **Growth bound (/qa m17, 2026-05-10):** size is bounded by the
1607    /// number of non-terminal deactivate-reactivate cycles since the
1608    /// last terminal-then-resubscribe reset on any resubscribable +
1609    /// has-op node in this Core. Each entry is a `Box<dyn OperatorScratch>`
1610    /// holding O(1) handles (Scan/Reduce/Last: 1 handle; Take/Skip/
1611    /// TakeWhile/DistinctUntilChanged/Pairwise: 0 or 1 handle). Typical
1612    /// workloads: O(few KB). Degenerate workloads (long-lived Cores
1613    /// with frequent deactivate-reactivate cycles and no terminal
1614    /// resets) should call `core.complete()` / `core.error()` on the
1615    /// op node periodically to trigger queue drain via
1616    /// `reset_for_fresh_lifecycle`. The release happens unconditionally
1617    /// on Core drop, so this is not a leak — just a deferred-release
1618    /// growth concern under unusual workloads.
1619    pub(crate) pending_scratch_release: Vec<Box<dyn crate::op_state::OperatorScratch>>,
1620    /// Binding handle, held here so [`Drop for CoreShared`] can drain
1621    /// `pending_scratch_release` on Core shutdown (Step 2a, D220-EXEC:
1622    /// `CoreShared` is hoisted to its own region, separate from any
1623    /// shard's `CoreState`, so the old `Drop for CoreState` scratch-drain
1624    /// — which used `CoreState::binding` — moves here). Cheap `Arc`
1625    /// clone; `Core` / each shard `CoreState` also hold one.
1626    pub(crate) binding: Arc<dyn BindingBoundary>,
1627}
1628
1629impl Drop for CoreShared {
1630    fn drop(&mut self) {
1631        // D-α (D028) catch-all drain of the deferred operator-scratch
1632        // queue on Core shutdown (moved from `Drop for CoreState`'s tail
1633        // with the `pending_scratch_release` + `binding` hoist). Release
1634        // BEFORE the `Vec<Box<…>>` drops (each box's `Drop` is a plain
1635        // field drop — HandleIds are raw u64s, no binding re-entry).
1636        let queued: Vec<Box<dyn crate::op_state::OperatorScratch>> =
1637            std::mem::take(&mut self.pending_scratch_release);
1638        for mut scratch in queued {
1639            scratch.release_handles(&*self.binding);
1640        }
1641    }
1642}
1643
1644/// Per-shard mutable Core state (Step 2a, D220-EXEC: still exactly ONE
1645/// shard — behaviour-identical with the pre-B-2 single `CoreState`;
1646/// Step 2b keys a per-[`crate::state_cell::ShardKey`] map of these).
1647/// All mutable Core state, behind one [`parking_lot::Mutex`].
1648///
1649/// **Architecture history.** Q2 (2026-05-09) split four wave-scoped
1650/// cross-partition aggregation fields out into a separate
1651/// `parking_lot::Mutex<CrossPartitionState>` on [`Core`]. Q-beyond
1652/// Sub-slice 1 (D108, 2026-05-09) eliminated `CrossPartitionState`
1653/// entirely — its four fields moved to per-thread `WaveState`
1654/// thread_local in `crate::batch`. Sub-slice 2 + 3 moved 8 more
1655/// wave-scoped fields the same way. /qa F1+F2 (2026-05-10) reverted
1656/// `in_tick` and `currently_firing` to CoreState; `currently_firing`
1657/// stays here (cross-thread P13 set_deps check, /qa F2), but `in_tick`
1658/// was re-keyed per-(Core, thread) into the
1659/// `crate::batch::IN_TICK_OWNED` thread_local (D047, 2026-05-15) and
1660/// then collapsed to a one-Core-per-OS-thread `Cell<u64>` slot
1661/// (D252, S5, 2026-05-19) — the actor-model invariant locks
1662/// "one Core per OS thread" so a single-generation slot suffices,
1663/// with cross-Core same-thread nesting panicking fail-loud at
1664/// `BatchGuard::claim_in_tick`. See `docs/rust-port-decisions.md`.
1665///
1666/// The D1 patch (2026-05-09) moved Slice G's `tier3_emitted_this_wave`
1667/// set to a per-thread thread-local in `crate::batch` (was briefly
1668/// per-partition under Q3 v1; that placement was vulnerable to mid-wave
1669/// cross-thread `set_deps` partition splits — see
1670/// `docs/porting-deferred.md` "Per-partition state-shard refactor"
1671/// closing summary). Q-beyond
1672/// will continue the shape decomposition by sharding most of the
1673/// remaining fields per-partition.
1674// `pub` (not `pub(crate)`): the struct *name* is public but every field
1675// stays `pub(crate)`, so it is an opaque token outside the crate — no
1676// internal state is reachable. (Pre-D246 this was a per-shard region
1677// of a `StateCell`-generic split; the generic + shard map were
1678// collapsed when single-owner became the substrate floor.)
1679pub struct CoreState {
1680    pub(crate) nodes: HashMap<NodeId, NodeRecord>,
1681    /// Inverted adjacency: `parent → children`. Updated on registration.
1682    pub(crate) children: HashMap<NodeId, HashSet<NodeId>>,
1683    /// Binding-boundary handle for `Drop`-time refcount balancing.
1684    /// `Core` also holds a clone of this Arc; storing it here lets
1685    /// `Drop for CoreState` walk every retained slot and release the
1686    /// binding-side share when the last `Core` clone drops. Without this,
1687    /// `cache` / `terminal` / `dep_terminals` Error / pause-buffer payload
1688    /// handle refs leak in the binding registry until process exit.
1689    pub(crate) binding: Arc<dyn BindingBoundary>,
1690    // Wave-scoped state lives off `CoreState`:
1691    // - `pending_fires` / `pending_notify` / `deferred_flush_jobs` /
1692    //   `deferred_cleanup_hooks` / `pending_wipes` /
1693    //   `invalidate_hooks_fired_this_wave` / `wave_cache_snapshots` /
1694    //   `pending_auto_resolve` / `pending_pause_overflow` →
1695    //   per-thread [`crate::batch::WaveState`] (D108 Sub-slices 1–3).
1696    // - `in_tick` → one-Core-per-OS-thread [`crate::batch::IN_TICK_OWNED`]
1697    //   (D252; cross-Core same-thread nesting panics fail-loud).
1698    // - tier3-emitted-this-wave → per-thread
1699    //   [`crate::batch::TIER3_EMITTED_THIS_WAVE`] (Slice G / D1).
1700    // Core-global non-shard state (`next_*`, `topology_sinks`,
1701    // `currently_firing` [cross-shard-visible /qa F2], the two caps,
1702    // `pending_scratch_release`) lives on [`CoreShared`] (Slice B-2
1703    // Step 1, D220-EXEC) — see its doc for each field's rationale.
1704}
1705
1706// # Wave execution = one uninterrupted owner-side drain (S4, D246/D248/D249)
1707//
1708// The state lock (`state: RefCell<CoreState>`) is **dropped** around
1709// binding callbacks (`invoke_fn`, `custom_equals`) so user fns may
1710// re-enter Core. The original M1 design serialized WAVE EXECUTION
1711// across threads with a `wave_owner` `parking_lot::ReentrantMutex` held
1712// for the wave; S2c/D248 deleted it. `Core` is now single-owner
1713// `!Send + !Sync`: exactly one owner thread ever drives a given `Core`,
1714// so a wave is one uninterrupted owner-side drain. There is no
1715// cross-thread emitter to block and no wave-ownership mutex.
1716//
1717// - Same-thread / same-Core re-entrance (a user fn or in-wave sink
1718//   re-entering via the owner-side mailbox/`DeferQueue` seam) runs as a
1719//   nested wave: the inner `run_wave` sees `in_tick = true` (the
1720//   one-Core-per-OS-thread `batch::IN_TICK_OWNED` slot holds this
1721//   Core's `generation`, D252) and skips its own drain — the outer
1722//   drain picks the queued op up to quiescence.
1723// - Cross-`Core` concurrency is host-native: independent per-worker
1724//   Cores (actor model). The only cross-thread bridge into a `!Send`
1725//   Core is the id-only `Arc<CoreMailbox>` (timer / producer `post_*` +
1726//   the `runnable` wake bit), drained owner-side.
1727//
1728// The happens-after contract (`emit` returning ⇒ subscribers observed)
1729// holds structurally: the single owner thread runs the emit's drain to
1730// quiescence before returning; a mid-wave subscribe is frozen against
1731// double-delivery by the `subscribers_revision` `PendingBatch` snapshot
1732// (Slice X4/D2), not by a cross-thread lock.
1733
1734/// Monotonic generation counter for `Core` instances. Used by the
1735/// per-thread `PARTITION_CACHE` in `batch.rs` to distinguish Core
1736/// instances without relying on `Arc::as_ptr` (which can be reused by
1737/// the allocator after a Core is dropped — ABA hazard). One atomic
1738/// increment per `Core::new`; negligible cost.
1739static CORE_GENERATION: AtomicU64 = AtomicU64::new(1);
1740
1741/// The handle-protocol Core dispatcher.
1742///
1743/// **Single-owner, `!Send + !Sync`** (D221/D246/D248). Holds an `Arc`
1744/// to the [`BindingBoundary`] (the `Send + Sync` boundary aggregate)
1745/// and all dispatch state in owner-thread-only `RefCell`s. **NOT
1746/// `Clone`** (D223 deleted shared-Core), **NOT `Send`/`Sync`** (D248
1747/// relaxed sinks off `Send + Sync` and the state cell follows). One
1748/// `Core` is driven from exactly one OS thread (the actor model);
1749/// cross-`Core` parallelism is independent per-worker Cores
1750/// (M6-ready), reached from any thread via the `Send + Sync`
1751/// [`crate::mailbox::CoreMailbox`] id-only bridge for autonomous
1752/// timer/producer tasks. See `RefCell<CoreShared>` / `RefCell<CoreState>`
1753/// field rustdoc below for the post-D246/D248 single-owner shape.
1754pub struct Core {
1755    /// Core-global region (id counters, topology-sink registry,
1756    /// `currently_firing`, the two caps, the scratch-release queue).
1757    /// D246/S2c: single-owner ⇒ a plain `RefCell` (the pre-D246
1758    /// `parking_lot`-Mutex + shard-generic split was shared-Core-era
1759    /// legacy; the actor model drives a `Core` from exactly one
1760    /// thread). D248 relaxed the substrate `Sink`/
1761    /// `TopologySink` off `Send + Sync`, so `CoreState` owns `!Send`
1762    /// sinks ⇒ **`Core` is `!Send + !Sync`** (one owner thread, never
1763    /// relocated cross-thread; the only cross-thread bridge is the
1764    /// id-only `Arc<CoreMailbox>`). This is the actor-model shape
1765    /// (D221/D223/D248); cross-`Core` parallelism = independent
1766    /// per-worker Cores. A re-entrant double-borrow panics loudly — a
1767    /// dispatcher bug (a missing lock-released bracket around a binding
1768    /// callback), not a user error.
1769    pub(crate) shared: std::cell::RefCell<CoreShared>,
1770    /// The node/children/binding region (was the sharded `CoreState`;
1771    /// single shard always under single-owner — D246/S2c collapse).
1772    pub(crate) state: std::cell::RefCell<CoreState>,
1773    /// `Send + Sync` bridge for autonomous async producers (timer tasks)
1774    /// that can no longer hold `&Core` (D223/D227/D230). Timer tasks
1775    /// post `(node, handle)`; [`Self::drain_mailbox`] applies them
1776    /// owner-side via the sync `emit`. Owns the per-group `runnable` wake
1777    /// bit (S4 wires it; M6 reads it from the host executor).
1778    pub(crate) mailbox: Arc<crate::mailbox::CoreMailbox>,
1779    /// Owner-only deferred-closure queue (D249/S2c). Holds the `!Send`
1780    /// `Defer` closures split off `CoreMailbox` (which stays
1781    /// `Send + Sync` for the timer bridge). `Rc` (not `Arc`): shared
1782    /// with `graphrefly-operators`' `ProducerEmitter` on the **one**
1783    /// owner thread; never crosses threads. Drained owner-side by
1784    /// [`Self::drain_mailbox`] + the in-wave `BatchGuard` drain.
1785    pub(crate) deferred: std::rc::Rc<crate::mailbox::DeferQueue>,
1786    pub(crate) binding: Arc<dyn BindingBoundary>,
1787    /// Unique generation ID for this Core instance. Assigned from
1788    /// [`CORE_GENERATION`] at construction; nonzero (the counter starts
1789    /// at 1 so `0` can serve as the [`crate::batch::IN_TICK_OWNED`]
1790    /// "no active Core" sentinel under D252). Stored in the one-Core-
1791    /// per-OS-thread `IN_TICK_OWNED` slot while this Core owns the
1792    /// thread's wave; cross-Core same-thread nesting panics fail-loud
1793    /// at `BatchGuard::claim_in_tick`.
1794    pub(crate) generation: u64,
1795}
1796
1797// S2b (D223): `Core` is **not** `Clone` and has **no** `WeakCore`. Under
1798// the actor / work-stealing model a `Core` owns its state cell by value
1799// and is moved between workers, never shared — so the old `Arc<C>`-shared
1800// `Clone` and the `Weak<C>`-back-ref `WeakCore` (used to break the
1801// BenchBinding→registry→closure→strong-Core cycle for long-lived
1802// binding-stored closures / timer tasks) are deleted. Autonomous async
1803// producers (timer tasks) now reach the `Core` via the `Send + Sync`
1804// [`crate::mailbox::CoreMailbox`] instead of upgrading a `Weak<C>`
1805// (D227/D230). Identity (`same_dispatcher`) is the unique per-`Core`
1806// `generation` counter, not `Arc::ptr_eq`.
1807
1808impl Drop for Core {
1809    fn drop(&mut self) {
1810        // Tell any in-flight timer tasks the Core is gone so they release
1811        // their pending handle and bail instead of leaking it into a
1812        // mailbox no one will drain (mirrors the old
1813        // `WeakCore::upgrade() == None` teardown branch in `timer.rs`).
1814        // `close()` is mutually exclusive with `post_op` (shared `ops`
1815        // lock), so after it returns no new op can enqueue.
1816        self.mailbox.close();
1817        // D249/S2c: close the owner-side defer queue too — its queued
1818        // closures are dropped unrun (running `CoreFull` on a
1819        // half-dropped `Core` is unsound — user-locked QA decision A).
1820        // No handle-release contract: `DeferFn`s capture no bare
1821        // retained `HandleId` (D235 P8 pattern).
1822        self.deferred.close();
1823        // QA F-A / Blind #2 (2026-05-18): an op posted just before
1824        // `close()` won the lock is now stranded in the queue with its
1825        // retained `HandleId`. Drain the remainder and release those
1826        // handles — without this `Drop` regresses the exact
1827        // BenchCore-teardown leak the deleted `WeakCore` path prevented.
1828        for op in self.mailbox.take_all() {
1829            match op {
1830                crate::mailbox::MailboxOp::Emit(_, h) | crate::mailbox::MailboxOp::Error(_, h) => {
1831                    self.binding.release_handle(h);
1832                }
1833                crate::mailbox::MailboxOp::Complete(_) | crate::mailbox::MailboxOp::Defer(_) => {}
1834            }
1835        }
1836    }
1837}
1838
1839/// Object-safe full-`Core` re-entry surface (S2b / D233) — the methods a
1840/// producer sink's owner-side [`crate::mailbox::MailboxOp::Defer`]
1841/// closure needs, by `NodeId`/`HandleId`/`Sink`/id only (no `C`/`T`),
1842/// blanket-impl'd for every `Core`. Lets windowing /
1843/// higher-order-operator sinks perform value-returning topology mutation
1844/// (`register_*`/`subscribe`) **in-wave** without naming the cell type:
1845/// the `BatchGuard` drain-to-quiescence loop calls
1846/// `f(self as &dyn CoreFull)` while it holds the owner `&Core`.
1847pub trait CoreFull {
1848    /// See [`Core::register_state`].
1849    fn register_state(&self, initial: HandleId, partial: bool) -> Result<NodeId, RegisterError>;
1850    /// See [`Core::register_producer`].
1851    fn register_producer(&self, fn_id: FnId) -> Result<NodeId, RegisterError>;
1852    /// See [`Core::subscribe`].
1853    fn subscribe(&self, node_id: NodeId, sink: Sink) -> SubscriptionId;
1854    /// See [`Core::try_subscribe`].
1855    fn try_subscribe(&self, node_id: NodeId, sink: Sink) -> Result<SubscriptionId, SubscribeError>;
1856    /// See [`Core::unsubscribe`].
1857    fn unsubscribe(&self, node_id: NodeId, sub_id: SubscriptionId);
1858    /// See [`Core::emit`].
1859    fn emit(&self, node_id: NodeId, handle: HandleId);
1860    /// See [`Core::complete`].
1861    fn complete(&self, node_id: NodeId);
1862    /// See [`Core::error`].
1863    fn error(&self, node_id: NodeId, handle: HandleId);
1864    /// See [`Core::teardown`]. Sink-side terminal forwards (e.g.
1865    /// `stratify` TEARDOWN passthrough) route through `em.defer` — rare,
1866    /// so `Defer` rather than a 5th `MailboxOp` fast-path variant.
1867    fn teardown(&self, node_id: NodeId);
1868    /// See [`Core::invalidate`]. Sink-side INVALIDATE forwards
1869    /// (higher-order `build_inner_sink`) route through `em.defer`.
1870    fn invalidate(&self, node_id: NodeId);
1871
1872    // --- read-only inspection (S2b/β D243 / D237-A-AMEND) ---
1873    //
1874    // Needed so a `MailboxOp::Defer` closure (the in-wave owner-side
1875    // re-entry point — D233) can run `graphrefly_graph::describe_of`
1876    // for the reactive-describe / observe topology-sub path. These are
1877    // pure reads (no `C`/`T` surfaced) — `HandleId`/`NodeId`/enum only.
1878
1879    /// See [`Core::cache_of`].
1880    fn cache_of(&self, node_id: NodeId) -> HandleId;
1881    /// See [`Core::has_fired_once`].
1882    fn has_fired_once(&self, node_id: NodeId) -> bool;
1883    /// See [`Core::kind_of`].
1884    fn kind_of(&self, node_id: NodeId) -> Option<NodeKind>;
1885    /// See [`Core::deps_of`].
1886    fn deps_of(&self, node_id: NodeId) -> Vec<NodeId>;
1887    /// See [`Core::is_terminal`].
1888    fn is_terminal(&self, node_id: NodeId) -> Option<TerminalKind>;
1889    /// See [`Core::is_dirty`].
1890    fn is_dirty(&self, node_id: NodeId) -> bool;
1891    /// Subscriber count for `node_id` — number of live external sinks
1892    /// (NOT counting downstream node-fn fan-out, which is tracked
1893    /// internally via `consumers`). Used by
1894    /// `graphrefly_graph::resource_profile` (R3.6.3 / D285 / D286) to
1895    /// compute per-node `subscriberCount` and the
1896    /// `hotspots.bySubscriberCount` ranking without needing a separate
1897    /// substrate widening. Returns 0 for unknown / torn-down nodes
1898    /// (consistent with the other `*_of` accessors' "unknown ⇒ default"
1899    /// stance). Pure read; no `C`/`T` surfaced.
1900    fn sink_count_of(&self, node_id: NodeId) -> usize;
1901
1902    /// Serialize a node's cached `HandleId` via the binding (S2b/β
1903    /// D244). Delegates to `binding_ptr().serialize_handle` — needed so
1904    /// an in-wave `MailboxOp::Defer` closure (storage snapshot-on-
1905    /// observe) can run `graphrefly_graph` snapshot through `&dyn
1906    /// CoreFull`. Pure binding-delegating read; no `C`/`T` surfaced.
1907    fn serialize_handle(&self, handle: HandleId) -> Option<serde_json::Value>;
1908
1909    /// The wave-drained [`CoreMailbox`] (D246 rule 5: the one facade is
1910    /// mutation + inspection + serialize + **mailbox**). Lets a holder
1911    /// of `&dyn CoreFull` post deferred ops without naming the cell
1912    /// type — folds D245's per-binding "how do I reach the mailbox".
1913    fn mailbox(&self) -> Arc<crate::mailbox::CoreMailbox>;
1914
1915    /// The owner-side [`crate::mailbox::DeferQueue`] (D249/S2c — the
1916    /// `!Send` `Defer` split off `CoreMailbox`). Lets a holder of
1917    /// `&dyn CoreFull` post owner-side deferred closures (`ProducerCtx`
1918    /// build path) without naming the cell type.
1919    fn defer_queue(&self) -> std::rc::Rc<crate::mailbox::DeferQueue>;
1920
1921    // --- producer-build accessors (D246 r5 / D245) ---
1922    //
1923    // The producer-build path runs owner-side with this one facade
1924    // (D245 Option A: `BindingBoundary::invoke_fn_with_core` hands the
1925    // binding `&dyn CoreFull`, from which it builds its `ProducerCtx`).
1926    // A build closure + its spawned sinks need the binding `Arc` (for
1927    // `retain_handle`/`release_handle` around payload shares). D274
1928    // (2026-05-21): the `*_or_defer` deferred-emit helpers were deleted
1929    // — producer sinks call `core.emit/complete/error` directly via the
1930    // owner-side actor model (no partition-order violation can fire
1931    // post-D248/D255 actor model + D246 single-owner Core).
1932
1933    /// See [`Core::binding`].
1934    fn binding(&self) -> Arc<dyn crate::boundary::BindingBoundary>;
1935}
1936
1937impl CoreFull for Core {
1938    #[inline]
1939    fn register_state(&self, initial: HandleId, partial: bool) -> Result<NodeId, RegisterError> {
1940        Core::register_state(self, initial, partial)
1941    }
1942    #[inline]
1943    fn register_producer(&self, fn_id: FnId) -> Result<NodeId, RegisterError> {
1944        Core::register_producer(self, fn_id)
1945    }
1946    #[inline]
1947    fn subscribe(&self, node_id: NodeId, sink: Sink) -> SubscriptionId {
1948        Core::subscribe(self, node_id, sink)
1949    }
1950    #[inline]
1951    fn try_subscribe(&self, node_id: NodeId, sink: Sink) -> Result<SubscriptionId, SubscribeError> {
1952        Core::try_subscribe(self, node_id, sink)
1953    }
1954    #[inline]
1955    fn unsubscribe(&self, node_id: NodeId, sub_id: SubscriptionId) {
1956        Core::unsubscribe(self, node_id, sub_id);
1957    }
1958    #[inline]
1959    fn emit(&self, node_id: NodeId, handle: HandleId) {
1960        Core::emit(self, node_id, handle);
1961    }
1962    #[inline]
1963    fn complete(&self, node_id: NodeId) {
1964        Core::complete(self, node_id);
1965    }
1966    #[inline]
1967    fn error(&self, node_id: NodeId, handle: HandleId) {
1968        Core::error(self, node_id, handle);
1969    }
1970    #[inline]
1971    fn teardown(&self, node_id: NodeId) {
1972        Core::teardown(self, node_id);
1973    }
1974    #[inline]
1975    fn invalidate(&self, node_id: NodeId) {
1976        Core::invalidate(self, node_id);
1977    }
1978    #[inline]
1979    fn cache_of(&self, node_id: NodeId) -> HandleId {
1980        Core::cache_of(self, node_id)
1981    }
1982    #[inline]
1983    fn has_fired_once(&self, node_id: NodeId) -> bool {
1984        Core::has_fired_once(self, node_id)
1985    }
1986    #[inline]
1987    fn kind_of(&self, node_id: NodeId) -> Option<NodeKind> {
1988        Core::kind_of(self, node_id)
1989    }
1990    #[inline]
1991    fn deps_of(&self, node_id: NodeId) -> Vec<NodeId> {
1992        Core::deps_of(self, node_id)
1993    }
1994    #[inline]
1995    fn is_terminal(&self, node_id: NodeId) -> Option<TerminalKind> {
1996        Core::is_terminal(self, node_id)
1997    }
1998    #[inline]
1999    fn is_dirty(&self, node_id: NodeId) -> bool {
2000        Core::is_dirty(self, node_id)
2001    }
2002    #[inline]
2003    fn sink_count_of(&self, node_id: NodeId) -> usize {
2004        Core::sink_count_of(self, node_id)
2005    }
2006    #[inline]
2007    fn serialize_handle(&self, handle: HandleId) -> Option<serde_json::Value> {
2008        self.binding_ptr().serialize_handle(handle)
2009    }
2010    #[inline]
2011    fn mailbox(&self) -> Arc<crate::mailbox::CoreMailbox> {
2012        Core::mailbox(self)
2013    }
2014    #[inline]
2015    fn defer_queue(&self) -> std::rc::Rc<crate::mailbox::DeferQueue> {
2016        Core::defer_queue(self)
2017    }
2018    #[inline]
2019    fn binding(&self) -> Arc<dyn crate::boundary::BindingBoundary> {
2020        Core::binding(self)
2021    }
2022    // D274 (2026-05-21): the trait `*_or_defer` methods were deleted.
2023    // Producer sinks call `core.emit/complete/error` directly via the
2024    // owner-side actor model.
2025}
2026
2027/// RAII guard that owns an [`OperatorScratch`] until either (a) the
2028/// caller `take()`s it for installation, or (b) the guard drops on an
2029/// early return / unwind, in which case the scratch's handle retains
2030/// are released via [`OperatorScratch::release_handles`].
2031///
2032/// Slice H /qa F1 + F2 (2026-05-07): closes two related correctness
2033/// gaps in `Core::register`:
2034///
2035/// 1. **TOCTOU window** — the original three-phase split called
2036///    `lock_state()` twice (once for validation, once for insertion),
2037///    so a concurrent `Core::complete(dep)` on a non-resubscribable
2038///    dep could slip in between the two acquisitions and re-create
2039///    the wedge `RegisterError::TerminalDep` was designed to prevent.
2040///    The guard plus a single locked region for both phases closes
2041///    this gap (release runs lock-released because guard variables
2042///    drop in reverse declaration order — guard declared BEFORE
2043///    `lock_state()`, so the lock guard drops first).
2044///
2045/// 2. **Panic-unsafe scratch leak** — without an RAII drop, a panic
2046///    between `make_op_scratch` (Phase 2) and the explicit
2047///    `if let Err(e)` cleanup branch (e.g., `lock_state()` reentrance
2048///    assert, OOM-as-panic on Vec growth in dep iteration) would
2049///    drop the `Box<dyn OperatorScratch>` without releasing the
2050///    seed/default retain. The guard's `Drop` impl releases on any
2051///    unwind path.
2052///
2053/// Lock-discipline: the guard holds `&dyn BindingBoundary` (through
2054/// the `Arc<dyn BindingBoundary>` it borrows from). On `Drop`, it
2055/// invokes `release_handles` lock-released — fires AFTER any
2056/// `MutexGuard<CoreState>` declared later in the same scope drops
2057/// (LIFO destruction order). Mirrors `Core::resume` Phase 2 release
2058/// pattern.
2059struct ScratchReleaseGuard<'a> {
2060    scratch: Option<Box<dyn crate::op_state::OperatorScratch>>,
2061    binding: &'a dyn BindingBoundary,
2062}
2063
2064impl<'a> ScratchReleaseGuard<'a> {
2065    fn new(
2066        scratch: Option<Box<dyn crate::op_state::OperatorScratch>>,
2067        binding: &'a dyn BindingBoundary,
2068    ) -> Self {
2069        Self { scratch, binding }
2070    }
2071
2072    /// Take ownership of the scratch — disarms the release-on-drop
2073    /// behavior. Used on the success path to install the scratch on
2074    /// `NodeRecord.op_scratch`.
2075    fn take(mut self) -> Option<Box<dyn crate::op_state::OperatorScratch>> {
2076        self.scratch.take()
2077    }
2078}
2079
2080impl Drop for ScratchReleaseGuard<'_> {
2081    fn drop(&mut self) {
2082        if let Some(mut scratch) = self.scratch.take() {
2083            scratch.release_handles(self.binding);
2084        }
2085    }
2086}
2087
2088/// Combined RAII guard returned by [`Core::lock_state`]. Holds the
2089/// [`CoreState`] borrow + the [`CoreShared`] borrow **together**.
2090/// `DerefMut`s to [`CoreState`] (so `s.nodes` / `s.children` /
2091/// `s.binding` work) **and** exposes an inherent `shared` field (so
2092/// `s.shared.<f>` works — inherent-field access is resolved before
2093/// `Deref`). D246/S2c: single-owner ⇒ the two regions are plain
2094/// `RefCell`s on [`Core`] (the pre-D246 `parking_lot`-Mutex +
2095/// shard-generic split was shared-Core-era legacy). A re-entrant
2096/// double-borrow panics loudly — a dispatcher bug (a missing
2097/// lock-released bracket around a binding callback), not a user error.
2098pub(crate) struct St<'a> {
2099    state: std::cell::RefMut<'a, CoreState>,
2100    pub(crate) shared: std::cell::RefMut<'a, CoreShared>,
2101}
2102
2103impl<'a> St<'a> {
2104    /// Borrow both Core regions together. Used by [`Core::lock_state`]
2105    /// and the owner-side drop paths (which hold `&Core`).
2106    #[inline]
2107    pub(crate) fn new(core: &'a Core) -> Self {
2108        let state = core.state.borrow_mut();
2109        let shared = core.shared.borrow_mut();
2110        Self { state, shared }
2111    }
2112
2113    /// Allocate a fresh [`NodeId`] (the counter lives in the separate
2114    /// `CoreShared` region — `St` is the only place holding both).
2115    pub(crate) fn alloc_node_id(&mut self) -> NodeId {
2116        let id = NodeId::new(self.shared.next_node_id);
2117        self.shared.next_node_id += 1;
2118        id
2119    }
2120
2121    /// Allocate a fresh [`SubscriptionId`].
2122    ///
2123    /// **Invariant (QA F4, 2026-05-18): monotonic, never recycled for
2124    /// the `Core`'s lifetime.** A strictly-incrementing counter, never
2125    /// decremented or reset. This is what makes the owner-driven
2126    /// `unsubscribe` model sound: a `(NodeId, SubscriptionId)` pair
2127    /// recorded by `producer_deactivate` / `SubGuard` and unsubscribed
2128    /// later (after the slot may have been removed by a re-entrant
2129    /// cascade) can NEVER deregister a *different* subscription —
2130    /// `unsubscribe_sink` on a since-departed `sub_id` is a documented
2131    /// no-op, not a wrong-target removal.
2132    pub(crate) fn alloc_sub_id(&mut self) -> SubscriptionId {
2133        let id = SubscriptionId(self.shared.next_subscription_id);
2134        self.shared.next_subscription_id += 1;
2135        id
2136    }
2137}
2138
2139impl std::ops::Deref for St<'_> {
2140    type Target = CoreState;
2141    #[inline]
2142    fn deref(&self) -> &CoreState {
2143        &self.state
2144    }
2145}
2146
2147impl std::ops::DerefMut for St<'_> {
2148    #[inline]
2149    fn deref_mut(&mut self) -> &mut CoreState {
2150        &mut self.state
2151    }
2152}
2153
2154impl Core {
2155    /// Construct a fresh Core wired to the given binding. Pause buffer
2156    /// cap defaults to unbounded; set via [`Self::set_pause_buffer_cap`].
2157    #[must_use]
2158    pub fn new(binding: Arc<dyn BindingBoundary>) -> Self {
2159        Self {
2160            // D246/S2c: single-owner ⇒ plain `RefCell` regions (the
2161            // pre-D246 shard-generic + map were collapsed). D248 relaxed the
2162            // substrate `Sink` off `Send+Sync` ⇒ `Core` is `!Send +
2163            // !Sync` — the actor-model shape (D221/D223/D248); one
2164            // owner thread, cross-`Core` parallelism via independent
2165            // per-worker Cores.
2166            shared: std::cell::RefCell::new(CoreShared {
2167                next_node_id: 1,
2168                next_subscription_id: 1,
2169                // A4 (Slice F, 2026-05-07): start `next_lock_id` in the
2170                // high half of the u32 range so `alloc_lock_id` can't
2171                // collide with user-supplied `LockId::new(N)`
2172                // constructors (which the napi-rs binding marshals from
2173                // `u32` and tests typically use in the low range,
2174                // 1..1024). Phase E /qa F1 (2026-05-08): lowered from
2175                // `1u64 << 32` to `1u64 << 31` so the value round-trips
2176                // through `u32::try_from(...)` at the napi boundary —
2177                // the previous seed errored every napi `alloc_lock_id`
2178                // call. Anti-collision intent (high range vs low user
2179                // range) preserved at half the prior ceiling (2^31 ≈ 2
2180                // billion allocations per Core, ample for parity
2181                // tests). Lift the floor when the deferred
2182                // BigInt-narrowing migration extends `LockId` to `u64`
2183                // at the FFI layer (porting-deferred "BigInt migration
2184                // for u32-narrowed napi types" entry).
2185                next_lock_id: 1u64 << 31,
2186                // `currently_firing` is the P13 set_deps reentrancy
2187                // check stack (/qa F2). Single-owner ⇒ one thread,
2188                // but kept here (Core-global) as the canonical
2189                // location.
2190                currently_firing: Vec::new(),
2191                pause_buffer_cap: None,
2192                max_batch_drain_iterations: 10_000,
2193                topology_sinks: HashMap::new(),
2194                next_topology_id: 1,
2195                pending_scratch_release: Vec::new(),
2196                binding: binding.clone(),
2197            }),
2198            state: std::cell::RefCell::new(CoreState {
2199                nodes: HashMap::new(),
2200                children: HashMap::new(),
2201                binding: binding.clone(),
2202            }),
2203            mailbox: Arc::new(crate::mailbox::CoreMailbox::new()),
2204            deferred: std::rc::Rc::new(crate::mailbox::DeferQueue::new()),
2205            binding,
2206            generation: CORE_GENERATION.fetch_add(1, Ordering::Relaxed),
2207        }
2208    }
2209
2210    /// Acquire the **combined** state guard: [`CoreState`] +
2211    /// [`CoreShared`] borrowed together. Returns [`St`] — `DerefMut`s
2212    /// to `CoreState` and exposes a `.shared` field, so
2213    /// `let mut s = self.lock_state(); … s.nodes … s.shared.x …` works.
2214    ///
2215    /// D246/S2c: single-owner ⇒ plain `RefCell` borrows (no lock).
2216    /// Binding callbacks fire LOCK-RELEASED (the guard is dropped
2217    /// before `invoke_fn`), so a re-entrant `lock_state()` while a
2218    /// guard is held is a dispatcher bug — it panics loudly via
2219    /// `RefCell`'s double-borrow check, the same observable contract
2220    /// the prior `parking_lot` re-entrant-deadlock path had.
2221    pub(crate) fn lock_state(&self) -> St<'_> {
2222        St::new(self)
2223    }
2224
2225    /// Run `f` with mutable access to the [`CoreShared`] region ONLY
2226    /// (id counters, topology-sink registry, `currently_firing`, the
2227    /// two caps, the scratch-release queue). Pure-shared
2228    /// ops (id alloc, topology) take only this borrow.
2229    pub(crate) fn with_shared<R>(&self, f: impl FnOnce(&mut CoreShared) -> R) -> R {
2230        f(&mut self.shared.borrow_mut())
2231    }
2232
2233    /// Run `f` with mutable access to the [`CoreState`] region ONLY
2234    /// (`nodes` / `children` / `binding`). The state-access seam —
2235    /// kept as a single method so a future M6 owner-thread overlay is
2236    /// a re-impl of one method, not a re-architecture of the ~104 call
2237    /// sites. D246/S2c: single shard always (single-owner), plain
2238    /// `RefCell` borrow.
2239    #[allow(dead_code)]
2240    pub(crate) fn with_shard<R>(&self, f: impl FnOnce(&mut crate::node::CoreState) -> R) -> R {
2241        f(&mut self.state.borrow_mut())
2242    }
2243
2244    /// Whether `self` and `other` are the same dispatcher instance.
2245    ///
2246    /// S2b (D223): `Core` is owned by value (no `Arc<C>`, no `Clone`), so
2247    /// identity is the unique per-`Core` [`Self::generation`] counter
2248    /// (assigned once from [`CORE_GENERATION`] at construction), not
2249    /// `Arc::ptr_eq`. Two independently `Core::new`-constructed instances
2250    /// have distinct generations even with the same binding; there is no
2251    /// longer any way to produce two handles to one `Core` (no `Clone`),
2252    /// so this is `true` only for genuine self-vs-self comparison.
2253    ///
2254    /// Used by `graphrefly-graph`'s `mount` to enforce the single-Core
2255    /// v1 invariant — cross-Core mount is post-M6.
2256    #[must_use]
2257    pub fn same_dispatcher(&self, other: &Core) -> bool {
2258        self.generation == other.generation
2259    }
2260
2261    /// Owner-side mailbox drain (D227/D230). Pops every
2262    /// [`crate::mailbox::CoreMailbox`]-queued timer `Emit` request in
2263    /// FIFO order and applies it via the synchronous [`Self::emit`] — so
2264    /// autonomous timer tasks (which can no longer hold `&Core`/`Weak<C>`
2265    /// under D223) get their fires delivered with **no async in Core**.
2266    ///
2267    /// Called from the embedder's existing advance/pump site (test
2268    /// harness `TestRuntime` advance helper, napi pump). Timer tasks
2269    /// already require the host runtime to be advanced before they fire,
2270    /// so draining here is behaviour-identical to the deleted autonomous
2271    /// `WeakCore::upgrade → core.emit` path. Idempotent on an empty
2272    /// mailbox. Re-entrant-safe: `emit` may cascade and a concurrent
2273    /// timer task may post again — the next drain picks it up (the
2274    /// `runnable` bit is re-set on every post).
2275    ///
2276    /// # Panics
2277    ///
2278    /// Panics if the id-mailbox / defer-queue keep mutually re-posting
2279    /// past the configured `max_batch_drain_iterations` cap — a livelock
2280    /// signal that a `Defer`/`Emit` pair is consuming its own output.
2281    pub fn drain_mailbox(&self) {
2282        // Clone the Arc out so the drain closure can call `&self.*`
2283        // without aliasing `self.mailbox` through `&self`.
2284        let mailbox = Arc::clone(&self.mailbox);
2285        // QA P3: bound the inner drain by the configured cap (the
2286        // self-reposting-Defer livelock guard lives here, NOT in
2287        // `drain_and_flush`'s fire-cascade counter).
2288        let deferred = std::rc::Rc::clone(&self.deferred);
2289        // QA P3: bound the inner drain by the configured cap (the
2290        // self-reposting-Defer livelock guard lives here, NOT in
2291        // `drain_and_flush`'s fire-cascade counter).
2292        //
2293        // QA F10 (2026-05-19): note that `max_ops` here is **shared
2294        // across three bounds** — each `mailbox.drain_into` inner
2295        // cap, each `deferred.drain_into` inner cap, AND the outer
2296        // mutual-quiescence round counter below. A workload requiring
2297        // legitimately deep cross-queue chaining (defer→emit→defer→…)
2298        // must tune `Core::set_max_batch_drain_iterations` up to
2299        // cover ALL THREE; counting against each independently is
2300        // intentional (the bound's purpose is "any single drain
2301        // dimension's livelock fires loudly"), not a knob granularity
2302        // bug.
2303        let max_ops = self.with_shared(|sh| sh.max_batch_drain_iterations);
2304        // D249/S2c: the owner-side `Defer` queue was split off the
2305        // `Send` id-`CoreMailbox` (so `!Send` `Defer` closures can
2306        // capture relaxed `Sink`s / `Rc<RefCell<GraphInner>>`). They
2307        // are now two physical queues, but a closure in either can
2308        // re-post to the OTHER (the canonical case: a `DeferQueue`
2309        // inner-subscribe → push-on-subscribe → an `Emit` on the
2310        // id-`CoreMailbox`). Pre-D249 this was ONE FIFO drained to
2311        // quiescence; restore that contract by draining BOTH to
2312        // **mutual quiescence** — id-mailbox first, then deferred, loop
2313        // until neither is runnable. Bounded by `max_ops` rounds.
2314        //
2315        // M1 contract (QA, 2026-05-19) — **CROSS-QUEUE ORDER = QUEUE
2316        // PRIORITY, NOT ARRIVAL ORDER.** Within each queue intra-FIFO
2317        // order is preserved (D234 cancel-before-resubscribe holds —
2318        // both are `DeferQueue` posts). ACROSS queues the order is
2319        // structurally `CoreMailbox` ops first, then `DeferQueue` ops,
2320        // every round. Pre-D249 a sink posting `[defer, emit]` to ONE
2321        // queue saw `defer` applied first; post-D249 a sink posting
2322        // `[defer (DeferQueue), emit (CoreMailbox)]` sees `emit`
2323        // applied first (mailbox-priority). Operators that need a
2324        // specific cross-queue ordering MUST capture the routing state
2325        // inside the *later* op's closure (D234 pattern: read shared
2326        // state at apply time, not at post time). Locked by the
2327        // `cross_queue_order_mailbox_then_deferred` regression in
2328        // `tests/lock_discipline.rs`.
2329        let mut rounds = 0u32;
2330        loop {
2331            mailbox.drain_into(max_ops, |op| match op {
2332                crate::mailbox::MailboxOp::Emit(node_id, handle) => self.emit(node_id, handle),
2333                crate::mailbox::MailboxOp::Complete(node_id) => self.complete(node_id),
2334                crate::mailbox::MailboxOp::Error(node_id, handle) => self.error(node_id, handle),
2335                // D233/D249: `Send` cross-thread timer defer applied
2336                // in-wave (we hold `&Core`).
2337                crate::mailbox::MailboxOp::Defer(f) => f(self),
2338            });
2339            // Each closure runs with the full object-safe Core surface
2340            // (we hold `&Core`); its topology mutation + result
2341            // consumption runs here.
2342            deferred.drain_into(max_ops, |f| f(self));
2343            if !mailbox.is_runnable() && !deferred.is_runnable() {
2344                break;
2345            }
2346            rounds += 1;
2347            assert!(
2348                rounds < max_ops,
2349                "drain_mailbox: id-mailbox / defer-queue mutual re-post \
2350                 livelock (> {max_ops} rounds) — a Defer/Emit pair is \
2351                 re-posting across the two queues every round. Tune via \
2352                 Core::set_max_batch_drain_iterations only with concrete \
2353                 evidence the workload needs more."
2354            );
2355        }
2356    }
2357
2358    /// Owner-side enqueue of a deferred closure (D233/D249). Runs at
2359    /// the next [`Self::drain_mailbox`] / in-wave `BatchGuard` drain
2360    /// with the object-safe full-`Core` surface. Returns `false` iff
2361    /// the owning `Core` is gone (closure dropped unrun). The
2362    /// owner-only `Rc<DeferQueue>` (also reachable via
2363    /// [`Self::defer_queue`] for capture into `!Send`
2364    /// `Sink`/`TopologySink` closures that cannot hold `&Core`).
2365    pub fn post_defer(&self, f: crate::mailbox::DeferFn) -> bool {
2366        self.deferred.post(f)
2367    }
2368
2369    /// Shared handle to this `Core`'s owner-side defer queue (D249).
2370    /// `graphrefly-operators`' `ProducerEmitter` + reactive
2371    /// describe/observe topo sinks hold this `Rc` to enqueue `Defer`
2372    /// work from inside a `!Send` sink closure (which carries no
2373    /// `&Core`). Owner-thread-only; never sent across threads.
2374    #[must_use]
2375    pub fn defer_queue(&self) -> std::rc::Rc<crate::mailbox::DeferQueue> {
2376        std::rc::Rc::clone(&self.deferred)
2377    }
2378
2379    /// Shared handle to this `Core`'s [`crate::mailbox::CoreMailbox`].
2380    /// Handed to autonomous async producers (timer tasks via
2381    /// [`crate::timer::spawn_timer_task`]) so they can post `Emit`
2382    /// requests without holding `&Core`/`Weak<C>` (D223/D227/D230).
2383    #[must_use]
2384    pub fn mailbox(&self) -> Arc<crate::mailbox::CoreMailbox> {
2385        Arc::clone(&self.mailbox)
2386    }
2387
2388    /// Shared handle to this `Core`'s binding. S2b/D231/A′: producer
2389    /// build closures obtain the binding here (via `ctx.core()`) for
2390    /// their spawned sinks' `retain_handle`/`pack_tuple`/`release_handle`
2391    /// — instead of the deleted `Weak<dyn ProducerBinding>` cycle-break.
2392    /// No cycle: the registry-stored build closure captures nothing
2393    /// strong; the `Arc<dyn BindingBoundary>` a sink clones lives in
2394    /// `Core`'s subscriber map and drops on unsubscribe.
2395    #[must_use]
2396    pub fn binding(&self) -> Arc<dyn BindingBoundary> {
2397        Arc::clone(&self.binding)
2398    }
2399
2400    // D274 (2026-05-21): `push_deferred_producer_op` /
2401    // `drain_deferred_producer_ops` / `DeferredProducerOp` were deleted
2402    // — the union-find ascending-order constraint they shimmed around
2403    // is gone (groups are static identity only post-D248/D253; single-
2404    // owner per Core post-D246; one Core per OS thread per D252). All
2405    // producer-pattern sinks now call `core.emit/complete/error`
2406    // directly via the owner-side actor model; partition-order
2407    // violations cannot fire.
2408
2409    // D246/S2c: the §7 cross-thread wave-lock acquisition
2410    // (`compute_touched_groups` / `all_groups_sorted` /
2411    // `acquire_touched_group_guards` / `acquire_all_group_guards` /
2412    // `with_global_wave_fallback`) is **deleted** — single-owner ⇒ a
2413    // `Core` is driven by exactly one thread, so there are no
2414    // interleaving waves to serialize. D253 (S5) further deletes the
2415    // declared-group identity surface (`SchedulingGroupId`,
2416    // `node_group`, `partition_of`/`group_of`/`set_scheduling_group`,
2417    // and `NodeOpts.scheduling_group`) — re-introduce in M6 with M6's
2418    // actual scheduling needs in view.
2419}
2420
2421// `walk_undirected_dep_graph` + `component_is_group_consistent`: deleted
2422// by D253 (S5). They existed solely to validate `scheduling_group`'s
2423// dep+children+meta-component invariant at topology-mutation time. With
2424// the declared-group identity surface deleted, the component walk has no
2425// remaining caller — re-introduce alongside M6's scheduling work.
2426
2427impl Core {
2428    /// Internal inspection helper: number of `PendingBatch`es queued
2429    /// for `node` in the current wave. Used by Slice X4 D2 regression
2430    /// tests to pin the "common case = single batch, no SmallVec
2431    /// spill" perf invariant.
2432    ///
2433    /// Returns `None` if no `pending_notify` entry exists for `node`
2434    /// (no tier-1+ message has been queued for this node yet in this
2435    /// wave). `Some(0)` is unreachable by construction (a vacant
2436    /// entry implies no batches; an occupied entry has at least one).
2437    ///
2438    /// `#[doc(hidden)]` + unconditionally `pub` (NOT
2439    /// `cfg(any(test, debug_assertions))`): integration tests are a
2440    /// separate crate, so they get neither `cfg(test)` nor
2441    /// `debug_assertions` on the lib dependency under `--release`.
2442    /// Gating this out of release builds made the entire
2443    /// integration-test suite fail to compile under
2444    /// `cargo nextest run --release`, blocking release-optimized test
2445    /// runs (e.g. the `cascade_depth` stress tests). It is NOT primary
2446    /// public API — `#[doc(hidden)]` keeps it off the generated
2447    /// surface (spec §5.12: protocol-internal inspection is allowed
2448    /// but never a primary API).
2449    #[doc(hidden)]
2450    #[must_use]
2451    pub fn pending_batch_count(&self, node: NodeId) -> Option<usize> {
2452        // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_notify lives
2453        // on per-thread `WaveState`. Test callers run on the same
2454        // thread that ran the wave, so the per-thread placement is
2455        // observable here.
2456        crate::batch::with_wave_state(|ws| {
2457            ws.pending_notify
2458                .get(&node)
2459                .map(|entry| entry.batches.len())
2460        })
2461    }
2462
2463    /// Configure the Core-global cap on pause replay buffer length. When set,
2464    /// any per-node pause buffer that would exceed `cap` drops the oldest
2465    /// message(s) from the front; the dropped count is reported back via the
2466    /// resume callback (see [`ResumeReport`]). `None` (default) means
2467    /// unbounded; messages buffer indefinitely until the lockset clears.
2468    pub fn set_pause_buffer_cap(&self, cap: Option<usize>) {
2469        self.lock_state().shared.pause_buffer_cap = cap;
2470    }
2471
2472    /// Configure the replay buffer cap on `node_id` (R2.6.5 / Lock 6.G —
2473    /// Slice E1, 2026-05-07). `None` disables the buffer. `Some(N)` keeps
2474    /// the last `N` DATA emissions in a circular buffer; late subscribers
2475    /// receive them as part of the per-tier handshake (between START and
2476    /// any terminal). Switching from a larger cap to a smaller cap evicts
2477    /// the front of the buffer to fit; switching to `None` drains the
2478    /// buffer entirely. Each evicted/drained handle's retain is released
2479    /// back to the binding.
2480    ///
2481    /// # Panics
2482    ///
2483    /// Panics if `node_id` is not registered.
2484    pub fn set_replay_buffer_cap(&self, node_id: NodeId, cap: Option<usize>) {
2485        // QA A7 (2026-05-07): normalize `Some(0)` to `None`. Two ways to
2486        // express "disabled" is confusing: `push_replay_buffer` already
2487        // treats `Some(0)` as no-op, so persisting it adds nothing.
2488        let cap = match cap {
2489            Some(0) => None,
2490            other => other,
2491        };
2492        let to_release: Vec<HandleId> = {
2493            let mut s = self.lock_state();
2494            let rec = s.require_node_mut(node_id);
2495            rec.replay_buffer_cap = cap;
2496            match cap {
2497                None => rec.replay_buffer.drain(..).collect(),
2498                Some(c) => {
2499                    let mut drained = Vec::new();
2500                    while rec.replay_buffer.len() > c {
2501                        if let Some(h) = rec.replay_buffer.pop_front() {
2502                            drained.push(h);
2503                        }
2504                    }
2505                    drained
2506                }
2507            }
2508        };
2509        for h in to_release {
2510            self.binding.release_handle(h);
2511        }
2512    }
2513
2514    /// Reconfigure the pause mode for `node_id` (canonical §2.6 — Slice F
2515    /// audit close, 2026-05-07). Default for new nodes is
2516    /// [`PausableMode::Default`]; switch to [`PausableMode::ResumeAll`]
2517    /// for nodes whose pause-window emit history must be observable
2518    /// verbatim, or [`PausableMode::Off`] for nodes intrinsically
2519    /// pause-immune.
2520    ///
2521    /// # Errors
2522    ///
2523    /// - [`SetPausableModeError::UnknownNode`] — `node_id` is not
2524    ///   registered.
2525    /// - [`SetPausableModeError::WhilePaused`] — the node currently
2526    ///   holds at least one pause lock. Changing mode mid-pause would
2527    ///   lose buffered content or strand a `pending_wave` flag — resume
2528    ///   all locks first.
2529    pub fn set_pausable_mode(
2530        &self,
2531        node_id: NodeId,
2532        mode: PausableMode,
2533    ) -> Result<(), SetPausableModeError> {
2534        let mut s = self.lock_state();
2535        let rec = s
2536            .nodes
2537            .get_mut(&node_id)
2538            .ok_or(SetPausableModeError::UnknownNode(node_id))?;
2539        if rec.pause_state.is_paused() {
2540            return Err(SetPausableModeError::WhilePaused);
2541        }
2542        rec.pausable = mode;
2543        Ok(())
2544    }
2545
2546    /// Configure the wave-drain iteration cap (R4.3 / Lock 2.F′). The wave
2547    /// engine aborts a drain after `cap` iterations with a diagnostic panic.
2548    /// Default is `10_000` — high enough to avoid false positives on legitimate
2549    /// fan-in cascades, low enough to surface runtime cycles within seconds.
2550    ///
2551    /// Lower this only when running adversarial / property-based tests that
2552    /// want fast cycle detection. Raise it only with concrete evidence that a
2553    /// legitimate workload needs more iterations than the default — and even
2554    /// then, prefer to tune the workload (per-subgraph batching, etc.) over
2555    /// raising the cap.
2556    ///
2557    /// # Panics
2558    ///
2559    /// Panics if `cap == 0` — a zero cap would abort every wave on the very
2560    /// first iteration, deadlocking any subsequent dispatcher work.
2561    pub fn set_max_batch_drain_iterations(&self, cap: u32) {
2562        assert!(cap > 0, "max_batch_drain_iterations must be > 0");
2563        self.lock_state().shared.max_batch_drain_iterations = cap;
2564    }
2565
2566    /// Send a message UPSTREAM from `node_id` to each of its declared deps
2567    /// (canonical R1.4.1 — Slice F audit, F2 / 2026-05-07).
2568    ///
2569    /// The dispatcher rejects tier-3 (DATA / RESOLVED) and tier-5
2570    /// (COMPLETE / ERROR) per R1.4.1: value and terminal-lifecycle planes
2571    /// are downstream-only. All other tiers (0 START, 1 DIRTY, 2 PAUSE /
2572    /// RESUME, 4 INVALIDATE, 6 TEARDOWN) pass.
2573    ///
2574    /// # Routing per tier
2575    ///
2576    /// - **Tier 0 ([`Message::Start`]):** no-op. START is a per-subscription
2577    ///   handshake, not a routable wire signal — sending it upstream has no
2578    ///   well-defined target.
2579    /// - **Tier 1 ([`Message::Dirty`]):** no-op. The dep's "something
2580    ///   changed" notification is its own [`Self::emit`] / commit
2581    ///   responsibility; ignoring upstream DIRTY hints is safe.
2582    /// - **Tier 2 ([`Message::Pause`] / [`Message::Resume`]):** translates
2583    ///   to [`Self::pause`] / [`Self::resume`] on each dep. Lock id is
2584    ///   forwarded verbatim. Errors from individual deps are accumulated
2585    ///   in the `dep_errors` field of the returned report.
2586    /// - **Tier 4 ([`Message::Invalidate`]):** translates to
2587    ///   [`Self::invalidate`] on each dep. Note: canonical R1.4.2
2588    ///   distinguishes "downstream INVALIDATE" (cache clear + cascade) from
2589    ///   "upstream INVALIDATE" (plain forward, no self-process). The Rust
2590    ///   port v1 SIMPLIFICATION delegates to the same `Core::invalidate`
2591    ///   path — upstream INVALIDATE here DOES clear dep caches and cascade.
2592    ///   If a "plain forward" mode surfaces as a real consumer need, add
2593    ///   `up_with_options`.
2594    /// - **Tier 6 ([`Message::Teardown`]):** translates to
2595    ///   [`Self::teardown`] on each dep. Cascades per the standard
2596    ///   teardown path.
2597    ///
2598    /// # Errors
2599    ///
2600    /// - [`UpError::UnknownNode`] — `node_id` is not registered.
2601    /// - [`UpError::TierForbidden`] — tier 3 or tier 5.
2602    pub fn up(&self, node_id: NodeId, message: Message) -> Result<(), UpError> {
2603        // 2b-ii-B (D220-EXEC): route to the node's shard before the
2604        // pre-wave/cold `lock_state()` (see `try_emit`).
2605        // QA A10 (2026-05-07): check unknown node BEFORE tier rejection
2606        // for consistent error UX — `up(unknown, Data)` and
2607        // `up(unknown, Pause)` both report `UnknownNode` rather than
2608        // splitting on the tier.
2609        let dep_ids: Vec<NodeId> = {
2610            let s = self.lock_state();
2611            let rec = s.nodes.get(&node_id).ok_or(UpError::UnknownNode(node_id))?;
2612            rec.dep_ids_vec()
2613        };
2614        let tier = message.tier();
2615        if tier == 3 || tier == 5 {
2616            return Err(UpError::TierForbidden { tier });
2617        }
2618        for dep_id in dep_ids {
2619            match message {
2620                Message::Pause(lock) => {
2621                    let _ = self.pause(dep_id, lock);
2622                }
2623                Message::Resume(lock) => {
2624                    let _ = self.resume(dep_id, lock);
2625                }
2626                Message::Invalidate => {
2627                    // D281 / R1.4.2 plain-forward (2026-05-22):
2628                    // canonical-spec §1.4.2 "Upstream INVALIDATE: plain
2629                    // forward — does not self-process INVALIDATE on
2630                    // intermediate or terminal nodes (no `_emit`, no
2631                    // cache clear at source). Cache-clearing semantics
2632                    // apply downstream side only."
2633                    //
2634                    // Mirrors TS `Node.up()` (`packages/pure-ts/src/
2635                    // core/node.ts:1333-1344`) which recursively
2636                    // forwards `up(messages)` to each dep's own `up()`
2637                    // and no-ops at leaf sources (empty `_deps`). The
2638                    // recursive walk bottoms out at leaves where the
2639                    // `for dep_id in dep_ids` iteration above is empty.
2640                    //
2641                    // Pre-D281 this arm called `self.invalidate(dep_id)`,
2642                    // which IS the downstream-cascade entry — it clears
2643                    // the dep's cache AND cascades INVALIDATE to the
2644                    // dep's children. That is exactly the two
2645                    // prohibitions R1.4.2 lists (self-process + cache
2646                    // clear at source). Cross-arm verified: TS pure-ts
2647                    // was already R1.4.2-conformant; Rust was the
2648                    // divergent arm.
2649                    let _ = self.up(dep_id, Message::Invalidate);
2650                }
2651                Message::Teardown => {
2652                    self.teardown(dep_id);
2653                }
2654                // Tier 0 START + tier 1 DIRTY: no-op upstream per the
2655                // routing table above.
2656                _ => {}
2657            }
2658        }
2659        Ok(())
2660    }
2661
2662    /// Allocate a unique [`LockId`] for use with [`Self::pause`] /
2663    /// [`Self::resume`]. Convenience for callers that don't already have an
2664    /// id-allocation scheme; user-supplied ids work too.
2665    #[must_use]
2666    pub fn alloc_lock_id(&self) -> LockId {
2667        let mut s = self.lock_state();
2668        let id = LockId::new(s.shared.next_lock_id);
2669        s.shared.next_lock_id += 1;
2670        id
2671    }
2672
2673    /// Access the binding boundary for this Core.
2674    ///
2675    /// Used by `graphrefly-graph` for snapshot serialization (M4.E1 / D166):
2676    /// `Graph::snapshot()` calls `binding.serialize_handle(cache)` to
2677    /// project each node's cached value into portable JSON.
2678    #[must_use]
2679    pub fn binding_ptr(&self) -> &Arc<dyn BindingBoundary> {
2680        &self.binding
2681    }
2682
2683    // -------------------------------------------------------------------
2684    // Registration — unified `register()` (D030, Slice D)
2685    //
2686    // All node kinds (State / Producer / Derived / Dynamic / Operator)
2687    // funnel through `Core::register(NodeRegistration) -> NodeId`. Sugar
2688    // wrappers (`register_state` / `register_producer` / `register_derived`
2689    // / `register_dynamic` / `register_operator`) build a `NodeRegistration`
2690    // and delegate. There is no parallel registration path internally.
2691    // -------------------------------------------------------------------
2692
2693    /// Unified node registration (D030).
2694    ///
2695    /// `reg` describes the node's identity (deps + closure-form fn id OR
2696    /// typed-op + per-kind opts). The kind is **derived from the field
2697    /// shape**, not stored — see [`NodeKind`].
2698    ///
2699    /// Sugar wrappers below ([`Self::register_state`],
2700    /// [`Self::register_producer`], [`Self::register_derived`],
2701    /// [`Self::register_dynamic`], [`Self::register_operator`]) build the
2702    /// registration for the common kinds and delegate here. Direct callers
2703    /// that need uncommon combinations (e.g., a partial-true derived) can
2704    /// invoke this method directly.
2705    ///
2706    /// # Errors
2707    ///
2708    /// Errors are returned in evaluation order — earlier phases short-circuit
2709    /// later ones, so a single registration produces at most one variant.
2710    ///
2711    /// **Phase 1 — lock-released, side-effect-free validation:**
2712    /// - [`RegisterError::OperatorWithoutDeps`] — `reg` carries an op but
2713    ///   `deps` is empty. Operator nodes need at least one dep — for
2714    ///   subscription-managed combinators with no declared deps, use
2715    ///   [`Self::register_producer`] instead.
2716    /// - [`RegisterError::InitialOnlyForStateNodes`] — `reg.opts.initial`
2717    ///   is non-sentinel for a non-state shape (deps non-empty, or
2718    ///   fn_or_op present). State nodes are the only kind with an initial
2719    ///   cache.
2720    ///
2721    /// **Phase 2 — operator scratch construction (lock-released):**
2722    /// - [`RegisterError::OperatorSeedSentinel`] — `reg` carries `Op(Scan)`
2723    ///   / `Op(Reduce)` with a `NO_HANDLE` seed. R2.5.3 — stateful folders
2724    ///   must have a real seed.
2725    ///
2726    /// **Phase 3 — state-lock validation (folded with insertion under a
2727    /// single lock acquisition per /qa F1 to prevent TOCTOU between
2728    /// validation and `nodes.insert`):**
2729    /// - [`RegisterError::UnknownDep`] — any element of `reg.deps` is not
2730    ///   a registered node id.
2731    /// - [`RegisterError::TerminalDep`] — a dep is terminal (COMPLETE /
2732    ///   ERROR) AND not resubscribable — would create a permanent wedge.
2733    ///
2734    /// All errors are construction-time invariants — the dispatcher
2735    /// rejects the registration before any reactive state is created.
2736    /// On `Err`, no node has been added and any handle retains taken on
2737    /// the way in (operator scratch seed retains via
2738    /// [`BindingBoundary::retain_handle`]) have been released
2739    /// lock-released — see [`ScratchReleaseGuard`] for the RAII
2740    /// discipline that covers both early-return AND unwind paths.
2741    /// `Last { default }` retains its `default` handle on the same
2742    /// release path.
2743    #[allow(clippy::too_many_lines)]
2744    pub fn register(&self, reg: NodeRegistration) -> Result<NodeId, RegisterError> {
2745        let NodeRegistration {
2746            deps,
2747            fn_or_op,
2748            opts,
2749        } = reg;
2750        let NodeOpts {
2751            initial,
2752            equals,
2753            partial,
2754            is_dynamic,
2755            pausable,
2756            replay_buffer,
2757            terminal_as_real_input,
2758        } = opts;
2759
2760        // Derive the field shape from fn_or_op + deps.
2761        let (fn_id, op) = match fn_or_op {
2762            Some(NodeFnOrOp::Fn(f)) => (Some(f), None),
2763            Some(NodeFnOrOp::Op(o)) => (None, Some(o)),
2764            None => (None, None),
2765        };
2766
2767        // Phase 1 — lock-released, side-effect-free validation. Errors
2768        // here return BEFORE any handle retain is taken.
2769        //
2770        //   - State (no deps + no fn + no op) is the only kind with `initial`.
2771        //   - Dynamic flag only meaningful when fn + non-empty deps.
2772        //   - Operator (op present) must have deps (P9: operator without deps
2773        //     would skip activation — use a producer instead).
2774        let is_state_shape = deps.is_empty() && fn_id.is_none() && op.is_none();
2775        if op.is_some() && deps.is_empty() {
2776            return Err(RegisterError::OperatorWithoutDeps);
2777        }
2778        if initial != NO_HANDLE && !is_state_shape {
2779            return Err(RegisterError::InitialOnlyForStateNodes);
2780        }
2781
2782        // Phase 2 — build per-operator scratch struct (may take handle
2783        // retains via `binding.retain_handle` for Scan/Reduce/Last seed).
2784        // Lock-released per Slice E (D045) handshake discipline. Returns
2785        // `OperatorSeedSentinel` BEFORE retain so an Err leaves no
2786        // dangling handles.
2787        let scratch = match op {
2788            Some(operator_op) => self.make_op_scratch(operator_op)?,
2789            None => None,
2790        };
2791
2792        // Wrap scratch in an RAII guard immediately after Phase 2. From
2793        // here on, ANY early return / unwind path correctly releases the
2794        // scratch's handle retains via `OperatorScratch::release_handles`
2795        // (Slice H /qa F2 — defense against panics between Phase 2 and
2796        // Phase 3 cleanup branch). Lock-released because the guard is
2797        // declared BEFORE `lock_state()` below — variable destruction
2798        // order is reverse declaration order, so the `MutexGuard` drops
2799        // first on any return path.
2800        let scratch_guard = ScratchReleaseGuard::new(scratch, &*self.binding);
2801
2802        // Phase 3 — state-lock-required validation, FOLDED with insertion
2803        // under a single `lock_state()` acquisition per /qa F1. The
2804        // pre-/qa version split this into two acquisitions (one for
2805        // validation, one for `alloc_node_id` + `nodes.insert`), opening
2806        // a TOCTOU window where a concurrent `Core::complete(dep)` on a
2807        // non-resubscribable dep could slip in and recreate the wedge
2808        // `TerminalDep` was designed to prevent. Single locked region
2809        // closes the gap.
2810        let mut s = self.lock_state();
2811
2812        for &dep in &deps {
2813            if !s.nodes.contains_key(&dep) {
2814                return Err(RegisterError::UnknownDep(dep));
2815            }
2816        }
2817        // Slice F audit (2026-05-07): mirror `set_deps`'s `TerminalDep`
2818        // rejection at registration time. Adding a non-resubscribable
2819        // terminal node as a dep at registration creates a permanent wedge.
2820        for &dep in &deps {
2821            let dep_rec = s.require_node(dep);
2822            if dep_rec.terminal.is_some() && !dep_rec.resubscribable {
2823                return Err(RegisterError::TerminalDep(dep));
2824            }
2825        }
2826
2827        // Validation passed — install. Take scratch out of the guard
2828        // (disarms the release-on-drop) and continue using `s`.
2829        let installed_scratch = scratch_guard.take();
2830
2831        let id = s.alloc_node_id();
2832
2833        // `tracked`: Static derived + Operator track all deps; Dynamic
2834        // starts empty and fills via fn return; State / Producer have no
2835        // deps so tracked is empty.
2836        let tracked: HashSet<usize> = if op.is_some() {
2837            (0..deps.len()).collect()
2838        } else if is_dynamic {
2839            HashSet::new()
2840        } else if fn_id.is_some() && !deps.is_empty() {
2841            // Static derived
2842            (0..deps.len()).collect()
2843        } else {
2844            HashSet::new()
2845        };
2846
2847        let dep_records: Vec<DepRecord> = deps.iter().map(|&d| DepRecord::new(d)).collect();
2848
2849        // §10 perf (D047): compute topo_rank = 1 + max(dep ranks).
2850        let topo_rank = if deps.is_empty() {
2851            0
2852        } else {
2853            deps.iter()
2854                .filter_map(|&d| s.nodes.get(&d).map(|r| r.topo_rank))
2855                .max()
2856                .unwrap_or(0)
2857                .saturating_add(1)
2858        };
2859
2860        let rec = NodeRecord {
2861            dep_records,
2862            fn_id,
2863            op,
2864            is_dynamic,
2865            equals,
2866            cache: initial,
2867            has_fired_once: initial != NO_HANDLE,
2868            subscribers: HashMap::new(),
2869            subscribers_revision: 0,
2870            tracked,
2871            dirty: false,
2872            involved_this_wave: false,
2873            pause_state: PauseState::Active,
2874            pausable,
2875            replay_buffer_cap: replay_buffer,
2876            replay_buffer: VecDeque::new(),
2877            terminal: None,
2878            has_received_teardown: false,
2879            resubscribable: false,
2880            meta_companions: Vec::new(),
2881            partial,
2882            terminal_as_real_input,
2883            topo_rank,
2884            received_mask: 0,
2885            involved_mask: 0,
2886            op_scratch: installed_scratch,
2887        };
2888        s.nodes.insert(id, rec);
2889        s.children.insert(id, HashSet::new());
2890        for &dep in &deps {
2891            s.children.entry(dep).or_default().insert(id);
2892        }
2893        drop(s);
2894        self.fire_topology_event(&crate::topology::TopologyEvent::NodeRegistered(id));
2895        Ok(id)
2896    }
2897
2898    /// Sugar over [`Self::register`] — register a state node. `initial`
2899    /// may be [`NO_HANDLE`] to start sentinel.
2900    ///
2901    /// `partial` is accepted for surface consistency (D019); for state
2902    /// nodes it has no effect (state nodes don't fire fn).
2903    ///
2904    /// # Errors
2905    ///
2906    /// State registration is structurally simple — no deps, no op — so
2907    /// the only reachable variant is none in practice. Returns
2908    /// [`Result`] for surface consistency with [`Self::register`].
2909    pub fn register_state(
2910        &self,
2911        initial: HandleId,
2912        partial: bool,
2913    ) -> Result<NodeId, RegisterError> {
2914        self.register(NodeRegistration {
2915            deps: Vec::new(),
2916            fn_or_op: None,
2917            opts: NodeOpts {
2918                initial,
2919                partial,
2920                ..NodeOpts::default()
2921            },
2922        })
2923    }
2924
2925    /// Sugar over [`Self::register`] — register a producer node (D031,
2926    /// Slice D). No deps; fn fires once on first subscribe; cleanup runs
2927    /// via [`BindingBoundary::producer_deactivate`] when the last
2928    /// subscriber unsubscribes.
2929    ///
2930    /// The fn body uses the binding's `ProducerCtx`-equivalent helper
2931    /// (see `graphrefly-operators::producer`) to subscribe to other Core
2932    /// nodes — the zip / concat / race / takeUntil pattern.
2933    ///
2934    /// # Errors
2935    ///
2936    /// Producer registration has no user-supplied deps, so structurally
2937    /// none of [`RegisterError`]'s variants are reachable. Returns
2938    /// [`Result`] for surface consistency with [`Self::register`].
2939    pub fn register_producer(&self, fn_id: FnId) -> Result<NodeId, RegisterError> {
2940        self.register(NodeRegistration {
2941            deps: Vec::new(),
2942            fn_or_op: Some(NodeFnOrOp::Fn(fn_id)),
2943            opts: NodeOpts {
2944                // Producers have no deps — the first-run gate is degenerate.
2945                partial: true,
2946                ..NodeOpts::default()
2947            },
2948        })
2949    }
2950
2951    /// Sugar over [`Self::register`] — register a derived (static) node.
2952    /// `partial` controls the R2.5.3 first-run gate (D011).
2953    ///
2954    /// # Errors
2955    ///
2956    /// - [`RegisterError::UnknownDep`] — any element of `deps` is not
2957    ///   registered.
2958    /// - [`RegisterError::TerminalDep`] — a dep is terminal and not
2959    ///   resubscribable.
2960    pub fn register_derived(
2961        &self,
2962        deps: &[NodeId],
2963        fn_id: FnId,
2964        equals: EqualsMode,
2965        partial: bool,
2966    ) -> Result<NodeId, RegisterError> {
2967        self.register(NodeRegistration {
2968            deps: deps.to_vec(),
2969            fn_or_op: Some(NodeFnOrOp::Fn(fn_id)),
2970            opts: NodeOpts {
2971                equals,
2972                partial,
2973                ..NodeOpts::default()
2974            },
2975        })
2976    }
2977
2978    /// Sugar over [`Self::register`] — register a dynamic node (fn
2979    /// declares its actually-tracked dep indices per fire). `partial`
2980    /// controls the R2.5.3 first-run gate (D011).
2981    ///
2982    /// # Errors
2983    ///
2984    /// - [`RegisterError::UnknownDep`] — any element of `deps` is not
2985    ///   registered.
2986    /// - [`RegisterError::TerminalDep`] — a dep is terminal and not
2987    ///   resubscribable.
2988    pub fn register_dynamic(
2989        &self,
2990        deps: &[NodeId],
2991        fn_id: FnId,
2992        equals: EqualsMode,
2993        partial: bool,
2994    ) -> Result<NodeId, RegisterError> {
2995        self.register(NodeRegistration {
2996            deps: deps.to_vec(),
2997            fn_or_op: Some(NodeFnOrOp::Fn(fn_id)),
2998            opts: NodeOpts {
2999                equals,
3000                partial,
3001                is_dynamic: true,
3002                ..NodeOpts::default()
3003            },
3004        })
3005    }
3006
3007    /// Build a fresh [`OperatorScratch`](crate::op_state::OperatorScratch)
3008    /// box for an operator variant, taking any required handle retains.
3009    /// Shared between `register_operator` (initial install),
3010    /// `reset_for_fresh_lifecycle` (resubscribable terminal-cycle
3011    /// re-install), and Phase G of `Subscription::Drop` (D-α
3012    /// resubscribable + non-terminal deactivate re-install).
3013    ///
3014    /// # Errors
3015    ///
3016    /// Returns [`RegisterError::OperatorSeedSentinel`] if `op` is `Scan`
3017    /// / `Reduce` with a [`NO_HANDLE`] seed (R2.5.3 — stateful folders
3018    /// must have a real seed). Refcount discipline: the seed-sentinel
3019    /// check happens BEFORE [`BindingBoundary::retain_handle`], so an
3020    /// `Err` leaves no handles dangling.
3021    fn make_op_scratch(
3022        &self,
3023        op: OperatorOp,
3024    ) -> Result<Option<Box<dyn crate::op_state::OperatorScratch>>, RegisterError> {
3025        Self::make_op_scratch_with_binding(&*self.binding, op)
3026    }
3027
3028    /// Associated-function variant of [`Self::make_op_scratch`] that
3029    /// takes the binding explicitly. Used by call sites that have a
3030    /// `&dyn BindingBoundary` but not a `&Core` (notably
3031    /// [`Subscription::Drop`]'s Phase G, which holds only a
3032    /// `Weak<Mutex<CoreState>>` and operates on `s.binding`).
3033    pub(crate) fn make_op_scratch_with_binding(
3034        binding: &dyn BindingBoundary,
3035        op: OperatorOp,
3036    ) -> Result<Option<Box<dyn crate::op_state::OperatorScratch>>, RegisterError> {
3037        use crate::op_state::{
3038            DistinctState, LastState, PairwiseState, ReduceState, ScanState, SkipState, TakeState,
3039            TakeWhileState,
3040        };
3041        // Slice H (2026-05-07): Scan/Reduce seed-sentinel checks happen
3042        // BEFORE retain_handle so an Err return leaves no handles dangling.
3043        //
3044        // Slice H /qa F13 (2026-05-07): for retaining variants, allocate
3045        // the `Box<State>` BEFORE calling `binding.retain_handle`. If
3046        // `Box::new` panics (e.g., OOM-as-panic), no retain has happened
3047        // yet — no leak. If `retain_handle` panics after Box succeeds,
3048        // the `Box<State>` is dropped on unwind; State has no handle yet
3049        // (we haven't touched the registry refcount), so still no leak.
3050        // Caller wraps the returned scratch in `ScratchReleaseGuard` to
3051        // cover panics AFTER make_op_scratch returns.
3052        match op {
3053            OperatorOp::Scan { seed, .. } => {
3054                if seed == NO_HANDLE {
3055                    return Err(RegisterError::OperatorSeedSentinel);
3056                }
3057                let state = Box::new(ScanState { acc: seed });
3058                binding.retain_handle(seed);
3059                Ok(Some(state))
3060            }
3061            OperatorOp::Reduce { seed, .. } => {
3062                if seed == NO_HANDLE {
3063                    return Err(RegisterError::OperatorSeedSentinel);
3064                }
3065                let state = Box::new(ReduceState { acc: seed });
3066                binding.retain_handle(seed);
3067                Ok(Some(state))
3068            }
3069            OperatorOp::DistinctUntilChanged { .. } => Ok(Some(Box::new(DistinctState::default()))),
3070            OperatorOp::Pairwise { .. } => Ok(Some(Box::new(PairwiseState::default()))),
3071            OperatorOp::Take { .. } => Ok(Some(Box::new(TakeState::default()))),
3072            OperatorOp::Skip { .. } => Ok(Some(Box::new(SkipState::default()))),
3073            OperatorOp::TakeWhile { .. } => Ok(Some(Box::new(TakeWhileState))),
3074            OperatorOp::Last { default } => {
3075                let state = Box::new(LastState {
3076                    latest: NO_HANDLE,
3077                    default,
3078                });
3079                if default != NO_HANDLE {
3080                    binding.retain_handle(default);
3081                }
3082                Ok(Some(state))
3083            }
3084            OperatorOp::TapFirst { .. } => {
3085                Ok(Some(Box::new(crate::op_state::TapFirstState::default())))
3086            }
3087            OperatorOp::Settle { .. } => {
3088                Ok(Some(Box::new(crate::op_state::SettleState::default())))
3089            }
3090            OperatorOp::Map { .. }
3091            | OperatorOp::Filter { .. }
3092            | OperatorOp::Combine { .. }
3093            | OperatorOp::WithLatestFrom { .. }
3094            | OperatorOp::Merge
3095            | OperatorOp::Tap { .. }
3096            | OperatorOp::Valve => Ok(None),
3097        }
3098    }
3099
3100    /// Sugar over [`Self::register`] — register a built-in operator node
3101    /// (Slice C-1, D009; D026 generic scratch). The operator dispatch path
3102    /// lives in `fire_operator`; `op` selects which per-operator FFI
3103    /// method on [`BindingBoundary`] gets called per fire.
3104    ///
3105    /// For stateful operators ([`OperatorOp::Scan`] / [`Reduce`] /
3106    /// [`Last`] with a default), the seed/default handle is captured
3107    /// into the appropriate
3108    /// [`OperatorScratch`](crate::op_state::OperatorScratch) struct
3109    /// stored at [`NodeRecord::op_scratch`], and Core takes one retain
3110    /// share via [`BindingBoundary::retain_handle`].
3111    ///
3112    /// # Errors
3113    ///
3114    /// - [`RegisterError::OperatorWithoutDeps`] — `deps` is empty (use
3115    ///   [`Self::register_producer`] instead).
3116    /// - [`RegisterError::OperatorSeedSentinel`] — `op` is
3117    ///   [`OperatorOp::Scan`] / [`OperatorOp::Reduce`] with a
3118    ///   [`NO_HANDLE`] seed.
3119    /// - [`RegisterError::UnknownDep`] — any element of `deps` is not
3120    ///   registered.
3121    /// - [`RegisterError::TerminalDep`] — a dep is terminal and not
3122    ///   resubscribable.
3123    pub fn register_operator(
3124        &self,
3125        deps: &[NodeId],
3126        op: OperatorOp,
3127        opts: OperatorOpts,
3128    ) -> Result<NodeId, RegisterError> {
3129        self.register(NodeRegistration {
3130            deps: deps.to_vec(),
3131            fn_or_op: Some(NodeFnOrOp::Op(op)),
3132            opts: NodeOpts {
3133                equals: opts.equals,
3134                partial: opts.partial,
3135                ..NodeOpts::default()
3136            },
3137        })
3138    }
3139
3140    // -------------------------------------------------------------------
3141    // Subscription
3142    // -------------------------------------------------------------------
3143
3144    /// Subscribe a sink to a node. Returns a [`Subscription`] handle —
3145    /// dropping the handle unsubscribes the sink. Per §10.12, no manual
3146    /// `unsubscribe(node, id)` call is required.
3147    ///
3148    /// Push-on-subscribe (R1.2.3, R2.2.3 step 4): the sink is registered AFTER
3149    /// the START handshake fires. The handshake contents depend on node
3150    /// state:
3151    /// - Sentinel cache + live (non-terminal): `[START]`
3152    /// - Cached + live: `[START, DATA(handle)]`
3153    ///
3154    /// Subscribe-after-terminal semantics (canonical R2.2.7.a / R2.2.7.b,
3155    /// D118 2026-05-10):
3156    /// - **Resubscribable + terminal** (any TEARDOWN state): the subscribe
3157    ///   call first **resets** the node — clears `terminal`,
3158    ///   `has_fired_once`, `has_received_teardown`, all `dep_handles` to
3159    ///   `NO_HANDLE`, all `dep_terminals` to `None`, drains the pause
3160    ///   lockset, clears the replay buffer. The new subscriber receives a
3161    ///   fresh `[START]` (cache survives for state nodes per R2.2.8;
3162    ///   sentinel for compute). The `wipe_ctx` cleanup hook fires
3163    ///   lock-released so binding-side `ctx.store` starts fresh.
3164    /// - **Non-resubscribable + terminal** (any TEARDOWN state): the
3165    ///   subscribe is rejected — `try_subscribe` returns
3166    ///   [`SubscribeError::TornDown`]; this method (the panic-on-error
3167    ///   variant) panics with the diagnostic.
3168    ///
3169    /// Activation (R2.2.3 step 5): if this is the first subscriber and the
3170    /// node is a derived/dynamic compute, recursively activate deps so their
3171    /// cached handles fill our `dep_handles`.
3172    ///
3173    /// # Returns
3174    ///
3175    /// Returns the [`SubscriptionId`]. Pair it with the `node_id` you
3176    /// passed here and call [`Self::unsubscribe`] to deregister (S2b /
3177    /// D225: core-level RAII retired — a binding-layer RAII wrapper over
3178    /// `unsubscribe` provides drop-convenience where the holder co-owns
3179    /// the `Core` on its affinity worker).
3180    ///
3181    /// # Panics
3182    ///
3183    /// Panics if the node is non-resubscribable AND has terminated
3184    /// ([`SubscribeError::TornDown`], R2.2.7.b).
3185    #[allow(clippy::needless_pass_by_value)] // Sink is `Rc<dyn Fn>` (D272); we clone for the subscribers map and call it directly. Taking by value matches the ergonomics callers expect.
3186    pub fn subscribe(&self, node_id: NodeId, sink: Sink) -> SubscriptionId {
3187        match self.try_subscribe(node_id, sink) {
3188            Ok(sub) => sub,
3189            Err(e) => panic!("{e}"),
3190        }
3191    }
3192
3193    /// Synchronous owner-invoked unsubscribe (D225 refined A2). Symmetric
3194    /// with [`Core::subscribe`] (the caller has `&Core`, exactly like
3195    /// [`Core::emit`]). Runs the full deregister + Phase G / lifecycle
3196    /// cleanup chain (OnDeactivation → `producer_deactivate` → `wipe_ctx`
3197    /// → Core cache-clear), behaviour-identical to the legacy
3198    /// `Subscription::Drop`. Idempotent: a `sub_id` already gone (node
3199    /// destroyed / double-unsub) is a silent no-op. D225 S2b retires the
3200    /// core-level RAII `Subscription` in favour of a binding-layer RAII
3201    /// wrapper over this method (the binding holds its `Core` on its
3202    /// affinity worker, so its wrapper's `Drop` calls this synchronously).
3203    ///
3204    /// S2b promotes this (and [`SubscriptionId`]) to `pub` as the
3205    /// binding-facing surface that replaces the retired core-level RAII
3206    /// `Subscription`. Callers (binding-layer RAII wrapper, producer
3207    /// `producer_deactivate` per D229, tests) pair the `node_id` they
3208    /// subscribed with the returned `sub_id`.
3209    pub fn unsubscribe(&self, node_id: NodeId, sub_id: SubscriptionId) {
3210        unsubscribe_sink(self, node_id, sub_id);
3211    }
3212
3213    /// Fallible subscribe. Returns `Err` on:
3214    /// - Partition order violation (Phase H+ STRICT, D115) — caller defers.
3215    /// - Non-resubscribable terminal node (R2.2.7.b, D118) — caller skips.
3216    ///
3217    /// Used by `subscribe` (unwraps both errors as panics) and producer-
3218    /// pattern operator sinks (match on variant).
3219    #[allow(clippy::needless_pass_by_value)]
3220    pub fn try_subscribe(
3221        &self,
3222        node_id: NodeId,
3223        sink: Sink,
3224    ) -> Result<SubscriptionId, SubscribeError> {
3225        // Subscribe protocol (S4/D246/D248 single-owner; supersedes the
3226        // pre-S2c Slice E `wave_owner` ReentrantMutex sequence which was
3227        // deleted with the §7 machinery):
3228        //
3229        // 1. Acquire the state lock briefly: alloc sub_id, run
3230        //    resubscribable reset if applicable, snapshot handshake
3231        //    state, install sink in `subscribers` + bump
3232        //    `subscribers_revision` (Slice X4/D2 freeze). Drop the
3233        //    state lock.
3234        // 2. Fire the handshake LOCK-RELEASED. Per-tier slices
3235        //    (R1.3.5.a): `[Start]` / `[Data(cache)]?` / `[Complete]?` /
3236        //    `[Error(h)]?` / `[Teardown]?`. Empty tiers are skipped. A
3237        //    sink callback may re-enter Core via the owner-side
3238        //    mailbox/`DeferQueue` seam; the owner drain applies it as
3239        //    a nested wave.
3240        // 3. Run activation under `run_wave` if needed (first
3241        //    subscriber on a non-state node).
3242        //
3243        // Happens-after discipline (replaces the pre-S2c cross-thread
3244        // `wave_owner` BLOCK): `Core` is single-owner `!Send + !Sync`
3245        // (D248) — the one owner thread installs the sink under the
3246        // state lock BEFORE the handshake fires, and the
3247        // `subscribers_revision` bump (Slice X4/D2) freezes any
3248        // earlier-in-wave `PendingBatch` against double-delivery to
3249        // the new sink. There is no concurrent thread to block.
3250
3251        let (sub_id, tier_slices, needs_activation, did_reset) = {
3252            let mut s = self.lock_state();
3253
3254            // R2.2.7.b (D118, 2026-05-10): non-resubscribable + terminal →
3255            // reject. The stream is permanently over; subscribe gets a
3256            // clean error rather than a confusing replay of past events.
3257            // Operators (zip / concat / race / ...) match on the variant
3258            // and skip the source.
3259            //
3260            // The `has_received_teardown` flag is irrelevant here —
3261            // `terminal.is_some()` alone gates rejection. The auto-TEARDOWN
3262            // cascade in R2.6.4 / Lock 6.F means torn_down lags terminal
3263            // by at most one wave anyway; a brief mid-wave window where
3264            // `terminal.is_some() && !torn_down` is reachable but the
3265            // rejection decision doesn't depend on which side of that
3266            // window we're in.
3267            let should_reject = {
3268                let rec = s.require_node(node_id);
3269                !rec.resubscribable && rec.terminal.is_some()
3270            };
3271            if should_reject {
3272                drop(s);
3273                return Err(SubscribeError::TornDown { node: node_id });
3274            }
3275
3276            let sub_id = s.alloc_sub_id();
3277
3278            // R2.2.7.a (D118, 2026-05-10): resubscribable + terminal → reset
3279            // to fresh lifecycle, regardless of TEARDOWN state. The prior
3280            // `!has_received_teardown` guard (Slice A+B F3) conflated
3281            // "TEARDOWN is the cleanup signal of the previous activation
3282            // cycle" with "permanent destruction" — corrected per the
3283            // canonical-spec amendment. `reset_for_fresh_lifecycle` clears
3284            // `has_received_teardown` along with `terminal`,
3285            // `has_fired_once`, dep_records sentinels, pause lockset, and
3286            // replay buffer. `wipe_ctx` fires lock-released after the
3287            // state lock drops so the binding's `ctx.store` starts fresh.
3288            let needs_reset = {
3289                let rec = s.require_node(node_id);
3290                rec.resubscribable && rec.terminal.is_some()
3291            };
3292            if needs_reset {
3293                self.reset_for_fresh_lifecycle(&mut s, node_id);
3294            }
3295
3296            // Snapshot handshake state under lock.
3297            //
3298            // F5 (/qa 2026-05-10): post-D118 R2.2.7.a/b, the snapshot
3299            // ALWAYS sees a non-terminal node here — `should_reject`
3300            // already rejected non-resubscribable terminal above, and
3301            // `needs_reset` cleared resubscribable terminal back to
3302            // `terminal = None` / `has_received_teardown = false`.
3303            // The pre-D118 terminal-replay + teardown-replay branches
3304            // were dead code in this post-D118 sequence and are
3305            // removed.
3306            let (cache, is_state, first_subscriber) = {
3307                let rec = s.require_node(node_id);
3308                debug_assert!(
3309                    rec.terminal.is_none(),
3310                    "R2.2.7.a/b invariant: post-reject/reset, terminal must be None"
3311                );
3312                debug_assert!(
3313                    !rec.has_received_teardown,
3314                    "R2.2.7.a invariant: reset clears has_received_teardown"
3315                );
3316                (rec.cache, rec.is_state(), rec.subscribers.is_empty())
3317            };
3318
3319            // Build per-tier handshake slices. Each non-empty slice is
3320            // fired as a separate sink call (R1.3.5.a tier-split).
3321            let mut tier_slices: SmallVec<[Vec<Message>; 4]> = SmallVec::new();
3322            tier_slices.push(vec![Message::Start]);
3323            if cache != NO_HANDLE {
3324                tier_slices.push(vec![Message::Data(cache)]);
3325            }
3326            // Slice E1 (R2.6.5 / Lock 6.G): replay buffered DATA between
3327            // [Start] (and the cache slice, if present) and any terminal.
3328            // Each buffered handle becomes a separate per-tier slice so
3329            // late subscribers see the historical Data sequence as
3330            // distinct sink calls.
3331            //
3332            // Dedupe: when a cache slice is present and the buffer's last
3333            // entry is the same handle (the typical case — cache always
3334            // tracks the last DATA emitted, and the buffer's tail entry
3335            // is that same DATA), skip the last buffer entry to avoid
3336            // delivering Data(cache) twice. For state nodes whose cache
3337            // survives unsubscribe, the buffer may have older entries
3338            // the cache doesn't reflect; the dedupe only drops the
3339            // single trailing entry that equals cache. (QA A1, 2026-05-07)
3340            let replay_handles: Vec<HandleId> = {
3341                let rec = s.require_node(node_id);
3342                let cap = rec.replay_buffer_cap.unwrap_or(0);
3343                if cap == 0 {
3344                    Vec::new()
3345                } else {
3346                    let mut v: Vec<HandleId> = rec.replay_buffer.iter().copied().collect();
3347                    if cache != NO_HANDLE && v.last() == Some(&cache) {
3348                        v.pop();
3349                    }
3350                    v
3351                }
3352            };
3353            for h in &replay_handles {
3354                tier_slices.push(vec![Message::Data(*h)]);
3355            }
3356
3357            // Install sink BEFORE dropping state lock so any subsequent
3358            // owner-side wave (after our scope ends) sees the sink
3359            // already registered. (Pre-S2c the comment referenced
3360            // acquiring `wave_owner` for cross-thread serialization;
3361            // post-D248 single-owner there is no cross-thread emitter
3362            // and no `wave_owner` lock — the rule is just "install
3363            // before drop so future waves see the sink.")
3364            //
3365            // Slice X4 / D2: bump `subscribers_revision` alongside the
3366            // insert so a pending_notify entry opened earlier in the same
3367            // wave (e.g. inside `batch(|| { emit(s, h1); subscribe(s,
3368            // late); emit(s, h2); })`) starts a fresh `PendingBatch` on
3369            // its next `queue_notify` push — making the new sink visible
3370            // to subsequent emits' flush slices, while the pre-subscribe
3371            // batch's snapshot stays frozen so we don't double-deliver
3372            // earlier emits via the wave's flush AND the new sub's
3373            // handshake replay.
3374            {
3375                let rec = s.require_node_mut(node_id);
3376                rec.subscribers.insert(sub_id, sink.clone());
3377                rec.subscribers_revision = rec.subscribers_revision.wrapping_add(1);
3378            }
3379
3380            let needs_activation = first_subscriber && !is_state;
3381            (sub_id, tier_slices, needs_activation, needs_reset)
3382            // state lock drops here
3383        };
3384
3385        // Slice E2 (R2.4.6 / D055): on resubscribable terminal reset, fire
3386        // `wipe_ctx` LOCK-RELEASED so the binding drops its `NodeCtxState`
3387        // entry (clearing both `store` and any residual `current_cleanup`).
3388        // The new subscriber's first invoke_fn sees a fresh empty store.
3389        // Fires AFTER the state lock drops so the binding's
3390        // `node_ctx.lock()` can't deadlock against Core's state lock — and
3391        // BEFORE the handshake so the wipe is observable before any
3392        // user-visible interaction with the new lifecycle.
3393        if did_reset {
3394            self.binding.wipe_ctx(node_id);
3395        }
3396
3397        // Fire handshake LOCK-RELEASED. A sink may re-enter Core via
3398        // the owner-side mailbox/`DeferQueue` seam; the owner drain
3399        // applies it as a nested wave. Single-owner `!Send` Core: no
3400        // `wave_owner` mutex, no cross-thread emitter (S4/D248).
3401        //
3402        // A7 (Slice F, 2026-05-07): per-tier slice fire is wrapped in
3403        // `catch_unwind`. The sink is installed in `subscribers` BEFORE
3404        // the handshake fires (load-bearing — concurrent threads observe
3405        // the sink immediately). If a sink panics on tier N, the panic
3406        // would otherwise unwind out of `subscribe` BEFORE the
3407        // `Subscription` handle is constructed, leaving the sink
3408        // registered in `subscribers` with no user-held handle to drop.
3409        // Subsequent waves' `flush_notifications` would re-fire the
3410        // panicking sink forever.
3411        //
3412        // On panic: remove the sink from `subscribers` (via the
3413        // already-allocated `sub_id`), drop `_wave_guard` cleanly via
3414        // RAII, and resume the unwind so the user observes the panic at
3415        // the call site. Same effect as the user dropping the
3416        // `Subscription` immediately, but pre-emptive.
3417        for slice in &tier_slices {
3418            let sink_clone = sink.clone();
3419            let slice_ref: &[Message] = slice;
3420            let result = catch_unwind(AssertUnwindSafe(|| sink_clone(slice_ref)));
3421            if let Err(panic_payload) = result {
3422                // Remove the orphaned sink. Best-effort: if the node was
3423                // since torn down (e.g., the sink itself called teardown
3424                // before panicking), the entry may already be gone.
3425                {
3426                    let mut s = self.lock_state();
3427                    if let Some(rec) = s.nodes.get_mut(&node_id) {
3428                        rec.subscribers.remove(&sub_id);
3429                        // Slice X4 / D2: keep revision-tracked snapshot
3430                        // discipline consistent with the install site —
3431                        // any pending_notify entry that already absorbed
3432                        // the panicking sink under the post-install
3433                        // revision should start a fresh batch on its
3434                        // next queue_notify push.
3435                        rec.subscribers_revision = rec.subscribers_revision.wrapping_add(1);
3436                    }
3437                }
3438                std::panic::resume_unwind(panic_payload);
3439            }
3440        }
3441
3442        // D261 / S7: drain mailbox if the handshake-fire phase posted to
3443        // it. The per-tier handshake slices above fire sinks LOCK-RELEASED
3444        // outside any `BatchGuard`; a sink that posts via
3445        // `MailboxEmitter`/`SinkEmitter` (e.g. `graphrefly_structures::
3446        // reactive::SinkEmitter::emit`'s `mailbox.post_emit`) would
3447        // otherwise leave the post stranded until the next external wave
3448        // entry. Without this drain, push-on-subscribe replay for an
3449        // operator whose internal sink re-emits via the mailbox
3450        // (`reactive_log.view(slice/tail/fromCursor)`'s internal sinks)
3451        // never lands on the operator's output node by subscribe-time, so
3452        // the **next** `core.subscribe(operator_node, …)` sees `cache ==
3453        // NO_HANDLE` and emits an empty `[Data]` slice — same root
3454        // mechanism as D260 (mailbox post stranded past a sink-fire
3455        // boundary), different boundary (handshake-fire vs fire_deferred).
3456        // The `BatchGuard` here is a wave-scoped RAII handle; **its
3457        // `Drop`** (not its construction) runs `drain_and_flush` (applies
3458        // the queued mailbox/DeferQueue ops) + D260's post-`fire_deferred`
3459        // re-drain loop until both queues quiesce. The brief lifetime
3460        // — opened then dropped at the end of the `if` block, before
3461        // activation — is the entire purpose of the binding.
3462        //
3463        // Cheap on the no-post path: one `runnable` atomic load per
3464        // subscribe; on a positive load, one `BatchGuard` open+drop
3465        // round-trip that drains whatever post the handshake produced.
3466        //
3467        // **Nested-subscribe note.** If `try_subscribe` is called from
3468        // inside an already-owning wave (e.g. a Defer closure calling
3469        // `cf.subscribe`), `begin_batch_for` → `claim_in_tick` returns
3470        // `false` (already owned) → the `BatchGuard` is non-owning →
3471        // its `Drop` is a no-op (early-return at the `!owns_tick` guard
3472        // in `BatchGuard::Drop`). The outer wave's drain loop catches
3473        // the post at its next `is_runnable()` top-of-loop check. Cheap
3474        // and correct in both cases.
3475        if self.mailbox.is_runnable() || self.deferred.is_runnable() {
3476            // Use the same single-owner batch entry as `try_emit` — seed
3477            // is the node we just subscribed to (the partition-touch hint
3478            // is vestigial post-S2c per `try_begin_batch_for` doc, but
3479            // the seed is retained for D211 minimal-churn).
3480            let _drain = self.begin_batch_for(node_id);
3481        }
3482
3483        // Run activation if needed. `run_wave_for(node_id)` acquires
3484        // only the partitions transitively touched from `node_id`
3485        // (downstream cascade + meta-companion teardown reach) — same-
3486        // partition activation re-enters reentrantly. Slice Y1 / Phase E.
3487        if needs_activation {
3488            self.run_wave_for(node_id, |this| {
3489                let mut s = this.lock_state();
3490                this.activate_derived(&mut s, node_id);
3491            });
3492        }
3493
3494        // D246/S2c: no group wave-locks to drop (single-owner).
3495        // D274 (2026-05-21): the orphan "drain deferred producer ops"
3496        // call site was deleted along with the producer-defer queue
3497        // itself (was a no-op shim per D211); no remaining work here.
3498
3499        Ok(sub_id)
3500    }
3501
3502    /// Mark `node_id` as resubscribable per R2.2.7. Resubscribable nodes
3503    /// reset their terminal-lifecycle state on a fresh subscribe — see
3504    /// [`Self::subscribe`].
3505    ///
3506    /// Configuration call — must be made before the node has any active
3507    /// subscribers, since changing the policy mid-flight would surprise
3508    /// existing observers.
3509    ///
3510    /// # Panics
3511    ///
3512    /// Panics if the node has subscribers (the policy is observable
3513    /// behavior; changing it after the fact would change semantics for
3514    /// existing sinks).
3515    pub fn set_resubscribable(&self, node_id: NodeId, resubscribable: bool) {
3516        // 2b-ii-B (D220-EXEC): route to the node's shard (see `try_emit`).
3517        let mut s = self.lock_state();
3518        let rec = s.require_node_mut(node_id);
3519        assert!(
3520            rec.subscribers.is_empty(),
3521            "set_resubscribable: node already has subscribers; \
3522             configure resubscribable before any subscribe call"
3523        );
3524        rec.resubscribable = resubscribable;
3525    }
3526
3527    /// Reset a resubscribable node's terminal-lifecycle state. Called from
3528    /// `subscribe` when a late subscriber arrives at a flagged node.
3529    ///
3530    /// Released: terminal-slot retain (Error handle), all per-dep terminal
3531    /// retains (Error handles), all data_batch retains.
3532    /// Cleared: `terminal`, `has_fired_once`, `has_received_teardown`, all
3533    /// dep_records to sentinel, the pause lockset (any held locks are
3534    /// released — replay buffer drops silently because there are no
3535    /// subscribers to flush to).
3536    fn reset_for_fresh_lifecycle(&self, s: &mut St<'_>, node_id: NodeId) {
3537        // Phase 1: collect wave-state handle releases + take the old
3538        // op_scratch + reset other state. Take all mutations under one
3539        // borrow so the post-borrow phases don't re-walk dep_records.
3540        let (prev_op, mut old_scratch, handles_to_release, pause_buffer_payloads) = {
3541            let rec = s.require_node_mut(node_id);
3542            let mut hs = Vec::new();
3543            if let Some(TerminalKind::Error(h)) = rec.terminal {
3544                hs.push(h);
3545            }
3546            for dr in &rec.dep_records {
3547                if let Some(TerminalKind::Error(h)) = dr.terminal {
3548                    hs.push(h);
3549                }
3550                for &h in &dr.data_batch {
3551                    hs.push(h);
3552                }
3553                // Slice C-3 /qa: also release `prev_data`. Prior to this
3554                // collection, `reset_for_fresh_lifecycle` overwrote
3555                // `dr.prev_data = NO_HANDLE` without releasing the old
3556                // handle, leaking one share per dep per resubscribable
3557                // cycle. The leak was masked because no test exercised
3558                // the per-dep `prev_data` retain across a lifecycle
3559                // reset; surfaced by the T1 tightening of
3560                // `last_releases_buffered_latest_on_lifecycle_reset`.
3561                if dr.prev_data != NO_HANDLE {
3562                    hs.push(dr.prev_data);
3563                }
3564            }
3565            // Take pause_state's buffer; collect its payload handles for
3566            // release (they were retained at queue_notify time; buffer
3567            // drops because the new subscriber starts fresh).
3568            let mut pulled = Vec::new();
3569            if let PauseState::Paused { ref mut buffer, .. } = rec.pause_state {
3570                for msg in buffer.drain(..) {
3571                    if let Some(h) = msg.payload_handle() {
3572                        pulled.push(h);
3573                    }
3574                }
3575            }
3576            // Slice E1: drain the replay buffer too — the new subscriber
3577            // gets a fresh lifecycle and shouldn't see prior emissions.
3578            for h in rec.replay_buffer.drain(..) {
3579                pulled.push(h);
3580            }
3581            // Reset wave / lifecycle state.
3582            rec.terminal = None;
3583            rec.has_fired_once = rec.cache != NO_HANDLE && rec.is_state();
3584            rec.has_received_teardown = false;
3585            for dr in &mut rec.dep_records {
3586                dr.prev_data = NO_HANDLE;
3587                dr.data_batch.clear();
3588                dr.terminal = None;
3589                dr.dirty = false;
3590                dr.involved_this_wave = false;
3591            }
3592            rec.pause_state = PauseState::Active;
3593            rec.involved_this_wave = false;
3594            rec.dirty = false;
3595            // §10.13 perf (D047): reset received_mask — fresh lifecycle
3596            // means all deps are sentinel again.
3597            rec.received_mask = 0;
3598            // §10.3 perf (Slice V1): reset involved_mask.
3599            rec.involved_mask = 0;
3600            // P7 (Slice A close /qa): Dynamic nodes clear `tracked` so
3601            // the post-reset first fire repopulates from the fn's
3602            // returned tracked-deps set.
3603            if rec.is_dynamic {
3604                rec.tracked.clear();
3605            }
3606            // Take the old scratch out so we can release its handles and
3607            // install a fresh one. Operator op is copied for the
3608            // rebuild step below.
3609            let prev_op = rec.op;
3610            let old = std::mem::take(&mut rec.op_scratch);
3611            (prev_op, old, hs, pulled)
3612        };
3613
3614        // Phase 2 (Slice C-3 /qa P1 — RETAIN-BEFORE-RELEASE ordering):
3615        // build the fresh scratch FIRST, taking new retains on any
3616        // seed/default handles. This must run BEFORE Phase 3 releases
3617        // the old scratch's shares — if old `acc` (Scan/Reduce) or old
3618        // `latest` (Last) aliases the new `seed`/`default` (common:
3619        // `fold(seed, x) == seed` interns to the same registry entry),
3620        // releasing the old share first could collapse the binding's
3621        // registry slot to zero (production bindings remove the value
3622        // entry on refcount-zero — see `tests/common/mod.rs:191-204`),
3623        // and a subsequent `retain_handle` on the new seed would bump a
3624        // refcount on a slot whose value has been removed. By taking
3625        // the new retains first, we floor the refcount at ≥1 before
3626        // any release happens.
3627        let new_scratch = match prev_op {
3628            // Slice H: the OperatorOp stored on NodeRecord previously
3629            // passed `make_op_scratch` validation at registration time
3630            // (RegisterError::OperatorSeedSentinel for Scan/Reduce
3631            // sentinel seeds; Last { default: NO_HANDLE } is accepted
3632            // and never errors). Re-running it here on the same op
3633            // value is structurally guaranteed to succeed.
3634            Some(op) => self
3635                .make_op_scratch(op)
3636                .expect("invariant: stored OperatorOp passed make_op_scratch validation at registration time"),
3637            None => None,
3638        };
3639
3640        // Phase 3: NOW release handles owned by the old op_scratch
3641        // (Scan/Reduce acc, Distinct/Pairwise prev, Last latest +
3642        // default). Safe per Phase 2's retain-first floor. The boxed
3643        // value is consumed and dropped after.
3644        if let Some(scratch) = old_scratch.as_mut() {
3645            scratch.release_handles(&*self.binding);
3646        }
3647        drop(old_scratch);
3648
3649        // Phase 3b (D-α, D028 full close, 2026-05-10): drain the
3650        // per-Core `pending_scratch_release` queue populated by Phase G
3651        // on prior resubscribable + non-terminal deactivate cycles.
3652        // Queued boxes' shares may alias the seed/default we just
3653        // retained in Phase 2 (e.g., when scan's `acc` interns to the
3654        // same registry slot as `seed`). Releasing AFTER Phase 2's
3655        // floor keeps the registry slot at refcount ≥1 throughout —
3656        // mirrors the Phase 2/3 retain-before-release ordering. Safe
3657        // even when the queue is empty (no prior deactivations
3658        // happened).
3659        let queued: Vec<Box<dyn crate::op_state::OperatorScratch>> =
3660            std::mem::take(&mut s.shared.pending_scratch_release);
3661        for mut scratch in queued {
3662            scratch.release_handles(&*self.binding);
3663        }
3664
3665        // Phase 4: install the fresh scratch.
3666        {
3667            let rec = s.require_node_mut(node_id);
3668            rec.op_scratch = new_scratch;
3669        }
3670
3671        // Phase 5: release wave-state handles collected in phase 1.
3672        for h in handles_to_release {
3673            self.binding.release_handle(h);
3674        }
3675        for h in pause_buffer_payloads {
3676            self.binding.release_handle(h);
3677        }
3678    }
3679
3680    /// Activate `root` and any transitive uncached compute deps so their
3681    /// caches fill our dep_handles slots.
3682    ///
3683    /// Slice A close (M1): pure dep-walk + dep_handles population +
3684    /// pending_fires queueing. No `in_tick` management or `drain_and_flush`
3685    /// call — the outer caller (typically `Core::subscribe` via
3686    /// [`Core::run_wave`]) owns the wave lifecycle and drains lock-released
3687    /// around `invoke_fn`.
3688    ///
3689    /// Walk shape:
3690    ///   1. **Discover phase (DFS via Vec stack):** starting at `root`,
3691    ///      walk transitively-needing-activation deps via the `deps`
3692    ///      chain. Build an ordering where each node appears AFTER all
3693    ///      of its uncached compute deps — i.e., reverse topological
3694    ///      among the visited subgraph.
3695    ///   2. **Deliver phase (forward iteration):** for each visited
3696    ///      node in dep-first order, push deps' caches into the node's
3697    ///      `dep_handles` slots. Caches that were sentinel pre-walk are
3698    ///      filled because their parent's fn fires later in the wave's
3699    ///      drain loop and `commit_emission` propagates new caches forward
3700    ///      via `deliver_data_to_consumer` — the same path this method
3701    ///      uses for the initial seed. Adds the node to `pending_fires`
3702    ///      if its tracked-deps gate is satisfied; the wave-engine drain
3703    ///      fires the fn lock-released around `invoke_fn`.
3704    pub(crate) fn activate_derived(&self, s: &mut CoreState, root: NodeId) {
3705        // Phase 1: discover. DFS to collect every compute node reachable
3706        // via deps that doesn't yet have a cache and hasn't fired.
3707        // Record them in dep-first (post-order) so phase 2 can deliver
3708        // caches forward. Frame is `(node_id, finalize)` — `finalize=false`
3709        // means "first visit: push deps then re-push self with finalize=true";
3710        // `finalize=true` means "deps have been expanded, append self to
3711        // `order`."
3712        let mut visited: HashSet<NodeId> = HashSet::new();
3713        let mut order: Vec<NodeId> = Vec::new();
3714        let mut stack: Vec<(NodeId, bool)> = vec![(root, false)];
3715        while let Some((id, finalize)) = stack.pop() {
3716            if finalize {
3717                order.push(id);
3718                continue;
3719            }
3720            if !visited.insert(id) {
3721                continue;
3722            }
3723            stack.push((id, true));
3724            let dep_ids: Vec<NodeId> = s.require_node(id).dep_ids_vec();
3725            for dep_id in dep_ids {
3726                let (dep_is_state, dep_cache, dep_has_fired) = {
3727                    let dep_rec = s.require_node(dep_id);
3728                    (dep_rec.is_state(), dep_rec.cache, dep_rec.has_fired_once)
3729                };
3730                if !dep_is_state
3731                    && dep_cache == NO_HANDLE
3732                    && !dep_has_fired
3733                    && !visited.contains(&dep_id)
3734                {
3735                    stack.push((dep_id, false));
3736                }
3737            }
3738        }
3739
3740        // Phase 2: deliver caches in dep-first order. For each node, walk
3741        // its deps and call `deliver_data_to_consumer` for any with caches.
3742        // Producer nodes (no deps + has fn — Slice D, D031) have no deps
3743        // to walk; queue them directly into `pending_fires` so the wave
3744        // engine fires their fn once on activation.
3745        //
3746        // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_fires lives on
3747        // per-thread WaveState. State lock + thread_local borrow are
3748        // independent; deliver_data_to_consumer also writes pending_fires
3749        // via WaveState (no nested with_wave_state borrows here).
3750        for &id in &order {
3751            let (dep_ids, is_producer) = {
3752                let rec = s.require_node(id);
3753                (rec.dep_ids_vec(), rec.is_producer())
3754            };
3755            if is_producer {
3756                crate::batch::with_wave_state(|ws| {
3757                    ws.pending_fires.insert(id);
3758                });
3759                continue;
3760            }
3761            for (i, dep_id) in dep_ids.iter().copied().enumerate() {
3762                let dep_cache = s.require_node(dep_id).cache;
3763                if dep_cache != NO_HANDLE {
3764                    self.deliver_data_to_consumer(s, id, i, dep_cache);
3765                }
3766            }
3767        }
3768    }
3769
3770    // -------------------------------------------------------------------
3771    // Emission entry point
3772    // -------------------------------------------------------------------
3773
3774    /// Set a state node's value. Triggers a wave (DIRTY → DATA/RESOLVED →
3775    /// fn fires for downstream).
3776    ///
3777    /// Silent no-op if the node has already terminated (R1.3.4). The handle
3778    /// passed in is still released by the caller's binding-side intern path
3779    /// — no implicit retain is consumed when the call short-circuits.
3780    ///
3781    /// # Panics
3782    ///
3783    /// Panics if `node_id` is not a state node, or if `new_handle` is
3784    /// [`NO_HANDLE`] (per R1.2.4, sentinel is not a valid DATA payload).
3785    pub fn emit(&self, node_id: NodeId, new_handle: HandleId) {
3786        // D274 (2026-05-21): inlined `try_emit`. The
3787        // `Result<(), PartitionOrderViolation>` wrapper is gone — groups
3788        // are static identity only post-D248/D253, single-owner per Core
3789        // per D246, one Core per OS thread per D252; partition-order
3790        // violations cannot fire.
3791        assert!(
3792            new_handle != NO_HANDLE,
3793            "NO_HANDLE is not a valid DATA payload (R1.2.4)"
3794        );
3795        // Validate + terminal short-circuit under a brief lock.
3796        //
3797        // emit() is valid for State and Producer nodes — both are
3798        // intrinsic sources whose values are not derived from declared
3799        // deps. State nodes get emit() from user code; Producer nodes
3800        // get emit() from sink callbacks the producer's build closure
3801        // registered (sink fires → re-enter Core → emit on self).
3802        // Derived / Dynamic / Operator nodes emit via their fn return
3803        // value through fire_fn / fire_operator, NOT via emit().
3804        //
3805        // §7-A (deferred): this standalone validation lock is redundant
3806        // with commit_emission Phase 1 and collapsing it is part of §7
3807        // item (1); §5b measured ~0% single-thread benefit so the
3808        // collapse is deferred (porting-deferred.md §7-A, flagged for
3809        // ratification). The normal-path validation is retained here.
3810        {
3811            let s = self.lock_state();
3812            let rec = s.require_node(node_id);
3813            assert!(
3814                rec.is_state() || rec.is_producer(),
3815                "emit() is for state or producer nodes only; \
3816                 derived/dynamic/operator emit via their fn return value"
3817            );
3818            if rec.terminal.is_some() {
3819                drop(s);
3820                // Caller's intern share would otherwise leak; cache slot
3821                // ownership doesn't transfer because we're not advancing
3822                // cache. Released lock-released so the binding can't
3823                // deadlock against an internal binding mutex.
3824                self.binding.release_handle(new_handle);
3825                return;
3826            }
3827        }
3828        // Run the wave for `node_id`. Slice Y1 / Phase E: emit cascades
3829        // only via `s.children`. S4/D248: single-owner `!Send` Core —
3830        // one uninterrupted owner-side drain (no `wave_owner` mutex,
3831        // no cross-thread parallel waves within a Core); cross-`Core`
3832        // parallelism is host-native via independent per-worker Cores.
3833        self.try_run_wave_for(node_id, |this| {
3834            this.commit_emission(node_id, new_handle);
3835        });
3836    }
3837
3838    /// Read a node's current cache. Returns [`NO_HANDLE`] if sentinel.
3839    #[must_use]
3840    pub fn cache_of(&self, node_id: NodeId) -> HandleId {
3841        self.lock_state().require_node(node_id).cache
3842    }
3843
3844    /// Whether the node's fn has fired at least once (compute) OR it has had
3845    /// a non-sentinel value (state).
3846    #[must_use]
3847    pub fn has_fired_once(&self, node_id: NodeId) -> bool {
3848        self.lock_state().require_node(node_id).has_fired_once
3849    }
3850
3851    // -------------------------------------------------------------------
3852    // Read-side inspection helpers (Slice E+, M2)
3853    //
3854    // Non-panicking accessors for graph-layer introspection (`describe()`,
3855    // `observe()`, `node_count()`). All five return Option/empty for
3856    // unknown ids — they're meant to back walks over `node_ids()` where
3857    // the caller already knows the ids are valid, plus debugging /
3858    // dry-run probes that prefer "absence" over "panic".
3859    //
3860    // Keep these strictly read-only: no wave entry, no binding callbacks,
3861    // no lock release. Each takes the state lock once, copies a small
3862    // value, and drops the lock.
3863    // -------------------------------------------------------------------
3864
3865    /// Snapshot of every registered `NodeId` in unspecified order. The
3866    /// order matches `HashMap` iteration over the internal node table —
3867    /// callers that need stable ordering should track names at the
3868    /// `Graph` layer (canonical spec §3.5 namespace).
3869    #[must_use]
3870    pub fn node_ids(&self) -> Vec<NodeId> {
3871        self.lock_state().nodes.keys().copied().collect()
3872    }
3873
3874    /// Total number of nodes registered in this Core.
3875    #[must_use]
3876    pub fn node_count(&self) -> usize {
3877        self.lock_state().nodes.len()
3878    }
3879
3880    /// Returns `Some(kind)` for known nodes, `None` for unknown. Kind is
3881    /// **derived** from the field shape per D030 — see [`NodeKind`].
3882    #[must_use]
3883    pub fn kind_of(&self, node_id: NodeId) -> Option<NodeKind> {
3884        self.lock_state().nodes.get(&node_id).map(NodeRecord::kind)
3885    }
3886
3887    /// Snapshot of the node's deps in declaration order. Empty for
3888    /// unknown nodes or for state nodes (which have no deps).
3889    #[must_use]
3890    pub fn deps_of(&self, node_id: NodeId) -> Vec<NodeId> {
3891        self.lock_state()
3892            .nodes
3893            .get(&node_id)
3894            .map(NodeRecord::dep_ids_vec)
3895            .unwrap_or_default()
3896    }
3897
3898    /// Returns `Some(kind)` if the node has terminated (R1.3.4) — the
3899    /// pair `Some(Complete)` / `Some(Error(h))` mirrors the wire message
3900    /// the node emitted. `None` for live nodes or unknown ids.
3901    #[must_use]
3902    pub fn is_terminal(&self, node_id: NodeId) -> Option<TerminalKind> {
3903        self.lock_state()
3904            .nodes
3905            .get(&node_id)
3906            .and_then(|r| r.terminal)
3907    }
3908
3909    /// Returns the per-dep terminal slot for `child_id`'s dep at the
3910    /// given dep `NodeId` — `Some(kind)` if cascaded-terminal, `None`
3911    /// if live or if the dep isn't in `child_id`'s dep list. Mirrors
3912    /// [`Self::is_terminal`] for the cascade-side state (`rec
3913    /// .dep_records[idx].terminal`).
3914    ///
3915    /// **Visibility:** `#[doc(hidden)] pub` — D291 /qa A7 (2026-05-25).
3916    /// Marked `pub` ONLY so the `d291_terminal_rollback.rs` integration
3917    /// test (a separate compilation unit per Rust's `tests/` model)
3918    /// can pin the per-dep slot directly — observing the downstream
3919    /// re-fire effect alone wouldn't close the F2/BH11 gap. `#[doc(hidden)]`
3920    /// flags it as NOT part of the public API surface; rustdoc skips
3921    /// it, and no parity-tests `Impl` contract method shadows it (per
3922    /// D196's consumer-pressure gate — there's no `nodeDepTerminalOf`
3923    /// equivalent on `ImplNode`). The original intent was `pub(crate)`
3924    /// (mirror [`Self::in_tick`]), but `pub(crate)` is inaccessible to
3925    /// integration tests; if a future Impl widening adds the read API,
3926    /// convert to plain `pub` (no breaking change, just drop the
3927    /// `#[doc(hidden)]`).
3928    #[doc(hidden)]
3929    #[must_use]
3930    pub fn dep_terminal_of(&self, child_id: NodeId, dep_node_id: NodeId) -> Option<TerminalKind> {
3931        let state = self.lock_state();
3932        let rec = state.nodes.get(&child_id)?;
3933        let idx = rec.dep_index_of(dep_node_id)?;
3934        rec.dep_records[idx].terminal
3935    }
3936
3937    /// Whether the node has wave-scoped DIRTY pending (a tier-1 message
3938    /// queued but the matching tier-3 settle has not yet flushed).
3939    /// `false` for unknown ids. Mostly useful for `describe()` status
3940    /// classification (R3.6.1 `"dirty"`).
3941    #[must_use]
3942    pub fn is_dirty(&self, node_id: NodeId) -> bool {
3943        self.lock_state()
3944            .nodes
3945            .get(&node_id)
3946            .is_some_and(|r| r.dirty)
3947    }
3948
3949    /// External sink count for `node_id` — number of live subscribers
3950    /// added via [`Self::subscribe`] / [`Self::try_subscribe`] that
3951    /// haven't been removed via [`Self::unsubscribe`] yet (NOT counting
3952    /// downstream node-fn fan-out tracked via `consumers`). Returns 0
3953    /// for unknown / torn-down nodes (consistent with the other `*_of`
3954    /// accessors' "unknown ⇒ default" stance).
3955    ///
3956    /// Used by `graphrefly_graph::resource_profile` (R3.6.3 / D285) to
3957    /// compute per-node `subscriberCount` + `hotspots.bySubscriberCount`
3958    /// + orphan classification (zero-subscriber non-state nodes ⇒
3959    ///   `idle-derived` / `idle-producer` / `orphan-effect`).
3960    #[must_use]
3961    pub fn sink_count_of(&self, node_id: NodeId) -> usize {
3962        self.lock_state()
3963            .nodes
3964            .get(&node_id)
3965            .map_or(0, |r| r.subscribers.len())
3966    }
3967
3968    /// Topological depth of `node_id` (1 + max dep `topo_rank`; 0 for
3969    /// leaf state nodes / no deps; `None` for unknown ids).
3970    ///
3971    /// Used by [`Self::pick_next_fire`] for O(|pending_fires|)
3972    /// glitch-free scheduling. Exposed publicly so consumers (and
3973    /// integration tests) can verify the cascade behavior after
3974    /// [`Self::set_deps`] (D300 / Q3, 2026-05-26 — consumer cascade).
3975    #[must_use]
3976    pub fn topo_rank_of(&self, node_id: NodeId) -> Option<u32> {
3977        self.lock_state().nodes.get(&node_id).map(|r| r.topo_rank)
3978    }
3979
3980    /// Snapshot of `parent`'s meta companion list (R1.3.9.d / R2.3.3 —
3981    /// the companions added via [`Self::add_meta_companion`]). Empty
3982    /// for unknown ids or for nodes with no companions registered.
3983    ///
3984    /// Used by the graph layer's `signal_invalidate` to filter meta
3985    /// children out of the broadcast (canonical R3.7.2 — meta caches
3986    /// are preserved across graph-wide INVALIDATE).
3987    #[must_use]
3988    pub fn meta_companions_of(&self, parent: NodeId) -> Vec<NodeId> {
3989        self.lock_state()
3990            .nodes
3991            .get(&parent)
3992            .map(|r| r.meta_companions.clone())
3993            .unwrap_or_default()
3994    }
3995
3996    // -------------------------------------------------------------------
3997    // Wave engine — lives in `crate::batch` (Slice C-1 module split;
3998    // Slice A close M1 refactor lifted the binding-callback re-entrance
3999    // restrictions). The methods are still on `Core`; see `batch.rs` for:
4000    //
4001    //   - `run_wave` — wave entry, manages own locking.
4002    //   - `drain_and_flush` — drain phase, lock-released around invoke_fn.
4003    //   - `commit_emission` — lock-released around custom_equals.
4004    //   - `pick_next_fire`, `deliver_data_to_consumer`, `queue_notify`,
4005    //     `flush_notifications` — wave-engine helpers.
4006    // -------------------------------------------------------------------
4007}
4008
4009// -----------------------------------------------------------------------
4010// COMPLETE / ERROR — terminal lifecycle + auto-cascade gating
4011// -----------------------------------------------------------------------
4012
4013impl Core {
4014    /// Emit `[COMPLETE]` (R1.3.4) on `node_id`, marking it terminal. After
4015    /// this call:
4016    ///
4017    /// - Subsequent `Core::emit` on this node is a silent no-op (idempotent
4018    ///   termination).
4019    /// - The node's fn no longer fires.
4020    /// - The node's cache is preserved (last value still observable via
4021    ///   `cache_of`).
4022    /// - Children receive `[COMPLETE]` (tier 5 — bypasses pause buffer).
4023    /// - Auto-cascade gating (Lock 2.B): each child that has all of its
4024    ///   deps in a terminal state auto-emits its own `[COMPLETE]`. ERROR
4025    ///   dominates COMPLETE — if any of a child's deps emitted ERROR, the
4026    ///   child auto-cascades that ERROR instead.
4027    ///
4028    /// Idempotent: calling `complete` on an already-terminal node is a no-op.
4029    ///
4030    /// # Panics
4031    ///
4032    /// Panics if `node_id` is unknown.
4033    pub fn complete(&self, node_id: NodeId) {
4034        // D274 (2026-05-21): inlined `try_complete`. The
4035        // `Result<(), PartitionOrderViolation>` wrapper was deleted.
4036        self.try_emit_terminal(node_id, TerminalKind::Complete);
4037    }
4038
4039    /// Emit `[ERROR, error_handle]` (R1.3.4) on `node_id`. `error_handle`
4040    /// must resolve to a non-sentinel value (R1.2.5) — the binding side has
4041    /// already interned the error value before this call. Same lifecycle
4042    /// effects as [`Self::complete`]; ERROR dominates COMPLETE in auto-
4043    /// cascade gating.
4044    ///
4045    /// # Panics
4046    ///
4047    /// Panics if `node_id` is unknown or `error_handle == NO_HANDLE`.
4048    pub fn error(&self, node_id: NodeId, error_handle: HandleId) {
4049        // D274 (2026-05-21): inlined `try_error`. The
4050        // `Result<(), PartitionOrderViolation>` wrapper was deleted.
4051        assert!(
4052            error_handle != NO_HANDLE,
4053            "NO_HANDLE is not a valid ERROR payload (R1.2.5)"
4054        );
4055        self.try_emit_terminal(node_id, TerminalKind::Error(error_handle));
4056        // The caller's intern share for `error_handle` is NOT transferred
4057        // to any slot — `terminate_node` takes its OWN retain for every
4058        // populated `terminal` and `dep_terminals` slot. Release the
4059        // caller's share here (mirrors `Core::emit`'s short-circuit
4060        // release on terminal). Without this, every `error()` call leaks
4061        // one binding-side handle ref. Slice A-bigger /qa item D fix.
4062        self.binding.release_handle(error_handle);
4063    }
4064
4065    fn try_emit_terminal(&self, node_id: NodeId, terminal: TerminalKind) {
4066        {
4067            let s = self.lock_state();
4068            assert!(s.nodes.contains_key(&node_id), "unknown node {node_id:?}");
4069        }
4070        // Wave on `node_id`'s touched partitions (Slice Y1 / Phase E).
4071        // COMPLETE / ERROR cascade follows `s.children` (in-partition by
4072        // union-find construction). The thunk acquires its own state lock
4073        // to queue the cascade.
4074        self.try_run_wave_for(node_id, |this| {
4075            let mut s = this.lock_state();
4076            this.terminate_node(&mut s, node_id, terminal);
4077        });
4078    }
4079
4080    /// Set the node's terminal slot, queue the wire message, and cascade to
4081    /// children. Idempotent on already-terminal node (no-op).
4082    ///
4083    /// Iterative implementation (Slice A-bigger, M1-close): a work-queue
4084    /// drives the cascade so deep linear chains don't overflow the OS
4085    /// thread stack. Mirrors `path_from_to`'s explicit-stack pattern.
4086    fn terminate_node(&self, s: &mut St<'_>, node_id: NodeId, terminal: TerminalKind) {
4087        let mut work: Vec<(NodeId, TerminalKind)> = vec![(node_id, terminal)];
4088        while let Some((id, t)) = work.pop() {
4089            if s.require_node(id).terminal.is_some() {
4090                continue; // Idempotent — already terminal.
4091            }
4092            // D291: snapshot the terminal slot's `None` state for
4093            // wave-rollback (R4.3.2 status-snapshot completeness). The
4094            // idempotent guard above means we only reach here on a
4095            // `None → Some(_)` transition; the snapshot set just records
4096            // "this node's terminal slot mutated this wave," and
4097            // `restore_wave_terminal_snapshots` (called from
4098            // `BatchGuard::drop`'s panic-discard path) sets the slot back
4099            // to `None`, releasing the ERROR-handle retain we take below
4100            // (if any) lock-released.
4101            //
4102            // Gated on `in_tick`: outside a wave (e.g. shutdown teardown)
4103            // no rollback is possible and the snapshot machinery is
4104            // unused. Mirrors the `commit_emission` gate.
4105            if self.in_tick() {
4106                crate::batch::with_wave_state(|ws| {
4107                    ws.wave_terminal_snapshots.insert(id);
4108                });
4109            }
4110            // Take a refcount share for the terminal slot so the error
4111            // handle outlives the binding-side intern's transient share.
4112            if let TerminalKind::Error(h) = t {
4113                self.binding.retain_handle(h);
4114            }
4115            // Slice E2 /qa Q2(b) (D069): if a resubscribable node is
4116            // terminating with no live subscribers, queue eager
4117            // `wipe_ctx` for the wave's lock-released drain. This is the
4118            // mutually-exclusive complement of the `Subscription::Drop`
4119            // wipe site: when the LAST sub drops first then terminate
4120            // fires, subs are empty here and we queue; when terminate
4121            // fires WITH subs still alive, we DON'T queue (subs not
4122            // empty), and `Subscription::Drop` will fire wipe directly
4123            // when those subs eventually drop. Either way, exactly one
4124            // wipe fires per terminal lifecycle.
4125            let queue_wipe = {
4126                let rec = s.require_node(id);
4127                rec.resubscribable && rec.subscribers.is_empty()
4128            };
4129            s.require_node_mut(id).terminal = Some(t);
4130            // Q-beyond Sub-slice 2 + 3 (D108, 2026-05-09): pending_fires
4131            // and pending_wipes both live on per-thread WaveState. Single
4132            // borrow handles the queue-wipe push and the pending_fires
4133            // remove.
4134            crate::batch::with_wave_state(|ws| {
4135                if queue_wipe {
4136                    ws.pending_wipes.push(id);
4137                }
4138                // Drain pending fires for this node — fn won't fire on a
4139                // terminal node.
4140                ws.pending_fires.remove(&id);
4141            });
4142            // R1.3.8.b / Slice F (A3, 2026-05-07): if this node was paused
4143            // when terminating (the canonical case is the R1.3.8.c overflow
4144            // ERROR synthesis path), drain the pause buffer and release
4145            // each payload's queue_notify-time retain. Without this, the
4146            // buffer leaks one share per buffered DATA/RESOLVED/INVALIDATE.
4147            // Subscribers receive the terminal directly via the cascade
4148            // below (tier-5 bypasses the pause buffer); the buffered
4149            // content is moot post-terminal.
4150            let drained: Vec<HandleId> = {
4151                let rec = s.require_node_mut(id);
4152                let mut drained: Vec<HandleId> = Vec::new();
4153                if rec.pause_state.is_paused() {
4154                    // Take the buffered messages out, then collapse the
4155                    // pause state to Active so subsequent code observes a
4156                    // clean lifecycle. Idempotent on Active (no-op).
4157                    let prev = std::mem::replace(&mut rec.pause_state, PauseState::Active);
4158                    if let PauseState::Paused { buffer, .. } = prev {
4159                        drained.extend(buffer.into_iter().filter_map(Message::payload_handle));
4160                    }
4161                }
4162                // QA A4 (2026-05-07): drain replay buffer on terminate. A
4163                // non-resubscribable terminal ends the lifecycle; without
4164                // this drain the buffer's retains leak until `Drop for
4165                // CoreState`. Resubscribable nodes' replay buffers are
4166                // also drained (when they're hit by a terminal cascade);
4167                // a fresh subscribe rebuilds the buffer from scratch as
4168                // part of `reset_for_fresh_lifecycle`.
4169                drained.extend(rec.replay_buffer.drain(..));
4170                drained
4171            };
4172            for h in drained {
4173                self.binding.release_handle(h);
4174            }
4175            // Queue the wire message (tier 5 — bypasses pause buffer).
4176            let msg = match t {
4177                TerminalKind::Complete => Message::Complete,
4178                TerminalKind::Error(h) => Message::Error(h),
4179            };
4180            self.queue_notify(s, id, msg);
4181            // Cascade to children.
4182            let child_ids: Vec<NodeId> = s
4183                .children
4184                .get(&id)
4185                .map(|c| c.iter().copied().collect())
4186                .unwrap_or_default();
4187            for child_id in child_ids {
4188                let dep_idx = s.require_node(child_id).dep_index_of(id);
4189                let Some(idx) = dep_idx else { continue };
4190                // D291 /qa D1 (2026-05-25): order cascade-slot mutations
4191                // `snapshot → retain → assign` to match the entry-block
4192                // discipline at the top of this loop (lines ~4067-4090).
4193                // Pre-fix the order was `assign → snapshot → retain`,
4194                // which had two latent panic-edge hazards: (a) if the
4195                // `with_wave_state` AHashSet insert panicked AFTER the
4196                // slot was already `Some(t)`, restore would silently
4197                // fail to revert (terminal-stuck); (b) if `retain_handle`
4198                // panicked AFTER both, the snapshot would record "release
4199                // h on restore" for a handle that was never retained,
4200                // double-releasing the binding-side share. The atomic
4201                // order is: idempotent guard → snapshot (records intent
4202                // to restore) → retain (takes the binding-side share) →
4203                // assign (slot owns the share). Any panic between steps
4204                // leaves a consistent invariant.
4205                //
4206                // D291 /qa D2 (2026-05-25): key the snapshot on
4207                // `(child_id, id)` — the dep's NodeId, NOT the dep_idx.
4208                // A mid-batch `Core::set_deps(child_id, …)` regenerates
4209                // `dep_records` and invalidates positional indexes;
4210                // re-keying on dep NodeId lets restore re-resolve the
4211                // (possibly-new) index via `child.dep_index_of(id)`.
4212                // Pre-fix a set_deps-mid-batch + cascade-rollback would
4213                // silently zero the WRONG dep's terminal slot (or skip
4214                // via the `get_mut(idx) → None` branch).
4215                let already_terminal = s.require_node(child_id).dep_records[idx].terminal.is_some();
4216                if already_terminal {
4217                    // Idempotent — child already saw this dep terminate.
4218                    continue;
4219                }
4220                if self.in_tick() {
4221                    crate::batch::with_wave_state(|ws| {
4222                        ws.wave_dep_terminal_snapshots.insert((child_id, id));
4223                    });
4224                }
4225                if let TerminalKind::Error(h) = t {
4226                    self.binding.retain_handle(h);
4227                }
4228                s.require_node_mut(child_id).dep_records[idx].terminal = Some(t);
4229                // Auto-cascade gating: if all deps now terminal, push child
4230                // onto the work queue with the chosen terminal.
4231                //
4232                // Slice C-1: kinds that opt out of Lock 2.B (currently
4233                // `Operator(Reduce)`) intercept upstream COMPLETE so they
4234                // can emit their accumulator before terminating. Instead of
4235                // cascading, queue the child for fn-fire — `fire_operator`
4236                // sees `dep_records[0].terminal` set and emits the
4237                // appropriate batch (Data(acc) + Complete on COMPLETE,
4238                // Error(h) on ERROR).
4239                let action = {
4240                    let child = s.require_node(child_id);
4241                    if child.terminal.is_some() {
4242                        ChildAction::None // Already terminated — no-op.
4243                    } else if child.all_deps_terminal() {
4244                        if child.skips_auto_cascade() {
4245                            ChildAction::QueueFire
4246                        } else {
4247                            ChildAction::Cascade(pick_cascade_terminal(&child.dep_records))
4248                        }
4249                    } else {
4250                        ChildAction::None
4251                    }
4252                };
4253                match action {
4254                    ChildAction::None => {}
4255                    ChildAction::Cascade(t_child) => {
4256                        work.push((child_id, t_child));
4257                    }
4258                    ChildAction::QueueFire => {
4259                        // Q-beyond Sub-slice 2 (D108, 2026-05-09):
4260                        // pending_fires lives on per-thread WaveState.
4261                        crate::batch::with_wave_state(|ws| {
4262                            ws.pending_fires.insert(child_id);
4263                        });
4264                    }
4265                }
4266            }
4267        }
4268    }
4269}
4270
4271/// Outcome of Lock 2.B child gating in `terminate_node`'s cascade walk.
4272enum ChildAction {
4273    /// No cascade; child is already terminal or not yet all-deps-terminal.
4274    None,
4275    /// Auto-cascade with the picked terminal kind (ERROR dominates COMPLETE).
4276    Cascade(TerminalKind),
4277    /// Queue child for fn-fire instead of cascading. Used by operator
4278    /// kinds that intercept upstream terminal (Operator(Reduce)).
4279    QueueFire,
4280}
4281
4282/// Lock 2.B cascade-terminal selection: ERROR dominates COMPLETE; first
4283/// ERROR seen wins. Caller has already verified all deps are terminal.
4284fn pick_cascade_terminal(dep_records: &[DepRecord]) -> TerminalKind {
4285    for dr in dep_records {
4286        if let Some(TerminalKind::Error(h)) = dr.terminal {
4287            return TerminalKind::Error(h);
4288        }
4289    }
4290    TerminalKind::Complete
4291}
4292
4293// -----------------------------------------------------------------------
4294// TEARDOWN — destruction, with auto-COMPLETE prepend (R2.6.4 / Lock 6.F)
4295// -----------------------------------------------------------------------
4296
4297impl Core {
4298    /// Tear `node_id` down. Per R2.6.4 / Lock 6.F:
4299    ///
4300    /// - **Auto-prepend COMPLETE.** If the node has not yet emitted a
4301    ///   terminal (`COMPLETE` / `ERROR`), `terminate_node` is called with
4302    ///   `Complete` first so subscribers see `[COMPLETE, TEARDOWN]`, not
4303    ///   bare `[TEARDOWN]`. This guarantees a clean end-of-stream signal
4304    ///   to async iterators and other consumers that wait on terminal
4305    ///   delivery.
4306    /// - **Idempotent on duplicate delivery.** The per-node
4307    ///   `has_received_teardown` flag is set on the first call; subsequent
4308    ///   `teardown` calls (or cascade visits from other paths) are silent
4309    ///   no-ops — no second `[COMPLETE, TEARDOWN]` pair to subscribers.
4310    /// - **Cascade downstream.** Each child is recursively torn down. The
4311    ///   child's own COMPLETE auto-cascades from `terminate_node`'s logic
4312    ///   (Lock 2.B); its TEARDOWN comes from this cascade.
4313    ///
4314    /// # Panics
4315    ///
4316    /// Panics if `node_id` is unknown.
4317    pub fn teardown(&self, node_id: NodeId) {
4318        // D274 (2026-05-21): inlined `try_teardown` + deleted the
4319        // `teardown_or_defer` shim. The
4320        // `Result<(), PartitionOrderViolation>` wrapper was deleted —
4321        // single-owner per Core post-D246; partition-order violations
4322        // cannot fire.
4323        {
4324            let s = self.lock_state();
4325            assert!(s.nodes.contains_key(&node_id), "unknown node {node_id:?}");
4326        }
4327        // D273 / Cat-3: single-owner — `Rc<RefCell<...>>` matches the
4328        // wave-confined ownership shape exactly.
4329        let torn_down: std::rc::Rc<std::cell::RefCell<Vec<NodeId>>> =
4330            std::rc::Rc::new(std::cell::RefCell::new(Vec::new()));
4331        let torn_down_for_wave = torn_down.clone();
4332        // TEARDOWN cascade follows `s.children` AND `meta_companions`
4333        // (R1.3.9.d) — meta-companions can cross partitions. Slice Y1 /
4334        // Phase E `compute_touched_partitions(node_id)` (called by
4335        // `run_wave_for`) walks both axes so the wave acquires every
4336        // partition reachable via the cascade.
4337        self.try_run_wave_for(node_id, move |this| {
4338            let mut s = this.lock_state();
4339            let collected = this.teardown_inner(&mut s, node_id);
4340            torn_down_for_wave.borrow_mut().extend(collected);
4341        });
4342        // Fire NodeTornDown for every cascaded id (root + metas +
4343        // downstream consumers that auto-cascaded). Outside the state
4344        // lock, matching fire_topology_event discipline.
4345        let ids = std::mem::take(&mut *torn_down.borrow_mut());
4346        for id in ids {
4347            self.fire_topology_event(&crate::topology::TopologyEvent::NodeTornDown(id));
4348        }
4349    }
4350
4351    /// Iterative teardown walk (Slice A-bigger, M1-close).
4352    ///
4353    /// The recursive shape was:
4354    ///   ```text
4355    ///   teardown(n):
4356    ///     if torn_down: return
4357    ///     mark torn_down
4358    ///     for meta in metas: teardown(meta)
4359    ///     terminate_node + queue Teardown
4360    ///     for child in children: teardown(child)
4361    ///   ```
4362    /// Deep linear chains (~10k nodes) overflowed the OS thread stack.
4363    ///
4364    /// The iterative shape uses a `Vec<Action>` stack with `Visit` and
4365    /// `EmitTeardown` actions. `Visit(n)` marks `n` torn-down (or no-ops
4366    /// if already), then pushes (in reverse order so LIFO pops in forward
4367    /// order) `Visit(child_K), …, Visit(child_1), EmitTeardown(n),
4368    /// Visit(meta_M), …, Visit(meta_1)`. The R1.3.9.d "metas first, then
4369    /// self, then children" ordering is preserved by the push order:
4370    /// metas pop first, recursively expand and emit; then `EmitTeardown(n)`
4371    /// pops and runs `terminate_node` + queue `Teardown`; then children
4372    /// pop. Idempotency via `has_received_teardown` keeps each node
4373    /// visited at most once even when multi-parent diamonds re-enter via
4374    /// a sibling path.
4375    fn teardown_inner(&self, s: &mut St<'_>, root: NodeId) -> Vec<NodeId> {
4376        enum Action {
4377            Visit(NodeId),
4378            EmitTeardown(NodeId),
4379        }
4380        let mut stack: Vec<Action> = vec![Action::Visit(root)];
4381        // Topology accumulator: every node that actually emits TEARDOWN
4382        // (i.e. each `EmitTeardown(id)` site, NOT each `Visit` — visits
4383        // for already-torn-down nodes short-circuit on idempotency).
4384        let mut torn_down: Vec<NodeId> = Vec::new();
4385        while let Some(action) = stack.pop() {
4386            match action {
4387                Action::Visit(id) => {
4388                    if s.require_node(id).has_received_teardown {
4389                        continue; // Idempotent (R2.6.4).
4390                    }
4391                    // D297: snapshot the pre-transition `false` state for
4392                    // wave-rollback (R4.3.2 + R2.2.7.a atomicity completeness
4393                    // for the `has_received_teardown` flag). Mirrors D291's
4394                    // `wave_terminal_snapshots.insert(id)` pattern. The
4395                    // idempotent guard above means we only reach here on a
4396                    // `false → true` transition; the snapshot set records
4397                    // "this node's teardown flag mutated this wave," and
4398                    // `restore_wave_teardown_snapshots` (called from
4399                    // `BatchGuard::drop`'s panic-discard path) sets the
4400                    // flag back to `false`, so a retry of `teardown(node)`
4401                    // post-rollback re-runs the auto-COMPLETE prepend +
4402                    // queue Teardown path (not silently no-op'd by this
4403                    // very idempotency guard).
4404                    //
4405                    // Gated on `in_tick`: outside a wave (e.g. shutdown
4406                    // teardown) no rollback is possible and the snapshot
4407                    // machinery is unused. Mirrors the D291 `in_tick` gate
4408                    // in `terminate_node`.
4409                    if self.in_tick() {
4410                        crate::batch::with_wave_state(|ws| {
4411                            ws.wave_teardown_snapshots.insert(id);
4412                        });
4413                    }
4414                    s.require_node_mut(id).has_received_teardown = true;
4415                    // Push order: children first (pop LAST), then
4416                    // EmitTeardown(id), then metas (pop FIRST). Reverse
4417                    // each list so within-group order matches the original
4418                    // recursive iteration.
4419                    let children: Vec<NodeId> = s
4420                        .children
4421                        .get(&id)
4422                        .map(|c| c.iter().copied().collect())
4423                        .unwrap_or_default();
4424                    for &child in children.iter().rev() {
4425                        stack.push(Action::Visit(child));
4426                    }
4427                    stack.push(Action::EmitTeardown(id));
4428                    let metas: Vec<NodeId> = s.require_node(id).meta_companions.clone();
4429                    for &meta in metas.iter().rev() {
4430                        stack.push(Action::Visit(meta));
4431                    }
4432                }
4433                Action::EmitTeardown(id) => {
4434                    // Auto-prepend COMPLETE if not yet terminal. The (now
4435                    // iterative) terminate_node handles auto-cascade to
4436                    // children's own terminal slots per Lock 2.B.
4437                    let already_terminal = s.require_node(id).terminal.is_some();
4438                    if !already_terminal {
4439                        self.terminate_node(s, id, TerminalKind::Complete);
4440                    }
4441                    // Wire emission of the TEARDOWN itself (tier 6).
4442                    self.queue_notify(s, id, Message::Teardown);
4443                    torn_down.push(id);
4444                }
4445            }
4446        }
4447        torn_down
4448    }
4449
4450    /// Attach `companion` as a meta companion of `parent` per R1.3.9.d.
4451    /// Meta companions are nodes whose lifecycle is bound to the parent's
4452    /// in TEARDOWN ordering: when `parent` tears down, `companion` tears
4453    /// down first.
4454    ///
4455    /// Use this for inspection / audit / sidecar nodes that subscribe to
4456    /// parent state — without the ordering, the companion could observe
4457    /// the parent mid-destruction and emit garbage.
4458    ///
4459    /// Idempotent on duplicate registration of the same companion.
4460    ///
4461    /// # Lifecycle constraint
4462    ///
4463    /// Intended for **setup-time** wiring — call this before `parent` or
4464    /// `companion` enters a wave. Mid-wave registration (especially during
4465    /// a teardown cascade in flight) is implementation-defined: the new
4466    /// edge takes effect on the *next* wave. Adding a companion to a
4467    /// torn-down parent silently no-ops (the parent will not tear down
4468    /// again). For dynamic companion attachment with deterministic
4469    /// ordering, prefer constructing the wiring before subscribers exist.
4470    ///
4471    /// # Panics
4472    ///
4473    /// Panics if either node id is unknown, or if `parent == companion`
4474    /// (a node cannot be its own meta companion — would loop on TEARDOWN).
4475    pub fn add_meta_companion(&self, parent: NodeId, companion: NodeId) {
4476        assert!(parent != companion, "node cannot be its own meta companion");
4477        // D246/S2c: single-owner ⇒ ONE shard; the prior cross-shard
4478        // `shard_of(parent) == shard_of(companion)` assert is vacuous
4479        // and deleted. D253 (S5) further deletes the §7 group-
4480        // consistency invariant on this edge (the `SchedulingGroupId`
4481        // surface is gone until M6 re-introduces it).
4482        let mut s = self.lock_state();
4483        assert!(s.nodes.contains_key(&parent), "unknown parent {parent:?}");
4484        assert!(
4485            s.nodes.contains_key(&companion),
4486            "unknown companion {companion:?}"
4487        );
4488        let metas = &mut s.require_node_mut(parent).meta_companions;
4489        if metas.contains(&companion) {
4490            return;
4491        }
4492        metas.push(companion);
4493    }
4494}
4495
4496// -----------------------------------------------------------------------
4497// INVALIDATE — cache clear + downstream cascade
4498// -----------------------------------------------------------------------
4499
4500impl Core {
4501    /// Clear `node_id`'s cache and cascade `[INVALIDATE]` to downstream
4502    /// dependents per canonical spec §1.4.
4503    ///
4504    /// Semantics:
4505    /// - **Never-populated case (R1.4 line 197):** if `cache == NO_HANDLE`,
4506    ///   the call is a no-op — no cache to clear, no INVALIDATE emitted.
4507    ///   This naturally provides idempotency within a wave: once a node has
4508    ///   been invalidated this wave (cache = NO_HANDLE), a second invalidate
4509    ///   on the same node does nothing.
4510    /// - **Cache clear (immediate):** the node's cached handle is dropped
4511    ///   (refcount released), `cache` becomes `NO_HANDLE`. State nodes
4512    ///   keep `has_fired_once` per spec — INVALIDATE is not a re-gating
4513    ///   event (the next emission to a previously-fired state still does
4514    ///   not re-trigger the first-run gate; that's a resubscribable-terminal
4515    ///   lifecycle concern, separate slice).
4516    /// - **Wire emission (tier 4):** `[INVALIDATE]` is queued via the
4517    ///   normal pause-aware notify path. Buffers while paused, flushes
4518    ///   immediately otherwise.
4519    /// - **Downstream cascade:** for each child of this node, the child's
4520    ///   `dep_handles[idx_of_node]` is reset to `NO_HANDLE` (its previous
4521    ///   value referenced a now-released handle). The child is then
4522    ///   recursively invalidated (no-op if its cache was already
4523    ///   `NO_HANDLE`). This re-closes the child's first-run gate — fn
4524    ///   won't fire again until the upstream re-emits a value.
4525    ///
4526    /// Wraps in a fresh wave when called from outside a wave, so
4527    /// notifications flush at the natural wave boundary.
4528    ///
4529    /// # Panics
4530    ///
4531    /// Panics if `node_id` is unknown, consistent with `emit` / `pause`.
4532    pub fn invalidate(&self, node_id: NodeId) {
4533        {
4534            let s = self.lock_state();
4535            assert!(s.nodes.contains_key(&node_id), "unknown node {node_id:?}");
4536        }
4537        // INVALIDATE cascade follows `s.children` (in-partition by union-
4538        // find construction). Slice Y1 / Phase E.
4539        self.run_wave_for(node_id, |this| {
4540            let mut s = this.lock_state();
4541            this.invalidate_inner(&mut s, node_id);
4542        });
4543    }
4544
4545    // D274 (2026-05-21): `invalidate_or_defer` was deleted — it was an
4546    // immediate-run no-op shim around `invalidate` whose `Err` path was
4547    // never reachable post-D248/D253/D255. No external callers existed.
4548    // Callers go directly to `Core::invalidate`.
4549
4550    /// Iterative invalidate cascade (Slice A-bigger, M1-close).
4551    ///
4552    /// The recursive shape was a depth-first cache-clear walk:
4553    ///   ```text
4554    ///   invalidate(n):
4555    ///     if cache(n) == NO_HANDLE: return  // already-invalidated guard
4556    ///     cache(n) = NO_HANDLE; release handle
4557    ///     queue Invalidate(n)
4558    ///     for child in children:
4559    ///       child.dep_handles[idx] = NO_HANDLE
4560    ///       invalidate(child)
4561    ///   ```
4562    /// Deep linear chains overflowed the OS thread stack. The work-queue
4563    /// rewrite has no ordering subtleties (unlike teardown's R1.3.9.d
4564    /// metas-first constraint) — Invalidate is a tier-4 broadcast where
4565    /// the never-populated / already-invalidated guard provides natural
4566    /// idempotency for diamond fan-in.
4567    fn invalidate_inner(&self, s: &mut St<'_>, root: NodeId) {
4568        let mut work: Vec<NodeId> = vec![root];
4569        while let Some(node_id) = work.pop() {
4570            // Never-populated / already-invalidated: no-op (R1.4 idempotency).
4571            // Per R1.3.9.c never-populated case, OnInvalidate cleanup hook
4572            // also does NOT fire — natural fallout of skipping via the
4573            // cache==NO_HANDLE guard (we never reach the queue-push below).
4574            let old_handle = s.require_node(node_id).cache;
4575            if old_handle == NO_HANDLE {
4576                continue;
4577            }
4578            // Clear cache + release the handle's slot ownership.
4579            s.require_node_mut(node_id).cache = NO_HANDLE;
4580            self.binding.release_handle(old_handle);
4581            // Slice E2 (R1.3.9.b strict per D057 + D058 fire-at-cache-clear):
4582            // queue OnInvalidate cleanup hook for lock-released drain at
4583            // wave-end. The dedup set guarantees at-most-once-per-wave-per-
4584            // node firing even if a node re-populates mid-wave (via fn-fire
4585            // emit) and gets re-invalidated through a separate path. Pure
4586            // cache==NO_HANDLE idempotency (above) catches "still at
4587            // sentinel" only; the explicit set is the strict R1.3.9.b
4588            // reading.
4589            // Q-beyond Sub-slice 3 (D108, 2026-05-09):
4590            // `invalidate_hooks_fired_this_wave` and
4591            // `deferred_cleanup_hooks` both live on per-thread WaveState.
4592            // Single borrow handles the dedup-insert and (on first
4593            // insertion) the cleanup-hook push.
4594            crate::batch::with_wave_state(|ws| {
4595                if ws.invalidate_hooks_fired_this_wave.insert(node_id) {
4596                    ws.deferred_cleanup_hooks
4597                        .push((node_id, CleanupTrigger::OnInvalidate));
4598                }
4599            });
4600            // Wire emission. Pause-aware via queue_notify.
4601            self.queue_notify(s, node_id, Message::Invalidate);
4602            // Cascade: for each child, clear the dep record's prev_data
4603            // referencing this node and push child onto the work queue.
4604            let child_ids: Vec<NodeId> = s
4605                .children
4606                .get(&node_id)
4607                .map(|c| c.iter().copied().collect())
4608                .unwrap_or_default();
4609            for child_id in child_ids {
4610                let dep_idx = s.require_node(child_id).dep_index_of(node_id);
4611                if let Some(idx) = dep_idx {
4612                    // Reset the child's dep record — the handle was just
4613                    // released. Subsequent first-run-gate checks see
4614                    // sentinel and re-close.
4615                    //
4616                    // Snapshot prev_data + data_batch retains for deferred
4617                    // release, then clear the record. Two-phase to satisfy
4618                    // the borrow checker (nodes + deferred_handle_releases
4619                    // are separate CoreState fields).
4620                    let (old_prev, batch_hs): (HandleId, SmallVec<[HandleId; 1]>) = {
4621                        let dr = &s.require_node(child_id).dep_records[idx];
4622                        (dr.prev_data, dr.data_batch.clone())
4623                    };
4624                    {
4625                        // Q-beyond Sub-slice 1 (D108, 2026-05-09):
4626                        // deferred_handle_releases moved to per-thread
4627                        // WaveState thread_local. State lock held; the
4628                        // thread_local borrow is independent.
4629                        crate::batch::with_wave_state(|ws| {
4630                            if old_prev != NO_HANDLE {
4631                                ws.deferred_handle_releases.push(old_prev);
4632                            }
4633                            for h in batch_hs {
4634                                ws.deferred_handle_releases.push(h);
4635                            }
4636                        });
4637                    }
4638                    let child_rec = s.require_node_mut(child_id);
4639                    child_rec.dep_records[idx].prev_data = NO_HANDLE;
4640                    child_rec.dep_records[idx].data_batch.clear();
4641                    // §10.13 perf (D047): clear received_mask bit so
4642                    // has_sentinel_deps() re-closes the first-run gate.
4643                    if idx < 64 {
4644                        child_rec.received_mask &= !(1u64 << idx);
4645                    }
4646                    work.push(child_id);
4647                }
4648            }
4649        }
4650    }
4651}
4652
4653// -----------------------------------------------------------------------
4654// PAUSE / RESUME — multi-pauser lockset + replay buffer
4655// -----------------------------------------------------------------------
4656
4657/// Reported back from [`Core::resume`] when the final lock releases.
4658///
4659/// `replayed` is the number of tier-3/tier-4 messages dispatched to
4660/// subscribers as part of the drain. `dropped` is the number of messages
4661/// that fell out the front of the buffer due to the Core-global
4662/// `pause_buffer_cap` while this pause cycle was active. A non-zero
4663/// `dropped` indicates a controller held the lock long enough to overflow
4664/// the cap; the binding may want to surface a warning or error.
4665#[derive(Copy, Clone, Debug, PartialEq, Eq)]
4666pub struct ResumeReport {
4667    pub replayed: u32,
4668    pub dropped: u32,
4669}
4670
4671impl Core {
4672    /// Acquire a pause lock on `node_id`. The first lock transitions the
4673    /// node from `Active` to `Paused`; further locks add to the lockset.
4674    /// While paused, tier-3 (DATA/RESOLVED) and tier-4 (INVALIDATE) outgoing
4675    /// messages buffer in the node's pause buffer; other tiers flush
4676    /// immediately.
4677    ///
4678    /// Re-acquiring the same `lock_id` is an idempotent no-op (matches TS
4679    /// convention, R1.2.6 silent on the case).
4680    pub fn pause(&self, node_id: NodeId, lock_id: LockId) -> Result<(), PauseError> {
4681        let mut s = self.lock_state();
4682        let rec = s
4683            .nodes
4684            .get_mut(&node_id)
4685            .ok_or(PauseError::UnknownNode(node_id))?;
4686        // QA A5 (2026-05-07): terminated nodes can't be re-paused. Without
4687        // this check, a stale pause-controller calling pause() on an
4688        // already-terminated node would re-arm `pause_state` to Paused.
4689        // The terminate_node path collapses pause_state → Active and
4690        // drains the buffer (A3-related), but doesn't gate subsequent
4691        // pause() calls. Treat as idempotent no-op (consistent with how
4692        // emit/complete/error early-return on terminal).
4693        if rec.terminal.is_some() {
4694            return Ok(());
4695        }
4696        // Slice F audit close (2026-05-07): `PausableMode::Off` means the
4697        // dispatcher ignores PAUSE for this node — tier-3 flushes
4698        // immediately, fn fires immediately. Treat the call as a successful
4699        // no-op so callers don't need to special-case.
4700        if rec.pausable == PausableMode::Off {
4701            return Ok(());
4702        }
4703        rec.pause_state.add_lock(lock_id);
4704        Ok(())
4705    }
4706
4707    /// Release a pause lock on `node_id`. If the lockset becomes empty, the
4708    /// node transitions back to `Active` and the buffered messages are
4709    /// dispatched to subscribers in arrival order. Returns a [`ResumeReport`]
4710    /// when the final lock released; `None` if the lockset is still
4711    /// non-empty (further locks held).
4712    ///
4713    /// Releasing an unknown `lock_id` (or releasing on an already-Active
4714    /// node) is an idempotent no-op returning `None`.
4715    pub fn resume(
4716        &self,
4717        node_id: NodeId,
4718        lock_id: LockId,
4719    ) -> Result<Option<ResumeReport>, PauseError> {
4720        // Phase 1 (lock-held): collect drained buffer + pending-wave flag +
4721        // sink Arcs. For default-mode nodes whose `pending_wave` was set
4722        // during pause, schedule a single fn-fire by adding to
4723        // `pending_fires` BEFORE we exit the lock — the wave engine picks
4724        // it up on the next drain tick.
4725        let (sinks, messages, dropped, pending_wave_for_default) = {
4726            let mut s = self.lock_state();
4727            let rec = s
4728                .nodes
4729                .get_mut(&node_id)
4730                .ok_or(PauseError::UnknownNode(node_id))?;
4731            // For Off mode, pause/resume are no-ops by construction.
4732            if rec.pausable == PausableMode::Off {
4733                return Ok(None);
4734            }
4735            let was_default_mode = rec.pausable == PausableMode::Default;
4736            // Capture pending_wave BEFORE remove_lock collapses the state.
4737            let pending_wave = if was_default_mode {
4738                rec.pause_state.take_pending_wave()
4739            } else {
4740                false
4741            };
4742            let Some((buffer, dropped)) = rec.pause_state.remove_lock(lock_id) else {
4743                // Not the final-resume — restore the pending_wave flag we
4744                // tentatively cleared, since we're not transitioning to
4745                // Active yet.
4746                if pending_wave {
4747                    rec.pause_state.mark_pending_wave();
4748                }
4749                return Ok(None);
4750            };
4751            let sinks: Vec<Sink> = rec.subscribers.values().cloned().collect();
4752            let messages: Vec<Message> = buffer.into_iter().collect();
4753            // Default-mode pending-wave handling: schedule the fn-fire so
4754            // the wave engine consolidates the pause-window dep deliveries
4755            // into one fn execution. State nodes don't fire fn (no
4756            // `pending_fires` membership has effect for them).
4757            //
4758            // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_fires lives
4759            // on per-thread WaveState.
4760            if pending_wave && was_default_mode {
4761                crate::batch::with_wave_state(|ws| {
4762                    ws.pending_fires.insert(node_id);
4763                });
4764            }
4765            (sinks, messages, dropped, pending_wave && was_default_mode)
4766        };
4767        let replayed = u32::try_from(messages.len()).unwrap_or(u32::MAX);
4768
4769        // Phase 2 (lock-released): fire sinks for ResumeAll-buffered
4770        // messages. Default-mode resume produces no buffered replay (the
4771        // consolidated fn-fire produces fresh wave traffic via the standard
4772        // commit_emission path).
4773        if !messages.is_empty() {
4774            for sink in &sinks {
4775                sink(&messages);
4776            }
4777            // Phase 3: balance the retain_handle calls done at buffer-push
4778            // time — sinks observe values but don't own refcount shares.
4779            for msg in &messages {
4780                if let Some(h) = msg.payload_handle() {
4781                    self.binding.release_handle(h);
4782                }
4783            }
4784        }
4785
4786        // Phase 4 (default-mode): drain the consolidated fn-fire scheduled
4787        // in Phase 1. `run_wave_for(node_id)` acquires the partitions
4788        // touched from `node_id` (Slice Y1 / Phase E) and runs the standard
4789        // drain pipeline; the new fn-fire emerges as a normal wave's worth
4790        // of messages to subscribers.
4791        if pending_wave_for_default {
4792            self.run_wave_for(node_id, |_this| {
4793                // The pending_fires entry was pushed in Phase 1 under the
4794                // lock. run_wave's drain picks it up.
4795            });
4796        }
4797        Ok(Some(ResumeReport { replayed, dropped }))
4798    }
4799
4800    /// True if the node currently holds at least one pause lock.
4801    #[must_use]
4802    pub fn is_paused(&self, node_id: NodeId) -> bool {
4803        self.lock_state()
4804            .require_node(node_id)
4805            .pause_state
4806            .is_paused()
4807    }
4808
4809    /// Number of pause locks currently held on `node_id`. `0` if Active.
4810    #[must_use]
4811    pub fn pause_lock_count(&self, node_id: NodeId) -> usize {
4812        self.lock_state()
4813            .require_node(node_id)
4814            .pause_state
4815            .lock_count()
4816    }
4817
4818    /// Test helper: whether `node_id` currently holds the given `lock_id`.
4819    #[must_use]
4820    pub fn holds_pause_lock(&self, node_id: NodeId, lock_id: LockId) -> bool {
4821        self.lock_state()
4822            .require_node(node_id)
4823            .pause_state
4824            .contains_lock(lock_id)
4825    }
4826}
4827
4828// -----------------------------------------------------------------------
4829// set_deps — atomic dep mutation
4830// -----------------------------------------------------------------------
4831
4832/// Errors returnable by [`Core::set_deps`].
4833///
4834/// Per `~/src/graphrefly-ts/docs/research/rewire-design-notes.md` and the
4835/// Phase 13.8 Q1 lock:
4836/// - `SelfDependency` — `n in newDeps` (self-loops are pathological without
4837///   explicit fixed-point semantics, which GraphReFly does not provide).
4838/// - `WouldCreateCycle { path }` — adding the new edge would create a cycle.
4839///   The `path` field reports the offending dep chain for debuggability.
4840/// - `UnknownNode` / `NotComputeNode` — invariant violations from the caller.
4841/// - `TerminalNode` — `n` itself has emitted COMPLETE/ERROR; rewiring a
4842///   terminal stream is a category error (terminal is one-shot at this
4843///   layer; recovery is the resubscribable path on a fresh subscribe).
4844/// - `TerminalDep` — a newly-added dep is terminal AND not resubscribable.
4845///   Resubscribable terminal deps are accepted because the subscribe path
4846///   resets their lifecycle. Non-resubscribable terminal deps would deliver
4847///   their already-emitted terminal directly to `n`'s `dep_terminals` slot,
4848///   which is rarely intended.
4849#[derive(Error, Debug, Clone, PartialEq)]
4850pub enum SetDepsError {
4851    /// `n` appeared in `new_deps` (self-loop rejection).
4852    #[error("set_deps({n:?}, ...): self-dependency rejected (n appeared in new_deps)")]
4853    SelfDependency { n: NodeId },
4854
4855    /// Adding the new dep would create a cycle. `path` is the chain
4856    /// `[added_dep, ..., n]` reachable via existing deps.
4857    #[error(
4858        "set_deps({n:?}, ...): cycle would form via path {path:?} \
4859         (adding {added_dep:?} → {n:?} closes the loop)"
4860    )]
4861    WouldCreateCycle {
4862        n: NodeId,
4863        added_dep: NodeId,
4864        path: Vec<NodeId>,
4865    },
4866
4867    #[error("set_deps: unknown node {0:?}")]
4868    UnknownNode(NodeId),
4869
4870    #[error("set_deps: node {0:?} is not a compute node (state nodes have no deps)")]
4871    NotComputeNode(NodeId),
4872
4873    /// `n` itself has terminated (COMPLETE / ERROR). Rewiring a terminal node
4874    /// is rejected — the stream has ended at this layer. To recover, mark
4875    /// the node resubscribable before terminate; a fresh subscribe will then
4876    /// reset its lifecycle.
4877    #[error("set_deps({n:?}, ...): node has already terminated; cannot rewire a terminal node")]
4878    TerminalNode { n: NodeId },
4879
4880    /// A newly-added dep is terminal AND non-resubscribable. Per Phase 13.8
4881    /// Q1, this is rejected; resubscribable terminal deps are allowed
4882    /// because the subscribe path resets them when activated. Already-present
4883    /// terminal deps are unaffected (their terminal status was accepted at
4884    /// the time they terminated).
4885    #[error(
4886        "set_deps({n:?}, ...): added dep {dep:?} is terminal and not resubscribable; \
4887         either mark it resubscribable before terminate, or remove the dep from new_deps"
4888    )]
4889    TerminalDep { n: NodeId, dep: NodeId },
4890
4891    /// `n` itself is currently mid-fire — a user fn for `n` re-entered Core
4892    /// via `set_deps(n, ...)` from inside `n`'s own `invoke_fn` /
4893    /// `project_each` / `predicate_each` / etc. Phase 1 of the dispatcher
4894    /// snapshotted `dep_handles` BEFORE the lock-released callback; the
4895    /// callback returning a `tracked` set indexed against THAT ordering
4896    /// would corrupt indices if the rewire re-orders deps mid-fire.
4897    /// Rejected to preserve the dynamic-tracked-indices invariant (D1).
4898    ///
4899    /// Workaround: schedule the rewire from a different node's fn (via
4900    /// `Core::emit` on a state node and observing the emit downstream),
4901    /// or perform the rewire after the wave completes (e.g. from a sink
4902    /// callback that is itself outside any fn-fire scope).
4903    ///
4904    /// Slice F (2026-05-07) — A6.
4905    #[error(
4906        "set_deps({n:?}, ...): rejected — node {n:?} is currently mid-fire \
4907         (set_deps from inside the firing node's own fn would corrupt the \
4908         Dynamic `tracked` indices snapshot taken before invoke_fn). \
4909         Schedule the rewire outside this fire scope."
4910    )]
4911    ReentrantOnFiringNode { n: NodeId },
4912    // /qa m1 (2026-05-19): the vestigial `PartitionMigrationDuringFire`
4913    // variant — kept across §7/D208–D211 + D253 (S5) as a downstream-
4914    // churn courtesy for `Err(_)` match arms — is now deleted. D253
4915    // removed the entire scheduling-group identity surface
4916    // (`SchedulingGroupId`, `node_group` index,
4917    // `set_scheduling_group`/`partition_of`/`group_of`), so the
4918    // partition-migration concept is gone too. No surviving caller
4919    // pattern-matches the variant (verified by grep; the only
4920    // reference was a TRASH/scheduling_groups.rs comment).
4921}
4922
4923// D246/S2c: `MigrationRollback` + `MovedComponent` deleted — there is
4924// no cross-shard extract→reinsert (single shard always under
4925// single-owner). D253 (S5) further deletes `SetGroupError` +
4926// `Core::set_scheduling_group` entirely — the declared-group identity
4927// surface (`SchedulingGroupId`, `node_group`, `partition_of`/`group_of`)
4928// is removed until M6 re-introduces it with M6's actual scheduling
4929// needs in view.
4930
4931impl Core {
4932    /// Atomic dep mutation — change a node's upstream deps without TEARDOWN
4933    /// cascading and without losing cache.
4934    ///
4935    /// Per the TLA+-verified design at
4936    /// `~/src/graphrefly-ts/docs/research/wave_protocol_rewire.tla`
4937    /// (35,950 distinct states, all 7 invariants clean):
4938    ///
4939    /// - Removed deps: clear dirtyMask bit, drain pending queue, drop DepRecord.
4940    /// - Added deps: SENTINEL prevData; push-on-subscribe if added dep has cached DATA.
4941    /// - Preserved: `firstRunPassed`, `pauseLocks`, `pauseBuffer`, `cache` (ROM/RAM).
4942    /// - Status auto-settles if dirtyMask becomes empty.
4943    /// - Idempotent on `new_deps == current deps`.
4944    /// - Self-rewire `n ∈ new_deps` rejected (`SelfDependency`).
4945    /// - Cycles rejected (`WouldCreateCycle`).
4946    /// - Allowed mid-wave + while paused.
4947    /// - Phase 13.8 Q1: terminal `n` rejected (`TerminalNode`); newly-added
4948    ///   terminal non-resubscribable deps rejected (`TerminalDep`).
4949    ///
4950    /// The body is a single atomic dep-mutation transaction with several
4951    /// discrete validation stages. Splitting would require passing a
4952    /// partially-mutable CoreState across helpers, and the transaction's
4953    /// locality is what makes the F1 refcount-leak collection work.
4954    #[allow(clippy::too_many_lines)]
4955    pub fn set_deps(&self, n: NodeId, new_deps: &[NodeId]) -> Result<(), SetDepsError> {
4956        // 2b-ii-B (D220-EXEC): route to the node's shard; the new deps
4957        // are in the same component ⇒ same group ⇒ same shard (the
4958        // component-consistency check enforces). See `try_emit`.
4959        let mut s = self.lock_state();
4960        // Validate node exists and is compute. Read-once via the helper so
4961        // subsequent code can use `require_node(n)` without re-checking.
4962        let (is_state, is_producer, is_terminal) = {
4963            let rec = s.nodes.get(&n).ok_or(SetDepsError::UnknownNode(n))?;
4964            (rec.is_state(), rec.is_producer(), rec.terminal.is_some())
4965        };
4966        if is_state || is_producer {
4967            // State and Producer nodes have no declared deps — set_deps
4968            // is meaningless. Producer nodes manage their own subscriptions
4969            // through the binding's ProducerCtx; mutating their (empty)
4970            // dep set would not affect that.
4971            return Err(SetDepsError::NotComputeNode(n));
4972        }
4973        // Reject if `n` itself is terminal (Phase 13.8 Q1: terminal nodes
4974        // cannot be rewired; recovery is via resubscribable subscribe).
4975        if is_terminal {
4976            return Err(SetDepsError::TerminalNode { n });
4977        }
4978        // A6 reentrancy guard (Slice F, 2026-05-07): reject if `n` is
4979        // mid-fire. Closes the D1 hazard where `Phase 1` snapshotted
4980        // `dep_handles` against pre-rewire dep ordering and `Phase 3`
4981        // would store the returned `tracked` indices against post-rewire
4982        // ordering.
4983        //
4984        // D250 (S4, 2026-05-19): post-D248/D249 single-owner the only
4985        // historical *trigger* for this guard — a synchronous `set_deps`
4986        // re-entry from inside `n`'s own fn-fire via the binding-holds-
4987        // cloned-Core mechanism — is structurally deleted (`invoke_fn`
4988        // has no `&Core`; `set_deps` is not on `CoreFull`/`MailboxOp`; a
4989        // Defer runs owner-side AFTER the wave settles with
4990        // `currently_firing` empty). The deleted cross-thread P13 path
4991        // (Thread B's set_deps observing Thread A's lock-released
4992        // invoke_fn pushes) is likewise gone — Core is `!Send`, one
4993        // owner thread. The guard is therefore **defensive**: it stays
4994        // live so a future owner-side mid-wave self-rewire seam (or any
4995        // re-entry that lands an owner inside the firing set) is
4996        // rejected fail-loud rather than corrupting tracked indices. The
4997        // **Guard preserved for future owner-side mid-wave self-rewire
4998        // seam** (S4-horizon work). No test exercises this code path
4999        // today — the pre-D221 binding-holds-cloned-Core trigger was
5000        // deleted in D246 and the test stub was removed at S6
5001        // (2026-05-20). **Do NOT delete this guard as "dead code"**:
5002        // removing it weakens the invariant that protects
5003        // `dep_records` indices against a future seam that lands an
5004        // owner inside `currently_firing`. If `CoreFull`/`MailboxOp`
5005        // is ever widened with a `set_deps`-equivalent for a real
5006        // consumer, write a FRESH test against the new seam.
5007        // `currently_firing` lives on `CoreShared`, read under the
5008        // already-held state lock (under D246/D248 single-owner the
5009        // borrow is *structural* rather than concurrency-required —
5010        // `CoreShared` is owner-thread-only).
5011        if s.shared.currently_firing.contains(&n) {
5012            return Err(SetDepsError::ReentrantOnFiringNode { n });
5013        }
5014        // Self-rewire rejection.
5015        if new_deps.contains(&n) {
5016            return Err(SetDepsError::SelfDependency { n });
5017        }
5018        // Validate all new deps exist.
5019        for &d in new_deps {
5020            if !s.nodes.contains_key(&d) {
5021                return Err(SetDepsError::UnknownNode(d));
5022            }
5023        }
5024        // Cycle detection: data flows parent → child via the `children` map.
5025        // Adding edge `d → n` (d becomes a dep of n) creates a cycle iff
5026        // `d` is already reachable from `n` via existing data-flow edges
5027        // (so `n → ... → d` exists, and the new `d → n` closes the loop).
5028        // DFS from `n` along `children` edges, looking for each added dep.
5029        let current_deps: HashSet<NodeId> = s.require_node(n).dep_ids().collect();
5030        let new_deps_set: HashSet<NodeId> = new_deps.iter().copied().collect();
5031        let added: HashSet<NodeId> = new_deps_set.difference(&current_deps).copied().collect();
5032        for &d in &added {
5033            if let Some(path) = self.path_from_to(&s, n, d) {
5034                return Err(SetDepsError::WouldCreateCycle {
5035                    n,
5036                    added_dep: d,
5037                    path,
5038                });
5039            }
5040        }
5041        // Phase 13.8 Q1: reject newly-added deps that are terminal AND not
5042        // resubscribable. Resubscribable terminal deps are allowed — the
5043        // subscribe path resets their lifecycle when something activates
5044        // them. Already-present (kept) deps are unaffected; their terminal
5045        // status was accepted at the time they terminated.
5046        for &d in &added {
5047            let dep_rec = s.require_node(d);
5048            if dep_rec.terminal.is_some() && !dep_rec.resubscribable {
5049                return Err(SetDepsError::TerminalDep { n, dep: d });
5050            }
5051        }
5052        // Compute `removed` early (Phase F: needs to be available for P13
5053        // split-case widening below). Idempotent fast-path moved below the
5054        // P13 check accordingly.
5055        let removed: HashSet<NodeId> = current_deps.difference(&new_deps_set).copied().collect();
5056
5057        // Idempotent fast-path: no add/remove ⇒ no-op. D253 (S5) deletes
5058        // the prior §7 None/Some-mix endpoint check that lived above —
5059        // the declared-group identity surface is gone until M6.
5060        if added.is_empty() && removed.is_empty() {
5061            return Ok(());
5062        }
5063
5064        // Snapshot old deps (ordered) for topology event, before mutation.
5065        let old_deps_vec: Vec<NodeId> = s.require_node(n).dep_ids_vec();
5066
5067        // Carry out the rewire atomically.
5068        // 1. Build new dep_records, preserving DepRecord state for kept deps.
5069        let new_deps_vec: Vec<NodeId> = new_deps.to_vec();
5070        //
5071        // Refcount discipline (F1 audit fix): each `Some(TerminalKind::Error(h))`
5072        // slot owns a refcount share retained at `terminate_node` time. When a
5073        // dep is REMOVED, its slot is dropped — the corresponding handle's
5074        // share must be released here, otherwise it leaks until Core drop.
5075        // Also release data_batch retains for removed deps.
5076        let (new_dep_records, removed_handles): (Vec<DepRecord>, Vec<HandleId>) = {
5077            let rec = s.require_node(n);
5078            // Index old dep_records by NodeId for O(1) lookup of kept deps.
5079            let old_by_node: HashMap<NodeId, &DepRecord> =
5080                rec.dep_records.iter().map(|dr| (dr.node, dr)).collect();
5081            let new_records: Vec<DepRecord> = new_deps_vec
5082                .iter()
5083                .map(|&d| {
5084                    if let Some(old) = old_by_node.get(&d) {
5085                        // Kept dep: preserve all state (prev_data, data_batch,
5086                        // terminal, wave flags). Subscriptions stay live.
5087                        DepRecord {
5088                            node: d,
5089                            prev_data: old.prev_data,
5090                            dirty: old.dirty,
5091                            involved_this_wave: old.involved_this_wave,
5092                            data_batch: old.data_batch.clone(),
5093                            terminal: old.terminal,
5094                        }
5095                    } else {
5096                        // Added dep: fresh sentinel record.
5097                        DepRecord::new(d)
5098                    }
5099                })
5100                .collect();
5101            // Collect handles to release from REMOVED dep records.
5102            let mut to_release: Vec<HandleId> = Vec::new();
5103            for d in &removed {
5104                if let Some(old) = old_by_node.get(d) {
5105                    if let Some(TerminalKind::Error(h)) = old.terminal {
5106                        to_release.push(h);
5107                    }
5108                    // Release data_batch retains for removed deps.
5109                    for &h in &old.data_batch {
5110                        to_release.push(h);
5111                    }
5112                }
5113            }
5114            (new_records, to_release)
5115        };
5116        // Clear dirtyMask bit by re-emitting the wave-bookkeeping: we don't
5117        // currently model a per-dep dirtyMask explicitly (we use the boolean
5118        // `dirty` flag at node level). Removing a dep's entry from the implicit
5119        // mask is therefore implicit — by removing the dep, future emissions
5120        // from it can't re-arm the bit. The per-dep `involved_this_wave` flag
5121        // stays wave-scoped and gets cleared at wave end. The setDeps action
5122        // itself does NOT change the dirty boolean unless all deps are cleared;
5123        // in that case we settle.
5124        // Slice E2 (D067): on a dynamic node that had previously fired its
5125        // fn, capture `has_fired_once` BEFORE the reset so we can fire
5126        // `OnRerun` cleanup lock-released after `drop(s)` below. Without
5127        // this, the next `fire_regular` Phase 1 would capture
5128        // `has_fired_once = false`, causing Phase 1.5 to skip OnRerun —
5129        // silently dropping the prior activation's cleanup closure when
5130        // the next `invoke_fn` overwrites `current_cleanup`. Per spec
5131        // R2.4.5, `set_deps` does NOT end the activation cycle
5132        // (subscribe→unsubscribe is the cycle boundary), so OnRerun must
5133        // fire on every re-fire including post-set_deps.
5134        // §10 perf (D047): compute new topo_rank before the mutable
5135        // borrow on rec, since we need to read other nodes' depths.
5136        let new_topo_rank = if new_deps_vec.is_empty() {
5137            0
5138        } else {
5139            new_deps_vec
5140                .iter()
5141                .filter(|&&d| d != n)
5142                .filter_map(|&d| s.nodes.get(&d).map(|r| r.topo_rank))
5143                .max()
5144                .unwrap_or(0)
5145                .saturating_add(1)
5146        };
5147        let fire_set_deps_on_rerun;
5148        let old_topo_rank;
5149        {
5150            let rec = s.require_node_mut(n);
5151            fire_set_deps_on_rerun = rec.is_dynamic && rec.has_fired_once;
5152            old_topo_rank = rec.topo_rank;
5153            rec.dep_records = new_dep_records;
5154            rec.topo_rank = new_topo_rank;
5155            // §10.13 perf (D047): recompute received_mask from new dep_records.
5156            // §10.3 perf (Slice V1): recompute involved_mask alongside.
5157            rec.received_mask = 0;
5158            rec.involved_mask = 0;
5159            for (i, dr) in rec.dep_records.iter().enumerate() {
5160                if i < 64 {
5161                    if dr.prev_data != NO_HANDLE || !dr.data_batch.is_empty() {
5162                        rec.received_mask |= 1u64 << i;
5163                    }
5164                    if dr.involved_this_wave {
5165                        rec.involved_mask |= 1u64 << i;
5166                    }
5167                }
5168            }
5169            // Re-derive `tracked` for static derived: all indices.
5170            // For dynamic: clear `tracked` AND reset `has_fired_once` so the
5171            // next dep delivery satisfies the first-fire branch in
5172            // `deliver_data_to_consumer` (`!has_fired_once || tracked.contains(...)`).
5173            // Without resetting `has_fired_once`, the cleared `tracked` blocks
5174            // every future fire — fn never re-runs and the dynamic node sits
5175            // on stale cache derived from the old dep set. The next fire
5176            // re-runs fn unconditionally; fn's returned `tracked` then
5177            // repopulates `rec.tracked` and normal selective-deps semantics
5178            // resume from the next dep update onward.
5179            if rec.is_dynamic {
5180                rec.tracked.clear();
5181                rec.has_fired_once = false;
5182            } else {
5183                // Derived (static) and Operator track all deps.
5184                rec.tracked = (0..new_deps_vec.len()).collect();
5185            }
5186        }
5187
5188        // 2. Update inverted-edge map (children).
5189        for &removed_dep in &removed {
5190            if let Some(set) = s.children.get_mut(&removed_dep) {
5191                set.remove(&n);
5192            }
5193        }
5194        for &added_dep in &added {
5195            s.children.entry(added_dep).or_default().insert(n);
5196        }
5197
5198        // 2b. (D300, Q3 — 2026-05-26) topo_rank consumer cascade. When
5199        // n's topo_rank changes via set_deps, downstream consumers
5200        // inherit a stale rank — `pick_next_fire`'s O(|pending_fires|)
5201        // min-scan by `topo_rank` (Slice U) would order them
5202        // incorrectly, producing one extra no-op fire that settles on
5203        // the next wave. BFS through s.children, recompute each
5204        // visited consumer's `topo_rank` from its current deps; cascade
5205        // further if it changed. Worst case O(V) for the connected
5206        // downstream component; common case bounded by topology depth.
5207        //
5208        // Per the verbatim `/porting-to-rs clear all the perf items
5209        // and the deferred items. regardless if they need a valid
5210        // consumer demand` directive (user-locked 2026-05-26 override
5211        // of D196 + the prior Slice U "set_deps is rarely called; lift
5212        // if hot path" defer-rationale at `porting-deferred.md` Slice
5213        // U known limitations).
5214        if old_topo_rank != new_topo_rank {
5215            // Seed with direct children of n. Self-edges are
5216            // structurally rejected at register / set_deps, so we
5217            // don't need an explicit self-loop guard (mirrors the
5218            // n's-own-rank computation above which uses the same
5219            // `filter(|&&d| d != n)` pattern as defense-in-depth).
5220            let mut worklist: VecDeque<NodeId> = s
5221                .children
5222                .get(&n)
5223                .map(|set| set.iter().copied().collect())
5224                .unwrap_or_default();
5225            let mut visited: HashSet<NodeId> = HashSet::default();
5226            while let Some(child_id) = worklist.pop_front() {
5227                if !visited.insert(child_id) {
5228                    continue;
5229                }
5230                // Read child's current deps + old rank.
5231                let (child_new_rank, child_old_rank) = {
5232                    let Some(child_rec) = s.nodes.get(&child_id) else {
5233                        continue;
5234                    };
5235                    let new_rank = child_rec
5236                        .dep_records
5237                        .iter()
5238                        .filter(|dr| dr.node != child_id)
5239                        .filter_map(|dr| s.nodes.get(&dr.node).map(|r| r.topo_rank))
5240                        .max()
5241                        .unwrap_or(0)
5242                        .saturating_add(1);
5243                    (new_rank, child_rec.topo_rank)
5244                };
5245                if child_new_rank != child_old_rank {
5246                    if let Some(child_rec) = s.nodes.get_mut(&child_id) {
5247                        child_rec.topo_rank = child_new_rank;
5248                    }
5249                    // Cascade to grandchildren — only when rank actually
5250                    // changed (early-exit prunes the BFS on the common
5251                    // case where downstream ranks were already correct
5252                    // because n's change was absorbed by another dep
5253                    // with equal-or-greater rank).
5254                    if let Some(grandchildren) = s.children.get(&child_id) {
5255                        for &gc in grandchildren {
5256                            if !visited.contains(&gc) {
5257                                worklist.push_back(gc);
5258                            }
5259                        }
5260                    }
5261                }
5262            }
5263        }
5264
5265        // 3. Push-on-subscribe for added deps with cached DATA. Wraps in a
5266        // wave so any downstream propagation runs cleanly. We capture only
5267        // the LIST of added deps (not their cache values) because the cache
5268        // can change between releasing the validation lock and the wave's
5269        // re-acquisition — see the P2 race fix below.
5270        //
5271        // P2 (Slice A close /qa) — between `drop(s)` and `run_wave`'s
5272        // closure re-acquiring the lock, a concurrent thread could
5273        // invalidate one of the added deps, releasing its cache handle. A
5274        // pre-snapshot of `(added_dep, cache)` pairs would then carry a
5275        // dangling HandleId into `deliver_data_to_consumer`. The fix is to
5276        // re-read each added dep's `cache` INSIDE the closure (under the
5277        // freshly re-acquired state lock). The wave-owner re-entrant mutex
5278        // (Q2) blocks concurrent waves once we enter `run_wave`, so the
5279        // re-read sees a coherent post-validation state.
5280        let added_for_wave: Vec<NodeId> = added.iter().copied().collect();
5281        // §7 (D208–D211): the union-find union/split-eager block that
5282        // lived here is DELETED. scheduling groups are static +
5283        // user-declared and do NOT migrate with topology, so adding /
5284        // removing dep edges never recomputes any node's group. The
5285        // group-consistency invariant for the new adjacency was already
5286        // enforced above (the strict check that replaced the P13 block).
5287        // No registry mutation on the set_deps path.
5288        // B3 (D117, 2026-05-10): when set_deps clears ALL deps mid-wave AND
5289        // n has a tier-1 (DIRTY) message already queued this wave with no
5290        // settle yet, push n to `pending_auto_resolve` so the drain-end
5291        // sweep emits a paired Resolved. Without this, subscribers observe
5292        // an unpaired DIRTY, violating R1.3.1.b two-phase push pairing.
5293        //
5294        // Outside any active wave the per-thread `pending_notify` is empty
5295        // (cleared at wave-end), so the predicate short-circuits and the
5296        // insert is a no-op. Inside a wave, the `pending_auto_resolve`
5297        // sweep at `drain_and_flush` end re-checks pending_notify and
5298        // routes through `queue_notify` (which handles paused-children
5299        // pause-buffer placement automatically).
5300        if new_deps_vec.is_empty() {
5301            // F6 (/qa 2026-05-10): walk pending_notify in arrival order
5302            // counting unpaired DIRTYs. Tier 4 INVALIDATE is NOT a
5303            // settle for two-phase pairing — it clears cache but does
5304            // not pair with a DIRTY. Pairs are DIRTY ↔ DATA / RESOLVED
5305            // (tier 3 value-class) or DIRTY ↔ COMPLETE / ERROR (tier 5
5306            // terminal). Multi-emit waves like `[DIRTY, RESOLVED, DIRTY]`
5307            // leave one trailing unpaired DIRTY that needs auto-Resolved.
5308            crate::batch::with_wave_state(|ws| {
5309                let needs_auto_resolve = ws.pending_notify.get(&n).is_some_and(|entry| {
5310                    let mut unpaired: i32 = 0;
5311                    for m in entry.iter_messages() {
5312                        match m {
5313                            crate::message::Message::Dirty => unpaired += 1,
5314                            crate::message::Message::Data(_)
5315                            | crate::message::Message::Resolved
5316                            | crate::message::Message::Complete
5317                            | crate::message::Message::Error(_)
5318                                if unpaired > 0 =>
5319                            {
5320                                unpaired -= 1;
5321                            }
5322                            // INVALIDATE / PAUSE / RESUME / TEARDOWN /
5323                            // START — not settles for two-phase pairing.
5324                            _ => {}
5325                        }
5326                    }
5327                    unpaired > 0
5328                });
5329                if needs_auto_resolve {
5330                    ws.pending_auto_resolve.insert(n);
5331                }
5332            });
5333        }
5334        // Drop the state lock before run_wave (which acquires its own) and
5335        // before crossing the binding boundary for the F1 refcount-fix
5336        // releases. Keeps the lock-discipline split (binding calls outside
5337        // the state lock) consistent with the rest of the dispatcher.
5338        drop(s);
5339        // Slice E2 (D067): fire OnRerun lock-released for dynamic nodes
5340        // that had previously fired. The cleanup closure cleans up
5341        // resources tied to the old dep shape before the next fn-fire
5342        // (triggered by added-dep push-on-subscribe below) registers a
5343        // fresh cleanup spec. Direct fire (NOT via deferred_cleanup_hooks)
5344        // because set_deps may NOT enter a wave (no added deps → no
5345        // run_wave below) — queueing the hook would orphan it until the
5346        // next unrelated wave drains.
5347        if fire_set_deps_on_rerun {
5348            self.binding.cleanup_for(n, CleanupTrigger::OnRerun);
5349        }
5350        // Fire topology event after lock is dropped.
5351        self.fire_topology_event(&crate::topology::TopologyEvent::DepsChanged {
5352            node: n,
5353            old_deps: old_deps_vec,
5354            new_deps: new_deps_vec.clone(),
5355        });
5356        if !added_for_wave.is_empty() {
5357            // Slice Y1 / Phase E: push-on-subscribe wave runs on `n`'s
5358            // touched partitions. Added deps are now unioned with `n`
5359            // (Phase C P12 fix moved registry mutation inside the state
5360            // lock), so any cascade through them stays in `n`'s partition
5361            // set as walked by `compute_touched_partitions`.
5362            self.run_wave_for(n, |this| {
5363                let mut s = this.lock_state();
5364                // Defensive: re-validate `n` still exists and isn't terminal.
5365                // A concurrent path could have terminated it between
5366                // validation and run_wave_for's partition-lock acquisition.
5367                if !s.nodes.contains_key(&n) || s.require_node(n).terminal.is_some() {
5368                    return;
5369                }
5370                for added_dep in &added_for_wave {
5371                    // Re-read cache under the wave-owner-held lock — this
5372                    // is the post-validation, post-concurrent-action
5373                    // snapshot. NO_HANDLE means the dep was invalidated
5374                    // concurrently; skip (no data to push).
5375                    let cache = match s.nodes.get(added_dep) {
5376                        Some(rec) => rec.cache,
5377                        None => continue, // dep deleted concurrently
5378                    };
5379                    if cache == NO_HANDLE {
5380                        continue;
5381                    }
5382                    let dep_idx = s.require_node(n).dep_index_of(*added_dep);
5383                    if let Some(idx) = dep_idx {
5384                        this.deliver_data_to_consumer(&mut s, n, idx, cache);
5385                    }
5386                }
5387            });
5388        }
5389        for h in removed_handles {
5390            self.binding.release_handle(h);
5391        }
5392        Ok(())
5393    }
5394
5395    /// DFS from `from` along data-flow edges (children map) looking for `to`.
5396    /// Returns the path including endpoints, or `None` if unreachable. Used
5397    /// for cycle detection in [`Self::set_deps`].
5398    fn path_from_to(&self, s: &CoreState, from: NodeId, to: NodeId) -> Option<Vec<NodeId>> {
5399        if from == to {
5400            return Some(vec![from]);
5401        }
5402        let mut stack: Vec<(NodeId, Vec<NodeId>)> = vec![(from, vec![from])];
5403        let mut visited: HashSet<NodeId> = HashSet::new();
5404        while let Some((cur, path)) = stack.pop() {
5405            if !visited.insert(cur) {
5406                continue;
5407            }
5408            if cur == to {
5409                return Some(path);
5410            }
5411            if let Some(children) = s.children.get(&cur) {
5412                for &child in children {
5413                    let mut new_path = path.clone();
5414                    new_path.push(child);
5415                    stack.push((child, new_path));
5416                }
5417            }
5418        }
5419        None
5420    }
5421}
5422
5423// CoreState helpers — kept on the inner struct so they're naturally scoped
5424// to the lock guard.
5425impl CoreState {
5426    // D246/S2c: `empty_shard` deleted — no per-`ShardKey` shard
5427    // construction (single shard always under single-owner).
5428
5429    // `alloc_node_id` / `alloc_sub_id` moved to `impl St` (Step 2a,
5430    // D220-EXEC): their counters now live in the separate `CoreShared`
5431    // region, which only the combined `St` guard holds alongside the
5432    // shard. Call sites (`s.alloc_node_id()` / `s.alloc_sub_id()` where
5433    // `s = self.lock_state()`) are unchanged — inherent `St` methods
5434    // resolve before the `Deref`-to-`CoreState` ones.
5435
5436    /// Clear wave-scoped flags and rotate per-dep batch data on every
5437    /// node. Run at the end of every wave (regular drain via `run_wave`,
5438    /// activation drain via `activate_derived`, and `BatchGuard::drop`'s
5439    /// drain). Centralized so a future wave-state field can't be missed
5440    /// at one of the cleanup sites.
5441    ///
5442    /// Per-dep rotation (R2.9.b / R1.3.6.b):
5443    /// - `prev_data` ← last element of `data_batch` (or unchanged if empty).
5444    ///   The last batch entry's retain transfers to `prev_data`; the old
5445    ///   `prev_data`'s retain is released. All earlier batch entries are
5446    ///   released.
5447    /// - `data_batch` cleared.
5448    /// - Per-dep `dirty` and `involved_this_wave` cleared.
5449    ///
5450    /// Handle releases are pushed to `deferred_handle_releases` for
5451    /// post-lock-drop release by the caller.
5452    pub(crate) fn clear_wave_state(&mut self, ws: &mut crate::batch::WaveState) {
5453        // Q-beyond Sub-slice 1 (D108, 2026-05-09): `pending_auto_resolve`
5454        // + `pending_pause_overflow` clears moved to
5455        // [`crate::batch::WaveState::clear_wave_state`]. The per-NodeRecord
5456        // rotation below pushes batch-handle and prev_data releases into
5457        // `ws.deferred_handle_releases` (was `cps.deferred_handle_releases`
5458        // pre-sub-slice-1, was `s.deferred_handle_releases` pre-Q2). Caller
5459        // borrows the WaveState thread_local; no lock-discipline rule
5460        // applies (state lock + thread_local borrow are independent).
5461        //
5462        // Q-beyond Sub-slice 3 (D108, 2026-05-09):
5463        // `invalidate_hooks_fired_this_wave` clear moved to
5464        // [`crate::batch::WaveState::clear_wave_state`]. The
5465        // `deferred_cleanup_hooks` invariant (NOT cleared here, drained
5466        // explicitly on success/panic paths) likewise moves with the
5467        // field.
5468        //
5469        // /qa F2 reverted (2026-05-10): `currently_firing` stays on
5470        // CoreState (per-Core, cross-thread visible — load-bearing for
5471        // P13). Defensive clear here mirrors the pre-sub-slice-3 safety
5472        // net (`FiringGuard`'s RAII push/pop is balanced even on panic;
5473        // a future code path that bypasses the guard would otherwise
5474        // leak a stale entry into the next wave).
5475        //
5476        // Step 2a (D220-EXEC): `currently_firing` now lives in the
5477        // separate `CoreShared` region (unreachable from `&mut
5478        // CoreState`). The defensive clear is performed by the two
5479        // `clear_wave_state` call sites in `batch.rs` via the combined
5480        // `St` guard's `.shared` field, immediately adjacent — same
5481        // wave-end point, same belt-and-suspenders intent.
5482        //
5483        // Slice G tier3 emit tracking lives on the per-thread
5484        // `WAVE_STATE`; cleared by the outermost `BatchGuard::drop`
5485        // (S4/D246: `WaveOwnerGuard` deleted — single-owner ⇒ one
5486        // uninterrupted owner-side drain, no per-partition release).
5487        for rec in self.nodes.values_mut() {
5488            rec.dirty = false;
5489            rec.involved_this_wave = false;
5490            // §10.3 perf (Slice V1): clear involved_mask in one op.
5491            rec.involved_mask = 0;
5492            for dr in &mut rec.dep_records {
5493                let batch_len = dr.data_batch.len();
5494                if batch_len > 0 {
5495                    // Release all batch entries EXCEPT the last — the last
5496                    // entry's retain transfers to prev_data.
5497                    for &h in &dr.data_batch[..batch_len - 1] {
5498                        ws.deferred_handle_releases.push(h);
5499                    }
5500                    // Release the OLD prev_data (its retain was from the
5501                    // previous wave's rotation or from initial delivery).
5502                    if dr.prev_data != NO_HANDLE {
5503                        ws.deferred_handle_releases.push(dr.prev_data);
5504                    }
5505                    // Rotate: last batch entry becomes new prev_data.
5506                    // Its retain carries over — no extra retain needed.
5507                    dr.prev_data = dr.data_batch[batch_len - 1];
5508                    dr.data_batch.clear();
5509                }
5510                dr.dirty = false;
5511                dr.involved_this_wave = false;
5512            }
5513        }
5514    }
5515
5516    pub(crate) fn require_node(&self, id: NodeId) -> &NodeRecord {
5517        self.nodes
5518            .get(&id)
5519            .unwrap_or_else(|| panic!("unknown node {id:?}"))
5520    }
5521
5522    pub(crate) fn require_node_mut(&mut self, id: NodeId) -> &mut NodeRecord {
5523        self.nodes
5524            .get_mut(&id)
5525            .unwrap_or_else(|| panic!("unknown node {id:?}"))
5526    }
5527}
5528
5529/// Release every binding-side refcount share owned by this `CoreState`
5530/// when the last `Core` clone drops the inner Mutex.
5531///
5532/// Without this, every retained handle in `cache` / `terminal` Error /
5533/// `dep_terminals` Error / pause-buffer-payload would leak in the binding
5534/// registry until process exit. Production bindings (napi-rs, pyo3,
5535/// wasm-bindgen) all maintain handle-ref maps that grow unbounded without
5536/// this cleanup.
5537///
5538/// Safe to call during panic unwinding — `BindingBoundary::release_handle`
5539/// is the only call, and a panicking binding during cleanup would already
5540/// have been a problem in normal operation.
5541impl Drop for CoreState {
5542    fn drop(&mut self) {
5543        // Q-beyond Sub-slice 3 (D108, 2026-05-09): `deferred_flush_jobs`
5544        // moved to [`crate::batch::WaveState`]. The `Vec<Sink>` clones
5545        // drop naturally with the per-thread WaveState's lifetime; no
5546        // CoreState-side cleanup needed.
5547        // Q-beyond Sub-slice 1 (D108, 2026-05-09): `deferred_handle_releases`
5548        // and `wave_cache_snapshots` moved to per-thread WaveState
5549        // thread_local. By outermost-BatchGuard-drop discipline both fields
5550        // are empty by the time CoreState drops (BatchGuard owns a Core
5551        // clone, so Core can't drop while a BatchGuard is in flight). Any
5552        // thread that ran a wave on this Core drained on its own outermost
5553        // BatchGuard; cross-Core thread_local sharing is fine because each
5554        // wave drains its own retains.
5555        //
5556        // Q-beyond Sub-slice 2 (D108, 2026-05-09): `pending_fires` and
5557        // `pending_notify` likewise moved to per-thread WaveState. The
5558        // pre-Sub-slice-2 `pending_notify` walk here (drain + release each
5559        // payload_handle) is no longer reachable from `Drop for CoreState`:
5560        // by invariant, no wave is in flight when CoreState drops (BatchGuard
5561        // holds a Core clone), so the originating thread's WaveState
5562        // pending_notify is empty by then. Other threads' WaveStates are
5563        // unreachable from CoreState::drop anyway — they're per-thread
5564        // thread_locals scoped to whichever thread ran the wave. The
5565        // outermost `BatchGuard::drop` is the canonical drain point on both
5566        // success and panic paths; Drop for CoreState relies on that
5567        // discipline holding rather than re-implementing it.
5568
5569        // Per-node retained handles:
5570        //   - `cache` (1 retain per non-NO_HANDLE state cache or
5571        //     populated compute cache).
5572        //   - `terminal == Some(Error(h))` (1 retain on the terminal slot).
5573        //   - `dep_terminals[i] == Some(Error(h))` (1 retain per consumer's
5574        //     terminated-dep slot).
5575        //   - `pause_state` paused buffer messages with payload handles
5576        //     (1 retain per buffered Data/Error).
5577        for rec in self.nodes.values_mut() {
5578            if rec.cache != NO_HANDLE {
5579                self.binding.release_handle(rec.cache);
5580            }
5581            if let Some(TerminalKind::Error(h)) = rec.terminal {
5582                self.binding.release_handle(h);
5583            }
5584            for dr in &rec.dep_records {
5585                if let Some(TerminalKind::Error(h)) = dr.terminal {
5586                    self.binding.release_handle(h);
5587                }
5588                // Release data_batch retains (in-flight wave data).
5589                for &h in &dr.data_batch {
5590                    self.binding.release_handle(h);
5591                }
5592                // Release prev_data retain (cross-wave persistence).
5593                if dr.prev_data != NO_HANDLE {
5594                    self.binding.release_handle(dr.prev_data);
5595                }
5596            }
5597            if let PauseState::Paused { buffer, .. } = &rec.pause_state {
5598                for msg in buffer {
5599                    if let Some(h) = msg.payload_handle() {
5600                        self.binding.release_handle(h);
5601                    }
5602                }
5603            }
5604            // Slice E1: release replay-buffer retains.
5605            for &h in &rec.replay_buffer {
5606                self.binding.release_handle(h);
5607            }
5608            // Operator scratch (Slice C-3, D026): generic per-operator
5609            // state struct. Each variant's release_handles releases the
5610            // shares it owns (Scan/Reduce acc, Distinct/Pairwise prev,
5611            // Last latest + default; Take/Skip/TakeWhile own no handles).
5612            if let Some(scratch) = rec.op_scratch.as_mut() {
5613                scratch.release_handles(&*self.binding);
5614            }
5615        }
5616
5617        // D-α (D028) `pending_scratch_release` shutdown drain moved to
5618        // `impl Drop for CoreShared` (Step 2a, D220-EXEC): the queue +
5619        // its `binding` were hoisted into the separate `CoreShared`
5620        // region, so `Drop for CoreState` can no longer reach them.
5621        // `CoreShared` (one per Core, owned by the cell) drops with the
5622        // cell — its `Drop` performs the identical
5623        // release-before-Vec-drop discipline. The per-node retain walk
5624        // above stays here (per-shard `CoreState` owns its `nodes`).
5625
5626        // Q-beyond Sub-slice 1 + 2 (D108, 2026-05-09): WaveState's
5627        // retain-holding fields (`wave_cache_snapshots`,
5628        // `deferred_handle_releases`, `pending_notify`) are drained by
5629        // outermost BatchGuard::drop (success + panic paths). See
5630        // comment above for the invariant.
5631    }
5632}