Skip to main content

graphrefly_core/
node.rs

1//! The dispatcher — node registration, subscription, wave engine.
2//!
3//! Mirrors `~/src/graphrefly-ts/src/__experiments__/handle-core/core.ts`
4//! (the Phase 13.6 brainstorm prototype, ~370 lines, 22 invariant tests).
5//!
6//! # Scope (M1 dispatcher + Slice A+B parity, closed 2026-05-05)
7//!
8//! - State + derived + dynamic node registration.
9//! - Subscribe / unsubscribe with push-on-subscribe (R1.2.3).
10//! - RAII [`Subscription`] with Drop-based deregister (§10.12).
11//! - DIRTY → DATA / RESOLVED ordering (R1.3.1.b two-phase push).
12//! - Equals-substitution (R1.3.2): identity is zero-FFI; custom crosses boundary.
13//! - First-run gate (R2.5.3) — fn does not fire until every dep has a handle.
14//! - Diamond resolution — one fn fire per wave even with shared upstream.
15//! - `set_deps()` atomic dep mutation with cycle detection + Phase 13.8 Q1
16//!   terminal-rejection policy (R3.3.1).
17//! - PAUSE / RESUME with lockId set + replay buffer (R1.2.6, R2.6, §10.2).
18//! - INVALIDATE broadcast + cascade with R1.4 idempotency.
19//! - COMPLETE / ERROR cascade + Lock 2.B auto-cascade gating
20//!   (ERROR dominates COMPLETE; first error wins).
21//! - TEARDOWN auto-precedes COMPLETE (R2.6.4 / Lock 6.F) +
22//!   `has_received_teardown` idempotency.
23//! - Meta TEARDOWN ordering (R1.3.9.d) — companions tear down before parent.
24//! - Resubscribable terminal lifecycle (R2.2.7, R2.5.3) — late subscribe to a
25//!   resubscribable terminal node resets lifecycle, except after TEARDOWN
26//!   (per F3 audit guard: TEARDOWN is permanent).
27//!
28//! # Module split (Slice C-1, 2026-05-05)
29//!
30//! Wave-engine internals (drain loop, fire selection, emission commit, sink
31//! dispatch) live in [`crate::batch`]. The split is purely organizational —
32//! the methods are still on `Core`. See `batch.rs` for the wave-engine
33//! entry points (`run_wave`, `drain_and_flush`, `commit_emission`,
34//! `queue_notify`, `deliver_data_to_consumer`).
35//!
36//! # Out of scope (later slices / milestones)
37//!
38//! - Deactivation cleanup (RAM nodes clear cache when sink count → 0) — M2.
39//!
40//! See [`migration-status.md`](../../../docs/migration-status.md) for the
41//! milestone tracker and [`porting-deferred.md`](../../../docs/porting-deferred.md)
42//! for surfaced concerns deferred to evidence-driven slices.
43//!
44//! # Re-entrance discipline (Slice A close, M1: fully lock-released)
45//!
46//! - **Wave-end sink fires** drop the state lock first. A subscriber's sink
47//!   that calls back into `Core::emit` / `pause` / `resume` / `invalidate` /
48//!   `complete` / `error` / `teardown` re-acquires the lock cleanly and runs
49//!   a nested wave (the one-Core-per-OS-thread `in_tick` ownership slot is
50//!   cleared by the owning `BatchGuard::drop` before the deferred-fire
51//!   phase).
52//! - **`BindingBoundary::invoke_fn`** fires lock-released. The wave engine
53//!   acquires + drops the state lock per fn-fire iteration around the
54//!   `invoke_fn` callback. User fns may re-enter `Core::emit` / `pause` /
55//!   etc. and run a nested wave.
56//! - **`BindingBoundary::custom_equals`** fires lock-released.
57//!   `commit_emission` brackets the equals check around a lock release;
58//!   custom equals oracles may re-enter Core safely.
59//! - **Subscribe-time handshake** also fires lock-released.
60//!   [`Core::subscribe`] installs the sink under the state lock, drops
61//!   it, then fires the per-tier handshake (`[Start]` / `[Data(cache)]?`
62//!   / `[Complete]?` / `[Error(h)]?` / `[Teardown]?` per R1.3.5.a)
63//!   lock-released. A handshake-time sink callback may re-enter Core
64//!   via the owner-side mailbox/`DeferQueue` seam (`emit` / `complete` /
65//!   `error` / `subscribe`); the owner drain applies it as a nested
66//!   wave. Post-S4 `Core` is single-owner `!Send + !Sync` — there is no
67//!   `wave_owner` mutex and no cross-thread block; R1.3.5.a
68//!   happens-after ordering holds because the one owner thread installs
69//!   the sink (state lock) before any subsequent emit's flush observes
70//!   it (`subscribers_revision` freeze, Slice X4/D2).
71
72use std::collections::VecDeque;
73use std::panic::{catch_unwind, AssertUnwindSafe};
74use std::sync::atomic::{AtomicU64, Ordering};
75use std::sync::Arc;
76
77use ahash::{AHashMap as HashMap, AHashSet as HashSet};
78use parking_lot::Mutex;
79
80// D246/S2c: `WaveOwnerGuard` (the §7 per-group `parking_lot::ReentrantMutex`
81// wave-lock wrapper) is deleted — single-owner ⇒ no cross-thread wave
82// serialization.
83use smallvec::SmallVec;
84use thiserror::Error;
85
86use crate::boundary::{BindingBoundary, CleanupTrigger};
87use crate::clock::monotonic_ns;
88use crate::handle::{FnId, HandleId, LockId, NodeId, NO_HANDLE};
89use crate::message::Message;
90
91/// Terminal-lifecycle state — once set on a node, the node will not emit
92/// further DATA; per-dep slots on consumers also use this to track which
93/// upstreams have terminated (R1.3.4 / Lock 2.B).
94///
95/// `Error` carries a [`HandleId`] resolving to the error value. Refcount is
96/// retained when the variant is stored in a node's `terminal` slot or any
97/// consumer's `dep_terminals` slot; v1 does not release these (terminal
98/// state is one-shot at this layer; release happens on resubscribable
99/// terminal-lifecycle reset, a separate slice).
100#[derive(Copy, Clone, Debug, PartialEq, Eq)]
101pub enum TerminalKind {
102    Complete,
103    Error(HandleId),
104}
105
106/// Node kind discriminant — **derived metadata** computed from
107/// [`NodeRecord`]'s field shape (D030 unification, Slice D).
108///
109/// Core no longer stores `kind` as a field; it's computed on demand from
110/// `(deps.is_empty(), fn_id.is_some(), op.is_some(), is_dynamic)`,
111/// mirroring TS's data model where `NodeImpl` has no `_kind` field. The
112/// shape uniquely identifies the kind:
113///
114/// | deps      | fn_id | op   | is_dynamic | kind     |
115/// |-----------|-------|------|-----------|----------|
116/// | empty     | None  | None | -         | State    |
117/// | empty     | Some  | None | -         | Producer |
118/// | non-empty | Some  | None | false     | Derived  |
119/// | non-empty | Some  | None | true      | Dynamic  |
120/// | non-empty | None  | Some | -         | Operator |
121///
122/// Public API ([`Core::kind_of`]) derives this enum on each call. State
123/// nodes are ROM (cache survives deactivation); compute nodes
124/// (Derived / Dynamic / Operator) and producers are RAM.
125#[derive(Copy, Clone, Eq, PartialEq, Debug)]
126pub enum NodeKind {
127    /// Source node: cache is intrinsic, no fn, no deps. Mutated via [`Core::emit`].
128    State,
129    /// Producer node: fn fires once on first subscribe. No deps;
130    /// emissions arrive via sinks the fn subscribes to (zip / concat /
131    /// race / takeUntil pattern). Slice D / D031.
132    Producer,
133    /// Derived node: fn fires on every dep change; all deps tracked.
134    Derived,
135    /// Dynamic node: fn declares which dep indices it actually read this run.
136    /// Untracked dep updates flow through cache but do NOT re-fire fn.
137    Dynamic,
138    /// Operator node: built-in dispatch path for transform / combine /
139    /// flow / resilience operators. The `OperatorOp` discriminant selects
140    /// the per-operator FFI path ([`BindingBoundary::project_each`] etc.);
141    /// Core manages per-operator state via the generic `op_scratch` slot
142    /// on `NodeRecord` (D026). Per Slice C-1 (D009) / Slice C-3 (D026).
143    Operator(OperatorOp),
144}
145
146impl NodeKind {
147    /// True if this kind opts OUT of Lock 2.B auto-cascade. Operator(Reduce)
148    /// and Operator(Last) must intercept upstream COMPLETE so they can emit
149    /// their accumulator / buffered value before the cascade terminates them;
150    /// instead of cascading, terminate_node queues such children for fn-fire
151    /// so `fire_operator` can handle the terminal.
152    pub(crate) fn skips_auto_cascade(self) -> bool {
153        matches!(
154            self,
155            NodeKind::Operator(
156                OperatorOp::Reduce { .. } | OperatorOp::Last { .. } | OperatorOp::Valve
157            )
158        )
159    }
160}
161
162/// Built-in operator discriminant. Selects the per-operator dispatch path
163/// in `fire_operator` (`crates/graphrefly-core/src/batch.rs`). Each variant
164/// carries the binding-side closure ids (and seed handle for stateful
165/// folders) needed for the wave-execution path; Core stores no user values
166/// itself per the handle-protocol cleaving plane.
167#[derive(Copy, Clone, Eq, PartialEq, Debug)]
168pub enum OperatorOp {
169    /// `map(source, project)` — element-wise transform. Calls
170    /// `BindingBoundary::project_each(fn_id, &inputs)` per fire; emits each
171    /// returned handle via `commit_emission_verbatim` (R1.3.2.d batch
172    /// semantics — no equals substitution between batch entries).
173    Map { fn_id: FnId },
174    /// `filter(source, predicate)` — silent-drop selection (D012/D018).
175    /// Calls `BindingBoundary::predicate_each(fn_id, &inputs)`; emits each
176    /// passing input verbatim. If zero pass on a wave that dirtied the
177    /// node, queues a single `RESOLVED` to settle (D018).
178    Filter { fn_id: FnId },
179    /// `scan(source, fold, seed)` — left-fold emitting each new accumulator.
180    /// `seed` is captured at registration; `acc` lives in
181    /// [`ScanState`](super::op_state::ScanState) inside
182    /// [`NodeRecord::op_scratch`] and persists across waves until
183    /// resubscribable reset. Calls `BindingBoundary::fold_each(fn_id, acc,
184    /// &inputs) -> SmallVec<HandleId>` per fire.
185    Scan { fn_id: FnId, seed: HandleId },
186    /// `reduce(source, fold, seed)` — left-fold emitting once on upstream
187    /// COMPLETE. Accumulates silently while source DATA flows; on
188    /// dep[0].terminal == Some(Complete), emits `[Data(acc), Complete]`.
189    /// On `Error(h)`, propagates the error verbatim. Opts out of Lock 2.B
190    /// auto-cascade (see `NodeKind::skips_auto_cascade`).
191    Reduce { fn_id: FnId, seed: HandleId },
192    /// `distinctUntilChanged(source, equals)` — suppresses adjacent
193    /// duplicates. Calls `BindingBoundary::custom_equals(equals_fn_id,
194    /// prev, current)` per input; emits non-equal items verbatim and
195    /// updates `prev`. If zero items pass on a wave that dirtied the node,
196    /// queues `RESOLVED` (matches Filter discipline).
197    DistinctUntilChanged { equals_fn_id: FnId },
198    /// `pairwise(source)` — emits `(prev, current)` pairs starting after
199    /// the second value. First value swallowed (sets `prev`). Calls
200    /// `BindingBoundary::pairwise_pack(fn_id, prev, current)` per pair to
201    /// produce the binding-side tuple handle.
202    Pairwise { fn_id: FnId },
203
204    // ----- Slice C-2: multi-dep combinators (D020) -----
205    /// `combine(...sources)` — N-dep combineLatest. On any dep fire, packs
206    /// the latest handle per dep into a single tuple handle via
207    /// `BindingBoundary::pack_tuple(pack_fn, &handles)`. First-run gate
208    /// (`partial: false` default) holds until all deps deliver real DATA
209    /// (R2.5.3). COMPLETE cascades when all deps complete (R1.3.4.b).
210    Combine { pack_fn: FnId },
211
212    /// `withLatestFrom(primary, secondary)` — 2-dep, fire-on-primary-only
213    /// (D021, Phase 10.5). Packs `[primary, secondary]` via
214    /// `BindingBoundary::pack_tuple(pack_fn, &handles)` when dep[0]
215    /// (primary) has DATA in the wave. If only dep[1] (secondary) fires,
216    /// settles with RESOLVED (D018 pattern). First-run gate holds until
217    /// both deps deliver (R2.5.3 `partial: false`). Post-warmup INVALIDATE
218    /// guard: if secondary `prev_data == NO_HANDLE` and batch empty after
219    /// warmup, settles with RESOLVED (no stale pair).
220    WithLatestFrom { pack_fn: FnId },
221
222    /// `merge(...sources)` — N-dep, forward all DATA handles verbatim
223    /// (D022). Zero FFI on fire: no transformation, no binding call.
224    /// Each dep's batch handles are retained and emitted individually.
225    /// COMPLETE cascades when all deps complete (R1.3.4.b).
226    Merge,
227
228    // ----- Slice C-3: flow operators (D024) -----
229    /// `take(source, count)` — emits the first `count` DATA values then
230    /// self-completes via `Core::complete`. Tracks `count_emitted` in
231    /// [`TakeState`](super::op_state::TakeState). When upstream completes
232    /// before `count` is reached, the standard auto-cascade propagates
233    /// COMPLETE. `count == 0` is allowed: the first fire emits zero
234    /// items then immediately self-completes (D027).
235    Take { count: u32 },
236
237    /// `skip(source, count)` — drops the first `count` DATA values; once
238    /// the threshold is crossed, subsequent DATAs pass through verbatim.
239    /// Tracks `count_skipped` in [`SkipState`](super::op_state::SkipState).
240    /// On a wave where every input is still in the skip window, queues
241    /// DIRTY+RESOLVED to settle (D018 pattern).
242    Skip { count: u32 },
243
244    /// `takeWhile(source, predicate)` — emits while `predicate(input)`
245    /// holds; on the first `false`, emits any preceding passes then
246    /// self-completes via `Core::complete`. Reuses
247    /// [`BindingBoundary::predicate_each`] (D029); after the first
248    /// `false`, subsequent inputs in the same batch are dropped.
249    TakeWhile { fn_id: FnId },
250
251    /// `last(source)` / `last_with_default(source, default)` — buffers
252    /// the latest DATA; on upstream COMPLETE, emits `Data(latest)` then
253    /// `Complete`. The `default` field is `NO_HANDLE` for the no-default
254    /// factory (emits only `Complete` on empty stream), or a registered
255    /// default handle (emits `Data(default)` + `Complete` on empty
256    /// stream). Storage: [`LastState`](super::op_state::LastState) holds
257    /// `latest` (live buffer) and `default` (registration-time, stable).
258    /// Opts out of Lock 2.B auto-cascade so it can intercept upstream
259    /// COMPLETE.
260    Last { default: HandleId },
261
262    // ----- Slice U: control operators (D047) -----
263    /// `tap(source, fn)` — side-effect passthrough. Calls
264    /// `BindingBoundary::invoke_tap_fn(fn_id, handle)` on each input DATA,
265    /// then emits the input handle unchanged. Zero-transform: output
266    /// handles are the inputs verbatim (no equals substitution, no
267    /// allocation).
268    Tap { fn_id: FnId },
269
270    /// `tapFirst(source, fn)` — one-shot side-effect on first DATA. Same
271    /// as [`Tap`](Self::Tap) but fires `invoke_tap_fn` only once; after
272    /// the first fire, subsequent DATA passes through without a callback.
273    /// State: [`TapFirstState`](super::op_state::TapFirstState) tracks
274    /// `fired: bool`.
275    TapFirst { fn_id: FnId },
276
277    /// `valve(source, control)` — conditional forward. 2-dep: dep[0] is
278    /// source, dep[1] is boolean control. When the latest control value
279    /// is truthy (non-zero handle), forwards source DATA; when falsy,
280    /// settles with RESOLVED. Partial mode so it fires on control-alone
281    /// before source has delivered. Does NOT auto-complete on control
282    /// terminal (`completeWhenDepsComplete: false` equivalent).
283    Valve,
284
285    /// `settle(source, quietWaves, maxWaves)` — convergence detector.
286    /// Forwards each upstream DATA, counts consecutive no-change waves,
287    /// and self-completes when `quiet_count >= quiet_waves` (or
288    /// `wave_count >= max_waves` if set). State:
289    /// [`SettleState`](super::op_state::SettleState).
290    Settle {
291        quiet_waves: u32,
292        max_waves: Option<u32>,
293    },
294}
295
296/// Registration options for [`Core::register_operator`].
297///
298/// `equals` controls operator output dedup (R5.7 — defaults to identity).
299/// `partial` controls the R2.5.3 first-run gate (R5.4 — operator dispatch
300/// fires on first DATA from any dep when `true`; default `false` matches
301/// the gated derived discipline).
302#[derive(Copy, Clone, Debug)]
303pub struct OperatorOpts {
304    pub equals: EqualsMode,
305    pub partial: bool,
306}
307
308impl Default for OperatorOpts {
309    fn default() -> Self {
310        Self {
311            equals: EqualsMode::Identity,
312            partial: false,
313        }
314    }
315}
316
317/// Closure-form fn id OR typed operator discriminant — the two dispatch
318/// paths a node can use. State / passthrough nodes pass `None` to
319/// [`Core::register`] (no fn at all).
320#[derive(Copy, Clone, Debug)]
321pub enum NodeFnOrOp {
322    /// Closure-form: invokes [`BindingBoundary::invoke_fn`] per fire.
323    /// Used for Derived / Dynamic / Producer.
324    Fn(FnId),
325    /// Typed-op: routes to a `fire_op_*` helper that calls per-operator
326    /// FFI methods (`project_each` / `predicate_each` / `fold_each` /
327    /// `pairwise_pack` / `pack_tuple`). Used for Operator nodes.
328    Op(OperatorOp),
329}
330
331/// Pause behavior mode (canonical-spec §2.6 — three modes shipped in TS;
332/// Slice F audit, 2026-05-07 — closed the Rust port gap).
333///
334/// | Mode | Outgoing tier-3 routing while paused | RESUME behavior |
335/// |---|---|---|
336/// | [`PausableMode::Default`] | suppress fn-fire upstream (no DIRTY emitted) | fire fn ONCE on RESUME if any dep delivered DATA during pause; collapses N pause-window writes into one settle |
337/// | [`PausableMode::ResumeAll`] | buffer outgoing tier-3 / tier-4 messages per-wave | replay each buffered wave verbatim on RESUME |
338/// | [`PausableMode::Off`] | dispatcher ignores PAUSE; tier-3 flushes immediately | no-op (no buffer to drain) |
339///
340/// Default is [`PausableMode::Default`] per canonical §2.6 — every untagged
341/// source picks it up. Memory profile is O(1) per node (no buffer); the
342/// trade-off is "subscribers see one consolidated DATA on RESUME" rather
343/// than the K mid-pause emissions verbatim.
344///
345/// Note: tier-1 (DIRTY) / tier-2 (PAUSE/RESUME) / tier-5 (COMPLETE/ERROR) /
346/// tier-6 (TEARDOWN) bypass pause regardless of mode — they remain
347/// observable so leaked pause-controllers cannot strand subscribers.
348#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
349pub enum PausableMode {
350    /// Suppress fn-fire while paused; fire once on RESUME if any dep
351    /// delivered DATA during the pause window. Canonical default.
352    #[default]
353    Default,
354    /// Buffer outgoing tier-3 / tier-4 messages per-wave; replay on
355    /// RESUME. Use when subscribers need verbatim emit history (e.g. an
356    /// audit log, replay-on-reconnect bridge).
357    ResumeAll,
358    /// Dispatcher ignores PAUSE for this node — tier-3 flushes
359    /// immediately even while a lock is held. Use for nodes whose value
360    /// production is intrinsically pause-immune (telemetry counters,
361    /// monotonic timers).
362    Off,
363}
364
365/// Per-kind opts for [`Core::register`]. Cross-kind config knobs live
366/// here; per-kind specifics (deps, fn_or_op) live on
367/// [`NodeRegistration`].
368#[derive(Copy, Clone, Debug)]
369pub struct NodeOpts {
370    /// Initial cached value. Only valid for state nodes (no deps + no
371    /// fn + no op). [`NO_HANDLE`] starts the node sentinel.
372    pub initial: HandleId,
373    /// Equality mode for outgoing emissions (R1.3.2). Defaults to
374    /// [`EqualsMode::Identity`].
375    pub equals: EqualsMode,
376    /// First-run gate (R2.5.3 / D011). When `true`, the node fires as
377    /// soon as ANY dep delivers a real handle; when `false` (default),
378    /// the node holds until every dep has delivered.
379    pub partial: bool,
380    /// Dynamic flag (R2.5.3) — fn declares actually-tracked dep indices
381    /// per fire. Only meaningful when `fn_or_op == Some(Fn(_))` AND
382    /// deps non-empty.
383    pub is_dynamic: bool,
384    /// Pause behavior mode (canonical §2.6). Default is
385    /// [`PausableMode::Default`]. See [`PausableMode`] for the trade-offs.
386    pub pausable: PausableMode,
387    /// Replay buffer cap (canonical R2.6.5 / Lock 6.G — Slice E1, 2026-05-07).
388    /// `None` (default) disables; `Some(N)` keeps a circular buffer of the
389    /// last N DATA emissions and replays them to late subscribers as part
390    /// of the per-tier handshake (between [`Message::Start`] and any
391    /// terminal slice). Only DATA is buffered; RESOLVED entries are NOT
392    /// (R2.6.5 explicit "DATA only").
393    pub replay_buffer: Option<usize>,
394    /// D263 — when `true`, the `fire_fn` first-run gate (R2.5.3) treats a
395    /// terminal dep as "real input" so the node fires even if the only
396    /// signal a dep ever delivered was a `COMPLETE` (no DATA). Mirrors the
397    /// unconditional `fire_operator` semantics (`fire_operator`'s gate
398    /// already counts dep terminals as real input — e.g. for `Reduce`).
399    /// Default `false` preserves the historical gate (sentinel deps hold
400    /// the node until they deliver real DATA). NOT yet widened onto the
401    /// `Impl` parity contract per D196 + the parity-tests comment
402    /// ("NOT widened onto Impl"); kept substrate-internal until a
403    /// cross-arm scenario surfaces.
404    pub terminal_as_real_input: bool,
405}
406
407impl Default for NodeOpts {
408    fn default() -> Self {
409        Self {
410            initial: NO_HANDLE,
411            equals: EqualsMode::Identity,
412            partial: false,
413            is_dynamic: false,
414            pausable: PausableMode::Default,
415            replay_buffer: None,
416            terminal_as_real_input: false,
417        }
418    }
419}
420
421/// Unified node-registration descriptor (D030, Slice D).
422///
423/// All node kinds (State / Producer / Derived / Dynamic / Operator)
424/// register through [`Core::register`] with a `NodeRegistration`. The
425/// kind is **derived from the field shape** of the registration —
426/// `(deps.is_empty(), fn_or_op variant)`:
427///
428/// | deps      | fn_or_op   | is_dynamic | resulting kind |
429/// |-----------|-----------|-----------|----------------|
430/// | empty     | None      | -         | State          |
431/// | empty     | Some(Fn)  | -         | Producer       |
432/// | non-empty | Some(Fn)  | false     | Derived        |
433/// | non-empty | Some(Fn)  | true      | Dynamic        |
434/// | non-empty | Some(Op)  | -         | Operator       |
435///
436/// The sugar wrappers ([`Core::register_state`], [`Core::register_producer`],
437/// etc.) build a `NodeRegistration` and delegate.
438#[derive(Clone, Debug)]
439pub struct NodeRegistration {
440    /// Upstream deps in declaration order. Empty for state / producer.
441    pub deps: Vec<NodeId>,
442    /// Closure-form fn id or typed-op discriminant. `None` for state /
443    /// passthrough.
444    pub fn_or_op: Option<NodeFnOrOp>,
445    /// Cross-kind config knobs.
446    pub opts: NodeOpts,
447}
448
449/// Equality mode for a node's outgoing emissions.
450///
451/// `Identity` is the default: cache vs. new handle compare is a `u64` equal —
452/// zero FFI. `Custom` invokes [`BindingBoundary::custom_equals`] every check
453/// (R1.3.2.b two-arg call when both sides are non-sentinel).
454#[derive(Copy, Clone, Debug)]
455pub enum EqualsMode {
456    Identity,
457    Custom(FnId),
458}
459
460/// Public identifier for a single subscription (S2b / D225: promoted
461/// from `pub(crate)`). Returned by [`Core::subscribe`] /
462/// [`Core::try_subscribe`]; pair it with the `node_id` you subscribed
463/// and pass both to [`Core::unsubscribe`] to deregister.
464///
465/// S2b retires the core-level RAII `Subscription`/`WeakCore` (D223:
466/// the `Core` is owned by value and relocates between workers, so a
467/// parameterless `Drop` cannot reach it without `Weak<C>`/`unsafe` —
468/// D225). Drop-convenience is a *binding-layer* RAII wrapper over
469/// [`Core::unsubscribe`], used only where the holder co-owns the `Core`
470/// on its affinity worker (test harness, napi `BenchCore`). Substrate
471/// callers (e.g. producer upstream-sub cleanup, D229) unsubscribe
472/// explicitly via the owner-driven chain.
473#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
474pub struct SubscriptionId(u64);
475
476/// Deregister `sub_id` from `node_id` and run the Phase G / lifecycle
477/// cleanup chain (OnDeactivation → `producer_deactivate` → `wipe_ctx` →
478/// Core cache-clear). Extracted verbatim from the former
479/// `Subscription::Drop` (D225 refined A2, S2a) so `Core::unsubscribe`
480/// (the synchronous owner-invoked path) and the legacy RAII `Drop`
481/// share ONE body. Operates on `&C` directly — the body already used
482/// only `&dyn BindingBoundary` (via `make_op_scratch_with_binding`),
483/// never a `Core`, so this is a behaviour-identical move.
484///
485/// Producer deactivation (Slice D, D031): if removing this sub empties
486/// the subscribers map AND the node is a producer, fire
487/// `BindingBoundary::producer_deactivate(node_id)` AFTER releasing the
488/// state lock. The binding then drops its per-node state (subscriptions
489/// to upstream sources, captured closure state), which transitively
490/// unsubs from upstreams via their own unsubscribe. Re-entrance into
491/// Core from the deactivate hook is permitted since the lock is
492/// released first.
493#[allow(clippy::too_many_lines)] // Phase G is one continuous lifecycle hook chain (user cleanup → producer_deactivate → wipe_ctx → Core cache-clear); splitting it would obscure the ordering invariant.
494pub(crate) fn unsubscribe_sink(core: &Core, node_id: NodeId, sub_id: SubscriptionId) {
495    // Slice E2 (D056): when the last subscriber drops, fire the
496    // node's OnDeactivation cleanup hook BEFORE producer_deactivate
497    // (cleanup may release handles the producer subscription owns;
498    // reverse order would let producer_deactivate drop subs that user
499    // cleanup expected to be live). Both calls are lock-released per
500    // D045.
501    //
502    // OnDeactivation gating (D068, QA Q3 fix): fires only when the
503    // node has fired its fn at least once AND has a fn (`fn_id`
504    // populated). State nodes have no fn — they cannot register a
505    // cleanup spec via the production fn-return path (R2.4.5), so
506    // firing `cleanup_for` on them is wasted FFI; the binding's
507    // lookup is guaranteed to find no `current_cleanup`. Skipping
508    // here saves the FFI hop and matches the design-doc wording
509    // ("never-fired state nodes" — state-with-initial-value satisfies
510    // `has_fired_once = true` but still has no fn).
511    //
512    // Slice E2 /qa Q2(b) (D069): if the node is a resubscribable
513    // node that's ALREADY terminal (terminate fired BEFORE this last
514    // sub drop), fire `wipe_ctx` lock-released AFTER OnDeactivation
515    // + producer_deactivate. Mutually exclusive with `terminate_node`'s
516    // queue-wipe site: terminate-with-empty-subs goes through
517    // `pending_wipes`; terminate-with-live-subs routes here when
518    // those subs eventually drop. Either path fires exactly one
519    // wipe per terminal lifecycle.
520    let (was_last_sub, is_producer, has_user_cleanup, fire_wipe, binding) = {
521        // Step 2b-ii-B: route to the node's shard (this drop runs
522        // OUTSIDE any wave ⇒ ambient `None`; a grouped node's
523        // record is in shard `g`, so without this the unsubscribe
524        // would no-op on `DEFAULT_SHARD` → lost cleanup / leak).
525        let mut s = St::new(core);
526        let Some(rec) = s.nodes.get_mut(&node_id) else {
527            return;
528        };
529        rec.subscribers.remove(&sub_id);
530        // Slice X4 / D2: bump revision so any pending_notify entry for
531        // this node opened earlier in the wave starts a fresh batch on
532        // the next queue_notify, dropping the now-departed sink from
533        // the snapshot.
534        rec.subscribers_revision = rec.subscribers_revision.wrapping_add(1);
535        let last = rec.subscribers.is_empty();
536        let producer = rec.is_producer();
537        // OnDeactivation gate: must have run a fn at least once
538        // (has_fired_once) AND have a fn registered (fn_id.is_some()).
539        // The fn_id check excludes state nodes whose has_fired_once
540        // tracks initial-value status, not "user fn ran."
541        let user_cleanup = rec.has_fired_once && rec.fn_id.is_some();
542        let fire_wipe = last && rec.resubscribable && rec.terminal.is_some();
543        // Phase G (D119/D120/D121, 2026-05-10): always clone the
544        // binding when last sub leaves so we can run the Core
545        // cache-clear after the existing hooks. The Arc::clone is
546        // cheap and dwarfed by the cost of the hooks themselves.
547        let binding = if last { Some(s.binding.clone()) } else { None };
548        (last, producer, user_cleanup, fire_wipe, binding)
549    };
550    if was_last_sub {
551        if let Some(binding) = binding {
552            if has_user_cleanup {
553                binding.cleanup_for(node_id, CleanupTrigger::OnDeactivation);
554            }
555            if is_producer {
556                // D229: hand the binding a `Core::unsubscribe`-capable
557                // closure over the `&C` already in scope (the owner's
558                // synchronous unsubscribe context — exactly what D225
559                // requires; no `Weak<C>`, no parameterless-`Drop`). The
560                // binding loops it over its recorded upstream
561                // `(NodeId, SubscriptionId)` pairs. Lock-released here, so
562                // the recursive `unsubscribe_sink` (re-entrant producer
563                // cascade) is safe — behaviour-identical to the retired
564                // `Subscription::Drop` cascade.
565                let unsub = |up_node: NodeId, up_sub: SubscriptionId| {
566                    unsubscribe_sink(core, up_node, up_sub);
567                };
568                binding.producer_deactivate(node_id, &unsub);
569            }
570            // D069: eager wipe — fires AFTER OnDeactivation so the
571            // user closure observes pre-wipe `store` (matches the
572            // existing "OnDeactivation runs before wipe on terminal
573            // reset" invariant covered by test 10). Idempotent —
574            // `HashMap::remove` on absent key is a no-op, so even
575            // if the wave already drained `pending_wipes` earlier,
576            // this fire is benign.
577            if fire_wipe {
578                binding.wipe_ctx(node_id);
579            }
580
581            // Phase G (D118/D119/D120/D121, 2026-05-10) — Core
582            // cache-clear on deactivation, mirroring TS `_deactivate`
583            // (`pure-ts/src/core/node.ts:2185-2297`):
584            //
585            //   1. user `cleanup_for(OnDeactivation)`  ← above
586            //   2. `producer_deactivate`                ← above
587            //   3. `wipe_ctx` (resubscribable+terminal) ← above
588            //   4. **NEW: Core cache-clear**            ← here
589            //
590            // Releases per-dep `prev_data` + `data_batch` retains +
591            // `dep_terminals` Error retains (the latter closes the
592            // long-standing "Non-resubscribable terminal Error
593            // handles leak via diamond cascade" porting-deferred
594            // entry — D121). Clears pause/replay buffers. Releases
595            // `cached` for compute nodes (R2.2.7 / R2.2.8 ROM:
596            // state nodes preserve cache; compute nodes clear).
597            // Keeps the per-node `terminal` slot intact (D121:
598            // producer-side terminal stays for late-subscriber
599            // R2.2.7.a reset or R2.2.7.b rejection).
600            //
601            // Lock-released release discipline: collect handles
602            // under the lock, drop the lock, fire `release_handle`
603            // outside (mirrors `Core::resume` Phase 2 + the
604            // existing F1 `set_deps` removed-handles release at
605            // node.rs:5003).
606            //
607            // Re-entrance safety (F1 / D123, /qa 2026-05-10): if the
608            // user `cleanup_for` / `producer_deactivate` / `wipe_ctx`
609            // hook re-subscribed via some path, `subscribers` is now
610            // non-empty. **Skip Phase G entirely in that case** —
611            // the new subscriber's handshake delivered the live
612            // `cache` handle to its sink and is holding a refcount
613            // share through `pending_notify`; clearing cache here
614            // would race with the new subscriber's wave and cause
615            // a use-after-release in bindings that reap registry
616            // slots at refcount-zero.
617            //
618            // Why this diverges from TS `_deactivate` (which clears
619            // unconditionally): TS runs the cleanup hook + cache
620            // clear as ONE sync block under the (implicit) JS
621            // single-thread mutex; there's no released-lock window
622            // for a re-subscribe to install a sub before the
623            // cache-clear runs. Rust's lock-released hook discipline
624            // (D045) opens that window, so the re-check is
625            // necessary to preserve refcount soundness.
626            //
627            // Re-acquire state lock atomically with the recheck so
628            // a concurrent thread cannot install a sub between the
629            // emptiness check and the cache mutation.
630            // F8 (/qa 2026-05-10): also take `op_scratch` so its
631            // retains release lock-released after the state-lock
632            // scope. Pre-F8 Phase G only released per-edge handles
633            // + the compute `cache`, leaving operator-internal
634            // retains (Last.latest, Scan.acc, Reduce.acc, etc.)
635            // in-place. For non-resubscribable nodes that never
636            // re-subscribe, this was a permanent leak —
637            // asymmetric with the per-edge cleanup. F8 closes that
638            // by taking the scratch alongside per-edge handles
639            // and calling `release_handles` lock-released.
640            //
641            // D-α (D028 full close, 2026-05-10): for resubscribable
642            // operator nodes, take the OLD scratch AND build a
643            // FRESH scratch via `make_op_scratch` (lock-held, but
644            // `binding.retain_handle` is a leaf operation per
645            // op_state.rs:69-80 docs), install the fresh scratch
646            // on `rec.op_scratch`, and push the old scratch to
647            // `pending_scratch_release` for deferred release. The
648            // queue drains on the next `reset_for_fresh_lifecycle`
649            // (after Phase 2 takes fresh retains — preserves the
650            // C-3 /qa P1 seed-aliasing-acc invariant) or on
651            // `Drop for CoreState`. The fresh install gives
652            // re-activation correct counter / acc state (Take.taken
653            // back to 0, Scan.acc back to seed, etc.) matching TS
654            // Lock 6.D ("resets on every deactivation").
655            let (to_release, scratch_to_release): (
656                Vec<HandleId>,
657                Option<Box<dyn crate::op_state::OperatorScratch>>,
658            ) = {
659                // Step 2b-ii-B: route to the node's shard (drop
660                // runs outside any wave; grouped node's record is
661                // in shard `g` — without this the scratch/refcount
662                // cleanup would no-op on `DEFAULT_SHARD`).
663                let mut s = St::new(core);
664                if let Some(rec) = s.nodes.get_mut(&node_id) {
665                    // F1 re-entrance check.
666                    if !rec.subscribers.is_empty() {
667                        // A user hook re-subscribed during the
668                        // lock-released window; the new lifecycle
669                        // owns this node now. Phase G is a no-op.
670                        return;
671                    }
672                    let mut handles: Vec<HandleId> = Vec::new();
673                    // Per-dep state. Empty for state nodes.
674                    for dr in &mut rec.dep_records {
675                        if dr.prev_data != NO_HANDLE {
676                            handles.push(dr.prev_data);
677                            dr.prev_data = NO_HANDLE;
678                        }
679                        for h in dr.data_batch.drain(..) {
680                            handles.push(h);
681                        }
682                        // D121: per-edge terminal-Error retain
683                        // released here. Closes the cascade leak.
684                        // The producer's own `rec.terminal` slot
685                        // stays intact (preserved below).
686                        if let Some(TerminalKind::Error(h)) = dr.terminal {
687                            handles.push(h);
688                        }
689                        dr.terminal = None;
690                        dr.dirty = false;
691                        dr.involved_this_wave = false;
692                    }
693                    // Pause buffer DATA / replay buffer.
694                    if let PauseState::Paused { ref mut buffer, .. } = rec.pause_state {
695                        for msg in buffer.drain(..) {
696                            if let Some(h) = msg.payload_handle() {
697                                handles.push(h);
698                            }
699                        }
700                    }
701                    for h in rec.replay_buffer.drain(..) {
702                        handles.push(h);
703                    }
704                    // Pause locks drained → node back to Active.
705                    rec.pause_state = PauseState::Active;
706                    // R2.2.7 / R2.2.8 ROM: state nodes preserve
707                    // cache; compute (fn or op) nodes clear.
708                    // D119: state nodes keep `cached` because the
709                    // value is intrinsic and non-volatile;
710                    // resubscribe sees the same value.
711                    let is_compute = rec.fn_id.is_some() || rec.op.is_some();
712                    if is_compute && rec.cache != NO_HANDLE {
713                        handles.push(rec.cache);
714                        rec.cache = NO_HANDLE;
715                    }
716                    // Reset wave + lifecycle state so reactivation
717                    // begins fresh. `terminal` STAYS (D121).
718                    rec.has_fired_once = false;
719                    rec.dirty = false;
720                    rec.involved_this_wave = false;
721                    // §10.13 perf (D047): reset received_mask — all deps back
722                    // to sentinel on resubscribe.
723                    rec.received_mask = 0;
724                    // §10.3 perf (Slice V1): reset involved_mask.
725                    rec.involved_mask = 0;
726                    if rec.is_dynamic {
727                        rec.tracked.clear();
728                    }
729                    // F8 + D-α: op_scratch handling forks by
730                    // `resubscribable`. Non-resubscribable: eager
731                    // release (F8 path — there's no future reset
732                    // so the leak ends here). Resubscribable +
733                    // has-op: take old AND install fresh; defer
734                    // old release to the queue.
735                    let scratch = if !rec.resubscribable {
736                        std::mem::take(&mut rec.op_scratch)
737                    } else if let Some(op) = rec.op {
738                        // Slice C-3 /qa P1 (retain-before-release):
739                        // build fresh scratch FIRST (this calls
740                        // `binding.retain_handle` for any seed /
741                        // default the op carries), THEN swap. The
742                        // old scratch's release is deferred to the
743                        // `pending_scratch_release` queue (drained
744                        // on next reset_for_fresh_lifecycle or
745                        // Drop for CoreState).
746                        //
747                        // make_op_scratch is fallible only for
748                        // OperatorSeedSentinel; the OperatorOp
749                        // stored on NodeRecord passed validation
750                        // at registration time, so the unwrap
751                        // here is structurally guaranteed (mirrors
752                        // reset_for_fresh_lifecycle).
753                        //
754                        // Uses the binding-explicit static variant
755                        // because we have only `&dyn BindingBoundary`
756                        // here (Subscription::Drop holds no Core).
757                        let new_scratch = Core::make_op_scratch_with_binding(&*binding, op)
758                                .expect("invariant: stored OperatorOp passed make_op_scratch validation at registration time");
759                        let old = std::mem::replace(&mut rec.op_scratch, new_scratch);
760                        if let Some(old_box) = old {
761                            s.shared.pending_scratch_release.push(old_box);
762                        }
763                        // The "scratch_to_release" for lock-released
764                        // release stays None here — the resubscribable
765                        // case routes through the queue.
766                        None
767                    } else {
768                        // Resubscribable but no op (e.g. derived
769                        // compute / dynamic / state). Nothing to
770                        // do for op_scratch.
771                        None
772                    };
773                    (handles, scratch)
774                } else {
775                    // Node destroyed between lock-released hooks
776                    // and this re-acquire (terminate cascade or
777                    // graph removal) — nothing to clear.
778                    (Vec::new(), None)
779                }
780            };
781            // Release handles lock-released. Binding may re-enter
782            // Core during `release_handle` (final-Drop callbacks
783            // are user code).
784            for h in to_release {
785                binding.release_handle(h);
786            }
787            // F8: release operator-scratch handles lock-released
788            // (mirrors `ScratchReleaseGuard::drop` ordering).
789            if let Some(mut scratch) = scratch_to_release {
790                scratch.release_handles(&*binding);
791            }
792        }
793    }
794}
795
796// S2b (D225): no `impl Drop for Subscription` and no `Subscription`
797// Send+Sync assertion — the core-level RAII handle is retired. The sole
798// deregister path is the synchronous owner-invoked `Core::unsubscribe`
799// (which delegates to the shared `unsubscribe_sink` free fn below);
800// binding-layer RAII wraps it where the holder co-owns the Core.
801
802/// A subscriber callback. `Send + Sync` so the Core can fire it from any
803/// thread; `Fn` (not `FnMut`) so multiple references coexist — capture
804/// mutable state in `Mutex<T>` or atomics on the binding side.
805// D246/S2c/D248: single-owner ⇒ sinks fire owner-side on the one
806// thread that drives the `Core`; the `Send + Sync` bound was
807// shared-Core-era legacy. Dropped — `Core` (which owns the subscriber
808// map) is consequently `!Send + !Sync`, the actor-model shape (the only
809// cross-thread bridge is `Arc<CoreMailbox>` for id-only timer posts).
810pub type Sink = Arc<dyn Fn(&[Message])>;
811
812// ---------------------------------------------------------------------------
813// PAUSE/RESUME state — §10.2 of the rust-port session doc
814// ---------------------------------------------------------------------------
815
816/// Per-node pause state.
817///
818/// Replaces the four TS fields (`_pauseLocks`, `_pauseBuffer`,
819/// `_pauseDroppedCount`, `_pauseStartNs`) with a single enum where
820/// the buffered fields are unreachable in the [`Self::Active`] variant —
821/// the compiler refuses access. Per §10.2 simplification.
822///
823/// # Invariants
824///
825/// - `Active` ⇔ no lockId held.
826/// - `Paused { locks, .. }` ⇔ `!locks.is_empty()`.
827/// - Buffered messages are tier 3 (DATA/RESOLVED) and tier 4 (INVALIDATE)
828///   only. Other tiers pass through immediately even while paused.
829/// - `dropped` counts messages that fell out the front of `buffer` due to
830///   the Core-global `pause_buffer_cap`; it is reported on resume so callers
831///   can detect overflow without re-tracking it externally.
832#[derive(Debug)]
833pub(crate) enum PauseState {
834    Active,
835    Paused {
836        /// Active lock holders. `SmallVec` keeps the common 1–2 lock case
837        /// stack-allocated. Replaces `Set<unknown>` from TS.
838        locks: SmallVec<[LockId; 2]>,
839        /// Buffered tier-3/tier-4 outgoing messages, in arrival order.
840        /// Replayed on the final RESUME.
841        buffer: VecDeque<Message>,
842        /// Count of messages dropped from the front when `buffer.len()` would
843        /// exceed `pause_buffer_cap`. Cleared on final RESUME (next pause
844        /// cycle starts fresh).
845        dropped: u32,
846        /// Wall-clock-monotonic ns when the lock first transitioned this node
847        /// from `Active` to `Paused`. Used by R1.3.8.c overflow ERROR
848        /// synthesis to compute `lock_held_duration_ms` in the diagnostic
849        /// payload (Slice F, A3 — 2026-05-07).
850        started_at_ns: u64,
851        /// True after the first overflow event in this pause cycle has been
852        /// reported via [`crate::boundary::BindingBoundary::synthesize_pause_overflow_error`].
853        /// Subsequent overflows in the same cycle don't re-emit ERROR
854        /// (canonical R1.3.8.c: "once per overflow event"). Cleared on
855        /// final RESUME (next pause cycle starts fresh).
856        overflow_reported: bool,
857        /// Default-mode bookkeeping (Slice F audit close, 2026-05-07).
858        /// Set to `true` when an upstream dep delivery arrives while this
859        /// node is paused with [`PausableMode::Default`]. On final RESUME,
860        /// if `true`, the node is added back to `pending_fires` so the fn
861        /// fires once with the consolidated dep state. Always `false` for
862        /// `ResumeAll` mode (the buffered messages are the consolidation
863        /// mechanism there). Cleared on final RESUME.
864        pending_wave: bool,
865    },
866}
867
868impl PauseState {
869    pub(crate) fn is_paused(&self) -> bool {
870        matches!(self, Self::Paused { .. })
871    }
872
873    fn lock_count(&self) -> usize {
874        match self {
875            Self::Active => 0,
876            Self::Paused { locks, .. } => locks.len(),
877        }
878    }
879
880    fn contains_lock(&self, lock_id: LockId) -> bool {
881        match self {
882            Self::Active => false,
883            Self::Paused { locks, .. } => locks.contains(&lock_id),
884        }
885    }
886
887    /// Add a lock; transitions Active → Paused on first lock. Idempotent on
888    /// duplicate lock_id (matches TS convention; spec is silent on the case).
889    fn add_lock(&mut self, lock_id: LockId) {
890        match self {
891            Self::Active => {
892                let mut locks = SmallVec::new();
893                locks.push(lock_id);
894                *self = Self::Paused {
895                    locks,
896                    buffer: VecDeque::new(),
897                    dropped: 0,
898                    started_at_ns: monotonic_ns(),
899                    overflow_reported: false,
900                    pending_wave: false,
901                };
902            }
903            Self::Paused { locks, .. } => {
904                if !locks.contains(&lock_id) {
905                    locks.push(lock_id);
906                }
907            }
908        }
909    }
910
911    /// Mark that an upstream dep delivered DATA to a node paused with
912    /// [`PausableMode::Default`]. The node will re-enter `pending_fires`
913    /// on final RESUME via [`Self::take_pending_wave`].
914    pub(crate) fn mark_pending_wave(&mut self) {
915        if let Self::Paused { pending_wave, .. } = self {
916            *pending_wave = true;
917        }
918    }
919
920    /// Read and clear the `pending_wave` flag. Called from
921    /// [`Core::resume`] when transitioning Paused → Active. Returns `true`
922    /// only if the node was paused with `pending_wave` set.
923    pub(crate) fn take_pending_wave(&mut self) -> bool {
924        if let Self::Paused { pending_wave, .. } = self {
925            std::mem::replace(pending_wave, false)
926        } else {
927            false
928        }
929    }
930
931    /// Remove a lock; if the lockset becomes empty, transition Paused →
932    /// Active and return the buffered messages for replay (along with the
933    /// dropped count for diagnostics). Unknown lock_id is an idempotent
934    /// no-op (matches TS, R1.2.6 implicit).
935    fn remove_lock(&mut self, lock_id: LockId) -> Option<(VecDeque<Message>, u32)> {
936        match self {
937            Self::Active => None,
938            Self::Paused { locks, .. } => {
939                if let Some(idx) = locks.iter().position(|l| *l == lock_id) {
940                    locks.swap_remove(idx);
941                }
942                if locks.is_empty() {
943                    let prev = std::mem::replace(self, Self::Active);
944                    if let Self::Paused {
945                        buffer, dropped, ..
946                    } = prev
947                    {
948                        return Some((buffer, dropped));
949                    }
950                }
951                None
952            }
953        }
954    }
955
956    /// Append a message to the buffer; if the buffer would exceed `cap`,
957    /// pop from the front (oldest-first), increment `dropped`, and return
958    /// the dropped messages so the caller can release any payload handles
959    /// they reference. `cap` of `None` means unbounded.
960    ///
961    /// Returns [`PushBufferedResult`] carrying both the dropped messages
962    /// (for refcount release) and whether this push triggered the FIRST
963    /// overflow event in the current pause cycle (for R1.3.8.c ERROR
964    /// synthesis — the caller schedules a single ERROR per cycle).
965    ///
966    /// Note: refcount management for the message's payload handle is the
967    /// caller's responsibility — see [`Core::queue_notify`] for the
968    /// retain/release discipline. The buffer itself is just a message
969    /// container; refcounts cross the binding boundary.
970    pub(crate) fn push_buffered(&mut self, msg: Message, cap: Option<usize>) -> PushBufferedResult {
971        let mut result = PushBufferedResult::default();
972        if let Self::Paused {
973            buffer,
974            dropped,
975            overflow_reported,
976            ..
977        } = self
978        {
979            buffer.push_back(msg);
980            if let Some(c) = cap {
981                while buffer.len() > c {
982                    if let Some(dropped_msg) = buffer.pop_front() {
983                        result.dropped_msgs.push(dropped_msg);
984                    }
985                    *dropped = dropped.saturating_add(1);
986                }
987            }
988            // R1.3.8.c (Slice F, A3): flag first overflow this cycle.
989            if !result.dropped_msgs.is_empty() && !*overflow_reported {
990                *overflow_reported = true;
991                result.first_overflow_this_cycle = true;
992            }
993        }
994        result
995    }
996
997    /// Snapshot the diagnostic for an R1.3.8.c overflow ERROR synthesis.
998    /// Returns `(dropped_count, lock_held_ns)`. Caller must already know
999    /// the configured cap (it's a Core-global value, not per-PauseState).
1000    pub(crate) fn overflow_diagnostic(&self) -> Option<(u32, u64)> {
1001        match self {
1002            Self::Active => None,
1003            Self::Paused {
1004                dropped,
1005                started_at_ns,
1006                ..
1007            } => {
1008                let lock_held_ns = monotonic_ns().saturating_sub(*started_at_ns);
1009                Some((*dropped, lock_held_ns))
1010            }
1011        }
1012    }
1013}
1014
1015/// Return shape for [`PauseState::push_buffered`]. Carries both the dropped
1016/// messages (for refcount release) and an "is this the first overflow this
1017/// cycle" flag (for R1.3.8.c ERROR synthesis scheduling).
1018#[derive(Default)]
1019pub(crate) struct PushBufferedResult {
1020    pub(crate) dropped_msgs: Vec<Message>,
1021    pub(crate) first_overflow_this_cycle: bool,
1022}
1023
1024/// Pending R1.3.8.c overflow ERROR synthesis entry. Recorded by
1025/// [`Core::queue_notify`] when the pause buffer first overflows in a cycle;
1026/// drained at wave-end after the lock-released call to
1027/// [`crate::boundary::BindingBoundary::synthesize_pause_overflow_error`].
1028///
1029/// `configured_max` is captured at scheduling time rather than read at
1030/// drain — the user could change `pause_buffer_cap` between schedule and
1031/// drain, and the diagnostic reads "the cap that was in effect when the
1032/// overflow happened."
1033#[derive(Debug, Clone)]
1034pub(crate) struct PendingPauseOverflow {
1035    pub(crate) node_id: NodeId,
1036    pub(crate) dropped_count: u32,
1037    pub(crate) configured_max: usize,
1038    pub(crate) lock_held_ns: u64,
1039}
1040
1041/// Error returned when a same-thread partition acquire violates
1042/// ascending order. Phase H+ STRICT variant (D115).
1043///
1044/// The ascending-order protocol prevents AB/BA deadlocks between
1045/// threads. When this error surfaces, the caller should defer the
1046/// operation to wave-end (when no partitions are held) and retry.
1047/// **Vestigial (§7, D208–D211, 2026-05-16).** The union-find ascending-
1048/// order acquisition discipline that this represented is deleted:
1049/// scheduling groups are user-declared + static and the wave engine
1050/// acquires the whole touched-group set sorted **upfront**, so there is
1051/// no incremental mid-wave acquisition that could violate ordering.
1052/// This type is **never constructed** post-§7. It is retained only so
1053/// `SubscribeError::PartitionOrderViolation` and the `Err(_)` match arms
1054/// in `graphrefly-operators` compile unchanged (D211 minimal-churn);
1055/// removing the type + those arms is a tracked downstream-churn
1056/// follow-on (`porting-deferred.md`).
1057#[derive(Error, Debug, Clone, PartialEq, Eq)]
1058#[error("vestigial PartitionOrderViolation (never constructed post-§7)")]
1059pub struct PartitionOrderViolation {
1060    /// Vestigial. Unused; retained for struct-shape stability only.
1061    pub attempted: u64,
1062    /// Vestigial. Unused; retained for struct-shape stability only.
1063    pub max_held: u64,
1064}
1065
1066/// Errors returnable by [`Core::try_subscribe`].
1067///
1068/// `Core::subscribe` (the panic-on-error variant) panics on either
1069/// case; `try_subscribe` returns these so operators (zip / concat /
1070/// race / take_until / merge / switch_map / etc.) can match on the
1071/// variant — defer for [`Self::PartitionOrderViolation`], skip the
1072/// source for [`Self::TornDown`].
1073///
1074/// Per canonical spec R2.2.7.a / R2.2.7.b (D118, 2026-05-10).
1075#[derive(Error, Debug, Clone, PartialEq, Eq)]
1076pub enum SubscribeError {
1077    /// Phase H+ STRICT (D115): partition acquisition would violate the
1078    /// ascending-order protocol. Caller should defer the subscribe to
1079    /// wave-end via the producer-pattern deferred-op queue.
1080    #[error(transparent)]
1081    PartitionOrderViolation(#[from] PartitionOrderViolation),
1082
1083    /// R2.2.7.b (D118, 2026-05-10): the node is non-resubscribable AND
1084    /// has terminated (`[COMPLETE]` or `[ERROR, h]` was delivered).
1085    /// The stream is permanently over; subscribe is rejected.
1086    /// Resubscribable terminal nodes do NOT surface this error — they
1087    /// reset to a fresh lifecycle on subscribe per R2.2.7.a, regardless
1088    /// of TEARDOWN state.
1089    #[error(
1090        "subscribe({node:?}): node is non-resubscribable and has terminated; \
1091         the stream is permanently over (R2.2.7.b)"
1092    )]
1093    TornDown {
1094        /// The non-resubscribable terminal node that rejected the subscribe.
1095        node: NodeId,
1096    },
1097}
1098
1099/// A producer-pattern operation deferred because it would have
1100/// violated the ascending partition-order protocol (Phase H+ STRICT,
1101/// D115). Drained by `BatchGuard::drop` after wave_guards are
1102/// released (no partitions held → safe to acquire any partition).
1103///
1104/// Variants with `HandleId` fields hold a binding-side retain taken
1105/// at defer time. The drain path releases this retain after the
1106/// operation fires; the panic-discard path releases it without firing.
1107pub enum DeferredProducerOp {
1108    /// Deferred `Core::emit`. Retain held on `handle`.
1109    Emit { node_id: NodeId, handle: HandleId },
1110    /// Deferred `Core::complete`. No handle.
1111    Complete { node_id: NodeId },
1112    /// Deferred `Core::error`. Retain held on `handle`.
1113    Error { node_id: NodeId, handle: HandleId },
1114    /// Generic deferred callback (e.g., deferred subscribe from
1115    /// producer build closure). The closure captures everything it
1116    /// needs; `graphrefly-core` doesn't depend on operator-specific
1117    /// types. The closure is responsible for its own retain discipline.
1118    Callback(Box<dyn FnOnce() + Send>),
1119}
1120
1121/// Errors returnable by [`Core::pause`] and [`Core::resume`].
1122#[derive(Error, Debug, Clone, PartialEq)]
1123pub enum PauseError {
1124    #[error("pause/resume: unknown node {0:?}")]
1125    UnknownNode(NodeId),
1126}
1127
1128/// Errors returnable by [`Core::up`] (canonical R1.4.1).
1129#[derive(Error, Debug, Clone, PartialEq)]
1130pub enum UpError {
1131    /// Node id is not registered.
1132    #[error("up: unknown node {0:?}")]
1133    UnknownNode(NodeId),
1134    /// Tier-3 (DATA / RESOLVED) and tier-5 (COMPLETE / ERROR) are
1135    /// downstream-only per R1.4.1; rejected at the boundary.
1136    #[error(
1137        "up: tier {tier} is forbidden upstream — value (tier 3) and \
1138         terminal-lifecycle (tier 5) planes are downstream-only per R1.4.1"
1139    )]
1140    TierForbidden { tier: u8 },
1141}
1142
1143/// Errors returnable by [`Core::register`] and its sugar wrappers
1144/// ([`Core::register_state`], [`Core::register_producer`],
1145/// [`Core::register_derived`], [`Core::register_dynamic`],
1146/// [`Core::register_operator`]).
1147///
1148/// Slice H (2026-05-07) promoted these from `assert!`/`panic!` to typed
1149/// errors so that callers can recover from contract violations without
1150/// process abort. Every variant corresponds to a construction-time
1151/// invariant that the caller is responsible for upholding; the dispatcher
1152/// rejects the registration before any reactive state is created (so
1153/// there is no `Message::Error` channel through which to surface the
1154/// failure — these are imperative-layer errors, not reactive ones).
1155///
1156/// All variants are zero-side-effect: when [`Core::register`] returns
1157/// `Err`, no node has been added to the graph and any handle retains
1158/// taken on the way in (e.g. operator scratch seed retains via
1159/// [`BindingBoundary::retain_handle`]) have been released.
1160#[derive(Error, Debug, Clone, PartialEq, Eq)]
1161pub enum RegisterError {
1162    /// One of the supplied dep ids is not a registered node.
1163    #[error("register: unknown dep {0:?}")]
1164    UnknownDep(NodeId),
1165
1166    /// `op` was supplied (operator node) but `deps` was empty. Operator
1167    /// nodes need at least one dep — for subscription-managed combinators
1168    /// with no declared deps, use [`Core::register_producer`] instead.
1169    #[error(
1170        "register: operator nodes require at least one dep — \
1171         use register_producer for subscription-managed combinators"
1172    )]
1173    OperatorWithoutDeps,
1174
1175    /// [`NodeOpts::initial`] was set to a real handle but the registration
1176    /// shape is not a state node (state nodes are `deps.is_empty() &&
1177    /// fn_id.is_none() && op.is_none()`). Initial cache only makes sense
1178    /// for state nodes.
1179    #[error("register: NodeOpts::initial only valid for state nodes (no deps + no fn + no op)")]
1180    InitialOnlyForStateNodes,
1181
1182    /// A supplied dep is terminal (COMPLETE / ERROR) AND not
1183    /// resubscribable. Adding it would create a permanent wedge — the dep
1184    /// will never re-emit, so the registered node would be stuck.
1185    /// Mirrors [`SetDepsError::TerminalDep`] at registration time.
1186    #[error(
1187        "register: dep {0:?} is terminal and not resubscribable; \
1188         mark it resubscribable before terminating, or remove it from the dep list"
1189    )]
1190    TerminalDep(NodeId),
1191
1192    /// A stateful operator ([`OperatorOp::Scan`] / [`OperatorOp::Reduce`])
1193    /// was registered with `seed = NO_HANDLE`. R2.5.3 first-run gate
1194    /// requires the seed to be a real handle so that the operator can
1195    /// emit on its first fire.
1196    #[error("register: operator seed must be a real handle (R2.5.3); got NO_HANDLE")]
1197    OperatorSeedSentinel,
1198}
1199
1200/// Errors returnable by [`Core::set_pausable_mode`].
1201///
1202/// Slice H (2026-05-07) promoted these from `assert!`/`panic!` to typed
1203/// errors. Same imperative-layer error model as [`RegisterError`].
1204#[derive(Error, Debug, Clone, PartialEq, Eq)]
1205pub enum SetPausableModeError {
1206    /// `node_id` is not a registered node.
1207    #[error("set_pausable_mode: unknown node {0:?}")]
1208    UnknownNode(NodeId),
1209    /// The node currently holds at least one pause lock. Changing pausable
1210    /// mode mid-pause would lose buffered content or strand a
1211    /// `pending_wave` flag — resume all locks first.
1212    #[error(
1213        "set_pausable_mode: cannot change pausable mode while paused; \
1214         resume all locks first"
1215    )]
1216    WhilePaused,
1217}
1218
1219/// Per-dep record. Replaces the parallel `deps` / `dep_handles` /
1220/// `dep_terminals` vectors from v1. Canonical spec R2.9.b alignment.
1221///
1222/// Each entry tracks one dep's lifecycle state, wave-scoped batch data,
1223/// and cross-wave `prev_data` for `ctx.prevData` access.
1224pub(crate) struct DepRecord {
1225    /// The dep node this record tracks.
1226    pub(crate) node: NodeId,
1227    /// Last DATA handle from the end of the previous wave. [`NO_HANDLE`]
1228    /// means the dep has never emitted DATA.
1229    pub(crate) prev_data: HandleId,
1230    /// Per-dep dirty flag — awaiting DATA/RESOLVED for current wave.
1231    pub(crate) dirty: bool,
1232    /// Per-dep involved-this-wave flag. Distinguishes:
1233    /// - `involved && data_batch.is_empty()` → dep settled RESOLVED
1234    /// - `!involved && data_batch.is_empty()` → dep was not in this wave
1235    pub(crate) involved_this_wave: bool,
1236    /// DATA handles accumulated this wave. Outside `batch()` scope, at most
1237    /// 1 element. Inside `batch()`, K emits on the source produce K entries
1238    /// per R1.3.6.b coalescing. Each handle holds a `retain_handle` share
1239    /// taken at `deliver_data_to_consumer` time; released at wave-end
1240    /// rotation in `clear_wave_state`.
1241    pub(crate) data_batch: SmallVec<[HandleId; 1]>,
1242    /// Terminal state for this dep. `None` = dep is live.
1243    /// `Some` = dep emitted COMPLETE/ERROR. When ALL entries are Some,
1244    /// the node auto-cascades per Lock 2.B (ERROR dominates COMPLETE).
1245    pub(crate) terminal: Option<TerminalKind>,
1246}
1247
1248impl DepRecord {
1249    fn new(node: NodeId) -> Self {
1250        Self {
1251            node,
1252            prev_data: NO_HANDLE,
1253            dirty: false,
1254            involved_this_wave: false,
1255            data_batch: SmallVec::new(),
1256            terminal: None,
1257        }
1258    }
1259}
1260
1261/// Internal node record. Mirrors `core.ts:132–154` post-D030 unification.
1262///
1263/// **Kind is derived, not stored** (D030, Slice D). `(dep_records.is_empty(),
1264/// fn_id, op, is_dynamic)` uniquely identifies the kind — see [`NodeKind`].
1265/// Helper methods (`is_state()`, `is_producer()`, `is_compute()`,
1266/// `is_operator()`, `skips_auto_cascade()`, `kind()`) cover the common
1267/// predicates without unpacking via [`Core::kind_of`].
1268///
1269/// The 5 bool fields (`has_fired_once`, `dirty`, `involved_this_wave`,
1270/// `has_received_teardown`, `resubscribable`, `is_dynamic`) each represent
1271/// an orthogonal concern. `is_dynamic` is constant per node (set at
1272/// register time); the others are mutable lifecycle state. Collapsing
1273/// them into a bitfield would obscure intent.
1274#[allow(clippy::struct_excessive_bools)]
1275pub(crate) struct NodeRecord {
1276    /// Per-dep records. Replaces the old parallel `deps` / `dep_handles` /
1277    /// `dep_terminals` vecs. Dep NodeIds derived via `dep_ids()`.
1278    pub(crate) dep_records: Vec<DepRecord>,
1279    /// User-fn id for closure-form dispatch. `Some` for Derived / Dynamic /
1280    /// Producer; `None` for State / Operator. (Operator dispatch goes via
1281    /// [`Self::op`] instead.)
1282    pub(crate) fn_id: Option<FnId>,
1283    /// Operator discriminant for typed-op dispatch. `Some` for Operator
1284    /// nodes; `None` otherwise. Mutually exclusive with `fn_id` (a node is
1285    /// either closure-form OR typed-op, never both).
1286    pub(crate) op: Option<OperatorOp>,
1287    /// True for Dynamic nodes (R2.5.3 — fn declares actually-tracked dep
1288    /// indices per fire). False for everything else. Only meaningful when
1289    /// `fn_id.is_some()` AND `!dep_records.is_empty()`.
1290    pub(crate) is_dynamic: bool,
1291    pub(crate) equals: EqualsMode,
1292
1293    // Mutable state
1294    pub(crate) cache: HandleId,
1295    pub(crate) has_fired_once: bool,
1296    pub(crate) subscribers: HashMap<SubscriptionId, Sink>,
1297    /// Monotonic counter bumped on every mutation of [`Self::subscribers`]
1298    /// (insert on subscribe, remove on `Subscription::Drop`, remove on
1299    /// handshake-panic cleanup). Used by
1300    /// [`crate::batch::Core::queue_notify`] to detect mid-wave subscriber-
1301    /// set changes and start a fresh `PendingBatch` with an updated sink
1302    /// snapshot — closes D2 (Slice X4, 2026-05-08): the late-subscriber
1303    /// and multi-emit-per-wave gap where the pre-fix per-node single
1304    /// snapshot meant a sub installed between two emits to the same node
1305    /// in one wave was invisible to the second emit's flush.
1306    ///
1307    /// Per-node (not per-Core) so that a subscribe to node A doesn't
1308    /// invalidate snapshot reuse for node B's pending batch in the same
1309    /// wave.
1310    pub(crate) subscribers_revision: u64,
1311    /// For dynamic nodes: which dep indices fn actually tracks.
1312    /// For static derived: all indices, populated at construction.
1313    pub(crate) tracked: HashSet<usize>,
1314
1315    // Wave-scoped state — cleared at wave end.
1316    pub(crate) dirty: bool,
1317    pub(crate) involved_this_wave: bool,
1318
1319    /// Per-node pause state. Default `Active`. See [`PauseState`].
1320    pub(crate) pause_state: PauseState,
1321    /// Pause behavior mode (canonical-spec §2.6). Set at registration via
1322    /// [`NodeOpts::pausable`]. Default [`PausableMode::Default`] suppresses
1323    /// fn-fire while paused and consolidates N pause-window dep deliveries
1324    /// into one fn-fire on RESUME; `ResumeAll` buffers tier-3/4 outgoing
1325    /// for verbatim replay; `Off` ignores PAUSE entirely. See
1326    /// [`PausableMode`].
1327    pub(crate) pausable: PausableMode,
1328    /// Replay buffer cap (R2.6.5 / Lock 6.G — Slice E1, 2026-05-07).
1329    /// `None` disables; `Some(N)` keeps a circular VecDeque of the last N
1330    /// DATA-handle emissions for late-subscriber replay. Each handle in
1331    /// the buffer owns one binding-side retain share, released on evict
1332    /// (cap exceeded) or in `Drop for CoreState`.
1333    pub(crate) replay_buffer_cap: Option<usize>,
1334    pub(crate) replay_buffer: VecDeque<HandleId>,
1335
1336    /// Terminal lifecycle state for THIS node's outgoing stream. Once set,
1337    /// further `emit` calls are silent no-ops, fn no longer fires, and only
1338    /// the terminal message has been queued downstream.
1339    pub(crate) terminal: Option<TerminalKind>,
1340    /// True after the first TEARDOWN has been processed for this node
1341    /// (R2.6.4 / Lock 6.F). Subsequent TEARDOWN deliveries are idempotent
1342    /// — the auto-prepended COMPLETE only fires on the first one. Without
1343    /// this flag, a redundant TEARDOWN delivered via the cascade plus an
1344    /// explicit `core.teardown(node)` would re-emit `[COMPLETE, TEARDOWN]`
1345    /// to subscribers per delivery, which is incorrect.
1346    pub(crate) has_received_teardown: bool,
1347    /// Per R2.2.7 / R2.5.3 — resubscribable terminal lifecycle.
1348    /// When `true` AND `terminal == Some(...)`, a fresh subscribe call
1349    /// will reset the node: clear `terminal`, `has_fired_once`,
1350    /// `has_received_teardown`, all dep_records to sentinel, and drain the
1351    /// pause lockset. Default `false`.
1352    pub(crate) resubscribable: bool,
1353    /// Meta companion nodes attached to this node per R1.3.9.d. When this
1354    /// node tears down, its meta companions are torn down FIRST (before
1355    /// the main node's auto-COMPLETE + TEARDOWN wire emission), so
1356    /// observers see companions terminate before the parent. The ordering
1357    /// is load-bearing — meta nodes typically subscribe to parent state
1358    /// that becomes inconsistent during the parent's destruction phase.
1359    pub(crate) meta_companions: Vec<NodeId>,
1360    /// R5.4 / D011 partial-mode: when `true`, fire_fn skips the R2.5.3
1361    /// first-run gate — the node fires as soon as ANY dep delivers a
1362    /// real handle, even if other deps remain sentinel. Defaults to
1363    /// `false` (gated). Lifted into Core for operator support; for
1364    /// State/Derived/Dynamic nodes the field is settable but the gated
1365    /// path remains the typical caller default.
1366    pub(crate) partial: bool,
1367    /// D263 — when `true`, `fire_fn`'s first-run gate (R2.5.3) counts a
1368    /// terminal dep as "real input" so the node can fire on a
1369    /// COMPLETE-without-DATA from any single dep. Mirrors `fire_operator`'s
1370    /// unconditional `dr.terminal.is_some()` clause (gated here on the
1371    /// per-node opt-in so existing `fire_fn` consumers preserve the
1372    /// historical sentinel-hold behaviour by default). Substrate-internal
1373    /// per D263 — not surfaced on the `Impl` parity contract yet.
1374    pub(crate) terminal_as_real_input: bool,
1375    /// Topological rank — 1 + max dep rank. Nodes with no deps have rank 0.
1376    /// Used by `pick_next_fire` for O(|pending_fires|) scheduling instead
1377    /// of O(V) transitive BFS. Computed at registration; recomputed on
1378    /// `set_deps` for the modified node (consumer propagation deferred —
1379    /// see `porting-deferred.md`). §10 perf optimization (D047, Slice U).
1380    pub(crate) topo_rank: u32,
1381    /// Bitmask for first-run gate — bit i set when dep i delivers first
1382    /// DATA. `has_sentinel_deps()` becomes a single integer compare for
1383    /// ≤64 deps. Falls back to `iter().any()` when `dep_count > 64`.
1384    /// Reset on resubscribe. §10.13 perf optimization (D047, Slice U).
1385    pub(crate) received_mask: u64,
1386    /// §10.3 diamond resolution bitmask — bit i set when dep i is
1387    /// involved in the current wave (DATA delivered). Mirrors
1388    /// `received_mask` pattern. Cleared to 0 at wave end instead of
1389    /// iterating all deps. For ≤64 deps, `DepBatch::involved` can be
1390    /// derived via `(involved_mask >> dep_idx) & 1 != 0`. For >64 deps,
1391    /// falls back to per-dep `DepRecord::involved_this_wave` field.
1392    /// §10.3 perf optimization (Slice V1).
1393    pub(crate) involved_mask: u64,
1394    /// Generic per-operator scratch slot (Slice C-3, D026). Replaces
1395    /// the typed `operator_state: HandleId` field used by Slices C-1 / C-2.
1396    /// `None` for non-operator kinds and operators with no cross-wave
1397    /// state (Map / Filter / Combine / WithLatestFrom / Merge); `Some`
1398    /// for stateful operators ([`OperatorOp::Scan`] / [`Reduce`] /
1399    /// [`DistinctUntilChanged`] / [`Pairwise`] / [`Take`] / [`Skip`] /
1400    /// [`TakeWhile`] / [`Last`]).
1401    ///
1402    /// The boxed value implements
1403    /// [`OperatorScratch`](crate::op_state::OperatorScratch); its
1404    /// `release_handles` method is called from
1405    /// [`reset_for_fresh_lifecycle`] (resubscribable terminal cycle) and
1406    /// from [`Drop for CoreState`].
1407    ///
1408    /// **Refcount discipline:** the state struct owns whatever handle
1409    /// shares it stores (e.g., [`ScanState::acc`](crate::op_state::ScanState::acc),
1410    /// [`LastState::latest`](crate::op_state::LastState::latest)).
1411    /// Per-fire helpers retain the new value before releasing the old;
1412    /// `release_handles` releases the current shares at end-of-life.
1413    pub(crate) op_scratch: Option<Box<dyn crate::op_state::OperatorScratch>>,
1414}
1415
1416impl NodeRecord {
1417    // ---- Kind predicates (D030 — derived from field shape) ----
1418
1419    /// True iff this is a state node (no deps, no fn, no op).
1420    pub(crate) fn is_state(&self) -> bool {
1421        self.dep_records.is_empty() && self.fn_id.is_none() && self.op.is_none()
1422    }
1423
1424    /// True iff this is a producer node (no deps + has fn + no op).
1425    /// Producers fire fn once on first subscribe; cleanup fires via
1426    /// [`BindingBoundary::producer_deactivate`] (D031, Slice D).
1427    pub(crate) fn is_producer(&self) -> bool {
1428        self.dep_records.is_empty() && self.fn_id.is_some() && self.op.is_none()
1429    }
1430
1431    /// True iff this is a compute node (Derived / Dynamic / Operator) —
1432    /// has at least one dep AND either a fn or an op.
1433    #[allow(dead_code)] // Convenience predicate; callers may use is_state/is_producer instead.
1434    pub(crate) fn is_compute(&self) -> bool {
1435        !self.dep_records.is_empty() && (self.fn_id.is_some() || self.op.is_some())
1436    }
1437
1438    /// True iff this is an Operator node (has op set).
1439    #[allow(dead_code)] // Direct `op.is_some()` is more common; this is a readability sugar.
1440    pub(crate) fn is_operator(&self) -> bool {
1441        self.op.is_some()
1442    }
1443
1444    /// True iff this node opts OUT of Lock 2.B auto-cascade —
1445    /// Operator(Reduce) / Operator(Last) intercept upstream COMPLETE.
1446    ///
1447    /// D263: a non-operator node with `terminal_as_real_input == true`
1448    /// ALSO skips auto-cascade so `fire_fn` runs on terminal-only deps —
1449    /// mirrors the Reduce-class operator dispatch that already counts
1450    /// terminal as real input. Note: `partial == true` ALONE does NOT
1451    /// skip cascade — canonical-spec §5.4 / R5.4 locks `partial` to
1452    /// first-run-gate-bypass ONLY. To get fire-on-COMPLETE in partial
1453    /// mode, set BOTH flags (or wrap in a Reduce-class operator).
1454    pub(crate) fn skips_auto_cascade(&self) -> bool {
1455        match self.op {
1456            Some(op) => NodeKind::Operator(op).skips_auto_cascade(),
1457            None => self.terminal_as_real_input,
1458        }
1459    }
1460
1461    /// Compute the public-API [`NodeKind`] from the field shape (D030).
1462    /// Used by [`Core::kind_of`] and rare internal sites that need the
1463    /// enum (most use the predicate methods above).
1464    pub(crate) fn kind(&self) -> NodeKind {
1465        if let Some(op) = self.op {
1466            NodeKind::Operator(op)
1467        } else if self.dep_records.is_empty() {
1468            if self.fn_id.is_some() {
1469                NodeKind::Producer
1470            } else {
1471                NodeKind::State
1472            }
1473        } else if self.is_dynamic {
1474            NodeKind::Dynamic
1475        } else {
1476            NodeKind::Derived
1477        }
1478    }
1479
1480    // ---- Existing accessors ----
1481
1482    /// Iterator over dep NodeIds in declaration order.
1483    pub(crate) fn dep_ids(&self) -> impl Iterator<Item = NodeId> + '_ {
1484        self.dep_records.iter().map(|r| r.node)
1485    }
1486
1487    /// Collected dep NodeIds — for call sites that need a `Vec<NodeId>`.
1488    pub(crate) fn dep_ids_vec(&self) -> Vec<NodeId> {
1489        self.dep_ids().collect()
1490    }
1491
1492    /// True if any dep is in sentinel state (never emitted DATA and no
1493    /// data this wave). Replaces the old `dep_handles.contains(&NO_HANDLE)`.
1494    pub(crate) fn has_sentinel_deps(&self) -> bool {
1495        let n = self.dep_records.len();
1496        if n == 0 {
1497            return false;
1498        }
1499        if n <= 64 {
1500            // O(1): check if all bits [0..n) are set.
1501            let full_mask = if n == 64 { u64::MAX } else { (1u64 << n) - 1 };
1502            self.received_mask != full_mask
1503        } else {
1504            // Fallback for >64 deps (extremely rare).
1505            self.dep_records
1506                .iter()
1507                .any(|r| r.prev_data == NO_HANDLE && r.data_batch.is_empty())
1508        }
1509    }
1510
1511    /// Find the index of a dep by NodeId.
1512    pub(crate) fn dep_index_of(&self, dep_id: NodeId) -> Option<usize> {
1513        self.dep_records.iter().position(|r| r.node == dep_id)
1514    }
1515
1516    /// True if ALL dep terminal slots are populated (Lock 2.B cascade check).
1517    pub(crate) fn all_deps_terminal(&self) -> bool {
1518        !self.dep_records.is_empty() && self.dep_records.iter().all(|r| r.terminal.is_some())
1519    }
1520}
1521
1522// Q-beyond Sub-slice 1 (D108, 2026-05-09): `CrossPartitionState` removed.
1523//
1524// The four wave-scoped fields previously held under
1525// `Core::cross_partition: Arc<parking_lot::Mutex<CrossPartitionState>>`
1526// (Q2, 2026-05-09) moved to a per-thread `WaveState` thread_local in
1527// [`crate::batch`]. The bench-driven rationale is documented at
1528// [`crate::batch::WaveState`]: cross-thread cache-line bouncing on the
1529// `cross_partition` mutex was the dominant cost in Phase J's regression,
1530// not single-thread mutex hop count. Per-thread placement eliminates the
1531// bounce point entirely.
1532//
1533// **Refcount discipline preserved.** `wave_cache_snapshots` and
1534// `deferred_handle_releases` still hold binding-side handle retains;
1535// outermost `BatchGuard::drop` drains them via `Core::binding` on both
1536// success and panic paths (the binding ref the prior
1537// `CrossPartitionState` held for its `Drop` impl is no longer needed —
1538// `BatchGuard` already holds a `Core` clone with the binding).
1539
1540/// Core-global state with NO per-scheduling-group partition
1541/// (Slice B-2 Step 1, D220-EXEC). Hoisted out of [`CoreState`]
1542/// so that when Step 2 shards `nodes`/`children` per `ShardKey`, THESE
1543/// remain singular: monotonic id counters, the topology-sink registry,
1544/// the cross-shard-visible `currently_firing` reentrancy stack (P13 /
1545/// /qa F2 — the load-bearing cross-thread-visibility property), the two
1546/// Core-global caps, and the Core-shutdown scratch-release queue.
1547///
1548/// **Step 1 (this stage): behaviour-identical.** A plain sub-struct of
1549/// [`CoreState`] under the SAME single cell/lock — `s.shared.<f>` is
1550/// exactly the old `s.<f>`. Step 2 reshapes [`crate::state_cell`] to
1551/// hold one `CoreShared` + a per-`ShardKey` shard map; pure-shared ops
1552/// then take only the shared guard (`with_shared`), shard ops the shard
1553/// guard (`with_shard`), and when both are needed the deadlock-free
1554/// rule is **shard guard OUTER, shared guard INNER** (D220-EXEC).
1555// `pub` (not `pub(crate)`): appears in the public `StateCell` trait
1556// surface (`from_parts` / `lock_shared`), same as `CoreState`. The
1557// struct *name* is public but every field is `pub(crate)`, so it is an
1558// opaque token outside the crate — no internal state is reachable.
1559pub struct CoreShared {
1560    pub(crate) next_node_id: u64,
1561    pub(crate) next_subscription_id: u64,
1562    pub(crate) next_lock_id: u64,
1563    /// Topology-change sinks. Keyed by subscription id for O(1) removal.
1564    pub(crate) topology_sinks: HashMap<u64, crate::topology::TopologySink>,
1565    pub(crate) next_topology_id: u64,
1566    /// Core-global cap on per-node pause replay buffer length. `None` means
1567    /// unbounded. Per the user direction (Q1, 2026-05-05): start core-global;
1568    /// per-node override can be added later as a pure addition without API
1569    /// breakage. Default `None`.
1570    pub(crate) pause_buffer_cap: Option<usize>,
1571    /// Core-global cap on wave-drain iterations before
1572    /// [`crate::batch::Core::drain_and_flush`] aborts with a diagnostic panic.
1573    /// Replaces the prior `MAX_DRAIN_ITERATIONS` hard-coded constant
1574    /// (R4.3 / Lock 2.F′). Default `10_000`.
1575    ///
1576    /// The drain loop bound exists to surface runtime cycles
1577    /// (e.g. an operator that re-arms its own `pending_fires` slot during
1578    /// `invoke_fn`) as a panic with context, rather than letting Core
1579    /// spin forever. Structural cycles via [`Core::set_deps`] are
1580    /// rejected at edge-mutation time (`SetDepsError::WouldCreateCycle`);
1581    /// registration is structurally cycle-safe by construction (the new
1582    /// node's id is not allocated until AFTER deps are validated, so deps
1583    /// cannot transitively reach the new node). The drain bound is the
1584    /// safety net for runtime cycles that bypass both static checks.
1585    pub(crate) max_batch_drain_iterations: u32,
1586    /// A6 reentrancy guard stack (Slice F, 2026-05-07): the stack of
1587    /// NodeIds whose fn is currently being invoked. Pushed at the top of
1588    /// `fire_fn` (just before the lock-released `invoke_fn` call) and
1589    /// popped on return / unwind via the [`crate::batch::FiringGuard`]
1590    /// RAII helper. [`Core::set_deps`] consults this set and rejects
1591    /// with [`SetDepsError::ReentrantOnFiringNode`] if `n` is currently
1592    /// firing — preventing the D1 `tracked` index corruption. Read by
1593    /// the P13 partition-migration check (D091) to reject mid-fire
1594    /// `set_deps` that would migrate a firing node's partition.
1595    ///
1596    /// **Core-global / cross-shard-visible.** /qa F2 reverted
1597    /// (2026-05-10): briefly placed on per-thread `WaveState`, then
1598    /// moved BACK after /qa F2 surfaced the cross-thread P13 bypass
1599    /// (per-thread placement made Thread B's `set_deps` read its own
1600    /// empty stack → P13 silently bypassed for cross-thread `set_deps`
1601    /// during Thread A's lock-released `invoke_fn`). Cross-thread
1602    /// visibility is the load-bearing property D091 requires; keeping it
1603    /// in `CoreShared` (NOT a per-`ShardKey` shard) preserves it across
1604    /// the Slice B-2 shard split (D220-EXEC / /qa F2 / D214 P13).
1605    ///
1606    /// Membership semantics (NOT strict LIFO): consumed via
1607    /// `contains(&n)` membership test. `FiringGuard::drop` pops the
1608    /// right-most matching `node_id` via `rposition` + `swap_remove`;
1609    /// physical order of remaining entries may not match construction
1610    /// order, but membership is preserved.
1611    pub(crate) currently_firing: Vec<NodeId>,
1612    /// D-α (D028 full close, 2026-05-10): per-Core defer queue for old
1613    /// operator-scratch boxes pushed by Phase G on resubscribable nodes.
1614    /// Phase G builds a fresh scratch via [`Core::make_op_scratch`] and
1615    /// installs it on the node's `op_scratch` slot (so re-activation
1616    /// sees fresh counters / a fresh seed-share); the OLD scratch's
1617    /// handle retains are deferred to one of two drain points to
1618    /// preserve the Slice C-3 /qa P1 retain-before-release invariant
1619    /// (the C-3 test
1620    /// `scan_resubscribable_reset_with_seed_aliasing_acc_does_not_collapse_registry`
1621    /// fails if the old `acc` share is released before the fresh seed
1622    /// share is taken — when `acc == seed` interns to the same registry
1623    /// slot, eager release drops the slot to zero before the fresh
1624    /// retain bumps it back up).
1625    ///
1626    /// Drain points:
1627    /// 1. [`Core::reset_for_fresh_lifecycle`] — after Phase 2 takes
1628    ///    fresh retains on the new seed/default, the queue drain
1629    ///    releases queued boxes whose handles may have aliased.
1630    /// 2. [`Drop for CoreState`] — catch-all on Core shutdown.
1631    ///
1632    /// Note that the queue lives on `CoreShared` within `CoreState` (not
1633    /// on `Core`) so `Drop for CoreState` has access to both the binding
1634    /// and the queue under the single state lock — no separate mutex
1635    /// needed.
1636    ///
1637    /// **Growth bound (/qa m17, 2026-05-10):** size is bounded by the
1638    /// number of non-terminal deactivate-reactivate cycles since the
1639    /// last terminal-then-resubscribe reset on any resubscribable +
1640    /// has-op node in this Core. Each entry is a `Box<dyn OperatorScratch>`
1641    /// holding O(1) handles (Scan/Reduce/Last: 1 handle; Take/Skip/
1642    /// TakeWhile/DistinctUntilChanged/Pairwise: 0 or 1 handle). Typical
1643    /// workloads: O(few KB). Degenerate workloads (long-lived Cores
1644    /// with frequent deactivate-reactivate cycles and no terminal
1645    /// resets) should call `core.complete()` / `core.error()` on the
1646    /// op node periodically to trigger queue drain via
1647    /// `reset_for_fresh_lifecycle`. The release happens unconditionally
1648    /// on Core drop, so this is not a leak — just a deferred-release
1649    /// growth concern under unusual workloads.
1650    pub(crate) pending_scratch_release: Vec<Box<dyn crate::op_state::OperatorScratch>>,
1651    /// Binding handle, held here so [`Drop for CoreShared`] can drain
1652    /// `pending_scratch_release` on Core shutdown (Step 2a, D220-EXEC:
1653    /// `CoreShared` is hoisted to its own region, separate from any
1654    /// shard's `CoreState`, so the old `Drop for CoreState` scratch-drain
1655    /// — which used `CoreState::binding` — moves here). Cheap `Arc`
1656    /// clone; `Core` / each shard `CoreState` also hold one.
1657    pub(crate) binding: Arc<dyn BindingBoundary>,
1658}
1659
1660impl Drop for CoreShared {
1661    fn drop(&mut self) {
1662        // D-α (D028) catch-all drain of the deferred operator-scratch
1663        // queue on Core shutdown (moved from `Drop for CoreState`'s tail
1664        // with the `pending_scratch_release` + `binding` hoist). Release
1665        // BEFORE the `Vec<Box<…>>` drops (each box's `Drop` is a plain
1666        // field drop — HandleIds are raw u64s, no binding re-entry).
1667        let queued: Vec<Box<dyn crate::op_state::OperatorScratch>> =
1668            std::mem::take(&mut self.pending_scratch_release);
1669        for mut scratch in queued {
1670            scratch.release_handles(&*self.binding);
1671        }
1672    }
1673}
1674
1675/// Per-shard mutable Core state (Step 2a, D220-EXEC: still exactly ONE
1676/// shard — behaviour-identical with the pre-B-2 single `CoreState`;
1677/// Step 2b keys a per-[`crate::state_cell::ShardKey`] map of these).
1678/// All mutable Core state, behind one [`parking_lot::Mutex`].
1679///
1680/// **Architecture history.** Q2 (2026-05-09) split four wave-scoped
1681/// cross-partition aggregation fields out into a separate
1682/// `parking_lot::Mutex<CrossPartitionState>` on [`Core`]. Q-beyond
1683/// Sub-slice 1 (D108, 2026-05-09) eliminated `CrossPartitionState`
1684/// entirely — its four fields moved to per-thread `WaveState`
1685/// thread_local in `crate::batch`. Sub-slice 2 + 3 moved 8 more
1686/// wave-scoped fields the same way. /qa F1+F2 (2026-05-10) reverted
1687/// `in_tick` and `currently_firing` to CoreState; `currently_firing`
1688/// stays here (cross-thread P13 set_deps check, /qa F2), but `in_tick`
1689/// was re-keyed per-(Core, thread) into the
1690/// `crate::batch::IN_TICK_OWNED` thread_local (D047, 2026-05-15) and
1691/// then collapsed to a one-Core-per-OS-thread `Cell<u64>` slot
1692/// (D252, S5, 2026-05-19) — the actor-model invariant locks
1693/// "one Core per OS thread" so a single-generation slot suffices,
1694/// with cross-Core same-thread nesting panicking fail-loud at
1695/// `BatchGuard::claim_in_tick`. See `docs/rust-port-decisions.md`.
1696///
1697/// The D1 patch (2026-05-09) moved Slice G's `tier3_emitted_this_wave`
1698/// set to a per-thread thread-local in `crate::batch` (was briefly
1699/// per-partition under Q3 v1; that placement was vulnerable to mid-wave
1700/// cross-thread `set_deps` partition splits — see
1701/// `docs/porting-deferred.md` "Per-partition state-shard refactor"
1702/// closing summary). Q-beyond
1703/// will continue the shape decomposition by sharding most of the
1704/// remaining fields per-partition.
1705// `pub` (not `pub(crate)`): appears in the public `StateCell` trait surface
1706// (`Core<C: StateCell>` is public and `SingleThreadCell`/`LockedCell` are
1707// exported). The struct *name* is public but every field stays `pub(crate)`,
1708// so it is an opaque token outside the crate — no internal state is reachable.
1709pub struct CoreState {
1710    pub(crate) nodes: HashMap<NodeId, NodeRecord>,
1711    /// Inverted adjacency: `parent → children`. Updated on registration.
1712    pub(crate) children: HashMap<NodeId, HashSet<NodeId>>,
1713    /// Binding-boundary handle for `Drop`-time refcount balancing.
1714    /// `Core` also holds a clone of this Arc; storing it here lets
1715    /// `Drop for CoreState` walk every retained slot and release the
1716    /// binding-side share when the last `Core` clone drops. Without this,
1717    /// `cache` / `terminal` / `dep_terminals` Error / pause-buffer payload
1718    /// handle refs leak in the binding registry until process exit.
1719    pub(crate) binding: Arc<dyn BindingBoundary>,
1720    // Wave-scoped state lives off `CoreState`:
1721    // - `pending_fires` / `pending_notify` / `deferred_flush_jobs` /
1722    //   `deferred_cleanup_hooks` / `pending_wipes` /
1723    //   `invalidate_hooks_fired_this_wave` / `wave_cache_snapshots` /
1724    //   `pending_auto_resolve` / `pending_pause_overflow` →
1725    //   per-thread [`crate::batch::WaveState`] (D108 Sub-slices 1–3).
1726    // - `in_tick` → one-Core-per-OS-thread [`crate::batch::IN_TICK_OWNED`]
1727    //   (D252; cross-Core same-thread nesting panics fail-loud).
1728    // - tier3-emitted-this-wave → per-thread
1729    //   [`crate::batch::TIER3_EMITTED_THIS_WAVE`] (Slice G / D1).
1730    // Core-global non-shard state (`next_*`, `topology_sinks`,
1731    // `currently_firing` [cross-shard-visible /qa F2], the two caps,
1732    // `pending_scratch_release`) lives on [`CoreShared`] (Slice B-2
1733    // Step 1, D220-EXEC) — see its doc for each field's rationale.
1734}
1735
1736// # Wave execution = one uninterrupted owner-side drain (S4, D246/D248/D249)
1737//
1738// The state lock (`state: RefCell<CoreState>`) is **dropped** around
1739// binding callbacks (`invoke_fn`, `custom_equals`) so user fns may
1740// re-enter Core. The original M1 design serialized WAVE EXECUTION
1741// across threads with a `wave_owner` `parking_lot::ReentrantMutex` held
1742// for the wave; S2c/D248 deleted it. `Core` is now single-owner
1743// `!Send + !Sync`: exactly one owner thread ever drives a given `Core`,
1744// so a wave is one uninterrupted owner-side drain. There is no
1745// cross-thread emitter to block and no wave-ownership mutex.
1746//
1747// - Same-thread / same-Core re-entrance (a user fn or in-wave sink
1748//   re-entering via the owner-side mailbox/`DeferQueue` seam) runs as a
1749//   nested wave: the inner `run_wave` sees `in_tick = true` (the
1750//   one-Core-per-OS-thread `batch::IN_TICK_OWNED` slot holds this
1751//   Core's `generation`, D252) and skips its own drain — the outer
1752//   drain picks the queued op up to quiescence.
1753// - Cross-`Core` concurrency is host-native: independent per-worker
1754//   Cores (actor model). The only cross-thread bridge into a `!Send`
1755//   Core is the id-only `Arc<CoreMailbox>` (timer / producer `post_*` +
1756//   the `runnable` wake bit), drained owner-side.
1757//
1758// The happens-after contract (`emit` returning ⇒ subscribers observed)
1759// holds structurally: the single owner thread runs the emit's drain to
1760// quiescence before returning; a mid-wave subscribe is frozen against
1761// double-delivery by the `subscribers_revision` `PendingBatch` snapshot
1762// (Slice X4/D2), not by a cross-thread lock.
1763
1764/// Monotonic generation counter for `Core` instances. Used by the
1765/// per-thread `PARTITION_CACHE` in `batch.rs` to distinguish Core
1766/// instances without relying on `Arc::as_ptr` (which can be reused by
1767/// the allocator after a Core is dropped — ABA hazard). One atomic
1768/// increment per `Core::new`; negligible cost.
1769static CORE_GENERATION: AtomicU64 = AtomicU64::new(1);
1770
1771/// The handle-protocol Core dispatcher.
1772///
1773/// Holds an [`Arc`] to the [`BindingBoundary`] and all dispatch state. Cheap
1774/// to clone (the inner `Arc<Mutex<CoreState>>` is shared); pass `Core` by
1775/// value to threads.
1776pub struct Core {
1777    /// Core-global region (id counters, topology-sink registry,
1778    /// `currently_firing`, the two caps, the scratch-release queue).
1779    /// D246/S2c: single-owner ⇒ a plain
1780    /// `RefCell` (the `parking_lot`/`StateCell`-generic split was
1781    /// shared-Core-era legacy; the actor model drives a `Core` from
1782    /// exactly one thread). D248 relaxed the substrate `Sink`/
1783    /// `TopologySink` off `Send + Sync`, so `CoreState` owns `!Send`
1784    /// sinks ⇒ **`Core` is `!Send + !Sync`** (one owner thread, never
1785    /// relocated cross-thread; the only cross-thread bridge is the
1786    /// id-only `Arc<CoreMailbox>`). This is the actor-model shape
1787    /// (D221/D223/D248); cross-`Core` parallelism = independent
1788    /// per-worker Cores. A re-entrant double-borrow panics loudly — a
1789    /// dispatcher bug (a missing lock-released bracket around a binding
1790    /// callback), not a user error.
1791    pub(crate) shared: std::cell::RefCell<CoreShared>,
1792    /// The node/children/binding region (was the sharded `CoreState`;
1793    /// single shard always under single-owner — D246/S2c collapse).
1794    pub(crate) state: std::cell::RefCell<CoreState>,
1795    /// `Send + Sync` bridge for autonomous async producers (timer tasks)
1796    /// that can no longer hold `&Core` (D223/D227/D230). Timer tasks
1797    /// post `(node, handle)`; [`Self::drain_mailbox`] applies them
1798    /// owner-side via the sync `emit`. Owns the per-group `runnable` wake
1799    /// bit (S4 wires it; M6 reads it from the host executor).
1800    pub(crate) mailbox: Arc<crate::mailbox::CoreMailbox>,
1801    /// Owner-only deferred-closure queue (D249/S2c). Holds the `!Send`
1802    /// `Defer` closures split off `CoreMailbox` (which stays
1803    /// `Send + Sync` for the timer bridge). `Rc` (not `Arc`): shared
1804    /// with `graphrefly-operators`' `ProducerEmitter` on the **one**
1805    /// owner thread; never crosses threads. Drained owner-side by
1806    /// [`Self::drain_mailbox`] + the in-wave `BatchGuard` drain.
1807    pub(crate) deferred: std::rc::Rc<crate::mailbox::DeferQueue>,
1808    pub(crate) binding: Arc<dyn BindingBoundary>,
1809    /// Unique generation ID for this Core instance. Assigned from
1810    /// [`CORE_GENERATION`] at construction; nonzero (the counter starts
1811    /// at 1 so `0` can serve as the [`crate::batch::IN_TICK_OWNED`]
1812    /// "no active Core" sentinel under D252). Stored in the one-Core-
1813    /// per-OS-thread `IN_TICK_OWNED` slot while this Core owns the
1814    /// thread's wave; cross-Core same-thread nesting panics fail-loud
1815    /// at `BatchGuard::claim_in_tick`.
1816    pub(crate) generation: u64,
1817}
1818
1819// S2b (D223): `Core` is **not** `Clone` and has **no** `WeakCore`. Under
1820// the actor / work-stealing model a `Core` owns its state cell by value
1821// and is moved between workers, never shared — so the old `Arc<C>`-shared
1822// `Clone` and the `Weak<C>`-back-ref `WeakCore` (used to break the
1823// BenchBinding→registry→closure→strong-Core cycle for long-lived
1824// binding-stored closures / timer tasks) are deleted. Autonomous async
1825// producers (timer tasks) now reach the `Core` via the `Send + Sync`
1826// [`crate::mailbox::CoreMailbox`] instead of upgrading a `Weak<C>`
1827// (D227/D230). Identity (`same_dispatcher`) is the unique per-`Core`
1828// `generation` counter, not `Arc::ptr_eq`.
1829
1830impl Drop for Core {
1831    fn drop(&mut self) {
1832        // Tell any in-flight timer tasks the Core is gone so they release
1833        // their pending handle and bail instead of leaking it into a
1834        // mailbox no one will drain (mirrors the old
1835        // `WeakCore::upgrade() == None` teardown branch in `timer.rs`).
1836        // `close()` is mutually exclusive with `post_op` (shared `ops`
1837        // lock), so after it returns no new op can enqueue.
1838        self.mailbox.close();
1839        // D249/S2c: close the owner-side defer queue too — its queued
1840        // closures are dropped unrun (running `CoreFull` on a
1841        // half-dropped `Core` is unsound — user-locked QA decision A).
1842        // No handle-release contract: `DeferFn`s capture no bare
1843        // retained `HandleId` (D235 P8 pattern).
1844        self.deferred.close();
1845        // QA F-A / Blind #2 (2026-05-18): an op posted just before
1846        // `close()` won the lock is now stranded in the queue with its
1847        // retained `HandleId`. Drain the remainder and release those
1848        // handles — without this `Drop` regresses the exact
1849        // BenchCore-teardown leak the deleted `WeakCore` path prevented.
1850        for op in self.mailbox.take_all() {
1851            match op {
1852                crate::mailbox::MailboxOp::Emit(_, h) | crate::mailbox::MailboxOp::Error(_, h) => {
1853                    self.binding.release_handle(h);
1854                }
1855                crate::mailbox::MailboxOp::Complete(_) | crate::mailbox::MailboxOp::Defer(_) => {}
1856            }
1857        }
1858    }
1859}
1860
1861/// Object-safe full-`Core` re-entry surface (S2b / D233) — the methods a
1862/// producer sink's owner-side [`crate::mailbox::MailboxOp::Defer`]
1863/// closure needs, by `NodeId`/`HandleId`/`Sink`/id only (no `C`/`T`),
1864/// blanket-impl'd for every `Core`. Lets windowing /
1865/// higher-order-operator sinks perform value-returning topology mutation
1866/// (`register_*`/`subscribe`) **in-wave** without naming the cell type:
1867/// the `BatchGuard` drain-to-quiescence loop calls
1868/// `f(self as &dyn CoreFull)` while it holds the owner `&Core`.
1869pub trait CoreFull {
1870    /// See [`Core::register_state`].
1871    fn register_state(&self, initial: HandleId, partial: bool) -> Result<NodeId, RegisterError>;
1872    /// See [`Core::register_producer`].
1873    fn register_producer(&self, fn_id: FnId) -> Result<NodeId, RegisterError>;
1874    /// See [`Core::subscribe`].
1875    fn subscribe(&self, node_id: NodeId, sink: Sink) -> SubscriptionId;
1876    /// See [`Core::try_subscribe`].
1877    fn try_subscribe(&self, node_id: NodeId, sink: Sink) -> Result<SubscriptionId, SubscribeError>;
1878    /// See [`Core::unsubscribe`].
1879    fn unsubscribe(&self, node_id: NodeId, sub_id: SubscriptionId);
1880    /// See [`Core::emit`].
1881    fn emit(&self, node_id: NodeId, handle: HandleId);
1882    /// See [`Core::complete`].
1883    fn complete(&self, node_id: NodeId);
1884    /// See [`Core::error`].
1885    fn error(&self, node_id: NodeId, handle: HandleId);
1886    /// See [`Core::teardown`]. Sink-side terminal forwards (e.g.
1887    /// `stratify` TEARDOWN passthrough) route through `em.defer` — rare,
1888    /// so `Defer` rather than a 5th `MailboxOp` fast-path variant.
1889    fn teardown(&self, node_id: NodeId);
1890    /// See [`Core::invalidate`]. Sink-side INVALIDATE forwards
1891    /// (higher-order `build_inner_sink`) route through `em.defer`.
1892    fn invalidate(&self, node_id: NodeId);
1893
1894    // --- read-only inspection (S2b/β D243 / D237-A-AMEND) ---
1895    //
1896    // Needed so a `MailboxOp::Defer` closure (the in-wave owner-side
1897    // re-entry point — D233) can run `graphrefly_graph::describe_of`
1898    // for the reactive-describe / observe topology-sub path. These are
1899    // pure reads (no `C`/`T` surfaced) — `HandleId`/`NodeId`/enum only.
1900
1901    /// See [`Core::cache_of`].
1902    fn cache_of(&self, node_id: NodeId) -> HandleId;
1903    /// See [`Core::has_fired_once`].
1904    fn has_fired_once(&self, node_id: NodeId) -> bool;
1905    /// See [`Core::kind_of`].
1906    fn kind_of(&self, node_id: NodeId) -> Option<NodeKind>;
1907    /// See [`Core::deps_of`].
1908    fn deps_of(&self, node_id: NodeId) -> Vec<NodeId>;
1909    /// See [`Core::is_terminal`].
1910    fn is_terminal(&self, node_id: NodeId) -> Option<TerminalKind>;
1911    /// See [`Core::is_dirty`].
1912    fn is_dirty(&self, node_id: NodeId) -> bool;
1913
1914    /// Serialize a node's cached `HandleId` via the binding (S2b/β
1915    /// D244). Delegates to `binding_ptr().serialize_handle` — needed so
1916    /// an in-wave `MailboxOp::Defer` closure (storage snapshot-on-
1917    /// observe) can run `graphrefly_graph` snapshot through `&dyn
1918    /// CoreFull`. Pure binding-delegating read; no `C`/`T` surfaced.
1919    fn serialize_handle(&self, handle: HandleId) -> Option<serde_json::Value>;
1920
1921    /// The wave-drained [`CoreMailbox`] (D246 rule 5: the one facade is
1922    /// mutation + inspection + serialize + **mailbox**). Lets a holder
1923    /// of `&dyn CoreFull` post deferred ops without naming the cell
1924    /// type — folds D245's per-binding "how do I reach the mailbox".
1925    fn mailbox(&self) -> Arc<crate::mailbox::CoreMailbox>;
1926
1927    /// The owner-side [`crate::mailbox::DeferQueue`] (D249/S2c — the
1928    /// `!Send` `Defer` split off `CoreMailbox`). Lets a holder of
1929    /// `&dyn CoreFull` post owner-side deferred closures (`ProducerCtx`
1930    /// build path) without naming the cell type.
1931    fn defer_queue(&self) -> std::rc::Rc<crate::mailbox::DeferQueue>;
1932
1933    // --- producer-build accessors (D246 r5 / D245) ---
1934    //
1935    // The producer-build path runs owner-side with this one facade
1936    // (D245 Option A: `BindingBoundary::invoke_fn_with_core` hands the
1937    // binding `&dyn CoreFull`, from which it builds its `ProducerCtx`).
1938    // A build closure + its spawned sinks need the binding `Arc` (for
1939    // `retain_handle`/`release_handle` around payload shares) and the
1940    // build-side deferred-emit helpers — these mirror `Core`'s existing
1941    // same-named methods, so `ProducerCtx::core()` over `&dyn CoreFull`
1942    // keeps every producer factory compiling unchanged. Pure
1943    // delegation; no `C`/`T` surfaced.
1944
1945    /// See [`Core::binding`].
1946    fn binding(&self) -> Arc<dyn crate::boundary::BindingBoundary>;
1947    /// See [`Core::emit_or_defer`].
1948    fn emit_or_defer(&self, node_id: NodeId, new_handle: HandleId);
1949    /// See [`Core::complete_or_defer`].
1950    fn complete_or_defer(&self, node_id: NodeId);
1951    /// See [`Core::error_or_defer`].
1952    fn error_or_defer(&self, node_id: NodeId, error_handle: HandleId);
1953}
1954
1955impl CoreFull for Core {
1956    #[inline]
1957    fn register_state(&self, initial: HandleId, partial: bool) -> Result<NodeId, RegisterError> {
1958        Core::register_state(self, initial, partial)
1959    }
1960    #[inline]
1961    fn register_producer(&self, fn_id: FnId) -> Result<NodeId, RegisterError> {
1962        Core::register_producer(self, fn_id)
1963    }
1964    #[inline]
1965    fn subscribe(&self, node_id: NodeId, sink: Sink) -> SubscriptionId {
1966        Core::subscribe(self, node_id, sink)
1967    }
1968    #[inline]
1969    fn try_subscribe(&self, node_id: NodeId, sink: Sink) -> Result<SubscriptionId, SubscribeError> {
1970        Core::try_subscribe(self, node_id, sink)
1971    }
1972    #[inline]
1973    fn unsubscribe(&self, node_id: NodeId, sub_id: SubscriptionId) {
1974        Core::unsubscribe(self, node_id, sub_id);
1975    }
1976    #[inline]
1977    fn emit(&self, node_id: NodeId, handle: HandleId) {
1978        Core::emit(self, node_id, handle);
1979    }
1980    #[inline]
1981    fn complete(&self, node_id: NodeId) {
1982        Core::complete(self, node_id);
1983    }
1984    #[inline]
1985    fn error(&self, node_id: NodeId, handle: HandleId) {
1986        Core::error(self, node_id, handle);
1987    }
1988    #[inline]
1989    fn teardown(&self, node_id: NodeId) {
1990        Core::teardown(self, node_id);
1991    }
1992    #[inline]
1993    fn invalidate(&self, node_id: NodeId) {
1994        Core::invalidate(self, node_id);
1995    }
1996    #[inline]
1997    fn cache_of(&self, node_id: NodeId) -> HandleId {
1998        Core::cache_of(self, node_id)
1999    }
2000    #[inline]
2001    fn has_fired_once(&self, node_id: NodeId) -> bool {
2002        Core::has_fired_once(self, node_id)
2003    }
2004    #[inline]
2005    fn kind_of(&self, node_id: NodeId) -> Option<NodeKind> {
2006        Core::kind_of(self, node_id)
2007    }
2008    #[inline]
2009    fn deps_of(&self, node_id: NodeId) -> Vec<NodeId> {
2010        Core::deps_of(self, node_id)
2011    }
2012    #[inline]
2013    fn is_terminal(&self, node_id: NodeId) -> Option<TerminalKind> {
2014        Core::is_terminal(self, node_id)
2015    }
2016    #[inline]
2017    fn is_dirty(&self, node_id: NodeId) -> bool {
2018        Core::is_dirty(self, node_id)
2019    }
2020    #[inline]
2021    fn serialize_handle(&self, handle: HandleId) -> Option<serde_json::Value> {
2022        self.binding_ptr().serialize_handle(handle)
2023    }
2024    #[inline]
2025    fn mailbox(&self) -> Arc<crate::mailbox::CoreMailbox> {
2026        Core::mailbox(self)
2027    }
2028    #[inline]
2029    fn defer_queue(&self) -> std::rc::Rc<crate::mailbox::DeferQueue> {
2030        Core::defer_queue(self)
2031    }
2032    #[inline]
2033    fn binding(&self) -> Arc<dyn crate::boundary::BindingBoundary> {
2034        Core::binding(self)
2035    }
2036    // Inlined over the `try_*` + `push_deferred_producer_op`
2037    // primitives (behaviour-identical to the public
2038    // `Core::{emit,complete,error}_or_defer` wrappers).
2039    #[inline]
2040    fn emit_or_defer(&self, node_id: NodeId, new_handle: HandleId) {
2041        if self.try_emit(node_id, new_handle).is_err() {
2042            self.binding.retain_handle(new_handle);
2043            self.push_deferred_producer_op(DeferredProducerOp::Emit {
2044                node_id,
2045                handle: new_handle,
2046            });
2047        }
2048    }
2049    #[inline]
2050    fn complete_or_defer(&self, node_id: NodeId) {
2051        if self.try_complete(node_id).is_err() {
2052            self.push_deferred_producer_op(DeferredProducerOp::Complete { node_id });
2053        }
2054    }
2055    #[inline]
2056    fn error_or_defer(&self, node_id: NodeId, error_handle: HandleId) {
2057        if self.try_error(node_id, error_handle).is_err() {
2058            self.binding.retain_handle(error_handle);
2059            self.push_deferred_producer_op(DeferredProducerOp::Error {
2060                node_id,
2061                handle: error_handle,
2062            });
2063        }
2064    }
2065}
2066
2067/// RAII guard that owns an [`OperatorScratch`] until either (a) the
2068/// caller `take()`s it for installation, or (b) the guard drops on an
2069/// early return / unwind, in which case the scratch's handle retains
2070/// are released via [`OperatorScratch::release_handles`].
2071///
2072/// Slice H /qa F1 + F2 (2026-05-07): closes two related correctness
2073/// gaps in `Core::register`:
2074///
2075/// 1. **TOCTOU window** — the original three-phase split called
2076///    `lock_state()` twice (once for validation, once for insertion),
2077///    so a concurrent `Core::complete(dep)` on a non-resubscribable
2078///    dep could slip in between the two acquisitions and re-create
2079///    the wedge `RegisterError::TerminalDep` was designed to prevent.
2080///    The guard plus a single locked region for both phases closes
2081///    this gap (release runs lock-released because guard variables
2082///    drop in reverse declaration order — guard declared BEFORE
2083///    `lock_state()`, so the lock guard drops first).
2084///
2085/// 2. **Panic-unsafe scratch leak** — without an RAII drop, a panic
2086///    between `make_op_scratch` (Phase 2) and the explicit
2087///    `if let Err(e)` cleanup branch (e.g., `lock_state()` reentrance
2088///    assert, OOM-as-panic on Vec growth in dep iteration) would
2089///    drop the `Box<dyn OperatorScratch>` without releasing the
2090///    seed/default retain. The guard's `Drop` impl releases on any
2091///    unwind path.
2092///
2093/// Lock-discipline: the guard holds `&dyn BindingBoundary` (through
2094/// the `Arc<dyn BindingBoundary>` it borrows from). On `Drop`, it
2095/// invokes `release_handles` lock-released — fires AFTER any
2096/// `MutexGuard<CoreState>` declared later in the same scope drops
2097/// (LIFO destruction order). Mirrors `Core::resume` Phase 2 release
2098/// pattern.
2099struct ScratchReleaseGuard<'a> {
2100    scratch: Option<Box<dyn crate::op_state::OperatorScratch>>,
2101    binding: &'a dyn BindingBoundary,
2102}
2103
2104impl<'a> ScratchReleaseGuard<'a> {
2105    fn new(
2106        scratch: Option<Box<dyn crate::op_state::OperatorScratch>>,
2107        binding: &'a dyn BindingBoundary,
2108    ) -> Self {
2109        Self { scratch, binding }
2110    }
2111
2112    /// Take ownership of the scratch — disarms the release-on-drop
2113    /// behavior. Used on the success path to install the scratch on
2114    /// `NodeRecord.op_scratch`.
2115    fn take(mut self) -> Option<Box<dyn crate::op_state::OperatorScratch>> {
2116        self.scratch.take()
2117    }
2118}
2119
2120impl Drop for ScratchReleaseGuard<'_> {
2121    fn drop(&mut self) {
2122        if let Some(mut scratch) = self.scratch.take() {
2123            scratch.release_handles(self.binding);
2124        }
2125    }
2126}
2127
2128/// Combined RAII guard returned by [`Core::lock_state`]. Holds the
2129/// [`CoreState`] borrow + the [`CoreShared`] borrow **together**.
2130/// `DerefMut`s to [`CoreState`] (so `s.nodes` / `s.children` /
2131/// `s.binding` work) **and** exposes an inherent `shared` field (so
2132/// `s.shared.<f>` works — inherent-field access is resolved before
2133/// `Deref`). D246/S2c: single-owner ⇒ the two regions are plain
2134/// `RefCell`s on [`Core`] (the `parking_lot`/`StateCell`-generic
2135/// sharded split was shared-Core-era legacy). A re-entrant
2136/// double-borrow panics loudly — a dispatcher bug (a missing
2137/// lock-released bracket around a binding callback), not a user error.
2138pub(crate) struct St<'a> {
2139    state: std::cell::RefMut<'a, CoreState>,
2140    pub(crate) shared: std::cell::RefMut<'a, CoreShared>,
2141}
2142
2143impl<'a> St<'a> {
2144    /// Borrow both Core regions together. Used by [`Core::lock_state`]
2145    /// and the owner-side drop paths (which hold `&Core`).
2146    #[inline]
2147    pub(crate) fn new(core: &'a Core) -> Self {
2148        let state = core.state.borrow_mut();
2149        let shared = core.shared.borrow_mut();
2150        Self { state, shared }
2151    }
2152
2153    /// Allocate a fresh [`NodeId`] (the counter lives in the separate
2154    /// `CoreShared` region — `St` is the only place holding both).
2155    pub(crate) fn alloc_node_id(&mut self) -> NodeId {
2156        let id = NodeId::new(self.shared.next_node_id);
2157        self.shared.next_node_id += 1;
2158        id
2159    }
2160
2161    /// Allocate a fresh [`SubscriptionId`].
2162    ///
2163    /// **Invariant (QA F4, 2026-05-18): monotonic, never recycled for
2164    /// the `Core`'s lifetime.** A strictly-incrementing counter, never
2165    /// decremented or reset. This is what makes the owner-driven
2166    /// `unsubscribe` model sound: a `(NodeId, SubscriptionId)` pair
2167    /// recorded by `producer_deactivate` / `SubGuard` and unsubscribed
2168    /// later (after the slot may have been removed by a re-entrant
2169    /// cascade) can NEVER deregister a *different* subscription —
2170    /// `unsubscribe_sink` on a since-departed `sub_id` is a documented
2171    /// no-op, not a wrong-target removal.
2172    pub(crate) fn alloc_sub_id(&mut self) -> SubscriptionId {
2173        let id = SubscriptionId(self.shared.next_subscription_id);
2174        self.shared.next_subscription_id += 1;
2175        id
2176    }
2177}
2178
2179impl std::ops::Deref for St<'_> {
2180    type Target = CoreState;
2181    #[inline]
2182    fn deref(&self) -> &CoreState {
2183        &self.state
2184    }
2185}
2186
2187impl std::ops::DerefMut for St<'_> {
2188    #[inline]
2189    fn deref_mut(&mut self) -> &mut CoreState {
2190        &mut self.state
2191    }
2192}
2193
2194impl Core {
2195    /// Construct a fresh Core wired to the given binding. Pause buffer
2196    /// cap defaults to unbounded; set via [`Self::set_pause_buffer_cap`].
2197    #[must_use]
2198    pub fn new(binding: Arc<dyn BindingBoundary>) -> Self {
2199        Self {
2200            // D246/S2c: single-owner ⇒ plain `RefCell` regions (no
2201            // `StateCell` generic, no shard map). D248 relaxed the
2202            // substrate `Sink` off `Send+Sync` ⇒ `Core` is `!Send +
2203            // !Sync` — the actor-model shape (D221/D223/D248); one
2204            // owner thread, cross-`Core` parallelism via independent
2205            // per-worker Cores.
2206            shared: std::cell::RefCell::new(CoreShared {
2207                next_node_id: 1,
2208                next_subscription_id: 1,
2209                // A4 (Slice F, 2026-05-07): start `next_lock_id` in the
2210                // high half of the u32 range so `alloc_lock_id` can't
2211                // collide with user-supplied `LockId::new(N)`
2212                // constructors (which the napi-rs binding marshals from
2213                // `u32` and tests typically use in the low range,
2214                // 1..1024). Phase E /qa F1 (2026-05-08): lowered from
2215                // `1u64 << 32` to `1u64 << 31` so the value round-trips
2216                // through `u32::try_from(...)` at the napi boundary —
2217                // the previous seed errored every napi `alloc_lock_id`
2218                // call. Anti-collision intent (high range vs low user
2219                // range) preserved at half the prior ceiling (2^31 ≈ 2
2220                // billion allocations per Core, ample for parity
2221                // tests). Lift the floor when the deferred
2222                // BigInt-narrowing migration extends `LockId` to `u64`
2223                // at the FFI layer (porting-deferred "BigInt migration
2224                // for u32-narrowed napi types" entry).
2225                next_lock_id: 1u64 << 31,
2226                // `currently_firing` is the P13 set_deps reentrancy
2227                // check stack (/qa F2). Single-owner ⇒ one thread,
2228                // but kept here (Core-global) as the canonical
2229                // location.
2230                currently_firing: Vec::new(),
2231                pause_buffer_cap: None,
2232                max_batch_drain_iterations: 10_000,
2233                topology_sinks: HashMap::new(),
2234                next_topology_id: 1,
2235                pending_scratch_release: Vec::new(),
2236                binding: binding.clone(),
2237            }),
2238            state: std::cell::RefCell::new(CoreState {
2239                nodes: HashMap::new(),
2240                children: HashMap::new(),
2241                binding: binding.clone(),
2242            }),
2243            mailbox: Arc::new(crate::mailbox::CoreMailbox::new()),
2244            deferred: std::rc::Rc::new(crate::mailbox::DeferQueue::new()),
2245            binding,
2246            generation: CORE_GENERATION.fetch_add(1, Ordering::Relaxed),
2247        }
2248    }
2249
2250    /// Acquire the **combined** state guard: [`CoreState`] +
2251    /// [`CoreShared`] borrowed together. Returns [`St`] — `DerefMut`s
2252    /// to `CoreState` and exposes a `.shared` field, so
2253    /// `let mut s = self.lock_state(); … s.nodes … s.shared.x …` works.
2254    ///
2255    /// D246/S2c: single-owner ⇒ plain `RefCell` borrows (no lock).
2256    /// Binding callbacks fire LOCK-RELEASED (the guard is dropped
2257    /// before `invoke_fn`), so a re-entrant `lock_state()` while a
2258    /// guard is held is a dispatcher bug — it panics loudly via
2259    /// `RefCell`'s double-borrow check, the same observable contract
2260    /// the prior `parking_lot` re-entrant-deadlock path had.
2261    pub(crate) fn lock_state(&self) -> St<'_> {
2262        St::new(self)
2263    }
2264
2265    /// Run `f` with mutable access to the [`CoreShared`] region ONLY
2266    /// (id counters, topology-sink registry, `currently_firing`, the
2267    /// two caps, the scratch-release queue). Pure-shared
2268    /// ops (id alloc, topology) take only this borrow.
2269    pub(crate) fn with_shared<R>(&self, f: impl FnOnce(&mut CoreShared) -> R) -> R {
2270        f(&mut self.shared.borrow_mut())
2271    }
2272
2273    /// Run `f` with mutable access to the [`CoreState`] region ONLY
2274    /// (`nodes` / `children` / `binding`). The state-access seam —
2275    /// kept as a single method so a future M6 owner-thread overlay is
2276    /// a re-impl of one method, not a re-architecture of the ~104 call
2277    /// sites. D246/S2c: single shard always (single-owner), plain
2278    /// `RefCell` borrow.
2279    #[allow(dead_code)]
2280    pub(crate) fn with_shard<R>(&self, f: impl FnOnce(&mut crate::node::CoreState) -> R) -> R {
2281        f(&mut self.state.borrow_mut())
2282    }
2283
2284    /// Whether `self` and `other` are the same dispatcher instance.
2285    ///
2286    /// S2b (D223): `Core` is owned by value (no `Arc<C>`, no `Clone`), so
2287    /// identity is the unique per-`Core` [`Self::generation`] counter
2288    /// (assigned once from [`CORE_GENERATION`] at construction), not
2289    /// `Arc::ptr_eq`. Two independently `Core::new`-constructed instances
2290    /// have distinct generations even with the same binding; there is no
2291    /// longer any way to produce two handles to one `Core` (no `Clone`),
2292    /// so this is `true` only for genuine self-vs-self comparison.
2293    ///
2294    /// Used by `graphrefly-graph`'s `mount` to enforce the single-Core
2295    /// v1 invariant — cross-Core mount is post-M6.
2296    #[must_use]
2297    pub fn same_dispatcher(&self, other: &Core) -> bool {
2298        self.generation == other.generation
2299    }
2300
2301    /// Owner-side mailbox drain (D227/D230). Pops every
2302    /// [`crate::mailbox::CoreMailbox`]-queued timer `Emit` request in
2303    /// FIFO order and applies it via the synchronous [`Self::emit`] — so
2304    /// autonomous timer tasks (which can no longer hold `&Core`/`Weak<C>`
2305    /// under D223) get their fires delivered with **no async in Core**.
2306    ///
2307    /// Called from the embedder's existing advance/pump site (test
2308    /// harness `TestRuntime` advance helper, napi pump). Timer tasks
2309    /// already require the host runtime to be advanced before they fire,
2310    /// so draining here is behaviour-identical to the deleted autonomous
2311    /// `WeakCore::upgrade → core.emit` path. Idempotent on an empty
2312    /// mailbox. Re-entrant-safe: `emit` may cascade and a concurrent
2313    /// timer task may post again — the next drain picks it up (the
2314    /// `runnable` bit is re-set on every post).
2315    ///
2316    /// # Panics
2317    ///
2318    /// Panics if the id-mailbox / defer-queue keep mutually re-posting
2319    /// past the configured `max_batch_drain_iterations` cap — a livelock
2320    /// signal that a `Defer`/`Emit` pair is consuming its own output.
2321    pub fn drain_mailbox(&self) {
2322        // Clone the Arc out so the drain closure can call `&self.*`
2323        // without aliasing `self.mailbox` through `&self`.
2324        let mailbox = Arc::clone(&self.mailbox);
2325        // QA P3: bound the inner drain by the configured cap (the
2326        // self-reposting-Defer livelock guard lives here, NOT in
2327        // `drain_and_flush`'s fire-cascade counter).
2328        let deferred = std::rc::Rc::clone(&self.deferred);
2329        // QA P3: bound the inner drain by the configured cap (the
2330        // self-reposting-Defer livelock guard lives here, NOT in
2331        // `drain_and_flush`'s fire-cascade counter).
2332        //
2333        // QA F10 (2026-05-19): note that `max_ops` here is **shared
2334        // across three bounds** — each `mailbox.drain_into` inner
2335        // cap, each `deferred.drain_into` inner cap, AND the outer
2336        // mutual-quiescence round counter below. A workload requiring
2337        // legitimately deep cross-queue chaining (defer→emit→defer→…)
2338        // must tune `Core::set_max_batch_drain_iterations` up to
2339        // cover ALL THREE; counting against each independently is
2340        // intentional (the bound's purpose is "any single drain
2341        // dimension's livelock fires loudly"), not a knob granularity
2342        // bug.
2343        let max_ops = self.with_shared(|sh| sh.max_batch_drain_iterations);
2344        // D249/S2c: the owner-side `Defer` queue was split off the
2345        // `Send` id-`CoreMailbox` (so `!Send` `Defer` closures can
2346        // capture relaxed `Sink`s / `Rc<RefCell<GraphInner>>`). They
2347        // are now two physical queues, but a closure in either can
2348        // re-post to the OTHER (the canonical case: a `DeferQueue`
2349        // inner-subscribe → push-on-subscribe → an `Emit` on the
2350        // id-`CoreMailbox`). Pre-D249 this was ONE FIFO drained to
2351        // quiescence; restore that contract by draining BOTH to
2352        // **mutual quiescence** — id-mailbox first, then deferred, loop
2353        // until neither is runnable. Bounded by `max_ops` rounds.
2354        //
2355        // M1 contract (QA, 2026-05-19) — **CROSS-QUEUE ORDER = QUEUE
2356        // PRIORITY, NOT ARRIVAL ORDER.** Within each queue intra-FIFO
2357        // order is preserved (D234 cancel-before-resubscribe holds —
2358        // both are `DeferQueue` posts). ACROSS queues the order is
2359        // structurally `CoreMailbox` ops first, then `DeferQueue` ops,
2360        // every round. Pre-D249 a sink posting `[defer, emit]` to ONE
2361        // queue saw `defer` applied first; post-D249 a sink posting
2362        // `[defer (DeferQueue), emit (CoreMailbox)]` sees `emit`
2363        // applied first (mailbox-priority). Operators that need a
2364        // specific cross-queue ordering MUST capture the routing state
2365        // inside the *later* op's closure (D234 pattern: read shared
2366        // state at apply time, not at post time). Locked by the
2367        // `cross_queue_order_mailbox_then_deferred` regression in
2368        // `tests/lock_discipline.rs`.
2369        let mut rounds = 0u32;
2370        loop {
2371            mailbox.drain_into(max_ops, |op| match op {
2372                crate::mailbox::MailboxOp::Emit(node_id, handle) => self.emit(node_id, handle),
2373                crate::mailbox::MailboxOp::Complete(node_id) => self.complete(node_id),
2374                crate::mailbox::MailboxOp::Error(node_id, handle) => self.error(node_id, handle),
2375                // D233/D249: `Send` cross-thread timer defer applied
2376                // in-wave (we hold `&Core`).
2377                crate::mailbox::MailboxOp::Defer(f) => f(self),
2378            });
2379            // Each closure runs with the full object-safe Core surface
2380            // (we hold `&Core`); its topology mutation + result
2381            // consumption runs here.
2382            deferred.drain_into(max_ops, |f| f(self));
2383            if !mailbox.is_runnable() && !deferred.is_runnable() {
2384                break;
2385            }
2386            rounds += 1;
2387            assert!(
2388                rounds < max_ops,
2389                "drain_mailbox: id-mailbox / defer-queue mutual re-post \
2390                 livelock (> {max_ops} rounds) — a Defer/Emit pair is \
2391                 re-posting across the two queues every round. Tune via \
2392                 Core::set_max_batch_drain_iterations only with concrete \
2393                 evidence the workload needs more."
2394            );
2395        }
2396    }
2397
2398    /// Owner-side enqueue of a deferred closure (D233/D249). Runs at
2399    /// the next [`Self::drain_mailbox`] / in-wave `BatchGuard` drain
2400    /// with the object-safe full-`Core` surface. Returns `false` iff
2401    /// the owning `Core` is gone (closure dropped unrun). The
2402    /// owner-only `Rc<DeferQueue>` (also reachable via
2403    /// [`Self::defer_queue`] for capture into `!Send`
2404    /// `Sink`/`TopologySink` closures that cannot hold `&Core`).
2405    pub fn post_defer(&self, f: crate::mailbox::DeferFn) -> bool {
2406        self.deferred.post(f)
2407    }
2408
2409    /// Shared handle to this `Core`'s owner-side defer queue (D249).
2410    /// `graphrefly-operators`' `ProducerEmitter` + reactive
2411    /// describe/observe topo sinks hold this `Rc` to enqueue `Defer`
2412    /// work from inside a `!Send` sink closure (which carries no
2413    /// `&Core`). Owner-thread-only; never sent across threads.
2414    #[must_use]
2415    pub fn defer_queue(&self) -> std::rc::Rc<crate::mailbox::DeferQueue> {
2416        std::rc::Rc::clone(&self.deferred)
2417    }
2418
2419    /// Shared handle to this `Core`'s [`crate::mailbox::CoreMailbox`].
2420    /// Handed to autonomous async producers (timer tasks via
2421    /// [`crate::timer::spawn_timer_task`]) so they can post `Emit`
2422    /// requests without holding `&Core`/`Weak<C>` (D223/D227/D230).
2423    #[must_use]
2424    pub fn mailbox(&self) -> Arc<crate::mailbox::CoreMailbox> {
2425        Arc::clone(&self.mailbox)
2426    }
2427
2428    /// Shared handle to this `Core`'s binding. S2b/D231/A′: producer
2429    /// build closures obtain the binding here (via `ctx.core()`) for
2430    /// their spawned sinks' `retain_handle`/`pack_tuple`/`release_handle`
2431    /// — instead of the deleted `Weak<dyn ProducerBinding>` cycle-break.
2432    /// No cycle: the registry-stored build closure captures nothing
2433    /// strong; the `Arc<dyn BindingBoundary>` a sink clones lives in
2434    /// `Core`'s subscriber map and drops on unsubscribe.
2435    #[must_use]
2436    pub fn binding(&self) -> Arc<dyn BindingBoundary> {
2437        Arc::clone(&self.binding)
2438    }
2439
2440    /// Execute a producer-pattern op. §7: the union-find ascending-order
2441    /// constraint that motivated *deferring* these is deleted (groups
2442    /// are static identity only — S2c removed the group-lock machinery,
2443    /// so there is no `PartitionOrderViolation` to dodge). The op runs
2444    /// **immediately** — re-entrant execution on the single owner thread
2445    /// is absorbed by the in-flight wave drain (one-Core-per-OS-thread
2446    /// `IN_TICK_OWNED` slot, D252), exactly as the deferred drain used
2447    /// to run it at wave-end. The `deferred_producer_ops` queue +
2448    /// `drain_deferred_producer_ops` are deleted; this shim is retained
2449    /// so `graphrefly-operators` compiles unchanged (D211 minimal-churn).
2450    ///
2451    /// For `Emit`/`Error` the caller retained the handle before
2452    /// pushing (mirroring the old drain contract); we release it after
2453    /// firing so refcount discipline is unchanged.
2454    pub fn push_deferred_producer_op(&self, op: DeferredProducerOp) {
2455        match op {
2456            DeferredProducerOp::Emit { node_id, handle } => {
2457                self.emit(node_id, handle);
2458                self.binding.release_handle(handle);
2459            }
2460            DeferredProducerOp::Complete { node_id } => {
2461                self.complete(node_id);
2462            }
2463            DeferredProducerOp::Error { node_id, handle } => {
2464                self.error(node_id, handle);
2465                self.binding.release_handle(handle);
2466            }
2467            DeferredProducerOp::Callback(f) => {
2468                f();
2469            }
2470        }
2471    }
2472
2473    /// §7: `drain_deferred_producer_ops` is **deleted** — there is no
2474    /// `deferred_producer_ops` queue (the union-find ascending-order
2475    /// constraint that required deferral is gone). Producer ops execute
2476    /// immediately via [`Self::push_deferred_producer_op`]. The former
2477    /// `BatchGuard::drop` / `try_subscribe` drain call sites become
2478    /// no-ops. Kept as an empty inline fn so those call sites compile
2479    /// unchanged (D211 minimal-churn); the optimizer elides it.
2480    #[inline]
2481    pub(crate) fn drain_deferred_producer_ops(&self) {}
2482
2483    // D246/S2c: the §7 cross-thread wave-lock acquisition
2484    // (`compute_touched_groups` / `all_groups_sorted` /
2485    // `acquire_touched_group_guards` / `acquire_all_group_guards` /
2486    // `with_global_wave_fallback`) is **deleted** — single-owner ⇒ a
2487    // `Core` is driven by exactly one thread, so there are no
2488    // interleaving waves to serialize. D253 (S5) further deletes the
2489    // declared-group identity surface (`SchedulingGroupId`,
2490    // `node_group`, `partition_of`/`group_of`/`set_scheduling_group`,
2491    // and `NodeOpts.scheduling_group`) — re-introduce in M6 with M6's
2492    // actual scheduling needs in view.
2493}
2494
2495// `walk_undirected_dep_graph` + `component_is_group_consistent`: deleted
2496// by D253 (S5). They existed solely to validate `scheduling_group`'s
2497// dep+children+meta-component invariant at topology-mutation time. With
2498// the declared-group identity surface deleted, the component walk has no
2499// remaining caller — re-introduce alongside M6's scheduling work.
2500
2501impl Core {
2502    /// Internal inspection helper: number of `PendingBatch`es queued
2503    /// for `node` in the current wave. Used by Slice X4 D2 regression
2504    /// tests to pin the "common case = single batch, no SmallVec
2505    /// spill" perf invariant.
2506    ///
2507    /// Returns `None` if no `pending_notify` entry exists for `node`
2508    /// (no tier-1+ message has been queued for this node yet in this
2509    /// wave). `Some(0)` is unreachable by construction (a vacant
2510    /// entry implies no batches; an occupied entry has at least one).
2511    ///
2512    /// `#[doc(hidden)]` + unconditionally `pub` (NOT
2513    /// `cfg(any(test, debug_assertions))`): integration tests are a
2514    /// separate crate, so they get neither `cfg(test)` nor
2515    /// `debug_assertions` on the lib dependency under `--release`.
2516    /// Gating this out of release builds made the entire
2517    /// integration-test suite fail to compile under
2518    /// `cargo nextest run --release`, blocking release-optimized test
2519    /// runs (e.g. the `cascade_depth` stress tests). It is NOT primary
2520    /// public API — `#[doc(hidden)]` keeps it off the generated
2521    /// surface (spec §5.12: protocol-internal inspection is allowed
2522    /// but never a primary API).
2523    #[doc(hidden)]
2524    #[must_use]
2525    pub fn pending_batch_count(&self, node: NodeId) -> Option<usize> {
2526        // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_notify lives
2527        // on per-thread `WaveState`. Test callers run on the same
2528        // thread that ran the wave, so the per-thread placement is
2529        // observable here.
2530        crate::batch::with_wave_state(|ws| {
2531            ws.pending_notify
2532                .get(&node)
2533                .map(|entry| entry.batches.len())
2534        })
2535    }
2536
2537    /// Configure the Core-global cap on pause replay buffer length. When set,
2538    /// any per-node pause buffer that would exceed `cap` drops the oldest
2539    /// message(s) from the front; the dropped count is reported back via the
2540    /// resume callback (see [`ResumeReport`]). `None` (default) means
2541    /// unbounded; messages buffer indefinitely until the lockset clears.
2542    pub fn set_pause_buffer_cap(&self, cap: Option<usize>) {
2543        self.lock_state().shared.pause_buffer_cap = cap;
2544    }
2545
2546    /// Configure the replay buffer cap on `node_id` (R2.6.5 / Lock 6.G —
2547    /// Slice E1, 2026-05-07). `None` disables the buffer. `Some(N)` keeps
2548    /// the last `N` DATA emissions in a circular buffer; late subscribers
2549    /// receive them as part of the per-tier handshake (between START and
2550    /// any terminal). Switching from a larger cap to a smaller cap evicts
2551    /// the front of the buffer to fit; switching to `None` drains the
2552    /// buffer entirely. Each evicted/drained handle's retain is released
2553    /// back to the binding.
2554    ///
2555    /// # Panics
2556    ///
2557    /// Panics if `node_id` is not registered.
2558    pub fn set_replay_buffer_cap(&self, node_id: NodeId, cap: Option<usize>) {
2559        // QA A7 (2026-05-07): normalize `Some(0)` to `None`. Two ways to
2560        // express "disabled" is confusing: `push_replay_buffer` already
2561        // treats `Some(0)` as no-op, so persisting it adds nothing.
2562        let cap = match cap {
2563            Some(0) => None,
2564            other => other,
2565        };
2566        let to_release: Vec<HandleId> = {
2567            let mut s = self.lock_state();
2568            let rec = s.require_node_mut(node_id);
2569            rec.replay_buffer_cap = cap;
2570            match cap {
2571                None => rec.replay_buffer.drain(..).collect(),
2572                Some(c) => {
2573                    let mut drained = Vec::new();
2574                    while rec.replay_buffer.len() > c {
2575                        if let Some(h) = rec.replay_buffer.pop_front() {
2576                            drained.push(h);
2577                        }
2578                    }
2579                    drained
2580                }
2581            }
2582        };
2583        for h in to_release {
2584            self.binding.release_handle(h);
2585        }
2586    }
2587
2588    /// Reconfigure the pause mode for `node_id` (canonical §2.6 — Slice F
2589    /// audit close, 2026-05-07). Default for new nodes is
2590    /// [`PausableMode::Default`]; switch to [`PausableMode::ResumeAll`]
2591    /// for nodes whose pause-window emit history must be observable
2592    /// verbatim, or [`PausableMode::Off`] for nodes intrinsically
2593    /// pause-immune.
2594    ///
2595    /// # Errors
2596    ///
2597    /// - [`SetPausableModeError::UnknownNode`] — `node_id` is not
2598    ///   registered.
2599    /// - [`SetPausableModeError::WhilePaused`] — the node currently
2600    ///   holds at least one pause lock. Changing mode mid-pause would
2601    ///   lose buffered content or strand a `pending_wave` flag — resume
2602    ///   all locks first.
2603    pub fn set_pausable_mode(
2604        &self,
2605        node_id: NodeId,
2606        mode: PausableMode,
2607    ) -> Result<(), SetPausableModeError> {
2608        let mut s = self.lock_state();
2609        let rec = s
2610            .nodes
2611            .get_mut(&node_id)
2612            .ok_or(SetPausableModeError::UnknownNode(node_id))?;
2613        if rec.pause_state.is_paused() {
2614            return Err(SetPausableModeError::WhilePaused);
2615        }
2616        rec.pausable = mode;
2617        Ok(())
2618    }
2619
2620    /// Configure the wave-drain iteration cap (R4.3 / Lock 2.F′). The wave
2621    /// engine aborts a drain after `cap` iterations with a diagnostic panic.
2622    /// Default is `10_000` — high enough to avoid false positives on legitimate
2623    /// fan-in cascades, low enough to surface runtime cycles within seconds.
2624    ///
2625    /// Lower this only when running adversarial / property-based tests that
2626    /// want fast cycle detection. Raise it only with concrete evidence that a
2627    /// legitimate workload needs more iterations than the default — and even
2628    /// then, prefer to tune the workload (per-subgraph batching, etc.) over
2629    /// raising the cap.
2630    ///
2631    /// # Panics
2632    ///
2633    /// Panics if `cap == 0` — a zero cap would abort every wave on the very
2634    /// first iteration, deadlocking any subsequent dispatcher work.
2635    pub fn set_max_batch_drain_iterations(&self, cap: u32) {
2636        assert!(cap > 0, "max_batch_drain_iterations must be > 0");
2637        self.lock_state().shared.max_batch_drain_iterations = cap;
2638    }
2639
2640    /// Send a message UPSTREAM from `node_id` to each of its declared deps
2641    /// (canonical R1.4.1 — Slice F audit, F2 / 2026-05-07).
2642    ///
2643    /// The dispatcher rejects tier-3 (DATA / RESOLVED) and tier-5
2644    /// (COMPLETE / ERROR) per R1.4.1: value and terminal-lifecycle planes
2645    /// are downstream-only. All other tiers (0 START, 1 DIRTY, 2 PAUSE /
2646    /// RESUME, 4 INVALIDATE, 6 TEARDOWN) pass.
2647    ///
2648    /// # Routing per tier
2649    ///
2650    /// - **Tier 0 ([`Message::Start`]):** no-op. START is a per-subscription
2651    ///   handshake, not a routable wire signal — sending it upstream has no
2652    ///   well-defined target.
2653    /// - **Tier 1 ([`Message::Dirty`]):** no-op. The dep's "something
2654    ///   changed" notification is its own [`Self::emit`] / commit
2655    ///   responsibility; ignoring upstream DIRTY hints is safe.
2656    /// - **Tier 2 ([`Message::Pause`] / [`Message::Resume`]):** translates
2657    ///   to [`Self::pause`] / [`Self::resume`] on each dep. Lock id is
2658    ///   forwarded verbatim. Errors from individual deps are accumulated
2659    ///   in the `dep_errors` field of the returned report.
2660    /// - **Tier 4 ([`Message::Invalidate`]):** translates to
2661    ///   [`Self::invalidate`] on each dep. Note: canonical R1.4.2
2662    ///   distinguishes "downstream INVALIDATE" (cache clear + cascade) from
2663    ///   "upstream INVALIDATE" (plain forward, no self-process). The Rust
2664    ///   port v1 SIMPLIFICATION delegates to the same `Core::invalidate`
2665    ///   path — upstream INVALIDATE here DOES clear dep caches and cascade.
2666    ///   If a "plain forward" mode surfaces as a real consumer need, add
2667    ///   `up_with_options`.
2668    /// - **Tier 6 ([`Message::Teardown`]):** translates to
2669    ///   [`Self::teardown`] on each dep. Cascades per the standard
2670    ///   teardown path.
2671    ///
2672    /// # Errors
2673    ///
2674    /// - [`UpError::UnknownNode`] — `node_id` is not registered.
2675    /// - [`UpError::TierForbidden`] — tier 3 or tier 5.
2676    pub fn up(&self, node_id: NodeId, message: Message) -> Result<(), UpError> {
2677        // 2b-ii-B (D220-EXEC): route to the node's shard before the
2678        // pre-wave/cold `lock_state()` (see `try_emit`).
2679        // QA A10 (2026-05-07): check unknown node BEFORE tier rejection
2680        // for consistent error UX — `up(unknown, Data)` and
2681        // `up(unknown, Pause)` both report `UnknownNode` rather than
2682        // splitting on the tier.
2683        let dep_ids: Vec<NodeId> = {
2684            let s = self.lock_state();
2685            let rec = s.nodes.get(&node_id).ok_or(UpError::UnknownNode(node_id))?;
2686            rec.dep_ids_vec()
2687        };
2688        let tier = message.tier();
2689        if tier == 3 || tier == 5 {
2690            return Err(UpError::TierForbidden { tier });
2691        }
2692        for dep_id in dep_ids {
2693            match message {
2694                Message::Pause(lock) => {
2695                    let _ = self.pause(dep_id, lock);
2696                }
2697                Message::Resume(lock) => {
2698                    let _ = self.resume(dep_id, lock);
2699                }
2700                Message::Invalidate => {
2701                    self.invalidate(dep_id);
2702                }
2703                Message::Teardown => {
2704                    self.teardown(dep_id);
2705                }
2706                // Tier 0 START + tier 1 DIRTY: no-op upstream per the
2707                // routing table above.
2708                _ => {}
2709            }
2710        }
2711        Ok(())
2712    }
2713
2714    /// Allocate a unique [`LockId`] for use with [`Self::pause`] /
2715    /// [`Self::resume`]. Convenience for callers that don't already have an
2716    /// id-allocation scheme; user-supplied ids work too.
2717    #[must_use]
2718    pub fn alloc_lock_id(&self) -> LockId {
2719        let mut s = self.lock_state();
2720        let id = LockId::new(s.shared.next_lock_id);
2721        s.shared.next_lock_id += 1;
2722        id
2723    }
2724
2725    /// Access the binding boundary for this Core.
2726    ///
2727    /// Used by `graphrefly-graph` for snapshot serialization (M4.E1 / D166):
2728    /// `Graph::snapshot()` calls `binding.serialize_handle(cache)` to
2729    /// project each node's cached value into portable JSON.
2730    #[must_use]
2731    pub fn binding_ptr(&self) -> &Arc<dyn BindingBoundary> {
2732        &self.binding
2733    }
2734
2735    // -------------------------------------------------------------------
2736    // Registration — unified `register()` (D030, Slice D)
2737    //
2738    // All node kinds (State / Producer / Derived / Dynamic / Operator)
2739    // funnel through `Core::register(NodeRegistration) -> NodeId`. Sugar
2740    // wrappers (`register_state` / `register_producer` / `register_derived`
2741    // / `register_dynamic` / `register_operator`) build a `NodeRegistration`
2742    // and delegate. There is no parallel registration path internally.
2743    // -------------------------------------------------------------------
2744
2745    /// Unified node registration (D030).
2746    ///
2747    /// `reg` describes the node's identity (deps + closure-form fn id OR
2748    /// typed-op + per-kind opts). The kind is **derived from the field
2749    /// shape**, not stored — see [`NodeKind`].
2750    ///
2751    /// Sugar wrappers below ([`Self::register_state`],
2752    /// [`Self::register_producer`], [`Self::register_derived`],
2753    /// [`Self::register_dynamic`], [`Self::register_operator`]) build the
2754    /// registration for the common kinds and delegate here. Direct callers
2755    /// that need uncommon combinations (e.g., a partial-true derived) can
2756    /// invoke this method directly.
2757    ///
2758    /// # Errors
2759    ///
2760    /// Errors are returned in evaluation order — earlier phases short-circuit
2761    /// later ones, so a single registration produces at most one variant.
2762    ///
2763    /// **Phase 1 — lock-released, side-effect-free validation:**
2764    /// - [`RegisterError::OperatorWithoutDeps`] — `reg` carries an op but
2765    ///   `deps` is empty. Operator nodes need at least one dep — for
2766    ///   subscription-managed combinators with no declared deps, use
2767    ///   [`Self::register_producer`] instead.
2768    /// - [`RegisterError::InitialOnlyForStateNodes`] — `reg.opts.initial`
2769    ///   is non-sentinel for a non-state shape (deps non-empty, or
2770    ///   fn_or_op present). State nodes are the only kind with an initial
2771    ///   cache.
2772    ///
2773    /// **Phase 2 — operator scratch construction (lock-released):**
2774    /// - [`RegisterError::OperatorSeedSentinel`] — `reg` carries `Op(Scan)`
2775    ///   / `Op(Reduce)` with a `NO_HANDLE` seed. R2.5.3 — stateful folders
2776    ///   must have a real seed.
2777    ///
2778    /// **Phase 3 — state-lock validation (folded with insertion under a
2779    /// single lock acquisition per /qa F1 to prevent TOCTOU between
2780    /// validation and `nodes.insert`):**
2781    /// - [`RegisterError::UnknownDep`] — any element of `reg.deps` is not
2782    ///   a registered node id.
2783    /// - [`RegisterError::TerminalDep`] — a dep is terminal (COMPLETE /
2784    ///   ERROR) AND not resubscribable — would create a permanent wedge.
2785    ///
2786    /// All errors are construction-time invariants — the dispatcher
2787    /// rejects the registration before any reactive state is created.
2788    /// On `Err`, no node has been added and any handle retains taken on
2789    /// the way in (operator scratch seed retains via
2790    /// [`BindingBoundary::retain_handle`]) have been released
2791    /// lock-released — see [`ScratchReleaseGuard`] for the RAII
2792    /// discipline that covers both early-return AND unwind paths.
2793    /// `Last { default }` retains its `default` handle on the same
2794    /// release path.
2795    #[allow(clippy::too_many_lines)]
2796    pub fn register(&self, reg: NodeRegistration) -> Result<NodeId, RegisterError> {
2797        let NodeRegistration {
2798            deps,
2799            fn_or_op,
2800            opts,
2801        } = reg;
2802        let NodeOpts {
2803            initial,
2804            equals,
2805            partial,
2806            is_dynamic,
2807            pausable,
2808            replay_buffer,
2809            terminal_as_real_input,
2810        } = opts;
2811
2812        // Derive the field shape from fn_or_op + deps.
2813        let (fn_id, op) = match fn_or_op {
2814            Some(NodeFnOrOp::Fn(f)) => (Some(f), None),
2815            Some(NodeFnOrOp::Op(o)) => (None, Some(o)),
2816            None => (None, None),
2817        };
2818
2819        // Phase 1 — lock-released, side-effect-free validation. Errors
2820        // here return BEFORE any handle retain is taken.
2821        //
2822        //   - State (no deps + no fn + no op) is the only kind with `initial`.
2823        //   - Dynamic flag only meaningful when fn + non-empty deps.
2824        //   - Operator (op present) must have deps (P9: operator without deps
2825        //     would skip activation — use a producer instead).
2826        let is_state_shape = deps.is_empty() && fn_id.is_none() && op.is_none();
2827        if op.is_some() && deps.is_empty() {
2828            return Err(RegisterError::OperatorWithoutDeps);
2829        }
2830        if initial != NO_HANDLE && !is_state_shape {
2831            return Err(RegisterError::InitialOnlyForStateNodes);
2832        }
2833
2834        // Phase 2 — build per-operator scratch struct (may take handle
2835        // retains via `binding.retain_handle` for Scan/Reduce/Last seed).
2836        // Lock-released per Slice E (D045) handshake discipline. Returns
2837        // `OperatorSeedSentinel` BEFORE retain so an Err leaves no
2838        // dangling handles.
2839        let scratch = match op {
2840            Some(operator_op) => self.make_op_scratch(operator_op)?,
2841            None => None,
2842        };
2843
2844        // Wrap scratch in an RAII guard immediately after Phase 2. From
2845        // here on, ANY early return / unwind path correctly releases the
2846        // scratch's handle retains via `OperatorScratch::release_handles`
2847        // (Slice H /qa F2 — defense against panics between Phase 2 and
2848        // Phase 3 cleanup branch). Lock-released because the guard is
2849        // declared BEFORE `lock_state()` below — variable destruction
2850        // order is reverse declaration order, so the `MutexGuard` drops
2851        // first on any return path.
2852        let scratch_guard = ScratchReleaseGuard::new(scratch, &*self.binding);
2853
2854        // Phase 3 — state-lock-required validation, FOLDED with insertion
2855        // under a single `lock_state()` acquisition per /qa F1. The
2856        // pre-/qa version split this into two acquisitions (one for
2857        // validation, one for `alloc_node_id` + `nodes.insert`), opening
2858        // a TOCTOU window where a concurrent `Core::complete(dep)` on a
2859        // non-resubscribable dep could slip in and recreate the wedge
2860        // `TerminalDep` was designed to prevent. Single locked region
2861        // closes the gap.
2862        let mut s = self.lock_state();
2863
2864        for &dep in &deps {
2865            if !s.nodes.contains_key(&dep) {
2866                return Err(RegisterError::UnknownDep(dep));
2867            }
2868        }
2869        // Slice F audit (2026-05-07): mirror `set_deps`'s `TerminalDep`
2870        // rejection at registration time. Adding a non-resubscribable
2871        // terminal node as a dep at registration creates a permanent wedge.
2872        for &dep in &deps {
2873            let dep_rec = s.require_node(dep);
2874            if dep_rec.terminal.is_some() && !dep_rec.resubscribable {
2875                return Err(RegisterError::TerminalDep(dep));
2876            }
2877        }
2878
2879        // Validation passed — install. Take scratch out of the guard
2880        // (disarms the release-on-drop) and continue using `s`.
2881        let installed_scratch = scratch_guard.take();
2882
2883        let id = s.alloc_node_id();
2884
2885        // `tracked`: Static derived + Operator track all deps; Dynamic
2886        // starts empty and fills via fn return; State / Producer have no
2887        // deps so tracked is empty.
2888        let tracked: HashSet<usize> = if op.is_some() {
2889            (0..deps.len()).collect()
2890        } else if is_dynamic {
2891            HashSet::new()
2892        } else if fn_id.is_some() && !deps.is_empty() {
2893            // Static derived
2894            (0..deps.len()).collect()
2895        } else {
2896            HashSet::new()
2897        };
2898
2899        let dep_records: Vec<DepRecord> = deps.iter().map(|&d| DepRecord::new(d)).collect();
2900
2901        // §10 perf (D047): compute topo_rank = 1 + max(dep ranks).
2902        let topo_rank = if deps.is_empty() {
2903            0
2904        } else {
2905            deps.iter()
2906                .filter_map(|&d| s.nodes.get(&d).map(|r| r.topo_rank))
2907                .max()
2908                .unwrap_or(0)
2909                .saturating_add(1)
2910        };
2911
2912        let rec = NodeRecord {
2913            dep_records,
2914            fn_id,
2915            op,
2916            is_dynamic,
2917            equals,
2918            cache: initial,
2919            has_fired_once: initial != NO_HANDLE,
2920            subscribers: HashMap::new(),
2921            subscribers_revision: 0,
2922            tracked,
2923            dirty: false,
2924            involved_this_wave: false,
2925            pause_state: PauseState::Active,
2926            pausable,
2927            replay_buffer_cap: replay_buffer,
2928            replay_buffer: VecDeque::new(),
2929            terminal: None,
2930            has_received_teardown: false,
2931            resubscribable: false,
2932            meta_companions: Vec::new(),
2933            partial,
2934            terminal_as_real_input,
2935            topo_rank,
2936            received_mask: 0,
2937            involved_mask: 0,
2938            op_scratch: installed_scratch,
2939        };
2940        s.nodes.insert(id, rec);
2941        s.children.insert(id, HashSet::new());
2942        for &dep in &deps {
2943            s.children.entry(dep).or_default().insert(id);
2944        }
2945        drop(s);
2946        self.fire_topology_event(&crate::topology::TopologyEvent::NodeRegistered(id));
2947        Ok(id)
2948    }
2949
2950    /// Sugar over [`Self::register`] — register a state node. `initial`
2951    /// may be [`NO_HANDLE`] to start sentinel.
2952    ///
2953    /// `partial` is accepted for surface consistency (D019); for state
2954    /// nodes it has no effect (state nodes don't fire fn).
2955    ///
2956    /// # Errors
2957    ///
2958    /// State registration is structurally simple — no deps, no op — so
2959    /// the only reachable variant is none in practice. Returns
2960    /// [`Result`] for surface consistency with [`Self::register`].
2961    pub fn register_state(
2962        &self,
2963        initial: HandleId,
2964        partial: bool,
2965    ) -> Result<NodeId, RegisterError> {
2966        self.register(NodeRegistration {
2967            deps: Vec::new(),
2968            fn_or_op: None,
2969            opts: NodeOpts {
2970                initial,
2971                partial,
2972                ..NodeOpts::default()
2973            },
2974        })
2975    }
2976
2977    /// Sugar over [`Self::register`] — register a producer node (D031,
2978    /// Slice D). No deps; fn fires once on first subscribe; cleanup runs
2979    /// via [`BindingBoundary::producer_deactivate`] when the last
2980    /// subscriber unsubscribes.
2981    ///
2982    /// The fn body uses the binding's `ProducerCtx`-equivalent helper
2983    /// (see `graphrefly-operators::producer`) to subscribe to other Core
2984    /// nodes — the zip / concat / race / takeUntil pattern.
2985    ///
2986    /// # Errors
2987    ///
2988    /// Producer registration has no user-supplied deps, so structurally
2989    /// none of [`RegisterError`]'s variants are reachable. Returns
2990    /// [`Result`] for surface consistency with [`Self::register`].
2991    pub fn register_producer(&self, fn_id: FnId) -> Result<NodeId, RegisterError> {
2992        self.register(NodeRegistration {
2993            deps: Vec::new(),
2994            fn_or_op: Some(NodeFnOrOp::Fn(fn_id)),
2995            opts: NodeOpts {
2996                // Producers have no deps — the first-run gate is degenerate.
2997                partial: true,
2998                ..NodeOpts::default()
2999            },
3000        })
3001    }
3002
3003    /// Sugar over [`Self::register`] — register a derived (static) node.
3004    /// `partial` controls the R2.5.3 first-run gate (D011).
3005    ///
3006    /// # Errors
3007    ///
3008    /// - [`RegisterError::UnknownDep`] — any element of `deps` is not
3009    ///   registered.
3010    /// - [`RegisterError::TerminalDep`] — a dep is terminal and not
3011    ///   resubscribable.
3012    pub fn register_derived(
3013        &self,
3014        deps: &[NodeId],
3015        fn_id: FnId,
3016        equals: EqualsMode,
3017        partial: bool,
3018    ) -> Result<NodeId, RegisterError> {
3019        self.register(NodeRegistration {
3020            deps: deps.to_vec(),
3021            fn_or_op: Some(NodeFnOrOp::Fn(fn_id)),
3022            opts: NodeOpts {
3023                equals,
3024                partial,
3025                ..NodeOpts::default()
3026            },
3027        })
3028    }
3029
3030    /// Sugar over [`Self::register`] — register a dynamic node (fn
3031    /// declares its actually-tracked dep indices per fire). `partial`
3032    /// controls the R2.5.3 first-run gate (D011).
3033    ///
3034    /// # Errors
3035    ///
3036    /// - [`RegisterError::UnknownDep`] — any element of `deps` is not
3037    ///   registered.
3038    /// - [`RegisterError::TerminalDep`] — a dep is terminal and not
3039    ///   resubscribable.
3040    pub fn register_dynamic(
3041        &self,
3042        deps: &[NodeId],
3043        fn_id: FnId,
3044        equals: EqualsMode,
3045        partial: bool,
3046    ) -> Result<NodeId, RegisterError> {
3047        self.register(NodeRegistration {
3048            deps: deps.to_vec(),
3049            fn_or_op: Some(NodeFnOrOp::Fn(fn_id)),
3050            opts: NodeOpts {
3051                equals,
3052                partial,
3053                is_dynamic: true,
3054                ..NodeOpts::default()
3055            },
3056        })
3057    }
3058
3059    /// Build a fresh [`OperatorScratch`](crate::op_state::OperatorScratch)
3060    /// box for an operator variant, taking any required handle retains.
3061    /// Shared between `register_operator` (initial install),
3062    /// `reset_for_fresh_lifecycle` (resubscribable terminal-cycle
3063    /// re-install), and Phase G of `Subscription::Drop` (D-α
3064    /// resubscribable + non-terminal deactivate re-install).
3065    ///
3066    /// # Errors
3067    ///
3068    /// Returns [`RegisterError::OperatorSeedSentinel`] if `op` is `Scan`
3069    /// / `Reduce` with a [`NO_HANDLE`] seed (R2.5.3 — stateful folders
3070    /// must have a real seed). Refcount discipline: the seed-sentinel
3071    /// check happens BEFORE [`BindingBoundary::retain_handle`], so an
3072    /// `Err` leaves no handles dangling.
3073    fn make_op_scratch(
3074        &self,
3075        op: OperatorOp,
3076    ) -> Result<Option<Box<dyn crate::op_state::OperatorScratch>>, RegisterError> {
3077        Self::make_op_scratch_with_binding(&*self.binding, op)
3078    }
3079
3080    /// Associated-function variant of [`Self::make_op_scratch`] that
3081    /// takes the binding explicitly. Used by call sites that have a
3082    /// `&dyn BindingBoundary` but not a `&Core` (notably
3083    /// [`Subscription::Drop`]'s Phase G, which holds only a
3084    /// `Weak<Mutex<CoreState>>` and operates on `s.binding`).
3085    pub(crate) fn make_op_scratch_with_binding(
3086        binding: &dyn BindingBoundary,
3087        op: OperatorOp,
3088    ) -> Result<Option<Box<dyn crate::op_state::OperatorScratch>>, RegisterError> {
3089        use crate::op_state::{
3090            DistinctState, LastState, PairwiseState, ReduceState, ScanState, SkipState, TakeState,
3091            TakeWhileState,
3092        };
3093        // Slice H (2026-05-07): Scan/Reduce seed-sentinel checks happen
3094        // BEFORE retain_handle so an Err return leaves no handles dangling.
3095        //
3096        // Slice H /qa F13 (2026-05-07): for retaining variants, allocate
3097        // the `Box<State>` BEFORE calling `binding.retain_handle`. If
3098        // `Box::new` panics (e.g., OOM-as-panic), no retain has happened
3099        // yet — no leak. If `retain_handle` panics after Box succeeds,
3100        // the `Box<State>` is dropped on unwind; State has no handle yet
3101        // (we haven't touched the registry refcount), so still no leak.
3102        // Caller wraps the returned scratch in `ScratchReleaseGuard` to
3103        // cover panics AFTER make_op_scratch returns.
3104        match op {
3105            OperatorOp::Scan { seed, .. } => {
3106                if seed == NO_HANDLE {
3107                    return Err(RegisterError::OperatorSeedSentinel);
3108                }
3109                let state = Box::new(ScanState { acc: seed });
3110                binding.retain_handle(seed);
3111                Ok(Some(state))
3112            }
3113            OperatorOp::Reduce { seed, .. } => {
3114                if seed == NO_HANDLE {
3115                    return Err(RegisterError::OperatorSeedSentinel);
3116                }
3117                let state = Box::new(ReduceState { acc: seed });
3118                binding.retain_handle(seed);
3119                Ok(Some(state))
3120            }
3121            OperatorOp::DistinctUntilChanged { .. } => Ok(Some(Box::new(DistinctState::default()))),
3122            OperatorOp::Pairwise { .. } => Ok(Some(Box::new(PairwiseState::default()))),
3123            OperatorOp::Take { .. } => Ok(Some(Box::new(TakeState::default()))),
3124            OperatorOp::Skip { .. } => Ok(Some(Box::new(SkipState::default()))),
3125            OperatorOp::TakeWhile { .. } => Ok(Some(Box::new(TakeWhileState))),
3126            OperatorOp::Last { default } => {
3127                let state = Box::new(LastState {
3128                    latest: NO_HANDLE,
3129                    default,
3130                });
3131                if default != NO_HANDLE {
3132                    binding.retain_handle(default);
3133                }
3134                Ok(Some(state))
3135            }
3136            OperatorOp::TapFirst { .. } => {
3137                Ok(Some(Box::new(crate::op_state::TapFirstState::default())))
3138            }
3139            OperatorOp::Settle { .. } => {
3140                Ok(Some(Box::new(crate::op_state::SettleState::default())))
3141            }
3142            OperatorOp::Map { .. }
3143            | OperatorOp::Filter { .. }
3144            | OperatorOp::Combine { .. }
3145            | OperatorOp::WithLatestFrom { .. }
3146            | OperatorOp::Merge
3147            | OperatorOp::Tap { .. }
3148            | OperatorOp::Valve => Ok(None),
3149        }
3150    }
3151
3152    /// Sugar over [`Self::register`] — register a built-in operator node
3153    /// (Slice C-1, D009; D026 generic scratch). The operator dispatch path
3154    /// lives in `fire_operator`; `op` selects which per-operator FFI
3155    /// method on [`BindingBoundary`] gets called per fire.
3156    ///
3157    /// For stateful operators ([`OperatorOp::Scan`] / [`Reduce`] /
3158    /// [`Last`] with a default), the seed/default handle is captured
3159    /// into the appropriate
3160    /// [`OperatorScratch`](crate::op_state::OperatorScratch) struct
3161    /// stored at [`NodeRecord::op_scratch`], and Core takes one retain
3162    /// share via [`BindingBoundary::retain_handle`].
3163    ///
3164    /// # Errors
3165    ///
3166    /// - [`RegisterError::OperatorWithoutDeps`] — `deps` is empty (use
3167    ///   [`Self::register_producer`] instead).
3168    /// - [`RegisterError::OperatorSeedSentinel`] — `op` is
3169    ///   [`OperatorOp::Scan`] / [`OperatorOp::Reduce`] with a
3170    ///   [`NO_HANDLE`] seed.
3171    /// - [`RegisterError::UnknownDep`] — any element of `deps` is not
3172    ///   registered.
3173    /// - [`RegisterError::TerminalDep`] — a dep is terminal and not
3174    ///   resubscribable.
3175    pub fn register_operator(
3176        &self,
3177        deps: &[NodeId],
3178        op: OperatorOp,
3179        opts: OperatorOpts,
3180    ) -> Result<NodeId, RegisterError> {
3181        self.register(NodeRegistration {
3182            deps: deps.to_vec(),
3183            fn_or_op: Some(NodeFnOrOp::Op(op)),
3184            opts: NodeOpts {
3185                equals: opts.equals,
3186                partial: opts.partial,
3187                ..NodeOpts::default()
3188            },
3189        })
3190    }
3191
3192    // -------------------------------------------------------------------
3193    // Subscription
3194    // -------------------------------------------------------------------
3195
3196    /// Subscribe a sink to a node. Returns a [`Subscription`] handle —
3197    /// dropping the handle unsubscribes the sink. Per §10.12, no manual
3198    /// `unsubscribe(node, id)` call is required.
3199    ///
3200    /// Push-on-subscribe (R1.2.3, R2.2.3 step 4): the sink is registered AFTER
3201    /// the START handshake fires. The handshake contents depend on node
3202    /// state:
3203    /// - Sentinel cache + live (non-terminal): `[START]`
3204    /// - Cached + live: `[START, DATA(handle)]`
3205    ///
3206    /// Subscribe-after-terminal semantics (canonical R2.2.7.a / R2.2.7.b,
3207    /// D118 2026-05-10):
3208    /// - **Resubscribable + terminal** (any TEARDOWN state): the subscribe
3209    ///   call first **resets** the node — clears `terminal`,
3210    ///   `has_fired_once`, `has_received_teardown`, all `dep_handles` to
3211    ///   `NO_HANDLE`, all `dep_terminals` to `None`, drains the pause
3212    ///   lockset, clears the replay buffer. The new subscriber receives a
3213    ///   fresh `[START]` (cache survives for state nodes per R2.2.8;
3214    ///   sentinel for compute). The `wipe_ctx` cleanup hook fires
3215    ///   lock-released so binding-side `ctx.store` starts fresh.
3216    /// - **Non-resubscribable + terminal** (any TEARDOWN state): the
3217    ///   subscribe is rejected — `try_subscribe` returns
3218    ///   [`SubscribeError::TornDown`]; this method (the panic-on-error
3219    ///   variant) panics with the diagnostic.
3220    ///
3221    /// Activation (R2.2.3 step 5): if this is the first subscriber and the
3222    /// node is a derived/dynamic compute, recursively activate deps so their
3223    /// cached handles fill our `dep_handles`.
3224    ///
3225    /// # Returns
3226    ///
3227    /// Returns the [`SubscriptionId`]. Pair it with the `node_id` you
3228    /// passed here and call [`Self::unsubscribe`] to deregister (S2b /
3229    /// D225: core-level RAII retired — a binding-layer RAII wrapper over
3230    /// `unsubscribe` provides drop-convenience where the holder co-owns
3231    /// the `Core` on its affinity worker).
3232    ///
3233    /// # Panics
3234    ///
3235    /// Panics if:
3236    /// - Subscribing would violate the Phase H+ ascending partition-order
3237    ///   invariant ([`SubscribeError::PartitionOrderViolation`]).
3238    /// - The node is non-resubscribable AND has terminated
3239    ///   ([`SubscribeError::TornDown`], R2.2.7.b).
3240    #[allow(clippy::needless_pass_by_value)] // Sink is `Arc<dyn Fn>`; we clone for the subscribers map and call it directly. Taking by value matches the ergonomics callers expect.
3241    pub fn subscribe(&self, node_id: NodeId, sink: Sink) -> SubscriptionId {
3242        match self.try_subscribe(node_id, sink) {
3243            Ok(sub) => sub,
3244            Err(e) => panic!("{e}"),
3245        }
3246    }
3247
3248    /// Synchronous owner-invoked unsubscribe (D225 refined A2). Symmetric
3249    /// with [`Core::subscribe`] (the caller has `&Core`, exactly like
3250    /// [`Core::emit`]). Runs the full deregister + Phase G / lifecycle
3251    /// cleanup chain (OnDeactivation → `producer_deactivate` → `wipe_ctx`
3252    /// → Core cache-clear), behaviour-identical to the legacy
3253    /// `Subscription::Drop`. Idempotent: a `sub_id` already gone (node
3254    /// destroyed / double-unsub) is a silent no-op. D225 S2b retires the
3255    /// core-level RAII `Subscription` in favour of a binding-layer RAII
3256    /// wrapper over this method (the binding holds its `Core` on its
3257    /// affinity worker, so its wrapper's `Drop` calls this synchronously).
3258    ///
3259    /// S2b promotes this (and [`SubscriptionId`]) to `pub` as the
3260    /// binding-facing surface that replaces the retired core-level RAII
3261    /// `Subscription`. Callers (binding-layer RAII wrapper, producer
3262    /// `producer_deactivate` per D229, tests) pair the `node_id` they
3263    /// subscribed with the returned `sub_id`.
3264    pub fn unsubscribe(&self, node_id: NodeId, sub_id: SubscriptionId) {
3265        unsubscribe_sink(self, node_id, sub_id);
3266    }
3267
3268    /// Fallible subscribe. Returns `Err` on:
3269    /// - Partition order violation (Phase H+ STRICT, D115) — caller defers.
3270    /// - Non-resubscribable terminal node (R2.2.7.b, D118) — caller skips.
3271    ///
3272    /// Used by `subscribe` (unwraps both errors as panics) and producer-
3273    /// pattern operator sinks (match on variant).
3274    #[allow(clippy::needless_pass_by_value)]
3275    pub fn try_subscribe(
3276        &self,
3277        node_id: NodeId,
3278        sink: Sink,
3279    ) -> Result<SubscriptionId, SubscribeError> {
3280        // Subscribe protocol (S4/D246/D248 single-owner; supersedes the
3281        // pre-S2c Slice E `wave_owner` ReentrantMutex sequence which was
3282        // deleted with the §7 machinery):
3283        //
3284        // 1. Acquire the state lock briefly: alloc sub_id, run
3285        //    resubscribable reset if applicable, snapshot handshake
3286        //    state, install sink in `subscribers` + bump
3287        //    `subscribers_revision` (Slice X4/D2 freeze). Drop the
3288        //    state lock.
3289        // 2. Fire the handshake LOCK-RELEASED. Per-tier slices
3290        //    (R1.3.5.a): `[Start]` / `[Data(cache)]?` / `[Complete]?` /
3291        //    `[Error(h)]?` / `[Teardown]?`. Empty tiers are skipped. A
3292        //    sink callback may re-enter Core via the owner-side
3293        //    mailbox/`DeferQueue` seam; the owner drain applies it as
3294        //    a nested wave.
3295        // 3. Run activation under `run_wave` if needed (first
3296        //    subscriber on a non-state node).
3297        //
3298        // Happens-after discipline (replaces the pre-S2c cross-thread
3299        // `wave_owner` BLOCK): `Core` is single-owner `!Send + !Sync`
3300        // (D248) — the one owner thread installs the sink under the
3301        // state lock BEFORE the handshake fires, and the
3302        // `subscribers_revision` bump (Slice X4/D2) freezes any
3303        // earlier-in-wave `PendingBatch` against double-delivery to
3304        // the new sink. There is no concurrent thread to block.
3305
3306        let (sub_id, tier_slices, needs_activation, did_reset) = {
3307            let mut s = self.lock_state();
3308
3309            // R2.2.7.b (D118, 2026-05-10): non-resubscribable + terminal →
3310            // reject. The stream is permanently over; subscribe gets a
3311            // clean error rather than a confusing replay of past events.
3312            // Operators (zip / concat / race / ...) match on the variant
3313            // and skip the source.
3314            //
3315            // The `has_received_teardown` flag is irrelevant here —
3316            // `terminal.is_some()` alone gates rejection. The auto-TEARDOWN
3317            // cascade in R2.6.4 / Lock 6.F means torn_down lags terminal
3318            // by at most one wave anyway; a brief mid-wave window where
3319            // `terminal.is_some() && !torn_down` is reachable but the
3320            // rejection decision doesn't depend on which side of that
3321            // window we're in.
3322            let should_reject = {
3323                let rec = s.require_node(node_id);
3324                !rec.resubscribable && rec.terminal.is_some()
3325            };
3326            if should_reject {
3327                drop(s);
3328                return Err(SubscribeError::TornDown { node: node_id });
3329            }
3330
3331            let sub_id = s.alloc_sub_id();
3332
3333            // R2.2.7.a (D118, 2026-05-10): resubscribable + terminal → reset
3334            // to fresh lifecycle, regardless of TEARDOWN state. The prior
3335            // `!has_received_teardown` guard (Slice A+B F3) conflated
3336            // "TEARDOWN is the cleanup signal of the previous activation
3337            // cycle" with "permanent destruction" — corrected per the
3338            // canonical-spec amendment. `reset_for_fresh_lifecycle` clears
3339            // `has_received_teardown` along with `terminal`,
3340            // `has_fired_once`, dep_records sentinels, pause lockset, and
3341            // replay buffer. `wipe_ctx` fires lock-released after the
3342            // state lock drops so the binding's `ctx.store` starts fresh.
3343            let needs_reset = {
3344                let rec = s.require_node(node_id);
3345                rec.resubscribable && rec.terminal.is_some()
3346            };
3347            if needs_reset {
3348                self.reset_for_fresh_lifecycle(&mut s, node_id);
3349            }
3350
3351            // Snapshot handshake state under lock.
3352            //
3353            // F5 (/qa 2026-05-10): post-D118 R2.2.7.a/b, the snapshot
3354            // ALWAYS sees a non-terminal node here — `should_reject`
3355            // already rejected non-resubscribable terminal above, and
3356            // `needs_reset` cleared resubscribable terminal back to
3357            // `terminal = None` / `has_received_teardown = false`.
3358            // The pre-D118 terminal-replay + teardown-replay branches
3359            // were dead code in this post-D118 sequence and are
3360            // removed.
3361            let (cache, is_state, first_subscriber) = {
3362                let rec = s.require_node(node_id);
3363                debug_assert!(
3364                    rec.terminal.is_none(),
3365                    "R2.2.7.a/b invariant: post-reject/reset, terminal must be None"
3366                );
3367                debug_assert!(
3368                    !rec.has_received_teardown,
3369                    "R2.2.7.a invariant: reset clears has_received_teardown"
3370                );
3371                (rec.cache, rec.is_state(), rec.subscribers.is_empty())
3372            };
3373
3374            // Build per-tier handshake slices. Each non-empty slice is
3375            // fired as a separate sink call (R1.3.5.a tier-split).
3376            let mut tier_slices: SmallVec<[Vec<Message>; 4]> = SmallVec::new();
3377            tier_slices.push(vec![Message::Start]);
3378            if cache != NO_HANDLE {
3379                tier_slices.push(vec![Message::Data(cache)]);
3380            }
3381            // Slice E1 (R2.6.5 / Lock 6.G): replay buffered DATA between
3382            // [Start] (and the cache slice, if present) and any terminal.
3383            // Each buffered handle becomes a separate per-tier slice so
3384            // late subscribers see the historical Data sequence as
3385            // distinct sink calls.
3386            //
3387            // Dedupe: when a cache slice is present and the buffer's last
3388            // entry is the same handle (the typical case — cache always
3389            // tracks the last DATA emitted, and the buffer's tail entry
3390            // is that same DATA), skip the last buffer entry to avoid
3391            // delivering Data(cache) twice. For state nodes whose cache
3392            // survives unsubscribe, the buffer may have older entries
3393            // the cache doesn't reflect; the dedupe only drops the
3394            // single trailing entry that equals cache. (QA A1, 2026-05-07)
3395            let replay_handles: Vec<HandleId> = {
3396                let rec = s.require_node(node_id);
3397                let cap = rec.replay_buffer_cap.unwrap_or(0);
3398                if cap == 0 {
3399                    Vec::new()
3400                } else {
3401                    let mut v: Vec<HandleId> = rec.replay_buffer.iter().copied().collect();
3402                    if cache != NO_HANDLE && v.last() == Some(&cache) {
3403                        v.pop();
3404                    }
3405                    v
3406                }
3407            };
3408            for h in &replay_handles {
3409                tier_slices.push(vec![Message::Data(*h)]);
3410            }
3411
3412            // Install sink BEFORE dropping state lock so any subsequent
3413            // owner-side wave (after our scope ends) sees the sink
3414            // already registered. (Pre-S2c the comment referenced
3415            // acquiring `wave_owner` for cross-thread serialization;
3416            // post-D248 single-owner there is no cross-thread emitter
3417            // and no `wave_owner` lock — the rule is just "install
3418            // before drop so future waves see the sink.")
3419            //
3420            // Slice X4 / D2: bump `subscribers_revision` alongside the
3421            // insert so a pending_notify entry opened earlier in the same
3422            // wave (e.g. inside `batch(|| { emit(s, h1); subscribe(s,
3423            // late); emit(s, h2); })`) starts a fresh `PendingBatch` on
3424            // its next `queue_notify` push — making the new sink visible
3425            // to subsequent emits' flush slices, while the pre-subscribe
3426            // batch's snapshot stays frozen so we don't double-deliver
3427            // earlier emits via the wave's flush AND the new sub's
3428            // handshake replay.
3429            {
3430                let rec = s.require_node_mut(node_id);
3431                rec.subscribers.insert(sub_id, sink.clone());
3432                rec.subscribers_revision = rec.subscribers_revision.wrapping_add(1);
3433            }
3434
3435            let needs_activation = first_subscriber && !is_state;
3436            (sub_id, tier_slices, needs_activation, needs_reset)
3437            // state lock drops here
3438        };
3439
3440        // Slice E2 (R2.4.6 / D055): on resubscribable terminal reset, fire
3441        // `wipe_ctx` LOCK-RELEASED so the binding drops its `NodeCtxState`
3442        // entry (clearing both `store` and any residual `current_cleanup`).
3443        // The new subscriber's first invoke_fn sees a fresh empty store.
3444        // Fires AFTER the state lock drops so the binding's
3445        // `node_ctx.lock()` can't deadlock against Core's state lock — and
3446        // BEFORE the handshake so the wipe is observable before any
3447        // user-visible interaction with the new lifecycle.
3448        if did_reset {
3449            self.binding.wipe_ctx(node_id);
3450        }
3451
3452        // Fire handshake LOCK-RELEASED. A sink may re-enter Core via
3453        // the owner-side mailbox/`DeferQueue` seam; the owner drain
3454        // applies it as a nested wave. Single-owner `!Send` Core: no
3455        // `wave_owner` mutex, no cross-thread emitter (S4/D248).
3456        //
3457        // A7 (Slice F, 2026-05-07): per-tier slice fire is wrapped in
3458        // `catch_unwind`. The sink is installed in `subscribers` BEFORE
3459        // the handshake fires (load-bearing — concurrent threads observe
3460        // the sink immediately). If a sink panics on tier N, the panic
3461        // would otherwise unwind out of `subscribe` BEFORE the
3462        // `Subscription` handle is constructed, leaving the sink
3463        // registered in `subscribers` with no user-held handle to drop.
3464        // Subsequent waves' `flush_notifications` would re-fire the
3465        // panicking sink forever.
3466        //
3467        // On panic: remove the sink from `subscribers` (via the
3468        // already-allocated `sub_id`), drop `_wave_guard` cleanly via
3469        // RAII, and resume the unwind so the user observes the panic at
3470        // the call site. Same effect as the user dropping the
3471        // `Subscription` immediately, but pre-emptive.
3472        for slice in &tier_slices {
3473            let sink_clone = sink.clone();
3474            let slice_ref: &[Message] = slice;
3475            let result = catch_unwind(AssertUnwindSafe(|| sink_clone(slice_ref)));
3476            if let Err(panic_payload) = result {
3477                // Remove the orphaned sink. Best-effort: if the node was
3478                // since torn down (e.g., the sink itself called teardown
3479                // before panicking), the entry may already be gone.
3480                {
3481                    let mut s = self.lock_state();
3482                    if let Some(rec) = s.nodes.get_mut(&node_id) {
3483                        rec.subscribers.remove(&sub_id);
3484                        // Slice X4 / D2: keep revision-tracked snapshot
3485                        // discipline consistent with the install site —
3486                        // any pending_notify entry that already absorbed
3487                        // the panicking sink under the post-install
3488                        // revision should start a fresh batch on its
3489                        // next queue_notify push.
3490                        rec.subscribers_revision = rec.subscribers_revision.wrapping_add(1);
3491                    }
3492                }
3493                std::panic::resume_unwind(panic_payload);
3494            }
3495        }
3496
3497        // D261 / S7: drain mailbox if the handshake-fire phase posted to
3498        // it. The per-tier handshake slices above fire sinks LOCK-RELEASED
3499        // outside any `BatchGuard`; a sink that posts via
3500        // `MailboxEmitter`/`SinkEmitter` (e.g. `graphrefly_structures::
3501        // reactive::SinkEmitter::emit`'s `mailbox.post_emit`) would
3502        // otherwise leave the post stranded until the next external wave
3503        // entry. Without this drain, push-on-subscribe replay for an
3504        // operator whose internal sink re-emits via the mailbox
3505        // (`reactive_log.view(slice/tail/fromCursor)`'s internal sinks)
3506        // never lands on the operator's output node by subscribe-time, so
3507        // the **next** `core.subscribe(operator_node, …)` sees `cache ==
3508        // NO_HANDLE` and emits an empty `[Data]` slice — same root
3509        // mechanism as D260 (mailbox post stranded past a sink-fire
3510        // boundary), different boundary (handshake-fire vs fire_deferred).
3511        // The `BatchGuard` here is a wave-scoped RAII handle; **its
3512        // `Drop`** (not its construction) runs `drain_and_flush` (applies
3513        // the queued mailbox/DeferQueue ops) + D260's post-`fire_deferred`
3514        // re-drain loop until both queues quiesce. The brief lifetime
3515        // — opened then dropped at the end of the `if` block, before
3516        // activation — is the entire purpose of the binding.
3517        //
3518        // Cheap on the no-post path: one `runnable` atomic load per
3519        // subscribe; on a positive load, one `BatchGuard` open+drop
3520        // round-trip that drains whatever post the handshake produced.
3521        //
3522        // **Nested-subscribe note.** If `try_subscribe` is called from
3523        // inside an already-owning wave (e.g. a Defer closure calling
3524        // `cf.subscribe`), `begin_batch_for` → `claim_in_tick` returns
3525        // `false` (already owned) → the `BatchGuard` is non-owning →
3526        // its `Drop` is a no-op (early-return at the `!owns_tick` guard
3527        // in `BatchGuard::Drop`). The outer wave's drain loop catches
3528        // the post at its next `is_runnable()` top-of-loop check. Cheap
3529        // and correct in both cases.
3530        if self.mailbox.is_runnable() || self.deferred.is_runnable() {
3531            // Use the same single-owner batch entry as `try_emit` — seed
3532            // is the node we just subscribed to (the partition-touch hint
3533            // is vestigial post-S2c per `try_begin_batch_for` doc, but
3534            // the seed is retained for D211 minimal-churn).
3535            let _drain = self.begin_batch_for(node_id);
3536        }
3537
3538        // Run activation if needed. `run_wave_for(node_id)` acquires
3539        // only the partitions transitively touched from `node_id`
3540        // (downstream cascade + meta-companion teardown reach) — same-
3541        // partition activation re-enters reentrantly. Slice Y1 / Phase E.
3542        if needs_activation {
3543            self.run_wave_for(node_id, |this| {
3544                let mut s = this.lock_state();
3545                this.activate_derived(&mut s, node_id);
3546            });
3547        }
3548
3549        // D246/S2c: no group wave-locks to drop (single-owner).
3550
3551        // Drain deferred producer ops now that no partitions are held
3552        // on this thread. The drain is a loop because each deferred op
3553        // may itself produce new deferred ops.
3554        self.drain_deferred_producer_ops();
3555
3556        Ok(sub_id)
3557    }
3558
3559    /// Mark `node_id` as resubscribable per R2.2.7. Resubscribable nodes
3560    /// reset their terminal-lifecycle state on a fresh subscribe — see
3561    /// [`Self::subscribe`].
3562    ///
3563    /// Configuration call — must be made before the node has any active
3564    /// subscribers, since changing the policy mid-flight would surprise
3565    /// existing observers.
3566    ///
3567    /// # Panics
3568    ///
3569    /// Panics if the node has subscribers (the policy is observable
3570    /// behavior; changing it after the fact would change semantics for
3571    /// existing sinks).
3572    pub fn set_resubscribable(&self, node_id: NodeId, resubscribable: bool) {
3573        // 2b-ii-B (D220-EXEC): route to the node's shard (see `try_emit`).
3574        let mut s = self.lock_state();
3575        let rec = s.require_node_mut(node_id);
3576        assert!(
3577            rec.subscribers.is_empty(),
3578            "set_resubscribable: node already has subscribers; \
3579             configure resubscribable before any subscribe call"
3580        );
3581        rec.resubscribable = resubscribable;
3582    }
3583
3584    /// Reset a resubscribable node's terminal-lifecycle state. Called from
3585    /// `subscribe` when a late subscriber arrives at a flagged node.
3586    ///
3587    /// Released: terminal-slot retain (Error handle), all per-dep terminal
3588    /// retains (Error handles), all data_batch retains.
3589    /// Cleared: `terminal`, `has_fired_once`, `has_received_teardown`, all
3590    /// dep_records to sentinel, the pause lockset (any held locks are
3591    /// released — replay buffer drops silently because there are no
3592    /// subscribers to flush to).
3593    fn reset_for_fresh_lifecycle(&self, s: &mut St<'_>, node_id: NodeId) {
3594        // Phase 1: collect wave-state handle releases + take the old
3595        // op_scratch + reset other state. Take all mutations under one
3596        // borrow so the post-borrow phases don't re-walk dep_records.
3597        let (prev_op, mut old_scratch, handles_to_release, pause_buffer_payloads) = {
3598            let rec = s.require_node_mut(node_id);
3599            let mut hs = Vec::new();
3600            if let Some(TerminalKind::Error(h)) = rec.terminal {
3601                hs.push(h);
3602            }
3603            for dr in &rec.dep_records {
3604                if let Some(TerminalKind::Error(h)) = dr.terminal {
3605                    hs.push(h);
3606                }
3607                for &h in &dr.data_batch {
3608                    hs.push(h);
3609                }
3610                // Slice C-3 /qa: also release `prev_data`. Prior to this
3611                // collection, `reset_for_fresh_lifecycle` overwrote
3612                // `dr.prev_data = NO_HANDLE` without releasing the old
3613                // handle, leaking one share per dep per resubscribable
3614                // cycle. The leak was masked because no test exercised
3615                // the per-dep `prev_data` retain across a lifecycle
3616                // reset; surfaced by the T1 tightening of
3617                // `last_releases_buffered_latest_on_lifecycle_reset`.
3618                if dr.prev_data != NO_HANDLE {
3619                    hs.push(dr.prev_data);
3620                }
3621            }
3622            // Take pause_state's buffer; collect its payload handles for
3623            // release (they were retained at queue_notify time; buffer
3624            // drops because the new subscriber starts fresh).
3625            let mut pulled = Vec::new();
3626            if let PauseState::Paused { ref mut buffer, .. } = rec.pause_state {
3627                for msg in buffer.drain(..) {
3628                    if let Some(h) = msg.payload_handle() {
3629                        pulled.push(h);
3630                    }
3631                }
3632            }
3633            // Slice E1: drain the replay buffer too — the new subscriber
3634            // gets a fresh lifecycle and shouldn't see prior emissions.
3635            for h in rec.replay_buffer.drain(..) {
3636                pulled.push(h);
3637            }
3638            // Reset wave / lifecycle state.
3639            rec.terminal = None;
3640            rec.has_fired_once = rec.cache != NO_HANDLE && rec.is_state();
3641            rec.has_received_teardown = false;
3642            for dr in &mut rec.dep_records {
3643                dr.prev_data = NO_HANDLE;
3644                dr.data_batch.clear();
3645                dr.terminal = None;
3646                dr.dirty = false;
3647                dr.involved_this_wave = false;
3648            }
3649            rec.pause_state = PauseState::Active;
3650            rec.involved_this_wave = false;
3651            rec.dirty = false;
3652            // §10.13 perf (D047): reset received_mask — fresh lifecycle
3653            // means all deps are sentinel again.
3654            rec.received_mask = 0;
3655            // §10.3 perf (Slice V1): reset involved_mask.
3656            rec.involved_mask = 0;
3657            // P7 (Slice A close /qa): Dynamic nodes clear `tracked` so
3658            // the post-reset first fire repopulates from the fn's
3659            // returned tracked-deps set.
3660            if rec.is_dynamic {
3661                rec.tracked.clear();
3662            }
3663            // Take the old scratch out so we can release its handles and
3664            // install a fresh one. Operator op is copied for the
3665            // rebuild step below.
3666            let prev_op = rec.op;
3667            let old = std::mem::take(&mut rec.op_scratch);
3668            (prev_op, old, hs, pulled)
3669        };
3670
3671        // Phase 2 (Slice C-3 /qa P1 — RETAIN-BEFORE-RELEASE ordering):
3672        // build the fresh scratch FIRST, taking new retains on any
3673        // seed/default handles. This must run BEFORE Phase 3 releases
3674        // the old scratch's shares — if old `acc` (Scan/Reduce) or old
3675        // `latest` (Last) aliases the new `seed`/`default` (common:
3676        // `fold(seed, x) == seed` interns to the same registry entry),
3677        // releasing the old share first could collapse the binding's
3678        // registry slot to zero (production bindings remove the value
3679        // entry on refcount-zero — see `tests/common/mod.rs:191-204`),
3680        // and a subsequent `retain_handle` on the new seed would bump a
3681        // refcount on a slot whose value has been removed. By taking
3682        // the new retains first, we floor the refcount at ≥1 before
3683        // any release happens.
3684        let new_scratch = match prev_op {
3685            // Slice H: the OperatorOp stored on NodeRecord previously
3686            // passed `make_op_scratch` validation at registration time
3687            // (RegisterError::OperatorSeedSentinel for Scan/Reduce
3688            // sentinel seeds; Last { default: NO_HANDLE } is accepted
3689            // and never errors). Re-running it here on the same op
3690            // value is structurally guaranteed to succeed.
3691            Some(op) => self
3692                .make_op_scratch(op)
3693                .expect("invariant: stored OperatorOp passed make_op_scratch validation at registration time"),
3694            None => None,
3695        };
3696
3697        // Phase 3: NOW release handles owned by the old op_scratch
3698        // (Scan/Reduce acc, Distinct/Pairwise prev, Last latest +
3699        // default). Safe per Phase 2's retain-first floor. The boxed
3700        // value is consumed and dropped after.
3701        if let Some(scratch) = old_scratch.as_mut() {
3702            scratch.release_handles(&*self.binding);
3703        }
3704        drop(old_scratch);
3705
3706        // Phase 3b (D-α, D028 full close, 2026-05-10): drain the
3707        // per-Core `pending_scratch_release` queue populated by Phase G
3708        // on prior resubscribable + non-terminal deactivate cycles.
3709        // Queued boxes' shares may alias the seed/default we just
3710        // retained in Phase 2 (e.g., when scan's `acc` interns to the
3711        // same registry slot as `seed`). Releasing AFTER Phase 2's
3712        // floor keeps the registry slot at refcount ≥1 throughout —
3713        // mirrors the Phase 2/3 retain-before-release ordering. Safe
3714        // even when the queue is empty (no prior deactivations
3715        // happened).
3716        let queued: Vec<Box<dyn crate::op_state::OperatorScratch>> =
3717            std::mem::take(&mut s.shared.pending_scratch_release);
3718        for mut scratch in queued {
3719            scratch.release_handles(&*self.binding);
3720        }
3721
3722        // Phase 4: install the fresh scratch.
3723        {
3724            let rec = s.require_node_mut(node_id);
3725            rec.op_scratch = new_scratch;
3726        }
3727
3728        // Phase 5: release wave-state handles collected in phase 1.
3729        for h in handles_to_release {
3730            self.binding.release_handle(h);
3731        }
3732        for h in pause_buffer_payloads {
3733            self.binding.release_handle(h);
3734        }
3735    }
3736
3737    /// Activate `root` and any transitive uncached compute deps so their
3738    /// caches fill our dep_handles slots.
3739    ///
3740    /// Slice A close (M1): pure dep-walk + dep_handles population +
3741    /// pending_fires queueing. No `in_tick` management or `drain_and_flush`
3742    /// call — the outer caller (typically `Core::subscribe` via
3743    /// [`Core::run_wave`]) owns the wave lifecycle and drains lock-released
3744    /// around `invoke_fn`.
3745    ///
3746    /// Walk shape:
3747    ///   1. **Discover phase (DFS via Vec stack):** starting at `root`,
3748    ///      walk transitively-needing-activation deps via the `deps`
3749    ///      chain. Build an ordering where each node appears AFTER all
3750    ///      of its uncached compute deps — i.e., reverse topological
3751    ///      among the visited subgraph.
3752    ///   2. **Deliver phase (forward iteration):** for each visited
3753    ///      node in dep-first order, push deps' caches into the node's
3754    ///      `dep_handles` slots. Caches that were sentinel pre-walk are
3755    ///      filled because their parent's fn fires later in the wave's
3756    ///      drain loop and `commit_emission` propagates new caches forward
3757    ///      via `deliver_data_to_consumer` — the same path this method
3758    ///      uses for the initial seed. Adds the node to `pending_fires`
3759    ///      if its tracked-deps gate is satisfied; the wave-engine drain
3760    ///      fires the fn lock-released around `invoke_fn`.
3761    pub(crate) fn activate_derived(&self, s: &mut CoreState, root: NodeId) {
3762        // Phase 1: discover. DFS to collect every compute node reachable
3763        // via deps that doesn't yet have a cache and hasn't fired.
3764        // Record them in dep-first (post-order) so phase 2 can deliver
3765        // caches forward. Frame is `(node_id, finalize)` — `finalize=false`
3766        // means "first visit: push deps then re-push self with finalize=true";
3767        // `finalize=true` means "deps have been expanded, append self to
3768        // `order`."
3769        let mut visited: HashSet<NodeId> = HashSet::new();
3770        let mut order: Vec<NodeId> = Vec::new();
3771        let mut stack: Vec<(NodeId, bool)> = vec![(root, false)];
3772        while let Some((id, finalize)) = stack.pop() {
3773            if finalize {
3774                order.push(id);
3775                continue;
3776            }
3777            if !visited.insert(id) {
3778                continue;
3779            }
3780            stack.push((id, true));
3781            let dep_ids: Vec<NodeId> = s.require_node(id).dep_ids_vec();
3782            for dep_id in dep_ids {
3783                let (dep_is_state, dep_cache, dep_has_fired) = {
3784                    let dep_rec = s.require_node(dep_id);
3785                    (dep_rec.is_state(), dep_rec.cache, dep_rec.has_fired_once)
3786                };
3787                if !dep_is_state
3788                    && dep_cache == NO_HANDLE
3789                    && !dep_has_fired
3790                    && !visited.contains(&dep_id)
3791                {
3792                    stack.push((dep_id, false));
3793                }
3794            }
3795        }
3796
3797        // Phase 2: deliver caches in dep-first order. For each node, walk
3798        // its deps and call `deliver_data_to_consumer` for any with caches.
3799        // Producer nodes (no deps + has fn — Slice D, D031) have no deps
3800        // to walk; queue them directly into `pending_fires` so the wave
3801        // engine fires their fn once on activation.
3802        //
3803        // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_fires lives on
3804        // per-thread WaveState. State lock + thread_local borrow are
3805        // independent; deliver_data_to_consumer also writes pending_fires
3806        // via WaveState (no nested with_wave_state borrows here).
3807        for &id in &order {
3808            let (dep_ids, is_producer) = {
3809                let rec = s.require_node(id);
3810                (rec.dep_ids_vec(), rec.is_producer())
3811            };
3812            if is_producer {
3813                crate::batch::with_wave_state(|ws| {
3814                    ws.pending_fires.insert(id);
3815                });
3816                continue;
3817            }
3818            for (i, dep_id) in dep_ids.iter().copied().enumerate() {
3819                let dep_cache = s.require_node(dep_id).cache;
3820                if dep_cache != NO_HANDLE {
3821                    self.deliver_data_to_consumer(s, id, i, dep_cache);
3822                }
3823            }
3824        }
3825    }
3826
3827    // -------------------------------------------------------------------
3828    // Emission entry point
3829    // -------------------------------------------------------------------
3830
3831    /// Set a state node's value. Triggers a wave (DIRTY → DATA/RESOLVED →
3832    /// fn fires for downstream).
3833    ///
3834    /// Silent no-op if the node has already terminated (R1.3.4). The handle
3835    /// passed in is still released by the caller's binding-side intern path
3836    /// — no implicit retain is consumed when the call short-circuits.
3837    ///
3838    /// # Panics
3839    ///
3840    /// Panics if `node_id` is not a state node, or if `new_handle` is
3841    /// [`NO_HANDLE`] (per R1.2.4, sentinel is not a valid DATA payload).
3842    pub fn emit(&self, node_id: NodeId, new_handle: HandleId) {
3843        match self.try_emit(node_id, new_handle) {
3844            Ok(()) => {}
3845            Err(e) => panic!("{e}"),
3846        }
3847    }
3848
3849    /// Fallible emit. Returns `Err` on partition order violation
3850    /// (Phase H+ STRICT, D115). The public `emit` calls this and
3851    /// unwraps; `emit_or_defer` calls this and defers on Err.
3852    pub(crate) fn try_emit(
3853        &self,
3854        node_id: NodeId,
3855        new_handle: HandleId,
3856    ) -> Result<(), PartitionOrderViolation> {
3857        // Step 2b-ii-B (D220-EXEC): route this whole entry point to the
3858        // node's shard. Single-node public entry points do a pre-wave
3859        // `lock_state().require_node(node)` BEFORE `try_run_wave_for`
3860        // sets the ambient — a grouped node's record is in shard `g`,
3861        // not `DEFAULT_SHARD`, so without this the validation panics
3862        // `unknown node`. Held for the whole fn; the nested
3863        // `try_run_wave_for(node_id)` re-derives the same key
3864        // (no-op restore). `None`/all-`None` ⇒ behaviour-identical.
3865        assert!(
3866            new_handle != NO_HANDLE,
3867            "NO_HANDLE is not a valid DATA payload (R1.2.4)"
3868        );
3869        // Validate + terminal short-circuit under a brief lock.
3870        //
3871        // emit() is valid for State and Producer nodes — both are
3872        // intrinsic sources whose values are not derived from declared
3873        // deps. State nodes get emit() from user code; Producer nodes
3874        // get emit() from sink callbacks the producer's build closure
3875        // registered (sink fires → re-enter Core → emit on self).
3876        // Derived / Dynamic / Operator nodes emit via their fn return
3877        // value through fire_fn / fire_operator, NOT via emit().
3878        //
3879        // §7-A (deferred): this standalone validation lock is redundant
3880        // with commit_emission Phase 1 and collapsing it is part of §7
3881        // item (1); §5b measured ~0% single-thread benefit so the
3882        // collapse is deferred (porting-deferred.md §7-A, flagged for
3883        // ratification). The normal-path validation is retained here.
3884        {
3885            let s = self.lock_state();
3886            let rec = s.require_node(node_id);
3887            assert!(
3888                rec.is_state() || rec.is_producer(),
3889                "emit() is for state or producer nodes only; \
3890                 derived/dynamic/operator emit via their fn return value"
3891            );
3892            if rec.terminal.is_some() {
3893                drop(s);
3894                // Caller's intern share would otherwise leak; cache slot
3895                // ownership doesn't transfer because we're not advancing
3896                // cache. Released lock-released so the binding can't
3897                // deadlock against an internal binding mutex.
3898                self.binding.release_handle(new_handle);
3899                return Ok(());
3900            }
3901        }
3902        // Run the wave for `node_id`. Slice Y1 / Phase E: emit cascades
3903        // only via `s.children`. S4/D248: single-owner `!Send` Core —
3904        // one uninterrupted owner-side drain (no `wave_owner` mutex,
3905        // no cross-thread parallel waves within a Core); cross-`Core`
3906        // parallelism is host-native via independent per-worker Cores.
3907        self.try_run_wave_for(node_id, |this| {
3908            this.commit_emission(node_id, new_handle);
3909        })?;
3910        Ok(())
3911    }
3912
3913    /// Emit or defer to wave-end on partition order violation.
3914    /// For producer-pattern operator sinks. Retains `handle` on defer;
3915    /// the drain releases it after firing (or on discard).
3916    pub fn emit_or_defer(&self, node_id: NodeId, new_handle: HandleId) {
3917        if self.try_emit(node_id, new_handle).is_err() {
3918            self.binding.retain_handle(new_handle);
3919            self.push_deferred_producer_op(DeferredProducerOp::Emit {
3920                node_id,
3921                handle: new_handle,
3922            });
3923        }
3924    }
3925
3926    /// Read a node's current cache. Returns [`NO_HANDLE`] if sentinel.
3927    #[must_use]
3928    pub fn cache_of(&self, node_id: NodeId) -> HandleId {
3929        self.lock_state().require_node(node_id).cache
3930    }
3931
3932    /// Whether the node's fn has fired at least once (compute) OR it has had
3933    /// a non-sentinel value (state).
3934    #[must_use]
3935    pub fn has_fired_once(&self, node_id: NodeId) -> bool {
3936        self.lock_state().require_node(node_id).has_fired_once
3937    }
3938
3939    // -------------------------------------------------------------------
3940    // Read-side inspection helpers (Slice E+, M2)
3941    //
3942    // Non-panicking accessors for graph-layer introspection (`describe()`,
3943    // `observe()`, `node_count()`). All five return Option/empty for
3944    // unknown ids — they're meant to back walks over `node_ids()` where
3945    // the caller already knows the ids are valid, plus debugging /
3946    // dry-run probes that prefer "absence" over "panic".
3947    //
3948    // Keep these strictly read-only: no wave entry, no binding callbacks,
3949    // no lock release. Each takes the state lock once, copies a small
3950    // value, and drops the lock.
3951    // -------------------------------------------------------------------
3952
3953    /// Snapshot of every registered `NodeId` in unspecified order. The
3954    /// order matches `HashMap` iteration over the internal node table —
3955    /// callers that need stable ordering should track names at the
3956    /// `Graph` layer (canonical spec §3.5 namespace).
3957    #[must_use]
3958    pub fn node_ids(&self) -> Vec<NodeId> {
3959        self.lock_state().nodes.keys().copied().collect()
3960    }
3961
3962    /// Total number of nodes registered in this Core.
3963    #[must_use]
3964    pub fn node_count(&self) -> usize {
3965        self.lock_state().nodes.len()
3966    }
3967
3968    /// Returns `Some(kind)` for known nodes, `None` for unknown. Kind is
3969    /// **derived** from the field shape per D030 — see [`NodeKind`].
3970    #[must_use]
3971    pub fn kind_of(&self, node_id: NodeId) -> Option<NodeKind> {
3972        self.lock_state().nodes.get(&node_id).map(NodeRecord::kind)
3973    }
3974
3975    /// Snapshot of the node's deps in declaration order. Empty for
3976    /// unknown nodes or for state nodes (which have no deps).
3977    #[must_use]
3978    pub fn deps_of(&self, node_id: NodeId) -> Vec<NodeId> {
3979        self.lock_state()
3980            .nodes
3981            .get(&node_id)
3982            .map(NodeRecord::dep_ids_vec)
3983            .unwrap_or_default()
3984    }
3985
3986    /// Returns `Some(kind)` if the node has terminated (R1.3.4) — the
3987    /// pair `Some(Complete)` / `Some(Error(h))` mirrors the wire message
3988    /// the node emitted. `None` for live nodes or unknown ids.
3989    #[must_use]
3990    pub fn is_terminal(&self, node_id: NodeId) -> Option<TerminalKind> {
3991        self.lock_state()
3992            .nodes
3993            .get(&node_id)
3994            .and_then(|r| r.terminal)
3995    }
3996
3997    /// Whether the node has wave-scoped DIRTY pending (a tier-1 message
3998    /// queued but the matching tier-3 settle has not yet flushed).
3999    /// `false` for unknown ids. Mostly useful for `describe()` status
4000    /// classification (R3.6.1 `"dirty"`).
4001    #[must_use]
4002    pub fn is_dirty(&self, node_id: NodeId) -> bool {
4003        self.lock_state()
4004            .nodes
4005            .get(&node_id)
4006            .is_some_and(|r| r.dirty)
4007    }
4008
4009    /// Snapshot of `parent`'s meta companion list (R1.3.9.d / R2.3.3 —
4010    /// the companions added via [`Self::add_meta_companion`]). Empty
4011    /// for unknown ids or for nodes with no companions registered.
4012    ///
4013    /// Used by the graph layer's `signal_invalidate` to filter meta
4014    /// children out of the broadcast (canonical R3.7.2 — meta caches
4015    /// are preserved across graph-wide INVALIDATE).
4016    #[must_use]
4017    pub fn meta_companions_of(&self, parent: NodeId) -> Vec<NodeId> {
4018        self.lock_state()
4019            .nodes
4020            .get(&parent)
4021            .map(|r| r.meta_companions.clone())
4022            .unwrap_or_default()
4023    }
4024
4025    // -------------------------------------------------------------------
4026    // Wave engine — lives in `crate::batch` (Slice C-1 module split;
4027    // Slice A close M1 refactor lifted the binding-callback re-entrance
4028    // restrictions). The methods are still on `Core`; see `batch.rs` for:
4029    //
4030    //   - `run_wave` — wave entry, manages own locking.
4031    //   - `drain_and_flush` — drain phase, lock-released around invoke_fn.
4032    //   - `commit_emission` — lock-released around custom_equals.
4033    //   - `pick_next_fire`, `deliver_data_to_consumer`, `queue_notify`,
4034    //     `flush_notifications` — wave-engine helpers.
4035    // -------------------------------------------------------------------
4036}
4037
4038// -----------------------------------------------------------------------
4039// COMPLETE / ERROR — terminal lifecycle + auto-cascade gating
4040// -----------------------------------------------------------------------
4041
4042impl Core {
4043    /// Emit `[COMPLETE]` (R1.3.4) on `node_id`, marking it terminal. After
4044    /// this call:
4045    ///
4046    /// - Subsequent `Core::emit` on this node is a silent no-op (idempotent
4047    ///   termination).
4048    /// - The node's fn no longer fires.
4049    /// - The node's cache is preserved (last value still observable via
4050    ///   `cache_of`).
4051    /// - Children receive `[COMPLETE]` (tier 5 — bypasses pause buffer).
4052    /// - Auto-cascade gating (Lock 2.B): each child that has all of its
4053    ///   deps in a terminal state auto-emits its own `[COMPLETE]`. ERROR
4054    ///   dominates COMPLETE — if any of a child's deps emitted ERROR, the
4055    ///   child auto-cascades that ERROR instead.
4056    ///
4057    /// Idempotent: calling `complete` on an already-terminal node is a no-op.
4058    ///
4059    /// # Panics
4060    ///
4061    /// Panics if `node_id` is unknown.
4062    pub fn complete(&self, node_id: NodeId) {
4063        match self.try_complete(node_id) {
4064            Ok(()) => {}
4065            Err(e) => panic!("{e}"),
4066        }
4067    }
4068
4069    /// Fallible complete. Returns `Err` on partition order violation.
4070    pub(crate) fn try_complete(&self, node_id: NodeId) -> Result<(), PartitionOrderViolation> {
4071        self.try_emit_terminal(node_id, TerminalKind::Complete)
4072    }
4073
4074    /// Complete or defer to wave-end on partition order violation.
4075    /// For producer-pattern operator sinks.
4076    pub fn complete_or_defer(&self, node_id: NodeId) {
4077        match self.try_complete(node_id) {
4078            Ok(()) => {}
4079            Err(_) => {
4080                self.push_deferred_producer_op(DeferredProducerOp::Complete { node_id });
4081            }
4082        }
4083    }
4084
4085    /// Emit `[ERROR, error_handle]` (R1.3.4) on `node_id`. `error_handle`
4086    /// must resolve to a non-sentinel value (R1.2.5) — the binding side has
4087    /// already interned the error value before this call. Same lifecycle
4088    /// effects as [`Self::complete`]; ERROR dominates COMPLETE in auto-
4089    /// cascade gating.
4090    ///
4091    /// # Panics
4092    ///
4093    /// Panics if `node_id` is unknown or `error_handle == NO_HANDLE`.
4094    pub fn error(&self, node_id: NodeId, error_handle: HandleId) {
4095        match self.try_error(node_id, error_handle) {
4096            Ok(()) => {}
4097            Err(e) => panic!("{e}"),
4098        }
4099    }
4100
4101    /// Fallible error. Returns `Err` on partition order violation.
4102    pub(crate) fn try_error(
4103        &self,
4104        node_id: NodeId,
4105        error_handle: HandleId,
4106    ) -> Result<(), PartitionOrderViolation> {
4107        assert!(
4108            error_handle != NO_HANDLE,
4109            "NO_HANDLE is not a valid ERROR payload (R1.2.5)"
4110        );
4111        self.try_emit_terminal(node_id, TerminalKind::Error(error_handle))?;
4112        // The caller's intern share for `error_handle` is NOT transferred
4113        // to any slot — `terminate_node` takes its OWN retain for every
4114        // populated `terminal` and `dep_terminals` slot. Release the
4115        // caller's share here (mirrors `Core::emit`'s short-circuit
4116        // release on terminal). Without this, every `error()` call leaks
4117        // one binding-side handle ref. Slice A-bigger /qa item D fix.
4118        self.binding.release_handle(error_handle);
4119        Ok(())
4120    }
4121
4122    /// Error or defer to wave-end on partition order violation.
4123    /// For producer-pattern operator sinks. Retains `handle` on defer;
4124    /// the drain releases it after firing (or on discard).
4125    pub fn error_or_defer(&self, node_id: NodeId, error_handle: HandleId) {
4126        if self.try_error(node_id, error_handle).is_err() {
4127            self.binding.retain_handle(error_handle);
4128            self.push_deferred_producer_op(DeferredProducerOp::Error {
4129                node_id,
4130                handle: error_handle,
4131            });
4132        }
4133    }
4134
4135    fn try_emit_terminal(
4136        &self,
4137        node_id: NodeId,
4138        terminal: TerminalKind,
4139    ) -> Result<(), PartitionOrderViolation> {
4140        {
4141            let s = self.lock_state();
4142            assert!(s.nodes.contains_key(&node_id), "unknown node {node_id:?}");
4143        }
4144        // Wave on `node_id`'s touched partitions (Slice Y1 / Phase E).
4145        // COMPLETE / ERROR cascade follows `s.children` (in-partition by
4146        // union-find construction). The thunk acquires its own state lock
4147        // to queue the cascade.
4148        self.try_run_wave_for(node_id, |this| {
4149            let mut s = this.lock_state();
4150            this.terminate_node(&mut s, node_id, terminal);
4151        })
4152    }
4153
4154    /// Set the node's terminal slot, queue the wire message, and cascade to
4155    /// children. Idempotent on already-terminal node (no-op).
4156    ///
4157    /// Iterative implementation (Slice A-bigger, M1-close): a work-queue
4158    /// drives the cascade so deep linear chains don't overflow the OS
4159    /// thread stack. Mirrors `path_from_to`'s explicit-stack pattern.
4160    fn terminate_node(&self, s: &mut St<'_>, node_id: NodeId, terminal: TerminalKind) {
4161        let mut work: Vec<(NodeId, TerminalKind)> = vec![(node_id, terminal)];
4162        while let Some((id, t)) = work.pop() {
4163            if s.require_node(id).terminal.is_some() {
4164                continue; // Idempotent — already terminal.
4165            }
4166            // Take a refcount share for the terminal slot so the error
4167            // handle outlives the binding-side intern's transient share.
4168            if let TerminalKind::Error(h) = t {
4169                self.binding.retain_handle(h);
4170            }
4171            // Slice E2 /qa Q2(b) (D069): if a resubscribable node is
4172            // terminating with no live subscribers, queue eager
4173            // `wipe_ctx` for the wave's lock-released drain. This is the
4174            // mutually-exclusive complement of the `Subscription::Drop`
4175            // wipe site: when the LAST sub drops first then terminate
4176            // fires, subs are empty here and we queue; when terminate
4177            // fires WITH subs still alive, we DON'T queue (subs not
4178            // empty), and `Subscription::Drop` will fire wipe directly
4179            // when those subs eventually drop. Either way, exactly one
4180            // wipe fires per terminal lifecycle.
4181            let queue_wipe = {
4182                let rec = s.require_node(id);
4183                rec.resubscribable && rec.subscribers.is_empty()
4184            };
4185            s.require_node_mut(id).terminal = Some(t);
4186            // Q-beyond Sub-slice 2 + 3 (D108, 2026-05-09): pending_fires
4187            // and pending_wipes both live on per-thread WaveState. Single
4188            // borrow handles the queue-wipe push and the pending_fires
4189            // remove.
4190            crate::batch::with_wave_state(|ws| {
4191                if queue_wipe {
4192                    ws.pending_wipes.push(id);
4193                }
4194                // Drain pending fires for this node — fn won't fire on a
4195                // terminal node.
4196                ws.pending_fires.remove(&id);
4197            });
4198            // R1.3.8.b / Slice F (A3, 2026-05-07): if this node was paused
4199            // when terminating (the canonical case is the R1.3.8.c overflow
4200            // ERROR synthesis path), drain the pause buffer and release
4201            // each payload's queue_notify-time retain. Without this, the
4202            // buffer leaks one share per buffered DATA/RESOLVED/INVALIDATE.
4203            // Subscribers receive the terminal directly via the cascade
4204            // below (tier-5 bypasses the pause buffer); the buffered
4205            // content is moot post-terminal.
4206            let drained: Vec<HandleId> = {
4207                let rec = s.require_node_mut(id);
4208                let mut drained: Vec<HandleId> = Vec::new();
4209                if rec.pause_state.is_paused() {
4210                    // Take the buffered messages out, then collapse the
4211                    // pause state to Active so subsequent code observes a
4212                    // clean lifecycle. Idempotent on Active (no-op).
4213                    let prev = std::mem::replace(&mut rec.pause_state, PauseState::Active);
4214                    if let PauseState::Paused { buffer, .. } = prev {
4215                        drained.extend(buffer.into_iter().filter_map(Message::payload_handle));
4216                    }
4217                }
4218                // QA A4 (2026-05-07): drain replay buffer on terminate. A
4219                // non-resubscribable terminal ends the lifecycle; without
4220                // this drain the buffer's retains leak until `Drop for
4221                // CoreState`. Resubscribable nodes' replay buffers are
4222                // also drained (when they're hit by a terminal cascade);
4223                // a fresh subscribe rebuilds the buffer from scratch as
4224                // part of `reset_for_fresh_lifecycle`.
4225                drained.extend(rec.replay_buffer.drain(..));
4226                drained
4227            };
4228            for h in drained {
4229                self.binding.release_handle(h);
4230            }
4231            // Queue the wire message (tier 5 — bypasses pause buffer).
4232            let msg = match t {
4233                TerminalKind::Complete => Message::Complete,
4234                TerminalKind::Error(h) => Message::Error(h),
4235            };
4236            self.queue_notify(s, id, msg);
4237            // Cascade to children.
4238            let child_ids: Vec<NodeId> = s
4239                .children
4240                .get(&id)
4241                .map(|c| c.iter().copied().collect())
4242                .unwrap_or_default();
4243            for child_id in child_ids {
4244                let dep_idx = s.require_node(child_id).dep_index_of(id);
4245                let Some(idx) = dep_idx else { continue };
4246                // Mark this child's per-dep terminal slot. Take a retain on
4247                // the error handle for the slot share.
4248                {
4249                    let child = s.require_node_mut(child_id);
4250                    if child.dep_records[idx].terminal.is_some() {
4251                        // Idempotent — child already saw this dep terminate.
4252                        continue;
4253                    }
4254                    child.dep_records[idx].terminal = Some(t);
4255                }
4256                if let TerminalKind::Error(h) = t {
4257                    self.binding.retain_handle(h);
4258                }
4259                // Auto-cascade gating: if all deps now terminal, push child
4260                // onto the work queue with the chosen terminal.
4261                //
4262                // Slice C-1: kinds that opt out of Lock 2.B (currently
4263                // `Operator(Reduce)`) intercept upstream COMPLETE so they
4264                // can emit their accumulator before terminating. Instead of
4265                // cascading, queue the child for fn-fire — `fire_operator`
4266                // sees `dep_records[0].terminal` set and emits the
4267                // appropriate batch (Data(acc) + Complete on COMPLETE,
4268                // Error(h) on ERROR).
4269                let action = {
4270                    let child = s.require_node(child_id);
4271                    if child.terminal.is_some() {
4272                        ChildAction::None // Already terminated — no-op.
4273                    } else if child.all_deps_terminal() {
4274                        if child.skips_auto_cascade() {
4275                            ChildAction::QueueFire
4276                        } else {
4277                            ChildAction::Cascade(pick_cascade_terminal(&child.dep_records))
4278                        }
4279                    } else {
4280                        ChildAction::None
4281                    }
4282                };
4283                match action {
4284                    ChildAction::None => {}
4285                    ChildAction::Cascade(t_child) => {
4286                        work.push((child_id, t_child));
4287                    }
4288                    ChildAction::QueueFire => {
4289                        // Q-beyond Sub-slice 2 (D108, 2026-05-09):
4290                        // pending_fires lives on per-thread WaveState.
4291                        crate::batch::with_wave_state(|ws| {
4292                            ws.pending_fires.insert(child_id);
4293                        });
4294                    }
4295                }
4296            }
4297        }
4298    }
4299}
4300
4301/// Outcome of Lock 2.B child gating in `terminate_node`'s cascade walk.
4302enum ChildAction {
4303    /// No cascade; child is already terminal or not yet all-deps-terminal.
4304    None,
4305    /// Auto-cascade with the picked terminal kind (ERROR dominates COMPLETE).
4306    Cascade(TerminalKind),
4307    /// Queue child for fn-fire instead of cascading. Used by operator
4308    /// kinds that intercept upstream terminal (Operator(Reduce)).
4309    QueueFire,
4310}
4311
4312/// Lock 2.B cascade-terminal selection: ERROR dominates COMPLETE; first
4313/// ERROR seen wins. Caller has already verified all deps are terminal.
4314fn pick_cascade_terminal(dep_records: &[DepRecord]) -> TerminalKind {
4315    for dr in dep_records {
4316        if let Some(TerminalKind::Error(h)) = dr.terminal {
4317            return TerminalKind::Error(h);
4318        }
4319    }
4320    TerminalKind::Complete
4321}
4322
4323// -----------------------------------------------------------------------
4324// TEARDOWN — destruction, with auto-COMPLETE prepend (R2.6.4 / Lock 6.F)
4325// -----------------------------------------------------------------------
4326
4327impl Core {
4328    /// Tear `node_id` down. Per R2.6.4 / Lock 6.F:
4329    ///
4330    /// - **Auto-prepend COMPLETE.** If the node has not yet emitted a
4331    ///   terminal (`COMPLETE` / `ERROR`), `terminate_node` is called with
4332    ///   `Complete` first so subscribers see `[COMPLETE, TEARDOWN]`, not
4333    ///   bare `[TEARDOWN]`. This guarantees a clean end-of-stream signal
4334    ///   to async iterators and other consumers that wait on terminal
4335    ///   delivery.
4336    /// - **Idempotent on duplicate delivery.** The per-node
4337    ///   `has_received_teardown` flag is set on the first call; subsequent
4338    ///   `teardown` calls (or cascade visits from other paths) are silent
4339    ///   no-ops — no second `[COMPLETE, TEARDOWN]` pair to subscribers.
4340    /// - **Cascade downstream.** Each child is recursively torn down. The
4341    ///   child's own COMPLETE auto-cascades from `terminate_node`'s logic
4342    ///   (Lock 2.B); its TEARDOWN comes from this cascade.
4343    ///
4344    /// # Panics
4345    ///
4346    /// Panics if `node_id` is unknown.
4347    pub fn teardown(&self, node_id: NodeId) {
4348        match self.try_teardown(node_id) {
4349            Ok(()) => {}
4350            Err(e) => panic!("{e}"),
4351        }
4352    }
4353
4354    /// Teardown or defer to wave-end on partition order violation.
4355    /// For producer-pattern operator sinks.
4356    pub fn teardown_or_defer(&self, node_id: NodeId) {
4357        match self.try_teardown(node_id) {
4358            Ok(()) => {}
4359            Err(_) => {
4360                // S2b (D223): `Core` is no longer `Clone`. The old
4361                // `DeferredProducerOp::Callback(move || core.teardown(..))`
4362                // boxed a cloned `Core` only to have
4363                // `push_deferred_producer_op` run it *immediately* (the
4364                // deferred queue is a deleted D211 no-op shim). A direct
4365                // owner-context call is behaviour-identical and drops the
4366                // pointless `Send`-closure (which now needs `C: Sync`).
4367                self.teardown(node_id);
4368            }
4369        }
4370    }
4371
4372    fn try_teardown(&self, node_id: NodeId) -> Result<(), PartitionOrderViolation> {
4373        {
4374            let s = self.lock_state();
4375            assert!(s.nodes.contains_key(&node_id), "unknown node {node_id:?}");
4376        }
4377        let torn_down: Arc<Mutex<Vec<NodeId>>> = Arc::new(Mutex::new(Vec::new()));
4378        let torn_down_for_wave = torn_down.clone();
4379        // TEARDOWN cascade follows `s.children` AND `meta_companions`
4380        // (R1.3.9.d) — meta-companions can cross partitions. Slice Y1 /
4381        // Phase E `compute_touched_partitions(node_id)` (called by
4382        // `run_wave_for`) walks both axes so the wave acquires every
4383        // partition reachable via the cascade.
4384        self.try_run_wave_for(node_id, move |this| {
4385            let mut s = this.lock_state();
4386            let collected = this.teardown_inner(&mut s, node_id);
4387            torn_down_for_wave.lock().extend(collected);
4388        })?;
4389        // Fire NodeTornDown for every cascaded id (root + metas +
4390        // downstream consumers that auto-cascaded). Outside the state
4391        // lock, matching fire_topology_event discipline.
4392        let ids = std::mem::take(&mut *torn_down.lock());
4393        for id in ids {
4394            self.fire_topology_event(&crate::topology::TopologyEvent::NodeTornDown(id));
4395        }
4396        Ok(())
4397    }
4398
4399    /// Iterative teardown walk (Slice A-bigger, M1-close).
4400    ///
4401    /// The recursive shape was:
4402    ///   ```text
4403    ///   teardown(n):
4404    ///     if torn_down: return
4405    ///     mark torn_down
4406    ///     for meta in metas: teardown(meta)
4407    ///     terminate_node + queue Teardown
4408    ///     for child in children: teardown(child)
4409    ///   ```
4410    /// Deep linear chains (~10k nodes) overflowed the OS thread stack.
4411    ///
4412    /// The iterative shape uses a `Vec<Action>` stack with `Visit` and
4413    /// `EmitTeardown` actions. `Visit(n)` marks `n` torn-down (or no-ops
4414    /// if already), then pushes (in reverse order so LIFO pops in forward
4415    /// order) `Visit(child_K), …, Visit(child_1), EmitTeardown(n),
4416    /// Visit(meta_M), …, Visit(meta_1)`. The R1.3.9.d "metas first, then
4417    /// self, then children" ordering is preserved by the push order:
4418    /// metas pop first, recursively expand and emit; then `EmitTeardown(n)`
4419    /// pops and runs `terminate_node` + queue `Teardown`; then children
4420    /// pop. Idempotency via `has_received_teardown` keeps each node
4421    /// visited at most once even when multi-parent diamonds re-enter via
4422    /// a sibling path.
4423    fn teardown_inner(&self, s: &mut St<'_>, root: NodeId) -> Vec<NodeId> {
4424        enum Action {
4425            Visit(NodeId),
4426            EmitTeardown(NodeId),
4427        }
4428        let mut stack: Vec<Action> = vec![Action::Visit(root)];
4429        // Topology accumulator: every node that actually emits TEARDOWN
4430        // (i.e. each `EmitTeardown(id)` site, NOT each `Visit` — visits
4431        // for already-torn-down nodes short-circuit on idempotency).
4432        let mut torn_down: Vec<NodeId> = Vec::new();
4433        while let Some(action) = stack.pop() {
4434            match action {
4435                Action::Visit(id) => {
4436                    if s.require_node(id).has_received_teardown {
4437                        continue; // Idempotent (R2.6.4).
4438                    }
4439                    s.require_node_mut(id).has_received_teardown = true;
4440                    // Push order: children first (pop LAST), then
4441                    // EmitTeardown(id), then metas (pop FIRST). Reverse
4442                    // each list so within-group order matches the original
4443                    // recursive iteration.
4444                    let children: Vec<NodeId> = s
4445                        .children
4446                        .get(&id)
4447                        .map(|c| c.iter().copied().collect())
4448                        .unwrap_or_default();
4449                    for &child in children.iter().rev() {
4450                        stack.push(Action::Visit(child));
4451                    }
4452                    stack.push(Action::EmitTeardown(id));
4453                    let metas: Vec<NodeId> = s.require_node(id).meta_companions.clone();
4454                    for &meta in metas.iter().rev() {
4455                        stack.push(Action::Visit(meta));
4456                    }
4457                }
4458                Action::EmitTeardown(id) => {
4459                    // Auto-prepend COMPLETE if not yet terminal. The (now
4460                    // iterative) terminate_node handles auto-cascade to
4461                    // children's own terminal slots per Lock 2.B.
4462                    let already_terminal = s.require_node(id).terminal.is_some();
4463                    if !already_terminal {
4464                        self.terminate_node(s, id, TerminalKind::Complete);
4465                    }
4466                    // Wire emission of the TEARDOWN itself (tier 6).
4467                    self.queue_notify(s, id, Message::Teardown);
4468                    torn_down.push(id);
4469                }
4470            }
4471        }
4472        torn_down
4473    }
4474
4475    /// Attach `companion` as a meta companion of `parent` per R1.3.9.d.
4476    /// Meta companions are nodes whose lifecycle is bound to the parent's
4477    /// in TEARDOWN ordering: when `parent` tears down, `companion` tears
4478    /// down first.
4479    ///
4480    /// Use this for inspection / audit / sidecar nodes that subscribe to
4481    /// parent state — without the ordering, the companion could observe
4482    /// the parent mid-destruction and emit garbage.
4483    ///
4484    /// Idempotent on duplicate registration of the same companion.
4485    ///
4486    /// # Lifecycle constraint
4487    ///
4488    /// Intended for **setup-time** wiring — call this before `parent` or
4489    /// `companion` enters a wave. Mid-wave registration (especially during
4490    /// a teardown cascade in flight) is implementation-defined: the new
4491    /// edge takes effect on the *next* wave. Adding a companion to a
4492    /// torn-down parent silently no-ops (the parent will not tear down
4493    /// again). For dynamic companion attachment with deterministic
4494    /// ordering, prefer constructing the wiring before subscribers exist.
4495    ///
4496    /// # Panics
4497    ///
4498    /// Panics if either node id is unknown, or if `parent == companion`
4499    /// (a node cannot be its own meta companion — would loop on TEARDOWN).
4500    pub fn add_meta_companion(&self, parent: NodeId, companion: NodeId) {
4501        assert!(parent != companion, "node cannot be its own meta companion");
4502        // D246/S2c: single-owner ⇒ ONE shard; the prior cross-shard
4503        // `shard_of(parent) == shard_of(companion)` assert is vacuous
4504        // and deleted. D253 (S5) further deletes the §7 group-
4505        // consistency invariant on this edge (the `SchedulingGroupId`
4506        // surface is gone until M6 re-introduces it).
4507        let mut s = self.lock_state();
4508        assert!(s.nodes.contains_key(&parent), "unknown parent {parent:?}");
4509        assert!(
4510            s.nodes.contains_key(&companion),
4511            "unknown companion {companion:?}"
4512        );
4513        let metas = &mut s.require_node_mut(parent).meta_companions;
4514        if metas.contains(&companion) {
4515            return;
4516        }
4517        metas.push(companion);
4518    }
4519}
4520
4521// -----------------------------------------------------------------------
4522// INVALIDATE — cache clear + downstream cascade
4523// -----------------------------------------------------------------------
4524
4525impl Core {
4526    /// Clear `node_id`'s cache and cascade `[INVALIDATE]` to downstream
4527    /// dependents per canonical spec §1.4.
4528    ///
4529    /// Semantics:
4530    /// - **Never-populated case (R1.4 line 197):** if `cache == NO_HANDLE`,
4531    ///   the call is a no-op — no cache to clear, no INVALIDATE emitted.
4532    ///   This naturally provides idempotency within a wave: once a node has
4533    ///   been invalidated this wave (cache = NO_HANDLE), a second invalidate
4534    ///   on the same node does nothing.
4535    /// - **Cache clear (immediate):** the node's cached handle is dropped
4536    ///   (refcount released), `cache` becomes `NO_HANDLE`. State nodes
4537    ///   keep `has_fired_once` per spec — INVALIDATE is not a re-gating
4538    ///   event (the next emission to a previously-fired state still does
4539    ///   not re-trigger the first-run gate; that's a resubscribable-terminal
4540    ///   lifecycle concern, separate slice).
4541    /// - **Wire emission (tier 4):** `[INVALIDATE]` is queued via the
4542    ///   normal pause-aware notify path. Buffers while paused, flushes
4543    ///   immediately otherwise.
4544    /// - **Downstream cascade:** for each child of this node, the child's
4545    ///   `dep_handles[idx_of_node]` is reset to `NO_HANDLE` (its previous
4546    ///   value referenced a now-released handle). The child is then
4547    ///   recursively invalidated (no-op if its cache was already
4548    ///   `NO_HANDLE`). This re-closes the child's first-run gate — fn
4549    ///   won't fire again until the upstream re-emits a value.
4550    ///
4551    /// Wraps in a fresh wave when called from outside a wave, so
4552    /// notifications flush at the natural wave boundary.
4553    ///
4554    /// # Panics
4555    ///
4556    /// Panics if `node_id` is unknown, consistent with `emit` / `pause`.
4557    pub fn invalidate(&self, node_id: NodeId) {
4558        {
4559            let s = self.lock_state();
4560            assert!(s.nodes.contains_key(&node_id), "unknown node {node_id:?}");
4561        }
4562        // INVALIDATE cascade follows `s.children` (in-partition by union-
4563        // find construction). Slice Y1 / Phase E.
4564        self.run_wave_for(node_id, |this| {
4565            let mut s = this.lock_state();
4566            this.invalidate_inner(&mut s, node_id);
4567        });
4568    }
4569
4570    /// Invalidate or defer to wave-end on partition order violation.
4571    /// For producer-pattern operator sinks.
4572    ///
4573    /// # Panics
4574    ///
4575    /// Panics if `node_id` is not registered in this Core.
4576    pub fn invalidate_or_defer(&self, node_id: NodeId) {
4577        {
4578            let s = self.lock_state();
4579            assert!(s.nodes.contains_key(&node_id), "unknown node {node_id:?}");
4580        }
4581        let result = self.try_run_wave_for(node_id, |this| {
4582            let mut s = this.lock_state();
4583            this.invalidate_inner(&mut s, node_id);
4584        });
4585        if result.is_err() {
4586            // S2b (D223): direct owner-context call — see the identical
4587            // rationale in `teardown_or_defer` (immediate-run no-op shim;
4588            // no `Clone`, no pointless `Send` closure needing `C: Sync`).
4589            self.invalidate(node_id);
4590        }
4591    }
4592
4593    /// Iterative invalidate cascade (Slice A-bigger, M1-close).
4594    ///
4595    /// The recursive shape was a depth-first cache-clear walk:
4596    ///   ```text
4597    ///   invalidate(n):
4598    ///     if cache(n) == NO_HANDLE: return  // already-invalidated guard
4599    ///     cache(n) = NO_HANDLE; release handle
4600    ///     queue Invalidate(n)
4601    ///     for child in children:
4602    ///       child.dep_handles[idx] = NO_HANDLE
4603    ///       invalidate(child)
4604    ///   ```
4605    /// Deep linear chains overflowed the OS thread stack. The work-queue
4606    /// rewrite has no ordering subtleties (unlike teardown's R1.3.9.d
4607    /// metas-first constraint) — Invalidate is a tier-4 broadcast where
4608    /// the never-populated / already-invalidated guard provides natural
4609    /// idempotency for diamond fan-in.
4610    fn invalidate_inner(&self, s: &mut St<'_>, root: NodeId) {
4611        let mut work: Vec<NodeId> = vec![root];
4612        while let Some(node_id) = work.pop() {
4613            // Never-populated / already-invalidated: no-op (R1.4 idempotency).
4614            // Per R1.3.9.c never-populated case, OnInvalidate cleanup hook
4615            // also does NOT fire — natural fallout of skipping via the
4616            // cache==NO_HANDLE guard (we never reach the queue-push below).
4617            let old_handle = s.require_node(node_id).cache;
4618            if old_handle == NO_HANDLE {
4619                continue;
4620            }
4621            // Clear cache + release the handle's slot ownership.
4622            s.require_node_mut(node_id).cache = NO_HANDLE;
4623            self.binding.release_handle(old_handle);
4624            // Slice E2 (R1.3.9.b strict per D057 + D058 fire-at-cache-clear):
4625            // queue OnInvalidate cleanup hook for lock-released drain at
4626            // wave-end. The dedup set guarantees at-most-once-per-wave-per-
4627            // node firing even if a node re-populates mid-wave (via fn-fire
4628            // emit) and gets re-invalidated through a separate path. Pure
4629            // cache==NO_HANDLE idempotency (above) catches "still at
4630            // sentinel" only; the explicit set is the strict R1.3.9.b
4631            // reading.
4632            // Q-beyond Sub-slice 3 (D108, 2026-05-09):
4633            // `invalidate_hooks_fired_this_wave` and
4634            // `deferred_cleanup_hooks` both live on per-thread WaveState.
4635            // Single borrow handles the dedup-insert and (on first
4636            // insertion) the cleanup-hook push.
4637            crate::batch::with_wave_state(|ws| {
4638                if ws.invalidate_hooks_fired_this_wave.insert(node_id) {
4639                    ws.deferred_cleanup_hooks
4640                        .push((node_id, CleanupTrigger::OnInvalidate));
4641                }
4642            });
4643            // Wire emission. Pause-aware via queue_notify.
4644            self.queue_notify(s, node_id, Message::Invalidate);
4645            // Cascade: for each child, clear the dep record's prev_data
4646            // referencing this node and push child onto the work queue.
4647            let child_ids: Vec<NodeId> = s
4648                .children
4649                .get(&node_id)
4650                .map(|c| c.iter().copied().collect())
4651                .unwrap_or_default();
4652            for child_id in child_ids {
4653                let dep_idx = s.require_node(child_id).dep_index_of(node_id);
4654                if let Some(idx) = dep_idx {
4655                    // Reset the child's dep record — the handle was just
4656                    // released. Subsequent first-run-gate checks see
4657                    // sentinel and re-close.
4658                    //
4659                    // Snapshot prev_data + data_batch retains for deferred
4660                    // release, then clear the record. Two-phase to satisfy
4661                    // the borrow checker (nodes + deferred_handle_releases
4662                    // are separate CoreState fields).
4663                    let (old_prev, batch_hs): (HandleId, SmallVec<[HandleId; 1]>) = {
4664                        let dr = &s.require_node(child_id).dep_records[idx];
4665                        (dr.prev_data, dr.data_batch.clone())
4666                    };
4667                    {
4668                        // Q-beyond Sub-slice 1 (D108, 2026-05-09):
4669                        // deferred_handle_releases moved to per-thread
4670                        // WaveState thread_local. State lock held; the
4671                        // thread_local borrow is independent.
4672                        crate::batch::with_wave_state(|ws| {
4673                            if old_prev != NO_HANDLE {
4674                                ws.deferred_handle_releases.push(old_prev);
4675                            }
4676                            for h in batch_hs {
4677                                ws.deferred_handle_releases.push(h);
4678                            }
4679                        });
4680                    }
4681                    let child_rec = s.require_node_mut(child_id);
4682                    child_rec.dep_records[idx].prev_data = NO_HANDLE;
4683                    child_rec.dep_records[idx].data_batch.clear();
4684                    // §10.13 perf (D047): clear received_mask bit so
4685                    // has_sentinel_deps() re-closes the first-run gate.
4686                    if idx < 64 {
4687                        child_rec.received_mask &= !(1u64 << idx);
4688                    }
4689                    work.push(child_id);
4690                }
4691            }
4692        }
4693    }
4694}
4695
4696// -----------------------------------------------------------------------
4697// PAUSE / RESUME — multi-pauser lockset + replay buffer
4698// -----------------------------------------------------------------------
4699
4700/// Reported back from [`Core::resume`] when the final lock releases.
4701///
4702/// `replayed` is the number of tier-3/tier-4 messages dispatched to
4703/// subscribers as part of the drain. `dropped` is the number of messages
4704/// that fell out the front of the buffer due to the Core-global
4705/// `pause_buffer_cap` while this pause cycle was active. A non-zero
4706/// `dropped` indicates a controller held the lock long enough to overflow
4707/// the cap; the binding may want to surface a warning or error.
4708#[derive(Copy, Clone, Debug, PartialEq, Eq)]
4709pub struct ResumeReport {
4710    pub replayed: u32,
4711    pub dropped: u32,
4712}
4713
4714impl Core {
4715    /// Acquire a pause lock on `node_id`. The first lock transitions the
4716    /// node from `Active` to `Paused`; further locks add to the lockset.
4717    /// While paused, tier-3 (DATA/RESOLVED) and tier-4 (INVALIDATE) outgoing
4718    /// messages buffer in the node's pause buffer; other tiers flush
4719    /// immediately.
4720    ///
4721    /// Re-acquiring the same `lock_id` is an idempotent no-op (matches TS
4722    /// convention, R1.2.6 silent on the case).
4723    pub fn pause(&self, node_id: NodeId, lock_id: LockId) -> Result<(), PauseError> {
4724        let mut s = self.lock_state();
4725        let rec = s
4726            .nodes
4727            .get_mut(&node_id)
4728            .ok_or(PauseError::UnknownNode(node_id))?;
4729        // QA A5 (2026-05-07): terminated nodes can't be re-paused. Without
4730        // this check, a stale pause-controller calling pause() on an
4731        // already-terminated node would re-arm `pause_state` to Paused.
4732        // The terminate_node path collapses pause_state → Active and
4733        // drains the buffer (A3-related), but doesn't gate subsequent
4734        // pause() calls. Treat as idempotent no-op (consistent with how
4735        // emit/complete/error early-return on terminal).
4736        if rec.terminal.is_some() {
4737            return Ok(());
4738        }
4739        // Slice F audit close (2026-05-07): `PausableMode::Off` means the
4740        // dispatcher ignores PAUSE for this node — tier-3 flushes
4741        // immediately, fn fires immediately. Treat the call as a successful
4742        // no-op so callers don't need to special-case.
4743        if rec.pausable == PausableMode::Off {
4744            return Ok(());
4745        }
4746        rec.pause_state.add_lock(lock_id);
4747        Ok(())
4748    }
4749
4750    /// Release a pause lock on `node_id`. If the lockset becomes empty, the
4751    /// node transitions back to `Active` and the buffered messages are
4752    /// dispatched to subscribers in arrival order. Returns a [`ResumeReport`]
4753    /// when the final lock released; `None` if the lockset is still
4754    /// non-empty (further locks held).
4755    ///
4756    /// Releasing an unknown `lock_id` (or releasing on an already-Active
4757    /// node) is an idempotent no-op returning `None`.
4758    pub fn resume(
4759        &self,
4760        node_id: NodeId,
4761        lock_id: LockId,
4762    ) -> Result<Option<ResumeReport>, PauseError> {
4763        // Phase 1 (lock-held): collect drained buffer + pending-wave flag +
4764        // sink Arcs. For default-mode nodes whose `pending_wave` was set
4765        // during pause, schedule a single fn-fire by adding to
4766        // `pending_fires` BEFORE we exit the lock — the wave engine picks
4767        // it up on the next drain tick.
4768        let (sinks, messages, dropped, pending_wave_for_default) = {
4769            let mut s = self.lock_state();
4770            let rec = s
4771                .nodes
4772                .get_mut(&node_id)
4773                .ok_or(PauseError::UnknownNode(node_id))?;
4774            // For Off mode, pause/resume are no-ops by construction.
4775            if rec.pausable == PausableMode::Off {
4776                return Ok(None);
4777            }
4778            let was_default_mode = rec.pausable == PausableMode::Default;
4779            // Capture pending_wave BEFORE remove_lock collapses the state.
4780            let pending_wave = if was_default_mode {
4781                rec.pause_state.take_pending_wave()
4782            } else {
4783                false
4784            };
4785            let Some((buffer, dropped)) = rec.pause_state.remove_lock(lock_id) else {
4786                // Not the final-resume — restore the pending_wave flag we
4787                // tentatively cleared, since we're not transitioning to
4788                // Active yet.
4789                if pending_wave {
4790                    rec.pause_state.mark_pending_wave();
4791                }
4792                return Ok(None);
4793            };
4794            let sinks: Vec<Sink> = rec.subscribers.values().cloned().collect();
4795            let messages: Vec<Message> = buffer.into_iter().collect();
4796            // Default-mode pending-wave handling: schedule the fn-fire so
4797            // the wave engine consolidates the pause-window dep deliveries
4798            // into one fn execution. State nodes don't fire fn (no
4799            // `pending_fires` membership has effect for them).
4800            //
4801            // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_fires lives
4802            // on per-thread WaveState.
4803            if pending_wave && was_default_mode {
4804                crate::batch::with_wave_state(|ws| {
4805                    ws.pending_fires.insert(node_id);
4806                });
4807            }
4808            (sinks, messages, dropped, pending_wave && was_default_mode)
4809        };
4810        let replayed = u32::try_from(messages.len()).unwrap_or(u32::MAX);
4811
4812        // Phase 2 (lock-released): fire sinks for ResumeAll-buffered
4813        // messages. Default-mode resume produces no buffered replay (the
4814        // consolidated fn-fire produces fresh wave traffic via the standard
4815        // commit_emission path).
4816        if !messages.is_empty() {
4817            for sink in &sinks {
4818                sink(&messages);
4819            }
4820            // Phase 3: balance the retain_handle calls done at buffer-push
4821            // time — sinks observe values but don't own refcount shares.
4822            for msg in &messages {
4823                if let Some(h) = msg.payload_handle() {
4824                    self.binding.release_handle(h);
4825                }
4826            }
4827        }
4828
4829        // Phase 4 (default-mode): drain the consolidated fn-fire scheduled
4830        // in Phase 1. `run_wave_for(node_id)` acquires the partitions
4831        // touched from `node_id` (Slice Y1 / Phase E) and runs the standard
4832        // drain pipeline; the new fn-fire emerges as a normal wave's worth
4833        // of messages to subscribers.
4834        if pending_wave_for_default {
4835            self.run_wave_for(node_id, |_this| {
4836                // The pending_fires entry was pushed in Phase 1 under the
4837                // lock. run_wave's drain picks it up.
4838            });
4839        }
4840        Ok(Some(ResumeReport { replayed, dropped }))
4841    }
4842
4843    /// True if the node currently holds at least one pause lock.
4844    #[must_use]
4845    pub fn is_paused(&self, node_id: NodeId) -> bool {
4846        self.lock_state()
4847            .require_node(node_id)
4848            .pause_state
4849            .is_paused()
4850    }
4851
4852    /// Number of pause locks currently held on `node_id`. `0` if Active.
4853    #[must_use]
4854    pub fn pause_lock_count(&self, node_id: NodeId) -> usize {
4855        self.lock_state()
4856            .require_node(node_id)
4857            .pause_state
4858            .lock_count()
4859    }
4860
4861    /// Test helper: whether `node_id` currently holds the given `lock_id`.
4862    #[must_use]
4863    pub fn holds_pause_lock(&self, node_id: NodeId, lock_id: LockId) -> bool {
4864        self.lock_state()
4865            .require_node(node_id)
4866            .pause_state
4867            .contains_lock(lock_id)
4868    }
4869}
4870
4871// -----------------------------------------------------------------------
4872// set_deps — atomic dep mutation
4873// -----------------------------------------------------------------------
4874
4875/// Errors returnable by [`Core::set_deps`].
4876///
4877/// Per `~/src/graphrefly-ts/docs/research/rewire-design-notes.md` and the
4878/// Phase 13.8 Q1 lock:
4879/// - `SelfDependency` — `n in newDeps` (self-loops are pathological without
4880///   explicit fixed-point semantics, which GraphReFly does not provide).
4881/// - `WouldCreateCycle { path }` — adding the new edge would create a cycle.
4882///   The `path` field reports the offending dep chain for debuggability.
4883/// - `UnknownNode` / `NotComputeNode` — invariant violations from the caller.
4884/// - `TerminalNode` — `n` itself has emitted COMPLETE/ERROR; rewiring a
4885///   terminal stream is a category error (terminal is one-shot at this
4886///   layer; recovery is the resubscribable path on a fresh subscribe).
4887/// - `TerminalDep` — a newly-added dep is terminal AND not resubscribable.
4888///   Resubscribable terminal deps are accepted because the subscribe path
4889///   resets their lifecycle. Non-resubscribable terminal deps would deliver
4890///   their already-emitted terminal directly to `n`'s `dep_terminals` slot,
4891///   which is rarely intended.
4892#[derive(Error, Debug, Clone, PartialEq)]
4893pub enum SetDepsError {
4894    /// `n` appeared in `new_deps` (self-loop rejection).
4895    #[error("set_deps({n:?}, ...): self-dependency rejected (n appeared in new_deps)")]
4896    SelfDependency { n: NodeId },
4897
4898    /// Adding the new dep would create a cycle. `path` is the chain
4899    /// `[added_dep, ..., n]` reachable via existing deps.
4900    #[error(
4901        "set_deps({n:?}, ...): cycle would form via path {path:?} \
4902         (adding {added_dep:?} → {n:?} closes the loop)"
4903    )]
4904    WouldCreateCycle {
4905        n: NodeId,
4906        added_dep: NodeId,
4907        path: Vec<NodeId>,
4908    },
4909
4910    #[error("set_deps: unknown node {0:?}")]
4911    UnknownNode(NodeId),
4912
4913    #[error("set_deps: node {0:?} is not a compute node (state nodes have no deps)")]
4914    NotComputeNode(NodeId),
4915
4916    /// `n` itself has terminated (COMPLETE / ERROR). Rewiring a terminal node
4917    /// is rejected — the stream has ended at this layer. To recover, mark
4918    /// the node resubscribable before terminate; a fresh subscribe will then
4919    /// reset its lifecycle.
4920    #[error("set_deps({n:?}, ...): node has already terminated; cannot rewire a terminal node")]
4921    TerminalNode { n: NodeId },
4922
4923    /// A newly-added dep is terminal AND non-resubscribable. Per Phase 13.8
4924    /// Q1, this is rejected; resubscribable terminal deps are allowed
4925    /// because the subscribe path resets them when activated. Already-present
4926    /// terminal deps are unaffected (their terminal status was accepted at
4927    /// the time they terminated).
4928    #[error(
4929        "set_deps({n:?}, ...): added dep {dep:?} is terminal and not resubscribable; \
4930         either mark it resubscribable before terminate, or remove the dep from new_deps"
4931    )]
4932    TerminalDep { n: NodeId, dep: NodeId },
4933
4934    /// `n` itself is currently mid-fire — a user fn for `n` re-entered Core
4935    /// via `set_deps(n, ...)` from inside `n`'s own `invoke_fn` /
4936    /// `project_each` / `predicate_each` / etc. Phase 1 of the dispatcher
4937    /// snapshotted `dep_handles` BEFORE the lock-released callback; the
4938    /// callback returning a `tracked` set indexed against THAT ordering
4939    /// would corrupt indices if the rewire re-orders deps mid-fire.
4940    /// Rejected to preserve the dynamic-tracked-indices invariant (D1).
4941    ///
4942    /// Workaround: schedule the rewire from a different node's fn (via
4943    /// `Core::emit` on a state node and observing the emit downstream),
4944    /// or perform the rewire after the wave completes (e.g. from a sink
4945    /// callback that is itself outside any fn-fire scope).
4946    ///
4947    /// Slice F (2026-05-07) — A6.
4948    #[error(
4949        "set_deps({n:?}, ...): rejected — node {n:?} is currently mid-fire \
4950         (set_deps from inside the firing node's own fn would corrupt the \
4951         Dynamic `tracked` indices snapshot taken before invoke_fn). \
4952         Schedule the rewire outside this fire scope."
4953    )]
4954    ReentrantOnFiringNode { n: NodeId },
4955    // /qa m1 (2026-05-19): the vestigial `PartitionMigrationDuringFire`
4956    // variant — kept across §7/D208–D211 + D253 (S5) as a downstream-
4957    // churn courtesy for `Err(_)` match arms — is now deleted. D253
4958    // removed the entire scheduling-group identity surface
4959    // (`SchedulingGroupId`, `node_group` index,
4960    // `set_scheduling_group`/`partition_of`/`group_of`), so the
4961    // partition-migration concept is gone too. No surviving caller
4962    // pattern-matches the variant (verified by grep; the only
4963    // reference was a TRASH/scheduling_groups.rs comment).
4964}
4965
4966// D246/S2c: `MigrationRollback` + `MovedComponent` deleted — there is
4967// no cross-shard extract→reinsert (single shard always under
4968// single-owner). D253 (S5) further deletes `SetGroupError` +
4969// `Core::set_scheduling_group` entirely — the declared-group identity
4970// surface (`SchedulingGroupId`, `node_group`, `partition_of`/`group_of`)
4971// is removed until M6 re-introduces it with M6's actual scheduling
4972// needs in view.
4973
4974impl Core {
4975    /// Atomic dep mutation — change a node's upstream deps without TEARDOWN
4976    /// cascading and without losing cache.
4977    ///
4978    /// Per the TLA+-verified design at
4979    /// `~/src/graphrefly-ts/docs/research/wave_protocol_rewire.tla`
4980    /// (35,950 distinct states, all 7 invariants clean):
4981    ///
4982    /// - Removed deps: clear dirtyMask bit, drain pending queue, drop DepRecord.
4983    /// - Added deps: SENTINEL prevData; push-on-subscribe if added dep has cached DATA.
4984    /// - Preserved: `firstRunPassed`, `pauseLocks`, `pauseBuffer`, `cache` (ROM/RAM).
4985    /// - Status auto-settles if dirtyMask becomes empty.
4986    /// - Idempotent on `new_deps == current deps`.
4987    /// - Self-rewire `n ∈ new_deps` rejected (`SelfDependency`).
4988    /// - Cycles rejected (`WouldCreateCycle`).
4989    /// - Allowed mid-wave + while paused.
4990    /// - Phase 13.8 Q1: terminal `n` rejected (`TerminalNode`); newly-added
4991    ///   terminal non-resubscribable deps rejected (`TerminalDep`).
4992    ///
4993    /// The body is a single atomic dep-mutation transaction with several
4994    /// discrete validation stages. Splitting would require passing a
4995    /// partially-mutable CoreState across helpers, and the transaction's
4996    /// locality is what makes the F1 refcount-leak collection work.
4997    #[allow(clippy::too_many_lines)]
4998    pub fn set_deps(&self, n: NodeId, new_deps: &[NodeId]) -> Result<(), SetDepsError> {
4999        // 2b-ii-B (D220-EXEC): route to the node's shard; the new deps
5000        // are in the same component ⇒ same group ⇒ same shard (the
5001        // component-consistency check enforces). See `try_emit`.
5002        let mut s = self.lock_state();
5003        // Validate node exists and is compute. Read-once via the helper so
5004        // subsequent code can use `require_node(n)` without re-checking.
5005        let (is_state, is_producer, is_terminal) = {
5006            let rec = s.nodes.get(&n).ok_or(SetDepsError::UnknownNode(n))?;
5007            (rec.is_state(), rec.is_producer(), rec.terminal.is_some())
5008        };
5009        if is_state || is_producer {
5010            // State and Producer nodes have no declared deps — set_deps
5011            // is meaningless. Producer nodes manage their own subscriptions
5012            // through the binding's ProducerCtx; mutating their (empty)
5013            // dep set would not affect that.
5014            return Err(SetDepsError::NotComputeNode(n));
5015        }
5016        // Reject if `n` itself is terminal (Phase 13.8 Q1: terminal nodes
5017        // cannot be rewired; recovery is via resubscribable subscribe).
5018        if is_terminal {
5019            return Err(SetDepsError::TerminalNode { n });
5020        }
5021        // A6 reentrancy guard (Slice F, 2026-05-07): reject if `n` is
5022        // mid-fire. Closes the D1 hazard where `Phase 1` snapshotted
5023        // `dep_handles` against pre-rewire dep ordering and `Phase 3`
5024        // would store the returned `tracked` indices against post-rewire
5025        // ordering.
5026        //
5027        // D250 (S4, 2026-05-19): post-D248/D249 single-owner the only
5028        // historical *trigger* for this guard — a synchronous `set_deps`
5029        // re-entry from inside `n`'s own fn-fire via the binding-holds-
5030        // cloned-Core mechanism — is structurally deleted (`invoke_fn`
5031        // has no `&Core`; `set_deps` is not on `CoreFull`/`MailboxOp`; a
5032        // Defer runs owner-side AFTER the wave settles with
5033        // `currently_firing` empty). The deleted cross-thread P13 path
5034        // (Thread B's set_deps observing Thread A's lock-released
5035        // invoke_fn pushes) is likewise gone — Core is `!Send`, one
5036        // owner thread. The guard is therefore **defensive**: it stays
5037        // live so a future owner-side mid-wave self-rewire seam (or any
5038        // re-entry that lands an owner inside the firing set) is
5039        // rejected fail-loud rather than corrupting tracked indices. The
5040        // **Guard preserved for future owner-side mid-wave self-rewire
5041        // seam** (S4-horizon work). No test exercises this code path
5042        // today — the pre-D221 binding-holds-cloned-Core trigger was
5043        // deleted in D246 and the test stub was removed at S6
5044        // (2026-05-20). **Do NOT delete this guard as "dead code"**:
5045        // removing it weakens the invariant that protects
5046        // `dep_records` indices against a future seam that lands an
5047        // owner inside `currently_firing`. If `CoreFull`/`MailboxOp`
5048        // is ever widened with a `set_deps`-equivalent for a real
5049        // consumer, write a FRESH test against the new seam.
5050        // `currently_firing` lives on `CoreShared`, read under the
5051        // already-held state lock (under D246/D248 single-owner the
5052        // borrow is *structural* rather than concurrency-required —
5053        // `CoreShared` is owner-thread-only).
5054        if s.shared.currently_firing.contains(&n) {
5055            return Err(SetDepsError::ReentrantOnFiringNode { n });
5056        }
5057        // Self-rewire rejection.
5058        if new_deps.contains(&n) {
5059            return Err(SetDepsError::SelfDependency { n });
5060        }
5061        // Validate all new deps exist.
5062        for &d in new_deps {
5063            if !s.nodes.contains_key(&d) {
5064                return Err(SetDepsError::UnknownNode(d));
5065            }
5066        }
5067        // Cycle detection: data flows parent → child via the `children` map.
5068        // Adding edge `d → n` (d becomes a dep of n) creates a cycle iff
5069        // `d` is already reachable from `n` via existing data-flow edges
5070        // (so `n → ... → d` exists, and the new `d → n` closes the loop).
5071        // DFS from `n` along `children` edges, looking for each added dep.
5072        let current_deps: HashSet<NodeId> = s.require_node(n).dep_ids().collect();
5073        let new_deps_set: HashSet<NodeId> = new_deps.iter().copied().collect();
5074        let added: HashSet<NodeId> = new_deps_set.difference(&current_deps).copied().collect();
5075        for &d in &added {
5076            if let Some(path) = self.path_from_to(&s, n, d) {
5077                return Err(SetDepsError::WouldCreateCycle {
5078                    n,
5079                    added_dep: d,
5080                    path,
5081                });
5082            }
5083        }
5084        // Phase 13.8 Q1: reject newly-added deps that are terminal AND not
5085        // resubscribable. Resubscribable terminal deps are allowed — the
5086        // subscribe path resets their lifecycle when something activates
5087        // them. Already-present (kept) deps are unaffected; their terminal
5088        // status was accepted at the time they terminated.
5089        for &d in &added {
5090            let dep_rec = s.require_node(d);
5091            if dep_rec.terminal.is_some() && !dep_rec.resubscribable {
5092                return Err(SetDepsError::TerminalDep { n, dep: d });
5093            }
5094        }
5095        // Compute `removed` early (Phase F: needs to be available for P13
5096        // split-case widening below). Idempotent fast-path moved below the
5097        // P13 check accordingly.
5098        let removed: HashSet<NodeId> = current_deps.difference(&new_deps_set).copied().collect();
5099
5100        // Idempotent fast-path: no add/remove ⇒ no-op. D253 (S5) deletes
5101        // the prior §7 None/Some-mix endpoint check that lived above —
5102        // the declared-group identity surface is gone until M6.
5103        if added.is_empty() && removed.is_empty() {
5104            return Ok(());
5105        }
5106
5107        // Snapshot old deps (ordered) for topology event, before mutation.
5108        let old_deps_vec: Vec<NodeId> = s.require_node(n).dep_ids_vec();
5109
5110        // Carry out the rewire atomically.
5111        // 1. Build new dep_records, preserving DepRecord state for kept deps.
5112        let new_deps_vec: Vec<NodeId> = new_deps.to_vec();
5113        //
5114        // Refcount discipline (F1 audit fix): each `Some(TerminalKind::Error(h))`
5115        // slot owns a refcount share retained at `terminate_node` time. When a
5116        // dep is REMOVED, its slot is dropped — the corresponding handle's
5117        // share must be released here, otherwise it leaks until Core drop.
5118        // Also release data_batch retains for removed deps.
5119        let (new_dep_records, removed_handles): (Vec<DepRecord>, Vec<HandleId>) = {
5120            let rec = s.require_node(n);
5121            // Index old dep_records by NodeId for O(1) lookup of kept deps.
5122            let old_by_node: HashMap<NodeId, &DepRecord> =
5123                rec.dep_records.iter().map(|dr| (dr.node, dr)).collect();
5124            let new_records: Vec<DepRecord> = new_deps_vec
5125                .iter()
5126                .map(|&d| {
5127                    if let Some(old) = old_by_node.get(&d) {
5128                        // Kept dep: preserve all state (prev_data, data_batch,
5129                        // terminal, wave flags). Subscriptions stay live.
5130                        DepRecord {
5131                            node: d,
5132                            prev_data: old.prev_data,
5133                            dirty: old.dirty,
5134                            involved_this_wave: old.involved_this_wave,
5135                            data_batch: old.data_batch.clone(),
5136                            terminal: old.terminal,
5137                        }
5138                    } else {
5139                        // Added dep: fresh sentinel record.
5140                        DepRecord::new(d)
5141                    }
5142                })
5143                .collect();
5144            // Collect handles to release from REMOVED dep records.
5145            let mut to_release: Vec<HandleId> = Vec::new();
5146            for d in &removed {
5147                if let Some(old) = old_by_node.get(d) {
5148                    if let Some(TerminalKind::Error(h)) = old.terminal {
5149                        to_release.push(h);
5150                    }
5151                    // Release data_batch retains for removed deps.
5152                    for &h in &old.data_batch {
5153                        to_release.push(h);
5154                    }
5155                }
5156            }
5157            (new_records, to_release)
5158        };
5159        // Clear dirtyMask bit by re-emitting the wave-bookkeeping: we don't
5160        // currently model a per-dep dirtyMask explicitly (we use the boolean
5161        // `dirty` flag at node level). Removing a dep's entry from the implicit
5162        // mask is therefore implicit — by removing the dep, future emissions
5163        // from it can't re-arm the bit. The per-dep `involved_this_wave` flag
5164        // stays wave-scoped and gets cleared at wave end. The setDeps action
5165        // itself does NOT change the dirty boolean unless all deps are cleared;
5166        // in that case we settle.
5167        // Slice E2 (D067): on a dynamic node that had previously fired its
5168        // fn, capture `has_fired_once` BEFORE the reset so we can fire
5169        // `OnRerun` cleanup lock-released after `drop(s)` below. Without
5170        // this, the next `fire_regular` Phase 1 would capture
5171        // `has_fired_once = false`, causing Phase 1.5 to skip OnRerun —
5172        // silently dropping the prior activation's cleanup closure when
5173        // the next `invoke_fn` overwrites `current_cleanup`. Per spec
5174        // R2.4.5, `set_deps` does NOT end the activation cycle
5175        // (subscribe→unsubscribe is the cycle boundary), so OnRerun must
5176        // fire on every re-fire including post-set_deps.
5177        // §10 perf (D047): compute new topo_rank before the mutable
5178        // borrow on rec, since we need to read other nodes' depths.
5179        let new_topo_rank = if new_deps_vec.is_empty() {
5180            0
5181        } else {
5182            new_deps_vec
5183                .iter()
5184                .filter(|&&d| d != n)
5185                .filter_map(|&d| s.nodes.get(&d).map(|r| r.topo_rank))
5186                .max()
5187                .unwrap_or(0)
5188                .saturating_add(1)
5189        };
5190        let fire_set_deps_on_rerun;
5191        {
5192            let rec = s.require_node_mut(n);
5193            fire_set_deps_on_rerun = rec.is_dynamic && rec.has_fired_once;
5194            rec.dep_records = new_dep_records;
5195            rec.topo_rank = new_topo_rank;
5196            // §10.13 perf (D047): recompute received_mask from new dep_records.
5197            // §10.3 perf (Slice V1): recompute involved_mask alongside.
5198            rec.received_mask = 0;
5199            rec.involved_mask = 0;
5200            for (i, dr) in rec.dep_records.iter().enumerate() {
5201                if i < 64 {
5202                    if dr.prev_data != NO_HANDLE || !dr.data_batch.is_empty() {
5203                        rec.received_mask |= 1u64 << i;
5204                    }
5205                    if dr.involved_this_wave {
5206                        rec.involved_mask |= 1u64 << i;
5207                    }
5208                }
5209            }
5210            // Re-derive `tracked` for static derived: all indices.
5211            // For dynamic: clear `tracked` AND reset `has_fired_once` so the
5212            // next dep delivery satisfies the first-fire branch in
5213            // `deliver_data_to_consumer` (`!has_fired_once || tracked.contains(...)`).
5214            // Without resetting `has_fired_once`, the cleared `tracked` blocks
5215            // every future fire — fn never re-runs and the dynamic node sits
5216            // on stale cache derived from the old dep set. The next fire
5217            // re-runs fn unconditionally; fn's returned `tracked` then
5218            // repopulates `rec.tracked` and normal selective-deps semantics
5219            // resume from the next dep update onward.
5220            if rec.is_dynamic {
5221                rec.tracked.clear();
5222                rec.has_fired_once = false;
5223            } else {
5224                // Derived (static) and Operator track all deps.
5225                rec.tracked = (0..new_deps_vec.len()).collect();
5226            }
5227        }
5228
5229        // 2. Update inverted-edge map (children).
5230        for &removed_dep in &removed {
5231            if let Some(set) = s.children.get_mut(&removed_dep) {
5232                set.remove(&n);
5233            }
5234        }
5235        for &added_dep in &added {
5236            s.children.entry(added_dep).or_default().insert(n);
5237        }
5238
5239        // 3. Push-on-subscribe for added deps with cached DATA. Wraps in a
5240        // wave so any downstream propagation runs cleanly. We capture only
5241        // the LIST of added deps (not their cache values) because the cache
5242        // can change between releasing the validation lock and the wave's
5243        // re-acquisition — see the P2 race fix below.
5244        //
5245        // P2 (Slice A close /qa) — between `drop(s)` and `run_wave`'s
5246        // closure re-acquiring the lock, a concurrent thread could
5247        // invalidate one of the added deps, releasing its cache handle. A
5248        // pre-snapshot of `(added_dep, cache)` pairs would then carry a
5249        // dangling HandleId into `deliver_data_to_consumer`. The fix is to
5250        // re-read each added dep's `cache` INSIDE the closure (under the
5251        // freshly re-acquired state lock). The wave-owner re-entrant mutex
5252        // (Q2) blocks concurrent waves once we enter `run_wave`, so the
5253        // re-read sees a coherent post-validation state.
5254        let added_for_wave: Vec<NodeId> = added.iter().copied().collect();
5255        // §7 (D208–D211): the union-find union/split-eager block that
5256        // lived here is DELETED. scheduling groups are static +
5257        // user-declared and do NOT migrate with topology, so adding /
5258        // removing dep edges never recomputes any node's group. The
5259        // group-consistency invariant for the new adjacency was already
5260        // enforced above (the strict check that replaced the P13 block).
5261        // No registry mutation on the set_deps path.
5262        // B3 (D117, 2026-05-10): when set_deps clears ALL deps mid-wave AND
5263        // n has a tier-1 (DIRTY) message already queued this wave with no
5264        // settle yet, push n to `pending_auto_resolve` so the drain-end
5265        // sweep emits a paired Resolved. Without this, subscribers observe
5266        // an unpaired DIRTY, violating R1.3.1.b two-phase push pairing.
5267        //
5268        // Outside any active wave the per-thread `pending_notify` is empty
5269        // (cleared at wave-end), so the predicate short-circuits and the
5270        // insert is a no-op. Inside a wave, the `pending_auto_resolve`
5271        // sweep at `drain_and_flush` end re-checks pending_notify and
5272        // routes through `queue_notify` (which handles paused-children
5273        // pause-buffer placement automatically).
5274        if new_deps_vec.is_empty() {
5275            // F6 (/qa 2026-05-10): walk pending_notify in arrival order
5276            // counting unpaired DIRTYs. Tier 4 INVALIDATE is NOT a
5277            // settle for two-phase pairing — it clears cache but does
5278            // not pair with a DIRTY. Pairs are DIRTY ↔ DATA / RESOLVED
5279            // (tier 3 value-class) or DIRTY ↔ COMPLETE / ERROR (tier 5
5280            // terminal). Multi-emit waves like `[DIRTY, RESOLVED, DIRTY]`
5281            // leave one trailing unpaired DIRTY that needs auto-Resolved.
5282            crate::batch::with_wave_state(|ws| {
5283                let needs_auto_resolve = ws.pending_notify.get(&n).is_some_and(|entry| {
5284                    let mut unpaired: i32 = 0;
5285                    for m in entry.iter_messages() {
5286                        match m {
5287                            crate::message::Message::Dirty => unpaired += 1,
5288                            crate::message::Message::Data(_)
5289                            | crate::message::Message::Resolved
5290                            | crate::message::Message::Complete
5291                            | crate::message::Message::Error(_)
5292                                if unpaired > 0 =>
5293                            {
5294                                unpaired -= 1;
5295                            }
5296                            // INVALIDATE / PAUSE / RESUME / TEARDOWN /
5297                            // START — not settles for two-phase pairing.
5298                            _ => {}
5299                        }
5300                    }
5301                    unpaired > 0
5302                });
5303                if needs_auto_resolve {
5304                    ws.pending_auto_resolve.insert(n);
5305                }
5306            });
5307        }
5308        // Drop the state lock before run_wave (which acquires its own) and
5309        // before crossing the binding boundary for the F1 refcount-fix
5310        // releases. Keeps the lock-discipline split (binding calls outside
5311        // the state lock) consistent with the rest of the dispatcher.
5312        drop(s);
5313        // Slice E2 (D067): fire OnRerun lock-released for dynamic nodes
5314        // that had previously fired. The cleanup closure cleans up
5315        // resources tied to the old dep shape before the next fn-fire
5316        // (triggered by added-dep push-on-subscribe below) registers a
5317        // fresh cleanup spec. Direct fire (NOT via deferred_cleanup_hooks)
5318        // because set_deps may NOT enter a wave (no added deps → no
5319        // run_wave below) — queueing the hook would orphan it until the
5320        // next unrelated wave drains.
5321        if fire_set_deps_on_rerun {
5322            self.binding.cleanup_for(n, CleanupTrigger::OnRerun);
5323        }
5324        // Fire topology event after lock is dropped.
5325        self.fire_topology_event(&crate::topology::TopologyEvent::DepsChanged {
5326            node: n,
5327            old_deps: old_deps_vec,
5328            new_deps: new_deps_vec.clone(),
5329        });
5330        if !added_for_wave.is_empty() {
5331            // Slice Y1 / Phase E: push-on-subscribe wave runs on `n`'s
5332            // touched partitions. Added deps are now unioned with `n`
5333            // (Phase C P12 fix moved registry mutation inside the state
5334            // lock), so any cascade through them stays in `n`'s partition
5335            // set as walked by `compute_touched_partitions`.
5336            self.run_wave_for(n, |this| {
5337                let mut s = this.lock_state();
5338                // Defensive: re-validate `n` still exists and isn't terminal.
5339                // A concurrent path could have terminated it between
5340                // validation and run_wave_for's partition-lock acquisition.
5341                if !s.nodes.contains_key(&n) || s.require_node(n).terminal.is_some() {
5342                    return;
5343                }
5344                for added_dep in &added_for_wave {
5345                    // Re-read cache under the wave-owner-held lock — this
5346                    // is the post-validation, post-concurrent-action
5347                    // snapshot. NO_HANDLE means the dep was invalidated
5348                    // concurrently; skip (no data to push).
5349                    let cache = match s.nodes.get(added_dep) {
5350                        Some(rec) => rec.cache,
5351                        None => continue, // dep deleted concurrently
5352                    };
5353                    if cache == NO_HANDLE {
5354                        continue;
5355                    }
5356                    let dep_idx = s.require_node(n).dep_index_of(*added_dep);
5357                    if let Some(idx) = dep_idx {
5358                        this.deliver_data_to_consumer(&mut s, n, idx, cache);
5359                    }
5360                }
5361            });
5362        }
5363        for h in removed_handles {
5364            self.binding.release_handle(h);
5365        }
5366        Ok(())
5367    }
5368
5369    /// DFS from `from` along data-flow edges (children map) looking for `to`.
5370    /// Returns the path including endpoints, or `None` if unreachable. Used
5371    /// for cycle detection in [`Self::set_deps`].
5372    fn path_from_to(&self, s: &CoreState, from: NodeId, to: NodeId) -> Option<Vec<NodeId>> {
5373        if from == to {
5374            return Some(vec![from]);
5375        }
5376        let mut stack: Vec<(NodeId, Vec<NodeId>)> = vec![(from, vec![from])];
5377        let mut visited: HashSet<NodeId> = HashSet::new();
5378        while let Some((cur, path)) = stack.pop() {
5379            if !visited.insert(cur) {
5380                continue;
5381            }
5382            if cur == to {
5383                return Some(path);
5384            }
5385            if let Some(children) = s.children.get(&cur) {
5386                for &child in children {
5387                    let mut new_path = path.clone();
5388                    new_path.push(child);
5389                    stack.push((child, new_path));
5390                }
5391            }
5392        }
5393        None
5394    }
5395}
5396
5397// CoreState helpers — kept on the inner struct so they're naturally scoped
5398// to the lock guard.
5399impl CoreState {
5400    // D246/S2c: `empty_shard` deleted — no per-`ShardKey` shard
5401    // construction (single shard always under single-owner).
5402
5403    // `alloc_node_id` / `alloc_sub_id` moved to `impl St` (Step 2a,
5404    // D220-EXEC): their counters now live in the separate `CoreShared`
5405    // region, which only the combined `St` guard holds alongside the
5406    // shard. Call sites (`s.alloc_node_id()` / `s.alloc_sub_id()` where
5407    // `s = self.lock_state()`) are unchanged — inherent `St` methods
5408    // resolve before the `Deref`-to-`CoreState` ones.
5409
5410    /// Clear wave-scoped flags and rotate per-dep batch data on every
5411    /// node. Run at the end of every wave (regular drain via `run_wave`,
5412    /// activation drain via `activate_derived`, and `BatchGuard::drop`'s
5413    /// drain). Centralized so a future wave-state field can't be missed
5414    /// at one of the cleanup sites.
5415    ///
5416    /// Per-dep rotation (R2.9.b / R1.3.6.b):
5417    /// - `prev_data` ← last element of `data_batch` (or unchanged if empty).
5418    ///   The last batch entry's retain transfers to `prev_data`; the old
5419    ///   `prev_data`'s retain is released. All earlier batch entries are
5420    ///   released.
5421    /// - `data_batch` cleared.
5422    /// - Per-dep `dirty` and `involved_this_wave` cleared.
5423    ///
5424    /// Handle releases are pushed to `deferred_handle_releases` for
5425    /// post-lock-drop release by the caller.
5426    pub(crate) fn clear_wave_state(&mut self, ws: &mut crate::batch::WaveState) {
5427        // Q-beyond Sub-slice 1 (D108, 2026-05-09): `pending_auto_resolve`
5428        // + `pending_pause_overflow` clears moved to
5429        // [`crate::batch::WaveState::clear_wave_state`]. The per-NodeRecord
5430        // rotation below pushes batch-handle and prev_data releases into
5431        // `ws.deferred_handle_releases` (was `cps.deferred_handle_releases`
5432        // pre-sub-slice-1, was `s.deferred_handle_releases` pre-Q2). Caller
5433        // borrows the WaveState thread_local; no lock-discipline rule
5434        // applies (state lock + thread_local borrow are independent).
5435        //
5436        // Q-beyond Sub-slice 3 (D108, 2026-05-09):
5437        // `invalidate_hooks_fired_this_wave` clear moved to
5438        // [`crate::batch::WaveState::clear_wave_state`]. The
5439        // `deferred_cleanup_hooks` invariant (NOT cleared here, drained
5440        // explicitly on success/panic paths) likewise moves with the
5441        // field.
5442        //
5443        // /qa F2 reverted (2026-05-10): `currently_firing` stays on
5444        // CoreState (per-Core, cross-thread visible — load-bearing for
5445        // P13). Defensive clear here mirrors the pre-sub-slice-3 safety
5446        // net (`FiringGuard`'s RAII push/pop is balanced even on panic;
5447        // a future code path that bypasses the guard would otherwise
5448        // leak a stale entry into the next wave).
5449        //
5450        // Step 2a (D220-EXEC): `currently_firing` now lives in the
5451        // separate `CoreShared` region (unreachable from `&mut
5452        // CoreState`). The defensive clear is performed by the two
5453        // `clear_wave_state` call sites in `batch.rs` via the combined
5454        // `St` guard's `.shared` field, immediately adjacent — same
5455        // wave-end point, same belt-and-suspenders intent.
5456        //
5457        // Slice G tier3 emit tracking lives on the per-thread
5458        // `WAVE_STATE`; cleared by the outermost `BatchGuard::drop`
5459        // (S4/D246: `WaveOwnerGuard` deleted — single-owner ⇒ one
5460        // uninterrupted owner-side drain, no per-partition release).
5461        for rec in self.nodes.values_mut() {
5462            rec.dirty = false;
5463            rec.involved_this_wave = false;
5464            // §10.3 perf (Slice V1): clear involved_mask in one op.
5465            rec.involved_mask = 0;
5466            for dr in &mut rec.dep_records {
5467                let batch_len = dr.data_batch.len();
5468                if batch_len > 0 {
5469                    // Release all batch entries EXCEPT the last — the last
5470                    // entry's retain transfers to prev_data.
5471                    for &h in &dr.data_batch[..batch_len - 1] {
5472                        ws.deferred_handle_releases.push(h);
5473                    }
5474                    // Release the OLD prev_data (its retain was from the
5475                    // previous wave's rotation or from initial delivery).
5476                    if dr.prev_data != NO_HANDLE {
5477                        ws.deferred_handle_releases.push(dr.prev_data);
5478                    }
5479                    // Rotate: last batch entry becomes new prev_data.
5480                    // Its retain carries over — no extra retain needed.
5481                    dr.prev_data = dr.data_batch[batch_len - 1];
5482                    dr.data_batch.clear();
5483                }
5484                dr.dirty = false;
5485                dr.involved_this_wave = false;
5486            }
5487        }
5488    }
5489
5490    pub(crate) fn require_node(&self, id: NodeId) -> &NodeRecord {
5491        self.nodes
5492            .get(&id)
5493            .unwrap_or_else(|| panic!("unknown node {id:?}"))
5494    }
5495
5496    pub(crate) fn require_node_mut(&mut self, id: NodeId) -> &mut NodeRecord {
5497        self.nodes
5498            .get_mut(&id)
5499            .unwrap_or_else(|| panic!("unknown node {id:?}"))
5500    }
5501}
5502
5503/// Release every binding-side refcount share owned by this `CoreState`
5504/// when the last `Core` clone drops the inner Mutex.
5505///
5506/// Without this, every retained handle in `cache` / `terminal` Error /
5507/// `dep_terminals` Error / pause-buffer-payload would leak in the binding
5508/// registry until process exit. Production bindings (napi-rs, pyo3,
5509/// wasm-bindgen) all maintain handle-ref maps that grow unbounded without
5510/// this cleanup.
5511///
5512/// Safe to call during panic unwinding — `BindingBoundary::release_handle`
5513/// is the only call, and a panicking binding during cleanup would already
5514/// have been a problem in normal operation.
5515impl Drop for CoreState {
5516    fn drop(&mut self) {
5517        // Q-beyond Sub-slice 3 (D108, 2026-05-09): `deferred_flush_jobs`
5518        // moved to [`crate::batch::WaveState`]. The `Vec<Sink>` clones
5519        // drop naturally with the per-thread WaveState's lifetime; no
5520        // CoreState-side cleanup needed.
5521        // Q-beyond Sub-slice 1 (D108, 2026-05-09): `deferred_handle_releases`
5522        // and `wave_cache_snapshots` moved to per-thread WaveState
5523        // thread_local. By outermost-BatchGuard-drop discipline both fields
5524        // are empty by the time CoreState drops (BatchGuard owns a Core
5525        // clone, so Core can't drop while a BatchGuard is in flight). Any
5526        // thread that ran a wave on this Core drained on its own outermost
5527        // BatchGuard; cross-Core thread_local sharing is fine because each
5528        // wave drains its own retains.
5529        //
5530        // Q-beyond Sub-slice 2 (D108, 2026-05-09): `pending_fires` and
5531        // `pending_notify` likewise moved to per-thread WaveState. The
5532        // pre-Sub-slice-2 `pending_notify` walk here (drain + release each
5533        // payload_handle) is no longer reachable from `Drop for CoreState`:
5534        // by invariant, no wave is in flight when CoreState drops (BatchGuard
5535        // holds a Core clone), so the originating thread's WaveState
5536        // pending_notify is empty by then. Other threads' WaveStates are
5537        // unreachable from CoreState::drop anyway — they're per-thread
5538        // thread_locals scoped to whichever thread ran the wave. The
5539        // outermost `BatchGuard::drop` is the canonical drain point on both
5540        // success and panic paths; Drop for CoreState relies on that
5541        // discipline holding rather than re-implementing it.
5542
5543        // Per-node retained handles:
5544        //   - `cache` (1 retain per non-NO_HANDLE state cache or
5545        //     populated compute cache).
5546        //   - `terminal == Some(Error(h))` (1 retain on the terminal slot).
5547        //   - `dep_terminals[i] == Some(Error(h))` (1 retain per consumer's
5548        //     terminated-dep slot).
5549        //   - `pause_state` paused buffer messages with payload handles
5550        //     (1 retain per buffered Data/Error).
5551        for rec in self.nodes.values_mut() {
5552            if rec.cache != NO_HANDLE {
5553                self.binding.release_handle(rec.cache);
5554            }
5555            if let Some(TerminalKind::Error(h)) = rec.terminal {
5556                self.binding.release_handle(h);
5557            }
5558            for dr in &rec.dep_records {
5559                if let Some(TerminalKind::Error(h)) = dr.terminal {
5560                    self.binding.release_handle(h);
5561                }
5562                // Release data_batch retains (in-flight wave data).
5563                for &h in &dr.data_batch {
5564                    self.binding.release_handle(h);
5565                }
5566                // Release prev_data retain (cross-wave persistence).
5567                if dr.prev_data != NO_HANDLE {
5568                    self.binding.release_handle(dr.prev_data);
5569                }
5570            }
5571            if let PauseState::Paused { buffer, .. } = &rec.pause_state {
5572                for msg in buffer {
5573                    if let Some(h) = msg.payload_handle() {
5574                        self.binding.release_handle(h);
5575                    }
5576                }
5577            }
5578            // Slice E1: release replay-buffer retains.
5579            for &h in &rec.replay_buffer {
5580                self.binding.release_handle(h);
5581            }
5582            // Operator scratch (Slice C-3, D026): generic per-operator
5583            // state struct. Each variant's release_handles releases the
5584            // shares it owns (Scan/Reduce acc, Distinct/Pairwise prev,
5585            // Last latest + default; Take/Skip/TakeWhile own no handles).
5586            if let Some(scratch) = rec.op_scratch.as_mut() {
5587                scratch.release_handles(&*self.binding);
5588            }
5589        }
5590
5591        // D-α (D028) `pending_scratch_release` shutdown drain moved to
5592        // `impl Drop for CoreShared` (Step 2a, D220-EXEC): the queue +
5593        // its `binding` were hoisted into the separate `CoreShared`
5594        // region, so `Drop for CoreState` can no longer reach them.
5595        // `CoreShared` (one per Core, owned by the cell) drops with the
5596        // cell — its `Drop` performs the identical
5597        // release-before-Vec-drop discipline. The per-node retain walk
5598        // above stays here (per-shard `CoreState` owns its `nodes`).
5599
5600        // Q-beyond Sub-slice 1 + 2 (D108, 2026-05-09): WaveState's
5601        // retain-holding fields (`wave_cache_snapshots`,
5602        // `deferred_handle_releases`, `pending_notify`) are drained by
5603        // outermost BatchGuard::drop (success + panic paths). See
5604        // comment above for the invariant.
5605    }
5606}