graphrefly_core/node.rs
1//! The dispatcher — node registration, subscription, wave engine.
2//!
3//! Mirrors `~/src/graphrefly-ts/src/__experiments__/handle-core/core.ts`
4//! (the Phase 13.6 brainstorm prototype, ~370 lines, 22 invariant tests).
5//!
6//! # Scope (M1 dispatcher + Slice A+B parity, closed 2026-05-05)
7//!
8//! - State + derived + dynamic node registration.
9//! - Subscribe / unsubscribe with push-on-subscribe (R1.2.3).
10//! - RAII [`Subscription`] with Drop-based deregister (§10.12).
11//! - DIRTY → DATA / RESOLVED ordering (R1.3.1.b two-phase push).
12//! - Equals-substitution (R1.3.2): identity is zero-FFI; custom crosses boundary.
13//! - First-run gate (R2.5.3) — fn does not fire until every dep has a handle.
14//! - Diamond resolution — one fn fire per wave even with shared upstream.
15//! - `set_deps()` atomic dep mutation with cycle detection + Phase 13.8 Q1
16//! terminal-rejection policy (R3.3.1).
17//! - PAUSE / RESUME with lockId set + replay buffer (R1.2.6, R2.6, §10.2).
18//! - INVALIDATE broadcast + cascade with R1.4 idempotency.
19//! - COMPLETE / ERROR cascade + Lock 2.B auto-cascade gating
20//! (ERROR dominates COMPLETE; first error wins).
21//! - TEARDOWN auto-precedes COMPLETE (R2.6.4 / Lock 6.F) +
22//! `has_received_teardown` idempotency.
23//! - Meta TEARDOWN ordering (R1.3.9.d) — companions tear down before parent.
24//! - Resubscribable terminal lifecycle (R2.2.7, R2.5.3) — late subscribe to a
25//! resubscribable terminal node resets lifecycle, except after TEARDOWN
26//! (per F3 audit guard: TEARDOWN is permanent).
27//!
28//! # Module split (Slice C-1, 2026-05-05)
29//!
30//! Wave-engine internals (drain loop, fire selection, emission commit, sink
31//! dispatch) live in [`crate::batch`]. The split is purely organizational —
32//! the methods are still on `Core`. See `batch.rs` for the wave-engine
33//! entry points (`run_wave`, `drain_and_flush`, `commit_emission`,
34//! `queue_notify`, `deliver_data_to_consumer`).
35//!
36//! # Out of scope (later slices / milestones)
37//!
38//! - Deactivation cleanup (RAM nodes clear cache when sink count → 0) — M2.
39//!
40//! See [`migration-status.md`](../../../docs/migration-status.md) for the
41//! milestone tracker and [`porting-deferred.md`](../../../docs/porting-deferred.md)
42//! for surfaced concerns deferred to evidence-driven slices.
43//!
44//! # Re-entrance discipline (Slice A close, M1: fully lock-released)
45//!
46//! - **Wave-end sink fires** drop the state lock first. A subscriber's sink
47//! that calls back into `Core::emit` / `pause` / `resume` / `invalidate` /
48//! `complete` / `error` / `teardown` re-acquires the lock cleanly and runs
49//! a nested wave (`s.in_tick` is cleared before the deferred-fire phase).
50//! - **`BindingBoundary::invoke_fn`** fires lock-released. The wave engine
51//! acquires + drops the state lock per fn-fire iteration around the
52//! `invoke_fn` callback. User fns may re-enter `Core::emit` / `pause` /
53//! etc. and run a nested wave.
54//! - **`BindingBoundary::custom_equals`** fires lock-released.
55//! `commit_emission` brackets the equals check around a lock release;
56//! custom equals oracles may re-enter Core safely.
57//! - **Subscribe-time handshake** also fires lock-released. [`Core::subscribe`]
58//! acquires the [`Core::wave_owner`] re-entrant mutex first (cross-thread
59//! serialization), installs the sink under the state lock, drops the state
60//! lock, then fires the per-tier handshake (`[Start]` / `[Data(cache)]?` /
61//! `[Complete]?` / `[Error(h)]?` / `[Teardown]?` per R1.3.5.a) lock-released.
62//! A handshake-time sink callback may re-enter Core (`emit` / `complete` /
63//! `error` / `subscribe`); same-thread re-entry passes through `wave_owner`
64//! transparently. Cross-thread emits block on `wave_owner` until the
65//! subscribe path drops it, preserving R1.3.5.a happens-after ordering.
66
67use std::collections::VecDeque;
68use std::panic::{catch_unwind, AssertUnwindSafe};
69use std::sync::{Arc, Weak};
70
71use ahash::{AHashMap as HashMap, AHashSet as HashSet};
72use indexmap::IndexMap;
73use parking_lot::{Mutex, MutexGuard, ReentrantMutex};
74use smallvec::SmallVec;
75use thiserror::Error;
76
77use crate::batch::PendingPerNode;
78use crate::boundary::{BindingBoundary, CleanupTrigger};
79use crate::clock::monotonic_ns;
80use crate::handle::{FnId, HandleId, LockId, NodeId, NO_HANDLE};
81use crate::message::Message;
82
83/// Terminal-lifecycle state — once set on a node, the node will not emit
84/// further DATA; per-dep slots on consumers also use this to track which
85/// upstreams have terminated (R1.3.4 / Lock 2.B).
86///
87/// `Error` carries a [`HandleId`] resolving to the error value. Refcount is
88/// retained when the variant is stored in a node's `terminal` slot or any
89/// consumer's `dep_terminals` slot; v1 does not release these (terminal
90/// state is one-shot at this layer; release happens on resubscribable
91/// terminal-lifecycle reset, a separate slice).
92#[derive(Copy, Clone, Debug, PartialEq, Eq)]
93pub enum TerminalKind {
94 Complete,
95 Error(HandleId),
96}
97
98/// Node kind discriminant — **derived metadata** computed from
99/// [`NodeRecord`]'s field shape (D030 unification, Slice D).
100///
101/// Core no longer stores `kind` as a field; it's computed on demand from
102/// `(deps.is_empty(), fn_id.is_some(), op.is_some(), is_dynamic)`,
103/// mirroring TS's data model where `NodeImpl` has no `_kind` field. The
104/// shape uniquely identifies the kind:
105///
106/// | deps | fn_id | op | is_dynamic | kind |
107/// |-----------|-------|------|-----------|----------|
108/// | empty | None | None | - | State |
109/// | empty | Some | None | - | Producer |
110/// | non-empty | Some | None | false | Derived |
111/// | non-empty | Some | None | true | Dynamic |
112/// | non-empty | None | Some | - | Operator |
113///
114/// Public API ([`Core::kind_of`]) derives this enum on each call. State
115/// nodes are ROM (cache survives deactivation); compute nodes
116/// (Derived / Dynamic / Operator) and producers are RAM.
117#[derive(Copy, Clone, Eq, PartialEq, Debug)]
118pub enum NodeKind {
119 /// Source node: cache is intrinsic, no fn, no deps. Mutated via [`Core::emit`].
120 State,
121 /// Producer node: fn fires once on first subscribe. No deps;
122 /// emissions arrive via sinks the fn subscribes to (zip / concat /
123 /// race / takeUntil pattern). Slice D / D031.
124 Producer,
125 /// Derived node: fn fires on every dep change; all deps tracked.
126 Derived,
127 /// Dynamic node: fn declares which dep indices it actually read this run.
128 /// Untracked dep updates flow through cache but do NOT re-fire fn.
129 Dynamic,
130 /// Operator node: built-in dispatch path for transform / combine /
131 /// flow / resilience operators. The `OperatorOp` discriminant selects
132 /// the per-operator FFI path ([`BindingBoundary::project_each`] etc.);
133 /// Core manages per-operator state via the generic `op_scratch` slot
134 /// on `NodeRecord` (D026). Per Slice C-1 (D009) / Slice C-3 (D026).
135 Operator(OperatorOp),
136}
137
138impl NodeKind {
139 /// True if this kind opts OUT of Lock 2.B auto-cascade. Operator(Reduce)
140 /// and Operator(Last) must intercept upstream COMPLETE so they can emit
141 /// their accumulator / buffered value before the cascade terminates them;
142 /// instead of cascading, terminate_node queues such children for fn-fire
143 /// so `fire_operator` can handle the terminal.
144 pub(crate) fn skips_auto_cascade(self) -> bool {
145 matches!(
146 self,
147 NodeKind::Operator(OperatorOp::Reduce { .. } | OperatorOp::Last { .. })
148 )
149 }
150}
151
152/// Built-in operator discriminant. Selects the per-operator dispatch path
153/// in `fire_operator` (`crates/graphrefly-core/src/batch.rs`). Each variant
154/// carries the binding-side closure ids (and seed handle for stateful
155/// folders) needed for the wave-execution path; Core stores no user values
156/// itself per the handle-protocol cleaving plane.
157#[derive(Copy, Clone, Eq, PartialEq, Debug)]
158pub enum OperatorOp {
159 /// `map(source, project)` — element-wise transform. Calls
160 /// `BindingBoundary::project_each(fn_id, &inputs)` per fire; emits each
161 /// returned handle via `commit_emission_verbatim` (R1.3.2.d batch
162 /// semantics — no equals substitution between batch entries).
163 Map { fn_id: FnId },
164 /// `filter(source, predicate)` — silent-drop selection (D012/D018).
165 /// Calls `BindingBoundary::predicate_each(fn_id, &inputs)`; emits each
166 /// passing input verbatim. If zero pass on a wave that dirtied the
167 /// node, queues a single `RESOLVED` to settle (D018).
168 Filter { fn_id: FnId },
169 /// `scan(source, fold, seed)` — left-fold emitting each new accumulator.
170 /// `seed` is captured at registration; `acc` lives in
171 /// [`ScanState`](super::op_state::ScanState) inside
172 /// [`NodeRecord::op_scratch`] and persists across waves until
173 /// resubscribable reset. Calls `BindingBoundary::fold_each(fn_id, acc,
174 /// &inputs) -> SmallVec<HandleId>` per fire.
175 Scan { fn_id: FnId, seed: HandleId },
176 /// `reduce(source, fold, seed)` — left-fold emitting once on upstream
177 /// COMPLETE. Accumulates silently while source DATA flows; on
178 /// dep[0].terminal == Some(Complete), emits `[Data(acc), Complete]`.
179 /// On `Error(h)`, propagates the error verbatim. Opts out of Lock 2.B
180 /// auto-cascade (see `NodeKind::skips_auto_cascade`).
181 Reduce { fn_id: FnId, seed: HandleId },
182 /// `distinctUntilChanged(source, equals)` — suppresses adjacent
183 /// duplicates. Calls `BindingBoundary::custom_equals(equals_fn_id,
184 /// prev, current)` per input; emits non-equal items verbatim and
185 /// updates `prev`. If zero items pass on a wave that dirtied the node,
186 /// queues `RESOLVED` (matches Filter discipline).
187 DistinctUntilChanged { equals_fn_id: FnId },
188 /// `pairwise(source)` — emits `(prev, current)` pairs starting after
189 /// the second value. First value swallowed (sets `prev`). Calls
190 /// `BindingBoundary::pairwise_pack(fn_id, prev, current)` per pair to
191 /// produce the binding-side tuple handle.
192 Pairwise { fn_id: FnId },
193
194 // ----- Slice C-2: multi-dep combinators (D020) -----
195 /// `combine(...sources)` — N-dep combineLatest. On any dep fire, packs
196 /// the latest handle per dep into a single tuple handle via
197 /// `BindingBoundary::pack_tuple(pack_fn, &handles)`. First-run gate
198 /// (`partial: false` default) holds until all deps deliver real DATA
199 /// (R2.5.3). COMPLETE cascades when all deps complete (R1.3.4.b).
200 Combine { pack_fn: FnId },
201
202 /// `withLatestFrom(primary, secondary)` — 2-dep, fire-on-primary-only
203 /// (D021, Phase 10.5). Packs `[primary, secondary]` via
204 /// `BindingBoundary::pack_tuple(pack_fn, &handles)` when dep[0]
205 /// (primary) has DATA in the wave. If only dep[1] (secondary) fires,
206 /// settles with RESOLVED (D018 pattern). First-run gate holds until
207 /// both deps deliver (R2.5.3 `partial: false`). Post-warmup INVALIDATE
208 /// guard: if secondary `prev_data == NO_HANDLE` and batch empty after
209 /// warmup, settles with RESOLVED (no stale pair).
210 WithLatestFrom { pack_fn: FnId },
211
212 /// `merge(...sources)` — N-dep, forward all DATA handles verbatim
213 /// (D022). Zero FFI on fire: no transformation, no binding call.
214 /// Each dep's batch handles are retained and emitted individually.
215 /// COMPLETE cascades when all deps complete (R1.3.4.b).
216 Merge,
217
218 // ----- Slice C-3: flow operators (D024) -----
219 /// `take(source, count)` — emits the first `count` DATA values then
220 /// self-completes via `Core::complete`. Tracks `count_emitted` in
221 /// [`TakeState`](super::op_state::TakeState). When upstream completes
222 /// before `count` is reached, the standard auto-cascade propagates
223 /// COMPLETE. `count == 0` is allowed: the first fire emits zero
224 /// items then immediately self-completes (D027).
225 Take { count: u32 },
226
227 /// `skip(source, count)` — drops the first `count` DATA values; once
228 /// the threshold is crossed, subsequent DATAs pass through verbatim.
229 /// Tracks `count_skipped` in [`SkipState`](super::op_state::SkipState).
230 /// On a wave where every input is still in the skip window, queues
231 /// DIRTY+RESOLVED to settle (D018 pattern).
232 Skip { count: u32 },
233
234 /// `takeWhile(source, predicate)` — emits while `predicate(input)`
235 /// holds; on the first `false`, emits any preceding passes then
236 /// self-completes via `Core::complete`. Reuses
237 /// [`BindingBoundary::predicate_each`] (D029); after the first
238 /// `false`, subsequent inputs in the same batch are dropped.
239 TakeWhile { fn_id: FnId },
240
241 /// `last(source)` / `last_with_default(source, default)` — buffers
242 /// the latest DATA; on upstream COMPLETE, emits `Data(latest)` then
243 /// `Complete`. The `default` field is `NO_HANDLE` for the no-default
244 /// factory (emits only `Complete` on empty stream), or a registered
245 /// default handle (emits `Data(default)` + `Complete` on empty
246 /// stream). Storage: [`LastState`](super::op_state::LastState) holds
247 /// `latest` (live buffer) and `default` (registration-time, stable).
248 /// Opts out of Lock 2.B auto-cascade so it can intercept upstream
249 /// COMPLETE.
250 Last { default: HandleId },
251}
252
253/// Registration options for [`Core::register_operator`].
254///
255/// `equals` controls operator output dedup (R5.7 — defaults to identity).
256/// `partial` controls the R2.5.3 first-run gate (R5.4 — operator dispatch
257/// fires on first DATA from any dep when `true`; default `false` matches
258/// the gated derived discipline).
259#[derive(Copy, Clone, Debug)]
260pub struct OperatorOpts {
261 pub equals: EqualsMode,
262 pub partial: bool,
263}
264
265impl Default for OperatorOpts {
266 fn default() -> Self {
267 Self {
268 equals: EqualsMode::Identity,
269 partial: false,
270 }
271 }
272}
273
274/// Closure-form fn id OR typed operator discriminant — the two dispatch
275/// paths a node can use. State / passthrough nodes pass `None` to
276/// [`Core::register`] (no fn at all).
277#[derive(Copy, Clone, Debug)]
278pub enum NodeFnOrOp {
279 /// Closure-form: invokes [`BindingBoundary::invoke_fn`] per fire.
280 /// Used for Derived / Dynamic / Producer.
281 Fn(FnId),
282 /// Typed-op: routes to a `fire_op_*` helper that calls per-operator
283 /// FFI methods (`project_each` / `predicate_each` / `fold_each` /
284 /// `pairwise_pack` / `pack_tuple`). Used for Operator nodes.
285 Op(OperatorOp),
286}
287
288/// Pause behavior mode (canonical-spec §2.6 — three modes shipped in TS;
289/// Slice F audit, 2026-05-07 — closed the Rust port gap).
290///
291/// | Mode | Outgoing tier-3 routing while paused | RESUME behavior |
292/// |---|---|---|
293/// | [`PausableMode::Default`] | suppress fn-fire upstream (no DIRTY emitted) | fire fn ONCE on RESUME if any dep delivered DATA during pause; collapses N pause-window writes into one settle |
294/// | [`PausableMode::ResumeAll`] | buffer outgoing tier-3 / tier-4 messages per-wave | replay each buffered wave verbatim on RESUME |
295/// | [`PausableMode::Off`] | dispatcher ignores PAUSE; tier-3 flushes immediately | no-op (no buffer to drain) |
296///
297/// Default is [`PausableMode::Default`] per canonical §2.6 — every untagged
298/// source picks it up. Memory profile is O(1) per node (no buffer); the
299/// trade-off is "subscribers see one consolidated DATA on RESUME" rather
300/// than the K mid-pause emissions verbatim.
301///
302/// Note: tier-1 (DIRTY) / tier-2 (PAUSE/RESUME) / tier-5 (COMPLETE/ERROR) /
303/// tier-6 (TEARDOWN) bypass pause regardless of mode — they remain
304/// observable so leaked pause-controllers cannot strand subscribers.
305#[derive(Copy, Clone, Debug, Default, PartialEq, Eq)]
306pub enum PausableMode {
307 /// Suppress fn-fire while paused; fire once on RESUME if any dep
308 /// delivered DATA during the pause window. Canonical default.
309 #[default]
310 Default,
311 /// Buffer outgoing tier-3 / tier-4 messages per-wave; replay on
312 /// RESUME. Use when subscribers need verbatim emit history (e.g. an
313 /// audit log, replay-on-reconnect bridge).
314 ResumeAll,
315 /// Dispatcher ignores PAUSE for this node — tier-3 flushes
316 /// immediately even while a lock is held. Use for nodes whose value
317 /// production is intrinsically pause-immune (telemetry counters,
318 /// monotonic timers).
319 Off,
320}
321
322/// Per-kind opts for [`Core::register`]. Cross-kind config knobs live
323/// here; per-kind specifics (deps, fn_or_op) live on
324/// [`NodeRegistration`].
325#[derive(Copy, Clone, Debug)]
326pub struct NodeOpts {
327 /// Initial cached value. Only valid for state nodes (no deps + no
328 /// fn + no op). [`NO_HANDLE`] starts the node sentinel.
329 pub initial: HandleId,
330 /// Equality mode for outgoing emissions (R1.3.2). Defaults to
331 /// [`EqualsMode::Identity`].
332 pub equals: EqualsMode,
333 /// First-run gate (R2.5.3 / D011). When `true`, the node fires as
334 /// soon as ANY dep delivers a real handle; when `false` (default),
335 /// the node holds until every dep has delivered.
336 pub partial: bool,
337 /// Dynamic flag (R2.5.3) — fn declares actually-tracked dep indices
338 /// per fire. Only meaningful when `fn_or_op == Some(Fn(_))` AND
339 /// deps non-empty.
340 pub is_dynamic: bool,
341 /// Pause behavior mode (canonical §2.6). Default is
342 /// [`PausableMode::Default`]. See [`PausableMode`] for the trade-offs.
343 pub pausable: PausableMode,
344 /// Replay buffer cap (canonical R2.6.5 / Lock 6.G — Slice E1, 2026-05-07).
345 /// `None` (default) disables; `Some(N)` keeps a circular buffer of the
346 /// last N DATA emissions and replays them to late subscribers as part
347 /// of the per-tier handshake (between [`Message::Start`] and any
348 /// terminal slice). Only DATA is buffered; RESOLVED entries are NOT
349 /// (R2.6.5 explicit "DATA only").
350 pub replay_buffer: Option<usize>,
351}
352
353impl Default for NodeOpts {
354 fn default() -> Self {
355 Self {
356 initial: NO_HANDLE,
357 equals: EqualsMode::Identity,
358 partial: false,
359 is_dynamic: false,
360 pausable: PausableMode::Default,
361 replay_buffer: None,
362 }
363 }
364}
365
366/// Unified node-registration descriptor (D030, Slice D).
367///
368/// All node kinds (State / Producer / Derived / Dynamic / Operator)
369/// register through [`Core::register`] with a `NodeRegistration`. The
370/// kind is **derived from the field shape** of the registration —
371/// `(deps.is_empty(), fn_or_op variant)`:
372///
373/// | deps | fn_or_op | is_dynamic | resulting kind |
374/// |-----------|-----------|-----------|----------------|
375/// | empty | None | - | State |
376/// | empty | Some(Fn) | - | Producer |
377/// | non-empty | Some(Fn) | false | Derived |
378/// | non-empty | Some(Fn) | true | Dynamic |
379/// | non-empty | Some(Op) | - | Operator |
380///
381/// The sugar wrappers ([`Core::register_state`], [`Core::register_producer`],
382/// etc.) build a `NodeRegistration` and delegate.
383#[derive(Clone, Debug)]
384pub struct NodeRegistration {
385 /// Upstream deps in declaration order. Empty for state / producer.
386 pub deps: Vec<NodeId>,
387 /// Closure-form fn id or typed-op discriminant. `None` for state /
388 /// passthrough.
389 pub fn_or_op: Option<NodeFnOrOp>,
390 /// Cross-kind config knobs.
391 pub opts: NodeOpts,
392}
393
394/// Equality mode for a node's outgoing emissions.
395///
396/// `Identity` is the default: cache vs. new handle compare is a `u64` equal —
397/// zero FFI. `Custom` invokes [`BindingBoundary::custom_equals`] every check
398/// (R1.3.2.b two-arg call when both sides are non-sentinel).
399#[derive(Copy, Clone, Debug)]
400pub enum EqualsMode {
401 Identity,
402 Custom(FnId),
403}
404
405/// Internal identifier for a single subscription. Allocated per
406/// [`Core::subscribe`] call. Wrapped by [`Subscription`] for the public API;
407/// consumed directly only by Core internals and the [`Subscription::Drop`]
408/// path.
409#[derive(Copy, Clone, Eq, PartialEq, Hash, Debug)]
410pub(crate) struct SubscriptionId(u64);
411
412/// RAII subscription handle.
413///
414/// Returned by [`Core::subscribe`]. While the handle is held, the sink stays
415/// registered against its node. Dropping the handle (explicitly via
416/// `drop(sub)` or implicitly at scope exit) unsubscribes the sink — no manual
417/// `unsubscribe()` call is needed. Per §10.12 of the rust-port session doc.
418///
419/// # Lifetime semantics
420///
421/// The subscription holds a [`Weak`] reference back to the Core's state. If
422/// the Core is dropped before the subscription, the Drop impl is a silent
423/// no-op (the sink has nowhere to deregister from anyway). This avoids a
424/// reference cycle when subscribers capture an `Arc<Core>` in their closure.
425///
426/// # Thread safety
427///
428/// `Send + Sync`. The handle can be moved across threads or dropped from
429/// any thread.
430///
431/// # Not Clone
432///
433/// `Subscription` owns the unsubscribe action exclusively. Cloning would
434/// require either "first drop wins" or "last drop wins" semantics, both
435/// of which surprise. If a binding needs multiple deregistration handles,
436/// it should subscribe multiple times (each producing a fresh handle) or
437/// wrap the single `Subscription` in `Arc<Mutex<Option<Subscription>>>`.
438#[must_use = "dropping a Subscription unsubscribes its sink immediately"]
439pub struct Subscription {
440 state: Weak<Mutex<CoreState>>,
441 node_id: NodeId,
442 sub_id: SubscriptionId,
443}
444
445impl Subscription {
446 /// The node this subscription is attached to.
447 #[must_use]
448 pub fn node_id(&self) -> NodeId {
449 self.node_id
450 }
451}
452
453impl Drop for Subscription {
454 fn drop(&mut self) {
455 // Silent no-op if Core is gone. This keeps Drop infallible (no panics
456 // from a dropped subscription racing a dropped Core) and avoids
457 // surprising users with errors on shutdown.
458 //
459 // Producer deactivation (Slice D, D031): if removing this sub
460 // empties the subscribers map AND the node is a producer, fire
461 // `BindingBoundary::producer_deactivate(node_id)` AFTER releasing
462 // the state lock. The binding then drops its per-node state
463 // (subscriptions to upstream sources, captured closure state),
464 // which transitively unsubs from upstreams via their own
465 // `Subscription::Drop`. Re-entrance into Core from the deactivate
466 // hook is permitted since the lock is released first.
467 let Some(state) = self.state.upgrade() else {
468 return;
469 };
470 // Slice E2 (D056): when the last subscriber drops, fire the
471 // node's OnDeactivation cleanup hook BEFORE producer_deactivate
472 // (cleanup may release handles the producer subscription owns;
473 // reverse order would let producer_deactivate drop subs that user
474 // cleanup expected to be live). Both calls are lock-released per
475 // D045.
476 //
477 // OnDeactivation gating (D068, QA Q3 fix): fires only when the
478 // node has fired its fn at least once AND has a fn (`fn_id`
479 // populated). State nodes have no fn — they cannot register a
480 // cleanup spec via the production fn-return path (R2.4.5), so
481 // firing `cleanup_for` on them is wasted FFI; the binding's
482 // lookup is guaranteed to find no `current_cleanup`. Skipping
483 // here saves the FFI hop and matches the design-doc wording
484 // ("never-fired state nodes" — state-with-initial-value satisfies
485 // `has_fired_once = true` but still has no fn).
486 //
487 // Slice E2 /qa Q2(b) (D069): if the node is a resubscribable
488 // node that's ALREADY terminal (terminate fired BEFORE this last
489 // sub drop), fire `wipe_ctx` lock-released AFTER OnDeactivation
490 // + producer_deactivate. Mutually exclusive with `terminate_node`'s
491 // queue-wipe site: terminate-with-empty-subs goes through
492 // `pending_wipes`; terminate-with-live-subs routes here when
493 // those subs eventually drop. Either path fires exactly one
494 // wipe per terminal lifecycle.
495 let (was_last_sub, is_producer, has_user_cleanup, fire_wipe, binding) = {
496 let mut s = state.lock();
497 let Some(rec) = s.nodes.get_mut(&self.node_id) else {
498 return;
499 };
500 rec.subscribers.remove(&self.sub_id);
501 // Slice X4 / D2: bump revision so any pending_notify entry for
502 // this node opened earlier in the wave starts a fresh batch on
503 // the next queue_notify, dropping the now-departed sink from
504 // the snapshot.
505 rec.subscribers_revision = rec.subscribers_revision.wrapping_add(1);
506 let last = rec.subscribers.is_empty();
507 let producer = rec.is_producer();
508 // OnDeactivation gate: must have run a fn at least once
509 // (has_fired_once) AND have a fn registered (fn_id.is_some()).
510 // The fn_id check excludes state nodes whose has_fired_once
511 // tracks initial-value status, not "user fn ran."
512 let user_cleanup = rec.has_fired_once && rec.fn_id.is_some();
513 let fire_wipe = last && rec.resubscribable && rec.terminal.is_some();
514 // Clone the binding Arc out only if at least one hook will
515 // fire. Cheap (Arc::clone) in the common path; skipped on
516 // non-last-sub or never-fired non-producer nodes.
517 let binding = if last && (producer || user_cleanup || fire_wipe) {
518 Some(s.binding.clone())
519 } else {
520 None
521 };
522 (last, producer, user_cleanup, fire_wipe, binding)
523 };
524 if was_last_sub {
525 if let Some(binding) = binding {
526 if has_user_cleanup {
527 binding.cleanup_for(self.node_id, CleanupTrigger::OnDeactivation);
528 }
529 if is_producer {
530 binding.producer_deactivate(self.node_id);
531 }
532 // D069: eager wipe — fires AFTER OnDeactivation so the
533 // user closure observes pre-wipe `store` (matches the
534 // existing "OnDeactivation runs before wipe on terminal
535 // reset" invariant covered by test 10). Idempotent —
536 // `HashMap::remove` on absent key is a no-op, so even
537 // if the wave already drained `pending_wipes` earlier,
538 // this fire is benign.
539 if fire_wipe {
540 binding.wipe_ctx(self.node_id);
541 }
542 }
543 }
544 }
545}
546
547// Compile-time assertion that Subscription is Send + Sync. If a future field
548// breaks this, the build fails here rather than downstream at the binding
549// site.
550const _: fn() = || {
551 fn assert_send_sync<T: Send + Sync>() {}
552 assert_send_sync::<Subscription>();
553};
554
555/// A subscriber callback. `Send + Sync` so the Core can fire it from any
556/// thread; `Fn` (not `FnMut`) so multiple references coexist — capture
557/// mutable state in `Mutex<T>` or atomics on the binding side.
558pub type Sink = Arc<dyn Fn(&[Message]) + Send + Sync>;
559
560// ---------------------------------------------------------------------------
561// PAUSE/RESUME state — §10.2 of the rust-port session doc
562// ---------------------------------------------------------------------------
563
564/// Per-node pause state.
565///
566/// Replaces the four TS fields (`_pauseLocks`, `_pauseBuffer`,
567/// `_pauseDroppedCount`, `_pauseStartNs`) with a single enum where
568/// the buffered fields are unreachable in the [`Self::Active`] variant —
569/// the compiler refuses access. Per §10.2 simplification.
570///
571/// # Invariants
572///
573/// - `Active` ⇔ no lockId held.
574/// - `Paused { locks, .. }` ⇔ `!locks.is_empty()`.
575/// - Buffered messages are tier 3 (DATA/RESOLVED) and tier 4 (INVALIDATE)
576/// only. Other tiers pass through immediately even while paused.
577/// - `dropped` counts messages that fell out the front of `buffer` due to
578/// the Core-global `pause_buffer_cap`; it is reported on resume so callers
579/// can detect overflow without re-tracking it externally.
580#[derive(Debug)]
581pub(crate) enum PauseState {
582 Active,
583 Paused {
584 /// Active lock holders. `SmallVec` keeps the common 1–2 lock case
585 /// stack-allocated. Replaces `Set<unknown>` from TS.
586 locks: SmallVec<[LockId; 2]>,
587 /// Buffered tier-3/tier-4 outgoing messages, in arrival order.
588 /// Replayed on the final RESUME.
589 buffer: VecDeque<Message>,
590 /// Count of messages dropped from the front when `buffer.len()` would
591 /// exceed `pause_buffer_cap`. Cleared on final RESUME (next pause
592 /// cycle starts fresh).
593 dropped: u32,
594 /// Wall-clock-monotonic ns when the lock first transitioned this node
595 /// from `Active` to `Paused`. Used by R1.3.8.c overflow ERROR
596 /// synthesis to compute `lock_held_duration_ms` in the diagnostic
597 /// payload (Slice F, A3 — 2026-05-07).
598 started_at_ns: u64,
599 /// True after the first overflow event in this pause cycle has been
600 /// reported via [`crate::boundary::BindingBoundary::synthesize_pause_overflow_error`].
601 /// Subsequent overflows in the same cycle don't re-emit ERROR
602 /// (canonical R1.3.8.c: "once per overflow event"). Cleared on
603 /// final RESUME (next pause cycle starts fresh).
604 overflow_reported: bool,
605 /// Default-mode bookkeeping (Slice F audit close, 2026-05-07).
606 /// Set to `true` when an upstream dep delivery arrives while this
607 /// node is paused with [`PausableMode::Default`]. On final RESUME,
608 /// if `true`, the node is added back to `pending_fires` so the fn
609 /// fires once with the consolidated dep state. Always `false` for
610 /// `ResumeAll` mode (the buffered messages are the consolidation
611 /// mechanism there). Cleared on final RESUME.
612 pending_wave: bool,
613 },
614}
615
616impl PauseState {
617 pub(crate) fn is_paused(&self) -> bool {
618 matches!(self, Self::Paused { .. })
619 }
620
621 fn lock_count(&self) -> usize {
622 match self {
623 Self::Active => 0,
624 Self::Paused { locks, .. } => locks.len(),
625 }
626 }
627
628 fn contains_lock(&self, lock_id: LockId) -> bool {
629 match self {
630 Self::Active => false,
631 Self::Paused { locks, .. } => locks.contains(&lock_id),
632 }
633 }
634
635 /// Add a lock; transitions Active → Paused on first lock. Idempotent on
636 /// duplicate lock_id (matches TS convention; spec is silent on the case).
637 fn add_lock(&mut self, lock_id: LockId) {
638 match self {
639 Self::Active => {
640 let mut locks = SmallVec::new();
641 locks.push(lock_id);
642 *self = Self::Paused {
643 locks,
644 buffer: VecDeque::new(),
645 dropped: 0,
646 started_at_ns: monotonic_ns(),
647 overflow_reported: false,
648 pending_wave: false,
649 };
650 }
651 Self::Paused { locks, .. } => {
652 if !locks.contains(&lock_id) {
653 locks.push(lock_id);
654 }
655 }
656 }
657 }
658
659 /// Mark that an upstream dep delivered DATA to a node paused with
660 /// [`PausableMode::Default`]. The node will re-enter `pending_fires`
661 /// on final RESUME via [`Self::take_pending_wave`].
662 pub(crate) fn mark_pending_wave(&mut self) {
663 if let Self::Paused { pending_wave, .. } = self {
664 *pending_wave = true;
665 }
666 }
667
668 /// Read and clear the `pending_wave` flag. Called from
669 /// [`Core::resume`] when transitioning Paused → Active. Returns `true`
670 /// only if the node was paused with `pending_wave` set.
671 pub(crate) fn take_pending_wave(&mut self) -> bool {
672 if let Self::Paused { pending_wave, .. } = self {
673 std::mem::replace(pending_wave, false)
674 } else {
675 false
676 }
677 }
678
679 /// Remove a lock; if the lockset becomes empty, transition Paused →
680 /// Active and return the buffered messages for replay (along with the
681 /// dropped count for diagnostics). Unknown lock_id is an idempotent
682 /// no-op (matches TS, R1.2.6 implicit).
683 fn remove_lock(&mut self, lock_id: LockId) -> Option<(VecDeque<Message>, u32)> {
684 match self {
685 Self::Active => None,
686 Self::Paused { locks, .. } => {
687 if let Some(idx) = locks.iter().position(|l| *l == lock_id) {
688 locks.swap_remove(idx);
689 }
690 if locks.is_empty() {
691 let prev = std::mem::replace(self, Self::Active);
692 if let Self::Paused {
693 buffer, dropped, ..
694 } = prev
695 {
696 return Some((buffer, dropped));
697 }
698 }
699 None
700 }
701 }
702 }
703
704 /// Append a message to the buffer; if the buffer would exceed `cap`,
705 /// pop from the front (oldest-first), increment `dropped`, and return
706 /// the dropped messages so the caller can release any payload handles
707 /// they reference. `cap` of `None` means unbounded.
708 ///
709 /// Returns [`PushBufferedResult`] carrying both the dropped messages
710 /// (for refcount release) and whether this push triggered the FIRST
711 /// overflow event in the current pause cycle (for R1.3.8.c ERROR
712 /// synthesis — the caller schedules a single ERROR per cycle).
713 ///
714 /// Note: refcount management for the message's payload handle is the
715 /// caller's responsibility — see [`Core::queue_notify`] for the
716 /// retain/release discipline. The buffer itself is just a message
717 /// container; refcounts cross the binding boundary.
718 pub(crate) fn push_buffered(&mut self, msg: Message, cap: Option<usize>) -> PushBufferedResult {
719 let mut result = PushBufferedResult::default();
720 if let Self::Paused {
721 buffer,
722 dropped,
723 overflow_reported,
724 ..
725 } = self
726 {
727 buffer.push_back(msg);
728 if let Some(c) = cap {
729 while buffer.len() > c {
730 if let Some(dropped_msg) = buffer.pop_front() {
731 result.dropped_msgs.push(dropped_msg);
732 }
733 *dropped = dropped.saturating_add(1);
734 }
735 }
736 // R1.3.8.c (Slice F, A3): flag first overflow this cycle.
737 if !result.dropped_msgs.is_empty() && !*overflow_reported {
738 *overflow_reported = true;
739 result.first_overflow_this_cycle = true;
740 }
741 }
742 result
743 }
744
745 /// Snapshot the diagnostic for an R1.3.8.c overflow ERROR synthesis.
746 /// Returns `(dropped_count, lock_held_ns)`. Caller must already know
747 /// the configured cap (it's a Core-global value, not per-PauseState).
748 pub(crate) fn overflow_diagnostic(&self) -> Option<(u32, u64)> {
749 match self {
750 Self::Active => None,
751 Self::Paused {
752 dropped,
753 started_at_ns,
754 ..
755 } => {
756 let lock_held_ns = monotonic_ns().saturating_sub(*started_at_ns);
757 Some((*dropped, lock_held_ns))
758 }
759 }
760 }
761}
762
763/// Return shape for [`PauseState::push_buffered`]. Carries both the dropped
764/// messages (for refcount release) and an "is this the first overflow this
765/// cycle" flag (for R1.3.8.c ERROR synthesis scheduling).
766#[derive(Default)]
767pub(crate) struct PushBufferedResult {
768 pub(crate) dropped_msgs: Vec<Message>,
769 pub(crate) first_overflow_this_cycle: bool,
770}
771
772/// Pending R1.3.8.c overflow ERROR synthesis entry. Recorded by
773/// [`Core::queue_notify`] when the pause buffer first overflows in a cycle;
774/// drained at wave-end after the lock-released call to
775/// [`crate::boundary::BindingBoundary::synthesize_pause_overflow_error`].
776///
777/// `configured_max` is captured at scheduling time rather than read at
778/// drain — the user could change `pause_buffer_cap` between schedule and
779/// drain, and the diagnostic reads "the cap that was in effect when the
780/// overflow happened."
781#[derive(Debug, Clone)]
782pub(crate) struct PendingPauseOverflow {
783 pub(crate) node_id: NodeId,
784 pub(crate) dropped_count: u32,
785 pub(crate) configured_max: usize,
786 pub(crate) lock_held_ns: u64,
787}
788
789/// Errors returnable by [`Core::pause`] and [`Core::resume`].
790#[derive(Error, Debug, Clone, PartialEq)]
791pub enum PauseError {
792 #[error("pause/resume: unknown node {0:?}")]
793 UnknownNode(NodeId),
794}
795
796/// Errors returnable by [`Core::up`] (canonical R1.4.1).
797#[derive(Error, Debug, Clone, PartialEq)]
798pub enum UpError {
799 /// Node id is not registered.
800 #[error("up: unknown node {0:?}")]
801 UnknownNode(NodeId),
802 /// Tier-3 (DATA / RESOLVED) and tier-5 (COMPLETE / ERROR) are
803 /// downstream-only per R1.4.1; rejected at the boundary.
804 #[error(
805 "up: tier {tier} is forbidden upstream — value (tier 3) and \
806 terminal-lifecycle (tier 5) planes are downstream-only per R1.4.1"
807 )]
808 TierForbidden { tier: u8 },
809}
810
811/// Errors returnable by [`Core::register`] and its sugar wrappers
812/// ([`Core::register_state`], [`Core::register_producer`],
813/// [`Core::register_derived`], [`Core::register_dynamic`],
814/// [`Core::register_operator`]).
815///
816/// Slice H (2026-05-07) promoted these from `assert!`/`panic!` to typed
817/// errors so that callers can recover from contract violations without
818/// process abort. Every variant corresponds to a construction-time
819/// invariant that the caller is responsible for upholding; the dispatcher
820/// rejects the registration before any reactive state is created (so
821/// there is no `Message::Error` channel through which to surface the
822/// failure — these are imperative-layer errors, not reactive ones).
823///
824/// All variants are zero-side-effect: when [`Core::register`] returns
825/// `Err`, no node has been added to the graph and any handle retains
826/// taken on the way in (e.g. operator scratch seed retains via
827/// [`BindingBoundary::retain_handle`]) have been released.
828#[derive(Error, Debug, Clone, PartialEq, Eq)]
829pub enum RegisterError {
830 /// One of the supplied dep ids is not a registered node.
831 #[error("register: unknown dep {0:?}")]
832 UnknownDep(NodeId),
833
834 /// `op` was supplied (operator node) but `deps` was empty. Operator
835 /// nodes need at least one dep — for subscription-managed combinators
836 /// with no declared deps, use [`Core::register_producer`] instead.
837 #[error(
838 "register: operator nodes require at least one dep — \
839 use register_producer for subscription-managed combinators"
840 )]
841 OperatorWithoutDeps,
842
843 /// [`NodeOpts::initial`] was set to a real handle but the registration
844 /// shape is not a state node (state nodes are `deps.is_empty() &&
845 /// fn_id.is_none() && op.is_none()`). Initial cache only makes sense
846 /// for state nodes.
847 #[error("register: NodeOpts::initial only valid for state nodes (no deps + no fn + no op)")]
848 InitialOnlyForStateNodes,
849
850 /// A supplied dep is terminal (COMPLETE / ERROR) AND not
851 /// resubscribable. Adding it would create a permanent wedge — the dep
852 /// will never re-emit, so the registered node would be stuck.
853 /// Mirrors [`SetDepsError::TerminalDep`] at registration time.
854 #[error(
855 "register: dep {0:?} is terminal and not resubscribable; \
856 mark it resubscribable before terminating, or remove it from the dep list"
857 )]
858 TerminalDep(NodeId),
859
860 /// A stateful operator ([`OperatorOp::Scan`] / [`OperatorOp::Reduce`])
861 /// was registered with `seed = NO_HANDLE`. R2.5.3 first-run gate
862 /// requires the seed to be a real handle so that the operator can
863 /// emit on its first fire.
864 #[error("register: operator seed must be a real handle (R2.5.3); got NO_HANDLE")]
865 OperatorSeedSentinel,
866}
867
868/// Errors returnable by [`Core::set_pausable_mode`].
869///
870/// Slice H (2026-05-07) promoted these from `assert!`/`panic!` to typed
871/// errors. Same imperative-layer error model as [`RegisterError`].
872#[derive(Error, Debug, Clone, PartialEq, Eq)]
873pub enum SetPausableModeError {
874 /// `node_id` is not a registered node.
875 #[error("set_pausable_mode: unknown node {0:?}")]
876 UnknownNode(NodeId),
877 /// The node currently holds at least one pause lock. Changing pausable
878 /// mode mid-pause would lose buffered content or strand a
879 /// `pending_wave` flag — resume all locks first.
880 #[error(
881 "set_pausable_mode: cannot change pausable mode while paused; \
882 resume all locks first"
883 )]
884 WhilePaused,
885}
886
887/// Per-dep record. Replaces the parallel `deps` / `dep_handles` /
888/// `dep_terminals` vectors from v1. Canonical spec R2.9.b alignment.
889///
890/// Each entry tracks one dep's lifecycle state, wave-scoped batch data,
891/// and cross-wave `prev_data` for `ctx.prevData` access.
892pub(crate) struct DepRecord {
893 /// The dep node this record tracks.
894 pub(crate) node: NodeId,
895 /// Last DATA handle from the end of the previous wave. [`NO_HANDLE`]
896 /// means the dep has never emitted DATA.
897 pub(crate) prev_data: HandleId,
898 /// Per-dep dirty flag — awaiting DATA/RESOLVED for current wave.
899 pub(crate) dirty: bool,
900 /// Per-dep involved-this-wave flag. Distinguishes:
901 /// - `involved && data_batch.is_empty()` → dep settled RESOLVED
902 /// - `!involved && data_batch.is_empty()` → dep was not in this wave
903 pub(crate) involved_this_wave: bool,
904 /// DATA handles accumulated this wave. Outside `batch()` scope, at most
905 /// 1 element. Inside `batch()`, K emits on the source produce K entries
906 /// per R1.3.6.b coalescing. Each handle holds a `retain_handle` share
907 /// taken at `deliver_data_to_consumer` time; released at wave-end
908 /// rotation in `clear_wave_state`.
909 pub(crate) data_batch: SmallVec<[HandleId; 1]>,
910 /// Terminal state for this dep. `None` = dep is live.
911 /// `Some` = dep emitted COMPLETE/ERROR. When ALL entries are Some,
912 /// the node auto-cascades per Lock 2.B (ERROR dominates COMPLETE).
913 pub(crate) terminal: Option<TerminalKind>,
914}
915
916impl DepRecord {
917 fn new(node: NodeId) -> Self {
918 Self {
919 node,
920 prev_data: NO_HANDLE,
921 dirty: false,
922 involved_this_wave: false,
923 data_batch: SmallVec::new(),
924 terminal: None,
925 }
926 }
927}
928
929/// Internal node record. Mirrors `core.ts:132–154` post-D030 unification.
930///
931/// **Kind is derived, not stored** (D030, Slice D). `(dep_records.is_empty(),
932/// fn_id, op, is_dynamic)` uniquely identifies the kind — see [`NodeKind`].
933/// Helper methods (`is_state()`, `is_producer()`, `is_compute()`,
934/// `is_operator()`, `skips_auto_cascade()`, `kind()`) cover the common
935/// predicates without unpacking via [`Core::kind_of`].
936///
937/// The 5 bool fields (`has_fired_once`, `dirty`, `involved_this_wave`,
938/// `has_received_teardown`, `resubscribable`, `is_dynamic`) each represent
939/// an orthogonal concern. `is_dynamic` is constant per node (set at
940/// register time); the others are mutable lifecycle state. Collapsing
941/// them into a bitfield would obscure intent.
942#[allow(clippy::struct_excessive_bools)]
943pub(crate) struct NodeRecord {
944 /// Per-dep records. Replaces the old parallel `deps` / `dep_handles` /
945 /// `dep_terminals` vecs. Dep NodeIds derived via `dep_ids()`.
946 pub(crate) dep_records: Vec<DepRecord>,
947 /// User-fn id for closure-form dispatch. `Some` for Derived / Dynamic /
948 /// Producer; `None` for State / Operator. (Operator dispatch goes via
949 /// [`Self::op`] instead.)
950 pub(crate) fn_id: Option<FnId>,
951 /// Operator discriminant for typed-op dispatch. `Some` for Operator
952 /// nodes; `None` otherwise. Mutually exclusive with `fn_id` (a node is
953 /// either closure-form OR typed-op, never both).
954 pub(crate) op: Option<OperatorOp>,
955 /// True for Dynamic nodes (R2.5.3 — fn declares actually-tracked dep
956 /// indices per fire). False for everything else. Only meaningful when
957 /// `fn_id.is_some()` AND `!dep_records.is_empty()`.
958 pub(crate) is_dynamic: bool,
959 pub(crate) equals: EqualsMode,
960
961 // Mutable state
962 pub(crate) cache: HandleId,
963 pub(crate) has_fired_once: bool,
964 pub(crate) subscribers: HashMap<SubscriptionId, Sink>,
965 /// Monotonic counter bumped on every mutation of [`Self::subscribers`]
966 /// (insert on subscribe, remove on `Subscription::Drop`, remove on
967 /// handshake-panic cleanup). Used by
968 /// [`crate::batch::Core::queue_notify`] to detect mid-wave subscriber-
969 /// set changes and start a fresh `PendingBatch` with an updated sink
970 /// snapshot — closes D2 (Slice X4, 2026-05-08): the late-subscriber
971 /// and multi-emit-per-wave gap where the pre-fix per-node single
972 /// snapshot meant a sub installed between two emits to the same node
973 /// in one wave was invisible to the second emit's flush.
974 ///
975 /// Per-node (not per-Core) so that a subscribe to node A doesn't
976 /// invalidate snapshot reuse for node B's pending batch in the same
977 /// wave.
978 pub(crate) subscribers_revision: u64,
979 /// For dynamic nodes: which dep indices fn actually tracks.
980 /// For static derived: all indices, populated at construction.
981 pub(crate) tracked: HashSet<usize>,
982
983 // Wave-scoped state — cleared at wave end.
984 pub(crate) dirty: bool,
985 pub(crate) involved_this_wave: bool,
986
987 /// Per-node pause state. Default `Active`. See [`PauseState`].
988 pub(crate) pause_state: PauseState,
989 /// Pause behavior mode (canonical-spec §2.6). Set at registration via
990 /// [`NodeOpts::pausable`]. Default [`PausableMode::Default`] suppresses
991 /// fn-fire while paused and consolidates N pause-window dep deliveries
992 /// into one fn-fire on RESUME; `ResumeAll` buffers tier-3/4 outgoing
993 /// for verbatim replay; `Off` ignores PAUSE entirely. See
994 /// [`PausableMode`].
995 pub(crate) pausable: PausableMode,
996 /// Replay buffer cap (R2.6.5 / Lock 6.G — Slice E1, 2026-05-07).
997 /// `None` disables; `Some(N)` keeps a circular VecDeque of the last N
998 /// DATA-handle emissions for late-subscriber replay. Each handle in
999 /// the buffer owns one binding-side retain share, released on evict
1000 /// (cap exceeded) or in `Drop for CoreState`.
1001 pub(crate) replay_buffer_cap: Option<usize>,
1002 pub(crate) replay_buffer: VecDeque<HandleId>,
1003
1004 /// Terminal lifecycle state for THIS node's outgoing stream. Once set,
1005 /// further `emit` calls are silent no-ops, fn no longer fires, and only
1006 /// the terminal message has been queued downstream.
1007 pub(crate) terminal: Option<TerminalKind>,
1008 /// True after the first TEARDOWN has been processed for this node
1009 /// (R2.6.4 / Lock 6.F). Subsequent TEARDOWN deliveries are idempotent
1010 /// — the auto-prepended COMPLETE only fires on the first one. Without
1011 /// this flag, a redundant TEARDOWN delivered via the cascade plus an
1012 /// explicit `core.teardown(node)` would re-emit `[COMPLETE, TEARDOWN]`
1013 /// to subscribers per delivery, which is incorrect.
1014 pub(crate) has_received_teardown: bool,
1015 /// Per R2.2.7 / R2.5.3 — resubscribable terminal lifecycle.
1016 /// When `true` AND `terminal == Some(...)`, a fresh subscribe call
1017 /// will reset the node: clear `terminal`, `has_fired_once`,
1018 /// `has_received_teardown`, all dep_records to sentinel, and drain the
1019 /// pause lockset. Default `false`.
1020 pub(crate) resubscribable: bool,
1021 /// Meta companion nodes attached to this node per R1.3.9.d. When this
1022 /// node tears down, its meta companions are torn down FIRST (before
1023 /// the main node's auto-COMPLETE + TEARDOWN wire emission), so
1024 /// observers see companions terminate before the parent. The ordering
1025 /// is load-bearing — meta nodes typically subscribe to parent state
1026 /// that becomes inconsistent during the parent's destruction phase.
1027 pub(crate) meta_companions: Vec<NodeId>,
1028 /// R5.4 / D011 partial-mode: when `true`, fire_fn skips the R2.5.3
1029 /// first-run gate — the node fires as soon as ANY dep delivers a
1030 /// real handle, even if other deps remain sentinel. Defaults to
1031 /// `false` (gated). Lifted into Core for operator support; for
1032 /// State/Derived/Dynamic nodes the field is settable but the gated
1033 /// path remains the typical caller default.
1034 pub(crate) partial: bool,
1035 /// Generic per-operator scratch slot (Slice C-3, D026). Replaces
1036 /// the typed `operator_state: HandleId` field used by Slices C-1 / C-2.
1037 /// `None` for non-operator kinds and operators with no cross-wave
1038 /// state (Map / Filter / Combine / WithLatestFrom / Merge); `Some`
1039 /// for stateful operators ([`OperatorOp::Scan`] / [`Reduce`] /
1040 /// [`DistinctUntilChanged`] / [`Pairwise`] / [`Take`] / [`Skip`] /
1041 /// [`TakeWhile`] / [`Last`]).
1042 ///
1043 /// The boxed value implements
1044 /// [`OperatorScratch`](crate::op_state::OperatorScratch); its
1045 /// `release_handles` method is called from
1046 /// [`reset_for_fresh_lifecycle`] (resubscribable terminal cycle) and
1047 /// from [`Drop for CoreState`].
1048 ///
1049 /// **Refcount discipline:** the state struct owns whatever handle
1050 /// shares it stores (e.g., [`ScanState::acc`](crate::op_state::ScanState::acc),
1051 /// [`LastState::latest`](crate::op_state::LastState::latest)).
1052 /// Per-fire helpers retain the new value before releasing the old;
1053 /// `release_handles` releases the current shares at end-of-life.
1054 pub(crate) op_scratch: Option<Box<dyn crate::op_state::OperatorScratch>>,
1055}
1056
1057impl NodeRecord {
1058 // ---- Kind predicates (D030 — derived from field shape) ----
1059
1060 /// True iff this is a state node (no deps, no fn, no op).
1061 pub(crate) fn is_state(&self) -> bool {
1062 self.dep_records.is_empty() && self.fn_id.is_none() && self.op.is_none()
1063 }
1064
1065 /// True iff this is a producer node (no deps + has fn + no op).
1066 /// Producers fire fn once on first subscribe; cleanup fires via
1067 /// [`BindingBoundary::producer_deactivate`] (D031, Slice D).
1068 pub(crate) fn is_producer(&self) -> bool {
1069 self.dep_records.is_empty() && self.fn_id.is_some() && self.op.is_none()
1070 }
1071
1072 /// True iff this is a compute node (Derived / Dynamic / Operator) —
1073 /// has at least one dep AND either a fn or an op.
1074 #[allow(dead_code)] // Convenience predicate; callers may use is_state/is_producer instead.
1075 pub(crate) fn is_compute(&self) -> bool {
1076 !self.dep_records.is_empty() && (self.fn_id.is_some() || self.op.is_some())
1077 }
1078
1079 /// True iff this is an Operator node (has op set).
1080 #[allow(dead_code)] // Direct `op.is_some()` is more common; this is a readability sugar.
1081 pub(crate) fn is_operator(&self) -> bool {
1082 self.op.is_some()
1083 }
1084
1085 /// True iff this node opts OUT of Lock 2.B auto-cascade —
1086 /// Operator(Reduce) / Operator(Last) intercept upstream COMPLETE.
1087 pub(crate) fn skips_auto_cascade(&self) -> bool {
1088 match self.op {
1089 Some(op) => NodeKind::Operator(op).skips_auto_cascade(),
1090 None => false,
1091 }
1092 }
1093
1094 /// Compute the public-API [`NodeKind`] from the field shape (D030).
1095 /// Used by [`Core::kind_of`] and rare internal sites that need the
1096 /// enum (most use the predicate methods above).
1097 pub(crate) fn kind(&self) -> NodeKind {
1098 if let Some(op) = self.op {
1099 NodeKind::Operator(op)
1100 } else if self.dep_records.is_empty() {
1101 if self.fn_id.is_some() {
1102 NodeKind::Producer
1103 } else {
1104 NodeKind::State
1105 }
1106 } else if self.is_dynamic {
1107 NodeKind::Dynamic
1108 } else {
1109 NodeKind::Derived
1110 }
1111 }
1112
1113 // ---- Existing accessors ----
1114
1115 /// Iterator over dep NodeIds in declaration order.
1116 pub(crate) fn dep_ids(&self) -> impl Iterator<Item = NodeId> + '_ {
1117 self.dep_records.iter().map(|r| r.node)
1118 }
1119
1120 /// Collected dep NodeIds — for call sites that need a `Vec<NodeId>`.
1121 pub(crate) fn dep_ids_vec(&self) -> Vec<NodeId> {
1122 self.dep_ids().collect()
1123 }
1124
1125 /// Number of deps.
1126 pub(crate) fn dep_count(&self) -> usize {
1127 self.dep_records.len()
1128 }
1129
1130 /// True if any dep is in sentinel state (never emitted DATA and no
1131 /// data this wave). Replaces the old `dep_handles.contains(&NO_HANDLE)`.
1132 pub(crate) fn has_sentinel_deps(&self) -> bool {
1133 self.dep_records
1134 .iter()
1135 .any(|r| r.prev_data == NO_HANDLE && r.data_batch.is_empty())
1136 }
1137
1138 /// Find the index of a dep by NodeId.
1139 pub(crate) fn dep_index_of(&self, dep_id: NodeId) -> Option<usize> {
1140 self.dep_records.iter().position(|r| r.node == dep_id)
1141 }
1142
1143 /// True if ALL dep terminal slots are populated (Lock 2.B cascade check).
1144 pub(crate) fn all_deps_terminal(&self) -> bool {
1145 !self.dep_records.is_empty() && self.dep_records.iter().all(|r| r.terminal.is_some())
1146 }
1147}
1148
1149/// All mutable Core state, behind one [`parking_lot::Mutex`].
1150///
1151/// v1 single-mutex; per-subgraph `ReentrantMutex` parallelism is a later
1152/// optimization (CLAUDE.md Rust invariant 3).
1153pub(crate) struct CoreState {
1154 pub(crate) next_node_id: u64,
1155 pub(crate) next_subscription_id: u64,
1156 pub(crate) next_lock_id: u64,
1157 pub(crate) nodes: HashMap<NodeId, NodeRecord>,
1158 /// Inverted adjacency: `parent → children`. Updated on registration.
1159 pub(crate) children: HashMap<NodeId, HashSet<NodeId>>,
1160 /// Nodes whose fn we owe a fire to — drained by [`Core::run_wave`].
1161 pub(crate) pending_fires: HashSet<NodeId>,
1162 /// Per-node outgoing message buffer; flushed at wave end. Insertion-
1163 /// ordered so flush order is deterministic — load-bearing for
1164 /// R1.3.9.d meta-TEARDOWN ordering: when a parent and its meta
1165 /// companion both have queued messages in the same wave, the meta
1166 /// (queued first via `teardown_inner`'s recursion order) flushes
1167 /// first.
1168 ///
1169 /// Each entry carries the per-wave subscriber snapshot taken at first
1170 /// touch (Slice A close, M1: lock-released drain). Late subscribers
1171 /// installed mid-wave between fn-fire iterations don't appear in
1172 /// already-snapshotted entries; this is the load-bearing fix that
1173 /// prevents duplicate-Data delivery when a handshake delivers the
1174 /// post-commit cache and the wave's flush would otherwise also fire
1175 /// to the same sink.
1176 pub(crate) pending_notify: IndexMap<NodeId, PendingPerNode>,
1177 pub(crate) in_tick: bool,
1178 /// Core-global cap on per-node pause replay buffer length. `None` means
1179 /// unbounded. Per the user direction (Q1, 2026-05-05): start core-global;
1180 /// per-node override can be added later as a pure addition without API
1181 /// breakage. Default `None`.
1182 pub(crate) pause_buffer_cap: Option<usize>,
1183 /// Core-global cap on wave-drain iterations before
1184 /// [`crate::batch::Core::drain_and_flush`] aborts with a diagnostic panic.
1185 /// Replaces the prior `MAX_DRAIN_ITERATIONS` hard-coded constant
1186 /// (R4.3 / Lock 2.F′). Default `10_000`.
1187 ///
1188 /// The drain loop bound exists to surface runtime cycles
1189 /// (e.g. an operator that re-arms its own `pending_fires` slot during
1190 /// `invoke_fn`) as a panic with context, rather than letting Core
1191 /// spin forever. Structural cycles via [`Core::set_deps`] are
1192 /// rejected at edge-mutation time (`SetDepsError::WouldCreateCycle`);
1193 /// registration is structurally cycle-safe by construction (the new
1194 /// node's id is not allocated until AFTER deps are validated, so deps
1195 /// cannot transitively reach the new node). The drain bound is the
1196 /// safety net for runtime cycles that bypass both static checks.
1197 pub(crate) max_batch_drain_iterations: u32,
1198 /// Deferred sink-fire jobs collected by `flush_notifications`. The wave
1199 /// engine populates this under the state lock during the flush phase;
1200 /// `run_wave` then drops the lock and fires the jobs. Each tuple is
1201 /// `(sinks_for_one_node_one_phase, phase_messages)`. Empty between waves.
1202 pub(crate) deferred_flush_jobs: crate::batch::DeferredJobs,
1203 /// Payload-handle releases owed for messages that landed in
1204 /// `pending_notify` during this wave (one per `payload_handle()`).
1205 /// `run_wave` releases these after sinks fire and the lock is dropped,
1206 /// balancing the retain done in `queue_notify`.
1207 pub(crate) deferred_handle_releases: Vec<HandleId>,
1208 /// Binding-boundary handle for `Drop`-time refcount balancing.
1209 /// `Core` also holds a clone of this Arc; storing it here lets
1210 /// `Drop for CoreState` walk every retained slot and release the
1211 /// binding-side share when the last `Core` clone drops. Without this,
1212 /// `cache` / `terminal` / `dep_terminals` Error / pause-buffer payload
1213 /// handle refs leak in the binding registry until process exit.
1214 pub(crate) binding: Arc<dyn BindingBoundary>,
1215 /// Pre-wave cache snapshots used to restore state if the wave aborts
1216 /// mid-flight (e.g., a `Core::batch` closure panics). Each entry is
1217 /// `(node_id → old_cache_handle)` — the handle the node held BEFORE
1218 /// the wave started writing to it. The snapshotted handle holds a
1219 /// retain (taken when the snapshot was inserted) so it stays alive
1220 /// for restoration. On wave success, snapshots are dropped and their
1221 /// retains released. On wave abort (`BatchGuard::drop` panic-discard
1222 /// path), each cache slot is restored from the snapshot — the slot's
1223 /// current handle is released, and the snapshot's retain transfers
1224 /// to the cache slot. Only populated for in-flight waves; empty
1225 /// between waves.
1226 pub(crate) wave_cache_snapshots: HashMap<NodeId, HandleId>,
1227 /// Nodes that need an auto-Resolved at wave end if they don't receive
1228 /// a tier-3+ message from their own commit_emission. Populated by
1229 /// the RESOLVED child propagation in `commit_emission` (which queues
1230 /// Dirty but defers Resolved to avoid double-settlement). Drained by
1231 /// the auto-resolve sweep in `drain_and_flush`. Cleared by
1232 /// `clear_wave_state`.
1233 pub(crate) pending_auto_resolve: ahash::AHashSet<NodeId>,
1234 /// Topology-change sinks. Keyed by subscription id for O(1) removal.
1235 pub(crate) topology_sinks: HashMap<u64, crate::topology::TopologySink>,
1236 pub(crate) next_topology_id: u64,
1237 /// A6 reentrancy guard (Slice F, 2026-05-07): the stack of NodeIds whose
1238 /// fn is currently being invoked on the wave-owner thread. Pushed at the
1239 /// top of `fire_fn` (just before the lock-released `invoke_fn` call) and
1240 /// popped on return / unwind via the [`crate::batch::FiringGuard`] RAII
1241 /// helper. [`Core::set_deps`] consults this set and rejects with
1242 /// [`SetDepsError::ReentrantOnFiringNode`] if `n` is currently firing —
1243 /// preventing the D1 `tracked` index corruption (see
1244 /// `porting-deferred.md` "Set_deps from inside firing node's fn corrupts
1245 /// Dynamic `tracked` indices").
1246 ///
1247 /// Stack rather than set so nested fn re-entrance (Producer subscribing
1248 /// to a fn that itself fires another fn) tracks every concurrently-firing
1249 /// node on the wave-owner. `Vec` rather than `HashSet` because the
1250 /// expected depth is small (typically 1, occasionally 2–3 with
1251 /// higher-order operators) and linear scan is faster than hash for that
1252 /// size.
1253 pub(crate) currently_firing: Vec<NodeId>,
1254 /// R1.3.8.c pause-overflow ERROR synthesis queue (Slice F, A3 —
1255 /// 2026-05-07). Recorded by [`Core::queue_notify`] when the pause
1256 /// buffer first overflows in a cycle; drained at wave-end after the
1257 /// lock-released call to
1258 /// [`crate::boundary::BindingBoundary::synthesize_pause_overflow_error`].
1259 ///
1260 /// One entry per (node × pause-cycle); subsequent overflows in the
1261 /// same cycle don't re-queue (gated by `PauseState::overflow_reported`).
1262 pub(crate) pending_pause_overflow: Vec<PendingPauseOverflow>,
1263 /// Slice G (R1.3.2.d / R1.3.3.a — 2026-05-07): nodes that have emitted
1264 /// at least one tier-3 message (Data or Resolved) in the CURRENT wave.
1265 /// Wave-scoped (cleared in `clear_wave_state`). Used by
1266 /// [`crate::batch::Core::commit_emission`] to detect "this is a
1267 /// subsequent emit at this node in the same wave" — when set,
1268 /// equals substitution is skipped (would produce a R1.3.3.a-violating
1269 /// mixed wave) and any prior Resolved entries in pending_notify or
1270 /// the pause buffer are rewritten to Data using the wave-start cache
1271 /// snapshot.
1272 ///
1273 /// Distinct from `pending_pause_overflow` (per-pause-cycle, not
1274 /// per-wave) and `wave_cache_snapshots` (per-wave snapshot, but only
1275 /// populated on Data path pre-Slice-G). Populated by both Data and
1276 /// Resolved branches of `commit_emission`; NOT populated by
1277 /// `commit_emission_verbatim` (Batch path passes through verbatim
1278 /// per R1.3.3.c).
1279 pub(crate) tier3_emitted_this_wave: ahash::AHashSet<NodeId>,
1280 /// Slice E2 (R1.3.9.b strict reading per D057): per-wave-per-node
1281 /// dedup for `OnInvalidate` cleanup hook firing. A node already in
1282 /// this set this wave has already had its `OnInvalidate` queued into
1283 /// `deferred_cleanup_hooks` and MUST NOT queue again, even if
1284 /// `invalidate_inner` re-encounters it (rare: only matters when the
1285 /// node re-populates mid-wave via fn-fire and then gets re-invalidated
1286 /// in the same wave through a separate path).
1287 ///
1288 /// Cleared in [`CoreState::clear_wave_state`] alongside the other
1289 /// wave-scoped queues.
1290 pub(crate) invalidate_hooks_fired_this_wave: ahash::AHashSet<NodeId>,
1291 /// Slice E2 (per D060/D061): lock-released drain queue for
1292 /// `OnInvalidate` cleanup hooks. Populated under the state lock by
1293 /// `Core::invalidate_inner` when a node's cache transitions
1294 /// `!= NO_HANDLE → NO_HANDLE`; drained after the lock drops at wave
1295 /// boundary by [`crate::batch::Core::fire_deferred_cleanup_hooks`]
1296 /// (each call wrapped in `catch_unwind` so a single binding panic
1297 /// doesn't short-circuit the drain — last panic re-raises after the
1298 /// loop completes per D060).
1299 ///
1300 /// **Panic-discard semantics (D061):** cleared in
1301 /// [`CoreState::clear_wave_state`] without firing — a panic-discarded
1302 /// wave drops the queued cleanup hooks silently, mirroring the
1303 /// `pending_pause_overflow` precedent (Slice F /qa A3). Bindings using
1304 /// `OnInvalidate` for external-resource cleanup MUST idempotent-cleanup
1305 /// at process exit or next successful invalidate cycle.
1306 pub(crate) deferred_cleanup_hooks: Vec<(NodeId, crate::boundary::CleanupTrigger)>,
1307 /// Slice E2 /qa Q2(b) (D069): lock-released drain queue for
1308 /// `BindingBoundary::wipe_ctx` calls fired eagerly from
1309 /// `Core::terminate_node` when a resubscribable node terminates with
1310 /// no live subscribers. Pairs with the `Subscription::Drop` direct-
1311 /// fire site (mutually exclusive: subs-empty-at-terminate routes
1312 /// here; subs-non-empty-at-terminate fires from Subscription::Drop's
1313 /// last-sub-drop path). Drained alongside `deferred_cleanup_hooks`
1314 /// at wave boundary; same `catch_unwind` discipline so a single
1315 /// binding panic doesn't short-circuit the drain. Same panic-discard
1316 /// semantics as `deferred_cleanup_hooks` (silent drop on
1317 /// panic-discarded waves).
1318 pub(crate) pending_wipes: Vec<NodeId>,
1319}
1320
1321/// The handle-protocol Core dispatcher.
1322///
1323/// Holds an [`Arc`] to the [`BindingBoundary`] and all dispatch state. Cheap
1324/// to clone (the inner `Arc<Mutex<CoreState>>` is shared); pass `Core` by
1325/// value to threads.
1326///
1327/// # Wave-owner re-entrant mutex (Slice A close /qa, M1)
1328///
1329/// The state lock (`state: Mutex<CoreState>`) is **dropped** around binding
1330/// callbacks (`invoke_fn`, `custom_equals`) so user fns may re-enter Core.
1331/// To preserve serializability of WAVE EXECUTION across threads — without
1332/// re-introducing the lock-held-during-fn-fire deadlock the Slice A close
1333/// refactor lifted — the wave engine acquires `wave_owner` (a
1334/// [`parking_lot::ReentrantMutex`]) for the lifetime of each wave.
1335///
1336/// Properties:
1337///
1338/// - **Same-thread re-entrance is free.** A user fn that calls back into
1339/// `Core::emit` / `Core::pause` / etc. mid-fire re-acquires `wave_owner`
1340/// on the same thread and runs as a nested wave (the inner `run_wave`
1341/// sees `in_tick=true` and skips drain — outer drain picks up).
1342/// - **Cross-thread emits BLOCK** at `wave_owner.lock_arc()` until the
1343/// in-flight wave completes (drain + flush + sink fire all done). This
1344/// serializes wave OWNERSHIP across threads, while still allowing the
1345/// state lock to drop inside the wave for binding callbacks.
1346///
1347/// Without this, Slice A close's lock-released drain let cross-thread
1348/// emits absorb into the in-flight wave's `pending_notify` and return
1349/// before subscribers fire — breaking the user-facing happens-after
1350/// contract that `emit` returning means subscribers have observed.
1351#[derive(Clone)]
1352pub struct Core {
1353 pub(crate) state: Arc<Mutex<CoreState>>,
1354 pub(crate) binding: Arc<dyn BindingBoundary>,
1355 pub(crate) wave_owner: Arc<ReentrantMutex<()>>,
1356 /// Slice X5 (D3 substrate, 2026-05-08): per-subgraph union-find
1357 /// registry. Tracks each registered node's connected-component
1358 /// membership (a "subgraph") so cross-thread emits to disjoint
1359 /// components can run truly parallel via per-component
1360 /// `wave_owner` (Y1 commit-2 wires the wave engine through the
1361 /// registry; X5 commit-1 just maintains the union-find state).
1362 ///
1363 /// Direct port of [`graphrefly-py`'s
1364 /// `subgraph_locks.py`](https://github.com/graphrefly/graphrefly-py/blob/main/src/graphrefly/core/subgraph_locks.py)
1365 /// design (locked in [`SESSION-rust-port-d3-per-subgraph-parallelism.md`](https://github.com/graphrefly/graphrefly-ts/blob/main/archive/docs/SESSION-rust-port-d3-per-subgraph-parallelism.md)).
1366 pub(crate) registry: Arc<parking_lot::Mutex<crate::subgraph::SubgraphRegistry>>,
1367}
1368
1369/// Weak handle to a [`Core`] — does not contribute to strong refcount.
1370///
1371/// Constructed via [`Core::weak_handle`]; upgraded back to a strong
1372/// [`Core`] via [`WeakCore::upgrade`]. Used by long-lived binding-stored
1373/// closures (notably `ProducerBuildFn`s registered via
1374/// [`graphrefly_operators::ProducerBinding::register_producer_build`])
1375/// to break the BenchBinding → registry → closure → strong-Core cycle
1376/// that would otherwise leak the entire graph state when a `BenchCore`
1377/// drops with active producer registrations.
1378///
1379/// Upgrade on each invocation; if the host `Core` was already dropped,
1380/// `upgrade()` returns `None` and the closure should no-op (the host
1381/// is being torn down, no work to do).
1382#[derive(Clone)]
1383pub struct WeakCore {
1384 state: Weak<Mutex<CoreState>>,
1385 binding: Weak<dyn BindingBoundary>,
1386 wave_owner: Weak<ReentrantMutex<()>>,
1387 registry: Weak<parking_lot::Mutex<crate::subgraph::SubgraphRegistry>>,
1388}
1389
1390impl WeakCore {
1391 /// Try to upgrade back to a strong [`Core`]. Returns `None` if the
1392 /// host `Core`'s strong count has reached zero (i.e. the host
1393 /// `BenchCore` / equivalent owner was dropped).
1394 #[must_use]
1395 pub fn upgrade(&self) -> Option<Core> {
1396 Some(Core {
1397 state: self.state.upgrade()?,
1398 binding: self.binding.upgrade()?,
1399 wave_owner: self.wave_owner.upgrade()?,
1400 registry: self.registry.upgrade()?,
1401 })
1402 }
1403}
1404
1405/// RAII guard that owns an [`OperatorScratch`] until either (a) the
1406/// caller `take()`s it for installation, or (b) the guard drops on an
1407/// early return / unwind, in which case the scratch's handle retains
1408/// are released via [`OperatorScratch::release_handles`].
1409///
1410/// Slice H /qa F1 + F2 (2026-05-07): closes two related correctness
1411/// gaps in `Core::register`:
1412///
1413/// 1. **TOCTOU window** — the original three-phase split called
1414/// `lock_state()` twice (once for validation, once for insertion),
1415/// so a concurrent `Core::complete(dep)` on a non-resubscribable
1416/// dep could slip in between the two acquisitions and re-create
1417/// the wedge `RegisterError::TerminalDep` was designed to prevent.
1418/// The guard plus a single locked region for both phases closes
1419/// this gap (release runs lock-released because guard variables
1420/// drop in reverse declaration order — guard declared BEFORE
1421/// `lock_state()`, so the lock guard drops first).
1422///
1423/// 2. **Panic-unsafe scratch leak** — without an RAII drop, a panic
1424/// between `make_op_scratch` (Phase 2) and the explicit
1425/// `if let Err(e)` cleanup branch (e.g., `lock_state()` reentrance
1426/// assert, OOM-as-panic on Vec growth in dep iteration) would
1427/// drop the `Box<dyn OperatorScratch>` without releasing the
1428/// seed/default retain. The guard's `Drop` impl releases on any
1429/// unwind path.
1430///
1431/// Lock-discipline: the guard holds `&dyn BindingBoundary` (through
1432/// the `Arc<dyn BindingBoundary>` it borrows from). On `Drop`, it
1433/// invokes `release_handles` lock-released — fires AFTER any
1434/// `MutexGuard<CoreState>` declared later in the same scope drops
1435/// (LIFO destruction order). Mirrors `Core::resume` Phase 2 release
1436/// pattern.
1437struct ScratchReleaseGuard<'a> {
1438 scratch: Option<Box<dyn crate::op_state::OperatorScratch>>,
1439 binding: &'a dyn BindingBoundary,
1440}
1441
1442impl<'a> ScratchReleaseGuard<'a> {
1443 fn new(
1444 scratch: Option<Box<dyn crate::op_state::OperatorScratch>>,
1445 binding: &'a dyn BindingBoundary,
1446 ) -> Self {
1447 Self { scratch, binding }
1448 }
1449
1450 /// Take ownership of the scratch — disarms the release-on-drop
1451 /// behavior. Used on the success path to install the scratch on
1452 /// `NodeRecord.op_scratch`.
1453 fn take(mut self) -> Option<Box<dyn crate::op_state::OperatorScratch>> {
1454 self.scratch.take()
1455 }
1456}
1457
1458impl Drop for ScratchReleaseGuard<'_> {
1459 fn drop(&mut self) {
1460 if let Some(mut scratch) = self.scratch.take() {
1461 scratch.release_handles(self.binding);
1462 }
1463 }
1464}
1465
1466impl Core {
1467 /// Construct a fresh Core wired to the given binding. Pause buffer cap
1468 /// defaults to unbounded; set via [`Self::set_pause_buffer_cap`].
1469 #[must_use]
1470 pub fn new(binding: Arc<dyn BindingBoundary>) -> Self {
1471 Self {
1472 state: Arc::new(Mutex::new(CoreState {
1473 next_node_id: 1,
1474 next_subscription_id: 1,
1475 // A4 (Slice F, 2026-05-07): start `next_lock_id` in the high
1476 // half of the u32 range so `alloc_lock_id` can't collide with
1477 // user-supplied `LockId::new(N)` constructors (which the
1478 // napi-rs binding marshals from `u32` and tests typically use
1479 // in the low range, 1..1024). Phase E /qa F1 (2026-05-08):
1480 // lowered from `1u64 << 32` to `1u64 << 31` so the value
1481 // round-trips through `u32::try_from(...)` at the napi
1482 // boundary — the previous seed errored every napi
1483 // `alloc_lock_id` call. Anti-collision intent (high range vs
1484 // low user range) preserved at half the prior ceiling
1485 // (2^31 ≈ 2 billion allocations per Core, ample for parity
1486 // tests). Lift the floor when the deferred BigInt-narrowing
1487 // migration extends `LockId` to `u64` at the FFI layer
1488 // (porting-deferred "BigInt migration for u32-narrowed napi
1489 // types" entry).
1490 next_lock_id: 1u64 << 31,
1491 nodes: HashMap::new(),
1492 children: HashMap::new(),
1493 pending_fires: HashSet::new(),
1494 pending_notify: IndexMap::new(),
1495 in_tick: false,
1496 pause_buffer_cap: None,
1497 max_batch_drain_iterations: 10_000,
1498 deferred_flush_jobs: Vec::new(),
1499 deferred_handle_releases: Vec::new(),
1500 binding: binding.clone(),
1501 wave_cache_snapshots: HashMap::new(),
1502 pending_auto_resolve: ahash::AHashSet::new(),
1503 topology_sinks: HashMap::new(),
1504 next_topology_id: 1,
1505 currently_firing: Vec::new(),
1506 pending_pause_overflow: Vec::new(),
1507 tier3_emitted_this_wave: ahash::AHashSet::new(),
1508 invalidate_hooks_fired_this_wave: ahash::AHashSet::new(),
1509 deferred_cleanup_hooks: Vec::new(),
1510 pending_wipes: Vec::new(),
1511 })),
1512 binding,
1513 wave_owner: Arc::new(ReentrantMutex::new(())),
1514 registry: Arc::new(parking_lot::Mutex::new(
1515 crate::subgraph::SubgraphRegistry::new(),
1516 )),
1517 }
1518 }
1519
1520 /// Acquire the state lock.
1521 ///
1522 /// Post-Slice-E: `Core::subscribe` fires the per-tier handshake
1523 /// LOCK-RELEASED with `wave_owner` held; sink callbacks may freely
1524 /// re-enter Core (`emit` / `complete` / `error` / nested `subscribe`).
1525 /// Same-thread re-entry passes through `wave_owner`'s `ReentrantMutex`
1526 /// transparently; cross-thread emits block on `wave_owner` until the
1527 /// outer subscribe completes, preserving R1.3.5.a happens-after
1528 /// ordering. The previous `IN_HANDSHAKE_FIRE` panic-diagnostic is no
1529 /// longer needed.
1530 pub(crate) fn lock_state(&self) -> MutexGuard<'_, CoreState> {
1531 self.state.lock()
1532 }
1533
1534 /// Whether `self` and `other` point to the same dispatcher state.
1535 /// True when one was produced by `Clone`-ing the other (or they
1536 /// were both cloned from a common ancestor); false for two
1537 /// independently `Core::new`-constructed instances even with the
1538 /// same binding.
1539 ///
1540 /// Used by `graphrefly-graph`'s `mount` to enforce the "shared-Core
1541 /// only" v1 invariant — cross-Core mount is post-M6.
1542 #[must_use]
1543 pub fn same_dispatcher(&self, other: &Core) -> bool {
1544 Arc::ptr_eq(&self.state, &other.state)
1545 }
1546
1547 /// Downgrade to a [`WeakCore`] handle that doesn't contribute to
1548 /// strong refcount of the underlying state / binding / wave_owner.
1549 ///
1550 /// Used by binding-stored long-lived closures (e.g.
1551 /// `register_producer_build`-stored `ProducerBuildFn`s) to avoid the
1552 /// Arc cycle:
1553 ///
1554 /// ```text
1555 /// BenchBinding → registry → producer_builds[fn_id]
1556 /// → closure → strong Arc<dyn _Binding> → BenchBinding
1557 /// ```
1558 ///
1559 /// Closures hold `WeakCore` and `Weak<dyn _Binding>` instead, then
1560 /// upgrade-on-fire (returning early if either weak is dangling —
1561 /// indicating the host BenchCore was already dropped). Upgraded
1562 /// strong refs live only for the build closure's invocation; sinks
1563 /// the build closure spawns close over those upgraded strongs and
1564 /// stay alive only while the producer is active (cleared via
1565 /// `producer_deactivate` on last-subscriber unsubscribe).
1566 #[must_use]
1567 pub fn weak_handle(&self) -> WeakCore {
1568 WeakCore {
1569 state: Arc::downgrade(&self.state),
1570 binding: Arc::downgrade(&self.binding),
1571 wave_owner: Arc::downgrade(&self.wave_owner),
1572 registry: Arc::downgrade(&self.registry),
1573 }
1574 }
1575
1576 /// Number of distinct connected-component partitions tracked by
1577 /// the per-subgraph union-find registry (Slice X5 substrate).
1578 /// Two threads emitting into nodes with distinct partitions will
1579 /// run truly parallel once Y1 wires the wave engine through the
1580 /// registry; X5 reports the partition count for inspection
1581 /// (acceptance bar + debugging) but the wave engine still uses
1582 /// the legacy Core-level `wave_owner`.
1583 #[must_use]
1584 pub fn partition_count(&self) -> usize {
1585 self.registry.lock().component_count()
1586 }
1587
1588 /// Resolve `node`'s partition identity per the per-subgraph
1589 /// union-find registry (Slice X5 substrate). Two nodes with the
1590 /// same `SubgraphId` are connected via dep edges (transitively)
1591 /// and share a partition lock under Y1+; nodes in different
1592 /// partitions can run truly parallel.
1593 ///
1594 /// Returns `None` for unregistered nodes.
1595 #[must_use]
1596 pub fn partition_of(&self, node: NodeId) -> Option<crate::subgraph::SubgraphId> {
1597 self.registry.lock().partition_of(node)
1598 }
1599
1600 /// Test-only inspection: number of `PendingBatch`es queued for
1601 /// `node` in the current wave. Used by Slice X4 D2 regression
1602 /// tests to pin the "common case = single batch, no SmallVec
1603 /// spill" perf invariant.
1604 ///
1605 /// Returns `None` if no `pending_notify` entry exists for `node`
1606 /// (no tier-1+ message has been queued for this node yet in this
1607 /// wave). `Some(0)` is unreachable by construction (a vacant
1608 /// entry implies no batches; an occupied entry has at least one).
1609 #[cfg(any(test, debug_assertions))]
1610 #[must_use]
1611 pub fn pending_batch_count(&self, node: NodeId) -> Option<usize> {
1612 self.lock_state()
1613 .pending_notify
1614 .get(&node)
1615 .map(|entry| entry.batches.len())
1616 }
1617
1618 /// Configure the Core-global cap on pause replay buffer length. When set,
1619 /// any per-node pause buffer that would exceed `cap` drops the oldest
1620 /// message(s) from the front; the dropped count is reported back via the
1621 /// resume callback (see [`ResumeReport`]). `None` (default) means
1622 /// unbounded; messages buffer indefinitely until the lockset clears.
1623 pub fn set_pause_buffer_cap(&self, cap: Option<usize>) {
1624 self.lock_state().pause_buffer_cap = cap;
1625 }
1626
1627 /// Configure the replay buffer cap on `node_id` (R2.6.5 / Lock 6.G —
1628 /// Slice E1, 2026-05-07). `None` disables the buffer. `Some(N)` keeps
1629 /// the last `N` DATA emissions in a circular buffer; late subscribers
1630 /// receive them as part of the per-tier handshake (between START and
1631 /// any terminal). Switching from a larger cap to a smaller cap evicts
1632 /// the front of the buffer to fit; switching to `None` drains the
1633 /// buffer entirely. Each evicted/drained handle's retain is released
1634 /// back to the binding.
1635 ///
1636 /// # Panics
1637 ///
1638 /// Panics if `node_id` is not registered.
1639 pub fn set_replay_buffer_cap(&self, node_id: NodeId, cap: Option<usize>) {
1640 // QA A7 (2026-05-07): normalize `Some(0)` to `None`. Two ways to
1641 // express "disabled" is confusing: `push_replay_buffer` already
1642 // treats `Some(0)` as no-op, so persisting it adds nothing.
1643 let cap = match cap {
1644 Some(0) => None,
1645 other => other,
1646 };
1647 let to_release: Vec<HandleId> = {
1648 let mut s = self.lock_state();
1649 let rec = s.require_node_mut(node_id);
1650 rec.replay_buffer_cap = cap;
1651 match cap {
1652 None => rec.replay_buffer.drain(..).collect(),
1653 Some(c) => {
1654 let mut drained = Vec::new();
1655 while rec.replay_buffer.len() > c {
1656 if let Some(h) = rec.replay_buffer.pop_front() {
1657 drained.push(h);
1658 }
1659 }
1660 drained
1661 }
1662 }
1663 };
1664 for h in to_release {
1665 self.binding.release_handle(h);
1666 }
1667 }
1668
1669 /// Reconfigure the pause mode for `node_id` (canonical §2.6 — Slice F
1670 /// audit close, 2026-05-07). Default for new nodes is
1671 /// [`PausableMode::Default`]; switch to [`PausableMode::ResumeAll`]
1672 /// for nodes whose pause-window emit history must be observable
1673 /// verbatim, or [`PausableMode::Off`] for nodes intrinsically
1674 /// pause-immune.
1675 ///
1676 /// # Errors
1677 ///
1678 /// - [`SetPausableModeError::UnknownNode`] — `node_id` is not
1679 /// registered.
1680 /// - [`SetPausableModeError::WhilePaused`] — the node currently
1681 /// holds at least one pause lock. Changing mode mid-pause would
1682 /// lose buffered content or strand a `pending_wave` flag — resume
1683 /// all locks first.
1684 pub fn set_pausable_mode(
1685 &self,
1686 node_id: NodeId,
1687 mode: PausableMode,
1688 ) -> Result<(), SetPausableModeError> {
1689 let mut s = self.lock_state();
1690 let rec = s
1691 .nodes
1692 .get_mut(&node_id)
1693 .ok_or(SetPausableModeError::UnknownNode(node_id))?;
1694 if rec.pause_state.is_paused() {
1695 return Err(SetPausableModeError::WhilePaused);
1696 }
1697 rec.pausable = mode;
1698 Ok(())
1699 }
1700
1701 /// Configure the wave-drain iteration cap (R4.3 / Lock 2.F′). The wave
1702 /// engine aborts a drain after `cap` iterations with a diagnostic panic.
1703 /// Default is `10_000` — high enough to avoid false positives on legitimate
1704 /// fan-in cascades, low enough to surface runtime cycles within seconds.
1705 ///
1706 /// Lower this only when running adversarial / property-based tests that
1707 /// want fast cycle detection. Raise it only with concrete evidence that a
1708 /// legitimate workload needs more iterations than the default — and even
1709 /// then, prefer to tune the workload (per-subgraph batching, etc.) over
1710 /// raising the cap.
1711 ///
1712 /// # Panics
1713 ///
1714 /// Panics if `cap == 0` — a zero cap would abort every wave on the very
1715 /// first iteration, deadlocking any subsequent dispatcher work.
1716 pub fn set_max_batch_drain_iterations(&self, cap: u32) {
1717 assert!(cap > 0, "max_batch_drain_iterations must be > 0");
1718 self.lock_state().max_batch_drain_iterations = cap;
1719 }
1720
1721 /// Send a message UPSTREAM from `node_id` to each of its declared deps
1722 /// (canonical R1.4.1 — Slice F audit, F2 / 2026-05-07).
1723 ///
1724 /// The dispatcher rejects tier-3 (DATA / RESOLVED) and tier-5
1725 /// (COMPLETE / ERROR) per R1.4.1: value and terminal-lifecycle planes
1726 /// are downstream-only. All other tiers (0 START, 1 DIRTY, 2 PAUSE /
1727 /// RESUME, 4 INVALIDATE, 6 TEARDOWN) pass.
1728 ///
1729 /// # Routing per tier
1730 ///
1731 /// - **Tier 0 ([`Message::Start`]):** no-op. START is a per-subscription
1732 /// handshake, not a routable wire signal — sending it upstream has no
1733 /// well-defined target.
1734 /// - **Tier 1 ([`Message::Dirty`]):** no-op. The dep's "something
1735 /// changed" notification is its own [`Self::emit`] / commit
1736 /// responsibility; ignoring upstream DIRTY hints is safe.
1737 /// - **Tier 2 ([`Message::Pause`] / [`Message::Resume`]):** translates
1738 /// to [`Self::pause`] / [`Self::resume`] on each dep. Lock id is
1739 /// forwarded verbatim. Errors from individual deps are accumulated
1740 /// in the `dep_errors` field of the returned report.
1741 /// - **Tier 4 ([`Message::Invalidate`]):** translates to
1742 /// [`Self::invalidate`] on each dep. Note: canonical R1.4.2
1743 /// distinguishes "downstream INVALIDATE" (cache clear + cascade) from
1744 /// "upstream INVALIDATE" (plain forward, no self-process). The Rust
1745 /// port v1 SIMPLIFICATION delegates to the same `Core::invalidate`
1746 /// path — upstream INVALIDATE here DOES clear dep caches and cascade.
1747 /// If a "plain forward" mode surfaces as a real consumer need, add
1748 /// `up_with_options`.
1749 /// - **Tier 6 ([`Message::Teardown`]):** translates to
1750 /// [`Self::teardown`] on each dep. Cascades per the standard
1751 /// teardown path.
1752 ///
1753 /// # Errors
1754 ///
1755 /// - [`UpError::UnknownNode`] — `node_id` is not registered.
1756 /// - [`UpError::TierForbidden`] — tier 3 or tier 5.
1757 pub fn up(&self, node_id: NodeId, message: Message) -> Result<(), UpError> {
1758 // QA A10 (2026-05-07): check unknown node BEFORE tier rejection
1759 // for consistent error UX — `up(unknown, Data)` and
1760 // `up(unknown, Pause)` both report `UnknownNode` rather than
1761 // splitting on the tier.
1762 let dep_ids: Vec<NodeId> = {
1763 let s = self.lock_state();
1764 let rec = s.nodes.get(&node_id).ok_or(UpError::UnknownNode(node_id))?;
1765 rec.dep_ids_vec()
1766 };
1767 let tier = message.tier();
1768 if tier == 3 || tier == 5 {
1769 return Err(UpError::TierForbidden { tier });
1770 }
1771 for dep_id in dep_ids {
1772 match message {
1773 Message::Pause(lock) => {
1774 let _ = self.pause(dep_id, lock);
1775 }
1776 Message::Resume(lock) => {
1777 let _ = self.resume(dep_id, lock);
1778 }
1779 Message::Invalidate => {
1780 self.invalidate(dep_id);
1781 }
1782 Message::Teardown => {
1783 self.teardown(dep_id);
1784 }
1785 // Tier 0 START + tier 1 DIRTY: no-op upstream per the
1786 // routing table above.
1787 _ => {}
1788 }
1789 }
1790 Ok(())
1791 }
1792
1793 /// Allocate a unique [`LockId`] for use with [`Self::pause`] /
1794 /// [`Self::resume`]. Convenience for callers that don't already have an
1795 /// id-allocation scheme; user-supplied ids work too.
1796 #[must_use]
1797 pub fn alloc_lock_id(&self) -> LockId {
1798 let mut s = self.lock_state();
1799 let id = LockId::new(s.next_lock_id);
1800 s.next_lock_id += 1;
1801 id
1802 }
1803
1804 // -------------------------------------------------------------------
1805 // Registration — unified `register()` (D030, Slice D)
1806 //
1807 // All node kinds (State / Producer / Derived / Dynamic / Operator)
1808 // funnel through `Core::register(NodeRegistration) -> NodeId`. Sugar
1809 // wrappers (`register_state` / `register_producer` / `register_derived`
1810 // / `register_dynamic` / `register_operator`) build a `NodeRegistration`
1811 // and delegate. There is no parallel registration path internally.
1812 // -------------------------------------------------------------------
1813
1814 /// Unified node registration (D030).
1815 ///
1816 /// `reg` describes the node's identity (deps + closure-form fn id OR
1817 /// typed-op + per-kind opts). The kind is **derived from the field
1818 /// shape**, not stored — see [`NodeKind`].
1819 ///
1820 /// Sugar wrappers below ([`Self::register_state`],
1821 /// [`Self::register_producer`], [`Self::register_derived`],
1822 /// [`Self::register_dynamic`], [`Self::register_operator`]) build the
1823 /// registration for the common kinds and delegate here. Direct callers
1824 /// that need uncommon combinations (e.g., a partial-true derived) can
1825 /// invoke this method directly.
1826 ///
1827 /// # Errors
1828 ///
1829 /// Errors are returned in evaluation order — earlier phases short-circuit
1830 /// later ones, so a single registration produces at most one variant.
1831 ///
1832 /// **Phase 1 — lock-released, side-effect-free validation:**
1833 /// - [`RegisterError::OperatorWithoutDeps`] — `reg` carries an op but
1834 /// `deps` is empty. Operator nodes need at least one dep — for
1835 /// subscription-managed combinators with no declared deps, use
1836 /// [`Self::register_producer`] instead.
1837 /// - [`RegisterError::InitialOnlyForStateNodes`] — `reg.opts.initial`
1838 /// is non-sentinel for a non-state shape (deps non-empty, or
1839 /// fn_or_op present). State nodes are the only kind with an initial
1840 /// cache.
1841 ///
1842 /// **Phase 2 — operator scratch construction (lock-released):**
1843 /// - [`RegisterError::OperatorSeedSentinel`] — `reg` carries `Op(Scan)`
1844 /// / `Op(Reduce)` with a `NO_HANDLE` seed. R2.5.3 — stateful folders
1845 /// must have a real seed.
1846 ///
1847 /// **Phase 3 — state-lock validation (folded with insertion under a
1848 /// single lock acquisition per /qa F1 to prevent TOCTOU between
1849 /// validation and `nodes.insert`):**
1850 /// - [`RegisterError::UnknownDep`] — any element of `reg.deps` is not
1851 /// a registered node id.
1852 /// - [`RegisterError::TerminalDep`] — a dep is terminal (COMPLETE /
1853 /// ERROR) AND not resubscribable — would create a permanent wedge.
1854 ///
1855 /// All errors are construction-time invariants — the dispatcher
1856 /// rejects the registration before any reactive state is created.
1857 /// On `Err`, no node has been added and any handle retains taken on
1858 /// the way in (operator scratch seed retains via
1859 /// [`BindingBoundary::retain_handle`]) have been released
1860 /// lock-released — see [`ScratchReleaseGuard`] for the RAII
1861 /// discipline that covers both early-return AND unwind paths.
1862 /// `Last { default }` retains its `default` handle on the same
1863 /// release path.
1864 pub fn register(&self, reg: NodeRegistration) -> Result<NodeId, RegisterError> {
1865 let NodeRegistration {
1866 deps,
1867 fn_or_op,
1868 opts,
1869 } = reg;
1870 let NodeOpts {
1871 initial,
1872 equals,
1873 partial,
1874 is_dynamic,
1875 pausable,
1876 replay_buffer,
1877 } = opts;
1878
1879 // Derive the field shape from fn_or_op + deps.
1880 let (fn_id, op) = match fn_or_op {
1881 Some(NodeFnOrOp::Fn(f)) => (Some(f), None),
1882 Some(NodeFnOrOp::Op(o)) => (None, Some(o)),
1883 None => (None, None),
1884 };
1885
1886 // Phase 1 — lock-released, side-effect-free validation. Errors
1887 // here return BEFORE any handle retain is taken.
1888 //
1889 // - State (no deps + no fn + no op) is the only kind with `initial`.
1890 // - Dynamic flag only meaningful when fn + non-empty deps.
1891 // - Operator (op present) must have deps (P9: operator without deps
1892 // would skip activation — use a producer instead).
1893 let is_state_shape = deps.is_empty() && fn_id.is_none() && op.is_none();
1894 if op.is_some() && deps.is_empty() {
1895 return Err(RegisterError::OperatorWithoutDeps);
1896 }
1897 if initial != NO_HANDLE && !is_state_shape {
1898 return Err(RegisterError::InitialOnlyForStateNodes);
1899 }
1900
1901 // Phase 2 — build per-operator scratch struct (may take handle
1902 // retains via `binding.retain_handle` for Scan/Reduce/Last seed).
1903 // Lock-released per Slice E (D045) handshake discipline. Returns
1904 // `OperatorSeedSentinel` BEFORE retain so an Err leaves no
1905 // dangling handles.
1906 let scratch = match op {
1907 Some(operator_op) => self.make_op_scratch(operator_op)?,
1908 None => None,
1909 };
1910
1911 // Wrap scratch in an RAII guard immediately after Phase 2. From
1912 // here on, ANY early return / unwind path correctly releases the
1913 // scratch's handle retains via `OperatorScratch::release_handles`
1914 // (Slice H /qa F2 — defense against panics between Phase 2 and
1915 // Phase 3 cleanup branch). Lock-released because the guard is
1916 // declared BEFORE `lock_state()` below — variable destruction
1917 // order is reverse declaration order, so the `MutexGuard` drops
1918 // first on any return path.
1919 let scratch_guard = ScratchReleaseGuard::new(scratch, &*self.binding);
1920
1921 // Phase 3 — state-lock-required validation, FOLDED with insertion
1922 // under a single `lock_state()` acquisition per /qa F1. The
1923 // pre-/qa version split this into two acquisitions (one for
1924 // validation, one for `alloc_node_id` + `nodes.insert`), opening
1925 // a TOCTOU window where a concurrent `Core::complete(dep)` on a
1926 // non-resubscribable dep could slip in and recreate the wedge
1927 // `TerminalDep` was designed to prevent. Single locked region
1928 // closes the gap.
1929 let mut s = self.lock_state();
1930
1931 for &dep in &deps {
1932 if !s.nodes.contains_key(&dep) {
1933 return Err(RegisterError::UnknownDep(dep));
1934 }
1935 }
1936 // Slice F audit (2026-05-07): mirror `set_deps`'s `TerminalDep`
1937 // rejection at registration time. Adding a non-resubscribable
1938 // terminal node as a dep at registration creates a permanent wedge.
1939 for &dep in &deps {
1940 let dep_rec = s.require_node(dep);
1941 if dep_rec.terminal.is_some() && !dep_rec.resubscribable {
1942 return Err(RegisterError::TerminalDep(dep));
1943 }
1944 }
1945
1946 // Validation passed — install. Take scratch out of the guard
1947 // (disarms the release-on-drop) and continue using `s`.
1948 let installed_scratch = scratch_guard.take();
1949
1950 let id = s.alloc_node_id();
1951
1952 // `tracked`: Static derived + Operator track all deps; Dynamic
1953 // starts empty and fills via fn return; State / Producer have no
1954 // deps so tracked is empty.
1955 let tracked: HashSet<usize> = if op.is_some() {
1956 (0..deps.len()).collect()
1957 } else if is_dynamic {
1958 HashSet::new()
1959 } else if fn_id.is_some() && !deps.is_empty() {
1960 // Static derived
1961 (0..deps.len()).collect()
1962 } else {
1963 HashSet::new()
1964 };
1965
1966 let dep_records: Vec<DepRecord> = deps.iter().map(|&d| DepRecord::new(d)).collect();
1967
1968 let rec = NodeRecord {
1969 dep_records,
1970 fn_id,
1971 op,
1972 is_dynamic,
1973 equals,
1974 cache: initial,
1975 has_fired_once: initial != NO_HANDLE,
1976 subscribers: HashMap::new(),
1977 subscribers_revision: 0,
1978 tracked,
1979 dirty: false,
1980 involved_this_wave: false,
1981 pause_state: PauseState::Active,
1982 pausable,
1983 replay_buffer_cap: replay_buffer,
1984 replay_buffer: VecDeque::new(),
1985 terminal: None,
1986 has_received_teardown: false,
1987 resubscribable: false,
1988 meta_companions: Vec::new(),
1989 partial,
1990 op_scratch: installed_scratch,
1991 };
1992 s.nodes.insert(id, rec);
1993 s.children.insert(id, HashSet::new());
1994 for &dep in &deps {
1995 s.children.entry(dep).or_default().insert(id);
1996 }
1997 drop(s);
1998 // Slice X5 (D3 substrate, 2026-05-08): track partition membership.
1999 // Register the new node as its own component, then `union_nodes`
2000 // each dep — connectivity-based grouping per Q1=(c-uf split-eager).
2001 // Lock-released wrt state so the registry mutex isn't ordered
2002 // under the state mutex (avoids constraining future cross-mutex
2003 // refactors).
2004 {
2005 let mut reg = self.registry.lock();
2006 reg.ensure_registered(id);
2007 for &dep in &deps {
2008 reg.union_nodes(id, dep);
2009 }
2010 }
2011 self.fire_topology_event(&crate::topology::TopologyEvent::NodeRegistered(id));
2012 Ok(id)
2013 }
2014
2015 /// Sugar over [`Self::register`] — register a state node. `initial`
2016 /// may be [`NO_HANDLE`] to start sentinel.
2017 ///
2018 /// `partial` is accepted for surface consistency (D019); for state
2019 /// nodes it has no effect (state nodes don't fire fn).
2020 ///
2021 /// # Errors
2022 ///
2023 /// State registration is structurally simple — no deps, no op — so
2024 /// the only reachable variant is none in practice. Returns
2025 /// [`Result`] for surface consistency with [`Self::register`].
2026 pub fn register_state(
2027 &self,
2028 initial: HandleId,
2029 partial: bool,
2030 ) -> Result<NodeId, RegisterError> {
2031 self.register(NodeRegistration {
2032 deps: Vec::new(),
2033 fn_or_op: None,
2034 opts: NodeOpts {
2035 initial,
2036 partial,
2037 ..NodeOpts::default()
2038 },
2039 })
2040 }
2041
2042 /// Sugar over [`Self::register`] — register a producer node (D031,
2043 /// Slice D). No deps; fn fires once on first subscribe; cleanup runs
2044 /// via [`BindingBoundary::producer_deactivate`] when the last
2045 /// subscriber unsubscribes.
2046 ///
2047 /// The fn body uses the binding's `ProducerCtx`-equivalent helper
2048 /// (see `graphrefly-operators::producer`) to subscribe to other Core
2049 /// nodes — the zip / concat / race / takeUntil pattern.
2050 ///
2051 /// # Errors
2052 ///
2053 /// Producer registration has no user-supplied deps, so structurally
2054 /// none of [`RegisterError`]'s variants are reachable. Returns
2055 /// [`Result`] for surface consistency with [`Self::register`].
2056 pub fn register_producer(&self, fn_id: FnId) -> Result<NodeId, RegisterError> {
2057 self.register(NodeRegistration {
2058 deps: Vec::new(),
2059 fn_or_op: Some(NodeFnOrOp::Fn(fn_id)),
2060 opts: NodeOpts {
2061 // Producers have no deps — the first-run gate is degenerate.
2062 partial: true,
2063 ..NodeOpts::default()
2064 },
2065 })
2066 }
2067
2068 /// Sugar over [`Self::register`] — register a derived (static) node.
2069 /// `partial` controls the R2.5.3 first-run gate (D011).
2070 ///
2071 /// # Errors
2072 ///
2073 /// - [`RegisterError::UnknownDep`] — any element of `deps` is not
2074 /// registered.
2075 /// - [`RegisterError::TerminalDep`] — a dep is terminal and not
2076 /// resubscribable.
2077 pub fn register_derived(
2078 &self,
2079 deps: &[NodeId],
2080 fn_id: FnId,
2081 equals: EqualsMode,
2082 partial: bool,
2083 ) -> Result<NodeId, RegisterError> {
2084 self.register(NodeRegistration {
2085 deps: deps.to_vec(),
2086 fn_or_op: Some(NodeFnOrOp::Fn(fn_id)),
2087 opts: NodeOpts {
2088 equals,
2089 partial,
2090 ..NodeOpts::default()
2091 },
2092 })
2093 }
2094
2095 /// Sugar over [`Self::register`] — register a dynamic node (fn
2096 /// declares its actually-tracked dep indices per fire). `partial`
2097 /// controls the R2.5.3 first-run gate (D011).
2098 ///
2099 /// # Errors
2100 ///
2101 /// - [`RegisterError::UnknownDep`] — any element of `deps` is not
2102 /// registered.
2103 /// - [`RegisterError::TerminalDep`] — a dep is terminal and not
2104 /// resubscribable.
2105 pub fn register_dynamic(
2106 &self,
2107 deps: &[NodeId],
2108 fn_id: FnId,
2109 equals: EqualsMode,
2110 partial: bool,
2111 ) -> Result<NodeId, RegisterError> {
2112 self.register(NodeRegistration {
2113 deps: deps.to_vec(),
2114 fn_or_op: Some(NodeFnOrOp::Fn(fn_id)),
2115 opts: NodeOpts {
2116 equals,
2117 partial,
2118 is_dynamic: true,
2119 ..NodeOpts::default()
2120 },
2121 })
2122 }
2123
2124 /// Build a fresh [`OperatorScratch`](crate::op_state::OperatorScratch)
2125 /// box for an operator variant, taking any required handle retains.
2126 /// Shared between `register_operator` (initial install) and
2127 /// `reset_for_fresh_lifecycle` (resubscribable cycle re-install).
2128 ///
2129 /// # Errors
2130 ///
2131 /// Returns [`RegisterError::OperatorSeedSentinel`] if `op` is `Scan`
2132 /// / `Reduce` with a [`NO_HANDLE`] seed (R2.5.3 — stateful folders
2133 /// must have a real seed). Refcount discipline: the seed-sentinel
2134 /// check happens BEFORE [`BindingBoundary::retain_handle`], so an
2135 /// `Err` leaves no handles dangling.
2136 fn make_op_scratch(
2137 &self,
2138 op: OperatorOp,
2139 ) -> Result<Option<Box<dyn crate::op_state::OperatorScratch>>, RegisterError> {
2140 use crate::op_state::{
2141 DistinctState, LastState, PairwiseState, ReduceState, ScanState, SkipState, TakeState,
2142 TakeWhileState,
2143 };
2144 // Slice H (2026-05-07): Scan/Reduce seed-sentinel checks happen
2145 // BEFORE retain_handle so an Err return leaves no handles dangling.
2146 //
2147 // Slice H /qa F13 (2026-05-07): for retaining variants, allocate
2148 // the `Box<State>` BEFORE calling `binding.retain_handle`. If
2149 // `Box::new` panics (e.g., OOM-as-panic), no retain has happened
2150 // yet — no leak. If `retain_handle` panics after Box succeeds,
2151 // the `Box<State>` is dropped on unwind; State has no handle yet
2152 // (we haven't touched the registry refcount), so still no leak.
2153 // Caller wraps the returned scratch in `ScratchReleaseGuard` to
2154 // cover panics AFTER make_op_scratch returns.
2155 match op {
2156 OperatorOp::Scan { seed, .. } => {
2157 if seed == NO_HANDLE {
2158 return Err(RegisterError::OperatorSeedSentinel);
2159 }
2160 let state = Box::new(ScanState { acc: seed });
2161 self.binding.retain_handle(seed);
2162 Ok(Some(state))
2163 }
2164 OperatorOp::Reduce { seed, .. } => {
2165 if seed == NO_HANDLE {
2166 return Err(RegisterError::OperatorSeedSentinel);
2167 }
2168 let state = Box::new(ReduceState { acc: seed });
2169 self.binding.retain_handle(seed);
2170 Ok(Some(state))
2171 }
2172 OperatorOp::DistinctUntilChanged { .. } => Ok(Some(Box::new(DistinctState::default()))),
2173 OperatorOp::Pairwise { .. } => Ok(Some(Box::new(PairwiseState::default()))),
2174 OperatorOp::Take { .. } => Ok(Some(Box::new(TakeState::default()))),
2175 OperatorOp::Skip { .. } => Ok(Some(Box::new(SkipState::default()))),
2176 OperatorOp::TakeWhile { .. } => Ok(Some(Box::new(TakeWhileState))),
2177 OperatorOp::Last { default } => {
2178 let state = Box::new(LastState {
2179 latest: NO_HANDLE,
2180 default,
2181 });
2182 if default != NO_HANDLE {
2183 self.binding.retain_handle(default);
2184 }
2185 Ok(Some(state))
2186 }
2187 OperatorOp::Map { .. }
2188 | OperatorOp::Filter { .. }
2189 | OperatorOp::Combine { .. }
2190 | OperatorOp::WithLatestFrom { .. }
2191 | OperatorOp::Merge => Ok(None),
2192 }
2193 }
2194
2195 /// Sugar over [`Self::register`] — register a built-in operator node
2196 /// (Slice C-1, D009; D026 generic scratch). The operator dispatch path
2197 /// lives in `fire_operator`; `op` selects which per-operator FFI
2198 /// method on [`BindingBoundary`] gets called per fire.
2199 ///
2200 /// For stateful operators ([`OperatorOp::Scan`] / [`Reduce`] /
2201 /// [`Last`] with a default), the seed/default handle is captured
2202 /// into the appropriate
2203 /// [`OperatorScratch`](crate::op_state::OperatorScratch) struct
2204 /// stored at [`NodeRecord::op_scratch`], and Core takes one retain
2205 /// share via [`BindingBoundary::retain_handle`].
2206 ///
2207 /// # Errors
2208 ///
2209 /// - [`RegisterError::OperatorWithoutDeps`] — `deps` is empty (use
2210 /// [`Self::register_producer`] instead).
2211 /// - [`RegisterError::OperatorSeedSentinel`] — `op` is
2212 /// [`OperatorOp::Scan`] / [`OperatorOp::Reduce`] with a
2213 /// [`NO_HANDLE`] seed.
2214 /// - [`RegisterError::UnknownDep`] — any element of `deps` is not
2215 /// registered.
2216 /// - [`RegisterError::TerminalDep`] — a dep is terminal and not
2217 /// resubscribable.
2218 pub fn register_operator(
2219 &self,
2220 deps: &[NodeId],
2221 op: OperatorOp,
2222 opts: OperatorOpts,
2223 ) -> Result<NodeId, RegisterError> {
2224 self.register(NodeRegistration {
2225 deps: deps.to_vec(),
2226 fn_or_op: Some(NodeFnOrOp::Op(op)),
2227 opts: NodeOpts {
2228 equals: opts.equals,
2229 partial: opts.partial,
2230 ..NodeOpts::default()
2231 },
2232 })
2233 }
2234
2235 // -------------------------------------------------------------------
2236 // Subscription
2237 // -------------------------------------------------------------------
2238
2239 /// Subscribe a sink to a node. Returns a [`Subscription`] handle —
2240 /// dropping the handle unsubscribes the sink. Per §10.12, no manual
2241 /// `unsubscribe(node, id)` call is required.
2242 ///
2243 /// Push-on-subscribe (R1.2.3, R2.2.3 step 4): the sink is registered AFTER
2244 /// the START handshake fires. The handshake contents depend on node
2245 /// state:
2246 /// - Sentinel cache + live (non-terminal): `[START]`
2247 /// - Cached + live: `[START, DATA(handle)]`
2248 /// - Cached + terminated (non-resubscribable): `[START, DATA(handle), <terminal>]`
2249 /// - Sentinel + terminated (non-resubscribable): `[START, <terminal>]`
2250 ///
2251 /// Resubscribable terminal lifecycle (R2.2.7 / R2.5.3): if the node was
2252 /// marked resubscribable via [`Self::set_resubscribable`] AND has
2253 /// terminated, the subscribe call first **resets** the node — clears
2254 /// `terminal`, `has_fired_once`, `has_received_teardown`, all
2255 /// `dep_handles` to `NO_HANDLE`, all `dep_terminals` to `None`, and
2256 /// drains the pause lockset. The new subscriber then receives a fresh
2257 /// `[START]` (cache may survive for state nodes; sentinel for compute).
2258 ///
2259 /// Activation (R2.2.3 step 5): if this is the first subscriber and the
2260 /// node is a derived/dynamic compute, recursively activate deps so their
2261 /// cached handles fill our `dep_handles`.
2262 #[allow(clippy::needless_pass_by_value)] // Sink is `Arc<dyn Fn>`; we clone for the subscribers map and call it directly. Taking by value matches the ergonomics callers expect.
2263 pub fn subscribe(&self, node_id: NodeId, sink: Sink) -> Subscription {
2264 // Subscribe protocol (Slice E rework, post-handshake-reentry-lift):
2265 //
2266 // 1. Acquire `wave_owner` first (re-entrant; same-thread passes
2267 // through, cross-thread blocks). This is the cross-thread
2268 // serialization point that preserves R1.3.5.a happens-after
2269 // ordering across the lock-released handshake fire.
2270 // 2. Acquire state lock briefly: alloc sub_id, run resubscribable
2271 // reset if applicable, snapshot handshake state, install sink
2272 // in `subscribers`. Drop state lock.
2273 // 3. Fire handshake LOCK-RELEASED. Per-tier slices (R1.3.5.a):
2274 // `[Start]` / `[Data(cache)]?` / `[Complete]?` / `[Error(h)]?`
2275 // / `[Teardown]?`. Empty tiers are skipped. Sink callbacks
2276 // may re-enter Core freely — same-thread re-entry passes
2277 // through `wave_owner` reentrantly.
2278 // 4. Run activation under `run_wave` if needed (first subscriber
2279 // on a non-state node).
2280 // 5. Drop `wave_owner`.
2281 //
2282 // Race-fix discipline: the sink is installed in `subscribers`
2283 // BEFORE the state lock drops, so concurrent threads that
2284 // acquire `wave_owner` after our scope sees the sink already
2285 // registered. Cross-thread emits block on `wave_owner` until
2286 // we drop it, ensuring all our handshake calls land before
2287 // any concurrent wave's flush observes the sink.
2288
2289 // Acquire wave_owner first — cross-thread serialization point.
2290 // `lock_arc()` is `!Send`; same-thread reentrant.
2291 let _wave_guard = self.wave_owner.lock_arc();
2292
2293 let (sub_id, tier_slices, needs_activation, did_reset) = {
2294 let mut s = self.lock_state();
2295 let sub_id = s.alloc_sub_id();
2296
2297 // Resubscribable reset: terminal + flagged → clear lifecycle
2298 // state so the incoming subscriber starts fresh. F3 audit
2299 // guard: a node that has received TEARDOWN (R2.6.4) is
2300 // permanently destroyed at this layer; resurrecting it via a
2301 // late subscribe is a category error. COMPLETE/ERROR is
2302 // recoverable for resubscribable nodes; TEARDOWN is not. The
2303 // handshake will still replay the terminal in the non-reset
2304 // branch so the late subscriber sees a clean
2305 // `[START, ?DATA, COMPLETE|ERROR, TEARDOWN]` stream.
2306 let needs_reset = {
2307 let rec = s.require_node(node_id);
2308 rec.resubscribable && rec.terminal.is_some() && !rec.has_received_teardown
2309 };
2310 if needs_reset {
2311 self.reset_for_fresh_lifecycle(&mut s, node_id);
2312 }
2313
2314 // Snapshot handshake state under lock.
2315 let (cache, is_state, first_subscriber, terminal, torn_down) = {
2316 let rec = s.require_node(node_id);
2317 (
2318 rec.cache,
2319 rec.is_state(),
2320 rec.subscribers.is_empty(),
2321 rec.terminal,
2322 rec.has_received_teardown,
2323 )
2324 };
2325
2326 // Build per-tier handshake slices. Each non-empty slice is
2327 // fired as a separate sink call (R1.3.5.a tier-split).
2328 let mut tier_slices: SmallVec<[Vec<Message>; 4]> = SmallVec::new();
2329 tier_slices.push(vec![Message::Start]);
2330 if cache != NO_HANDLE {
2331 tier_slices.push(vec![Message::Data(cache)]);
2332 }
2333 // Slice E1 (R2.6.5 / Lock 6.G): replay buffered DATA between
2334 // [Start] (and the cache slice, if present) and any terminal.
2335 // Each buffered handle becomes a separate per-tier slice so
2336 // late subscribers see the historical Data sequence as
2337 // distinct sink calls.
2338 //
2339 // Dedupe: when a cache slice is present and the buffer's last
2340 // entry is the same handle (the typical case — cache always
2341 // tracks the last DATA emitted, and the buffer's tail entry
2342 // is that same DATA), skip the last buffer entry to avoid
2343 // delivering Data(cache) twice. For state nodes whose cache
2344 // survives unsubscribe, the buffer may have older entries
2345 // the cache doesn't reflect; the dedupe only drops the
2346 // single trailing entry that equals cache. (QA A1, 2026-05-07)
2347 let replay_handles: Vec<HandleId> = {
2348 let rec = s.require_node(node_id);
2349 let cap = rec.replay_buffer_cap.unwrap_or(0);
2350 if cap == 0 {
2351 Vec::new()
2352 } else {
2353 let mut v: Vec<HandleId> = rec.replay_buffer.iter().copied().collect();
2354 if cache != NO_HANDLE && v.last() == Some(&cache) {
2355 v.pop();
2356 }
2357 v
2358 }
2359 };
2360 for h in &replay_handles {
2361 tier_slices.push(vec![Message::Data(*h)]);
2362 }
2363 if let Some(t) = terminal {
2364 tier_slices.push(vec![match t {
2365 TerminalKind::Complete => Message::Complete,
2366 TerminalKind::Error(h) => Message::Error(h),
2367 }]);
2368 }
2369 if torn_down {
2370 tier_slices.push(vec![Message::Teardown]);
2371 }
2372
2373 // Install sink BEFORE dropping state lock so any thread that
2374 // subsequently acquires `wave_owner` (after our scope ends)
2375 // sees the sink already registered.
2376 //
2377 // Slice X4 / D2: bump `subscribers_revision` alongside the
2378 // insert so a pending_notify entry opened earlier in the same
2379 // wave (e.g. inside `batch(|| { emit(s, h1); subscribe(s,
2380 // late); emit(s, h2); })`) starts a fresh `PendingBatch` on
2381 // its next `queue_notify` push — making the new sink visible
2382 // to subsequent emits' flush slices, while the pre-subscribe
2383 // batch's snapshot stays frozen so we don't double-deliver
2384 // earlier emits via the wave's flush AND the new sub's
2385 // handshake replay.
2386 {
2387 let rec = s.require_node_mut(node_id);
2388 rec.subscribers.insert(sub_id, sink.clone());
2389 rec.subscribers_revision = rec.subscribers_revision.wrapping_add(1);
2390 }
2391
2392 let needs_activation = first_subscriber && !is_state;
2393 (sub_id, tier_slices, needs_activation, needs_reset)
2394 // state lock drops here
2395 };
2396
2397 // Slice E2 (R2.4.6 / D055): on resubscribable terminal reset, fire
2398 // `wipe_ctx` LOCK-RELEASED so the binding drops its `NodeCtxState`
2399 // entry (clearing both `store` and any residual `current_cleanup`).
2400 // The new subscriber's first invoke_fn sees a fresh empty store.
2401 // Fires AFTER the state lock drops so the binding's
2402 // `node_ctx.lock()` can't deadlock against Core's state lock — and
2403 // BEFORE the handshake so the wipe is observable before any
2404 // user-visible interaction with the new lifecycle.
2405 if did_reset {
2406 self.binding.wipe_ctx(node_id);
2407 }
2408
2409 // Fire handshake LOCK-RELEASED. Sink may re-enter Core; same-
2410 // thread re-entry passes through `wave_owner` reentrantly.
2411 // Cross-thread emits block at `wave_owner` until our scope ends.
2412 //
2413 // A7 (Slice F, 2026-05-07): per-tier slice fire is wrapped in
2414 // `catch_unwind`. The sink is installed in `subscribers` BEFORE
2415 // the handshake fires (load-bearing — concurrent threads observe
2416 // the sink immediately). If a sink panics on tier N, the panic
2417 // would otherwise unwind out of `subscribe` BEFORE the
2418 // `Subscription` handle is constructed, leaving the sink
2419 // registered in `subscribers` with no user-held handle to drop.
2420 // Subsequent waves' `flush_notifications` would re-fire the
2421 // panicking sink forever.
2422 //
2423 // On panic: remove the sink from `subscribers` (via the
2424 // already-allocated `sub_id`), drop `_wave_guard` cleanly via
2425 // RAII, and resume the unwind so the user observes the panic at
2426 // the call site. Same effect as the user dropping the
2427 // `Subscription` immediately, but pre-emptive.
2428 for slice in &tier_slices {
2429 let sink_clone = sink.clone();
2430 let slice_ref: &[Message] = slice;
2431 let result = catch_unwind(AssertUnwindSafe(|| sink_clone(slice_ref)));
2432 if let Err(panic_payload) = result {
2433 // Remove the orphaned sink. Best-effort: if the node was
2434 // since torn down (e.g., the sink itself called teardown
2435 // before panicking), the entry may already be gone.
2436 {
2437 let mut s = self.lock_state();
2438 if let Some(rec) = s.nodes.get_mut(&node_id) {
2439 rec.subscribers.remove(&sub_id);
2440 // Slice X4 / D2: keep revision-tracked snapshot
2441 // discipline consistent with the install site —
2442 // any pending_notify entry that already absorbed
2443 // the panicking sink under the post-install
2444 // revision should start a fresh batch on its
2445 // next queue_notify push.
2446 rec.subscribers_revision = rec.subscribers_revision.wrapping_add(1);
2447 }
2448 }
2449 std::panic::resume_unwind(panic_payload);
2450 }
2451 }
2452
2453 // Run activation if needed. `run_wave` re-acquires `wave_owner`
2454 // reentrantly + manages its own state-lock acquisition.
2455 if needs_activation {
2456 self.run_wave(|this| {
2457 let mut s = this.lock_state();
2458 this.activate_derived(&mut s, node_id);
2459 });
2460 }
2461
2462 Subscription {
2463 state: Arc::downgrade(&self.state),
2464 node_id,
2465 sub_id,
2466 }
2467 // _wave_guard drops here, releasing wave_owner.
2468 }
2469
2470 /// Mark `node_id` as resubscribable per R2.2.7. Resubscribable nodes
2471 /// reset their terminal-lifecycle state on a fresh subscribe — see
2472 /// [`Self::subscribe`].
2473 ///
2474 /// Configuration call — must be made before the node has any active
2475 /// subscribers, since changing the policy mid-flight would surprise
2476 /// existing observers.
2477 ///
2478 /// # Panics
2479 ///
2480 /// Panics if the node has subscribers (the policy is observable
2481 /// behavior; changing it after the fact would change semantics for
2482 /// existing sinks).
2483 pub fn set_resubscribable(&self, node_id: NodeId, resubscribable: bool) {
2484 let mut s = self.lock_state();
2485 let rec = s.require_node_mut(node_id);
2486 assert!(
2487 rec.subscribers.is_empty(),
2488 "set_resubscribable: node already has subscribers; \
2489 configure resubscribable before any subscribe call"
2490 );
2491 rec.resubscribable = resubscribable;
2492 }
2493
2494 /// Reset a resubscribable node's terminal-lifecycle state. Called from
2495 /// `subscribe` when a late subscriber arrives at a flagged node.
2496 ///
2497 /// Released: terminal-slot retain (Error handle), all per-dep terminal
2498 /// retains (Error handles), all data_batch retains.
2499 /// Cleared: `terminal`, `has_fired_once`, `has_received_teardown`, all
2500 /// dep_records to sentinel, the pause lockset (any held locks are
2501 /// released — replay buffer drops silently because there are no
2502 /// subscribers to flush to).
2503 fn reset_for_fresh_lifecycle(&self, s: &mut CoreState, node_id: NodeId) {
2504 // Phase 1: collect wave-state handle releases + take the old
2505 // op_scratch + reset other state. Take all mutations under one
2506 // borrow so the post-borrow phases don't re-walk dep_records.
2507 let (prev_op, mut old_scratch, handles_to_release, pause_buffer_payloads) = {
2508 let rec = s.require_node_mut(node_id);
2509 let mut hs = Vec::new();
2510 if let Some(TerminalKind::Error(h)) = rec.terminal {
2511 hs.push(h);
2512 }
2513 for dr in &rec.dep_records {
2514 if let Some(TerminalKind::Error(h)) = dr.terminal {
2515 hs.push(h);
2516 }
2517 for &h in &dr.data_batch {
2518 hs.push(h);
2519 }
2520 // Slice C-3 /qa: also release `prev_data`. Prior to this
2521 // collection, `reset_for_fresh_lifecycle` overwrote
2522 // `dr.prev_data = NO_HANDLE` without releasing the old
2523 // handle, leaking one share per dep per resubscribable
2524 // cycle. The leak was masked because no test exercised
2525 // the per-dep `prev_data` retain across a lifecycle
2526 // reset; surfaced by the T1 tightening of
2527 // `last_releases_buffered_latest_on_lifecycle_reset`.
2528 if dr.prev_data != NO_HANDLE {
2529 hs.push(dr.prev_data);
2530 }
2531 }
2532 // Take pause_state's buffer; collect its payload handles for
2533 // release (they were retained at queue_notify time; buffer
2534 // drops because the new subscriber starts fresh).
2535 let mut pulled = Vec::new();
2536 if let PauseState::Paused { ref mut buffer, .. } = rec.pause_state {
2537 for msg in buffer.drain(..) {
2538 if let Some(h) = msg.payload_handle() {
2539 pulled.push(h);
2540 }
2541 }
2542 }
2543 // Slice E1: drain the replay buffer too — the new subscriber
2544 // gets a fresh lifecycle and shouldn't see prior emissions.
2545 for h in rec.replay_buffer.drain(..) {
2546 pulled.push(h);
2547 }
2548 // Reset wave / lifecycle state.
2549 rec.terminal = None;
2550 rec.has_fired_once = rec.cache != NO_HANDLE && rec.is_state();
2551 rec.has_received_teardown = false;
2552 for dr in &mut rec.dep_records {
2553 dr.prev_data = NO_HANDLE;
2554 dr.data_batch.clear();
2555 dr.terminal = None;
2556 dr.dirty = false;
2557 dr.involved_this_wave = false;
2558 }
2559 rec.pause_state = PauseState::Active;
2560 rec.involved_this_wave = false;
2561 rec.dirty = false;
2562 // P7 (Slice A close /qa): Dynamic nodes clear `tracked` so
2563 // the post-reset first fire repopulates from the fn's
2564 // returned tracked-deps set.
2565 if rec.is_dynamic {
2566 rec.tracked.clear();
2567 }
2568 // Take the old scratch out so we can release its handles and
2569 // install a fresh one. Operator op is copied for the
2570 // rebuild step below.
2571 let prev_op = rec.op;
2572 let old = std::mem::take(&mut rec.op_scratch);
2573 (prev_op, old, hs, pulled)
2574 };
2575
2576 // Phase 2 (Slice C-3 /qa P1 — RETAIN-BEFORE-RELEASE ordering):
2577 // build the fresh scratch FIRST, taking new retains on any
2578 // seed/default handles. This must run BEFORE Phase 3 releases
2579 // the old scratch's shares — if old `acc` (Scan/Reduce) or old
2580 // `latest` (Last) aliases the new `seed`/`default` (common:
2581 // `fold(seed, x) == seed` interns to the same registry entry),
2582 // releasing the old share first could collapse the binding's
2583 // registry slot to zero (production bindings remove the value
2584 // entry on refcount-zero — see `tests/common/mod.rs:191-204`),
2585 // and a subsequent `retain_handle` on the new seed would bump a
2586 // refcount on a slot whose value has been removed. By taking
2587 // the new retains first, we floor the refcount at ≥1 before
2588 // any release happens.
2589 let new_scratch = match prev_op {
2590 // Slice H: the OperatorOp stored on NodeRecord previously
2591 // passed `make_op_scratch` validation at registration time
2592 // (RegisterError::OperatorSeedSentinel for Scan/Reduce
2593 // sentinel seeds; Last { default: NO_HANDLE } is accepted
2594 // and never errors). Re-running it here on the same op
2595 // value is structurally guaranteed to succeed.
2596 Some(op) => self
2597 .make_op_scratch(op)
2598 .expect("invariant: stored OperatorOp passed make_op_scratch validation at registration time"),
2599 None => None,
2600 };
2601
2602 // Phase 3: NOW release handles owned by the old op_scratch
2603 // (Scan/Reduce acc, Distinct/Pairwise prev, Last latest +
2604 // default). Safe per Phase 2's retain-first floor. The boxed
2605 // value is consumed and dropped after.
2606 if let Some(scratch) = old_scratch.as_mut() {
2607 scratch.release_handles(&*self.binding);
2608 }
2609 drop(old_scratch);
2610
2611 // Phase 4: install the fresh scratch.
2612 {
2613 let rec = s.require_node_mut(node_id);
2614 rec.op_scratch = new_scratch;
2615 }
2616
2617 // Phase 5: release wave-state handles collected in phase 1.
2618 for h in handles_to_release {
2619 self.binding.release_handle(h);
2620 }
2621 for h in pause_buffer_payloads {
2622 self.binding.release_handle(h);
2623 }
2624 }
2625
2626 /// Activate `root` and any transitive uncached compute deps so their
2627 /// caches fill our dep_handles slots.
2628 ///
2629 /// Slice A close (M1): pure dep-walk + dep_handles population +
2630 /// pending_fires queueing. No `in_tick` management or `drain_and_flush`
2631 /// call — the outer caller (typically `Core::subscribe` via
2632 /// [`Core::run_wave`]) owns the wave lifecycle and drains lock-released
2633 /// around `invoke_fn`.
2634 ///
2635 /// Walk shape:
2636 /// 1. **Discover phase (DFS via Vec stack):** starting at `root`,
2637 /// walk transitively-needing-activation deps via the `deps`
2638 /// chain. Build an ordering where each node appears AFTER all
2639 /// of its uncached compute deps — i.e., reverse topological
2640 /// among the visited subgraph.
2641 /// 2. **Deliver phase (forward iteration):** for each visited
2642 /// node in dep-first order, push deps' caches into the node's
2643 /// `dep_handles` slots. Caches that were sentinel pre-walk are
2644 /// filled because their parent's fn fires later in the wave's
2645 /// drain loop and `commit_emission` propagates new caches forward
2646 /// via `deliver_data_to_consumer` — the same path this method
2647 /// uses for the initial seed. Adds the node to `pending_fires`
2648 /// if its tracked-deps gate is satisfied; the wave-engine drain
2649 /// fires the fn lock-released around `invoke_fn`.
2650 pub(crate) fn activate_derived(&self, s: &mut CoreState, root: NodeId) {
2651 // Phase 1: discover. DFS to collect every compute node reachable
2652 // via deps that doesn't yet have a cache and hasn't fired.
2653 // Record them in dep-first (post-order) so phase 2 can deliver
2654 // caches forward. Frame is `(node_id, finalize)` — `finalize=false`
2655 // means "first visit: push deps then re-push self with finalize=true";
2656 // `finalize=true` means "deps have been expanded, append self to
2657 // `order`."
2658 let mut visited: HashSet<NodeId> = HashSet::new();
2659 let mut order: Vec<NodeId> = Vec::new();
2660 let mut stack: Vec<(NodeId, bool)> = vec![(root, false)];
2661 while let Some((id, finalize)) = stack.pop() {
2662 if finalize {
2663 order.push(id);
2664 continue;
2665 }
2666 if !visited.insert(id) {
2667 continue;
2668 }
2669 stack.push((id, true));
2670 let dep_ids: Vec<NodeId> = s.require_node(id).dep_ids_vec();
2671 for dep_id in dep_ids {
2672 let (dep_is_state, dep_cache, dep_has_fired) = {
2673 let dep_rec = s.require_node(dep_id);
2674 (dep_rec.is_state(), dep_rec.cache, dep_rec.has_fired_once)
2675 };
2676 if !dep_is_state
2677 && dep_cache == NO_HANDLE
2678 && !dep_has_fired
2679 && !visited.contains(&dep_id)
2680 {
2681 stack.push((dep_id, false));
2682 }
2683 }
2684 }
2685
2686 // Phase 2: deliver caches in dep-first order. For each node, walk
2687 // its deps and call `deliver_data_to_consumer` for any with caches.
2688 // Producer nodes (no deps + has fn — Slice D, D031) have no deps
2689 // to walk; queue them directly into `pending_fires` so the wave
2690 // engine fires their fn once on activation.
2691 for &id in &order {
2692 let (dep_ids, is_producer) = {
2693 let rec = s.require_node(id);
2694 (rec.dep_ids_vec(), rec.is_producer())
2695 };
2696 if is_producer {
2697 s.pending_fires.insert(id);
2698 continue;
2699 }
2700 for (i, dep_id) in dep_ids.iter().copied().enumerate() {
2701 let dep_cache = s.require_node(dep_id).cache;
2702 if dep_cache != NO_HANDLE {
2703 self.deliver_data_to_consumer(s, id, i, dep_cache);
2704 }
2705 }
2706 }
2707 }
2708
2709 // -------------------------------------------------------------------
2710 // Emission entry point
2711 // -------------------------------------------------------------------
2712
2713 /// Set a state node's value. Triggers a wave (DIRTY → DATA/RESOLVED →
2714 /// fn fires for downstream).
2715 ///
2716 /// Silent no-op if the node has already terminated (R1.3.4). The handle
2717 /// passed in is still released by the caller's binding-side intern path
2718 /// — no implicit retain is consumed when the call short-circuits.
2719 ///
2720 /// # Panics
2721 ///
2722 /// Panics if `node_id` is not a state node, or if `new_handle` is
2723 /// [`NO_HANDLE`] (per R1.2.4, sentinel is not a valid DATA payload).
2724 pub fn emit(&self, node_id: NodeId, new_handle: HandleId) {
2725 assert!(
2726 new_handle != NO_HANDLE,
2727 "NO_HANDLE is not a valid DATA payload (R1.2.4)"
2728 );
2729 // Validate + terminal short-circuit under a brief lock.
2730 //
2731 // emit() is valid for State and Producer nodes — both are
2732 // intrinsic sources whose values are not derived from declared
2733 // deps. State nodes get emit() from user code; Producer nodes
2734 // get emit() from sink callbacks the producer's build closure
2735 // registered (sink fires → re-enter Core → emit on self).
2736 // Derived / Dynamic / Operator nodes emit via their fn return
2737 // value through fire_fn / fire_operator, NOT via emit().
2738 {
2739 let s = self.lock_state();
2740 let rec = s.require_node(node_id);
2741 assert!(
2742 rec.is_state() || rec.is_producer(),
2743 "emit() is for state or producer nodes only; \
2744 derived/dynamic/operator emit via their fn return value"
2745 );
2746 if rec.terminal.is_some() {
2747 drop(s);
2748 // Caller's intern share would otherwise leak; cache slot
2749 // ownership doesn't transfer because we're not advancing
2750 // cache. Released lock-released so the binding can't
2751 // deadlock against an internal binding mutex.
2752 self.binding.release_handle(new_handle);
2753 return;
2754 }
2755 }
2756 // Run wave — `run_wave` and `commit_emission` manage their own
2757 // locking; binding callbacks (custom_equals, sinks) fire lock-
2758 // released.
2759 self.run_wave(|this| {
2760 this.commit_emission(node_id, new_handle);
2761 });
2762 }
2763
2764 /// Read a node's current cache. Returns [`NO_HANDLE`] if sentinel.
2765 #[must_use]
2766 pub fn cache_of(&self, node_id: NodeId) -> HandleId {
2767 self.lock_state().require_node(node_id).cache
2768 }
2769
2770 /// Whether the node's fn has fired at least once (compute) OR it has had
2771 /// a non-sentinel value (state).
2772 #[must_use]
2773 pub fn has_fired_once(&self, node_id: NodeId) -> bool {
2774 self.lock_state().require_node(node_id).has_fired_once
2775 }
2776
2777 // -------------------------------------------------------------------
2778 // Read-side inspection helpers (Slice E+, M2)
2779 //
2780 // Non-panicking accessors for graph-layer introspection (`describe()`,
2781 // `observe()`, `node_count()`). All five return Option/empty for
2782 // unknown ids — they're meant to back walks over `node_ids()` where
2783 // the caller already knows the ids are valid, plus debugging /
2784 // dry-run probes that prefer "absence" over "panic".
2785 //
2786 // Keep these strictly read-only: no wave entry, no binding callbacks,
2787 // no lock release. Each takes the state lock once, copies a small
2788 // value, and drops the lock.
2789 // -------------------------------------------------------------------
2790
2791 /// Snapshot of every registered `NodeId` in unspecified order. The
2792 /// order matches `HashMap` iteration over the internal node table —
2793 /// callers that need stable ordering should track names at the
2794 /// `Graph` layer (canonical spec §3.5 namespace).
2795 #[must_use]
2796 pub fn node_ids(&self) -> Vec<NodeId> {
2797 self.lock_state().nodes.keys().copied().collect()
2798 }
2799
2800 /// Total number of nodes registered in this Core.
2801 #[must_use]
2802 pub fn node_count(&self) -> usize {
2803 self.lock_state().nodes.len()
2804 }
2805
2806 /// Returns `Some(kind)` for known nodes, `None` for unknown. Kind is
2807 /// **derived** from the field shape per D030 — see [`NodeKind`].
2808 #[must_use]
2809 pub fn kind_of(&self, node_id: NodeId) -> Option<NodeKind> {
2810 self.lock_state().nodes.get(&node_id).map(NodeRecord::kind)
2811 }
2812
2813 /// Snapshot of the node's deps in declaration order. Empty for
2814 /// unknown nodes or for state nodes (which have no deps).
2815 #[must_use]
2816 pub fn deps_of(&self, node_id: NodeId) -> Vec<NodeId> {
2817 self.lock_state()
2818 .nodes
2819 .get(&node_id)
2820 .map(NodeRecord::dep_ids_vec)
2821 .unwrap_or_default()
2822 }
2823
2824 /// Returns `Some(kind)` if the node has terminated (R1.3.4) — the
2825 /// pair `Some(Complete)` / `Some(Error(h))` mirrors the wire message
2826 /// the node emitted. `None` for live nodes or unknown ids.
2827 #[must_use]
2828 pub fn is_terminal(&self, node_id: NodeId) -> Option<TerminalKind> {
2829 self.lock_state()
2830 .nodes
2831 .get(&node_id)
2832 .and_then(|r| r.terminal)
2833 }
2834
2835 /// Whether the node has wave-scoped DIRTY pending (a tier-1 message
2836 /// queued but the matching tier-3 settle has not yet flushed).
2837 /// `false` for unknown ids. Mostly useful for `describe()` status
2838 /// classification (R3.6.1 `"dirty"`).
2839 #[must_use]
2840 pub fn is_dirty(&self, node_id: NodeId) -> bool {
2841 self.lock_state()
2842 .nodes
2843 .get(&node_id)
2844 .is_some_and(|r| r.dirty)
2845 }
2846
2847 /// Snapshot of `parent`'s meta companion list (R1.3.9.d / R2.3.3 —
2848 /// the companions added via [`Self::add_meta_companion`]). Empty
2849 /// for unknown ids or for nodes with no companions registered.
2850 ///
2851 /// Used by the graph layer's `signal_invalidate` to filter meta
2852 /// children out of the broadcast (canonical R3.7.2 — meta caches
2853 /// are preserved across graph-wide INVALIDATE).
2854 #[must_use]
2855 pub fn meta_companions_of(&self, parent: NodeId) -> Vec<NodeId> {
2856 self.lock_state()
2857 .nodes
2858 .get(&parent)
2859 .map(|r| r.meta_companions.clone())
2860 .unwrap_or_default()
2861 }
2862
2863 // -------------------------------------------------------------------
2864 // Wave engine — lives in `crate::batch` (Slice C-1 module split;
2865 // Slice A close M1 refactor lifted the binding-callback re-entrance
2866 // restrictions). The methods are still on `Core`; see `batch.rs` for:
2867 //
2868 // - `run_wave` — wave entry, manages own locking.
2869 // - `drain_and_flush` — drain phase, lock-released around invoke_fn.
2870 // - `commit_emission` — lock-released around custom_equals.
2871 // - `pick_next_fire`, `deliver_data_to_consumer`, `queue_notify`,
2872 // `flush_notifications` — wave-engine helpers.
2873 // -------------------------------------------------------------------
2874}
2875
2876// -----------------------------------------------------------------------
2877// COMPLETE / ERROR — terminal lifecycle + auto-cascade gating
2878// -----------------------------------------------------------------------
2879
2880impl Core {
2881 /// Emit `[COMPLETE]` (R1.3.4) on `node_id`, marking it terminal. After
2882 /// this call:
2883 ///
2884 /// - Subsequent `Core::emit` on this node is a silent no-op (idempotent
2885 /// termination).
2886 /// - The node's fn no longer fires.
2887 /// - The node's cache is preserved (last value still observable via
2888 /// `cache_of`).
2889 /// - Children receive `[COMPLETE]` (tier 5 — bypasses pause buffer).
2890 /// - Auto-cascade gating (Lock 2.B): each child that has all of its
2891 /// deps in a terminal state auto-emits its own `[COMPLETE]`. ERROR
2892 /// dominates COMPLETE — if any of a child's deps emitted ERROR, the
2893 /// child auto-cascades that ERROR instead.
2894 ///
2895 /// Idempotent: calling `complete` on an already-terminal node is a no-op.
2896 ///
2897 /// # Panics
2898 ///
2899 /// Panics if `node_id` is unknown.
2900 pub fn complete(&self, node_id: NodeId) {
2901 self.emit_terminal(node_id, TerminalKind::Complete);
2902 }
2903
2904 /// Emit `[ERROR, error_handle]` (R1.3.4) on `node_id`. `error_handle`
2905 /// must resolve to a non-sentinel value (R1.2.5) — the binding side has
2906 /// already interned the error value before this call. Same lifecycle
2907 /// effects as [`Self::complete`]; ERROR dominates COMPLETE in auto-
2908 /// cascade gating.
2909 ///
2910 /// # Panics
2911 ///
2912 /// Panics if `node_id` is unknown or `error_handle == NO_HANDLE`.
2913 pub fn error(&self, node_id: NodeId, error_handle: HandleId) {
2914 assert!(
2915 error_handle != NO_HANDLE,
2916 "NO_HANDLE is not a valid ERROR payload (R1.2.5)"
2917 );
2918 self.emit_terminal(node_id, TerminalKind::Error(error_handle));
2919 // The caller's intern share for `error_handle` is NOT transferred
2920 // to any slot — `terminate_node` takes its OWN retain for every
2921 // populated `terminal` and `dep_terminals` slot. Release the
2922 // caller's share here (mirrors `Core::emit`'s short-circuit
2923 // release on terminal). Without this, every `error()` call leaks
2924 // one binding-side handle ref. Slice A-bigger /qa item D fix.
2925 self.binding.release_handle(error_handle);
2926 }
2927
2928 fn emit_terminal(&self, node_id: NodeId, terminal: TerminalKind) {
2929 {
2930 let s = self.lock_state();
2931 assert!(s.nodes.contains_key(&node_id), "unknown node {node_id:?}");
2932 }
2933 // Wave runs with `run_wave` orchestrating drain. The thunk acquires
2934 // its own lock to queue the cascade (terminate_node is a fast
2935 // structural walk; no binding callbacks beyond non-re-entrant
2936 // retain/release).
2937 self.run_wave(|this| {
2938 let mut s = this.lock_state();
2939 this.terminate_node(&mut s, node_id, terminal);
2940 });
2941 }
2942
2943 /// Set the node's terminal slot, queue the wire message, and cascade to
2944 /// children. Idempotent on already-terminal node (no-op).
2945 ///
2946 /// Iterative implementation (Slice A-bigger, M1-close): a work-queue
2947 /// drives the cascade so deep linear chains don't overflow the OS
2948 /// thread stack. Mirrors `path_from_to`'s explicit-stack pattern.
2949 fn terminate_node(&self, s: &mut CoreState, node_id: NodeId, terminal: TerminalKind) {
2950 let mut work: Vec<(NodeId, TerminalKind)> = vec![(node_id, terminal)];
2951 while let Some((id, t)) = work.pop() {
2952 if s.require_node(id).terminal.is_some() {
2953 continue; // Idempotent — already terminal.
2954 }
2955 // Take a refcount share for the terminal slot so the error
2956 // handle outlives the binding-side intern's transient share.
2957 if let TerminalKind::Error(h) = t {
2958 self.binding.retain_handle(h);
2959 }
2960 // Slice E2 /qa Q2(b) (D069): if a resubscribable node is
2961 // terminating with no live subscribers, queue eager
2962 // `wipe_ctx` for the wave's lock-released drain. This is the
2963 // mutually-exclusive complement of the `Subscription::Drop`
2964 // wipe site: when the LAST sub drops first then terminate
2965 // fires, subs are empty here and we queue; when terminate
2966 // fires WITH subs still alive, we DON'T queue (subs not
2967 // empty), and `Subscription::Drop` will fire wipe directly
2968 // when those subs eventually drop. Either way, exactly one
2969 // wipe fires per terminal lifecycle.
2970 let queue_wipe = {
2971 let rec = s.require_node(id);
2972 rec.resubscribable && rec.subscribers.is_empty()
2973 };
2974 s.require_node_mut(id).terminal = Some(t);
2975 if queue_wipe {
2976 s.pending_wipes.push(id);
2977 }
2978 // Drain pending fires for this node — fn won't fire on a
2979 // terminal node.
2980 s.pending_fires.remove(&id);
2981 // R1.3.8.b / Slice F (A3, 2026-05-07): if this node was paused
2982 // when terminating (the canonical case is the R1.3.8.c overflow
2983 // ERROR synthesis path), drain the pause buffer and release
2984 // each payload's queue_notify-time retain. Without this, the
2985 // buffer leaks one share per buffered DATA/RESOLVED/INVALIDATE.
2986 // Subscribers receive the terminal directly via the cascade
2987 // below (tier-5 bypasses the pause buffer); the buffered
2988 // content is moot post-terminal.
2989 let drained: Vec<HandleId> = {
2990 let rec = s.require_node_mut(id);
2991 let mut drained: Vec<HandleId> = Vec::new();
2992 if rec.pause_state.is_paused() {
2993 // Take the buffered messages out, then collapse the
2994 // pause state to Active so subsequent code observes a
2995 // clean lifecycle. Idempotent on Active (no-op).
2996 let prev = std::mem::replace(&mut rec.pause_state, PauseState::Active);
2997 if let PauseState::Paused { buffer, .. } = prev {
2998 drained.extend(buffer.into_iter().filter_map(Message::payload_handle));
2999 }
3000 }
3001 // QA A4 (2026-05-07): drain replay buffer on terminate. A
3002 // non-resubscribable terminal ends the lifecycle; without
3003 // this drain the buffer's retains leak until `Drop for
3004 // CoreState`. Resubscribable nodes' replay buffers are
3005 // also drained (when they're hit by a terminal cascade);
3006 // a fresh subscribe rebuilds the buffer from scratch as
3007 // part of `reset_for_fresh_lifecycle`.
3008 drained.extend(rec.replay_buffer.drain(..));
3009 drained
3010 };
3011 for h in drained {
3012 self.binding.release_handle(h);
3013 }
3014 // Queue the wire message (tier 5 — bypasses pause buffer).
3015 let msg = match t {
3016 TerminalKind::Complete => Message::Complete,
3017 TerminalKind::Error(h) => Message::Error(h),
3018 };
3019 self.queue_notify(s, id, msg);
3020 // Cascade to children.
3021 let child_ids: Vec<NodeId> = s
3022 .children
3023 .get(&id)
3024 .map(|c| c.iter().copied().collect())
3025 .unwrap_or_default();
3026 for child_id in child_ids {
3027 let dep_idx = s.require_node(child_id).dep_index_of(id);
3028 let Some(idx) = dep_idx else { continue };
3029 // Mark this child's per-dep terminal slot. Take a retain on
3030 // the error handle for the slot share.
3031 {
3032 let child = s.require_node_mut(child_id);
3033 if child.dep_records[idx].terminal.is_some() {
3034 // Idempotent — child already saw this dep terminate.
3035 continue;
3036 }
3037 child.dep_records[idx].terminal = Some(t);
3038 }
3039 if let TerminalKind::Error(h) = t {
3040 self.binding.retain_handle(h);
3041 }
3042 // Auto-cascade gating: if all deps now terminal, push child
3043 // onto the work queue with the chosen terminal.
3044 //
3045 // Slice C-1: kinds that opt out of Lock 2.B (currently
3046 // `Operator(Reduce)`) intercept upstream COMPLETE so they
3047 // can emit their accumulator before terminating. Instead of
3048 // cascading, queue the child for fn-fire — `fire_operator`
3049 // sees `dep_records[0].terminal` set and emits the
3050 // appropriate batch (Data(acc) + Complete on COMPLETE,
3051 // Error(h) on ERROR).
3052 let action = {
3053 let child = s.require_node(child_id);
3054 if child.terminal.is_some() {
3055 ChildAction::None // Already terminated — no-op.
3056 } else if child.all_deps_terminal() {
3057 if child.skips_auto_cascade() {
3058 ChildAction::QueueFire
3059 } else {
3060 ChildAction::Cascade(pick_cascade_terminal(&child.dep_records))
3061 }
3062 } else {
3063 ChildAction::None
3064 }
3065 };
3066 match action {
3067 ChildAction::None => {}
3068 ChildAction::Cascade(t_child) => {
3069 work.push((child_id, t_child));
3070 }
3071 ChildAction::QueueFire => {
3072 s.pending_fires.insert(child_id);
3073 }
3074 }
3075 }
3076 }
3077 }
3078}
3079
3080/// Outcome of Lock 2.B child gating in `terminate_node`'s cascade walk.
3081enum ChildAction {
3082 /// No cascade; child is already terminal or not yet all-deps-terminal.
3083 None,
3084 /// Auto-cascade with the picked terminal kind (ERROR dominates COMPLETE).
3085 Cascade(TerminalKind),
3086 /// Queue child for fn-fire instead of cascading. Used by operator
3087 /// kinds that intercept upstream terminal (Operator(Reduce)).
3088 QueueFire,
3089}
3090
3091/// Lock 2.B cascade-terminal selection: ERROR dominates COMPLETE; first
3092/// ERROR seen wins. Caller has already verified all deps are terminal.
3093fn pick_cascade_terminal(dep_records: &[DepRecord]) -> TerminalKind {
3094 for dr in dep_records {
3095 if let Some(TerminalKind::Error(h)) = dr.terminal {
3096 return TerminalKind::Error(h);
3097 }
3098 }
3099 TerminalKind::Complete
3100}
3101
3102// -----------------------------------------------------------------------
3103// TEARDOWN — destruction, with auto-COMPLETE prepend (R2.6.4 / Lock 6.F)
3104// -----------------------------------------------------------------------
3105
3106impl Core {
3107 /// Tear `node_id` down. Per R2.6.4 / Lock 6.F:
3108 ///
3109 /// - **Auto-prepend COMPLETE.** If the node has not yet emitted a
3110 /// terminal (`COMPLETE` / `ERROR`), `terminate_node` is called with
3111 /// `Complete` first so subscribers see `[COMPLETE, TEARDOWN]`, not
3112 /// bare `[TEARDOWN]`. This guarantees a clean end-of-stream signal
3113 /// to async iterators and other consumers that wait on terminal
3114 /// delivery.
3115 /// - **Idempotent on duplicate delivery.** The per-node
3116 /// `has_received_teardown` flag is set on the first call; subsequent
3117 /// `teardown` calls (or cascade visits from other paths) are silent
3118 /// no-ops — no second `[COMPLETE, TEARDOWN]` pair to subscribers.
3119 /// - **Cascade downstream.** Each child is recursively torn down. The
3120 /// child's own COMPLETE auto-cascades from `terminate_node`'s logic
3121 /// (Lock 2.B); its TEARDOWN comes from this cascade.
3122 ///
3123 /// # Panics
3124 ///
3125 /// Panics if `node_id` is unknown.
3126 pub fn teardown(&self, node_id: NodeId) {
3127 {
3128 let s = self.lock_state();
3129 assert!(s.nodes.contains_key(&node_id), "unknown node {node_id:?}");
3130 }
3131 let torn_down: Arc<Mutex<Vec<NodeId>>> = Arc::new(Mutex::new(Vec::new()));
3132 let torn_down_for_wave = torn_down.clone();
3133 self.run_wave(move |this| {
3134 let mut s = this.lock_state();
3135 let collected = this.teardown_inner(&mut s, node_id);
3136 torn_down_for_wave.lock().extend(collected);
3137 });
3138 // Fire NodeTornDown for every cascaded id (root + metas +
3139 // downstream consumers that auto-cascaded). Outside the state
3140 // lock, matching fire_topology_event discipline.
3141 let ids = std::mem::take(&mut *torn_down.lock());
3142 for id in ids {
3143 self.fire_topology_event(&crate::topology::TopologyEvent::NodeTornDown(id));
3144 }
3145 }
3146
3147 /// Iterative teardown walk (Slice A-bigger, M1-close).
3148 ///
3149 /// The recursive shape was:
3150 /// ```text
3151 /// teardown(n):
3152 /// if torn_down: return
3153 /// mark torn_down
3154 /// for meta in metas: teardown(meta)
3155 /// terminate_node + queue Teardown
3156 /// for child in children: teardown(child)
3157 /// ```
3158 /// Deep linear chains (~10k nodes) overflowed the OS thread stack.
3159 ///
3160 /// The iterative shape uses a `Vec<Action>` stack with `Visit` and
3161 /// `EmitTeardown` actions. `Visit(n)` marks `n` torn-down (or no-ops
3162 /// if already), then pushes (in reverse order so LIFO pops in forward
3163 /// order) `Visit(child_K), …, Visit(child_1), EmitTeardown(n),
3164 /// Visit(meta_M), …, Visit(meta_1)`. The R1.3.9.d "metas first, then
3165 /// self, then children" ordering is preserved by the push order:
3166 /// metas pop first, recursively expand and emit; then `EmitTeardown(n)`
3167 /// pops and runs `terminate_node` + queue `Teardown`; then children
3168 /// pop. Idempotency via `has_received_teardown` keeps each node
3169 /// visited at most once even when multi-parent diamonds re-enter via
3170 /// a sibling path.
3171 fn teardown_inner(&self, s: &mut CoreState, root: NodeId) -> Vec<NodeId> {
3172 enum Action {
3173 Visit(NodeId),
3174 EmitTeardown(NodeId),
3175 }
3176 let mut stack: Vec<Action> = vec![Action::Visit(root)];
3177 // Topology accumulator: every node that actually emits TEARDOWN
3178 // (i.e. each `EmitTeardown(id)` site, NOT each `Visit` — visits
3179 // for already-torn-down nodes short-circuit on idempotency).
3180 let mut torn_down: Vec<NodeId> = Vec::new();
3181 while let Some(action) = stack.pop() {
3182 match action {
3183 Action::Visit(id) => {
3184 if s.require_node(id).has_received_teardown {
3185 continue; // Idempotent (R2.6.4).
3186 }
3187 s.require_node_mut(id).has_received_teardown = true;
3188 // Push order: children first (pop LAST), then
3189 // EmitTeardown(id), then metas (pop FIRST). Reverse
3190 // each list so within-group order matches the original
3191 // recursive iteration.
3192 let children: Vec<NodeId> = s
3193 .children
3194 .get(&id)
3195 .map(|c| c.iter().copied().collect())
3196 .unwrap_or_default();
3197 for &child in children.iter().rev() {
3198 stack.push(Action::Visit(child));
3199 }
3200 stack.push(Action::EmitTeardown(id));
3201 let metas: Vec<NodeId> = s.require_node(id).meta_companions.clone();
3202 for &meta in metas.iter().rev() {
3203 stack.push(Action::Visit(meta));
3204 }
3205 }
3206 Action::EmitTeardown(id) => {
3207 // Auto-prepend COMPLETE if not yet terminal. The (now
3208 // iterative) terminate_node handles auto-cascade to
3209 // children's own terminal slots per Lock 2.B.
3210 let already_terminal = s.require_node(id).terminal.is_some();
3211 if !already_terminal {
3212 self.terminate_node(s, id, TerminalKind::Complete);
3213 }
3214 // Wire emission of the TEARDOWN itself (tier 6).
3215 self.queue_notify(s, id, Message::Teardown);
3216 torn_down.push(id);
3217 }
3218 }
3219 }
3220 torn_down
3221 }
3222
3223 /// Attach `companion` as a meta companion of `parent` per R1.3.9.d.
3224 /// Meta companions are nodes whose lifecycle is bound to the parent's
3225 /// in TEARDOWN ordering: when `parent` tears down, `companion` tears
3226 /// down first.
3227 ///
3228 /// Use this for inspection / audit / sidecar nodes that subscribe to
3229 /// parent state — without the ordering, the companion could observe
3230 /// the parent mid-destruction and emit garbage.
3231 ///
3232 /// Idempotent on duplicate registration of the same companion.
3233 ///
3234 /// # Lifecycle constraint
3235 ///
3236 /// Intended for **setup-time** wiring — call this before `parent` or
3237 /// `companion` enters a wave. Mid-wave registration (especially during
3238 /// a teardown cascade in flight) is implementation-defined: the new
3239 /// edge takes effect on the *next* wave. Adding a companion to a
3240 /// torn-down parent silently no-ops (the parent will not tear down
3241 /// again). For dynamic companion attachment with deterministic
3242 /// ordering, prefer constructing the wiring before subscribers exist.
3243 ///
3244 /// # Panics
3245 ///
3246 /// Panics if either node id is unknown, or if `parent == companion`
3247 /// (a node cannot be its own meta companion — would loop on TEARDOWN).
3248 pub fn add_meta_companion(&self, parent: NodeId, companion: NodeId) {
3249 assert!(parent != companion, "node cannot be its own meta companion");
3250 let mut s = self.lock_state();
3251 assert!(s.nodes.contains_key(&parent), "unknown parent {parent:?}");
3252 assert!(
3253 s.nodes.contains_key(&companion),
3254 "unknown companion {companion:?}"
3255 );
3256 let metas = &mut s.require_node_mut(parent).meta_companions;
3257 if !metas.contains(&companion) {
3258 metas.push(companion);
3259 }
3260 }
3261}
3262
3263// -----------------------------------------------------------------------
3264// INVALIDATE — cache clear + downstream cascade
3265// -----------------------------------------------------------------------
3266
3267impl Core {
3268 /// Clear `node_id`'s cache and cascade `[INVALIDATE]` to downstream
3269 /// dependents per canonical spec §1.4.
3270 ///
3271 /// Semantics:
3272 /// - **Never-populated case (R1.4 line 197):** if `cache == NO_HANDLE`,
3273 /// the call is a no-op — no cache to clear, no INVALIDATE emitted.
3274 /// This naturally provides idempotency within a wave: once a node has
3275 /// been invalidated this wave (cache = NO_HANDLE), a second invalidate
3276 /// on the same node does nothing.
3277 /// - **Cache clear (immediate):** the node's cached handle is dropped
3278 /// (refcount released), `cache` becomes `NO_HANDLE`. State nodes
3279 /// keep `has_fired_once` per spec — INVALIDATE is not a re-gating
3280 /// event (the next emission to a previously-fired state still does
3281 /// not re-trigger the first-run gate; that's a resubscribable-terminal
3282 /// lifecycle concern, separate slice).
3283 /// - **Wire emission (tier 4):** `[INVALIDATE]` is queued via the
3284 /// normal pause-aware notify path. Buffers while paused, flushes
3285 /// immediately otherwise.
3286 /// - **Downstream cascade:** for each child of this node, the child's
3287 /// `dep_handles[idx_of_node]` is reset to `NO_HANDLE` (its previous
3288 /// value referenced a now-released handle). The child is then
3289 /// recursively invalidated (no-op if its cache was already
3290 /// `NO_HANDLE`). This re-closes the child's first-run gate — fn
3291 /// won't fire again until the upstream re-emits a value.
3292 ///
3293 /// Wraps in a fresh wave when called from outside a wave, so
3294 /// notifications flush at the natural wave boundary.
3295 ///
3296 /// # Panics
3297 ///
3298 /// Panics if `node_id` is unknown, consistent with `emit` / `pause`.
3299 pub fn invalidate(&self, node_id: NodeId) {
3300 {
3301 let s = self.lock_state();
3302 assert!(s.nodes.contains_key(&node_id), "unknown node {node_id:?}");
3303 }
3304 self.run_wave(|this| {
3305 let mut s = this.lock_state();
3306 this.invalidate_inner(&mut s, node_id);
3307 });
3308 }
3309
3310 /// Iterative invalidate cascade (Slice A-bigger, M1-close).
3311 ///
3312 /// The recursive shape was a depth-first cache-clear walk:
3313 /// ```text
3314 /// invalidate(n):
3315 /// if cache(n) == NO_HANDLE: return // already-invalidated guard
3316 /// cache(n) = NO_HANDLE; release handle
3317 /// queue Invalidate(n)
3318 /// for child in children:
3319 /// child.dep_handles[idx] = NO_HANDLE
3320 /// invalidate(child)
3321 /// ```
3322 /// Deep linear chains overflowed the OS thread stack. The work-queue
3323 /// rewrite has no ordering subtleties (unlike teardown's R1.3.9.d
3324 /// metas-first constraint) — Invalidate is a tier-4 broadcast where
3325 /// the never-populated / already-invalidated guard provides natural
3326 /// idempotency for diamond fan-in.
3327 fn invalidate_inner(&self, s: &mut CoreState, root: NodeId) {
3328 let mut work: Vec<NodeId> = vec![root];
3329 while let Some(node_id) = work.pop() {
3330 // Never-populated / already-invalidated: no-op (R1.4 idempotency).
3331 // Per R1.3.9.c never-populated case, OnInvalidate cleanup hook
3332 // also does NOT fire — natural fallout of skipping via the
3333 // cache==NO_HANDLE guard (we never reach the queue-push below).
3334 let old_handle = s.require_node(node_id).cache;
3335 if old_handle == NO_HANDLE {
3336 continue;
3337 }
3338 // Clear cache + release the handle's slot ownership.
3339 s.require_node_mut(node_id).cache = NO_HANDLE;
3340 self.binding.release_handle(old_handle);
3341 // Slice E2 (R1.3.9.b strict per D057 + D058 fire-at-cache-clear):
3342 // queue OnInvalidate cleanup hook for lock-released drain at
3343 // wave-end. The dedup set guarantees at-most-once-per-wave-per-
3344 // node firing even if a node re-populates mid-wave (via fn-fire
3345 // emit) and gets re-invalidated through a separate path. Pure
3346 // cache==NO_HANDLE idempotency (above) catches "still at
3347 // sentinel" only; the explicit set is the strict R1.3.9.b
3348 // reading.
3349 if s.invalidate_hooks_fired_this_wave.insert(node_id) {
3350 s.deferred_cleanup_hooks
3351 .push((node_id, CleanupTrigger::OnInvalidate));
3352 }
3353 // Wire emission. Pause-aware via queue_notify.
3354 self.queue_notify(s, node_id, Message::Invalidate);
3355 // Cascade: for each child, clear the dep record's prev_data
3356 // referencing this node and push child onto the work queue.
3357 let child_ids: Vec<NodeId> = s
3358 .children
3359 .get(&node_id)
3360 .map(|c| c.iter().copied().collect())
3361 .unwrap_or_default();
3362 for child_id in child_ids {
3363 let dep_idx = s.require_node(child_id).dep_index_of(node_id);
3364 if let Some(idx) = dep_idx {
3365 // Reset the child's dep record — the handle was just
3366 // released. Subsequent first-run-gate checks see
3367 // sentinel and re-close.
3368 //
3369 // Snapshot prev_data + data_batch retains for deferred
3370 // release, then clear the record. Two-phase to satisfy
3371 // the borrow checker (nodes + deferred_handle_releases
3372 // are separate CoreState fields).
3373 let (old_prev, batch_hs): (HandleId, SmallVec<[HandleId; 1]>) = {
3374 let dr = &s.require_node(child_id).dep_records[idx];
3375 (dr.prev_data, dr.data_batch.clone())
3376 };
3377 if old_prev != NO_HANDLE {
3378 s.deferred_handle_releases.push(old_prev);
3379 }
3380 for h in batch_hs {
3381 s.deferred_handle_releases.push(h);
3382 }
3383 let dr = &mut s.require_node_mut(child_id).dep_records[idx];
3384 dr.prev_data = NO_HANDLE;
3385 dr.data_batch.clear();
3386 work.push(child_id);
3387 }
3388 }
3389 }
3390 }
3391}
3392
3393// -----------------------------------------------------------------------
3394// PAUSE / RESUME — multi-pauser lockset + replay buffer
3395// -----------------------------------------------------------------------
3396
3397/// Reported back from [`Core::resume`] when the final lock releases.
3398///
3399/// `replayed` is the number of tier-3/tier-4 messages dispatched to
3400/// subscribers as part of the drain. `dropped` is the number of messages
3401/// that fell out the front of the buffer due to the Core-global
3402/// `pause_buffer_cap` while this pause cycle was active. A non-zero
3403/// `dropped` indicates a controller held the lock long enough to overflow
3404/// the cap; the binding may want to surface a warning or error.
3405#[derive(Copy, Clone, Debug, PartialEq, Eq)]
3406pub struct ResumeReport {
3407 pub replayed: u32,
3408 pub dropped: u32,
3409}
3410
3411impl Core {
3412 /// Acquire a pause lock on `node_id`. The first lock transitions the
3413 /// node from `Active` to `Paused`; further locks add to the lockset.
3414 /// While paused, tier-3 (DATA/RESOLVED) and tier-4 (INVALIDATE) outgoing
3415 /// messages buffer in the node's pause buffer; other tiers flush
3416 /// immediately.
3417 ///
3418 /// Re-acquiring the same `lock_id` is an idempotent no-op (matches TS
3419 /// convention, R1.2.6 silent on the case).
3420 pub fn pause(&self, node_id: NodeId, lock_id: LockId) -> Result<(), PauseError> {
3421 let mut s = self.lock_state();
3422 let rec = s
3423 .nodes
3424 .get_mut(&node_id)
3425 .ok_or(PauseError::UnknownNode(node_id))?;
3426 // QA A5 (2026-05-07): terminated nodes can't be re-paused. Without
3427 // this check, a stale pause-controller calling pause() on an
3428 // already-terminated node would re-arm `pause_state` to Paused.
3429 // The terminate_node path collapses pause_state → Active and
3430 // drains the buffer (A3-related), but doesn't gate subsequent
3431 // pause() calls. Treat as idempotent no-op (consistent with how
3432 // emit/complete/error early-return on terminal).
3433 if rec.terminal.is_some() {
3434 return Ok(());
3435 }
3436 // Slice F audit close (2026-05-07): `PausableMode::Off` means the
3437 // dispatcher ignores PAUSE for this node — tier-3 flushes
3438 // immediately, fn fires immediately. Treat the call as a successful
3439 // no-op so callers don't need to special-case.
3440 if rec.pausable == PausableMode::Off {
3441 return Ok(());
3442 }
3443 rec.pause_state.add_lock(lock_id);
3444 Ok(())
3445 }
3446
3447 /// Release a pause lock on `node_id`. If the lockset becomes empty, the
3448 /// node transitions back to `Active` and the buffered messages are
3449 /// dispatched to subscribers in arrival order. Returns a [`ResumeReport`]
3450 /// when the final lock released; `None` if the lockset is still
3451 /// non-empty (further locks held).
3452 ///
3453 /// Releasing an unknown `lock_id` (or releasing on an already-Active
3454 /// node) is an idempotent no-op returning `None`.
3455 pub fn resume(
3456 &self,
3457 node_id: NodeId,
3458 lock_id: LockId,
3459 ) -> Result<Option<ResumeReport>, PauseError> {
3460 // Phase 1 (lock-held): collect drained buffer + pending-wave flag +
3461 // sink Arcs. For default-mode nodes whose `pending_wave` was set
3462 // during pause, schedule a single fn-fire by adding to
3463 // `pending_fires` BEFORE we exit the lock — the wave engine picks
3464 // it up on the next drain tick.
3465 let (sinks, messages, dropped, pending_wave_for_default) = {
3466 let mut s = self.lock_state();
3467 let rec = s
3468 .nodes
3469 .get_mut(&node_id)
3470 .ok_or(PauseError::UnknownNode(node_id))?;
3471 // For Off mode, pause/resume are no-ops by construction.
3472 if rec.pausable == PausableMode::Off {
3473 return Ok(None);
3474 }
3475 let was_default_mode = rec.pausable == PausableMode::Default;
3476 // Capture pending_wave BEFORE remove_lock collapses the state.
3477 let pending_wave = if was_default_mode {
3478 rec.pause_state.take_pending_wave()
3479 } else {
3480 false
3481 };
3482 let Some((buffer, dropped)) = rec.pause_state.remove_lock(lock_id) else {
3483 // Not the final-resume — restore the pending_wave flag we
3484 // tentatively cleared, since we're not transitioning to
3485 // Active yet.
3486 if pending_wave {
3487 rec.pause_state.mark_pending_wave();
3488 }
3489 return Ok(None);
3490 };
3491 let sinks: Vec<Sink> = rec.subscribers.values().cloned().collect();
3492 let messages: Vec<Message> = buffer.into_iter().collect();
3493 // Default-mode pending-wave handling: schedule the fn-fire so
3494 // the wave engine consolidates the pause-window dep deliveries
3495 // into one fn execution. State nodes don't fire fn (no
3496 // `pending_fires` membership has effect for them).
3497 if pending_wave && was_default_mode {
3498 s.pending_fires.insert(node_id);
3499 }
3500 (sinks, messages, dropped, pending_wave && was_default_mode)
3501 };
3502 let replayed = u32::try_from(messages.len()).unwrap_or(u32::MAX);
3503
3504 // Phase 2 (lock-released): fire sinks for ResumeAll-buffered
3505 // messages. Default-mode resume produces no buffered replay (the
3506 // consolidated fn-fire produces fresh wave traffic via the standard
3507 // commit_emission path).
3508 if !messages.is_empty() {
3509 for sink in &sinks {
3510 sink(&messages);
3511 }
3512 // Phase 3: balance the retain_handle calls done at buffer-push
3513 // time — sinks observe values but don't own refcount shares.
3514 for msg in &messages {
3515 if let Some(h) = msg.payload_handle() {
3516 self.binding.release_handle(h);
3517 }
3518 }
3519 }
3520
3521 // Phase 4 (default-mode): drain the consolidated fn-fire scheduled
3522 // in Phase 1. `run_wave` re-acquires `wave_owner` reentrantly + runs
3523 // the standard drain pipeline; the new fn-fire emerges as a normal
3524 // wave's worth of messages to subscribers.
3525 if pending_wave_for_default {
3526 self.run_wave(|_this| {
3527 // The pending_fires entry was pushed in Phase 1 under the
3528 // lock. run_wave's drain picks it up.
3529 });
3530 }
3531 Ok(Some(ResumeReport { replayed, dropped }))
3532 }
3533
3534 /// True if the node currently holds at least one pause lock.
3535 #[must_use]
3536 pub fn is_paused(&self, node_id: NodeId) -> bool {
3537 self.state
3538 .lock()
3539 .require_node(node_id)
3540 .pause_state
3541 .is_paused()
3542 }
3543
3544 /// Number of pause locks currently held on `node_id`. `0` if Active.
3545 #[must_use]
3546 pub fn pause_lock_count(&self, node_id: NodeId) -> usize {
3547 self.state
3548 .lock()
3549 .require_node(node_id)
3550 .pause_state
3551 .lock_count()
3552 }
3553
3554 /// Test helper: whether `node_id` currently holds the given `lock_id`.
3555 #[must_use]
3556 pub fn holds_pause_lock(&self, node_id: NodeId, lock_id: LockId) -> bool {
3557 self.state
3558 .lock()
3559 .require_node(node_id)
3560 .pause_state
3561 .contains_lock(lock_id)
3562 }
3563}
3564
3565// -----------------------------------------------------------------------
3566// set_deps — atomic dep mutation
3567// -----------------------------------------------------------------------
3568
3569/// Errors returnable by [`Core::set_deps`].
3570///
3571/// Per `~/src/graphrefly-ts/docs/research/rewire-design-notes.md` and the
3572/// Phase 13.8 Q1 lock:
3573/// - `SelfDependency` — `n in newDeps` (self-loops are pathological without
3574/// explicit fixed-point semantics, which GraphReFly does not provide).
3575/// - `WouldCreateCycle { path }` — adding the new edge would create a cycle.
3576/// The `path` field reports the offending dep chain for debuggability.
3577/// - `UnknownNode` / `NotComputeNode` — invariant violations from the caller.
3578/// - `TerminalNode` — `n` itself has emitted COMPLETE/ERROR; rewiring a
3579/// terminal stream is a category error (terminal is one-shot at this
3580/// layer; recovery is the resubscribable path on a fresh subscribe).
3581/// - `TerminalDep` — a newly-added dep is terminal AND not resubscribable.
3582/// Resubscribable terminal deps are accepted because the subscribe path
3583/// resets their lifecycle. Non-resubscribable terminal deps would deliver
3584/// their already-emitted terminal directly to `n`'s `dep_terminals` slot,
3585/// which is rarely intended.
3586#[derive(Error, Debug, Clone, PartialEq)]
3587pub enum SetDepsError {
3588 /// `n` appeared in `new_deps` (self-loop rejection).
3589 #[error("set_deps({n:?}, ...): self-dependency rejected (n appeared in new_deps)")]
3590 SelfDependency { n: NodeId },
3591
3592 /// Adding the new dep would create a cycle. `path` is the chain
3593 /// `[added_dep, ..., n]` reachable via existing deps.
3594 #[error(
3595 "set_deps({n:?}, ...): cycle would form via path {path:?} \
3596 (adding {added_dep:?} → {n:?} closes the loop)"
3597 )]
3598 WouldCreateCycle {
3599 n: NodeId,
3600 added_dep: NodeId,
3601 path: Vec<NodeId>,
3602 },
3603
3604 #[error("set_deps: unknown node {0:?}")]
3605 UnknownNode(NodeId),
3606
3607 #[error("set_deps: node {0:?} is not a compute node (state nodes have no deps)")]
3608 NotComputeNode(NodeId),
3609
3610 /// `n` itself has terminated (COMPLETE / ERROR). Rewiring a terminal node
3611 /// is rejected — the stream has ended at this layer. To recover, mark
3612 /// the node resubscribable before terminate; a fresh subscribe will then
3613 /// reset its lifecycle.
3614 #[error("set_deps({n:?}, ...): node has already terminated; cannot rewire a terminal node")]
3615 TerminalNode { n: NodeId },
3616
3617 /// A newly-added dep is terminal AND non-resubscribable. Per Phase 13.8
3618 /// Q1, this is rejected; resubscribable terminal deps are allowed
3619 /// because the subscribe path resets them when activated. Already-present
3620 /// terminal deps are unaffected (their terminal status was accepted at
3621 /// the time they terminated).
3622 #[error(
3623 "set_deps({n:?}, ...): added dep {dep:?} is terminal and not resubscribable; \
3624 either mark it resubscribable before terminate, or remove the dep from new_deps"
3625 )]
3626 TerminalDep { n: NodeId, dep: NodeId },
3627
3628 /// `n` itself is currently mid-fire — a user fn for `n` re-entered Core
3629 /// via `set_deps(n, ...)` from inside `n`'s own `invoke_fn` /
3630 /// `project_each` / `predicate_each` / etc. Phase 1 of the dispatcher
3631 /// snapshotted `dep_handles` BEFORE the lock-released callback; the
3632 /// callback returning a `tracked` set indexed against THAT ordering
3633 /// would corrupt indices if the rewire re-orders deps mid-fire.
3634 /// Rejected to preserve the dynamic-tracked-indices invariant (D1).
3635 ///
3636 /// Workaround: schedule the rewire from a different node's fn (via
3637 /// `Core::emit` on a state node and observing the emit downstream),
3638 /// or perform the rewire after the wave completes (e.g. from a sink
3639 /// callback that is itself outside any fn-fire scope).
3640 ///
3641 /// Slice F (2026-05-07) — A6.
3642 #[error(
3643 "set_deps({n:?}, ...): rejected — node {n:?} is currently mid-fire \
3644 (set_deps from inside the firing node's own fn would corrupt the \
3645 Dynamic `tracked` indices snapshot taken before invoke_fn). \
3646 Schedule the rewire outside this fire scope."
3647 )]
3648 ReentrantOnFiringNode { n: NodeId },
3649}
3650
3651impl Core {
3652 /// Atomic dep mutation — change a node's upstream deps without TEARDOWN
3653 /// cascading and without losing cache.
3654 ///
3655 /// Per the TLA+-verified design at
3656 /// `~/src/graphrefly-ts/docs/research/wave_protocol_rewire.tla`
3657 /// (35,950 distinct states, all 7 invariants clean):
3658 ///
3659 /// - Removed deps: clear dirtyMask bit, drain pending queue, drop DepRecord.
3660 /// - Added deps: SENTINEL prevData; push-on-subscribe if added dep has cached DATA.
3661 /// - Preserved: `firstRunPassed`, `pauseLocks`, `pauseBuffer`, `cache` (ROM/RAM).
3662 /// - Status auto-settles if dirtyMask becomes empty.
3663 /// - Idempotent on `new_deps == current deps`.
3664 /// - Self-rewire `n ∈ new_deps` rejected (`SelfDependency`).
3665 /// - Cycles rejected (`WouldCreateCycle`).
3666 /// - Allowed mid-wave + while paused.
3667 /// - Phase 13.8 Q1: terminal `n` rejected (`TerminalNode`); newly-added
3668 /// terminal non-resubscribable deps rejected (`TerminalDep`).
3669 ///
3670 /// The body is a single atomic dep-mutation transaction with several
3671 /// discrete validation stages. Splitting would require passing a
3672 /// partially-mutable CoreState across helpers, and the transaction's
3673 /// locality is what makes the F1 refcount-leak collection work.
3674 #[allow(clippy::too_many_lines)]
3675 pub fn set_deps(&self, n: NodeId, new_deps: &[NodeId]) -> Result<(), SetDepsError> {
3676 let mut s = self.lock_state();
3677 // Validate node exists and is compute. Read-once via the helper so
3678 // subsequent code can use `require_node(n)` without re-checking.
3679 let (is_state, is_producer, is_terminal) = {
3680 let rec = s.nodes.get(&n).ok_or(SetDepsError::UnknownNode(n))?;
3681 (rec.is_state(), rec.is_producer(), rec.terminal.is_some())
3682 };
3683 if is_state || is_producer {
3684 // State and Producer nodes have no declared deps — set_deps
3685 // is meaningless. Producer nodes manage their own subscriptions
3686 // through the binding's ProducerCtx; mutating their (empty)
3687 // dep set would not affect that.
3688 return Err(SetDepsError::NotComputeNode(n));
3689 }
3690 // Reject if `n` itself is terminal (Phase 13.8 Q1: terminal nodes
3691 // cannot be rewired; recovery is via resubscribable subscribe).
3692 if is_terminal {
3693 return Err(SetDepsError::TerminalNode { n });
3694 }
3695 // A6 reentrancy guard (Slice F, 2026-05-07): reject if `n` is
3696 // currently mid-fire on the wave-owner thread. Closes the D1 hazard
3697 // where `Phase 1` snapshotted `dep_handles` against pre-rewire dep
3698 // ordering and `Phase 3` would store the returned `tracked` indices
3699 // against post-rewire ordering. Same-thread re-entry is the only
3700 // path that matters — cross-thread emits already block on
3701 // `wave_owner` per the M1 design.
3702 if s.currently_firing.contains(&n) {
3703 return Err(SetDepsError::ReentrantOnFiringNode { n });
3704 }
3705 // Self-rewire rejection.
3706 if new_deps.contains(&n) {
3707 return Err(SetDepsError::SelfDependency { n });
3708 }
3709 // Validate all new deps exist.
3710 for &d in new_deps {
3711 if !s.nodes.contains_key(&d) {
3712 return Err(SetDepsError::UnknownNode(d));
3713 }
3714 }
3715 // Cycle detection: data flows parent → child via the `children` map.
3716 // Adding edge `d → n` (d becomes a dep of n) creates a cycle iff
3717 // `d` is already reachable from `n` via existing data-flow edges
3718 // (so `n → ... → d` exists, and the new `d → n` closes the loop).
3719 // DFS from `n` along `children` edges, looking for each added dep.
3720 let current_deps: HashSet<NodeId> = s.require_node(n).dep_ids().collect();
3721 let new_deps_set: HashSet<NodeId> = new_deps.iter().copied().collect();
3722 let added: HashSet<NodeId> = new_deps_set.difference(¤t_deps).copied().collect();
3723 for &d in &added {
3724 if let Some(path) = self.path_from_to(&s, n, d) {
3725 return Err(SetDepsError::WouldCreateCycle {
3726 n,
3727 added_dep: d,
3728 path,
3729 });
3730 }
3731 }
3732 // Phase 13.8 Q1: reject newly-added deps that are terminal AND not
3733 // resubscribable. Resubscribable terminal deps are allowed — the
3734 // subscribe path resets their lifecycle when something activates
3735 // them. Already-present (kept) deps are unaffected; their terminal
3736 // status was accepted at the time they terminated.
3737 for &d in &added {
3738 let dep_rec = s.require_node(d);
3739 if dep_rec.terminal.is_some() && !dep_rec.resubscribable {
3740 return Err(SetDepsError::TerminalDep { n, dep: d });
3741 }
3742 }
3743 // Idempotent fast-path.
3744 if added.is_empty() && current_deps == new_deps_set {
3745 return Ok(());
3746 }
3747 let removed: HashSet<NodeId> = current_deps.difference(&new_deps_set).copied().collect();
3748
3749 // Snapshot old deps (ordered) for topology event, before mutation.
3750 let old_deps_vec: Vec<NodeId> = s.require_node(n).dep_ids_vec();
3751
3752 // Carry out the rewire atomically.
3753 // 1. Build new dep_records, preserving DepRecord state for kept deps.
3754 let new_deps_vec: Vec<NodeId> = new_deps.to_vec();
3755 //
3756 // Refcount discipline (F1 audit fix): each `Some(TerminalKind::Error(h))`
3757 // slot owns a refcount share retained at `terminate_node` time. When a
3758 // dep is REMOVED, its slot is dropped — the corresponding handle's
3759 // share must be released here, otherwise it leaks until Core drop.
3760 // Also release data_batch retains for removed deps.
3761 let (new_dep_records, removed_handles): (Vec<DepRecord>, Vec<HandleId>) = {
3762 let rec = s.require_node(n);
3763 // Index old dep_records by NodeId for O(1) lookup of kept deps.
3764 let old_by_node: HashMap<NodeId, &DepRecord> =
3765 rec.dep_records.iter().map(|dr| (dr.node, dr)).collect();
3766 let new_records: Vec<DepRecord> = new_deps_vec
3767 .iter()
3768 .map(|&d| {
3769 if let Some(old) = old_by_node.get(&d) {
3770 // Kept dep: preserve all state (prev_data, data_batch,
3771 // terminal, wave flags). Subscriptions stay live.
3772 DepRecord {
3773 node: d,
3774 prev_data: old.prev_data,
3775 dirty: old.dirty,
3776 involved_this_wave: old.involved_this_wave,
3777 data_batch: old.data_batch.clone(),
3778 terminal: old.terminal,
3779 }
3780 } else {
3781 // Added dep: fresh sentinel record.
3782 DepRecord::new(d)
3783 }
3784 })
3785 .collect();
3786 // Collect handles to release from REMOVED dep records.
3787 let mut to_release: Vec<HandleId> = Vec::new();
3788 for d in &removed {
3789 if let Some(old) = old_by_node.get(d) {
3790 if let Some(TerminalKind::Error(h)) = old.terminal {
3791 to_release.push(h);
3792 }
3793 // Release data_batch retains for removed deps.
3794 for &h in &old.data_batch {
3795 to_release.push(h);
3796 }
3797 }
3798 }
3799 (new_records, to_release)
3800 };
3801 // Clear dirtyMask bit by re-emitting the wave-bookkeeping: we don't
3802 // currently model a per-dep dirtyMask explicitly (we use the boolean
3803 // `dirty` flag at node level). Removing a dep's entry from the implicit
3804 // mask is therefore implicit — by removing the dep, future emissions
3805 // from it can't re-arm the bit. The per-dep `involved_this_wave` flag
3806 // stays wave-scoped and gets cleared at wave end. The setDeps action
3807 // itself does NOT change the dirty boolean unless all deps are cleared;
3808 // in that case we settle.
3809 // Slice E2 (D067): on a dynamic node that had previously fired its
3810 // fn, capture `has_fired_once` BEFORE the reset so we can fire
3811 // `OnRerun` cleanup lock-released after `drop(s)` below. Without
3812 // this, the next `fire_regular` Phase 1 would capture
3813 // `has_fired_once = false`, causing Phase 1.5 to skip OnRerun —
3814 // silently dropping the prior activation's cleanup closure when
3815 // the next `invoke_fn` overwrites `current_cleanup`. Per spec
3816 // R2.4.5, `set_deps` does NOT end the activation cycle
3817 // (subscribe→unsubscribe is the cycle boundary), so OnRerun must
3818 // fire on every re-fire including post-set_deps.
3819 let fire_set_deps_on_rerun;
3820 {
3821 let rec = s.require_node_mut(n);
3822 fire_set_deps_on_rerun = rec.is_dynamic && rec.has_fired_once;
3823 rec.dep_records = new_dep_records;
3824 // Re-derive `tracked` for static derived: all indices.
3825 // For dynamic: clear `tracked` AND reset `has_fired_once` so the
3826 // next dep delivery satisfies the first-fire branch in
3827 // `deliver_data_to_consumer` (`!has_fired_once || tracked.contains(...)`).
3828 // Without resetting `has_fired_once`, the cleared `tracked` blocks
3829 // every future fire — fn never re-runs and the dynamic node sits
3830 // on stale cache derived from the old dep set. The next fire
3831 // re-runs fn unconditionally; fn's returned `tracked` then
3832 // repopulates `rec.tracked` and normal selective-deps semantics
3833 // resume from the next dep update onward.
3834 if rec.is_dynamic {
3835 rec.tracked.clear();
3836 rec.has_fired_once = false;
3837 } else {
3838 // Derived (static) and Operator track all deps.
3839 rec.tracked = (0..new_deps_vec.len()).collect();
3840 }
3841 }
3842
3843 // 2. Update inverted-edge map (children).
3844 for &removed_dep in &removed {
3845 if let Some(set) = s.children.get_mut(&removed_dep) {
3846 set.remove(&n);
3847 }
3848 }
3849 for &added_dep in &added {
3850 s.children.entry(added_dep).or_default().insert(n);
3851 }
3852
3853 // 3. Push-on-subscribe for added deps with cached DATA. Wraps in a
3854 // wave so any downstream propagation runs cleanly. We capture only
3855 // the LIST of added deps (not their cache values) because the cache
3856 // can change between releasing the validation lock and the wave's
3857 // re-acquisition — see the P2 race fix below.
3858 //
3859 // P2 (Slice A close /qa) — between `drop(s)` and `run_wave`'s
3860 // closure re-acquiring the lock, a concurrent thread could
3861 // invalidate one of the added deps, releasing its cache handle. A
3862 // pre-snapshot of `(added_dep, cache)` pairs would then carry a
3863 // dangling HandleId into `deliver_data_to_consumer`. The fix is to
3864 // re-read each added dep's `cache` INSIDE the closure (under the
3865 // freshly re-acquired state lock). The wave-owner re-entrant mutex
3866 // (Q2) blocks concurrent waves once we enter `run_wave`, so the
3867 // re-read sees a coherent post-validation state.
3868 let added_for_wave: Vec<NodeId> = added.iter().copied().collect();
3869 let added_for_registry: Vec<NodeId> = added.iter().copied().collect();
3870 let removed_for_registry: Vec<NodeId> = removed.iter().copied().collect();
3871 // Drop the state lock before run_wave (which acquires its own) and
3872 // before crossing the binding boundary for the F1 refcount-fix
3873 // releases. Keeps the lock-discipline split (binding calls outside
3874 // the state lock) consistent with the rest of the dispatcher.
3875 drop(s);
3876 // Slice X5 (D3 substrate, 2026-05-08): maintain partition
3877 // membership across topology change.
3878 // - For each new edge: union the partitions of `n` and `added_dep`.
3879 // - For each removed edge: notify registry (X5 commit-1: no-op;
3880 // Y1 commit-2 will run reachability walk + split if disconnected).
3881 // Done lock-released wrt state. Registry mutex is held briefly.
3882 {
3883 let mut reg = self.registry.lock();
3884 for added_dep in &added_for_registry {
3885 reg.union_nodes(n, *added_dep);
3886 }
3887 for removed_dep in &removed_for_registry {
3888 reg.on_edge_removed(n, *removed_dep);
3889 }
3890 }
3891 // Slice E2 (D067): fire OnRerun lock-released for dynamic nodes
3892 // that had previously fired. The cleanup closure cleans up
3893 // resources tied to the old dep shape before the next fn-fire
3894 // (triggered by added-dep push-on-subscribe below) registers a
3895 // fresh cleanup spec. Direct fire (NOT via deferred_cleanup_hooks)
3896 // because set_deps may NOT enter a wave (no added deps → no
3897 // run_wave below) — queueing the hook would orphan it until the
3898 // next unrelated wave drains.
3899 if fire_set_deps_on_rerun {
3900 self.binding.cleanup_for(n, CleanupTrigger::OnRerun);
3901 }
3902 // Fire topology event after lock is dropped.
3903 self.fire_topology_event(&crate::topology::TopologyEvent::DepsChanged {
3904 node: n,
3905 old_deps: old_deps_vec,
3906 new_deps: new_deps_vec.clone(),
3907 });
3908 if !added_for_wave.is_empty() {
3909 self.run_wave(|this| {
3910 let mut s = this.lock_state();
3911 // Defensive: re-validate `n` still exists and isn't terminal.
3912 // A concurrent path could have terminated it between
3913 // validation and run_wave's wave_owner acquisition.
3914 if !s.nodes.contains_key(&n) || s.require_node(n).terminal.is_some() {
3915 return;
3916 }
3917 for added_dep in &added_for_wave {
3918 // Re-read cache under the wave-owner-held lock — this
3919 // is the post-validation, post-concurrent-action
3920 // snapshot. NO_HANDLE means the dep was invalidated
3921 // concurrently; skip (no data to push).
3922 let cache = match s.nodes.get(added_dep) {
3923 Some(rec) => rec.cache,
3924 None => continue, // dep deleted concurrently
3925 };
3926 if cache == NO_HANDLE {
3927 continue;
3928 }
3929 let dep_idx = s.require_node(n).dep_index_of(*added_dep);
3930 if let Some(idx) = dep_idx {
3931 this.deliver_data_to_consumer(&mut s, n, idx, cache);
3932 }
3933 }
3934 });
3935 }
3936 for h in removed_handles {
3937 self.binding.release_handle(h);
3938 }
3939 Ok(())
3940 }
3941
3942 /// DFS from `from` along data-flow edges (children map) looking for `to`.
3943 /// Returns the path including endpoints, or `None` if unreachable. Used
3944 /// for cycle detection in [`Self::set_deps`].
3945 fn path_from_to(&self, s: &CoreState, from: NodeId, to: NodeId) -> Option<Vec<NodeId>> {
3946 if from == to {
3947 return Some(vec![from]);
3948 }
3949 let mut stack: Vec<(NodeId, Vec<NodeId>)> = vec![(from, vec![from])];
3950 let mut visited: HashSet<NodeId> = HashSet::new();
3951 while let Some((cur, path)) = stack.pop() {
3952 if !visited.insert(cur) {
3953 continue;
3954 }
3955 if cur == to {
3956 return Some(path);
3957 }
3958 if let Some(children) = s.children.get(&cur) {
3959 for &child in children {
3960 let mut new_path = path.clone();
3961 new_path.push(child);
3962 stack.push((child, new_path));
3963 }
3964 }
3965 }
3966 None
3967 }
3968}
3969
3970// CoreState helpers — kept on the inner struct so they're naturally scoped
3971// to the lock guard.
3972impl CoreState {
3973 fn alloc_node_id(&mut self) -> NodeId {
3974 let id = NodeId::new(self.next_node_id);
3975 self.next_node_id += 1;
3976 id
3977 }
3978
3979 fn alloc_sub_id(&mut self) -> SubscriptionId {
3980 let id = SubscriptionId(self.next_subscription_id);
3981 self.next_subscription_id += 1;
3982 id
3983 }
3984
3985 /// Clear wave-scoped flags and rotate per-dep batch data on every
3986 /// node. Run at the end of every wave (regular drain via `run_wave`,
3987 /// activation drain via `activate_derived`, and `BatchGuard::drop`'s
3988 /// drain). Centralized so a future wave-state field can't be missed
3989 /// at one of the cleanup sites.
3990 ///
3991 /// Per-dep rotation (R2.9.b / R1.3.6.b):
3992 /// - `prev_data` ← last element of `data_batch` (or unchanged if empty).
3993 /// The last batch entry's retain transfers to `prev_data`; the old
3994 /// `prev_data`'s retain is released. All earlier batch entries are
3995 /// released.
3996 /// - `data_batch` cleared.
3997 /// - Per-dep `dirty` and `involved_this_wave` cleared.
3998 ///
3999 /// Handle releases are pushed to `deferred_handle_releases` for
4000 /// post-lock-drop release by the caller.
4001 pub(crate) fn clear_wave_state(&mut self) {
4002 self.pending_auto_resolve.clear();
4003 // A6 (Slice F, 2026-05-07): currently_firing is push/pop balanced
4004 // by FiringGuard's RAII (including on panic). It should already be
4005 // empty here, but defensively clear in case a future code path
4006 // forgets the guard.
4007 self.currently_firing.clear();
4008 // A3 (Slice F, 2026-05-07): pending_pause_overflow is normally
4009 // drained by drain_and_flush via the synthesis loop. If a wave is
4010 // panic-discarded BEFORE the synthesis runs (e.g. invoke_fn panics
4011 // before a paused-overflow has a chance to synthesize), we drop the
4012 // queued entries silently — the binding never sees ERROR for that
4013 // overflow event, but the pause buffer's `dropped` count is
4014 // unchanged so callers can still detect via ResumeReport. Re-firing
4015 // the synthesis on the next wave would be confusing (the overflow
4016 // event is logically scoped to the panicked wave).
4017 self.pending_pause_overflow.clear();
4018 // Slice G: tier3 emit tracking is wave-scoped.
4019 self.tier3_emitted_this_wave.clear();
4020 // Slice E2 (D057): per-wave-per-node OnInvalidate dedup is
4021 // wave-scoped — cleared so the next wave can fire cleanups again.
4022 self.invalidate_hooks_fired_this_wave.clear();
4023 // Slice E2 INVARIANT (DO NOT CHANGE WITHOUT THINKING):
4024 // `deferred_cleanup_hooks` is NOT cleared here. It follows the
4025 // `deferred_handle_releases` discipline:
4026 // - SUCCESS path (`BatchGuard::drop` non-panic): drained by
4027 // `Core::drain_deferred` AFTER `clear_wave_state` runs, then
4028 // fired lock-released by `Core::fire_deferred`.
4029 // - PANIC-DISCARD path (`BatchGuard::drop` panic): explicitly
4030 // `std::mem::take`-and-dropped AFTER `clear_wave_state` runs,
4031 // silently per D061.
4032 // Clearing it INSIDE `clear_wave_state` would race the success
4033 // path: the wave's queued `OnInvalidate` cleanup hooks would be
4034 // erased BEFORE `drain_deferred` could take them, dropping every
4035 // user cleanup callback on every successful wave.
4036 // If a future change moves `deferred_cleanup_hooks` ownership
4037 // here, ALSO move the post-`clear_wave_state` take in both
4038 // BatchGuard paths to BEFORE the clear call. Until then, leaving
4039 // the field untouched here is load-bearing.
4040 for rec in self.nodes.values_mut() {
4041 rec.dirty = false;
4042 rec.involved_this_wave = false;
4043 for dr in &mut rec.dep_records {
4044 let batch_len = dr.data_batch.len();
4045 if batch_len > 0 {
4046 // Release all batch entries EXCEPT the last — the last
4047 // entry's retain transfers to prev_data.
4048 for &h in &dr.data_batch[..batch_len - 1] {
4049 self.deferred_handle_releases.push(h);
4050 }
4051 // Release the OLD prev_data (its retain was from the
4052 // previous wave's rotation or from initial delivery).
4053 if dr.prev_data != NO_HANDLE {
4054 self.deferred_handle_releases.push(dr.prev_data);
4055 }
4056 // Rotate: last batch entry becomes new prev_data.
4057 // Its retain carries over — no extra retain needed.
4058 dr.prev_data = dr.data_batch[batch_len - 1];
4059 dr.data_batch.clear();
4060 }
4061 dr.dirty = false;
4062 dr.involved_this_wave = false;
4063 }
4064 }
4065 }
4066
4067 pub(crate) fn require_node(&self, id: NodeId) -> &NodeRecord {
4068 self.nodes
4069 .get(&id)
4070 .unwrap_or_else(|| panic!("unknown node {id:?}"))
4071 }
4072
4073 pub(crate) fn require_node_mut(&mut self, id: NodeId) -> &mut NodeRecord {
4074 self.nodes
4075 .get_mut(&id)
4076 .unwrap_or_else(|| panic!("unknown node {id:?}"))
4077 }
4078}
4079
4080/// Release every binding-side refcount share owned by this `CoreState`
4081/// when the last `Core` clone drops the inner Mutex.
4082///
4083/// Without this, every retained handle in `cache` / `terminal` Error /
4084/// `dep_terminals` Error / pause-buffer-payload would leak in the binding
4085/// registry until process exit. Production bindings (napi-rs, pyo3,
4086/// wasm-bindgen) all maintain handle-ref maps that grow unbounded without
4087/// this cleanup.
4088///
4089/// Safe to call during panic unwinding — `BindingBoundary::release_handle`
4090/// is the only call, and a panicking binding during cleanup would already
4091/// have been a problem in normal operation.
4092impl Drop for CoreState {
4093 fn drop(&mut self) {
4094 // Drain pending in-flight retains too, so a panic mid-wave doesn't
4095 // strand the queue_notify retains in `deferred_handle_releases`.
4096 let pending = std::mem::take(&mut self.pending_notify);
4097 let deferred_releases = std::mem::take(&mut self.deferred_handle_releases);
4098 // `deferred_flush_jobs` carries `Vec<Sink>` clones — those Arcs
4099 // drop naturally when this CoreState drops; no handles to release
4100 // there.
4101 let _ = std::mem::take(&mut self.deferred_flush_jobs);
4102
4103 // Per-node retained handles:
4104 // - `cache` (1 retain per non-NO_HANDLE state cache or
4105 // populated compute cache).
4106 // - `terminal == Some(Error(h))` (1 retain on the terminal slot).
4107 // - `dep_terminals[i] == Some(Error(h))` (1 retain per consumer's
4108 // terminated-dep slot).
4109 // - `pause_state` paused buffer messages with payload handles
4110 // (1 retain per buffered Data/Error).
4111 for rec in self.nodes.values_mut() {
4112 if rec.cache != NO_HANDLE {
4113 self.binding.release_handle(rec.cache);
4114 }
4115 if let Some(TerminalKind::Error(h)) = rec.terminal {
4116 self.binding.release_handle(h);
4117 }
4118 for dr in &rec.dep_records {
4119 if let Some(TerminalKind::Error(h)) = dr.terminal {
4120 self.binding.release_handle(h);
4121 }
4122 // Release data_batch retains (in-flight wave data).
4123 for &h in &dr.data_batch {
4124 self.binding.release_handle(h);
4125 }
4126 // Release prev_data retain (cross-wave persistence).
4127 if dr.prev_data != NO_HANDLE {
4128 self.binding.release_handle(dr.prev_data);
4129 }
4130 }
4131 if let PauseState::Paused { buffer, .. } = &rec.pause_state {
4132 for msg in buffer {
4133 if let Some(h) = msg.payload_handle() {
4134 self.binding.release_handle(h);
4135 }
4136 }
4137 }
4138 // Slice E1: release replay-buffer retains.
4139 for &h in &rec.replay_buffer {
4140 self.binding.release_handle(h);
4141 }
4142 // Operator scratch (Slice C-3, D026): generic per-operator
4143 // state struct. Each variant's release_handles releases the
4144 // shares it owns (Scan/Reduce acc, Distinct/Pairwise prev,
4145 // Last latest + default; Take/Skip/TakeWhile own no handles).
4146 if let Some(scratch) = rec.op_scratch.as_mut() {
4147 scratch.release_handles(&*self.binding);
4148 }
4149 }
4150
4151 // Pending wave retains. Slice X4 / D2: walk all batches' messages
4152 // — `iter_messages` flattens the per-node `SmallVec<PendingBatch>`.
4153 for entry in pending.values() {
4154 for msg in entry.iter_messages() {
4155 if let Some(h) = msg.payload_handle() {
4156 self.binding.release_handle(h);
4157 }
4158 }
4159 }
4160 for h in deferred_releases {
4161 self.binding.release_handle(h);
4162 }
4163 // Wave-cache snapshot retains (defensive — should normally be
4164 // empty by the time Core drops, but a panicked-mid-wave Core
4165 // could leave them populated).
4166 let snapshots = std::mem::take(&mut self.wave_cache_snapshots);
4167 for (_, h) in snapshots {
4168 self.binding.release_handle(h);
4169 }
4170 }
4171}