Skip to main content

graphrefly_core/
node.rs

1//! The dispatcher — node registration, subscription, wave engine.
2//!
3//! Mirrors `~/src/graphrefly-ts/src/__experiments__/handle-core/core.ts`
4//! (the Phase 13.6 brainstorm prototype, ~370 lines, 22 invariant tests).
5//!
6//! # Scope (M1 dispatcher + Slice A+B parity, closed 2026-05-05)
7//!
8//! - State + derived + dynamic node registration.
9//! - Subscribe / unsubscribe with push-on-subscribe (R1.2.3).
10//! - RAII [`Subscription`] with Drop-based deregister (§10.12).
11//! - DIRTY → DATA / RESOLVED ordering (R1.3.1.b two-phase push).
12//! - Equals-substitution (R1.3.2): identity is zero-FFI; custom crosses boundary.
13//! - First-run gate (R2.5.3) — fn does not fire until every dep has a handle.
14//! - Diamond resolution — one fn fire per wave even with shared upstream.
15//! - `set_deps()` atomic dep mutation with cycle detection + Phase 13.8 Q1
16//!   terminal-rejection policy (R3.3.1).
17//! - PAUSE / RESUME with lockId set + replay buffer (R1.2.6, R2.6, §10.2).
18//! - INVALIDATE broadcast + cascade with R1.4 idempotency.
19//! - COMPLETE / ERROR cascade + Lock 2.B auto-cascade gating
20//!   (ERROR dominates COMPLETE; first error wins).
21//! - TEARDOWN auto-precedes COMPLETE (R2.6.4 / Lock 6.F) +
22//!   `has_received_teardown` idempotency.
23//! - Meta TEARDOWN ordering (R1.3.9.d) — companions tear down before parent.
24//! - Resubscribable terminal lifecycle (R2.2.7, R2.5.3) — late subscribe to a
25//!   resubscribable terminal node resets lifecycle, except after TEARDOWN
26//!   (per F3 audit guard: TEARDOWN is permanent).
27//!
28//! # Module split (Slice C-1, 2026-05-05)
29//!
30//! Wave-engine internals (drain loop, fire selection, emission commit, sink
31//! dispatch) live in [`crate::batch`]. The split is purely organizational —
32//! the methods are still on `Core`. See `batch.rs` for the wave-engine
33//! entry points (`run_wave`, `drain_and_flush`, `commit_emission`,
34//! `queue_notify`, `deliver_data_to_consumer`).
35//!
36//! # Out of scope (later slices / milestones)
37//!
38//! - Deactivation cleanup (RAM nodes clear cache when sink count → 0) — M2.
39//!
40//! See [`migration-status.md`](../../../docs/migration-status.md) for the
41//! milestone tracker and [`porting-deferred.md`](../../../docs/porting-deferred.md)
42//! for surfaced concerns deferred to evidence-driven slices.
43//!
44//! # Re-entrance discipline (Slice A close, M1: fully lock-released)
45//!
46//! - **Wave-end sink fires** drop the state lock first. A subscriber's sink
47//!   that calls back into `Core::emit` / `pause` / `resume` / `invalidate` /
48//!   `complete` / `error` / `teardown` re-acquires the lock cleanly and runs
49//!   a nested wave (`s.in_tick` is cleared before the deferred-fire phase).
50//! - **`BindingBoundary::invoke_fn`** fires lock-released. The wave engine
51//!   acquires + drops the state lock per fn-fire iteration around the
52//!   `invoke_fn` callback. User fns may re-enter `Core::emit` / `pause` /
53//!   etc. and run a nested wave.
54//! - **`BindingBoundary::custom_equals`** fires lock-released.
55//!   `commit_emission` brackets the equals check around a lock release;
56//!   custom equals oracles may re-enter Core safely.
57//! - **Subscribe-time handshake** also fires lock-released. [`Core::subscribe`]
58//!   acquires the [`Core::wave_owner`] re-entrant mutex first (cross-thread
59//!   serialization), installs the sink under the state lock, drops the state
60//!   lock, then fires the per-tier handshake (`[Start]` / `[Data(cache)]?` /
61//!   `[Complete]?` / `[Error(h)]?` / `[Teardown]?` per R1.3.5.a) lock-released.
62//!   A handshake-time sink callback may re-enter Core (`emit` / `complete` /
63//!   `error` / `subscribe`); same-thread re-entry passes through `wave_owner`
64//!   transparently. Cross-thread emits block on `wave_owner` until the
65//!   subscribe path drops it, preserving R1.3.5.a happens-after ordering.
66
67use std::collections::VecDeque;
68use std::panic::{catch_unwind, AssertUnwindSafe};
69use std::sync::{Arc, Weak};
70
71use ahash::{AHashMap as HashMap, AHashSet as HashSet};
72use indexmap::IndexMap;
73use parking_lot::{ArcReentrantMutexGuard, Mutex, MutexGuard};
74
75/// Held guard from `parking_lot::ReentrantMutex::lock_arc()` on a
76/// partition's `wave_owner`. `!Send` per `parking_lot::ReentrantMutex`'s
77/// thread-affinity contract — the type-level `!Send` flows into
78/// [`crate::batch::BatchGuard::_wave_guards`] so any attempt to send
79/// the batch guard across threads fails to compile.
80///
81/// Slice Y1 / Phase E (2026-05-08).
82pub(crate) type WaveOwnerGuard =
83    ArcReentrantMutexGuard<parking_lot::RawMutex, parking_lot::RawThreadId, ()>;
84use smallvec::SmallVec;
85use thiserror::Error;
86
87use crate::batch::PendingPerNode;
88use crate::boundary::{BindingBoundary, CleanupTrigger};
89use crate::clock::monotonic_ns;
90use crate::handle::{FnId, HandleId, LockId, NodeId, NO_HANDLE};
91use crate::message::Message;
92
93/// Terminal-lifecycle state — once set on a node, the node will not emit
94/// further DATA; per-dep slots on consumers also use this to track which
95/// upstreams have terminated (R1.3.4 / Lock 2.B).
96///
97/// `Error` carries a [`HandleId`] resolving to the error value. Refcount is
98/// retained when the variant is stored in a node's `terminal` slot or any
99/// consumer's `dep_terminals` slot; v1 does not release these (terminal
100/// state is one-shot at this layer; release happens on resubscribable
101/// terminal-lifecycle reset, a separate slice).
102#[derive(Copy, Clone, Debug, PartialEq, Eq)]
103pub enum TerminalKind {
104    Complete,
105    Error(HandleId),
106}
107
108/// Node kind discriminant — **derived metadata** computed from
109/// [`NodeRecord`]'s field shape (D030 unification, Slice D).
110///
111/// Core no longer stores `kind` as a field; it's computed on demand from
112/// `(deps.is_empty(), fn_id.is_some(), op.is_some(), is_dynamic)`,
113/// mirroring TS's data model where `NodeImpl` has no `_kind` field. The
114/// shape uniquely identifies the kind:
115///
116/// | deps      | fn_id | op   | is_dynamic | kind     |
117/// |-----------|-------|------|-----------|----------|
118/// | empty     | None  | None | -         | State    |
119/// | empty     | Some  | None | -         | Producer |
120/// | non-empty | Some  | None | false     | Derived  |
121/// | non-empty | Some  | None | true      | Dynamic  |
122/// | non-empty | None  | Some | -         | Operator |
123///
124/// Public API ([`Core::kind_of`]) derives this enum on each call. State
125/// nodes are ROM (cache survives deactivation); compute nodes
126/// (Derived / Dynamic / Operator) and producers are RAM.
127#[derive(Copy, Clone, Eq, PartialEq, Debug)]
128pub enum NodeKind {
129    /// Source node: cache is intrinsic, no fn, no deps. Mutated via [`Core::emit`].
130    State,
131    /// Producer node: fn fires once on first subscribe. No deps;
132    /// emissions arrive via sinks the fn subscribes to (zip / concat /
133    /// race / takeUntil pattern). Slice D / D031.
134    Producer,
135    /// Derived node: fn fires on every dep change; all deps tracked.
136    Derived,
137    /// Dynamic node: fn declares which dep indices it actually read this run.
138    /// Untracked dep updates flow through cache but do NOT re-fire fn.
139    Dynamic,
140    /// Operator node: built-in dispatch path for transform / combine /
141    /// flow / resilience operators. The `OperatorOp` discriminant selects
142    /// the per-operator FFI path ([`BindingBoundary::project_each`] etc.);
143    /// Core manages per-operator state via the generic `op_scratch` slot
144    /// on `NodeRecord` (D026). Per Slice C-1 (D009) / Slice C-3 (D026).
145    Operator(OperatorOp),
146}
147
148impl NodeKind {
149    /// True if this kind opts OUT of Lock 2.B auto-cascade. Operator(Reduce)
150    /// and Operator(Last) must intercept upstream COMPLETE so they can emit
151    /// their accumulator / buffered value before the cascade terminates them;
152    /// instead of cascading, terminate_node queues such children for fn-fire
153    /// so `fire_operator` can handle the terminal.
154    pub(crate) fn skips_auto_cascade(self) -> bool {
155        matches!(
156            self,
157            NodeKind::Operator(OperatorOp::Reduce { .. } | OperatorOp::Last { .. })
158        )
159    }
160}
161
162/// Built-in operator discriminant. Selects the per-operator dispatch path
163/// in `fire_operator` (`crates/graphrefly-core/src/batch.rs`). Each variant
164/// carries the binding-side closure ids (and seed handle for stateful
165/// folders) needed for the wave-execution path; Core stores no user values
166/// itself per the handle-protocol cleaving plane.
167#[derive(Copy, Clone, Eq, PartialEq, Debug)]
168pub enum OperatorOp {
169    /// `map(source, project)` — element-wise transform. Calls
170    /// `BindingBoundary::project_each(fn_id, &inputs)` per fire; emits each
171    /// returned handle via `commit_emission_verbatim` (R1.3.2.d batch
172    /// semantics — no equals substitution between batch entries).
173    Map { fn_id: FnId },
174    /// `filter(source, predicate)` — silent-drop selection (D012/D018).
175    /// Calls `BindingBoundary::predicate_each(fn_id, &inputs)`; emits each
176    /// passing input verbatim. If zero pass on a wave that dirtied the
177    /// node, queues a single `RESOLVED` to settle (D018).
178    Filter { fn_id: FnId },
179    /// `scan(source, fold, seed)` — left-fold emitting each new accumulator.
180    /// `seed` is captured at registration; `acc` lives in
181    /// [`ScanState`](super::op_state::ScanState) inside
182    /// [`NodeRecord::op_scratch`] and persists across waves until
183    /// resubscribable reset. Calls `BindingBoundary::fold_each(fn_id, acc,
184    /// &inputs) -> SmallVec<HandleId>` per fire.
185    Scan { fn_id: FnId, seed: HandleId },
186    /// `reduce(source, fold, seed)` — left-fold emitting once on upstream
187    /// COMPLETE. Accumulates silently while source DATA flows; on
188    /// dep[0].terminal == Some(Complete), emits `[Data(acc), Complete]`.
189    /// On `Error(h)`, propagates the error verbatim. Opts out of Lock 2.B
190    /// auto-cascade (see `NodeKind::skips_auto_cascade`).
191    Reduce { fn_id: FnId, seed: HandleId },
192    /// `distinctUntilChanged(source, equals)` — suppresses adjacent
193    /// duplicates. Calls `BindingBoundary::custom_equals(equals_fn_id,
194    /// prev, current)` per input; emits non-equal items verbatim and
195    /// updates `prev`. If zero items pass on a wave that dirtied the node,
196    /// queues `RESOLVED` (matches Filter discipline).
197    DistinctUntilChanged { equals_fn_id: FnId },
198    /// `pairwise(source)` — emits `(prev, current)` pairs starting after
199    /// the second value. First value swallowed (sets `prev`). Calls
200    /// `BindingBoundary::pairwise_pack(fn_id, prev, current)` per pair to
201    /// produce the binding-side tuple handle.
202    Pairwise { fn_id: FnId },
203
204    // ----- Slice C-2: multi-dep combinators (D020) -----
205    /// `combine(...sources)` — N-dep combineLatest. On any dep fire, packs
206    /// the latest handle per dep into a single tuple handle via
207    /// `BindingBoundary::pack_tuple(pack_fn, &handles)`. First-run gate
208    /// (`partial: false` default) holds until all deps deliver real DATA
209    /// (R2.5.3). COMPLETE cascades when all deps complete (R1.3.4.b).
210    Combine { pack_fn: FnId },
211
212    /// `withLatestFrom(primary, secondary)` — 2-dep, fire-on-primary-only
213    /// (D021, Phase 10.5). Packs `[primary, secondary]` via
214    /// `BindingBoundary::pack_tuple(pack_fn, &handles)` when dep[0]
215    /// (primary) has DATA in the wave. If only dep[1] (secondary) fires,
216    /// settles with RESOLVED (D018 pattern). First-run gate holds until
217    /// both deps deliver (R2.5.3 `partial: false`). Post-warmup INVALIDATE
218    /// guard: if secondary `prev_data == NO_HANDLE` and batch empty after
219    /// warmup, settles with RESOLVED (no stale pair).
220    WithLatestFrom { pack_fn: FnId },
221
222    /// `merge(...sources)` — N-dep, forward all DATA handles verbatim
223    /// (D022). Zero FFI on fire: no transformation, no binding call.
224    /// Each dep's batch handles are retained and emitted individually.
225    /// COMPLETE cascades when all deps complete (R1.3.4.b).
226    Merge,
227
228    // ----- Slice C-3: flow operators (D024) -----
229    /// `take(source, count)` — emits the first `count` DATA values then
230    /// self-completes via `Core::complete`. Tracks `count_emitted` in
231    /// [`TakeState`](super::op_state::TakeState). When upstream completes
232    /// before `count` is reached, the standard auto-cascade propagates
233    /// COMPLETE. `count == 0` is allowed: the first fire emits zero
234    /// items then immediately self-completes (D027).
235    Take { count: u32 },
236
237    /// `skip(source, count)` — drops the first `count` DATA values; once
238    /// the threshold is crossed, subsequent DATAs pass through verbatim.
239    /// Tracks `count_skipped` in [`SkipState`](super::op_state::SkipState).
240    /// On a wave where every input is still in the skip window, queues
241    /// DIRTY+RESOLVED to settle (D018 pattern).
242    Skip { count: u32 },
243
244    /// `takeWhile(source, predicate)` — emits while `predicate(input)`
245    /// holds; on the first `false`, emits any preceding passes then
246    /// self-completes via `Core::complete`. Reuses
247    /// [`BindingBoundary::predicate_each`] (D029); after the first
248    /// `false`, subsequent inputs in the same batch are dropped.
249    TakeWhile { fn_id: FnId },
250
251    /// `last(source)` / `last_with_default(source, default)` — buffers
252    /// the latest DATA; on upstream COMPLETE, emits `Data(latest)` then
253    /// `Complete`. The `default` field is `NO_HANDLE` for the no-default
254    /// factory (emits only `Complete` on empty stream), or a registered
255    /// default handle (emits `Data(default)` + `Complete` on empty
256    /// stream). Storage: [`LastState`](super::op_state::LastState) holds
257    /// `latest` (live buffer) and `default` (registration-time, stable).
258    /// Opts out of Lock 2.B auto-cascade so it can intercept upstream
259    /// COMPLETE.
260    Last { default: HandleId },
261}
262
263/// Registration options for [`Core::register_operator`].
264///
265/// `equals` controls operator output dedup (R5.7 — defaults to identity).
266/// `partial` controls the R2.5.3 first-run gate (R5.4 — operator dispatch
267/// fires on first DATA from any dep when `true`; default `false` matches
268/// the gated derived discipline).
269#[derive(Copy, Clone, Debug)]
270pub struct OperatorOpts {
271    pub equals: EqualsMode,
272    pub partial: bool,
273}
274
275impl Default for OperatorOpts {
276    fn default() -> Self {
277        Self {
278            equals: EqualsMode::Identity,
279            partial: false,
280        }
281    }
282}
283
284/// Closure-form fn id OR typed operator discriminant — the two dispatch
285/// paths a node can use. State / passthrough nodes pass `None` to
286/// [`Core::register`] (no fn at all).
287#[derive(Copy, Clone, Debug)]
288pub enum NodeFnOrOp {
289    /// Closure-form: invokes [`BindingBoundary::invoke_fn`] per fire.
290    /// Used for Derived / Dynamic / Producer.
291    Fn(FnId),
292    /// Typed-op: routes to a `fire_op_*` helper that calls per-operator
293    /// FFI methods (`project_each` / `predicate_each` / `fold_each` /
294    /// `pairwise_pack` / `pack_tuple`). Used for Operator nodes.
295    Op(OperatorOp),
296}
297
/// How a node responds to PAUSE (canonical-spec §2.6).
///
/// TS ships all three modes. The Rust port closed that gap during the
/// Slice F audit (2026-05-07).
///
/// | Mode | Outgoing tier-3 routing while paused | RESUME behavior |
/// |---|---|---|
/// | [`PausableMode::Default`] | fn-fire is suppressed upstream (no DIRTY emitted) | if any dep delivered DATA during the pause, fn fires ONCE on RESUME, collapsing N pause-window writes into one settle |
/// | [`PausableMode::ResumeAll`] | outgoing tier-3 / tier-4 messages are buffered per wave | each buffered wave is replayed verbatim on RESUME |
/// | [`PausableMode::Off`] | the dispatcher ignores PAUSE; tier-3 flushes immediately | no-op (there is no buffer to drain) |
///
/// Per canonical §2.6, the default is [`PausableMode::Default`], so every
/// untagged source uses it. It costs O(1) memory per node because there is
/// no buffer. The trade-off is that subscribers see one consolidated DATA
/// on RESUME instead of the K mid-pause emissions verbatim.
///
/// Whatever the mode, tier-1 (DIRTY), tier-2 (PAUSE/RESUME), tier-5
/// (COMPLETE/ERROR) and tier-6 (TEARDOWN) bypass pause. They stay
/// observable so that a leaked pause-controller cannot strand subscribers.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum PausableMode {
    /// Canonical default. While paused, fn-fire is suppressed. On RESUME,
    /// fn fires once if any dep delivered DATA during the pause window.
    #[default]
    Default,
    /// Buffers outgoing tier-3 / tier-4 messages per wave and replays them
    /// on RESUME. Use this when subscribers need the verbatim emit history,
    /// such as an audit log or a replay-on-reconnect bridge.
    ResumeAll,
    /// The dispatcher ignores PAUSE for this node, so tier-3 flushes
    /// immediately even while a lock is held. Use this for nodes whose
    /// values are inherently pause-immune, such as telemetry counters and
    /// monotonic timers.
    Off,
}
331
332/// Per-kind opts for [`Core::register`]. Cross-kind config knobs live
333/// here; per-kind specifics (deps, fn_or_op) live on
334/// [`NodeRegistration`].
335#[derive(Copy, Clone, Debug)]
336pub struct NodeOpts {
337    /// Initial cached value. Only valid for state nodes (no deps + no
338    /// fn + no op). [`NO_HANDLE`] starts the node sentinel.
339    pub initial: HandleId,
340    /// Equality mode for outgoing emissions (R1.3.2). Defaults to
341    /// [`EqualsMode::Identity`].
342    pub equals: EqualsMode,
343    /// First-run gate (R2.5.3 / D011). When `true`, the node fires as
344    /// soon as ANY dep delivers a real handle; when `false` (default),
345    /// the node holds until every dep has delivered.
346    pub partial: bool,
347    /// Dynamic flag (R2.5.3) — fn declares actually-tracked dep indices
348    /// per fire. Only meaningful when `fn_or_op == Some(Fn(_))` AND
349    /// deps non-empty.
350    pub is_dynamic: bool,
351    /// Pause behavior mode (canonical §2.6). Default is
352    /// [`PausableMode::Default`]. See [`PausableMode`] for the trade-offs.
353    pub pausable: PausableMode,
354    /// Replay buffer cap (canonical R2.6.5 / Lock 6.G — Slice E1, 2026-05-07).
355    /// `None` (default) disables; `Some(N)` keeps a circular buffer of the
356    /// last N DATA emissions and replays them to late subscribers as part
357    /// of the per-tier handshake (between [`Message::Start`] and any
358    /// terminal slice). Only DATA is buffered; RESOLVED entries are NOT
359    /// (R2.6.5 explicit "DATA only").
360    pub replay_buffer: Option<usize>,
361}
362
363impl Default for NodeOpts {
364    fn default() -> Self {
365        Self {
366            initial: NO_HANDLE,
367            equals: EqualsMode::Identity,
368            partial: false,
369            is_dynamic: false,
370            pausable: PausableMode::Default,
371            replay_buffer: None,
372        }
373    }
374}
375
376/// Unified node-registration descriptor (D030, Slice D).
377///
378/// All node kinds (State / Producer / Derived / Dynamic / Operator)
379/// register through [`Core::register`] with a `NodeRegistration`. The
380/// kind is **derived from the field shape** of the registration —
381/// `(deps.is_empty(), fn_or_op variant)`:
382///
383/// | deps      | fn_or_op   | is_dynamic | resulting kind |
384/// |-----------|-----------|-----------|----------------|
385/// | empty     | None      | -         | State          |
386/// | empty     | Some(Fn)  | -         | Producer       |
387/// | non-empty | Some(Fn)  | false     | Derived        |
388/// | non-empty | Some(Fn)  | true      | Dynamic        |
389/// | non-empty | Some(Op)  | -         | Operator       |
390///
391/// The sugar wrappers ([`Core::register_state`], [`Core::register_producer`],
392/// etc.) build a `NodeRegistration` and delegate.
393#[derive(Clone, Debug)]
394pub struct NodeRegistration {
395    /// Upstream deps in declaration order. Empty for state / producer.
396    pub deps: Vec<NodeId>,
397    /// Closure-form fn id or typed-op discriminant. `None` for state /
398    /// passthrough.
399    pub fn_or_op: Option<NodeFnOrOp>,
400    /// Cross-kind config knobs.
401    pub opts: NodeOpts,
402}
403
404/// Equality mode for a node's outgoing emissions.
405///
406/// `Identity` is the default: cache vs. new handle compare is a `u64` equal —
407/// zero FFI. `Custom` invokes [`BindingBoundary::custom_equals`] every check
408/// (R1.3.2.b two-arg call when both sides are non-sentinel).
409#[derive(Copy, Clone, Debug)]
410pub enum EqualsMode {
411    Identity,
412    Custom(FnId),
413}
414
/// Internal id for one subscription, allocated on every [`Core::subscribe`]
/// call.
///
/// The public API exposes it only through [`Subscription`]. Direct use is
/// limited to Core internals and the [`Subscription::Drop`] path.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub(crate) struct SubscriptionId(u64);
421
422/// RAII subscription handle.
423///
424/// Returned by [`Core::subscribe`]. While the handle is held, the sink stays
425/// registered against its node. Dropping the handle (explicitly via
426/// `drop(sub)` or implicitly at scope exit) unsubscribes the sink — no manual
427/// `unsubscribe()` call is needed. Per §10.12 of the rust-port session doc.
428///
429/// # Lifetime semantics
430///
431/// The subscription holds a [`Weak`] reference back to the Core's state. If
432/// the Core is dropped before the subscription, the Drop impl is a silent
433/// no-op (the sink has nowhere to deregister from anyway). This avoids a
434/// reference cycle when subscribers capture an `Arc<Core>` in their closure.
435///
436/// # Thread safety
437///
438/// `Send + Sync`. The handle can be moved across threads or dropped from
439/// any thread.
440///
441/// # Not Clone
442///
443/// `Subscription` owns the unsubscribe action exclusively. Cloning would
444/// require either "first drop wins" or "last drop wins" semantics, both
445/// of which surprise. If a binding needs multiple deregistration handles,
446/// it should subscribe multiple times (each producing a fresh handle) or
447/// wrap the single `Subscription` in `Arc<Mutex<Option<Subscription>>>`.
448#[must_use = "dropping a Subscription unsubscribes its sink immediately"]
449pub struct Subscription {
450    state: Weak<Mutex<CoreState>>,
451    node_id: NodeId,
452    sub_id: SubscriptionId,
453}
454
455impl Subscription {
456    /// The node this subscription is attached to.
457    #[must_use]
458    pub fn node_id(&self) -> NodeId {
459        self.node_id
460    }
461}
462
463impl Drop for Subscription {
464    fn drop(&mut self) {
465        // Silent no-op if Core is gone. This keeps Drop infallible (no panics
466        // from a dropped subscription racing a dropped Core) and avoids
467        // surprising users with errors on shutdown.
468        //
469        // Producer deactivation (Slice D, D031): if removing this sub
470        // empties the subscribers map AND the node is a producer, fire
471        // `BindingBoundary::producer_deactivate(node_id)` AFTER releasing
472        // the state lock. The binding then drops its per-node state
473        // (subscriptions to upstream sources, captured closure state),
474        // which transitively unsubs from upstreams via their own
475        // `Subscription::Drop`. Re-entrance into Core from the deactivate
476        // hook is permitted since the lock is released first.
477        let Some(state) = self.state.upgrade() else {
478            return;
479        };
480        // Slice E2 (D056): when the last subscriber drops, fire the
481        // node's OnDeactivation cleanup hook BEFORE producer_deactivate
482        // (cleanup may release handles the producer subscription owns;
483        // reverse order would let producer_deactivate drop subs that user
484        // cleanup expected to be live). Both calls are lock-released per
485        // D045.
486        //
487        // OnDeactivation gating (D068, QA Q3 fix): fires only when the
488        // node has fired its fn at least once AND has a fn (`fn_id`
489        // populated). State nodes have no fn — they cannot register a
490        // cleanup spec via the production fn-return path (R2.4.5), so
491        // firing `cleanup_for` on them is wasted FFI; the binding's
492        // lookup is guaranteed to find no `current_cleanup`. Skipping
493        // here saves the FFI hop and matches the design-doc wording
494        // ("never-fired state nodes" — state-with-initial-value satisfies
495        // `has_fired_once = true` but still has no fn).
496        //
497        // Slice E2 /qa Q2(b) (D069): if the node is a resubscribable
498        // node that's ALREADY terminal (terminate fired BEFORE this last
499        // sub drop), fire `wipe_ctx` lock-released AFTER OnDeactivation
500        // + producer_deactivate. Mutually exclusive with `terminate_node`'s
501        // queue-wipe site: terminate-with-empty-subs goes through
502        // `pending_wipes`; terminate-with-live-subs routes here when
503        // those subs eventually drop. Either path fires exactly one
504        // wipe per terminal lifecycle.
505        let (was_last_sub, is_producer, has_user_cleanup, fire_wipe, binding) = {
506            let mut s = state.lock();
507            let Some(rec) = s.nodes.get_mut(&self.node_id) else {
508                return;
509            };
510            rec.subscribers.remove(&self.sub_id);
511            // Slice X4 / D2: bump revision so any pending_notify entry for
512            // this node opened earlier in the wave starts a fresh batch on
513            // the next queue_notify, dropping the now-departed sink from
514            // the snapshot.
515            rec.subscribers_revision = rec.subscribers_revision.wrapping_add(1);
516            let last = rec.subscribers.is_empty();
517            let producer = rec.is_producer();
518            // OnDeactivation gate: must have run a fn at least once
519            // (has_fired_once) AND have a fn registered (fn_id.is_some()).
520            // The fn_id check excludes state nodes whose has_fired_once
521            // tracks initial-value status, not "user fn ran."
522            let user_cleanup = rec.has_fired_once && rec.fn_id.is_some();
523            let fire_wipe = last && rec.resubscribable && rec.terminal.is_some();
524            // Clone the binding Arc out only if at least one hook will
525            // fire. Cheap (Arc::clone) in the common path; skipped on
526            // non-last-sub or never-fired non-producer nodes.
527            let binding = if last && (producer || user_cleanup || fire_wipe) {
528                Some(s.binding.clone())
529            } else {
530                None
531            };
532            (last, producer, user_cleanup, fire_wipe, binding)
533        };
534        if was_last_sub {
535            if let Some(binding) = binding {
536                if has_user_cleanup {
537                    binding.cleanup_for(self.node_id, CleanupTrigger::OnDeactivation);
538                }
539                if is_producer {
540                    binding.producer_deactivate(self.node_id);
541                }
542                // D069: eager wipe — fires AFTER OnDeactivation so the
543                // user closure observes pre-wipe `store` (matches the
544                // existing "OnDeactivation runs before wipe on terminal
545                // reset" invariant covered by test 10). Idempotent —
546                // `HashMap::remove` on absent key is a no-op, so even
547                // if the wave already drained `pending_wipes` earlier,
548                // this fire is benign.
549                if fire_wipe {
550                    binding.wipe_ctx(self.node_id);
551                }
552            }
553        }
554    }
555}
556
557// Compile-time assertion that Subscription is Send + Sync. If a future field
558// breaks this, the build fails here rather than downstream at the binding
559// site.
560const _: fn() = || {
561    fn assert_send_sync<T: Send + Sync>() {}
562    assert_send_sync::<Subscription>();
563};
564
565/// A subscriber callback. `Send + Sync` so the Core can fire it from any
566/// thread; `Fn` (not `FnMut`) so multiple references coexist — capture
567/// mutable state in `Mutex<T>` or atomics on the binding side.
568pub type Sink = Arc<dyn Fn(&[Message]) + Send + Sync>;
569
570// ---------------------------------------------------------------------------
571// PAUSE/RESUME state — §10.2 of the rust-port session doc
572// ---------------------------------------------------------------------------
573
574/// Per-node pause state.
575///
576/// Replaces the four TS fields (`_pauseLocks`, `_pauseBuffer`,
577/// `_pauseDroppedCount`, `_pauseStartNs`) with a single enum where
578/// the buffered fields are unreachable in the [`Self::Active`] variant —
579/// the compiler refuses access. Per §10.2 simplification.
580///
581/// # Invariants
582///
583/// - `Active` ⇔ no lockId held.
584/// - `Paused { locks, .. }` ⇔ `!locks.is_empty()`.
585/// - Buffered messages are tier 3 (DATA/RESOLVED) and tier 4 (INVALIDATE)
586///   only. Other tiers pass through immediately even while paused.
587/// - `dropped` counts messages that fell out the front of `buffer` due to
588///   the Core-global `pause_buffer_cap`; it is reported on resume so callers
589///   can detect overflow without re-tracking it externally.
#[derive(Debug)]
pub(crate) enum PauseState {
    /// Not paused: no lock holders and nothing buffered.
    Active,
    /// Paused by one or more lock holders. Entered when the first lock is
    /// added (`add_lock`); left when the last lock is removed
    /// (`remove_lock`), which hands back the buffer and dropped count.
    Paused {
        /// Active lock holders (no duplicates — `add_lock` dedups).
        /// `SmallVec` keeps the common 1–2 lock case stack-allocated.
        /// Replaces `Set<unknown>` from TS.
        locks: SmallVec<[LockId; 2]>,
        /// Buffered tier-3/tier-4 outgoing messages, in arrival order.
        /// Replayed on the final RESUME.
        buffer: VecDeque<Message>,
        /// Count of messages dropped from the front when `buffer.len()` would
        /// exceed `pause_buffer_cap`. Saturates at `u32::MAX`. Reset on
        /// final RESUME (the next pause cycle starts fresh at 0).
        dropped: u32,
        /// Monotonic-clock timestamp in ns (from `monotonic_ns()`), captured
        /// when the first lock transitioned this node from `Active` to
        /// `Paused`. Not a wall-clock time. Used by R1.3.8.c overflow ERROR
        /// synthesis to compute `lock_held_duration_ms` in the diagnostic
        /// payload (Slice F, A3 — 2026-05-07).
        started_at_ns: u64,
        /// True after the first overflow event in this pause cycle has been
        /// reported via [`crate::boundary::BindingBoundary::synthesize_pause_overflow_error`].
        /// Subsequent overflows in the same cycle don't re-emit ERROR
        /// (canonical R1.3.8.c: "once per overflow event"). Cleared on
        /// final RESUME (next pause cycle starts fresh).
        overflow_reported: bool,
        /// Default-mode bookkeeping (Slice F audit close, 2026-05-07).
        /// Set to `true` when an upstream dep delivery arrives while this
        /// node is paused with [`PausableMode::Default`]. On final RESUME,
        /// if `true`, the node is added back to `pending_fires` so the fn
        /// fires once with the consolidated dep state. Always `false` for
        /// `ResumeAll` mode (the buffered messages are the consolidation
        /// mechanism there). Cleared on final RESUME.
        pending_wave: bool,
    },
}
625
626impl PauseState {
627    pub(crate) fn is_paused(&self) -> bool {
628        matches!(self, Self::Paused { .. })
629    }
630
631    fn lock_count(&self) -> usize {
632        match self {
633            Self::Active => 0,
634            Self::Paused { locks, .. } => locks.len(),
635        }
636    }
637
638    fn contains_lock(&self, lock_id: LockId) -> bool {
639        match self {
640            Self::Active => false,
641            Self::Paused { locks, .. } => locks.contains(&lock_id),
642        }
643    }
644
645    /// Add a lock; transitions Active → Paused on first lock. Idempotent on
646    /// duplicate lock_id (matches TS convention; spec is silent on the case).
647    fn add_lock(&mut self, lock_id: LockId) {
648        match self {
649            Self::Active => {
650                let mut locks = SmallVec::new();
651                locks.push(lock_id);
652                *self = Self::Paused {
653                    locks,
654                    buffer: VecDeque::new(),
655                    dropped: 0,
656                    started_at_ns: monotonic_ns(),
657                    overflow_reported: false,
658                    pending_wave: false,
659                };
660            }
661            Self::Paused { locks, .. } => {
662                if !locks.contains(&lock_id) {
663                    locks.push(lock_id);
664                }
665            }
666        }
667    }
668
669    /// Mark that an upstream dep delivered DATA to a node paused with
670    /// [`PausableMode::Default`]. The node will re-enter `pending_fires`
671    /// on final RESUME via [`Self::take_pending_wave`].
672    pub(crate) fn mark_pending_wave(&mut self) {
673        if let Self::Paused { pending_wave, .. } = self {
674            *pending_wave = true;
675        }
676    }
677
678    /// Read and clear the `pending_wave` flag. Called from
679    /// [`Core::resume`] when transitioning Paused → Active. Returns `true`
680    /// only if the node was paused with `pending_wave` set.
681    pub(crate) fn take_pending_wave(&mut self) -> bool {
682        if let Self::Paused { pending_wave, .. } = self {
683            std::mem::replace(pending_wave, false)
684        } else {
685            false
686        }
687    }
688
689    /// Remove a lock; if the lockset becomes empty, transition Paused →
690    /// Active and return the buffered messages for replay (along with the
691    /// dropped count for diagnostics). Unknown lock_id is an idempotent
692    /// no-op (matches TS, R1.2.6 implicit).
693    fn remove_lock(&mut self, lock_id: LockId) -> Option<(VecDeque<Message>, u32)> {
694        match self {
695            Self::Active => None,
696            Self::Paused { locks, .. } => {
697                if let Some(idx) = locks.iter().position(|l| *l == lock_id) {
698                    locks.swap_remove(idx);
699                }
700                if locks.is_empty() {
701                    let prev = std::mem::replace(self, Self::Active);
702                    if let Self::Paused {
703                        buffer, dropped, ..
704                    } = prev
705                    {
706                        return Some((buffer, dropped));
707                    }
708                }
709                None
710            }
711        }
712    }
713
714    /// Append a message to the buffer; if the buffer would exceed `cap`,
715    /// pop from the front (oldest-first), increment `dropped`, and return
716    /// the dropped messages so the caller can release any payload handles
717    /// they reference. `cap` of `None` means unbounded.
718    ///
719    /// Returns [`PushBufferedResult`] carrying both the dropped messages
720    /// (for refcount release) and whether this push triggered the FIRST
721    /// overflow event in the current pause cycle (for R1.3.8.c ERROR
722    /// synthesis — the caller schedules a single ERROR per cycle).
723    ///
724    /// Note: refcount management for the message's payload handle is the
725    /// caller's responsibility — see [`Core::queue_notify`] for the
726    /// retain/release discipline. The buffer itself is just a message
727    /// container; refcounts cross the binding boundary.
728    pub(crate) fn push_buffered(&mut self, msg: Message, cap: Option<usize>) -> PushBufferedResult {
729        let mut result = PushBufferedResult::default();
730        if let Self::Paused {
731            buffer,
732            dropped,
733            overflow_reported,
734            ..
735        } = self
736        {
737            buffer.push_back(msg);
738            if let Some(c) = cap {
739                while buffer.len() > c {
740                    if let Some(dropped_msg) = buffer.pop_front() {
741                        result.dropped_msgs.push(dropped_msg);
742                    }
743                    *dropped = dropped.saturating_add(1);
744                }
745            }
746            // R1.3.8.c (Slice F, A3): flag first overflow this cycle.
747            if !result.dropped_msgs.is_empty() && !*overflow_reported {
748                *overflow_reported = true;
749                result.first_overflow_this_cycle = true;
750            }
751        }
752        result
753    }
754
755    /// Snapshot the diagnostic for an R1.3.8.c overflow ERROR synthesis.
756    /// Returns `(dropped_count, lock_held_ns)`. Caller must already know
757    /// the configured cap (it's a Core-global value, not per-PauseState).
758    pub(crate) fn overflow_diagnostic(&self) -> Option<(u32, u64)> {
759        match self {
760            Self::Active => None,
761            Self::Paused {
762                dropped,
763                started_at_ns,
764                ..
765            } => {
766                let lock_held_ns = monotonic_ns().saturating_sub(*started_at_ns);
767                Some((*dropped, lock_held_ns))
768            }
769        }
770    }
771}
772
773/// Return shape for [`PauseState::push_buffered`]. Carries both the dropped
774/// messages (for refcount release) and an "is this the first overflow this
775/// cycle" flag (for R1.3.8.c ERROR synthesis scheduling).
776#[derive(Default)]
777pub(crate) struct PushBufferedResult {
778    pub(crate) dropped_msgs: Vec<Message>,
779    pub(crate) first_overflow_this_cycle: bool,
780}
781
782/// Pending R1.3.8.c overflow ERROR synthesis entry. Recorded by
783/// [`Core::queue_notify`] when the pause buffer first overflows in a cycle;
784/// drained at wave-end after the lock-released call to
785/// [`crate::boundary::BindingBoundary::synthesize_pause_overflow_error`].
786///
787/// `configured_max` is captured at scheduling time rather than read at
788/// drain — the user could change `pause_buffer_cap` between schedule and
789/// drain, and the diagnostic reads "the cap that was in effect when the
790/// overflow happened."
791#[derive(Debug, Clone)]
792pub(crate) struct PendingPauseOverflow {
793    pub(crate) node_id: NodeId,
794    pub(crate) dropped_count: u32,
795    pub(crate) configured_max: usize,
796    pub(crate) lock_held_ns: u64,
797}
798
799/// Errors returnable by [`Core::pause`] and [`Core::resume`].
800#[derive(Error, Debug, Clone, PartialEq)]
801pub enum PauseError {
802    #[error("pause/resume: unknown node {0:?}")]
803    UnknownNode(NodeId),
804}
805
806/// Errors returnable by [`Core::up`] (canonical R1.4.1).
807#[derive(Error, Debug, Clone, PartialEq)]
808pub enum UpError {
809    /// Node id is not registered.
810    #[error("up: unknown node {0:?}")]
811    UnknownNode(NodeId),
812    /// Tier-3 (DATA / RESOLVED) and tier-5 (COMPLETE / ERROR) are
813    /// downstream-only per R1.4.1; rejected at the boundary.
814    #[error(
815        "up: tier {tier} is forbidden upstream — value (tier 3) and \
816         terminal-lifecycle (tier 5) planes are downstream-only per R1.4.1"
817    )]
818    TierForbidden { tier: u8 },
819}
820
821/// Errors returnable by [`Core::register`] and its sugar wrappers
822/// ([`Core::register_state`], [`Core::register_producer`],
823/// [`Core::register_derived`], [`Core::register_dynamic`],
824/// [`Core::register_operator`]).
825///
826/// Slice H (2026-05-07) promoted these from `assert!`/`panic!` to typed
827/// errors so that callers can recover from contract violations without
828/// process abort. Every variant corresponds to a construction-time
829/// invariant that the caller is responsible for upholding; the dispatcher
830/// rejects the registration before any reactive state is created (so
831/// there is no `Message::Error` channel through which to surface the
832/// failure — these are imperative-layer errors, not reactive ones).
833///
834/// All variants are zero-side-effect: when [`Core::register`] returns
835/// `Err`, no node has been added to the graph and any handle retains
836/// taken on the way in (e.g. operator scratch seed retains via
837/// [`BindingBoundary::retain_handle`]) have been released.
838#[derive(Error, Debug, Clone, PartialEq, Eq)]
839pub enum RegisterError {
840    /// One of the supplied dep ids is not a registered node.
841    #[error("register: unknown dep {0:?}")]
842    UnknownDep(NodeId),
843
844    /// `op` was supplied (operator node) but `deps` was empty. Operator
845    /// nodes need at least one dep — for subscription-managed combinators
846    /// with no declared deps, use [`Core::register_producer`] instead.
847    #[error(
848        "register: operator nodes require at least one dep — \
849         use register_producer for subscription-managed combinators"
850    )]
851    OperatorWithoutDeps,
852
853    /// [`NodeOpts::initial`] was set to a real handle but the registration
854    /// shape is not a state node (state nodes are `deps.is_empty() &&
855    /// fn_id.is_none() && op.is_none()`). Initial cache only makes sense
856    /// for state nodes.
857    #[error("register: NodeOpts::initial only valid for state nodes (no deps + no fn + no op)")]
858    InitialOnlyForStateNodes,
859
860    /// A supplied dep is terminal (COMPLETE / ERROR) AND not
861    /// resubscribable. Adding it would create a permanent wedge — the dep
862    /// will never re-emit, so the registered node would be stuck.
863    /// Mirrors [`SetDepsError::TerminalDep`] at registration time.
864    #[error(
865        "register: dep {0:?} is terminal and not resubscribable; \
866         mark it resubscribable before terminating, or remove it from the dep list"
867    )]
868    TerminalDep(NodeId),
869
870    /// A stateful operator ([`OperatorOp::Scan`] / [`OperatorOp::Reduce`])
871    /// was registered with `seed = NO_HANDLE`. R2.5.3 first-run gate
872    /// requires the seed to be a real handle so that the operator can
873    /// emit on its first fire.
874    #[error("register: operator seed must be a real handle (R2.5.3); got NO_HANDLE")]
875    OperatorSeedSentinel,
876}
877
878/// Errors returnable by [`Core::set_pausable_mode`].
879///
880/// Slice H (2026-05-07) promoted these from `assert!`/`panic!` to typed
881/// errors. Same imperative-layer error model as [`RegisterError`].
882#[derive(Error, Debug, Clone, PartialEq, Eq)]
883pub enum SetPausableModeError {
884    /// `node_id` is not a registered node.
885    #[error("set_pausable_mode: unknown node {0:?}")]
886    UnknownNode(NodeId),
887    /// The node currently holds at least one pause lock. Changing pausable
888    /// mode mid-pause would lose buffered content or strand a
889    /// `pending_wave` flag — resume all locks first.
890    #[error(
891        "set_pausable_mode: cannot change pausable mode while paused; \
892         resume all locks first"
893    )]
894    WhilePaused,
895}
896
897/// Per-dep record. Replaces the parallel `deps` / `dep_handles` /
898/// `dep_terminals` vectors from v1. Canonical spec R2.9.b alignment.
899///
900/// Each entry tracks one dep's lifecycle state, wave-scoped batch data,
901/// and cross-wave `prev_data` for `ctx.prevData` access.
902pub(crate) struct DepRecord {
903    /// The dep node this record tracks.
904    pub(crate) node: NodeId,
905    /// Last DATA handle from the end of the previous wave. [`NO_HANDLE`]
906    /// means the dep has never emitted DATA.
907    pub(crate) prev_data: HandleId,
908    /// Per-dep dirty flag — awaiting DATA/RESOLVED for current wave.
909    pub(crate) dirty: bool,
910    /// Per-dep involved-this-wave flag. Distinguishes:
911    /// - `involved && data_batch.is_empty()` → dep settled RESOLVED
912    /// - `!involved && data_batch.is_empty()` → dep was not in this wave
913    pub(crate) involved_this_wave: bool,
914    /// DATA handles accumulated this wave. Outside `batch()` scope, at most
915    /// 1 element. Inside `batch()`, K emits on the source produce K entries
916    /// per R1.3.6.b coalescing. Each handle holds a `retain_handle` share
917    /// taken at `deliver_data_to_consumer` time; released at wave-end
918    /// rotation in `clear_wave_state`.
919    pub(crate) data_batch: SmallVec<[HandleId; 1]>,
920    /// Terminal state for this dep. `None` = dep is live.
921    /// `Some` = dep emitted COMPLETE/ERROR. When ALL entries are Some,
922    /// the node auto-cascades per Lock 2.B (ERROR dominates COMPLETE).
923    pub(crate) terminal: Option<TerminalKind>,
924}
925
926impl DepRecord {
927    fn new(node: NodeId) -> Self {
928        Self {
929            node,
930            prev_data: NO_HANDLE,
931            dirty: false,
932            involved_this_wave: false,
933            data_batch: SmallVec::new(),
934            terminal: None,
935        }
936    }
937}
938
939/// Internal node record. Mirrors `core.ts:132–154` post-D030 unification.
940///
941/// **Kind is derived, not stored** (D030, Slice D). `(dep_records.is_empty(),
942/// fn_id, op, is_dynamic)` uniquely identifies the kind — see [`NodeKind`].
943/// Helper methods (`is_state()`, `is_producer()`, `is_compute()`,
944/// `is_operator()`, `skips_auto_cascade()`, `kind()`) cover the common
945/// predicates without unpacking via [`Core::kind_of`].
946///
947/// The 5 bool fields (`has_fired_once`, `dirty`, `involved_this_wave`,
948/// `has_received_teardown`, `resubscribable`, `is_dynamic`) each represent
949/// an orthogonal concern. `is_dynamic` is constant per node (set at
950/// register time); the others are mutable lifecycle state. Collapsing
951/// them into a bitfield would obscure intent.
952#[allow(clippy::struct_excessive_bools)]
953pub(crate) struct NodeRecord {
954    /// Per-dep records. Replaces the old parallel `deps` / `dep_handles` /
955    /// `dep_terminals` vecs. Dep NodeIds derived via `dep_ids()`.
956    pub(crate) dep_records: Vec<DepRecord>,
957    /// User-fn id for closure-form dispatch. `Some` for Derived / Dynamic /
958    /// Producer; `None` for State / Operator. (Operator dispatch goes via
959    /// [`Self::op`] instead.)
960    pub(crate) fn_id: Option<FnId>,
961    /// Operator discriminant for typed-op dispatch. `Some` for Operator
962    /// nodes; `None` otherwise. Mutually exclusive with `fn_id` (a node is
963    /// either closure-form OR typed-op, never both).
964    pub(crate) op: Option<OperatorOp>,
965    /// True for Dynamic nodes (R2.5.3 — fn declares actually-tracked dep
966    /// indices per fire). False for everything else. Only meaningful when
967    /// `fn_id.is_some()` AND `!dep_records.is_empty()`.
968    pub(crate) is_dynamic: bool,
969    pub(crate) equals: EqualsMode,
970
971    // Mutable state
972    pub(crate) cache: HandleId,
973    pub(crate) has_fired_once: bool,
974    pub(crate) subscribers: HashMap<SubscriptionId, Sink>,
975    /// Monotonic counter bumped on every mutation of [`Self::subscribers`]
976    /// (insert on subscribe, remove on `Subscription::Drop`, remove on
977    /// handshake-panic cleanup). Used by
978    /// [`crate::batch::Core::queue_notify`] to detect mid-wave subscriber-
979    /// set changes and start a fresh `PendingBatch` with an updated sink
980    /// snapshot — closes D2 (Slice X4, 2026-05-08): the late-subscriber
981    /// and multi-emit-per-wave gap where the pre-fix per-node single
982    /// snapshot meant a sub installed between two emits to the same node
983    /// in one wave was invisible to the second emit's flush.
984    ///
985    /// Per-node (not per-Core) so that a subscribe to node A doesn't
986    /// invalidate snapshot reuse for node B's pending batch in the same
987    /// wave.
988    pub(crate) subscribers_revision: u64,
989    /// For dynamic nodes: which dep indices fn actually tracks.
990    /// For static derived: all indices, populated at construction.
991    pub(crate) tracked: HashSet<usize>,
992
993    // Wave-scoped state — cleared at wave end.
994    pub(crate) dirty: bool,
995    pub(crate) involved_this_wave: bool,
996
997    /// Per-node pause state. Default `Active`. See [`PauseState`].
998    pub(crate) pause_state: PauseState,
999    /// Pause behavior mode (canonical-spec §2.6). Set at registration via
1000    /// [`NodeOpts::pausable`]. Default [`PausableMode::Default`] suppresses
1001    /// fn-fire while paused and consolidates N pause-window dep deliveries
1002    /// into one fn-fire on RESUME; `ResumeAll` buffers tier-3/4 outgoing
1003    /// for verbatim replay; `Off` ignores PAUSE entirely. See
1004    /// [`PausableMode`].
1005    pub(crate) pausable: PausableMode,
1006    /// Replay buffer cap (R2.6.5 / Lock 6.G — Slice E1, 2026-05-07).
1007    /// `None` disables; `Some(N)` keeps a circular VecDeque of the last N
1008    /// DATA-handle emissions for late-subscriber replay. Each handle in
1009    /// the buffer owns one binding-side retain share, released on evict
1010    /// (cap exceeded) or in `Drop for CoreState`.
1011    pub(crate) replay_buffer_cap: Option<usize>,
1012    pub(crate) replay_buffer: VecDeque<HandleId>,
1013
1014    /// Terminal lifecycle state for THIS node's outgoing stream. Once set,
1015    /// further `emit` calls are silent no-ops, fn no longer fires, and only
1016    /// the terminal message has been queued downstream.
1017    pub(crate) terminal: Option<TerminalKind>,
1018    /// True after the first TEARDOWN has been processed for this node
1019    /// (R2.6.4 / Lock 6.F). Subsequent TEARDOWN deliveries are idempotent
1020    /// — the auto-prepended COMPLETE only fires on the first one. Without
1021    /// this flag, a redundant TEARDOWN delivered via the cascade plus an
1022    /// explicit `core.teardown(node)` would re-emit `[COMPLETE, TEARDOWN]`
1023    /// to subscribers per delivery, which is incorrect.
1024    pub(crate) has_received_teardown: bool,
1025    /// Per R2.2.7 / R2.5.3 — resubscribable terminal lifecycle.
1026    /// When `true` AND `terminal == Some(...)`, a fresh subscribe call
1027    /// will reset the node: clear `terminal`, `has_fired_once`,
1028    /// `has_received_teardown`, all dep_records to sentinel, and drain the
1029    /// pause lockset. Default `false`.
1030    pub(crate) resubscribable: bool,
1031    /// Meta companion nodes attached to this node per R1.3.9.d. When this
1032    /// node tears down, its meta companions are torn down FIRST (before
1033    /// the main node's auto-COMPLETE + TEARDOWN wire emission), so
1034    /// observers see companions terminate before the parent. The ordering
1035    /// is load-bearing — meta nodes typically subscribe to parent state
1036    /// that becomes inconsistent during the parent's destruction phase.
1037    pub(crate) meta_companions: Vec<NodeId>,
1038    /// R5.4 / D011 partial-mode: when `true`, fire_fn skips the R2.5.3
1039    /// first-run gate — the node fires as soon as ANY dep delivers a
1040    /// real handle, even if other deps remain sentinel. Defaults to
1041    /// `false` (gated). Lifted into Core for operator support; for
1042    /// State/Derived/Dynamic nodes the field is settable but the gated
1043    /// path remains the typical caller default.
1044    pub(crate) partial: bool,
1045    /// Generic per-operator scratch slot (Slice C-3, D026). Replaces
1046    /// the typed `operator_state: HandleId` field used by Slices C-1 / C-2.
1047    /// `None` for non-operator kinds and operators with no cross-wave
1048    /// state (Map / Filter / Combine / WithLatestFrom / Merge); `Some`
1049    /// for stateful operators ([`OperatorOp::Scan`] / [`Reduce`] /
1050    /// [`DistinctUntilChanged`] / [`Pairwise`] / [`Take`] / [`Skip`] /
1051    /// [`TakeWhile`] / [`Last`]).
1052    ///
1053    /// The boxed value implements
1054    /// [`OperatorScratch`](crate::op_state::OperatorScratch); its
1055    /// `release_handles` method is called from
1056    /// [`reset_for_fresh_lifecycle`] (resubscribable terminal cycle) and
1057    /// from [`Drop for CoreState`].
1058    ///
1059    /// **Refcount discipline:** the state struct owns whatever handle
1060    /// shares it stores (e.g., [`ScanState::acc`](crate::op_state::ScanState::acc),
1061    /// [`LastState::latest`](crate::op_state::LastState::latest)).
1062    /// Per-fire helpers retain the new value before releasing the old;
1063    /// `release_handles` releases the current shares at end-of-life.
1064    pub(crate) op_scratch: Option<Box<dyn crate::op_state::OperatorScratch>>,
1065}
1066
1067impl NodeRecord {
1068    // ---- Kind predicates (D030 — derived from field shape) ----
1069
1070    /// True iff this is a state node (no deps, no fn, no op).
1071    pub(crate) fn is_state(&self) -> bool {
1072        self.dep_records.is_empty() && self.fn_id.is_none() && self.op.is_none()
1073    }
1074
1075    /// True iff this is a producer node (no deps + has fn + no op).
1076    /// Producers fire fn once on first subscribe; cleanup fires via
1077    /// [`BindingBoundary::producer_deactivate`] (D031, Slice D).
1078    pub(crate) fn is_producer(&self) -> bool {
1079        self.dep_records.is_empty() && self.fn_id.is_some() && self.op.is_none()
1080    }
1081
1082    /// True iff this is a compute node (Derived / Dynamic / Operator) —
1083    /// has at least one dep AND either a fn or an op.
1084    #[allow(dead_code)] // Convenience predicate; callers may use is_state/is_producer instead.
1085    pub(crate) fn is_compute(&self) -> bool {
1086        !self.dep_records.is_empty() && (self.fn_id.is_some() || self.op.is_some())
1087    }
1088
1089    /// True iff this is an Operator node (has op set).
1090    #[allow(dead_code)] // Direct `op.is_some()` is more common; this is a readability sugar.
1091    pub(crate) fn is_operator(&self) -> bool {
1092        self.op.is_some()
1093    }
1094
1095    /// True iff this node opts OUT of Lock 2.B auto-cascade —
1096    /// Operator(Reduce) / Operator(Last) intercept upstream COMPLETE.
1097    pub(crate) fn skips_auto_cascade(&self) -> bool {
1098        match self.op {
1099            Some(op) => NodeKind::Operator(op).skips_auto_cascade(),
1100            None => false,
1101        }
1102    }
1103
1104    /// Compute the public-API [`NodeKind`] from the field shape (D030).
1105    /// Used by [`Core::kind_of`] and rare internal sites that need the
1106    /// enum (most use the predicate methods above).
1107    pub(crate) fn kind(&self) -> NodeKind {
1108        if let Some(op) = self.op {
1109            NodeKind::Operator(op)
1110        } else if self.dep_records.is_empty() {
1111            if self.fn_id.is_some() {
1112                NodeKind::Producer
1113            } else {
1114                NodeKind::State
1115            }
1116        } else if self.is_dynamic {
1117            NodeKind::Dynamic
1118        } else {
1119            NodeKind::Derived
1120        }
1121    }
1122
1123    // ---- Existing accessors ----
1124
1125    /// Iterator over dep NodeIds in declaration order.
1126    pub(crate) fn dep_ids(&self) -> impl Iterator<Item = NodeId> + '_ {
1127        self.dep_records.iter().map(|r| r.node)
1128    }
1129
1130    /// Collected dep NodeIds — for call sites that need a `Vec<NodeId>`.
1131    pub(crate) fn dep_ids_vec(&self) -> Vec<NodeId> {
1132        self.dep_ids().collect()
1133    }
1134
1135    /// Number of deps.
1136    pub(crate) fn dep_count(&self) -> usize {
1137        self.dep_records.len()
1138    }
1139
1140    /// True if any dep is in sentinel state (never emitted DATA and no
1141    /// data this wave). Replaces the old `dep_handles.contains(&NO_HANDLE)`.
1142    pub(crate) fn has_sentinel_deps(&self) -> bool {
1143        self.dep_records
1144            .iter()
1145            .any(|r| r.prev_data == NO_HANDLE && r.data_batch.is_empty())
1146    }
1147
1148    /// Find the index of a dep by NodeId.
1149    pub(crate) fn dep_index_of(&self, dep_id: NodeId) -> Option<usize> {
1150        self.dep_records.iter().position(|r| r.node == dep_id)
1151    }
1152
1153    /// True if ALL dep terminal slots are populated (Lock 2.B cascade check).
1154    pub(crate) fn all_deps_terminal(&self) -> bool {
1155        !self.dep_records.is_empty() && self.dep_records.iter().all(|r| r.terminal.is_some())
1156    }
1157}
1158
1159/// All mutable Core state, behind one [`parking_lot::Mutex`].
1160///
1161/// v1 single-mutex; per-subgraph `ReentrantMutex` parallelism is a later
1162/// optimization (CLAUDE.md Rust invariant 3).
1163pub(crate) struct CoreState {
1164    pub(crate) next_node_id: u64,
1165    pub(crate) next_subscription_id: u64,
1166    pub(crate) next_lock_id: u64,
1167    pub(crate) nodes: HashMap<NodeId, NodeRecord>,
1168    /// Inverted adjacency: `parent → children`. Updated on registration.
1169    pub(crate) children: HashMap<NodeId, HashSet<NodeId>>,
1170    /// Nodes whose fn we owe a fire to — drained by [`Core::run_wave`].
1171    pub(crate) pending_fires: HashSet<NodeId>,
1172    /// Per-node outgoing message buffer; flushed at wave end. Insertion-
1173    /// ordered so flush order is deterministic — load-bearing for
1174    /// R1.3.9.d meta-TEARDOWN ordering: when a parent and its meta
1175    /// companion both have queued messages in the same wave, the meta
1176    /// (queued first via `teardown_inner`'s recursion order) flushes
1177    /// first.
1178    ///
1179    /// Each entry carries the per-wave subscriber snapshot taken at first
1180    /// touch (Slice A close, M1: lock-released drain). Late subscribers
1181    /// installed mid-wave between fn-fire iterations don't appear in
1182    /// already-snapshotted entries; this is the load-bearing fix that
1183    /// prevents duplicate-Data delivery when a handshake delivers the
1184    /// post-commit cache and the wave's flush would otherwise also fire
1185    /// to the same sink.
1186    pub(crate) pending_notify: IndexMap<NodeId, PendingPerNode>,
1187    pub(crate) in_tick: bool,
1188    /// Core-global cap on per-node pause replay buffer length. `None` means
1189    /// unbounded. Per the user direction (Q1, 2026-05-05): start core-global;
1190    /// per-node override can be added later as a pure addition without API
1191    /// breakage. Default `None`.
1192    pub(crate) pause_buffer_cap: Option<usize>,
1193    /// Core-global cap on wave-drain iterations before
1194    /// [`crate::batch::Core::drain_and_flush`] aborts with a diagnostic panic.
1195    /// Replaces the prior `MAX_DRAIN_ITERATIONS` hard-coded constant
1196    /// (R4.3 / Lock 2.F′). Default `10_000`.
1197    ///
1198    /// The drain loop bound exists to surface runtime cycles
1199    /// (e.g. an operator that re-arms its own `pending_fires` slot during
1200    /// `invoke_fn`) as a panic with context, rather than letting Core
1201    /// spin forever. Structural cycles via [`Core::set_deps`] are
1202    /// rejected at edge-mutation time (`SetDepsError::WouldCreateCycle`);
1203    /// registration is structurally cycle-safe by construction (the new
1204    /// node's id is not allocated until AFTER deps are validated, so deps
1205    /// cannot transitively reach the new node). The drain bound is the
1206    /// safety net for runtime cycles that bypass both static checks.
1207    pub(crate) max_batch_drain_iterations: u32,
1208    /// Deferred sink-fire jobs collected by `flush_notifications`. The wave
1209    /// engine populates this under the state lock during the flush phase;
1210    /// `run_wave` then drops the lock and fires the jobs. Each tuple is
1211    /// `(sinks_for_one_node_one_phase, phase_messages)`. Empty between waves.
1212    pub(crate) deferred_flush_jobs: crate::batch::DeferredJobs,
1213    /// Payload-handle releases owed for messages that landed in
1214    /// `pending_notify` during this wave (one per `payload_handle()`).
1215    /// `run_wave` releases these after sinks fire and the lock is dropped,
1216    /// balancing the retain done in `queue_notify`.
1217    pub(crate) deferred_handle_releases: Vec<HandleId>,
1218    /// Binding-boundary handle for `Drop`-time refcount balancing.
1219    /// `Core` also holds a clone of this Arc; storing it here lets
1220    /// `Drop for CoreState` walk every retained slot and release the
1221    /// binding-side share when the last `Core` clone drops. Without this,
1222    /// `cache` / `terminal` / `dep_terminals` Error / pause-buffer payload
1223    /// handle refs leak in the binding registry until process exit.
1224    pub(crate) binding: Arc<dyn BindingBoundary>,
1225    /// Pre-wave cache snapshots used to restore state if the wave aborts
1226    /// mid-flight (e.g., a `Core::batch` closure panics). Each entry is
1227    /// `(node_id → old_cache_handle)` — the handle the node held BEFORE
1228    /// the wave started writing to it. The snapshotted handle holds a
1229    /// retain (taken when the snapshot was inserted) so it stays alive
1230    /// for restoration. On wave success, snapshots are dropped and their
1231    /// retains released. On wave abort (`BatchGuard::drop` panic-discard
1232    /// path), each cache slot is restored from the snapshot — the slot's
1233    /// current handle is released, and the snapshot's retain transfers
1234    /// to the cache slot. Only populated for in-flight waves; empty
1235    /// between waves.
1236    pub(crate) wave_cache_snapshots: HashMap<NodeId, HandleId>,
1237    /// Nodes that need an auto-Resolved at wave end if they don't receive
1238    /// a tier-3+ message from their own commit_emission. Populated by
1239    /// the RESOLVED child propagation in `commit_emission` (which queues
1240    /// Dirty but defers Resolved to avoid double-settlement). Drained by
1241    /// the auto-resolve sweep in `drain_and_flush`. Cleared by
1242    /// `clear_wave_state`.
1243    pub(crate) pending_auto_resolve: ahash::AHashSet<NodeId>,
1244    /// Topology-change sinks. Keyed by subscription id for O(1) removal.
1245    pub(crate) topology_sinks: HashMap<u64, crate::topology::TopologySink>,
1246    pub(crate) next_topology_id: u64,
1247    /// A6 reentrancy guard (Slice F, 2026-05-07): the stack of NodeIds whose
1248    /// fn is currently being invoked on the wave-owner thread. Pushed at the
1249    /// top of `fire_fn` (just before the lock-released `invoke_fn` call) and
1250    /// popped on return / unwind via the [`crate::batch::FiringGuard`] RAII
1251    /// helper. [`Core::set_deps`] consults this set and rejects with
1252    /// [`SetDepsError::ReentrantOnFiringNode`] if `n` is currently firing —
1253    /// preventing the D1 `tracked` index corruption (see
1254    /// `porting-deferred.md` "Set_deps from inside firing node's fn corrupts
1255    /// Dynamic `tracked` indices").
1256    ///
1257    /// Stack rather than set so nested fn re-entrance (Producer subscribing
1258    /// to a fn that itself fires another fn) tracks every concurrently-firing
1259    /// node on the wave-owner. `Vec` rather than `HashSet` because the
1260    /// expected depth is small (typically 1, occasionally 2–3 with
1261    /// higher-order operators) and linear scan is faster than hash for that
1262    /// size.
1263    pub(crate) currently_firing: Vec<NodeId>,
1264    /// R1.3.8.c pause-overflow ERROR synthesis queue (Slice F, A3 —
1265    /// 2026-05-07). Recorded by [`Core::queue_notify`] when the pause
1266    /// buffer first overflows in a cycle; drained at wave-end after the
1267    /// lock-released call to
1268    /// [`crate::boundary::BindingBoundary::synthesize_pause_overflow_error`].
1269    ///
1270    /// One entry per (node × pause-cycle); subsequent overflows in the
1271    /// same cycle don't re-queue (gated by `PauseState::overflow_reported`).
1272    pub(crate) pending_pause_overflow: Vec<PendingPauseOverflow>,
1273    /// Slice G (R1.3.2.d / R1.3.3.a — 2026-05-07): nodes that have emitted
1274    /// at least one tier-3 message (Data or Resolved) in the CURRENT wave.
1275    /// Wave-scoped (cleared in `clear_wave_state`). Used by
1276    /// [`crate::batch::Core::commit_emission`] to detect "this is a
1277    /// subsequent emit at this node in the same wave" — when set,
1278    /// equals substitution is skipped (would produce a R1.3.3.a-violating
1279    /// mixed wave) and any prior Resolved entries in pending_notify or
1280    /// the pause buffer are rewritten to Data using the wave-start cache
1281    /// snapshot.
1282    ///
1283    /// Distinct from `pending_pause_overflow` (per-pause-cycle, not
1284    /// per-wave) and `wave_cache_snapshots` (per-wave snapshot, but only
1285    /// populated on Data path pre-Slice-G). Populated by both Data and
1286    /// Resolved branches of `commit_emission`; NOT populated by
1287    /// `commit_emission_verbatim` (Batch path passes through verbatim
1288    /// per R1.3.3.c).
1289    pub(crate) tier3_emitted_this_wave: ahash::AHashSet<NodeId>,
1290    /// Slice E2 (R1.3.9.b strict reading per D057): per-wave-per-node
1291    /// dedup for `OnInvalidate` cleanup hook firing. A node already in
1292    /// this set this wave has already had its `OnInvalidate` queued into
1293    /// `deferred_cleanup_hooks` and MUST NOT queue again, even if
1294    /// `invalidate_inner` re-encounters it (rare: only matters when the
1295    /// node re-populates mid-wave via fn-fire and then gets re-invalidated
1296    /// in the same wave through a separate path).
1297    ///
1298    /// Cleared in [`CoreState::clear_wave_state`] alongside the other
1299    /// wave-scoped queues.
1300    pub(crate) invalidate_hooks_fired_this_wave: ahash::AHashSet<NodeId>,
1301    /// Slice E2 (per D060/D061): lock-released drain queue for
1302    /// `OnInvalidate` cleanup hooks. Populated under the state lock by
1303    /// `Core::invalidate_inner` when a node's cache transitions
1304    /// `!= NO_HANDLE → NO_HANDLE`; drained after the lock drops at wave
1305    /// boundary by [`crate::batch::Core::fire_deferred_cleanup_hooks`]
1306    /// (each call wrapped in `catch_unwind` so a single binding panic
1307    /// doesn't short-circuit the drain — last panic re-raises after the
1308    /// loop completes per D060).
1309    ///
1310    /// **Panic-discard semantics (D061):** cleared in
1311    /// [`CoreState::clear_wave_state`] without firing — a panic-discarded
1312    /// wave drops the queued cleanup hooks silently, mirroring the
1313    /// `pending_pause_overflow` precedent (Slice F /qa A3). Bindings using
1314    /// `OnInvalidate` for external-resource cleanup MUST idempotent-cleanup
1315    /// at process exit or next successful invalidate cycle.
1316    pub(crate) deferred_cleanup_hooks: Vec<(NodeId, crate::boundary::CleanupTrigger)>,
1317    /// Slice E2 /qa Q2(b) (D069): lock-released drain queue for
1318    /// `BindingBoundary::wipe_ctx` calls fired eagerly from
1319    /// `Core::terminate_node` when a resubscribable node terminates with
1320    /// no live subscribers. Pairs with the `Subscription::Drop` direct-
1321    /// fire site (mutually exclusive: subs-empty-at-terminate routes
1322    /// here; subs-non-empty-at-terminate fires from Subscription::Drop's
1323    /// last-sub-drop path). Drained alongside `deferred_cleanup_hooks`
1324    /// at wave boundary; same `catch_unwind` discipline so a single
1325    /// binding panic doesn't short-circuit the drain. Same panic-discard
1326    /// semantics as `deferred_cleanup_hooks` (silent drop on
1327    /// panic-discarded waves).
1328    pub(crate) pending_wipes: Vec<NodeId>,
1329}
1330
/// The handle-protocol Core dispatcher.
///
/// Holds an [`Arc`] to the [`BindingBoundary`] and all dispatch state. Cheap
/// to clone: every field is an `Arc` (dispatch state, binding, subgraph
/// registry), so a clone shares the same dispatcher instead of copying it —
/// pass `Core` by value to threads. [`Core::same_dispatcher`] tells whether
/// two handles share state; [`Core::weak_handle`] yields a non-owning
/// [`WeakCore`].
///
/// # Wave-owner re-entrant mutex (Slice A close /qa, M1)
///
/// The state lock (`state: Mutex<CoreState>`) is **dropped** around binding
/// callbacks (`invoke_fn`, `custom_equals`) so user fns may re-enter Core.
/// To preserve serializability of WAVE EXECUTION across threads — without
/// re-introducing the lock-held-during-fn-fire deadlock the Slice A close
/// refactor lifted — the wave engine holds a `wave_owner`
/// ([`parking_lot::ReentrantMutex`]) for the lifetime of each wave.
///
/// Since Slice Y1 / Phase E there is no single Core-level `wave_owner`:
/// each connected-component partition tracked by the `registry` field owns
/// its own, acquired through `Core::partition_wave_owner_lock_arc` (see the
/// `registry` field docs).
///
/// Properties:
///
/// - **Same-thread re-entrance is free.** A user fn that calls back into
///   `Core::emit` / `Core::pause` / etc. mid-fire re-acquires `wave_owner`
///   on the same thread and runs as a nested wave (the inner `run_wave`
///   sees `in_tick=true` and skips drain — outer drain picks up).
/// - **Cross-thread emits into an owned partition BLOCK** on that
///   partition's `wave_owner` until the in-flight wave completes (drain +
///   flush + sink fire all done). This serializes wave OWNERSHIP across
///   threads, while still allowing the state lock to drop inside the wave
///   for binding callbacks.
/// - **Disjoint partitions run in parallel** with respect to `wave_owner`
///   (they still share the single `state` lock).
///
/// Without this, Slice A close's lock-released drain let cross-thread
/// emits absorb into the in-flight wave's `pending_notify` and return
/// before subscribers fire — breaking the user-facing happens-after
/// contract that `emit` returning means subscribers have observed.
#[derive(Clone)]
pub struct Core {
    /// All dispatch state (nodes, edges, wave-scoped queues). Acquired via
    /// `Core::lock_state`; dropped around binding callbacks (see above).
    pub(crate) state: Arc<Mutex<CoreState>>,
    /// Binding-side callbacks (fn invocation, equals, handle release). The
    /// same `Arc` is also stored in `CoreState::binding` by `Core::new`.
    pub(crate) binding: Arc<dyn BindingBoundary>,
    /// Slice X5 (D3 substrate, 2026-05-08) + Slice Y1 / Phase E
    /// (wave-engine migration, 2026-05-08): per-subgraph union-find
    /// registry. Tracks each registered node's connected-component
    /// membership (a "subgraph"). Each component's root carries an
    /// `Arc<SubgraphLockBox>` whose `wave_owner: ReentrantMutex<()>`
    /// is the per-partition wave-serialization lock — acquired by
    /// [`Self::partition_wave_owner_lock_arc`] under the retry-validate
    /// loop. Cross-thread emits to disjoint partitions run truly
    /// parallel; same-thread re-entry passes through reentrantly.
    ///
    /// Direct port of [`graphrefly-py`'s
    /// `subgraph_locks.py`](https://github.com/graphrefly/graphrefly-py/blob/main/src/graphrefly/core/subgraph_locks.py)
    /// design (locked in [`SESSION-rust-port-d3-per-subgraph-parallelism.md`](https://github.com/graphrefly/graphrefly-ts/blob/main/archive/docs/SESSION-rust-port-d3-per-subgraph-parallelism.md)).
    pub(crate) registry: Arc<parking_lot::Mutex<crate::subgraph::SubgraphRegistry>>,
}
1380
/// Weak handle to a [`Core`] — does not contribute to strong refcount.
///
/// Constructed via [`Core::weak_handle`]; upgraded back to a strong
/// [`Core`] via [`WeakCore::upgrade`]. Used by long-lived binding-stored
/// closures (notably `ProducerBuildFn`s registered via
/// [`graphrefly_operators::ProducerBinding::register_producer_build`])
/// to break the BenchBinding → registry → closure → strong-Core cycle
/// that would otherwise leak the entire graph state when a `BenchCore`
/// drops with active producer registrations.
///
/// Upgrade on each invocation; if the host `Core` was already dropped,
/// `upgrade()` returns `None` and the closure should no-op (the host
/// is being torn down, no work to do).
///
/// Holds one `Weak` per `Core` field. `upgrade()` succeeds only while all
/// three still have a strong owner. Cloning a `WeakCore` copies only the
/// weak pointers and never keeps the host alive.
#[derive(Clone)]
pub struct WeakCore {
    /// Weak counterpart of `Core::state`.
    state: Weak<Mutex<CoreState>>,
    /// Weak counterpart of `Core::binding`.
    binding: Weak<dyn BindingBoundary>,
    /// Weak counterpart of `Core::registry`.
    registry: Weak<parking_lot::Mutex<crate::subgraph::SubgraphRegistry>>,
}
1400
1401impl WeakCore {
1402    /// Try to upgrade back to a strong [`Core`]. Returns `None` if the
1403    /// host `Core`'s strong count has reached zero (i.e. the host
1404    /// `BenchCore` / equivalent owner was dropped).
1405    #[must_use]
1406    pub fn upgrade(&self) -> Option<Core> {
1407        Some(Core {
1408            state: self.state.upgrade()?,
1409            binding: self.binding.upgrade()?,
1410            registry: self.registry.upgrade()?,
1411        })
1412    }
1413}
1414
/// RAII guard that owns an [`OperatorScratch`] until either (a) the
/// caller `take()`s it for installation, or (b) the guard drops on an
/// early return / unwind, in which case the scratch's handle retains
/// are released via [`OperatorScratch::release_handles`].
///
/// Slice H /qa F1 + F2 (2026-05-07): closes two related correctness
/// gaps in `Core::register`:
///
/// 1. **TOCTOU window** — the original three-phase split called
///    `lock_state()` twice (once for validation, once for insertion),
///    so a concurrent `Core::complete(dep)` on a non-resubscribable
///    dep could slip in between the two acquisitions and re-create
///    the wedge `RegisterError::TerminalDep` was designed to prevent.
///    The guard plus a single locked region for both phases closes
///    this gap. The release runs lock-released because local variables
///    drop in reverse declaration order: the guard is declared BEFORE
///    `lock_state()`, so the lock guard drops first.
///
/// 2. **Panic-unsafe scratch leak** — without an RAII drop, a panic
///    between `make_op_scratch` (Phase 2) and the explicit
///    `if let Err(e)` cleanup branch (e.g., `lock_state()` reentrance
///    assert, OOM-as-panic on Vec growth in dep iteration) would
///    drop the `Box<dyn OperatorScratch>` without releasing the
///    seed/default retain. The guard's `Drop` impl releases on any
///    unwind path.
///
/// Lock-discipline: the guard holds `&dyn BindingBoundary` (through
/// the `Arc<dyn BindingBoundary>` it borrows from). On `Drop`, it
/// invokes `release_handles` lock-released — fires AFTER any
/// `MutexGuard<CoreState>` declared later in the same scope drops
/// (LIFO destruction order). Mirrors `Core::resume` Phase 2 release
/// pattern.
struct ScratchReleaseGuard<'a> {
    /// `Some` while the guard is armed; `None` once `take()` has handed
    /// the scratch out (or `Drop` has released it).
    scratch: Option<Box<dyn crate::op_state::OperatorScratch>>,
    /// Binding passed to `release_handles` when an armed guard drops.
    binding: &'a dyn BindingBoundary,
}
1451
1452impl<'a> ScratchReleaseGuard<'a> {
1453    fn new(
1454        scratch: Option<Box<dyn crate::op_state::OperatorScratch>>,
1455        binding: &'a dyn BindingBoundary,
1456    ) -> Self {
1457        Self { scratch, binding }
1458    }
1459
1460    /// Take ownership of the scratch — disarms the release-on-drop
1461    /// behavior. Used on the success path to install the scratch on
1462    /// `NodeRecord.op_scratch`.
1463    fn take(mut self) -> Option<Box<dyn crate::op_state::OperatorScratch>> {
1464        self.scratch.take()
1465    }
1466}
1467
1468impl Drop for ScratchReleaseGuard<'_> {
1469    fn drop(&mut self) {
1470        if let Some(mut scratch) = self.scratch.take() {
1471            scratch.release_handles(self.binding);
1472        }
1473    }
1474}
1475
impl Core {
    /// Construct a fresh Core wired to the given binding. Pause buffer cap
    /// defaults to unbounded; set via [`Self::set_pause_buffer_cap`].
    ///
    /// The wave-drain iteration cap starts at `10_000` (see
    /// [`Self::set_max_batch_drain_iterations`]). Node, subscription and
    /// topology ids start at 1. The subgraph registry starts as a freshly
    /// constructed `SubgraphRegistry`.
    #[must_use]
    pub fn new(binding: Arc<dyn BindingBoundary>) -> Self {
        Self {
            state: Arc::new(Mutex::new(CoreState {
                next_node_id: 1,
                next_subscription_id: 1,
                // A4 (Slice F, 2026-05-07): start `next_lock_id` in the high
                // half of the u32 range so `alloc_lock_id` can't collide with
                // user-supplied `LockId::new(N)` constructors (which the
                // napi-rs binding marshals from `u32` and tests typically use
                // in the low range, 1..1024). Phase E /qa F1 (2026-05-08):
                // lowered from `1u64 << 32` to `1u64 << 31` so the value
                // round-trips through `u32::try_from(...)` at the napi
                // boundary — the previous seed errored every napi
                // `alloc_lock_id` call. Anti-collision intent (high range vs
                // low user range) preserved at half the prior ceiling
                // (2^31 ≈ 2 billion allocations per Core, ample for parity
                // tests). Lift the floor when the deferred BigInt-narrowing
                // migration extends `LockId` to `u64` at the FFI layer
                // (porting-deferred "BigInt migration for u32-narrowed napi
                // types" entry).
                next_lock_id: 1u64 << 31,
                nodes: HashMap::new(),
                children: HashMap::new(),
                pending_fires: HashSet::new(),
                pending_notify: IndexMap::new(),
                in_tick: false,
                pause_buffer_cap: None,
                max_batch_drain_iterations: 10_000,
                deferred_flush_jobs: Vec::new(),
                deferred_handle_releases: Vec::new(),
                // Same `Arc` as `Core::binding` below (cloned, not a copy).
                binding: binding.clone(),
                wave_cache_snapshots: HashMap::new(),
                pending_auto_resolve: ahash::AHashSet::new(),
                topology_sinks: HashMap::new(),
                next_topology_id: 1,
                currently_firing: Vec::new(),
                pending_pause_overflow: Vec::new(),
                tier3_emitted_this_wave: ahash::AHashSet::new(),
                invalidate_hooks_fired_this_wave: ahash::AHashSet::new(),
                deferred_cleanup_hooks: Vec::new(),
                pending_wipes: Vec::new(),
            })),
            binding,
            registry: Arc::new(parking_lot::Mutex::new(
                crate::subgraph::SubgraphRegistry::new(),
            )),
        }
    }

    /// Acquire the state lock.
    ///
    /// Post-Slice-E: `Core::subscribe` fires the per-tier handshake
    /// LOCK-RELEASED with `wave_owner` held; sink callbacks may freely
    /// re-enter Core (`emit` / `complete` / `error` / nested `subscribe`).
    /// Same-thread re-entry passes through `wave_owner`'s `ReentrantMutex`
    /// transparently; cross-thread emits block on `wave_owner` until the
    /// outer subscribe completes, preserving R1.3.5.a happens-after
    /// ordering. The previous `IN_HANDSHAKE_FIRE` panic-diagnostic is no
    /// longer needed.
    ///
    /// `lock()` returns the guard directly (no `Result` to unwrap).
    pub(crate) fn lock_state(&self) -> MutexGuard<'_, CoreState> {
        self.state.lock()
    }

    /// Whether `self` and `other` point to the same dispatcher state.
    /// True when one was produced by `Clone`-ing the other (or they
    /// were both cloned from a common ancestor); false for two
    /// independently `Core::new`-constructed instances even with the
    /// same binding.
    ///
    /// Used by `graphrefly-graph`'s `mount` to enforce the "shared-Core
    /// only" v1 invariant — cross-Core mount is post-M6.
    #[must_use]
    pub fn same_dispatcher(&self, other: &Core) -> bool {
        Arc::ptr_eq(&self.state, &other.state)
    }

    /// Downgrade to a [`WeakCore`] handle that doesn't contribute to
    /// strong refcount of the underlying state / binding / subgraph
    /// registry.
    ///
    /// Used by binding-stored long-lived closures (e.g.
    /// `register_producer_build`-stored `ProducerBuildFn`s) to avoid the
    /// Arc cycle:
    ///
    /// ```text
    /// BenchBinding → registry → producer_builds[fn_id]
    ///   → closure → strong Arc<dyn _Binding> → BenchBinding
    /// ```
    ///
    /// Closures hold `WeakCore` and `Weak<dyn _Binding>` instead, then
    /// upgrade-on-fire (returning early if either weak is dangling —
    /// indicating the host BenchCore was already dropped). Upgraded
    /// strong refs live only for the build closure's invocation; sinks
    /// the build closure spawns close over those upgraded strongs and
    /// stay alive only while the producer is active (cleared via
    /// `producer_deactivate` on last-subscriber unsubscribe).
    #[must_use]
    pub fn weak_handle(&self) -> WeakCore {
        WeakCore {
            state: Arc::downgrade(&self.state),
            binding: Arc::downgrade(&self.binding),
            registry: Arc::downgrade(&self.registry),
        }
    }

    /// Number of distinct connected-component partitions tracked by
    /// the per-subgraph union-find registry (Slice X5 substrate).
    ///
    /// Since Slice Y1 / Phase E the wave engine serializes per partition
    /// (each partition's `wave_owner`, acquired via
    /// `partition_wave_owner_lock_arc`). Threads emitting into nodes in
    /// distinct partitions can therefore run in parallel. Exposed for
    /// inspection (acceptance bar + debugging).
    #[must_use]
    pub fn partition_count(&self) -> usize {
        self.registry.lock().component_count()
    }

    /// Resolve `node`'s partition identity per the per-subgraph
    /// union-find registry (Slice X5 substrate). Two nodes with the
    /// same `SubgraphId` are connected via dep edges (transitively)
    /// and share a partition lock under Y1+; nodes in different
    /// partitions can run truly parallel.
    ///
    /// Returns `None` for unregistered nodes.
    #[must_use]
    pub fn partition_of(&self, node: NodeId) -> Option<crate::subgraph::SubgraphId> {
        self.registry.lock().partition_of(node)
    }

    /// Acquire `seed`'s partition `wave_owner` re-entrant mutex with
    /// retry-validate against concurrent union/split. Mirrors
    /// graphrefly-py's `subgraph_locks.py::lock_for` retry pattern
    /// (lines 154–178): a concurrent `union_nodes` may redirect
    /// `seed`'s partition root between our `lock_for` resolve and
    /// our `lock_arc` call; if so, the held guard is on a stale
    /// (but still valid) `SubgraphLockBox` whose `Arc` no longer
    /// matches the registry's canonical box for `seed`'s current
    /// root. Release + retry up to [`crate::subgraph::MAX_LOCK_RETRIES`].
    ///
    /// Returns the held guard. Caller holds it for the wave's
    /// duration; drop releases.
    ///
    /// **Panics** if `seed` is not registered (caller violation —
    /// every wave entry takes a `NodeId` already in `s.nodes`, and
    /// the P12-fixed lock-discipline guarantees registry membership
    /// is published atomically with state). **Panics** on exceeding
    /// `MAX_LOCK_RETRIES` — pathological union activity.
    ///
    /// Slice Y1 / Phase E (2026-05-08).
    pub(crate) fn partition_wave_owner_lock_arc(&self, seed: NodeId) -> WaveOwnerGuard {
        for _ in 0..crate::subgraph::MAX_LOCK_RETRIES {
            // Resolve the current lock box under the registry lock. The
            // registry guard drops at the end of this block, i.e. BEFORE
            // we block on `wave_owner` below, so the registry is not held
            // while waiting for an in-flight wave.
            let lock_box = {
                let mut reg = self.registry.lock();
                let (_sid, b) = reg.lock_for(seed).expect(
                    "partition_wave_owner_lock_arc: seed must be registered \
                     (P12-fix invariant: registry membership is published \
                     atomically with `s.nodes`)",
                );
                b
            };
            let guard = lock_box.wave_owner.lock_arc();
            // Re-validate post-acquire. If a concurrent `union` redirected
            // `seed`'s root between our `lock_for` and `lock_arc`, the
            // registry's current box for `seed` differs from what we hold.
            let still_valid = self.registry.lock().lock_for_validate(seed, &lock_box);
            if still_valid {
                return guard;
            }
            // Stale — drop guard and retry.
            drop(guard);
        }
        panic!(
            "partition_wave_owner_lock_arc: exceeded {} retries for seed {:?} \
             — pathological concurrent union/split activity. Mirrors py \
             `_MAX_LOCK_RETRIES`.",
            crate::subgraph::MAX_LOCK_RETRIES,
            seed
        );
    }

    /// Walk from `seed` along `s.children` (downstream consumer cascade
    /// for DATA / RESOLVED / INVALIDATE / COMPLETE / ERROR / TEARDOWN)
    /// and `meta_companions` (R1.3.9.d TEARDOWN cascade). Collects
    /// every partition reachable from `seed`, returning the unique
    /// `SubgraphId`s sorted ascending — the canonical lock-acquisition
    /// order per session-doc Q7 / decision D092 that guarantees
    /// deadlock-freedom across cross-partition waves.
    ///
    /// The walk is stack-driven (depth-first visit order). Only the
    /// reachable set matters, so visit order does not affect the result.
    ///
    /// Holds the state lock + registry lock for the walk's duration
    /// (lock order `state → registry` per the P12-fix invariant).
    /// Bounded by the cascade graph reachable from `seed`; for typical
    /// apps the partition count is small (1–3) and the walk is
    /// negligible relative to wave drain.
    ///
    /// Used by [`Core::begin_batch_for`] to compute the upfront-
    /// acquired partition set for per-seed waves. Closure-form
    /// [`Core::batch`] doesn't have a seed and uses
    /// [`Core::all_partitions_lock_boxes`] instead.
    ///
    /// Slice Y1 / Phase E (2026-05-08).
    pub(crate) fn compute_touched_partitions(
        &self,
        seed: NodeId,
    ) -> SmallVec<[crate::subgraph::SubgraphId; 4]> {
        let s = self.lock_state();
        let mut reg = self.registry.lock();
        let mut partitions: SmallVec<[crate::subgraph::SubgraphId; 4]> = SmallVec::new();
        let mut visited: HashSet<NodeId> = HashSet::default();
        let mut stack: SmallVec<[NodeId; 16]> = SmallVec::new();
        stack.push(seed);
        while let Some(n) = stack.pop() {
            // Nodes may be pushed more than once (shared children); the
            // `visited` set makes each one contribute exactly once.
            if !visited.insert(n) {
                continue;
            }
            // Linear dedup is fine: the partition list is expected to be tiny.
            if let Some(p) = reg.partition_of(n) {
                if !partitions.contains(&p) {
                    partitions.push(p);
                }
            }
            if let Some(children) = s.children.get(&n) {
                stack.extend(children.iter().copied());
            }
            if let Some(rec) = s.nodes.get(&n) {
                stack.extend(rec.meta_companions.iter().copied());
            }
        }
        partitions.sort_unstable_by_key(|sid| sid.raw());
        partitions
    }

    /// Snapshot of every currently-existing partition's lock box, in
    /// ascending [`crate::subgraph::SubgraphId`] order (canonical
    /// lock-acquisition order per session-doc Q7 / D092). Used by
    /// closure-form [`Core::batch`] / [`Core::begin_batch`] which
    /// don't have a known seed and must serialize against every
    /// existing partition.
    ///
    /// Slice Y1 / Phase E (2026-05-08).
    pub(crate) fn all_partitions_lock_boxes(
        &self,
    ) -> Vec<(
        crate::subgraph::SubgraphId,
        Arc<crate::subgraph::SubgraphLockBox>,
    )> {
        self.registry.lock().all_partitions()
    }
}
1726
1727/// BFS over the undirected dep-edge graph from `start`, optionally
1728/// skipping ONE edge in both directions. Returns every reachable
1729/// [`NodeId`].
1730///
1731/// **Edge convention:** the dep edge `parent → child` represents
1732/// data flow from `parent` (a dep) to `child` (the consumer). It
1733/// appears in `s.children[parent]` as `child`, and in
1734/// `s.nodes[child].dep_records` as `parent`. `skip_edge =
1735/// Some((parent, child))` skips both forward (`parent → child`) and
1736/// backward (`child → parent`) traversals of that edge.
1737///
1738/// Used by Slice Y1 / Phase F (D3 split-eager, 2026-05-09):
1739/// - **P13 widening (pre-removal connectivity):** call with
1740///   `skip_edge = Some((removed_parent, removed_child))` to determine
1741///   if removing the edge would disconnect the partition.
1742/// - **Actual split execution (post-removal):** call with
1743///   `skip_edge = None`; the visited set is the keep-side of the
1744///   split (the side containing `start`).
1745pub(crate) fn bfs_undirected_dep_graph(
1746    s: &CoreState,
1747    start: NodeId,
1748    skip_edge: Option<(NodeId, NodeId)>,
1749) -> HashSet<NodeId> {
1750    let mut visited: HashSet<NodeId> = HashSet::default();
1751    let mut queue: SmallVec<[NodeId; 32]> = SmallVec::new();
1752    queue.push(start);
1753    while let Some(cur) = queue.pop() {
1754        if !visited.insert(cur) {
1755            continue;
1756        }
1757        if let Some(consumers) = s.children.get(&cur) {
1758            for &c in consumers {
1759                let is_skipped = skip_edge.is_some_and(|(sp, sc)| cur == sp && c == sc);
1760                if !is_skipped && !visited.contains(&c) {
1761                    queue.push(c);
1762                }
1763            }
1764        }
1765        if let Some(rec) = s.nodes.get(&cur) {
1766            for d in rec.dep_records.iter().map(|r| r.node) {
1767                let is_skipped = skip_edge.is_some_and(|(sp, sc)| cur == sc && d == sp);
1768                if !is_skipped && !visited.contains(&d) {
1769                    queue.push(d);
1770                }
1771            }
1772        }
1773    }
1774    visited
1775}
1776
1777impl Core {
1778    /// Test-only inspection: number of `PendingBatch`es queued for
1779    /// `node` in the current wave. Used by Slice X4 D2 regression
1780    /// tests to pin the "common case = single batch, no SmallVec
1781    /// spill" perf invariant.
1782    ///
1783    /// Returns `None` if no `pending_notify` entry exists for `node`
1784    /// (no tier-1+ message has been queued for this node yet in this
1785    /// wave). `Some(0)` is unreachable by construction (a vacant
1786    /// entry implies no batches; an occupied entry has at least one).
1787    #[cfg(any(test, debug_assertions))]
1788    #[must_use]
1789    pub fn pending_batch_count(&self, node: NodeId) -> Option<usize> {
1790        self.lock_state()
1791            .pending_notify
1792            .get(&node)
1793            .map(|entry| entry.batches.len())
1794    }
1795
1796    /// Configure the Core-global cap on pause replay buffer length. When set,
1797    /// any per-node pause buffer that would exceed `cap` drops the oldest
1798    /// message(s) from the front; the dropped count is reported back via the
1799    /// resume callback (see [`ResumeReport`]). `None` (default) means
1800    /// unbounded; messages buffer indefinitely until the lockset clears.
1801    pub fn set_pause_buffer_cap(&self, cap: Option<usize>) {
1802        self.lock_state().pause_buffer_cap = cap;
1803    }
1804
1805    /// Configure the replay buffer cap on `node_id` (R2.6.5 / Lock 6.G —
1806    /// Slice E1, 2026-05-07). `None` disables the buffer. `Some(N)` keeps
1807    /// the last `N` DATA emissions in a circular buffer; late subscribers
1808    /// receive them as part of the per-tier handshake (between START and
1809    /// any terminal). Switching from a larger cap to a smaller cap evicts
1810    /// the front of the buffer to fit; switching to `None` drains the
1811    /// buffer entirely. Each evicted/drained handle's retain is released
1812    /// back to the binding.
1813    ///
1814    /// # Panics
1815    ///
1816    /// Panics if `node_id` is not registered.
1817    pub fn set_replay_buffer_cap(&self, node_id: NodeId, cap: Option<usize>) {
1818        // QA A7 (2026-05-07): normalize `Some(0)` to `None`. Two ways to
1819        // express "disabled" is confusing: `push_replay_buffer` already
1820        // treats `Some(0)` as no-op, so persisting it adds nothing.
1821        let cap = match cap {
1822            Some(0) => None,
1823            other => other,
1824        };
1825        let to_release: Vec<HandleId> = {
1826            let mut s = self.lock_state();
1827            let rec = s.require_node_mut(node_id);
1828            rec.replay_buffer_cap = cap;
1829            match cap {
1830                None => rec.replay_buffer.drain(..).collect(),
1831                Some(c) => {
1832                    let mut drained = Vec::new();
1833                    while rec.replay_buffer.len() > c {
1834                        if let Some(h) = rec.replay_buffer.pop_front() {
1835                            drained.push(h);
1836                        }
1837                    }
1838                    drained
1839                }
1840            }
1841        };
1842        for h in to_release {
1843            self.binding.release_handle(h);
1844        }
1845    }
1846
1847    /// Reconfigure the pause mode for `node_id` (canonical §2.6 — Slice F
1848    /// audit close, 2026-05-07). Default for new nodes is
1849    /// [`PausableMode::Default`]; switch to [`PausableMode::ResumeAll`]
1850    /// for nodes whose pause-window emit history must be observable
1851    /// verbatim, or [`PausableMode::Off`] for nodes intrinsically
1852    /// pause-immune.
1853    ///
1854    /// # Errors
1855    ///
1856    /// - [`SetPausableModeError::UnknownNode`] — `node_id` is not
1857    ///   registered.
1858    /// - [`SetPausableModeError::WhilePaused`] — the node currently
1859    ///   holds at least one pause lock. Changing mode mid-pause would
1860    ///   lose buffered content or strand a `pending_wave` flag — resume
1861    ///   all locks first.
1862    pub fn set_pausable_mode(
1863        &self,
1864        node_id: NodeId,
1865        mode: PausableMode,
1866    ) -> Result<(), SetPausableModeError> {
1867        let mut s = self.lock_state();
1868        let rec = s
1869            .nodes
1870            .get_mut(&node_id)
1871            .ok_or(SetPausableModeError::UnknownNode(node_id))?;
1872        if rec.pause_state.is_paused() {
1873            return Err(SetPausableModeError::WhilePaused);
1874        }
1875        rec.pausable = mode;
1876        Ok(())
1877    }
1878
1879    /// Configure the wave-drain iteration cap (R4.3 / Lock 2.F′). The wave
1880    /// engine aborts a drain after `cap` iterations with a diagnostic panic.
1881    /// Default is `10_000` — high enough to avoid false positives on legitimate
1882    /// fan-in cascades, low enough to surface runtime cycles within seconds.
1883    ///
1884    /// Lower this only when running adversarial / property-based tests that
1885    /// want fast cycle detection. Raise it only with concrete evidence that a
1886    /// legitimate workload needs more iterations than the default — and even
1887    /// then, prefer to tune the workload (per-subgraph batching, etc.) over
1888    /// raising the cap.
1889    ///
1890    /// # Panics
1891    ///
1892    /// Panics if `cap == 0` — a zero cap would abort every wave on the very
1893    /// first iteration, deadlocking any subsequent dispatcher work.
1894    pub fn set_max_batch_drain_iterations(&self, cap: u32) {
1895        assert!(cap > 0, "max_batch_drain_iterations must be > 0");
1896        self.lock_state().max_batch_drain_iterations = cap;
1897    }
1898
1899    /// Send a message UPSTREAM from `node_id` to each of its declared deps
1900    /// (canonical R1.4.1 — Slice F audit, F2 / 2026-05-07).
1901    ///
1902    /// The dispatcher rejects tier-3 (DATA / RESOLVED) and tier-5
1903    /// (COMPLETE / ERROR) per R1.4.1: value and terminal-lifecycle planes
1904    /// are downstream-only. All other tiers (0 START, 1 DIRTY, 2 PAUSE /
1905    /// RESUME, 4 INVALIDATE, 6 TEARDOWN) pass.
1906    ///
1907    /// # Routing per tier
1908    ///
1909    /// - **Tier 0 ([`Message::Start`]):** no-op. START is a per-subscription
1910    ///   handshake, not a routable wire signal — sending it upstream has no
1911    ///   well-defined target.
1912    /// - **Tier 1 ([`Message::Dirty`]):** no-op. The dep's "something
1913    ///   changed" notification is its own [`Self::emit`] / commit
1914    ///   responsibility; ignoring upstream DIRTY hints is safe.
1915    /// - **Tier 2 ([`Message::Pause`] / [`Message::Resume`]):** translates
1916    ///   to [`Self::pause`] / [`Self::resume`] on each dep. Lock id is
1917    ///   forwarded verbatim. Errors from individual deps are accumulated
1918    ///   in the `dep_errors` field of the returned report.
1919    /// - **Tier 4 ([`Message::Invalidate`]):** translates to
1920    ///   [`Self::invalidate`] on each dep. Note: canonical R1.4.2
1921    ///   distinguishes "downstream INVALIDATE" (cache clear + cascade) from
1922    ///   "upstream INVALIDATE" (plain forward, no self-process). The Rust
1923    ///   port v1 SIMPLIFICATION delegates to the same `Core::invalidate`
1924    ///   path — upstream INVALIDATE here DOES clear dep caches and cascade.
1925    ///   If a "plain forward" mode surfaces as a real consumer need, add
1926    ///   `up_with_options`.
1927    /// - **Tier 6 ([`Message::Teardown`]):** translates to
1928    ///   [`Self::teardown`] on each dep. Cascades per the standard
1929    ///   teardown path.
1930    ///
1931    /// # Errors
1932    ///
1933    /// - [`UpError::UnknownNode`] — `node_id` is not registered.
1934    /// - [`UpError::TierForbidden`] — tier 3 or tier 5.
1935    pub fn up(&self, node_id: NodeId, message: Message) -> Result<(), UpError> {
1936        // QA A10 (2026-05-07): check unknown node BEFORE tier rejection
1937        // for consistent error UX — `up(unknown, Data)` and
1938        // `up(unknown, Pause)` both report `UnknownNode` rather than
1939        // splitting on the tier.
1940        let dep_ids: Vec<NodeId> = {
1941            let s = self.lock_state();
1942            let rec = s.nodes.get(&node_id).ok_or(UpError::UnknownNode(node_id))?;
1943            rec.dep_ids_vec()
1944        };
1945        let tier = message.tier();
1946        if tier == 3 || tier == 5 {
1947            return Err(UpError::TierForbidden { tier });
1948        }
1949        for dep_id in dep_ids {
1950            match message {
1951                Message::Pause(lock) => {
1952                    let _ = self.pause(dep_id, lock);
1953                }
1954                Message::Resume(lock) => {
1955                    let _ = self.resume(dep_id, lock);
1956                }
1957                Message::Invalidate => {
1958                    self.invalidate(dep_id);
1959                }
1960                Message::Teardown => {
1961                    self.teardown(dep_id);
1962                }
1963                // Tier 0 START + tier 1 DIRTY: no-op upstream per the
1964                // routing table above.
1965                _ => {}
1966            }
1967        }
1968        Ok(())
1969    }
1970
1971    /// Allocate a unique [`LockId`] for use with [`Self::pause`] /
1972    /// [`Self::resume`]. Convenience for callers that don't already have an
1973    /// id-allocation scheme; user-supplied ids work too.
1974    #[must_use]
1975    pub fn alloc_lock_id(&self) -> LockId {
1976        let mut s = self.lock_state();
1977        let id = LockId::new(s.next_lock_id);
1978        s.next_lock_id += 1;
1979        id
1980    }
1981
1982    // -------------------------------------------------------------------
1983    // Registration — unified `register()` (D030, Slice D)
1984    //
1985    // All node kinds (State / Producer / Derived / Dynamic / Operator)
1986    // funnel through `Core::register(NodeRegistration) -> NodeId`. Sugar
1987    // wrappers (`register_state` / `register_producer` / `register_derived`
1988    // / `register_dynamic` / `register_operator`) build a `NodeRegistration`
1989    // and delegate. There is no parallel registration path internally.
1990    // -------------------------------------------------------------------
1991
1992    /// Unified node registration (D030).
1993    ///
1994    /// `reg` describes the node's identity (deps + closure-form fn id OR
1995    /// typed-op + per-kind opts). The kind is **derived from the field
1996    /// shape**, not stored — see [`NodeKind`].
1997    ///
1998    /// Sugar wrappers below ([`Self::register_state`],
1999    /// [`Self::register_producer`], [`Self::register_derived`],
2000    /// [`Self::register_dynamic`], [`Self::register_operator`]) build the
2001    /// registration for the common kinds and delegate here. Direct callers
2002    /// that need uncommon combinations (e.g., a partial-true derived) can
2003    /// invoke this method directly.
2004    ///
2005    /// # Errors
2006    ///
2007    /// Errors are returned in evaluation order — earlier phases short-circuit
2008    /// later ones, so a single registration produces at most one variant.
2009    ///
2010    /// **Phase 1 — lock-released, side-effect-free validation:**
2011    /// - [`RegisterError::OperatorWithoutDeps`] — `reg` carries an op but
2012    ///   `deps` is empty. Operator nodes need at least one dep — for
2013    ///   subscription-managed combinators with no declared deps, use
2014    ///   [`Self::register_producer`] instead.
2015    /// - [`RegisterError::InitialOnlyForStateNodes`] — `reg.opts.initial`
2016    ///   is non-sentinel for a non-state shape (deps non-empty, or
2017    ///   fn_or_op present). State nodes are the only kind with an initial
2018    ///   cache.
2019    ///
2020    /// **Phase 2 — operator scratch construction (lock-released):**
2021    /// - [`RegisterError::OperatorSeedSentinel`] — `reg` carries `Op(Scan)`
2022    ///   / `Op(Reduce)` with a `NO_HANDLE` seed. R2.5.3 — stateful folders
2023    ///   must have a real seed.
2024    ///
2025    /// **Phase 3 — state-lock validation (folded with insertion under a
2026    /// single lock acquisition per /qa F1 to prevent TOCTOU between
2027    /// validation and `nodes.insert`):**
2028    /// - [`RegisterError::UnknownDep`] — any element of `reg.deps` is not
2029    ///   a registered node id.
2030    /// - [`RegisterError::TerminalDep`] — a dep is terminal (COMPLETE /
2031    ///   ERROR) AND not resubscribable — would create a permanent wedge.
2032    ///
2033    /// All errors are construction-time invariants — the dispatcher
2034    /// rejects the registration before any reactive state is created.
2035    /// On `Err`, no node has been added and any handle retains taken on
2036    /// the way in (operator scratch seed retains via
2037    /// [`BindingBoundary::retain_handle`]) have been released
2038    /// lock-released — see [`ScratchReleaseGuard`] for the RAII
2039    /// discipline that covers both early-return AND unwind paths.
2040    /// `Last { default }` retains its `default` handle on the same
2041    /// release path.
2042    pub fn register(&self, reg: NodeRegistration) -> Result<NodeId, RegisterError> {
2043        let NodeRegistration {
2044            deps,
2045            fn_or_op,
2046            opts,
2047        } = reg;
2048        let NodeOpts {
2049            initial,
2050            equals,
2051            partial,
2052            is_dynamic,
2053            pausable,
2054            replay_buffer,
2055        } = opts;
2056
2057        // Derive the field shape from fn_or_op + deps.
2058        let (fn_id, op) = match fn_or_op {
2059            Some(NodeFnOrOp::Fn(f)) => (Some(f), None),
2060            Some(NodeFnOrOp::Op(o)) => (None, Some(o)),
2061            None => (None, None),
2062        };
2063
2064        // Phase 1 — lock-released, side-effect-free validation. Errors
2065        // here return BEFORE any handle retain is taken.
2066        //
2067        //   - State (no deps + no fn + no op) is the only kind with `initial`.
2068        //   - Dynamic flag only meaningful when fn + non-empty deps.
2069        //   - Operator (op present) must have deps (P9: operator without deps
2070        //     would skip activation — use a producer instead).
2071        let is_state_shape = deps.is_empty() && fn_id.is_none() && op.is_none();
2072        if op.is_some() && deps.is_empty() {
2073            return Err(RegisterError::OperatorWithoutDeps);
2074        }
2075        if initial != NO_HANDLE && !is_state_shape {
2076            return Err(RegisterError::InitialOnlyForStateNodes);
2077        }
2078
2079        // Phase 2 — build per-operator scratch struct (may take handle
2080        // retains via `binding.retain_handle` for Scan/Reduce/Last seed).
2081        // Lock-released per Slice E (D045) handshake discipline. Returns
2082        // `OperatorSeedSentinel` BEFORE retain so an Err leaves no
2083        // dangling handles.
2084        let scratch = match op {
2085            Some(operator_op) => self.make_op_scratch(operator_op)?,
2086            None => None,
2087        };
2088
2089        // Wrap scratch in an RAII guard immediately after Phase 2. From
2090        // here on, ANY early return / unwind path correctly releases the
2091        // scratch's handle retains via `OperatorScratch::release_handles`
2092        // (Slice H /qa F2 — defense against panics between Phase 2 and
2093        // Phase 3 cleanup branch). Lock-released because the guard is
2094        // declared BEFORE `lock_state()` below — variable destruction
2095        // order is reverse declaration order, so the `MutexGuard` drops
2096        // first on any return path.
2097        let scratch_guard = ScratchReleaseGuard::new(scratch, &*self.binding);
2098
2099        // Phase 3 — state-lock-required validation, FOLDED with insertion
2100        // under a single `lock_state()` acquisition per /qa F1. The
2101        // pre-/qa version split this into two acquisitions (one for
2102        // validation, one for `alloc_node_id` + `nodes.insert`), opening
2103        // a TOCTOU window where a concurrent `Core::complete(dep)` on a
2104        // non-resubscribable dep could slip in and recreate the wedge
2105        // `TerminalDep` was designed to prevent. Single locked region
2106        // closes the gap.
2107        let mut s = self.lock_state();
2108
2109        for &dep in &deps {
2110            if !s.nodes.contains_key(&dep) {
2111                return Err(RegisterError::UnknownDep(dep));
2112            }
2113        }
2114        // Slice F audit (2026-05-07): mirror `set_deps`'s `TerminalDep`
2115        // rejection at registration time. Adding a non-resubscribable
2116        // terminal node as a dep at registration creates a permanent wedge.
2117        for &dep in &deps {
2118            let dep_rec = s.require_node(dep);
2119            if dep_rec.terminal.is_some() && !dep_rec.resubscribable {
2120                return Err(RegisterError::TerminalDep(dep));
2121            }
2122        }
2123
2124        // Validation passed — install. Take scratch out of the guard
2125        // (disarms the release-on-drop) and continue using `s`.
2126        let installed_scratch = scratch_guard.take();
2127
2128        let id = s.alloc_node_id();
2129
2130        // `tracked`: Static derived + Operator track all deps; Dynamic
2131        // starts empty and fills via fn return; State / Producer have no
2132        // deps so tracked is empty.
2133        let tracked: HashSet<usize> = if op.is_some() {
2134            (0..deps.len()).collect()
2135        } else if is_dynamic {
2136            HashSet::new()
2137        } else if fn_id.is_some() && !deps.is_empty() {
2138            // Static derived
2139            (0..deps.len()).collect()
2140        } else {
2141            HashSet::new()
2142        };
2143
2144        let dep_records: Vec<DepRecord> = deps.iter().map(|&d| DepRecord::new(d)).collect();
2145
2146        let rec = NodeRecord {
2147            dep_records,
2148            fn_id,
2149            op,
2150            is_dynamic,
2151            equals,
2152            cache: initial,
2153            has_fired_once: initial != NO_HANDLE,
2154            subscribers: HashMap::new(),
2155            subscribers_revision: 0,
2156            tracked,
2157            dirty: false,
2158            involved_this_wave: false,
2159            pause_state: PauseState::Active,
2160            pausable,
2161            replay_buffer_cap: replay_buffer,
2162            replay_buffer: VecDeque::new(),
2163            terminal: None,
2164            has_received_teardown: false,
2165            resubscribable: false,
2166            meta_companions: Vec::new(),
2167            partial,
2168            op_scratch: installed_scratch,
2169        };
2170        s.nodes.insert(id, rec);
2171        s.children.insert(id, HashSet::new());
2172        for &dep in &deps {
2173            s.children.entry(dep).or_default().insert(id);
2174        }
2175        // Slice Y1 (D3 / D090 — P12 fix, 2026-05-08): maintain partition
2176        // membership BEFORE dropping the state lock. Closes the
2177        // eventual-consistency window where a concurrent thread observed
2178        // the new node in `s.nodes` / new edges in `s.children` but the
2179        // registry hadn't unioned the partition yet. Today benign
2180        // (`partition_of` is debug-only); under Y1's wave engine
2181        // migration `lock_for(node)` consumes registry state on the hot
2182        // path, and the window means `lock_for` could resolve to a
2183        // partition that's been topologically unioned in `s.children`
2184        // but not yet in `registry`.
2185        //
2186        // **Lock-discipline invariant:** `state lock → registry mutex`
2187        // (one-way; never registry → state). Registry mutex is
2188        // uncontended in the X5 substrate — the only acquisition sites
2189        // are this one + `Core::set_deps` + the read-only accessors
2190        // `partition_count`/`partition_of` and (Y1+) `lock_for` — none
2191        // of which take the state lock — so the inner critical section
2192        // adds negligible latency.
2193        {
2194            let mut reg = self.registry.lock();
2195            reg.ensure_registered(id);
2196            for &dep in &deps {
2197                reg.union_nodes(id, dep);
2198            }
2199        }
2200        drop(s);
2201        self.fire_topology_event(&crate::topology::TopologyEvent::NodeRegistered(id));
2202        Ok(id)
2203    }
2204
2205    /// Sugar over [`Self::register`] — register a state node. `initial`
2206    /// may be [`NO_HANDLE`] to start sentinel.
2207    ///
2208    /// `partial` is accepted for surface consistency (D019); for state
2209    /// nodes it has no effect (state nodes don't fire fn).
2210    ///
2211    /// # Errors
2212    ///
2213    /// State registration is structurally simple — no deps, no op — so
2214    /// the only reachable variant is none in practice. Returns
2215    /// [`Result`] for surface consistency with [`Self::register`].
2216    pub fn register_state(
2217        &self,
2218        initial: HandleId,
2219        partial: bool,
2220    ) -> Result<NodeId, RegisterError> {
2221        self.register(NodeRegistration {
2222            deps: Vec::new(),
2223            fn_or_op: None,
2224            opts: NodeOpts {
2225                initial,
2226                partial,
2227                ..NodeOpts::default()
2228            },
2229        })
2230    }
2231
2232    /// Sugar over [`Self::register`] — register a producer node (D031,
2233    /// Slice D). No deps; fn fires once on first subscribe; cleanup runs
2234    /// via [`BindingBoundary::producer_deactivate`] when the last
2235    /// subscriber unsubscribes.
2236    ///
2237    /// The fn body uses the binding's `ProducerCtx`-equivalent helper
2238    /// (see `graphrefly-operators::producer`) to subscribe to other Core
2239    /// nodes — the zip / concat / race / takeUntil pattern.
2240    ///
2241    /// # Errors
2242    ///
2243    /// Producer registration has no user-supplied deps, so structurally
2244    /// none of [`RegisterError`]'s variants are reachable. Returns
2245    /// [`Result`] for surface consistency with [`Self::register`].
2246    pub fn register_producer(&self, fn_id: FnId) -> Result<NodeId, RegisterError> {
2247        self.register(NodeRegistration {
2248            deps: Vec::new(),
2249            fn_or_op: Some(NodeFnOrOp::Fn(fn_id)),
2250            opts: NodeOpts {
2251                // Producers have no deps — the first-run gate is degenerate.
2252                partial: true,
2253                ..NodeOpts::default()
2254            },
2255        })
2256    }
2257
2258    /// Sugar over [`Self::register`] — register a derived (static) node.
2259    /// `partial` controls the R2.5.3 first-run gate (D011).
2260    ///
2261    /// # Errors
2262    ///
2263    /// - [`RegisterError::UnknownDep`] — any element of `deps` is not
2264    ///   registered.
2265    /// - [`RegisterError::TerminalDep`] — a dep is terminal and not
2266    ///   resubscribable.
2267    pub fn register_derived(
2268        &self,
2269        deps: &[NodeId],
2270        fn_id: FnId,
2271        equals: EqualsMode,
2272        partial: bool,
2273    ) -> Result<NodeId, RegisterError> {
2274        self.register(NodeRegistration {
2275            deps: deps.to_vec(),
2276            fn_or_op: Some(NodeFnOrOp::Fn(fn_id)),
2277            opts: NodeOpts {
2278                equals,
2279                partial,
2280                ..NodeOpts::default()
2281            },
2282        })
2283    }
2284
2285    /// Sugar over [`Self::register`] — register a dynamic node (fn
2286    /// declares its actually-tracked dep indices per fire). `partial`
2287    /// controls the R2.5.3 first-run gate (D011).
2288    ///
2289    /// # Errors
2290    ///
2291    /// - [`RegisterError::UnknownDep`] — any element of `deps` is not
2292    ///   registered.
2293    /// - [`RegisterError::TerminalDep`] — a dep is terminal and not
2294    ///   resubscribable.
2295    pub fn register_dynamic(
2296        &self,
2297        deps: &[NodeId],
2298        fn_id: FnId,
2299        equals: EqualsMode,
2300        partial: bool,
2301    ) -> Result<NodeId, RegisterError> {
2302        self.register(NodeRegistration {
2303            deps: deps.to_vec(),
2304            fn_or_op: Some(NodeFnOrOp::Fn(fn_id)),
2305            opts: NodeOpts {
2306                equals,
2307                partial,
2308                is_dynamic: true,
2309                ..NodeOpts::default()
2310            },
2311        })
2312    }
2313
2314    /// Build a fresh [`OperatorScratch`](crate::op_state::OperatorScratch)
2315    /// box for an operator variant, taking any required handle retains.
2316    /// Shared between `register_operator` (initial install) and
2317    /// `reset_for_fresh_lifecycle` (resubscribable cycle re-install).
2318    ///
2319    /// # Errors
2320    ///
2321    /// Returns [`RegisterError::OperatorSeedSentinel`] if `op` is `Scan`
2322    /// / `Reduce` with a [`NO_HANDLE`] seed (R2.5.3 — stateful folders
2323    /// must have a real seed). Refcount discipline: the seed-sentinel
2324    /// check happens BEFORE [`BindingBoundary::retain_handle`], so an
2325    /// `Err` leaves no handles dangling.
2326    fn make_op_scratch(
2327        &self,
2328        op: OperatorOp,
2329    ) -> Result<Option<Box<dyn crate::op_state::OperatorScratch>>, RegisterError> {
2330        use crate::op_state::{
2331            DistinctState, LastState, PairwiseState, ReduceState, ScanState, SkipState, TakeState,
2332            TakeWhileState,
2333        };
2334        // Slice H (2026-05-07): Scan/Reduce seed-sentinel checks happen
2335        // BEFORE retain_handle so an Err return leaves no handles dangling.
2336        //
2337        // Slice H /qa F13 (2026-05-07): for retaining variants, allocate
2338        // the `Box<State>` BEFORE calling `binding.retain_handle`. If
2339        // `Box::new` panics (e.g., OOM-as-panic), no retain has happened
2340        // yet — no leak. If `retain_handle` panics after Box succeeds,
2341        // the `Box<State>` is dropped on unwind; State has no handle yet
2342        // (we haven't touched the registry refcount), so still no leak.
2343        // Caller wraps the returned scratch in `ScratchReleaseGuard` to
2344        // cover panics AFTER make_op_scratch returns.
2345        match op {
2346            OperatorOp::Scan { seed, .. } => {
2347                if seed == NO_HANDLE {
2348                    return Err(RegisterError::OperatorSeedSentinel);
2349                }
2350                let state = Box::new(ScanState { acc: seed });
2351                self.binding.retain_handle(seed);
2352                Ok(Some(state))
2353            }
2354            OperatorOp::Reduce { seed, .. } => {
2355                if seed == NO_HANDLE {
2356                    return Err(RegisterError::OperatorSeedSentinel);
2357                }
2358                let state = Box::new(ReduceState { acc: seed });
2359                self.binding.retain_handle(seed);
2360                Ok(Some(state))
2361            }
2362            OperatorOp::DistinctUntilChanged { .. } => Ok(Some(Box::new(DistinctState::default()))),
2363            OperatorOp::Pairwise { .. } => Ok(Some(Box::new(PairwiseState::default()))),
2364            OperatorOp::Take { .. } => Ok(Some(Box::new(TakeState::default()))),
2365            OperatorOp::Skip { .. } => Ok(Some(Box::new(SkipState::default()))),
2366            OperatorOp::TakeWhile { .. } => Ok(Some(Box::new(TakeWhileState))),
2367            OperatorOp::Last { default } => {
2368                let state = Box::new(LastState {
2369                    latest: NO_HANDLE,
2370                    default,
2371                });
2372                if default != NO_HANDLE {
2373                    self.binding.retain_handle(default);
2374                }
2375                Ok(Some(state))
2376            }
2377            OperatorOp::Map { .. }
2378            | OperatorOp::Filter { .. }
2379            | OperatorOp::Combine { .. }
2380            | OperatorOp::WithLatestFrom { .. }
2381            | OperatorOp::Merge => Ok(None),
2382        }
2383    }
2384
2385    /// Sugar over [`Self::register`] — register a built-in operator node
2386    /// (Slice C-1, D009; D026 generic scratch). The operator dispatch path
2387    /// lives in `fire_operator`; `op` selects which per-operator FFI
2388    /// method on [`BindingBoundary`] gets called per fire.
2389    ///
2390    /// For stateful operators ([`OperatorOp::Scan`] / [`Reduce`] /
2391    /// [`Last`] with a default), the seed/default handle is captured
2392    /// into the appropriate
2393    /// [`OperatorScratch`](crate::op_state::OperatorScratch) struct
2394    /// stored at [`NodeRecord::op_scratch`], and Core takes one retain
2395    /// share via [`BindingBoundary::retain_handle`].
2396    ///
2397    /// # Errors
2398    ///
2399    /// - [`RegisterError::OperatorWithoutDeps`] — `deps` is empty (use
2400    ///   [`Self::register_producer`] instead).
2401    /// - [`RegisterError::OperatorSeedSentinel`] — `op` is
2402    ///   [`OperatorOp::Scan`] / [`OperatorOp::Reduce`] with a
2403    ///   [`NO_HANDLE`] seed.
2404    /// - [`RegisterError::UnknownDep`] — any element of `deps` is not
2405    ///   registered.
2406    /// - [`RegisterError::TerminalDep`] — a dep is terminal and not
2407    ///   resubscribable.
2408    pub fn register_operator(
2409        &self,
2410        deps: &[NodeId],
2411        op: OperatorOp,
2412        opts: OperatorOpts,
2413    ) -> Result<NodeId, RegisterError> {
2414        self.register(NodeRegistration {
2415            deps: deps.to_vec(),
2416            fn_or_op: Some(NodeFnOrOp::Op(op)),
2417            opts: NodeOpts {
2418                equals: opts.equals,
2419                partial: opts.partial,
2420                ..NodeOpts::default()
2421            },
2422        })
2423    }
2424
2425    // -------------------------------------------------------------------
2426    // Subscription
2427    // -------------------------------------------------------------------
2428
2429    /// Subscribe a sink to a node. Returns a [`Subscription`] handle —
2430    /// dropping the handle unsubscribes the sink. Per §10.12, no manual
2431    /// `unsubscribe(node, id)` call is required.
2432    ///
2433    /// Push-on-subscribe (R1.2.3, R2.2.3 step 4): the sink is registered AFTER
2434    /// the START handshake fires. The handshake contents depend on node
2435    /// state:
2436    /// - Sentinel cache + live (non-terminal): `[START]`
2437    /// - Cached + live: `[START, DATA(handle)]`
2438    /// - Cached + terminated (non-resubscribable): `[START, DATA(handle), <terminal>]`
2439    /// - Sentinel + terminated (non-resubscribable): `[START, <terminal>]`
2440    ///
2441    /// Resubscribable terminal lifecycle (R2.2.7 / R2.5.3): if the node was
2442    /// marked resubscribable via [`Self::set_resubscribable`] AND has
2443    /// terminated, the subscribe call first **resets** the node — clears
2444    /// `terminal`, `has_fired_once`, `has_received_teardown`, all
2445    /// `dep_handles` to `NO_HANDLE`, all `dep_terminals` to `None`, and
2446    /// drains the pause lockset. The new subscriber then receives a fresh
2447    /// `[START]` (cache may survive for state nodes; sentinel for compute).
2448    ///
2449    /// Activation (R2.2.3 step 5): if this is the first subscriber and the
2450    /// node is a derived/dynamic compute, recursively activate deps so their
2451    /// cached handles fill our `dep_handles`.
2452    #[allow(clippy::needless_pass_by_value)] // Sink is `Arc<dyn Fn>`; we clone for the subscribers map and call it directly. Taking by value matches the ergonomics callers expect.
2453    pub fn subscribe(&self, node_id: NodeId, sink: Sink) -> Subscription {
2454        // Subscribe protocol (Slice E rework, post-handshake-reentry-lift):
2455        //
2456        // 1. Acquire `wave_owner` first (re-entrant; same-thread passes
2457        //    through, cross-thread blocks). This is the cross-thread
2458        //    serialization point that preserves R1.3.5.a happens-after
2459        //    ordering across the lock-released handshake fire.
2460        // 2. Acquire state lock briefly: alloc sub_id, run resubscribable
2461        //    reset if applicable, snapshot handshake state, install sink
2462        //    in `subscribers`. Drop state lock.
2463        // 3. Fire handshake LOCK-RELEASED. Per-tier slices (R1.3.5.a):
2464        //    `[Start]` / `[Data(cache)]?` / `[Complete]?` / `[Error(h)]?`
2465        //    / `[Teardown]?`. Empty tiers are skipped. Sink callbacks
2466        //    may re-enter Core freely — same-thread re-entry passes
2467        //    through `wave_owner` reentrantly.
2468        // 4. Run activation under `run_wave` if needed (first subscriber
2469        //    on a non-state node).
2470        // 5. Drop `wave_owner`.
2471        //
2472        // Race-fix discipline: the sink is installed in `subscribers`
2473        // BEFORE the state lock drops, so concurrent threads that
2474        // acquire `wave_owner` after our scope sees the sink already
2475        // registered. Cross-thread emits block on `wave_owner` until
2476        // we drop it, ensuring all our handshake calls land before
2477        // any concurrent wave's flush observes the sink.
2478
2479        // Acquire the partition's `wave_owner` first — cross-thread
2480        // serialization point. Per Slice Y1 / Phase E (2026-05-08),
2481        // subscribe routes through the per-partition lock instead of
2482        // a Core-global one. Subscribe touches only `node_id`'s
2483        // partition (activation cascade stays within the partition
2484        // because dep edges are unioned). `partition_wave_owner_lock_arc`
2485        // does retry-validate against concurrent union/split.
2486        // `lock_arc()` is `!Send`; same-thread reentrant.
2487        let _wave_guard = self.partition_wave_owner_lock_arc(node_id);
2488
2489        let (sub_id, tier_slices, needs_activation, did_reset) = {
2490            let mut s = self.lock_state();
2491            let sub_id = s.alloc_sub_id();
2492
2493            // Resubscribable reset: terminal + flagged → clear lifecycle
2494            // state so the incoming subscriber starts fresh. F3 audit
2495            // guard: a node that has received TEARDOWN (R2.6.4) is
2496            // permanently destroyed at this layer; resurrecting it via a
2497            // late subscribe is a category error. COMPLETE/ERROR is
2498            // recoverable for resubscribable nodes; TEARDOWN is not. The
2499            // handshake will still replay the terminal in the non-reset
2500            // branch so the late subscriber sees a clean
2501            // `[START, ?DATA, COMPLETE|ERROR, TEARDOWN]` stream.
2502            let needs_reset = {
2503                let rec = s.require_node(node_id);
2504                rec.resubscribable && rec.terminal.is_some() && !rec.has_received_teardown
2505            };
2506            if needs_reset {
2507                self.reset_for_fresh_lifecycle(&mut s, node_id);
2508            }
2509
2510            // Snapshot handshake state under lock.
2511            let (cache, is_state, first_subscriber, terminal, torn_down) = {
2512                let rec = s.require_node(node_id);
2513                (
2514                    rec.cache,
2515                    rec.is_state(),
2516                    rec.subscribers.is_empty(),
2517                    rec.terminal,
2518                    rec.has_received_teardown,
2519                )
2520            };
2521
2522            // Build per-tier handshake slices. Each non-empty slice is
2523            // fired as a separate sink call (R1.3.5.a tier-split).
2524            let mut tier_slices: SmallVec<[Vec<Message>; 4]> = SmallVec::new();
2525            tier_slices.push(vec![Message::Start]);
2526            if cache != NO_HANDLE {
2527                tier_slices.push(vec![Message::Data(cache)]);
2528            }
2529            // Slice E1 (R2.6.5 / Lock 6.G): replay buffered DATA between
2530            // [Start] (and the cache slice, if present) and any terminal.
2531            // Each buffered handle becomes a separate per-tier slice so
2532            // late subscribers see the historical Data sequence as
2533            // distinct sink calls.
2534            //
2535            // Dedupe: when a cache slice is present and the buffer's last
2536            // entry is the same handle (the typical case — cache always
2537            // tracks the last DATA emitted, and the buffer's tail entry
2538            // is that same DATA), skip the last buffer entry to avoid
2539            // delivering Data(cache) twice. For state nodes whose cache
2540            // survives unsubscribe, the buffer may have older entries
2541            // the cache doesn't reflect; the dedupe only drops the
2542            // single trailing entry that equals cache. (QA A1, 2026-05-07)
2543            let replay_handles: Vec<HandleId> = {
2544                let rec = s.require_node(node_id);
2545                let cap = rec.replay_buffer_cap.unwrap_or(0);
2546                if cap == 0 {
2547                    Vec::new()
2548                } else {
2549                    let mut v: Vec<HandleId> = rec.replay_buffer.iter().copied().collect();
2550                    if cache != NO_HANDLE && v.last() == Some(&cache) {
2551                        v.pop();
2552                    }
2553                    v
2554                }
2555            };
2556            for h in &replay_handles {
2557                tier_slices.push(vec![Message::Data(*h)]);
2558            }
2559            if let Some(t) = terminal {
2560                tier_slices.push(vec![match t {
2561                    TerminalKind::Complete => Message::Complete,
2562                    TerminalKind::Error(h) => Message::Error(h),
2563                }]);
2564            }
2565            if torn_down {
2566                tier_slices.push(vec![Message::Teardown]);
2567            }
2568
2569            // Install sink BEFORE dropping state lock so any thread that
2570            // subsequently acquires `wave_owner` (after our scope ends)
2571            // sees the sink already registered.
2572            //
2573            // Slice X4 / D2: bump `subscribers_revision` alongside the
2574            // insert so a pending_notify entry opened earlier in the same
2575            // wave (e.g. inside `batch(|| { emit(s, h1); subscribe(s,
2576            // late); emit(s, h2); })`) starts a fresh `PendingBatch` on
2577            // its next `queue_notify` push — making the new sink visible
2578            // to subsequent emits' flush slices, while the pre-subscribe
2579            // batch's snapshot stays frozen so we don't double-deliver
2580            // earlier emits via the wave's flush AND the new sub's
2581            // handshake replay.
2582            {
2583                let rec = s.require_node_mut(node_id);
2584                rec.subscribers.insert(sub_id, sink.clone());
2585                rec.subscribers_revision = rec.subscribers_revision.wrapping_add(1);
2586            }
2587
2588            let needs_activation = first_subscriber && !is_state;
2589            (sub_id, tier_slices, needs_activation, needs_reset)
2590            // state lock drops here
2591        };
2592
2593        // Slice E2 (R2.4.6 / D055): on resubscribable terminal reset, fire
2594        // `wipe_ctx` LOCK-RELEASED so the binding drops its `NodeCtxState`
2595        // entry (clearing both `store` and any residual `current_cleanup`).
2596        // The new subscriber's first invoke_fn sees a fresh empty store.
2597        // Fires AFTER the state lock drops so the binding's
2598        // `node_ctx.lock()` can't deadlock against Core's state lock — and
2599        // BEFORE the handshake so the wipe is observable before any
2600        // user-visible interaction with the new lifecycle.
2601        if did_reset {
2602            self.binding.wipe_ctx(node_id);
2603        }
2604
2605        // Fire handshake LOCK-RELEASED. Sink may re-enter Core; same-
2606        // thread re-entry passes through `wave_owner` reentrantly.
2607        // Cross-thread emits block at `wave_owner` until our scope ends.
2608        //
2609        // A7 (Slice F, 2026-05-07): per-tier slice fire is wrapped in
2610        // `catch_unwind`. The sink is installed in `subscribers` BEFORE
2611        // the handshake fires (load-bearing — concurrent threads observe
2612        // the sink immediately). If a sink panics on tier N, the panic
2613        // would otherwise unwind out of `subscribe` BEFORE the
2614        // `Subscription` handle is constructed, leaving the sink
2615        // registered in `subscribers` with no user-held handle to drop.
2616        // Subsequent waves' `flush_notifications` would re-fire the
2617        // panicking sink forever.
2618        //
2619        // On panic: remove the sink from `subscribers` (via the
2620        // already-allocated `sub_id`), drop `_wave_guard` cleanly via
2621        // RAII, and resume the unwind so the user observes the panic at
2622        // the call site. Same effect as the user dropping the
2623        // `Subscription` immediately, but pre-emptive.
2624        for slice in &tier_slices {
2625            let sink_clone = sink.clone();
2626            let slice_ref: &[Message] = slice;
2627            let result = catch_unwind(AssertUnwindSafe(|| sink_clone(slice_ref)));
2628            if let Err(panic_payload) = result {
2629                // Remove the orphaned sink. Best-effort: if the node was
2630                // since torn down (e.g., the sink itself called teardown
2631                // before panicking), the entry may already be gone.
2632                {
2633                    let mut s = self.lock_state();
2634                    if let Some(rec) = s.nodes.get_mut(&node_id) {
2635                        rec.subscribers.remove(&sub_id);
2636                        // Slice X4 / D2: keep revision-tracked snapshot
2637                        // discipline consistent with the install site —
2638                        // any pending_notify entry that already absorbed
2639                        // the panicking sink under the post-install
2640                        // revision should start a fresh batch on its
2641                        // next queue_notify push.
2642                        rec.subscribers_revision = rec.subscribers_revision.wrapping_add(1);
2643                    }
2644                }
2645                std::panic::resume_unwind(panic_payload);
2646            }
2647        }
2648
2649        // Run activation if needed. `run_wave_for(node_id)` acquires
2650        // only the partitions transitively touched from `node_id`
2651        // (downstream cascade + meta-companion teardown reach) — same-
2652        // partition activation re-enters reentrantly. Slice Y1 / Phase E.
2653        if needs_activation {
2654            self.run_wave_for(node_id, |this| {
2655                let mut s = this.lock_state();
2656                this.activate_derived(&mut s, node_id);
2657            });
2658        }
2659
2660        Subscription {
2661            state: Arc::downgrade(&self.state),
2662            node_id,
2663            sub_id,
2664        }
2665        // _wave_guard drops here, releasing wave_owner.
2666    }
2667
2668    /// Mark `node_id` as resubscribable per R2.2.7. Resubscribable nodes
2669    /// reset their terminal-lifecycle state on a fresh subscribe — see
2670    /// [`Self::subscribe`].
2671    ///
2672    /// Configuration call — must be made before the node has any active
2673    /// subscribers, since changing the policy mid-flight would surprise
2674    /// existing observers.
2675    ///
2676    /// # Panics
2677    ///
2678    /// Panics if the node has subscribers (the policy is observable
2679    /// behavior; changing it after the fact would change semantics for
2680    /// existing sinks).
2681    pub fn set_resubscribable(&self, node_id: NodeId, resubscribable: bool) {
2682        let mut s = self.lock_state();
2683        let rec = s.require_node_mut(node_id);
2684        assert!(
2685            rec.subscribers.is_empty(),
2686            "set_resubscribable: node already has subscribers; \
2687             configure resubscribable before any subscribe call"
2688        );
2689        rec.resubscribable = resubscribable;
2690    }
2691
2692    /// Reset a resubscribable node's terminal-lifecycle state. Called from
2693    /// `subscribe` when a late subscriber arrives at a flagged node.
2694    ///
2695    /// Released: terminal-slot retain (Error handle), all per-dep terminal
2696    /// retains (Error handles), all data_batch retains.
2697    /// Cleared: `terminal`, `has_fired_once`, `has_received_teardown`, all
2698    /// dep_records to sentinel, the pause lockset (any held locks are
2699    /// released — replay buffer drops silently because there are no
2700    /// subscribers to flush to).
2701    fn reset_for_fresh_lifecycle(&self, s: &mut CoreState, node_id: NodeId) {
2702        // Phase 1: collect wave-state handle releases + take the old
2703        // op_scratch + reset other state. Take all mutations under one
2704        // borrow so the post-borrow phases don't re-walk dep_records.
2705        let (prev_op, mut old_scratch, handles_to_release, pause_buffer_payloads) = {
2706            let rec = s.require_node_mut(node_id);
2707            let mut hs = Vec::new();
2708            if let Some(TerminalKind::Error(h)) = rec.terminal {
2709                hs.push(h);
2710            }
2711            for dr in &rec.dep_records {
2712                if let Some(TerminalKind::Error(h)) = dr.terminal {
2713                    hs.push(h);
2714                }
2715                for &h in &dr.data_batch {
2716                    hs.push(h);
2717                }
2718                // Slice C-3 /qa: also release `prev_data`. Prior to this
2719                // collection, `reset_for_fresh_lifecycle` overwrote
2720                // `dr.prev_data = NO_HANDLE` without releasing the old
2721                // handle, leaking one share per dep per resubscribable
2722                // cycle. The leak was masked because no test exercised
2723                // the per-dep `prev_data` retain across a lifecycle
2724                // reset; surfaced by the T1 tightening of
2725                // `last_releases_buffered_latest_on_lifecycle_reset`.
2726                if dr.prev_data != NO_HANDLE {
2727                    hs.push(dr.prev_data);
2728                }
2729            }
2730            // Take pause_state's buffer; collect its payload handles for
2731            // release (they were retained at queue_notify time; buffer
2732            // drops because the new subscriber starts fresh).
2733            let mut pulled = Vec::new();
2734            if let PauseState::Paused { ref mut buffer, .. } = rec.pause_state {
2735                for msg in buffer.drain(..) {
2736                    if let Some(h) = msg.payload_handle() {
2737                        pulled.push(h);
2738                    }
2739                }
2740            }
2741            // Slice E1: drain the replay buffer too — the new subscriber
2742            // gets a fresh lifecycle and shouldn't see prior emissions.
2743            for h in rec.replay_buffer.drain(..) {
2744                pulled.push(h);
2745            }
2746            // Reset wave / lifecycle state.
2747            rec.terminal = None;
2748            rec.has_fired_once = rec.cache != NO_HANDLE && rec.is_state();
2749            rec.has_received_teardown = false;
2750            for dr in &mut rec.dep_records {
2751                dr.prev_data = NO_HANDLE;
2752                dr.data_batch.clear();
2753                dr.terminal = None;
2754                dr.dirty = false;
2755                dr.involved_this_wave = false;
2756            }
2757            rec.pause_state = PauseState::Active;
2758            rec.involved_this_wave = false;
2759            rec.dirty = false;
2760            // P7 (Slice A close /qa): Dynamic nodes clear `tracked` so
2761            // the post-reset first fire repopulates from the fn's
2762            // returned tracked-deps set.
2763            if rec.is_dynamic {
2764                rec.tracked.clear();
2765            }
2766            // Take the old scratch out so we can release its handles and
2767            // install a fresh one. Operator op is copied for the
2768            // rebuild step below.
2769            let prev_op = rec.op;
2770            let old = std::mem::take(&mut rec.op_scratch);
2771            (prev_op, old, hs, pulled)
2772        };
2773
2774        // Phase 2 (Slice C-3 /qa P1 — RETAIN-BEFORE-RELEASE ordering):
2775        // build the fresh scratch FIRST, taking new retains on any
2776        // seed/default handles. This must run BEFORE Phase 3 releases
2777        // the old scratch's shares — if old `acc` (Scan/Reduce) or old
2778        // `latest` (Last) aliases the new `seed`/`default` (common:
2779        // `fold(seed, x) == seed` interns to the same registry entry),
2780        // releasing the old share first could collapse the binding's
2781        // registry slot to zero (production bindings remove the value
2782        // entry on refcount-zero — see `tests/common/mod.rs:191-204`),
2783        // and a subsequent `retain_handle` on the new seed would bump a
2784        // refcount on a slot whose value has been removed. By taking
2785        // the new retains first, we floor the refcount at ≥1 before
2786        // any release happens.
2787        let new_scratch = match prev_op {
2788            // Slice H: the OperatorOp stored on NodeRecord previously
2789            // passed `make_op_scratch` validation at registration time
2790            // (RegisterError::OperatorSeedSentinel for Scan/Reduce
2791            // sentinel seeds; Last { default: NO_HANDLE } is accepted
2792            // and never errors). Re-running it here on the same op
2793            // value is structurally guaranteed to succeed.
2794            Some(op) => self
2795                .make_op_scratch(op)
2796                .expect("invariant: stored OperatorOp passed make_op_scratch validation at registration time"),
2797            None => None,
2798        };
2799
2800        // Phase 3: NOW release handles owned by the old op_scratch
2801        // (Scan/Reduce acc, Distinct/Pairwise prev, Last latest +
2802        // default). Safe per Phase 2's retain-first floor. The boxed
2803        // value is consumed and dropped after.
2804        if let Some(scratch) = old_scratch.as_mut() {
2805            scratch.release_handles(&*self.binding);
2806        }
2807        drop(old_scratch);
2808
2809        // Phase 4: install the fresh scratch.
2810        {
2811            let rec = s.require_node_mut(node_id);
2812            rec.op_scratch = new_scratch;
2813        }
2814
2815        // Phase 5: release wave-state handles collected in phase 1.
2816        for h in handles_to_release {
2817            self.binding.release_handle(h);
2818        }
2819        for h in pause_buffer_payloads {
2820            self.binding.release_handle(h);
2821        }
2822    }
2823
2824    /// Activate `root` and any transitive uncached compute deps so their
2825    /// caches fill our dep_handles slots.
2826    ///
2827    /// Slice A close (M1): pure dep-walk + dep_handles population +
2828    /// pending_fires queueing. No `in_tick` management or `drain_and_flush`
2829    /// call — the outer caller (typically `Core::subscribe` via
2830    /// [`Core::run_wave`]) owns the wave lifecycle and drains lock-released
2831    /// around `invoke_fn`.
2832    ///
2833    /// Walk shape:
2834    ///   1. **Discover phase (DFS via Vec stack):** starting at `root`,
2835    ///      walk transitively-needing-activation deps via the `deps`
2836    ///      chain. Build an ordering where each node appears AFTER all
2837    ///      of its uncached compute deps — i.e., reverse topological
2838    ///      among the visited subgraph.
2839    ///   2. **Deliver phase (forward iteration):** for each visited
2840    ///      node in dep-first order, push deps' caches into the node's
2841    ///      `dep_handles` slots. Caches that were sentinel pre-walk are
2842    ///      filled because their parent's fn fires later in the wave's
2843    ///      drain loop and `commit_emission` propagates new caches forward
2844    ///      via `deliver_data_to_consumer` — the same path this method
2845    ///      uses for the initial seed. Adds the node to `pending_fires`
2846    ///      if its tracked-deps gate is satisfied; the wave-engine drain
2847    ///      fires the fn lock-released around `invoke_fn`.
2848    pub(crate) fn activate_derived(&self, s: &mut CoreState, root: NodeId) {
2849        // Phase 1: discover. DFS to collect every compute node reachable
2850        // via deps that doesn't yet have a cache and hasn't fired.
2851        // Record them in dep-first (post-order) so phase 2 can deliver
2852        // caches forward. Frame is `(node_id, finalize)` — `finalize=false`
2853        // means "first visit: push deps then re-push self with finalize=true";
2854        // `finalize=true` means "deps have been expanded, append self to
2855        // `order`."
2856        let mut visited: HashSet<NodeId> = HashSet::new();
2857        let mut order: Vec<NodeId> = Vec::new();
2858        let mut stack: Vec<(NodeId, bool)> = vec![(root, false)];
2859        while let Some((id, finalize)) = stack.pop() {
2860            if finalize {
2861                order.push(id);
2862                continue;
2863            }
2864            if !visited.insert(id) {
2865                continue;
2866            }
2867            stack.push((id, true));
2868            let dep_ids: Vec<NodeId> = s.require_node(id).dep_ids_vec();
2869            for dep_id in dep_ids {
2870                let (dep_is_state, dep_cache, dep_has_fired) = {
2871                    let dep_rec = s.require_node(dep_id);
2872                    (dep_rec.is_state(), dep_rec.cache, dep_rec.has_fired_once)
2873                };
2874                if !dep_is_state
2875                    && dep_cache == NO_HANDLE
2876                    && !dep_has_fired
2877                    && !visited.contains(&dep_id)
2878                {
2879                    stack.push((dep_id, false));
2880                }
2881            }
2882        }
2883
2884        // Phase 2: deliver caches in dep-first order. For each node, walk
2885        // its deps and call `deliver_data_to_consumer` for any with caches.
2886        // Producer nodes (no deps + has fn — Slice D, D031) have no deps
2887        // to walk; queue them directly into `pending_fires` so the wave
2888        // engine fires their fn once on activation.
2889        for &id in &order {
2890            let (dep_ids, is_producer) = {
2891                let rec = s.require_node(id);
2892                (rec.dep_ids_vec(), rec.is_producer())
2893            };
2894            if is_producer {
2895                s.pending_fires.insert(id);
2896                continue;
2897            }
2898            for (i, dep_id) in dep_ids.iter().copied().enumerate() {
2899                let dep_cache = s.require_node(dep_id).cache;
2900                if dep_cache != NO_HANDLE {
2901                    self.deliver_data_to_consumer(s, id, i, dep_cache);
2902                }
2903            }
2904        }
2905    }
2906
2907    // -------------------------------------------------------------------
2908    // Emission entry point
2909    // -------------------------------------------------------------------
2910
2911    /// Set a state node's value. Triggers a wave (DIRTY → DATA/RESOLVED →
2912    /// fn fires for downstream).
2913    ///
2914    /// Silent no-op if the node has already terminated (R1.3.4). The handle
2915    /// passed in is still released by the caller's binding-side intern path
2916    /// — no implicit retain is consumed when the call short-circuits.
2917    ///
2918    /// # Panics
2919    ///
2920    /// Panics if `node_id` is not a state node, or if `new_handle` is
2921    /// [`NO_HANDLE`] (per R1.2.4, sentinel is not a valid DATA payload).
2922    pub fn emit(&self, node_id: NodeId, new_handle: HandleId) {
2923        assert!(
2924            new_handle != NO_HANDLE,
2925            "NO_HANDLE is not a valid DATA payload (R1.2.4)"
2926        );
2927        // Validate + terminal short-circuit under a brief lock.
2928        //
2929        // emit() is valid for State and Producer nodes — both are
2930        // intrinsic sources whose values are not derived from declared
2931        // deps. State nodes get emit() from user code; Producer nodes
2932        // get emit() from sink callbacks the producer's build closure
2933        // registered (sink fires → re-enter Core → emit on self).
2934        // Derived / Dynamic / Operator nodes emit via their fn return
2935        // value through fire_fn / fire_operator, NOT via emit().
2936        {
2937            let s = self.lock_state();
2938            let rec = s.require_node(node_id);
2939            assert!(
2940                rec.is_state() || rec.is_producer(),
2941                "emit() is for state or producer nodes only; \
2942                 derived/dynamic/operator emit via their fn return value"
2943            );
2944            if rec.terminal.is_some() {
2945                drop(s);
2946                // Caller's intern share would otherwise leak; cache slot
2947                // ownership doesn't transfer because we're not advancing
2948                // cache. Released lock-released so the binding can't
2949                // deadlock against an internal binding mutex.
2950                self.binding.release_handle(new_handle);
2951                return;
2952            }
2953        }
2954        // Run wave on `node_id`'s touched partitions. Slice Y1 / Phase E:
2955        // emit cascades only via `s.children`, all unioned with `node_id`'s
2956        // partition by construction (dep edges = union edges). Common case
2957        // is a single-partition acquire — disjoint-partition emits run
2958        // truly parallel under per-partition `wave_owner`.
2959        self.run_wave_for(node_id, |this| {
2960            this.commit_emission(node_id, new_handle);
2961        });
2962    }
2963
2964    /// Read a node's current cache. Returns [`NO_HANDLE`] if sentinel.
2965    #[must_use]
2966    pub fn cache_of(&self, node_id: NodeId) -> HandleId {
2967        self.lock_state().require_node(node_id).cache
2968    }
2969
2970    /// Whether the node's fn has fired at least once (compute) OR it has had
2971    /// a non-sentinel value (state).
2972    #[must_use]
2973    pub fn has_fired_once(&self, node_id: NodeId) -> bool {
2974        self.lock_state().require_node(node_id).has_fired_once
2975    }
2976
2977    // -------------------------------------------------------------------
2978    // Read-side inspection helpers (Slice E+, M2)
2979    //
2980    // Non-panicking accessors for graph-layer introspection (`describe()`,
2981    // `observe()`, `node_count()`). All five return Option/empty for
2982    // unknown ids — they're meant to back walks over `node_ids()` where
2983    // the caller already knows the ids are valid, plus debugging /
2984    // dry-run probes that prefer "absence" over "panic".
2985    //
2986    // Keep these strictly read-only: no wave entry, no binding callbacks,
2987    // no lock release. Each takes the state lock once, copies a small
2988    // value, and drops the lock.
2989    // -------------------------------------------------------------------
2990
2991    /// Snapshot of every registered `NodeId` in unspecified order. The
2992    /// order matches `HashMap` iteration over the internal node table —
2993    /// callers that need stable ordering should track names at the
2994    /// `Graph` layer (canonical spec §3.5 namespace).
2995    #[must_use]
2996    pub fn node_ids(&self) -> Vec<NodeId> {
2997        self.lock_state().nodes.keys().copied().collect()
2998    }
2999
3000    /// Total number of nodes registered in this Core.
3001    #[must_use]
3002    pub fn node_count(&self) -> usize {
3003        self.lock_state().nodes.len()
3004    }
3005
3006    /// Returns `Some(kind)` for known nodes, `None` for unknown. Kind is
3007    /// **derived** from the field shape per D030 — see [`NodeKind`].
3008    #[must_use]
3009    pub fn kind_of(&self, node_id: NodeId) -> Option<NodeKind> {
3010        self.lock_state().nodes.get(&node_id).map(NodeRecord::kind)
3011    }
3012
3013    /// Snapshot of the node's deps in declaration order. Empty for
3014    /// unknown nodes or for state nodes (which have no deps).
3015    #[must_use]
3016    pub fn deps_of(&self, node_id: NodeId) -> Vec<NodeId> {
3017        self.lock_state()
3018            .nodes
3019            .get(&node_id)
3020            .map(NodeRecord::dep_ids_vec)
3021            .unwrap_or_default()
3022    }
3023
3024    /// Returns `Some(kind)` if the node has terminated (R1.3.4) — the
3025    /// pair `Some(Complete)` / `Some(Error(h))` mirrors the wire message
3026    /// the node emitted. `None` for live nodes or unknown ids.
3027    #[must_use]
3028    pub fn is_terminal(&self, node_id: NodeId) -> Option<TerminalKind> {
3029        self.lock_state()
3030            .nodes
3031            .get(&node_id)
3032            .and_then(|r| r.terminal)
3033    }
3034
3035    /// Whether the node has wave-scoped DIRTY pending (a tier-1 message
3036    /// queued but the matching tier-3 settle has not yet flushed).
3037    /// `false` for unknown ids. Mostly useful for `describe()` status
3038    /// classification (R3.6.1 `"dirty"`).
3039    #[must_use]
3040    pub fn is_dirty(&self, node_id: NodeId) -> bool {
3041        self.lock_state()
3042            .nodes
3043            .get(&node_id)
3044            .is_some_and(|r| r.dirty)
3045    }
3046
3047    /// Snapshot of `parent`'s meta companion list (R1.3.9.d / R2.3.3 —
3048    /// the companions added via [`Self::add_meta_companion`]). Empty
3049    /// for unknown ids or for nodes with no companions registered.
3050    ///
3051    /// Used by the graph layer's `signal_invalidate` to filter meta
3052    /// children out of the broadcast (canonical R3.7.2 — meta caches
3053    /// are preserved across graph-wide INVALIDATE).
3054    #[must_use]
3055    pub fn meta_companions_of(&self, parent: NodeId) -> Vec<NodeId> {
3056        self.lock_state()
3057            .nodes
3058            .get(&parent)
3059            .map(|r| r.meta_companions.clone())
3060            .unwrap_or_default()
3061    }
3062
3063    // -------------------------------------------------------------------
3064    // Wave engine — lives in `crate::batch` (Slice C-1 module split;
3065    // Slice A close M1 refactor lifted the binding-callback re-entrance
3066    // restrictions). The methods are still on `Core`; see `batch.rs` for:
3067    //
3068    //   - `run_wave` — wave entry, manages own locking.
3069    //   - `drain_and_flush` — drain phase, lock-released around invoke_fn.
3070    //   - `commit_emission` — lock-released around custom_equals.
3071    //   - `pick_next_fire`, `deliver_data_to_consumer`, `queue_notify`,
3072    //     `flush_notifications` — wave-engine helpers.
3073    // -------------------------------------------------------------------
3074}
3075
3076// -----------------------------------------------------------------------
3077// COMPLETE / ERROR — terminal lifecycle + auto-cascade gating
3078// -----------------------------------------------------------------------
3079
3080impl Core {
3081    /// Emit `[COMPLETE]` (R1.3.4) on `node_id`, marking it terminal. After
3082    /// this call:
3083    ///
3084    /// - Subsequent `Core::emit` on this node is a silent no-op (idempotent
3085    ///   termination).
3086    /// - The node's fn no longer fires.
3087    /// - The node's cache is preserved (last value still observable via
3088    ///   `cache_of`).
3089    /// - Children receive `[COMPLETE]` (tier 5 — bypasses pause buffer).
3090    /// - Auto-cascade gating (Lock 2.B): each child that has all of its
3091    ///   deps in a terminal state auto-emits its own `[COMPLETE]`. ERROR
3092    ///   dominates COMPLETE — if any of a child's deps emitted ERROR, the
3093    ///   child auto-cascades that ERROR instead.
3094    ///
3095    /// Idempotent: calling `complete` on an already-terminal node is a no-op.
3096    ///
3097    /// # Panics
3098    ///
3099    /// Panics if `node_id` is unknown.
3100    pub fn complete(&self, node_id: NodeId) {
3101        self.emit_terminal(node_id, TerminalKind::Complete);
3102    }
3103
3104    /// Emit `[ERROR, error_handle]` (R1.3.4) on `node_id`. `error_handle`
3105    /// must resolve to a non-sentinel value (R1.2.5) — the binding side has
3106    /// already interned the error value before this call. Same lifecycle
3107    /// effects as [`Self::complete`]; ERROR dominates COMPLETE in auto-
3108    /// cascade gating.
3109    ///
3110    /// # Panics
3111    ///
3112    /// Panics if `node_id` is unknown or `error_handle == NO_HANDLE`.
3113    pub fn error(&self, node_id: NodeId, error_handle: HandleId) {
3114        assert!(
3115            error_handle != NO_HANDLE,
3116            "NO_HANDLE is not a valid ERROR payload (R1.2.5)"
3117        );
3118        self.emit_terminal(node_id, TerminalKind::Error(error_handle));
3119        // The caller's intern share for `error_handle` is NOT transferred
3120        // to any slot — `terminate_node` takes its OWN retain for every
3121        // populated `terminal` and `dep_terminals` slot. Release the
3122        // caller's share here (mirrors `Core::emit`'s short-circuit
3123        // release on terminal). Without this, every `error()` call leaks
3124        // one binding-side handle ref. Slice A-bigger /qa item D fix.
3125        self.binding.release_handle(error_handle);
3126    }
3127
3128    fn emit_terminal(&self, node_id: NodeId, terminal: TerminalKind) {
3129        {
3130            let s = self.lock_state();
3131            assert!(s.nodes.contains_key(&node_id), "unknown node {node_id:?}");
3132        }
3133        // Wave on `node_id`'s touched partitions (Slice Y1 / Phase E).
3134        // COMPLETE / ERROR cascade follows `s.children` (in-partition by
3135        // union-find construction). The thunk acquires its own state lock
3136        // to queue the cascade.
3137        self.run_wave_for(node_id, |this| {
3138            let mut s = this.lock_state();
3139            this.terminate_node(&mut s, node_id, terminal);
3140        });
3141    }
3142
3143    /// Set the node's terminal slot, queue the wire message, and cascade to
3144    /// children. Idempotent on already-terminal node (no-op).
3145    ///
3146    /// Iterative implementation (Slice A-bigger, M1-close): a work-queue
3147    /// drives the cascade so deep linear chains don't overflow the OS
3148    /// thread stack. Mirrors `path_from_to`'s explicit-stack pattern.
    fn terminate_node(&self, s: &mut CoreState, node_id: NodeId, terminal: TerminalKind) {
        // LIFO work list of (node, terminal to apply). Seeded with the
        // caller's node; children whose deps have all terminated are pushed
        // back onto this list (ChildAction::Cascade below) instead of being
        // handled by recursion.
        let mut work: Vec<(NodeId, TerminalKind)> = vec![(node_id, terminal)];
        while let Some((id, t)) = work.pop() {
            if s.require_node(id).terminal.is_some() {
                continue; // Idempotent — already terminal.
            }
            // Take a refcount share for the terminal slot so the error
            // handle outlives the binding-side intern's transient share.
            if let TerminalKind::Error(h) = t {
                self.binding.retain_handle(h);
            }
            // Slice E2 /qa Q2(b) (D069): if a resubscribable node is
            // terminating with no live subscribers, queue eager
            // `wipe_ctx` for the wave's lock-released drain. This is the
            // mutually-exclusive complement of the `Subscription::Drop`
            // wipe site: when the LAST sub drops first then terminate
            // fires, subs are empty here and we queue; when terminate
            // fires WITH subs still alive, we DON'T queue (subs not
            // empty), and `Subscription::Drop` will fire wipe directly
            // when those subs eventually drop. Either way, exactly one
            // wipe fires per terminal lifecycle.
            let queue_wipe = {
                let rec = s.require_node(id);
                rec.resubscribable && rec.subscribers.is_empty()
            };
            s.require_node_mut(id).terminal = Some(t);
            if queue_wipe {
                s.pending_wipes.push(id);
            }
            // Drain pending fires for this node — fn won't fire on a
            // terminal node.
            s.pending_fires.remove(&id);
            // R1.3.8.b / Slice F (A3, 2026-05-07): if this node was paused
            // when terminating (the canonical case is the R1.3.8.c overflow
            // ERROR synthesis path), drain the pause buffer and release
            // each payload's queue_notify-time retain. Without this, the
            // buffer leaks one share per buffered DATA/RESOLVED/INVALIDATE.
            // Subscribers receive the terminal directly via the cascade
            // below (tier-5 bypasses the pause buffer); the buffered
            // content is moot post-terminal.
            let drained: Vec<HandleId> = {
                let rec = s.require_node_mut(id);
                let mut drained: Vec<HandleId> = Vec::new();
                if rec.pause_state.is_paused() {
                    // Take the buffered messages out, then collapse the
                    // pause state to Active so subsequent code observes a
                    // clean lifecycle. Idempotent on Active (no-op).
                    let prev = std::mem::replace(&mut rec.pause_state, PauseState::Active);
                    if let PauseState::Paused { buffer, .. } = prev {
                        drained.extend(buffer.into_iter().filter_map(Message::payload_handle));
                    }
                }
                // QA A4 (2026-05-07): drain replay buffer on terminate. A
                // non-resubscribable terminal ends the lifecycle; without
                // this drain the buffer's retains leak until `Drop for
                // CoreState`. Resubscribable nodes' replay buffers are
                // also drained (when they're hit by a terminal cascade);
                // a fresh subscribe rebuilds the buffer from scratch as
                // part of `reset_for_fresh_lifecycle`.
                drained.extend(rec.replay_buffer.drain(..));
                drained
            };
            // Release outside the node borrow: one release per handle
            // collected from the pause buffer and the replay buffer.
            for h in drained {
                self.binding.release_handle(h);
            }
            // Queue the wire message (tier 5 — bypasses pause buffer).
            let msg = match t {
                TerminalKind::Complete => Message::Complete,
                TerminalKind::Error(h) => Message::Error(h),
            };
            self.queue_notify(s, id, msg);
            // Cascade to children. The child ids are snapshotted into a Vec
            // so `s` can be borrowed mutably inside the loop.
            let child_ids: Vec<NodeId> = s
                .children
                .get(&id)
                .map(|c| c.iter().copied().collect())
                .unwrap_or_default();
            for child_id in child_ids {
                let dep_idx = s.require_node(child_id).dep_index_of(id);
                // Skip a child for which `dep_index_of` finds no slot for `id`.
                let Some(idx) = dep_idx else { continue };
                // Mark this child's per-dep terminal slot. Take a retain on
                // the error handle for the slot share.
                {
                    let child = s.require_node_mut(child_id);
                    if child.dep_records[idx].terminal.is_some() {
                        // Idempotent — child already saw this dep terminate.
                        continue;
                    }
                    child.dep_records[idx].terminal = Some(t);
                }
                if let TerminalKind::Error(h) = t {
                    self.binding.retain_handle(h);
                }
                // Auto-cascade gating: if all deps now terminal, push child
                // onto the work queue with the chosen terminal.
                //
                // Slice C-1: kinds that opt out of Lock 2.B (currently
                // `Operator(Reduce)`) intercept upstream COMPLETE so they
                // can emit their accumulator before terminating. Instead of
                // cascading, queue the child for fn-fire — `fire_operator`
                // sees `dep_records[0].terminal` set and emits the
                // appropriate batch (Data(acc) + Complete on COMPLETE,
                // Error(h) on ERROR).
                let action = {
                    let child = s.require_node(child_id);
                    if child.terminal.is_some() {
                        ChildAction::None // Already terminated — no-op.
                    } else if child.all_deps_terminal() {
                        if child.skips_auto_cascade() {
                            ChildAction::QueueFire
                        } else {
                            ChildAction::Cascade(pick_cascade_terminal(&child.dep_records))
                        }
                    } else {
                        ChildAction::None
                    }
                };
                match action {
                    ChildAction::None => {}
                    ChildAction::Cascade(t_child) => {
                        // Handled on a later iteration of the outer loop;
                        // LIFO popping makes the walk depth-first.
                        work.push((child_id, t_child));
                    }
                    ChildAction::QueueFire => {
                        // Only records `pending_fires` membership here; the
                        // fn-fire itself happens when that set is drained.
                        s.pending_fires.insert(child_id);
                    }
                }
            }
        }
    }
3278}
3279
3280/// Outcome of Lock 2.B child gating in `terminate_node`'s cascade walk.
3281enum ChildAction {
3282    /// No cascade; child is already terminal or not yet all-deps-terminal.
3283    None,
3284    /// Auto-cascade with the picked terminal kind (ERROR dominates COMPLETE).
3285    Cascade(TerminalKind),
3286    /// Queue child for fn-fire instead of cascading. Used by operator
3287    /// kinds that intercept upstream terminal (Operator(Reduce)).
3288    QueueFire,
3289}
3290
3291/// Lock 2.B cascade-terminal selection: ERROR dominates COMPLETE; first
3292/// ERROR seen wins. Caller has already verified all deps are terminal.
3293fn pick_cascade_terminal(dep_records: &[DepRecord]) -> TerminalKind {
3294    for dr in dep_records {
3295        if let Some(TerminalKind::Error(h)) = dr.terminal {
3296            return TerminalKind::Error(h);
3297        }
3298    }
3299    TerminalKind::Complete
3300}
3301
3302// -----------------------------------------------------------------------
3303// TEARDOWN — destruction, with auto-COMPLETE prepend (R2.6.4 / Lock 6.F)
3304// -----------------------------------------------------------------------
3305
3306impl Core {
3307    /// Tear `node_id` down. Per R2.6.4 / Lock 6.F:
3308    ///
3309    /// - **Auto-prepend COMPLETE.** If the node has not yet emitted a
3310    ///   terminal (`COMPLETE` / `ERROR`), `terminate_node` is called with
3311    ///   `Complete` first so subscribers see `[COMPLETE, TEARDOWN]`, not
3312    ///   bare `[TEARDOWN]`. This guarantees a clean end-of-stream signal
3313    ///   to async iterators and other consumers that wait on terminal
3314    ///   delivery.
3315    /// - **Idempotent on duplicate delivery.** The per-node
3316    ///   `has_received_teardown` flag is set on the first call; subsequent
3317    ///   `teardown` calls (or cascade visits from other paths) are silent
3318    ///   no-ops — no second `[COMPLETE, TEARDOWN]` pair to subscribers.
3319    /// - **Cascade downstream.** Each child is recursively torn down. The
3320    ///   child's own COMPLETE auto-cascades from `terminate_node`'s logic
3321    ///   (Lock 2.B); its TEARDOWN comes from this cascade.
3322    ///
3323    /// # Panics
3324    ///
3325    /// Panics if `node_id` is unknown.
3326    pub fn teardown(&self, node_id: NodeId) {
3327        {
3328            let s = self.lock_state();
3329            assert!(s.nodes.contains_key(&node_id), "unknown node {node_id:?}");
3330        }
3331        let torn_down: Arc<Mutex<Vec<NodeId>>> = Arc::new(Mutex::new(Vec::new()));
3332        let torn_down_for_wave = torn_down.clone();
3333        // TEARDOWN cascade follows `s.children` AND `meta_companions`
3334        // (R1.3.9.d) — meta-companions can cross partitions. Slice Y1 /
3335        // Phase E `compute_touched_partitions(node_id)` (called by
3336        // `run_wave_for`) walks both axes so the wave acquires every
3337        // partition reachable via the cascade.
3338        self.run_wave_for(node_id, move |this| {
3339            let mut s = this.lock_state();
3340            let collected = this.teardown_inner(&mut s, node_id);
3341            torn_down_for_wave.lock().extend(collected);
3342        });
3343        // Fire NodeTornDown for every cascaded id (root + metas +
3344        // downstream consumers that auto-cascaded). Outside the state
3345        // lock, matching fire_topology_event discipline.
3346        let ids = std::mem::take(&mut *torn_down.lock());
3347        for id in ids {
3348            self.fire_topology_event(&crate::topology::TopologyEvent::NodeTornDown(id));
3349        }
3350    }
3351
3352    /// Iterative teardown walk (Slice A-bigger, M1-close).
3353    ///
3354    /// The recursive shape was:
3355    ///   ```text
3356    ///   teardown(n):
3357    ///     if torn_down: return
3358    ///     mark torn_down
3359    ///     for meta in metas: teardown(meta)
3360    ///     terminate_node + queue Teardown
3361    ///     for child in children: teardown(child)
3362    ///   ```
3363    /// Deep linear chains (~10k nodes) overflowed the OS thread stack.
3364    ///
3365    /// The iterative shape uses a `Vec<Action>` stack with `Visit` and
3366    /// `EmitTeardown` actions. `Visit(n)` marks `n` torn-down (or no-ops
3367    /// if already), then pushes (in reverse order so LIFO pops in forward
3368    /// order) `Visit(child_K), …, Visit(child_1), EmitTeardown(n),
3369    /// Visit(meta_M), …, Visit(meta_1)`. The R1.3.9.d "metas first, then
3370    /// self, then children" ordering is preserved by the push order:
3371    /// metas pop first, recursively expand and emit; then `EmitTeardown(n)`
3372    /// pops and runs `terminate_node` + queue `Teardown`; then children
3373    /// pop. Idempotency via `has_received_teardown` keeps each node
3374    /// visited at most once even when multi-parent diamonds re-enter via
3375    /// a sibling path.
3376    fn teardown_inner(&self, s: &mut CoreState, root: NodeId) -> Vec<NodeId> {
3377        enum Action {
3378            Visit(NodeId),
3379            EmitTeardown(NodeId),
3380        }
3381        let mut stack: Vec<Action> = vec![Action::Visit(root)];
3382        // Topology accumulator: every node that actually emits TEARDOWN
3383        // (i.e. each `EmitTeardown(id)` site, NOT each `Visit` — visits
3384        // for already-torn-down nodes short-circuit on idempotency).
3385        let mut torn_down: Vec<NodeId> = Vec::new();
3386        while let Some(action) = stack.pop() {
3387            match action {
3388                Action::Visit(id) => {
3389                    if s.require_node(id).has_received_teardown {
3390                        continue; // Idempotent (R2.6.4).
3391                    }
3392                    s.require_node_mut(id).has_received_teardown = true;
3393                    // Push order: children first (pop LAST), then
3394                    // EmitTeardown(id), then metas (pop FIRST). Reverse
3395                    // each list so within-group order matches the original
3396                    // recursive iteration.
3397                    let children: Vec<NodeId> = s
3398                        .children
3399                        .get(&id)
3400                        .map(|c| c.iter().copied().collect())
3401                        .unwrap_or_default();
3402                    for &child in children.iter().rev() {
3403                        stack.push(Action::Visit(child));
3404                    }
3405                    stack.push(Action::EmitTeardown(id));
3406                    let metas: Vec<NodeId> = s.require_node(id).meta_companions.clone();
3407                    for &meta in metas.iter().rev() {
3408                        stack.push(Action::Visit(meta));
3409                    }
3410                }
3411                Action::EmitTeardown(id) => {
3412                    // Auto-prepend COMPLETE if not yet terminal. The (now
3413                    // iterative) terminate_node handles auto-cascade to
3414                    // children's own terminal slots per Lock 2.B.
3415                    let already_terminal = s.require_node(id).terminal.is_some();
3416                    if !already_terminal {
3417                        self.terminate_node(s, id, TerminalKind::Complete);
3418                    }
3419                    // Wire emission of the TEARDOWN itself (tier 6).
3420                    self.queue_notify(s, id, Message::Teardown);
3421                    torn_down.push(id);
3422                }
3423            }
3424        }
3425        torn_down
3426    }
3427
3428    /// Attach `companion` as a meta companion of `parent` per R1.3.9.d.
3429    /// Meta companions are nodes whose lifecycle is bound to the parent's
3430    /// in TEARDOWN ordering: when `parent` tears down, `companion` tears
3431    /// down first.
3432    ///
3433    /// Use this for inspection / audit / sidecar nodes that subscribe to
3434    /// parent state — without the ordering, the companion could observe
3435    /// the parent mid-destruction and emit garbage.
3436    ///
3437    /// Idempotent on duplicate registration of the same companion.
3438    ///
3439    /// # Lifecycle constraint
3440    ///
3441    /// Intended for **setup-time** wiring — call this before `parent` or
3442    /// `companion` enters a wave. Mid-wave registration (especially during
3443    /// a teardown cascade in flight) is implementation-defined: the new
3444    /// edge takes effect on the *next* wave. Adding a companion to a
3445    /// torn-down parent silently no-ops (the parent will not tear down
3446    /// again). For dynamic companion attachment with deterministic
3447    /// ordering, prefer constructing the wiring before subscribers exist.
3448    ///
3449    /// # Panics
3450    ///
3451    /// Panics if either node id is unknown, or if `parent == companion`
3452    /// (a node cannot be its own meta companion — would loop on TEARDOWN).
3453    pub fn add_meta_companion(&self, parent: NodeId, companion: NodeId) {
3454        assert!(parent != companion, "node cannot be its own meta companion");
3455        let mut s = self.lock_state();
3456        assert!(s.nodes.contains_key(&parent), "unknown parent {parent:?}");
3457        assert!(
3458            s.nodes.contains_key(&companion),
3459            "unknown companion {companion:?}"
3460        );
3461        let metas = &mut s.require_node_mut(parent).meta_companions;
3462        if !metas.contains(&companion) {
3463            metas.push(companion);
3464        }
3465    }
3466}
3467
3468// -----------------------------------------------------------------------
3469// INVALIDATE — cache clear + downstream cascade
3470// -----------------------------------------------------------------------
3471
3472impl Core {
3473    /// Clear `node_id`'s cache and cascade `[INVALIDATE]` to downstream
3474    /// dependents per canonical spec §1.4.
3475    ///
3476    /// Semantics:
3477    /// - **Never-populated case (R1.4 line 197):** if `cache == NO_HANDLE`,
3478    ///   the call is a no-op — no cache to clear, no INVALIDATE emitted.
3479    ///   This naturally provides idempotency within a wave: once a node has
3480    ///   been invalidated this wave (cache = NO_HANDLE), a second invalidate
3481    ///   on the same node does nothing.
3482    /// - **Cache clear (immediate):** the node's cached handle is dropped
3483    ///   (refcount released), `cache` becomes `NO_HANDLE`. State nodes
3484    ///   keep `has_fired_once` per spec — INVALIDATE is not a re-gating
3485    ///   event (the next emission to a previously-fired state still does
3486    ///   not re-trigger the first-run gate; that's a resubscribable-terminal
3487    ///   lifecycle concern, separate slice).
3488    /// - **Wire emission (tier 4):** `[INVALIDATE]` is queued via the
3489    ///   normal pause-aware notify path. Buffers while paused, flushes
3490    ///   immediately otherwise.
3491    /// - **Downstream cascade:** for each child of this node, the child's
3492    ///   `dep_handles[idx_of_node]` is reset to `NO_HANDLE` (its previous
3493    ///   value referenced a now-released handle). The child is then
3494    ///   recursively invalidated (no-op if its cache was already
3495    ///   `NO_HANDLE`). This re-closes the child's first-run gate — fn
3496    ///   won't fire again until the upstream re-emits a value.
3497    ///
3498    /// Wraps in a fresh wave when called from outside a wave, so
3499    /// notifications flush at the natural wave boundary.
3500    ///
3501    /// # Panics
3502    ///
3503    /// Panics if `node_id` is unknown, consistent with `emit` / `pause`.
3504    pub fn invalidate(&self, node_id: NodeId) {
3505        {
3506            let s = self.lock_state();
3507            assert!(s.nodes.contains_key(&node_id), "unknown node {node_id:?}");
3508        }
3509        // INVALIDATE cascade follows `s.children` (in-partition by union-
3510        // find construction). Slice Y1 / Phase E.
3511        self.run_wave_for(node_id, |this| {
3512            let mut s = this.lock_state();
3513            this.invalidate_inner(&mut s, node_id);
3514        });
3515    }
3516
3517    /// Iterative invalidate cascade (Slice A-bigger, M1-close).
3518    ///
3519    /// The recursive shape was a depth-first cache-clear walk:
3520    ///   ```text
3521    ///   invalidate(n):
3522    ///     if cache(n) == NO_HANDLE: return  // already-invalidated guard
3523    ///     cache(n) = NO_HANDLE; release handle
3524    ///     queue Invalidate(n)
3525    ///     for child in children:
3526    ///       child.dep_handles[idx] = NO_HANDLE
3527    ///       invalidate(child)
3528    ///   ```
3529    /// Deep linear chains overflowed the OS thread stack. The work-queue
3530    /// rewrite has no ordering subtleties (unlike teardown's R1.3.9.d
3531    /// metas-first constraint) — Invalidate is a tier-4 broadcast where
3532    /// the never-populated / already-invalidated guard provides natural
3533    /// idempotency for diamond fan-in.
3534    fn invalidate_inner(&self, s: &mut CoreState, root: NodeId) {
3535        let mut work: Vec<NodeId> = vec![root];
3536        while let Some(node_id) = work.pop() {
3537            // Never-populated / already-invalidated: no-op (R1.4 idempotency).
3538            // Per R1.3.9.c never-populated case, OnInvalidate cleanup hook
3539            // also does NOT fire — natural fallout of skipping via the
3540            // cache==NO_HANDLE guard (we never reach the queue-push below).
3541            let old_handle = s.require_node(node_id).cache;
3542            if old_handle == NO_HANDLE {
3543                continue;
3544            }
3545            // Clear cache + release the handle's slot ownership.
3546            s.require_node_mut(node_id).cache = NO_HANDLE;
3547            self.binding.release_handle(old_handle);
3548            // Slice E2 (R1.3.9.b strict per D057 + D058 fire-at-cache-clear):
3549            // queue OnInvalidate cleanup hook for lock-released drain at
3550            // wave-end. The dedup set guarantees at-most-once-per-wave-per-
3551            // node firing even if a node re-populates mid-wave (via fn-fire
3552            // emit) and gets re-invalidated through a separate path. Pure
3553            // cache==NO_HANDLE idempotency (above) catches "still at
3554            // sentinel" only; the explicit set is the strict R1.3.9.b
3555            // reading.
3556            if s.invalidate_hooks_fired_this_wave.insert(node_id) {
3557                s.deferred_cleanup_hooks
3558                    .push((node_id, CleanupTrigger::OnInvalidate));
3559            }
3560            // Wire emission. Pause-aware via queue_notify.
3561            self.queue_notify(s, node_id, Message::Invalidate);
3562            // Cascade: for each child, clear the dep record's prev_data
3563            // referencing this node and push child onto the work queue.
3564            let child_ids: Vec<NodeId> = s
3565                .children
3566                .get(&node_id)
3567                .map(|c| c.iter().copied().collect())
3568                .unwrap_or_default();
3569            for child_id in child_ids {
3570                let dep_idx = s.require_node(child_id).dep_index_of(node_id);
3571                if let Some(idx) = dep_idx {
3572                    // Reset the child's dep record — the handle was just
3573                    // released. Subsequent first-run-gate checks see
3574                    // sentinel and re-close.
3575                    //
3576                    // Snapshot prev_data + data_batch retains for deferred
3577                    // release, then clear the record. Two-phase to satisfy
3578                    // the borrow checker (nodes + deferred_handle_releases
3579                    // are separate CoreState fields).
3580                    let (old_prev, batch_hs): (HandleId, SmallVec<[HandleId; 1]>) = {
3581                        let dr = &s.require_node(child_id).dep_records[idx];
3582                        (dr.prev_data, dr.data_batch.clone())
3583                    };
3584                    if old_prev != NO_HANDLE {
3585                        s.deferred_handle_releases.push(old_prev);
3586                    }
3587                    for h in batch_hs {
3588                        s.deferred_handle_releases.push(h);
3589                    }
3590                    let dr = &mut s.require_node_mut(child_id).dep_records[idx];
3591                    dr.prev_data = NO_HANDLE;
3592                    dr.data_batch.clear();
3593                    work.push(child_id);
3594                }
3595            }
3596        }
3597    }
3598}
3599
3600// -----------------------------------------------------------------------
3601// PAUSE / RESUME — multi-pauser lockset + replay buffer
3602// -----------------------------------------------------------------------
3603
/// Summary handed back by [`Core::resume`] once the last pause lock on a
/// node has been released.
///
/// `replayed` counts the tier-3/tier-4 messages delivered to subscribers
/// while draining the pause buffer. `dropped` counts the messages evicted
/// from the front of that buffer during this pause cycle because the
/// Core-global `pause_buffer_cap` was exceeded. A non-zero `dropped` means
/// some controller held its lock long enough to overflow the cap, which a
/// binding may want to report as a warning or an error.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct ResumeReport {
    /// Buffered messages dispatched to subscribers on the final resume.
    pub replayed: u32,
    /// Messages lost to `pause_buffer_cap` overflow during the pause cycle.
    pub dropped: u32,
}
3617
3618impl Core {
3619    /// Acquire a pause lock on `node_id`. The first lock transitions the
3620    /// node from `Active` to `Paused`; further locks add to the lockset.
3621    /// While paused, tier-3 (DATA/RESOLVED) and tier-4 (INVALIDATE) outgoing
3622    /// messages buffer in the node's pause buffer; other tiers flush
3623    /// immediately.
3624    ///
3625    /// Re-acquiring the same `lock_id` is an idempotent no-op (matches TS
3626    /// convention, R1.2.6 silent on the case).
3627    pub fn pause(&self, node_id: NodeId, lock_id: LockId) -> Result<(), PauseError> {
3628        let mut s = self.lock_state();
3629        let rec = s
3630            .nodes
3631            .get_mut(&node_id)
3632            .ok_or(PauseError::UnknownNode(node_id))?;
3633        // QA A5 (2026-05-07): terminated nodes can't be re-paused. Without
3634        // this check, a stale pause-controller calling pause() on an
3635        // already-terminated node would re-arm `pause_state` to Paused.
3636        // The terminate_node path collapses pause_state → Active and
3637        // drains the buffer (A3-related), but doesn't gate subsequent
3638        // pause() calls. Treat as idempotent no-op (consistent with how
3639        // emit/complete/error early-return on terminal).
3640        if rec.terminal.is_some() {
3641            return Ok(());
3642        }
3643        // Slice F audit close (2026-05-07): `PausableMode::Off` means the
3644        // dispatcher ignores PAUSE for this node — tier-3 flushes
3645        // immediately, fn fires immediately. Treat the call as a successful
3646        // no-op so callers don't need to special-case.
3647        if rec.pausable == PausableMode::Off {
3648            return Ok(());
3649        }
3650        rec.pause_state.add_lock(lock_id);
3651        Ok(())
3652    }
3653
3654    /// Release a pause lock on `node_id`. If the lockset becomes empty, the
3655    /// node transitions back to `Active` and the buffered messages are
3656    /// dispatched to subscribers in arrival order. Returns a [`ResumeReport`]
3657    /// when the final lock released; `None` if the lockset is still
3658    /// non-empty (further locks held).
3659    ///
3660    /// Releasing an unknown `lock_id` (or releasing on an already-Active
3661    /// node) is an idempotent no-op returning `None`.
3662    pub fn resume(
3663        &self,
3664        node_id: NodeId,
3665        lock_id: LockId,
3666    ) -> Result<Option<ResumeReport>, PauseError> {
3667        // Phase 1 (lock-held): collect drained buffer + pending-wave flag +
3668        // sink Arcs. For default-mode nodes whose `pending_wave` was set
3669        // during pause, schedule a single fn-fire by adding to
3670        // `pending_fires` BEFORE we exit the lock — the wave engine picks
3671        // it up on the next drain tick.
3672        let (sinks, messages, dropped, pending_wave_for_default) = {
3673            let mut s = self.lock_state();
3674            let rec = s
3675                .nodes
3676                .get_mut(&node_id)
3677                .ok_or(PauseError::UnknownNode(node_id))?;
3678            // For Off mode, pause/resume are no-ops by construction.
3679            if rec.pausable == PausableMode::Off {
3680                return Ok(None);
3681            }
3682            let was_default_mode = rec.pausable == PausableMode::Default;
3683            // Capture pending_wave BEFORE remove_lock collapses the state.
3684            let pending_wave = if was_default_mode {
3685                rec.pause_state.take_pending_wave()
3686            } else {
3687                false
3688            };
3689            let Some((buffer, dropped)) = rec.pause_state.remove_lock(lock_id) else {
3690                // Not the final-resume — restore the pending_wave flag we
3691                // tentatively cleared, since we're not transitioning to
3692                // Active yet.
3693                if pending_wave {
3694                    rec.pause_state.mark_pending_wave();
3695                }
3696                return Ok(None);
3697            };
3698            let sinks: Vec<Sink> = rec.subscribers.values().cloned().collect();
3699            let messages: Vec<Message> = buffer.into_iter().collect();
3700            // Default-mode pending-wave handling: schedule the fn-fire so
3701            // the wave engine consolidates the pause-window dep deliveries
3702            // into one fn execution. State nodes don't fire fn (no
3703            // `pending_fires` membership has effect for them).
3704            if pending_wave && was_default_mode {
3705                s.pending_fires.insert(node_id);
3706            }
3707            (sinks, messages, dropped, pending_wave && was_default_mode)
3708        };
3709        let replayed = u32::try_from(messages.len()).unwrap_or(u32::MAX);
3710
3711        // Phase 2 (lock-released): fire sinks for ResumeAll-buffered
3712        // messages. Default-mode resume produces no buffered replay (the
3713        // consolidated fn-fire produces fresh wave traffic via the standard
3714        // commit_emission path).
3715        if !messages.is_empty() {
3716            for sink in &sinks {
3717                sink(&messages);
3718            }
3719            // Phase 3: balance the retain_handle calls done at buffer-push
3720            // time — sinks observe values but don't own refcount shares.
3721            for msg in &messages {
3722                if let Some(h) = msg.payload_handle() {
3723                    self.binding.release_handle(h);
3724                }
3725            }
3726        }
3727
3728        // Phase 4 (default-mode): drain the consolidated fn-fire scheduled
3729        // in Phase 1. `run_wave_for(node_id)` acquires the partitions
3730        // touched from `node_id` (Slice Y1 / Phase E) and runs the standard
3731        // drain pipeline; the new fn-fire emerges as a normal wave's worth
3732        // of messages to subscribers.
3733        if pending_wave_for_default {
3734            self.run_wave_for(node_id, |_this| {
3735                // The pending_fires entry was pushed in Phase 1 under the
3736                // lock. run_wave's drain picks it up.
3737            });
3738        }
3739        Ok(Some(ResumeReport { replayed, dropped }))
3740    }
3741
3742    /// True if the node currently holds at least one pause lock.
3743    #[must_use]
3744    pub fn is_paused(&self, node_id: NodeId) -> bool {
3745        self.state
3746            .lock()
3747            .require_node(node_id)
3748            .pause_state
3749            .is_paused()
3750    }
3751
3752    /// Number of pause locks currently held on `node_id`. `0` if Active.
3753    #[must_use]
3754    pub fn pause_lock_count(&self, node_id: NodeId) -> usize {
3755        self.state
3756            .lock()
3757            .require_node(node_id)
3758            .pause_state
3759            .lock_count()
3760    }
3761
3762    /// Test helper: whether `node_id` currently holds the given `lock_id`.
3763    #[must_use]
3764    pub fn holds_pause_lock(&self, node_id: NodeId, lock_id: LockId) -> bool {
3765        self.state
3766            .lock()
3767            .require_node(node_id)
3768            .pause_state
3769            .contains_lock(lock_id)
3770    }
3771}
3772
3773// -----------------------------------------------------------------------
3774// set_deps — atomic dep mutation
3775// -----------------------------------------------------------------------
3776
3777/// Errors returnable by [`Core::set_deps`].
3778///
3779/// Per `~/src/graphrefly-ts/docs/research/rewire-design-notes.md` and the
3780/// Phase 13.8 Q1 lock:
3781/// - `SelfDependency` — `n in newDeps` (self-loops are pathological without
3782///   explicit fixed-point semantics, which GraphReFly does not provide).
3783/// - `WouldCreateCycle { path }` — adding the new edge would create a cycle.
3784///   The `path` field reports the offending dep chain for debuggability.
3785/// - `UnknownNode` / `NotComputeNode` — invariant violations from the caller.
3786/// - `TerminalNode` — `n` itself has emitted COMPLETE/ERROR; rewiring a
3787///   terminal stream is a category error (terminal is one-shot at this
3788///   layer; recovery is the resubscribable path on a fresh subscribe).
3789/// - `TerminalDep` — a newly-added dep is terminal AND not resubscribable.
3790///   Resubscribable terminal deps are accepted because the subscribe path
3791///   resets their lifecycle. Non-resubscribable terminal deps would deliver
3792///   their already-emitted terminal directly to `n`'s `dep_terminals` slot,
3793///   which is rarely intended.
3794#[derive(Error, Debug, Clone, PartialEq)]
3795pub enum SetDepsError {
3796    /// `n` appeared in `new_deps` (self-loop rejection).
3797    #[error("set_deps({n:?}, ...): self-dependency rejected (n appeared in new_deps)")]
3798    SelfDependency { n: NodeId },
3799
3800    /// Adding the new dep would create a cycle. `path` is the chain
3801    /// `[added_dep, ..., n]` reachable via existing deps.
3802    #[error(
3803        "set_deps({n:?}, ...): cycle would form via path {path:?} \
3804         (adding {added_dep:?} → {n:?} closes the loop)"
3805    )]
3806    WouldCreateCycle {
3807        n: NodeId,
3808        added_dep: NodeId,
3809        path: Vec<NodeId>,
3810    },
3811
3812    #[error("set_deps: unknown node {0:?}")]
3813    UnknownNode(NodeId),
3814
3815    #[error("set_deps: node {0:?} is not a compute node (state nodes have no deps)")]
3816    NotComputeNode(NodeId),
3817
3818    /// `n` itself has terminated (COMPLETE / ERROR). Rewiring a terminal node
3819    /// is rejected — the stream has ended at this layer. To recover, mark
3820    /// the node resubscribable before terminate; a fresh subscribe will then
3821    /// reset its lifecycle.
3822    #[error("set_deps({n:?}, ...): node has already terminated; cannot rewire a terminal node")]
3823    TerminalNode { n: NodeId },
3824
3825    /// A newly-added dep is terminal AND non-resubscribable. Per Phase 13.8
3826    /// Q1, this is rejected; resubscribable terminal deps are allowed
3827    /// because the subscribe path resets them when activated. Already-present
3828    /// terminal deps are unaffected (their terminal status was accepted at
3829    /// the time they terminated).
3830    #[error(
3831        "set_deps({n:?}, ...): added dep {dep:?} is terminal and not resubscribable; \
3832         either mark it resubscribable before terminate, or remove the dep from new_deps"
3833    )]
3834    TerminalDep { n: NodeId, dep: NodeId },
3835
3836    /// `n` itself is currently mid-fire — a user fn for `n` re-entered Core
3837    /// via `set_deps(n, ...)` from inside `n`'s own `invoke_fn` /
3838    /// `project_each` / `predicate_each` / etc. Phase 1 of the dispatcher
3839    /// snapshotted `dep_handles` BEFORE the lock-released callback; the
3840    /// callback returning a `tracked` set indexed against THAT ordering
3841    /// would corrupt indices if the rewire re-orders deps mid-fire.
3842    /// Rejected to preserve the dynamic-tracked-indices invariant (D1).
3843    ///
3844    /// Workaround: schedule the rewire from a different node's fn (via
3845    /// `Core::emit` on a state node and observing the emit downstream),
3846    /// or perform the rewire after the wave completes (e.g. from a sink
3847    /// callback that is itself outside any fn-fire scope).
3848    ///
3849    /// Slice F (2026-05-07) — A6.
3850    #[error(
3851        "set_deps({n:?}, ...): rejected — node {n:?} is currently mid-fire \
3852         (set_deps from inside the firing node's own fn would corrupt the \
3853         Dynamic `tracked` indices snapshot taken before invoke_fn). \
3854         Schedule the rewire outside this fire scope."
3855    )]
3856    ReentrantOnFiringNode { n: NodeId },
3857
3858    /// `set_deps(n, ...)` would trigger a partition migration (union or
3859    /// split in the per-subgraph union-find registry) that affects the
3860    /// partition of a node currently mid-fire on this thread. Distinct
3861    /// from [`Self::ReentrantOnFiringNode`]: that variant rejects
3862    /// `set_deps(n, ...)` where `n` itself is firing; this variant
3863    /// rejects `set_deps(n, ...)` on some OTHER node whose union/split
3864    /// shifts a firing node's partition root mid-wave.
3865    ///
3866    /// Why this matters: Y1's wave engine holds an
3867    /// [`Arc<crate::subgraph::SubgraphLockBox>`] for the firing node's
3868    /// partition for the wave's duration. A union mid-wave swaps the
3869    /// box-identity for one of the two affected partitions; a split
3870    /// (Y1+ post-Phase-F) extracts a fresh box for the orphan side.
3871    /// Either way the held Arc would diverge from the registry's
3872    /// current root for that partition, so the wave would lose
3873    /// serialization against the box's true partition mid-flight.
3874    ///
3875    /// Per [`SESSION-rust-port-d3-per-subgraph-parallelism.md`](https://github.com/graphrefly/graphrefly-ts/blob/main/archive/docs/SESSION-rust-port-d3-per-subgraph-parallelism.md)
3876    /// Q3 = (a-strict): mid-wave migration is rejected at edge-mutation
3877    /// time. If a real consumer surfaces pressure to support mid-wave
3878    /// migration, lift via state-migration logic in a follow-up — but
3879    /// the v1 contract is "the partition a wave runs in cannot change
3880    /// shape mid-flight."
3881    ///
3882    /// `n` is the node whose `set_deps` was rejected; `firing` is the
3883    /// concretely-identified firing node whose partition would be
3884    /// migrated. Workaround: schedule the rewire outside the wave
3885    /// (e.g. emit a state-change that triggers `set_deps` from a sink
3886    /// callback running post-flush).
3887    ///
3888    /// Slice Y1 (D3 / D091, 2026-05-08).
3889    #[error(
3890        "set_deps({n:?}, ...): rejected — would migrate the partition of \
3891         currently-firing node {firing:?} mid-wave (union/split during \
3892         fire would invalidate the held wave_owner Arc). Schedule the \
3893         rewire outside the wave."
3894    )]
3895    PartitionMigrationDuringFire { n: NodeId, firing: NodeId },
3896}
3897
3898impl Core {
3899    /// Atomic dep mutation — change a node's upstream deps without TEARDOWN
3900    /// cascading and without losing cache.
3901    ///
3902    /// Per the TLA+-verified design at
3903    /// `~/src/graphrefly-ts/docs/research/wave_protocol_rewire.tla`
3904    /// (35,950 distinct states, all 7 invariants clean):
3905    ///
3906    /// - Removed deps: clear dirtyMask bit, drain pending queue, drop DepRecord.
3907    /// - Added deps: SENTINEL prevData; push-on-subscribe if added dep has cached DATA.
3908    /// - Preserved: `firstRunPassed`, `pauseLocks`, `pauseBuffer`, `cache` (ROM/RAM).
3909    /// - Status auto-settles if dirtyMask becomes empty.
3910    /// - Idempotent on `new_deps == current deps`.
3911    /// - Self-rewire `n ∈ new_deps` rejected (`SelfDependency`).
3912    /// - Cycles rejected (`WouldCreateCycle`).
3913    /// - Allowed mid-wave + while paused.
3914    /// - Phase 13.8 Q1: terminal `n` rejected (`TerminalNode`); newly-added
3915    ///   terminal non-resubscribable deps rejected (`TerminalDep`).
3916    ///
3917    /// The body is a single atomic dep-mutation transaction with several
3918    /// discrete validation stages. Splitting would require passing a
3919    /// partially-mutable CoreState across helpers, and the transaction's
3920    /// locality is what makes the F1 refcount-leak collection work.
3921    #[allow(clippy::too_many_lines)]
3922    pub fn set_deps(&self, n: NodeId, new_deps: &[NodeId]) -> Result<(), SetDepsError> {
3923        let mut s = self.lock_state();
3924        // Validate node exists and is compute. Read-once via the helper so
3925        // subsequent code can use `require_node(n)` without re-checking.
3926        let (is_state, is_producer, is_terminal) = {
3927            let rec = s.nodes.get(&n).ok_or(SetDepsError::UnknownNode(n))?;
3928            (rec.is_state(), rec.is_producer(), rec.terminal.is_some())
3929        };
3930        if is_state || is_producer {
3931            // State and Producer nodes have no declared deps — set_deps
3932            // is meaningless. Producer nodes manage their own subscriptions
3933            // through the binding's ProducerCtx; mutating their (empty)
3934            // dep set would not affect that.
3935            return Err(SetDepsError::NotComputeNode(n));
3936        }
3937        // Reject if `n` itself is terminal (Phase 13.8 Q1: terminal nodes
3938        // cannot be rewired; recovery is via resubscribable subscribe).
3939        if is_terminal {
3940            return Err(SetDepsError::TerminalNode { n });
3941        }
3942        // A6 reentrancy guard (Slice F, 2026-05-07): reject if `n` is
3943        // currently mid-fire on the wave-owner thread. Closes the D1 hazard
3944        // where `Phase 1` snapshotted `dep_handles` against pre-rewire dep
3945        // ordering and `Phase 3` would store the returned `tracked` indices
3946        // against post-rewire ordering. Same-thread re-entry is the only
3947        // path that matters — cross-thread emits already block on
3948        // `wave_owner` per the M1 design.
3949        if s.currently_firing.contains(&n) {
3950            return Err(SetDepsError::ReentrantOnFiringNode { n });
3951        }
3952        // Self-rewire rejection.
3953        if new_deps.contains(&n) {
3954            return Err(SetDepsError::SelfDependency { n });
3955        }
3956        // Validate all new deps exist.
3957        for &d in new_deps {
3958            if !s.nodes.contains_key(&d) {
3959                return Err(SetDepsError::UnknownNode(d));
3960            }
3961        }
3962        // Cycle detection: data flows parent → child via the `children` map.
3963        // Adding edge `d → n` (d becomes a dep of n) creates a cycle iff
3964        // `d` is already reachable from `n` via existing data-flow edges
3965        // (so `n → ... → d` exists, and the new `d → n` closes the loop).
3966        // DFS from `n` along `children` edges, looking for each added dep.
3967        let current_deps: HashSet<NodeId> = s.require_node(n).dep_ids().collect();
3968        let new_deps_set: HashSet<NodeId> = new_deps.iter().copied().collect();
3969        let added: HashSet<NodeId> = new_deps_set.difference(&current_deps).copied().collect();
3970        for &d in &added {
3971            if let Some(path) = self.path_from_to(&s, n, d) {
3972                return Err(SetDepsError::WouldCreateCycle {
3973                    n,
3974                    added_dep: d,
3975                    path,
3976                });
3977            }
3978        }
3979        // Phase 13.8 Q1: reject newly-added deps that are terminal AND not
3980        // resubscribable. Resubscribable terminal deps are allowed — the
3981        // subscribe path resets their lifecycle when something activates
3982        // them. Already-present (kept) deps are unaffected; their terminal
3983        // status was accepted at the time they terminated.
3984        for &d in &added {
3985            let dep_rec = s.require_node(d);
3986            if dep_rec.terminal.is_some() && !dep_rec.resubscribable {
3987                return Err(SetDepsError::TerminalDep { n, dep: d });
3988            }
3989        }
3990        // Compute `removed` early (Phase F: needs to be available for P13
3991        // split-case widening below). Idempotent fast-path moved below the
3992        // P13 check accordingly.
3993        let removed: HashSet<NodeId> = current_deps.difference(&new_deps_set).copied().collect();
3994
3995        // Slice Y1 (D3 / D091 — P13, 2026-05-08): reject mid-wave set_deps
3996        // that would shift a currently-firing node's partition root.
3997        // Distinct from `ReentrantOnFiringNode` (same-node case, line above).
3998        // Holds the registry lock briefly under the state lock per the
3999        // P12-fix lock-discipline invariant `state lock → registry mutex`.
4000        //
4001        // **Two cases:**
4002        // 1. **Union (Phase D)** — adding a cross-partition dep merges two
4003        //    components. Both pre-merge components are affected (the
4004        //    smaller-rank loser's box is dropped, its members migrate to
4005        //    the winner's root).
4006        // 2. **Split (Phase F, 2026-05-09)** — removing an edge whose
4007        //    removal disconnects the dep graph splits one component into
4008        //    two. The pre-split component is affected (every member
4009        //    re-unions; the orphan side gets a fresh `SubgraphLockBox`
4010        //    while the keep side preserves the original Arc).
4011        //
4012        // Either case migrates the partition root (and box-identity) for
4013        // affected nodes mid-wave; if any node currently firing on this
4014        // thread is in an affected partition, the wave's held
4015        // `Arc<SubgraphLockBox>` would diverge from the registry's new
4016        // canonical box. Q3 = (a-strict) per the D3 design lock rejects
4017        // both cases at edge-mutation time.
4018        if !s.currently_firing.is_empty() && (!added.is_empty() || !removed.is_empty()) {
4019            let mut reg = self.registry.lock();
4020            // Snapshot firing nodes' partitions. `partition_of` is mutating
4021            // (path compression) but partition IDENTITY is stable across
4022            // reads (only `union_nodes` / `split_partition` mutate roots).
4023            let firing_with_partition: Vec<(NodeId, crate::subgraph::SubgraphId)> = s
4024                .currently_firing
4025                .iter()
4026                .filter_map(|&f| reg.partition_of(f).map(|p| (f, p)))
4027                .collect();
4028            if !firing_with_partition.is_empty() {
4029                let part_n = reg.partition_of(n);
4030                // Case 1 (union): for each added dep, check cross-partition merge.
4031                for &added_dep in &added {
4032                    let part_added = reg.partition_of(added_dep);
4033                    if part_n == part_added {
4034                        continue; // same-partition add is a no-op in union-find
4035                    }
4036                    let affected = [part_n, part_added];
4037                    if let Some(&(firing, _)) = firing_with_partition
4038                        .iter()
4039                        .find(|(_, p)| affected.contains(&Some(*p)))
4040                    {
4041                        return Err(SetDepsError::PartitionMigrationDuringFire { n, firing });
4042                    }
4043                }
4044                // Case 2 (split): for each removed dep, simulate undirected
4045                // BFS from `removed_dep` skipping the would-be-removed edge
4046                // (`removed_dep → n`); if `n` is unreachable, removal would
4047                // disconnect — split — affecting all nodes in that
4048                // partition. Since dep edges are within a single partition
4049                // by construction (union-find merges on edge add), every
4050                // node currently in the partition is affected.
4051                for &removed_dep in &removed {
4052                    let part_removed = reg.partition_of(removed_dep);
4053                    let visited = bfs_undirected_dep_graph(&s, removed_dep, Some((removed_dep, n)));
4054                    let would_disconnect = !visited.contains(&n);
4055                    if would_disconnect {
4056                        if let Some(&(firing, _)) = firing_with_partition
4057                            .iter()
4058                            .find(|(_, p)| Some(*p) == part_removed)
4059                        {
4060                            return Err(SetDepsError::PartitionMigrationDuringFire { n, firing });
4061                        }
4062                    }
4063                }
4064            }
4065        }
4066        // Idempotent fast-path. Now safe to short-circuit since the P13
4067        // check above already considered both `added` and `removed`.
4068        if added.is_empty() && removed.is_empty() {
4069            return Ok(());
4070        }
4071
4072        // Snapshot old deps (ordered) for topology event, before mutation.
4073        let old_deps_vec: Vec<NodeId> = s.require_node(n).dep_ids_vec();
4074
4075        // Carry out the rewire atomically.
4076        // 1. Build new dep_records, preserving DepRecord state for kept deps.
4077        let new_deps_vec: Vec<NodeId> = new_deps.to_vec();
4078        //
4079        // Refcount discipline (F1 audit fix): each `Some(TerminalKind::Error(h))`
4080        // slot owns a refcount share retained at `terminate_node` time. When a
4081        // dep is REMOVED, its slot is dropped — the corresponding handle's
4082        // share must be released here, otherwise it leaks until Core drop.
4083        // Also release data_batch retains for removed deps.
4084        let (new_dep_records, removed_handles): (Vec<DepRecord>, Vec<HandleId>) = {
4085            let rec = s.require_node(n);
4086            // Index old dep_records by NodeId for O(1) lookup of kept deps.
4087            let old_by_node: HashMap<NodeId, &DepRecord> =
4088                rec.dep_records.iter().map(|dr| (dr.node, dr)).collect();
4089            let new_records: Vec<DepRecord> = new_deps_vec
4090                .iter()
4091                .map(|&d| {
4092                    if let Some(old) = old_by_node.get(&d) {
4093                        // Kept dep: preserve all state (prev_data, data_batch,
4094                        // terminal, wave flags). Subscriptions stay live.
4095                        DepRecord {
4096                            node: d,
4097                            prev_data: old.prev_data,
4098                            dirty: old.dirty,
4099                            involved_this_wave: old.involved_this_wave,
4100                            data_batch: old.data_batch.clone(),
4101                            terminal: old.terminal,
4102                        }
4103                    } else {
4104                        // Added dep: fresh sentinel record.
4105                        DepRecord::new(d)
4106                    }
4107                })
4108                .collect();
4109            // Collect handles to release from REMOVED dep records.
4110            let mut to_release: Vec<HandleId> = Vec::new();
4111            for d in &removed {
4112                if let Some(old) = old_by_node.get(d) {
4113                    if let Some(TerminalKind::Error(h)) = old.terminal {
4114                        to_release.push(h);
4115                    }
4116                    // Release data_batch retains for removed deps.
4117                    for &h in &old.data_batch {
4118                        to_release.push(h);
4119                    }
4120                }
4121            }
4122            (new_records, to_release)
4123        };
4124        // Clear dirtyMask bit by re-emitting the wave-bookkeeping: we don't
4125        // currently model a per-dep dirtyMask explicitly (we use the boolean
4126        // `dirty` flag at node level). Removing a dep's entry from the implicit
4127        // mask is therefore implicit — by removing the dep, future emissions
4128        // from it can't re-arm the bit. The per-dep `involved_this_wave` flag
4129        // stays wave-scoped and gets cleared at wave end. The setDeps action
4130        // itself does NOT change the dirty boolean unless all deps are cleared;
4131        // in that case we settle.
4132        // Slice E2 (D067): on a dynamic node that had previously fired its
4133        // fn, capture `has_fired_once` BEFORE the reset so we can fire
4134        // `OnRerun` cleanup lock-released after `drop(s)` below. Without
4135        // this, the next `fire_regular` Phase 1 would capture
4136        // `has_fired_once = false`, causing Phase 1.5 to skip OnRerun —
4137        // silently dropping the prior activation's cleanup closure when
4138        // the next `invoke_fn` overwrites `current_cleanup`. Per spec
4139        // R2.4.5, `set_deps` does NOT end the activation cycle
4140        // (subscribe→unsubscribe is the cycle boundary), so OnRerun must
4141        // fire on every re-fire including post-set_deps.
4142        let fire_set_deps_on_rerun;
4143        {
4144            let rec = s.require_node_mut(n);
4145            fire_set_deps_on_rerun = rec.is_dynamic && rec.has_fired_once;
4146            rec.dep_records = new_dep_records;
4147            // Re-derive `tracked` for static derived: all indices.
4148            // For dynamic: clear `tracked` AND reset `has_fired_once` so the
4149            // next dep delivery satisfies the first-fire branch in
4150            // `deliver_data_to_consumer` (`!has_fired_once || tracked.contains(...)`).
4151            // Without resetting `has_fired_once`, the cleared `tracked` blocks
4152            // every future fire — fn never re-runs and the dynamic node sits
4153            // on stale cache derived from the old dep set. The next fire
4154            // re-runs fn unconditionally; fn's returned `tracked` then
4155            // repopulates `rec.tracked` and normal selective-deps semantics
4156            // resume from the next dep update onward.
4157            if rec.is_dynamic {
4158                rec.tracked.clear();
4159                rec.has_fired_once = false;
4160            } else {
4161                // Derived (static) and Operator track all deps.
4162                rec.tracked = (0..new_deps_vec.len()).collect();
4163            }
4164        }
4165
4166        // 2. Update inverted-edge map (children).
4167        for &removed_dep in &removed {
4168            if let Some(set) = s.children.get_mut(&removed_dep) {
4169                set.remove(&n);
4170            }
4171        }
4172        for &added_dep in &added {
4173            s.children.entry(added_dep).or_default().insert(n);
4174        }
4175
4176        // 3. Push-on-subscribe for added deps with cached DATA. Wraps in a
4177        // wave so any downstream propagation runs cleanly. We capture only
4178        // the LIST of added deps (not their cache values) because the cache
4179        // can change between releasing the validation lock and the wave's
4180        // re-acquisition — see the P2 race fix below.
4181        //
4182        // P2 (Slice A close /qa) — between `drop(s)` and `run_wave`'s
4183        // closure re-acquiring the lock, a concurrent thread could
4184        // invalidate one of the added deps, releasing its cache handle. A
4185        // pre-snapshot of `(added_dep, cache)` pairs would then carry a
4186        // dangling HandleId into `deliver_data_to_consumer`. The fix is to
4187        // re-read each added dep's `cache` INSIDE the closure (under the
4188        // freshly re-acquired state lock). The wave-owner re-entrant mutex
4189        // (Q2) blocks concurrent waves once we enter `run_wave`, so the
4190        // re-read sees a coherent post-validation state.
4191        let added_for_wave: Vec<NodeId> = added.iter().copied().collect();
4192        // Slice Y1 (D3 / D090 — P12 fix, 2026-05-08): maintain partition
4193        // membership BEFORE dropping the state lock so the registry can
4194        // never lag behind topology mutations as observed by concurrent
4195        // readers. Lock-order invariant `state lock → registry mutex`
4196        // (one-way; never registry → state) — see the matching block in
4197        // `Core::register` for the full rationale.
4198        //   - For each new edge: union the partitions of `n` and `added_dep`.
4199        //   - For each removed edge (Slice Y1 / Phase F, 2026-05-09):
4200        //     run undirected-dep-graph BFS from `removed_dep` over the
4201        //     POST-removal `s.children` + `dep_records`. If `n` is
4202        //     unreachable, the partition has split — gather the affected
4203        //     component nodes + intra-component edges, then call
4204        //     [`SubgraphRegistry::split_partition`] to migrate the orphan
4205        //     side onto a fresh `SubgraphLockBox`. Mid-fire splits would
4206        //     have been rejected at the P13 check above (Q3 = (a-strict)).
4207        {
4208            let mut reg = self.registry.lock();
4209            for &added_dep in &added {
4210                reg.union_nodes(n, added_dep);
4211            }
4212            for &removed_dep in &removed {
4213                // Post-removal BFS — `s.children[removed_dep]` no longer
4214                // contains `n`, and `s.nodes[n].dep_records` no longer
4215                // contains `removed_dep`. No skip needed.
4216                let visited = bfs_undirected_dep_graph(&s, removed_dep, None);
4217                if visited.contains(&n) {
4218                    // Still connected via other dep edges — no split.
4219                    continue;
4220                }
4221                // Disconnected. `visited` is the keep-side (containing
4222                // `removed_dep`). Identify the original component, the
4223                // intra-component dep edges, and split.
4224                let original_root = reg.find(removed_dep);
4225                // Snapshot keys before iterating — `find` mutates via
4226                // path compression; iterating + mutating concurrently
4227                // would alias-borrow.
4228                let snapshot_keys: Vec<NodeId> = reg.registered_nodes();
4229                let component_nodes: Vec<NodeId> = snapshot_keys
4230                    .into_iter()
4231                    .filter(|&node| reg.find(node) == original_root)
4232                    .collect();
4233                let component_set: HashSet<NodeId> = component_nodes.iter().copied().collect();
4234                // Collect dep edges within the component (post-removal).
4235                // Edge convention: `(parent, child)` data-flow direction.
4236                let mut edges_in_component: Vec<(NodeId, NodeId)> = Vec::new();
4237                for &node in &component_nodes {
4238                    if let Some(rec) = s.nodes.get(&node) {
4239                        for d in rec.dep_records.iter().map(|r| r.node) {
4240                            if component_set.contains(&d) {
4241                                edges_in_component.push((d, node));
4242                            }
4243                        }
4244                    }
4245                }
4246                let keep_side_nodes: Vec<NodeId> = visited.iter().copied().collect();
4247                reg.split_partition(&component_nodes, &keep_side_nodes, &edges_in_component);
4248                // Marker call kept for symmetry with `union_nodes` — the
4249                // registry's `on_edge_removed` is itself a no-op (Phase F
4250                // moved the actual work into Core where the dep-graph
4251                // view is available).
4252                reg.on_edge_removed(n, removed_dep);
4253            }
4254        }
4255        // Drop the state lock before run_wave (which acquires its own) and
4256        // before crossing the binding boundary for the F1 refcount-fix
4257        // releases. Keeps the lock-discipline split (binding calls outside
4258        // the state lock) consistent with the rest of the dispatcher.
4259        drop(s);
4260        // Slice E2 (D067): fire OnRerun lock-released for dynamic nodes
4261        // that had previously fired. The cleanup closure cleans up
4262        // resources tied to the old dep shape before the next fn-fire
4263        // (triggered by added-dep push-on-subscribe below) registers a
4264        // fresh cleanup spec. Direct fire (NOT via deferred_cleanup_hooks)
4265        // because set_deps may NOT enter a wave (no added deps → no
4266        // run_wave below) — queueing the hook would orphan it until the
4267        // next unrelated wave drains.
4268        if fire_set_deps_on_rerun {
4269            self.binding.cleanup_for(n, CleanupTrigger::OnRerun);
4270        }
4271        // Fire topology event after lock is dropped.
4272        self.fire_topology_event(&crate::topology::TopologyEvent::DepsChanged {
4273            node: n,
4274            old_deps: old_deps_vec,
4275            new_deps: new_deps_vec.clone(),
4276        });
4277        if !added_for_wave.is_empty() {
4278            // Slice Y1 / Phase E: push-on-subscribe wave runs on `n`'s
4279            // touched partitions. Added deps are now unioned with `n`
4280            // (Phase C P12 fix moved registry mutation inside the state
4281            // lock), so any cascade through them stays in `n`'s partition
4282            // set as walked by `compute_touched_partitions`.
4283            self.run_wave_for(n, |this| {
4284                let mut s = this.lock_state();
4285                // Defensive: re-validate `n` still exists and isn't terminal.
4286                // A concurrent path could have terminated it between
4287                // validation and run_wave_for's partition-lock acquisition.
4288                if !s.nodes.contains_key(&n) || s.require_node(n).terminal.is_some() {
4289                    return;
4290                }
4291                for added_dep in &added_for_wave {
4292                    // Re-read cache under the wave-owner-held lock — this
4293                    // is the post-validation, post-concurrent-action
4294                    // snapshot. NO_HANDLE means the dep was invalidated
4295                    // concurrently; skip (no data to push).
4296                    let cache = match s.nodes.get(added_dep) {
4297                        Some(rec) => rec.cache,
4298                        None => continue, // dep deleted concurrently
4299                    };
4300                    if cache == NO_HANDLE {
4301                        continue;
4302                    }
4303                    let dep_idx = s.require_node(n).dep_index_of(*added_dep);
4304                    if let Some(idx) = dep_idx {
4305                        this.deliver_data_to_consumer(&mut s, n, idx, cache);
4306                    }
4307                }
4308            });
4309        }
4310        for h in removed_handles {
4311            self.binding.release_handle(h);
4312        }
4313        Ok(())
4314    }
4315
4316    /// DFS from `from` along data-flow edges (children map) looking for `to`.
4317    /// Returns the path including endpoints, or `None` if unreachable. Used
4318    /// for cycle detection in [`Self::set_deps`].
4319    fn path_from_to(&self, s: &CoreState, from: NodeId, to: NodeId) -> Option<Vec<NodeId>> {
4320        if from == to {
4321            return Some(vec![from]);
4322        }
4323        let mut stack: Vec<(NodeId, Vec<NodeId>)> = vec![(from, vec![from])];
4324        let mut visited: HashSet<NodeId> = HashSet::new();
4325        while let Some((cur, path)) = stack.pop() {
4326            if !visited.insert(cur) {
4327                continue;
4328            }
4329            if cur == to {
4330                return Some(path);
4331            }
4332            if let Some(children) = s.children.get(&cur) {
4333                for &child in children {
4334                    let mut new_path = path.clone();
4335                    new_path.push(child);
4336                    stack.push((child, new_path));
4337                }
4338            }
4339        }
4340        None
4341    }
4342}
4343
4344// CoreState helpers — kept on the inner struct so they're naturally scoped
4345// to the lock guard.
4346impl CoreState {
4347    fn alloc_node_id(&mut self) -> NodeId {
4348        let id = NodeId::new(self.next_node_id);
4349        self.next_node_id += 1;
4350        id
4351    }
4352
4353    fn alloc_sub_id(&mut self) -> SubscriptionId {
4354        let id = SubscriptionId(self.next_subscription_id);
4355        self.next_subscription_id += 1;
4356        id
4357    }
4358
4359    /// Clear wave-scoped flags and rotate per-dep batch data on every
4360    /// node. Run at the end of every wave (regular drain via `run_wave`,
4361    /// activation drain via `activate_derived`, and `BatchGuard::drop`'s
4362    /// drain). Centralized so a future wave-state field can't be missed
4363    /// at one of the cleanup sites.
4364    ///
4365    /// Per-dep rotation (R2.9.b / R1.3.6.b):
4366    /// - `prev_data` ← last element of `data_batch` (or unchanged if empty).
4367    ///   The last batch entry's retain transfers to `prev_data`; the old
4368    ///   `prev_data`'s retain is released. All earlier batch entries are
4369    ///   released.
4370    /// - `data_batch` cleared.
4371    /// - Per-dep `dirty` and `involved_this_wave` cleared.
4372    ///
4373    /// Handle releases are pushed to `deferred_handle_releases` for
4374    /// post-lock-drop release by the caller.
4375    pub(crate) fn clear_wave_state(&mut self) {
4376        self.pending_auto_resolve.clear();
4377        // A6 (Slice F, 2026-05-07): currently_firing is push/pop balanced
4378        // by FiringGuard's RAII (including on panic). It should already be
4379        // empty here, but defensively clear in case a future code path
4380        // forgets the guard.
4381        self.currently_firing.clear();
4382        // A3 (Slice F, 2026-05-07): pending_pause_overflow is normally
4383        // drained by drain_and_flush via the synthesis loop. If a wave is
4384        // panic-discarded BEFORE the synthesis runs (e.g. invoke_fn panics
4385        // before a paused-overflow has a chance to synthesize), we drop the
4386        // queued entries silently — the binding never sees ERROR for that
4387        // overflow event, but the pause buffer's `dropped` count is
4388        // unchanged so callers can still detect via ResumeReport. Re-firing
4389        // the synthesis on the next wave would be confusing (the overflow
4390        // event is logically scoped to the panicked wave).
4391        self.pending_pause_overflow.clear();
4392        // Slice G: tier3 emit tracking is wave-scoped.
4393        self.tier3_emitted_this_wave.clear();
4394        // Slice E2 (D057): per-wave-per-node OnInvalidate dedup is
4395        // wave-scoped — cleared so the next wave can fire cleanups again.
4396        self.invalidate_hooks_fired_this_wave.clear();
4397        // Slice E2 INVARIANT (DO NOT CHANGE WITHOUT THINKING):
4398        // `deferred_cleanup_hooks` is NOT cleared here. It follows the
4399        // `deferred_handle_releases` discipline:
4400        //   - SUCCESS path (`BatchGuard::drop` non-panic): drained by
4401        //     `Core::drain_deferred` AFTER `clear_wave_state` runs, then
4402        //     fired lock-released by `Core::fire_deferred`.
4403        //   - PANIC-DISCARD path (`BatchGuard::drop` panic): explicitly
4404        //     `std::mem::take`-and-dropped AFTER `clear_wave_state` runs,
4405        //     silently per D061.
4406        // Clearing it INSIDE `clear_wave_state` would race the success
4407        // path: the wave's queued `OnInvalidate` cleanup hooks would be
4408        // erased BEFORE `drain_deferred` could take them, dropping every
4409        // user cleanup callback on every successful wave.
4410        // If a future change moves `deferred_cleanup_hooks` ownership
4411        // here, ALSO move the post-`clear_wave_state` take in both
4412        // BatchGuard paths to BEFORE the clear call. Until then, leaving
4413        // the field untouched here is load-bearing.
4414        for rec in self.nodes.values_mut() {
4415            rec.dirty = false;
4416            rec.involved_this_wave = false;
4417            for dr in &mut rec.dep_records {
4418                let batch_len = dr.data_batch.len();
4419                if batch_len > 0 {
4420                    // Release all batch entries EXCEPT the last — the last
4421                    // entry's retain transfers to prev_data.
4422                    for &h in &dr.data_batch[..batch_len - 1] {
4423                        self.deferred_handle_releases.push(h);
4424                    }
4425                    // Release the OLD prev_data (its retain was from the
4426                    // previous wave's rotation or from initial delivery).
4427                    if dr.prev_data != NO_HANDLE {
4428                        self.deferred_handle_releases.push(dr.prev_data);
4429                    }
4430                    // Rotate: last batch entry becomes new prev_data.
4431                    // Its retain carries over — no extra retain needed.
4432                    dr.prev_data = dr.data_batch[batch_len - 1];
4433                    dr.data_batch.clear();
4434                }
4435                dr.dirty = false;
4436                dr.involved_this_wave = false;
4437            }
4438        }
4439    }
4440
4441    pub(crate) fn require_node(&self, id: NodeId) -> &NodeRecord {
4442        self.nodes
4443            .get(&id)
4444            .unwrap_or_else(|| panic!("unknown node {id:?}"))
4445    }
4446
4447    pub(crate) fn require_node_mut(&mut self, id: NodeId) -> &mut NodeRecord {
4448        self.nodes
4449            .get_mut(&id)
4450            .unwrap_or_else(|| panic!("unknown node {id:?}"))
4451    }
4452}
4453
4454/// Release every binding-side refcount share owned by this `CoreState`
4455/// when the last `Core` clone drops the inner Mutex.
4456///
4457/// Without this, every retained handle in `cache` / `terminal` Error /
4458/// `dep_terminals` Error / pause-buffer-payload would leak in the binding
4459/// registry until process exit. Production bindings (napi-rs, pyo3,
4460/// wasm-bindgen) all maintain handle-ref maps that grow unbounded without
4461/// this cleanup.
4462///
4463/// Safe to call during panic unwinding — `BindingBoundary::release_handle`
4464/// is the only call, and a panicking binding during cleanup would already
4465/// have been a problem in normal operation.
4466impl Drop for CoreState {
4467    fn drop(&mut self) {
4468        // Drain pending in-flight retains too, so a panic mid-wave doesn't
4469        // strand the queue_notify retains in `deferred_handle_releases`.
4470        let pending = std::mem::take(&mut self.pending_notify);
4471        let deferred_releases = std::mem::take(&mut self.deferred_handle_releases);
4472        // `deferred_flush_jobs` carries `Vec<Sink>` clones — those Arcs
4473        // drop naturally when this CoreState drops; no handles to release
4474        // there.
4475        let _ = std::mem::take(&mut self.deferred_flush_jobs);
4476
4477        // Per-node retained handles:
4478        //   - `cache` (1 retain per non-NO_HANDLE state cache or
4479        //     populated compute cache).
4480        //   - `terminal == Some(Error(h))` (1 retain on the terminal slot).
4481        //   - `dep_terminals[i] == Some(Error(h))` (1 retain per consumer's
4482        //     terminated-dep slot).
4483        //   - `pause_state` paused buffer messages with payload handles
4484        //     (1 retain per buffered Data/Error).
4485        for rec in self.nodes.values_mut() {
4486            if rec.cache != NO_HANDLE {
4487                self.binding.release_handle(rec.cache);
4488            }
4489            if let Some(TerminalKind::Error(h)) = rec.terminal {
4490                self.binding.release_handle(h);
4491            }
4492            for dr in &rec.dep_records {
4493                if let Some(TerminalKind::Error(h)) = dr.terminal {
4494                    self.binding.release_handle(h);
4495                }
4496                // Release data_batch retains (in-flight wave data).
4497                for &h in &dr.data_batch {
4498                    self.binding.release_handle(h);
4499                }
4500                // Release prev_data retain (cross-wave persistence).
4501                if dr.prev_data != NO_HANDLE {
4502                    self.binding.release_handle(dr.prev_data);
4503                }
4504            }
4505            if let PauseState::Paused { buffer, .. } = &rec.pause_state {
4506                for msg in buffer {
4507                    if let Some(h) = msg.payload_handle() {
4508                        self.binding.release_handle(h);
4509                    }
4510                }
4511            }
4512            // Slice E1: release replay-buffer retains.
4513            for &h in &rec.replay_buffer {
4514                self.binding.release_handle(h);
4515            }
4516            // Operator scratch (Slice C-3, D026): generic per-operator
4517            // state struct. Each variant's release_handles releases the
4518            // shares it owns (Scan/Reduce acc, Distinct/Pairwise prev,
4519            // Last latest + default; Take/Skip/TakeWhile own no handles).
4520            if let Some(scratch) = rec.op_scratch.as_mut() {
4521                scratch.release_handles(&*self.binding);
4522            }
4523        }
4524
4525        // Pending wave retains. Slice X4 / D2: walk all batches' messages
4526        // — `iter_messages` flattens the per-node `SmallVec<PendingBatch>`.
4527        for entry in pending.values() {
4528            for msg in entry.iter_messages() {
4529                if let Some(h) = msg.payload_handle() {
4530                    self.binding.release_handle(h);
4531                }
4532            }
4533        }
4534        for h in deferred_releases {
4535            self.binding.release_handle(h);
4536        }
4537        // Wave-cache snapshot retains (defensive — should normally be
4538        // empty by the time Core drops, but a panicked-mid-wave Core
4539        // could leave them populated).
4540        let snapshots = std::mem::take(&mut self.wave_cache_snapshots);
4541        for (_, h) in snapshots {
4542            self.binding.release_handle(h);
4543        }
4544    }
4545}