graphrefly_core/node.rs
//! The dispatcher — node registration, subscription, wave engine.
//!
//! Mirrors `~/src/graphrefly-ts/src/__experiments__/handle-core/core.ts`
//! (the Phase 13.6 brainstorm prototype, ~370 lines, 22 invariant tests).
//!
//! # Scope (M1 dispatcher + Slice A+B parity, closed 2026-05-05)
//!
//! - State + derived + dynamic node registration.
//! - Subscribe / unsubscribe with push-on-subscribe (R1.2.3).
//! - RAII [`Subscription`] with Drop-based deregister (§10.12).
//! - DIRTY → DATA / RESOLVED ordering (R1.3.1.b two-phase push).
//! - Equals-substitution (R1.3.2): identity is zero-FFI; custom crosses boundary.
//! - First-run gate (R2.5.3) — fn does not fire until every dep has a handle.
//! - Diamond resolution — one fn fire per wave even with shared upstream.
//! - `set_deps()` atomic dep mutation with cycle detection + Phase 13.8 Q1
//!   terminal-rejection policy (R3.3.1).
//! - PAUSE / RESUME with lockId set + replay buffer (R1.2.6, R2.6, §10.2).
//! - INVALIDATE broadcast + cascade with R1.4 idempotency.
//! - COMPLETE / ERROR cascade + Lock 2.B auto-cascade gating
//!   (ERROR dominates COMPLETE; first error wins).
//! - TEARDOWN auto-precedes COMPLETE (R2.6.4 / Lock 6.F) +
//!   `has_received_teardown` idempotency.
//! - Meta TEARDOWN ordering (R1.3.9.d) — companions tear down before parent.
//! - Resubscribable terminal lifecycle (R2.2.7, R2.5.3) — late subscribe to a
//!   resubscribable terminal node resets lifecycle, except after TEARDOWN
//!   (per F3 audit guard: TEARDOWN is permanent).
//!
//! # Module split (Slice C-1, 2026-05-05)
//!
//! Wave-engine internals (drain loop, fire selection, emission commit, sink
//! dispatch) live in [`crate::batch`]. The split is purely organizational —
//! the methods are still on `Core`. See `batch.rs` for the wave-engine
//! entry points (`run_wave`, `drain_and_flush`, `commit_emission`,
//! `queue_notify`, `deliver_data_to_consumer`).
//!
//! # Out of scope (later slices / milestones)
//!
//! - Deactivation cleanup (RAM nodes clear cache when sink count → 0) — M2.
//!
//! See [`migration-status.md`](../../../docs/migration-status.md) for the
//! milestone tracker and [`porting-deferred.md`](../../../docs/porting-deferred.md)
//! for surfaced concerns deferred to evidence-driven slices.
//!
//! # Re-entrance discipline (Slice A close, M1: fully lock-released)
//!
//! - **Wave-end sink fires** drop the state lock first. A subscriber's sink
//!   that calls back into `Core::emit` / `pause` / `resume` / `invalidate` /
//!   `complete` / `error` / `teardown` re-acquires the lock cleanly and runs
//!   a nested wave (the one-Core-per-OS-thread `in_tick` ownership slot is
//!   cleared by the owning `BatchGuard::drop` before the deferred-fire
//!   phase).
//! - **`BindingBoundary::invoke_fn`** fires lock-released. The wave engine
//!   acquires + drops the state lock per fn-fire iteration around the
//!   `invoke_fn` callback. User fns may re-enter `Core::emit` / `pause` /
//!   etc. and run a nested wave.
//! - **`BindingBoundary::custom_equals`** fires lock-released.
//!   `commit_emission` brackets the equals check around a lock release;
//!   custom equals oracles may re-enter Core safely.
//! - **Subscribe-time handshake** also fires lock-released.
//!   [`Core::subscribe`] installs the sink under the state lock, drops
//!   it, then fires the per-tier handshake (`[Start]` / `[Data(cache)]?`
//!   / `[Complete]?` / `[Error(h)]?` / `[Teardown]?` per R1.3.5.a)
//!   lock-released. A handshake-time sink callback may re-enter Core
//!   via the owner-side mailbox/`DeferQueue` seam (`emit` / `complete` /
//!   `error` / `subscribe`); the owner drain applies it as a nested
//!   wave. Post-S4 `Core` is single-owner `!Send + !Sync` — there is no
//!   `wave_owner` mutex and no cross-thread block; R1.3.5.a
//!   happens-after ordering holds because the one owner thread installs
//!   the sink (state lock) before any subsequent emit's flush observes
//!   it (`subscribers_revision` freeze, Slice X4/D2).
71
72use std::collections::VecDeque;
73use std::panic::{catch_unwind, AssertUnwindSafe};
74use std::sync::atomic::{AtomicU64, Ordering};
75use std::sync::Arc;
76
77use ahash::{AHashMap as HashMap, AHashSet as HashSet};
78use parking_lot::Mutex;
79
80// D246/S2c: `WaveOwnerGuard` (the §7 per-group `parking_lot::ReentrantMutex`
81// wave-lock wrapper) is deleted — single-owner ⇒ no cross-thread wave
82// serialization.
83use smallvec::SmallVec;
84use thiserror::Error;
85
86use crate::boundary::{BindingBoundary, CleanupTrigger};
87use crate::clock::monotonic_ns;
88use crate::handle::{FnId, HandleId, LockId, NodeId, NO_HANDLE};
89use crate::message::Message;
90
91/// Terminal-lifecycle state — once set on a node, the node will not emit
92/// further DATA; per-dep slots on consumers also use this to track which
93/// upstreams have terminated (R1.3.4 / Lock 2.B).
94///
95/// `Error` carries a [`HandleId`] resolving to the error value. Refcount is
96/// retained when the variant is stored in a node's `terminal` slot or any
97/// consumer's `dep_terminals` slot; v1 does not release these (terminal
98/// state is one-shot at this layer; release happens on resubscribable
99/// terminal-lifecycle reset, a separate slice).
100#[derive(Copy, Clone, Debug, PartialEq, Eq)]
101pub enum TerminalKind {
102 Complete,
103 Error(HandleId),
104}
105
106/// Node kind discriminant — **derived metadata** computed from
107/// [`NodeRecord`]'s field shape (D030 unification, Slice D).
108///
109/// Core no longer stores `kind` as a field; it's computed on demand from
110/// `(deps.is_empty(), fn_id.is_some(), op.is_some(), is_dynamic)`,
111/// mirroring TS's data model where `NodeImpl` has no `_kind` field. The
112/// shape uniquely identifies the kind:
113///
114/// | deps | fn_id | op | is_dynamic | kind |
115/// |-----------|-------|------|-----------|----------|
116/// | empty | None | None | - | State |
117/// | empty | Some | None | - | Producer |
118/// | non-empty | Some | None | false | Derived |
119/// | non-empty | Some | None | true | Dynamic |
120/// | non-empty | None | Some | - | Operator |
121///
122/// Public API ([`Core::kind_of`]) derives this enum on each call. State
123/// nodes are ROM (cache survives deactivation); compute nodes
124/// (Derived / Dynamic / Operator) and producers are RAM.
125#[derive(Copy, Clone, Eq, PartialEq, Debug)]
126pub enum NodeKind {
127 /// Source node: cache is intrinsic, no fn, no deps. Mutated via [`Core::emit`].
128 State,
129 /// Producer node: fn fires once on first subscribe. No deps;
130 /// emissions arrive via sinks the fn subscribes to (zip / concat /
131 /// race / takeUntil pattern). Slice D / D031.
132 Producer,
133 /// Derived node: fn fires on every dep change; all deps tracked.
134 Derived,
135 /// Dynamic node: fn declares which dep indices it actually read this run.
136 /// Untracked dep updates flow through cache but do NOT re-fire fn.
137 Dynamic,
138 /// Operator node: built-in dispatch path for transform / combine /
139 /// flow / resilience operators. The `OperatorOp` discriminant selects
140 /// the per-operator FFI path ([`BindingBoundary::project_each`] etc.);
141 /// Core manages per-operator state via the generic `op_scratch` slot
142 /// on `NodeRecord` (D026). Per Slice C-1 (D009) / Slice C-3 (D026).
143 Operator(OperatorOp),
144}
145
146impl NodeKind {
147 /// True if this kind opts OUT of Lock 2.B auto-cascade. Operator(Reduce)
148 /// and Operator(Last) must intercept upstream COMPLETE so they can emit
149 /// their accumulator / buffered value before the cascade terminates them;
150 /// instead of cascading, terminate_node queues such children for fn-fire
151 /// so `fire_operator` can handle the terminal.
152 pub(crate) fn skips_auto_cascade(self) -> bool {
153 matches!(
154 self,
155 NodeKind::Operator(
156 OperatorOp::Reduce { .. } | OperatorOp::Last { .. } | OperatorOp::Valve
157 )
158 )
159 }
160}
161
162/// Built-in operator discriminant. Selects the per-operator dispatch path
163/// in `fire_operator` (`crates/graphrefly-core/src/batch.rs`). Each variant
164/// carries the binding-side closure ids (and seed handle for stateful
165/// folders) needed for the wave-execution path; Core stores no user values
166/// itself per the handle-protocol cleaving plane.
167#[derive(Copy, Clone, Eq, PartialEq, Debug)]
168pub enum OperatorOp {
169 /// `map(source, project)` — element-wise transform. Calls
170 /// `BindingBoundary::project_each(fn_id, &inputs)` per fire; emits each
171 /// returned handle via `commit_emission_verbatim` (R1.3.2.d batch
172 /// semantics — no equals substitution between batch entries).
173 Map { fn_id: FnId },
174 /// `filter(source, predicate)` — silent-drop selection (D012/D018).
175 /// Calls `BindingBoundary::predicate_each(fn_id, &inputs)`; emits each
176 /// passing input verbatim. If zero pass on a wave that dirtied the
177 /// node, queues a single `RESOLVED` to settle (D018).
178 Filter { fn_id: FnId },
179 /// `scan(source, fold, seed)` — left-fold emitting each new accumulator.
180 /// `seed` is captured at registration; `acc` lives in
181 /// [`ScanState`](super::op_state::ScanState) inside
182 /// [`NodeRecord::op_scratch`] and persists across waves until
183 /// resubscribable reset. Calls `BindingBoundary::fold_each(fn_id, acc,
184 /// &inputs) -> SmallVec<HandleId>` per fire.
185 Scan { fn_id: FnId, seed: HandleId },
186 /// `reduce(source, fold, seed)` — left-fold emitting once on upstream
187 /// COMPLETE. Accumulates silently while source DATA flows; on
188 /// dep[0].terminal == Some(Complete), emits `[Data(acc), Complete]`.
189 /// On `Error(h)`, propagates the error verbatim. Opts out of Lock 2.B
190 /// auto-cascade (see `NodeKind::skips_auto_cascade`).
191 Reduce { fn_id: FnId, seed: HandleId },
192 /// `distinctUntilChanged(source, equals)` — suppresses adjacent
193 /// duplicates. Calls `BindingBoundary::custom_equals(equals_fn_id,
194 /// prev, current)` per input; emits non-equal items verbatim and
195 /// updates `prev`. If zero items pass on a wave that dirtied the node,
196 /// queues `RESOLVED` (matches Filter discipline).
197 DistinctUntilChanged { equals_fn_id: FnId },
198 /// `pairwise(source)` — emits `(prev, current)` pairs starting after
199 /// the second value. First value swallowed (sets `prev`). Calls
200 /// `BindingBoundary::pairwise_pack(fn_id, prev, current)` per pair to
201 /// produce the binding-side tuple handle.
202 Pairwise { fn_id: FnId },
203
204 // ----- Slice C-2: multi-dep combinators (D020) -----
205 /// `combine(...sources)` — N-dep combineLatest. On any dep fire, packs
206 /// the latest handle per dep into a single tuple handle via
207 /// `BindingBoundary::pack_tuple(pack_fn, &handles)`. First-run gate
208 /// (`partial: false` default) holds until all deps deliver real DATA
209 /// (R2.5.3). COMPLETE cascades when all deps complete (R1.3.4.b).
210 Combine { pack_fn: FnId },
211
212 /// `withLatestFrom(primary, secondary)` — 2-dep, fire-on-primary-only
213 /// (D021, Phase 10.5). Packs `[primary, secondary]` via
214 /// `BindingBoundary::pack_tuple(pack_fn, &handles)` when dep[0]
215 /// (primary) has DATA in the wave. If only dep[1] (secondary) fires,
216 /// settles with RESOLVED (D018 pattern). First-run gate holds until
217 /// both deps deliver (R2.5.3 `partial: false`). Post-warmup INVALIDATE
218 /// guard: if secondary `prev_data == NO_HANDLE` and batch empty after
219 /// warmup, settles with RESOLVED (no stale pair).
220 WithLatestFrom { pack_fn: FnId },
221
222 /// `merge(...sources)` — N-dep, forward all DATA handles verbatim
223 /// (D022). Zero FFI on fire: no transformation, no binding call.
224 /// Each dep's batch handles are retained and emitted individually.
225 /// COMPLETE cascades when all deps complete (R1.3.4.b).
226 Merge,
227
228 // ----- Slice C-3: flow operators (D024) -----
229 /// `take(source, count)` — emits the first `count` DATA values then
230 /// self-completes via `Core::complete`. Tracks `count_emitted` in
231 /// [`TakeState`](super::op_state::TakeState). When upstream completes
232 /// before `count` is reached, the standard auto-cascade propagates
233 /// COMPLETE. `count == 0` is allowed: the first fire emits zero
234 /// items then immediately self-completes (D027).
235 Take { count: u32 },
236
237 /// `skip(source, count)` — drops the first `count` DATA values; once
238 /// the threshold is crossed, subsequent DATAs pass through verbatim.
239 /// Tracks `count_skipped` in [`SkipState`](super::op_state::SkipState).
240 /// On a wave where every input is still in the skip window, queues
241 /// DIRTY+RESOLVED to settle (D018 pattern).
242 Skip { count: u32 },
243
244 /// `takeWhile(source, predicate)` — emits while `predicate(input)`
245 /// holds; on the first `false`, emits any preceding passes then
246 /// self-completes via `Core::complete`. Reuses
247 /// [`BindingBoundary::predicate_each`] (D029); after the first
248 /// `false`, subsequent inputs in the same batch are dropped.
249 TakeWhile { fn_id: FnId },
250
251 /// `last(source)` / `last_with_default(source, default)` — buffers
252 /// the latest DATA; on upstream COMPLETE, emits `Data(latest)` then
253 /// `Complete`. The `default` field is `NO_HANDLE` for the no-default
254 /// factory (emits only `Complete` on empty stream), or a registered
255 /// default handle (emits `Data(default)` + `Complete` on empty
256 /// stream). Storage: [`LastState`](super::op_state::LastState) holds
257 /// `latest` (live buffer) and `default` (registration-time, stable).
258 /// Opts out of Lock 2.B auto-cascade so it can intercept upstream
259 /// COMPLETE.
260 Last { default: HandleId },
261
262 // ----- Slice U: control operators (D047) -----
263 /// `tap(source, fn)` — side-effect passthrough. Calls
264 /// `BindingBoundary::invoke_tap_fn(fn_id, handle)` on each input DATA,
265 /// then emits the input handle unchanged. Zero-transform: output
266 /// handles are the inputs verbatim (no equals substitution, no
267 /// allocation).
268 Tap { fn_id: FnId },
269
270 /// `tapFirst(source, fn)` — one-shot side-effect on first DATA. Same
271 /// as [`Tap`](Self::Tap) but fires `invoke_tap_fn` only once; after
272 /// the first fire, subsequent DATA passes through without a callback.
273 /// State: [`TapFirstState`](super::op_state::TapFirstState) tracks
274 /// `fired: bool`.
275 TapFirst { fn_id: FnId },
276
277 /// `valve(source, control)` — conditional forward. 2-dep: dep[0] is
278 /// source, dep[1] is boolean control. When the latest control value
279 /// is truthy (non-zero handle), forwards source DATA; when falsy,
280 /// settles with RESOLVED. Partial mode so it fires on control-alone
281 /// before source has delivered. Does NOT auto-complete on control
282 /// terminal (`completeWhenDepsComplete: false` equivalent).
283 Valve,
284
285 /// `settle(source, quietWaves, maxWaves)` — convergence detector.
286 /// Forwards each upstream DATA, counts consecutive no-change waves,
287 /// and self-completes when `quiet_count >= quiet_waves` (or
288 /// `wave_count >= max_waves` if set). State:
289 /// [`SettleState`](super::op_state::SettleState).
290 Settle {
291 quiet_waves: u32,
292 max_waves: Option<u32>,
293 },
294}
295
296/// Registration options for [`Core::register_operator`].
297///
298/// `equals` controls operator output dedup (R5.7 — defaults to identity).
299/// `partial` controls the R2.5.3 first-run gate (R5.4 — operator dispatch
300/// fires on first DATA from any dep when `true`; default `false` matches
301/// the gated derived discipline).
302#[derive(Copy, Clone, Debug)]
303pub struct OperatorOpts {
304 pub equals: EqualsMode,
305 pub partial: bool,
306}
307
308impl Default for OperatorOpts {
309 fn default() -> Self {
310 Self {
311 equals: EqualsMode::Identity,
312 partial: false,
313 }
314 }
315}
316
317/// Closure-form fn id OR typed operator discriminant — the two dispatch
318/// paths a node can use. State / passthrough nodes pass `None` to
319/// [`Core::register`] (no fn at all).
320#[derive(Copy, Clone, Debug)]
321pub enum NodeFnOrOp {
322 /// Closure-form: invokes [`BindingBoundary::invoke_fn`] per fire.
323 /// Used for Derived / Dynamic / Producer.
324 Fn(FnId),
325 /// Typed-op: routes to a `fire_op_*` helper that calls per-operator
326 /// FFI methods (`project_each` / `predicate_each` / `fold_each` /
327 /// `pairwise_pack` / `pack_tuple`). Used for Operator nodes.
328 Op(OperatorOp),
329}
330
/// Pause behavior mode (canonical-spec §2.6 — three modes shipped in TS;
/// Slice F audit, 2026-05-07 — closed the Rust port gap).
///
/// | Mode | Outgoing tier-3 routing while paused | RESUME behavior |
/// |---|---|---|
/// | [`PausableMode::Default`] | suppress fn-fire upstream (no DIRTY emitted) | fire fn ONCE on RESUME if any dep delivered DATA during pause; collapses N pause-window writes into one settle |
/// | [`PausableMode::ResumeAll`] | buffer outgoing tier-3 / tier-4 messages per-wave | replay each buffered wave verbatim on RESUME |
/// | [`PausableMode::Off`] | dispatcher ignores PAUSE; tier-3 flushes immediately | no-op (no buffer to drain) |
///
/// Default is [`PausableMode::Default`] per canonical §2.6 — every untagged
/// source picks it up. Memory profile is O(1) per node (no buffer); the
/// trade-off is "subscribers see one consolidated DATA on RESUME" rather
/// than the K mid-pause emissions verbatim.
///
/// Note: tier-1 (DIRTY) / tier-2 (PAUSE/RESUME) / tier-5 (COMPLETE/ERROR) /
/// tier-6 (TEARDOWN) bypass pause regardless of mode — they remain
/// observable so leaked pause-controllers cannot strand subscribers.
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
pub enum PausableMode {
    /// Suppress fn-fire while paused; fire once on RESUME if any dep
    /// delivered DATA during the pause window. Canonical default.
    #[default]
    Default,
    /// Buffer outgoing tier-3 / tier-4 messages per-wave; replay on
    /// RESUME. Use when subscribers need verbatim emit history (e.g. an
    /// audit log, replay-on-reconnect bridge).
    ResumeAll,
    /// Dispatcher ignores PAUSE for this node — tier-3 flushes
    /// immediately even while a lock is held. Use for nodes whose value
    /// production is intrinsically pause-immune (telemetry counters,
    /// monotonic timers).
    Off,
}
364
365/// Per-kind opts for [`Core::register`]. Cross-kind config knobs live
366/// here; per-kind specifics (deps, fn_or_op) live on
367/// [`NodeRegistration`].
368#[derive(Copy, Clone, Debug)]
369pub struct NodeOpts {
370 /// Initial cached value. Only valid for state nodes (no deps + no
371 /// fn + no op). [`NO_HANDLE`] starts the node sentinel.
372 pub initial: HandleId,
373 /// Equality mode for outgoing emissions (R1.3.2). Defaults to
374 /// [`EqualsMode::Identity`].
375 pub equals: EqualsMode,
376 /// First-run gate (R2.5.3 / D011). When `true`, the node fires as
377 /// soon as ANY dep delivers a real handle; when `false` (default),
378 /// the node holds until every dep has delivered.
379 pub partial: bool,
380 /// Dynamic flag (R2.5.3) — fn declares actually-tracked dep indices
381 /// per fire. Only meaningful when `fn_or_op == Some(Fn(_))` AND
382 /// deps non-empty.
383 pub is_dynamic: bool,
384 /// Pause behavior mode (canonical §2.6). Default is
385 /// [`PausableMode::Default`]. See [`PausableMode`] for the trade-offs.
386 pub pausable: PausableMode,
387 /// Replay buffer cap (canonical R2.6.5 / Lock 6.G — Slice E1, 2026-05-07).
388 /// `None` (default) disables; `Some(N)` keeps a circular buffer of the
389 /// last N DATA emissions and replays them to late subscribers as part
390 /// of the per-tier handshake (between [`Message::Start`] and any
391 /// terminal slice). Only DATA is buffered; RESOLVED entries are NOT
392 /// (R2.6.5 explicit "DATA only").
393 pub replay_buffer: Option<usize>,
394 /// D263 — when `true`, the `fire_fn` first-run gate (R2.5.3) treats a
395 /// terminal dep as "real input" so the node fires even if the only
396 /// signal a dep ever delivered was a `COMPLETE` (no DATA). Mirrors the
397 /// unconditional `fire_operator` semantics (`fire_operator`'s gate
398 /// already counts dep terminals as real input — e.g. for `Reduce`).
399 /// Default `false` preserves the historical gate (sentinel deps hold
400 /// the node until they deliver real DATA). NOT yet widened onto the
401 /// `Impl` parity contract per D196 + the parity-tests comment
402 /// ("NOT widened onto Impl"); kept substrate-internal until a
403 /// cross-arm scenario surfaces.
404 pub terminal_as_real_input: bool,
405}
406
407impl Default for NodeOpts {
408 fn default() -> Self {
409 Self {
410 initial: NO_HANDLE,
411 equals: EqualsMode::Identity,
412 partial: false,
413 is_dynamic: false,
414 pausable: PausableMode::Default,
415 replay_buffer: None,
416 terminal_as_real_input: false,
417 }
418 }
419}
420
421/// Unified node-registration descriptor (D030, Slice D).
422///
423/// All node kinds (State / Producer / Derived / Dynamic / Operator)
424/// register through [`Core::register`] with a `NodeRegistration`. The
425/// kind is **derived from the field shape** of the registration —
426/// `(deps.is_empty(), fn_or_op variant)`:
427///
428/// | deps | fn_or_op | is_dynamic | resulting kind |
429/// |-----------|-----------|-----------|----------------|
430/// | empty | None | - | State |
431/// | empty | Some(Fn) | - | Producer |
432/// | non-empty | Some(Fn) | false | Derived |
433/// | non-empty | Some(Fn) | true | Dynamic |
434/// | non-empty | Some(Op) | - | Operator |
435///
436/// The sugar wrappers ([`Core::register_state`], [`Core::register_producer`],
437/// etc.) build a `NodeRegistration` and delegate.
438#[derive(Clone, Debug)]
439pub struct NodeRegistration {
440 /// Upstream deps in declaration order. Empty for state / producer.
441 pub deps: Vec<NodeId>,
442 /// Closure-form fn id or typed-op discriminant. `None` for state /
443 /// passthrough.
444 pub fn_or_op: Option<NodeFnOrOp>,
445 /// Cross-kind config knobs.
446 pub opts: NodeOpts,
447}
448
449/// Equality mode for a node's outgoing emissions.
450///
451/// `Identity` is the default: cache vs. new handle compare is a `u64` equal —
452/// zero FFI. `Custom` invokes [`BindingBoundary::custom_equals`] every check
453/// (R1.3.2.b two-arg call when both sides are non-sentinel).
454#[derive(Copy, Clone, Debug)]
455pub enum EqualsMode {
456 Identity,
457 Custom(FnId),
458}
459
/// Public identifier for a single subscription (S2b / D225: promoted
/// from `pub(crate)`). Returned by [`Core::subscribe`] /
/// [`Core::try_subscribe`]; pair it with the `node_id` you subscribed
/// and pass both to [`Core::unsubscribe`] to deregister.
///
/// S2b retires the core-level RAII `Subscription`/`WeakCore` (D223:
/// the `Core` is owned by value and relocates between workers, so a
/// parameterless `Drop` cannot reach it without `Weak<C>`/`unsafe` —
/// D225). Drop-convenience is a *binding-layer* RAII wrapper over
/// [`Core::unsubscribe`], used only where the holder co-owns the `Core`
/// on its affinity worker (test harness, napi `BenchCore`). Substrate
/// callers (e.g. producer upstream-sub cleanup, D229) unsubscribe
/// explicitly via the owner-driven chain.
#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
pub struct SubscriptionId(u64);
475
476/// Deregister `sub_id` from `node_id` and run the Phase G / lifecycle
477/// cleanup chain (OnDeactivation → `producer_deactivate` → `wipe_ctx` →
478/// Core cache-clear). Extracted verbatim from the former
479/// `Subscription::Drop` (D225 refined A2, S2a) so `Core::unsubscribe`
480/// (the synchronous owner-invoked path) and the legacy RAII `Drop`
481/// share ONE body. Operates on `&C` directly — the body already used
482/// only `&dyn BindingBoundary` (via `make_op_scratch_with_binding`),
483/// never a `Core`, so this is a behaviour-identical move.
484///
485/// Producer deactivation (Slice D, D031): if removing this sub empties
486/// the subscribers map AND the node is a producer, fire
487/// `BindingBoundary::producer_deactivate(node_id)` AFTER releasing the
488/// state lock. The binding then drops its per-node state (subscriptions
489/// to upstream sources, captured closure state), which transitively
490/// unsubs from upstreams via their own unsubscribe. Re-entrance into
491/// Core from the deactivate hook is permitted since the lock is
492/// released first.
493#[allow(clippy::too_many_lines)] // Phase G is one continuous lifecycle hook chain (user cleanup → producer_deactivate → wipe_ctx → Core cache-clear); splitting it would obscure the ordering invariant.
494pub(crate) fn unsubscribe_sink(core: &Core, node_id: NodeId, sub_id: SubscriptionId) {
495 // Slice E2 (D056): when the last subscriber drops, fire the
496 // node's OnDeactivation cleanup hook BEFORE producer_deactivate
497 // (cleanup may release handles the producer subscription owns;
498 // reverse order would let producer_deactivate drop subs that user
499 // cleanup expected to be live). Both calls are lock-released per
500 // D045.
501 //
502 // OnDeactivation gating (D068, QA Q3 fix): fires only when the
503 // node has fired its fn at least once AND has a fn (`fn_id`
504 // populated). State nodes have no fn — they cannot register a
505 // cleanup spec via the production fn-return path (R2.4.5), so
506 // firing `cleanup_for` on them is wasted FFI; the binding's
507 // lookup is guaranteed to find no `current_cleanup`. Skipping
508 // here saves the FFI hop and matches the design-doc wording
509 // ("never-fired state nodes" — state-with-initial-value satisfies
510 // `has_fired_once = true` but still has no fn).
511 //
512 // Slice E2 /qa Q2(b) (D069): if the node is a resubscribable
513 // node that's ALREADY terminal (terminate fired BEFORE this last
514 // sub drop), fire `wipe_ctx` lock-released AFTER OnDeactivation
515 // + producer_deactivate. Mutually exclusive with `terminate_node`'s
516 // queue-wipe site: terminate-with-empty-subs goes through
517 // `pending_wipes`; terminate-with-live-subs routes here when
518 // those subs eventually drop. Either path fires exactly one
519 // wipe per terminal lifecycle.
520 let (was_last_sub, is_producer, has_user_cleanup, fire_wipe, binding) = {
521 // Step 2b-ii-B: route to the node's shard (this drop runs
522 // OUTSIDE any wave ⇒ ambient `None`; a grouped node's
523 // record is in shard `g`, so without this the unsubscribe
524 // would no-op on `DEFAULT_SHARD` → lost cleanup / leak).
525 let mut s = St::new(core);
526 let Some(rec) = s.nodes.get_mut(&node_id) else {
527 return;
528 };
529 rec.subscribers.remove(&sub_id);
530 // Slice X4 / D2: bump revision so any pending_notify entry for
531 // this node opened earlier in the wave starts a fresh batch on
532 // the next queue_notify, dropping the now-departed sink from
533 // the snapshot.
534 rec.subscribers_revision = rec.subscribers_revision.wrapping_add(1);
535 let last = rec.subscribers.is_empty();
536 let producer = rec.is_producer();
537 // OnDeactivation gate: must have run a fn at least once
538 // (has_fired_once) AND have a fn registered (fn_id.is_some()).
539 // The fn_id check excludes state nodes whose has_fired_once
540 // tracks initial-value status, not "user fn ran."
541 let user_cleanup = rec.has_fired_once && rec.fn_id.is_some();
542 let fire_wipe = last && rec.resubscribable && rec.terminal.is_some();
543 // Phase G (D119/D120/D121, 2026-05-10): always clone the
544 // binding when last sub leaves so we can run the Core
545 // cache-clear after the existing hooks. The Arc::clone is
546 // cheap and dwarfed by the cost of the hooks themselves.
547 let binding = if last { Some(s.binding.clone()) } else { None };
548 (last, producer, user_cleanup, fire_wipe, binding)
549 };
550 if was_last_sub {
551 if let Some(binding) = binding {
552 if has_user_cleanup {
553 binding.cleanup_for(node_id, CleanupTrigger::OnDeactivation);
554 }
555 if is_producer {
556 // D229: hand the binding a `Core::unsubscribe`-capable
557 // closure over the `&C` already in scope (the owner's
558 // synchronous unsubscribe context — exactly what D225
559 // requires; no `Weak<C>`, no parameterless-`Drop`). The
560 // binding loops it over its recorded upstream
561 // `(NodeId, SubscriptionId)` pairs. Lock-released here, so
562 // the recursive `unsubscribe_sink` (re-entrant producer
563 // cascade) is safe — behaviour-identical to the retired
564 // `Subscription::Drop` cascade.
565 let unsub = |up_node: NodeId, up_sub: SubscriptionId| {
566 unsubscribe_sink(core, up_node, up_sub);
567 };
568 binding.producer_deactivate(node_id, &unsub);
569 }
570 // D069: eager wipe — fires AFTER OnDeactivation so the
571 // user closure observes pre-wipe `store` (matches the
572 // existing "OnDeactivation runs before wipe on terminal
573 // reset" invariant covered by test 10). Idempotent —
574 // `HashMap::remove` on absent key is a no-op, so even
575 // if the wave already drained `pending_wipes` earlier,
576 // this fire is benign.
577 if fire_wipe {
578 binding.wipe_ctx(node_id);
579 }
580
581 // Phase G (D118/D119/D120/D121, 2026-05-10) — Core
582 // cache-clear on deactivation, mirroring TS `_deactivate`
583 // (`pure-ts/src/core/node.ts:2185-2297`):
584 //
585 // 1. user `cleanup_for(OnDeactivation)` ← above
586 // 2. `producer_deactivate` ← above
587 // 3. `wipe_ctx` (resubscribable+terminal) ← above
588 // 4. **NEW: Core cache-clear** ← here
589 //
590 // Releases per-dep `prev_data` + `data_batch` retains +
591 // `dep_terminals` Error retains (the latter closes the
592 // long-standing "Non-resubscribable terminal Error
593 // handles leak via diamond cascade" porting-deferred
594 // entry — D121). Clears pause/replay buffers. Releases
595 // `cached` for compute nodes (R2.2.7 / R2.2.8 ROM:
596 // state nodes preserve cache; compute nodes clear).
597 // Keeps the per-node `terminal` slot intact (D121:
598 // producer-side terminal stays for late-subscriber
599 // R2.2.7.a reset or R2.2.7.b rejection).
600 //
601 // Lock-released release discipline: collect handles
602 // under the lock, drop the lock, fire `release_handle`
603 // outside (mirrors `Core::resume` Phase 2 + the
604 // existing F1 `set_deps` removed-handles release at
605 // node.rs:5003).
606 //
607 // Re-entrance safety (F1 / D123, /qa 2026-05-10): if the
608 // user `cleanup_for` / `producer_deactivate` / `wipe_ctx`
609 // hook re-subscribed via some path, `subscribers` is now
610 // non-empty. **Skip Phase G entirely in that case** —
611 // the new subscriber's handshake delivered the live
612 // `cache` handle to its sink and is holding a refcount
613 // share through `pending_notify`; clearing cache here
614 // would race with the new subscriber's wave and cause
615 // a use-after-release in bindings that reap registry
616 // slots at refcount-zero.
617 //
618 // Why this diverges from TS `_deactivate` (which clears
619 // unconditionally): TS runs the cleanup hook + cache
620 // clear as ONE sync block under the (implicit) JS
621 // single-thread mutex; there's no released-lock window
622 // for a re-subscribe to install a sub before the
623 // cache-clear runs. Rust's lock-released hook discipline
624 // (D045) opens that window, so the re-check is
625 // necessary to preserve refcount soundness.
626 //
627 // Re-acquire state lock atomically with the recheck so
628 // a concurrent thread cannot install a sub between the
629 // emptiness check and the cache mutation.
630 // F8 (/qa 2026-05-10): also take `op_scratch` so its
631 // retains release lock-released after the state-lock
632 // scope. Pre-F8 Phase G only released per-edge handles
633 // + the compute `cache`, leaving operator-internal
634 // retains (Last.latest, Scan.acc, Reduce.acc, etc.)
635 // in-place. For non-resubscribable nodes that never
636 // re-subscribe, this was a permanent leak —
637 // asymmetric with the per-edge cleanup. F8 closes that
638 // by taking the scratch alongside per-edge handles
639 // and calling `release_handles` lock-released.
640 //
641 // D-α (D028 full close, 2026-05-10): for resubscribable
642 // operator nodes, take the OLD scratch AND build a
643 // FRESH scratch via `make_op_scratch` (lock-held, but
644 // `binding.retain_handle` is a leaf operation per
645 // op_state.rs:69-80 docs), install the fresh scratch
646 // on `rec.op_scratch`, and push the old scratch to
647 // `pending_scratch_release` for deferred release. The
648 // queue drains on the next `reset_for_fresh_lifecycle`
649 // (after Phase 2 takes fresh retains — preserves the
650 // C-3 /qa P1 seed-aliasing-acc invariant) or on
651 // `Drop for CoreState`. The fresh install gives
652 // re-activation correct counter / acc state (Take.taken
653 // back to 0, Scan.acc back to seed, etc.) matching TS
654 // Lock 6.D ("resets on every deactivation").
655 let (to_release, scratch_to_release): (
656 Vec<HandleId>,
657 Option<Box<dyn crate::op_state::OperatorScratch>>,
658 ) = {
659 // Step 2b-ii-B: route to the node's shard (drop
660 // runs outside any wave; grouped node's record is
661 // in shard `g` — without this the scratch/refcount
662 // cleanup would no-op on `DEFAULT_SHARD`).
663 let mut s = St::new(core);
664 if let Some(rec) = s.nodes.get_mut(&node_id) {
665 // F1 re-entrance check.
666 if !rec.subscribers.is_empty() {
667 // A user hook re-subscribed during the
668 // lock-released window; the new lifecycle
669 // owns this node now. Phase G is a no-op.
670 return;
671 }
672 let mut handles: Vec<HandleId> = Vec::new();
673 // Per-dep state. Empty for state nodes.
674 for dr in &mut rec.dep_records {
675 if dr.prev_data != NO_HANDLE {
676 handles.push(dr.prev_data);
677 dr.prev_data = NO_HANDLE;
678 }
679 for h in dr.data_batch.drain(..) {
680 handles.push(h);
681 }
682 // D121: per-edge terminal-Error retain
683 // released here. Closes the cascade leak.
684 // The producer's own `rec.terminal` slot
685 // stays intact (preserved below).
686 if let Some(TerminalKind::Error(h)) = dr.terminal {
687 handles.push(h);
688 }
689 dr.terminal = None;
690 dr.dirty = false;
691 dr.involved_this_wave = false;
692 }
693 // Pause buffer DATA / replay buffer.
694 if let PauseState::Paused { ref mut buffer, .. } = rec.pause_state {
695 for msg in buffer.drain(..) {
696 if let Some(h) = msg.payload_handle() {
697 handles.push(h);
698 }
699 }
700 }
701 for h in rec.replay_buffer.drain(..) {
702 handles.push(h);
703 }
704 // Pause locks drained → node back to Active.
705 rec.pause_state = PauseState::Active;
706 // R2.2.7 / R2.2.8 ROM: state nodes preserve
707 // cache; compute (fn or op) nodes clear.
708 // D119: state nodes keep `cached` because the
709 // value is intrinsic and non-volatile;
710 // resubscribe sees the same value.
711 let is_compute = rec.fn_id.is_some() || rec.op.is_some();
712 if is_compute && rec.cache != NO_HANDLE {
713 handles.push(rec.cache);
714 rec.cache = NO_HANDLE;
715 }
716 // Reset wave + lifecycle state so reactivation
717 // begins fresh. `terminal` STAYS (D121).
718 rec.has_fired_once = false;
719 rec.dirty = false;
720 rec.involved_this_wave = false;
721 // §10.13 perf (D047): reset received_mask — all deps back
722 // to sentinel on resubscribe.
723 rec.received_mask = 0;
724 // §10.3 perf (Slice V1): reset involved_mask.
725 rec.involved_mask = 0;
726 if rec.is_dynamic {
727 rec.tracked.clear();
728 }
729 // F8 + D-α: op_scratch handling forks by
730 // `resubscribable`. Non-resubscribable: eager
731 // release (F8 path — there's no future reset
732 // so the leak ends here). Resubscribable +
733 // has-op: take old AND install fresh; defer
734 // old release to the queue.
735 let scratch = if !rec.resubscribable {
736 std::mem::take(&mut rec.op_scratch)
737 } else if let Some(op) = rec.op {
738 // Slice C-3 /qa P1 (retain-before-release):
739 // build fresh scratch FIRST (this calls
740 // `binding.retain_handle` for any seed /
741 // default the op carries), THEN swap. The
742 // old scratch's release is deferred to the
743 // `pending_scratch_release` queue (drained
744 // on next reset_for_fresh_lifecycle or
745 // Drop for CoreState).
746 //
747 // make_op_scratch is fallible only for
748 // OperatorSeedSentinel; the OperatorOp
749 // stored on NodeRecord passed validation
750 // at registration time, so the unwrap
751 // here is structurally guaranteed (mirrors
752 // reset_for_fresh_lifecycle).
753 //
754 // Uses the binding-explicit static variant
755 // because we have only `&dyn BindingBoundary`
756 // here (Subscription::Drop holds no Core).
757 let new_scratch = Core::make_op_scratch_with_binding(&*binding, op)
758 .expect("invariant: stored OperatorOp passed make_op_scratch validation at registration time");
759 let old = std::mem::replace(&mut rec.op_scratch, new_scratch);
760 if let Some(old_box) = old {
761 s.shared.pending_scratch_release.push(old_box);
762 }
763 // The "scratch_to_release" for lock-released
764 // release stays None here — the resubscribable
765 // case routes through the queue.
766 None
767 } else {
768 // Resubscribable but no op (e.g. derived
769 // compute / dynamic / state). Nothing to
770 // do for op_scratch.
771 None
772 };
773 (handles, scratch)
774 } else {
775 // Node destroyed between lock-released hooks
776 // and this re-acquire (terminate cascade or
777 // graph removal) — nothing to clear.
778 (Vec::new(), None)
779 }
780 };
781 // Release handles lock-released. Binding may re-enter
782 // Core during `release_handle` (final-Drop callbacks
783 // are user code).
784 for h in to_release {
785 binding.release_handle(h);
786 }
787 // F8: release operator-scratch handles lock-released
788 // (mirrors `ScratchReleaseGuard::drop` ordering).
789 if let Some(mut scratch) = scratch_to_release {
790 scratch.release_handles(&*binding);
791 }
792 }
793 }
794}
795
796// S2b (D225): no `impl Drop for Subscription` and no `Subscription`
797// Send+Sync assertion — the core-level RAII handle is retired. The sole
798// deregister path is the synchronous owner-invoked `Core::unsubscribe`
799// (which delegates to the shared `unsubscribe_sink` free fn below);
800// binding-layer RAII wraps it where the holder co-owns the Core.
801
/// A subscriber callback, invoked with a slice of [`Message`]s.
///
/// `Fn` (not `FnMut`) so multiple references coexist — capture mutable
/// state behind interior mutability (`Mutex<T>`, atomics, …) on the
/// binding side.
///
/// The alias carries **no** `Send + Sync` bound: sinks fire on the one
/// thread that drives the `Core` (rationale in the note just below).
// D246/S2c/D248: single-owner ⇒ sinks fire owner-side on the one
// thread that drives the `Core`; the `Send + Sync` bound was
// shared-Core-era legacy. Dropped — `Core` (which owns the subscriber
// map) is consequently `!Send + !Sync`, the actor-model shape (the only
// cross-thread bridge is `Arc<CoreMailbox>` for id-only timer posts).
pub type Sink = Arc<dyn Fn(&[Message])>;
811
812// ---------------------------------------------------------------------------
813// PAUSE/RESUME state — §10.2 of the rust-port session doc
814// ---------------------------------------------------------------------------
815
/// Per-node pause state.
///
/// Replaces the four TS fields (`_pauseLocks`, `_pauseBuffer`,
/// `_pauseDroppedCount`, `_pauseStartNs`) with a single enum where
/// the buffered fields are unreachable in the [`Self::Active`] variant —
/// the compiler refuses access. Per §10.2 simplification.
///
/// # Invariants
///
/// - `Active` ⇔ no lockId held.
/// - `Paused { locks, .. }` ⇔ `!locks.is_empty()`.
/// - Buffered messages are tier 3 (DATA/RESOLVED) and tier 4 (INVALIDATE)
///   only. Other tiers pass through immediately even while paused.
/// - `dropped` counts messages that fell out the front of `buffer` due to
///   the Core-global `pause_buffer_cap`; it is reported on resume so callers
///   can detect overflow without re-tracking it externally.
#[derive(Debug)]
pub(crate) enum PauseState {
    /// No pause lock is held. There is no buffer in this state:
    /// `push_buffered` discards its message and `remove_lock` is a no-op.
    Active,
    /// At least one pause lock is held. Entered by the first `add_lock`
    /// and left when `remove_lock` empties `locks`.
    Paused {
        /// Active lock holders. `SmallVec` keeps the common 1–2 lock case
        /// stack-allocated. Replaces `Set<unknown>` from TS.
        locks: SmallVec<[LockId; 2]>,
        /// Buffered tier-3/tier-4 outgoing messages, in arrival order.
        /// Replayed on the final RESUME.
        buffer: VecDeque<Message>,
        /// Count of messages dropped from the front when `buffer.len()` would
        /// exceed `pause_buffer_cap`. Cleared on final RESUME (next pause
        /// cycle starts fresh).
        dropped: u32,
        /// Wall-clock-monotonic ns when the lock first transitioned this node
        /// from `Active` to `Paused`. Used by R1.3.8.c overflow ERROR
        /// synthesis to compute `lock_held_duration_ms` in the diagnostic
        /// payload (Slice F, A3 — 2026-05-07).
        started_at_ns: u64,
        /// True after the first overflow event in this pause cycle has been
        /// reported via [`crate::boundary::BindingBoundary::synthesize_pause_overflow_error`].
        /// Subsequent overflows in the same cycle don't re-emit ERROR
        /// (canonical R1.3.8.c: "once per overflow event"). Cleared on
        /// final RESUME (next pause cycle starts fresh).
        overflow_reported: bool,
        /// Default-mode bookkeeping (Slice F audit close, 2026-05-07).
        /// Set to `true` when an upstream dep delivery arrives while this
        /// node is paused with [`PausableMode::Default`]. On final RESUME,
        /// if `true`, the node is added back to `pending_fires` so the fn
        /// fires once with the consolidated dep state. Always `false` for
        /// `ResumeAll` mode (the buffered messages are the consolidation
        /// mechanism there). Cleared on final RESUME.
        pending_wave: bool,
    },
}
867
868impl PauseState {
869 pub(crate) fn is_paused(&self) -> bool {
870 matches!(self, Self::Paused { .. })
871 }
872
873 fn lock_count(&self) -> usize {
874 match self {
875 Self::Active => 0,
876 Self::Paused { locks, .. } => locks.len(),
877 }
878 }
879
880 fn contains_lock(&self, lock_id: LockId) -> bool {
881 match self {
882 Self::Active => false,
883 Self::Paused { locks, .. } => locks.contains(&lock_id),
884 }
885 }
886
887 /// Add a lock; transitions Active → Paused on first lock. Idempotent on
888 /// duplicate lock_id (matches TS convention; spec is silent on the case).
889 fn add_lock(&mut self, lock_id: LockId) {
890 match self {
891 Self::Active => {
892 let mut locks = SmallVec::new();
893 locks.push(lock_id);
894 *self = Self::Paused {
895 locks,
896 buffer: VecDeque::new(),
897 dropped: 0,
898 started_at_ns: monotonic_ns(),
899 overflow_reported: false,
900 pending_wave: false,
901 };
902 }
903 Self::Paused { locks, .. } => {
904 if !locks.contains(&lock_id) {
905 locks.push(lock_id);
906 }
907 }
908 }
909 }
910
911 /// Mark that an upstream dep delivered DATA to a node paused with
912 /// [`PausableMode::Default`]. The node will re-enter `pending_fires`
913 /// on final RESUME via [`Self::take_pending_wave`].
914 pub(crate) fn mark_pending_wave(&mut self) {
915 if let Self::Paused { pending_wave, .. } = self {
916 *pending_wave = true;
917 }
918 }
919
920 /// Read and clear the `pending_wave` flag. Called from
921 /// [`Core::resume`] when transitioning Paused → Active. Returns `true`
922 /// only if the node was paused with `pending_wave` set.
923 pub(crate) fn take_pending_wave(&mut self) -> bool {
924 if let Self::Paused { pending_wave, .. } = self {
925 std::mem::replace(pending_wave, false)
926 } else {
927 false
928 }
929 }
930
931 /// Remove a lock; if the lockset becomes empty, transition Paused →
932 /// Active and return the buffered messages for replay (along with the
933 /// dropped count for diagnostics). Unknown lock_id is an idempotent
934 /// no-op (matches TS, R1.2.6 implicit).
935 fn remove_lock(&mut self, lock_id: LockId) -> Option<(VecDeque<Message>, u32)> {
936 match self {
937 Self::Active => None,
938 Self::Paused { locks, .. } => {
939 if let Some(idx) = locks.iter().position(|l| *l == lock_id) {
940 locks.swap_remove(idx);
941 }
942 if locks.is_empty() {
943 let prev = std::mem::replace(self, Self::Active);
944 if let Self::Paused {
945 buffer, dropped, ..
946 } = prev
947 {
948 return Some((buffer, dropped));
949 }
950 }
951 None
952 }
953 }
954 }
955
956 /// Append a message to the buffer; if the buffer would exceed `cap`,
957 /// pop from the front (oldest-first), increment `dropped`, and return
958 /// the dropped messages so the caller can release any payload handles
959 /// they reference. `cap` of `None` means unbounded.
960 ///
961 /// Returns [`PushBufferedResult`] carrying both the dropped messages
962 /// (for refcount release) and whether this push triggered the FIRST
963 /// overflow event in the current pause cycle (for R1.3.8.c ERROR
964 /// synthesis — the caller schedules a single ERROR per cycle).
965 ///
966 /// Note: refcount management for the message's payload handle is the
967 /// caller's responsibility — see [`Core::queue_notify`] for the
968 /// retain/release discipline. The buffer itself is just a message
969 /// container; refcounts cross the binding boundary.
970 pub(crate) fn push_buffered(&mut self, msg: Message, cap: Option<usize>) -> PushBufferedResult {
971 let mut result = PushBufferedResult::default();
972 if let Self::Paused {
973 buffer,
974 dropped,
975 overflow_reported,
976 ..
977 } = self
978 {
979 buffer.push_back(msg);
980 if let Some(c) = cap {
981 while buffer.len() > c {
982 if let Some(dropped_msg) = buffer.pop_front() {
983 result.dropped_msgs.push(dropped_msg);
984 }
985 *dropped = dropped.saturating_add(1);
986 }
987 }
988 // R1.3.8.c (Slice F, A3): flag first overflow this cycle.
989 if !result.dropped_msgs.is_empty() && !*overflow_reported {
990 *overflow_reported = true;
991 result.first_overflow_this_cycle = true;
992 }
993 }
994 result
995 }
996
997 /// Snapshot the diagnostic for an R1.3.8.c overflow ERROR synthesis.
998 /// Returns `(dropped_count, lock_held_ns)`. Caller must already know
999 /// the configured cap (it's a Core-global value, not per-PauseState).
1000 pub(crate) fn overflow_diagnostic(&self) -> Option<(u32, u64)> {
1001 match self {
1002 Self::Active => None,
1003 Self::Paused {
1004 dropped,
1005 started_at_ns,
1006 ..
1007 } => {
1008 let lock_held_ns = monotonic_ns().saturating_sub(*started_at_ns);
1009 Some((*dropped, lock_held_ns))
1010 }
1011 }
1012 }
1013}
1014
1015/// Return shape for [`PauseState::push_buffered`]. Carries both the dropped
1016/// messages (for refcount release) and an "is this the first overflow this
1017/// cycle" flag (for R1.3.8.c ERROR synthesis scheduling).
1018#[derive(Default)]
1019pub(crate) struct PushBufferedResult {
1020 pub(crate) dropped_msgs: Vec<Message>,
1021 pub(crate) first_overflow_this_cycle: bool,
1022}
1023
/// Pending R1.3.8.c overflow ERROR synthesis entry. Recorded by
/// [`Core::queue_notify`] when the pause buffer first overflows in a cycle;
/// drained at wave-end after the lock-released call to
/// [`crate::boundary::BindingBoundary::synthesize_pause_overflow_error`].
///
/// `configured_max` is captured at scheduling time rather than read at
/// drain — the user could change `pause_buffer_cap` between schedule and
/// drain, and the diagnostic reads "the cap that was in effect when the
/// overflow happened."
#[derive(Debug, Clone)]
pub(crate) struct PendingPauseOverflow {
    /// Node the overflow entry was recorded for.
    pub(crate) node_id: NodeId,
    /// Dropped-message count for the diagnostic. Presumably the first
    /// element of [`PauseState::overflow_diagnostic`] — confirm at the
    /// recording site.
    pub(crate) dropped_count: u32,
    /// The `pause_buffer_cap` in effect when the overflow was scheduled.
    pub(crate) configured_max: usize,
    /// Nanoseconds the pause had been held. Presumably the second element
    /// of [`PauseState::overflow_diagnostic`] — confirm at the recording
    /// site.
    pub(crate) lock_held_ns: u64,
}
1040
/// Error returned when a same-thread partition acquire violates
/// ascending order. Phase H+ STRICT variant (D115).
///
/// Historical contract: the ascending-order protocol prevents AB/BA
/// deadlocks between threads. When this error surfaces, the caller should
/// defer the operation to wave-end (when no partitions are held) and retry.
///
/// **Vestigial (§7, D208–D211, 2026-05-16).** The union-find ascending-
/// order acquisition discipline that this represented is deleted:
/// scheduling groups are user-declared + static and the wave engine
/// acquires the whole touched-group set sorted **upfront**, so there is
/// no incremental mid-wave acquisition that could violate ordering.
/// This type is **never constructed** post-§7. It is retained only so
/// `SubscribeError::PartitionOrderViolation` and the `Err(_)` match arms
/// in `graphrefly-operators` compile unchanged (D211 minimal-churn);
/// removing the type + those arms is a tracked downstream-churn
/// follow-on (`porting-deferred.md`).
#[derive(Error, Debug, Clone, PartialEq, Eq)]
#[error("vestigial PartitionOrderViolation (never constructed post-§7)")]
pub struct PartitionOrderViolation {
    /// Vestigial. Unused; retained for struct-shape stability only.
    pub attempted: u64,
    /// Vestigial. Unused; retained for struct-shape stability only.
    pub max_held: u64,
}
1065
/// Errors returnable by [`Core::try_subscribe`].
///
/// `Core::subscribe` (the panic-on-error variant) panics on either
/// case; `try_subscribe` returns these so operators (zip / concat /
/// race / take_until / merge / switch_map / etc.) can match on the
/// variant — defer for [`Self::PartitionOrderViolation`], skip the
/// source for [`Self::TornDown`].
///
/// Per canonical spec R2.2.7.a / R2.2.7.b (D118, 2026-05-10).
#[derive(Error, Debug, Clone, PartialEq, Eq)]
pub enum SubscribeError {
    /// Phase H+ STRICT (D115): partition acquisition would violate the
    /// ascending-order protocol. Caller should defer the subscribe to
    /// wave-end via the producer-pattern deferred-op queue.
    ///
    /// Note: the wrapped [`PartitionOrderViolation`] type documents itself
    /// as vestigial and never constructed post-§7, so this variant exists
    /// to keep downstream match arms compiling rather than to be produced.
    #[error(transparent)]
    PartitionOrderViolation(#[from] PartitionOrderViolation),

    /// R2.2.7.b (D118, 2026-05-10): the node is non-resubscribable AND
    /// has terminated (`[COMPLETE]` or `[ERROR, h]` was delivered).
    /// The stream is permanently over; subscribe is rejected.
    /// Resubscribable terminal nodes do NOT surface this error — they
    /// reset to a fresh lifecycle on subscribe per R2.2.7.a, regardless
    /// of TEARDOWN state.
    #[error(
        "subscribe({node:?}): node is non-resubscribable and has terminated; \
         the stream is permanently over (R2.2.7.b)"
    )]
    TornDown {
        /// The non-resubscribable terminal node that rejected the subscribe.
        node: NodeId,
    },
}
1098
/// A producer-pattern operation deferred because it would have
/// violated the ascending partition-order protocol (Phase H+ STRICT,
/// D115). Drained by `BatchGuard::drop` after wave_guards are
/// released (no partitions held → safe to acquire any partition).
///
/// Variants with `HandleId` fields hold a binding-side retain taken
/// at defer time. The drain path releases this retain after the
/// operation fires; the panic-discard path releases it without firing.
///
/// NOTE(review): [`PartitionOrderViolation`] is documented as never
/// constructed post-§7 — confirm which call sites still enqueue these
/// operations and whether the rationale above needs refreshing.
pub enum DeferredProducerOp {
    /// Deferred `Core::emit`. Retain held on `handle`.
    Emit {
        /// Node the deferred emit targets.
        node_id: NodeId,
        /// Handle to emit; carries the retain taken at defer time.
        handle: HandleId,
    },
    /// Deferred `Core::complete`. No handle.
    Complete {
        /// Node the deferred complete targets.
        node_id: NodeId,
    },
    /// Deferred `Core::error`. Retain held on `handle`.
    Error {
        /// Node the deferred error targets.
        node_id: NodeId,
        /// Error handle; carries the retain taken at defer time.
        handle: HandleId,
    },
    /// Generic deferred callback (e.g., deferred subscribe from
    /// producer build closure). The closure captures everything it
    /// needs; `graphrefly-core` doesn't depend on operator-specific
    /// types. The closure is responsible for its own retain discipline.
    Callback(Box<dyn FnOnce() + Send>),
}
1120
1121/// Errors returnable by [`Core::pause`] and [`Core::resume`].
1122#[derive(Error, Debug, Clone, PartialEq)]
1123pub enum PauseError {
1124 #[error("pause/resume: unknown node {0:?}")]
1125 UnknownNode(NodeId),
1126}
1127
1128/// Errors returnable by [`Core::up`] (canonical R1.4.1).
1129#[derive(Error, Debug, Clone, PartialEq)]
1130pub enum UpError {
1131 /// Node id is not registered.
1132 #[error("up: unknown node {0:?}")]
1133 UnknownNode(NodeId),
1134 /// Tier-3 (DATA / RESOLVED) and tier-5 (COMPLETE / ERROR) are
1135 /// downstream-only per R1.4.1; rejected at the boundary.
1136 #[error(
1137 "up: tier {tier} is forbidden upstream — value (tier 3) and \
1138 terminal-lifecycle (tier 5) planes are downstream-only per R1.4.1"
1139 )]
1140 TierForbidden { tier: u8 },
1141}
1142
/// Errors returnable by [`Core::register`] and its sugar wrappers
/// ([`Core::register_state`], [`Core::register_producer`],
/// [`Core::register_derived`], [`Core::register_dynamic`],
/// [`Core::register_operator`]).
///
/// Slice H (2026-05-07) promoted these from `assert!`/`panic!` to typed
/// errors so that callers can recover from contract violations without
/// process abort. Every variant corresponds to a construction-time
/// invariant that the caller is responsible for upholding; the dispatcher
/// rejects the registration before any reactive state is created (so
/// there is no `Message::Error` channel through which to surface the
/// failure — these are imperative-layer errors, not reactive ones).
///
/// All variants are zero-side-effect: when [`Core::register`] returns
/// `Err`, no node has been added to the graph and any handle retains
/// taken on the way in (e.g. operator scratch seed retains via
/// [`BindingBoundary::retain_handle`]) have been released.
///
/// The enum is `Clone + PartialEq + Eq`, so a returned error can be
/// compared directly against an expected variant with `==`.
#[derive(Error, Debug, Clone, PartialEq, Eq)]
pub enum RegisterError {
    /// One of the supplied dep ids is not a registered node. Carries the
    /// offending dep id.
    #[error("register: unknown dep {0:?}")]
    UnknownDep(NodeId),

    /// `op` was supplied (operator node) but `deps` was empty. Operator
    /// nodes need at least one dep — for subscription-managed combinators
    /// with no declared deps, use [`Core::register_producer`] instead.
    #[error(
        "register: operator nodes require at least one dep — \
         use register_producer for subscription-managed combinators"
    )]
    OperatorWithoutDeps,

    /// [`NodeOpts::initial`] was set to a real handle but the registration
    /// shape is not a state node (state nodes are `deps.is_empty() &&
    /// fn_id.is_none() && op.is_none()`). Initial cache only makes sense
    /// for state nodes.
    #[error("register: NodeOpts::initial only valid for state nodes (no deps + no fn + no op)")]
    InitialOnlyForStateNodes,

    /// A supplied dep is terminal (COMPLETE / ERROR) AND not
    /// resubscribable. Adding it would create a permanent wedge — the dep
    /// will never re-emit, so the registered node would be stuck.
    /// Mirrors [`SetDepsError::TerminalDep`] at registration time.
    /// Carries the terminal dep's id.
    #[error(
        "register: dep {0:?} is terminal and not resubscribable; \
         mark it resubscribable before terminating, or remove it from the dep list"
    )]
    TerminalDep(NodeId),

    /// A stateful operator ([`OperatorOp::Scan`] / [`OperatorOp::Reduce`])
    /// was registered with `seed = NO_HANDLE`. R2.5.3 first-run gate
    /// requires the seed to be a real handle so that the operator can
    /// emit on its first fire.
    #[error("register: operator seed must be a real handle (R2.5.3); got NO_HANDLE")]
    OperatorSeedSentinel,
}
1199
/// Errors returnable by [`Core::set_pausable_mode`].
///
/// Slice H (2026-05-07) promoted these from `assert!`/`panic!` to typed
/// errors. Same imperative-layer error model as [`RegisterError`].
///
/// The enum is `Clone + PartialEq + Eq`, so a returned error can be
/// compared directly against an expected variant with `==`.
#[derive(Error, Debug, Clone, PartialEq, Eq)]
pub enum SetPausableModeError {
    /// `node_id` is not a registered node. Carries the unknown id.
    #[error("set_pausable_mode: unknown node {0:?}")]
    UnknownNode(NodeId),
    /// The node currently holds at least one pause lock. Changing pausable
    /// mode mid-pause would lose buffered content or strand a
    /// `pending_wave` flag — resume all locks first.
    #[error(
        "set_pausable_mode: cannot change pausable mode while paused; \
         resume all locks first"
    )]
    WhilePaused,
}
1218
/// Per-dep record. Replaces the parallel `deps` / `dep_handles` /
/// `dep_terminals` vectors from v1. Canonical spec R2.9.b alignment.
///
/// Each entry tracks one dep's lifecycle state, wave-scoped batch data,
/// and cross-wave `prev_data` for `ctx.prevData` access.
///
/// Handle ownership: the deactivation cleanup earlier in this file
/// releases `prev_data`, every `data_batch` entry, and the handle inside a
/// `TerminalKind::Error` terminal through the binding, so each of those
/// slots is treated as owning a retain share while populated.
pub(crate) struct DepRecord {
    /// The dep node this record tracks.
    pub(crate) node: NodeId,
    /// Last DATA handle from the end of the previous wave. [`NO_HANDLE`]
    /// means the dep has never emitted DATA (or the slot was reset by
    /// deactivation cleanup).
    pub(crate) prev_data: HandleId,
    /// Per-dep dirty flag — awaiting DATA/RESOLVED for current wave.
    pub(crate) dirty: bool,
    /// Per-dep involved-this-wave flag. Distinguishes:
    /// - `involved && data_batch.is_empty()` → dep settled RESOLVED
    /// - `!involved && data_batch.is_empty()` → dep was not in this wave
    pub(crate) involved_this_wave: bool,
    /// DATA handles accumulated this wave. Outside `batch()` scope, at most
    /// 1 element. Inside `batch()`, K emits on the source produce K entries
    /// per R1.3.6.b coalescing. Each handle holds a `retain_handle` share
    /// taken at `deliver_data_to_consumer` time; released at wave-end
    /// rotation in `clear_wave_state`.
    pub(crate) data_batch: SmallVec<[HandleId; 1]>,
    /// Terminal state for this dep. `None` = dep is live.
    /// `Some` = dep emitted COMPLETE/ERROR. When ALL entries are Some,
    /// the node auto-cascades per Lock 2.B (ERROR dominates COMPLETE).
    pub(crate) terminal: Option<TerminalKind>,
}
1247
1248impl DepRecord {
1249 fn new(node: NodeId) -> Self {
1250 Self {
1251 node,
1252 prev_data: NO_HANDLE,
1253 dirty: false,
1254 involved_this_wave: false,
1255 data_batch: SmallVec::new(),
1256 terminal: None,
1257 }
1258 }
1259}
1260
1261/// Internal node record. Mirrors `core.ts:132–154` post-D030 unification.
1262///
1263/// **Kind is derived, not stored** (D030, Slice D). `(dep_records.is_empty(),
1264/// fn_id, op, is_dynamic)` uniquely identifies the kind — see [`NodeKind`].
1265/// Helper methods (`is_state()`, `is_producer()`, `is_compute()`,
1266/// `is_operator()`, `skips_auto_cascade()`, `kind()`) cover the common
1267/// predicates without unpacking via [`Core::kind_of`].
1268///
/// The six bool fields listed here (`has_fired_once`, `dirty`,
/// `involved_this_wave`, `has_received_teardown`, `resubscribable`,
/// `is_dynamic`) each represent an orthogonal concern. `is_dynamic` is
/// constant per node (set at register time); the others are mutable
/// lifecycle state. Collapsing them into a bitfield would obscure intent.
1274#[allow(clippy::struct_excessive_bools)]
1275pub(crate) struct NodeRecord {
1276 /// Per-dep records. Replaces the old parallel `deps` / `dep_handles` /
1277 /// `dep_terminals` vecs. Dep NodeIds derived via `dep_ids()`.
1278 pub(crate) dep_records: Vec<DepRecord>,
1279 /// User-fn id for closure-form dispatch. `Some` for Derived / Dynamic /
1280 /// Producer; `None` for State / Operator. (Operator dispatch goes via
1281 /// [`Self::op`] instead.)
1282 pub(crate) fn_id: Option<FnId>,
1283 /// Operator discriminant for typed-op dispatch. `Some` for Operator
1284 /// nodes; `None` otherwise. Mutually exclusive with `fn_id` (a node is
1285 /// either closure-form OR typed-op, never both).
1286 pub(crate) op: Option<OperatorOp>,
1287 /// True for Dynamic nodes (R2.5.3 — fn declares actually-tracked dep
1288 /// indices per fire). False for everything else. Only meaningful when
1289 /// `fn_id.is_some()` AND `!dep_records.is_empty()`.
1290 pub(crate) is_dynamic: bool,
1291 pub(crate) equals: EqualsMode,
1292
1293 // Mutable state
1294 pub(crate) cache: HandleId,
1295 pub(crate) has_fired_once: bool,
1296 pub(crate) subscribers: HashMap<SubscriptionId, Sink>,
1297 /// Monotonic counter bumped on every mutation of [`Self::subscribers`]
1298 /// (insert on subscribe, remove on `Subscription::Drop`, remove on
1299 /// handshake-panic cleanup). Used by
1300 /// [`crate::batch::Core::queue_notify`] to detect mid-wave subscriber-
1301 /// set changes and start a fresh `PendingBatch` with an updated sink
1302 /// snapshot — closes D2 (Slice X4, 2026-05-08): the late-subscriber
1303 /// and multi-emit-per-wave gap where the pre-fix per-node single
1304 /// snapshot meant a sub installed between two emits to the same node
1305 /// in one wave was invisible to the second emit's flush.
1306 ///
1307 /// Per-node (not per-Core) so that a subscribe to node A doesn't
1308 /// invalidate snapshot reuse for node B's pending batch in the same
1309 /// wave.
1310 pub(crate) subscribers_revision: u64,
1311 /// For dynamic nodes: which dep indices fn actually tracks.
1312 /// For static derived: all indices, populated at construction.
1313 pub(crate) tracked: HashSet<usize>,
1314
1315 // Wave-scoped state — cleared at wave end.
1316 pub(crate) dirty: bool,
1317 pub(crate) involved_this_wave: bool,
1318
1319 /// Per-node pause state. Default `Active`. See [`PauseState`].
1320 pub(crate) pause_state: PauseState,
1321 /// Pause behavior mode (canonical-spec §2.6). Set at registration via
1322 /// [`NodeOpts::pausable`]. Default [`PausableMode::Default`] suppresses
1323 /// fn-fire while paused and consolidates N pause-window dep deliveries
1324 /// into one fn-fire on RESUME; `ResumeAll` buffers tier-3/4 outgoing
1325 /// for verbatim replay; `Off` ignores PAUSE entirely. See
1326 /// [`PausableMode`].
1327 pub(crate) pausable: PausableMode,
1328 /// Replay buffer cap (R2.6.5 / Lock 6.G — Slice E1, 2026-05-07).
1329 /// `None` disables; `Some(N)` keeps a circular VecDeque of the last N
1330 /// DATA-handle emissions for late-subscriber replay. Each handle in
1331 /// the buffer owns one binding-side retain share, released on evict
1332 /// (cap exceeded) or in `Drop for CoreState`.
1333 pub(crate) replay_buffer_cap: Option<usize>,
1334 pub(crate) replay_buffer: VecDeque<HandleId>,
1335
1336 /// Terminal lifecycle state for THIS node's outgoing stream. Once set,
1337 /// further `emit` calls are silent no-ops, fn no longer fires, and only
1338 /// the terminal message has been queued downstream.
1339 pub(crate) terminal: Option<TerminalKind>,
1340 /// True after the first TEARDOWN has been processed for this node
1341 /// (R2.6.4 / Lock 6.F). Subsequent TEARDOWN deliveries are idempotent
1342 /// — the auto-prepended COMPLETE only fires on the first one. Without
1343 /// this flag, a redundant TEARDOWN delivered via the cascade plus an
1344 /// explicit `core.teardown(node)` would re-emit `[COMPLETE, TEARDOWN]`
1345 /// to subscribers per delivery, which is incorrect.
1346 pub(crate) has_received_teardown: bool,
1347 /// Per R2.2.7 / R2.5.3 — resubscribable terminal lifecycle.
1348 /// When `true` AND `terminal == Some(...)`, a fresh subscribe call
1349 /// will reset the node: clear `terminal`, `has_fired_once`,
1350 /// `has_received_teardown`, all dep_records to sentinel, and drain the
1351 /// pause lockset. Default `false`.
1352 pub(crate) resubscribable: bool,
1353 /// Meta companion nodes attached to this node per R1.3.9.d. When this
1354 /// node tears down, its meta companions are torn down FIRST (before
1355 /// the main node's auto-COMPLETE + TEARDOWN wire emission), so
1356 /// observers see companions terminate before the parent. The ordering
1357 /// is load-bearing — meta nodes typically subscribe to parent state
1358 /// that becomes inconsistent during the parent's destruction phase.
1359 pub(crate) meta_companions: Vec<NodeId>,
1360 /// R5.4 / D011 partial-mode: when `true`, fire_fn skips the R2.5.3
1361 /// first-run gate — the node fires as soon as ANY dep delivers a
1362 /// real handle, even if other deps remain sentinel. Defaults to
1363 /// `false` (gated). Lifted into Core for operator support; for
1364 /// State/Derived/Dynamic nodes the field is settable but the gated
1365 /// path remains the typical caller default.
1366 pub(crate) partial: bool,
1367 /// D263 — when `true`, `fire_fn`'s first-run gate (R2.5.3) counts a
1368 /// terminal dep as "real input" so the node can fire on a
1369 /// COMPLETE-without-DATA from any single dep. Mirrors `fire_operator`'s
1370 /// unconditional `dr.terminal.is_some()` clause (gated here on the
1371 /// per-node opt-in so existing `fire_fn` consumers preserve the
1372 /// historical sentinel-hold behaviour by default). Substrate-internal
1373 /// per D263 — not surfaced on the `Impl` parity contract yet.
1374 pub(crate) terminal_as_real_input: bool,
1375 /// Topological rank — 1 + max dep rank. Nodes with no deps have rank 0.
1376 /// Used by `pick_next_fire` for O(|pending_fires|) scheduling instead
1377 /// of O(V) transitive BFS. Computed at registration; recomputed on
1378 /// `set_deps` for the modified node (consumer propagation deferred —
1379 /// see `porting-deferred.md`). §10 perf optimization (D047, Slice U).
1380 pub(crate) topo_rank: u32,
1381 /// Bitmask for first-run gate — bit i set when dep i delivers first
1382 /// DATA. `has_sentinel_deps()` becomes a single integer compare for
1383 /// ≤64 deps. Falls back to `iter().any()` when `dep_count > 64`.
1384 /// Reset on resubscribe. §10.13 perf optimization (D047, Slice U).
1385 pub(crate) received_mask: u64,
1386 /// §10.3 diamond resolution bitmask — bit i set when dep i is
1387 /// involved in the current wave (DATA delivered). Mirrors
1388 /// `received_mask` pattern. Cleared to 0 at wave end instead of
1389 /// iterating all deps. For ≤64 deps, `DepBatch::involved` can be
1390 /// derived via `(involved_mask >> dep_idx) & 1 != 0`. For >64 deps,
1391 /// falls back to per-dep `DepRecord::involved_this_wave` field.
1392 /// §10.3 perf optimization (Slice V1).
1393 pub(crate) involved_mask: u64,
1394 /// Generic per-operator scratch slot (Slice C-3, D026). Replaces
1395 /// the typed `operator_state: HandleId` field used by Slices C-1 / C-2.
1396 /// `None` for non-operator kinds and operators with no cross-wave
1397 /// state (Map / Filter / Combine / WithLatestFrom / Merge); `Some`
1398 /// for stateful operators ([`OperatorOp::Scan`] / [`Reduce`] /
1399 /// [`DistinctUntilChanged`] / [`Pairwise`] / [`Take`] / [`Skip`] /
1400 /// [`TakeWhile`] / [`Last`]).
1401 ///
1402 /// The boxed value implements
1403 /// [`OperatorScratch`](crate::op_state::OperatorScratch); its
1404 /// `release_handles` method is called from
1405 /// [`reset_for_fresh_lifecycle`] (resubscribable terminal cycle) and
1406 /// from [`Drop for CoreState`].
1407 ///
1408 /// **Refcount discipline:** the state struct owns whatever handle
1409 /// shares it stores (e.g., [`ScanState::acc`](crate::op_state::ScanState::acc),
1410 /// [`LastState::latest`](crate::op_state::LastState::latest)).
1411 /// Per-fire helpers retain the new value before releasing the old;
1412 /// `release_handles` releases the current shares at end-of-life.
1413 pub(crate) op_scratch: Option<Box<dyn crate::op_state::OperatorScratch>>,
1414}
1415
1416impl NodeRecord {
1417 // ---- Kind predicates (D030 — derived from field shape) ----
1418
1419 /// True iff this is a state node (no deps, no fn, no op).
1420 pub(crate) fn is_state(&self) -> bool {
1421 self.dep_records.is_empty() && self.fn_id.is_none() && self.op.is_none()
1422 }
1423
1424 /// True iff this is a producer node (no deps + has fn + no op).
1425 /// Producers fire fn once on first subscribe; cleanup fires via
1426 /// [`BindingBoundary::producer_deactivate`] (D031, Slice D).
1427 pub(crate) fn is_producer(&self) -> bool {
1428 self.dep_records.is_empty() && self.fn_id.is_some() && self.op.is_none()
1429 }
1430
1431 /// True iff this is a compute node (Derived / Dynamic / Operator) —
1432 /// has at least one dep AND either a fn or an op.
1433 #[allow(dead_code)] // Convenience predicate; callers may use is_state/is_producer instead.
1434 pub(crate) fn is_compute(&self) -> bool {
1435 !self.dep_records.is_empty() && (self.fn_id.is_some() || self.op.is_some())
1436 }
1437
1438 /// True iff this is an Operator node (has op set).
1439 #[allow(dead_code)] // Direct `op.is_some()` is more common; this is a readability sugar.
1440 pub(crate) fn is_operator(&self) -> bool {
1441 self.op.is_some()
1442 }
1443
1444 /// True iff this node opts OUT of Lock 2.B auto-cascade —
1445 /// Operator(Reduce) / Operator(Last) intercept upstream COMPLETE.
1446 ///
1447 /// D263: a non-operator node with `terminal_as_real_input == true`
1448 /// ALSO skips auto-cascade so `fire_fn` runs on terminal-only deps —
1449 /// mirrors the Reduce-class operator dispatch that already counts
1450 /// terminal as real input. Note: `partial == true` ALONE does NOT
1451 /// skip cascade — canonical-spec §5.4 / R5.4 locks `partial` to
1452 /// first-run-gate-bypass ONLY. To get fire-on-COMPLETE in partial
1453 /// mode, set BOTH flags (or wrap in a Reduce-class operator).
1454 pub(crate) fn skips_auto_cascade(&self) -> bool {
1455 match self.op {
1456 Some(op) => NodeKind::Operator(op).skips_auto_cascade(),
1457 None => self.terminal_as_real_input,
1458 }
1459 }
1460
1461 /// Compute the public-API [`NodeKind`] from the field shape (D030).
1462 /// Used by [`Core::kind_of`] and rare internal sites that need the
1463 /// enum (most use the predicate methods above).
1464 pub(crate) fn kind(&self) -> NodeKind {
1465 if let Some(op) = self.op {
1466 NodeKind::Operator(op)
1467 } else if self.dep_records.is_empty() {
1468 if self.fn_id.is_some() {
1469 NodeKind::Producer
1470 } else {
1471 NodeKind::State
1472 }
1473 } else if self.is_dynamic {
1474 NodeKind::Dynamic
1475 } else {
1476 NodeKind::Derived
1477 }
1478 }
1479
1480 // ---- Existing accessors ----
1481
1482 /// Iterator over dep NodeIds in declaration order.
1483 pub(crate) fn dep_ids(&self) -> impl Iterator<Item = NodeId> + '_ {
1484 self.dep_records.iter().map(|r| r.node)
1485 }
1486
1487 /// Collected dep NodeIds — for call sites that need a `Vec<NodeId>`.
1488 pub(crate) fn dep_ids_vec(&self) -> Vec<NodeId> {
1489 self.dep_ids().collect()
1490 }
1491
1492 /// True if any dep is in sentinel state (never emitted DATA and no
1493 /// data this wave). Replaces the old `dep_handles.contains(&NO_HANDLE)`.
1494 pub(crate) fn has_sentinel_deps(&self) -> bool {
1495 let n = self.dep_records.len();
1496 if n == 0 {
1497 return false;
1498 }
1499 if n <= 64 {
1500 // O(1): check if all bits [0..n) are set.
1501 let full_mask = if n == 64 { u64::MAX } else { (1u64 << n) - 1 };
1502 self.received_mask != full_mask
1503 } else {
1504 // Fallback for >64 deps (extremely rare).
1505 self.dep_records
1506 .iter()
1507 .any(|r| r.prev_data == NO_HANDLE && r.data_batch.is_empty())
1508 }
1509 }
1510
1511 /// Find the index of a dep by NodeId.
1512 pub(crate) fn dep_index_of(&self, dep_id: NodeId) -> Option<usize> {
1513 self.dep_records.iter().position(|r| r.node == dep_id)
1514 }
1515
1516 /// True if ALL dep terminal slots are populated (Lock 2.B cascade check).
1517 pub(crate) fn all_deps_terminal(&self) -> bool {
1518 !self.dep_records.is_empty() && self.dep_records.iter().all(|r| r.terminal.is_some())
1519 }
1520}
1521
1522// Q-beyond Sub-slice 1 (D108, 2026-05-09): `CrossPartitionState` removed.
1523//
1524// The four wave-scoped fields previously held under
1525// `Core::cross_partition: Arc<parking_lot::Mutex<CrossPartitionState>>`
1526// (Q2, 2026-05-09) moved to a per-thread `WaveState` thread_local in
1527// [`crate::batch`]. The bench-driven rationale is documented at
1528// [`crate::batch::WaveState`]: cross-thread cache-line bouncing on the
1529// `cross_partition` mutex was the dominant cost in Phase J's regression,
1530// not single-thread mutex hop count. Per-thread placement eliminates the
1531// bounce point entirely.
1532//
1533// **Refcount discipline preserved.** `wave_cache_snapshots` and
1534// `deferred_handle_releases` still hold binding-side handle retains;
1535// outermost `BatchGuard::drop` drains them via `Core::binding` on both
1536// success and panic paths (the binding ref the prior
1537// `CrossPartitionState` held for its `Drop` impl is no longer needed —
1538// `BatchGuard` already holds a `Core` clone with the binding).
1539
1540/// Core-global state with NO per-scheduling-group partition
1541/// (Slice B-2 Step 1, D220-EXEC). Hoisted out of [`CoreState`]
1542/// so that when Step 2 shards `nodes`/`children` per `ShardKey`, THESE
1543/// remain singular: monotonic id counters, the topology-sink registry,
1544/// the cross-shard-visible `currently_firing` reentrancy stack (P13 /
1545/// /qa F2 — the load-bearing cross-thread-visibility property), the two
1546/// Core-global caps, and the Core-shutdown scratch-release queue.
1547///
1548/// **Step 1 (this stage): behaviour-identical.** A plain sub-struct of
1549/// [`CoreState`] under the SAME single cell/lock — `s.shared.<f>` is
1550/// exactly the old `s.<f>`. Step 2 reshapes [`crate::state_cell`] to
1551/// hold one `CoreShared` + a per-`ShardKey` shard map; pure-shared ops
1552/// then take only the shared guard (`with_shared`), shard ops the shard
1553/// guard (`with_shard`), and when both are needed the deadlock-free
1554/// rule is **shard guard OUTER, shared guard INNER** (D220-EXEC).
1555// `pub` (not `pub(crate)`): appears in the public `StateCell` trait
1556// surface (`from_parts` / `lock_shared`), same as `CoreState`. The
1557// struct *name* is public but every field is `pub(crate)`, so it is an
1558// opaque token outside the crate — no internal state is reachable.
1559pub struct CoreShared {
1560 pub(crate) next_node_id: u64,
1561 pub(crate) next_subscription_id: u64,
1562 pub(crate) next_lock_id: u64,
1563 /// Topology-change sinks. Keyed by subscription id for O(1) removal.
1564 pub(crate) topology_sinks: HashMap<u64, crate::topology::TopologySink>,
1565 pub(crate) next_topology_id: u64,
1566 /// Core-global cap on per-node pause replay buffer length. `None` means
1567 /// unbounded. Per the user direction (Q1, 2026-05-05): start core-global;
1568 /// per-node override can be added later as a pure addition without API
1569 /// breakage. Default `None`.
1570 pub(crate) pause_buffer_cap: Option<usize>,
1571 /// Core-global cap on wave-drain iterations before
1572 /// [`crate::batch::Core::drain_and_flush`] aborts with a diagnostic panic.
1573 /// Replaces the prior `MAX_DRAIN_ITERATIONS` hard-coded constant
1574 /// (R4.3 / Lock 2.F′). Default `10_000`.
1575 ///
1576 /// The drain loop bound exists to surface runtime cycles
1577 /// (e.g. an operator that re-arms its own `pending_fires` slot during
1578 /// `invoke_fn`) as a panic with context, rather than letting Core
1579 /// spin forever. Structural cycles via [`Core::set_deps`] are
1580 /// rejected at edge-mutation time (`SetDepsError::WouldCreateCycle`);
1581 /// registration is structurally cycle-safe by construction (the new
1582 /// node's id is not allocated until AFTER deps are validated, so deps
1583 /// cannot transitively reach the new node). The drain bound is the
1584 /// safety net for runtime cycles that bypass both static checks.
1585 pub(crate) max_batch_drain_iterations: u32,
1586 /// A6 reentrancy guard stack (Slice F, 2026-05-07): the stack of
1587 /// NodeIds whose fn is currently being invoked. Pushed at the top of
1588 /// `fire_fn` (just before the lock-released `invoke_fn` call) and
1589 /// popped on return / unwind via the [`crate::batch::FiringGuard`]
1590 /// RAII helper. [`Core::set_deps`] consults this set and rejects
1591 /// with [`SetDepsError::ReentrantOnFiringNode`] if `n` is currently
1592 /// firing — preventing the D1 `tracked` index corruption. Read by
1593 /// the P13 partition-migration check (D091) to reject mid-fire
1594 /// `set_deps` that would migrate a firing node's partition.
1595 ///
1596 /// **Core-global / cross-shard-visible.** /qa F2 reverted
1597 /// (2026-05-10): briefly placed on per-thread `WaveState`, then
1598 /// moved BACK after /qa F2 surfaced the cross-thread P13 bypass
1599 /// (per-thread placement made Thread B's `set_deps` read its own
1600 /// empty stack → P13 silently bypassed for cross-thread `set_deps`
1601 /// during Thread A's lock-released `invoke_fn`). Cross-thread
1602 /// visibility is the load-bearing property D091 requires; keeping it
1603 /// in `CoreShared` (NOT a per-`ShardKey` shard) preserves it across
1604 /// the Slice B-2 shard split (D220-EXEC / /qa F2 / D214 P13).
1605 ///
1606 /// Membership semantics (NOT strict LIFO): consumed via
1607 /// `contains(&n)` membership test. `FiringGuard::drop` pops the
1608 /// right-most matching `node_id` via `rposition` + `swap_remove`;
1609 /// physical order of remaining entries may not match construction
1610 /// order, but membership is preserved.
1611 pub(crate) currently_firing: Vec<NodeId>,
1612 /// D-α (D028 full close, 2026-05-10): per-Core defer queue for old
1613 /// operator-scratch boxes pushed by Phase G on resubscribable nodes.
1614 /// Phase G builds a fresh scratch via [`Core::make_op_scratch`] and
1615 /// installs it on the node's `op_scratch` slot (so re-activation
1616 /// sees fresh counters / a fresh seed-share); the OLD scratch's
1617 /// handle retains are deferred to one of two drain points to
1618 /// preserve the Slice C-3 /qa P1 retain-before-release invariant
1619 /// (the C-3 test
1620 /// `scan_resubscribable_reset_with_seed_aliasing_acc_does_not_collapse_registry`
1621 /// fails if the old `acc` share is released before the fresh seed
1622 /// share is taken — when `acc == seed` interns to the same registry
1623 /// slot, eager release drops the slot to zero before the fresh
1624 /// retain bumps it back up).
1625 ///
1626 /// Drain points:
1627 /// 1. [`Core::reset_for_fresh_lifecycle`] — after Phase 2 takes
1628 /// fresh retains on the new seed/default, the queue drain
1629 /// releases queued boxes whose handles may have aliased.
1630 /// 2. [`Drop for CoreState`] — catch-all on Core shutdown.
1631 ///
1632 /// Note that the queue lives on `CoreShared` within `CoreState` (not
1633 /// on `Core`) so `Drop for CoreState` has access to both the binding
1634 /// and the queue under the single state lock — no separate mutex
1635 /// needed.
1636 ///
1637 /// **Growth bound (/qa m17, 2026-05-10):** size is bounded by the
1638 /// number of non-terminal deactivate-reactivate cycles since the
1639 /// last terminal-then-resubscribe reset on any resubscribable +
1640 /// has-op node in this Core. Each entry is a `Box<dyn OperatorScratch>`
1641 /// holding O(1) handles (Scan/Reduce/Last: 1 handle; Take/Skip/
1642 /// TakeWhile/DistinctUntilChanged/Pairwise: 0 or 1 handle). Typical
1643 /// workloads: O(few KB). Degenerate workloads (long-lived Cores
1644 /// with frequent deactivate-reactivate cycles and no terminal
1645 /// resets) should call `core.complete()` / `core.error()` on the
1646 /// op node periodically to trigger queue drain via
1647 /// `reset_for_fresh_lifecycle`. The release happens unconditionally
1648 /// on Core drop, so this is not a leak — just a deferred-release
1649 /// growth concern under unusual workloads.
1650 pub(crate) pending_scratch_release: Vec<Box<dyn crate::op_state::OperatorScratch>>,
1651 /// Binding handle, held here so [`Drop for CoreShared`] can drain
1652 /// `pending_scratch_release` on Core shutdown (Step 2a, D220-EXEC:
1653 /// `CoreShared` is hoisted to its own region, separate from any
1654 /// shard's `CoreState`, so the old `Drop for CoreState` scratch-drain
1655 /// — which used `CoreState::binding` — moves here). Cheap `Arc`
1656 /// clone; `Core` / each shard `CoreState` also hold one.
1657 pub(crate) binding: Arc<dyn BindingBoundary>,
1658}
1659
1660impl Drop for CoreShared {
1661 fn drop(&mut self) {
1662 // D-α (D028) catch-all drain of the deferred operator-scratch
1663 // queue on Core shutdown (moved from `Drop for CoreState`'s tail
1664 // with the `pending_scratch_release` + `binding` hoist). Release
1665 // BEFORE the `Vec<Box<…>>` drops (each box's `Drop` is a plain
1666 // field drop — HandleIds are raw u64s, no binding re-entry).
1667 let queued: Vec<Box<dyn crate::op_state::OperatorScratch>> =
1668 std::mem::take(&mut self.pending_scratch_release);
1669 for mut scratch in queued {
1670 scratch.release_handles(&*self.binding);
1671 }
1672 }
1673}
1674
1675/// Per-shard mutable Core state (Step 2a, D220-EXEC: still exactly ONE
1676/// shard — behaviour-identical with the pre-B-2 single `CoreState`;
1677/// Step 2b keys a per-[`crate::state_cell::ShardKey`] map of these).
1678/// All mutable Core state, behind one [`parking_lot::Mutex`].
1679///
1680/// **Architecture history.** Q2 (2026-05-09) split four wave-scoped
1681/// cross-partition aggregation fields out into a separate
1682/// `parking_lot::Mutex<CrossPartitionState>` on [`Core`]. Q-beyond
1683/// Sub-slice 1 (D108, 2026-05-09) eliminated `CrossPartitionState`
1684/// entirely — its four fields moved to per-thread `WaveState`
1685/// thread_local in `crate::batch`. Sub-slice 2 + 3 moved 8 more
1686/// wave-scoped fields the same way. /qa F1+F2 (2026-05-10) reverted
1687/// `in_tick` and `currently_firing` to CoreState; `currently_firing`
1688/// stays here (cross-thread P13 set_deps check, /qa F2), but `in_tick`
1689/// was re-keyed per-(Core, thread) into the
1690/// `crate::batch::IN_TICK_OWNED` thread_local (D047, 2026-05-15) and
1691/// then collapsed to a one-Core-per-OS-thread `Cell<u64>` slot
1692/// (D252, S5, 2026-05-19) — the actor-model invariant locks
1693/// "one Core per OS thread" so a single-generation slot suffices,
1694/// with cross-Core same-thread nesting panicking fail-loud at
1695/// `BatchGuard::claim_in_tick`. See `docs/rust-port-decisions.md`.
1696///
1697/// The D1 patch (2026-05-09) moved Slice G's `tier3_emitted_this_wave`
1698/// set to a per-thread thread-local in `crate::batch` (was briefly
1699/// per-partition under Q3 v1; that placement was vulnerable to mid-wave
1700/// cross-thread `set_deps` partition splits — see
1701/// `docs/porting-deferred.md` "Per-partition state-shard refactor"
1702/// closing summary). Q-beyond
1703/// will continue the shape decomposition by sharding most of the
1704/// remaining fields per-partition.
1705// `pub` (not `pub(crate)`): appears in the public `StateCell` trait surface
1706// (`Core<C: StateCell>` is public and `SingleThreadCell`/`LockedCell` are
1707// exported). The struct *name* is public but every field stays `pub(crate)`,
1708// so it is an opaque token outside the crate — no internal state is reachable.
1709pub struct CoreState {
1710 pub(crate) nodes: HashMap<NodeId, NodeRecord>,
1711 /// Inverted adjacency: `parent → children`. Updated on registration.
1712 pub(crate) children: HashMap<NodeId, HashSet<NodeId>>,
1713 /// Binding-boundary handle for `Drop`-time refcount balancing.
1714 /// `Core` also holds a clone of this Arc; storing it here lets
1715 /// `Drop for CoreState` walk every retained slot and release the
1716 /// binding-side share when the last `Core` clone drops. Without this,
1717 /// `cache` / `terminal` / `dep_terminals` Error / pause-buffer payload
1718 /// handle refs leak in the binding registry until process exit.
1719 pub(crate) binding: Arc<dyn BindingBoundary>,
1720 // Wave-scoped state lives off `CoreState`:
1721 // - `pending_fires` / `pending_notify` / `deferred_flush_jobs` /
1722 // `deferred_cleanup_hooks` / `pending_wipes` /
1723 // `invalidate_hooks_fired_this_wave` / `wave_cache_snapshots` /
1724 // `pending_auto_resolve` / `pending_pause_overflow` →
1725 // per-thread [`crate::batch::WaveState`] (D108 Sub-slices 1–3).
1726 // - `in_tick` → one-Core-per-OS-thread [`crate::batch::IN_TICK_OWNED`]
1727 // (D252; cross-Core same-thread nesting panics fail-loud).
1728 // - tier3-emitted-this-wave → per-thread
1729 // [`crate::batch::TIER3_EMITTED_THIS_WAVE`] (Slice G / D1).
1730 // Core-global non-shard state (`next_*`, `topology_sinks`,
1731 // `currently_firing` [cross-shard-visible /qa F2], the two caps,
1732 // `pending_scratch_release`) lives on [`CoreShared`] (Slice B-2
1733 // Step 1, D220-EXEC) — see its doc for each field's rationale.
1734}
1735
1736// # Wave execution = one uninterrupted owner-side drain (S4, D246/D248/D249)
1737//
1738// The state lock (`state: RefCell<CoreState>`) is **dropped** around
1739// binding callbacks (`invoke_fn`, `custom_equals`) so user fns may
1740// re-enter Core. The original M1 design serialized WAVE EXECUTION
1741// across threads with a `wave_owner` `parking_lot::ReentrantMutex` held
1742// for the wave; S2c/D248 deleted it. `Core` is now single-owner
1743// `!Send + !Sync`: exactly one owner thread ever drives a given `Core`,
1744// so a wave is one uninterrupted owner-side drain. There is no
1745// cross-thread emitter to block and no wave-ownership mutex.
1746//
1747// - Same-thread / same-Core re-entrance (a user fn or in-wave sink
1748// re-entering via the owner-side mailbox/`DeferQueue` seam) runs as a
1749// nested wave: the inner `run_wave` sees `in_tick = true` (the
1750// one-Core-per-OS-thread `batch::IN_TICK_OWNED` slot holds this
1751// Core's `generation`, D252) and skips its own drain — the outer
1752// drain picks the queued op up to quiescence.
1753// - Cross-`Core` concurrency is host-native: independent per-worker
1754// Cores (actor model). The only cross-thread bridge into a `!Send`
1755// Core is the id-only `Arc<CoreMailbox>` (timer / producer `post_*` +
1756// the `runnable` wake bit), drained owner-side.
1757//
1758// The happens-after contract (`emit` returning ⇒ subscribers observed)
1759// holds structurally: the single owner thread runs the emit's drain to
1760// quiescence before returning; a mid-wave subscribe is frozen against
1761// double-delivery by the `subscribers_revision` `PendingBatch` snapshot
1762// (Slice X4/D2), not by a cross-thread lock.
1763
/// Process-wide source of unique `Core` generation ids.
///
/// The counter starts at 1 and is bumped atomically once per
/// `Core::new` (negligible cost). The per-thread `PARTITION_CACHE` in
/// `batch.rs` keys on the resulting id to tell `Core` instances apart;
/// `Arc::as_ptr` is unsuitable for that because the allocator can hand
/// a dropped Core's address to a new one (ABA hazard).
static CORE_GENERATION: AtomicU64 = AtomicU64::new(1);
1770
1771/// The handle-protocol Core dispatcher.
1772///
1773/// Holds an [`Arc`] to the [`BindingBoundary`] and all dispatch state. Cheap
1774/// to clone (the inner `Arc<Mutex<CoreState>>` is shared); pass `Core` by
1775/// value to threads.
1776pub struct Core {
1777 /// Core-global region (id counters, topology-sink registry,
1778 /// `currently_firing`, the two caps, the scratch-release queue).
1779 /// D246/S2c: single-owner ⇒ a plain
1780 /// `RefCell` (the `parking_lot`/`StateCell`-generic split was
1781 /// shared-Core-era legacy; the actor model drives a `Core` from
1782 /// exactly one thread). D248 relaxed the substrate `Sink`/
1783 /// `TopologySink` off `Send + Sync`, so `CoreState` owns `!Send`
1784 /// sinks ⇒ **`Core` is `!Send + !Sync`** (one owner thread, never
1785 /// relocated cross-thread; the only cross-thread bridge is the
1786 /// id-only `Arc<CoreMailbox>`). This is the actor-model shape
1787 /// (D221/D223/D248); cross-`Core` parallelism = independent
1788 /// per-worker Cores. A re-entrant double-borrow panics loudly — a
1789 /// dispatcher bug (a missing lock-released bracket around a binding
1790 /// callback), not a user error.
1791 pub(crate) shared: std::cell::RefCell<CoreShared>,
1792 /// The node/children/binding region (was the sharded `CoreState`;
1793 /// single shard always under single-owner — D246/S2c collapse).
1794 pub(crate) state: std::cell::RefCell<CoreState>,
1795 /// `Send + Sync` bridge for autonomous async producers (timer tasks)
1796 /// that can no longer hold `&Core` (D223/D227/D230). Timer tasks
1797 /// post `(node, handle)`; [`Self::drain_mailbox`] applies them
1798 /// owner-side via the sync `emit`. Owns the per-group `runnable` wake
1799 /// bit (S4 wires it; M6 reads it from the host executor).
1800 pub(crate) mailbox: Arc<crate::mailbox::CoreMailbox>,
1801 /// Owner-only deferred-closure queue (D249/S2c). Holds the `!Send`
1802 /// `Defer` closures split off `CoreMailbox` (which stays
1803 /// `Send + Sync` for the timer bridge). `Rc` (not `Arc`): shared
1804 /// with `graphrefly-operators`' `ProducerEmitter` on the **one**
1805 /// owner thread; never crosses threads. Drained owner-side by
1806 /// [`Self::drain_mailbox`] + the in-wave `BatchGuard` drain.
1807 pub(crate) deferred: std::rc::Rc<crate::mailbox::DeferQueue>,
1808 pub(crate) binding: Arc<dyn BindingBoundary>,
1809 /// Unique generation ID for this Core instance. Assigned from
1810 /// [`CORE_GENERATION`] at construction; nonzero (the counter starts
1811 /// at 1 so `0` can serve as the [`crate::batch::IN_TICK_OWNED`]
1812 /// "no active Core" sentinel under D252). Stored in the one-Core-
1813 /// per-OS-thread `IN_TICK_OWNED` slot while this Core owns the
1814 /// thread's wave; cross-Core same-thread nesting panics fail-loud
1815 /// at `BatchGuard::claim_in_tick`.
1816 pub(crate) generation: u64,
1817}
1818
// S2b (D223): `Core` is **not** `Clone` and has **no** `WeakCore`. Under
// the actor model a `Core` owns its state cells by value and is never
// shared (S2b originally allowed moving it between workers; since D248
// made `Core` `!Send + !Sync` it also never leaves its owner thread) —
// so the old `Arc<C>`-shared `Clone` and the `Weak<C>`-back-ref
// `WeakCore` (used to break the
// BenchBinding→registry→closure→strong-Core cycle for long-lived
// binding-stored closures / timer tasks) are deleted. Autonomous async
// producers (timer tasks) now reach the `Core` via the `Send + Sync`
// [`crate::mailbox::CoreMailbox`] instead of upgrading a `Weak<C>`
// (D227/D230). Identity (`same_dispatcher`) is the unique per-`Core`
// `generation` counter, not `Arc::ptr_eq`.
1829
1830impl Drop for Core {
1831 fn drop(&mut self) {
1832 // Tell any in-flight timer tasks the Core is gone so they release
1833 // their pending handle and bail instead of leaking it into a
1834 // mailbox no one will drain (mirrors the old
1835 // `WeakCore::upgrade() == None` teardown branch in `timer.rs`).
1836 // `close()` is mutually exclusive with `post_op` (shared `ops`
1837 // lock), so after it returns no new op can enqueue.
1838 self.mailbox.close();
1839 // D249/S2c: close the owner-side defer queue too — its queued
1840 // closures are dropped unrun (running `CoreFull` on a
1841 // half-dropped `Core` is unsound — user-locked QA decision A).
1842 // No handle-release contract: `DeferFn`s capture no bare
1843 // retained `HandleId` (D235 P8 pattern).
1844 self.deferred.close();
1845 // QA F-A / Blind #2 (2026-05-18): an op posted just before
1846 // `close()` won the lock is now stranded in the queue with its
1847 // retained `HandleId`. Drain the remainder and release those
1848 // handles — without this `Drop` regresses the exact
1849 // BenchCore-teardown leak the deleted `WeakCore` path prevented.
1850 for op in self.mailbox.take_all() {
1851 match op {
1852 crate::mailbox::MailboxOp::Emit(_, h) | crate::mailbox::MailboxOp::Error(_, h) => {
1853 self.binding.release_handle(h);
1854 }
1855 crate::mailbox::MailboxOp::Complete(_) | crate::mailbox::MailboxOp::Defer(_) => {}
1856 }
1857 }
1858 }
1859}
1860
1861/// Object-safe full-`Core` re-entry surface (S2b / D233) — the methods a
1862/// producer sink's owner-side [`crate::mailbox::MailboxOp::Defer`]
1863/// closure needs, by `NodeId`/`HandleId`/`Sink`/id only (no `C`/`T`),
1864/// blanket-impl'd for every `Core`. Lets windowing /
1865/// higher-order-operator sinks perform value-returning topology mutation
1866/// (`register_*`/`subscribe`) **in-wave** without naming the cell type:
1867/// the `BatchGuard` drain-to-quiescence loop calls
1868/// `f(self as &dyn CoreFull)` while it holds the owner `&Core`.
1869pub trait CoreFull {
1870 /// See [`Core::register_state`].
1871 fn register_state(&self, initial: HandleId, partial: bool) -> Result<NodeId, RegisterError>;
1872 /// See [`Core::register_producer`].
1873 fn register_producer(&self, fn_id: FnId) -> Result<NodeId, RegisterError>;
1874 /// See [`Core::subscribe`].
1875 fn subscribe(&self, node_id: NodeId, sink: Sink) -> SubscriptionId;
1876 /// See [`Core::try_subscribe`].
1877 fn try_subscribe(&self, node_id: NodeId, sink: Sink) -> Result<SubscriptionId, SubscribeError>;
1878 /// See [`Core::unsubscribe`].
1879 fn unsubscribe(&self, node_id: NodeId, sub_id: SubscriptionId);
1880 /// See [`Core::emit`].
1881 fn emit(&self, node_id: NodeId, handle: HandleId);
1882 /// See [`Core::complete`].
1883 fn complete(&self, node_id: NodeId);
1884 /// See [`Core::error`].
1885 fn error(&self, node_id: NodeId, handle: HandleId);
1886 /// See [`Core::teardown`]. Sink-side terminal forwards (e.g.
1887 /// `stratify` TEARDOWN passthrough) route through `em.defer` — rare,
1888 /// so `Defer` rather than a 5th `MailboxOp` fast-path variant.
1889 fn teardown(&self, node_id: NodeId);
1890 /// See [`Core::invalidate`]. Sink-side INVALIDATE forwards
1891 /// (higher-order `build_inner_sink`) route through `em.defer`.
1892 fn invalidate(&self, node_id: NodeId);
1893
1894 // --- read-only inspection (S2b/β D243 / D237-A-AMEND) ---
1895 //
1896 // Needed so a `MailboxOp::Defer` closure (the in-wave owner-side
1897 // re-entry point — D233) can run `graphrefly_graph::describe_of`
1898 // for the reactive-describe / observe topology-sub path. These are
1899 // pure reads (no `C`/`T` surfaced) — `HandleId`/`NodeId`/enum only.
1900
1901 /// See [`Core::cache_of`].
1902 fn cache_of(&self, node_id: NodeId) -> HandleId;
1903 /// See [`Core::has_fired_once`].
1904 fn has_fired_once(&self, node_id: NodeId) -> bool;
1905 /// See [`Core::kind_of`].
1906 fn kind_of(&self, node_id: NodeId) -> Option<NodeKind>;
1907 /// See [`Core::deps_of`].
1908 fn deps_of(&self, node_id: NodeId) -> Vec<NodeId>;
1909 /// See [`Core::is_terminal`].
1910 fn is_terminal(&self, node_id: NodeId) -> Option<TerminalKind>;
1911 /// See [`Core::is_dirty`].
1912 fn is_dirty(&self, node_id: NodeId) -> bool;
1913
1914 /// Serialize a node's cached `HandleId` via the binding (S2b/β
1915 /// D244). Delegates to `binding_ptr().serialize_handle` — needed so
1916 /// an in-wave `MailboxOp::Defer` closure (storage snapshot-on-
1917 /// observe) can run `graphrefly_graph` snapshot through `&dyn
1918 /// CoreFull`. Pure binding-delegating read; no `C`/`T` surfaced.
1919 fn serialize_handle(&self, handle: HandleId) -> Option<serde_json::Value>;
1920
1921 /// The wave-drained [`CoreMailbox`] (D246 rule 5: the one facade is
1922 /// mutation + inspection + serialize + **mailbox**). Lets a holder
1923 /// of `&dyn CoreFull` post deferred ops without naming the cell
1924 /// type — folds D245's per-binding "how do I reach the mailbox".
1925 fn mailbox(&self) -> Arc<crate::mailbox::CoreMailbox>;
1926
1927 /// The owner-side [`crate::mailbox::DeferQueue`] (D249/S2c — the
1928 /// `!Send` `Defer` split off `CoreMailbox`). Lets a holder of
1929 /// `&dyn CoreFull` post owner-side deferred closures (`ProducerCtx`
1930 /// build path) without naming the cell type.
1931 fn defer_queue(&self) -> std::rc::Rc<crate::mailbox::DeferQueue>;
1932
1933 // --- producer-build accessors (D246 r5 / D245) ---
1934 //
1935 // The producer-build path runs owner-side with this one facade
1936 // (D245 Option A: `BindingBoundary::invoke_fn_with_core` hands the
1937 // binding `&dyn CoreFull`, from which it builds its `ProducerCtx`).
1938 // A build closure + its spawned sinks need the binding `Arc` (for
1939 // `retain_handle`/`release_handle` around payload shares) and the
1940 // build-side deferred-emit helpers — these mirror `Core`'s existing
1941 // same-named methods, so `ProducerCtx::core()` over `&dyn CoreFull`
1942 // keeps every producer factory compiling unchanged. Pure
1943 // delegation; no `C`/`T` surfaced.
1944
1945 /// See [`Core::binding`].
1946 fn binding(&self) -> Arc<dyn crate::boundary::BindingBoundary>;
1947 /// See [`Core::emit_or_defer`].
1948 fn emit_or_defer(&self, node_id: NodeId, new_handle: HandleId);
1949 /// See [`Core::complete_or_defer`].
1950 fn complete_or_defer(&self, node_id: NodeId);
1951 /// See [`Core::error_or_defer`].
1952 fn error_or_defer(&self, node_id: NodeId, error_handle: HandleId);
1953}
1954
// `CoreFull` implementation for the concrete dispatcher. Almost every
// method is a one-line forward to the inherent `Core` method of the
// same name, written in `Core::name(self, ..)` form. The exceptions are
// `serialize_handle` (forwards to the binding obtained from
// `binding_ptr()`) and the three `*_or_defer` methods at the bottom,
// which are written out over the `try_*` primitives.
impl CoreFull for Core {
    // --- registration / subscription ---
    #[inline]
    fn register_state(&self, initial: HandleId, partial: bool) -> Result<NodeId, RegisterError> {
        Core::register_state(self, initial, partial)
    }
    #[inline]
    fn register_producer(&self, fn_id: FnId) -> Result<NodeId, RegisterError> {
        Core::register_producer(self, fn_id)
    }
    #[inline]
    fn subscribe(&self, node_id: NodeId, sink: Sink) -> SubscriptionId {
        Core::subscribe(self, node_id, sink)
    }
    #[inline]
    fn try_subscribe(&self, node_id: NodeId, sink: Sink) -> Result<SubscriptionId, SubscribeError> {
        Core::try_subscribe(self, node_id, sink)
    }
    #[inline]
    fn unsubscribe(&self, node_id: NodeId, sub_id: SubscriptionId) {
        Core::unsubscribe(self, node_id, sub_id);
    }
    // --- mutation (return values of the inherent calls, if any, are
    // discarded by the trailing `;`) ---
    #[inline]
    fn emit(&self, node_id: NodeId, handle: HandleId) {
        Core::emit(self, node_id, handle);
    }
    #[inline]
    fn complete(&self, node_id: NodeId) {
        Core::complete(self, node_id);
    }
    #[inline]
    fn error(&self, node_id: NodeId, handle: HandleId) {
        Core::error(self, node_id, handle);
    }
    #[inline]
    fn teardown(&self, node_id: NodeId) {
        Core::teardown(self, node_id);
    }
    #[inline]
    fn invalidate(&self, node_id: NodeId) {
        Core::invalidate(self, node_id);
    }
    // --- inspection ---
    #[inline]
    fn cache_of(&self, node_id: NodeId) -> HandleId {
        Core::cache_of(self, node_id)
    }
    #[inline]
    fn has_fired_once(&self, node_id: NodeId) -> bool {
        Core::has_fired_once(self, node_id)
    }
    #[inline]
    fn kind_of(&self, node_id: NodeId) -> Option<NodeKind> {
        Core::kind_of(self, node_id)
    }
    #[inline]
    fn deps_of(&self, node_id: NodeId) -> Vec<NodeId> {
        Core::deps_of(self, node_id)
    }
    #[inline]
    fn is_terminal(&self, node_id: NodeId) -> Option<TerminalKind> {
        Core::is_terminal(self, node_id)
    }
    #[inline]
    fn is_dirty(&self, node_id: NodeId) -> bool {
        Core::is_dirty(self, node_id)
    }
    // Serialization is answered by the binding, not by `Core` itself.
    #[inline]
    fn serialize_handle(&self, handle: HandleId) -> Option<serde_json::Value> {
        self.binding_ptr().serialize_handle(handle)
    }
    // --- shared-handle accessors (each returns a fresh clone of the
    // corresponding `Arc` / `Rc` held by `Core`) ---
    #[inline]
    fn mailbox(&self) -> Arc<crate::mailbox::CoreMailbox> {
        Core::mailbox(self)
    }
    #[inline]
    fn defer_queue(&self) -> std::rc::Rc<crate::mailbox::DeferQueue> {
        Core::defer_queue(self)
    }
    #[inline]
    fn binding(&self) -> Arc<dyn crate::boundary::BindingBoundary> {
        Core::binding(self)
    }
    // Inlined over the `try_*` + `push_deferred_producer_op`
    // primitives (behaviour-identical to the public
    // `Core::{emit,complete,error}_or_defer` wrappers).
    //
    // Shape shared by all three: attempt the immediate `try_*` call;
    // only if it reports an error, hand the op to
    // `push_deferred_producer_op`. For the handle-carrying variants the
    // handle is retained first — `push_deferred_producer_op` releases
    // it again after firing, so the retain/release pair balances.
    #[inline]
    fn emit_or_defer(&self, node_id: NodeId, new_handle: HandleId) {
        if self.try_emit(node_id, new_handle).is_err() {
            self.binding.retain_handle(new_handle);
            self.push_deferred_producer_op(DeferredProducerOp::Emit {
                node_id,
                handle: new_handle,
            });
        }
    }
    // No handle travels with COMPLETE, so there is nothing to retain.
    #[inline]
    fn complete_or_defer(&self, node_id: NodeId) {
        if self.try_complete(node_id).is_err() {
            self.push_deferred_producer_op(DeferredProducerOp::Complete { node_id });
        }
    }
    #[inline]
    fn error_or_defer(&self, node_id: NodeId, error_handle: HandleId) {
        if self.try_error(node_id, error_handle).is_err() {
            self.binding.retain_handle(error_handle);
            self.push_deferred_producer_op(DeferredProducerOp::Error {
                node_id,
                handle: error_handle,
            });
        }
    }
}
2066
2067/// RAII guard that owns an [`OperatorScratch`] until either (a) the
2068/// caller `take()`s it for installation, or (b) the guard drops on an
2069/// early return / unwind, in which case the scratch's handle retains
2070/// are released via [`OperatorScratch::release_handles`].
2071///
2072/// Slice H /qa F1 + F2 (2026-05-07): closes two related correctness
2073/// gaps in `Core::register`:
2074///
2075/// 1. **TOCTOU window** — the original three-phase split called
2076/// `lock_state()` twice (once for validation, once for insertion),
2077/// so a concurrent `Core::complete(dep)` on a non-resubscribable
2078/// dep could slip in between the two acquisitions and re-create
2079/// the wedge `RegisterError::TerminalDep` was designed to prevent.
2080/// The guard plus a single locked region for both phases closes
2081/// this gap (release runs lock-released because guard variables
2082/// drop in reverse declaration order — guard declared BEFORE
2083/// `lock_state()`, so the lock guard drops first).
2084///
2085/// 2. **Panic-unsafe scratch leak** — without an RAII drop, a panic
2086/// between `make_op_scratch` (Phase 2) and the explicit
2087/// `if let Err(e)` cleanup branch (e.g., `lock_state()` reentrance
2088/// assert, OOM-as-panic on Vec growth in dep iteration) would
2089/// drop the `Box<dyn OperatorScratch>` without releasing the
2090/// seed/default retain. The guard's `Drop` impl releases on any
2091/// unwind path.
2092///
2093/// Lock-discipline: the guard holds `&dyn BindingBoundary` (through
2094/// the `Arc<dyn BindingBoundary>` it borrows from). On `Drop`, it
2095/// invokes `release_handles` lock-released — fires AFTER any
2096/// `MutexGuard<CoreState>` declared later in the same scope drops
2097/// (LIFO destruction order). Mirrors `Core::resume` Phase 2 release
2098/// pattern.
2099struct ScratchReleaseGuard<'a> {
2100 scratch: Option<Box<dyn crate::op_state::OperatorScratch>>,
2101 binding: &'a dyn BindingBoundary,
2102}
2103
2104impl<'a> ScratchReleaseGuard<'a> {
2105 fn new(
2106 scratch: Option<Box<dyn crate::op_state::OperatorScratch>>,
2107 binding: &'a dyn BindingBoundary,
2108 ) -> Self {
2109 Self { scratch, binding }
2110 }
2111
2112 /// Take ownership of the scratch — disarms the release-on-drop
2113 /// behavior. Used on the success path to install the scratch on
2114 /// `NodeRecord.op_scratch`.
2115 fn take(mut self) -> Option<Box<dyn crate::op_state::OperatorScratch>> {
2116 self.scratch.take()
2117 }
2118}
2119
2120impl Drop for ScratchReleaseGuard<'_> {
2121 fn drop(&mut self) {
2122 if let Some(mut scratch) = self.scratch.take() {
2123 scratch.release_handles(self.binding);
2124 }
2125 }
2126}
2127
2128/// Combined RAII guard returned by [`Core::lock_state`]. Holds the
2129/// [`CoreState`] borrow + the [`CoreShared`] borrow **together**.
2130/// `DerefMut`s to [`CoreState`] (so `s.nodes` / `s.children` /
2131/// `s.binding` work) **and** exposes an inherent `shared` field (so
2132/// `s.shared.<f>` works — inherent-field access is resolved before
2133/// `Deref`). D246/S2c: single-owner ⇒ the two regions are plain
2134/// `RefCell`s on [`Core`] (the `parking_lot`/`StateCell`-generic
2135/// sharded split was shared-Core-era legacy). A re-entrant
2136/// double-borrow panics loudly — a dispatcher bug (a missing
2137/// lock-released bracket around a binding callback), not a user error.
2138pub(crate) struct St<'a> {
2139 state: std::cell::RefMut<'a, CoreState>,
2140 pub(crate) shared: std::cell::RefMut<'a, CoreShared>,
2141}
2142
2143impl<'a> St<'a> {
2144 /// Borrow both Core regions together. Used by [`Core::lock_state`]
2145 /// and the owner-side drop paths (which hold `&Core`).
2146 #[inline]
2147 pub(crate) fn new(core: &'a Core) -> Self {
2148 let state = core.state.borrow_mut();
2149 let shared = core.shared.borrow_mut();
2150 Self { state, shared }
2151 }
2152
2153 /// Allocate a fresh [`NodeId`] (the counter lives in the separate
2154 /// `CoreShared` region — `St` is the only place holding both).
2155 pub(crate) fn alloc_node_id(&mut self) -> NodeId {
2156 let id = NodeId::new(self.shared.next_node_id);
2157 self.shared.next_node_id += 1;
2158 id
2159 }
2160
2161 /// Allocate a fresh [`SubscriptionId`].
2162 ///
2163 /// **Invariant (QA F4, 2026-05-18): monotonic, never recycled for
2164 /// the `Core`'s lifetime.** A strictly-incrementing counter, never
2165 /// decremented or reset. This is what makes the owner-driven
2166 /// `unsubscribe` model sound: a `(NodeId, SubscriptionId)` pair
2167 /// recorded by `producer_deactivate` / `SubGuard` and unsubscribed
2168 /// later (after the slot may have been removed by a re-entrant
2169 /// cascade) can NEVER deregister a *different* subscription —
2170 /// `unsubscribe_sink` on a since-departed `sub_id` is a documented
2171 /// no-op, not a wrong-target removal.
2172 pub(crate) fn alloc_sub_id(&mut self) -> SubscriptionId {
2173 let id = SubscriptionId(self.shared.next_subscription_id);
2174 self.shared.next_subscription_id += 1;
2175 id
2176 }
2177}
2178
2179impl std::ops::Deref for St<'_> {
2180 type Target = CoreState;
2181 #[inline]
2182 fn deref(&self) -> &CoreState {
2183 &self.state
2184 }
2185}
2186
2187impl std::ops::DerefMut for St<'_> {
2188 #[inline]
2189 fn deref_mut(&mut self) -> &mut CoreState {
2190 &mut self.state
2191 }
2192}
2193
2194impl Core {
2195 /// Construct a fresh Core wired to the given binding. Pause buffer
2196 /// cap defaults to unbounded; set via [`Self::set_pause_buffer_cap`].
2197 #[must_use]
2198 pub fn new(binding: Arc<dyn BindingBoundary>) -> Self {
2199 Self {
2200 // D246/S2c: single-owner ⇒ plain `RefCell` regions (no
2201 // `StateCell` generic, no shard map). D248 relaxed the
2202 // substrate `Sink` off `Send+Sync` ⇒ `Core` is `!Send +
2203 // !Sync` — the actor-model shape (D221/D223/D248); one
2204 // owner thread, cross-`Core` parallelism via independent
2205 // per-worker Cores.
2206 shared: std::cell::RefCell::new(CoreShared {
2207 next_node_id: 1,
2208 next_subscription_id: 1,
2209 // A4 (Slice F, 2026-05-07): start `next_lock_id` in the
2210 // high half of the u32 range so `alloc_lock_id` can't
2211 // collide with user-supplied `LockId::new(N)`
2212 // constructors (which the napi-rs binding marshals from
2213 // `u32` and tests typically use in the low range,
2214 // 1..1024). Phase E /qa F1 (2026-05-08): lowered from
2215 // `1u64 << 32` to `1u64 << 31` so the value round-trips
2216 // through `u32::try_from(...)` at the napi boundary —
2217 // the previous seed errored every napi `alloc_lock_id`
2218 // call. Anti-collision intent (high range vs low user
2219 // range) preserved at half the prior ceiling (2^31 ≈ 2
2220 // billion allocations per Core, ample for parity
2221 // tests). Lift the floor when the deferred
2222 // BigInt-narrowing migration extends `LockId` to `u64`
2223 // at the FFI layer (porting-deferred "BigInt migration
2224 // for u32-narrowed napi types" entry).
2225 next_lock_id: 1u64 << 31,
2226 // `currently_firing` is the P13 set_deps reentrancy
2227 // check stack (/qa F2). Single-owner ⇒ one thread,
2228 // but kept here (Core-global) as the canonical
2229 // location.
2230 currently_firing: Vec::new(),
2231 pause_buffer_cap: None,
2232 max_batch_drain_iterations: 10_000,
2233 topology_sinks: HashMap::new(),
2234 next_topology_id: 1,
2235 pending_scratch_release: Vec::new(),
2236 binding: binding.clone(),
2237 }),
2238 state: std::cell::RefCell::new(CoreState {
2239 nodes: HashMap::new(),
2240 children: HashMap::new(),
2241 binding: binding.clone(),
2242 }),
2243 mailbox: Arc::new(crate::mailbox::CoreMailbox::new()),
2244 deferred: std::rc::Rc::new(crate::mailbox::DeferQueue::new()),
2245 binding,
2246 generation: CORE_GENERATION.fetch_add(1, Ordering::Relaxed),
2247 }
2248 }
2249
2250 /// Acquire the **combined** state guard: [`CoreState`] +
2251 /// [`CoreShared`] borrowed together. Returns [`St`] — `DerefMut`s
2252 /// to `CoreState` and exposes a `.shared` field, so
2253 /// `let mut s = self.lock_state(); … s.nodes … s.shared.x …` works.
2254 ///
2255 /// D246/S2c: single-owner ⇒ plain `RefCell` borrows (no lock).
2256 /// Binding callbacks fire LOCK-RELEASED (the guard is dropped
2257 /// before `invoke_fn`), so a re-entrant `lock_state()` while a
2258 /// guard is held is a dispatcher bug — it panics loudly via
2259 /// `RefCell`'s double-borrow check, the same observable contract
2260 /// the prior `parking_lot` re-entrant-deadlock path had.
2261 pub(crate) fn lock_state(&self) -> St<'_> {
2262 St::new(self)
2263 }
2264
2265 /// Run `f` with mutable access to the [`CoreShared`] region ONLY
2266 /// (id counters, topology-sink registry, `currently_firing`, the
2267 /// two caps, the scratch-release queue). Pure-shared
2268 /// ops (id alloc, topology) take only this borrow.
2269 pub(crate) fn with_shared<R>(&self, f: impl FnOnce(&mut CoreShared) -> R) -> R {
2270 f(&mut self.shared.borrow_mut())
2271 }
2272
2273 /// Run `f` with mutable access to the [`CoreState`] region ONLY
2274 /// (`nodes` / `children` / `binding`). The state-access seam —
2275 /// kept as a single method so a future M6 owner-thread overlay is
2276 /// a re-impl of one method, not a re-architecture of the ~104 call
2277 /// sites. D246/S2c: single shard always (single-owner), plain
2278 /// `RefCell` borrow.
2279 #[allow(dead_code)]
2280 pub(crate) fn with_shard<R>(&self, f: impl FnOnce(&mut crate::node::CoreState) -> R) -> R {
2281 f(&mut self.state.borrow_mut())
2282 }
2283
2284 /// Whether `self` and `other` are the same dispatcher instance.
2285 ///
2286 /// S2b (D223): `Core` is owned by value (no `Arc<C>`, no `Clone`), so
2287 /// identity is the unique per-`Core` [`Self::generation`] counter
2288 /// (assigned once from [`CORE_GENERATION`] at construction), not
2289 /// `Arc::ptr_eq`. Two independently `Core::new`-constructed instances
2290 /// have distinct generations even with the same binding; there is no
2291 /// longer any way to produce two handles to one `Core` (no `Clone`),
2292 /// so this is `true` only for genuine self-vs-self comparison.
2293 ///
2294 /// Used by `graphrefly-graph`'s `mount` to enforce the single-Core
2295 /// v1 invariant — cross-Core mount is post-M6.
2296 #[must_use]
2297 pub fn same_dispatcher(&self, other: &Core) -> bool {
2298 self.generation == other.generation
2299 }
2300
/// Owner-side mailbox drain (D227/D230). Drains the
/// [`crate::mailbox::CoreMailbox`] (queued `Emit` / `Complete` /
/// `Error` / `Defer` ops) and the owner-side
/// [`crate::mailbox::DeferQueue`], applying each op synchronously
/// through [`Self::emit`] / [`Self::complete`] / [`Self::error`] or by
/// calling the deferred closure with `self` — so autonomous timer tasks
/// (which can no longer hold `&Core`/`Weak<C>` under D223) get their
/// fires delivered with **no async in Core**.
///
/// Called from the embedder's existing advance/pump site (test
/// harness `TestRuntime` advance helper, napi pump). Timer tasks
/// already require the host runtime to be advanced before they fire,
/// so draining here is behaviour-identical to the deleted autonomous
/// `WeakCore::upgrade → core.emit` path. Idempotent on an empty
/// mailbox. Re-entrant-safe: `emit` may cascade and a concurrent
/// timer task may post again — the next drain picks it up (the
/// `runnable` bit is re-set on every post).
///
/// # Panics
///
/// Panics if the id-mailbox / defer-queue keep mutually re-posting
/// past the configured `max_batch_drain_iterations` cap — a livelock
/// signal that a `Defer`/`Emit` pair is consuming its own output.
pub fn drain_mailbox(&self) {
    // Clone the Arc out so the drain closure can call `&self.*`
    // without aliasing `self.mailbox` through `&self`.
    let mailbox = Arc::clone(&self.mailbox);
    // Same for the owner-side defer queue: take an owned `Rc` so the
    // drain closure below can pass `self` to each deferred closure.
    let deferred = std::rc::Rc::clone(&self.deferred);
    // QA P3: bound the inner drain by the configured cap (the
    // self-reposting-Defer livelock guard lives here, NOT in
    // `drain_and_flush`'s fire-cascade counter).
    //
    // QA F10 (2026-05-19): note that `max_ops` here is **shared
    // across three bounds** — each `mailbox.drain_into` inner
    // cap, each `deferred.drain_into` inner cap, AND the outer
    // mutual-quiescence round counter below. A workload requiring
    // legitimately deep cross-queue chaining (defer→emit→defer→…)
    // must tune `Core::set_max_batch_drain_iterations` up to
    // cover ALL THREE; counting against each independently is
    // intentional (the bound's purpose is "any single drain
    // dimension's livelock fires loudly"), not a knob granularity
    // bug.
    let max_ops = self.with_shared(|sh| sh.max_batch_drain_iterations);
    // D249/S2c: the owner-side `Defer` queue was split off the
    // `Send` id-`CoreMailbox` (so `!Send` `Defer` closures can
    // capture relaxed `Sink`s / `Rc<RefCell<GraphInner>>`). They
    // are now two physical queues, but a closure in either can
    // re-post to the OTHER (the canonical case: a `DeferQueue`
    // inner-subscribe → push-on-subscribe → an `Emit` on the
    // id-`CoreMailbox`). Pre-D249 this was ONE FIFO drained to
    // quiescence; restore that contract by draining BOTH to
    // **mutual quiescence** — id-mailbox first, then deferred, loop
    // until neither is runnable. Bounded by `max_ops` rounds.
    //
    // M1 contract (QA, 2026-05-19) — **CROSS-QUEUE ORDER = QUEUE
    // PRIORITY, NOT ARRIVAL ORDER.** Within each queue intra-FIFO
    // order is preserved (D234 cancel-before-resubscribe holds —
    // both are `DeferQueue` posts). ACROSS queues the order is
    // structurally `CoreMailbox` ops first, then `DeferQueue` ops,
    // every round. Pre-D249 a sink posting `[defer, emit]` to ONE
    // queue saw `defer` applied first; post-D249 a sink posting
    // `[defer (DeferQueue), emit (CoreMailbox)]` sees `emit`
    // applied first (mailbox-priority). Operators that need a
    // specific cross-queue ordering MUST capture the routing state
    // inside the *later* op's closure (D234 pattern: read shared
    // state at apply time, not at post time). Locked by the
    // `cross_queue_order_mailbox_then_deferred` regression in
    // `tests/lock_discipline.rs`.
    let mut rounds = 0u32;
    loop {
        mailbox.drain_into(max_ops, |op| match op {
            crate::mailbox::MailboxOp::Emit(node_id, handle) => self.emit(node_id, handle),
            crate::mailbox::MailboxOp::Complete(node_id) => self.complete(node_id),
            crate::mailbox::MailboxOp::Error(node_id, handle) => self.error(node_id, handle),
            // D233/D249: `Send` cross-thread timer defer applied
            // in-wave (we hold `&Core`).
            crate::mailbox::MailboxOp::Defer(f) => f(self),
        });
        // Each closure runs with the full object-safe Core surface
        // (we hold `&Core`); its topology mutation + result
        // consumption runs here.
        deferred.drain_into(max_ops, |f| f(self));
        // Mutual quiescence: stop only once neither queue has been
        // re-armed by the ops just applied.
        if !mailbox.is_runnable() && !deferred.is_runnable() {
            break;
        }
        // Another round is needed; fail loudly once the round count
        // reaches the cap instead of spinning forever.
        rounds += 1;
        assert!(
            rounds < max_ops,
            "drain_mailbox: id-mailbox / defer-queue mutual re-post \
             livelock (> {max_ops} rounds) — a Defer/Emit pair is \
             re-posting across the two queues every round. Tune via \
             Core::set_max_batch_drain_iterations only with concrete \
             evidence the workload needs more."
        );
    }
}
2397
2398 /// Owner-side enqueue of a deferred closure (D233/D249). Runs at
2399 /// the next [`Self::drain_mailbox`] / in-wave `BatchGuard` drain
2400 /// with the object-safe full-`Core` surface. Returns `false` iff
2401 /// the owning `Core` is gone (closure dropped unrun). The
2402 /// owner-only `Rc<DeferQueue>` (also reachable via
2403 /// [`Self::defer_queue`] for capture into `!Send`
2404 /// `Sink`/`TopologySink` closures that cannot hold `&Core`).
2405 pub fn post_defer(&self, f: crate::mailbox::DeferFn) -> bool {
2406 self.deferred.post(f)
2407 }
2408
2409 /// Shared handle to this `Core`'s owner-side defer queue (D249).
2410 /// `graphrefly-operators`' `ProducerEmitter` + reactive
2411 /// describe/observe topo sinks hold this `Rc` to enqueue `Defer`
2412 /// work from inside a `!Send` sink closure (which carries no
2413 /// `&Core`). Owner-thread-only; never sent across threads.
2414 #[must_use]
2415 pub fn defer_queue(&self) -> std::rc::Rc<crate::mailbox::DeferQueue> {
2416 std::rc::Rc::clone(&self.deferred)
2417 }
2418
2419 /// Shared handle to this `Core`'s [`crate::mailbox::CoreMailbox`].
2420 /// Handed to autonomous async producers (timer tasks via
2421 /// [`crate::timer::spawn_timer_task`]) so they can post `Emit`
2422 /// requests without holding `&Core`/`Weak<C>` (D223/D227/D230).
2423 #[must_use]
2424 pub fn mailbox(&self) -> Arc<crate::mailbox::CoreMailbox> {
2425 Arc::clone(&self.mailbox)
2426 }
2427
2428 /// Shared handle to this `Core`'s binding. S2b/D231/A′: producer
2429 /// build closures obtain the binding here (via `ctx.core()`) for
2430 /// their spawned sinks' `retain_handle`/`pack_tuple`/`release_handle`
2431 /// — instead of the deleted `Weak<dyn ProducerBinding>` cycle-break.
2432 /// No cycle: the registry-stored build closure captures nothing
2433 /// strong; the `Arc<dyn BindingBoundary>` a sink clones lives in
2434 /// `Core`'s subscriber map and drops on unsubscribe.
2435 #[must_use]
2436 pub fn binding(&self) -> Arc<dyn BindingBoundary> {
2437 Arc::clone(&self.binding)
2438 }
2439
2440 /// Execute a producer-pattern op. §7: the union-find ascending-order
2441 /// constraint that motivated *deferring* these is deleted (groups
2442 /// are static identity only — S2c removed the group-lock machinery,
2443 /// so there is no `PartitionOrderViolation` to dodge). The op runs
2444 /// **immediately** — re-entrant execution on the single owner thread
2445 /// is absorbed by the in-flight wave drain (one-Core-per-OS-thread
2446 /// `IN_TICK_OWNED` slot, D252), exactly as the deferred drain used
2447 /// to run it at wave-end. The `deferred_producer_ops` queue +
2448 /// `drain_deferred_producer_ops` are deleted; this shim is retained
2449 /// so `graphrefly-operators` compiles unchanged (D211 minimal-churn).
2450 ///
2451 /// For `Emit`/`Error` the caller retained the handle before
2452 /// pushing (mirroring the old drain contract); we release it after
2453 /// firing so refcount discipline is unchanged.
2454 pub fn push_deferred_producer_op(&self, op: DeferredProducerOp) {
2455 match op {
2456 DeferredProducerOp::Emit { node_id, handle } => {
2457 self.emit(node_id, handle);
2458 self.binding.release_handle(handle);
2459 }
2460 DeferredProducerOp::Complete { node_id } => {
2461 self.complete(node_id);
2462 }
2463 DeferredProducerOp::Error { node_id, handle } => {
2464 self.error(node_id, handle);
2465 self.binding.release_handle(handle);
2466 }
2467 DeferredProducerOp::Callback(f) => {
2468 f();
2469 }
2470 }
2471 }
2472
/// Intentionally empty compatibility shim.
///
/// §7: the real `drain_deferred_producer_ops` is **deleted** — there is
/// no `deferred_producer_ops` queue any more (the union-find
/// ascending-order constraint that required deferral is gone), and
/// producer ops execute immediately via
/// [`Self::push_deferred_producer_op`]. The former `BatchGuard::drop` /
/// `try_subscribe` drain call sites therefore become no-ops. The
/// function is kept as an empty inline fn so those call sites compile
/// unchanged (D211 minimal-churn); the optimizer elides it.
#[inline]
pub(crate) fn drain_deferred_producer_ops(&self) {}
2482
2483 // D246/S2c: the §7 cross-thread wave-lock acquisition
2484 // (`compute_touched_groups` / `all_groups_sorted` /
2485 // `acquire_touched_group_guards` / `acquire_all_group_guards` /
2486 // `with_global_wave_fallback`) is **deleted** — single-owner ⇒ a
2487 // `Core` is driven by exactly one thread, so there are no
2488 // interleaving waves to serialize. D253 (S5) further deletes the
2489 // declared-group identity surface (`SchedulingGroupId`,
2490 // `node_group`, `partition_of`/`group_of`/`set_scheduling_group`,
2491 // and `NodeOpts.scheduling_group`) — re-introduce in M6 with M6's
2492 // actual scheduling needs in view.
2493}
2494
2495// `walk_undirected_dep_graph` + `component_is_group_consistent`: deleted
2496// by D253 (S5). They existed solely to validate `scheduling_group`'s
2497// dep+children+meta-component invariant at topology-mutation time. With
2498// the declared-group identity surface deleted, the component walk has no
2499// remaining caller — re-introduce alongside M6's scheduling work.
2500
2501impl Core {
2502 /// Internal inspection helper: number of `PendingBatch`es queued
2503 /// for `node` in the current wave. Used by Slice X4 D2 regression
2504 /// tests to pin the "common case = single batch, no SmallVec
2505 /// spill" perf invariant.
2506 ///
2507 /// Returns `None` if no `pending_notify` entry exists for `node`
2508 /// (no tier-1+ message has been queued for this node yet in this
2509 /// wave). `Some(0)` is unreachable by construction (a vacant
2510 /// entry implies no batches; an occupied entry has at least one).
2511 ///
2512 /// `#[doc(hidden)]` + unconditionally `pub` (NOT
2513 /// `cfg(any(test, debug_assertions))`): integration tests are a
2514 /// separate crate, so they get neither `cfg(test)` nor
2515 /// `debug_assertions` on the lib dependency under `--release`.
2516 /// Gating this out of release builds made the entire
2517 /// integration-test suite fail to compile under
2518 /// `cargo nextest run --release`, blocking release-optimized test
2519 /// runs (e.g. the `cascade_depth` stress tests). It is NOT primary
2520 /// public API — `#[doc(hidden)]` keeps it off the generated
2521 /// surface (spec §5.12: protocol-internal inspection is allowed
2522 /// but never a primary API).
2523 #[doc(hidden)]
2524 #[must_use]
2525 pub fn pending_batch_count(&self, node: NodeId) -> Option<usize> {
2526 // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_notify lives
2527 // on per-thread `WaveState`. Test callers run on the same
2528 // thread that ran the wave, so the per-thread placement is
2529 // observable here.
2530 crate::batch::with_wave_state(|ws| {
2531 ws.pending_notify
2532 .get(&node)
2533 .map(|entry| entry.batches.len())
2534 })
2535 }
2536
2537 /// Configure the Core-global cap on pause replay buffer length. When set,
2538 /// any per-node pause buffer that would exceed `cap` drops the oldest
2539 /// message(s) from the front; the dropped count is reported back via the
2540 /// resume callback (see [`ResumeReport`]). `None` (default) means
2541 /// unbounded; messages buffer indefinitely until the lockset clears.
2542 pub fn set_pause_buffer_cap(&self, cap: Option<usize>) {
2543 self.lock_state().shared.pause_buffer_cap = cap;
2544 }
2545
2546 /// Configure the replay buffer cap on `node_id` (R2.6.5 / Lock 6.G —
2547 /// Slice E1, 2026-05-07). `None` disables the buffer. `Some(N)` keeps
2548 /// the last `N` DATA emissions in a circular buffer; late subscribers
2549 /// receive them as part of the per-tier handshake (between START and
2550 /// any terminal). Switching from a larger cap to a smaller cap evicts
2551 /// the front of the buffer to fit; switching to `None` drains the
2552 /// buffer entirely. Each evicted/drained handle's retain is released
2553 /// back to the binding.
2554 ///
2555 /// # Panics
2556 ///
2557 /// Panics if `node_id` is not registered.
2558 pub fn set_replay_buffer_cap(&self, node_id: NodeId, cap: Option<usize>) {
2559 // QA A7 (2026-05-07): normalize `Some(0)` to `None`. Two ways to
2560 // express "disabled" is confusing: `push_replay_buffer` already
2561 // treats `Some(0)` as no-op, so persisting it adds nothing.
2562 let cap = match cap {
2563 Some(0) => None,
2564 other => other,
2565 };
2566 let to_release: Vec<HandleId> = {
2567 let mut s = self.lock_state();
2568 let rec = s.require_node_mut(node_id);
2569 rec.replay_buffer_cap = cap;
2570 match cap {
2571 None => rec.replay_buffer.drain(..).collect(),
2572 Some(c) => {
2573 let mut drained = Vec::new();
2574 while rec.replay_buffer.len() > c {
2575 if let Some(h) = rec.replay_buffer.pop_front() {
2576 drained.push(h);
2577 }
2578 }
2579 drained
2580 }
2581 }
2582 };
2583 for h in to_release {
2584 self.binding.release_handle(h);
2585 }
2586 }
2587
2588 /// Reconfigure the pause mode for `node_id` (canonical §2.6 — Slice F
2589 /// audit close, 2026-05-07). Default for new nodes is
2590 /// [`PausableMode::Default`]; switch to [`PausableMode::ResumeAll`]
2591 /// for nodes whose pause-window emit history must be observable
2592 /// verbatim, or [`PausableMode::Off`] for nodes intrinsically
2593 /// pause-immune.
2594 ///
2595 /// # Errors
2596 ///
2597 /// - [`SetPausableModeError::UnknownNode`] — `node_id` is not
2598 /// registered.
2599 /// - [`SetPausableModeError::WhilePaused`] — the node currently
2600 /// holds at least one pause lock. Changing mode mid-pause would
2601 /// lose buffered content or strand a `pending_wave` flag — resume
2602 /// all locks first.
2603 pub fn set_pausable_mode(
2604 &self,
2605 node_id: NodeId,
2606 mode: PausableMode,
2607 ) -> Result<(), SetPausableModeError> {
2608 let mut s = self.lock_state();
2609 let rec = s
2610 .nodes
2611 .get_mut(&node_id)
2612 .ok_or(SetPausableModeError::UnknownNode(node_id))?;
2613 if rec.pause_state.is_paused() {
2614 return Err(SetPausableModeError::WhilePaused);
2615 }
2616 rec.pausable = mode;
2617 Ok(())
2618 }
2619
2620 /// Configure the wave-drain iteration cap (R4.3 / Lock 2.F′). The wave
2621 /// engine aborts a drain after `cap` iterations with a diagnostic panic.
2622 /// Default is `10_000` — high enough to avoid false positives on legitimate
2623 /// fan-in cascades, low enough to surface runtime cycles within seconds.
2624 ///
2625 /// Lower this only when running adversarial / property-based tests that
2626 /// want fast cycle detection. Raise it only with concrete evidence that a
2627 /// legitimate workload needs more iterations than the default — and even
2628 /// then, prefer to tune the workload (per-subgraph batching, etc.) over
2629 /// raising the cap.
2630 ///
2631 /// # Panics
2632 ///
2633 /// Panics if `cap == 0` — a zero cap would abort every wave on the very
2634 /// first iteration, deadlocking any subsequent dispatcher work.
2635 pub fn set_max_batch_drain_iterations(&self, cap: u32) {
2636 assert!(cap > 0, "max_batch_drain_iterations must be > 0");
2637 self.lock_state().shared.max_batch_drain_iterations = cap;
2638 }
2639
2640 /// Send a message UPSTREAM from `node_id` to each of its declared deps
2641 /// (canonical R1.4.1 — Slice F audit, F2 / 2026-05-07).
2642 ///
2643 /// The dispatcher rejects tier-3 (DATA / RESOLVED) and tier-5
2644 /// (COMPLETE / ERROR) per R1.4.1: value and terminal-lifecycle planes
2645 /// are downstream-only. All other tiers (0 START, 1 DIRTY, 2 PAUSE /
2646 /// RESUME, 4 INVALIDATE, 6 TEARDOWN) pass.
2647 ///
2648 /// # Routing per tier
2649 ///
2650 /// - **Tier 0 ([`Message::Start`]):** no-op. START is a per-subscription
2651 /// handshake, not a routable wire signal — sending it upstream has no
2652 /// well-defined target.
2653 /// - **Tier 1 ([`Message::Dirty`]):** no-op. The dep's "something
2654 /// changed" notification is its own [`Self::emit`] / commit
2655 /// responsibility; ignoring upstream DIRTY hints is safe.
2656 /// - **Tier 2 ([`Message::Pause`] / [`Message::Resume`]):** translates
2657 /// to [`Self::pause`] / [`Self::resume`] on each dep. Lock id is
2658 /// forwarded verbatim. Errors from individual deps are accumulated
2659 /// in the `dep_errors` field of the returned report.
2660 /// - **Tier 4 ([`Message::Invalidate`]):** translates to
2661 /// [`Self::invalidate`] on each dep. Note: canonical R1.4.2
2662 /// distinguishes "downstream INVALIDATE" (cache clear + cascade) from
2663 /// "upstream INVALIDATE" (plain forward, no self-process). The Rust
2664 /// port v1 SIMPLIFICATION delegates to the same `Core::invalidate`
2665 /// path — upstream INVALIDATE here DOES clear dep caches and cascade.
2666 /// If a "plain forward" mode surfaces as a real consumer need, add
2667 /// `up_with_options`.
2668 /// - **Tier 6 ([`Message::Teardown`]):** translates to
2669 /// [`Self::teardown`] on each dep. Cascades per the standard
2670 /// teardown path.
2671 ///
2672 /// # Errors
2673 ///
2674 /// - [`UpError::UnknownNode`] — `node_id` is not registered.
2675 /// - [`UpError::TierForbidden`] — tier 3 or tier 5.
2676 pub fn up(&self, node_id: NodeId, message: Message) -> Result<(), UpError> {
2677 // 2b-ii-B (D220-EXEC): route to the node's shard before the
2678 // pre-wave/cold `lock_state()` (see `try_emit`).
2679 // QA A10 (2026-05-07): check unknown node BEFORE tier rejection
2680 // for consistent error UX — `up(unknown, Data)` and
2681 // `up(unknown, Pause)` both report `UnknownNode` rather than
2682 // splitting on the tier.
2683 let dep_ids: Vec<NodeId> = {
2684 let s = self.lock_state();
2685 let rec = s.nodes.get(&node_id).ok_or(UpError::UnknownNode(node_id))?;
2686 rec.dep_ids_vec()
2687 };
2688 let tier = message.tier();
2689 if tier == 3 || tier == 5 {
2690 return Err(UpError::TierForbidden { tier });
2691 }
2692 for dep_id in dep_ids {
2693 match message {
2694 Message::Pause(lock) => {
2695 let _ = self.pause(dep_id, lock);
2696 }
2697 Message::Resume(lock) => {
2698 let _ = self.resume(dep_id, lock);
2699 }
2700 Message::Invalidate => {
2701 self.invalidate(dep_id);
2702 }
2703 Message::Teardown => {
2704 self.teardown(dep_id);
2705 }
2706 // Tier 0 START + tier 1 DIRTY: no-op upstream per the
2707 // routing table above.
2708 _ => {}
2709 }
2710 }
2711 Ok(())
2712 }
2713
2714 /// Allocate a unique [`LockId`] for use with [`Self::pause`] /
2715 /// [`Self::resume`]. Convenience for callers that don't already have an
2716 /// id-allocation scheme; user-supplied ids work too.
2717 #[must_use]
2718 pub fn alloc_lock_id(&self) -> LockId {
2719 let mut s = self.lock_state();
2720 let id = LockId::new(s.shared.next_lock_id);
2721 s.shared.next_lock_id += 1;
2722 id
2723 }
2724
2725 /// Access the binding boundary for this Core.
2726 ///
2727 /// Used by `graphrefly-graph` for snapshot serialization (M4.E1 / D166):
2728 /// `Graph::snapshot()` calls `binding.serialize_handle(cache)` to
2729 /// project each node's cached value into portable JSON.
2730 #[must_use]
2731 pub fn binding_ptr(&self) -> &Arc<dyn BindingBoundary> {
2732 &self.binding
2733 }
2734
2735 // -------------------------------------------------------------------
2736 // Registration — unified `register()` (D030, Slice D)
2737 //
2738 // All node kinds (State / Producer / Derived / Dynamic / Operator)
2739 // funnel through `Core::register(NodeRegistration) -> NodeId`. Sugar
2740 // wrappers (`register_state` / `register_producer` / `register_derived`
2741 // / `register_dynamic` / `register_operator`) build a `NodeRegistration`
2742 // and delegate. There is no parallel registration path internally.
2743 // -------------------------------------------------------------------
2744
2745 /// Unified node registration (D030).
2746 ///
2747 /// `reg` describes the node's identity (deps + closure-form fn id OR
2748 /// typed-op + per-kind opts). The kind is **derived from the field
2749 /// shape**, not stored — see [`NodeKind`].
2750 ///
2751 /// Sugar wrappers below ([`Self::register_state`],
2752 /// [`Self::register_producer`], [`Self::register_derived`],
2753 /// [`Self::register_dynamic`], [`Self::register_operator`]) build the
2754 /// registration for the common kinds and delegate here. Direct callers
2755 /// that need uncommon combinations (e.g., a partial-true derived) can
2756 /// invoke this method directly.
2757 ///
2758 /// # Errors
2759 ///
2760 /// Errors are returned in evaluation order — earlier phases short-circuit
2761 /// later ones, so a single registration produces at most one variant.
2762 ///
2763 /// **Phase 1 — lock-released, side-effect-free validation:**
2764 /// - [`RegisterError::OperatorWithoutDeps`] — `reg` carries an op but
2765 /// `deps` is empty. Operator nodes need at least one dep — for
2766 /// subscription-managed combinators with no declared deps, use
2767 /// [`Self::register_producer`] instead.
2768 /// - [`RegisterError::InitialOnlyForStateNodes`] — `reg.opts.initial`
2769 /// is non-sentinel for a non-state shape (deps non-empty, or
2770 /// fn_or_op present). State nodes are the only kind with an initial
2771 /// cache.
2772 ///
2773 /// **Phase 2 — operator scratch construction (lock-released):**
2774 /// - [`RegisterError::OperatorSeedSentinel`] — `reg` carries `Op(Scan)`
2775 /// / `Op(Reduce)` with a `NO_HANDLE` seed. R2.5.3 — stateful folders
2776 /// must have a real seed.
2777 ///
2778 /// **Phase 3 — state-lock validation (folded with insertion under a
2779 /// single lock acquisition per /qa F1 to prevent TOCTOU between
2780 /// validation and `nodes.insert`):**
2781 /// - [`RegisterError::UnknownDep`] — any element of `reg.deps` is not
2782 /// a registered node id.
2783 /// - [`RegisterError::TerminalDep`] — a dep is terminal (COMPLETE /
2784 /// ERROR) AND not resubscribable — would create a permanent wedge.
2785 ///
2786 /// All errors are construction-time invariants — the dispatcher
2787 /// rejects the registration before any reactive state is created.
2788 /// On `Err`, no node has been added and any handle retains taken on
2789 /// the way in (operator scratch seed retains via
2790 /// [`BindingBoundary::retain_handle`]) have been released
2791 /// lock-released — see [`ScratchReleaseGuard`] for the RAII
2792 /// discipline that covers both early-return AND unwind paths.
2793 /// `Last { default }` retains its `default` handle on the same
2794 /// release path.
2795 #[allow(clippy::too_many_lines)]
2796 pub fn register(&self, reg: NodeRegistration) -> Result<NodeId, RegisterError> {
2797 let NodeRegistration {
2798 deps,
2799 fn_or_op,
2800 opts,
2801 } = reg;
2802 let NodeOpts {
2803 initial,
2804 equals,
2805 partial,
2806 is_dynamic,
2807 pausable,
2808 replay_buffer,
2809 terminal_as_real_input,
2810 } = opts;
2811
2812 // Derive the field shape from fn_or_op + deps.
2813 let (fn_id, op) = match fn_or_op {
2814 Some(NodeFnOrOp::Fn(f)) => (Some(f), None),
2815 Some(NodeFnOrOp::Op(o)) => (None, Some(o)),
2816 None => (None, None),
2817 };
2818
2819 // Phase 1 — lock-released, side-effect-free validation. Errors
2820 // here return BEFORE any handle retain is taken.
2821 //
2822 // - State (no deps + no fn + no op) is the only kind with `initial`.
2823 // - Dynamic flag only meaningful when fn + non-empty deps.
2824 // - Operator (op present) must have deps (P9: operator without deps
2825 // would skip activation — use a producer instead).
2826 let is_state_shape = deps.is_empty() && fn_id.is_none() && op.is_none();
2827 if op.is_some() && deps.is_empty() {
2828 return Err(RegisterError::OperatorWithoutDeps);
2829 }
2830 if initial != NO_HANDLE && !is_state_shape {
2831 return Err(RegisterError::InitialOnlyForStateNodes);
2832 }
2833
2834 // Phase 2 — build per-operator scratch struct (may take handle
2835 // retains via `binding.retain_handle` for Scan/Reduce/Last seed).
2836 // Lock-released per Slice E (D045) handshake discipline. Returns
2837 // `OperatorSeedSentinel` BEFORE retain so an Err leaves no
2838 // dangling handles.
2839 let scratch = match op {
2840 Some(operator_op) => self.make_op_scratch(operator_op)?,
2841 None => None,
2842 };
2843
2844 // Wrap scratch in an RAII guard immediately after Phase 2. From
2845 // here on, ANY early return / unwind path correctly releases the
2846 // scratch's handle retains via `OperatorScratch::release_handles`
2847 // (Slice H /qa F2 — defense against panics between Phase 2 and
2848 // Phase 3 cleanup branch). Lock-released because the guard is
2849 // declared BEFORE `lock_state()` below — variable destruction
2850 // order is reverse declaration order, so the `MutexGuard` drops
2851 // first on any return path.
2852 let scratch_guard = ScratchReleaseGuard::new(scratch, &*self.binding);
2853
2854 // Phase 3 — state-lock-required validation, FOLDED with insertion
2855 // under a single `lock_state()` acquisition per /qa F1. The
2856 // pre-/qa version split this into two acquisitions (one for
2857 // validation, one for `alloc_node_id` + `nodes.insert`), opening
2858 // a TOCTOU window where a concurrent `Core::complete(dep)` on a
2859 // non-resubscribable dep could slip in and recreate the wedge
2860 // `TerminalDep` was designed to prevent. Single locked region
2861 // closes the gap.
2862 let mut s = self.lock_state();
2863
2864 for &dep in &deps {
2865 if !s.nodes.contains_key(&dep) {
2866 return Err(RegisterError::UnknownDep(dep));
2867 }
2868 }
2869 // Slice F audit (2026-05-07): mirror `set_deps`'s `TerminalDep`
2870 // rejection at registration time. Adding a non-resubscribable
2871 // terminal node as a dep at registration creates a permanent wedge.
2872 for &dep in &deps {
2873 let dep_rec = s.require_node(dep);
2874 if dep_rec.terminal.is_some() && !dep_rec.resubscribable {
2875 return Err(RegisterError::TerminalDep(dep));
2876 }
2877 }
2878
2879 // Validation passed — install. Take scratch out of the guard
2880 // (disarms the release-on-drop) and continue using `s`.
2881 let installed_scratch = scratch_guard.take();
2882
2883 let id = s.alloc_node_id();
2884
2885 // `tracked`: Static derived + Operator track all deps; Dynamic
2886 // starts empty and fills via fn return; State / Producer have no
2887 // deps so tracked is empty.
2888 let tracked: HashSet<usize> = if op.is_some() {
2889 (0..deps.len()).collect()
2890 } else if is_dynamic {
2891 HashSet::new()
2892 } else if fn_id.is_some() && !deps.is_empty() {
2893 // Static derived
2894 (0..deps.len()).collect()
2895 } else {
2896 HashSet::new()
2897 };
2898
2899 let dep_records: Vec<DepRecord> = deps.iter().map(|&d| DepRecord::new(d)).collect();
2900
2901 // §10 perf (D047): compute topo_rank = 1 + max(dep ranks).
2902 let topo_rank = if deps.is_empty() {
2903 0
2904 } else {
2905 deps.iter()
2906 .filter_map(|&d| s.nodes.get(&d).map(|r| r.topo_rank))
2907 .max()
2908 .unwrap_or(0)
2909 .saturating_add(1)
2910 };
2911
2912 let rec = NodeRecord {
2913 dep_records,
2914 fn_id,
2915 op,
2916 is_dynamic,
2917 equals,
2918 cache: initial,
2919 has_fired_once: initial != NO_HANDLE,
2920 subscribers: HashMap::new(),
2921 subscribers_revision: 0,
2922 tracked,
2923 dirty: false,
2924 involved_this_wave: false,
2925 pause_state: PauseState::Active,
2926 pausable,
2927 replay_buffer_cap: replay_buffer,
2928 replay_buffer: VecDeque::new(),
2929 terminal: None,
2930 has_received_teardown: false,
2931 resubscribable: false,
2932 meta_companions: Vec::new(),
2933 partial,
2934 terminal_as_real_input,
2935 topo_rank,
2936 received_mask: 0,
2937 involved_mask: 0,
2938 op_scratch: installed_scratch,
2939 };
2940 s.nodes.insert(id, rec);
2941 s.children.insert(id, HashSet::new());
2942 for &dep in &deps {
2943 s.children.entry(dep).or_default().insert(id);
2944 }
2945 drop(s);
2946 self.fire_topology_event(&crate::topology::TopologyEvent::NodeRegistered(id));
2947 Ok(id)
2948 }
2949
2950 /// Sugar over [`Self::register`] — register a state node. `initial`
2951 /// may be [`NO_HANDLE`] to start sentinel.
2952 ///
2953 /// `partial` is accepted for surface consistency (D019); for state
2954 /// nodes it has no effect (state nodes don't fire fn).
2955 ///
2956 /// # Errors
2957 ///
2958 /// State registration is structurally simple — no deps, no op — so
2959 /// the only reachable variant is none in practice. Returns
2960 /// [`Result`] for surface consistency with [`Self::register`].
2961 pub fn register_state(
2962 &self,
2963 initial: HandleId,
2964 partial: bool,
2965 ) -> Result<NodeId, RegisterError> {
2966 self.register(NodeRegistration {
2967 deps: Vec::new(),
2968 fn_or_op: None,
2969 opts: NodeOpts {
2970 initial,
2971 partial,
2972 ..NodeOpts::default()
2973 },
2974 })
2975 }
2976
2977 /// Sugar over [`Self::register`] — register a producer node (D031,
2978 /// Slice D). No deps; fn fires once on first subscribe; cleanup runs
2979 /// via [`BindingBoundary::producer_deactivate`] when the last
2980 /// subscriber unsubscribes.
2981 ///
2982 /// The fn body uses the binding's `ProducerCtx`-equivalent helper
2983 /// (see `graphrefly-operators::producer`) to subscribe to other Core
2984 /// nodes — the zip / concat / race / takeUntil pattern.
2985 ///
2986 /// # Errors
2987 ///
2988 /// Producer registration has no user-supplied deps, so structurally
2989 /// none of [`RegisterError`]'s variants are reachable. Returns
2990 /// [`Result`] for surface consistency with [`Self::register`].
2991 pub fn register_producer(&self, fn_id: FnId) -> Result<NodeId, RegisterError> {
2992 self.register(NodeRegistration {
2993 deps: Vec::new(),
2994 fn_or_op: Some(NodeFnOrOp::Fn(fn_id)),
2995 opts: NodeOpts {
2996 // Producers have no deps — the first-run gate is degenerate.
2997 partial: true,
2998 ..NodeOpts::default()
2999 },
3000 })
3001 }
3002
3003 /// Sugar over [`Self::register`] — register a derived (static) node.
3004 /// `partial` controls the R2.5.3 first-run gate (D011).
3005 ///
3006 /// # Errors
3007 ///
3008 /// - [`RegisterError::UnknownDep`] — any element of `deps` is not
3009 /// registered.
3010 /// - [`RegisterError::TerminalDep`] — a dep is terminal and not
3011 /// resubscribable.
3012 pub fn register_derived(
3013 &self,
3014 deps: &[NodeId],
3015 fn_id: FnId,
3016 equals: EqualsMode,
3017 partial: bool,
3018 ) -> Result<NodeId, RegisterError> {
3019 self.register(NodeRegistration {
3020 deps: deps.to_vec(),
3021 fn_or_op: Some(NodeFnOrOp::Fn(fn_id)),
3022 opts: NodeOpts {
3023 equals,
3024 partial,
3025 ..NodeOpts::default()
3026 },
3027 })
3028 }
3029
3030 /// Sugar over [`Self::register`] — register a dynamic node (fn
3031 /// declares its actually-tracked dep indices per fire). `partial`
3032 /// controls the R2.5.3 first-run gate (D011).
3033 ///
3034 /// # Errors
3035 ///
3036 /// - [`RegisterError::UnknownDep`] — any element of `deps` is not
3037 /// registered.
3038 /// - [`RegisterError::TerminalDep`] — a dep is terminal and not
3039 /// resubscribable.
3040 pub fn register_dynamic(
3041 &self,
3042 deps: &[NodeId],
3043 fn_id: FnId,
3044 equals: EqualsMode,
3045 partial: bool,
3046 ) -> Result<NodeId, RegisterError> {
3047 self.register(NodeRegistration {
3048 deps: deps.to_vec(),
3049 fn_or_op: Some(NodeFnOrOp::Fn(fn_id)),
3050 opts: NodeOpts {
3051 equals,
3052 partial,
3053 is_dynamic: true,
3054 ..NodeOpts::default()
3055 },
3056 })
3057 }
3058
3059 /// Build a fresh [`OperatorScratch`](crate::op_state::OperatorScratch)
3060 /// box for an operator variant, taking any required handle retains.
3061 /// Shared between `register_operator` (initial install),
3062 /// `reset_for_fresh_lifecycle` (resubscribable terminal-cycle
3063 /// re-install), and Phase G of `Subscription::Drop` (D-α
3064 /// resubscribable + non-terminal deactivate re-install).
3065 ///
3066 /// # Errors
3067 ///
3068 /// Returns [`RegisterError::OperatorSeedSentinel`] if `op` is `Scan`
3069 /// / `Reduce` with a [`NO_HANDLE`] seed (R2.5.3 — stateful folders
3070 /// must have a real seed). Refcount discipline: the seed-sentinel
3071 /// check happens BEFORE [`BindingBoundary::retain_handle`], so an
3072 /// `Err` leaves no handles dangling.
3073 fn make_op_scratch(
3074 &self,
3075 op: OperatorOp,
3076 ) -> Result<Option<Box<dyn crate::op_state::OperatorScratch>>, RegisterError> {
3077 Self::make_op_scratch_with_binding(&*self.binding, op)
3078 }
3079
3080 /// Associated-function variant of [`Self::make_op_scratch`] that
3081 /// takes the binding explicitly. Used by call sites that have a
3082 /// `&dyn BindingBoundary` but not a `&Core` (notably
3083 /// [`Subscription::Drop`]'s Phase G, which holds only a
3084 /// `Weak<Mutex<CoreState>>` and operates on `s.binding`).
3085 pub(crate) fn make_op_scratch_with_binding(
3086 binding: &dyn BindingBoundary,
3087 op: OperatorOp,
3088 ) -> Result<Option<Box<dyn crate::op_state::OperatorScratch>>, RegisterError> {
3089 use crate::op_state::{
3090 DistinctState, LastState, PairwiseState, ReduceState, ScanState, SkipState, TakeState,
3091 TakeWhileState,
3092 };
3093 // Slice H (2026-05-07): Scan/Reduce seed-sentinel checks happen
3094 // BEFORE retain_handle so an Err return leaves no handles dangling.
3095 //
3096 // Slice H /qa F13 (2026-05-07): for retaining variants, allocate
3097 // the `Box<State>` BEFORE calling `binding.retain_handle`. If
3098 // `Box::new` panics (e.g., OOM-as-panic), no retain has happened
3099 // yet — no leak. If `retain_handle` panics after Box succeeds,
3100 // the `Box<State>` is dropped on unwind; State has no handle yet
3101 // (we haven't touched the registry refcount), so still no leak.
3102 // Caller wraps the returned scratch in `ScratchReleaseGuard` to
3103 // cover panics AFTER make_op_scratch returns.
3104 match op {
3105 OperatorOp::Scan { seed, .. } => {
3106 if seed == NO_HANDLE {
3107 return Err(RegisterError::OperatorSeedSentinel);
3108 }
3109 let state = Box::new(ScanState { acc: seed });
3110 binding.retain_handle(seed);
3111 Ok(Some(state))
3112 }
3113 OperatorOp::Reduce { seed, .. } => {
3114 if seed == NO_HANDLE {
3115 return Err(RegisterError::OperatorSeedSentinel);
3116 }
3117 let state = Box::new(ReduceState { acc: seed });
3118 binding.retain_handle(seed);
3119 Ok(Some(state))
3120 }
3121 OperatorOp::DistinctUntilChanged { .. } => Ok(Some(Box::new(DistinctState::default()))),
3122 OperatorOp::Pairwise { .. } => Ok(Some(Box::new(PairwiseState::default()))),
3123 OperatorOp::Take { .. } => Ok(Some(Box::new(TakeState::default()))),
3124 OperatorOp::Skip { .. } => Ok(Some(Box::new(SkipState::default()))),
3125 OperatorOp::TakeWhile { .. } => Ok(Some(Box::new(TakeWhileState))),
3126 OperatorOp::Last { default } => {
3127 let state = Box::new(LastState {
3128 latest: NO_HANDLE,
3129 default,
3130 });
3131 if default != NO_HANDLE {
3132 binding.retain_handle(default);
3133 }
3134 Ok(Some(state))
3135 }
3136 OperatorOp::TapFirst { .. } => {
3137 Ok(Some(Box::new(crate::op_state::TapFirstState::default())))
3138 }
3139 OperatorOp::Settle { .. } => {
3140 Ok(Some(Box::new(crate::op_state::SettleState::default())))
3141 }
3142 OperatorOp::Map { .. }
3143 | OperatorOp::Filter { .. }
3144 | OperatorOp::Combine { .. }
3145 | OperatorOp::WithLatestFrom { .. }
3146 | OperatorOp::Merge
3147 | OperatorOp::Tap { .. }
3148 | OperatorOp::Valve => Ok(None),
3149 }
3150 }
3151
3152 /// Sugar over [`Self::register`] — register a built-in operator node
3153 /// (Slice C-1, D009; D026 generic scratch). The operator dispatch path
3154 /// lives in `fire_operator`; `op` selects which per-operator FFI
3155 /// method on [`BindingBoundary`] gets called per fire.
3156 ///
3157 /// For stateful operators ([`OperatorOp::Scan`] / [`Reduce`] /
3158 /// [`Last`] with a default), the seed/default handle is captured
3159 /// into the appropriate
3160 /// [`OperatorScratch`](crate::op_state::OperatorScratch) struct
3161 /// stored at [`NodeRecord::op_scratch`], and Core takes one retain
3162 /// share via [`BindingBoundary::retain_handle`].
3163 ///
3164 /// # Errors
3165 ///
3166 /// - [`RegisterError::OperatorWithoutDeps`] — `deps` is empty (use
3167 /// [`Self::register_producer`] instead).
3168 /// - [`RegisterError::OperatorSeedSentinel`] — `op` is
3169 /// [`OperatorOp::Scan`] / [`OperatorOp::Reduce`] with a
3170 /// [`NO_HANDLE`] seed.
3171 /// - [`RegisterError::UnknownDep`] — any element of `deps` is not
3172 /// registered.
3173 /// - [`RegisterError::TerminalDep`] — a dep is terminal and not
3174 /// resubscribable.
3175 pub fn register_operator(
3176 &self,
3177 deps: &[NodeId],
3178 op: OperatorOp,
3179 opts: OperatorOpts,
3180 ) -> Result<NodeId, RegisterError> {
3181 self.register(NodeRegistration {
3182 deps: deps.to_vec(),
3183 fn_or_op: Some(NodeFnOrOp::Op(op)),
3184 opts: NodeOpts {
3185 equals: opts.equals,
3186 partial: opts.partial,
3187 ..NodeOpts::default()
3188 },
3189 })
3190 }
3191
3192 // -------------------------------------------------------------------
3193 // Subscription
3194 // -------------------------------------------------------------------
3195
3196 /// Subscribe a sink to a node. Returns a [`Subscription`] handle —
3197 /// dropping the handle unsubscribes the sink. Per §10.12, no manual
3198 /// `unsubscribe(node, id)` call is required.
3199 ///
3200 /// Push-on-subscribe (R1.2.3, R2.2.3 step 4): the sink is registered AFTER
3201 /// the START handshake fires. The handshake contents depend on node
3202 /// state:
3203 /// - Sentinel cache + live (non-terminal): `[START]`
3204 /// - Cached + live: `[START, DATA(handle)]`
3205 ///
3206 /// Subscribe-after-terminal semantics (canonical R2.2.7.a / R2.2.7.b,
3207 /// D118 2026-05-10):
3208 /// - **Resubscribable + terminal** (any TEARDOWN state): the subscribe
3209 /// call first **resets** the node — clears `terminal`,
3210 /// `has_fired_once`, `has_received_teardown`, all `dep_handles` to
3211 /// `NO_HANDLE`, all `dep_terminals` to `None`, drains the pause
3212 /// lockset, clears the replay buffer. The new subscriber receives a
3213 /// fresh `[START]` (cache survives for state nodes per R2.2.8;
3214 /// sentinel for compute). The `wipe_ctx` cleanup hook fires
3215 /// lock-released so binding-side `ctx.store` starts fresh.
3216 /// - **Non-resubscribable + terminal** (any TEARDOWN state): the
3217 /// subscribe is rejected — `try_subscribe` returns
3218 /// [`SubscribeError::TornDown`]; this method (the panic-on-error
3219 /// variant) panics with the diagnostic.
3220 ///
3221 /// Activation (R2.2.3 step 5): if this is the first subscriber and the
3222 /// node is a derived/dynamic compute, recursively activate deps so their
3223 /// cached handles fill our `dep_handles`.
3224 ///
3225 /// # Returns
3226 ///
3227 /// Returns the [`SubscriptionId`]. Pair it with the `node_id` you
3228 /// passed here and call [`Self::unsubscribe`] to deregister (S2b /
3229 /// D225: core-level RAII retired — a binding-layer RAII wrapper over
3230 /// `unsubscribe` provides drop-convenience where the holder co-owns
3231 /// the `Core` on its affinity worker).
3232 ///
3233 /// # Panics
3234 ///
3235 /// Panics if:
3236 /// - Subscribing would violate the Phase H+ ascending partition-order
3237 /// invariant ([`SubscribeError::PartitionOrderViolation`]).
3238 /// - The node is non-resubscribable AND has terminated
3239 /// ([`SubscribeError::TornDown`], R2.2.7.b).
3240 #[allow(clippy::needless_pass_by_value)] // Sink is `Arc<dyn Fn>`; we clone for the subscribers map and call it directly. Taking by value matches the ergonomics callers expect.
3241 pub fn subscribe(&self, node_id: NodeId, sink: Sink) -> SubscriptionId {
3242 match self.try_subscribe(node_id, sink) {
3243 Ok(sub) => sub,
3244 Err(e) => panic!("{e}"),
3245 }
3246 }
3247
3248 /// Synchronous owner-invoked unsubscribe (D225 refined A2). Symmetric
3249 /// with [`Core::subscribe`] (the caller has `&Core`, exactly like
3250 /// [`Core::emit`]). Runs the full deregister + Phase G / lifecycle
3251 /// cleanup chain (OnDeactivation → `producer_deactivate` → `wipe_ctx`
3252 /// → Core cache-clear), behaviour-identical to the legacy
3253 /// `Subscription::Drop`. Idempotent: a `sub_id` already gone (node
3254 /// destroyed / double-unsub) is a silent no-op. D225 S2b retires the
3255 /// core-level RAII `Subscription` in favour of a binding-layer RAII
3256 /// wrapper over this method (the binding holds its `Core` on its
3257 /// affinity worker, so its wrapper's `Drop` calls this synchronously).
3258 ///
3259 /// S2b promotes this (and [`SubscriptionId`]) to `pub` as the
3260 /// binding-facing surface that replaces the retired core-level RAII
3261 /// `Subscription`. Callers (binding-layer RAII wrapper, producer
3262 /// `producer_deactivate` per D229, tests) pair the `node_id` they
3263 /// subscribed with the returned `sub_id`.
3264 pub fn unsubscribe(&self, node_id: NodeId, sub_id: SubscriptionId) {
3265 unsubscribe_sink(self, node_id, sub_id);
3266 }
3267
3268 /// Fallible subscribe. Returns `Err` on:
3269 /// - Partition order violation (Phase H+ STRICT, D115) — caller defers.
3270 /// - Non-resubscribable terminal node (R2.2.7.b, D118) — caller skips.
3271 ///
3272 /// Used by `subscribe` (unwraps both errors as panics) and producer-
3273 /// pattern operator sinks (match on variant).
3274 #[allow(clippy::needless_pass_by_value)]
3275 pub fn try_subscribe(
3276 &self,
3277 node_id: NodeId,
3278 sink: Sink,
3279 ) -> Result<SubscriptionId, SubscribeError> {
3280 // Subscribe protocol (S4/D246/D248 single-owner; supersedes the
3281 // pre-S2c Slice E `wave_owner` ReentrantMutex sequence which was
3282 // deleted with the §7 machinery):
3283 //
3284 // 1. Acquire the state lock briefly: alloc sub_id, run
3285 // resubscribable reset if applicable, snapshot handshake
3286 // state, install sink in `subscribers` + bump
3287 // `subscribers_revision` (Slice X4/D2 freeze). Drop the
3288 // state lock.
3289 // 2. Fire the handshake LOCK-RELEASED. Per-tier slices
3290 // (R1.3.5.a): `[Start]` / `[Data(cache)]?` / `[Complete]?` /
3291 // `[Error(h)]?` / `[Teardown]?`. Empty tiers are skipped. A
3292 // sink callback may re-enter Core via the owner-side
3293 // mailbox/`DeferQueue` seam; the owner drain applies it as
3294 // a nested wave.
3295 // 3. Run activation under `run_wave` if needed (first
3296 // subscriber on a non-state node).
3297 //
3298 // Happens-after discipline (replaces the pre-S2c cross-thread
3299 // `wave_owner` BLOCK): `Core` is single-owner `!Send + !Sync`
3300 // (D248) — the one owner thread installs the sink under the
3301 // state lock BEFORE the handshake fires, and the
3302 // `subscribers_revision` bump (Slice X4/D2) freezes any
3303 // earlier-in-wave `PendingBatch` against double-delivery to
3304 // the new sink. There is no concurrent thread to block.
3305
3306 let (sub_id, tier_slices, needs_activation, did_reset) = {
3307 let mut s = self.lock_state();
3308
3309 // R2.2.7.b (D118, 2026-05-10): non-resubscribable + terminal →
3310 // reject. The stream is permanently over; subscribe gets a
3311 // clean error rather than a confusing replay of past events.
3312 // Operators (zip / concat / race / ...) match on the variant
3313 // and skip the source.
3314 //
3315 // The `has_received_teardown` flag is irrelevant here —
3316 // `terminal.is_some()` alone gates rejection. The auto-TEARDOWN
3317 // cascade in R2.6.4 / Lock 6.F means torn_down lags terminal
3318 // by at most one wave anyway; a brief mid-wave window where
3319 // `terminal.is_some() && !torn_down` is reachable but the
3320 // rejection decision doesn't depend on which side of that
3321 // window we're in.
3322 let should_reject = {
3323 let rec = s.require_node(node_id);
3324 !rec.resubscribable && rec.terminal.is_some()
3325 };
3326 if should_reject {
3327 drop(s);
3328 return Err(SubscribeError::TornDown { node: node_id });
3329 }
3330
3331 let sub_id = s.alloc_sub_id();
3332
3333 // R2.2.7.a (D118, 2026-05-10): resubscribable + terminal → reset
3334 // to fresh lifecycle, regardless of TEARDOWN state. The prior
3335 // `!has_received_teardown` guard (Slice A+B F3) conflated
3336 // "TEARDOWN is the cleanup signal of the previous activation
3337 // cycle" with "permanent destruction" — corrected per the
3338 // canonical-spec amendment. `reset_for_fresh_lifecycle` clears
3339 // `has_received_teardown` along with `terminal`,
3340 // `has_fired_once`, dep_records sentinels, pause lockset, and
3341 // replay buffer. `wipe_ctx` fires lock-released after the
3342 // state lock drops so the binding's `ctx.store` starts fresh.
3343 let needs_reset = {
3344 let rec = s.require_node(node_id);
3345 rec.resubscribable && rec.terminal.is_some()
3346 };
3347 if needs_reset {
3348 self.reset_for_fresh_lifecycle(&mut s, node_id);
3349 }
3350
3351 // Snapshot handshake state under lock.
3352 //
3353 // F5 (/qa 2026-05-10): post-D118 R2.2.7.a/b, the snapshot
3354 // ALWAYS sees a non-terminal node here — `should_reject`
3355 // already rejected non-resubscribable terminal above, and
3356 // `needs_reset` cleared resubscribable terminal back to
3357 // `terminal = None` / `has_received_teardown = false`.
3358 // The pre-D118 terminal-replay + teardown-replay branches
3359 // were dead code in this post-D118 sequence and are
3360 // removed.
3361 let (cache, is_state, first_subscriber) = {
3362 let rec = s.require_node(node_id);
3363 debug_assert!(
3364 rec.terminal.is_none(),
3365 "R2.2.7.a/b invariant: post-reject/reset, terminal must be None"
3366 );
3367 debug_assert!(
3368 !rec.has_received_teardown,
3369 "R2.2.7.a invariant: reset clears has_received_teardown"
3370 );
3371 (rec.cache, rec.is_state(), rec.subscribers.is_empty())
3372 };
3373
3374 // Build per-tier handshake slices. Each non-empty slice is
3375 // fired as a separate sink call (R1.3.5.a tier-split).
3376 let mut tier_slices: SmallVec<[Vec<Message>; 4]> = SmallVec::new();
3377 tier_slices.push(vec![Message::Start]);
3378 if cache != NO_HANDLE {
3379 tier_slices.push(vec![Message::Data(cache)]);
3380 }
3381 // Slice E1 (R2.6.5 / Lock 6.G): replay buffered DATA between
3382 // [Start] (and the cache slice, if present) and any terminal.
3383 // Each buffered handle becomes a separate per-tier slice so
3384 // late subscribers see the historical Data sequence as
3385 // distinct sink calls.
3386 //
3387 // Dedupe: when a cache slice is present and the buffer's last
3388 // entry is the same handle (the typical case — cache always
3389 // tracks the last DATA emitted, and the buffer's tail entry
3390 // is that same DATA), skip the last buffer entry to avoid
3391 // delivering Data(cache) twice. For state nodes whose cache
3392 // survives unsubscribe, the buffer may have older entries
3393 // the cache doesn't reflect; the dedupe only drops the
3394 // single trailing entry that equals cache. (QA A1, 2026-05-07)
3395 let replay_handles: Vec<HandleId> = {
3396 let rec = s.require_node(node_id);
3397 let cap = rec.replay_buffer_cap.unwrap_or(0);
3398 if cap == 0 {
3399 Vec::new()
3400 } else {
3401 let mut v: Vec<HandleId> = rec.replay_buffer.iter().copied().collect();
3402 if cache != NO_HANDLE && v.last() == Some(&cache) {
3403 v.pop();
3404 }
3405 v
3406 }
3407 };
3408 for h in &replay_handles {
3409 tier_slices.push(vec![Message::Data(*h)]);
3410 }
3411
3412 // Install sink BEFORE dropping state lock so any subsequent
3413 // owner-side wave (after our scope ends) sees the sink
3414 // already registered. (Pre-S2c the comment referenced
3415 // acquiring `wave_owner` for cross-thread serialization;
3416 // post-D248 single-owner there is no cross-thread emitter
3417 // and no `wave_owner` lock — the rule is just "install
3418 // before drop so future waves see the sink.")
3419 //
3420 // Slice X4 / D2: bump `subscribers_revision` alongside the
3421 // insert so a pending_notify entry opened earlier in the same
3422 // wave (e.g. inside `batch(|| { emit(s, h1); subscribe(s,
3423 // late); emit(s, h2); })`) starts a fresh `PendingBatch` on
3424 // its next `queue_notify` push — making the new sink visible
3425 // to subsequent emits' flush slices, while the pre-subscribe
3426 // batch's snapshot stays frozen so we don't double-deliver
3427 // earlier emits via the wave's flush AND the new sub's
3428 // handshake replay.
3429 {
3430 let rec = s.require_node_mut(node_id);
3431 rec.subscribers.insert(sub_id, sink.clone());
3432 rec.subscribers_revision = rec.subscribers_revision.wrapping_add(1);
3433 }
3434
3435 let needs_activation = first_subscriber && !is_state;
3436 (sub_id, tier_slices, needs_activation, needs_reset)
3437 // state lock drops here
3438 };
3439
3440 // Slice E2 (R2.4.6 / D055): on resubscribable terminal reset, fire
3441 // `wipe_ctx` LOCK-RELEASED so the binding drops its `NodeCtxState`
3442 // entry (clearing both `store` and any residual `current_cleanup`).
3443 // The new subscriber's first invoke_fn sees a fresh empty store.
3444 // Fires AFTER the state lock drops so the binding's
3445 // `node_ctx.lock()` can't deadlock against Core's state lock — and
3446 // BEFORE the handshake so the wipe is observable before any
3447 // user-visible interaction with the new lifecycle.
3448 if did_reset {
3449 self.binding.wipe_ctx(node_id);
3450 }
3451
3452 // Fire handshake LOCK-RELEASED. A sink may re-enter Core via
3453 // the owner-side mailbox/`DeferQueue` seam; the owner drain
3454 // applies it as a nested wave. Single-owner `!Send` Core: no
3455 // `wave_owner` mutex, no cross-thread emitter (S4/D248).
3456 //
3457 // A7 (Slice F, 2026-05-07): per-tier slice fire is wrapped in
3458 // `catch_unwind`. The sink is installed in `subscribers` BEFORE
3459 // the handshake fires (load-bearing — concurrent threads observe
3460 // the sink immediately). If a sink panics on tier N, the panic
3461 // would otherwise unwind out of `subscribe` BEFORE the
3462 // `Subscription` handle is constructed, leaving the sink
3463 // registered in `subscribers` with no user-held handle to drop.
3464 // Subsequent waves' `flush_notifications` would re-fire the
3465 // panicking sink forever.
3466 //
3467 // On panic: remove the sink from `subscribers` (via the
3468 // already-allocated `sub_id`), drop `_wave_guard` cleanly via
3469 // RAII, and resume the unwind so the user observes the panic at
3470 // the call site. Same effect as the user dropping the
3471 // `Subscription` immediately, but pre-emptive.
3472 for slice in &tier_slices {
3473 let sink_clone = sink.clone();
3474 let slice_ref: &[Message] = slice;
3475 let result = catch_unwind(AssertUnwindSafe(|| sink_clone(slice_ref)));
3476 if let Err(panic_payload) = result {
3477 // Remove the orphaned sink. Best-effort: if the node was
3478 // since torn down (e.g., the sink itself called teardown
3479 // before panicking), the entry may already be gone.
3480 {
3481 let mut s = self.lock_state();
3482 if let Some(rec) = s.nodes.get_mut(&node_id) {
3483 rec.subscribers.remove(&sub_id);
3484 // Slice X4 / D2: keep revision-tracked snapshot
3485 // discipline consistent with the install site —
3486 // any pending_notify entry that already absorbed
3487 // the panicking sink under the post-install
3488 // revision should start a fresh batch on its
3489 // next queue_notify push.
3490 rec.subscribers_revision = rec.subscribers_revision.wrapping_add(1);
3491 }
3492 }
3493 std::panic::resume_unwind(panic_payload);
3494 }
3495 }
3496
3497 // D261 / S7: drain mailbox if the handshake-fire phase posted to
3498 // it. The per-tier handshake slices above fire sinks LOCK-RELEASED
3499 // outside any `BatchGuard`; a sink that posts via
3500 // `MailboxEmitter`/`SinkEmitter` (e.g. `graphrefly_structures::
3501 // reactive::SinkEmitter::emit`'s `mailbox.post_emit`) would
3502 // otherwise leave the post stranded until the next external wave
3503 // entry. Without this drain, push-on-subscribe replay for an
3504 // operator whose internal sink re-emits via the mailbox
3505 // (`reactive_log.view(slice/tail/fromCursor)`'s internal sinks)
3506 // never lands on the operator's output node by subscribe-time, so
3507 // the **next** `core.subscribe(operator_node, …)` sees `cache ==
3508 // NO_HANDLE` and emits an empty `[Data]` slice — same root
3509 // mechanism as D260 (mailbox post stranded past a sink-fire
3510 // boundary), different boundary (handshake-fire vs fire_deferred).
3511 // The `BatchGuard` here is a wave-scoped RAII handle; **its
3512 // `Drop`** (not its construction) runs `drain_and_flush` (applies
3513 // the queued mailbox/DeferQueue ops) + D260's post-`fire_deferred`
3514 // re-drain loop until both queues quiesce. The brief lifetime
3515 // — opened then dropped at the end of the `if` block, before
3516 // activation — is the entire purpose of the binding.
3517 //
3518 // Cheap on the no-post path: one `runnable` atomic load per
3519 // subscribe; on a positive load, one `BatchGuard` open+drop
3520 // round-trip that drains whatever post the handshake produced.
3521 //
3522 // **Nested-subscribe note.** If `try_subscribe` is called from
3523 // inside an already-owning wave (e.g. a Defer closure calling
3524 // `cf.subscribe`), `begin_batch_for` → `claim_in_tick` returns
3525 // `false` (already owned) → the `BatchGuard` is non-owning →
3526 // its `Drop` is a no-op (early-return at the `!owns_tick` guard
3527 // in `BatchGuard::Drop`). The outer wave's drain loop catches
3528 // the post at its next `is_runnable()` top-of-loop check. Cheap
3529 // and correct in both cases.
3530 if self.mailbox.is_runnable() || self.deferred.is_runnable() {
3531 // Use the same single-owner batch entry as `try_emit` — seed
3532 // is the node we just subscribed to (the partition-touch hint
3533 // is vestigial post-S2c per `try_begin_batch_for` doc, but
3534 // the seed is retained for D211 minimal-churn).
3535 let _drain = self.begin_batch_for(node_id);
3536 }
3537
3538 // Run activation if needed. `run_wave_for(node_id)` acquires
3539 // only the partitions transitively touched from `node_id`
3540 // (downstream cascade + meta-companion teardown reach) — same-
3541 // partition activation re-enters reentrantly. Slice Y1 / Phase E.
3542 if needs_activation {
3543 self.run_wave_for(node_id, |this| {
3544 let mut s = this.lock_state();
3545 this.activate_derived(&mut s, node_id);
3546 });
3547 }
3548
3549 // D246/S2c: no group wave-locks to drop (single-owner).
3550
3551 // Drain deferred producer ops now that no partitions are held
3552 // on this thread. The drain is a loop because each deferred op
3553 // may itself produce new deferred ops.
3554 self.drain_deferred_producer_ops();
3555
3556 Ok(sub_id)
3557 }
3558
3559 /// Mark `node_id` as resubscribable per R2.2.7. Resubscribable nodes
3560 /// reset their terminal-lifecycle state on a fresh subscribe — see
3561 /// [`Self::subscribe`].
3562 ///
3563 /// Configuration call — must be made before the node has any active
3564 /// subscribers, since changing the policy mid-flight would surprise
3565 /// existing observers.
3566 ///
3567 /// # Panics
3568 ///
3569 /// Panics if the node has subscribers (the policy is observable
3570 /// behavior; changing it after the fact would change semantics for
3571 /// existing sinks).
3572 pub fn set_resubscribable(&self, node_id: NodeId, resubscribable: bool) {
3573 // 2b-ii-B (D220-EXEC): route to the node's shard (see `try_emit`).
3574 let mut s = self.lock_state();
3575 let rec = s.require_node_mut(node_id);
3576 assert!(
3577 rec.subscribers.is_empty(),
3578 "set_resubscribable: node already has subscribers; \
3579 configure resubscribable before any subscribe call"
3580 );
3581 rec.resubscribable = resubscribable;
3582 }
3583
3584 /// Reset a resubscribable node's terminal-lifecycle state. Called from
3585 /// `subscribe` when a late subscriber arrives at a flagged node.
3586 ///
3587 /// Released: terminal-slot retain (Error handle), all per-dep terminal
3588 /// retains (Error handles), all data_batch retains.
3589 /// Cleared: `terminal`, `has_fired_once`, `has_received_teardown`, all
3590 /// dep_records to sentinel, the pause lockset (any held locks are
3591 /// released — replay buffer drops silently because there are no
3592 /// subscribers to flush to).
3593 fn reset_for_fresh_lifecycle(&self, s: &mut St<'_>, node_id: NodeId) {
3594 // Phase 1: collect wave-state handle releases + take the old
3595 // op_scratch + reset other state. Take all mutations under one
3596 // borrow so the post-borrow phases don't re-walk dep_records.
3597 let (prev_op, mut old_scratch, handles_to_release, pause_buffer_payloads) = {
3598 let rec = s.require_node_mut(node_id);
3599 let mut hs = Vec::new();
3600 if let Some(TerminalKind::Error(h)) = rec.terminal {
3601 hs.push(h);
3602 }
3603 for dr in &rec.dep_records {
3604 if let Some(TerminalKind::Error(h)) = dr.terminal {
3605 hs.push(h);
3606 }
3607 for &h in &dr.data_batch {
3608 hs.push(h);
3609 }
3610 // Slice C-3 /qa: also release `prev_data`. Prior to this
3611 // collection, `reset_for_fresh_lifecycle` overwrote
3612 // `dr.prev_data = NO_HANDLE` without releasing the old
3613 // handle, leaking one share per dep per resubscribable
3614 // cycle. The leak was masked because no test exercised
3615 // the per-dep `prev_data` retain across a lifecycle
3616 // reset; surfaced by the T1 tightening of
3617 // `last_releases_buffered_latest_on_lifecycle_reset`.
3618 if dr.prev_data != NO_HANDLE {
3619 hs.push(dr.prev_data);
3620 }
3621 }
3622 // Take pause_state's buffer; collect its payload handles for
3623 // release (they were retained at queue_notify time; buffer
3624 // drops because the new subscriber starts fresh).
3625 let mut pulled = Vec::new();
3626 if let PauseState::Paused { ref mut buffer, .. } = rec.pause_state {
3627 for msg in buffer.drain(..) {
3628 if let Some(h) = msg.payload_handle() {
3629 pulled.push(h);
3630 }
3631 }
3632 }
3633 // Slice E1: drain the replay buffer too — the new subscriber
3634 // gets a fresh lifecycle and shouldn't see prior emissions.
3635 for h in rec.replay_buffer.drain(..) {
3636 pulled.push(h);
3637 }
3638 // Reset wave / lifecycle state.
3639 rec.terminal = None;
3640 rec.has_fired_once = rec.cache != NO_HANDLE && rec.is_state();
3641 rec.has_received_teardown = false;
3642 for dr in &mut rec.dep_records {
3643 dr.prev_data = NO_HANDLE;
3644 dr.data_batch.clear();
3645 dr.terminal = None;
3646 dr.dirty = false;
3647 dr.involved_this_wave = false;
3648 }
3649 rec.pause_state = PauseState::Active;
3650 rec.involved_this_wave = false;
3651 rec.dirty = false;
3652 // §10.13 perf (D047): reset received_mask — fresh lifecycle
3653 // means all deps are sentinel again.
3654 rec.received_mask = 0;
3655 // §10.3 perf (Slice V1): reset involved_mask.
3656 rec.involved_mask = 0;
3657 // P7 (Slice A close /qa): Dynamic nodes clear `tracked` so
3658 // the post-reset first fire repopulates from the fn's
3659 // returned tracked-deps set.
3660 if rec.is_dynamic {
3661 rec.tracked.clear();
3662 }
3663 // Take the old scratch out so we can release its handles and
3664 // install a fresh one. Operator op is copied for the
3665 // rebuild step below.
3666 let prev_op = rec.op;
3667 let old = std::mem::take(&mut rec.op_scratch);
3668 (prev_op, old, hs, pulled)
3669 };
3670
3671 // Phase 2 (Slice C-3 /qa P1 — RETAIN-BEFORE-RELEASE ordering):
3672 // build the fresh scratch FIRST, taking new retains on any
3673 // seed/default handles. This must run BEFORE Phase 3 releases
3674 // the old scratch's shares — if old `acc` (Scan/Reduce) or old
3675 // `latest` (Last) aliases the new `seed`/`default` (common:
3676 // `fold(seed, x) == seed` interns to the same registry entry),
3677 // releasing the old share first could collapse the binding's
3678 // registry slot to zero (production bindings remove the value
3679 // entry on refcount-zero — see `tests/common/mod.rs:191-204`),
3680 // and a subsequent `retain_handle` on the new seed would bump a
3681 // refcount on a slot whose value has been removed. By taking
3682 // the new retains first, we floor the refcount at ≥1 before
3683 // any release happens.
3684 let new_scratch = match prev_op {
3685 // Slice H: the OperatorOp stored on NodeRecord previously
3686 // passed `make_op_scratch` validation at registration time
3687 // (RegisterError::OperatorSeedSentinel for Scan/Reduce
3688 // sentinel seeds; Last { default: NO_HANDLE } is accepted
3689 // and never errors). Re-running it here on the same op
3690 // value is structurally guaranteed to succeed.
3691 Some(op) => self
3692 .make_op_scratch(op)
3693 .expect("invariant: stored OperatorOp passed make_op_scratch validation at registration time"),
3694 None => None,
3695 };
3696
3697 // Phase 3: NOW release handles owned by the old op_scratch
3698 // (Scan/Reduce acc, Distinct/Pairwise prev, Last latest +
3699 // default). Safe per Phase 2's retain-first floor. The boxed
3700 // value is consumed and dropped after.
3701 if let Some(scratch) = old_scratch.as_mut() {
3702 scratch.release_handles(&*self.binding);
3703 }
3704 drop(old_scratch);
3705
3706 // Phase 3b (D-α, D028 full close, 2026-05-10): drain the
3707 // per-Core `pending_scratch_release` queue populated by Phase G
3708 // on prior resubscribable + non-terminal deactivate cycles.
3709 // Queued boxes' shares may alias the seed/default we just
3710 // retained in Phase 2 (e.g., when scan's `acc` interns to the
3711 // same registry slot as `seed`). Releasing AFTER Phase 2's
3712 // floor keeps the registry slot at refcount ≥1 throughout —
3713 // mirrors the Phase 2/3 retain-before-release ordering. Safe
3714 // even when the queue is empty (no prior deactivations
3715 // happened).
3716 let queued: Vec<Box<dyn crate::op_state::OperatorScratch>> =
3717 std::mem::take(&mut s.shared.pending_scratch_release);
3718 for mut scratch in queued {
3719 scratch.release_handles(&*self.binding);
3720 }
3721
3722 // Phase 4: install the fresh scratch.
3723 {
3724 let rec = s.require_node_mut(node_id);
3725 rec.op_scratch = new_scratch;
3726 }
3727
3728 // Phase 5: release wave-state handles collected in phase 1.
3729 for h in handles_to_release {
3730 self.binding.release_handle(h);
3731 }
3732 for h in pause_buffer_payloads {
3733 self.binding.release_handle(h);
3734 }
3735 }
3736
3737 /// Activate `root` and any transitive uncached compute deps so their
3738 /// caches fill our dep_handles slots.
3739 ///
3740 /// Slice A close (M1): pure dep-walk + dep_handles population +
3741 /// pending_fires queueing. No `in_tick` management or `drain_and_flush`
3742 /// call — the outer caller (typically `Core::subscribe` via
3743 /// [`Core::run_wave`]) owns the wave lifecycle and drains lock-released
3744 /// around `invoke_fn`.
3745 ///
3746 /// Walk shape:
3747 /// 1. **Discover phase (DFS via Vec stack):** starting at `root`,
3748 /// walk transitively-needing-activation deps via the `deps`
3749 /// chain. Build an ordering where each node appears AFTER all
3750 /// of its uncached compute deps — i.e., reverse topological
3751 /// among the visited subgraph.
3752 /// 2. **Deliver phase (forward iteration):** for each visited
3753 /// node in dep-first order, push deps' caches into the node's
3754 /// `dep_handles` slots. Caches that were sentinel pre-walk are
3755 /// filled because their parent's fn fires later in the wave's
3756 /// drain loop and `commit_emission` propagates new caches forward
3757 /// via `deliver_data_to_consumer` — the same path this method
3758 /// uses for the initial seed. Adds the node to `pending_fires`
3759 /// if its tracked-deps gate is satisfied; the wave-engine drain
3760 /// fires the fn lock-released around `invoke_fn`.
3761 pub(crate) fn activate_derived(&self, s: &mut CoreState, root: NodeId) {
3762 // Phase 1: discover. DFS to collect every compute node reachable
3763 // via deps that doesn't yet have a cache and hasn't fired.
3764 // Record them in dep-first (post-order) so phase 2 can deliver
3765 // caches forward. Frame is `(node_id, finalize)` — `finalize=false`
3766 // means "first visit: push deps then re-push self with finalize=true";
3767 // `finalize=true` means "deps have been expanded, append self to
3768 // `order`."
3769 let mut visited: HashSet<NodeId> = HashSet::new();
3770 let mut order: Vec<NodeId> = Vec::new();
3771 let mut stack: Vec<(NodeId, bool)> = vec![(root, false)];
3772 while let Some((id, finalize)) = stack.pop() {
3773 if finalize {
3774 order.push(id);
3775 continue;
3776 }
3777 if !visited.insert(id) {
3778 continue;
3779 }
3780 stack.push((id, true));
3781 let dep_ids: Vec<NodeId> = s.require_node(id).dep_ids_vec();
3782 for dep_id in dep_ids {
3783 let (dep_is_state, dep_cache, dep_has_fired) = {
3784 let dep_rec = s.require_node(dep_id);
3785 (dep_rec.is_state(), dep_rec.cache, dep_rec.has_fired_once)
3786 };
3787 if !dep_is_state
3788 && dep_cache == NO_HANDLE
3789 && !dep_has_fired
3790 && !visited.contains(&dep_id)
3791 {
3792 stack.push((dep_id, false));
3793 }
3794 }
3795 }
3796
3797 // Phase 2: deliver caches in dep-first order. For each node, walk
3798 // its deps and call `deliver_data_to_consumer` for any with caches.
3799 // Producer nodes (no deps + has fn — Slice D, D031) have no deps
3800 // to walk; queue them directly into `pending_fires` so the wave
3801 // engine fires their fn once on activation.
3802 //
3803 // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_fires lives on
3804 // per-thread WaveState. State lock + thread_local borrow are
3805 // independent; deliver_data_to_consumer also writes pending_fires
3806 // via WaveState (no nested with_wave_state borrows here).
3807 for &id in &order {
3808 let (dep_ids, is_producer) = {
3809 let rec = s.require_node(id);
3810 (rec.dep_ids_vec(), rec.is_producer())
3811 };
3812 if is_producer {
3813 crate::batch::with_wave_state(|ws| {
3814 ws.pending_fires.insert(id);
3815 });
3816 continue;
3817 }
3818 for (i, dep_id) in dep_ids.iter().copied().enumerate() {
3819 let dep_cache = s.require_node(dep_id).cache;
3820 if dep_cache != NO_HANDLE {
3821 self.deliver_data_to_consumer(s, id, i, dep_cache);
3822 }
3823 }
3824 }
3825 }
3826
3827 // -------------------------------------------------------------------
3828 // Emission entry point
3829 // -------------------------------------------------------------------
3830
3831 /// Set a state node's value. Triggers a wave (DIRTY → DATA/RESOLVED →
3832 /// fn fires for downstream).
3833 ///
3834 /// Silent no-op if the node has already terminated (R1.3.4). The handle
3835 /// passed in is still released by the caller's binding-side intern path
3836 /// — no implicit retain is consumed when the call short-circuits.
3837 ///
3838 /// # Panics
3839 ///
3840 /// Panics if `node_id` is not a state node, or if `new_handle` is
3841 /// [`NO_HANDLE`] (per R1.2.4, sentinel is not a valid DATA payload).
3842 pub fn emit(&self, node_id: NodeId, new_handle: HandleId) {
3843 match self.try_emit(node_id, new_handle) {
3844 Ok(()) => {}
3845 Err(e) => panic!("{e}"),
3846 }
3847 }
3848
3849 /// Fallible emit. Returns `Err` on partition order violation
3850 /// (Phase H+ STRICT, D115). The public `emit` calls this and
3851 /// unwraps; `emit_or_defer` calls this and defers on Err.
3852 pub(crate) fn try_emit(
3853 &self,
3854 node_id: NodeId,
3855 new_handle: HandleId,
3856 ) -> Result<(), PartitionOrderViolation> {
3857 // Step 2b-ii-B (D220-EXEC): route this whole entry point to the
3858 // node's shard. Single-node public entry points do a pre-wave
3859 // `lock_state().require_node(node)` BEFORE `try_run_wave_for`
3860 // sets the ambient — a grouped node's record is in shard `g`,
3861 // not `DEFAULT_SHARD`, so without this the validation panics
3862 // `unknown node`. Held for the whole fn; the nested
3863 // `try_run_wave_for(node_id)` re-derives the same key
3864 // (no-op restore). `None`/all-`None` ⇒ behaviour-identical.
3865 assert!(
3866 new_handle != NO_HANDLE,
3867 "NO_HANDLE is not a valid DATA payload (R1.2.4)"
3868 );
3869 // Validate + terminal short-circuit under a brief lock.
3870 //
3871 // emit() is valid for State and Producer nodes — both are
3872 // intrinsic sources whose values are not derived from declared
3873 // deps. State nodes get emit() from user code; Producer nodes
3874 // get emit() from sink callbacks the producer's build closure
3875 // registered (sink fires → re-enter Core → emit on self).
3876 // Derived / Dynamic / Operator nodes emit via their fn return
3877 // value through fire_fn / fire_operator, NOT via emit().
3878 //
3879 // §7-A (deferred): this standalone validation lock is redundant
3880 // with commit_emission Phase 1 and collapsing it is part of §7
3881 // item (1); §5b measured ~0% single-thread benefit so the
3882 // collapse is deferred (porting-deferred.md §7-A, flagged for
3883 // ratification). The normal-path validation is retained here.
3884 {
3885 let s = self.lock_state();
3886 let rec = s.require_node(node_id);
3887 assert!(
3888 rec.is_state() || rec.is_producer(),
3889 "emit() is for state or producer nodes only; \
3890 derived/dynamic/operator emit via their fn return value"
3891 );
3892 if rec.terminal.is_some() {
3893 drop(s);
3894 // Caller's intern share would otherwise leak; cache slot
3895 // ownership doesn't transfer because we're not advancing
3896 // cache. Released lock-released so the binding can't
3897 // deadlock against an internal binding mutex.
3898 self.binding.release_handle(new_handle);
3899 return Ok(());
3900 }
3901 }
3902 // Run the wave for `node_id`. Slice Y1 / Phase E: emit cascades
3903 // only via `s.children`. S4/D248: single-owner `!Send` Core —
3904 // one uninterrupted owner-side drain (no `wave_owner` mutex,
3905 // no cross-thread parallel waves within a Core); cross-`Core`
3906 // parallelism is host-native via independent per-worker Cores.
3907 self.try_run_wave_for(node_id, |this| {
3908 this.commit_emission(node_id, new_handle);
3909 })?;
3910 Ok(())
3911 }
3912
3913 /// Emit or defer to wave-end on partition order violation.
3914 /// For producer-pattern operator sinks. Retains `handle` on defer;
3915 /// the drain releases it after firing (or on discard).
3916 pub fn emit_or_defer(&self, node_id: NodeId, new_handle: HandleId) {
3917 if self.try_emit(node_id, new_handle).is_err() {
3918 self.binding.retain_handle(new_handle);
3919 self.push_deferred_producer_op(DeferredProducerOp::Emit {
3920 node_id,
3921 handle: new_handle,
3922 });
3923 }
3924 }
3925
3926 /// Read a node's current cache. Returns [`NO_HANDLE`] if sentinel.
3927 #[must_use]
3928 pub fn cache_of(&self, node_id: NodeId) -> HandleId {
3929 self.lock_state().require_node(node_id).cache
3930 }
3931
3932 /// Whether the node's fn has fired at least once (compute) OR it has had
3933 /// a non-sentinel value (state).
3934 #[must_use]
3935 pub fn has_fired_once(&self, node_id: NodeId) -> bool {
3936 self.lock_state().require_node(node_id).has_fired_once
3937 }
3938
3939 // -------------------------------------------------------------------
3940 // Read-side inspection helpers (Slice E+, M2)
3941 //
3942 // Non-panicking accessors for graph-layer introspection (`describe()`,
3943 // `observe()`, `node_count()`). All five return Option/empty for
3944 // unknown ids — they're meant to back walks over `node_ids()` where
3945 // the caller already knows the ids are valid, plus debugging /
3946 // dry-run probes that prefer "absence" over "panic".
3947 //
3948 // Keep these strictly read-only: no wave entry, no binding callbacks,
3949 // no lock release. Each takes the state lock once, copies a small
3950 // value, and drops the lock.
3951 // -------------------------------------------------------------------
3952
3953 /// Snapshot of every registered `NodeId` in unspecified order. The
3954 /// order matches `HashMap` iteration over the internal node table —
3955 /// callers that need stable ordering should track names at the
3956 /// `Graph` layer (canonical spec §3.5 namespace).
3957 #[must_use]
3958 pub fn node_ids(&self) -> Vec<NodeId> {
3959 self.lock_state().nodes.keys().copied().collect()
3960 }
3961
3962 /// Total number of nodes registered in this Core.
3963 #[must_use]
3964 pub fn node_count(&self) -> usize {
3965 self.lock_state().nodes.len()
3966 }
3967
3968 /// Returns `Some(kind)` for known nodes, `None` for unknown. Kind is
3969 /// **derived** from the field shape per D030 — see [`NodeKind`].
3970 #[must_use]
3971 pub fn kind_of(&self, node_id: NodeId) -> Option<NodeKind> {
3972 self.lock_state().nodes.get(&node_id).map(NodeRecord::kind)
3973 }
3974
3975 /// Snapshot of the node's deps in declaration order. Empty for
3976 /// unknown nodes or for state nodes (which have no deps).
3977 #[must_use]
3978 pub fn deps_of(&self, node_id: NodeId) -> Vec<NodeId> {
3979 self.lock_state()
3980 .nodes
3981 .get(&node_id)
3982 .map(NodeRecord::dep_ids_vec)
3983 .unwrap_or_default()
3984 }
3985
3986 /// Returns `Some(kind)` if the node has terminated (R1.3.4) — the
3987 /// pair `Some(Complete)` / `Some(Error(h))` mirrors the wire message
3988 /// the node emitted. `None` for live nodes or unknown ids.
3989 #[must_use]
3990 pub fn is_terminal(&self, node_id: NodeId) -> Option<TerminalKind> {
3991 self.lock_state()
3992 .nodes
3993 .get(&node_id)
3994 .and_then(|r| r.terminal)
3995 }
3996
3997 /// Whether the node has wave-scoped DIRTY pending (a tier-1 message
3998 /// queued but the matching tier-3 settle has not yet flushed).
3999 /// `false` for unknown ids. Mostly useful for `describe()` status
4000 /// classification (R3.6.1 `"dirty"`).
4001 #[must_use]
4002 pub fn is_dirty(&self, node_id: NodeId) -> bool {
4003 self.lock_state()
4004 .nodes
4005 .get(&node_id)
4006 .is_some_and(|r| r.dirty)
4007 }
4008
4009 /// Snapshot of `parent`'s meta companion list (R1.3.9.d / R2.3.3 —
4010 /// the companions added via [`Self::add_meta_companion`]). Empty
4011 /// for unknown ids or for nodes with no companions registered.
4012 ///
4013 /// Used by the graph layer's `signal_invalidate` to filter meta
4014 /// children out of the broadcast (canonical R3.7.2 — meta caches
4015 /// are preserved across graph-wide INVALIDATE).
4016 #[must_use]
4017 pub fn meta_companions_of(&self, parent: NodeId) -> Vec<NodeId> {
4018 self.lock_state()
4019 .nodes
4020 .get(&parent)
4021 .map(|r| r.meta_companions.clone())
4022 .unwrap_or_default()
4023 }
4024
4025 // -------------------------------------------------------------------
4026 // Wave engine — lives in `crate::batch` (Slice C-1 module split;
4027 // Slice A close M1 refactor lifted the binding-callback re-entrance
4028 // restrictions). The methods are still on `Core`; see `batch.rs` for:
4029 //
4030 // - `run_wave` — wave entry, manages own locking.
4031 // - `drain_and_flush` — drain phase, lock-released around invoke_fn.
4032 // - `commit_emission` — lock-released around custom_equals.
4033 // - `pick_next_fire`, `deliver_data_to_consumer`, `queue_notify`,
4034 // `flush_notifications` — wave-engine helpers.
4035 // -------------------------------------------------------------------
4036}
4037
4038// -----------------------------------------------------------------------
4039// COMPLETE / ERROR — terminal lifecycle + auto-cascade gating
4040// -----------------------------------------------------------------------
4041
4042impl Core {
4043 /// Emit `[COMPLETE]` (R1.3.4) on `node_id`, marking it terminal. After
4044 /// this call:
4045 ///
4046 /// - Subsequent `Core::emit` on this node is a silent no-op (idempotent
4047 /// termination).
4048 /// - The node's fn no longer fires.
4049 /// - The node's cache is preserved (last value still observable via
4050 /// `cache_of`).
4051 /// - Children receive `[COMPLETE]` (tier 5 — bypasses pause buffer).
4052 /// - Auto-cascade gating (Lock 2.B): each child that has all of its
4053 /// deps in a terminal state auto-emits its own `[COMPLETE]`. ERROR
4054 /// dominates COMPLETE — if any of a child's deps emitted ERROR, the
4055 /// child auto-cascades that ERROR instead.
4056 ///
4057 /// Idempotent: calling `complete` on an already-terminal node is a no-op.
4058 ///
4059 /// # Panics
4060 ///
4061 /// Panics if `node_id` is unknown.
4062 pub fn complete(&self, node_id: NodeId) {
4063 match self.try_complete(node_id) {
4064 Ok(()) => {}
4065 Err(e) => panic!("{e}"),
4066 }
4067 }
4068
4069 /// Fallible complete. Returns `Err` on partition order violation.
4070 pub(crate) fn try_complete(&self, node_id: NodeId) -> Result<(), PartitionOrderViolation> {
4071 self.try_emit_terminal(node_id, TerminalKind::Complete)
4072 }
4073
4074 /// Complete or defer to wave-end on partition order violation.
4075 /// For producer-pattern operator sinks.
4076 pub fn complete_or_defer(&self, node_id: NodeId) {
4077 match self.try_complete(node_id) {
4078 Ok(()) => {}
4079 Err(_) => {
4080 self.push_deferred_producer_op(DeferredProducerOp::Complete { node_id });
4081 }
4082 }
4083 }
4084
4085 /// Emit `[ERROR, error_handle]` (R1.3.4) on `node_id`. `error_handle`
4086 /// must resolve to a non-sentinel value (R1.2.5) — the binding side has
4087 /// already interned the error value before this call. Same lifecycle
4088 /// effects as [`Self::complete`]; ERROR dominates COMPLETE in auto-
4089 /// cascade gating.
4090 ///
4091 /// # Panics
4092 ///
4093 /// Panics if `node_id` is unknown or `error_handle == NO_HANDLE`.
4094 pub fn error(&self, node_id: NodeId, error_handle: HandleId) {
4095 match self.try_error(node_id, error_handle) {
4096 Ok(()) => {}
4097 Err(e) => panic!("{e}"),
4098 }
4099 }
4100
4101 /// Fallible error. Returns `Err` on partition order violation.
4102 pub(crate) fn try_error(
4103 &self,
4104 node_id: NodeId,
4105 error_handle: HandleId,
4106 ) -> Result<(), PartitionOrderViolation> {
4107 assert!(
4108 error_handle != NO_HANDLE,
4109 "NO_HANDLE is not a valid ERROR payload (R1.2.5)"
4110 );
4111 self.try_emit_terminal(node_id, TerminalKind::Error(error_handle))?;
4112 // The caller's intern share for `error_handle` is NOT transferred
4113 // to any slot — `terminate_node` takes its OWN retain for every
4114 // populated `terminal` and `dep_terminals` slot. Release the
4115 // caller's share here (mirrors `Core::emit`'s short-circuit
4116 // release on terminal). Without this, every `error()` call leaks
4117 // one binding-side handle ref. Slice A-bigger /qa item D fix.
4118 self.binding.release_handle(error_handle);
4119 Ok(())
4120 }
4121
4122 /// Error or defer to wave-end on partition order violation.
4123 /// For producer-pattern operator sinks. Retains `handle` on defer;
4124 /// the drain releases it after firing (or on discard).
4125 pub fn error_or_defer(&self, node_id: NodeId, error_handle: HandleId) {
4126 if self.try_error(node_id, error_handle).is_err() {
4127 self.binding.retain_handle(error_handle);
4128 self.push_deferred_producer_op(DeferredProducerOp::Error {
4129 node_id,
4130 handle: error_handle,
4131 });
4132 }
4133 }
4134
4135 fn try_emit_terminal(
4136 &self,
4137 node_id: NodeId,
4138 terminal: TerminalKind,
4139 ) -> Result<(), PartitionOrderViolation> {
4140 {
4141 let s = self.lock_state();
4142 assert!(s.nodes.contains_key(&node_id), "unknown node {node_id:?}");
4143 }
4144 // Wave on `node_id`'s touched partitions (Slice Y1 / Phase E).
4145 // COMPLETE / ERROR cascade follows `s.children` (in-partition by
4146 // union-find construction). The thunk acquires its own state lock
4147 // to queue the cascade.
4148 self.try_run_wave_for(node_id, |this| {
4149 let mut s = this.lock_state();
4150 this.terminate_node(&mut s, node_id, terminal);
4151 })
4152 }
4153
4154 /// Set the node's terminal slot, queue the wire message, and cascade to
4155 /// children. Idempotent on already-terminal node (no-op).
4156 ///
4157 /// Iterative implementation (Slice A-bigger, M1-close): a work-queue
4158 /// drives the cascade so deep linear chains don't overflow the OS
4159 /// thread stack. Mirrors `path_from_to`'s explicit-stack pattern.
4160 fn terminate_node(&self, s: &mut St<'_>, node_id: NodeId, terminal: TerminalKind) {
4161 let mut work: Vec<(NodeId, TerminalKind)> = vec![(node_id, terminal)];
4162 while let Some((id, t)) = work.pop() {
4163 if s.require_node(id).terminal.is_some() {
4164 continue; // Idempotent — already terminal.
4165 }
4166 // Take a refcount share for the terminal slot so the error
4167 // handle outlives the binding-side intern's transient share.
4168 if let TerminalKind::Error(h) = t {
4169 self.binding.retain_handle(h);
4170 }
4171 // Slice E2 /qa Q2(b) (D069): if a resubscribable node is
4172 // terminating with no live subscribers, queue eager
4173 // `wipe_ctx` for the wave's lock-released drain. This is the
4174 // mutually-exclusive complement of the `Subscription::Drop`
4175 // wipe site: when the LAST sub drops first then terminate
4176 // fires, subs are empty here and we queue; when terminate
4177 // fires WITH subs still alive, we DON'T queue (subs not
4178 // empty), and `Subscription::Drop` will fire wipe directly
4179 // when those subs eventually drop. Either way, exactly one
4180 // wipe fires per terminal lifecycle.
4181 let queue_wipe = {
4182 let rec = s.require_node(id);
4183 rec.resubscribable && rec.subscribers.is_empty()
4184 };
4185 s.require_node_mut(id).terminal = Some(t);
4186 // Q-beyond Sub-slice 2 + 3 (D108, 2026-05-09): pending_fires
4187 // and pending_wipes both live on per-thread WaveState. Single
4188 // borrow handles the queue-wipe push and the pending_fires
4189 // remove.
4190 crate::batch::with_wave_state(|ws| {
4191 if queue_wipe {
4192 ws.pending_wipes.push(id);
4193 }
4194 // Drain pending fires for this node — fn won't fire on a
4195 // terminal node.
4196 ws.pending_fires.remove(&id);
4197 });
4198 // R1.3.8.b / Slice F (A3, 2026-05-07): if this node was paused
4199 // when terminating (the canonical case is the R1.3.8.c overflow
4200 // ERROR synthesis path), drain the pause buffer and release
4201 // each payload's queue_notify-time retain. Without this, the
4202 // buffer leaks one share per buffered DATA/RESOLVED/INVALIDATE.
4203 // Subscribers receive the terminal directly via the cascade
4204 // below (tier-5 bypasses the pause buffer); the buffered
4205 // content is moot post-terminal.
4206 let drained: Vec<HandleId> = {
4207 let rec = s.require_node_mut(id);
4208 let mut drained: Vec<HandleId> = Vec::new();
4209 if rec.pause_state.is_paused() {
4210 // Take the buffered messages out, then collapse the
4211 // pause state to Active so subsequent code observes a
4212 // clean lifecycle. Idempotent on Active (no-op).
4213 let prev = std::mem::replace(&mut rec.pause_state, PauseState::Active);
4214 if let PauseState::Paused { buffer, .. } = prev {
4215 drained.extend(buffer.into_iter().filter_map(Message::payload_handle));
4216 }
4217 }
4218 // QA A4 (2026-05-07): drain replay buffer on terminate. A
4219 // non-resubscribable terminal ends the lifecycle; without
4220 // this drain the buffer's retains leak until `Drop for
4221 // CoreState`. Resubscribable nodes' replay buffers are
4222 // also drained (when they're hit by a terminal cascade);
4223 // a fresh subscribe rebuilds the buffer from scratch as
4224 // part of `reset_for_fresh_lifecycle`.
4225 drained.extend(rec.replay_buffer.drain(..));
4226 drained
4227 };
4228 for h in drained {
4229 self.binding.release_handle(h);
4230 }
4231 // Queue the wire message (tier 5 — bypasses pause buffer).
4232 let msg = match t {
4233 TerminalKind::Complete => Message::Complete,
4234 TerminalKind::Error(h) => Message::Error(h),
4235 };
4236 self.queue_notify(s, id, msg);
4237 // Cascade to children.
4238 let child_ids: Vec<NodeId> = s
4239 .children
4240 .get(&id)
4241 .map(|c| c.iter().copied().collect())
4242 .unwrap_or_default();
4243 for child_id in child_ids {
4244 let dep_idx = s.require_node(child_id).dep_index_of(id);
4245 let Some(idx) = dep_idx else { continue };
4246 // Mark this child's per-dep terminal slot. Take a retain on
4247 // the error handle for the slot share.
4248 {
4249 let child = s.require_node_mut(child_id);
4250 if child.dep_records[idx].terminal.is_some() {
4251 // Idempotent — child already saw this dep terminate.
4252 continue;
4253 }
4254 child.dep_records[idx].terminal = Some(t);
4255 }
4256 if let TerminalKind::Error(h) = t {
4257 self.binding.retain_handle(h);
4258 }
4259 // Auto-cascade gating: if all deps now terminal, push child
4260 // onto the work queue with the chosen terminal.
4261 //
4262 // Slice C-1: kinds that opt out of Lock 2.B (currently
4263 // `Operator(Reduce)`) intercept upstream COMPLETE so they
4264 // can emit their accumulator before terminating. Instead of
4265 // cascading, queue the child for fn-fire — `fire_operator`
4266 // sees `dep_records[0].terminal` set and emits the
4267 // appropriate batch (Data(acc) + Complete on COMPLETE,
4268 // Error(h) on ERROR).
4269 let action = {
4270 let child = s.require_node(child_id);
4271 if child.terminal.is_some() {
4272 ChildAction::None // Already terminated — no-op.
4273 } else if child.all_deps_terminal() {
4274 if child.skips_auto_cascade() {
4275 ChildAction::QueueFire
4276 } else {
4277 ChildAction::Cascade(pick_cascade_terminal(&child.dep_records))
4278 }
4279 } else {
4280 ChildAction::None
4281 }
4282 };
4283 match action {
4284 ChildAction::None => {}
4285 ChildAction::Cascade(t_child) => {
4286 work.push((child_id, t_child));
4287 }
4288 ChildAction::QueueFire => {
4289 // Q-beyond Sub-slice 2 (D108, 2026-05-09):
4290 // pending_fires lives on per-thread WaveState.
4291 crate::batch::with_wave_state(|ws| {
4292 ws.pending_fires.insert(child_id);
4293 });
4294 }
4295 }
4296 }
4297 }
4298 }
4299}
4300
4301/// Outcome of Lock 2.B child gating in `terminate_node`'s cascade walk.
4302enum ChildAction {
4303 /// No cascade; child is already terminal or not yet all-deps-terminal.
4304 None,
4305 /// Auto-cascade with the picked terminal kind (ERROR dominates COMPLETE).
4306 Cascade(TerminalKind),
4307 /// Queue child for fn-fire instead of cascading. Used by operator
4308 /// kinds that intercept upstream terminal (Operator(Reduce)).
4309 QueueFire,
4310}
4311
4312/// Lock 2.B cascade-terminal selection: ERROR dominates COMPLETE; first
4313/// ERROR seen wins. Caller has already verified all deps are terminal.
4314fn pick_cascade_terminal(dep_records: &[DepRecord]) -> TerminalKind {
4315 for dr in dep_records {
4316 if let Some(TerminalKind::Error(h)) = dr.terminal {
4317 return TerminalKind::Error(h);
4318 }
4319 }
4320 TerminalKind::Complete
4321}
4322
4323// -----------------------------------------------------------------------
4324// TEARDOWN — destruction, with auto-COMPLETE prepend (R2.6.4 / Lock 6.F)
4325// -----------------------------------------------------------------------
4326
4327impl Core {
4328 /// Tear `node_id` down. Per R2.6.4 / Lock 6.F:
4329 ///
4330 /// - **Auto-prepend COMPLETE.** If the node has not yet emitted a
4331 /// terminal (`COMPLETE` / `ERROR`), `terminate_node` is called with
4332 /// `Complete` first so subscribers see `[COMPLETE, TEARDOWN]`, not
4333 /// bare `[TEARDOWN]`. This guarantees a clean end-of-stream signal
4334 /// to async iterators and other consumers that wait on terminal
4335 /// delivery.
4336 /// - **Idempotent on duplicate delivery.** The per-node
4337 /// `has_received_teardown` flag is set on the first call; subsequent
4338 /// `teardown` calls (or cascade visits from other paths) are silent
4339 /// no-ops — no second `[COMPLETE, TEARDOWN]` pair to subscribers.
4340 /// - **Cascade downstream.** Each child is recursively torn down. The
4341 /// child's own COMPLETE auto-cascades from `terminate_node`'s logic
4342 /// (Lock 2.B); its TEARDOWN comes from this cascade.
4343 ///
4344 /// # Panics
4345 ///
4346 /// Panics if `node_id` is unknown.
4347 pub fn teardown(&self, node_id: NodeId) {
4348 match self.try_teardown(node_id) {
4349 Ok(()) => {}
4350 Err(e) => panic!("{e}"),
4351 }
4352 }
4353
4354 /// Teardown or defer to wave-end on partition order violation.
4355 /// For producer-pattern operator sinks.
4356 pub fn teardown_or_defer(&self, node_id: NodeId) {
4357 match self.try_teardown(node_id) {
4358 Ok(()) => {}
4359 Err(_) => {
4360 // S2b (D223): `Core` is no longer `Clone`. The old
4361 // `DeferredProducerOp::Callback(move || core.teardown(..))`
4362 // boxed a cloned `Core` only to have
4363 // `push_deferred_producer_op` run it *immediately* (the
4364 // deferred queue is a deleted D211 no-op shim). A direct
4365 // owner-context call is behaviour-identical and drops the
4366 // pointless `Send`-closure (which now needs `C: Sync`).
4367 self.teardown(node_id);
4368 }
4369 }
4370 }
4371
4372 fn try_teardown(&self, node_id: NodeId) -> Result<(), PartitionOrderViolation> {
4373 {
4374 let s = self.lock_state();
4375 assert!(s.nodes.contains_key(&node_id), "unknown node {node_id:?}");
4376 }
4377 let torn_down: Arc<Mutex<Vec<NodeId>>> = Arc::new(Mutex::new(Vec::new()));
4378 let torn_down_for_wave = torn_down.clone();
4379 // TEARDOWN cascade follows `s.children` AND `meta_companions`
4380 // (R1.3.9.d) — meta-companions can cross partitions. Slice Y1 /
4381 // Phase E `compute_touched_partitions(node_id)` (called by
4382 // `run_wave_for`) walks both axes so the wave acquires every
4383 // partition reachable via the cascade.
4384 self.try_run_wave_for(node_id, move |this| {
4385 let mut s = this.lock_state();
4386 let collected = this.teardown_inner(&mut s, node_id);
4387 torn_down_for_wave.lock().extend(collected);
4388 })?;
4389 // Fire NodeTornDown for every cascaded id (root + metas +
4390 // downstream consumers that auto-cascaded). Outside the state
4391 // lock, matching fire_topology_event discipline.
4392 let ids = std::mem::take(&mut *torn_down.lock());
4393 for id in ids {
4394 self.fire_topology_event(&crate::topology::TopologyEvent::NodeTornDown(id));
4395 }
4396 Ok(())
4397 }
4398
4399 /// Iterative teardown walk (Slice A-bigger, M1-close).
4400 ///
4401 /// The recursive shape was:
4402 /// ```text
4403 /// teardown(n):
4404 /// if torn_down: return
4405 /// mark torn_down
4406 /// for meta in metas: teardown(meta)
4407 /// terminate_node + queue Teardown
4408 /// for child in children: teardown(child)
4409 /// ```
4410 /// Deep linear chains (~10k nodes) overflowed the OS thread stack.
4411 ///
4412 /// The iterative shape uses a `Vec<Action>` stack with `Visit` and
4413 /// `EmitTeardown` actions. `Visit(n)` marks `n` torn-down (or no-ops
4414 /// if already), then pushes (in reverse order so LIFO pops in forward
4415 /// order) `Visit(child_K), …, Visit(child_1), EmitTeardown(n),
4416 /// Visit(meta_M), …, Visit(meta_1)`. The R1.3.9.d "metas first, then
4417 /// self, then children" ordering is preserved by the push order:
4418 /// metas pop first, recursively expand and emit; then `EmitTeardown(n)`
4419 /// pops and runs `terminate_node` + queue `Teardown`; then children
4420 /// pop. Idempotency via `has_received_teardown` keeps each node
4421 /// visited at most once even when multi-parent diamonds re-enter via
4422 /// a sibling path.
4423 fn teardown_inner(&self, s: &mut St<'_>, root: NodeId) -> Vec<NodeId> {
4424 enum Action {
4425 Visit(NodeId),
4426 EmitTeardown(NodeId),
4427 }
4428 let mut stack: Vec<Action> = vec![Action::Visit(root)];
4429 // Topology accumulator: every node that actually emits TEARDOWN
4430 // (i.e. each `EmitTeardown(id)` site, NOT each `Visit` — visits
4431 // for already-torn-down nodes short-circuit on idempotency).
4432 let mut torn_down: Vec<NodeId> = Vec::new();
4433 while let Some(action) = stack.pop() {
4434 match action {
4435 Action::Visit(id) => {
4436 if s.require_node(id).has_received_teardown {
4437 continue; // Idempotent (R2.6.4).
4438 }
4439 s.require_node_mut(id).has_received_teardown = true;
4440 // Push order: children first (pop LAST), then
4441 // EmitTeardown(id), then metas (pop FIRST). Reverse
4442 // each list so within-group order matches the original
4443 // recursive iteration.
4444 let children: Vec<NodeId> = s
4445 .children
4446 .get(&id)
4447 .map(|c| c.iter().copied().collect())
4448 .unwrap_or_default();
4449 for &child in children.iter().rev() {
4450 stack.push(Action::Visit(child));
4451 }
4452 stack.push(Action::EmitTeardown(id));
4453 let metas: Vec<NodeId> = s.require_node(id).meta_companions.clone();
4454 for &meta in metas.iter().rev() {
4455 stack.push(Action::Visit(meta));
4456 }
4457 }
4458 Action::EmitTeardown(id) => {
4459 // Auto-prepend COMPLETE if not yet terminal. The (now
4460 // iterative) terminate_node handles auto-cascade to
4461 // children's own terminal slots per Lock 2.B.
4462 let already_terminal = s.require_node(id).terminal.is_some();
4463 if !already_terminal {
4464 self.terminate_node(s, id, TerminalKind::Complete);
4465 }
4466 // Wire emission of the TEARDOWN itself (tier 6).
4467 self.queue_notify(s, id, Message::Teardown);
4468 torn_down.push(id);
4469 }
4470 }
4471 }
4472 torn_down
4473 }
4474
4475 /// Attach `companion` as a meta companion of `parent` per R1.3.9.d.
4476 /// Meta companions are nodes whose lifecycle is bound to the parent's
4477 /// in TEARDOWN ordering: when `parent` tears down, `companion` tears
4478 /// down first.
4479 ///
4480 /// Use this for inspection / audit / sidecar nodes that subscribe to
4481 /// parent state — without the ordering, the companion could observe
4482 /// the parent mid-destruction and emit garbage.
4483 ///
4484 /// Idempotent on duplicate registration of the same companion.
4485 ///
4486 /// # Lifecycle constraint
4487 ///
4488 /// Intended for **setup-time** wiring — call this before `parent` or
4489 /// `companion` enters a wave. Mid-wave registration (especially during
4490 /// a teardown cascade in flight) is implementation-defined: the new
4491 /// edge takes effect on the *next* wave. Adding a companion to a
4492 /// torn-down parent silently no-ops (the parent will not tear down
4493 /// again). For dynamic companion attachment with deterministic
4494 /// ordering, prefer constructing the wiring before subscribers exist.
4495 ///
4496 /// # Panics
4497 ///
4498 /// Panics if either node id is unknown, or if `parent == companion`
4499 /// (a node cannot be its own meta companion — would loop on TEARDOWN).
4500 pub fn add_meta_companion(&self, parent: NodeId, companion: NodeId) {
4501 assert!(parent != companion, "node cannot be its own meta companion");
4502 // D246/S2c: single-owner ⇒ ONE shard; the prior cross-shard
4503 // `shard_of(parent) == shard_of(companion)` assert is vacuous
4504 // and deleted. D253 (S5) further deletes the §7 group-
4505 // consistency invariant on this edge (the `SchedulingGroupId`
4506 // surface is gone until M6 re-introduces it).
4507 let mut s = self.lock_state();
4508 assert!(s.nodes.contains_key(&parent), "unknown parent {parent:?}");
4509 assert!(
4510 s.nodes.contains_key(&companion),
4511 "unknown companion {companion:?}"
4512 );
4513 let metas = &mut s.require_node_mut(parent).meta_companions;
4514 if metas.contains(&companion) {
4515 return;
4516 }
4517 metas.push(companion);
4518 }
4519}
4520
4521// -----------------------------------------------------------------------
4522// INVALIDATE — cache clear + downstream cascade
4523// -----------------------------------------------------------------------
4524
4525impl Core {
4526 /// Clear `node_id`'s cache and cascade `[INVALIDATE]` to downstream
4527 /// dependents per canonical spec §1.4.
4528 ///
4529 /// Semantics:
4530 /// - **Never-populated case (R1.4 line 197):** if `cache == NO_HANDLE`,
4531 /// the call is a no-op — no cache to clear, no INVALIDATE emitted.
4532 /// This naturally provides idempotency within a wave: once a node has
4533 /// been invalidated this wave (cache = NO_HANDLE), a second invalidate
4534 /// on the same node does nothing.
4535 /// - **Cache clear (immediate):** the node's cached handle is dropped
4536 /// (refcount released), `cache` becomes `NO_HANDLE`. State nodes
4537 /// keep `has_fired_once` per spec — INVALIDATE is not a re-gating
4538 /// event (the next emission to a previously-fired state still does
4539 /// not re-trigger the first-run gate; that's a resubscribable-terminal
4540 /// lifecycle concern, separate slice).
4541 /// - **Wire emission (tier 4):** `[INVALIDATE]` is queued via the
4542 /// normal pause-aware notify path. Buffers while paused, flushes
4543 /// immediately otherwise.
4544 /// - **Downstream cascade:** for each child of this node, the child's
4545 /// `dep_handles[idx_of_node]` is reset to `NO_HANDLE` (its previous
4546 /// value referenced a now-released handle). The child is then
4547 /// recursively invalidated (no-op if its cache was already
4548 /// `NO_HANDLE`). This re-closes the child's first-run gate — fn
4549 /// won't fire again until the upstream re-emits a value.
4550 ///
4551 /// Wraps in a fresh wave when called from outside a wave, so
4552 /// notifications flush at the natural wave boundary.
4553 ///
4554 /// # Panics
4555 ///
4556 /// Panics if `node_id` is unknown, consistent with `emit` / `pause`.
4557 pub fn invalidate(&self, node_id: NodeId) {
4558 {
4559 let s = self.lock_state();
4560 assert!(s.nodes.contains_key(&node_id), "unknown node {node_id:?}");
4561 }
4562 // INVALIDATE cascade follows `s.children` (in-partition by union-
4563 // find construction). Slice Y1 / Phase E.
4564 self.run_wave_for(node_id, |this| {
4565 let mut s = this.lock_state();
4566 this.invalidate_inner(&mut s, node_id);
4567 });
4568 }
4569
4570 /// Invalidate or defer to wave-end on partition order violation.
4571 /// For producer-pattern operator sinks.
4572 ///
4573 /// # Panics
4574 ///
4575 /// Panics if `node_id` is not registered in this Core.
4576 pub fn invalidate_or_defer(&self, node_id: NodeId) {
4577 {
4578 let s = self.lock_state();
4579 assert!(s.nodes.contains_key(&node_id), "unknown node {node_id:?}");
4580 }
4581 let result = self.try_run_wave_for(node_id, |this| {
4582 let mut s = this.lock_state();
4583 this.invalidate_inner(&mut s, node_id);
4584 });
4585 if result.is_err() {
4586 // S2b (D223): direct owner-context call — see the identical
4587 // rationale in `teardown_or_defer` (immediate-run no-op shim;
4588 // no `Clone`, no pointless `Send` closure needing `C: Sync`).
4589 self.invalidate(node_id);
4590 }
4591 }
4592
4593 /// Iterative invalidate cascade (Slice A-bigger, M1-close).
4594 ///
4595 /// The recursive shape was a depth-first cache-clear walk:
4596 /// ```text
4597 /// invalidate(n):
4598 /// if cache(n) == NO_HANDLE: return // already-invalidated guard
4599 /// cache(n) = NO_HANDLE; release handle
4600 /// queue Invalidate(n)
4601 /// for child in children:
4602 /// child.dep_handles[idx] = NO_HANDLE
4603 /// invalidate(child)
4604 /// ```
4605 /// Deep linear chains overflowed the OS thread stack. The work-queue
4606 /// rewrite has no ordering subtleties (unlike teardown's R1.3.9.d
4607 /// metas-first constraint) — Invalidate is a tier-4 broadcast where
4608 /// the never-populated / already-invalidated guard provides natural
4609 /// idempotency for diamond fan-in.
4610 fn invalidate_inner(&self, s: &mut St<'_>, root: NodeId) {
4611 let mut work: Vec<NodeId> = vec![root];
4612 while let Some(node_id) = work.pop() {
4613 // Never-populated / already-invalidated: no-op (R1.4 idempotency).
4614 // Per R1.3.9.c never-populated case, OnInvalidate cleanup hook
4615 // also does NOT fire — natural fallout of skipping via the
4616 // cache==NO_HANDLE guard (we never reach the queue-push below).
4617 let old_handle = s.require_node(node_id).cache;
4618 if old_handle == NO_HANDLE {
4619 continue;
4620 }
4621 // Clear cache + release the handle's slot ownership.
4622 s.require_node_mut(node_id).cache = NO_HANDLE;
4623 self.binding.release_handle(old_handle);
4624 // Slice E2 (R1.3.9.b strict per D057 + D058 fire-at-cache-clear):
4625 // queue OnInvalidate cleanup hook for lock-released drain at
4626 // wave-end. The dedup set guarantees at-most-once-per-wave-per-
4627 // node firing even if a node re-populates mid-wave (via fn-fire
4628 // emit) and gets re-invalidated through a separate path. Pure
4629 // cache==NO_HANDLE idempotency (above) catches "still at
4630 // sentinel" only; the explicit set is the strict R1.3.9.b
4631 // reading.
4632 // Q-beyond Sub-slice 3 (D108, 2026-05-09):
4633 // `invalidate_hooks_fired_this_wave` and
4634 // `deferred_cleanup_hooks` both live on per-thread WaveState.
4635 // Single borrow handles the dedup-insert and (on first
4636 // insertion) the cleanup-hook push.
4637 crate::batch::with_wave_state(|ws| {
4638 if ws.invalidate_hooks_fired_this_wave.insert(node_id) {
4639 ws.deferred_cleanup_hooks
4640 .push((node_id, CleanupTrigger::OnInvalidate));
4641 }
4642 });
4643 // Wire emission. Pause-aware via queue_notify.
4644 self.queue_notify(s, node_id, Message::Invalidate);
4645 // Cascade: for each child, clear the dep record's prev_data
4646 // referencing this node and push child onto the work queue.
4647 let child_ids: Vec<NodeId> = s
4648 .children
4649 .get(&node_id)
4650 .map(|c| c.iter().copied().collect())
4651 .unwrap_or_default();
4652 for child_id in child_ids {
4653 let dep_idx = s.require_node(child_id).dep_index_of(node_id);
4654 if let Some(idx) = dep_idx {
4655 // Reset the child's dep record — the handle was just
4656 // released. Subsequent first-run-gate checks see
4657 // sentinel and re-close.
4658 //
4659 // Snapshot prev_data + data_batch retains for deferred
4660 // release, then clear the record. Two-phase to satisfy
4661 // the borrow checker (nodes + deferred_handle_releases
4662 // are separate CoreState fields).
4663 let (old_prev, batch_hs): (HandleId, SmallVec<[HandleId; 1]>) = {
4664 let dr = &s.require_node(child_id).dep_records[idx];
4665 (dr.prev_data, dr.data_batch.clone())
4666 };
4667 {
4668 // Q-beyond Sub-slice 1 (D108, 2026-05-09):
4669 // deferred_handle_releases moved to per-thread
4670 // WaveState thread_local. State lock held; the
4671 // thread_local borrow is independent.
4672 crate::batch::with_wave_state(|ws| {
4673 if old_prev != NO_HANDLE {
4674 ws.deferred_handle_releases.push(old_prev);
4675 }
4676 for h in batch_hs {
4677 ws.deferred_handle_releases.push(h);
4678 }
4679 });
4680 }
4681 let child_rec = s.require_node_mut(child_id);
4682 child_rec.dep_records[idx].prev_data = NO_HANDLE;
4683 child_rec.dep_records[idx].data_batch.clear();
4684 // §10.13 perf (D047): clear received_mask bit so
4685 // has_sentinel_deps() re-closes the first-run gate.
4686 if idx < 64 {
4687 child_rec.received_mask &= !(1u64 << idx);
4688 }
4689 work.push(child_id);
4690 }
4691 }
4692 }
4693 }
4694}
4695
4696// -----------------------------------------------------------------------
4697// PAUSE / RESUME — multi-pauser lockset + replay buffer
4698// -----------------------------------------------------------------------
4699
/// Summary handed back by [`Core::resume`] when the last pause lock on a
/// node is released.
///
/// A non-zero `dropped` means a controller held the lock long enough for
/// the buffer to overflow its cap; a binding may want to surface that as
/// a warning or an error.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ResumeReport {
    /// Number of tier-3/tier-4 messages dispatched to subscribers as part
    /// of the drain.
    pub replayed: u32,
    /// Number of messages that fell out of the front of the buffer
    /// because of the Core-global `pause_buffer_cap` during this pause
    /// cycle.
    pub dropped: u32,
}
4713
4714impl Core {
4715 /// Acquire a pause lock on `node_id`. The first lock transitions the
4716 /// node from `Active` to `Paused`; further locks add to the lockset.
4717 /// While paused, tier-3 (DATA/RESOLVED) and tier-4 (INVALIDATE) outgoing
4718 /// messages buffer in the node's pause buffer; other tiers flush
4719 /// immediately.
4720 ///
4721 /// Re-acquiring the same `lock_id` is an idempotent no-op (matches TS
4722 /// convention, R1.2.6 silent on the case).
4723 pub fn pause(&self, node_id: NodeId, lock_id: LockId) -> Result<(), PauseError> {
4724 let mut s = self.lock_state();
4725 let rec = s
4726 .nodes
4727 .get_mut(&node_id)
4728 .ok_or(PauseError::UnknownNode(node_id))?;
4729 // QA A5 (2026-05-07): terminated nodes can't be re-paused. Without
4730 // this check, a stale pause-controller calling pause() on an
4731 // already-terminated node would re-arm `pause_state` to Paused.
4732 // The terminate_node path collapses pause_state → Active and
4733 // drains the buffer (A3-related), but doesn't gate subsequent
4734 // pause() calls. Treat as idempotent no-op (consistent with how
4735 // emit/complete/error early-return on terminal).
4736 if rec.terminal.is_some() {
4737 return Ok(());
4738 }
4739 // Slice F audit close (2026-05-07): `PausableMode::Off` means the
4740 // dispatcher ignores PAUSE for this node — tier-3 flushes
4741 // immediately, fn fires immediately. Treat the call as a successful
4742 // no-op so callers don't need to special-case.
4743 if rec.pausable == PausableMode::Off {
4744 return Ok(());
4745 }
4746 rec.pause_state.add_lock(lock_id);
4747 Ok(())
4748 }
4749
4750 /// Release a pause lock on `node_id`. If the lockset becomes empty, the
4751 /// node transitions back to `Active` and the buffered messages are
4752 /// dispatched to subscribers in arrival order. Returns a [`ResumeReport`]
4753 /// when the final lock released; `None` if the lockset is still
4754 /// non-empty (further locks held).
4755 ///
4756 /// Releasing an unknown `lock_id` (or releasing on an already-Active
4757 /// node) is an idempotent no-op returning `None`.
4758 pub fn resume(
4759 &self,
4760 node_id: NodeId,
4761 lock_id: LockId,
4762 ) -> Result<Option<ResumeReport>, PauseError> {
4763 // Phase 1 (lock-held): collect drained buffer + pending-wave flag +
4764 // sink Arcs. For default-mode nodes whose `pending_wave` was set
4765 // during pause, schedule a single fn-fire by adding to
4766 // `pending_fires` BEFORE we exit the lock — the wave engine picks
4767 // it up on the next drain tick.
4768 let (sinks, messages, dropped, pending_wave_for_default) = {
4769 let mut s = self.lock_state();
4770 let rec = s
4771 .nodes
4772 .get_mut(&node_id)
4773 .ok_or(PauseError::UnknownNode(node_id))?;
4774 // For Off mode, pause/resume are no-ops by construction.
4775 if rec.pausable == PausableMode::Off {
4776 return Ok(None);
4777 }
4778 let was_default_mode = rec.pausable == PausableMode::Default;
4779 // Capture pending_wave BEFORE remove_lock collapses the state.
4780 let pending_wave = if was_default_mode {
4781 rec.pause_state.take_pending_wave()
4782 } else {
4783 false
4784 };
4785 let Some((buffer, dropped)) = rec.pause_state.remove_lock(lock_id) else {
4786 // Not the final-resume — restore the pending_wave flag we
4787 // tentatively cleared, since we're not transitioning to
4788 // Active yet.
4789 if pending_wave {
4790 rec.pause_state.mark_pending_wave();
4791 }
4792 return Ok(None);
4793 };
4794 let sinks: Vec<Sink> = rec.subscribers.values().cloned().collect();
4795 let messages: Vec<Message> = buffer.into_iter().collect();
4796 // Default-mode pending-wave handling: schedule the fn-fire so
4797 // the wave engine consolidates the pause-window dep deliveries
4798 // into one fn execution. State nodes don't fire fn (no
4799 // `pending_fires` membership has effect for them).
4800 //
4801 // Q-beyond Sub-slice 2 (D108, 2026-05-09): pending_fires lives
4802 // on per-thread WaveState.
4803 if pending_wave && was_default_mode {
4804 crate::batch::with_wave_state(|ws| {
4805 ws.pending_fires.insert(node_id);
4806 });
4807 }
4808 (sinks, messages, dropped, pending_wave && was_default_mode)
4809 };
4810 let replayed = u32::try_from(messages.len()).unwrap_or(u32::MAX);
4811
4812 // Phase 2 (lock-released): fire sinks for ResumeAll-buffered
4813 // messages. Default-mode resume produces no buffered replay (the
4814 // consolidated fn-fire produces fresh wave traffic via the standard
4815 // commit_emission path).
4816 if !messages.is_empty() {
4817 for sink in &sinks {
4818 sink(&messages);
4819 }
4820 // Phase 3: balance the retain_handle calls done at buffer-push
4821 // time — sinks observe values but don't own refcount shares.
4822 for msg in &messages {
4823 if let Some(h) = msg.payload_handle() {
4824 self.binding.release_handle(h);
4825 }
4826 }
4827 }
4828
4829 // Phase 4 (default-mode): drain the consolidated fn-fire scheduled
4830 // in Phase 1. `run_wave_for(node_id)` acquires the partitions
4831 // touched from `node_id` (Slice Y1 / Phase E) and runs the standard
4832 // drain pipeline; the new fn-fire emerges as a normal wave's worth
4833 // of messages to subscribers.
4834 if pending_wave_for_default {
4835 self.run_wave_for(node_id, |_this| {
4836 // The pending_fires entry was pushed in Phase 1 under the
4837 // lock. run_wave's drain picks it up.
4838 });
4839 }
4840 Ok(Some(ResumeReport { replayed, dropped }))
4841 }
4842
4843 /// True if the node currently holds at least one pause lock.
4844 #[must_use]
4845 pub fn is_paused(&self, node_id: NodeId) -> bool {
4846 self.lock_state()
4847 .require_node(node_id)
4848 .pause_state
4849 .is_paused()
4850 }
4851
4852 /// Number of pause locks currently held on `node_id`. `0` if Active.
4853 #[must_use]
4854 pub fn pause_lock_count(&self, node_id: NodeId) -> usize {
4855 self.lock_state()
4856 .require_node(node_id)
4857 .pause_state
4858 .lock_count()
4859 }
4860
4861 /// Test helper: whether `node_id` currently holds the given `lock_id`.
4862 #[must_use]
4863 pub fn holds_pause_lock(&self, node_id: NodeId, lock_id: LockId) -> bool {
4864 self.lock_state()
4865 .require_node(node_id)
4866 .pause_state
4867 .contains_lock(lock_id)
4868 }
4869}
4870
4871// -----------------------------------------------------------------------
4872// set_deps — atomic dep mutation
4873// -----------------------------------------------------------------------
4874
4875/// Errors returnable by [`Core::set_deps`].
4876///
4877/// Per `~/src/graphrefly-ts/docs/research/rewire-design-notes.md` and the
4878/// Phase 13.8 Q1 lock:
4879/// - `SelfDependency` — `n in newDeps` (self-loops are pathological without
4880/// explicit fixed-point semantics, which GraphReFly does not provide).
4881/// - `WouldCreateCycle { path }` — adding the new edge would create a cycle.
4882/// The `path` field reports the offending dep chain for debuggability.
4883/// - `UnknownNode` / `NotComputeNode` — invariant violations from the caller.
4884/// - `TerminalNode` — `n` itself has emitted COMPLETE/ERROR; rewiring a
4885/// terminal stream is a category error (terminal is one-shot at this
4886/// layer; recovery is the resubscribable path on a fresh subscribe).
4887/// - `TerminalDep` — a newly-added dep is terminal AND not resubscribable.
4888/// Resubscribable terminal deps are accepted because the subscribe path
4889/// resets their lifecycle. Non-resubscribable terminal deps would deliver
4890/// their already-emitted terminal directly to `n`'s `dep_terminals` slot,
4891/// which is rarely intended.
#[derive(Error, Debug, Clone, PartialEq)]
pub enum SetDepsError {
    /// `n` appeared in `new_deps` (self-loop rejection).
    #[error("set_deps({n:?}, ...): self-dependency rejected (n appeared in new_deps)")]
    SelfDependency { n: NodeId },

    /// Adding the new dep would create a cycle: `added_dep` is already
    /// reachable from `n` along existing data-flow (parent → child) edges,
    /// so the new edge `added_dep → n` would close a loop.
    ///
    /// `path` is that existing chain in data-flow order, endpoints
    /// included: `[n, ..., added_dep]` (it is the result of the DFS that
    /// starts at `n` and searches for `added_dep`).
    #[error(
        "set_deps({n:?}, ...): cycle would form via path {path:?} \
         (adding {added_dep:?} → {n:?} closes the loop)"
    )]
    WouldCreateCycle {
        n: NodeId,
        added_dep: NodeId,
        path: Vec<NodeId>,
    },

    /// A referenced node is not registered. Returned both when `n` itself
    /// is unknown and when any entry of `new_deps` is unknown; the payload
    /// is the offending id.
    #[error("set_deps: unknown node {0:?}")]
    UnknownNode(NodeId),

    /// `n` is not a compute node. `set_deps` returns this for state nodes
    /// and for producer nodes alike (neither has declared deps), even
    /// though the message text only mentions state nodes.
    #[error("set_deps: node {0:?} is not a compute node (state nodes have no deps)")]
    NotComputeNode(NodeId),

    /// `n` itself has terminated (COMPLETE / ERROR). Rewiring a terminal node
    /// is rejected — the stream has ended at this layer. To recover, mark
    /// the node resubscribable before terminate; a fresh subscribe will then
    /// reset its lifecycle.
    #[error("set_deps({n:?}, ...): node has already terminated; cannot rewire a terminal node")]
    TerminalNode { n: NodeId },

    /// A newly-added dep is terminal AND non-resubscribable. Per Phase 13.8
    /// Q1, this is rejected; resubscribable terminal deps are allowed
    /// because the subscribe path resets them when activated. Already-present
    /// terminal deps are unaffected (their terminal status was accepted at
    /// the time they terminated).
    #[error(
        "set_deps({n:?}, ...): added dep {dep:?} is terminal and not resubscribable; \
         either mark it resubscribable before terminate, or remove the dep from new_deps"
    )]
    TerminalDep { n: NodeId, dep: NodeId },

    /// `n` itself is currently mid-fire — a user fn for `n` re-entered Core
    /// via `set_deps(n, ...)` from inside `n`'s own `invoke_fn` /
    /// `project_each` / `predicate_each` / etc. Phase 1 of the dispatcher
    /// snapshotted `dep_handles` BEFORE the lock-released callback; the
    /// callback returning a `tracked` set indexed against THAT ordering
    /// would corrupt indices if the rewire re-orders deps mid-fire.
    /// Rejected to preserve the dynamic-tracked-indices invariant (D1).
    ///
    /// Workaround: schedule the rewire from a different node's fn (via
    /// `Core::emit` on a state node and observing the emit downstream),
    /// or perform the rewire after the wave completes (e.g. from a sink
    /// callback that is itself outside any fn-fire scope).
    ///
    /// Slice F (2026-05-07) — A6.
    #[error(
        "set_deps({n:?}, ...): rejected — node {n:?} is currently mid-fire \
         (set_deps from inside the firing node's own fn would corrupt the \
         Dynamic `tracked` indices snapshot taken before invoke_fn). \
         Schedule the rewire outside this fire scope."
    )]
    ReentrantOnFiringNode { n: NodeId },
    // /qa m1 (2026-05-19): the vestigial `PartitionMigrationDuringFire`
    // variant — kept across §7/D208–D211 + D253 (S5) as a downstream-
    // churn courtesy for `Err(_)` match arms — is now deleted. D253
    // removed the entire scheduling-group identity surface
    // (`SchedulingGroupId`, `node_group` index,
    // `set_scheduling_group`/`partition_of`/`group_of`), so the
    // partition-migration concept is gone too. No surviving caller
    // pattern-matches the variant (verified by grep; the only
    // reference was a TRASH/scheduling_groups.rs comment).
}
4965
4966// D246/S2c: `MigrationRollback` + `MovedComponent` deleted — there is
4967// no cross-shard extract→reinsert (single shard always under
4968// single-owner). D253 (S5) further deletes `SetGroupError` +
4969// `Core::set_scheduling_group` entirely — the declared-group identity
4970// surface (`SchedulingGroupId`, `node_group`, `partition_of`/`group_of`)
4971// is removed until M6 re-introduces it with M6's actual scheduling
4972// needs in view.
4973
4974impl Core {
4975 /// Atomic dep mutation — change a node's upstream deps without TEARDOWN
4976 /// cascading and without losing cache.
4977 ///
4978 /// Per the TLA+-verified design at
4979 /// `~/src/graphrefly-ts/docs/research/wave_protocol_rewire.tla`
4980 /// (35,950 distinct states, all 7 invariants clean):
4981 ///
4982 /// - Removed deps: clear dirtyMask bit, drain pending queue, drop DepRecord.
4983 /// - Added deps: SENTINEL prevData; push-on-subscribe if added dep has cached DATA.
4984 /// - Preserved: `firstRunPassed`, `pauseLocks`, `pauseBuffer`, `cache` (ROM/RAM).
4985 /// - Status auto-settles if dirtyMask becomes empty.
4986 /// - Idempotent on `new_deps == current deps`.
4987 /// - Self-rewire `n ∈ new_deps` rejected (`SelfDependency`).
4988 /// - Cycles rejected (`WouldCreateCycle`).
4989 /// - Allowed mid-wave + while paused.
4990 /// - Phase 13.8 Q1: terminal `n` rejected (`TerminalNode`); newly-added
4991 /// terminal non-resubscribable deps rejected (`TerminalDep`).
4992 ///
4993 /// The body is a single atomic dep-mutation transaction with several
4994 /// discrete validation stages. Splitting would require passing a
4995 /// partially-mutable CoreState across helpers, and the transaction's
4996 /// locality is what makes the F1 refcount-leak collection work.
4997 #[allow(clippy::too_many_lines)]
4998 pub fn set_deps(&self, n: NodeId, new_deps: &[NodeId]) -> Result<(), SetDepsError> {
4999 // 2b-ii-B (D220-EXEC): route to the node's shard; the new deps
5000 // are in the same component ⇒ same group ⇒ same shard (the
5001 // component-consistency check enforces). See `try_emit`.
5002 let mut s = self.lock_state();
5003 // Validate node exists and is compute. Read-once via the helper so
5004 // subsequent code can use `require_node(n)` without re-checking.
5005 let (is_state, is_producer, is_terminal) = {
5006 let rec = s.nodes.get(&n).ok_or(SetDepsError::UnknownNode(n))?;
5007 (rec.is_state(), rec.is_producer(), rec.terminal.is_some())
5008 };
5009 if is_state || is_producer {
5010 // State and Producer nodes have no declared deps — set_deps
5011 // is meaningless. Producer nodes manage their own subscriptions
5012 // through the binding's ProducerCtx; mutating their (empty)
5013 // dep set would not affect that.
5014 return Err(SetDepsError::NotComputeNode(n));
5015 }
5016 // Reject if `n` itself is terminal (Phase 13.8 Q1: terminal nodes
5017 // cannot be rewired; recovery is via resubscribable subscribe).
5018 if is_terminal {
5019 return Err(SetDepsError::TerminalNode { n });
5020 }
5021 // A6 reentrancy guard (Slice F, 2026-05-07): reject if `n` is
5022 // mid-fire. Closes the D1 hazard where `Phase 1` snapshotted
5023 // `dep_handles` against pre-rewire dep ordering and `Phase 3`
5024 // would store the returned `tracked` indices against post-rewire
5025 // ordering.
5026 //
5027 // D250 (S4, 2026-05-19): post-D248/D249 single-owner the only
5028 // historical *trigger* for this guard — a synchronous `set_deps`
5029 // re-entry from inside `n`'s own fn-fire via the binding-holds-
5030 // cloned-Core mechanism — is structurally deleted (`invoke_fn`
5031 // has no `&Core`; `set_deps` is not on `CoreFull`/`MailboxOp`; a
5032 // Defer runs owner-side AFTER the wave settles with
5033 // `currently_firing` empty). The deleted cross-thread P13 path
5034 // (Thread B's set_deps observing Thread A's lock-released
5035 // invoke_fn pushes) is likewise gone — Core is `!Send`, one
5036 // owner thread. The guard is therefore **defensive**: it stays
5037 // live so a future owner-side mid-wave self-rewire seam (or any
5038 // re-entry that lands an owner inside the firing set) is
5039 // rejected fail-loud rather than corrupting tracked indices. The
5040 // **Guard preserved for future owner-side mid-wave self-rewire
5041 // seam** (S4-horizon work). No test exercises this code path
5042 // today — the pre-D221 binding-holds-cloned-Core trigger was
5043 // deleted in D246 and the test stub was removed at S6
5044 // (2026-05-20). **Do NOT delete this guard as "dead code"**:
5045 // removing it weakens the invariant that protects
5046 // `dep_records` indices against a future seam that lands an
5047 // owner inside `currently_firing`. If `CoreFull`/`MailboxOp`
5048 // is ever widened with a `set_deps`-equivalent for a real
5049 // consumer, write a FRESH test against the new seam.
5050 // `currently_firing` lives on `CoreShared`, read under the
5051 // already-held state lock (under D246/D248 single-owner the
5052 // borrow is *structural* rather than concurrency-required —
5053 // `CoreShared` is owner-thread-only).
5054 if s.shared.currently_firing.contains(&n) {
5055 return Err(SetDepsError::ReentrantOnFiringNode { n });
5056 }
5057 // Self-rewire rejection.
5058 if new_deps.contains(&n) {
5059 return Err(SetDepsError::SelfDependency { n });
5060 }
5061 // Validate all new deps exist.
5062 for &d in new_deps {
5063 if !s.nodes.contains_key(&d) {
5064 return Err(SetDepsError::UnknownNode(d));
5065 }
5066 }
5067 // Cycle detection: data flows parent → child via the `children` map.
5068 // Adding edge `d → n` (d becomes a dep of n) creates a cycle iff
5069 // `d` is already reachable from `n` via existing data-flow edges
5070 // (so `n → ... → d` exists, and the new `d → n` closes the loop).
5071 // DFS from `n` along `children` edges, looking for each added dep.
5072 let current_deps: HashSet<NodeId> = s.require_node(n).dep_ids().collect();
5073 let new_deps_set: HashSet<NodeId> = new_deps.iter().copied().collect();
5074 let added: HashSet<NodeId> = new_deps_set.difference(¤t_deps).copied().collect();
5075 for &d in &added {
5076 if let Some(path) = self.path_from_to(&s, n, d) {
5077 return Err(SetDepsError::WouldCreateCycle {
5078 n,
5079 added_dep: d,
5080 path,
5081 });
5082 }
5083 }
5084 // Phase 13.8 Q1: reject newly-added deps that are terminal AND not
5085 // resubscribable. Resubscribable terminal deps are allowed — the
5086 // subscribe path resets their lifecycle when something activates
5087 // them. Already-present (kept) deps are unaffected; their terminal
5088 // status was accepted at the time they terminated.
5089 for &d in &added {
5090 let dep_rec = s.require_node(d);
5091 if dep_rec.terminal.is_some() && !dep_rec.resubscribable {
5092 return Err(SetDepsError::TerminalDep { n, dep: d });
5093 }
5094 }
5095 // Compute `removed` early (Phase F: needs to be available for P13
5096 // split-case widening below). Idempotent fast-path moved below the
5097 // P13 check accordingly.
5098 let removed: HashSet<NodeId> = current_deps.difference(&new_deps_set).copied().collect();
5099
5100 // Idempotent fast-path: no add/remove ⇒ no-op. D253 (S5) deletes
5101 // the prior §7 None/Some-mix endpoint check that lived above —
5102 // the declared-group identity surface is gone until M6.
5103 if added.is_empty() && removed.is_empty() {
5104 return Ok(());
5105 }
5106
5107 // Snapshot old deps (ordered) for topology event, before mutation.
5108 let old_deps_vec: Vec<NodeId> = s.require_node(n).dep_ids_vec();
5109
5110 // Carry out the rewire atomically.
5111 // 1. Build new dep_records, preserving DepRecord state for kept deps.
5112 let new_deps_vec: Vec<NodeId> = new_deps.to_vec();
5113 //
5114 // Refcount discipline (F1 audit fix): each `Some(TerminalKind::Error(h))`
5115 // slot owns a refcount share retained at `terminate_node` time. When a
5116 // dep is REMOVED, its slot is dropped — the corresponding handle's
5117 // share must be released here, otherwise it leaks until Core drop.
5118 // Also release data_batch retains for removed deps.
5119 let (new_dep_records, removed_handles): (Vec<DepRecord>, Vec<HandleId>) = {
5120 let rec = s.require_node(n);
5121 // Index old dep_records by NodeId for O(1) lookup of kept deps.
5122 let old_by_node: HashMap<NodeId, &DepRecord> =
5123 rec.dep_records.iter().map(|dr| (dr.node, dr)).collect();
5124 let new_records: Vec<DepRecord> = new_deps_vec
5125 .iter()
5126 .map(|&d| {
5127 if let Some(old) = old_by_node.get(&d) {
5128 // Kept dep: preserve all state (prev_data, data_batch,
5129 // terminal, wave flags). Subscriptions stay live.
5130 DepRecord {
5131 node: d,
5132 prev_data: old.prev_data,
5133 dirty: old.dirty,
5134 involved_this_wave: old.involved_this_wave,
5135 data_batch: old.data_batch.clone(),
5136 terminal: old.terminal,
5137 }
5138 } else {
5139 // Added dep: fresh sentinel record.
5140 DepRecord::new(d)
5141 }
5142 })
5143 .collect();
5144 // Collect handles to release from REMOVED dep records.
5145 let mut to_release: Vec<HandleId> = Vec::new();
5146 for d in &removed {
5147 if let Some(old) = old_by_node.get(d) {
5148 if let Some(TerminalKind::Error(h)) = old.terminal {
5149 to_release.push(h);
5150 }
5151 // Release data_batch retains for removed deps.
5152 for &h in &old.data_batch {
5153 to_release.push(h);
5154 }
5155 }
5156 }
5157 (new_records, to_release)
5158 };
5159 // Clear dirtyMask bit by re-emitting the wave-bookkeeping: we don't
5160 // currently model a per-dep dirtyMask explicitly (we use the boolean
5161 // `dirty` flag at node level). Removing a dep's entry from the implicit
5162 // mask is therefore implicit — by removing the dep, future emissions
5163 // from it can't re-arm the bit. The per-dep `involved_this_wave` flag
5164 // stays wave-scoped and gets cleared at wave end. The setDeps action
5165 // itself does NOT change the dirty boolean unless all deps are cleared;
5166 // in that case we settle.
5167 // Slice E2 (D067): on a dynamic node that had previously fired its
5168 // fn, capture `has_fired_once` BEFORE the reset so we can fire
5169 // `OnRerun` cleanup lock-released after `drop(s)` below. Without
5170 // this, the next `fire_regular` Phase 1 would capture
5171 // `has_fired_once = false`, causing Phase 1.5 to skip OnRerun —
5172 // silently dropping the prior activation's cleanup closure when
5173 // the next `invoke_fn` overwrites `current_cleanup`. Per spec
5174 // R2.4.5, `set_deps` does NOT end the activation cycle
5175 // (subscribe→unsubscribe is the cycle boundary), so OnRerun must
5176 // fire on every re-fire including post-set_deps.
5177 // §10 perf (D047): compute new topo_rank before the mutable
5178 // borrow on rec, since we need to read other nodes' depths.
5179 let new_topo_rank = if new_deps_vec.is_empty() {
5180 0
5181 } else {
5182 new_deps_vec
5183 .iter()
5184 .filter(|&&d| d != n)
5185 .filter_map(|&d| s.nodes.get(&d).map(|r| r.topo_rank))
5186 .max()
5187 .unwrap_or(0)
5188 .saturating_add(1)
5189 };
5190 let fire_set_deps_on_rerun;
5191 {
5192 let rec = s.require_node_mut(n);
5193 fire_set_deps_on_rerun = rec.is_dynamic && rec.has_fired_once;
5194 rec.dep_records = new_dep_records;
5195 rec.topo_rank = new_topo_rank;
5196 // §10.13 perf (D047): recompute received_mask from new dep_records.
5197 // §10.3 perf (Slice V1): recompute involved_mask alongside.
5198 rec.received_mask = 0;
5199 rec.involved_mask = 0;
5200 for (i, dr) in rec.dep_records.iter().enumerate() {
5201 if i < 64 {
5202 if dr.prev_data != NO_HANDLE || !dr.data_batch.is_empty() {
5203 rec.received_mask |= 1u64 << i;
5204 }
5205 if dr.involved_this_wave {
5206 rec.involved_mask |= 1u64 << i;
5207 }
5208 }
5209 }
5210 // Re-derive `tracked` for static derived: all indices.
5211 // For dynamic: clear `tracked` AND reset `has_fired_once` so the
5212 // next dep delivery satisfies the first-fire branch in
5213 // `deliver_data_to_consumer` (`!has_fired_once || tracked.contains(...)`).
5214 // Without resetting `has_fired_once`, the cleared `tracked` blocks
5215 // every future fire — fn never re-runs and the dynamic node sits
5216 // on stale cache derived from the old dep set. The next fire
5217 // re-runs fn unconditionally; fn's returned `tracked` then
5218 // repopulates `rec.tracked` and normal selective-deps semantics
5219 // resume from the next dep update onward.
5220 if rec.is_dynamic {
5221 rec.tracked.clear();
5222 rec.has_fired_once = false;
5223 } else {
5224 // Derived (static) and Operator track all deps.
5225 rec.tracked = (0..new_deps_vec.len()).collect();
5226 }
5227 }
5228
5229 // 2. Update inverted-edge map (children).
5230 for &removed_dep in &removed {
5231 if let Some(set) = s.children.get_mut(&removed_dep) {
5232 set.remove(&n);
5233 }
5234 }
5235 for &added_dep in &added {
5236 s.children.entry(added_dep).or_default().insert(n);
5237 }
5238
5239 // 3. Push-on-subscribe for added deps with cached DATA. Wraps in a
5240 // wave so any downstream propagation runs cleanly. We capture only
5241 // the LIST of added deps (not their cache values) because the cache
5242 // can change between releasing the validation lock and the wave's
5243 // re-acquisition — see the P2 race fix below.
5244 //
5245 // P2 (Slice A close /qa) — between `drop(s)` and `run_wave`'s
5246 // closure re-acquiring the lock, a concurrent thread could
5247 // invalidate one of the added deps, releasing its cache handle. A
5248 // pre-snapshot of `(added_dep, cache)` pairs would then carry a
5249 // dangling HandleId into `deliver_data_to_consumer`. The fix is to
5250 // re-read each added dep's `cache` INSIDE the closure (under the
5251 // freshly re-acquired state lock). The wave-owner re-entrant mutex
5252 // (Q2) blocks concurrent waves once we enter `run_wave`, so the
5253 // re-read sees a coherent post-validation state.
5254 let added_for_wave: Vec<NodeId> = added.iter().copied().collect();
5255 // §7 (D208–D211): the union-find union/split-eager block that
5256 // lived here is DELETED. scheduling groups are static +
5257 // user-declared and do NOT migrate with topology, so adding /
5258 // removing dep edges never recomputes any node's group. The
5259 // group-consistency invariant for the new adjacency was already
5260 // enforced above (the strict check that replaced the P13 block).
5261 // No registry mutation on the set_deps path.
5262 // B3 (D117, 2026-05-10): when set_deps clears ALL deps mid-wave AND
5263 // n has a tier-1 (DIRTY) message already queued this wave with no
5264 // settle yet, push n to `pending_auto_resolve` so the drain-end
5265 // sweep emits a paired Resolved. Without this, subscribers observe
5266 // an unpaired DIRTY, violating R1.3.1.b two-phase push pairing.
5267 //
5268 // Outside any active wave the per-thread `pending_notify` is empty
5269 // (cleared at wave-end), so the predicate short-circuits and the
5270 // insert is a no-op. Inside a wave, the `pending_auto_resolve`
5271 // sweep at `drain_and_flush` end re-checks pending_notify and
5272 // routes through `queue_notify` (which handles paused-children
5273 // pause-buffer placement automatically).
5274 if new_deps_vec.is_empty() {
5275 // F6 (/qa 2026-05-10): walk pending_notify in arrival order
5276 // counting unpaired DIRTYs. Tier 4 INVALIDATE is NOT a
5277 // settle for two-phase pairing — it clears cache but does
5278 // not pair with a DIRTY. Pairs are DIRTY ↔ DATA / RESOLVED
5279 // (tier 3 value-class) or DIRTY ↔ COMPLETE / ERROR (tier 5
5280 // terminal). Multi-emit waves like `[DIRTY, RESOLVED, DIRTY]`
5281 // leave one trailing unpaired DIRTY that needs auto-Resolved.
5282 crate::batch::with_wave_state(|ws| {
5283 let needs_auto_resolve = ws.pending_notify.get(&n).is_some_and(|entry| {
5284 let mut unpaired: i32 = 0;
5285 for m in entry.iter_messages() {
5286 match m {
5287 crate::message::Message::Dirty => unpaired += 1,
5288 crate::message::Message::Data(_)
5289 | crate::message::Message::Resolved
5290 | crate::message::Message::Complete
5291 | crate::message::Message::Error(_)
5292 if unpaired > 0 =>
5293 {
5294 unpaired -= 1;
5295 }
5296 // INVALIDATE / PAUSE / RESUME / TEARDOWN /
5297 // START — not settles for two-phase pairing.
5298 _ => {}
5299 }
5300 }
5301 unpaired > 0
5302 });
5303 if needs_auto_resolve {
5304 ws.pending_auto_resolve.insert(n);
5305 }
5306 });
5307 }
5308 // Drop the state lock before run_wave (which acquires its own) and
5309 // before crossing the binding boundary for the F1 refcount-fix
5310 // releases. Keeps the lock-discipline split (binding calls outside
5311 // the state lock) consistent with the rest of the dispatcher.
5312 drop(s);
5313 // Slice E2 (D067): fire OnRerun lock-released for dynamic nodes
5314 // that had previously fired. The cleanup closure cleans up
5315 // resources tied to the old dep shape before the next fn-fire
5316 // (triggered by added-dep push-on-subscribe below) registers a
5317 // fresh cleanup spec. Direct fire (NOT via deferred_cleanup_hooks)
5318 // because set_deps may NOT enter a wave (no added deps → no
5319 // run_wave below) — queueing the hook would orphan it until the
5320 // next unrelated wave drains.
5321 if fire_set_deps_on_rerun {
5322 self.binding.cleanup_for(n, CleanupTrigger::OnRerun);
5323 }
5324 // Fire topology event after lock is dropped.
5325 self.fire_topology_event(&crate::topology::TopologyEvent::DepsChanged {
5326 node: n,
5327 old_deps: old_deps_vec,
5328 new_deps: new_deps_vec.clone(),
5329 });
5330 if !added_for_wave.is_empty() {
5331 // Slice Y1 / Phase E: push-on-subscribe wave runs on `n`'s
5332 // touched partitions. Added deps are now unioned with `n`
5333 // (Phase C P12 fix moved registry mutation inside the state
5334 // lock), so any cascade through them stays in `n`'s partition
5335 // set as walked by `compute_touched_partitions`.
5336 self.run_wave_for(n, |this| {
5337 let mut s = this.lock_state();
5338 // Defensive: re-validate `n` still exists and isn't terminal.
5339 // A concurrent path could have terminated it between
5340 // validation and run_wave_for's partition-lock acquisition.
5341 if !s.nodes.contains_key(&n) || s.require_node(n).terminal.is_some() {
5342 return;
5343 }
5344 for added_dep in &added_for_wave {
5345 // Re-read cache under the wave-owner-held lock — this
5346 // is the post-validation, post-concurrent-action
5347 // snapshot. NO_HANDLE means the dep was invalidated
5348 // concurrently; skip (no data to push).
5349 let cache = match s.nodes.get(added_dep) {
5350 Some(rec) => rec.cache,
5351 None => continue, // dep deleted concurrently
5352 };
5353 if cache == NO_HANDLE {
5354 continue;
5355 }
5356 let dep_idx = s.require_node(n).dep_index_of(*added_dep);
5357 if let Some(idx) = dep_idx {
5358 this.deliver_data_to_consumer(&mut s, n, idx, cache);
5359 }
5360 }
5361 });
5362 }
5363 for h in removed_handles {
5364 self.binding.release_handle(h);
5365 }
5366 Ok(())
5367 }
5368
5369 /// DFS from `from` along data-flow edges (children map) looking for `to`.
5370 /// Returns the path including endpoints, or `None` if unreachable. Used
5371 /// for cycle detection in [`Self::set_deps`].
5372 fn path_from_to(&self, s: &CoreState, from: NodeId, to: NodeId) -> Option<Vec<NodeId>> {
5373 if from == to {
5374 return Some(vec![from]);
5375 }
5376 let mut stack: Vec<(NodeId, Vec<NodeId>)> = vec![(from, vec![from])];
5377 let mut visited: HashSet<NodeId> = HashSet::new();
5378 while let Some((cur, path)) = stack.pop() {
5379 if !visited.insert(cur) {
5380 continue;
5381 }
5382 if cur == to {
5383 return Some(path);
5384 }
5385 if let Some(children) = s.children.get(&cur) {
5386 for &child in children {
5387 let mut new_path = path.clone();
5388 new_path.push(child);
5389 stack.push((child, new_path));
5390 }
5391 }
5392 }
5393 None
5394 }
5395}
5396
5397// CoreState helpers — kept on the inner struct so they're naturally scoped
5398// to the lock guard.
5399impl CoreState {
5400 // D246/S2c: `empty_shard` deleted — no per-`ShardKey` shard
5401 // construction (single shard always under single-owner).
5402
5403 // `alloc_node_id` / `alloc_sub_id` moved to `impl St` (Step 2a,
5404 // D220-EXEC): their counters now live in the separate `CoreShared`
5405 // region, which only the combined `St` guard holds alongside the
5406 // shard. Call sites (`s.alloc_node_id()` / `s.alloc_sub_id()` where
5407 // `s = self.lock_state()`) are unchanged — inherent `St` methods
5408 // resolve before the `Deref`-to-`CoreState` ones.
5409
5410 /// Clear wave-scoped flags and rotate per-dep batch data on every
5411 /// node. Run at the end of every wave (regular drain via `run_wave`,
5412 /// activation drain via `activate_derived`, and `BatchGuard::drop`'s
5413 /// drain). Centralized so a future wave-state field can't be missed
5414 /// at one of the cleanup sites.
5415 ///
5416 /// Per-dep rotation (R2.9.b / R1.3.6.b):
5417 /// - `prev_data` ← last element of `data_batch` (or unchanged if empty).
5418 /// The last batch entry's retain transfers to `prev_data`; the old
5419 /// `prev_data`'s retain is released. All earlier batch entries are
5420 /// released.
5421 /// - `data_batch` cleared.
5422 /// - Per-dep `dirty` and `involved_this_wave` cleared.
5423 ///
5424 /// Handle releases are pushed to `deferred_handle_releases` for
5425 /// post-lock-drop release by the caller.
5426 pub(crate) fn clear_wave_state(&mut self, ws: &mut crate::batch::WaveState) {
5427 // Q-beyond Sub-slice 1 (D108, 2026-05-09): `pending_auto_resolve`
5428 // + `pending_pause_overflow` clears moved to
5429 // [`crate::batch::WaveState::clear_wave_state`]. The per-NodeRecord
5430 // rotation below pushes batch-handle and prev_data releases into
5431 // `ws.deferred_handle_releases` (was `cps.deferred_handle_releases`
5432 // pre-sub-slice-1, was `s.deferred_handle_releases` pre-Q2). Caller
5433 // borrows the WaveState thread_local; no lock-discipline rule
5434 // applies (state lock + thread_local borrow are independent).
5435 //
5436 // Q-beyond Sub-slice 3 (D108, 2026-05-09):
5437 // `invalidate_hooks_fired_this_wave` clear moved to
5438 // [`crate::batch::WaveState::clear_wave_state`]. The
5439 // `deferred_cleanup_hooks` invariant (NOT cleared here, drained
5440 // explicitly on success/panic paths) likewise moves with the
5441 // field.
5442 //
5443 // /qa F2 reverted (2026-05-10): `currently_firing` stays on
5444 // CoreState (per-Core, cross-thread visible — load-bearing for
5445 // P13). Defensive clear here mirrors the pre-sub-slice-3 safety
5446 // net (`FiringGuard`'s RAII push/pop is balanced even on panic;
5447 // a future code path that bypasses the guard would otherwise
5448 // leak a stale entry into the next wave).
5449 //
5450 // Step 2a (D220-EXEC): `currently_firing` now lives in the
5451 // separate `CoreShared` region (unreachable from `&mut
5452 // CoreState`). The defensive clear is performed by the two
5453 // `clear_wave_state` call sites in `batch.rs` via the combined
5454 // `St` guard's `.shared` field, immediately adjacent — same
5455 // wave-end point, same belt-and-suspenders intent.
5456 //
5457 // Slice G tier3 emit tracking lives on the per-thread
5458 // `WAVE_STATE`; cleared by the outermost `BatchGuard::drop`
5459 // (S4/D246: `WaveOwnerGuard` deleted — single-owner ⇒ one
5460 // uninterrupted owner-side drain, no per-partition release).
5461 for rec in self.nodes.values_mut() {
5462 rec.dirty = false;
5463 rec.involved_this_wave = false;
5464 // §10.3 perf (Slice V1): clear involved_mask in one op.
5465 rec.involved_mask = 0;
5466 for dr in &mut rec.dep_records {
5467 let batch_len = dr.data_batch.len();
5468 if batch_len > 0 {
5469 // Release all batch entries EXCEPT the last — the last
5470 // entry's retain transfers to prev_data.
5471 for &h in &dr.data_batch[..batch_len - 1] {
5472 ws.deferred_handle_releases.push(h);
5473 }
5474 // Release the OLD prev_data (its retain was from the
5475 // previous wave's rotation or from initial delivery).
5476 if dr.prev_data != NO_HANDLE {
5477 ws.deferred_handle_releases.push(dr.prev_data);
5478 }
5479 // Rotate: last batch entry becomes new prev_data.
5480 // Its retain carries over — no extra retain needed.
5481 dr.prev_data = dr.data_batch[batch_len - 1];
5482 dr.data_batch.clear();
5483 }
5484 dr.dirty = false;
5485 dr.involved_this_wave = false;
5486 }
5487 }
5488 }
5489
5490 pub(crate) fn require_node(&self, id: NodeId) -> &NodeRecord {
5491 self.nodes
5492 .get(&id)
5493 .unwrap_or_else(|| panic!("unknown node {id:?}"))
5494 }
5495
5496 pub(crate) fn require_node_mut(&mut self, id: NodeId) -> &mut NodeRecord {
5497 self.nodes
5498 .get_mut(&id)
5499 .unwrap_or_else(|| panic!("unknown node {id:?}"))
5500 }
5501}
5502
5503/// Release every binding-side refcount share owned by this `CoreState`
5504/// when the last `Core` clone drops the inner Mutex.
5505///
5506/// Without this, every retained handle in `cache` / `terminal` Error /
5507/// `dep_terminals` Error / pause-buffer-payload would leak in the binding
5508/// registry until process exit. Production bindings (napi-rs, pyo3,
5509/// wasm-bindgen) all maintain handle-ref maps that grow unbounded without
5510/// this cleanup.
5511///
5512/// Safe to call during panic unwinding — `BindingBoundary::release_handle`
5513/// is the only call, and a panicking binding during cleanup would already
5514/// have been a problem in normal operation.
5515impl Drop for CoreState {
5516 fn drop(&mut self) {
5517 // Q-beyond Sub-slice 3 (D108, 2026-05-09): `deferred_flush_jobs`
5518 // moved to [`crate::batch::WaveState`]. The `Vec<Sink>` clones
5519 // drop naturally with the per-thread WaveState's lifetime; no
5520 // CoreState-side cleanup needed.
5521 // Q-beyond Sub-slice 1 (D108, 2026-05-09): `deferred_handle_releases`
5522 // and `wave_cache_snapshots` moved to per-thread WaveState
5523 // thread_local. By outermost-BatchGuard-drop discipline both fields
5524 // are empty by the time CoreState drops (BatchGuard owns a Core
5525 // clone, so Core can't drop while a BatchGuard is in flight). Any
5526 // thread that ran a wave on this Core drained on its own outermost
5527 // BatchGuard; cross-Core thread_local sharing is fine because each
5528 // wave drains its own retains.
5529 //
5530 // Q-beyond Sub-slice 2 (D108, 2026-05-09): `pending_fires` and
5531 // `pending_notify` likewise moved to per-thread WaveState. The
5532 // pre-Sub-slice-2 `pending_notify` walk here (drain + release each
5533 // payload_handle) is no longer reachable from `Drop for CoreState`:
5534 // by invariant, no wave is in flight when CoreState drops (BatchGuard
5535 // holds a Core clone), so the originating thread's WaveState
5536 // pending_notify is empty by then. Other threads' WaveStates are
5537 // unreachable from CoreState::drop anyway — they're per-thread
5538 // thread_locals scoped to whichever thread ran the wave. The
5539 // outermost `BatchGuard::drop` is the canonical drain point on both
5540 // success and panic paths; Drop for CoreState relies on that
5541 // discipline holding rather than re-implementing it.
5542
5543 // Per-node retained handles:
5544 // - `cache` (1 retain per non-NO_HANDLE state cache or
5545 // populated compute cache).
5546 // - `terminal == Some(Error(h))` (1 retain on the terminal slot).
5547 // - `dep_terminals[i] == Some(Error(h))` (1 retain per consumer's
5548 // terminated-dep slot).
5549 // - `pause_state` paused buffer messages with payload handles
5550 // (1 retain per buffered Data/Error).
5551 for rec in self.nodes.values_mut() {
5552 if rec.cache != NO_HANDLE {
5553 self.binding.release_handle(rec.cache);
5554 }
5555 if let Some(TerminalKind::Error(h)) = rec.terminal {
5556 self.binding.release_handle(h);
5557 }
5558 for dr in &rec.dep_records {
5559 if let Some(TerminalKind::Error(h)) = dr.terminal {
5560 self.binding.release_handle(h);
5561 }
5562 // Release data_batch retains (in-flight wave data).
5563 for &h in &dr.data_batch {
5564 self.binding.release_handle(h);
5565 }
5566 // Release prev_data retain (cross-wave persistence).
5567 if dr.prev_data != NO_HANDLE {
5568 self.binding.release_handle(dr.prev_data);
5569 }
5570 }
5571 if let PauseState::Paused { buffer, .. } = &rec.pause_state {
5572 for msg in buffer {
5573 if let Some(h) = msg.payload_handle() {
5574 self.binding.release_handle(h);
5575 }
5576 }
5577 }
5578 // Slice E1: release replay-buffer retains.
5579 for &h in &rec.replay_buffer {
5580 self.binding.release_handle(h);
5581 }
5582 // Operator scratch (Slice C-3, D026): generic per-operator
5583 // state struct. Each variant's release_handles releases the
5584 // shares it owns (Scan/Reduce acc, Distinct/Pairwise prev,
5585 // Last latest + default; Take/Skip/TakeWhile own no handles).
5586 if let Some(scratch) = rec.op_scratch.as_mut() {
5587 scratch.release_handles(&*self.binding);
5588 }
5589 }
5590
5591 // D-α (D028) `pending_scratch_release` shutdown drain moved to
5592 // `impl Drop for CoreShared` (Step 2a, D220-EXEC): the queue +
5593 // its `binding` were hoisted into the separate `CoreShared`
5594 // region, so `Drop for CoreState` can no longer reach them.
5595 // `CoreShared` (one per Core, owned by the cell) drops with the
5596 // cell — its `Drop` performs the identical
5597 // release-before-Vec-drop discipline. The per-node retain walk
5598 // above stays here (per-shard `CoreState` owns its `nodes`).
5599
5600 // Q-beyond Sub-slice 1 + 2 (D108, 2026-05-09): WaveState's
5601 // retain-holding fields (`wave_cache_snapshots`,
5602 // `deferred_handle_releases`, `pending_notify`) are drained by
5603 // outermost BatchGuard::drop (success + panic paths). See
5604 // comment above for the invariant.
5605 }
5606}